#!/usr/bin/env python3
"""Sanad Package 3 — Facial Recognition + Places + Memories launcher.
P3 = perception + memory (NO motion): identify faces (VIP DB), recognize places
(zones), and remember visitors across visits, driving mask expressions on
recognition. Recognition is 100% Gemini-side / in-context (primer images) — no
local ML. Dashboard on :8013.
Self-contained wrapper around the vendored SanadV3 engine (like P1/P2):
1. bootstrap the Project.Sanad namespace + flat Mask path,
2. construct the perception subsystems (camera, face gallery, zone gallery,
recognition state) + comms core (brain/audio/voice/live_sub) + mask,
3. construct the NEW package-local VisitorMemory store,
4. wire lip-sync + Gemini emotions/social + recognition-driven expressions,
5. inject a P3-scoped Project.Sanad.main shim + a package-local memory shim,
6. mount the recognition/places/mask/memory routers + comms subset, serve the
SanadV3 SPA with non-P3 tabs hidden, on :8013.
Kept Python-3.8 compatible.
"""
from __future__ import annotations
import atexit
import importlib
import os
import sys
import types
from pathlib import Path
# ── 1. namespace bootstrap (mirrors app_p2.py) ───────────────────────────────
# Directory that is put on sys.path so the top-level ``Sanad`` package can be
# imported below; overridable through the SANAD_APP_DIR environment variable.
_APP = Path(os.environ.get("SANAD_APP_DIR", "/app"))
if str(_APP) not in sys.path:
    sys.path.insert(0, str(_APP))
# setdefault() both reads SANAD_MASK_DIR and, when it was unset, writes the
# default (<app dir>/mask) back into the process environment.
_MASK_DIR = os.environ.setdefault("SANAD_MASK_DIR", str(_APP / "mask"))
if _MASK_DIR and _MASK_DIR not in sys.path:
    sys.path.insert(0, _MASK_DIR)
# Register an empty synthetic ``Project`` package when none is loaded yet, so
# that the dotted name ``Project.Sanad`` has a parent entry in sys.modules.
if "Project" not in sys.modules:
    _proj = types.ModuleType("Project")
    _proj.__path__ = []
    sys.modules["Project"] = _proj
# Alias the real ``Sanad`` package as ``Project.Sanad`` (both in sys.modules
# and as an attribute of the parent) unless that name is already registered.
if "Project.Sanad" not in sys.modules:
    _sanad = importlib.import_module("Sanad")
    sys.modules["Project.Sanad"] = _sanad
    sys.modules["Project"].Sanad = _sanad  # type: ignore[attr-defined]
# package-local modules (memory store + its route) live next to this file.
# Inserted unconditionally at position 0, i.e. ahead of the entries added above.
sys.path.insert(0, str(Path(__file__).resolve().parent))
from Project.Sanad.core import asyncio_compat # noqa: E402,F401
from Project.Sanad.core.logger import get_logger # noqa: E402
# Module-level logger shared by the helpers defined below.
log = get_logger("pkg3.app")
# Package identity: reported by /api/package and passed to the SPA snippet.
PACKAGE = "P3"
PACKAGE_TITLE = "Sanad — Recognition + Places + Memories (P3)"

# SanadV3 SPA tab ids that P3 SHOWS.
P3_SPA_TABS = [
    "operations",
    "voice",
    "recognition",
    "mask",
    "recordings",
    "settings",
]

# SanadV3 SPA tab ids that P3 HIDES.
P3_SPA_HIDE = [
    "motion",
    "controller",
    "navigation",
    "livemap",
    "mapeditor",
    "temp",
    "terminal",
]

