496 lines
17 KiB
Python
496 lines
17 KiB
Python
"""Audio device profiles + pactl detection + selection persistence.
|
|
|
|
Manages multiple audio device profiles (generic built-in, Hollyland wireless
|
|
mic + built-in speaker, Anker PowerConf) and lets the dashboard switch
|
|
between them at runtime. Selection is persisted to data/audio_device.json
|
|
so the choice survives restart.
|
|
|
|
Resolution policy:
|
|
1. User-selected profile (from data/audio_device.json) — if its sink/source
|
|
is currently plugged in, use it.
|
|
2. Auto-detected profile based on what is currently plugged in.
|
|
3. Built-in fallback.
|
|
|
|
Each profile has:
  - id: short identifier
  - label: human-readable name
  - sink_pattern: comma-separated substrings matched against pactl sink names
  - source_pattern: comma-separated substrings matched against pactl source names
    (substrings are used to find the actual pactl name, since exact names
    contain serial numbers and may differ between machines)
  - description: optional free-text note about the profile
  - sink_sample_rate / source_sample_rate (optional defaults, 0 = use the
    device default — read by AudioManager)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import tempfile
|
|
import threading
|
|
from dataclasses import dataclass, asdict
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from Project.Sanad.config import DATA_DIR
|
|
from Project.Sanad.core.logger import get_logger
|
|
|
|
# Module-level logger, obtained under the name "audio_devices".
log = get_logger("audio_devices")

# JSON file holding the persisted device selection (written by save_state(),
# read by load_state()).
DEVICE_STATE_FILE = DATA_DIR / "audio_device.json"
# Held by save_state() for the duration of the temp-file write + rename.
_LOCK = threading.Lock()
|
|
|
|
|
|
@dataclass
class AudioProfile:
    """One selectable audio setup: a sink pattern paired with a source pattern.

    The two ``*_pattern`` fields hold the substring(s) that are searched for in
    the sink / source names reported by pactl.
    """

    # Short identifier.
    id: str
    # Human-readable name.
    label: str
    # Substring used to find a sink.
    sink_pattern: str
    # Substring used to find a source.
    source_pattern: str
    # Optional free-text note.
    description: str = ""
    # Sample rates; 0 = use device default.
    sink_sample_rate: int = 0
    source_sample_rate: int = 0
|
|
|
|
|
|
# Built-in device profiles.
#
# MATCHING RULES:
#   - Patterns are matched case-insensitively against the FULL PulseAudio name.
#   - Multiple patterns per field: comma-separated → match ANY.
#   - PulseAudio names change depending on the USB port, so we match the
#     product-name portion only (not the serial or port suffix).
#   - Order matters: the FIRST profile whose sink AND source both match
#     becomes the auto-default when no explicit selection is saved
#     (current_selection() tries DEFAULT_PROFILE_ID first, then this order).
#
# Example PulseAudio names:
#   alsa_output.platform-sound.analog-stereo                        (built-in speaker)
#   alsa_input.platform-sound.analog-stereo                         (built-in mic)
#   alsa_output.usb-Anker_PowerConf_A3321-DEV-SN1-01.analog-stereo  (Anker speaker — SN1-01 is port-dependent)
#   alsa_input.usb-Anker_PowerConf_A3321-DEV-SN1-01.mono-fallback   (Anker mic)
#   alsa_input.usb-Shenzhen_Hollyland_Technology_Co._Ltd_Wireless_microphone_C63X223T6MX-01.analog-stereo
#                                                                   (Hollyland mic — C63X... is serial-dependent)

PROFILES: list[AudioProfile] = [
    # Built-in sink + built-in source. The source pattern carries the
    # "alsa_input." prefix, so it only matches capture-side names.
    AudioProfile(
        id="builtin",
        label="Built-in mic + speaker",
        sink_pattern="platform-sound",
        source_pattern="alsa_input.platform-sound",
        description="Jetson / G1 built-in audio chip. (Default)",
    ),
    # Built-in sink combined with the Hollyland wireless microphone as source.
    AudioProfile(
        id="hollyland_builtin",
        label="Hollyland mic + built-in speaker",
        sink_pattern="platform-sound",
        source_pattern="hollyland,wireless_microphone",
        description="Hollyland wireless lavalier microphone with the Jetson built-in speaker.",
    ),
    # Same USB unit provides both sink and source, hence identical patterns.
    AudioProfile(
        id="anker_powerconf",
        label="Anker PowerConf (mic + speaker)",
        sink_pattern="powerconf,anker",
        source_pattern="powerconf,anker",
        description="Anker PowerConf USB conference unit — mic + speaker on the same device.",
    ),
]
|
|
|
|
# The profile that should be used when no saved state and no auto-detect succeeds.
DEFAULT_PROFILE_ID = "builtin"

# Lookup table: profile id -> AudioProfile, built once from PROFILES.
PROFILES_BY_ID: dict[str, AudioProfile] = {p.id: p for p in PROFILES}
|
|
|
|
|
|
# ───────────────────────── pactl helpers ─────────────────────────
|
|
|
|
def _run_pactl(args: list[str], timeout: float = 1.0) -> subprocess.CompletedProcess[str]:
    """Invoke ``pactl`` with *args* and return the finished process.

    stdout/stderr are captured as text. A non-zero exit status does not raise
    (``check=False``); callers inspect ``returncode`` themselves. The call is
    bounded by *timeout* seconds.
    """
    command = ["pactl"]
    command.extend(args)
    return subprocess.run(
        command,
        capture_output=True,
        text=True,
        check=False,
        timeout=timeout,
    )
|
|
|
|
|
|
def pactl_available() -> bool:
    """Report whether ``pactl info`` can be executed and exits with status 0."""
    try:
        probe = _run_pactl(["info"])
    except (FileNotFoundError, subprocess.SubprocessError):
        # Binary missing, or the call failed / timed out.
        return False
    else:
        return probe.returncode == 0
|
|
|
|
|
|
def list_sinks() -> list[dict[str, str]]:
    """List every sink as a dict with ``index``, ``name`` and ``description``."""
    return _list_kind(kind="sinks")
|
|
|
|
|
|
def list_sources() -> list[dict[str, str]]:
    """List every source as a dict with ``index``, ``name`` and ``description``."""
    return _list_kind(kind="sources")
|
|
|
|
|
|
def _list_kind(kind: str) -> list[dict[str, str]]:
    """Return ``[{index, name, description}]`` for every device of *kind*.

    *kind* is the plural pactl object type (``"sinks"`` or ``"sources"``).
    Index and name come from ``pactl list short <kind>``; descriptions come
    from ONE additional ``pactl list <kind>`` call that is parsed for all
    devices at once (instead of re-running pactl once per device).

    Returns an empty list when pactl is missing, fails, times out, or exits
    with a non-zero status. A device whose description cannot be determined
    gets ``""``.
    """
    try:
        short = _run_pactl(["list", "short", kind])
    except (FileNotFoundError, subprocess.SubprocessError) as exc:
        log.warning("pactl list %s failed: %s", kind, exc)
        return []
    if short.returncode != 0:
        return []

    entries: list[tuple[str, str]] = []
    for raw in (short.stdout or "").splitlines():
        # Short output is tab-separated; fall back to any whitespace.
        parts = raw.split("\t")
        if len(parts) < 2:
            parts = raw.split()
        if len(parts) < 2:
            continue
        entries.append((parts[0], parts[1]))
    if not entries:
        return []

    descriptions = _pactl_descriptions(kind)
    return [
        {"index": idx, "name": name, "description": descriptions.get(name, "")}
        for idx, name in entries
    ]


def _pactl_descriptions(kind: str) -> dict[str, str]:
    """Run ``pactl list <kind>`` once and map device name -> Description (best effort)."""
    try:
        detailed = _run_pactl(["list", kind])
    except (FileNotFoundError, subprocess.SubprocessError):
        return {}
    if detailed.returncode != 0:
        return {}
    return _parse_pactl_descriptions(detailed.stdout or "")


def _parse_pactl_descriptions(listing: str) -> dict[str, str]:
    """Extract ``{Name: Description}`` from the text of a detailed pactl listing."""
    result: dict[str, str] = {}
    name: str | None = None
    desc: str | None = None

    def flush() -> None:
        # Record the block that just ended; the first block seen for a name wins.
        if name is not None and name not in result:
            result[name] = desc or ""

    for line in listing.splitlines():
        if line.startswith(("Sink #", "Source #")):
            # A header line opens a new device block.
            flush()
            name = None
            desc = None
            continue
        text = line.strip()
        if name is None and text.startswith("Name:"):
            name = text.split(":", 1)[1].strip()
        elif desc is None and text.startswith("Description:"):
            desc = text.split(":", 1)[1].strip()
    flush()
    return result
|
|
|
|
|
|
def _description_for(kind: str, name: str) -> str:
    """Best-effort lookup of a device's Description via ``pactl list <kind>``.

    *kind* is the plural pactl object type (``"sinks"`` / ``"sources"``) and
    *name* the exact device name. Returns ``""`` when pactl is unavailable,
    fails, or no block with that exact name (or no Description line) exists.
    """
    try:
        r = _run_pactl(["list", kind])
    except (FileNotFoundError, subprocess.SubprocessError):
        return ""
    if r.returncode != 0:
        return ""

    block: list[str] = []
    found = False
    for line in (r.stdout or "").splitlines():
        if line.startswith(("Sink #", "Source #")):
            # Header of the next device: stop once the wanted block is complete.
            if found:
                break
            block = []
        else:
            stripped = line.strip()
            # Compare the whole name: a suffix test would also accept devices
            # whose name merely ends with *name* (and anything for name == "").
            if stripped.startswith("Name:") and stripped[len("Name:"):].strip() == name:
                found = True
        block.append(line)
    if not found:
        return ""

    for line in block:
        s = line.strip()
        if s.startswith("Description:"):
            return s.split(":", 1)[1].strip()
    return ""
|
|
|
|
|
|
def get_default_sink() -> str:
    """Return the name of the server's default sink, or ``""`` if unknown."""
    return _query_pactl_default("get-default-sink", "Default Sink:")


