464 lines
18 KiB
Python
464 lines
18 KiB
Python
#!/usr/bin/env python3
|
||
"""
voice_sanad.py (PRODUCTION)

- Starts the photo server ONCE and keeps it alive.
- Initializes DDS ONCE (skipped when the manual lean runtime is active).
- Keeps core components alive (mic/speaker/receive/trigger/autonomous).
- Uses a dedicated WS supervisor loop that reconnects only the Gemini channel.
"""
|
||
|
||
import asyncio
|
||
import json
|
||
import os
|
||
import random
|
||
import time
|
||
from pathlib import Path
|
||
|
||
import websockets
|
||
|
||
from Core.Logger import Logs
|
||
|
||
from Core import settings as config
|
||
from Core.settings import (
|
||
PHOTOS_DIR,
|
||
PHOTO_SERVER_PORT,
|
||
MODEL,
|
||
VOICE_NAME,
|
||
GEMINI_API_KEY,
|
||
PHOTO_TOTAL_SEC,
|
||
PHOTO_DELAY_SEC,
|
||
load_system_prompt,
|
||
)
|
||
|
||
from Server.photo_server import start_photo_server
|
||
from Gemini.gemini_voice import HamadGeminiVoice
|
||
|
||
|
||
# Shared logger used by every function in this script.
sanad_logger = Logs()
sanad_logger.LogEngine("G1_Logs", "voice_sanad")

# Normalise the configured key. A missing (None) or non-string value is treated
# like an empty key so startup fails with the actionable message below instead
# of an unrelated AttributeError from ``.strip()``.
API_KEY = str(GEMINI_API_KEY or "").strip()
if not API_KEY:
    raise RuntimeError(f"Gemini API key is empty. Set gemini.api_key in {config.CONFIG_JSON}.")

# NOTE: the API key is embedded in the query string, so URI must never be
# logged verbatim.
URI = (
    "wss://generativelanguage.googleapis.com/ws/"
    "google.ai.generativelanguage.v1alpha.GenerativeService.BidiGenerateContent"
    f"?key={API_KEY}"
)

# JSON snapshot written periodically by _write_runtime_health_loop().
RUNTIME_HEALTH_FILE = Path(config.RUNTIME_HEALTH_FILE)
# Marker file whose existence is checked by _interaction_active_now().
INTERACTION_FLAG_FILE = Path(config.SCRIPTS_DIR) / "interaction_triggered.flag"
|
||
|
||
|
||
def _build_setup_message() -> dict:
    """Assemble the ``setup`` payload sent as the first message on a new socket.

    The system prompt is re-read through ``load_system_prompt()`` on every
    call, so each (re)connection picks up whatever that loader returns at the
    time.

    Returns:
        A JSON-serialisable dict with a single top-level ``"setup"`` key.
    """
    prompt_text = load_system_prompt()

    speech_config = {
        "voiceConfig": {"prebuiltVoiceConfig": {"voiceName": VOICE_NAME}}
    }
    generation_config = {
        "responseModalities": ["AUDIO"],
        "speechConfig": speech_config,
    }
    system_instruction = {"parts": [{"text": prompt_text}]}

    setup_body = {
        "model": MODEL,
        "generationConfig": generation_config,
        "inputAudioTranscription": {},
        "systemInstruction": system_instruction,
    }
    return {"setup": setup_body}
|
||
|
||
|
||
def _read_runtime_mode_safe() -> str:
    """Return the current runtime mode, always as ``"manual"`` or ``"ai"``.

    Any failure while reading the mode, and any value other than the two known
    modes, falls back to ``"manual"``.
    """
    try:
        raw_mode = config.read_runtime_mode()
        normalized = str(raw_mode).strip().lower()
    except Exception:
        return "manual"

    if normalized in ("manual", "ai"):
        return normalized
    return "manual"
