# AI_Photographer/Modes/AI/vision_detector.py
# Snapshot metadata: 2026-04-12 18:52:37 +04:00 — 1481 lines, 57 KiB, Python.
"""vision_detector.py
YOLO detector + tracker + intent:
- YOLO ONNX person detector for subject tracking/intent.
- YOLO ONNX face detector (optional) for face metrics.
- Group detection from person clusters.
- Intent uses RealSense depth first; bbox area fallback.
"""
from collections import deque
from threading import Event, Lock, Thread
import logging
import os
import time
from pathlib import Path
import cv2
import numpy as np
from Modes.AI.camera_module import CameraCapture
from Core import settings as config
from Core.error_events import record_error
from Server import direct_camera_client
try:
import pyrealsense2 as rs # type: ignore
_HAS_RS = True
except Exception:
_HAS_RS = False
try:
from ultralytics import YOLO # type: ignore
_HAS_ULTRALYTICS = True
except Exception:
YOLO = None # type: ignore
_HAS_ULTRALYTICS = False
logger = logging.getLogger(__name__)
def _coerce_backend_name(value: str | None) -> str:
v = str(value or "").strip().lower()
if v in ("yolo", "normal"):
return v
return "normal"
def _coerce_yolo_runtime(value: str | None) -> str:
v = str(value or "").strip().lower()
if v in ("ultralytics", "opencv"):
return v
return "ultralytics"
def _resolve_model_path(path_value: str | None) -> Path | None:
if not path_value:
return None
p = Path(str(path_value)).expanduser()
if not p.is_absolute():
p = (config.PROJECT_ROOT / p).resolve()
return p
def _probe_onnx_model(path_value: str | None) -> dict:
    """Check whether an ONNX model file exists and loads under cv2.dnn.

    Returns {"path", "exists", "loadable", "error"}; never raises.
    """
    result = {"path": "", "exists": False, "loadable": False, "error": ""}
    resolved = _resolve_model_path(path_value)
    if resolved is None:
        return result
    result["path"] = str(resolved)
    if not (resolved.exists() and resolved.is_file()):
        result["error"] = "model file missing"
        return result
    result["exists"] = True
    try:
        cv2.dnn.readNetFromONNX(str(resolved))
    except Exception as exc:
        result["error"] = str(exc)
    else:
        result["loadable"] = True
    return result
def _probe_ultralytics_model(path_value: str | None) -> dict:
    """Check whether a model file exists and loads via the Ultralytics YOLO API.

    Returns {"path", "exists", "loadable", "error"}; never raises.
    """
    result = {"path": "", "exists": False, "loadable": False, "error": ""}
    resolved = _resolve_model_path(path_value)
    if resolved is None:
        return result
    result["path"] = str(resolved)
    if not (resolved.exists() and resolved.is_file()):
        result["error"] = "model file missing"
        return result
    result["exists"] = True
    if not _HAS_ULTRALYTICS:
        result["error"] = "ultralytics not installed"
        return result
    try:
        YOLO(str(resolved))
    except Exception as exc:
        result["error"] = str(exc)
    else:
        result["loadable"] = True
    return result
def probe_ai_readiness(backend: str | None = None, strict_required: bool | None = None) -> dict:
    """Probe AI readiness from config/env/model files without a detector instance.

    Mirrors VisionDetector._build_readiness but verifies the model files on
    disk. Returns a readiness dict whose "ok" flag is False only when strict
    mode blocks startup; "block_reason" then explains why.
    """
    cfg_backend = _coerce_backend_name(
        config.read_vision_detector_backend() if backend is None else backend
    )
    runtime = _coerce_yolo_runtime(config.read_vision_yolo_runtime())
    if strict_required is None:
        strict = bool(config.read_vision_yolo_strict_required())
    else:
        strict = bool(strict_required)
    # Environment variables take precedence over settings for model paths.
    person_path = (
        os.environ.get("DETECTOR_PERSON_YOLO_ONNX", "").strip()
        or os.environ.get("DETECTOR_ONNX_PATH", "").strip()
        or str(getattr(config, "VISION_PERSON_YOLO_ONNX", "")).strip()
    )
    face_path = (
        os.environ.get("DETECTOR_FACE_YOLO_ONNX", "").strip()
        or str(getattr(config, "VISION_FACE_YOLO_ONNX", "")).strip()
    )
    probe = _probe_ultralytics_model if runtime == "ultralytics" else _probe_onnx_model
    person = probe(person_path)
    face = probe(face_path)
    person_ok = bool(person.get("loadable"))
    face_ok = bool(face.get("loadable"))
    yolo_loaded = bool(cfg_backend == "yolo" and person_ok)
    block_reason = ""
    if strict:
        if cfg_backend != "yolo":
            block_reason = "Strict AI mode requires detection_backend=yolo."
        elif runtime == "ultralytics" and not _HAS_ULTRALYTICS:
            block_reason = "Ultralytics runtime selected but package is not installed."
        elif not person_ok:
            block_reason = "Person YOLO model is missing/invalid. Fix vision.person_yolo_onnx."
    return {
        "ok": bool(not block_reason),
        "strict_required": bool(strict),
        "configured_backend": cfg_backend,
        "yolo_runtime": runtime,
        "effective_backend": "yolo" if yolo_loaded else "normal",
        "yolo_loaded": bool(yolo_loaded),
        "person_model_ok": bool(person_ok),
        "face_model_ok": bool(face_ok),
        "person_model_path": str(person.get("path") or ""),
        "face_model_path": str(face.get("path") or ""),
        "person_model_error": str(person.get("error") or ""),
        "face_model_error": str(face.get("error") or ""),
        "block_reason": block_reason,
    }
