#!/usr/bin/env python3 """Sanad Package 1 — Basic Communication launcher. P1 = hands-free conversation in ONE operator-selected language (Gemini Live), audio over the G1 chest OR any plugged USB device. NO voice-command motion, NO vision/recognition, NO navigation. This is a *containerization wrapper* around the canonical Sanad source — it does NOT fork Sanad. It: 1. bootstraps the `Project.Sanad` namespace (deployed layout), 2. constructs ONLY the P1 comms subsystems, 3. injects a P1-scoped `Project.Sanad.main` shim (the dashboard routers import their singletons from there — lazily, inside handlers), 4. mounts ONLY the P1 dashboard routers + the logs websocket, 5. serves the real Sanad SPA with non-P1 tabs hidden, 6. runs uvicorn on the P1 port (default :8011). Kept Python-3.8 compatible. """ from __future__ import annotations import importlib import os import sys import types from pathlib import Path # ── 1. namespace bootstrap (mirrors Sanad/main.py, deployed layout) ────────── # In the image: /app/Sanad is the canonical package, /app is on sys.path. _APP = Path(os.environ.get("SANAD_APP_DIR", "/app")) _SANAD_DIR = _APP / "Sanad" if str(_APP) not in sys.path: sys.path.insert(0, str(_APP)) if "Project" not in sys.modules: _proj = types.ModuleType("Project") _proj.__path__ = [] # namespace package sys.modules["Project"] = _proj if "Project.Sanad" not in sys.modules: _sanad = importlib.import_module("Sanad") sys.modules["Project.Sanad"] = _sanad sys.modules["Project"].Sanad = _sanad # type: ignore[attr-defined] # asyncio.to_thread shim for Py3.8 — before any Sanad module that uses it. from Project.Sanad.core import asyncio_compat # noqa: E402,F401 from Project.Sanad.core.logger import get_logger # noqa: E402 log = get_logger("pkg1.app") PACKAGE = "P1" PACKAGE_TITLE = "Sanad — Basic Communication (P1)" # SanadV3 SPA tab ids (nav is
# The content pane for SPA tab X is the element #tab-X.
# P1 (Basic Communication) shows only the comms tabs; everything else is hidden on /full.
P1_SPA_TABS = ["operations", "voice", "recordings", "settings"]
P1_SPA_HIDE = ["motion", "controller", "navigation", "livemap", "mapeditor",
               "recognition", "mask", "temp", "terminal"]
# Routers P1 does NOT mount — the SPA polls these; short-circuit client-side so
# /full raises no "Not Found" toasts for an unsold feature.
P1_UNMOUNTED = ["/api/nav", "/api/zones", "/api/recognition", "/api/controller",
                "/api/motion", "/api/skills", "/api/macros", "/api/replay",
                "/api/wake-phrases", "/api/live-voice", "/api/scripts", "/api/mask"]


def _safe(name, factory):
    """Call ``factory()`` and return its result, or ``None`` if it raises.

    ``name`` is only used in the log message. Any ``Exception`` is logged with
    its traceback and swallowed so one broken subsystem degrades the package
    instead of aborting start-up.
    """
    try:
        return factory()
    except Exception:
        log.exception("P1: could not construct %s — degraded", name)
        return None


