Sanadv3/gemini/script.py

1717 lines
81 KiB
Python

"""Gemini brain — live conversation loop using the google-genai SDK.
Implements the VoiceBrain contract documented in `voice/model_script.py`:
__init__(audio_io, recorder, voice_name, system_prompt)
async run()
stop()
Owns everything Gemini-specific: the `genai.Client`, `LiveConnectConfig`,
the session connect/receive loop, VAD-based barge-in, echo suppression,
reconnect backoff. Hardware I/O is delegated to `audio_io` and per-turn
WAV capture to `recorder` — both are model-agnostic.
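Env overrides (the main ones; other SANAD_* knobs are read next to the
module constants they set):
SANAD_GEMINI_MODEL — Gemini Live model id (without "models/" prefix)
SANAD_NAV_TOOLS — set to "0" to disable the navigation tools
SANAD_DASHBOARD_URL — base URL of the dashboard HTTP API used by those tools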
"""
from __future__ import annotations
import array
import asyncio
import base64
import json
import os
import sys
import threading
import time
from pathlib import Path
from typing import Any, Optional
import numpy as np
from google import genai
from google.genai import types
from Project.Sanad.config import (
BASE_DIR,
CHUNK_SIZE,
GEMINI_API_KEY,
GEMINI_VOICE,
RECEIVE_SAMPLE_RATE,
SEND_SAMPLE_RATE,
)
from Project.Sanad.core.config_loader import section as _cfg_section
from Project.Sanad.core.logger import get_logger
from Project.Sanad.vision import recognition_state as _recog_state
log = get_logger("gemini_brain")
# Config sections from the project config, read below via
# .get(key, default):
#   _SV  — general voice-loop settings
#   _VAD — Gemini server-side voice-activity detection (see _build_config)
#   _BI  — local barge-in / echo-suppression tuning (see _send_mic_loop)
_SV = _cfg_section("voice", "sanad_voice")
_VAD = _cfg_section("voice", "vad")
_BI = _cfg_section("voice", "barge_in")
# Gemini Live model id; SANAD_GEMINI_MODEL overrides the default.
_MODEL = os.environ.get(
    "SANAD_GEMINI_MODEL",
    "gemini-2.5-flash-native-audio-preview-12-2025",
)
# Linear gain applied to each mic sample (then clipped to int16) before
# it is analysed and sent.
_MIC_GAIN = _SV.get("mic_gain", 1.0)
# Hard cap (seconds) on one live session; run() then reconnects.
_SESSION_TIMEOUT = _SV.get("session_timeout_sec", 660)
# Upper bound (seconds) on run()'s exponential reconnect backoff.
_MAX_RECONNECT_DELAY = _SV.get("max_reconnect_delay_sec", 30)
# Consecutive session errors after which run() recreates the genai client.
_MAX_CONSECUTIVE_ERRORS = _SV.get("max_consecutive_errors", 10)
# Longest wait (seconds) for the next server message before the receive
# loop declares the session dead and forces a reconnect.
_NO_MESSAGES_TIMEOUT = _SV.get("no_messages_timeout_sec", 30)
# Extra mic-gate time after the AI stops, on loud external-speaker profiles
# (JBL) — covers the speaker buffer + room reverb so it doesn't hear its tail.
_ECHO_TAIL_SEC = _SV.get("echo_tail_sec", 0.6)
# NOTE(review): the four _JBL_* knobs below are not read by any code visible
# in this section — _send_mic_loop currently disables voice barge-in on JBL
# profiles altogether. Confirm whether they are still used before tuning.
# On a loud external speaker (JBL) barge-in must clear the measured speaker
# BLEED by this factor — so the user's own voice cuts the AI but the speaker's
# echo into the mic does not. Lower = easier to interrupt (more false cuts).
_JBL_BLEED_MARGIN = _SV.get("jbl_bleed_margin", 3.0)
# Don't allow barge-in for this long after the AI starts on the JBL — gives the
# bleed estimate time to ramp so an early loud bleed frame can't false-trigger.
_JBL_BARGE_GRACE = _SV.get("jbl_barge_grace_sec", 1.0)
# Sustained loud-frame count required to barge-in on the JBL (vs the shorter
# default). Brief speaker-echo peaks won't reach it; continuous speech will.
_JBL_BARGE_CHUNKS = _SV.get("jbl_barge_chunks", 9)
# Time since the AI last pushed audio after which the speaker bleed is assumed
# FADED (a gap between words/numbers). In that window barge-in drops to a low,
# sensitive bar so the user can interrupt; while audio is flowing it stays high.
_JBL_BLEED_FADE_SEC = _SV.get("jbl_bleed_fade_sec", 0.5)
# Bytes per mic chunk: CHUNK_SIZE int16 samples x 2 bytes each.
_CHUNK_BYTES = CHUNK_SIZE * 2
# One chunk of digital silence, sent in place of paused / echo-gated audio.
_SILENCE_PCM = b"\x00" * _CHUNK_BYTES
# Set by a "pause:1" stdin command (a record is playing on the dashboard).
# While set, the brain feeds Gemini silence (so it neither hears the playback
# nor keeps replying) and drops its own audio output (so the record owns the
# chest speaker). Cleared by "pause:0" when playback ends. threading.Event is
# safe to read from the asyncio loops.
_INPUT_PAUSED = threading.Event()
# ── Recognition (camera + face gallery) tunables ──
# Recognition state file; GeminiBrain.__init__ reads its boot flags from it.
_RECOG_STATE_PATH = Path(os.environ.get(
    "SANAD_RECOGNITION_STATE_PATH",
    str(BASE_DIR / "data" / ".recognition_state.json"),
))
# Frame send rate, frame staleness limit and state-file poll period.
# Presumably consumed by the frame / recognition loops defined later in the
# file (not visible here) — TODO confirm.
_VISION_SEND_HZ = float(os.environ.get("SANAD_VISION_SEND_HZ", "2"))
_VISION_STALE_MS = int(os.environ.get("SANAD_VISION_STALE_MS", "1500"))
_RECOG_POLL_S = float(os.environ.get("SANAD_RECOGNITION_POLL_S", "1.0"))
# Face gallery directory plus per-person sample count and primer image size
# (names suggest pixels — verify against the primer code).
_FACES_DIR = Path(os.environ.get(
    "SANAD_FACES_DIR",
    str(BASE_DIR / "data" / "faces"),
))
_FACES_MAX_SAMPLES = int(os.environ.get("SANAD_FACES_MAX_SAMPLES", "3"))
_FACES_PRIMER_RESIZE = int(os.environ.get("SANAD_FACES_PRIMER_RESIZE", "256"))
# N3 — zones gallery (zone → place → linked faces). Folded into a Gemini
# primer turn so Gemini can recognise / talk about known locations and the
# people associated with them.
_ZONES_DIR = Path(os.environ.get(
    "SANAD_ZONES_DIR",
    str(BASE_DIR / "data" / "zones"),
))
# ── navigation tools (Gemini Live function-calling → Nav2) ────
# Gemini can DRIVE the robot to saved places via native function-calling.
# The handlers call the DASHBOARD HTTP API (not web_nav3 directly) so the
# in-process _arbiter (nav↔loco mutex) + single nav client stay authoritative
# — the Gemini brain runs as a SEPARATE subprocess and cannot touch the
# dashboard's in-memory arbiter, so it must go through HTTP.
try:
    import requests as _requests
except Exception:  # pragma: no cover - requests is in the gemini_sdk env
    _requests = None  # _nav_api then reports reason "no_http_client"
# Nav tools are on unless SANAD_NAV_TOOLS is exactly "0".
_NAV_TOOLS_ENABLED = os.environ.get("SANAD_NAV_TOOLS", "1") != "0"
# Dashboard base URL; the trailing "/" is stripped so API paths starting
# with "/" can be appended directly.
_DASHBOARD_URL = os.environ.get(
    "SANAD_DASHBOARD_URL", "http://127.0.0.1:8001"
).rstrip("/")
def _nav_api(method: str, path: str, body: Optional[dict] = None,
             timeout: float = 12.0) -> dict:
    """Call the dashboard navigation HTTP API and normalise the reply.

    Blocking — run it via ``asyncio.to_thread`` so the receive loop stays
    responsive. ``method == "GET"`` issues a GET; any other value issues a
    POST carrying ``body`` (or ``{}``) as JSON. Never raises: transport
    failures and HTTP error statuses are folded into the returned dict.

    Returns:
        * the decoded JSON object on success;
        * ``{"ok": True, "data": value}`` when a successful body is JSON but
          not an object;
        * ``{"raw": text[:200]}`` when a successful body is not JSON;
        * ``{"ok": False, "reason": ...}`` otherwise, where reason is
          ``"no_http_client"``, ``"unreachable"`` (with ``detail``) or
          ``"http_<status>"`` (with the server's ``detail``, if any).
    """
    http = _requests
    if http is None:
        return {"ok": False, "reason": "no_http_client"}

    target = _DASHBOARD_URL + path
    try:
        if method == "GET":
            reply = http.get(target, timeout=timeout)
        else:
            reply = http.post(target, json=(body or {}), timeout=timeout)
    except Exception as exc:
        return {"ok": False, "reason": "unreachable", "detail": str(exc)[:200]}

    # Decode leniently: a non-JSON body is kept (truncated) under "raw".
    try:
        payload = reply.json()
    except Exception:
        payload = {"raw": (reply.text or "")[:200]}

    if reply.ok:
        if isinstance(payload, dict):
            return payload
        return {"ok": True, "data": payload}

    if isinstance(payload, dict):
        detail = payload.get("detail")
    else:
        detail = payload
    return {"ok": False, "reason": f"http_{reply.status_code}", "detail": detail}
def _nav_function_declarations() -> list:
    """Build the Gemini Live navigation tool declarations.

    Constructed on demand (not at import time) so the google.genai ``types``
    classes are looked up when called. Returns, in order:
    navigate_to_place(place), list_places(), where_am_i(), stop_navigation().
    """
    schema, kind = types.Schema, types.Type

    def no_args():
        # A fresh empty-object schema for tools that take no parameters.
        return schema(type=kind.OBJECT, properties={})

    navigate = types.FunctionDeclaration(
        name="navigate_to_place",
        description=(
            "Drive the robot to a named saved place in the currently loaded "
            "map (for example 'kitchen', 'reception', 'office'). Call this "
            "ONLY when the user asks to go, move, walk, or be taken "
            "somewhere. The place must exist in the active map — if you are "
            "unsure of the name, call list_places first."
        ),
        parameters=schema(
            type=kind.OBJECT,
            properties={
                "place": schema(
                    type=kind.STRING,
                    description="Destination place name, as the user said it.",
                ),
            },
            required=["place"],
        ),
    )
    listing = types.FunctionDeclaration(
        name="list_places",
        description=(
            "List the saved places you can drive to in the currently loaded "
            "map. Use it to tell the user where you can take them."
        ),
        parameters=no_args(),
    )
    status = types.FunctionDeclaration(
        name="where_am_i",
        description=(
            "Report navigation status: which map is loaded and whether the "
            "robot is localized and ready to drive."
        ),
        parameters=no_args(),
    )
    cancel = types.FunctionDeclaration(
        name="stop_navigation",
        description="Cancel the current navigation goal and stop the robot from driving.",
        parameters=no_args(),
    )
    return [navigate, listing, status, cancel]
# Emotions Gemini can show on the LED face (a subset of the mask's frames that
# read as feelings — the talk/blink/gaze frames are driven automatically).
# Used as the enum of set_expression's "emotion" argument, so any emotion the
# prompt or tool description mentions must appear here.
_FACE_EMOTIONS = ("smile", "laugh", "heart", "love", "sad", "surprised",
                  "wink", "angry", "cool", "confused", "kiss", "thumbs_up",
                  "neutral")
# Instagram accounts the mask can show as a QR (parent maps these to the code).
# Used as the enum of show_social's "account" argument.
_SOCIAL_ACCOUNTS = ("bu_sunaidah", "yslootahtech")
# Appended to whatever base system prompt is passed in, so the expressive-face
# behaviour is always present regardless of the user-edited persona.
# Keep its "Available emotions" list in sync with _FACE_EMOTIONS.
_FACE_PROMPT_ADDENDUM = (
    "\n\nYou have an expressive LED face you control with tools. IMPORTANT: when "
    "the user asks you to SHOW or MAKE a specific face/emotion (e.g. 'show me a "
    "smile', 'give me a thumbs up', 'look surprised', 'make a heart'), ALWAYS "
    "call set_expression with that emotion right away. Also use set_expression "
    "naturally as you talk — smile when greeting or happy, laugh at something "
    "funny, heart or love for affection or a compliment, thumbs_up to agree or "
    "approve, surprised when astonished, confused when you didn't understand, "
    "wink when joking, sad when empathizing, cool when playful. Available "
    "emotions: smile, laugh, heart, love, thumbs_up, surprised, confused, wink, "
    "kiss, cool, sad, angry, neutral. Your mouth already lip-syncs on its own, "
    "so this is only the emotion. When the user asks how to follow you, for your "
    "Instagram, or to see/show your social media, ALWAYS call show_social with "
    "'bu_sunaidah' (@bu.sunaidah) or 'yslootahtech' (@yslootahtech) to display "
    "the QR on your face. These tools are silent — never say the tool name, the "
    "emotion, or any bracket marker out loud."
)
def _face_function_declarations() -> list:
    """Build the Gemini Live tools for the expressive LED face + social QR.

    Built lazily so the google.genai ``types`` classes resolve at call time.
    Returns two declarations:
      * set_expression(emotion) — ``emotion`` restricted to _FACE_EMOTIONS;
      * show_social(account)    — ``account`` restricted to _SOCIAL_ACCOUNTS.

    The set_expression description must only suggest emotions that exist in
    _FACE_EMOTIONS. Advertising one outside the enum (e.g. "sleepy") invites
    the model to emit a call the schema rejects.
    """
    S, T = types.Schema, types.Type
    return [
        types.FunctionDeclaration(
            name="set_expression",
            description=(
                "Show an emotion on your LED face to react expressively while you "
                "talk. Use it naturally and sparingly: smile when greeting or happy, "
                "laugh at something funny, heart/love for affection or a compliment, "
                "thumbs_up to agree or approve, surprised when astonished, confused "
                "when you didn't understand, wink when joking, sad when empathizing, "
                "cool when playful, angry only rarely. Your mouth already lip-syncs "
                "on its own — this is ONLY the emotion, not the mouth."
            ),
            parameters=S(type=T.OBJECT, properties={
                "emotion": S(type=T.STRING, enum=list(_FACE_EMOTIONS),
                             description="The emotion to show on the face."),
            }, required=["emotion"]),
        ),
        types.FunctionDeclaration(
            name="show_social",
            description=(
                "Display a social-media QR code on your LED face so a visitor can "
                "scan it and follow. Call this when the user asks how to follow you, "
                "for your Instagram, or to share your social media. Choose the "
                "account: 'bu_sunaidah' (@bu.sunaidah) or 'yslootahtech' "
                "(@yslootahtech)."
            ),
            parameters=S(type=T.OBJECT, properties={
                "account": S(type=T.STRING, enum=list(_SOCIAL_ACCOUNTS),
                             description="Which Instagram account to show."),
            }, required=["account"]),
        ),
    ]
# ── stdin push channel (Marcus pattern) ──────────────────────
# The GeminiSubprocess supervisor writes these line types to this
# process's stdin, one command per line:
#   "frame:<base64-jpeg>\n"   — a camera frame to relay to Gemini Live
#   "state:<json>\n"          — a motion/navigation event to inject as text
#   "profile:<json>\n"        — an audio-profile hot-swap request
#   "pause:1\n" / "pause:0\n" — dashboard record playback started / ended
# A daemon thread (_stdin_watcher) parses them into the caches below; the
# asyncio tasks _send_frame_loop / _send_state_loop / _audio_swap_loop
# drain those caches.
# Newest camera frame only — every "frame:" line overwrites it.
_LATEST_FRAME_LOCK = threading.Lock()
_LATEST_FRAME: dict = {"bytes": None, "ts": 0.0}
# Formatted state messages ("<tag> <cmd> ...") waiting to be injected.
_STATE_LOCK = threading.Lock()
_STATE_PENDING: list[str] = []
# Maps a "state:" payload's lower-cased "event" to the tag that prefixes the
# injected text; events missing from this table are dropped.
_STATE_TAGS = {
    "start": "[STATE-START]",
    "complete": "[STATE-DONE]",
    "interrupted": "[STATE-INTERRUPTED]",
    "error": "[STATE-ERROR]",
    "paused": "[STATE-PAUSED]",
    "resumed": "[STATE-RESUMED]",
    # Navigation arrival/failure — pushed by the dashboard goal monitor so
    # Gemini can truthfully tell the user it arrived (or couldn't get there)
    # instead of guessing from the fire-and-forget goto.
    "nav_arrived": "[NAV ARRIVED]",
    "nav_failed": "[NAV FAILED]",
    "nav_canceled": "[NAV CANCELED]",
    # "Gemini Nav" session greeting — the operator entered a zone bound to a
    # map; tell Gemini the zone + drivable places and to greet the user.
    "nav_zone": "[GEMINI NAV]",
}
# Pending audio-profile swap signalled by the parent over "profile:" stdin
# lines (latest request wins). _audio_swap_loop drains it inside the
# brain's asyncio loop.
_PROFILE_LOCK = threading.Lock()
_PROFILE_PENDING: dict = {"id": None, "reason": ""}
# Profile ids accepted on "profile:" lines; any other id is ignored.
_VALID_PROFILES = {"builtin", "anker", "anker_powerconf",
                   "hollyland_builtin", "jbl_builtin_mic"}
def _parse_json_object(text: str) -> Optional[dict]:
    """Decode ``text`` as JSON and return it only if it is a JSON object.

    Returns None for invalid JSON and for any non-object value (list,
    string, number, ...), so callers can use ``.get`` on the result safely.
    """
    try:
        payload = json.loads(text)
    except (ValueError, RecursionError):
        return None
    return payload if isinstance(payload, dict) else None


def _str_field(payload: dict, key: str) -> str:
    """Return ``payload[key]`` stripped when it is a string, else ``""``."""
    value = payload.get(key)
    return value.strip() if isinstance(value, str) else ""


def _handle_stdin_line(line: str) -> None:
    """Apply one non-empty stdin command line (line terminator removed).

    Recognised prefixes: ``frame:`` (base64 JPEG), ``state:`` (JSON motion /
    navigation event), ``profile:`` (JSON audio-profile hot-swap request) and
    ``pause:`` (record-playback pause flag). Lines without a colon, unknown
    prefixes and malformed payloads are ignored.
    """
    kind, sep, body = line.partition(":")
    if not sep:
        return
    if kind == "frame":
        try:
            data = base64.b64decode(body)
        except ValueError:  # binascii.Error is a ValueError subclass
            return
        if data:
            with _LATEST_FRAME_LOCK:
                _LATEST_FRAME["bytes"] = data
                _LATEST_FRAME["ts"] = time.time()
    elif kind == "state":
        payload = _parse_json_object(body)
        if payload is None:
            return
        event = _str_field(payload, "event").lower()
        cmd = _str_field(payload, "cmd")
        tag = _STATE_TAGS.get(event)
        if not tag or not cmd:
            return
        msg = f"{tag} {cmd}"
        elapsed = payload.get("elapsed_sec")
        if isinstance(elapsed, (int, float)) and not isinstance(elapsed, bool):
            msg += f" ({float(elapsed):.1f}s)"
        reason = payload.get("reason")
        if reason and event == "error":
            # Separate the reason from the command / elapsed suffix.
            msg += f": {reason}"
        with _STATE_LOCK:
            _STATE_PENDING.append(msg)
    elif kind == "profile":
        # Parent signals an audio-profile hot-swap. Stash the target;
        # _audio_swap_loop (asyncio task) handles the actual swap so
        # PyAudio open/close happens off the stdin thread.
        payload = _parse_json_object(body)
        if payload is None:
            return
        pid = _str_field(payload, "id").lower()
        if pid not in _VALID_PROFILES:
            return
        with _PROFILE_LOCK:
            _PROFILE_PENDING["id"] = pid
            _PROFILE_PENDING["reason"] = _str_field(payload, "reason")
    elif kind == "pause":
        # Dashboard record playback — pause/resume the live interaction.
        if body.strip().lower() in ("1", "true", "on"):
            if not _INPUT_PAUSED.is_set():
                _INPUT_PAUSED.set()
                log.info("input PAUSED — record playback")
        elif _INPUT_PAUSED.is_set():
            _INPUT_PAUSED.clear()
            log.info("input RESUMED — record playback ended")


def _stdin_watcher() -> None:
    """Daemon thread — apply 'frame:' / 'state:' / 'profile:' / 'pause:'
    lines read from stdin until the parent closes it.

    A malformed or unexpected line is skipped (and logged if it raised);
    it never stops the thread. Returns on EOF (subprocess teardown) or when
    stdin can no longer be read.
    """
    while True:
        # IMPORTANT: read with readline(), NOT `for line in sys.stdin`. The file
        # iterator does aggressive read-ahead buffering, so on an idle pipe a
        # small command like "pause:1\n" can sit unread for SECONDS (until more
        # stdin data arrives to flush the read-ahead). That delayed the record-
        # playback pause by ~2s — Gemini kept stomping the chest speaker so the
        # clip was silent / late. readline() returns each line as soon as its
        # newline arrives, so commands are delivered promptly.
        try:
            line = sys.stdin.readline()
        except (OSError, ValueError, AttributeError):
            return  # stdin closed / detached — no more commands can arrive
        if line == "":
            return  # EOF — parent closed our stdin (subprocess teardown)
        line = line.rstrip("\r\n")
        if not line:
            continue
        try:
            _handle_stdin_line(line)
        except Exception as exc:
            # One bad command must not take down the whole command channel.
            log.warning("stdin: skipped bad %r line: %s",
                        line.partition(":")[0][:16], exc)
# Start the watcher at import time — it blocks harmlessly on sys.stdin
# until the supervisor sends something. Daemon so it never blocks exit.
# Side effect: importing this module starts consuming the process's stdin,
# so nothing else in the process should read from it.
threading.Thread(target=_stdin_watcher, daemon=True, name="stdin-watcher").start()
def _audio_energy(pcm: bytes) -> int:
    """Return the mean absolute amplitude of native-endian int16 PCM.

    A cheap loudness measure used for barge-in, echo suppression and
    recording decisions on every mic chunk, so it is vectorised with numpy.

    Args:
        pcm: bytes-like buffer of int16 samples.

    Returns:
        The floor of the mean |sample|, as a Python int. Returns 0 for an
        empty buffer or a non-buffer argument. A trailing odd byte (half a
        sample) is ignored rather than discarding the whole chunk.
    """
    try:
        raw = np.frombuffer(pcm, dtype=np.uint8)
    except (TypeError, ValueError):
        return 0
    usable = raw.size - raw.size % 2
    if usable == 0:
        return 0
    # Widen before abs(): abs(int16(-32768)) overflows in int16.
    samples = raw[:usable].view(np.int16).astype(np.int64)
    return int(np.abs(samples).sum() // samples.size)
class GeminiBrain:
"""Gemini Live conversation brain — reconnect-safe."""
def __init__(self, audio_io, recorder, voice_name: Optional[str] = None,
             system_prompt: str = ""):
    """Bind the brain to its audio backend and recorder; load boot flags.

    Args:
        audio_io: audio backend exposing ``mic`` and ``speaker``; its
            optional ``_audio_client`` and ``profile_id`` attributes are
            picked up when present.
        recorder: per-turn capture sink; its ``enabled`` attribute is set
            from the recognition-state ``record_enabled`` flag (best effort).
        voice_name: Gemini prebuilt voice; falls back to GEMINI_VOICE.
        system_prompt: base persona; _FACE_PROMPT_ADDENDUM is always
            appended to it.
    """
    def env_on(name: str) -> bool:
        # SANAD_* boolean env flags count as set only when exactly "1".
        return os.environ.get(name, "0") == "1"

    # ── audio plumbing ──
    self._audio = audio_io
    self._mic = audio_io.mic
    self._speaker = audio_io.speaker
    # Kept on the brain so swap_audio_devices() can rebuild profiles that
    # need DDS (`builtin`, `hollyland_builtin`) without re-init'ing.
    self._audio_client = getattr(audio_io, "_audio_client", None)
    # Active profile id (later driven by the parent's "profile:" pushes):
    # whatever audio_io was built with, else the SANAD_AUDIO_PROFILE env
    # value (default "builtin"), normalised.
    self._current_profile_id = (
        getattr(audio_io, "profile_id", None)
        or os.environ.get("SANAD_AUDIO_PROFILE", "builtin").strip().lower()
    )
    # Serialises profile swaps so two pending changes don't interleave
    # mid-tear-down; run() builds it on the active asyncio loop.
    self._swap_lock: Optional[asyncio.Lock] = None

    # ── model / persona ──
    self._recorder = recorder
    self._voice = voice_name or GEMINI_VOICE
    self._system_prompt = (system_prompt or "") + _FACE_PROMPT_ADDENDUM
    self._api_key = GEMINI_API_KEY
    self._stop_flag = asyncio.Event()

    # ── per-turn state (run() resets it before every session) ──
    self._speaking = False
    self._stream_started = False
    self._barge_block_until = 0.0
    self._ai_speak_start = 0.0
    self._last_ai_audio = 0.0
    # Rolling estimate of the speaker bleed picked up by the mic while the
    # AI talks (JBL profile).
    self._bleed_ewma = 0.0
    self._done: Optional[asyncio.Event] = None

    # ── recognition / feature flags: state file first, env as fallback ──
    # _recognition_state_watcher keeps these in sync at runtime.
    boot = _recog_state.read(_RECOG_STATE_PATH)
    self._vision_enabled = bool(
        boot.vision_enabled or env_on("SANAD_VISION_ENABLE"))
    self._face_rec_enabled = bool(
        boot.face_rec_enabled or env_on("SANAD_FACE_RECOGNITION_ENABLE"))
    self._gallery_version_primed = -1  # bumped after the first primer
    # N3 — zones knowledge toggle + primer version tracking.
    self._zone_rec_enabled = bool(
        boot.zone_rec_enabled or env_on("SANAD_ZONE_RECOGNITION_ENABLE"))
    self._zones_version_primed = -1
    # "Go here" destination already announced this session (zone, place).
    self._nav_target = (
        int(boot.nav_target_zone_id),
        int(boot.nav_target_place_id),
    )
    # N2 — Gemini-driven locomotion enable gate (announce only; dispatch
    # lives in the parent).
    self._movement_enabled = bool(
        boot.movement_enabled or env_on("SANAD_MOVEMENT_ENABLE"))

    # Auto-record: the state file is the source of truth; mirror it onto
    # the recorder now (the watcher keeps it in sync later).
    self._record_enabled = bool(boot.record_enabled)
    try:
        self._recorder.enabled = self._record_enabled
    except Exception:
        pass  # recorder without a settable `enabled` — leave it as is
def stop(self) -> None:
    """Ask run() to wind down at its next check of the stop flag.

    Best effort: any error while setting the flag (including a missing
    ``_stop_flag`` attribute) is swallowed, so this is always safe to call.
    """
    try:
        flag = self._stop_flag
        flag.set()
    except Exception:
        return
# ─── public entry point ───────────────────────────────
async def run(self) -> None:
    """Run Gemini Live sessions back to back until stop() is called.

    Each iteration opens one live session and runs its worker loops (mic,
    receive, camera frames, state injection, recognition watcher, audio
    profile swaps) until the session ends or hits _SESSION_TIMEOUT, then
    reconnects. Connection/session errors back off exponentially (capped at
    _MAX_RECONNECT_DELAY); after _MAX_CONSECUTIVE_ERRORS in a row the genai
    client is recreated. Cancelling the task running run() stops the loop
    and run() returns normally. While a session is active, this needs
    Task.cancelling(), i.e. Python 3.11+; older Pythons keep the previous
    reconnect-on-cancel behaviour there.
    """
    client = genai.Client(api_key=self._api_key)
    config = self._build_config()
    session_num = 0
    start_time = time.time()
    consecutive_errors = 0
    while not self._stop_flag.is_set():
        session_num += 1
        self._reset_turn_state()
        # On a reconnect (not the first session), suppress the unprompted
        # re-greeting until the user speaks — keeps the chest speaker free
        # for record playback and stops the "robot greets every 30s" loop.
        self._suppress_greeting = session_num > 1
        uptime_min = (time.time() - start_time) / 60
        try:
            log.info("connecting to Gemini (session #%d, uptime %.0fm)...",
                     session_num, uptime_min)
            async with client.aio.live.connect(model=_MODEL, config=config) as session:
                log.info("connected — speak anytime!")
                consecutive_errors = 0
                self._mic.flush()
                self._done = asyncio.Event()
                # Reset per-session primer state so re-priming on reconnect
                # actually happens. The state watcher will re-prime as soon
                # as it sees vision+face-rec (and place-rec) enabled.
                self._gallery_version_primed = -1
                self._zones_version_primed = -1
                # Re-announce the active destination on reconnect.
                self._nav_target = (-1, -1)
                # Lazy-build the swap lock on the active asyncio loop.
                if self._swap_lock is None:
                    self._swap_lock = asyncio.Lock()
                # Run the workers as explicit tasks so they can be reaped
                # below: gather() does NOT cancel the siblings when one
                # worker raises. A survivor would keep using this closed
                # session and, once the next session swaps in a new
                # self._done, could end that session too.
                workers = [
                    asyncio.create_task(coro)
                    for coro in (
                        self._send_mic_loop(session),
                        self._receive_loop(session),
                        self._send_frame_loop(session),
                        self._send_state_loop(session),
                        self._recognition_state_watcher(session),
                        self._audio_swap_loop(session),
                    )
                ]
                try:
                    await asyncio.wait_for(
                        asyncio.gather(*workers),
                        timeout=_SESSION_TIMEOUT,
                    )
                except asyncio.TimeoutError:
                    log.warning("session timed out after %ds", _SESSION_TIMEOUT)
                except asyncio.CancelledError:
                    # A worker cancelled inside the session only ends this
                    # session. If the task running run() itself is being
                    # cancelled (Task.cancelling(), Python 3.11+), propagate
                    # so the outer handler stops instead of reconnecting.
                    current = asyncio.current_task()
                    cancelling = getattr(current, "cancelling", None)
                    if cancelling is not None and cancelling():
                        raise
                    log.warning("session cancelled")
                finally:
                    self._done.set()
                    for worker in workers:
                        if not worker.done():
                            worker.cancel()
                    await asyncio.gather(*workers, return_exceptions=True)
                log.info("session #%d ended — reconnecting in 1s", session_num)
                self._speaker.stop()
                self._mic.flush()
                await asyncio.sleep(1)
        except asyncio.CancelledError:
            log.info("cancelled — stopping")
            break
        except KeyboardInterrupt:
            log.info("keyboard interrupt — stopping")
            break
        except Exception as exc:
            consecutive_errors += 1
            delay = min(_MAX_RECONNECT_DELAY, 2 ** consecutive_errors)
            log.error("session error (#%d): %s — reconnecting in %ds",
                      consecutive_errors, exc, delay)
            await asyncio.sleep(delay)
            if consecutive_errors >= _MAX_CONSECUTIVE_ERRORS:
                log.warning("%d consecutive errors — recreating client",
                            consecutive_errors)
                try:
                    client = genai.Client(api_key=self._api_key)
                    consecutive_errors = 0
                except Exception as ce:
                    log.error("client recreation failed: %s", ce)
# ─── Gemini config ────────────────────────────────────
def _build_config(self) -> types.LiveConnectConfig:
    """Build the LiveConnectConfig shared by every session.

    Audio-only responses in the configured prebuilt voice; server-side VAD
    tuned from the ``voice.vad`` config section; input/output transcription
    (the source of the USER:/BOT: log lines); the system prompt; and the
    native function-calling tools. The tools are navigation (unless
    SANAD_NAV_TOOLS=0) plus the always-on expressive-face / social-QR tools.
    """
    def vad_sensitivity(enum_cls: Any, key: str, default: str) -> Any:
        # Resolve a configured sensitivity name (e.g. "START_SENSITIVITY_HIGH")
        # to its enum member. A typo in config falls back to the default with
        # a warning instead of raising: run() builds the config before its
        # reconnect loop, so an exception here would kill the whole brain.
        name = _VAD.get(key, default)
        member = getattr(enum_cls, str(name), None)
        if member is None:
            log.warning("unknown VAD %s %r — using %s", key, name, default)
            member = getattr(enum_cls, default)
        return member

    # Native function-calling: nav tools (if enabled) + the always-on
    # expressive-face / social-QR tools (set_expression / show_social).
    nav_tools = _nav_function_declarations() if _NAV_TOOLS_ENABLED else []
    tool_declarations = nav_tools + _face_function_declarations()

    return types.LiveConnectConfig(
        response_modalities=["AUDIO"],
        speech_config=types.SpeechConfig(
            voice_config=types.VoiceConfig(
                prebuilt_voice_config=types.PrebuiltVoiceConfig(
                    voice_name=self._voice,
                ),
            ),
        ),
        realtime_input_config=types.RealtimeInputConfig(
            automatic_activity_detection=types.AutomaticActivityDetection(
                disabled=False,
                start_of_speech_sensitivity=vad_sensitivity(
                    types.StartSensitivity, "start_sensitivity",
                    "START_SENSITIVITY_HIGH",
                ),
                end_of_speech_sensitivity=vad_sensitivity(
                    types.EndSensitivity, "end_sensitivity",
                    "END_SENSITIVITY_LOW",
                ),
                prefix_padding_ms=_VAD.get("prefix_padding_ms", 20),
                silence_duration_ms=_VAD.get("silence_duration_ms", 200),
            ),
        ),
        input_audio_transcription=types.AudioTranscriptionConfig(),
        output_audio_transcription=types.AudioTranscriptionConfig(),
        system_instruction=types.Content(
            parts=[types.Part(text=self._system_prompt)],
        ),
        tools=[types.Tool(function_declarations=tool_declarations)],
    )
# ─── state helpers ────────────────────────────────────
def _reset_turn_state(self) -> None:
    """Return the per-turn speaking / barge-in bookkeeping to idle.

    Called by run() before each session. Also clears the reconnect-greeting
    suppression flag, which run() re-arms for every session after the first.
    """
    # Nothing is playing and no speaker stream is open.
    self._speaking = self._stream_started = False
    # Barge-in cooldown, AI-turn start time and last-AI-audio timestamp all
    # restart from zero, as does the rolling speaker-bleed estimate (JBL).
    self._barge_block_until = self._ai_speak_start = 0.0
    self._last_ai_audio = self._bleed_ewma = 0.0
    # Suppress the unprompted greeting on a RECONNECT (set per-session in
    # run() for session_num>1). The idle watchdog reconnects every ~30s when
    # no one talks, and a fresh session greets each time, which floods the
    # shared chest speaker and stomps record playback. That greeting's audio
    # is dropped until the user actually speaks. Default False so the FIRST
    # session (startup) greets normally.
    self._suppress_greeting = False
def _interrupt(self, source: str = "local") -> None:
    """Cut the AI's current turn short.

    Clears the speaking/stream flags, stops speaker playback, flushes the
    buffered mic audio, closes the recorder's turn, then logs ``source``
    (e.g. "barge-in" or "gemini") as the reason.
    """
    self._speaking = self._stream_started = False
    # Order matters: silence the speaker first, then drop whatever the mic
    # buffered during playback, then finalise the recorded turn.
    self._speaker.stop()
    self._mic.flush()
    self._recorder.finish_turn()
    log.info("interrupt (%s)", source)
# ─── mic send loop ────────────────────────────────────
async def _send_mic_loop(self, session: Any) -> None:
    """Stream mic audio to Gemini for as long as the session lives.

    Per chunk:
      1. Read from the mic. read_chunk blocks, so it runs in the default
         executor.
      2. Apply _MIC_GAIN.
      3. Run local barge-in detection.
      4. Pick what to send: the real audio, or silence while input is
         paused or echo-gated.
      5. Send it as realtime PCM.

    Ends when ``self._done`` or the stop flag is set, or when a mic read
    fails. A failed send also ends it and sets ``self._done``.
    """
    threshold = _BI.get("threshold", 500)
    chunks_needed = _BI.get("loud_chunks_needed", 3)
    cooldown = _BI.get("cooldown_sec", 0.3)
    echo_suppress_below = _BI.get("echo_suppress_below", 500)
    grace = _BI.get("ai_speak_grace_sec", 0.15)
    loop = asyncio.get_running_loop()
    loud_count = 0
    last_activity = time.time()
    while not self._done.is_set() and not self._stop_flag.is_set():
        try:
            raw = await loop.run_in_executor(
                None, self._mic.read_chunk, _CHUNK_BYTES,
            )
        except Exception as exc:
            log.warning("mic read failed: %s — mic loop stopping", exc)
            break
        samples = np.frombuffer(raw, dtype=np.int16).astype(np.float32)
        samples = np.clip(samples * _MIC_GAIN, -32768, 32767).astype(np.int16)
        data = samples.tobytes()
        energy = _audio_energy(data)
        now = time.time()
        # On the JBL (loud external speaker) the head mic hears the robot's
        # OWN voice as loud as the user. We FULLY gate the mic to Gemini while
        # it speaks (+ a short echo tail) so it NEVER hears itself, and we
        # DISABLE voice barge-in there — the bleed is as loud as your voice, so
        # energy can't separate them and any attempt leaks the echo back to the
        # model. (Reliable JBL interrupt needs AEC; the only PulseAudio mic is
        # dead, so that's separate work.) The chest speaker (builtin) keeps
        # light quiet-frame suppression + working barge-in (firmware AEC).
        full_gate = "jbl" in (self._current_profile_id or "")
        if not self._speaking:
            # No AI turn in progress: start the next one from a clean count.
            # Otherwise loud frames left over from the previous turn would
            # let the next barge-in fire on its very first loud frame.
            loud_count = 0
        elif not full_gate and now >= self._barge_block_until:
            # Barge-in: sustained user energy cuts the AI — chest profile only.
            if (now - self._ai_speak_start) >= grace:
                if energy > threshold:
                    loud_count += 1
                else:
                    loud_count = max(0, loud_count - 1)
                if loud_count > chunks_needed:
                    log.info("BARGE-IN (e=%d)", energy)
                    self._interrupt("barge-in")
                    loud_count = 0
                    self._barge_block_until = now + cooldown
        # Echo suppression: mask the mic so the model doesn't hear its own bleed.
        send_data = data
        if _INPUT_PAUSED.is_set():
            # Paused for a record playback — feed silence so Gemini neither
            # hears the record nor keeps talking over it.
            send_data = _SILENCE_PCM
        elif full_gate and (self._speaking
                            or (now - self._last_ai_audio) < _ECHO_TAIL_SEC):
            # Loud external speaker: gate ALL frames while speaking + tail —
            # this is what guarantees it never hears itself.
            send_data = _SILENCE_PCM
        elif self._speaking and energy < echo_suppress_below:
            send_data = _SILENCE_PCM
        # Record user audio when clearly speaking and AI isn't.
        if energy > 250 and not self._speaking:
            self._recorder.capture_user(data)
        # Keep-alive watchdog: log a heartbeat after 10s without speech.
        if energy > 250:
            last_activity = now
        elif now - last_activity > 10:
            log.info("alive (no speech %.0fs, e=%d)",
                     now - last_activity, energy)
            last_activity = now
        try:
            await session.send_realtime_input(
                audio=types.Blob(
                    data=send_data,
                    mime_type=f"audio/pcm;rate={SEND_SAMPLE_RATE}",
                ),
            )
        except asyncio.CancelledError:
            return
        except Exception as exc:
            log.warning("mic send failed: %s — ending session", exc)
            self._done.set()
            return
        await asyncio.sleep(CHUNK_SIZE / SEND_SAMPLE_RATE)
    log.info("send_mic task ended")
# ─── receive loop ─────────────────────────────────────
async def _receive_loop(self, session: Any) -> None:
    """Consume server messages for this session and play Gemini's audio.

    Handles, per message:
      * go_away — ends the session;
      * tool calls — dispatched to _handle_tool_calls;
      * server-side interruptions;
      * input/output transcriptions — logged as "USER:" / "BOT:" and fed to
        the recorder;
      * model audio — streamed to the speaker, with throttled
        "[[MOUTH:n]]" lip-sync markers logged;
      * turn completion.

    Model audio is dropped while record playback has paused input, or while
    the reconnect greeting is suppressed. The loop ends, always setting
    ``self._done``, on go_away, on a _NO_MESSAGES_TIMEOUT stall, on any
    receive error, or when the session/brain is stopped.
    """
    loop = asyncio.get_event_loop()
    try:
        while not self._done.is_set() and not self._stop_flag.is_set():
            # Iterate session.receive() with a PER-MESSAGE timeout. A plain
            # `async for` parks inside the generator on a silent/half-open
            # stall (server stops sending but never closes the socket), so
            # the no-message watchdog below — which only ran after the
            # async-for ended a cycle — could not fire, and recovery waited
            # out the 660s outer session cap. Driving __anext__ under
            # wait_for(_NO_MESSAGES_TIMEOUT) detects a stall in ~Ns.
            _recv_agen = session.receive()
            _recv_it = _recv_agen.__aiter__()
            _stalled = False
            try:
                while True:
                    try:
                        response = await asyncio.wait_for(
                            _recv_it.__anext__(),
                            timeout=_NO_MESSAGES_TIMEOUT,
                        )
                    except StopAsyncIteration:
                        break  # generator exhausted — same as async-for end
                    except asyncio.TimeoutError:
                        _stalled = True
                        break
                    if self._done.is_set():
                        break
                    if hasattr(response, "go_away") and response.go_away is not None:
                        log.info("server going away — will reconnect")
                        self._done.set()
                        return
                    # Native function-calling: Gemini asks us to run a tool
                    # (navigation / face). Handle it + reply, then continue —
                    # a tool_call message carries no server_content to process.
                    tc = getattr(response, "tool_call", None)
                    if tc is not None and getattr(tc, "function_calls", None):
                        await self._handle_tool_calls(session, tc.function_calls)
                        continue
                    sc = response.server_content
                    if sc is None:
                        continue
                    if sc.interrupted is True:
                        if self._speaking:
                            log.info("Gemini interrupted")
                            self._interrupt("gemini")
                        continue
                    if sc.input_transcription:
                        text = (sc.input_transcription.text or "").strip()
                        if text and not self._speaking:
                            log.info("USER: %s", text)
                            self._recorder.add_user_text(text)
                            # The user actually said something (real
                            # transcription, not mic noise) → stop suppressing
                            # the reconnect greeting so Gemini's reply is heard.
                            if self._suppress_greeting:
                                self._suppress_greeting = False
                                log.info("reconnect greeting suppression "
                                         "lifted — user spoke")
                    if sc.output_transcription:
                        text = (sc.output_transcription.text or "").strip()
                        if text:
                            # Emit as "BOT:" (no space before colon) so the
                            # supervisor's _track_line can parse it the same
                            # way it parses "USER:" — this is the channel the
                            # movement dispatcher (N2) reads Gemini's own
                            # spoken phrases from. Keep in lock-step with
                            # GeminiSubprocess._track_line.
                            log.info("BOT: %s", text)
                            self._recorder.add_robot_text(text)
                    if sc.model_turn:
                        # `parts` is Optional in the SDK; a model turn with
                        # no parts must not raise and end the whole session.
                        for part in sc.model_turn.parts or ():
                            if part.inline_data and part.inline_data.data:
                                if _INPUT_PAUSED.is_set() or self._suppress_greeting:
                                    # Drop Gemini's audio AND halt any in-flight
                                    # stream at the source. Two cases: (1) a record
                                    # is playing (_INPUT_PAUSED) — Gemini's per-chunk
                                    # PlayStream("sanad") must not stomp the record on
                                    # the shared chest speaker; (2) this is a reconnect
                                    # and the user hasn't spoken — drop the unprompted
                                    # re-greeting. Gated on _stream_started so STOP
                                    # fires once (not per chunk); the next turn's
                                    # begin_stream() clears the stop-flag and resumes.
                                    if self._stream_started:
                                        await loop.run_in_executor(
                                            None, self._speaker.stop)
                                        self._stream_started = False
                                    self._speaking = False
                                    continue
                                now = time.time()
                                if not self._speaking:
                                    self._ai_speak_start = now
                                self._speaking = True
                                self._last_ai_audio = now
                                raw_audio = part.inline_data.data
                                self._recorder.capture_robot(raw_audio)
                                audio = np.frombuffer(raw_audio, dtype=np.int16)
                                if not self._stream_started:
                                    await loop.run_in_executor(
                                        None, self._speaker.begin_stream,
                                    )
                                    self._stream_started = True
                                await loop.run_in_executor(
                                    None, self._speaker.send_chunk,
                                    audio, RECEIVE_SAMPLE_RATE,
                                )
                                # Lip-sync marker for the LED face mask: emit the
                                # mouth-open level (0..3) from this chunk's RMS,
                                # throttled. Parsed by GeminiSubprocess._reader_loop.
                                _mnow = time.time()
                                if _mnow - getattr(self, "_mouth_t", 0.0) >= 0.08:
                                    _rms = (float(np.sqrt(np.mean(
                                        audio.astype(np.float32) ** 2))) if audio.size else 0.0)
                                    # Lower thresholds bias the mouth more open
                                    # so lip-sync reads strongly (vs. barely moving).
                                    _lvl = (0 if _rms < 140 else 1 if _rms < 650
                                            else 2 if _rms < 1700 else 3)
                                    if (_lvl != getattr(self, "_mouth_lvl", -1)
                                            or _mnow - getattr(self, "_mouth_t", 0.0) >= 0.2):
                                        self._mouth_t = _mnow
                                        self._mouth_lvl = _lvl
                                        log.info("[[MOUTH:%d]]", _lvl)
                    if sc.turn_complete:
                        if (self._speaking and self._stream_started
                                and not self._speaker.interrupted):
                            log.info("speaker %.1fs", self._speaker.total_sent_sec)
                            await loop.run_in_executor(
                                None, self._speaker.wait_finish,
                            )
                        elif self._speaking and self._speaker.interrupted:
                            log.info("speaker interrupted")
                        self._speaking = False
                        self._stream_started = False
                        if getattr(self, "_mouth_lvl", 0) != 0:
                            self._mouth_lvl = 0
                            log.info("[[MOUTH:0]]")  # close the LED-mask mouth
                        self._mic.flush()
                        self._recorder.finish_turn()
                        log.info("listening")
            finally:
                # Close the per-cycle receive generator so a stall/break
                # doesn't leak it (the old `async for` closed it for us).
                try:
                    await _recv_agen.aclose()
                except Exception:
                    pass
            if _stalled:
                log.warning("no messages from Gemini for %ds — session dead",
                            _NO_MESSAGES_TIMEOUT)
                break
            await asyncio.sleep(0.1)
    except Exception as exc:
        log.warning("receive ended: %s", exc)
    finally:
        self._done.set()
# ─── vision-state announcer ───────────────────────────
# Injects the camera state into the live session as text context.
# On a live toggle Gemini is told to say so out loud ("I can see you
# now" / "I can't see you anymore"); at session start it's silent
# standing context so "can you see me?" is answered honestly.
async def _announce_vision_state(self, session: Any, enabled: bool,
                                 is_toggle: bool) -> None:
    """Inject the camera (vision) state into the live session as text.

    Args:
        session: live Gemini session; only ``send_realtime_input`` is used.
        enabled: whether the camera is currently on.
        is_toggle: True when the operator just flipped the camera, in which
            case the text asks Gemini to say out loud that it can / can no
            longer see the user. False at session start, where the text is
            silent standing context so "can you see me?" is answered
            honestly.

    Send failures are logged and swallowed; cancellation propagates.
    """
    scripts = {
        # (is_toggle, enabled) -> text injected into the session
        (True, True): (
            "[VISION ON] Your camera was just enabled — you can now see "
            "the user through it. Briefly tell them you can see them now, "
            "in your normal Khaleeji style (for example: "
            "'هلا، الحين أشوفك زين')."
        ),
        (True, False): (
            "[VISION OFF] Your camera was just disabled — you can no "
            "longer see anything. Briefly tell the user you can't see "
            "them anymore. If they later ask whether you can see them, "
            "tell them to enable the camera from the dashboard."
        ),
        (False, True): (
            "[VISION STATUS] Your camera is ON — you can see the user "
            "through it. Do not announce this unprompted; just answer "
            "naturally if they ask what you see."
        ),
        (False, False): (
            "[VISION STATUS] Your camera is OFF — you cannot see anything "
            "right now. If the user asks whether you can see them, tell "
            "them to enable the camera from the dashboard. Do not announce "
            "this unprompted."
        ),
    }
    text = scripts[(bool(is_toggle), bool(enabled))]
    try:
        await session.send_realtime_input(text=text)
        log.info("vision-state injected (enabled=%s, toggle=%s)",
                 enabled, is_toggle)
    except asyncio.CancelledError:
        raise
    except Exception as exc:
        log.warning("vision-state inject failed: %s", exc)
# ─── face-recognition-state announcer ─────────────────
# Same idea as _announce_vision_state, for the face-recognition toggle.
# On a live OFF toggle it also tells Gemini to disregard the gallery —
# so OFF takes effect immediately instead of lingering until reconnect.
async def _announce_facerec_state(self, session: Any, enabled: bool,
                                  is_toggle: bool) -> None:
    """Tell the live session whether face recognition is on or off.

    ``is_toggle=True`` means the operator just flipped the switch, so Gemini
    is asked to say so out loud. On OFF it is also told to disregard the
    face gallery it was primed with, so OFF takes effect immediately.
    ``is_toggle=False`` (session start) sends silent standing context.
    Send errors are logged, not raised; cancellation still propagates.
    """
    if is_toggle:
        if enabled:
            message = (
                "[FACE RECOGNITION ON] Face recognition was just enabled — "
                "you'll be shown the people you know in a moment. Briefly "
                "tell the user you can now recognise the people you know, in "
                "your normal Khaleeji style."
            )
        else:
            message = (
                "[FACE RECOGNITION OFF] Face recognition was just disabled. "
                "Disregard the face gallery you were given earlier — stop "
                "greeting people by name and do not identify anyone. Briefly "
                "tell the user you'll no longer recognise faces."
            )
    elif enabled:
        # Session start with face recognition already on.
        message = (
            "[FACE RECOGNITION STATUS] Face recognition is ON — when you "
            "see someone you've been shown in the gallery, greet them by "
            "name. Do not announce this unprompted."
        )
    else:
        # Session start with face recognition off.
        message = (
            "[FACE RECOGNITION STATUS] Face recognition is OFF — you "
            "cannot identify people. If the user asks who someone is or "
            "whether you recognise them, tell them to enable face "
            "recognition from the dashboard. Do not announce this "
            "unprompted."
        )

    try:
        await session.send_realtime_input(text=message)
        log.info("face-rec-state injected (enabled=%s, toggle=%s)",
                 enabled, is_toggle)
    except asyncio.CancelledError:
        raise
    except Exception as exc:
        log.warning("face-rec-state inject failed: %s", exc)
# ─── place-recognition-state announcer (N3) ───────────
# Same idea as _announce_facerec_state, for the places-gallery toggle.
# On a live OFF toggle it also tells Gemini to disregard the places it
# was given so OFF takes effect immediately instead of lingering.
async def _announce_zonerec_state(self, session: Any, enabled: bool,
                                  is_toggle: bool) -> None:
    """Tell the live session whether zone (place) recognition is on or off.

    Args:
        session: live Gemini session; only ``send_realtime_input`` is used.
        enabled: current zone-recognition flag.
        is_toggle: True for a live operator toggle, in which case Gemini is
            asked to acknowledge it out loud. On OFF it is also told to
            disregard the zones it was given. False at session start,
            where the text is silent standing context.

    The ON text says the zones are coming "in a moment": on a live toggle
    the recognition watcher sends this announcement BEFORE the zones
    primer, so Gemini must not be told it already has them.

    Send failures are logged and swallowed; cancellation propagates.
    """
    if is_toggle and enabled:
        text = (
            "[ZONE RECOGNITION ON] Zone recognition was just enabled — "
            "you'll be given the zones and places you know (and the people "
            "associated with them) in a moment. Briefly tell the user you "
            "now know your way around, in your normal Khaleeji style."
        )
    elif is_toggle and not enabled:
        text = (
            "[ZONE RECOGNITION OFF] Zone recognition was just disabled. "
            "Disregard the zones and places you were given earlier — stop "
            "naming rooms or locations. Briefly tell the user you'll no "
            "longer recognise places."
        )
    elif enabled:  # session start, zone rec already on
        text = (
            "[ZONE RECOGNITION STATUS] Zone recognition is ON — when you see "
            "or are asked about a zone/place you've been told about, you may "
            "name it and use its description. Do not announce this "
            "unprompted."
        )
    else:  # session start, zone rec off
        text = (
            "[ZONE RECOGNITION STATUS] Zone recognition is OFF — you do not "
            "know any specific zones or places. If the user asks where they "
            "are or to go somewhere by name, tell them to enable zone "
            "recognition from the dashboard. Do not announce this "
            "unprompted."
        )
    try:
        await session.send_realtime_input(text=text)
        log.info("zone-rec-state injected (enabled=%s, toggle=%s)",
                 enabled, is_toggle)
    except asyncio.CancelledError:
        raise
    except Exception as exc:
        log.warning("zone-rec-state inject failed: %s", exc)
# ─── navigation-target announcer (N3 "go here") ───────
# When the operator sets a destination, tell Gemini which place to go to
# and show it the reference photo(s). Actual robot motion is wired by N2;
# this establishes the goal + visual reference.
async def _announce_nav_target(self, session: Any,
                               zone_id: int, place_id: int) -> None:
    """Tell Gemini about the operator's "go here" destination.

    A falsy ``zone_id`` or ``place_id`` means the destination was cleared,
    and a silent "[DESTINATION CLEARED]" note is injected. Otherwise the
    place (and its zone) is resolved from the zone gallery under
    ``_ZONES_DIR``, and ONE user turn is sent. That turn carries a
    "[GO HERE]" instruction plus up to ``_FACES_MAX_SAMPLES`` reference
    photos of the place. Robot motion itself is wired elsewhere (N2); this
    only establishes the goal and the visual reference.

    Gallery lookups, photo reads and resizing are blocking, so they run in
    a worker thread instead of stalling the audio event loop. All failures
    (including a resize error) are logged and swallowed; cancellation
    propagates.
    """
    if not zone_id or not place_id:
        try:
            await session.send_realtime_input(text=(
                "[DESTINATION CLEARED] You have no specific destination right "
                "now. Do not announce this unprompted."
            ))
        except asyncio.CancelledError:
            raise
        except Exception as exc:
            log.warning("nav-clear inject failed: %s", exc)
        return

    def _resolve_place():
        # Blocking part: returns (place, zone, [jpeg bytes]); place is None
        # when the gallery has no such place.
        from Project.Sanad.vision.zone_gallery import ZoneGallery
        gallery = ZoneGallery(_ZONES_DIR)
        place = gallery.get_place(zone_id, place_id)
        if place is None:
            return None, None, []
        zone = gallery.get_zone(zone_id)
        photos = []
        for path in place.sample_paths[:_FACES_MAX_SAMPLES]:
            try:
                raw = path.read_bytes()
            except OSError:
                continue  # unreadable photo: skip it, keep the others
            # Fall back to the original bytes if resizing yields nothing.
            photos.append(
                gallery._resize_for_primer(raw, _FACES_PRIMER_RESIZE) or raw)
        return place, zone, photos

    try:
        place, zone, photos = await asyncio.to_thread(_resolve_place)
    except Exception as exc:
        log.warning("nav-target resolve failed: %s", exc)
        return
    if place is None:
        log.info("nav-target zone_%d/place_%d not found — skipping", zone_id, place_id)
        return

    place_name = place.name or f"place {place_id}"
    zone_name = (zone.name if zone else None) or f"zone {zone_id}"
    instr = (
        f"[GO HERE] The user has set your destination to '{place_name}' in "
        f"'{zone_name}'."
    )
    if place.description:
        instr += f" Notes: {place.description}."
    instr += (
        " The image(s) below show what it looks like so you can recognise it. "
        "If walking is enabled you will head there; if it is off, tell the "
        "user to enable movement from the dashboard. Briefly acknowledge the "
        "destination in your normal Khaleeji style."
    )
    parts: list[dict[str, Any]] = [{"text": instr}]
    parts.extend(
        {"inline_data": {"mime_type": "image/jpeg", "data": jpeg}}
        for jpeg in photos
    )
    try:
        await session.send_client_content(
            turns=[{"role": "user", "parts": parts}], turn_complete=True,
        )
        log.info("nav-target injected → zone_%d/place_%d (%s)",
                 zone_id, place_id, place_name)
    except asyncio.CancelledError:
        raise
    except Exception as exc:
        log.warning("nav-target inject failed: %s", exc)
# ─── navigation tool-call handler (Gemini function-calling) ───
# Gemini issues tool_calls (navigate_to_place / list_places / where_am_i /
# stop_navigation); we execute them against the dashboard nav API and reply
# with a FunctionResponse so the model can speak from the real result.
async def _handle_tool_calls(self, session: Any, function_calls: Any) -> None:
    """Run Gemini's tool calls and answer them in a single tool response.

    Each call's ``args`` are copied into a plain dict and dispatched through
    ``self._dispatch_tool``. Each result is wrapped in a
    ``types.FunctionResponse`` that carries the call's ``id`` and ``name``,
    so the model can speak from the real result. ``None`` or an empty
    ``function_calls`` sends nothing. A failed ``send_tool_response`` is
    logged; cancellation propagates.
    """
    responses = []
    for fc in function_calls or ():
        name = getattr(fc, "name", "") or ""
        try:
            args = dict(getattr(fc, "args", None) or {})
        except Exception as exc:
            # Non-mapping args: still dispatch (with none) so the model gets
            # a reply, e.g. "no_place", instead of the call being dropped.
            log.warning("TOOL CALL %s: unusable args (%s)", name, exc)
            args = {}
        log.info("TOOL CALL: %s(%s)", name, args)
        result = await self._dispatch_tool(name, args)
        log.info("TOOL RESULT: %s → %s", name, result)
        responses.append(types.FunctionResponse(
            id=getattr(fc, "id", None), name=name, response=result,
        ))
    if not responses:
        return
    try:
        await session.send_tool_response(function_responses=responses)
    except asyncio.CancelledError:
        raise
    except Exception as exc:
        log.warning("send_tool_response failed: %s", exc)
async def _dispatch_tool(self, name: str, args: dict) -> dict:
    """Execute one Gemini function call and return a JSON-able result dict.

    Supported tools:
      * navigate_to_place, gated on ``self._movement_enabled``.
      * list_places, where_am_i and stop_navigation, which call the dashboard
        nav API through ``_nav_api`` in a worker thread.
      * set_expression and show_social, which emit ``[[FACE:…]]`` /
        ``[[SHOW:…]]`` log markers for the parent process.

    Unknown names return ``reason="unknown_tool"``. Any exception is turned
    into ``{"ok": False, "reason": "error", "detail": …}`` so the model
    always gets a reply.
    """

    async def _active_nav_state():
        # GET /api/nav/active -> (payload, None) on success, or
        # (None, error_dict) for a malformed reply / error envelope.
        payload = await asyncio.to_thread(
            _nav_api, "GET", "/api/nav/active", None)
        if not isinstance(payload, dict):
            return None, {"ok": False, "reason": "bad_response"}
        if payload.get("reason"):  # error envelope produced by _nav_api
            return None, payload
        return payload, None

    try:
        if name == "navigate_to_place":
            # Honour the movement gate (kept fresh by the state watcher) so
            # a nav call can't drive the robot while walking is disabled.
            if not self._movement_enabled:
                return {"ok": False, "reason": "movement_off",
                        "say": "Movement is off — ask the user to enable it from the dashboard."}
            destination = str(args.get("place") or "").strip()
            if not destination:
                return {"ok": False, "reason": "no_place"}
            return await asyncio.to_thread(
                _nav_api, "POST", "/api/nav/voice_goto",
                {"place": destination})

        if name in ("list_places", "where_am_i"):
            state, failure = await _active_nav_state()
            if failure is not None:
                return failure
            if name == "list_places":
                return {"ok": True, "map": state.get("map"),
                        "places": state.get("places", [])}
            return {"ok": True, "map": state.get("map"),
                    "mode": state.get("mode_label"),
                    "ready": bool(state.get("bringup_alive")),
                    "localized": bool(state.get("localizing")),
                    "places": state.get("places", [])}

        if name == "stop_navigation":
            return await asyncio.to_thread(
                _nav_api, "POST", "/api/nav/cancel", None)

        if name == "set_expression":
            emotion = str(args.get("emotion") or "").strip().lower()
            if emotion not in _FACE_EMOTIONS:
                return {"ok": False, "reason": "unknown_emotion"}
            # GeminiSubprocess (the parent) relays [[FACE:…]] to the LED mask.
            log.info("[[FACE:%s]]", emotion)
            return {"ok": True, "shown": emotion}

        if name == "show_social":
            account = str(args.get("account") or "").strip().lower()
            if account not in _SOCIAL_ACCOUNTS:
                return {"ok": False, "reason": "unknown_account"}
            log.info("[[SHOW:%s]]", account)
            return {"ok": True, "showing": account}

        return {"ok": False, "reason": "unknown_tool"}
    except Exception as exc:
        log.warning("tool %s error: %s", name, exc)
        return {"ok": False, "reason": "error", "detail": str(exc)[:200]}
# ─── movement-state announcer (N2) ────────────────────
# Spoken confirmation when the operator enables / disables Gemini-driven
# locomotion from the dashboard. The actual movement dispatch loop lives
# in the parent; this only gives the user audible feedback on the toggle.
async def _announce_movement_state(self, session: Any, enabled: bool,
                                   is_toggle: bool) -> None:
    """Inject the walking (locomotion) enable state into the live session.

    At session start (``is_toggle=False``) the text is silent standing
    context. On a live operator toggle Gemini is asked to confirm the change
    out loud. This only gives audible feedback; movement dispatch lives in
    the parent process. Send errors are logged; cancellation propagates.
    """
    if not is_toggle:
        if enabled:
            # Session start, movement already on.
            text = (
                "[MOVEMENT STATUS] Walking is ON — you may move when asked. Do "
                "not announce this unprompted."
            )
        else:
            # Session start, movement off.
            text = (
                "[MOVEMENT STATUS] Walking is OFF — you cannot move right now. "
                "If the user asks you to walk, tell them to enable movement "
                "from the dashboard. Do not announce this unprompted."
            )
    elif enabled:
        text = (
            "[MOVEMENT ON] Walking is now enabled — you can move when the "
            "user asks. Briefly tell the user movement is enabled and they "
            "can ask you to walk, in your normal Khaleeji style."
        )
    else:
        text = (
            "[MOVEMENT OFF] Walking was just disabled — you must not move. "
            "Briefly tell the user movement is now off. If they ask you to "
            "walk, tell them to enable movement from the dashboard first."
        )

    try:
        await session.send_realtime_input(text=text)
        log.info("movement-state injected (enabled=%s, toggle=%s)",
                 enabled, is_toggle)
    except asyncio.CancelledError:
        raise
    except Exception as exc:
        log.warning("movement-state inject failed: %s", exc)
# ─── audio profile hot-swap ───────────────────────────
# The parent (GeminiSubprocess) polls pactl for the Anker USB device
# and writes "profile:<json>" lines to our stdin. _stdin_watcher parses
# them into _PROFILE_PENDING; this loop drains the flag on the asyncio
# loop and performs the actual swap. The brain's read/write sites
# (_send_mic_loop / _receive_loop) keep using self._mic / self._speaker —
# an atomic ref reassignment is enough because nothing caches them in
# a loop-local variable (verified in exploration).
async def _audio_swap_loop(self, session: Any) -> None:
    """Apply audio-profile swaps requested by the parent process.

    Every 250 ms, takes (and clears) the pending request in
    ``_PROFILE_PENDING`` under ``_PROFILE_LOCK``. If it names a profile
    other than the current one, ``self.swap_audio_devices`` is called. A
    failed swap is logged and the request is not re-queued. Cancellation
    ends the loop quietly.
    """
    while not (self._done.is_set() or self._stop_flag.is_set()):
        await asyncio.sleep(0.25)
        with _PROFILE_LOCK:
            requested = _PROFILE_PENDING.get("id")
            why = _PROFILE_PENDING.get("reason", "")
            # Consume the request whether or not we act on it.
            _PROFILE_PENDING["id"] = None
            _PROFILE_PENDING["reason"] = ""
        if requested and requested != self._current_profile_id:
            try:
                await self.swap_audio_devices(session, requested, reason=why)
            except asyncio.CancelledError:
                return
            except Exception as exc:
                log.warning("audio swap failed: %s", exc)
async def swap_audio_devices(self, session: Any, profile_id: str,
                             reason: str = "") -> None:
    """Hot-swap mic+speaker to `profile_id` without dropping the live
    Gemini session. Idempotent (no-op if already on `profile_id`).

    Order matters: start the new mic BEFORE we tear the old one down,
    so a transient PyAudio failure (e.g. udev hasn't exposed Anker yet)
    leaves the old backend in place. Up to 3 attempts are made, 0.4 s
    apart (no pointless wait after the last one). If all fail we log WARN
    and keep the current profile; the watcher will retry on its next tick.

    Swaps are serialized by ``self._swap_lock``. If that lock doesn't exist
    yet (the session loop hasn't started), the request is skipped with a
    warning. After a successful swap a silent "[AUDIO SWITCH]" note is
    injected into the session.
    """
    max_attempts = 3
    retry_delay_s = 0.4
    if self._swap_lock is None:
        log.warning("swap requested before session loop started — skipping")
        return
    async with self._swap_lock:
        # Re-check under the lock: the swap that held it before us may
        # already have moved to this profile.
        if profile_id == self._current_profile_id:
            return
        prev = self._current_profile_id
        log.info("audio swap: %s → %s (reason=%s)", prev, profile_id,
                 reason or "-")
        # Build + start the new mic. Retry: pactl can see the device
        # before PyAudio's get_device_count refreshes.
        try:
            from Project.Sanad.voice.audio_io import AudioIO as _AudioIO
        except Exception as exc:
            log.error("audio swap: AudioIO import failed: %s", exc)
            return
        new_mic = new_spk = None
        last_exc: Optional[BaseException] = None
        for attempt in range(1, max_attempts + 1):
            try:
                new_mic, new_spk = _AudioIO.build_backends(
                    profile_id, audio_client=self._audio_client,
                )
                # mic.start() opens PyAudio + spawns reader thread.
                # speaker is lazy (opens on first send_chunk).
                await asyncio.to_thread(new_mic.start)
                break
            except Exception as exc:
                last_exc = exc
                # Tear down a partially-built backend so the next attempt
                # gets a clean slate; don't leak PyAudio handles.
                if new_mic is not None:
                    try:
                        await asyncio.to_thread(new_mic.stop)
                    except Exception:
                        pass
                new_mic = new_spk = None
                if attempt < max_attempts:
                    log.info("audio swap attempt %d failed: %s — retry in %.1fs",
                             attempt, exc, retry_delay_s)
                    await asyncio.sleep(retry_delay_s)
                else:
                    log.info("audio swap attempt %d failed: %s", attempt, exc)
        if new_mic is None or new_spk is None:
            # A mic that started without a matching speaker must not leak.
            if new_mic is not None:
                try:
                    await asyncio.to_thread(new_mic.stop)
                except Exception:
                    pass
            log.warning("audio swap %s → %s: all %d attempts failed (%s); "
                        "keeping current profile",
                        prev, profile_id, max_attempts, last_exc)
            return
        # Drain the old playback so any in-flight AI utterance stops
        # (interrupts mid-word — acceptable per spec, <1s gap).
        # MUST be awaited via to_thread: _PyAudioSpeaker.stop
        # takes a per-instance RLock and an in-flight send_chunk on
        # the executor may be holding it across a back-pressured
        # PortAudio write. Calling stop() synchronously on the
        # event-loop thread would wedge the entire loop (mic,
        # vision, session.receive) until the pulse buffer drains.
        try:
            await asyncio.to_thread(self._speaker.stop)
        except Exception:
            pass
        try:
            self._mic.flush()
        except Exception:
            pass
        # Atomic ref swap — next read_chunk / send_chunk hits new.
        old_mic, old_spk = self._mic, self._speaker
        self._mic = new_mic
        self._speaker = new_spk
        self._current_profile_id = profile_id
        self._reset_turn_state()
        # Tear down old AFTER the ref swap so any executor call still
        # in flight finishes against the old handle and the next loop
        # iteration picks up the new one cleanly.
        try:
            await asyncio.to_thread(old_mic.stop)
        except Exception:
            pass
        try:
            await asyncio.to_thread(old_spk.stop)
        except Exception:
            pass
        # Silent context to Gemini — so it knows the input chain changed
        # if asked (matches the _announce_vision_state pattern).
        try:
            await session.send_realtime_input(text=(
                f"[AUDIO SWITCH] Mic + speaker are now on the {profile_id!s} "
                f"audio profile. Do not announce this unprompted; just keep "
                f"replying normally — the user's voice may sound clearer or "
                f"different on the new device."
            ))
        except asyncio.CancelledError:
            raise
        except Exception as exc:
            log.warning("audio-swap announce failed: %s", exc)
        log.info("audio swap complete: %s → %s", prev, profile_id)
# ─── recognition state watcher ────────────────────────
# Polls data/.recognition_state.json every _RECOG_POLL_S seconds
# (SANAD_RECOGNITION_POLL_S) and mirrors the vision / face-rec / zone-rec /
# movement / auto-record flags and the "go here" nav target into in-memory
# state, so the rest of the session can react WITHOUT a Gemini reconnect.
async def _recognition_state_watcher(self, session: Any) -> None:
    """Mirror the dashboard's recognition-state file into the live session.

    At start it does the following:
      * sends the face-gallery primer (if face rec AND vision are on);
      * sends the zones primer (if zone rec is on);
      * injects the current vision / face-rec / zone-rec / movement state
        as silent context;
      * announces any active "go here" destination.

    It then polls ``_RECOG_STATE_PATH`` every ``_RECOG_POLL_S`` seconds
    until the session is done or stopped. On each file change it:
      * announces toggles out loud;
      * re-primes galleries on version bumps;
      * announces destination changes;
      * flips the recorder.

    A failed state-file read is logged (once per change) and retried on
    the next tick instead of terminating the watcher.
    """
    last_mtime_ns = -1
    # mtime of a change whose read failed, so the warning is logged once
    # per change rather than on every poll tick while it keeps failing.
    failed_mtime_ns = -1
    last_state = _recog_state.RecognitionState(
        vision_enabled=self._vision_enabled,
        face_rec_enabled=self._face_rec_enabled,
        gallery_version=self._gallery_version_primed,
        zone_rec_enabled=self._zone_rec_enabled,
        zones_version=self._zones_version_primed,
        movement_enabled=self._movement_enabled,
        record_enabled=self._record_enabled,
    )
    # Best-effort initial primer if face_rec is already on at session start.
    if self._face_rec_enabled and self._vision_enabled:
        try:
            cur = _recog_state.read(_RECOG_STATE_PATH)
            await self._send_gallery_primer(session, cur.gallery_version)
        except Exception as exc:
            log.warning("initial gallery primer failed: %s", exc)
    # N3 — initial zones primer if zone recognition is already on. Unlike
    # faces this does NOT require vision: name+description-only places still
    # give Gemini useful knowledge to talk about.
    if self._zone_rec_enabled:
        try:
            cur = _recog_state.read(_RECOG_STATE_PATH)
            await self._send_zone_primer(session, cur.zones_version)
        except Exception as exc:
            log.warning("initial zone primer failed: %s", exc)
    # Tell Gemini the current camera + recognition + movement state at
    # session start — silent standing context so "can you see me?" / "do
    # you know who I am?" are answered honestly even if nothing is toggled.
    await self._announce_vision_state(
        session, self._vision_enabled, is_toggle=False,
    )
    await self._announce_facerec_state(
        session, self._face_rec_enabled, is_toggle=False,
    )
    await self._announce_zonerec_state(
        session, self._zone_rec_enabled, is_toggle=False,
    )
    await self._announce_movement_state(
        session, self._movement_enabled, is_toggle=False,
    )
    # N3 — announce the active "go here" destination (if any). _nav_target
    # was reset to (-1,-1) per session so this fires on every reconnect.
    try:
        cur = _recog_state.read(_RECOG_STATE_PATH)
        nav = (cur.nav_target_zone_id, cur.nav_target_place_id)
        if nav != self._nav_target:
            await self._announce_nav_target(session, nav[0], nav[1])
            self._nav_target = nav
    except Exception as exc:
        log.warning("initial nav-target announce failed: %s", exc)
    while not self._done.is_set() and not self._stop_flag.is_set():
        await asyncio.sleep(_RECOG_POLL_S)
        try:
            st = _RECOG_STATE_PATH.stat()
        except Exception:
            # Not written yet (FileNotFoundError) or a transient FS error.
            continue
        # Use nanosecond mtime: write() does os.replace of a fresh
        # tempfile, so two CRUD ops within one coarse mtime tick would
        # share an identical whole-second st_mtime and the second change
        # would be skipped on this tick. st_mtime_ns has far finer
        # resolution, so a rapid second write is observed. (The
        # version-diff logic below is still the ultimate safety net.)
        if st.st_mtime_ns == last_mtime_ns:
            continue
        try:
            new_state = _recog_state.read(_RECOG_STATE_PATH)
        except Exception as exc:
            # Leave last_mtime_ns untouched so this change is retried next
            # tick, instead of the exception ending the watcher task.
            if st.st_mtime_ns != failed_mtime_ns:
                failed_mtime_ns = st.st_mtime_ns
                log.warning("recognition state read failed: %s", exc)
            continue
        last_mtime_ns = st.st_mtime_ns
        # Vision toggle — instant. Announce it out loud so Gemini reacts
        # ("I can see you now" / "I can't see you anymore").
        if new_state.vision_enabled != last_state.vision_enabled:
            self._vision_enabled = new_state.vision_enabled
            log.info("vision toggled → %s", self._vision_enabled)
            await self._announce_vision_state(
                session, self._vision_enabled, is_toggle=True,
            )
        # Face-rec toggle — announce it out loud. The OFF announcement
        # also tells Gemini to disregard the gallery, so OFF takes effect
        # immediately instead of lingering until the next reconnect.
        if new_state.face_rec_enabled != last_state.face_rec_enabled:
            self._face_rec_enabled = new_state.face_rec_enabled
            if self._face_rec_enabled:
                log.info("face rec enabled — announcing + sending primer")
            else:
                log.info("face rec disabled — telling Gemini to "
                         "disregard the gallery")
            await self._announce_facerec_state(
                session, self._face_rec_enabled, is_toggle=True,
            )
        # Re-prime the face gallery (vision must be on) when:
        #   - face rec just turned ON, or
        #   - the gallery version moved past the last primed one.
        face_rec_just_on = (
            new_state.face_rec_enabled and not last_state.face_rec_enabled
        )
        gallery_changed = (
            new_state.gallery_version != self._gallery_version_primed
        )
        if (self._face_rec_enabled
                and (face_rec_just_on or gallery_changed)
                and self._vision_enabled):
            try:
                await self._send_gallery_primer(
                    session, new_state.gallery_version,
                )
            except Exception as exc:
                log.warning("gallery primer failed: %s", exc)
        # N3 — zone-recognition toggle (announce out loud, like face-rec).
        if new_state.zone_rec_enabled != last_state.zone_rec_enabled:
            self._zone_rec_enabled = new_state.zone_rec_enabled
            log.info("zone rec toggled → %s", self._zone_rec_enabled)
            await self._announce_zonerec_state(
                session, self._zone_rec_enabled, is_toggle=True,
            )
        # Re-prime zones when zone-rec just turned ON or the zones version
        # bumped (any zone/place/face-link/photo CRUD). No vision needed.
        zone_rec_just_on = (
            new_state.zone_rec_enabled and not last_state.zone_rec_enabled
        )
        zones_changed = (
            new_state.zones_version != self._zones_version_primed
        )
        if self._zone_rec_enabled and (zone_rec_just_on or zones_changed):
            try:
                await self._send_zone_primer(
                    session, new_state.zones_version,
                )
            except Exception as exc:
                log.warning("zone primer failed: %s", exc)
        # N3 — "go here" destination changed (set or cleared). Announce +
        # show the reference photo. Diffed against the announced tuple so a
        # CRUD-only version bump above doesn't double-fire this.
        nav = (new_state.nav_target_zone_id, new_state.nav_target_place_id)
        if nav != self._nav_target:
            self._nav_target = nav
            await self._announce_nav_target(session, nav[0], nav[1])
        # N2 — movement enable/disable toggle (spoken confirmation only).
        if new_state.movement_enabled != last_state.movement_enabled:
            self._movement_enabled = new_state.movement_enabled
            log.info("movement toggled → %s", self._movement_enabled)
            await self._announce_movement_state(
                session, self._movement_enabled, is_toggle=True,
            )
        # Auto-record toggle — flip the recorder live (no session restart).
        if new_state.record_enabled != last_state.record_enabled:
            self._record_enabled = new_state.record_enabled
            try:
                self._recorder.enabled = self._record_enabled
            except Exception:
                pass
            log.info("auto-record toggled → %s", self._record_enabled)
        last_state = new_state
# ─── camera frame send loop ───────────────────────────
# Reads the latest JPEG from the _LATEST_FRAME cache (fed by the
# _stdin_watcher thread, which the GeminiSubprocess supervisor pushes
# 'frame:<b64>' lines into) and relays it to Gemini Live at
# _VISION_SEND_HZ. Only active when self._vision_enabled. Skips frames
# older than _VISION_STALE_MS so a stopped/unplugged camera doesn't
# waste tokens on a frozen scene.
async def _send_frame_loop(self, session: Any) -> None:
    """Relay the newest cached camera JPEG to Gemini Live while vision is on.

    Wakes every ``1 / max(0.5, _VISION_SEND_HZ)`` seconds, or longer while
    backing off after send errors. It reads the frame cached in
    ``_LATEST_FRAME`` and sends it as an ``image/jpeg`` realtime blob. A
    frame is skipped when:
      * vision is off;
      * no frame is cached;
      * the frame is older than ``_VISION_STALE_MS`` (camera stopped or
        unplugged);
      * its timestamp matches the frame already sent.

    Each failure grows the backoff (x2 + 0.5 s, capped at 5 s); a success
    resets it. Cancellation ends the loop quietly.
    """
    interval = 1.0 / max(0.5, _VISION_SEND_HZ)
    max_age_s = _VISION_STALE_MS / 1000.0
    penalty = 0.0
    prev_ts = 0.0
    while not (self._done.is_set() or self._stop_flag.is_set()):
        await asyncio.sleep(max(interval, penalty))
        if not self._vision_enabled:
            continue
        with _LATEST_FRAME_LOCK:
            jpeg = _LATEST_FRAME.get("bytes")
            frame_ts = _LATEST_FRAME.get("ts", 0.0)
        # Nothing cached / stale / already relayed -> skip this tick.
        if (not jpeg
                or (time.time() - frame_ts) > max_age_s
                or frame_ts == prev_ts):
            continue
        try:
            await session.send_realtime_input(
                video=types.Blob(data=jpeg, mime_type="image/jpeg"),
            )
        except asyncio.CancelledError:
            return
        except Exception as exc:
            log.warning("frame send failed: %s", exc)
            penalty = min(penalty * 2 + 0.5, 5.0)
        else:
            prev_ts = frame_ts
            penalty = 0.0
# ─── motion-state inject loop ─────────────────────────
# Drains _STATE_PENDING (fed by the _stdin_watcher from 'state:' lines
# the supervisor pushes when the arm starts/finishes/errors a motion)
# and injects each as silent text context into the live session, so
# Gemini can answer "what are you doing?" honestly. Per persona, Gemini
# reads these for context but does not narrate them unprompted.
async def _send_state_loop(self, session: Any) -> None:
    """Forward queued motion-state messages to Gemini as silent text context.

    Every 100 ms, takes everything in ``_STATE_PENDING`` under
    ``_STATE_LOCK`` (clearing it) and injects each message via
    ``send_realtime_input``. A failed inject is logged and that message is
    dropped; every failure is logged. Cancellation ends the loop quietly.
    """
    while not (self._done.is_set() or self._stop_flag.is_set()):
        await asyncio.sleep(0.1)
        with _STATE_LOCK:
            if _STATE_PENDING:
                batch = list(_STATE_PENDING)
                _STATE_PENDING.clear()
            else:
                batch = []
        for note in batch:
            try:
                await session.send_realtime_input(text=note)
                log.info("STATE injected: %s", note)
            except asyncio.CancelledError:
                return
            except Exception as exc:
                # Some SDK versions may reject text on send_realtime_input;
                # motion itself still works, only this context is lost.
                log.warning("state inject failed: %s", exc)
# ─── face gallery primer ──────────────────────────────
# Builds one multimodal turn carrying the entire face gallery + a Khaleeji
# greeting instruction, and sends it via send_client_content. Gemini keeps
# this in session context until reconnect. Re-sent on gallery_version bumps.
async def _send_gallery_primer(self, session: Any, version: int) -> None:
    """Prime the live session with the known-faces gallery.

    Sends ONE multimodal user turn through ``send_client_content``. The
    turn holds an instruction text (greet known people by name in
    Khaleeji, welcome unknown faces as guests), then, for each person, a
    name/notes label followed by that person's JPEG samples.

    On success, or when the gallery is empty, ``version`` is recorded in
    ``self._gallery_version_primed`` so the watcher only re-primes on a
    version bump. Load/send failures are logged and leave the primed
    version unchanged.

    ``load_for_primer`` loads (and resizes) the gallery images, so it runs
    in a worker thread rather than blocking the audio event loop.
    """
    try:
        from Project.Sanad.vision.face_gallery import FaceGallery
    except Exception as exc:
        log.info("face gallery module unavailable: %s", exc)
        return
    try:
        gallery = FaceGallery(_FACES_DIR)
        entries = await asyncio.to_thread(
            gallery.load_for_primer,
            max_samples_per_face=_FACES_MAX_SAMPLES,
            resize_long_side=_FACES_PRIMER_RESIZE,
        )
    except Exception as exc:
        log.warning("face gallery load failed: %s", exc)
        return
    if not entries:
        log.info("face gallery empty — primer skipped (v.%d)", version)
        self._gallery_version_primed = version
        return
    parts: list[dict[str, Any]] = [{
        "text": (
            "GALLERY PRIMER (do not reply to this turn). "
            "Below are people you know. When the live camera shows one of "
            "them, greet them warmly by name in UAE Khaleeji dialect "
            "(for example: 'هلا والله يا كسام، شحالك؟'), and you may use "
            "the notes about them to make the conversation personal. "
            "For faces NOT in this gallery, welcome them as a guest "
            "without inventing a name. Greet each person only once per "
            "minute to avoid repetition."
        ),
    }]
    for entry, jpegs in entries:
        label = (
            f"This person is named {entry.name}."
            if entry.name
            else "This person's name is unknown — greet as guest."
        )
        if entry.description:
            label += f" Notes about them: {entry.description}"
        parts.append({"text": f"\n{label}"})
        parts.extend(
            {"inline_data": {"mime_type": "image/jpeg", "data": jpeg}}
            for jpeg in jpegs
        )
    try:
        await session.send_client_content(
            turns=[{"role": "user", "parts": parts}],
            turn_complete=True,
        )
    except Exception as exc:
        log.warning("primer send failed: %s", exc)
        return
    self._gallery_version_primed = version
    log.info("face gallery primed: %d person(s), v.%d", len(entries), version)
# ─── zones primer (N3) ────────────────────────────────
# One multimodal turn carrying every zone, its places (name + description +
# reference photos), and the people linked to each place. A place may have
# NO photos (name + description only), so empty image lists are tolerated.
async def _send_zone_primer(self, session: Any, version: int) -> None:
    """Prime the live session with every known zone and place.

    Sends ONE multimodal user turn: an instruction text, then for each zone
    a "# Zone: <name> — <description>" header. Each place in the zone gets
    a " - Place: <name> — <description> | People often here: …" label,
    followed by that place's reference JPEGs (a place may have none).
    Linked face ids are resolved to names through the face gallery on a
    best-effort basis.

    On success, or when there are no zones, ``version`` is recorded in
    ``self._zones_version_primed``. Load/send failures are logged and leave
    it unchanged.

    Gallery loading (file reads + resizing) and the face-name lookup run
    in a worker thread so they don't stall the audio event loop.
    """
    try:
        from Project.Sanad.vision.zone_gallery import ZoneGallery
    except Exception as exc:
        log.info("zone gallery module unavailable: %s", exc)
        return
    try:
        gallery = ZoneGallery(_ZONES_DIR)
        entries = await asyncio.to_thread(
            gallery.load_for_primer,
            max_samples_per_place=_FACES_MAX_SAMPLES,
            resize_long_side=_FACES_PRIMER_RESIZE,
        )
    except Exception as exc:
        log.warning("zone gallery load failed: %s", exc)
        return
    if not entries:
        log.info("zone gallery empty — primer skipped (v.%d)", version)
        self._zones_version_primed = version
        return

    def _face_names_by_id():
        # Linked face id -> name (names only; unnamed faces are skipped).
        from Project.Sanad.vision.face_gallery import FaceGallery
        return {fe.id: fe.name for fe in FaceGallery(_FACES_DIR).list()
                if fe.name}

    try:
        face_names: dict[int, str] = await asyncio.to_thread(_face_names_by_id)
    except Exception as exc:
        # Best effort: the primer is still useful without people's names.
        log.info("zone primer: face names unavailable: %s", exc)
        face_names = {}
    parts: list[dict[str, Any]] = [{
        "text": (
            "ZONES PRIMER (do not reply to this turn). Below are the zones "
            "and places you know, with the people often found at each place. "
            "Use them to answer where things are, to name a place when the "
            "live camera shows one, and to make directions personal. Do not "
            "invent zones or places that are not listed here."
        ),
    }]
    n_zones = n_places = 0
    for zone, places in entries:
        n_zones += 1
        zhdr = f"\n# Zone: {zone.name or '(unnamed)'}"
        if zone.description:
            # Separator so the description doesn't run into the name.
            zhdr += f" — {zone.description}"
        parts.append({"text": zhdr})
        if not places:
            parts.append({"text": " (no places yet)"})
        for place, jpegs in places:
            n_places += 1
            label = f"\n - Place: {place.name or '(unnamed)'}"
            if place.description:
                label += f" — {place.description}"
            people = [face_names[f] for f in place.face_ids if f in face_names]
            if people:
                label += f" | People often here: {', '.join(people)}"
            parts.append({"text": label})
            parts.extend(
                {"inline_data": {"mime_type": "image/jpeg", "data": jpeg}}
                for jpeg in jpegs
            )
    try:
        await session.send_client_content(
            turns=[{"role": "user", "parts": parts}],
            turn_complete=True,
        )
    except Exception as exc:
        log.warning("zone primer send failed: %s", exc)
        return
    self._zones_version_primed = version
    log.info("zones primed: %d zone(s), %d place(s), v.%d",
             n_zones, n_places, version)