AI_Photographer/Gemini/voice_sanad.py
2026-04-12 18:52:37 +04:00

464 lines
18 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
voice_sanad.py (PRODUCTION)
- Starts photo server ONCE and keeps it alive.
- Initializes DDS ONCE.
- Keeps core components alive (mic/speaker/receive/trigger/autonomous).
- Uses a dedicated WS supervisor loop that reconnects only the Gemini channel.
"""
import asyncio
import json
import os
import random
import time
from pathlib import Path
import websockets
from Core.Logger import Logs
from Core import settings as config
from Core.settings import (
PHOTOS_DIR,
PHOTO_SERVER_PORT,
MODEL,
VOICE_NAME,
GEMINI_API_KEY,
PHOTO_TOTAL_SEC,
PHOTO_DELAY_SEC,
load_system_prompt,
)
from Server.photo_server import start_photo_server
from Gemini.gemini_voice import HamadGeminiVoice
# Module-wide logger for this entry point.
sanad_logger = Logs()
sanad_logger.LogEngine("G1_Logs", "voice_sanad")

# Fail fast at import time: the Gemini WebSocket cannot authenticate without a key.
# `or ""` makes a missing (None) key produce the same clear RuntimeError as an empty
# one, instead of an AttributeError from calling `.strip()` on None.
API_KEY = (GEMINI_API_KEY or "").strip()
if not API_KEY:
    raise RuntimeError(f"Gemini API key is empty. Set gemini.api_key in {config.CONFIG_JSON}.")

# Gemini Live (BidiGenerateContent) WebSocket endpoint. The API key travels as a
# query parameter, so this URI must never be written to logs.
URI = (
    "wss://generativelanguage.googleapis.com/ws/"
    "google.ai.generativelanguage.v1alpha.GenerativeService.BidiGenerateContent"
    f"?key={API_KEY}"
)

# JSON snapshot rewritten periodically by _write_runtime_health_loop.
RUNTIME_HEALTH_FILE = Path(config.RUNTIME_HEALTH_FILE)
# While this file exists an interaction is considered in progress; the mode-policy
# sync loop then skips applying the idle audio policy (except for mic-disable).
INTERACTION_FLAG_FILE = Path(config.SCRIPTS_DIR) / "interaction_triggered.flag"
def _build_setup_message() -> dict:
    """Build the first JSON message sent on a freshly opened Gemini Live socket.

    The message selects MODEL, requests audio-only responses spoken with the
    prebuilt VOICE_NAME voice, enables input-audio transcription and installs the
    system prompt. load_system_prompt() is called on every invocation, so each
    reconnect uses whatever it returns at that moment.
    """
    prompt_text = load_system_prompt()
    voice_config = {"prebuiltVoiceConfig": {"voiceName": VOICE_NAME}}
    generation_config = {
        "responseModalities": ["AUDIO"],
        "speechConfig": {"voiceConfig": voice_config},
    }
    setup_body = {
        "model": MODEL,
        "generationConfig": generation_config,
        "inputAudioTranscription": {},
        "systemInstruction": {"parts": [{"text": prompt_text}]},
    }
    return {"setup": setup_body}
def _read_runtime_mode_safe() -> str:
    """Return the configured runtime mode, normalised to "manual" or "ai".

    Any failure while reading/normalising the value, and any value other than
    "manual" or "ai" (case/whitespace-insensitive), yields "manual".
    """
    try:
        normalized = str(config.read_runtime_mode()).strip().lower()
    except Exception:
        return "manual"
    if normalized not in {"manual", "ai"}:
        return "manual"
    return normalized
def _interaction_active_now() -> bool:
    """Report whether the interaction flag file currently exists.

    Any error while checking is treated as "no interaction in progress".
    """
    try:
        flag_present = INTERACTION_FLAG_FILE.exists()
    except Exception:
        flag_present = False
    return flag_present
def _apply_idle_audio_policy(voice: HamadGeminiVoice, autonomous_enabled: bool):
    """Set the voice's passive-listen flag and audio gate for the current settings.

    Decision table (first match wins):
      * mic disabled                       -> passive off, gate closed
      * runtime mode is not "ai"           -> passive off, gate open
      * ai mode, autonomous disabled       -> passive off, gate open
      * ai + autonomous + idle listen on   -> passive on,  gate open
      * ai + autonomous + idle listen off  -> passive off, gate closed
    set_passive_listen is always called before set_audio_gate.
    """
    runtime_mode = _read_runtime_mode_safe()
    idle_listen = bool(config.read_vision_idle_voice_listen_enabled())
    mic_on = bool(config.read_gemini_mic_enabled())

    # plan = (passive_listen, passive_reason, audio_gate, gate_reason)
    if not mic_on:
        plan = (False, "mic disabled", False, "mic disabled")
    elif runtime_mode != "ai":
        mode_label = f"{runtime_mode} mode"
        plan = (False, mode_label, True, mode_label)
    elif not autonomous_enabled:
        plan = (False, "ai mode", True, "ai mode")
    elif idle_listen:
        plan = (True, "autonomous idle", True, "autonomous idle listen")
    else:
        plan = (False, "autonomous idle", False, "autonomous idle")

    passive, passive_reason, gate, gate_reason = plan
    voice.set_passive_listen(passive, reason=passive_reason)
    voice.set_audio_gate(gate, reason=gate_reason)
async def _write_runtime_health_loop(voice: HamadGeminiVoice, interval_sec: float = 1.0):
    """Periodically write the voice runtime health to RUNTIME_HEALTH_FILE as JSON.

    The payload is ``voice.get_runtime_health()`` (or ``{}`` when the voice has no
    such method), extended with ``"model"`` and a ``"time"`` stamp (the voice's own
    value if truthy, else the current epoch seconds).

    The JSON is first written to a sibling ``.tmp`` file and then moved over the
    target with ``os.replace``, so a concurrent reader never sees a half-written or
    truncated document (plain ``write_text`` truncates the file before writing).
    Write failures are logged as warnings and the loop keeps going.

    Args:
        voice: Voice runtime to snapshot.
        interval_sec: Seconds between snapshots (clamped to at least 0.2).
    """
    period = max(0.2, float(interval_sec))
    tmp_path = RUNTIME_HEALTH_FILE.with_name(RUNTIME_HEALTH_FILE.name + ".tmp")
    while True:
        try:
            payload = voice.get_runtime_health() if hasattr(voice, "get_runtime_health") else {}
            payload = dict(payload or {})
            payload["model"] = MODEL
            payload["time"] = payload.get("time") or time.time()
            RUNTIME_HEALTH_FILE.parent.mkdir(parents=True, exist_ok=True)
            tmp_path.write_text(json.dumps(payload, ensure_ascii=False), encoding="utf-8")
            # Atomic swap on POSIX (same directory => same filesystem).
            os.replace(tmp_path, RUNTIME_HEALTH_FILE)
        except Exception as e:
            sanad_logger.print_and_log(f"Runtime health write failed: {e}", "warning")
        await asyncio.sleep(period)
async def _run_component_forever(name: str, coro_factory, restart_delay_sec: float):
    """Run ``await coro_factory()`` forever, restarting it whenever it stops.

    A normal return and an ordinary exception are both logged as warnings, and
    the component is restarted after ``restart_delay_sec`` (at least 0.1 s).
    Cancellation is never swallowed: it propagates to the caller.

    Args:
        name: Label used in log messages.
        coro_factory: Zero-arg callable returning a fresh awaitable per attempt.
        restart_delay_sec: Pause between attempts.
    """
    while True:
        try:
            await coro_factory()
        except asyncio.CancelledError:
            raise
        except Exception as exc:
            sanad_logger.print_and_log(f"{name} loop error: {exc}. Restarting.", "warning")
        else:
            sanad_logger.print_and_log(f"⚠️ {name} loop ended; restarting.", "warning")
        await asyncio.sleep(max(0.1, float(restart_delay_sec)))
async def _run_mode_policy_sync(voice: HamadGeminiVoice, autonomous_enabled: bool, interval_sec: float = 0.5):
    """Poll runtime settings and re-apply the idle audio policy when they change.

    Every ``interval_sec`` (at least 0.2 s), while the Gemini WS is connected, the
    tuple (runtime mode, mic enabled, idle voice listen, autonomous flag) is read.
    When it differs from the last *applied* tuple, _apply_idle_audio_policy runs.

    Disabling the mic takes effect immediately, even during an interaction. Any
    other change is deferred while an interaction is in progress (flag file
    present). The deferred tuple is deliberately not recorded as applied, so it is
    applied once the interaction ends. Previously it was marked as applied and then
    silently dropped.

    Non-cancellation errors are logged and polling continues.
    """
    last_signature = None
    while True:
        try:
            await asyncio.sleep(max(0.2, float(interval_sec)))
            if not voice.is_ws_connected():
                continue
            runtime_mode = _read_runtime_mode_safe()
            mic_enabled = bool(config.read_gemini_mic_enabled())
            idle_voice_listen_enabled = bool(config.read_vision_idle_voice_listen_enabled())
            signature = (runtime_mode, mic_enabled, idle_voice_listen_enabled, bool(autonomous_enabled))
            if signature == last_signature:
                continue
            if mic_enabled and _interaction_active_now():
                # Leave last_signature untouched: the change stays pending until the
                # interaction finishes.
                continue
            _apply_idle_audio_policy(voice, autonomous_enabled=autonomous_enabled)
            last_signature = signature
        except asyncio.CancelledError:
            raise
        except Exception as e:
            sanad_logger.print_and_log(f"⚠️ Mode policy sync error: {e}", "warning")
async def _run_autonomous_forever(hub, replay, voice, restart_delay_sec: float):
    """Keep an AutonomousManager running, building a fresh instance per attempt.

    After a normal exit or an error, a warning is logged and a new manager is
    started after ``restart_delay_sec`` (at least 0.2 s). On cancellation, the
    current manager's ``stop()`` is called (errors ignored) and the cancellation
    is re-raised.
    """
    from Modes.AI.autonomous_manager import AutonomousManager

    while True:
        manager = AutonomousManager()
        try:
            await manager.run(hub=hub, replay=replay, voice=voice, ws=None)
        except asyncio.CancelledError:
            try:
                manager.stop()
            except Exception:
                pass  # best-effort shutdown; cancellation must still propagate
            raise
        except Exception as exc:
            sanad_logger.print_and_log(f"❌ Autonomous manager error: {exc}. Restarting.", "warning")
        else:
            sanad_logger.print_and_log("⚠️ Autonomous manager exited; restarting.", "warning")
        await asyncio.sleep(max(0.2, float(restart_delay_sec)))
def _log_ready_banner(dashboard_url: str | None, manual_lean_runtime: bool) -> None:
    """Log the operator-facing banner shown after each successful Gemini connect."""
    sanad_logger.print_and_log("🎙️ Connected! Sanad ready.", message_type="info")
    if dashboard_url:
        base_url = dashboard_url.rstrip("/")
        sanad_logger.print_and_log(f"🌐 Dashboard: {base_url}/", "info")
        sanad_logger.print_and_log(f"📊 Runtime Health API: {base_url}/api/runtime_health", "info")
        sanad_logger.print_and_log("🖼️ Live Preview: OFF by default. Enable it from the dashboard when needed.", "info")
    sanad_logger.print_and_log("🛡️ Idle behavior: no auto-shutdown on silence. Gemini WS keepalive is active.", "info")
    if manual_lean_runtime:
        sanad_logger.print_and_log("🎮 Manual lean runtime: voice conversation only. Restart in AI mode for replay/capture/autonomous services.", "info")
    else:
        sanad_logger.print_and_log("🎮 Controls:", "info")
        sanad_logger.print_and_log(" - R2+X : Replay + Photographer Talk + Take Photo", "info")
        sanad_logger.print_and_log(" - R2+L1: HARD CANCEL safety combo (AI/manual) -> cancel session/replay", "info")


async def _run_ws_supervisor(
    voice: HamadGeminiVoice,
    autonomous_enabled: bool,
    dashboard_url: str | None = None,
    manual_lean_runtime: bool = False,
    setup_timeout_sec: float = 30.0,
):
    """Own the Gemini WebSocket: connect, hand it to ``voice``, reconnect forever.

    Each attempt opens URI, sends _build_setup_message() and waits for the
    server's first reply. Its contents are not inspected; presumably it is the
    setup acknowledgement. The socket is then attached to ``voice``, the idle audio
    policy is applied and the ready banner is logged. The coroutine waits until
    the socket closes, detaches it, and retries after an exponential backoff
    (factor 1.7, capped at the configured max, plus up to 0.8 s jitter). The
    backoff resets after each successful setup.

    Args:
        voice: Voice runtime that consumes the socket.
        autonomous_enabled: Forwarded to _apply_idle_audio_policy.
        dashboard_url: When set, dashboard URLs are included in the banner.
        manual_lean_runtime: Selects which control hints the banner shows.
        setup_timeout_sec: Max seconds to wait for the setup reply (at least 1 s).
            Without a bound, a server that never answers would stall this
            supervisor forever and no reconnect would ever be attempted.

    Raises:
        asyncio.CancelledError: propagated after closing the socket.
    """
    backoff = float(config.read_watchdog_ws_initial_backoff_sec())
    max_backoff = float(config.read_watchdog_ws_max_backoff_sec())
    setup_timeout = max(1.0, float(setup_timeout_sec))
    while True:
        sanad_logger.print_and_log(f"\n🚀 Connecting to Gemini ({MODEL})...", message_type="info")
        ws = None
        try:
            async with websockets.connect(URI, **voice._ws_connect_kwargs()) as ws:
                await ws.send(json.dumps(_build_setup_message()))
                try:
                    await asyncio.wait_for(ws.recv(), timeout=setup_timeout)
                except asyncio.TimeoutError:
                    # Raise (not `continue`) so the backoff sleep below still runs.
                    raise RuntimeError(
                        f"no setup reply from Gemini within {setup_timeout:.0f}s"
                    ) from None
                voice.attach_ws(ws)
                backoff = float(config.read_watchdog_ws_initial_backoff_sec())
                _apply_idle_audio_policy(voice, autonomous_enabled=autonomous_enabled)
                _log_ready_banner(dashboard_url, manual_lean_runtime)
                await ws.wait_closed()
        except asyncio.CancelledError:
            if ws is not None:
                try:
                    await ws.close()
                except Exception:
                    pass
            raise
        except websockets.exceptions.ConnectionClosed as e:
            sanad_logger.print_and_log(f"⚠️ WebSocket closed: {e}", message_type="warning")
        except Exception as e:
            sanad_logger.print_and_log(f"❌ WS supervisor error: {e}", message_type="error")
        finally:
            voice.detach_ws(reason="ws supervisor disconnect")
        sleep_s = backoff + random.uniform(0.0, 0.8)
        sanad_logger.print_and_log(f"🔁 Reconnecting in {sleep_s:.1f}s...", message_type="warning")
        await asyncio.sleep(max(0.1, sleep_s))
        backoff = min(max_backoff, backoff * 1.7)
def _manual_lean_runtime_enabled(startup_mode: str) -> bool:
    """Decide whether to run the lean (voice + dashboard only) manual runtime.

    True only when ``startup_mode`` is "manual" and the MANUAL_LEAN_RUNTIME
    environment variable is one of 1/true/yes/on/y (case- and
    whitespace-insensitive). Unset or empty means False.
    """
    accepted = ("1", "true", "yes", "on", "y")
    if startup_mode != "manual":
        return False
    flag = os.environ.get("MANUAL_LEAN_RUNTIME")
    if not flag:
        return False
    return flag.strip().lower() in accepted
async def _run_autonomous_mode_supervisor(hub, replay, voice, restart_delay_sec: float, poll_sec: float = 0.5):
    """Start/stop the autonomous manager so it runs only while in "ai" mode.

    Every ``poll_sec`` (at least 0.2 s) the runtime mode is read. A transition is
    logged once. In ai mode a _run_autonomous_forever task is (re)created if none
    is running. Otherwise any running task is cancelled and awaited. On exit,
    including cancellation, the task is cancelled and awaited as well.
    """
    worker: asyncio.Task | None = None
    was_ai = None
    try:
        while True:
            is_ai = _read_runtime_mode_safe() == "ai"
            if is_ai != was_ai:
                transition_msg = (
                    "🤖 AI mode active: starting autonomous manager."
                    if is_ai
                    else "🤖 Manual mode active: autonomous manager paused."
                )
                sanad_logger.print_and_log(transition_msg, "info")
                was_ai = is_ai
            if is_ai and (worker is None or worker.done()):
                worker = asyncio.create_task(
                    _run_autonomous_forever(hub=hub, replay=replay, voice=voice, restart_delay_sec=restart_delay_sec)
                )
            elif not is_ai and worker is not None:
                worker.cancel()
                await asyncio.gather(worker, return_exceptions=True)
                worker = None
            await asyncio.sleep(max(0.2, float(poll_sec)))
    finally:
        if worker is not None:
            worker.cancel()
            await asyncio.gather(worker, return_exceptions=True)
def _env_flag(name: str, default: bool = False) -> bool:
    """Parse a boolean environment variable without crashing on friendly values.

    Integers keep their historical meaning ("0" -> False, any other integer ->
    True). The words true/yes/on/y and false/no/off/n (case-insensitive) are also
    accepted. Unset or blank values return ``default``; anything else logs a
    warning and returns ``default``.
    """
    raw = os.environ.get(name)
    if raw is None or not raw.strip():
        return default
    text = raw.strip().lower()
    try:
        return int(text) != 0
    except ValueError:
        pass
    if text in ("true", "yes", "on", "y"):
        return True
    if text in ("false", "no", "off", "n"):
        return False
    sanad_logger.print_and_log(f"⚠️ Unrecognized value {raw!r} for {name}; using {default}.", "warning")
    return default


def _log_replay_report(selected_replay, replay_report: dict) -> None:
    """Log warnings describing the result of ensure_replay_integrity()."""
    active = replay_report.get("active", {})
    if not active.get("ok", False):
        sanad_logger.print_and_log(
            f"⚠️ Active replay invalid: {active.get('error', 'unknown')}",
            "warning",
        )
    elif not active.get("trigger_ok", False):
        sanad_logger.print_and_log(
            "⚠️ Active replay has no trigger markers. Timed fallback capture will be used.",
            "warning",
        )
    if replay_report.get("fallback_used"):
        sanad_logger.print_and_log(
            f"⚠️ Using fallback replay: {selected_replay.name}",
            "warning",
        )
    if replay_report.get("degraded_no_trigger"):
        sanad_logger.print_and_log(
            "⚠️ No replay with trigger markers found. Running in degraded triggerless mode.",
            "warning",
        )


def _init_robot_runtime():
    """Bring up DDS, the low-state hub, the replay engine and the dashboard hooks.

    Returns:
        ``(hub, replay, lowstate_sub, capture_func, replay_test_func)``. The caller
        must keep ``lowstate_sub`` referenced for the program's lifetime: when
        this code was inline, the subscriber lived in main()'s frame. Presumably
        the subscription stops if the object is garbage-collected, so keep it
        alive to be safe.
    """
    from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize
    from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowState_
    from Modes.Manual.controller import auto_pick_iface, LowStateHub
    from Modes.Manual.replay_engine import ReplayWithHome
    from Server.capture_service import capture_with_replay_sync, ensure_replay_integrity

    iface = auto_pick_iface()
    sanad_logger.print_and_log(f"🌐 DDS iface: {iface}", "info")
    ChannelFactoryInitialize(0, iface)
    hub = LowStateHub(watchdog_timeout=0.25)
    lowstate_sub = ChannelSubscriber("rt/lowstate", LowState_)
    lowstate_sub.Init(hub.handler, 10)
    replay = ReplayWithHome(hub, watchdog_disable_after=1.0)

    startup_replay = config.read_selected_replay_path()
    fallback_replay = config.DATA_DIR / "photo_G3.jsonl"
    selected_replay, replay_report = ensure_replay_integrity(startup_replay, fallback_replay)
    _log_replay_report(selected_replay, replay_report)
    config.REPLAY_FILE = selected_replay
    try:
        rel_name = str(selected_replay.resolve().relative_to(config.DATA_DIR)).replace("\\", "/")
        config.write_selected_replay_name(rel_name)
    except Exception as e:
        # Persisting the selection is best effort, but a silent failure hides e.g. a
        # replay that lives outside DATA_DIR; report it and keep running.
        sanad_logger.print_and_log(f"⚠️ Could not persist selected replay name: {e}", "warning")

    def _capture_from_dashboard() -> str:
        base_prefix = os.environ.get("PHOTO_PREFIX", "photo").strip() or "photo"
        delay = max(0.0, min(PHOTO_DELAY_SEC, PHOTO_TOTAL_SEC))
        return capture_with_replay_sync(
            replay_runner=replay,
            replay_file=config.REPLAY_FILE,
            home_file=config.HOME_FILE,
            delay_sec=delay,
            prefix=base_prefix,
            speed=1.0,
        )

    def _test_replay_from_dashboard(replay_name: str) -> str:
        try:
            candidate = config.resolve_replay_path(replay_name)
        except Exception as e:
            return f"[ERR] invalid replay path: {e}"
        if not candidate.exists():
            return "[ERR] replay not found"
        if replay.is_playing:
            return "[ERR] replay already in progress"
        replay.run(candidate, config.HOME_FILE, speed=1.0)
        return str(candidate)

    return hub, replay, lowstate_sub, _capture_from_dashboard, _test_replay_from_dashboard


async def main():
    """Start all long-lived services and run them until one fails or we are cancelled.

    Order: optional robot/DDS init (skipped in manual lean runtime), photo
    server, uploader (skipped in lean runtime), voice runtime, then the
    supervised asyncio tasks. On exit, every task is cancelled and awaited.
    """
    startup_mode = _read_runtime_mode_safe()
    # Previously bool(int(...)): "true"/"yes"/"" crashed startup with ValueError.
    autonomous_enabled = _env_flag("AUTONOMOUS_ENABLE", default=False)
    manual_lean_runtime = _manual_lean_runtime_enabled(startup_mode)
    hub = None
    replay = None
    lowstate_sub = None  # held only to keep the DDS subscriber alive for the process lifetime
    dashboard_capture_func = None
    dashboard_replay_test_func = None
    if manual_lean_runtime:
        sanad_logger.print_and_log(
            "🪶 Manual lean runtime active: Gemini + dashboard only. DDS, replay, uploader, and teleimager-dependent startup are skipped.",
            "info",
        )

        def _manual_capture_unavailable() -> str:
            return "[ERR] Capture unavailable in manual lean runtime. Restart in AI mode for camera/replay services."

        dashboard_capture_func = _manual_capture_unavailable
    else:
        (
            hub,
            replay,
            lowstate_sub,
            dashboard_capture_func,
            dashboard_replay_test_func,
        ) = _init_robot_runtime()

    url = start_photo_server(
        PHOTOS_DIR,
        port=PHOTO_SERVER_PORT,
        capture_func=dashboard_capture_func,
        replay_test_func=dashboard_replay_test_func,
    )
    sanad_logger.print_and_log(f"🖼️ Phone gallery: {url}", message_type="info")
    if not manual_lean_runtime:
        from Server.uploader import start_uploader
        try:
            start_uploader(PHOTOS_DIR)
        except Exception as e:
            sanad_logger.print_and_log(f"Failed to start uploader: {e}", message_type="warning")

    voice = HamadGeminiVoice(
        photo_phrases_file=str(config.PHOTO_PHRASES_FILE),
    )
    if startup_mode == "ai" or autonomous_enabled:
        voice.calibrate_mic()
    else:
        sanad_logger.print_and_log("🎙️ Mic calibration skipped at startup (manual mode).", "info")
        sanad_logger.print_and_log(" Manual mode: mic follows dashboard toggle, but AI photo commands and AI detection stay disabled until AI mode.", "info")

    restart_delay = float(config.read_watchdog_component_restart_delay_sec())
    tasks = [
        asyncio.create_task(_run_component_forever("capture_mic", lambda: voice.capture_mic(), restart_delay)),
        asyncio.create_task(_run_component_forever("receive_audio", lambda: voice.receive_audio(), restart_delay)),
        asyncio.create_task(_run_component_forever("play_audio", lambda: voice.play_audio(), restart_delay)),
        asyncio.create_task(_run_component_forever("keepalive", lambda: voice.keepalive(every_sec=20.0), restart_delay)),
        asyncio.create_task(_run_component_forever("runtime_health", lambda: _write_runtime_health_loop(voice, 1.0), restart_delay)),
        asyncio.create_task(
            _run_ws_supervisor(
                voice,
                autonomous_enabled=autonomous_enabled,
                dashboard_url=url,
                manual_lean_runtime=manual_lean_runtime,
            )
        ),
        asyncio.create_task(_run_mode_policy_sync(voice, autonomous_enabled=autonomous_enabled)),
    ]
    if not manual_lean_runtime:
        from Modes.Manual.trigger_loop import trigger_loop
        tasks.append(
            asyncio.create_task(
                _run_component_forever(
                    "trigger_loop",
                    lambda: trigger_loop(hub=hub, replay=replay, voice=voice, take_photo_sync_callable=None, ws=None),
                    restart_delay,
                )
            )
        )
    if autonomous_enabled and not manual_lean_runtime:
        tasks.append(
            asyncio.create_task(
                _run_autonomous_mode_supervisor(
                    hub=hub,
                    replay=replay,
                    voice=voice,
                    restart_delay_sec=restart_delay,
                )
            )
        )
        sanad_logger.print_and_log("🤖 Autonomous services armed (AUTONOMOUS_ENABLE=1). AI mode can start without restart.", "info")
    try:
        await asyncio.gather(*tasks)
    finally:
        for t in tasks:
            if not t.done():
                t.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        sanad_logger.print_and_log("\n👋 Ma'a Salama (Goodbye)!", message_type="info")
    except Exception as e:
        import traceback

        sanad_logger.print_and_log(f"\n❌ Fatal Error: {e}", message_type="error")
        # The bare message loses the stack; log it so the crash is diagnosable.
        sanad_logger.print_and_log(traceback.format_exc(), message_type="error")
        # Exit non-zero so a process supervisor (systemd, container restart policy,
        # ...) sees the failure instead of a clean exit.
        raise SystemExit(1)