class VisionDetector:
def __init__(self, zmq_host="127.0.0.1", zmq_port=55555, poll_hz=10, video_source: str | None = None):
    """Initialize detector state, tunables, backend, and input devices.

    Args:
        zmq_host: host of the ZMQ endpoint associated with this detector.
        zmq_port: port of that endpoint.
        poll_hz: target rate (Hz) of the background processing loop.
        video_source: optional video file/URL; when set and openable,
            frames are read from it instead of the camera (simulation).
    """
    self.zmq_host = zmq_host
    self.zmq_port = int(zmq_port)
    self.poll_hz = float(poll_hz)
    self.video_source = video_source
    self._stop = Event()  # signals the worker thread to exit
    self._thread = None  # background worker (see start()/stop())
    self._latest_lock = Lock()  # guards the _latest snapshot dict
    # Most recent detection snapshot; latest() returns a shallow copy of it.
    self._latest = {
        "frame_time": 0.0,
        "detector_backend": "normal",
        "yolo_runtime": "ultralytics",
        "face_count": 0,
        "person_count": 0,
        "group_count": 0,
        "group_size": 0,
        "group_detected": False,
        "boxes": [],
        "face_boxes": [],
        "group_boxes": [],
        "frame": None,
        "intent_detected": False,
        "max_area": 0.0,
        "is_close": False,
        "is_approaching": False,
        "depth_m": None,
        "approach_speed_mps": 0.0,
        "subject_id": None,
        "subject_box": None,
        "subject_visible": False,
        "subject_locked": False,
        "target_lock_active": False,
        "target_lock_type": "",
        "target_lock_id": None,
        "target_switch_blocked_count": 0,
        "camera_ok": False,
        "depth_ok": False,
        "camera_restarts": 0,
        "depth_restarts": 0,
    }
    # Runtime-selectable detector backend (env var overrides settings).
    self._detector_backend = self._coerce_backend(
        os.environ.get("DETECTOR_BACKEND", "").strip() or config.read_vision_detector_backend()
    )
    self._last_backend_check_ts = 0.0
    # YOLO detection config
    self._yolo_runtime = _coerce_yolo_runtime(
        os.environ.get("DETECTOR_YOLO_RUNTIME", "").strip() or config.read_vision_yolo_runtime()
    )
    self._ultralytics_device = (
        os.environ.get("DETECTOR_YOLO_DEVICE", "").strip() or config.read_vision_yolo_ultralytics_device()
    )
    # Model paths: env vars take precedence over settings; resolved to
    # absolute path strings ("" when unset).
    self._person_model_path = str(
        _resolve_model_path(
            os.environ.get("DETECTOR_PERSON_YOLO_ONNX", "").strip()
            or os.environ.get("DETECTOR_ONNX_PATH", "").strip()
            or str(getattr(config, "VISION_PERSON_YOLO_ONNX", "")).strip()
            or ""
        )
        or ""
    ).strip()
    self._face_model_path = str(
        _resolve_model_path(
            os.environ.get("DETECTOR_FACE_YOLO_ONNX", "").strip()
            or str(getattr(config, "VISION_FACE_YOLO_ONNX", "")).strip()
            or ""
        )
        or ""
    ).strip()
    # Inference tunables: square input size, score/NMS thresholds, class id.
    input_size = int(max(320, getattr(config, "VISION_INPUT_SIZE", 640)))
    self._dnn_input_size = (input_size, input_size)
    self._person_score_thresh = float(getattr(config, "VISION_PERSON_SCORE_THRESH", 0.35))
    self._face_score_thresh = float(getattr(config, "VISION_FACE_SCORE_THRESH", 0.35))
    self._nms_iou_thresh = float(getattr(config, "VISION_NMS_IOU_THRESH", 0.45))
    self._person_class_id = int(getattr(config, "VISION_PERSON_CLASS_ID", 0))
    # Group clustering parameters (see _detect_groups).
    self._group_min_people = max(2, int(getattr(config, "VISION_GROUP_MIN_PEOPLE", 3)))
    self._group_link_distance_px = float(getattr(config, "VISION_GROUP_LINK_DISTANCE_PX", 220.0))
    self._person_net = None
    self._face_net = None
    # One-shot warning flags so fallback messages do not spam the log.
    self._warned_person_fallback = False
    self._warned_yolo_unavailable = False
    self._warned_ultralytics_missing = False
    self._last_model_retry_ts = 0.0
    self._model_retry_sec = float(os.environ.get("YOLO_MODEL_RETRY_SEC", "2.0"))
    # Area-based fallback intent
    self._proximity_threshold = float(os.environ.get("PROXIMITY_THRESHOLD", "50000"))
    self._approach_delta = float(os.environ.get("APPROACH_DELTA", "12000"))
    self._history_len = max(2, int(os.environ.get("INTENT_HISTORY_LEN", "5")))
    self._area_history = deque(maxlen=self._history_len)
    # Depth-first intent
    self._min_depth_m = float(os.environ.get("MIN_DEPTH_M", "1.7"))
    self._min_approach_speed_mps = float(os.environ.get("MIN_APPROACH_SPEED_MPS", "0.08"))
    self._depth_history_len = max(2, int(os.environ.get("DEPTH_HISTORY_LEN", "6")))
    self._depth_history = deque(maxlen=self._depth_history_len)
    # Subject lock
    self._hard_target_lock_enabled = bool(config.read_vision_hard_target_lock_enabled())
    self._locked_target_type = ""  # "", "subject", or "group"
    self._locked_subject_id = None
    self._locked_group_id = None
    self._target_switch_blocked_count = 0
    self._subject_lock_lost_frames = max(1, int(os.environ.get("SUBJECT_LOCK_LOST_FRAMES", "20")))
    self._subject_lost_count = 0
    # Camera/depth watchdog
    self._no_frame_count = 0
    self._recover_after_no_frame = max(3, int(os.environ.get("CAMERA_RECOVER_AFTER_FRAMES", "10")))
    self._camera_ok = False
    self._depth_ok = False
    self._camera_restarts = 0
    self._depth_restarts = 0
    self._depth_enabled = os.environ.get("DETECTOR_USE_REALSENSE_DEPTH", "1").strip().lower() not in (
        "0",
        "false",
        "no",
        "off",
    )
    self._rs_pipeline = None
    self._rs_profile = None
    self._last_rs_restart = 0.0
    # Apply the configured backend now (loads YOLO models when selected).
    self._apply_detector_backend(self._detector_backend, reason="init")
    self._latest["detector_backend"] = self._effective_backend()
    self._latest["yolo_runtime"] = self._yolo_runtime
    # Haar cascade is the always-available fallback face detector.
    self._face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + "haarcascade_frontalface_default.xml")
    self._camera = CameraCapture()
    try:
        self._camera.initialize()
    except Exception:
        logger.warning("VisionDetector: could not init CameraCapture fallback")
    self._video_cap = None
    if self.video_source:
        try:
            self._video_cap = cv2.VideoCapture(self.video_source)
            if not self._video_cap.isOpened():
                logger.warning(f"VisionDetector: could not open video source {self.video_source}")
                self._video_cap = None
            else:
                logger.info(f"VisionDetector: using video source {self.video_source} for simulation")
        except Exception as e:
            logger.warning(f"VisionDetector: video source init failed: {e}")
    # Best-effort depth startup; failures are retried later by the loop.
    self._ensure_depth_pipeline(force=False)
@staticmethod
def _coerce_backend(value: str | None) -> str:
    """Normalize a backend name (delegates to module-level _coerce_backend_name)."""
    return _coerce_backend_name(value)
