#!/usr/bin/env python3 """Sanad Package 3 — Facial Recognition + Places + Memories launcher. P3 = perception + memory (NO motion): identify faces (VIP DB), recognize places (zones), and remember visitors across visits, driving mask expressions on recognition. Recognition is 100% Gemini-side / in-context (primer images) — no local ML. Dashboard on :8013. Self-contained wrapper around the vendored SanadV3 engine (like P1/P2): 1. bootstrap the Project.Sanad namespace + flat Mask path, 2. construct the perception subsystems (camera, face gallery, zone gallery, recognition state) + comms core (brain/audio/voice/live_sub) + mask, 3. construct the NEW package-local VisitorMemory store, 4. wire lip-sync + Gemini emotions/social + recognition-driven expressions, 5. inject a P3-scoped Project.Sanad.main shim + a package-local memory shim, 6. mount the recognition/places/mask/memory routers + comms subset, serve the SanadV3 SPA with non-P3 tabs hidden, on :8013. Kept Python-3.8 compatible. """ from __future__ import annotations import atexit import importlib import os import sys import types from pathlib import Path # ── 1. namespace bootstrap (mirrors app_p2.py) ─────────────────────────────── _APP = Path(os.environ.get("SANAD_APP_DIR", "/app")) if str(_APP) not in sys.path: sys.path.insert(0, str(_APP)) _MASK_DIR = os.environ.setdefault("SANAD_MASK_DIR", str(_APP / "mask")) if _MASK_DIR and _MASK_DIR not in sys.path: sys.path.insert(0, _MASK_DIR) if "Project" not in sys.modules: _proj = types.ModuleType("Project") _proj.__path__ = [] sys.modules["Project"] = _proj if "Project.Sanad" not in sys.modules: _sanad = importlib.import_module("Sanad") sys.modules["Project.Sanad"] = _sanad sys.modules["Project"].Sanad = _sanad # type: ignore[attr-defined] # package-local modules (memory store + its route) live next to this file. 
# Enables the bare-name imports of visitor_memory / routes_memory / routes_p3 below.
sys.path.insert(0, str(Path(__file__).resolve().parent))

from Project.Sanad.core import asyncio_compat  # noqa: E402,F401
from Project.Sanad.core.logger import get_logger  # noqa: E402

log = get_logger("pkg3.app")

PACKAGE = "P3"
PACKAGE_TITLE = "Sanad — Recognition + Places + Memories (P3)"

# SanadV3 SPA tab ids P3 SHOWS / HIDES.
P3_SPA_TABS = ["operations", "voice", "recognition", "mask", "recordings", "settings"]
P3_SPA_HIDE = ["motion", "controller", "navigation", "livemap", "mapeditor", "temp", "terminal"]
# Routers P3 does NOT mount → short-circuit client-side (no "Not Found" toasts).
P3_UNMOUNTED = ["/api/nav", "/api/controller", "/api/motion", "/api/skills",
                "/api/macros", "/api/replay", "/api/wake-phrases",
                "/api/live-voice", "/api/scripts"]


def _safe(name, factory):
    """Call ``factory()`` and return its result, or ``None`` if it raises.

    Args:
        name: Label used in the log message when construction fails.
        factory: Zero-argument callable that builds the subsystem.

    Returns:
        Whatever ``factory`` returns, or ``None`` after logging the exception.
    """
    try:
        return factory()
    except Exception:
        log.exception("P3: could not construct %s — degraded", name)
        return None


