#!/usr/bin/env python3 """ Direct camera service. Purpose: - Main camera backend for AI_Photographer. - Does not use teleimager streaming. - Opens the camera directly through RealSense SDK or OpenCV. - Saves captured images into the configured output folder. - Provides the web UI/API used for preview, capture, download, and delete. Run: python3 Core/direct_camera_service.py Optional: python3 Core/direct_camera_service.py --camera auto --port 8091 python3 Core/direct_camera_service.py --camera realsense --port 8091 python3 Core/direct_camera_service.py --camera /dev/video0 --port 8091 CAMERA_DEVICE=/dev/video2 python3 Core/direct_camera_service.py """ from __future__ import annotations import argparse from datetime import datetime import io import json import mimetypes import os import re import socket import threading import time import urllib.parse import zipfile from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from pathlib import Path from typing import Optional, Union import cv2 import numpy as np try: import pyrealsense2 as rs # type: ignore _HAS_RS = True except Exception: rs = None # type: ignore _HAS_RS = False BASE_DIR = Path(__file__).resolve().parent.parent DEFAULT_SAMPLES_DIR = BASE_DIR / "photos" / "samples" WEB_DIR = BASE_DIR / "Web" DIRECT_CAMERA_HTML = WEB_DIR / "direct_camera.html" DIRECT_CAMERA_CSS = WEB_DIR / "direct_camera.css" DIRECT_CAMERA_JS = WEB_DIR / "direct_camera.js" CONFIG_JSON = BASE_DIR / "Data" / "Settings" / "config.json" LEGACY_CONFIG_JSONS = ( BASE_DIR / "Data" / "config.json", BASE_DIR / "Scripts" / "config.json", ) DEFAULT_PREFERRED_REALSENSE_SERIAL = "243622071722" def safe_name(name: str) -> str: return name.strip().replace("\\", "/").split("/")[-1] def build_ordered_datetime_name(out_dir: Path, prefix: str, ext: str) -> Path: safe_prefix = re.sub(r"[^A-Za-z0-9_-]+", "_", (prefix or "").strip()).strip("_") or "photo" safe_ext = (ext or "jpg").strip().lstrip(".") or "jpg" order = sum( 1 for p in out_dir.iterdir() if p.is_file() and p.suffix.lower() in (".jpg", ".jpeg", ".png", ".webp") ) + 1 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") return out_dir / f"{order:04d}_{timestamp}_{safe_prefix}.{safe_ext}" def list_photos(folder: Path) -> list[dict]: items = [] folder.mkdir(parents=True, exist_ok=True) for p in sorted(folder.iterdir(), key=lambda x: x.stat().st_mtime, reverse=True): if not p.is_file() or p.suffix.lower() not in (".jpg", ".jpeg", ".png", ".webp"): continue st = p.stat() items.append({"name": p.name, "size": st.st_size, "mtime": st.st_mtime}) return items def get_local_ip() -> str: try: host_info = socket.gethostbyname_ex(socket.gethostname()) for ip in host_info[2]: if ip and not ip.startswith("127."): return ip except Exception: pass for target in ("10.255.255.255", "192.168.1.1", "8.8.8.8"): s = None try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.settimeout(0.2) s.connect((target, 80)) ip = s.getsockname()[0] if ip: return ip except Exception: pass finally: try: if s is not None: s.close() except Exception: pass try: return socket.gethostbyname("localhost") except Exception: return "127.0.0.1" def list_video_devices() -> list[str]: return [str(p) for p in sorted(Path("/dev").glob("video*"))] def list_video_device_info() -> list[dict]: items = [] for devnode in sorted(Path("/dev").glob("video*")): sysdir = Path("/sys/class/video4linux") / devnode.name items.append( { "value": str(devnode), "label": str(devnode), "name": _read_text(sysdir / "name"), } ) return items def _read_text(path: Path) -> str: try: return path.read_text(encoding="utf-8", errors="ignore").strip() except Exception: return "" def list_realsense_video_devices() -> list[str]: matches = [] for devnode in sorted(Path("/dev").glob("video*")): sysdir = Path("/sys/class/video4linux") / devnode.name name = _read_text(sysdir / "name").lower() usb_dir = (sysdir / "device").resolve() vendor = "" for probe in [usb_dir, usb_dir.parent, usb_dir.parent.parent]: if probe == probe.parent: break vendor = _read_text(probe / "idVendor") if vendor: break if "realsense" in name and vendor.lower() in ("8086", "32902"): matches.append(str(devnode)) return matches def list_realsense_serials() -> list[str]: if not _HAS_RS: return [] try: ctx = rs.context() devices = ctx.query_devices() serials = [] for dev in devices: try: serials.append(str(dev.get_info(rs.camera_info.serial_number))) except Exception: continue return serials except Exception: return [] def list_realsense_devices() -> list[dict]: if not _HAS_RS: return [] items = [] try: ctx = rs.context() for dev in ctx.query_devices(): try: name = str(dev.get_info(rs.camera_info.name)) except Exception: name = "Intel RealSense" try: serial = str(dev.get_info(rs.camera_info.serial_number)) except Exception: serial = "" try: usb_type = str(dev.get_info(rs.camera_info.usb_type_descriptor)) except Exception: usb_type = "" try: physical_port = str(dev.get_info(rs.camera_info.physical_port)) except Exception: physical_port = "" items.append( { "name": name, "serial": serial, "usb_type": usb_type, "physical_port": physical_port, "value": f"realsense:{serial}" if serial else "realsense", "label": f"{name} [{serial}]" if serial else name, } ) except Exception: return [] return items def parse_realsense_source(source: Union[str, int, None]) -> Optional[str]: if not isinstance(source, str): return None s = source.strip() if s.lower().startswith("realsense:"): serial = s.split(":", 1)[1].strip() return serial or None return None def read_teleimager_realsense_serial() -> str: candidates = [] env_cfg = os.environ.get("TELEIMAGER_CONFIG", "").strip() if env_cfg: candidates.append(Path(env_cfg).expanduser()) home = Path.home() script_dir = Path(__file__).resolve().parent project_root = script_dir.parent candidates.append(home / "teleimager" / "cam_config_server.yaml") candidates.append(project_root.parent / "teleimager" / "cam_config_server.yaml") candidates.append(script_dir / "cam_config_server.yaml") candidates.append(home / "Robot-Unitree Main Folders" / "teleimager" / "cam_config_server.yaml") for path in candidates: try: if not path.exists(): continue text = path.read_text(encoding="utf-8", errors="ignore") m = re.search(r"serial_number:\s*\"?([0-9A-Za-z_-]+)\"?", text) if m: return m.group(1).strip() except Exception: continue return "" def read_config_preferred_realsense_serial() -> str: for path in (CONFIG_JSON, *LEGACY_CONFIG_JSONS): try: if not path.exists(): continue raw = json.loads(path.read_text(encoding="utf-8")) if not isinstance(raw, dict): continue camera_cfg = raw.get("camera") if not isinstance(camera_cfg, dict): continue serial = str(camera_cfg.get("preferred_realsense_serial", "")).strip() if serial: return serial except Exception: continue return "" def resolve_preferred_realsense_serial_snapshot() -> tuple[str, str, str]: forced = os.environ.get("REALSENSE_SERIAL", "").strip() env_preferred = os.environ.get("PREFERRED_REALSENSE_SERIAL", "").strip() config_preferred = read_config_preferred_realsense_serial() configured = read_teleimager_realsense_serial() if forced: preferred = forced elif env_preferred: preferred = env_preferred elif config_preferred: preferred = config_preferred else: preferred = DEFAULT_PREFERRED_REALSENSE_SERIAL return preferred, config_preferred, configured def write_config_preferred_realsense_serial(serial: str) -> str: raw = {} for cfg_path in (CONFIG_JSON, *LEGACY_CONFIG_JSONS): try: if cfg_path.exists(): loaded = json.loads(cfg_path.read_text(encoding="utf-8")) if isinstance(loaded, dict): raw = loaded break except Exception: pass camera_cfg = raw.get("camera") if not isinstance(camera_cfg, dict): camera_cfg = {} camera_cfg["preferred_realsense_serial"] = str(serial or "").strip() raw["camera"] = camera_cfg CONFIG_JSON.parent.mkdir(parents=True, exist_ok=True) CONFIG_JSON.write_text(json.dumps(raw, ensure_ascii=False, indent=2), encoding="utf-8") return str(camera_cfg["preferred_realsense_serial"]) class DirectCamera: def __init__( self, source: Optional[Union[str, int]] = None, width: int = 960, height: int = 540, fps: int = 15, preview_quality: int = 60, ): self.source = self._resolve_source(source) self.width = int(width) self.height = int(height) self.fps = int(fps) self.preview_quality = int(preview_quality) self._stop = threading.Event() self._thread: Optional[threading.Thread] = None self._cap = None self._rs_pipeline = None self._rs_align = None self._lock = threading.Lock() self._latest_frame = None self._latest_jpeg: Optional[bytes] = None self._last_error = "" self._frame_ts = 0.0 self._camera_source_in_use: Optional[Union[str, int]] = None self._backend = "opencv" self._active_profile: Optional[str] = None self._preferred_realsense_serial = "" self._config_realsense_serial = "" self._configured_realsense_serial = "" self._realsense_serial = self._preferred_realsense_serial self._reopen_after = 0.0 self._reopen_delay_sec = 2.0 self._consecutive_read_failures = 0 self._max_read_failures_before_reopen = int(os.environ.get("CAMERA_MAX_READ_FAILS", "20")) self._refresh_realsense_preferences() def _refresh_realsense_preferences(self): preferred, config_preferred, configured = resolve_preferred_realsense_serial_snapshot() self._preferred_realsense_serial = preferred self._config_realsense_serial = config_preferred self._configured_realsense_serial = configured if not self._realsense_serial: self._realsense_serial = preferred def _resolve_source(self, source: Optional[Union[str, int]]) -> Union[str, int]: if source is None: env_device = os.environ.get("CAMERA_DEVICE", "").strip() if env_device: source = env_device else: source = os.environ.get("CAMERA_INDEX", "auto").strip() or "auto" if isinstance(source, str): s = source.strip() if s.lower() == "auto": return "auto" if s.lower() in ("realsense", "rs"): return "realsense" if s.lower().startswith("realsense:"): serial = s.split(":", 1)[1].strip() return f"realsense:{serial}" if serial else "realsense" try: return int(s) except ValueError: return s return int(source) def _candidate_sources(self) -> list[Union[str, int]]: src = self.source out: list[Union[str, int]] = [src] if src == "realsense" or (isinstance(src, str) and src.startswith("realsense:")): for sp in list_realsense_video_devices(): if sp not in out: out.append(sp) return out if isinstance(src, str) and src.startswith("/dev/video"): try: idx = int(src.replace("/dev/video", "")) out.append(idx) except Exception: pass scan_env = os.environ.get("CAMERA_FALLBACK_SCAN", "").strip().lower() if scan_env: do_scan = scan_env not in ("0", "false", "no", "off") else: do_scan = src == "auto" if src == "auto" or do_scan: if _HAS_RS and "realsense" not in out: out.append("realsense") for p in sorted(Path("/dev").glob("video*")): sp = str(p) if sp not in out: out.append(sp) try: idx = int(sp.replace("/dev/video", "")) if idx not in out: out.append(idx) except Exception: pass for idx in range(6): if idx not in out: out.append(idx) return out def _open_capture(self, source: Union[str, int]): if isinstance(source, str) and source.startswith("/dev/video"): return cv2.VideoCapture(source, cv2.CAP_V4L2) return cv2.VideoCapture(source) def _looks_rgb(self, frame) -> bool: return frame is not None and getattr(frame, "ndim", 0) == 3 and frame.shape[2] == 3 def _open_realsense(self): if not _HAS_RS: raise RuntimeError("pyrealsense2 not installed") self._refresh_realsense_preferences() profiles = [ (self.width, self.height, self.fps), (640, 480, 30), (640, 480, 15), (1280, 720, 30), (1280, 720, 15), (848, 480, 30), (848, 480, 15), ] available_serials = list_realsense_serials() requested_serial = parse_realsense_source(self.source) serial = "" if requested_serial and requested_serial in available_serials: serial = requested_serial elif self._preferred_realsense_serial and self._preferred_realsense_serial in available_serials: serial = self._preferred_realsense_serial elif self._configured_realsense_serial and self._configured_realsense_serial in available_serials: serial = self._configured_realsense_serial elif self._realsense_serial and self._realsense_serial in available_serials: serial = self._realsense_serial elif available_serials: serial = available_serials[0] if serial: self._realsense_serial = serial tried = [] seen = set() for w, h, fps in profiles: key = (int(w), int(h), int(fps)) if key in seen: continue seen.add(key) tried.append(f"{w}x{h}@{fps}") pipe = rs.pipeline() try: cfg = rs.config() if serial: cfg.enable_device(serial) cfg.enable_stream(rs.stream.color, int(w), int(h), rs.format.bgr8, int(fps)) pipe.start(cfg) self._rs_align = rs.align(rs.stream.color) self._realsense_serial = serial or self._realsense_serial self._active_profile = f"{w}x{h}@{fps}" return pipe except Exception: try: pipe.stop() except Exception: pass serial_note = serial or "(auto)" raise RuntimeError(f"RealSense color stream open failed. serial={serial_note} tried: {', '.join(tried)}") def _configure_capture(self, cap): try: cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) except Exception: pass try: cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"MJPG")) except Exception: pass cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.width) cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height) cap.set(cv2.CAP_PROP_FPS, self.fps) def _close_locked(self): if self._rs_pipeline is not None: try: self._rs_pipeline.stop() except Exception: pass self._rs_pipeline = None self._rs_align = None if self._cap is not None: try: self._cap.release() except Exception: pass self._cap = None self._camera_source_in_use = None self._backend = "opencv" self._active_profile = None def _read_frame_locked(self): if self._rs_pipeline is not None: frames = self._rs_pipeline.wait_for_frames(timeout_ms=1000) if self._rs_align is not None: frames = self._rs_align.process(frames) color_frame = frames.get_color_frame() if not color_frame: return False, None frame = np.asanyarray(color_frame.get_data()) if frame is None or frame.size == 0: return False, None return True, frame if self._cap is None: return False, None return self._cap.read() def _ensure_open_locked(self) -> bool: now = time.time() if self._rs_pipeline is not None: return True if self._cap is not None and self._cap.isOpened(): return True if now < self._reopen_after: return False self._last_error = "Opening camera..." self._close_locked() tried = [] last_open_error = "" for source in self._candidate_sources(): tried.append(str(source)) if source == "realsense": pipe = None try: pipe = self._open_realsense() frames = pipe.wait_for_frames(timeout_ms=1500) color_frame = frames.get_color_frame() if not color_frame: pipe.stop() continue frame = np.asanyarray(color_frame.get_data()) if frame is None or frame.size == 0: pipe.stop() continue self._rs_pipeline = pipe self._backend = "realsense" self._camera_source_in_use = source self._last_error = "" self._consecutive_read_failures = 0 self._latest_frame = frame.copy() return True except Exception as e: last_open_error = str(e) self._last_error = str(e) try: if pipe is not None: pipe.stop() except Exception: pass continue else: cap = None try: cap = self._open_capture(source) except Exception: cap = None if cap is None or not cap.isOpened(): try: if cap is not None: cap.release() except Exception: pass continue self._configure_capture(cap) ok, frame = cap.read() if not ok or frame is None: cap.release() continue if self.source == "realsense" and not self._looks_rgb(frame): cap.release() continue self._cap = cap self._backend = "opencv" self._camera_source_in_use = source self._last_error = "" self._consecutive_read_failures = 0 return True self._last_error = last_open_error or f"Could not open camera. Tried: {', '.join(tried)}" self._reopen_after = now + self._reopen_delay_sec return False def start(self): if self._thread and self._thread.is_alive(): return self._stop.clear() self._thread = threading.Thread(target=self._run, daemon=True) self._thread.start() def stop(self): self._stop.set() if self._thread: self._thread.join(timeout=1.0) with self._lock: self._close_locked() def _run(self): while not self._stop.is_set(): frame = None frame_ts = 0.0 with self._lock: if not self._ensure_open_locked(): time.sleep(0.2) continue try: ok, frame = self._read_frame_locked() if not ok or frame is None: self._consecutive_read_failures += 1 self._last_error = f"Camera read failed ({self._consecutive_read_failures}/{self._max_read_failures_before_reopen})" if self._consecutive_read_failures < self._max_read_failures_before_reopen: time.sleep(0.05) continue self._close_locked() self._reopen_after = time.time() + self._reopen_delay_sec self._consecutive_read_failures = 0 time.sleep(0.2) continue self._consecutive_read_failures = 0 self._latest_frame = frame.copy() frame_ts = time.time() except Exception as e: self._last_error = str(e) self._close_locked() self._reopen_after = time.time() + self._reopen_delay_sec self._consecutive_read_failures = 0 frame = None if frame is not None: try: ok, jpg = cv2.imencode(".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), self.preview_quality]) if ok and jpg is not None: with self._lock: self._latest_jpeg = jpg.tobytes() self._frame_ts = frame_ts except Exception: pass time.sleep(max(0.02, 1.0 / max(self.fps, 1))) def latest_jpeg(self) -> Optional[bytes]: return self._latest_jpeg def capture(self, out_dir: Path, prefix: str = "sample", ext: str = "jpg") -> Path: out_dir.mkdir(parents=True, exist_ok=True) if not self._lock.acquire(timeout=0.5): raise RuntimeError("camera busy") try: if not self._ensure_open_locked(): raise RuntimeError(self._last_error or "camera unavailable") frame = self._latest_frame if frame is None: ok, frame = self._read_frame_locked() if not ok or frame is None: raise RuntimeError("camera read failed during capture") ok, jpg = cv2.imencode(".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), 95]) if not ok or jpg is None: raise RuntimeError("cv2.imencode failed") out_path = build_ordered_datetime_name(out_dir, prefix, ext) out_path.write_bytes(jpg.tobytes()) return out_path finally: self._lock.release() def reconfigure(self, width: int, height: int, fps: int) -> dict: width = int(width) height = int(height) fps = int(fps) if width < 1 or height < 1 or fps < 1: raise RuntimeError("width, height, and fps must be positive integers") if not self._lock.acquire(timeout=1.0): raise RuntimeError("camera busy") try: self.width = width self.height = height self.fps = fps self._last_error = f"Reconfiguring camera to {width}x{height}@{fps}..." self._latest_frame = None self._latest_jpeg = None self._frame_ts = 0.0 self._close_locked() self._reopen_after = 0.0 self._consecutive_read_failures = 0 finally: self._lock.release() deadline = time.time() + 6.0 while time.time() < deadline and not self._stop.is_set(): status = self.status() if status["ok"]: return status time.sleep(0.05) return self.status() def set_source(self, source: Union[str, int]) -> dict: resolved = self._resolve_source(source) if not self._lock.acquire(timeout=1.0): raise RuntimeError("camera busy") try: self.source = resolved self._last_error = f"Switching camera source to {resolved}..." self._latest_frame = None self._latest_jpeg = None self._frame_ts = 0.0 self._close_locked() self._reopen_after = 0.0 self._consecutive_read_failures = 0 finally: self._lock.release() deadline = time.time() + 6.0 while time.time() < deadline and not self._stop.is_set(): status = self.status() if status["ok"]: return status time.sleep(0.05) return self.status() def set_preferred_realsense_serial(self, serial: str) -> dict: value = str(serial or "").strip() if not self._lock.acquire(timeout=1.0): raise RuntimeError("camera busy") try: self._preferred_realsense_serial = value self._config_realsense_serial = value if self.source == "realsense": self._last_error = f"Switching preferred RealSense to {value or '(auto)'}..." self._latest_frame = None self._latest_jpeg = None self._frame_ts = 0.0 self._close_locked() self._reopen_after = 0.0 self._consecutive_read_failures = 0 finally: self._lock.release() deadline = time.time() + 6.0 while time.time() < deadline and not self._stop.is_set(): status = self.status() if status["ok"] or self.source != "realsense": return status time.sleep(0.05) return self.status() def status(self) -> dict: self._refresh_realsense_preferences() return { "ok": bool(self._rs_pipeline is not None or (self._cap is not None and self._cap.isOpened())), "requested_source": str(self.source), "source": str(self._camera_source_in_use if self._camera_source_in_use is not None else self.source), "backend": self._backend, "profile": self._active_profile, "requested_profile": f"{self.width}x{self.height}@{self.fps}", "requested_width": self.width, "requested_height": self.height, "requested_fps": self.fps, "preferred_realsense_serial": self._preferred_realsense_serial, "config_realsense_serial": self._config_realsense_serial, "configured_realsense_serial": self._configured_realsense_serial, "realsense_serial": self._realsense_serial, "last_error": self._last_error, "frame_time": self._frame_ts, } def static_file_response(path: Path) -> tuple[bytes, str]: raw = path.read_bytes() content_type, _ = mimetypes.guess_type(str(path)) return raw, content_type or "application/octet-stream" class SamplesHandler(BaseHTTPRequestHandler): server_version = "DirectCameraSamples/1.0" def _json(self, payload: dict, status: int = 200): raw = json.dumps(payload, ensure_ascii=False).encode("utf-8") self.send_response(status) self.send_header("Content-Type", "application/json; charset=utf-8") self.send_header("Content-Length", str(len(raw))) self.send_header("Cache-Control", "no-store") self.end_headers() self.wfile.write(raw) def _text(self, text: str, status: int = 200, content_type: str = "text/plain; charset=utf-8"): raw = text.encode("utf-8") self.send_response(status) self.send_header("Content-Type", content_type) self.send_header("Content-Length", str(len(raw))) self.send_header("Cache-Control", "no-store") self.end_headers() self.wfile.write(raw) def _binary(self, raw: bytes, status: int = 200, content_type: str = "application/octet-stream", cache_control: str = "no-store"): self.send_response(status) self.send_header("Content-Type", content_type) self.send_header("Content-Length", str(len(raw))) self.send_header("Cache-Control", cache_control) self.end_headers() self.wfile.write(raw) def log_message(self, fmt: str, *args): path = getattr(self, "path", "") if path.startswith("/api/health") or path.startswith("/api/photos"): return print(f"HTTP {self.command} {path} -> {args[1]}") def do_GET(self): parsed = urllib.parse.urlparse(self.path) path = parsed.path q = urllib.parse.parse_qs(parsed.query) if path == "/": raw, content_type = static_file_response(DIRECT_CAMERA_HTML) return self._binary(raw, content_type=f"{content_type}; charset=utf-8") if path == "/static/direct_camera.css": raw, content_type = static_file_response(DIRECT_CAMERA_CSS) return self._binary(raw, content_type=content_type) if path == "/static/direct_camera.js": raw, content_type = static_file_response(DIRECT_CAMERA_JS) return self._binary(raw, content_type=f"{content_type}; charset=utf-8") if path == "/api/health": return self._json( { "ok": True, "camera": self.server.camera.status(), "samples_dir": str(self.server.samples_dir), } ) if path == "/api/cameras": camera_status = self.server.camera.status() realsense_devices = list_realsense_devices() video_devices = list_video_device_info() preferred_serial = str(camera_status.get("preferred_realsense_serial") or "").strip() active_serial = str(camera_status.get("realsense_serial") or "").strip() preferred_device = None active_device = None rs_items = [] for item in realsense_devices: serial = str(item.get("serial") or "").strip() is_preferred = bool(serial and serial == preferred_serial) is_active = bool(serial and serial == active_serial) if is_preferred: preferred_device = item if is_active: active_device = item label = str(item.get("label") or item.get("value") or "").strip() tags = [] if is_preferred: tags.append("default") if is_active: tags.append("live") if tags: label = f"{label} ({', '.join(tags)})" rs_items.append({**item, "kind": "realsense", "is_preferred": is_preferred, "is_active": is_active, "label": label}) generic_label = "Preferred RealSense" if preferred_serial: generic_label += f" [{preferred_serial}]" generic_label += " (default)" options = [{"value": "realsense", "label": generic_label, "kind": "realsense-default"}] options.extend(rs_items) options.extend({**item, "kind": "video"} for item in video_devices) options.append({"value": "auto", "label": "Auto fallback", "kind": "auto"}) return self._json( { "ok": True, "selected_source": str(self.server.camera.source), "active_source": str(camera_status.get("source") or self.server.camera.source), "preferred_realsense_serial": preferred_serial, "configured_realsense_serial": str(camera_status.get("configured_realsense_serial") or ""), "config_realsense_serial": str(camera_status.get("config_realsense_serial") or ""), "realsense_devices": rs_items, "video_devices": video_devices, "preferred_device": preferred_device, "active_device": active_device, "options": options, } ) if path == "/api/photos": return self._json({"items": list_photos(self.server.samples_dir)}) if path == "/api/capture": try: prefix = ((q.get("prefix") or [self.server.capture_prefix])[0] or self.server.capture_prefix).strip() or self.server.capture_prefix ext = ((q.get("ext") or [self.server.capture_ext])[0] or self.server.capture_ext).strip().lstrip(".") or self.server.capture_ext saved = self.server.camera.capture(self.server.samples_dir, prefix=prefix, ext=ext) return self._json({"ok": True, "name": saved.name, "path": str(saved)}) except Exception as e: return self._json({"ok": False, "error": str(e)}, status=500) if path == "/api/frame.jpg": jpg = self.server.camera.latest_jpeg() if jpg is None: return self._text("camera not ready", status=503) self.send_response(200) self.send_header("Content-Type", "image/jpeg") self.send_header("Content-Length", str(len(jpg))) self.send_header("Cache-Control", "no-store") self.end_headers() self.wfile.write(jpg) return if path == "/api/set_resolution": try: width = int((q.get("width") or [""])[0]) height = int((q.get("height") or [""])[0]) fps = int((q.get("fps") or [""])[0]) except Exception: return self._json({"ok": False, "error": "width, height, and fps are required integers"}, status=400) try: status = self.server.camera.reconfigure(width, height, fps) return self._json({"ok": True, "camera": status}) except Exception as e: return self._json({"ok": False, "error": str(e)}, status=500) if path == "/api/set_camera": source = (q.get("source") or [""])[0].strip() if not source: return self._json({"ok": False, "error": "missing source"}, status=400) try: status = self.server.camera.set_source(source) return self._json({"ok": True, "camera": status}) except Exception as e: return self._json({"ok": False, "error": str(e)}, status=500) if path == "/api/set_preferred_camera": serial = (q.get("serial") or [""])[0].strip() try: saved = write_config_preferred_realsense_serial(serial) status = self.server.camera.set_preferred_realsense_serial(saved) return self._json({"ok": True, "serial": saved, "camera": status}) except Exception as e: return self._json({"ok": False, "error": str(e)}, status=500) if path == "/api/delete": name = safe_name((q.get("name") or [""])[0]) if not name: return self._json({"ok": False, "error": "missing name"}, status=400) p = (self.server.samples_dir / name).resolve() if p.parent != self.server.samples_dir.resolve() or not p.exists(): return self._json({"ok": False, "error": "not found"}, status=404) try: p.unlink() return self._json({"ok": True, "name": name}) except Exception as e: return self._json({"ok": False, "error": str(e)}, status=500) if path == "/api/download": name = safe_name((q.get("name") or [""])[0]) if not name: return self._text("missing name", status=400) p = (self.server.samples_dir / name).resolve() if p.parent != self.server.samples_dir.resolve() or not p.exists(): return self._text("not found", status=404) data = p.read_bytes() self.send_response(200) self.send_header("Content-Type", "application/octet-stream") self.send_header("Content-Length", str(len(data))) self.send_header("Content-Disposition", f'attachment; filename="{p.name}"') self.end_headers() self.wfile.write(data) return if path == "/api/download_all.zip": buf = io.BytesIO() with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf: for item in list_photos(self.server.samples_dir): p = self.server.samples_dir / item["name"] zf.write(p, arcname=p.name) raw = buf.getvalue() self.send_response(200) self.send_header("Content-Type", "application/zip") self.send_header("Content-Length", str(len(raw))) self.send_header("Content-Disposition", 'attachment; filename="samples.zip"') self.end_headers() self.wfile.write(raw) return if path.startswith("/samples/"): name = safe_name(path[len("/samples/"):]) p = (self.server.samples_dir / name).resolve() if p.parent != self.server.samples_dir.resolve() or not p.exists(): return self._text("not found", status=404) data = p.read_bytes() ext = p.suffix.lower() ct = "image/jpeg" if ext in (".jpg", ".jpeg") else "image/png" self.send_response(200) self.send_header("Content-Type", ct) self.send_header("Content-Length", str(len(data))) self.send_header("Cache-Control", "no-store") self.end_headers() self.wfile.write(data) return if path == "/preview.mjpg": self.send_response(200) self.send_header("Age", "0") self.send_header("Cache-Control", "no-cache, private") self.send_header("Pragma", "no-cache") self.send_header("Content-Type", "multipart/x-mixed-replace; boundary=frame") self.end_headers() try: while True: jpg = self.server.camera.latest_jpeg() if jpg is None: time.sleep(0.05) continue self.wfile.write(b"--frame\r\n") self.wfile.write(b"Content-Type: image/jpeg\r\n") self.wfile.write(f"Content-Length: {len(jpg)}\r\n\r\n".encode("ascii")) self.wfile.write(jpg) self.wfile.write(b"\r\n") time.sleep(0.12) except BrokenPipeError: return except ConnectionResetError: return except Exception: return return self._text("not found", status=404) class SamplesServer(ThreadingHTTPServer): def __init__( self, addr, handler_cls, camera: DirectCamera, samples_dir: Path, capture_prefix: str = "sample", capture_ext: str = "jpg", ): super().__init__(addr, handler_cls) self.camera = camera self.samples_dir = samples_dir self.capture_prefix = capture_prefix self.capture_ext = capture_ext def parse_args(): parser = argparse.ArgumentParser(description="Standalone direct-camera samples server") parser.add_argument("--host", default="0.0.0.0") parser.add_argument("--port", type=int, default=int(os.environ.get("PORT", "8091"))) parser.add_argument("--camera", default=os.environ.get("CAMERA_DEVICE", os.environ.get("CAMERA_INDEX", "auto"))) parser.add_argument("--width", type=int, default=int(os.environ.get("FRAME_WIDTH", "960"))) parser.add_argument("--height", type=int, default=int(os.environ.get("FRAME_HEIGHT", "540"))) parser.add_argument("--fps", type=int, default=int(os.environ.get("FPS", "15"))) parser.add_argument("--samples-dir", default=os.environ.get("SAMPLES_DIR", str(DEFAULT_SAMPLES_DIR))) parser.add_argument("--capture-prefix", default=os.environ.get("CAPTURE_PREFIX", "sample")) parser.add_argument("--capture-ext", default=os.environ.get("CAPTURE_EXT", "jpg")) return parser.parse_args() def main(): args = parse_args() samples_dir = Path(args.samples_dir).expanduser().resolve() samples_dir.mkdir(parents=True, exist_ok=True) if str(args.camera).strip().lower() in ("realsense", "rs") and not _HAS_RS: raise SystemExit( "RealSense backend requested but pyrealsense2 is not installed in this environment. " "Run this script from the teleimager conda env or install pyrealsense2 here." ) camera = DirectCamera(source=args.camera, width=args.width, height=args.height, fps=args.fps) camera.start() server = SamplesServer( (args.host, args.port), SamplesHandler, camera=camera, samples_dir=samples_dir, capture_prefix=str(args.capture_prefix).strip() or "sample", capture_ext=str(args.capture_ext).strip().lstrip(".") or "jpg", ) ip = get_local_ip() print(f"Video devices: {', '.join(list_video_devices()) or '(none found)'}") if _HAS_RS: print(f"RealSense serials: {', '.join(list_realsense_serials()) or '(none found)'}") print(f"Preferred RealSense serial: {camera._preferred_realsense_serial}") print(f"Samples dir: {samples_dir}") print(f"Camera source: {args.camera}") print(f"Capture naming: prefix={server.capture_prefix} ext={server.capture_ext}") print(f"Open: http://{ip}:{args.port}/") try: server.serve_forever() except KeyboardInterrupt: pass finally: server.shutdown() camera.stop() if __name__ == "__main__": main()