2196 lines
86 KiB
Python
2196 lines
86 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
photo_server.py
|
|
---------------
|
|
HTTP photo gallery server for the configured AI_Photographer photos directory.
|
|
|
|
✅ Features:
|
|
- Phone/Laptop gallery UI: GET /
|
|
- Serves photos directly (open/download): GET /<filename>
|
|
- Delete photo: GET /api/delete?name=<filename>
|
|
- Capture via unified replay pipeline: GET /api/capture
|
|
- Camera status: GET /api/status
|
|
- Camera test (RealSense enumerate via camera env python): GET /api/test
|
|
- Run RealSense USB fix script: GET /api/fix
|
|
- CSS/HTML split:
|
|
Web/gallery.html
|
|
Web/style.css
|
|
Served as:
|
|
/ -> gallery.html template (photo list injected)
|
|
/static/style.css -> style.css file
|
|
|
|
⚠️ Security note:
|
|
- This is an open LAN server (anyone on same WiFi can view/download/delete).
|
|
Use only on trusted networks.
|
|
|
|
Works well with your structure (no new folders).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import socket
|
|
import sys
|
|
import threading
|
|
import tempfile
|
|
import time
|
|
import urllib.parse
|
|
import subprocess
|
|
import cgi
|
|
from pathlib import Path
|
|
from http.server import SimpleHTTPRequestHandler, ThreadingHTTPServer
|
|
from typing import Optional, Callable, Dict, Any, List
|
|
from Core import settings as config
|
|
from Core import audio_prompts
|
|
from Core import audio_prompt_recorder
|
|
from Core.error_events import get_error_counters, record_error
|
|
from Core import people_registry
|
|
from Server.capture_service import replay_file_integrity, take_photo_sync
|
|
from Server import direct_camera_client
|
|
|
|
try:
|
|
from Core.Logger import Logs
|
|
except Exception:
|
|
Logs = None # type: ignore
|
|
|
|
|
|
# ============================================================
|
|
# Defaults / Paths
|
|
# ============================================================
|
|
# Directory layout, anchored on the location of this file.
SERVER_DIR = Path(__file__).resolve().parent
PROJECT_ROOT = SERVER_DIR.parent
SCRIPTS_DIR = (PROJECT_ROOT / "Scripts").resolve()

# Data locations: a value from Core.settings wins, otherwise fall back to a
# path under the project tree.
APP_DATA_DIR = Path(
    getattr(config, "APP_DATA_DIR", PROJECT_ROOT / "Data")
).resolve()
DATA_SCRIPTS_DIR = Path(
    getattr(config, "APP_SCRIPTS_DIR", APP_DATA_DIR / "Scripts")
).resolve()
AUTONOMOUS_STATE_FILE = Path(
    getattr(
        config,
        "AUTONOMOUS_STATE_FILE",
        APP_DATA_DIR / "Runtime" / "autonomous_state.json",
    )
).resolve()
RUNTIME_HEALTH_FILE = Path(
    getattr(
        config,
        "RUNTIME_HEALTH_FILE",
        APP_DATA_DIR / "Runtime" / "runtime_health.json",
    )
).resolve()
DEFAULT_PHOTOS_DIR = Path(
    getattr(config, "PHOTOS_DIR", PROJECT_ROOT / "photos" / "Captures")
).resolve()

# Front-end assets read from disk by the HTTP handler.
WEB_DIR = (PROJECT_ROOT / "Web").resolve()

GALLERY_HTML = WEB_DIR / "gallery.html"
STYLE_CSS = WEB_DIR / "style.css"
GALLERY_JS = WEB_DIR / "gallery.js"
|
|
|
|
def _env_float(name: str, default: float) -> float:
    """
    Read a float from environment variable *name*.

    Returns *default* when the variable is unset, blank, or not parseable as a
    float, so a typo in the environment cannot stop the module from importing.
    """
    raw = os.environ.get(name, "").strip()
    if not raw:
        return default
    try:
        return float(raw)
    except ValueError:
        return default


# Optional override for the RealSense USB fix script location.
FIX_SCRIPT_ENV = os.environ.get("FIX_SCRIPT_PATH", "").strip()

# Python interpreter of the camera environment; TELEIMAGER_PY overrides the
# default miniconda location under the user's home directory.
_DEFAULT_RUNTIME_HOME = Path(os.environ.get("HOME", "~")).expanduser()
CAMERA_ENV_PY = os.environ.get(
    "TELEIMAGER_PY",
    str((_DEFAULT_RUNTIME_HOME / "miniconda3" / "envs" / "teleimager" / "bin" / "python").resolve()),
)

# Preview fallback controls
# Any value other than 0/false/no/off (case-insensitive) keeps the fallback on.
PREVIEW_USE_OPENCV_FALLBACK = os.environ.get("PREVIEW_USE_OPENCV_FALLBACK", "1").strip().lower() not in (
    "0", "false", "no", "off"
)
# Minimum number of seconds between attempts to (re)open the fallback camera.
PREVIEW_RETRY_SEC = _env_float("PREVIEW_RETRY_SEC", 2.0)
|
|
|
|
# Shared preview camera state to avoid reopening camera on every frame
_PREVIEW_CAM_LOCK = threading.Lock()
_PREVIEW_CAM: Optional[Any] = None
_PREVIEW_LAST_OPEN_TRY = 0.0

# Lazily imported handles, filled in on first use.
_CV2_MODULE = None
_CAMERA_CAPTURE_CLS = None

# Status snapshot of the replay recorder subprocess (guarded by _RECORDER_LOCK).
_RECORDER_LOCK = threading.Lock()
_RECORDER_STATE: Dict[str, Any] = {
    "running": False,
    "name": "",
    "seconds": 0.0,
    "output": "",
    "returncode": None,
    "ok": False,
    "error": "",
    "started_at": 0.0,
    "finished_at": 0.0,
    "stdout_tail": [],
}

# Status snapshot of the replay test run (guarded by _REPLAY_TEST_LOCK).
_REPLAY_TEST_LOCK = threading.Lock()
_REPLAY_TEST_STATE: Dict[str, Any] = {
    "running": False,
    "name": "",
    "ok": False,
    "error": "",
    "result": "",
    "started_at": 0.0,
    "finished_at": 0.0,
}

# Status snapshot of the audio prompt recording (guarded by _AUDIO_PROMPT_RECORD_LOCK).
_AUDIO_PROMPT_RECORD_LOCK = threading.Lock()
_AUDIO_PROMPT_RECORD_STATE: Dict[str, Any] = {
    "running": False,
    "key": "",
    "filename": "",
    "text": "",
    "ok": False,
    "error": "",
    "started_at": 0.0,
    "finished_at": 0.0,
    "result": {},
}
|
|
|
|
|
|
# ============================================================
|
|
# Logging helpers
|
|
# ============================================================
|
|
def _make_logger(log_name: str = "photo_server"):
    """
    Create a project logger bound to *log_name*.

    Returns ``None`` when the ``Logs`` class is unavailable (``Logs is None``).
    A failure while configuring the log engine is ignored and the logger
    instance is still returned.
    """
    if Logs is None:
        return None

    logger = Logs()
    try:
        logger.LogEngine("G1_Logs", log_name)
    except Exception:
        # Best effort: keep the instance even if engine setup failed.
        pass
    return logger
|
|
|
|
|
|
def _log(lg, msg: str, level: str = "info"):
    """
    Emit *msg* through *lg*, or print it when no usable logger exists.

    *level* is forwarded as ``message_type`` (info / warning / error). If the
    logger is ``None`` or its ``print_and_log`` call raises, the message is
    written to stdout instead.
    """
    if lg is None:
        print(msg)
        return

    try:
        lg.print_and_log(msg, message_type=level)
    except Exception:
        print(msg)
