# Listing metadata (not part of the program): 605 lines, 27 KiB, Python

"""Gemini live subprocess supervisor.
Spawns `voice/sanad_voice.py` as a managed child with `SANAD_VOICE_BRAIN=gemini`,
tails the child's stdout, and extracts state transitions + user transcripts
from the Gemini-specific log lines emitted by `gemini/script.py:GeminiBrain`.
When a new model is added, build its own sibling supervisor (see
`voice/model_subprocess.py` for the template) — do not refactor this file.
"""
from __future__ import annotations
import base64
import json
import os
import signal
import subprocess
import sys
import threading
from collections import deque
from datetime import datetime
from typing import Any, Optional, Union
from pathlib import Path
from Project.Sanad.config import BASE_DIR, LOGS_DIR, SCRIPTS_DIR, LIVE_TUNE
from Project.Sanad.core.config_loader import section as _cfg_section
from Project.Sanad.core.logger import get_logger
# Module logger for the parent-side supervisor.
log = get_logger("gemini_subprocess")
# Tunables for this supervisor, read from the ("gemini", "subprocess") config
# section. Every lookup in this file supplies a hard-coded fallback default.
_LS_CFG = _cfg_section("gemini", "subprocess")
# Camera frame forwarding — push the latest JPEG to the child over stdin
# at this interval (seconds). 0.5 s ≈ 2 fps, matching the child's
# SANAD_VISION_SEND_HZ default. The child de-stales + relays to Gemini.
_FRAME_FORWARD_INTERVAL_S = float(_LS_CFG.get("frame_forward_interval_sec", 0.5))
# Audio profile watcher — poll pactl for the Anker USB device at this
# interval (seconds), send "profile:<json>" to the child on every state change.
_AUDIO_WATCH_INTERVAL_S = float(_LS_CFG.get("audio_watch_interval_sec", 1.5))
# The Anker profile id, as defined in voice/audio_devices.py. When this
# profile is fully plugged (both sink + source present), we switch the
# child to "anker"; otherwise we hold the boot fallback profile.
# This is the id reported by detect_plugged_profiles(); the id sent to the
# child over stdin is the shorter "anker".
_ANKER_PROFILE_ID = "anker_powerconf"
def _resolve_live_script() -> Path:
    """Locate the voice script to run as the managed subprocess.

    Resolution order:
      1. ``SANAD_LIVE_SCRIPT`` (``~`` expanded), if it is set and exists.
      2. ``BASE_DIR/voice/sanad_voice.py`` (the canonical G1 built-in mic +
         AudioClient speaker path).
      3. ``SCRIPTS_DIR/sanad_voice.py``.

    Returns:
        The first existing candidate. When nothing exists the
        ``SCRIPTS_DIR`` path is returned anyway, so the caller can report a
        concrete "Script not found" path.
    """
    override = os.environ.get("SANAD_LIVE_SCRIPT", "").strip()
    if override:
        custom = Path(override).expanduser()
        if custom.exists():
            return custom
        # A misconfigured override used to be ignored silently, which made
        # "why is my script not running?" hard to diagnose.
        log.warning(
            "SANAD_LIVE_SCRIPT=%r does not exist — falling back to the "
            "default voice script", override,
        )

    fallback = SCRIPTS_DIR / "sanad_voice.py"
    for candidate in (BASE_DIR / "voice" / "sanad_voice.py", fallback):
        if candidate.exists():
            return candidate
    return fallback
