# Sanadv3/motion/macro_recorder.py
#
# 199 lines
# 7.1 KiB
# Python

"""Macro Recorder — simultaneously captures audio + robot joint positions.
Produces a paired set of files:
recordings/audio/<name>.wav — microphone or Gemini output audio
recordings/motion/<name>.jsonl — timestamped joint positions
These can be replayed in sync via MacroPlayer.
"""
from __future__ import annotations
import json
import os
import tempfile
import threading
import time
import wave
from pathlib import Path
from typing import Any
from Project.Sanad.config import (
AUDIO_RECORDINGS_DIR,
CHANNELS,
CHUNK_SIZE,
MOTION_RECORDINGS_DIR,
RECEIVE_SAMPLE_RATE,
REPLAY_HZ,
)
from Project.Sanad.core.logger import get_logger
# Module-level logger used by every MacroRecorder method below.
log = get_logger("macro_recorder")
class MacroRecorder:
    """Record microphone audio and robot joint positions at the same time.

    ``start(name)`` launches two daemon threads (audio capture and joint
    sampling); ``stop()`` joins them and writes ``<name>.wav`` under
    ``AUDIO_RECORDINGS_DIR`` and ``<name>.jsonl`` under
    ``MOTION_RECORDINGS_DIR``.

    Args:
        arm_controller: Optional arm controller. When it is ``None`` or its
            ``_initialized`` attribute is missing/falsy, motion frames are
            filled with zeros.
    """

    # Number of joint values per motion frame; also written to the JSONL
    # meta header as "motors".
    NUM_MOTORS = 29

    def __init__(self, arm_controller=None):
        self._arm = arm_controller
        self._lock = threading.Lock()
        self._recording = False
        self._audio_thread: threading.Thread | None = None
        self._motion_thread: threading.Thread | None = None
        self._stop_event = threading.Event()
        self._name = ""
        self._audio_frames: list[bytes] = []
        self._motion_frames: list[dict[str, Any]] = []
        self._started_at = 0.0

    @property
    def is_recording(self) -> bool:
        """Whether a recording is currently in progress."""
        return self._recording

    def start(self, name: str) -> dict[str, Any]:
        """Begin recording a macro called *name*.

        Args:
            name: Stem used for both output files.
                NOTE(review): it is interpolated into the output paths
                unchanged, so callers must pass a safe file stem (no path
                separators) — confirm upstream validation.

        Returns:
            ``{"recording": True, "name": name}``.

        Raises:
            RuntimeError: If a recording is already in progress.
            OSError: If an output directory cannot be created.
        """
        with self._lock:
            if self._recording:
                raise RuntimeError("Already recording a macro.")
            # Create the output directories BEFORE claiming the busy flag, so
            # a filesystem error cannot leave the recorder marked as
            # recording with no worker threads running.
            AUDIO_RECORDINGS_DIR.mkdir(parents=True, exist_ok=True)
            MOTION_RECORDINGS_DIR.mkdir(parents=True, exist_ok=True)
            self._recording = True
            self._name = name
            self._stop_event.clear()
            self._audio_frames = []
            self._motion_frames = []
            self._started_at = time.monotonic()
            self._audio_thread = threading.Thread(
                target=self._record_audio, daemon=True
            )
            self._motion_thread = threading.Thread(
                target=self._record_motion, daemon=True
            )
            try:
                self._audio_thread.start()
                self._motion_thread.start()
            except Exception:
                # Roll back: stop whichever worker did start and release the
                # busy flag so a later start() is not wedged.
                self._stop_event.set()
                self._recording = False
                raise
        log.info("Macro recording started: %s", name)
        return {"recording": True, "name": name}

    def stop(self) -> dict[str, Any]:
        """Stop the recording and write both output files.

        Returns:
            A summary dict with the output paths, the duration in seconds and
            the number of audio/motion frames written.

        Raises:
            RuntimeError: If no recording is in progress.
            Exception: Any error raised while writing the files is
                propagated after the busy flag has been cleared.
        """
        with self._lock:
            if not self._recording:
                raise RuntimeError("No macro recording in progress.")
            self._stop_event.set()
        # Measure the duration at the moment stop was signalled, so the time
        # spent joining the workers and writing files is not counted.
        duration = time.monotonic() - self._started_at
        for worker in (self._audio_thread, self._motion_thread):
            if worker is not None:
                worker.join(timeout=3.0)
        audio_path = AUDIO_RECORDINGS_DIR / f"{self._name}.wav"
        motion_path = MOTION_RECORDINGS_DIR / f"{self._name}.jsonl"
        # Snapshot the buffers: a worker that outlived the join timeout may
        # still be appending, and the reported counts must match the files.
        audio_frames = list(self._audio_frames)
        motion_frames = list(self._motion_frames)
        # A failed write must NOT leave _recording=True forever (which would
        # wedge every future start()). Clear the busy flag in finally no
        # matter what; both files are written to a temp file and moved into
        # place with os.replace so a partial write can't surface a corrupt
        # recording.
        try:
            self._save_audio(audio_path, b"".join(audio_frames))
            self._save_motion(motion_path, motion_frames)
        finally:
            with self._lock:
                self._recording = False
        log.info(
            "Macro saved: audio=%s motion=%s (%.1fs)",
            audio_path, motion_path, duration,
        )
        return {
            "recording": False,
            "name": self._name,
            "audio_path": str(audio_path),
            "motion_path": str(motion_path),
            "duration_sec": round(duration, 2),
            "audio_frames": len(audio_frames),
            "motion_frames": len(motion_frames),
        }

    @staticmethod
    def _discard(tmp_path: str) -> None:
        """Best-effort removal of a temp file left by a failed write."""
        try:
            os.unlink(tmp_path)
        except OSError:
            pass

    def _save_audio(self, audio_path: Path, pcm: bytes) -> None:
        """Write *pcm* (16-bit samples) to *audio_path* as a WAV file."""
        tmp_audio = f"{audio_path}.tmp"
        try:
            with wave.open(tmp_audio, "wb") as wf:
                wf.setnchannels(CHANNELS)
                wf.setsampwidth(2)  # int16
                wf.setframerate(RECEIVE_SAMPLE_RATE)
                wf.writeframes(pcm)
            os.replace(tmp_audio, str(audio_path))
        except Exception:
            self._discard(tmp_audio)
            raise

    def _save_motion(self, motion_path: Path, frames: list[dict[str, Any]]) -> None:
        """Write a meta header line plus one JSON line per frame."""
        header = json.dumps({"meta": {"hz": REPLAY_HZ, "motors": self.NUM_MOTORS}})
        content_lines = [header, *(json.dumps(frame) for frame in frames)]
        content = ("\n".join(content_lines) + "\n").encode("utf-8")
        fd, tmp_motion = tempfile.mkstemp(
            prefix=f".{motion_path.name}.", suffix=".tmp",
            dir=str(motion_path.parent),
        )
        try:
            with os.fdopen(fd, "wb") as f:
                f.write(content)
            os.replace(tmp_motion, str(motion_path))
        except Exception:
            self._discard(tmp_motion)
            raise

    def _record_audio(self):
        """Capture mic audio in a background thread until stop is signalled.

        Failures are logged rather than raised; the stream and the PyAudio
        handle are released even when opening or reading fails.
        """
        try:
            import pyaudio
            pya = pyaudio.PyAudio()
            try:
                stream = pya.open(
                    format=pyaudio.paInt16,
                    channels=CHANNELS,
                    rate=RECEIVE_SAMPLE_RATE,
                    input=True,
                    frames_per_buffer=CHUNK_SIZE,
                )
                try:
                    while not self._stop_event.is_set():
                        data = stream.read(CHUNK_SIZE, exception_on_overflow=False)
                        self._audio_frames.append(data)
                finally:
                    try:
                        stream.stop_stream()
                    finally:
                        stream.close()
            finally:
                pya.terminate()
        except Exception as exc:
            log.error("Audio recording failed: %s", exc)

    def _record_motion(self):
        """Capture joint positions at REPLAY_HZ until stop is signalled."""
        interval = 1.0 / REPLAY_HZ
        arm = self._arm
        # Wait for the first LowState before sampling real hardware, otherwise
        # _get_current_q() returns the seed [0.0]*29 and we capture an all-zero
        # macro that body-locks the arms to zero on replay. Only relevant when a
        # live, initialized arm is present (sim path records zeros by design).
        if arm is not None and getattr(arm, "_initialized", False):
            wait = getattr(arm, "wait_for_state", None)
            if callable(wait) and not wait(timeout=2.0):
                log.error("Macro motion aborted — no LowState received in 2s")
                return
        try:
            t0 = time.monotonic()
            deadline = t0
            while not self._stop_event.is_set():
                t = round(time.monotonic() - t0, 4)
                # Read current joint positions from arm controller
                q = self._read_joint_positions()
                self._motion_frames.append({"t": t, "q": q})
                # Schedule against a running deadline so the time spent
                # sampling does not lower the effective rate below REPLAY_HZ.
                deadline += interval
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    # Fell behind; resynchronise instead of bursting frames.
                    deadline = time.monotonic()
                    continue
                # Event.wait returns True as soon as stop is signalled, so
                # the thread exits without sleeping out a full interval.
                if self._stop_event.wait(remaining):
                    break
        except Exception as exc:
            log.error("Motion recording failed: %s", exc)

    def _read_joint_positions(self) -> list[float]:
        """Return a snapshot of the current joint positions.

        Returns zeros (``NUM_MOTORS`` values) when there is no arm controller
        or it is not initialized. The controller's values are copied into a
        fresh list of floats so a stored frame cannot alias a buffer the
        controller keeps updating and is always JSON-serialisable.
        """
        arm = self._arm
        if arm is None or not getattr(arm, "_initialized", False):
            return [0.0] * self.NUM_MOTORS
        return [float(value) for value in arm._get_current_q()]

    def status(self) -> dict[str, Any]:
        """Return the recording flag, name, elapsed seconds and frame counts."""
        elapsed = time.monotonic() - self._started_at if self._recording else 0
        return {
            "recording": self._recording,
            "name": self._name,
            "elapsed_sec": round(elapsed, 1),
            "audio_frames": len(self._audio_frames),
            "motion_frames": len(self._motion_frames),
        }