# SLAM_Filter.py from __future__ import annotations import json import os import time from dataclasses import dataclass from pathlib import Path from typing import Any, Dict import numpy as np def load_slam_config() -> dict: import json, os from pathlib import Path cfg_path = os.environ.get("SLAM_CONFIG", "").strip() p = Path(cfg_path) if cfg_path else (Path(__file__).resolve().parent / "SLAM_Config.json") if not p.exists(): raise FileNotFoundError(f"Missing config file: {p}") text = p.read_text(encoding="utf-8").strip() if not text: raise RuntimeError("SLAM_Config.json is empty.") return json.loads(text) @dataclass class FilterConfig: voxel_size: float hit_threshold: int decay_seconds: float max_voxels: int @staticmethod def from_config_file() -> "FilterConfig": cfg = load_slam_config() return FilterConfig( voxel_size=float(cfg["filter"]["voxel_size"]), hit_threshold=int(cfg["filter"]["hits_threshold"]), decay_seconds=float(cfg["filter"]["persistence"]["decay_seconds"]), max_voxels=int(cfg["filter"]["persistence"]["max_voxels"]), ) class VoxelPersistenceFilter: """ Voxel hit-count persistence filter. No hardcoded params: everything comes from SLAM_Config.jsonl. """ def __init__(self, cfg: FilterConfig | None = None): self.cfg = cfg if cfg is not None else FilterConfig.from_config_file() self._count: dict[tuple[int, int, int], int] = {} self._last: dict[tuple[int, int, int], float] = {} self._sum: dict[tuple[int, int, int], np.ndarray] = {} self._n: dict[tuple[int, int, int], int] = {} def reset(self): self._count.clear() self._last.clear() self._sum.clear() self._n.clear() def _voxel_keys(self, pts: np.ndarray) -> np.ndarray: vs = float(self.cfg.voxel_size) return np.floor(pts / vs).astype(np.int32) def update(self, points_world: np.ndarray, now: float | None = None) -> None: if points_world is None or len(points_world) == 0: return now = time.time() if now is None else float(now) keys = self._voxel_keys(points_world) uniq, inv = np.unique(keys, axis=0, return_inverse=True) for i, k in enumerate(uniq): k_t = (int(k[0]), int(k[1]), int(k[2])) idx = np.where(inv == i)[0] mean = points_world[idx].mean(axis=0) self._count[k_t] = self._count.get(k_t, 0) + 1 self._last[k_t] = now if k_t not in self._sum: self._sum[k_t] = mean.copy() self._n[k_t] = 1 else: self._sum[k_t] += mean self._n[k_t] += 1 self._decay(now) if len(self._count) > int(self.cfg.max_voxels): self._aggressive_prune() def _decay(self, now: float) -> None: ttl = float(self.cfg.decay_seconds) to_del = [] for k, t_last in self._last.items(): if (now - t_last) > ttl: to_del.append(k) for k in to_del: self._count.pop(k, None) self._last.pop(k, None) self._sum.pop(k, None) self._n.pop(k, None) def _aggressive_prune(self) -> None: items = sorted(self._count.items(), key=lambda kv: kv[1], reverse=True) keep_n = int(int(self.cfg.max_voxels) * 0.8) keep = set(k for k, _ in items[:keep_n]) for k in list(self._count.keys()): if k not in keep: self._count.pop(k, None) self._last.pop(k, None) self._sum.pop(k, None) self._n.pop(k, None) def get_stable_points(self) -> np.ndarray: thr = int(self.cfg.hit_threshold) pts = [] for k, c in self._count.items(): if c >= thr: s = self._sum.get(k) n = self._n.get(k, 1) if s is not None: pts.append(s / max(n, 1)) if not pts: return np.zeros((0, 3), dtype=np.float32) return np.asarray(pts, dtype=np.float32) def seed_stable_points( self, points_world: np.ndarray, now: float | None = None, hit_count: int | None = None, clear_existing: bool = True, ) -> None: """ Seed filter state from already-stable world-frame points. Useful after global pose correction (loop-closure optimization). """ if points_world is None or len(points_world) == 0: if clear_existing: self.reset() return if clear_existing: self.reset() now_v = time.time() if now is None else float(now) hc = int(hit_count if hit_count is not None else self.cfg.hit_threshold) hc = max(1, hc) pts = np.asarray(points_world, dtype=np.float32) keys = self._voxel_keys(pts) uniq, inv = np.unique(keys, axis=0, return_inverse=True) for i, k in enumerate(uniq): k_t = (int(k[0]), int(k[1]), int(k[2])) idx = np.where(inv == i)[0] mean = pts[idx].mean(axis=0) self._count[k_t] = max(self._count.get(k_t, 0), hc) self._last[k_t] = now_v self._sum[k_t] = mean.copy() self._n[k_t] = 1 self._decay(now_v) if len(self._count) > int(self.cfg.max_voxels): self._aggressive_prune() def stats(self) -> dict: thr = int(self.cfg.hit_threshold) stable = sum(1 for c in self._count.values() if c >= thr) return { "voxels_total": len(self._count), "voxels_stable": stable, "hit_threshold": thr, "voxel_size": float(self.cfg.voxel_size), "decay_seconds": float(self.cfg.decay_seconds), "max_voxels": int(self.cfg.max_voxels), } @dataclass class IndoorMapQualityConfig: enabled: bool = True near_min_range_m: float = 0.15 ray_consistency_enabled: bool = True ray_bin_deg: float = 1.0 ray_elev_bin_deg: float = 4.0 ray_max_behind_m: float = 0.25 ray_keep_n: int = 1 world_z_clip_enabled: bool = False world_z_min_m: float = -2.0 world_z_max_m: float = 3.0 outlier_filter_enabled: bool = False outlier_voxel_m: float = 0.12 outlier_min_points: int = 2 # Robot body exclusion box (sensor frame) — removes self-returns from robot body. # Tuned for Unitree G1 Edu with chest-mounted Livox MID-360. body_exclusion_enabled: bool = False body_x_min_m: float = -0.20 body_x_max_m: float = 0.35 body_y_min_m: float = -0.25 body_y_max_m: float = 0.25 body_z_min_m: float = -1.30 body_z_max_m: float = 0.15 @staticmethod def from_dict(d: Dict[str, Any] | None) -> "IndoorMapQualityConfig": src = d or {} return IndoorMapQualityConfig( enabled=bool(src.get("enabled", True)), near_min_range_m=max(0.0, float(src.get("near_min_range_m", 0.15))), ray_consistency_enabled=bool(src.get("ray_consistency_enabled", True)), ray_bin_deg=max(0.2, float(src.get("ray_bin_deg", 1.0))), ray_elev_bin_deg=max(0.0, float(src.get("ray_elev_bin_deg", 4.0))), ray_max_behind_m=max(0.0, float(src.get("ray_max_behind_m", 0.25))), ray_keep_n=max(1, int(src.get("ray_keep_n", 1))), world_z_clip_enabled=bool(src.get("world_z_clip_enabled", False)), world_z_min_m=float(src.get("world_z_min_m", -2.0)), world_z_max_m=float(src.get("world_z_max_m", 3.0)), outlier_filter_enabled=bool(src.get("outlier_filter_enabled", False)), outlier_voxel_m=max(0.02, float(src.get("outlier_voxel_m", 0.12))), outlier_min_points=max(1, int(src.get("outlier_min_points", 2))), body_exclusion_enabled=bool(src.get("body_exclusion_enabled", False)), body_x_min_m=float(src.get("body_x_min_m", -0.20)), body_x_max_m=float(src.get("body_x_max_m", 0.35)), body_y_min_m=float(src.get("body_y_min_m", -0.25)), body_y_max_m=float(src.get("body_y_max_m", 0.25)), body_z_min_m=float(src.get("body_z_min_m", -1.30)), body_z_max_m=float(src.get("body_z_max_m", 0.15)), ) class IndoorMapQualityFilter: """ Lightweight indoor map-quality filters intended for real-time use. """ def __init__(self, cfg: IndoorMapQualityConfig): self.cfg = cfg @staticmethod def _finite(points: np.ndarray) -> np.ndarray: if points is None or len(points) == 0: return np.zeros((0, 3), dtype=np.float32) pts = np.asarray(points, dtype=np.float32) m = np.isfinite(pts).all(axis=1) return pts[m] def _remove_sparse_voxels(self, points: np.ndarray) -> np.ndarray: if points is None or len(points) == 0: return np.zeros((0, 3), dtype=np.float32) voxel = float(self.cfg.outlier_voxel_m) keys = np.floor(points / voxel).astype(np.int32) uniq, inv, counts = np.unique(keys, axis=0, return_inverse=True, return_counts=True) del uniq # only counts/inv are needed keep = counts[inv] >= int(self.cfg.outlier_min_points) return points[keep] def _ray_consistency_filter(self, points: np.ndarray) -> np.ndarray: """ Keep nearest returns per azimuth bin (+small behind margin) to suppress echoes and duplicated points that appear behind wall lines. Uses a vectorized fast path for keep_n=1 to keep CPU usage low. """ if points is None or len(points) == 0: return np.zeros((0, 3), dtype=np.float32) if not self.cfg.ray_consistency_enabled: return points pts = np.asarray(points, dtype=np.float32) if len(pts) < 80: return pts xy = pts[:, :2] r = np.linalg.norm(xy, axis=1) valid = np.isfinite(r) & (r > 1e-4) if not np.all(valid): pts = pts[valid] r = r[valid] if len(pts) < 40: return pts bin_rad = np.deg2rad(float(self.cfg.ray_bin_deg)) az = np.arctan2(pts[:, 1], pts[:, 0]) bins_az = np.floor((az + np.pi) / max(1e-6, bin_rad)).astype(np.int32) elev_bin_deg = float(self.cfg.ray_elev_bin_deg) if elev_bin_deg > 0.0: elev_rad = np.deg2rad(max(0.2, elev_bin_deg)) elev = np.arctan2(pts[:, 2], np.maximum(r, 1e-4)) bins_el = np.floor((elev + 0.5 * np.pi) / elev_rad).astype(np.int32) n_az = int(np.max(bins_az)) + 1 if len(bins_az) > 0 else 1 bins = bins_az + (np.maximum(0, bins_el) * max(1, n_az)) else: bins = bins_az margin = float(self.cfg.ray_max_behind_m) keep_n = int(self.cfg.ray_keep_n) n_bins = int(np.max(bins)) + 1 if len(bins) > 0 else 0 if n_bins <= 0: return pts if keep_n <= 1: # Fast vectorized mode: nearest hit per bin + margin. min_r = np.full((n_bins,), np.inf, dtype=np.float32) np.minimum.at(min_r, bins, r.astype(np.float32, copy=False)) keep = r <= (min_r[bins] + margin) else: # Fallback for multi-return mode (rare; higher CPU). order = np.lexsort((r, bins)) bins_s = bins[order] r_s = r[order] keep_sorted = np.zeros((len(order),), dtype=bool) i = 0 n = int(len(order)) while i < n: j = i + 1 b = int(bins_s[i]) while j < n and int(bins_s[j]) == b: j += 1 rr = r_s[i:j] if len(rr) <= keep_n: keep_sorted[i:j] = True else: base = float(rr[min(len(rr) - 1, keep_n - 1)]) cutoff = base + margin local = rr <= cutoff local[:keep_n] = True keep_sorted[i:j] = local i = j keep = np.zeros((len(pts),), dtype=bool) keep[order] = keep_sorted out = pts[keep] # Safety fallback: if filter is too aggressive, keep original frame. if len(out) < max(40, int(0.15 * len(pts))): return pts return out def _exclude_body_box(self, points: np.ndarray) -> np.ndarray: """Remove points inside the robot body bounding box (sensor frame). Prevents G1 arm/torso self-returns from polluting the map. """ inside = ( (points[:, 0] >= self.cfg.body_x_min_m) & (points[:, 0] <= self.cfg.body_x_max_m) & (points[:, 1] >= self.cfg.body_y_min_m) & (points[:, 1] <= self.cfg.body_y_max_m) & (points[:, 2] >= self.cfg.body_z_min_m) & (points[:, 2] <= self.cfg.body_z_max_m) ) return points[~inside] def apply_sensor(self, points_sensor: np.ndarray) -> np.ndarray: pts = self._finite(points_sensor) if not self.cfg.enabled or len(pts) == 0: return pts r2 = np.einsum("ij,ij->i", pts, pts) min_r2 = float(self.cfg.near_min_range_m) ** 2 pts = pts[r2 >= min_r2] if len(pts) == 0: return pts if self.cfg.body_exclusion_enabled: pts = self._exclude_body_box(pts) if len(pts) == 0: return pts pts = self._ray_consistency_filter(pts) if len(pts) == 0: return pts if self.cfg.outlier_filter_enabled: pts = self._remove_sparse_voxels(pts) return pts def apply_world(self, points_world: np.ndarray) -> np.ndarray: pts = self._finite(points_world) if not self.cfg.enabled or len(pts) == 0: return pts if self.cfg.world_z_clip_enabled: zmin = float(self.cfg.world_z_min_m) zmax = float(self.cfg.world_z_max_m) if zmax > zmin: pts = pts[(pts[:, 2] >= zmin) & (pts[:, 2] <= zmax)] return pts