#!/usr/bin/env python3
"""
voice_sanad.py (PRODUCTION)

- Starts photo server ONCE and keeps it alive.
- Initializes DDS ONCE.
- Keeps core components alive (mic/speaker/receive/trigger/autonomous).
- Uses a dedicated WS supervisor loop that reconnects only the Gemini channel.
"""

import asyncio
import json
import os
import random
import time
from pathlib import Path

import websockets

from Core.Logger import Logs
from Core import settings as config
from Core.settings import (
    PHOTOS_DIR,
    PHOTO_SERVER_PORT,
    MODEL,
    VOICE_NAME,
    GEMINI_API_KEY,
    PHOTO_TOTAL_SEC,
    PHOTO_DELAY_SEC,
    load_system_prompt,
)
from Server.photo_server import start_photo_server
from Gemini.gemini_voice import HamadGeminiVoice

sanad_logger = Logs()
sanad_logger.LogEngine("G1_Logs", "voice_sanad")

API_KEY = GEMINI_API_KEY.strip()
if not API_KEY:
    raise RuntimeError(f"Gemini API key is empty. Set gemini.api_key in {config.CONFIG_JSON}.")

# NOTE: the API key is embedded in the query string, so URI must never be logged.
URI = (
    "wss://generativelanguage.googleapis.com/ws/"
    "google.ai.generativelanguage.v1alpha.GenerativeService.BidiGenerateContent"
    f"?key={API_KEY}"
)

RUNTIME_HEALTH_FILE = Path(config.RUNTIME_HEALTH_FILE)
INTERACTION_FLAG_FILE = Path(config.SCRIPTS_DIR) / "interaction_triggered.flag"

# Strings (already stripped and lower-cased) that count as "enabled" for env flags.
_TRUTHY_ENV_VALUES = frozenset({"1", "true", "yes", "on", "y"})


def _env_flag(name: str, default: bool = False) -> bool:
    """Read a boolean flag from the environment without ever raising.

    Accepts the values in ``_TRUTHY_ENV_VALUES`` (case-insensitive) and, for
    backward compatibility with the previous ``bool(int(...))`` parsing, any
    integer literal (non-zero means enabled). An unset or blank variable
    yields ``default``; anything unparseable yields ``False``.
    """
    raw = os.environ.get(name)
    if raw is None:
        return default
    text = raw.strip().lower()
    if not text:
        return default
    if text in _TRUTHY_ENV_VALUES:
        return True
    try:
        return bool(int(text))
    except ValueError:
        return False


def _build_setup_message() -> dict:
    """Build the ``setup`` payload sent as the first frame on the Gemini socket.

    The system prompt is re-read through ``load_system_prompt()`` on every
    call, so each reconnect uses whatever that function currently returns.
    """
    system_prompt = load_system_prompt()
    return {
        "setup": {
            "model": MODEL,
            "generationConfig": {
                "responseModalities": ["AUDIO"],
                "speechConfig": {
                    "voiceConfig": {"prebuiltVoiceConfig": {"voiceName": VOICE_NAME}}
                },
            },
            "inputAudioTranscription": {},
            "systemInstruction": {"parts": [{"text": system_prompt}]},
        }
    }


def _read_runtime_mode_safe() -> str:
    """Return the runtime mode as ``"manual"`` or ``"ai"``.

    Any read error or unrecognised value falls back to ``"manual"``.
    """
    try:
        mode = str(config.read_runtime_mode()).strip().lower()
    except Exception:
        mode = "manual"
    return mode if mode in ("manual", "ai") else "manual"


def _interaction_active_now() -> bool:
    """Return True while the interaction flag file exists (False on any error)."""
    try:
        return INTERACTION_FLAG_FILE.exists()
    except Exception:
        return False


