# File-viewer header captured along with the source (not part of the module):
# 180 lines, 5.6 KiB, Python

"""Mask Face tab — Shining LED face mask control (BLE).
Routes live under /api/mask. Backed by the FaceController subsystem
(face/mask_face.py), which owns a dedicated asyncio loop + BLE connection to the
standalone Mask project's `shiningmask` library.
Every handler is failure-safe: if the subsystem or its library is unavailable it
returns 503 (GET /status returns a degraded body) rather than crash the
dashboard. FaceController raises RuntimeError for "not connected" / "face not
started"; those map to 409. Blocking BLE calls run in a thread pool so the event
loop stays responsive.
"""
from __future__ import annotations
import asyncio
from typing import List, Optional
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel
from Project.Sanad.core.logger import get_logger
# Module-level logger; "mask_routes" is the name handed to the project's get_logger.
log = get_logger("mask_routes")
# Router that every handler in this module registers on. The module docstring
# says the routes live under /api/mask; no prefix is set here, so presumably it
# is applied wherever this router is included — confirm against the app setup.
router = APIRouter()
# ── lazy subsystem accessor ─────────────────────────────────
def _get_face():
try:
from Project.Sanad.main import mask_face # type: ignore
return mask_face
except Exception:
return None
def _require():
    """Return the mask-face subsystem or abort the request with HTTP 503.

    Raises:
        HTTPException: status 503 when ``_get_face()`` returns ``None``.
    """
    face = _get_face()
    if face is not None:
        return face
    raise HTTPException(503, "Mask face subsystem unavailable.")
def _run(fn, *args, **kwargs):
"""Call a FaceController method, mapping its errors to HTTP status codes."""
try:
return fn(*args, **kwargs)
except HTTPException:
raise
except RuntimeError as exc:
raise HTTPException(409, str(exc))
except Exception as exc: # noqa: BLE001
log.exception("mask operation failed")
raise HTTPException(500, str(exc))
# ── status ──────────────────────────────────────────────────
@router.get("/status")
async def status():
    """Report the mask status; never raises.

    Returns:
        The controller's status mapping with ``"available": True`` added.
        If the subsystem cannot be obtained, or if querying its status fails,
        a degraded body is returned instead, with ``available``, ``connected``
        and ``lib_available`` all ``False`` and the reason in ``last_error``.
    """
    mf = _get_face()
    if mf is None:
        return {"available": False, "connected": False, "lib_available": False,
                "last_error": "mask face subsystem not constructed"}
    try:
        # Run in a worker thread so a slow status call cannot stall the loop.
        s = await asyncio.to_thread(mf.status)
        # Build a new dict instead of writing into the controller's own object.
        return {**s, "available": True}
    except Exception as exc:  # noqa: BLE001
        # Honour the "never raises" contract: log and degrade rather than 500.
        log.exception("mask status query failed")
        return {"available": False, "connected": False, "lib_available": False,
                "last_error": str(exc)}
# ── connection ──────────────────────────────────────────────
@router.post("/connect")
async def connect(
    timeout: Optional[float] = Query(None),
    attempts: Optional[int] = Query(None),
):
    """Ask the mask-face controller to connect.

    Both query parameters are optional and are handed to the controller's
    ``connect`` unchanged (``None`` when omitted). The call runs in a worker
    thread; failures surface as 503 via ``_require`` or 409/500 via ``_run``.
    """
    face = _require()
    result = await asyncio.to_thread(_run, face.connect, timeout, attempts)
    return result
@router.post("/disconnect")
async def disconnect():
    """Ask the mask-face controller to disconnect.

    Runs the controller's ``disconnect`` in a worker thread and returns its
    result; failures surface as 503 via ``_require`` or 409/500 via ``_run``.
    """
    face = _require()
    result = await asyncio.to_thread(_run, face.disconnect)
    return result
# ── simple commands ─────────────────────────────────────────
@router.post("/brightness")
async def brightness(level: int = Query(..., ge=0, le=255)):
    """Set the mask brightness.

    ``level`` is a required query parameter constrained to 0..255; it is
    forwarded to the controller's ``set_brightness`` in a worker thread.
    """
    face = _require()
    setter = face.set_brightness
    return await asyncio.to_thread(_run, setter, level)
# JSON request body accepted by POST /text. Every field has a default, so an
# empty object is a valid body. No length or range checks are declared on the
# RGB lists or on speed here; the values are forwarded as given.
class TextBody(BaseModel):
    text: str = ""  # text to show; defaults to the empty string
    color: List[int] = [255, 255, 255]  # text RGB; the /text handler converts it to a tuple
    mode: Optional[int] = None  # forwarded to set_text unchanged; meaning not visible in this file
    bg: Optional[List[int]] = None # background RGB (None -> black)
    speed: Optional[int] = None # scroll speed 0-255 (None -> firmware default)
