# Source snapshot: AI_Photographer/Gemini/gemini_voice.py
# Captured 2026-04-12 18:52:37 +04:00 — 824 lines, 33 KiB, Python.

# gemini_voice.py
import asyncio
import base64
import json
import time
import array
import audioop
import inspect
import os
import re
import wave
from pathlib import Path
import pyaudio
import websockets
from Core import settings as config
from Core import audio_prompts
from Gemini.sanad_text_utils import load_phrase_map, _norm_ar
from Core.Logger import Logs
FORMAT = pyaudio.paInt16
CHANNELS = 1
SEND_SAMPLE_RATE = 16000
RECEIVE_SAMPLE_RATE = 24000
CHUNK_SIZE = 512
class HamadGeminiVoice:
def __init__(
    self,
    photo_phrases_file: str = "photo_command_ai.txt",
):
    """Initialize audio-I/O state, barge-in thresholds, env-tunable timing, and health counters.

    Args:
        photo_phrases_file: phrase-map file name matched against user
            transcripts to detect photo commands (loaded lazily in
            _match_photo_command_from_text).
    """
    self.sanad_logger = Logs()
    self.sanad_logger.LogEngine("G1_Logs", "gemini_voice")
    # PCM chunks queued for the speaker playback task (play_audio drains it).
    self.audio_q = asyncio.Queue()
    # True while model/prompt audio is being played back.
    self.speaking = False
    # Set by barge-in detection; cleared when the server reports interruption.
    self.interrupted = False
    self.pya = pyaudio.PyAudio()
    self.mic_frames_per_buffer = CHUNK_SIZE
    self.speaker_frames_per_buffer = CHUNK_SIZE
    # Barge-in energy thresholds; calibrate_mic() may raise barge_in_threshold.
    self.MIN_THRESHOLD = 3000
    self.barge_in_threshold = 3000
    # Consecutive loud mic chunks required before treating input as a barge-in.
    self.REQUIRED_LOUD_CHUNKS = 5
    # Playback tuning (env-overridable, clamped to sane minimums).
    self.PREBUFFER_CHUNKS = max(1, int(os.environ.get("GEMINI_PREBUFFER_CHUNKS", "4")))
    self.PLAYBACK_TIMEOUT = max(0.1, float(os.environ.get("GEMINI_PLAYBACK_TIMEOUT", "0.35")))
    # Timing guards around AI speech / barge-in (seconds).
    self.BARGE_IN_COOLDOWN = 0.7
    self.AI_SPEAK_GRACE = 0.25
    self._last_ai_audio_time = 0.0
    self._ai_speaking_since = 0.0
    self._barge_in_block_until = 0.0
    # Briefly ignore mic input after AI audio to avoid hearing our own speaker.
    self.ECHO_GUARD_SEC = 0.8
    self._ignore_input_until = 0.0
    # While the AI speaks, quiet mic chunks are replaced with silence upstream.
    self.SEND_SILENCE_WHEN_SPEAKING = True
    self.SPEAKING_ENERGY_GATE = 0.85
    self._silence_pcm = b"\x00" * (CHUNK_SIZE * 2)
    # Photo-command phrase map (lazy-loaded on first match attempt).
    self._photo_phrases_file = photo_phrases_file
    self._photo_command_map = None
    # Mic gating / command-dispatch behavior.
    self.audio_gate_open = bool(int(os.environ.get("AUDIO_GATE_OPEN_ON_START", "1")))
    self.passive_listen_only = False
    self._cmd_last_ts = {}
    self.command_cooldown_sec = float(os.environ.get("VOICE_COMMAND_COOLDOWN_SEC", "1.2"))
    self.device_restart_delay_sec = float(os.environ.get("AUDIO_DEVICE_RESTART_DELAY_SEC", "1.0"))
    self.require_final_command = bool(int(os.environ.get("VOICE_CMDS_REQUIRE_FINAL", "1")))
    # Vision-context suppression windows (see _should_suppress_model_audio).
    self.context_silent_default = bool(config.read_vision_gemini_context_silent())
    self.context_suppress_window_sec = float(os.environ.get("GEMINI_CONTEXT_SUPPRESS_WINDOW_SEC", "1.2"))
    self.prompt_audio_allow_sec = float(os.environ.get("GEMINI_PROMPT_AUDIO_ALLOW_SEC", "15.0"))
    self.user_turn_audio_allow_sec = float(os.environ.get("GEMINI_USER_TURN_ALLOW_SEC", "8.0"))
    self._vision_context_seq = 0
    self._context_suppress_until = 0.0
    self._prompt_audio_allow_until = 0.0
    self._last_user_transcript_ts = 0.0
    # Rate-limited / de-duplicated log bookkeeping.
    self._last_context_drop_log_ts = 0.0
    self._last_audio_drop_log_ts = 0.0
    self._last_transcript_log_text = ""
    self._last_transcript_log_ts = 0.0
    self._speaker_queue_drop_count = 0
    # Websocket attachment state (managed by attach_ws/detach_ws).
    self._ws = None
    self._ws_seq = 0
    self._ws_event = asyncio.Event()
    self._prompt_play_lock = asyncio.Lock()
    # Runtime health snapshot exposed via get_runtime_health().
    self._health = {
        "ws_connected": False,
        "ws_state": "detached",
        "ws_restarts": 0,
        "ws_last_error": "",
        "ws_last_event_ts": 0.0,
        "ws_last_reason": "",
        "mic_state": "idle",
        "mic_restarts": 0,
        "mic_last_error": "",
        "mic_last_event_ts": 0.0,
        "speaker_state": "idle",
        "speaker_restarts": 0,
        "speaker_last_error": "",
        "speaker_last_event_ts": 0.0,
        "speaker_queue_drops": 0,
        "speaker_queue_max": 0,
    }
    self._component_restart_delay_sec = float(config.read_watchdog_component_restart_delay_sec())
    self._log_audio_device_summary()
