#!/usr/bin/env python3 """ photo_server.py --------------- HTTP photo gallery server for /home/unitree/photographer/photos ✅ Features: - Phone/Laptop gallery UI: GET / - Serves photos directly (open/download): GET / - Delete photo: GET /api/delete?name= - Capture photo now (without remote): GET /api/capture - Camera status: GET /api/status - Camera test (RealSense enumerate via teleimager python): GET /api/test - Run RealSense USB fix script: GET /api/fix - CSS/HTML split: Scripts/gallery.html Scripts/style.css Served as: / -> gallery.html template (photo list injected) /static/style.css -> style.css file ⚠️ Security note: - This is an open LAN server (anyone on same WiFi can view/download/delete). Use only on trusted networks. Works well with your structure (no new folders). """ from __future__ import annotations import json import os import socket import threading import time import urllib.parse import subprocess from pathlib import Path from http.server import SimpleHTTPRequestHandler, ThreadingHTTPServer from typing import Optional, Callable, Dict, Any, List try: from Logger import Logs except Exception: Logs = None # type: ignore # ============================================================ # Defaults / Paths # ============================================================ BASE_DIR = Path(__file__).resolve().parent DEFAULT_PHOTOS_DIR = (BASE_DIR / "photos").resolve() SCRIPTS_DIR = (BASE_DIR / "Scripts").resolve() GALLERY_HTML = SCRIPTS_DIR / "gallery.html" STYLE_CSS = SCRIPTS_DIR / "style.css" FIX_SCRIPT = (BASE_DIR / "fix_realsense_usb.sh").resolve() # Teleimager python (Py3.10) + capture script TELE_PY = os.environ.get("TELEIMAGER_PY", "/home/unitree/miniconda3/envs/teleimager/bin/python") TAKE_PHOTO_PY = str((BASE_DIR / "take_photo.py").resolve()) # ============================================================ # Logging helpers # ============================================================ def _make_logger(log_name: str = "photo_server.log"): if Logs is None: return None lg = Logs() try: lg.LogEngine("G1_Logs", log_name) except Exception: pass return lg def _log(lg, msg: str, level: str = "info"): # level: info / warning / error if lg is not None: try: lg.print_and_log(msg, message_type=level) return except Exception: pass print(msg) # ============================================================ # Utilities # ============================================================ def _get_local_ip() -> str: """ Best-effort IP that phones can reach (same WiFi). """ try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("8.8.8.8", 80)) ip = s.getsockname()[0] s.close() return ip except Exception: return "127.0.0.1" def _safe_name(name: str) -> str: """ Prevent path traversal: keep only filename (no dirs). """ name = name.strip().replace("\\", "/") name = name.split("/")[-1] return name def _is_photo_file(p: Path) -> bool: if not p.is_file(): return False ext = p.suffix.lower() return ext in (".jpg", ".jpeg", ".png", ".webp") def _list_photos(folder: Path) -> List[Dict[str, Any]]: items: List[Dict[str, Any]] = [] folder.mkdir(parents=True, exist_ok=True) for p in sorted(folder.iterdir(), key=lambda x: x.stat().st_mtime, reverse=True): if not _is_photo_file(p): continue st = p.stat() items.append( { "name": p.name, "size": st.st_size, "mtime": st.st_mtime, } ) return items def _read_text_file(path: Path) -> Optional[str]: try: return path.read_text(encoding="utf-8") except Exception: return None def _read_bytes_file(path: Path) -> Optional[bytes]: try: return path.read_bytes() except Exception: return None # ============================================================ # Capture/Test/Fix implementations # ============================================================ def _capture_via_teleimager_python() -> str: """ Capture a photo by running take_photo.py with teleimager python. Returns: saved path string OR compact error string. """ try: env = os.environ.copy() env.setdefault("PHOTOS_DIR", str(DEFAULT_PHOTOS_DIR)) env.setdefault("PHOTO_PREFIX", "photo") env.setdefault("TELEIMAGER_HOST", "127.0.0.1") env.setdefault("ZMQ_PORT", os.environ.get("ZMQ_PORT", "55555")) env.setdefault("FRAME_TIMEOUT_S", os.environ.get("FRAME_TIMEOUT_S", "3.0")) p = subprocess.run( [TELE_PY, TAKE_PHOTO_PY], capture_output=True, text=True, env=env, timeout=20, ) out = (p.stdout or "").strip().splitlines() err = (p.stderr or "").strip().splitlines() last = out[-1].strip() if out else "" if p.returncode == 0 and last: return last msg = f"[ERR] capture failed rc={p.returncode}" if out: msg += "\n" + "\n".join(out[-8:]) if err: msg += "\n" + "\n".join(err[-8:]) return msg except Exception as e: return f"[ERR] capture exception: {e}" def _realsense_test() -> Dict[str, Any]: """ Quick test using teleimager python: - import pyrealsense2 - count devices """ try: code = ( "import pyrealsense2 as rs\n" "ctx = rs.context()\n" "devs = list(ctx.query_devices())\n" "print(len(devs))\n" "print([d.get_info(rs.camera_info.serial_number) for d in devs])\n" ) p = subprocess.run( [TELE_PY, "-c", code], capture_output=True, text=True, timeout=10, ) out = (p.stdout or "").strip().splitlines() err = (p.stderr or "").strip() if p.returncode != 0: return {"ok": False, "rc": p.returncode, "error": err or "realsense test failed", "stdout": out[-20:]} count = int(out[0]) if out else 0 serials = [] if len(out) >= 2: # second line prints python list serials = out[1] return {"ok": True, "devices": count, "serials": serials, "rc": 0} except Exception as e: return {"ok": False, "error": str(e)} def _run_fix_script() -> Dict[str, Any]: """ Runs fix_realsense_usb.sh in NON-INTERACTIVE mode. If sudo password is required, it fails gracefully (no terminal prompt). """ if not FIX_SCRIPT.exists(): return {"ok": False, "error": f"fix script not found: {FIX_SCRIPT}"} try: # IMPORTANT: -n = non-interactive (won't prompt for password) p = subprocess.run( ["sudo", "-n", "bash", str(FIX_SCRIPT)], capture_output=True, text=True, timeout=40, ) out = (p.stdout or "").strip().splitlines()[-80:] err = (p.stderr or "").strip().splitlines()[-80:] if p.returncode == 0: return {"ok": True, "rc": 0, "stdout": out, "stderr": err} # Most common case: sudo needs password joined_err = "\n".join(err).lower() if "a password is required" in joined_err or "sudo:" in joined_err: return { "ok": False, "rc": p.returncode, "error": "sudo password required. Run manually in terminal: sudo ./fix_realsense_usb.sh", "stdout": out, "stderr": err, } return {"ok": False, "rc": p.returncode, "stdout": out, "stderr": err} except Exception as e: return {"ok": False, "error": str(e)} # ============================================================ # HTML template fallback (if Scripts/gallery.html missing) # ============================================================ _FALLBACK_GALLERY_HTML = """ Sanad Photos

