# Marcus/Voice/gemini_script.py — 357 lines, 14 KiB, Python

"""Voice/gemini_script.py — subprocess manager for Gemini Live STT.
Runs in marcus's Python 3.8 env. The actual Gemini STT lives in
[Voice/gemini_runner.py](Voice/gemini_runner.py) which has to run in a
Python 3.10+ env (e.g. the `gemini_sdk` conda env on the Jetson) because
`google-genai` doesn't support Python 3.8.
This file spawns the runner as a subprocess, reads JSON-line transcripts
off its stdout, and turns them into the same `on_transcript` / `on_command`
callbacks the rest of marcus expects. The external API of class
`GeminiBrain` is unchanged from the previous in-process port — drop-in
swap for `Voice/marcus_voice.py::_voice_loop_gemini`.
Sanad uses the same subprocess pattern (its own `live_voice_loop.py`
parses log lines from a Gemini subprocess), so this matches Sanad's
architecture not just in mechanism but in shape.
────────────────────────────────────────────────────────────────────────
Subprocess lookup order for the Python 3.10+ binary:
1. env MARCUS_GEMINI_PYTHON (highest priority)
2. config stt.gemini_python_path
3. auto-detect — try a list of common conda env paths
4. raise — explicit error in voice.log
────────────────────────────────────────────────────────────────────────
"""
from __future__ import annotations
import base64
import json
import logging
import os
import subprocess
import sys
import threading
from typing import Callable, Optional, Union
# Module-wide logger; everything in this file reports under "gemini_brain".
log = logging.getLogger("gemini_brain")

# Fallback interpreter locations, probed in list order by
# _resolve_runner_python() when neither MARCUS_GEMINI_PYTHON nor
# stt.gemini_python_path names a binary. The `gemini_sdk` env is tried under
# every known conda root first, then the `sanad` env under the two
# home-directory roots.
_DEFAULT_CANDIDATES = [
    conda_root + "/envs/gemini_sdk/bin/python"
    for conda_root in ("~/miniconda3", "~/anaconda3", "~/.miniconda3", "/opt/conda")
] + [
    conda_root + "/envs/sanad/bin/python"
    for conda_root in ("~/miniconda3", "~/anaconda3")
]
def _is_executable_file(path: str) -> bool:
    """Return True if *path* is an existing regular file we may execute."""
    return os.path.isfile(path) and os.access(path, os.X_OK)


def _resolve_runner_python(stt_cfg: dict) -> str:
    """Locate the interpreter used to launch the Gemini runner.

    Lookup order:
      1. env ``MARCUS_GEMINI_PYTHON``
      2. ``stt_cfg["gemini_python_path"]``
      3. the first entry of ``_DEFAULT_CANDIDATES`` that exists

    Only existence and the executable bit are checked. The interpreter's
    version and whether it can import ``google-genai`` are NOT verified here.

    Args:
        stt_cfg: the ``stt`` section of the Voice config. ``None`` or an
            empty mapping is treated as "nothing configured".

    Returns:
        The ``~``-expanded path of the interpreter.

    Raises:
        FileNotFoundError: an explicit path was given but is not an
            executable file, or no explicit path was given and none of the
            default candidates exists.
    """
    cfg = stt_cfg or {}
    # Values coming from shell exports / .env files often carry a trailing
    # newline or CR; strip them so an otherwise valid path is not rejected.
    from_env = (os.environ.get("MARCUS_GEMINI_PYTHON") or "").strip()
    # str() so a mis-typed (non-string) config value ends up as the handled
    # FileNotFoundError below instead of a TypeError inside expanduser().
    from_cfg = str(cfg.get("gemini_python_path") or "").strip()
    explicit = from_env or from_cfg
    if explicit:
        path = os.path.expanduser(explicit)
        if _is_executable_file(path):
            return path
        # An explicit setting never falls back to auto-detection: a wrong
        # override should be reported, not silently papered over.
        raise FileNotFoundError(
            "MARCUS_GEMINI_PYTHON / stt.gemini_python_path = "
            "{!r} but that binary does not exist or is not executable".format(path)
        )
    for cand in _DEFAULT_CANDIDATES:
        path = os.path.expanduser(cand)
        if _is_executable_file(path):
            log.info("auto-detected gemini-runner python at %s", path)
            return path
    raise FileNotFoundError(
        "no Python 3.10+ env found for the Gemini runner. Set env "
        "MARCUS_GEMINI_PYTHON to the path of a conda env's python with "
        "`google-genai` installed (e.g. ~/miniconda3/envs/gemini_sdk/bin/python)."
    )