def _log_user_transcript(self, text: str, is_final: bool, mode: str):
txt = str(text or "").strip()
if not txt:
return
now = time.time()
if is_final and txt == self._last_transcript_log_text and (now - self._last_transcript_log_ts) < 1.0:
return
self._last_transcript_log_text = txt
self._last_transcript_log_ts = now
state = "final" if is_final else "partial"
self.sanad_logger.print_and_log(
f"🎤 Heard ({state}, mode={mode}): {txt}",
message_type="info",
)
def _log_audio_device_summary(self):
try:
in_info = self.pya.get_default_input_device_info()
except Exception:
in_info = None
try:
out_info = self.pya.get_default_output_device_info()
except Exception:
out_info = None
def _fmt(info: dict | None) -> str:
if not info:
return "unavailable"
name = str(info.get("name", "unknown"))
idx = info.get("index", "?")
rate = info.get("defaultSampleRate", "?")
return f"{name} (index={idx}, rate={rate})"
self.sanad_logger.print_and_log(f"🎤 PortAudio input: {_fmt(in_info)}", message_type="info")
self.sanad_logger.print_and_log(f"🔈 PortAudio output: {_fmt(out_info)}", message_type="info")
def set_audio_gate(self, enabled: bool, reason: str = ""):
self.audio_gate_open = bool(enabled)
state = "OPEN" if self.audio_gate_open else "CLOSED"
suffix = f" ({reason})" if reason else ""
self.sanad_logger.print_and_log(f"🎧 Audio gate {state}{suffix}", message_type="info")
def set_passive_listen(self, enabled: bool, reason: str = ""):
self.passive_listen_only = bool(enabled)
state = "ON" if self.passive_listen_only else "OFF"
suffix = f" ({reason})" if reason else ""
self.sanad_logger.print_and_log(f"🦻 Passive listen {state}{suffix}", message_type="info")
def attach_ws(self, ws):
if ws is None:
return
self._ws = ws
self._ws_seq += 1
self._ws_event.set()
self._health["ws_connected"] = True
self._health["ws_state"] = "connected"
self._health["ws_last_event_ts"] = time.time()
self._health["ws_last_reason"] = "attach"
if self._ws_seq > 1:
self._health["ws_restarts"] = int(self._health.get("ws_restarts", 0)) + 1
def detach_ws(self, reason: str = ""):
old_ws = self._ws
self._ws = None
self._ws_event.clear()
self._health["ws_connected"] = False
self._health["ws_state"] = "detached"
self._health["ws_last_event_ts"] = time.time()
self._health["ws_last_reason"] = str(reason or "")
if reason:
self._health["ws_last_error"] = str(reason)
try:
if old_ws is not None:
asyncio.create_task(old_ws.close())
except Exception:
pass
def is_ws_connected(self) -> bool:
ws = self._ws
return ws is not None
async def _wait_for_ws(self, timeout_sec: float | None = None):
ws = self._ws
if ws is not None:
return ws
if timeout_sec is None:
await self._ws_event.wait()
return self._ws
if timeout_sec <= 0:
return None
try:
await asyncio.wait_for(self._ws_event.wait(), timeout=timeout_sec)
except asyncio.TimeoutError:
return None
return self._ws
def get_runtime_health(self) -> dict:
    """Snapshot health counters plus live audio/WS flags for the watchdog."""
    snapshot = dict(self._health)
    snapshot.update(
        mic_enabled=bool(config.read_gemini_mic_enabled()),
        audio_gate_open=bool(self.audio_gate_open),
        passive_listen_only=bool(self.passive_listen_only),
        speaking=bool(self.speaking),
        interrupted=bool(self.interrupted),
        speaker_queue_size=int(self.audio_q.qsize()),
        time=time.time(),
    )
    return snapshot
