#!/usr/bin/env python3 """Sanad — unified robot assistant entry point. Starts all subsystems and the FastAPI dashboard. python main.py # default port 8000 python main.py --port 8080 # custom port """ from __future__ import annotations import argparse import importlib import os import sys import types from pathlib import Path # ───────────────────────────────────────────────────────────────────────────── # Layout detection — support BOTH: # 1. Dev layout: /Project/Sanad/main.py (imports use Project.Sanad.*) # 2. Deployed layout: /home/unitree/Sanad/main.py (no Project/ wrapper) # # In the deployed case we synthesize a `Project` namespace package and alias # `Project.Sanad` → the local `Sanad` package, so every `from Project.Sanad.X # import Y` keeps working without rewriting any other file. # ───────────────────────────────────────────────────────────────────────────── _THIS_DIR = Path(__file__).resolve().parent # .../Sanad _PARENT = _THIS_DIR.parent # .../Project OR /home/unitree if _PARENT.name == "Project": # Dev layout — add the directory containing Project/ _ROOT = _PARENT.parent if str(_ROOT) not in sys.path: sys.path.insert(0, str(_ROOT)) # This codebase imports itself as `Project.Sanad.*`. If this folder is a copy # under a different name (e.g. Sanadv3), alias Project.Sanad → THIS package so # it imports its OWN modules, not the sibling Project/Sanad. (The original # Sanad folder is unaffected — this only triggers for renamed copies.) if _THIS_DIR.name != "Sanad" and "Project.Sanad" not in sys.modules: _self_pkg = importlib.import_module(f"Project.{_THIS_DIR.name}") sys.modules["Project.Sanad"] = _self_pkg sys.modules["Project"].Sanad = _self_pkg # type: ignore[attr-defined] else: # Deployed layout — create a virtual Project package and alias if str(_PARENT) not in sys.path: sys.path.insert(0, str(_PARENT)) if "Project" not in sys.modules: _proj = types.ModuleType("Project") _proj.__path__ = [] # mark as namespace package sys.modules["Project"] = _proj if "Project.Sanad" not in sys.modules: # Import the local Sanad package as a top-level module first _sanad = importlib.import_module(_THIS_DIR.name) sys.modules["Project.Sanad"] = _sanad sys.modules["Project"].Sanad = _sanad # type: ignore[attr-defined] # When main.py runs as a script (`python3 main.py`), Python loads it as the # `__main__` module — NOT as `Project.Sanad.main`. Route handlers later do # `from Project.Sanad.main import arm` etc; without the alias below, Python # would re-execute this file from scratch under a different module name, # creating a SECOND set of subsystem instances (uninitialised). Every # `subsystem not available` / `No LowState` symptom traces back to this. # The alias ensures both names point at the exact same module object. if __name__ == "__main__": sys.modules["Project.Sanad.main"] = sys.modules["__main__"] # asyncio compat shim — backfills asyncio.to_thread for Python 3.8. # MUST be imported before any other Sanad module that uses asyncio.to_thread. from Project.Sanad.core import asyncio_compat # noqa: F401 from Project.Sanad.config import ( DASHBOARD_HOST, DASHBOARD_PORT, DASHBOARD_INTERFACE, DDS_NETWORK_INTERFACE, ) from Project.Sanad.core.logger import get_logger log = get_logger("main") def _safe_import(label: str, importer): """Import a module by callable, returning None if it fails.""" try: return importer() except Exception: log.exception("Failed to import %s — that subsystem will be unavailable", label) return None def _safe_construct(name: str, factory): """Construct a subsystem, log + return None on failure.""" if factory is None: return None try: return factory() except Exception: log.exception("Failed to construct %s — that subsystem will be unavailable", name) return None # ── isolated imports — one bad module never blocks the others ── Brain = _safe_import("Brain", lambda: __import__("Project.Sanad.core.brain", fromlist=["Brain"]).Brain) ArmController = _safe_import("ArmController", lambda: __import__("Project.Sanad.motion.arm_controller", fromlist=["ArmController"]).ArmController) MacroPlayer = _safe_import("MacroPlayer", lambda: __import__("Project.Sanad.motion.macro_player", fromlist=["MacroPlayer"]).MacroPlayer) MacroRecorder = _safe_import("MacroRecorder", lambda: __import__("Project.Sanad.motion.macro_recorder", fromlist=["MacroRecorder"]).MacroRecorder) TeachingSession = _safe_import("TeachingSession", lambda: __import__("Project.Sanad.motion.teaching", fromlist=["TeachingSession"]).TeachingSession) AudioManager = _safe_import("AudioManager", lambda: __import__("Project.Sanad.voice.audio_manager", fromlist=["AudioManager"]).AudioManager) LocalTTSEngine = _safe_import("LocalTTSEngine", lambda: __import__("Project.Sanad.voice.local_tts", fromlist=["LocalTTSEngine"]).LocalTTSEngine) WakePhraseManager = _safe_import("WakePhraseManager", lambda: __import__("Project.Sanad.voice.wake_phrase_manager", fromlist=["WakePhraseManager"]).WakePhraseManager) LiveVoiceLoop = _safe_import("LiveVoiceLoop", lambda: __import__("Project.Sanad.voice.live_voice_loop", fromlist=["LiveVoiceLoop"]).LiveVoiceLoop) TypedReplayEngine = _safe_import("TypedReplayEngine", lambda: __import__("Project.Sanad.voice.typed_replay", fromlist=["TypedReplayEngine"]).TypedReplayEngine) GeminiVoiceClient = _safe_import("GeminiVoiceClient", lambda: __import__("Project.Sanad.gemini.client", fromlist=["GeminiVoiceClient"]).GeminiVoiceClient) GeminiSubprocess = _safe_import("GeminiSubprocess", lambda: __import__("Project.Sanad.gemini.subprocess", fromlist=["GeminiSubprocess"]).GeminiSubprocess) LocalSubprocess = _safe_import("LocalSubprocess", lambda: __import__("Project.Sanad.local.subprocess", fromlist=["LocalSubprocess"]).LocalSubprocess) CameraDaemon = _safe_import("CameraDaemon", lambda: __import__("Project.Sanad.vision.camera", fromlist=["CameraDaemon"]).CameraDaemon) FaceGallery = _safe_import("FaceGallery", lambda: __import__("Project.Sanad.vision.face_gallery", fromlist=["FaceGallery"]).FaceGallery) ZoneGallery = _safe_import("ZoneGallery", lambda: __import__("Project.Sanad.vision.zone_gallery", fromlist=["ZoneGallery"]).ZoneGallery) LocoController = _safe_import("LocoController", lambda: __import__("Project.Sanad.G1_Controller.loco_controller", fromlist=["LocoController"]).LocoController) MovementDispatcher = _safe_import("MovementDispatcher", lambda: __import__("Project.Sanad.voice.movement_dispatch", fromlist=["MovementDispatcher"]).MovementDispatcher) FaceController = _safe_import("FaceController", lambda: __import__("Project.Sanad.face.mask_face", fromlist=["FaceController"]).FaceController) WebNav3Client = _safe_import("WebNav3Client", lambda: __import__("Project.Sanad.navigation", fromlist=["WebNav3Client"]).WebNav3Client) # ── global instances (imported by route modules) ── brain = _safe_construct("brain", Brain) if Brain else None arm = _safe_construct("arm", ArmController) audio_mgr = _safe_construct("audio_mgr", AudioManager) # The voice_client speaks TYPED text (typed-replay + /api/voice/generate), so it # uses the multilingual verbatim TTS prompt — NOT the Khaleeji persona, which # forced every language to Arabic. (The live conversation uses live_sub, not # this client; live_voice only reads its connection flag.) def _build_voice_client(): from Project.Sanad.gemini.client import TTS_SYSTEM_PROMPT return GeminiVoiceClient(system_prompt=TTS_SYSTEM_PROMPT) voice_client = _safe_construct("voice_client", _build_voice_client if GeminiVoiceClient else None) local_tts = _safe_construct("local_tts", LocalTTSEngine) wake_mgr = _safe_construct("wake_mgr", WakePhraseManager) macro_rec = _safe_construct("macro_rec", (lambda: MacroRecorder(arm)) if (MacroRecorder and arm) else None) macro_play = _safe_construct("macro_play", (lambda: MacroPlayer(audio_mgr, arm)) if (MacroPlayer and arm) else None) teacher = _safe_construct("teacher", (lambda: TeachingSession(arm)) if (TeachingSession and arm) else None) live_voice = _safe_construct("live_voice", (lambda: LiveVoiceLoop(voice_client, arm, wake_mgr, audio_mgr)) if (LiveVoiceLoop and voice_client and arm and wake_mgr and audio_mgr) else None) # Which voice supervisor to mount. SANAD_VOICE_BRAIN chooses the brain # that runs INSIDE the subprocess (see voice/sanad_voice.py); the same # env var picks WHICH supervisor here manages that subprocess so its # log-line parser matches the brain's emit format. _brain_choice = os.environ.get("SANAD_VOICE_BRAIN", "gemini").strip().lower() if _brain_choice == "local" and LocalSubprocess is not None: live_sub = _safe_construct("live_sub", LocalSubprocess) else: live_sub = _safe_construct("live_sub", GeminiSubprocess) typed_replay = _safe_construct("typed_replay", (lambda: TypedReplayEngine(voice_client, audio_mgr)) if (TypedReplayEngine and voice_client and audio_mgr) else None) # ── LED face mask (Mask project) — BLE animated face, own asyncio loop ─────── # Constructs idle (no BLE); the dashboard "Mask Face" tab connects on demand. # Unavailable (None) if the Mask lib / bleak / Pillow aren't importable. mask_face = _safe_construct("mask_face", FaceController) # ── Locomotion controller (N2) — manual dashboard locomotion ──────────────── # Reuses the arm controller's single ChannelFactoryInitialize (one DDS init per # process) — it does NOT init DDS itself. Disarmed every boot. See # G1_Controller/loco_controller.py and dashboard/routes/controller.py. loco_controller = _safe_construct( "loco_controller", (lambda: LocoController(arm)) if (LocoController and arm) else None) # Arm ⇄ locomotion mutual exclusion: the arm must NEVER run a replay / SDK # action / gesture while the robot may be walking. `movement_active` is True for # the MANUAL gate (armed/teleop) AND for ~1.5s after any move/step — so it also # covers Phase-3 Gemini-driven moves (which call loco.move/step directly). # Checked at every arm playback chokepoint (replay_file / _execute), so it blocks # voice/Gemini-triggered gestures too, not just the dashboard. if arm is not None and loco_controller is not None: try: if hasattr(arm, "set_motion_block"): arm.set_motion_block(loco_controller.movement_active) log.info("Arm motion-block wired to locomotion movement_active") except Exception: log.exception("Could not wire arm motion-block") # The voice→arm path (live_voice_loop) drives the SEPARATE singleton # motion.sanad_arm_controller.ARM, not the `arm` instance above. Wire the SAME # locomotion interlock onto it so a spoken gesture can't move the arms while # the robot is (or just was) walking — otherwise the motion-block above would # only cover the dashboard/Gemini-replay path, not voice triggers. if loco_controller is not None: try: from Project.Sanad.motion.sanad_arm_controller import ARM as _sanad_arm if hasattr(_sanad_arm, "set_motion_block"): _sanad_arm.set_motion_block(loco_controller.movement_active) log.info("Voice arm (sanad_arm) motion-block wired to locomotion movement_active") except Exception: log.exception("Could not wire sanad_arm motion-block") # ── Gemini voice → movement dispatcher (N2 Phase 3) ───────────────────────── # Reads Gemini's spoken (BOT) transcript via the live supervisor's bot-callback # and drives loco_controller on a confirmation-phrase match (Marcus pattern). # Gated on recognition_state.movement_enabled (the "Enable Gemini movement" # toggle) — SEPARATE from the manual arm flag. Inert until that flag is on. movement_dispatch = None if MovementDispatcher and loco_controller is not None: try: from Project.Sanad.config import BASE_DIR as _BD2, MOTIONS_DIR as _MD movement_dispatch = _safe_construct( "movement_dispatch", lambda: MovementDispatcher( loco_controller, _MD / "instruction.json", _BD2 / "data" / ".recognition_state.json")) if movement_dispatch is not None: movement_dispatch.start() if live_sub is not None and hasattr(live_sub, "register_bot_callback"): live_sub.register_bot_callback(movement_dispatch.on_bot_text) log.info("Movement dispatcher wired to Gemini BOT transcript") except Exception: log.exception("Could not wire movement dispatcher") # ── Navigation (web_nav3 Nav2 stack) — thin HTTP client ───────────────────── # Loosely-coupled client to the standalone web_nav3 service (FastAPI :8765 + # rosbridge :9090). Owns NO ROS2/Nav2 code; if web_nav3 is down the nav routes # degrade gracefully. The dashboard "Navigation" tab routes (dashboard/routes/ # navigation.py) build their own module-level client, so this singleton is the # parent-side handle used by voice/movement wiring and the subsystem report. # Config precedence (highest first): env var → dashboard config 'navigation' # section → hardcoded default — same resolution as the navigation route. def _build_nav_client(): from Project.Sanad.core.config_loader import section as _cfg_section nav_cfg = _cfg_section("dashboard", "navigation") base_url = (os.environ.get("WEB_NAV3_URL") or nav_cfg.get("web_nav3_url") or "http://127.0.0.1:8765") robot = (os.environ.get("SANAD_ROBOT_NAME") or nav_cfg.get("robot") or "sanad") return WebNav3Client(base_url=str(base_url), robot=str(robot)) nav_client = _safe_construct("nav_client", _build_nav_client if WebNav3Client else None) # ── Recognition (camera + face gallery) ───────────────────────────────────── # Camera is idle until the dashboard toggles vision on; face gallery is pure # file IO and always available if the import succeeded. # # Config precedence (highest first): explicit env var → config/core_config.json # section → hardcoded default. The parent process normally has no SANAD_CAMERA_* # env vars (LIVE_TUNE is only forwarded to the Gemini child), so in practice the # core_config.json `camera` / `faces` sections are the live source here. def _build_camera(): from Project.Sanad.core.config_loader import section as _cfg_section cam_cfg = _cfg_section("core", "camera") def _knob(env_key: str, cfg_key: str, default): env_val = os.environ.get(env_key) if env_val is not None and env_val != "": return type(default)(env_val) return type(default)(cam_cfg.get(cfg_key, default)) # Frames are cached in memory and pushed to the Gemini child over its # stdin (see GeminiSubprocess._frame_forwarder) — no file drop. return CameraDaemon( width=_knob("SANAD_CAMERA_WIDTH", "width", 424), height=_knob("SANAD_CAMERA_HEIGHT", "height", 240), fps=_knob("SANAD_CAMERA_FPS", "fps", 15), jpeg_quality=_knob("SANAD_CAMERA_JPEG_QUALITY", "jpeg_quality", 70), stale_threshold_s=float(cam_cfg.get("stale_threshold_s", 10.0)), reconnect_min_s=float(cam_cfg.get("reconnect_min_s", 2.0)), reconnect_max_s=float(cam_cfg.get("reconnect_max_s", 10.0)), capture_timeout_ms=int(cam_cfg.get("capture_timeout_ms", 5000)), ) def _build_gallery(): from Project.Sanad.config import BASE_DIR from Project.Sanad.core.config_loader import section as _cfg_section faces_cfg = _cfg_section("core", "faces") # SANAD_FACES_DIR is set absolute by LIVE_TUNE (the Gemini child reads the # same var). In the parent it's usually unset → fall back to the JSON's # dir_rel, then the hardcoded default. Honour absolute paths as-is. raw = os.environ.get("SANAD_FACES_DIR") or faces_cfg.get("dir_rel", "data/faces") p = Path(raw) root = p if p.is_absolute() else (BASE_DIR / raw) return FaceGallery(root) def _build_zone_gallery(): # N3 — zones gallery (zone → place → linked faces). Honours SANAD_ZONES_DIR # (absolute) then the core_config 'zones' section dir_rel, then a default. from Project.Sanad.config import BASE_DIR from Project.Sanad.core.config_loader import section as _cfg_section zones_cfg = _cfg_section("core", "zones") raw = os.environ.get("SANAD_ZONES_DIR") or zones_cfg.get("dir_rel", "data/zones") p = Path(raw) root = p if p.is_absolute() else (BASE_DIR / raw) return ZoneGallery(root) camera = _safe_construct("camera", _build_camera if CameraDaemon else None) gallery = _safe_construct("gallery", _build_gallery if FaceGallery else None) zone_gallery = _safe_construct("zone_gallery", _build_zone_gallery if ZoneGallery else None) # Restore persisted vision_enabled at boot — start camera if the user left # it on across a reboot. Face-rec state is read by the Gemini child directly. try: from Project.Sanad.vision import recognition_state as _recog_state from Project.Sanad.config import BASE_DIR as _BD _state = _recog_state.read(_BD / "data" / ".recognition_state.json") if _state.vision_enabled and camera is not None: if camera.start(): log.info("Camera vision restored from state (backend=%s)", camera.backend) else: log.warning("Camera vision was ON but no backend available — leaving OFF") _recog_state.mutate(_BD / "data" / ".recognition_state.json", vision_enabled=False) except Exception: log.exception("Could not restore recognition state") # Hand the camera to the Gemini supervisor so it can forward frames to the # child over stdin while a live session runs. if live_sub is not None and camera is not None: try: if hasattr(live_sub, "attach_camera"): live_sub.attach_camera(camera) log.info("Camera attached to live subprocess supervisor") except Exception: log.exception("attach_camera failed") # Hand the AudioManager to the supervisor so the audio watcher can keep # PulseAudio defaults aligned with the live profile on every Anker # plug/unplug. Without this, typed-replay / record playback would stay on # the boot device even after the live session swapped to Anker. if live_sub is not None and audio_mgr is not None: try: if hasattr(live_sub, "attach_audio_manager"): live_sub.attach_audio_manager(audio_mgr) log.info("AudioManager attached to live subprocess supervisor") except Exception: log.exception("attach_audio_manager failed") # ── Motion-state → Gemini channel ─────────────────────────────────────────── # The arm controller emits motion.action_started / _done / _error on the bus. # Forward each to the Gemini child as a 'state:' line so the live session can # answer "what are you doing?" honestly. Sync handlers, fired via emit_sync # from the arm's worker thread — send_state just writes to a pipe (cheap). if live_sub is not None and hasattr(live_sub, "send_state"): try: from Project.Sanad.core.event_bus import bus as _bus def _on_motion_started(action: str = "", **_kw): live_sub.send_state("start", action) def _on_motion_done(action: str = "", elapsed_sec=None, failed: bool = False, **_kw): # action_error already covered the failure case with a reason; # here just emit complete (skip if it failed to avoid a dup). if not failed: live_sub.send_state("complete", action, elapsed_sec=elapsed_sec) def _on_motion_error(action: str = "", reason: str = "", **_kw): live_sub.send_state("error", action, reason=reason) _bus.on("motion.action_started", _on_motion_started) _bus.on("motion.action_done", _on_motion_done) _bus.on("motion.action_error", _on_motion_error) log.info("Motion-state → Gemini channel wired") except Exception: log.exception("Could not wire motion-state → Gemini channel") # Animate the LED face mask while the robot is "speaking". Hooked to the # gestural-speaking toggle (brain.gestural_speaking_changed); finer per-utterance # lip-sync from TTS amplitude is a follow-up. Safe no-op until the face is started. if mask_face is not None: try: from Project.Sanad.core.event_bus import bus as _bus_face def _on_gestural_speaking(enabled: bool = False, **_kw): try: mask_face.set_speaking(bool(enabled)) if not enabled: mask_face.set_listening() # back to attentive after a reply except Exception: log.exception("mask_face.set_speaking failed") _bus_face.on("brain.gestural_speaking_changed", _on_gestural_speaking) log.info("LED face wired to gestural-speaking events") except Exception: log.exception("Could not wire LED face speaking hook") # Real lip-sync: route Gemini's per-chunk [[MOUTH:n]] amplitude markers (emitted # by gemini/script.py, parsed by GeminiSubprocess) to the LED mask's mouth so it # opens/closes with the actual speech. Fires on the subprocess reader thread; # FaceController.set_mouth is thread-safe and a safe no-op until the face starts. if mask_face is not None and live_sub is not None and hasattr(live_sub, "register_mouth_callback"): try: def _on_mouth_level(level: int): if not getattr(mask_face, "_gemini_linked", False): return # Gemini not linked to the mask -> leave it alone try: mask_face.set_mouth(int(level)) except Exception: log.exception("mask_face.set_mouth (lip-sync) failed") live_sub.register_mouth_callback(_on_mouth_level) log.info("LED face wired to Gemini lip-sync (MOUTH markers)") except Exception: log.exception("Could not wire LED face lip-sync hook") # Gemini-driven expressions: [[FACE:name]] markers (from the set_expression tool) # -> a brief emotion reaction on the mask. Fires on the subprocess reader thread; # react() is thread-safe and a safe no-op until the face starts. if mask_face is not None and live_sub is not None and hasattr(live_sub, "register_face_callback"): try: # per-emotion hold (seconds): affection/reactions linger a touch longer _FACE_HOLD = {"heart": 2.6, "love": 2.6, "kiss": 2.4, "laugh": 2.2, "surprised": 1.8, "confused": 1.8} def _on_face_emotion(name: str): if not getattr(mask_face, "_gemini_linked", False): return # Gemini not linked to the mask -> ignore emotion markers try: mask_face.react(str(name), _FACE_HOLD.get(name, 1.6)) except Exception: log.exception("mask_face.react (emotion) failed") live_sub.register_face_callback(_on_face_emotion) log.info("LED face wired to Gemini emotions (FACE markers)") except Exception: log.exception("Could not wire LED face emotion hook") # Gemini-driven social QR: [[SHOW:account]] markers (from the show_social tool) # -> render the account's QR + show it on the mask via the shared helper. if mask_face is not None and live_sub is not None and hasattr(live_sub, "register_social_callback"): try: def _on_social(account: str): if not getattr(mask_face, "_gemini_linked", False): return # Gemini not linked to the mask -> ignore social markers # This fires on the subprocess READER THREAD, which must keep draining # stdout (lip-sync / transcript). show_social_on_mask does a ~9s BLE # scratch upload — so dispatch it to a daemon thread and return at once. def _run(acc=str(account)): try: from Project.Sanad.dashboard.routes.mask_social import show_social_on_mask show_social_on_mask(acc) except Exception: log.exception("show_social_on_mask failed") import threading as _th _th.Thread(target=_run, daemon=True, name="mask-social").start() live_sub.register_social_callback(_on_social) log.info("LED face wired to Gemini social QR (SHOW markers)") except Exception: log.exception("Could not wire LED face social hook") # Lifelike face behaviour: drive the LED face's state + reactions from bus events # so it looks alive and engaged during a conversation (attentive while listening, # looks-away while a reply is prepared, brief smile/sad reactions). All calls are # safe no-ops until the face is started, and on the basic FaceAnimator fallback. if mask_face is not None: try: from Project.Sanad.core.event_bus import bus as _bus_face2 def _face_listening(**_kw): try: mask_face.set_listening() except Exception: log.exception("face set_listening failed") def _face_thinking(**_kw): try: mask_face.set_thinking() except Exception: log.exception("face set_thinking failed") def _face_idle(**_kw): try: mask_face.set_idle() except Exception: log.exception("face set_idle failed") def _face_react(emotion): def _handler(**_kw): try: mask_face.react(emotion) except Exception: log.exception("face react failed") return _handler _bus_face2.on("voice.connected", _face_listening) # session up -> attentive _bus_face2.on("voice.user_said", _face_thinking) # heard user -> processing _bus_face2.on("voice.disconnected", _face_idle) _bus_face2.on("voice.error", _face_react("sad")) _bus_face2.on("motion.action_error", _face_react("sad")) _bus_face2.on("skill.finished", _face_react("smile")) # success -> happy log.info("LED face wired to lifelike state/reaction events") except Exception: log.exception("Could not wire LED face lifelike behaviour hooks") # Wire everything into the Brain (only what was constructed) def _safe_attach(method_name: str, value): if brain is None or value is None: return method = getattr(brain, method_name, None) if method is None: return try: method(value) except Exception: log.exception("brain.%s failed", method_name) _safe_attach("attach_voice", voice_client) _safe_attach("attach_audio_manager", audio_mgr) _safe_attach("attach_arm", arm) _safe_attach("attach_macro_recorder", macro_rec) _safe_attach("attach_macro_player", macro_play) _safe_attach("attach_live_voice", live_voice) # ── Runtime sanity report ──────────────────────────────────────────────── SUBSYSTEMS = { "brain": brain, "arm": arm, "audio_mgr": audio_mgr, "voice_client": voice_client, "local_tts": local_tts, "macro_rec": macro_rec, "macro_play": macro_play, "teacher": teacher, "wake_mgr": wake_mgr, "live_voice": live_voice, "live_sub": live_sub, "typed_replay": typed_replay, "camera": camera, "gallery": gallery, "zone_gallery": zone_gallery, "loco_controller": loco_controller, "movement_dispatch": movement_dispatch, "mask_face": mask_face, "nav_client": nav_client, } # Critical subsystems — if any of these are None, log a warning at startup CRITICAL_SUBSYSTEMS = ("brain",) for _name in CRITICAL_SUBSYSTEMS: if SUBSYSTEMS.get(_name) is None: log.error("CRITICAL subsystem '%s' is None — application will be unusable", _name) _available = [k for k, v in SUBSYSTEMS.items() if v is not None] _missing = [k for k, v in SUBSYSTEMS.items() if v is None] log.info("Subsystems available (%d): %s", len(_available), ", ".join(_available)) if _missing: log.warning("Subsystems unavailable (%d): %s", len(_missing), ", ".join(_missing)) _shutting_down = False def _call_with_timeout(label: str, fn, timeout_s: float = 2.0): """Run a possibly-blocking teardown call on a daemon thread and never wait more than ``timeout_s`` for it. If it hangs we log and move on — the daemon thread dies with the process at os._exit / interpreter exit. """ import threading def _runner(): try: fn() except Exception: log.exception("%s failed", label) t = threading.Thread(target=_runner, name=f"shutdown-{label}", daemon=True) t.start() t.join(timeout_s) if t.is_alive(): log.warning("%s did not finish within %.1fs — skipping (forced exit)", label, timeout_s) def _do_shutdown(from_signal: bool = False): """Clean shutdown — release hardware, stop background tasks. Idempotent. Never blocks more than a couple seconds on any single step: the loco StopMove is run on a watchdog thread (it can re-init / hang DDS during teardown), and tracked children are stopped early so Ctrl+C kills the whole tree fast. """ global _shutting_down if _shutting_down: return _shutting_down = True log.info("Shutdown requested") # ── Stop tracked child subprocesses FIRST ─────────────────────────── # The Gemini/local voice supervisor owns a real child OS process (and # forwards camera/audio to it). Kill it early so on Ctrl+C the child # tree dies fast even if a later step hangs. if live_sub is not None: try: running = live_sub.is_running() if callable(getattr(live_sub, "is_running", None)) else False if running: live_sub.stop() except Exception: log.exception("live_sub.stop() failed") if camera is not None: try: if camera.is_running(): camera.stop() except Exception: log.exception("camera.stop() failed") if arm is not None: try: if hasattr(arm, "cancel"): arm.cancel() except Exception: log.exception("arm.cancel() failed") try: if hasattr(arm, "disable"): arm.disable() except Exception: log.exception("arm.disable() failed") if movement_dispatch is not None: try: movement_dispatch.stop() except Exception: log.exception("movement_dispatch.stop() failed") # ── Loco stop — NON-BLOCKING ───────────────────────────────────────── # loco_controller.shutdown() does StopMove + disarm, but StopMove can # re-init / block on DDS during interpreter teardown. Only stop if a # client is actually live, and never wait more than ~2s on it. if loco_controller is not None: _loco_has_client = True try: # If the controller exposes a "client exists" probe, honour it so # we never trigger a lazy LocoClient re-init during teardown. for _attr in ("has_client", "is_armed", "_client"): if hasattr(loco_controller, _attr): _probe = getattr(loco_controller, _attr) _loco_has_client = bool(_probe() if callable(_probe) else _probe) break except Exception: _loco_has_client = True # probe failed — fall back to attempting it if _loco_has_client: _call_with_timeout("loco_controller.shutdown()", loco_controller.shutdown, timeout_s=2.0) else: log.info("loco_controller has no live client — skipping StopMove") if mask_face is not None: try: mask_face.shutdown() # disconnect BLE + stop the face loop except Exception: log.exception("mask_face.shutdown() failed") if audio_mgr is not None: try: if hasattr(audio_mgr, "close"): audio_mgr.close() except Exception: log.exception("audio_mgr.close() failed") log.info("Shutdown complete") import atexit # noqa: E402 atexit.register(_do_shutdown) # atexit is the fallback path (clean uvicorn return / interpreter exit). # The PRIMARY Ctrl+C path is the explicit SIGINT/SIGTERM handler installed # in main() — see _install_signal_handlers(). That handler fully takes over: # it runs the (idempotent, non-blocking) shutdown and then os._exit(0), so it # never returns to uvicorn. This avoids the old problem where uvicorn's own # handler and ours would fight — we just don't hand control back. A single # SIGINT therefore tears down every child and force-exits within ~2s. def _install_signal_handlers(): """Take over SIGINT/SIGTERM so one Ctrl+C kills EVERYTHING fast. We do NOT chain to uvicorn's handler: we stop tracked children + do a non-blocking loco stop, then os._exit(0) so the process dies immediately without ever returning to uvicorn or hanging in atexit. """ import signal def _handler(signum, _frame): try: log.warning("force shutdown (signal %s) — killing everything", signum) except Exception: pass try: _do_shutdown(from_signal=True) except Exception: try: log.exception("_do_shutdown raised during signal teardown") except Exception: pass # Hard-exit so even if uvicorn/atexit would hang we are gone. os._exit(0) for _sig in (signal.SIGINT, signal.SIGTERM): try: signal.signal(_sig, _handler) except Exception: log.exception("Could not install handler for signal %s", _sig) def _print_env_diagnostic(): """Print everything you'd need to debug a deployment issue.""" print("=" * 60) print("SANAD ENVIRONMENT DIAGNOSTIC") print("=" * 60) print(f"Python: {sys.version}") print(f"Executable: {sys.executable}") print(f"Platform: {sys.platform}") print(f"BASE_DIR: {_THIS_DIR}") print(f"Parent: {_PARENT}") print(f"Layout: {'dev (Project/Sanad)' if _PARENT.name == 'Project' else 'deployed (top-level Sanad)'}") print(f"Dashboard: {DASHBOARD_HOST}:{DASHBOARD_PORT} (interface: {DASHBOARD_INTERFACE})") print(f"DDS interface: {DDS_NETWORK_INTERFACE}") print() print("sys.path[0:8]:") for p in sys.path[:8]: print(f" {p}") print() print("Critical imports:") for mod_name in ("uvicorn", "fastapi", "pydantic", "starlette", "websockets", "httpx", "pyaudio", "pyrealsense2", "unitree_sdk2py", "ultralytics", "numpy", "cv2"): try: mod = __import__(mod_name) ver = getattr(mod, "__version__", "?") path = getattr(mod, "__file__", "?") print(f" ✓ {mod_name:18s} {ver:12s} {path}") except BaseException as exc: print(f" ✗ {mod_name:18s} {type(exc).__name__}: {exc}") print() print("Subsystems available (after constructing main module globals):") for name in sorted(SUBSYSTEMS): print(f" {'✓' if SUBSYSTEMS[name] is not None else '✗'} {name}") print("=" * 60) def main(): parser = argparse.ArgumentParser(description="Sanad Robot Assistant") parser.add_argument("--host", default=DASHBOARD_HOST, help=f"Dashboard bind address. Default is wlan0's IP " f"({DASHBOARD_HOST!r}). Override with SANAD_DASHBOARD_HOST " f"or SANAD_DASHBOARD_INTERFACE.") parser.add_argument("--port", type=int, default=DASHBOARD_PORT) parser.add_argument("--network", default=DDS_NETWORK_INTERFACE, help="DDS network interface (e.g. eth0, lo). " "Override with SANAD_DDS_INTERFACE env var.") parser.add_argument("--check-env", action="store_true", help="Print environment diagnostic and exit " "(no server start, no hardware init)") args = parser.parse_args() if args.check_env: _print_env_diagnostic() return # Install our SIGINT/SIGTERM handler EARLY — before any hardware init or # uvicorn.run() — so a single Ctrl+C at any point forces a fast, clean # teardown of every child and exits the process. _install_signal_handlers() log.info("Sanad starting — Python %s @ %s", sys.version.split()[0], sys.executable) log.info("BASE_DIR: %s", _THIS_DIR) log.info("Dashboard interface: %s → bound to %s", DASHBOARD_INTERFACE, args.host) log.info("Starting Sanad — host=%s port=%d network=%s", args.host, args.port, args.network) if brain is not None: try: log.info("Brain status: %s", brain.status()) except Exception: log.exception("brain.status() failed") # Initialize hardware (graceful if unavailable) if arm is not None: try: arm.init(network_interface=args.network) except Exception: log.exception("arm.init() failed — continuing without hardware") # ── import uvicorn ────────────────────────────────────────────────── # Catch ANY exception (not just ImportError) so the real failure reason # is surfaced. The previous narrow catch hid issues like uvicorn pulling # in a broken transitive dep, or being installed under a different # site-packages than the active interpreter. uvicorn = None try: import uvicorn # type: ignore log.info("uvicorn %s loaded from %s", getattr(uvicorn, "__version__", "?"), getattr(uvicorn, "__file__", "?")) except BaseException as exc: log.error("Could not import uvicorn: %s: %s", type(exc).__name__, exc) log.error("Python: %s", sys.executable) log.error("sys.path[0:5]: %s", sys.path[:5]) log.error("Try: %s -m pip install --user 'uvicorn[standard]' fastapi", sys.executable) sys.exit(1) # ── import the FastAPI app ────────────────────────────────────────── # Pass the app object directly so uvicorn doesn't have to re-resolve the # import path (which differs between dev and deployed layouts). try: from Project.Sanad.dashboard.app import app as _app except BaseException: log.exception("Could not import Dashboard.app — aborting") sys.exit(1) # ── start the server ──────────────────────────────────────────────── try: uvicorn.run( _app, host=args.host, port=args.port, log_level="info", ) except BaseException: log.exception("uvicorn.run() failed") sys.exit(1) if __name__ == "__main__": main()