Sanadv3/voice/audio_io.py

914 lines
34 KiB
Python

"""Hardware-agnostic audio I/O for Sanad voice pipelines.
Provides a uniform Mic / Speaker interface so the model layer (Gemini
today, or any future alternative) doesn't need to know which physical
audio path is active. Pick a pairing via `AudioIO.from_profile()`:

    builtin           → G1 UDP multicast mic + AudioClient.PlayStream
    anker             → Anker PowerConf USB mic + speaker (PyAudio)
    hollyland_builtin → Hollyland wireless mic + G1 built-in speaker
    jbl_builtin_mic   → G1 UDP multicast mic + JBL speaker (pacat → PulseAudio)

Mics deliver int16 mono PCM at 16 kHz.
Speakers accept int16 mono PCM plus a `source_rate` and resample
internally if the hardware runs at a different rate.

Usage:

    audio = AudioIO.from_profile("builtin", audio_client=ac)
    audio.start()
    try:
        chunk = audio.mic.read_chunk(1024)        # mic
        audio.speaker.begin_stream()              # speaker
        audio.speaker.send_chunk(pcm_24k, 24000)
        audio.speaker.wait_finish()
    finally:
        audio.stop()
"""
from __future__ import annotations
import json
import socket
import struct
import subprocess
import threading
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Optional, Union
import numpy as np
try:
import pyaudio
_HAS_PYAUDIO = True
except ImportError:
pyaudio = None
_HAS_PYAUDIO = False
from Project.Sanad.core.config_loader import section as _cfg_section
from Project.Sanad.core.logger import get_logger
log = get_logger("audio_io")
# Config sections are read once, at import time. Every value below falls back
# to the literal default when its key is absent (presumably _cfg_section
# returns a dict-like mapping — confirm against core.config_loader).
_MIC_CFG = _cfg_section("voice", "mic_udp")
_SP_CFG = _cfg_section("voice", "speaker")
# Sample rate (Hz) that every Mic implementation in this module delivers.
TARGET_MIC_RATE = 16_000
# UDP multicast group / port that BuiltinMic joins to receive G1 mic audio.
_MCAST_GRP = _MIC_CFG.get("group", "239.168.123.161")
_MCAST_PORT = _MIC_CFG.get("port", 5555)
# Cap (bytes) on buffered-but-unread mic audio; the oldest bytes are dropped
# beyond this. Shared by BuiltinMic (default arg) and the PyAudio mics.
_MIC_BUF_MAX = _MIC_CFG.get("buffer_max_bytes", 64_000)
# Seconds read_chunk() polls for a full chunk before zero-padding.
_MIC_READ_TIMEOUT = _MIC_CFG.get("read_timeout_sec", 0.04)
# Raw int16 PCM bytes in any buffer type, or an ndarray of samples.
PCMLike = Union[bytes, bytearray, memoryview, np.ndarray]
def _find_g1_local_ip() -> str:
"""Find the host IPv4 address on the G1's internal 192.168.123.0/24 network."""
out = subprocess.run(
["ip", "-4", "-o", "addr"], capture_output=True, text=True,
).stdout
for line in out.splitlines():
for tok in line.split():
if tok.startswith("192.168.123."):
return tok.split("/")[0]
raise RuntimeError("no 192.168.123.x interface found")
def _resample_int16(pcm: np.ndarray, src_rate: int, dst_rate: int) -> np.ndarray:
if src_rate == dst_rate or pcm.size == 0:
return pcm.astype(np.int16, copy=False)
target_len = max(1, int(len(pcm) * dst_rate / src_rate))
return np.interp(
np.linspace(0, len(pcm), target_len, endpoint=False),
np.arange(len(pcm)),
pcm.astype(np.float64),
).astype(np.int16)
def _as_int16_array(pcm: PCMLike) -> np.ndarray:
if isinstance(pcm, np.ndarray):
return pcm.astype(np.int16, copy=False)
return np.frombuffer(bytes(pcm), dtype=np.int16)
# ─── Protocols ────────────────────────────────────────────
class Mic(ABC):
    """Abstract capture source yielding int16 mono PCM at ``sample_rate`` Hz."""

    sample_rate: int = TARGET_MIC_RATE

    @abstractmethod
    def start(self) -> None:
        """Open the capture path and begin buffering audio."""

    @abstractmethod
    def read_chunk(self, num_bytes: int) -> bytes:
        """Return ``num_bytes`` of captured PCM.

        The concrete mics in this module zero-pad when too little audio
        has arrived within their read timeout.
        """

    @abstractmethod
    def flush(self) -> None:
        """Discard any audio buffered but not yet read."""

    @abstractmethod
    def stop(self) -> None:
        """Stop capturing and release the underlying device or socket."""
