1284 lines
61 KiB
Python
1284 lines
61 KiB
Python
"""Audio I/O manager — recording and playback via PyAudio.
|
|
|
|
Handles microphone capture and speaker playback. Thread-safe; one
|
|
playback at a time via play_lock.
|
|
|
|
(Speaker-monitor / `.monitor`-source capture lives in voice/typed_replay.py,
|
|
not here — see its parec/PyAudio MonitorRecorder.)
|
|
|
|
Device selection is dynamic — read from voice.audio_devices on each refresh.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import subprocess
|
|
import threading
|
|
import time
|
|
import wave
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
try:
|
|
import numpy as np
|
|
_HAS_NUMPY = True
|
|
except ImportError:
|
|
np = None
|
|
_HAS_NUMPY = False
|
|
|
|
try:
|
|
import pyaudio
|
|
except ImportError:
|
|
pyaudio = None # optional — only needed for local PCM playback
|
|
|
|
# G1 AudioClient — used to route playback through the robot chest speaker
|
|
# via DDS `PlayStream` (the same pipe Gemini uses). Without this, WAV
|
|
# playback would go to the Jetson's built-in audio codec, which isn't
|
|
# wired to any audible output on the G1.
|
|
try:
|
|
from unitree_sdk2py.g1.audio.g1_audio_client import AudioClient
|
|
from unitree_sdk2py.g1.audio.g1_audio_api import (
|
|
ROBOT_API_ID_AUDIO_STOP_PLAY,
|
|
)
|
|
_HAS_G1_AUDIO = True
|
|
except ImportError:
|
|
AudioClient = None
|
|
ROBOT_API_ID_AUDIO_STOP_PLAY = 0
|
|
_HAS_G1_AUDIO = False
|
|
|
|
from Project.Sanad.config import (
|
|
CHANNELS,
|
|
CHUNK_SIZE,
|
|
RECEIVE_SAMPLE_RATE,
|
|
SINK as DEFAULT_SINK,
|
|
SOURCE as DEFAULT_SOURCE,
|
|
)
|
|
from Project.Sanad.core.logger import get_logger
|
|
from Project.Sanad.voice import audio_devices as ad
|
|
|
|
log = get_logger("audio_manager")
|
|
|
|
FORMAT = pyaudio.paInt16 if pyaudio else 8
|
|
|
|
# Default fallback constants only — the live selection is per-instance state
|
|
# on AudioManager (self._current_sink / self._current_source), guarded by
|
|
# self._device_lock. Keeping the selection module-global meant two
|
|
# AudioManager instances stomped each other's sink/source; it now lives on
|
|
# the instance.
|
|
|
|
# How long an applied pactl selection is trusted before the hot playback /
|
|
# recording path re-runs the (expensive, multi-shell) pactl scan. The
|
|
# audio_devices watcher and the dashboard Apply endpoint already re-resolve
|
|
# on device change, so a short TTL here is purely a backstop against an
|
|
# unobserved hot-unplug — it does NOT need to be tight.
|
|
_DEFAULTS_TTL_S = 5.0
|
|
|
|
|
|
def _run_pactl(args: list[str]) -> subprocess.CompletedProcess[str]:
|
|
return subprocess.run(["pactl", *args], check=True, text=True, capture_output=True)
|
|
|
|
|
|
def _resolve_devices() -> tuple[str, str]:
|
|
"""Return current (sink, source) — falls back to config defaults."""
|
|
try:
|
|
cur = ad.current_selection()
|
|
sink = cur.get("sink") or DEFAULT_SINK
|
|
source = cur.get("source") or DEFAULT_SOURCE
|
|
return sink, source
|
|
except Exception as exc:
|
|
log.warning("Could not resolve audio devices: %s", exc)
|
|
return DEFAULT_SINK, DEFAULT_SOURCE
|
|
|
|
|
|
class _PulseOpenFailed(RuntimeError):
|
|
"""Signal from `_play_pcm_via_pulse` that PortAudio refused to open the
|
|
output stream (sink gone, bad I/O combination, etc.) — lets `play_wav`
|
|
fall back to G1 DDS chest playback so the user still hears the clip."""
|
|
|
|
|
|
class AudioManager:
|
|
def __init__(self):
|
|
if pyaudio is None:
|
|
raise RuntimeError(
|
|
"pyaudio not installed — AudioManager cannot play local PCM. "
|
|
"Install with `pip install pyaudio` (needs portaudio headers), "
|
|
"or rely on the G1 speaker via AudioClient.PlayStream."
|
|
)
|
|
self.pya = pyaudio.PyAudio()
|
|
self.play_lock = threading.Lock()
|
|
# Per-instance device selection (was module-global — two
|
|
# AudioManagers used to share one sink/source and stomp each
|
|
# other). _device_lock guards _current_sink / _current_source.
|
|
self._device_lock = threading.Lock()
|
|
self._current_sink = DEFAULT_SINK
|
|
self._current_source = DEFAULT_SOURCE
|
|
# Throttle ensure_audio_defaults() on the hot path — monotonic ts of
|
|
# the last successful apply. 0.0 = never applied yet.
|
|
self._defaults_applied_at = 0.0
|
|
# Cached PortAudio device index for the 'pulse'/'default' device
|
|
# (None = not probed; -1 = probed, absent). Lets play_pcm/record_mic
|
|
# route through PulseAudio instead of PortAudio's silent hw:0 default.
|
|
self._pulse_pa_index: int | None = None
|
|
# Lazily-initialised G1 DDS audio client (for play_wav → chest speaker)
|
|
self._g1_audio_client: Any = None
|
|
# G1 playback state — present during an active play_wav() call,
|
|
# None when idle. Mutated by pause_playback/resume_playback/stop_playback
|
|
# from other threads while _play_pcm_via_g1 holds play_lock.
|
|
self._play_state_lock = threading.Lock()
|
|
self._play_state: dict[str, Any] | None = None
|
|
# Monotonic play id — a new play_wav bumps it to preempt the in-flight
|
|
# one (so playing a record interrupts the previous instead of queueing).
|
|
self._play_epoch = 0
|
|
# Manual "hold" for the live Gemini pause. Default False = AUTO (record
|
|
# playback pauses Gemini only for the clip, then resumes). When True, the
|
|
# live voice is paused and STAYS paused (record playback won't resume it)
|
|
# until the dashboard releases the hold. Set via set_live_voice_hold().
|
|
self._live_voice_hold = False
|
|
# Resolve devices and set PulseAudio defaults at startup
|
|
self.refresh_devices()
|
|
self.ensure_audio_defaults(force=True)
|
|
|
|
def _get_g1_audio_client(self):
|
|
"""Return a cached G1 AudioClient (DDS) — creates on first use.
|
|
|
|
Assumes `ChannelFactoryInitialize` has already been called (our
|
|
ArmController does this at startup on eth0). Returns None if the
|
|
Unitree SDK is unavailable or init fails.
|
|
"""
|
|
if not _HAS_G1_AUDIO:
|
|
return None
|
|
if self._g1_audio_client is not None:
|
|
return self._g1_audio_client
|
|
try:
|
|
c = AudioClient()
|
|
# SHORT RPC timeout (was 5.0). The G1 "voice" service replies to RPCs
|
|
# on a topic SHARED with the live-voice child's AudioClient; when both
|
|
# run, the dashboard's reply ack is frequently lost in the collision,
|
|
# so _Call would block the FULL timeout per STOP/PlayStream — that was
|
|
# the "5s delay / no sound". The request itself is still published
|
|
# (audio plays); we don't need the ack, so fail fast. Good-case replies
|
|
# arrive in ~0.1s, so 0.6s keeps the happy path while killing the hang.
|
|
c.SetTimeout(0.6)
|
|
c.Init()
|
|
try:
|
|
c.SetVolume(100)
|
|
except Exception:
|
|
pass
|
|
self._g1_audio_client = c
|
|
log.info("G1 AudioClient initialized (for chest-speaker playback)")
|
|
except Exception as exc:
|
|
log.warning("G1 AudioClient init failed: %s", exc)
|
|
self._g1_audio_client = None
|
|
return self._g1_audio_client
|
|
|
|
def refresh_devices(self) -> dict[str, str]:
|
|
"""Re-read selected sink/source from audio_devices module."""
|
|
sink, source = _resolve_devices()
|
|
with self._device_lock:
|
|
self._current_sink, self._current_source = sink, source
|
|
log.info("AudioManager devices refreshed: sink=%s source=%s", sink, source)
|
|
return {"sink": sink, "source": source}
|
|
|
|
def ensure_audio_defaults(self, force: bool = False) -> None:
|
|
"""Re-scan all USB ports, resolve the active profile, set pactl defaults.
|
|
|
|
Called at startup AND before playback/recording so that even if the
|
|
user unplugs/re-plugs a device into a different port, the correct
|
|
sink/source is always used.
|
|
|
|
The scan (ad.apply_current_selection → current_selection →
|
|
detect_plugged_profiles) shells out to pactl many times, so on the
|
|
hot playback/record path we skip it when it ran within
|
|
`_DEFAULTS_TTL_S`. Pass `force=True` (startup / device-change) to
|
|
bypass the throttle.
|
|
"""
|
|
if not force:
|
|
with self._device_lock:
|
|
if (time.monotonic() - self._defaults_applied_at) < _DEFAULTS_TTL_S:
|
|
return
|
|
try:
|
|
result = ad.apply_current_selection()
|
|
cur = result.get("selection", {})
|
|
sink = cur.get("sink", "")
|
|
source = cur.get("source", "")
|
|
with self._device_lock:
|
|
self._current_sink = sink or DEFAULT_SINK
|
|
self._current_source = source or DEFAULT_SOURCE
|
|
self._defaults_applied_at = time.monotonic()
|
|
# At startup / device-change, re-apply the user's SAVED speaker volume
|
|
# to the active sink — PulseAudio doesn't persist our USB/BT (JBL/Anker)
|
|
# sink volume across restarts, so without this the JBL comes back at a
|
|
# default level instead of where the user left it.
|
|
if force:
|
|
self._restore_sink_volume()
|
|
except Exception as exc:
|
|
log.warning("Audio defaults not applied: %s", exc)
|
|
|
|
def _restore_sink_volume(self) -> None:
|
|
"""Apply config audio.g1_volume to the active PulseAudio sink."""
|
|
try:
|
|
from Project.Sanad.config import load_config
|
|
vol = int(((load_config() or {}).get("audio") or {}).get("g1_volume", 100))
|
|
vol = max(0, min(100, vol))
|
|
sink = self._current_sink or "@DEFAULT_SINK@"
|
|
import subprocess as _sp
|
|
_sp.run(["pactl", "set-sink-volume", sink, "%d%%" % vol],
|
|
timeout=3, check=False,
|
|
stdout=_sp.DEVNULL, stderr=_sp.DEVNULL)
|
|
if vol > 0:
|
|
_sp.run(["pactl", "set-sink-mute", sink, "0"], timeout=3,
|
|
check=False, stdout=_sp.DEVNULL, stderr=_sp.DEVNULL)
|
|
log.info("restored saved speaker volume → %d%% (sink=%s)", vol, sink)
|
|
except Exception as exc:
|
|
log.warning("restore sink volume failed: %s", exc)
|
|
|
|
def _pulse_device_index(self) -> int | None:
|
|
"""Resolve the PortAudio device index that routes through PulseAudio.
|
|
|
|
On this Jetson's conda PyAudio, opening with output/input device
|
|
index None lands on PortAudio's default — the silent hw:0
|
|
platform-sound card. Opening PortAudio's 'pulse' (or 'default')
|
|
device instead routes through the PulseAudio daemon, which
|
|
ensure_audio_defaults() has already pointed at the resolved
|
|
sink/source. Mirrors voice/audio_io.py's _resolve_device_index.
|
|
|
|
Returns the device index, or None when PortAudio exposes no
|
|
pulse/default device (then the caller falls back to PortAudio's
|
|
own default). Cached for the lifetime of the PyAudio handle.
|
|
"""
|
|
if self._pulse_pa_index is not None:
|
|
return self._pulse_pa_index if self._pulse_pa_index >= 0 else None
|
|
pulse_idx = default_idx = None
|
|
try:
|
|
for i in range(self.pya.get_device_count()):
|
|
info = self.pya.get_device_info_by_index(i)
|
|
name_lower = str(info.get("name", "")).lower()
|
|
if pulse_idx is None and name_lower == "pulse":
|
|
pulse_idx = i
|
|
elif default_idx is None and name_lower == "default":
|
|
default_idx = i
|
|
except Exception as exc:
|
|
log.debug("pulse device probe failed: %s", exc)
|
|
idx = pulse_idx if pulse_idx is not None else default_idx
|
|
self._pulse_pa_index = idx if idx is not None else -1
|
|
return idx
|
|
|
|
@property
|
|
def current_sink(self) -> str:
|
|
with self._device_lock:
|
|
return self._current_sink
|
|
|
|
@property
|
|
def current_source(self) -> str:
|
|
with self._device_lock:
|
|
return self._current_source
|
|
|
|
def close(self):
|
|
# Cached PortAudio device index is tied to this PyAudio handle —
|
|
# invalidate it so a re-init (audio reset) re-probes 'pulse'.
|
|
self._pulse_pa_index = None
|
|
self.pya.terminate()
|
|
|
|
def sample_width(self) -> int:
|
|
return self.pya.get_sample_size(FORMAT)
|
|
|
|
# -- playback --
|
|
|
|
def play_pcm(self, pcm_bytes: bytes, channels: int, sample_rate: int, sample_width: int):
|
|
with self.play_lock:
|
|
self.ensure_audio_defaults()
|
|
# Route through PortAudio's 'pulse' device so playback reaches
|
|
# the resolved sink — output_device_index=None defaults to the
|
|
# silent hw:0 platform-sound card on this Jetson's conda PyAudio.
|
|
stream = self.pya.open(
|
|
format=self.pya.get_format_from_width(sample_width),
|
|
channels=channels,
|
|
rate=sample_rate,
|
|
output=True,
|
|
output_device_index=self._pulse_device_index(),
|
|
frames_per_buffer=CHUNK_SIZE,
|
|
)
|
|
try:
|
|
frame_bytes = CHUNK_SIZE * channels * sample_width
|
|
for offset in range(0, len(pcm_bytes), frame_bytes):
|
|
stream.write(pcm_bytes[offset : offset + frame_bytes])
|
|
finally:
|
|
stream.stop_stream()
|
|
stream.close()
|
|
|
|
# Sink-name substrings that mean "PulseAudio routes this somewhere
|
|
# audible without DDS" — extend the tuple to add more USB cards (e.g.
|
|
# hollyland sink). Matched case-insensitively.
|
|
# "jbl"/"bluez" → the JBL Bluetooth speaker (and any bluez sink) is a real
|
|
# PulseAudio sink, so record playback must go via paplay/PulseAudio, NOT the
|
|
# G1 DDS chest speaker.
|
|
_PULSE_SINK_MARKERS = ("anker", "powerconf", "hollyland", "jbl", "bluez")
|
|
# Sample rate Anker PowerConf (and most USB UAC1 cards) accept natively
|
|
# — used as the resample target before opening a PortAudio stream so
|
|
# we don't hit paInvalidSampleRate when the WAV's native rate
|
|
# (24kHz from Gemini TTS, 22050 from old TTS, etc.) doesn't match
|
|
# the card's HW caps.
|
|
_PULSE_TARGET_RATE = 48_000
|
|
|
|
@staticmethod
|
|
def _resample_pcm16(pcm_bytes: bytes, channels: int,
|
|
src_rate: int, dst_rate: int) -> bytes:
|
|
"""Linear-interpolation resample of int16 PCM. numpy-only (no scipy)
|
|
— matches the pattern used by `_play_pcm_via_g1`.
|
|
|
|
Returns the resampled PCM bytes (same channel layout). No-op when
|
|
rates already match. Requires numpy (caller guards with _HAS_NUMPY).
|
|
"""
|
|
if src_rate == dst_rate or not pcm_bytes:
|
|
return pcm_bytes
|
|
arr = np.frombuffer(pcm_bytes, dtype=np.int16)
|
|
if channels > 1:
|
|
# De-interleave so each channel resamples independently
|
|
# (cheap on numpy; avoids stereo→mono surprises).
|
|
if arr.size % channels != 0:
|
|
arr = arr[: arr.size - (arr.size % channels)]
|
|
arr = arr.reshape(-1, channels)
|
|
n_in = arr.shape[0]
|
|
n_out = max(1, int(n_in * dst_rate / src_rate))
|
|
xp = np.arange(n_in, dtype=np.float64)
|
|
x_new = np.linspace(0, n_in, n_out, endpoint=False)
|
|
cols = [
|
|
np.interp(x_new, xp, arr[:, ch].astype(np.float64))
|
|
for ch in range(channels)
|
|
]
|
|
out = np.column_stack(cols).astype(np.int16)
|
|
return out.tobytes()
|
|
n_in = arr.size
|
|
n_out = max(1, int(n_in * dst_rate / src_rate))
|
|
out = np.interp(
|
|
np.linspace(0, n_in, n_out, endpoint=False),
|
|
np.arange(n_in, dtype=np.float64),
|
|
arr.astype(np.float64),
|
|
).astype(np.int16)
|
|
return out.tobytes()
|
|
|
|
def _active_sink_name(self) -> str:
|
|
"""Return the currently-tracked default sink name, ORIGINAL case
|
|
preserved.
|
|
|
|
Reads `self.current_sink` which is kept in lock-step with pactl
|
|
defaults by `refresh_devices()` (called by the dashboard Apply
|
|
endpoint and by the live-Gemini watcher on profile swaps). Empty
|
|
string if nothing's tracked yet.
|
|
|
|
IMPORTANT: PulseAudio sink names are CASE-SENSITIVE. paplay
|
|
--device=<name> needs the exact name pactl uses (e.g.
|
|
`alsa_output.usb-Anker_PowerConf_A3321-DEV-SN1-01.analog-stereo`).
|
|
Routing-decision substring checks (against `_PULSE_SINK_MARKERS`)
|
|
lowercase BOTH sides explicitly so the case-sensitivity of the
|
|
sink name doesn't break marker matching.
|
|
"""
|
|
try:
|
|
return (self.current_sink or "").strip()
|
|
except Exception:
|
|
return ""
|
|
|
|
def play_wav(self, path: Path,
|
|
record_name: str | None = None) -> dict[str, Any]:
|
|
"""Play a WAV file through the speaker that matches the active
|
|
PulseAudio default sink:
|
|
|
|
• Default sink is a USB conference speaker (Anker PowerConf,
|
|
Hollyland, anything matching `_PULSE_SINK_MARKERS`) → write
|
|
via PyAudio → PortAudio 'pulse' device → PulseAudio default
|
|
sink. This works even when the user picked the device via
|
|
the dashboard's "Manual sink/source override" (no profile
|
|
id) — we key off the sink name, not the profile.
|
|
• Default sink is the Jetson platform-sound (or anything that
|
|
doesn't match a marker) → use G1 DDS (`AudioClient.PlayStream`)
|
|
because platform-sound isn't wired to any audible speaker on
|
|
the G1; only the DDS pipe reaches the chest loudspeaker.
|
|
|
|
`record_name` is purely a label surfaced via `playback_status()`
|
|
so the dashboard can show "Now playing: t6_1" etc.
|
|
"""
|
|
with wave.open(str(path), "rb") as wf:
|
|
channels = wf.getnchannels()
|
|
sw = wf.getsampwidth()
|
|
rate = wf.getframerate()
|
|
data = wf.readframes(wf.getnframes())
|
|
|
|
sink = self._active_sink_name()
|
|
sink_lc = sink.lower()
|
|
# Marker check is case-insensitive; the original `sink` (with case
|
|
# preserved) is what gets passed to paplay --device.
|
|
use_pulse = any(m in sink_lc for m in self._PULSE_SINK_MARKERS)
|
|
client = self._get_g1_audio_client() if not use_pulse else None
|
|
|
|
# Lip-sync: drive the LED mask mouth from THIS clip's amplitude while it
|
|
# plays (synced to the playback position via _play_state), same as the
|
|
# live Gemini voice does. Best-effort; stopped + mouth-closed when the
|
|
# playback path below returns. No-op if numpy / the mask are unavailable.
|
|
_mask_stop = threading.Event()
|
|
self._start_mask_lipsync(data, channels, sw, rate, _mask_stop)
|
|
try:
|
|
if not use_pulse and client is not None and _HAS_NUMPY and sw == 2:
|
|
log.info("play_wav route=g1_dds sink=%s record=%s",
|
|
sink or "?", record_name or "?")
|
|
self._play_pcm_via_g1(data, channels, rate, record_name=record_name)
|
|
route = "g1_dds"
|
|
else:
|
|
if not use_pulse and _HAS_G1_AUDIO and client is None:
|
|
log.warning("play_wav: non-PulseAudio sink but G1 AudioClient "
|
|
"unavailable — falling back to PulseAudio default")
|
|
# Prefer paplay subprocess when it's installed — bypasses
|
|
# PortAudio (which on this Jetson's conda env doesn't expose a
|
|
# 'pulse' device, leading to PyAudio defaulting to the silent
|
|
# Jetson platform-sound card). paplay routes through PulseAudio
|
|
# at the daemon level so audio actually reaches the Anker sink.
|
|
use_paplay = bool(self._paplay_binary())
|
|
try:
|
|
if use_paplay:
|
|
log.info("play_wav route=paplay sink=%s record=%s",
|
|
sink or "default", record_name or "?")
|
|
self._play_pcm_via_paplay(data, channels, rate, sw,
|
|
record_name=record_name)
|
|
route = "paplay"
|
|
else:
|
|
log.info("play_wav route=pulse sink=%s record=%s "
|
|
"(paplay not installed — using PyAudio)",
|
|
sink or "default", record_name or "?")
|
|
self._play_pcm_via_pulse(data, channels, rate, sw,
|
|
record_name=record_name)
|
|
route = "pulse"
|
|
except _PulseOpenFailed as exc:
|
|
# paplay spawn failed, USB device gone mid-flight, etc.
|
|
# Fall back to DDS chest if available so the user gets
|
|
# audio out of *something* rather than silence.
|
|
fb_client = self._get_g1_audio_client()
|
|
if fb_client is not None and _HAS_NUMPY and sw == 2:
|
|
log.warning("play_wav route=%s failed (%s); falling "
|
|
"back to g1_dds",
|
|
"paplay" if use_paplay else "pulse", exc)
|
|
self._play_pcm_via_g1(data, channels, rate,
|
|
record_name=record_name)
|
|
route = ("paplay" if use_paplay else "pulse") + "_failed_to_g1_dds"
|
|
else:
|
|
log.warning("play_wav pulse path failed (%s); no DDS "
|
|
"fallback available", exc)
|
|
route = ("paplay" if use_paplay else "pulse") + "_failed"
|
|
finally:
|
|
_mask_stop.set()
|
|
|
|
duration = len(data) / (rate * channels * sw) if rate else 0
|
|
return {"path": str(path), "duration_seconds": round(duration, 3),
|
|
"route": route, "sink": sink or "default"}
|
|
|
|
def _set_live_voice_paused(self, paused: bool) -> None:
|
|
"""Pause/resume the live Gemini session around a record playback so it
|
|
doesn't talk over (or react to) the clip. Best-effort + lazy import to
|
|
avoid a hard dependency on the dashboard process; no-op if the live
|
|
subprocess isn't running.
|
|
|
|
Runs on a DETACHED daemon thread: the pause is sent over the child's
|
|
stdin pipe, and when the child is busy (e.g. mid-reconnect) that write
|
|
can block. We must NEVER let it stall the playback loop — which calls
|
|
this right before streaming — or the record goes silent. Fire-and-forget
|
|
keeps playback starting immediately; a slightly late pause is harmless."""
|
|
def _do() -> None:
|
|
try:
|
|
from Project.Sanad.main import live_sub
|
|
if (live_sub is not None and hasattr(live_sub, "send_pause")
|
|
and hasattr(live_sub, "is_running")
|
|
and live_sub.is_running()):
|
|
live_sub.send_pause(paused)
|
|
except Exception:
|
|
pass
|
|
threading.Thread(target=_do, name="live-voice-pause", daemon=True).start()
|
|
|
|
def set_live_voice_hold(self, hold: bool) -> bool:
|
|
"""Manual hold for the live-Gemini pause.
|
|
|
|
hold=True → pause the live voice NOW and keep it paused; record playback
|
|
will not auto-resume it (the finally skips the resume).
|
|
hold=False → release: resume the live voice, unless a clip is currently
|
|
playing (that play's own finally resumes when it ends).
|
|
Returns the resulting hold state. Idempotent."""
|
|
self._live_voice_hold = bool(hold)
|
|
if self._live_voice_hold:
|
|
self._set_live_voice_paused(True)
|
|
else:
|
|
with self._play_state_lock:
|
|
playing = self._play_state is not None
|
|
if not playing:
|
|
self._set_live_voice_paused(False)
|
|
log.info("live-voice hold → %s", "PAUSED" if self._live_voice_hold else "AUTO")
|
|
return self._live_voice_hold
|
|
|
|
# -- LED mask lip-sync for record playback --------------------------------
|
|
|
|
_MASK_FRAME_SEC = 0.08 # 80 ms mouth-level frame (matches the Gemini lip-sync)
|
|
|
|
def _set_mask_mouth(self, level: int) -> None:
|
|
"""Push a mouth-open level (0..3) to the LED mask. Best-effort, lazy
|
|
import, thread-safe + a no-op if the mask isn't running."""
|
|
try:
|
|
from Project.Sanad.main import mask_face
|
|
if mask_face is not None and hasattr(mask_face, "set_mouth"):
|
|
mask_face.set_mouth(int(level))
|
|
except Exception:
|
|
pass
|
|
|
|
def _mouth_envelope(self, data: bytes, channels: int, sw: int,
|
|
rate: int) -> list[int]:
|
|
"""Per-80ms mouth-open levels (0..3) from a clip's RMS — same thresholds
|
|
the Gemini child uses, so records and the live voice move the mouth the
|
|
same way. Empty if numpy/format unsupported."""
|
|
if not _HAS_NUMPY or sw != 2 or not rate:
|
|
return []
|
|
try:
|
|
arr = np.frombuffer(data, dtype=np.int16)
|
|
if channels == 2 and arr.size % 2 == 0:
|
|
arr = arr.reshape(-1, 2).mean(axis=1).astype(np.int16)
|
|
frame = max(1, int(rate * self._MASK_FRAME_SEC))
|
|
env: list[int] = []
|
|
for i in range(0, len(arr), frame):
|
|
chunk = arr[i:i + frame].astype(np.float64)
|
|
rms = float(np.sqrt(np.mean(chunk ** 2))) if chunk.size else 0.0
|
|
env.append(0 if rms < 140 else 1 if rms < 650
|
|
else 2 if rms < 1700 else 3)
|
|
return env
|
|
except Exception:
|
|
return []
|
|
|
|
def _start_mask_lipsync(self, data: bytes, channels: int, sw: int,
|
|
rate: int, stop_evt: "threading.Event") -> None:
|
|
env = self._mouth_envelope(data, channels, sw, rate)
|
|
if not env:
|
|
return
|
|
threading.Thread(
|
|
target=self._mask_mouth_driver, args=(env, stop_evt),
|
|
name="rec-lipsync", daemon=True,
|
|
).start()
|
|
|
|
def _mask_mouth_driver(self, env: list[int],
|
|
stop_evt: "threading.Event") -> None:
|
|
"""Walk the mouth envelope synced to the live playback position
|
|
(_play_state) and drive the mask mouth. Honours pause (mouth closed)
|
|
and seeks. Closes the mouth when the play ends."""
|
|
last = -1
|
|
try:
|
|
while not stop_evt.is_set():
|
|
t = -1.0
|
|
with self._play_state_lock:
|
|
st = self._play_state
|
|
if st is not None and not st["paused"] and st["play_started_at"] > 0:
|
|
r = st["rate"] or 1
|
|
t = (st["play_started_pos"] / r
|
|
+ (time.time() - st["play_started_at"]))
|
|
lvl = 0
|
|
if t >= 0:
|
|
idx = int(t / self._MASK_FRAME_SEC)
|
|
lvl = env[idx] if 0 <= idx < len(env) else 0
|
|
if lvl != last:
|
|
self._set_mask_mouth(lvl)
|
|
last = lvl
|
|
stop_evt.wait(0.05)
|
|
finally:
|
|
self._set_mask_mouth(0)
|
|
|
|
# -- G1 DDS-routed playback --
|
|
|
|
_G1_STREAM_APP = "sanad_playback"
|
|
# The live Gemini voice streams to the SAME G1 chest speaker under a
|
|
# DIFFERENT app_name (config/voice_config.json speaker.app_name, default
|
|
# "sanad"). The G1 "voice" audio service is per-app-name, so a record must
|
|
# STOP that app too — otherwise Gemini's chunked PlayStream("sanad", …) per
|
|
# spoken word keeps stomping the record's single PlayStream and the clip is
|
|
# silent while its counter ticks. STOP_PLAY is process-agnostic (keyed only
|
|
# by app_name on the shared DDS "voice" service), so stopping it from here
|
|
# halts the separate voice child's stream. Must match voice_config.json.
|
|
_LIVE_VOICE_APP = "sanad"
|
|
_G1_HW_RATE = 16_000
|
|
|
|
def stop_playback(self) -> None:
|
|
"""Stop any in-flight G1 DDS audio stream + tear down the playback
|
|
state so a pause/resume cycle can't keep trying.
|
|
|
|
Used by the dashboard's Stop button. Safe to call even when
|
|
nothing is playing — the DDS call is idempotent.
|
|
"""
|
|
with self._play_state_lock:
|
|
if self._play_state is not None:
|
|
self._play_state["stop"] = True
|
|
client = self._get_g1_audio_client()
|
|
if client is None:
|
|
return
|
|
try:
|
|
client._Call(
|
|
ROBOT_API_ID_AUDIO_STOP_PLAY,
|
|
json.dumps({"app_name": self._G1_STREAM_APP}),
|
|
)
|
|
log.info("G1 audio stream stopped (app=%s)", self._G1_STREAM_APP)
|
|
except Exception as exc:
|
|
log.warning("stop_playback failed: %s", exc)
|
|
|
|
def pause_playback(self) -> dict[str, Any]:
|
|
"""Pause the active G1 playback. The play loop notices the flag,
|
|
sends STOP_PLAY to halt the chest speaker, and advances the saved
|
|
position by the time elapsed since this chunk started. resume()
|
|
re-pushes from there. No-op if nothing is playing."""
|
|
with self._play_state_lock:
|
|
if self._play_state is None:
|
|
return {"ok": False, "reason": "nothing playing"}
|
|
if self._play_state["paused"]:
|
|
return {"ok": True, "already": True, "paused": True}
|
|
self._play_state["paused"] = True
|
|
log.info("Playback paused (record=%s)",
|
|
self._play_state.get("record_name") or "?")
|
|
return {"ok": True, "paused": True}
|
|
|
|
def resume_playback(self) -> dict[str, Any]:
|
|
"""Resume after a pause. The play loop re-pushes pcm[pos:] to G1
|
|
and re-enters the wait/poll cycle."""
|
|
with self._play_state_lock:
|
|
if self._play_state is None:
|
|
return {"ok": False, "reason": "nothing playing"}
|
|
if not self._play_state["paused"]:
|
|
return {"ok": True, "already": True, "paused": False}
|
|
self._play_state["paused"] = False
|
|
log.info("Playback resumed (record=%s)",
|
|
self._play_state.get("record_name") or "?")
|
|
return {"ok": True, "resumed": True}
|
|
|
|
def seek_playback(self, position_sec: float) -> dict[str, Any]:
|
|
"""Jump to `position_sec` in the active clip. The play loop re-pushes
|
|
pcm[pos:] from the new position (works whether playing or paused — if
|
|
paused, the new position takes effect on resume)."""
|
|
with self._play_state_lock:
|
|
if self._play_state is None:
|
|
return {"ok": False, "reason": "nothing playing"}
|
|
rate = self._play_state["rate"] or 1
|
|
total = self._play_state["total_samples"]
|
|
target = max(0, min(total, int(float(position_sec) * rate)))
|
|
self._play_state["pos"] = target
|
|
self._play_state["play_started_pos"] = target
|
|
self._play_state["play_started_at"] = 0.0 # park until re-push
|
|
self._play_state["seek"] = True
|
|
log.info("Playback seek → %.2fs (record=%s)",
|
|
target / rate, self._play_state.get("record_name") or "?")
|
|
return {"ok": True, "position_sec": round(target / rate, 2),
|
|
"duration_sec": round(total / rate, 2) if rate else 0.0}
|
|
|
|
def playback_status(self) -> dict[str, Any]:
|
|
"""Snapshot of the current playback for the dashboard. Returns
|
|
`playing=False` when idle. `position_sec` is best-effort —
|
|
derived from elapsed wall time since the last PlayStream call."""
|
|
with self._play_state_lock:
|
|
if self._play_state is None:
|
|
return {"playing": False, "paused": False, "record_name": None,
|
|
"position_sec": 0.0, "duration_sec": 0.0,
|
|
"live_hold": self._live_voice_hold}
|
|
rate = self._play_state["rate"] or 1
|
|
total = self._play_state["total_samples"]
|
|
pos = self._play_state["pos"]
|
|
if (not self._play_state["paused"]
|
|
and self._play_state["play_started_at"] > 0):
|
|
elapsed = time.time() - self._play_state["play_started_at"]
|
|
advance = int(max(0.0, elapsed) * rate)
|
|
pos = min(self._play_state["play_started_pos"] + advance, total)
|
|
return {
|
|
"playing": True,
|
|
"paused": self._play_state["paused"],
|
|
"record_name": self._play_state.get("record_name"),
|
|
"position_sec": round(pos / rate, 2),
|
|
"duration_sec": round(total / rate, 2) if rate else 0.0,
|
|
"live_hold": self._live_voice_hold,
|
|
}
|
|
|
|
def _play_pcm_via_g1(self, pcm_bytes: bytes, channels: int,
|
|
source_rate: int,
|
|
record_name: str | None = None) -> None:
|
|
"""Stream int16 PCM to the G1 chest speaker via AudioClient.PlayStream,
|
|
with pause / resume / stop support.
|
|
|
|
Converts stereo → mono and resamples to 16 kHz (the rate
|
|
AudioClient expects). The play loop pushes pcm[pos:] in one
|
|
PlayStream call, then polls _play_state every 50 ms while the
|
|
clip drains so pause / stop are honoured promptly. Pause sends
|
|
STOP_PLAY, snapshots the position from elapsed wall time, then
|
|
loops until resumed or stopped. Resume re-pushes pcm[pos:].
|
|
"""
|
|
client = self._get_g1_audio_client()
|
|
if client is None:
|
|
raise RuntimeError("G1 AudioClient not available")
|
|
|
|
arr = np.frombuffer(pcm_bytes, dtype=np.int16)
|
|
if channels == 2 and arr.size % 2 == 0:
|
|
arr = arr.reshape(-1, 2).mean(axis=1).astype(np.int16)
|
|
if source_rate != self._G1_HW_RATE and arr.size:
|
|
target_len = max(1, int(len(arr) * self._G1_HW_RATE / source_rate))
|
|
arr = np.interp(
|
|
np.linspace(0, len(arr), target_len, endpoint=False),
|
|
np.arange(len(arr)),
|
|
arr.astype(np.float64),
|
|
).astype(np.int16)
|
|
rate = self._G1_HW_RATE
|
|
total_samples = len(arr)
|
|
|
|
# Preempt any in-flight playback: signal it to stop + bump the epoch so
|
|
# a NEW play starts promptly instead of queueing behind the previous
|
|
# clip (or blocking forever on a paused one). This is what makes
|
|
# "play another record" interrupt-and-start rather than stall.
|
|
with self._play_state_lock:
|
|
if self._play_state is not None:
|
|
self._play_state["stop"] = True
|
|
self._play_epoch += 1
|
|
my_epoch = self._play_epoch
|
|
|
|
# play_lock serialises overlapping play_wav() calls; the preempted
|
|
# playback (stop=True) releases it promptly. pause/resume/stop do NOT
|
|
# take it (they only touch _play_state under _play_state_lock).
|
|
with self.play_lock:
|
|
# State is set INSIDE the lock now (was before — which let a second
|
|
# play stomp the first's state). Bail if a still-newer play won the
|
|
# race while we waited for the lock.
|
|
with self._play_state_lock:
|
|
if my_epoch != self._play_epoch:
|
|
return
|
|
self._play_state = {
|
|
"record_name": record_name,
|
|
"rate": rate,
|
|
"total_samples": total_samples,
|
|
"pos": 0,
|
|
"paused": False,
|
|
"stop": False,
|
|
"seek": False,
|
|
"play_started_at": 0.0,
|
|
"play_started_pos": 0,
|
|
"epoch": my_epoch,
|
|
}
|
|
# Pause the live Gemini for the clip (idempotent across preempting
|
|
# plays; the last play's finally resumes it).
|
|
self._set_live_voice_paused(True)
|
|
try:
|
|
while True:
|
|
# Snapshot the state for this iteration
|
|
with self._play_state_lock:
|
|
st = self._play_state
|
|
if st is None or st.get("epoch") != my_epoch or st["stop"]:
|
|
break
|
|
if st["paused"]:
|
|
paused_now = True
|
|
sub_bytes = None
|
|
sub_total_sec = 0.0
|
|
else:
|
|
paused_now = False
|
|
st["seek"] = False # consumed — pushing from st["pos"]
|
|
pos = st["pos"]
|
|
if pos >= total_samples:
|
|
break
|
|
sub_bytes = arr[pos:].tobytes()
|
|
sub_total_sec = (total_samples - pos) / rate
|
|
st["play_started_pos"] = pos
|
|
# Set for real only AFTER PlayStream fires (below) so
|
|
# the dashboard counter doesn't tick on a stream that
|
|
# was dropped/never started. 0.0 → playback_status
|
|
# parks at play_started_pos until audio truly begins.
|
|
st["play_started_at"] = 0.0
|
|
|
|
if paused_now:
|
|
time.sleep(0.1)
|
|
continue
|
|
|
|
# Push remainder to G1. A SINGLE STOP suffices: the G1 "voice"
|
|
# service treats the chest speaker as one stream and STOP_PLAY
|
|
# is global (stops whatever's playing regardless of app_name),
|
|
# so this also clears any Gemini stream. Two STOP RPCs doubled
|
|
# the latency on the shared DDS bus and stalled the start; the
|
|
# live-voice pause (child stops its own stream) covers Gemini.
|
|
stream_id = f"wav_{int(time.time() * 1000)}"
|
|
try:
|
|
client._Call(
|
|
ROBOT_API_ID_AUDIO_STOP_PLAY,
|
|
json.dumps({"app_name": self._G1_STREAM_APP}),
|
|
)
|
|
except Exception:
|
|
pass
|
|
time.sleep(0.15)
|
|
# After the STOP+settle window, re-check our state: bail if a
|
|
# newer press superseded us (no churn / no queue), or loop back
|
|
# if a Pause was clicked during the window (don't leak audio).
|
|
with self._play_state_lock:
|
|
st = self._play_state
|
|
if st is None or st.get("epoch") != my_epoch or st["stop"]:
|
|
break
|
|
paused_in_settle = st["paused"]
|
|
if paused_in_settle:
|
|
continue
|
|
# PlayStream can raise on a DDS hiccup; if it does, abort this
|
|
# play rather than leaving play_started_at=0 while the poll loop
|
|
# runs (which would make the pause-math elapsed huge and snap
|
|
# the counter to the end). Set the timestamp only on success.
|
|
try:
|
|
client.PlayStream(self._G1_STREAM_APP, stream_id, sub_bytes)
|
|
except Exception as exc:
|
|
log.warning("PlayStream failed: %s", exc)
|
|
break
|
|
with self._play_state_lock:
|
|
if (self._play_state is not None
|
|
and self._play_state.get("epoch") == my_epoch):
|
|
self._play_state["play_started_at"] = time.time()
|
|
# NOTE: do NOT issue a STOP_PLAY here. The G1 "voice" service
|
|
# treats the chest speaker as a SINGLE stream — STOP_PLAY halts
|
|
# whatever is currently playing regardless of app_name (verified
|
|
# empirically: a post-PlayStream STOP("sanad") silenced the
|
|
# record entirely). The pre-stream STOP(both) above already
|
|
# cleared Gemini; the live-voice pause keeps it from re-pushing.
|
|
|
|
# Poll for pause / stop while the clip drains
|
|
poll_deadline = time.time() + sub_total_sec + 0.3
|
|
interrupted = False
|
|
while time.time() < poll_deadline:
|
|
with self._play_state_lock:
|
|
if self._play_state is None or self._play_state["stop"]:
|
|
interrupted = True
|
|
try:
|
|
client._Call(
|
|
ROBOT_API_ID_AUDIO_STOP_PLAY,
|
|
json.dumps({"app_name": self._G1_STREAM_APP}),
|
|
)
|
|
except Exception:
|
|
pass
|
|
break
|
|
if self._play_state.get("seek"):
|
|
# Seek requested — halt the current stream and let
|
|
# the outer loop re-push from the new pos (already
|
|
# set by seek_playback). Cleared in the push branch.
|
|
try:
|
|
client._Call(
|
|
ROBOT_API_ID_AUDIO_STOP_PLAY,
|
|
json.dumps({"app_name": self._G1_STREAM_APP}),
|
|
)
|
|
except Exception:
|
|
pass
|
|
interrupted = True
|
|
break
|
|
if self._play_state["paused"]:
|
|
# Halt G1 and snapshot the new position
|
|
try:
|
|
client._Call(
|
|
ROBOT_API_ID_AUDIO_STOP_PLAY,
|
|
json.dumps({"app_name": self._G1_STREAM_APP}),
|
|
)
|
|
except Exception:
|
|
pass
|
|
elapsed = (time.time()
|
|
- self._play_state["play_started_at"])
|
|
advance = int(max(0.0, elapsed) * rate)
|
|
self._play_state["pos"] = min(
|
|
self._play_state["play_started_pos"] + advance,
|
|
total_samples,
|
|
)
|
|
interrupted = True
|
|
break
|
|
time.sleep(0.05)
|
|
|
|
if not interrupted:
|
|
# Finished naturally — mark fully consumed and exit
|
|
with self._play_state_lock:
|
|
if self._play_state is not None:
|
|
self._play_state["pos"] = total_samples
|
|
try:
|
|
client._Call(
|
|
ROBOT_API_ID_AUDIO_STOP_PLAY,
|
|
json.dumps({"app_name": self._G1_STREAM_APP}),
|
|
)
|
|
except Exception:
|
|
pass
|
|
break
|
|
finally:
|
|
with self._play_state_lock:
|
|
# Only clear if it's still OURS — a preempting play may have
|
|
# already installed its own state after bumping the epoch.
|
|
mine = (self._play_state is not None
|
|
and self._play_state.get("epoch") == my_epoch)
|
|
if mine:
|
|
self._play_state = None
|
|
# Resume the live Gemini only if WE were the last play — if a
|
|
# newer play preempted us, it keeps Gemini paused and will
|
|
# resume when it finishes (no pause/resume thrash on rapid clicks).
|
|
# Skip the resume entirely while a manual hold is active: the user
|
|
# wants Gemini to STAY paused until they release it.
|
|
if mine and not self._live_voice_hold:
|
|
self._set_live_voice_paused(False)
|
|
|
|
# paplay binary path. Cached on first probe so we don't keep re-shelling
|
|
# `which paplay` on every play_wav call. None = probe pending; "" = absent.
|
|
_PAPLAY_BIN: str | None = None
|
|
|
|
@classmethod
|
|
def _paplay_binary(cls) -> str:
|
|
"""Return the absolute path to `paplay` if installed, else "".
|
|
Cached for the lifetime of the process — paplay doesn't appear/
|
|
disappear mid-run."""
|
|
if cls._PAPLAY_BIN is None:
|
|
from shutil import which
|
|
cls._PAPLAY_BIN = which("paplay") or ""
|
|
return cls._PAPLAY_BIN
|
|
|
|
def _play_pcm_via_paplay(self, pcm_bytes: bytes, channels: int,
|
|
sample_rate: int, sample_width: int,
|
|
record_name: str | None = None) -> None:
|
|
"""Play int16 PCM via the `paplay` subprocess. Bypasses PortAudio
|
|
entirely — we just pipe raw PCM into paplay's stdin and let
|
|
PulseAudio do the resampling/format conversion/device routing.
|
|
|
|
Why this exists: on conda's bundled PyAudio (the build shipped in
|
|
the gemini_sdk env on this Jetson), PortAudio does NOT enumerate a
|
|
'pulse' device — only direct ALSA hw:N entries. Opening
|
|
`output_device_index=None` then defaults to hw:0 which is the
|
|
Jetson `platform-sound` card → silent (not wired to any speaker).
|
|
Opening a discrete `hw:N` for the Anker grabs the card exclusively
|
|
and PulseAudio drops it. Neither path actually plays through the
|
|
Anker. paplay sidesteps the whole stack.
|
|
|
|
Targets the dashboard's currently-selected sink by name via
|
|
`--device=<sink>`, which guarantees the audio goes to the same
|
|
place pactl set-default-sink would have routed.
|
|
|
|
Reuses the same `_play_state` machinery as the DDS path so the
|
|
dashboard's Pause / Stop / position-meter behave identically.
|
|
"""
|
|
sink_name = self._active_sink_name()
|
|
bytes_per_sample = max(1, channels * sample_width)
|
|
total_bytes = len(pcm_bytes) - (len(pcm_bytes) % bytes_per_sample)
|
|
total_samples = total_bytes // bytes_per_sample
|
|
chunk_bytes = max(
|
|
bytes_per_sample, (sample_rate // 10) * bytes_per_sample,
|
|
)
|
|
# paplay format codes: s16le is the only one we ever produce here.
|
|
fmt = "s16le" if sample_width == 2 else \
|
|
"s32le" if sample_width == 4 else \
|
|
"u8"
|
|
# Keep cmd minimal — older paplay versions reject unknown long
|
|
# options and exit immediately (manifests as instant paplay death +
|
|
# a flood of BrokenPipeError on stdin write). --raw / --format /
|
|
# --rate / --channels / --device are all standard since 0.9.x.
|
|
cmd = [
|
|
self._paplay_binary(), "--raw",
|
|
f"--format={fmt}", f"--rate={sample_rate}",
|
|
f"--channels={channels}",
|
|
]
|
|
if sink_name:
|
|
cmd.extend(["--device", sink_name])
|
|
|
|
with self._play_state_lock:
|
|
self._play_state = {
|
|
"record_name": record_name,
|
|
"rate": sample_rate,
|
|
"total_samples": total_samples,
|
|
"pos": 0,
|
|
"paused": False,
|
|
"stop": False,
|
|
"play_started_at": 0.0,
|
|
"play_started_pos": 0,
|
|
}
|
|
|
|
with self.play_lock:
|
|
try:
|
|
while True:
|
|
with self._play_state_lock:
|
|
st = self._play_state
|
|
if st is None or st["stop"]:
|
|
break
|
|
if st["paused"]:
|
|
time.sleep(0.1)
|
|
continue
|
|
pos = st["pos"]
|
|
if pos >= total_samples:
|
|
break
|
|
st["play_started_pos"] = pos
|
|
st["play_started_at"] = time.time()
|
|
|
|
byte_pos = pos * bytes_per_sample
|
|
local_pos = pos
|
|
try:
|
|
proc = subprocess.Popen(
|
|
cmd, stdin=subprocess.PIPE,
|
|
stdout=subprocess.DEVNULL, stderr=subprocess.PIPE,
|
|
)
|
|
except Exception as exc:
|
|
log.warning("paplay spawn failed (%s) — signalling "
|
|
"DDS fallback", exc)
|
|
with self._play_state_lock:
|
|
self._play_state = None
|
|
raise _PulseOpenFailed(str(exc)) from exc
|
|
|
|
# Brief settle so paplay can validate args + connect to
|
|
# PulseAudio. If it's going to die (bad sink, format,
|
|
# connection refused), it dies within ~50ms. Without
|
|
# this check, the next stdin.write() would get a sea
|
|
# of BrokenPipeError messages and the outer loop would
|
|
# keep re-spawning forever.
|
|
time.sleep(0.05)
|
|
if proc.poll() is not None:
|
|
try:
|
|
err = (proc.stderr.read() or b"").decode(
|
|
"utf-8", "replace").strip()[:400]
|
|
except Exception:
|
|
err = ""
|
|
log.warning("paplay died immediately rc=%d device=%s err=%s",
|
|
proc.returncode, sink_name or "default", err)
|
|
with self._play_state_lock:
|
|
self._play_state = None
|
|
raise _PulseOpenFailed(
|
|
f"paplay rc={proc.returncode} {err or 'no stderr'}"
|
|
)
|
|
|
|
interrupted = False
|
|
fatal_exc: Exception | None = None
|
|
try:
|
|
while byte_pos < total_bytes:
|
|
with self._play_state_lock:
|
|
ps = self._play_state
|
|
if ps is None or ps["stop"]:
|
|
interrupted = True
|
|
break
|
|
if ps["paused"]:
|
|
ps["pos"] = local_pos
|
|
interrupted = True
|
|
break
|
|
end = min(byte_pos + chunk_bytes, total_bytes)
|
|
try:
|
|
proc.stdin.write(pcm_bytes[byte_pos:end])
|
|
proc.stdin.flush()
|
|
except (BrokenPipeError, OSError) as exc:
|
|
# paplay died mid-stream (USB unplugged,
|
|
# PulseAudio crashed, etc.). Abort entire
|
|
# clip — DO NOT let the outer loop respawn
|
|
# paplay; we just got hundreds of
|
|
# broken-pipe lines as a result of that bug.
|
|
try:
|
|
err = (proc.stderr.read() or b"").decode(
|
|
"utf-8", "replace").strip()[:400]
|
|
except Exception:
|
|
err = ""
|
|
log.warning("paplay died mid-stream (%s) "
|
|
"device=%s stderr=%s",
|
|
exc, sink_name or "default", err)
|
|
fatal_exc = _PulseOpenFailed(
|
|
f"paplay died: {err or exc}")
|
|
break
|
|
byte_pos = end
|
|
local_pos = byte_pos // bytes_per_sample
|
|
finally:
|
|
try:
|
|
proc.stdin.close()
|
|
except Exception:
|
|
pass
|
|
if interrupted or fatal_exc is not None:
|
|
proc.terminate()
|
|
try:
|
|
rc = proc.wait(timeout=3.0)
|
|
except subprocess.TimeoutExpired:
|
|
proc.kill()
|
|
rc = -1
|
|
if rc != 0 and not interrupted and fatal_exc is None:
|
|
# Drained successfully but paplay exited non-zero
|
|
# — surface stderr so the failure isn't silent.
|
|
try:
|
|
err = (proc.stderr.read() or b"").decode(
|
|
"utf-8", "replace").strip()[:300]
|
|
except Exception:
|
|
err = ""
|
|
log.warning("paplay exit rc=%d device=%s err=%s",
|
|
rc, sink_name or "default", err)
|
|
|
|
if fatal_exc is not None:
|
|
# Re-raise OUTSIDE the inner try/finally so play_wav
|
|
# catches it and falls back to G1 DDS chest. Without
|
|
# this, the outer `while True` loop would respawn
|
|
# paplay and we'd loop indefinitely.
|
|
with self._play_state_lock:
|
|
self._play_state = None
|
|
raise fatal_exc
|
|
|
|
if not interrupted:
|
|
with self._play_state_lock:
|
|
if self._play_state is not None:
|
|
self._play_state["pos"] = total_samples
|
|
break
|
|
finally:
|
|
with self._play_state_lock:
|
|
self._play_state = None
|
|
|
|
def _play_pcm_via_pulse(self, pcm_bytes: bytes, channels: int,
|
|
sample_rate: int, sample_width: int,
|
|
record_name: str | None = None) -> None:
|
|
"""Play int16 PCM via PyAudio (→ PulseAudio default sink) with
|
|
pause / resume / stop support.
|
|
|
|
Mirrors `_play_pcm_via_g1`'s state-poll pattern so the dashboard's
|
|
Play / Pause / Stop / Position buttons behave identically whether
|
|
the active profile uses DDS or PyAudio. Writes ~100 ms chunks so
|
|
pause / stop latency is bounded.
|
|
"""
|
|
# Make sure pactl defaults reflect the current selection — this is
|
|
# a no-op when the watcher or dashboard Apply already aligned them
|
|
# (throttled so the multi-shell pactl scan doesn't run per clip).
|
|
self.ensure_audio_defaults()
|
|
|
|
# Resample to a USB-native rate before opening the stream.
|
|
# PortAudio's ALSA backend (the one PyAudio uses) opens the underlying
|
|
# hardware via the ALSA 'pulse' plugin, which on this Jetson does
|
|
# NOT advertise rate conversion in `snd_pcm_hw_params` — so opening
|
|
# at the WAV's native rate (24kHz from Gemini TTS, etc.) gets
|
|
# rejected with paInvalidSampleRate. Resampling app-side mirrors
|
|
# what `_play_pcm_via_g1` already does for the DDS path. Anker
|
|
# PowerConf and most USB UAC1 cards report 48kHz s16le stereo
|
|
# natively, so target that.
|
|
if _HAS_NUMPY and sample_width == 2 and sample_rate != self._PULSE_TARGET_RATE:
|
|
try:
|
|
pcm_bytes = self._resample_pcm16(
|
|
pcm_bytes, channels, sample_rate, self._PULSE_TARGET_RATE,
|
|
)
|
|
log.info("_play_pcm_via_pulse: resampled %dHz → %dHz "
|
|
"(USB card native rate)",
|
|
sample_rate, self._PULSE_TARGET_RATE)
|
|
sample_rate = self._PULSE_TARGET_RATE
|
|
except Exception as exc:
|
|
log.warning("_play_pcm_via_pulse: resample failed (%s) — "
|
|
"trying native rate, may hit paInvalidSampleRate",
|
|
exc)
|
|
|
|
bytes_per_sample = max(1, channels * sample_width)
|
|
total_bytes = len(pcm_bytes) - (len(pcm_bytes) % bytes_per_sample)
|
|
total_samples = total_bytes // bytes_per_sample
|
|
chunk_bytes = max(bytes_per_sample, (sample_rate // 10) * bytes_per_sample)
|
|
|
|
with self._play_state_lock:
|
|
self._play_state = {
|
|
"record_name": record_name,
|
|
"rate": sample_rate,
|
|
"total_samples": total_samples,
|
|
"pos": 0,
|
|
"paused": False,
|
|
"stop": False,
|
|
"play_started_at": 0.0,
|
|
"play_started_pos": 0,
|
|
}
|
|
|
|
# play_lock serialises overlapping play_wav() calls; pause/resume/stop
|
|
# only touch _play_state under _play_state_lock so they don't block.
|
|
with self.play_lock:
|
|
try:
|
|
while True:
|
|
# Snapshot — decide whether to play, wait, or exit
|
|
with self._play_state_lock:
|
|
st = self._play_state
|
|
if st is None or st["stop"]:
|
|
break
|
|
if st["paused"]:
|
|
paused_now = True
|
|
pos = 0
|
|
else:
|
|
paused_now = False
|
|
pos = st["pos"]
|
|
if pos >= total_samples:
|
|
break
|
|
st["play_started_pos"] = pos
|
|
st["play_started_at"] = time.time()
|
|
if paused_now:
|
|
time.sleep(0.1)
|
|
continue
|
|
|
|
byte_pos = pos * bytes_per_sample
|
|
local_pos = pos
|
|
try:
|
|
stream = self.pya.open(
|
|
format=self.pya.get_format_from_width(sample_width),
|
|
channels=channels,
|
|
rate=sample_rate,
|
|
output=True,
|
|
output_device_index=self._pulse_device_index(),
|
|
frames_per_buffer=CHUNK_SIZE,
|
|
)
|
|
except Exception as exc:
|
|
# PortAudio open failed (sink gone, paBadIODevice
|
|
# combination, etc.). Signal the caller so play_wav
|
|
# can fall back to DDS chest rather than silently
|
|
# dropping the clip.
|
|
log.warning("Pulse playback open failed: %s — "
|
|
"signalling caller for DDS fallback", exc)
|
|
with self._play_state_lock:
|
|
self._play_state = None
|
|
raise _PulseOpenFailed(str(exc)) from exc
|
|
interrupted = False
|
|
try:
|
|
while byte_pos < total_bytes:
|
|
with self._play_state_lock:
|
|
ps = self._play_state
|
|
if ps is None or ps["stop"]:
|
|
interrupted = True
|
|
break
|
|
if ps["paused"]:
|
|
ps["pos"] = local_pos
|
|
interrupted = True
|
|
break
|
|
end = min(byte_pos + chunk_bytes, total_bytes)
|
|
try:
|
|
stream.write(pcm_bytes[byte_pos:end])
|
|
except Exception as exc:
|
|
log.warning("Pulse playback write failed: %s", exc)
|
|
interrupted = True
|
|
break
|
|
byte_pos = end
|
|
local_pos = byte_pos // bytes_per_sample
|
|
finally:
|
|
try:
|
|
stream.stop_stream()
|
|
stream.close()
|
|
except Exception:
|
|
pass
|
|
|
|
if not interrupted:
|
|
with self._play_state_lock:
|
|
if self._play_state is not None:
|
|
self._play_state["pos"] = total_samples
|
|
break
|
|
# Interrupted by pause → outer loop will wait for resume
|
|
# or exit on stop. Interrupted by stop → outer loop exits.
|
|
finally:
|
|
with self._play_state_lock:
|
|
self._play_state = None
|
|
|
|
# -- recording --
|
|
|
|
def record_mic(self, duration_sec: float) -> bytes:
|
|
"""Record from the resolved mic for *duration_sec* seconds, return raw PCM."""
|
|
self.ensure_audio_defaults()
|
|
# Capture through PortAudio's 'pulse' device so we read the resolved
|
|
# default source — input_device_index=None defaults to the silent
|
|
# hw:0 platform-sound card on this Jetson's conda PyAudio.
|
|
stream = self.pya.open(
|
|
format=FORMAT,
|
|
channels=CHANNELS,
|
|
rate=RECEIVE_SAMPLE_RATE,
|
|
input=True,
|
|
input_device_index=self._pulse_device_index(),
|
|
frames_per_buffer=CHUNK_SIZE,
|
|
)
|
|
frames: list[bytes] = []
|
|
total_chunks = int(RECEIVE_SAMPLE_RATE / CHUNK_SIZE * duration_sec)
|
|
try:
|
|
for _ in range(total_chunks):
|
|
frames.append(stream.read(CHUNK_SIZE, exception_on_overflow=False))
|
|
finally:
|
|
stream.stop_stream()
|
|
stream.close()
|
|
return b"".join(frames)
|
|
|
|
def save_wav(self, pcm_bytes: bytes, path: Path, channels: int, sample_rate: int):
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
with wave.open(str(path), "wb") as wf:
|
|
wf.setnchannels(channels)
|
|
wf.setsampwidth(self.sample_width())
|
|
wf.setframerate(sample_rate)
|
|
wf.writeframes(pcm_bytes)
|