104 lines
3.2 KiB
Python
104 lines
3.2 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import urllib.error
|
|
import urllib.parse
|
|
import urllib.request
|
|
from typing import Any, Optional
|
|
|
|
import cv2
|
|
import numpy as np
|
|
|
|
|
|
def is_enabled() -> bool:
|
|
return os.environ.get("DIRECT_CAMERA_ENABLED", "1").strip().lower() not in ("0", "false", "no", "off")
|
|
|
|
|
|
def base_url() -> str:
|
|
return (
|
|
os.environ.get("DIRECT_CAMERA_SERVER_URL", "").strip()
|
|
or f"http://127.0.0.1:{int(os.environ.get('DIRECT_CAMERA_PORT', '8091'))}"
|
|
).rstrip("/")
|
|
|
|
|
|
def _open(path: str, timeout: float = 3.0):
|
|
url = base_url() + path
|
|
req = urllib.request.Request(url, method="GET")
|
|
return urllib.request.urlopen(req, timeout=timeout)
|
|
|
|
|
|
def get_json(path: str, timeout: float = 3.0) -> dict[str, Any]:
|
|
with _open(path, timeout=timeout) as resp:
|
|
raw = resp.read()
|
|
data = json.loads(raw.decode("utf-8"))
|
|
if isinstance(data, dict):
|
|
return data
|
|
raise RuntimeError("direct camera server returned non-object JSON")
|
|
|
|
|
|
def get_bytes(path: str, timeout: float = 3.0) -> bytes:
|
|
with _open(path, timeout=timeout) as resp:
|
|
return resp.read()
|
|
|
|
|
|
def health(timeout: float = 2.0) -> dict[str, Any]:
|
|
return get_json("/api/health", timeout=timeout)
|
|
|
|
|
|
def cameras(timeout: float = 2.0) -> dict[str, Any]:
|
|
return get_json("/api/cameras", timeout=timeout)
|
|
|
|
|
|
def capture(prefix: str = "photo", ext: str = "jpg", timeout: float = 10.0) -> str:
|
|
q = urllib.parse.urlencode({"prefix": prefix, "ext": ext})
|
|
data = get_json(f"/api/capture?{q}", timeout=timeout)
|
|
if not data.get("ok"):
|
|
raise RuntimeError(str(data.get("error") or "capture failed"))
|
|
path = str(data.get("path") or "").strip()
|
|
if not path:
|
|
raise RuntimeError("capture response missing saved path")
|
|
return path
|
|
|
|
|
|
def set_source(source: str, timeout: float = 8.0) -> dict[str, Any]:
|
|
q = urllib.parse.urlencode({"source": source})
|
|
data = get_json(f"/api/set_camera?{q}", timeout=timeout)
|
|
if not data.get("ok"):
|
|
raise RuntimeError(str(data.get("error") or "camera switch failed"))
|
|
return data
|
|
|
|
|
|
def set_preferred_camera(serial: str, timeout: float = 8.0) -> dict[str, Any]:
|
|
q = urllib.parse.urlencode({"serial": serial})
|
|
data = get_json(f"/api/set_preferred_camera?{q}", timeout=timeout)
|
|
if not data.get("ok"):
|
|
raise RuntimeError(str(data.get("error") or "preferred camera update failed"))
|
|
return data
|
|
|
|
|
|
def set_resolution(width: int, height: int, fps: int, timeout: float = 8.0) -> dict[str, Any]:
|
|
q = urllib.parse.urlencode({"width": int(width), "height": int(height), "fps": int(fps)})
|
|
data = get_json(f"/api/set_resolution?{q}", timeout=timeout)
|
|
if not data.get("ok"):
|
|
raise RuntimeError(str(data.get("error") or "resolution change failed"))
|
|
return data
|
|
|
|
|
|
def frame_jpeg(timeout: float = 2.0) -> bytes:
|
|
return get_bytes("/api/frame.jpg", timeout=timeout)
|
|
|
|
|
|
def frame_bgr(timeout: float = 2.0) -> Optional[np.ndarray]:
|
|
try:
|
|
raw = frame_jpeg(timeout=timeout)
|
|
except urllib.error.URLError:
|
|
return None
|
|
except Exception:
|
|
return None
|
|
arr = np.frombuffer(raw, dtype=np.uint8)
|
|
if arr.size <= 0:
|
|
return None
|
|
frame = cv2.imdecode(arr, cv2.IMREAD_COLOR)
|
|
return frame
|