class Speaker(ABC):
    """Abstract playback sink accepting int16 mono PCM at any source rate."""

    @abstractmethod
    def begin_stream(self) -> None:
        """Prepare the sink for a new playback stream."""

    @abstractmethod
    def send_chunk(self, pcm: PCMLike, source_rate: int) -> None:
        """Queue PCM for playback. `source_rate` is the sample rate of `pcm`."""

    @abstractmethod
    def wait_finish(self) -> None:
        """End the current stream, letting queued audio play out where supported."""

    @abstractmethod
    def stop(self) -> None:
        """Abort playback; afterwards :attr:`interrupted` reports True."""

    @property
    @abstractmethod
    def interrupted(self) -> bool:
        """True once :meth:`stop` has cut the current stream short."""

    @property
    def total_sent_sec(self) -> float:
        """Seconds of audio sent in the current stream (0.0 if not tracked)."""
        return 0.0
# ─── G1 built-in (UDP mic + AudioClient speaker) ──────────
class BuiltinMic(Mic):
    """G1 robot's on-board mic published over UDP multicast.

    A daemon thread appends every received datagram to a byte buffer capped
    at ``buf_max`` bytes (the oldest bytes are discarded first);
    :meth:`read_chunk` drains that buffer in fixed-size chunks.
    """

    sample_rate = TARGET_MIC_RATE

    def __init__(self, group: str = _MCAST_GRP, port: int = _MCAST_PORT,
                 buf_max: int = _MIC_BUF_MAX):
        self._group = group
        self._port = port
        self._buf_max = buf_max
        self._sock: Optional[socket.socket] = None
        self._buf = bytearray()
        self._lock = threading.Lock()
        self._running = False
        self._thread: Optional[threading.Thread] = None

    def start(self) -> None:
        """Join the multicast group on the G1-network interface and start receiving.

        No-op if the mic is already running.

        Raises:
            RuntimeError: no 192.168.123.x interface is present.
            OSError: socket setup (bind / IP_ADD_MEMBERSHIP) failed. The
                partially configured socket is closed before re-raising.
        """
        if self._running:
            return
        local_ip = _find_g1_local_ip()
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind(("", self._port))
            mreq = struct.pack(
                "4s4s",
                socket.inet_aton(self._group),
                socket.inet_aton(local_ip),
            )
            sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
            # Finite timeout so the receive loop regularly re-checks whether
            # it should exit.
            sock.settimeout(1.0)
        except Exception:
            sock.close()
            raise
        self._sock = sock
        self._running = True
        self._thread = threading.Thread(
            target=self._recv_loop, args=(sock,), daemon=True,
        )
        self._thread.start()
        log.info("BuiltinMic joined %s:%d on %s", self._group, self._port, local_ip)

    def _recv_loop(self, sock: socket.socket) -> None:
        """Append datagrams from ``sock`` to the buffer until this socket is retired.

        The loop is bound to the socket it was started with. Once stop(), or a
        later start(), replaces ``self._sock``, this thread exits. It does not
        spin on a closed socket or race a newer receive thread.
        """
        while self._running and self._sock is sock:
            try:
                data, _ = sock.recvfrom(4096)
            except socket.timeout:
                continue
            except OSError:
                # Closed socket or transient error: back off briefly, then
                # let the loop condition decide whether to exit.
                if self._running and self._sock is sock:
                    time.sleep(0.01)
                continue
            with self._lock:
                self._buf.extend(data)
                overflow = len(self._buf) - self._buf_max
                if overflow > 0:
                    del self._buf[:overflow]

    def read_chunk(self, num_bytes: int) -> bytes:
        """Return exactly ``num_bytes`` of PCM, zero-padding on underrun.

        Polls for up to ``_MIC_READ_TIMEOUT`` seconds for a full chunk. If it
        never arrives, up to ``num_bytes`` of whatever is buffered is
        returned, followed by zero bytes (silence).
        """
        # Monotonic clock: a wall-clock step (e.g. NTP sync after boot) must
        # not stretch or cut short the polling window.
        deadline = time.monotonic() + _MIC_READ_TIMEOUT
        while time.monotonic() < deadline:
            with self._lock:
                if len(self._buf) >= num_bytes:
                    chunk = bytes(self._buf[:num_bytes])
                    del self._buf[:num_bytes]
                    return chunk
            time.sleep(0.003)
        with self._lock:
            # Slice to num_bytes: data may have arrived since the last poll,
            # and the caller must never receive more than it asked for.
            chunk = bytes(self._buf[:num_bytes])
            del self._buf[:num_bytes]
        return chunk + b"\x00" * (num_bytes - len(chunk))

    def flush(self) -> None:
        """Discard all buffered audio."""
        with self._lock:
            self._buf.clear()

    def stop(self) -> None:
        """Stop receiving and close the socket. Safe to call repeatedly."""
        self._running = False
        sock, self._sock = self._sock, None
        self._thread = None
        if sock is not None:
            try:
                sock.close()
            except OSError:
                pass