def get_default_source() -> str:
    """Return the name of the server's default source, or ``""`` if unknown."""
    return _query_pactl_default("get-default-source", "Default Source:")


def _query_pactl_default(command: str, info_prefix: str) -> str:
    """Ask pactl for a default device name, with a ``pactl info`` fallback.

    ``pactl get-default-sink`` / ``get-default-source`` are not available in
    older pactl versions, where they exit non-zero. In that case the value is
    read from the line of ``pactl info`` that starts with *info_prefix*.
    NOTE(review): the ``pactl info`` labels are assumed to be the untranslated
    English ones — confirm for localized environments.

    Returns ``""`` when pactl is missing, times out, or yields no value.
    """
    try:
        direct = _run_pactl([command])
        if direct.returncode == 0:
            return (direct.stdout or "").strip()
        info = _run_pactl(["info"])
    except (FileNotFoundError, subprocess.SubprocessError):
        return ""
    if info.returncode != 0:
        return ""
    for line in (info.stdout or "").splitlines():
        if line.startswith(info_prefix):
            return line[len(info_prefix):].strip()
    return ""
|
|
|
|
|
|
def set_default_sink(name: str) -> bool:
    """Run ``pactl set-default-sink <name>``; True only if it exits with status 0."""
    try:
        outcome = _run_pactl(["set-default-sink", name])
    except (FileNotFoundError, subprocess.SubprocessError):
        return False
    else:
        return outcome.returncode == 0
