"""Camera backends: RealSense SDK (with optional aligned depth) and OpenCV V4L2. RealSenseCapture exposes the latest depth frame via ``latest_depth`` (a numpy uint16 array in millimetres) and ``depth_scale`` (meters per raw unit). Non-RealSense captures leave ``latest_depth = None`` — callers must guard. """ from __future__ import annotations from typing import Optional import cv2 import numpy as np from utils.config import load_config from utils.logger import get_logger log = get_logger("Inference", "camera") _CFG = load_config("core")["camera"] try: import pyrealsense2 as rs HAS_REALSENSE = True except ImportError: HAS_REALSENSE = False class RealSenseCapture: """pyrealsense2 pipeline with an OpenCV-like read() interface. When ``enable_depth`` is true (default from core_config.camera.enable_depth), the pipeline also streams depth and aligns each frame to the color view. The aligned depth frame is stored on ``self.latest_depth`` as a numpy array. """ def __init__(self, width: int = _CFG["width"], height: int = _CFG["height"], fps: int = _CFG["fps"], serial: Optional[str] = None, enable_depth: bool = _CFG.get("enable_depth", True)): if not HAS_REALSENSE: raise RuntimeError("pyrealsense2 not installed") self.pipeline = rs.pipeline() cfg = rs.config() if serial: cfg.enable_device(serial) cfg.enable_stream(rs.stream.color, width, height, rs.format.bgr8, fps) self.has_depth = bool(enable_depth) self._align = None self.depth_scale = 0.001 # mm per raw unit default; overwritten below if self.has_depth: cfg.enable_stream(rs.stream.depth, width, height, rs.format.z16, fps) self.profile = self.pipeline.start(cfg) self._open = True if self.has_depth: try: depth_sensor = self.profile.get_device().first_depth_sensor() self.depth_scale = float(depth_sensor.get_depth_scale()) self._align = rs.align(rs.stream.color) except Exception as e: log.warning(f"Depth init failed ({e}); disabling depth") self.has_depth = False self._align = None self.latest_depth: Optional[np.ndarray] = None dev = self.profile.get_device() log.info( f"RealSense opened | {dev.get_info(rs.camera_info.name)} " f"serial={dev.get_info(rs.camera_info.serial_number)} " f"{width}x{height}@{fps} depth={self.has_depth}" ) def isOpened(self) -> bool: return self._open def read(self): if not self._open: return False, None try: frames = self.pipeline.wait_for_frames(timeout_ms=3000) if self._align is not None: frames = self._align.process(frames) color = frames.get_color_frame() if not color: return False, None if self.has_depth: depth = frames.get_depth_frame() self.latest_depth = ( np.asanyarray(depth.get_data()) if depth else None ) return True, np.asanyarray(color.get_data()) except Exception: return False, None def release(self): if self._open: self.pipeline.stop() self._open = False def open_capture(source: str): if source.lower().startswith("realsense"): serial = None if ":" in source: serial = source.split(":", 1)[1] return RealSenseCapture( width=_CFG["width"], height=_CFG["height"], fps=_CFG["fps"], serial=serial, ) if str(source).isdigit(): idx = int(source) cap = cv2.VideoCapture(idx) if cap.isOpened(): return cap cap = cv2.VideoCapture(idx, cv2.CAP_ANY) if cap.isOpened(): return cap cap = cv2.VideoCapture(idx, cv2.CAP_V4L2) return cap if source.startswith("/dev/video"): cap = cv2.VideoCapture(source, cv2.CAP_V4L2) if cap.isOpened(): cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"MJPG")) cap.set(cv2.CAP_PROP_FRAME_WIDTH, _CFG["width"]) cap.set(cv2.CAP_PROP_FRAME_HEIGHT, _CFG["height"]) cap.set(cv2.CAP_PROP_FPS, _CFG["fps"]) return cap return cv2.VideoCapture(source) def estimate_person_distance_m(depth_frame, bbox, depth_scale: float) -> Optional[float]: """Median distance in metres inside ``bbox`` on the depth frame. Returns ``None`` if depth is unavailable or the bbox falls outside the valid area. Ignores zero-depth pixels (RealSense's "no return" marker). """ if depth_frame is None: return None h, w = depth_frame.shape[:2] x1, y1, x2, y2 = bbox x1 = max(0, min(int(x1), w - 1)) x2 = max(0, min(int(x2), w)) y1 = max(0, min(int(y1), h - 1)) y2 = max(0, min(int(y2), h)) if x2 <= x1 or y2 <= y1: return None roi = depth_frame[y1:y2, x1:x2] valid = roi[roi > 0] if valid.size == 0: return None return float(np.median(valid) * depth_scale)