843 lines
33 KiB
Python
843 lines
33 KiB
Python
"""Audio device profiles + pactl detection + selection persistence.
|
||
|
||
Manages multiple audio device profiles (generic built-in, Hollyland wireless
|
||
mic + built-in speaker, Anker PowerConf) and lets the dashboard switch
|
||
between them at runtime. Selection is persisted to data/audio_device.json
|
||
so the choice survives restart.
|
||
|
||
Resolution policy:
|
||
1. User-selected profile (from data/audio_device.json) — if its sink/source
|
||
is currently plugged in, use it.
|
||
2. Auto-detected profile based on what is currently plugged in.
|
||
3. Built-in fallback.
|
||
|
||
Each profile has:
  - id: short identifier
  - label: human-readable name
  - sink_pattern: comma-separated substrings matched against pactl sink
    names (substrings are used to find the actual pactl name, since exact
    names contain serial numbers and may differ between machines)
  - source_pattern: comma-separated substrings matched against pactl source
    names
  - description: free-form text describing the profile
  - sink_sample_rate / source_sample_rate (optional defaults — read by
    AudioManager; 0 = use device default)
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import os
|
||
import subprocess
|
||
import tempfile
|
||
import threading
|
||
from dataclasses import dataclass, asdict
|
||
from pathlib import Path
|
||
from typing import Any
|
||
|
||
from Project.Sanad.config import DATA_DIR
|
||
from Project.Sanad.core.logger import get_logger
|
||
|
||
# Logger shared by every function in this module.
log = get_logger("audio_devices")

# Where the persisted device selection is stored.
DEVICE_STATE_FILE = DATA_DIR / "audio_device.json"

# Held by save_state() while it writes DEVICE_STATE_FILE.
_LOCK = threading.Lock()
|
||
|
||
|
||
@dataclass
class AudioProfile:
    """A named pairing of one output sink and one input source.

    Both ``*_pattern`` fields hold one or more comma-separated substrings
    that are matched case-insensitively against PulseAudio device names.
    """

    # Short identifier of the profile.
    id: str
    # Human-readable name.
    label: str
    # Substring(s) used to find a sink.
    sink_pattern: str
    # Substring(s) used to find a source.
    source_pattern: str
    # Free-form explanatory text.
    description: str = ""
    # Sample rates; 0 = use device default.
    sink_sample_rate: int = 0
    source_sample_rate: int = 0
|
||
|
||
|
||
# Built-in device profiles.
|
||
#
|
||
# MATCHING RULES:
|
||
# - Patterns are matched case-insensitively against the FULL PulseAudio name.
|
||
# - Multiple patterns per field: comma-separated → match ANY.
|
||
# - PulseAudio names change depending on the USB port, so we match the
|
||
# product-name portion only (not the serial or port suffix).
|
||
# - Order matters: when no saved selection applies and the default profile
#   (DEFAULT_PROFILE_ID) is not detected, the FIRST profile whose sink AND
#   source both match is the one that gets auto-selected.
|
||
#
|
||
# Example PulseAudio names:
|
||
# alsa_output.platform-sound.analog-stereo (built-in speaker)
|
||
# alsa_input.platform-sound.analog-stereo (built-in mic)
|
||
# alsa_output.usb-Anker_PowerConf_A3321-DEV-SN1-01.analog-stereo (Anker speaker — SN1-01 is port-dependent)
|
||
# alsa_input.usb-Anker_PowerConf_A3321-DEV-SN1-01.mono-fallback (Anker mic)
|
||
# alsa_input.usb-Shenzhen_Hollyland_Technology_Co._Ltd_Wireless_microphone_C63X223T6MX-01.analog-stereo
|
||
# (Hollyland mic — C63X... is serial-dependent)
|
||
|
||
PROFILES: list[AudioProfile] = [
    # Built-in audio for both directions.
    AudioProfile(
        id="builtin",
        label="Built-in mic + speaker",
        sink_pattern="platform-sound",
        source_pattern="alsa_input.platform-sound",
        description="Jetson / G1 built-in audio chip. (Default)",
    ),
    # Hollyland microphone for input, built-in speaker for output.
    AudioProfile(
        id="hollyland_builtin",
        label="Hollyland mic + built-in speaker",
        sink_pattern="platform-sound",
        source_pattern="hollyland,wireless_microphone",
        description="Hollyland wireless lavalier microphone with the Jetson built-in speaker.",
    ),
    # One USB unit providing both mic and speaker.
    AudioProfile(
        id="anker_powerconf",
        label="Anker PowerConf (mic + speaker)",
        sink_pattern="powerconf,anker",
        source_pattern="powerconf,anker",
        description="Anker PowerConf USB conference unit — mic + speaker on the same device.",
    ),
    AudioProfile(
        id="jbl_builtin_mic",
        label="JBL speaker + built-in mic",
        # The JBL is a Bluetooth device, so PulseAudio exposes it as a bluez
        # sink with a MAC-based name (e.g. bluez_output.XX_XX_…); either
        # "jbl" or "bluez" in the sink name counts as a match.
        sink_pattern="jbl,bluez",
        # The JBL has no microphone, so input stays on the G1 built-in mic.
        source_pattern="alsa_input.platform-sound",
        description="JBL Bluetooth speaker for output + the G1 built-in microphone for input (the JBL has no mic).",
    ),
]

# Profile preferred when nothing is saved, provided it is detected.
DEFAULT_PROFILE_ID = "builtin"

# Lookup table: profile id -> profile.
PROFILES_BY_ID: dict[str, AudioProfile] = {profile.id: profile for profile in PROFILES}
|
||
|
||
|
||
# ───────────────────────── pactl helpers ─────────────────────────
|
||
|
||
def _run_pactl(args: list[str], timeout: float = 1.0) -> subprocess.CompletedProcess[str]:
    """Run ``pactl`` with *args* and return the completed process.

    stdout/stderr are captured as text. A non-zero exit status does not
    raise (``check=False``); callers inspect ``returncode`` themselves.
    ``subprocess.run`` still raises ``FileNotFoundError`` when the ``pactl``
    executable cannot be found and ``subprocess.TimeoutExpired`` once
    *timeout* seconds have elapsed.
    """
    command = ["pactl"]
    command.extend(args)
    return subprocess.run(
        command,
        capture_output=True,
        text=True,
        timeout=timeout,
        check=False,
    )
|
||
|
||
|
||
def pactl_available() -> bool:
    """Return True when ``pactl info`` can be run and exits with status 0."""
    try:
        probe = _run_pactl(["info"])
    except (FileNotFoundError, subprocess.SubprocessError):
        # pactl is missing, timed out, or could not be started.
        return False
    return probe.returncode == 0
|
||
|
||
|
||
def list_sinks() -> list[dict[str, str]]:
    """List every sink as a ``{name, description, index}`` dict."""
    sinks = _list_kind("sinks")
    return sinks
|
||
|
||
|
||
def list_sources() -> list[dict[str, str]]:
    """List every source as a ``{name, description, index}`` dict."""
    sources = _list_kind("sources")
    return sources
|
||
|
||
|
||
def _list_kind(kind: str) -> list[dict[str, str]]:
    """Enumerate PulseAudio devices of one kind via ``pactl``.

    Args:
        kind: Plural pactl object kind, ``"sinks"`` or ``"sources"``.

    Returns:
        One ``{"index", "name", "description"}`` dict per usable row of
        ``pactl list short <kind>``, in pactl's order. The list is empty
        when pactl cannot be run, times out, or exits non-zero.
        ``description`` is ``""`` for a device whose description could not
        be read.
    """
    try:
        short = _run_pactl(["list", "short", kind])
    except (FileNotFoundError, subprocess.SubprocessError) as exc:
        log.warning("pactl list %s failed: %s", kind, exc)
        return []
    if short.returncode != 0:
        return []

    rows: list[tuple[str, str]] = []
    for raw in (short.stdout or "").splitlines():
        # Rows are normally tab-separated; fall back to any whitespace.
        parts = raw.split("\t")
        if len(parts) < 2:
            parts = raw.split()
        if len(parts) < 2:
            continue
        rows.append((parts[0], parts[1]))
    if not rows:
        return []

    # One verbose listing serves every row. Looking each device up
    # separately would spawn one extra pactl process per device.
    descriptions = _descriptions_by_name(kind)
    return [
        {"index": idx, "name": name, "description": descriptions.get(name, "")}
        for idx, name in rows
    ]


def _descriptions_by_name(kind: str) -> dict[str, str]:
    """Map device name -> ``Description:`` text from one ``pactl list <kind>`` run.

    Best-effort: returns an empty mapping when pactl fails. Within each
    ``Sink #N`` / ``Source #N`` stanza the first ``Name:`` and the first
    ``Description:`` line are paired; stanzas lacking either are skipped.
    """
    try:
        full = _run_pactl(["list", kind])
    except (FileNotFoundError, subprocess.SubprocessError):
        return {}
    if full.returncode != 0:
        return {}

    found: dict[str, str] = {}
    stanza_name = ""
    stanza_desc: str | None = None
    for line in (full.stdout or "").splitlines():
        if line.startswith(("Sink #", "Source #")):
            # A new stanza begins: record the one that just ended.
            if stanza_name and stanza_desc is not None:
                found.setdefault(stanza_name, stanza_desc)
            stanza_name, stanza_desc = "", None
            continue
        text = line.strip()
        if text.startswith("Name:") and not stanza_name:
            stanza_name = text[len("Name:"):].strip()
        elif text.startswith("Description:") and stanza_desc is None:
            stanza_desc = text[len("Description:"):].strip()
    if stanza_name and stanza_desc is not None:
        found.setdefault(stanza_name, stanza_desc)
    return found
|
||
|
||
|
||
def _description_for(kind: str, name: str) -> str:
    """Best-effort lookup of one device's ``Description:`` via ``pactl list <kind>``.

    Args:
        kind: Plural pactl object kind (``"sinks"`` or ``"sources"``).
        name: Exact PulseAudio device name to look for.

    Returns:
        The text after ``Description:`` in the stanza whose ``Name:`` equals
        *name*, or ``""`` when pactl fails, no stanza matches, or the
        matching stanza carries no description.
    """
    try:
        r = _run_pactl(["list", kind])
    except (FileNotFoundError, subprocess.SubprocessError):
        return ""
    if r.returncode != 0:
        return ""

    stanza: list[str] = []
    matched = False
    for line in (r.stdout or "").splitlines():
        if line.startswith(("Sink #", "Source #")):
            if matched:
                # The matching stanza has ended; stop scanning.
                break
            stanza = []
            continue
        text = line.strip()
        # Compare the whole name. A suffix test would also accept a
        # different device whose name merely ends with *name*.
        if text.startswith("Name:") and text[len("Name:"):].strip() == name:
            matched = True
        stanza.append(text)
    if not matched:
        return ""

    for text in stanza:
        if text.startswith("Description:"):
            return text.split(":", 1)[1].strip()
    return ""
|
||
|
||
|
||
def get_default_sink() -> str:
    """Return the name of PulseAudio's default sink, or ``""`` when unknown."""
    return _query_default_device("get-default-sink", "Default Sink:")


