#!/usr/bin/env python3
"""Sanad Package 2 — Premium Communication launcher.

P2 = a SUPERSET of P1: hands-free Gemini conversation (multilingual
auto-detect), voice-command ARM gestures + gestures-while-speaking,
wake-phrases, skills, and a lip-syncing LED "Shining Mask".
Voice-command LOCOMOTION is DEFERRED.

Like P1, this is a *containerization wrapper* around the canonical engine —
here the engine is vendored from SanadV3 (which already carries the mask/face
subsystem and the evolved voice/audio/arm). It:

  1. bootstraps the `Project.Sanad` namespace + the flat Mask lib path,
  2. constructs the P2 superset subsystems (comms + arm + mask),
  3. wires lip-sync ([[MOUTH:n]] markers), gestures-while-speaking, and
     lifelike face state, + the arm motion-block interlock seam (no-op while
     loco deferred),
  4. injects a P2-scoped `Project.Sanad.main` shim,
  5. mounts the P1 + premium dashboard routers + the logs websocket,
  6. serves the SanadV3 SPA with non-P2 tabs hidden, on :8012.

Kept Python-3.8 compatible.
"""
from __future__ import annotations

import atexit
import importlib
import os
import sys
import types
from pathlib import Path

# ── 1. namespace bootstrap (mirrors app_p1.py; deployed layout) ──────────────
# In the image: /app/Sanad is the canonical package, /app is on sys.path, and the
# flat BLE Mask lib lives at /app/mask (its OWN path — it uses flat imports).
_APP = Path(os.environ.get("SANAD_APP_DIR", "/app"))
_SANAD_DIR = _APP / "Sanad"
if str(_APP) not in sys.path:
    # Front of sys.path so the vendored engine wins over any installed copy.
    sys.path.insert(0, str(_APP))

# Mask lib on its own path (NOT under Sanad/). FaceController also inserts it, but
# set it here so the env + sys.path are consistent before any face import.
_MASK_DIR = os.environ.setdefault("SANAD_MASK_DIR", str(_APP / "mask"))
if _MASK_DIR and _MASK_DIR not in sys.path:
    sys.path.insert(0, _MASK_DIR)

# Alias the flat `Sanad` package as `Project.Sanad` so the engine's absolute
# `Project.Sanad.*` imports resolve inside the image.
if "Project" not in sys.modules:
    _proj = types.ModuleType("Project")
    _proj.__path__ = []  # namespace package
    sys.modules["Project"] = _proj
if "Project.Sanad" not in sys.modules:
    _sanad = importlib.import_module("Sanad")
    sys.modules["Project.Sanad"] = _sanad
    sys.modules["Project"].Sanad = _sanad  # type: ignore[attr-defined]

# asyncio.to_thread shim for Py3.8 — before any Sanad module that uses it.
from Project.Sanad.core import asyncio_compat  # noqa: E402,F401
from Project.Sanad.core.logger import get_logger  # noqa: E402

log = get_logger("pkg2.app")

PACKAGE = "P2"
PACKAGE_TITLE = "Sanad — Premium Communication (P2)"

# SanadV3 SPA tab ids (nav is a `.tab` element whose onclick calls
# switchTab('X'); content is #tab-X). P2 SHOWS a subset; live-voice /
# wake-phrases / skills are controls WITHIN the shown tabs
# (operations/voice/motion), not separate tabs.
P2_SPA_TABS = ["operations", "voice", "motion", "mask", "recordings", "settings"]
P2_SPA_HIDE = ["controller", "navigation", "livemap", "mapeditor", "recognition",
               "temp", "terminal"]
# Routers P2 does NOT mount (no backing subsystem) — the SPA polls these; short-
# circuit them client-side to an empty 200 so api() never toasts "Not Found".
P2_UNMOUNTED = ["/api/nav", "/api/zones", "/api/recognition", "/api/controller"]


def _safe(name, factory):
    """Call ``factory()`` and return its result, or ``None`` if it raises.

    The failure is logged with its traceback under ``name`` so the launcher
    keeps starting in a degraded state instead of crashing.
    """
    try:
        return factory()
    except Exception:
        log.exception("P2: could not construct %s — degraded", name)
        return None


