914 lines
34 KiB
Python
914 lines
34 KiB
Python
"""Hardware-agnostic audio I/O for Sanad voice pipelines.
|
|
|
|
Provides a uniform Mic / Speaker interface so the model layer (Gemini
|
|
today, or any future alternative) doesn't need to know which physical
|
|
audio path is active. Pick a pairing via `AudioIO.from_profile()`:
|
|
|
|
    builtin           → G1 UDP multicast mic + AudioClient.PlayStream
    anker             → Anker PowerConf USB mic + speaker (PyAudio)
    hollyland_builtin → Hollyland wireless mic + G1 built-in speaker
    jbl_builtin_mic   → G1 UDP multicast mic + JBL speaker (pacat → PulseAudio)
|
|
|
|
Mics deliver int16 mono PCM at 16 kHz.
|
|
Speakers accept int16 mono PCM plus a `source_rate` and resample
|
|
internally if the hardware runs at a different rate.
|
|
|
|
Usage:
|
|
|
|
audio = AudioIO.from_profile("builtin", audio_client=ac)
|
|
audio.start()
|
|
try:
|
|
chunk = audio.mic.read_chunk(1024) # mic
|
|
audio.speaker.begin_stream() # speaker
|
|
audio.speaker.send_chunk(pcm_24k, 24000)
|
|
audio.speaker.wait_finish()
|
|
finally:
|
|
audio.stop()
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import socket
|
|
import struct
|
|
import subprocess
|
|
import threading
|
|
import time
|
|
from abc import ABC, abstractmethod
|
|
from dataclasses import dataclass, field
|
|
from typing import Any, Optional, Union
|
|
|
|
import numpy as np
|
|
|
|
try:
|
|
import pyaudio
|
|
_HAS_PYAUDIO = True
|
|
except ImportError:
|
|
pyaudio = None
|
|
_HAS_PYAUDIO = False
|
|
|
|
from Project.Sanad.core.config_loader import section as _cfg_section
|
|
from Project.Sanad.core.logger import get_logger
|
|
|
|
log = get_logger("audio_io")

# Sample rate every Mic implementation delivers (int16 mono PCM).
TARGET_MIC_RATE = 16_000

# Per-subsystem config sections from the "voice" config; each lookup below
# falls back to a built-in default when the key is absent.
_MIC_CFG = _cfg_section("voice", "mic_udp")
_SP_CFG = _cfg_section("voice", "speaker")

# Multicast source of the G1 on-board mic.
_MCAST_GRP = _MIC_CFG.get("group", "239.168.123.161")
_MCAST_PORT = _MIC_CFG.get("port", 5555)
# Capture-buffer tuning shared by the mic implementations:
#   - cap on buffered bytes (oldest bytes are dropped beyond it)
#   - how long read_chunk() waits for a full chunk, in seconds
_MIC_BUF_MAX = _MIC_CFG.get("buffer_max_bytes", 64_000)
_MIC_READ_TIMEOUT = _MIC_CFG.get("read_timeout_sec", 0.04)

# Any container Speaker.send_chunk() accepts as int16 PCM samples.
PCMLike = Union[bytes, bytearray, memoryview, np.ndarray]
|
|
|
|
|
|
def _find_g1_local_ip(prefix: str = "192.168.123.", timeout: float = 2.0) -> str:
    """Find the host IPv4 address on the G1's internal 192.168.123.0/24 network.

    Parses ``ip -4 -o addr`` and returns the first interface address (the
    token that follows ``inet``) starting with *prefix*.

    Args:
        prefix: Address prefix to match; defaults to the G1 internal subnet.
        timeout: Seconds to wait for the ``ip`` command.

    Returns:
        The matching IPv4 address, without its ``/len`` suffix.

    Raises:
        RuntimeError: if ``ip`` cannot be run, times out, or no address with
            *prefix* is configured.
    """
    try:
        out = subprocess.run(
            ["ip", "-4", "-o", "addr"],
            capture_output=True, text=True, timeout=timeout,
        ).stdout
    except (OSError, subprocess.SubprocessError) as exc:
        # Missing binary (FileNotFoundError) or a hang (TimeoutExpired) is
        # reported with the same exception type as "no interface".
        raise RuntimeError(f"could not run 'ip -4 -o addr': {exc}") from exc
    for line in out.splitlines():
        tokens = line.split()
        # Only consider the interface address itself (the token after
        # "inet"), never a broadcast ("brd") or peer address on the line.
        for i, tok in enumerate(tokens[:-1]):
            if tok == "inet" and tokens[i + 1].startswith(prefix):
                return tokens[i + 1].split("/")[0]
    raise RuntimeError(f"no {prefix}x interface found")
|
|
|
|
|
|
def _resample_int16(pcm: np.ndarray, src_rate: int, dst_rate: int) -> np.ndarray:
    """Linearly resample mono int16 PCM from *src_rate* to *dst_rate* Hz.

    The input is flattened to 1-D. When the rates match, or the input is
    empty, it is returned as int16 without interpolation.

    Interpolated samples are rounded to the nearest integer. Truncating
    toward zero (a bare ``astype``) would bias every fractional sample
    toward silence.

    Args:
        pcm: Samples (any numeric ndarray; converted to int16 on output).
        src_rate: Sample rate of *pcm*.
        dst_rate: Desired output sample rate.

    Returns:
        A 1-D int16 array of ``max(1, int(len * dst_rate / src_rate))``
        samples (or the input length when no resampling is needed).

    Raises:
        ValueError: if resampling is needed and either rate is not positive.
    """
    samples = np.asarray(pcm).reshape(-1)
    if src_rate == dst_rate or samples.size == 0:
        return samples.astype(np.int16, copy=False)
    if src_rate <= 0 or dst_rate <= 0:
        raise ValueError(
            f"sample rates must be positive, got {src_rate} -> {dst_rate}"
        )
    n_in = samples.size
    target_len = max(1, int(n_in * dst_rate / src_rate))
    resampled = np.interp(
        np.linspace(0, n_in, target_len, endpoint=False),
        np.arange(n_in),
        samples.astype(np.float64),
    )
    # Linear interpolation stays between neighbouring int16 inputs, so the
    # rounded values cannot leave the int16 range.
    return np.rint(resampled).astype(np.int16)
|
|
|
|
|
|
def _as_int16_array(pcm: PCMLike) -> np.ndarray:
    """Return *pcm* as a 1-D int16 sample array.

    ndarrays are cast to int16 (no copy when already int16) and flattened.
    Without flattening, an (N, 1) column array would break the resample path
    downstream. Byte-like inputs are interpreted as native-endian int16 and
    must have an even length.
    """
    if isinstance(pcm, np.ndarray):
        return pcm.astype(np.int16, copy=False).reshape(-1)
    return np.frombuffer(bytes(pcm), dtype=np.int16)
|
|
|
|
|
|
# ─── Protocols ────────────────────────────────────────────
|
|
|
|
class Mic(ABC):
    """Abstract capture source producing int16 mono PCM at ``sample_rate`` Hz."""

    sample_rate: int = TARGET_MIC_RATE

    @abstractmethod
    def start(self) -> None:
        """Begin capturing audio."""

    @abstractmethod
    def read_chunk(self, num_bytes: int) -> bytes:
        """Return the next ``num_bytes`` of captured PCM."""

    @abstractmethod
    def flush(self) -> None:
        """Discard any audio buffered so far."""

    @abstractmethod
    def stop(self) -> None:
        """Stop capturing and release the underlying device."""
|
|
|
|
|
|
class Speaker(ABC):
    """Abstract playback sink for int16 mono PCM at an arbitrary source rate."""

    @abstractmethod
    def begin_stream(self) -> None:
        """Prepare for a new utterance."""

    @abstractmethod
    def send_chunk(self, pcm: PCMLike, source_rate: int) -> None:
        """Queue PCM for playback. `source_rate` is the sample rate of `pcm`."""

    @abstractmethod
    def wait_finish(self) -> None:
        """Block until queued audio has been played (or playback is stopped)."""

    @abstractmethod
    def stop(self) -> None:
        """Interrupt playback."""

    @property
    @abstractmethod
    def interrupted(self) -> bool:
        """True once playback has been stopped."""

    @property
    def total_sent_sec(self) -> float:
        """Seconds of audio sent in the current stream; 0.0 unless overridden."""
        return 0.0
|
|
|
|
|
|
# ─── G1 built-in (UDP mic + AudioClient speaker) ──────────
|
|
|
|
class BuiltinMic(Mic):
    """G1 robot's on-board mic published over UDP multicast.

    A daemon thread appends received datagrams to a byte buffer capped at
    ``buf_max`` bytes (oldest bytes are dropped first). ``read_chunk()``
    drains that buffer.
    """

    sample_rate = TARGET_MIC_RATE

    def __init__(self, group: str = _MCAST_GRP, port: int = _MCAST_PORT,
                 buf_max: int = _MIC_BUF_MAX):
        self._group = group
        self._port = port
        self._buf_max = buf_max
        self._sock: Optional[socket.socket] = None
        self._buf = bytearray()
        self._lock = threading.Lock()
        self._running = False
        self._thread: Optional[threading.Thread] = None

    def start(self) -> None:
        """Join the multicast group and start the receive thread.

        This is a no-op while already running; without that guard, a second
        reader would interleave duplicate audio into the buffer.

        Raises:
            RuntimeError: from ``_find_g1_local_ip`` when no G1 interface
                exists.
            OSError: if the socket cannot be bound or joined. In that case
                the socket is closed before re-raising.
        """
        if self._running:
            return
        local_ip = _find_g1_local_ip()
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind(("", self._port))
            mreq = struct.pack(
                "4s4s",
                socket.inet_aton(self._group),
                socket.inet_aton(local_ip),
            )
            sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
            # Bounded recv timeout so the loop re-checks _running regularly.
            sock.settimeout(1.0)
        except Exception:
            sock.close()
            raise
        self._sock = sock
        self._running = True
        self._thread = threading.Thread(
            target=self._recv_loop, args=(sock,), daemon=True,
        )
        self._thread.start()
        log.info("BuiltinMic joined %s:%d on %s", self._group, self._port, local_ip)

    def _recv_loop(self, sock: socket.socket) -> None:
        """Receive datagrams from *sock* into the capture buffer.

        Runs until stop() is called or the mic is restarted on a new socket.
        *sock* is passed in rather than read from self._sock, because stop()
        sets that attribute to None.
        """
        while self._running and self._sock is sock:
            try:
                data, _ = sock.recvfrom(4096)
            except socket.timeout:
                continue
            except Exception:
                # Socket closed by stop() or a transient error: back off
                # briefly; the loop condition ends the thread after stop().
                if self._running:
                    time.sleep(0.01)
                continue
            with self._lock:
                self._buf.extend(data)
                overflow = len(self._buf) - self._buf_max
                if overflow > 0:
                    del self._buf[:overflow]

    def read_chunk(self, num_bytes: int) -> bytes:
        """Return exactly ``num_bytes`` of PCM.

        Waits up to ``_MIC_READ_TIMEOUT`` seconds (monotonic clock) for a full
        chunk. On timeout it returns whatever is buffered, zero-padded to
        ``num_bytes``, so callers always get a fixed-size chunk.
        """
        deadline = time.monotonic() + _MIC_READ_TIMEOUT
        while time.monotonic() < deadline:
            with self._lock:
                if len(self._buf) >= num_bytes:
                    chunk = bytes(self._buf[:num_bytes])
                    del self._buf[:num_bytes]
                    return chunk
            time.sleep(0.003)
        with self._lock:
            # Data may have arrived since the last check; never hand back
            # more than num_bytes.
            take = min(len(self._buf), num_bytes)
            chunk = bytes(self._buf[:take])
            del self._buf[:take]
        return chunk + b"\x00" * (num_bytes - take)

    def flush(self) -> None:
        """Discard all buffered audio."""
        with self._lock:
            self._buf.clear()

    def stop(self) -> None:
        """Stop the receive loop and close the socket (best-effort)."""
        self._running = False
        sock, self._sock = self._sock, None
        if sock is not None:
            try:
                sock.close()
            except OSError:
                pass
|
|
|
|
|
|
class BuiltinSpeaker(Speaker):
    """G1 robot's built-in speaker via AudioClient.PlayStream (16 kHz mono).

    Chunks are resampled to HARDWARE_RATE and pushed under a per-utterance
    stream id. wait_finish() estimates the end of playback from the amount
    of audio sent since begin_stream(); the robot is never asked.
    """

    HARDWARE_RATE = 16_000

    def __init__(self, audio_client: Any, app_name: Optional[str] = None):
        self._ac = audio_client
        try:
            self._ac.SetVolume(100)
        except Exception:
            log.warning("BuiltinSpeaker.SetVolume failed")
        self._app_name = app_name or _SP_CFG.get("app_name", "sanad")
        self._begin_pause = _SP_CFG.get("begin_stream_pause_sec", 0.15)
        self._finish_margin = _SP_CFG.get("wait_finish_margin_sec", 0.3)
        self._stop_flag = threading.Event()
        self._stream_id: Optional[str] = None
        self._total_sent = 0.0
        # time.monotonic() timestamp of the last begin_stream().
        self._play_start = 0.0

    def _stop_play_api(self) -> None:
        """Ask the robot to stop playback for our app_name (best-effort, logged)."""
        try:
            from unitree_sdk2py.g1.audio.g1_audio_api import (
                ROBOT_API_ID_AUDIO_STOP_PLAY,
            )
            self._ac._Call(
                ROBOT_API_ID_AUDIO_STOP_PLAY,
                json.dumps({"app_name": self._app_name}),
            )
        except Exception:
            log.warning("BuiltinSpeaker AUDIO_STOP_PLAY failed")

    def begin_stream(self) -> None:
        """Stop any prior playback, pause briefly, and start a fresh stream id."""
        self._stop_flag.clear()
        self._stop_play_api()
        time.sleep(self._begin_pause)
        # Wall-clock millis only serve as a unique-ish stream id.
        self._stream_id = f"s_{int(time.time() * 1000)}"
        self._total_sent = 0.0
        self._play_start = time.monotonic()

    def send_chunk(self, pcm: PCMLike, source_rate: int) -> None:
        """Resample *pcm* to HARDWARE_RATE and push it to the robot.

        Chunks are dropped after stop(), and chunks with fewer than 10
        samples are dropped too.
        """
        if self._stop_flag.is_set():
            return
        arr = _as_int16_array(pcm)
        if arr.size < 10:
            return
        hw = _resample_int16(arr, source_rate, self.HARDWARE_RATE)
        self._ac.PlayStream(self._app_name, self._stream_id, hw.tobytes())
        self._total_sent += len(hw) / self.HARDWARE_RATE

    def wait_finish(self) -> None:
        """Wait out the estimated remaining playback, then stop the stream.

        The estimate is audio sent minus time elapsed since begin_stream(),
        plus a safety margin. Returns early as soon as stop() is called.
        """
        elapsed = time.monotonic() - self._play_start
        remaining = self._total_sent - elapsed + self._finish_margin
        if remaining > 0:
            # Event.wait returns immediately when stop() sets the flag.
            self._stop_flag.wait(remaining)
        self._stop_play_api()

    def stop(self) -> None:
        """Interrupt playback: drop further chunks and stop the robot's stream."""
        self._stop_flag.set()
        self._stop_play_api()

    @property
    def interrupted(self) -> bool:
        return self._stop_flag.is_set()

    @property
    def total_sent_sec(self) -> float:
        return self._total_sent
