Marcus/Lidar/SLAM_Filter.py
2026-04-12 18:50:22 +04:00

393 lines
14 KiB
Python

# SLAM_Filter.py
from __future__ import annotations
import json
import os
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict
import numpy as np
def load_slam_config() -> dict:
import json, os
from pathlib import Path
cfg_path = os.environ.get("SLAM_CONFIG", "").strip()
p = Path(cfg_path) if cfg_path else (Path(__file__).resolve().parent / "SLAM_Config.json")
if not p.exists():
raise FileNotFoundError(f"Missing config file: {p}")
text = p.read_text(encoding="utf-8").strip()
if not text:
raise RuntimeError("SLAM_Config.json is empty.")
return json.loads(text)
@dataclass
class FilterConfig:
voxel_size: float
hit_threshold: int
decay_seconds: float
max_voxels: int
@staticmethod
def from_config_file() -> "FilterConfig":
cfg = load_slam_config()
return FilterConfig(
voxel_size=float(cfg["filter"]["voxel_size"]),
hit_threshold=int(cfg["filter"]["hits_threshold"]),
decay_seconds=float(cfg["filter"]["persistence"]["decay_seconds"]),
max_voxels=int(cfg["filter"]["persistence"]["max_voxels"]),
)
class VoxelPersistenceFilter:
"""
Voxel hit-count persistence filter.
No hardcoded params: everything comes from SLAM_Config.jsonl.
"""
def __init__(self, cfg: FilterConfig | None = None):
self.cfg = cfg if cfg is not None else FilterConfig.from_config_file()
self._count: dict[tuple[int, int, int], int] = {}
self._last: dict[tuple[int, int, int], float] = {}
self._sum: dict[tuple[int, int, int], np.ndarray] = {}
self._n: dict[tuple[int, int, int], int] = {}
def reset(self):
self._count.clear()
self._last.clear()
self._sum.clear()
self._n.clear()
def _voxel_keys(self, pts: np.ndarray) -> np.ndarray:
vs = float(self.cfg.voxel_size)
return np.floor(pts / vs).astype(np.int32)
def update(self, points_world: np.ndarray, now: float | None = None) -> None:
if points_world is None or len(points_world) == 0:
return
now = time.time() if now is None else float(now)
keys = self._voxel_keys(points_world)
uniq, inv = np.unique(keys, axis=0, return_inverse=True)
for i, k in enumerate(uniq):
k_t = (int(k[0]), int(k[1]), int(k[2]))
idx = np.where(inv == i)[0]
mean = points_world[idx].mean(axis=0)
self._count[k_t] = self._count.get(k_t, 0) + 1
self._last[k_t] = now
if k_t not in self._sum:
self._sum[k_t] = mean.copy()
self._n[k_t] = 1
else:
self._sum[k_t] += mean
self._n[k_t] += 1
self._decay(now)
if len(self._count) > int(self.cfg.max_voxels):
self._aggressive_prune()
def _decay(self, now: float) -> None:
ttl = float(self.cfg.decay_seconds)
to_del = []
for k, t_last in self._last.items():
if (now - t_last) > ttl:
to_del.append(k)
for k in to_del:
self._count.pop(k, None)
self._last.pop(k, None)
self._sum.pop(k, None)
self._n.pop(k, None)
def _aggressive_prune(self) -> None:
items = sorted(self._count.items(), key=lambda kv: kv[1], reverse=True)
keep_n = int(int(self.cfg.max_voxels) * 0.8)
keep = set(k for k, _ in items[:keep_n])
for k in list(self._count.keys()):
if k not in keep:
self._count.pop(k, None)
self._last.pop(k, None)
self._sum.pop(k, None)
self._n.pop(k, None)
def get_stable_points(self) -> np.ndarray:
thr = int(self.cfg.hit_threshold)
pts = []
for k, c in self._count.items():
if c >= thr:
s = self._sum.get(k)
n = self._n.get(k, 1)
if s is not None:
pts.append(s / max(n, 1))
if not pts:
return np.zeros((0, 3), dtype=np.float32)
return np.asarray(pts, dtype=np.float32)
def seed_stable_points(
self,
points_world: np.ndarray,
now: float | None = None,
hit_count: int | None = None,
clear_existing: bool = True,
) -> None:
"""
Seed filter state from already-stable world-frame points.
Useful after global pose correction (loop-closure optimization).
"""
if points_world is None or len(points_world) == 0:
if clear_existing:
self.reset()
return
if clear_existing:
self.reset()
now_v = time.time() if now is None else float(now)
hc = int(hit_count if hit_count is not None else self.cfg.hit_threshold)
hc = max(1, hc)
pts = np.asarray(points_world, dtype=np.float32)
keys = self._voxel_keys(pts)
uniq, inv = np.unique(keys, axis=0, return_inverse=True)
for i, k in enumerate(uniq):
k_t = (int(k[0]), int(k[1]), int(k[2]))
idx = np.where(inv == i)[0]
mean = pts[idx].mean(axis=0)
self._count[k_t] = max(self._count.get(k_t, 0), hc)
self._last[k_t] = now_v
self._sum[k_t] = mean.copy()
self._n[k_t] = 1
self._decay(now_v)
if len(self._count) > int(self.cfg.max_voxels):
self._aggressive_prune()
def stats(self) -> dict:
thr = int(self.cfg.hit_threshold)
stable = sum(1 for c in self._count.values() if c >= thr)
return {
"voxels_total": len(self._count),
"voxels_stable": stable,
"hit_threshold": thr,
"voxel_size": float(self.cfg.voxel_size),
"decay_seconds": float(self.cfg.decay_seconds),
"max_voxels": int(self.cfg.max_voxels),
}
@dataclass
class IndoorMapQualityConfig:
enabled: bool = True
near_min_range_m: float = 0.15
ray_consistency_enabled: bool = True
ray_bin_deg: float = 1.0
ray_elev_bin_deg: float = 4.0
ray_max_behind_m: float = 0.25
ray_keep_n: int = 1
world_z_clip_enabled: bool = False
world_z_min_m: float = -2.0
world_z_max_m: float = 3.0
outlier_filter_enabled: bool = False
outlier_voxel_m: float = 0.12
outlier_min_points: int = 2
# Robot body exclusion box (sensor frame) — removes self-returns from robot body.
# Tuned for Unitree G1 Edu with chest-mounted Livox MID-360.
body_exclusion_enabled: bool = False
body_x_min_m: float = -0.20
body_x_max_m: float = 0.35
body_y_min_m: float = -0.25
body_y_max_m: float = 0.25
body_z_min_m: float = -1.30
body_z_max_m: float = 0.15
@staticmethod
def from_dict(d: Dict[str, Any] | None) -> "IndoorMapQualityConfig":
src = d or {}
return IndoorMapQualityConfig(
enabled=bool(src.get("enabled", True)),
near_min_range_m=max(0.0, float(src.get("near_min_range_m", 0.15))),
ray_consistency_enabled=bool(src.get("ray_consistency_enabled", True)),
ray_bin_deg=max(0.2, float(src.get("ray_bin_deg", 1.0))),
ray_elev_bin_deg=max(0.0, float(src.get("ray_elev_bin_deg", 4.0))),
ray_max_behind_m=max(0.0, float(src.get("ray_max_behind_m", 0.25))),
ray_keep_n=max(1, int(src.get("ray_keep_n", 1))),
world_z_clip_enabled=bool(src.get("world_z_clip_enabled", False)),
world_z_min_m=float(src.get("world_z_min_m", -2.0)),
world_z_max_m=float(src.get("world_z_max_m", 3.0)),
outlier_filter_enabled=bool(src.get("outlier_filter_enabled", False)),
outlier_voxel_m=max(0.02, float(src.get("outlier_voxel_m", 0.12))),
outlier_min_points=max(1, int(src.get("outlier_min_points", 2))),
body_exclusion_enabled=bool(src.get("body_exclusion_enabled", False)),
body_x_min_m=float(src.get("body_x_min_m", -0.20)),
body_x_max_m=float(src.get("body_x_max_m", 0.35)),
body_y_min_m=float(src.get("body_y_min_m", -0.25)),
body_y_max_m=float(src.get("body_y_max_m", 0.25)),
body_z_min_m=float(src.get("body_z_min_m", -1.30)),
body_z_max_m=float(src.get("body_z_max_m", 0.15)),
)
class IndoorMapQualityFilter:
"""
Lightweight indoor map-quality filters intended for real-time use.
"""
def __init__(self, cfg: IndoorMapQualityConfig):
self.cfg = cfg
@staticmethod
def _finite(points: np.ndarray) -> np.ndarray:
if points is None or len(points) == 0:
return np.zeros((0, 3), dtype=np.float32)
pts = np.asarray(points, dtype=np.float32)
m = np.isfinite(pts).all(axis=1)
return pts[m]
def _remove_sparse_voxels(self, points: np.ndarray) -> np.ndarray:
if points is None or len(points) == 0:
return np.zeros((0, 3), dtype=np.float32)
voxel = float(self.cfg.outlier_voxel_m)
keys = np.floor(points / voxel).astype(np.int32)
uniq, inv, counts = np.unique(keys, axis=0, return_inverse=True, return_counts=True)
del uniq # only counts/inv are needed
keep = counts[inv] >= int(self.cfg.outlier_min_points)
return points[keep]
def _ray_consistency_filter(self, points: np.ndarray) -> np.ndarray:
"""
Keep nearest returns per azimuth bin (+small behind margin) to suppress
echoes and duplicated points that appear behind wall lines.
Uses a vectorized fast path for keep_n=1 to keep CPU usage low.
"""
if points is None or len(points) == 0:
return np.zeros((0, 3), dtype=np.float32)
if not self.cfg.ray_consistency_enabled:
return points
pts = np.asarray(points, dtype=np.float32)
if len(pts) < 80:
return pts
xy = pts[:, :2]
r = np.linalg.norm(xy, axis=1)
valid = np.isfinite(r) & (r > 1e-4)
if not np.all(valid):
pts = pts[valid]
r = r[valid]
if len(pts) < 40:
return pts
bin_rad = np.deg2rad(float(self.cfg.ray_bin_deg))
az = np.arctan2(pts[:, 1], pts[:, 0])
bins_az = np.floor((az + np.pi) / max(1e-6, bin_rad)).astype(np.int32)
elev_bin_deg = float(self.cfg.ray_elev_bin_deg)
if elev_bin_deg > 0.0:
elev_rad = np.deg2rad(max(0.2, elev_bin_deg))
elev = np.arctan2(pts[:, 2], np.maximum(r, 1e-4))
bins_el = np.floor((elev + 0.5 * np.pi) / elev_rad).astype(np.int32)
n_az = int(np.max(bins_az)) + 1 if len(bins_az) > 0 else 1
bins = bins_az + (np.maximum(0, bins_el) * max(1, n_az))
else:
bins = bins_az
margin = float(self.cfg.ray_max_behind_m)
keep_n = int(self.cfg.ray_keep_n)
n_bins = int(np.max(bins)) + 1 if len(bins) > 0 else 0
if n_bins <= 0:
return pts
if keep_n <= 1:
# Fast vectorized mode: nearest hit per bin + margin.
min_r = np.full((n_bins,), np.inf, dtype=np.float32)
np.minimum.at(min_r, bins, r.astype(np.float32, copy=False))
keep = r <= (min_r[bins] + margin)
else:
# Fallback for multi-return mode (rare; higher CPU).
order = np.lexsort((r, bins))
bins_s = bins[order]
r_s = r[order]
keep_sorted = np.zeros((len(order),), dtype=bool)
i = 0
n = int(len(order))
while i < n:
j = i + 1
b = int(bins_s[i])
while j < n and int(bins_s[j]) == b:
j += 1
rr = r_s[i:j]
if len(rr) <= keep_n:
keep_sorted[i:j] = True
else:
base = float(rr[min(len(rr) - 1, keep_n - 1)])
cutoff = base + margin
local = rr <= cutoff
local[:keep_n] = True
keep_sorted[i:j] = local
i = j
keep = np.zeros((len(pts),), dtype=bool)
keep[order] = keep_sorted
out = pts[keep]
# Safety fallback: if filter is too aggressive, keep original frame.
if len(out) < max(40, int(0.15 * len(pts))):
return pts
return out
def _exclude_body_box(self, points: np.ndarray) -> np.ndarray:
"""Remove points inside the robot body bounding box (sensor frame).
Prevents G1 arm/torso self-returns from polluting the map.
"""
inside = (
(points[:, 0] >= self.cfg.body_x_min_m) & (points[:, 0] <= self.cfg.body_x_max_m) &
(points[:, 1] >= self.cfg.body_y_min_m) & (points[:, 1] <= self.cfg.body_y_max_m) &
(points[:, 2] >= self.cfg.body_z_min_m) & (points[:, 2] <= self.cfg.body_z_max_m)
)
return points[~inside]
def apply_sensor(self, points_sensor: np.ndarray) -> np.ndarray:
pts = self._finite(points_sensor)
if not self.cfg.enabled or len(pts) == 0:
return pts
r2 = np.einsum("ij,ij->i", pts, pts)
min_r2 = float(self.cfg.near_min_range_m) ** 2
pts = pts[r2 >= min_r2]
if len(pts) == 0:
return pts
if self.cfg.body_exclusion_enabled:
pts = self._exclude_body_box(pts)
if len(pts) == 0:
return pts
pts = self._ray_consistency_filter(pts)
if len(pts) == 0:
return pts
if self.cfg.outlier_filter_enabled:
pts = self._remove_sparse_voxels(pts)
return pts
def apply_world(self, points_world: np.ndarray) -> np.ndarray:
pts = self._finite(points_world)
if not self.cfg.enabled or len(pts) == 0:
return pts
if self.cfg.world_z_clip_enabled:
zmin = float(self.cfg.world_z_min_m)
zmax = float(self.cfg.world_z_max_m)
if zmax > zmin:
pts = pts[(pts[:, 2] >= zmin) & (pts[:, 2] <= zmax)]
return pts