def _optional_class(name, module_path, attr):
    """Import ``attr`` from an optional engine module; ``None`` if unavailable.

    Import failures are logged (as a warning, not silently dropped) so a
    missing optional subsystem is visible in the logs.
    """
    try:
        return getattr(importlib.import_module(module_path), attr)
    except Exception as exc:
        log.warning("P2: optional %s unavailable (%s) — skipped", name, exc)
        return None


# ── 2. construct the P2 superset subsystems ──────────────────────────────────
def _build_singletons():
    """Construct every P2 subsystem and return them in a dict keyed by name.

    Each constructor runs through ``_safe`` so a failing subsystem becomes
    ``None`` rather than aborting start-up. Subsystems that depend on others
    are only built when their dependencies exist. The core engine imports at
    the top are NOT guarded: if one of them fails, this function raises.
    """
    from Project.Sanad.core.brain import Brain
    from Project.Sanad.voice.audio_manager import AudioManager
    from Project.Sanad.gemini.client import GeminiVoiceClient
    from Project.Sanad.gemini.subprocess import GeminiSubprocess
    from Project.Sanad.motion.arm_controller import ArmController
    from Project.Sanad.voice.wake_phrase_manager import WakePhraseManager

    brain = _safe("brain", Brain)
    audio_mgr = _safe("audio_mgr", AudioManager)
    voice_client = _safe("voice_client", GeminiVoiceClient)

    local_tts = None
    tts_cls = _optional_class("local_tts", "Project.Sanad.voice.local_tts",
                              "LocalTTSEngine")
    if tts_cls is not None:
        local_tts = _safe("local_tts", tts_cls)

    typed_replay = None
    if voice_client is not None and audio_mgr is not None:
        replay_cls = _optional_class("typed_replay", "Project.Sanad.voice.typed_replay",
                                     "TypedReplayEngine")
        if replay_cls is not None:
            typed_replay = _safe("typed_replay",
                                 lambda: replay_cls(voice_client, audio_mgr))

    # Gemini live supervisor — built BEFORE mask_face (lip-sync wires live_sub→mask).
    live_sub = _safe("live_sub", GeminiSubprocess)

    # Premium arm + voice-command gesture stack.
    arm = _safe("arm", ArmController)  # rt/arm_sdk publisher #1
    wake_mgr = _safe("wake_mgr", WakePhraseManager)

    macro_rec = macro_play = teacher = None
    if arm is not None:
        rec_cls = _optional_class("macro_rec", "Project.Sanad.motion.macro_recorder",
                                  "MacroRecorder")
        if rec_cls is not None:
            macro_rec = _safe("macro_rec", lambda: rec_cls(arm))
        play_cls = _optional_class("macro_play", "Project.Sanad.motion.macro_player",
                                   "MacroPlayer")
        if play_cls is not None:
            macro_play = _safe("macro_play", lambda: play_cls(audio_mgr, arm))
        teach_cls = _optional_class("teacher", "Project.Sanad.motion.teaching",
                                    "TeachingSession")
        if teach_cls is not None:
            teacher = _safe("teacher", lambda: teach_cls(arm))

    # Voice-command arm gestures (fires sanad_arm_controller.ARM = rt/arm_sdk #2).
    live_voice = None
    if all(x is not None for x in (voice_client, arm, wake_mgr, audio_mgr)):
        loop_cls = _optional_class("live_voice", "Project.Sanad.voice.live_voice_loop",
                                   "LiveVoiceLoop")
        if loop_cls is not None:
            live_voice = _safe(
                "live_voice",
                lambda: loop_cls(voice_client, arm, wake_mgr, audio_mgr))

    # Mask/face — starts its own BLE asyncio-loop thread in __init__; degrade if absent.
    mask_face = None
    try:
        from Project.Sanad.face.mask_face import FaceController
        mask_face = _safe("mask_face", FaceController)
    except Exception:
        log.exception("P2: FaceController import failed — LED mask unavailable")

    # Brain attachments (best-effort; hasattr-guarded so missing methods are skipped).
    attachments = (
        ("attach_voice", voice_client),
        ("attach_audio_manager", audio_mgr),
        ("attach_arm", arm),
        ("attach_macro_recorder", macro_rec),
        ("attach_macro_player", macro_play),
        ("attach_live_voice", live_voice),
    )
    for meth, val in attachments:
        if brain is not None and val is not None and hasattr(brain, meth):
            try:
                getattr(brain, meth)(val)
            except Exception:
                log.exception("brain.%s failed", meth)

    if (live_sub is not None and audio_mgr is not None
            and hasattr(live_sub, "attach_audio_manager")):
        try:
            live_sub.attach_audio_manager(audio_mgr)
        except Exception:
            log.exception("live_sub.attach_audio_manager failed")

    return dict(brain=brain, audio_mgr=audio_mgr, voice_client=voice_client,
                local_tts=local_tts, typed_replay=typed_replay, live_sub=live_sub,
                arm=arm, wake_mgr=wake_mgr, macro_rec=macro_rec,
                macro_play=macro_play, teacher=teacher, live_voice=live_voice,
                mask_face=mask_face)