|
||
|
||
|
||
def _interaction_active_now() -> bool:
    """Report whether the interaction flag file currently exists.

    Any error raised while checking is treated as "no interaction active".
    """
    try:
        flag_present = INTERACTION_FLAG_FILE.exists()
    except Exception:
        flag_present = False
    return flag_present
|
||
|
||
|
||
def _apply_idle_audio_policy(voice: HamadGeminiVoice, autonomous_enabled: bool):
    """Set the voice object's passive-listen and audio-gate state for idle time.

    The decision depends on three settings read up front (runtime mode, the
    idle-voice-listen toggle and the mic toggle) plus ``autonomous_enabled``:

    * mic disabled                      -> passive off, gate off
    * mode is not "ai"                  -> passive off, gate on
    * ai + autonomous + idle listen on  -> passive on,  gate on
    * ai + autonomous + idle listen off -> passive off, gate off
    * ai, not autonomous                -> passive off, gate on

    Args:
        voice: Object exposing ``set_passive_listen`` and ``set_audio_gate``.
        autonomous_enabled: Whether autonomous services are armed.
    """
    mode = _read_runtime_mode_safe()
    listen_when_idle = bool(config.read_vision_idle_voice_listen_enabled())
    mic_on = bool(config.read_gemini_mic_enabled())

    # Each entry is (enabled, reason) for the corresponding setter.
    if not mic_on:
        passive = (False, "mic disabled")
        gate = (False, "mic disabled")
    elif mode != "ai":
        passive = (False, f"{mode} mode")
        gate = (True, f"{mode} mode")
    elif autonomous_enabled and listen_when_idle:
        passive = (True, "autonomous idle")
        gate = (True, "autonomous idle listen")
    elif autonomous_enabled:
        passive = (False, "autonomous idle")
        gate = (False, "autonomous idle")
    else:
        passive = (False, "ai mode")
        gate = (True, "ai mode")

    voice.set_passive_listen(passive[0], reason=passive[1])
    voice.set_audio_gate(gate[0], reason=gate[1])
|
||
|
||
|
||
async def _write_runtime_health_loop(voice: HamadGeminiVoice, interval_sec: float = 1.0):
    """Periodically dump the voice runtime health snapshot to RUNTIME_HEALTH_FILE.

    The snapshot comes from ``voice.get_runtime_health()`` when that method
    exists (otherwise an empty dict), is stamped with ``model`` and, if absent
    or falsy, ``time``, and is serialised as UTF-8 JSON.

    The file is written to a sibling temporary path and then moved into place
    with ``os.replace`` so that a concurrent reader never observes a
    half-written JSON document.

    Args:
        voice: Voice runtime object; ``get_runtime_health`` is optional.
        interval_sec: Delay between writes; clamped to at least 0.2 seconds.

    This coroutine never returns; write failures are logged and retried on the
    next tick.
    """
    period = max(0.2, float(interval_sec))
    tmp_file = RUNTIME_HEALTH_FILE.with_name(RUNTIME_HEALTH_FILE.name + ".tmp")

    while True:
        try:
            payload = voice.get_runtime_health() if hasattr(voice, "get_runtime_health") else {}
            payload = dict(payload or {})
            payload["model"] = MODEL
            payload["time"] = payload.get("time") or time.time()

            # Re-create the directory each tick in case it was removed at runtime.
            RUNTIME_HEALTH_FILE.parent.mkdir(parents=True, exist_ok=True)
            tmp_file.write_text(json.dumps(payload, ensure_ascii=False), encoding="utf-8")
            # Same-directory rename: readers see either the old or the new file.
            os.replace(tmp_file, RUNTIME_HEALTH_FILE)
        except Exception as e:
            sanad_logger.print_and_log(f"Runtime health write failed: {e}", "warning")
        await asyncio.sleep(period)