class BuiltinSpeaker(Speaker):
    """G1 robot's built-in speaker via AudioClient.PlayStream (16 kHz mono)."""

    HARDWARE_RATE = 16_000

    def __init__(self, audio_client: Any, app_name: Optional[str] = None):
        self._ac = audio_client
        try:
            self._ac.SetVolume(100)
        except Exception:
            log.warning("BuiltinSpeaker.SetVolume failed")
        self._app_name = app_name or _SP_CFG.get("app_name", "sanad")
        self._begin_pause = _SP_CFG.get("begin_stream_pause_sec", 0.15)
        self._finish_margin = _SP_CFG.get("wait_finish_margin_sec", 0.3)
        self._stop_flag = threading.Event()
        self._stream_id: Optional[str] = None
        self._total_sent = 0.0   # seconds of audio handed to PlayStream
        self._play_start = 0.0   # time.monotonic() at begin_stream()

    def _stop_play_api(self) -> None:
        """Send AUDIO_STOP_PLAY for our app_name (best effort; failures are logged)."""
        try:
            from unitree_sdk2py.g1.audio.g1_audio_api import (
                ROBOT_API_ID_AUDIO_STOP_PLAY,
            )
            self._ac._Call(
                ROBOT_API_ID_AUDIO_STOP_PLAY,
                json.dumps({"app_name": self._app_name}),
            )
        except Exception:
            log.warning("BuiltinSpeaker AUDIO_STOP_PLAY failed")

    def begin_stream(self) -> None:
        """Stop any previous playback, pause briefly, and open a new stream id."""
        self._stop_flag.clear()
        self._stop_play_api()
        time.sleep(self._begin_pause)
        # The id only needs to be distinct per stream, so the wall clock is fine.
        self._stream_id = f"s_{int(time.time() * 1000)}"
        self._total_sent = 0.0
        # Monotonic: elapsed-time math in wait_finish() must not be skewed
        # by wall-clock adjustments.
        self._play_start = time.monotonic()

    def send_chunk(self, pcm: PCMLike, source_rate: int) -> None:
        """Resample ``pcm`` to 16 kHz and hand it to PlayStream (dropped after stop())."""
        if self._stop_flag.is_set():
            return
        arr = _as_int16_array(pcm)
        if arr.size < 10:
            return
        hw = _resample_int16(arr, source_rate, self.HARDWARE_RATE)
        self._ac.PlayStream(self._app_name, self._stream_id, hw.tobytes())
        self._total_sent += len(hw) / self.HARDWARE_RATE

    def wait_finish(self) -> None:
        """Wait until the audio sent so far should have played, then stop playback.

        The wait is the audio duration sent since begin_stream(), minus the
        time already elapsed, plus the configured margin. It returns early as
        soon as stop() is called.
        """
        elapsed = time.monotonic() - self._play_start
        remaining = self._total_sent - elapsed + self._finish_margin
        if remaining > 0:
            # Event.wait wakes immediately on stop() instead of polling.
            self._stop_flag.wait(remaining)
        self._stop_play_api()

    def stop(self) -> None:
        """Mark the stream interrupted and stop playback on the robot."""
        self._stop_flag.set()
        self._stop_play_api()

    @property
    def interrupted(self) -> bool:
        return self._stop_flag.is_set()

    @property
    def total_sent_sec(self) -> float:
        return self._total_sent