def get_default_source() -> str:
    """Return the name of PulseAudio's default source, or ``""`` when unknown."""
    return _query_default_device("get-default-source", "Default Source:")


def _query_default_device(command: str, info_label: str) -> str:
    """Ask pactl for a default device, falling back to ``pactl info``.

    Args:
        command: The direct pactl query (``get-default-sink`` or
            ``get-default-source``).
        info_label: The line prefix that carries the same value in
            ``pactl info`` output.

    Returns:
        The device name, or ``""`` when pactl cannot be run, times out, or
        neither query yields a value.
    """
    try:
        direct = _run_pactl([command])
        if direct.returncode == 0:
            return (direct.stdout or "").strip()
        # Older pactl releases lack the get-default-* commands and exit
        # non-zero; `pactl info` reports the same value on a labelled line.
        # NOTE(review): the labels are matched in English — confirm pactl is
        # not run under a translated locale on the target system.
        info = _run_pactl(["info"])
    except (FileNotFoundError, subprocess.SubprocessError):
        return ""
    if info.returncode != 0:
        return ""
    for line in (info.stdout or "").splitlines():
        text = line.strip()
        if text.startswith(info_label):
            return text[len(info_label):].strip()
    return ""
|
||
|
||
|
||
def set_default_sink(name: str) -> bool:
    """Run ``pactl set-default-sink <name>``; True only on exit status 0."""
    try:
        outcome = _run_pactl(["set-default-sink", name])
    except (FileNotFoundError, subprocess.SubprocessError):
        return False
    return outcome.returncode == 0
