Sanadv3/dashboard/routes/mask_social.py

396 lines
14 KiB
Python

"""Social-media / QR display on the LED mask.
Renders a QR code (for a preset Instagram account) or an uploaded image onto the
mask's 46x58 display and holds it via the FaceController's reserved scratch slot
until the animated face is resumed. The shared helper :func:`show_social_on_mask`
is also called from the Gemini ``[[SHOW:account]]`` relay wired in ``main.py``.
Routes (under /api/mask):
POST /social/{account} -> show a preset Instagram QR
POST /qr -> upload an image (QR or any picture) + show it
POST /face/resume -> stop showing the scratch image, return to the face
GET /social -> list the preset accounts
"""
from __future__ import annotations
import asyncio
import io
import logging
import os
import sys
from pathlib import Path
import re
from fastapi import APIRouter, File, HTTPException, Query, UploadFile
from fastapi.responses import FileResponse
log = logging.getLogger("sanad.mask_social")
router = APIRouter() # prefix "/api/mask" supplied by dashboard/app.py _REST_ROUTES
# Preset Instagram accounts the mask can show as a QR. The mask is a low-res
# 46x58 panel, so a full-URL QR is dense; the black margin acts as the quiet
# zone and we scale modules crisply (NEAREST) to give it the best chance.
SOCIAL = {
"bu_sunaidah": {"handle": "@bu.sunaidah",
"url": "https://instagram.com/bu.sunaidah",
"short": "da.gd/VMkH8J"}, # -> instagram.com/bu.sunaidah (v1 QR)
"yslootahtech": {"handle": "@yslootahtech",
"url": "https://instagram.com/yslootahtech",
"short": "da.gd/Qr8RO"}, # -> instagram.com/yslootahtech (v1 QR)
}
def _ensure_mask_path() -> None:
"""Make the flat Mask lib (colorface) importable from this route — using the
SAME dir the FaceController resolved (the Mask lib lives outside the repo)."""
d = os.environ.get("SANAD_MASK_DIR")
if not d:
try:
from Project.Sanad.main import mask_face as _mf # type: ignore
d = getattr(_mf, "mask_dir", None)
except Exception:
d = None
if not d:
d = str(Path(__file__).resolve().parents[2] / "Mask")
if d and d not in sys.path:
sys.path.insert(0, d)
def _get_face():
from Project.Sanad.main import mask_face # type: ignore
if mask_face is None:
raise HTTPException(status_code=503, detail="mask face unavailable")
return mask_face
_EYE_BAND = 16 # top rows reserved for the cyan eyes; the code sits below them
def _compose_under_eyes(inner) -> bytes:
"""Draw two cyan eyes across the top and place ``inner`` (a QR / image) in the
area BELOW them, then encode for the mask. Keeps the panel looking like a face
with a code under the eyes instead of a full-screen QR."""
_ensure_mask_path()
import colorface as cf
from PIL import Image, ImageDraw
W, H = cf.DISPLAY_W, cf.DISPLAY_H
inner = inner.convert("RGB")
iw, ih = inner.size
# keep the code a small badge under the eyes (~70% of the space below them)
target = max(20, int(min(W, H - _EYE_BAND - 1) * 0.72))
if iw <= target and ih <= target:
s = max(1, min(target // iw, target // ih)) # crisp integer up-scale (QR)
nw, nh = iw * s, ih * s
else:
s = min(target / iw, target / ih) # scale big images down
nw, nh = max(1, int(iw * s)), max(1, int(ih * s))
inner = inner.resize((nw, nh), Image.NEAREST)
canvas = Image.new("RGB", (W, H), (0, 0, 0))
g = ImageDraw.Draw(canvas)
eye = cf.DEFAULT_EYE
for cx in (W // 2 - 10, W // 2 + 10): # two eyes at the top
g.ellipse([cx - 5, 3, cx + 5, 13], fill=(255, 255, 255))
g.ellipse([cx - 3, 5, cx + 3, 11], fill=eye)
g.ellipse([cx - 1, 7, cx + 1, 10], fill=(0, 0, 0))
x = (W - nw) // 2
y = _EYE_BAND + (H - _EYE_BAND - nh) // 2
canvas.paste(inner, (max(0, x), max(_EYE_BAND, y)))
return cf.encode(canvas)
def _qr_bytes(url: str) -> bytes:
"""Render a QR for ``url`` FULL-SCREEN with the largest crisp (integer) module
size the 46-wide panel allows — the only way it has any chance of scanning.
Only a ~version-1 QR (<=17 chars) reaches ~2 px/module; longer data is denser
and won't scan. Returns (bytes, qr_version)."""
_ensure_mask_path()
import qrcode
from PIL import Image
import colorface as cf
W, H = cf.DISPLAY_W, cf.DISPLAY_H
qr = qrcode.QRCode(error_correction=qrcode.constants.ERROR_CORRECT_L,
box_size=1, border=1)
qr.add_data(url)
qr.make(fit=True)
q = qr.make_image(fill_color=(255, 255, 255),
back_color=(0, 0, 0)).convert("RGB")
scale = max(1, min(W, H) // max(1, q.width)) # largest integer that fits
if scale > 1:
q = q.resize((q.width * scale, q.width * scale), Image.NEAREST)
canvas = Image.new("RGB", (W, H), (0, 0, 0))
canvas.paste(q, ((W - q.width) // 2, (H - q.height) // 2))
return cf.encode(canvas)
def _image_bytes(img) -> bytes:
"""Show an uploaded QR/image FULL-SCREEN, crisp (NEAREST) — best effort."""
_ensure_mask_path()
import colorface as cf
from PIL import Image
W, H = cf.DISPLAY_W, cf.DISPLAY_H
s = min(W, H)
img = img.convert("RGB").resize((s, s), Image.NEAREST)
canvas = Image.new("RGB", (W, H), (0, 0, 0))
canvas.paste(img, ((W - s) // 2, (H - s) // 2))
return cf.encode(canvas)
def show_social_on_mask(account: str) -> dict:
"""Show the account's **scannable** QR on the mask — a version-1 QR made from
a short (da.gd) link that redirects to the Instagram profile. Shared by the
dashboard button and the Gemini ``show_social`` tool. Raises for an unknown
account; propagates FaceController errors (e.g. not connected)."""
acc = SOCIAL.get(str(account).strip().lower())
if not acc:
raise HTTPException(status_code=404, detail="unknown account")
data = _qr_bytes(acc.get("short") or acc["url"]) # v1 short link -> scannable
mf = _get_face()
res = mf.show_scratch_image(data)
log.info("showing scannable social QR on mask: %s (%s)", acc["handle"], acc.get("short"))
return {"ok": True, "handle": acc["handle"], "scannable": True, **(res or {})}
@router.get("/social")
async def list_social():
return {"accounts": [{"id": k, "handle": v["handle"]} for k, v in SOCIAL.items()]}
def _friendly(exc: Exception) -> HTTPException:
"""Map FaceController errors to clean HTTP responses (esp. the common
'mask not connected' — usually the mask is off / far / held by the phone app)."""
if isinstance(exc, HTTPException):
return exc
msg = str(exc)
if "not connected" in msg or "not started" in msg or "MASK" in msg:
return HTTPException(status_code=503, detail=(
"Mask not connected — power it on, bring it close to the robot, and "
"free it from the phone app."))
log.exception("mask scratch op failed")
return HTTPException(status_code=500, detail="%s: %s" % (type(exc).__name__, msg))
@router.post("/social/{account}")
async def show_social(account: str):
try:
return await asyncio.to_thread(show_social_on_mask, account)
except Exception as exc:
raise _friendly(exc)
@router.post("/qr")
async def upload_qr(file: UploadFile = File(...)):
"""Upload an image (a QR you generated, or any picture) and show it on the mask."""
raw = await file.read()
if not raw:
raise HTTPException(status_code=400, detail="empty upload")
from PIL import Image
try:
img = Image.open(io.BytesIO(raw))
img.load()
except Exception:
raise HTTPException(status_code=400, detail="not a valid image")
try:
data = await asyncio.to_thread(_image_bytes, img)
mf = _get_face()
return await asyncio.to_thread(mf.show_scratch_image, data)
except Exception as exc:
raise _friendly(exc)
@router.post("/face/resume")
async def resume_face():
"""Stop showing the scratch image and resume the animated face."""
mf = _get_face()
return await asyncio.to_thread(mf.set_expression, None)
@router.post("/face/mouth")
async def face_mouth(hidden: bool = Query(...)):
"""Show (hidden=false) or hide (hidden=true) the mouth on the animated face."""
mf = _get_face()
return await asyncio.to_thread(mf.set_mouth_hidden, hidden)
@router.post("/link")
async def face_link(on: bool = Query(...)):
"""Link (on=true) / unlink (on=false) Gemini <-> the mask.
ON connects the mask + lets Gemini drive its emotions/social.
OFF tears the link down (no BLE churn) and Gemini stops touching the mask.
Default state is OFF. Runs in a thread — a link-on may briefly block while it
makes its first connect attempt."""
mf = _get_face()
return await asyncio.to_thread(mf.set_gemini_linked, on)
# ── saved QR library ────────────────────────────────────────────────
# Upload QR/images, save them by name, list/show/delete them. Stored as PNGs
# under data/qr_codes so they persist across restarts.
_QR_DIR = None
def _qr_dir() -> Path:
global _QR_DIR
if _QR_DIR is None:
try:
from Project.Sanad.config import BASE_DIR
base = Path(BASE_DIR)
except Exception:
base = Path(__file__).resolve().parents[2]
_QR_DIR = base / "data" / "qr_codes"
_QR_DIR.mkdir(parents=True, exist_ok=True)
return _QR_DIR
def _safe_name(name: str) -> str:
n = re.sub(r"[^A-Za-z0-9_.-]", "_", (name or "").strip())[:40].strip("._")
return n or "qr"
@router.post("/qr/save")
async def qr_save(name: str = Query(...), file: UploadFile = File(...)):
"""Save an uploaded QR/image into the library under ``name``."""
raw = await file.read()
if not raw:
raise HTTPException(status_code=400, detail="empty upload")
from PIL import Image
try:
img = Image.open(io.BytesIO(raw))
img.load()
except Exception:
raise HTTPException(status_code=400, detail="not a valid image")
sn = _safe_name(name)
await asyncio.to_thread(img.convert("RGB").save, str(_qr_dir() / (sn + ".png")))
return {"ok": True, "name": sn}
@router.post("/qr/save_link")
async def qr_save_link(name: str = Query(...), url: str = Query(...)):
"""Generate a QR from ``url`` and save it to the library. Returns the QR
version + whether it's short enough to actually scan on the mask (version 1)."""
u = (url or "").strip()
if not u:
raise HTTPException(status_code=400, detail="empty url")
_ensure_mask_path()
import qrcode
qr = qrcode.QRCode(error_correction=qrcode.constants.ERROR_CORRECT_L,
box_size=10, border=2)
qr.add_data(u)
qr.make(fit=True)
img = qr.make_image(fill_color=(255, 255, 255),
back_color=(0, 0, 0)).convert("RGB")
sn = _safe_name(name or u)
await asyncio.to_thread(img.save, str(_qr_dir() / (sn + ".png")))
return {"ok": True, "name": sn, "version": qr.version,
"scannable_on_mask": qr.version <= 1,
"note": ("scannable" if qr.version <= 1 else
"too dense to scan on the mask — use a shorter link")}
@router.get("/qr/library")
async def qr_library():
"""List the saved QR names."""
return {"qr": sorted(p.stem for p in _qr_dir().glob("*.png"))}
@router.get("/qr/thumb/{name}")
async def qr_thumb(name: str):
"""Serve a saved QR image (for the dashboard thumbnail)."""
p = _qr_dir() / (_safe_name(name) + ".png")
if not p.exists():
raise HTTPException(status_code=404, detail="not found")
return FileResponse(str(p), media_type="image/png")
@router.post("/qr/show/{name}")
async def qr_show(name: str):
"""Show a saved QR (under the eyes) on the mask."""
p = _qr_dir() / (_safe_name(name) + ".png")
if not p.exists():
raise HTTPException(status_code=404, detail="not found")
from PIL import Image
try:
img = Image.open(p)
data = await asyncio.to_thread(_image_bytes, img)
mf = _get_face()
return await asyncio.to_thread(mf.show_scratch_image, data)
except Exception as exc:
raise _friendly(exc)
@router.delete("/qr/{name}")
async def qr_delete(name: str):
"""Delete a saved QR from the library."""
p = _qr_dir() / (_safe_name(name) + ".png")
if p.exists():
p.unlink()
return {"ok": True, "deleted": _safe_name(name)}
# ── saved TEXT library ──────────────────────────────────────────────
# Save words/phrases and scroll any of them across the mask on demand.
_TEXT_DIR = None
def _text_dir() -> Path:
global _TEXT_DIR
if _TEXT_DIR is None:
try:
from Project.Sanad.config import BASE_DIR
base = Path(BASE_DIR)
except Exception:
base = Path(__file__).resolve().parents[2]
_TEXT_DIR = base / "data" / "mask_texts"
_TEXT_DIR.mkdir(parents=True, exist_ok=True)
return _TEXT_DIR
@router.post("/texts/save")
async def text_save(text: str = Query(...), name: str = Query("")):
"""Save a word/phrase to the text library (name defaults to the text)."""
t = (text or "").strip()[:200]
if not t:
raise HTTPException(status_code=400, detail="empty text")
nm = _safe_name(name or t)
await asyncio.to_thread((_text_dir() / (nm + ".txt")).write_text, t)
return {"ok": True, "name": nm, "text": t}
@router.get("/texts/library")
async def text_library():
"""List the saved texts."""
out = []
for p in sorted(_text_dir().glob("*.txt")):
try:
out.append({"name": p.stem, "text": p.read_text()[:80]})
except Exception:
pass
return {"texts": out}
@router.post("/texts/show/{name}")
async def text_show(name: str):
"""Scroll a saved text across the mask."""
p = _text_dir() / (_safe_name(name) + ".txt")
if not p.exists():
raise HTTPException(status_code=404, detail="not found")
txt = p.read_text()
mf = _get_face()
try:
return await asyncio.to_thread(mf.set_text, txt, (255, 255, 255), None, None, 38)
except Exception as exc:
raise _friendly(exc)
@router.delete("/texts/{name}")
async def text_delete(name: str):
"""Delete a saved text."""
p = _text_dir() / (_safe_name(name) + ".txt")
if p.exists():
p.unlink()
return {"ok": True, "deleted": _safe_name(name)}