# ─── PyAudio-backed mic/speaker ───────────────────────────
class _PyAudioMic(Mic):
    """Shared base for PulseAudio/ALSA input — matches device by name pattern.

    A daemon thread reads ``frames_per_buffer`` frames at a time from a
    PyAudio input stream into a byte buffer capped at ``_MIC_BUF_MAX``;
    :meth:`read_chunk` drains that buffer.
    """

    sample_rate = TARGET_MIC_RATE

    # Upper bound (seconds) on how long stop() waits for the capture thread
    # to return from stream.read() before touching the stream.
    _JOIN_TIMEOUT_SEC = 1.0

    def __init__(self, device_pattern: str, label: str,
                 frames_per_buffer: int = 512):
        if not _HAS_PYAUDIO:
            raise RuntimeError(f"{label}Mic requires pyaudio")
        self._device_pattern = device_pattern
        self._label = label
        self._frames_per_buffer = frames_per_buffer
        self._pa: Optional["pyaudio.PyAudio"] = None
        self._stream = None
        self._running = False
        self._buf = bytearray()
        self._lock = threading.Lock()
        self._thread: Optional[threading.Thread] = None

    def _resolve_device_index(self) -> Optional[int]:
        """Pick the PyAudio input device to open.

        Preference order:
          1. PortAudio's 'pulse' device — routes capture through
             PulseAudio's default source. We MUST use this on USB UAC1
             cards (Anker PowerConf, Hollyland) — opening hw:N,0
             directly grabs ALSA exclusively, which makes PulseAudio's
             module-alsa-card lose the device. That drops the card
             from `pactl list`, the parent's audio watcher then sees
             "anker unplugged" within seconds, and reverts the brain
             to the boot profile (chest speaker). The dashboard's
             "Apply" call already pins PulseAudio's default
             source/sink to the matching Anker device, so opening
             'pulse' lands on the right hardware.
          2. PortAudio's 'default' device — also pulse-routed on a
             standard Pulse-on-Linux setup, used as a fallback.
          3. Substring match against the device_pattern (legacy
             direct-hw path) — only used when neither 'pulse' nor
             'default' is enumerated by PortAudio, which would only
             happen on a system without Pulse. Logs a WARN because
             this is the path that triggers the bug above.
        """
        if self._pa is None:
            return None
        pulse_idx = default_idx = pattern_idx = None
        pattern_name = ""
        patterns = [p.strip().lower()
                    for p in self._device_pattern.split(",") if p.strip()]
        for i in range(self._pa.get_device_count()):
            info = self._pa.get_device_info_by_index(i)
            if info.get("maxInputChannels", 0) <= 0:
                continue
            name_lower = str(info.get("name", "")).lower()
            if pulse_idx is None and name_lower == "pulse":
                pulse_idx = i
            elif default_idx is None and name_lower == "default":
                default_idx = i
            if pattern_idx is None and any(n in name_lower for n in patterns):
                pattern_idx = i
                pattern_name = name_lower
        if pulse_idx is not None:
            return pulse_idx
        if default_idx is not None:
            return default_idx
        if pattern_idx is not None:
            log.warning(
                "%sMic falling back to direct ALSA device '%s' "
                "(no 'pulse'/'default' device exposed by PortAudio) — "
                "this grabs the card exclusively and may cause PulseAudio "
                "to drop it; consider installing the ALSA pulse plugin",
                self._label, pattern_name,
            )
            return pattern_idx
        return None

    def start(self) -> None:
        """Open the input stream and start the capture thread.

        No-op if already running. If the stream cannot be opened, the
        PortAudio instance is terminated before the error propagates.
        """
        if self._running:
            return
        self._pa = pyaudio.PyAudio()
        idx = self._resolve_device_index()
        # Log which device we picked so a "wrong sink" symptom is easy
        # to attribute. Includes the device name (e.g. 'pulse' vs hw:N)
        # since the index alone tells you nothing useful in a tail.
        try:
            picked = self._pa.get_device_info_by_index(idx) if idx is not None else {}
            picked_name = picked.get("name", "?")
        except Exception:
            picked_name = "?"
        try:
            stream = self._pa.open(
                format=pyaudio.paInt16,
                channels=1,
                rate=self.sample_rate,
                input=True,
                input_device_index=idx,
                frames_per_buffer=self._frames_per_buffer,
            )
        except Exception:
            pa, self._pa = self._pa, None
            try:
                pa.terminate()
            except Exception:
                pass
            raise
        self._stream = stream
        self._running = True
        self._thread = threading.Thread(
            target=self._recv_loop, args=(stream,), daemon=True,
        )
        self._thread.start()
        log.info("%sMic started (device_index=%s name=%r)",
                 self._label, idx, picked_name)

    def _recv_loop(self, stream) -> None:
        """Copy captured audio from ``stream`` into the buffer until it is retired.

        Exits once stop() clears ``_running``, or once the instance has moved
        on to a different stream (a restart after a timed-out stop()).
        """
        while self._running and self._stream is stream:
            try:
                data = stream.read(
                    self._frames_per_buffer, exception_on_overflow=False,
                )
            except Exception:
                if self._running:
                    time.sleep(0.01)
                continue
            with self._lock:
                self._buf.extend(data)
                overflow = len(self._buf) - _MIC_BUF_MAX
                if overflow > 0:
                    del self._buf[:overflow]

    def read_chunk(self, num_bytes: int) -> bytes:
        """Return exactly ``num_bytes`` of PCM, zero-padding on underrun.

        Polls for up to ``_MIC_READ_TIMEOUT`` seconds for a full chunk. If it
        never arrives, up to ``num_bytes`` of whatever is buffered is
        returned, followed by zero bytes (silence).
        """
        # Monotonic clock: wall-clock steps must not distort the window.
        deadline = time.monotonic() + _MIC_READ_TIMEOUT
        while time.monotonic() < deadline:
            with self._lock:
                if len(self._buf) >= num_bytes:
                    chunk = bytes(self._buf[:num_bytes])
                    del self._buf[:num_bytes]
                    return chunk
            time.sleep(0.003)
        with self._lock:
            # Slice to num_bytes: late data must not make the chunk oversized.
            chunk = bytes(self._buf[:num_bytes])
            del self._buf[:num_bytes]
        return chunk + b"\x00" * (num_bytes - len(chunk))

    def flush(self) -> None:
        """Discard all buffered audio."""
        with self._lock:
            self._buf.clear()

    def stop(self) -> None:
        """Stop capture and release the stream and the PortAudio instance.

        The capture thread is joined BEFORE the stream is closed, so that
        close never runs while another thread is inside ``stream.read()``.
        PortAudio's ALSA→pulse path is not re-entrant for close-vs-write, and
        presumably not for close-vs-read either. If the thread does not return
        within ``_JOIN_TIMEOUT_SEC``, the handles are deliberately left open
        (and a warning is logged) rather than risk a concurrent close.
        Safe to call repeatedly.
        """
        self._running = False
        thread, self._thread = self._thread, None
        if thread is not None and thread is not threading.current_thread():
            thread.join(timeout=self._JOIN_TIMEOUT_SEC)
            if thread.is_alive():
                log.warning(
                    "%sMic capture thread did not exit within %.1fs; "
                    "leaving PortAudio handles open to avoid a concurrent close",
                    self._label, self._JOIN_TIMEOUT_SEC,
                )
                self._stream = None
                self._pa = None
                return
        stream, self._stream = self._stream, None
        pa, self._pa = self._pa, None
        if stream is not None:
            try:
                stream.stop_stream()
            except Exception:
                pass
            try:
                stream.close()
            except Exception:
                pass
        if pa is not None:
            try:
                pa.terminate()
            except Exception:
                pass