# Script path resolved once at import time; start() re-checks that it exists.
LIVE_SCRIPT = _resolve_live_script()
# Ring-buffer sizes: LOG_TAIL_SIZE bounds the in-memory log tail,
# TRANSCRIPT_TAIL_SIZE bounds the user and bot transcript deques.
# NOTE(review): unlike the two interval constants above, these values (and the
# timeouts below) are used exactly as the config returns them — no int()/float()
# coercion. Confirm the config loader always yields numbers here.
LOG_TAIL_SIZE = _LS_CFG.get("log_tail_size", 2000)
TRANSCRIPT_TAIL_SIZE = _LS_CFG.get("transcript_tail_size", 30)
# Persistent on-disk log for the full subprocess session.
LIVE_LOG_DIR = LOGS_DIR
LIVE_LOG_NAME = _LS_CFG.get("log_name", "gemini_subprocess")
# stop() escalation: wait _STOP_TIMEOUT_SEC after SIGINT, then
# _TERMINATE_TIMEOUT_SEC after terminate() and again after kill().
_STOP_TIMEOUT_SEC = _LS_CFG.get("stop_timeout_sec", 3.0)
_TERMINATE_TIMEOUT_SEC = _LS_CFG.get("terminate_timeout_sec", 2.0)
# Lines that START with any of these are kept out of the in-memory log tail
# (they are still written to the on-disk session log).
_NOISY_PREFIXES = tuple(_LS_CFG.get("noisy_prefixes", [
"ALSA lib ", "Expression 'alsa_", "Cannot connect to server socket",
"jack server is not running",
]))
# Lines that CONTAIN any of these substrings are filtered the same way.
_NOISY_FRAGMENTS = tuple(_LS_CFG.get("noisy_fragments", [
"Unknown PCM", "Evaluate error", "snd_pcm_open_noupdate",
"PaAlsaStream", "snd_config_evaluate", "snd_func_refer",
]))
class GeminiSubprocess:
    """Supervisor for one managed voice-script child running the Gemini brain.

    Responsibilities, all visible in this class:
      * spawn / stop the child (``start`` / ``stop``);
      * tail the child's merged stdout+stderr on a reader thread, keep a
        filtered in-memory tail, a full on-disk log, and derive a coarse
        session state plus user/bot transcripts from known log markers;
      * push ``frame:``, ``state:`` and ``profile:`` lines to the child's
        stdin (camera frames, motion state, audio-profile hot-swap).

    Shared state read by ``status()`` is guarded by ``self._lock``; stdin
    writes are serialised by ``self._stdin_lock``.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self.process: subprocess.Popen | None = None
        self.log_tail: deque[str] = deque(maxlen=LOG_TAIL_SIZE)
        self.user_transcript: deque[str] = deque(maxlen=TRANSCRIPT_TAIL_SIZE)
        # Gemini's OWN spoken text (output transcription). The movement
        # dispatcher (N2) polls this the way LiveVoiceLoop polls
        # user_transcript — it reads what Gemini *said* and fires motion on
        # a confirmation-phrase match (the Marcus pattern). Also handy for
        # surfacing the bot side of the conversation on the dashboard.
        self.bot_transcript: deque[str] = deque(maxlen=TRANSCRIPT_TAIL_SIZE)
        # N2 Phase 3 — callbacks fired with each new BOT: line (Gemini's own
        # spoken text). The MovementDispatcher registers here to drive
        # locomotion off Gemini's confirmation phrases. Fired on the reader
        # thread; callbacks must be cheap / non-blocking (the dispatcher just
        # enqueues to its own worker).
        self._bot_callbacks: list = []
        # _track_line (which runs under self._lock) stashes the latest BOT text
        # here; the reader loop fires callbacks AFTER releasing the lock so a
        # slow callback (e.g. movement dispatch reading state) never blocks
        # other threads waiting on the lock (status(), stop(), ...).
        self._pending_bot: str | None = None
        self._reader_thread: threading.Thread | None = None
        self._log_file = None  # opened per-session in _reader_loop
        self.state = "stopped"
        self.state_message = "Idle."
        self.last_user_text = ""
        self.last_bot_text = ""
        self.suppressed_noise = 0
        # ── stdin push channel (camera frames + motion state + profile) ──
        # The child (gemini/script.py) reads "frame:<b64>\n",
        # "state:<json>\n", and "profile:<json>\n" lines off its stdin.
        # Writes are serialised because the frame forwarder, motion-state
        # bus handler, and audio watcher all call from different threads.
        self._stdin_lock = threading.Lock()
        self._camera = None  # set via attach_camera()
        self._frame_thread: threading.Thread | None = None
        self._frame_stop = threading.Event()
        # ── audio profile hot-swap ────────────────────────────────
        # _audio_mgr is the parent's AudioManager — needed so we can keep
        # PulseAudio defaults in sync (so /api/records/play etc. follow
        # the same device the live session uses). Set via attach_audio_manager.
        self._audio_mgr = None
        self._audio_thread: threading.Thread | None = None
        self._audio_stop = threading.Event()
        # The boot profile captured at start() — what we revert to when
        # the Anker is unplugged.
        self._boot_profile_id: str = "builtin"
        # Last profile signalled to the child (for edge-only dispatch).
        self._last_profile_id: str | None = None

    # ── collaborator wiring (callbacks, camera, audio manager) ──
    def register_bot_callback(self, callback) -> None:
        """Register ``callback(text)`` to fire on each new BOT: line.

        Used by the N2 movement dispatcher. Callbacks run on the reader
        thread, so they must be cheap / non-blocking. Registering the same
        callable twice is a no-op.
        """
        if callback not in self._bot_callbacks:
            self._bot_callbacks.append(callback)

    def attach_camera(self, camera) -> None:
        """Give the supervisor a reference to the CameraDaemon so it can
        forward frames to the child over stdin while a session runs.

        The frame forwarder samples this reference when it starts, so attach
        the camera before calling ``start()``.
        """
        self._camera = camera

    def attach_audio_manager(self, audio_mgr) -> None:
        """Hand the parent's AudioManager to the supervisor so the audio
        watcher can call ``refresh_devices()`` on it after every profile
        swap (so typed-replay / record playback follow the same device as
        the live Gemini session)."""
        self._audio_mgr = audio_mgr

    # ── logging / state tracking ──────────────────────────────
    def _open_session_log(self, pid: int):
        """Open the per-day append log and write a session-start banner.

        Returns the line-buffered file handle, or ``None`` when the file
        cannot be opened (the failure is logged; the session continues
        without an on-disk log).
        """
        try:
            LIVE_LOG_DIR.mkdir(parents=True, exist_ok=True)
            fname = f"{LIVE_LOG_NAME}_{datetime.now().strftime('%Y%m%d')}.log"
            fh = open(LIVE_LOG_DIR / fname, "a", encoding="utf-8", buffering=1)
            fh.write(
                f"\n===== live_gemini subprocess start "
                f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} pid={pid} =====\n"
            )
            return fh
        except Exception as exc:
            log.warning("Could not open live-gemini log file: %s", exc)
            return None

    def _is_noisy(self, line: str) -> bool:
        """True when ``line`` matches a configured noise prefix or fragment."""
        return line.startswith(_NOISY_PREFIXES) or any(f in line for f in _NOISY_FRAGMENTS)

    def _set_state(self, state: str, msg: str):
        """Record the session state and its human-readable message.

        Callers are expected to hold ``self._lock``.
        """
        self.state = state
        self.state_message = msg

    @staticmethod
    def _marker_index(line: str, tag: str) -> int:
        """Return the index where the ``TAG:`` log marker starts, or -1.

        A marker is either ``TAG:`` at the start of the (left-stripped) line
        or `` TAG: `` anywhere in it — the same two forms the transcript
        detectors have always accepted.
        """
        lead = len(line) - len(line.lstrip())
        if line.startswith(tag + ":", lead):
            return lead
        spaced = line.find(f" {tag}: ")
        return spaced + 1 if spaced >= 0 else -1

    def _track_line(self, line: str):
        """Parse Gemini-specific log markers emitted by `gemini/script.py`.

        Must stay in lock-step with the `log.info(...)` strings in
        `GeminiBrain`. If you add a new state, add the emit in the brain
        AND the matching detector here — in one PR.

        Transcript lines (USER: / BOT:) carry arbitrary spoken text, so they
        are classified FIRST: otherwise an utterance that happens to contain
        "connecting to Gemini", "listening", "session error", ... would be
        misread as a state transition. When both markers appear in one line,
        the earliest one is the real log marker and the other is just part
        of the transcribed text.
        """
        user_at = self._marker_index(line, "USER")
        bot_at = self._marker_index(line, "BOT")

        if user_at >= 0 and (bot_at < 0 or user_at < bot_at):
            # GeminiBrain emits: log.info("USER: %s", text)
            text = line[user_at + len("USER:"):].strip()
            if text:
                self.last_user_text = text
                self.user_transcript.append(text)
                self._set_state("hearing", f"User: {text}")
            return

        if bot_at >= 0:
            # GeminiBrain emits: log.info("BOT: %s", text) — Gemini's own
            # spoken text. The movement dispatcher (N2) reads this deque to
            # match confirmation phrases. Deliberately does NOT change the
            # session state (that stays driven by USER / listening markers).
            text = line[bot_at + len("BOT:"):].strip()
            if text:
                self.last_bot_text = text
                self.bot_transcript.append(text)
                # Defer callback firing to the reader loop, OUTSIDE self._lock.
                self._pending_bot = text
            return

        if "connecting to Gemini" in line:
            self._set_state("connecting", line)
        elif "connected — speak anytime" in line or "connected - speak anytime" in line:
            self._set_state("listening", "Listening for speech.")
        elif "BARGE-IN" in line or "Gemini interrupted" in line or "interrupt (" in line:
            self._set_state("interrupting", line)
        elif "listening" in line.lower() and "no speech" not in line:
            # Fires on "listening" (post-turn) — keep the state fresh.
            self._set_state("listening", "Listening for speech.")
        elif "session error" in line or "client recreation failed" in line:
            self._set_state("error", line)
        elif "server going away" in line or "session ended" in line or "session dead" in line:
            self._set_state("warning", line)
        elif "keyboard interrupt" in line or "cancelled — stopping" in line:
            self._set_state("stopped", line)

    def _reader_loop(self):
        """Reader thread body: tail the child's stdout until EOF.

        Every non-empty line goes to the on-disk log — including the ALSA
        noise that is filtered out of the in-memory tail — so a field
        post-mortem has the full raw capture. Non-noisy lines are appended
        to ``log_tail`` and fed to ``_track_line`` under ``self._lock``;
        BOT-text callbacks fire after the lock is released.

        A read error (e.g. ``stop()`` closing the pipe mid-read) is treated
        like EOF: the session log is still footered and closed and the
        state is still marked stopped.
        """
        proc = self.process
        if proc is None or proc.stdout is None:
            return
        fh = self._open_session_log(proc.pid)
        self._log_file = fh
        read_error = None
        try:
            for line in proc.stdout:
                clean = line.rstrip()
                if not clean:
                    continue
                if fh is not None:
                    try:
                        fh.write(clean + "\n")
                    except Exception:
                        # Best-effort: a full disk must not stop the tailing.
                        pass
                fired_bot = None
                with self._lock:
                    if self._is_noisy(clean):
                        self.suppressed_noise += 1
                        continue
                    self.log_tail.append(clean)
                    self._track_line(clean)
                    fired_bot = self._pending_bot
                    self._pending_bot = None
                if fired_bot is None:
                    continue
                # Fire BOT-text callbacks (movement dispatch) OUTSIDE the lock.
                # Iterate a snapshot so a concurrent register_bot_callback()
                # cannot disturb the iteration.
                for cb in tuple(self._bot_callbacks):
                    try:
                        cb(fired_bot)
                    except Exception:
                        log.exception("bot-text callback failed")
        except (ValueError, OSError) as exc:
            # ValueError: pipe closed under us / undecodable bytes;
            # OSError: low-level read failure. Either way the stream is over.
            read_error = exc
        finally:
            with self._lock:
                self.log_tail.append("Live Gemini process exited.")
                # Don't clobber the state of a NEWER session that start()
                # may already have installed.
                if self.process is proc or self.process is None:
                    self._set_state("stopped", "Process exited.")
            if fh is not None:
                try:
                    if read_error is not None:
                        fh.write(f"reader stopped on read error: {read_error}\n")
                    fh.write(
                        f"===== live_gemini subprocess exit "
                        f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} =====\n"
                    )
                    fh.close()
                except Exception:
                    pass
            if self._log_file is fh:
                self._log_file = None

    # ── lifecycle ─────────────────────────────────────────────
    def is_running(self) -> bool:
        """True while a child has been spawned and has not exited."""
        with self._lock:
            return self.process is not None and self.process.poll() is None

    def start(self) -> dict[str, Any]:
        """Spawn the voice script and its helper threads.

        Returns:
            ``{"started": True, "pid": ...}`` on success, or
            ``{"started": False, "message": "Already running.", "pid": ...}``
            when a child is already alive.

        Raises:
            RuntimeError: the resolved script does not exist.
            Exception: whatever ``subprocess.Popen`` raises on spawn failure.
            In both cases the state is set to ``"error"`` first, so
            ``status()`` never stays stuck on ``"starting"``.
        """
        with self._lock:
            if self.process is not None and self.process.poll() is None:
                return {"started": False, "message": "Already running.", "pid": self.process.pid}
            self._set_state("starting", "Starting...")

        script = LIVE_SCRIPT
        if not script.exists():
            with self._lock:
                self._set_state("error", f"Script not found: {script}")
            raise RuntimeError(f"Script not found: {script}")

        env = os.environ.copy()
        env.update({"PYTHONUNBUFFERED": "1", **LIVE_TUNE})
        # Pass the current G1 speaker volume as an env var so the
        # subprocess can compute the correct barge-in threshold at
        # startup. Without this, sanad_voice.py would read the volume
        # from a stale or non-existent config file path and default to
        # 100, scaling the barge-in threshold wrong for any non-100%
        # volume. load_config() reads data/motions/config.json — the
        # file the dashboard writes to when the user moves the slider.
        try:
            from Project.Sanad.config import load_config
            _cfg = load_config() or {}
            _audio_cfg = _cfg.get("audio") if isinstance(_cfg.get("audio"), dict) else {}
            _g1_vol = int(_audio_cfg.get("g1_volume", 100))
            _g1_vol = max(0, min(100, _g1_vol))
            env["SANAD_G1_VOLUME"] = str(_g1_vol)
            log.info("Passing SANAD_G1_VOLUME=%d to subprocess", _g1_vol)
        except Exception as exc:
            log.warning("Could not read g1_volume for subprocess: %s", exc)

        # sanad_voice.py takes the DDS interface as the first positional arg
        dds_iface = env.get("SANAD_DDS_INTERFACE", "eth0")
        cmd = [sys.executable, str(script), dds_iface]
        try:
            proc = subprocess.Popen(
                cmd,
                cwd=str(script.parent),
                stdin=subprocess.PIPE,  # camera frames + motion state push
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                # One undecodable byte from the child (ALSA/driver noise)
                # must not raise in the reader thread and end the tailing.
                errors="replace",
                bufsize=1,
                env=env,
            )
        except Exception as exc:
            with self._lock:
                self._set_state("error", f"Failed to start: {exc}")
            raise

        # Reap any stale frame forwarder / audio watcher from a previous
        # session that ended by a child crash rather than a clean stop() —
        # otherwise they'd keep spinning and we'd leak threads per restart.
        for stale, stop_evt in (
            (self._frame_thread, self._frame_stop),
            (self._audio_thread, self._audio_stop),
        ):
            if stale is not None and stale.is_alive():
                stop_evt.set()
                stale.join(timeout=2.0)

        # Capture the boot profile for this session — the audio watcher
        # uses it as the fallback when the Anker is unplugged. Read it from
        # the SAME env mapping handed to the child (os.environ + LIVE_TUNE)
        # so parent and child agree even when only LIVE_TUNE defines it.
        boot_profile = env.get("SANAD_AUDIO_PROFILE", "").strip().lower()
        self._boot_profile_id = boot_profile or "builtin"
        self._last_profile_id = None  # force one initial send_profile

        with self._lock:
            self.process = proc
            self.log_tail.append(f"Started: pid={proc.pid}")
            self._set_state("starting", f"pid={proc.pid}")

        self._reader_thread = threading.Thread(target=self._reader_loop, daemon=True)
        self._reader_thread.start()
        # Frame forwarder — pushes camera JPEGs to the child over stdin.
        self._frame_stop.clear()
        self._frame_thread = threading.Thread(
            target=self._frame_forwarder, daemon=True, name="gemini-frame-fwd",
        )
        self._frame_thread.start()
        # Audio watcher — polls pactl for Anker presence and signals
        # the child to hot-swap mic+speaker when it changes.
        self._audio_stop.clear()
        self._audio_thread = threading.Thread(
            target=self._audio_watcher, daemon=True, name="gemini-audio-watcher",
        )
        self._audio_thread.start()
        log.info("Live Gemini subprocess started: pid=%d", proc.pid)
        return {"started": True, "pid": proc.pid}

    # ── stdin push channel ────────────────────────────────────
    def _send_stdin(self, line: str) -> None:
        """Serialised stdin write — the frame forwarder, motion-state
        handler and audio watcher all call this from different threads.

        Best-effort: a closed pipe or a not-yet-started process is a silent
        no-op.
        """
        proc = self.process
        if proc is None or proc.stdin is None:
            return
        try:
            with self._stdin_lock:
                if not proc.stdin.closed:
                    proc.stdin.write(line)
                    proc.stdin.flush()
        except Exception:
            # Pipe broke (child exited) — drop silently; the reader thread
            # will surface the exit via state="stopped".
            pass

    def send_frame(self, jpeg: Union[bytes, str]) -> None:
        """Forward one camera frame to the child as 'frame:<base64>\\n'.

        Accepts raw JPEG data as any bytes-like object (``bytes``,
        ``bytearray``, ``memoryview`` — base64-encoded here) or an
        already-base64 string (e.g. CameraDaemon.get_frame_b64() — no
        re-encode). Any other type, or an empty payload, is ignored.
        """
        if isinstance(jpeg, (bytes, bytearray, memoryview)):
            b64 = base64.b64encode(jpeg).decode("ascii")
        elif isinstance(jpeg, str):
            # The stdin protocol is one message per line, so strip ALL
            # whitespace: MIME-style base64 contains embedded newlines that
            # would otherwise split the frame into several bogus lines.
            b64 = "".join(jpeg.split())
        else:
            return
        if b64:
            self._send_stdin("frame:" + b64 + "\n")

    def send_state(self, event: str, cmd: str,
                   elapsed_sec: Optional[float] = None,
                   reason: Optional[str] = None) -> None:
        """Push a motion-state update to the child as 'state:<json>\\n'.

        Events: start | complete | interrupted | error. The child injects
        '[STATE-...] <cmd>' into the live Gemini session as silent text
        context so Gemini can answer "what are you doing?" honestly.

        Args:
            event: state event name; empty → no-op.
            cmd: the motion command the event refers to; empty → no-op.
            elapsed_sec: optional duration, rounded to 2 decimals.
            reason: optional free text, truncated to 200 characters.
        """
        if not event or not cmd:
            return
        payload: dict[str, Any] = {"event": event, "cmd": cmd}
        if elapsed_sec is not None:
            payload["elapsed_sec"] = round(float(elapsed_sec), 2)
        if reason:
            payload["reason"] = str(reason)[:200]
        try:
            line = "state:" + json.dumps(payload, ensure_ascii=False) + "\n"
        except Exception:
            # Unserialisable payload — drop the update rather than crash.
            return
        self._send_stdin(line)

    def _frame_forwarder(self) -> None:
        """Background thread — push the camera's latest frame to the child.

        Runs until ``_frame_stop`` is set. Gated on the camera actually
        running; the child does its own vision-enabled + staleness checks,
        so this stays dumb (camera up → push). Returns immediately when no
        camera was attached.
        """
        cam = self._camera
        if cam is None:
            return
        # wait() returns True as soon as the stop event is set, so this
        # both paces the loop and ends it promptly on stop().
        while not self._frame_stop.wait(_FRAME_FORWARD_INTERVAL_S):
            try:
                if not cam.is_running():
                    continue
                b64 = cam.get_frame_b64()
                if b64:
                    self.send_frame(b64)
            except Exception:
                # Best-effort — never let a frame hiccup kill the thread.
                pass

    # ── audio profile watcher (parent-side detection) ────────────
    def send_profile(self, profile_id: str, reason: str = "") -> None:
        """Push an audio-profile hot-swap command to the child as
        'profile:<json>\\n'.

        The child's _stdin_watcher parses it and _audio_swap_loop performs
        the actual mic/speaker rebind. Unknown profile ids are logged and
        ignored. No-op if the process isn't running or stdin is closed.

        Args:
            profile_id: one of "builtin", "anker", "hollyland_builtin"
                (case/whitespace-insensitive).
            reason: optional free text, truncated to 120 characters.
        """
        profile = (profile_id or "").strip().lower()
        if profile not in {"builtin", "anker", "hollyland_builtin"}:
            log.warning("send_profile: ignoring unknown profile %r", profile_id)
            return
        payload: dict[str, Any] = {"id": profile}
        if reason:
            payload["reason"] = reason[:120]
        try:
            line = "profile:" + json.dumps(payload, ensure_ascii=False) + "\n"
        except Exception:
            return
        self._send_stdin(line)

    def _audio_watcher(self) -> None:
        """Background thread — poll pactl for the Anker USB device, signal
        the child on every plug/unplug edge transition.

        Detection reuses voice.audio_devices.detect_plugged_profiles() which
        already shells to `pactl list short` and matches against the same
        `powerconf,anker` substring AnkerMic uses. Zero new deps.

        Edge-only dispatch: we only call send_profile() when the target
        flips. Rapid bounce (loose cable) is naturally rate-limited by the
        poll interval. After every send_profile we also refresh the parent
        audio_manager's PulseAudio defaults so non-live playback (typed
        replay, record playback) follows the same device.
        """
        # Lazy import — voice.audio_devices is imported here, on the watcher
        # thread, rather than at module load, so it does not pollute our
        # top-level imports and an import failure only disables the watcher.
        try:
            from Project.Sanad.voice import audio_devices as _ad
        except Exception as exc:
            log.warning("audio watcher disabled — audio_devices import failed: %s", exc)
            return
        try:
            if not _ad.pactl_available():
                log.warning("audio watcher disabled — pactl not available")
                return
        except Exception:
            # If pactl_available itself isn't exposed, fall through and try
            # detect_plugged_profiles — it'll raise/return empty if pactl
            # is missing and we handle that below.
            pass

        boot_profile = self._boot_profile_id or "builtin"
        log.info("audio watcher started — Anker→anker, no-Anker→%s (poll=%.1fs)",
                 boot_profile, _AUDIO_WATCH_INTERVAL_S)
        while not self._audio_stop.wait(_AUDIO_WATCH_INTERVAL_S):
            try:
                # Recovery script (set_powerconf_audio.sh) is intentionally
                # NOT invoked from the watcher — its old card-discovery /
                # module-alsa-source attempts loaded the wrong hw device
                # on this Jetson and knocked the Anker out of pactl
                # entirely (observed 2026-06-03). The script is now a
                # passive set-default-sink/source helper meant to be run
                # by hand, not from the watcher. The watcher just detects
                # plug edges and dispatches profile changes to the child.
                plugged = _ad.detect_plugged_profiles()
                ids = {p.get("profile", {}).get("id") for p in (plugged or [])}
                anker_present = _ANKER_PROFILE_ID in ids
                target = "anker" if anker_present else boot_profile
                reason = "anker plugged" if anker_present else "anker unplugged"
                # Surface which detection path succeeded (Path A vs pactl)
                if anker_present:
                    for p in plugged:
                        if p.get("profile", {}).get("id") == _ANKER_PROFILE_ID:
                            via = p.get("source_via", "pactl")
                            if via != "pactl":
                                reason += f" via {via}"
                            break
                if target == self._last_profile_id:
                    continue  # edge-only
                prev = self._last_profile_id
                # Previously formatted as "%s%s", which glued the two
                # profile ids together ("builtinanker").
                log.info("audio watcher: %s → %s (%s)",
                         prev or "<none>", target, reason)
                self.send_profile(target, reason=reason)
                self._last_profile_id = target
                # Keep PulseAudio defaults aligned so non-live playback
                # follows the same device the live session uses.
                if self._audio_mgr is not None:
                    try:
                        self._audio_mgr.refresh_devices()
                    except Exception as exc:
                        log.warning("audio watcher: refresh_devices failed: %s", exc)
            except Exception as exc:
                # Never let a transient pactl glitch kill the thread.
                log.warning("audio watcher iteration failed: %s", exc)

    def stop(self) -> dict[str, Any]:
        """Stop the child and release everything attached to the session.

        Escalation for a live child: SIGINT → terminate() → kill(), with
        the configured timeouts between steps.

        If the child already exited on its own (crash), the helper threads
        and pipes are STILL cleaned up — previously they leaked until the
        next ``start()`` — and the historical "Not running." result is
        returned.

        Returns:
            ``{"stopped": True, "returncode": rc}`` when a live child was
            stopped, else ``{"stopped": False, "message": "Not running."}``.
        """
        with self._lock:
            proc = self.process
            if proc is None:
                return {"stopped": False, "message": "Not running."}
            alive = proc.poll() is None
            if alive:
                self._set_state("stopping", "Stopping...")

        # Halt forwarder + audio watcher before we tear the pipe down.
        self._frame_stop.set()
        self._audio_stop.set()
        ft = self._frame_thread
        if ft is not None:
            ft.join(timeout=2.0)
            self._frame_thread = None
        at = self._audio_thread
        if at is not None:
            at.join(timeout=2.0)
            self._audio_thread = None

        if alive:
            try:
                proc.send_signal(signal.SIGINT)
                proc.wait(timeout=_STOP_TIMEOUT_SEC)
            except subprocess.TimeoutExpired:
                proc.terminate()
                try:
                    proc.wait(timeout=_TERMINATE_TIMEOUT_SEC)
                except subprocess.TimeoutExpired:
                    proc.kill()
                    proc.wait(timeout=_TERMINATE_TIMEOUT_SEC)
        rc = proc.returncode

        # Give the reader a moment to drain to EOF and footer the session
        # log before its pipe is closed underneath it. Skipped when stop()
        # is itself running on the reader thread (e.g. from a callback).
        rt = self._reader_thread
        if rt is not None and rt is not threading.current_thread():
            rt.join(timeout=1.0)

        # Close stdin/stdout explicitly — without this each start/stop
        # cycle leaks FDs (relied on Popen.__del__ which only runs at GC;
        # a reconnect loop would march the FD count to the OS limit).
        for pipe in (getattr(proc, "stdin", None), getattr(proc, "stdout", None)):
            if pipe is not None:
                try:
                    pipe.close()
                except Exception:
                    pass

        with self._lock:
            # Only clear the slot if it still holds the child we stopped.
            if self.process is proc:
                self.process = None
            if alive:
                self.log_tail.append("Stopped.")
                self._set_state("stopped", "Stopped.")

        if not alive:
            return {"stopped": False, "message": "Not running."}
        log.info("Live Gemini subprocess stopped (rc=%s)", rc)
        return {"stopped": True, "returncode": rc}

    def status(self) -> dict[str, Any]:
        """Return a snapshot of the session for the dashboard/API.

        Deques are copied to plain lists under the lock so the caller gets
        a consistent, detached view.
        """
        with self._lock:
            running = self.process is not None and self.process.poll() is None
            return {
                "running": running,
                "pid": self.process.pid if running and self.process else None,
                "state": self.state,
                "state_message": self.state_message,
                "last_user_text": self.last_user_text,
                "last_bot_text": self.last_bot_text,
                "user_transcript": list(self.user_transcript),
                "bot_transcript": list(self.bot_transcript),
                "log_tail": list(self.log_tail),
                "suppressed_noise": self.suppressed_noise,
            }