|
|
|
|
|
|
def set_default_source(name: str) -> bool:
    """Run ``pactl set-default-source <name>``; True only if it exits with status 0."""
    try:
        outcome = _run_pactl(["set-default-source", name])
    except (FileNotFoundError, subprocess.SubprocessError):
        return False
    else:
        return outcome.returncode == 0
|
|
|
|
|
|
# ───────────────────────── matching ─────────────────────────
|
|
|
|
def find_first_match(items: list[dict[str, str]], pattern: str,
                     exclude_monitors: bool = False) -> dict[str, str] | None:
    """Pick the first entry of *items* whose ``name`` contains one of the patterns.

    *pattern* is a comma-separated list of substrings; matching ignores case
    and succeeds on ANY substring. For instance ``"powerconf,anker"`` selects
    the first name containing "powerconf" or "anker".

    With ``exclude_monitors=True`` names ending in ".monitor" (PulseAudio
    monitor sources) are skipped, so a loopback is never chosen in place of a
    real mic.

    Returns None for an empty pattern or when nothing matches.
    """
    if not pattern:
        return None

    cleaned = (piece.strip().lower() for piece in pattern.split(","))
    needles = [piece for piece in cleaned if piece]
    if not needles:
        return None

    for candidate in items:
        lowered = candidate["name"].lower()
        if exclude_monitors and lowered.endswith(".monitor"):
            continue
        if any(needle in lowered for needle in needles):
            return candidate
    return None
