Sanadv3/face/mask_face.py

802 lines
37 KiB
Python

"""Shining LED face mask — Sanad subsystem (BLE, owns its own asyncio loop).
Wraps the standalone **Mask** project (``Project/Mask`` — the flat ``shiningmask``
library: ``mask.py`` / ``faceanim.py`` / ``colorface.py`` …) as a Sanad subsystem
so the dashboard "Mask Face" tab can drive the robot's animated LED face.
Why a dedicated loop: the mask talks BLE (bleak/BlueZ) and ``FaceAnimator`` runs a
persistent asyncio task, so this controller owns a background daemon thread with
its own event loop. Route handlers call the plain SYNC methods here (themselves
wrapped in ``asyncio.to_thread`` by FastAPI); each marshals a coroutine onto that
loop via ``run_coroutine_threadsafe``.
The Mask project is a flat set of top-level modules (not an installed package), so
it is imported by inserting its directory on ``sys.path``. Default location is the
sibling ``<Project>/Mask``; override with ``SANAD_MASK_DIR`` or
``config/mask_config.json``. Needs an env with ``bleak`` + ``Pillow`` (g1_env). If
those are missing the subsystem still constructs but reports unavailable, and the
rest of Sanad is unaffected (the dashboard tab shows the reason).
"""
from __future__ import annotations
import asyncio
import os
import sys
import threading
from pathlib import Path
from typing import Optional, Sequence, Tuple
from Project.Sanad.config import BASE_DIR
from Project.Sanad.core import config_loader
from Project.Sanad.core.logger import get_logger
log = get_logger("mask_face")
Color = Tuple[int, int, int]
# Named frames provided by colorface.default_frames() (FaceAnimator slots).
EXPRESSIONS = ("neutral", "smile", "blink", "look_left", "look_right",
"talk1", "talk2", "talk3", "surprised", "sad", "wink", "angry",
"heart", "laugh", "love", "cool", "confused", "kiss", "thumbs_up")
# Default face colors (match colorface.DEFAULT_EYE / DEFAULT_MOUTH).
DEFAULT_EYE_COLOR: Color = (0, 230, 255) # cyan
DEFAULT_MOUTH_COLOR: Color = (255, 50, 50) # red
DEFAULT_SCLERA_COLOR: Color = (255, 255, 255) # white of the eye
def _parse_color(value, default: Color) -> Color:
"""Coerce a config/API color (``[r,g,b]``, ``(r,g,b)``, or ``"#rrggbb"``) to a
clamped RGB tuple; fall back to ``default`` on anything unusable."""
if value in (None, ""):
return tuple(default)
try:
if isinstance(value, str):
h = value.strip().lstrip("#")
if len(h) == 3:
h = "".join(c * 2 for c in h)
value = (int(h[0:2], 16), int(h[2:4], 16), int(h[4:6], 16))
r, g, b = (int(value[0]), int(value[1]), int(value[2]))
return (max(0, min(255, r)), max(0, min(255, g)), max(0, min(255, b)))
except Exception:
return tuple(default)
class FaceController:
"""Owns the BLE mask connection + FaceAnimator on a private event loop."""
def __init__(self):
cfg = config_loader.load("mask")
def _cfg(key, default):
v = cfg.get(key, default)
return v if v not in (None, "") else default
mask_dir = os.environ.get("SANAD_MASK_DIR") or _cfg("mask_dir", "")
if not mask_dir:
# Default: the sibling Mask project (…/Project/Mask).
mask_dir = str(Path(BASE_DIR).parent / "Mask")
self.mask_dir = mask_dir
self.address = (os.environ.get("SANAD_MASK_ADDRESS") or _cfg("address", "")) or None
self.name_prefix = os.environ.get("SANAD_MASK_NAME_PREFIX") or _cfg("name_prefix", "MASK")
self.adapter = (os.environ.get("SANAD_MASK_ADAPTER") or _cfg("adapter", "")) or None
self.brightness = int(_cfg("brightness", 95))
self.fps = float(_cfg("fps", 8.0))
self.connect_timeout = float(_cfg("connect_timeout", 15.0))
self.connect_attempts = int(_cfg("connect_attempts", 5))
# Use the lifelike motion driver (saccades, varied blinks, states,
# reactions, smooth lip-sync). Falls back to the basic FaceAnimator if
# the lifelike module is unavailable or this is set false.
self.lifelike = bool(_cfg("lifelike", True))
self._face_kind = None
self._hide_mouth = bool(_cfg("hide_mouth", False)) # eyes-only face toggle
# Gemini<->mask link. Default OFF: the mask does NOT auto-connect (no BLE
# churn) and Gemini's emotion/social markers are ignored. Turned on from
# the dashboard, it connects the mask + lets Gemini drive it.
self._gemini_linked = bool(_cfg("gemini_linked", False))
# Auto-connect + start the animated face on boot (best-effort, in the
# background so it never blocks startup). After the one-time frame
# upload, later boots just connect + animate (no upload).
self.autostart = bool(_cfg("autostart", True))
# Face colors (baked into the uploaded DIY frames). Stored as RGB lists in
# config; changing them re-uploads the frame set (face_start reload).
self.eye_color = _parse_color(_cfg("eye_color", None), DEFAULT_EYE_COLOR)
self.mouth_color = _parse_color(_cfg("mouth_color", None), DEFAULT_MOUTH_COLOR)
self.sclera_color = _parse_color(_cfg("sclera_color", None), DEFAULT_SCLERA_COLOR)
# runtime state
self._mask = None # shiningmask.ShiningMask
self._face = None # faceanim.FaceAnimator
self._lib: Optional[dict] = None
self._lib_failed = False
self._connecting = False
self._face_running = False
self._speaking = False
self._mouth: Optional[int] = None
self._last_error: Optional[str] = None
self._op_lock = threading.Lock()
# Desired-state intents the reconnect supervisor enforces:
# _want_connected — we want a live BLE link (set on connect/autostart,
# cleared on a *user* disconnect). While true, the
# supervisor keeps (re)connecting through drops / weak
# signal until it succeeds.
# _face_desired — the animated face should be running (set on
# face_start, cleared on face_stop / static overrides
# like text/image). After a reconnect the supervisor
# restarts the face iff this is true.
self._want_connected = False
self._face_desired = False
self._reconnecting = False
# dedicated event loop in a background daemon thread (idle until used)
self._loop = asyncio.new_event_loop()
self._thread = threading.Thread(target=self._run_loop, daemon=True,
name="mask-face-loop")
self._thread.start()
log.info("FaceController ready (mask_dir=%s, name_prefix=%s, address=%s)",
self.mask_dir, self.name_prefix, self.address or "scan")
# Persistent reconnect supervisor: self-heals dropped/weak links and even
# establishes the FIRST connection once the mask comes into range, without
# the user babysitting the Connect button.
threading.Thread(target=self._supervisor, daemon=True,
name="mask-supervisor").start()
# Only auto-connect on boot if Gemini is linked (default off -> the mask
# stays disconnected + silent until the user links it from the dashboard).
if self.autostart and self._gemini_linked:
threading.Thread(target=self._autostart, daemon=True,
name="mask-autostart").start()
def _clear_stale_mask_links(self) -> None:
"""Drop any BlueZ-level connection to a MASK device left over from a
previous process, BEFORE the first connect.
A hard service restart leaves the old link half-open: BlueZ still
reports the mask "connected" so it stops advertising, our scan can't
find it, and the fresh connect churns with 'Software caused connection
abort' / 'failed to discover services' for minutes before BlueZ times
the stale link out — flashing the mask's built-in face the whole time
(this, not WiFi/coexistence, is what makes the weird face appear on a
restart/boot). Disconnecting it first lets the mask advertise again, so
the new connect is clean and immediate. Fully guarded + bounded — a
missing bluetoothctl or any error is a no-op, never blocking startup."""
try:
import subprocess as _sp
import time as _time
out = _sp.run(["bluetoothctl", "devices"], capture_output=True,
text=True, timeout=5).stdout or ""
prefix = (self.name_prefix or "MASK").upper()
cleared = False
for line in out.splitlines():
parts = line.split()
# "Device C3:8A:9B:05:B4:C9 MASK-05B4C9"
if (len(parts) >= 3 and parts[0] == "Device"
and parts[2].upper().startswith(prefix)):
addr = parts[1]
_sp.run(["bluetoothctl", "disconnect", addr],
capture_output=True, text=True, timeout=8)
log.info("cleared stale BlueZ link to %s (%s) before first connect",
parts[2], addr)
cleared = True
if cleared:
_time.sleep(1.5) # let the mask resume advertising before we scan
except Exception as exc:
log.debug("stale mask-link cleanup skipped: %s", exc)
def _autostart(self):
"""Best-effort connect + start the face on boot (runs on its own thread,
so a missing/asleep mask never blocks or breaks Sanad startup). Declares
the connect+face *intent* up front, so even if the mask is off / out of
range at boot, the reconnect supervisor keeps trying and brings the face
up on its own once the mask appears — no dashboard babysitting."""
import time as _time
_time.sleep(4.0) # let the rest of Sanad finish booting first
if not self.lib_available:
log.warning("mask autostart skipped — Mask lib unavailable "
"(need bleak + Pillow in this conda env)")
return
self._want_connected = True
self._face_desired = True
self._clear_stale_mask_links() # drop any half-open link from a prior process
try:
self.connect()
except Exception as exc:
log.warning("mask autostart: connect failed (%s) — the supervisor will "
"keep retrying; or connect from the dashboard", exc)
return
try:
self.face_start(reload=False)
log.info("mask autostart: animated face running (driver=%s)", self._face_kind)
except Exception:
log.exception("mask autostart: face_start failed")
def _supervisor(self):
"""Background daemon that enforces the connect/face *intents*.
While ``_want_connected`` is set it keeps (re)establishing the BLE link
through drops and weak-signal scan misses; once connected, if the face is
desired but not running (e.g. after a reconnect) it restarts it. A user
Disconnect clears the intent so this stops fighting a deliberate
disconnect. Each attempt reuses the normal serialized connect()/
face_start() paths, so there are no new locking hazards — only retries."""
import time as _time
backoff = 3.0
while True:
_time.sleep(backoff)
try:
if not self._want_connected or self._connecting:
backoff = 3.0
continue
if self.is_connected:
backoff = 3.0
# Link is up — restore the face if it's wanted but stopped
# (e.g. the face loop bailed on a drop the supervisor healed).
if self._face_desired and not self._face_running:
try:
self.face_start(reload=False)
log.info("mask supervisor: face restored")
except Exception as exc:
log.debug("mask supervisor: face restore failed (%s)", exc)
backoff = 5.0
continue
# Want a link but don't have one -> reconnect (short, then loop).
self._reconnecting = True
try:
self.connect(timeout=12.0, attempts=2)
log.info("mask supervisor: link (re)established")
if self._face_desired:
self.face_start(reload=False)
backoff = 3.0
except Exception as exc:
# Keep trying with a gentle backoff (weak signal / mask off).
log.debug("mask supervisor: reconnect attempt failed (%s)", exc)
# 'Software caused connection abort' / 'device disconnected'
# is usually a half-open BlueZ link from the drop: the mask
# still shows "connected" so it stops advertising and the next
# scan can't find it. Clearing it lets the mask re-advertise.
m = str(exc).lower()
if any(s in m for s in ("abort", "disconnect", "not connected",
"discover services")):
try:
self._clear_stale_mask_links()
except Exception:
pass
backoff = min(backoff * 1.5, 20.0)
finally:
self._reconnecting = False
except Exception:
log.exception("mask supervisor loop error")
backoff = 5.0
# -- loop plumbing --------------------------------------------------------
def _run_loop(self):
asyncio.set_event_loop(self._loop)
self._loop.run_forever()
def _submit(self, coro, timeout: float = 30.0):
"""Run a coroutine on the mask loop from a caller thread, blocking."""
fut = asyncio.run_coroutine_threadsafe(coro, self._loop)
return fut.result(timeout=timeout)
# -- lazy import of the flat Mask library ---------------------------------
def _ensure_lib(self) -> dict:
if self._lib is not None:
return self._lib
if self._lib_failed:
raise RuntimeError(self._last_error or "mask library unavailable")
if self.mask_dir and self.mask_dir not in sys.path:
sys.path.insert(0, self.mask_dir)
try:
import mask as _mask
import faceanim as _faceanim
import colorface as _colorface
import constants as _constants
except Exception as exc:
self._lib_failed = True
self._last_error = f"mask library import failed: {exc}"
log.exception("Mask library import failed (dir=%s) — is bleak/Pillow "
"installed (g1_env)?", self.mask_dir)
raise RuntimeError(self._last_error)
try:
from Project.Sanad.face.face_motion import LifelikeFace as _LifelikeFace
except Exception:
_LifelikeFace = None
log.warning("LifelikeFace unavailable — falling back to FaceAnimator")
self._lib = {
"ShiningMask": _mask.ShiningMask,
"FaceAnimator": _faceanim.FaceAnimator,
"LifelikeFace": _LifelikeFace,
"colorface": _colorface,
"TextMode": _constants.TextMode,
}
log.info("Mask library imported from %s", self.mask_dir)
return self._lib
@property
def lib_available(self) -> bool:
if self._lib is not None:
return True
if self._lib_failed:
return False
try:
self._ensure_lib()
return True
except Exception:
return False
@property
def is_connected(self) -> bool:
return bool(self._mask is not None and getattr(self._mask, "is_connected", False))
def _require_connected(self):
if not self.is_connected:
raise RuntimeError("mask not connected")
# -- status ---------------------------------------------------------------
def status(self) -> dict:
return {
"lib_available": self.lib_available,
"connected": self.is_connected,
"connecting": self._connecting,
"reconnecting": self._reconnecting and not self.is_connected,
"want_connected": self._want_connected,
"face_running": self._face_running and self.is_connected,
"face_desired": self._face_desired,
"driver": self._face_kind,
"lifelike": self.lifelike,
"autostart": self.autostart,
"gemini_linked": self._gemini_linked,
"hide_mouth": self._hide_mouth,
"speaking": self._speaking,
"mouth": self._mouth,
"brightness": self.brightness,
"eye_color": list(self.eye_color),
"mouth_color": list(self.mouth_color),
"sclera_color": list(self.sclera_color),
"fps": self.fps,
"address": self.address,
"name_prefix": self.name_prefix,
"adapter": self.adapter,
"mask_dir": self.mask_dir,
"expressions": list(EXPRESSIONS),
"last_error": self._last_error,
}
# -- connection -----------------------------------------------------------
def connect(self, timeout: Optional[float] = None, attempts: Optional[int] = None) -> dict:
# Serialize the whole connect under _op_lock so it can't interleave with a
# concurrent disconnect()/face_start() swapping self._mask underneath, and
# so _connecting is set and cleared under the same lock (status() reads it).
with self._op_lock:
if self._connecting:
raise RuntimeError("a connect is already in progress")
self._connecting = True
to = float(timeout) if timeout else self.connect_timeout
at = int(attempts) if attempts else self.connect_attempts
self._last_error = None
try:
self._ensure_lib()
self._submit(self._aconnect(to, at), timeout=to * at + 15.0)
self._want_connected = True # intent: supervisor keeps it alive
except Exception as exc:
self._last_error = str(exc)
raise RuntimeError(str(exc))
finally:
self._connecting = False
return self.status()
async def _aconnect(self, timeout: float, attempts: int):
if self.is_connected:
return
lib = self._lib
# Tear down any stale mask from a previous (now-dropped) session BEFORE
# building a fresh one. A reconnect after a silent BLE drop leaves the old
# BleakClient holding a half-open BlueZ connection to the SAME device; if
# we just overwrite self._mask the old client is never disconnected at the
# BlueZ level, the OS keeps the device "connected", and the new
# BleakClient.connect() to that address hangs/refuses. Disconnect (and
# drop) the old client first so the fresh connect starts from a clean
# BlueZ state.
old = self._mask
self._mask = None
if old is not None:
try:
await old.disconnect()
except Exception:
log.exception("stale mask.disconnect() before reconnect failed")
self._mask = lib["ShiningMask"](
address=self.address, name_prefix=self.name_prefix, adapter=self.adapter)
await self._mask.connect(timeout=timeout, attempts=attempts)
def disconnect(self) -> dict:
# Clear the intents FIRST (before the lock) so the supervisor won't race
# to re-establish a link the user is deliberately tearing down.
self._want_connected = False
self._face_desired = False
with self._op_lock:
self._stop_face()
if self._mask is not None:
try:
self._submit(self._mask.disconnect(), timeout=10.0)
except Exception:
log.exception("mask.disconnect() failed")
return self.status()
def set_gemini_linked(self, on: bool) -> dict:
"""Link / unlink Gemini <-> the mask.
ON -> declare intent to hold the BLE link + run the face; the supervisor
connects (and self-heals) in the background, and Gemini's emotion /
social markers are relayed to the mask.
OFF -> tear the link down + clear the intent, so the mask stops any BLE
churn and Gemini's markers are ignored (the voice is unaffected).
Default is OFF: the mask stays silent + disconnected until the user
links it from the dashboard."""
on = bool(on)
self._gemini_linked = on
if not on:
# Deliberate teardown; disconnect() also clears _want_connected /
# _face_desired so the supervisor stops trying to reconnect.
self.disconnect()
return {"ok": True, "linked": False, "connected": self.is_connected}
# Linking: declare intent, then one quick connect attempt so the common
# "mask nearby" case comes up immediately; the supervisor keeps retrying
# (weak signal / mask still off) so we never block the caller for long.
self._want_connected = True
self._face_desired = True
if not self.is_connected and not self._connecting:
try:
self._clear_stale_mask_links()
except Exception:
pass
try:
self.connect(timeout=10.0, attempts=1)
self.face_start(reload=False)
except Exception as exc:
log.info("link-on: mask not up yet, supervisor will retry (%s)", exc)
elif self.is_connected and not self._face_running:
try:
self.face_start(reload=False)
except Exception:
pass
return {"ok": True, "linked": True, "connected": self.is_connected}
# -- simple commands ------------------------------------------------------
def set_brightness(self, level: int) -> dict:
# Hardware range is 0-128 (config/mask_config.json: "0-128. Keep <=100 to
# avoid LED flicker"); reject/clamp values above the panel's real maximum
# rather than forwarding 129-255 to the mask.
level = max(0, min(128, int(level)))
with self._op_lock:
self._require_connected()
self._submit(self._mask.set_brightness(level))
self.brightness = level
face = self._face
if face is not None:
face.brightness = level
return {"ok": True, "brightness": level}
def set_text(self, text: str, color: Color = (255, 255, 255),
mode: Optional[int] = None, bg: Optional[Color] = None,
speed: Optional[int] = None) -> dict:
with self._op_lock:
self._require_connected()
self._face_desired = False # static override — don't auto-restart the face
self._stop_face() # static text can't share the panel with the animator
tm = self._lib["TextMode"]
m = int(mode) if mode is not None else tm.SCROLL_LEFT
kw = {}
if speed is not None:
kw["speed"] = max(0, min(255, int(speed)))
self._submit(self._mask.set_text(str(text), color=tuple(color), mode=m, **kw),
timeout=20.0)
if bg is not None:
# Apply a custom background AFTER set_text (which forces black by default).
self._submit(self._mask.set_background_color(*tuple(bg)), timeout=10.0)
return {"ok": True}
def show_image(self, image_id: int) -> dict:
with self._op_lock:
self._require_connected()
self._face_desired = False # static override
self._stop_face()
self._submit(self._mask.show_image(int(image_id)))
return {"ok": True, "image_id": int(image_id)}
def play_animation(self, anim_id: int) -> dict:
with self._op_lock:
self._require_connected()
self._face_desired = False # static override
self._stop_face()
self._submit(self._mask.play_animation(int(anim_id)))
return {"ok": True, "anim_id": int(anim_id)}
def clear_diy(self) -> dict:
with self._op_lock:
self._require_connected()
self._stop_face() # stop the loop before deleting the frames it plays
removed = self._submit(self._mask.clear_diy(), timeout=30.0)
return {"ok": True, "removed": int(removed or 0)}
# -- animated face --------------------------------------------------------
def _stop_face(self):
"""Cancel the animator loop (if any) and reset face state. Idempotent.
Lock-free internal: callers MUST hold ``self._op_lock`` (it mutates the
shared self._face / self._face_running state that the serialized mask
operations and the event-bus callbacks both touch)."""
if self._face is not None:
try:
self._submit(self._face.stop(), timeout=10.0)
except Exception:
log.exception("face.stop() failed")
self._face = None
self._face_running = False
self._speaking = False
self._mouth = None
def face_start(self, reload: bool = False) -> dict:
with self._op_lock:
self._require_connected()
# Always tear down any existing loop first so a second Start (or
# Reload) never leaves two animator tasks fighting over the display.
# Serialized under _op_lock so two concurrent Start presses can't both
# build an animator and race self._face.
self._stop_face()
cf = self._lib["colorface"]
Lifelike = self._lib.get("LifelikeFace") if self.lifelike else None
if Lifelike is not None:
# Rich driver: eye saccades, varied blinks, states, reactions,
# smooth lip-sync. Runs its own loop on this controller's BLE loop.
# auto_reconnect=False -> the controller's supervisor owns recovery.
self._face = Lifelike(mask=self._mask, brightness=self.brightness,
eye_color=self.eye_color, mouth_color=self.mouth_color,
sclera_color=self.sclera_color, auto_reconnect=False,
hide_mouth=self._hide_mouth)
self._face_kind = "lifelike"
else:
self._face = self._lib["FaceAnimator"](
self._mask, fps=self.fps, brightness=self.brightness,
frames=cf.default_frames(eye_color=self.eye_color,
mouth_color=self.mouth_color,
sclera_color=self.sclera_color))
self._face_kind = "faceanim"
# First upload of the frame set can take ~30-90s (acked writes); later
# starts skip it (frames persist on the mask's flash).
self._submit(self._face.start(reload=bool(reload)), timeout=240.0)
self._face_running = True
self._face_desired = True # intent: supervisor restores it after a drop
self._want_connected = True
return {"ok": True, "reloaded": bool(reload), "driver": self._face_kind}
def face_stop(self) -> dict:
with self._op_lock:
self._face_desired = False # user stopped it — don't auto-restart
self._stop_face()
return {"ok": True}
def return_face(self) -> dict:
"""Resume the live animated face (e.g. after a text/image/anim override)."""
self._face_desired = True
return self.face_start(reload=False)
def set_face_color(self, eye=None, mouth=None, sclera=None) -> dict:
"""Recolor the animated face. Colors are baked into the uploaded DIY
frames, so this stores them (persisted to config) and — if the face is
running — re-uploads the frame set in the new colors (~30-90s)."""
if eye is not None:
self.eye_color = _parse_color(eye, self.eye_color)
if mouth is not None:
self.mouth_color = _parse_color(mouth, self.mouth_color)
if sclera is not None:
self.sclera_color = _parse_color(sclera, self.sclera_color)
self._save_colors()
reuploaded = False
if self.is_connected and self._face_desired:
self.face_start(reload=True) # rebuild frames in the new colors
reuploaded = True
return {"ok": True, "reuploaded": reuploaded,
"eye_color": list(self.eye_color),
"mouth_color": list(self.mouth_color),
"sclera_color": list(self.sclera_color)}
def _save_colors(self):
"""Persist the chosen face colors to config/mask_config.json (best-effort,
so they survive restarts and drive autostart). Never raises."""
try:
import json
path = Path(BASE_DIR) / "config" / "mask_config.json"
data = json.loads(path.read_text()) if path.exists() else {}
data["eye_color"] = list(self.eye_color)
data["mouth_color"] = list(self.mouth_color)
data["sclera_color"] = list(self.sclera_color)
path.write_text(json.dumps(data, indent=2))
except Exception:
log.exception("could not persist mask face colors (kept in-memory)")
# -- lifelike states + reactions (no-ops on the basic FaceAnimator) --------
def _face_state(self, state: str) -> dict:
# Snapshot the face reference once: face_start/_stop_face (under _op_lock)
# can swap self._face to None concurrently, and these state setters fire
# from the event-bus worker threads. A local snapshot avoids a torn read
# (AttributeError) without blocking on a long face_start upload.
face = self._face
fn = getattr(face, "set_" + state, None) if face is not None else None
if callable(fn):
try:
fn()
except Exception:
log.exception("face.set_%s failed", state)
return {"ok": True, "state": state}
def set_listening(self) -> dict:
return self._face_state("listening")
def set_thinking(self) -> dict:
return self._face_state("thinking")
def set_idle(self) -> dict:
return self._face_state("idle")
def react(self, emotion: str, hold: float = 1.4) -> dict:
"""Brief reaction (surprised / smile / sad). No-op if unsupported."""
face = self._face # snapshot: face_start/_stop_face may swap it concurrently
if face is not None and hasattr(face, "react"):
try:
face.react(str(emotion), float(hold))
except Exception:
log.exception("face.react failed")
return {"ok": True, "react": emotion}
def set_speaking(self, on: bool) -> dict:
"""Animate the mouth while speaking. Safe no-op if the face isn't running.
Also called from the event bus (brain.gestural_speaking_changed)."""
on = bool(on)
self._speaking = on
self._mouth = None
face = self._face # snapshot: avoid a torn read vs a concurrent _stop_face
if face is not None:
try:
face.set_speaking(on)
except Exception:
log.exception("face.set_speaking() failed")
return {"ok": True, "speaking": on}
def set_mouth(self, level: int) -> dict:
level = max(0, min(3, int(level)))
self._mouth = level
self._speaking = False
# Fired from the Gemini reader thread at lip-sync rate; snapshot the face
# so a concurrent face_start/_stop_face swap can't NoneType-deref here.
face = self._face
if face is not None:
try:
face.set_mouth(level)
except Exception:
log.exception("face.set_mouth() failed")
return {"ok": True, "mouth": level}
def show_expression(self, name: str) -> dict:
with self._op_lock:
self._require_connected()
face = self._face
if face is None:
raise RuntimeError("face animation not started")
self._submit(face.show(str(name)), timeout=10.0)
return {"ok": True, "expression": name}
def show_scratch_image(self, data: bytes, timeout: float = 90.0) -> dict:
"""Upload raw 46x58 image bytes to the mask's reserved scratch DIY slot
and hold it on the face (a QR / social / custom image) until the face is
resumed with set_expression(None). Uses the reliable acked image upload."""
with self._op_lock:
self._require_connected()
face = self._face
if face is None:
raise RuntimeError("face animation not started")
slot = int(getattr(face, "scratch_slot", 20))
# Pause the animation loop so its play_diy traffic doesn't disturb the
# acked upload's per-packet REOK acks (else NotificationTimeout). Wait
# for the loop to actually park before uploading (not a fixed sleep).
paused = hasattr(face, "pause")
if paused:
face.pause()
if hasattr(face, "wait_paused"):
face.wait_paused(2.0)
else:
import time as _t
_t.sleep(0.35)
try:
self._submit(self._mask.upload_image(bytes(data), slot, timeout=15.0),
timeout=timeout)
# Register "_scratch" so set_expression holds it on EITHER driver:
# LifelikeFace.set_expression checks .slots, FaceAnimator checks
# .frames — populate both so the fallback driver holds it too.
if hasattr(face, "slots"):
face.slots["_scratch"] = slot
frames = getattr(face, "frames", None)
if isinstance(frames, dict) and "_scratch" not in frames:
frames["_scratch"] = b""
if hasattr(face, "set_expression"):
face.set_expression("_scratch")
finally:
if paused:
face.resume() # loop resumes + holds the "_scratch" frame
return {"ok": True, "slot": slot}
def set_mouth_hidden(self, hidden: bool) -> dict:
"""Show/hide the mouth on the animated face. Re-uploads just the 7 gaze/
talk slots (masked eyes-only, or normal) — pausing the loop so the acked
upload isn't disturbed. Persists for future face starts this session."""
hidden = bool(hidden)
with self._op_lock:
self._hide_mouth = hidden
face = self._face
if (face is None or not self.is_connected
or not hasattr(face, "mouth_frames_for")):
return {"ok": True, "hidden": hidden,
"note": "applies when the face is running"}
frames = face.mouth_frames_for(hidden)
paused = hasattr(face, "pause")
if paused:
face.pause()
if hasattr(face, "wait_paused"):
face.wait_paused(2.0)
try:
for name, data in frames.items():
slot = face.slots.get(name) if hasattr(face, "slots") else None
if slot:
self._submit(self._mask.upload_image(bytes(data), int(slot),
timeout=15.0), timeout=90.0)
if hasattr(face, "frames"):
face.frames[name] = data
if hasattr(face, "hide_mouth"):
face.hide_mouth = hidden
if hasattr(face, "_cur"):
face._cur = None # force a redraw with the new frame
finally:
if paused:
face.resume()
return {"ok": True, "hidden": hidden}
def set_expression(self, name: Optional[str]) -> dict:
"""Hold an expression over the animation (None resumes idle/talk).
Unlike show_expression (a one-off), this pins the frame until cleared —
e.g. 'surprised' on a reaction, 'sad' on an error. Safe no-op if the face
isn't running."""
face = self._face # snapshot: face_start/_stop_face may swap it concurrently
if face is not None:
try:
face.set_expression(name if name else None)
except Exception:
log.exception("face.set_expression() failed")
return {"ok": True, "expression": name}
# -- lifecycle ------------------------------------------------------------
def shutdown(self):
"""Disconnect the mask and stop the background loop (idempotent)."""
try:
self.disconnect()
except Exception:
log.exception("mask disconnect on shutdown failed")
try:
self._loop.call_soon_threadsafe(self._loop.stop)
except Exception:
pass