|
||
|
||
|
||
async def _run_component_forever(name: str, coro_factory, restart_delay_sec: float):
    """Keep one long-running component alive by restarting it whenever it stops.

    Args:
        name: Label used in log messages.
        coro_factory: Zero-argument callable returning a fresh awaitable for
            each (re)start.
        restart_delay_sec: Pause before restarting; clamped to at least 0.1 s.

    Cancellation is propagated unchanged; any other exception is logged and
    followed by a restart. This coroutine never returns normally.
    """
    while True:
        try:
            component = coro_factory()
            await component
            # A normal return is unexpected for these loops, so it is reported too.
            sanad_logger.print_and_log(f"⚠️ {name} loop ended; restarting.", "warning")
        except asyncio.CancelledError:
            raise
        except Exception as exc:
            sanad_logger.print_and_log(f"❌ {name} loop error: {exc}. Restarting.", "warning")

        pause = max(0.1, float(restart_delay_sec))
        await asyncio.sleep(pause)
|
||
|
||
|
||
async def _run_mode_policy_sync(voice: HamadGeminiVoice, autonomous_enabled: bool, interval_sec: float = 0.5):
    """Re-apply the idle audio policy whenever the relevant settings change.

    Each tick (only while the WebSocket is connected) builds a signature from
    the runtime mode, mic toggle, idle-voice-listen toggle and
    ``autonomous_enabled``. The policy is applied when that signature differs
    from the last recorded one.

    While the mic is enabled and an interaction is active, the signature is
    recorded but the policy is NOT applied.

    Args:
        voice: Voice runtime object (needs ``is_ws_connected``).
        autonomous_enabled: Whether autonomous services are armed.
        interval_sec: Poll period; clamped to at least 0.2 seconds.

    Cancellation propagates; other errors are logged and the loop continues.
    """
    applied_signature = None
    while True:
        try:
            await asyncio.sleep(max(0.2, float(interval_sec)))
            if not voice.is_ws_connected():
                continue

            mode = _read_runtime_mode_safe()
            mic_on = bool(config.read_gemini_mic_enabled())
            idle_listen = bool(config.read_vision_idle_voice_listen_enabled())
            current = (mode, mic_on, idle_listen, bool(autonomous_enabled))

            # The interaction flag only matters while the mic is enabled; with
            # the mic off the policy is applied regardless of the flag.
            if mic_on and _interaction_active_now():
                applied_signature = current
                continue

            if current != applied_signature:
                _apply_idle_audio_policy(voice, autonomous_enabled=autonomous_enabled)
                applied_signature = current
        except asyncio.CancelledError:
            raise
        except Exception as e:
            sanad_logger.print_and_log(f"⚠️ Mode policy sync error: {e}", "warning")
|
||
|
||
|
||
async def _run_autonomous_forever(hub, replay, voice, restart_delay_sec: float):
    """Run an ``AutonomousManager`` in a loop, recreating it after every exit.

    A fresh manager is constructed for each attempt. On cancellation the
    current manager's ``stop()`` is attempted (errors ignored) before the
    cancellation is re-raised. Any other exit, clean or failing, is logged and
    followed by a restart after ``restart_delay_sec`` (minimum 0.2 s).

    Args:
        hub: Passed through to ``AutonomousManager.run``.
        replay: Passed through to ``AutonomousManager.run``.
        voice: Passed through to ``AutonomousManager.run``.
        restart_delay_sec: Pause between attempts.
    """
    # Imported here rather than at module top, so the module is only loaded
    # when this coroutine actually runs.
    from Modes.AI.autonomous_manager import AutonomousManager

    while True:
        manager = AutonomousManager()
        try:
            await manager.run(hub=hub, replay=replay, voice=voice, ws=None)
            sanad_logger.print_and_log("⚠️ Autonomous manager exited; restarting.", "warning")
        except asyncio.CancelledError:
            # Best-effort shutdown; a failing stop() must not mask the cancel.
            try:
                manager.stop()
            except Exception:
                pass
            raise
        except Exception as exc:
            sanad_logger.print_and_log(f"❌ Autonomous manager error: {exc}. Restarting.", "warning")

        pause = max(0.2, float(restart_delay_sec))
        await asyncio.sleep(pause)
