from __future__ import annotations from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, List, Optional, Tuple import numpy as np def _as_bool(v: Any, default: bool = False) -> bool: if v is None: return default if isinstance(v, bool): return v if isinstance(v, (int, float)): return bool(v) return str(v).strip().lower() in ("1", "true", "yes", "on") def _safe_float(v: Any, default: float) -> float: try: return float(v) except Exception: return float(default) def _safe_int(v: Any, default: int) -> int: try: return int(v) except Exception: return int(default) def _safe_profiles(v: Any, default: Tuple[str, ...]) -> Tuple[str, ...]: if isinstance(v, (list, tuple)): out = [] for item in v: s = str(item).upper().strip() if s: out.append(s) if out: return tuple(out) return tuple(default) def _voxel_downsample(points: np.ndarray, voxel_m: float) -> np.ndarray: pts = np.asarray(points, dtype=np.float32) if pts.ndim != 2 or pts.shape[1] != 3 or len(pts) == 0: return np.zeros((0, 3), dtype=np.float32) if float(voxel_m) <= 0.0: return pts keys = np.floor(pts / float(voxel_m)).astype(np.int32) _, idx = np.unique(keys, axis=0, return_index=True) return pts[np.sort(idx)] @dataclass class PlaceRecognitionConfig: enabled: bool = True backend: str = "geometric" models_dir: str = "model" apply_profiles: Tuple[str, ...] = ("LOCALIZE_MAP", "LIVE_NAV_MAP") anchor_voxel_m: float = 1.6 submap_radius_m: float = 4.8 min_points_per_anchor: int = 180 max_anchors: int = 140 candidate_top_k: int = 10 descriptor_voxel_m: float = 0.18 radial_bins: int = 8 angular_bins: int = 16 height_bins: int = 6 guess_bias_radius_m: float = 9.0 guess_score_weight: float = 0.18 @staticmethod def from_dict(d: Dict[str, Any] | None) -> "PlaceRecognitionConfig": src = d or {} return PlaceRecognitionConfig( enabled=_as_bool(src.get("enabled", True), True), backend=str(src.get("backend", "geometric")).strip().lower() or "geometric", models_dir=str(src.get("models_dir", "model")).strip() or "model", apply_profiles=_safe_profiles( src.get("apply_profiles", ("LOCALIZE_MAP", "LIVE_NAV_MAP")), ("LOCALIZE_MAP", "LIVE_NAV_MAP"), ), anchor_voxel_m=max(0.4, _safe_float(src.get("anchor_voxel_m", 1.6), 1.6)), submap_radius_m=max(1.0, _safe_float(src.get("submap_radius_m", 4.8), 4.8)), min_points_per_anchor=max(40, _safe_int(src.get("min_points_per_anchor", 180), 180)), max_anchors=max(8, _safe_int(src.get("max_anchors", 140), 140)), candidate_top_k=max(1, _safe_int(src.get("candidate_top_k", 10), 10)), descriptor_voxel_m=max(0.04, _safe_float(src.get("descriptor_voxel_m", 0.18), 0.18)), radial_bins=max(4, _safe_int(src.get("radial_bins", 8), 8)), angular_bins=max(8, _safe_int(src.get("angular_bins", 16), 16)), height_bins=max(3, _safe_int(src.get("height_bins", 6), 6)), guess_bias_radius_m=max(0.5, _safe_float(src.get("guess_bias_radius_m", 9.0), 9.0)), guess_score_weight=float(np.clip(_safe_float(src.get("guess_score_weight", 0.18), 0.18), 0.0, 1.0)), ) class PlaceRecognitionIndex: def __init__(self, cfg: PlaceRecognitionConfig, base_dir: str | Path): self.cfg = cfg self.base_dir = Path(base_dir).resolve() self.models_dir = (self.base_dir / self.cfg.models_dir).resolve() self.reset() def reset(self) -> None: self.ref_path: Optional[str] = None self.anchors = np.zeros((0, 3), dtype=np.float32) self.descriptors = np.zeros((0, 1), dtype=np.float32) self.anchor_sizes = np.zeros((0,), dtype=np.int32) self.index_ready = False self.model_files: List[str] = [] self.backend_active = "disabled" self.last_query: Dict[str, Any] = {} def profile_allowed(self, profile: str) -> bool: p = str(profile).upper().strip() return p in {str(x).upper().strip() for x in tuple(self.cfg.apply_profiles)} def _refresh_model_files(self) -> None: try: self.models_dir.mkdir(parents=True, exist_ok=True) self.model_files = sorted( str(p.name) for p in self.models_dir.iterdir() if p.is_file() and p.suffix.lower() in (".onnx", ".pt", ".pth", ".engine") ) except Exception: self.model_files = [] def _descriptor(self, points: np.ndarray) -> np.ndarray: pts = np.asarray(points, dtype=np.float32) if pts.ndim != 2 or pts.shape[1] != 3 or len(pts) == 0: n = int(self.cfg.radial_bins + self.cfg.angular_bins + self.cfg.height_bins + 7) return np.zeros((n,), dtype=np.float32) center = np.mean(pts, axis=0, dtype=np.float64) rel = np.asarray(pts, dtype=np.float64) - center.reshape((1, 3)) xy = rel[:, :2] r = np.linalg.norm(xy, axis=1) theta = np.arctan2(xy[:, 1], xy[:, 0]) z = rel[:, 2] max_r = max(0.5, float(self.cfg.submap_radius_m)) r_clip = np.clip(r / max_r, 0.0, 0.999999) theta_norm = (theta + np.pi) / (2.0 * np.pi) z_clip = np.clip((z + 2.5) / 5.0, 0.0, 0.999999) h_r, _ = np.histogram(r_clip, bins=int(self.cfg.radial_bins), range=(0.0, 1.0)) h_t, _ = np.histogram(theta_norm, bins=int(self.cfg.angular_bins), range=(0.0, 1.0)) h_z, _ = np.histogram(z_clip, bins=int(self.cfg.height_bins), range=(0.0, 1.0)) cov = np.cov(rel.T) if len(rel) >= 3 else np.eye(3, dtype=np.float64) eig = np.sort(np.linalg.eigvalsh(cov).astype(np.float64)) eig = eig / max(1e-6, float(np.sum(np.abs(eig)))) ext_xy = np.percentile(np.abs(xy), 90.0, axis=0) if len(xy) > 0 else np.zeros((2,), dtype=np.float64) ext_z = np.percentile(np.abs(z), 90.0) if len(z) > 0 else 0.0 density = float(len(pts)) / max(1.0, float(np.pi * max_r * max_r)) desc = np.concatenate( [ np.asarray(h_r, dtype=np.float32), np.asarray(h_t, dtype=np.float32), np.asarray(h_z, dtype=np.float32), np.asarray( [ float(ext_xy[0]) / max_r, float(ext_xy[1]) / max_r, float(ext_z) / 2.5, float(density) / 200.0, ], dtype=np.float32, ), np.asarray(eig, dtype=np.float32), ], axis=0, ).astype(np.float32) norm = float(np.linalg.norm(desc)) if norm > 1e-6: desc /= norm return desc def build(self, ref_points: np.ndarray, ref_path: Optional[str] = None) -> Dict[str, Any]: self._refresh_model_files() pts = _voxel_downsample(np.asarray(ref_points, dtype=np.float32), float(self.cfg.descriptor_voxel_m)) self.ref_path = None if ref_path is None else str(ref_path) self.index_ready = False self.anchors = np.zeros((0, 3), dtype=np.float32) self.descriptors = np.zeros((0, 1), dtype=np.float32) self.anchor_sizes = np.zeros((0,), dtype=np.int32) if not self.cfg.enabled or len(pts) < int(self.cfg.min_points_per_anchor): self.backend_active = "disabled" if not self.cfg.enabled else "geometric_empty" return self.snapshot() xy_keys = np.floor(pts[:, :2] / float(self.cfg.anchor_voxel_m)).astype(np.int32) uniq, inverse, counts = np.unique(xy_keys, axis=0, return_inverse=True, return_counts=True) order = np.argsort(counts)[::-1] if len(order) > int(self.cfg.max_anchors): order = order[: int(self.cfg.max_anchors)] anchors: List[np.ndarray] = [] descs: List[np.ndarray] = [] sizes: List[int] = [] r2 = float(self.cfg.submap_radius_m) * float(self.cfg.submap_radius_m) for idx in order: base_mask = inverse == int(idx) if not np.any(base_mask): continue center = np.mean(pts[base_mask], axis=0, dtype=np.float64) rel = pts - center.reshape((1, 3)) d2 = np.einsum("ij,ij->i", rel[:, :2], rel[:, :2]) keep = d2 <= r2 n_keep = int(np.count_nonzero(keep)) if n_keep < int(self.cfg.min_points_per_anchor): continue local_pts = pts[keep] anchors.append(np.asarray(center, dtype=np.float32)) descs.append(self._descriptor(local_pts)) sizes.append(n_keep) if descs: self.anchors = np.asarray(anchors, dtype=np.float32).reshape((-1, 3)) self.descriptors = np.asarray(descs, dtype=np.float32) self.anchor_sizes = np.asarray(sizes, dtype=np.int32) self.index_ready = True self.backend_active = "ai_pending" if (self.cfg.backend == "ai" and self.model_files) else "geometric" else: self.backend_active = "geometric_empty" return self.snapshot() def query( self, live_points: np.ndarray, approx_center: Optional[np.ndarray] = None, top_k: Optional[int] = None, ) -> Dict[str, Any]: live = _voxel_downsample(np.asarray(live_points, dtype=np.float32), float(self.cfg.descriptor_voxel_m)) top_n = int(self.cfg.candidate_top_k if top_k is None else max(1, int(top_k))) if not self.index_ready or len(live) < 40 or len(self.anchors) == 0: out = { "ok": False, "reason": "index_not_ready" if not self.index_ready else "live_too_few", "backend": str(self.backend_active), "candidates": [], } self.last_query = dict(out) return out q = self._descriptor(live) scores = np.asarray(self.descriptors @ q.reshape((-1, 1)), dtype=np.float32).reshape((-1,)) if approx_center is not None: try: guess = np.asarray(approx_center, dtype=np.float32).reshape((3,)) d = np.linalg.norm(self.anchors[:, :2] - guess[:2].reshape((1, 2)), axis=1) bias = 1.0 - np.clip(d / max(1e-3, float(self.cfg.guess_bias_radius_m)), 0.0, 1.0) scores = (1.0 - float(self.cfg.guess_score_weight)) * scores + float(self.cfg.guess_score_weight) * bias except Exception: pass order = np.argsort(scores)[::-1][:top_n] candidates: List[Dict[str, Any]] = [] for idx in order: center = np.asarray(self.anchors[int(idx)], dtype=np.float32) item = { "center": [float(center[0]), float(center[1]), float(center[2])], "score": float(scores[int(idx)]), "support_points": int(self.anchor_sizes[int(idx)]) if len(self.anchor_sizes) > int(idx) else 0, } if approx_center is not None: try: guess = np.asarray(approx_center, dtype=np.float32).reshape((3,)) item["guess_dist_m"] = float(np.linalg.norm(center[:2] - guess[:2])) except Exception: pass candidates.append(item) out = { "ok": bool(len(candidates) > 0), "reason": "ok" if candidates else "no_candidates", "backend": str(self.backend_active), "top_score": float(candidates[0]["score"]) if candidates else 0.0, "candidates": candidates, } self.last_query = dict(out) return out def snapshot(self) -> Dict[str, Any]: return { "enabled": bool(self.cfg.enabled), "backend": str(self.cfg.backend), "backend_active": str(self.backend_active), "models_dir": str(self.models_dir), "model_files": list(self.model_files), "index_ready": bool(self.index_ready), "anchors": int(len(self.anchors)), "ref_path": self.ref_path, "last_query": dict(self.last_query) if isinstance(self.last_query, dict) else {}, }