Sanad_Package_1/app_p1.py

386 lines
16 KiB
Python

#!/usr/bin/env python3
"""Sanad Package 1 — Basic Communication launcher.
P1 = hands-free conversation in ONE operator-selected language (Gemini Live),
audio over the G1 chest OR any plugged USB device. NO voice-command motion,
NO vision/recognition, NO navigation.
This is a *containerization wrapper* around the canonical Sanad source — it does
NOT fork Sanad. It:
1. bootstraps the `Project.Sanad` namespace (deployed layout),
2. constructs ONLY the P1 comms subsystems,
3. injects a P1-scoped `Project.Sanad.main` shim (the dashboard routers import
their singletons from there — lazily, inside handlers),
4. mounts ONLY the P1 dashboard routers + the logs websocket,
5. serves the real Sanad SPA with non-P1 tabs hidden,
6. runs uvicorn on the P1 port (default :8011).
Kept Python-3.8 compatible.
"""
from __future__ import annotations
import importlib
import os
import sys
import types
from pathlib import Path
# ── 1. namespace bootstrap (mirrors Sanad/main.py, deployed layout) ──────────
# In the image: /app/Sanad is the canonical package, /app is on sys.path.
_APP = Path(os.environ.get("SANAD_APP_DIR", "/app"))
_SANAD_DIR = _APP / "Sanad"
if str(_APP) not in sys.path:
sys.path.insert(0, str(_APP))
if "Project" not in sys.modules:
_proj = types.ModuleType("Project")
_proj.__path__ = [] # namespace package
sys.modules["Project"] = _proj
if "Project.Sanad" not in sys.modules:
_sanad = importlib.import_module("Sanad")
sys.modules["Project.Sanad"] = _sanad
sys.modules["Project"].Sanad = _sanad # type: ignore[attr-defined]
# asyncio.to_thread shim for Py3.8 — before any Sanad module that uses it.
from Project.Sanad.core import asyncio_compat # noqa: E402,F401
from Project.Sanad.core.logger import get_logger # noqa: E402
log = get_logger("pkg1.app")
PACKAGE = "P1"
PACKAGE_TITLE = "Sanad — Basic Communication (P1)"
# SanadV3 SPA tab ids (nav is <div class="tab" onclick="switchTab('X')">, content #tab-X).
# P1 (Basic Communication) shows only the comms tabs; everything else is hidden on /full.
P1_SPA_TABS = ["operations", "voice", "recordings", "settings"]
P1_SPA_HIDE = ["motion", "controller", "navigation", "livemap", "mapeditor",
"recognition", "mask", "temp", "terminal"]
# Routers P1 does NOT mount — the SPA polls these; short-circuit client-side so
# /full raises no "Not Found" toasts for an unsold feature.
P1_UNMOUNTED = ["/api/nav", "/api/zones", "/api/recognition", "/api/controller",
"/api/motion", "/api/skills", "/api/macros", "/api/replay",
"/api/wake-phrases", "/api/live-voice", "/api/scripts", "/api/mask"]
def _safe(name, factory):
try:
return factory()
except Exception:
log.exception("P1: could not construct %s — degraded", name)
return None
# ── 2. construct ONLY P1 comms subsystems ────────────────────────────────────
def _build_singletons():
from Project.Sanad.core.brain import Brain
from Project.Sanad.voice.audio_manager import AudioManager
from Project.Sanad.gemini.client import GeminiVoiceClient
from Project.Sanad.gemini.subprocess import GeminiSubprocess
brain = _safe("brain", Brain)
audio_mgr = _safe("audio_mgr", AudioManager)
voice_client = _safe("voice_client", GeminiVoiceClient)
local_tts = None
try:
from Project.Sanad.voice.local_tts import LocalTTSEngine
local_tts = _safe("local_tts", LocalTTSEngine)
except Exception:
pass
typed_replay = None
if voice_client is not None and audio_mgr is not None:
try:
from Project.Sanad.voice.typed_replay import TypedReplayEngine
typed_replay = _safe("typed_replay", lambda: TypedReplayEngine(voice_client, audio_mgr))
except Exception:
pass
# P1 keeps the Gemini live supervisor (basic profile, motion paths off).
live_sub = _safe("live_sub", lambda: GeminiSubprocess())
# Brain attachments (only what P1 has).
for meth, val in (("attach_voice", voice_client),
("attach_audio_manager", audio_mgr)):
if brain is not None and val is not None and hasattr(brain, meth):
try:
getattr(brain, meth)(val)
except Exception:
log.exception("brain.%s failed", meth)
return dict(brain=brain, audio_mgr=audio_mgr, voice_client=voice_client,
local_tts=local_tts, typed_replay=typed_replay, live_sub=live_sub)
def _inject_main_shim(singletons):
"""Create a P1-scoped `Project.Sanad.main` so dashboard routers resolve
their `from Project.Sanad.main import <x>` against P1's subset. Excluded
subsystems are present as None (routers guard for None)."""
shim = types.ModuleType("Project.Sanad.main")
# P1 singletons
for k, v in singletons.items():
setattr(shim, k, v)
# excluded subsystems — must EXIST (as None) so lazy imports never ImportError
for k in ("arm", "wake_mgr", "macro_rec", "macro_play", "teacher",
"live_voice", "camera", "gallery", "zone_gallery",
"loco_controller", "movement_dispatch"):
if not hasattr(shim, k):
setattr(shim, k, None)
shim.SUBSYSTEMS = { # type: ignore[attr-defined]
"brain": singletons.get("brain"),
"audio_mgr": singletons.get("audio_mgr"),
"voice_client": singletons.get("voice_client"),
"local_tts": singletons.get("local_tts"),
"typed_replay": singletons.get("typed_replay"),
"live_sub": singletons.get("live_sub"),
}
sys.modules["Project.Sanad.main"] = shim
return shim
# ── 3. build the P1 FastAPI app ───────────────────────────────────────────────
# (module, prefix, tag) — P1 subset of dashboard/app.py's _REST_ROUTES.
_P1_REST = [
("health", "/api", "health"),
("system", "/api/system", "system"),
("voice", "/api/voice", "voice"),
("audio_control", "/api/audio", "audio"),
("prompt", "/api/prompt", "prompt"),
("typed_replay", "/api/typed-replay", "typed-replay"),
("records", "/api/records", "records"),
("logs", "/api/logs", "logs"),
("live_subprocess", "/api/live-subprocess", "live-subprocess"),
]
_P1_WS = ["log_stream"]
def _tab_filter_snippet():
"""Hide non-P1 tabs/pills + short-circuit unmounted-router polls on /full.
The SanadV3 nav is <div class="tab" onclick="switchTab('X')">, so hide by the
stable onclick/id via CSS (applies before render, no DOMContentLoaded race)."""
import json as _json
css = ",".join(".tab[onclick*=\"switchTab('%s')\"],#tab-%s" % (t, t) for t in P1_SPA_HIDE) + ",#status-pills"
return (
"<style>%s{display:none!important}</style>"
"<script>window.SANAD_PACKAGE=%s;"
"(function(){var B=%s,_f=window.fetch;"
"window.fetch=function(i,o){try{var u=(typeof i==='string')?i:(i&&i.url)||'',"
"p=u.replace(/^https?:\\/\\/[^/]+/,'');"
"for(var k=0;k<B.length;k++){if(p.indexOf(B[k])===0)"
"return Promise.resolve(new Response('{}',{status:200,headers:{'Content-Type':'application/json'}}));}"
"}catch(e){}return _f.apply(this,arguments);};})();</script>"
% (css, _json.dumps({"name": PACKAGE, "title": PACKAGE_TITLE, "tabs": P1_SPA_TABS}),
_json.dumps(P1_UNMOUNTED))
)
def build_app():
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse, JSONResponse
from Project.Sanad.config import BASE_DIR
from Project.Sanad.core.config_loader import section as _cfg_section
app_cfg = _cfg_section("dashboard", "app")
app = FastAPI(title=PACKAGE_TITLE, version="0.1.0")
loaded, failed = [], {}
def _register(mod_name, prefix, tag, package="Project.Sanad.dashboard.routes"):
try:
mod = importlib.import_module("%s.%s" % (package, mod_name))
if not hasattr(mod, "router"):
raise AttributeError("no 'router'")
kw = {}
if prefix:
kw["prefix"] = prefix
if tag:
kw["tags"] = [tag]
app.include_router(mod.router, **kw)
loaded.append(mod_name)
except Exception as exc:
failed[mod_name] = str(exc)
log.exception("P1: router %s failed — skipped", mod_name)
for m, p, t in _P1_REST:
_register(m, p, t)
for m in _P1_WS:
_register(m, None, "websocket", package="Project.Sanad.dashboard.websockets")
# P1-specific routes (package-local, not part of Sanad): /api/p1/* —
# first-class "set / update Gemini API key" with live-session restart.
try:
import routes_p1
app.include_router(routes_p1.router, prefix="/api/p1", tags=["p1"])
loaded.append("routes_p1")
except Exception as exc:
failed["routes_p1"] = str(exc)
log.exception("P1: routes_p1 failed — /api/p1 unavailable")
static_dir = BASE_DIR / app_cfg.get("static_subdir", "dashboard/static")
try:
app.mount("/static", StaticFiles(directory=str(static_dir)), name="static")
except Exception:
log.exception("P1: static mount failed")
@app.get("/api/package")
async def package_info():
from sanad_pkg import license as _lic
lic = _lic.current()
# Gemini API-key status (masked) — so the dashboard can show whether a
# key is set without ever exposing it.
api_key = {"has_key": False, "masked": "", "source": "default"}
try:
import Project.Sanad.config as _cfg_mod
from Project.Sanad.dashboard.routes.voice import _mask_api_key
_k = getattr(_cfg_mod, "GEMINI_API_KEY", "") or ""
try:
from Project.Sanad.config import load_config
_stored = (load_config().get("gemini", {}) or {}).get("api_key")
except Exception:
_stored = None
api_key = {"has_key": bool(_k), "masked": _mask_api_key(_k),
"source": "config_file" if _stored else "default"}
except Exception:
log.exception("could not read api-key status")
return {
"package": PACKAGE,
"title": PACKAGE_TITLE,
"tabs": P1_SPA_TABS,
"language": os.environ.get("SANAD_LANGUAGE", lic.feature("language", "ar")),
"audio_profile": os.environ.get("SANAD_AUDIO_PROFILE", "builtin"),
"api_key": api_key,
"endpoints": {
"api_key_get": "GET /api/p1/api-key",
"api_key_set": "POST /api/p1/api-key {\"api_key\": \"AIza...\"}",
"persona_get": "GET /api/p1/persona",
"persona_set": "POST /api/p1/persona {\"content\": \"<system prompt>\"}",
"settings": "GET /api/p1/settings",
},
"loaded_routes": loaded,
"failed_routes": failed,
"license": lic.summary(),
}
# P1's own clean control page (cards). Falls back to the full SPA if missing.
p1_static = Path(os.environ.get(
"SANAD_P1_STATIC", str(Path(__file__).resolve().parent / "static")))
def _widget_html():
"""The P1 Quick-Controls widget injected into the full SPA (cards on /full)."""
w = p1_static / "p1_widget.html"
try:
return w.read_text(encoding="utf-8") if w.exists() else ""
except OSError:
return ""
def _filtered_spa():
index = static_dir / "index.html"
if not index.exists():
return JSONResponse({"message": "full SPA index.html not found",
"loaded": loaded, "failed": failed})
try:
html = index.read_text(encoding="utf-8")
# Filter (CSS hide + fetch shim) in <head> so the fetch override is set
# BEFORE the SPA's body scripts/pollers run; the widget goes at body end.
filt = _tab_filter_snippet()
if "</head>" in html:
html = html.replace("</head>", filt + "</head>", 1)
elif "</body>" in html:
html = html.replace("</body>", filt + "</body>", 1)
else:
html = filt + html
w = _widget_html()
if w:
html = html.replace("</body>", w + "</body>", 1) if "</body>" in html else html + w
return HTMLResponse(html)
except OSError as exc:
return JSONResponse({"error": "index.html unreadable: %s" % exc}, status_code=500)
@app.get("/")
async def root():
page = p1_static / "p1.html"
if page.exists():
try:
return HTMLResponse(page.read_text(encoding="utf-8"))
except OSError as exc:
log.exception("could not read p1.html — falling back to SPA")
return _filtered_spa()
@app.get("/full")
async def full_dashboard():
"""The complete Sanad SPA (advanced), with non-P1 tabs hidden."""
return _filtered_spa()
log.info("P1 dashboard built — routers loaded=%s failed=%s", loaded, list(failed))
return app
def _init_dds_for_audio():
"""Standalone P1 owns the ONE ChannelFactoryInitialize so the G1 chest
AudioClient works — the same DDS init Sanad performs via arm.init(). Without
it, AudioClient() fails ('NoneType._ref') and chest playback is silent.
In bus/multi-package mode the hwbroker owns DDS, so P1 skips it there."""
if os.environ.get("SANAD_BUS_ADDR"):
return
try:
from unitree_sdk2py.core.channel import ChannelFactoryInitialize
iface = os.environ.get("SANAD_DDS_INTERFACE", "eth0")
ChannelFactoryInitialize(0, iface)
log.info("P1: DDS ChannelFactoryInitialize(0, %s) done — chest audio enabled", iface)
except Exception:
log.exception("P1: DDS init failed — chest audio unavailable (plugged/USB still works)")
def _enforce_keyless_default():
"""P1 ships KEYLESS — the customer adds their own Gemini key via the dashboard
(saved to data/motions/config.json). Honor only an explicit vendor key
(SANAD_GEMINI_API_KEY env) or a customer-saved key; otherwise IGNORE any key
baked into the shipped Sanad config so the product never carries a vendor key."""
import Project.Sanad.config as _cfg
env_key = (os.environ.get("SANAD_GEMINI_API_KEY") or "").strip()
saved = ""
try:
from Project.Sanad.config import load_config
saved = ((load_config().get("gemini") or {}).get("api_key") or "").strip()
except Exception:
pass
if env_key or saved:
return # explicit/customer key present — keep it
_cfg.GEMINI_API_KEY = ""
try:
import Project.Sanad.gemini.client as _gc
_gc.GEMINI_API_KEY = ""
except Exception:
pass
log.info("P1: keyless by default — customer must add a Gemini key via the dashboard")
def main():
host = os.environ.get("SANAD_DASHBOARD_HOST", "0.0.0.0")
port = int(os.environ.get("SANAD_DASHBOARD_PORT", "8011"))
log.info("Sanad P1 (Basic Communication) starting — %s:%d lang=%s audio=%s",
host, port,
os.environ.get("SANAD_LANGUAGE", "?"),
os.environ.get("SANAD_AUDIO_PROFILE", "builtin"))
# optional cross-container bus (no-op without SANAD_BUS_ADDR)
try:
from sanad_pkg.bus import bus
bus.connect()
except Exception:
log.exception("bus connect failed (continuing in-process)")
_init_dds_for_audio()
_enforce_keyless_default()
singletons = _build_singletons()
_inject_main_shim(singletons)
import uvicorn
app = build_app()
uvicorn.run(app, host=host, port=port, log_level="info")
if __name__ == "__main__":
main()