# ── 2. construct ONLY P1 comms subsystems ────────────────────────────────────
def _build_singletons():
    """Construct the P1 subsystem singletons and wire them into the brain.

    Returns a dict with the keys ``brain``, ``audio_mgr``, ``voice_client``,
    ``local_tts``, ``typed_replay`` and ``live_sub``; any entry whose
    construction failed (or whose optional module is missing) is ``None``.

    The four core imports below are not guarded: if one of them is missing the
    function raises, because P1 cannot run without them.
    """
    from Project.Sanad.core.brain import Brain
    from Project.Sanad.voice.audio_manager import AudioManager
    from Project.Sanad.gemini.client import GeminiVoiceClient
    from Project.Sanad.gemini.subprocess import GeminiSubprocess

    brain = _safe("brain", Brain)
    audio_mgr = _safe("audio_mgr", AudioManager)
    voice_client = _safe("voice_client", GeminiVoiceClient)

    # Optional: local TTS. A failing import leaves it disabled, but is logged
    # rather than silently ignored.
    local_tts = None
    try:
        from Project.Sanad.voice.local_tts import LocalTTSEngine
    except Exception as exc:
        log.info("P1: optional local_tts unavailable — skipped (%s)", exc)
    else:
        local_tts = _safe("local_tts", LocalTTSEngine)

    # Optional: typed replay, which needs both the voice client and audio manager.
    typed_replay = None
    if voice_client is not None and audio_mgr is not None:
        try:
            from Project.Sanad.voice.typed_replay import TypedReplayEngine
        except Exception as exc:
            log.info("P1: optional typed_replay unavailable — skipped (%s)", exc)
        else:
            typed_replay = _safe(
                "typed_replay", lambda: TypedReplayEngine(voice_client, audio_mgr))

    # P1 keeps the Gemini live supervisor (basic profile, motion paths off).
    live_sub = _safe("live_sub", GeminiSubprocess)

    # Brain attachments (only what P1 has). Each attach is attempted only when
    # the brain exists, the value exists, and the brain exposes that method.
    attachments = (("attach_voice", voice_client),
                   ("attach_audio_manager", audio_mgr))
    for meth, val in attachments:
        if brain is None or val is None or not hasattr(brain, meth):
            continue
        try:
            getattr(brain, meth)(val)
        except Exception:
            log.exception("brain.%s failed", meth)

    return dict(brain=brain, audio_mgr=audio_mgr, voice_client=voice_client,
                local_tts=local_tts, typed_replay=typed_replay, live_sub=live_sub)


def _inject_main_shim(singletons):
    """Create a P1-scoped `Project.Sanad.main` so dashboard routers resolve their
    `from Project.Sanad.main import <name>` against P1's subset.

    Every key of ``singletons`` becomes a module attribute. Excluded subsystems
    are present as None (routers guard for None). The module is registered in
    ``sys.modules`` under ``"Project.Sanad.main"`` and returned.
    """
    shim = types.ModuleType("Project.Sanad.main")

    # P1 singletons
    for key, value in singletons.items():
        setattr(shim, key, value)

    # excluded subsystems — must EXIST (as None) so lazy imports never ImportError
    excluded = ("arm", "wake_mgr", "macro_rec", "macro_play", "teacher",
                "live_voice", "camera", "gallery", "zone_gallery",
                "loco_controller", "movement_dispatch")
    for key in excluded:
        if not hasattr(shim, key):
            setattr(shim, key, None)

    subsystem_keys = ("brain", "audio_mgr", "voice_client",
                      "local_tts", "typed_replay", "live_sub")
    shim.SUBSYSTEMS = {  # type: ignore[attr-defined]
        key: singletons.get(key) for key in subsystem_keys
    }

    sys.modules["Project.Sanad.main"] = shim
    return shim


# ── 3. build the P1 FastAPI app ───────────────────────────────────────────────
# (module, prefix, tag) — P1 subset of dashboard/app.py's _REST_ROUTES.
_P1_REST = [
    ("health", "/api", "health"),
    ("system", "/api/system", "system"),
    ("voice", "/api/voice", "voice"),
    ("audio_control", "/api/audio", "audio"),
    ("prompt", "/api/prompt", "prompt"),
    ("typed_replay", "/api/typed-replay", "typed-replay"),
    ("records", "/api/records", "records"),
    ("logs", "/api/logs", "logs"),
    ("live_subprocess", "/api/live-subprocess", "live-subprocess"),
]
_P1_WS = ["log_stream"]