def _effective_backend(self) -> str:
if self._detector_backend == "yolo" and self._person_net is not None:
return "yolo"
return "normal"
def _build_readiness(self, strict_required: bool | None = None) -> dict:
strict = bool(config.read_vision_yolo_strict_required() if strict_required is None else strict_required)
configured_backend = self._coerce_backend(self._detector_backend)
yolo_runtime = _coerce_yolo_runtime(self._yolo_runtime)
if yolo_runtime == "ultralytics":
person_model_ok = bool(self._person_net is not None and hasattr(self._person_net, "predict"))
face_model_ok = bool(self._face_net is not None and hasattr(self._face_net, "predict")) if self._face_model_path else False
else:
person_model_ok = bool(self._person_net is not None and hasattr(self._person_net, "setInput"))
face_model_ok = bool(self._face_net is not None and hasattr(self._face_net, "setInput")) if self._face_model_path else False
yolo_loaded = bool(configured_backend == "yolo" and person_model_ok)
block_reason = ""
if strict and configured_backend != "yolo":
block_reason = "Strict AI mode requires detection_backend=yolo."
elif strict and yolo_runtime == "ultralytics" and not _HAS_ULTRALYTICS:
block_reason = "Ultralytics runtime selected but package is not installed."
elif strict and not person_model_ok:
block_reason = "Person YOLO model is missing/invalid. Fix vision.person_yolo_onnx."
return {
"ok": not bool(block_reason),
"strict_required": bool(strict),
"configured_backend": configured_backend,
"yolo_runtime": yolo_runtime,
"effective_backend": self._effective_backend(),
"yolo_loaded": bool(yolo_loaded),
"person_model_ok": bool(person_model_ok),
"face_model_ok": bool(face_model_ok),
"person_model_path": str(self._person_model_path or ""),
"face_model_path": str(self._face_model_path or ""),
"person_model_error": "" if person_model_ok else "model not loaded",
"face_model_error": "" if (face_model_ok or (not self._face_model_path)) else "model not loaded",
"block_reason": block_reason,
}
def readiness(self, strict_required: bool | None = None) -> dict:
    """Public accessor for the current readiness summary (see _build_readiness)."""
    return self._build_readiness(strict_required=strict_required)
@classmethod
def probe_readiness(cls, backend: str | None = None, strict_required: bool | None = None) -> dict:
    """Class-level probe that inspects config/model files without an instance."""
    return probe_ai_readiness(backend=backend, strict_required=strict_required)
def _maybe_retry_yolo_models(self, now_ts: float):
if self._detector_backend != "yolo":
return
if self._person_net is not None and (self._face_net is not None or not self._face_model_path):
return
if (now_ts - self._last_model_retry_ts) < max(0.5, self._model_retry_sec):
return
self._last_model_retry_ts = now_ts
if self._yolo_runtime == "ultralytics":
if self._person_net is None:
self._person_net = self._load_ultralytics_model(self._person_model_path, label="person")
if self._face_net is None and self._face_model_path:
self._face_net = self._load_ultralytics_model(self._face_model_path, label="face")
else:
if self._person_net is None:
self._person_net = self._load_onnx_model(self._person_model_path, label="person")
if self._face_net is None and self._face_model_path:
self._face_net = self._load_onnx_model(self._face_model_path, label="face")
def _apply_detector_backend(self, backend: str, reason: str = ""):
    """Switch to the given backend ("yolo"/"normal") and (re)load models.

    Re-reads the YOLO runtime from settings and env (env wins); a runtime
    change drops any loaded models so they reload under the new runtime.
    For "normal", models are cleared so only the Haar fallback is used.

    Args:
        backend: requested backend name; coerced to "yolo" or "normal".
        reason: free-text note appended to the log line (e.g. "init").
    """
    backend = self._coerce_backend(backend)
    prev_runtime = self._yolo_runtime
    self._detector_backend = backend
    # Reset one-shot warnings so the new backend can report its own issues.
    self._warned_person_fallback = False
    self._warned_yolo_unavailable = False
    self._warned_ultralytics_missing = False
    try:
        self._yolo_runtime = _coerce_yolo_runtime(config.read_vision_yolo_runtime())
    except Exception:
        self._yolo_runtime = _coerce_yolo_runtime(self._yolo_runtime)
    # Environment override takes precedence over the settings value.
    self._yolo_runtime = _coerce_yolo_runtime(
        os.environ.get("DETECTOR_YOLO_RUNTIME", "").strip() or self._yolo_runtime
    )
    if self._yolo_runtime != prev_runtime:
        # Models loaded under the old runtime are unusable with the new one.
        self._person_net = None
        self._face_net = None
    if backend == "yolo":
        if self._yolo_runtime == "ultralytics":
            if self._person_net is None:
                self._person_net = self._load_ultralytics_model(self._person_model_path, label="person")
            if self._face_net is None and self._face_model_path:
                self._face_net = self._load_ultralytics_model(self._face_model_path, label="face")
        else:
            if self._person_net is None:
                self._person_net = self._load_onnx_model(self._person_model_path, label="person")
            if self._face_net is None and self._face_model_path:
                self._face_net = self._load_onnx_model(self._face_model_path, label="face")
        if self._person_net is None:
            logger.warning(
                f"VisionDetector: YOLO backend selected ({self._yolo_runtime}) but person model unavailable. Falling back to normal."
            )
        elif self._face_net is None and self._face_model_path:
            logger.info("VisionDetector: YOLO person active, face model missing -> Haar face fallback.")
    else:
        # Force non-AI normal mode.
        self._person_net = None
        self._face_net = None
    suffix = f" ({reason})" if reason else ""
    logger.info(f"VisionDetector: detector backend set to {self._effective_backend()} [runtime={self._yolo_runtime}]{suffix}")