|
||
|
||
|
||
def _announce_ws_ready(dashboard_url: str | None, manual_lean_runtime: bool) -> None:
    """Log the banner shown after each successful Gemini connection."""
    log = sanad_logger.print_and_log

    log("🎙️ Connected! Sanad ready.", message_type="info")
    if dashboard_url:
        base_url = dashboard_url.rstrip("/")
        log(f"🌐 Dashboard: {base_url}/", "info")
        log(f"📊 Runtime Health API: {base_url}/api/runtime_health", "info")
        log("🖼️ Live Preview: OFF by default. Enable it from the dashboard when needed.", "info")
    log("🛡️ Idle behavior: no auto-shutdown on silence. Gemini WS keepalive is active.", "info")

    if manual_lean_runtime:
        log("🎮 Manual lean runtime: voice conversation only. Restart in AI mode for replay/capture/autonomous services.", "info")
        return
    log("🎮 Controls:", "info")
    log(" - R2+X : Replay + Photographer Talk + Take Photo", "info")
    log(" - R2+L1: HARD CANCEL safety combo (AI/manual) -> cancel session/replay", "info")


async def _run_ws_supervisor(
    voice: HamadGeminiVoice,
    autonomous_enabled: bool,
    dashboard_url: str | None = None,
    manual_lean_runtime: bool = False,
):
    """Own the Gemini WebSocket: connect, hand it to ``voice``, reconnect on loss.

    Per connection attempt: open the socket, send the setup message, wait for
    one server message (presumably the setup acknowledgement), attach the
    socket to ``voice``, reset the backoff, apply the idle audio policy, log
    the ready banner, then wait until the socket closes. ``voice.detach_ws``
    always runs afterwards.

    Reconnects sleep for the current backoff plus up to 0.8 s of jitter. The
    backoff then grows by a factor of 1.7, capped at the configured maximum.

    Args:
        voice: Voice runtime object that uses the socket.
        autonomous_enabled: Forwarded to the idle audio policy.
        dashboard_url: When set, dashboard links are included in the banner.
        manual_lean_runtime: Selects which controls banner is logged.

    Cancellation propagates; every other error is logged and triggers a
    reconnect. This coroutine never returns normally.
    """
    retry_delay = float(config.read_watchdog_ws_initial_backoff_sec())
    retry_ceiling = float(config.read_watchdog_ws_max_backoff_sec())

    while True:
        sanad_logger.print_and_log(f"\n🚀 Connecting to Gemini ({MODEL})...", message_type="info")
        ws = None
        try:
            async with websockets.connect(URI, **voice._ws_connect_kwargs()) as ws:
                await ws.send(json.dumps(_build_setup_message()))
                await ws.recv()

                voice.attach_ws(ws)
                # A successful handshake resets the backoff to its initial value.
                retry_delay = float(config.read_watchdog_ws_initial_backoff_sec())
                _apply_idle_audio_policy(voice, autonomous_enabled=autonomous_enabled)

                _announce_ws_ready(dashboard_url, manual_lean_runtime)

                await ws.wait_closed()
        except asyncio.CancelledError:
            if ws is not None:
                try:
                    await ws.close()
                except Exception:
                    pass
            raise
        except websockets.exceptions.ConnectionClosed as e:
            sanad_logger.print_and_log(f"⚠️ WebSocket closed: {e}", message_type="warning")
        except Exception as e:
            sanad_logger.print_and_log(f"❌ WS supervisor error: {e}", message_type="error")
        finally:
            voice.detach_ws(reason="ws supervisor disconnect")

        pause = retry_delay + random.uniform(0.0, 0.8)
        sanad_logger.print_and_log(f"🔁 Reconnecting in {pause:.1f}s...", message_type="warning")
        await asyncio.sleep(max(0.1, pause))
        retry_delay = min(retry_ceiling, retry_delay * 1.7)
