220 lines
7.7 KiB
Python

"""Macro recording and playback endpoints."""
from __future__ import annotations
import asyncio
from pathlib import Path
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from Project.Sanad.config import AUDIO_RECORDINGS_DIR, MOTIONS_DIR
from Project.Sanad.core.logger import get_logger
log = get_logger("macros_route")
router = APIRouter()
class MacroName(BaseModel):
name: str
class ComboPlayPayload(BaseModel):
audio_file: str = "" # filename under data/audio/ (or empty for none)
motion_file: str = "" # DEPRECATED — use action_id. Still accepted for bare JSONL by filename.
action_id: int | None = None # arm_controller action id (SDK built-in OR JSONL) — preferred
speed: float = 1.0
@router.get("/")
async def list_macros():
from Project.Sanad.main import macro_play
if macro_play is None:
return {"macros": []}
return {"macros": macro_play.list_macros()}
@router.get("/status")
async def macro_status():
from Project.Sanad.main import macro_rec, macro_play
return {
"recorder": macro_rec.status() if macro_rec else {},
"player": macro_play.status() if macro_play else {},
}
@router.post("/record/start")
async def start_recording(payload: MacroName):
from Project.Sanad.main import macro_rec
if macro_rec is None:
raise HTTPException(503, "Macro recorder not available.")
return macro_rec.start(payload.name)
@router.post("/record/stop")
async def stop_recording():
import asyncio
from Project.Sanad.main import macro_rec
if macro_rec is None:
raise HTTPException(503, "Macro recorder not available.")
return await asyncio.to_thread(macro_rec.stop)
@router.post("/play")
async def play_macro(payload: MacroName):
from Project.Sanad.main import brain
return await brain.play_macro(payload.name)
@router.post("/stop")
async def stop_macro():
from Project.Sanad.main import macro_play
if macro_play:
macro_play.stop()
return {"ok": True}
# ─── Ad-hoc audio + motion combined playback ─────────────────────────
# List the two catalogues so the dashboard can populate dropdowns, then
# play the chosen pair in parallel (asyncio.gather) — same scheme the
# Brain uses for `parallel`-mode skills, but ad-hoc instead of predefined.
@router.get("/audio-files")
async def list_audio_files():
"""Enumerate playable audio files under data/audio/."""
AUDIO_RECORDINGS_DIR.mkdir(parents=True, exist_ok=True)
files = []
for p in sorted(AUDIO_RECORDINGS_DIR.glob("*.wav")):
try:
files.append({
"name": p.name,
"size_kb": round(p.stat().st_size / 1024, 1),
})
except OSError:
continue
return {"files": files, "dir": str(AUDIO_RECORDINGS_DIR)}
@router.get("/motion-files")
async def list_motion_files():
"""Enumerate playable .jsonl motions under data/motions/ (thin wrapper
so the Macro Recorder dropdown doesn't have to call the replay route)."""
MOTIONS_DIR.mkdir(parents=True, exist_ok=True)
files = []
for p in sorted(MOTIONS_DIR.glob("*.jsonl")):
try:
files.append({
"name": p.name,
"size_kb": round(p.stat().st_size / 1024, 1),
})
except OSError:
continue
return {"files": files, "dir": str(MOTIONS_DIR)}
@router.post("/stop-combined")
async def stop_combined():
"""Immediately stop any in-flight combined playback.
- `arm.cancel()` — breaks the replay loop and triggers the smooth
return-to-home ramp (see `_return_home` in arm_controller.py).
- `audio_mgr.stop_playback()` — sends AUDIO_STOP_PLAY to the G1
chest speaker via DDS.
Both run unconditionally so Stop works even if only one side was
actually playing.
"""
from Project.Sanad.main import audio_mgr, arm
result = {"motion_stopped": False, "audio_stopped": False}
if arm is not None:
try:
arm.cancel()
result["motion_stopped"] = True
except Exception as exc:
log.warning("stop-combined: arm.cancel failed: %s", exc)
result["motion_error"] = str(exc)
if audio_mgr is not None:
try:
audio_mgr.stop_playback()
result["audio_stopped"] = True
except Exception as exc:
log.warning("stop-combined: audio stop failed: %s", exc)
result["audio_error"] = str(exc)
return {"ok": True, **result}
@router.post("/play-combined")
async def play_combined(payload: ComboPlayPayload):
"""Fire a user-picked audio clip and arm action in parallel.
Motion dispatch is via `arm.trigger_by_id(action_id)` which handles
BOTH SDK built-in actions (shake_hand, wave, …) and recorded JSONL
replays. Audio goes through `audio_mgr.play_wav` (routed to the G1
chest speaker via DDS). Either side may be omitted.
"""
from Project.Sanad.main import audio_mgr, arm
has_audio = bool(payload.audio_file)
has_motion = payload.action_id is not None or bool(payload.motion_file)
if not has_audio and not has_motion:
raise HTTPException(400, "pick at least one of audio_file / action_id / motion_file")
tasks = []
result: dict = {
"audio_file": payload.audio_file,
"action_id": payload.action_id,
"motion_file": payload.motion_file,
}
if has_audio:
if audio_mgr is None:
raise HTTPException(503, "AudioManager not available")
audio_path = (AUDIO_RECORDINGS_DIR / payload.audio_file).resolve()
try:
audio_path.relative_to(AUDIO_RECORDINGS_DIR.resolve())
except ValueError:
raise HTTPException(400, "audio_file path traversal denied")
if not audio_path.exists():
raise HTTPException(404, f"audio not found: {payload.audio_file}")
async def _play_audio():
try:
await asyncio.to_thread(audio_mgr.play_wav, audio_path)
result["audio_played"] = audio_path.name
except Exception as exc:
log.exception("combined play: audio failed")
result["audio_error"] = str(exc)
tasks.append(_play_audio())
if has_motion:
if arm is None:
raise HTTPException(503, "ArmController not available")
async def _play_motion():
try:
if payload.action_id is not None:
# SDK built-in OR JSONL — arm.trigger_by_id handles both
await asyncio.to_thread(arm.trigger_by_id,
int(payload.action_id),
payload.speed)
result["motion_played"] = f"action_id={payload.action_id}"
else:
# Legacy path: bare JSONL filename
motion_path = (MOTIONS_DIR / payload.motion_file).resolve()
try:
motion_path.relative_to(MOTIONS_DIR.resolve())
except ValueError:
result["motion_error"] = "motion_file path traversal denied"
return
if not motion_path.exists():
result["motion_error"] = f"motion not found: {payload.motion_file}"
return
await asyncio.to_thread(arm.replay_file, str(motion_path), payload.speed)
result["motion_played"] = motion_path.name
except Exception as exc:
log.exception("combined play: motion failed")
result["motion_error"] = str(exc)
tasks.append(_play_motion())
await asyncio.gather(*tasks)
return {"ok": True, **result}