def _refresh_detector_backend(self, now_ts: float):
if (now_ts - self._last_backend_check_ts) < 1.0:
self._maybe_retry_yolo_models(now_ts)
return
self._last_backend_check_ts = now_ts
try:
self._hard_target_lock_enabled = bool(config.read_vision_hard_target_lock_enabled())
except Exception:
pass
try:
cfg_backend = self._coerce_backend(config.read_vision_detector_backend())
except Exception:
cfg_backend = self._detector_backend
try:
cfg_runtime = _coerce_yolo_runtime(config.read_vision_yolo_runtime())
except Exception:
cfg_runtime = self._yolo_runtime
if cfg_backend != self._detector_backend or cfg_runtime != self._yolo_runtime:
self._apply_detector_backend(cfg_backend, reason="runtime switch")
else:
self._maybe_retry_yolo_models(now_ts)
@staticmethod
def _load_onnx_model(path: str | None, label: str):
if not path:
return None
try:
net = cv2.dnn.readNetFromONNX(str(path))
net.setPreferableBackend(cv2.dnn.DNN_BACKEND_DEFAULT)
net.setPreferableTarget(cv2.dnn.DNN_TARGET_CPU)
logger.info(f"VisionDetector: loaded {label} YOLO ONNX model: {path}")
return net
except Exception as e:
logger.warning(f"VisionDetector: failed to load {label} model {path}: {e}")
return None
def _load_ultralytics_model(self, path: str | None, label: str):
if not path:
return None
if not _HAS_ULTRALYTICS:
if not self._warned_ultralytics_missing:
self._warned_ultralytics_missing = True
logger.warning("VisionDetector: ultralytics package not installed; cannot use ultralytics runtime.")
return None
try:
model = YOLO(str(path))
logger.info(f"VisionDetector: loaded {label} Ultralytics model: {path}")
return model
except Exception as e:
logger.warning(f"VisionDetector: failed to load {label} ultralytics model {path}: {e}")
return None
def _run_yolo_ultralytics(self, model, frame, conf_thresh: float, class_filter: int | None = None):
if model is None or frame is None:
return []
try:
results = model.predict(
source=frame,
conf=float(conf_thresh),
iou=float(self._nms_iou_thresh),
imgsz=int(self._dnn_input_size[0]),
device=str(self._ultralytics_device),
verbose=False,
)
if not results:
return []
res = results[0]
boxes_obj = getattr(res, "boxes", None)
if boxes_obj is None:
return []
xyxy = boxes_obj.xyxy
confs = boxes_obj.conf
classes = boxes_obj.cls
if xyxy is None or confs is None or classes is None:
return []
try:
xyxy_np = xyxy.cpu().numpy()
conf_np = confs.cpu().numpy()
cls_np = classes.cpu().numpy()
except Exception:
xyxy_np = np.asarray(xyxy)
conf_np = np.asarray(confs)
cls_np = np.asarray(classes)
out = []
for i in range(len(xyxy_np)):
cls_id = int(round(float(cls_np[i])))
if class_filter is not None and cls_id != int(class_filter):
continue
x1, y1, x2, y2 = [float(v) for v in xyxy_np[i][:4]]
x = int(round(max(0.0, x1)))
y = int(round(max(0.0, y1)))
w = int(round(max(0.0, x2 - x1)))
h = int(round(max(0.0, y2 - y1)))
if w < 4 or h < 4:
continue
out.append(
{
"x": x,
"y": y,
"w": w,
"h": h,
"conf": float(conf_np[i]),
"class_id": cls_id,
}
)
return out
except Exception as e:
record_error("vision_detector", "run_yolo_ultralytics", e)
return []
@staticmethod
def _prepare_output_rows(output) -> np.ndarray:
if isinstance(output, (list, tuple)):
chunks = []
for out in output:
rows = VisionDetector._prepare_output_rows(out)
if rows.size > 0:
chunks.append(rows)
if not chunks:
return np.empty((0, 0), dtype=np.float32)
try:
return np.concatenate(chunks, axis=0)
except Exception:
return chunks[0]
arr = np.asarray(output)
if arr.size == 0:
return np.empty((0, 0), dtype=np.float32)
if arr.ndim >= 4:
arr = arr.reshape(arr.shape[0], -1, arr.shape[-1])
if arr.ndim == 3:
if arr.shape[0] == 1:
arr = arr[0]
else:
arr = arr.reshape(-1, arr.shape[-1])
elif arr.ndim == 2:
pass
elif arr.ndim == 1:
arr = arr.reshape(1, -1)
else:
arr = arr.reshape(-1, 1)
if arr.ndim != 2:
return np.empty((0, 0), dtype=np.float32)
# Typical YOLO ONNX output can be [84, 8400], transpose to [8400, 84].
if arr.shape[0] < arr.shape[1] and arr.shape[1] > 64 and arr.shape[0] <= 256:
arr = arr.T
return arr.astype(np.float32, copy=False)
@staticmethod
def _decode_conf_and_class(row: np.ndarray) -> tuple[float, int] | None:
n = int(row.shape[0])
if n < 5:
return None
# End-to-end format: [x1, y1, x2, y2, conf, class_id]
if (
n == 6
and row[2] > row[0]
and row[3] > row[1]
and (row[2] > 2.0 or row[3] > 2.0 or row[0] < 0.0 or row[1] < 0.0)
and 0.0 <= float(row[4]) <= 1.0
and abs(float(row[5]) - round(float(row[5]))) <= 1e-3
):
return float(row[4]), int(round(float(row[5])))
obj = max(0.0, float(row[4]))
if n <= 5:
return obj, 0
class_scores = row[5:]
cls_id = int(np.argmax(class_scores))
cls_score = max(0.0, float(class_scores[cls_id]))
if obj <= 1.5:
conf = obj * cls_score
else:
conf = cls_score
return float(conf), cls_id
@staticmethod
def _coords_to_xywh(coords: np.ndarray, frame_w: int, frame_h: int) -> tuple[int, int, int, int] | None:
if coords.shape[0] < 4:
return None
a, b, c, d = (float(coords[0]), float(coords[1]), float(coords[2]), float(coords[3]))
max_abs = max(abs(a), abs(b), abs(c), abs(d))
# xyxy-style
if c > a and d > b:
if max_abs <= 2.0:
x1, y1 = a * frame_w, b * frame_h
x2, y2 = c * frame_w, d * frame_h
else:
x1, y1, x2, y2 = a, b, c, d
else:
# cx, cy, w, h style (YOLO default)
if max_abs <= 2.0:
cx, cy = a * frame_w, b * frame_h
bw, bh = c * frame_w, d * frame_h
else:
cx, cy = a, b
bw, bh = c, d
x1 = cx - (bw / 2.0)
y1 = cy - (bh / 2.0)
x2 = cx + (bw / 2.0)
y2 = cy + (bh / 2.0)
x1_i = max(0, min(frame_w - 1, int(round(x1))))
y1_i = max(0, min(frame_h - 1, int(round(y1))))
x2_i = max(0, min(frame_w, int(round(x2))))
y2_i = max(0, min(frame_h, int(round(y2))))
w_i = max(0, x2_i - x1_i)
h_i = max(0, y2_i - y1_i)
if w_i < 4 or h_i < 4:
return None
return x1_i, y1_i, w_i, h_i
def _decode_yolo_output(self, output, frame_w: int, frame_h: int, conf_thresh: float, class_filter: int | None):
    """Decode raw ONNX output rows into box dicts, then apply NMS.

    Returns [{"x","y","w","h","conf","class_id"}]. If NMS is unavailable
    or yields nothing, the un-suppressed candidates are returned as-is.
    """
    rows = self._prepare_output_rows(output)
    if rows.size == 0:
        return []
    candidates = []
    rects = []
    scores = []
    for row in rows:
        decoded = self._decode_conf_and_class(row)
        if decoded is None:
            continue
        conf, class_id = decoded
        if conf < conf_thresh:
            continue
        if class_filter is not None and int(class_id) != int(class_filter):
            continue
        box = self._coords_to_xywh(row[:4], frame_w=frame_w, frame_h=frame_h)
        if box is None:
            continue
        x, y, w, h = box
        candidates.append({"x": x, "y": y, "w": w, "h": h, "conf": float(conf), "class_id": int(class_id)})
        rects.append([int(x), int(y), int(w), int(h)])
        scores.append(float(conf))
    if not candidates:
        return []
    try:
        keep_idxs = cv2.dnn.NMSBoxes(rects, scores, conf_thresh, self._nms_iou_thresh)
    except Exception:
        keep_idxs = None
    if keep_idxs is None or len(keep_idxs) == 0:
        return candidates
    if isinstance(keep_idxs, np.ndarray):
        keep = [int(i) for i in keep_idxs.flatten().tolist()]
    else:
        # Older OpenCV versions wrap each index in a one-element container.
        keep = [int(i[0]) if isinstance(i, (list, tuple, np.ndarray)) else int(i) for i in keep_idxs]
    return [candidates[i] for i in keep if 0 <= i < len(candidates)]