# ── 2b. P2-specific wiring (lip-sync / gestures / lifelike / interlock seam) ──
def _wire_motion_interlock(arm, loco_controller):
    """Hand ``loco_controller.movement_active`` to both arm publishers.

    Arm⇄locomotion motion-block interlock. With locomotion DEFERRED,
    loco_controller is None → set_motion_block is NOT wired (the predicate is
    absent → arm permits). Documented no-op-by-omission; MUST be re-armed before
    the deferred voice-command-locomotion pass ships. Two rt/arm_sdk publishers
    (arm_controller + sanad_arm_controller.ARM) coexist; they stay collision-free
    via sanad_arm's _is_busy/_busy_lock + arm._lock until the interlock is armed.
    """
    if arm is None or loco_controller is None:
        return
    try:
        if hasattr(arm, "set_motion_block"):
            arm.set_motion_block(loco_controller.movement_active)
            log.info("Arm motion-block wired to locomotion movement_active")
    except Exception:
        log.exception("Could not wire arm motion-block")
    try:
        from Project.Sanad.motion.sanad_arm_controller import ARM as _sanad_arm
        if hasattr(_sanad_arm, "set_motion_block"):
            _sanad_arm.set_motion_block(loco_controller.movement_active)
    except Exception:
        log.exception("Could not wire sanad_arm motion-block")


def _wire_mask_markers(mask_face, live_sub):
    """Register the Gemini marker callbacks (MOUTH / FACE / SHOW) on the mask.

    Every callback first checks ``mask_face._gemini_linked`` and does nothing
    while it is falsy (the /api/mask/link master switch).
    """
    if mask_face is None or live_sub is None:
        return

    def _linked():
        return bool(getattr(mask_face, "_gemini_linked", False))

    # Lip-sync: Gemini [[MOUTH:n]] markers (surfaced by the PARENT
    # GeminiSubprocess) → mask.
    if hasattr(live_sub, "register_mouth_callback"):
        try:
            def _on_mouth_level(level):
                if not _linked():
                    return  # /api/mask/link master switch (matches SanadV3 main.py)
                try:
                    mask_face.set_mouth(int(level))
                except Exception:
                    log.exception("mask_face.set_mouth (lip-sync) failed")

            live_sub.register_mouth_callback(_on_mouth_level)
            log.info("LED face wired to Gemini lip-sync (MOUTH markers)")
        except Exception:
            log.exception("Could not wire LED face lip-sync hook")

    # Emotions: Gemini [[FACE:name]] markers (set_expression tool) → brief mask
    # reaction. Emitted by gemini/script.py + relayed by GeminiSubprocess; we only
    # register the callback (fires on the reader thread — react() is
    # cheap/non-blocking).
    if hasattr(live_sub, "register_face_callback"):
        try:
            # Second argument passed to react() per emotion; 1.6 for the rest.
            _FACE_HOLD = {"heart": 2.6, "love": 2.6, "kiss": 2.4, "laugh": 2.2,
                          "surprised": 1.8, "confused": 1.8}

            def _on_face_emotion(name):
                if not _linked():
                    return
                try:
                    key = str(name)  # same key for the lookup and for react()
                    mask_face.react(key, _FACE_HOLD.get(key, 1.6))
                except Exception:
                    log.exception("mask_face.react (emotion) failed")

            live_sub.register_face_callback(_on_face_emotion)
            log.info("LED face wired to Gemini emotions (FACE markers)")
        except Exception:
            log.exception("Could not wire LED face emotion hook")

    # Social QR: Gemini [[SHOW:account]] markers (show_social tool) → QR on the
    # mask. The upload is a ~9s BLE scratch-slot op — offload to a daemon thread
    # so the reader thread never blocks.
    if hasattr(live_sub, "register_social_callback"):
        try:
            import threading as _th

            def _on_social(account):
                if not _linked():
                    return

                def _run(acc=str(account)):
                    try:
                        from Project.Sanad.dashboard.routes.mask_social import (
                            show_social_on_mask,
                        )
                        show_social_on_mask(acc)
                    except Exception:
                        log.exception("show_social_on_mask failed")

                _th.Thread(target=_run, daemon=True, name="mask-social").start()

            live_sub.register_social_callback(_on_social)
            log.info("LED face wired to Gemini social QR (SHOW markers)")
        except Exception:
            log.exception("Could not wire LED face social hook")


