Marcus/Lidar/SLAM_PlaceRecognition.py
2026-04-12 18:50:22 +04:00

304 lines
12 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import numpy as np
def _as_bool(v: Any, default: bool = False) -> bool:
if v is None:
return default
if isinstance(v, bool):
return v
if isinstance(v, (int, float)):
return bool(v)
return str(v).strip().lower() in ("1", "true", "yes", "on")
def _safe_float(v: Any, default: float) -> float:
try:
return float(v)
except Exception:
return float(default)
def _safe_int(v: Any, default: int) -> int:
try:
return int(v)
except Exception:
return int(default)
def _safe_profiles(v: Any, default: Tuple[str, ...]) -> Tuple[str, ...]:
if isinstance(v, (list, tuple)):
out = []
for item in v:
s = str(item).upper().strip()
if s:
out.append(s)
if out:
return tuple(out)
return tuple(default)
def _voxel_downsample(points: np.ndarray, voxel_m: float) -> np.ndarray:
pts = np.asarray(points, dtype=np.float32)
if pts.ndim != 2 or pts.shape[1] != 3 or len(pts) == 0:
return np.zeros((0, 3), dtype=np.float32)
if float(voxel_m) <= 0.0:
return pts
keys = np.floor(pts / float(voxel_m)).astype(np.int32)
_, idx = np.unique(keys, axis=0, return_index=True)
return pts[np.sort(idx)]
@dataclass
class PlaceRecognitionConfig:
    """Settings consumed by ``PlaceRecognitionIndex``.

    Use :meth:`from_dict` for values that come from a loosely typed source:
    it coerces every field, substitutes the default for missing or
    unparsable entries and enforces the lower bounds listed below.
    """

    # Master switch; a disabled config makes the index build return early.
    enabled: bool = True
    # Requested backend name; lower-cased by from_dict.
    backend: str = "geometric"
    # Directory (relative to the index base dir) scanned for model files.
    models_dir: str = "model"
    # Profile names (upper-case) for which place recognition applies.
    apply_profiles: Tuple[str, ...] = ("LOCALIZE_MAP", "LIVE_NAV_MAP")
    # XY grid cell size used to seed anchors (from_dict minimum 0.4).
    anchor_voxel_m: float = 1.6
    # XY radius of the submap gathered around each anchor (minimum 1.0).
    submap_radius_m: float = 4.8
    # Anchors whose submap has fewer points are discarded (minimum 40).
    min_points_per_anchor: int = 180
    # Upper limit on the number of seeded anchors (minimum 8).
    max_anchors: int = 140
    # Default number of candidates returned by a query (minimum 1).
    candidate_top_k: int = 10
    # Voxel size applied to clouds before describing them (minimum 0.04).
    descriptor_voxel_m: float = 0.18
    # Histogram resolutions of the descriptor (minimums 4 / 8 / 3).
    radial_bins: int = 8
    angular_bins: int = 16
    height_bins: int = 6
    # Distance over which the pose-guess bias decays to zero (minimum 0.5).
    guess_bias_radius_m: float = 9.0
    # Blend factor between descriptor score and guess bias, clipped to [0, 1].
    guess_score_weight: float = 0.18

    @staticmethod
    def from_dict(d: Dict[str, Any] | None) -> "PlaceRecognitionConfig":
        """Build a validated config from a plain mapping.

        Args:
            d: Mapping of field name to raw value; ``None`` or an empty
                mapping yields the defaults.

        Returns:
            A config whose fields are coerced to their declared types and
            clamped to the supported ranges.
        """
        src = d or {}

        def text(key: str, fallback: str) -> str:
            # An explicit None must behave like a missing key. Stringifying
            # it would produce the literal "None", which for models_dir
            # becomes a real directory name.
            raw = src.get(key)
            if raw is None:
                return fallback
            return str(raw).strip() or fallback

        return PlaceRecognitionConfig(
            enabled=_as_bool(src.get("enabled", True), True),
            backend=text("backend", "geometric").lower(),
            models_dir=text("models_dir", "model"),
            apply_profiles=_safe_profiles(
                src.get("apply_profiles", ("LOCALIZE_MAP", "LIVE_NAV_MAP")),
                ("LOCALIZE_MAP", "LIVE_NAV_MAP"),
            ),
            anchor_voxel_m=max(0.4, _safe_float(src.get("anchor_voxel_m", 1.6), 1.6)),
            submap_radius_m=max(1.0, _safe_float(src.get("submap_radius_m", 4.8), 4.8)),
            min_points_per_anchor=max(40, _safe_int(src.get("min_points_per_anchor", 180), 180)),
            max_anchors=max(8, _safe_int(src.get("max_anchors", 140), 140)),
            candidate_top_k=max(1, _safe_int(src.get("candidate_top_k", 10), 10)),
            descriptor_voxel_m=max(0.04, _safe_float(src.get("descriptor_voxel_m", 0.18), 0.18)),
            radial_bins=max(4, _safe_int(src.get("radial_bins", 8), 8)),
            angular_bins=max(8, _safe_int(src.get("angular_bins", 16), 16)),
            height_bins=max(3, _safe_int(src.get("height_bins", 6), 6)),
            guess_bias_radius_m=max(0.5, _safe_float(src.get("guess_bias_radius_m", 9.0), 9.0)),
            guess_score_weight=float(
                np.clip(_safe_float(src.get("guess_score_weight", 0.18), 0.18), 0.0, 1.0)
            ),
        )