|
||
|
||
|
||
def set_default_source(name: str) -> bool:
    """Run ``pactl set-default-source <name>``; True only on exit status 0."""
    try:
        outcome = _run_pactl(["set-default-source", name])
    except (FileNotFoundError, subprocess.SubprocessError):
        return False
    return outcome.returncode == 0
|
||
|
||
|
||
# ───────────────────────── matching ─────────────────────────
|
||
|
||
def find_first_match(items: list[dict[str, str]], pattern: str,
                     exclude_monitors: bool = False) -> dict[str, str] | None:
    """Pick the first item whose ``name`` contains any wanted substring.

    *pattern* is a comma-separated list of substrings; matching ignores
    case and succeeds as soon as one substring occurs in the name. For
    example ``"powerconf,anker"`` accepts any name containing "powerconf"
    or "anker".

    With ``exclude_monitors=True``, names ending in ".monitor" (PulseAudio
    monitor sources) are passed over so a loopback is never returned in
    place of a real microphone.

    Returns the matching item itself, or None when *pattern* is empty,
    holds no usable substring, or nothing matches.
    """
    if not pattern:
        return None
    wanted = tuple(
        piece.strip().lower() for piece in pattern.split(",") if piece.strip()
    )
    if not wanted:
        return None
    for entry in items:
        lowered = entry["name"].lower()
        if exclude_monitors and lowered.endswith(".monitor"):
            continue
        if any(fragment in lowered for fragment in wanted):
            return entry
    return None
|
||
|
||
|
||
# PyAudio fallback cache — avoid re-init'ing PyAudio on every poll
# (PyAudio init takes ~100 ms and the watcher polls at 1.5 s).
# "ts" is a time.monotonic() reading; -inf guarantees the first call
# enumerates instead of returning the empty placeholder list.
_PYAUDIO_CACHE: dict[str, Any] = {"ts": float("-inf"), "input_names": []}
_PYAUDIO_TTL_S = 2.0


