594 lines
24 KiB
Python
594 lines
24 KiB
Python
"""Camera daemon — single producer, in-memory frame cache.
|
|
|
|
Captures frames at fixed FPS from a RealSense (preferred) or any USB
|
|
camera (fallback), JPEG-encodes them, and caches the latest frame in
|
|
memory in two views (matches Marcus's API/camera_api.py):
|
|
|
|
- `_latest_jpeg` raw JPEG bytes — dashboard preview + frame forwarder
|
|
- `_latest_b64` base64 ASCII — frame forwarder → Gemini child stdin
|
|
|
|
Consumers:
|
|
- dashboard preview → `snapshot_jpeg()` (served as an HTTP Response)
|
|
- face enrollment → `get_fresh_frame()` for a guaranteed-current capture
|
|
- GeminiSubprocess → `get_frame_b64()`, pushed over the child's stdin
|
|
|
|
Lifecycle is driven by the Recognition tab toggle. The daemon is idle
|
|
until `start()` is called; failures in start() are non-fatal and
|
|
reported via `is_running()` / `backend`. Once running it auto-reconnects
|
|
on USB unplug / stalled frames (Marcus-style resilience), and supports
|
|
hot `reconfigure()` of resolution/FPS without a full restart.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import os
|
|
import threading
|
|
import time
|
|
from typing import Optional
|
|
|
|
import numpy as np
|
|
|
|
from Project.Sanad.core.logger import get_logger
|
|
|
|
log = get_logger("camera")
|
|
|
|
# How many /dev/video* indices to scan for a USB-style color camera when
|
|
# RealSense isn't available. A RealSense exposes ~6 V4L2 nodes (depth, IR,
|
|
# color, metadata…) — the color one is rarely index 0, so we probe each
|
|
# and accept the first that yields a real 3-channel BGR frame.
|
|
_USB_SCAN_RANGE = 10
|
|
|
|
|
|
class CameraDaemon:
|
|
"""RealSense → USB fallback camera capture with in-memory frame cache."""
|
|
|
|
def __init__(
|
|
self,
|
|
width: int = 424,
|
|
height: int = 240,
|
|
fps: int = 15,
|
|
jpeg_quality: int = 70,
|
|
stale_threshold_s: float = 10.0,
|
|
reconnect_min_s: float = 2.0,
|
|
reconnect_max_s: float = 10.0,
|
|
capture_timeout_ms: int = 5000,
|
|
) -> None:
|
|
# Active profile — guarded by _reconfig_lock so reconfigure() can
|
|
# hot-swap it from another thread between capture sessions.
|
|
self._reconfig_lock = threading.Lock()
|
|
self._w = int(width)
|
|
self._h = int(height)
|
|
self._fps = int(fps)
|
|
self._q = max(10, min(95, int(jpeg_quality)))
|
|
self._reconfig_pending = False
|
|
|
|
# Resilience knobs (Marcus-style)
|
|
self._stale_s = float(stale_threshold_s)
|
|
self._reconnect_min_s = float(reconnect_min_s)
|
|
self._reconnect_max_s = float(reconnect_max_s)
|
|
self._capture_timeout_ms = int(capture_timeout_ms)
|
|
|
|
self._thread: Optional[threading.Thread] = None
|
|
self._stop = threading.Event()
|
|
self._backend: Optional[str] = None
|
|
self._lock = threading.Lock()
|
|
self._latest_jpeg: Optional[bytes] = None
|
|
self._latest_b64: Optional[str] = None
|
|
self._latest_ts: float = 0.0
|
|
self._frame_seq: int = 0
|
|
self._error: Optional[str] = None
|
|
self._reconnect_count: int = 0
|
|
|
|
# ── public API ──────────────────────────────────────────
|
|
|
|
@property
|
|
def backend(self) -> Optional[str]:
|
|
return self._backend
|
|
|
|
@property
|
|
def error(self) -> Optional[str]:
|
|
return self._error
|
|
|
|
@property
|
|
def frame_seq(self) -> int:
|
|
return self._frame_seq
|
|
|
|
def is_running(self) -> bool:
|
|
return self._thread is not None and self._thread.is_alive()
|
|
|
|
def start(self) -> bool:
|
|
"""Start capture thread. Returns True if a backend was acquired.
|
|
|
|
Initial probe is synchronous; if it fails the thread isn't spawned.
|
|
Once running, the inner loop auto-reconnects on USB unplug or
|
|
stalled frames using exponential backoff (`reconnect_min_s` ..
|
|
`reconnect_max_s`).
|
|
"""
|
|
if self.is_running():
|
|
return True
|
|
self._stop.clear()
|
|
self._error = None
|
|
self._reconnect_count = 0
|
|
|
|
# One-shot USB-2.0 negotiation diagnostic (warns operator if D435I
|
|
# came up on USB 2.0 — frame drops would be likely otherwise).
|
|
self._check_usb_version()
|
|
|
|
backend = self._probe_any()
|
|
if backend is None:
|
|
log.warning("Camera: no backend available (RealSense + USB both failed)")
|
|
self._backend = None
|
|
return False
|
|
|
|
self._backend = backend["name"]
|
|
self._thread = threading.Thread(
|
|
target=self._reconnect_loop, args=(backend,),
|
|
daemon=True, name="camera-daemon",
|
|
)
|
|
self._thread.start()
|
|
with self._reconfig_lock:
|
|
w, h, f = self._w, self._h, self._fps
|
|
log.info("Camera started (backend=%s, %dx%d @ %dfps)",
|
|
self._backend, w, h, f)
|
|
return True
|
|
|
|
def stop(self) -> None:
|
|
"""Stop the capture thread and release the hardware."""
|
|
if not self.is_running():
|
|
self._backend = None
|
|
self._clear_cache()
|
|
return
|
|
self._stop.set()
|
|
t = self._thread
|
|
if t is not None:
|
|
t.join(timeout=2.0)
|
|
self._thread = None
|
|
self._backend = None
|
|
# Drop the last captured frame so snapshot_jpeg()/get_frame_b64()
|
|
# return None once vision is OFF — otherwise the /frame.jpg preview
|
|
# and the enroll path keep serving a frozen image of whoever was
|
|
# last in front of the camera.
|
|
self._clear_cache()
|
|
log.info("Camera stopped")
|
|
|
|
def _clear_cache(self) -> None:
|
|
"""Drop the cached frame views so nothing stale is served."""
|
|
with self._lock:
|
|
self._latest_jpeg = None
|
|
self._latest_b64 = None
|
|
self._latest_ts = 0.0
|
|
|
|
def reconfigure(self, width: Optional[int] = None, height: Optional[int] = None,
|
|
fps: Optional[int] = None, jpeg_quality: Optional[int] = None) -> dict:
|
|
"""Hot-swap the capture profile without a full stop/start.
|
|
|
|
Sets a pending flag — the capture loop notices it, tears the
|
|
pipeline down, and rebuilds at the new resolution (~0.5 s gap).
|
|
If the daemon isn't running the new values just take effect on
|
|
the next `start()`. Returns the resulting active profile.
|
|
"""
|
|
with self._reconfig_lock:
|
|
if width is not None:
|
|
self._w = int(width)
|
|
if height is not None:
|
|
self._h = int(height)
|
|
if fps is not None:
|
|
self._fps = int(fps)
|
|
if jpeg_quality is not None:
|
|
self._q = max(10, min(95, int(jpeg_quality)))
|
|
if self.is_running():
|
|
self._reconfig_pending = True
|
|
profile = {"width": self._w, "height": self._h,
|
|
"fps": self._fps, "jpeg_quality": self._q}
|
|
log.info("Camera reconfigure → %s", profile)
|
|
return profile
|
|
|
|
def snapshot_jpeg(self) -> Optional[bytes]:
|
|
"""Return the latest JPEG bytes, or None if no frame yet."""
|
|
with self._lock:
|
|
return self._latest_jpeg
|
|
|
|
def get_frame_b64(self) -> Optional[str]:
|
|
"""Return the latest frame as a base64 ASCII string (or None).
|
|
|
|
Used by the frame forwarder to push frames over the Gemini child's
|
|
stdin without re-encoding — base64 is cached alongside the JPEG.
|
|
"""
|
|
with self._lock:
|
|
return self._latest_b64
|
|
|
|
def get_fresh_frame(self, max_age_s: float = 0.5,
|
|
timeout_s: float = 1.5) -> Optional[bytes]:
|
|
"""Return a JPEG frame newer than `max_age_s`, waiting up to `timeout_s`.
|
|
|
|
Used by face enrollment so the captured frame is guaranteed to be
|
|
the *current* scene, not a stale buffer from before the user got
|
|
into position. On timeout, only falls back to the cached frame if
|
|
it is still within the stale threshold — otherwise returns None so
|
|
the enroll route raises 409 rather than capturing an old scene
|
|
(e.g. while the daemon is stuck reconnecting).
|
|
"""
|
|
deadline = time.time() + timeout_s
|
|
while time.time() < deadline:
|
|
with self._lock:
|
|
if (self._latest_jpeg is not None
|
|
and self._latest_ts > 0
|
|
and (time.time() - self._latest_ts) <= max_age_s):
|
|
return self._latest_jpeg
|
|
time.sleep(0.03)
|
|
# Timed out waiting for a fresh frame. Hand back the cached frame
|
|
# only if it isn't dangerously stale; never enrol an arbitrarily
|
|
# old scene.
|
|
with self._lock:
|
|
if (self._latest_jpeg is not None
|
|
and self._latest_ts > 0
|
|
and (time.time() - self._latest_ts) <= self._stale_s):
|
|
return self._latest_jpeg
|
|
return None
|
|
|
|
def latest_age_s(self) -> float:
|
|
"""Seconds since last successful frame; +inf if none."""
|
|
with self._lock:
|
|
if self._latest_ts <= 0:
|
|
return float("inf")
|
|
return time.time() - self._latest_ts
|
|
|
|
def status(self) -> dict:
|
|
with self._reconfig_lock:
|
|
w, h, f, q = self._w, self._h, self._fps, self._q
|
|
# latest_age_s() is +inf until the first frame lands. inf is NOT
|
|
# JSON-serialisable by Starlette's JSONResponse (allow_nan=False) —
|
|
# leaving it as inf would 500 the /api/recognition/* routes. Map
|
|
# "running but no frame yet" and "not running" both to None.
|
|
age = self.latest_age_s()
|
|
running = self.is_running()
|
|
age_s = round(age, 2) if (running and age != float("inf")) else None
|
|
# Snapshot the report counters under _lock for a consistent view —
|
|
# the capture/reconnect thread mutates these (see _reconnect_loop).
|
|
# Read latest_age_s()/is_running() above (they self-lock) so we
|
|
# don't re-enter this non-reentrant lock.
|
|
with self._lock:
|
|
backend = self._backend
|
|
frame_seq = self._frame_seq
|
|
error = self._error
|
|
reconnect_count = self._reconnect_count
|
|
return {
|
|
"running": running,
|
|
"backend": backend,
|
|
"width": w,
|
|
"height": h,
|
|
"fps": f,
|
|
"jpeg_quality": q,
|
|
"frame_seq": frame_seq,
|
|
"age_s": age_s,
|
|
"error": error,
|
|
"reconnect_count": reconnect_count,
|
|
}
|
|
|
|
# ── helpers ─────────────────────────────────────────────
|
|
|
|
def _probe_any(self) -> Optional[dict]:
|
|
"""Try RealSense first, then USB. Returns backend dict or None."""
|
|
b = self._probe_realsense()
|
|
if b is None:
|
|
b = self._probe_usb()
|
|
return b
|
|
|
|
def _check_usb_version(self) -> None:
|
|
"""Warn if a connected RealSense negotiated USB 2.0 (needs 3.x).
|
|
|
|
Marcus has this same check — D435I on USB 2.0 can't deliver
|
|
color+depth+IMU and the pipeline silently stalls. Catching it at
|
|
startup lets the operator fix the cable/port instead of chasing a
|
|
"no frames" loop. Diagnostic only; never blocks startup.
|
|
"""
|
|
try:
|
|
import pyrealsense2 as rs # type: ignore
|
|
ctx = rs.context()
|
|
for dev in ctx.query_devices():
|
|
try:
|
|
usb_type = dev.get_info(rs.camera_info.usb_type_descriptor)
|
|
name = dev.get_info(rs.camera_info.name)
|
|
except Exception:
|
|
continue
|
|
if str(usb_type).startswith("2."):
|
|
log.warning(
|
|
"RealSense %s negotiated USB %s — expected 3.x. "
|
|
"Frame drops likely. Try a USB 3 port / shorter cable / "
|
|
"powered hub.", name, usb_type,
|
|
)
|
|
else:
|
|
log.info("RealSense %s on USB %s", name, usb_type)
|
|
except Exception:
|
|
pass
|
|
|
|
# ── backend probing ─────────────────────────────────────
|
|
|
|
def _probe_realsense(self) -> Optional[dict]:
|
|
with self._reconfig_lock:
|
|
w, h, f = self._w, self._h, self._fps
|
|
try:
|
|
import pyrealsense2 as rs # type: ignore
|
|
pipeline = rs.pipeline()
|
|
cfg = rs.config()
|
|
cfg.enable_stream(rs.stream.color, w, h, rs.format.bgr8, f)
|
|
profile = pipeline.start(cfg)
|
|
return {"name": "realsense", "pipeline": pipeline, "rs": rs,
|
|
"profile": profile}
|
|
except Exception as exc:
|
|
log.info("RealSense unavailable: %s", exc)
|
|
return None
|
|
|
|
def _open_usb_index(self, idx: int, w: int, h: int, f: int,
|
|
cv2) -> Optional[dict]:
|
|
"""Open one /dev/video<idx>, validate it yields a 3-channel frame,
|
|
and classify it as colour vs grayscale/IR.
|
|
|
|
A RealSense IR node delivers Y8 — cv2 replicates that single plane
|
|
across 3 channels, so the planes come back *bit-identical*. A real
|
|
colour sensor never produces bit-identical channels (per-channel
|
|
sensor noise differs even on a flat gray scene). That's the test.
|
|
Returns a backend dict with `is_color`, or None if the node is
|
|
unusable.
|
|
"""
|
|
cap = None
|
|
try:
|
|
cap = cv2.VideoCapture(idx)
|
|
if not cap.isOpened():
|
|
cap.release()
|
|
return None
|
|
cap.set(cv2.CAP_PROP_FRAME_WIDTH, w)
|
|
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, h)
|
|
cap.set(cv2.CAP_PROP_FPS, f)
|
|
good = None
|
|
for _ in range(5):
|
|
ok, frame = cap.read()
|
|
if (ok and frame is not None and frame.ndim == 3
|
|
and frame.shape[2] == 3):
|
|
good = frame
|
|
break
|
|
if good is None:
|
|
cap.release()
|
|
return None
|
|
is_color = not (
|
|
np.array_equal(good[:, :, 0], good[:, :, 1])
|
|
and np.array_equal(good[:, :, 1], good[:, :, 2])
|
|
)
|
|
return {"name": "usb", "cap": cap, "cv2": cv2, "index": idx,
|
|
"is_color": is_color,
|
|
"frame_wh": (good.shape[1], good.shape[0])}
|
|
except Exception as exc:
|
|
log.info("USB camera index %d: %s", idx, exc)
|
|
if cap is not None:
|
|
try:
|
|
cap.release()
|
|
except Exception:
|
|
pass
|
|
return None
|
|
|
|
def _probe_usb(self) -> Optional[dict]:
|
|
"""Scan /dev/video* for a colour camera node, falling back to a
|
|
grayscale/IR node only if no colour node exists.
|
|
|
|
On a RealSense, /dev/video0 is the *depth* stream (Z16, cv2 can't
|
|
open it as a webcam); the IR nodes deliver Y8 (grayscale); the
|
|
*colour* node delivers YUYV/BGR. We can't know the index up front,
|
|
so we probe each and prefer the first genuine colour node — that's
|
|
why the dashboard preview used to come up grayscale. Pin a node
|
|
with SANAD_CAMERA_USB_INDEX=<n> to skip the scan entirely.
|
|
"""
|
|
with self._reconfig_lock:
|
|
w, h, f = self._w, self._h, self._fps
|
|
try:
|
|
import cv2 # type: ignore
|
|
except Exception as exc:
|
|
log.info("USB camera unavailable: %s", exc)
|
|
return None
|
|
|
|
# Pinned index — accept whatever it is (colour or not).
|
|
explicit = os.environ.get("SANAD_CAMERA_USB_INDEX", "").strip()
|
|
if explicit.isdigit():
|
|
backend = self._open_usb_index(int(explicit), w, h, f, cv2)
|
|
if backend is not None:
|
|
fw, fh = backend["frame_wh"]
|
|
log.info("USB camera: pinned /dev/video%d (%dx%d, %s)",
|
|
backend["index"], fw, fh,
|
|
"colour" if backend["is_color"] else "grayscale/IR")
|
|
return backend
|
|
log.warning("USB camera: pinned index %s unusable", explicit)
|
|
return None
|
|
|
|
# Scan — prefer a real colour node; keep the first grayscale node
|
|
# as a last resort so the camera still works if that's all there is.
|
|
gray_fallback: Optional[dict] = None
|
|
for idx in range(_USB_SCAN_RANGE):
|
|
backend = self._open_usb_index(idx, w, h, f, cv2)
|
|
if backend is None:
|
|
continue
|
|
fw, fh = backend["frame_wh"]
|
|
if backend["is_color"]:
|
|
log.info("USB camera: using /dev/video%d (colour, %dx%d)",
|
|
idx, fw, fh)
|
|
if gray_fallback is not None:
|
|
try:
|
|
gray_fallback["cap"].release()
|
|
except Exception:
|
|
pass
|
|
return backend
|
|
# grayscale/IR — remember the first, release any extras
|
|
if gray_fallback is None:
|
|
gray_fallback = backend
|
|
else:
|
|
try:
|
|
backend["cap"].release()
|
|
except Exception:
|
|
pass
|
|
|
|
if gray_fallback is not None:
|
|
fw, fh = gray_fallback["frame_wh"]
|
|
log.warning("USB camera: no colour node found — falling back to "
|
|
"/dev/video%d (grayscale/IR, %dx%d). For a RealSense, "
|
|
"build pyrealsense2 or pin the colour node with "
|
|
"SANAD_CAMERA_USB_INDEX.", gray_fallback["index"], fw, fh)
|
|
return gray_fallback
|
|
|
|
log.info("USB camera unavailable: no working /dev/video* node found "
|
|
"(scanned %d indices)", _USB_SCAN_RANGE)
|
|
return None
|
|
|
|
# ── main capture loop ───────────────────────────────────
|
|
|
|
def _reconnect_loop(self, initial_backend: dict) -> None:
|
|
"""Outer loop — owns reconnect with exponential backoff.
|
|
|
|
Inner `_capture_session` runs until the camera goes stale, the
|
|
stop flag is set, or a reconfigure is requested. On stall we
|
|
sleep + re-probe; on reconfigure we re-probe immediately at the
|
|
new resolution. Backoff resets after a successful session.
|
|
"""
|
|
backend = initial_backend
|
|
backoff = self._reconnect_min_s
|
|
|
|
while not self._stop.is_set():
|
|
reconfigured = False
|
|
try:
|
|
reconfigured = self._capture_session(backend)
|
|
except Exception as exc:
|
|
log.exception("Camera capture session crashed: %s", exc)
|
|
self._error = str(exc)
|
|
finally:
|
|
self._teardown(backend)
|
|
|
|
if self._stop.is_set():
|
|
break
|
|
|
|
if reconfigured:
|
|
# Fast path — rebuild immediately at the new profile.
|
|
with self._reconfig_lock:
|
|
self._reconfig_pending = False
|
|
new_backend = self._probe_any()
|
|
if new_backend is None:
|
|
self._error = "reconnecting"
|
|
log.warning("Camera reconfigure: re-probe failed — "
|
|
"retrying in %.1fs", backoff)
|
|
if self._stop.wait(backoff):
|
|
break
|
|
backoff = min(backoff * 2, self._reconnect_max_s)
|
|
continue
|
|
self._backend = new_backend["name"]
|
|
self._error = None
|
|
backend = new_backend
|
|
backoff = self._reconnect_min_s
|
|
log.info("Camera rebuilt after reconfigure (backend=%s)",
|
|
self._backend)
|
|
continue
|
|
|
|
# Capture session ended unexpectedly (stall / crash). Sleep + re-probe.
|
|
self._error = "reconnecting"
|
|
log.warning("Camera disconnected — reconnecting in %.1fs", backoff)
|
|
if self._stop.wait(backoff): # interruptible sleep
|
|
break
|
|
backoff = min(backoff * 2, self._reconnect_max_s)
|
|
|
|
new_backend = self._probe_any()
|
|
if new_backend is None:
|
|
self._backend = None
|
|
continue # stay in the loop; next iteration retries
|
|
self._backend = new_backend["name"]
|
|
self._reconnect_count += 1
|
|
self._error = None
|
|
log.info("Camera reconnected (backend=%s, attempt #%d)",
|
|
self._backend, self._reconnect_count)
|
|
backend = new_backend
|
|
backoff = self._reconnect_min_s # reset on success
|
|
|
|
def _capture_session(self, backend: dict) -> bool:
|
|
"""Inner capture loop — runs until stop, stale-frame timeout, or
|
|
a reconfigure request.
|
|
|
|
Returns True if it exited because of a reconfigure (caller rebuilds
|
|
immediately), False on a stall or clean stop.
|
|
"""
|
|
import cv2 # always available — used for JPEG encode
|
|
|
|
with self._reconfig_lock:
|
|
encode_params = [int(cv2.IMWRITE_JPEG_QUALITY), self._q]
|
|
last_frame_time = time.time()
|
|
consecutive_failures = 0
|
|
|
|
while not self._stop.is_set():
|
|
if self._reconfig_pending:
|
|
log.info("Camera reconfigure requested — rebuilding pipeline")
|
|
return True
|
|
|
|
bgr = self._read_frame(backend)
|
|
if bgr is None:
|
|
consecutive_failures += 1
|
|
age = time.time() - last_frame_time
|
|
if age > self._stale_s:
|
|
log.warning(
|
|
"Camera stalled %.1fs (%d consecutive timeouts) — "
|
|
"rebuilding pipeline", age, consecutive_failures,
|
|
)
|
|
return False
|
|
# Intermediate warnings so degradation is visible early
|
|
if consecutive_failures in (3, 10, 30):
|
|
log.warning("Camera slow (%d failures, age %.1fs)",
|
|
consecutive_failures, age)
|
|
time.sleep(0.05)
|
|
continue
|
|
|
|
try:
|
|
ok, buf = cv2.imencode(".jpg", bgr, encode_params)
|
|
except Exception as exc:
|
|
log.warning("JPEG encode failed: %s", exc)
|
|
continue
|
|
if not ok:
|
|
continue
|
|
jpeg = bytes(buf)
|
|
b64 = base64.b64encode(jpeg).decode("ascii")
|
|
now = time.time()
|
|
with self._lock:
|
|
self._latest_jpeg = jpeg
|
|
self._latest_b64 = b64
|
|
self._latest_ts = now
|
|
self._frame_seq += 1
|
|
last_frame_time = now
|
|
consecutive_failures = 0
|
|
|
|
return False
|
|
|
|
def _read_frame(self, backend: dict) -> Optional[np.ndarray]:
|
|
name = backend["name"]
|
|
if name == "realsense":
|
|
try:
|
|
frames = backend["pipeline"].wait_for_frames(
|
|
timeout_ms=self._capture_timeout_ms,
|
|
)
|
|
color = frames.get_color_frame()
|
|
if not color:
|
|
return None
|
|
return np.asanyarray(color.get_data())
|
|
except Exception:
|
|
# Soft path — single timeouts handled by _capture_session's
|
|
# stale-detection logic; don't spam the log per frame.
|
|
return None
|
|
elif name == "usb":
|
|
cap = backend["cap"]
|
|
ok, frame = cap.read()
|
|
if not ok or frame is None:
|
|
return None
|
|
return frame
|
|
return None
|
|
|
|
def _teardown(self, backend: dict) -> None:
|
|
name = backend.get("name")
|
|
try:
|
|
if name == "realsense":
|
|
backend["pipeline"].stop()
|
|
elif name == "usb":
|
|
backend["cap"].release()
|
|
except Exception as exc:
|
|
log.info("Camera teardown: %s", exc)
|