|
|
|
|
|
|
# ─── PyAudio-backed mic/speaker ───────────────────────────
|
|
|
|
class _PyAudioMic(Mic):
    """Shared base for PulseAudio/ALSA input — matches device by name pattern.

    A daemon thread reads ``frames_per_buffer`` frames at a time into a byte
    buffer capped at ``buf_max`` bytes; ``read_chunk()`` drains it.
    """

    sample_rate = TARGET_MIC_RATE

    def __init__(self, device_pattern: str, label: str,
                 frames_per_buffer: int = 512, buf_max: int = _MIC_BUF_MAX):
        if not _HAS_PYAUDIO:
            raise RuntimeError(f"{label}Mic requires pyaudio")
        self._device_pattern = device_pattern
        self._label = label
        self._frames_per_buffer = frames_per_buffer
        self._buf_max = buf_max
        self._pa: Optional["pyaudio.PyAudio"] = None
        self._stream = None
        self._running = False
        self._buf = bytearray()
        self._lock = threading.Lock()
        self._thread: Optional[threading.Thread] = None

    def _resolve_device_index(self) -> Optional[int]:
        """Pick the PyAudio input device to open.

        Preference order:
          1. PortAudio's 'pulse' device — routes capture through
             PulseAudio's default source. We MUST use this on USB UAC1
             cards (Anker PowerConf, Hollyland) — opening hw:N,0
             directly grabs ALSA exclusively, which makes PulseAudio's
             module-alsa-card lose the device. That drops the card
             from `pactl list`, the parent's audio watcher then sees
             "anker unplugged" within seconds, and reverts the brain
             to the boot profile (chest speaker). The dashboard's
             "Apply" call already pins PulseAudio's default
             source/sink to the matching Anker device, so opening
             'pulse' lands on the right hardware.
          2. PortAudio's 'default' device — also pulse-routed on a
             standard Pulse-on-Linux setup, used as a fallback.
          3. Substring match against the device_pattern (legacy
             direct-hw path) — only used when neither 'pulse' nor
             'default' is enumerated by PortAudio, which would only
             happen on a system without Pulse. Logs a WARN because
             this is the path that triggers the bug above.

        Returns None (PortAudio's default input) when nothing matches or
        PyAudio is not initialised.
        """
        if self._pa is None:
            return None
        pulse_idx = default_idx = pattern_idx = None
        pattern_name = ""
        patterns = [p.strip().lower()
                    for p in self._device_pattern.split(",") if p.strip()]
        for i in range(self._pa.get_device_count()):
            info = self._pa.get_device_info_by_index(i)
            if info.get("maxInputChannels", 0) <= 0:
                continue
            name_lower = str(info.get("name", "")).lower()
            if pulse_idx is None and name_lower == "pulse":
                pulse_idx = i
            elif default_idx is None and name_lower == "default":
                default_idx = i
            if pattern_idx is None and any(n in name_lower for n in patterns):
                pattern_idx = i
                pattern_name = name_lower
        if pulse_idx is not None:
            return pulse_idx
        if default_idx is not None:
            return default_idx
        if pattern_idx is not None:
            log.warning(
                "%sMic falling back to direct ALSA device '%s' "
                "(no 'pulse'/'default' device exposed by PortAudio) — "
                "this grabs the card exclusively and may cause PulseAudio "
                "to drop it; consider installing the ALSA pulse plugin",
                self._label, pattern_name,
            )
            return pattern_idx
        return None

    def start(self) -> None:
        """Open the input stream and start the capture thread.

        This is a no-op while already running. If the stream cannot be
        opened, the PortAudio context is terminated before the error is
        re-raised, so it does not leak.
        """
        if self._running:
            return
        pa = pyaudio.PyAudio()
        self._pa = pa
        try:
            idx = self._resolve_device_index()
            # Log which device we picked so a "wrong sink" symptom is easy
            # to attribute. Includes the device name (e.g. 'pulse' vs hw:N)
            # since the index alone tells you nothing useful in a tail.
            try:
                picked = pa.get_device_info_by_index(idx) if idx is not None else {}
                picked_name = picked.get("name", "?")
            except Exception:
                picked_name = "?"
            self._stream = pa.open(
                format=pyaudio.paInt16,
                channels=1,
                rate=self.sample_rate,
                input=True,
                input_device_index=idx,
                frames_per_buffer=self._frames_per_buffer,
            )
        except Exception:
            self._stream = None
            self._pa = None
            try:
                pa.terminate()
            except Exception:
                pass
            raise
        self._running = True
        self._thread = threading.Thread(target=self._recv_loop, daemon=True)
        self._thread.start()
        log.info("%sMic started (device_index=%s name=%r)",
                 self._label, idx, picked_name)

    def _recv_loop(self) -> None:
        """Read frames from the input stream into the capture buffer until stop()."""
        stream = self._stream  # set before this thread is started
        while self._running and stream is not None:
            try:
                data = stream.read(
                    self._frames_per_buffer, exception_on_overflow=False,
                )
            except Exception:
                if self._running:
                    time.sleep(0.01)
                continue
            with self._lock:
                self._buf.extend(data)
                overflow = len(self._buf) - self._buf_max
                if overflow > 0:
                    del self._buf[:overflow]

    def read_chunk(self, num_bytes: int) -> bytes:
        """Return exactly ``num_bytes`` of PCM.

        Waits up to ``_MIC_READ_TIMEOUT`` seconds (monotonic clock) for a full
        chunk. On timeout it returns whatever is buffered, zero-padded to
        ``num_bytes``.
        """
        deadline = time.monotonic() + _MIC_READ_TIMEOUT
        while time.monotonic() < deadline:
            with self._lock:
                if len(self._buf) >= num_bytes:
                    chunk = bytes(self._buf[:num_bytes])
                    del self._buf[:num_bytes]
                    return chunk
            time.sleep(0.003)
        with self._lock:
            # Data may have arrived since the last check; never hand back
            # more than num_bytes.
            take = min(len(self._buf), num_bytes)
            chunk = bytes(self._buf[:take])
            del self._buf[:take]
        return chunk + b"\x00" * (num_bytes - take)

    def flush(self) -> None:
        """Discard all buffered audio."""
        with self._lock:
            self._buf.clear()

    def stop(self) -> None:
        """Stop capture, then close the stream and terminate PortAudio.

        The capture thread is joined before the stream is closed. Closing a
        PortAudio/ALSA-pulse stream while another thread is inside a blocking
        read is not re-entrant-safe. If the thread does not exit within 1 s,
        the stream is left open (and a warning is logged) rather than closed
        under an in-flight read.
        """
        self._running = False
        thread = self._thread
        if thread is not None and thread is not threading.current_thread():
            thread.join(timeout=1.0)
            if thread.is_alive():
                log.warning(
                    "%sMic capture thread did not exit; leaving the input "
                    "stream open rather than closing it under a read",
                    self._label,
                )
                return
        self._thread = None
        if self._stream is not None:
            try:
                self._stream.stop_stream()
                self._stream.close()
            except Exception:
                pass
            self._stream = None
        if self._pa is not None:
            try:
                self._pa.terminate()
            except Exception:
                pass
            self._pa = None