def _pyaudio_input_names() -> list[str]:
    """Return lowercase names of all PyAudio input devices, cached for ~2 s.

    Used as a fallback in detect_plugged_profiles() when pactl can't see a
    profile's source. Per the original author's notes, some USB devices
    (notably the Anker PowerConf on JetPack 5) only expose their mic via
    the raw ALSA layer, which PyAudio can still open even when PulseAudio's
    card profile is output-only.

    Returns:
        The cached list while it is younger than ``_PYAUDIO_TTL_S``.
        Otherwise a freshly enumerated list, which is empty when PyAudio is
        not importable or enumeration fails. The returned list is the
        cache's own object; callers must not mutate it.
    """
    import time as _t

    # A monotonic clock is used for the age check. A wall-clock step
    # backwards would otherwise keep a stale cache alive far past its TTL.
    now = _t.monotonic()
    if now - _PYAUDIO_CACHE["ts"] < _PYAUDIO_TTL_S:
        return _PYAUDIO_CACHE["input_names"]

    names: list[str] = []
    try:
        import pyaudio  # type: ignore

        pa = pyaudio.PyAudio()
        try:
            for i in range(pa.get_device_count()):
                try:
                    info = pa.get_device_info_by_index(i)
                except Exception:
                    # Skip a device that cannot be queried and keep going.
                    continue
                if info.get("maxInputChannels", 0) <= 0:
                    continue
                names.append(str(info.get("name", "")).lower())
        finally:
            pa.terminate()
    except Exception as exc:
        # Deliberate best-effort: PyAudio is optional, so any failure here
        # just yields an empty (and cached) result.
        log.debug("PyAudio enumeration unavailable: %s", exc)

    _PYAUDIO_CACHE["ts"] = now
    _PYAUDIO_CACHE["input_names"] = names
    return names
|
||
|
||
|
||
def _pyaudio_input_matches(pattern: str) -> dict[str, str] | None:
    """Look for a PyAudio input device matching a comma-separated pattern.

    Returns a synthetic source dict shaped like the ones find_first_match()
    returns when any PyAudio input device name contains one of the
    substrings, otherwise None.
    """
    if not pattern:
        return None
    wanted = [piece.strip().lower() for piece in pattern.split(",") if piece.strip()]
    if not wanted:
        return None

    hit = next(
        (
            candidate
            for candidate in _pyaudio_input_names()
            if any(fragment in candidate for fragment in wanted)
        ),
        None,
    )
    if hit is None:
        return None

    # The "pyaudio:" prefix and the "driver" entry mark the origin, so logs
    # and dashboards can tell this did not come from pactl. "description"
    # and "index" are present so consumers expecting the pactl-style
    # `{name, description, index}` shape do not hit a KeyError.
    return {
        "name": f"pyaudio:{hit}",
        "driver": "pyaudio",
        "description": f"PyAudio fallback — {hit}",
        "index": "",
    }
