from __future__ import annotations from dataclasses import dataclass from typing import Any, Dict, List, Optional, Tuple import numpy as np def _safe_float(v: Any, default: float) -> float: try: return float(v) except Exception: return float(default) @dataclass class MissionConfig: waypoint_tolerance_m: float = 0.55 # G1 Edu stride ~0.3 m; 0.55 m gives reliable completion @staticmethod def from_dict(d: Dict[str, Any] | None) -> "MissionConfig": src = d or {} return MissionConfig( waypoint_tolerance_m=max(0.05, _safe_float(src.get("waypoint_tolerance_m", 0.55), 0.55)), ) class WaypointMissionManager: def __init__(self, cfg: MissionConfig): self.cfg = cfg self.reset() def reset(self) -> None: self._waypoints: List[Tuple[float, float]] = [] self._idx = 0 self._active = False self._paused = False self._completed = False def set_waypoints(self, waypoints: List[Tuple[float, float]]) -> None: self._waypoints = [(float(x), float(y)) for x, y in waypoints] self._idx = 0 self._completed = len(self._waypoints) == 0 def start(self, waypoints: Optional[List[Tuple[float, float]]] = None) -> None: if waypoints is not None: self.set_waypoints(waypoints) self._active = len(self._waypoints) > 0 self._paused = False self._completed = not self._active def pause(self) -> None: if self._active: self._paused = True def resume(self) -> None: if self._active: self._paused = False def stop(self) -> None: self._active = False self._paused = False def current_goal(self) -> Optional[Tuple[float, float]]: if not self._active or self._paused or self._completed: return None if self._idx < 0 or self._idx >= len(self._waypoints): return None return self._waypoints[self._idx] def update_pose(self, x: float, y: float) -> Dict[str, Any]: goal = self.current_goal() if goal is None: return self.snapshot() dx = float(goal[0]) - float(x) dy = float(goal[1]) - float(y) dist = float(np.hypot(dx, dy)) if dist <= float(self.cfg.waypoint_tolerance_m): self._idx += 1 if self._idx >= len(self._waypoints): self._completed = True self._active = False self._paused = False return self.snapshot() def snapshot(self) -> Dict[str, Any]: goal = self.current_goal() return { "active": bool(self._active), "paused": bool(self._paused), "completed": bool(self._completed), "index": int(self._idx), "total": int(len(self._waypoints)), "goal": [float(goal[0]), float(goal[1])] if goal is not None else None, }