Sanad/motion/macro_player.py

276 lines
9.8 KiB
Python

"""Macro Player — synchronized playback of audio + motion recordings.
Reads paired files:
recordings/audio/<name>.wav
recordings/motion/<name>.jsonl
Plays audio and streams joint commands simultaneously so the robot's
physical movements perfectly match the recorded speech timing.
"""
from __future__ import annotations
import json
import threading
import time
import wave
from pathlib import Path
from typing import Any
from Project.Sanad.config import AUDIO_RECORDINGS_DIR, MOTION_RECORDINGS_DIR, REPLAY_HZ
from Project.Sanad.core.config_loader import section as _cfg_section
from Project.Sanad.core.logger import get_logger
# Module-level logger used by every function and class in this file.
log = get_logger("macro_player")
# Settings for this module, read from the "motion" / "macro_player" config
# section. Presumably a dict-like object: each lookup below supplies a literal
# fallback as the second argument to .get() — TODO confirm it is never None.
_MP = _cfg_section("motion", "macro_player")
# Step count handed to MacroPlayer._ramp for the ramp-in phase.
RAMP_IN_STEPS = _MP.get("ramp_in_steps", 60)
# Step count handed to MacroPlayer._ramp for the ramp-out phase.
RAMP_OUT_STEPS = _MP.get("ramp_out_steps", 60)
# Frame streaming aborts once the adapter's state_age() exceeds this value
# (the config key name indicates the unit is seconds).
WATCHDOG_DISABLE_AFTER = _MP.get("watchdog_disable_after_sec", 1.0)
def _lerp_q(a: list[float], b: list[float], t: float) -> list[float]:
return [x + (y - x) * t for x, y in zip(a, b)]
class _ArmAdapter:
"""Uniform interface over either the public arm API or its private members.
Hides the hasattr branching that previously polluted _play_motion. If the
arm controller exposes the new public methods (get_current_q, send_frame,
disable, state_age, wait_for_state) we use those; otherwise we fall back
to the private versions for backward compatibility.
"""
def __init__(self, arm):
self._arm = arm
def wait_for_state(self, timeout: float = 2.0) -> bool:
fn = getattr(self._arm, "wait_for_state", None)
if callable(fn):
return fn(timeout=timeout)
# No state-wait API: assume ready
return True
def get_current_q(self) -> list[float]:
fn = getattr(self._arm, "get_current_q", None)
if callable(fn):
return fn()
return self._arm._get_current_q()
def send_frame(self, target_q: list[float], body_lock_q: list[float]):
fn = getattr(self._arm, "send_frame", None)
if callable(fn):
return fn(target_q, body_lock_q)
return self._arm._send_frame(target_q, body_lock_q)
def disable(self):
fn = getattr(self._arm, "disable", None)
if callable(fn):
return fn()
return self._arm._disable_sdk()
def state_age(self) -> float:
fn = getattr(self._arm, "state_age", None)
if callable(fn):
return fn()
# No watchdog support: pretend state is fresh
return 0.0
class MacroPlayer:
    """Play a named macro: a WAV file and a JSONL motion recording in parallel.

    ``play`` starts one worker thread for audio and one for motion, then
    blocks until both have finished. Only one playback may run at a time.
    ``stop`` sets a shared event that the motion worker polls and, when the
    audio manager offers ``stop_playback``, asks it to stop as well.

    Both collaborators are optional: without an audio manager the audio
    worker logs a warning and returns; without an arm controller the motion
    worker only waits for the recording's duration (simulation).
    """

    def __init__(self, audio_manager=None, arm_controller=None):
        self._audio_mgr = audio_manager
        self._arm = arm_controller
        # Guards the check-and-set of _playing in play().
        self._lock = threading.Lock()
        self._playing = False
        # Set by stop(); cleared at the start of every play().
        self._stop_event = threading.Event()

    @property
    def is_playing(self) -> bool:
        """True while a call to ``play`` is in progress."""
        return self._playing

    def play(self, name: str) -> dict[str, Any]:
        """Play macro ``name`` and block until audio and motion both finish.

        Returns:
            ``{"name": name, "duration_sec": <wall time rounded to 2 places>}``.

        Raises:
            FileNotFoundError: the audio or motion file for ``name`` is missing.
            RuntimeError: another playback is already in progress.
        """
        audio_path = AUDIO_RECORDINGS_DIR / f"{name}.wav"
        motion_path = MOTION_RECORDINGS_DIR / f"{name}.jsonl"
        if not audio_path.exists():
            raise FileNotFoundError(f"Audio not found: {audio_path}")
        if not motion_path.exists():
            raise FileNotFoundError(f"Motion not found: {motion_path}")
        with self._lock:
            if self._playing:
                raise RuntimeError("Macro playback already in progress.")
            self._playing = True
            self._stop_event.clear()
        try:
            t0 = time.monotonic()
            audio_thread = threading.Thread(
                target=self._play_audio, args=(audio_path,), daemon=True
            )
            motion_thread = threading.Thread(
                target=self._play_motion, args=(motion_path,), daemon=True
            )
            log.info("Macro playback starting: %s", name)
            audio_thread.start()
            motion_thread.start()
            audio_thread.join()
            motion_thread.join()
            elapsed = time.monotonic() - t0
        except BaseException:
            # Interrupted (e.g. KeyboardInterrupt during join) or a thread
            # failed to start: tell any running worker to wind down.
            self._stop_event.set()
            raise
        finally:
            # Always release the "busy" flag; otherwise one interrupted
            # playback would make every later play() raise RuntimeError.
            with self._lock:
                self._playing = False
        log.info("Macro playback complete: %s (%.1fs)", name, elapsed)
        return {"name": name, "duration_sec": round(elapsed, 2)}

    def stop(self):
        """Request cancellation of the current playback (best effort)."""
        self._stop_event.set()
        # Best-effort: stop audio playback if the manager exposes a stop method
        if self._audio_mgr is not None and hasattr(self._audio_mgr, "stop_playback"):
            try:
                self._audio_mgr.stop_playback()
            except Exception as exc:
                log.warning("audio stop failed: %s", exc)

    def _play_audio(self, path: Path):
        """Audio worker: hand ``path`` to the audio manager, logging failures."""
        if self._audio_mgr is None:
            log.warning("No audio manager — skipping audio playback")
            return
        try:
            self._audio_mgr.play_wav(path)
        except Exception as exc:
            log.error("Audio playback failed: %s", exc)

    def _play_motion(self, path: Path):
        """Motion worker: ramp-in, stream recorded frames, ramp-out, disable.

        The pose read before playback is used both as the ramp-out target and
        as the ``body_lock_q`` argument passed with every frame. Once the
        first command may have been sent, ``disable()`` is always called,
        including when ramp-in is cancelled or fails.
        """
        frames = self._load_frames(path)
        if not frames:
            return
        if self._arm is None:
            # Simulated playback — wait out the recording (capped at 30 s),
            # waking early if stop() is called.
            duration = frames[-1].get("t", 0)
            log.info("[SIM] MacroPlayer would play %d frames over %.1fs", len(frames), duration)
            self._stop_event.wait(min(duration, 30.0))
            return
        adapter = _ArmAdapter(self._arm)
        interval = 1.0 / REPLAY_HZ
        # CRITICAL: wait for first LowState before reading current pose
        if not adapter.wait_for_state(timeout=2.0):
            log.error("MacroPlayer aborting — no LowState received in 2s")
            return
        try:
            current_q = adapter.get_current_q()
        except Exception:
            log.exception("Failed to read current pose")
            return
        body_lock_q = list(current_q)
        first_frame_q = frames[0]["q"]
        try:
            # Phase 1: Ramp-in from the current pose to the first frame
            if self._ramp(adapter, current_q, first_frame_q, body_lock_q,
                          RAMP_IN_STEPS, "ramp-in"):
                # Phase 2: Stream recorded frames with timing + watchdog
                last_q = (
                    self._stream_frames(adapter, frames, body_lock_q, interval)
                    or first_frame_q
                )
                # Phase 3: Ramp-out back to starting pose (returns at once,
                # without moving, if stop() has been requested)
                self._ramp(adapter, last_q, body_lock_q, body_lock_q,
                           RAMP_OUT_STEPS, "ramp-out")
        finally:
            # Phase 4: Disable arm SDK on every exit path after commands
            # may have been sent.
            try:
                adapter.disable()
            except Exception:
                log.exception("disable() failed")

    def _load_frames(self, path: Path) -> list[dict]:
        """Read a JSONL motion file and return its usable frames.

        A usable frame is a JSON object containing a ``"q"`` key. Blank
        lines, lines that are not valid JSON, JSON values that are not
        objects, and objects without ``"q"`` are skipped. If the file cannot
        be read or decoded, the frames collected before the error (possibly
        none) are returned.
        """
        frames: list[dict] = []
        try:
            with open(path, "r", encoding="utf-8") as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        data = json.loads(line)
                    except json.JSONDecodeError as exc:
                        log.warning("Skipping bad line in %s: %s", path.name, exc)
                        continue
                    if not isinstance(data, dict):
                        # Valid JSON but not an object (number, string, list):
                        # it cannot carry "q"/"t" fields.
                        log.warning("Skipping non-object line in %s", path.name)
                        continue
                    if "q" in data:
                        frames.append(data)
        except (OSError, UnicodeDecodeError):
            log.exception("Failed to read motion file %s", path)
        if not frames:
            log.warning("No usable frames in %s", path.name)
        return frames

    def _ramp(self, adapter: "_ArmAdapter", from_q: list[float], to_q: list[float],
              body_lock_q: list[float], steps: int, label: str) -> bool:
        """Interpolate linearly from ``from_q`` to ``to_q`` over ``steps`` frames.

        One frame is sent every ``1 / REPLAY_HZ`` seconds; the final frame
        uses ``t = 1``. Returns True when every step was sent, False if
        stop was requested or ``send_frame`` raised.
        """
        log.info("MacroPlayer %s (%d steps)", label, steps)
        interval = 1.0 / REPLAY_HZ
        for step in range(steps):
            if self._stop_event.is_set():
                return False
            t = (step + 1) / steps
            interp = _lerp_q(from_q, to_q, t)
            try:
                adapter.send_frame(interp, body_lock_q)
            except Exception:
                log.exception("%s send_frame failed", label)
                return False
            time.sleep(interval)
        return True

    def _stream_frames(self, adapter: "_ArmAdapter", frames: list[dict],
                       body_lock_q: list[float], interval: float) -> list[float] | None:
        """Send recorded frames at their recorded ``t`` offsets.

        Streaming ends early when stop is requested or when the adapter's
        state age exceeds ``WATCHDOG_DISABLE_AFTER``. A frame whose
        ``send_frame`` raises is logged and skipped. ``interval`` is accepted
        for signature compatibility but not used: timing comes from each
        frame's ``t``.

        Returns:
            The ``q`` of the last frame sent successfully, or None if no
            frame was sent.
        """
        t0 = time.monotonic()
        last_q: list[float] | None = None
        for frame in frames:
            if self._stop_event.is_set():
                break
            age = adapter.state_age()
            if age > WATCHDOG_DISABLE_AFTER:
                log.error("MacroPlayer watchdog abort — state stale %.2fs", age)
                break
            target_t = frame.get("t", 0)
            sleep_time = target_t - (time.monotonic() - t0)
            # Wait on the stop event rather than sleeping so that stop()
            # interrupts a long gap between frames immediately.
            if sleep_time > 0 and self._stop_event.wait(sleep_time):
                break
            try:
                adapter.send_frame(frame["q"], body_lock_q)
                last_q = frame["q"]
            except Exception:
                log.exception("send_frame failed mid-replay")
        return last_q

    def list_macros(self) -> list[dict[str, Any]]:
        """List available macro recordings (paired audio + motion).

        Scans the audio directory for ``*.wav`` files in sorted order and
        keeps those with a same-named ``.jsonl`` in the motion directory.
        Sizes are reported in KiB rounded to one decimal place.
        """
        macros = []
        for audio_path in sorted(AUDIO_RECORDINGS_DIR.glob("*.wav")):
            name = audio_path.stem
            motion_path = MOTION_RECORDINGS_DIR / f"{name}.jsonl"
            if motion_path.exists():
                macros.append({
                    "name": name,
                    "audio_path": str(audio_path),
                    "motion_path": str(motion_path),
                    "audio_size_kb": round(audio_path.stat().st_size / 1024, 1),
                    "motion_size_kb": round(motion_path.stat().st_size / 1024, 1),
                })
        return macros

    def status(self) -> dict[str, Any]:
        """Return ``{"playing": <bool>}`` for the current playback state."""
        return {"playing": self._playing}