92 lines
2.7 KiB
Python
92 lines
2.7 KiB
Python
# photo_runner.py
|
|
import os
|
|
import subprocess
|
|
from pathlib import Path
|
|
|
|
BASE_DIR = Path(__file__).resolve().parent
|
|
|
|
TELE_PY = os.environ.get("TELEIMAGER_PY", "/home/unitree/miniconda3/envs/teleimager/bin/python")
|
|
TAKE_PHOTO_PY = str((BASE_DIR / "take_photo.py").resolve())
|
|
|
|
FIX_SCRIPT = str((BASE_DIR / "fix_realsense_usb.sh").resolve())
|
|
|
|
|
|
def take_photo_sync() -> str:
|
|
"""
|
|
Runs take_photo.py using teleimager python (Py3.10) and returns saved path or compact error.
|
|
"""
|
|
try:
|
|
env = os.environ.copy()
|
|
env.setdefault("PHOTOS_DIR", str((BASE_DIR / "photos").resolve()))
|
|
env.setdefault("PHOTO_PREFIX", "photo")
|
|
env.setdefault("TELEIMAGER_HOST", "127.0.0.1")
|
|
|
|
# Option A (ZMQ)
|
|
env.setdefault("ZMQ_PORT", os.environ.get("ZMQ_PORT", "55555"))
|
|
env.setdefault("FRAME_TIMEOUT_S", os.environ.get("FRAME_TIMEOUT_S", "3.0"))
|
|
|
|
p = subprocess.run(
|
|
[TELE_PY, TAKE_PHOTO_PY],
|
|
capture_output=True,
|
|
text=True,
|
|
env=env,
|
|
timeout=20,
|
|
)
|
|
|
|
out = (p.stdout or "").strip().splitlines()
|
|
err = (p.stderr or "").strip().splitlines()
|
|
last = out[-1].strip() if out else ""
|
|
|
|
if p.returncode == 0 and last:
|
|
return last
|
|
|
|
msg = f"[ERR] take_photo.py failed rc={p.returncode}"
|
|
if out:
|
|
msg += "\n" + "\n".join(out[-10:])
|
|
if err:
|
|
msg += "\n" + "\n".join(err[-10:])
|
|
return msg
|
|
|
|
except Exception as e:
|
|
return f"[ERR] take_photo exception: {e}"
|
|
|
|
|
|
def take_photo_test_sync() -> str:
|
|
"""
|
|
Takes a photo and deletes it immediately. Good as a camera health test.
|
|
Returns "OK" or an error string.
|
|
"""
|
|
result = take_photo_sync()
|
|
if result.startswith("/"):
|
|
try:
|
|
p = Path(result)
|
|
if p.exists():
|
|
p.unlink()
|
|
return "OK"
|
|
except Exception as e:
|
|
return f"[WARN] captured but delete failed: {e}"
|
|
return result
|
|
|
|
|
|
def run_fix_realsense_sync() -> str:
|
|
"""
|
|
Runs fix_realsense_usb.sh. NOTE: script uses sudo tee -> needs passwordless sudo to fully work.
|
|
"""
|
|
try:
|
|
if not Path(FIX_SCRIPT).exists():
|
|
return f"[ERR] fix script not found: {FIX_SCRIPT}"
|
|
|
|
p = subprocess.run(
|
|
["bash", FIX_SCRIPT],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=20,
|
|
)
|
|
out = (p.stdout or "").strip()
|
|
err = (p.stderr or "").strip()
|
|
if p.returncode == 0:
|
|
return out or "OK"
|
|
return f"[ERR] fix script rc={p.returncode}\n{out}\n{err}".strip()
|
|
except Exception as e:
|
|
return f"[ERR] fix exception: {e}"
|