Sanadv3/voice/sanad_voice.py

462 lines
20 KiB
Python

#!/usr/bin/env python3
"""Sanad voice subprocess — orchestrator.
Wires three independently-swappable pieces together:
1. Audio I/O — voice/audio_io.py (mic + speaker)
2. Turn recorder — TurnRecorder (in this file; model-agnostic WAV capture)
3. Voice brain — gemini/script.py (Gemini, default — cloud)
local/script.py (offline — Whisper+Qwen+CosyVoice2)
voice/model_script.py (template for new models)
Runtime selection:
SANAD_AUDIO_PROFILE = builtin | anker | hollyland_builtin (default builtin)
SANAD_VOICE_BRAIN = gemini | local | model (default gemini)
Usage:
python3 voice/sanad_voice.py eth0
python3 voice/sanad_voice.py eth0 --voice Charon
SANAD_AUDIO_PROFILE=anker SANAD_VOICE_BRAIN=gemini \\
python3 voice/sanad_voice.py eth0
System prompt priority (first hit wins):
1. scripts/sanad_script.txt (edit-live via the dashboard)
2. config/core_config.json > gemini_defaults.default_system_prompt
3. the hardcoded fallback in _load_system_prompt() below
"""
from __future__ import annotations
import array
import asyncio
import importlib
import json
import logging
import os
import sys
import tempfile
import threading
import time
import types
import wave
from datetime import datetime
from pathlib import Path
# ─────────────────────────────────────────────────────────────────────────────
# Layout bootstrap — MUST run before any `Project.Sanad.*` import.
# This file runs as a standalone subprocess (spawned by gemini/subprocess.py
# or local/subprocess.py); it can't rely on main.py having set up sys.path.
# Mirrors the dev-vs-deployed detection in main.py.
# dev layout: <root>/Project/Sanad/voice/sanad_voice.py
# deployed layout: /home/unitree/Sanad/voice/sanad_voice.py
# ─────────────────────────────────────────────────────────────────────────────
# Two levels up from this file (voice/sanad_voice.py) is the Sanad package dir.
_SANAD_DIR = Path(__file__).resolve().parent.parent # .../Sanad
_SANAD_PARENT = _SANAD_DIR.parent # .../Project OR /home/unitree
if _SANAD_PARENT.name == "Project":
    # Dev layout: make <root> importable so `Project.Sanad.*` resolves from disk.
    _ROOT = _SANAD_PARENT.parent
    if str(_ROOT) not in sys.path:
        sys.path.insert(0, str(_ROOT))
else:
    # Deployed layout: the package directory's parent goes on sys.path, and the
    # `Project.Sanad` dotted name is provided via sys.modules aliases below.
    if str(_SANAD_PARENT) not in sys.path:
        sys.path.insert(0, str(_SANAD_PARENT))
    if "Project" not in sys.modules:
        # Synthetic, empty parent package registered under the name "Project".
        _proj = types.ModuleType("Project")
        _proj.__path__ = [] # namespace package marker
        sys.modules["Project"] = _proj
    if "Project.Sanad" not in sys.modules:
        # Import the real package under its directory name, then alias it so
        # both `import Project.Sanad` and attribute access `Project.Sanad` work.
        _sanad = importlib.import_module(_SANAD_DIR.name)
        sys.modules["Project.Sanad"] = _sanad
        sys.modules["Project"].Sanad = _sanad # type: ignore[attr-defined]
from unitree_sdk2py.core.channel import ChannelFactoryInitialize
from unitree_sdk2py.g1.audio.g1_audio_client import AudioClient
from Project.Sanad.config import (
GEMINI_VOICE,
RECEIVE_SAMPLE_RATE,
SCRIPTS_DIR,
SEND_SAMPLE_RATE,
)
from Project.Sanad.core.config_loader import section as _cfg_section
from Project.Sanad.voice.audio_io import AudioIO
# ─── LOGGING ─────────────────────────────────────────────
# Log destination comes from the "sanad_voice" section of the voice config,
# with defaults of ~/logs and a "gemini_live_v2" file-name prefix.
_LOG_CFG = _cfg_section("voice", "sanad_voice")
LOG_DIR = os.path.expanduser(_LOG_CFG.get("log_dir", "~/logs"))
os.makedirs(LOG_DIR, exist_ok=True)
_LOG_NAME = _LOG_CFG.get("log_name", "gemini_live_v2")
# The date is stamped once at import time, so one process writes to one file.
LOG_FILE = os.path.join(LOG_DIR, f"{_LOG_NAME}_{datetime.now():%Y%m%d}.log")
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s.%(msecs)03d [%(levelname)s] %(message)s",
    datefmt="%H:%M:%S",
    handlers=[
        # Explicit UTF-8: messages in this file contain non-ASCII characters
        # ("→", "—"), so the log file must not depend on the locale encoding.
        logging.FileHandler(LOG_FILE, encoding="utf-8"),
        logging.StreamHandler(),
    ],
)
log = logging.getLogger("sanad_voice")
# ─── CONFIG ──────────────────────────────────────────────
# Config sections consulted by the rest of this module.
_REC = _cfg_section("voice", "recording")
_SCRIPTS = _cfg_section("core", "script_files")
_GEMINI_DEFAULTS = _cfg_section("core", "gemini_defaults")