def _queue_model_audio(self, audio_bytes: bytes):
if not audio_bytes:
return
self.audio_q.put_nowait(audio_bytes)
def _load_prompt_pcm(self, prompt_path: Path) -> tuple[bytes, float]:
    """Read a WAV prompt and normalize it to 16-bit mono at RECEIVE_SAMPLE_RATE.

    Returns (pcm_bytes, duration_seconds). Raises ValueError for channel
    counts other than 1 or 2.
    """
    with wave.open(str(prompt_path), "rb") as wav_file:
        n_channels = int(wav_file.getnchannels())
        width = int(wav_file.getsampwidth())
        rate = int(wav_file.getframerate())
        pcm = wav_file.readframes(wav_file.getnframes())
    if not 1 <= n_channels <= 2:
        raise ValueError(f"unsupported WAV channel count: {n_channels}")
    if width != 2:
        # Normalize sample width to 16-bit signed.
        pcm = audioop.lin2lin(pcm, width, 2)
        width = 2
    if n_channels == 2:
        # Downmix stereo to mono with equal channel weights.
        pcm = audioop.tomono(pcm, width, 0.5, 0.5)
        n_channels = 1
    if rate != RECEIVE_SAMPLE_RATE:
        pcm, _ = audioop.ratecv(pcm, width, n_channels, rate, RECEIVE_SAMPLE_RATE, None)
    duration = len(pcm) / float(width * n_channels * RECEIVE_SAMPLE_RATE) if pcm else 0.0
    return pcm, max(0.0, duration)
async def _play_local_prompt_file(self, prompt_path: Path) -> bool:
    """Decode a local WAV prompt and stream it through the speaker queue.

    Returns True once the audio was queued (and waited on), False when
    decoding fails or the file is empty. Serialized by _prompt_play_lock
    so two prompts never interleave.
    """
    async with self._prompt_play_lock:
        try:
            # Decode off the event loop; heavy WAV/resample work runs in a thread.
            pcm, duration_sec = await asyncio.to_thread(self._load_prompt_pcm, prompt_path)
        except Exception as e:
            self.sanad_logger.print_and_log(
                f"⚠️ Prompt audio failed for {prompt_path.name}: {e}",
                message_type="warning",
            )
            return False
        if not pcm:
            return False
        now = time.time()
        # Enter "AI speaking" state and arm the echo/suppression guards for
        # the whole prompt duration.
        self.interrupted = False
        self.speaking = True
        self._ai_speaking_since = now
        self._last_ai_audio_time = now
        self._ignore_input_until = now + duration_sec + self.ECHO_GUARD_SEC
        self._prompt_audio_allow_until = max(self._prompt_audio_allow_until, now + duration_sec + 1.0)
        self.sanad_logger.print_and_log(
            f"🔊 Prompt audio: {prompt_path.name}",
            message_type="info",
        )
        # Feed the speaker queue in chunks sized for the output stream
        # (16-bit samples: frames * 2 bytes; never smaller than 4 KiB).
        chunk_bytes = max(self.speaker_frames_per_buffer * 2, 4096)
        for i in range(0, len(pcm), chunk_bytes):
            self._queue_model_audio(pcm[i:i + chunk_bytes])
        # Bounded wait until play_audio drains the queue and clears `speaking`.
        deadline = time.time() + max(2.0, duration_sec + 2.0)
        while time.time() < deadline:
            if self.audio_q.empty() and not self.speaking:
                break
            await asyncio.sleep(0.05)
        return True