def _wire_mask_events(mask_face):
    """Subscribe the mask to gestural-speaking and lifelike state bus events.

    The event bus is SYNCHRONOUS (.on/.emit_sync) — handlers run on the
    emitter's thread, so these only flip flags the BLE loop reads (non-blocking).
    """
    try:
        from Project.Sanad.core.event_bus import bus as _bus

        def _on_gestural_speaking(enabled=False, **_kw):
            try:
                mask_face.set_speaking(bool(enabled))
                if not enabled:
                    mask_face.set_listening()
            except Exception:
                log.exception("mask_face.set_speaking failed")

        def _face_call(method, *args):
            """Build a bus handler that calls ``mask_face.<method>(*args)``."""
            def _h(**_kw):
                try:
                    getattr(mask_face, method)(*args)
                except Exception:
                    log.exception("mask_face.%s failed", method)
            return _h

        _bus.on("brain.gestural_speaking_changed", _on_gestural_speaking)
        _bus.on("voice.connected", _face_call("set_listening"))
        _bus.on("voice.user_said", _face_call("set_thinking"))
        _bus.on("voice.disconnected", _face_call("set_idle"))
        _bus.on("voice.error", _face_call("react", "sad"))
        _bus.on("motion.action_error", _face_call("react", "sad"))
        _bus.on("skill.finished", _face_call("react", "smile"))
        log.info("LED face wired to gestural-speaking + lifelike state events")
    except Exception:
        log.exception("Could not wire LED face behaviour hooks")


def _wire_motion_state(live_sub):
    """Forward motion start/complete/error bus events to ``live_sub.send_state``.

    Motion-state → Gemini channel (so "what are you doing?" is grounded).
    """
    if live_sub is None or not hasattr(live_sub, "send_state"):
        return
    try:
        from Project.Sanad.core.event_bus import bus as _bus

        def _ms_started(action="", **_kw):
            try:
                live_sub.send_state("start", action)
            except Exception:
                log.exception("send_state start failed")

        def _ms_done(action="", elapsed_sec=None, failed=False, **_kw):
            if failed:
                return  # only successful completions are forwarded here
            try:
                live_sub.send_state("complete", action, elapsed_sec=elapsed_sec)
            except Exception:
                log.exception("send_state complete failed")

        def _ms_error(action="", reason="", **_kw):
            try:
                live_sub.send_state("error", action, reason=reason)
            except Exception:
                log.exception("send_state error failed")

        _bus.on("motion.action_started", _ms_started)
        _bus.on("motion.action_done", _ms_done)
        _bus.on("motion.action_error", _ms_error)
        log.info("Motion-state → Gemini channel wired")
    except Exception:
        log.exception("Could not wire motion-state → Gemini channel")