|
||
|
||
|
||
# Per-card cooldown for ensure_card_input_capable so a card whose firmware
|
||
# truly doesn't expose input doesn't get hammered with set-card-profile
|
||
# calls on every detection poll (every 1.5s from the live-Gemini watcher).
|
||
_CARD_PROFILE_LAST_ATTEMPT: dict[str, float] = {}
|
||
_CARD_PROFILE_COOLDOWN_S = 30.0
|
||
|
||
|
||
def _parse_card_profiles(card_block: str) -> tuple[str, list[tuple[int, str, bool, bool]]]:
|
||
"""Parse the `Profiles:` section of a single card stanza from
|
||
`pactl list cards`. Returns (active_profile, [(priority, name,
|
||
has_sink, has_source), ...]) — only profiles marked
|
||
`available: yes` are included.
|
||
|
||
Profile lines look like:
|
||
\\toutput:analog-stereo+input:mono-fallback: Analog Stereo Output + Mono Input \\
|
||
(sinks: 1, sources: 1, priority: 6501, available: yes)
|
||
"""
|
||
active = ""
|
||
profiles: list[tuple[int, str, bool, bool]] = []
|
||
in_profiles = False
|
||
for raw in card_block.splitlines():
|
||
line = raw.rstrip()
|
||
stripped = line.strip()
|
||
if stripped.startswith("Active Profile:"):
|
||
active = stripped[len("Active Profile:"):].strip()
|
||
continue
|
||
if stripped == "Profiles:":
|
||
in_profiles = True
|
||
continue
|
||
if in_profiles:
|
||
# End of Profiles section: next top-level key starts with non-tab
|
||
# OR an empty line. The block ends when leading whitespace drops
|
||
# to a tab/spaces shallower than the profile lines — easiest
|
||
# check: stop when we hit "Ports:" or "Active Profile:".
|
||
if stripped.startswith("Ports:") or stripped.startswith("Active Profile:"):
|
||
in_profiles = False
|
||
if stripped.startswith("Active Profile:"):
|
||
active = stripped[len("Active Profile:"):].strip()
|
||
continue
|
||
# Profile line — must contain "(sinks: N, sources: M, priority: P, available: yes)"
|
||
paren = stripped.rfind(" (")
|
||
if paren < 0 or "available: yes" not in stripped:
|
||
continue
|
||
head = stripped[:paren]
|
||
sep = head.find(": ")
|
||
if sep < 0:
|
||
continue
|
||
name = head[:sep]
|
||
props = stripped[paren+2:].rstrip(")")
|
||
sinks_n = sources_n = priority = 0
|
||
for tok in props.split(","):
|
||
tok = tok.strip()
|
||
if tok.startswith("sinks: "):
|
||
try: sinks_n = int(tok[len("sinks: "):])
|
||
except ValueError: pass
|
||
elif tok.startswith("sources: "):
|
||
try: sources_n = int(tok[len("sources: "):])
|
||
except ValueError: pass
|
||
elif tok.startswith("priority: "):
|
||
try: priority = int(tok[len("priority: "):])
|
||
except ValueError: pass
|
||
profiles.append((priority, name, sinks_n > 0, sources_n > 0))
|
||
return active, profiles
|
||
|
||
|
||
def ensure_card_input_capable(card_pattern: str) -> bool:
    """Move a matching output-only PulseAudio card onto an input+output profile.

    For every card in ``pactl list cards`` whose Name contains one of the
    comma-separated substrings in *card_pattern* (case-insensitive, same
    semantics as ``find_first_match``): if the active profile exposes no
    source but an available profile offers both a sink and a source, switch
    to the highest-priority such profile with ``pactl set-card-profile``.

    Why (from the original author's notes): USB UAC1 conference devices
    such as the Anker PowerConf can get pinned to an output-only profile by
    PulseAudio's ``module-card-restore`` — observed on G1 / JetPack 5 —
    even though a mic-capable profile is advertised as available. Selecting
    an already-advertised, available profile is considered safe, unlike the
    older ``module-alsa-source device=hw:N,0`` approach that had to guess
    the hw index.

    Attempts are rate-limited per card via ``_CARD_PROFILE_COOLDOWN_S`` so
    a card that never yields a source is not switched on every poll.

    Args:
        card_pattern: Comma-separated substrings to look for in card names.

    Returns:
        True if a switch succeeded, or as soon as a matching card is found
        whose active profile already has a source. False when pactl is
        unavailable, the pattern is empty, no card matches, no switchable
        profile exists, the card is inside its cooldown, or the switch fails.
    """
    # Function-scope import: `time` is not among this module's top-level imports.
    import time as _t

    if not pactl_available():
        return False
    needles = [p.strip().lower() for p in (card_pattern or "").split(",") if p.strip()]
    if not needles:
        return False
    try:
        listing = _run_pactl(["list", "cards"])
    except (FileNotFoundError, subprocess.SubprocessError):
        return False
    if listing.returncode != 0:
        return False

    switched_any = False
    for block in _split_card_blocks(listing.stdout or ""):
        card_name = _card_name_from_block(block)
        if not card_name:
            continue
        lowered = card_name.lower()
        if not any(n in lowered for n in needles):
            continue

        active, profiles = _parse_card_profiles(block)
        if not profiles:
            continue

        # If the active profile already has input, there is nothing to do.
        if any(name == active and has_src for _, name, _, has_src in profiles):
            return True

        # Profiles that provide both directions.
        candidates = [
            (prio, name)
            for prio, name, has_sink, has_src in profiles
            if has_sink and has_src
        ]
        if not candidates:
            log.debug("ensure_card_input_capable: %s has no input+output "
                      "profile — nothing to switch to", card_name)
            continue

        # The cooldown uses a monotonic clock so wall-clock adjustments can
        # neither stretch nor shorten it. `None` means "never attempted";
        # a 0.0 default would block the first attempt whenever the
        # monotonic reading itself is still below the cooldown.
        now = _t.monotonic()
        last = _CARD_PROFILE_LAST_ATTEMPT.get(card_name)
        if last is not None and (now - last) < _CARD_PROFILE_COOLDOWN_S:
            continue
        _CARD_PROFILE_LAST_ATTEMPT[card_name] = now

        # Highest priority wins; ties are broken by ascending name so that
        # `mono-fallback` is chosen over `multichannel-input` (the original
        # notes say this matches the source patterns used elsewhere).
        _, target = min(candidates, key=lambda cand: (-cand[0], cand[1]))
        log.info("ensure_card_input_capable: %s active=%r → %r "
                 "(exposes mic to PulseAudio)",
                 card_name, active or "?", target)
        try:
            sr = _run_pactl(["set-card-profile", card_name, target])
        except (FileNotFoundError, subprocess.SubprocessError) as exc:
            log.warning("ensure_card_input_capable: pactl error: %s", exc)
            continue
        if sr.returncode == 0:
            switched_any = True
        else:
            log.warning("ensure_card_input_capable: set-card-profile "
                        "%s %r failed: %s", card_name, target,
                        (sr.stderr or "").strip())
    return switched_any


def _split_card_blocks(listing: str) -> list[str]:
    """Split ``pactl list cards`` output into one text block per ``Card #N`` stanza."""
    blocks: list[str] = []
    current: list[str] = []
    for line in listing.splitlines():
        if line.startswith("Card #") and current:
            blocks.append("\n".join(current))
            current = []
        current.append(line)
    if current:
        blocks.append("\n".join(current))
    return blocks