async def play_prompt_key(
    self,
    key: str,
    fallback_text: str = "",
    allow_gemini_fallback: bool | None = None,
    wait_timeout_sec: float = 0.0,
    mode_override: str | None = None,
) -> bool:
    """Play a pre-recorded prompt by key, optionally falling back to a live Gemini text turn.

    Returns True when either the local file played or the fallback text
    was delivered over the live websocket.
    """
    mode = str(mode_override or config.read_audio_prompt_mode()).strip().lower()
    if mode not in ("audio", "gemini"):
        mode = "audio"
    if allow_gemini_fallback is None:
        fallback_ok = config.read_audio_prompts_fallback_to_gemini()
    else:
        fallback_ok = bool(allow_gemini_fallback)
    try:
        path = audio_prompts.prompt_path(key)
    except Exception as e:
        self.sanad_logger.print_and_log(f"⚠️ Unknown prompt key {key}: {e}", message_type="warning")
        path = None
    # "gemini" mode with text available: skip the local file entirely.
    if mode == "gemini" and fallback_text:
        return await self.send_text_prompt_live(fallback_text, wait_timeout_sec=wait_timeout_sec)
    if path is not None and path.exists():
        if await self._play_local_prompt_file(path):
            return True
    if fallback_text and fallback_ok:
        return await self.send_text_prompt_live(fallback_text, wait_timeout_sec=wait_timeout_sec)
    return False
async def trigger_wake_sequence(self, ws=None, wake_text: str | None = None, prompt_key: str | None = None):
"""
Open mic gate and inject a context prompt so Gemini speaks first.
"""
self.set_audio_gate(True, reason="interaction triggered")
prompt = (
wake_text
or "A tourist has just approached you. Warmly greet them and ask if they would like to take a photo."
)
if prompt_key:
played = await self.play_prompt_key(prompt_key, fallback_text=prompt)
if played:
return
if ws is not None:
await self.send_text_prompt(ws, prompt)
else:
await self.send_text_prompt_live(prompt)
def _should_suppress_model_audio(self, now_ts: float) -> bool:
if self.passive_listen_only:
return True
if now_ts <= self._prompt_audio_allow_until:
return False
if (now_ts - self._last_user_transcript_ts) <= self.user_turn_audio_allow_sec:
return False
return now_ts <= self._context_suppress_until
def _command_allowed_now(self, cmd: str) -> bool:
now = time.time()
last = float(self._cmd_last_ts.get(cmd, 0.0))
if (now - last) < self.command_cooldown_sec:
return False
self._cmd_last_ts[cmd] = now
return True
def _write_flag(self, flag_name: str, label: str):
    """Drop a timestamped flag file into SCRIPTS_DIR to signal another process; log either way."""
    try:
        target = config.SCRIPTS_DIR / flag_name
        target.write_text(str(time.time()))
    except Exception as e:
        self.sanad_logger.print_and_log(f"❌ Failed to write {label} flag: {e}", message_type="error")
        return
    self.sanad_logger.print_and_log(f"📩 {label} flag written", message_type="info")
def _mode_value(self) -> str:
try:
mode = str(config.read_runtime_mode()).strip().lower()
except Exception:
mode = "manual"
if mode not in ("manual", "ai"):
mode = "manual"
return mode
def _match_in_mapping(self, text: str, mapping: dict | None):
    """Map normalized text to a command via exact, padded-substring, compact, or word-boundary match.

    Returns the mapped command or None. Longer aliases are tried first so
    short words cannot win prematurely.
    """
    norm = _norm_ar(str(text or ""))
    if not norm or not mapping:
        return None
    # Exact phrase wins immediately.
    exact = mapping.get(norm)
    if exact:
        return exact
    padded = f" {norm} "
    compact = norm.replace(" ", "")
    for phrase, mapped in sorted(mapping.items(), key=lambda kv: len(kv[0]), reverse=True):
        alias = str(phrase or "").strip()
        if not alias:
            continue
        if f" {alias} " in padded:
            return mapped
        # Space-free comparison catches compact spellings.
        alias_compact = alias.replace(" ", "")
        if alias_compact and alias_compact in compact:
            return mapped
        try:
            if " " in alias and re.search(rf"\b{re.escape(alias)}\b", norm):
                return mapped
        except Exception:
            pass
    return None
def _match_photo_command_from_text(self, text: str):
    """Match text against the photo-command phrase map (loaded lazily on first use)."""
    mapping = self._photo_command_map
    if mapping is None:
        mapping = load_phrase_map(self._photo_phrases_file)
        self._photo_command_map = mapping
    return self._match_in_mapping(text, mapping)
