"""Centralized configuration for the Sanad robot assistant. Resolution order for BASE_DIR (highest priority first): 1. SANAD_PROJECT_ROOT environment variable 2. PROJECT_BASE + PROJECT_NAME from .env file (or env vars) 3. Path(__file__).resolve().parent.parent (auto-detected from this file's location) Every other directory is derived from BASE_DIR — never hardcode an absolute path. """ from __future__ import annotations import json import os from pathlib import Path from typing import Any def _read_env_file(env_path: Path) -> dict[str, str]: """Minimal .env reader (no python-dotenv dependency).""" out: dict[str, str] = {} if not env_path.exists(): return out try: for raw in env_path.read_text(encoding="utf-8").splitlines(): line = raw.strip() if not line or line.startswith("#") or "=" not in line: continue k, v = line.split("=", 1) out[k.strip()] = v.strip().strip('"').strip("'") except OSError: pass return out def _resolve_base_dir() -> Path: """Resolve the Sanad project root with override support.""" # 1. Direct env override override = os.environ.get("SANAD_PROJECT_ROOT", "").strip() if override: p = Path(override).expanduser().resolve() if p.exists(): return p # 2. PROJECT_BASE + PROJECT_NAME pattern _here = Path(__file__).resolve().parent # Sanad/ env_files = [ _here / ".env", # Sanad/.env _here.parent / ".env", # Project/.env ] for env_path in env_files: env = _read_env_file(env_path) base = env.get("PROJECT_BASE") or os.environ.get("PROJECT_BASE", "") name = env.get("PROJECT_NAME") or os.environ.get("PROJECT_NAME", "") if base and name: candidate = Path(base).expanduser().resolve() / name if candidate.exists(): return candidate # 3. Auto-detect — this file lives at Sanad/config.py, so parent = Sanad/ return _here BASE_DIR = _resolve_base_dir() DATA_DIR = BASE_DIR / "data" LOGS_DIR = BASE_DIR / "logs" SCRIPTS_DIR = BASE_DIR / "scripts" MODEL_DIR = BASE_DIR / "model" # Audio recordings (typed-replay, etc.) live under data/audio AUDIO_RECORDINGS_DIR = DATA_DIR / "audio" # Motion macro recordings (paired with audio) live under data/recordings/motion MOTION_RECORDINGS_DIR = DATA_DIR / "recordings" / "motion" # Motion JSONL macros (auto-discovered as actions) MOTIONS_DIR = DATA_DIR / "motions" SKILLS_FILE = MOTIONS_DIR / "skills.json" CONFIG_FILE = MOTIONS_DIR / "config.json" # ─── Load baseline defaults from config/core_config.json ─── # Single source of truth. Runtime overrides via: # 1. env vars (SANAD_GEMINI_API_KEY, SANAD_GEMINI_MODEL, ...) # 2. data/motions/config.json (dashboard-editable — see load_config()) # 3. config/core_config.json (this file) def _load_core_config() -> dict[str, Any]: cfg_path = BASE_DIR / "config" / "core_config.json" if not cfg_path.exists(): return {} try: raw = json.loads(cfg_path.read_text(encoding="utf-8")) except (OSError, json.JSONDecodeError): return {} # Strip _comment / _description noise return {k: v for k, v in raw.items() if not k.startswith("_")} _CORE_CFG = _load_core_config() _GEMINI = _CORE_CFG.get("gemini_defaults", {}) _AUDIO = _CORE_CFG.get("audio_defaults", {}) # -- Gemini defaults (override via data/motions/config.json or env) -- GEMINI_API_KEY = os.environ.get( "SANAD_GEMINI_API_KEY", _GEMINI.get("api_key", "")) GEMINI_MODEL = os.environ.get( "SANAD_GEMINI_MODEL", "models/" + _GEMINI.get("model_live", "gemini-2.5-flash-native-audio-preview-12-2025")) GEMINI_VOICE = os.environ.get( "SANAD_GEMINI_VOICE", _GEMINI.get("voice_name", "Charon")) GEMINI_WS_URI = _GEMINI.get( "model_ws_uri", "wss://generativelanguage.googleapis.com/ws/google.ai.generativelanguage.v1alpha.GenerativeService.BidiGenerateContent") GEMINI_WS_TIMEOUT = _GEMINI.get("ws_timeout_sec", 30) # -- Audio defaults -- SEND_SAMPLE_RATE = _AUDIO.get("send_sample_rate", 16000) RECEIVE_SAMPLE_RATE = _AUDIO.get("receive_sample_rate", 24000) CHUNK_SIZE = _AUDIO.get("chunk_size", 512) CHANNELS = _AUDIO.get("channels", 1) # -- PulseAudio hardware IDs -- SINK = _AUDIO.get("sink", "alsa_output.usb-Anker_PowerConf_A3321-DEV-SN1-01.analog-stereo") SOURCE = _AUDIO.get("source", "alsa_input.usb-Anker_PowerConf_A3321-DEV-SN1-01.mono-fallback") MONITOR_SOURCE = f"{SINK}.monitor" # -- Dashboard -- # Default: bind to wlan0's IP (auto-detected at startup) so the dashboard is # reachable on the wireless network. Falls back to 0.0.0.0 (all interfaces) # if wlan0 isn't present. # # Resolution order (highest priority first): # 1. SANAD_DASHBOARD_HOST env var (explicit IP or hostname) # 2. SANAD_DASHBOARD_INTERFACE env var → that interface's IP # 3. wlan0 interface IP (default) # 4. 0.0.0.0 (bind to all) # # Override via --host CLI flag too. DASHBOARD_INTERFACE = os.environ.get("SANAD_DASHBOARD_INTERFACE", "wlan0") def _get_interface_ip(iface: str) -> str | None: """Return the IPv4 address bound to `iface`, or None if not present. Tries multiple strategies in order — different Linux setups expose interface info via different mechanisms. """ # Strategy 1: fcntl SIOCGIFADDR (fastest, no subprocess) ip = _get_iface_ip_fcntl(iface) if ip: return ip # Strategy 2: parse `ip -4 -o addr show ` (works on Ubuntu/Jetson) ip = _get_iface_ip_via_ip_cmd(iface) if ip: return ip # Strategy 3: parse `/proc/net/fib_trie` (last resort) ip = _get_iface_ip_via_proc(iface) if ip: return ip return None def _get_iface_ip_fcntl(iface: str) -> str | None: try: import fcntl import socket import struct s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) try: ifname = iface[:15].encode("utf-8") packed = fcntl.ioctl( s.fileno(), 0x8915, # SIOCGIFADDR struct.pack("256s", ifname), ) return socket.inet_ntoa(packed[20:24]) finally: s.close() except Exception: return None def _get_iface_ip_via_ip_cmd(iface: str) -> str | None: try: import subprocess r = subprocess.run( ["ip", "-4", "-o", "addr", "show", iface], capture_output=True, text=True, timeout=2.0, ) if r.returncode != 0: return None # Output: "5: wlan0 inet 10.255.254.86/24 brd ..." for line in r.stdout.splitlines(): parts = line.split() for i, p in enumerate(parts): if p == "inet" and i + 1 < len(parts): return parts[i + 1].split("/")[0] except Exception: return None return None def _get_iface_ip_via_proc(iface: str) -> str | None: """Fallback: scrape /proc/net/fib_trie for an IP advertised on this iface. Less reliable than fcntl/ip cmd but doesn't need any external tooling. """ try: import subprocess # Try `hostname -I` as a final fallback (returns space-separated IPs) r = subprocess.run(["hostname", "-I"], capture_output=True, text=True, timeout=1.0) if r.returncode == 0: ips = (r.stdout or "").strip().split() # Return first non-loopback IPv4 for ip in ips: if "." in ip and not ip.startswith("127."): return ip except Exception: return None return None def list_network_interfaces() -> list[dict]: """Return [{name, ip, is_up}] for every interface on the box. Used by the dashboard's system-info panel. """ out: list[dict] = [] try: import socket for idx, name in socket.if_nameindex(): ip = _get_interface_ip(name) out.append({ "name": name, "index": idx, "ip": ip or "", "is_up": ip is not None, }) except Exception: pass return out def _resolve_dashboard_host() -> str: """Resolve the host the dashboard should bind to. Order: 1. SANAD_DASHBOARD_HOST env var (explicit IP/hostname) 2. SANAD_DASHBOARD_INTERFACE → that interface's IP 3. wlan0's IP (default) 4. First non-loopback IP from `hostname -I` 5. 0.0.0.0 (bind everywhere) """ explicit = os.environ.get("SANAD_DASHBOARD_HOST", "").strip() if explicit: return explicit iface_ip = _get_interface_ip(DASHBOARD_INTERFACE) if iface_ip: return iface_ip # Try `hostname -I` as a final non-loopback fallback try: import subprocess r = subprocess.run(["hostname", "-I"], capture_output=True, text=True, timeout=1.0) if r.returncode == 0: for ip in (r.stdout or "").strip().split(): if "." in ip and not ip.startswith("127."): return ip except Exception: pass return "0.0.0.0" DASHBOARD_HOST = _resolve_dashboard_host() DASHBOARD_PORT = 8000 # -- Local TTS -- LOCAL_TTS_MODEL = "MBZUAI/speecht5_tts_clartts_ar" LOCAL_TTS_MODEL_PATH = str(MODEL_DIR / "speecht5_tts_clartts_ar") LOCAL_TTS_HIFIGAN_PATH = str(MODEL_DIR / "speecht5_hifigan") LOCAL_TTS_XVECTOR_PATH = str(MODEL_DIR / "arabic_xvector_embedding.pt") # -- Motion -- _G1 = _CORE_CFG.get("g1_hardware", {}) REPLAY_HZ = _G1.get("replay_hz", 60.0) G1_NUM_MOTOR = _G1.get("num_motor", 29) ENABLE_ARM_SDK_INDEX = _G1.get("enable_arm_sdk_index", 29) KP_HIGH = 300.0 KD_HIGH = 3.0 KP_LOW = 80.0 KD_LOW = 3.0 KP_WRIST = 40.0 KD_WRIST = 1.5 WEAK_MOTORS = {4, 10, 15, 16, 17, 18, 22, 23, 24, 25} WRIST_MOTORS = {19, 20, 21, 26, 27, 28} # -- Live Gemini subprocess tuning -- LIVE_TUNE: dict[str, str] = { "SANAD_REQUIRED_LOUD_CHUNKS": "5", "SANAD_PREBUFFER_CHUNKS": "3", "SANAD_PLAYBACK_TIMEOUT": "0.25", "SANAD_BARGE_IN_COOLDOWN": "1.0", "SANAD_AI_SPEAK_GRACE": "0.5", # ECHO_GUARD_SEC suppresses USER SAID log lines for this many seconds # after the robot finishes a chunk. Previously 1.2 — caused a visible # lag where "robot finished talking" was followed by silence in the # log even though Gemini was transcribing the user's new speech # immediately. Lowered to 0.3 to match typical room reverb tail; the # real echo protection is the silence-during-speaking gate, not this. "SANAD_ECHO_GUARD_SEC": "0.3", "SANAD_SPEAKING_ENERGY_GATE": "0.90", "SANAD_CALIBRATION_CHUNKS": "30", "SANAD_THRESHOLD_MULTIPLIER": "4.0", # Base barge-in threshold calibrated at the REFERENCE volume (50%). # At runtime, scaled QUADRATICALLY with actual G1 volume: # scale = (actual_vol / ref_vol) ** 2 # # Physical reason: doubling digital speaker volume doubles sample # amplitude, which means RECEIVED energy at the mic quadruples # (energy ~ amplitude²). Linear scaling under-threshold echo at # high volumes → caused "robot listening to himself" feedback. # # Measured on Hollyland + G1 speaker at 100% volume: # echo peak (no user) up to ~15700 # voice peak (user) 25000-32000+ (often saturates 32767) # Safe threshold at 100% vol: ~18000, above echo / below voice. # # Working back with quadratic scale: base × (100/50)² = 18000 # base × 4 = 18000 → base = 4500 at 50% ref volume. "SANAD_MIN_THRESHOLD": "800", "SANAD_PLAYBACK_BARGE_MIN": "2500", "SANAD_PLAYBACK_BARGE_MULT": "1.5", # Sustained-chunk requirement for barge-in. Balance: # higher = fewer false triggers from echo bursts # lower = quicker response to short commands ("stop", "توقف") # Default 5 = ~160ms sustained voice. Real speech reliably # sustains that long; single-chunk echo spikes don't. "SANAD_PLAYBACK_REQUIRED_CHUNKS": "2", "SANAD_SILENCE_AFTER_SPEECH": "1.2", "SANAD_SPEECH_THRESHOLD": "300", "SANAD_DDS_INTERFACE": os.environ.get("SANAD_DDS_INTERFACE", "eth0"), # G1 built-in mic — UDP multicast 239.168.123.161:5555. # Requires wake-up conversation mode ON in Unitree app. "SANAD_USE_G1_MIC": "1", } # -- Camera -- CAMERA_SERVICE_PORT = 8091 DIRECT_CAMERA_URL = f"http://127.0.0.1:{CAMERA_SERVICE_PORT}" # -- DDS / hardware -- # Jetson G1 default is eth0 (the robot's internal network). # Override with SANAD_DDS_INTERFACE=lo for desktop/sim development. DDS_NETWORK_INTERFACE = os.environ.get("SANAD_DDS_INTERFACE", "eth0") def _ensure_dirs() -> list[str]: """Create runtime directories. Failures are collected, not raised. Returns the list of directories that failed to create — caller can decide whether to log/abort. The module import never crashes due to a single permission error on a single directory. """ failed: list[str] = [] for d in (DATA_DIR, LOGS_DIR, SCRIPTS_DIR, AUDIO_RECORDINGS_DIR, MOTION_RECORDINGS_DIR, MOTIONS_DIR): try: d.mkdir(parents=True, exist_ok=True) except OSError: failed.append(str(d)) return failed # Best-effort: create dirs at import. Ignore failures here — individual # subsystems will handle missing dirs at usage time and isolation prevents # cascading import failures. _DIRS_FAILED = _ensure_dirs() def load_config() -> dict[str, Any]: """Load runtime config overrides from CONFIG_FILE (if present).""" if CONFIG_FILE.exists(): try: with open(CONFIG_FILE, "r", encoding="utf-8") as f: return json.load(f) except (json.JSONDecodeError, OSError): return {} return {} def save_config(cfg: dict[str, Any]): CONFIG_FILE.parent.mkdir(parents=True, exist_ok=True) import os, tempfile fd, tmp = tempfile.mkstemp( prefix=f".{CONFIG_FILE.name}.", suffix=".tmp", dir=str(CONFIG_FILE.parent), ) try: with os.fdopen(fd, "w", encoding="utf-8") as f: json.dump(cfg, f, ensure_ascii=False, indent=2) os.replace(tmp, CONFIG_FILE) except Exception: try: os.unlink(tmp) except OSError: pass raise # Apply config.json overrides on top of module constants (was previously dead code). def _apply_overrides(): cfg = load_config() if not cfg: return g = globals() gemini = cfg.get("gemini", {}) if isinstance(gemini, dict): if "api_key" in gemini and gemini["api_key"]: g["GEMINI_API_KEY"] = gemini["api_key"] if "model" in gemini: g["GEMINI_MODEL"] = gemini["model"] if "voice" in gemini: g["GEMINI_VOICE"] = gemini["voice"] audio = cfg.get("audio", {}) if isinstance(audio, dict): if "send_sample_rate" in audio: g["SEND_SAMPLE_RATE"] = int(audio["send_sample_rate"]) if "receive_sample_rate" in audio: g["RECEIVE_SAMPLE_RATE"] = int(audio["receive_sample_rate"]) if "chunk_size" in audio: g["CHUNK_SIZE"] = int(audio["chunk_size"]) if "sink" in audio: g["SINK"] = audio["sink"] if "source" in audio: g["SOURCE"] = audio["source"] dashboard = cfg.get("dashboard", {}) if isinstance(dashboard, dict): if "host" in dashboard: g["DASHBOARD_HOST"] = dashboard["host"] if "port" in dashboard: g["DASHBOARD_PORT"] = int(dashboard["port"]) try: _apply_overrides() except Exception: # Never let a malformed config.json kill module import. pass