# ── 2. construct the perception + comms subsystems ───────────────────────────
def _build_singletons():
    """Construct every subsystem P3 uses and return them in a dict.

    Each subsystem is built through ``_safe`` (or inside a guarded import), so
    a failure leaves that entry as ``None`` instead of aborting start-up.
    After construction the function attaches voice/audio to the brain,
    audio/camera to the live subprocess, and starts the camera when the
    persisted recognition state asks for vision.

    Returns:
        dict with keys ``brain``, ``audio_mgr``, ``voice_client``,
        ``local_tts``, ``typed_replay``, ``live_sub``, ``camera``,
        ``gallery``, ``zone_gallery``, ``mask_face`` and ``memory``; any value
        may be ``None``.

    Raises:
        ImportError: if one of the four unguarded core imports at the top of
            the function is unavailable.
    """
    from Project.Sanad.core.brain import Brain
    from Project.Sanad.voice.audio_manager import AudioManager
    from Project.Sanad.gemini.client import GeminiVoiceClient
    from Project.Sanad.gemini.subprocess import GeminiSubprocess

    brain = _safe("brain", Brain)  # CRITICAL — greetings ride the live session
    audio_mgr = _safe("audio_mgr", AudioManager)
    voice_client = _safe("voice_client", GeminiVoiceClient)

    # Optional engines: a missing module is tolerated, but no longer silently.
    local_tts = None
    try:
        from Project.Sanad.voice.local_tts import LocalTTSEngine
        local_tts = _safe("local_tts", LocalTTSEngine)
    except Exception as exc:
        log.info("P3: local_tts import failed — continuing without it (%s)", exc)

    typed_replay = None
    if voice_client is not None and audio_mgr is not None:
        try:
            from Project.Sanad.voice.typed_replay import TypedReplayEngine
            typed_replay = _safe(
                "typed_replay",
                lambda: TypedReplayEngine(voice_client, audio_mgr))
        except Exception as exc:
            log.info("P3: typed_replay import failed — continuing without it (%s)", exc)

    live_sub = _safe("live_sub", lambda: GeminiSubprocess())

    # Perception: camera daemon + face gallery + zone gallery.
    camera = None
    try:
        from Project.Sanad.vision.camera import CameraDaemon
        camera = _safe("camera", lambda: CameraDaemon())
    except Exception:
        log.exception("P3: CameraDaemon import failed — vision unavailable")

    gallery = None
    try:
        from Project.Sanad.vision.face_gallery import FaceGallery
        gallery = _safe("gallery", lambda: FaceGallery())
    except Exception:
        log.exception("P3: FaceGallery import failed")

    zone_gallery = None
    try:
        from Project.Sanad.vision.zone_gallery import ZoneGallery
        zone_gallery = _safe("zone_gallery", lambda: ZoneGallery())
    except Exception:
        log.exception("P3: ZoneGallery import failed")

    # Mask/face — expressions on recognition.
    mask_face = None
    try:
        from Project.Sanad.face.mask_face import FaceController
        mask_face = _safe("mask_face", FaceController)
    except Exception:
        log.exception("P3: FaceController import failed — LED mask unavailable")

    # NEW package-local visitor memory store.
    memory = None
    try:
        from visitor_memory import VisitorMemory
        memory = _safe("memory", VisitorMemory)
    except Exception:
        log.exception("P3: VisitorMemory init failed")

    # attachments — each one is optional and individually guarded.
    for meth, val in (("attach_voice", voice_client),
                      ("attach_audio_manager", audio_mgr)):
        if brain is not None and val is not None and hasattr(brain, meth):
            try:
                getattr(brain, meth)(val)
            except Exception:
                log.exception("brain.%s failed", meth)

    if live_sub is not None:
        if audio_mgr is not None and hasattr(live_sub, "attach_audio_manager"):
            try:
                live_sub.attach_audio_manager(audio_mgr)
            except Exception:
                log.exception("live_sub.attach_audio_manager failed")
        if camera is not None and hasattr(live_sub, "attach_camera"):
            try:
                live_sub.attach_camera(camera)  # frames flow to the child for recognition
            except Exception:
                log.exception("live_sub.attach_camera failed")

    # Boot vision-restore (guarded — never crash the container if no camera).
    # The vendored recognition_state exposes read(path)/mutate(path, **) — NOT load().
    try:
        from Project.Sanad.vision import recognition_state as _rs
        from Project.Sanad.config import BASE_DIR as _BD
        _state_path = _BD / "data" / ".recognition_state.json"
        st = _rs.read(_state_path)
        # NOTE(review): attribute access assumes read() returns an object, not
        # a mapping — with a plain dict this is always False. Confirm.
        want_vision = bool(getattr(st, "vision_enabled", False))
        if want_vision and camera is not None and hasattr(camera, "start"):
            try:
                camera.start()
                log.info("P3: vision restored (camera started)")
            except Exception:
                log.exception("P3: camera.start() failed — disabling vision, booting headless")
                try:
                    _rs.mutate(_state_path, vision_enabled=False)
                except Exception:
                    # Previously swallowed: the flag stays set on disk, so the
                    # same start failure would repeat on the next boot.
                    log.exception("P3: could not persist vision_enabled=False")
    except Exception:
        log.exception("P3: recognition-state restore skipped")

    return dict(brain=brain, audio_mgr=audio_mgr, voice_client=voice_client,
                local_tts=local_tts, typed_replay=typed_replay,
                live_sub=live_sub, camera=camera, gallery=gallery,
                zone_gallery=zone_gallery, mask_face=mask_face, memory=memory)