def _extract_user_transcripts(self, response: dict, server_content: dict):
out = []
candidates = [
response.get("inputAudioTranscription"),
response.get("inputTranscription"),
server_content.get("inputAudioTranscription"),
server_content.get("inputTranscription"),
]
for item in candidates:
if item is None:
continue
if isinstance(item, str):
out.append((item, True))
continue
if isinstance(item, dict):
txt = item.get("text") or item.get("transcript") or item.get("content")
is_final = item.get("isFinal")
if is_final is None:
is_final = item.get("final")
if is_final is None:
is_final = item.get("done")
if txt:
out.append((str(txt), True if is_final is None else bool(is_final)))
parts = item.get("parts")
if isinstance(parts, list):
for p in parts:
if isinstance(p, dict):
t = p.get("text")
if t:
out.append((str(t), True))
elif isinstance(item, list):
for it in item:
if isinstance(it, str):
out.append((it, True))
elif isinstance(it, dict):
t = it.get("text") or it.get("transcript")
if t:
out.append((str(t), bool(it.get("isFinal", True))))
return out
async def _handle_user_transcript(self, text: str, is_final: bool):
now = time.time()
self._last_user_transcript_ts = now
self._prompt_audio_allow_until = max(self._prompt_audio_allow_until, now + self.user_turn_audio_allow_sec)
mode = self._mode_value()
self._log_user_transcript(text, is_final=bool(is_final), mode=mode)
if self.require_final_command and (not is_final):
return
if mode == "manual":
return
photo_cmd = self._match_photo_command_from_text(text)
if photo_cmd:
self.sanad_logger.print_and_log(
f"🧠 Command match: {photo_cmd}",
message_type="info",
)
if not self._command_allowed_now(photo_cmd):
return
if photo_cmd == "no_photo":
self._write_flag("confirm_no.flag", "No photo")
else:
self._write_flag(
"confirm_yes.flag",
"Yes photo" if photo_cmd == "yes_photo" else "Photo request",
)
if photo_cmd == "request_photo":
self._write_flag("request_photo.flag", "Photo request")
return
def audio_energy(self, pcm):
try:
samples = array.array("h", pcm)
if not samples:
return 0
return sum(abs(s) for s in samples) // len(samples)
except Exception:
return 0
def calibrate_mic(self):
    """Sample ambient noise from the mic and derive the barge-in energy threshold.

    Reads 40 chunks while the user stays silent, averages their energy,
    and sets barge_in_threshold to 3x the baseline (never below
    MIN_THRESHOLD). On any device failure the current/default threshold
    is kept and a warning is logged.

    Fix: the stream is now released in a finally block — the original
    leaked the PortAudio stream whenever open() succeeded but a read (or
    the averaging) raised.
    """
    self.sanad_logger.print_and_log("\n🤫 Calibrating Microphone... (Please remain silent)", message_type="info")
    stream = None
    try:
        stream = self.pya.open(
            format=FORMAT,
            channels=CHANNELS,
            rate=SEND_SAMPLE_RATE,
            input=True,
            frames_per_buffer=self.mic_frames_per_buffer,
        )
        values = [
            self.audio_energy(stream.read(CHUNK_SIZE, exception_on_overflow=False))
            for _ in range(40)
        ]
        avg_noise = sum(values) / len(values)
        self.barge_in_threshold = max(self.MIN_THRESHOLD, avg_noise * 3.0)
        self.sanad_logger.print_and_log(f"✅ Baseline Noise: {avg_noise:.1f}", message_type="info")
        self.sanad_logger.print_and_log(f"✅ Interruption Threshold: {self.barge_in_threshold:.1f}", message_type="info")
    except Exception as e:
        self.sanad_logger.print_and_log(f"⚠️ Calibration failed: {e}. Using default threshold.", message_type="warning")
    finally:
        # Always release the device, even when calibration fails mid-read.
        if stream is not None:
            try:
                stream.stop_stream()
                stream.close()
            except Exception:
                pass