def _apply_idle_audio_policy(voice: HamadGeminiVoice, autonomous_enabled: bool):
    """Set passive-listen and audio-gate on ``voice`` for the current idle state.

    Decision order:
      1. mic disabled            -> passive listen off, gate off
      2. runtime mode is not AI  -> passive listen off, gate on
      3. autonomous enabled      -> both follow the idle-voice-listen setting
      4. plain AI mode           -> passive listen off, gate on
    """
    runtime_mode = _read_runtime_mode_safe()
    idle_voice_listen_enabled = bool(config.read_vision_idle_voice_listen_enabled())
    mic_enabled = bool(config.read_gemini_mic_enabled())

    if not mic_enabled:
        voice.set_passive_listen(False, reason="mic disabled")
        voice.set_audio_gate(False, reason="mic disabled")
        return

    if runtime_mode != "ai":
        voice.set_passive_listen(False, reason=f"{runtime_mode} mode")
        voice.set_audio_gate(True, reason=f"{runtime_mode} mode")
        return

    if autonomous_enabled:
        if idle_voice_listen_enabled:
            voice.set_passive_listen(True, reason="autonomous idle")
            voice.set_audio_gate(True, reason="autonomous idle listen")
        else:
            voice.set_passive_listen(False, reason="autonomous idle")
            voice.set_audio_gate(False, reason="autonomous idle")
        return

    voice.set_passive_listen(False, reason="ai mode")
    voice.set_audio_gate(True, reason="ai mode")


async def _write_runtime_health_loop(voice: HamadGeminiVoice, interval_sec: float = 1.0):
    """Periodically dump the voice runtime health to ``RUNTIME_HEALTH_FILE``.

    The payload is ``voice.get_runtime_health()`` (or ``{}`` when the method
    is missing) plus the ``model`` name and a ``time`` stamp. Write failures
    are logged and the loop keeps running. The sleep interval is clamped to
    at least 0.2 seconds.
    """
    while True:
        try:
            payload = voice.get_runtime_health() if hasattr(voice, "get_runtime_health") else {}
            payload = dict(payload or {})
            payload["model"] = MODEL
            payload["time"] = payload.get("time") or time.time()
            RUNTIME_HEALTH_FILE.parent.mkdir(parents=True, exist_ok=True)
            # Write to a sibling temp file and swap it in, so a concurrent
            # reader never observes a truncated or half-written JSON document.
            tmp_path = RUNTIME_HEALTH_FILE.with_name(RUNTIME_HEALTH_FILE.name + ".tmp")
            tmp_path.write_text(json.dumps(payload, ensure_ascii=False), encoding="utf-8")
            os.replace(tmp_path, RUNTIME_HEALTH_FILE)
        except Exception as e:
            sanad_logger.print_and_log(f"Runtime health write failed: {e}", "warning")
        await asyncio.sleep(max(0.2, float(interval_sec)))


async def _run_component_forever(name: str, coro_factory, restart_delay_sec: float):
    """Run ``coro_factory()`` in an endless restart loop.

    A normal return and any non-cancellation exception are both logged and
    followed by a restart after ``restart_delay_sec`` (minimum 0.1 seconds).
    Cancellation is propagated to the caller.
    """
    while True:
        try:
            await coro_factory()
            sanad_logger.print_and_log(f"⚠️ {name} loop ended; restarting.", "warning")
        except asyncio.CancelledError:
            raise
        except Exception as e:
            sanad_logger.print_and_log(f"❌ {name} loop error: {e}. Restarting.", "warning")
        await asyncio.sleep(max(0.1, float(restart_delay_sec)))


