Sanad_Package_1/routes_p1.py

244 lines
10 KiB
Python

"""P1-specific dashboard routes (mounted at /api/p1 by app_p1.py).
First-class P1 settings that REUSE Sanad's canonical logic (no fork) and add the
one thing the base routes don't: applying a change to the LIVE Gemini session
immediately by restarting the voice subprocess (the child reads the API key +
persona at spawn time).
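    /api/p1/api-key         GET masked status | POST set/update + live-restart
    /api/p1/api-key/delete  POST clear the stored key + live-restart
    /api/p1/persona         GET current persona+rules | POST update persona + live-restart
    /api/p1/say             POST speak a typed line (local playback, or via the bus)
    /api/p1/logs/delete     POST truncate active logs, remove rotated/snapshot files
    /api/p1/settings        GET one-shot view (api-key + persona + language + audio + live)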
Kept Python-3.8 compatible.
"""
from __future__ import annotations
import asyncio
import base64
import os
from fastapi import APIRouter, HTTPException
from Project.Sanad.core.logger import get_logger
from Project.Sanad.dashboard.routes import voice as _voice # reuse api-key logic
from Project.Sanad.dashboard.routes import prompt as _prompt # reuse persona logic
from Project.Sanad.dashboard.routes import typed_replay as _tr # reuse local TTS say
from sanad_pkg.bus import bus
# Bind request models as module-level names so FastAPI resolves body annotations
# cleanly under `from __future__ import annotations` (dotted forward-refs are
# version-fragile).
ApiKeyPayload = _voice.ApiKeyPayload  # request body of POST /api-key
PromptUpdate = _prompt.PromptUpdate  # request body of POST /persona
SayPayload = _tr.SayPayload  # request body of POST /say
# Module logger; the handlers below report swallowed failures through it.
log = get_logger("pkg1.routes")
# Router that collects every route declared in this module (the module
# docstring states it is mounted at /api/p1 by app_p1.py).
router = APIRouter()
async def _restart_live_if_running() -> bool:
    """Restart the live Gemini subprocess, if it is currently running.

    Used after a key or persona change so the new value takes effect
    immediately (the module docstring notes that the child reads both at
    spawn time).

    Returns:
        True when the subprocess was running and ``live_sub.start()``
        completed without raising. False when it was not running, when
        ``live_sub`` could not be imported/inspected, or when the start
        failed. Never raises: every failure is logged and reported as False.
    """
    try:
        from Project.Sanad.main import live_sub

        is_running = getattr(live_sub, "is_running", None)
        if live_sub is None or not callable(is_running) or not is_running():
            return False

        # A failed stop is logged but does not abort the restart: we still
        # try to bring the session back up below.
        try:
            live_sub.stop()
        except Exception:
            log.exception("live_sub.stop() failed")

        try:
            # Run the start in the default thread-pool executor so it does
            # not block the event loop. `asyncio.to_thread` is 3.9+ only; on
            # the Python 3.8 this module targets it raised AttributeError
            # *after* the stop above, leaving the session down.
            # `run_in_executor` is available on 3.8.
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, live_sub.start)
            return True
        except Exception:
            log.exception("live_sub.start() failed")
            return False
    except Exception:
        log.exception("could not restart live subprocess")
        return False
# ─────────────────────────── Gemini API key ───────────────────────────
def _persist_and_hotswap_key(key: str) -> None:
    """Store the Gemini API key in the saved config and patch it in memory.

    The key is written under ``gemini.api_key`` through ``load_config`` /
    ``save_config`` (an empty string removes the entry instead). Afterwards
    the ``GEMINI_API_KEY`` global is overwritten on BOTH
    ``Project.Sanad.config`` and ``Project.Sanad.gemini.client``, because each
    module binds its own reference. A failure to patch the client module is
    logged and otherwise ignored.
    """
    from Project.Sanad.config import load_config, save_config
    import Project.Sanad.config as config_module

    settings = load_config() or {}

    # Reuse the existing "gemini" section only when it really is a dict.
    gemini_section = settings.get("gemini")
    if not isinstance(gemini_section, dict):
        gemini_section = {}

    if not key:
        gemini_section.pop("api_key", None)
    else:
        gemini_section["api_key"] = key

    settings["gemini"] = gemini_section
    save_config(settings)

    config_module.GEMINI_API_KEY = key
    try:
        import Project.Sanad.gemini.client as gemini_client

        gemini_client.GEMINI_API_KEY = key
    except Exception:
        log.exception("could not patch gemini.client.GEMINI_API_KEY")
async def _disconnect_voice():
    """Disconnect the shared voice client when it reports being connected.

    Best-effort: an import or disconnect failure is logged, never raised.
    """
    try:
        from Project.Sanad.main import voice_client

        if voice_client is None:
            return
        if not getattr(voice_client, "connected", False):
            return
        await voice_client.disconnect()
    except Exception:
        log.exception("voice_client disconnect failed")
@router.get("/api-key")
async def p1_get_api_key():
    """Report the API-key status by delegating to the canonical voice route."""
    key_status = await _voice.get_api_key()
    return key_status
@router.post("/api-key")
async def p1_set_api_key(payload: ApiKeyPayload):
    """Add or replace the Gemini API key and apply it to the live session.

    Validation is deliberately loose: any stripped key of at least 10
    characters is accepted (the original notes cite AIza… standard keys as
    well as AQ.… / ephemeral tokens). The key is persisted and hot-swapped,
    the voice client is disconnected, and the live subprocess is restarted
    when it was running.

    Raises:
        HTTPException: 400 when the stripped key is shorter than 10 characters.
    """
    new_key = (payload.api_key or "").strip()
    if len(new_key) < 10:
        raise HTTPException(400, "API key looks too short (paste the full key).")

    _persist_and_hotswap_key(new_key)
    await _disconnect_voice()
    restarted = await _restart_live_if_running()

    if restarted:
        message_tail = " and applied (live session restarted)."
    else:
        message_tail = " — start the session to use it."

    response = {
        "ok": True,
        "masked": _voice._mask_api_key(new_key),
        "source": "config_file",
        "live_subprocess_restarted": restarted,
        "message": "API key added" + message_tail,
    }
    return response
@router.post("/api-key/delete")
async def p1_delete_api_key():
    """Remove the Gemini API key from the saved config and from memory.

    Persists an empty key (which drops the stored entry), disconnects the
    voice client and restarts the live subprocess when it was running. The
    original notes add that a hardcoded fallback in config.py, if present,
    would re-apply on the next process restart.
    """
    _persist_and_hotswap_key("")
    await _disconnect_voice()
    was_restarted = await _restart_live_if_running()

    response = {
        "ok": True,
        "deleted": True,
        "live_subprocess_restarted": was_restarted,
        "message": "API key deleted. Add a new key to re-enable conversation.",
    }
    return response
# ─────────────────────────── Robot persona ───────────────────────────
@router.get("/persona")
async def p1_get_persona():
    """Return the current persona by delegating to the canonical prompt route."""
    persona_info = await _prompt.get_prompt()
    return persona_info
@router.post("/persona")
async def p1_set_persona(payload: PromptUpdate):
    """Update the robot persona and apply it to the live session.

    Delegates the write to the canonical prompt route, then restarts the live
    subprocess (when running) so the new persona is picked up. The result of
    the delegated call is returned with ``live_subprocess_restarted`` and a
    human-readable ``message`` added.
    """
    # Canonical prompt logic performs the actual save.
    outcome = await _prompt.update_prompt(payload)
    was_restarted = await _restart_live_if_running()

    if was_restarted:
        note = "Persona saved and applied — live session restarted."
    else:
        note = "Persona saved. Start (or restart) the live session to use the new persona."

    outcome["live_subprocess_restarted"] = was_restarted
    outcome["message"] = note
    return outcome
# ─────────────────────────── say a line ───────────────────────────
@router.post("/say")
async def p1_say(payload: SayPayload):
    """Speak a typed line.

    Standalone mode (``SANAD_BUS_ADDR`` unset or empty): hand the payload to
    the canonical typed-replay ``say`` and return its result.

    Multi-package mode (``SANAD_BUS_ADDR`` set): synthesize the text through
    the shared voice client and emit the audio, base64-encoded, as a
    ``speak.request`` event on the bus. Per the original notes this lets the
    hwbroker play it under its audio-lock.

    Raises:
        HTTPException: 400 for empty text; 503 when the voice client is
            missing or cannot connect; 502 when synthesis fails.
    """
    line = (payload.text or "").strip()
    if not line:
        raise HTTPException(400, "text cannot be empty")

    bus_addr = os.environ.get("SANAD_BUS_ADDR")
    if not bus_addr:
        # No bus configured: play directly through the typed-replay route.
        return await _tr.say(payload)

    # Bus configured: synthesize here, let the bus consumer do the playback.
    from Project.Sanad.main import voice_client

    if voice_client is None:
        raise HTTPException(503, "voice client unavailable")

    already_connected = getattr(voice_client, "connected", False)
    if not already_connected:
        try:
            await voice_client.connect()
        except Exception as exc:
            raise HTTPException(503, "Gemini connect failed: %s" % exc)

    try:
        audio, _parts = await voice_client.send_text(line, owner="p1_say")
    except Exception as exc:
        raise HTTPException(502, "Gemini error: %s" % exc)

    if not audio:
        return {"ok": False, "routed": "hwbroker", "reason": "no audio produced"}

    encoded_pcm = base64.b64encode(audio).decode("ascii")
    bus.emit_sync(
        "speak.request",
        owner="p1",
        pcm_b64=encoded_pcm,
        rate=24000,
        channels=1,
        sampwidth=2,
    )
    return {"ok": True, "routed": "hwbroker (audio-lock)"}
# ─────────────────────────── logs ───────────────────────────
@router.post("/logs/delete")
async def p1_delete_logs():
    """Clear every ``*.log*`` file directly inside ``LOGS_DIR``.

    Files whose name ends in ``.log`` and does not contain ``_snapshot_`` are
    truncated in place (so a live logger keeps a valid handle); every other
    match is unlinked. Per-file failures are logged and skipped.

    Returns:
        dict with ``ok`` (always True), ``count`` and ``cleared`` (names of
        the files that were successfully truncated or removed).
    """
    from Project.Sanad.config import LOGS_DIR

    handled_names = []
    try:
        for log_path in sorted(LOGS_DIR.glob("*.log*")):
            try:
                file_name = log_path.name
                is_active_log = file_name.endswith(".log") and "_snapshot_" not in file_name
                if not is_active_log:
                    # rotated / snapshot / bundle file: delete it outright
                    log_path.unlink()
                else:
                    # active log: empty it but keep the file itself
                    open(log_path, "w").close()
                handled_names.append(log_path.name)
            except Exception:
                log.exception("could not clear log %s", log_path.name)
    except Exception:
        log.exception("delete logs failed")

    return {"ok": True, "count": len(handled_names), "cleared": handled_names}
# ─────────────────────────── combined view ───────────────────────────
@router.get("/settings")
async def p1_settings():
    """Return a one-shot snapshot of the P1 settings.

    Combines the API-key status from the voice route, a preview of the
    persona prompt, the language / audio-profile environment variables and
    whether the live subprocess is running. Failures while loading the
    persona or probing the live subprocess are logged and fall back to an
    empty persona / ``live_running=False``.

    Returns:
        dict with keys ``package``, ``api_key``, ``persona_preview`` (the
        first 400 characters, followed by "…" when the persona is longer),
        ``persona_chars``, ``language``, ``audio_profile`` and
        ``live_running``.
    """
    preview_limit = 400

    key_status = await _voice.get_api_key()

    persona = ""
    try:
        # `or ""` guards the slicing/len below against a None result.
        persona = _prompt._load_system_prompt() or ""
    except Exception:
        log.exception("could not load persona")

    live_running = False
    try:
        from Project.Sanad.main import live_sub

        is_running = getattr(live_sub, "is_running", None)
        live_running = bool(
            live_sub is not None and callable(is_running) and is_running()
        )
    except Exception:
        # Still best-effort, but no longer silent.
        log.exception("could not query live subprocess state")

    # The original suffix was "" in both branches, so a truncated preview
    # looked identical to a complete one; mark truncation explicitly.
    persona_preview = persona[:preview_limit]
    if len(persona) > preview_limit:
        persona_preview += "…"

    return {
        "package": "P1",
        "api_key": key_status,
        "persona_preview": persona_preview,
        "persona_chars": len(persona),
        "language": os.environ.get("SANAD_LANGUAGE", ""),
        "audio_profile": os.environ.get("SANAD_AUDIO_PROFILE", "builtin"),
        "live_running": live_running,
    }