""" Saqr - PPE Detection | Photo Manager ======================================== Interactive CLI to manage captured PPE photos. Features: list, view, move, rename, assign ID, delete, download/copy, export CSV, update status. Usage: python manager.py # interactive menu python manager.py --export # quick CSV export """ from __future__ import annotations import argparse import csv import shutil from dataclasses import dataclass from datetime import datetime from pathlib import Path import cv2 from logger import get_logger log = get_logger("Manager", "manager") ROOT = Path(__file__).parent CAPTURES_DIR = ROOT / "captures" STATUSES = ("SAFE", "PARTIAL", "UNSAFE") IMG_EXTS = {".jpg", ".jpeg", ".png", ".bmp", ".webp"} CLASS_COLUMNS = [ "boots", "gloves", "goggles", "helmet", "no-boots", "no-gloves", "no-goggles", "no-helmet", "no-vest", "vest", ] # ANSI colours _C = {"SAFE": "\033[92m", "PARTIAL": "\033[93m", "UNSAFE": "\033[91m", "BOLD": "\033[1m", "RESET": "\033[0m"} def _cs(s): return f"{_C.get(s, '')}{s}{_C['RESET']}" def _bold(s): return f"{_C['BOLD']}{s}{_C['RESET']}" # ── Data models ─────────────────────────────────────────────────────────────── @dataclass class Photo: path: Path status: str filename: str person_id: str = "" class_name: str = "unknown" date_captured: str = "" @property def class_flags(self) -> dict[str, int]: flags = {c: 0 for c in CLASS_COLUMNS} stem = self.filename.lower() for c in CLASS_COLUMNS: if c in stem: flags[c] = 1 return flags @dataclass class EventRow: """One row from captures/events.csv (written by saqr.py).""" timestamp: str track_id: str event_type: str status: str wearing: str missing: str unknown: str photo: str path: str @property def class_flags(self) -> dict[str, int]: worn = {c.strip() for c in self.wearing.split(",") if c.strip()} return {c: (1 if c in worn else 0) for c in CLASS_COLUMNS} @property def missing_notes(self) -> str: items = [c.strip() for c in self.missing.split(",") if c.strip()] return "Missing " + ", ".join(items) if items else "Compliant" # ── Parsing & Loading ───────────────────────────────────────────────────────── def parse_photo(path: Path, status: str) -> Photo: stem = path.stem parts = stem.split("_") person_id = "" date_captured = "" class_name = "unknown" # Try to extract track_NNNN format if stem.startswith("track_") and len(parts) >= 2 and parts[1].isdigit(): person_id = f"track_{parts[1]}" elif len(parts) >= 2 and parts[0].isdigit() and parts[1].isdigit(): try: date_captured = datetime.strptime( f"{parts[0]}_{parts[1]}", "%Y%m%d_%H%M%S" ).strftime("%Y-%m-%d %H:%M:%S") except ValueError: pass if len(parts) > 3: class_name = "_".join(parts[3:]) return Photo( path=path, status=status, filename=path.name, person_id=person_id, class_name=class_name, date_captured=date_captured, ) def load_photos() -> list[Photo]: photos = [] for status in STATUSES: folder = CAPTURES_DIR / status if not folder.exists(): continue for f in sorted(folder.iterdir()): if f.suffix.lower() in IMG_EXTS: photos.append(parse_photo(f, status)) return photos def load_events_csv(path: Path) -> list[EventRow]: if not path.exists(): return [] rows = [] with open(path, "r", newline="", encoding="utf-8") as f: for row in csv.DictReader(f): rows.append(EventRow( timestamp=row.get("timestamp", ""), track_id=row.get("track_id", ""), event_type=row.get("event_type", ""), status=row.get("status", ""), wearing=row.get("wearing", ""), missing=row.get("missing", ""), unknown=row.get("unknown", ""), photo=row.get("photo", ""), path=row.get("path", ""), )) return rows # ── Core operations ─────────────────────────────────────────────────────────── def move_photo(photo: Photo, new_status: str) -> Photo: dst_dir = CAPTURES_DIR / new_status dst_dir.mkdir(parents=True, exist_ok=True) dst = dst_dir / photo.filename shutil.move(str(photo.path), str(dst)) log.info(f"Moved '{photo.filename}': {photo.status} -> {new_status}") return Photo(path=dst, status=new_status, filename=photo.filename, person_id=photo.person_id, class_name=photo.class_name, date_captured=photo.date_captured) def rename_photo(photo: Photo, new_name: str) -> Photo: if Path(new_name).suffix.lower() not in IMG_EXTS: new_name += photo.path.suffix dst = photo.path.parent / new_name photo.path.rename(dst) log.info(f"Renamed: '{photo.filename}' -> '{new_name}'") return Photo(path=dst, status=photo.status, filename=new_name, person_id=photo.person_id, class_name=photo.class_name, date_captured=photo.date_captured) def assign_id(photo: Photo, pid: str) -> Photo: pid = pid.strip().replace(" ", "_") dt = datetime.now().strftime("%Y%m%d_%H%M%S") cls = photo.class_name if photo.class_name != "unknown" else "ppe" return rename_photo(photo, f"{pid}_{dt}_{cls}{photo.path.suffix}") def delete_photo(photo: Photo) -> None: photo.path.unlink() log.info(f"Deleted: '{photo.filename}' ({photo.status})") def copy_photo(photo: Photo, dest: Path) -> Path: dest.mkdir(parents=True, exist_ok=True) dst = dest / photo.filename shutil.copy2(str(photo.path), str(dst)) log.info(f"Copied '{photo.filename}' -> {dst}") return dst def export_csv(photos: list[Photo], output: Path) -> None: event_rows = load_events_csv(CAPTURES_DIR / "events.csv") fields = ["photo", "track_id", "event_type", "status", "timestamp", "wearing", "missing", "unknown", "missing_notes", *CLASS_COLUMNS, "path"] with open(output, "w", newline="", encoding="utf-8") as f: w = csv.DictWriter(f, fieldnames=fields) w.writeheader() if event_rows: for r in event_rows: w.writerow({ "photo": r.photo, "track_id": r.track_id, "event_type": r.event_type, "status": r.status, "timestamp": r.timestamp, "wearing": r.wearing, "missing": r.missing, "unknown": r.unknown, "missing_notes": r.missing_notes, **r.class_flags, "path": r.path, }) else: for p in photos: w.writerow({ "photo": p.filename, "track_id": p.person_id, "event_type": "", "status": p.status, "timestamp": p.date_captured, "wearing": "", "missing": "", "unknown": "", "missing_notes": "", **p.class_flags, "path": str(p.path), }) count = len(event_rows) if event_rows else len(photos) log.info(f"CSV exported: {output} ({count} records)") # ── Display ─────────────────────────────────────────────────────────────────── def print_header(photos): counts = {s: sum(1 for p in photos if p.status == s) for s in STATUSES} print("\n" + "=" * 66) print(_bold(" Saqr - PPE Photo Manager")) print("=" * 66) print(f" {_cs('SAFE')} {counts['SAFE']:3d} | " f"{_cs('PARTIAL')} {counts['PARTIAL']:3d} | " f"{_cs('UNSAFE')} {counts['UNSAFE']:3d} | Total: {len(photos)}") print("=" * 66) def print_table(photos): print(f"\n {'#':>4} {'STATUS':<8} {'ID':<14} {'DATE':<19} FILENAME") print(" " + "-" * 68) for i, p in enumerate(photos): pid = (p.person_id or "-")[:12] date = (p.date_captured or "-")[:17] print(f" {i+1:>4} {p.status:<8} {pid:<14} {date:<19} {p.filename[:28]}") def pick_photo(photos, prompt="Select photo") -> Photo | None: if not photos: print(" No photos found.") return None print_table(photos) try: n = int(input(f"\n {prompt} (0=cancel): ")) if 1 <= n <= len(photos): return photos[n - 1] except ValueError: pass return None def show_details(photo): print(f"\n Filename : {photo.filename}") print(f" Status : {_cs(photo.status)}") print(f" ID : {photo.person_id or '-'}") print(f" Date : {photo.date_captured or '-'}") print(f" Path : {photo.path}") # ── Menu actions ────────────────────────────────────────────────────────────── def act_list(photos): print("\n Filter: [1] All [2] SAFE [3] PARTIAL [4] UNSAFE") ch = input(" Choice: ").strip() filt = {"2": "SAFE", "3": "PARTIAL", "4": "UNSAFE"}.get(ch) sub = [p for p in photos if filt is None or p.status == filt] if sub: print_table(sub) print(f"\n Showing {len(sub)} photo(s).") else: print(" None.") def act_view(photos): p = pick_photo(photos, "View photo") if not p: return show_details(p) img = cv2.imread(str(p.path)) if img is not None: cv2.imshow(f"Saqr - {p.filename}", img) print(" Press any key to close.") cv2.waitKey(0) cv2.destroyAllWindows() def act_move(photos): p = pick_photo(photos, "Move photo") if not p: return photos show_details(p) print(f"\n Move to: [1] SAFE [2] PARTIAL [3] UNSAFE") t = {"1": "SAFE", "2": "PARTIAL", "3": "UNSAFE"}.get(input(" Choice: ").strip()) if not t or t == p.status: return photos move_photo(p, t) print(f" Moved -> {_cs(t)}") return load_photos() def act_rename(photos): p = pick_photo(photos, "Rename photo") if not p: return photos name = input(" New filename: ").strip() if name: rename_photo(p, name) return load_photos() def act_assign_id(photos): p = pick_photo(photos, "Assign ID to photo") if not p: return photos pid = input(" Person ID (e.g. W001): ").strip() if pid: assign_id(p, pid) return load_photos() def act_delete(photos): p = pick_photo(photos, "Delete photo") if not p: return photos show_details(p) if input(f" Delete '{p.filename}'? (yes/no): ").strip().lower() in ("y", "yes"): delete_photo(p) print(" Deleted.") return load_photos() def act_download(photos): p = pick_photo(photos, "Download/copy photo") if not p: return dest = input(" Destination folder: ").strip() if dest: dst = copy_photo(p, Path(dest).expanduser()) print(f" Copied -> {dst}") def act_export(photos): default = ROOT / f"ppe_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" out = input(f" Output [{default.name}]: ").strip() output = Path(out).expanduser() if out else default export_csv(photos, output) print(f" Exported -> {output}") def act_update(photos): print("\n Re-classify: when PPE compliance changes (UNSAFE -> SAFE etc.)") p = pick_photo(photos, "Update photo status") if not p: return photos show_details(p) print(f"\n New status: [1] SAFE [2] PARTIAL [3] UNSAFE") t = {"1": "SAFE", "2": "PARTIAL", "3": "UNSAFE"}.get(input(" Choice: ").strip()) if not t or t == p.status: return photos move_photo(p, t) print(f" Updated -> {_cs(t)}") return load_photos() # ── Main menu ───────────────────────────────────────────────────────────────── MENU = """ [1] List photos [2] View photo [3] Move photo (change status) [4] Rename photo [5] Assign person ID [6] Delete photo [7] Download / Copy photo [8] Export report to CSV [9] Update status (re-classify) [0] Exit """ def run(): parser = argparse.ArgumentParser(description="Saqr Photo Manager") parser.add_argument("--export", action="store_true", help="Quick CSV export") args = parser.parse_args() if not CAPTURES_DIR.exists(): print(f"[ERROR] captures/ not found. Run saqr.py first.") raise SystemExit(1) photos = load_photos() if args.export: out = ROOT / f"ppe_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" export_csv(photos, out) print(f"Exported: {out}") return log.info("Manager started") actions = { "1": act_list, "2": act_view, "3": act_move, "4": act_rename, "5": act_assign_id, "6": act_delete, "7": act_download, "8": act_export, "9": act_update, } while True: photos = load_photos() print_header(photos) print(MENU) ch = input(" Choice: ").strip() if ch == "0": log.info("Manager ended") print(" Bye.\n") break action = actions.get(ch) if action: result = action(photos) if isinstance(result, list): photos = result else: print(" Unknown option.") input("\n Press Enter to continue...") if __name__ == "__main__": run()