class PlaceRecognitionIndex:
    """Anchor-based geometric place-recognition index over a reference cloud.

    ``build`` seeds anchors on an XY grid of the reference cloud and stores
    one unit-length descriptor per anchor. ``query`` describes a live cloud
    the same way and ranks anchors by descriptor dot product, optionally
    blended with a bias towards an approximate position.
    """

    def __init__(self, cfg: PlaceRecognitionConfig, base_dir: str | Path):
        """Store ``cfg`` and resolve ``cfg.models_dir`` below ``base_dir``."""
        self.cfg = cfg
        self.base_dir = Path(base_dir).resolve()
        self.models_dir = (self.base_dir / self.cfg.models_dir).resolve()
        self.reset()

    def reset(self) -> None:
        """Discard the built index and all cached status."""
        self.ref_path: Optional[str] = None
        self.anchors = np.zeros((0, 3), dtype=np.float32)
        self.descriptors = np.zeros((0, 1), dtype=np.float32)
        self.anchor_sizes = np.zeros((0,), dtype=np.int32)
        self.index_ready = False
        self.model_files: List[str] = []
        self.backend_active = "disabled"
        self.last_query: Dict[str, Any] = {}

    def profile_allowed(self, profile: str) -> bool:
        """Return True when ``profile`` (case/whitespace-insensitive) is configured."""
        p = str(profile).upper().strip()
        return p in {str(x).upper().strip() for x in tuple(self.cfg.apply_profiles)}

    def _refresh_model_files(self) -> None:
        """Create the models directory if needed and list model files in it.

        Only ``.onnx``, ``.pt``, ``.pth`` and ``.engine`` files are listed.
        Any failure leaves ``model_files`` empty (best-effort).
        """
        try:
            self.models_dir.mkdir(parents=True, exist_ok=True)
            self.model_files = sorted(
                str(p.name)
                for p in self.models_dir.iterdir()
                if p.is_file() and p.suffix.lower() in (".onnx", ".pt", ".pth", ".engine")
            )
        except Exception:
            self.model_files = []

    @staticmethod
    def _parse_guess(approx_center: Any) -> Optional[np.ndarray]:
        """Return the XY part of a position guess as float32, or None if unusable.

        Accepts 2 or 3 values in any array shape. Other sizes, values that
        cannot be converted, and a non-finite x or y all yield ``None``.
        """
        if approx_center is None:
            return None
        try:
            flat = np.asarray(approx_center, dtype=np.float32).reshape(-1)
        except (TypeError, ValueError, OverflowError):
            return None
        # Only x and y are used, so z is optional. Larger inputs (for example
        # a flattened pose matrix) are rejected rather than misread.
        if flat.size not in (2, 3):
            return None
        if not np.all(np.isfinite(flat[:2])):
            return None
        return flat[:2]

    def _descriptor(self, points: np.ndarray) -> np.ndarray:
        """Compute the unit-length geometric descriptor of a point cloud.

        Layout, in order: radial histogram (``radial_bins``), angular
        histogram (``angular_bins``), height histogram (``height_bins``),
        four scalars (90th-percentile |x|, |y| and |z| extents plus point
        density) and the three normalised covariance eigenvalues. All
        geometry is measured relative to the cloud centroid.

        Returns:
            A float32 vector of length ``radial + angular + height + 7``;
            all zeros when ``points`` is not a non-empty ``(N, 3)`` array.
        """
        pts = np.asarray(points, dtype=np.float32)
        if pts.ndim != 2 or pts.shape[1] != 3 or len(pts) == 0:
            n = int(self.cfg.radial_bins + self.cfg.angular_bins + self.cfg.height_bins + 7)
            return np.zeros((n,), dtype=np.float32)

        center = np.mean(pts, axis=0, dtype=np.float64)
        rel = np.asarray(pts, dtype=np.float64) - center.reshape((1, 3))
        xy = rel[:, :2]
        r = np.linalg.norm(xy, axis=1)
        theta = np.arctan2(xy[:, 1], xy[:, 0])
        z = rel[:, 2]

        # Map every quantity into [0, 1) so fixed-range histograms can be used.
        max_r = max(0.5, float(self.cfg.submap_radius_m))
        r_clip = np.clip(r / max_r, 0.0, 0.999999)
        theta_norm = (theta + np.pi) / (2.0 * np.pi)
        z_clip = np.clip((z + 2.5) / 5.0, 0.0, 0.999999)
        h_r, _ = np.histogram(r_clip, bins=int(self.cfg.radial_bins), range=(0.0, 1.0))
        h_t, _ = np.histogram(theta_norm, bins=int(self.cfg.angular_bins), range=(0.0, 1.0))
        h_z, _ = np.histogram(z_clip, bins=int(self.cfg.height_bins), range=(0.0, 1.0))

        # Covariance needs a few samples; fall back to identity otherwise.
        cov = np.cov(rel.T) if len(rel) >= 3 else np.eye(3, dtype=np.float64)
        eig = np.sort(np.linalg.eigvalsh(cov).astype(np.float64))
        eig = eig / max(1e-6, float(np.sum(np.abs(eig))))

        ext_xy = np.percentile(np.abs(xy), 90.0, axis=0) if len(xy) > 0 else np.zeros((2,), dtype=np.float64)
        ext_z = np.percentile(np.abs(z), 90.0) if len(z) > 0 else 0.0
        density = float(len(pts)) / max(1.0, float(np.pi * max_r * max_r))

        desc = np.concatenate(
            [
                np.asarray(h_r, dtype=np.float32),
                np.asarray(h_t, dtype=np.float32),
                np.asarray(h_z, dtype=np.float32),
                np.asarray(
                    [
                        float(ext_xy[0]) / max_r,
                        float(ext_xy[1]) / max_r,
                        float(ext_z) / 2.5,
                        float(density) / 200.0,
                    ],
                    dtype=np.float32,
                ),
                np.asarray(eig, dtype=np.float32),
            ],
            axis=0,
        ).astype(np.float32)

        # Unit length makes the dot product in query() a cosine similarity.
        norm = float(np.linalg.norm(desc))
        if norm > 1e-6:
            desc /= norm
        return desc

    def build(self, ref_points: np.ndarray, ref_path: Optional[str] = None) -> Dict[str, Any]:
        """(Re)build the anchor index from a reference point cloud.

        Args:
            ref_points: Reference cloud, array-like of ``(N, 3)`` points.
            ref_path: Optional label of the cloud's origin, kept for
                :meth:`snapshot`.

        Returns:
            The :meth:`snapshot` taken after the build attempt.
        """
        self._refresh_model_files()
        pts = _voxel_downsample(
            np.asarray(ref_points, dtype=np.float32),
            float(self.cfg.descriptor_voxel_m),
        )
        self.ref_path = None if ref_path is None else str(ref_path)
        self.index_ready = False
        self.anchors = np.zeros((0, 3), dtype=np.float32)
        self.descriptors = np.zeros((0, 1), dtype=np.float32)
        self.anchor_sizes = np.zeros((0,), dtype=np.int32)

        min_pts = int(self.cfg.min_points_per_anchor)
        if not self.cfg.enabled or len(pts) < min_pts:
            self.backend_active = "disabled" if not self.cfg.enabled else "geometric_empty"
            return self.snapshot()

        # Bucket points on an XY grid; the most populated cells seed anchors.
        xy_keys = np.floor(pts[:, :2] / float(self.cfg.anchor_voxel_m)).astype(np.int32)
        _, inverse, counts = np.unique(xy_keys, axis=0, return_inverse=True, return_counts=True)
        # NumPy 2.0.0 returned a 2-D inverse when axis is given; flatten so
        # the per-cell masks below are 1-D on every NumPy version.
        inverse = np.asarray(inverse).reshape(-1)
        order = np.argsort(counts)[::-1]
        if len(order) > int(self.cfg.max_anchors):
            order = order[: int(self.cfg.max_anchors)]

        anchors: List[np.ndarray] = []
        descs: List[np.ndarray] = []
        sizes: List[int] = []
        r2 = float(self.cfg.submap_radius_m) * float(self.cfg.submap_radius_m)
        for idx in order:
            base_mask = inverse == int(idx)
            if not np.any(base_mask):
                continue
            # Anchor position is the centroid of the seeding cell; its submap
            # is every point within the XY radius of that centroid.
            center = np.mean(pts[base_mask], axis=0, dtype=np.float64)
            rel = pts - center.reshape((1, 3))
            d2 = np.einsum("ij,ij->i", rel[:, :2], rel[:, :2])
            keep = d2 <= r2
            n_keep = int(np.count_nonzero(keep))
            if n_keep < min_pts:
                continue
            anchors.append(np.asarray(center, dtype=np.float32))
            descs.append(self._descriptor(pts[keep]))
            sizes.append(n_keep)

        if descs:
            self.anchors = np.asarray(anchors, dtype=np.float32).reshape((-1, 3))
            self.descriptors = np.asarray(descs, dtype=np.float32)
            self.anchor_sizes = np.asarray(sizes, dtype=np.int32)
            self.index_ready = True
            self.backend_active = (
                "ai_pending" if (self.cfg.backend == "ai" and self.model_files) else "geometric"
            )
        else:
            self.backend_active = "geometric_empty"
        return self.snapshot()

    def query(
        self,
        live_points: np.ndarray,
        approx_center: Optional[np.ndarray] = None,
        top_k: Optional[int] = None,
    ) -> Dict[str, Any]:
        """Rank anchors by similarity to a live point cloud.

        Args:
            live_points: Live cloud, array-like of ``(N, 3)`` points.
            approx_center: Optional position guess holding ``(x, y)`` or
                ``(x, y, z)``; only x and y are used. An unusable guess is
                ignored.
            top_k: Number of candidates to return; defaults to
                ``cfg.candidate_top_k``.

        Returns:
            A dict with ``ok``, ``reason``, ``backend`` and ``candidates``
            (best first), plus ``top_score`` when the index was searched.
            Each candidate has ``center``, ``score`` and ``support_points``,
            and ``guess_dist_m`` when a usable guess was given.
        """
        live = _voxel_downsample(
            np.asarray(live_points, dtype=np.float32),
            float(self.cfg.descriptor_voxel_m),
        )
        top_n = int(self.cfg.candidate_top_k if top_k is None else max(1, int(top_k)))
        if not self.index_ready or len(live) < 40 or len(self.anchors) == 0:
            out = {
                "ok": False,
                "reason": "index_not_ready" if not self.index_ready else "live_too_few",
                "backend": str(self.backend_active),
                "candidates": [],
            }
            self.last_query = dict(out)
            return out

        q = self._descriptor(live)
        scores = np.asarray(self.descriptors @ q.reshape((-1, 1)), dtype=np.float32).reshape((-1,))

        # Parse the guess once; it is reused for both the score bias and the
        # per-candidate distance.
        guess_xy = self._parse_guess(approx_center)
        if guess_xy is not None:
            d = np.linalg.norm(self.anchors[:, :2] - guess_xy.reshape((1, 2)), axis=1)
            bias = 1.0 - np.clip(d / max(1e-3, float(self.cfg.guess_bias_radius_m)), 0.0, 1.0)
            weight = float(self.cfg.guess_score_weight)
            scores = (1.0 - weight) * scores + weight * bias

        order = np.argsort(scores)[::-1][:top_n]
        candidates: List[Dict[str, Any]] = []
        for idx in order:
            i = int(idx)
            center = np.asarray(self.anchors[i], dtype=np.float32)
            item: Dict[str, Any] = {
                "center": [float(center[0]), float(center[1]), float(center[2])],
                "score": float(scores[i]),
                "support_points": int(self.anchor_sizes[i]) if len(self.anchor_sizes) > i else 0,
            }
            if guess_xy is not None:
                item["guess_dist_m"] = float(np.linalg.norm(center[:2] - guess_xy))
            candidates.append(item)

        out = {
            "ok": bool(len(candidates) > 0),
            "reason": "ok" if candidates else "no_candidates",
            "backend": str(self.backend_active),
            "top_score": float(candidates[0]["score"]) if candidates else 0.0,
            "candidates": candidates,
        }
        self.last_query = dict(out)
        return out

    def snapshot(self) -> Dict[str, Any]:
        """Return a plain-dict status summary of the index."""
        return {
            "enabled": bool(self.cfg.enabled),
            "backend": str(self.cfg.backend),
            "backend_active": str(self.backend_active),
            "models_dir": str(self.models_dir),
            "model_files": list(self.model_files),
            "index_ready": bool(self.index_ready),
            "anchors": int(len(self.anchors)),
            "ref_path": self.ref_path,
            "last_query": dict(self.last_query) if isinstance(self.last_query, dict) else {},
        }