#!/usr/bin/env python3
"""Voice/gemini_runner.py — Gemini Live S2S subprocess (Option 3).

Runs in a Python 3.10+ env (the `gemini_sdk` conda env on this Jetson) so it
can import `google-genai` (which doesn't support Python 3.8). The marcus env
itself is pinned to Python 3.8 by the NVIDIA Jetson torch wheel, so Gemini
has to live in its own process.

This is the full Sanad-pattern speech-to-speech variant:
- response_modalities=["AUDIO"] → Gemini speaks back through G1 speaker
- input_audio_transcription → user transcripts emitted on stdout for
  Marcus's wake-word side channel
- output_audio_transcription → Gemini's reply text logged for review
- barge-in detection → user speaking over AI cuts AI off
- echo suppression → mic muted during AI playback
- JPEG camera frames over stdin → Marcus parent forwards frames; runner
  streams them to Gemini Live so the vision answers ("what do you see")
  are correct, not hallucinated.

The runner owns the G1 mic AND the G1 speaker (unitree_sdk2py works fine
in the gemini_sdk env on this Jetson — already verified).

────────────────────────────────────────────────────────────────────────
Stdout protocol (one JSON object per line, UTF-8):
  {"type":"ready"}                       session connected
  {"type":"user", "text":"..."}          user input transcription
  {"type":"bot", "text":"..."}           Gemini's reply text (logged only —
                                         Gemini also speaks it)
  {"type":"turn_end"}
  {"type":"barge_in"}
  {"type":"reconnect", "reason":"..."}
  {"type":"log", "level":"info|warn|error", "msg":"..."}

Stdin protocol (line-based):
  "stop\n"              graceful shutdown
  "flush\n"             drop mic buffer (echo prevention)
  "frame:<b64jpeg>\n"   forward a camera frame to Gemini Live (Marcus
                        parent throttles to ~2 fps)
  "state:<json>\n"      motion state update from Marcus's worker. JSON:
                        {"event":"start|complete|interrupted|error",
                         "cmd":"<name>", "elapsed_sec": float (optional),
                         "reason": "<why>" (error only)}.
                        Runner injects '[STATE-...] <cmd>' into the live
                        session as silent text context — Gemini reads it
                        for honest answers to "what are you doing?" but
                        does NOT speak about it unprompted (per persona
                        Rule 9).
────────────────────────────────────────────────────────────────────────
Env vars:
  MARCUS_GEMINI_API_KEY (or SANAD_GEMINI_API_KEY)
  MARCUS_GEMINI_MODEL   (optional)
  MARCUS_GEMINI_VOICE   (optional)
  MARCUS_PROJECT_ROOT   (optional)
"""

from __future__ import annotations

import array
import asyncio
import base64
import json
import os
import signal
import sys
import threading
import time
from typing import Any

import numpy as np