def _card_name_from_block(block: str) -> str:
    """Return the value of the first ``Name: `` line in a card stanza, or ``""``."""
    for line in block.splitlines():
        text = line.strip()
        if text.startswith("Name: "):
            return text[len("Name: "):].strip()
    return ""
|
||
|
||
|
||
def detect_plugged_profiles() -> list[dict[str, Any]]:
    """List every profile whose sink AND source can currently be found.

    The sink must match via pactl. The source is looked up in up to three
    steps, stopping at the first hit:
      1. the pactl source list (monitor sources excluded);
      2. after `ensure_card_input_capable` reports success, a re-read of
         the pactl source list (done at most once per call);
      3. the PyAudio device list (see `_pyaudio_input_matches`).

    Each result is a dict with "profile" (the profile as a plain dict),
    "sink", "source" and "source_via" — one of "pactl",
    "pactl-after-profile-switch" or "pyaudio".
    """
    sinks = list_sinks()
    sources = list_sources()
    results: list[dict[str, Any]] = []
    sources_reloaded = False

    for prof in PROFILES:
        sink = find_first_match(sinks, prof.sink_pattern)
        if not sink:
            continue

        via = "pactl"
        src = find_first_match(sources, prof.source_pattern, exclude_monitors=True)

        if src is None:
            # "Sink present, source missing": try to unstick the card's
            # PulseAudio profile. The source list is re-read only once per
            # detection pass and then reused for the remaining profiles.
            switched = ensure_card_input_capable(prof.sink_pattern)
            if switched and not sources_reloaded:
                sources = list_sources()
                sources_reloaded = True
                src = find_first_match(sources, prof.source_pattern,
                                       exclude_monitors=True)
                if src is not None:
                    via = "pactl-after-profile-switch"
                    log.info("detect_plugged_profiles: %s source appeared "
                             "after card-profile switch: %s",
                             prof.id, src.get("name", "?"))

        if src is None:
            # Last resort — PyAudio may still see the mic.
            src = _pyaudio_input_matches(prof.source_pattern)
            if src is not None:
                via = "pyaudio"
                log.info("detect_plugged_profiles: %s source resolved via "
                         "PyAudio fallback (pactl missed it): %s",
                         prof.id, src.get("name", "?"))

        if not src:
            continue
        results.append({
            "profile": asdict(prof),
            "sink": sink,
            "source": src,
            "source_via": via,
        })
    return results
|
||
|
||
|
||
# ───────────────────────── persistence ─────────────────────────
|
||
|
||
def load_state() -> dict[str, Any]:
    """Load the saved selection from ``DEVICE_STATE_FILE``.

    Returns:
        The parsed JSON object. An empty dict is returned when the file
        does not exist, cannot be read or decoded, is not valid JSON, or
        holds a JSON value that is not an object. Every case except a
        missing file is logged as a warning.
    """
    try:
        # Open directly instead of checking exists() first: the file may
        # disappear between the check and the open.
        with open(DEVICE_STATE_FILE, "r", encoding="utf-8") as f:
            state = json.load(f)
    except FileNotFoundError:
        return {}
    except (ValueError, OSError) as exc:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        log.warning("audio_device.json unreadable: %s", exc)
        return {}
    if not isinstance(state, dict):
        # Callers use .get() on the result, so a list/string/null is unusable.
        log.warning("audio_device.json unreadable: %s",
                    f"expected a JSON object, got {type(state).__name__}")
        return {}
    return state
|
||
|
||
|
||
def save_state(state: dict[str, Any]) -> None:
    """Write *state* to audio_device.json via a temp file and rename.

    The JSON is first written to a temporary file created next to the
    target, which is then moved over the target with ``os.replace``. If
    anything fails, the temporary file is removed (best-effort) and the
    original exception is re-raised. The write runs while holding ``_LOCK``.
    """
    target_dir = DEVICE_STATE_FILE.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    with _LOCK:
        handle, tmp_path = tempfile.mkstemp(
            dir=str(target_dir),
            prefix=f".{DEVICE_STATE_FILE.name}.",
            suffix=".tmp",
        )
        try:
            with os.fdopen(handle, "w", encoding="utf-8") as out:
                json.dump(state, out, indent=2)
            os.replace(tmp_path, DEVICE_STATE_FILE)
        except Exception:
            # Do not leave the half-written temp file behind.
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
            raise
|
||
|
||
|
||
# ───────────────────────── current selection ─────────────────────────
|
||
|
||
# Dedupe the "manual override stale" fall-through log. current_selection() is
# called every ~1.5s by the audio watcher, so logging every cycle spams the log
# whenever a saved device (e.g. an unplugged Anker) stays absent. We log at
# INFO only when the stale state changes (and reset when the override becomes
# valid again); repeats go to DEBUG.
_LAST_STALE_LOG: Any = None