def _run_yolo_onnx(self, net, frame, conf_thresh: float, class_filter: int | None = None):
if net is None or frame is None:
return []
try:
blob = cv2.dnn.blobFromImage(frame, 1.0 / 255.0, self._dnn_input_size, swapRB=True, crop=False)
net.setInput(blob)
output = None
try:
names = net.getUnconnectedOutLayersNames()
if names:
output = net.forward(names)
except Exception:
output = None
if output is None:
output = net.forward()
h, w = frame.shape[:2]
return self._decode_yolo_output(output, frame_w=w, frame_h=h, conf_thresh=conf_thresh, class_filter=class_filter)
except Exception as e:
record_error("vision_detector", "run_yolo_onnx", e)
return []
@staticmethod
def _bbox_iou(a: tuple[int, int, int, int], b: tuple[int, int, int, int]) -> float:
ax, ay, aw, ah = a
bx, by, bw, bh = b
ax2, ay2 = ax + aw, ay + ah
bx2, by2 = bx + bw, by + bh
ix1 = max(ax, bx)
iy1 = max(ay, by)
ix2 = min(ax2, bx2)
iy2 = min(ay2, by2)
iw = max(0, ix2 - ix1)
ih = max(0, iy2 - iy1)
inter = float(iw * ih)
if inter <= 0.0:
return 0.0
union = float(aw * ah + bw * bh) - inter
if union <= 0.0:
return 0.0
return inter / union
def _detect_person_boxes(self, frame, tracker):
if self._detector_backend == "yolo" and self._person_net is not None:
if self._yolo_runtime == "ultralytics":
raw = self._run_yolo_ultralytics(
self._person_net,
frame,
conf_thresh=self._person_score_thresh,
class_filter=self._person_class_id,
)
else:
raw = self._run_yolo_onnx(
self._person_net,
frame,
conf_thresh=self._person_score_thresh,
class_filter=self._person_class_id,
)
boxes_raw = [(int(d["x"]), int(d["y"]), int(d["w"]), int(d["h"])) for d in raw]
assigned = tracker.update(boxes_raw)
boxes = []
for (cid, box) in assigned:
best_conf = 0.0
for det in raw:
dbox = (int(det["x"]), int(det["y"]), int(det["w"]), int(det["h"]))
iou = self._bbox_iou(tuple(box), dbox)
if iou > 0.5:
best_conf = max(best_conf, float(det.get("conf", 0.0)))
boxes.append(
{
"x": int(box[0]),
"y": int(box[1]),
"w": int(box[2]),
"h": int(box[3]),
"id": int(cid),
"conf": float(best_conf),
}
)
return boxes
if self._detector_backend == "yolo" and self._person_net is None and not self._warned_yolo_unavailable:
self._warned_yolo_unavailable = True
logger.warning("VisionDetector: YOLO person model unavailable at runtime; using normal detector.")
if not self._warned_person_fallback:
self._warned_person_fallback = True
logger.warning("VisionDetector: person fallback enabled (Haar face boxes used as person proxies).")
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
faces = self._face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=4, minSize=(30, 30))
boxes_raw = [tuple(map(int, f)) for f in faces]
assigned = tracker.update(boxes_raw)
return [
{"x": int(b[0]), "y": int(b[1]), "w": int(b[2]), "h": int(b[3]), "id": int(i), "conf": 1.0}
for (i, b) in assigned
]
def _detect_face_boxes(self, frame):
if self._detector_backend == "yolo" and self._face_net is not None:
if self._yolo_runtime == "ultralytics":
raw = self._run_yolo_ultralytics(
self._face_net,
frame,
conf_thresh=self._face_score_thresh,
class_filter=None,
)
else:
raw = self._run_yolo_onnx(self._face_net, frame, conf_thresh=self._face_score_thresh, class_filter=None)
return [
{
"x": int(d["x"]),
"y": int(d["y"]),
"w": int(d["w"]),
"h": int(d["h"]),
"conf": float(d.get("conf", 0.0)),
}
for d in raw
]
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
faces = self._face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=4, minSize=(30, 30))
return [{"x": int(x), "y": int(y), "w": int(w), "h": int(h), "conf": 1.0} for (x, y, w, h) in faces]
def _detect_groups(self, person_boxes):
if len(person_boxes) < self._group_min_people:
return []
centers = []
for b in person_boxes:
cx = float(b.get("x", 0.0)) + (float(b.get("w", 0.0)) / 2.0)
cy = float(b.get("y", 0.0)) + (float(b.get("h", 0.0)) / 2.0)
centers.append((cx, cy))
n = len(person_boxes)
visited = set()
components = []
link_dist_sq = float(self._group_link_distance_px) * float(self._group_link_distance_px)
for i in range(n):
if i in visited:
continue
stack = [i]
comp = []
while stack:
cur = stack.pop()
if cur in visited:
continue
visited.add(cur)
comp.append(cur)
cx, cy = centers[cur]
for j in range(n):
if j in visited:
continue
dx = cx - centers[j][0]
dy = cy - centers[j][1]
if (dx * dx + dy * dy) <= link_dist_sq:
stack.append(j)
components.append(comp)
groups = []
gid = 1
for comp in components:
if len(comp) < self._group_min_people:
continue
xs = [int(person_boxes[k]["x"]) for k in comp]
ys = [int(person_boxes[k]["y"]) for k in comp]
x2s = [int(person_boxes[k]["x"] + person_boxes[k]["w"]) for k in comp]
y2s = [int(person_boxes[k]["y"] + person_boxes[k]["h"]) for k in comp]
x1 = min(xs)
y1 = min(ys)
x2 = max(x2s)
y2 = max(y2s)
groups.append(
{
"id": gid,
"size": int(len(comp)),
"members": [int(person_boxes[k].get("id", 0)) for k in comp],
"x": int(x1),
"y": int(y1),
"w": int(max(0, x2 - x1)),
"h": int(max(0, y2 - y1)),
}
)
gid += 1
groups.sort(key=lambda g: (-int(g.get("size", 0)), int(g.get("id", 0))))
return groups
def start(self):
    """Start the background worker thread (no-op when already running)."""
    if self._thread is not None and self._thread.is_alive():
        return
    self._stop.clear()
    worker = Thread(target=self._run, daemon=True)
    self._thread = worker
    worker.start()