|
|
|
|
|
|
class AnkerMic(_PyAudioMic):
    """Anker PowerConf USB microphone captured through PyAudio."""

    def __init__(self):
        name_patterns = "powerconf,anker"
        super().__init__(label="Anker", device_pattern=name_patterns)
|
|
|
|
|
|
class HollylandMic(_PyAudioMic):
    """Hollyland wireless microphone receiver captured through PyAudio."""

    def __init__(self):
        name_patterns = "hollyland,wireless_microphone"
        super().__init__(label="Hollyland", device_pattern=name_patterns)
|
|
|
|
|
|
class _PyAudioSpeaker(Speaker):
    """PulseAudio/ALSA output through PyAudio.

    Despite begin_stream()'s name, the output stream is opened lazily: by
    the first send_chunk() after begin_stream(). wait_finish() closes it.
    stop() is terminal for the instance: it closes the stream, terminates
    the PortAudio context, and blocks any later reopen (the swap path
    replaces the instance).
    """

    # USB-native rate for the underlying card. PortAudio's ALSA backend
    # (the only backend available in conda's PyAudio build on Jetson)
    # opens via the ALSA 'pulse' plugin, which on this system DOES NOT
    # advertise rate conversion in `snd_pcm_hw_params` — opening at the
    # source rate (24 kHz from Gemini TTS, etc.) gets rejected with
    # paInvalidSampleRate. We pin the stream rate to the card's native
    # 48 kHz and resample chunks app-side before writing. Same approach
    # `_play_pcm_via_g1` uses for the DDS path.
    _STREAM_TARGET_RATE = 48_000

    def __init__(self, device_pattern: str, label: str):
        if not _HAS_PYAUDIO:
            raise RuntimeError(f"{label}Speaker requires pyaudio")
        self._device_pattern = device_pattern
        self._label = label
        self._pa: Optional["pyaudio.PyAudio"] = None
        self._stream = None
        self._stream_rate: Optional[int] = None
        self._stop_flag = threading.Event()
        self._total_sent = 0.0
        # Serialises every touch of self._stream / self._pa. PortAudio's
        # ALSA→pulse plugin is NOT re-entrant: a concurrent snd_pcm_close
        # (from stop()/wait_finish()) while another thread is inside
        # snd_pcm_writei (from send_chunk()) corrupts the pulse mainloop
        # heap — observed as `malloc_consolidate(): invalid chunk size`
        # on barge-in. RLock so stop()→wait_finish() nesting is safe.
        self._lock = threading.RLock()
        # Sticky teardown signal — once stop() has run, refuse to
        # lazy-reopen the stream from a late send_chunk on the same
        # instance (the swap path replaces the instance entirely).
        self._closed = False

    def _resolve_device_index(self) -> Optional[int]:
        """Pick the PyAudio output device to open.

        Prefers 'pulse', then 'default', so playback goes through
        PulseAudio's default sink (which the dashboard's Apply pins to the
        active profile's sink). Direct hw:N is used only as a last resort,
        when PulseAudio isn't wired into PortAudio at all. Grabbing hw:N
        exclusively makes PulseAudio drop the card, and the parent's audio
        watcher will then revert the brain to the boot profile within
        seconds. Returns None (PortAudio's default output) when nothing
        matches or PyAudio is not initialised.
        """
        if self._pa is None:
            return None
        pulse_idx = default_idx = pattern_idx = None
        pattern_name = ""
        patterns = [p.strip().lower()
                    for p in self._device_pattern.split(",") if p.strip()]
        for i in range(self._pa.get_device_count()):
            info = self._pa.get_device_info_by_index(i)
            if info.get("maxOutputChannels", 0) <= 0:
                continue
            name_lower = str(info.get("name", "")).lower()
            if pulse_idx is None and name_lower == "pulse":
                pulse_idx = i
            elif default_idx is None and name_lower == "default":
                default_idx = i
            if pattern_idx is None and any(n in name_lower for n in patterns):
                pattern_idx = i
                pattern_name = name_lower
        if pulse_idx is not None:
            return pulse_idx
        if default_idx is not None:
            return default_idx
        if pattern_idx is not None:
            log.warning(
                "%sSpeaker falling back to direct ALSA device '%s' "
                "(no 'pulse'/'default' device exposed by PortAudio) — "
                "this grabs the card exclusively and may cause PulseAudio "
                "to drop it; consider installing the ALSA pulse plugin",
                self._label, pattern_name,
            )
            return pattern_idx
        return None

    def _open_stream(self, _ignored_rate: int) -> None:
        """Open the output stream at _STREAM_TARGET_RATE (the argument is unused).

        Called from send_chunk() with self._lock held. If PortAudio refuses
        to open the device, a warning is logged and self._stream stays None,
        so send_chunk() drops the chunk instead of raising.
        """
        idx = self._resolve_device_index()
        try:
            picked = self._pa.get_device_info_by_index(idx) if idx is not None else {}
            picked_name = picked.get("name", "?")
        except Exception:
            picked_name = "?"
        try:
            # ALWAYS open at _STREAM_TARGET_RATE — see the comment on that
            # constant.
            self._stream = self._pa.open(
                format=pyaudio.paInt16,
                channels=1,
                rate=self._STREAM_TARGET_RATE,
                output=True,
                output_device_index=idx,
            )
        except Exception as exc:
            self._stream = None
            self._stream_rate = None
            log.warning("%sSpeaker output open failed (device_index=%s name=%r): %s",
                        self._label, idx, picked_name, exc)
            return
        self._stream_rate = self._STREAM_TARGET_RATE
        log.info("%sSpeaker output opened (device_index=%s name=%r, rate=%d "
                 "— chunks resampled to this rate)",
                 self._label, idx, picked_name, self._STREAM_TARGET_RATE)

    def begin_stream(self) -> None:
        """Re-arm the speaker for a new utterance (no-op after stop())."""
        # Hold the lock so a concurrent stop()/wait_finish() (from the
        # barge-in path or a swap drain) cannot interleave with the
        # flag clear + PyAudio init — which would otherwise re-enable
        # writes against a stream the teardown is about to close.
        with self._lock:
            if self._closed:
                # Speaker was torn down for swap or session end; do not
                # revive on the same instance. Caller swap_audio_devices
                # replaces the instance entirely.
                return
            self._stop_flag.clear()
            self._total_sent = 0.0
            if self._pa is None:
                self._pa = pyaudio.PyAudio()

    def _resample_mono16(self, arr, src_rate: int, dst_rate: int):
        """Linear-interp resample of mono int16; returns an int16 ndarray.

        Returns *arr* itself when the rates match or it is empty. Otherwise
        delegates to the module-level _resample_int16, so every speaker
        shares one resampler.
        """
        if src_rate == dst_rate or arr.size == 0:
            return arr
        return _resample_int16(arr, src_rate, dst_rate)

    def send_chunk(self, pcm: PCMLike, source_rate: int) -> None:
        """Resample *pcm* to _STREAM_TARGET_RATE and write it (blocking).

        Opens the stream on demand. Drops the chunk when the speaker is
        stopped or closed, or when the chunk has fewer than 10 samples.
        Write errors are logged, not raised.
        """
        # Cheap pre-check OUTSIDE the lock — avoids ever taking the
        # lock for empty/late chunks once a stop has fired. Event +
        # bool reads are atomic.
        if self._stop_flag.is_set() or self._closed:
            return
        arr = _as_int16_array(pcm)
        if arr.size < 10:
            return
        # Resample BEFORE acquiring the lock — pure CPU, no shared
        # state, keeps the critical section to just the PortAudio write
        # so a concurrent stop() doesn't wait on numpy work.
        if source_rate != self._STREAM_TARGET_RATE:
            arr = self._resample_mono16(arr, source_rate, self._STREAM_TARGET_RATE)
        payload = arr.tobytes()
        sent_sec = len(arr) / self._STREAM_TARGET_RATE
        with self._lock:
            # CRITICAL re-check inside the lock: stop() may have run
            # between our pre-check and acquiring the lock. Without
            # this, the lazy-open below would resurrect a stream that
            # barge-in just tore down.
            if self._stop_flag.is_set() or self._closed:
                return
            if self._pa is None:
                self._pa = pyaudio.PyAudio()
            if self._stream is None:
                # Pass any rate — _open_stream ignores it and always
                # opens at _STREAM_TARGET_RATE.
                self._open_stream(source_rate)
            stream = self._stream  # snapshot — wait_finish nulls under same lock
            if stream is None:  # _open_stream failed (already logged)
                return
            try:
                stream.write(payload)
                self._total_sent += sent_sec
            except Exception as exc:
                log.warning("%sSpeaker write failed: %s", self._label, exc)

    def wait_finish(self) -> None:
        """Stop and close the current output stream, if any (best-effort)."""
        with self._lock:
            stream = self._stream
            # Null the ref BEFORE close so a racing send_chunk (waiting
            # on the lock) re-checks and reopens/bails instead of touching
            # a half-closed handle. Double-close-safe: if another caller
            # already nulled it, we do nothing.
            self._stream = None
            self._stream_rate = None
            if stream is not None:
                try:
                    stream.stop_stream()
                except Exception:
                    pass
                try:
                    stream.close()
                except Exception:
                    pass

    def stop(self) -> None:
        """Tear down permanently: close the stream and release PortAudio."""
        # Set the flag FIRST (outside the lock — Event is atomic) so a
        # concurrent send_chunk on another thread sees teardown ASAP
        # even before it tries to acquire the lock. Then take the
        # RLock and finish teardown; wait_finish re-enters the RLock
        # safely.
        self._stop_flag.set()
        with self._lock:
            self._closed = True
            self.wait_finish()
            # _closed is sticky, so nothing will use this context again;
            # without terminate() every swapped-out instance leaks one.
            pa, self._pa = self._pa, None
            if pa is not None:
                try:
                    pa.terminate()
                except Exception:
                    pass

    @property
    def interrupted(self) -> bool:
        return self._stop_flag.is_set()

    @property
    def total_sent_sec(self) -> float:
        return self._total_sent
