1171 lines
43 KiB
Python
1171 lines
43 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Direct camera service.
|
|
|
|
Purpose:
|
|
- Main camera backend for AI_Photographer.
|
|
- Does not use teleimager streaming.
|
|
- Opens the camera directly through RealSense SDK or OpenCV.
|
|
- Saves captured images into the configured output folder.
|
|
- Provides the web UI/API used for preview, capture, download, and delete.
|
|
|
|
Run:
|
|
python3 Core/direct_camera_service.py
|
|
|
|
Optional:
|
|
python3 Core/direct_camera_service.py --camera auto --port 8091
|
|
python3 Core/direct_camera_service.py --camera realsense --port 8091
|
|
python3 Core/direct_camera_service.py --camera /dev/video0 --port 8091
|
|
CAMERA_DEVICE=/dev/video2 python3 Core/direct_camera_service.py
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
from datetime import datetime
|
|
import io
|
|
import json
|
|
import mimetypes
|
|
import os
|
|
import re
|
|
import socket
|
|
import threading
|
|
import time
|
|
import urllib.parse
|
|
import zipfile
|
|
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
|
from pathlib import Path
|
|
from typing import Optional, Union
|
|
|
|
import cv2
|
|
import numpy as np
|
|
|
|
try:
|
|
import pyrealsense2 as rs # type: ignore
|
|
_HAS_RS = True
|
|
except Exception:
|
|
rs = None # type: ignore
|
|
_HAS_RS = False
|
|
|
|
|
|
BASE_DIR = Path(__file__).resolve().parent.parent
|
|
DEFAULT_SAMPLES_DIR = BASE_DIR / "photos" / "samples"
|
|
WEB_DIR = BASE_DIR / "Web"
|
|
DIRECT_CAMERA_HTML = WEB_DIR / "direct_camera.html"
|
|
DIRECT_CAMERA_CSS = WEB_DIR / "direct_camera.css"
|
|
DIRECT_CAMERA_JS = WEB_DIR / "direct_camera.js"
|
|
CONFIG_JSON = BASE_DIR / "Data" / "Settings" / "config.json"
|
|
LEGACY_CONFIG_JSONS = (
|
|
BASE_DIR / "Data" / "config.json",
|
|
BASE_DIR / "Scripts" / "config.json",
|
|
)
|
|
DEFAULT_PREFERRED_REALSENSE_SERIAL = "243622071722"
|
|
|
|
|
|
def safe_name(name: str) -> str:
|
|
return name.strip().replace("\\", "/").split("/")[-1]
|
|
|
|
|
|
def build_ordered_datetime_name(out_dir: Path, prefix: str, ext: str) -> Path:
|
|
safe_prefix = re.sub(r"[^A-Za-z0-9_-]+", "_", (prefix or "").strip()).strip("_") or "photo"
|
|
safe_ext = (ext or "jpg").strip().lstrip(".") or "jpg"
|
|
order = sum(
|
|
1
|
|
for p in out_dir.iterdir()
|
|
if p.is_file() and p.suffix.lower() in (".jpg", ".jpeg", ".png", ".webp")
|
|
) + 1
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
return out_dir / f"{order:04d}_{timestamp}_{safe_prefix}.{safe_ext}"
|
|
|
|
|
|
def list_photos(folder: Path) -> list[dict]:
|
|
items = []
|
|
folder.mkdir(parents=True, exist_ok=True)
|
|
for p in sorted(folder.iterdir(), key=lambda x: x.stat().st_mtime, reverse=True):
|
|
if not p.is_file() or p.suffix.lower() not in (".jpg", ".jpeg", ".png", ".webp"):
|
|
continue
|
|
st = p.stat()
|
|
items.append({"name": p.name, "size": st.st_size, "mtime": st.st_mtime})
|
|
return items
|
|
|
|
|
|
def get_local_ip() -> str:
|
|
try:
|
|
host_info = socket.gethostbyname_ex(socket.gethostname())
|
|
for ip in host_info[2]:
|
|
if ip and not ip.startswith("127."):
|
|
return ip
|
|
except Exception:
|
|
pass
|
|
|
|
for target in ("10.255.255.255", "192.168.1.1", "8.8.8.8"):
|
|
s = None
|
|
try:
|
|
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
s.settimeout(0.2)
|
|
s.connect((target, 80))
|
|
ip = s.getsockname()[0]
|
|
if ip:
|
|
return ip
|
|
except Exception:
|
|
pass
|
|
finally:
|
|
try:
|
|
if s is not None:
|
|
s.close()
|
|
except Exception:
|
|
pass
|
|
|
|
try:
|
|
return socket.gethostbyname("localhost")
|
|
except Exception:
|
|
return "127.0.0.1"
|
|
|
|
|
|
def list_video_devices() -> list[str]:
|
|
return [str(p) for p in sorted(Path("/dev").glob("video*"))]
|
|
|
|
|
|
def list_video_device_info() -> list[dict]:
|
|
items = []
|
|
for devnode in sorted(Path("/dev").glob("video*")):
|
|
sysdir = Path("/sys/class/video4linux") / devnode.name
|
|
items.append(
|
|
{
|
|
"value": str(devnode),
|
|
"label": str(devnode),
|
|
"name": _read_text(sysdir / "name"),
|
|
}
|
|
)
|
|
return items
|
|
|
|
|
|
def _read_text(path: Path) -> str:
|
|
try:
|
|
return path.read_text(encoding="utf-8", errors="ignore").strip()
|
|
except Exception:
|
|
return ""
|
|
|
|
|
|
def list_realsense_video_devices() -> list[str]:
|
|
matches = []
|
|
for devnode in sorted(Path("/dev").glob("video*")):
|
|
sysdir = Path("/sys/class/video4linux") / devnode.name
|
|
name = _read_text(sysdir / "name").lower()
|
|
usb_dir = (sysdir / "device").resolve()
|
|
vendor = ""
|
|
for probe in [usb_dir, usb_dir.parent, usb_dir.parent.parent]:
|
|
if probe == probe.parent:
|
|
break
|
|
vendor = _read_text(probe / "idVendor")
|
|
if vendor:
|
|
break
|
|
if "realsense" in name and vendor.lower() in ("8086", "32902"):
|
|
matches.append(str(devnode))
|
|
return matches
|
|
|
|
|
|
def list_realsense_serials() -> list[str]:
|
|
if not _HAS_RS:
|
|
return []
|
|
try:
|
|
ctx = rs.context()
|
|
devices = ctx.query_devices()
|
|
serials = []
|
|
for dev in devices:
|
|
try:
|
|
serials.append(str(dev.get_info(rs.camera_info.serial_number)))
|
|
except Exception:
|
|
continue
|
|
return serials
|
|
except Exception:
|
|
return []
|
|
|
|
|
|
def list_realsense_devices() -> list[dict]:
|
|
if not _HAS_RS:
|
|
return []
|
|
items = []
|
|
try:
|
|
ctx = rs.context()
|
|
for dev in ctx.query_devices():
|
|
try:
|
|
name = str(dev.get_info(rs.camera_info.name))
|
|
except Exception:
|
|
name = "Intel RealSense"
|
|
try:
|
|
serial = str(dev.get_info(rs.camera_info.serial_number))
|
|
except Exception:
|
|
serial = ""
|
|
try:
|
|
usb_type = str(dev.get_info(rs.camera_info.usb_type_descriptor))
|
|
except Exception:
|
|
usb_type = ""
|
|
try:
|
|
physical_port = str(dev.get_info(rs.camera_info.physical_port))
|
|
except Exception:
|
|
physical_port = ""
|
|
items.append(
|
|
{
|
|
"name": name,
|
|
"serial": serial,
|
|
"usb_type": usb_type,
|
|
"physical_port": physical_port,
|
|
"value": f"realsense:{serial}" if serial else "realsense",
|
|
"label": f"{name} [{serial}]" if serial else name,
|
|
}
|
|
)
|
|
except Exception:
|
|
return []
|
|
return items
|
|
|
|
|
|
def parse_realsense_source(source: Union[str, int, None]) -> Optional[str]:
|
|
if not isinstance(source, str):
|
|
return None
|
|
s = source.strip()
|
|
if s.lower().startswith("realsense:"):
|
|
serial = s.split(":", 1)[1].strip()
|
|
return serial or None
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def read_teleimager_realsense_serial() -> str:
|
|
candidates = []
|
|
env_cfg = os.environ.get("TELEIMAGER_CONFIG", "").strip()
|
|
if env_cfg:
|
|
candidates.append(Path(env_cfg).expanduser())
|
|
home = Path.home()
|
|
script_dir = Path(__file__).resolve().parent
|
|
project_root = script_dir.parent
|
|
candidates.append(home / "teleimager" / "cam_config_server.yaml")
|
|
candidates.append(project_root.parent / "teleimager" / "cam_config_server.yaml")
|
|
candidates.append(script_dir / "cam_config_server.yaml")
|
|
candidates.append(home / "Robot-Unitree Main Folders" / "teleimager" / "cam_config_server.yaml")
|
|
|
|
for path in candidates:
|
|
try:
|
|
if not path.exists():
|
|
continue
|
|
text = path.read_text(encoding="utf-8", errors="ignore")
|
|
m = re.search(r"serial_number:\s*\"?([0-9A-Za-z_-]+)\"?", text)
|
|
if m:
|
|
return m.group(1).strip()
|
|
except Exception:
|
|
continue
|
|
return ""
|
|
|
|
|
|
def read_config_preferred_realsense_serial() -> str:
|
|
for path in (CONFIG_JSON, *LEGACY_CONFIG_JSONS):
|
|
try:
|
|
if not path.exists():
|
|
continue
|
|
raw = json.loads(path.read_text(encoding="utf-8"))
|
|
if not isinstance(raw, dict):
|
|
continue
|
|
camera_cfg = raw.get("camera")
|
|
if not isinstance(camera_cfg, dict):
|
|
continue
|
|
serial = str(camera_cfg.get("preferred_realsense_serial", "")).strip()
|
|
if serial:
|
|
return serial
|
|
except Exception:
|
|
continue
|
|
return ""
|
|
|
|
|
|
def resolve_preferred_realsense_serial_snapshot() -> tuple[str, str, str]:
|
|
forced = os.environ.get("REALSENSE_SERIAL", "").strip()
|
|
env_preferred = os.environ.get("PREFERRED_REALSENSE_SERIAL", "").strip()
|
|
config_preferred = read_config_preferred_realsense_serial()
|
|
configured = read_teleimager_realsense_serial()
|
|
|
|
if forced:
|
|
preferred = forced
|
|
elif env_preferred:
|
|
preferred = env_preferred
|
|
elif config_preferred:
|
|
preferred = config_preferred
|
|
else:
|
|
preferred = DEFAULT_PREFERRED_REALSENSE_SERIAL
|
|
|
|
return preferred, config_preferred, configured
|
|
|
|
|
|
def write_config_preferred_realsense_serial(serial: str) -> str:
|
|
raw = {}
|
|
for cfg_path in (CONFIG_JSON, *LEGACY_CONFIG_JSONS):
|
|
try:
|
|
if cfg_path.exists():
|
|
loaded = json.loads(cfg_path.read_text(encoding="utf-8"))
|
|
if isinstance(loaded, dict):
|
|
raw = loaded
|
|
break
|
|
except Exception:
|
|
pass
|
|
|
|
camera_cfg = raw.get("camera")
|
|
if not isinstance(camera_cfg, dict):
|
|
camera_cfg = {}
|
|
camera_cfg["preferred_realsense_serial"] = str(serial or "").strip()
|
|
raw["camera"] = camera_cfg
|
|
CONFIG_JSON.parent.mkdir(parents=True, exist_ok=True)
|
|
CONFIG_JSON.write_text(json.dumps(raw, ensure_ascii=False, indent=2), encoding="utf-8")
|
|
return str(camera_cfg["preferred_realsense_serial"])
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class DirectCamera:
|
|
def __init__(
|
|
self,
|
|
source: Optional[Union[str, int]] = None,
|
|
width: int = 960,
|
|
height: int = 540,
|
|
fps: int = 15,
|
|
preview_quality: int = 60,
|
|
):
|
|
self.source = self._resolve_source(source)
|
|
self.width = int(width)
|
|
self.height = int(height)
|
|
self.fps = int(fps)
|
|
self.preview_quality = int(preview_quality)
|
|
|
|
self._stop = threading.Event()
|
|
self._thread: Optional[threading.Thread] = None
|
|
self._cap = None
|
|
self._rs_pipeline = None
|
|
self._rs_align = None
|
|
self._lock = threading.Lock()
|
|
self._latest_frame = None
|
|
self._latest_jpeg: Optional[bytes] = None
|
|
self._last_error = ""
|
|
self._frame_ts = 0.0
|
|
self._camera_source_in_use: Optional[Union[str, int]] = None
|
|
self._backend = "opencv"
|
|
self._active_profile: Optional[str] = None
|
|
self._preferred_realsense_serial = ""
|
|
self._config_realsense_serial = ""
|
|
self._configured_realsense_serial = ""
|
|
self._realsense_serial = self._preferred_realsense_serial
|
|
self._reopen_after = 0.0
|
|
self._reopen_delay_sec = 2.0
|
|
self._consecutive_read_failures = 0
|
|
self._max_read_failures_before_reopen = int(os.environ.get("CAMERA_MAX_READ_FAILS", "20"))
|
|
self._refresh_realsense_preferences()
|
|
|
|
def _refresh_realsense_preferences(self):
|
|
preferred, config_preferred, configured = resolve_preferred_realsense_serial_snapshot()
|
|
self._preferred_realsense_serial = preferred
|
|
self._config_realsense_serial = config_preferred
|
|
self._configured_realsense_serial = configured
|
|
if not self._realsense_serial:
|
|
self._realsense_serial = preferred
|
|
|
|
def _resolve_source(self, source: Optional[Union[str, int]]) -> Union[str, int]:
|
|
if source is None:
|
|
env_device = os.environ.get("CAMERA_DEVICE", "").strip()
|
|
if env_device:
|
|
source = env_device
|
|
else:
|
|
source = os.environ.get("CAMERA_INDEX", "auto").strip() or "auto"
|
|
|
|
if isinstance(source, str):
|
|
s = source.strip()
|
|
if s.lower() == "auto":
|
|
return "auto"
|
|
if s.lower() in ("realsense", "rs"):
|
|
return "realsense"
|
|
if s.lower().startswith("realsense:"):
|
|
serial = s.split(":", 1)[1].strip()
|
|
return f"realsense:{serial}" if serial else "realsense"
|
|
try:
|
|
return int(s)
|
|
except ValueError:
|
|
return s
|
|
return int(source)
|
|
|
|
def _candidate_sources(self) -> list[Union[str, int]]:
|
|
src = self.source
|
|
out: list[Union[str, int]] = [src]
|
|
|
|
if src == "realsense" or (isinstance(src, str) and src.startswith("realsense:")):
|
|
for sp in list_realsense_video_devices():
|
|
if sp not in out:
|
|
out.append(sp)
|
|
return out
|
|
|
|
if isinstance(src, str) and src.startswith("/dev/video"):
|
|
try:
|
|
idx = int(src.replace("/dev/video", ""))
|
|
out.append(idx)
|
|
except Exception:
|
|
pass
|
|
|
|
scan_env = os.environ.get("CAMERA_FALLBACK_SCAN", "").strip().lower()
|
|
if scan_env:
|
|
do_scan = scan_env not in ("0", "false", "no", "off")
|
|
else:
|
|
do_scan = src == "auto"
|
|
if src == "auto" or do_scan:
|
|
if _HAS_RS and "realsense" not in out:
|
|
out.append("realsense")
|
|
for p in sorted(Path("/dev").glob("video*")):
|
|
sp = str(p)
|
|
if sp not in out:
|
|
out.append(sp)
|
|
try:
|
|
idx = int(sp.replace("/dev/video", ""))
|
|
if idx not in out:
|
|
out.append(idx)
|
|
except Exception:
|
|
pass
|
|
for idx in range(6):
|
|
if idx not in out:
|
|
out.append(idx)
|
|
return out
|
|
|
|
def _open_capture(self, source: Union[str, int]):
|
|
if isinstance(source, str) and source.startswith("/dev/video"):
|
|
return cv2.VideoCapture(source, cv2.CAP_V4L2)
|
|
return cv2.VideoCapture(source)
|
|
|
|
def _looks_rgb(self, frame) -> bool:
|
|
return frame is not None and getattr(frame, "ndim", 0) == 3 and frame.shape[2] == 3
|
|
|
|
def _open_realsense(self):
|
|
if not _HAS_RS:
|
|
raise RuntimeError("pyrealsense2 not installed")
|
|
self._refresh_realsense_preferences()
|
|
profiles = [
|
|
(self.width, self.height, self.fps),
|
|
(640, 480, 30),
|
|
(640, 480, 15),
|
|
(1280, 720, 30),
|
|
(1280, 720, 15),
|
|
(848, 480, 30),
|
|
(848, 480, 15),
|
|
]
|
|
available_serials = list_realsense_serials()
|
|
requested_serial = parse_realsense_source(self.source)
|
|
serial = ""
|
|
if requested_serial and requested_serial in available_serials:
|
|
serial = requested_serial
|
|
elif self._preferred_realsense_serial and self._preferred_realsense_serial in available_serials:
|
|
serial = self._preferred_realsense_serial
|
|
elif self._configured_realsense_serial and self._configured_realsense_serial in available_serials:
|
|
serial = self._configured_realsense_serial
|
|
elif self._realsense_serial and self._realsense_serial in available_serials:
|
|
serial = self._realsense_serial
|
|
elif available_serials:
|
|
serial = available_serials[0]
|
|
|
|
if serial:
|
|
self._realsense_serial = serial
|
|
tried = []
|
|
seen = set()
|
|
for w, h, fps in profiles:
|
|
key = (int(w), int(h), int(fps))
|
|
if key in seen:
|
|
continue
|
|
seen.add(key)
|
|
tried.append(f"{w}x{h}@{fps}")
|
|
pipe = rs.pipeline()
|
|
try:
|
|
cfg = rs.config()
|
|
if serial:
|
|
cfg.enable_device(serial)
|
|
cfg.enable_stream(rs.stream.color, int(w), int(h), rs.format.bgr8, int(fps))
|
|
pipe.start(cfg)
|
|
self._rs_align = rs.align(rs.stream.color)
|
|
self._realsense_serial = serial or self._realsense_serial
|
|
self._active_profile = f"{w}x{h}@{fps}"
|
|
return pipe
|
|
except Exception:
|
|
try:
|
|
pipe.stop()
|
|
except Exception:
|
|
pass
|
|
serial_note = serial or "(auto)"
|
|
raise RuntimeError(f"RealSense color stream open failed. serial={serial_note} tried: {', '.join(tried)}")
|
|
|
|
|
|
|
|
|
|
|
|
def _configure_capture(self, cap):
|
|
try:
|
|
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
|
|
except Exception:
|
|
pass
|
|
try:
|
|
cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"MJPG"))
|
|
except Exception:
|
|
pass
|
|
cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.width)
|
|
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height)
|
|
cap.set(cv2.CAP_PROP_FPS, self.fps)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _close_locked(self):
|
|
if self._rs_pipeline is not None:
|
|
try:
|
|
self._rs_pipeline.stop()
|
|
except Exception:
|
|
pass
|
|
self._rs_pipeline = None
|
|
self._rs_align = None
|
|
if self._cap is not None:
|
|
try:
|
|
self._cap.release()
|
|
except Exception:
|
|
pass
|
|
self._cap = None
|
|
self._camera_source_in_use = None
|
|
self._backend = "opencv"
|
|
self._active_profile = None
|
|
|
|
|
|
|
|
|
|
|
|
def _read_frame_locked(self):
|
|
if self._rs_pipeline is not None:
|
|
frames = self._rs_pipeline.wait_for_frames(timeout_ms=1000)
|
|
if self._rs_align is not None:
|
|
frames = self._rs_align.process(frames)
|
|
color_frame = frames.get_color_frame()
|
|
if not color_frame:
|
|
return False, None
|
|
frame = np.asanyarray(color_frame.get_data())
|
|
if frame is None or frame.size == 0:
|
|
return False, None
|
|
return True, frame
|
|
if self._cap is None:
|
|
return False, None
|
|
return self._cap.read()
|
|
|
|
def _ensure_open_locked(self) -> bool:
|
|
now = time.time()
|
|
if self._rs_pipeline is not None:
|
|
return True
|
|
if self._cap is not None and self._cap.isOpened():
|
|
return True
|
|
if now < self._reopen_after:
|
|
return False
|
|
|
|
self._last_error = "Opening camera..."
|
|
self._close_locked()
|
|
tried = []
|
|
last_open_error = ""
|
|
for source in self._candidate_sources():
|
|
tried.append(str(source))
|
|
if source == "realsense":
|
|
pipe = None
|
|
try:
|
|
pipe = self._open_realsense()
|
|
frames = pipe.wait_for_frames(timeout_ms=1500)
|
|
color_frame = frames.get_color_frame()
|
|
if not color_frame:
|
|
pipe.stop()
|
|
continue
|
|
frame = np.asanyarray(color_frame.get_data())
|
|
if frame is None or frame.size == 0:
|
|
pipe.stop()
|
|
continue
|
|
self._rs_pipeline = pipe
|
|
self._backend = "realsense"
|
|
self._camera_source_in_use = source
|
|
self._last_error = ""
|
|
self._consecutive_read_failures = 0
|
|
self._latest_frame = frame.copy()
|
|
return True
|
|
except Exception as e:
|
|
last_open_error = str(e)
|
|
self._last_error = str(e)
|
|
try:
|
|
if pipe is not None:
|
|
pipe.stop()
|
|
except Exception:
|
|
pass
|
|
continue
|
|
else:
|
|
cap = None
|
|
try:
|
|
cap = self._open_capture(source)
|
|
except Exception:
|
|
cap = None
|
|
if cap is None or not cap.isOpened():
|
|
try:
|
|
if cap is not None:
|
|
cap.release()
|
|
except Exception:
|
|
pass
|
|
continue
|
|
|
|
self._configure_capture(cap)
|
|
|
|
ok, frame = cap.read()
|
|
if not ok or frame is None:
|
|
cap.release()
|
|
continue
|
|
if self.source == "realsense" and not self._looks_rgb(frame):
|
|
cap.release()
|
|
continue
|
|
|
|
self._cap = cap
|
|
self._backend = "opencv"
|
|
self._camera_source_in_use = source
|
|
self._last_error = ""
|
|
self._consecutive_read_failures = 0
|
|
return True
|
|
|
|
self._last_error = last_open_error or f"Could not open camera. Tried: {', '.join(tried)}"
|
|
self._reopen_after = now + self._reopen_delay_sec
|
|
return False
|
|
|
|
def start(self):
|
|
if self._thread and self._thread.is_alive():
|
|
return
|
|
self._stop.clear()
|
|
self._thread = threading.Thread(target=self._run, daemon=True)
|
|
self._thread.start()
|
|
|
|
def stop(self):
|
|
self._stop.set()
|
|
if self._thread:
|
|
self._thread.join(timeout=1.0)
|
|
with self._lock:
|
|
self._close_locked()
|
|
|
|
def _run(self):
|
|
while not self._stop.is_set():
|
|
frame = None
|
|
frame_ts = 0.0
|
|
with self._lock:
|
|
if not self._ensure_open_locked():
|
|
time.sleep(0.2)
|
|
continue
|
|
try:
|
|
ok, frame = self._read_frame_locked()
|
|
if not ok or frame is None:
|
|
self._consecutive_read_failures += 1
|
|
self._last_error = f"Camera read failed ({self._consecutive_read_failures}/{self._max_read_failures_before_reopen})"
|
|
if self._consecutive_read_failures < self._max_read_failures_before_reopen:
|
|
time.sleep(0.05)
|
|
continue
|
|
self._close_locked()
|
|
self._reopen_after = time.time() + self._reopen_delay_sec
|
|
self._consecutive_read_failures = 0
|
|
time.sleep(0.2)
|
|
continue
|
|
self._consecutive_read_failures = 0
|
|
self._latest_frame = frame.copy()
|
|
frame_ts = time.time()
|
|
except Exception as e:
|
|
self._last_error = str(e)
|
|
self._close_locked()
|
|
self._reopen_after = time.time() + self._reopen_delay_sec
|
|
self._consecutive_read_failures = 0
|
|
frame = None
|
|
|
|
if frame is not None:
|
|
try:
|
|
ok, jpg = cv2.imencode(".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), self.preview_quality])
|
|
if ok and jpg is not None:
|
|
with self._lock:
|
|
self._latest_jpeg = jpg.tobytes()
|
|
self._frame_ts = frame_ts
|
|
except Exception:
|
|
pass
|
|
|
|
time.sleep(max(0.02, 1.0 / max(self.fps, 1)))
|
|
|
|
def latest_jpeg(self) -> Optional[bytes]:
|
|
return self._latest_jpeg
|
|
|
|
def capture(self, out_dir: Path, prefix: str = "sample", ext: str = "jpg") -> Path:
|
|
out_dir.mkdir(parents=True, exist_ok=True)
|
|
if not self._lock.acquire(timeout=0.5):
|
|
raise RuntimeError("camera busy")
|
|
try:
|
|
if not self._ensure_open_locked():
|
|
raise RuntimeError(self._last_error or "camera unavailable")
|
|
|
|
frame = self._latest_frame
|
|
if frame is None:
|
|
ok, frame = self._read_frame_locked()
|
|
if not ok or frame is None:
|
|
raise RuntimeError("camera read failed during capture")
|
|
|
|
ok, jpg = cv2.imencode(".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), 95])
|
|
if not ok or jpg is None:
|
|
raise RuntimeError("cv2.imencode failed")
|
|
|
|
out_path = build_ordered_datetime_name(out_dir, prefix, ext)
|
|
out_path.write_bytes(jpg.tobytes())
|
|
return out_path
|
|
finally:
|
|
self._lock.release()
|
|
|
|
def reconfigure(self, width: int, height: int, fps: int) -> dict:
|
|
width = int(width)
|
|
height = int(height)
|
|
fps = int(fps)
|
|
if width < 1 or height < 1 or fps < 1:
|
|
raise RuntimeError("width, height, and fps must be positive integers")
|
|
|
|
if not self._lock.acquire(timeout=1.0):
|
|
raise RuntimeError("camera busy")
|
|
try:
|
|
self.width = width
|
|
self.height = height
|
|
self.fps = fps
|
|
self._last_error = f"Reconfiguring camera to {width}x{height}@{fps}..."
|
|
self._latest_frame = None
|
|
self._latest_jpeg = None
|
|
self._frame_ts = 0.0
|
|
self._close_locked()
|
|
self._reopen_after = 0.0
|
|
self._consecutive_read_failures = 0
|
|
finally:
|
|
self._lock.release()
|
|
|
|
deadline = time.time() + 6.0
|
|
while time.time() < deadline and not self._stop.is_set():
|
|
status = self.status()
|
|
if status["ok"]:
|
|
return status
|
|
time.sleep(0.05)
|
|
return self.status()
|
|
|
|
def set_source(self, source: Union[str, int]) -> dict:
|
|
resolved = self._resolve_source(source)
|
|
if not self._lock.acquire(timeout=1.0):
|
|
raise RuntimeError("camera busy")
|
|
try:
|
|
self.source = resolved
|
|
self._last_error = f"Switching camera source to {resolved}..."
|
|
self._latest_frame = None
|
|
self._latest_jpeg = None
|
|
self._frame_ts = 0.0
|
|
self._close_locked()
|
|
self._reopen_after = 0.0
|
|
self._consecutive_read_failures = 0
|
|
finally:
|
|
self._lock.release()
|
|
|
|
deadline = time.time() + 6.0
|
|
while time.time() < deadline and not self._stop.is_set():
|
|
status = self.status()
|
|
if status["ok"]:
|
|
return status
|
|
time.sleep(0.05)
|
|
return self.status()
|
|
|
|
def set_preferred_realsense_serial(self, serial: str) -> dict:
|
|
value = str(serial or "").strip()
|
|
if not self._lock.acquire(timeout=1.0):
|
|
raise RuntimeError("camera busy")
|
|
try:
|
|
self._preferred_realsense_serial = value
|
|
self._config_realsense_serial = value
|
|
if self.source == "realsense":
|
|
self._last_error = f"Switching preferred RealSense to {value or '(auto)'}..."
|
|
self._latest_frame = None
|
|
self._latest_jpeg = None
|
|
self._frame_ts = 0.0
|
|
self._close_locked()
|
|
self._reopen_after = 0.0
|
|
self._consecutive_read_failures = 0
|
|
finally:
|
|
self._lock.release()
|
|
|
|
deadline = time.time() + 6.0
|
|
while time.time() < deadline and not self._stop.is_set():
|
|
status = self.status()
|
|
if status["ok"] or self.source != "realsense":
|
|
return status
|
|
time.sleep(0.05)
|
|
return self.status()
|
|
|
|
def status(self) -> dict:
|
|
self._refresh_realsense_preferences()
|
|
return {
|
|
"ok": bool(self._rs_pipeline is not None or (self._cap is not None and self._cap.isOpened())),
|
|
"requested_source": str(self.source),
|
|
"source": str(self._camera_source_in_use if self._camera_source_in_use is not None else self.source),
|
|
"backend": self._backend,
|
|
"profile": self._active_profile,
|
|
"requested_profile": f"{self.width}x{self.height}@{self.fps}",
|
|
"requested_width": self.width,
|
|
"requested_height": self.height,
|
|
"requested_fps": self.fps,
|
|
"preferred_realsense_serial": self._preferred_realsense_serial,
|
|
"config_realsense_serial": self._config_realsense_serial,
|
|
"configured_realsense_serial": self._configured_realsense_serial,
|
|
"realsense_serial": self._realsense_serial,
|
|
"last_error": self._last_error,
|
|
"frame_time": self._frame_ts,
|
|
}
|
|
|
|
|
|
def static_file_response(path: Path) -> tuple[bytes, str]:
|
|
raw = path.read_bytes()
|
|
content_type, _ = mimetypes.guess_type(str(path))
|
|
return raw, content_type or "application/octet-stream"
|
|
|
|
|
|
class SamplesHandler(BaseHTTPRequestHandler):
|
|
server_version = "DirectCameraSamples/1.0"
|
|
|
|
def _json(self, payload: dict, status: int = 200):
|
|
raw = json.dumps(payload, ensure_ascii=False).encode("utf-8")
|
|
self.send_response(status)
|
|
self.send_header("Content-Type", "application/json; charset=utf-8")
|
|
self.send_header("Content-Length", str(len(raw)))
|
|
self.send_header("Cache-Control", "no-store")
|
|
self.end_headers()
|
|
self.wfile.write(raw)
|
|
|
|
def _text(self, text: str, status: int = 200, content_type: str = "text/plain; charset=utf-8"):
|
|
raw = text.encode("utf-8")
|
|
self.send_response(status)
|
|
self.send_header("Content-Type", content_type)
|
|
self.send_header("Content-Length", str(len(raw)))
|
|
self.send_header("Cache-Control", "no-store")
|
|
self.end_headers()
|
|
self.wfile.write(raw)
|
|
|
|
def _binary(self, raw: bytes, status: int = 200, content_type: str = "application/octet-stream", cache_control: str = "no-store"):
|
|
self.send_response(status)
|
|
self.send_header("Content-Type", content_type)
|
|
self.send_header("Content-Length", str(len(raw)))
|
|
self.send_header("Cache-Control", cache_control)
|
|
self.end_headers()
|
|
self.wfile.write(raw)
|
|
|
|
def log_message(self, fmt: str, *args):
|
|
path = getattr(self, "path", "")
|
|
if path.startswith("/api/health") or path.startswith("/api/photos"):
|
|
return
|
|
print(f"HTTP {self.command} {path} -> {args[1]}")
|
|
|
|
def do_GET(self):
|
|
parsed = urllib.parse.urlparse(self.path)
|
|
path = parsed.path
|
|
q = urllib.parse.parse_qs(parsed.query)
|
|
|
|
if path == "/":
|
|
raw, content_type = static_file_response(DIRECT_CAMERA_HTML)
|
|
return self._binary(raw, content_type=f"{content_type}; charset=utf-8")
|
|
|
|
if path == "/static/direct_camera.css":
|
|
raw, content_type = static_file_response(DIRECT_CAMERA_CSS)
|
|
return self._binary(raw, content_type=content_type)
|
|
|
|
if path == "/static/direct_camera.js":
|
|
raw, content_type = static_file_response(DIRECT_CAMERA_JS)
|
|
return self._binary(raw, content_type=f"{content_type}; charset=utf-8")
|
|
|
|
if path == "/api/health":
|
|
return self._json(
|
|
{
|
|
"ok": True,
|
|
"camera": self.server.camera.status(),
|
|
"samples_dir": str(self.server.samples_dir),
|
|
}
|
|
)
|
|
|
|
if path == "/api/cameras":
|
|
camera_status = self.server.camera.status()
|
|
realsense_devices = list_realsense_devices()
|
|
video_devices = list_video_device_info()
|
|
preferred_serial = str(camera_status.get("preferred_realsense_serial") or "").strip()
|
|
active_serial = str(camera_status.get("realsense_serial") or "").strip()
|
|
|
|
preferred_device = None
|
|
active_device = None
|
|
rs_items = []
|
|
for item in realsense_devices:
|
|
serial = str(item.get("serial") or "").strip()
|
|
is_preferred = bool(serial and serial == preferred_serial)
|
|
is_active = bool(serial and serial == active_serial)
|
|
if is_preferred:
|
|
preferred_device = item
|
|
if is_active:
|
|
active_device = item
|
|
label = str(item.get("label") or item.get("value") or "").strip()
|
|
tags = []
|
|
if is_preferred:
|
|
tags.append("default")
|
|
if is_active:
|
|
tags.append("live")
|
|
if tags:
|
|
label = f"{label} ({', '.join(tags)})"
|
|
rs_items.append({**item, "kind": "realsense", "is_preferred": is_preferred, "is_active": is_active, "label": label})
|
|
|
|
generic_label = "Preferred RealSense"
|
|
if preferred_serial:
|
|
generic_label += f" [{preferred_serial}]"
|
|
generic_label += " (default)"
|
|
|
|
options = [{"value": "realsense", "label": generic_label, "kind": "realsense-default"}]
|
|
options.extend(rs_items)
|
|
options.extend({**item, "kind": "video"} for item in video_devices)
|
|
options.append({"value": "auto", "label": "Auto fallback", "kind": "auto"})
|
|
return self._json(
|
|
{
|
|
"ok": True,
|
|
"selected_source": str(self.server.camera.source),
|
|
"active_source": str(camera_status.get("source") or self.server.camera.source),
|
|
"preferred_realsense_serial": preferred_serial,
|
|
"configured_realsense_serial": str(camera_status.get("configured_realsense_serial") or ""),
|
|
"config_realsense_serial": str(camera_status.get("config_realsense_serial") or ""),
|
|
"realsense_devices": rs_items,
|
|
"video_devices": video_devices,
|
|
"preferred_device": preferred_device,
|
|
"active_device": active_device,
|
|
"options": options,
|
|
}
|
|
)
|
|
|
|
if path == "/api/photos":
|
|
return self._json({"items": list_photos(self.server.samples_dir)})
|
|
|
|
if path == "/api/capture":
|
|
try:
|
|
prefix = ((q.get("prefix") or [self.server.capture_prefix])[0] or self.server.capture_prefix).strip() or self.server.capture_prefix
|
|
ext = ((q.get("ext") or [self.server.capture_ext])[0] or self.server.capture_ext).strip().lstrip(".") or self.server.capture_ext
|
|
saved = self.server.camera.capture(self.server.samples_dir, prefix=prefix, ext=ext)
|
|
return self._json({"ok": True, "name": saved.name, "path": str(saved)})
|
|
except Exception as e:
|
|
return self._json({"ok": False, "error": str(e)}, status=500)
|
|
|
|
if path == "/api/frame.jpg":
|
|
jpg = self.server.camera.latest_jpeg()
|
|
if jpg is None:
|
|
return self._text("camera not ready", status=503)
|
|
self.send_response(200)
|
|
self.send_header("Content-Type", "image/jpeg")
|
|
self.send_header("Content-Length", str(len(jpg)))
|
|
self.send_header("Cache-Control", "no-store")
|
|
self.end_headers()
|
|
self.wfile.write(jpg)
|
|
return
|
|
|
|
if path == "/api/set_resolution":
|
|
try:
|
|
width = int((q.get("width") or [""])[0])
|
|
height = int((q.get("height") or [""])[0])
|
|
fps = int((q.get("fps") or [""])[0])
|
|
except Exception:
|
|
return self._json({"ok": False, "error": "width, height, and fps are required integers"}, status=400)
|
|
try:
|
|
status = self.server.camera.reconfigure(width, height, fps)
|
|
return self._json({"ok": True, "camera": status})
|
|
except Exception as e:
|
|
return self._json({"ok": False, "error": str(e)}, status=500)
|
|
|
|
if path == "/api/set_camera":
|
|
source = (q.get("source") or [""])[0].strip()
|
|
if not source:
|
|
return self._json({"ok": False, "error": "missing source"}, status=400)
|
|
try:
|
|
status = self.server.camera.set_source(source)
|
|
return self._json({"ok": True, "camera": status})
|
|
except Exception as e:
|
|
return self._json({"ok": False, "error": str(e)}, status=500)
|
|
|
|
if path == "/api/set_preferred_camera":
|
|
serial = (q.get("serial") or [""])[0].strip()
|
|
try:
|
|
saved = write_config_preferred_realsense_serial(serial)
|
|
status = self.server.camera.set_preferred_realsense_serial(saved)
|
|
return self._json({"ok": True, "serial": saved, "camera": status})
|
|
except Exception as e:
|
|
return self._json({"ok": False, "error": str(e)}, status=500)
|
|
|
|
if path == "/api/delete":
|
|
name = safe_name((q.get("name") or [""])[0])
|
|
if not name:
|
|
return self._json({"ok": False, "error": "missing name"}, status=400)
|
|
p = (self.server.samples_dir / name).resolve()
|
|
if p.parent != self.server.samples_dir.resolve() or not p.exists():
|
|
return self._json({"ok": False, "error": "not found"}, status=404)
|
|
try:
|
|
p.unlink()
|
|
return self._json({"ok": True, "name": name})
|
|
except Exception as e:
|
|
return self._json({"ok": False, "error": str(e)}, status=500)
|
|
|
|
if path == "/api/download":
|
|
name = safe_name((q.get("name") or [""])[0])
|
|
if not name:
|
|
return self._text("missing name", status=400)
|
|
p = (self.server.samples_dir / name).resolve()
|
|
if p.parent != self.server.samples_dir.resolve() or not p.exists():
|
|
return self._text("not found", status=404)
|
|
data = p.read_bytes()
|
|
self.send_response(200)
|
|
self.send_header("Content-Type", "application/octet-stream")
|
|
self.send_header("Content-Length", str(len(data)))
|
|
self.send_header("Content-Disposition", f'attachment; filename="{p.name}"')
|
|
self.end_headers()
|
|
self.wfile.write(data)
|
|
return
|
|
|
|
if path == "/api/download_all.zip":
|
|
buf = io.BytesIO()
|
|
with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
|
|
for item in list_photos(self.server.samples_dir):
|
|
p = self.server.samples_dir / item["name"]
|
|
zf.write(p, arcname=p.name)
|
|
raw = buf.getvalue()
|
|
self.send_response(200)
|
|
self.send_header("Content-Type", "application/zip")
|
|
self.send_header("Content-Length", str(len(raw)))
|
|
self.send_header("Content-Disposition", 'attachment; filename="samples.zip"')
|
|
self.end_headers()
|
|
self.wfile.write(raw)
|
|
return
|
|
|
|
if path.startswith("/samples/"):
|
|
name = safe_name(path[len("/samples/"):])
|
|
p = (self.server.samples_dir / name).resolve()
|
|
if p.parent != self.server.samples_dir.resolve() or not p.exists():
|
|
return self._text("not found", status=404)
|
|
data = p.read_bytes()
|
|
ext = p.suffix.lower()
|
|
ct = "image/jpeg" if ext in (".jpg", ".jpeg") else "image/png"
|
|
self.send_response(200)
|
|
self.send_header("Content-Type", ct)
|
|
self.send_header("Content-Length", str(len(data)))
|
|
self.send_header("Cache-Control", "no-store")
|
|
self.end_headers()
|
|
self.wfile.write(data)
|
|
return
|
|
|
|
if path == "/preview.mjpg":
|
|
self.send_response(200)
|
|
self.send_header("Age", "0")
|
|
self.send_header("Cache-Control", "no-cache, private")
|
|
self.send_header("Pragma", "no-cache")
|
|
self.send_header("Content-Type", "multipart/x-mixed-replace; boundary=frame")
|
|
self.end_headers()
|
|
try:
|
|
while True:
|
|
jpg = self.server.camera.latest_jpeg()
|
|
if jpg is None:
|
|
time.sleep(0.05)
|
|
continue
|
|
self.wfile.write(b"--frame\r\n")
|
|
self.wfile.write(b"Content-Type: image/jpeg\r\n")
|
|
self.wfile.write(f"Content-Length: {len(jpg)}\r\n\r\n".encode("ascii"))
|
|
self.wfile.write(jpg)
|
|
self.wfile.write(b"\r\n")
|
|
time.sleep(0.12)
|
|
except BrokenPipeError:
|
|
return
|
|
except ConnectionResetError:
|
|
return
|
|
except Exception:
|
|
return
|
|
|
|
return self._text("not found", status=404)
|
|
|
|
|
|
class SamplesServer(ThreadingHTTPServer):
|
|
def __init__(
|
|
self,
|
|
addr,
|
|
handler_cls,
|
|
camera: DirectCamera,
|
|
samples_dir: Path,
|
|
capture_prefix: str = "sample",
|
|
capture_ext: str = "jpg",
|
|
):
|
|
super().__init__(addr, handler_cls)
|
|
self.camera = camera
|
|
self.samples_dir = samples_dir
|
|
self.capture_prefix = capture_prefix
|
|
self.capture_ext = capture_ext
|
|
|
|
|
|
def parse_args():
|
|
parser = argparse.ArgumentParser(description="Standalone direct-camera samples server")
|
|
parser.add_argument("--host", default="0.0.0.0")
|
|
parser.add_argument("--port", type=int, default=int(os.environ.get("PORT", "8091")))
|
|
parser.add_argument("--camera", default=os.environ.get("CAMERA_DEVICE", os.environ.get("CAMERA_INDEX", "auto")))
|
|
parser.add_argument("--width", type=int, default=int(os.environ.get("FRAME_WIDTH", "960")))
|
|
parser.add_argument("--height", type=int, default=int(os.environ.get("FRAME_HEIGHT", "540")))
|
|
parser.add_argument("--fps", type=int, default=int(os.environ.get("FPS", "15")))
|
|
parser.add_argument("--samples-dir", default=os.environ.get("SAMPLES_DIR", str(DEFAULT_SAMPLES_DIR)))
|
|
parser.add_argument("--capture-prefix", default=os.environ.get("CAPTURE_PREFIX", "sample"))
|
|
parser.add_argument("--capture-ext", default=os.environ.get("CAPTURE_EXT", "jpg"))
|
|
return parser.parse_args()
|
|
|
|
|
|
def main():
|
|
args = parse_args()
|
|
samples_dir = Path(args.samples_dir).expanduser().resolve()
|
|
samples_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
if str(args.camera).strip().lower() in ("realsense", "rs") and not _HAS_RS:
|
|
raise SystemExit(
|
|
"RealSense backend requested but pyrealsense2 is not installed in this environment. "
|
|
"Run this script from the teleimager conda env or install pyrealsense2 here."
|
|
)
|
|
|
|
camera = DirectCamera(source=args.camera, width=args.width, height=args.height, fps=args.fps)
|
|
camera.start()
|
|
|
|
server = SamplesServer(
|
|
(args.host, args.port),
|
|
SamplesHandler,
|
|
camera=camera,
|
|
samples_dir=samples_dir,
|
|
capture_prefix=str(args.capture_prefix).strip() or "sample",
|
|
capture_ext=str(args.capture_ext).strip().lstrip(".") or "jpg",
|
|
)
|
|
ip = get_local_ip()
|
|
print(f"Video devices: {', '.join(list_video_devices()) or '(none found)'}")
|
|
if _HAS_RS:
|
|
print(f"RealSense serials: {', '.join(list_realsense_serials()) or '(none found)'}")
|
|
print(f"Preferred RealSense serial: {camera._preferred_realsense_serial}")
|
|
print(f"Samples dir: {samples_dir}")
|
|
print(f"Camera source: {args.camera}")
|
|
print(f"Capture naming: prefix={server.capture_prefix} ext={server.capture_ext}")
|
|
print(f"Open: http://{ip}:{args.port}/")
|
|
|
|
try:
|
|
server.serve_forever()
|
|
except KeyboardInterrupt:
|
|
pass
|
|
finally:
|
|
server.shutdown()
|
|
camera.stop()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|