def stop(self):
    """Signal the worker to stop, join it briefly, and release inputs."""
    self._stop.set()
    worker = self._thread
    if worker:
        worker.join(timeout=1.0)
    self._close_depth_pipeline()
    try:
        cap = self._video_cap
        if cap is not None:
            cap.release()
    except Exception as e:
        record_error("vision_detector", "stop_release_video", e)
def latest(self):
    """Return a shallow copy of the most recent detection snapshot."""
    with self._latest_lock:
        snapshot = dict(self._latest)
    return snapshot
def set_hard_lock(self, enabled: bool):
    """Enable or disable hard target locking (value coerced to bool)."""
    self._hard_target_lock_enabled = True if enabled else False
def lock_target_from_snapshot(self, snapshot: dict | None = None, lock_group: bool = False):
s = snapshot or self.latest()
self._locked_target_type = ""
self._locked_group_id = None
self._locked_subject_id = None
if lock_group:
groups = s.get("group_boxes") or []
if groups:
try:
g = max(
groups,
key=lambda x: (int(x.get("size", 0)), float(x.get("w", 0.0)) * float(x.get("h", 0.0))),
)
except Exception:
g = groups[0]
gid = g.get("id")
members = g.get("members") or []
try:
self._locked_group_id = int(gid) if gid is not None else None
except Exception:
self._locked_group_id = None
try:
self._locked_subject_id = int(members[0]) if members else None
except Exception:
self._locked_subject_id = None
if self._locked_group_id is not None:
self._locked_target_type = "group"
if self._locked_target_type != "group":
sid = s.get("subject_id")
if sid is None:
boxes = s.get("boxes") or []
if boxes:
try:
sid = max(boxes, key=lambda x: float(x.get("w", 0.0)) * float(x.get("h", 0.0))).get("id")
except Exception:
sid = boxes[0].get("id")
try:
self._locked_subject_id = int(sid) if sid is not None else None
except Exception:
self._locked_subject_id = None
if self._locked_subject_id is not None:
self._locked_target_type = "subject"
self._subject_lost_count = 0
def unlock_target(self):
    """Clear any subject/group lock and reset the lost-frame counter."""
    self._locked_subject_id = None
    self._locked_group_id = None
    self._locked_target_type = ""
    self._subject_lost_count = 0
def lock_subject(self, subject_id: int | None = None):
if subject_id is None:
snap = self.latest()
subject_id = snap.get("subject_id")
try:
self._locked_subject_id = int(subject_id) if subject_id is not None else None
except Exception:
self._locked_subject_id = None
self._locked_group_id = None
self._locked_target_type = "subject" if self._locked_subject_id is not None else ""
self._subject_lost_count = 0
def lock_subject_from_snapshot(self, snapshot: dict | None = None):
s = snapshot or self.latest()
sid = s.get("subject_id")
if sid is None:
boxes = s.get("boxes") or []
if boxes:
b = max(boxes, key=lambda x: float(x.get("w", 0.0)) * float(x.get("h", 0.0)))
sid = b.get("id")
self.lock_subject(sid)
def unlock_subject(self):
    """Backward-compatible alias for unlock_target()."""
    self.unlock_target()
def _close_depth_pipeline(self):
if self._rs_pipeline is not None:
try:
self._rs_pipeline.stop()
except Exception:
pass
self._rs_pipeline = None
self._rs_profile = None
self._depth_ok = False
def _ensure_depth_pipeline(self, force: bool = False):
if not self._depth_enabled or not _HAS_RS:
self._depth_ok = False
return False
if self._rs_pipeline is not None and not force:
self._depth_ok = True
return True
now = time.time()
if (not force) and (now - self._last_rs_restart) < 2.0:
return self._rs_pipeline is not None
self._last_rs_restart = now
self._close_depth_pipeline()
try:
pipe = rs.pipeline()
cfg = rs.config()
cfg.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
self._rs_profile = pipe.start(cfg)
self._rs_pipeline = pipe
self._depth_ok = True
if force:
self._depth_restarts += 1
logger.info("VisionDetector: RealSense depth pipeline started")
return True
except Exception as e:
self._rs_pipeline = None
self._rs_profile = None
self._depth_ok = False
logger.warning(f"VisionDetector: depth pipeline unavailable: {e}")
return False
def _recover_inputs(self):
    """Reopen the camera and force-restart the depth pipeline after stalls."""
    logger.warning("VisionDetector: recovering camera/depth inputs")
    self._camera_restarts += 1
    try:
        self._camera.release()
    except Exception as e:
        record_error("vision_detector", "recover_inputs_release", e)
    try:
        self._camera.initialize()
    except Exception as e:
        record_error("vision_detector", "recover_inputs_initialize", e)
    self._ensure_depth_pipeline(force=True)
    self._no_frame_count = 0