# ── 2b. mask wiring (lip-sync + emotions + social + lifelike) ────────────────
def _wire_mask(s):
    """Hook the LED mask up to the live subprocess callbacks and the event bus.

    Does nothing when ``s["mask_face"]`` is ``None``. The three live-subprocess
    hooks (mouth, face, social) are registered only if ``live_sub`` exposes the
    matching ``register_*_callback`` method, and each callback acts only while
    ``mask_face._gemini_linked`` is truthy. The event-bus hooks are registered
    regardless of ``live_sub``.

    Args:
        s: Singleton dict as returned by ``_build_singletons``.
    """
    mask_face = s.get("mask_face")
    live_sub = s.get("live_sub")
    if mask_face is None:
        return

    # lip-sync
    if live_sub is not None and hasattr(live_sub, "register_mouth_callback"):
        try:
            live_sub.register_mouth_callback(
                lambda lvl: getattr(mask_face, "_gemini_linked", False)
                and _try(mask_face.set_mouth, int(lvl)))
            log.info("LED face wired to lip-sync (MOUTH)")
        except Exception:
            log.exception("mouth hook failed")

    # emotions
    if live_sub is not None and hasattr(live_sub, "register_face_callback"):
        try:
            # Second argument passed to react() per expression; 1.6 for the rest.
            _HOLD = {"heart": 2.6, "love": 2.6, "kiss": 2.4, "laugh": 2.2,
                     "surprised": 1.8, "confused": 1.8}
            # Look the hold time up with the same str(n) that is handed to
            # react(), so a non-str payload cannot miss (or fail to hash).
            live_sub.register_face_callback(
                lambda n: getattr(mask_face, "_gemini_linked", False)
                and _try(mask_face.react, str(n), _HOLD.get(str(n), 1.6)))
            log.info("LED face wired to emotions (FACE)")
        except Exception:
            log.exception("face hook failed")

    # social QR (off-thread — ~9s BLE upload)
    if live_sub is not None and hasattr(live_sub, "register_social_callback"):
        try:
            def _on_social(account):
                if not getattr(mask_face, "_gemini_linked", False):
                    return
                import threading as _th

                # Bind the account as a default so the worker keeps its own copy.
                def _run(acc=str(account)):
                    try:
                        from Project.Sanad.dashboard.routes.mask_social import show_social_on_mask
                        show_social_on_mask(acc)
                    except Exception:
                        log.exception("show_social_on_mask failed")

                _th.Thread(target=_run, daemon=True, name="mask-social").start()

            live_sub.register_social_callback(_on_social)
            log.info("LED face wired to social QR (SHOW)")
        except Exception:
            log.exception("social hook failed")

    # lifelike state + reactions (synchronous bus)
    try:
        from Project.Sanad.core.event_bus import bus as _bus
        # Speaking toggled on → set_speaking(True); toggled off →
        # set_speaking(False) followed by set_listening().
        _bus.on("brain.gestural_speaking_changed",
                lambda enabled=False, **_k: (
                    _try(mask_face.set_speaking, bool(enabled)),
                    (not enabled) and _try(mask_face.set_listening)))
        _bus.on("voice.connected", lambda **_k: _try(mask_face.set_listening))
        _bus.on("voice.user_said", lambda **_k: _try(mask_face.set_thinking))
        _bus.on("voice.disconnected", lambda **_k: _try(mask_face.set_idle))
        _bus.on("voice.error", lambda **_k: _try(mask_face.react, "sad"))
        _bus.on("recognition.event", lambda **_k: _try(mask_face.react, "smile"))  # greet on recognition
        log.info("LED face wired to lifelike + recognition-greeting events")
    except Exception:
        log.exception("lifelike hooks failed")