# Routers P3 does NOT mount → short-circuit client-side (no "Not Found" toasts).
P3_UNMOUNTED = [
    "/api/nav",
    "/api/controller",
    "/api/motion",
    "/api/skills",
    "/api/macros",
    "/api/replay",
    "/api/wake-phrases",
    "/api/live-voice",
    "/api/scripts",
]
def _safe(name, factory):
try:
return factory()
except Exception:
log.exception("P3: could not construct %s — degraded", name)
return None
# ── 2. construct the perception + comms subsystems ───────────────────────────
def _build_singletons():
    """Construct every P3 subsystem, wire them together and return them.

    Each subsystem is built through ``_safe`` so a failing constructor yields
    ``None`` instead of aborting the boot. Optional modules are imported
    inside their own guard, and every guarded failure is logged.

    Returns:
        dict with the keys ``brain``, ``audio_mgr``, ``voice_client``,
        ``local_tts``, ``typed_replay``, ``live_sub``, ``camera``,
        ``gallery``, ``zone_gallery``, ``mask_face`` and ``memory``.
        Any value may be ``None`` when that subsystem is unavailable.

    Raises:
        ImportError: if one of the four unguarded core imports at the top of
            the function (brain, audio manager, Gemini client, Gemini
            subprocess) cannot be resolved.
    """
    from Project.Sanad.core.brain import Brain
    from Project.Sanad.voice.audio_manager import AudioManager
    from Project.Sanad.gemini.client import GeminiVoiceClient
    from Project.Sanad.gemini.subprocess import GeminiSubprocess

    # ── comms core ───────────────────────────────────────────────────────────
    brain = _safe("brain", Brain)  # CRITICAL — greetings ride the live session
    audio_mgr = _safe("audio_mgr", AudioManager)
    voice_client = _safe("voice_client", GeminiVoiceClient)

    local_tts = None
    try:
        from Project.Sanad.voice.local_tts import LocalTTSEngine
        local_tts = _safe("local_tts", LocalTTSEngine)
    except Exception:
        # Still best-effort, but no longer silent.
        log.exception("P3: LocalTTSEngine import failed — local TTS unavailable")

    # Typed replay needs both the voice client and the audio manager.
    typed_replay = None
    if voice_client is not None and audio_mgr is not None:
        try:
            from Project.Sanad.voice.typed_replay import TypedReplayEngine
            typed_replay = _safe(
                "typed_replay", lambda: TypedReplayEngine(voice_client, audio_mgr))
        except Exception:
            log.exception("P3: TypedReplayEngine import failed — typed replay unavailable")

    live_sub = _safe("live_sub", GeminiSubprocess)

    # ── perception: camera daemon + face gallery + zone gallery ──────────────
    camera = None
    try:
        from Project.Sanad.vision.camera import CameraDaemon
        camera = _safe("camera", CameraDaemon)
    except Exception:
        log.exception("P3: CameraDaemon import failed — vision unavailable")

    gallery = None
    try:
        from Project.Sanad.vision.face_gallery import FaceGallery
        gallery = _safe("gallery", FaceGallery)
    except Exception:
        log.exception("P3: FaceGallery import failed")

    zone_gallery = None
    try:
        from Project.Sanad.vision.zone_gallery import ZoneGallery
        zone_gallery = _safe("zone_gallery", ZoneGallery)
    except Exception:
        log.exception("P3: ZoneGallery import failed")

    # ── mask/face — expressions on recognition ───────────────────────────────
    mask_face = None
    try:
        from Project.Sanad.face.mask_face import FaceController
        mask_face = _safe("mask_face", FaceController)
    except Exception:
        log.exception("P3: FaceController import failed — LED mask unavailable")

    # ── NEW package-local visitor memory store ───────────────────────────────
    memory = None
    try:
        from visitor_memory import VisitorMemory
        memory = _safe("memory", VisitorMemory)
    except Exception:
        log.exception("P3: VisitorMemory init failed")

    # ── attachments ──────────────────────────────────────────────────────────
    # Each attach is optional: it runs only when both sides exist and the
    # owner actually exposes the method.
    for meth, val in (("attach_voice", voice_client),
                      ("attach_audio_manager", audio_mgr)):
        if brain is not None and val is not None and hasattr(brain, meth):
            try:
                getattr(brain, meth)(val)
            except Exception:
                log.exception("brain.%s failed", meth)

    if live_sub is not None:
        if audio_mgr is not None and hasattr(live_sub, "attach_audio_manager"):
            try:
                live_sub.attach_audio_manager(audio_mgr)
            except Exception:
                log.exception("live_sub.attach_audio_manager failed")
        if camera is not None and hasattr(live_sub, "attach_camera"):
            try:
                live_sub.attach_camera(camera)  # frames flow to the child for recognition
            except Exception:
                log.exception("live_sub.attach_camera failed")

    # ── boot vision-restore ──────────────────────────────────────────────────
    # Guarded — never crash the container if no camera.
    # The vendored recognition_state exposes read(path)/mutate(path, **) — NOT load().
    try:
        from Project.Sanad.vision import recognition_state as _rs
        from Project.Sanad.config import BASE_DIR as _BD
        _state_path = _BD / "data" / ".recognition_state.json"
        st = _rs.read(_state_path)
        # NOTE(review): this reads an *attribute*; if read() returns a plain
        # dict the flag is always False — confirm against recognition_state.
        want_vision = bool(getattr(st, "vision_enabled", False))
        if want_vision and camera is not None and hasattr(camera, "start"):
            try:
                camera.start()
                log.info("P3: vision restored (camera started)")
            except Exception:
                log.exception("P3: camera.start() failed — disabling vision, booting headless")
                try:
                    # Persist the downgrade so the stored flag is off.
                    _rs.mutate(_state_path, vision_enabled=False)
                except Exception:
                    log.exception("P3: could not persist vision_enabled=False")
    except Exception:
        log.exception("P3: recognition-state restore skipped")

    return dict(brain=brain, audio_mgr=audio_mgr, voice_client=voice_client,
                local_tts=local_tts, typed_replay=typed_replay, live_sub=live_sub,
                camera=camera, gallery=gallery, zone_gallery=zone_gallery,
                mask_face=mask_face, memory=memory)