async def _run_mode_policy_sync(voice: HamadGeminiVoice, autonomous_enabled: bool, interval_sec: float = 0.5):
    """Re-apply the idle audio policy whenever the relevant settings change.

    Polls every ``interval_sec`` (minimum 0.2 seconds) while the WebSocket is
    connected. The policy is only re-applied when the tuple of
    (mode, mic enabled, idle listen enabled, autonomous enabled) differs from
    the last one seen. While the interaction flag is present the signature is
    recorded but the policy is left untouched.
    """
    last_signature = None
    while True:
        try:
            await asyncio.sleep(max(0.2, float(interval_sec)))
            if not voice.is_ws_connected():
                continue

            runtime_mode = _read_runtime_mode_safe()
            mic_enabled = bool(config.read_gemini_mic_enabled())
            idle_voice_listen_enabled = bool(config.read_vision_idle_voice_listen_enabled())
            signature = (runtime_mode, mic_enabled, idle_voice_listen_enabled, bool(autonomous_enabled))

            # A disabled mic is enforced even during an active interaction.
            if not mic_enabled:
                if signature != last_signature:
                    _apply_idle_audio_policy(voice, autonomous_enabled=autonomous_enabled)
                    last_signature = signature
                continue

            if _interaction_active_now():
                last_signature = signature
                continue

            if signature != last_signature:
                _apply_idle_audio_policy(voice, autonomous_enabled=autonomous_enabled)
                last_signature = signature
        except asyncio.CancelledError:
            raise
        except Exception as e:
            sanad_logger.print_and_log(f"⚠️ Mode policy sync error: {e}", "warning")


async def _run_autonomous_forever(hub, replay, voice, restart_delay_sec: float):
    """Run a fresh ``AutonomousManager`` in an endless restart loop.

    On cancellation the current manager's ``stop()`` is attempted
    (best-effort) before the cancellation is re-raised. Other failures are
    logged and followed by a restart after ``restart_delay_sec``
    (minimum 0.2 seconds).
    """
    from Modes.AI.autonomous_manager import AutonomousManager

    while True:
        am = AutonomousManager()
        try:
            await am.run(hub=hub, replay=replay, voice=voice, ws=None)
            sanad_logger.print_and_log("⚠️ Autonomous manager exited; restarting.", "warning")
        except asyncio.CancelledError:
            try:
                am.stop()
            except Exception:
                # Best-effort shutdown; the cancellation must still propagate.
                pass
            raise
        except Exception as e:
            sanad_logger.print_and_log(f"❌ Autonomous manager error: {e}. Restarting.", "warning")
        await asyncio.sleep(max(0.2, float(restart_delay_sec)))