def _try(fn, *a):
    """Call ``fn(*a)``; log and swallow any exception.

    Returns:
        The call's result, or ``None`` if it raised.
    """
    try:
        return fn(*a)
    except Exception:
        log.exception("%s failed", getattr(fn, "__name__", "callback"))


def _inject_main_shim(singletons):
    """Register a synthetic ``Project.Sanad.main`` module exposing the singletons.

    Every key of ``singletons`` becomes a module attribute, the motion/nav
    names P3 does not own are set to ``None`` when absent, and ``SUBSYSTEMS``
    maps the P3 subsystem names to their instances.

    Args:
        singletons: Dict as returned by ``_build_singletons``.

    Returns:
        The shim module, which is also stored in ``sys.modules``.
    """
    shim = types.ModuleType("Project.Sanad.main")
    for k, v in singletons.items():
        setattr(shim, k, v)
    # motion / nav subsystems P3 does NOT own — present as None (routers guard).
    for k in ("arm", "wake_mgr", "macro_rec", "macro_play", "teacher",
              "live_voice", "loco_controller", "movement_dispatch", "nav_client"):
        if not hasattr(shim, k):
            setattr(shim, k, None)
    shim.SUBSYSTEMS = {k: singletons.get(k) for k in (  # type: ignore[attr-defined]
        "brain", "audio_mgr", "voice_client", "local_tts", "typed_replay",
        "live_sub", "camera", "gallery", "zone_gallery", "mask_face", "memory")}
    sys.modules["Project.Sanad.main"] = shim
    return shim


# ── 3. build the P3 FastAPI app ───────────────────────────────────────────────
# (route module name, URL prefix, OpenAPI tag)
_P3_REST = [
    ("health", "/api", "health"),
    ("system", "/api/system", "system"),
    ("voice", "/api/voice", "voice"),
    ("audio_control", "/api/audio", "audio"),
    ("prompt", "/api/prompt", "prompt"),
    ("typed_replay", "/api/typed-replay", "typed-replay"),
    ("records", "/api/records", "records"),
    ("logs", "/api/logs", "logs"),
    ("live_subprocess", "/api/live-subprocess", "live-subprocess"),
    ("recognition", "/api/recognition", "recognition"),  # faces / VIP
    ("zones", "/api/zones", "zones"),  # places (nav /go degrades to nav_unavailable)
    ("mask", "/api/mask", "mask"),
    ("mask_social", "/api/mask", "mask-social"),
]
_P3_WS = ["log_stream"]


def _tab_filter_snippet():
    """Build the snippet that ``_filtered_spa`` injects into the SPA page.

    Computes a CSS selector list covering every tab in ``P3_SPA_HIDE`` (plus
    ``#status-pills``) and formats it, together with the JSON-encoded package
    descriptor and the JSON-encoded ``P3_UNMOUNTED`` list, into a template.

    Returns:
        The formatted snippet string.
    """
    import json as _json
    css = ",".join(".tab[onclick*=\"switchTab('%s')\"],#tab-%s" % (t, t)
                   for t in P3_SPA_HIDE) + ",#status-pills"
    # NOTE(review): both template literals are empty in the reviewed source,
    # which looks like stripped markup. As written, "%"-formatting three
    # arguments into an empty template raises TypeError ("not all arguments
    # converted"). Restore the original template text before shipping.
    return (
        ""
        ""
        % (css,
           _json.dumps({"name": PACKAGE, "title": PACKAGE_TITLE, "tabs": P3_SPA_TABS}),
           _json.dumps(P3_UNMOUNTED))
    )


