164 lines
5.5 KiB
Python
164 lines
5.5 KiB
Python
"""Macro Recorder — simultaneously captures audio + robot joint positions.
|
|
|
|
Produces a paired set of files:
|
|
recordings/audio/<name>.wav — microphone or Gemini output audio
|
|
recordings/motion/<name>.jsonl — timestamped joint positions
|
|
|
|
These can be replayed in sync via MacroPlayer.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import threading
|
|
import time
|
|
import wave
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from Project.Sanad.config import (
|
|
AUDIO_RECORDINGS_DIR,
|
|
CHANNELS,
|
|
CHUNK_SIZE,
|
|
MOTION_RECORDINGS_DIR,
|
|
RECEIVE_SAMPLE_RATE,
|
|
REPLAY_HZ,
|
|
)
|
|
from Project.Sanad.core.logger import get_logger
|
|
|
|
# Module-level logger used by MacroRecorder for start/stop and failure messages.
log = get_logger("macro_recorder")
|
|
|
|
|
|
class MacroRecorder:
    """Record microphone audio and arm joint positions at the same time.

    ``start(name)`` launches two daemon threads: one appends raw int16 PCM
    chunks read through PyAudio, the other samples joint positions at
    ``REPLAY_HZ``.  ``stop()`` ends both threads and writes
    ``AUDIO_RECORDINGS_DIR/<name>.wav`` and
    ``MOTION_RECORDINGS_DIR/<name>.jsonl``.

    Args:
        arm_controller: Optional controller object.  When it is given and its
            ``_initialized`` attribute is truthy, joint positions are read
            from its ``_get_current_q()`` method; otherwise every motion
            frame is filled with zeros.
    """

    # Number of joint values in the zero-filled fallback frame; also written
    # to the ``meta`` header line of the motion file.
    NUM_MOTORS = 29
    # Seconds stop() waits for each worker thread before giving up on it.
    JOIN_TIMEOUT_SEC = 3.0

    def __init__(self, arm_controller=None):
        self._arm = arm_controller
        self._lock = threading.Lock()  # guards the _recording state transitions
        self._recording = False
        self._audio_thread: threading.Thread | None = None
        self._motion_thread: threading.Thread | None = None
        self._stop_event = threading.Event()
        self._name = ""
        self._audio_frames: list[bytes] = []
        self._motion_frames: list[dict[str, Any]] = []
        self._started_at = 0.0

    @property
    def is_recording(self) -> bool:
        """Whether a recording is currently active (or still being saved)."""
        return self._recording

    def start(self, name: str) -> dict[str, Any]:
        """Begin a new recording and launch the two capture threads.

        Args:
            name: Base file name (without extension) used for both output
                files when ``stop()`` is called.

        Returns:
            ``{"recording": True, "name": name}``.

        Raises:
            RuntimeError: If a recording is already in progress.
            OSError: If an output directory cannot be created.  The recorder
                is returned to the idle state before the error propagates.
        """
        with self._lock:
            if self._recording:
                raise RuntimeError("Already recording a macro.")
            self._recording = True
            self._name = name
            self._stop_event.clear()
            self._audio_frames = []
            self._motion_frames = []
            self._started_at = time.monotonic()

        try:
            AUDIO_RECORDINGS_DIR.mkdir(parents=True, exist_ok=True)
            MOTION_RECORDINGS_DIR.mkdir(parents=True, exist_ok=True)

            self._audio_thread = threading.Thread(target=self._record_audio, daemon=True)
            self._motion_thread = threading.Thread(target=self._record_motion, daemon=True)
            self._audio_thread.start()
            self._motion_thread.start()
        except BaseException:
            # Roll back, otherwise the recorder would report "recording"
            # forever with no workers running.  Setting the event also ends a
            # worker that did manage to start before the failure.
            self._stop_event.set()
            with self._lock:
                self._recording = False
            raise

        log.info("Macro recording started: %s", name)
        return {"recording": True, "name": name}

    def stop(self) -> dict[str, Any]:
        """Stop the capture threads and write the audio and motion files.

        Returns:
            A dict with ``recording`` (always ``False``), ``name``,
            ``audio_path``, ``motion_path``, ``duration_sec`` (time between
            ``start()`` and this call, rounded to 2 decimals),
            ``audio_frames`` and ``motion_frames`` (numbers of saved frames).

        Raises:
            RuntimeError: If no recording is in progress, or another call to
                ``stop()`` is already saving the current recording.
            OSError: If writing an output file fails.  The recorder is
                returned to the idle state even then.
        """
        with self._lock:
            if not self._recording:
                raise RuntimeError("No macro recording in progress.")
            if self._stop_event.is_set():
                # _recording is still True but the event is set: a concurrent
                # stop() owns the save, so do not write the files twice.
                raise RuntimeError("Macro recording is already being stopped.")
            self._stop_event.set()
            name = self._name
            # Measured here so joining and file writing do not inflate it.
            duration = time.monotonic() - self._started_at

        try:
            workers = (("audio", self._audio_thread), ("motion", self._motion_thread))
            for label, worker in workers:
                if worker is None:
                    continue
                worker.join(timeout=self.JOIN_TIMEOUT_SEC)
                if worker.is_alive():
                    log.error(
                        "Macro %s thread did not stop within %.1fs",
                        label,
                        self.JOIN_TIMEOUT_SEC,
                    )

            # Snapshot so a worker that outlived the join timeout cannot
            # change what is written or make the reported counts disagree.
            audio_frames = list(self._audio_frames)
            motion_frames = list(self._motion_frames)

            # Save audio
            audio_path = AUDIO_RECORDINGS_DIR / f"{name}.wav"
            with wave.open(str(audio_path), "wb") as wf:
                wf.setnchannels(CHANNELS)
                wf.setsampwidth(2)  # int16, matches pyaudio.paInt16 capture
                wf.setframerate(RECEIVE_SAMPLE_RATE)
                wf.writeframes(b"".join(audio_frames))

            # Save motion: one header line, then one JSON object per sample
            motion_path = MOTION_RECORDINGS_DIR / f"{name}.jsonl"
            header = {"meta": {"hz": REPLAY_HZ, "motors": self.NUM_MOTORS}}
            with open(motion_path, "w", encoding="utf-8") as f:
                f.write(json.dumps(header) + "\n")
                f.writelines(json.dumps(frame) + "\n" for frame in motion_frames)
        finally:
            # Always return to idle so a failed save does not block the next
            # start().
            with self._lock:
                self._recording = False

        log.info("Macro saved: audio=%s motion=%s (%.1fs)", audio_path, motion_path, duration)
        return {
            "recording": False,
            "name": name,
            "audio_path": str(audio_path),
            "motion_path": str(motion_path),
            "duration_sec": round(duration, 2),
            "audio_frames": len(audio_frames),
            "motion_frames": len(motion_frames),
        }

    def _record_audio(self):
        """Capture mic audio in a background thread until the stop event is set.

        Any failure (PyAudio missing, device error, ...) is logged rather
        than raised; the recording then simply ends up with fewer or no
        audio frames.
        """
        # Bind this recording's buffer and event once so the loop keeps
        # writing to the same list even if the attributes are rebound later.
        frames = self._audio_frames
        stop_event = self._stop_event
        try:
            import pyaudio  # imported lazily; only needed while recording

            pya = pyaudio.PyAudio()
            try:
                stream = pya.open(
                    format=pyaudio.paInt16,
                    channels=CHANNELS,
                    rate=RECEIVE_SAMPLE_RATE,
                    input=True,
                    frames_per_buffer=CHUNK_SIZE,
                )
                try:
                    while not stop_event.is_set():
                        frames.append(stream.read(CHUNK_SIZE, exception_on_overflow=False))
                finally:
                    # Release the device even when read() raised.
                    try:
                        stream.stop_stream()
                    finally:
                        stream.close()
            finally:
                pya.terminate()
        except Exception as exc:
            log.error("Audio recording failed: %s", exc)

    def _record_motion(self):
        """Capture joint positions at REPLAY_HZ until the stop event is set.

        Each frame is ``{"t": seconds since this thread started, "q": joint
        positions}``.  Failures are logged rather than raised, mirroring
        ``_record_audio``.
        """
        frames = self._motion_frames
        stop_event = self._stop_event
        try:
            interval = 1.0 / REPLAY_HZ
            t0 = time.monotonic()
            next_tick = t0
            while not stop_event.is_set():
                t = round(time.monotonic() - t0, 4)
                # Read current joint positions from arm controller
                q = self._read_joint_positions()
                frames.append({"t": t, "q": q})

                # Schedule against absolute deadlines so the time spent
                # reading the joints does not accumulate as drift.
                next_tick += interval
                now = time.monotonic()
                if next_tick < now:
                    next_tick = now  # fell behind: resync instead of bursting
                # wait() instead of sleep() so stop() wakes the thread at once.
                stop_event.wait(next_tick - now)
        except Exception as exc:
            log.error("Motion recording failed: %s", exc)

    def _read_joint_positions(self) -> list[float]:
        """Read current joint positions. Returns zeros if no initialized arm."""
        if self._arm is not None and self._arm._initialized:
            return self._arm._get_current_q()
        return [0.0] * self.NUM_MOTORS

    def status(self) -> dict[str, Any]:
        """Return a snapshot of the recorder state.

        Returns:
            A dict with ``recording``, ``name``, ``elapsed_sec`` (seconds
            since ``start()``, rounded to 1 decimal; zero when idle),
            ``audio_frames`` and ``motion_frames`` (frames captured so far).
        """
        with self._lock:
            recording = self._recording
            name = self._name
            started_at = self._started_at
        elapsed = time.monotonic() - started_at if recording else 0.0
        return {
            "recording": recording,
            "name": name,
            "elapsed_sec": round(elapsed, 1),
            "audio_frames": len(self._audio_frames),
            "motion_frames": len(self._motion_frames),
        }
|