from __future__ import annotations from dataclasses import dataclass from typing import Any, Dict, Optional import numpy as np def _safe_float(v: Any, default: float) -> float: try: return float(v) except Exception: return float(default) @dataclass class SafetyConfig: enabled: bool = True stop_radius_m: float = 0.50 # G1 Edu shoulder width ~0.45 m; 0.50 m gives safe margin stale_localization_sec: float = 1.5 emergency_hold_sec: float = 0.8 @staticmethod def from_dict(d: Dict[str, Any] | None) -> "SafetyConfig": src = d or {} return SafetyConfig( enabled=bool(src.get("enabled", True)), stop_radius_m=max(0.05, _safe_float(src.get("stop_radius_m", 0.50), 0.50)), stale_localization_sec=max(0.2, _safe_float(src.get("stale_localization_sec", 1.5), 1.5)), emergency_hold_sec=max(0.0, _safe_float(src.get("emergency_hold_sec", 0.8), 0.8)), ) class SafetySupervisor: def __init__(self, cfg: SafetyConfig): self.cfg = cfg self._last_localize_ok_t = 0.0 self._last_emergency_t = -1e9 def mark_localization(self, ok: bool, now: float) -> None: if ok: self._last_localize_ok_t = float(now) def evaluate( self, now: float, pose_world: Optional[np.ndarray], loc_state: str, nav_runtime: Any, nav_cmd: Dict[str, Any], ) -> Dict[str, Any]: if not self.cfg.enabled: return { "enabled": False, "emergency": False, "reasons": [], "cmd": dict(nav_cmd), } reasons = [] motion_requested = ( abs(float(nav_cmd.get("linear_mps", 0.0))) > 1e-3 or abs(float(nav_cmd.get("angular_rps", 0.0))) > 1e-3 ) lost = str(loc_state).upper().strip() == "LOST" if lost and motion_requested: reasons.append("localization_lost") stale = False if self._last_localize_ok_t > 0.0: stale = (float(now) - float(self._last_localize_ok_t)) > float(self.cfg.stale_localization_sec) if stale and motion_requested: reasons.append("localization_stale") collision = False if pose_world is None or np.asarray(pose_world).shape != (4, 4): if motion_requested: collision = True reasons.append("pose_invalid") else: x = float(pose_world[0, 3]) y = float(pose_world[1, 3]) try: collision = bool(nav_runtime.occupied_near(x, y, float(self.cfg.stop_radius_m))) if motion_requested else False except Exception: collision = False if collision: reasons.append("collision_zone") emergency = bool((lost and motion_requested) or (stale and motion_requested) or collision) if emergency: self._last_emergency_t = float(now) hold_active = (float(now) - float(self._last_emergency_t)) <= float(self.cfg.emergency_hold_sec) cmd_out = dict(nav_cmd) if emergency or hold_active: cmd_out["linear_mps"] = 0.0 cmd_out["angular_rps"] = 0.0 cmd_out["blocked"] = True return { "enabled": True, "emergency": bool(emergency), "hold": bool(hold_active), "reasons": reasons, "cmd": cmd_out, }