def _tab_filter_snippet():
    """Hide non-P1 tabs/pills + short-circuit unmounted-router polls on /full.

    The SanadV3 nav entries are ".tab" elements whose onclick contains
    switchTab('X'), so hide by the stable onclick/id via CSS (applies before
    render, no DOMContentLoaded race).

    Returns one HTML string: a <style> element followed by a <script> element.

    NOTE(review): the markup inside the two string literals was missing from
    the source under review (they were empty, so the %-format raised
    TypeError). The <style>/<script> bodies below are a reconstruction from
    this docstring and the surrounding comments — confirm the global name
    (`window.SANAD_PACKAGE`) and the stubbed response body against the SPA.
    """
    import json as _json

    css = ",".join(".tab[onclick*=\"switchTab('%s')\"],#tab-%s" % (t, t)
                   for t in P1_SPA_HIDE) + ",#status-pills"
    # "</" is escaped so embedded JSON can never terminate the <script> element.
    pkg_json = _json.dumps({"name": PACKAGE, "title": PACKAGE_TITLE,
                            "tabs": P1_SPA_TABS}).replace("</", "<\\/")
    unmounted_json = _json.dumps(P1_UNMOUNTED).replace("</", "<\\/")
    # The template deliberately contains no literal '%' besides the three
    # placeholders, so printf-style formatting is safe here.
    return (
        "<style>%s{display:none!important}</style>"
        "<script>window.SANAD_PACKAGE=%s;(function(){"
        "var skip=%s,realFetch=window.fetch;"
        "if(!realFetch){return;}"
        "window.fetch=function(input,init){"
        "var url=(typeof input==='string')?input:((input&&input.url)||'');"
        "var path=url;"
        "try{path=new URL(url,window.location.href).pathname;}catch(e){}"
        "for(var i=0;i<skip.length;i++){"
        "var p=skip[i];"
        "if(path===p||path.indexOf(p+'/')===0){"
        "return Promise.resolve(new Response('{}',"
        "{status:200,headers:{'Content-Type':'application/json'}}));"
        "}}"
        "return realFetch.apply(this,arguments);"
        "};})();</script>"
        % (css, pkg_json, unmounted_json)
    )


def _splice_spa_html(html, head_snippet, body_snippet):
    """Insert ``head_snippet`` into the head and ``body_snippet`` at body end.

    ``head_snippet`` goes immediately before the first ``</head>``; if there is
    none, immediately before the first ``<body>``; otherwise it is prepended.
    A non-empty ``body_snippet`` goes immediately before the first ``</body>``,
    or is appended when the document has no ``</body>``.

    NOTE(review): the tag markers were empty strings in the source under
    review (so everything was inserted at offset 0); ``</head>``, ``<body>``
    and ``</body>`` are reconstructed from the adjacent comment — confirm.
    """
    if "</head>" in html:
        html = html.replace("</head>", head_snippet + "</head>", 1)
    elif "<body>" in html:
        html = html.replace("<body>", head_snippet + "<body>", 1)
    else:
        html = head_snippet + html
    if body_snippet:
        if "</body>" in html:
            html = html.replace("</body>", body_snippet + "</body>", 1)
        else:
            html = html + body_snippet
    return html