class GeminiBrain:
    """Subprocess-managing wrapper around Voice/gemini_runner.py.

    External API kept identical to the in-process version so callers don't
    care that Gemini lives in another Python:

        brain = GeminiBrain(audio_io, recorder, voice_name, system_prompt,
                            api_key=..., on_transcript=cb1, on_command=cb2)
        brain.start()
        ...
        brain.stop()

    `audio_io` and `recorder` are accepted for API parity but unused —
    the subprocess owns its own mic and writes its own WAVs (one process
    owning the whole audio path is simpler than streaming PCM over a pipe).

    Wire protocol, as implemented here:
      * parent -> runner (stdin), one command per line:
        ``flush``, ``stop``, ``frame:<base64 jpeg>``, ``state:<json object>``
      * runner -> parent (stdout), one JSON object per line, dispatched on
        its ``type`` key by `_handle_msg`.
      * runner stderr is forwarded to the log at WARNING level.

    Callbacks run on the stdout reader thread; an exception raised by a
    callback is logged and does not stop the reader.
    """

    def __init__(
        self,
        audio_io,                       # ignored (runner owns its own)
        recorder,                       # ignored (runner owns its own)
        voice_name=None,                # forwarded via env
        system_prompt="",               # forwarded via env (or config)
        *,
        api_key: str = "",
        on_transcript: Optional[Callable[[str], None]] = None,
        on_command: Optional[Callable[[str, str], None]] = None,
        on_bot_text: Optional[Callable[[str], None]] = None,
        on_turn_end: Optional[Callable[[], None]] = None,
    ):
        """Store settings and callbacks; no subprocess is spawned until start().

        Args:
            audio_io: unused, accepted for API parity.
            recorder: unused, accepted for API parity.
            voice_name: exported to the runner as MARCUS_GEMINI_VOICE.
            system_prompt: exported as MARCUS_GEMINI_SYSTEM_PROMPT.
            api_key: exported as MARCUS_GEMINI_API_KEY.
            on_transcript: called with each non-empty user transcript.
            on_command: called with ``(transcript, "en")`` after on_transcript.
            on_bot_text: called with each non-empty bot text.
            on_turn_end: called when the runner reports the end of a turn.
        """
        self._voice_name = voice_name or ""
        self._system_prompt = system_prompt or ""
        self._api_key = api_key
        self._on_transcript = on_transcript
        self._on_command = on_command
        self._on_bot_text = on_bot_text
        self._on_turn_end = on_turn_end
        self._proc = None               # type: Optional[subprocess.Popen]
        self._reader_thread = None      # type: Optional[threading.Thread]
        self._err_thread = None         # type: Optional[threading.Thread]
        self._stopping = False
        self._stdin_lock = threading.Lock()   # serialise stdin writes
        # Config is imported lazily so import order doesn't matter. Any
        # failure (module missing, bad file, unexpected shape) degrades to
        # "no stt settings" instead of breaking construction.
        try:
            from Core.config_loader import load_config
            stt_cfg = (load_config("Voice") or {}).get("stt")
        except Exception:
            log.debug("Voice config unavailable; using stt defaults", exc_info=True)
            stt_cfg = None
        # `"stt": null` in the config must not leak a None to the resolver.
        self._stt = stt_cfg or {}

    # ─── lifecycle ────────────────────────────────────────
    def start(self) -> None:
        """Spawn the runner subprocess plus its stdout/stderr reader threads.

        A no-op (with a warning) when a runner is already alive. A missing
        interpreter, a missing gemini_runner.py, or a spawn failure is
        logged as an error and leaves the instance without a process.
        """
        if self._proc is not None and self._proc.poll() is None:
            log.warning("GeminiBrain subprocess already running")
            return
        self._stopping = False
        try:
            python_bin = _resolve_runner_python(self._stt)
        except FileNotFoundError as e:
            log.error("%s", e)
            return
        runner = os.path.abspath(
            os.path.join(os.path.dirname(__file__), "gemini_runner.py")
        )
        if not os.path.isfile(runner):
            log.error("gemini_runner.py not found at %s", runner)
            return
        # The directory above Voice/ — used both as cwd and as the exported root.
        project_root = os.path.dirname(os.path.dirname(runner))

        env = os.environ.copy()
        if self._api_key:
            env["MARCUS_GEMINI_API_KEY"] = self._api_key
        if self._voice_name:
            env["MARCUS_GEMINI_VOICE"] = self._voice_name
        # Forward the system prompt via env so the runner doesn't have to
        # re-read the JSON file (and so a trimmed inline string survives).
        if self._system_prompt:
            env["MARCUS_GEMINI_SYSTEM_PROMPT"] = self._system_prompt
        env["MARCUS_PROJECT_ROOT"] = project_root
        # Pin both ends of the pipes to UTF-8 so non-ASCII transcripts and
        # state payloads do not depend on whatever locale we were started in.
        env["PYTHONIOENCODING"] = "utf-8"

        log.info("spawning gemini runner: %s -u %s", python_bin, runner)
        try:
            proc = subprocess.Popen(
                [python_bin, "-u", runner],
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                cwd=project_root,
                env=env,
                bufsize=1,                  # line-buffered text pipes
                universal_newlines=True,
                encoding="utf-8",
                errors="replace",           # a stray byte must not kill a reader
            )
        except Exception as e:
            log.error("failed to spawn gemini runner: %s", e)
            self._proc = None
            return
        self._proc = proc
        # Each reader is handed the process it serves, so a quick
        # stop()/start() cannot make it attach to a different process.
        self._reader_thread = threading.Thread(
            target=self._stdout_reader, args=(proc,),
            daemon=True, name="gemini-stdout",
        )
        self._reader_thread.start()
        self._err_thread = threading.Thread(
            target=self._stderr_reader, args=(proc,),
            daemon=True, name="gemini-stderr",
        )
        self._err_thread.start()

    def flush_mic(self) -> None:
        """Tell the runner to drop its mic buffer (echo prevention)."""
        self._send_stdin("flush\n")

    def send_frame(self, jpeg: Union[bytes, str]) -> None:
        """Forward a single camera frame (JPEG) to the runner.

        `jpeg` may be raw bytes-like data (e.g. from cv2.imencode) OR an
        already-base64 ASCII string (e.g. from API.camera_api.get_frame()).
        Anything else, or an empty frame, is ignored. Best-effort: a no-op
        when the runner is not running.
        """
        proc = self._proc
        if proc is None or proc.stdin is None:
            return
        if isinstance(jpeg, (bytes, bytearray, memoryview)):
            b64 = base64.b64encode(jpeg).decode("ascii")
        elif isinstance(jpeg, str):
            # The protocol is one command per line: remove ALL whitespace,
            # not just the ends, so line-wrapped base64 stays one message.
            b64 = "".join(jpeg.split())
        else:
            return
        if not b64:
            return
        self._send_stdin("frame:" + b64 + "\n")

    def send_state(self, event: str, cmd: str,
                   elapsed_sec: Optional[float] = None,
                   reason: Optional[str] = None) -> None:
        """Push a motion state update to the runner.

        Events used by callers (per the original contract of this method):
          'start'       — worker dequeued cmd, brain about to run
          'complete'    — brain returned and motion_abort was clear
          'interrupted' — brain returned but motion_abort was set
          'error'       — brain raised an exception (`reason` set)

        The update is sent as ``state:<json>`` with keys ``event``, ``cmd``
        and, when given, ``elapsed_sec`` (rounded to 2 decimals) and
        ``reason`` (truncated to 200 characters). How the runner presents
        it to the live session is up to the runner. Best-effort — a no-op
        if the runner is not started, `event`/`cmd` is empty, or the
        payload cannot be serialised.
        """
        proc = self._proc
        if proc is None or proc.stdin is None:
            return
        if not event or not cmd:
            return
        try:
            payload = {"event": event, "cmd": cmd}
            if elapsed_sec is not None:
                payload["elapsed_sec"] = round(float(elapsed_sec), 2)
            if reason:
                payload["reason"] = str(reason)[:200]
            # json.dumps escapes newlines, so the payload stays on one line.
            line = "state:" + json.dumps(payload, ensure_ascii=False) + "\n"
        except (TypeError, ValueError) as e:
            log.debug("dropping unserialisable state update: %s", e)
            return
        self._send_stdin(line)

    def _send_stdin(self, line: str) -> None:
        """Serialised, best-effort write of one protocol line to the runner.

        Several threads (frame sender, flush, state updates, stop) may call
        this concurrently; the lock keeps their lines from interleaving.
        Write failures (runner gone, pipe closed) are logged at DEBUG and
        never raised.
        """
        proc = self._proc
        if proc is None or proc.stdin is None:
            return
        try:
            with self._stdin_lock:
                if not proc.stdin.closed:
                    proc.stdin.write(line)
                    proc.stdin.flush()
        except Exception as e:
            # Deliberately broad: callers treat this as fire-and-forget.
            log.debug("write to gemini runner stdin failed: %s", e)

    def stop(self) -> None:
        """Shut the runner down and release its pipes.

        Escalation: polite ``stop`` line on stdin and a 3 s wait, then
        terminate() and a 2 s wait, then kill() and a 2 s wait so the child
        is reaped. Afterwards stdin is closed, the reader threads are
        joined (1 s each) and their pipes closed. Safe to call when nothing
        is running and from inside a callback.
        """
        self._stopping = True
        proc = self._proc
        if proc is None:
            return
        try:
            self._send_stdin("stop\n")
            try:
                proc.wait(timeout=3)
            except subprocess.TimeoutExpired:
                self._force_stop(proc)
            self._release_pipes(proc)
        finally:
            self._proc = None
            self._reader_thread = None
            self._err_thread = None

    @staticmethod
    def _force_stop(proc) -> None:
        """Terminate, then kill, `proc`, waiting up to 2 s after each signal."""
        for send_signal in (proc.terminate, proc.kill):
            try:
                send_signal()
            except OSError as e:
                log.debug("signalling gemini runner failed: %s", e)
            try:
                # Waiting after kill() as well reaps the child (no zombie).
                proc.wait(timeout=2)
                return
            except subprocess.TimeoutExpired:
                continue
        log.warning("gemini runner (pid %s) still alive after kill()", proc.pid)

    def _release_pipes(self, proc) -> None:
        """Close stdin, join the reader threads, then close their pipes."""
        try:
            with self._stdin_lock:
                if proc.stdin is not None and not proc.stdin.closed:
                    proc.stdin.close()
        except (OSError, ValueError) as e:
            log.debug("closing gemini runner stdin failed: %s", e)
        current = threading.current_thread()
        readers = (
            (self._reader_thread, proc.stdout),
            (self._err_thread, proc.stderr),
        )
        for thread, stream in readers:
            # stop() may be invoked from a callback running on the reader
            # thread itself; a thread cannot join itself.
            if thread is not None and thread is not current:
                thread.join(timeout=1)
            # Only close a pipe nobody is reading any more; a reader that is
            # still running keeps its pipe open.
            if stream is None or (thread is not None and thread.is_alive()):
                continue
            try:
                stream.close()
            except (OSError, ValueError) as e:
                log.debug("closing gemini runner pipe failed: %s", e)

    # ─── stdout / stderr drainers ─────────────────────────
    def _stdout_reader(self, proc=None) -> None:
        """Thread body: parse JSON lines from the runner's stdout and dispatch them.

        Args:
            proc: the process to read from; defaults to the current one.
        """
        if proc is None:
            proc = self._proc
        if proc is None or proc.stdout is None:
            return
        try:
            for line in proc.stdout:
                if self._stopping:
                    break
                line = line.strip()
                if not line:
                    continue
                try:
                    msg = json.loads(line)
                except ValueError:
                    # Non-JSON line — log it raw so we can debug runner crashes.
                    log.warning("gemini-runner stdout (non-JSON): %s", line[:200])
                    continue
                try:
                    self._handle_msg(msg)
                except Exception:
                    # One bad message must not end transcript delivery.
                    log.exception("failed to handle gemini-runner message")
        except (OSError, ValueError) as e:
            # Typically the pipe being closed under us during shutdown.
            if not self._stopping:
                log.warning("gemini-runner stdout read failed: %s", e)
        if not self._stopping:
            log.warning(
                "gemini-runner stdout closed unexpectedly (exit code %s)",
                proc.poll(),
            )

    def _stderr_reader(self, proc=None) -> None:
        """Thread body: forward the runner's stderr lines to the log.

        Args:
            proc: the process to read from; defaults to the current one.
        """
        if proc is None:
            proc = self._proc
        if proc is None or proc.stderr is None:
            return
        try:
            for line in proc.stderr:
                line = line.rstrip()
                if line:
                    log.warning("gemini-runner stderr: %s", line[:200])
        except (OSError, ValueError) as e:
            if not self._stopping:
                log.warning("gemini-runner stderr read failed: %s", e)

    @staticmethod
    def _fire(name, callback, *args) -> None:
        """Invoke an optional callback; log, rather than raise, its failure."""
        if callback is None:
            return
        try:
            callback(*args)
        except Exception as e:
            log.error("%s failed: %s", name, e)

    def _handle_msg(self, msg: dict) -> None:
        """Dispatch one decoded runner message on its ``type`` key.

        Handled types: ``user``, ``bot``, ``turn_end``, ``ready``,
        ``reconnect`` and ``log``; anything else is logged at DEBUG.
        Values that are not JSON objects (a stray number or string printed
        on the runner's stdout parses as valid JSON) are logged and dropped.
        """
        if not isinstance(msg, dict):
            log.warning("gemini-runner stdout (not a JSON object): %r", msg)
            return
        t = msg.get("type")
        if t == "user":
            text = str(msg.get("text") or "").strip()
            if not text:
                return
            log.info("USER: %s", text)
            self._fire("on_transcript", self._on_transcript, text)
            # The language tag passed to on_command is fixed to "en" here.
            self._fire("on_command", self._on_command, text, "en")
        elif t == "bot":
            txt = str(msg.get("text") or "").strip()
            if txt:
                log.info("GEMINI: %s", txt[:120])
                self._fire("on_bot_text", self._on_bot_text, txt)
        elif t == "turn_end":
            log.info("listening")
            self._fire("on_turn_end", self._on_turn_end)
        elif t == "ready":
            log.info("connected — listening for speech")
        elif t == "reconnect":
            log.info("server signalled reconnect: %s", msg.get("reason", ""))
        elif t == "log":
            level = msg.get("level", "info")
            text = msg.get("msg", "")
            if level == "error":
                log.error("[runner] %s", text)
            elif level in ("warn", "warning"):
                log.warning("[runner] %s", text)
            else:
                log.info("[runner] %s", text)
        else:
            log.debug("gemini-runner unknown type=%r: %s", t, msg)