600 lines
28 KiB
Python
600 lines
28 KiB
Python
#!/usr/bin/env python3
|
|
"""Lifelike face motion for the Shining LED mask — SanadV3.
|
|
|
|
A richer, more *organic* driver than the Mask lib's built-in idle. Instead of an
|
|
occasional blink/glance, it gives the robot's face the small, constant motion a
|
|
real face has:
|
|
|
|
* natural blinking — varied intervals, occasional quick double-blinks
|
|
* frequent small eye saccades (darts) with short gaze holds and drift
|
|
* idle micro-expressions (a brief smile now and then)
|
|
* state-aware behaviour:
|
|
idle — relaxed, wanders, blinks
|
|
listening — attentive, eyes mostly forward, fewer darts, soft blinks
|
|
thinking — looks away (longer gaze holds), slower blinks
|
|
speaking — mouth lip-syncs to audio + the odd mid-sentence blink
|
|
* quick reactions: surprised / happy(smile) / sad, held briefly then released
|
|
|
|
It drives the mask by PLAY of the pre-uploaded DIY frames (no per-frame upload),
|
|
so motion is smooth. Lip-sync composes with the eye motion via feed_audio_level().
|
|
|
|
Run it standalone (keep the mask within ~30 cm for the one-time frame upload):
|
|
|
|
python3 face/face_motion.py # connect, load frames, stay alive
|
|
python3 face/face_motion.py --demo # cycle the states to show the range
|
|
python3 face/face_motion.py --reload # force re-upload of the frame set
|
|
|
|
Integrate into Sanad: construct ``LifelikeFace(mask=<connected ShiningMask>)``
|
|
(or let it connect itself), ``await face.start()``, then drive it from the event
|
|
bus / Gemini lip-sync markers:
|
|
|
|
face.set_listening() # when the user starts speaking
|
|
face.set_thinking() # while a tool/response is being prepared
|
|
face.set_speaking(True/False) # around a spoken reply
|
|
face.feed_audio_level(rms_0_to_1) # per audio chunk -> real lip-sync
|
|
face.react("surprised" | "smile" | "sad")
|
|
face.set_idle() # back to relaxed wandering
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import asyncio
|
|
import os
|
|
import random
|
|
import sys
|
|
import time
|
|
import logging
|
|
import threading
|
|
from pathlib import Path
|
|
|
|
log = logging.getLogger("sanad.face_motion")
|
|
|
|
# Frames present in the mask's DIY slots (colorface.default_frames()).
|
|
GAZE = ("neutral", "look_left", "look_right")
|
|
MOUTH = ("neutral", "talk1", "talk2", "talk3")
|
|
|
|
# How long after the last lip-sync marker the face stays "speaking" (mouth
|
|
# follows the amplitude; pauses close it). When markers stop for this long the
|
|
# turn ends and the eyes return to their underlying state.
|
|
_SPEECH_WINDOW = 0.6
|
|
|
|
# Mouth-frame cadence while speaking. Each frame is a FULL-face DIY slot switch;
|
|
# this small mask can't cleanly repaint the LED matrix faster than ~5/s, so the
|
|
# old 0.09s (~11/s) cadence overran it and showed torn/scrambled composites of
|
|
# several frames at once. Cap it well under the tear threshold — speech visemes
|
|
# only change ~4-7/s anyway, so lip-sync still reads fine.
|
|
_SPEAK_FRAME_SEC = 0.22
|
|
|
|
# BLE-link health. If the mask link drops mid-session every play_diy raises a
|
|
# BleakError, which left the loop busy-spinning ~20x/s forever (no recovery but a
|
|
# manual disconnect/connect). Treat a run of consecutive play failures (or
|
|
# mask.is_connected going False) as a dropped link and attempt a *bounded*
|
|
# reconnect; if that is exhausted, stop the loop so the face goes idle/unavailable
|
|
# instead of hammering a dead transport.
|
|
_PLAY_FAIL_LIMIT = 10 # consecutive failed plays before we call it a drop
|
|
_RECONNECT_ATTEMPTS = 3 # reconnect tries per detected drop
|
|
_RECONNECT_BACKOFF = 2.0 # seconds between reconnect tries
|
|
|
|
# BLE link keepalive. _play() skips re-sending an *unchanged* frame, so a long
|
|
# neutral/idle stretch writes nothing but the occasional blink (every 2-4.5s).
|
|
# If that quiet gap (plus an RF glitch) outlasts the link's supervision timeout
|
|
# the mask drops — and every reconnect briefly flashes the mask's own built-in
|
|
# face. Re-send the current frame at least this often so the link never goes
|
|
# quiet long enough to be dropped. One tiny write/sec when idle; free when busy.
|
|
_KEEPALIVE_SEC = 1.0
|
|
|
|
# Frames that carry the animated face's mouth (gaze + lip-sync). "Hide mouth"
|
|
# blacks out the mouth region on just these, leaving eyes-only — the emotion
|
|
# icons (heart/thumb/…) are left alone.
|
|
_MOUTH_FRAMES = ("neutral", "talk1", "talk2", "talk3", "blink",
|
|
"look_left", "look_right")
|
|
_MOUTH_MASK_TOP = 32 # display-y below which the mouth lives (eyes end ~29)
|
|
|
|
|
|
def _mask_mouth_bytes(data: bytes) -> bytes:
|
|
"""Return a copy of an encoded 46x58 frame with the mouth region blacked out
|
|
(decode the transposed bytes -> mask display rows >= _MOUTH_MASK_TOP -> re-encode)."""
|
|
import colorface as _cf
|
|
from PIL import Image as _Image, ImageDraw as _ImageDraw
|
|
img = _Image.frombytes("RGB", (_cf.DISPLAY_H, _cf.DISPLAY_W), bytes(data))
|
|
img = img.transpose(_Image.Transpose.TRANSPOSE) # -> 46x58 display space
|
|
_ImageDraw.Draw(img).rectangle([0, _MOUTH_MASK_TOP, _cf.DISPLAY_W, _cf.DISPLAY_H],
|
|
fill=(0, 0, 0))
|
|
return _cf.encode(img)
|
|
|
|
|
|
def _add_mask_to_path() -> str:
|
|
"""Put the flat Mask library (mask.py / faceanim.py / colorface.py) on sys.path."""
|
|
d = os.environ.get("SANAD_MASK_DIR") or str(Path(__file__).resolve().parents[2] / "Mask")
|
|
if d and d not in sys.path:
|
|
sys.path.insert(0, d)
|
|
return d
|
|
|
|
|
|
class LifelikeFace:
|
|
"""Organic, state-aware motion driver for the LED mask."""
|
|
|
|
def __init__(self, mask=None, *, name_prefix="MASK", address=None, adapter=None,
|
|
brightness=95, frames=None, eye_color=None, mouth_color=None,
|
|
sclera_color=None, auto_reconnect=True, hide_mouth=False):
|
|
_add_mask_to_path()
|
|
import mask as _mask # flat Mask lib
|
|
import faceanim as _faceanim
|
|
import colorface as _colorface
|
|
self._ShiningMask = _mask.ShiningMask
|
|
self._FaceAnimator = _faceanim.FaceAnimator
|
|
self._colorface = _colorface
|
|
|
|
self.mask = mask
|
|
self._own_mask = mask is None
|
|
self.name_prefix = name_prefix
|
|
self.address = address
|
|
self.adapter = adapter
|
|
self.brightness = int(brightness)
|
|
# When False, a dropped link is NOT self-healed here — the loop bails
|
|
# cleanly (and forces the transport disconnected) so an external owner
|
|
# (FaceController's reconnect supervisor) brings the link + face back.
|
|
self._auto_reconnect = bool(auto_reconnect)
|
|
# Frame colors: explicit frames win; else build the default set tinted
|
|
# with whatever colors were given (None -> the lib defaults cyan/red).
|
|
if frames is None:
|
|
ck = {}
|
|
if eye_color is not None:
|
|
ck["eye_color"] = tuple(eye_color)
|
|
if mouth_color is not None:
|
|
ck["mouth_color"] = tuple(mouth_color)
|
|
if sclera_color is not None:
|
|
ck["sclera_color"] = tuple(sclera_color)
|
|
frames = _colorface.default_frames(**ck)
|
|
# Extra Gemini-triggerable emotions (heart, laugh, love-eyes, cool,
|
|
# sleepy, confused, kiss, star_struck) in the same style. Appended
|
|
# after the base set so slot ids 1..N stay stable for existing
|
|
# frames. Guarded: a missing module never breaks the face.
|
|
try:
|
|
from Project.Sanad.face.emotion_frames import emotion_frames as _emo
|
|
# 7 emotions so slots 1..19 hold the face set and slot 20 stays
|
|
# free as a scratch slot for QR/social images (mask caps at 20).
|
|
frames = {**frames, **_emo(**ck, include={
|
|
"heart", "laugh", "love", "cool", "confused", "kiss", "thumbs_up"})}
|
|
except Exception:
|
|
log.exception("emotion frames unavailable — base frames only")
|
|
self.frames = frames
|
|
# Reserved DIY slot (just past the animated frames) for on-demand images
|
|
# (QR / social) shown via the FaceController's show_scratch_image().
|
|
self.scratch_slot = len(self.frames) + 1
|
|
# Mouth show/hide: keep the unmasked originals so a live toggle can
|
|
# re-upload just the gaze/talk slots masked or normal.
|
|
self._base_frames = dict(self.frames)
|
|
self.hide_mouth = bool(hide_mouth)
|
|
if self.hide_mouth:
|
|
self.frames = {n: (_mask_mouth_bytes(d) if n in _MOUTH_FRAMES else d)
|
|
for n, d in self.frames.items()}
|
|
|
|
def mouth_frames_for(self, hidden: bool) -> dict:
|
|
"""{name: bytes} for the gaze/talk frames, masked (hidden) or normal — the
|
|
FaceController re-uploads just these slots to toggle the mouth live."""
|
|
return {n: (_mask_mouth_bytes(self._base_frames[n]) if hidden
|
|
else self._base_frames[n])
|
|
for n in _MOUTH_FRAMES if n in self._base_frames}
|
|
|
|
self.slots: dict = {}
|
|
self._state = "idle" # underlying eye state: idle|listening|thinking
|
|
self._speaking = False # explicit speaking turn (set_speaking)
|
|
self._level = 0.0 # live lip-sync amplitude 0..1
|
|
self._last_mouth_t = 0.0 # last set_mouth/feed_audio_level time
|
|
self._react = None
|
|
self._react_until = 0.0
|
|
self._cur = None
|
|
self._task = None
|
|
self._stop = False
|
|
self._play_fails = 0 # consecutive play_diy failures (link-drop signal)
|
|
self._last_write = 0.0 # monotonic of the last successful play_diy (keepalive)
|
|
self._paused = False # loop stops writing (used during a scratch upload)
|
|
self._paused_ack = threading.Event() # set once the loop has actually parked
|
|
|
|
# -- lifecycle ------------------------------------------------------------
|
|
|
|
async def start(self, *, reload: bool = False):
|
|
if self.mask is None:
|
|
self.mask = self._ShiningMask(
|
|
address=self.address, name_prefix=self.name_prefix, adapter=self.adapter)
|
|
await self.mask.connect(timeout=20.0, attempts=12)
|
|
await self.mask.set_brightness(self.brightness)
|
|
# Upload the frame set via the RELIABLE (acked) image path — see
|
|
# _upload_frames. We no longer borrow FaceAnimator.load(), whose
|
|
# fire-and-forget upload silently corrupts slots on a marginal link (a
|
|
# dropped packet -> garbage frame, no exception -> no retry).
|
|
await self._upload_frames(force=reload)
|
|
self._stop = False
|
|
await self._play("neutral")
|
|
self._task = asyncio.create_task(self._loop())
|
|
return self
|
|
|
|
async def stop(self):
|
|
self._stop = True
|
|
if self._task:
|
|
self._task.cancel()
|
|
try:
|
|
await self._task
|
|
except asyncio.CancelledError:
|
|
pass
|
|
self._task = None
|
|
if self._own_mask and self.mask is not None:
|
|
try:
|
|
await self.mask.disconnect()
|
|
except Exception:
|
|
pass
|
|
|
|
async def __aenter__(self):
|
|
return await self.start()
|
|
|
|
async def __aexit__(self, *exc):
|
|
await self.stop()
|
|
|
|
# -- reliable frame upload ------------------------------------------------
|
|
|
|
async def _upload_frames(self, *, force: bool):
|
|
"""Upload the frame set to the mask's DIY slots, RELIABLY.
|
|
|
|
The mask's default DIY upload is fire-and-forget: ~80 write-without-
|
|
response packets per frame with no ack, so a packet dropped on a
|
|
marginal BLE link silently corrupts that slot (no exception -> no retry)
|
|
and the frame renders as garbage. We instead drive each frame through
|
|
the mask's ACKED image path (upload_image -> _upload(kind=IMAGE),
|
|
per-packet REOK), retrying the whole frame on any failure (a fresh DATS
|
|
resets the half-written slot). Same name->slot (1..N) map as before.
|
|
"""
|
|
names = list(self.frames)
|
|
self.slots = {name: i + 1 for i, name in enumerate(names)}
|
|
if not force:
|
|
try:
|
|
count = await self.mask.get_diy_count(timeout=4.0) or 0
|
|
except Exception:
|
|
count = 0
|
|
if count >= len(names):
|
|
return # frames already stored (persist in flash)
|
|
await self.mask.clear_diy()
|
|
acked = True
|
|
for i, (name, data) in enumerate(self.frames.items(), start=1):
|
|
acked = await self._upload_one_frame(i, bytes(data), acked)
|
|
await asyncio.sleep(0.2)
|
|
|
|
async def _upload_one_frame(self, slot: int, data: bytes, acked: bool) -> bool:
|
|
"""Upload one frame. Prefer the acked image path; on failure reconnect +
|
|
retry the whole frame. If frame 1 proves this mask clone never acks
|
|
IMAGE uploads, latch off the acked path and use paced fire-and-forget
|
|
for the rest. Returns whether to keep using the acked path."""
|
|
if acked:
|
|
for attempt in range(5):
|
|
try:
|
|
await self.mask.upload_image(data, slot, timeout=8.0)
|
|
return True
|
|
except Exception as exc:
|
|
# frame 1 failing its first two acked tries => this clone
|
|
# doesn't ack IMAGE uploads; stop trying it.
|
|
if slot == 1 and attempt >= 1:
|
|
log.warning("mask: IMAGE uploads not acked by this clone "
|
|
"(%s) -- using paced fire-and-forget", exc)
|
|
break
|
|
if attempt == 4:
|
|
log.warning("mask: acked upload of slot %d exhausted (%s) "
|
|
"-- fire-and-forget fallback", slot, exc)
|
|
break
|
|
await self._reupload_reconnect()
|
|
# fallback: paced fire-and-forget (probabilistic -- keep the mask close)
|
|
await self.mask.upload_raw_image(data, index=slot,
|
|
chunk_delay=0.10, init_delay=0.30)
|
|
return False
|
|
|
|
async def _reupload_reconnect(self):
|
|
"""Drop + re-establish the link mid-upload so the next frame attempt
|
|
starts clean (a fresh DATS resets any half-written slot)."""
|
|
try:
|
|
await self.mask.disconnect()
|
|
except Exception:
|
|
pass
|
|
await asyncio.sleep(1.0)
|
|
try:
|
|
await self.mask.connect(timeout=15.0, attempts=8)
|
|
await self.mask.set_brightness(self.brightness)
|
|
except Exception:
|
|
pass
|
|
|
|
# -- control --------------------------------------------------------------
|
|
|
|
def set_state(self, state: str):
|
|
self._state = state if state in ("idle", "listening", "thinking", "speaking") else "idle"
|
|
|
|
def set_idle(self):
|
|
self._speaking = False
|
|
self._last_mouth_t = 0.0 # leave any speaking overlay immediately
|
|
self.set_state("idle")
|
|
|
|
def set_listening(self):
|
|
self._speaking = False
|
|
self._last_mouth_t = 0.0
|
|
self.set_state("listening")
|
|
|
|
def set_thinking(self):
|
|
self._speaking = False
|
|
self._last_mouth_t = 0.0
|
|
self.set_state("thinking")
|
|
|
|
def set_speaking(self, on: bool):
|
|
"""Mark a speaking turn. Without lip-sync markers the mouth auto-talks;
|
|
with them it follows the amplitude. The underlying eye state is kept, so
|
|
it returns there when the turn ends."""
|
|
self._speaking = bool(on)
|
|
if not on:
|
|
self._last_mouth_t = 0.0
|
|
self._level = 0.0
|
|
|
|
def feed_audio_level(self, level: float):
|
|
"""Per-audio-chunk amplitude 0..1 -> real lip-sync (mouth opens by loudness).
|
|
|
|
Keeps the face 'speaking' for a short window after the last call, so
|
|
pauses close the mouth and the turn ends cleanly when markers stop —
|
|
without depending on an explicit speaking on/off signal."""
|
|
self._level = max(self._level * 0.4, min(1.0, float(level))) # fast attack
|
|
self._last_mouth_t = time.monotonic()
|
|
|
|
def react(self, emotion: str, hold: float = 1.4):
|
|
"""Briefly hold an expression (surprised / smile / sad / …) then release."""
|
|
if emotion in self.slots:
|
|
self._react = emotion
|
|
self._react_until = time.monotonic() + float(hold)
|
|
|
|
# -- FaceController-compatible API (so it can drop in for FaceAnimator) ----
|
|
|
|
def set_mouth(self, level: int):
|
|
"""Discrete mouth level 0..3 (e.g. from the Gemini [[MOUTH:n]] relay) ->
|
|
drives lip-sync. Maps the level to a representative amplitude; level 0
|
|
decays the mouth shut but keeps the short speaking window alive."""
|
|
amp = (0.0, 0.12, 0.24, 0.5)[max(0, min(3, int(level)))]
|
|
self._level = max(self._level * 0.4, amp)
|
|
self._last_mouth_t = time.monotonic()
|
|
|
|
def set_expression(self, name):
|
|
"""Hold an expression frame until cleared with None (vs the timed react)."""
|
|
if name and name in self.slots:
|
|
self._react = name
|
|
self._react_until = float("inf")
|
|
elif self._react_until == float("inf"):
|
|
self._react = None
|
|
|
|
def pause(self):
|
|
"""Stop the loop from writing to the mask (so a concurrent scratch-slot
|
|
upload's per-packet acks aren't disturbed by play_diy traffic)."""
|
|
self._paused_ack.clear()
|
|
self._paused = True
|
|
|
|
def wait_paused(self, timeout: float = 2.0) -> bool:
|
|
"""Block until the loop has actually reached the paused branch (so no
|
|
play_diy is in flight when the caller starts the scratch upload)."""
|
|
return self._paused_ack.wait(timeout)
|
|
|
|
def resume(self):
|
|
self._paused = False
|
|
self._paused_ack.clear()
|
|
self._cur = None # force a redraw when the loop takes over again
|
|
|
|
async def show(self, name: str):
|
|
"""One-off: briefly show a named frame (used by FaceController.show_expression)."""
|
|
if name in self.slots:
|
|
self.react(name, hold=1.5)
|
|
|
|
# -- internals ------------------------------------------------------------
|
|
|
|
@staticmethod
|
|
def _mouth_for(level: float) -> str:
|
|
i = 0 if level < 0.06 else 1 if level < 0.16 else 2 if level < 0.32 else 3
|
|
return MOUTH[i]
|
|
|
|
async def _play(self, name: str, *, force: bool = False):
|
|
slot = self.slots.get(name)
|
|
if slot is None or (name == self._cur and not force):
|
|
return
|
|
try:
|
|
await self.mask.play_diy(slot)
|
|
self._cur = name
|
|
self._last_write = time.monotonic() # keepalive clock: link saw traffic
|
|
self._play_fails = 0 # link is alive again
|
|
except Exception:
|
|
self._cur = None # retry next tick on a transient BLE error
|
|
self._play_fails += 1 # ...but count it: a sustained run == a drop
|
|
|
|
def _link_dead(self) -> bool:
|
|
"""True once the BLE link looks gone: the transport reports disconnected,
|
|
or play_diy has failed a sustained run in a row (a single glitch is still
|
|
treated as transient and retried)."""
|
|
connected = bool(getattr(self.mask, "is_connected", False)) if self.mask else False
|
|
return (not connected) or self._play_fails >= _PLAY_FAIL_LIMIT
|
|
|
|
async def _reconnect(self) -> bool:
|
|
"""Bounded reconnect after a detected drop. Frames persist on the mask's
|
|
flash, so on success we only re-pin brightness + redraw the current frame
|
|
(no re-upload). Returns True if the link is back, False if exhausted."""
|
|
for i in range(_RECONNECT_ATTEMPTS):
|
|
if self._stop:
|
|
return False
|
|
try:
|
|
if getattr(self.mask, "is_connected", False):
|
|
await self.mask.disconnect() # clean any half-open client first
|
|
except Exception:
|
|
pass
|
|
try:
|
|
await self.mask.connect(timeout=10.0, attempts=2)
|
|
await self.mask.set_brightness(self.brightness)
|
|
self._play_fails = 0
|
|
self._cur = None # force a redraw on the fresh link
|
|
await self._play("neutral")
|
|
return True
|
|
except Exception:
|
|
await asyncio.sleep(_RECONNECT_BACKOFF)
|
|
return False
|
|
|
|
async def _blink(self, restore: str):
|
|
await self._play("blink")
|
|
await asyncio.sleep(random.uniform(0.08, 0.13))
|
|
if random.random() < 0.18: # occasional quick double-blink
|
|
await self._play(restore)
|
|
await asyncio.sleep(random.uniform(0.07, 0.11))
|
|
await self._play("blink")
|
|
await asyncio.sleep(random.uniform(0.08, 0.12))
|
|
await self._play(restore)
|
|
|
|
async def _loop(self):
|
|
mono = time.monotonic
|
|
t_blink = mono() + random.uniform(1.5, 4.0)
|
|
t_sacc = mono() + random.uniform(0.6, 1.6)
|
|
t_micro = mono() + random.uniform(12.0, 25.0)
|
|
gaze = "neutral"
|
|
while not self._stop:
|
|
t = mono()
|
|
|
|
# BLE link health: if it dropped, try a bounded reconnect instead of
|
|
# busy-spinning play_diy on a dead transport. If reconnect is
|
|
# exhausted, leave the loop so the face stops (the controller's
|
|
# status() then reports it not running) rather than spinning forever.
|
|
if self._link_dead():
|
|
if not self._auto_reconnect:
|
|
# Owner-managed recovery (FaceController supervisor): make the
|
|
# transport report disconnected so the supervisor's is_connected
|
|
# check fires, then leave the loop. The supervisor reconnects
|
|
# and rebuilds the face (frames persist on the mask's flash).
|
|
try:
|
|
if getattr(self.mask, "is_connected", False):
|
|
await self.mask.disconnect()
|
|
except Exception:
|
|
pass
|
|
break
|
|
if not await self._reconnect():
|
|
break
|
|
t = mono() # reconnect can take a while
|
|
|
|
# Paused (during a scratch-slot upload): write nothing so the upload's
|
|
# per-packet REOK acks aren't disturbed by play_diy traffic. Signal
|
|
# that we've actually parked so the caller can start the upload.
|
|
if self._paused:
|
|
self._paused_ack.set()
|
|
await asyncio.sleep(0.1)
|
|
continue
|
|
|
|
# BLE keepalive: re-send the current frame if the link has gone quiet.
|
|
# _play() skips unchanged frames, so a long neutral idle stretch writes
|
|
# nothing but blinks; a quiet gap past the supervision timeout drops the
|
|
# link, and each reconnect flashes the mask's built-in face. A cheap
|
|
# periodic re-send keeps the link alive (no-op while speaking — that
|
|
# path already writes ~11x/s, so _last_write stays fresh).
|
|
if self._cur is not None and (t - self._last_write) >= _KEEPALIVE_SEC:
|
|
await self._play(self._cur, force=True)
|
|
|
|
# transient reaction overrides everything briefly
|
|
if self._react is not None:
|
|
if t < self._react_until:
|
|
await self._play(self._react)
|
|
await asyncio.sleep(0.06)
|
|
continue
|
|
self._react = None
|
|
self._cur = None # force a redraw of whatever's underneath
|
|
|
|
# "speaking" = an explicit turn OR fresh lip-sync markers (the latter
|
|
# window auto-expires, so the mouth closes and the turn ends when the
|
|
# markers stop, without needing a reliable speaking-off signal).
|
|
lipsync_active = (t - self._last_mouth_t) < _SPEECH_WINDOW
|
|
if self._speaking or lipsync_active:
|
|
if lipsync_active:
|
|
base = self._mouth_for(self._level) # 0 = closed on pauses
|
|
self._level *= 0.55 # decay toward closed
|
|
else:
|
|
base = MOUTH[random.choice([0, 1, 1, 2, 2, 3, 3, 2, 1, 0])] # auto-talk
|
|
await self._play(base)
|
|
# No mid-speech blink: a blink is a 2-3 frame burst that, on top
|
|
# of the mouth cadence, spikes the switch rate and tears the
|
|
# display. Eyes blink between utterances (idle/listening) instead.
|
|
await asyncio.sleep(_SPEAK_FRAME_SEC)
|
|
continue
|
|
|
|
# --- non-speaking: idle / listening / thinking ---
|
|
if t >= t_blink:
|
|
await self._blink(gaze)
|
|
lo, hi = (3.5, 6.5) if self._state == "thinking" else (2.0, 4.5)
|
|
t_blink = t + random.uniform(lo, hi)
|
|
|
|
if t >= t_sacc:
|
|
if self._state == "thinking":
|
|
gaze = random.choice(["look_left", "look_right", "look_left", "look_right", "neutral"])
|
|
hold = random.uniform(0.9, 1.8)
|
|
elif self._state == "listening":
|
|
gaze = random.choice(["neutral", "neutral", "neutral", "look_left", "look_right"])
|
|
hold = random.uniform(0.5, 1.2)
|
|
else: # idle — relaxed wandering
|
|
gaze = random.choice(["neutral", "neutral", "look_left", "look_right", "neutral"])
|
|
hold = random.uniform(0.3, 0.9)
|
|
await self._play(gaze)
|
|
t_sacc = t + hold + random.uniform(0.4, 1.4)
|
|
else:
|
|
await self._play(gaze)
|
|
|
|
if self._state == "idle" and t >= t_micro: # rare idle micro-smile
|
|
await self._play("smile")
|
|
await asyncio.sleep(random.uniform(0.6, 1.0))
|
|
gaze = "neutral"
|
|
self._cur = None
|
|
t_micro = t + random.uniform(15.0, 30.0)
|
|
|
|
await asyncio.sleep(0.05)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Standalone runner
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def _amain(args):
|
|
face = LifelikeFace(name_prefix=args.name_prefix, address=args.address,
|
|
brightness=args.brightness)
|
|
print("connecting + loading frames (keep the mask within ~30 cm) ...", flush=True)
|
|
await face.start(reload=args.reload)
|
|
print("lifelike motion running. Ctrl+C to stop.", flush=True)
|
|
try:
|
|
if args.demo:
|
|
steps = [
|
|
("idle (wandering + blinks)", lambda: face.set_idle(), 7),
|
|
("listening (attentive)", lambda: face.set_listening(), 7),
|
|
("thinking (looks away)", lambda: face.set_thinking(), 7),
|
|
("speaking (auto lip-sync)", lambda: face.set_speaking(True), 7),
|
|
("react: surprised", lambda: face.react("surprised", 2.0), 2.2),
|
|
("react: smile", lambda: face.react("smile", 2.0), 2.2),
|
|
("react: sad", lambda: face.react("sad", 2.0), 2.2),
|
|
("back to idle", lambda: face.set_idle(), 5),
|
|
]
|
|
for label, action, dur in steps:
|
|
print(" ->", label, flush=True)
|
|
action()
|
|
await asyncio.sleep(dur)
|
|
face.set_idle()
|
|
await asyncio.sleep(2)
|
|
else:
|
|
while True:
|
|
await asyncio.sleep(1)
|
|
except KeyboardInterrupt:
|
|
print("\nstopping ...")
|
|
finally:
|
|
await face.stop()
|
|
|
|
|
|
def main():
|
|
ap = argparse.ArgumentParser(description=__doc__,
|
|
formatter_class=argparse.RawDescriptionHelpFormatter)
|
|
ap.add_argument("--demo", action="store_true", help="cycle through the states/reactions")
|
|
ap.add_argument("--reload", action="store_true", help="force re-upload of the frame set")
|
|
ap.add_argument("--address", help="mask BLE MAC")
|
|
ap.add_argument("--name-prefix", default="MASK")
|
|
ap.add_argument("--brightness", type=int, default=95)
|
|
asyncio.run(_amain(ap.parse_args()))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|