def _wire_premium(s):
    """Wire the P2 premium behaviours between the subsystems in ``s``.

    ``s`` is the dict returned by ``_build_singletons``. Every step is
    best-effort: a failure is logged and the remaining steps still run.

    The motion-state → Gemini channel needs only ``live_sub``, so it is wired
    even when the LED mask is absent (previously an early return on a missing
    mask skipped it as well).
    """
    arm = s.get("arm")
    live_sub = s.get("live_sub")
    mask_face = s.get("mask_face")
    loco_controller = s.get("loco_controller")  # None this pass (locomotion deferred)

    _wire_motion_interlock(arm, loco_controller)
    _wire_mask_markers(mask_face, live_sub)
    if mask_face is not None:
        _wire_mask_events(mask_face)
    _wire_motion_state(live_sub)


def _inject_main_shim(singletons):
    """Install a P2-scoped ``Project.Sanad.main`` module and return it.

    Dashboard routers resolve their lazy `from Project.Sanad.main import <name>`
    against this shim. Deferred subsystems are present as None (routers guard
    for None) so lazy imports never raise ImportError.
    """
    shim = types.ModuleType("Project.Sanad.main")
    for k, v in singletons.items():
        setattr(shim, k, v)
    # deferred / out-of-P2-scope subsystems — must EXIST as None.
    for k in ("loco_controller", "movement_dispatch", "nav_client", "camera",
              "gallery", "zone_gallery"):
        if not hasattr(shim, k):
            setattr(shim, k, None)
    shim.SUBSYSTEMS = {k: singletons.get(k) for k in (  # type: ignore[attr-defined]
        "brain", "audio_mgr", "voice_client", "local_tts", "typed_replay",
        "live_sub", "arm", "wake_mgr", "macro_rec", "macro_play", "teacher",
        "live_voice", "mask_face")}
    sys.modules["Project.Sanad.main"] = shim
    return shim


# ── 3. build the P2 FastAPI app ───────────────────────────────────────────────
# (module, prefix, tag) — P2 subset of SanadV3 dashboard/app.py's _REST_ROUTES
# (P1 set + premium: motion/skills/macros/replay/scripts/wake-phrases/live-voice/mask).
_P2_REST = [
    ("health", "/api", "health"),
    ("system", "/api/system", "system"),
    ("voice", "/api/voice", "voice"),
    ("motion", "/api/motion", "motion"),
    ("skills", "/api/skills", "skills"),
    ("macros", "/api/macros", "macros"),
    ("replay", "/api/replay", "replay"),
    ("scripts", "/api/scripts", "scripts"),
    ("audio_control", "/api/audio", "audio"),
    ("prompt", "/api/prompt", "prompt"),
    ("typed_replay", "/api/typed-replay", "typed-replay"),
    ("records", "/api/records", "records"),
    ("logs", "/api/logs", "logs"),
    ("wake_phrases", "/api/wake-phrases", "wake-phrases"),
    ("live_voice", "/api/live-voice", "live-voice"),
    ("live_subprocess", "/api/live-subprocess", "live-subprocess"),
    ("mask", "/api/mask", "mask"),
    ("mask_social", "/api/mask", "mask-social"),  # QR/social faces (shares /api/mask)
]
_P2_WS = ["log_stream"]


def _tab_filter_snippet():
    """Return the HTML snippet that hides non-P2 tabs and stubs unmounted APIs.

    The SanadV3 SPA nav is a `.tab` element whose onclick calls
    `switchTab('X')` and content is `#tab-X`, so we hide both by the stable
    onclick/id (CSS — applies before the SPA renders, no DOMContentLoaded
    race). The fetch shim makes the routers P2 doesn't mount resolve to an
    empty 200 so the SPA's api() never raises a 'Not Found' toast for an unsold
    feature (nav / zones / recognition / controller).
    """
    import json as _json

    def _js(value):
        # JSON is embedded in an inline script: neutralise any "</" so a value
        # can never close the script element early.
        return _json.dumps(value).replace("</", "<\\/")

    css = ",".join(
        ".tab[onclick*=\"switchTab('%s')\"],#tab-%s" % (t, t) for t in P2_SPA_HIDE
    ) + ",#status-pills"
    # NOTE(review): the global name that exposes the package info to the page is
    # an assumption (window.SANAD_PACKAGE) — confirm against p2_widget.html.
    return (
        "<style>%s{display:none!important}</style>"
        "<script>window.SANAD_PACKAGE=%s;"
        "(function(){var dead=%s,real=window.fetch;if(!real){return;}"
        "window.fetch=function(input,init){"
        "var u=typeof input==='string'?input:((input&&input.url)||'');"
        "try{u=new URL(u,window.location.href).pathname;}catch(e){}"
        "for(var i=0;i<dead.length;i++){"
        "if(u===dead[i]||u.indexOf(dead[i]+'/')===0){"
        "return Promise.resolve(new Response('{}',"
        "{status:200,headers:{'Content-Type':'application/json'}}));}}"
        "return real.apply(this,arguments);};})();</script>"
        % (css,
           _js({"name": PACKAGE, "title": PACKAGE_TITLE, "tabs": P2_SPA_TABS}),
           _js(P2_UNMOUNTED))
    )


