Saqr/apps/manager_cli.py

417 lines
12 KiB
Python

"""Interactive CLI to manage captured PPE photos + CSV export."""
from __future__ import annotations
import argparse
import csv
import shutil
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
import cv2
from core.paths import CAPTURES_DIR, PROJECT_ROOT
from utils.logger import get_logger
log = get_logger("Manager", "manager")
STATUSES = ("SAFE", "PARTIAL", "UNSAFE")
IMG_EXTS = {".jpg", ".jpeg", ".png", ".bmp", ".webp"}
CLASS_COLUMNS = [
"boots", "gloves", "goggles", "helmet",
"no-boots", "no-gloves", "no-goggles", "no-helmet", "no-vest", "vest",
]
_C = {"SAFE": "\033[92m", "PARTIAL": "\033[93m", "UNSAFE": "\033[91m",
"BOLD": "\033[1m", "RESET": "\033[0m"}
def _cs(s): return f"{_C.get(s, '')}{s}{_C['RESET']}"
def _bold(s): return f"{_C['BOLD']}{s}{_C['RESET']}"
@dataclass
class Photo:
path: Path
status: str
filename: str
person_id: str = ""
class_name: str = "unknown"
date_captured: str = ""
@property
def class_flags(self):
flags = {c: 0 for c in CLASS_COLUMNS}
stem = self.filename.lower()
for c in CLASS_COLUMNS:
if c in stem:
flags[c] = 1
return flags
@dataclass
class EventRow:
timestamp: str
track_id: str
event_type: str
status: str
wearing: str
missing: str
unknown: str
photo: str
path: str
@property
def class_flags(self):
worn = {c.strip() for c in self.wearing.split(",") if c.strip()}
return {c: (1 if c in worn else 0) for c in CLASS_COLUMNS}
@property
def missing_notes(self):
items = [c.strip() for c in self.missing.split(",") if c.strip()]
return "Missing " + ", ".join(items) if items else "Compliant"
def parse_photo(path: Path, status: str) -> Photo:
stem = path.stem
parts = stem.split("_")
person_id = ""
date_captured = ""
class_name = "unknown"
if stem.startswith("track_") and len(parts) >= 2 and parts[1].isdigit():
person_id = f"track_{parts[1]}"
elif len(parts) >= 2 and parts[0].isdigit() and parts[1].isdigit():
try:
date_captured = datetime.strptime(
f"{parts[0]}_{parts[1]}", "%Y%m%d_%H%M%S"
).strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
pass
if len(parts) > 3:
class_name = "_".join(parts[3:])
return Photo(
path=path, status=status, filename=path.name,
person_id=person_id, class_name=class_name, date_captured=date_captured,
)
def load_photos():
photos = []
for status in STATUSES:
folder = CAPTURES_DIR / status
if not folder.exists():
continue
for f in sorted(folder.iterdir()):
if f.suffix.lower() in IMG_EXTS:
photos.append(parse_photo(f, status))
return photos
def load_events_csv(path: Path):
if not path.exists():
return []
rows = []
with open(path, "r", newline="", encoding="utf-8") as f:
for row in csv.DictReader(f):
rows.append(EventRow(
timestamp=row.get("timestamp", ""),
track_id=row.get("track_id", ""),
event_type=row.get("event_type", ""),
status=row.get("status", ""),
wearing=row.get("wearing", ""),
missing=row.get("missing", ""),
unknown=row.get("unknown", ""),
photo=row.get("photo", ""),
path=row.get("path", ""),
))
return rows
def move_photo(photo: Photo, new_status: str) -> Photo:
dst_dir = CAPTURES_DIR / new_status
dst_dir.mkdir(parents=True, exist_ok=True)
dst = dst_dir / photo.filename
shutil.move(str(photo.path), str(dst))
log.info(f"Moved '{photo.filename}': {photo.status} -> {new_status}")
return Photo(path=dst, status=new_status, filename=photo.filename,
person_id=photo.person_id, class_name=photo.class_name,
date_captured=photo.date_captured)
def rename_photo(photo: Photo, new_name: str) -> Photo:
if Path(new_name).suffix.lower() not in IMG_EXTS:
new_name += photo.path.suffix
dst = photo.path.parent / new_name
photo.path.rename(dst)
log.info(f"Renamed: '{photo.filename}' -> '{new_name}'")
return Photo(path=dst, status=photo.status, filename=new_name,
person_id=photo.person_id, class_name=photo.class_name,
date_captured=photo.date_captured)
def assign_id(photo: Photo, pid: str) -> Photo:
pid = pid.strip().replace(" ", "_")
dt = datetime.now().strftime("%Y%m%d_%H%M%S")
cls = photo.class_name if photo.class_name != "unknown" else "ppe"
return rename_photo(photo, f"{pid}_{dt}_{cls}{photo.path.suffix}")
def delete_photo(photo: Photo) -> None:
photo.path.unlink()
log.info(f"Deleted: '{photo.filename}' ({photo.status})")
def copy_photo(photo: Photo, dest: Path) -> Path:
dest.mkdir(parents=True, exist_ok=True)
dst = dest / photo.filename
shutil.copy2(str(photo.path), str(dst))
log.info(f"Copied '{photo.filename}' -> {dst}")
return dst
def export_csv(photos, output: Path) -> None:
event_rows = load_events_csv(CAPTURES_DIR / "events.csv")
fields = ["photo", "track_id", "event_type", "status", "timestamp",
"wearing", "missing", "unknown", "missing_notes",
*CLASS_COLUMNS, "path"]
with open(output, "w", newline="", encoding="utf-8") as f:
w = csv.DictWriter(f, fieldnames=fields)
w.writeheader()
if event_rows:
for r in event_rows:
w.writerow({
"photo": r.photo, "track_id": r.track_id,
"event_type": r.event_type, "status": r.status,
"timestamp": r.timestamp, "wearing": r.wearing,
"missing": r.missing, "unknown": r.unknown,
"missing_notes": r.missing_notes,
**r.class_flags, "path": r.path,
})
else:
for p in photos:
w.writerow({
"photo": p.filename, "track_id": p.person_id,
"event_type": "", "status": p.status,
"timestamp": p.date_captured, "wearing": "",
"missing": "", "unknown": "", "missing_notes": "",
**p.class_flags, "path": str(p.path),
})
count = len(event_rows) if event_rows else len(photos)
log.info(f"CSV exported: {output} ({count} records)")
def print_header(photos):
counts = {s: sum(1 for p in photos if p.status == s) for s in STATUSES}
print("\n" + "=" * 66)
print(_bold(" Saqr - PPE Photo Manager"))
print("=" * 66)
print(f" {_cs('SAFE')} {counts['SAFE']:3d} | "
f"{_cs('PARTIAL')} {counts['PARTIAL']:3d} | "
f"{_cs('UNSAFE')} {counts['UNSAFE']:3d} | Total: {len(photos)}")
print("=" * 66)
def print_table(photos):
print(f"\n {'#':>4} {'STATUS':<8} {'ID':<14} {'DATE':<19} FILENAME")
print(" " + "-" * 68)
for i, p in enumerate(photos):
pid = (p.person_id or "-")[:12]
date = (p.date_captured or "-")[:17]
print(f" {i+1:>4} {p.status:<8} {pid:<14} {date:<19} {p.filename[:28]}")
def pick_photo(photos, prompt="Select photo"):
if not photos:
print(" No photos found.")
return None
print_table(photos)
try:
n = int(input(f"\n {prompt} (0=cancel): "))
if 1 <= n <= len(photos):
return photos[n - 1]
except ValueError:
pass
return None
def show_details(photo):
print(f"\n Filename : {photo.filename}")
print(f" Status : {_cs(photo.status)}")
print(f" ID : {photo.person_id or '-'}")
print(f" Date : {photo.date_captured or '-'}")
print(f" Path : {photo.path}")
def act_list(photos):
print("\n Filter: [1] All [2] SAFE [3] PARTIAL [4] UNSAFE")
ch = input(" Choice: ").strip()
filt = {"2": "SAFE", "3": "PARTIAL", "4": "UNSAFE"}.get(ch)
sub = [p for p in photos if filt is None or p.status == filt]
if sub:
print_table(sub)
print(f"\n Showing {len(sub)} photo(s).")
else:
print(" None.")
def act_view(photos):
p = pick_photo(photos, "View photo")
if not p:
return
show_details(p)
img = cv2.imread(str(p.path))
if img is not None:
cv2.imshow(f"Saqr - {p.filename}", img)
print(" Press any key to close.")
cv2.waitKey(0)
cv2.destroyAllWindows()
def act_move(photos):
p = pick_photo(photos, "Move photo")
if not p:
return photos
show_details(p)
print("\n Move to: [1] SAFE [2] PARTIAL [3] UNSAFE")
t = {"1": "SAFE", "2": "PARTIAL", "3": "UNSAFE"}.get(input(" Choice: ").strip())
if not t or t == p.status:
return photos
move_photo(p, t)
print(f" Moved -> {_cs(t)}")
return load_photos()
def act_rename(photos):
p = pick_photo(photos, "Rename photo")
if not p:
return photos
name = input(" New filename: ").strip()
if name:
rename_photo(p, name)
return load_photos()
def act_assign_id(photos):
p = pick_photo(photos, "Assign ID to photo")
if not p:
return photos
pid = input(" Person ID (e.g. W001): ").strip()
if pid:
assign_id(p, pid)
return load_photos()
def act_delete(photos):
p = pick_photo(photos, "Delete photo")
if not p:
return photos
show_details(p)
if input(f" Delete '{p.filename}'? (yes/no): ").strip().lower() in ("y", "yes"):
delete_photo(p)
print(" Deleted.")
return load_photos()
def act_download(photos):
p = pick_photo(photos, "Download/copy photo")
if not p:
return
dest = input(" Destination folder: ").strip()
if dest:
dst = copy_photo(p, Path(dest).expanduser())
print(f" Copied -> {dst}")
def act_export(photos):
default = PROJECT_ROOT / f"ppe_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
out = input(f" Output [{default.name}]: ").strip()
output = Path(out).expanduser() if out else default
export_csv(photos, output)
print(f" Exported -> {output}")
def act_update(photos):
print("\n Re-classify: when PPE compliance changes (UNSAFE -> SAFE etc.)")
p = pick_photo(photos, "Update photo status")
if not p:
return photos
show_details(p)
print("\n New status: [1] SAFE [2] PARTIAL [3] UNSAFE")
t = {"1": "SAFE", "2": "PARTIAL", "3": "UNSAFE"}.get(input(" Choice: ").strip())
if not t or t == p.status:
return photos
move_photo(p, t)
print(f" Updated -> {_cs(t)}")
return load_photos()
MENU = """
[1] List photos
[2] View photo
[3] Move photo (change status)
[4] Rename photo
[5] Assign person ID
[6] Delete photo
[7] Download / Copy photo
[8] Export report to CSV
[9] Update status (re-classify)
[0] Exit
"""
def main():
parser = argparse.ArgumentParser(description="Saqr Photo Manager")
parser.add_argument("--export", action="store_true", help="Quick CSV export")
args = parser.parse_args()
if not CAPTURES_DIR.exists():
print("[ERROR] runtime/captures/ not found. Run saqr first.")
raise SystemExit(1)
photos = load_photos()
if args.export:
out = PROJECT_ROOT / f"ppe_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
export_csv(photos, out)
print(f"Exported: {out}")
return
log.info("Manager started")
actions = {
"1": act_list, "2": act_view, "3": act_move, "4": act_rename,
"5": act_assign_id, "6": act_delete, "7": act_download,
"8": act_export, "9": act_update,
}
while True:
photos = load_photos()
print_header(photos)
print(MENU)
ch = input(" Choice: ").strip()
if ch == "0":
log.info("Manager ended")
print(" Bye.\n")
break
action = actions.get(ch)
if action:
result = action(photos)
if isinstance(result, list):
photos = result
else:
print(" Unknown option.")
input("\n Press Enter to continue...")
if __name__ == "__main__":
main()