class AnkerMic(_PyAudioMic):
    """Anker PowerConf USB microphone, captured through PyAudio."""

    def __init__(self):
        super().__init__(label="Anker", device_pattern="powerconf,anker")
class HollylandMic(_PyAudioMic):
    """Hollyland wireless microphone, captured through PyAudio."""

    def __init__(self):
        name_patterns = "hollyland,wireless_microphone"
        super().__init__(device_pattern=name_patterns, label="Hollyland")
class _PyAudioSpeaker(Speaker):
    """PulseAudio/ALSA output — opens a fresh output stream per begin_stream().

    Every touch of the PortAudio stream is serialised by an RLock. stop() is
    sticky: once called, this instance never reopens a stream, and it
    releases its PortAudio instance.
    """

    def __init__(self, device_pattern: str, label: str):
        if not _HAS_PYAUDIO:
            raise RuntimeError(f"{label}Speaker requires pyaudio")
        self._device_pattern = device_pattern
        self._label = label
        self._pa: Optional["pyaudio.PyAudio"] = None
        self._stream = None
        self._stream_rate: Optional[int] = None
        self._stop_flag = threading.Event()
        self._total_sent = 0.0
        # Serialises every touch of self._stream / self._pa. PortAudio's
        # ALSA→pulse plugin is NOT re-entrant: a concurrent snd_pcm_close
        # (from stop()/wait_finish()) while another thread is inside
        # snd_pcm_writei (from send_chunk()) corrupts the pulse mainloop
        # heap — observed as `malloc_consolidate(): invalid chunk size`
        # on barge-in. RLock so stop()→wait_finish() nesting is safe.
        self._lock = threading.RLock()
        # Sticky teardown signal — once stop() has run, refuse to
        # lazy-reopen the stream from a late send_chunk on the same
        # instance (the swap path replaces the instance entirely).
        self._closed = False
        # True after an output-open failure has been logged for the current
        # stream, so a dead device warns once rather than once per chunk.
        self._open_failed = False

    def _resolve_device_index(self) -> Optional[int]:
        """Pick the PyAudio output device to open.

        Mirrors `_PyAudioMic._resolve_device_index`. Short version: prefer
        'pulse', so playback goes through PulseAudio's default sink (which the
        dashboard's Apply pins to the active profile's sink). Only fall back
        to direct hw:N if PulseAudio isn't wired into PortAudio at all.
        Grabbing hw:N exclusively makes PulseAudio drop the card, and the
        parent's audio watcher will then revert the brain to the boot profile
        within seconds.
        """
        if self._pa is None:
            return None
        pulse_idx = default_idx = pattern_idx = None
        pattern_name = ""
        patterns = [p.strip().lower()
                    for p in self._device_pattern.split(",") if p.strip()]
        for i in range(self._pa.get_device_count()):
            info = self._pa.get_device_info_by_index(i)
            if info.get("maxOutputChannels", 0) <= 0:
                continue
            name_lower = str(info.get("name", "")).lower()
            if pulse_idx is None and name_lower == "pulse":
                pulse_idx = i
            elif default_idx is None and name_lower == "default":
                default_idx = i
            if pattern_idx is None and any(n in name_lower for n in patterns):
                pattern_idx = i
                pattern_name = name_lower
        if pulse_idx is not None:
            return pulse_idx
        if default_idx is not None:
            return default_idx
        if pattern_idx is not None:
            log.warning(
                "%sSpeaker falling back to direct ALSA device '%s' "
                "(no 'pulse'/'default' device exposed by PortAudio) — "
                "this grabs the card exclusively and may cause PulseAudio "
                "to drop it; consider installing the ALSA pulse plugin",
                self._label, pattern_name,
            )
            return pattern_idx
        return None

    # USB-native rate for the underlying card. PortAudio's ALSA backend
    # (the only backend available in conda's PyAudio build on Jetson)
    # opens via the ALSA 'pulse' plugin, which on this system DOES NOT
    # advertise rate conversion in `snd_pcm_hw_params` — opening at the
    # source rate (24 kHz from Gemini TTS, etc.) gets rejected with
    # paInvalidSampleRate. We pin the stream rate to the card's native
    # 48 kHz and resample chunks app-side before writing. Same approach
    # `_play_pcm_via_g1` uses for the DDS path.
    _STREAM_TARGET_RATE = 48_000

    def _open_stream(self, _ignored_rate: int) -> None:
        """Open the output stream at _STREAM_TARGET_RATE (raises on failure)."""
        idx = self._resolve_device_index()
        try:
            picked = self._pa.get_device_info_by_index(idx) if idx is not None else {}
            picked_name = picked.get("name", "?")
        except Exception:
            picked_name = "?"
        # ALWAYS open at _STREAM_TARGET_RATE — see the comment above that constant.
        self._stream = self._pa.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=self._STREAM_TARGET_RATE,
            output=True,
            output_device_index=idx,
        )
        self._stream_rate = self._STREAM_TARGET_RATE
        log.info("%sSpeaker output opened (device_index=%s name=%r, rate=%d "
                 "— chunks resampled to this rate)",
                 self._label, idx, picked_name, self._STREAM_TARGET_RATE)

    def begin_stream(self) -> None:
        # Hold the lock so a concurrent stop()/wait_finish() (from the
        # barge-in path or a swap drain) cannot interleave with the
        # flag clear + PyAudio init — which would otherwise re-enable
        # writes against a stream the teardown is about to close.
        with self._lock:
            if self._closed:
                # Speaker was torn down for swap or session end; do not
                # revive on the same instance. Caller swap_audio_devices
                # replaces the instance entirely.
                return
            self._stop_flag.clear()
            self._total_sent = 0.0
            self._open_failed = False
            if self._pa is None:
                self._pa = pyaudio.PyAudio()

    def _resample_mono16(self, arr, src_rate: int, dst_rate: int):
        """Linear-interp resample of mono int16; returns int16 ndarray.

        No-op when the rates match. Delegates to the module-level
        ``_resample_int16``, so every backend shares one resampler.
        """
        return _resample_int16(arr, src_rate, dst_rate)

    def send_chunk(self, pcm: PCMLike, source_rate: int) -> None:
        # Cheap pre-check OUTSIDE the lock — avoids ever taking the
        # lock for empty/late chunks once a stop has fired. Event +
        # bool reads are atomic.
        if self._stop_flag.is_set() or self._closed:
            return
        arr = _as_int16_array(pcm)
        if arr.size < 10:
            return
        # Resample BEFORE acquiring the lock — pure CPU, no shared
        # state, keeps the critical section to just the PortAudio write
        # so a concurrent stop() doesn't wait on numpy work.
        if source_rate != self._STREAM_TARGET_RATE:
            arr = self._resample_mono16(arr, source_rate, self._STREAM_TARGET_RATE)
        payload = arr.tobytes()
        sent_sec = len(arr) / self._STREAM_TARGET_RATE
        with self._lock:
            # CRITICAL re-check inside the lock: stop() may have run
            # between our pre-check and acquiring the lock. Without
            # this, the lazy-open below would resurrect a stream that
            # barge-in just tore down.
            if self._stop_flag.is_set() or self._closed:
                return
            if self._pa is None:
                self._pa = pyaudio.PyAudio()
            if self._stream is None:
                try:
                    # Any rate will do — _open_stream always opens at
                    # _STREAM_TARGET_RATE.
                    self._open_stream(source_rate)
                except Exception as exc:
                    # Drop this chunk; the next chunk retries the open.
                    if not self._open_failed:
                        log.warning("%sSpeaker output open failed: %s",
                                    self._label, exc)
                        self._open_failed = True
                    return
                self._open_failed = False
            stream = self._stream  # snapshot — wait_finish nulls under same lock
            if stream is None:
                return
            try:
                stream.write(payload)
                self._total_sent += sent_sec
            except Exception as exc:
                log.warning("%sSpeaker write failed: %s", self._label, exc)

    def wait_finish(self) -> None:
        with self._lock:
            stream = self._stream
            # Null the ref BEFORE close so a racing send_chunk (waiting
            # on the lock) re-checks and bails instead of touching a
            # half-closed handle. Double-close-safe: if another caller
            # already nulled it, we do nothing.
            self._stream = None
            self._stream_rate = None
            if stream is not None:
                try:
                    stream.stop_stream()
                except Exception:
                    pass
                try:
                    stream.close()
                except Exception:
                    pass

    def stop(self) -> None:
        # Set the flag FIRST (outside the lock — Event is atomic) so a
        # concurrent send_chunk on another thread sees teardown ASAP
        # even before it tries to acquire the lock. Then take the
        # RLock and finish teardown; wait_finish re-enters the RLock
        # safely.
        self._stop_flag.set()
        with self._lock:
            self._closed = True
            self.wait_finish()
            # _closed is sticky, so begin_stream()/send_chunk() never
            # recreate self._pa on this instance. Release the PortAudio
            # instance here instead of leaking it until process exit.
            pa, self._pa = self._pa, None
            if pa is not None:
                try:
                    pa.terminate()
                except Exception:
                    pass

    @property
    def interrupted(self) -> bool:
        return self._stop_flag.is_set()

    @property
    def total_sent_sec(self) -> float:
        return self._total_sent
