396 lines
14 KiB
Python
396 lines
14 KiB
Python
"""Social-media / QR display on the LED mask.
|
|
|
|
Renders a QR code (for a preset Instagram account) or an uploaded image onto the
|
|
mask's 46x58 display and holds it via the FaceController's reserved scratch slot
|
|
until the animated face is resumed. The shared helper :func:`show_social_on_mask`
|
|
is also called from the Gemini ``[[SHOW:account]]`` relay wired in ``main.py``.
|
|
|
|
Routes (under /api/mask):
|
|
POST /social/{account} -> show a preset Instagram QR
|
|
POST /qr -> upload an image (QR or any picture) + show it
|
|
POST /face/resume -> stop showing the scratch image, return to the face
|
|
GET /social -> list the preset accounts
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import io
|
|
import logging
|
|
import os
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
import re
|
|
|
|
from fastapi import APIRouter, File, HTTPException, Query, UploadFile
|
|
from fastapi.responses import FileResponse
|
|
|
|
log = logging.getLogger("sanad.mask_social")
|
|
router = APIRouter() # prefix "/api/mask" supplied by dashboard/app.py _REST_ROUTES
|
|
|
|
# Preset Instagram accounts the mask can show as a QR. The mask is a low-res
|
|
# 46x58 panel, so a full-URL QR is dense; the black margin acts as the quiet
|
|
# zone and we scale modules crisply (NEAREST) to give it the best chance.
|
|
SOCIAL = {
|
|
"bu_sunaidah": {"handle": "@bu.sunaidah",
|
|
"url": "https://instagram.com/bu.sunaidah",
|
|
"short": "da.gd/VMkH8J"}, # -> instagram.com/bu.sunaidah (v1 QR)
|
|
"yslootahtech": {"handle": "@yslootahtech",
|
|
"url": "https://instagram.com/yslootahtech",
|
|
"short": "da.gd/Qr8RO"}, # -> instagram.com/yslootahtech (v1 QR)
|
|
}
|
|
|
|
|
|
def _ensure_mask_path() -> None:
|
|
"""Make the flat Mask lib (colorface) importable from this route — using the
|
|
SAME dir the FaceController resolved (the Mask lib lives outside the repo)."""
|
|
d = os.environ.get("SANAD_MASK_DIR")
|
|
if not d:
|
|
try:
|
|
from Project.Sanad.main import mask_face as _mf # type: ignore
|
|
d = getattr(_mf, "mask_dir", None)
|
|
except Exception:
|
|
d = None
|
|
if not d:
|
|
d = str(Path(__file__).resolve().parents[2] / "Mask")
|
|
if d and d not in sys.path:
|
|
sys.path.insert(0, d)
|
|
|
|
|
|
def _get_face():
|
|
from Project.Sanad.main import mask_face # type: ignore
|
|
if mask_face is None:
|
|
raise HTTPException(status_code=503, detail="mask face unavailable")
|
|
return mask_face
|
|
|
|
|
|
_EYE_BAND = 16 # top rows reserved for the cyan eyes; the code sits below them
|
|
|
|
|
|
def _compose_under_eyes(inner) -> bytes:
|
|
"""Draw two cyan eyes across the top and place ``inner`` (a QR / image) in the
|
|
area BELOW them, then encode for the mask. Keeps the panel looking like a face
|
|
with a code under the eyes instead of a full-screen QR."""
|
|
_ensure_mask_path()
|
|
import colorface as cf
|
|
from PIL import Image, ImageDraw
|
|
W, H = cf.DISPLAY_W, cf.DISPLAY_H
|
|
inner = inner.convert("RGB")
|
|
iw, ih = inner.size
|
|
# keep the code a small badge under the eyes (~70% of the space below them)
|
|
target = max(20, int(min(W, H - _EYE_BAND - 1) * 0.72))
|
|
if iw <= target and ih <= target:
|
|
s = max(1, min(target // iw, target // ih)) # crisp integer up-scale (QR)
|
|
nw, nh = iw * s, ih * s
|
|
else:
|
|
s = min(target / iw, target / ih) # scale big images down
|
|
nw, nh = max(1, int(iw * s)), max(1, int(ih * s))
|
|
inner = inner.resize((nw, nh), Image.NEAREST)
|
|
canvas = Image.new("RGB", (W, H), (0, 0, 0))
|
|
g = ImageDraw.Draw(canvas)
|
|
eye = cf.DEFAULT_EYE
|
|
for cx in (W // 2 - 10, W // 2 + 10): # two eyes at the top
|
|
g.ellipse([cx - 5, 3, cx + 5, 13], fill=(255, 255, 255))
|
|
g.ellipse([cx - 3, 5, cx + 3, 11], fill=eye)
|
|
g.ellipse([cx - 1, 7, cx + 1, 10], fill=(0, 0, 0))
|
|
x = (W - nw) // 2
|
|
y = _EYE_BAND + (H - _EYE_BAND - nh) // 2
|
|
canvas.paste(inner, (max(0, x), max(_EYE_BAND, y)))
|
|
return cf.encode(canvas)
|
|
|
|
|
|
def _qr_bytes(url: str) -> bytes:
|
|
"""Render a QR for ``url`` FULL-SCREEN with the largest crisp (integer) module
|
|
size the 46-wide panel allows — the only way it has any chance of scanning.
|
|
Only a ~version-1 QR (<=17 chars) reaches ~2 px/module; longer data is denser
|
|
and won't scan. Returns (bytes, qr_version)."""
|
|
_ensure_mask_path()
|
|
import qrcode
|
|
from PIL import Image
|
|
import colorface as cf
|
|
W, H = cf.DISPLAY_W, cf.DISPLAY_H
|
|
qr = qrcode.QRCode(error_correction=qrcode.constants.ERROR_CORRECT_L,
|
|
box_size=1, border=1)
|
|
qr.add_data(url)
|
|
qr.make(fit=True)
|
|
q = qr.make_image(fill_color=(255, 255, 255),
|
|
back_color=(0, 0, 0)).convert("RGB")
|
|
scale = max(1, min(W, H) // max(1, q.width)) # largest integer that fits
|
|
if scale > 1:
|
|
q = q.resize((q.width * scale, q.width * scale), Image.NEAREST)
|
|
canvas = Image.new("RGB", (W, H), (0, 0, 0))
|
|
canvas.paste(q, ((W - q.width) // 2, (H - q.height) // 2))
|
|
return cf.encode(canvas)
|
|
|
|
|
|
def _image_bytes(img) -> bytes:
|
|
"""Show an uploaded QR/image FULL-SCREEN, crisp (NEAREST) — best effort."""
|
|
_ensure_mask_path()
|
|
import colorface as cf
|
|
from PIL import Image
|
|
W, H = cf.DISPLAY_W, cf.DISPLAY_H
|
|
s = min(W, H)
|
|
img = img.convert("RGB").resize((s, s), Image.NEAREST)
|
|
canvas = Image.new("RGB", (W, H), (0, 0, 0))
|
|
canvas.paste(img, ((W - s) // 2, (H - s) // 2))
|
|
return cf.encode(canvas)
|
|
|
|
|
|
def show_social_on_mask(account: str) -> dict:
|
|
"""Show the account's **scannable** QR on the mask — a version-1 QR made from
|
|
a short (da.gd) link that redirects to the Instagram profile. Shared by the
|
|
dashboard button and the Gemini ``show_social`` tool. Raises for an unknown
|
|
account; propagates FaceController errors (e.g. not connected)."""
|
|
acc = SOCIAL.get(str(account).strip().lower())
|
|
if not acc:
|
|
raise HTTPException(status_code=404, detail="unknown account")
|
|
data = _qr_bytes(acc.get("short") or acc["url"]) # v1 short link -> scannable
|
|
mf = _get_face()
|
|
res = mf.show_scratch_image(data)
|
|
log.info("showing scannable social QR on mask: %s (%s)", acc["handle"], acc.get("short"))
|
|
return {"ok": True, "handle": acc["handle"], "scannable": True, **(res or {})}
|
|
|
|
|
|
@router.get("/social")
|
|
async def list_social():
|
|
return {"accounts": [{"id": k, "handle": v["handle"]} for k, v in SOCIAL.items()]}
|
|
|
|
|
|
def _friendly(exc: Exception) -> HTTPException:
|
|
"""Map FaceController errors to clean HTTP responses (esp. the common
|
|
'mask not connected' — usually the mask is off / far / held by the phone app)."""
|
|
if isinstance(exc, HTTPException):
|
|
return exc
|
|
msg = str(exc)
|
|
if "not connected" in msg or "not started" in msg or "MASK" in msg:
|
|
return HTTPException(status_code=503, detail=(
|
|
"Mask not connected — power it on, bring it close to the robot, and "
|
|
"free it from the phone app."))
|
|
log.exception("mask scratch op failed")
|
|
return HTTPException(status_code=500, detail="%s: %s" % (type(exc).__name__, msg))
|
|
|
|
|
|
@router.post("/social/{account}")
|
|
async def show_social(account: str):
|
|
try:
|
|
return await asyncio.to_thread(show_social_on_mask, account)
|
|
except Exception as exc:
|
|
raise _friendly(exc)
|
|
|
|
|
|
@router.post("/qr")
|
|
async def upload_qr(file: UploadFile = File(...)):
|
|
"""Upload an image (a QR you generated, or any picture) and show it on the mask."""
|
|
raw = await file.read()
|
|
if not raw:
|
|
raise HTTPException(status_code=400, detail="empty upload")
|
|
from PIL import Image
|
|
try:
|
|
img = Image.open(io.BytesIO(raw))
|
|
img.load()
|
|
except Exception:
|
|
raise HTTPException(status_code=400, detail="not a valid image")
|
|
try:
|
|
data = await asyncio.to_thread(_image_bytes, img)
|
|
mf = _get_face()
|
|
return await asyncio.to_thread(mf.show_scratch_image, data)
|
|
except Exception as exc:
|
|
raise _friendly(exc)
|
|
|
|
|
|
@router.post("/face/resume")
|
|
async def resume_face():
|
|
"""Stop showing the scratch image and resume the animated face."""
|
|
mf = _get_face()
|
|
return await asyncio.to_thread(mf.set_expression, None)
|
|
|
|
|
|
@router.post("/face/mouth")
|
|
async def face_mouth(hidden: bool = Query(...)):
|
|
"""Show (hidden=false) or hide (hidden=true) the mouth on the animated face."""
|
|
mf = _get_face()
|
|
return await asyncio.to_thread(mf.set_mouth_hidden, hidden)
|
|
|
|
|
|
@router.post("/link")
|
|
async def face_link(on: bool = Query(...)):
|
|
"""Link (on=true) / unlink (on=false) Gemini <-> the mask.
|
|
|
|
ON connects the mask + lets Gemini drive its emotions/social.
|
|
OFF tears the link down (no BLE churn) and Gemini stops touching the mask.
|
|
Default state is OFF. Runs in a thread — a link-on may briefly block while it
|
|
makes its first connect attempt."""
|
|
mf = _get_face()
|
|
return await asyncio.to_thread(mf.set_gemini_linked, on)
|
|
|
|
|
|
# ── saved QR library ────────────────────────────────────────────────
|
|
# Upload QR/images, save them by name, list/show/delete them. Stored as PNGs
|
|
# under data/qr_codes so they persist across restarts.
|
|
|
|
_QR_DIR = None
|
|
|
|
|
|
def _qr_dir() -> Path:
|
|
global _QR_DIR
|
|
if _QR_DIR is None:
|
|
try:
|
|
from Project.Sanad.config import BASE_DIR
|
|
base = Path(BASE_DIR)
|
|
except Exception:
|
|
base = Path(__file__).resolve().parents[2]
|
|
_QR_DIR = base / "data" / "qr_codes"
|
|
_QR_DIR.mkdir(parents=True, exist_ok=True)
|
|
return _QR_DIR
|
|
|
|
|
|
def _safe_name(name: str) -> str:
|
|
n = re.sub(r"[^A-Za-z0-9_.-]", "_", (name or "").strip())[:40].strip("._")
|
|
return n or "qr"
|
|
|
|
|
|
@router.post("/qr/save")
|
|
async def qr_save(name: str = Query(...), file: UploadFile = File(...)):
|
|
"""Save an uploaded QR/image into the library under ``name``."""
|
|
raw = await file.read()
|
|
if not raw:
|
|
raise HTTPException(status_code=400, detail="empty upload")
|
|
from PIL import Image
|
|
try:
|
|
img = Image.open(io.BytesIO(raw))
|
|
img.load()
|
|
except Exception:
|
|
raise HTTPException(status_code=400, detail="not a valid image")
|
|
sn = _safe_name(name)
|
|
await asyncio.to_thread(img.convert("RGB").save, str(_qr_dir() / (sn + ".png")))
|
|
return {"ok": True, "name": sn}
|
|
|
|
|
|
@router.post("/qr/save_link")
|
|
async def qr_save_link(name: str = Query(...), url: str = Query(...)):
|
|
"""Generate a QR from ``url`` and save it to the library. Returns the QR
|
|
version + whether it's short enough to actually scan on the mask (version 1)."""
|
|
u = (url or "").strip()
|
|
if not u:
|
|
raise HTTPException(status_code=400, detail="empty url")
|
|
_ensure_mask_path()
|
|
import qrcode
|
|
qr = qrcode.QRCode(error_correction=qrcode.constants.ERROR_CORRECT_L,
|
|
box_size=10, border=2)
|
|
qr.add_data(u)
|
|
qr.make(fit=True)
|
|
img = qr.make_image(fill_color=(255, 255, 255),
|
|
back_color=(0, 0, 0)).convert("RGB")
|
|
sn = _safe_name(name or u)
|
|
await asyncio.to_thread(img.save, str(_qr_dir() / (sn + ".png")))
|
|
return {"ok": True, "name": sn, "version": qr.version,
|
|
"scannable_on_mask": qr.version <= 1,
|
|
"note": ("scannable" if qr.version <= 1 else
|
|
"too dense to scan on the mask — use a shorter link")}
|
|
|
|
|
|
@router.get("/qr/library")
|
|
async def qr_library():
|
|
"""List the saved QR names."""
|
|
return {"qr": sorted(p.stem for p in _qr_dir().glob("*.png"))}
|
|
|
|
|
|
@router.get("/qr/thumb/{name}")
|
|
async def qr_thumb(name: str):
|
|
"""Serve a saved QR image (for the dashboard thumbnail)."""
|
|
p = _qr_dir() / (_safe_name(name) + ".png")
|
|
if not p.exists():
|
|
raise HTTPException(status_code=404, detail="not found")
|
|
return FileResponse(str(p), media_type="image/png")
|
|
|
|
|
|
@router.post("/qr/show/{name}")
|
|
async def qr_show(name: str):
|
|
"""Show a saved QR (under the eyes) on the mask."""
|
|
p = _qr_dir() / (_safe_name(name) + ".png")
|
|
if not p.exists():
|
|
raise HTTPException(status_code=404, detail="not found")
|
|
from PIL import Image
|
|
try:
|
|
img = Image.open(p)
|
|
data = await asyncio.to_thread(_image_bytes, img)
|
|
mf = _get_face()
|
|
return await asyncio.to_thread(mf.show_scratch_image, data)
|
|
except Exception as exc:
|
|
raise _friendly(exc)
|
|
|
|
|
|
@router.delete("/qr/{name}")
|
|
async def qr_delete(name: str):
|
|
"""Delete a saved QR from the library."""
|
|
p = _qr_dir() / (_safe_name(name) + ".png")
|
|
if p.exists():
|
|
p.unlink()
|
|
return {"ok": True, "deleted": _safe_name(name)}
|
|
|
|
|
|
# ── saved TEXT library ──────────────────────────────────────────────
|
|
# Save words/phrases and scroll any of them across the mask on demand.
|
|
|
|
_TEXT_DIR = None
|
|
|
|
|
|
def _text_dir() -> Path:
|
|
global _TEXT_DIR
|
|
if _TEXT_DIR is None:
|
|
try:
|
|
from Project.Sanad.config import BASE_DIR
|
|
base = Path(BASE_DIR)
|
|
except Exception:
|
|
base = Path(__file__).resolve().parents[2]
|
|
_TEXT_DIR = base / "data" / "mask_texts"
|
|
_TEXT_DIR.mkdir(parents=True, exist_ok=True)
|
|
return _TEXT_DIR
|
|
|
|
|
|
@router.post("/texts/save")
|
|
async def text_save(text: str = Query(...), name: str = Query("")):
|
|
"""Save a word/phrase to the text library (name defaults to the text)."""
|
|
t = (text or "").strip()[:200]
|
|
if not t:
|
|
raise HTTPException(status_code=400, detail="empty text")
|
|
nm = _safe_name(name or t)
|
|
await asyncio.to_thread((_text_dir() / (nm + ".txt")).write_text, t)
|
|
return {"ok": True, "name": nm, "text": t}
|
|
|
|
|
|
@router.get("/texts/library")
|
|
async def text_library():
|
|
"""List the saved texts."""
|
|
out = []
|
|
for p in sorted(_text_dir().glob("*.txt")):
|
|
try:
|
|
out.append({"name": p.stem, "text": p.read_text()[:80]})
|
|
except Exception:
|
|
pass
|
|
return {"texts": out}
|
|
|
|
|
|
@router.post("/texts/show/{name}")
|
|
async def text_show(name: str):
|
|
"""Scroll a saved text across the mask."""
|
|
p = _text_dir() / (_safe_name(name) + ".txt")
|
|
if not p.exists():
|
|
raise HTTPException(status_code=404, detail="not found")
|
|
txt = p.read_text()
|
|
mf = _get_face()
|
|
try:
|
|
return await asyncio.to_thread(mf.set_text, txt, (255, 255, 255), None, None, 38)
|
|
except Exception as exc:
|
|
raise _friendly(exc)
|
|
|
|
|
|
@router.delete("/texts/{name}")
|
|
async def text_delete(name: str):
|
|
"""Delete a saved text."""
|
|
p = _text_dir() / (_safe_name(name) + ".txt")
|
|
if p.exists():
|
|
p.unlink()
|
|
return {"ok": True, "deleted": _safe_name(name)}
|