# Sanadv3/voice/movement_dispatch.py
#
# NOTE: this file contains non-ASCII characters (Arabic phrases and typographic
# quotation marks) that some viewers flag as "ambiguous Unicode". They appear
# inside the trigger-phrase documentation and the transcript-cleaning regexes.

"""MovementDispatcher — Gemini voice → locomotion (N2 Phase 3).
The Marcus phrase-confirmation pattern, ported to Sanad. Gemini Live runs in a
child subprocess; the parent supervisor (gemini/subprocess.py) parses Gemini's
OWN spoken output into BOT: transcript lines and fires `on_bot_text(line)` here.
Flow:
Gemini speaks a canonical confirmation phrase ("Turning right." / "أستدير
يميناً.") → on_bot_text() matches it against data/motions/instruction.json
→ enqueues a canonical command → a worker thread drives loco_controller
(discrete steps that self-terminate with StopMove).
Gating: every dispatch is gated on `recognition_state.movement_enabled` (the
"Enable Gemini movement" dashboard toggle) — SEPARATE from the manual
"Enable movement" arm flag (loco_controller._armed). When the flag is off,
matches are dropped. "stop" is honoured immediately (cross-thread abort + drain
+ E-STOP) regardless of cooldown.
Safety: discrete `loco.step()` self-stops; velocity caps live in LocoController;
N-step / N-degree commands are bounded by instruction.json (max_steps,
max_degrees) and check the abort flag + enable gate between each step. Numbers
are kept verbatim but the degrees→steps and steps mapping is APPROXIMATE and
must be calibrated on the real robot.
"""
from __future__ import annotations
import json
import queue
import re
import threading
import time
from pathlib import Path
from typing import Any, List, Optional
from Project.Sanad.core.logger import get_logger
log = get_logger("movement_dispatch")
# Unique marker object: stop() puts it on the command queue and the worker
# loop returns when it dequeues it.
_SENTINEL = object()

# Seconds — recognition_state is re-read at most this often.
_STATE_CACHE_TTL = 0.5

# Resolved canonical command → LocoController discrete-step direction.
_FIXED_STEP = {
    # translation
    "move forward": "forward",
    "move backward": "backward",
    # rotation
    "turn right": "rotate_right",
    "turn left": "rotate_left",
    # lateral slide
    "slide left": "slide_left",
    "slide right": "slide_right",
}
# ── transcript cleaning (Marcus pattern) ──────────────────────────────────────
# Gemini's spoken text can CONTAIN our trigger phrases without intending a
# command — inside a question ("do you want me to move forward?"), a negation
# ("I'm not turning right"), a hypothetical ("I would be turning right"), a
# quote, or an echoed [STATE] tag. We drop those whole clauses before matching so
# only genuine confirmations actuate the robot.

# Bracketed echoes such as [STATE-DONE].
_BRACKET_RE = re.compile(r"\[[^\]]*\]")

# A quoted span of at most 80 characters. The ASCII apostrophe only counts as a
# quote mark at a word edge (opening: not preceded by a word character; closing:
# not followed by one). An apostrophe INSIDE a word ("isn't", "I'll") is part of
# a contraction: treating it as a quote would pair two contractions into a fake
# quoted span and delete the negation between them, letting the bare trigger
# phrase through ("Turning right isn't something I'll do").
_QUOTE_RE = re.compile(
    r"(?:[\"«»“”„‟‹›]|(?<!\w)')"                   # opening quote
    r"(?:[^\"'«»“”„‟‹›]|(?<=\w)'(?=\w)){0,80}?"    # body (contractions allowed)
    r"(?:[\"«»“”„‟‹›]|'(?!\w))")                   # closing quote

# Sentence delimiters; the capturing group keeps them in the split() result so
# the caller can tell questions from statements.
_SENT_SPLIT_RE = re.compile(r"([.!?؟؛\n]+)")

# NOTE: Arabic tokens are whitespace-delimited so we don't match a negation
# substring inside a real word — e.g. "ما" lives inside "أمام" (forward), "لا"
# inside many words. \b doesn't help for Arabic (all letters are \w), so we
# anchor on spaces/string-edges explicitly.
# English contractions ("don't", "isn't") are accepted with the ASCII apostrophe
# as well as the typographic ones (’ U+2019, ʼ U+02BC) that transcripts may use.
_NEG_RE = re.compile(
    r"\b(?:not|never|without|cannot|would|could|should|might|instead|"
    r"going to|want to|trying to|rather than)\b|\w+n['’ʼ]t\b|"
    r"(?:^|\s)(?:لا|ما|لن|لم|مش|بدون|غير|لست|ليس|بدل)(?:\s|$)")