|
|
|
|
|
|
class AnkerSpeaker(_PyAudioSpeaker):
    """Anker PowerConf USB speaker driven through PyAudio."""

    def __init__(self):
        name_patterns = "powerconf,anker"
        super().__init__(label="Anker", device_pattern=name_patterns)
|
|
|
|
|
|
class PulseStreamSpeaker(Speaker):
    """Stream PCM to a PulseAudio sink via a `pacat` subprocess.

    Why not _PyAudioSpeaker: PortAudio's 'pulse' device is unavailable in this
    conda env (the ALSA→pulse plugin libasound_module_conf_pulse.so isn't on the
    env's plugin path), so PyAudio can't reach PulseAudio at all → silence. The
    record-playback path proved `pacat`/`paplay` work, so we reuse that: pacat
    inherits PULSE_SERVER/XDG_RUNTIME_DIR from this process and plays to the
    sink matching `sink_pattern` (pinned with --device), or to PulseAudio's
    DEFAULT sink when none matches. The dashboard's Apply pins that default
    to the active profile's sink (the JBL). Used by the JBL profile (paired
    with the G1 built-in DDS mic).
    """

    HW_RATE = 24_000  # Gemini's native receive rate; PulseAudio resamples to the sink

    def __init__(self, label: str = "Pulse", sink_pattern: str = ""):
        self._label = label
        self._sink_pattern = sink_pattern
        # Resolved PA sink; cached once found (re-resolved while still None).
        self._sink_name: Optional[str] = None
        self._proc: Optional["subprocess.Popen"] = None
        self._stop_flag = threading.Event()
        self._lock = threading.RLock()
        self._total_sent = 0.0
        self._closed = False

    def _resolve_sink(self) -> Optional[str]:
        """Find the PA sink whose name matches our pattern (e.g. the JBL), so we
        can pin pacat to it with --device instead of relying on the (drift-prone)
        default sink. Returns None → pacat falls back to the default sink."""
        if not self._sink_pattern:
            return None
        pats = [p.strip().lower() for p in self._sink_pattern.split(",") if p.strip()]
        try:
            out = subprocess.run(
                ["pactl", "list", "short", "sinks"],
                capture_output=True, text=True, timeout=2,
            ).stdout
        except Exception:
            return None
        for line in out.splitlines():
            cols = line.split("\t")
            name = cols[1] if len(cols) > 1 else ""
            if name and any(p in name.lower() for p in pats):
                return name
        return None

    def _spawn(self) -> None:
        """Start a pacat playback process unless one is already running.

        Called with self._lock held. On failure, logs a warning and leaves
        self._proc as None.
        """
        if self._proc is not None and self._proc.poll() is None:
            return
        if self._sink_name is None:
            self._sink_name = self._resolve_sink()
        cmd = [
            "pacat", "--playback",
            "--rate=%d" % self.HW_RATE, "--format=s16le", "--channels=1",
            "--latency-msec=120",
            "--client-name=sanad_voice", "--stream-name=sanad_voice_jbl",
        ]
        if self._sink_name:
            cmd.append("--device=%s" % self._sink_name)
        try:
            self._proc = subprocess.Popen(
                cmd, stdin=subprocess.PIPE,
                stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
            )
        except Exception as exc:
            log.warning("%sSpeaker: pacat spawn failed: %s", self._label, exc)
            self._proc = None

    def begin_stream(self) -> None:
        """Re-arm the speaker (also after stop()) and make sure pacat is running."""
        with self._lock:
            self._stop_flag.clear()
            self._closed = False
            self._total_sent = 0.0
            self._spawn()

    def send_chunk(self, pcm: PCMLike, source_rate: int) -> None:
        """Resample *pcm* to HW_RATE and write it to pacat's stdin.

        (Re)spawns pacat if it has exited. Chunks are dropped after stop() and
        when shorter than 10 samples. The write blocks while the pipe is full,
        which paces delivery to real time.
        """
        # Lock-free pre-check so late chunks after stop() bail immediately.
        if self._stop_flag.is_set() or self._closed:
            return
        arr = _as_int16_array(pcm)
        if arr.size < 10:
            return
        # Resample outside the lock: pure CPU, no shared state.
        if source_rate != self.HW_RATE:
            arr = _resample_int16(arr, source_rate, self.HW_RATE)
        payload = arr.tobytes()
        sent_sec = len(arr) / self.HW_RATE
        with self._lock:
            if self._stop_flag.is_set() or self._closed:
                return
            if self._proc is None or self._proc.poll() is not None:
                self._spawn()
            p = self._proc
            if p is None or p.stdin is None:
                return
            try:
                p.stdin.write(payload)
                p.stdin.flush()
                self._total_sent += sent_sec
            except OSError as exc:  # includes BrokenPipeError
                # stop() kills pacat to unblock this write; that's expected.
                if not self._stop_flag.is_set():
                    log.warning("%sSpeaker: pacat write failed: %s", self._label, exc)

    @staticmethod
    def _reap(p: "subprocess.Popen", *, terminate: bool, timeout: float) -> None:
        """Close pacat's stdin, optionally terminate it, and wait for it to exit.

        Kills the process (and reaps it, so no zombie remains) if it does not
        exit within *timeout* seconds. Best-effort: never raises.
        """
        try:
            if p.stdin:
                p.stdin.close()
        except Exception:
            pass
        try:
            if terminate:
                p.terminate()
            p.wait(timeout=timeout)
        except Exception:
            try:
                p.kill()
                p.wait(timeout=1)
            except Exception:
                pass

    def wait_finish(self) -> None:
        """Close stdin so pacat drains its buffer and exits; wait up to 8 s."""
        with self._lock:
            p = self._proc
            self._proc = None
        if p is None:
            return
        self._reap(p, terminate=False, timeout=8)

    def stop(self) -> None:
        """Interrupt playback immediately and shut pacat down."""
        # Signal first so new send_chunk calls bail without touching the lock.
        self._stop_flag.set()
        self._closed = True
        # A send_chunk blocked on a full pipe holds the lock until pacat
        # drains it (~1 s+ of audio). Terminating pacat first makes that
        # write fail fast with BrokenPipeError and releases the lock.
        pending = self._proc
        if pending is not None:
            try:
                pending.terminate()
            except Exception:
                pass
        with self._lock:
            # Re-assert under the lock so a racing begin_stream() cannot
            # leave the speaker re-armed after stop() returns.
            self._stop_flag.set()
            self._closed = True
            p = self._proc
            self._proc = None
        if p is not None:
            self._reap(p, terminate=True, timeout=2)

    @property
    def interrupted(self) -> bool:
        return self._stop_flag.is_set()

    @property
    def total_sent_sec(self) -> float:
        return self._total_sent