def build_app():
    """Create the P3 FastAPI application.

    Mounts the vendored REST routers listed in ``_P3_REST``, the websocket
    routers in ``_P3_WS``, the package-local ``routes_memory`` / ``routes_p3``
    routers and the static directory, then adds ``/api/package``, ``/`` and
    ``/full``. A router that fails to import is logged, recorded in the
    ``failed`` map reported by ``/api/package``, and skipped.

    Returns:
        The configured ``FastAPI`` instance.
    """
    from fastapi import FastAPI
    from fastapi.staticfiles import StaticFiles
    from fastapi.responses import HTMLResponse, JSONResponse
    from Project.Sanad.config import BASE_DIR
    from Project.Sanad.core.config_loader import section as _cfg_section

    app_cfg = _cfg_section("dashboard", "app")
    app = FastAPI(title=PACKAGE_TITLE, version="0.1.0")
    # Names of routers mounted successfully / name → error text for the rest.
    loaded, failed = [], {}

    def _register(mod_name, prefix, tag, package="Project.Sanad.dashboard.routes"):
        """Import ``package.mod_name`` and mount its ``router``; never raises."""
        try:
            mod = importlib.import_module("%s.%s" % (package, mod_name))
            if not hasattr(mod, "router"):
                raise AttributeError("no 'router'")
            kw = {}
            if prefix:
                kw["prefix"] = prefix
            if tag:
                kw["tags"] = [tag]
            app.include_router(mod.router, **kw)
            loaded.append(mod_name)
        except Exception as exc:
            failed[mod_name] = str(exc)
            log.exception("P3: router %s failed — skipped", mod_name)

    for m, p, t in _P3_REST:
        _register(m, p, t)
    for m in _P3_WS:
        _register(m, None, "websocket", package="Project.Sanad.dashboard.websockets")

    # package-local routers: NEW memory store + P3 convenience.
    try:
        import routes_memory
        app.include_router(routes_memory.router, prefix="/api/memory", tags=["memory"])
        loaded.append("routes_memory")
    except Exception as exc:
        failed["routes_memory"] = str(exc)
        log.exception("P3: routes_memory failed — /api/memory unavailable")
    try:
        import routes_p3
        app.include_router(routes_p3.router, prefix="/api/p3", tags=["p3"])
        loaded.append("routes_p3")
    except Exception as exc:
        failed["routes_p3"] = str(exc)
        log.exception("P3: routes_p3 failed")

    static_dir = BASE_DIR / app_cfg.get("static_subdir", "dashboard/static")
    try:
        app.mount("/static", StaticFiles(directory=str(static_dir)), name="static")
    except Exception:
        log.exception("P3: static mount failed")

    @app.get("/api/package")
    async def package_info():
        """Report package identity, licensed features, API-key status and routes."""
        from sanad_pkg import license as _lic
        lic = _lic.current()
        api_key = {"has_key": False, "masked": "", "source": "default"}
        try:
            import Project.Sanad.config as _cfg_mod
            from Project.Sanad.dashboard.routes.voice import _mask_api_key
            _k = getattr(_cfg_mod, "GEMINI_API_KEY", "") or ""
            try:
                from Project.Sanad.config import load_config
                _stored = (load_config().get("gemini", {}) or {}).get("api_key")
            except Exception:
                # Unreadable config is reported as "no stored key".
                _stored = None
            api_key = {"has_key": bool(_k), "masked": _mask_api_key(_k),
                       "source": "config_file" if _stored else "default"}
        except Exception:
            log.exception("could not read api-key status")
        return {
            "package": PACKAGE,
            "title": PACKAGE_TITLE,
            "tabs": P3_SPA_TABS,
            "features": {"face_rec": bool(lic.feature("face_rec", True)),
                         "places": bool(lic.feature("places", True)),
                         "memory": bool(lic.feature("memory", True)),
                         "mask": bool(lic.feature("mask", True))},
            "api_key": api_key,
            "endpoints": {"recognition": "GET /api/recognition/*",
                          "places": "GET /api/zones/*",
                          "memories": "GET /api/memory/",
                          "mask": "GET /api/mask/status"},
            "loaded_routes": loaded,
            "failed_routes": failed,
            "license": lic.summary(),
        }

    def _filtered_spa():
        """Return the SPA index page with the tab-filter snippet injected.

        Falls back to a JSON body when ``index.html`` is missing, and to a
        JSON 500 when reading it raises ``OSError``.
        """
        index = static_dir / "index.html"
        if not index.exists():
            return JSONResponse({"message": "SPA index.html not found",
                                 "loaded": loaded, "failed": failed})
        try:
            html = index.read_text(encoding="utf-8")
            filt = _tab_filter_snippet()
            # NOTE(review): the needle literals below are empty in the
            # reviewed source (apparently stripped markup). As written the
            # first test is always true and the snippet is inserted at the
            # very start of the document. Restore the original needles.
            if "" in html:
                html = html.replace("", filt + "", 1)
            elif "" in html:
                html = html.replace("", filt + "", 1)
            else:
                html = filt + html
            return HTMLResponse(html)
        except OSError as exc:
            return JSONResponse({"error": "index.html unreadable: %s" % exc},
                                status_code=500)

    @app.get("/")
    async def root():
        return _filtered_spa()

    @app.get("/full")
    async def full_dashboard():
        return _filtered_spa()

    log.info("P3 dashboard built — routers loaded=%s failed=%s", loaded, list(failed))
    return app