class MovementDispatcher:
    """Turn Gemini's spoken confirmation phrases into queued locomotion commands.

    ``on_bot_text`` (called from the supervisor reader thread) cleans and
    matches a transcript line against the phrases loaded from instruction.json
    and enqueues canonical commands. A daemon worker thread dequeues them and
    drives ``loco`` through ``step(direction)`` / ``estop()``.

    Every dispatch is gated on the ``movement_enabled`` flag read from
    ``state_path`` and on the in-memory E-STOP latch; "stop" bypasses the
    cooldown and the enable/Nav2 checks in the worker.
    """

    def __init__(self, loco, instruction_path: Path, state_path: Path):
        """Create an idle dispatcher and load instruction.json.

        Args:
            loco: locomotion controller; this class calls ``loco.step(direction)``
                and ``loco.estop()`` on it.
            instruction_path: path of the instruction.json phrase/limit config.
            state_path: path handed to ``recognition_state.read`` to obtain the
                ``movement_enabled`` flag.
        """
        self._loco = loco
        self._instruction_path = Path(instruction_path)
        self._state_path = Path(state_path)
        self._queue: "queue.Queue[Any]" = queue.Queue(maxsize=32)
        self._abort = threading.Event()
        self._worker: Optional[threading.Thread] = None
        self._running = False
        # Hard-stop latch set by a dashboard E-STOP. Drops all voice commands
        # until cleared (by re-enabling Gemini movement). Kept SEPARATE from the
        # movement_enabled file flag so an E-STOP doesn't trigger the spoken
        # "movement disabled" announcement.
        self._estop = False
        # dedup / cooldown
        self._last_canon = ""
        self._last_at = 0.0
        # cached enable-flag
        self._enabled_cached = False
        self._enabled_at = 0.0
        # config (filled by _load)
        self._cooldown = 1.5
        self._max_steps = 8
        self._max_degrees = 360
        self._deg_per_step = 15
        self._fixed_patterns: List[tuple] = []  # (needle_lower, canonical) sorted long→short
        self._parametric: List[tuple] = []      # (compiled_regex, template)
        self._load()

    # ── instruction.json ─────────────────────────────────────────────────────
    @staticmethod
    def _number(data: dict, key: str, default, cast):
        """Return ``cast(data[key])``, or ``cast(default)`` if absent or not numeric."""
        try:
            return cast(data.get(key, default))
        except (TypeError, ValueError, OverflowError):
            log.warning("instruction.json: bad %s=%r — using default %r",
                        key, data.get(key), default)
            return cast(default)

    def _load(self):
        """Load limits, fixed phrases and parametric regexes from instruction.json.

        Never raises for bad file content: an unreadable file or a non-object
        JSON root leaves the dispatcher inert (no patterns, default limits), and
        individual malformed entries are skipped, so a config typo cannot crash
        the constructor.
        """
        try:
            data = json.loads(self._instruction_path.read_text(encoding="utf-8"))
        except Exception as exc:
            log.error("could not load %s: %s — dispatcher inert", self._instruction_path, exc)
            data = {}
        if not isinstance(data, dict):
            log.error("%s: top-level JSON value is not an object — dispatcher inert",
                      self._instruction_path)
            data = {}

        self._cooldown = self._number(data, "command_cooldown_sec", 1.5, float)
        self._max_steps = self._number(data, "max_steps", 8, int)
        self._max_degrees = self._number(data, "max_degrees", 360, int)
        self._deg_per_step = max(1, self._number(data, "degrees_per_step", 15, int))

        needles: List[tuple] = []
        actions = data.get("actions") or {}
        specs = actions.values() if isinstance(actions, dict) else ()
        for spec in specs:
            if not isinstance(spec, dict):
                continue
            canonical = spec.get("canonical", "")
            phrases = spec.get("bot_phrases", {}) or {}
            if not isinstance(phrases, dict):
                continue
            for lang_list in phrases.values():
                if isinstance(lang_list, str):
                    # A bare string must be ONE phrase; iterating it would turn
                    # every single character into a trigger needle.
                    lang_list = [lang_list]
                elif not isinstance(lang_list, (list, tuple)):
                    continue
                for p in lang_list:
                    if isinstance(p, str) and p:
                        # English folded to lower; Arabic unaffected by .lower()
                        needles.append((p.lower(), canonical))
        # longest needle first so "walking forward" wins over "forward"
        needles.sort(key=lambda t: len(t[0]), reverse=True)
        self._fixed_patterns = needles

        self._parametric = []
        parametric = data.get("parametric_actions") or []
        if not isinstance(parametric, (list, tuple)):
            parametric = []
        for pa in parametric:
            try:
                self._parametric.append((re.compile(pa["regex"], re.IGNORECASE), pa["canonical"]))
            except re.error as exc:
                log.warning("bad parametric regex %r: %s", pa.get("regex"), exc)
            except (KeyError, TypeError) as exc:
                # entry is not a mapping, or lacks "regex"/"canonical"
                log.warning("skipping malformed parametric action %r: %s", pa, exc)
        log.info("instruction.json loaded: %d fixed phrases, %d parametric, cooldown=%.1fs",
                 len(self._fixed_patterns), len(self._parametric), self._cooldown)

    # ── lifecycle ─────────────────────────────────────────────────────────────
    def start(self):
        """Start the daemon worker thread; a no-op when already running."""
        if self._running:
            return
        self._running = True
        self._worker = threading.Thread(target=self._worker_loop, daemon=True,
                                        name="movement-dispatch")
        self._worker.start()
        log.info("movement dispatcher started")

    def stop(self):
        """Ask the worker to exit: clear the running flag, abort, wake the queue.

        Does not join the worker. If the queue is full the sentinel is not
        queued; the worker then exits on its next ``_running`` check.
        """
        # NOTE(review): when the worker exits via the `_running` check instead of
        # consuming the sentinel, the sentinel (and any queued commands) stay in
        # the queue and would be seen by a worker created by a later start() —
        # confirm whether restart after stop() is a supported flow.
        self._running = False
        self._abort.set()
        try:
            self._queue.put_nowait(_SENTINEL)
        except queue.Full:
            pass

    def status(self) -> dict:
        """Return a snapshot dict (running, enable flag, latch, queue depth, pattern counts).

        The enable flag is re-read with ``force=True`` rather than served from
        the cache.
        """
        return {
            "running": self._running,
            "movement_enabled": self._movement_enabled(force=True),
            "estopped": self._estop,
            "queue_depth": self._queue.qsize(),
            "fixed_phrases": len(self._fixed_patterns),
            "parametric": len(self._parametric),
        }

    # ── E-STOP latch ──────────────────────────────────────────────────────────
    def emergency_stop(self):
        """Latch off after a dashboard E-STOP: abort the in-flight command, drain
        the queue, and refuse new commands until clear_estop(). Does NOT touch the
        movement_enabled file flag (so the Gemini child stays quiet)."""
        self._estop = True
        self._abort.set()
        self._drain()
        log.warning("movement dispatch E-STOP latch set")

    def clear_estop(self):
        """Release the E-STOP latch so voice commands are accepted again."""
        self._estop = False

    def is_estopped(self) -> bool:
        """Return True while the E-STOP latch is set."""
        return self._estop

    # ── enable gate ───────────────────────────────────────────────────────────
    def _movement_enabled(self, force: bool = False) -> bool:
        """Return the ``movement_enabled`` flag from recognition_state.

        The value is cached for ``_STATE_CACHE_TTL`` seconds unless ``force`` is
        true. Any failure to import or read the state counts as disabled.
        """
        now = time.monotonic()
        if not force and (now - self._enabled_at) < _STATE_CACHE_TTL:
            return self._enabled_cached
        try:
            # lazy import, inside the try so an import failure also means "off"
            from Project.Sanad.vision import recognition_state
            self._enabled_cached = bool(recognition_state.read(self._state_path).movement_enabled)
        except Exception:
            self._enabled_cached = False
        self._enabled_at = now
        return self._enabled_cached

    # ── transcript hook (called from the supervisor reader thread) ────────────
    def on_bot_text(self, text: str):
        """Match one transcript line and enqueue the commands it confirms.

        Ignored when the text is empty, the dispatcher is not running, the
        E-STOP latch is set, or movement is disabled. "stop" aborts the
        in-flight command, drains the queue and is enqueued regardless of the
        cooldown; any other command is skipped when it repeats the previous
        canonical within the cooldown window.
        """
        if not text or not self._running or self._estop:
            return
        if not self._movement_enabled():
            return
        cmds = self._match(text)
        if not cmds:
            return
        now = time.monotonic()  # monotonic — immune to NTP/wall-clock jumps
        for c in cmds:
            if c == "stop":
                # Safety: preempt anything in flight immediately, then E-STOP.
                self._abort.set()
                self._drain()
                self._enqueue("stop")
                self._last_canon = "stop"
                self._last_at = now
                continue
            # cross-turn cooldown: same canonical not re-fired too soon
            if c == self._last_canon and (now - self._last_at) < self._cooldown:
                continue
            self._last_canon = c
            self._last_at = now
            self._enqueue(c)

    def _enqueue(self, cmd: str):
        """Queue ``cmd`` without blocking; log and drop it when the queue is full."""
        try:
            self._queue.put_nowait(cmd)
        except queue.Full:
            log.warning("motion queue full — dropping %r", cmd)

    def _drain(self):
        """Discard everything currently in the queue."""
        try:
            while True:
                self._queue.get_nowait()
        except queue.Empty:
            pass

    # ── matcher ───────────────────────────────────────────────────────────────
    def _clean(self, text: str) -> str:
        """Drop clauses that are NOT genuine motion confirmations: bracketed
        [STATE] echoes, quoted spans, questions, and negation/hypothetical
        sentences. Only the surviving clauses are matched.

        Returns the kept clauses joined by " . ".
        """
        t = _BRACKET_RE.sub(" ", text)
        t = _QUOTE_RE.sub(" ", t)
        # split() with a capturing group alternates: clause, delimiter, clause, …
        parts = _SENT_SPLIT_RE.split(t)
        kept: List[str] = []
        for i in range(0, len(parts), 2):
            seg = parts[i].strip()
            delim = parts[i + 1] if i + 1 < len(parts) else ""
            is_question = ("?" in delim) or ("؟" in delim)
            if seg and not is_question and not _NEG_RE.search(seg.lower()):
                kept.append(seg)
        return " . ".join(kept)

    def _match(self, text: str) -> List[str]:
        """Return canonical commands in spoken order. Parametric (with numbers)
        claim their spans first so a bare phrase doesn't double-fire."""
        low = self._clean(text).lower()
        matches: List[tuple] = []  # (start, canonical)
        claimed: List[tuple] = []  # (start, end) spans already taken

        def overlaps(s, e):
            return any(s < ce and cs < e for cs, ce in claimed)

        # 1) parametric first. Claim the span even when the quantity is zero so a
        #    mis-heard "0 steps" suppresses the bare phrase underneath (no surprise
        #    motion) rather than falling through to a single step.
        for rx, template in self._parametric:
            for m in rx.finditer(low):
                if overlaps(*m.span()):
                    continue
                claimed.append(m.span())
                canonical = self._format(template, m.groups())
                if canonical:
                    matches.append((m.start(), canonical))
        # 2) fixed phrases (longest first), skipping claimed spans
        for needle, canonical in self._fixed_patterns:
            start = 0
            while True:
                j = low.find(needle, start)
                if j < 0:
                    break
                end = j + len(needle)
                if not overlaps(j, end):
                    matches.append((j, canonical))
                    claimed.append((j, end))
                start = end
        matches.sort(key=lambda t: t[0])
        # de-dup consecutive repeats within this single line
        out: List[str] = []
        for _, c in matches:
            if not out or out[-1] != c:
                out.append(c)
        return out

    @staticmethod
    def _format(template: str, groups) -> str:
        """Fill ``$1``, ``$2``, … in ``template`` from the regex ``groups``.

        Returns "" (command rejected) when a referenced group did not take part
        in the match, or when every number in the result is zero ("walk 0
        steps"). A placeholder whose index has no corresponding group is left
        verbatim.
        """
        groups = tuple(groups)
        unmatched = False

        def fill(m):
            nonlocal unmatched
            idx = int(m.group(1))
            if not 1 <= idx <= len(groups):
                return m.group(0)
            value = groups[idx - 1]
            if value is None:
                # optional group absent — don't emit the literal text "None"
                unmatched = True
                return ""
            return str(value)

        # One pass over whole placeholder tokens, so "$1" can never clobber the
        # leading part of "$10".
        out = re.sub(r"\$(\d+)", fill, template)
        if unmatched:
            return ""
        # reject zero-quantity motions ("walk 0 steps")
        nums = re.findall(r"\d+", out)
        if nums and all(int(n) == 0 for n in nums):
            return ""
        return out

    # ── worker ────────────────────────────────────────────────────────────────
    def _worker_loop(self):
        """Worker thread body: dequeue commands and execute them until stopped.

        Exits when the sentinel is dequeued or ``_running`` is false at the top
        of the loop. Exceptions raised while executing a command are logged and
        do not end the loop.
        """
        while self._running:
            cmd = self._queue.get()
            if cmd is _SENTINEL:
                return
            if cmd != "stop":
                # NOTE(review): this clear can race with an abort set by
                # on_bot_text("stop") for a command dequeued just before the
                # drain — looks like that command would then run un-aborted.
                self._abort.clear()
            if self._estop:
                continue  # E-STOP latched — drop everything
            # force a fresh read — don't let the 0.5s cache execute a command
            # after the operator just toggled movement off.
            if cmd != "stop" and not self._movement_enabled(force=True):
                continue  # toggled off while queued — drop
            # Never step while Nav2 owns the legs (autonomous goal in progress).
            # Two stacks driving at once is the exact hazard _arbiter guards.
            # STOP always passes through (safety). Read-only check — manual loco
            # uses acquire_loco; the discrete-step voice path must only YIELD.
            if cmd != "stop" and self._nav_holds_legs():
                log.info("voice movement dropped — Nav2 owns the legs (%r)", cmd)
                continue
            try:
                self._execute(cmd)
            except Exception:
                log.exception("execute %r failed", cmd)

    def _execute(self, canonical: str):
        """Run one canonical command on the locomotion controller.

        Handles "stop" (``loco.estop()``), "walk <dir> N steps" (capped at
        ``max_steps``), "turn <dir> N degrees" (capped at ``max_degrees``, then
        converted to steps via ``degrees_per_step``, at least one step) and the
        fixed single-step commands in ``_FIXED_STEP``. Anything else is logged
        at debug level and ignored.
        """
        c = canonical.lower().strip()
        if c == "stop":
            log.info("voice → STOP")
            self._loco.estop()
            return
        m = re.match(r"walk (forward|backward) (\d+) steps?$", c)
        if m:
            direction = "forward" if m.group(1) == "forward" else "backward"
            n = min(int(m.group(2)), self._max_steps)
            log.info("voice → walk %s %d steps", direction, n)
            self._repeat_step(direction, n)
            return
        m = re.match(r"turn (right|left) (\d+) degrees?$", c)
        if m:
            direction = "rotate_right" if m.group(1) == "right" else "rotate_left"
            deg = min(int(m.group(2)), self._max_degrees)
            n = max(1, round(deg / self._deg_per_step))
            log.info("voice → turn %s %d° (~%d steps)", m.group(1), deg, n)
            self._repeat_step(direction, n)
            return
        direction = _FIXED_STEP.get(c)
        if direction:
            log.info("voice → %s", c)
            self._loco.step(direction)
            return
        log.debug("no loco mapping for canonical %r", c)

    @staticmethod
    def _nav_holds_legs() -> bool:
        """True if Nav2 currently owns the legs (in-process arbiter). Lazy
        import so a missing/absent dashboard package never breaks voice."""
        try:
            from Project.Sanad.dashboard.routes import _arbiter
            return _arbiter.nav_active()
        except Exception:
            return False

    def _repeat_step(self, direction: str, n: int):
        """Issue up to ``max(1, n)`` discrete steps in ``direction``.

        Before every step the abort flag, the E-STOP latch, a fresh read of the
        enable flag and the Nav2 arbiter are checked; any of them ends the
        sequence early.
        """
        for _ in range(max(1, n)):
            if (self._abort.is_set() or self._estop
                    or not self._movement_enabled(force=True)
                    or self._nav_holds_legs()):
                log.info("voice multi-step aborted")
                break
            self._loco.step(direction)