208 lines
7.5 KiB
Python
208 lines
7.5 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Any, Dict, Tuple
|
|
|
|
import numpy as np
|
|
|
|
|
|
def _safe_float(v: Any, default: float) -> float:
|
|
try:
|
|
return float(v)
|
|
except Exception:
|
|
return float(default)
|
|
|
|
|
|
def _safe_int(v: Any, default: int) -> int:
|
|
try:
|
|
return int(v)
|
|
except Exception:
|
|
return int(default)
|
|
|
|
|
|
def _as_bool(v: Any, default: bool) -> bool:
|
|
if v is None:
|
|
return default
|
|
if isinstance(v, bool):
|
|
return v
|
|
if isinstance(v, (int, float)):
|
|
return bool(v)
|
|
return str(v).strip().lower() in ("1", "true", "yes", "on")
|
|
|
|
|
|
@dataclass
class NavigationExportConfig:
    """Settings that control navigation-map export.

    Values passed directly to the constructor are stored as given; the
    clamping described below is applied only by ``from_dict`` and
    ``patched``.
    """

    enabled: bool = True
    # Lower bound on the number of height-filtered points; clamped to >= 50.
    min_points: int = 400
    # Grid cell size in metres; clamped to >= 0.01.
    resolution_m: float = 0.05
    z_min_m: float = 0.05  # 5 cm above floor — avoids ground returns, captures low obstacles
    z_max_m: float = 1.40  # 1.4 m — covers G1 Edu body (1.25 m) with margin
    # Obstacle inflation radius in metres; clamped to >= 0.
    inflation_radius_m: float = 0.20
    # Margin added around the points' XY bounding box in metres; clamped to >= 0.
    padding_m: float = 0.40

    @staticmethod
    def from_dict(d: Dict[str, Any] | None) -> "NavigationExportConfig":
        """Build a config from a plain mapping, defaulting any missing key.

        Missing or unparsable entries fall back to the dataclass field
        defaults, so ``from_dict({})`` equals ``NavigationExportConfig()``.
        Previously this method carried its own z-band fallbacks
        (-0.40 / 1.20) that disagreed with the field defaults (0.05 / 1.40).

        Args:
            d: Mapping of field name to raw value, or ``None`` for defaults.

        Returns:
            A new, clamped ``NavigationExportConfig``.
        """
        # Overlaying on a default instance keeps one source of truth for defaults.
        return NavigationExportConfig().patched(d)

    def patched(self, patch: Dict[str, Any] | None) -> "NavigationExportConfig":
        """Return a copy of this config with ``patch`` overlaid on it.

        Keys absent from ``patch``, or whose value cannot be coerced, keep
        this instance's current value. ``self`` is not modified.

        Args:
            patch: Mapping of field name to raw value, or ``None`` for no
                overrides.

        Returns:
            A new ``NavigationExportConfig`` with the numeric clamps applied.
        """
        src = patch or {}
        cur = self

        enabled = _as_bool(src.get("enabled", cur.enabled), cur.enabled)
        min_points = _safe_int(src.get("min_points", cur.min_points), cur.min_points)
        resolution_m = _safe_float(src.get("resolution_m", cur.resolution_m), cur.resolution_m)
        z_min_m = _safe_float(src.get("z_min_m", cur.z_min_m), cur.z_min_m)
        z_max_m = _safe_float(src.get("z_max_m", cur.z_max_m), cur.z_max_m)
        inflation_radius_m = _safe_float(
            src.get("inflation_radius_m", cur.inflation_radius_m),
            cur.inflation_radius_m,
        )
        padding_m = _safe_float(src.get("padding_m", cur.padding_m), cur.padding_m)

        return NavigationExportConfig(
            enabled=enabled,
            min_points=max(50, min_points),
            resolution_m=max(0.01, resolution_m),
            z_min_m=z_min_m,
            z_max_m=z_max_m,
            inflation_radius_m=max(0.0, inflation_radius_m),
            padding_m=max(0.0, padding_m),
        )