def build_app():
    """Build and return the P2 FastAPI application.

    Mounts the routers listed in ``_P2_REST`` / ``_P2_WS`` plus the optional
    ``routes_p2`` module, the engine's static directory, ``/api/package``, and
    the two HTML entry points (``/`` and ``/full``). A router that fails to
    import is logged and recorded in ``failed`` instead of aborting the build.
    """
    from fastapi import FastAPI
    from fastapi.staticfiles import StaticFiles
    from fastapi.responses import HTMLResponse, JSONResponse
    from Project.Sanad.config import BASE_DIR
    from Project.Sanad.core.config_loader import section as _cfg_section

    app_cfg = _cfg_section("dashboard", "app")
    app = FastAPI(title=PACKAGE_TITLE, version="0.1.0")
    loaded, failed = [], {}

    def _register(mod_name, prefix, tag, package="Project.Sanad.dashboard.routes"):
        """Import ``package.mod_name`` and include its ``router`` on the app."""
        try:
            mod = importlib.import_module("%s.%s" % (package, mod_name))
            if not hasattr(mod, "router"):
                raise AttributeError("no 'router'")
            kw = {}
            if prefix:
                kw["prefix"] = prefix
            if tag:
                kw["tags"] = [tag]
            app.include_router(mod.router, **kw)
            loaded.append(mod_name)
        except Exception as exc:
            failed[mod_name] = str(exc)
            log.exception("P2: router %s failed — skipped", mod_name)

    for m, p, t in _P2_REST:
        _register(m, p, t)
    for m in _P2_WS:
        _register(m, None, "websocket", package="Project.Sanad.dashboard.websockets")

    # P2-specific convenience routes: /api/p2/* (api-key/persona/say/logs/settings
    # with live-session restart).
    try:
        import routes_p2
        app.include_router(routes_p2.router, prefix="/api/p2", tags=["p2"])
        loaded.append("routes_p2")
    except Exception as exc:
        failed["routes_p2"] = str(exc)
        log.exception("P2: routes_p2 failed — /api/p2 unavailable")

    static_dir = BASE_DIR / app_cfg.get("static_subdir", "dashboard/static")
    try:
        app.mount("/static", StaticFiles(directory=str(static_dir)), name="static")
    except Exception:
        log.exception("P2: static mount failed")

    @app.get("/api/package")
    async def package_info():
        """Describe this package: tabs, features, key status, routes, license."""
        from sanad_pkg import license as _lic
        lic = _lic.current()
        api_key = {"has_key": False, "masked": "", "source": "default"}
        try:
            import Project.Sanad.config as _cfg_mod
            from Project.Sanad.dashboard.routes.voice import _mask_api_key
            _k = getattr(_cfg_mod, "GEMINI_API_KEY", "") or ""
            try:
                from Project.Sanad.config import load_config
                _stored = (load_config().get("gemini", {}) or {}).get("api_key")
            except Exception:
                _stored = None  # unreadable config → report the key as "default"
            api_key = {"has_key": bool(_k), "masked": _mask_api_key(_k),
                       "source": "config_file" if _stored else "default"}
        except Exception:
            log.exception("could not read api-key status")
        return {
            "package": PACKAGE,
            "title": PACKAGE_TITLE,
            "tabs": P2_SPA_TABS,
            "language": os.environ.get("SANAD_LANGUAGE", "")
                        or "(multilingual auto-detect)",
            "audio_profile": os.environ.get("SANAD_AUDIO_PROFILE", "builtin"),
            "features": {
                "multilingual": bool(lic.feature("multilingual", True)),
                "voice_command_motion": bool(lic.feature("voice_command_motion", True)),
                "lipsync": bool(lic.feature("lipsync", True)),
                "mask": bool(lic.feature("mask", True)),
            },
            "api_key": api_key,
            "endpoints": {
                "api_key_get": "GET /api/p2/api-key",
                "api_key_set": "POST /api/p2/api-key {\"api_key\": \"AIza...\"}",
                "persona_get": "GET /api/p2/persona",
                "persona_set": "POST /api/p2/persona {\"content\": \"\"}",
                "settings": "GET /api/p2/settings",
                "mask_status": "GET /api/mask/status",
                "live_voice": "GET /api/live-voice/status",
            },
            "loaded_routes": loaded,
            "failed_routes": failed,
            "license": lic.summary(),
        }

    p2_static = Path(os.environ.get(
        "SANAD_P2_STATIC", str(Path(__file__).resolve().parent / "static")))

    def _widget_html():
        """Return the contents of p2_widget.html, or "" if missing/unreadable."""
        w = p2_static / "p2_widget.html"
        try:
            return w.read_text(encoding="utf-8") if w.exists() else ""
        except OSError:
            return ""

    def _filtered_spa():
        """Serve the engine's index.html with the P2 filter + widget injected."""
        index = static_dir / "index.html"
        if not index.exists():
            return JSONResponse({"message": "full SPA index.html not found",
                                 "loaded": loaded, "failed": failed})
        try:
            html = index.read_text(encoding="utf-8")
            # Filter (CSS hide + fetch shim) goes in <head> so the fetch override
            # is installed BEFORE the SPA's body scripts/pollers run.
            filt = _tab_filter_snippet()
            if "</head>" in html:
                html = html.replace("</head>", filt + "</head>", 1)
            elif "<body>" in html:
                html = html.replace("<body>", filt + "<body>", 1)
            else:
                html = filt + html
            w = _widget_html()
            if w:
                # Widget goes at the end of the body when there is one.
                if "</body>" in html:
                    html = html.replace("</body>", w + "</body>", 1)
                else:
                    html = html + w
            return HTMLResponse(html)
        except OSError as exc:
            return JSONResponse({"error": "index.html unreadable: %s" % exc},
                                status_code=500)

    @app.get("/")
    async def root():
        """Serve p2.html when present; otherwise fall back to the filtered SPA."""
        page = p2_static / "p2.html"
        if page.exists():
            try:
                return HTMLResponse(page.read_text(encoding="utf-8"))
            except OSError:
                log.exception("could not read p2.html — falling back to SPA")
        return _filtered_spa()

    @app.get("/full")
    async def full_dashboard():
        """The complete Sanad SPA (advanced), with non-P2 tabs hidden."""
        return _filtered_spa()

    log.info("P2 dashboard built — routers loaded=%s failed=%s", loaded, list(failed))
    return app


