397 lines
13 KiB
Python
397 lines
13 KiB
Python
# capture_service.py
|
|
import os
|
|
import subprocess
|
|
import threading
|
|
import time
|
|
from pathlib import Path
|
|
|
|
from Core.error_events import record_error
|
|
from Core import settings as config
|
|
from Server import direct_camera_client
|
|
|
|
SERVER_DIR = Path(__file__).resolve().parent
|
|
PROJECT_ROOT = SERVER_DIR.parent
|
|
|
|
FIX_SCRIPT = str((PROJECT_ROOT / "Scripts" / "fix_realsense_usb.sh").resolve())
|
|
UPLOAD_NOW_FLAG = (PROJECT_ROOT / "Scripts" / "upload_now.flag").resolve()
|
|
|
|
|
|
def _touch_upload_now_flag():
|
|
try:
|
|
UPLOAD_NOW_FLAG.parent.mkdir(parents=True, exist_ok=True)
|
|
UPLOAD_NOW_FLAG.write_text(str(time.time()), encoding="utf-8")
|
|
except Exception as e:
|
|
record_error("capture_service", "touch_upload_now_flag", e)
|
|
|
|
|
|
def _next_photo_name(out_dir: Path, prefix: str, ext: str) -> Path:
|
|
base = out_dir / f"{prefix}.{ext}"
|
|
if not base.exists():
|
|
return base
|
|
i = 1
|
|
while True:
|
|
p = out_dir / f"{prefix}({i}).{ext}"
|
|
if not p.exists():
|
|
return p
|
|
i += 1
|
|
|
|
|
|
def take_photo_sync(prefix: str | None = None) -> str:
|
|
"""
|
|
Captures one photo and returns the saved path or a compact error.
|
|
Production runtime uses the direct camera service only.
|
|
"""
|
|
retry_count = max(0, int(config.read_watchdog_camera_capture_retry_count()))
|
|
retry_delay = max(0.0, float(config.read_watchdog_camera_capture_retry_delay_sec()))
|
|
total_attempts = max(1, retry_count + 1)
|
|
last_err = "[ERR] take_photo unknown error"
|
|
|
|
for attempt in range(1, total_attempts + 1):
|
|
try:
|
|
env = os.environ.copy()
|
|
env.setdefault("PHOTOS_DIR", str(Path(config.PHOTOS_DIR).resolve()))
|
|
if prefix:
|
|
env["PHOTO_PREFIX"] = prefix
|
|
else:
|
|
env.setdefault("PHOTO_PREFIX", "photo")
|
|
if not direct_camera_client.is_enabled():
|
|
last_err = "[ERR] direct camera service is disabled"
|
|
record_error(
|
|
"capture_service",
|
|
"take_photo_sync_attempt_failed",
|
|
context={"attempt": attempt, "total_attempts": total_attempts, "reason": "direct_camera_disabled"},
|
|
)
|
|
continue
|
|
|
|
saved = direct_camera_client.capture(
|
|
prefix=env.get("PHOTO_PREFIX", "photo"),
|
|
ext=env.get("PHOTO_EXT", "jpg").strip().lstrip(".") or "jpg",
|
|
timeout=10.0,
|
|
)
|
|
_touch_upload_now_flag()
|
|
return saved
|
|
except Exception as e:
|
|
last_err = f"[ERR] take_photo exception: {e}"
|
|
record_error(
|
|
"capture_service",
|
|
"take_photo_sync_exception",
|
|
e,
|
|
{"attempt": attempt, "total_attempts": total_attempts},
|
|
)
|
|
|
|
if attempt < total_attempts and retry_delay > 0.0:
|
|
time.sleep(retry_delay)
|
|
|
|
return last_err
|
|
|
|
|
|
def take_photo_test_sync() -> str:
|
|
"""
|
|
Takes a photo and deletes it immediately. Good as a camera health test.
|
|
Returns "OK" or an error string.
|
|
"""
|
|
result = take_photo_sync()
|
|
if result.startswith("/"):
|
|
try:
|
|
p = Path(result)
|
|
if p.exists():
|
|
p.unlink()
|
|
return "OK"
|
|
except Exception as e:
|
|
return f"[WARN] captured but delete failed: {e}"
|
|
return result
|
|
|
|
|
|
def run_fix_realsense_sync() -> str:
|
|
"""
|
|
Runs fix_realsense_usb.sh. NOTE: script uses sudo tee -> needs passwordless sudo to fully work.
|
|
"""
|
|
try:
|
|
if not Path(FIX_SCRIPT).exists():
|
|
return f"[ERR] fix script not found: {FIX_SCRIPT}"
|
|
|
|
p = subprocess.run(
|
|
["bash", FIX_SCRIPT],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=20,
|
|
)
|
|
out = (p.stdout or "").strip()
|
|
err = (p.stderr or "").strip()
|
|
if p.returncode == 0:
|
|
return out or "OK"
|
|
return f"[ERR] fix script rc={p.returncode}\n{out}\n{err}".strip()
|
|
except Exception as e:
|
|
record_error("capture_service", "run_fix_realsense_sync_exception", e)
|
|
return f"[ERR] fix exception: {e}"
|
|
|
|
|
|
def replay_file_integrity(path: Path) -> dict:
|
|
"""
|
|
Checks:
|
|
- file exists
|
|
- has at least one frame with 'q'
|
|
- has at least one trigger marker (meta contains trigger)
|
|
"""
|
|
p = Path(path).resolve()
|
|
out = {
|
|
"ok": False, # usable replay (has frames)
|
|
"trigger_ok": False, # has explicit trigger markers
|
|
"path": str(p),
|
|
"exists": False,
|
|
"frames": 0,
|
|
"triggers": 0,
|
|
"error": "",
|
|
}
|
|
if not p.exists() or not p.is_file():
|
|
out["error"] = "missing replay file"
|
|
return out
|
|
out["exists"] = True
|
|
|
|
import json
|
|
|
|
try:
|
|
with p.open("r", encoding="utf-8") as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
try:
|
|
d = json.loads(line)
|
|
except Exception:
|
|
continue
|
|
if isinstance(d, dict) and isinstance(d.get("q"), list):
|
|
out["frames"] += 1
|
|
meta = d.get("meta")
|
|
trig = False
|
|
if isinstance(meta, str) and ("trigger" in meta.lower()):
|
|
trig = True
|
|
elif isinstance(meta, dict) and bool(meta.get("trigger")):
|
|
trig = True
|
|
if trig:
|
|
out["triggers"] += 1
|
|
out["ok"] = out["frames"] > 0
|
|
out["trigger_ok"] = out["triggers"] > 0
|
|
if out["frames"] <= 0:
|
|
out["error"] = "replay has no valid frames"
|
|
elif out["triggers"] <= 0:
|
|
out["warning"] = "replay has no trigger markers; timed fallback capture will be used"
|
|
return out
|
|
except Exception as e:
|
|
out["error"] = str(e)
|
|
record_error("capture_service", "replay_file_integrity", e, {"path": str(p)})
|
|
return out
|
|
|
|
|
|
def replay_timing_profile(path: Path, end_margin_sec: float | None = None) -> dict:
|
|
"""
|
|
Returns replay timing information used to align countdown audio and fallback capture.
|
|
|
|
If the replay contains explicit trigger markers, the first trigger offset is treated as the
|
|
desired shot time. Otherwise the shot is scheduled slightly before the replay ends so capture
|
|
happens before the arm starts returning home.
|
|
"""
|
|
p = Path(path).resolve()
|
|
margin = max(0.0, float(end_margin_sec if end_margin_sec is not None else config.REPLAY_CAPTURE_END_MARGIN_SEC))
|
|
out = {
|
|
"ok": False,
|
|
"path": str(p),
|
|
"exists": False,
|
|
"frames": 0,
|
|
"timed_frames": 0,
|
|
"duration_sec": 0.0,
|
|
"trigger_offsets_sec": [],
|
|
"trigger_count": 0,
|
|
"capture_offset_sec": max(0.0, min(config.PHOTO_DELAY_SEC, config.PHOTO_TOTAL_SEC)),
|
|
"capture_source": "config_fallback",
|
|
"end_margin_sec": margin,
|
|
"error": "",
|
|
}
|
|
if not p.exists() or not p.is_file():
|
|
out["error"] = "missing replay file"
|
|
return out
|
|
out["exists"] = True
|
|
|
|
import json
|
|
|
|
timed_offsets: list[float] = []
|
|
trigger_offsets: list[float] = []
|
|
first_t: float | None = None
|
|
|
|
try:
|
|
with p.open("r", encoding="utf-8") as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
try:
|
|
d = json.loads(line)
|
|
except Exception:
|
|
continue
|
|
if not (isinstance(d, dict) and isinstance(d.get("q"), list)):
|
|
continue
|
|
|
|
out["frames"] += 1
|
|
try:
|
|
t_val = float(d.get("t"))
|
|
except Exception:
|
|
continue
|
|
|
|
if first_t is None:
|
|
first_t = t_val
|
|
rel_t = max(0.0, t_val - first_t)
|
|
timed_offsets.append(rel_t)
|
|
out["timed_frames"] += 1
|
|
|
|
meta = d.get("meta")
|
|
if isinstance(meta, str) and ("trigger" in meta.lower()):
|
|
trigger_offsets.append(rel_t)
|
|
elif isinstance(meta, dict) and bool(meta.get("trigger")):
|
|
trigger_offsets.append(rel_t)
|
|
|
|
out["ok"] = out["frames"] > 0
|
|
if not out["ok"]:
|
|
out["error"] = "replay has no valid frames"
|
|
return out
|
|
|
|
if timed_offsets:
|
|
duration_sec = max(0.0, timed_offsets[-1])
|
|
out["duration_sec"] = duration_sec
|
|
out["trigger_offsets_sec"] = [round(x, 3) for x in trigger_offsets]
|
|
out["trigger_count"] = len(trigger_offsets)
|
|
|
|
if trigger_offsets:
|
|
out["capture_offset_sec"] = max(0.0, min(duration_sec, trigger_offsets[0]))
|
|
out["capture_source"] = "trigger"
|
|
elif duration_sec > 0.0:
|
|
out["capture_offset_sec"] = max(0.0, duration_sec - margin)
|
|
out["capture_source"] = "timed_end"
|
|
else:
|
|
out["capture_source"] = "config_fallback"
|
|
else:
|
|
out["error"] = "replay has no timestamped frames; config fallback timing will be used"
|
|
|
|
return out
|
|
except Exception as e:
|
|
out["error"] = str(e)
|
|
record_error("capture_service", "replay_timing_profile", e, {"path": str(p)})
|
|
return out
|
|
|
|
|
|
def ensure_replay_integrity(active_replay: Path, fallback_replay: Path) -> tuple[Path, dict]:
|
|
"""
|
|
Returns a valid replay path. If active invalid, tries fallback.
|
|
"""
|
|
active = Path(active_replay).resolve()
|
|
fallback = Path(fallback_replay).resolve()
|
|
active_report = replay_file_integrity(active)
|
|
if active_report.get("ok") and active_report.get("trigger_ok"):
|
|
return active, {"selected": str(active), "active": active_report, "fallback_used": False}
|
|
|
|
fallback_report = replay_file_integrity(fallback)
|
|
if fallback_report.get("ok") and fallback_report.get("trigger_ok"):
|
|
return fallback, {
|
|
"selected": str(fallback),
|
|
"active": active_report,
|
|
"fallback": fallback_report,
|
|
"fallback_used": True,
|
|
"reason": "active replay missing trigger markers",
|
|
}
|
|
|
|
# Degraded mode: no trigger-marked file found, keep active if it has frames.
|
|
if active_report.get("ok"):
|
|
return active, {
|
|
"selected": str(active),
|
|
"active": active_report,
|
|
"fallback": fallback_report,
|
|
"fallback_used": False,
|
|
"degraded_no_trigger": True,
|
|
}
|
|
|
|
# If both invalid, keep active and report both failures.
|
|
return active, {
|
|
"selected": str(active),
|
|
"active": active_report,
|
|
"fallback": fallback_report,
|
|
"fallback_used": False,
|
|
}
|
|
|
|
|
|
def capture_with_replay_sync(
|
|
replay_runner,
|
|
replay_file: Path,
|
|
home_file: Path,
|
|
delay_sec: float,
|
|
prefix: str | None = None,
|
|
speed: float = 1.0,
|
|
finalize_timeout_sec: float = 4.0,
|
|
cancel_event: threading.Event | None = None,
|
|
) -> str:
|
|
"""
|
|
Unified capture pipeline:
|
|
replay + trigger callback capture + fallback timed capture + upload flag.
|
|
"""
|
|
delay_sec = max(0.0, float(delay_sec))
|
|
done = threading.Event()
|
|
lock = threading.Lock()
|
|
result = {"value": None}
|
|
capture_started = {"value": False}
|
|
|
|
def _is_cancelled() -> bool:
|
|
return bool(cancel_event is not None and cancel_event.is_set())
|
|
|
|
def _capture_once(reason: str):
|
|
if _is_cancelled():
|
|
return
|
|
with lock:
|
|
if capture_started["value"]:
|
|
return
|
|
capture_started["value"] = True
|
|
try:
|
|
result["value"] = take_photo_sync(prefix=prefix)
|
|
except Exception as e:
|
|
record_error("capture_service", "capture_with_replay_capture_once", e, {"reason": reason})
|
|
result["value"] = f"[ERR] capture failed ({reason}): {e}"
|
|
finally:
|
|
done.set()
|
|
|
|
if replay_runner is not None and bool(getattr(replay_runner, "is_playing", False)):
|
|
return "[ERR] replay busy"
|
|
if _is_cancelled():
|
|
return "[ERR] capture cancelled"
|
|
|
|
try:
|
|
if replay_runner is not None:
|
|
def _trigger_cb(_frame):
|
|
threading.Thread(target=lambda: _capture_once("trigger"), daemon=True).start()
|
|
|
|
threading.Thread(
|
|
target=lambda: replay_runner.run(replay_file, home_file, speed, trigger_callback=_trigger_cb),
|
|
daemon=True,
|
|
).start()
|
|
except Exception as e:
|
|
record_error("capture_service", "capture_with_replay_start_replay", e)
|
|
return f"[ERR] replay start failed: {e}"
|
|
|
|
deadline = time.time() + delay_sec
|
|
while (not done.is_set()) and (time.time() < deadline):
|
|
if _is_cancelled():
|
|
return "[ERR] capture cancelled"
|
|
time.sleep(0.02)
|
|
|
|
if (not done.is_set()) and (not _is_cancelled()):
|
|
_capture_once("fallback")
|
|
|
|
finalize_deadline = time.time() + max(0.1, finalize_timeout_sec)
|
|
while not done.is_set():
|
|
if _is_cancelled():
|
|
return "[ERR] capture cancelled"
|
|
if time.time() >= finalize_deadline:
|
|
break
|
|
time.sleep(0.02)
|
|
if not done.is_set():
|
|
record_error("capture_service", "capture_with_replay_timeout", context={"delay_sec": delay_sec})
|
|
return "[ERR] capture pipeline timeout"
|
|
|
|
return str(result["value"] or "[ERR] capture pipeline no result")
|