72 lines
2.2 KiB
Python
72 lines
2.2 KiB
Python
"""Motion endpoints — arm actions, replay management."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
|
|
from fastapi import APIRouter, HTTPException
|
|
from pydantic import BaseModel
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
@router.get("/status")
|
|
async def motion_status():
|
|
from Project.Sanad.main import arm
|
|
return arm.status() if arm else {"error": "Arm not attached"}
|
|
|
|
|
|
@router.get("/actions")
|
|
async def list_actions():
|
|
from Project.Sanad.main import arm
|
|
return {"actions": arm.list_actions() if arm else []}
|
|
|
|
|
|
class TriggerPayload(BaseModel):
|
|
action_id: int | None = None
|
|
action_name: str | None = None
|
|
speed: float = 1.0
|
|
|
|
|
|
@router.post("/trigger")
|
|
async def trigger_action(payload: TriggerPayload):
|
|
from Project.Sanad.main import arm
|
|
if arm is None:
|
|
raise HTTPException(503, "Arm controller not attached.")
|
|
|
|
speed = max(0.1, min(payload.speed, 5.0))
|
|
|
|
# NOTE: TOCTOU on arm.is_busy is unavoidable from the route layer.
|
|
# The internal arm controller has its own _lock + _is_busy guard inside
|
|
# _execute() that returns silently if busy. We rely on that.
|
|
if payload.action_id is not None:
|
|
try:
|
|
await asyncio.to_thread(arm.trigger_by_id, payload.action_id, speed)
|
|
except KeyError as exc:
|
|
raise HTTPException(404, str(exc))
|
|
return {"ok": True, "action_id": payload.action_id, "speed": speed}
|
|
elif payload.action_name:
|
|
try:
|
|
await asyncio.to_thread(arm.trigger_by_name, payload.action_name, speed)
|
|
except KeyError as exc:
|
|
raise HTTPException(404, str(exc))
|
|
return {"ok": True, "action_name": payload.action_name, "speed": speed}
|
|
else:
|
|
raise HTTPException(400, "Provide action_id or action_name.")
|
|
|
|
|
|
@router.post("/cancel")
|
|
async def cancel_motion():
|
|
from Project.Sanad.main import arm
|
|
if arm is None:
|
|
raise HTTPException(503, "Arm controller not attached.")
|
|
arm.cancel()
|
|
return {"ok": True, "cancelled": True}
|
|
|
|
|
|
@router.post("/gestural-speaking")
|
|
async def toggle_gestural(enabled: bool = True):
|
|
from Project.Sanad.main import brain
|
|
brain.set_gestural_speaking(enabled)
|
|
return {"gestural_speaking": brain.gestural_speaking}
|