def _init_dds_for_audio():
    """Standalone P2 owns the ONE ChannelFactoryInitialize so the G1 chest
    AudioClient AND the arm rt/arm_sdk link work. In bus/multi-package mode the
    hwbroker owns DDS, so P2 skips it there."""
    if os.environ.get("SANAD_BUS_ADDR"):
        return
    try:
        from unitree_sdk2py.core.channel import ChannelFactoryInitialize
        iface = os.environ.get("SANAD_DDS_INTERFACE", "eth0")
        ChannelFactoryInitialize(0, iface)
        log.info("P2: DDS ChannelFactoryInitialize(0, %s) done — chest audio + arm enabled",
                 iface)
    except Exception:
        log.exception("P2: DDS init failed — chest audio/arm unavailable "
                      "(plugged/USB still works)")


def _init_dds_and_arm(singletons):
    """Initialize DDS + the arm EXACTLY ONCE.

    The canonical engine inits the arm in main.py (`arm.init(iface)` →
    ChannelFactoryInitialize + rt/arm_sdk bring-up); the P2 shim replaces
    main.py, so WITHOUT this call `arm._initialized` stays False and every
    gesture/macro/replay silently no-ops while /api/motion still 200s.
    arm.init() also performs the one ChannelFactoryInitialize the chest
    AudioClient needs, so we do NOT also call _init_dds_for_audio when the arm
    inits (a 2nd ChannelFactoryInitialize throws and would fail arm.init).
    Fall back to audio-only DDS if the arm is absent or its init fails before
    DDS came up. Bus/multi-package mode → hwbroker owns DDS, skip.
    """
    if os.environ.get("SANAD_BUS_ADDR"):
        return
    iface = os.environ.get("SANAD_DDS_INTERFACE", "eth0")
    arm = singletons.get("arm")
    if arm is not None and hasattr(arm, "init"):
        try:
            if bool(arm.init(iface)):
                log.info("P2: arm.init(%s) OK — arm + chest audio share this DDS init",
                         iface)
                return
            log.warning("P2: arm.init(%s) returned False — arm gestures DISABLED", iface)
        except Exception:
            log.exception("P2: arm.init raised — arm disabled")
    # No arm (or arm.init failed before ChannelFactoryInitialize) — chest-audio DDS only.
    _init_dds_for_audio()