@router.post("/text")
async def text(body: TextBody):
    """Show text on the mask.

    ``color`` is converted to a tuple; ``bg`` is converted to a tuple when it
    is a non-empty list and is passed as ``None`` otherwise. ``text``, ``mode``
    and ``speed`` are forwarded unchanged to the controller's ``set_text``,
    which runs in a worker thread.
    """
    face = _require()
    background = None
    if body.bg:
        background = tuple(body.bg)
    foreground = tuple(body.color)
    return await asyncio.to_thread(
        _run, face.set_text, body.text, foreground, body.mode, background, body.speed
    )
@router.post("/image")
async def image(id: int = Query(...)):
    """Show an image on the mask, selected by the required ``id`` query parameter.

    The id is forwarded to the controller's ``show_image`` in a worker thread.
    """
    face = _require()
    outcome = await asyncio.to_thread(_run, face.show_image, id)
    return outcome
@router.post("/animation")
async def animation(id: int = Query(...)):
    """Play an animation on the mask, selected by the required ``id`` query parameter.

    The id is forwarded to the controller's ``play_animation`` in a worker thread.
    """
    face = _require()
    outcome = await asyncio.to_thread(_run, face.play_animation, id)
    return outcome
@router.post("/clear")
async def clear():
    """Invoke the controller's ``clear_diy`` in a worker thread and return its result."""
    face = _require()
    outcome = await asyncio.to_thread(_run, face.clear_diy)
    return outcome
# ── animated face ───────────────────────────────────────────
@router.post("/face/start")
async def face_start(reload: bool = Query(False)):
    """Start the animated face.

    The optional ``reload`` query flag (default ``False``) is forwarded to the
    controller's ``face_start``, which runs in a worker thread.
    """
    face = _require()
    starter = face.face_start
    return await asyncio.to_thread(_run, starter, reload)
@router.post("/face/stop")
async def face_stop():
    """Stop the animated face by running the controller's ``face_stop`` in a worker thread."""
    face = _require()
    stopper = face.face_stop
    return await asyncio.to_thread(_run, stopper)
@router.post("/face/return")
async def face_return():
    """Go back to the live animated face once a text/image/animation override is done."""
    face = _require()
    outcome = await asyncio.to_thread(_run, face.return_face)
    return outcome
# JSON request body accepted by POST /face/color. All three fields are optional
# and are passed to set_face_color exactly as received (lists, not tuples).
# NOTE(review): presumably None means "leave that part unchanged" — confirm
# against FaceController.set_face_color.
class FaceColorBody(BaseModel):
    eye: Optional[List[int]] = None # eye/iris RGB
    mouth: Optional[List[int]] = None # mouth RGB
    sclera: Optional[List[int]] = None # white-of-the-eye RGB
@router.post("/face/color")
async def face_color(body: FaceColorBody):
    """Change the animated face's colours (the frame set is re-uploaded when the face is live).

    ``eye``, ``mouth`` and ``sclera`` are forwarded as received to the
    controller's ``set_face_color``, which runs in a worker thread.
    """
    face = _require()
    eye, mouth_rgb, sclera = body.eye, body.mouth, body.sclera
    return await asyncio.to_thread(_run, face.set_face_color, eye, mouth_rgb, sclera)
@router.post("/speaking")
async def speaking(on: bool = Query(...)):
    """Set the speaking flag.

    The required boolean query parameter ``on`` is forwarded to the
    controller's ``set_speaking``, which runs in a worker thread.
    """
    face = _require()
    outcome = await asyncio.to_thread(_run, face.set_speaking, on)
    return outcome
@router.post("/mouth")
async def mouth(level: int = Query(..., ge=0, le=3)):
    """Set the mouth level.

    ``level`` is a required query parameter constrained to 0..3; it is
    forwarded to the controller's ``set_mouth`` in a worker thread.
    """
    face = _require()
    setter = face.set_mouth
    return await asyncio.to_thread(_run, setter, level)
@router.post("/expression/{name}")
async def expression(name: str):
    """Show the expression named by the ``name`` path segment.

    The name is forwarded as-is to the controller's ``show_expression``,
    which runs in a worker thread.
    """
    face = _require()
    outcome = await asyncio.to_thread(_run, face.show_expression, name)
    return outcome