class AnkerSpeaker(_PyAudioSpeaker):
    """Anker PowerConf USB speaker, played through PyAudio."""

    def __init__(self):
        super().__init__(label="Anker", device_pattern="powerconf,anker")
class PulseStreamSpeaker(Speaker):
    """Stream PCM to PulseAudio's default sink via a `pacat` subprocess.

    Why not _PyAudioSpeaker: PortAudio's 'pulse' device is unavailable in this
    conda env (the ALSA→pulse plugin libasound_module_conf_pulse.so isn't on the
    env's plugin path), so PyAudio can't reach PulseAudio at all → silence. The
    record-playback path proved `pacat`/`paplay` work, so we reuse that: pacat
    inherits PULSE_SERVER/XDG_RUNTIME_DIR from the child and plays to PulseAudio's
    DEFAULT sink — which the dashboard's Apply pins to the active profile's sink
    (the JBL). Used by the JBL profile (paired with the G1 built-in DDS mic)."""

    HW_RATE = 24_000  # Gemini's native receive rate; PulseAudio resamples to the sink

    def __init__(self, label: str = "Pulse", sink_pattern: str = ""):
        self._label = label
        self._sink_pattern = sink_pattern
        self._sink_name: Optional[str] = None  # resolved PA sink, cached
        self._proc: Optional["subprocess.Popen"] = None
        # pacat process handed to wait_finish() for draining. It is tracked
        # so that a concurrent stop() can still terminate it.
        self._draining: Optional["subprocess.Popen"] = None
        self._stop_flag = threading.Event()
        self._lock = threading.RLock()
        self._total_sent = 0.0
        self._closed = False

    def _resolve_sink(self) -> Optional[str]:
        """Find the PA sink whose name matches our pattern (e.g. the JBL), so we
        can pin pacat to it with --device instead of relying on the (drift-prone)
        default sink. Returns None → pacat falls back to the default sink."""
        if not self._sink_pattern:
            return None
        pats = [p.strip().lower() for p in self._sink_pattern.split(",") if p.strip()]
        try:
            out = subprocess.run(
                ["pactl", "list", "short", "sinks"],
                capture_output=True, text=True, timeout=2,
            ).stdout
        except Exception:
            return None
        for line in out.splitlines():
            cols = line.split("\t")
            name = cols[1] if len(cols) > 1 else ""
            if name and any(p in name.lower() for p in pats):
                return name
        return None

    def _spawn(self) -> None:
        """Start a pacat playback process unless one is already running."""
        if self._proc is not None and self._proc.poll() is None:
            return
        if self._sink_name is None:
            self._sink_name = self._resolve_sink()
        cmd = [
            "pacat", "--playback",
            "--rate=%d" % self.HW_RATE, "--format=s16le", "--channels=1",
            "--latency-msec=120",
            "--client-name=sanad_voice", "--stream-name=sanad_voice_jbl",
        ]
        if self._sink_name:
            cmd.append("--device=%s" % self._sink_name)
        try:
            self._proc = subprocess.Popen(
                cmd, stdin=subprocess.PIPE,
                stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
            )
        except Exception as exc:
            log.warning("%sSpeaker: pacat spawn failed: %s", self._label, exc)
            self._proc = None

    def begin_stream(self) -> None:
        with self._lock:
            self._stop_flag.clear()
            self._closed = False
            self._total_sent = 0.0
            self._spawn()

    def send_chunk(self, pcm: PCMLike, source_rate: int) -> None:
        # Cheap pre-check and resampling OUTSIDE the lock: pure CPU work on
        # local data, so a concurrent stop() never waits on numpy.
        if self._stop_flag.is_set() or self._closed:
            return
        arr = _as_int16_array(pcm)
        if arr.size < 10:
            return
        if source_rate != self.HW_RATE:
            arr = _resample_int16(arr, source_rate, self.HW_RATE)
        payload = arr.tobytes()
        with self._lock:
            # Re-check: stop() may have run while we were resampling.
            if self._stop_flag.is_set() or self._closed:
                return
            if self._proc is None or self._proc.poll() is not None:
                self._spawn()
            p = self._proc
            if p is None or p.stdin is None:
                return
            try:
                # May block while pacat's pipe is full; stop() terminates
                # pacat first, which turns this into BrokenPipeError.
                p.stdin.write(payload)
                p.stdin.flush()
                self._total_sent += len(arr) / self.HW_RATE
            except (BrokenPipeError, OSError):
                pass

    def wait_finish(self) -> None:
        """Let pacat play out what it has buffered, then reap it.

        Closing stdin lets pacat drain its buffer and exit on its own. Waits
        up to 8 s, then kills it. While draining, the process is tracked in
        ``_draining`` so a concurrent stop() can still cut it off.
        """
        with self._lock:
            p = self._proc
            if p is None:
                return
            self._draining = p
            self._proc = None
        try:
            try:
                if p.stdin:
                    p.stdin.close()
            except Exception:
                pass
            try:
                p.wait(timeout=8)
            except Exception:
                try:
                    p.kill()
                except Exception:
                    pass
        finally:
            with self._lock:
                if self._draining is p:
                    self._draining = None

    def stop(self) -> None:
        """Abort playback immediately and drop any further chunks."""
        # Flags first (Event and bool assignment are atomic) so new
        # send_chunk() calls bail before touching the lock.
        self._stop_flag.set()
        self._closed = True
        # Unlocked snapshot on purpose: a send_chunk() can hold the lock
        # while blocked writing to a full pacat pipe (audio arrives faster
        # than real time). Terminating pacat first makes that write fail
        # and release the lock, instead of barge-in waiting for the pipe
        # to drain.
        live = self._proc
        if live is not None:
            try:
                live.terminate()
            except Exception:
                pass
        with self._lock:
            p = self._proc
            self._proc = None
            draining = self._draining
        if draining is not None and draining is not p:
            # wait_finish() owns reaping it; just cut playback short.
            try:
                draining.terminate()
            except Exception:
                pass
        if p is not None:
            try:
                if p.stdin:
                    p.stdin.close()
            except Exception:
                pass
            try:
                p.terminate()
                p.wait(timeout=2)
            except Exception:
                try:
                    p.kill()
                except Exception:
                    pass

    @property
    def interrupted(self) -> bool:
        return self._stop_flag.is_set()

    @property
    def total_sent_sec(self) -> float:
        return self._total_sent
