# Marcus/Lidar/SLAM_Navigation.py
# 2026-04-12 18:50:22 +04:00
#
# 208 lines
# 7.5 KiB
# Python

from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Tuple
import numpy as np
def _safe_float(v: Any, default: float) -> float:
    """Coerce ``v`` to ``float``, substituting ``default`` when that fails.

    Args:
        v: Value to convert (number, numeric string, ...).
        default: Replacement used when ``float(v)`` raises; it is itself
            passed through ``float()`` before being returned.

    Returns:
        ``float(v)`` on success, otherwise ``float(default)``.
    """
    try:
        converted = float(v)
    except Exception:
        # ``v`` is typed ``Any``, so every conversion failure takes the fallback.
        converted = float(default)
    return converted
def _safe_int(v: Any, default: int) -> int:
    """Coerce ``v`` to ``int``, substituting ``default`` when that fails.

    Args:
        v: Value to convert. Floats are truncated by ``int()``; strings must
            be integer literals (``"3.9"`` is rejected and yields the default).
        default: Replacement used when ``int(v)`` raises; it is itself passed
            through ``int()`` before being returned.

    Returns:
        ``int(v)`` on success, otherwise ``int(default)``.
    """
    try:
        converted = int(v)
    except Exception:
        # ``v`` is typed ``Any``, so every conversion failure takes the fallback.
        converted = int(default)
    return converted
def _as_bool(v: Any, default: bool) -> bool:
    """Interpret a loosely typed config value as a boolean.

    Args:
        v: Raw value. ``None`` means "not provided".
        default: Returned unchanged when ``v`` is ``None``.

    Returns:
        ``default`` for ``None``; the truthiness of ``v`` for bools and
        numbers; otherwise whether the stripped, lower-cased text of ``v`` is
        one of ``"1"``, ``"true"``, ``"yes"`` or ``"on"``.
    """
    if v is None:
        return default
    # bool is a subclass of int, so this single check covers bool, int and float.
    if isinstance(v, (int, float)):
        return bool(v)
    token = str(v).strip().lower()
    return token in {"1", "true", "yes", "on"}
@dataclass
class NavigationExportConfig:
    """Tunable parameters for turning a point cloud into a 2D navigation map.

    The field defaults below are the single source of truth: both
    ``from_dict`` and ``patched`` fall back to them (or to the current
    instance) for missing or unparseable entries.
    """

    enabled: bool = True  # NavigationExporter.export() refuses to run when False
    min_points: int = 400  # fewest height-filtered points accepted for a grid
    resolution_m: float = 0.05  # grid cell edge length in metres
    z_min_m: float = 0.05  # 5 cm above floor — avoids ground returns, captures low obstacles
    z_max_m: float = 1.40  # 1.4 m — covers G1 Edu body (1.25 m) with margin
    inflation_radius_m: float = 0.20  # obstacle dilation radius in metres
    padding_m: float = 0.40  # margin added around the point extent, in metres

    @staticmethod
    def from_dict(d: Dict[str, Any] | None) -> "NavigationExportConfig":
        """Build a config from a plain mapping.

        Args:
            d: Mapping of field names to raw values, or ``None``.

        Returns:
            A new config. Keys that are missing or cannot be parsed take the
            dataclass defaults, so ``from_dict(None)`` equals
            ``NavigationExportConfig()``. The same lower bounds as in
            ``patched`` are applied.
        """
        # Patching a default instance keeps the fallbacks identical to the
        # declared field defaults; previously z_min_m / z_max_m fell back to
        # -0.40 / 1.20 here, contradicting the documented 0.05 / 1.40.
        return NavigationExportConfig().patched(d)

    def patched(self, patch: Dict[str, Any] | None) -> "NavigationExportConfig":
        """Return a copy of this config with the entries of ``patch`` applied.

        Args:
            patch: Mapping of field names to raw values, or ``None``. Keys
                that are missing or cannot be parsed keep the current value.

        Returns:
            A new config; ``self`` is not modified. ``min_points`` is clamped
            to at least 50, ``resolution_m`` to at least 0.01, and
            ``inflation_radius_m`` / ``padding_m`` to at least 0.0. The z
            limits are taken as given.
        """
        src = patch or {}

        def pick(key: str, coerce: Any) -> Any:
            # The current value serves both as the "key missing" result and
            # as the fallback when the supplied value cannot be coerced.
            current = getattr(self, key)
            return coerce(src.get(key, current), current)

        return NavigationExportConfig(
            enabled=pick("enabled", _as_bool),
            min_points=max(50, pick("min_points", _safe_int)),
            resolution_m=max(0.01, pick("resolution_m", _safe_float)),
            z_min_m=pick("z_min_m", _safe_float),
            z_max_m=pick("z_max_m", _safe_float),
            inflation_radius_m=max(0.0, pick("inflation_radius_m", _safe_float)),
            padding_m=max(0.0, pick("padding_m", _safe_float)),
        )