# ── 2b. mask wiring (lip-sync + emotions + social + lifelike) ────────────────
def _wire_mask(s):
mask_face = s.get("mask_face")
live_sub = s.get("live_sub")
if mask_face is None:
return
# lip-sync
if live_sub is not None and hasattr(live_sub, "register_mouth_callback"):
try:
live_sub.register_mouth_callback(lambda lvl: getattr(mask_face, "_gemini_linked", False) and _try(mask_face.set_mouth, int(lvl)))
log.info("LED face wired to lip-sync (MOUTH)")
except Exception:
log.exception("mouth hook failed")
# emotions
if live_sub is not None and hasattr(live_sub, "register_face_callback"):
try:
_HOLD = {"heart": 2.6, "love": 2.6, "kiss": 2.4, "laugh": 2.2,
"surprised": 1.8, "confused": 1.8}
live_sub.register_face_callback(lambda n: getattr(mask_face, "_gemini_linked", False) and _try(mask_face.react, str(n), _HOLD.get(n, 1.6)))
log.info("LED face wired to emotions (FACE)")
except Exception:
log.exception("face hook failed")
# social QR (off-thread — ~9s BLE upload)
if live_sub is not None and hasattr(live_sub, "register_social_callback"):
try:
def _on_social(account):
if not getattr(mask_face, "_gemini_linked", False):
return
import threading as _th
def _run(acc=str(account)):
try:
from Project.Sanad.dashboard.routes.mask_social import show_social_on_mask
show_social_on_mask(acc)
except Exception:
log.exception("show_social_on_mask failed")
_th.Thread(target=_run, daemon=True, name="mask-social").start()
live_sub.register_social_callback(_on_social)
log.info("LED face wired to social QR (SHOW)")
except Exception:
log.exception("social hook failed")
# lifelike state + reactions (synchronous bus)
try:
from Project.Sanad.core.event_bus import bus as _bus
_bus.on("brain.gestural_speaking_changed",
lambda enabled=False, **_k: (_try(mask_face.set_speaking, bool(enabled)),
(not enabled) and _try(mask_face.set_listening)))
_bus.on("voice.connected", lambda **_k: _try(mask_face.set_listening))
_bus.on("voice.user_said", lambda **_k: _try(mask_face.set_thinking))
_bus.on("voice.disconnected", lambda **_k: _try(mask_face.set_idle))
_bus.on("voice.error", lambda **_k: _try(mask_face.react, "sad"))
_bus.on("recognition.event", lambda **_k: _try(mask_face.react, "smile")) # greet on recognition
log.info("LED face wired to lifelike + recognition-greeting events")
except Exception:
log.exception("lifelike hooks failed")
def _try(fn, *a):
try:
return fn(*a)
except Exception:
log.exception("%s failed", getattr(fn, "__name__", "callback"))
def _inject_main_shim(singletons):
shim = types.ModuleType("Project.Sanad.main")
for k, v in singletons.items():
setattr(shim, k, v)
# motion / nav subsystems P3 does NOT own — present as None (routers guard).
for k in ("arm", "wake_mgr", "macro_rec", "macro_play", "teacher", "live_voice",
"loco_controller", "movement_dispatch", "nav_client"):
if not hasattr(shim, k):
setattr(shim, k, None)
shim.SUBSYSTEMS = {k: singletons.get(k) for k in ( # type: ignore[attr-defined]
"brain", "audio_mgr", "voice_client", "local_tts", "typed_replay", "live_sub",
"camera", "gallery", "zone_gallery", "mask_face", "memory")}
sys.modules["Project.Sanad.main"] = shim
return shim
# ── 3. build the P3 FastAPI app ───────────────────────────────────────────────
_P3_REST = [
("health", "/api", "health"),
("system", "/api/system", "system"),
("voice", "/api/voice", "voice"),
("audio_control", "/api/audio", "audio"),
("prompt", "/api/prompt", "prompt"),
("typed_replay", "/api/typed-replay", "typed-replay"),
("records", "/api/records", "records"),
("logs", "/api/logs", "logs"),
("live_subprocess", "/api/live-subprocess", "live-subprocess"),
("recognition", "/api/recognition", "recognition"), # faces / VIP
("zones", "/api/zones", "zones"), # places (nav /go degrades to nav_unavailable)
("mask", "/api/mask", "mask"),
("mask_social", "/api/mask", "mask-social"),
]
_P3_WS = ["log_stream"]
def _tab_filter_snippet():
    """Build the text snippet that the SPA handler splices into index.html.

    The snippet is formatted from three values: a CSS selector list covering
    the hidden tabs, a JSON object describing the package (name, title,
    visible tabs) and the JSON list of unmounted API prefixes.
    """
    import json as _json
    # For every hidden tab id: elements with class "tab" whose onclick contains
    # switchTab('<id>'), plus the element with id "tab-<id>"; "#status-pills"
    # is appended to the same selector list.
    css = ",".join(".tab[onclick*=\"switchTab('%s')\"],#tab-%s" % (t, t) for t in P3_SPA_HIDE) + ",#status-pills"
    # NOTE(review): both template literals below are empty in this copy of the
    # file, so applying % with three arguments would raise TypeError ("not all
    # arguments converted"). The markup carrying the three placeholders appears
    # to have been stripped — restore it from the upstream source.
    return (
        ""
        ""
        % (css, _json.dumps({"name": PACKAGE, "title": PACKAGE_TITLE, "tabs": P3_SPA_TABS}),
           _json.dumps(P3_UNMOUNTED))
    )
