Sanad_Package_4/vendor/Sanad/face/face_motion.py

600 lines
28 KiB
Python

#!/usr/bin/env python3
"""Lifelike face motion for the Shining LED mask — SanadV3.
A richer, more *organic* driver than the Mask lib's built-in idle. Instead of an
occasional blink/glance, it gives the robot's face the small, constant motion a
real face has:
* natural blinking — varied intervals, occasional quick double-blinks
* frequent small eye saccades (darts) with short gaze holds and drift
* idle micro-expressions (a brief smile now and then)
* state-aware behaviour:
idle — relaxed, wanders, blinks
listening — attentive, eyes mostly forward, fewer darts, soft blinks
thinking — looks away (longer gaze holds), slower blinks
speaking — mouth lip-syncs to audio + the odd mid-sentence blink
* quick reactions: surprised / happy(smile) / sad, held briefly then released
It drives the mask by PLAY of the pre-uploaded DIY frames (no per-frame upload),
so motion is smooth. Lip-sync composes with the eye motion via feed_audio_level().
Run it standalone (keep the mask within ~30 cm for the one-time frame upload):
python3 face/face_motion.py # connect, load frames, stay alive
python3 face/face_motion.py --demo # cycle the states to show the range
python3 face/face_motion.py --reload # force re-upload of the frame set
Integrate into Sanad: construct ``LifelikeFace(mask=<connected ShiningMask>)``
(or let it connect itself), ``await face.start()``, then drive it from the event
bus / Gemini lip-sync markers:
face.set_listening() # when the user starts speaking
face.set_thinking() # while a tool/response is being prepared
face.set_speaking(True/False) # around a spoken reply
face.feed_audio_level(rms_0_to_1) # per audio chunk -> real lip-sync
face.react("surprised" | "smile" | "sad")
face.set_idle() # back to relaxed wandering
"""
from __future__ import annotations
import argparse
import asyncio
import os
import random
import sys
import time
import logging
import threading
from pathlib import Path
log = logging.getLogger("sanad.face_motion")
# Frames present in the mask's DIY slots (colorface.default_frames()).
GAZE = ("neutral", "look_left", "look_right")
MOUTH = ("neutral", "talk1", "talk2", "talk3")
# How long after the last lip-sync marker the face stays "speaking" (mouth
# follows the amplitude; pauses close it). When markers stop for this long the
# turn ends and the eyes return to their underlying state.
_SPEECH_WINDOW = 0.6
# Mouth-frame cadence while speaking. Each frame is a FULL-face DIY slot switch;
# this small mask can't cleanly repaint the LED matrix faster than ~5/s, so the
# old 0.09s (~11/s) cadence overran it and showed torn/scrambled composites of
# several frames at once. Cap it well under the tear threshold — speech visemes
# only change ~4-7/s anyway, so lip-sync still reads fine.
_SPEAK_FRAME_SEC = 0.22
# BLE-link health. If the mask link drops mid-session every play_diy raises a
# BleakError, which left the loop busy-spinning ~20x/s forever (no recovery but a
# manual disconnect/connect). Treat a run of consecutive play failures (or
# mask.is_connected going False) as a dropped link and attempt a *bounded*
# reconnect; if that is exhausted, stop the loop so the face goes idle/unavailable
# instead of hammering a dead transport.
_PLAY_FAIL_LIMIT = 10 # consecutive failed plays before we call it a drop
_RECONNECT_ATTEMPTS = 3 # reconnect tries per detected drop
_RECONNECT_BACKOFF = 2.0 # seconds between reconnect tries
# BLE link keepalive. _play() skips re-sending an *unchanged* frame, so a long
# neutral/idle stretch writes nothing but the occasional blink (every 2-4.5s).
# If that quiet gap (plus an RF glitch) outlasts the link's supervision timeout
# the mask drops — and every reconnect briefly flashes the mask's own built-in
# face. Re-send the current frame at least this often so the link never goes
# quiet long enough to be dropped. One tiny write/sec when idle; free when busy.
_KEEPALIVE_SEC = 1.0
# Frames that carry the animated face's mouth (gaze + lip-sync). "Hide mouth"
# blacks out the mouth region on just these, leaving eyes-only — the emotion
# icons (heart/thumb/…) are left alone.
_MOUTH_FRAMES = ("neutral", "talk1", "talk2", "talk3", "blink",
"look_left", "look_right")
_MOUTH_MASK_TOP = 32 # display-y below which the mouth lives (eyes end ~29)
def _mask_mouth_bytes(data: bytes) -> bytes:
"""Return a copy of an encoded 46x58 frame with the mouth region blacked out
(decode the transposed bytes -> mask display rows >= _MOUTH_MASK_TOP -> re-encode)."""
import colorface as _cf
from PIL import Image as _Image, ImageDraw as _ImageDraw
img = _Image.frombytes("RGB", (_cf.DISPLAY_H, _cf.DISPLAY_W), bytes(data))
img = img.transpose(_Image.Transpose.TRANSPOSE) # -> 46x58 display space
_ImageDraw.Draw(img).rectangle([0, _MOUTH_MASK_TOP, _cf.DISPLAY_W, _cf.DISPLAY_H],
fill=(0, 0, 0))
return _cf.encode(img)
def _add_mask_to_path() -> str:
"""Put the flat Mask library (mask.py / faceanim.py / colorface.py) on sys.path."""
d = os.environ.get("SANAD_MASK_DIR") or str(Path(__file__).resolve().parents[2] / "Mask")
if d and d not in sys.path:
sys.path.insert(0, d)
return d
class LifelikeFace:
"""Organic, state-aware motion driver for the LED mask."""
def __init__(self, mask=None, *, name_prefix="MASK", address=None, adapter=None,
brightness=95, frames=None, eye_color=None, mouth_color=None,
sclera_color=None, auto_reconnect=True, hide_mouth=False):
_add_mask_to_path()
import mask as _mask # flat Mask lib
import faceanim as _faceanim
import colorface as _colorface
self._ShiningMask = _mask.ShiningMask
self._FaceAnimator = _faceanim.FaceAnimator
self._colorface = _colorface
self.mask = mask
self._own_mask = mask is None
self.name_prefix = name_prefix
self.address = address
self.adapter = adapter
self.brightness = int(brightness)
# When False, a dropped link is NOT self-healed here — the loop bails
# cleanly (and forces the transport disconnected) so an external owner
# (FaceController's reconnect supervisor) brings the link + face back.
self._auto_reconnect = bool(auto_reconnect)
# Frame colors: explicit frames win; else build the default set tinted
# with whatever colors were given (None -> the lib defaults cyan/red).
if frames is None:
ck = {}
if eye_color is not None:
ck["eye_color"] = tuple(eye_color)
if mouth_color is not None:
ck["mouth_color"] = tuple(mouth_color)
if sclera_color is not None:
ck["sclera_color"] = tuple(sclera_color)
frames = _colorface.default_frames(**ck)
# Extra Gemini-triggerable emotions (heart, laugh, love-eyes, cool,
# sleepy, confused, kiss, star_struck) in the same style. Appended
# after the base set so slot ids 1..N stay stable for existing
# frames. Guarded: a missing module never breaks the face.
try:
from Project.Sanad.face.emotion_frames import emotion_frames as _emo
# 7 emotions so slots 1..19 hold the face set and slot 20 stays
# free as a scratch slot for QR/social images (mask caps at 20).
frames = {**frames, **_emo(**ck, include={
"heart", "laugh", "love", "cool", "confused", "kiss", "thumbs_up"})}
except Exception:
log.exception("emotion frames unavailable — base frames only")
self.frames = frames
# Reserved DIY slot (just past the animated frames) for on-demand images
# (QR / social) shown via the FaceController's show_scratch_image().
self.scratch_slot = len(self.frames) + 1
# Mouth show/hide: keep the unmasked originals so a live toggle can
# re-upload just the gaze/talk slots masked or normal.
self._base_frames = dict(self.frames)
self.hide_mouth = bool(hide_mouth)
if self.hide_mouth:
self.frames = {n: (_mask_mouth_bytes(d) if n in _MOUTH_FRAMES else d)
for n, d in self.frames.items()}
def mouth_frames_for(self, hidden: bool) -> dict:
"""{name: bytes} for the gaze/talk frames, masked (hidden) or normal — the
FaceController re-uploads just these slots to toggle the mouth live."""
return {n: (_mask_mouth_bytes(self._base_frames[n]) if hidden
else self._base_frames[n])
for n in _MOUTH_FRAMES if n in self._base_frames}
self.slots: dict = {}
self._state = "idle" # underlying eye state: idle|listening|thinking
self._speaking = False # explicit speaking turn (set_speaking)
self._level = 0.0 # live lip-sync amplitude 0..1
self._last_mouth_t = 0.0 # last set_mouth/feed_audio_level time
self._react = None
self._react_until = 0.0
self._cur = None
self._task = None
self._stop = False
self._play_fails = 0 # consecutive play_diy failures (link-drop signal)
self._last_write = 0.0 # monotonic of the last successful play_diy (keepalive)
self._paused = False # loop stops writing (used during a scratch upload)
self._paused_ack = threading.Event() # set once the loop has actually parked
# -- lifecycle ------------------------------------------------------------
async def start(self, *, reload: bool = False):
if self.mask is None:
self.mask = self._ShiningMask(
address=self.address, name_prefix=self.name_prefix, adapter=self.adapter)
await self.mask.connect(timeout=20.0, attempts=12)
await self.mask.set_brightness(self.brightness)
# Upload the frame set via the RELIABLE (acked) image path — see
# _upload_frames. We no longer borrow FaceAnimator.load(), whose
# fire-and-forget upload silently corrupts slots on a marginal link (a
# dropped packet -> garbage frame, no exception -> no retry).
await self._upload_frames(force=reload)
self._stop = False
await self._play("neutral")
self._task = asyncio.create_task(self._loop())
return self
async def stop(self):
self._stop = True
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
self._task = None
if self._own_mask and self.mask is not None:
try:
await self.mask.disconnect()
except Exception:
pass
async def __aenter__(self):
return await self.start()
async def __aexit__(self, *exc):
await self.stop()
# -- reliable frame upload ------------------------------------------------
async def _upload_frames(self, *, force: bool):
"""Upload the frame set to the mask's DIY slots, RELIABLY.
The mask's default DIY upload is fire-and-forget: ~80 write-without-
response packets per frame with no ack, so a packet dropped on a
marginal BLE link silently corrupts that slot (no exception -> no retry)
and the frame renders as garbage. We instead drive each frame through
the mask's ACKED image path (upload_image -> _upload(kind=IMAGE),
per-packet REOK), retrying the whole frame on any failure (a fresh DATS
resets the half-written slot). Same name->slot (1..N) map as before.
"""
names = list(self.frames)
self.slots = {name: i + 1 for i, name in enumerate(names)}
if not force:
try:
count = await self.mask.get_diy_count(timeout=4.0) or 0
except Exception:
count = 0
if count >= len(names):
return # frames already stored (persist in flash)
await self.mask.clear_diy()
acked = True
for i, (name, data) in enumerate(self.frames.items(), start=1):
acked = await self._upload_one_frame(i, bytes(data), acked)
await asyncio.sleep(0.2)
async def _upload_one_frame(self, slot: int, data: bytes, acked: bool) -> bool:
"""Upload one frame. Prefer the acked image path; on failure reconnect +
retry the whole frame. If frame 1 proves this mask clone never acks
IMAGE uploads, latch off the acked path and use paced fire-and-forget
for the rest. Returns whether to keep using the acked path."""
if acked:
for attempt in range(5):
try:
await self.mask.upload_image(data, slot, timeout=8.0)
return True
except Exception as exc:
# frame 1 failing its first two acked tries => this clone
# doesn't ack IMAGE uploads; stop trying it.
if slot == 1 and attempt >= 1:
log.warning("mask: IMAGE uploads not acked by this clone "
"(%s) -- using paced fire-and-forget", exc)
break
if attempt == 4:
log.warning("mask: acked upload of slot %d exhausted (%s) "
"-- fire-and-forget fallback", slot, exc)
break
await self._reupload_reconnect()
# fallback: paced fire-and-forget (probabilistic -- keep the mask close)
await self.mask.upload_raw_image(data, index=slot,
chunk_delay=0.10, init_delay=0.30)
return False
async def _reupload_reconnect(self):
"""Drop + re-establish the link mid-upload so the next frame attempt
starts clean (a fresh DATS resets any half-written slot)."""
try:
await self.mask.disconnect()
except Exception:
pass
await asyncio.sleep(1.0)
try:
await self.mask.connect(timeout=15.0, attempts=8)
await self.mask.set_brightness(self.brightness)
except Exception:
pass
# -- control --------------------------------------------------------------
def set_state(self, state: str):
self._state = state if state in ("idle", "listening", "thinking", "speaking") else "idle"
def set_idle(self):
self._speaking = False
self._last_mouth_t = 0.0 # leave any speaking overlay immediately
self.set_state("idle")
def set_listening(self):
self._speaking = False
self._last_mouth_t = 0.0
self.set_state("listening")
def set_thinking(self):
self._speaking = False
self._last_mouth_t = 0.0
self.set_state("thinking")
def set_speaking(self, on: bool):
"""Mark a speaking turn. Without lip-sync markers the mouth auto-talks;
with them it follows the amplitude. The underlying eye state is kept, so
it returns there when the turn ends."""
self._speaking = bool(on)
if not on:
self._last_mouth_t = 0.0
self._level = 0.0
def feed_audio_level(self, level: float):
"""Per-audio-chunk amplitude 0..1 -> real lip-sync (mouth opens by loudness).
Keeps the face 'speaking' for a short window after the last call, so
pauses close the mouth and the turn ends cleanly when markers stop —
without depending on an explicit speaking on/off signal."""
self._level = max(self._level * 0.4, min(1.0, float(level))) # fast attack
self._last_mouth_t = time.monotonic()
def react(self, emotion: str, hold: float = 1.4):
"""Briefly hold an expression (surprised / smile / sad / …) then release."""
if emotion in self.slots:
self._react = emotion
self._react_until = time.monotonic() + float(hold)
# -- FaceController-compatible API (so it can drop in for FaceAnimator) ----
def set_mouth(self, level: int):
"""Discrete mouth level 0..3 (e.g. from the Gemini [[MOUTH:n]] relay) ->
drives lip-sync. Maps the level to a representative amplitude; level 0
decays the mouth shut but keeps the short speaking window alive."""
amp = (0.0, 0.12, 0.24, 0.5)[max(0, min(3, int(level)))]
self._level = max(self._level * 0.4, amp)
self._last_mouth_t = time.monotonic()
def set_expression(self, name):
"""Hold an expression frame until cleared with None (vs the timed react)."""
if name and name in self.slots:
self._react = name
self._react_until = float("inf")
elif self._react_until == float("inf"):
self._react = None
def pause(self):
"""Stop the loop from writing to the mask (so a concurrent scratch-slot
upload's per-packet acks aren't disturbed by play_diy traffic)."""
self._paused_ack.clear()
self._paused = True
def wait_paused(self, timeout: float = 2.0) -> bool:
"""Block until the loop has actually reached the paused branch (so no
play_diy is in flight when the caller starts the scratch upload)."""
return self._paused_ack.wait(timeout)
def resume(self):
self._paused = False
self._paused_ack.clear()
self._cur = None # force a redraw when the loop takes over again
async def show(self, name: str):
"""One-off: briefly show a named frame (used by FaceController.show_expression)."""
if name in self.slots:
self.react(name, hold=1.5)
# -- internals ------------------------------------------------------------
@staticmethod
def _mouth_for(level: float) -> str:
i = 0 if level < 0.06 else 1 if level < 0.16 else 2 if level < 0.32 else 3
return MOUTH[i]
async def _play(self, name: str, *, force: bool = False):
slot = self.slots.get(name)
if slot is None or (name == self._cur and not force):
return
try:
await self.mask.play_diy(slot)
self._cur = name
self._last_write = time.monotonic() # keepalive clock: link saw traffic
self._play_fails = 0 # link is alive again
except Exception:
self._cur = None # retry next tick on a transient BLE error
self._play_fails += 1 # ...but count it: a sustained run == a drop
def _link_dead(self) -> bool:
"""True once the BLE link looks gone: the transport reports disconnected,
or play_diy has failed a sustained run in a row (a single glitch is still
treated as transient and retried)."""
connected = bool(getattr(self.mask, "is_connected", False)) if self.mask else False
return (not connected) or self._play_fails >= _PLAY_FAIL_LIMIT
async def _reconnect(self) -> bool:
"""Bounded reconnect after a detected drop. Frames persist on the mask's
flash, so on success we only re-pin brightness + redraw the current frame
(no re-upload). Returns True if the link is back, False if exhausted."""
for i in range(_RECONNECT_ATTEMPTS):
if self._stop:
return False
try:
if getattr(self.mask, "is_connected", False):
await self.mask.disconnect() # clean any half-open client first
except Exception:
pass
try:
await self.mask.connect(timeout=10.0, attempts=2)
await self.mask.set_brightness(self.brightness)
self._play_fails = 0
self._cur = None # force a redraw on the fresh link
await self._play("neutral")
return True
except Exception:
await asyncio.sleep(_RECONNECT_BACKOFF)
return False
async def _blink(self, restore: str):
await self._play("blink")
await asyncio.sleep(random.uniform(0.08, 0.13))
if random.random() < 0.18: # occasional quick double-blink
await self._play(restore)
await asyncio.sleep(random.uniform(0.07, 0.11))
await self._play("blink")
await asyncio.sleep(random.uniform(0.08, 0.12))
await self._play(restore)
async def _loop(self):
mono = time.monotonic
t_blink = mono() + random.uniform(1.5, 4.0)
t_sacc = mono() + random.uniform(0.6, 1.6)
t_micro = mono() + random.uniform(12.0, 25.0)
gaze = "neutral"
while not self._stop:
t = mono()
# BLE link health: if it dropped, try a bounded reconnect instead of
# busy-spinning play_diy on a dead transport. If reconnect is
# exhausted, leave the loop so the face stops (the controller's
# status() then reports it not running) rather than spinning forever.
if self._link_dead():
if not self._auto_reconnect:
# Owner-managed recovery (FaceController supervisor): make the
# transport report disconnected so the supervisor's is_connected
# check fires, then leave the loop. The supervisor reconnects
# and rebuilds the face (frames persist on the mask's flash).
try:
if getattr(self.mask, "is_connected", False):
await self.mask.disconnect()
except Exception:
pass
break
if not await self._reconnect():
break
t = mono() # reconnect can take a while
# Paused (during a scratch-slot upload): write nothing so the upload's
# per-packet REOK acks aren't disturbed by play_diy traffic. Signal
# that we've actually parked so the caller can start the upload.
if self._paused:
self._paused_ack.set()
await asyncio.sleep(0.1)
continue
# BLE keepalive: re-send the current frame if the link has gone quiet.
# _play() skips unchanged frames, so a long neutral idle stretch writes
# nothing but blinks; a quiet gap past the supervision timeout drops the
# link, and each reconnect flashes the mask's built-in face. A cheap
# periodic re-send keeps the link alive (no-op while speaking — that
# path already writes ~11x/s, so _last_write stays fresh).
if self._cur is not None and (t - self._last_write) >= _KEEPALIVE_SEC:
await self._play(self._cur, force=True)
# transient reaction overrides everything briefly
if self._react is not None:
if t < self._react_until:
await self._play(self._react)
await asyncio.sleep(0.06)
continue
self._react = None
self._cur = None # force a redraw of whatever's underneath
# "speaking" = an explicit turn OR fresh lip-sync markers (the latter
# window auto-expires, so the mouth closes and the turn ends when the
# markers stop, without needing a reliable speaking-off signal).
lipsync_active = (t - self._last_mouth_t) < _SPEECH_WINDOW
if self._speaking or lipsync_active:
if lipsync_active:
base = self._mouth_for(self._level) # 0 = closed on pauses
self._level *= 0.55 # decay toward closed
else:
base = MOUTH[random.choice([0, 1, 1, 2, 2, 3, 3, 2, 1, 0])] # auto-talk
await self._play(base)
# No mid-speech blink: a blink is a 2-3 frame burst that, on top
# of the mouth cadence, spikes the switch rate and tears the
# display. Eyes blink between utterances (idle/listening) instead.
await asyncio.sleep(_SPEAK_FRAME_SEC)
continue
# --- non-speaking: idle / listening / thinking ---
if t >= t_blink:
await self._blink(gaze)
lo, hi = (3.5, 6.5) if self._state == "thinking" else (2.0, 4.5)
t_blink = t + random.uniform(lo, hi)
if t >= t_sacc:
if self._state == "thinking":
gaze = random.choice(["look_left", "look_right", "look_left", "look_right", "neutral"])
hold = random.uniform(0.9, 1.8)
elif self._state == "listening":
gaze = random.choice(["neutral", "neutral", "neutral", "look_left", "look_right"])
hold = random.uniform(0.5, 1.2)
else: # idle — relaxed wandering
gaze = random.choice(["neutral", "neutral", "look_left", "look_right", "neutral"])
hold = random.uniform(0.3, 0.9)
await self._play(gaze)
t_sacc = t + hold + random.uniform(0.4, 1.4)
else:
await self._play(gaze)
if self._state == "idle" and t >= t_micro: # rare idle micro-smile
await self._play("smile")
await asyncio.sleep(random.uniform(0.6, 1.0))
gaze = "neutral"
self._cur = None
t_micro = t + random.uniform(15.0, 30.0)
await asyncio.sleep(0.05)
# ---------------------------------------------------------------------------
# Standalone runner
# ---------------------------------------------------------------------------
async def _amain(args):
face = LifelikeFace(name_prefix=args.name_prefix, address=args.address,
brightness=args.brightness)
print("connecting + loading frames (keep the mask within ~30 cm) ...", flush=True)
await face.start(reload=args.reload)
print("lifelike motion running. Ctrl+C to stop.", flush=True)
try:
if args.demo:
steps = [
("idle (wandering + blinks)", lambda: face.set_idle(), 7),
("listening (attentive)", lambda: face.set_listening(), 7),
("thinking (looks away)", lambda: face.set_thinking(), 7),
("speaking (auto lip-sync)", lambda: face.set_speaking(True), 7),
("react: surprised", lambda: face.react("surprised", 2.0), 2.2),
("react: smile", lambda: face.react("smile", 2.0), 2.2),
("react: sad", lambda: face.react("sad", 2.0), 2.2),
("back to idle", lambda: face.set_idle(), 5),
]
for label, action, dur in steps:
print(" ->", label, flush=True)
action()
await asyncio.sleep(dur)
face.set_idle()
await asyncio.sleep(2)
else:
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
print("\nstopping ...")
finally:
await face.stop()
def main():
ap = argparse.ArgumentParser(description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter)
ap.add_argument("--demo", action="store_true", help="cycle through the states/reactions")
ap.add_argument("--reload", action="store_true", help="force re-upload of the frame set")
ap.add_argument("--address", help="mask BLE MAC")
ap.add_argument("--name-prefix", default="MASK")
ap.add_argument("--brightness", type=int, default=95)
asyncio.run(_amain(ap.parse_args()))
if __name__ == "__main__":
main()