# Default persona file; the file name is configurable via script_files.persona.
_PERSONA_FILE = SCRIPTS_DIR / _SCRIPTS.get("persona", "sanad_script.txt")

# Recording is on unless the effective value is the literal "0". The
# SANAD_RECORD environment variable overrides the config's `enabled` flag.
RECORD_ENABLED = "0" != os.getenv(
    "SANAD_RECORD", "1" if _REC.get("enabled", True) else "0"
)

# Recordings directory: SANAD_RECORD_DIR if set, otherwise the configured
# relative path resolved against the directory two levels above this file.
_REC_DIR_REL = _REC.get("dir_relative", "data/recordings")
RECORD_DIR = Path(
    os.getenv(
        "SANAD_RECORD_DIR",
        str(Path(__file__).resolve().parents[1] / _REC_DIR_REL),
    )
)
# Last-resort system prompt: _load_system_prompt() uses it only as the default
# for the config lookup of gemini_defaults.default_system_prompt, i.e. when no
# persona text was loaded AND the config does not define that key.
# NOTE(review): this text names the persona "Marcus", while the movement rules
# in this file address the robot as "Bousandah / بوسنده" — confirm which name
# is intended before relying on this fallback.
_FALLBACK_SYSTEM_PROMPT = (
    "You are Marcus, a bilingual humanoid robot assistant made by YS Lootah "
    "Technology, Dubai, UAE. RESPOND IN ARABIC (Gulf/Emirati dialect) OR "
    "ENGLISH ONLY. YOU MUST RESPOND UNMISTAKABLY IN THE SAME LANGUAGE THE "
    "USER SPEAKS. If the user speaks Arabic, you MUST reply in Arabic Gulf "
    "dialect. If the user speaks English, you MUST reply in English. Do NOT "
    "confuse Arabic with Japanese, Hindi, Russian, or any other language. "
    "The user is speaking Arabic or English — nothing else. Be concise — 1 "
    "to 2 sentences max. Be friendly and natural. If the user interrupts "
    "and says 'continue' or 'كمل', resume EXACTLY where you stopped. Only "
    "respond to clear human speech. Ignore background noise and silence "
    "completely. Do not respond to sounds that are not words."
)
# N2 Phase 3 — movement confirmation-phrase rules. The parent's
# MovementDispatcher drives the robot off Gemini's OWN spoken phrases, so Gemini
# must say one of these EXACT short phrases (per motion) when it agrees to move.
# Kept in lock-step with data/motions/instruction.json. Always appended; Gemini
# is told at runtime ([MOVEMENT ON/OFF/STATUS]) whether movement is enabled and
# must only confirm motion when it is ON.
# One string built from adjacent literals (implicit concatenation). It starts
# with a blank line so it can be appended directly to the base prompt, which
# _load_system_prompt() does unconditionally.
_MOVEMENT_PROMPT_RULES = (
    "\n\n--- MOVEMENT (walking) ---\n"
    "You can make the robot walk ONLY when you are told movement is ON "
    "(you receive a [MOVEMENT ON] or [MOVEMENT STATUS] note). When movement is "
    "OFF, never confirm a motion — tell the user to enable movement from the "
    "dashboard.\n"
    "When movement is ON and the user addresses you by name (Bousandah / بوسنده) AND "
    "asks you to move, reply with ONE short confirmation phrase per requested "
    "motion, in the SAME language, in the order asked. Use these EXACT shapes — "
    "they are what triggers the motion:\n"
    " forward : 'Walking forward.' / 'أمشي للأمام.'\n"
    " backward : 'Walking back.' / 'أمشي للخلف.'\n"
    " turn right : 'Turning right.' / 'أستدير يميناً.'\n"
    " turn left : 'Turning left.' / 'أستدير يساراً.'\n"
    " slide left : 'Sliding left.' / 'أنزلق لليسار.'\n"
    " slide right : 'Sliding right.' / 'أنزلق لليمين.'\n"
    " stop : 'Stopping.' / 'أتوقف.'\n"
    "With a NUMBER, keep it: 'Walking forward 3 steps.' / 'أمشي للأمام 3 خطوات.' "
    "'Turning right 90 degrees.' / 'أستدير يميناً 90 درجة.'\n"
    "STOP is safety-first: if the user clearly wants the robot to halt "
    "(stop/halt/wait/توقف/استنى), confirm 'Stopping.' / 'أتوقف.' immediately, "
    "even without your name.\n"
    "Never emit bracketed tags like [STATE-DONE] or numbers in parentheses — "
    "speak only plain prose. Never include 0 or a negative quantity; if you "
    "mis-hear a 0, drop the number and say the bare motion."
)
# Native function-calling: Gemini can DRIVE the robot to saved map places via
# the navigate_to_place / list_places / where_am_i / stop_navigation tools. The
# tool schemas are declared in the Live config; this block tells Gemini WHEN and
# HOW to use them, and the safety constraints. Appended only when nav tools are
# enabled (SANAD_NAV_TOOLS != 0).
# Navigation tools are on by default; only the literal value "0" in
# SANAD_NAV_TOOLS switches them off (unset or any other value keeps them on).
_NAV_TOOLS_ENABLED = os.environ.get("SANAD_NAV_TOOLS", "1") != "0"
# Prompt block appended by _load_system_prompt() when _NAV_TOOLS_ENABLED is
# true. "walking phrases above" refers to the movement rules, which are
# appended before this block.
_NAV_PROMPT_RULES = (
    "\n\n--- NAVIGATION (autonomous driving to places) ---\n"
    "You can autonomously DRIVE the robot to a saved place on the loaded map "
    "using your tools. This is different from step-by-step walking above.\n"
    "- When the user asks to GO/MOVE/TAKE them to a named place (e.g. 'go to "
    "the kitchen', 'خذني للاستقبال'), call the navigate_to_place tool with the "
    "place name. Do NOT say the walking phrases above for this — the tool does "
    "the driving.\n"
    "- You can only drive to places that exist in the CURRENTLY loaded map. If "
    "you are unsure which places exist, call list_places first and offer them.\n"
    "- If the tool returns reason 'no_map', tell the user to load a map first. "
    "If 'movement_off', tell them to enable movement. If 'ambiguous' or "
    "'unknown_place', read back the candidate names and ask which one.\n"
    "- After a successful navigate_to_place, briefly say you're heading there — "
    "but do NOT claim you have arrived. You will receive a [NAV ARRIVED] note "
    "when you actually arrive (then tell the user), or [NAV FAILED] if you "
    "could not reach it (then apologise and say why).\n"
    "- To stop an in-progress drive, call stop_navigation.\n"
    "Keep all of this in your normal Khaleeji style."
)
def _load_system_prompt() -> str:
    """Build the system prompt handed to the voice brain.

    Base text priority (first usable hit wins):
      1. the active persona file (``active_persona_path()``; falls back to
         ``_PERSONA_FILE`` when that lookup is unavailable or fails),
      2. ``gemini_defaults.default_system_prompt`` from the core config,
      3. ``_FALLBACK_SYSTEM_PROMPT``.

    The movement confirmation-phrase rules are always appended (N2 Phase 3);
    the navigation rules are appended when ``_NAV_TOOLS_ENABLED`` is true.

    A missing persona file used to be silent — that hid a config-vs-filename
    mismatch (e.g. ``persona: "sanad_v2"`` while only ``sanad_script.txt``
    existed), so the robot quietly ran on the default prompt instead of the
    persona on disk. Missing, unreadable AND blank persona files are now all
    reported with a WARNING so the same trap doesn't bite again.

    The persona is resolved HERE (at session start), not at import — so the
    operator's Scripts Manager selection (a sanad_script_v*.txt variant) is
    picked up on the next voice (re)connect.

    Returns:
        The complete prompt string; never empty.
    """
    # Resolve the selected persona variant (or the default sanad_script.txt).
    try:
        from Project.Sanad.core.persona import active_persona_path
        # Path(...) tolerates a str return value; a None/invalid value raises
        # here and lands in the fallback below.
        persona_file = Path(active_persona_path())
    except Exception:
        # Best effort by design: any failure here must not stop the voice
        # child, but leave a trace for whoever debugs a "wrong persona" report.
        log.debug("active persona lookup failed — using %s", _PERSONA_FILE,
                  exc_info=True)
        persona_file = _PERSONA_FILE

    base = None
    try:
        # utf-8-sig strips a BOM left behind by some editors.
        text = persona_file.read_text(encoding="utf-8-sig").strip()
    except FileNotFoundError:
        log.warning(
            "Persona file not found at %s — falling back to "
            "config.core.gemini_defaults.default_system_prompt. "
            "Check `script_files.persona` in config/core_config.json "
            "matches an actual file under scripts/.", persona_file,
        )
    except (OSError, UnicodeDecodeError) as exc:
        # An existing-but-unreadable persona file (bad encoding, permissions, a
        # directory) must NOT crash the voice child — fall back to the default.
        log.warning("Persona file at %s could not be read (%s) — "
                    "falling back to default system prompt.", persona_file, exc)
    else:
        if text:
            base = text
            log.info("persona loaded: %s", persona_file.name)
        else:
            log.warning("Persona file at %s is empty — "
                        "falling back to default system prompt.", persona_file)

    if base is None:
        configured = _GEMINI_DEFAULTS.get("default_system_prompt")
        # A null, non-string or blank config value would either crash the
        # concatenation below or produce a prompt with no persona at all.
        if isinstance(configured, str) and configured.strip():
            base = configured
        else:
            base = _FALLBACK_SYSTEM_PROMPT

    prompt = base + _MOVEMENT_PROMPT_RULES
    if _NAV_TOOLS_ENABLED:
        prompt += _NAV_PROMPT_RULES
    return prompt