@staticmethod
def _coerce_depth_payload(depth_payload, subject_box=None):
if depth_payload is None:
return None
if isinstance(depth_payload, (int, float)):
d = float(depth_payload)
return d if d > 0.0 else None
if isinstance(depth_payload, np.ndarray):
arr = depth_payload
h, w = arr.shape[:2]
if subject_box:
x = max(0, int(subject_box.get("x", 0)))
y = max(0, int(subject_box.get("y", 0)))
bw = max(1, int(subject_box.get("w", 1)))
bh = max(1, int(subject_box.get("h", 1)))
x2 = min(w, x + bw)
y2 = min(h, y + bh)
roi = arr[y:y2, x:x2]
else:
roi = arr
if roi.size <= 0:
return None
vals = roi[roi > 0]
if vals.size <= 0:
return None
med = float(np.median(vals))
if arr.dtype in (np.uint16, np.int16, np.uint32, np.int32):
med *= 0.001 # mm -> m
return med if med > 0.0 else None
return None
def _estimate_depth_m(self, subject_box=None, depth_payload=None):
    """Estimate the subject distance in meters.

    An explicit ``depth_payload`` wins; otherwise the RealSense depth
    stream is polled and sampled at the subject's center point (or the
    frame center when no subject box is given). Returns None when no
    usable depth is available; updates ``self._depth_ok`` accordingly.
    """
    meters = self._coerce_depth_payload(depth_payload, subject_box=subject_box)
    if meters is not None:
        self._depth_ok = True
        return meters
    if not self._ensure_depth_pipeline(force=False):
        self._depth_ok = False
        return None
    try:
        frames = self._rs_pipeline.poll_for_frames()
        if not frames:
            return None  # no new frame yet; leave _depth_ok untouched
        depth_frame = frames.get_depth_frame()
        if not depth_frame:
            return None
        width = depth_frame.get_width()
        height = depth_frame.get_height()
        if subject_box:
            px = int(subject_box.get("x", 0) + (subject_box.get("w", 0) / 2))
            py = int(subject_box.get("y", 0) + (subject_box.get("h", 0) / 2))
        else:
            px, py = int(width / 2), int(height / 2)
        # Clamp the sample point inside the depth image.
        px = min(width - 1, max(0, px))
        py = min(height - 1, max(0, py))
        meters = float(depth_frame.get_distance(px, py))
        if meters <= 0.0:
            self._depth_ok = False
            return None
        self._depth_ok = True
        return meters
    except Exception:
        self._depth_ok = False
        return None
def _extract_frame_and_depth(self):
    """Grab the next BGR frame; returns ``(frame, depth_payload)``.

    Sources are tried in priority order: a looped video file, the direct
    camera service, then the local camera. The depth payload is always
    None here (depth comes from the RealSense pipeline separately).
    Updates ``self._camera_ok`` to reflect the outcome.
    """
    if self._video_cap is not None:
        try:
            grabbed, frame = self._video_cap.read()
            if not grabbed:
                # End of clip: rewind to the first frame and retry once.
                self._video_cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
                grabbed, frame = self._video_cap.read()
            self._camera_ok = bool(grabbed and frame is not None)
            return frame, None
        except Exception:
            self._camera_ok = False
            return None, None
    if direct_camera_client.is_enabled():
        try:
            frame = direct_camera_client.frame_bgr(timeout=1.5)
            if frame is not None:
                self._camera_ok = True
                return frame, None
        except Exception:
            pass  # best-effort: fall through to the local camera
    try:
        frame = self._camera.capture_frame()
        self._camera_ok = frame is not None
        return frame, None
    except Exception:
        self._camera_ok = False
        return None, None
def _select_subject(self, boxes, groups=None):
groups = groups or []
if not boxes:
self._subject_lost_count += 1
if (not self._hard_target_lock_enabled) and self._subject_lost_count > self._subject_lock_lost_frames:
self._locked_subject_id = None
self._locked_group_id = None
self._locked_target_type = ""
return None, False
self._subject_lost_count = 0
if self._hard_target_lock_enabled and self._locked_group_id is not None:
locked_group = None
for g in groups:
try:
if int(g.get("id", -1)) == int(self._locked_group_id):
locked_group = g
break
except Exception:
continue
if locked_group is None:
self._target_switch_blocked_count += 1
return None, False
members = set()
try:
members = {int(x) for x in (locked_group.get("members") or [])}
except Exception:
members = set()
cands = [b for b in boxes if int(b.get("id", -1)) in members]
if not cands:
self._target_switch_blocked_count += 1
return None, False
best = max(cands, key=lambda b: float(b.get("w", 0.0)) * float(b.get("h", 0.0)))
try:
self._locked_subject_id = int(best.get("id"))
except Exception:
pass
return best, True
if self._locked_subject_id is not None:
for b in boxes:
if int(b.get("id", -1)) == int(self._locked_subject_id):
return b, True
if self._hard_target_lock_enabled:
self._target_switch_blocked_count += 1
return None, False
# No lock: select largest target.
best = max(boxes, key=lambda b: float(b.get("w", 0.0)) * float(b.get("h", 0.0)))
return best, True
def _check_interaction_intent(self, boxes, subject_box, depth_m, now):
max_area = 0.0
for b in boxes:
try:
area = max(0.0, float(b.get("w", 0.0))) * max(0.0, float(b.get("h", 0.0)))
if area > max_area:
max_area = area
except Exception:
continue
self._area_history.append(max_area)
area_first = self._area_history[0] if self._area_history else 0.0
is_close_area = max_area >= self._proximity_threshold
is_approach_area = len(self._area_history) >= self._history_len and (max_area - area_first) >= self._approach_delta
approach_speed = 0.0
is_close_depth = False
is_approach_depth = False
if depth_m is not None:
self._depth_history.append((now, float(depth_m)))
if len(self._depth_history) >= 2:
t0, d0 = self._depth_history[0]
dt = max(1e-3, now - float(t0))
approach_speed = max(0.0, (float(d0) - float(depth_m)) / dt)
is_close_depth = float(depth_m) <= self._min_depth_m
is_approach_depth = approach_speed >= self._min_approach_speed_mps
else:
self._depth_history.clear()
if depth_m is not None:
is_close = is_close_depth
is_approaching = is_approach_depth
else:
is_close = is_close_area
is_approaching = is_approach_area
subject_visible = subject_box is not None
intent_detected = bool(subject_visible and (is_close or is_approaching))
return {
"intent_detected": intent_detected,
"max_area": float(max_area),
"is_close": bool(is_close),
"is_approaching": bool(is_approaching),
"depth_m": float(depth_m) if depth_m is not None else None,
"approach_speed_mps": float(approach_speed),
}
def _status_base(self, now, intent):
    """Status fields shared by every snapshot published to ``self._latest``.

    Centralizing these keys removes the previous triplication of the
    snapshot dict across the no-frame, success, and failure paths.
    """
    return {
        "frame_time": now,
        "detector_backend": self._effective_backend(),
        "yolo_runtime": self._yolo_runtime,
        "target_lock_active": bool(self._locked_subject_id is not None or self._locked_group_id is not None),
        "target_lock_type": str(self._locked_target_type or ""),
        "target_lock_id": self._locked_group_id if self._locked_group_id is not None else self._locked_subject_id,
        "target_switch_blocked_count": int(self._target_switch_blocked_count),
        "camera_ok": bool(self._camera_ok),
        "depth_ok": bool(self._depth_ok),
        "camera_restarts": int(self._camera_restarts),
        "depth_restarts": int(self._depth_restarts),
        **intent,
    }