|
||
|
||
|
||
def _manual_lean_runtime_enabled(startup_mode: str) -> bool:
    """Decide whether the manual lean runtime should be used.

    It is enabled only when ``startup_mode`` is ``"manual"`` AND the
    ``MANUAL_LEAN_RUNTIME`` environment variable holds one of
    ``1 / true / yes / on / y`` (case-insensitive, surrounding whitespace
    ignored). An unset or empty variable means disabled.
    """
    if startup_mode != "manual":
        return False

    flag = os.environ.get("MANUAL_LEAN_RUNTIME")
    if not flag:
        # Covers both "unset" (None) and "set but empty".
        return False

    accepted = {"1", "true", "yes", "on", "y"}
    return str(flag).strip().lower() in accepted
|
||
|
||
|
||
async def _run_autonomous_mode_supervisor(hub, replay, voice, restart_delay_sec: float, poll_sec: float = 0.5):
    """Start or stop the autonomous manager task as the runtime mode changes.

    The runtime mode is polled every ``poll_sec`` seconds (minimum 0.2 s).
    While the mode is ``"ai"`` a ``_run_autonomous_forever`` task is kept
    running (and recreated if it has finished). In any other mode the task is
    cancelled and awaited. Mode transitions are logged once per change.

    Args:
        hub: Forwarded to ``_run_autonomous_forever``.
        replay: Forwarded to ``_run_autonomous_forever``.
        voice: Forwarded to ``_run_autonomous_forever``.
        restart_delay_sec: Forwarded to ``_run_autonomous_forever``.
        poll_sec: Mode polling period.

    When this coroutine exits for any reason, a still-referenced child task is
    cancelled and awaited.
    """

    async def _cancel_and_wait(running):
        # gather(..., return_exceptions=True) absorbs the child's CancelledError.
        running.cancel()
        await asyncio.gather(running, return_exceptions=True)

    worker = None
    previous_state = None
    try:
        while True:
            ai_mode = _read_runtime_mode_safe() == "ai"

            if ai_mode != previous_state:
                if ai_mode:
                    sanad_logger.print_and_log("🤖 AI mode active: starting autonomous manager.", "info")
                else:
                    sanad_logger.print_and_log("🤖 Manual mode active: autonomous manager paused.", "info")
                previous_state = ai_mode

            if not ai_mode:
                if worker is not None:
                    await _cancel_and_wait(worker)
                    worker = None
            elif worker is None or worker.done():
                worker = asyncio.create_task(
                    _run_autonomous_forever(hub=hub, replay=replay, voice=voice, restart_delay_sec=restart_delay_sec)
                )

            await asyncio.sleep(max(0.2, float(poll_sec)))
    finally:
        if worker is not None:
            await _cancel_and_wait(worker)
|
||
|
||
|
||
def _env_flag(name: str, default: bool = False) -> bool:
    """Read a boolean switch from the environment without raising.

    Integer strings keep their previous meaning (``"0"`` is False, any other
    integer is True). The words ``true / yes / on / y`` (case-insensitive) are
    also accepted as True. An unset or blank variable yields ``default``; any
    other text yields False.
    """
    raw = os.environ.get(name)
    if raw is None:
        return default
    text = raw.strip().lower()
    if not text:
        return default
    try:
        return bool(int(text))
    except ValueError:
        return text in ("true", "yes", "on", "y")