def build_app():
    """Build the P1 FastAPI application.

    Mounts the routers listed in ``_P1_REST`` and ``_P1_WS``, the package-local
    ``routes_p1`` router under ``/api/p1``, the Sanad static directory under
    ``/static``, and the ``/api/package``, ``/`` and ``/full`` endpoints.
    A router that fails to import is logged, recorded in ``failed`` and
    skipped; it never aborts the build.
    """
    from fastapi import FastAPI
    from fastapi.staticfiles import StaticFiles
    from fastapi.responses import HTMLResponse, JSONResponse
    from Project.Sanad.config import BASE_DIR
    from Project.Sanad.core.config_loader import section as _cfg_section

    app_cfg = _cfg_section("dashboard", "app")
    app = FastAPI(title=PACKAGE_TITLE, version="0.1.0")

    # loaded: module names mounted OK; failed: module name -> error text.
    loaded, failed = [], {}

    def _register(mod_name, prefix, tag, package="Project.Sanad.dashboard.routes"):
        """Import ``package.mod_name`` and include its ``router`` in the app."""
        try:
            mod = importlib.import_module("%s.%s" % (package, mod_name))
            if not hasattr(mod, "router"):
                raise AttributeError("no 'router'")
            kw = {}
            if prefix:
                kw["prefix"] = prefix
            if tag:
                kw["tags"] = [tag]
            app.include_router(mod.router, **kw)
            loaded.append(mod_name)
        except Exception as exc:
            failed[mod_name] = str(exc)
            log.exception("P1: router %s failed — skipped", mod_name)

    for m, p, t in _P1_REST:
        _register(m, p, t)
    for m in _P1_WS:
        _register(m, None, "websocket", package="Project.Sanad.dashboard.websockets")

    # P1-specific routes (package-local, not part of Sanad): /api/p1/* —
    # first-class "set / update Gemini API key" with live-session restart.
    try:
        import routes_p1
        app.include_router(routes_p1.router, prefix="/api/p1", tags=["p1"])
        loaded.append("routes_p1")
    except Exception as exc:
        failed["routes_p1"] = str(exc)
        log.exception("P1: routes_p1 failed — /api/p1 unavailable")

    static_dir = BASE_DIR / app_cfg.get("static_subdir", "dashboard/static")
    try:
        app.mount("/static", StaticFiles(directory=str(static_dir)), name="static")
    except Exception:
        log.exception("P1: static mount failed")

    @app.get("/api/package")
    async def package_info():
        """Return package identity, masked API-key status, routes and license."""
        from sanad_pkg import license as _lic
        lic = _lic.current()

        # Gemini API-key status (masked) — so the dashboard can show whether a
        # key is set without ever exposing it.
        api_key = {"has_key": False, "masked": "", "source": "default"}
        try:
            import Project.Sanad.config as _cfg_mod
            from Project.Sanad.dashboard.routes.voice import _mask_api_key
            _k = getattr(_cfg_mod, "GEMINI_API_KEY", "") or ""
            try:
                from Project.Sanad.config import load_config
                _stored = (load_config().get("gemini", {}) or {}).get("api_key")
            except Exception:
                # An unreadable config only affects the reported "source".
                _stored = None
            api_key = {"has_key": bool(_k), "masked": _mask_api_key(_k),
                       "source": "config_file" if _stored else "default"}
        except Exception:
            log.exception("could not read api-key status")

        return {
            "package": PACKAGE,
            "title": PACKAGE_TITLE,
            "tabs": P1_SPA_TABS,
            "language": os.environ.get("SANAD_LANGUAGE", lic.feature("language", "ar")),
            "audio_profile": os.environ.get("SANAD_AUDIO_PROFILE", "builtin"),
            "api_key": api_key,
            "endpoints": {
                "api_key_get": "GET /api/p1/api-key",
                "api_key_set": "POST /api/p1/api-key {\"api_key\": \"AIza...\"}",
                "persona_get": "GET /api/p1/persona",
                "persona_set": "POST /api/p1/persona {\"content\": \"\"}",
                "settings": "GET /api/p1/settings",
            },
            "loaded_routes": loaded,
            "failed_routes": failed,
            "license": lic.summary(),
        }

    # P1's own clean control page (cards). Falls back to the full SPA if missing.
    p1_static = Path(os.environ.get(
        "SANAD_P1_STATIC", str(Path(__file__).resolve().parent / "static")))

    def _widget_html():
        """The P1 Quick-Controls widget injected into the full SPA (cards on /full).

        Returns "" when the widget file is absent or unreadable.
        """
        w = p1_static / "p1_widget.html"
        try:
            return w.read_text(encoding="utf-8") if w.exists() else ""
        except OSError:
            return ""

    def _filtered_spa():
        """Serve the Sanad SPA index with the P1 filter and widget spliced in."""
        index = static_dir / "index.html"
        if not index.exists():
            return JSONResponse({"message": "full SPA index.html not found",
                                 "loaded": loaded, "failed": failed})
        try:
            html = index.read_text(encoding="utf-8")
        except OSError as exc:
            return JSONResponse({"error": "index.html unreadable: %s" % exc},
                                status_code=500)
        # Filter (CSS hide + fetch shim) in <head> so the fetch override is set
        # BEFORE the SPA's body scripts/pollers run; the widget goes at body end.
        return HTMLResponse(
            _splice_spa_html(html, _tab_filter_snippet(), _widget_html()))

    @app.get("/")
    async def root():
        """Serve the P1 control page, falling back to the filtered SPA."""
        page = p1_static / "p1.html"
        if page.exists():
            try:
                return HTMLResponse(page.read_text(encoding="utf-8"))
            except OSError:
                log.exception("could not read p1.html — falling back to SPA")
        return _filtered_spa()

    @app.get("/full")
    async def full_dashboard():
        """The complete Sanad SPA (advanced), with non-P1 tabs hidden."""
        return _filtered_spa()

    log.info("P1 dashboard built — routers loaded=%s failed=%s", loaded, list(failed))
    return app