def _publish_empty_status(self, now, intent, frame=None):
    """Publish a snapshot with zero detections (missing frame or detect failure)."""
    payload = self._status_base(now, intent)
    payload.update(
        {
            "face_count": 0,
            "person_count": 0,
            "group_count": 0,
            "group_size": 0,
            "group_detected": False,
            "boxes": [],
            "face_boxes": [],
            "group_boxes": [],
            "frame": frame.copy() if frame is not None else None,
            "subject_id": self._locked_subject_id,
            "subject_box": None,
            "subject_visible": False,
            "subject_locked": self._locked_subject_id is not None,
        }
    )
    with self._latest_lock:
        self._latest.update(payload)

def _run(self):
    """Worker loop: grab frames, run detection/tracking, publish status.

    Runs until ``self._stop`` is set, pacing itself to ``self.poll_hz``.
    Every iteration publishes a full status snapshot, even when the frame
    is missing or detection fails, so consumers always see fresh state.
    """
    interval = max(0.01, 1.0 / max(1.0, self.poll_hz))
    tracker = CentroidTracker()
    while not self._stop.is_set():
        t0 = time.time()
        now = t0  # single timestamp per iteration (was sampled twice before)
        self._refresh_detector_backend(now)
        frame, depth_payload = self._extract_frame_and_depth()
        if frame is None:
            self._no_frame_count += 1
            if self._no_frame_count >= self._recover_after_no_frame:
                self._recover_inputs()
            intent = self._check_interaction_intent([], None, None, now)
            self._publish_empty_status(now, intent)
            time.sleep(interval)
            continue
        self._no_frame_count = 0
        try:
            person_boxes = self._detect_person_boxes(frame, tracker)
            face_boxes = self._detect_face_boxes(frame)
            groups = self._detect_groups(person_boxes)
            group_size = max((int(g.get("size", 0)) for g in groups), default=0)
            subject_box, subject_visible = self._select_subject(person_boxes, groups=groups)
            depth_m = self._estimate_depth_m(subject_box=subject_box, depth_payload=depth_payload)
            intent = self._check_interaction_intent(person_boxes, subject_box, depth_m, now)
            payload = self._status_base(now, intent)
            payload.update(
                {
                    "face_count": len(face_boxes),
                    "person_count": len(person_boxes),
                    "group_count": len(groups),
                    "group_size": int(group_size),
                    "group_detected": bool(group_size >= self._group_min_people),
                    "boxes": person_boxes,
                    "face_boxes": face_boxes,
                    "group_boxes": groups,
                    "frame": frame.copy(),
                    "subject_id": subject_box.get("id") if subject_box else self._locked_subject_id,
                    "subject_box": dict(subject_box) if isinstance(subject_box, dict) else None,
                    "subject_visible": bool(subject_visible),
                    "subject_locked": self._locked_subject_id is not None,
                }
            )
            with self._latest_lock:
                self._latest.update(payload)
        except Exception as e:
            record_error("vision_detector", "run_loop_detect", e)
            logger.warning(f"VisionDetector: detect failed: {e}")
            intent = self._check_interaction_intent([], None, None, now)
            self._publish_empty_status(now, intent, frame=frame)
        dt = time.time() - t0
        time.sleep(max(0.0, interval - dt))
class CentroidTracker:
    """Greedy nearest-centroid tracker that hands out incremental integer IDs."""

    def __init__(self, max_disappeared: int = 10, max_distance: int = 80):
        self.next_id = 1
        self.objects = {}  # id -> last known centroid
        self.disappeared = {}  # id -> consecutive frames the object has been missing
        self.max_disappeared = max_disappeared
        self.max_distance = max_distance

    def _centroid(self, box):
        """Integer center point of an (x, y, w, h) box."""
        x, y, w, h = box
        return (int(x + w / 2), int(y + h / 2))

    def register(self, box):
        """Start tracking a new object; returns its freshly assigned id."""
        new_id = self.next_id
        self.next_id += 1
        self.objects[new_id] = self._centroid(box)
        self.disappeared[new_id] = 0
        return new_id

    def deregister(self, cid):
        """Forget a tracked object entirely."""
        del self.objects[cid]
        del self.disappeared[cid]

    def update(self, boxes):
        """Match ``boxes`` [(x, y, w, h), ...] to tracks; returns [(id, box), ...]."""
        if not boxes:
            # Nothing detected: age every track, drop the stale ones.
            for cid in list(self.disappeared):
                self.disappeared[cid] += 1
                if self.disappeared[cid] > self.max_disappeared:
                    self.deregister(cid)
            return []
        centroids = [self._centroid(b) for b in boxes]
        if not self.objects:
            # First detections: everything is a new track.
            return [(self.register(b), b) for b in boxes]
        track_ids = list(self.objects.keys())
        track_points = np.array(list(self.objects.values()))
        dists = np.linalg.norm(track_points[:, None, :] - np.array(centroids)[None, :, :], axis=2)
        # Greedy matching: visit tracks in order of their closest detection.
        row_order = dists.min(axis=1).argsort()
        col_for_row = dists.argmin(axis=1)[row_order]
        matched_rows = set()
        matched_cols = set()
        results = []
        for row, col in zip(row_order, col_for_row):
            if row in matched_rows or col in matched_cols:
                continue
            if dists[row, col] > self.max_distance:
                continue  # too far apart to be the same object
            cid = track_ids[row]
            self.objects[cid] = centroids[col]
            self.disappeared[cid] = 0
            results.append((cid, boxes[col]))
            matched_rows.add(row)
            matched_cols.add(col)
        # Unmatched detections become new tracks.
        for idx, box in enumerate(boxes):
            if idx not in matched_cols:
                results.append((self.register(box), box))
        # Unmatched tracks age and may expire.
        for idx, cid in enumerate(track_ids):
            if idx in matched_rows:
                continue
            self.disappeared[cid] += 1
            if self.disappeared[cid] > self.max_disappeared:
                self.deregister(cid)
        return results
if __name__ == "__main__":
    # Manual smoke test: run the detector for ~15 seconds, logging its status.
    logging.basicConfig(level=logging.INFO)
    detector = VisionDetector()
    detector.start()
    try:
        for _ in range(30):
            status = detector.latest()
            logger.info(
                "people=%s faces=%s groups=%s intent=%s time=%s",
                status.get("person_count", 0),
                status.get("face_count", 0),
                status.get("group_count", 0),
                status.get("intent_detected", False),
                status.get("frame_time", 0.0),
            )
            time.sleep(0.5)
    finally:
        detector.stop()