def _audio_energy(pcm: bytes) -> int:
try:
samples = array.array("h", pcm)
return sum(abs(s) for s in samples) // len(samples) if samples else 0
except Exception:
return 0
# ─── TURN RECORDER ──────────────────────────────────────
class TurnRecorder:
    """Saves each turn as two WAV files: user mic + model output.

    A turn starts when the first audio chunk arrives through `capture_user`
    or `capture_robot` and ends on `finish_turn`. Files land in `out_dir` as
    `<timestamp>_user.wav` (at `user_rate`) and `<timestamp>_robot.wav`
    (at `robot_rate`); if files for that timestamp already exist, a numeric
    suffix (`<timestamp>_1_user.wav`, ...) is used instead of overwriting
    them. An `index.json` in the same directory tracks every turn with
    timestamp + transcripts + durations for the dashboard.

    Audio is written as mono, 16-bit PCM. Buffers and transcripts are guarded
    by a lock; the file writes in `finish_turn` happen outside that lock.
    """

    def __init__(self, enabled: bool = True, out_dir: Path = RECORD_DIR,
                 user_rate: int = SEND_SAMPLE_RATE,
                 robot_rate: int = RECEIVE_SAMPLE_RATE):
        """Create the recorder and, when enabled, its output directory.

        Args:
            enabled: When false every method is a no-op.
            out_dir: Directory receiving the WAV files and index.json.
            user_rate: Sample rate (Hz) written into the user WAV header.
            robot_rate: Sample rate (Hz) written into the robot WAV header.
        """
        self.enabled = enabled
        self.out_dir = Path(out_dir)  # also accept a plain string path
        self.user_rate = user_rate
        self.robot_rate = robot_rate
        if self.enabled:
            self.out_dir.mkdir(parents=True, exist_ok=True)
        self._lock = threading.Lock()
        self._user_buf: list[bytes] = []
        self._robot_buf: list[bytes] = []
        self._user_text = ""
        self._robot_text = ""
        self._started_at: float = 0.0

    def _capture(self, buf: list[bytes], pcm: bytes) -> None:
        """Append `pcm` to `buf`, stamping the turn start on its first chunk."""
        if not self.enabled or not pcm:
            return
        with self._lock:
            # Both buffers empty means this chunk opens a new turn.
            if not self._user_buf and not self._robot_buf:
                self._started_at = time.time()
            buf.append(pcm)

    def capture_user(self, pcm: bytes) -> None:
        """Buffer one chunk of user (microphone) PCM for the current turn."""
        self._capture(self._user_buf, pcm)

    def capture_robot(self, pcm: bytes) -> None:
        """Buffer one chunk of robot (model output) PCM for the current turn."""
        self._capture(self._robot_buf, pcm)

    def add_user_text(self, text: str) -> None:
        """Append a user transcript fragment (space-separated) to the turn."""
        if text and self.enabled:
            with self._lock:
                self._user_text = (self._user_text + " " + text).strip()

    def add_robot_text(self, text: str) -> None:
        """Append a robot transcript fragment (space-separated) to the turn."""
        if text and self.enabled:
            with self._lock:
                self._robot_text = (self._robot_text + " " + text).strip()

    def finish_turn(self) -> dict:
        """Flush the buffered turn to disk and reset the recorder.

        Returns:
            The index entry for the turn, or ``{}`` when recording is disabled
            or no audio was captured (transcript-only turns are discarded).
            If saving fails the failure is logged and the entry is returned
            with whatever fields were filled in before the error.
        """
        if not self.enabled:
            return {}
        # Snapshot and reset under the lock so capture can resume immediately.
        with self._lock:
            user_data = b"".join(self._user_buf)
            robot_data = b"".join(self._robot_buf)
            user_text = self._user_text
            robot_text = self._robot_text
            started_at = self._started_at
            self._user_buf.clear()
            self._robot_buf.clear()
            self._user_text = ""
            self._robot_text = ""
        if not user_data and not robot_data:
            return {}
        stamp = datetime.fromtimestamp(started_at).strftime("%Y%m%d_%H%M%S")
        entry = {"timestamp": stamp, "started_at": started_at,
                 "user_text": user_text, "robot_text": robot_text}
        try:
            # The stamp has one-second resolution; never overwrite the WAVs of
            # an earlier turn that started within the same second.
            stem = self._unique_stem(stamp)
            if user_data:
                p = self.out_dir / f"{stem}_user.wav"
                self._save_wav(p, user_data, self.user_rate)
                entry["user_wav"] = str(p)
                # 2 bytes per sample, mono.
                entry["user_duration_sec"] = round(
                    len(user_data) / (self.user_rate * 2), 3)
            if robot_data:
                p = self.out_dir / f"{stem}_robot.wav"
                self._save_wav(p, robot_data, self.robot_rate)
                entry["robot_wav"] = str(p)
                entry["robot_duration_sec"] = round(
                    len(robot_data) / (self.robot_rate * 2), 3)
            self._append_index(entry)
            log.info("recorded turn → %s (user %.1fs, robot %.1fs)",
                     stamp,
                     entry.get("user_duration_sec", 0),
                     entry.get("robot_duration_sec", 0))
        except Exception as exc:
            # Recording is best-effort: a disk problem must not break the turn.
            log.warning("recording save failed: %s", exc)
        return entry

    def _unique_stem(self, stamp: str) -> str:
        """Return `stamp`, or `stamp_N`, such that no WAV with that stem exists."""
        stem, attempt = stamp, 0
        while any((self.out_dir / f"{stem}_{side}.wav").exists()
                  for side in ("user", "robot")):
            attempt += 1
            stem = f"{stamp}_{attempt}"
        return stem

    @staticmethod
    def _save_wav(path: Path, pcm: bytes, rate: int) -> None:
        """Write `pcm` to `path` as a mono, 16-bit WAV at `rate` Hz."""
        with wave.open(str(path), "wb") as wf:
            wf.setnchannels(1)
            wf.setsampwidth(2)
            wf.setframerate(rate)
            wf.writeframes(pcm)

    def _append_index(self, entry: dict) -> None:
        """Append `entry` to index.json, rewriting the file atomically.

        An index that is missing, unreadable, or not of the expected shape is
        replaced by a fresh one containing only `entry`.

        Raises:
            OSError: If the replacement index cannot be written (the
                temporary file is removed before re-raising).
        """
        idx_path = self.out_dir / "index.json"
        payload: dict = {"records": []}
        try:
            if idx_path.exists():
                loaded = json.loads(idx_path.read_text(encoding="utf-8"))
                if isinstance(loaded, dict):
                    payload = loaded
        except (OSError, ValueError) as exc:
            # ValueError covers both invalid JSON and invalid UTF-8.
            log.warning("index.json unreadable (%s) — starting a new index", exc)
        records = payload.get("records")
        if not isinstance(records, list):
            # Valid JSON with the wrong shape (e.g. "records": null) would
            # otherwise make every later append fail.
            records = []
            payload["records"] = records
        records.append(entry)
        payload["total_records"] = len(records)
        # Atomic write (tempfile + os.replace) — an in-place write_text that is
        # interrupted (the start_all.sh supervisor Ctrl+C-teardowns this voice
        # child) can truncate index.json, so the next read falls back to an
        # empty {"records": []} and silently drops all prior turn metadata.
        # Mirrors voice/typed_replay._save_index.
        fd, tmp = tempfile.mkstemp(dir=str(idx_path.parent), suffix=".tmp")
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as f:
                json.dump(payload, f, indent=2, ensure_ascii=False)
            os.replace(tmp, idx_path)
        except Exception:
            try:
                os.unlink(tmp)
            except OSError:
                pass
            raise
