"""vision_detector.py YOLO detector + tracker + intent: - YOLO ONNX person detector for subject tracking/intent. - YOLO ONNX face detector (optional) for face metrics. - Group detection from person clusters. - Intent uses RealSense depth first; bbox area fallback. """ from collections import deque from threading import Event, Lock, Thread import logging import os import time from pathlib import Path import cv2 import numpy as np from Modes.AI.camera_module import CameraCapture from Core import settings as config from Core.error_events import record_error from Server import direct_camera_client try: import pyrealsense2 as rs # type: ignore _HAS_RS = True except Exception: _HAS_RS = False try: from ultralytics import YOLO # type: ignore _HAS_ULTRALYTICS = True except Exception: YOLO = None # type: ignore _HAS_ULTRALYTICS = False logger = logging.getLogger(__name__) def _coerce_backend_name(value: str | None) -> str: v = str(value or "").strip().lower() if v in ("yolo", "normal"): return v return "normal" def _coerce_yolo_runtime(value: str | None) -> str: v = str(value or "").strip().lower() if v in ("ultralytics", "opencv"): return v return "ultralytics" def _resolve_model_path(path_value: str | None) -> Path | None: if not path_value: return None p = Path(str(path_value)).expanduser() if not p.is_absolute(): p = (config.PROJECT_ROOT / p).resolve() return p def _probe_onnx_model(path_value: str | None) -> dict: out = {"path": "", "exists": False, "loadable": False, "error": ""} p = _resolve_model_path(path_value) if p is None: return out out["path"] = str(p) if not p.exists() or (not p.is_file()): out["error"] = "model file missing" return out out["exists"] = True try: cv2.dnn.readNetFromONNX(str(p)) out["loadable"] = True except Exception as e: out["error"] = str(e) return out def _probe_ultralytics_model(path_value: str | None) -> dict: out = {"path": "", "exists": False, "loadable": False, "error": ""} p = _resolve_model_path(path_value) if p is None: return out out["path"] = str(p) if not p.exists() or (not p.is_file()): out["error"] = "model file missing" return out out["exists"] = True if not _HAS_ULTRALYTICS: out["error"] = "ultralytics not installed" return out try: YOLO(str(p)) out["loadable"] = True except Exception as e: out["error"] = str(e) return out def probe_ai_readiness(backend: str | None = None, strict_required: bool | None = None) -> dict: configured_backend = _coerce_backend_name( backend if backend is not None else config.read_vision_detector_backend() ) yolo_runtime = _coerce_yolo_runtime(config.read_vision_yolo_runtime()) strict = bool(config.read_vision_yolo_strict_required() if strict_required is None else strict_required) person_path = ( os.environ.get("DETECTOR_PERSON_YOLO_ONNX", "").strip() or os.environ.get("DETECTOR_ONNX_PATH", "").strip() or str(getattr(config, "VISION_PERSON_YOLO_ONNX", "")).strip() ) face_path = ( os.environ.get("DETECTOR_FACE_YOLO_ONNX", "").strip() or str(getattr(config, "VISION_FACE_YOLO_ONNX", "")).strip() ) if yolo_runtime == "ultralytics": person = _probe_ultralytics_model(person_path) face = _probe_ultralytics_model(face_path) else: person = _probe_onnx_model(person_path) face = _probe_onnx_model(face_path) person_ok = bool(person.get("loadable")) face_ok = bool(face.get("loadable")) yolo_loaded = bool(configured_backend == "yolo" and person_ok) block_reason = "" if strict and configured_backend != "yolo": block_reason = "Strict AI mode requires detection_backend=yolo." elif strict and yolo_runtime == "ultralytics" and not _HAS_ULTRALYTICS: block_reason = "Ultralytics runtime selected but package is not installed." elif strict and not person_ok: block_reason = "Person YOLO model is missing/invalid. Fix vision.person_yolo_onnx." ok = not bool(block_reason) return { "ok": bool(ok), "strict_required": bool(strict), "configured_backend": configured_backend, "yolo_runtime": yolo_runtime, "effective_backend": "yolo" if yolo_loaded else "normal", "yolo_loaded": bool(yolo_loaded), "person_model_ok": bool(person_ok), "face_model_ok": bool(face_ok), "person_model_path": str(person.get("path") or ""), "face_model_path": str(face.get("path") or ""), "person_model_error": str(person.get("error") or ""), "face_model_error": str(face.get("error") or ""), "block_reason": block_reason, } class VisionDetector: def __init__(self, zmq_host="127.0.0.1", zmq_port=55555, poll_hz=10, video_source: str | None = None): self.zmq_host = zmq_host self.zmq_port = int(zmq_port) self.poll_hz = float(poll_hz) self.video_source = video_source self._stop = Event() self._thread = None self._latest_lock = Lock() self._latest = { "frame_time": 0.0, "detector_backend": "normal", "yolo_runtime": "ultralytics", "face_count": 0, "person_count": 0, "group_count": 0, "group_size": 0, "group_detected": False, "boxes": [], "face_boxes": [], "group_boxes": [], "frame": None, "intent_detected": False, "max_area": 0.0, "is_close": False, "is_approaching": False, "depth_m": None, "approach_speed_mps": 0.0, "subject_id": None, "subject_box": None, "subject_visible": False, "subject_locked": False, "target_lock_active": False, "target_lock_type": "", "target_lock_id": None, "target_switch_blocked_count": 0, "camera_ok": False, "depth_ok": False, "camera_restarts": 0, "depth_restarts": 0, } # Runtime-selectable detector backend self._detector_backend = self._coerce_backend( os.environ.get("DETECTOR_BACKEND", "").strip() or config.read_vision_detector_backend() ) self._last_backend_check_ts = 0.0 # YOLO detection config self._yolo_runtime = _coerce_yolo_runtime( os.environ.get("DETECTOR_YOLO_RUNTIME", "").strip() or config.read_vision_yolo_runtime() ) self._ultralytics_device = ( os.environ.get("DETECTOR_YOLO_DEVICE", "").strip() or config.read_vision_yolo_ultralytics_device() ) self._person_model_path = str( _resolve_model_path( os.environ.get("DETECTOR_PERSON_YOLO_ONNX", "").strip() or os.environ.get("DETECTOR_ONNX_PATH", "").strip() or str(getattr(config, "VISION_PERSON_YOLO_ONNX", "")).strip() or "" ) or "" ).strip() self._face_model_path = str( _resolve_model_path( os.environ.get("DETECTOR_FACE_YOLO_ONNX", "").strip() or str(getattr(config, "VISION_FACE_YOLO_ONNX", "")).strip() or "" ) or "" ).strip() input_size = int(max(320, getattr(config, "VISION_INPUT_SIZE", 640))) self._dnn_input_size = (input_size, input_size) self._person_score_thresh = float(getattr(config, "VISION_PERSON_SCORE_THRESH", 0.35)) self._face_score_thresh = float(getattr(config, "VISION_FACE_SCORE_THRESH", 0.35)) self._nms_iou_thresh = float(getattr(config, "VISION_NMS_IOU_THRESH", 0.45)) self._person_class_id = int(getattr(config, "VISION_PERSON_CLASS_ID", 0)) self._group_min_people = max(2, int(getattr(config, "VISION_GROUP_MIN_PEOPLE", 3))) self._group_link_distance_px = float(getattr(config, "VISION_GROUP_LINK_DISTANCE_PX", 220.0)) self._person_net = None self._face_net = None self._warned_person_fallback = False self._warned_yolo_unavailable = False self._warned_ultralytics_missing = False self._last_model_retry_ts = 0.0 self._model_retry_sec = float(os.environ.get("YOLO_MODEL_RETRY_SEC", "2.0")) # Area-based fallback intent self._proximity_threshold = float(os.environ.get("PROXIMITY_THRESHOLD", "50000")) self._approach_delta = float(os.environ.get("APPROACH_DELTA", "12000")) self._history_len = max(2, int(os.environ.get("INTENT_HISTORY_LEN", "5"))) self._area_history = deque(maxlen=self._history_len) # Depth-first intent self._min_depth_m = float(os.environ.get("MIN_DEPTH_M", "1.7")) self._min_approach_speed_mps = float(os.environ.get("MIN_APPROACH_SPEED_MPS", "0.08")) self._depth_history_len = max(2, int(os.environ.get("DEPTH_HISTORY_LEN", "6"))) self._depth_history = deque(maxlen=self._depth_history_len) # Subject lock self._hard_target_lock_enabled = bool(config.read_vision_hard_target_lock_enabled()) self._locked_target_type = "" self._locked_subject_id = None self._locked_group_id = None self._target_switch_blocked_count = 0 self._subject_lock_lost_frames = max(1, int(os.environ.get("SUBJECT_LOCK_LOST_FRAMES", "20"))) self._subject_lost_count = 0 # Camera/depth watchdog self._no_frame_count = 0 self._recover_after_no_frame = max(3, int(os.environ.get("CAMERA_RECOVER_AFTER_FRAMES", "10"))) self._camera_ok = False self._depth_ok = False self._camera_restarts = 0 self._depth_restarts = 0 self._depth_enabled = os.environ.get("DETECTOR_USE_REALSENSE_DEPTH", "1").strip().lower() not in ( "0", "false", "no", "off", ) self._rs_pipeline = None self._rs_profile = None self._last_rs_restart = 0.0 self._apply_detector_backend(self._detector_backend, reason="init") self._latest["detector_backend"] = self._effective_backend() self._latest["yolo_runtime"] = self._yolo_runtime self._face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + "haarcascade_frontalface_default.xml") self._camera = CameraCapture() try: self._camera.initialize() except Exception: logger.warning("VisionDetector: could not init CameraCapture fallback") self._video_cap = None if self.video_source: try: self._video_cap = cv2.VideoCapture(self.video_source) if not self._video_cap.isOpened(): logger.warning(f"VisionDetector: could not open video source {self.video_source}") self._video_cap = None else: logger.info(f"VisionDetector: using video source {self.video_source} for simulation") except Exception as e: logger.warning(f"VisionDetector: video source init failed: {e}") self._ensure_depth_pipeline(force=False) @staticmethod def _coerce_backend(value: str | None) -> str: return _coerce_backend_name(value) def _effective_backend(self) -> str: if self._detector_backend == "yolo" and self._person_net is not None: return "yolo" return "normal" def _build_readiness(self, strict_required: bool | None = None) -> dict: strict = bool(config.read_vision_yolo_strict_required() if strict_required is None else strict_required) configured_backend = self._coerce_backend(self._detector_backend) yolo_runtime = _coerce_yolo_runtime(self._yolo_runtime) if yolo_runtime == "ultralytics": person_model_ok = bool(self._person_net is not None and hasattr(self._person_net, "predict")) face_model_ok = bool(self._face_net is not None and hasattr(self._face_net, "predict")) if self._face_model_path else False else: person_model_ok = bool(self._person_net is not None and hasattr(self._person_net, "setInput")) face_model_ok = bool(self._face_net is not None and hasattr(self._face_net, "setInput")) if self._face_model_path else False yolo_loaded = bool(configured_backend == "yolo" and person_model_ok) block_reason = "" if strict and configured_backend != "yolo": block_reason = "Strict AI mode requires detection_backend=yolo." elif strict and yolo_runtime == "ultralytics" and not _HAS_ULTRALYTICS: block_reason = "Ultralytics runtime selected but package is not installed." elif strict and not person_model_ok: block_reason = "Person YOLO model is missing/invalid. Fix vision.person_yolo_onnx." return { "ok": not bool(block_reason), "strict_required": bool(strict), "configured_backend": configured_backend, "yolo_runtime": yolo_runtime, "effective_backend": self._effective_backend(), "yolo_loaded": bool(yolo_loaded), "person_model_ok": bool(person_model_ok), "face_model_ok": bool(face_model_ok), "person_model_path": str(self._person_model_path or ""), "face_model_path": str(self._face_model_path or ""), "person_model_error": "" if person_model_ok else "model not loaded", "face_model_error": "" if (face_model_ok or (not self._face_model_path)) else "model not loaded", "block_reason": block_reason, } def readiness(self, strict_required: bool | None = None) -> dict: return self._build_readiness(strict_required=strict_required) @classmethod def probe_readiness(cls, backend: str | None = None, strict_required: bool | None = None) -> dict: return probe_ai_readiness(backend=backend, strict_required=strict_required) def _maybe_retry_yolo_models(self, now_ts: float): if self._detector_backend != "yolo": return if self._person_net is not None and (self._face_net is not None or not self._face_model_path): return if (now_ts - self._last_model_retry_ts) < max(0.5, self._model_retry_sec): return self._last_model_retry_ts = now_ts if self._yolo_runtime == "ultralytics": if self._person_net is None: self._person_net = self._load_ultralytics_model(self._person_model_path, label="person") if self._face_net is None and self._face_model_path: self._face_net = self._load_ultralytics_model(self._face_model_path, label="face") else: if self._person_net is None: self._person_net = self._load_onnx_model(self._person_model_path, label="person") if self._face_net is None and self._face_model_path: self._face_net = self._load_onnx_model(self._face_model_path, label="face") def _apply_detector_backend(self, backend: str, reason: str = ""): backend = self._coerce_backend(backend) prev_runtime = self._yolo_runtime self._detector_backend = backend self._warned_person_fallback = False self._warned_yolo_unavailable = False self._warned_ultralytics_missing = False try: self._yolo_runtime = _coerce_yolo_runtime(config.read_vision_yolo_runtime()) except Exception: self._yolo_runtime = _coerce_yolo_runtime(self._yolo_runtime) self._yolo_runtime = _coerce_yolo_runtime( os.environ.get("DETECTOR_YOLO_RUNTIME", "").strip() or self._yolo_runtime ) if self._yolo_runtime != prev_runtime: self._person_net = None self._face_net = None if backend == "yolo": if self._yolo_runtime == "ultralytics": if self._person_net is None: self._person_net = self._load_ultralytics_model(self._person_model_path, label="person") if self._face_net is None and self._face_model_path: self._face_net = self._load_ultralytics_model(self._face_model_path, label="face") else: if self._person_net is None: self._person_net = self._load_onnx_model(self._person_model_path, label="person") if self._face_net is None and self._face_model_path: self._face_net = self._load_onnx_model(self._face_model_path, label="face") if self._person_net is None: logger.warning( f"VisionDetector: YOLO backend selected ({self._yolo_runtime}) but person model unavailable. Falling back to normal." ) elif self._face_net is None and self._face_model_path: logger.info("VisionDetector: YOLO person active, face model missing -> Haar face fallback.") else: # Force non-AI normal mode. self._person_net = None self._face_net = None suffix = f" ({reason})" if reason else "" logger.info(f"VisionDetector: detector backend set to {self._effective_backend()} [runtime={self._yolo_runtime}]{suffix}") def _refresh_detector_backend(self, now_ts: float): if (now_ts - self._last_backend_check_ts) < 1.0: self._maybe_retry_yolo_models(now_ts) return self._last_backend_check_ts = now_ts try: self._hard_target_lock_enabled = bool(config.read_vision_hard_target_lock_enabled()) except Exception: pass try: cfg_backend = self._coerce_backend(config.read_vision_detector_backend()) except Exception: cfg_backend = self._detector_backend try: cfg_runtime = _coerce_yolo_runtime(config.read_vision_yolo_runtime()) except Exception: cfg_runtime = self._yolo_runtime if cfg_backend != self._detector_backend or cfg_runtime != self._yolo_runtime: self._apply_detector_backend(cfg_backend, reason="runtime switch") else: self._maybe_retry_yolo_models(now_ts) @staticmethod def _load_onnx_model(path: str | None, label: str): if not path: return None try: net = cv2.dnn.readNetFromONNX(str(path)) net.setPreferableBackend(cv2.dnn.DNN_BACKEND_DEFAULT) net.setPreferableTarget(cv2.dnn.DNN_TARGET_CPU) logger.info(f"VisionDetector: loaded {label} YOLO ONNX model: {path}") return net except Exception as e: logger.warning(f"VisionDetector: failed to load {label} model {path}: {e}") return None def _load_ultralytics_model(self, path: str | None, label: str): if not path: return None if not _HAS_ULTRALYTICS: if not self._warned_ultralytics_missing: self._warned_ultralytics_missing = True logger.warning("VisionDetector: ultralytics package not installed; cannot use ultralytics runtime.") return None try: model = YOLO(str(path)) logger.info(f"VisionDetector: loaded {label} Ultralytics model: {path}") return model except Exception as e: logger.warning(f"VisionDetector: failed to load {label} ultralytics model {path}: {e}") return None def _run_yolo_ultralytics(self, model, frame, conf_thresh: float, class_filter: int | None = None): if model is None or frame is None: return [] try: results = model.predict( source=frame, conf=float(conf_thresh), iou=float(self._nms_iou_thresh), imgsz=int(self._dnn_input_size[0]), device=str(self._ultralytics_device), verbose=False, ) if not results: return [] res = results[0] boxes_obj = getattr(res, "boxes", None) if boxes_obj is None: return [] xyxy = boxes_obj.xyxy confs = boxes_obj.conf classes = boxes_obj.cls if xyxy is None or confs is None or classes is None: return [] try: xyxy_np = xyxy.cpu().numpy() conf_np = confs.cpu().numpy() cls_np = classes.cpu().numpy() except Exception: xyxy_np = np.asarray(xyxy) conf_np = np.asarray(confs) cls_np = np.asarray(classes) out = [] for i in range(len(xyxy_np)): cls_id = int(round(float(cls_np[i]))) if class_filter is not None and cls_id != int(class_filter): continue x1, y1, x2, y2 = [float(v) for v in xyxy_np[i][:4]] x = int(round(max(0.0, x1))) y = int(round(max(0.0, y1))) w = int(round(max(0.0, x2 - x1))) h = int(round(max(0.0, y2 - y1))) if w < 4 or h < 4: continue out.append( { "x": x, "y": y, "w": w, "h": h, "conf": float(conf_np[i]), "class_id": cls_id, } ) return out except Exception as e: record_error("vision_detector", "run_yolo_ultralytics", e) return [] @staticmethod def _prepare_output_rows(output) -> np.ndarray: if isinstance(output, (list, tuple)): chunks = [] for out in output: rows = VisionDetector._prepare_output_rows(out) if rows.size > 0: chunks.append(rows) if not chunks: return np.empty((0, 0), dtype=np.float32) try: return np.concatenate(chunks, axis=0) except Exception: return chunks[0] arr = np.asarray(output) if arr.size == 0: return np.empty((0, 0), dtype=np.float32) if arr.ndim >= 4: arr = arr.reshape(arr.shape[0], -1, arr.shape[-1]) if arr.ndim == 3: if arr.shape[0] == 1: arr = arr[0] else: arr = arr.reshape(-1, arr.shape[-1]) elif arr.ndim == 2: pass elif arr.ndim == 1: arr = arr.reshape(1, -1) else: arr = arr.reshape(-1, 1) if arr.ndim != 2: return np.empty((0, 0), dtype=np.float32) # Typical YOLO ONNX output can be [84, 8400], transpose to [8400, 84]. if arr.shape[0] < arr.shape[1] and arr.shape[1] > 64 and arr.shape[0] <= 256: arr = arr.T return arr.astype(np.float32, copy=False) @staticmethod def _decode_conf_and_class(row: np.ndarray) -> tuple[float, int] | None: n = int(row.shape[0]) if n < 5: return None # End-to-end format: [x1, y1, x2, y2, conf, class_id] if ( n == 6 and row[2] > row[0] and row[3] > row[1] and (row[2] > 2.0 or row[3] > 2.0 or row[0] < 0.0 or row[1] < 0.0) and 0.0 <= float(row[4]) <= 1.0 and abs(float(row[5]) - round(float(row[5]))) <= 1e-3 ): return float(row[4]), int(round(float(row[5]))) obj = max(0.0, float(row[4])) if n <= 5: return obj, 0 class_scores = row[5:] cls_id = int(np.argmax(class_scores)) cls_score = max(0.0, float(class_scores[cls_id])) if obj <= 1.5: conf = obj * cls_score else: conf = cls_score return float(conf), cls_id @staticmethod def _coords_to_xywh(coords: np.ndarray, frame_w: int, frame_h: int) -> tuple[int, int, int, int] | None: if coords.shape[0] < 4: return None a, b, c, d = (float(coords[0]), float(coords[1]), float(coords[2]), float(coords[3])) max_abs = max(abs(a), abs(b), abs(c), abs(d)) # xyxy-style if c > a and d > b: if max_abs <= 2.0: x1, y1 = a * frame_w, b * frame_h x2, y2 = c * frame_w, d * frame_h else: x1, y1, x2, y2 = a, b, c, d else: # cx, cy, w, h style (YOLO default) if max_abs <= 2.0: cx, cy = a * frame_w, b * frame_h bw, bh = c * frame_w, d * frame_h else: cx, cy = a, b bw, bh = c, d x1 = cx - (bw / 2.0) y1 = cy - (bh / 2.0) x2 = cx + (bw / 2.0) y2 = cy + (bh / 2.0) x1_i = max(0, min(frame_w - 1, int(round(x1)))) y1_i = max(0, min(frame_h - 1, int(round(y1)))) x2_i = max(0, min(frame_w, int(round(x2)))) y2_i = max(0, min(frame_h, int(round(y2)))) w_i = max(0, x2_i - x1_i) h_i = max(0, y2_i - y1_i) if w_i < 4 or h_i < 4: return None return x1_i, y1_i, w_i, h_i def _decode_yolo_output(self, output, frame_w: int, frame_h: int, conf_thresh: float, class_filter: int | None): rows = self._prepare_output_rows(output) if rows.size == 0: return [] candidates = [] rects = [] confs = [] for row in rows: conf_and_class = self._decode_conf_and_class(row) if conf_and_class is None: continue conf, class_id = conf_and_class if conf < conf_thresh: continue if class_filter is not None and int(class_id) != int(class_filter): continue xywh = self._coords_to_xywh(row[:4], frame_w=frame_w, frame_h=frame_h) if xywh is None: continue x, y, w, h = xywh candidates.append({"x": x, "y": y, "w": w, "h": h, "conf": float(conf), "class_id": int(class_id)}) rects.append([int(x), int(y), int(w), int(h)]) confs.append(float(conf)) if not candidates: return [] try: idxs = cv2.dnn.NMSBoxes(rects, confs, conf_thresh, self._nms_iou_thresh) except Exception: idxs = None if idxs is None or len(idxs) == 0: return candidates if isinstance(idxs, np.ndarray): keep = [int(i) for i in idxs.flatten().tolist()] else: keep = [] for i in idxs: if isinstance(i, (list, tuple, np.ndarray)): keep.append(int(i[0])) else: keep.append(int(i)) filtered = [] for idx in keep: if 0 <= idx < len(candidates): filtered.append(candidates[idx]) return filtered def _run_yolo_onnx(self, net, frame, conf_thresh: float, class_filter: int | None = None): if net is None or frame is None: return [] try: blob = cv2.dnn.blobFromImage(frame, 1.0 / 255.0, self._dnn_input_size, swapRB=True, crop=False) net.setInput(blob) output = None try: names = net.getUnconnectedOutLayersNames() if names: output = net.forward(names) except Exception: output = None if output is None: output = net.forward() h, w = frame.shape[:2] return self._decode_yolo_output(output, frame_w=w, frame_h=h, conf_thresh=conf_thresh, class_filter=class_filter) except Exception as e: record_error("vision_detector", "run_yolo_onnx", e) return [] @staticmethod def _bbox_iou(a: tuple[int, int, int, int], b: tuple[int, int, int, int]) -> float: ax, ay, aw, ah = a bx, by, bw, bh = b ax2, ay2 = ax + aw, ay + ah bx2, by2 = bx + bw, by + bh ix1 = max(ax, bx) iy1 = max(ay, by) ix2 = min(ax2, bx2) iy2 = min(ay2, by2) iw = max(0, ix2 - ix1) ih = max(0, iy2 - iy1) inter = float(iw * ih) if inter <= 0.0: return 0.0 union = float(aw * ah + bw * bh) - inter if union <= 0.0: return 0.0 return inter / union def _detect_person_boxes(self, frame, tracker): if self._detector_backend == "yolo" and self._person_net is not None: if self._yolo_runtime == "ultralytics": raw = self._run_yolo_ultralytics( self._person_net, frame, conf_thresh=self._person_score_thresh, class_filter=self._person_class_id, ) else: raw = self._run_yolo_onnx( self._person_net, frame, conf_thresh=self._person_score_thresh, class_filter=self._person_class_id, ) boxes_raw = [(int(d["x"]), int(d["y"]), int(d["w"]), int(d["h"])) for d in raw] assigned = tracker.update(boxes_raw) boxes = [] for (cid, box) in assigned: best_conf = 0.0 for det in raw: dbox = (int(det["x"]), int(det["y"]), int(det["w"]), int(det["h"])) iou = self._bbox_iou(tuple(box), dbox) if iou > 0.5: best_conf = max(best_conf, float(det.get("conf", 0.0))) boxes.append( { "x": int(box[0]), "y": int(box[1]), "w": int(box[2]), "h": int(box[3]), "id": int(cid), "conf": float(best_conf), } ) return boxes if self._detector_backend == "yolo" and self._person_net is None and not self._warned_yolo_unavailable: self._warned_yolo_unavailable = True logger.warning("VisionDetector: YOLO person model unavailable at runtime; using normal detector.") if not self._warned_person_fallback: self._warned_person_fallback = True logger.warning("VisionDetector: person fallback enabled (Haar face boxes used as person proxies).") gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) faces = self._face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=4, minSize=(30, 30)) boxes_raw = [tuple(map(int, f)) for f in faces] assigned = tracker.update(boxes_raw) return [ {"x": int(b[0]), "y": int(b[1]), "w": int(b[2]), "h": int(b[3]), "id": int(i), "conf": 1.0} for (i, b) in assigned ] def _detect_face_boxes(self, frame): if self._detector_backend == "yolo" and self._face_net is not None: if self._yolo_runtime == "ultralytics": raw = self._run_yolo_ultralytics( self._face_net, frame, conf_thresh=self._face_score_thresh, class_filter=None, ) else: raw = self._run_yolo_onnx(self._face_net, frame, conf_thresh=self._face_score_thresh, class_filter=None) return [ { "x": int(d["x"]), "y": int(d["y"]), "w": int(d["w"]), "h": int(d["h"]), "conf": float(d.get("conf", 0.0)), } for d in raw ] gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) faces = self._face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=4, minSize=(30, 30)) return [{"x": int(x), "y": int(y), "w": int(w), "h": int(h), "conf": 1.0} for (x, y, w, h) in faces] def _detect_groups(self, person_boxes): if len(person_boxes) < self._group_min_people: return [] centers = [] for b in person_boxes: cx = float(b.get("x", 0.0)) + (float(b.get("w", 0.0)) / 2.0) cy = float(b.get("y", 0.0)) + (float(b.get("h", 0.0)) / 2.0) centers.append((cx, cy)) n = len(person_boxes) visited = set() components = [] link_dist_sq = float(self._group_link_distance_px) * float(self._group_link_distance_px) for i in range(n): if i in visited: continue stack = [i] comp = [] while stack: cur = stack.pop() if cur in visited: continue visited.add(cur) comp.append(cur) cx, cy = centers[cur] for j in range(n): if j in visited: continue dx = cx - centers[j][0] dy = cy - centers[j][1] if (dx * dx + dy * dy) <= link_dist_sq: stack.append(j) components.append(comp) groups = [] gid = 1 for comp in components: if len(comp) < self._group_min_people: continue xs = [int(person_boxes[k]["x"]) for k in comp] ys = [int(person_boxes[k]["y"]) for k in comp] x2s = [int(person_boxes[k]["x"] + person_boxes[k]["w"]) for k in comp] y2s = [int(person_boxes[k]["y"] + person_boxes[k]["h"]) for k in comp] x1 = min(xs) y1 = min(ys) x2 = max(x2s) y2 = max(y2s) groups.append( { "id": gid, "size": int(len(comp)), "members": [int(person_boxes[k].get("id", 0)) for k in comp], "x": int(x1), "y": int(y1), "w": int(max(0, x2 - x1)), "h": int(max(0, y2 - y1)), } ) gid += 1 groups.sort(key=lambda g: (-int(g.get("size", 0)), int(g.get("id", 0)))) return groups def start(self): if self._thread and self._thread.is_alive(): return self._stop.clear() self._thread = Thread(target=self._run, daemon=True) self._thread.start() def stop(self): self._stop.set() if self._thread: self._thread.join(timeout=1.0) self._close_depth_pipeline() try: if self._video_cap is not None: self._video_cap.release() except Exception as e: record_error("vision_detector", "stop_release_video", e) def latest(self): with self._latest_lock: return dict(self._latest) def set_hard_lock(self, enabled: bool): self._hard_target_lock_enabled = bool(enabled) def lock_target_from_snapshot(self, snapshot: dict | None = None, lock_group: bool = False): s = snapshot or self.latest() self._locked_target_type = "" self._locked_group_id = None self._locked_subject_id = None if lock_group: groups = s.get("group_boxes") or [] if groups: try: g = max( groups, key=lambda x: (int(x.get("size", 0)), float(x.get("w", 0.0)) * float(x.get("h", 0.0))), ) except Exception: g = groups[0] gid = g.get("id") members = g.get("members") or [] try: self._locked_group_id = int(gid) if gid is not None else None except Exception: self._locked_group_id = None try: self._locked_subject_id = int(members[0]) if members else None except Exception: self._locked_subject_id = None if self._locked_group_id is not None: self._locked_target_type = "group" if self._locked_target_type != "group": sid = s.get("subject_id") if sid is None: boxes = s.get("boxes") or [] if boxes: try: sid = max(boxes, key=lambda x: float(x.get("w", 0.0)) * float(x.get("h", 0.0))).get("id") except Exception: sid = boxes[0].get("id") try: self._locked_subject_id = int(sid) if sid is not None else None except Exception: self._locked_subject_id = None if self._locked_subject_id is not None: self._locked_target_type = "subject" self._subject_lost_count = 0 def unlock_target(self): self._locked_target_type = "" self._locked_group_id = None self._locked_subject_id = None self._subject_lost_count = 0 def lock_subject(self, subject_id: int | None = None): if subject_id is None: snap = self.latest() subject_id = snap.get("subject_id") try: self._locked_subject_id = int(subject_id) if subject_id is not None else None except Exception: self._locked_subject_id = None self._locked_group_id = None self._locked_target_type = "subject" if self._locked_subject_id is not None else "" self._subject_lost_count = 0 def lock_subject_from_snapshot(self, snapshot: dict | None = None): s = snapshot or self.latest() sid = s.get("subject_id") if sid is None: boxes = s.get("boxes") or [] if boxes: b = max(boxes, key=lambda x: float(x.get("w", 0.0)) * float(x.get("h", 0.0))) sid = b.get("id") self.lock_subject(sid) def unlock_subject(self): self.unlock_target() def _close_depth_pipeline(self): if self._rs_pipeline is not None: try: self._rs_pipeline.stop() except Exception: pass self._rs_pipeline = None self._rs_profile = None self._depth_ok = False def _ensure_depth_pipeline(self, force: bool = False): if not self._depth_enabled or not _HAS_RS: self._depth_ok = False return False if self._rs_pipeline is not None and not force: self._depth_ok = True return True now = time.time() if (not force) and (now - self._last_rs_restart) < 2.0: return self._rs_pipeline is not None self._last_rs_restart = now self._close_depth_pipeline() try: pipe = rs.pipeline() cfg = rs.config() cfg.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30) self._rs_profile = pipe.start(cfg) self._rs_pipeline = pipe self._depth_ok = True if force: self._depth_restarts += 1 logger.info("VisionDetector: RealSense depth pipeline started") return True except Exception as e: self._rs_pipeline = None self._rs_profile = None self._depth_ok = False logger.warning(f"VisionDetector: depth pipeline unavailable: {e}") return False def _recover_inputs(self): logger.warning("VisionDetector: recovering camera/depth inputs") self._camera_restarts += 1 try: self._camera.release() except Exception as e: record_error("vision_detector", "recover_inputs_release", e) try: self._camera.initialize() except Exception as e: record_error("vision_detector", "recover_inputs_initialize", e) self._ensure_depth_pipeline(force=True) self._no_frame_count = 0 @staticmethod def _coerce_depth_payload(depth_payload, subject_box=None): if depth_payload is None: return None if isinstance(depth_payload, (int, float)): d = float(depth_payload) return d if d > 0.0 else None if isinstance(depth_payload, np.ndarray): arr = depth_payload h, w = arr.shape[:2] if subject_box: x = max(0, int(subject_box.get("x", 0))) y = max(0, int(subject_box.get("y", 0))) bw = max(1, int(subject_box.get("w", 1))) bh = max(1, int(subject_box.get("h", 1))) x2 = min(w, x + bw) y2 = min(h, y + bh) roi = arr[y:y2, x:x2] else: roi = arr if roi.size <= 0: return None vals = roi[roi > 0] if vals.size <= 0: return None med = float(np.median(vals)) if arr.dtype in (np.uint16, np.int16, np.uint32, np.int32): med *= 0.001 # mm -> m return med if med > 0.0 else None return None def _estimate_depth_m(self, subject_box=None, depth_payload=None): d = self._coerce_depth_payload(depth_payload, subject_box=subject_box) if d is not None: self._depth_ok = True return d if not self._ensure_depth_pipeline(force=False): self._depth_ok = False return None try: frames = self._rs_pipeline.poll_for_frames() if not frames: return None depth_frame = frames.get_depth_frame() if not depth_frame: return None w = depth_frame.get_width() h = depth_frame.get_height() if subject_box: cx = int(subject_box.get("x", 0) + (subject_box.get("w", 0) / 2)) cy = int(subject_box.get("y", 0) + (subject_box.get("h", 0) / 2)) else: cx, cy = int(w / 2), int(h / 2) cx = max(0, min(w - 1, cx)) cy = max(0, min(h - 1, cy)) d = float(depth_frame.get_distance(cx, cy)) if d <= 0.0: self._depth_ok = False return None self._depth_ok = True return d except Exception: self._depth_ok = False return None def _extract_frame_and_depth(self): if self._video_cap is not None: try: ret, frame = self._video_cap.read() if not ret: self._video_cap.set(cv2.CAP_PROP_POS_FRAMES, 0) ret, frame = self._video_cap.read() self._camera_ok = bool(ret and frame is not None) return frame, None except Exception: self._camera_ok = False return None, None if direct_camera_client.is_enabled(): try: frame = direct_camera_client.frame_bgr(timeout=1.5) if frame is not None: self._camera_ok = True return frame, None except Exception: pass try: frame = self._camera.capture_frame() self._camera_ok = frame is not None return frame, None except Exception: self._camera_ok = False return None, None def _select_subject(self, boxes, groups=None): groups = groups or [] if not boxes: self._subject_lost_count += 1 if (not self._hard_target_lock_enabled) and self._subject_lost_count > self._subject_lock_lost_frames: self._locked_subject_id = None self._locked_group_id = None self._locked_target_type = "" return None, False self._subject_lost_count = 0 if self._hard_target_lock_enabled and self._locked_group_id is not None: locked_group = None for g in groups: try: if int(g.get("id", -1)) == int(self._locked_group_id): locked_group = g break except Exception: continue if locked_group is None: self._target_switch_blocked_count += 1 return None, False members = set() try: members = {int(x) for x in (locked_group.get("members") or [])} except Exception: members = set() cands = [b for b in boxes if int(b.get("id", -1)) in members] if not cands: self._target_switch_blocked_count += 1 return None, False best = max(cands, key=lambda b: float(b.get("w", 0.0)) * float(b.get("h", 0.0))) try: self._locked_subject_id = int(best.get("id")) except Exception: pass return best, True if self._locked_subject_id is not None: for b in boxes: if int(b.get("id", -1)) == int(self._locked_subject_id): return b, True if self._hard_target_lock_enabled: self._target_switch_blocked_count += 1 return None, False # No lock: select largest target. best = max(boxes, key=lambda b: float(b.get("w", 0.0)) * float(b.get("h", 0.0))) return best, True def _check_interaction_intent(self, boxes, subject_box, depth_m, now): max_area = 0.0 for b in boxes: try: area = max(0.0, float(b.get("w", 0.0))) * max(0.0, float(b.get("h", 0.0))) if area > max_area: max_area = area except Exception: continue self._area_history.append(max_area) area_first = self._area_history[0] if self._area_history else 0.0 is_close_area = max_area >= self._proximity_threshold is_approach_area = len(self._area_history) >= self._history_len and (max_area - area_first) >= self._approach_delta approach_speed = 0.0 is_close_depth = False is_approach_depth = False if depth_m is not None: self._depth_history.append((now, float(depth_m))) if len(self._depth_history) >= 2: t0, d0 = self._depth_history[0] dt = max(1e-3, now - float(t0)) approach_speed = max(0.0, (float(d0) - float(depth_m)) / dt) is_close_depth = float(depth_m) <= self._min_depth_m is_approach_depth = approach_speed >= self._min_approach_speed_mps else: self._depth_history.clear() if depth_m is not None: is_close = is_close_depth is_approaching = is_approach_depth else: is_close = is_close_area is_approaching = is_approach_area subject_visible = subject_box is not None intent_detected = bool(subject_visible and (is_close or is_approaching)) return { "intent_detected": intent_detected, "max_area": float(max_area), "is_close": bool(is_close), "is_approaching": bool(is_approaching), "depth_m": float(depth_m) if depth_m is not None else None, "approach_speed_mps": float(approach_speed), } def _run(self): interval = max(0.01, 1.0 / max(1.0, self.poll_hz)) tracker = CentroidTracker() while not self._stop.is_set(): t0 = time.time() now = time.time() self._refresh_detector_backend(now) frame, depth_payload = self._extract_frame_and_depth() if frame is None: self._no_frame_count += 1 if self._no_frame_count >= self._recover_after_no_frame: self._recover_inputs() intent = self._check_interaction_intent([], None, None, now) with self._latest_lock: self._latest.update( { "frame_time": now, "detector_backend": self._effective_backend(), "yolo_runtime": self._yolo_runtime, "face_count": 0, "person_count": 0, "group_count": 0, "group_size": 0, "group_detected": False, "boxes": [], "face_boxes": [], "group_boxes": [], "frame": None, "subject_id": self._locked_subject_id, "subject_box": None, "subject_visible": False, "subject_locked": self._locked_subject_id is not None, "target_lock_active": bool(self._locked_subject_id is not None or self._locked_group_id is not None), "target_lock_type": str(self._locked_target_type or ""), "target_lock_id": self._locked_group_id if self._locked_group_id is not None else self._locked_subject_id, "target_switch_blocked_count": int(self._target_switch_blocked_count), "camera_ok": bool(self._camera_ok), "depth_ok": bool(self._depth_ok), "camera_restarts": int(self._camera_restarts), "depth_restarts": int(self._depth_restarts), **intent, } ) time.sleep(interval) continue self._no_frame_count = 0 try: person_boxes = self._detect_person_boxes(frame, tracker) face_boxes = self._detect_face_boxes(frame) groups = self._detect_groups(person_boxes) group_size = max((int(g.get("size", 0)) for g in groups), default=0) subject_box, subject_visible = self._select_subject(person_boxes, groups=groups) depth_m = self._estimate_depth_m(subject_box=subject_box, depth_payload=depth_payload) intent = self._check_interaction_intent(person_boxes, subject_box, depth_m, now) with self._latest_lock: self._latest.update( { "frame_time": now, "detector_backend": self._effective_backend(), "yolo_runtime": self._yolo_runtime, "face_count": len(face_boxes), "person_count": len(person_boxes), "group_count": len(groups), "group_size": int(group_size), "group_detected": bool(group_size >= self._group_min_people), "boxes": person_boxes, "face_boxes": face_boxes, "group_boxes": groups, "frame": frame.copy() if frame is not None else None, "subject_id": subject_box.get("id") if subject_box else self._locked_subject_id, "subject_box": dict(subject_box) if isinstance(subject_box, dict) else None, "subject_visible": bool(subject_visible), "subject_locked": self._locked_subject_id is not None, "target_lock_active": bool(self._locked_subject_id is not None or self._locked_group_id is not None), "target_lock_type": str(self._locked_target_type or ""), "target_lock_id": self._locked_group_id if self._locked_group_id is not None else self._locked_subject_id, "target_switch_blocked_count": int(self._target_switch_blocked_count), "camera_ok": bool(self._camera_ok), "depth_ok": bool(self._depth_ok), "camera_restarts": int(self._camera_restarts), "depth_restarts": int(self._depth_restarts), **intent, } ) except Exception as e: record_error("vision_detector", "run_loop_detect", e) logger.warning(f"VisionDetector: detect failed: {e}") intent = self._check_interaction_intent([], None, None, now) with self._latest_lock: self._latest.update( { "frame_time": now, "detector_backend": self._effective_backend(), "yolo_runtime": self._yolo_runtime, "face_count": 0, "person_count": 0, "group_count": 0, "group_size": 0, "group_detected": False, "boxes": [], "face_boxes": [], "group_boxes": [], "frame": frame.copy() if frame is not None else None, "subject_id": self._locked_subject_id, "subject_box": None, "subject_visible": False, "subject_locked": self._locked_subject_id is not None, "target_lock_active": bool(self._locked_subject_id is not None or self._locked_group_id is not None), "target_lock_type": str(self._locked_target_type or ""), "target_lock_id": self._locked_group_id if self._locked_group_id is not None else self._locked_subject_id, "target_switch_blocked_count": int(self._target_switch_blocked_count), "camera_ok": bool(self._camera_ok), "depth_ok": bool(self._depth_ok), "camera_restarts": int(self._camera_restarts), "depth_restarts": int(self._depth_restarts), **intent, } ) dt = time.time() - t0 time.sleep(max(0.0, interval - dt)) class CentroidTracker: """Small centroid tracker assigning incremental IDs to objects.""" def __init__(self, max_disappeared: int = 10, max_distance: int = 80): self.next_id = 1 self.objects = {} # id -> centroid self.disappeared = {} # id -> frames disappeared self.max_disappeared = max_disappeared self.max_distance = max_distance def _centroid(self, box): x, y, w, h = box return (int(x + w / 2), int(y + h / 2)) def register(self, box): cid = self.next_id self.next_id += 1 cent = self._centroid(box) self.objects[cid] = cent self.disappeared[cid] = 0 return cid def deregister(self, cid): del self.objects[cid] del self.disappeared[cid] def update(self, boxes): """boxes: list of (x, y, w, h), returns list of (id, box).""" if len(boxes) == 0: remove = [] for cid in list(self.disappeared.keys()): self.disappeared[cid] += 1 if self.disappeared[cid] > self.max_disappeared: remove.append(cid) for cid in remove: self.deregister(cid) return [] input_centroids = [self._centroid(b) for b in boxes] if len(self.objects) == 0: assigned = [] for b in boxes: cid = self.register(b) assigned.append((cid, b)) return assigned object_ids = list(self.objects.keys()) object_centroids = list(self.objects.values()) D = np.linalg.norm(np.array(object_centroids)[:, None, :] - np.array(input_centroids)[None, :, :], axis=2) rows = D.min(axis=1).argsort() cols = D.argmin(axis=1)[rows] used_rows = set() used_cols = set() assigned = [] for r, c in zip(rows, cols): if r in used_rows or c in used_cols: continue if D[r, c] > self.max_distance: continue cid = object_ids[r] self.objects[cid] = input_centroids[c] self.disappeared[cid] = 0 assigned.append((cid, boxes[c])) used_rows.add(r) used_cols.add(c) for i, b in enumerate(boxes): if i in used_cols: continue cid = self.register(b) assigned.append((cid, b)) for i, cid in enumerate(object_ids): if i in used_rows: continue self.disappeared[cid] += 1 if self.disappeared[cid] > self.max_disappeared: self.deregister(cid) return assigned if __name__ == "__main__": logging.basicConfig(level=logging.INFO) v = VisionDetector() v.start() try: for _ in range(30): s = v.latest() logger.info( "people=%s faces=%s groups=%s intent=%s time=%s", s.get("person_count", 0), s.get("face_count", 0), s.get("group_count", 0), s.get("intent_detected", False), s.get("frame_time", 0.0), ) time.sleep(0.5) finally: v.stop()