def _ws_connect_kwargs(self):
kwargs = {"max_size": None}
try:
sig = inspect.signature(websockets.connect)
if "extra_headers" in sig.parameters:
kwargs["extra_headers"] = {"Content-Type": "application/json"}
else:
kwargs["additional_headers"] = {"Content-Type": "application/json"}
except Exception:
kwargs["extra_headers"] = {"Content-Type": "application/json"}
return kwargs
async def send_text_prompt(self, ws, text: str, allow_speech: bool = True):
msg = {
"clientContent": {
"turns": [{"role": "user", "parts": [{"text": text}]}],
"turnComplete": True
}
}
await ws.send(json.dumps(msg))
if allow_speech:
self._prompt_audio_allow_until = max(self._prompt_audio_allow_until, time.time() + self.prompt_audio_allow_sec)
async def send_text_prompt_live(self, text: str, allow_speech: bool = True, wait_timeout_sec: float = 0.0) -> bool:
ws = await self._wait_for_ws(timeout_sec=wait_timeout_sec)
if ws is None:
return False
try:
await self.send_text_prompt(ws, text, allow_speech=allow_speech)
return True
except Exception as e:
self.detach_ws(reason=f"send_text_prompt_live failed: {e}")
return False
async def send_vision_context(self, ws, context: dict, silent: bool | None = None):
if silent is None:
silent = bool(self.context_silent_default)
self._vision_context_seq += 1
seq = self._vision_context_seq
payload = dict(context or {})
payload["context_seq"] = seq
payload["context_type"] = "vision"
if silent:
text = (
"SILENT_VISION_CONTEXT. Internal grounding update only; do not speak and do not produce audio. "
f"Use this context silently. Payload JSON: {json.dumps(payload, ensure_ascii=False)}"
)
else:
text = f"VISION_CONTEXT_UPDATE: {json.dumps(payload, ensure_ascii=False)}"
await self.send_text_prompt(ws, text, allow_speech=not silent)
if silent:
self._context_suppress_until = max(
self._context_suppress_until,
time.time() + max(0.2, self.context_suppress_window_sec),
)
async def send_vision_context_live(self, context: dict, silent: bool | None = None, wait_timeout_sec: float = 0.0) -> bool:
ws = await self._wait_for_ws(timeout_sec=wait_timeout_sec)
if ws is None:
return False
try:
await self.send_vision_context(ws, context=context, silent=silent)
return True
except Exception as e:
self.detach_ws(reason=f"send_vision_context_live failed: {e}")
return False
async def keepalive(self, ws=None, every_sec: float = 20.0):
    """Ping the attached websocket every every_sec seconds to defeat idle timeouts.

    Runs forever. A failed ping detaches the socket and the loop simply
    keeps ticking until a new socket is attached. The ws argument is
    unused (kept for call-site compatibility).
    """
    while True:
        try:
            await asyncio.sleep(every_sec)
            target = self._ws
            if target is not None:
                await target.ping()
        except asyncio.CancelledError:
            raise
        except Exception as e:
            self.detach_ws(reason=f"keepalive ping failed: {e}")
            await asyncio.sleep(self._component_restart_delay_sec)
