""" patrol.py — Autonomous patrol loop with YOLO PPE detection + LLaVA scene assessment """ import time from API.zmq_api import gradual_stop from API.camera_api import get_frame from API.yolo_api import yolo_ppe_violations, yolo_person_too_close, yolo_summary from API.llava_api import ask_patrol from API.memory_api import mem, log_detection from Brain.executor import move_step from Core.config_loader import load_config from Core.motion_state import motion_abort # Persist patrol events to logs/navigation.log so a long unattended patrol # session leaves a usable post-mortem audit (start/finish, every step's # observation, every PPE violation alert, every proximity pause). try: from Core.logger import log as _core_log except Exception: _core_log = None def _plog(msg: str, level: str = "info") -> None: """Print and persist to logs/navigation.log.""" print(f" [Patrol] {msg}") if _core_log is not None: try: _core_log(f"[Patrol] {msg}", level, "navigation") except Exception: pass _cfg = load_config("Patrol") DEFAULT_DURATION = _cfg["default_duration_minutes"] PROXIMITY_THRESH = _cfg["proximity_threshold"] PROXIMITY_PAUSE = _cfg["proximity_pause_s"] def patrol(duration_minutes: float = 0.0, alert_callback=None): """ Timed patrol loop. Each iteration: 1. YOLO PPE violation check -> log + alert 2. LLaVA scene assessment -> decide next move 3. Proximity safety stop -> pause if someone is too close 4. Execute move_step Parameters ---------- duration_minutes : float How long to patrol (default from config). alert_callback : callable, optional Called with (alert_text: str) whenever an alert fires. """ if duration_minutes <= 0: duration_minutes = DEFAULT_DURATION end_time = time.time() + duration_minutes * 60.0 step = 0 _plog(f"Starting {duration_minutes:.1f}-minute patrol") try: while time.time() < end_time: # User said 'stop' (or any other interrupt). Bail out cleanly # — gradual_stop is in the finally block, so the robot # decelerates safely. The patrol's outer 'duration' is just # the upper bound; honouring abort means we exit early. if motion_abort.is_set(): _plog("motion_abort set — ending patrol early", "warn") break step += 1 # ----- 1. YOLO PPE violations ----- violations = yolo_ppe_violations() if violations: for v in violations: alert_text = f"PPE violation: {v}" _plog(alert_text, "warn") log_detection(v, position="patrol", distance="") if alert_callback: alert_callback(alert_text) # ----- 2. LLaVA scene assessment ----- img_b64 = get_frame() if img_b64: d = ask_patrol(img_b64) else: d = {"observation": "no frame", "alert": None, "next_move": "forward", "duration": 1.0} observation = d.get("observation", "") alert = d.get("alert") next_move = d.get("next_move", "forward") duration = float(d.get("duration", 1.0)) if observation: _plog(f"step {step}: {observation}") if alert: alert_text = f"Alert: {alert}" _plog(alert_text, "warn") if alert_callback: alert_callback(alert_text) # Log interesting detections to memory if mem and observation and "person" in observation.lower(): log_detection("person", position="patrol", distance="") # ----- 3. Proximity safety stop (YOLO + LiDAR) ----- lidar_blocked = False try: from API.lidar_api import obstacle_ahead lidar_blocked = obstacle_ahead() except ImportError: pass if yolo_person_too_close(threshold=PROXIMITY_THRESH) or lidar_blocked: reason = "LiDAR obstacle" if lidar_blocked else "Person too close" _plog(f"{reason} — pausing {PROXIMITY_PAUSE}s", "warn") gradual_stop() time.sleep(PROXIMITY_PAUSE) continue # ----- 4. Execute movement ----- if next_move and next_move != "stop": move_step(next_move, duration) else: gradual_stop() time.sleep(0.5) except KeyboardInterrupt: _plog("Interrupted", "warn") finally: gradual_stop() _plog(f"Finished after {step} steps") return step