|
|
|
|
|
|
# ─── Factory ──────────────────────────────────────────────
|
|
|
|
# Accepted profile spellings → canonical profile id. Keys are matched against
# the stripped, lower-cased profile string.
_PROFILE_ALIASES = {
    # G1 UDP multicast mic + G1 chest speaker
    "builtin": "builtin",
    "g1_builtin": "builtin",
    "g1": "builtin",
    # Anker PowerConf mic + speaker
    "anker": "anker",
    "anker_powerconf": "anker",
    # Hollyland wireless mic + G1 chest speaker
    "hollyland": "hollyland_builtin",
    "hollyland_builtin": "hollyland_builtin",
    # G1 UDP multicast mic + JBL speaker via pacat
    "jbl": "jbl_builtin_mic",
    "jbl_builtin_mic": "jbl_builtin_mic",
}

# Canonical profile ids, in first-appearance order of the table above.
SUPPORTED_PROFILES = tuple(dict.fromkeys(_PROFILE_ALIASES.values()))
|
|
|
|
|
|
@dataclass
class AudioIO:
    """A Mic + Speaker pair for one audio profile.

    Build one with :meth:`from_profile` (or build a bare pair with
    :meth:`build_backends`). :meth:`start` starts the mic. :meth:`stop`
    tears down the speaker, then the mic, logging rather than raising on
    failure.
    """

    mic: Mic
    speaker: Speaker
    profile_id: str = field(default="builtin")
    # Kept on the instance so the brain can rebuild a profile that needs
    # the DDS handle (`builtin`, `hollyland_builtin`) during a hot-swap —
    # without re-init'ing the channel. `repr=False` keeps it out of logs.
    _audio_client: Optional[Any] = field(default=None, repr=False, compare=False)

    def start(self) -> None:
        """Start capture. Speakers are driven per utterance via begin_stream()."""
        self.mic.start()

    def stop(self) -> None:
        """Stop the speaker, then the mic; failures are logged, not raised."""
        try:
            self.speaker.stop()
        except Exception:
            log.warning("AudioIO speaker.stop failed", exc_info=True)
        try:
            self.mic.stop()
        except Exception:
            log.warning("AudioIO mic.stop failed", exc_info=True)

    @staticmethod
    def _resolve_profile(profile_id: str) -> str:
        """Map a profile name or alias (case/whitespace-insensitive) to its canonical id.

        Raises:
            ValueError: if *profile_id* is not a known profile or alias.
        """
        raw = (profile_id or "").strip().lower()
        resolved = _PROFILE_ALIASES.get(raw)
        if resolved is None:
            raise ValueError(
                f"unknown audio profile {profile_id!r}; "
                f"supported: {', '.join(SUPPORTED_PROFILES)}"
            )
        return resolved

    @classmethod
    def build_backends(
        cls,
        profile_id: str,
        *,
        audio_client: Optional[Any] = None,
    ) -> tuple[Mic, Speaker]:
        """Return a fresh (Mic, Speaker) pair for a profile WITHOUT wrapping
        in an AudioIO. Used by GeminiBrain.swap_audio_devices() for the
        hot-swap path: build a new pair, switch refs, tear down the old.

        Same validation as from_profile(). `audio_client` is required for
        profiles that route playback through the G1 chest speaker
        (`builtin`, `hollyland_builtin`). Neither the mic nor the speaker is
        started here.

        Raises:
            ValueError: for an unknown profile, or a missing `audio_client`
                where one is required.
        """
        resolved = cls._resolve_profile(profile_id)
        if resolved == "builtin":
            if audio_client is None:
                raise ValueError(
                    "profile 'builtin' requires audio_client (G1 AudioClient)"
                )
            return BuiltinMic(), BuiltinSpeaker(audio_client)
        if resolved == "anker":
            return AnkerMic(), AnkerSpeaker()
        if resolved == "hollyland_builtin":
            if audio_client is None:
                raise ValueError(
                    "profile 'hollyland_builtin' uses the G1 speaker — "
                    "requires audio_client"
                )
            return HollylandMic(), BuiltinSpeaker(audio_client)
        if resolved == "jbl_builtin_mic":
            # JBL speaker via pacat → PulseAudio sink (pinned to the JBL)
            # + the G1 built-in DDS mic (the JBL has no mic).
            # pacat is used because PyAudio's 'pulse' device is unavailable in
            # this env. Neither backend needs the AudioClient.
            return BuiltinMic(), PulseStreamSpeaker(label="JBL", sink_pattern="jbl,bluez")
        raise AssertionError(f"unhandled resolved profile: {resolved!r}")

    @classmethod
    def from_profile(
        cls,
        profile_id: str,
        *,
        audio_client: Optional[Any] = None,
    ) -> "AudioIO":
        """Build an AudioIO for the requested profile.

        `audio_client` is the initialised `unitree_sdk2py` `AudioClient` and
        is required for any profile that speaks through the G1's on-board
        speaker (`builtin`, `hollyland_builtin`). It's also retained on the
        returned AudioIO so a later hot-swap can rebuild without re-init.
        The returned object's `profile_id` is the canonical id, not the alias.

        Raises:
            ValueError: for an unknown profile, or a missing `audio_client`
                where one is required.
        """
        resolved = cls._resolve_profile(profile_id)
        mic, speaker = cls.build_backends(resolved, audio_client=audio_client)
        return cls(mic=mic, speaker=speaker, profile_id=resolved,
                   _audio_client=audio_client)
|