async def capture_mic(self, ws=None):
    """Forever-running mic loop: read PCM, detect barge-in, forward audio to the websocket.

    Restarts the PortAudio input stream on any device error. Audio is
    only forwarded while the gate is open and a websocket is attached;
    while the AI speaks, quiet chunks are replaced by silence so the
    model does not hear its own echo. The ws argument is unused — the
    loop always sends over self._ws.
    """
    loud_chunks = 0
    while True:
        stream = None
        try:
            # Park cheaply while the gate is closed; no device held open.
            while not self.audio_gate_open:
                self._health["mic_state"] = "idle"
                self._health["mic_last_event_ts"] = time.time()
                await asyncio.sleep(0.1)
            self._health["mic_state"] = "starting"
            self._health["mic_last_event_ts"] = time.time()
            stream = await asyncio.to_thread(
                self.pya.open,
                format=FORMAT,
                channels=CHANNELS,
                rate=SEND_SAMPLE_RATE,
                input=True,
                frames_per_buffer=self.mic_frames_per_buffer,
            )
            self._health["mic_state"] = "running"
            self._health["mic_last_event_ts"] = time.time()
            while True:
                # Blocking device read runs off the event loop.
                data = await asyncio.to_thread(stream.read, CHUNK_SIZE, exception_on_overflow=False)
                energy = self.audio_energy(data)
                now = time.time()
                if not self.audio_gate_open:
                    # Gate closed mid-stream: keep draining the device but send nothing.
                    continue
                # Barge-in detection: only while the AI speaks, outside the
                # post-interrupt cooldown and past the initial grace period.
                if self.speaking and (now >= self._barge_in_block_until):
                    if (now - self._ai_speaking_since) >= self.AI_SPEAK_GRACE:
                        # Count consecutive loud chunks; any quiet chunk resets.
                        loud_chunks = loud_chunks + 1 if energy > self.barge_in_threshold else 0
                        if loud_chunks > self.REQUIRED_LOUD_CHUNKS:
                            self.sanad_logger.print_and_log(f"🛑 Interruption! (Energy: {energy})", message_type="warning")
                            self.interrupted = True
                            self.speaking = False
                            loud_chunks = 0
                            self._barge_in_block_until = now + self.BARGE_IN_COOLDOWN
                            # Flush pending model audio so playback stops promptly.
                            while not self.audio_q.empty():
                                try:
                                    self.audio_q.get_nowait()
                                except asyncio.QueueEmpty:
                                    break
                data_to_send = data
                # While the AI speaks, send silence for sub-threshold chunks
                # so the model does not transcribe its own speaker output.
                if self.SEND_SILENCE_WHEN_SPEAKING and self.speaking:
                    gate = self.barge_in_threshold * self.SPEAKING_ENERGY_GATE
                    if energy < gate:
                        data_to_send = self._silence_pcm
                b64_audio = base64.b64encode(data_to_send).decode("utf-8")
                msg = {
                    "realtime_input": {
                        "media_chunks": [{
                            "data": b64_audio,
                            "mime_type": f"audio/pcm;rate={SEND_SAMPLE_RATE}",
                        }]
                    }
                }
                active_ws = self._ws
                if active_ws is None:
                    # No websocket attached: drop this chunk and keep reading.
                    continue
                try:
                    await active_ws.send(json.dumps(msg))
                except websockets.exceptions.ConnectionClosed as e:
                    self.detach_ws(reason=f"mic send ws closed: {e}")
                    await asyncio.sleep(self._component_restart_delay_sec)
                except Exception as e:
                    self.detach_ws(reason=f"mic send ws error: {e}")
                    await asyncio.sleep(self._component_restart_delay_sec)
        except asyncio.CancelledError:
            raise
        except Exception as e:
            # Device-level failure: record it and retry after a backoff.
            self.sanad_logger.print_and_log(f"❌ Mic Error: {e} (restarting mic)", message_type="warning")
            self._health["mic_state"] = "error"
            self._health["mic_restarts"] = int(self._health.get("mic_restarts", 0)) + 1
            self._health["mic_last_error"] = str(e)
            self._health["mic_last_event_ts"] = time.time()
            await asyncio.sleep(max(self.device_restart_delay_sec, self._component_restart_delay_sec))
        finally:
            # Best-effort release of the input stream before the next attempt.
            if stream is not None:
                try:
                    await asyncio.to_thread(stream.stop_stream)
                except Exception:
                    pass
                try:
                    await asyncio.to_thread(stream.close)
                except Exception:
                    pass
            if self._health.get("mic_state") != "error":
                self._health["mic_state"] = "restarting"
            self._health["mic_last_event_ts"] = time.time()
async def receive_audio(self, ws=None):
    """Forever-running receive loop: parse server messages, dispatch transcripts, queue model audio.

    Waits for a websocket to be attached, survives disconnects by
    detaching and retrying, and drops model audio while suppression is
    active. The ws argument is unused — the loop always reads self._ws.
    """
    while True:
        active_ws = self._ws
        if active_ws is None:
            # Nothing attached yet; poll cheaply.
            await asyncio.sleep(0.05)
            continue
        try:
            msg = await active_ws.recv()
        except asyncio.CancelledError:
            raise
        except websockets.exceptions.ConnectionClosed as e:
            self.detach_ws(reason=f"receive ws closed: {e}")
            await asyncio.sleep(self._component_restart_delay_sec)
            continue
        except Exception as e:
            self.detach_ws(reason=f"receive ws error: {e}")
            await asyncio.sleep(self._component_restart_delay_sec)
            continue
        try:
            response = json.loads(msg)
            server_content = response.get("serverContent", {})
            # NOTE(review): the server's "interrupted" signal CLEARS the local
            # interrupted flag (treated as acknowledgement of a client barge-in,
            # re-enabling processing) — confirm this inversion is intentional.
            if server_content.get("interrupted"):
                self.interrupted = False
            if self.interrupted:
                # Still interrupted client-side: discard everything until cleared.
                continue
            # Safety: map commands from USER transcription events (not model text).
            try:
                transcripts = self._extract_user_transcripts(response, server_content)
                for txt, is_final in transcripts:
                    await self._handle_user_transcript(txt, is_final=bool(is_final))
            except Exception as e:
                self.sanad_logger.print_and_log(f"⚠️ Transcript parse warning: {e}", message_type="warning")
            model_turn = server_content.get("modelTurn")
            if model_turn:
                for part in model_turn.get("parts", []):
                    inline_data = part.get("inlineData")
                    if inline_data and inline_data.get("data"):
                        now = time.time()
                        if self._should_suppress_model_audio(now):
                            # Rate-limit the suppression log to one line per 5 s.
                            if (now - self._last_context_drop_log_ts) >= 5.0:
                                self._last_context_drop_log_ts = now
                                self.sanad_logger.print_and_log(
                                    "🤫 Suppressed unsolicited model audio from context-only turn.",
                                    message_type="info",
                                )
                            continue
                        if not self.speaking:
                            # First audio chunk of a new model turn.
                            self._ai_speaking_since = now
                            self.sanad_logger.print_and_log("🔊 Gemini audio response started.", message_type="info")
                            self.speaking = True
                        self._last_ai_audio_time = now
                        # Keep the echo guard armed while chunks keep arriving.
                        self._ignore_input_until = now + self.ECHO_GUARD_SEC
                        audio_bytes = base64.b64decode(inline_data["data"])
                        self._queue_model_audio(audio_bytes)
        except Exception as e:
            self.sanad_logger.print_and_log(f"❌ Parse Error: {e}", message_type="error")