def build_app():
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse, JSONResponse
from Project.Sanad.config import BASE_DIR
from Project.Sanad.core.config_loader import section as _cfg_section
app_cfg = _cfg_section("dashboard", "app")
app = FastAPI(title=PACKAGE_TITLE, version="0.1.0")
loaded, failed = [], {}
def _register(mod_name, prefix, tag, package="Project.Sanad.dashboard.routes"):
try:
mod = importlib.import_module("%s.%s" % (package, mod_name))
if not hasattr(mod, "router"):
raise AttributeError("no 'router'")
kw = {}
if prefix:
kw["prefix"] = prefix
if tag:
kw["tags"] = [tag]
app.include_router(mod.router, **kw)
loaded.append(mod_name)
except Exception as exc:
failed[mod_name] = str(exc)
log.exception("P3: router %s failed — skipped", mod_name)
for m, p, t in _P3_REST:
_register(m, p, t)
for m in _P3_WS:
_register(m, None, "websocket", package="Project.Sanad.dashboard.websockets")
# package-local routers: NEW memory store + P3 convenience.
try:
import routes_memory
app.include_router(routes_memory.router, prefix="/api/memory", tags=["memory"])
loaded.append("routes_memory")
except Exception as exc:
failed["routes_memory"] = str(exc)
log.exception("P3: routes_memory failed — /api/memory unavailable")
try:
import routes_p3
app.include_router(routes_p3.router, prefix="/api/p3", tags=["p3"])
loaded.append("routes_p3")
except Exception as exc:
failed["routes_p3"] = str(exc)
log.exception("P3: routes_p3 failed")
static_dir = BASE_DIR / app_cfg.get("static_subdir", "dashboard/static")
try:
app.mount("/static", StaticFiles(directory=str(static_dir)), name="static")
except Exception:
log.exception("P3: static mount failed")
@app.get("/api/package")
async def package_info():
from sanad_pkg import license as _lic
lic = _lic.current()
api_key = {"has_key": False, "masked": "", "source": "default"}
try:
import Project.Sanad.config as _cfg_mod
from Project.Sanad.dashboard.routes.voice import _mask_api_key
_k = getattr(_cfg_mod, "GEMINI_API_KEY", "") or ""
try:
from Project.Sanad.config import load_config
_stored = (load_config().get("gemini", {}) or {}).get("api_key")
except Exception:
_stored = None
api_key = {"has_key": bool(_k), "masked": _mask_api_key(_k),
"source": "config_file" if _stored else "default"}
except Exception:
log.exception("could not read api-key status")
return {
"package": PACKAGE, "title": PACKAGE_TITLE, "tabs": P3_SPA_TABS,
"features": {"face_rec": bool(lic.feature("face_rec", True)),
"places": bool(lic.feature("places", True)),
"memory": bool(lic.feature("memory", True)),
"mask": bool(lic.feature("mask", True))},
"api_key": api_key,
"endpoints": {"recognition": "GET /api/recognition/*", "places": "GET /api/zones/*",
"memories": "GET /api/memory/", "mask": "GET /api/mask/status"},
"loaded_routes": loaded, "failed_routes": failed, "license": lic.summary(),
}
def _filtered_spa():
index = static_dir / "index.html"
if not index.exists():
return JSONResponse({"message": "SPA index.html not found", "loaded": loaded, "failed": failed})
try:
html = index.read_text(encoding="utf-8")
filt = _tab_filter_snippet()
if "" in html:
html = html.replace("", filt + "", 1)
elif "