def _inflate_binary(mask: np.ndarray, radius_cells: int) -> np.ndarray:
    """Dilate a 2D mask with a disk-shaped footprint.

    Every set cell of ``mask`` is spread to all cells whose integer offset
    ``(dy, dx)`` satisfies ``dy*dy + dx*dx <= radius_cells**2``. Cells that
    would land outside the grid are dropped.

    Args:
        mask: 2D array (boolean in this module) marking occupied cells.
        radius_cells: Dilation radius in cells; values ``<= 0`` disable it.

    Returns:
        A new array of the same shape as ``mask``; the input is not modified.
    """
    if radius_cells <= 0:
        return mask.copy()

    h, w = mask.shape
    out = mask.copy()
    rr = radius_cells * radius_cells

    # A shift of a whole grid dimension or more moves every cell out of
    # bounds. Capping the ranges skips those shifts, which also keeps the
    # slice bounds below non-negative (a negative stop would wrap around and
    # produce mismatched source/destination shapes).
    max_dy = min(radius_cells, h - 1)
    max_dx = min(radius_cells, w - 1)

    for dy in range(-max_dy, max_dy + 1):
        src_rows = slice(max(0, -dy), min(h, h - dy))
        dst_rows = slice(max(0, dy), min(h, h + dy))
        for dx in range(-max_dx, max_dx + 1):
            if dy * dy + dx * dx > rr:
                continue  # outside the disk
            src_cols = slice(max(0, -dx), min(w, w - dx))
            dst_cols = slice(max(0, dx), min(w, w + dx))
            # OR the mask, shifted by (dy, dx), into the output.
            out[dst_rows, dst_cols] |= mask[src_rows, src_cols]
    return out
def _sanitize_basename(name: str) -> str:
    """Turn a user-supplied map name into a safe file-name stem.

    Args:
        name: Requested base name; may be empty or ``None``.

    Returns:
        The stripped name with each of ``/ \\ : * ? " < > |`` replaced by an
        underscore, or ``"map"`` when nothing is left after stripping.
    """
    cleaned = (name or "").strip()
    if not cleaned:
        cleaned = "map"
    # One translation pass replaces every forbidden character.
    forbidden = str.maketrans({ch: "_" for ch in '/\\:*?"<>|'})
    return cleaned.translate(forbidden)