async def main():
    """Start every long-lived service once and run them until cancelled.

    Startup sequence:

    1. Read the startup mode and the ``AUTONOMOUS_ENABLE`` /
       ``MANUAL_LEAN_RUNTIME`` switches.
    2. Unless the manual lean runtime is active: initialise DDS, subscribe to
       ``rt/lowstate``, build the replay engine, validate the selected replay
       (with fallback) and prepare the dashboard capture / replay-test hooks.
    3. Start the photo server (and, outside lean runtime, the uploader).
    4. Create the voice object, optionally calibrating the mic.
    5. Spawn the supervised component tasks and wait on them; on exit every
       unfinished task is cancelled and awaited.
    """
    startup_mode = _read_runtime_mode_safe()
    # Lenient parsing: the previous bool(int(...)) crashed at startup on values
    # such as "" or "true".
    autonomous_enabled = _env_flag("AUTONOMOUS_ENABLE")
    manual_lean_runtime = _manual_lean_runtime_enabled(startup_mode)

    hub = None
    replay = None
    dashboard_capture_func = None
    dashboard_replay_test_func = None

    if manual_lean_runtime:
        sanad_logger.print_and_log(
            "🪶 Manual lean runtime active: Gemini + dashboard only. DDS, replay, uploader, and teleimager-dependent startup are skipped.",
            "info",
        )

        def _manual_capture_unavailable() -> str:
            return "[ERR] Capture unavailable in manual lean runtime. Restart in AI mode for camera/replay services."

        dashboard_capture_func = _manual_capture_unavailable
    else:
        # Robot-side imports are deferred so the lean runtime never loads them.
        from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize
        from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowState_
        from Modes.Manual.controller import auto_pick_iface, LowStateHub
        from Modes.Manual.replay_engine import ReplayWithHome
        from Server.capture_service import capture_with_replay_sync, ensure_replay_integrity

        iface = auto_pick_iface()
        sanad_logger.print_and_log(f"🌐 DDS iface: {iface}", "info")
        ChannelFactoryInitialize(0, iface)

        hub = LowStateHub(watchdog_timeout=0.25)
        # The subscriber stays referenced by this frame for the life of main().
        sub = ChannelSubscriber("rt/lowstate", LowState_)
        sub.Init(hub.handler, 10)

        replay = ReplayWithHome(hub, watchdog_disable_after=1.0)

        startup_replay = config.read_selected_replay_path()
        fallback_replay = config.DATA_DIR / "photo_G3.jsonl"
        selected_replay, replay_report = ensure_replay_integrity(startup_replay, fallback_replay)
        active_report = replay_report.get("active", {})
        if not active_report.get("ok", False):
            sanad_logger.print_and_log(
                f"⚠️ Active replay invalid: {replay_report.get('active', {}).get('error', 'unknown')}",
                "warning",
            )
        elif not active_report.get("trigger_ok", False):
            sanad_logger.print_and_log(
                "⚠️ Active replay has no trigger markers. Timed fallback capture will be used.",
                "warning",
            )
        if replay_report.get("fallback_used"):
            sanad_logger.print_and_log(
                f"⚠️ Using fallback replay: {selected_replay.name}",
                "warning",
            )
        if replay_report.get("degraded_no_trigger"):
            sanad_logger.print_and_log(
                "⚠️ No replay with trigger markers found. Running in degraded triggerless mode.",
                "warning",
            )
        config.REPLAY_FILE = selected_replay
        try:
            rel_name = str(selected_replay.resolve().relative_to(config.DATA_DIR)).replace("\\", "/")
            config.write_selected_replay_name(rel_name)
        except Exception as e:
            # Best-effort persistence: startup continues, but the failure is no
            # longer silent.
            sanad_logger.print_and_log(f"Could not persist selected replay name: {e}", "warning")

        def _capture_from_dashboard() -> str:
            base_prefix = os.environ.get("PHOTO_PREFIX", "photo").strip() or "photo"
            # Keep the capture delay within [0, PHOTO_TOTAL_SEC].
            delay = max(0.0, min(PHOTO_DELAY_SEC, PHOTO_TOTAL_SEC))
            return capture_with_replay_sync(
                replay_runner=replay,
                replay_file=config.REPLAY_FILE,
                home_file=config.HOME_FILE,
                delay_sec=delay,
                prefix=base_prefix,
                speed=1.0,
            )

        dashboard_capture_func = _capture_from_dashboard

        def _test_replay_from_dashboard(replay_name: str) -> str:
            try:
                candidate = config.resolve_replay_path(replay_name)
            except Exception as e:
                return f"[ERR] invalid replay path: {e}"
            if not candidate.exists():
                return "[ERR] replay not found"
            if replay.is_playing:
                return "[ERR] replay already in progress"
            replay.run(candidate, config.HOME_FILE, speed=1.0)
            return str(candidate)

        dashboard_replay_test_func = _test_replay_from_dashboard

    url = start_photo_server(
        PHOTOS_DIR,
        port=PHOTO_SERVER_PORT,
        capture_func=dashboard_capture_func,
        replay_test_func=dashboard_replay_test_func,
    )
    sanad_logger.print_and_log(f"🖼️ Phone gallery: {url}", message_type="info")

    if not manual_lean_runtime:
        from Server.uploader import start_uploader

        try:
            start_uploader(PHOTOS_DIR)
        except Exception as e:
            sanad_logger.print_and_log(f"Failed to start uploader: {e}", message_type="warning")

    voice = HamadGeminiVoice(
        photo_phrases_file=str(config.PHOTO_PHRASES_FILE),
    )

    if startup_mode == "ai" or autonomous_enabled:
        voice.calibrate_mic()
    else:
        sanad_logger.print_and_log("🎙️ Mic calibration skipped at startup (manual mode).", "info")
        sanad_logger.print_and_log("ℹ️ Manual mode: mic follows dashboard toggle, but AI photo commands and AI detection stay disabled until AI mode.", "info")
    restart_delay = float(config.read_watchdog_component_restart_delay_sec())

    tasks = [
        asyncio.create_task(_run_component_forever("capture_mic", lambda: voice.capture_mic(), restart_delay)),
        asyncio.create_task(_run_component_forever("receive_audio", lambda: voice.receive_audio(), restart_delay)),
        asyncio.create_task(_run_component_forever("play_audio", lambda: voice.play_audio(), restart_delay)),
        asyncio.create_task(_run_component_forever("keepalive", lambda: voice.keepalive(every_sec=20.0), restart_delay)),
        asyncio.create_task(_run_component_forever("runtime_health", lambda: _write_runtime_health_loop(voice, 1.0), restart_delay)),
        asyncio.create_task(
            _run_ws_supervisor(
                voice,
                autonomous_enabled=autonomous_enabled,
                dashboard_url=url,
                manual_lean_runtime=manual_lean_runtime,
            )
        ),
        asyncio.create_task(_run_mode_policy_sync(voice, autonomous_enabled=autonomous_enabled)),
    ]

    if not manual_lean_runtime:
        from Modes.Manual.trigger_loop import trigger_loop

        tasks.append(
            asyncio.create_task(
                _run_component_forever(
                    "trigger_loop",
                    lambda: trigger_loop(hub=hub, replay=replay, voice=voice, take_photo_sync_callable=None, ws=None),
                    restart_delay,
                )
            )
        )

    if autonomous_enabled and not manual_lean_runtime:
        tasks.append(
            asyncio.create_task(
                _run_autonomous_mode_supervisor(
                    hub=hub,
                    replay=replay,
                    voice=voice,
                    restart_delay_sec=restart_delay,
                )
            )
        )
        sanad_logger.print_and_log("🤖 Autonomous services armed (AUTONOMOUS_ENABLE=1). AI mode can start without restart.", "info")

    try:
        await asyncio.gather(*tasks)
    finally:
        # Whatever ended the gather, stop the remaining tasks and wait for them.
        for t in tasks:
            if not t.done():
                t.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
|
||
|
||
|
||
# Script entry point: run main() until it finishes, is interrupted, or fails.
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C is a normal shutdown: say goodbye and exit with status 0.
        sanad_logger.print_and_log("\n👋 Ma'a Salama (Goodbye)!", message_type="info")
    except Exception as e:
        sanad_logger.print_and_log(f"\n❌ Fatal Error: {e}", message_type="error")
        # Exit non-zero so a launcher or service manager can tell the run
        # failed; previously a fatal error still ended with exit status 0.
        raise SystemExit(1) from e
|