def _enforce_keyless_default():
    """P2 ships KEYLESS — honor only an explicit vendor key
    (SANAD_GEMINI_API_KEY) or a customer-saved key; otherwise blank
    GEMINI_API_KEY everywhere."""
    import Project.Sanad.config as _cfg

    env_key = (os.environ.get("SANAD_GEMINI_API_KEY") or "").strip()
    saved = ""
    try:
        from Project.Sanad.config import load_config
        saved = ((load_config().get("gemini") or {}).get("api_key") or "").strip()
    except Exception as exc:
        # Unreadable config is treated as "no saved key"; say so, because the
        # in-memory key is blanked below when no vendor key is set either.
        log.warning("P2: could not read saved Gemini key (%s) — treating as unset", exc)
    if env_key or saved:
        return
    _cfg.GEMINI_API_KEY = ""
    try:
        import Project.Sanad.gemini.client as _gc
        _gc.GEMINI_API_KEY = ""
    except Exception as exc:
        log.warning("P2: could not blank the Gemini client key (%s)", exc)
    log.info("P2: keyless by default — customer must add a Gemini key via the dashboard")


def main():
    """Start P2: build and wire the subsystems, then serve the dashboard.

    Host and port come from SANAD_DASHBOARD_HOST / SANAD_DASHBOARD_PORT
    (defaults 0.0.0.0:8012). Blocks inside ``uvicorn.run`` until shutdown.
    """
    host = os.environ.get("SANAD_DASHBOARD_HOST", "0.0.0.0")
    port = int(os.environ.get("SANAD_DASHBOARD_PORT", "8012"))
    log.info("Sanad P2 (Premium Communication) starting — %s:%d lang=%s audio=%s mask_dir=%s",
             host, port,
             os.environ.get("SANAD_LANGUAGE", "") or "",
             os.environ.get("SANAD_AUDIO_PROFILE", "builtin"),
             os.environ.get("SANAD_MASK_DIR", "?"))
    try:
        from sanad_pkg.bus import bus
        bus.connect()
    except Exception:
        log.exception("bus connect failed (continuing in-process)")

    _enforce_keyless_default()
    singletons = _build_singletons()
    # DDS once — arm.init owns it (chest audio shares); audio-only fallback.
    _init_dds_and_arm(singletons)
    _wire_premium(singletons)
    _inject_main_shim(singletons)

    # Clean BLE shutdown on exit (uvicorn returns from run() on SIGTERM/SIGINT,
    # then atexit fires — disconnect the mask + stop its loop).
    _mask = singletons.get("mask_face")
    if _mask is not None and hasattr(_mask, "shutdown"):
        atexit.register(_mask.shutdown)

    import uvicorn
    app = build_app()
    uvicorn.run(app, host=host, port=port, log_level="info")


if __name__ == "__main__":
    main()