|
|
|
|
|
|
def detect_plugged_profiles() -> list[dict[str, Any]]:
    """List every profile for which both a sink and a source are present.

    Each entry is ``{"profile": <profile as dict>, "sink": <sink item>,
    "source": <source item>}``, in PROFILES order. Monitor sources are never
    accepted as the source.
    """
    available_sinks = list_sinks()
    available_sources = list_sources()

    matches: list[dict[str, Any]] = []
    for profile in PROFILES:
        sink_hit = find_first_match(available_sinks, profile.sink_pattern)
        source_hit = find_first_match(
            available_sources, profile.source_pattern, exclude_monitors=True
        )
        if not (sink_hit and source_hit):
            continue
        matches.append(
            {"profile": asdict(profile), "sink": sink_hit, "source": source_hit}
        )
    return matches
|
|
|
|
|
|
# ───────────────────────── persistence ─────────────────────────
|
|
|
|
def load_state() -> dict[str, Any]:
    """Load the saved selection from DEVICE_STATE_FILE. Always returns a dict.

    Returns ``{}`` when the file does not exist, cannot be read or decoded,
    is not valid JSON, or holds valid JSON that is not an object (e.g. a list
    or ``null``) — callers rely on ``.get()`` being available on the result.
    """
    try:
        with open(DEVICE_STATE_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (FileNotFoundError, NotADirectoryError):
        # No saved selection yet: not worth a warning.
        return {}
    except (ValueError, OSError) as exc:
        # ValueError covers json.JSONDecodeError as well as UnicodeDecodeError.
        log.warning("audio_device.json unreadable: %s", exc)
        return {}
    if not isinstance(data, dict):
        log.warning("audio_device.json unreadable: expected a JSON object, got %s",
                    type(data).__name__)
        return {}
    return data
|
|
|
|
|
|
def save_state(state: dict[str, Any]) -> None:
    """Write *state* to audio_device.json via a temp file + rename.

    The JSON is first written to a temporary file created in the same
    directory and then moved over DEVICE_STATE_FILE with ``os.replace``. If
    anything fails, the temporary file is removed (best effort) and the
    original exception is re-raised.
    """
    target_dir = DEVICE_STATE_FILE.parent
    target_dir.mkdir(parents=True, exist_ok=True)

    with _LOCK:
        handle, scratch_path = tempfile.mkstemp(
            dir=str(target_dir),
            prefix=f".{DEVICE_STATE_FILE.name}.",
            suffix=".tmp",
        )
        try:
            with os.fdopen(handle, "w", encoding="utf-8") as out:
                json.dump(state, out, indent=2)
            os.replace(scratch_path, DEVICE_STATE_FILE)
        except Exception:
            # Do not leave the scratch file behind; the caller still sees the error.
            try:
                os.unlink(scratch_path)
            except OSError:
                pass
            raise
|
|
|
|
|
|
# ───────────────────────── current selection ─────────────────────────
|
|
|
|
def current_selection() -> dict[str, Any]:
    """Resolve the currently active sink/source.

    Order:
      1. Saved profile selection (if its sink/source still plugged)
      2. Saved explicit sink/source pair
      3. DEFAULT profile (builtin) if detected
      4. First detected profile (in declaration order)
      5. pactl defaults
      6. Empty

    Step 2 is skipped when the saved pair is merely the snapshot stored next
    to a known ``profile_id`` and that profile was just checked and found NOT
    plugged in: the stored names are then stale, and resolution continues
    with steps 3+ instead of returning devices that are not present.

    Returns a dict with the keys ``source_kind``, ``profile``, ``sink``,
    ``source``, ``sink_description`` and ``source_description``.
    """
    state = load_state()
    if not isinstance(state, dict):
        # Defensive: a state file holding non-object JSON must not crash resolution.
        state = {}

    # Detected profiles snapshot
    pa = pactl_available()
    detected = detect_plugged_profiles() if pa else []
    detected_by_id = {d["profile"]["id"]: d for d in detected}

    saved_profile = state.get("profile_id")
    if not isinstance(saved_profile, str):
        saved_profile = ""

    # 1. Saved profile preference
    if saved_profile and saved_profile in detected_by_id:
        return _selection_from_detected("profile", detected_by_id[saved_profile])

    # 2. Saved explicit sink/source.
    # Reaching this point with pactl available and a known profile id means
    # that profile's devices were looked for and not found.
    saved_pair_is_stale = pa and saved_profile in PROFILES_BY_ID
    if state.get("sink") and state.get("source") and not saved_pair_is_stale:
        return _selection_without_profile("manual", state["sink"], state["source"])

    # 3. Default profile if it is plugged in
    if DEFAULT_PROFILE_ID in detected_by_id:
        return _selection_from_detected("default", detected_by_id[DEFAULT_PROFILE_ID])

    # 4. First detected profile (in declaration order)
    if detected:
        return _selection_from_detected("auto", detected[0])

    # 5. pactl defaults (system-wide)
    sink = get_default_sink()
    source = get_default_source()
    if sink and source:
        return _selection_without_profile("pactl_default", sink, source)

    # 6. Empty
    return _selection_without_profile("none", "", "")


def _selection_from_detected(source_kind: str, entry: dict[str, Any]) -> dict[str, Any]:
    """Build a selection dict from one detect_plugged_profiles() entry."""
    return {
        "source_kind": source_kind,
        "profile": entry["profile"],
        "sink": entry["sink"]["name"],
        "source": entry["source"]["name"],
        "sink_description": entry["sink"]["description"],
        "source_description": entry["source"]["description"],
    }


def _selection_without_profile(source_kind: str, sink: str, source: str) -> dict[str, Any]:
    """Build a selection dict that is not tied to any profile."""
    return {
        "source_kind": source_kind,
        "profile": None,
        "sink": sink,
        "source": source,
        "sink_description": "",
        "source_description": "",
    }
|
|
|
|
|
|
# ───────────────────────── apply selection ─────────────────────────
|
|
|
|
def apply_selection(sink: str, source: str) -> dict[str, Any]:
    """Make *sink* / *source* the pactl defaults and unmute them.

    An empty name skips that side entirely. Unmuting is attempted only after
    the corresponding set-default call succeeded, and a failing unmute is
    ignored. Returns ``{"ok": bool, "errors": [messages]}``.
    """
    failures: list[str] = []

    if sink:
        if set_default_sink(sink):
            try:
                _run_pactl(["set-sink-mute", sink, "0"])
            except (FileNotFoundError, subprocess.SubprocessError):
                pass  # best effort
        else:
            failures.append(f"set-default-sink failed: {sink}")

    if source:
        if set_default_source(source):
            try:
                _run_pactl(["set-source-mute", source, "0"])
            except (FileNotFoundError, subprocess.SubprocessError):
                pass  # best effort
        else:
            failures.append(f"set-default-source failed: {source}")

    return {"ok": not failures, "errors": failures}
|
|
|
|
|
|
def apply_current_selection() -> dict[str, Any]:
    """Re-resolve the device selection and push it to pactl.

    Called at AudioManager startup and when devices change. Discovery is
    repeated on every call, which is what keeps audio working regardless of
    the USB port a device is plugged into.

    Returns ``{"ok": False, "error": ...}`` when pactl is unavailable or no
    device could be resolved; otherwise the apply_selection() result extended
    with a ``"selection"`` key.
    """
    if not pactl_available():
        return {"ok": False, "error": "pactl not available"}

    selection = current_selection()
    sink = selection.get("sink", "")
    source = selection.get("source", "")
    if not (sink or source):
        return {"ok": False, "error": "no device resolved", "selection": selection}

    outcome = apply_selection(sink, source)
    outcome["selection"] = selection
    if not outcome["ok"]:
        log.warning("Audio apply partial — sink=%s source=%s errors=%s",
                    sink, source, outcome["errors"])
    else:
        log.info("Audio applied — sink=%s source=%s (via %s)",
                 sink, source, selection.get("source_kind", "?"))
    return outcome
|
|
|
|
|
|
def select_profile(profile_id: str) -> dict[str, Any]:
    """Activate the profile named *profile_id* and persist the choice.

    Fails (``ok: False``) for an unknown id, for a profile whose devices are
    not currently detected (the detected ids are returned under
    ``"available"``), or when applying via pactl fails. The state file is
    written only after a successful apply.
    """
    if profile_id not in PROFILES_BY_ID:
        return {"ok": False, "error": f"Unknown profile: {profile_id}"}

    plugged = detect_plugged_profiles()
    plugged_by_id = {entry["profile"]["id"]: entry for entry in plugged}
    chosen = plugged_by_id.get(profile_id)
    if chosen is None:
        return {
            "ok": False,
            "error": f"Profile '{profile_id}' is not currently plugged in",
            "available": [entry["profile"]["id"] for entry in plugged],
        }

    sink_name = chosen["sink"]["name"]
    source_name = chosen["source"]["name"]

    applied = apply_selection(sink_name, source_name)
    if not applied["ok"]:
        return {"ok": False, "errors": applied["errors"]}

    persisted = {"profile_id": profile_id, "sink": sink_name, "source": source_name}
    save_state(persisted)
    log.info("Selected audio profile: %s (sink=%s, source=%s)", profile_id, sink_name, source_name)
    return {
        "ok": True,
        "profile": chosen["profile"],
        "sink": sink_name,
        "source": source_name,
    }
|
|
|
|
|
|
def select_manual(sink: str, source: str) -> dict[str, Any]:
    """Activate an explicit sink/source pair and persist it without a profile.

    The pair is saved (with ``profile_id`` set to None) only when applying it
    via pactl succeeded; otherwise the pactl errors are returned.
    """
    applied = apply_selection(sink, source)
    if applied["ok"]:
        save_state({"profile_id": None, "sink": sink, "source": source})
        log.info("Selected manual audio: sink=%s source=%s", sink, source)
        return {"ok": True, "sink": sink, "source": source}
    return {"ok": False, "errors": applied["errors"]}
|
|
|
|
|
|
# ───────────────────────── status ─────────────────────────
|
|
|
|
def status() -> dict[str, Any]:
    """Collect everything the dashboard shows about audio devices in one call.

    When pactl is unavailable, the detected / sink / source lists are empty
    and the default device names are empty strings.
    """
    have_pactl = pactl_available()
    plugged = detect_plugged_profiles() if have_pactl else []
    plugged_ids = [entry["profile"]["id"] for entry in plugged]
    active = current_selection()

    report: dict[str, Any] = {
        "pactl_available": have_pactl,
        "current": active,
        "saved_state": load_state(),
        "profiles": [asdict(profile) for profile in PROFILES],
        "detected": plugged,
        "detected_ids": plugged_ids,
    }
    if have_pactl:
        report.update(
            all_sinks=list_sinks(),
            all_sources=list_sources(),
            default_sink=get_default_sink(),
            default_source=get_default_source(),
        )
    else:
        report.update(all_sinks=[], all_sources=[], default_sink="", default_source="")
    return report
|