# ─── Factory ──────────────────────────────────────────────
_PROFILE_ALIASES = {
"builtin": "builtin",
"g1_builtin": "builtin",
"g1": "builtin",
"anker": "anker",
"anker_powerconf": "anker",
"hollyland": "hollyland_builtin",
"hollyland_builtin": "hollyland_builtin",
"jbl": "jbl_builtin_mic",
"jbl_builtin_mic": "jbl_builtin_mic",
}
SUPPORTED_PROFILES = ("builtin", "anker", "hollyland_builtin", "jbl_builtin_mic")
@dataclass
class AudioIO:
    """A Mic + Speaker pair for one audio profile, started and stopped together."""

    mic: Mic
    speaker: Speaker
    profile_id: str = field(default="builtin")
    # Kept on the instance so the brain can rebuild a profile that needs
    # the DDS handle (`builtin`, `hollyland_builtin`) during a hot-swap —
    # without re-init'ing the channel. `repr=False` keeps it out of logs.
    _audio_client: Optional[Any] = field(default=None, repr=False, compare=False)

    def start(self) -> None:
        """Start capture (speakers open lazily on begin_stream/send_chunk)."""
        self.mic.start()

    def stop(self) -> None:
        """Stop speaker then mic; each failure is logged and does not block the other."""
        try:
            self.speaker.stop()
        except Exception:
            log.warning("AudioIO speaker.stop failed", exc_info=True)
        try:
            self.mic.stop()
        except Exception:
            log.warning("AudioIO mic.stop failed", exc_info=True)

    @staticmethod
    def _resolve_profile(profile_id: str) -> str:
        """Map a profile id or alias (case- and whitespace-insensitive) to its canonical id.

        Raises:
            ValueError: the id is neither a supported profile nor a known alias.
        """
        raw = (profile_id or "").strip().lower()
        resolved = _PROFILE_ALIASES.get(raw)
        if resolved is None:
            raise ValueError(
                f"unknown audio profile {profile_id!r}; "
                f"supported: {', '.join(SUPPORTED_PROFILES)}"
            )
        return resolved

    @classmethod
    def build_backends(
        cls,
        profile_id: str,
        *,
        audio_client: Optional[Any] = None,
    ) -> tuple[Mic, Speaker]:
        """Return a fresh (Mic, Speaker) pair for a profile WITHOUT wrapping
        in an AudioIO. Used by GeminiBrain.swap_audio_devices() for the
        hot-swap path: build a new pair, switch refs, tear down the old.

        Same validation as from_profile(). `audio_client` is required for
        profiles that route playback through the G1 chest speaker.

        Raises:
            ValueError: unknown profile, or a required audio_client is missing.
        """
        resolved = cls._resolve_profile(profile_id)
        if resolved == "builtin":
            if audio_client is None:
                raise ValueError(
                    "profile 'builtin' requires audio_client (G1 AudioClient)"
                )
            return BuiltinMic(), BuiltinSpeaker(audio_client)
        if resolved == "anker":
            return AnkerMic(), AnkerSpeaker()
        if resolved == "hollyland_builtin":
            if audio_client is None:
                raise ValueError(
                    "profile 'hollyland_builtin' uses the G1 speaker — "
                    "requires audio_client"
                )
            return HollylandMic(), BuiltinSpeaker(audio_client)
        if resolved == "jbl_builtin_mic":
            # JBL speaker via pacat → PulseAudio default sink (pinned to the JBL
            # by the dashboard) + the G1 built-in DDS mic (the JBL has no mic).
            # pacat is used because PyAudio's 'pulse' device is unavailable in
            # this env. Neither backend needs the AudioClient.
            return BuiltinMic(), PulseStreamSpeaker(label="JBL", sink_pattern="jbl,bluez")
        raise AssertionError(f"unhandled resolved profile: {resolved!r}")

    @classmethod
    def from_profile(
        cls,
        profile_id: str,
        *,
        audio_client: Optional[Any] = None,
    ) -> "AudioIO":
        """Build an AudioIO for the requested profile.

        `audio_client` is the initialised `unitree_sdk2py` `AudioClient` and
        is required for any profile that speaks through the G1's on-board
        speaker (`builtin`, `hollyland_builtin`). It's also retained on the
        returned AudioIO so a later hot-swap can rebuild without re-init.

        Raises:
            ValueError: unknown profile, or a required audio_client is missing.
        """
        resolved = cls._resolve_profile(profile_id)
        mic, speaker = cls.build_backends(resolved, audio_client=audio_client)
        return cls(mic=mic, speaker=speaker, profile_id=resolved,
                   _audio_client=audio_client)