"""Tour engine — the NEW subsystem in P4 (Custom AI Guide Tour). Composes primitives that already ship in SanadV3 into a configurable guided tour: ordered narrated stops with BLOCKING arrival sequencing and entitlement-aware degradation. Everything else (nav goals, arrival feedback, arbitration, narration channel) is reused verbatim from the vendored engine. Two pieces: * TourStore — file-IO CRUD for tour definitions (data/tours/.json). * TourRuntime — a background-thread state machine that runs one tour, stop by stop: acquire nav → goto+arm → BLOCK on arrival (the nav arbiter is released by navigation.goal_monitor on any terminal status) → narrate via live_sub.send_state → optional gesture/expression → dwell → advance → release. Degradation (driven by license flags, never crashes): * no navigation entitlement / web_nav3 unreachable → "preset" stops (narrate in place, no drive). * no P2 (mask/multilingual) → single-language narration, no gesture/expression. * no P3 (recognition) → generic greeting (the orchestrator skips the recognition primer; handled at construct time). Kept Python-3.8 compatible. Pure threads + time.sleep (no asyncio in the runtime). """ from __future__ import annotations import json import os import re import threading import time from pathlib import Path from typing import Any, Callable, Dict, List, Optional def _now() -> str: return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) def _slug(name: str) -> str: s = re.sub(r"[^a-zA-Z0-9_-]+", "-", (name or "").strip().lower()).strip("-") return s or "tour" # ─────────────────────────── store ─────────────────────────── class TourStore(object): """CRUD for tour definitions. A tour = {id, name, stops:[Stop], created}. Stop = {place, narration, expression, gesture, dwell_sec, greet}.""" def __init__(self, root: Optional[str] = None): if not root: root = os.environ.get("SANAD_TOURS_DIR", "") if not root: try: from Project.Sanad.config import BASE_DIR root = str(Path(BASE_DIR) / "data" / "tours") except Exception: root = str(Path.cwd() / "data" / "tours") self.root = Path(root) self._lock = threading.RLock() try: self.root.mkdir(parents=True, exist_ok=True) except Exception: pass def _path(self, tid: str) -> Path: return self.root / ("%s.json" % tid) def _write(self, tid: str, data: Dict[str, Any]) -> None: tmp = self.root / ("%s.json.tmp" % tid) tmp.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8") tmp.replace(self._path(tid)) def list(self) -> List[Dict[str, Any]]: out = [] with self._lock: for p in sorted(self.root.glob("*.json")): try: out.append(json.loads(p.read_text(encoding="utf-8"))) except Exception: continue return out def get(self, tid: str) -> Optional[Dict[str, Any]]: with self._lock: p = self._path(tid) if not p.exists(): return None try: return json.loads(p.read_text(encoding="utf-8")) except Exception: return None def _norm_stops(self, stops: Any) -> List[Dict[str, Any]]: out = [] for s in (stops or []): if not isinstance(s, dict): continue out.append({ "place": str(s.get("place", "") or ""), "narration": str(s.get("narration", "") or ""), "expression": str(s.get("expression", "") or ""), "gesture": str(s.get("gesture", "") or ""), "greet": bool(s.get("greet", False)), "dwell_sec": float(s.get("dwell_sec", 8) or 8), }) return out def save(self, name: str, stops: Any, tid: Optional[str] = None) -> Dict[str, Any]: with self._lock: if not tid: base = _slug(name) tid = base n = 1 while self._path(tid).exists(): n += 1 tid = "%s-%d" % (base, n) existing = self.get(tid) or {} data = { "id": tid, "name": name or tid, "stops": self._norm_stops(stops), "created": existing.get("created", _now()), "updated": _now(), } self._write(tid, data) return data def delete(self, tid: str) -> bool: with self._lock: p = self._path(tid) if not p.exists(): return False try: p.unlink() return True except Exception: return False # ─────────────────────────── runtime ─────────────────────────── class TourRuntime(object): """Runs ONE tour at a time in a daemon thread. All nav primitives are injected (loosely coupled) so a missing subsystem simply degrades.""" def __init__(self, store, nav_client=None, goal_monitor=None, arbiter=None, live_sub=None, mask_face=None, log=None, has_nav: bool = False, has_mask: bool = False): self.store = store self.nav = nav_client self.gm = goal_monitor # navigation.goal_monitor module (arm_goal/request_cancel) self.arb = arbiter # dashboard.routes._arbiter module (acquire/nav_active/release) self.live_sub = live_sub self.mask_face = mask_face self.log = log self.has_nav = bool(has_nav and nav_client is not None) self.has_mask = bool(has_mask and mask_face is not None) self.timeout = float(os.environ.get("SANAD_NAV_GOAL_TIMEOUT_S", "240")) self._lock = threading.Lock() self._thread = None # type: Optional[threading.Thread] self._state = "idle" # idle|running|paused|stopping|done|error self._tour_id = "" self._tour_name = "" self._idx = -1 self._place = "" self._total = 0 self._started = "" self._msg = "" self._skip = threading.Event() # -- helpers -- def _li(self, msg, *a): if self.log: try: self.log.info(msg, *a) except Exception: pass def _narrate(self, event, text): if self.live_sub is not None and hasattr(self.live_sub, "send_state") and text: try: self.live_sub.send_state(event, text) except Exception: self._li("tour: send_state failed") def _ensure_live(self): ls = self.live_sub if ls is None: return try: if hasattr(ls, "is_running") and not ls.is_running() and hasattr(ls, "start"): ls.start() except Exception: self._li("tour: could not start live session") # -- public control -- def status(self) -> Dict[str, Any]: with self._lock: return { "state": self._state, "tour_id": self._tour_id, "tour_name": self._tour_name, "stop_index": self._idx, "stop_place": self._place, "total_stops": self._total, "started_at": self._started, "message": self._msg, "capabilities": {"navigation": self.has_nav, "mask": self.has_mask}, } def start(self, tour_id: str) -> Dict[str, Any]: with self._lock: if self._state in ("running", "paused", "stopping"): return {"ok": False, "error": "a tour is %s (%s) — stop it and wait first" % (self._state, self._tour_id)} # A stop() that just set state=idle may still have a worker draining # its nav cancel — never spawn a 2nd concurrent tour thread. if self._thread is not None and self._thread.is_alive(): return {"ok": False, "error": "previous tour is still finishing — retry shortly"} tour = self.store.get(tour_id) if tour is None: return {"ok": False, "error": "no tour '%s'" % tour_id} self._state = "running" self._tour_id = tour_id self._tour_name = tour.get("name", tour_id) self._total = len(tour.get("stops") or []) self._idx = -1 self._place = "" self._started = _now() self._msg = "" self._skip.clear() self._thread = threading.Thread(target=self._run, args=(tour,), daemon=True, name="tour") self._thread.start() return {"ok": True, "started": tour_id, "stops": self._total, "degraded": (not self.has_nav)} def pause(self) -> Dict[str, Any]: with self._lock: if self._state == "running": self._state = "paused" return self.status() def resume(self) -> Dict[str, Any]: with self._lock: if self._state == "paused": self._state = "running" return self.status() def skip(self) -> Dict[str, Any]: self._skip.set() return {"ok": True, "skipping": self._idx} def stop(self) -> Dict[str, Any]: with self._lock: if self._state in ("idle", "done", "error"): return {"ok": True, "state": self._state} self._state = "stopping" # cancel any in-flight nav goal + disarm the monitor + free the arbiter try: if self.gm is not None and hasattr(self.gm, "request_cancel"): self.gm.request_cancel() except Exception: pass self._disarm_gm() self._release_nav() return {"ok": True, "state": "stopping"} # -- nav helpers -- def _release_nav(self): try: if self.arb is not None and self.arb.nav_active(): self.arb.release_nav() except Exception: pass def _disarm_gm(self): """Stop goal_monitor watching. MUST fire on any armed-but-no-goal exit, else its 240s watchdog later pushes a bogus 'couldn't get there' to the child.""" try: if self.gm is not None and hasattr(self.gm, "disarm"): self.gm.disarm() except Exception: pass def _drive_to(self, place: str) -> str: """Drive to `place` and BLOCK until arrival/terminal. Returns a short outcome string. Reuses goal_monitor's arbiter-release as the arrival signal.""" if not (self.has_nav and place): return "preset" try: if not self.arb.acquire_nav(): return "nav_busy" # manual loco holds the legs except Exception: return "nav_unavailable" try: if self.gm is not None and hasattr(self.gm, "arm_goal"): self.gm.arm_goal(place) # watch for arrival; releases arbiter on terminal res = self.nav.goto(place) # fire-and-forget Nav2 goal if not (isinstance(res, dict) and res.get("ok")): self._disarm_gm(); self._release_nav() return "goto_failed" except Exception: self._disarm_gm(); self._release_nav() return "goto_error" # BLOCK: goal_monitor releases the arbiter on the terminal status. waited = 0.0 while waited < self.timeout: if self._state == "stopping": self._disarm_gm() # goal_monitor still armed on our behalf return "stopped" try: if not self.arb.nav_active(): return "arrived" # terminal reached (goal_monitor freed nav + disarmed) except Exception: return "arrived" time.sleep(1.0) waited += 1.0 # watchdog — never hang a tour try: if self.gm is not None and hasattr(self.gm, "request_cancel"): self.gm.request_cancel() except Exception: pass self._disarm_gm(); self._release_nav() return "timeout" # -- the run loop -- def _run(self, tour: Dict[str, Any]): self._ensure_live() stops = tour.get("stops") or [] try: for i, stop in enumerate(stops): # pause / stop gates while True: with self._lock: st = self._state if st == "stopping": raise _Stopped() if st != "paused": break time.sleep(0.5) self._skip.clear() with self._lock: self._idx = i self._place = stop.get("place", "") self._li("tour '%s' stop %d/%d → %s", self._tour_id, i + 1, len(stops), self._place or "(in place)") outcome = self._drive_to(stop.get("place", "")) if outcome not in ("arrived", "preset"): self._li("tour stop %d (%s): NOT reached (%s) — narrating in place", i + 1, self._place or "?", outcome) if self._state == "stopping": raise _Stopped() # narrate this stop (the child speaks it in the configured language; # multilingual only if P2 entitled — handled by the persona). narration = stop.get("narration") or "" if narration: self._narrate("tour_stop", narration) # gesture / expression on the mask (P2 only). if self.has_mask and self.mask_face is not None: expr = stop.get("expression") if expr: try: self.mask_face.react(str(expr)) except Exception: self._li("tour: mask react failed") # dwell (interruptible by skip/stop) dwell = float(stop.get("dwell_sec", 8) or 8) t = 0.0 while t < dwell: if self._state == "stopping": raise _Stopped() if self._skip.is_set(): break time.sleep(0.5) t += 0.5 with self._lock: self._state = "done" self._msg = "tour complete" self._narrate("tour_done", "The tour is complete. Thank the guest warmly.") except _Stopped: with self._lock: self._state = "idle" self._msg = "stopped" self._release_nav() except Exception as exc: with self._lock: self._state = "error" self._msg = str(exc) self._release_nav() self._li("tour run crashed: %s", exc) class _Stopped(Exception): pass