📸 Sanad Photos

Photos folder: {{PHOTOS_DIR}}
{{PHOTO_CARDS}}
""" _FALLBACK_STYLE_CSS = """\ :root{--border:#ddd;--bg:#fff;--muted:#666} body{font-family:Arial,sans-serif;padding:12px;max-width:900px;margin:auto;background:#fafafa} h2{margin:8px 0 14px} .toolbar{margin-bottom:12px;display:flex;gap:10px;flex-wrap:wrap} .toolbar a{text-decoration:none;color:#0b5fff;background:var(--bg);padding:6px 10px;border:1px solid var(--border);border-radius:8px} .meta{color:var(--muted);margin:10px 0 14px} .card{margin:10px 0;padding:10px;border:1px solid var(--border);border-radius:10px;background:var(--bg)} .card .name{font-weight:700} .card .actions{margin-top:6px} .card .actions a{text-decoration:none;color:#0b5fff} .card img{margin-top:10px;max-width:100%;height:auto;border-radius:10px} """ # ============================================================ # Main server factory # ============================================================ def start_photo_server( folder: Path = DEFAULT_PHOTOS_DIR, port: int = 8080, bind: str = "0.0.0.0", capture_func: Optional[Callable[[], str]] = None, logger=None, ) -> str: """ Starts the gallery server in a daemon thread. Returns the URL (best-effort reachable IP). If port is already in use, it logs and returns the URL anyway (assumes old server is running). """ lg = logger or _make_logger("photo_server.log") folder = folder.resolve() folder.mkdir(parents=True, exist_ok=True) ip = _get_local_ip() url = f"http://{ip}:{port}/" if capture_func is None: capture_func = _capture_via_teleimager_python class Handler(SimpleHTTPRequestHandler): # Serve files from PHOTOS folder by default def __init__(self, *args, **kwargs): super().__init__(*args, directory=str(folder), **kwargs) def log_message(self, fmt, *args): # keep phone requests visible (and not too noisy) print(f"{self.client_address[0]} - - [{self.log_date_time_string()}] {fmt % args}") def _send_json(self, obj: Dict[str, Any], code: int = 200): data = json.dumps(obj, ensure_ascii=False, indent=2).encode("utf-8") self.send_response(code) self.send_header("Content-Type", "application/json; charset=utf-8") self.send_header("Content-Length", str(len(data))) self.end_headers() self.wfile.write(data) def _send_text(self, text: str, code: int = 200, ctype: str = "text/plain; charset=utf-8"): data = text.encode("utf-8") self.send_response(code) self.send_header("Content-Type", ctype) self.send_header("Content-Length", str(len(data))) self.end_headers() self.wfile.write(data) def do_GET(self): parsed = urllib.parse.urlparse(self.path) path = parsed.path q = urllib.parse.parse_qs(parsed.query) # Ignore favicon noise if path == "/favicon.ico": self.send_response(204) self.end_headers() return # Serve CSS from Scripts/style.css (or fallback) if path == "/static/style.css": css = _read_bytes_file(STYLE_CSS) if css is None: css = _FALLBACK_STYLE_CSS.encode("utf-8") self.send_response(200) self.send_header("Content-Type", "text/css; charset=utf-8") self.send_header("Content-Length", str(len(css))) self.end_headers() self.wfile.write(css) return # API endpoints if path == "/api/status": items = _list_photos(folder) res = { "ok": True, "photos_dir": str(folder), "count": len(items), "latest": items[0]["name"] if items else None, "url": url, "time": time.strftime("%Y-%m-%d %H:%M:%S"), } self._send_json(res, 200) return if path == "/api/test": res = _realsense_test() self._send_json(res, 200 if res.get("ok") else 500) return if path == "/api/fix": res = _run_fix_script() self._send_json(res, 200 if res.get("ok") else 500) return if path == "/api/capture": # Take photo NOW (no remote) result = capture_func() ok = not result.startswith("[ERR]") self._send_json({"ok": ok, "result": result}, 200 if ok else 500) return if path == "/api/delete": name = _safe_name(q.get("name", [""])[0]) if not name: self._send_json({"ok": False, "error": "missing name"}, 400) return target = (folder / name).resolve() # ensure inside folder try: target.relative_to(folder) except Exception: self._send_json({"ok": False, "error": "invalid path"}, 400) return if not target.exists(): self._send_json({"ok": False, "error": "not found"}, 404) return try: target.unlink() self._send_json({"ok": True, "deleted": name}, 200) except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) return # Gallery page if path == "/": items = _list_photos(folder) cards = [] for it in items: n = it["name"] n_url = urllib.parse.quote(n) cards.append( f"""
{n}
""" ) cards_html = "".join(cards) if cards else "No photos yet." tpl = _read_text_file(GALLERY_HTML) or _FALLBACK_GALLERY_HTML html = ( tpl.replace("{{PHOTOS_DIR}}", str(folder)) .replace("{{PHOTO_CARDS}}", cards_html) ) self._send_text(html, 200, "text/html; charset=utf-8") return # Otherwise: serve static files from photos/ (download/open) return super().do_GET() try: ThreadingHTTPServer.allow_reuse_address = True httpd = ThreadingHTTPServer((bind, port), Handler) except OSError as e: _log(lg, f"❌ Photo server failed: {e}", "error") # Assume another server already running _log(lg, f"🖼️ Phone gallery (existing): {url}", "info") return url def _serve(): try: httpd.serve_forever() except Exception: pass t = threading.Thread(target=_serve, daemon=True) t.start() _log(lg, f"🖼️ Photo server serving {folder} on {url}", "info") return url # ============================================================ # CLI mode (optional) # ============================================================ if __name__ == "__main__": photos = Path(os.environ.get("PHOTOS_DIR", str(DEFAULT_PHOTOS_DIR))).resolve() port = int(os.environ.get("PHOTO_SERVER_PORT", "8080")) start_photo_server(photos, port=port) print("Server running. Press Ctrl+C to stop.") try: while True: time.sleep(3600) except KeyboardInterrupt: pass