# ─── BRAIN FACTORY ───────────────────────────────────────
def _build_brain(name: str, audio_io, recorder, voice: str, system_prompt: str):
name = (name or "").strip().lower()
if name in ("", "gemini"):
from Project.Sanad.gemini.script import GeminiBrain
return GeminiBrain(audio_io, recorder, voice, system_prompt)
if name == "local":
from Project.Sanad.local.script import LocalBrain
return LocalBrain(audio_io, recorder, voice, system_prompt)
if name == "model":
from Project.Sanad.voice.model_script import ModelBrain
return ModelBrain(audio_io, recorder, voice, system_prompt)
# To add a provider: import the module and return its brain class here.
raise ValueError(f"unknown voice brain: {name!r}")
# ─── MAIN ────────────────────────────────────────────────
def main() -> None:
    """Entry point: bring up DDS + audio, build the brain, run it to exit.

    Command line: ``<network-interface> [--voice <name>]``. Prints the module
    docstring and exits with status 1 when the interface is missing or
    ``--voice`` has no value.

    Environment: ``SANAD_AUDIO_PROFILE`` (default ``builtin``) selects the
    audio profile and ``SANAD_VOICE_BRAIN`` (default ``gemini``) the brain.

    Once audio has been started it is always stopped again, including when
    setup fails before the brain runs. Exceptions raised by the brain's run
    loop are logged with a traceback and swallowed; Ctrl+C ends it quietly.
    """
    if len(sys.argv) < 2:
        print(__doc__)
        sys.exit(1)
    iface = sys.argv[1]
    voice = GEMINI_VOICE
    if "--voice" in sys.argv:
        value_pos = sys.argv.index("--voice") + 1
        if value_pos >= len(sys.argv):
            # `--voice` given as the last argument: report usage instead of
            # dying with an IndexError traceback.
            print(__doc__)
            log.error("--voice requires a value")
            sys.exit(1)
        voice = sys.argv[value_pos]

    log.info("DDS on %s", iface)
    ChannelFactoryInitialize(0, iface)
    ac = AudioClient()
    ac.SetTimeout(10.0)
    ac.Init()
    log.info("AudioClient ready")

    profile = os.environ.get("SANAD_AUDIO_PROFILE", "builtin")
    audio = AudioIO.from_profile(profile, audio_client=ac)
    audio.start()
    brain = None
    try:
        log.info("audio profile=%s", audio.profile_id)
        # Sanity-check the mic before handing it to the brain
        log.info("testing mic 2s...")
        time.sleep(2)
        probe = audio.mic.read_chunk(1024)
        energy = _audio_energy(probe)
        log.info("mic energy=%d %s", energy, "OK" if energy > 0 else "SILENT")

        recorder = TurnRecorder(enabled=RECORD_ENABLED)
        if RECORD_ENABLED:
            log.info("recording enabled → %s", RECORD_DIR)

        system_prompt = _load_system_prompt()
        brain_name = os.environ.get("SANAD_VOICE_BRAIN", "gemini")
        brain = _build_brain(brain_name, audio, recorder, voice, system_prompt)
        log.info("voice brain=%s voice=%s log=%s", brain_name, voice, LOG_FILE)
        log.info("─" * 50)  # visual separator before the session log starts

        try:
            asyncio.run(brain.run())
        except KeyboardInterrupt:
            pass
        except Exception as exc:
            # Keep the traceback: the one-line message alone is rarely enough
            # to diagnose a crashed session from the log file.
            log.error("fatal: %s", exc, exc_info=True)
    finally:
        # Runs for setup failures too, so started audio is never left running.
        log.info("stopping")
        if brain is not None:
            try:
                brain.stop()
            except Exception:
                log.warning("brain.stop() failed", exc_info=True)
        audio.stop()
        log.info("stopped")
# Run only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()