def _init_dds_for_audio():
    """Standalone P1 owns the ONE ChannelFactoryInitialize so the G1 chest
    AudioClient works — the same DDS init Sanad performs via arm.init().
    Without it, AudioClient() fails ('NoneType._ref') and chest playback is
    silent. In bus/multi-package mode the hwbroker owns DDS, so P1 skips it
    there (detected by SANAD_BUS_ADDR being set).

    The interface name comes from SANAD_DDS_INTERFACE (default "eth0"). Any
    failure is logged and swallowed.
    """
    if os.environ.get("SANAD_BUS_ADDR"):
        return
    try:
        from unitree_sdk2py.core.channel import ChannelFactoryInitialize
        iface = os.environ.get("SANAD_DDS_INTERFACE", "eth0")
        ChannelFactoryInitialize(0, iface)
        log.info("P1: DDS ChannelFactoryInitialize(0, %s) done — chest audio enabled", iface)
    except Exception:
        log.exception("P1: DDS init failed — chest audio unavailable (plugged/USB still works)")


def _enforce_keyless_default():
    """P1 ships KEYLESS — the customer adds their own Gemini key via the
    dashboard (saved to data/motions/config.json). Honor only an explicit
    vendor key (SANAD_GEMINI_API_KEY env) or a customer-saved key; otherwise
    IGNORE any key baked into the shipped Sanad config so the product never
    carries a vendor key.
    """
    import Project.Sanad.config as _cfg

    env_key = (os.environ.get("SANAD_GEMINI_API_KEY") or "").strip()
    saved = ""
    try:
        from Project.Sanad.config import load_config
        saved = ((load_config().get("gemini") or {}).get("api_key") or "").strip()
    except Exception as exc:
        # Unreadable config is treated as "no saved key", but is made visible.
        log.info("P1: saved Gemini key could not be read — treating as unset (%s)", exc)
    if env_key or saved:
        return  # explicit/customer key present — keep it

    _cfg.GEMINI_API_KEY = ""
    try:
        # The client module holds its own GEMINI_API_KEY name; clear it too.
        import Project.Sanad.gemini.client as _gc
        _gc.GEMINI_API_KEY = ""
    except Exception as exc:
        log.info("P1: could not clear the key on gemini.client (%s)", exc)
    log.info("P1: keyless by default — customer must add a Gemini key via the dashboard")


def main():
    """Start the P1 package: bus, DDS, key policy, singletons, shim, uvicorn.

    Host and port come from SANAD_DASHBOARD_HOST (default "0.0.0.0") and
    SANAD_DASHBOARD_PORT (default "8011"); a non-integer port raises ValueError.
    """
    host = os.environ.get("SANAD_DASHBOARD_HOST", "0.0.0.0")
    port = int(os.environ.get("SANAD_DASHBOARD_PORT", "8011"))
    log.info("Sanad P1 (Basic Communication) starting — %s:%d lang=%s audio=%s",
             host, port, os.environ.get("SANAD_LANGUAGE", "?"),
             os.environ.get("SANAD_AUDIO_PROFILE", "builtin"))

    # optional cross-container bus (no-op without SANAD_BUS_ADDR)
    try:
        from sanad_pkg.bus import bus
        bus.connect()
    except Exception:
        log.exception("bus connect failed (continuing in-process)")

    _init_dds_for_audio()
    _enforce_keyless_default()
    singletons = _build_singletons()
    # The shim must be in sys.modules before the routers are imported by build_app().
    _inject_main_shim(singletons)

    import uvicorn
    app = build_app()
    uvicorn.run(app, host=host, port=port, log_level="info")


if __name__ == "__main__":
    main()