def current_selection() -> dict[str, Any]:
    """Resolve the currently active sink/source.

    Resolution order:
      1. Saved profile selection (if that profile is currently detected)
      2. Saved explicit sink/source pair (ONLY if both names still exist in
         pactl — otherwise the saved manual override is stale and we fall
         through)
      3. DEFAULT profile (builtin) if detected
      4. First detected profile (in declaration order)
      5. pactl defaults
      6. Empty

    Returns:
        A dict with keys "source_kind" ("profile", "manual", "default",
        "auto", "pactl_default" or "none"), "profile" (profile dict or
        None), "sink", "source", "sink_description", "source_description".
    """
    global _LAST_STALE_LOG

    state = load_state()
    if not isinstance(state, dict):
        # Defensive: everything below relies on .get().
        state = {}

    # Probe pactl once and reuse the answer; each probe spawns a subprocess
    # and this function sits on a frequently polled path.
    have_pactl = pactl_available()

    # Snapshot of the profiles that are detectable right now.
    detected = detect_plugged_profiles() if have_pactl else []
    detected_by_id = {d["profile"]["id"]: d for d in detected}

    # 1. Saved profile preference
    saved_profile = state.get("profile_id")
    if saved_profile and saved_profile in detected_by_id:
        return _selection_from_detected("profile", detected_by_id[saved_profile])

    # 2. Saved explicit sink/source — but VERIFY both names still exist in
    #    pactl before returning them. Otherwise an unplugged device would
    #    keep being reported as the active manual override. Falling through
    #    lets steps 3–5 re-resolve to whatever is actually present.
    saved_sink = (state.get("sink") or "").strip()
    saved_source = (state.get("source") or "").strip()
    if saved_sink and saved_source:
        sink_names = {s["name"] for s in (list_sinks() if have_pactl else [])}
        source_names = {s["name"] for s in (list_sources() if have_pactl else [])}
        sink_present = saved_sink in sink_names
        source_present = saved_source in source_names
        if sink_present and source_present:
            _LAST_STALE_LOG = None  # override valid again — re-arm the log
            return _selection_plain("manual", saved_sink, saved_source)

        # Benign, expected state (a saved device is simply unplugged): log
        # at INFO once per distinct stale state and at DEBUG afterwards.
        stale_key = (saved_sink, sink_present, saved_source, source_present)
        first = stale_key != _LAST_STALE_LOG
        _LAST_STALE_LOG = stale_key
        (log.info if first else log.debug)(
            "current_selection: manual override stale (sink=%s present=%s, "
            "source=%s present=%s) — falling through to auto-detect",
            saved_sink, sink_present,
            saved_source, source_present,
        )

    # 3. Default profile if it is plugged in
    if DEFAULT_PROFILE_ID in detected_by_id:
        return _selection_from_detected("default", detected_by_id[DEFAULT_PROFILE_ID])

    # 4. First detected profile (in declaration order)
    if detected:
        return _selection_from_detected("auto", detected[0])

    # 5. pactl defaults (system-wide)
    sink = get_default_sink()
    source = get_default_source()
    if sink and source:
        return _selection_plain("pactl_default", sink, source)

    # 6. Empty
    return _selection_plain("none", "", "")


def _selection_from_detected(source_kind: str, d: dict[str, Any]) -> dict[str, Any]:
    """Build a current_selection() result from one detect_plugged_profiles() entry."""
    return {
        "source_kind": source_kind,
        "profile": d["profile"],
        "sink": d["sink"].get("name", ""),
        "source": d["source"].get("name", ""),
        "sink_description": d["sink"].get("description", ""),
        "source_description": d["source"].get("description", ""),
    }


def _selection_plain(source_kind: str, sink: str, source: str) -> dict[str, Any]:
    """Build a profile-less current_selection() result with empty descriptions."""
    return {
        "source_kind": source_kind,
        "profile": None,
        "sink": sink,
        "source": source,
        "sink_description": "",
        "source_description": "",
    }
|
||
|
||
|
||
# ───────────────────────── apply selection ─────────────────────────
|
||
|
||
def apply_selection(sink: str, source: str) -> dict[str, Any]:
    """Make *sink* / *source* the pactl defaults and unmute them.

    Returns ``{"ok": bool, "errors": [str, ...]}``; ``ok`` is True when no
    set-default call failed. Unmuting is best-effort and never adds an
    error. An empty *sink* or *source* is skipped.

    A source name starting with ``pyaudio:`` is the synthetic marker that
    detect_plugged_profiles() emits when the source was resolved only via
    the PyAudio fallback. pactl cannot act on such a name, so only the sink
    is set and the pactl default source is left untouched.
    NOTE(review): the original notes state that the live mic path opens the
    device through PortAudio directly and so does not depend on the pactl
    default source — that cannot be confirmed from this file.
    """
    errors: list[str] = []

    def _unmute(subcommand: str, device: str) -> None:
        # Best-effort: a failed unmute is not reported as an error.
        try:
            _run_pactl([subcommand, device, "0"])
        except (FileNotFoundError, subprocess.SubprocessError):
            pass

    if sink:
        if set_default_sink(sink):
            _unmute("set-sink-mute", sink)
        else:
            errors.append(f"set-default-sink failed: {sink}")

    if source:
        if source.startswith("pyaudio:"):
            log.info("apply_selection: source is PyAudio-direct (%s) — "
                     "skipping pactl set-default-source. Live mic path "
                     "uses PortAudio device match; pactl defaults stay put.",
                     source)
        elif set_default_source(source):
            _unmute("set-source-mute", source)
        else:
            errors.append(f"set-default-source failed: {source}")

    return {"ok": not errors, "errors": errors}
