376 lines
16 KiB
Python
376 lines
16 KiB
Python
"""Thin HTTP client to the web_nav3 Nav2 stack for SanadV3.
|
|
|
|
web_nav3 exposes a FastAPI surface (default http://127.0.0.1:8765) that wraps
|
|
a ROS2/Nav2 + rosbridge bringup. SanadV3 talks to it over plain HTTP so it
|
|
never has to import ROS2 itself.
|
|
|
|
Design contract: NO method here ever raises into the caller. Every public
|
|
method returns either a clean dict {"ok": bool, "error": str|None, ...} or a
|
|
NavStatus. If web_nav3 is unreachable/down, callers (dashboard, voice) get a
|
|
graceful failure result and keep running.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import threading
|
|
from dataclasses import dataclass
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
import requests
|
|
|
|
logger = logging.getLogger("sanadv3.navigation")
|
|
|
|
|
|
@dataclass
|
|
class NavStatus:
|
|
"""Snapshot of web_nav3 health (from GET /api/status)."""
|
|
|
|
bringup_alive: bool = False
|
|
rosbridge_alive: bool = False
|
|
reachable: bool = False
|
|
error: Optional[str] = None
|
|
log_tail: str = ""
|
|
# What the single bringup is doing right now (from /api/status):
|
|
# mode 1 fresh / 2 continue / 3 localize / None idle.
|
|
mode: Optional[int] = None
|
|
active_map: Optional[str] = None
|
|
mode_label: str = "IDLE"
|
|
mapping: bool = False
|
|
localizing: bool = False
|
|
|
|
def as_dict(self) -> Dict[str, Any]:
|
|
return {
|
|
"bringup_alive": self.bringup_alive,
|
|
"rosbridge_alive": self.rosbridge_alive,
|
|
"reachable": self.reachable,
|
|
"error": self.error,
|
|
"log_tail": self.log_tail,
|
|
"mode": self.mode,
|
|
"active_map": self.active_map,
|
|
"mode_label": self.mode_label,
|
|
"mapping": self.mapping,
|
|
"localizing": self.localizing,
|
|
}
|
|
|
|
|
|
class WebNav3Client:
|
|
"""Loosely-coupled HTTP client to web_nav3.
|
|
|
|
Args:
|
|
base_url: web_nav3 FastAPI base, e.g. "http://127.0.0.1:8765".
|
|
robot: robot name, sent as X-Robot-Name header on every request.
|
|
timeout: default per-request timeout (seconds).
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
base_url: str = "http://127.0.0.1:8765",
|
|
robot: str = "sanad",
|
|
timeout: float = 3.0,
|
|
) -> None:
|
|
self.base_url = base_url.rstrip("/")
|
|
self.robot = robot
|
|
self.timeout = float(timeout)
|
|
self._session = requests.Session()
|
|
# requests.Session is not guaranteed thread-safe, but route handlers
|
|
# call us from asyncio.to_thread workers (multiple tabs / overlapping
|
|
# status-poll + goto). Serialize Session access with a lock.
|
|
self._session_lock = threading.Lock()
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# internals
|
|
# ------------------------------------------------------------------ #
|
|
def _headers(self) -> Dict[str, str]:
|
|
return {"X-Robot-Name": self.robot}
|
|
|
|
def _url(self, path: str) -> str:
|
|
if not path.startswith("/"):
|
|
path = "/" + path
|
|
return self.base_url + path
|
|
|
|
def _request(
|
|
self,
|
|
method: str,
|
|
path: str,
|
|
*,
|
|
params: Optional[Dict[str, Any]] = None,
|
|
json_body: Optional[Dict[str, Any]] = None,
|
|
timeout: Optional[float] = None,
|
|
) -> Dict[str, Any]:
|
|
"""Perform a request, always returning {"ok":bool, ...}; never raises."""
|
|
url = self._url(path)
|
|
try:
|
|
with self._session_lock:
|
|
resp = self._session.request(
|
|
method,
|
|
url,
|
|
params=params,
|
|
json=json_body,
|
|
headers=self._headers(),
|
|
timeout=timeout if timeout is not None else self.timeout,
|
|
)
|
|
except requests.RequestException as exc:
|
|
logger.debug("web_nav3 %s %s failed: %s", method, path, exc)
|
|
return {"ok": False, "error": f"{type(exc).__name__}: {exc}"}
|
|
|
|
# Try to decode a JSON body either way (errors often carry detail).
|
|
body: Any = None
|
|
try:
|
|
body = resp.json()
|
|
except ValueError:
|
|
body = resp.text
|
|
|
|
if not resp.ok:
|
|
detail = body
|
|
if isinstance(body, dict):
|
|
detail = body.get("detail") or body.get("error") or body
|
|
return {
|
|
"ok": False,
|
|
"error": f"HTTP {resp.status_code}: {detail}",
|
|
"status_code": resp.status_code,
|
|
"data": body,
|
|
}
|
|
|
|
return {"ok": True, "error": None, "data": body, "status_code": resp.status_code}
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# health
|
|
# ------------------------------------------------------------------ #
|
|
def reachable(self) -> bool:
|
|
"""Quick liveness check: short-timeout GET /api/status.
|
|
|
|
Reachable means web_nav3 ANSWERED — any HTTP response (including a 5xx
|
|
from a running-but-erroring backend) counts. Only a transport failure
|
|
(connection refused / timeout) means unreachable.
|
|
"""
|
|
try:
|
|
with self._session_lock:
|
|
self._session.get(
|
|
self._url("/api/status"),
|
|
headers=self._headers(),
|
|
timeout=min(self.timeout, 1.5),
|
|
)
|
|
return True
|
|
except requests.RequestException:
|
|
return False
|
|
|
|
def status(self) -> NavStatus:
|
|
"""Poll GET /api/status; returns a NavStatus (never raises)."""
|
|
res = self._request("GET", "/api/status")
|
|
if not res["ok"]:
|
|
# A 'status_code' key means web_nav3 answered with an HTTP error
|
|
# (e.g. 500) — it IS reachable, just erroring. Only a transport
|
|
# failure (no status_code) is genuinely unreachable.
|
|
answered = "status_code" in res
|
|
log_tail = ""
|
|
body = res.get("data")
|
|
if answered and isinstance(body, dict):
|
|
log_tail = str(body.get("log_tail", "") or "")
|
|
return NavStatus(reachable=answered, error=res["error"], log_tail=log_tail)
|
|
data = res.get("data") or {}
|
|
if not isinstance(data, dict):
|
|
return NavStatus(reachable=True, error="unexpected status payload")
|
|
return NavStatus(
|
|
bringup_alive=bool(data.get("bringup_alive", False)),
|
|
rosbridge_alive=bool(data.get("rosbridge_alive", False)),
|
|
reachable=True,
|
|
error=None,
|
|
log_tail=str(data.get("log_tail", "") or ""),
|
|
mode=data.get("mode"),
|
|
active_map=data.get("active_map"),
|
|
mode_label=str(data.get("mode_label", "IDLE") or "IDLE"),
|
|
mapping=bool(data.get("mapping", False)),
|
|
localizing=bool(data.get("localizing", False)),
|
|
)
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# bringup lifecycle
|
|
# ------------------------------------------------------------------ #
|
|
def start(self, mode: int, db_path: Optional[str] = None) -> Dict[str, Any]:
|
|
"""POST /api/start. mode 1=fresh SLAM, 2/3=localization (need db_path)."""
|
|
body: Dict[str, Any] = {"mode": int(mode), "robot": self.robot}
|
|
if db_path is not None:
|
|
body["db_path"] = db_path
|
|
return self._request("POST", "/api/start", json_body=body, timeout=15.0)
|
|
|
|
def stop(self) -> Dict[str, Any]:
|
|
"""POST /api/stop — SIGINT the running bringup process group."""
|
|
return self._request("POST", "/api/stop", timeout=15.0)
|
|
|
|
def load_map(self, db_path: str) -> Dict[str, Any]:
|
|
"""POST /api/load_map — VIEW a saved map: stop any running bringup, then
|
|
start LOCALIZE-only (mode 3) against db_path. Use this for 'Load & View'
|
|
so it actually switches to the chosen map instead of attaching to a
|
|
mapping session already in progress."""
|
|
return self._request("POST", "/api/load_map",
|
|
json_body={"db_path": db_path}, timeout=25.0)
|
|
|
|
def shutdown(self) -> Dict[str, Any]:
|
|
"""POST /api/shutdown — kill ALL registered web_nav3 processes."""
|
|
return self._request("POST", "/api/shutdown", timeout=15.0)
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# places / navigation
|
|
# ------------------------------------------------------------------ #
|
|
def list_places(self, map_name: Optional[str] = None) -> List[Dict[str, Any]]:
|
|
"""GET /api/places. Per-MAP when map_name given (each map keeps its own
|
|
places); else the legacy per-robot store. Returns a list (empty on error)."""
|
|
params: Dict[str, Any] = {"robot": self.robot}
|
|
if map_name:
|
|
params["map"] = map_name
|
|
res = self._request("GET", "/api/places", params=params)
|
|
if not res["ok"]:
|
|
logger.debug("list_places failed: %s", res["error"])
|
|
return []
|
|
data = res.get("data")
|
|
return data if isinstance(data, list) else []
|
|
|
|
def goto(self, name: str) -> Dict[str, Any]:
|
|
"""POST /api/places/goto?name=... — THE primary nav command.
|
|
|
|
Navigate to a saved place by name on the currently-running bringup.
|
|
"""
|
|
# Backend runs `timeout 5 ros2 topic pub --once` inside subprocess.run(
|
|
# timeout=12); on a Jetson the rclpy cold-start routinely exceeds the
|
|
# default 3s read-timeout, so override to match the backend's cap.
|
|
res = self._request("POST", "/api/places/goto", params={"name": name}, timeout=12.0)
|
|
if res["ok"]:
|
|
return {"ok": True, "error": None, "name": name, "data": res.get("data")}
|
|
return {"ok": False, "error": res["error"], "name": name}
|
|
|
|
def cancel(self) -> Dict[str, Any]:
|
|
"""Report that server-side goal-cancel is NOT available.
|
|
|
|
web_nav3 has no explicit goal-cancel HTTP endpoint, so a true
|
|
per-goal cancel must be performed client-side (browser) over rosbridge
|
|
by sending a CancelGoal to the /navigate_to_pose action server. That
|
|
keeps Nav2/SLAM/bridges alive — only the in-flight goal aborts.
|
|
|
|
Server-side there is nothing safe to do here (we must NOT call stop(),
|
|
which SIGINTs the whole bringup process group). Previously this returned
|
|
ok:True, which made the dashboard's rosbridge-down fallback toast
|
|
"cancel sent" while the robot kept driving. Return ok:False with an
|
|
explicit reason so callers never mistake this for a real cancel — the
|
|
only server-side way to halt motion is stop() (tears down bringup).
|
|
"""
|
|
return {
|
|
"ok": False,
|
|
"error": "cancel must be done client-side via rosbridge",
|
|
"note": "no server-side goal-cancel; use stop() to tear down bringup",
|
|
}
|
|
|
|
def save_here(self, name: str) -> Dict[str, Any]:
|
|
"""Save the robot's CURRENT pose as a named place.
|
|
|
|
POST /api/places/save_here?name=... — web_nav3 reads the current TF
|
|
pose (requires bringup running; backend takes ``name`` as a query arg).
|
|
|
|
CONFIG COUPLING: backend save_here writes the LEGACY places store
|
|
(~/marcus_nav2_test/places.json) regardless of robot, while save_at /
|
|
list_places are robot-scoped and only resolve to that legacy file when
|
|
web_nav3's robot_config.yaml ``robot_name`` equals this client's robot
|
|
(``self.robot``). For all three to share one store, web_nav3's
|
|
robot_name MUST match SANAD_ROBOT_NAME (default 'sanad'); if they drift,
|
|
'Save here' and click-to-add places silently target different files.
|
|
"""
|
|
# Same ros2-pub cold-start as goto(): override the 3s default so a slow
|
|
# (but successful) publish isn't reported as a failure.
|
|
return self._request(
|
|
"POST",
|
|
"/api/places/save_here",
|
|
params={"robot": self.robot, "name": name},
|
|
timeout=12.0,
|
|
)
|
|
|
|
def save_at(self, name: str, x: float, y: float, yaw: float,
|
|
map_name: Optional[str] = None) -> Dict[str, Any]:
|
|
"""Save a named place at an explicit (x, y, yaw) map pose.
|
|
|
|
POST /api/places/save_at — writes straight to places.json, so it works
|
|
even with no bringup running (used by click-to-add-place on the map).
|
|
Per-MAP when map_name given. Re-saving an existing name MOVES the place.
|
|
"""
|
|
params: Dict[str, Any] = {"robot": self.robot}
|
|
if map_name:
|
|
params["map"] = map_name
|
|
return self._request(
|
|
"POST",
|
|
"/api/places/save_at",
|
|
params=params,
|
|
json_body={
|
|
"name": name,
|
|
"x": float(x),
|
|
"y": float(y),
|
|
"yaw": float(yaw),
|
|
},
|
|
)
|
|
|
|
def delete_place(self, name: str, map_name: Optional[str] = None) -> Dict[str, Any]:
|
|
"""DELETE /api/places?name=... — remove a saved place (per-map)."""
|
|
params: Dict[str, Any] = {"robot": self.robot, "name": name}
|
|
if map_name:
|
|
params["map"] = map_name
|
|
return self._request("DELETE", "/api/places", params=params)
|
|
|
|
def rename_place(self, old: str, new: str,
|
|
map_name: Optional[str] = None) -> Dict[str, Any]:
|
|
"""POST /api/places/rename — rename a saved place (per-map)."""
|
|
params: Dict[str, Any] = {"robot": self.robot}
|
|
if map_name:
|
|
params["map"] = map_name
|
|
return self._request(
|
|
"POST", "/api/places/rename", params=params,
|
|
json_body={"old": old, "new": new},
|
|
)
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# map editor — per-map occupancy edit overlay
|
|
# ------------------------------------------------------------------ #
|
|
def get_map_edits(self, map_name: str) -> Dict[str, Any]:
|
|
"""GET /api/map_edits?map=... — the saved edit overlay (erased points +
|
|
painted walls) for a map. Returns {ok, edits:[[wx,wy,v],...]}."""
|
|
res = self._request("GET", "/api/map_edits",
|
|
params={"robot": self.robot, "map": map_name})
|
|
if not res["ok"]:
|
|
return {"ok": False, "error": res["error"], "edits": []}
|
|
data = res.get("data") or {}
|
|
return {"ok": True, "edits": data.get("edits", []) if isinstance(data, dict) else []}
|
|
|
|
def save_map_edits(self, map_name: str, edits: list) -> Dict[str, Any]:
|
|
"""POST /api/map_edits?map=... — persist the edit overlay for a map.
|
|
edits = list of [world_x, world_y, value] (0=free/erase, 100=wall)."""
|
|
return self._request("POST", "/api/map_edits",
|
|
params={"robot": self.robot, "map": map_name},
|
|
json_body={"edits": edits})
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# maps / missions
|
|
# ------------------------------------------------------------------ #
|
|
def list_maps(self) -> List[Dict[str, Any]]:
|
|
"""GET /api/maps. Returns a list (empty on any error)."""
|
|
res = self._request("GET", "/api/maps", params={"robot": self.robot})
|
|
if not res["ok"]:
|
|
logger.debug("list_maps failed: %s", res["error"])
|
|
return []
|
|
data = res.get("data")
|
|
return data if isinstance(data, list) else []
|
|
|
|
def list_missions(self) -> List[Dict[str, Any]]:
|
|
"""GET /api/missions. Returns a list (empty on any error)."""
|
|
res = self._request("GET", "/api/missions", params={"robot": self.robot})
|
|
if not res["ok"]:
|
|
logger.debug("list_missions failed: %s", res["error"])
|
|
return []
|
|
data = res.get("data")
|
|
return data if isinstance(data, list) else []
|
|
|
|
def run_mission(self, mission_id: Any) -> Dict[str, Any]:
|
|
"""POST /api/missions/run — start a saved mission by id."""
|
|
return self._request(
|
|
"POST",
|
|
"/api/missions/run",
|
|
params={"id": mission_id},
|
|
json_body={"id": mission_id, "robot": self.robot},
|
|
timeout=15.0,
|
|
)
|