#!/usr/bin/env python3
"""
voice_sanad.py (PRODUCTION)

- Starts photo server ONCE (thread) and keeps it alive.
- Initializes DDS ONCE (CycloneDDS doesn't like re-init loops).
- Runs Gemini WS sessions in a reconnect loop with backoff + jitter.
- If WS drops / idle disconnects: reconnect automatically without killing
  the whole app.
"""

import asyncio
import json
import os
import random

import websockets

from Logger import Logs
from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize
from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowState_
from config import (
    PHOTOS_DIR,
    PHOTO_SERVER_PORT,
    MODEL,
    VOICE_NAME,
    GEMINI_API_KEY,
    PHOTO_TOTAL_SEC,
    PHOTO_THANKS_SEC,
    PHOTO_DELAY_SEC,
    load_system_prompt,
)
from photo_server import start_photo_server
from controller import auto_pick_iface, LowStateHub
from replay_engine import ReplayWithHome
from gemini_voice import HamadGeminiVoice
from trigger_loop import trigger_loop
from photo_runner import take_photo_sync

# ==================================================
# LOGGERS (dedicated log for this entry point)
# ==================================================
sanad_logger = Logs()
sanad_logger.LogEngine("G1_Logs", "voice_sanad")

# ==================================================
# GEMINI URI
# ==================================================
API_KEY = GEMINI_API_KEY.strip()
if not API_KEY:
    # Fail at import time: nothing below can work without a key.
    raise RuntimeError("GEMINI_API_KEY is empty. Export it before running.")

URI = (
    "wss://generativelanguage.googleapis.com/ws/"
    "google.ai.generativelanguage.v1alpha.GenerativeService.BidiGenerateContent"
    f"?key={API_KEY}"
)


async def run_ws_session(ws, hub, replay, voice) -> None:
    """
    Run ONE Gemini WS session.

    Sends the setup message, waits for the first server reply, then runs the
    mic-capture, audio-receive, audio-playback, keepalive and trigger-loop
    tasks concurrently. As soon as ANY of those tasks ends (normally or with
    an error) the remaining ones are cancelled and this coroutine returns,
    so the caller can reconnect.

    Args:
        ws: Open websocket connection used for this session only.
        hub: Low-state hub forwarded to ``trigger_loop``.
        replay: Replay engine forwarded to ``trigger_loop``.
        voice: Voice object providing ``capture_mic``, ``receive_audio``,
            ``play_audio`` and ``keepalive``.

    Raises:
        Whatever ``ws.send`` / ``ws.recv`` raise during the setup handshake.
        Errors raised inside the session tasks are logged, not re-raised.
    """
    system_prompt = load_system_prompt()

    setup_msg = {
        "setup": {
            "model": MODEL,
            "generationConfig": {
                "responseModalities": ["AUDIO"],
                "speechConfig": {
                    "voiceConfig": {"prebuiltVoiceConfig": {"voiceName": VOICE_NAME}}
                },
            },
            "inputAudioTranscription": {},
            "systemInstruction": {"parts": [{"text": system_prompt}]},
        }
    }

    await ws.send(json.dumps(setup_msg))
    # First server message answers the setup; its content is not inspected.
    await ws.recv()

    sanad_logger.print_and_log("🎙️ Connected! Sanad ready.", message_type="info")
    sanad_logger.print_and_log("🎮 Controls:", "info")
    sanad_logger.print_and_log("   - R2+X : Replay + Photographer Talk + Take Photo", "info")
    sanad_logger.print_and_log("   - R2+L1: Cancel replay -> Home", "info")

    # Trigger loop uses this ws (session-bound)
    trig_task = asyncio.create_task(
        trigger_loop(
            hub=hub,
            replay=replay,
            voice=voice,
            ws=ws,
            take_photo_sync_callable=take_photo_sync,
            # timing can stay inside trigger_loop via config/env,
            # but we keep these exported via config.py already
        )
    )

    tasks = [
        asyncio.create_task(voice.capture_mic(ws)),
        asyncio.create_task(voice.receive_audio(ws)),
        asyncio.create_task(voice.play_audio()),  # IMPORTANT: no ws here
        asyncio.create_task(voice.keepalive(ws, every_sec=20.0)),  # helps idle stability
        trig_task,
    ]

    try:
        # FIRST_COMPLETED (not FIRST_EXCEPTION): a task that returns normally,
        # e.g. a receive loop ending on a clean close, must also end the
        # session; otherwise the remaining tasks keep it waiting.
        done, _pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

        for t in done:
            if t.cancelled():
                # exception() would raise CancelledError for a cancelled task.
                continue
            exc = t.exception()
            if exc is not None:
                sanad_logger.print_and_log(
                    f"⚠️ Task ended with error: {exc}", message_type="warning"
                )
    finally:
        # Always tear down every session task, including when this coroutine
        # itself is cancelled, so nothing keeps using the old websocket.
        for t in tasks:
            if not t.done():
                t.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)


async def main() -> None:
    """
    Application entry point: one-time setup, then reconnect forever.

    Starts the photo server, initializes DDS and the low-state subscriber,
    creates the replay engine and the voice object exactly once, then loops:
    connect to Gemini, run one session, sleep with backoff + jitter, repeat.
    """
    # 1) Start photo server ONCE (never stops)
    url = start_photo_server(PHOTOS_DIR, port=PHOTO_SERVER_PORT)
    sanad_logger.print_and_log(f"🖼️ Phone gallery: {url}", message_type="info")

    # 2) Init DDS ONCE (never loop this)
    iface = auto_pick_iface()
    sanad_logger.print_and_log(f"🌐 DDS iface: {iface}", "info")
    ChannelFactoryInitialize(0, iface)

    hub = LowStateHub(watchdog_timeout=0.25)
    # `sub` stays bound for the lifetime of main().
    sub = ChannelSubscriber("rt/lowstate", LowState_)
    sub.Init(hub.handler, 10)

    replay = ReplayWithHome(hub, watchdog_disable_after=1.0)

    # 3) Voice object once (keeps audio device stable)
    voice = HamadGeminiVoice()
    voice.calibrate_mic()

    # 4) Reconnect loop with backoff
    backoff = 1.0
    max_backoff = 20.0

    while True:
        sanad_logger.print_and_log(
            f"\n🚀 Connecting to Gemini ({MODEL})...", message_type="info"
        )

        try:
            async with websockets.connect(URI, **voice._ws_connect_kwargs()) as ws:
                backoff = 1.0  # reset after a successful connect
                await run_ws_session(ws, hub, replay, voice)
        except websockets.exceptions.ConnectionClosed as e:
            sanad_logger.print_and_log(f"⚠️ WebSocket closed: {e}", message_type="warning")
        except Exception as e:
            # Top-level boundary: log and fall through to the reconnect sleep.
            sanad_logger.print_and_log(f"❌ WS session error: {e}", message_type="error")

        # Backoff sleep with jitter
        sleep_s = backoff + random.uniform(0.0, 0.8)
        sanad_logger.print_and_log(
            f"🔁 Reconnecting in {sleep_s:.1f}s...", message_type="warning"
        )
        await asyncio.sleep(sleep_s)
        backoff = min(max_backoff, backoff * 1.7)


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        sanad_logger.print_and_log("\n👋 Ma'a Salama (Goodbye)!", message_type="info")
    except Exception as e:
        sanad_logger.print_and_log(f"\n❌ Fatal Error: {e}", message_type="error")