async def _run_ws_supervisor(
    voice: HamadGeminiVoice,
    autonomous_enabled: bool,
    dashboard_url: str | None = None,
    manual_lean_runtime: bool = False,
):
    """Keep the Gemini WebSocket connected, reconnecting with backoff.

    Each cycle connects, sends the setup message, waits for the first reply,
    attaches the socket to ``voice``, applies the idle audio policy, logs the
    operator banner and then waits until the socket closes. The socket is
    always detached from ``voice`` afterwards. The reconnect delay is the
    current backoff plus up to 0.8 seconds of jitter; the backoff grows by a
    factor of 1.7 up to the configured maximum and is reset after every
    successful setup exchange.
    """
    backoff = float(config.read_watchdog_ws_initial_backoff_sec())
    max_backoff = float(config.read_watchdog_ws_max_backoff_sec())

    while True:
        sanad_logger.print_and_log(f"\n🚀 Connecting to Gemini ({MODEL})...", message_type="info")
        ws = None
        try:
            async with websockets.connect(URI, **voice._ws_connect_kwargs()) as ws:
                setup_msg = _build_setup_message()
                await ws.send(json.dumps(setup_msg))
                # First server frame after setup; its content is not inspected.
                await ws.recv()

                voice.attach_ws(ws)
                backoff = float(config.read_watchdog_ws_initial_backoff_sec())
                _apply_idle_audio_policy(voice, autonomous_enabled=autonomous_enabled)

                sanad_logger.print_and_log("🎙️ Connected! Sanad ready.", message_type="info")
                if dashboard_url:
                    base_url = dashboard_url.rstrip("/")
                    sanad_logger.print_and_log(f"🌐 Dashboard: {base_url}/", "info")
                    sanad_logger.print_and_log(f"📊 Runtime Health API: {base_url}/api/runtime_health", "info")
                sanad_logger.print_and_log("🖼️ Live Preview: OFF by default. Enable it from the dashboard when needed.", "info")
                sanad_logger.print_and_log("🛡️ Idle behavior: no auto-shutdown on silence. Gemini WS keepalive is active.", "info")
                if manual_lean_runtime:
                    sanad_logger.print_and_log("🎮 Manual lean runtime: voice conversation only. Restart in AI mode for replay/capture/autonomous services.", "info")
                else:
                    sanad_logger.print_and_log("🎮 Controls:", "info")
                    sanad_logger.print_and_log(" - R2+X : Replay + Photographer Talk + Take Photo", "info")
                    sanad_logger.print_and_log(" - R2+L1: HARD CANCEL safety combo (AI/manual) -> cancel session/replay", "info")

                await ws.wait_closed()
        except asyncio.CancelledError:
            if ws is not None:
                try:
                    await ws.close()
                except Exception:
                    pass
            raise
        except websockets.exceptions.ConnectionClosed as e:
            sanad_logger.print_and_log(f"⚠️ WebSocket closed: {e}", message_type="warning")
        except Exception as e:
            sanad_logger.print_and_log(f"❌ WS supervisor error: {e}", message_type="error")
        finally:
            voice.detach_ws(reason="ws supervisor disconnect")

        sleep_s = backoff + random.uniform(0.0, 0.8)
        sanad_logger.print_and_log(f"🔁 Reconnecting in {sleep_s:.1f}s...", message_type="warning")
        await asyncio.sleep(max(0.1, sleep_s))
        backoff = min(max_backoff, backoff * 1.7)


def _manual_lean_runtime_enabled(startup_mode: str) -> bool:
    """Return True when manual lean runtime is requested.

    Requires ``startup_mode == "manual"`` and a truthy ``MANUAL_LEAN_RUNTIME``
    environment variable (one of ``1``, ``true``, ``yes``, ``on``, ``y``;
    case-insensitive).
    """
    if startup_mode != "manual":
        return False
    raw = os.environ.get("MANUAL_LEAN_RUNTIME")
    if raw is None or raw == "":
        return False
    return str(raw).strip().lower() in _TRUTHY_ENV_VALUES


async def _run_autonomous_mode_supervisor(hub, replay, voice, restart_delay_sec: float, poll_sec: float = 0.5):
    """Start or stop the autonomous manager task as the runtime mode changes.

    Polls the runtime mode every ``poll_sec`` (minimum 0.2 seconds). In AI
    mode the autonomous task is created (or recreated if it finished); in
    manual mode a running task is cancelled and awaited. The task is also
    cancelled when this supervisor itself exits.
    """
    task = None
    last_should_run = None
    try:
        while True:
            should_run = _read_runtime_mode_safe() == "ai"
            if should_run != last_should_run:
                if should_run:
                    sanad_logger.print_and_log("🤖 AI mode active: starting autonomous manager.", "info")
                else:
                    sanad_logger.print_and_log("🤖 Manual mode active: autonomous manager paused.", "info")
                last_should_run = should_run

            if should_run:
                if task is None or task.done():
                    task = asyncio.create_task(
                        _run_autonomous_forever(hub=hub, replay=replay, voice=voice, restart_delay_sec=restart_delay_sec)
                    )
            elif task is not None:
                task.cancel()
                await asyncio.gather(task, return_exceptions=True)
                task = None

            await asyncio.sleep(max(0.2, float(poll_sec)))
    finally:
        if task is not None:
            task.cancel()
            await asyncio.gather(task, return_exceptions=True)


