304 lines
12 KiB
Python
304 lines
12 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
import numpy as np
|
|
|
|
|
|
def _as_bool(v: Any, default: bool = False) -> bool:
|
|
if v is None:
|
|
return default
|
|
if isinstance(v, bool):
|
|
return v
|
|
if isinstance(v, (int, float)):
|
|
return bool(v)
|
|
return str(v).strip().lower() in ("1", "true", "yes", "on")
|
|
|
|
|
|
def _safe_float(v: Any, default: float) -> float:
|
|
try:
|
|
return float(v)
|
|
except Exception:
|
|
return float(default)
|
|
|
|
|
|
def _safe_int(v: Any, default: int) -> int:
|
|
try:
|
|
return int(v)
|
|
except Exception:
|
|
return int(default)
|
|
|
|
|
|
def _safe_profiles(v: Any, default: Tuple[str, ...]) -> Tuple[str, ...]:
|
|
if isinstance(v, (list, tuple)):
|
|
out = []
|
|
for item in v:
|
|
s = str(item).upper().strip()
|
|
if s:
|
|
out.append(s)
|
|
if out:
|
|
return tuple(out)
|
|
return tuple(default)
|
|
|
|
|
|
def _voxel_downsample(points: np.ndarray, voxel_m: float) -> np.ndarray:
|
|
pts = np.asarray(points, dtype=np.float32)
|
|
if pts.ndim != 2 or pts.shape[1] != 3 or len(pts) == 0:
|
|
return np.zeros((0, 3), dtype=np.float32)
|
|
if float(voxel_m) <= 0.0:
|
|
return pts
|
|
keys = np.floor(pts / float(voxel_m)).astype(np.int32)
|
|
_, idx = np.unique(keys, axis=0, return_index=True)
|
|
return pts[np.sort(idx)]
|
|
|
|
|
|
@dataclass
class PlaceRecognitionConfig:
    """Settings for the place-recognition index.

    Use :meth:`from_dict` when the values come from loosely-typed
    configuration: it coerces types and clamps numeric fields to safe ranges.
    """

    # Master switch; when False, build() leaves the index empty ("disabled").
    enabled: bool = True
    # Requested backend name; build() distinguishes "ai" from everything else.
    backend: str = "geometric"
    # Directory (joined onto the index base_dir) scanned for model files.
    models_dir: str = "model"
    # Profile names for which PlaceRecognitionIndex.profile_allowed is True.
    apply_profiles: Tuple[str, ...] = ("LOCALIZE_MAP", "LIVE_NAV_MAP")
    # XY grid cell size used to pick anchor locations in the reference cloud.
    anchor_voxel_m: float = 1.6
    # XY radius of the point neighbourhood gathered around each anchor.
    submap_radius_m: float = 4.8
    # Minimum number of points a neighbourhood needs to become an anchor.
    min_points_per_anchor: int = 180
    # Upper bound on the number of grid cells considered as anchors.
    max_anchors: int = 140
    # Default number of candidates returned by a query.
    candidate_top_k: int = 10
    # Voxel size used to downsample clouds before describing them.
    descriptor_voxel_m: float = 0.18
    # Histogram resolutions of the descriptor.
    radial_bins: int = 8
    angular_bins: int = 16
    height_bins: int = 6
    # XY distance at which the pose-guess bonus has decayed to zero.
    guess_bias_radius_m: float = 9.0
    # Blend weight (0..1) of the pose-guess bonus against descriptor similarity.
    guess_score_weight: float = 0.18

    @staticmethod
    def from_dict(d: Dict[str, Any] | None) -> "PlaceRecognitionConfig":
        """Build a config from a possibly incomplete or mistyped mapping.

        Args:
            d: Mapping of field names to raw values, or ``None``.

        Returns:
            A config in which missing, ``None``, unparsable or non-finite
            entries take the field default, and numeric entries are raised to
            their minimum (``guess_score_weight`` is clipped to ``[0, 1]``).
        """
        src = d or {}
        defaults = PlaceRecognitionConfig()

        def text(key: str, fallback: str) -> str:
            # An explicit None (e.g. a YAML null) must not turn into the
            # literal string "None"; treat it like a missing key.
            raw = src.get(key)
            cleaned = "" if raw is None else str(raw).strip()
            return cleaned or fallback

        def finite_float(key: str, fallback: float) -> float:
            # np.clip keeps NaN and max() keeps inf, so non-finite input is
            # replaced by the default before any clamping happens.
            value = _safe_float(src.get(key, fallback), fallback)
            return value if np.isfinite(value) else float(fallback)

        def whole(key: str, fallback: int, lower: int) -> int:
            return max(lower, _safe_int(src.get(key, fallback), fallback))

        return PlaceRecognitionConfig(
            enabled=_as_bool(src.get("enabled", True), True),
            backend=text("backend", defaults.backend).lower(),
            models_dir=text("models_dir", defaults.models_dir),
            apply_profiles=_safe_profiles(
                src.get("apply_profiles", defaults.apply_profiles),
                defaults.apply_profiles,
            ),
            anchor_voxel_m=max(0.4, finite_float("anchor_voxel_m", defaults.anchor_voxel_m)),
            submap_radius_m=max(1.0, finite_float("submap_radius_m", defaults.submap_radius_m)),
            min_points_per_anchor=whole("min_points_per_anchor", defaults.min_points_per_anchor, 40),
            max_anchors=whole("max_anchors", defaults.max_anchors, 8),
            candidate_top_k=whole("candidate_top_k", defaults.candidate_top_k, 1),
            descriptor_voxel_m=max(0.04, finite_float("descriptor_voxel_m", defaults.descriptor_voxel_m)),
            radial_bins=whole("radial_bins", defaults.radial_bins, 4),
            angular_bins=whole("angular_bins", defaults.angular_bins, 8),
            height_bins=whole("height_bins", defaults.height_bins, 3),
            guess_bias_radius_m=max(0.5, finite_float("guess_bias_radius_m", defaults.guess_bias_radius_m)),
            guess_score_weight=float(
                np.clip(finite_float("guess_score_weight", defaults.guess_score_weight), 0.0, 1.0)
            ),
        )
