Manual_Photographer/photo_server.py
2026-04-12 18:53:20 +04:00

515 lines
17 KiB
Python

#!/usr/bin/env python3
"""
photo_server.py
---------------
HTTP photo gallery server for /home/unitree/photographer/photos
✅ Features:
- Phone/Laptop gallery UI: GET /
- Serves photos directly (open/download): GET /<filename>
- Delete photo: GET /api/delete?name=<filename>
- Capture photo now (without remote): GET /api/capture
- Camera status: GET /api/status
- Camera test (RealSense enumerate via teleimager python): GET /api/test
- Run RealSense USB fix script: GET /api/fix
- CSS/HTML split:
Scripts/gallery.html
Scripts/style.css
Served as:
/ -> gallery.html template (photo list injected)
/static/style.css -> style.css file
⚠️ Security note:
- This is an open LAN server (anyone on same WiFi can view/download/delete).
Use only on trusted networks.
Works well with your structure (no new folders).
"""
from __future__ import annotations
import json
import os
import socket
import threading
import time
import urllib.parse
import subprocess
from pathlib import Path
from http.server import SimpleHTTPRequestHandler, ThreadingHTTPServer
from typing import Optional, Callable, Dict, Any, List
try:
from Logger import Logs
except Exception:
Logs = None # type: ignore
# ============================================================
# Defaults / Paths
# ============================================================
BASE_DIR = Path(__file__).resolve().parent
DEFAULT_PHOTOS_DIR = (BASE_DIR / "photos").resolve()
SCRIPTS_DIR = (BASE_DIR / "Scripts").resolve()
GALLERY_HTML = SCRIPTS_DIR / "gallery.html"
STYLE_CSS = SCRIPTS_DIR / "style.css"
FIX_SCRIPT = (BASE_DIR / "fix_realsense_usb.sh").resolve()
# Teleimager python (Py3.10) + capture script
TELE_PY = os.environ.get("TELEIMAGER_PY", "/home/unitree/miniconda3/envs/teleimager/bin/python")
TAKE_PHOTO_PY = str((BASE_DIR / "take_photo.py").resolve())
# ============================================================
# Logging helpers
# ============================================================
def _make_logger(log_name: str = "photo_server.log"):
if Logs is None:
return None
lg = Logs()
try:
lg.LogEngine("G1_Logs", log_name)
except Exception:
pass
return lg
def _log(lg, msg: str, level: str = "info"):
# level: info / warning / error
if lg is not None:
try:
lg.print_and_log(msg, message_type=level)
return
except Exception:
pass
print(msg)
# ============================================================
# Utilities
# ============================================================
def _get_local_ip() -> str:
"""
Best-effort IP that phones can reach (same WiFi).
"""
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
s.close()
return ip
except Exception:
return "127.0.0.1"
def _safe_name(name: str) -> str:
"""
Prevent path traversal: keep only filename (no dirs).
"""
name = name.strip().replace("\\", "/")
name = name.split("/")[-1]
return name
def _is_photo_file(p: Path) -> bool:
if not p.is_file():
return False
ext = p.suffix.lower()
return ext in (".jpg", ".jpeg", ".png", ".webp")
def _list_photos(folder: Path) -> List[Dict[str, Any]]:
items: List[Dict[str, Any]] = []
folder.mkdir(parents=True, exist_ok=True)
for p in sorted(folder.iterdir(), key=lambda x: x.stat().st_mtime, reverse=True):
if not _is_photo_file(p):
continue
st = p.stat()
items.append(
{
"name": p.name,
"size": st.st_size,
"mtime": st.st_mtime,
}
)
return items
def _read_text_file(path: Path) -> Optional[str]:
try:
return path.read_text(encoding="utf-8")
except Exception:
return None
def _read_bytes_file(path: Path) -> Optional[bytes]:
try:
return path.read_bytes()
except Exception:
return None
# ============================================================
# Capture/Test/Fix implementations
# ============================================================
def _capture_via_teleimager_python() -> str:
"""
Capture a photo by running take_photo.py with teleimager python.
Returns: saved path string OR compact error string.
"""
try:
env = os.environ.copy()
env.setdefault("PHOTOS_DIR", str(DEFAULT_PHOTOS_DIR))
env.setdefault("PHOTO_PREFIX", "photo")
env.setdefault("TELEIMAGER_HOST", "127.0.0.1")
env.setdefault("ZMQ_PORT", os.environ.get("ZMQ_PORT", "55555"))
env.setdefault("FRAME_TIMEOUT_S", os.environ.get("FRAME_TIMEOUT_S", "3.0"))
p = subprocess.run(
[TELE_PY, TAKE_PHOTO_PY],
capture_output=True,
text=True,
env=env,
timeout=20,
)
out = (p.stdout or "").strip().splitlines()
err = (p.stderr or "").strip().splitlines()
last = out[-1].strip() if out else ""
if p.returncode == 0 and last:
return last
msg = f"[ERR] capture failed rc={p.returncode}"
if out:
msg += "\n" + "\n".join(out[-8:])
if err:
msg += "\n" + "\n".join(err[-8:])
return msg
except Exception as e:
return f"[ERR] capture exception: {e}"
def _realsense_test() -> Dict[str, Any]:
"""
Quick test using teleimager python:
- import pyrealsense2
- count devices
"""
try:
code = (
"import pyrealsense2 as rs\n"
"ctx = rs.context()\n"
"devs = list(ctx.query_devices())\n"
"print(len(devs))\n"
"print([d.get_info(rs.camera_info.serial_number) for d in devs])\n"
)
p = subprocess.run(
[TELE_PY, "-c", code],
capture_output=True,
text=True,
timeout=10,
)
out = (p.stdout or "").strip().splitlines()
err = (p.stderr or "").strip()
if p.returncode != 0:
return {"ok": False, "rc": p.returncode, "error": err or "realsense test failed", "stdout": out[-20:]}
count = int(out[0]) if out else 0
serials = []
if len(out) >= 2:
# second line prints python list
serials = out[1]
return {"ok": True, "devices": count, "serials": serials, "rc": 0}
except Exception as e:
return {"ok": False, "error": str(e)}
def _run_fix_script() -> Dict[str, Any]:
"""
Runs fix_realsense_usb.sh in NON-INTERACTIVE mode.
If sudo password is required, it fails gracefully (no terminal prompt).
"""
if not FIX_SCRIPT.exists():
return {"ok": False, "error": f"fix script not found: {FIX_SCRIPT}"}
try:
# IMPORTANT: -n = non-interactive (won't prompt for password)
p = subprocess.run(
["sudo", "-n", "bash", str(FIX_SCRIPT)],
capture_output=True,
text=True,
timeout=40,
)
out = (p.stdout or "").strip().splitlines()[-80:]
err = (p.stderr or "").strip().splitlines()[-80:]
if p.returncode == 0:
return {"ok": True, "rc": 0, "stdout": out, "stderr": err}
# Most common case: sudo needs password
joined_err = "\n".join(err).lower()
if "a password is required" in joined_err or "sudo:" in joined_err:
return {
"ok": False,
"rc": p.returncode,
"error": "sudo password required. Run manually in terminal: sudo ./fix_realsense_usb.sh",
"stdout": out,
"stderr": err,
}
return {"ok": False, "rc": p.returncode, "stdout": out, "stderr": err}
except Exception as e:
return {"ok": False, "error": str(e)}
# ============================================================
# HTML template fallback (if Scripts/gallery.html missing)
# ============================================================
_FALLBACK_GALLERY_HTML = """<!doctype html>
<html>
<head>
<meta charset="utf-8"/>
<meta name="viewport" content="width=device-width, initial-scale=1"/>
<title>Sanad Photos</title>
<link rel="stylesheet" href="/static/style.css">
</head>
<body>
<h2>📸 Sanad Photos</h2>
<div class="toolbar">
<a href="/api/status" target="_blank">Status</a>
<a href="/api/capture" target="_blank">Capture Now</a>
<a href="/api/test" target="_blank">Camera Test</a>
<a href="/api/fix" target="_blank">Fix RealSense</a>
</div>
<div class="meta">Photos folder: <code>{{PHOTOS_DIR}}</code></div>
<div id="list">{{PHOTO_CARDS}}</div>
</body>
</html>
"""
_FALLBACK_STYLE_CSS = """\
:root{--border:#ddd;--bg:#fff;--muted:#666}
body{font-family:Arial,sans-serif;padding:12px;max-width:900px;margin:auto;background:#fafafa}
h2{margin:8px 0 14px}
.toolbar{margin-bottom:12px;display:flex;gap:10px;flex-wrap:wrap}
.toolbar a{text-decoration:none;color:#0b5fff;background:var(--bg);padding:6px 10px;border:1px solid var(--border);border-radius:8px}
.meta{color:var(--muted);margin:10px 0 14px}
.card{margin:10px 0;padding:10px;border:1px solid var(--border);border-radius:10px;background:var(--bg)}
.card .name{font-weight:700}
.card .actions{margin-top:6px}
.card .actions a{text-decoration:none;color:#0b5fff}
.card img{margin-top:10px;max-width:100%;height:auto;border-radius:10px}
"""
# ============================================================
# Main server factory
# ============================================================
def start_photo_server(
folder: Path = DEFAULT_PHOTOS_DIR,
port: int = 8080,
bind: str = "0.0.0.0",
capture_func: Optional[Callable[[], str]] = None,
logger=None,
) -> str:
"""
Starts the gallery server in a daemon thread.
Returns the URL (best-effort reachable IP).
If port is already in use, it logs and returns the URL anyway (assumes old server is running).
"""
lg = logger or _make_logger("photo_server.log")
folder = folder.resolve()
folder.mkdir(parents=True, exist_ok=True)
ip = _get_local_ip()
url = f"http://{ip}:{port}/"
if capture_func is None:
capture_func = _capture_via_teleimager_python
class Handler(SimpleHTTPRequestHandler):
# Serve files from PHOTOS folder by default
def __init__(self, *args, **kwargs):
super().__init__(*args, directory=str(folder), **kwargs)
def log_message(self, fmt, *args):
# keep phone requests visible (and not too noisy)
print(f"{self.client_address[0]} - - [{self.log_date_time_string()}] {fmt % args}")
def _send_json(self, obj: Dict[str, Any], code: int = 200):
data = json.dumps(obj, ensure_ascii=False, indent=2).encode("utf-8")
self.send_response(code)
self.send_header("Content-Type", "application/json; charset=utf-8")
self.send_header("Content-Length", str(len(data)))
self.end_headers()
self.wfile.write(data)
def _send_text(self, text: str, code: int = 200, ctype: str = "text/plain; charset=utf-8"):
data = text.encode("utf-8")
self.send_response(code)
self.send_header("Content-Type", ctype)
self.send_header("Content-Length", str(len(data)))
self.end_headers()
self.wfile.write(data)
def do_GET(self):
parsed = urllib.parse.urlparse(self.path)
path = parsed.path
q = urllib.parse.parse_qs(parsed.query)
# Ignore favicon noise
if path == "/favicon.ico":
self.send_response(204)
self.end_headers()
return
# Serve CSS from Scripts/style.css (or fallback)
if path == "/static/style.css":
css = _read_bytes_file(STYLE_CSS)
if css is None:
css = _FALLBACK_STYLE_CSS.encode("utf-8")
self.send_response(200)
self.send_header("Content-Type", "text/css; charset=utf-8")
self.send_header("Content-Length", str(len(css)))
self.end_headers()
self.wfile.write(css)
return
# API endpoints
if path == "/api/status":
items = _list_photos(folder)
res = {
"ok": True,
"photos_dir": str(folder),
"count": len(items),
"latest": items[0]["name"] if items else None,
"url": url,
"time": time.strftime("%Y-%m-%d %H:%M:%S"),
}
self._send_json(res, 200)
return
if path == "/api/test":
res = _realsense_test()
self._send_json(res, 200 if res.get("ok") else 500)
return
if path == "/api/fix":
res = _run_fix_script()
self._send_json(res, 200 if res.get("ok") else 500)
return
if path == "/api/capture":
# Take photo NOW (no remote)
result = capture_func()
ok = not result.startswith("[ERR]")
self._send_json({"ok": ok, "result": result}, 200 if ok else 500)
return
if path == "/api/delete":
name = _safe_name(q.get("name", [""])[0])
if not name:
self._send_json({"ok": False, "error": "missing name"}, 400)
return
target = (folder / name).resolve()
# ensure inside folder
try:
target.relative_to(folder)
except Exception:
self._send_json({"ok": False, "error": "invalid path"}, 400)
return
if not target.exists():
self._send_json({"ok": False, "error": "not found"}, 404)
return
try:
target.unlink()
self._send_json({"ok": True, "deleted": name}, 200)
except Exception as e:
self._send_json({"ok": False, "error": str(e)}, 500)
return
# Gallery page
if path == "/":
items = _list_photos(folder)
cards = []
for it in items:
n = it["name"]
n_url = urllib.parse.quote(n)
cards.append(
f"""
<div class="card">
<div class="name">{n}</div>
<div class="actions">
<a href="/{n_url}" target="_blank">Open/Download</a>
&nbsp; | &nbsp;
<a href="/api/delete?name={n_url}" onclick="return confirm('Delete {n}?')">Delete</a>
</div>
<img src="/{n_url}" />
</div>
"""
)
cards_html = "".join(cards) if cards else "<i>No photos yet.</i>"
tpl = _read_text_file(GALLERY_HTML) or _FALLBACK_GALLERY_HTML
html = (
tpl.replace("{{PHOTOS_DIR}}", str(folder))
.replace("{{PHOTO_CARDS}}", cards_html)
)
self._send_text(html, 200, "text/html; charset=utf-8")
return
# Otherwise: serve static files from photos/ (download/open)
return super().do_GET()
try:
ThreadingHTTPServer.allow_reuse_address = True
httpd = ThreadingHTTPServer((bind, port), Handler)
except OSError as e:
_log(lg, f"❌ Photo server failed: {e}", "error")
# Assume another server already running
_log(lg, f"🖼️ Phone gallery (existing): {url}", "info")
return url
def _serve():
try:
httpd.serve_forever()
except Exception:
pass
t = threading.Thread(target=_serve, daemon=True)
t.start()
_log(lg, f"🖼️ Photo server serving {folder} on {url}", "info")
return url
# ============================================================
# CLI mode (optional)
# ============================================================
if __name__ == "__main__":
photos = Path(os.environ.get("PHOTOS_DIR", str(DEFAULT_PHOTOS_DIR))).resolve()
port = int(os.environ.get("PHOTO_SERVER_PORT", "8080"))
start_photo_server(photos, port=port)
print("Server running. Press Ctrl+C to stop.")
try:
while True:
time.sleep(3600)
except KeyboardInterrupt:
pass