1521 lines
66 KiB
Python
1521 lines
66 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Voice/marcus_voice.py — Gemini Live voice orchestrator for Marcus.
|
||
|
||
Pipeline:
|
||
|
||
G1 mic UDP ──► BuiltinMic (Voice/audio_io.py)
|
||
│
|
||
▼
|
||
GeminiBrain (Voice/gemini_script.py — subprocess in gemini_sdk env)
|
||
│ audio out (24 kHz, Gemini's voice through G1 speaker)
|
||
│ output_transcription (chunks of Gemini's reply text)
|
||
▼
|
||
_dispatch_gemini_bot (Marcus side-channel)
|
||
- match Gemini's spoken phrase against
|
||
Config/instruction.json::actions[*].bot_phrases
|
||
- dedup canonical command within command_cooldown_sec
|
||
│
|
||
▼
|
||
on_command(canonical, "en") ──► Marcus brain → motion
|
||
|
||
GEMINI IS THE WAKE-WORD GATEKEEPER. The persona prompt in
|
||
Config/config_Voice.json::gemini_system_prompt instructs Gemini to ONLY
|
||
speak motion-confirmation phrases ('Turning right', 'أستدير يميناً')
|
||
when the user said the wake word ('Sanad' or 'سند') AND requested an
|
||
action. If Gemini says a motion phrase, Marcus trusts that decision and
|
||
fires the matching robot motion. Marcus does NOT inspect user transcripts
|
||
for wake words anymore — single gatekeeper, no double-checking.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import logging
|
||
import os
|
||
import queue
|
||
import re
|
||
import sys
|
||
import threading
|
||
import time
|
||
from collections import deque
|
||
from difflib import SequenceMatcher
|
||
from logging.handlers import RotatingFileHandler
|
||
from typing import Callable, Optional
|
||
|
||
import numpy as np
|
||
|
||
# Make the project root importable when this file is executed directly
# (e.g. `python Voice/marcus_voice.py`) instead of as a package module.
_PROJECT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if _PROJECT_DIR not in sys.path:
    sys.path.insert(0, _PROJECT_DIR)
|
||
from Core.env_loader import PROJECT_ROOT
|
||
from Core.config_loader import load_config
|
||
from Core.motion_state import motion_abort, motion_pause
|
||
from Core.motion_log import log_motion as _log_motion
|
||
from Voice.number_words import normalise_numbers
|
||
from Voice.canonical_normalizer import to_canonical_english
|
||
from Voice.sequences import get_sequences
|
||
|
||
# ── Module logging ─────────────────────────────────────────────
# All voice-layer logs go to logs/voice.log (rotating, 5 MB × 3).
# Note: basicConfig installs ONLY the file handler — no console echo.
LOG_DIR = os.path.join(PROJECT_ROOT, "logs")
os.makedirs(LOG_DIR, exist_ok=True)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
    handlers=[
        RotatingFileHandler(
            os.path.join(LOG_DIR, "voice.log"),
            maxBytes=5_000_000, backupCount=3, encoding="utf-8",
        ),
    ],
)
log = logging.getLogger("marcus_voice")
|
||
|
||
|
||
# ── Transcript log ─────────────────────────────────────────────
# Every user transcript Gemini emits is written here in a simple
# one-line-per-entry format. Rotates every 5 MB × 3 backups.
# propagate=False keeps transcript lines out of voice.log; the
# handler guard makes repeated imports idempotent.
_TRANSCRIPT_PATH = os.path.join(LOG_DIR, "transcript.log")
_transcript_log = logging.getLogger("transcript")
_transcript_log.setLevel(logging.INFO)
_transcript_log.propagate = False
if not _transcript_log.handlers:
    _th = RotatingFileHandler(
        _TRANSCRIPT_PATH, maxBytes=5_000_000, backupCount=3, encoding="utf-8",
    )
    _th.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
    _transcript_log.addHandler(_th)
|
||
|
||
|
||
def _log_transcript(action: str, text: str) -> None:
    """Write one '<ACTION> <text>' line to logs/transcript.log.

    `action` is left-padded to 5 columns so entries line up; `text` is
    stripped and a None/empty text becomes the empty string."""
    cleaned = (text or "").strip()
    _transcript_log.info("%-5s %s", action, cleaned)
|
||
|
||
|
||
# ─── instruction.json — bilingual phrase tables ──────────────────
|
||
#
|
||
# Single source of truth for every voice phrase the dispatch layer cares
|
||
# about: wake-word variants (EN + AR), per-action user_phrases (what the
|
||
# user might say), per-action bot_phrases (what Gemini might say back).
|
||
# Loaded ONCE at module import; rebuilds the runtime tables below.
|
||
# Adding a new accent / variant / action is a JSON-only edit — no Python
|
||
# change required.
|
||
import json as _json
|
||
|
||
|
||
def _load_instructions() -> dict:
    """Load Config/instruction.json. Raises RuntimeError if the file is
    missing, empty, or has no actions — voice dispatch is unusable
    without it (every Gemini phrase gets rejected, robot never moves,
    operator gets a confused user complaint hours later). Loud failure
    surfaces in the brain init banner; quiet degradation does not."""
    path = os.path.join(PROJECT_ROOT, "Config", "instruction.json")
    try:
        with open(path, "r", encoding="utf-8") as fh:
            data = _json.load(fh) or {}
    except FileNotFoundError as exc:
        # Missing file: fail loudly with a recovery hint.
        raise RuntimeError(
            "instruction.json missing at {} — voice dispatch cannot run "
            "without it (this file holds every wake-word + bot phrase "
            "the dispatcher matches against). Restore from git or "
            "rebuild from Config/instruction.example.json.".format(path)
        ) from exc
    except Exception as exc:
        # Anything else (typically a JSON parse error): also fatal.
        raise RuntimeError(
            "instruction.json at {} is malformed JSON: {}".format(path, exc)
        ) from exc

    actions = data.get("actions")
    if isinstance(actions, dict) and actions:
        return data
    raise RuntimeError(
        "instruction.json at {} has no actions — every dispatch "
        "would silently fail. Add at least one action with "
        "canonical/user_phrases/bot_phrases.".format(path)
    )
|
||
|
||
|
||
# Parsed instruction.json — raises at import time if missing/empty,
# so a broken config is caught before any dispatch ever runs.
_INSTRUCTIONS = _load_instructions()
|
||
|
||
|
||
def _build_wake_words(data: dict) -> set:
|
||
out = set()
|
||
wake = data.get("wake_words", {}) or {}
|
||
for lang in ("english", "arabic"):
|
||
for w in wake.get(lang, []) or []:
|
||
if isinstance(w, str) and w.strip():
|
||
out.add(w.strip().lower())
|
||
return out
|
||
|
||
|
||
def _build_command_vocab(data: dict) -> list:
|
||
"""English-only canonical phrases — used by the difflib fuzzy matcher.
|
||
Includes every action's `canonical` plus all entries from
|
||
`user_phrases.english` (deduped, original order preserved per action)."""
|
||
seen = set()
|
||
out = []
|
||
for action in (data.get("actions", {}) or {}).values():
|
||
canon = (action.get("canonical") or "").strip()
|
||
if canon and canon not in seen:
|
||
seen.add(canon)
|
||
out.append(canon)
|
||
for p in (action.get("user_phrases", {}) or {}).get("english", []) or []:
|
||
p = (p or "").strip()
|
||
if p and p not in seen:
|
||
seen.add(p)
|
||
out.append(p)
|
||
return out
|
||
|
||
|
||
def _build_arabic_motion_map(data: dict) -> dict:
|
||
"""Map Arabic user phrase → English canonical for every action."""
|
||
out = {}
|
||
for action in (data.get("actions", {}) or {}).values():
|
||
canon = (action.get("canonical") or "").strip()
|
||
if not canon:
|
||
continue
|
||
for p in (action.get("user_phrases", {}) or {}).get("arabic", []) or []:
|
||
p = (p or "").strip()
|
||
if p:
|
||
out[p] = canon
|
||
return out
|
||
|
||
|
||
def _build_parametric_actions(data: dict) -> list:
|
||
"""Compile parametric_actions from instruction.json into a list of
|
||
(compiled_regex, command_template) tuples. Every regex carries
|
||
re.IGNORECASE so 'Turning' / 'turning' / 'TURNING' all match. Arabic
|
||
is unaffected by IGNORECASE.
|
||
|
||
Bad entries (missing regex/template, regex compile errors) are
|
||
skipped with a warning — the rest of the table still loads.
|
||
"""
|
||
out = []
|
||
for entry in (data.get("parametric_actions", []) or []):
|
||
if not isinstance(entry, dict):
|
||
continue
|
||
rx = entry.get("regex")
|
||
tpl = entry.get("command_template")
|
||
if not rx or not tpl:
|
||
continue
|
||
try:
|
||
compiled = re.compile(rx, re.IGNORECASE)
|
||
except re.error as e:
|
||
try:
|
||
log.warning("parametric_actions skipped (bad regex %r): %s", rx, e)
|
||
except Exception:
|
||
pass
|
||
continue
|
||
out.append((compiled, tpl))
|
||
return out
|
||
|
||
|
||
# ─── quoted-phrase stripper ─────────────────────────────────────
|
||
#
|
||
# When Gemini quotes the user's phrase back ("Sorry, I don't understand
|
||
# the phrase 'turn left'") the dispatcher would otherwise match the
|
||
# canonical name INSIDE the quoted string and false-fire. Strip every
|
||
# matched pair of quotes — straight, smart, Arabic — before scanning.
|
||
# Pattern handles ASCII '...' "..." plus typographic “…” ’…’ «…» 「…」.
|
||
_QUOTED_RE = re.compile(
|
||
r"\".*?\""
|
||
r"|\'.*?\'"
|
||
r"|“.*?”"
|
||
r"|‘.*?’"
|
||
r"|«.*?»"
|
||
r"|「.*?」"
|
||
r"|『.*?』",
|
||
re.DOTALL,
|
||
)
|
||
|
||
|
||
# ─── STATE echo stripper ────────────────────────────────────────
#
# When the runner injects '[STATE-DONE] walk 1 steps (3.0s)' into
# Gemini Live as silent context, Gemini occasionally echoes that text
# back through its output transcription. A naive dispatcher scan
# would re-fire 'walk 1 steps' from the embedded canonical, creating
# a feedback loop (one voice command → 3-4× motions). This regex
# strips:
#   '[STATE-START] <cmd>'
#   '[STATE-DONE] <cmd> (1.5s)'
#   '[STATE-INTERRUPTED] <cmd> (2.3s)'
#   '[STATE-ERROR] <cmd> — <reason>'
#   '[STATE-PAUSED] <cmd>' / '[STATE-RESUMED] <cmd>'
# The match consumes the bracketed tag and the canonical/duration
# tail, but stops at sentence boundaries (./!/?/؟/،) so a coexisting
# real motion phrase in the same chunk survives.
_STATE_ECHO_RE = re.compile(
    r"\[STATE-(?:START|DONE|INTERRUPTED|ERROR|PAUSED|RESUMED)\]"
    r"\s*[^.!?؟،,\n(]*"          # canonical name; stops before sentence-end OR '('
    r"(?:\s*\([^)]*\))?"         # optional parenthesised duration like (1.5s)
    r"(?:\s+—\s+[^.!?؟،\n]*)?",  # optional ' — <reason>' suffix on errors
    re.IGNORECASE,
)
|
||
|
||
|
||
# ─── question-clause stripper ───────────────────────────────────
#
# CRITICAL safety filter. Sentences ending in '?' or '؟' (Arabic
# question mark) are CLARIFICATION QUESTIONS Gemini asks the user —
# they are NEVER motion confirmations. Without this filter, a question
# like 'Do you mean turn left or walk one step forward?' would match
# BOTH 'turn left' and 'walk one step' canonicals inside, dispatching
# contradictory motions while Gemini thinks it's still waiting for
# clarification. Production failure observed at 09:13:57.
#
# Match: from previous sentence terminator (or string start) up to and
# including the question mark. The lookbehind anchors each match at a
# sentence boundary, so consecutive questions are stripped one at a
# time and statements between them survive in the residual text.
_QUESTION_RE = re.compile(
    r"(?:^|(?<=[.!؟?\n]))[^.!؟?\n]*[?؟]",
    re.DOTALL,
)
|
||
|
||
|
||
# ─── sequence-name extraction ───────────────────────────────────
|
||
#
|
||
# Sequence canonicals carry a NAME ('save sequence as my-greet',
|
||
# 'play my-greet'). Gemini's bot_phrase contains it; we extract from
|
||
# the surrounding spoken text. Two extractor patterns:
|
||
# - 'as <name>' — for save: 'saved as my-greet'
|
||
# - 'play(ing)? <name>' / 'run(ning)? <name>' — for play
|
||
# Names follow the same shape Sequences validates: lowercase letters,
|
||
# digits, hyphens, underscores; 1–31 chars. Gemini occasionally adds
|
||
# a period; we strip trailing punctuation.
|
||
_SEQ_NAME_AS = re.compile(
|
||
r"\bas\s+(?:my\s+)?([a-z0-9][a-z0-9_-]{0,30})\b", re.I,
|
||
)
|
||
_SEQ_NAME_PLAY = re.compile(
|
||
r"\b(?:play(?:ing)?|run(?:ning)?|perform(?:ing)?|do(?:ing)?)\s+"
|
||
r"(?:my\s+|the\s+|sequence\s+)*"
|
||
r"([a-z0-9][a-z0-9_-]{0,30})\b",
|
||
re.I,
|
||
)
|
||
|
||
|
||
def _extract_sequence_name(text: str, mode: str) -> Optional[str]:
|
||
"""Pull a sequence name out of Gemini's spoken confirmation.
|
||
`mode` is 'as' (save) or 'play' (start_recording / play). Returns
|
||
None if no plausible name is found — caller decides what to do."""
|
||
if not text:
|
||
return None
|
||
t = text.strip().rstrip(".!?,").strip()
|
||
rx = _SEQ_NAME_AS if mode == "as" else _SEQ_NAME_PLAY
|
||
m = rx.search(t)
|
||
if not m:
|
||
return None
|
||
name = m.group(1).strip().lower()
|
||
# Reject likely false positives — bare 'sequence' / 'recording' /
|
||
# 'this' shouldn't be a name.
|
||
if name in {"sequence", "recording", "macro", "this", "that",
|
||
"it", "the", "my"}:
|
||
return None
|
||
return name
|
||
|
||
|
||
# ─── memory helpers ─────────────────────────────────────────────
#
# Static map of pairwise inverses for fixed canonicals. Parametric forms
# (turn left N degrees ↔ turn right N degrees, walk forward N → walk
# backward N) are handled by _reverse_command via regex below.
# Non-reversible canonicals (wave, look around, follow me, patrol, stop,
# come here, stay here, go home, turn around) intentionally absent —
# _reverse_command returns None for those and the dispatcher logs &
# skips with no harm done.
# Pairs loaded from Config/language_tables.json::motion_inverses.
# Adding a new reversible-pair = JSON edit. See Voice/_language_tables.py.
from Voice._language_tables import motion_inverses as _load_motion_inverses
_REVERSE_PAIRS = _load_motion_inverses()
|
||
|
||
|
||
def _reverse_command(cmd: str) -> Optional[str]:
    """Return the inverse of `cmd` if it is reversible, else None.

    Handles fixed pairs via _REVERSE_PAIRS and parametric forms via
    regex (turn left N deg ↔ turn right N deg, walk forward N steps/m
    ↔ walk backward N steps/m, default-direction walks become explicit
    backward). Bare 'turn N degrees' (no direction) is treated as
    self-inverse — turning N degrees is geometrically reversible by
    repeating it, but more importantly the brain accepts it unchanged.
    """
    if not cmd:
        return None
    phrase = cmd.strip().lower()

    # Fixed canonical pairs from the JSON table win first.
    if phrase in _REVERSE_PAIRS:
        return _REVERSE_PAIRS[phrase]

    turn_dir = re.match(
        r"^turn\s+(left|right)\s+(\d+(?:\.\d+)?)\s*deg(?:rees?)?$", phrase,
    )
    if turn_dir:
        opposite = "left" if turn_dir.group(1) == "right" else "right"
        return "turn {} {} degrees".format(opposite, turn_dir.group(2))

    if re.match(r"^turn\s+(\d+(?:\.\d+)?)\s*deg(?:rees?)?$", phrase):
        # Direction-less turn — self-inverse as far as the brain cares.
        return phrase

    walk_dir = re.match(
        r"^walk\s+(forward|back|backward)\s+(\d+(?:\.\d+)?)\s*(steps?|meters?)$",
        phrase,
    )
    if walk_dir:
        direction, amount, unit = walk_dir.groups()
        opposite = "backward" if direction == "forward" else "forward"
        return "walk {} {} {}".format(opposite, amount, unit)

    walk_bare = re.match(r"^walk\s+(\d+(?:\.\d+)?)\s*(steps?|meters?)$", phrase)
    if walk_bare:
        # Implicit-direction walk defaults to forward in the brain, so
        # the inverse is an explicit backward walk.
        return "walk backward {} {}".format(*walk_bare.groups())

    return None
|
||
|
||
|
||
def _format_param_template(template: str, groups: tuple) -> str:
|
||
"""Substitute $1, $2, … in a parametric command template with the
|
||
matching regex groups. None groups (optional captures that didn't
|
||
fire) are treated as empty strings; surrounding whitespace is
|
||
collapsed so 'turn 90 degrees' becomes 'turn 90 degrees'.
|
||
|
||
Returns an EMPTY string if any captured numeric group is a literal
|
||
'0' (or '0.0') — a 0-quantity motion is a no-op and the dispatcher
|
||
skips empty results. This protects against Gemini hallucinating
|
||
'turn 0 degrees' when the user didn't actually request a turn but
|
||
Gemini misheard 'صفر' or similar. Bug #3 in the 16:42:58 audit.
|
||
"""
|
||
out = template
|
||
for i, g in enumerate(groups, 1):
|
||
token = "$" + str(i)
|
||
if g is not None and re.match(r"^0+(?:\.0+)?$", str(g).strip()):
|
||
return "" # zero-quantity motion → discard
|
||
out = out.replace(token, "" if g is None else str(g))
|
||
out = re.sub(r"\s+", " ", out).strip()
|
||
return out
|
||
|
||
|
||
def _build_bot_motion_patterns(data: dict) -> list:
|
||
"""List of (needle, canonical) the bot dispatcher matches Gemini's
|
||
spoken reply against. English needles are lowercased so the dispatcher
|
||
can use case-insensitive `in` checks; Arabic needles are kept verbatim."""
|
||
out = []
|
||
for action in (data.get("actions", {}) or {}).values():
|
||
canon = (action.get("canonical") or "").strip()
|
||
if not canon:
|
||
continue
|
||
bot = action.get("bot_phrases", {}) or {}
|
||
for p in bot.get("english", []) or []:
|
||
p = (p or "").strip()
|
||
if p:
|
||
out.append((p.lower(), canon))
|
||
for p in bot.get("arabic", []) or []:
|
||
p = (p or "").strip()
|
||
if p:
|
||
out.append((p, canon))
|
||
# Sort by needle length descending so multi-word phrases match before
|
||
# their shorter prefixes (e.g. "moving forward" before "moving").
|
||
out.sort(key=lambda x: len(x[0]), reverse=True)
|
||
return out
|
||
|
||
|
||
# Module-level vocabulary tables, all derived from instruction.json.
# Mutable (rebuildable) — VoiceModule.__init__ re-reads the file and
# reassigns these in case it changed since import.
WAKE_WORDS: set = _build_wake_words(_INSTRUCTIONS)
COMMAND_VOCAB: list = _build_command_vocab(_INSTRUCTIONS)
_ARABIC_MOTION_TO_CANONICAL: dict = _build_arabic_motion_map(_INSTRUCTIONS)
_BOT_MOTION_PATTERNS: list = _build_bot_motion_patterns(_INSTRUCTIONS)
_PARAMETRIC_ACTIONS: list = _build_parametric_actions(_INSTRUCTIONS)

# Garbage patterns + min length stay in config_Voice.json (they're
# noise filtering, not voice instructions). These defaults are
# overwritten from config in VoiceModule.__init__.
GARBAGE_PATTERNS: set = set()
_MIN_TRANSCRIPTION_LENGTH: int = 3
|
||
|
||
|
||
def _has_wake_word(text: str) -> bool:
    """True if `text` contains any wake-word variant as a whole word
    (case-insensitive, word-boundary match against WAKE_WORDS)."""
    lowered = text.lower()
    return any(
        re.search(r'\b' + re.escape(word) + r'\b', lowered)
        for word in WAKE_WORDS
    )
|
||
|
||
|
||
def _strip_wake_word_once(text: str) -> str:
    """Single pass of wake-word stripping. Use via _strip_wake_word().

    Order matters: (1) if the text is NOTHING BUT a wake word plus
    trailing punctuation, collapse to '' outright; (2) otherwise try
    to peel a leading wake word (longest variants first, so 'hey sanad'
    wins over 'sanad'), then a trailing one. Returns the text unchanged
    when no wake word is found at either edge."""
    stripped = text.strip()
    # Pure wake-word utterance ("Sanad!", "سند...") → empty string.
    for w in WAKE_WORDS:
        if re.fullmatch(rf'{re.escape(w)}[\s,.!?]*', stripped, re.IGNORECASE):
            return ""
    # Longest variant first so a multi-word wake phrase isn't half-stripped.
    for w in sorted(WAKE_WORDS, key=len, reverse=True):
        # Leading: 'Sanad, turn left' → 'turn left'.
        m = re.match(
            rf'^\s*{re.escape(w)}\s*[,.!?]?\s+(.+)$',
            text, re.IGNORECASE,
        )
        if m:
            return m.group(1).strip(' ,.!?')
        # Trailing: 'turn left, Sanad!' → 'turn left'.
        m = re.match(
            rf'^(.+?)\s+{re.escape(w)}\s*[.!?]*\s*$',
            text, re.IGNORECASE,
        )
        if m:
            return m.group(1).strip(' ,.!?')
    return text
|
||
|
||
|
||
def _strip_wake_word(text: str) -> str:
    """
    Remove the wake word from the start or end of text, iteratively, so
    repeated-wake transcriptions ("Sanad. Sanad.") fully collapse.
    Capped at 5 passes to prevent pathological inputs from looping.
    """
    for _pass in range(5):
        reduced = _strip_wake_word_once(text)
        if reduced == text:
            break  # fixed point reached — nothing more to strip
        text = reduced
    return text
|
||
|
||
|
||
def _translate_arabic_motion(text: str) -> str:
    """Translate Arabic motion phrases to English canonical equivalents,
    using the table built from instruction.json::actions[*].user_phrases.arabic.
    Substring match; longest-needle wins so multi-word phrases match before
    their shorter prefixes. Returns the input unchanged when nothing hits."""
    spoken = text.strip()
    if spoken and _ARABIC_MOTION_TO_CANONICAL:
        needles = sorted(_ARABIC_MOTION_TO_CANONICAL, key=len, reverse=True)
        for needle in needles:
            if needle in spoken:
                return _ARABIC_MOTION_TO_CANONICAL[needle]
    return text
|
||
|
||
|
||
def _closest_command(text: str, cutoff: float = 0.72) -> str:
    """
    Map a transcription to the closest known command phrase.
    Returns the canonical command if there's a close-enough match, else
    returns the original text unchanged.
    """
    normalized = text.lower().strip().rstrip(".!?,")
    if not normalized:
        return text

    # Exact substring hit wins outright, in vocab order.
    for candidate in COMMAND_VOCAB:
        if candidate in normalized:
            return candidate

    # Otherwise pick the best fuzzy ratio across the vocabulary.
    scored = [
        (SequenceMatcher(None, normalized, candidate).ratio(), candidate)
        for candidate in COMMAND_VOCAB
    ]
    if scored:
        best_ratio, best_candidate = max(scored, key=lambda rc: rc[0])
        if best_ratio >= cutoff:
            return best_candidate
    return text
|
||
|
||
|
||
class VoiceModule:
|
||
"""Thin orchestrator around GeminiBrain + command dispatch."""
|
||
|
||
    def __init__(
        self,
        audio_api,
        on_command: Optional[Callable] = None,
        on_wake: Optional[Callable] = None,
    ):
        """Wire up config, phrase tables, dedup state and the motion worker.

        audio_api  -- audio backend handle (stored; the Gemini runner
                      subprocess owns actual capture/playback).
        on_command -- callback fired for each dispatched motion command.
        on_wake    -- optional wake callback (stored; wake gating itself
                      happens inside Gemini's persona prompt).
        """
        self._audio = audio_api
        self._on_command = on_command
        self._on_wake = on_wake

        # Voice config: stt tuning knobs + user-facing message strings.
        self._config = load_config("Voice")
        self._stt = self._config.get("stt", {})
        self._messages = self._config.get("messages", {})

        # Reload instruction.json so a hot-edit between runs is picked
        # up without re-importing the module. All phrase tables
        # (wake_words, command_vocab, Arabic→canonical map, bot motion
        # patterns, parametric actions) are rebuilt from it — single
        # source of truth. garbage_patterns + min_transcription_length
        # stay in config_Voice.json (noise filtering, not voice
        # instruction tables).
        global WAKE_WORDS, COMMAND_VOCAB, GARBAGE_PATTERNS, \
            _MIN_TRANSCRIPTION_LENGTH, _ARABIC_MOTION_TO_CANONICAL, \
            _BOT_MOTION_PATTERNS, _PARAMETRIC_ACTIONS, _INSTRUCTIONS
        _INSTRUCTIONS = _load_instructions()
        WAKE_WORDS = _build_wake_words(_INSTRUCTIONS)
        COMMAND_VOCAB = _build_command_vocab(_INSTRUCTIONS)
        _ARABIC_MOTION_TO_CANONICAL = _build_arabic_motion_map(_INSTRUCTIONS)
        _BOT_MOTION_PATTERNS = _build_bot_motion_patterns(_INSTRUCTIONS)
        _PARAMETRIC_ACTIONS = _build_parametric_actions(_INSTRUCTIONS)
        GARBAGE_PATTERNS = {p.lower() for p in self._stt.get("garbage_patterns", [])}
        _MIN_TRANSCRIPTION_LENGTH = int(self._stt.get("min_transcription_length", 3))
        self._vocab_cutoff = float(self._stt.get("command_vocab_cutoff", 0.72))
        log.info(
            "instruction.json loaded: %d wake_words, %d command_vocab, "
            "%d arabic→canonical, %d bot patterns, %d parametric; "
            "+ %d garbage_patterns from config_Voice.json",
            len(WAKE_WORDS), len(COMMAND_VOCAB),
            len(_ARABIC_MOTION_TO_CANONICAL), len(_BOT_MOTION_PATTERNS),
            len(_PARAMETRIC_ACTIONS), len(GARBAGE_PATTERNS),
        )

        # Dispatch dedup state. Gemini's output_transcription arrives in
        # many small chunks per turn — and one chunk may contain MULTIPLE
        # motion phrases for compound commands ("Turning right, then
        # moving forward."). Two layers of dedup:
        #   1. _fired_canons_this_turn — set of canonicals already fired
        #      since the current Gemini turn started; cleared on turn_end.
        #      Same motion never fires twice within one reply.
        #   2. _last_gemini_canon / _last_gemini_dispatch_at — cross-chunk
        #      cooldown safety net for cases where a turn_end is delayed
        #      and the next turn's phrase arrives before the reset.
        self._fired_canons_this_turn: set = set()
        self._last_gemini_canon = ""
        self._last_gemini_dispatch_at = 0.0
        # Wake-word gate is enforced INSIDE Gemini via the persona prompt
        # (see config_Voice.json::gemini_system_prompt). Marcus does NOT
        # check for "Sanad"/"سند" in Python anymore — if Gemini speaks a
        # motion-confirmation phrase, that's its own decision and we
        # trust it. Cleaner, single gatekeeper, no double-checking.

        # Gemini brain reference for flush_mic() — populated by
        # _voice_loop_gemini after spawning the runner subprocess.
        self._brain = None

        # Per-turn buffer for Gemini's spoken text. Gemini Live emits the
        # output transcription in many small chunks ("I", "see", "a", ...);
        # we accumulate them and print one clean `[Sanad] said: "..."`
        # line per turn. Flushed on turn_end OR when a chunk ends with
        # sentence-ending punctuation.
        self._gemini_say_buf = []
        self._gemini_say_lock = threading.Lock()
        self._gemini_say_last_chunk_at = 0.0

        # CUMULATIVE per-turn text used by the motion dispatcher. Gemini
        # frequently splits a single phrase across chunks ('Turning' /
        # 'right.'), and a per-chunk scan would miss 'turning right'
        # because neither chunk alone contains the full needle. We append
        # every chunk here and re-scan the WHOLE accumulated text on each
        # new chunk; _fired_canons_this_turn prevents the same motion
        # from dispatching twice. Reset on turn_end.
        self._gemini_turn_text = ""

        # Short-term motion memory. Every REAL command dispatched to the
        # brain is appended here (the memory canonicals 'repeat last' /
        # 'reverse last' are NOT — only their resolved targets). Used by
        # the dispatcher to handle 'do that again' / 'undo' locally.
        # Cap = 10; older commands roll off. PERSISTS to disk across
        # Marcus restarts AND Gemini Live session reconnects (Bug #12 in
        # the 14-bug audit).
        self._command_history: deque = deque(maxlen=10)
        self._history_path = os.path.join(
            PROJECT_ROOT, "logs", "command_history.json",
        )
        self._load_command_history()

        # ─── motion worker (async dispatch) ───────────────────────
        # The dispatcher used to call self._on_command directly on the
        # IPC reader thread, which then blocked for the whole motion
        # (a 10-step walk = ~10s of dead silence for Gemini chunks).
        # Commands now go through a Queue + dedicated worker thread:
        #   - dispatcher pushes commands and returns immediately
        #   - worker pulls FIFO and calls self._on_command serially
        #   - 'stop' has priority: it sets motion_abort (interrupts the
        #     running brain loop) AND drains the queue (cancels pending
        #     motions), then a stop is enqueued so the brain runs its
        #     proper stop handler.
        #   - worker tracks self._current_motion for status queries and
        #     clears motion_abort at the start of each new (non-stop)
        #     command so a stale set doesn't kill the next one.
        self._motion_queue: "queue.Queue" = queue.Queue(maxsize=20)
        self._motion_worker: Optional[threading.Thread] = None
        self._current_motion: Optional[str] = None
        self._current_motion_at: float = 0.0
        # Sentinel used to politely shut the worker down on stop().
        self._MOTION_SENTINEL = object()

        self._running = False
        self._thread = None

        log.info("VoiceModule initialized (backend=gemini)")
|
||
|
||
# ─── main loop ────────────────────────────────────────
|
||
|
||
    def _voice_loop(self):
        """
        Spawn the Gemini Live STT subprocess (runs in the gemini_sdk
        Python 3.10+ env) and forward its transcripts into Marcus's
        dispatch gate. Marcus's main process never opens the Gemini
        WebSocket itself — google-genai needs Python ≥3.9 and marcus
        is pinned to 3.8 by the Jetson torch wheel.

        Blocks until self._running goes False; on exit it stops the
        frame-sender thread and the brain subprocess.
        """
        # API key: env overrides first, then config fallback.
        api_key = (
            os.environ.get("MARCUS_GEMINI_API_KEY")
            or os.environ.get("SANAD_GEMINI_API_KEY")
            or self._stt.get("gemini_api_key", "")
        )
        if not api_key:
            log.error(
                "No Gemini API key found. Set env MARCUS_GEMINI_API_KEY "
                "or stt.gemini_api_key in Config/config_Voice.json"
            )
            # Idle (don't busy-spin) until stop() flips _running.
            while self._running:
                time.sleep(0.5)
            return

        from Voice.gemini_script import GeminiBrain

        # Env overrides for model + voice are passed through to the
        # runner subprocess automatically (it reads the same env vars).
        model = (
            os.environ.get("MARCUS_GEMINI_MODEL")
            or self._stt.get(
                "gemini_model",
                "gemini-2.5-flash-native-audio-preview-12-2025",
            )
        )
        voice_name = (
            os.environ.get("MARCUS_GEMINI_VOICE")
            or self._stt.get("gemini_voice_name", "Charon")
        )
        # System prompt: the runner reads the same config & file paths,
        # but we forward the resolved string in case marcus's config
        # layer picked a fallback. Forwarded via env in GeminiBrain.start().
        system_prompt = self._stt.get(
            "gemini_system_prompt",
            "Transcribe what the user says to Sanad. Stay silent.",
        )
        sp_file = self._stt.get("gemini_system_prompt_file", "")
        if sp_file:
            # A prompt FILE (relative paths resolved against the project
            # root) overrides the inline config string when readable.
            sp_path = sp_file if os.path.isabs(sp_file) else os.path.join(
                PROJECT_ROOT, sp_file,
            )
            try:
                with open(sp_path, "r", encoding="utf-8") as f:
                    loaded = f.read().strip()
                    if loaded:
                        system_prompt = loaded
                        log.info(
                            "gemini system prompt loaded from %s (%d chars)",
                            sp_path, len(loaded),
                        )
            except Exception as e:
                log.warning(
                    "gemini_system_prompt_file=%r unreadable: %s — "
                    "using inline config", sp_file, e,
                )

        log.info(
            "Voice loop started — GEMINI S2S subprocess "
            "(model=%s, voice=%s)", model, voice_name,
        )

        brain = GeminiBrain(
            None, None,  # audio_io, recorder owned by runner
            voice_name=voice_name,
            system_prompt=system_prompt,
            api_key=api_key,
            on_transcript=self._on_gemini_transcript,
            # No on_command — Marcus does NOT inspect user transcripts
            # for wake words anymore. Gemini's persona prompt enforces
            # the wake-word gate; if Gemini speaks a motion phrase, the
            # bot-side dispatcher below picks it up and fires motion.
            on_bot_text=self._on_gemini_say_chunk,
            on_turn_end=self._on_gemini_turn_end,
        )
        self._brain = brain
        brain.start()

        # ── Camera-frame sender ────────────────────────────────────
        # Stream JPEG frames to the runner so Gemini Live can SEE what
        # the robot sees. Without this, "what do you see" / "describe
        # this exhibit" answers would be hallucinations. The runner
        # forwards them to Gemini as image/jpeg blobs and de-stales
        # anything older than gemini_frame_max_age_sec.
        send_frames = bool(self._stt.get("gemini_send_frames", True))
        frame_interval = float(self._stt.get("gemini_frame_interval_sec", 0.5))
        frame_thread = None
        frame_stop = threading.Event()
        if send_frames:
            try:
                from API.camera_api import get_frame as _camera_get_frame
            except Exception as e:
                log.warning("camera_api unavailable — frame streaming disabled: %s", e)
                _camera_get_frame = None
            if _camera_get_frame is not None:
                def _frame_sender_loop():
                    # Push one frame every frame_interval seconds until
                    # stop is signalled or the module shuts down.
                    log.info(
                        "frame sender started — %.2fs interval, "
                        "streaming camera frames to Gemini Live",
                        frame_interval,
                    )
                    while not frame_stop.is_set() and self._running:
                        try:
                            frame_b64 = _camera_get_frame()
                            if frame_b64:
                                # camera_api returns a base64 ASCII string —
                                # GeminiBrain.send_frame accepts that directly.
                                brain.send_frame(frame_b64)
                        except Exception as e:
                            log.debug("frame send failed: %s", e)
                        frame_stop.wait(frame_interval)
                    log.info("frame sender stopped")

                frame_thread = threading.Thread(
                    target=_frame_sender_loop,
                    daemon=True, name="gemini-frames",
                )
                frame_thread.start()

        try:
            # Park this thread; the brain + callbacks do all the work.
            while self._running:
                time.sleep(0.25)
        finally:
            # Orderly teardown: frames first, then the subprocess.
            frame_stop.set()
            if frame_thread is not None:
                frame_thread.join(timeout=2)
            brain.stop()
            self._brain = None
|
||
|
||
# ─── dispatch side channel ────────────────────────────
|
||
|
||
def _on_gemini_transcript(self, text: str) -> None:
|
||
"""Log every user transcript to logs/transcript.log."""
|
||
if text:
|
||
_log_transcript("HEARD", text)
|
||
|
||
def _on_gemini_say_chunk(self, text: str) -> None:
|
||
"""
|
||
Receive a Gemini output-transcription chunk. Two side effects:
|
||
1. Forward to the bot dispatcher so motion can fire on
|
||
confirmation phrases (Turning right / Sitting down / etc.).
|
||
2. Buffer the chunk for the per-turn `[Sanad] said: ...` line
|
||
that prints once on turn_end (or sooner if the chunk ends
|
||
with sentence punctuation).
|
||
"""
|
||
# Motion side-channel. Dispatch on the CUMULATIVE per-turn text
|
||
# so split chunks ('Turning' + ' right.') still match the full
|
||
# 'turning right' needle. We DEFER dispatch until the cumulative
|
||
# text settles at a sentence boundary (./!/?/؟) — otherwise a
|
||
# bare canonical like 'turn right' would fire on chunk 1
|
||
# ('Turning right') before chunk 2 (' 90 degrees.') brings in
|
||
# the parametric extension, and the robot would do BOTH a small
|
||
# turn and a 90° turn. A leftover-flush on turn_end catches the
|
||
# rare case where a phrase ends with no terminal punctuation.
|
||
#
|
||
# Word-boundary fix: Gemini Live often emits chunks WITHOUT
|
||
# leading whitespace (e.g. ["Turning right,", "then", "walking"]
|
||
# → naive `+=` produces 'Turning right,thenwalking' and breaks
|
||
# every \s+ in our regexes). Insert a space whenever the previous
|
||
# buffer doesn't already end in whitespace and the new chunk
|
||
# doesn't start with one. This mirrors what _flush_gemini_say
|
||
# does via " ".join(...) for the [Sanad] said: line.
|
||
if (self._gemini_turn_text
|
||
and not self._gemini_turn_text[-1:].isspace()
|
||
and text[:1] and not text[:1].isspace()):
|
||
self._gemini_turn_text += " "
|
||
self._gemini_turn_text += text
|
||
if self._gemini_turn_text.rstrip().endswith((".", "!", "?", "؟")):
|
||
try:
|
||
self._dispatch_gemini_bot(self._gemini_turn_text)
|
||
except Exception:
|
||
pass
|
||
|
||
with self._gemini_say_lock:
|
||
self._gemini_say_buf.append(text)
|
||
self._gemini_say_last_chunk_at = time.time()
|
||
# Flush early if this chunk closes a sentence — typical for
|
||
# short acks like "Turning right." that arrive as one chunk.
|
||
if text.rstrip().endswith((".", "!", "?")):
|
||
self._flush_gemini_say_locked()
|
||
|
||
def _on_gemini_turn_end(self) -> None:
|
||
"""Flush any pending Gemini output chunks at turn boundary, and
|
||
reset the per-turn motion dedup state (fired-set + cumulative
|
||
text buffer) so the next user→Gemini exchange starts fresh.
|
||
|
||
IMPORTANT: also runs a final dispatch pass on the cumulative
|
||
text in case the last chunk ended without terminal punctuation
|
||
(rare — Gemini usually finishes with '.', but the model can
|
||
occasionally emit a turn that ends mid-clause)."""
|
||
with self._gemini_say_lock:
|
||
self._flush_gemini_say_locked()
|
||
if self._gemini_turn_text.strip():
|
||
try:
|
||
self._dispatch_gemini_bot(self._gemini_turn_text)
|
||
except Exception:
|
||
pass
|
||
self._fired_canons_this_turn.clear()
|
||
self._gemini_turn_text = ""
|
||
|
||
def _flush_gemini_say_locked(self) -> None:
|
||
"""Caller MUST hold self._gemini_say_lock. Prints one [Sanad] said: line."""
|
||
if not self._gemini_say_buf:
|
||
return
|
||
joined = " ".join(t.strip() for t in self._gemini_say_buf if t).strip()
|
||
while " " in joined:
|
||
joined = joined.replace(" ", " ")
|
||
self._gemini_say_buf = []
|
||
if joined:
|
||
_log_transcript("SAID", joined)
|
||
try:
|
||
print(f' [Sanad] said: "{joined[:200]}"')
|
||
print("Command: ", end="", flush=True)
|
||
except Exception:
|
||
pass
|
||
|
||
def flush_mic(self) -> None:
|
||
"""
|
||
Tell the Gemini runner subprocess to drop its mic buffer. Called
|
||
before AND after `audio_api.speak()` so the robot's own voice
|
||
(picked up by the mic during TtsMaker playback) doesn't come back
|
||
from Gemini as a fake user utterance.
|
||
No-op if the runner hasn't started yet.
|
||
"""
|
||
b = getattr(self, "_brain", None)
|
||
if b is None:
|
||
return
|
||
try:
|
||
b.flush_mic()
|
||
except Exception:
|
||
pass
|
||
|
||
def _normalize_command(self, text: str) -> str:
|
||
"""Fuzzy-match a transcription to the closest canonical phrase."""
|
||
canonical = _closest_command(text, cutoff=self._vocab_cutoff)
|
||
if canonical != text:
|
||
log.info("fuzzy-match: %r → %r", text, canonical)
|
||
return canonical
|
||
|
||
    # _BOT_MOTION_PATTERNS is built at module load from
    # Config/instruction.json::actions[*].bot_phrases (both English and
    # Arabic). The dispatcher reads it via the module-level reference.
    # NOTE(review): the property indirection presumably exists so a
    # rebuild/monkey-patch of the module-level list is seen by live
    # instances — confirm before replacing with a class attribute.
    @property
    def _BOT_MOTION_PATTERNS(self):
        return _BOT_MOTION_PATTERNS
|
||
|
||
    # _PARAMETRIC_ACTIONS holds the regex/template tuples for motion
    # confirmations that carry a number ('Turning 360 degrees.',
    # 'Walking 5 steps.'). Built from instruction.json::parametric_actions.
    # Same property indirection as _BOT_MOTION_PATTERNS above: reads go
    # through the module-level reference at call time.
    @property
    def _PARAMETRIC_ACTIONS(self):
        return _PARAMETRIC_ACTIONS
|
||
|
||
    def _dispatch_gemini_bot(self, text: str) -> None:
        """
        Dispatch motion when Gemini's spoken reply contains motion-
        confirmation patterns (English or Arabic). The wake-word gate
        lives INSIDE Gemini (see config_Voice.json::gemini_system_prompt) —
        if Gemini speaks a motion phrase, that is its own decision after
        evaluating whether the user said "Sanad"/"سند". Marcus trusts it
        and dispatches the motion(s).

        COMPOUND-COMMAND HANDLING: a single chunk may contain multiple
        motion phrases — e.g. "Turning right, then moving forward." We
        scan for ALL non-overlapping motion patterns in the chunk and
        fire each unique canonical in spoken order. Three guards:
          1. Longest-needle-first sort + span-claiming on the pattern
             list rejects overlaps so 'moving forward' wins over the
             shorter prefix 'moving' inside the same span.
          2. _fired_canons_this_turn ensures each canonical fires AT
             MOST ONCE per Gemini turn (compound replies often repeat
             the same verb across chunks); cleared on turn_end.
          3. command_cooldown_sec is a final safety net for runaway
             same-canon dispatch across a turn_end gap.
        """
        if not text:
            return

        # Strip [STATE-...] markers we previously injected ourselves —
        # Gemini Live often echoes them through its output transcription
        # ('[STATE-DONE] walk 1 steps') and a naive scan picks up the
        # canonical name inside, re-firing the motion. The state markers
        # are NOT user intent; they're our own bookkeeping coming back.
        # Pattern matches the tag, the optional " (X.Ys)" duration, and
        # any optional " — reason" suffix on errors. Leaves the rest of
        # the chunk intact so a real user-intent phrase coexisting with
        # an echoed state event still gets dispatched.
        text = _STATE_ECHO_RE.sub("", text)
        # Strip quoted phrases. Gemini sometimes quotes the user's phrase
        # back ('Sorry, I don't understand the phrase "turn left"') and
        # the dispatcher would otherwise match the embedded canonical
        # and false-fire. Quoted phrases are NEVER motion confirmations.
        text = _QUOTED_RE.sub(" ", text)
        # Strip question clauses. A clarification like 'Do you mean turn
        # left or walk a step?' contains motion phrases but is NOT a
        # confirmation — Gemini is awaiting an answer, not acting.
        # Without this filter, both motions would dispatch contradictorily
        # while Gemini thinks the robot is idle. (Production failure at
        # 09:13:57 — Gemini said 'هل تقصد أن أستدير لليسار أم أمشي خطوة
        # للأمام؟' and the dispatcher fired BOTH turn_left AND walk_forward_1.)
        text = _QUESTION_RE.sub(" ", text)
        if not text.strip():
            return

        # Normalise spelled-out numbers BEFORE the lower-cased scan so
        # 'turn ninety degrees' / 'أستدير تسعين درجة' get matched by
        # the parametric (\d+) regex. normalise_numbers is idempotent
        # and word-boundary-aware ('someone' / 'often' are unaffected).
        text = normalise_numbers(text)

        # Translate Arabic / dialect structural motion phrasings to
        # English canonical form so the existing English parametric
        # regexes can match them. Replaces 14+ dialect-specific Arabic
        # regexes with one principled translator. Idempotent on
        # already-English input (passes through unchanged). See
        # Voice/canonical_normalizer.py for the verb/dir/unit/dual
        # dictionaries and the future memory+LiDAR disambiguation hook.
        text = to_canonical_english(text)

        # .lower() preserves character-by-character indexing for both
        # ASCII and Arabic, so positions in `low` correspond to positions
        # in `text` for ordering. Arabic patterns are stored as-is and
        # unaffected by lowercasing.
        low = text.lower()
        if not low.strip():
            return

        # Walk every pattern, collect all (start_index, command_string,
        # label) matches that don't overlap an already-claimed span.
        # Two pattern families:
        #   - PARAMETRIC (regex with capture groups, e.g. 'turning 90
        #     degrees') — scanned FIRST because they're more specific
        #     than the bare-canonical match 'turn right'. The dispatched
        #     string is the formatted command_template ('turn left 90
        #     degrees'), which Brain/command_parser.py parses natively.
        #   - FIXED CANONICALS (substring needles from bot_phrases)
        #     — scanned second; pre-sorted longest-first so multi-word
        #     phrases claim spans before shorter prefixes.
        # Per-turn dedup (_fired_canons_this_turn) is keyed by the
        # dispatched command string, so the same parametric command
        # ('turn 90 degrees') only fires once per Gemini reply.
        matches = []  # type: list[tuple[int, str, str]]
        claimed_spans = []  # type: list[tuple[int, int]]

        for compiled, template in self._PARAMETRIC_ACTIONS:
            for m in compiled.finditer(low):
                j, end = m.start(), m.end()
                if any(not (end <= s or j >= e)
                       for (s, e) in claimed_spans):
                    continue
                cmd_text = _format_param_template(template, m.groups())
                if not cmd_text:
                    # Parametric matched but produced an empty command
                    # (e.g. 0-quantity rejection). Claim the span so the
                    # bare-canonical pass below doesn't double-match
                    # 'turning right' inside what was 'turning right 0
                    # degrees' — which would defeat the 0-rejection.
                    claimed_spans.append((j, end))
                    continue
                matches.append((j, cmd_text, "param"))
                claimed_spans.append((j, end))

        for needle, cmd in self._BOT_MOTION_PATTERNS:
            if not needle:
                continue
            i = 0
            while True:
                j = low.find(needle, i)
                if j == -1:
                    break
                end = j + len(needle)
                if any(not (end <= s or j >= e)
                       for (s, e) in claimed_spans):
                    # Overlap with a claimed span — resume the search
                    # one char further so a later occurrence can match.
                    i = j + 1
                    continue
                matches.append((j, cmd, "canon"))
                claimed_spans.append((j, end))
                i = end

        if not matches:
            return

        # Spoken order — sort by position so we fire turn_right BEFORE
        # move_forward when Gemini said "Turning right, then moving
        # forward."
        matches.sort(key=lambda m: m[0])

        now = time.time()
        cooldown = float(self._stt.get("command_cooldown_sec", 1.5))
        for _, cmd, label in matches:
            if cmd in self._fired_canons_this_turn:
                continue
            if (cmd == self._last_gemini_canon
                    and now - self._last_gemini_dispatch_at < cooldown):
                continue
            self._fired_canons_this_turn.add(cmd)
            self._last_gemini_canon = cmd
            self._last_gemini_dispatch_at = now

            # Memory intercepts. 'repeat last' and 'reverse last' are
            # synthetic canonicals — they NEVER reach the brain. Resolve
            # them against _command_history and dispatch the resolved
            # command instead. Empty history → log and skip silently
            # (no-op rather than an awkward error). Non-reversible
            # commands → log and skip.
            resolved = cmd
            if cmd == "repeat last":
                if not self._command_history:
                    log.info("memory: 'repeat last' but history is empty — skip")
                    continue
                resolved = self._command_history[-1]
                log.info("memory: repeat → %r", resolved)
            elif cmd == "reverse last":
                if not self._command_history:
                    log.info("memory: 'reverse last' but history is empty — skip")
                    continue
                last = self._command_history[-1]
                resolved = _reverse_command(last)
                if resolved is None:
                    log.info("memory: %r is not reversible — skip", last)
                    continue
                log.info("memory: reverse %r → %r", last, resolved)

            # Push the dispatched (resolved) command to history. This
            # means 'again' followed by 'again' replays the original
            # motion; 'turn right' followed by 'undo' followed by 'undo'
            # toggles right/left. Consistent with what users intuit.
            # Persisted to disk so it survives reconnects and restarts.
            # NOTE(review): control canonicals ('pause motion', 'play
            # sequence', …) are ALSO appended here since this runs
            # before the intercepts below — so 'repeat last' after a
            # pause replays the pause. Confirm that is intended.
            self._command_history.append(resolved)
            self._save_command_history()

            log.info(
                "dispatch (gemini-bot, %s): %s (heard: %r)",
                label, resolved, text[:80],
            )
            _log_transcript("CMD-BOT", resolved)

            # Pause/resume intercepts. These take effect IMMEDIATELY —
            # not after the queue drains — because the user expects
            # 'pause' to halt mid-walk right now, not after the brain
            # finishes its current command. Setting motion_pause makes
            # all interrupt-aware brain loops hold at zero velocity
            # (see wait_while_paused in Core/motion_state.py). Resume
            # clears the event so the running loop continues. We DO
            # NOT enqueue these — the worker never sees them, the
            # brain doesn't know about them, no [STATE-...] event is
            # emitted (motion is still running, just paused).
            if resolved == "pause motion":
                motion_pause.set()
                # Tag the state event with the motion that's actually
                # being held (current_motion), not the literal 'pause
                # motion' canonical — Gemini Rule 9 reads this to
                # answer 'are you still moving?' honestly. Bug #13.
                held = self._current_motion or "no motion in flight"
                log.info("memory: motion_pause SET — holding %r", held)
                try:
                    if (self._brain is not None
                            and self._stt.get("gemini_send_state_events", False)):
                        self._brain.send_state("paused", held)
                except Exception:
                    pass
                continue
            if resolved == "resume motion":
                was_paused = motion_pause.is_set()
                if was_paused:
                    motion_pause.clear()
                    held = self._current_motion or "no motion in flight"
                    log.info("memory: motion_pause CLEARED — resuming %r", held)
                else:
                    held = "(no pause was active)"
                    log.info("memory: resume requested but nothing paused")
                try:
                    if (self._brain is not None
                            and self._stt.get("gemini_send_state_events", False)):
                        self._brain.send_state("resumed", held)
                except Exception:
                    pass
                continue

            # ─── sequence-control intercepts ────────────────────
            # These never reach the brain. They mutate the in-process
            # Sequences singleton and (for play) push commands onto
            # this same dispatcher path again.
            if resolved == "start recording":
                # Try to extract a pre-name from Gemini's text:
                # 'recording your sequence as my-greet'.
                preset_name = _extract_sequence_name(text, mode="as")
                seqs = get_sequences()
                rs = seqs.start_recording(name=preset_name)
                log.info("sequence: start_recording(%r) -> %s",
                         preset_name, rs)
                continue
            if resolved == "save sequence":
                # Name MUST come from the bot phrase ('saved as <name>').
                save_name = _extract_sequence_name(text, mode="as")
                seqs = get_sequences()
                rs = seqs.save_recording(name=save_name)
                log.info("sequence: save_recording(%r) -> %s",
                         save_name, rs)
                if not rs.get("ok"):
                    try:
                        if (self._brain is not None
                                and self._stt.get("gemini_send_state_events", False)):
                            self._brain.send_state(
                                "error", "save sequence",
                                reason=rs.get("reason") or "save failed",
                            )
                    except Exception:
                        pass
                continue
            if resolved == "cancel recording":
                seqs = get_sequences()
                rs = seqs.cancel_recording()
                log.info("sequence: cancel_recording -> %s", rs)
                continue
            if resolved == "play sequence":
                play_name = _extract_sequence_name(text, mode="play")
                seqs = get_sequences()
                cmds = seqs.play(play_name) if play_name else None
                if cmds is None:
                    # Two failure modes worth distinguishing for the
                    # operator AND for Gemini (which uses send_state
                    # to give honest answers when the user later asks
                    # 'did you do it?'). Surface both as state errors.
                    if not play_name:
                        reason = "no sequence name in your phrase"
                    else:
                        reason = "no sequence saved as %r" % play_name
                    log.warning("sequence: play(%r) failed — %s",
                                play_name, reason)
                    try:
                        if (self._brain is not None
                                and self._stt.get("gemini_send_state_events", False)):
                            self._brain.send_state(
                                "error", "play sequence",
                                reason=reason,
                            )
                    except Exception:
                        pass
                else:
                    log.info("sequence: play(%r) — enqueueing %d commands",
                             play_name, len(cmds))
                    for sub in cmds:
                        self._enqueue_motion(sub)
                continue

            # Append to recording buffer if a session is active. We do
            # this BEFORE enqueueing so a control flag flip mid-run
            # doesn't change the captured macro shape. record_command
            # silently skips control canonicals (stop / pause / etc.).
            try:
                get_sequences().record_command(resolved)
            except Exception:
                pass

            # Enqueue for the motion worker — does NOT block here.
            # Worker calls self._on_command(resolved, "en") when it
            # picks the command up. 'stop' sets motion_abort + drains
            # the queue inside _enqueue_motion before requeueing.
            self._enqueue_motion(resolved)
|
||
|
||
# ─── command history persistence ──────────────────────
|
||
|
||
def _load_command_history(self) -> None:
|
||
"""Restore _command_history from disk on startup. Best-effort —
|
||
any read/parse error leaves history empty without crashing."""
|
||
try:
|
||
if not os.path.isfile(self._history_path):
|
||
return
|
||
with open(self._history_path, "r", encoding="utf-8") as f:
|
||
payload = _json.load(f)
|
||
cmds = payload.get("commands", [])
|
||
if isinstance(cmds, list):
|
||
for c in cmds[-10:]:
|
||
if isinstance(c, str) and c.strip():
|
||
self._command_history.append(c.strip())
|
||
log.info(
|
||
"command history restored: %d entries from %s",
|
||
len(self._command_history), self._history_path,
|
||
)
|
||
except Exception as e:
|
||
log.warning("could not load command history: %s", e)
|
||
|
||
def _save_command_history(self) -> None:
|
||
"""Write _command_history to disk after every dispatch.
|
||
Atomically (write to .tmp + rename) so a crash mid-write doesn't
|
||
corrupt the file."""
|
||
try:
|
||
os.makedirs(os.path.dirname(self._history_path), exist_ok=True)
|
||
tmp = self._history_path + ".tmp"
|
||
with open(tmp, "w", encoding="utf-8") as f:
|
||
_json.dump(
|
||
{"commands": list(self._command_history),
|
||
"saved_at": time.time()},
|
||
f, indent=2, ensure_ascii=False,
|
||
)
|
||
os.replace(tmp, self._history_path)
|
||
except Exception:
|
||
# Persistence failure is non-fatal — history still works
|
||
# in-memory for the current process lifetime.
|
||
pass
|
||
|
||
# ─── motion worker ────────────────────────────────────
|
||
|
||
    def _motion_worker_loop(self) -> None:
        """
        Pull commands FIFO from self._motion_queue and run them on the
        brain serially. Clears motion_abort at the start of each new
        (non-stop) command so a stale set from a previous interrupt
        doesn't kill the next motion.

        Also notifies Gemini via send_state(...) at start / complete /
        interrupted / error so the model can give honest answers when
        the user asks 'what are you doing?'.

        A brain exception is logged and the worker keeps running;
        the queue must drain even if one motion crashes.
        """
        while self._running:
            try:
                item = self._motion_queue.get()
            except Exception:
                continue
            # Sentinel from stop() — exit the worker cleanly.
            if item is self._MOTION_SENTINEL:
                log.info("motion worker received sentinel — exiting")
                return
            cmd = item
            if not cmd:
                continue

            is_stop = (cmd.strip().lower() == "stop")
            # For non-stop commands, clear any leftover abort flag from
            # a previous interrupt so the brain doesn't immediately
            # break out of the new motion's loop. Also clear any stale
            # pause — a fresh command should not start in a paused
            # state just because the previous command was paused at
            # the moment it was aborted/stopped.
            if not is_stop:
                motion_abort.clear()
                motion_pause.clear()

            self._current_motion = cmd
            self._current_motion_at = time.time()

            # Tell Gemini what's starting (best effort — runner may not
            # be ready yet, e.g. first command before connection).
            try:
                if self._brain is not None and self._stt.get(
                        "gemini_send_state_events", False):
                    self._brain.send_state("start", cmd)
            except Exception:
                pass

            # Structured per-motion log — one line per START with the
            # parsed direction/magnitude/unit. Pairs with the END line
            # below to give a complete audit of what each command
            # actually did. See logs/motion.log.
            try:
                _log_motion("start", cmd, source="voice")
            except Exception:
                pass

            # Watchdog: if a brain motion call exceeds the configured
            # max RUNNING duration, fire motion_abort so its polling
            # loops break out (gradual_stop runs as part of the exit
            # path). 'Running' excludes time spent in motion_pause —
            # a user who deliberately holds for 90s shouldn't trip the
            # watchdog. We use a poller thread that ticks 100ms only
            # while motion_pause is clear; pause-aware. Skip for
            # commands listed in motion_watchdog_skip_canonicals
            # (e.g. 'patrol' has its own 5-minute outer duration).
            # 0 disables.
            wd_sec = float(self._stt.get("motion_watchdog_sec", 0.0))
            wd_skip = set(self._stt.get(
                "motion_watchdog_skip_canonicals", []
            ))
            wd_canon = cmd.strip().lower().split()[0] if cmd.strip() else ""
            wd_active = (wd_sec > 0 and wd_canon and
                         wd_canon not in wd_skip and not is_stop)
            wd_stop_evt = threading.Event()

            # Default-arg binding (_cmd=cmd, ...) snapshots the loop
            # variables so a later iteration can't retarget this closure.
            def _watchdog_loop(_cmd=cmd, _budget=wd_sec, _stop=wd_stop_evt):
                # Tick every 100ms while motion is RUNNING (not paused
                # and not aborted). When elapsed running-time exceeds
                # budget, fire motion_abort. Exits when _stop is set
                # by the worker (motion completed normally).
                ticks_at_100ms = 0
                budget_ticks = int(_budget / 0.1)
                while not _stop.is_set():
                    if motion_abort.is_set():
                        return  # worker / external abort already fired
                    if not motion_pause.is_set():
                        ticks_at_100ms += 1
                        if ticks_at_100ms >= budget_ticks:
                            log.warning(
                                "watchdog: %r exceeded %.1fs running time "
                                "— firing motion_abort", _cmd, _budget,
                            )
                            motion_abort.set()
                            return
                    _stop.wait(0.1)

            wd_thread = None
            if wd_active:
                wd_thread = threading.Thread(
                    target=_watchdog_loop, daemon=True,
                    name="motion-watchdog",
                )
                wd_thread.start()

            t0 = time.time()
            had_error = False
            err_reason = ""
            try:
                if self._on_command:
                    self._on_command(cmd, "en")
            except Exception as e:
                had_error = True
                err_reason = str(e)
                log.error("worker on_command(%r) failed: %s", cmd, e, exc_info=True)
            finally:
                if wd_thread is not None:
                    wd_stop_evt.set()
                    # Best-effort join — the watchdog wakes on its
                    # 100ms tick, so this returns within ~100ms.
                    wd_thread.join(timeout=0.3)
                elapsed = time.time() - t0

            # Determine outcome. Stop itself never reports interrupted
            # (it IS the interrupt). For other commands, an abort flag
            # set at any point during execution means we were halted.
            if had_error:
                outcome = "error"
            elif is_stop:
                outcome = "complete"
            elif motion_abort.is_set():
                outcome = "interrupted"
            else:
                outcome = "complete"

            try:
                if (self._brain is not None
                        and self._stt.get("gemini_send_state_events", False)):
                    if outcome == "error":
                        self._brain.send_state(
                            "error", cmd, elapsed_sec=elapsed, reason=err_reason,
                        )
                    else:
                        self._brain.send_state(
                            outcome, cmd, elapsed_sec=elapsed,
                        )
            except Exception:
                pass

            log.info(
                "motion worker: %s [%s] in %.2fs", cmd, outcome, elapsed,
            )

            # Structured per-motion END log — pairs with the START line
            # above. Captures actual elapsed and outcome alongside the
            # auto-parsed direction/magnitude/unit.
            try:
                _log_motion(
                    outcome, cmd, source="voice",
                    actual_sec=elapsed,
                    **({"reason": err_reason} if had_error else {}),
                )
            except Exception:
                pass

            self._current_motion = None
            self._current_motion_at = 0.0
|
||
|
||
def _enqueue_motion(self, cmd: str) -> None:
|
||
"""
|
||
Queue a command for the motion worker. 'stop' has priority:
|
||
it sets motion_abort (interrupting the in-flight brain loop)
|
||
AND drains pending commands so they don't fire after the user
|
||
explicitly asked to halt. Non-stop commands are simply enqueued.
|
||
"""
|
||
if not cmd:
|
||
return
|
||
if cmd.strip().lower() == "stop":
|
||
# 1. Cut the in-flight motion immediately. Brain motion
|
||
# loops poll motion_abort.is_set() and break early.
|
||
# Also clear any pause — otherwise wait_while_paused
|
||
# would keep holding the brain inside its loop and the
|
||
# abort check wouldn't fire until pause clears.
|
||
motion_pause.clear()
|
||
motion_abort.set()
|
||
# 2. Drain pending — anything queued behind a stop is
|
||
# cancelled. We log the count for ops visibility.
|
||
drained = 0
|
||
try:
|
||
while True:
|
||
self._motion_queue.get_nowait()
|
||
drained += 1
|
||
except queue.Empty:
|
||
pass
|
||
if drained:
|
||
log.info("stop drained %d pending motion(s)", drained)
|
||
# 3. Enqueue the stop itself so brain.process_command runs
|
||
# its proper stop handler (gradual_stop, neutral pose).
|
||
try:
|
||
self._motion_queue.put_nowait("stop")
|
||
except queue.Full:
|
||
log.warning("motion queue full — stop dropped")
|
||
return
|
||
|
||
try:
|
||
self._motion_queue.put_nowait(cmd)
|
||
except queue.Full:
|
||
log.warning("motion queue full — dropping %r", cmd)
|
||
|
||
    @property
    def current_motion(self) -> Optional[str]:
        """The canonical command currently being executed by the brain,
        or None if idle. Worker sets it before calling on_command and
        clears it when the brain returns. Safe to read from any thread
        (single attribute read; assignment is atomic in CPython)."""
        return self._current_motion
|
||
|
||
# ─── start / stop ─────────────────────────────────────
|
||
|
||
def start(self):
|
||
if self._running:
|
||
log.warning("VoiceModule already running")
|
||
return
|
||
self._running = True
|
||
# Spawn the motion worker FIRST so any early dispatch (e.g.
|
||
# from a quick wake-on-startup) has somewhere to go.
|
||
self._motion_worker = threading.Thread(
|
||
target=self._motion_worker_loop,
|
||
daemon=True, name="motion-worker",
|
||
)
|
||
self._motion_worker.start()
|
||
self._thread = threading.Thread(
|
||
target=self._voice_loop, daemon=True, name="voice",
|
||
)
|
||
self._thread.start()
|
||
log.info("Voice module started")
|
||
|
||
def stop(self):
|
||
self._running = False
|
||
# Wake the worker so it can see _running=False and exit.
|
||
try:
|
||
self._motion_queue.put_nowait(self._MOTION_SENTINEL)
|
||
except Exception:
|
||
pass
|
||
if self._motion_worker:
|
||
self._motion_worker.join(timeout=3)
|
||
self._motion_worker = None
|
||
if self._thread:
|
||
self._thread.join(timeout=5)
|
||
self._thread = None
|
||
log.info("Voice module stopped")
|
||
|
||
@property
|
||
def is_running(self) -> bool:
|
||
"""True while the voice loop thread is alive."""
|
||
t = self._thread
|
||
return bool(self._running and t is not None and t.is_alive())
|
||
|
||
@property
|
||
def is_speaking(self) -> bool:
|
||
"""Delegates to AudioAPI — True while TtsMaker is playing."""
|
||
try:
|
||
return bool(self._audio.is_speaking)
|
||
except Exception:
|
||
return False
|