"""Macro recording and playback endpoints.""" from __future__ import annotations import asyncio from pathlib import Path from fastapi import APIRouter, HTTPException from pydantic import BaseModel from Project.Sanad.config import AUDIO_RECORDINGS_DIR, MOTIONS_DIR from Project.Sanad.core.logger import get_logger log = get_logger("macros_route") router = APIRouter() class MacroName(BaseModel): name: str class ComboPlayPayload(BaseModel): audio_file: str = "" # filename under data/audio/ (or empty for none) motion_file: str = "" # DEPRECATED — use action_id. Still accepted for bare JSONL by filename. action_id: int | None = None # arm_controller action id (SDK built-in OR JSONL) — preferred speed: float = 1.0 @router.get("/") async def list_macros(): from Project.Sanad.main import macro_play if macro_play is None: return {"macros": []} return {"macros": macro_play.list_macros()} @router.get("/status") async def macro_status(): from Project.Sanad.main import macro_rec, macro_play return { "recorder": macro_rec.status() if macro_rec else {}, "player": macro_play.status() if macro_play else {}, } @router.post("/record/start") async def start_recording(payload: MacroName): from Project.Sanad.main import macro_rec if macro_rec is None: raise HTTPException(503, "Macro recorder not available.") return macro_rec.start(payload.name) @router.post("/record/stop") async def stop_recording(): import asyncio from Project.Sanad.main import macro_rec if macro_rec is None: raise HTTPException(503, "Macro recorder not available.") return await asyncio.to_thread(macro_rec.stop) @router.post("/play") async def play_macro(payload: MacroName): from Project.Sanad.main import brain return await brain.play_macro(payload.name) @router.post("/stop") async def stop_macro(): from Project.Sanad.main import macro_play if macro_play: macro_play.stop() return {"ok": True} # ─── Ad-hoc audio + motion combined playback ───────────────────────── # List the two catalogues so the dashboard can populate dropdowns, then # play the chosen pair in parallel (asyncio.gather) — same scheme the # Brain uses for `parallel`-mode skills, but ad-hoc instead of predefined. @router.get("/audio-files") async def list_audio_files(): """Enumerate playable audio files under data/audio/.""" AUDIO_RECORDINGS_DIR.mkdir(parents=True, exist_ok=True) files = [] for p in sorted(AUDIO_RECORDINGS_DIR.glob("*.wav")): try: files.append({ "name": p.name, "size_kb": round(p.stat().st_size / 1024, 1), }) except OSError: continue return {"files": files, "dir": str(AUDIO_RECORDINGS_DIR)} @router.get("/motion-files") async def list_motion_files(): """Enumerate playable .jsonl motions under data/motions/ (thin wrapper so the Macro Recorder dropdown doesn't have to call the replay route).""" MOTIONS_DIR.mkdir(parents=True, exist_ok=True) files = [] for p in sorted(MOTIONS_DIR.glob("*.jsonl")): try: files.append({ "name": p.name, "size_kb": round(p.stat().st_size / 1024, 1), }) except OSError: continue return {"files": files, "dir": str(MOTIONS_DIR)} @router.post("/stop-combined") async def stop_combined(): """Immediately stop any in-flight combined playback. - `arm.cancel()` — breaks the replay loop and triggers the smooth return-to-home ramp (see `_return_home` in arm_controller.py). - `audio_mgr.stop_playback()` — sends AUDIO_STOP_PLAY to the G1 chest speaker via DDS. Both run unconditionally so Stop works even if only one side was actually playing. """ from Project.Sanad.main import audio_mgr, arm result = {"motion_stopped": False, "audio_stopped": False} if arm is not None: try: arm.cancel() result["motion_stopped"] = True except Exception as exc: log.warning("stop-combined: arm.cancel failed: %s", exc) result["motion_error"] = str(exc) if audio_mgr is not None: try: audio_mgr.stop_playback() result["audio_stopped"] = True except Exception as exc: log.warning("stop-combined: audio stop failed: %s", exc) result["audio_error"] = str(exc) return {"ok": True, **result} @router.post("/play-combined") async def play_combined(payload: ComboPlayPayload): """Fire a user-picked audio clip and arm action in parallel. Motion dispatch is via `arm.trigger_by_id(action_id)` which handles BOTH SDK built-in actions (shake_hand, wave, …) and recorded JSONL replays. Audio goes through `audio_mgr.play_wav` (routed to the G1 chest speaker via DDS). Either side may be omitted. """ from Project.Sanad.main import audio_mgr, arm has_audio = bool(payload.audio_file) has_motion = payload.action_id is not None or bool(payload.motion_file) if not has_audio and not has_motion: raise HTTPException(400, "pick at least one of audio_file / action_id / motion_file") tasks = [] result: dict = { "audio_file": payload.audio_file, "action_id": payload.action_id, "motion_file": payload.motion_file, } if has_audio: if audio_mgr is None: raise HTTPException(503, "AudioManager not available") audio_path = (AUDIO_RECORDINGS_DIR / payload.audio_file).resolve() try: audio_path.relative_to(AUDIO_RECORDINGS_DIR.resolve()) except ValueError: raise HTTPException(400, "audio_file path traversal denied") if not audio_path.exists(): raise HTTPException(404, f"audio not found: {payload.audio_file}") async def _play_audio(): try: await asyncio.to_thread(audio_mgr.play_wav, audio_path) result["audio_played"] = audio_path.name except Exception as exc: log.exception("combined play: audio failed") result["audio_error"] = str(exc) tasks.append(_play_audio()) if has_motion: if arm is None: raise HTTPException(503, "ArmController not available") async def _play_motion(): try: if payload.action_id is not None: # SDK built-in OR JSONL — arm.trigger_by_id handles both await asyncio.to_thread(arm.trigger_by_id, int(payload.action_id), payload.speed) result["motion_played"] = f"action_id={payload.action_id}" else: # Legacy path: bare JSONL filename motion_path = (MOTIONS_DIR / payload.motion_file).resolve() try: motion_path.relative_to(MOTIONS_DIR.resolve()) except ValueError: result["motion_error"] = "motion_file path traversal denied" return if not motion_path.exists(): result["motion_error"] = f"motion not found: {payload.motion_file}" return await asyncio.to_thread(arm.replay_file, str(motion_path), payload.speed) result["motion_played"] = motion_path.name except Exception as exc: log.exception("combined play: motion failed") result["motion_error"] = str(exc) tasks.append(_play_motion()) await asyncio.gather(*tasks) return {"ok": True, **result}