async def main():
    """Start all services and run them until one fails or the process stops.

    Unless manual lean runtime is active, this initialises DDS, the low-state
    hub, the replay engine and the uploader. It always starts the photo
    server and the Gemini voice pipeline, then runs the long-lived component
    tasks. On exit every still-running task is cancelled and awaited.
    """
    startup_mode = _read_runtime_mode_safe()
    # Tolerant parsing: "true"/"yes"/"" no longer crash startup with a
    # ValueError, while any non-zero integer keeps enabling the flag.
    autonomous_enabled = _env_flag("AUTONOMOUS_ENABLE", default=False)
    manual_lean_runtime = _manual_lean_runtime_enabled(startup_mode)

    hub = None
    replay = None
    dashboard_capture_func = None
    dashboard_replay_test_func = None

    if manual_lean_runtime:
        sanad_logger.print_and_log(
            "🪶 Manual lean runtime active: Gemini + dashboard only. DDS, replay, uploader, and teleimager-dependent startup are skipped.",
            "info",
        )

        def _manual_capture_unavailable() -> str:
            return "[ERR] Capture unavailable in manual lean runtime. Restart in AI mode for camera/replay services."

        dashboard_capture_func = _manual_capture_unavailable
    else:
        # Imported lazily so manual lean runtime does not load these modules.
        from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize
        from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowState_
        from Modes.Manual.controller import auto_pick_iface, LowStateHub
        from Modes.Manual.replay_engine import ReplayWithHome
        from Server.capture_service import capture_with_replay_sync, ensure_replay_integrity

        iface = auto_pick_iface()
        sanad_logger.print_and_log(f"🌐 DDS iface: {iface}", "info")
        ChannelFactoryInitialize(0, iface)

        hub = LowStateHub(watchdog_timeout=0.25)
        sub = ChannelSubscriber("rt/lowstate", LowState_)
        sub.Init(hub.handler, 10)

        replay = ReplayWithHome(hub, watchdog_disable_after=1.0)

        startup_replay = config.read_selected_replay_path()
        fallback_replay = config.DATA_DIR / "photo_G3.jsonl"
        selected_replay, replay_report = ensure_replay_integrity(startup_replay, fallback_replay)

        if not replay_report.get("active", {}).get("ok", False):
            sanad_logger.print_and_log(
                f"⚠️ Active replay invalid: {replay_report.get('active', {}).get('error', 'unknown')}",
                "warning",
            )
        elif not replay_report.get("active", {}).get("trigger_ok", False):
            sanad_logger.print_and_log(
                "⚠️ Active replay has no trigger markers. Timed fallback capture will be used.",
                "warning",
            )
        if replay_report.get("fallback_used"):
            sanad_logger.print_and_log(
                f"⚠️ Using fallback replay: {selected_replay.name}",
                "warning",
            )
        if replay_report.get("degraded_no_trigger"):
            sanad_logger.print_and_log(
                "⚠️ No replay with trigger markers found. Running in degraded triggerless mode.",
                "warning",
            )

        config.REPLAY_FILE = selected_replay
        try:
            rel_name = str(selected_replay.resolve().relative_to(config.DATA_DIR)).replace("\\", "/")
            config.write_selected_replay_name(rel_name)
        except Exception as e:
            # Persisting the selection is best-effort, but the failure should
            # be visible in the logs instead of disappearing silently.
            sanad_logger.print_and_log(f"⚠️ Could not persist selected replay name: {e}", "warning")

        def _capture_from_dashboard() -> str:
            base_prefix = os.environ.get("PHOTO_PREFIX", "photo").strip() or "photo"
            # Clamp the capture delay into [0, PHOTO_TOTAL_SEC].
            delay = max(0.0, min(PHOTO_DELAY_SEC, PHOTO_TOTAL_SEC))
            return capture_with_replay_sync(
                replay_runner=replay,
                replay_file=config.REPLAY_FILE,
                home_file=config.HOME_FILE,
                delay_sec=delay,
                prefix=base_prefix,
                speed=1.0,
            )

        dashboard_capture_func = _capture_from_dashboard

        def _test_replay_from_dashboard(replay_name: str) -> str:
            try:
                candidate = config.resolve_replay_path(replay_name)
            except Exception as e:
                return f"[ERR] invalid replay path: {e}"
            if not candidate.exists():
                return "[ERR] replay not found"
            if replay.is_playing:
                return "[ERR] replay already in progress"
            replay.run(candidate, config.HOME_FILE, speed=1.0)
            return str(candidate)

        dashboard_replay_test_func = _test_replay_from_dashboard

    url = start_photo_server(
        PHOTOS_DIR,
        port=PHOTO_SERVER_PORT,
        capture_func=dashboard_capture_func,
        replay_test_func=dashboard_replay_test_func,
    )
    sanad_logger.print_and_log(f"🖼️ Phone gallery: {url}", message_type="info")

    if not manual_lean_runtime:
        from Server.uploader import start_uploader

        try:
            start_uploader(PHOTOS_DIR)
        except Exception as e:
            sanad_logger.print_and_log(f"Failed to start uploader: {e}", message_type="warning")

    voice = HamadGeminiVoice(
        photo_phrases_file=str(config.PHOTO_PHRASES_FILE),
    )

    if startup_mode == "ai" or autonomous_enabled:
        voice.calibrate_mic()
    else:
        sanad_logger.print_and_log("🎙️ Mic calibration skipped at startup (manual mode).", "info")
        sanad_logger.print_and_log("ℹ️ Manual mode: mic follows dashboard toggle, but AI photo commands and AI detection stay disabled until AI mode.", "info")

    restart_delay = float(config.read_watchdog_component_restart_delay_sec())

    tasks = [
        asyncio.create_task(_run_component_forever("capture_mic", lambda: voice.capture_mic(), restart_delay)),
        asyncio.create_task(_run_component_forever("receive_audio", lambda: voice.receive_audio(), restart_delay)),
        asyncio.create_task(_run_component_forever("play_audio", lambda: voice.play_audio(), restart_delay)),
        asyncio.create_task(_run_component_forever("keepalive", lambda: voice.keepalive(every_sec=20.0), restart_delay)),
        asyncio.create_task(_run_component_forever("runtime_health", lambda: _write_runtime_health_loop(voice, 1.0), restart_delay)),
        asyncio.create_task(
            _run_ws_supervisor(
                voice,
                autonomous_enabled=autonomous_enabled,
                dashboard_url=url,
                manual_lean_runtime=manual_lean_runtime,
            )
        ),
        asyncio.create_task(_run_mode_policy_sync(voice, autonomous_enabled=autonomous_enabled)),
    ]

    if not manual_lean_runtime:
        from Modes.Manual.trigger_loop import trigger_loop

        tasks.append(
            asyncio.create_task(
                _run_component_forever(
                    "trigger_loop",
                    lambda: trigger_loop(hub=hub, replay=replay, voice=voice, take_photo_sync_callable=None, ws=None),
                    restart_delay,
                )
            )
        )

    if autonomous_enabled and not manual_lean_runtime:
        tasks.append(
            asyncio.create_task(
                _run_autonomous_mode_supervisor(
                    hub=hub,
                    replay=replay,
                    voice=voice,
                    restart_delay_sec=restart_delay,
                )
            )
        )
        sanad_logger.print_and_log("🤖 Autonomous services armed (AUTONOMOUS_ENABLE=1). AI mode can start without restart.", "info")

    try:
        await asyncio.gather(*tasks)
    finally:
        # Whatever ended the gather, make sure no task outlives main().
        for t in tasks:
            if not t.done():
                t.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        sanad_logger.print_and_log("\n👋 Ma'a Salama (Goodbye)!", message_type="info")
    except Exception as e:
        sanad_logger.print_and_log(f"\n❌ Fatal Error: {e}", message_type="error")