async def play_audio(self):
    """Forever-running speaker loop: prebuffer, then drain audio_q into the output stream.

    Restarts the PortAudio output stream on any device error. Declares
    the AI done speaking once the queue stays empty past a short
    stale-audio window; barge-in (self.interrupted) pauses playback.
    """
    while True:
        stream = None
        buffered = False
        try:
            self._health["speaker_state"] = "starting"
            self._health["speaker_last_event_ts"] = time.time()
            stream = await asyncio.to_thread(
                self.pya.open,
                format=FORMAT,
                channels=CHANNELS,
                rate=RECEIVE_SAMPLE_RATE,
                output=True,
                frames_per_buffer=self.speaker_frames_per_buffer,
            )
            self._health["speaker_state"] = "running"
            self._health["speaker_last_event_ts"] = time.time()
            while True:
                if self.interrupted:
                    # Barge-in in progress: spin without writing audio.
                    await asyncio.sleep(0.01)
                    continue
                # At the start of an AI turn, accumulate a few chunks before
                # playing to avoid stutter from network jitter.
                if self.speaking and not buffered:
                    while self.audio_q.qsize() < self.PREBUFFER_CHUNKS and self.speaking and not self.interrupted:
                        await asyncio.sleep(0.01)
                    buffered = True
                try:
                    data = await asyncio.wait_for(self.audio_q.get(), timeout=self.PLAYBACK_TIMEOUT)
                except asyncio.TimeoutError:
                    # Queue ran dry; if no fresh audio arrived recently, the turn is over.
                    if self.audio_q.empty() and (time.time() - self._last_ai_audio_time) > 0.25:
                        self.speaking = False
                        buffered = False
                    continue
                if data:
                    # Blocking device write runs off the event loop.
                    await asyncio.to_thread(stream.write, data)
                # Same end-of-turn check after a successful write.
                if self.audio_q.empty() and (time.time() - self._last_ai_audio_time) > 0.25:
                    self.speaking = False
                    buffered = False
        except asyncio.CancelledError:
            raise
        except Exception as e:
            # Device-level failure: record it and retry after a backoff.
            self.sanad_logger.print_and_log(f"❌ Speaker Error: {e} (restarting speaker)", message_type="warning")
            self._health["speaker_state"] = "error"
            self._health["speaker_restarts"] = int(self._health.get("speaker_restarts", 0)) + 1
            self._health["speaker_last_error"] = str(e)
            self._health["speaker_last_event_ts"] = time.time()
            await asyncio.sleep(max(self.device_restart_delay_sec, self._component_restart_delay_sec))
        finally:
            # Best-effort release of the output stream before the next attempt.
            if stream is not None:
                try:
                    await asyncio.to_thread(stream.stop_stream)
                except Exception:
                    pass
                try:
                    await asyncio.to_thread(stream.close)
                except Exception:
                    pass
            if self._health.get("speaker_state") != "error":
                self._health["speaker_state"] = "restarting"
            self._health["speaker_last_event_ts"] = time.time()