#!/usr/bin/env python3 """ photo_server.py --------------- HTTP photo gallery server for the configured AI_Photographer photos directory. ✅ Features: - Phone/Laptop gallery UI: GET / - Serves photos directly (open/download): GET / - Delete photo: GET /api/delete?name= - Capture via unified replay pipeline: GET /api/capture - Camera status: GET /api/status - Camera test (RealSense enumerate via camera env python): GET /api/test - Run RealSense USB fix script: GET /api/fix - CSS/HTML split: Web/gallery.html Web/style.css Served as: / -> gallery.html template (photo list injected) /static/style.css -> style.css file ⚠️ Security note: - This is an open LAN server (anyone on same WiFi can view/download/delete). Use only on trusted networks. Works well with your structure (no new folders). """ from __future__ import annotations import json import os import socket import sys import threading import tempfile import time import urllib.parse import subprocess import cgi from pathlib import Path from http.server import SimpleHTTPRequestHandler, ThreadingHTTPServer from typing import Optional, Callable, Dict, Any, List from Core import settings as config from Core import audio_prompts from Core import audio_prompt_recorder from Core.error_events import get_error_counters, record_error from Core import people_registry from Server.capture_service import replay_file_integrity, take_photo_sync from Server import direct_camera_client try: from Core.Logger import Logs except Exception: Logs = None # type: ignore # ============================================================ # Defaults / Paths # ============================================================ SERVER_DIR = Path(__file__).resolve().parent PROJECT_ROOT = SERVER_DIR.parent SCRIPTS_DIR = (PROJECT_ROOT / "Scripts").resolve() APP_DATA_DIR = Path(getattr(config, "APP_DATA_DIR", PROJECT_ROOT / "Data")).resolve() DATA_SCRIPTS_DIR = Path(getattr(config, "APP_SCRIPTS_DIR", APP_DATA_DIR / "Scripts")).resolve() AUTONOMOUS_STATE_FILE = Path(getattr(config, "AUTONOMOUS_STATE_FILE", APP_DATA_DIR / "Runtime" / "autonomous_state.json")).resolve() RUNTIME_HEALTH_FILE = Path(getattr(config, "RUNTIME_HEALTH_FILE", APP_DATA_DIR / "Runtime" / "runtime_health.json")).resolve() DEFAULT_PHOTOS_DIR = Path(getattr(config, "PHOTOS_DIR", PROJECT_ROOT / "photos" / "Captures")).resolve() WEB_DIR = (PROJECT_ROOT / "Web").resolve() GALLERY_HTML = WEB_DIR / "gallery.html" STYLE_CSS = WEB_DIR / "style.css" GALLERY_JS = WEB_DIR / "gallery.js" FIX_SCRIPT_ENV = os.environ.get("FIX_SCRIPT_PATH", "").strip() _DEFAULT_RUNTIME_HOME = Path(os.environ.get("HOME", "~")).expanduser() CAMERA_ENV_PY = os.environ.get( "TELEIMAGER_PY", str((_DEFAULT_RUNTIME_HOME / "miniconda3" / "envs" / "teleimager" / "bin" / "python").resolve()), ) # Preview fallback controls PREVIEW_USE_OPENCV_FALLBACK = os.environ.get("PREVIEW_USE_OPENCV_FALLBACK", "1").strip().lower() not in ( "0", "false", "no", "off" ) PREVIEW_RETRY_SEC = float(os.environ.get("PREVIEW_RETRY_SEC", "2.0")) # Shared preview camera state to avoid reopening camera on every frame _PREVIEW_CAM_LOCK = threading.Lock() _PREVIEW_CAM: Optional[Any] = None _PREVIEW_LAST_OPEN_TRY = 0.0 _CV2_MODULE = None _CAMERA_CAPTURE_CLS = None _RECORDER_LOCK = threading.Lock() _RECORDER_STATE: Dict[str, Any] = { "running": False, "name": "", "seconds": 0.0, "output": "", "returncode": None, "ok": False, "error": "", "started_at": 0.0, "finished_at": 0.0, "stdout_tail": [], } _REPLAY_TEST_LOCK = threading.Lock() _REPLAY_TEST_STATE: Dict[str, Any] = { "running": False, "name": "", "ok": False, "error": "", "result": "", "started_at": 0.0, "finished_at": 0.0, } _AUDIO_PROMPT_RECORD_LOCK = threading.Lock() _AUDIO_PROMPT_RECORD_STATE: Dict[str, Any] = { "running": False, "key": "", "filename": "", "text": "", "ok": False, "error": "", "started_at": 0.0, "finished_at": 0.0, "result": {}, } # ============================================================ # Logging helpers # ============================================================ def _make_logger(log_name: str = "photo_server"): if Logs is None: return None lg = Logs() try: lg.LogEngine("G1_Logs", log_name) except Exception: pass return lg def _log(lg, msg: str, level: str = "info"): # level: info / warning / error if lg is not None: try: lg.print_and_log(msg, message_type=level) return except Exception: pass print(msg) # ============================================================ # Utilities # ============================================================ def _get_local_ip() -> str: """ Best-effort IP that phones can reach (same WiFi). """ try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("8.8.8.8", 80)) ip = s.getsockname()[0] s.close() return ip except Exception: return "127.0.0.1" def _safe_name(name: str) -> str: """ Prevent path traversal: keep only filename (no dirs). """ name = name.strip().replace("\\", "/") name = name.split("/")[-1] return name def _fix_script_candidates() -> List[Path]: cands: List[Path] = [] if FIX_SCRIPT_ENV: p = Path(FIX_SCRIPT_ENV).expanduser() if not p.is_absolute(): p = (PROJECT_ROOT / p).resolve() cands.append(p) cands.append((SCRIPTS_DIR / "fix_realsense_usb.sh").resolve()) cands.append((PROJECT_ROOT / "fix_realsense_usb.sh").resolve()) return cands def _resolve_fix_script() -> Optional[Path]: for p in _fix_script_candidates(): if p.exists() and p.is_file(): return p return None def _get_cv2(): global _CV2_MODULE if _CV2_MODULE is None: import cv2 _CV2_MODULE = cv2 return _CV2_MODULE def _get_camera_capture_cls(): global _CAMERA_CAPTURE_CLS if _CAMERA_CAPTURE_CLS is None: from Modes.AI.camera_module import CameraCapture _CAMERA_CAPTURE_CLS = CameraCapture return _CAMERA_CAPTURE_CLS def _is_photo_file(p: Path) -> bool: if not p.is_file(): return False ext = p.suffix.lower() return ext in (".jpg", ".jpeg", ".png", ".webp") def _list_photos(folder: Path) -> List[Dict[str, Any]]: items: List[Dict[str, Any]] = [] folder.mkdir(parents=True, exist_ok=True) for p in sorted(folder.iterdir(), key=lambda x: x.stat().st_mtime, reverse=True): if not _is_photo_file(p): continue st = p.stat() items.append( { "name": p.name, "size": st.st_size, "mtime": st.st_mtime, } ) return items def _group_sessions(items: List[Dict[str, Any]], gap_sec: int = 120) -> List[Dict[str, Any]]: """Group photos into sessions when gaps between photos exceed gap_sec.""" sessions = [] cur = None for it in sorted(items, key=lambda x: x["mtime"]): t = it["mtime"] if cur is None: cur = {"id": len(sessions) + 1, "start": t, "end": t, "photos": [it["name"]]} continue if t - cur["end"] > gap_sec: sessions.append(cur) cur = {"id": len(sessions) + 1, "start": t, "end": t, "photos": [it["name"]]} else: cur["photos"].append(it["name"]) cur["end"] = t if cur is not None: sessions.append(cur) return sessions def _read_text_file(path: Path) -> Optional[str]: try: return path.read_text(encoding="utf-8") except Exception: return None def _read_bytes_file(path: Path) -> Optional[bytes]: try: return path.read_bytes() except Exception: return None def _read_mode() -> str: try: m = str(config.read_runtime_mode()).strip().lower() except Exception: m = "manual" if m == "command": return "ai" if m not in ("manual", "ai"): return "manual" return m def _read_detector_backend() -> str: try: b = str(config.read_vision_detector_backend()).strip().lower() except Exception: b = "normal" if b not in ("yolo", "normal"): return "normal" return b def _strict_yolo_required() -> bool: try: return bool(config.read_vision_yolo_strict_required()) except Exception: return bool(getattr(config, "VISION_YOLO_STRICT_REQUIRED", False)) def _read_ai_readiness() -> Dict[str, Any]: from Modes.AI.vision_detector import probe_ai_readiness backend = _read_detector_backend() strict_required = _strict_yolo_required() readiness = probe_ai_readiness(backend=backend, strict_required=strict_required) state = _read_autonomous_state() ai_blocked = bool(state.get("ai_blocked", False)) ai_block_reason = str(state.get("ai_block_reason", "") or "").strip() if ai_blocked and ai_block_reason: readiness["ok"] = False readiness["block_reason"] = ai_block_reason return { "ok": bool(readiness.get("ok", False)), "strict_required": bool(strict_required), "backend": backend, "yolo_runtime": str(readiness.get("yolo_runtime", "") or ""), "yolo_loaded": bool(readiness.get("yolo_loaded", False)), "person_model_ok": bool(readiness.get("person_model_ok", False)), "face_model_ok": bool(readiness.get("face_model_ok", False)), "block_reason": str(readiness.get("block_reason", "") or ""), "person_model_path": str(readiness.get("person_model_path", "") or ""), "face_model_path": str(readiness.get("face_model_path", "") or ""), } def _read_runtime_health() -> Dict[str, Any]: default = { "ws_connected": False, "ws_state": "detached", "ws_restarts": 0, "ws_last_error": "", "mic_enabled": True, "mic_state": "idle", "mic_restarts": 0, "mic_last_error": "", "speaker_state": "idle", "speaker_restarts": 0, "speaker_last_error": "", "audio_gate_open": False, "time": 0.0, } try: if not RUNTIME_HEALTH_FILE.exists(): return default raw = json.loads(RUNTIME_HEALTH_FILE.read_text(encoding="utf-8")) if not isinstance(raw, dict): return default out = dict(default) out.update(raw) return out except Exception: return default def _read_ai_options() -> Dict[str, Any]: try: hard = bool(config.read_vision_hard_target_lock_enabled()) except Exception: hard = True try: retake = bool(config.read_vision_retake_prompt_enabled()) except Exception: retake = True try: greeting_replay_enabled = bool(config.read_vision_autonomous_greeting_replay_enabled()) except Exception: greeting_replay_enabled = True try: greeting_replay_file = str(config.read_vision_autonomous_greeting_replay_file() or "").strip() except Exception: greeting_replay_file = "right_hand_up.jsonl" try: capture_replay_enabled = bool(config.read_vision_autonomous_capture_replay_enabled()) except Exception: capture_replay_enabled = True try: face_recognition_enabled = bool(config.read_vision_face_recognition_enabled()) except Exception: face_recognition_enabled = True try: face_recognition_threshold = float(config.read_vision_face_recognition_threshold()) except Exception: face_recognition_threshold = 0.88 return { "hard_target_lock_enabled": hard, "retake_prompt_enabled": retake, "autonomous_greeting_replay_enabled": greeting_replay_enabled, "autonomous_greeting_replay_file": greeting_replay_file, "autonomous_capture_replay_enabled": capture_replay_enabled, "face_recognition_enabled": face_recognition_enabled, "face_recognition_threshold": face_recognition_threshold, "active_replay": _read_active_replay_name(), } def _read_mic_options() -> Dict[str, Any]: try: enabled = bool(config.read_gemini_mic_enabled()) except Exception: enabled = True return {"mic_enabled": enabled} def _safe_relative_name(name: str) -> str: raw = str(name or "").strip().replace("\\", "/").lstrip("/") if not raw: return "" p = Path(raw) if p.is_absolute(): return "" parts = [part for part in p.parts if part not in ("", ".")] if any(part == ".." for part in parts): return "" return Path(*parts).as_posix() if parts else "" def _normalize_replay_name(name: str, default_parent: str | None = None) -> str: safe = _safe_relative_name(name) if not safe: raise ValueError("invalid replay path") p = Path(safe) if len(p.parts) == 1 and default_parent: p = Path(default_parent) / p.name if p.suffix.lower() != ".jsonl": p = p.with_suffix(".jsonl") if p.name == config.HOME_FILE.name: raise ValueError("reserved replay name") return p.as_posix() def _resolve_replay_path(name: str) -> Path: candidate = config.resolve_replay_path(_normalize_replay_name(name)) try: candidate.relative_to(config.DATA_DIR) except Exception as e: raise ValueError("invalid replay path") from e return candidate def _is_replay_file(path: Path) -> bool: return path.is_file() and path.suffix.lower() == ".jsonl" and path.name != config.HOME_FILE.name def _read_active_replay_name() -> str: try: return config.read_selected_replay_name() except Exception: return str(config.REPLAY_FILE.name) def _list_replays() -> List[Dict[str, Any]]: items: List[Dict[str, Any]] = [] root = config.DATA_DIR.resolve() root.mkdir(parents=True, exist_ok=True) active_name = _read_active_replay_name() greeting_name = str(config.read_vision_autonomous_greeting_replay_file() or "").strip().replace("\\", "/") for path in sorted(root.rglob("*.jsonl")): if not _is_replay_file(path): continue try: rel = str(path.resolve().relative_to(root)).replace("\\", "/") except Exception: rel = path.name st = path.stat() integrity = replay_file_integrity(path) items.append( { "name": rel, "label": path.name, "size": st.st_size, "mtime": st.st_mtime, "active": rel == active_name, "greeting": rel == greeting_name or path.name == greeting_name, "trigger_ok": bool(integrity.get("trigger_ok", False)), "ok": bool(integrity.get("ok", False)), } ) return items def _replay_recorder_candidates() -> List[Path]: candidates = [] env_path = os.environ.get("REPLAY_RECORDER_SCRIPT", "").strip() if env_path: candidates.append(Path(env_path).expanduser()) cfg_raw = str(getattr(config, "REPLAY_RECORDER_SCRIPT", "") or "").strip() if cfg_raw: cfg_path = Path(cfg_raw).expanduser() candidates.append(cfg_path) candidates.append((PROJECT_ROOT.parent / "G1_Lootah" / "Manual_Recorder" / "g1_teach_v4_stable.py").resolve()) candidates.append((PROJECT_ROOT.parents[1] / "G1_Lootah" / "Manual_Recorder" / "g1_teach_v4_stable.py").resolve()) candidates.append((Path.home() / "G1_Lootah" / "Manual_Recorder" / "g1_teach_v4_stable.py").resolve()) return [p if p.is_absolute() else p.resolve() for p in candidates] def _resolve_replay_recorder_script() -> Optional[Path]: for candidate in _replay_recorder_candidates(): try: if candidate.exists() and candidate.is_file(): return candidate except Exception: continue return None def _read_recorder_state() -> Dict[str, Any]: with _RECORDER_LOCK: return dict(_RECORDER_STATE) def _set_recorder_state(**kwargs): with _RECORDER_LOCK: _RECORDER_STATE.update(kwargs) def _read_replay_test_state() -> Dict[str, Any]: with _REPLAY_TEST_LOCK: return dict(_REPLAY_TEST_STATE) def _set_replay_test_state(**kwargs): with _REPLAY_TEST_LOCK: _REPLAY_TEST_STATE.update(kwargs) def _read_audio_prompt_record_state() -> Dict[str, Any]: with _AUDIO_PROMPT_RECORD_LOCK: return dict(_AUDIO_PROMPT_RECORD_STATE) def _set_audio_prompt_record_state(**kwargs): with _AUDIO_PROMPT_RECORD_LOCK: _AUDIO_PROMPT_RECORD_STATE.update(kwargs) def _run_audio_prompt_record(key: str, text: str, filename: str): try: result = audio_prompt_recorder.record_prompt_from_text(key, text, filename=filename) _set_audio_prompt_record_state( running=False, ok=True, error="", finished_at=time.time(), result=result, ) except Exception as e: _set_audio_prompt_record_state( running=False, ok=False, error=str(e), finished_at=time.time(), result={}, ) def _run_replay_recorder(name: str, seconds: float): script = _resolve_replay_recorder_script() if script is None: _set_recorder_state( running=False, ok=False, error="recorder script not found", finished_at=time.time(), ) return try: from Modes.Manual.controller import auto_pick_iface iface = auto_pick_iface() except Exception as e: _set_recorder_state( running=False, ok=False, error=f"failed to resolve DDS interface: {e}", finished_at=time.time(), ) return output_path = _resolve_replay_path(name) output_path.parent.mkdir(parents=True, exist_ok=True) cmd = [ sys.executable, str(script), iface, "--output", str(output_path), "--seconds", str(seconds), "--home", str(config.HOME_FILE), ] try: proc = subprocess.run( cmd, input="y\nn\n", capture_output=True, text=True, timeout=max(30.0, seconds + 45.0), cwd=str(PROJECT_ROOT), ) combined = ((proc.stdout or "") + "\n" + (proc.stderr or "")).strip() tail = [line for line in combined.splitlines() if line.strip()][-25:] saved_path = "" for line in tail: marker = "💾 Saved " if marker in line and " to " in line: saved_path = line.split(" to ", 1)[1].strip() ok = proc.returncode == 0 and bool(saved_path) _set_recorder_state( running=False, ok=ok, error="" if ok else (tail[-1] if tail else f"recorder failed rc={proc.returncode}"), returncode=proc.returncode, finished_at=time.time(), output=saved_path, stdout_tail=tail, ) except Exception as e: _set_recorder_state( running=False, ok=False, error=str(e), finished_at=time.time(), stdout_tail=[], ) def _run_replay_test(name: str, replay_test_func: Callable[[str], str]): try: result = replay_test_func(name) ok = not str(result or "").startswith("[ERR]") _set_replay_test_state( running=False, ok=ok, error="" if ok else str(result or "").replace("[ERR] ", "").replace("[ERR]", "").strip(), result=str(result or ""), finished_at=time.time(), ) except Exception as e: _set_replay_test_state( running=False, ok=False, error=str(e), result="", finished_at=time.time(), ) def _update_replay_references_after_rename(old_rel: str, new_rel: str, new_path: Path): if _read_active_replay_name() == old_rel: config.write_selected_replay_name(new_rel) config.REPLAY_FILE = new_path greeting_name = str(config.read_vision_autonomous_greeting_replay_file() or "").strip().replace("\\", "/") if greeting_name == old_rel: config.write_vision_autonomous_greeting_replay_file(new_rel) def _is_greeting_replay(rel_name: str) -> bool: greeting_name = str(config.read_vision_autonomous_greeting_replay_file() or "").strip().replace("\\", "/") return rel_name == greeting_name def _mode_policy(mode: str) -> Dict[str, Any]: mode = mode if mode in ("manual", "ai") else "manual" if mode == "manual": return { "voice_request_photo": False, "description": "Manual mode: Gemini mic can stay on, but mapped photo commands and AI detection are disabled.", } if mode == "ai": return { "voice_request_photo": True, "description": "AI mode: mapped voice can request photo.", } return { "voice_request_photo": False, "description": "Unknown mode.", } def _read_autonomous_state() -> Dict[str, Any]: default: Dict[str, Any] = { "state": "IDLE", "session_id": 0, "interaction_active": False, "intent_detected": False, "detector_backend": "normal", "ai_blocked": False, "ai_block_reason": "", "person_count": 0, "face_count": 0, "group_count": 0, "group_size": 0, "group_detected": False, "max_area": 0.0, "audio_gate_open": False, "subject_id": None, "subject_visible": False, "target_lock_active": False, "target_lock_type": "", "target_lock_id": None, "target_switch_blocked_count": 0, "depth_m": None, "approach_speed_mps": 0.0, "camera_ok": False, "depth_ok": False, "camera_restarts": 0, "depth_restarts": 0, "ws_connected": False, "mic_state": "", "speaker_state": "", "cooldown_remaining": 0.0, "confirm_timeout_remaining": 0.0, "framing_timeout_remaining": 0.0, "countdown_remaining": 0.0, "retake_prompt_enabled": False, "retake_recommended": False, "retake_reason": "", "retake_count": 0, "retake_limit": 0, "recognized_person_id": "", "recognized_person_known": False, "recognized_person_new": False, "recognized_person_label": "", "recognized_person_match_score": 0.0, "recognized_person_created_date": "", "time": 0.0, } try: if not AUTONOMOUS_STATE_FILE.exists(): return default data = json.loads(AUTONOMOUS_STATE_FILE.read_text(encoding="utf-8")) if not isinstance(data, dict): return default merged = dict(default) merged.update(data) return merged except Exception: return default # ============================================================ # Capture/Test/Fix implementations # ============================================================ def _capture_via_service() -> str: """ Capture a photo using the shared capture service. """ try: return take_photo_sync(prefix="photo") except Exception as e: return f"[ERR] capture exception: {e}" def _preview_camera_source() -> Optional[str]: src = os.environ.get("CAMERA_DEVICE", "").strip() if src: return src idx = os.environ.get("CAMERA_INDEX", "").strip() return idx or None def _get_preview_frame(zmq_host: str = "127.0.0.1", zmq_port: int = 55555, video_source: Optional[str] = None): """Best-effort single frame for MJPEG preview. Tries direct camera server, then CameraCapture fallback. Returns BGR frame or None. """ global _PREVIEW_CAM, _PREVIEW_LAST_OPEN_TRY try: if direct_camera_client.is_enabled(): frame = direct_camera_client.frame_bgr(timeout=2.0) if frame is not None: return frame if not PREVIEW_USE_OPENCV_FALLBACK: return None now = time.time() with _PREVIEW_CAM_LOCK: if _PREVIEW_CAM is None and (now - _PREVIEW_LAST_OPEN_TRY) >= PREVIEW_RETRY_SEC: _PREVIEW_LAST_OPEN_TRY = now src = video_source if video_source is not None else _preview_camera_source() cam = _get_camera_capture_cls()(src) if cam.initialize(): _PREVIEW_CAM = cam else: try: cam.release() except Exception: pass _PREVIEW_CAM = None if _PREVIEW_CAM is None: return None frame = _PREVIEW_CAM.capture_frame() if frame is None: # If device dropped, close it and retry after cooldown. cap = getattr(_PREVIEW_CAM, "cap", None) if cap is None or not cap.isOpened(): try: _PREVIEW_CAM.release() except Exception: pass _PREVIEW_CAM = None return None return frame except Exception: return None def _realsense_test() -> Dict[str, Any]: """ Quick test using the camera env python: - import pyrealsense2 - count devices """ try: code = ( "import pyrealsense2 as rs\n" "ctx = rs.context()\n" "devs = list(ctx.query_devices())\n" "print(len(devs))\n" "print([d.get_info(rs.camera_info.serial_number) for d in devs])\n" ) p = subprocess.run( [CAMERA_ENV_PY, "-c", code], capture_output=True, text=True, timeout=10, ) out = (p.stdout or "").strip().splitlines() err = (p.stderr or "").strip() if p.returncode != 0: return {"ok": False, "rc": p.returncode, "error": err or "realsense test failed", "stdout": out[-20:]} count = int(out[0]) if out else 0 serials = [] if len(out) >= 2: # second line prints python list serials = out[1] return {"ok": True, "devices": count, "serials": serials, "rc": 0} except Exception as e: return {"ok": False, "error": str(e)} def _run_fix_script() -> Dict[str, Any]: """ Runs fix_realsense_usb.sh in NON-INTERACTIVE mode. If sudo password is required, it fails gracefully (no terminal prompt). """ fix_script = _resolve_fix_script() if not fix_script: checked = [str(p) for p in _fix_script_candidates()] return {"ok": False, "error": "fix script not found", "checked": checked} try: # IMPORTANT: -n = non-interactive (won't prompt for password) p = subprocess.run( ["sudo", "-n", "bash", str(fix_script)], capture_output=True, text=True, timeout=40, ) out = (p.stdout or "").strip().splitlines()[-80:] err = (p.stderr or "").strip().splitlines()[-80:] if p.returncode == 0: return {"ok": True, "rc": 0, "stdout": out, "stderr": err} # Most common case: sudo needs password joined_err = "\n".join(err).lower() if "a password is required" in joined_err or "sudo:" in joined_err: return { "ok": False, "rc": p.returncode, "error": f"sudo password required. Run manually in terminal: sudo bash {fix_script}", "stdout": out, "stderr": err, } return {"ok": False, "rc": p.returncode, "stdout": out, "stderr": err} except Exception as e: return {"ok": False, "error": str(e)} # ============================================================ # HTML template fallback (if Scripts/gallery.html missing) # ============================================================ # All HTML/CSS/JS files live in WEB_DIR. The server reads them at runtime. # ============================================================ # Main server factory # ============================================================ def start_photo_server( folder: Path = DEFAULT_PHOTOS_DIR, port: int = 8080, bind: str = "0.0.0.0", capture_func: Optional[Callable[[], str]] = None, replay_test_func: Optional[Callable[[str], str]] = None, logger=None, ) -> str: """ Starts the gallery server in a daemon thread. Returns the URL (best-effort reachable IP). If port is already in use, it logs and returns the URL anyway (assumes old server is running). """ lg = logger or _make_logger("photo_server.log") folder = folder.resolve() folder.mkdir(parents=True, exist_ok=True) ip = _get_local_ip() url = f"http://{ip}:{port}/" if capture_func is None: capture_func = _capture_via_service class Handler(SimpleHTTPRequestHandler): QUIET_PATHS = { "/api/autonomous_state", "/api/runtime_health", "/api/photos", "/api/people", "/api/audio_prompts", "/api/audio_prompt_record_status", "/api/set_audio_prompt_mode", "/api/set_audio_prompt_fallback", "/api/replays", "/api/get_replay", "/api/replay_record_status", "/api/replay_test_status", "/api/mode", "/api/ai_readiness", "/api/ai_options", "/api/detector_backend", "/api/camera_health", "/api/camera_sources", "/api/person_image", "/preview.mjpg", } # Serve files from PHOTOS folder by default def __init__(self, *args, **kwargs): super().__init__(*args, directory=str(folder), **kwargs) def log_message(self, fmt, *args): path = urllib.parse.urlparse(getattr(self, "path", "")).path if path.startswith("/static/"): return if path in self.QUIET_PATHS: return if path.endswith(".jpg") or path.endswith(".jpeg") or path.endswith(".png") or path.endswith(".zip"): return status = str(args[1]) if len(args) > 1 else "?" print(f"🌐 HTTP {getattr(self, 'command', '?')} {path} -> {status}") def _send_json(self, obj: Dict[str, Any], code: int = 200): data = json.dumps(obj, ensure_ascii=False, indent=2).encode("utf-8") self.send_response(code) self.send_header("Content-Type", "application/json; charset=utf-8") self.send_header("Content-Length", str(len(data))) self.end_headers() self.wfile.write(data) def _send_text(self, text: str, code: int = 200, ctype: str = "text/plain; charset=utf-8"): data = text.encode("utf-8") self.send_response(code) self.send_header("Content-Type", ctype) self.send_header("Content-Length", str(len(data))) self.end_headers() self.wfile.write(data) def do_GET(self): parsed = urllib.parse.urlparse(self.path) path = parsed.path q = urllib.parse.parse_qs(parsed.query) # Ignore favicon noise if path == "/favicon.ico": self.send_response(204) self.end_headers() return # Serve static CSS/JS from Web folder if path == "/static/style.css": css = _read_bytes_file(STYLE_CSS) if css is None: self.send_response(404) self.end_headers() return self.send_response(200) self.send_header("Content-Type", "text/css; charset=utf-8") self.send_header("Content-Length", str(len(css))) self.end_headers() self.wfile.write(css) return if path == "/static/gallery.js": js = _read_text_file(GALLERY_JS) if js is None: self.send_response(404) self.end_headers() return data = js.encode("utf-8") self.send_response(200) self.send_header("Content-Type", "application/javascript; charset=utf-8") self.send_header("Content-Length", str(len(data))) self.end_headers() self.wfile.write(data) return # API endpoints if path == "/api/status": items = _list_photos(folder) res = { "ok": True, "photos_dir": str(folder), "count": len(items), "latest": items[0]["name"] if items else None, "url": url, "time": time.strftime("%Y-%m-%d %H:%M:%S"), } self._send_json(res, 200) return if path == "/api/errors": self._send_json({"ok": True, "counters": get_error_counters()}) return if path == "/api/mode": # return current mode m = _read_mode() self._send_json({"ok": True, "mode": m, "policy": _mode_policy(m)}) return if path == "/api/detector_backend": b = _read_detector_backend() strict_required = _strict_yolo_required() mode = _read_mode() locked = bool(strict_required and mode == "ai") self._send_json( { "ok": True, "backend": b, "yolo_runtime": str(config.read_vision_yolo_runtime()), "options": ["normal", "yolo"], "strict_required": strict_required, "locked": locked, } ) return if path == "/api/ai_readiness": data = _read_ai_readiness() self._send_json(data, 200 if data.get("ok") else 503) return if path == "/api/runtime_health": self._send_json({"ok": True, "health": _read_runtime_health()}) return if path == "/api/camera_health": try: self._send_json(direct_camera_client.health(timeout=3.0)) except Exception as e: self._send_json({"ok": False, "error": str(e), "camera": {}}, 503) return if path == "/api/camera_sources": try: self._send_json(direct_camera_client.cameras(timeout=3.0)) except Exception as e: self._send_json({"ok": False, "error": str(e), "options": []}, 503) return if path == "/api/set_camera_source": source = q.get("source", [""])[0].strip() if not source: self._send_json({"ok": False, "error": "missing source"}, 400) return try: self._send_json(direct_camera_client.set_source(source, timeout=8.0)) except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) return if path == "/api/set_camera_resolution": try: width = int((q.get("width") or [""])[0]) height = int((q.get("height") or [""])[0]) fps = int((q.get("fps") or [""])[0]) except Exception: self._send_json({"ok": False, "error": "width, height, and fps are required integers"}, 400) return try: self._send_json(direct_camera_client.set_resolution(width, height, fps, timeout=8.0)) except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) return if path == "/api/set_preferred_camera": serial = q.get("serial", [""])[0].strip() try: saved = config.write_camera_preferred_realsense_serial(serial) self._send_json(direct_camera_client.set_preferred_camera(saved, timeout=8.0)) except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) return if path == "/api/mic": self._send_json({"ok": True, "options": _read_mic_options()}) return if path == "/api/ai_options": self._send_json({"ok": True, "options": _read_ai_options()}) return if path == "/api/autonomous_state": self._send_json({"ok": True, "state": _read_autonomous_state()}) return if path == "/api/people": try: self._send_json({"ok": True, "people": people_registry.list_people()}) except Exception as e: self._send_json({"ok": False, "error": str(e), "people": []}, 500) return if path == "/api/audio_prompts": try: self._send_json( { "ok": True, "dir": str(config.AUDIO_PROMPTS_DIR), "mode": str(config.read_audio_prompt_mode()), "fallback_to_gemini": bool(config.read_audio_prompts_fallback_to_gemini()), "prompts": audio_prompts.list_audio_prompts(), } ) except Exception as e: self._send_json({"ok": False, "error": str(e), "prompts": []}, 500) return if path == "/api/audio_prompt_record_status": self._send_json({"ok": True, "status": _read_audio_prompt_record_state()}) return if path == "/api/set_audio_prompt_mode": mode = q.get("mode", [""])[0] if mode == "": self._send_json({"ok": False, "error": "missing mode"}, 400) return try: value = str(config.write_audio_prompt_mode(mode)) self._send_json({"ok": True, "mode": value}) except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) return if path == "/api/set_audio_prompt_fallback": enabled = q.get("enabled", [""])[0] if enabled == "": self._send_json({"ok": False, "error": "missing enabled"}, 400) return try: value = bool(config.write_audio_prompts_fallback_to_gemini(enabled)) self._send_json({"ok": True, "fallback_to_gemini": value}) except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) return if path == "/api/download_audio_prompt": key = q.get("key", [""])[0].strip() if not key: self._send_json({"ok": False, "error": "missing key"}, 400) return try: prompt_path, body = audio_prompts.read_audio_prompt_bytes(key) self.send_response(200) self.send_header("Content-Type", "audio/wav") self.send_header("Content-Length", str(len(body))) self.send_header("Content-Disposition", f'attachment; filename="{prompt_path.name}"') self.end_headers() self.wfile.write(body) except FileNotFoundError: self._send_json({"ok": False, "error": "audio prompt not found"}, 404) except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) return if path == "/api/delete_audio_prompt": key = q.get("key", [""])[0].strip() if not key: self._send_json({"ok": False, "error": "missing key"}, 400) return try: self._send_json(audio_prompts.delete_audio_prompt(key)) except KeyError as e: self._send_json({"ok": False, "error": str(e)}, 400) except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) return if path == "/api/person_image": person_id = q.get("id", [""])[0].strip() kind = q.get("kind", ["face"])[0].strip().lower() if kind not in ("face", "scene"): kind = "face" data = people_registry.read_person_image(person_id, kind=kind) if data is None: self._send_json({"ok": False, "error": "person image not found"}, 404) return body, ctype, filename = data self.send_response(200) self.send_header("Content-Type", ctype) self.send_header("Content-Length", str(len(body))) self.send_header("Content-Disposition", f'inline; filename="{filename}"') self.end_headers() self.wfile.write(body) return if path == "/api/delete_person": person_id = q.get("id", [""])[0].strip() if not person_id: self._send_json({"ok": False, "error": "missing id"}, 400) return try: ok = people_registry.delete_person(person_id) if not ok: self._send_json({"ok": False, "error": "person not found"}, 404) return self._send_json({"ok": True, "deleted": person_id}) except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) return if path == "/api/reset_people": try: count = int(people_registry.reset_people()) self._send_json({"ok": True, "deleted": count}) except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) return if path == "/api/download_person": person_id = q.get("id", [""])[0].strip() if not person_id: self._send_json({"ok": False, "error": "missing id"}, 400) return try: with tempfile.NamedTemporaryFile(prefix=f"{person_id}_", suffix=".zip", delete=False) as tmp: tmp_path = Path(tmp.name) archive_path = people_registry.export_person_zip(person_id, tmp_path) if archive_path is None or not archive_path.exists(): try: tmp_path.unlink() except Exception: pass self._send_json({"ok": False, "error": "person not found"}, 404) return body = archive_path.read_bytes() self.send_response(200) self.send_header("Content-Type", "application/zip") self.send_header("Content-Length", str(len(body))) self.send_header("Content-Disposition", f'attachment; filename="{person_id}.zip"') self.end_headers() self.wfile.write(body) try: archive_path.unlink() except Exception: pass except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) return if path == "/api/set_mode": # set mode via ?mode=ai|manual m = q.get("mode", [""])[0] if m == "command": self._send_json( { "ok": False, "error": "command mode was moved to G1_Lootah/AI_Command", }, 410, ) return if m not in ("ai", "manual"): self._send_json({"ok": False, "error": "invalid mode"}, 400) return try: m = config.write_runtime_mode(m) self._send_json({"ok": True, "mode": m, "policy": _mode_policy(m)}) except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) return if path == "/api/set_ai_options": try: hard_v = q.get("hard_target_lock_enabled", [None])[0] retake_v = q.get("retake_prompt_enabled", [None])[0] greet_enabled_v = q.get("autonomous_greeting_replay_enabled", [None])[0] greet_file_v = q.get("autonomous_greeting_replay_file", [None])[0] capture_enabled_v = q.get("autonomous_capture_replay_enabled", [None])[0] face_recognition_v = q.get("face_recognition_enabled", [None])[0] face_threshold_v = q.get("face_recognition_threshold", [None])[0] out = _read_ai_options() if hard_v is not None: out["hard_target_lock_enabled"] = bool( config.write_vision_hard_target_lock_enabled(hard_v) ) if retake_v is not None: out["retake_prompt_enabled"] = bool( config.write_vision_retake_prompt_enabled(retake_v) ) if greet_enabled_v is not None: out["autonomous_greeting_replay_enabled"] = bool( config.write_vision_autonomous_greeting_replay_enabled(greet_enabled_v) ) if greet_file_v is not None: out["autonomous_greeting_replay_file"] = str( config.write_vision_autonomous_greeting_replay_file(greet_file_v) ) if capture_enabled_v is not None: out["autonomous_capture_replay_enabled"] = bool( config.write_vision_autonomous_capture_replay_enabled(capture_enabled_v) ) if face_recognition_v is not None: out["face_recognition_enabled"] = bool( config.write_vision_face_recognition_enabled(face_recognition_v) ) if face_threshold_v is not None: out["face_recognition_threshold"] = float( config.write_vision_face_recognition_threshold(face_threshold_v) ) out["active_replay"] = _read_active_replay_name() self._send_json({"ok": True, "options": out}) except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) return if path == "/api/set_mic": try: enabled_v = q.get("enabled", [None])[0] if enabled_v is None: self._send_json({"ok": False, "error": "missing enabled"}, 400) return enabled = bool(config.write_gemini_mic_enabled(enabled_v)) self._send_json({"ok": True, "options": {"mic_enabled": enabled}}) except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) return if path == "/api/mode_policy": m = _read_mode() self._send_json({"ok": True, "mode": m, "policy": _mode_policy(m)}) return if path == "/api/set_detector_backend": b = q.get("backend", [""])[0] if b not in ("normal", "yolo"): self._send_json({"ok": False, "error": "invalid backend"}, 400) return try: mode = _read_mode() strict_required = _strict_yolo_required() if strict_required and mode == "ai" and b != "yolo": self._send_json( { "ok": False, "error": "Strict AI profile requires YOLO backend while mode=ai.", "backend": _read_detector_backend(), "strict_required": True, }, 409, ) return b = config.write_vision_detector_backend(b) self._send_json( { "ok": True, "backend": b, "yolo_runtime": str(config.read_vision_yolo_runtime()), "options": ["normal", "yolo"], "strict_required": strict_required, "locked": bool(strict_required and mode == "ai"), } ) except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) return if path == "/api/command_info": self._send_json( { "ok": False, "error": "command mode moved", "moved_to": "G1_Lootah/AI_Command", }, 410, ) return if path == "/api/test": res = _realsense_test() self._send_json(res, 200 if res.get("ok") else 500) return if path == "/api/fix": res = _run_fix_script() self._send_json(res, 200 if res.get("ok") else 500) return if path == "/api/run_scripts_fix": # Run the Scripts/fix_realsense_usb.sh with optional mode ?mode=check|fix mode = q.get('mode', ['--fix'])[0] or '--fix' try: scripts_sh = _resolve_fix_script() if not scripts_sh: self._send_json( {"ok": False, "error": "script not found", "checked": [str(p) for p in _fix_script_candidates()]}, 404, ) return # run non-interactive p = subprocess.run(["bash", str(scripts_sh), mode], capture_output=True, text=True, timeout=60) out = (p.stdout or "").strip().splitlines() err = (p.stderr or "").strip().splitlines() ok = p.returncode == 0 resp = {"ok": ok, "rc": p.returncode, "stdout": out[-60:], "stderr": err[-60:]} self._send_json(resp, 200 if ok else 500) except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) return if path == "/api/capture": # Take photo NOW (no remote) result = capture_func() ok = not result.startswith("[ERR]") self._send_json({"ok": ok, "result": result}, 200 if ok else 500) return if path == "/api/sessions": items = _list_photos(folder) sessions = _group_sessions(items) self._send_json({"ok": True, "sessions": sessions}) return if path == "/api/photos": items = _list_photos(folder) self._send_json({"ok": True, "photos": items}) return # Request a photo from autonomous mover or external monitor if path == "/api/request_photo": try: # touch a request file for external processes to pick up reqf = SCRIPTS_DIR / "request_photo.flag" reqf.write_text(str(time.time())) self._send_json({"ok": True, "requested": True}) except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) return # Clear all photos (DELETE) - destructive if path == "/api/clear_photos": try: for p in folder.iterdir(): if _is_photo_file(p): try: p.unlink() except Exception: pass self._send_json({"ok": True, "cleared": True}) except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) return # Create a ZIP archive of all photos and return path if path == "/api/download_zip": try: import shutil zip_base = folder / "_archives" zip_base.mkdir(parents=True, exist_ok=True) ts = int(time.time()) archive_name = str(zip_base / f"photos_{ts}") shutil.make_archive(archive_name, 'zip', root_dir=str(folder)) archive_file = archive_name + '.zip' # serve relative URL rel = os.path.relpath(archive_file, start=folder) self._send_json({"ok": True, "archive": rel}) except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) return # Trigger uploader now by touching a flag file if path == "/api/upload_now": try: upf = SCRIPTS_DIR / "upload_now.flag" upf.write_text(str(time.time())) self._send_json({"ok": True, "upload_now": True}) except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) return if path == "/api/delete": name = _safe_name(q.get("name", [""])[0]) if not name: self._send_json({"ok": False, "error": "missing name"}, 400) return target = (folder / name).resolve() # ensure inside folder try: target.relative_to(folder) except Exception: self._send_json({"ok": False, "error": "invalid path"}, 400) return if not target.exists(): self._send_json({"ok": False, "error": "not found"}, 404) return try: target.unlink() self._send_json({"ok": True, "deleted": name}, 200) except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) return if path == "/api/retake": # Trigger a capture and return result. Optionally accept session_id to associate. session_id = q.get("session_id", [None])[0] result = capture_func() ok = not result.startswith("[ERR]") self._send_json({"ok": ok, "result": result, "session_id": session_id}, 200 if ok else 500) return if path == "/api/reupload": # Allow manual reupload: clears uploaded flag in upload DB so the uploader daemon retries name = _safe_name(q.get("name", [""])[0]) if not name: self._send_json({"ok": False, "error": "missing name"}, 400) return dbp = Path(config.UPLOAD_DB) if dbp.exists(): try: d = json.loads(dbp.read_text()) except Exception: d = {} else: d = {} # clear uploaded marker entry = d.get(name, {}) entry["uploaded"] = False d[name] = entry try: dbp.write_text(json.dumps(d, indent=2)) self._send_json({"ok": True, "requeued": name}) except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) return # Replay management endpoints if path == "/api/replays": try: items = _list_replays() self._send_json({"ok": True, "replays": [it["name"] for it in items], "items": items}) except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) return if path == "/api/replay_record_status": self._send_json({"ok": True, "status": _read_recorder_state()}) return if path == "/api/replay_test_status": self._send_json({"ok": True, "status": _read_replay_test_state()}) return if path == "/api/replay_record_start": try: if _read_mode() != "manual": self._send_json({"ok": False, "error": "replay recording is allowed only in manual mode"}, 409) return name = _normalize_replay_name(q.get("name", [""])[0]) seconds = float(q.get("seconds", ["15"])[0]) if seconds <= 0: self._send_json({"ok": False, "error": "seconds must be positive"}, 400) return cur = _read_recorder_state() if cur.get("running"): self._send_json({"ok": False, "error": "recorder already running", "status": cur}, 409) return preview_path = _resolve_replay_path(name) preview_path.parent.mkdir(parents=True, exist_ok=True) if preview_path.exists(): self._send_json({"ok": False, "error": "replay already exists"}, 409) return preview_output = str(preview_path) _set_recorder_state( running=True, ok=False, error="", name=name, seconds=float(seconds), output=preview_output, returncode=None, started_at=time.time(), finished_at=0.0, stdout_tail=[], ) threading.Thread(target=_run_replay_recorder, args=(name, seconds), daemon=True).start() self._send_json({"ok": True, "status": _read_recorder_state()}) except ValueError as e: self._send_json({"ok": False, "error": str(e)}, 400) except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) return if path == "/api/test_replay": try: if _read_mode() != "manual": self._send_json({"ok": False, "error": "replay test is allowed only in manual mode"}, 409) return if replay_test_func is None: self._send_json({"ok": False, "error": "replay test is unavailable in this runtime"}, 503) return name = _normalize_replay_name(q.get("name", [""])[0]) candidate = _resolve_replay_path(name) if not candidate.exists(): self._send_json({"ok": False, "error": "replay not found"}, 404) return cur = _read_replay_test_state() if cur.get("running"): self._send_json({"ok": False, "error": "replay test already running", "status": cur}, 409) return _set_replay_test_state( running=True, ok=False, error="", name=name, result="", started_at=time.time(), finished_at=0.0, ) threading.Thread(target=_run_replay_test, args=(name, replay_test_func), daemon=True).start() self._send_json({"ok": True, "status": _read_replay_test_state()}) except ValueError as e: self._send_json({"ok": False, "error": str(e)}, 400) except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) return if path == "/api/delete_replay": try: name = q.get("name", [""])[0] candidate = _resolve_replay_path(name) if not candidate.exists(): self._send_json({"ok": False, "error": "replay not found"}, 404) return active_name = _read_active_replay_name() rel = str(candidate.resolve().relative_to(config.DATA_DIR)).replace("\\", "/") if rel == active_name: self._send_json({"ok": False, "error": "cannot delete the active replay"}, 409) return if _is_greeting_replay(rel): self._send_json({"ok": False, "error": "cannot delete the configured AI greeting replay"}, 409) return candidate.unlink() self._send_json({"ok": True, "deleted": rel}) except ValueError as e: self._send_json({"ok": False, "error": str(e)}, 400) except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) return if path == "/api/rename_replay": try: old_name = q.get("old", [""])[0] new_name = q.get("new", [""])[0] old_candidate = _resolve_replay_path(old_name) if not old_candidate.exists(): self._send_json({"ok": False, "error": "replay not found"}, 404) return old_rel = str(old_candidate.resolve().relative_to(config.DATA_DIR)).replace("\\", "/") old_parent = Path(old_rel).parent default_parent = None if str(old_parent) == "." else old_parent.as_posix() new_rel = _normalize_replay_name(new_name, default_parent=default_parent) new_candidate = _resolve_replay_path(new_rel) if new_candidate.exists(): self._send_json({"ok": False, "error": "target replay already exists"}, 409) return new_candidate.parent.mkdir(parents=True, exist_ok=True) old_candidate.rename(new_candidate) _update_replay_references_after_rename(old_rel, new_rel, new_candidate) self._send_json({"ok": True, "old": old_rel, "new": new_rel}) except ValueError as e: self._send_json({"ok": False, "error": str(e)}, 400) except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) return if path == "/api/download_replay": try: name = q.get("name", [""])[0] candidate = _resolve_replay_path(name) if not candidate.exists(): self._send_json({"ok": False, "error": "replay not found"}, 404) return data = candidate.read_bytes() self.send_response(200) self.send_header("Content-Type", "application/json; charset=utf-8") self.send_header("Content-Length", str(len(data))) self.send_header("Content-Disposition", f'attachment; filename="{candidate.name}"') self.end_headers() self.wfile.write(data) except ValueError as e: self._send_json({"ok": False, "error": str(e)}, 400) except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) return if path == "/api/scripts": try: scripts_dir = DATA_SCRIPTS_DIR scripts_dir.mkdir(parents=True, exist_ok=True) files = [ p.name for p in sorted(scripts_dir.iterdir()) if p.is_file() and p.suffix.lower() in (".txt", ".json", ".jsonl") ] self._send_json({"ok": True, "scripts": files}) except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) return if path == "/api/script_content": # return raw text of a script by name (in Scripts/) name = q.get('name', [None])[0] if not name: self._send_json({"ok": False, "error": "missing name"}, 400) return try: scripts_dir = DATA_SCRIPTS_DIR candidate = (scripts_dir / _safe_name(name)).resolve() try: candidate.relative_to(scripts_dir) except Exception: self._send_json({"ok": False, "error": "invalid path"}, 400) return if not candidate.exists(): self._send_json({"ok": False, "error": "not found"}, 404) return txt = candidate.read_text(encoding='utf-8') self._send_json({"ok": True, "script": txt}) except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) return if path == "/api/get_replay": try: cur = _read_active_replay_name() self._send_json({"ok": True, "replay": cur}) except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) return if path == "/api/sanad_script": try: txt = _read_text_file(config.SANAD_SCRIPT_FILE) or "" self._send_json({"ok": True, "script": txt}) except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) return if path == "/api/set_replay": # set via ?name=filename.jsonl name = q.get("name", [""])[0] if not name: self._send_json({"ok": False, "error": "missing name"}, 400) return try: candidate = _resolve_replay_path(name) except ValueError as e: self._send_json({"ok": False, "error": str(e)}, 400) return if not candidate.exists(): self._send_json({"ok": False, "error": "replay not found"}, 404) return integrity = replay_file_integrity(candidate) if not integrity.get("ok"): self._send_json( {"ok": False, "error": "invalid replay file", "integrity": integrity}, 400, ) return try: rel_name = str(candidate.resolve().relative_to(config.DATA_DIR)).replace("\\", "/") config.write_selected_replay_name(rel_name) config.REPLAY_FILE = candidate resp = {"ok": True, "set": rel_name} if not integrity.get("trigger_ok", False): resp["warning"] = "replay has no trigger markers; timed capture fallback will be used" resp["integrity"] = integrity self._send_json(resp) except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) return # MJPEG preview stream if path == "/preview.mjpg": self.send_response(200) self.send_header("Content-Type", "multipart/x-mixed-replace; boundary=--frame") self.end_headers() try: while True: frame = _get_preview_frame() if frame is None: time.sleep(0.1) continue cv2 = _get_cv2() ret, jpg = cv2.imencode('.jpg', frame, [int(cv2.IMWRITE_JPEG_QUALITY), 65]) if not ret: time.sleep(0.05) continue data = jpg.tobytes() self.wfile.write(b"--frame\r\n") self.wfile.write(b"Content-Type: image/jpeg\r\n") self.wfile.write(f"Content-Length: {len(data)}\r\n\r\n".encode('utf-8')) self.wfile.write(data) self.wfile.write(b"\r\n") self.wfile.flush() # small throttle time.sleep(0.08) except Exception as e: record_error( "photo_server", "preview_stream", e, {"client": self.client_address[0] if self.client_address else "unknown"}, ) return # Gallery page: serve gallery.html (client JS renders cards) if path == "/": tpl = _read_text_file(GALLERY_HTML) if tpl is None: self.send_response(404) self.end_headers() return html = tpl.replace("{{PHOTOS_DIR}}", str(folder)) # leave PHOTO_CARDS and MODE_TOGGLE for client-side JS to populate self._send_text(html, 200, "text/html; charset=utf-8") return # Otherwise: serve static files from photos/ (download/open) return super().do_GET() def do_POST(self): parsed = urllib.parse.urlparse(self.path) path = parsed.path # Only used to update sanad script content currently if path == "/api/sanad_script": try: length = int(self.headers.get('Content-Length', '0')) body = self.rfile.read(length) if length else b"" # accept raw text or JSON {"script": "..."} try: payload = json.loads(body.decode('utf-8')) script = payload.get('script', '') except Exception: script = body.decode('utf-8') # write to SANAD_SCRIPT_FILE try: config.SANAD_SCRIPT_FILE.write_text(script, encoding='utf-8') self._send_json({"ok": True, "written": True}) except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) return if path == "/api/audio_prompt_record": try: length = int(self.headers.get("Content-Length", "0")) body = self.rfile.read(length) if length else b"" payload = json.loads(body.decode("utf-8")) if body else {} if not isinstance(payload, dict): self._send_json({"ok": False, "error": "invalid payload"}, 400) return key = str(payload.get("key", "") or "").strip() text = str(payload.get("text", "") or "").strip() filename = str(payload.get("filename", "") or "").strip() if not key: self._send_json({"ok": False, "error": "missing key"}, 400) return if not text: self._send_json({"ok": False, "error": "missing text"}, 400) return cur = _read_audio_prompt_record_state() if cur.get("running"): self._send_json({"ok": False, "error": "audio prompt recorder already running", "status": cur}, 409) return default_filename = audio_prompts.prompt_filename(key) _set_audio_prompt_record_state( running=True, key=key, filename=filename or default_filename, text=text, ok=False, error="", started_at=time.time(), finished_at=0.0, result={}, ) threading.Thread( target=_run_audio_prompt_record, args=(key, text, filename or default_filename), daemon=True, ).start() self._send_json({"ok": True, "status": _read_audio_prompt_record_state()}) except (ValueError, KeyError) as e: self._send_json({"ok": False, "error": str(e)}, 400) except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) return if path == "/api/upload_person": try: form = cgi.FieldStorage( fp=self.rfile, headers=self.headers, environ={ "REQUEST_METHOD": "POST", "CONTENT_TYPE": self.headers.get("Content-Type", ""), }, ) if "file" not in form: self._send_json({"ok": False, "error": "missing file"}, 400) return file_item = form["file"] if not getattr(file_item, "file", None): self._send_json({"ok": False, "error": "missing file payload"}, 400) return data = file_item.file.read() if not data: self._send_json({"ok": False, "error": "empty upload"}, 400) return person_id = str(form.getfirst("person_id", "") or "").strip() filename = str(getattr(file_item, "filename", "") or "") result = people_registry.import_person_photo(data, filename=filename, person_id=person_id) ok = bool(result.get("ok", False)) self._send_json(result, 200 if ok else 400) except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) return if path == "/api/upload_audio_prompt": try: form = cgi.FieldStorage( fp=self.rfile, headers=self.headers, environ={ "REQUEST_METHOD": "POST", "CONTENT_TYPE": self.headers.get("Content-Type", ""), }, ) key = str(form.getfirst("key", "") or "").strip() if not key: self._send_json({"ok": False, "error": "missing key"}, 400) return if "file" not in form: self._send_json({"ok": False, "error": "missing file"}, 400) return file_item = form["file"] if not getattr(file_item, "file", None): self._send_json({"ok": False, "error": "missing file payload"}, 400) return data = file_item.file.read() if not data: self._send_json({"ok": False, "error": "empty upload"}, 400) return filename = str(form.getfirst("filename", "") or getattr(file_item, "filename", "") or "") result = audio_prompts.save_audio_prompt(key, data, filename) self._send_json(result, 200 if bool(result.get("ok")) else 400) except (KeyError, ValueError) as e: self._send_json({"ok": False, "error": str(e)}, 400) except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) return if path == "/api/upload_replay": try: form = cgi.FieldStorage( fp=self.rfile, headers=self.headers, environ={ "REQUEST_METHOD": "POST", "CONTENT_TYPE": self.headers.get("Content-Type", ""), }, ) if "file" not in form: self._send_json({"ok": False, "error": "missing file"}, 400) return file_item = form["file"] if not getattr(file_item, "file", None): self._send_json({"ok": False, "error": "missing file payload"}, 400) return raw_name = form.getfirst("name", "") or getattr(file_item, "filename", "") or "uploaded_replay.jsonl" rel_name = _normalize_replay_name(raw_name) candidate = _resolve_replay_path(rel_name) if candidate.exists(): self._send_json({"ok": False, "error": "replay already exists"}, 409) return candidate.parent.mkdir(parents=True, exist_ok=True) data = file_item.file.read() if not data: self._send_json({"ok": False, "error": "empty upload"}, 400) return candidate.write_bytes(data) integrity = replay_file_integrity(candidate) if not integrity.get("ok"): try: candidate.unlink() except Exception: pass self._send_json({"ok": False, "error": "invalid replay file", "integrity": integrity}, 400) return self._send_json({"ok": True, "saved": rel_name, "integrity": integrity}) except ValueError as e: self._send_json({"ok": False, "error": str(e)}, 400) except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) return # Upload / manage SANAD scripts if path == "/api/upload_script": try: # Accept JSON {"filename": "name.txt", "script": "..."} or raw text with ?filename=... length = int(self.headers.get('Content-Length', '0')) body = self.rfile.read(length) if length else b"" try: payload = json.loads(body.decode('utf-8')) filename = payload.get('filename') script = payload.get('script', '') except Exception: filename = urllib.parse.parse_qs(parsed.query).get('filename', [None])[0] script = body.decode('utf-8') if not filename: self._send_json({"ok": False, "error": "missing filename"}, 400) return scripts_dir = DATA_SCRIPTS_DIR scripts_dir.mkdir(parents=True, exist_ok=True) target = (scripts_dir / _safe_name(filename)).resolve() try: target.relative_to(scripts_dir) except Exception: self._send_json({"ok": False, "error": "invalid filename"}, 400) return target.write_text(script, encoding='utf-8') self._send_json({"ok": True, "saved": target.name}) except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) return if path == "/api/delete_script": try: length = int(self.headers.get('Content-Length', '0')) body = self.rfile.read(length) if length else b"" try: payload = json.loads(body.decode('utf-8')) name = payload.get('name') except Exception: # allow raw name in body name = body.decode('utf-8').strip() if not name: self._send_json({"ok": False, "error": "missing name"}, 400) return scripts_dir = DATA_SCRIPTS_DIR target = (scripts_dir / _safe_name(name)).resolve() try: target.relative_to(scripts_dir) except Exception: self._send_json({"ok": False, "error": "invalid path"}, 400) return if not target.exists(): self._send_json({"ok": False, "error": "not found"}, 404) return target.unlink() self._send_json({"ok": True, "deleted": name}) except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) return if path == "/api/set_sanad_script": # Accept ?name=filename or JSON {"name":"..."} try: qs = urllib.parse.parse_qs(parsed.query) name = qs.get('name', [None])[0] if not name: length = int(self.headers.get('Content-Length', '0')) body = self.rfile.read(length) if length else b"" try: payload = json.loads(body.decode('utf-8')) name = payload.get('name') except Exception: name = None if not name: self._send_json({"ok": False, "error": "missing name"}, 400) return scripts_dir = DATA_SCRIPTS_DIR candidate = (scripts_dir / _safe_name(name)).resolve() try: candidate.relative_to(scripts_dir) except Exception: self._send_json({"ok": False, "error": "invalid script path"}, 400) return if not candidate.exists(): self._send_json({"ok": False, "error": "script not found"}, 404) return # copy contents into SANAD_SCRIPT_FILE try: txt = candidate.read_text(encoding='utf-8') config.SANAD_SCRIPT_FILE.write_text(txt, encoding='utf-8') self._send_json({"ok": True, "set": candidate.name}) except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) return # fallback self.send_response(404) self.end_headers() try: ThreadingHTTPServer.allow_reuse_address = True httpd = ThreadingHTTPServer((bind, port), Handler) except OSError as e: _log(lg, f"❌ Photo server failed: {e}", "error") # Assume another server already running _log(lg, f"🖼️ Phone gallery (existing): {url}", "info") return url def _serve(): try: httpd.serve_forever() except Exception: pass t = threading.Thread(target=_serve, daemon=True) t.start() _log(lg, f"🖼️ Photo server serving {folder} on {url}", "info") return url # ============================================================ # CLI mode (optional) # ============================================================ if __name__ == "__main__": photos = Path(os.environ.get("PHOTOS_DIR", str(DEFAULT_PHOTOS_DIR))).resolve() port = int(os.environ.get("PHOTO_SERVER_PORT", "8080")) start_photo_server(photos, port=port) print("Server running. Press Ctrl+C to stop.") try: while True: time.sleep(3600) except KeyboardInterrupt: pass