117 lines
4.1 KiB
Python

"""YOLO object detector wrapper (Ultralytics).
Thin adapter that runs an Ultralytics YOLO model on raw BGR frames and emits
:class:`gowelcome.types.Detection` boxes. Classification of those boxes into
*persons* vs *dangers* is intentionally **not** done here -- that is the
perception thread's job (it knows the per-class confidence/size gates). We just
return everything at or above the lowest confidence floor we care about.
The ``ultralytics`` import is lazy (inside ``__init__``) so the package imports
fine on a machine without it; off-robot unit tests can still import this module.
"""
from __future__ import annotations

import logging
from typing import List

from config import PerceptionConfig
from gowelcome.types import Detection
class YoloDetector:
    """Run an Ultralytics YOLO model and return raw :class:`Detection` boxes.

    The detector keeps every box whose confidence is at or above
    ``conf_floor`` (the lower of the person and danger thresholds in the
    config); stricter per-class filtering is left to the caller.
    """

    def __init__(self, cfg: PerceptionConfig) -> None:
        """Load the YOLO model described by ``cfg``.

        Args:
            cfg: Perception configuration. This class reads ``model_path``,
                ``device``, ``use_half``, ``person_conf``, ``danger_conf``,
                ``use_tracking``, ``tracker`` and ``infer_imgsz`` from it.

        Raises:
            ImportError: if ``ultralytics`` is not installed, with a hint to
                ``pip install ultralytics``.
        """
        try:
            from ultralytics import YOLO  # lazy: heavy/optional dep
        except ImportError as exc:  # pragma: no cover - exercised off-robot
            raise ImportError(
                "ultralytics is required for YoloDetector. "
                "Install it with: pip install ultralytics"
            ) from exc

        log = logging.getLogger(__name__)

        self.cfg = cfg
        # Lowest confidence we ever keep; the caller applies the stricter
        # per-class gates (person_conf / danger_conf) afterwards.
        self.conf_floor: float = min(cfg.person_conf, cfg.danger_conf)
        self.model = YOLO(cfg.model_path)

        # Move to the requested device when possible (CPU/cuda/cuda:0...).
        # Best-effort: a failure is reported but never aborts construction,
        # so the model simply stays wherever it was loaded.
        if cfg.device:
            try:
                self.model.to(cfg.device)
            except Exception:  # pragma: no cover - device/driver dependent
                log.warning(
                    "Could not move YOLO model to device %r; "
                    "continuing on the default device",
                    cfg.device,
                    exc_info=True,
                )

        # Optional FP16 inference (discrete GPU only). Guarded: a no-op or
        # failure here must never break detection, but it is logged so a
        # silently-disabled optimisation can be diagnosed.
        # NOTE(review): Ultralytics also exposes a ``half`` inference
        # argument; confirm this direct cast is still in effect after the
        # predictor sets the model up on first call.
        if cfg.use_half:
            try:
                self.model.model.half()
            except Exception:  # pragma: no cover - hardware dependent
                log.warning(
                    "Could not switch YOLO model to FP16; "
                    "continuing at default precision",
                    exc_info=True,
                )

        # ``names`` may be a dict {id: label} or a list; both shapes are
        # resolved by ``_label_for``.
        self.names = self.model.names

    def _label_for(self, cls_id: int) -> str:
        """Return the label for ``cls_id``, or ``str(cls_id)`` if unknown.

        Works for both mapping-style (anything with ``get``) and
        sequence-style ``names``. An id that is missing from the mapping or
        outside the sequence falls back to the stringified id instead of
        raising, so one unexpected class never discards a whole frame.
        """
        names = self.names
        if hasattr(names, "get"):
            return names.get(cls_id, str(cls_id))
        # Reject negative ids explicitly: sequence indexing would otherwise
        # wrap around and return the wrong label.
        if cls_id >= 0:
            try:
                return names[cls_id]
            except (IndexError, TypeError):
                pass
        return str(cls_id)

    def detect(self, frame) -> List[Detection]:
        """Run inference on one BGR frame and return all kept detections.

        Args:
            frame: ``HxWx3`` BGR ``uint8`` numpy array. ``None`` or empty
                frames (``size == 0``) yield an empty list without calling
                the model.

        Returns:
            Every detection with confidence ``>= conf_floor``, as
            :class:`Detection` objects with integer pixel corners. The
            ``track_id`` is the tracker id when one is present on the box,
            otherwise ``-1``.
        """
        if frame is None or getattr(frame, "size", 0) == 0:
            return []

        cfg = self.cfg
        # Arguments shared by the tracking and plain-inference paths.
        infer_kwargs = {
            "conf": self.conf_floor,
            "verbose": False,
            "imgsz": cfg.infer_imgsz,
        }
        if cfg.use_tracking:
            # persist=True keeps tracker state between successive calls.
            results = self.model.track(
                frame,
                persist=True,
                tracker=cfg.tracker,
                **infer_kwargs,
            )[0]
        else:
            results = self.model(frame, **infer_kwargs)[0]

        detections: List[Detection] = []
        for box in results.boxes:
            cls_id = int(box.cls[0])
            conf = float(box.conf[0])
            x1, y1, x2, y2 = map(int, box.xyxy[0])
            # ``id`` is absent or None when the box carries no track id.
            raw_id = getattr(box, "id", None)
            tid = int(raw_id[0]) if raw_id is not None else -1
            detections.append(
                Detection(
                    self._label_for(cls_id), conf, x1, y1, x2, y2, track_id=tid
                )
            )
        return detections