|
|
|
|
|
|
class PlaceRecognitionIndex:
    """Geometric place-recognition index over a reference point cloud.

    ``build`` picks anchor locations in the reference cloud and stores one
    unit-length descriptor per anchor; ``query`` describes a live cloud the
    same way and ranks anchors by descriptor dot product, optionally blended
    with a bonus for anchors close to an approximate position guess.
    """

    # Minimum number of (downsampled) live points required to run a query.
    MIN_LIVE_POINTS = 40

    def __init__(self, cfg: PlaceRecognitionConfig, base_dir: str | Path):
        """Store the config and resolve the model directory under ``base_dir``."""
        self.cfg = cfg
        self.base_dir = Path(base_dir).resolve()
        self.models_dir = (self.base_dir / self.cfg.models_dir).resolve()
        self.reset()

    def reset(self) -> None:
        """Drop all index state and return to the initial "disabled" status."""
        self.ref_path: Optional[str] = None
        self.anchors = np.zeros((0, 3), dtype=np.float32)
        self.descriptors = np.zeros((0, 1), dtype=np.float32)
        self.anchor_sizes = np.zeros((0,), dtype=np.int32)
        self.index_ready = False
        self.model_files: List[str] = []
        self.backend_active = "disabled"
        self.last_query: Dict[str, Any] = {}

    def profile_allowed(self, profile: str) -> bool:
        """Return True when ``profile`` (case/whitespace-insensitive) is enabled."""
        p = str(profile).upper().strip()
        return p in {str(x).upper().strip() for x in tuple(self.cfg.apply_profiles)}

    def _refresh_model_files(self) -> None:
        """Rescan ``models_dir`` for model files, creating the directory if needed.

        ``model_files`` becomes the sorted file names with a ``.onnx``, ``.pt``,
        ``.pth`` or ``.engine`` suffix, or an empty list when the directory
        cannot be created or read.
        """
        try:
            self.models_dir.mkdir(parents=True, exist_ok=True)
            self.model_files = sorted(
                str(p.name)
                for p in self.models_dir.iterdir()
                if p.is_file() and p.suffix.lower() in (".onnx", ".pt", ".pth", ".engine")
            )
        except OSError:
            # Best effort: an unreadable model directory must not block the
            # geometric index, it only means no model files are reported.
            self.model_files = []

    @staticmethod
    def _parse_guess(approx_center: Any) -> Optional[np.ndarray]:
        """Return the finite XY part of a position guess, or ``None`` if unusable.

        Accepts anything convertible to 2 or 3 float32 values ((x, y) or
        (x, y, z)); only XY is used for ranking, so z is not validated.
        """
        if approx_center is None:
            return None
        try:
            flat = np.asarray(approx_center, dtype=np.float32).reshape(-1)
        except (TypeError, ValueError):
            return None
        if flat.size not in (2, 3):
            return None
        xy = flat[:2]
        # A NaN/inf guess would turn every blended score into NaN.
        if not np.all(np.isfinite(xy)):
            return None
        return xy

    def _descriptor(self, points: np.ndarray) -> np.ndarray:
        """Describe a point cloud as a unit-length float32 vector.

        The vector concatenates radial, angular and height histograms of the
        points relative to their centroid, four extent/density values and the
        three normalised covariance eigenvalues, giving
        ``radial_bins + angular_bins + height_bins + 7`` entries. Input that is
        empty or not ``(N, 3)`` yields an all-zero vector of that length.
        """
        n_radial = int(self.cfg.radial_bins)
        n_angular = int(self.cfg.angular_bins)
        n_height = int(self.cfg.height_bins)

        pts = np.asarray(points, dtype=np.float32)
        if pts.ndim != 2 or pts.shape[1] != 3 or len(pts) == 0:
            return np.zeros((n_radial + n_angular + n_height + 7,), dtype=np.float32)

        # Work in float64 relative to the centroid.
        center = np.mean(pts, axis=0, dtype=np.float64)
        rel = pts.astype(np.float64) - center.reshape((1, 3))
        xy = rel[:, :2]
        r = np.linalg.norm(xy, axis=1)
        theta = np.arctan2(xy[:, 1], xy[:, 0])
        z = rel[:, 2]

        # Map each coordinate into [0, 1) so a fixed histogram range applies;
        # height is clipped to +-2.5 around the centroid.
        max_r = max(0.5, float(self.cfg.submap_radius_m))
        r_clip = np.clip(r / max_r, 0.0, 0.999999)
        theta_norm = (theta + np.pi) / (2.0 * np.pi)
        z_clip = np.clip((z + 2.5) / 5.0, 0.0, 0.999999)

        h_r, _ = np.histogram(r_clip, bins=n_radial, range=(0.0, 1.0))
        h_t, _ = np.histogram(theta_norm, bins=n_angular, range=(0.0, 1.0))
        h_z, _ = np.histogram(z_clip, bins=n_height, range=(0.0, 1.0))

        # Covariance needs a few samples; fall back to identity otherwise.
        cov = np.cov(rel.T) if len(rel) >= 3 else np.eye(3, dtype=np.float64)
        eig = np.sort(np.linalg.eigvalsh(cov).astype(np.float64))
        eig = eig / max(1e-6, float(np.sum(np.abs(eig))))

        # pts is non-empty here, so the percentiles are always defined.
        ext_xy = np.percentile(np.abs(xy), 90.0, axis=0)
        ext_z = np.percentile(np.abs(z), 90.0)
        density = float(len(pts)) / max(1.0, float(np.pi * max_r * max_r))

        desc = np.concatenate(
            [
                np.asarray(h_r, dtype=np.float32),
                np.asarray(h_t, dtype=np.float32),
                np.asarray(h_z, dtype=np.float32),
                np.asarray(
                    [
                        float(ext_xy[0]) / max_r,
                        float(ext_xy[1]) / max_r,
                        float(ext_z) / 2.5,
                        float(density) / 200.0,
                    ],
                    dtype=np.float32,
                ),
                np.asarray(eig, dtype=np.float32),
            ],
            axis=0,
        ).astype(np.float32)

        # L2-normalise so the dot product used by query() is a cosine score.
        norm = float(np.linalg.norm(desc))
        if norm > 1e-6:
            desc /= norm
        return desc

    def build(self, ref_points: np.ndarray, ref_path: Optional[str] = None) -> Dict[str, Any]:
        """(Re)build the anchor index from a reference cloud.

        Args:
            ref_points: Reference points, convertible to a float32 ``(N, 3)`` array.
            ref_path: Optional source identifier, recorded as a string.

        Returns:
            The status dictionary produced by :meth:`snapshot`.
        """
        self._refresh_model_files()
        pts = _voxel_downsample(np.asarray(ref_points, dtype=np.float32), float(self.cfg.descriptor_voxel_m))
        self.ref_path = None if ref_path is None else str(ref_path)
        self.index_ready = False
        self.anchors = np.zeros((0, 3), dtype=np.float32)
        self.descriptors = np.zeros((0, 1), dtype=np.float32)
        self.anchor_sizes = np.zeros((0,), dtype=np.int32)

        min_points = int(self.cfg.min_points_per_anchor)
        if not self.cfg.enabled or len(pts) < min_points:
            self.backend_active = "disabled" if not self.cfg.enabled else "geometric_empty"
            return self.snapshot()

        # Bucket points into an XY grid; the most populated cells become anchors.
        xy_keys = np.floor(pts[:, :2] / float(self.cfg.anchor_voxel_m)).astype(np.int32)
        _, inverse, counts = np.unique(xy_keys, axis=0, return_inverse=True, return_counts=True)
        # NumPy 2.0.0 returns a 2-D inverse when axis is given; flatten it so
        # the per-cell boolean mask below always has one entry per point.
        inverse = np.asarray(inverse).reshape(-1)
        order = np.argsort(counts)[::-1][: int(self.cfg.max_anchors)]

        anchors: List[np.ndarray] = []
        descs: List[np.ndarray] = []
        sizes: List[int] = []
        radius = float(self.cfg.submap_radius_m)
        r2 = radius * radius
        # Promote XY to float64 once instead of building an N x 3 temporary
        # for every anchor.
        xy64 = pts[:, :2].astype(np.float64)
        for cell in order:
            center = np.mean(pts[inverse == int(cell)], axis=0, dtype=np.float64)
            delta = xy64 - center[:2].reshape((1, 2))
            keep = np.einsum("ij,ij->i", delta, delta) <= r2
            n_keep = int(np.count_nonzero(keep))
            if n_keep < min_points:
                continue
            anchors.append(np.asarray(center, dtype=np.float32))
            descs.append(self._descriptor(pts[keep]))
            sizes.append(n_keep)

        if descs:
            self.anchors = np.asarray(anchors, dtype=np.float32).reshape((-1, 3))
            self.descriptors = np.asarray(descs, dtype=np.float32)
            self.anchor_sizes = np.asarray(sizes, dtype=np.int32)
            self.index_ready = True
            self.backend_active = "ai_pending" if (self.cfg.backend == "ai" and self.model_files) else "geometric"
        else:
            self.backend_active = "geometric_empty"
        return self.snapshot()

    def query(
        self,
        live_points: np.ndarray,
        approx_center: Optional[np.ndarray] = None,
        top_k: Optional[int] = None,
    ) -> Dict[str, Any]:
        """Rank anchors by similarity to a live point cloud.

        Args:
            live_points: Live points, convertible to a float32 ``(N, 3)`` array.
            approx_center: Optional position guess as (x, y) or (x, y, z).
                An unusable guess (wrong size, non-numeric, non-finite XY) is
                ignored.
            top_k: Maximum number of candidates; defaults to
                ``cfg.candidate_top_k``.

        Returns:
            A dictionary with ``ok``, ``reason``, ``backend`` and ``candidates``
            (best first, each with ``center``, ``score``, ``support_points`` and,
            when a guess was used, ``guess_dist_m``); successful queries also
            carry ``top_score``. A shallow copy is kept in ``last_query``.
        """
        live = _voxel_downsample(np.asarray(live_points, dtype=np.float32), float(self.cfg.descriptor_voxel_m))
        top_n = int(self.cfg.candidate_top_k if top_k is None else max(1, int(top_k)))
        if not self.index_ready or len(live) < self.MIN_LIVE_POINTS or len(self.anchors) == 0:
            out = {
                "ok": False,
                "reason": "index_not_ready" if not self.index_ready else "live_too_few",
                "backend": str(self.backend_active),
                "candidates": [],
            }
            self.last_query = dict(out)
            return out

        q = self._descriptor(live)
        scores = np.asarray(self.descriptors @ q.reshape((-1, 1)), dtype=np.float32).reshape((-1,))

        # Validate the guess once up front instead of re-parsing it per
        # candidate behind a silent try/except.
        guess_xy = self._parse_guess(approx_center)
        if guess_xy is not None:
            weight = float(self.cfg.guess_score_weight)
            d = np.linalg.norm(self.anchors[:, :2] - guess_xy.reshape((1, 2)), axis=1)
            # Bonus falls linearly from 1 at the guess to 0 at the bias radius.
            bias = 1.0 - np.clip(d / max(1e-3, float(self.cfg.guess_bias_radius_m)), 0.0, 1.0)
            scores = (1.0 - weight) * scores + weight * bias

        order = np.argsort(scores)[::-1][:top_n]
        candidates: List[Dict[str, Any]] = []
        for idx in order:
            i = int(idx)
            center = np.asarray(self.anchors[i], dtype=np.float32)
            item: Dict[str, Any] = {
                "center": [float(center[0]), float(center[1]), float(center[2])],
                "score": float(scores[i]),
                "support_points": int(self.anchor_sizes[i]) if len(self.anchor_sizes) > i else 0,
            }
            if guess_xy is not None:
                item["guess_dist_m"] = float(np.linalg.norm(center[:2] - guess_xy))
            candidates.append(item)

        out = {
            "ok": bool(len(candidates) > 0),
            "reason": "ok" if candidates else "no_candidates",
            "backend": str(self.backend_active),
            "top_score": float(candidates[0]["score"]) if candidates else 0.0,
            "candidates": candidates,
        }
        self.last_query = dict(out)
        return out

    def snapshot(self) -> Dict[str, Any]:
        """Return a status dictionary describing the current index state."""
        return {
            "enabled": bool(self.cfg.enabled),
            "backend": str(self.cfg.backend),
            "backend_active": str(self.backend_active),
            "models_dir": str(self.models_dir),
            "model_files": list(self.model_files),
            "index_ready": bool(self.index_ready),
            "anchors": int(len(self.anchors)),
            "ref_path": self.ref_path,
            "last_query": dict(self.last_query) if isinstance(self.last_query, dict) else {},
        }
|