# Make the project importable regardless of CWD: env override first, else
# assume this file lives one level below the project root (Voice/..).
_PROJECT_ROOT = (
    os.environ.get("MARCUS_PROJECT_ROOT")
    or os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
if _PROJECT_ROOT not in sys.path:
    sys.path.insert(0, _PROJECT_ROOT)

from Voice.audio_io import BuiltinMic, BuiltinSpeaker
from Voice.turn_recorder import TurnRecorder

# Config is best-effort: if the loader (or its deps) are missing in this
# env, fall back to an empty config and rely on the defaults below.
try:
    from Core.config_loader import load_config
    _VCFG = load_config("Voice") or {}
except Exception:
    _VCFG = {}

_STT = _VCFG.get("stt", {})      # speech-to-text / Gemini tunables section
_SPK = _VCFG.get("speaker", {})  # G1 speaker / DDS section

# ─── stdout / stderr helpers ─────────────────────────────────────

# emit() is called from the asyncio loop AND from the stdin-watcher and
# signal-handler paths, so stdout writes must be serialized.
_stdout_lock = threading.Lock()

def emit(payload: dict) -> None:
    """Write one JSON object per line to stdout (the parent protocol)."""
    line = json.dumps(payload, ensure_ascii=False, separators=(",", ":"))
    with _stdout_lock:
        sys.stdout.write(line + "\n")
        sys.stdout.flush()

def log(level: str, msg: str) -> None:
    """Emit a log record over the stdout protocol (level: info|warn|error)."""
    emit({"type": "log", "level": level, "msg": msg})

# ─── stdin watcher (graceful shutdown + flush + frames) ──────────

_STOP_REQUESTED = threading.Event()
_MIC_HOLDER: list = []  # [BuiltinMic] when active

# Latest JPEG frame from the parent (raw bytes). The frame-send loop
# picks this up and ships it to Gemini Live.
_LATEST_FRAME_LOCK = threading.Lock()
_LATEST_FRAME: dict = {"bytes": None, "ts": 0.0}

# Pending motion-state updates to inject into Gemini Live as silent
# text context. The stdin watcher (running in a regular thread) drops
# parsed payloads here; an asyncio task in the main session loop
# drains them and calls session.send_realtime_input(text=...).
_STATE_LOCK = threading.Lock()
_STATE_PENDING: list = []  # list[str] of formatted '[STATE-...] ...' lines

# Worker event name → tag prefix injected into the live session.
_STATE_TAGS = {
    "start": "[STATE-START]",
    "complete": "[STATE-DONE]",
    "interrupted": "[STATE-INTERRUPTED]",
    "error": "[STATE-ERROR]",
    "paused": "[STATE-PAUSED]",
    "resumed": "[STATE-RESUMED]",
}


def _handle_frame_line(line: str) -> None:
    """Decode a 'frame:<base64>' line and cache the JPEG for the frame loop."""
    try:
        jpeg = base64.b64decode(line[len("frame:"):])
    except Exception:
        return
    if not jpeg:
        return
    with _LATEST_FRAME_LOCK:
        _LATEST_FRAME["bytes"] = jpeg
        _LATEST_FRAME["ts"] = time.time()


def _handle_state_line(line: str) -> None:
    """Parse a 'state:<json>' line into a '[STATE-...] ...' message.

    The formatted message is queued on _STATE_PENDING; the asyncio task
    _send_state_loop drains the queue and performs the actual
    session.send_realtime_input(text=...) call.
    """
    try:
        payload = json.loads(line[len("state:"):])
    except Exception:
        return
    event = (payload.get("event") or "").strip().lower()
    cmd = (payload.get("cmd") or "").strip()
    tag = _STATE_TAGS.get(event)
    if not event or not cmd or tag is None:
        return
    pieces = ["{} {}".format(tag, cmd)]
    elapsed = payload.get("elapsed_sec")
    if isinstance(elapsed, (int, float)):
        pieces.append(" ({:.1f}s)".format(float(elapsed)))
    reason = payload.get("reason")
    if reason and event == "error":
        pieces.append(" — {}".format(reason))
    with _STATE_LOCK:
        _STATE_PENDING.append("".join(pieces))


def _stdin_watcher() -> None:
    """Consume parent commands from stdin until 'stop' or EOF.

    Cheap exact commands ('stop', 'flush') are matched case-insensitively
    first; the 'frame:'/'state:' payload prefixes are matched on the raw
    line since their payloads are case-sensitive.
    """
    try:
        for raw_line in sys.stdin:
            line = raw_line.rstrip("\n")
            if not line:
                continue
            lowered = line.lower()
            if lowered == "stop":
                log("info", "stop received from parent — exiting")
                _STOP_REQUESTED.set()
                return
            if lowered == "flush":
                if _MIC_HOLDER:
                    try:
                        _MIC_HOLDER[0].flush()
                    except Exception:
                        pass
            elif line.startswith("frame:"):
                _handle_frame_line(line)
            elif line.startswith("state:"):
                _handle_state_line(line)
    except Exception:
        # Best-effort channel: a broken stdin just ends the watcher.
        return


threading.Thread(target=_stdin_watcher, daemon=True, name="stdin-watcher").start()


def _install_signal_handlers() -> None:
    """Route SIGTERM/SIGINT into the shared stop flag for a clean exit."""

    def _on_signal(_signum, _frame):
        log("info", "signal received — exiting")
        _STOP_REQUESTED.set()

    for signum in (signal.SIGTERM, signal.SIGINT):
        try:
            signal.signal(signum, _on_signal)
        except Exception:
            pass
# ─── tunables ────────────────────────────────────────────────────

# Model / voice / credentials — env vars win over config, config over defaults.
_MODEL = os.environ.get(
    "MARCUS_GEMINI_MODEL",
    _STT.get("gemini_model", "gemini-2.5-flash-native-audio-preview-12-2025"),
)
_VOICE = os.environ.get(
    "MARCUS_GEMINI_VOICE",
    _STT.get("gemini_voice_name", "Charon"),
)
_API_KEY = (
    os.environ.get("MARCUS_GEMINI_API_KEY")
    or os.environ.get("SANAD_GEMINI_API_KEY")
    or _STT.get("gemini_api_key", "")
)

_MIC_GAIN = float(_STT.get("mic_gain", 1.0))
# Proactively reconnect before the server-side session limit is reached.
_SESSION_TIMEOUT = float(_STT.get("gemini_session_timeout_sec", 660))
_MAX_RECONNECT_DELAY = float(_STT.get("gemini_max_reconnect_delay_sec", 30))
_MAX_CONSECUTIVE_ERRORS = int(_STT.get("gemini_max_consecutive_errors", 10))
_NO_MESSAGES_TIMEOUT = float(_STT.get("gemini_no_messages_timeout_sec", 30))

# PCM rates: 16 kHz mono int16 up to Gemini, 24 kHz back from it.
SEND_SAMPLE_RATE = int(_STT.get("gemini_send_sample_rate", 16000))
RECEIVE_SAMPLE_RATE = int(_STT.get("gemini_receive_sample_rate", 24000))
CHUNK_SIZE = int(_STT.get("gemini_chunk_size", 512))
_CHUNK_BYTES = CHUNK_SIZE * 2  # int16 → 2 bytes per sample
_SILENCE_PCM = b"\x00" * _CHUNK_BYTES

# Barge-in: sustained mic energy above threshold while the AI speaks.
_BARGE_THRESHOLD = int(_STT.get("gemini_barge_threshold", 500))
_BARGE_CHUNKS = int(_STT.get("gemini_barge_loud_chunks_needed", 3))
_BARGE_COOLDOWN = float(_STT.get("gemini_barge_cooldown_sec", 0.3))
# Echo suppression: while the AI speaks, mute mic frames quieter than this.
_ECHO_SUPPRESS_BELOW = int(_STT.get("gemini_echo_suppress_below", 500))
# Ignore barge-in for this long after AI playback starts (onset transients).
_AI_GRACE = float(_STT.get("gemini_ai_speak_grace_sec", 0.15))

# Camera-frame forwarding cadence and staleness cutoff.
_FRAME_INTERVAL = float(_STT.get("gemini_frame_interval_sec", 0.5))
_FRAME_MAX_AGE = float(_STT.get("gemini_frame_max_age_sec", 1.5))

# Per-turn WAV recording (debug/review).
_REC_ENABLED = bool(_STT.get("gemini_record_enabled", False))
_REC_KEEP = int(_STT.get("gemini_record_keep_count", 50))
_DATA_DIR = os.path.join(
    _PROJECT_ROOT,
    _VCFG.get("audio", {}).get("data_dir", "Data/Voice/Recordings"),
    "gemini_turns",
)

# System prompt resolution order: env var → config string → config file.
_SYS_PROMPT = (
    os.environ.get("MARCUS_GEMINI_SYSTEM_PROMPT")
    or _STT.get(
        "gemini_system_prompt",
        "You are Sanad, a humanoid robot assistant. Reply briefly.",
    )
)
_SP_FILE = _STT.get("gemini_system_prompt_file", "")
if _SP_FILE:
    _sp_path = (
        _SP_FILE
        if os.path.isabs(_SP_FILE)
        else os.path.join(_PROJECT_ROOT, _SP_FILE)
    )
    try:
        with open(_sp_path, "r", encoding="utf-8") as f:
            txt = f.read().strip()
            if txt:
                _SYS_PROMPT = txt
    except Exception:
        # Missing/unreadable prompt file — keep the env/config prompt.
        pass


def _audio_energy(pcm: bytes) -> int:
    """Return the mean absolute sample value of int16 PCM (0 on failure).

    Used as a cheap loudness proxy for barge-in and echo-suppression
    decisions; the trailing conditional guards the division on empty input.
    """
    try:
        samples = array.array("h", pcm)
        return sum(abs(s) for s in samples) // len(samples) if samples else 0
    except Exception:
        return 0


# ─── G1 speaker (audio playback) — initialized in main_async ────

def _init_g1_speaker() -> BuiltinSpeaker | None:
    """Initialise the G1 DDS audio client and wrap it in a BuiltinSpeaker.

    Sanad's audio_io.BuiltinSpeaker normally takes an already-initialised
    AudioClient. This subprocess owns its own DDS init.

    Returns None (after logging) if unitree_sdk2py is unavailable or the
    AudioClient cannot be initialised.
    """
    try:
        from unitree_sdk2py.core.channel import ChannelFactoryInitialize
        from unitree_sdk2py.g1.audio.g1_audio_client import AudioClient
    except Exception as e:
        log("error", f"unitree_sdk2py not importable in this env: {e}")
        return None
    iface = _SPK.get("dds_interface", "eth0")
    try:
        ChannelFactoryInitialize(0, iface)
    except Exception as e:
        # Already initialised in this process — that's fine.
        log("warn", f"ChannelFactoryInitialize: {e}")
    try:
        ac = AudioClient()
        ac.SetTimeout(10.0)
        ac.Init()
        try:
            # Volume set is best-effort; playback still works without it.
            ac.SetVolume(int(_SPK.get("volume", 100)))
        except Exception:
            pass
    except Exception as e:
        log("error", f"AudioClient init failed: {e}")
        return None
    return BuiltinSpeaker(ac, app_name=_SPK.get("app_name", "sanad"))


# ─── per-session state (reset on each connect) ───────────────────

class _Session:
    """Mutable per-connection flags shared by the send/receive loops."""

    def __init__(self):
        self.speaking = False           # AI audio is currently playing
        self.stream_started = False     # speaker.begin_stream() was called
        self.barge_block_until = 0.0    # ignore barge-in until this time
        self.ai_speak_start = 0.0       # when the current AI turn began
        self.last_ai_audio = 0.0        # last time an AI audio chunk arrived


# ─── main async loop ─────────────────────────────────────────────

def _build_config(types):
    """Build the LiveConnectConfig: audio out, VAD tuning, transcription,
    voice selection, and the system prompt.

    `types` is the google.genai.types module (passed in because the import
    happens lazily inside main_async).
    """
    vad_start = _STT.get("gemini_vad_start_sensitivity", "START_SENSITIVITY_HIGH")
    vad_end = _STT.get("gemini_vad_end_sensitivity", "END_SENSITIVITY_LOW")
    prefix_ms = int(_STT.get("gemini_vad_prefix_padding_ms", 20))
    silence_ms = int(_STT.get("gemini_vad_silence_duration_ms", 200))
    return types.LiveConnectConfig(
        response_modalities=["AUDIO"],
        speech_config=types.SpeechConfig(
            voice_config=types.VoiceConfig(
                prebuilt_voice_config=types.PrebuiltVoiceConfig(
                    voice_name=_VOICE,
                ),
            ),
        ),
        realtime_input_config=types.RealtimeInputConfig(
            automatic_activity_detection=types.AutomaticActivityDetection(
                disabled=False,
                start_of_speech_sensitivity=getattr(types.StartSensitivity, vad_start),
                end_of_speech_sensitivity=getattr(types.EndSensitivity, vad_end),
                prefix_padding_ms=prefix_ms,
                silence_duration_ms=silence_ms,
            ),
        ),
        input_audio_transcription=types.AudioTranscriptionConfig(),
        output_audio_transcription=types.AudioTranscriptionConfig(),
        system_instruction=types.Content(
            parts=[types.Part(text=_SYS_PROMPT)],
        ),
    )


async def _send_mic_loop(session, types_mod, mic, speaker, recorder, sess: _Session, done: asyncio.Event) -> None:
    """Stream mic PCM to Gemini; handle gain, barge-in, and echo suppression.

    Runs until `done` is set or a stop is requested. Sets `done` itself if
    sending to the session fails (forces a reconnect).
    """
    loop = asyncio.get_event_loop()
    # Real-time pacing: one chunk's duration at the send sample rate.
    frame_pause = CHUNK_SIZE / float(SEND_SAMPLE_RATE)
    loud_count = 0
    last_activity = time.time()
    while not done.is_set() and not _STOP_REQUESTED.is_set():
        try:
            # mic.read_chunk blocks; run it off-loop to keep asyncio responsive.
            raw = await loop.run_in_executor(None, mic.read_chunk, _CHUNK_BYTES)
        except Exception as e:
            log("warn", f"mic read failed: {e}")
            break
        if not raw:
            await asyncio.sleep(frame_pause)
            continue
        if _MIC_GAIN != 1.0:
            # Apply gain in float32 then clip back to int16 to avoid wraparound.
            samples = np.frombuffer(raw, dtype=np.int16).astype(np.float32)
            samples = np.clip(samples * _MIC_GAIN, -32768, 32767).astype(np.int16)
            raw = samples.tobytes()
        energy = _audio_energy(raw)
        now = time.time()
        # Barge-in detection — sustained user energy interrupts the AI.
        if sess.speaking and now >= sess.barge_block_until:
            if (now - sess.ai_speak_start) >= _AI_GRACE:
                if energy > _BARGE_THRESHOLD:
                    loud_count += 1
                else:
                    # Decay instead of reset so brief dips don't cancel a barge.
                    loud_count = max(0, loud_count - 1)
                if loud_count > _BARGE_CHUNKS:
                    log("info", f"BARGE-IN (e={energy})")
                    emit({"type": "barge_in"})
                    sess.speaking = False
                    sess.stream_started = False
                    try:
                        speaker.stop()
                    except Exception:
                        pass
                    try:
                        mic.flush()
                    except Exception:
                        pass
                    try:
                        recorder.finish_turn()
                    except Exception:
                        pass
                    loud_count = 0
                    sess.barge_block_until = now + _BARGE_COOLDOWN
        # Echo suppression — while AI speaks, mute quiet mic frames so the
        # mic doesn't feed Gemini its own voice.
        send_data = raw
        if sess.speaking and energy < _ECHO_SUPPRESS_BELOW:
            send_data = _SILENCE_PCM
        # Capture user audio for the per-turn WAV (only when user actually speaks).
        if energy > 250 and not sess.speaking:
            try:
                recorder.capture_user(raw)
            except Exception:
                pass
        if energy > 250:
            last_activity = now
        elif now - last_activity > 10:
            # Show actual energy + a short rolling sample of recent peaks so
            # "no speech" diagnostics reveal whether the mic is silent (e≈0,
            # mic broken / multicast not delivering) vs picking up faint
            # ambient (e≈50-200, mic OK but user too far / mic_gain too low).
            log("info", f"alive (no speech {now - last_activity:.0f}s, e={energy})")
            last_activity = now
        try:
            await session.send_realtime_input(
                audio=types_mod.Blob(
                    data=send_data,
                    mime_type=f"audio/pcm;rate={SEND_SAMPLE_RATE}",
                ),
            )
        except asyncio.CancelledError:
            return
        except Exception as e:
            log("warn", f"mic send failed: {e}")
            done.set()
            return
        await asyncio.sleep(frame_pause)


async def _send_frame_loop(session, types_mod, done: asyncio.Event) -> None:
    """Periodically push the latest cached camera frame (JPEG) to Gemini Live."""
    while not done.is_set() and not _STOP_REQUESTED.is_set():
        await asyncio.sleep(_FRAME_INTERVAL)
        with _LATEST_FRAME_LOCK:
            data = _LATEST_FRAME.get("bytes")
            ts = _LATEST_FRAME.get("ts", 0.0)
        if not data:
            continue
        if (time.time() - ts) > _FRAME_MAX_AGE:
            # Stale — don't waste tokens streaming a frame the camera abandoned.
            continue
        try:
            await session.send_realtime_input(
                video=types_mod.Blob(data=data, mime_type="image/jpeg"),
            )
        except asyncio.CancelledError:
            return
        except Exception as e:
            log("warn", f"frame send failed: {e}")
            # Keep going — frames are best-effort.


async def _send_state_loop(session, done: asyncio.Event) -> None:
    """Drain _STATE_PENDING and inject each line into Gemini Live as silent
    text context.

    This is what makes Gemini aware of the robot's actual motion state —
    start/complete/interrupted/error events from Marcus's motion worker.
    Persona Rule 9 instructs Gemini to read these for context but NOT speak
    about them unprompted; mention only when the user asks 'what are you
    doing?'.

    Polls at 10 Hz which is plenty — state events arrive at most a few per
    second and they're tiny. send_realtime_input(text=...) is
    fire-and-forget; we log+swallow errors and keep draining."""
    while not done.is_set() and not _STOP_REQUESTED.is_set():
        await asyncio.sleep(0.1)
        with _STATE_LOCK:
            if not _STATE_PENDING:
                continue
            pending = list(_STATE_PENDING)
            _STATE_PENDING.clear()
        for msg in pending:
            try:
                await session.send_realtime_input(text=msg)
                log("info", f"STATE injected: {msg}")
            except asyncio.CancelledError:
                return
            except Exception as e:
                # Some Gemini Live SDK versions may not accept text on
                # send_realtime_input. Log once and drop — motion still
                # works fine, only the live-state-to-Gemini channel is
                # missing. Marcus side and dispatcher are unaffected.
                log("warn", f"state inject failed: {e}")


async def _receive_loop(session, speaker, recorder, sess: _Session, done: asyncio.Event) -> None:
    """Receive Gemini server messages: transcripts, audio chunks, turn events.

    Emits user/bot transcripts on stdout, streams AI audio to the G1
    speaker, and handles server-side interruption, go_away, and turn
    completion. Always sets `done` on exit so sibling loops unwind.
    """
    loop = asyncio.get_event_loop()
    last_recv = time.time()
    try:
        while not done.is_set() and not _STOP_REQUESTED.is_set():
            async for response in session.receive():
                last_recv = time.time()
                if done.is_set():
                    break
                if (hasattr(response, "go_away")
                        and getattr(response, "go_away", None) is not None):
                    # Server is about to drop us — reconnect proactively.
                    emit({"type": "reconnect", "reason": "server go_away"})
                    done.set()
                    return
                sc = getattr(response, "server_content", None)
                if sc is None:
                    continue
                if getattr(sc, "interrupted", False) is True:
                    # Server-side VAD cut the model off (user spoke over it).
                    if sess.speaking:
                        log("info", "Gemini interrupted by server")
                        sess.speaking = False
                        sess.stream_started = False
                        try:
                            speaker.stop()
                        except Exception:
                            pass
                        try:
                            recorder.finish_turn()
                        except Exception:
                            pass
                    continue
                it = getattr(sc, "input_transcription", None)
                if it is not None:
                    text = (getattr(it, "text", "") or "").strip()
                    # Suppress user transcripts while AI speaks (echo text).
                    if text and not sess.speaking:
                        emit({"type": "user", "text": text})
                        try:
                            recorder.add_user_text(text)
                        except Exception:
                            pass
                ot = getattr(sc, "output_transcription", None)
                if ot is not None:
                    text = (getattr(ot, "text", "") or "").strip()
                    if text:
                        emit({"type": "bot", "text": text})
                        try:
                            recorder.add_robot_text(text)
                        except Exception:
                            pass
                mt = getattr(sc, "model_turn", None)
                if mt is not None:
                    for part in getattr(mt, "parts", []) or []:
                        inl = getattr(part, "inline_data", None)
                        if inl is not None and getattr(inl, "data", None):
                            now = time.time()
                            if not sess.speaking:
                                sess.ai_speak_start = now
                            sess.speaking = True
                            sess.last_ai_audio = now
                            raw_audio = inl.data
                            try:
                                recorder.capture_robot(raw_audio)
                            except Exception:
                                pass
                            audio_arr = np.frombuffer(raw_audio, dtype=np.int16)
                            if not sess.stream_started:
                                # Speaker calls block; keep them off-loop.
                                await loop.run_in_executor(None, speaker.begin_stream)
                                sess.stream_started = True
                            await loop.run_in_executor(
                                None,
                                speaker.send_chunk,
                                audio_arr,
                                RECEIVE_SAMPLE_RATE,
                            )
                if getattr(sc, "turn_complete", False):
                    if sess.speaking and sess.stream_started and not speaker.interrupted:
                        log("info", f"speaker {speaker.total_sent_sec:.1f}s")
                        await loop.run_in_executor(None, speaker.wait_finish)
                    elif sess.speaking and speaker.interrupted:
                        log("info", "speaker interrupted")
                    sess.speaking = False
                    sess.stream_started = False
                    try:
                        # Drop any echo of the just-played reply.
                        _MIC_HOLDER and _MIC_HOLDER[0].flush()
                    except Exception:
                        pass
                    try:
                        recorder.finish_turn()
                    except Exception:
                        pass
                    emit({"type": "turn_end"})
            if time.time() - last_recv > _NO_MESSAGES_TIMEOUT:
                # Dead session (no traffic) — break so the outer loop reconnects.
                log("warn", f"no messages from Gemini for {_NO_MESSAGES_TIMEOUT:.0f}s")
                break
            await asyncio.sleep(0.1)
    except asyncio.CancelledError:
        return
    except Exception as e:
        log("warn", f"receive ended: {e}")
    finally:
        done.set()


async def main_async() -> int:
    """Connect/reconnect loop around the Gemini Live session.

    Owns the mic, speaker, and recorder lifecycles. Returns a process exit
    code: 0 clean stop, 2 SDK missing, 3 no API key, 4 client creation
    failed, 5 speaker unavailable.
    """
    if not _API_KEY:
        log("error", "no Gemini API key (set MARCUS_GEMINI_API_KEY)")
        return 3
    try:
        # Imported lazily: google-genai only exists in the gemini_sdk env.
        from google import genai
        from google.genai import types
    except Exception as e:
        log("error", f"google-genai not importable: {e}")
        return 2
    try:
        client = genai.Client(api_key=_API_KEY)
    except Exception as e:
        log("error", f"failed to create Gemini client: {e}")
        return 4
    config = _build_config(types)
    mic = BuiltinMic()
    mic.start()
    # Expose the mic to the stdin watcher's "flush" command.
    _MIC_HOLDER.append(mic)
    speaker = _init_g1_speaker()
    if speaker is None:
        log("error", "G1 speaker not available — exiting")
        try:
            mic.stop()
        except Exception:
            pass
        return 5
    recorder = TurnRecorder(
        enabled=_REC_ENABLED,
        out_dir=_DATA_DIR,
        user_rate=SEND_SAMPLE_RATE,
        robot_rate=RECEIVE_SAMPLE_RATE,
        keep_count=_REC_KEEP,
    )
    session_num = 0
    consecutive_errors = 0
    start = time.time()
    try:
        while not _STOP_REQUESTED.is_set():
            session_num += 1
            uptime_min = (time.time() - start) / 60
            try:
                log("info", f"connecting (session #{session_num}, uptime {uptime_min:.0f}m)")
                async with client.aio.live.connect(model=_MODEL, config=config) as session:
                    emit({"type": "ready"})
                    consecutive_errors = 0
                    mic.flush()
                    sess = _Session()
                    done = asyncio.Event()
                    try:
                        # All four loops share `done`; the timeout forces a
                        # reconnect before the server-side session limit.
                        await asyncio.wait_for(
                            asyncio.gather(
                                _send_mic_loop(session, types, mic, speaker, recorder, sess, done),
                                _send_frame_loop(session, types, done),
                                _send_state_loop(session, done),
                                _receive_loop(session, speaker, recorder, sess, done),
                            ),
                            timeout=_SESSION_TIMEOUT,
                        )
                    except asyncio.TimeoutError:
                        log("info", f"session timed out after {_SESSION_TIMEOUT:.0f}s")
                    except asyncio.CancelledError:
                        pass
                log("info", f"session #{session_num} ended — reconnecting in 1s")
                try:
                    speaker.stop()
                except Exception:
                    pass
                try:
                    mic.flush()
                except Exception:
                    pass
                if _STOP_REQUESTED.is_set():
                    break
                await asyncio.sleep(1)
            except asyncio.CancelledError:
                break
            except Exception as e:
                # Exponential backoff, capped at _MAX_RECONNECT_DELAY.
                consecutive_errors += 1
                delay = min(_MAX_RECONNECT_DELAY, 2 ** consecutive_errors)
                log("error", f"session error #{consecutive_errors}: {e} — retry in {delay:.0f}s")
                try:
                    await asyncio.sleep(delay)
                except asyncio.CancelledError:
                    break
                if consecutive_errors >= _MAX_CONSECUTIVE_ERRORS:
                    # Persistent failures may mean a wedged client — rebuild it.
                    log("warn", f"{consecutive_errors} consecutive errors — recreating client")
                    try:
                        client = genai.Client(api_key=_API_KEY)
                        consecutive_errors = 0
                    except Exception as ce:
                        log("error", f"client recreation failed: {ce}")
    finally:
        try:
            mic.stop()
        except Exception:
            pass
        try:
            speaker.stop()
        except Exception:
            pass
    return 0


def main() -> int:
    """Process entry point: install signal handlers, run the async loop."""
    _install_signal_handlers()
    try:
        return asyncio.run(main_async())
    except KeyboardInterrupt:
        return 0
    except Exception as e:
        log("error", f"fatal: {e}")
        return 4


if __name__ == "__main__":
    sys.exit(main())