|
|
|
|
|
|
# ============================================================
|
|
# Utilities
|
|
# ============================================================
|
|
def _get_local_ip() -> str:
    """
    Best-effort IP that phones can reach (same WiFi).

    Opens a UDP socket "connected" to a public address and reads back the local
    address chosen for it. The socket is managed by a ``with`` block so it is
    closed even when ``connect()`` or ``getsockname()`` fails.

    Returns:
        The local IPv4 address as a string, or ``"127.0.0.1"`` when it cannot
        be determined (for example, no network route).
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            # connect() on a datagram socket only fixes the peer address;
            # it is used here to learn which local address would be used.
            probe.connect(("8.8.8.8", 80))
            return probe.getsockname()[0]
    except OSError:
        return "127.0.0.1"
|
|
|
|
|
|
def _safe_name(name: str) -> str:
    """
    Prevent path traversal: keep only filename (no dirs).

    Backslashes are treated as separators, surrounding whitespace is stripped,
    and only the last path component is kept. The special components ``"."``
    and ``".."`` are rejected because they name a directory, not a file, and
    ``".."`` would escape the served folder when joined to it.

    Returns:
        The bare file name, or ``""`` when the input is empty, ``None``, or
        reduces to ``"."`` / ``".."``.
    """
    leaf = str(name or "").strip().replace("\\", "/").rsplit("/", 1)[-1]
    if leaf in (".", ".."):
        return ""
    return leaf
|
|
|
|
|
|
def _fix_script_candidates() -> List[Path]:
    """
    List the locations searched for ``fix_realsense_usb.sh``, in priority order.

    The FIX_SCRIPT_PATH override (if set) comes first; a relative override is
    resolved against the project root. The two built-in locations follow.
    """
    builtin = [
        (SCRIPTS_DIR / "fix_realsense_usb.sh").resolve(),
        (PROJECT_ROOT / "fix_realsense_usb.sh").resolve(),
    ]
    if not FIX_SCRIPT_ENV:
        return builtin

    override = Path(FIX_SCRIPT_ENV).expanduser()
    if not override.is_absolute():
        override = (PROJECT_ROOT / override).resolve()
    return [override, *builtin]
|
|
|
|
|
|
def _resolve_fix_script() -> Optional[Path]:
    """Return the first fix-script candidate that exists as a regular file, else ``None``."""
    existing = (
        candidate
        for candidate in _fix_script_candidates()
        if candidate.exists() and candidate.is_file()
    )
    return next(existing, None)
|
|
|
|
|
|
def _get_cv2():
    """Return the ``cv2`` module, importing it on first use and caching it in ``_CV2_MODULE``."""
    global _CV2_MODULE
    if _CV2_MODULE is not None:
        return _CV2_MODULE

    # Deferred import: cv2 is only loaded when something actually asks for it.
    import cv2 as loaded_cv2

    _CV2_MODULE = loaded_cv2
    return _CV2_MODULE
|
|
|
|
|
|
def _get_camera_capture_cls():
    """Return the ``CameraCapture`` class, importing it on first use and caching it."""
    global _CAMERA_CAPTURE_CLS
    if _CAMERA_CAPTURE_CLS is not None:
        return _CAMERA_CAPTURE_CLS

    # Deferred import: the camera module is only loaded when it is needed.
    from Modes.AI.camera_module import CameraCapture as capture_cls

    _CAMERA_CAPTURE_CLS = capture_cls
    return _CAMERA_CAPTURE_CLS
|
|
|
|
|
|
def _is_photo_file(p: Path) -> bool:
    """Return True when *p* is an existing regular file with a .jpg/.jpeg/.png/.webp suffix (any case)."""
    photo_suffixes = (".jpg", ".jpeg", ".png", ".webp")
    return p.is_file() and p.suffix.lower() in photo_suffixes
|
|
|
|
|
|
def _list_photos(folder: Path) -> List[Dict[str, Any]]:
    """
    List the photos in *folder*, newest first.

    The folder is created if it does not exist. Only entries accepted by
    ``_is_photo_file`` are stat'ed, and each one exactly once. An entry that
    disappears or becomes unreadable while the listing is being built is
    skipped rather than failing the whole listing.

    Returns:
        A list of ``{"name", "size", "mtime"}`` dicts sorted by ``mtime``
        in descending order.
    """
    folder.mkdir(parents=True, exist_ok=True)

    items: List[Dict[str, Any]] = []
    for entry in folder.iterdir():
        try:
            if not _is_photo_file(entry):
                continue
            st = entry.stat()
        except OSError:
            # The file was removed (or became unreadable) after iterdir() saw it.
            continue
        items.append(
            {
                "name": entry.name,
                "size": st.st_size,
                "mtime": st.st_mtime,
            }
        )

    items.sort(key=lambda item: item["mtime"], reverse=True)
    return items
|
|
|
|
|
|
def _group_sessions(items: List[Dict[str, Any]], gap_sec: int = 120) -> List[Dict[str, Any]]:
    """
    Group photos into sessions when gaps between photos exceed gap_sec.

    Photos are processed in ascending ``mtime`` order. A photo joins the
    current session unless it is more than *gap_sec* seconds after that
    session's last photo, in which case it opens a new one. Each session is a
    dict with ``id`` (1-based), ``start``, ``end`` and ``photos`` (names).
    """
    sessions: List[Dict[str, Any]] = []
    for photo in sorted(items, key=lambda entry: entry["mtime"]):
        stamp = photo["mtime"]
        latest = sessions[-1] if sessions else None

        if latest is not None and not (stamp - latest["end"] > gap_sec):
            latest["photos"].append(photo["name"])
            latest["end"] = stamp
            continue

        sessions.append(
            {
                "id": len(sessions) + 1,
                "start": stamp,
                "end": stamp,
                "photos": [photo["name"]],
            }
        )
    return sessions
|
|
|
|
|
|
def _read_text_file(path: Path) -> Optional[str]:
    """Return the UTF-8 decoded contents of *path*, or ``None`` if it cannot be read."""
    try:
        with path.open("r", encoding="utf-8") as handle:
            return handle.read()
    except Exception:
        return None
|
|
|
|
|
|
def _read_bytes_file(path: Path) -> Optional[bytes]:
    """Return the raw bytes of *path*, or ``None`` if it cannot be read."""
    try:
        with path.open("rb") as handle:
            return handle.read()
    except Exception:
        return None
|
|
|
|
|
|
def _read_mode() -> str:
    """
    Return the current runtime mode as ``"manual"`` or ``"ai"``.

    The stored value is read through ``config.read_runtime_mode()`` and
    normalised: ``"command"`` is reported as ``"ai"``; anything unrecognised,
    or a failure while reading, is reported as ``"manual"``.
    """
    try:
        stored = str(config.read_runtime_mode()).strip().lower()
    except Exception:
        return "manual"

    if stored in ("command", "ai"):
        return "ai"
    return "manual"
|
|
|
|
|
|
def _read_detector_backend() -> str:
    """
    Return the configured vision detector backend: ``"yolo"`` or ``"normal"``.

    Any other stored value, or a failure while reading the setting, yields
    ``"normal"``.
    """
    try:
        stored = str(config.read_vision_detector_backend()).strip().lower()
    except Exception:
        return "normal"
    return "yolo" if stored == "yolo" else "normal"
|
|
|
|
|
|
def _strict_yolo_required() -> bool:
    """
    Report whether strict YOLO mode is required.

    Uses ``config.read_vision_yolo_strict_required()``; if that call fails,
    falls back to the ``VISION_YOLO_STRICT_REQUIRED`` attribute of the settings
    module (``False`` when absent).
    """
    try:
        return bool(config.read_vision_yolo_strict_required())
    except Exception:
        static_default = getattr(config, "VISION_YOLO_STRICT_REQUIRED", False)
        return bool(static_default)
|
|
|
|
|
|
def _read_ai_readiness() -> Dict[str, Any]:
    """
    Build the AI readiness report.

    Combines the result of ``probe_ai_readiness`` for the configured backend
    with the persisted autonomous state: when that state says AI is blocked
    and gives a non-empty reason, the probe result is overridden to
    ``ok=False`` with that reason. The returned dict has normalised types
    (bools and strings) for every field.
    """
    from Modes.AI.vision_detector import probe_ai_readiness

    backend = _read_detector_backend()
    strict_required = _strict_yolo_required()
    readiness = probe_ai_readiness(backend=backend, strict_required=strict_required)
    state = _read_autonomous_state()

    blocked = bool(state.get("ai_blocked", False))
    reason = str(state.get("ai_block_reason", "") or "").strip()
    if blocked and reason:
        # The runtime's own block verdict takes precedence over the probe.
        readiness["ok"] = False
        readiness["block_reason"] = reason

    def _flag(key: str) -> bool:
        return bool(readiness.get(key, False))

    def _text(key: str) -> str:
        return str(readiness.get(key, "") or "")

    return {
        "ok": _flag("ok"),
        "strict_required": bool(strict_required),
        "backend": backend,
        "yolo_runtime": _text("yolo_runtime"),
        "yolo_loaded": _flag("yolo_loaded"),
        "person_model_ok": _flag("person_model_ok"),
        "face_model_ok": _flag("face_model_ok"),
        "block_reason": _text("block_reason"),
        "person_model_path": _text("person_model_path"),
        "face_model_path": _text("face_model_path"),
    }
|
|
|
|
|
|
def _read_runtime_health() -> Dict[str, Any]:
    """
    Load the runtime health snapshot from ``RUNTIME_HEALTH_FILE``.

    Keys found in the file override the built-in defaults. The defaults alone
    are returned when the file is missing, unreadable, not valid JSON, or does
    not contain a JSON object.
    """
    fallback: Dict[str, Any] = {
        "ws_connected": False,
        "ws_state": "detached",
        "ws_restarts": 0,
        "ws_last_error": "",
        "mic_enabled": True,
        "mic_state": "idle",
        "mic_restarts": 0,
        "mic_last_error": "",
        "speaker_state": "idle",
        "speaker_restarts": 0,
        "speaker_last_error": "",
        "audio_gate_open": False,
        "time": 0.0,
    }
    try:
        if not RUNTIME_HEALTH_FILE.exists():
            return fallback
        payload = json.loads(RUNTIME_HEALTH_FILE.read_text(encoding="utf-8"))
    except Exception:
        return fallback

    if not isinstance(payload, dict):
        return fallback
    return {**fallback, **payload}
|
|
|
|
|
|
def _read_ai_options() -> Dict[str, Any]:
    """
    Collect the AI/vision option values exposed to the UI.

    Every setting is read independently through its ``config.read_*``
    accessor; if an accessor raises, that one option falls back to its
    default while the others are still read. The active replay name is added
    under ``"active_replay"``.
    """

    def _attempt(reader: Callable[[], Any], fallback: Any) -> Any:
        # Run one settings read; any failure yields that option's default.
        try:
            return reader()
        except Exception:
            return fallback

    hard = _attempt(lambda: bool(config.read_vision_hard_target_lock_enabled()), True)
    retake = _attempt(lambda: bool(config.read_vision_retake_prompt_enabled()), True)
    greeting_replay_enabled = _attempt(
        lambda: bool(config.read_vision_autonomous_greeting_replay_enabled()), True
    )
    greeting_replay_file = _attempt(
        lambda: str(config.read_vision_autonomous_greeting_replay_file() or "").strip(),
        "right_hand_up.jsonl",
    )
    capture_replay_enabled = _attempt(
        lambda: bool(config.read_vision_autonomous_capture_replay_enabled()), True
    )
    face_recognition_enabled = _attempt(
        lambda: bool(config.read_vision_face_recognition_enabled()), True
    )
    face_recognition_threshold = _attempt(
        lambda: float(config.read_vision_face_recognition_threshold()), 0.88
    )

    return {
        "hard_target_lock_enabled": hard,
        "retake_prompt_enabled": retake,
        "autonomous_greeting_replay_enabled": greeting_replay_enabled,
        "autonomous_greeting_replay_file": greeting_replay_file,
        "autonomous_capture_replay_enabled": capture_replay_enabled,
        "face_recognition_enabled": face_recognition_enabled,
        "face_recognition_threshold": face_recognition_threshold,
        "active_replay": _read_active_replay_name(),
    }
|
|
|
|
|
|
def _read_mic_options() -> Dict[str, Any]:
    """Return ``{"mic_enabled": bool}``; the mic counts as enabled when the setting cannot be read."""
    try:
        mic_on = bool(config.read_gemini_mic_enabled())
    except Exception:
        mic_on = True
    return {"mic_enabled": mic_on}
|
|
|
|
|
|
def _safe_relative_name(name: str) -> str:
    """
    Normalise *name* into a safe relative POSIX-style path.

    Backslashes become forward slashes, leading slashes are dropped, and
    empty or ``"."`` components are removed. The result is ``""`` when the
    input is empty, is still absolute after normalisation, contains a ``".."``
    component, or has no components left.
    """
    cleaned = str(name or "").strip().replace("\\", "/").lstrip("/")
    if not cleaned:
        return ""

    candidate = Path(cleaned)
    if candidate.is_absolute():
        return ""

    kept: List[str] = []
    for segment in candidate.parts:
        if segment == "..":
            # Any parent reference disqualifies the whole name.
            return ""
        if segment not in ("", "."):
            kept.append(segment)

    if not kept:
        return ""
    return Path(*kept).as_posix()
|
|
|
|
|
|
def _normalize_replay_name(name: str, default_parent: str | None = None) -> str:
    """
    Turn a user-supplied replay name into a relative ``.jsonl`` path.

    A bare file name is placed under *default_parent* when one is given, and
    the suffix is forced to ``.jsonl``.

    Raises:
        ValueError: if the name is not a safe relative path, or if the final
            file name equals the reserved ``config.HOME_FILE`` name.
    """
    safe = _safe_relative_name(name)
    if not safe:
        raise ValueError("invalid replay path")

    rel = Path(safe)
    is_bare_name = len(rel.parts) == 1
    if is_bare_name and default_parent:
        rel = Path(default_parent) / rel.name

    if rel.suffix.lower() != ".jsonl":
        rel = rel.with_suffix(".jsonl")

    if rel.name == config.HOME_FILE.name:
        raise ValueError("reserved replay name")
    return rel.as_posix()
|
|
|
|
|
|
def _resolve_replay_path(name: str) -> Path:
    """
    Resolve a replay name to a path and verify it lies under ``config.DATA_DIR``.

    Raises:
        ValueError: if the name is invalid or the resolved path is not
            located inside ``config.DATA_DIR``.
    """
    normalized = _normalize_replay_name(name)
    resolved = config.resolve_replay_path(normalized)
    try:
        resolved.relative_to(config.DATA_DIR)
    except Exception as exc:
        raise ValueError("invalid replay path") from exc
    return resolved
|
|
|
|
|
|
def _is_replay_file(path: Path) -> bool:
    """Return True for an existing ``.jsonl`` file whose name is not the reserved ``config.HOME_FILE`` name."""
    if not path.is_file():
        return False
    if path.suffix.lower() != ".jsonl":
        return False
    return path.name != config.HOME_FILE.name
|
|
|
|
|
|
def _read_active_replay_name() -> str:
    """Return the selected replay name, falling back to the file name of ``config.REPLAY_FILE`` if the read fails."""
    try:
        selected = config.read_selected_replay_name()
    except Exception:
        selected = str(config.REPLAY_FILE.name)
    return selected
|
|
|
|
|
|
def _list_replays() -> List[Dict[str, Any]]:
    """
    Describe every replay file found (recursively) under ``config.DATA_DIR``.

    The data directory is created if missing. Files are visited in sorted path
    order; each entry carries its path relative to the data directory, size,
    mtime, whether it is the active or greeting replay, and the ``ok`` /
    ``trigger_ok`` flags reported by ``replay_file_integrity``.
    """
    root = config.DATA_DIR.resolve()
    root.mkdir(parents=True, exist_ok=True)
    active_name = _read_active_replay_name()
    greeting_name = str(config.read_vision_autonomous_greeting_replay_file() or "").strip().replace("\\", "/")

    def _describe(path: Path) -> Dict[str, Any]:
        try:
            rel = str(path.resolve().relative_to(root)).replace("\\", "/")
        except Exception:
            # Not expressible relative to the root: fall back to the bare name.
            rel = path.name
        st = path.stat()
        integrity = replay_file_integrity(path)
        return {
            "name": rel,
            "label": path.name,
            "size": st.st_size,
            "mtime": st.st_mtime,
            "active": rel == active_name,
            # The greeting setting may hold either a relative path or a bare file name.
            "greeting": rel == greeting_name or path.name == greeting_name,
            "trigger_ok": bool(integrity.get("trigger_ok", False)),
            "ok": bool(integrity.get("ok", False)),
        }

    return [
        _describe(path)
        for path in sorted(root.rglob("*.jsonl"))
        if _is_replay_file(path)
    ]
|
|
|
|
|
|
def _replay_recorder_candidates() -> List[Path]:
    """
    List the locations searched for the replay recorder script, in priority order.

    Order: the REPLAY_RECORDER_SCRIPT environment variable, the
    ``REPLAY_RECORDER_SCRIPT`` settings attribute, then
    ``G1_Lootah/Manual_Recorder/g1_teach_v4_stable.py`` next to the project
    root, one level further up, and under the user's home directory.

    The "one level further up" location is only added when the project root
    actually has a second ancestor, so a shallow install path (for example
    ``/app``) no longer raises ``IndexError``.

    Returns:
        Absolute candidate paths; relative overrides are resolved against the
        current working directory.
    """
    recorder_rel = Path("G1_Lootah") / "Manual_Recorder" / "g1_teach_v4_stable.py"
    candidates: List[Path] = []

    env_path = os.environ.get("REPLAY_RECORDER_SCRIPT", "").strip()
    if env_path:
        candidates.append(Path(env_path).expanduser())

    cfg_raw = str(getattr(config, "REPLAY_RECORDER_SCRIPT", "") or "").strip()
    if cfg_raw:
        candidates.append(Path(cfg_raw).expanduser())

    search_roots = [PROJECT_ROOT.parent]
    ancestors = list(PROJECT_ROOT.parents)
    if len(ancestors) > 1:
        search_roots.append(ancestors[1])
    search_roots.append(Path.home())
    candidates.extend((root / recorder_rel).resolve() for root in search_roots)

    return [p if p.is_absolute() else p.resolve() for p in candidates]
|
|
|
|
|
|
def _resolve_replay_recorder_script() -> Optional[Path]:
    """
    Return the first recorder-script candidate that exists as a regular file.

    A candidate whose existence check raises is skipped. Returns ``None`` when
    no candidate qualifies.
    """
    for script in _replay_recorder_candidates():
        try:
            usable = script.exists() and script.is_file()
        except Exception:
            continue
        if usable:
            return script
    return None
|
|
|
|
|
|
def _read_recorder_state() -> Dict[str, Any]:
    """Return a shallow copy of the recorder state, taken while holding ``_RECORDER_LOCK``."""
    with _RECORDER_LOCK:
        snapshot = _RECORDER_STATE.copy()
    return snapshot
|
|
|
|
|
|
def _set_recorder_state(**kwargs):
    """Merge the given fields into the recorder state while holding ``_RECORDER_LOCK``."""
    with _RECORDER_LOCK:
        for field, value in kwargs.items():
            _RECORDER_STATE[field] = value
|
|
|
|
|
|
def _read_replay_test_state() -> Dict[str, Any]:
    """Return a shallow copy of the replay-test state, taken while holding ``_REPLAY_TEST_LOCK``."""
    with _REPLAY_TEST_LOCK:
        snapshot = _REPLAY_TEST_STATE.copy()
    return snapshot
|
|
|
|
|
|
def _set_replay_test_state(**kwargs):
    """Merge the given fields into the replay-test state while holding ``_REPLAY_TEST_LOCK``."""
    with _REPLAY_TEST_LOCK:
        for field, value in kwargs.items():
            _REPLAY_TEST_STATE[field] = value
|
|
|
|
|
|
def _read_audio_prompt_record_state() -> Dict[str, Any]:
    """Return a shallow copy of the audio-prompt recording state, taken while holding its lock."""
    with _AUDIO_PROMPT_RECORD_LOCK:
        snapshot = _AUDIO_PROMPT_RECORD_STATE.copy()
    return snapshot
|
|
|
|
|
|
def _set_audio_prompt_record_state(**kwargs):
    """Merge the given fields into the audio-prompt recording state while holding its lock."""
    with _AUDIO_PROMPT_RECORD_LOCK:
        for field, value in kwargs.items():
            _AUDIO_PROMPT_RECORD_STATE[field] = value
|
|
|
|
|
|
def _run_audio_prompt_record(key: str, text: str, filename: str):
    """
    Record an audio prompt from *text* and publish the outcome.

    Calls ``audio_prompt_recorder.record_prompt_from_text`` and stores a final
    ``running=False`` snapshot in the audio-prompt recording state: the
    recorder's result on success, or the exception text on failure. Nothing
    is raised to the caller.
    """
    try:
        result = audio_prompt_recorder.record_prompt_from_text(key, text, filename=filename)
    except Exception as exc:
        _set_audio_prompt_record_state(
            running=False,
            ok=False,
            error=str(exc),
            finished_at=time.time(),
            result={},
        )
        return

    _set_audio_prompt_record_state(
        running=False,
        ok=True,
        error="",
        finished_at=time.time(),
        result=result,
    )
|
|
|
|
|
|
def _run_replay_recorder(name: str, seconds: float):
    """
    Run the external replay recorder script and publish the outcome.

    The script is started with the current interpreter, the DDS interface
    picked by ``auto_pick_iface`` and the resolved output path, and is fed
    ``"y\\nn\\n"`` on stdin. Success requires exit code 0 *and* a
    ``💾 Saved ... to <path>`` line within the last 25 non-empty output lines.

    Every exit path stores a final ``running=False`` snapshot through
    ``_set_recorder_state``. This now includes failures while resolving or
    creating the output location, which previously escaped as an exception
    without updating the state.

    Args:
        name: Replay name; resolved with ``_resolve_replay_path``.
        seconds: Recording duration passed to the script. The subprocess
            timeout is ``max(30, seconds + 45)`` seconds.
    """

    def _fail(message: str, **extra: Any) -> None:
        # Publish a terminal failure snapshot.
        _set_recorder_state(
            running=False,
            ok=False,
            error=message,
            finished_at=time.time(),
            **extra,
        )

    script = _resolve_replay_recorder_script()
    if script is None:
        _fail("recorder script not found")
        return

    try:
        from Modes.Manual.controller import auto_pick_iface

        iface = auto_pick_iface()
    except Exception as e:
        _fail(f"failed to resolve DDS interface: {e}")
        return

    try:
        output_path = _resolve_replay_path(name)
        output_path.parent.mkdir(parents=True, exist_ok=True)
    except Exception as e:
        # Invalid/reserved replay name or an unwritable directory.
        _fail(str(e))
        return

    cmd = [
        sys.executable,
        str(script),
        iface,
        "--output",
        str(output_path),
        "--seconds",
        str(seconds),
        "--home",
        str(config.HOME_FILE),
    ]
    try:
        proc = subprocess.run(
            cmd,
            input="y\nn\n",
            capture_output=True,
            text=True,
            timeout=max(30.0, seconds + 45.0),
            cwd=str(PROJECT_ROOT),
        )
        combined = ((proc.stdout or "") + "\n" + (proc.stderr or "")).strip()
        tail = [line for line in combined.splitlines() if line.strip()][-25:]

        # The recorder announces the written file; the last such line wins.
        saved_path = ""
        marker = "💾 Saved "
        for line in tail:
            if marker in line and " to " in line:
                saved_path = line.split(" to ", 1)[1].strip()

        ok = proc.returncode == 0 and bool(saved_path)
        _set_recorder_state(
            running=False,
            ok=ok,
            error="" if ok else (tail[-1] if tail else f"recorder failed rc={proc.returncode}"),
            returncode=proc.returncode,
            finished_at=time.time(),
            output=saved_path,
            stdout_tail=tail,
        )
    except Exception as e:
        # Includes subprocess.TimeoutExpired and failures to launch the script.
        _fail(str(e), stdout_tail=[])
|
|
|
|
|
|
def _run_replay_test(name: str, replay_test_func: Callable[[str], str]):
    """
    Run *replay_test_func* for *name* and publish the outcome in the replay-test state.

    A result whose text starts with ``[ERR]`` counts as a failure; the marker
    is removed from the stored error message. An exception raised by the
    callable is stored as a failure as well. Nothing is raised to the caller.
    """
    try:
        text = str(replay_test_func(name) or "")
        passed = not text.startswith("[ERR]")
        if passed:
            message = ""
        else:
            message = text.replace("[ERR] ", "").replace("[ERR]", "").strip()
        _set_replay_test_state(
            running=False,
            ok=passed,
            error=message,
            result=text,
            finished_at=time.time(),
        )
    except Exception as exc:
        _set_replay_test_state(
            running=False,
            ok=False,
            error=str(exc),
            result="",
            finished_at=time.time(),
        )
|
|
|
|
|
|
def _update_replay_references_after_rename(old_rel: str, new_rel: str, new_path: Path):
    """
    Repoint stored settings from a renamed replay to its new name.

    If *old_rel* was the active replay, the selected replay name is rewritten
    and ``config.REPLAY_FILE`` is set to *new_path*. If *old_rel* was the
    configured greeting replay, that setting is rewritten too.
    """
    was_active = _read_active_replay_name() == old_rel
    if was_active:
        config.write_selected_replay_name(new_rel)
        config.REPLAY_FILE = new_path

    if _is_greeting_replay(old_rel):
        config.write_vision_autonomous_greeting_replay_file(new_rel)
|
|
|
|
|
|
def _is_greeting_replay(rel_name: str) -> bool:
    """Return True when *rel_name* equals the configured greeting replay (compared with forward slashes)."""
    configured = config.read_vision_autonomous_greeting_replay_file() or ""
    normalized = str(configured).strip().replace("\\", "/")
    return rel_name == normalized
|
|
|
|
|
|
def _mode_policy(mode: str) -> Dict[str, Any]:
    """
    Describe what the given runtime mode allows.

    Only ``"ai"`` enables voice-requested photos. Every other value, including
    unrecognised ones, is treated as ``"manual"``, so no separate
    "unknown mode" result exists.

    Returns:
        A dict with ``voice_request_photo`` (bool) and ``description`` (str).
    """
    if mode == "ai":
        return {
            "voice_request_photo": True,
            "description": "AI mode: mapped voice can request photo.",
        }
    return {
        "voice_request_photo": False,
        "description": "Manual mode: Gemini mic can stay on, but mapped photo commands and AI detection are disabled.",
    }
|
|
|
|
|
|
def _read_autonomous_state() -> Dict[str, Any]:
    """
    Load the autonomous-mode state snapshot from ``AUTONOMOUS_STATE_FILE``.

    Keys found in the file override the built-in defaults. The defaults alone
    are returned when the file is missing, unreadable, not valid JSON, or does
    not contain a JSON object.
    """
    fallback: Dict[str, Any] = {
        "state": "IDLE",
        "session_id": 0,
        "interaction_active": False,
        "intent_detected": False,
        "detector_backend": "normal",
        "ai_blocked": False,
        "ai_block_reason": "",
        "person_count": 0,
        "face_count": 0,
        "group_count": 0,
        "group_size": 0,
        "group_detected": False,
        "max_area": 0.0,
        "audio_gate_open": False,
        "subject_id": None,
        "subject_visible": False,
        "target_lock_active": False,
        "target_lock_type": "",
        "target_lock_id": None,
        "target_switch_blocked_count": 0,
        "depth_m": None,
        "approach_speed_mps": 0.0,
        "camera_ok": False,
        "depth_ok": False,
        "camera_restarts": 0,
        "depth_restarts": 0,
        "ws_connected": False,
        "mic_state": "",
        "speaker_state": "",
        "cooldown_remaining": 0.0,
        "confirm_timeout_remaining": 0.0,
        "framing_timeout_remaining": 0.0,
        "countdown_remaining": 0.0,
        "retake_prompt_enabled": False,
        "retake_recommended": False,
        "retake_reason": "",
        "retake_count": 0,
        "retake_limit": 0,
        "recognized_person_id": "",
        "recognized_person_known": False,
        "recognized_person_new": False,
        "recognized_person_label": "",
        "recognized_person_match_score": 0.0,
        "recognized_person_created_date": "",
        "time": 0.0,
    }
    try:
        if not AUTONOMOUS_STATE_FILE.exists():
            return fallback
        payload = json.loads(AUTONOMOUS_STATE_FILE.read_text(encoding="utf-8"))
    except Exception:
        return fallback

    if not isinstance(payload, dict):
        return fallback
    return {**fallback, **payload}
|
|
|
|
|
|
# ============================================================
|
|
# Capture/Test/Fix implementations
|
|
# ============================================================
|
|
def _capture_via_service() -> str:
    """
    Capture a photo using the shared capture service.

    Returns whatever ``take_photo_sync(prefix="photo")`` returns, or an
    ``[ERR] capture exception: ...`` string if that call raises.
    """
    try:
        outcome = take_photo_sync(prefix="photo")
    except Exception as exc:
        return f"[ERR] capture exception: {exc}"
    return outcome
|
|
|
|
|
|
def _preview_camera_source() -> Optional[str]:
    """
    Pick the preview camera source from the environment.

    CAMERA_DEVICE takes precedence over CAMERA_INDEX; values are stripped and
    blank values are ignored. Returns ``None`` when neither is set.
    """
    for variable in ("CAMERA_DEVICE", "CAMERA_INDEX"):
        value = os.environ.get(variable, "").strip()
        if value:
            return value
    return None
|
|
|
|
|
|
def _get_preview_frame(zmq_host: str = "127.0.0.1", zmq_port: int = 55555, video_source: Optional[str] = None):
    """Best-effort single frame for MJPEG preview.

    Tries direct camera server, then CameraCapture fallback.
    Returns BGR frame or None.

    The fallback camera is kept open in ``_PREVIEW_CAM`` between calls; after a
    failed open, another attempt is made only once ``PREVIEW_RETRY_SEC`` has
    elapsed. ``zmq_host`` and ``zmq_port`` are accepted but not used by this
    function. Any exception results in ``None``.
    """
    global _PREVIEW_CAM, _PREVIEW_LAST_OPEN_TRY
    try:
        if direct_camera_client.is_enabled():
            direct_frame = direct_camera_client.frame_bgr(timeout=2.0)
            if direct_frame is not None:
                return direct_frame

        if not PREVIEW_USE_OPENCV_FALLBACK:
            return None

        now = time.time()
        with _PREVIEW_CAM_LOCK:
            if _PREVIEW_CAM is None and (now - _PREVIEW_LAST_OPEN_TRY) >= PREVIEW_RETRY_SEC:
                _PREVIEW_LAST_OPEN_TRY = now
                source = _preview_camera_source() if video_source is None else video_source
                camera = _get_camera_capture_cls()(source)
                if camera.initialize():
                    _PREVIEW_CAM = camera
                else:
                    try:
                        camera.release()
                    except Exception:
                        pass
                    _PREVIEW_CAM = None

            if _PREVIEW_CAM is None:
                return None

            frame = _PREVIEW_CAM.capture_frame()
            if frame is not None:
                return frame

            # If device dropped, close it and retry after cooldown.
            handle = getattr(_PREVIEW_CAM, "cap", None)
            if handle is None or not handle.isOpened():
                try:
                    _PREVIEW_CAM.release()
                except Exception:
                    pass
                _PREVIEW_CAM = None
            return None
    except Exception:
        return None
|
|
|
|
|
|
def _realsense_test() -> Dict[str, Any]:
    """
    Quick test using the camera env python:
    - import pyrealsense2
    - count devices

    Runs a short probe script with ``CAMERA_ENV_PY`` (10 second timeout).

    Returns:
        On success ``{"ok": True, "devices": <int>, "serials": ..., "rc": 0}``,
        where ``serials`` is the probe's second output line as printed (a
        string), or ``[]`` when that line is absent. On a non-zero exit,
        ``ok=False`` with ``rc``, ``error`` and the last 20 stdout lines. On
        any exception, ``{"ok": False, "error": <message>}``.
    """
    probe_code = (
        "import pyrealsense2 as rs\n"
        "ctx = rs.context()\n"
        "devs = list(ctx.query_devices())\n"
        "print(len(devs))\n"
        "print([d.get_info(rs.camera_info.serial_number) for d in devs])\n"
    )
    try:
        proc = subprocess.run(
            [CAMERA_ENV_PY, "-c", probe_code],
            capture_output=True,
            text=True,
            timeout=10,
        )
        stdout_lines = (proc.stdout or "").strip().splitlines()
        stderr_text = (proc.stderr or "").strip()

        if proc.returncode != 0:
            return {
                "ok": False,
                "rc": proc.returncode,
                "error": stderr_text or "realsense test failed",
                "stdout": stdout_lines[-20:],
            }

        device_count = int(stdout_lines[0]) if stdout_lines else 0
        # second line prints python list
        serials = stdout_lines[1] if len(stdout_lines) >= 2 else []
        return {"ok": True, "devices": device_count, "serials": serials, "rc": 0}
    except Exception as exc:
        return {"ok": False, "error": str(exc)}
|
|
|
|
|
|
def _run_fix_script() -> Dict[str, Any]:
    """
    Runs fix_realsense_usb.sh in NON-INTERACTIVE mode.
    If sudo password is required, it fails gracefully (no terminal prompt).

    Returns:
        A result dict. ``ok`` is True only when the script exits with code 0.
        Every failure result carries an ``error`` message. Results from an
        actual run also carry ``rc`` plus the last 80 lines of ``stdout`` and
        ``stderr``. When no script is found, ``checked`` lists the searched
        paths.
    """
    fix_script = _resolve_fix_script()
    if not fix_script:
        checked = [str(p) for p in _fix_script_candidates()]
        return {"ok": False, "error": "fix script not found", "checked": checked}

    try:
        # IMPORTANT: -n = non-interactive (won't prompt for password)
        p = subprocess.run(
            ["sudo", "-n", "bash", str(fix_script)],
            capture_output=True,
            text=True,
            timeout=40,
        )

        out = (p.stdout or "").strip().splitlines()[-80:]
        err = (p.stderr or "").strip().splitlines()[-80:]

        if p.returncode == 0:
            return {"ok": True, "rc": 0, "stdout": out, "stderr": err}

        # Most common case: sudo needs password
        joined_err = "\n".join(err).lower()
        if "a password is required" in joined_err or "sudo:" in joined_err:
            return {
                "ok": False,
                "rc": p.returncode,
                "error": f"sudo password required. Run manually in terminal: sudo bash {fix_script}",
                "stdout": out,
                "stderr": err,
            }

        # Generic failure: surface the last stderr line so the result always
        # has an "error" message, like the other failure results.
        return {
            "ok": False,
            "rc": p.returncode,
            "error": err[-1] if err else f"fix script failed rc={p.returncode}",
            "stdout": out,
            "stderr": err,
        }

    except Exception as e:
        return {"ok": False, "error": str(e)}
|
|
|
|
|
|
|
|
# ============================================================
|
|
# Web assets (gallery.html, style.css, gallery.js) — no inline fallback
|
|
# ============================================================
|
|
# All HTML/CSS/JS files live in WEB_DIR. The server reads them at runtime.
|
|
|
|
|
|
# ============================================================
|
|
# Main server factory
|
|
# ============================================================
|
|
def start_photo_server(
|
|
folder: Path = DEFAULT_PHOTOS_DIR,
|
|
port: int = 8080,
|
|
bind: str = "0.0.0.0",
|
|
capture_func: Optional[Callable[[], str]] = None,
|
|
replay_test_func: Optional[Callable[[str], str]] = None,
|
|
logger=None,
|
|
) -> str:
|
|
"""
|
|
Starts the gallery server in a daemon thread.
|
|
Returns the URL (best-effort reachable IP).
|
|
|
|
If port is already in use, it logs and returns the URL anyway (assumes old server is running).
|
|
"""
|
|
lg = logger or _make_logger("photo_server.log")
|
|
folder = folder.resolve()
|
|
folder.mkdir(parents=True, exist_ok=True)
|
|
|
|
ip = _get_local_ip()
|
|
url = f"http://{ip}:{port}/"
|
|
|
|
if capture_func is None:
|
|
capture_func = _capture_via_service
|
|
|
|
class Handler(SimpleHTTPRequestHandler):
|
|
QUIET_PATHS = {
|
|
"/api/autonomous_state",
|
|
"/api/runtime_health",
|
|
"/api/photos",
|
|
"/api/people",
|
|
"/api/audio_prompts",
|
|
"/api/audio_prompt_record_status",
|
|
"/api/set_audio_prompt_mode",
|
|
"/api/set_audio_prompt_fallback",
|
|
"/api/replays",
|
|
"/api/get_replay",
|
|
"/api/replay_record_status",
|
|
"/api/replay_test_status",
|
|
"/api/mode",
|
|
"/api/ai_readiness",
|
|
"/api/ai_options",
|
|
"/api/detector_backend",
|
|
"/api/camera_health",
|
|
"/api/camera_sources",
|
|
"/api/person_image",
|
|
"/preview.mjpg",
|
|
}
|
|
|
|
# Serve files from PHOTOS folder by default
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(*args, directory=str(folder), **kwargs)
|
|
|
|
def log_message(self, fmt, *args):
|
|
path = urllib.parse.urlparse(getattr(self, "path", "")).path
|
|
if path.startswith("/static/"):
|
|
return
|
|
if path in self.QUIET_PATHS:
|
|
return
|
|
if path.endswith(".jpg") or path.endswith(".jpeg") or path.endswith(".png") or path.endswith(".zip"):
|
|
return
|
|
status = str(args[1]) if len(args) > 1 else "?"
|
|
print(f"🌐 HTTP {getattr(self, 'command', '?')} {path} -> {status}")
|
|
|
|
def _send_json(self, obj: Dict[str, Any], code: int = 200):
|
|
data = json.dumps(obj, ensure_ascii=False, indent=2).encode("utf-8")
|
|
self.send_response(code)
|
|
self.send_header("Content-Type", "application/json; charset=utf-8")
|
|
self.send_header("Content-Length", str(len(data)))
|
|
self.end_headers()
|
|
self.wfile.write(data)
|
|
|
|
def _send_text(self, text: str, code: int = 200, ctype: str = "text/plain; charset=utf-8"):
|
|
data = text.encode("utf-8")
|
|
self.send_response(code)
|
|
self.send_header("Content-Type", ctype)
|
|
self.send_header("Content-Length", str(len(data)))
|
|
self.end_headers()
|
|
self.wfile.write(data)
|
|
|
|
def do_GET(self):
    """Route a GET request by exact path.

    Handles, in order: the favicon, the two static assets under
    ``/static/``, the ``/api/*`` endpoints, the ``/preview.mjpg`` MJPEG
    stream and the gallery page at ``/``. Any other path falls through to
    ``SimpleHTTPRequestHandler.do_GET`` and is served as a file.

    Every branch writes its own response and returns. Note that many
    state-changing operations (delete, clear, set_*, rename, ...) are
    exposed through GET with query-string parameters.
    """
    parsed = urllib.parse.urlparse(self.path)
    path = parsed.path
    q = urllib.parse.parse_qs(parsed.query)

    # Ignore favicon noise
    if path == "/favicon.ico":
        self.send_response(204)
        self.end_headers()
        return

    # Serve static CSS/JS from Web folder
    if path == "/static/style.css":
        css = _read_bytes_file(STYLE_CSS)
        if css is None:
            self.send_response(404)
            self.end_headers()
            return
        self.send_response(200)
        self.send_header("Content-Type", "text/css; charset=utf-8")
        self.send_header("Content-Length", str(len(css)))
        self.end_headers()
        self.wfile.write(css)
        return

    if path == "/static/gallery.js":
        js = _read_text_file(GALLERY_JS)
        if js is None:
            self.send_response(404)
            self.end_headers()
            return
        data = js.encode("utf-8")
        self.send_response(200)
        self.send_header("Content-Type", "application/javascript; charset=utf-8")
        self.send_header("Content-Length", str(len(data)))
        self.end_headers()
        self.wfile.write(data)
        return

    # API endpoints
    if path == "/api/status":
        items = _list_photos(folder)
        res = {
            "ok": True,
            "photos_dir": str(folder),
            "count": len(items),
            "latest": items[0]["name"] if items else None,
            "url": url,
            "time": time.strftime("%Y-%m-%d %H:%M:%S"),
        }
        self._send_json(res, 200)
        return

    if path == "/api/errors":
        self._send_json({"ok": True, "counters": get_error_counters()})
        return

    if path == "/api/mode":
        # return current mode
        m = _read_mode()
        self._send_json({"ok": True, "mode": m, "policy": _mode_policy(m)})
        return

    if path == "/api/detector_backend":
        b = _read_detector_backend()
        strict_required = _strict_yolo_required()
        mode = _read_mode()
        locked = bool(strict_required and mode == "ai")
        self._send_json(
            {
                "ok": True,
                "backend": b,
                "yolo_runtime": str(config.read_vision_yolo_runtime()),
                "options": ["normal", "yolo"],
                "strict_required": strict_required,
                "locked": locked,
            }
        )
        return

    if path == "/api/ai_readiness":
        data = _read_ai_readiness()
        self._send_json(data, 200 if data.get("ok") else 503)
        return

    if path == "/api/runtime_health":
        self._send_json({"ok": True, "health": _read_runtime_health()})
        return

    if path == "/api/camera_health":
        try:
            self._send_json(direct_camera_client.health(timeout=3.0))
        except Exception as e:
            self._send_json({"ok": False, "error": str(e), "camera": {}}, 503)
        return

    if path == "/api/camera_sources":
        try:
            self._send_json(direct_camera_client.cameras(timeout=3.0))
        except Exception as e:
            self._send_json({"ok": False, "error": str(e), "options": []}, 503)
        return

    if path == "/api/set_camera_source":
        source = q.get("source", [""])[0].strip()
        if not source:
            self._send_json({"ok": False, "error": "missing source"}, 400)
            return
        try:
            self._send_json(direct_camera_client.set_source(source, timeout=8.0))
        except Exception as e:
            self._send_json({"ok": False, "error": str(e)}, 500)
        return

    if path == "/api/set_camera_resolution":
        try:
            width = int((q.get("width") or [""])[0])
            height = int((q.get("height") or [""])[0])
            fps = int((q.get("fps") or [""])[0])
        except Exception:
            self._send_json({"ok": False, "error": "width, height, and fps are required integers"}, 400)
            return
        try:
            self._send_json(direct_camera_client.set_resolution(width, height, fps, timeout=8.0))
        except Exception as e:
            self._send_json({"ok": False, "error": str(e)}, 500)
        return

    if path == "/api/set_preferred_camera":
        serial = q.get("serial", [""])[0].strip()
        try:
            saved = config.write_camera_preferred_realsense_serial(serial)
            self._send_json(direct_camera_client.set_preferred_camera(saved, timeout=8.0))
        except Exception as e:
            self._send_json({"ok": False, "error": str(e)}, 500)
        return

    if path == "/api/mic":
        self._send_json({"ok": True, "options": _read_mic_options()})
        return

    if path == "/api/ai_options":
        self._send_json({"ok": True, "options": _read_ai_options()})
        return

    if path == "/api/autonomous_state":
        self._send_json({"ok": True, "state": _read_autonomous_state()})
        return

    if path == "/api/people":
        try:
            self._send_json({"ok": True, "people": people_registry.list_people()})
        except Exception as e:
            self._send_json({"ok": False, "error": str(e), "people": []}, 500)
        return

    if path == "/api/audio_prompts":
        try:
            self._send_json(
                {
                    "ok": True,
                    "dir": str(config.AUDIO_PROMPTS_DIR),
                    "mode": str(config.read_audio_prompt_mode()),
                    "fallback_to_gemini": bool(config.read_audio_prompts_fallback_to_gemini()),
                    "prompts": audio_prompts.list_audio_prompts(),
                }
            )
        except Exception as e:
            self._send_json({"ok": False, "error": str(e), "prompts": []}, 500)
        return

    if path == "/api/audio_prompt_record_status":
        self._send_json({"ok": True, "status": _read_audio_prompt_record_state()})
        return

    if path == "/api/set_audio_prompt_mode":
        mode = q.get("mode", [""])[0]
        if mode == "":
            self._send_json({"ok": False, "error": "missing mode"}, 400)
            return
        try:
            value = str(config.write_audio_prompt_mode(mode))
            self._send_json({"ok": True, "mode": value})
        except Exception as e:
            self._send_json({"ok": False, "error": str(e)}, 500)
        return

    if path == "/api/set_audio_prompt_fallback":
        enabled = q.get("enabled", [""])[0]
        if enabled == "":
            self._send_json({"ok": False, "error": "missing enabled"}, 400)
            return
        try:
            value = bool(config.write_audio_prompts_fallback_to_gemini(enabled))
            self._send_json({"ok": True, "fallback_to_gemini": value})
        except Exception as e:
            self._send_json({"ok": False, "error": str(e)}, 500)
        return

    if path == "/api/download_audio_prompt":
        key = q.get("key", [""])[0].strip()
        if not key:
            self._send_json({"ok": False, "error": "missing key"}, 400)
            return
        try:
            prompt_path, body = audio_prompts.read_audio_prompt_bytes(key)
            self.send_response(200)
            self.send_header("Content-Type", "audio/wav")
            self.send_header("Content-Length", str(len(body)))
            self.send_header("Content-Disposition", f'attachment; filename="{prompt_path.name}"')
            self.end_headers()
            self.wfile.write(body)
        except FileNotFoundError:
            self._send_json({"ok": False, "error": "audio prompt not found"}, 404)
        except Exception as e:
            self._send_json({"ok": False, "error": str(e)}, 500)
        return

    if path == "/api/delete_audio_prompt":
        key = q.get("key", [""])[0].strip()
        if not key:
            self._send_json({"ok": False, "error": "missing key"}, 400)
            return
        try:
            self._send_json(audio_prompts.delete_audio_prompt(key))
        except KeyError as e:
            self._send_json({"ok": False, "error": str(e)}, 400)
        except Exception as e:
            self._send_json({"ok": False, "error": str(e)}, 500)
        return

    if path == "/api/person_image":
        person_id = q.get("id", [""])[0].strip()
        kind = q.get("kind", ["face"])[0].strip().lower()
        if kind not in ("face", "scene"):
            kind = "face"
        data = people_registry.read_person_image(person_id, kind=kind)
        if data is None:
            self._send_json({"ok": False, "error": "person image not found"}, 404)
            return
        body, ctype, filename = data
        self.send_response(200)
        self.send_header("Content-Type", ctype)
        self.send_header("Content-Length", str(len(body)))
        self.send_header("Content-Disposition", f'inline; filename="{filename}"')
        self.end_headers()
        self.wfile.write(body)
        return

    if path == "/api/delete_person":
        person_id = q.get("id", [""])[0].strip()
        if not person_id:
            self._send_json({"ok": False, "error": "missing id"}, 400)
            return
        try:
            ok = people_registry.delete_person(person_id)
            if not ok:
                self._send_json({"ok": False, "error": "person not found"}, 404)
                return
            self._send_json({"ok": True, "deleted": person_id})
        except Exception as e:
            self._send_json({"ok": False, "error": str(e)}, 500)
        return

    if path == "/api/reset_people":
        try:
            count = int(people_registry.reset_people())
            self._send_json({"ok": True, "deleted": count})
        except Exception as e:
            self._send_json({"ok": False, "error": str(e)}, 500)
        return

    if path == "/api/download_person":
        person_id = q.get("id", [""])[0].strip()
        if not person_id:
            self._send_json({"ok": False, "error": "missing id"}, 400)
            return
        # The id comes straight from the query string. Only a sanitised form is used
        # for the temp-file prefix and the response header, so path separators,
        # quotes and non-ASCII characters cannot break either of them.
        safe_id = "".join(
            ch if (ch.isascii() and (ch.isalnum() or ch in "-_.")) else "_"
            for ch in person_id
        )
        tmp_path = None
        archive_path = None
        try:
            with tempfile.NamedTemporaryFile(prefix=f"{safe_id}_", suffix=".zip", delete=False) as tmp:
                tmp_path = Path(tmp.name)
            archive_path = people_registry.export_person_zip(person_id, tmp_path)
            if archive_path is None or not archive_path.exists():
                self._send_json({"ok": False, "error": "person not found"}, 404)
                return
            body = archive_path.read_bytes()
            self.send_response(200)
            self.send_header("Content-Type", "application/zip")
            self.send_header("Content-Length", str(len(body)))
            self.send_header("Content-Disposition", f'attachment; filename="{safe_id}.zip"')
            self.end_headers()
            self.wfile.write(body)
        except Exception as e:
            self._send_json({"ok": False, "error": str(e)}, 500)
        finally:
            # Always remove the temporary archive, including when export or send failed.
            for leftover in (tmp_path, archive_path):
                if leftover is None:
                    continue
                try:
                    leftover.unlink()
                except OSError:
                    pass
        return

    if path == "/api/set_mode":
        # set mode via ?mode=ai|manual
        m = q.get("mode", [""])[0]
        if m == "command":
            self._send_json(
                {
                    "ok": False,
                    "error": "command mode was moved to G1_Lootah/AI_Command",
                },
                410,
            )
            return
        if m not in ("ai", "manual"):
            self._send_json({"ok": False, "error": "invalid mode"}, 400)
            return
        try:
            m = config.write_runtime_mode(m)
            self._send_json({"ok": True, "mode": m, "policy": _mode_policy(m)})
        except Exception as e:
            self._send_json({"ok": False, "error": str(e)}, 500)
        return

    if path == "/api/set_ai_options":
        try:
            hard_v = q.get("hard_target_lock_enabled", [None])[0]
            retake_v = q.get("retake_prompt_enabled", [None])[0]
            greet_enabled_v = q.get("autonomous_greeting_replay_enabled", [None])[0]
            greet_file_v = q.get("autonomous_greeting_replay_file", [None])[0]
            capture_enabled_v = q.get("autonomous_capture_replay_enabled", [None])[0]
            face_recognition_v = q.get("face_recognition_enabled", [None])[0]
            face_threshold_v = q.get("face_recognition_threshold", [None])[0]
            out = _read_ai_options()
            # Only options present in the query string are written.
            if hard_v is not None:
                out["hard_target_lock_enabled"] = bool(
                    config.write_vision_hard_target_lock_enabled(hard_v)
                )
            if retake_v is not None:
                out["retake_prompt_enabled"] = bool(
                    config.write_vision_retake_prompt_enabled(retake_v)
                )
            if greet_enabled_v is not None:
                out["autonomous_greeting_replay_enabled"] = bool(
                    config.write_vision_autonomous_greeting_replay_enabled(greet_enabled_v)
                )
            if greet_file_v is not None:
                out["autonomous_greeting_replay_file"] = str(
                    config.write_vision_autonomous_greeting_replay_file(greet_file_v)
                )
            if capture_enabled_v is not None:
                out["autonomous_capture_replay_enabled"] = bool(
                    config.write_vision_autonomous_capture_replay_enabled(capture_enabled_v)
                )
            if face_recognition_v is not None:
                out["face_recognition_enabled"] = bool(
                    config.write_vision_face_recognition_enabled(face_recognition_v)
                )
            if face_threshold_v is not None:
                out["face_recognition_threshold"] = float(
                    config.write_vision_face_recognition_threshold(face_threshold_v)
                )
            out["active_replay"] = _read_active_replay_name()
            self._send_json({"ok": True, "options": out})
        except Exception as e:
            self._send_json({"ok": False, "error": str(e)}, 500)
        return

    if path == "/api/set_mic":
        try:
            enabled_v = q.get("enabled", [None])[0]
            if enabled_v is None:
                self._send_json({"ok": False, "error": "missing enabled"}, 400)
                return
            enabled = bool(config.write_gemini_mic_enabled(enabled_v))
            self._send_json({"ok": True, "options": {"mic_enabled": enabled}})
        except Exception as e:
            self._send_json({"ok": False, "error": str(e)}, 500)
        return

    if path == "/api/mode_policy":
        m = _read_mode()
        self._send_json({"ok": True, "mode": m, "policy": _mode_policy(m)})
        return

    if path == "/api/set_detector_backend":
        b = q.get("backend", [""])[0]
        if b not in ("normal", "yolo"):
            self._send_json({"ok": False, "error": "invalid backend"}, 400)
            return
        try:
            mode = _read_mode()
            strict_required = _strict_yolo_required()
            if strict_required and mode == "ai" and b != "yolo":
                self._send_json(
                    {
                        "ok": False,
                        "error": "Strict AI profile requires YOLO backend while mode=ai.",
                        "backend": _read_detector_backend(),
                        "strict_required": True,
                    },
                    409,
                )
                return
            b = config.write_vision_detector_backend(b)
            self._send_json(
                {
                    "ok": True,
                    "backend": b,
                    "yolo_runtime": str(config.read_vision_yolo_runtime()),
                    "options": ["normal", "yolo"],
                    "strict_required": strict_required,
                    "locked": bool(strict_required and mode == "ai"),
                }
            )
        except Exception as e:
            self._send_json({"ok": False, "error": str(e)}, 500)
        return

    if path == "/api/command_info":
        self._send_json(
            {
                "ok": False,
                "error": "command mode moved",
                "moved_to": "G1_Lootah/AI_Command",
            },
            410,
        )
        return

    if path == "/api/test":
        res = _realsense_test()
        self._send_json(res, 200 if res.get("ok") else 500)
        return

    if path == "/api/fix":
        res = _run_fix_script()
        self._send_json(res, 200 if res.get("ok") else 500)
        return

    if path == "/api/run_scripts_fix":
        # Run the Scripts/fix_realsense_usb.sh with optional mode ?mode=check|fix
        # NOTE(review): the query value is passed to the script verbatim and the
        # default is '--fix', not 'fix' -- confirm which spellings the script
        # accepts and consider whitelisting them.
        mode = q.get('mode', ['--fix'])[0] or '--fix'
        try:
            scripts_sh = _resolve_fix_script()
            if not scripts_sh:
                self._send_json(
                    {"ok": False, "error": "script not found", "checked": [str(p) for p in _fix_script_candidates()]},
                    404,
                )
                return
            # run non-interactive
            p = subprocess.run(["bash", str(scripts_sh), mode], capture_output=True, text=True, timeout=60)
            out = (p.stdout or "").strip().splitlines()
            err = (p.stderr or "").strip().splitlines()
            ok = p.returncode == 0
            # Only the last 60 lines of each stream are returned.
            resp = {"ok": ok, "rc": p.returncode, "stdout": out[-60:], "stderr": err[-60:]}
            self._send_json(resp, 200 if ok else 500)
        except Exception as e:
            self._send_json({"ok": False, "error": str(e)}, 500)
        return

    if path == "/api/capture":
        # Take photo NOW (no remote)
        result = capture_func()
        ok = not result.startswith("[ERR]")
        self._send_json({"ok": ok, "result": result}, 200 if ok else 500)
        return

    if path == "/api/sessions":
        items = _list_photos(folder)
        sessions = _group_sessions(items)
        self._send_json({"ok": True, "sessions": sessions})
        return

    if path == "/api/photos":
        items = _list_photos(folder)
        self._send_json({"ok": True, "photos": items})
        return

    # Request a photo from autonomous mover or external monitor
    if path == "/api/request_photo":
        try:
            # touch a request file for external processes to pick up
            reqf = SCRIPTS_DIR / "request_photo.flag"
            reqf.write_text(str(time.time()))
            self._send_json({"ok": True, "requested": True})
        except Exception as e:
            self._send_json({"ok": False, "error": str(e)}, 500)
        return

    # Clear all photos (DELETE) - destructive
    if path == "/api/clear_photos":
        try:
            for p in folder.iterdir():
                if _is_photo_file(p):
                    try:
                        p.unlink()
                    except Exception:
                        pass
            self._send_json({"ok": True, "cleared": True})
        except Exception as e:
            self._send_json({"ok": False, "error": str(e)}, 500)
        return

    # Create a ZIP archive of all photos and return path
    if path == "/api/download_zip":
        try:
            import zipfile

            zip_base = folder / "_archives"
            zip_base.mkdir(parents=True, exist_ok=True)
            ts = int(time.time())
            archive_file = zip_base / f"photos_{ts}.zip"
            with zipfile.ZipFile(archive_file, "w", compression=zipfile.ZIP_DEFLATED) as zf:
                for item in sorted(folder.rglob("*")):
                    # The archive lives inside the folder being archived, so skip
                    # _archives: otherwise every new archive would contain all
                    # previous ones (and the file currently being written).
                    if zip_base in item.parents or not item.is_file():
                        continue
                    zf.write(item, item.relative_to(folder).as_posix())
            # serve relative URL
            rel = os.path.relpath(archive_file, start=folder)
            self._send_json({"ok": True, "archive": rel})
        except Exception as e:
            self._send_json({"ok": False, "error": str(e)}, 500)
        return

    # Trigger uploader now by touching a flag file
    if path == "/api/upload_now":
        try:
            upf = SCRIPTS_DIR / "upload_now.flag"
            upf.write_text(str(time.time()))
            self._send_json({"ok": True, "upload_now": True})
        except Exception as e:
            self._send_json({"ok": False, "error": str(e)}, 500)
        return

    if path == "/api/delete":
        name = _safe_name(q.get("name", [""])[0])
        if not name:
            self._send_json({"ok": False, "error": "missing name"}, 400)
            return
        # Resolve both sides so the containment check also works when the
        # photos folder is relative or reached through a symlink.
        photos_root = folder.resolve()
        target = (photos_root / name).resolve()

        # ensure inside folder
        try:
            target.relative_to(photos_root)
        except Exception:
            self._send_json({"ok": False, "error": "invalid path"}, 400)
            return

        if not target.exists():
            self._send_json({"ok": False, "error": "not found"}, 404)
            return

        try:
            target.unlink()
            self._send_json({"ok": True, "deleted": name}, 200)
        except Exception as e:
            self._send_json({"ok": False, "error": str(e)}, 500)
        return

    if path == "/api/retake":
        # Trigger a capture and return result. Optionally accept session_id to associate.
        session_id = q.get("session_id", [None])[0]
        result = capture_func()
        ok = not result.startswith("[ERR]")
        self._send_json({"ok": ok, "result": result, "session_id": session_id}, 200 if ok else 500)
        return

    if path == "/api/reupload":
        # Allow manual reupload: clears uploaded flag in upload DB so the uploader daemon retries
        name = _safe_name(q.get("name", [""])[0])
        if not name:
            self._send_json({"ok": False, "error": "missing name"}, 400)
            return
        dbp = Path(config.UPLOAD_DB)
        if dbp.exists():
            try:
                d = json.loads(dbp.read_text())
            except Exception:
                d = {}
        else:
            d = {}
        # A DB file holding valid JSON that is not an object (or an entry that is
        # not an object) is treated like an empty one instead of crashing.
        if not isinstance(d, dict):
            d = {}
        # clear uploaded marker
        entry = d.get(name, {})
        if not isinstance(entry, dict):
            entry = {}
        entry["uploaded"] = False
        d[name] = entry
        try:
            dbp.write_text(json.dumps(d, indent=2))
            self._send_json({"ok": True, "requeued": name})
        except Exception as e:
            self._send_json({"ok": False, "error": str(e)}, 500)
        return

    # Replay management endpoints
    if path == "/api/replays":
        try:
            items = _list_replays()
            self._send_json({"ok": True, "replays": [it["name"] for it in items], "items": items})
        except Exception as e:
            self._send_json({"ok": False, "error": str(e)}, 500)
        return

    if path == "/api/replay_record_status":
        self._send_json({"ok": True, "status": _read_recorder_state()})
        return

    if path == "/api/replay_test_status":
        self._send_json({"ok": True, "status": _read_replay_test_state()})
        return

    if path == "/api/replay_record_start":
        try:
            if _read_mode() != "manual":
                self._send_json({"ok": False, "error": "replay recording is allowed only in manual mode"}, 409)
                return
            name = _normalize_replay_name(q.get("name", [""])[0])
            seconds = float(q.get("seconds", ["15"])[0])
            if seconds <= 0:
                self._send_json({"ok": False, "error": "seconds must be positive"}, 400)
                return
            cur = _read_recorder_state()
            if cur.get("running"):
                self._send_json({"ok": False, "error": "recorder already running", "status": cur}, 409)
                return
            preview_path = _resolve_replay_path(name)
            preview_path.parent.mkdir(parents=True, exist_ok=True)
            if preview_path.exists():
                self._send_json({"ok": False, "error": "replay already exists"}, 409)
                return
            preview_output = str(preview_path)
            # Mark the recorder as running before the worker thread starts.
            _set_recorder_state(
                running=True,
                ok=False,
                error="",
                name=name,
                seconds=float(seconds),
                output=preview_output,
                returncode=None,
                started_at=time.time(),
                finished_at=0.0,
                stdout_tail=[],
            )
            threading.Thread(target=_run_replay_recorder, args=(name, seconds), daemon=True).start()
            self._send_json({"ok": True, "status": _read_recorder_state()})
        except ValueError as e:
            self._send_json({"ok": False, "error": str(e)}, 400)
        except Exception as e:
            self._send_json({"ok": False, "error": str(e)}, 500)
        return

    if path == "/api/test_replay":
        try:
            if _read_mode() != "manual":
                self._send_json({"ok": False, "error": "replay test is allowed only in manual mode"}, 409)
                return
            if replay_test_func is None:
                self._send_json({"ok": False, "error": "replay test is unavailable in this runtime"}, 503)
                return
            name = _normalize_replay_name(q.get("name", [""])[0])
            candidate = _resolve_replay_path(name)
            if not candidate.exists():
                self._send_json({"ok": False, "error": "replay not found"}, 404)
                return
            cur = _read_replay_test_state()
            if cur.get("running"):
                self._send_json({"ok": False, "error": "replay test already running", "status": cur}, 409)
                return
            _set_replay_test_state(
                running=True,
                ok=False,
                error="",
                name=name,
                result="",
                started_at=time.time(),
                finished_at=0.0,
            )
            threading.Thread(target=_run_replay_test, args=(name, replay_test_func), daemon=True).start()
            self._send_json({"ok": True, "status": _read_replay_test_state()})
        except ValueError as e:
            self._send_json({"ok": False, "error": str(e)}, 400)
        except Exception as e:
            self._send_json({"ok": False, "error": str(e)}, 500)
        return

    if path == "/api/delete_replay":
        try:
            name = q.get("name", [""])[0]
            candidate = _resolve_replay_path(name)
            if not candidate.exists():
                self._send_json({"ok": False, "error": "replay not found"}, 404)
                return
            active_name = _read_active_replay_name()
            rel = str(candidate.resolve().relative_to(config.DATA_DIR)).replace("\\", "/")
            if rel == active_name:
                self._send_json({"ok": False, "error": "cannot delete the active replay"}, 409)
                return
            if _is_greeting_replay(rel):
                self._send_json({"ok": False, "error": "cannot delete the configured AI greeting replay"}, 409)
                return
            candidate.unlink()
            self._send_json({"ok": True, "deleted": rel})
        except ValueError as e:
            self._send_json({"ok": False, "error": str(e)}, 400)
        except Exception as e:
            self._send_json({"ok": False, "error": str(e)}, 500)
        return

    if path == "/api/rename_replay":
        try:
            old_name = q.get("old", [""])[0]
            new_name = q.get("new", [""])[0]
            old_candidate = _resolve_replay_path(old_name)
            if not old_candidate.exists():
                self._send_json({"ok": False, "error": "replay not found"}, 404)
                return
            old_rel = str(old_candidate.resolve().relative_to(config.DATA_DIR)).replace("\\", "/")
            old_parent = Path(old_rel).parent
            # The old replay's sub-directory, if any, is passed as the default parent for the new name.
            default_parent = None if str(old_parent) == "." else old_parent.as_posix()
            new_rel = _normalize_replay_name(new_name, default_parent=default_parent)
            new_candidate = _resolve_replay_path(new_rel)
            if new_candidate.exists():
                self._send_json({"ok": False, "error": "target replay already exists"}, 409)
                return
            new_candidate.parent.mkdir(parents=True, exist_ok=True)
            old_candidate.rename(new_candidate)
            _update_replay_references_after_rename(old_rel, new_rel, new_candidate)
            self._send_json({"ok": True, "old": old_rel, "new": new_rel})
        except ValueError as e:
            self._send_json({"ok": False, "error": str(e)}, 400)
        except Exception as e:
            self._send_json({"ok": False, "error": str(e)}, 500)
        return

    if path == "/api/download_replay":
        try:
            name = q.get("name", [""])[0]
            candidate = _resolve_replay_path(name)
            if not candidate.exists():
                self._send_json({"ok": False, "error": "replay not found"}, 404)
                return
            data = candidate.read_bytes()
            self.send_response(200)
            self.send_header("Content-Type", "application/json; charset=utf-8")
            self.send_header("Content-Length", str(len(data)))
            self.send_header("Content-Disposition", f'attachment; filename="{candidate.name}"')
            self.end_headers()
            self.wfile.write(data)
        except ValueError as e:
            self._send_json({"ok": False, "error": str(e)}, 400)
        except Exception as e:
            self._send_json({"ok": False, "error": str(e)}, 500)
        return

    if path == "/api/scripts":
        try:
            scripts_dir = DATA_SCRIPTS_DIR
            scripts_dir.mkdir(parents=True, exist_ok=True)
            files = [
                p.name
                for p in sorted(scripts_dir.iterdir())
                if p.is_file() and p.suffix.lower() in (".txt", ".json", ".jsonl")
            ]
            self._send_json({"ok": True, "scripts": files})
        except Exception as e:
            self._send_json({"ok": False, "error": str(e)}, 500)
        return

    if path == "/api/script_content":
        # return raw text of a script by name (in Scripts/)
        name = q.get('name', [None])[0]
        if not name:
            self._send_json({"ok": False, "error": "missing name"}, 400)
            return
        try:
            # Resolve the base directory as well, so the containment check
            # compares like with like.
            scripts_dir = DATA_SCRIPTS_DIR.resolve()
            candidate = (scripts_dir / _safe_name(name)).resolve()
            try:
                candidate.relative_to(scripts_dir)
            except Exception:
                self._send_json({"ok": False, "error": "invalid path"}, 400)
                return
            if not candidate.exists():
                self._send_json({"ok": False, "error": "not found"}, 404)
                return
            txt = candidate.read_text(encoding='utf-8')
            self._send_json({"ok": True, "script": txt})
        except Exception as e:
            self._send_json({"ok": False, "error": str(e)}, 500)
        return

    if path == "/api/get_replay":
        try:
            cur = _read_active_replay_name()
            self._send_json({"ok": True, "replay": cur})
        except Exception as e:
            self._send_json({"ok": False, "error": str(e)}, 500)
        return

    if path == "/api/sanad_script":
        try:
            txt = _read_text_file(config.SANAD_SCRIPT_FILE) or ""
            self._send_json({"ok": True, "script": txt})
        except Exception as e:
            self._send_json({"ok": False, "error": str(e)}, 500)
        return

    if path == "/api/set_replay":
        # set via ?name=filename.jsonl
        name = q.get("name", [""])[0]
        if not name:
            self._send_json({"ok": False, "error": "missing name"}, 400)
            return
        try:
            candidate = _resolve_replay_path(name)
        except ValueError as e:
            self._send_json({"ok": False, "error": str(e)}, 400)
            return
        if not candidate.exists():
            self._send_json({"ok": False, "error": "replay not found"}, 404)
            return
        integrity = replay_file_integrity(candidate)
        if not integrity.get("ok"):
            self._send_json(
                {"ok": False, "error": "invalid replay file", "integrity": integrity},
                400,
            )
            return
        try:
            rel_name = str(candidate.resolve().relative_to(config.DATA_DIR)).replace("\\", "/")
            config.write_selected_replay_name(rel_name)
            config.REPLAY_FILE = candidate
            resp = {"ok": True, "set": rel_name}
            if not integrity.get("trigger_ok", False):
                resp["warning"] = "replay has no trigger markers; timed capture fallback will be used"
            resp["integrity"] = integrity
            self._send_json(resp)
        except Exception as e:
            self._send_json({"ok": False, "error": str(e)}, 500)
        return

    # MJPEG preview stream
    if path == "/preview.mjpg":
        self.send_response(200)
        # The declared boundary is "frame"; each part is introduced by "--frame"
        # ("--" + boundary, as RFC 2046 requires).
        self.send_header("Content-Type", "multipart/x-mixed-replace; boundary=frame")
        self.end_headers()
        try:
            # Loops until writing to the client (or anything else) raises.
            while True:
                frame = _get_preview_frame()
                if frame is None:
                    time.sleep(0.1)
                    continue
                cv2 = _get_cv2()
                ret, jpg = cv2.imencode('.jpg', frame, [int(cv2.IMWRITE_JPEG_QUALITY), 65])
                if not ret:
                    time.sleep(0.05)
                    continue
                data = jpg.tobytes()
                self.wfile.write(b"--frame\r\n")
                self.wfile.write(b"Content-Type: image/jpeg\r\n")
                self.wfile.write(f"Content-Length: {len(data)}\r\n\r\n".encode('utf-8'))
                self.wfile.write(data)
                self.wfile.write(b"\r\n")
                self.wfile.flush()
                # small throttle
                time.sleep(0.08)
        except Exception as e:
            record_error(
                "photo_server",
                "preview_stream",
                e,
                {"client": self.client_address[0] if self.client_address else "unknown"},
            )
        return

    # Gallery page: serve gallery.html (client JS renders cards)
    if path == "/":
        tpl = _read_text_file(GALLERY_HTML)
        if tpl is None:
            self.send_response(404)
            self.end_headers()
            return
        html = tpl.replace("{{PHOTOS_DIR}}", str(folder))
        # leave PHOTO_CARDS and MODE_TOGGLE for client-side JS to populate
        self._send_text(html, 200, "text/html; charset=utf-8")
        return

    # Otherwise: serve static files from photos/ (download/open)
    return super().do_GET()
|
|
|
|
def do_POST(self):
    """Dispatch POST API requests by URL path.

    Routes handled:
      /api/sanad_script         overwrite the active SANAD script text
      /api/audio_prompt_record  start an audio prompt recording in a thread
      /api/upload_person        import an uploaded person photo
      /api/upload_audio_prompt  store an uploaded audio prompt file
      /api/upload_replay        store an uploaded replay file and validate it
      /api/upload_script        save a named script under DATA_SCRIPTS_DIR
      /api/delete_script        delete a named script under DATA_SCRIPTS_DIR
      /api/set_sanad_script     copy a saved script into the active script file

    Every route answers with JSON through ``self._send_json(obj, status)``.
    Any other path receives a bare 404 response.
    """
    parsed = urllib.parse.urlparse(self.path)
    path = parsed.path

    def read_body():
        """Return the request body as bytes, as sized by Content-Length.

        Raises ValueError when the header is not an integer.
        """
        length = int(self.headers.get("Content-Length", "0"))
        # rfile.read() with a negative size reads until EOF, which blocks
        # this handler thread for as long as the client keeps the socket
        # open. Treat any non-positive length as an empty body.
        return self.rfile.read(length) if length > 0 else b""

    def parse_multipart():
        """Parse the request as form data using cgi.FieldStorage."""
        # NOTE(review): the cgi module is deprecated since Python 3.11 and
        # removed in 3.13 (PEP 594); this needs a replacement before
        # upgrading the interpreter.
        return cgi.FieldStorage(
            fp=self.rfile,
            headers=self.headers,
            environ={
                "REQUEST_METHOD": "POST",
                "CONTENT_TYPE": self.headers.get("Content-Type", ""),
            },
        )

    # ---- Overwrite the active SANAD script --------------------------------
    if path == "/api/sanad_script":
        try:
            body = read_body()
            # Accept JSON {"script": "..."}; anything that does not parse
            # as such is treated as the raw script text.
            try:
                payload = json.loads(body.decode("utf-8"))
                script = payload.get("script", "")
            except Exception:
                script = body.decode("utf-8")
            config.SANAD_SCRIPT_FILE.write_text(script, encoding="utf-8")
            self._send_json({"ok": True, "written": True})
        except Exception as e:
            self._send_json({"ok": False, "error": str(e)}, 500)
        return

    # ---- Start an audio prompt recording ----------------------------------
    if path == "/api/audio_prompt_record":
        try:
            body = read_body()
            payload = json.loads(body.decode("utf-8")) if body else {}
            if not isinstance(payload, dict):
                self._send_json({"ok": False, "error": "invalid payload"}, 400)
                return
            key = str(payload.get("key", "") or "").strip()
            text = str(payload.get("text", "") or "").strip()
            filename = str(payload.get("filename", "") or "").strip()
            if not key:
                self._send_json({"ok": False, "error": "missing key"}, 400)
                return
            if not text:
                self._send_json({"ok": False, "error": "missing text"}, 400)
                return
            cur = _read_audio_prompt_record_state()
            if cur.get("running"):
                self._send_json(
                    {"ok": False, "error": "audio prompt recorder already running", "status": cur},
                    409,
                )
                return
            # Fall back to the prompt's default filename when none was given.
            target_filename = filename or audio_prompts.prompt_filename(key)
            # Publish the "running" state before the worker starts so the
            # response below already reflects it.
            _set_audio_prompt_record_state(
                running=True,
                key=key,
                filename=target_filename,
                text=text,
                ok=False,
                error="",
                started_at=time.time(),
                finished_at=0.0,
                result={},
            )
            threading.Thread(
                target=_run_audio_prompt_record,
                args=(key, text, target_filename),
                daemon=True,
            ).start()
            self._send_json({"ok": True, "status": _read_audio_prompt_record_state()})
        except (ValueError, KeyError) as e:
            # Includes malformed JSON / undecodable bytes / bad Content-Length.
            self._send_json({"ok": False, "error": str(e)}, 400)
        except Exception as e:
            self._send_json({"ok": False, "error": str(e)}, 500)
        return

    # ---- Upload a person photo --------------------------------------------
    if path == "/api/upload_person":
        try:
            form = parse_multipart()
            if "file" not in form:
                self._send_json({"ok": False, "error": "missing file"}, 400)
                return
            file_item = form["file"]
            if not getattr(file_item, "file", None):
                self._send_json({"ok": False, "error": "missing file payload"}, 400)
                return
            data = file_item.file.read()
            if not data:
                self._send_json({"ok": False, "error": "empty upload"}, 400)
                return
            person_id = str(form.getfirst("person_id", "") or "").strip()
            filename = str(getattr(file_item, "filename", "") or "")
            result = people_registry.import_person_photo(data, filename=filename, person_id=person_id)
            ok = bool(result.get("ok", False))
            self._send_json(result, 200 if ok else 400)
        except Exception as e:
            self._send_json({"ok": False, "error": str(e)}, 500)
        return

    # ---- Upload an audio prompt file --------------------------------------
    if path == "/api/upload_audio_prompt":
        try:
            form = parse_multipart()
            key = str(form.getfirst("key", "") or "").strip()
            if not key:
                self._send_json({"ok": False, "error": "missing key"}, 400)
                return
            if "file" not in form:
                self._send_json({"ok": False, "error": "missing file"}, 400)
                return
            file_item = form["file"]
            if not getattr(file_item, "file", None):
                self._send_json({"ok": False, "error": "missing file payload"}, 400)
                return
            data = file_item.file.read()
            if not data:
                self._send_json({"ok": False, "error": "empty upload"}, 400)
                return
            # An explicit "filename" form field wins over the uploaded name.
            filename = str(form.getfirst("filename", "") or getattr(file_item, "filename", "") or "")
            result = audio_prompts.save_audio_prompt(key, data, filename)
            self._send_json(result, 200 if bool(result.get("ok")) else 400)
        except (KeyError, ValueError) as e:
            self._send_json({"ok": False, "error": str(e)}, 400)
        except Exception as e:
            self._send_json({"ok": False, "error": str(e)}, 500)
        return

    # ---- Upload a replay file ---------------------------------------------
    if path == "/api/upload_replay":
        try:
            form = parse_multipart()
            if "file" not in form:
                self._send_json({"ok": False, "error": "missing file"}, 400)
                return
            file_item = form["file"]
            if not getattr(file_item, "file", None):
                self._send_json({"ok": False, "error": "missing file payload"}, 400)
                return
            raw_name = form.getfirst("name", "") or getattr(file_item, "filename", "") or "uploaded_replay.jsonl"
            rel_name = _normalize_replay_name(raw_name)
            candidate = _resolve_replay_path(rel_name)
            if candidate.exists():
                # Never overwrite an existing replay.
                self._send_json({"ok": False, "error": "replay already exists"}, 409)
                return
            data = file_item.file.read()
            if not data:
                self._send_json({"ok": False, "error": "empty upload"}, 400)
                return
            # Create directories only once the upload is known to be
            # non-empty, so rejected requests leave nothing behind.
            candidate.parent.mkdir(parents=True, exist_ok=True)
            candidate.write_bytes(data)
            integrity = replay_file_integrity(candidate)
            if not integrity.get("ok"):
                # Best-effort removal of the rejected file; the client gets
                # the integrity report whether or not the cleanup succeeds.
                try:
                    candidate.unlink()
                except Exception:
                    pass
                self._send_json({"ok": False, "error": "invalid replay file", "integrity": integrity}, 400)
                return
            self._send_json({"ok": True, "saved": rel_name, "integrity": integrity})
        except ValueError as e:
            self._send_json({"ok": False, "error": str(e)}, 400)
        except Exception as e:
            self._send_json({"ok": False, "error": str(e)}, 500)
        return

    # ---- Save a named SANAD script ----------------------------------------
    if path == "/api/upload_script":
        try:
            # Accept JSON {"filename": "name.txt", "script": "..."} or raw
            # text with ?filename=... in the query string.
            body = read_body()
            try:
                payload = json.loads(body.decode("utf-8"))
                filename = payload.get("filename")
                script = payload.get("script", "")
            except Exception:
                filename = urllib.parse.parse_qs(parsed.query).get("filename", [None])[0]
                script = body.decode("utf-8")
            if not filename:
                self._send_json({"ok": False, "error": "missing filename"}, 400)
                return
            # Resolve the base directory too: the containment check below
            # compares against a resolved target, and would reject every
            # name if the base were relative or reached through a symlink.
            scripts_dir = DATA_SCRIPTS_DIR.resolve()
            scripts_dir.mkdir(parents=True, exist_ok=True)
            target = (scripts_dir / _safe_name(filename)).resolve()
            try:
                target.relative_to(scripts_dir)
            except ValueError:
                self._send_json({"ok": False, "error": "invalid filename"}, 400)
                return
            target.write_text(script, encoding="utf-8")
            self._send_json({"ok": True, "saved": target.name})
        except Exception as e:
            self._send_json({"ok": False, "error": str(e)}, 500)
        return

    # ---- Delete a named SANAD script --------------------------------------
    if path == "/api/delete_script":
        try:
            body = read_body()
            try:
                payload = json.loads(body.decode("utf-8"))
                name = payload.get("name")
            except Exception:
                # Allow the raw name as the whole body.
                name = body.decode("utf-8").strip()
            if not name:
                self._send_json({"ok": False, "error": "missing name"}, 400)
                return
            scripts_dir = DATA_SCRIPTS_DIR.resolve()
            target = (scripts_dir / _safe_name(name)).resolve()
            try:
                target.relative_to(scripts_dir)
            except ValueError:
                self._send_json({"ok": False, "error": "invalid path"}, 400)
                return
            if not target.exists():
                self._send_json({"ok": False, "error": "not found"}, 404)
                return
            target.unlink()
            self._send_json({"ok": True, "deleted": name})
        except Exception as e:
            self._send_json({"ok": False, "error": str(e)}, 500)
        return

    # ---- Activate a saved SANAD script ------------------------------------
    if path == "/api/set_sanad_script":
        # Accept ?name=filename or JSON {"name": "..."}.
        try:
            name = urllib.parse.parse_qs(parsed.query).get("name", [None])[0]
            if not name:
                body = read_body()
                try:
                    payload = json.loads(body.decode("utf-8"))
                    name = payload.get("name")
                except Exception:
                    name = None
            if not name:
                self._send_json({"ok": False, "error": "missing name"}, 400)
                return
            scripts_dir = DATA_SCRIPTS_DIR.resolve()
            candidate = (scripts_dir / _safe_name(name)).resolve()
            try:
                candidate.relative_to(scripts_dir)
            except ValueError:
                self._send_json({"ok": False, "error": "invalid script path"}, 400)
                return
            if not candidate.exists():
                self._send_json({"ok": False, "error": "script not found"}, 404)
                return
            # Copy the saved script's contents into SANAD_SCRIPT_FILE.
            txt = candidate.read_text(encoding="utf-8")
            config.SANAD_SCRIPT_FILE.write_text(txt, encoding="utf-8")
            self._send_json({"ok": True, "set": candidate.name})
        except Exception as e:
            self._send_json({"ok": False, "error": str(e)}, 500)
        return

    # Fallback: unknown POST route.
    self.send_response(404)
    self.end_headers()
|
|
|
|
try:
|
|
ThreadingHTTPServer.allow_reuse_address = True
|
|
httpd = ThreadingHTTPServer((bind, port), Handler)
|
|
except OSError as e:
|
|
_log(lg, f"❌ Photo server failed: {e}", "error")
|
|
# Assume another server already running
|
|
_log(lg, f"🖼️ Phone gallery (existing): {url}", "info")
|
|
return url
|
|
|
|
def _serve():
    """Run the HTTP server loop; used as the background thread target."""
    try:
        httpd.serve_forever()
    except Exception as e:
        # Keep the thread from raising, but report why the loop stopped
        # instead of discarding the error silently.
        _log(lg, f"❌ Photo server stopped: {e}", "error")
|
|
|
|
t = threading.Thread(target=_serve, daemon=True)
|
|
t.start()
|
|
|
|
|
|
_log(lg, f"🖼️ Photo server serving {folder} on {url}", "info")
|
|
return url
|
|
|
|
|
|
# ============================================================
|
|
# CLI mode (optional)
|
|
# ============================================================
|
|
if __name__ == "__main__":
    # Standalone launch: the photos directory and port come from the
    # environment, falling back to the module defaults.
    photos_env = os.getenv("PHOTOS_DIR", str(DEFAULT_PHOTOS_DIR))
    photos = Path(photos_env).resolve()
    port_env = os.getenv("PHOTO_SERVER_PORT", "8080")
    port = int(port_env)

    start_photo_server(photos, port=port)
    print("Server running. Press Ctrl+C to stop.")

    # The server runs on a daemon thread, so park the main thread here
    # until the user interrupts the process.
    try:
        while True:
            time.sleep(3600)
    except KeyboardInterrupt:
        pass
|