"""Audio I/O manager — recording and playback via PyAudio. Handles microphone capture, speaker playback, and speaker-monitor recording. Thread-safe; one playback at a time via play_lock. Device selection is dynamic — read from voice.audio_devices on each refresh. """ from __future__ import annotations import json import subprocess import threading import time import wave from pathlib import Path from typing import Any try: import numpy as np _HAS_NUMPY = True except ImportError: np = None _HAS_NUMPY = False try: import pyaudio except ImportError: pyaudio = None # optional — only needed for local PCM playback # G1 AudioClient — used to route playback through the robot chest speaker # via DDS `PlayStream` (the same pipe Gemini uses). Without this, WAV # playback would go to the Jetson's built-in audio codec, which isn't # wired to any audible output on the G1. try: from unitree_sdk2py.g1.audio.g1_audio_client import AudioClient from unitree_sdk2py.g1.audio.g1_audio_api import ( ROBOT_API_ID_AUDIO_STOP_PLAY, ) _HAS_G1_AUDIO = True except ImportError: AudioClient = None ROBOT_API_ID_AUDIO_STOP_PLAY = 0 _HAS_G1_AUDIO = False from Project.Sanad.config import ( CHANNELS, CHUNK_SIZE, RECEIVE_SAMPLE_RATE, SINK as DEFAULT_SINK, SOURCE as DEFAULT_SOURCE, MONITOR_SOURCE, ) from Project.Sanad.core.logger import get_logger from Project.Sanad.voice import audio_devices as ad log = get_logger("audio_manager") FORMAT = pyaudio.paInt16 if pyaudio else 8 # Cached current selection — updated by refresh_devices() _DEVICE_LOCK = threading.Lock() _current_sink = DEFAULT_SINK _current_source = DEFAULT_SOURCE def _run_pactl(args: list[str]) -> subprocess.CompletedProcess[str]: return subprocess.run(["pactl", *args], check=True, text=True, capture_output=True) def _resolve_devices() -> tuple[str, str]: """Return current (sink, source) — falls back to config defaults.""" try: cur = ad.current_selection() sink = cur.get("sink") or DEFAULT_SINK source = cur.get("source") or DEFAULT_SOURCE return sink, source except Exception as exc: log.warning("Could not resolve audio devices: %s", exc) return DEFAULT_SINK, DEFAULT_SOURCE def ensure_audio_defaults(): """Re-scan all USB ports, resolve the active profile, set pactl defaults. This is called at startup AND before every playback/recording so that even if the user unplugs/re-plugs a device into a different port, the correct sink/source is always used. """ try: result = ad.apply_current_selection() cur = result.get("selection", {}) sink = cur.get("sink", "") source = cur.get("source", "") with _DEVICE_LOCK: global _current_sink, _current_source _current_sink = sink or DEFAULT_SINK _current_source = source or DEFAULT_SOURCE except Exception as exc: log.warning("Audio defaults not applied: %s", exc) class AudioManager: def __init__(self): if pyaudio is None: raise RuntimeError( "pyaudio not installed — AudioManager cannot play local PCM. " "Install with `pip install pyaudio` (needs portaudio headers), " "or rely on the G1 speaker via AudioClient.PlayStream." ) self.pya = pyaudio.PyAudio() self.play_lock = threading.Lock() # Lazily-initialised G1 DDS audio client (for play_wav → chest speaker) self._g1_audio_client: Any = None # Resolve devices and set PulseAudio defaults at startup self.refresh_devices() ensure_audio_defaults() def _get_g1_audio_client(self): """Return a cached G1 AudioClient (DDS) — creates on first use. Assumes `ChannelFactoryInitialize` has already been called (our ArmController does this at startup on eth0). Returns None if the Unitree SDK is unavailable or init fails. """ if not _HAS_G1_AUDIO: return None if self._g1_audio_client is not None: return self._g1_audio_client try: c = AudioClient() c.SetTimeout(5.0) c.Init() try: c.SetVolume(100) except Exception: pass self._g1_audio_client = c log.info("G1 AudioClient initialized (for chest-speaker playback)") except Exception as exc: log.warning("G1 AudioClient init failed: %s", exc) self._g1_audio_client = None return self._g1_audio_client def refresh_devices(self) -> dict[str, str]: """Re-read selected sink/source from audio_devices module.""" sink, source = _resolve_devices() with _DEVICE_LOCK: global _current_sink, _current_source _current_sink, _current_source = sink, source log.info("AudioManager devices refreshed: sink=%s source=%s", sink, source) return {"sink": sink, "source": source} @property def current_sink(self) -> str: with _DEVICE_LOCK: return _current_sink @property def current_source(self) -> str: with _DEVICE_LOCK: return _current_source def close(self): self.pya.terminate() def sample_width(self) -> int: return self.pya.get_sample_size(FORMAT) # -- playback -- def play_pcm(self, pcm_bytes: bytes, channels: int, sample_rate: int, sample_width: int): with self.play_lock: ensure_audio_defaults() stream = self.pya.open( format=self.pya.get_format_from_width(sample_width), channels=channels, rate=sample_rate, output=True, frames_per_buffer=CHUNK_SIZE, ) try: frame_bytes = CHUNK_SIZE * channels * sample_width for offset in range(0, len(pcm_bytes), frame_bytes): stream.write(pcm_bytes[offset : offset + frame_bytes]) finally: stream.stop_stream() stream.close() def play_wav(self, path: Path) -> dict[str, Any]: """Play a WAV file through the G1 chest speaker via DDS when available, falling back to the host PulseAudio sink otherwise. The G1's built-in audio (Jetson `platform-sound`) isn't wired to any audible speaker — the robot's loudspeaker is only reachable over DDS `AudioClient.PlayStream` (same pipe Gemini uses). """ with wave.open(str(path), "rb") as wf: channels = wf.getnchannels() sw = wf.getsampwidth() rate = wf.getframerate() data = wf.readframes(wf.getnframes()) # Prefer G1 chest speaker when the Unitree SDK is present client = self._get_g1_audio_client() if client is not None and _HAS_NUMPY and sw == 2: self._play_pcm_via_g1(data, channels, rate) else: if client is None and _HAS_G1_AUDIO: log.warning("G1 AudioClient unavailable, using host PulseAudio sink") self.play_pcm(data, channels, rate, sw) duration = len(data) / (rate * channels * sw) if rate else 0 return {"path": str(path), "duration_seconds": round(duration, 3)} # -- G1 DDS-routed playback -- _G1_STREAM_APP = "sanad_playback" _G1_HW_RATE = 16_000 def stop_playback(self) -> None: """Stop any in-flight G1 DDS audio stream. Used by the dashboard's Stop button to halt `play_wav` / `_play_pcm_via_g1` mid-stream. Safe to call even when nothing is playing — the DDS call is idempotent. """ client = self._get_g1_audio_client() if client is None: return try: client._Call( ROBOT_API_ID_AUDIO_STOP_PLAY, json.dumps({"app_name": self._G1_STREAM_APP}), ) log.info("G1 audio stream stopped (app=%s)", self._G1_STREAM_APP) except Exception as exc: log.warning("stop_playback failed: %s", exc) def _play_pcm_via_g1(self, pcm_bytes: bytes, channels: int, source_rate: int) -> None: """Stream int16 PCM to the G1 chest speaker via AudioClient.PlayStream. Converts stereo → mono and resamples to 16 kHz (the rate AudioClient expects). Uses a fresh stream_id per call so back-to-back plays don't interfere. """ client = self._get_g1_audio_client() if client is None: raise RuntimeError("G1 AudioClient not available") arr = np.frombuffer(pcm_bytes, dtype=np.int16) # Stereo → mono average if channels == 2 and arr.size % 2 == 0: arr = arr.reshape(-1, 2).mean(axis=1).astype(np.int16) # Resample to 16 kHz if source_rate != self._G1_HW_RATE and arr.size: target_len = max(1, int(len(arr) * self._G1_HW_RATE / source_rate)) arr = np.interp( np.linspace(0, len(arr), target_len, endpoint=False), np.arange(len(arr)), arr.astype(np.float64), ).astype(np.int16) stream_id = f"wav_{int(time.time() * 1000)}" # Clear any lingering stream from a previous call try: client._Call(ROBOT_API_ID_AUDIO_STOP_PLAY, json.dumps({"app_name": self._G1_STREAM_APP})) except Exception: pass time.sleep(0.15) # Push the whole clip in one PlayStream — G1 handles buffering with self.play_lock: play_start = time.time() client.PlayStream(self._G1_STREAM_APP, stream_id, arr.tobytes()) total_sec = len(arr) / self._G1_HW_RATE # Block until audio has drained (plus a small safety margin) elapsed = time.time() - play_start remaining = total_sec - elapsed + 0.3 if remaining > 0: time.sleep(remaining) try: client._Call(ROBOT_API_ID_AUDIO_STOP_PLAY, json.dumps({"app_name": self._G1_STREAM_APP})) except Exception: pass # -- recording -- def record_mic(self, duration_sec: float) -> bytes: """Record from default mic for *duration_sec* seconds, return raw PCM.""" ensure_audio_defaults() stream = self.pya.open( format=FORMAT, channels=CHANNELS, rate=RECEIVE_SAMPLE_RATE, input=True, frames_per_buffer=CHUNK_SIZE, ) frames: list[bytes] = [] total_chunks = int(RECEIVE_SAMPLE_RATE / CHUNK_SIZE * duration_sec) try: for _ in range(total_chunks): frames.append(stream.read(CHUNK_SIZE, exception_on_overflow=False)) finally: stream.stop_stream() stream.close() return b"".join(frames) def save_wav(self, pcm_bytes: bytes, path: Path, channels: int, sample_rate: int): path.parent.mkdir(parents=True, exist_ok=True) with wave.open(str(path), "wb") as wf: wf.setnchannels(channels) wf.setsampwidth(self.sample_width()) wf.setframerate(sample_rate) wf.writeframes(pcm_bytes)