def _init_dds_for_audio():
    """Initialise the Unitree DDS channel factory, best-effort.

    Skipped entirely when ``SANAD_BUS_ADDR`` is set. Otherwise calls
    ``ChannelFactoryInitialize(0, iface)`` with ``iface`` taken from
    ``SANAD_DDS_INTERFACE`` (default ``eth0``). Any failure, including a
    missing SDK, is logged and ignored.
    """
    if os.environ.get("SANAD_BUS_ADDR"):
        return
    try:
        from unitree_sdk2py.core.channel import ChannelFactoryInitialize
        iface = os.environ.get("SANAD_DDS_INTERFACE", "eth0")
        ChannelFactoryInitialize(0, iface)
        log.info("P3: DDS ChannelFactoryInitialize(0, %s) done — chest audio enabled", iface)
    except Exception:
        log.exception("P3: DDS init failed — chest audio unavailable (plugged still works)")


def _enforce_keyless_default():
    """Blank the built-in Gemini API key unless one was explicitly provided.

    A key counts as provided when ``SANAD_GEMINI_API_KEY`` is non-empty or the
    stored config has a non-empty ``gemini.api_key``. Otherwise
    ``GEMINI_API_KEY`` is set to ``""`` on both ``Project.Sanad.config`` and
    ``Project.Sanad.gemini.client``.
    """
    import Project.Sanad.config as _cfg
    env_key = (os.environ.get("SANAD_GEMINI_API_KEY") or "").strip()
    saved = ""
    try:
        from Project.Sanad.config import load_config
        saved = ((load_config().get("gemini") or {}).get("api_key") or "").strip()
    except Exception:
        # Unreadable config is treated as "no saved key".
        pass
    if env_key or saved:
        return
    _cfg.GEMINI_API_KEY = ""
    try:
        import Project.Sanad.gemini.client as _gc
        _gc.GEMINI_API_KEY = ""
    except Exception:
        # Previously swallowed: the client module could keep its own copy of
        # the key while the log below claimed a keyless start.
        log.exception("P3: could not clear GEMINI_API_KEY on gemini.client")
    log.info("P3: keyless by default — customer adds a Gemini key via the dashboard")


def main():
    """Start the P3 launcher: bus, DDS, subsystems, mask wiring, then uvicorn.

    Host and port come from ``SANAD_DASHBOARD_HOST`` (default ``0.0.0.0``) and
    ``SANAD_DASHBOARD_PORT`` (default ``8013``). Blocks in ``uvicorn.run``.

    Raises:
        ValueError: if ``SANAD_DASHBOARD_PORT`` is not an integer.
    """
    host = os.environ.get("SANAD_DASHBOARD_HOST", "0.0.0.0")
    port = int(os.environ.get("SANAD_DASHBOARD_PORT", "8013"))
    log.info("Sanad P3 (Recognition + Places + Memories) starting — %s:%d", host, port)
    try:
        from sanad_pkg.bus import bus
        bus.connect()
    except Exception:
        log.exception("bus connect failed (continuing in-process)")

    _init_dds_for_audio()
    # Must run before the subsystems are built so they see the blanked key.
    _enforce_keyless_default()
    singletons = _build_singletons()
    _wire_mask(singletons)
    _inject_main_shim(singletons)

    # Shutdown hooks go through _try so a failure is logged via the package
    # logger; the mask hook now matches the camera hook.
    _mask = singletons.get("mask_face")
    if _mask is not None and hasattr(_mask, "shutdown"):
        atexit.register(lambda: _try(_mask.shutdown))
    _cam = singletons.get("camera")
    if _cam is not None and hasattr(_cam, "stop"):
        atexit.register(lambda: _try(_cam.stop))

    import uvicorn
    app = build_app()
    uvicorn.run(app, host=host, port=port, log_level="info")


if __name__ == "__main__":
    main()