Sanad/config.py

448 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Centralized configuration for the Sanad robot assistant.
Resolution order for BASE_DIR (highest priority first):
1. SANAD_PROJECT_ROOT environment variable
2. PROJECT_BASE + PROJECT_NAME from .env file (or env vars)
3. Path(__file__).resolve().parent (auto-detected: the directory containing this file)
Every other directory is derived from BASE_DIR — never hardcode an absolute path.
"""
from __future__ import annotations
import json
import os
from pathlib import Path
from typing import Any
def _read_env_file(env_path: Path) -> dict[str, str]:
    """Parse a minimal ``KEY=VALUE`` .env file (no python-dotenv dependency).

    Blank lines, ``#`` comment lines, lines without ``=`` and lines with an
    empty key are skipped. A leading ``export `` on the key is dropped, and
    surrounding whitespace and quote characters are stripped from the value.

    Args:
        env_path: Location of the .env file; it does not have to exist.

    Returns:
        Mapping of keys to values. Empty when the file is missing, unreadable
        or not valid UTF-8 — this function never raises, because it runs
        during module import.
    """
    try:
        # utf-8-sig transparently drops a BOM, which would otherwise become
        # part of the first key.
        text = env_path.read_text(encoding="utf-8-sig")
    except (OSError, UnicodeDecodeError):
        # UnicodeDecodeError is a ValueError, not an OSError, so it has to be
        # named explicitly to keep a corrupt file from aborting the import.
        return {}

    out: dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if not sep:
            continue
        key = key.strip()
        if key.startswith("export "):
            key = key[len("export "):].strip()
        if not key:
            continue
        out[key] = value.strip().strip('"').strip("'")
    return out
def _resolve_base_dir() -> Path:
    """Locate the Sanad project root, honouring overrides.

    Checked in order; the first existing directory wins:
      1. ``SANAD_PROJECT_ROOT`` environment variable.
      2. ``PROJECT_BASE`` / ``PROJECT_NAME`` taken from ``.env`` next to this
         file, then from ``.env`` one directory up; each key falls back to
         the process environment when the file does not define it.
      3. The directory that contains this file.
    """
    # 1. Explicit root from the environment.
    env_root = os.environ.get("SANAD_PROJECT_ROOT", "").strip()
    if env_root:
        root = Path(env_root).expanduser().resolve()
        if root.exists():
            return root

    # 2. PROJECT_BASE / PROJECT_NAME pair.
    package_dir = Path(__file__).resolve().parent
    for dotenv in (package_dir / ".env", package_dir.parent / ".env"):
        settings = _read_env_file(dotenv)
        project_base = settings.get("PROJECT_BASE") or os.environ.get("PROJECT_BASE", "")
        project_name = settings.get("PROJECT_NAME") or os.environ.get("PROJECT_NAME", "")
        if not (project_base and project_name):
            continue
        project_dir = Path(project_base).expanduser().resolve() / project_name
        if project_dir.exists():
            return project_dir

    # 3. Fall back to the directory holding this module.
    return package_dir
# Project root, resolved once at import time by _resolve_base_dir().
# Every path below is derived from it.
BASE_DIR = _resolve_base_dir()
# Top-level directories directly under the project root.
DATA_DIR = BASE_DIR / "data"
LOGS_DIR = BASE_DIR / "logs"
SCRIPTS_DIR = BASE_DIR / "scripts"
MODEL_DIR = BASE_DIR / "model"
# Audio recordings (typed-replay, etc.) live under data/audio
AUDIO_RECORDINGS_DIR = DATA_DIR / "audio"
# Motion macro recordings (paired with audio) live under data/recordings/motion
MOTION_RECORDINGS_DIR = DATA_DIR / "recordings" / "motion"
# Motion JSONL macros (auto-discovered as actions)
MOTIONS_DIR = DATA_DIR / "motions"
# JSON files kept next to the motion macros. CONFIG_FILE is the runtime
# override file read by load_config() and written by save_config().
SKILLS_FILE = MOTIONS_DIR / "skills.json"
CONFIG_FILE = MOTIONS_DIR / "config.json"
# ─── Load baseline defaults from config/core_config.json ───
# Single source of truth. Runtime overrides via:
# 1. env vars (SANAD_GEMINI_API_KEY, SANAD_GEMINI_MODEL, ...)
# 2. data/motions/config.json (dashboard-editable — see load_config())
# 3. config/core_config.json (baseline defaults, read by _load_core_config())
def _load_core_config() -> dict[str, Any]:
    """Read the baseline defaults from ``BASE_DIR/config/core_config.json``.

    Returns:
        The top-level JSON object with every key starting with ``_`` removed
        (``_comment`` / ``_description`` annotations). An empty dict is
        returned when the file is missing, unreadable, not valid UTF-8, not
        valid JSON, or when its root is not a JSON object. This function never
        raises, because it runs during module import.
    """
    cfg_path = BASE_DIR / "config" / "core_config.json"
    try:
        raw = json.loads(cfg_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        return {}
    if not isinstance(raw, dict):
        # A list/string/number root has no .items(); treat it as "no config".
        return {}
    # Strip _comment / _description noise
    return {k: v for k, v in raw.items() if not k.startswith("_")}
# Parsed baseline configuration and its per-subsystem sections. A missing
# section yields an empty dict, so each lookup below falls back to the
# literal default written next to it.
_CORE_CFG = _load_core_config()
_GEMINI = _CORE_CFG.get("gemini_defaults", {})
_AUDIO = _CORE_CFG.get("audio_defaults", {})
# -- Gemini defaults (override via data/motions/config.json or env) --
# For the three values below the SANAD_* environment variable, when set,
# replaces the core-config / literal default.
GEMINI_API_KEY = os.environ.get(
    "SANAD_GEMINI_API_KEY",
    _GEMINI.get("api_key", ""))
# The "models/" prefix is added only to the core-config / literal default;
# a value supplied through SANAD_GEMINI_MODEL is used exactly as given.
GEMINI_MODEL = os.environ.get(
    "SANAD_GEMINI_MODEL",
    "models/" + _GEMINI.get("model_live", "gemini-2.5-flash-native-audio-preview-12-2025"))
GEMINI_VOICE = os.environ.get(
    "SANAD_GEMINI_VOICE",
    _GEMINI.get("voice_name", "Charon"))
# WebSocket endpoint and timeout come from core config only (no env override
# is read here).
GEMINI_WS_URI = _GEMINI.get(
    "model_ws_uri",
    "wss://generativelanguage.googleapis.com/ws/google.ai.generativelanguage.v1alpha.GenerativeService.BidiGenerateContent")
GEMINI_WS_TIMEOUT = _GEMINI.get("ws_timeout_sec", 30)
# -- Audio defaults --
# Sample rates are presumably in Hz and CHUNK_SIZE in frames — TODO confirm
# against the audio pipeline that consumes them.
SEND_SAMPLE_RATE = _AUDIO.get("send_sample_rate", 16000)
RECEIVE_SAMPLE_RATE = _AUDIO.get("receive_sample_rate", 24000)
CHUNK_SIZE = _AUDIO.get("chunk_size", 512)
CHANNELS = _AUDIO.get("channels", 1)
# -- PulseAudio hardware IDs --
SINK = _AUDIO.get("sink", "alsa_output.usb-Anker_PowerConf_A3321-DEV-SN1-01.analog-stereo")
SOURCE = _AUDIO.get("source", "alsa_input.usb-Anker_PowerConf_A3321-DEV-SN1-01.mono-fallback")
# Derived once, here, from the value SINK has at this point in the import.
MONITOR_SOURCE = f"{SINK}.monitor"
# -- Dashboard --
# Default: bind to wlan0's IP (auto-detected at import time) so the dashboard
# is reachable on the wireless network. If no address can be found for the
# interface, the first non-loopback address reported by `hostname -I` is used,
# and only then 0.0.0.0 (all interfaces).
#
# Resolution order (highest priority first), see _resolve_dashboard_host():
# 1. SANAD_DASHBOARD_HOST env var (explicit IP or hostname)
# 2. SANAD_DASHBOARD_INTERFACE env var → that interface's IP
# 3. wlan0 interface IP (default)
# 4. first non-loopback IPv4 from `hostname -I`
# 5. 0.0.0.0 (bind to all)
#
# Override via --host CLI flag too.
DASHBOARD_INTERFACE = os.environ.get("SANAD_DASHBOARD_INTERFACE", "wlan0")
def _get_interface_ip(iface: str) -> str | None:
    """Look up the IPv4 address of ``iface`` using several lookup strategies.

    The strategies are tried in a fixed order and the first non-empty answer
    wins:
      1. ``_get_iface_ip_fcntl`` — SIOCGIFADDR ioctl, no subprocess.
      2. ``_get_iface_ip_via_ip_cmd`` — parses ``ip -4 -o addr show <iface>``.
      3. ``_get_iface_ip_via_proc`` — last-resort fallback.

    Returns:
        The address as a dotted string, or None when no strategy produced one.
    """
    strategies = (
        _get_iface_ip_fcntl,
        _get_iface_ip_via_ip_cmd,
        _get_iface_ip_via_proc,
    )
    for lookup in strategies:
        address = lookup(iface)
        if address:
            return address
    return None
def _get_iface_ip_fcntl(iface: str) -> str | None:
    """Ask the kernel for ``iface``'s IPv4 address via the SIOCGIFADDR ioctl.

    Returns:
        The dotted IPv4 address, or None on any failure (``fcntl`` not
        importable, ioctl error, ...).
    """
    try:
        import fcntl
        import socket
        import struct

        # Only the first 15 characters of the name are sent, packed into a
        # 256-byte zero-padded request buffer.
        request = struct.pack("256s", iface[:15].encode("utf-8"))
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            reply = fcntl.ioctl(
                probe.fileno(),
                0x8915,  # SIOCGIFADDR
                request,
            )
            # Bytes 20..23 of the reply are handed to inet_ntoa as the address.
            return socket.inet_ntoa(reply[20:24])
    except Exception:
        return None
def _get_iface_ip_via_ip_cmd(iface: str) -> str | None:
    """Read ``iface``'s IPv4 address from ``ip -4 -o addr show <iface>``.

    Returns:
        The address without its prefix length, or None when the command
        fails, exits non-zero, times out (2 s) or prints no ``inet`` field.
    """
    try:
        import subprocess

        proc = subprocess.run(
            ["ip", "-4", "-o", "addr", "show", iface],
            capture_output=True, text=True, timeout=2.0,
        )
        if proc.returncode != 0:
            return None
        # Output: "5: wlan0 inet 10.255.254.86/24 brd ..."
        for record in proc.stdout.splitlines():
            fields = record.split()
            # Pair every token with its successor; the token after "inet" is
            # the address in CIDR notation.
            for marker, cidr in zip(fields, fields[1:]):
                if marker == "inet":
                    return cidr.split("/")[0]
    except Exception:
        return None
    return None
def _get_iface_ip_via_proc(
    iface: str,
    *,
    route_path: str = "/proc/net/route",
    fib_trie_path: str = "/proc/net/fib_trie",
) -> str | None:
    """Fallback: find ``iface``'s IPv4 address from the kernel's /proc tables.

    Needs no external tooling. The lookup has two steps:
      1. Collect the networks routed through ``iface`` from the route table
         (rows whose mask is zero, i.e. default routes, are ignored because
         they would match every address).
      2. Walk the FIB trie for leaves flagged ``/32 host LOCAL`` and return
         the first non-loopback one that lies inside one of those networks.

    The answer is specific to ``iface``: an interface that has no route
    entries yields None rather than some other interface's address.

    Args:
        iface: Interface name, e.g. ``"wlan0"``.
        route_path: Route table file; overridable for testing.
        fib_trie_path: FIB trie file; overridable for testing.

    Returns:
        The dotted IPv4 address, or None when it cannot be determined
        (files missing or unparsable, interface unknown, no match).
    """
    try:
        import ipaddress
        import socket
        import struct

        def _hex_to_ipv4(field: str) -> str:
            # The route table prints each address as the host-order integer
            # of its raw bytes, so packing with native byte order restores
            # the original byte sequence.
            return socket.inet_ntoa(struct.pack("=L", int(field, 16)))

        networks = []
        with open(route_path, "r", encoding="ascii", errors="replace") as routes:
            for row in routes:
                cols = row.split()
                # Columns: Iface Destination Gateway Flags RefCnt Use Metric Mask ...
                if len(cols) < 8 or cols[0] != iface:
                    continue
                if int(cols[7], 16) == 0:
                    continue
                networks.append(
                    ipaddress.ip_network(
                        f"{_hex_to_ipv4(cols[1])}/{_hex_to_ipv4(cols[7])}",
                        strict=False,
                    )
                )
        if not networks:
            return None

        leaf = None
        with open(fib_trie_path, "r", encoding="ascii", errors="replace") as trie:
            for row in trie:
                text = row.strip()
                if text.startswith("|--"):
                    # Leaf line: "|-- 192.168.1.42"; its flags follow on the
                    # next line(s).
                    leaf = text[3:].strip()
                elif leaf and text.startswith("/32 host LOCAL"):
                    addr = ipaddress.ip_address(leaf)
                    if not addr.is_loopback and any(addr in net for net in networks):
                        return leaf
    except Exception:
        return None
    return None
def list_network_interfaces() -> list[dict]:
    """Describe every network interface known to the OS.

    Used by the dashboard's system-info panel.

    Returns:
        One dict per interface with the keys ``name``, ``index``, ``ip``
        (empty string when no IPv4 address was found) and ``is_up`` (True
        exactly when an address was found). If enumeration fails, whatever
        was collected up to that point is returned.
    """
    interfaces: list[dict] = []
    try:
        import socket

        for if_index, if_name in socket.if_nameindex():
            address = _get_interface_ip(if_name)
            entry = {
                "name": if_name,
                "index": if_index,
                "ip": address or "",
                "is_up": address is not None,
            }
            interfaces.append(entry)
    except Exception:
        pass
    return interfaces
def _resolve_dashboard_host() -> str:
    """Pick the address the dashboard server should bind to.

    First match wins:
      1. ``SANAD_DASHBOARD_HOST`` env var (explicit IP/hostname).
      2. The IP of ``DASHBOARD_INTERFACE`` (``SANAD_DASHBOARD_INTERFACE``,
         default ``wlan0``).
      3. First non-loopback IPv4 reported by ``hostname -I``.
      4. ``0.0.0.0`` (bind everywhere).
    """
    configured = os.environ.get("SANAD_DASHBOARD_HOST", "").strip()
    if configured:
        return configured

    bound = _get_interface_ip(DASHBOARD_INTERFACE)
    if bound:
        return bound

    # Try `hostname -I` as a final non-loopback fallback
    try:
        import subprocess

        result = subprocess.run(
            ["hostname", "-I"], capture_output=True, text=True, timeout=1.0
        )
        if result.returncode == 0:
            addresses = (result.stdout or "").strip().split()
            for candidate in addresses:
                # A dot marks an IPv4 address; skip the 127.x loopback range.
                if "." in candidate and not candidate.startswith("127."):
                    return candidate
    except Exception:
        pass
    return "0.0.0.0"
# Bind address, resolved once at import time by _resolve_dashboard_host().
DASHBOARD_HOST = _resolve_dashboard_host()
# Default dashboard port.
DASHBOARD_PORT = 8000
# -- Local TTS --
# Model identifier (presumably a Hugging Face hub id — TODO confirm) and the
# on-disk locations of the TTS assets, all under MODEL_DIR. The paths are
# stored as plain strings rather than Path objects.
LOCAL_TTS_MODEL = "MBZUAI/speecht5_tts_clartts_ar"
LOCAL_TTS_MODEL_PATH = str(MODEL_DIR / "speecht5_tts_clartts_ar")
LOCAL_TTS_HIFIGAN_PATH = str(MODEL_DIR / "speecht5_hifigan")
LOCAL_TTS_XVECTOR_PATH = str(MODEL_DIR / "arabic_xvector_embedding.pt")
# -- Motion --
# Hardware parameters come from the "g1_hardware" section of core config,
# with the literal values below as defaults when the section or key is absent.
_G1 = _CORE_CFG.get("g1_hardware", {})
REPLAY_HZ = _G1.get("replay_hz", 60.0)
G1_NUM_MOTOR = _G1.get("num_motor", 29)
ENABLE_ARM_SDK_INDEX = _G1.get("enable_arm_sdk_index", 29)
# Fixed gain pairs, not configurable through core config.
# NOTE(review): KP_*/KD_* look like proportional/derivative gains for three
# motor groups (high, low, wrist) — confirm with the motion controller.
KP_HIGH = 300.0
KD_HIGH = 3.0
KP_LOW = 80.0
KD_LOW = 3.0
KP_WRIST = 40.0
KD_WRIST = 1.5
# Motor index sets. NOTE(review): presumably WEAK_MOTORS use the *_LOW gains
# and WRIST_MOTORS the *_WRIST gains — verify against the code that reads them.
WEAK_MOTORS = {4, 10, 15, 16, 17, 18, 22, 23, 24, 25}
WRIST_MOTORS = {19, 20, 21, 26, 27, 28}
# -- Live Gemini subprocess tuning --
# All values are strings keyed by SANAD_* names.
# NOTE(review): presumably these are exported as environment variables to the
# live subprocess — confirm against the code that launches it.
LIVE_TUNE: dict[str, str] = {
    "SANAD_REQUIRED_LOUD_CHUNKS": "5",
    "SANAD_PREBUFFER_CHUNKS": "3",
    "SANAD_PLAYBACK_TIMEOUT": "0.25",
    "SANAD_BARGE_IN_COOLDOWN": "1.0",
    "SANAD_AI_SPEAK_GRACE": "0.5",
    # ECHO_GUARD_SEC suppresses USER SAID log lines for this many seconds
    # after the robot finishes a chunk. Previously 1.2 — caused a visible
    # lag where "robot finished talking" was followed by silence in the
    # log even though Gemini was transcribing the user's new speech
    # immediately. Lowered to 0.3 to match typical room reverb tail; the
    # real echo protection is the silence-during-speaking gate, not this.
    "SANAD_ECHO_GUARD_SEC": "0.3",
    "SANAD_SPEAKING_ENERGY_GATE": "0.90",
    "SANAD_CALIBRATION_CHUNKS": "30",
    "SANAD_THRESHOLD_MULTIPLIER": "4.0",
    # Base barge-in threshold calibrated at the REFERENCE volume (50%).
    # At runtime, scaled QUADRATICALLY with actual G1 volume:
    # scale = (actual_vol / ref_vol) ** 2
    #
    # Physical reason: doubling digital speaker volume doubles sample
    # amplitude, which means RECEIVED energy at the mic quadruples
    # (energy ~ amplitude²). Linear scaling put the threshold below the
    # echo level at high volumes, which caused "robot listening to
    # himself" feedback.
    #
    # Measured on Hollyland + G1 speaker at 100% volume:
    # echo peak (no user) up to ~15700
    # voice peak (user) 25000-32000+ (often saturates 32767)
    # Safe threshold at 100% vol: ~18000, above echo / below voice.
    #
    # Working back with quadratic scale: base × (100/50)² = 18000
    # base × 4 = 18000 → base = 4500 at 50% ref volume.
    #
    # NOTE(review): the derivation above arrives at 4500, but the two
    # thresholds below are 800 and 2500. Either the comment is stale or it
    # refers to a value set elsewhere — confirm which key it describes.
    "SANAD_MIN_THRESHOLD": "800",
    "SANAD_PLAYBACK_BARGE_MIN": "2500",
    "SANAD_PLAYBACK_BARGE_MULT": "1.5",
    # Sustained-chunk requirement for barge-in. Balance:
    # higher = fewer false triggers from echo bursts
    # lower = quicker response to short commands ("stop", in English or Arabic)
    # Default 5 = ~160ms sustained voice. Real speech reliably
    # sustains that long; single-chunk echo spikes don't.
    #
    # NOTE(review): the text above says the default is 5, but the value set
    # here is "2" — confirm which is intended and update the other.
    "SANAD_PLAYBACK_REQUIRED_CHUNKS": "2",
    "SANAD_SILENCE_AFTER_SPEECH": "1.2",
    "SANAD_SPEECH_THRESHOLD": "300",
    # Passed through from the process environment, defaulting to eth0 (the
    # same lookup as DDS_NETWORK_INTERFACE).
    "SANAD_DDS_INTERFACE": os.environ.get("SANAD_DDS_INTERFACE", "eth0"),
    # G1 built-in mic — UDP multicast 239.168.123.161:5555.
    # Requires wake-up conversation mode ON in Unitree app.
    "SANAD_USE_G1_MIC": "1",
}
# -- Camera --
# The camera service is addressed on the loopback interface at this port.
CAMERA_SERVICE_PORT = 8091
DIRECT_CAMERA_URL = f"http://127.0.0.1:{CAMERA_SERVICE_PORT}"
# -- DDS / hardware --
# Jetson G1 default is eth0 (the robot's internal network).
# Override with SANAD_DDS_INTERFACE=lo for desktop/sim development.
# The same environment variable also feeds LIVE_TUNE["SANAD_DDS_INTERFACE"].
DDS_NETWORK_INTERFACE = os.environ.get("SANAD_DDS_INTERFACE", "eth0")
def _ensure_dirs() -> list[str]:
    """Create the runtime directories, collecting failures instead of raising.

    Each directory is created with its parents; one that already exists is
    fine. An ``OSError`` on one directory does not stop the others from being
    attempted, so a single permission problem cannot break module import.

    Returns:
        The directories (as strings) that could not be created; empty when
        everything succeeded. The caller decides whether to log or abort.
    """
    targets = (
        DATA_DIR,
        LOGS_DIR,
        SCRIPTS_DIR,
        AUDIO_RECORDINGS_DIR,
        MOTION_RECORDINGS_DIR,
        MOTIONS_DIR,
    )
    unavailable: list[str] = []
    for target in targets:
        try:
            target.mkdir(parents=True, exist_ok=True)
        except OSError:
            unavailable.append(str(target))
    return unavailable
# Best-effort: create the runtime directories at import time. Failures are
# not raised here — subsystems handle a missing directory when they use it,
# so one bad path cannot cascade into an import failure.
# _DIRS_FAILED keeps the paths that could not be created (empty on success).
_DIRS_FAILED = _ensure_dirs()
def load_config() -> dict[str, Any]:
    """Load runtime config overrides from CONFIG_FILE (if present).

    Returns:
        The JSON object stored in CONFIG_FILE. An empty dict is returned when
        the file is missing, unreadable, not valid UTF-8, not valid JSON, or
        when its root is not a JSON object — callers can always treat the
        result as a dict.
    """
    try:
        with open(CONFIG_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # OSError covers a missing file; ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError.
        return {}
    return data if isinstance(data, dict) else {}
def save_config(cfg: dict[str, Any]):
    """Write ``cfg`` to CONFIG_FILE as JSON, replacing the file in one step.

    The data is first written to a temporary file in CONFIG_FILE's directory
    and then moved over the target with ``os.replace``, so a failed write
    leaves the previous file in place.

    Args:
        cfg: JSON-serialisable settings to persist.

    Raises:
        Exception: Whatever the write or the replace raised; the temporary
            file is removed (best effort) before re-raising.
    """
    import os
    import tempfile

    target_dir = CONFIG_FILE.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    handle, scratch = tempfile.mkstemp(
        dir=str(target_dir),
        prefix=f".{CONFIG_FILE.name}.",
        suffix=".tmp",
    )
    try:
        with os.fdopen(handle, "w", encoding="utf-8") as stream:
            json.dump(cfg, stream, ensure_ascii=False, indent=2)
        os.replace(scratch, CONFIG_FILE)
    except Exception:
        # Clean up the scratch file, then let the original error propagate.
        try:
            os.unlink(scratch)
        except OSError:
            pass
        raise
# Apply config.json overrides on top of module constants (was previously dead code).
def _apply_overrides(cfg: dict[str, Any] | None = None) -> None:
    """Overlay values from the runtime config onto this module's constants.

    Recognised sections and keys:
      * ``gemini``: ``api_key``, ``model``, ``voice``
      * ``audio``: ``send_sample_rate``, ``receive_sample_rate``,
        ``chunk_size``, ``sink``, ``source``
      * ``dashboard``: ``host``, ``port``

    Rules applied to every key:
      * Each key is handled independently — an invalid value is skipped and
        does not prevent the remaining keys from being applied.
      * Text values must be non-empty strings; numeric values must convert
        with ``int()``.
      * A constant whose SANAD_* environment variable is set to a non-empty
        value is left alone, so the environment keeps priority over
        config.json.
      * Overriding ``SINK`` also refreshes ``MONITOR_SOURCE``, which is
        derived from it.

    Args:
        cfg: Parsed override mapping. When omitted, ``load_config()`` is used.
    """
    if cfg is None:
        cfg = load_config()
    if not isinstance(cfg, dict) or not cfg:
        return
    g = globals()

    def _section(name: str) -> dict[str, Any]:
        # A section that is missing or not an object is treated as empty.
        value = cfg.get(name, {})
        return value if isinstance(value, dict) else {}

    def _set_text(section: dict[str, Any], key: str, target: str,
                  env_var: str | None = None) -> bool:
        # Returns True only when the module constant was actually replaced.
        value = section.get(key)
        if not isinstance(value, str) or not value.strip():
            return False
        if env_var and os.environ.get(env_var, "").strip():
            return False
        g[target] = value
        return True

    def _set_int(section: dict[str, Any], key: str, target: str) -> None:
        if key not in section:
            return
        try:
            g[target] = int(section[key])
        except (TypeError, ValueError, OverflowError):
            # Keep the current value rather than abort the other overrides.
            pass

    gemini = _section("gemini")
    _set_text(gemini, "api_key", "GEMINI_API_KEY", "SANAD_GEMINI_API_KEY")
    _set_text(gemini, "model", "GEMINI_MODEL", "SANAD_GEMINI_MODEL")
    _set_text(gemini, "voice", "GEMINI_VOICE", "SANAD_GEMINI_VOICE")

    audio = _section("audio")
    _set_int(audio, "send_sample_rate", "SEND_SAMPLE_RATE")
    _set_int(audio, "receive_sample_rate", "RECEIVE_SAMPLE_RATE")
    _set_int(audio, "chunk_size", "CHUNK_SIZE")
    if _set_text(audio, "sink", "SINK"):
        g["MONITOR_SOURCE"] = f"{g['SINK']}.monitor"
    _set_text(audio, "source", "SOURCE")

    dashboard = _section("dashboard")
    _set_text(dashboard, "host", "DASHBOARD_HOST", "SANAD_DASHBOARD_HOST")
    _set_int(dashboard, "port", "DASHBOARD_PORT")
# Apply the config.json overrides once, at import time. Any exception raised
# while doing so is deliberately discarded.
try:
    _apply_overrides()
except Exception:
    # Never let a malformed config.json kill module import.
    pass