"""Autonomous photo-session manager.

Drives the visitor flow IDLE -> WAIT_CONFIRM -> FRAMING -> COUNTDOWN ->
COMPLETE -> IDLE around the vision detector, voice channel, and replay runner.
"""

import asyncio
import json
import os
import threading
import time
from pathlib import Path
from threading import Thread

import cv2

from Modes.AI.vision_detector import VisionDetector
from Core import settings as config
from Core.error_events import record_error
from Core.Logger import Logs
from Core import people_registry
from Server.capture_service import capture_with_replay_sync, replay_timing_profile

# Module-wide logger routed to the project's G1 log engine.
sanad_logger = Logs()
sanad_logger.LogEngine("G1_Logs", "autonomous_manager")


class AutonomousManager:
    """
    Autonomous flow: IDLE -> WAIT_CONFIRM -> FRAMING -> COUNTDOWN -> COMPLETE -> IDLE
    """

    def __init__(
        self,
        zmq_host: str = "127.0.0.1",
        zmq_port: int = 55555,
        stability_frames: int = 3,
        poll_hz: int = 8,
        video_source: str | None = None,
    ):
        # Vision backend polled every loop tick for person/face/intent snapshots.
        self.detector = VisionDetector(
            zmq_host=zmq_host,
            zmq_port=zmq_port,
            poll_hz=poll_hz,
            video_source=video_source,
        )
        # Consecutive intent-positive snapshots required before starting a session.
        self.stability_frames = int(stability_frames)
        self.cooldown_until = 0.0
        self.session_id = 0
        self._running = False
        # Capture-pipeline handoff state, guarded by _capture_lock (written by a
        # worker thread, read by the async loop).
        self._capture_done = False
        self._capture_result = None
        self._capture_lock = threading.Lock()
        self._capture_cancel_event: threading.Event | None = None
        self.interaction_active = False
        # Flag files act as a simple cross-process signalling mechanism.
        self.interaction_flag = Path(config.SCRIPTS_DIR) / "interaction_triggered.flag"
        self.request_photo_flag = Path(config.SCRIPTS_DIR) / "request_photo.flag"
        self.confirm_yes_flag = Path(config.SCRIPTS_DIR) / "confirm_yes.flag"
        self.confirm_no_flag = Path(config.SCRIPTS_DIR) / "confirm_no.flag"
        self.state_file = Path(config.AUTONOMOUS_STATE_FILE)
        # Timeouts (seconds) — each overridable via environment variable.
        self.confirm_timeout_sec = float(os.environ.get("CONFIRM_TIMEOUT_SEC", "15.0"))
        self.confirm_reminder_sec = float(os.environ.get("CONFIRM_REMINDER_SEC", "5.0"))
        self.confirm_guard_sec = float(os.environ.get("CONFIRM_GUARD_SEC", "1.2"))
        self.session_cooldown_sec = float(os.environ.get("SESSION_COOLDOWN_SEC", "10.0"))
        self.leave_timeout_sec = float(os.environ.get("VISITOR_LEAVE_TIMEOUT_SEC", "2.5"))
        self.countdown_lose_subject_sec = float(os.environ.get("COUNTDOWN_LOSE_SUBJECT_SEC", "1.4"))
        self.capture_finalize_grace_sec = float(os.environ.get("CAPTURE_FINALIZE_GRACE_SEC", "3.0"))
        self.framing_timeout_sec = float(os.environ.get("FRAMING_TIMEOUT_SEC", "20.0"))
        self.framing_feedback_interval_sec = float(os.environ.get("FRAMING_FEEDBACK_INTERVAL_SEC", "2.0"))
        self.framing_good_frames_required = int(os.environ.get("FRAMING_GOOD_FRAMES_REQUIRED", "2"))
        # Framing thresholds
        self.center_tolerance = float(os.environ.get("FRAMING_CENTER_TOLERANCE", "0.18"))
        self.subject_min_area_ratio = float(os.environ.get("FRAMING_MIN_AREA_RATIO", "0.06"))
        self.subject_max_area_ratio = float(os.environ.get("FRAMING_MAX_AREA_RATIO", "0.55"))
        self.min_blur_var = float(os.environ.get("FRAMING_MIN_BLUR_VAR", "80.0"))
        self.min_exposure = float(os.environ.get("FRAMING_MIN_EXPOSURE", "55.0"))
        self.max_exposure = float(os.environ.get("FRAMING_MAX_EXPOSURE", "200.0"))
        # Operator-tunable values read from config.json (refreshed again inside run()).
        self.headroom_min_ratio = float(config.read_vision_framing_headroom_min_ratio())
        self.headroom_max_ratio = float(config.read_vision_framing_headroom_max_ratio())
        self.eye_line_min_ratio = float(config.read_vision_framing_eye_line_min_ratio())
        self.eye_line_max_ratio = float(config.read_vision_framing_eye_line_max_ratio())
        self.retake_score_threshold = float(config.read_vision_framing_retake_score_threshold())
        self.retake_prompt_enabled = bool(config.read_vision_retake_prompt_enabled())
        self.retake_limit = int(config.read_vision_retake_max_per_session())
        self.hard_target_lock_enabled = bool(config.read_vision_hard_target_lock_enabled())
        self.autonomous_greeting_replay_enabled = bool(config.read_vision_autonomous_greeting_replay_enabled())
        self.autonomous_greeting_replay_file = self._resolve_replay_path(
            config.read_vision_autonomous_greeting_replay_file()
        )
        self.autonomous_capture_replay_enabled = bool(config.read_vision_autonomous_capture_replay_enabled())
        self.retake_confirm_timeout_sec = float(os.environ.get("RETAKE_CONFIRM_TIMEOUT_SEC", "8.0"))
        self.yolo_strict_required = bool(config.read_vision_yolo_strict_required())
        self.gemini_context_hz = float(config.read_vision_gemini_context_hz())
        self.gemini_context_silent = bool(config.read_vision_gemini_context_silent())
        # Throttle for vision-context pushes to the voice channel (min 0.5 Hz).
        self._context_interval_sec = 1.0 / max(0.5, self.gemini_context_hz)
        self._next_context_ts = 0.0
        self.ai_blocked = False
        self.ai_block_reason = ""
        # Dedup state for _maybe_log_vision_snapshot.
        self._last_vision_log_signature = None
        self._last_vision_log_ts = 0.0
        self.face_recognition_enabled = bool(config.read_vision_face_recognition_enabled())
        self.face_recognition_threshold = float(config.read_vision_face_recognition_threshold())
        # Recognition result for the current session's guest, if any.
        self.current_person: dict | None = None

    async def _say(self, voice, text: str):
        """Send free-form text to the live voice channel; never raises."""
        if voice is None:
            return
        try:
            ok = await voice.send_text_prompt_live(text)
            if not ok:
                sanad_logger.print_and_log("Voice prompt skipped: Gemini WS not connected.", "warning")
        except Exception as e:
            sanad_logger.print_and_log(f"Voice prompt failed: {e}", "warning")

    async def _say_prompt(
        self,
        voice,
        prompt_key: str,
        fallback_text: str,
        *,
        mode_override: str | None = None,
        allow_gemini_fallback: bool | None = None,
    ):
        """Play a keyed prompt when the voice object supports it, else fall back
        to sending the plain fallback text. Errors are logged, never raised."""
        if voice is None:
            return
        try:
            if hasattr(voice, "play_prompt_key"):
                ok = await voice.play_prompt_key(
                    prompt_key,
                    fallback_text=fallback_text,
                    allow_gemini_fallback=allow_gemini_fallback,
                    mode_override=mode_override,
                )
            else:
                ok = await voice.send_text_prompt_live(fallback_text)
            if not ok:
                sanad_logger.print_and_log(f"Voice prompt skipped for {prompt_key}: output unavailable.", "warning")
        except Exception as e:
            sanad_logger.print_and_log(f"Voice prompt failed for {prompt_key}: {e}", "warning")

    async def _say_capture_prompt(self, voice, prompt_key: str, fallback_text: str):
        # Capture prompts are forced to audio mode and must not fall back to Gemini.
        await self._say_prompt(
            voice,
            prompt_key,
            fallback_text,
            mode_override="audio",
            allow_gemini_fallback=False,
        )

    def _maybe_log_vision_snapshot(self, snapshot: dict, now_ts: float):
        """Log a one-line vision summary, deduplicated by signature and
        rate-limited to once per second for identical signatures."""
        try:
            person_count = int(snapshot.get("person_count", 0))
            face_count = int(snapshot.get("face_count", 0))
            group_detected = bool(snapshot.get("group_detected", False))
            group_size = int(snapshot.get("group_size", 0))
            subject_visible = bool(snapshot.get("subject_visible", False))
            intent_detected = bool(snapshot.get("intent_detected", False))
            max_area = int(float(snapshot.get("max_area", 0.0) or 0.0))
            depth_m = snapshot.get("depth_m", None)
            if depth_m is not None:
                depth_m = round(float(depth_m), 2)
            sig = (
                person_count,
                face_count,
                group_detected,
                group_size,
                subject_visible,
                intent_detected,
                max_area,
                depth_m,
            )
            active = bool(person_count or face_count or subject_visible or group_detected or intent_detected)
            # Suppress repeats of an empty/inactive scene entirely.
            if (not active) and self._last_vision_log_signature in (None, sig):
                return
            if sig == self._last_vision_log_signature and (now_ts - self._last_vision_log_ts) < 1.0:
                return
            self._last_vision_log_signature = sig
            self._last_vision_log_ts = now_ts
            sanad_logger.print_and_log(
                "👁️ Vision: "
                f"people={person_count} faces={face_count} group={group_detected}({group_size}) "
                f"visible={subject_visible} intent={intent_detected} area={max_area} depth={depth_m if depth_m is not None else '-'}",
                "info",
            )
        except Exception as e:
            record_error("autonomous_manager", "vision_snapshot_log", e)

    def _set_interaction_active(self, active: bool, voice=None, reason: str = ""):
        """Persist the interaction flag file and adjust the voice audio gate /
        passive-listen state according to the current runtime mode."""
        active = bool(active)
        self.interaction_active = active
        try:
            self.interaction_flag.parent.mkdir(parents=True, exist_ok=True)
            if active:
                self.interaction_flag.write_text(f"{time.time():.3f} {reason}".strip(), encoding="utf-8")
            elif self.interaction_flag.exists():
                self.interaction_flag.unlink()
        except Exception as e:
            record_error("autonomous_manager", "set_interaction_active_file", e, {"active": bool(active)})
        if voice is not None and hasattr(voice, "set_audio_gate"):
            try:
                idle_voice_listen_enabled = bool(config.read_vision_idle_voice_listen_enabled())
                mic_enabled = bool(config.read_gemini_mic_enabled())
                runtime_mode = str(config.read_runtime_mode()).strip().lower()
                if runtime_mode not in ("manual", "ai"):
                    runtime_mode = "manual"
                if active:
                    # During an interaction the gate is fully open and passive listen off.
                    if hasattr(voice, "set_passive_listen"):
                        voice.set_passive_listen(False, reason=reason or "interaction active")
                    voice.set_audio_gate(True, reason=reason)
                else:
                    if runtime_mode != "ai":
                        if hasattr(voice, "set_passive_listen"):
                            voice.set_passive_listen(False, reason=reason or "manual mode")
                        voice.set_audio_gate(mic_enabled, reason=reason or "manual mode")
                    else:
                        # Idle in AI mode: listen passively only when both toggles allow it.
                        if hasattr(voice, "set_passive_listen"):
                            voice.set_passive_listen(mic_enabled and idle_voice_listen_enabled, reason=reason or "idle")
                        voice.set_audio_gate(mic_enabled and idle_voice_listen_enabled, reason=reason or "idle")
            except Exception as e:
                record_error("autonomous_manager", "set_interaction_active_audio_gate", e, {"active": bool(active)})

    @staticmethod
    def _clear_flag(path: Path):
        # Best-effort delete; errors are recorded, never raised.
        try:
            if path.exists():
                path.unlink()
        except Exception as e:
            record_error("autonomous_manager", "clear_flag", e, {"path": str(path)})

    @staticmethod
    def _consume_flag(path: Path) -> bool:
        """Delete the flag file if present; True when it existed and was removed."""
        try:
            if path.exists():
                path.unlink()
                return True
        except Exception as e:
            record_error("autonomous_manager", "consume_flag", e, {"path": str(path)})
        return False

    def _consume_request_photo_flag(self) -> bool:
        # Either an explicit photo request or a spoken "yes" confirmation counts.
        return self._consume_flag(self.request_photo_flag) or self._consume_flag(self.confirm_yes_flag)

    def _consume_direct_request_flag(self) -> bool:
        return self._consume_flag(self.request_photo_flag)

    def _consume_no_photo_flag(self) -> bool:
        return self._consume_flag(self.confirm_no_flag)

    def _clear_confirmation_flags(self):
        self._clear_flag(self.request_photo_flag)
        self._clear_flag(self.confirm_yes_flag)
        self._clear_flag(self.confirm_no_flag)

    def _cancel_capture_pipeline(self, reason: str = ""):
        """Signal the background capture thread (if any) to stop."""
        try:
            ev = self._capture_cancel_event
            if ev is not None:
                ev.set()
            self._capture_cancel_event = None
            if reason:
                sanad_logger.print_and_log(f"Capture pipeline cancelled: {reason}", "warning")
        except Exception as e:
            record_error("autonomous_manager", "cancel_capture_pipeline", e, {"reason": reason})

    @staticmethod
    def _resolve_replay_path(path_value: str) -> Path:
        # Delegates to settings so replay paths resolve consistently project-wide.
        return config.resolve_replay_path(path_value)

    def _start_greeting_replay(self, replay):
        """Kick off the greeting motion replay on a daemon thread (best effort).

        Skips silently when disabled, and warns when the replay file is missing
        or the runner is already busy.
        """
        if replay is None or not bool(self.autonomous_greeting_replay_enabled):
            return
        replay_file = Path(self.autonomous_greeting_replay_file).resolve()
        if not replay_file.exists():
            sanad_logger.print_and_log(f"⚠️ Greeting replay missing: {replay_file}", "warning")
            return
        if bool(getattr(replay, "is_playing", False)):
            sanad_logger.print_and_log("⚠️ Greeting replay skipped: replay already busy.", "warning")
            return

        def _run_greeting():
            try:
                sanad_logger.print_and_log(f"👋 Greeting replay: {replay_file.name}", "info")
                replay.run(replay_file, config.HOME_FILE, 1.0)
            except Exception as e:
                record_error("autonomous_manager", "greeting_replay", e, {"replay_file": str(replay_file)})

        Thread(target=_run_greeting, daemon=True).start()

    def _reset_current_person(self):
        self.current_person = None

    def _session_person_label(self) -> str:
        """Best display label for the recognized guest, or '' when unknown."""
        if not isinstance(self.current_person, dict):
            return ""
        return str(
            self.current_person.get("display_label")
            or self.current_person.get("display_name")
            or self.current_person.get("person_id")
            or ""
        ).strip()

    def _current_person_extras(self) -> dict:
        """Recognition metadata merged into runtime-state payloads."""
        person = self.current_person if isinstance(self.current_person, dict) else {}
        return {
            "recognized_person_id": str(person.get("person_id") or ""),
            "recognized_person_known": bool(person.get("known_person", False)),
            "recognized_person_new": bool(person.get("new_person", False)),
            "recognized_person_label": str(
                person.get("display_label") or person.get("display_name") or person.get("person_id") or ""
            ),
            "recognized_person_match_score": float(person.get("match_score", 0.0) or 0.0),
            "recognized_person_created_date": str(person.get("created_date") or ""),
        }

    def _select_face_box(self, snapshot: dict) -> dict | None:
        """Pick the face box that best overlaps the tracked subject box.

        Falls back to the largest face when there is no subject box or no
        face intersects it.
        """
        faces = snapshot.get("face_boxes") or []
        if not isinstance(faces, list) or not faces:
            return None
        subject_box = self._find_subject_box(snapshot)
        if not isinstance(subject_box, dict):
            try:
                return max(faces, key=lambda f: float(f.get("w", 0.0)) * float(f.get("h", 0.0)))
            except Exception:
                return faces[0] if faces else None
        try:
            sx1 = float(subject_box.get("x", 0.0))
            sy1 = float(subject_box.get("y", 0.0))
            sx2 = sx1 + max(1.0, float(subject_box.get("w", 1.0)))
            sy2 = sy1 + max(1.0, float(subject_box.get("h", 1.0)))
        except Exception:
            sx1 = sy1 = 0.0
            sx2 = sy2 = 0.0
        best = None
        best_overlap = -1.0
        for face in faces:
            try:
                fx1 = float(face.get("x", 0.0))
                fy1 = float(face.get("y", 0.0))
                fx2 = fx1 + max(1.0, float(face.get("w", 1.0)))
                fy2 = fy1 + max(1.0, float(face.get("h", 1.0)))
                # Intersection area between this face and the subject box.
                ix1 = max(sx1, fx1)
                iy1 = max(sy1, fy1)
                ix2 = min(sx2, fx2)
                iy2 = min(sy2, fy2)
                overlap = max(0.0, ix2 - ix1) * max(0.0, iy2 - iy1)
                if overlap > best_overlap:
                    best_overlap = overlap
                    best = face
            except Exception:
                continue
        if best is not None:
            return best
        try:
            return max(faces, key=lambda f: float(f.get("w", 0.0)) * float(f.get("h", 0.0)))
        except Exception:
            return faces[0] if faces else None

    def _identify_person_for_session(self, snapshot: dict, source: str = "vision") -> dict | None:
        """Recognize (or enroll) the single visible guest for this session.

        Returns the registry result dict, or None and resets current_person on
        any failure. Skips groups/multiple faces to avoid mislabeling.
        """
        if not bool(self.face_recognition_enabled):
            self._reset_current_person()
            return None
        frame = snapshot.get("frame")
        if frame is None:
            self._reset_current_person()
            return None
        if bool(snapshot.get("group_detected", False)) or int(snapshot.get("face_count", 0) or 0) > 1:
            self._reset_current_person()
            return None
        face_box = self._select_face_box(snapshot)
        if face_box is None:
            self._reset_current_person()
            return None
        subject_box = self._find_subject_box(snapshot)
        try:
            result = people_registry.recognize_or_enroll(
                frame,
                face_box,
                subject_box=subject_box,
                threshold=self.face_recognition_threshold,
                source=source,
            )
        except Exception as e:
            record_error("autonomous_manager", "identify_person_for_session", e)
            self._reset_current_person()
            return None
        if not isinstance(result, dict) or not bool(result.get("ok", False)):
            self._reset_current_person()
            return None
        self.current_person = result
        label = self._session_person_label()
        if bool(result.get("known_person", False)):
            sanad_logger.print_and_log(
                f"🧑 Returning guest recognized: {label} (score={float(result.get('match_score', 0.0) or 0.0):.2f})",
                "info",
            )
        else:
            sanad_logger.print_and_log(f"🧑 New guest enrolled: {label}", "info")
        return result

    def _welcome_prompt_text(self, group_detected: bool) -> str:
        """Spoken welcome text, varied for groups and returning guests."""
        if group_detected:
            return (
                "Hello everyone, welcome. We will take a photo together. "
                "Would your group like a photo? Please say yes photo or no photo."
            )
        label = self._session_person_label()
        if label and bool(self.current_person and self.current_person.get("known_person")):
            return (
                f"Welcome back, {label}. Would you like another photo? "
                "Please say yes photo or no photo."
            )
        return (
            "Hello, welcome. We will take a photo together. "
            "Would you like a photo? Please say yes photo or no photo."
        )

    def _welcome_prompt_key(self, group_detected: bool) -> str:
        """Prompt key matching the variant chosen by _welcome_prompt_text."""
        if group_detected:
            return "welcome_group"
        if self._session_person_label() and bool(self.current_person and self.current_person.get("known_person")):
            return "welcome_returning"
        return "welcome_single"

    def _framing_prompt_text(self, group_detected: bool) -> str:
        if group_detected:
            return "Great. Please stand with me in front of the camera, stay together in the center, and look at the camera."
        return "Great. Please stand with me in front of the camera, stay in the center, and look at the camera."
    @staticmethod
    def _framing_prompt_key(group_detected: bool) -> str:
        return "frame_group" if group_detected else "frame_single"

    @staticmethod
    def _find_subject_box(snapshot: dict) -> dict | None:
        """Return the explicit subject box, else the largest detection box."""
        subj = snapshot.get("subject_box")
        if isinstance(subj, dict):
            return subj
        boxes = snapshot.get("boxes") or []
        if not boxes:
            return None
        try:
            return max(boxes, key=lambda b: float(b.get("w", 0.0)) * float(b.get("h", 0.0)))
        except Exception:
            return boxes[0] if boxes else None

    def _evaluate_framing_quality(self, snapshot: dict) -> tuple[bool, list[str], dict]:
        """Score the current frame for photo readiness.

        Returns (good, guidance_reasons, metrics): `good` is True only when all
        checks (centered, size, blur, exposure, headroom, eye line) pass;
        `guidance_reasons` are human-readable instructions for the visitor.
        """
        frame = snapshot.get("frame")
        box = self._find_subject_box(snapshot)
        if frame is None or box is None:
            return False, ["step into view"], {"reason": "no_frame_or_subject"}
        try:
            h, w = frame.shape[:2]
            x = float(box.get("x", 0.0))
            y = float(box.get("y", 0.0))
            bw = max(1.0, float(box.get("w", 1.0)))
            bh = max(1.0, float(box.get("h", 1.0)))
            area_ratio = (bw * bh) / max(1.0, float(w * h))
            # Horizontal offset of the subject center as a fraction of width.
            cx = x + (bw / 2.0)
            dx = (cx - (w / 2.0)) / max(1.0, float(w))
            centered = abs(dx) <= self.center_tolerance
            size_ok = self.subject_min_area_ratio <= area_ratio <= self.subject_max_area_ratio
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            # Variance of the Laplacian is a cheap sharpness proxy.
            blur_var = float(cv2.Laplacian(gray, cv2.CV_64F).var())
            exposure = float(gray.mean())
            blur_ok = blur_var >= self.min_blur_var
            exposure_ok = self.min_exposure <= exposure <= self.max_exposure
            # Find the face overlapping the subject box most, for head metrics.
            faces = snapshot.get("face_boxes") or []
            face_box = None
            best_overlap = -1.0
            for f in faces:
                try:
                    fx = float(f.get("x", 0.0))
                    fy = float(f.get("y", 0.0))
                    fw = max(1.0, float(f.get("w", 1.0)))
                    fh = max(1.0, float(f.get("h", 1.0)))
                    ix1 = max(x, fx)
                    iy1 = max(y, fy)
                    ix2 = min(x + bw, fx + fw)
                    iy2 = min(y + bh, fy + fh)
                    iw = max(0.0, ix2 - ix1)
                    ih = max(0.0, iy2 - iy1)
                    overlap = iw * ih
                    if overlap > best_overlap:
                        best_overlap = overlap
                        face_box = {"x": fx, "y": fy, "w": fw, "h": fh}
                except Exception:
                    continue
            if face_box is not None:
                headroom_ratio = float(face_box["y"]) / max(1.0, float(h))
                # Approximate the eye line at ~38% down the face box.
                eye_y = float(face_box["y"]) + (0.38 * float(face_box["h"]))
                eye_line_ratio = eye_y / max(1.0, float(h))
            else:
                # No face box: estimate from the subject box instead.
                headroom_ratio = y / max(1.0, float(h))
                eye_line_ratio = (y + (0.25 * bh)) / max(1.0, float(h))
            headroom_ok = self.headroom_min_ratio <= headroom_ratio <= self.headroom_max_ratio
            eye_line_ok = self.eye_line_min_ratio <= eye_line_ratio <= self.eye_line_max_ratio
            # Left/right exposure split, used to suggest turning toward light.
            mid = int(max(1, w // 2))
            left_mean = float(gray[:, :mid].mean()) if mid > 0 else exposure
            right_mean = float(gray[:, mid:].mean()) if (w - mid) > 0 else exposure
            lr_delta = right_mean - left_mean
            reasons: list[str] = []
            if not centered:
                reasons.append("move a bit to the center")
            if not size_ok:
                reasons.append("come a little closer" if area_ratio < self.subject_min_area_ratio else "step slightly back")
            if not headroom_ok:
                if headroom_ratio < self.headroom_min_ratio:
                    reasons.append("lower your chin a little")
                else:
                    reasons.append("raise your chin a little")
            if not eye_line_ok:
                reasons.append("keep your eyes around the middle of the frame")
            if not blur_ok:
                reasons.append("hold still for a second")
            if not exposure_ok:
                if exposure < self.min_exposure:
                    if abs(lr_delta) > 12.0:
                        reasons.append(
                            "turn slightly toward the brighter side"
                        )
                    else:
                        reasons.append("face the light")
                else:
                    reasons.append("avoid strong direct light")
            metrics = {
                "area_ratio": area_ratio,
                "blur_var": blur_var,
                "exposure": exposure,
                "center_dx": dx,
                "centered": centered,
                "size_ok": size_ok,
                "blur_ok": blur_ok,
                "exposure_ok": exposure_ok,
                "headroom_ratio": headroom_ratio,
                "eye_line_ratio": eye_line_ratio,
                "headroom_ok": headroom_ok,
                "eye_line_ok": eye_line_ok,
                "left_exposure": left_mean,
                "right_exposure": right_mean,
                "lr_exposure_delta": lr_delta,
            }
            return len(reasons) == 0, reasons, metrics
        except Exception as e:
            return False, ["hold still and face the camera"], {"reason": str(e)}

    @staticmethod
    def _framing_guidance_text(reasons: list[str]) -> str:
        """Turn up to two unique framing reasons into one spoken sentence."""
        if not reasons:
            return "Great framing. Hold still."
        uniq = []
        for r in reasons:
            if r not in uniq:
                uniq.append(r)
        joined = ", and ".join(uniq[:2])
        return f"Almost ready. Please {joined}."

    @staticmethod
    def _quality_score_from_metrics(metrics: dict) -> float:
        """Fraction of framing checks that passed (0.0 .. 1.0)."""
        checks = [
            bool(metrics.get("centered", False)),
            bool(metrics.get("size_ok", False)),
            bool(metrics.get("blur_ok", False)),
            bool(metrics.get("exposure_ok", False)),
            bool(metrics.get("headroom_ok", False)),
            bool(metrics.get("eye_line_ok", False)),
        ]
        if not checks:
            return 0.0
        return float(sum(1 for x in checks if x)) / float(len(checks))

    def _retake_assessment(self, snapshot: dict, previous_metrics: dict | None = None) -> tuple[bool, str, float, dict]:
        """Decide whether a retake should be offered.

        Returns (retake_recommended, reason, score, metrics); fresh metrics are
        merged over `previous_metrics` when supplied.
        """
        good, reasons, metrics = self._evaluate_framing_quality(snapshot)
        if previous_metrics and isinstance(previous_metrics, dict):
            merged = dict(previous_metrics)
            merged.update(metrics)
            metrics = merged
        score = self._quality_score_from_metrics(metrics)
        if good and score >= self.retake_score_threshold:
            return False, "", score, metrics
        if reasons:
            reason = reasons[0]
        else:
            reason = "framing quality is not optimal"
        return True, str(reason), score, metrics

    def _lock_target_on_session_start(self, snapshot: dict):
        """Lock the detector onto the current subject/group for the session.

        Uses whichever lock API the detector exposes (new target-lock API first,
        then the legacy subject-lock API).
        """
        try:
            if hasattr(self.detector, "set_hard_lock"):
                self.detector.set_hard_lock(bool(self.hard_target_lock_enabled))
            if hasattr(self.detector, "lock_target_from_snapshot"):
                self.detector.lock_target_from_snapshot(
                    snapshot,
                    lock_group=bool(self.hard_target_lock_enabled and snapshot.get("group_detected", False)),
                )
            elif hasattr(self.detector, "lock_subject_from_snapshot"):
                self.detector.lock_subject_from_snapshot(snapshot)
        except Exception as e:
            record_error("autonomous_manager", "lock_target_on_session_start", e)

    def _unlock_target(self):
        # Mirror of _lock_target_on_session_start: try new API, then legacy.
        try:
            if hasattr(self.detector, "unlock_target"):
                self.detector.unlock_target()
            elif hasattr(self.detector, "unlock_subject"):
                self.detector.unlock_subject()
        except Exception as e:
            record_error("autonomous_manager", "unlock_target", e)

    def _detector_readiness(self) -> dict:
        """Query detector readiness; on failure return a blocked-state payload."""
        try:
            return self.detector.readiness(strict_required=self.yolo_strict_required)
        except Exception as e:
            record_error("autonomous_manager", "detector_readiness", e)
            return {
                "ok": False,
                "strict_required": bool(self.yolo_strict_required),
                "configured_backend": "normal",
                "effective_backend": "normal",
                "yolo_loaded": False,
                "person_model_ok": False,
                "face_model_ok": False,
                "person_model_path": "",
                "face_model_path": "",
                "person_model_error": str(e),
                "face_model_error": "",
                "block_reason": f"Detector readiness failed: {e}",
            }

    async def _push_vision_context(self, voice, state_name: str, snapshot: dict):
        """Rate-limited push of a compact vision summary to the voice channel."""
        if voice is None:
            return
        if not hasattr(voice, "send_vision_context_live"):
            return
        now = time.time()
        if now < self._next_context_ts:
            return
        self._next_context_ts = now + self._context_interval_sec
        payload = {
            "person_count": int(snapshot.get("person_count", 0)),
            "group_count": int(snapshot.get("group_count", 0)),
            "group_size": int(snapshot.get("group_size", 0)),
            "subject_visible": bool(snapshot.get("subject_visible", False)),
            "depth_m": snapshot.get("depth_m"),
            "approach_speed_mps": float(snapshot.get("approach_speed_mps", 0.0) or 0.0),
            "state": str(state_name),
            "intent_detected": bool(snapshot.get("intent_detected", False)),
        }
        try:
            await voice.send_vision_context_live(payload, silent=bool(self.gemini_context_silent))
        except Exception as e:
            record_error("autonomous_manager", "push_vision_context", e, {"state": state_name})

    def _write_runtime_state(self, state_name: str, snapshot: dict, voice=None, extras: dict | None = None):
        """Serialize current runtime state to the JSON state file (best effort).

        `extras` entries override/extend the base payload; voice health fields
        are merged in when the voice object supports get_runtime_health().
        """
        payload = {
            "state": state_name,
            "session_id": self.session_id,
            "interaction_active": bool(self.interaction_active),
            "intent_detected": bool(snapshot.get("intent_detected", False)),
            "detector_backend": str(snapshot.get("detector_backend", "normal")),
            "yolo_runtime": str(snapshot.get("yolo_runtime", "")),
            "ai_blocked": bool(self.ai_blocked),
            "ai_block_reason": str(self.ai_block_reason or ""),
            "person_count": int(snapshot.get("person_count", 0)),
            "face_count": int(snapshot.get("face_count", 0)),
            "group_count": int(snapshot.get("group_count", 0)),
            "group_size": int(snapshot.get("group_size", 0)),
            "group_detected": bool(snapshot.get("group_detected", False)),
            "is_close": bool(snapshot.get("is_close", False)),
            "is_approaching": bool(snapshot.get("is_approaching", False)),
            "max_area": float(snapshot.get("max_area", 0.0)),
            "depth_m": snapshot.get("depth_m"),
            "approach_speed_mps": snapshot.get("approach_speed_mps"),
            "subject_id": snapshot.get("subject_id"),
            "subject_visible": bool(snapshot.get("subject_visible", False)),
            "target_lock_active": bool(snapshot.get("target_lock_active", False)),
            "target_lock_type": str(snapshot.get("target_lock_type", "") or ""),
            "target_lock_id": snapshot.get("target_lock_id"),
            "target_switch_blocked_count": int(snapshot.get("target_switch_blocked_count", 0)),
            "camera_ok": bool(snapshot.get("camera_ok", False)),
            "depth_ok": bool(snapshot.get("depth_ok", False)),
            "camera_restarts": int(snapshot.get("camera_restarts", 0)),
            "depth_restarts": int(snapshot.get("depth_restarts", 0)),
            "audio_gate_open": bool(getattr(voice, "audio_gate_open", False)) if voice is not None else None,
            "ws_connected": bool(getattr(voice, "is_ws_connected", lambda: False)()) if voice is not None else False,
            "mic_state": "",
            "speaker_state": "",
            "cooldown_remaining": max(0.0, self.cooldown_until - time.time()),
            "time": time.time(),
        }
        try:
            if voice is not None and hasattr(voice, "get_runtime_health"):
                vh = voice.get_runtime_health() or {}
                payload["ws_connected"] = bool(vh.get("ws_connected", payload["ws_connected"]))
                payload["mic_state"] = str(vh.get("mic_state", "") or "")
                payload["speaker_state"] = str(vh.get("speaker_state", "") or "")
        except Exception as e:
            record_error("autonomous_manager", "write_runtime_state_voice_health", e)
        if extras:
            payload.update(extras)
        try:
            self.state_file.parent.mkdir(parents=True, exist_ok=True)
            self.state_file.write_text(json.dumps(payload, ensure_ascii=False), encoding="utf-8")
        except Exception as e:
            record_error("autonomous_manager", "write_runtime_state", e, {"state": state_name})

    def _start_capture_pipeline(self, replay, timing_info: dict | None = None):
        """Launch the photo capture (optionally with a motion replay) on a
        daemon worker thread; result lands in _capture_result/_capture_done."""
        self._cancel_capture_pipeline(reason="restart")
        with self._capture_lock:
            self._capture_done = False
            self._capture_result = None
        timing_info = timing_info or self._resolve_capture_timing(use_replay=bool(self.autonomous_capture_replay_enabled))
        default_delay_sec = max(0.0, min(config.PHOTO_DELAY_SEC, config.PHOTO_TOTAL_SEC))
        delay_sec = float(timing_info.get("capture_offset_sec") or default_delay_sec)
        cancel_event = threading.Event()
        self._capture_cancel_event = cancel_event

        def _run_pipeline():
            try:
                base_prefix = os.environ.get("PHOTO_PREFIX", "photo")
                prefix = f"session{self.session_id}_{base_prefix}"
                run_delay_sec = delay_sec
                capture_replay_runner = replay if bool(self.autonomous_capture_replay_enabled) else None
                if capture_replay_runner is not None:
                    # Wait up to 4 s for a previous replay to finish before shooting.
                    replay_wait_deadline = time.time() + 4.0
                    while (
                        bool(getattr(capture_replay_runner, "is_playing", False))
                        and time.time() < replay_wait_deadline
                        and not cancel_event.is_set()
                    ):
                        time.sleep(0.05)
                    if bool(getattr(capture_replay_runner, "is_playing", False)):
                        sanad_logger.print_and_log(
                            "⚠️ AI capture replay still busy; falling back to still photo for this shot.",
                            "warning",
                        )
                        capture_replay_runner = None
                        run_delay_sec = default_delay_sec
                if capture_replay_runner is None:
                    sanad_logger.print_and_log("📷 AI capture: still photo mode (no replay during photo).", "info")
                else:
                    sanad_logger.print_and_log(
                        f"📷 AI capture: replaying {Path(config.REPLAY_FILE).name} during the shot. "
                        f"duration={float(timing_info.get('duration_sec') or 0.0):.3f}s "
                        f"shot_at={run_delay_sec:.3f}s "
                        f"source={timing_info.get('capture_source', 'config_fallback')}",
                        "info",
                    )
                res = capture_with_replay_sync(
                    replay_runner=capture_replay_runner,
                    replay_file=config.REPLAY_FILE,
                    home_file=config.HOME_FILE,
                    delay_sec=run_delay_sec,
                    prefix=prefix,
                    speed=1.0,
                    cancel_event=cancel_event,
                )
                with self._capture_lock:
                    self._capture_result = str(res)
                    self._capture_done = True
                if str(res).startswith("[ERR]"):
                    sanad_logger.print_and_log(f"Capture pipeline failed: {res}", "error")
                else:
                    sanad_logger.print_and_log(f"Saved photo (pipeline): {res}", "info")
            except Exception as e:
                record_error("autonomous_manager", "start_capture_pipeline", e, {"session_id": self.session_id})
                with self._capture_lock:
                    self._capture_result = f"[ERR] capture pipeline exception: {e}"
                    self._capture_done = True

        Thread(target=_run_pipeline, daemon=True).start()

    def _resolve_capture_timing(self, use_replay: bool) -> dict:
        """Resolve when (seconds from start) to snap the photo.

        Prefers the replay file's timing profile; falls back to the configured
        PHOTO_DELAY_SEC / PHOTO_TOTAL_SEC pair.
        """
        default_delay = max(0.0, min(config.PHOTO_DELAY_SEC, config.PHOTO_TOTAL_SEC))
        timing_info = {
            "capture_offset_sec": default_delay,
            "duration_sec": 0.0,
            "capture_source": "config_fallback",
        }
        if not use_replay:
            return timing_info
        try:
            profile = replay_timing_profile(config.REPLAY_FILE)
            if bool(profile.get("ok")):
                return profile
            # Partial profile: keep the offset even when the profile isn't fully ok.
            if profile.get("capture_offset_sec") is not None:
                timing_info["capture_offset_sec"] = float(profile.get("capture_offset_sec") or default_delay)
        except Exception as e:
            record_error("autonomous_manager", "resolve_capture_timing", e, {"replay_file": str(config.REPLAY_FILE)})
        return timing_info

    async def run(self, hub, replay, voice, ws=None):
        """Main autonomous state-machine loop (IDLE -> WAIT_CONFIRM -> ...).

        NOTE(review): this method continues beyond the visible chunk; only the
        portion shown here is reproduced and documented.
        """
        self.detector.start()
        self.hub = hub
        self._running = True
        self._set_interaction_active(False, voice=voice, reason="idle")
        # Per-session loop variables.
        state = "IDLE"
        state_enter_ts = time.time()
        stable_count = 0
        confirm_deadline = 0.0
        confirm_last_prompt_ts = 0.0
        confirm_ignore_until = 0.0
        leave_since = 0.0
        framing_deadline = 0.0
        framing_last_feedback_ts = 0.0
        framing_good_streak = 0
        framing_metrics: dict = {}
        countdown_deadline = 0.0
        countdown_announced: set[int] = set()
        countdown_lost_since = 0.0
        retake_count = 0
        retake_deadline = 0.0
        retake_recommended = False
        retake_reason = ""
        retake_score = 1.0
        sanad_logger.print_and_log("🤖 Autonomous mode enabled.", "info")
        try:
            while self._running:
                await asyncio.sleep(1.0 / max(1, self.detector.poll_hz))
                now = time.time()
                runtime_mode = "manual"
                try:
                    runtime_mode = str(config.read_runtime_mode()).strip().lower()
                except Exception:
                    runtime_mode = "manual"
                if runtime_mode not in ("manual", "ai"):
                    runtime_mode = "manual"
                # Runtime-refresh operator toggles from config.json.
                try:
                    self.hard_target_lock_enabled = bool(config.read_vision_hard_target_lock_enabled())
                    self.retake_prompt_enabled = bool(config.read_vision_retake_prompt_enabled())
                    self.retake_limit = int(config.read_vision_retake_max_per_session())
                    self.retake_score_threshold = float(config.read_vision_framing_retake_score_threshold())
                    self.autonomous_greeting_replay_enabled = bool(config.read_vision_autonomous_greeting_replay_enabled())
                    self.autonomous_greeting_replay_file = self._resolve_replay_path(
                        config.read_vision_autonomous_greeting_replay_file()
                    )
                    self.autonomous_capture_replay_enabled = bool(config.read_vision_autonomous_capture_replay_enabled())
                    self.face_recognition_enabled = bool(config.read_vision_face_recognition_enabled())
                    self.face_recognition_threshold = float(config.read_vision_face_recognition_threshold())
                except Exception as e:
                    record_error("autonomous_manager", "runtime_option_refresh", e)
                if hasattr(self.detector, "set_hard_lock"):
                    try:
                        self.detector.set_hard_lock(bool(self.hard_target_lock_enabled))
                    except Exception as e:
                        record_error("autonomous_manager", "detector_set_hard_lock", e)
                snap = self.detector.latest()
                face_count = int(snap.get("face_count", 0))
                subject_visible = bool(snap.get("subject_visible", face_count > 0))
                intent_detected = bool(snap.get("intent_detected", False))
                max_area = float(snap.get("max_area", 0.0))
                group_detected = bool(snap.get("group_detected", False))
                self._maybe_log_vision_snapshot(snap, now)
                # MANUAL runtime mode pauses the whole autonomous flow.
                if runtime_mode != "ai":
                    stable_count = 0
                    if state != "IDLE":
                        sanad_logger.print_and_log("🛑 Autonomous flow paused: runtime mode is MANUAL.", "info")
                        state = "IDLE"
                        state_enter_ts = now
                        self._clear_confirmation_flags()
                        self._set_interaction_active(False, voice=voice, reason="manual mode")
                        self._cancel_capture_pipeline(reason="manual mode")
                        self._unlock_target()
                        self._reset_current_person()
                    self._write_runtime_state(
                        "IDLE",
                        snap,
                        voice=voice,
                        extras={
                            "stable_count": 0,
                            "runtime_mode": runtime_mode,
                            "autonomous_paused": True,
                            "retake_prompt_enabled": bool(self.retake_prompt_enabled),
                            "retake_limit": int(self.retake_limit),
                            "retake_count": int(retake_count),
                            **self._current_person_extras(),
                        },
                    )
                    continue
                # Strict-YOLO policy: block the AI flow until models are ready.
                readiness = self._detector_readiness()
                if self.yolo_strict_required and (not bool(readiness.get("ok", False))):
                    block_reason = str(readiness.get("block_reason") or "AI blocked by strict YOLO policy.")
                    if (not self.ai_blocked) or (self.ai_block_reason != block_reason):
                        sanad_logger.print_and_log(f"⛔ AI blocked: {block_reason}", "error")
                        self._set_interaction_active(False, voice=voice, reason="strict yolo blocked")
                        self._clear_confirmation_flags()
                        self._cancel_capture_pipeline(reason="strict yolo blocked")
                        self._unlock_target()
                        self._reset_current_person()
                    self.ai_blocked = True
                    self.ai_block_reason = block_reason
                    state = "IDLE_BLOCKED"
                    stable_count = 0
                    self._write_runtime_state(
                        "IDLE_BLOCKED",
                        snap,
                        voice=voice,
                        extras={
                            "yolo_loaded": bool(readiness.get("yolo_loaded", False)),
                            "person_model_ok": bool(readiness.get("person_model_ok", False)),
                            "face_model_ok": bool(readiness.get("face_model_ok", False)),
                            "detector_backend": str(readiness.get("effective_backend", "normal")),
                            "retake_prompt_enabled": bool(self.retake_prompt_enabled),
                            "retake_limit": int(self.retake_limit),
                            "retake_count": int(retake_count),
                            **self._current_person_extras(),
                        },
                    )
                    await self._push_vision_context(voice, "IDLE_BLOCKED", snap)
                    continue
                if self.ai_blocked:
                    sanad_logger.print_and_log("✅ AI readiness restored. Leaving blocked state.", "info")
                    self.ai_blocked = False
                    self.ai_block_reason = ""
                    state = "IDLE"
                    state_enter_ts = now
                    stable_count = 0
                await self._push_vision_context(voice, state, snap)
                # Operator hard-cancel via the remote hub combo.
                try:
                    hard_cancel = False
                    if hub is not None:
                        if getattr(hub, "hard_cancel_combo", None):
                            hard_cancel = bool(hub.hard_cancel_combo())
                        elif getattr(hub, "combo_r2l1", None):
                            hard_cancel = bool(hub.combo_r2l1())
                    if hard_cancel:
                        sanad_logger.print_and_log("🛑 HARD CANCEL detected (R2+L1).", "warning")
                        state = "IDLE"
                        state_enter_ts = now
                        self.cooldown_until = now + self.session_cooldown_sec
                        self._clear_confirmation_flags()
                        self._set_interaction_active(False, voice=voice, reason="hard cancel")
                        self._cancel_capture_pipeline(reason="hard cancel")
                        self._unlock_target()
                        self._reset_current_person()
                except Exception as e:
                    record_error("autonomous_manager", "remote_cancel_check", e)
                if state == "IDLE":
                    self._write_runtime_state(
                        "IDLE",
                        snap,
                        voice=voice,
                        extras={
                            "stable_count": stable_count,
                            "retake_prompt_enabled": bool(self.retake_prompt_enabled),
                            "retake_limit": int(self.retake_limit),
                            "retake_count": int(retake_count),
                            **self._current_person_extras(),
                        },
                    )
                    # Allow visitor-initiated photo request from IDLE.
                    # Only explicit request_photo should start a new session from idle.
                    if self._consume_direct_request_flag():
                        self.session_id += 1
                        state = "FRAMING"
                        state_enter_ts = now
                        framing_deadline = now + self.framing_timeout_sec
                        framing_last_feedback_ts = 0.0
                        framing_good_streak = 0
                        framing_metrics = {}
                        retake_count = 0
                        retake_recommended = False
                        retake_reason = ""
                        retake_score = 1.0
                        self._clear_confirmation_flags()
                        self._set_interaction_active(True, voice=voice, reason="voice request from idle")
                        self._lock_target_on_session_start(snap)
                        self._identify_person_for_session(snap, source="voice_request")
                        sanad_logger.print_and_log(
                            f"🗣️ Voice photo request from IDLE -> session {self.session_id}",
                            "info",
                        )
                        await self._say_prompt(
                            voice,
                            self._framing_prompt_key(group_detected),
                            self._framing_prompt_text(group_detected),
                        )
                        continue
                    if now < self.cooldown_until:
                        continue
                    # Require several consecutive intent frames before engaging.
                    if intent_detected:
                        stable_count += 1
                    else:
                        stable_count = 0
                    if stable_count < self.stability_frames:
                        continue
                    stable_count = 0
                    self.session_id += 1
                    state = "WAIT_CONFIRM"
                    state_enter_ts = now
                    confirm_deadline = now + self.confirm_timeout_sec
                    confirm_ignore_until = now + self.confirm_guard_sec
                    confirm_last_prompt_ts = now
                    leave_since = 0.0
                    framing_metrics = {}
                    retake_count = 0
                    retake_recommended = False
                    retake_reason = ""
                    retake_score = 1.0
                    self._clear_confirmation_flags()
                    self._set_interaction_active(True, voice=voice, reason=f"intent max_area={max_area:.0f}")
                    self._lock_target_on_session_start(snap)
                    self._identify_person_for_session(snap, source="intent")
                    sanad_logger.print_and_log(
                        f"🔔 Intent detected (area={max_area:.0f}) -> session {self.session_id}",
                        "info",
                    )
                    self._start_greeting_replay(replay)
                    try:
                        wake_text = self._welcome_prompt_text(group_detected)
                        await voice.trigger_wake_sequence(
                            wake_text=wake_text,
                            prompt_key=self._welcome_prompt_key(group_detected),
                        )
                    except Exception as e:
                        record_error("autonomous_manager", "wake_sequence", e)
                        sanad_logger.print_and_log(f"Wake sequence failed: {e}", "warning")
                    continue
                if state == "WAIT_CONFIRM":
                    confirm_remaining = max(0.0,
confirm_deadline - now) self._write_runtime_state( "WAIT_CONFIRM", snap, voice=voice, extras={ "confirm_timeout_remaining": confirm_remaining, "confirm_guard_remaining": max(0.0, confirm_ignore_until - now), "retake_prompt_enabled": bool(self.retake_prompt_enabled), "retake_limit": int(self.retake_limit), "retake_count": int(retake_count), **self._current_person_extras(), }, ) if not subject_visible: if leave_since <= 0.0: leave_since = now elif (now - leave_since) >= self.leave_timeout_sec: sanad_logger.print_and_log("🙈 Visitor left before confirmation.", "warning") await self._say_prompt( voice, "visitor_left", "No worries. I will wait here for the next visitor.", ) state = "IDLE" state_enter_ts = now self.cooldown_until = now + self.session_cooldown_sec self._clear_confirmation_flags() self._set_interaction_active(False, voice=voice, reason="left before confirm") self._cancel_capture_pipeline(reason="left before confirm") self._unlock_target() self._reset_current_person() continue else: leave_since = 0.0 if now >= confirm_ignore_until: if self._consume_no_photo_flag(): await self._say_prompt(voice, "declined", "No problem. We can do it anytime.") state = "IDLE" state_enter_ts = now self.cooldown_until = now + self.session_cooldown_sec self._clear_confirmation_flags() self._set_interaction_active(False, voice=voice, reason="visitor declined") self._cancel_capture_pipeline(reason="visitor declined") self._unlock_target() self._reset_current_person() continue if self._consume_request_photo_flag(): sanad_logger.print_and_log("✅ Confirmation received. 
Entering framing check.", "info") await self._say_prompt( voice, self._framing_prompt_key(group_detected), self._framing_prompt_text(group_detected), ) state = "FRAMING" state_enter_ts = now framing_deadline = now + self.framing_timeout_sec framing_last_feedback_ts = 0.0 framing_good_streak = 0 continue if (now - confirm_last_prompt_ts) >= self.confirm_reminder_sec: confirm_last_prompt_ts = now await self._say_prompt( voice, "confirm_reminder", "Say yes photo to continue, or no photo to cancel.", ) if now >= confirm_deadline: sanad_logger.print_and_log("⌛ Confirmation timeout.", "warning") await self._say_prompt( voice, "confirm_timeout", "No problem. I will wait here. Come back anytime for a photo.", ) state = "IDLE" state_enter_ts = now self.cooldown_until = now + self.session_cooldown_sec self._clear_confirmation_flags() self._set_interaction_active(False, voice=voice, reason="confirm timeout") self._cancel_capture_pipeline(reason="confirm timeout") self._unlock_target() self._reset_current_person() continue elif state == "FRAMING": if self._consume_no_photo_flag(): await self._say_prompt(voice, "session_cancelled", "Okay. 
Session cancelled.") state = "IDLE" state_enter_ts = now self.cooldown_until = now + self.session_cooldown_sec self._clear_confirmation_flags() self._set_interaction_active(False, voice=voice, reason="cancelled during framing") self._cancel_capture_pipeline(reason="cancelled during framing") self._unlock_target() self._reset_current_person() continue good, reasons, framing_metrics = self._evaluate_framing_quality(snap) framing_remaining = max(0.0, framing_deadline - now) self._write_runtime_state( "FRAMING", snap, voice=voice, extras={ "framing_ok": bool(good), "framing_reasons": reasons, "framing_timeout_remaining": framing_remaining, "framing_good_streak": framing_good_streak, "framing_metrics": framing_metrics, "retake_prompt_enabled": bool(self.retake_prompt_enabled), "retake_limit": int(self.retake_limit), "retake_count": int(retake_count), **self._current_person_extras(), }, ) if good: framing_good_streak += 1 else: framing_good_streak = 0 if (now - framing_last_feedback_ts) >= self.framing_feedback_interval_sec: framing_last_feedback_ts = now await self._say(voice, self._framing_guidance_text(reasons)) if framing_good_streak >= self.framing_good_frames_required: capture_timing = self._resolve_capture_timing( use_replay=bool(self.autonomous_capture_replay_enabled) ) capture_start_ts = time.time() self._start_capture_pipeline(replay, timing_info=capture_timing) await self._say_capture_prompt( voice, "countdown_intro", "Look at the camera, stay ready, hold your pose with me, keep still, keep your smile soft, and in a moment I will count down for the photo.", ) state = "COUNTDOWN" state_enter_ts = now countdown_deadline = capture_start_ts + float( capture_timing.get("capture_offset_sec") or max(0.0, min(config.PHOTO_DELAY_SEC, config.PHOTO_TOTAL_SEC)) ) countdown_announced = set() countdown_lost_since = 0.0 continue if now >= framing_deadline: sanad_logger.print_and_log("⌛ Framing timeout.", "warning") await self._say_prompt( voice, "framing_timeout", "I still 
need a better frame. Please step in front of me and say yes photo when ready.", ) state = "WAIT_CONFIRM" state_enter_ts = now confirm_deadline = now + self.confirm_timeout_sec confirm_ignore_until = now + self.confirm_guard_sec confirm_last_prompt_ts = now leave_since = 0.0 continue elif state == "COUNTDOWN": countdown_remaining = max(0.0, countdown_deadline - now) self._write_runtime_state( "COUNTDOWN", snap, voice=voice, extras={ "countdown_remaining": countdown_remaining, "framing_metrics": framing_metrics, "retake_prompt_enabled": bool(self.retake_prompt_enabled), "retake_limit": int(self.retake_limit), "retake_count": int(retake_count), **self._current_person_extras(), }, ) if self._consume_no_photo_flag(): await self._say_prompt(voice, "countdown_cancelled", "Countdown cancelled.") state = "IDLE" state_enter_ts = now self.cooldown_until = now + self.session_cooldown_sec self._clear_confirmation_flags() self._set_interaction_active(False, voice=voice, reason="cancelled during countdown") self._cancel_capture_pipeline(reason="cancelled during countdown") self._unlock_target() self._reset_current_person() continue if not subject_visible: if countdown_lost_since <= 0.0: countdown_lost_since = now elif (now - countdown_lost_since) >= self.countdown_lose_subject_sec: await self._say_prompt( voice, "lost_from_frame", "I lost you from frame. 
Let us try again.", ) state = "FRAMING" state_enter_ts = now framing_deadline = now + self.framing_timeout_sec framing_last_feedback_ts = 0.0 framing_good_streak = 0 continue else: countdown_lost_since = 0.0 sec_left = int(round(countdown_remaining)) if sec_left in (3, 2, 1) and sec_left not in countdown_announced: countdown_announced.add(sec_left) await self._say_capture_prompt(voice, f"count_{sec_left}", f"{sec_left}...") elif sec_left <= 0 and 0 not in countdown_announced: countdown_announced.add(0) await self._say_capture_prompt(voice, "smile", "Smile.") with self._capture_lock: capture_done = bool(self._capture_done) capture_result = self._capture_result if capture_done: if isinstance(capture_result, str) and capture_result.startswith("[ERR]"): sanad_logger.print_and_log(f"Capture failed: {capture_result}", "error") state = "IDLE" state_enter_ts = now self.cooldown_until = now + self.session_cooldown_sec self._set_interaction_active(False, voice=voice, reason="capture failed") self._cancel_capture_pipeline(reason="capture failed") self._unlock_target() self._reset_current_person() else: try: person_id = str((self.current_person or {}).get("person_id") or "").strip() if person_id: people_registry.attach_captured_photo(person_id, str(capture_result)) except Exception as e: record_error("autonomous_manager", "attach_captured_photo", e) retake_recommended, retake_reason, retake_score, framing_metrics = self._retake_assessment( snap, previous_metrics=framing_metrics, ) if ( bool(self.retake_prompt_enabled) and bool(retake_recommended) and int(retake_count) < int(self.retake_limit) ): state = "RETAKE_CONFIRM" state_enter_ts = now retake_deadline = now + self.retake_confirm_timeout_sec reason_txt = retake_reason or "the framing is not optimal" await self._say_prompt( voice, "retake_recommended", f"Photo captured. I recommend a retake because {reason_txt}. 
" "Say yes photo to retake, or no photo to keep this one.", ) else: state = "COMPLETE" state_enter_ts = now continue if now >= (countdown_deadline + self.capture_finalize_grace_sec): record_error( "autonomous_manager", "countdown_capture_timeout", context={"session_id": self.session_id, "grace_sec": self.capture_finalize_grace_sec}, ) sanad_logger.print_and_log("Capture timeout after countdown. Resetting session.", "error") state = "IDLE" state_enter_ts = now self.cooldown_until = now + self.session_cooldown_sec self._set_interaction_active(False, voice=voice, reason="capture timeout") self._cancel_capture_pipeline(reason="capture timeout") self._unlock_target() self._reset_current_person() continue elif state == "RETAKE_CONFIRM": rem = max(0.0, retake_deadline - now) self._write_runtime_state( "RETAKE_CONFIRM", snap, voice=voice, extras={ "retake_prompt_enabled": bool(self.retake_prompt_enabled), "retake_recommended": bool(retake_recommended), "retake_reason": str(retake_reason or ""), "retake_score": float(retake_score), "retake_count": int(retake_count), "retake_limit": int(self.retake_limit), "retake_timeout_remaining": rem, "framing_metrics": framing_metrics, **self._current_person_extras(), }, ) if self._consume_no_photo_flag(): state = "COMPLETE" state_enter_ts = now continue if self._consume_request_photo_flag(): if int(retake_count) < int(self.retake_limit): retake_count += 1 await self._say_prompt( voice, "retake_yes", "Great, let us retake. Hold your pose.", ) state = "FRAMING" state_enter_ts = now framing_deadline = now + self.framing_timeout_sec framing_last_feedback_ts = 0.0 framing_good_streak = 0 else: await self._say_prompt( voice, "retake_limit", "Retake limit reached. 
Keeping the current photo.",
                            )
                            # Retake budget exhausted: keep the photo already captured.
                            state = "COMPLETE"
                            state_enter_ts = now
                        continue
                    # Silence until the deadline counts as "keep this photo".
                    if now >= retake_deadline:
                        state = "COMPLETE"
                        state_enter_ts = now
                        continue
                elif state == "COMPLETE":
                    # Publish the terminal session state (capture result plus the
                    # retake summary) so external observers of the runtime-state
                    # file see the final outcome before the reset below.
                    self._write_runtime_state(
                        "COMPLETE",
                        snap,
                        voice=voice,
                        extras={
                            "capture_result": self._capture_result,
                            "retake_prompt_enabled": bool(self.retake_prompt_enabled),
                            "retake_recommended": bool(retake_recommended),
                            "retake_reason": str(retake_reason or ""),
                            "retake_score": float(retake_score),
                            "retake_count": int(retake_count),
                            "retake_limit": int(self.retake_limit),
                            "framing_metrics": framing_metrics,
                            **self._current_person_extras(),
                        },
                    )
                    await self._say_capture_prompt(
                        voice,
                        "photo_saved_thanks",
                        "Thank you. Photo saved. Don't forget to check your photos.",
                    )
                    sanad_logger.print_and_log(f"✅ Session {self.session_id} complete.", "info")
                    await asyncio.sleep(1.0)
                    # Return to IDLE with a cooldown window so the same visitor
                    # does not immediately re-trigger a new session.
                    state = "IDLE"
                    state_enter_ts = now
                    self.cooldown_until = time.time() + self.session_cooldown_sec
                    self._clear_confirmation_flags()
                    self._set_interaction_active(False, voice=voice, reason="session complete")
                    self._cancel_capture_pipeline(reason="session complete")
                    self._unlock_target()
                    self._reset_current_person()
                    continue
                # Watchdog: no state may persist longer than 120 seconds; force a
                # full reset (flags, capture pipeline, target lock, person) back
                # to IDLE with the standard session cooldown.
                if (time.time() - state_enter_ts) > 120.0:
                    sanad_logger.print_and_log("Autonomous state timed out, forcing IDLE reset.", "warning")
                    state = "IDLE"
                    state_enter_ts = time.time()
                    self.cooldown_until = time.time() + self.session_cooldown_sec
                    self._clear_confirmation_flags()
                    self._set_interaction_active(False, voice=voice, reason="state watchdog reset")
                    self._cancel_capture_pipeline(reason="state watchdog reset")
                    self._unlock_target()
                    self._reset_current_person()
        finally:
            # Unconditional cleanup on loop exit (stop(), exception, or
            # cancellation): tear down the interaction, capture pipeline,
            # target lock, current person, and the vision detector.
            self._set_interaction_active(False, voice=voice, reason="autonomous stop")
            self._cancel_capture_pipeline(reason="autonomous stop")
            self._unlock_target()
            self._reset_current_person()
            self.detector.stop()

    def stop(self):
        """Request shutdown of the autonomous loop.

        Clears the running flag so run()'s while-loop exits on its next
        iteration, and stops the detector immediately; the remaining cleanup
        happens in run()'s finally block.
        """
        self._running = False
        self.detector.stop()


if __name__ == "__main__":

    async def main():
        am = AutonomousManager()

        # Minimal stand-ins for the voice/hub/replay collaborators so the
        # manager can be exercised standalone from the command line.
        class Stub:
            pass

        await am.run(Stub(), Stub(), Stub(), None)

    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the standalone runner.
        pass