|
||
|
||
|
||
def apply_current_selection() -> dict[str, Any]:
    """Resolve the current device selection afresh and apply it via pactl.

    Because devices are re-discovered on every call, the outcome does not
    depend on which USB port a device is plugged into.

    Returns:
        ``{"ok": False, "error": ...}`` when pactl is unavailable or nothing
        could be resolved (the latter also carries "selection"). Otherwise
        the apply_selection() result extended with a "selection" entry.
    """
    if not pactl_available():
        return {"ok": False, "error": "pactl not available"}

    cur = current_selection()
    sink = cur.get("sink", "")
    source = cur.get("source", "")
    if not (sink or source):
        return {"ok": False, "error": "no device resolved", "selection": cur}

    outcome = apply_selection(sink, source)
    outcome["selection"] = cur
    if not outcome["ok"]:
        log.warning("Audio apply partial — sink=%s source=%s errors=%s",
                    sink, source, outcome["errors"])
    else:
        log.info("Audio applied — sink=%s source=%s (via %s)",
                 sink, source, cur.get("source_kind", "?"))
    return outcome
|
||
|
||
|
||
def select_profile(profile_id: str) -> dict[str, Any]:
    """Switch to the named profile and persist the choice.

    Returns ``{"ok": True, "profile", "sink", "source"}`` on success. On
    failure ``ok`` is False and the dict carries either "error" (unknown
    profile, or profile not currently detected — the latter also lists the
    "available" profile ids) or "errors" (pactl could not apply it).
    Nothing is saved unless the selection was applied successfully.
    """
    if profile_id not in PROFILES_BY_ID:
        return {"ok": False, "error": f"Unknown profile: {profile_id}"}

    detected = detect_plugged_profiles()
    by_id = {entry["profile"]["id"]: entry for entry in detected}
    if profile_id not in by_id:
        return {
            "ok": False,
            "error": f"Profile '{profile_id}' is not currently plugged in",
            "available": [entry["profile"]["id"] for entry in detected],
        }

    chosen = by_id[profile_id]
    sink_name = chosen["sink"]["name"]
    source_name = chosen["source"]["name"]

    applied = apply_selection(sink_name, source_name)
    if not applied["ok"]:
        return {"ok": False, "errors": applied["errors"]}

    save_state({
        "profile_id": profile_id,
        "sink": sink_name,
        "source": source_name,
    })
    log.info("Selected audio profile: %s (sink=%s, source=%s)", profile_id, sink_name, source_name)
    return {
        "ok": True,
        "profile": chosen["profile"],
        "sink": sink_name,
        "source": source_name,
    }
|
||
|
||
|
||
def select_manual(sink: str, source: str) -> dict[str, Any]:
    """Switch to an explicit sink/source pair (no profile) and persist it.

    Returns ``{"ok": True, "sink", "source"}`` on success, or
    ``{"ok": False, "errors": [...]}`` when pactl could not apply the pair,
    in which case nothing is saved.
    """
    applied = apply_selection(sink, source)
    if applied["ok"]:
        save_state({"profile_id": None, "sink": sink, "source": source})
        log.info("Selected manual audio: sink=%s source=%s", sink, source)
        return {"ok": True, "sink": sink, "source": source}
    return {"ok": False, "errors": applied["errors"]}
|
||
|
||
|
||
# ───────────────────────── status ─────────────────────────
|
||
|
||
def status() -> dict[str, Any]:
    """Collect a one-shot status snapshot for the dashboard.

    The result holds pactl availability, the resolved current selection,
    the raw saved state, all declared profiles, the detected profiles and
    their ids, every sink and source, and the pactl default sink/source.
    All pactl-derived entries are empty when pactl is unavailable.
    """
    pa = pactl_available()
    detected = detect_plugged_profiles() if pa else []
    detected_ids = [entry["profile"]["id"] for entry in detected]
    cur = current_selection()

    report: dict[str, Any] = {"pactl_available": pa, "current": cur}
    report["saved_state"] = load_state()
    report["profiles"] = [asdict(profile) for profile in PROFILES]
    report["detected"] = detected
    report["detected_ids"] = detected_ids
    report["all_sinks"] = list_sinks() if pa else []
    report["all_sources"] = list_sources() if pa else []
    report["default_sink"] = get_default_sink() if pa else ""
    report["default_source"] = get_default_source() if pa else ""
    return report
|