class NavigationExporter:
    """Convert a 3D point cloud into 2D navigation map files.

    ``export`` writes three files into ``data_folder``: a binary PGM
    occupancy image, a YAML metadata file that references it, and a NumPy
    costmap.
    """

    def __init__(self, cfg: NavigationExportConfig, data_folder: str):
        """Store the config and create ``data_folder`` if it does not exist."""
        self.cfg = cfg
        self.data_folder = Path(data_folder)
        self.data_folder.mkdir(parents=True, exist_ok=True)

    def update_config(self, cfg: NavigationExportConfig) -> None:
        """Replace the active export configuration."""
        self.cfg = cfg

    def _filter_nav_points(self, points: np.ndarray) -> np.ndarray:
        """Select the points that may contribute to the 2D map.

        Args:
            points: Array-like of shape ``(N, >=3)`` whose first three columns
                are read as x, y, z.

        Returns:
            The rows (as ``float32``) whose z lies in
            ``[cfg.z_min_m, cfg.z_max_m]`` and whose x, y, z are all finite.

        Raises:
            RuntimeError: If ``points`` is not 2D with at least three columns.
        """
        pts = np.asarray(points, dtype=np.float32)
        if pts.ndim != 2 or pts.shape[1] < 3:
            raise RuntimeError("Invalid point array for navigation export.")
        zmin, zmax = float(self.cfg.z_min_m), float(self.cfg.z_max_m)
        z = pts[:, 2]
        keep = (z >= zmin) & (z <= zmax)
        # A NaN/inf x or y would turn the grid bounds non-finite and make the
        # grid-size computation fail, so such rows are dropped here.
        keep &= np.isfinite(pts[:, :3]).all(axis=1)
        return pts[keep]

    def count_nav_points(self, points: np.ndarray) -> int:
        """Return how many points survive filtering, or 0 if it fails."""
        try:
            return int(len(self._filter_nav_points(points)))
        except Exception:
            # Best-effort counter: invalid input is reported as "no points".
            return 0

    def _build_grid(self, points: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Rasterise the filtered points into occupancy grids.

        Returns:
            ``(occ, inf, min_xy)``: the raw occupancy mask, the mask inflated
            by ``cfg.inflation_radius_m``, and the x/y coordinates of the
            grid's lower corner (point minimum minus ``cfg.padding_m``).
            Row index grows with y, column index with x.

        Raises:
            RuntimeError: If fewer than ``cfg.min_points`` points (and at
                least one is always required) remain after filtering.
        """
        pts = self._filter_nav_points(points)
        # At least one point is needed to compute bounds, whatever the config says.
        required = max(1, int(self.cfg.min_points))
        if len(pts) < required:
            raise RuntimeError(f"Not enough points for nav export ({len(pts)}).")

        res = float(self.cfg.resolution_m)
        pad = float(self.cfg.padding_m)
        xy = pts[:, :2]
        min_xy = xy.min(axis=0) - pad
        max_xy = xy.max(axis=0) + pad
        span = np.maximum(max_xy - min_xy, res)  # never narrower than one cell
        w = int(np.ceil(span[0] / res)) + 1
        h = int(np.ceil(span[1] / res)) + 1

        occ = np.zeros((h, w), dtype=bool)
        gx = np.clip(((xy[:, 0] - min_xy[0]) / res).astype(np.int32), 0, w - 1)
        gy = np.clip(((xy[:, 1] - min_xy[1]) / res).astype(np.int32), 0, h - 1)
        occ[gy, gx] = True

        rad_cells = int(np.ceil(float(self.cfg.inflation_radius_m) / res))
        inf = _inflate_binary(occ, rad_cells)
        return occ, inf, min_xy

    def _unique_output_names(self, base: str) -> Tuple[str, str, str]:
        """Pick PGM/YAML/costmap file names that do not exist in the folder yet."""
        folder = self.data_folder
        names = (f"{base}_nav.pgm", f"{base}_nav.yaml", f"{base}_costmap.npy")
        n = 1
        # Bump a shared "(n)" suffix until none of the three names is taken.
        while any((folder / name).exists() for name in names):
            names = (f"{base}_nav({n}).pgm", f"{base}_nav({n}).yaml", f"{base}_costmap({n}).npy")
            n += 1
        return names

    def _map_yaml_text(self, pgm_name: str, min_xy: np.ndarray) -> str:
        """Render the map metadata as plain YAML text."""
        # Write proper YAML — NOT json.dumps (quoted keys break ROS2 Nav2 map_server)
        ox, oy = float(min_xy[0]), float(min_xy[1])
        yaml_lines = [
            f"image: {pgm_name}",
            f"resolution: {float(self.cfg.resolution_m):.6f}",
            f"origin: [{ox:.6f}, {oy:.6f}, 0.000000]",
            "negate: 0",
            "occupied_thresh: 0.65",
            "free_thresh: 0.196",
            "mode: trinary",  # required by Nav2 map_server
        ]
        return "\n".join(yaml_lines) + "\n"

    def export(self, base_name: str, points: np.ndarray) -> Dict[str, Any]:
        """Write the navigation map files for ``points``.

        Args:
            base_name: Desired file-name stem; sanitised before use. Existing
                files are never overwritten, a ``(n)`` suffix is added instead.
            points: Point cloud of shape ``(N, >=3)``.

        Returns:
            A dict with the written paths (``"pgm"``, ``"yaml"``,
            ``"costmap"``), the image ``"shape"`` as ``[rows, cols]``, and the
            counts ``"occupied_cells"`` / ``"inflated_cells"``.

        Raises:
            RuntimeError: If export is disabled, the point array is invalid,
                or too few points remain after filtering.
        """
        if not self.cfg.enabled:
            raise RuntimeError("Navigation export is disabled in config.")
        occ, inf, min_xy = self._build_grid(points)

        base = _sanitize_basename(base_name)
        pgm_name, yaml_name, cost_name = self._unique_output_names(base)
        folder = self.data_folder
        pgm_path = folder / pgm_name
        yaml_path = folder / yaml_name
        cost_path = folder / cost_name

        # ROS map image: 0=occupied, 254=free
        img = np.where(inf, 0, 254).astype(np.uint8)
        img = np.flipud(img)  # image origin top-left, map origin bottom-left
        rows, cols = img.shape
        with open(pgm_path, "wb") as f:
            f.write(f"P5\n{cols} {rows}\n255\n".encode("ascii"))
            f.write(img.tobytes())

        yaml_path.write_text(self._map_yaml_text(pgm_name, min_xy), encoding="utf-8")

        # Costmap in image orientation: 180 for inflated cells, 255 for cells
        # that actually contain points (written last so they win).
        cost = np.zeros_like(img, dtype=np.uint8)
        cost[np.flipud(inf)] = 180
        cost[np.flipud(occ)] = 255
        np.save(cost_path, cost)

        return {
            "pgm": str(pgm_path),
            "yaml": str(yaml_path),
            "costmap": str(cost_path),
            "shape": [int(rows), int(cols)],
            "occupied_cells": int(np.count_nonzero(occ)),
            "inflated_cells": int(np.count_nonzero(inf)),
        }