|
|
|
|
|
|
def _inflate_binary(mask: np.ndarray, radius_cells: int) -> np.ndarray:
|
|
if radius_cells <= 0:
|
|
return mask.copy()
|
|
h, w = mask.shape
|
|
out = mask.copy()
|
|
offsets = []
|
|
rr = radius_cells * radius_cells
|
|
for dy in range(-radius_cells, radius_cells + 1):
|
|
for dx in range(-radius_cells, radius_cells + 1):
|
|
if (dy * dy + dx * dx) <= rr:
|
|
offsets.append((dy, dx))
|
|
for dy, dx in offsets:
|
|
ys = max(0, -dy)
|
|
ye = min(h, h - dy)
|
|
xs = max(0, -dx)
|
|
xe = min(w, w - dx)
|
|
yd = max(0, dy)
|
|
xd = max(0, dx)
|
|
out[yd : yd + (ye - ys), xd : xd + (xe - xs)] |= mask[ys:ye, xs:xe]
|
|
return out
|
|
|
|
|
|
def _sanitize_basename(name: str) -> str:
|
|
base = (name or "").strip() or "map"
|
|
for bad in ("/", "\\", ":", "*", "?", "\"", "<", ">", "|"):
|
|
base = base.replace(bad, "_")
|
|
return base
|
|
|
|
|
|
class NavigationExporter:
    """Rasterise a 3D point cloud into 2D navigation map files.

    ``export`` writes three files into ``data_folder``: a binary PGM
    occupancy image, a YAML metadata file that references it, and a ``.npy``
    cost array.
    """

    def __init__(self, cfg: "NavigationExportConfig", data_folder: str):
        """Store the config and create ``data_folder`` if it does not exist."""
        self.cfg = cfg
        self.data_folder = Path(data_folder)
        self.data_folder.mkdir(parents=True, exist_ok=True)

    def update_config(self, cfg: "NavigationExportConfig") -> None:
        """Replace the active configuration."""
        self.cfg = cfg

    def _filter_nav_points(self, points: np.ndarray) -> np.ndarray:
        """Return the rows of ``points`` usable for the navigation grid.

        A row is kept when its z value lies in ``[cfg.z_min_m, cfg.z_max_m]``
        and its x, y and z are all finite. Non-finite coordinates are dropped
        here because they would otherwise poison the bounding box computed in
        ``_build_grid`` and make the grid size impossible to compute.

        Args:
            points: Array-like convertible to a 2D float32 array with at
                least three columns (x, y, z in the first three).

        Returns:
            The kept rows as a float32 array, all original columns included.

        Raises:
            RuntimeError: If the input is not 2D or has fewer than 3 columns.
        """
        pts = np.asarray(points, dtype=np.float32)
        if pts.ndim != 2 or pts.shape[1] < 3:
            raise RuntimeError("Invalid point array for navigation export.")
        zmin, zmax = float(self.cfg.z_min_m), float(self.cfg.z_max_m)
        z = pts[:, 2]
        keep = (z >= zmin) & (z <= zmax)
        keep &= np.isfinite(pts[:, :3]).all(axis=1)
        return pts[keep]

    def count_nav_points(self, points: np.ndarray) -> int:
        """Return how many points survive ``_filter_nav_points``.

        Best-effort: any failure while filtering (e.g. a malformed array)
        is reported as ``0`` rather than raised.
        """
        try:
            return int(len(self._filter_nav_points(points)))
        except Exception:
            return 0

    def _build_grid(self, points: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Project the filtered points onto an XY occupancy grid.

        Args:
            points: Point array accepted by ``_filter_nav_points``.

        Returns:
            ``(occ, inf, min_xy)`` where ``occ`` is the boolean ``(h, w)``
            grid of cells containing at least one point, ``inf`` is ``occ``
            inflated by ``cfg.inflation_radius_m``, and ``min_xy`` is the
            world XY coordinate of grid cell ``(0, 0)``.

        Raises:
            RuntimeError: If the input is invalid, or if no points or fewer
                than ``cfg.min_points`` remain after filtering.
        """
        pts = self._filter_nav_points(points)
        # The explicit empty check covers configs built with min_points <= 0,
        # where min()/max() below would otherwise fail on an empty array.
        if len(pts) == 0 or len(pts) < int(self.cfg.min_points):
            raise RuntimeError(f"Not enough points for nav export ({len(pts)}).")

        res = float(self.cfg.resolution_m)
        pad = float(self.cfg.padding_m)

        # Bounding box of the points, grown by the padding on every side.
        min_xy = pts[:, :2].min(axis=0) - pad
        max_xy = pts[:, :2].max(axis=0) + pad
        span = np.maximum(max_xy - min_xy, res)  # never narrower than one cell
        w = int(np.ceil(span[0] / res)) + 1
        h = int(np.ceil(span[1] / res)) + 1

        occ = np.zeros((h, w), dtype=bool)
        # Clip guards against rounding pushing an index past the last cell.
        gx = np.clip(((pts[:, 0] - min_xy[0]) / res).astype(np.int32), 0, w - 1)
        gy = np.clip(((pts[:, 1] - min_xy[1]) / res).astype(np.int32), 0, h - 1)
        occ[gy, gx] = True

        rad_cells = int(np.ceil(float(self.cfg.inflation_radius_m) / res))
        inf = _inflate_binary(occ, rad_cells)

        return occ, inf, min_xy

    def export(self, base_name: str, points: np.ndarray) -> Dict[str, Any]:
        """Write the navigation map files for ``points``.

        File names are derived from the sanitised ``base_name``; if any of
        the three target files already exists, a ``(n)`` suffix is added so
        that nothing is overwritten.

        Args:
            base_name: Desired file-name stem.
            points: Point array accepted by ``_filter_nav_points``.

        Returns:
            A dict with the written paths (``"pgm"``, ``"yaml"``,
            ``"costmap"``), the image ``"shape"`` as ``[rows, cols]``, and
            the ``"occupied_cells"`` / ``"inflated_cells"`` counts.

        Raises:
            RuntimeError: If export is disabled in the config, or the grid
                cannot be built (see ``_build_grid``).
        """
        if not self.cfg.enabled:
            raise RuntimeError("Navigation export is disabled in config.")

        occ, inf, min_xy = self._build_grid(points)
        base = _sanitize_basename(base_name)
        folder = self.data_folder

        pgm_name = f"{base}_nav.pgm"
        yaml_name = f"{base}_nav.yaml"
        cost_name = f"{base}_costmap.npy"
        n = 1
        # Bump the suffix until none of the three names collides.
        while (folder / pgm_name).exists() or (folder / yaml_name).exists() or (folder / cost_name).exists():
            pgm_name = f"{base}_nav({n}).pgm"
            yaml_name = f"{base}_nav({n}).yaml"
            cost_name = f"{base}_costmap({n}).npy"
            n += 1

        # ROS map image: 0=occupied, 254=free
        img = np.where(inf, 0, 254).astype(np.uint8)
        img = np.flipud(img)  # image origin top-left, map origin bottom-left

        pgm_path = folder / pgm_name
        with open(pgm_path, "wb") as f:
            h, w = img.shape
            f.write(f"P5\n{w} {h}\n255\n".encode("ascii"))
            f.write(img.tobytes())

        yaml_path = folder / yaml_name
        # Write proper YAML — NOT json.dumps (quoted keys break ROS2 Nav2 map_server)
        ox, oy = float(min_xy[0]), float(min_xy[1])
        yaml_lines = [
            f"image: {pgm_name}",
            f"resolution: {float(self.cfg.resolution_m):.6f}",
            f"origin: [{ox:.6f}, {oy:.6f}, 0.000000]",
            "negate: 0",
            "occupied_thresh: 0.65",
            "free_thresh: 0.196",
            "mode: trinary",  # required by Nav2 map_server
        ]
        yaml_path.write_text("\n".join(yaml_lines) + "\n", encoding="utf-8")

        # Cost array in image orientation: 0 free, 180 inflated, 255 occupied.
        cost = np.zeros_like(img, dtype=np.uint8)
        occ_img = np.flipud(occ)
        inf_img = np.flipud(inf)
        cost[inf_img] = 180
        cost[occ_img] = 255  # written last so true obstacles override inflation
        cost_path = folder / cost_name
        np.save(cost_path, cost)

        return {
            "pgm": str(pgm_path),
            "yaml": str(yaml_path),
            "costmap": str(cost_path),
            "shape": [int(img.shape[0]), int(img.shape[1])],
            "occupied_cells": int(np.count_nonzero(occ)),
            "inflated_cells": int(np.count_nonzero(inf)),
        }
|