from __future__ import annotations import json import os import urllib.error import urllib.parse import urllib.request from typing import Any, Optional import cv2 import numpy as np def is_enabled() -> bool: return os.environ.get("DIRECT_CAMERA_ENABLED", "1").strip().lower() not in ("0", "false", "no", "off") def base_url() -> str: return ( os.environ.get("DIRECT_CAMERA_SERVER_URL", "").strip() or f"http://127.0.0.1:{int(os.environ.get('DIRECT_CAMERA_PORT', '8091'))}" ).rstrip("/") def _open(path: str, timeout: float = 3.0): url = base_url() + path req = urllib.request.Request(url, method="GET") return urllib.request.urlopen(req, timeout=timeout) def get_json(path: str, timeout: float = 3.0) -> dict[str, Any]: with _open(path, timeout=timeout) as resp: raw = resp.read() data = json.loads(raw.decode("utf-8")) if isinstance(data, dict): return data raise RuntimeError("direct camera server returned non-object JSON") def get_bytes(path: str, timeout: float = 3.0) -> bytes: with _open(path, timeout=timeout) as resp: return resp.read() def health(timeout: float = 2.0) -> dict[str, Any]: return get_json("/api/health", timeout=timeout) def cameras(timeout: float = 2.0) -> dict[str, Any]: return get_json("/api/cameras", timeout=timeout) def capture(prefix: str = "photo", ext: str = "jpg", timeout: float = 10.0) -> str: q = urllib.parse.urlencode({"prefix": prefix, "ext": ext}) data = get_json(f"/api/capture?{q}", timeout=timeout) if not data.get("ok"): raise RuntimeError(str(data.get("error") or "capture failed")) path = str(data.get("path") or "").strip() if not path: raise RuntimeError("capture response missing saved path") return path def set_source(source: str, timeout: float = 8.0) -> dict[str, Any]: q = urllib.parse.urlencode({"source": source}) data = get_json(f"/api/set_camera?{q}", timeout=timeout) if not data.get("ok"): raise RuntimeError(str(data.get("error") or "camera switch failed")) return data def set_preferred_camera(serial: str, timeout: float = 8.0) -> dict[str, Any]: q = urllib.parse.urlencode({"serial": serial}) data = get_json(f"/api/set_preferred_camera?{q}", timeout=timeout) if not data.get("ok"): raise RuntimeError(str(data.get("error") or "preferred camera update failed")) return data def set_resolution(width: int, height: int, fps: int, timeout: float = 8.0) -> dict[str, Any]: q = urllib.parse.urlencode({"width": int(width), "height": int(height), "fps": int(fps)}) data = get_json(f"/api/set_resolution?{q}", timeout=timeout) if not data.get("ok"): raise RuntimeError(str(data.get("error") or "resolution change failed")) return data def frame_jpeg(timeout: float = 2.0) -> bytes: return get_bytes("/api/frame.jpg", timeout=timeout) def frame_bgr(timeout: float = 2.0) -> Optional[np.ndarray]: try: raw = frame_jpeg(timeout=timeout) except urllib.error.URLError: return None except Exception: return None arr = np.frombuffer(raw, dtype=np.uint8) if arr.size <= 0: return None frame = cv2.imdecode(arr, cv2.IMREAD_COLOR) return frame