Marcus/Lidar/SLAM_Safety.py
2026-04-12 18:50:22 +04:00

109 lines
3.4 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Dict, Optional
import numpy as np
def _safe_float(v: Any, default: float) -> float:
try:
return float(v)
except Exception:
return float(default)
@dataclass
class SafetyConfig:
enabled: bool = True
stop_radius_m: float = 0.50 # G1 Edu shoulder width ~0.45 m; 0.50 m gives safe margin
stale_localization_sec: float = 1.5
emergency_hold_sec: float = 0.8
@staticmethod
def from_dict(d: Dict[str, Any] | None) -> "SafetyConfig":
src = d or {}
return SafetyConfig(
enabled=bool(src.get("enabled", True)),
stop_radius_m=max(0.05, _safe_float(src.get("stop_radius_m", 0.50), 0.50)),
stale_localization_sec=max(0.2, _safe_float(src.get("stale_localization_sec", 1.5), 1.5)),
emergency_hold_sec=max(0.0, _safe_float(src.get("emergency_hold_sec", 0.8), 0.8)),
)
class SafetySupervisor:
def __init__(self, cfg: SafetyConfig):
self.cfg = cfg
self._last_localize_ok_t = 0.0
self._last_emergency_t = -1e9
def mark_localization(self, ok: bool, now: float) -> None:
if ok:
self._last_localize_ok_t = float(now)
def evaluate(
self,
now: float,
pose_world: Optional[np.ndarray],
loc_state: str,
nav_runtime: Any,
nav_cmd: Dict[str, Any],
) -> Dict[str, Any]:
if not self.cfg.enabled:
return {
"enabled": False,
"emergency": False,
"reasons": [],
"cmd": dict(nav_cmd),
}
reasons = []
motion_requested = (
abs(float(nav_cmd.get("linear_mps", 0.0))) > 1e-3
or abs(float(nav_cmd.get("angular_rps", 0.0))) > 1e-3
)
lost = str(loc_state).upper().strip() == "LOST"
if lost and motion_requested:
reasons.append("localization_lost")
stale = False
if self._last_localize_ok_t > 0.0:
stale = (float(now) - float(self._last_localize_ok_t)) > float(self.cfg.stale_localization_sec)
if stale and motion_requested:
reasons.append("localization_stale")
collision = False
if pose_world is None or np.asarray(pose_world).shape != (4, 4):
if motion_requested:
collision = True
reasons.append("pose_invalid")
else:
x = float(pose_world[0, 3])
y = float(pose_world[1, 3])
try:
collision = bool(nav_runtime.occupied_near(x, y, float(self.cfg.stop_radius_m))) if motion_requested else False
except Exception:
collision = False
if collision:
reasons.append("collision_zone")
emergency = bool((lost and motion_requested) or (stale and motion_requested) or collision)
if emergency:
self._last_emergency_t = float(now)
hold_active = (float(now) - float(self._last_emergency_t)) <= float(self.cfg.emergency_hold_sec)
cmd_out = dict(nav_cmd)
if emergency or hold_active:
cmd_out["linear_mps"] = 0.0
cmd_out["angular_rps"] = 0.0
cmd_out["blocked"] = True
return {
"enabled": True,
"emergency": bool(emergency),
"hold": bool(hold_active),
"reasons": reasons,
"cmd": cmd_out,
}