from __future__ import annotations from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, Tuple import numpy as np def _safe_float(v: Any, default: float) -> float: try: return float(v) except Exception: return float(default) def _safe_int(v: Any, default: int) -> int: try: return int(v) except Exception: return int(default) def _as_bool(v: Any, default: bool) -> bool: if v is None: return default if isinstance(v, bool): return v if isinstance(v, (int, float)): return bool(v) return str(v).strip().lower() in ("1", "true", "yes", "on") @dataclass class NavigationExportConfig: enabled: bool = True min_points: int = 400 resolution_m: float = 0.05 z_min_m: float = 0.05 # 5 cm above floor — avoids ground returns, captures low obstacles z_max_m: float = 1.40 # 1.4 m — covers G1 Edu body (1.25 m) with margin inflation_radius_m: float = 0.20 padding_m: float = 0.40 @staticmethod def from_dict(d: Dict[str, Any] | None) -> "NavigationExportConfig": src = d or {} return NavigationExportConfig( enabled=_as_bool(src.get("enabled", True), True), min_points=max(50, _safe_int(src.get("min_points", 400), 400)), resolution_m=max(0.01, _safe_float(src.get("resolution_m", 0.05), 0.05)), z_min_m=_safe_float(src.get("z_min_m", -0.40), -0.40), z_max_m=_safe_float(src.get("z_max_m", 1.20), 1.20), inflation_radius_m=max(0.0, _safe_float(src.get("inflation_radius_m", 0.20), 0.20)), padding_m=max(0.0, _safe_float(src.get("padding_m", 0.40), 0.40)), ) def patched(self, patch: Dict[str, Any] | None) -> "NavigationExportConfig": src = patch or {} cur = self return NavigationExportConfig( enabled=_as_bool(src.get("enabled", cur.enabled), cur.enabled), min_points=max(50, _safe_int(src.get("min_points", cur.min_points), cur.min_points)), resolution_m=max(0.01, _safe_float(src.get("resolution_m", cur.resolution_m), cur.resolution_m)), z_min_m=_safe_float(src.get("z_min_m", cur.z_min_m), cur.z_min_m), z_max_m=_safe_float(src.get("z_max_m", cur.z_max_m), cur.z_max_m), inflation_radius_m=max( 0.0, _safe_float(src.get("inflation_radius_m", cur.inflation_radius_m), cur.inflation_radius_m), ), padding_m=max(0.0, _safe_float(src.get("padding_m", cur.padding_m), cur.padding_m)), ) def _inflate_binary(mask: np.ndarray, radius_cells: int) -> np.ndarray: if radius_cells <= 0: return mask.copy() h, w = mask.shape out = mask.copy() offsets = [] rr = radius_cells * radius_cells for dy in range(-radius_cells, radius_cells + 1): for dx in range(-radius_cells, radius_cells + 1): if (dy * dy + dx * dx) <= rr: offsets.append((dy, dx)) for dy, dx in offsets: ys = max(0, -dy) ye = min(h, h - dy) xs = max(0, -dx) xe = min(w, w - dx) yd = max(0, dy) xd = max(0, dx) out[yd : yd + (ye - ys), xd : xd + (xe - xs)] |= mask[ys:ye, xs:xe] return out def _sanitize_basename(name: str) -> str: base = (name or "").strip() or "map" for bad in ("/", "\\", ":", "*", "?", "\"", "<", ">", "|"): base = base.replace(bad, "_") return base class NavigationExporter: def __init__(self, cfg: NavigationExportConfig, data_folder: str): self.cfg = cfg self.data_folder = Path(data_folder) self.data_folder.mkdir(parents=True, exist_ok=True) def update_config(self, cfg: NavigationExportConfig) -> None: self.cfg = cfg def _filter_nav_points(self, points: np.ndarray) -> np.ndarray: pts = np.asarray(points, dtype=np.float32) if pts.ndim != 2 or pts.shape[1] < 3: raise RuntimeError("Invalid point array for navigation export.") zmin, zmax = float(self.cfg.z_min_m), float(self.cfg.z_max_m) m = (pts[:, 2] >= zmin) & (pts[:, 2] <= zmax) return pts[m] def count_nav_points(self, points: np.ndarray) -> int: try: return int(len(self._filter_nav_points(points))) except Exception: return 0 def _build_grid(self, points: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: pts = self._filter_nav_points(points) if len(pts) < int(self.cfg.min_points): raise RuntimeError(f"Not enough points for nav export ({len(pts)}).") res = float(self.cfg.resolution_m) pad = float(self.cfg.padding_m) min_xy = pts[:, :2].min(axis=0) - pad max_xy = pts[:, :2].max(axis=0) + pad span = np.maximum(max_xy - min_xy, res) w = int(np.ceil(span[0] / res)) + 1 h = int(np.ceil(span[1] / res)) + 1 occ = np.zeros((h, w), dtype=bool) gx = np.clip(((pts[:, 0] - min_xy[0]) / res).astype(np.int32), 0, w - 1) gy = np.clip(((pts[:, 1] - min_xy[1]) / res).astype(np.int32), 0, h - 1) occ[gy, gx] = True rad_cells = int(np.ceil(float(self.cfg.inflation_radius_m) / res)) inf = _inflate_binary(occ, rad_cells) return occ, inf, min_xy def export(self, base_name: str, points: np.ndarray) -> Dict[str, Any]: if not self.cfg.enabled: raise RuntimeError("Navigation export is disabled in config.") occ, inf, min_xy = self._build_grid(points) base = _sanitize_basename(base_name) folder = self.data_folder pgm_name = f"{base}_nav.pgm" yaml_name = f"{base}_nav.yaml" cost_name = f"{base}_costmap.npy" n = 1 while (folder / pgm_name).exists() or (folder / yaml_name).exists() or (folder / cost_name).exists(): pgm_name = f"{base}_nav({n}).pgm" yaml_name = f"{base}_nav({n}).yaml" cost_name = f"{base}_costmap({n}).npy" n += 1 # ROS map image: 0=occupied, 254=free img = np.where(inf, 0, 254).astype(np.uint8) img = np.flipud(img) # image origin top-left, map origin bottom-left pgm_path = folder / pgm_name with open(pgm_path, "wb") as f: h, w = img.shape f.write(f"P5\n{w} {h}\n255\n".encode("ascii")) f.write(img.tobytes()) yaml_path = folder / yaml_name # Write proper YAML — NOT json.dumps (quoted keys break ROS2 Nav2 map_server) ox, oy = float(min_xy[0]), float(min_xy[1]) yaml_lines = [ f"image: {pgm_name}", f"resolution: {float(self.cfg.resolution_m):.6f}", f"origin: [{ox:.6f}, {oy:.6f}, 0.000000]", "negate: 0", "occupied_thresh: 0.65", "free_thresh: 0.196", "mode: trinary", # required by Nav2 map_server ] yaml_path.write_text("\n".join(yaml_lines) + "\n", encoding="utf-8") cost = np.zeros_like(img, dtype=np.uint8) occ_img = np.flipud(occ) inf_img = np.flipud(inf) cost[inf_img] = 180 cost[occ_img] = 255 cost_path = folder / cost_name np.save(cost_path, cost) return { "pgm": str(pgm_path), "yaml": str(yaml_path), "costmap": str(cost_path), "shape": [int(img.shape[0]), int(img.shape[1])], "occupied_cells": int(np.count_nonzero(occ)), "inflated_cells": int(np.count_nonzero(inf)), }