# trigger_loop.py
|
|
import asyncio
|
|
import time
|
|
|
|
import config
|
|
from Logger import Logs
|
|
|
|
|
|
# Module-level logger instance (Logs comes from the project's Logger module).
sanad_logger = Logs()
|
|
# Initialise the logger with "G1_Logs" and "trigger_loop".
# NOTE(review): presumably the log group/directory and the module tag — confirm against Logger.Logs.
sanad_logger.LogEngine("G1_Logs", "trigger_loop")
|
|
|
|
def _format_seconds(seconds: float) -> str:
    """Render *seconds* with at most one decimal, dropping a trailing ``.0``."""
    return f"{seconds:.1f}".rstrip("0").rstrip(".")


def build_photographer_prompt(photo_total_sec: float, photo_thanks_sec: float) -> str:
    """Build the text prompt that makes the voice model act as the photographer.

    The photo session is split into a "directions" phase followed by a
    closing "thank you" phase, and both durations are written into the prompt.

    Args:
        photo_total_sec: Total length of the photo session, in seconds.
        photo_thanks_sec: Requested length of the closing "thank you" phase,
            in seconds.

    Returns:
        The complete prompt string.
    """
    # Directions phase: whatever is left after the thanks, but never under 1 s.
    talk_sec = max(1.0, photo_total_sec - photo_thanks_sec)
    # Thanks phase: at least 0.5 s, and capped so it ends 0.5 s short of the total.
    thanks_sec = max(0.5, min(photo_thanks_sec, photo_total_sec - 0.5))

    # Keep one decimal of precision in the prompt: with ``:.0f`` the 0.5 s
    # floor was rendered as "0" and x.5 values were rounded half-to-even,
    # so the spoken durations could contradict the clamps above.
    talk_text = _format_seconds(talk_sec)
    thanks_text = _format_seconds(thanks_sec)

    return (
        "In english only speak You are Bousanad the photographer. Speak in Emirati (UAE) dialect using short, energetic sentences. "
        "Start with: 'Yalla.. get ready.. smile!' "
        f"For about {talk_text} seconds, give quick photo directions like: "
        "'Come a bit closer' 'Hold still' 'Look at the camera' 'Nice, perfect' 'One.. two.. three' "
        f"And in the last about {thanks_text} seconds say: 'thank youooo' in a long, friendly way."
    )
|
|
|
|
|
|
|
|
async def trigger_loop(
    hub,
    replay,
    voice,
    ws,
    take_photo_sync_callable,
):
    """Poll the controller hub forever and run the photo sequence on R2+X.

    On a rising edge of ``hub.combo_r2x()`` (outside the busy window and while
    ``replay.is_playing`` is false) the loop:

    1. starts ``replay.run(config.REPLAY_FILE, config.HOME_FILE, 1.0)`` in a
       worker thread,
    2. sends the photographer prompt via ``voice.send_text_prompt(ws, prompt)``,
    3. schedules ``take_photo_sync_callable`` to run in a worker thread after
       ``config.PHOTO_DELAY_SEC`` seconds, clamped to
       ``[0, config.PHOTO_TOTAL_SEC]``. The delay starts once the prompt has
       been sent.

    While ``hub.combo_r2l1()`` is true, a still-pending delayed photo is
    cancelled. (Per the original note, cancelling the replay itself is handled
    inside the replay engine, not here.)

    Debounce:
      - the R2+X combo must be released before it can trigger again;
      - retriggers are blocked for ``config.PHOTO_TOTAL_SEC`` seconds after a
        trigger.

    Args:
        hub: Controller state source; must expose ``first_state``,
            ``combo_r2l1()`` and ``combo_r2x()``.
        replay: Replay engine; must expose ``is_playing`` and a blocking
            ``run(replay_file, home_file, speed)``.
            NOTE(review): the third argument (1.0) is presumably a speed
            factor — confirm against the replay engine.
        voice: Voice client exposing the awaitable
            ``send_text_prompt(ws, prompt)``.
        ws: Connection handle passed through to ``voice.send_text_prompt``.
        take_photo_sync_callable: Blocking zero-argument callable that takes
            the photo; its return value is logged as the saved photo.

    This coroutine never returns normally; it runs until its task is cancelled
    or an exception (e.g. from ``voice.send_text_prompt``) propagates.
    """
    r2x_prev = False
    waiting_release = False
    # Deadline on the monotonic clock; -inf so the very first press always passes.
    photo_busy_until = float("-inf")

    pending_photo_task: asyncio.Task | None = None
    # Strong references to fire-and-forget tasks: the event loop only keeps
    # weak references, so an unreferenced task may be garbage-collected
    # before it finishes.
    background_tasks: set[asyncio.Task] = set()

    def on_replay_done(task: asyncio.Task) -> None:
        """Drop the finished replay task and surface any failure in the log."""
        background_tasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            sanad_logger.print_and_log(f"⚠️ Replay failed: {exc!r}", message_type="warning")

    async def delayed_photo(delay_sec: float) -> None:
        """Wait ``delay_sec`` seconds, then take the photo in a worker thread."""
        await asyncio.sleep(delay_sec)
        try:
            photo_result = await asyncio.to_thread(take_photo_sync_callable)
        except Exception as exc:
            # Cancellation is a BaseException and is deliberately not caught
            # here; only real capture failures are logged.
            sanad_logger.print_and_log(f"⚠️ Photo capture failed: {exc!r}", message_type="warning")
            return
        sanad_logger.print_and_log(f"📸 Saved photo: {photo_result}", message_type="info")

    while True:
        await asyncio.sleep(0.02)
        # Nothing to poll until the hub reports its first state.
        if not hub.first_state:
            continue

        # Cancel combo: cancel pending photo
        if hub.combo_r2l1():
            if pending_photo_task is not None and not pending_photo_task.done():
                pending_photo_task.cancel()
                sanad_logger.print_and_log("🛑 Pending photo cancelled (R2+L1).", message_type="warning")

        r2x_now = hub.combo_r2x()

        # Require release before next trigger
        if waiting_release:
            if not r2x_now:
                waiting_release = False
            r2x_prev = r2x_now
            continue

        # Do not retrigger while replay is playing
        if replay.is_playing:
            r2x_prev = r2x_now
            continue

        # Monotonic clock: the busy window must not be affected by
        # wall-clock adjustments (NTP sync, manual time changes).
        now = time.monotonic()

        # Rising edge trigger + not inside busy window
        if r2x_now and (not r2x_prev) and (now >= photo_busy_until):
            waiting_release = True
            photo_busy_until = now + config.PHOTO_TOTAL_SEC

            # 1) start replay (blocking -> thread)
            replay_task = asyncio.create_task(
                asyncio.to_thread(replay.run, config.REPLAY_FILE, config.HOME_FILE, 1.0)
            )
            background_tasks.add(replay_task)
            replay_task.add_done_callback(on_replay_done)

            # 2) send photographer prompt
            prompt = build_photographer_prompt(config.PHOTO_TOTAL_SEC, config.PHOTO_THANKS_SEC)
            await voice.send_text_prompt(ws, prompt)

            # 3) schedule delayed photo (replacing any photo still pending)
            if pending_photo_task is not None and not pending_photo_task.done():
                pending_photo_task.cancel()

            # Clamp the delay to [0, PHOTO_TOTAL_SEC] so the photo is never
            # taken after the session window has ended.
            delay = max(0.0, min(config.PHOTO_DELAY_SEC, config.PHOTO_TOTAL_SEC))
            pending_photo_task = asyncio.create_task(delayed_photo(delay))

        r2x_prev = r2x_now
|