353 lines
14 KiB
Python
353 lines
14 KiB
Python
"""Nav2 goal-status monitor — arrival/failure feedback for Gemini voice nav.
|
|
|
|
web_nav3's ``goto()`` is fire-and-forget: it publishes a goal and returns at
|
|
once, with no completion event. To let Gemini tell the user the truth ("we've
|
|
arrived" / "I couldn't get there") instead of guessing, we watch the Nav2
|
|
action status over rosbridge and, on a terminal status, push a
|
|
``[NAV ARRIVED]`` / ``[NAV FAILED]`` note to the Gemini child (via
|
|
``live_sub.send_state``) and release the nav arbiter so the legs free up.
|
|
|
|
Design
|
|
------
|
|
* One background daemon thread runs an asyncio loop holding a persistent
|
|
rosbridge websocket subscription to ``/navigate_to_pose/_action/status``
|
|
(``action_msgs/msg/GoalStatusArray``).
|
|
* ``arm_goal(place)`` marks a pending destination. A goal that is ACCEPTED/
|
|
EXECUTING and not already-terminal is latched as "ours"; its terminal status
|
|
fires feedback. A CANCELED of the latched goal while another goal is active
|
|
is treated as a preemption (re-latch, don't fire). Re-arming supersedes.
|
|
* A watchdog fails the goal after ``SANAD_NAV_GOAL_TIMEOUT_S`` so the arbiter
|
|
is ALWAYS released even if rosbridge/websockets is unavailable.
|
|
* ``request_cancel()`` sends a real rosbridge action CancelGoal (cancel-all) so
|
|
"stop" actually stops Nav2 — not just an arbiter release.
|
|
|
|
Everything is best-effort: the drive already succeeded by the time we arm, so
|
|
any monitor failure simply means no spoken feedback — never a crash.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import base64
|
|
import json
|
|
import os
|
|
import threading
|
|
import time
|
|
from typing import Any, List, Optional
|
|
|
|
from Project.Sanad.core.logger import get_logger
|
|
|
|
log = get_logger("goal_monitor")

# Capture the arbiter at import so releasing the legs never depends on a lazy
# import succeeding inside the fire path (a missed release locks the legs).
try:
    from Project.Sanad.dashboard.routes import _arbiter as _ARBITER
except Exception as _arbiter_exc:  # noqa: BLE001
    # Not fatal: the fire path retries this import lazily before giving up.
    # NOTE(review): a likely cause is a circular import while
    # dashboard.routes is still loading — unverified, so keep it at DEBUG.
    log.debug("arbiter not importable at load time (%s); will retry on release",
              _arbiter_exc)
    _ARBITER = None
|
|
|
|
_ROSBRIDGE_URL = (
|
|
os.environ.get("SANAD_ROSBRIDGE_URL")
|
|
or os.environ.get("ROSBRIDGE_URL")
|
|
or "ws://127.0.0.1:9090"
|
|
)
|
|
_STATUS_TOPIC = os.environ.get(
|
|
"SANAD_NAV_STATUS_TOPIC", "/navigate_to_pose/_action/status"
|
|
)
|
|
_STATUS_TYPE = "action_msgs/msg/GoalStatusArray"
|
|
_CANCEL_SERVICE = os.environ.get(
|
|
"SANAD_NAV_CANCEL_SERVICE", "/navigate_to_pose/_action/cancel_goal"
|
|
)
|
|
_GOAL_TIMEOUT_S = float(os.environ.get("SANAD_NAV_GOAL_TIMEOUT_S", "240"))
|
|
|
|
# action_msgs/msg/GoalStatus.status
|
|
_ACCEPTED, _EXECUTING = 1, 2
|
|
_SUCCEEDED, _CANCELED, _ABORTED = 4, 5, 6
|
|
_TERMINAL = {_SUCCEEDED, _CANCELED, _ABORTED}
|
|
|
|
|
|
def _uuid_of(status: dict) -> Any:
|
|
"""Canonical, encoding-independent key for a goal id.
|
|
|
|
rosbridge may serialize uint8[16] as a base64 string OR an int list
|
|
depending on its png/cbor config; normalize both to a tuple of ints so the
|
|
ACCEPTED frame and the terminal frame compare equal even if the bridge
|
|
switches representation mid-session."""
|
|
gid = ((status.get("goal_info") or {}).get("goal_id") or {})
|
|
u = gid.get("uuid")
|
|
if isinstance(u, str):
|
|
try:
|
|
return tuple(base64.b64decode(u))
|
|
except Exception:
|
|
return u
|
|
if isinstance(u, list):
|
|
try:
|
|
return tuple(int(x) for x in u)
|
|
except Exception:
|
|
return tuple(u)
|
|
return None
|
|
|
|
|
|
class _GoalMonitor:
    """Watch Nav2 action status for one armed goal and report its outcome once.

    ``arm``/``disarm`` may be called from other threads; a single daemon
    thread (``_run``) owns the asyncio loop and the rosbridge websocket. All
    mutable state is guarded by ``_lock``. ``_fire`` is always invoked after
    the lock is released, so slow feedback/arbiter calls never hold it.
    """

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._active = False                # armed and not yet reported
        self._place: Optional[str] = None   # destination named in feedback
        self._armed_at = 0.0                # time.monotonic() at last arm()
        self._latched: Any = None           # uuid key of the goal we follow
        self._ignore: set = set()           # uuids seen terminal — never latch
        self._thread: Optional[threading.Thread] = None  # monitor thread

    # ── public ───────────────────────────────────────────────
    def arm(self, place: str) -> None:
        """Start (or restart) tracking a goal driving to ``place``.

        Supersedes any previously armed goal and resets the watchdog. Spawns
        the monitor thread if none is running.

        Raises:
            RuntimeError: if the monitor thread cannot be started; the slot
                is cleared first so a later ``arm()`` retries the spawn.
        """
        spawn: Optional[threading.Thread] = None
        with self._lock:
            self._active = True
            self._place = place
            # Monotonic clock: the watchdog must not be skewed by wall-clock
            # jumps (NTP sync after boot, manual clock changes).
            self._armed_at = time.monotonic()
            self._latched = None
            self._ignore = set()
            if self._thread is None:
                self._thread = threading.Thread(
                    target=self._run, daemon=True, name="nav-goal-monitor")
                spawn = self._thread
        if spawn is not None:
            try:
                spawn.start()
            except RuntimeError:
                # Forget the never-started thread; otherwise every future
                # arm() would assume a live monitor and nothing would ever
                # fire or release the arbiter.
                with self._lock:
                    if self._thread is spawn:
                        self._thread = None
                raise
        log.info("armed goal monitor for '%s'", place)

    def disarm(self) -> None:
        """Stop tracking the current goal without firing any feedback."""
        with self._lock:
            self._active = False
            self._latched = None
            self._place = None

    # ── thread / loop ────────────────────────────────────────
    def _run(self) -> None:
        """Monitor-thread entry point: serve until no goal is armed."""
        # Loop so that a new arm() arriving exactly as a session ends keeps the
        # monitor alive. The decision to exit is made under the lock together
        # with clearing _thread, so arm()'s "spawn only if _thread is None" can
        # never strand an active goal with no live thread.
        while True:
            try:
                asyncio.run(self._serve())
            except Exception as exc:  # noqa: BLE001
                log.warning("goal monitor loop ended: %s", exc)
                # Brief back-off so a persistently failing loop cannot spin
                # the CPU while a goal is still armed.
                time.sleep(1.0)
            with self._lock:
                if not self._active:
                    self._thread = None
                    return
            # still active → a fresh goal arrived; serve again

    def _should_stop(self) -> bool:
        """Return True if serving should end.

        That is the case when the watchdog just fired (the timeout feedback is
        delivered as a side effect) or the goal is no longer active
        (disarmed or already reported).
        """
        if self._check_timeout():
            return True
        with self._lock:
            return not self._active

    async def _serve(self) -> None:
        """Listen on rosbridge (reconnecting every 3 s) until told to stop."""
        try:
            import websockets  # local import — dashboard env only
        except Exception as exc:  # noqa: BLE001
            # No ws client → honor the timeout so the arbiter is still released.
            log.warning("websockets unavailable — nav feedback via timeout only: %s", exc)
            while True:
                await asyncio.sleep(min(5.0, _GOAL_TIMEOUT_S))
                if self._should_stop():
                    return
        while True:
            try:
                await self._listen(websockets)
            except Exception as exc:  # noqa: BLE001
                log.debug("rosbridge listen error: %s", exc)
            if self._should_stop():
                return
            await asyncio.sleep(3.0)

    async def _listen(self, websockets: Any) -> None:
        """Subscribe to the Nav2 status topic and feed frames to ``_on_status``.

        Returns when ``_should_stop()`` says so. Connection errors propagate
        to ``_serve``, which reconnects. Malformed frames are skipped rather
        than tearing down the connection.
        """
        async with websockets.connect(
            _ROSBRIDGE_URL, ping_interval=20, ping_timeout=20
        ) as ws:
            await ws.send(json.dumps({
                "op": "subscribe", "topic": _STATUS_TOPIC,
                "type": _STATUS_TYPE, "throttle_rate": 0, "queue_length": 1,
            }))
            log.info("goal monitor subscribed %s", _STATUS_TOPIC)
            while True:
                try:
                    raw = await asyncio.wait_for(ws.recv(), timeout=5.0)
                except asyncio.TimeoutError:
                    # Quiet topic: still poll the watchdog / disarm flag.
                    if self._should_stop():
                        return
                    continue
                try:
                    data = json.loads(raw)
                except (TypeError, ValueError):
                    continue  # not JSON — ignore
                if not isinstance(data, dict) or data.get("op") != "publish":
                    continue
                msg = data.get("msg")
                statuses = msg.get("status_list") if isinstance(msg, dict) else None
                self._on_status(statuses if isinstance(statuses, list) else [])
                if self._should_stop():
                    return

    # ── status handling ──────────────────────────────────────
    def _on_status(self, status_list: List[dict]) -> None:
        """Process one GoalStatusArray frame.

        Fires feedback (outside the lock) if the latched goal has reached a
        terminal status.
        """
        fire: Optional[tuple] = None
        with self._lock:
            if not self._active:
                return
            # uuid -> status for this frame, in the order received; the last
            # active entry is treated as the newest Nav2 goal.
            states: dict = {}
            for entry in status_list:
                if not isinstance(entry, dict):
                    continue  # malformed entry — skip it, keep the frame
                u = _uuid_of(entry)
                if u is None:
                    continue
                st = entry.get("status")
                states[u] = st
                if st in _TERMINAL:
                    self._ignore.add(u)  # prior/other goals — never ours

            # Latch a genuinely-active, non-ignored goal as ours.
            if self._latched is None:
                cand = [u for u, st in states.items()
                        if st in (_ACCEPTED, _EXECUTING) and u not in self._ignore]
                if cand:
                    self._latched = cand[-1]  # newest Nav2 entry

            # Terminal for the latched goal?
            if self._latched is not None:
                st = states.get(self._latched)
                if st == _CANCELED:
                    # A CANCELED latch while another goal is active is a
                    # preemption (a newer goal replaced ours) — re-latch
                    # instead of falsely reporting we stopped.
                    others = [u for u, s2 in states.items()
                              if s2 in (_ACCEPTED, _EXECUTING)
                              and u not in self._ignore and u != self._latched]
                    if others:
                        self._ignore.add(self._latched)
                        self._latched = others[-1]
                        st = None
                if st in _TERMINAL:
                    fire = (self._place, st)
                    self._active = False
                    self._latched = None
                    self._place = None
        if fire:
            self._fire(*fire)

    def _check_timeout(self) -> bool:
        """Fail the armed goal with ``"timeout"`` once it exceeds the watchdog.

        Returns:
            True iff this call fired the timeout (the goal is now inactive).
        """
        fire: Optional[tuple] = None
        with self._lock:
            if self._active and (time.monotonic() - self._armed_at) > _GOAL_TIMEOUT_S:
                fire = (self._place, "timeout")
                self._active = False
                self._latched = None
                self._place = None
        if fire:
            self._fire(*fire)
            return True
        return False

    # ── feedback + arbiter release ───────────────────────────
    def _fire(self, place: Optional[str], status: Any) -> None:
        """Release the nav arbiter, then tell Gemini how the goal ended.

        Args:
            place: destination name (None → "the destination").
            status: a GoalStatus code (_SUCCEEDED/_CANCELED/_ABORTED/other)
                or the string ``"timeout"``.
        """
        place = place or "the destination"
        if status == _SUCCEEDED:
            event = "nav_arrived"
            cmd = (f"You have arrived at '{place}'. Briefly tell the user "
                   "you've arrived, in your normal Khaleeji style.")
        elif status == _CANCELED:
            event = "nav_canceled"
            cmd = (f"Navigation to '{place}' was canceled. Briefly acknowledge "
                   "it if relevant.")
        elif status == "timeout":
            event = "nav_failed"
            cmd = (f"You could not confirm reaching '{place}' — it is taking "
                   "too long or the path is blocked. Briefly tell the user you "
                   "couldn't get there.")
        else:  # ABORTED / unknown
            event = "nav_failed"
            cmd = (f"You could NOT reach '{place}' — the path was blocked or "
                   "planning failed. Briefly apologise and say you couldn't "
                   "get there.")
        log.info("goal terminal: place=%s status=%s → %s", place, status, event)

        # Free the legs first (a missed release locks them — make it loud).
        arb = _ARBITER
        if arb is None:
            try:
                from Project.Sanad.dashboard.routes import _arbiter as arb
            except Exception:  # noqa: BLE001
                arb = None
        if arb is not None:
            try:
                arb.release_nav()
            except Exception as exc:  # noqa: BLE001
                log.error("release_nav failed after %s: %s", event, exc)
        else:
            log.error("arbiter unavailable — could not release nav after %s", event)

        # Tell Gemini (via the supervisor's stdin push to the child).
        try:
            from Project.Sanad.main import live_sub
            if live_sub is not None and hasattr(live_sub, "send_state"):
                live_sub.send_state(event, cmd)
        except Exception as exc:  # noqa: BLE001
            log.debug("nav feedback inject failed: %s", exc)
|
|
|
|
|
|
# Process-wide singleton monitor; the module-level helpers (arm_goal, disarm)
# delegate to it.
_MON = _GoalMonitor()
|
|
|
|
|
|
def arm_goal(place: str) -> None:
    """Begin watching for the arrival/failure of a goal driving to ``place``.

    Best-effort: never raises. If arming fails, no watchdog runs for this
    goal and this module will not release the nav arbiter for it, so the
    failure is logged at WARNING rather than hidden at DEBUG.
    """
    try:
        _MON.arm(place)
    except Exception as exc:  # noqa: BLE001
        log.warning("arm_goal failed: %s", exc)
|
|
|
|
|
|
def disarm() -> None:
    """Stop tracking the armed goal without reporting it (e.g. explicit cancel).

    Never raises; any failure is logged at DEBUG.
    """
    try:
        _MON.disarm()
    except Exception as err:  # noqa: BLE001
        log.debug("disarm failed: %s", err)
|
|
|
|
|
|
async def _cancel_once() -> bool:
    """Send one cancel-all CancelGoal request over a fresh rosbridge socket.

    Returns False in three cases:

    * ``websockets`` is unavailable;
    * the connection or send fails;
    * rosbridge explicitly reports the service call as failed, i.e. a
      ``service_response`` with ``result`` false.

    A missing, late or unparseable reply still counts as True: the request
    was sent, and cancellation is best-effort.
    """
    try:
        import websockets
    except Exception:  # noqa: BLE001
        return False
    try:
        async with websockets.connect(_ROSBRIDGE_URL, ping_interval=None) as ws:
            # Zero goal_id + zero stamp == cancel ALL goals (CancelGoal convention).
            await ws.send(json.dumps({
                "op": "call_service",
                "service": _CANCEL_SERVICE,
                "type": "action_msgs/srv/CancelGoal",
                "args": {"goal_info": {"goal_id": {"uuid": [0] * 16},
                                       "stamp": {"sec": 0, "nanosec": 0}}},
            }))
            try:
                raw = await asyncio.wait_for(ws.recv(), timeout=3.0)
            except Exception:  # noqa: BLE001
                return True  # no reply in time — the request was still sent
            try:
                reply = json.loads(raw)
            except (TypeError, ValueError):
                return True
            if (isinstance(reply, dict)
                    and reply.get("op") == "service_response"
                    and reply.get("result") is False):
                log.warning("nav cancel service call failed: %s", reply.get("values"))
                return False
            return True
    except Exception as exc:  # noqa: BLE001
        log.debug("cancel ws failed: %s", exc)
        return False
|
|
|
|
|
|
def request_cancel() -> bool:
    """Ask Nav2 to cancel every goal via rosbridge so the robot really stops.

    Runs its own event loop and blocks until done, so call it through
    ``asyncio.to_thread`` from async code. Best-effort: any error is logged
    at DEBUG and reported as False.
    """
    succeeded = False
    try:
        succeeded = asyncio.run(_cancel_once())
    except Exception as err:  # noqa: BLE001
        log.debug("request_cancel failed: %s", err)
    return succeeded
|