417 lines
12 KiB
Python
417 lines
12 KiB
Python
"""Interactive CLI to manage captured PPE photos + CSV export."""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import csv
|
|
import shutil
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
import cv2
|
|
|
|
from saqr.core.paths import CAPTURES_DIR, PROJECT_ROOT
|
|
from saqr.utils.logger import get_logger
|
|
|
|
log = get_logger("Manager", "manager")
|
|
|
|
STATUSES = ("SAFE", "PARTIAL", "UNSAFE")
|
|
IMG_EXTS = {".jpg", ".jpeg", ".png", ".bmp", ".webp"}
|
|
|
|
CLASS_COLUMNS = [
|
|
"boots", "gloves", "goggles", "helmet",
|
|
"no-boots", "no-gloves", "no-goggles", "no-helmet", "no-vest", "vest",
|
|
]
|
|
|
|
_C = {"SAFE": "\033[92m", "PARTIAL": "\033[93m", "UNSAFE": "\033[91m",
|
|
"BOLD": "\033[1m", "RESET": "\033[0m"}
|
|
|
|
|
|
def _cs(s): return f"{_C.get(s, '')}{s}{_C['RESET']}"
|
|
def _bold(s): return f"{_C['BOLD']}{s}{_C['RESET']}"
|
|
|
|
|
|
@dataclass
|
|
class Photo:
|
|
path: Path
|
|
status: str
|
|
filename: str
|
|
person_id: str = ""
|
|
class_name: str = "unknown"
|
|
date_captured: str = ""
|
|
|
|
@property
|
|
def class_flags(self):
|
|
flags = {c: 0 for c in CLASS_COLUMNS}
|
|
stem = self.filename.lower()
|
|
for c in CLASS_COLUMNS:
|
|
if c in stem:
|
|
flags[c] = 1
|
|
return flags
|
|
|
|
|
|
@dataclass
|
|
class EventRow:
|
|
timestamp: str
|
|
track_id: str
|
|
event_type: str
|
|
status: str
|
|
wearing: str
|
|
missing: str
|
|
unknown: str
|
|
photo: str
|
|
path: str
|
|
|
|
@property
|
|
def class_flags(self):
|
|
worn = {c.strip() for c in self.wearing.split(",") if c.strip()}
|
|
return {c: (1 if c in worn else 0) for c in CLASS_COLUMNS}
|
|
|
|
@property
|
|
def missing_notes(self):
|
|
items = [c.strip() for c in self.missing.split(",") if c.strip()]
|
|
return "Missing " + ", ".join(items) if items else "Compliant"
|
|
|
|
|
|
def parse_photo(path: Path, status: str) -> Photo:
|
|
stem = path.stem
|
|
parts = stem.split("_")
|
|
person_id = ""
|
|
date_captured = ""
|
|
class_name = "unknown"
|
|
|
|
if stem.startswith("track_") and len(parts) >= 2 and parts[1].isdigit():
|
|
person_id = f"track_{parts[1]}"
|
|
elif len(parts) >= 2 and parts[0].isdigit() and parts[1].isdigit():
|
|
try:
|
|
date_captured = datetime.strptime(
|
|
f"{parts[0]}_{parts[1]}", "%Y%m%d_%H%M%S"
|
|
).strftime("%Y-%m-%d %H:%M:%S")
|
|
except ValueError:
|
|
pass
|
|
if len(parts) > 3:
|
|
class_name = "_".join(parts[3:])
|
|
|
|
return Photo(
|
|
path=path, status=status, filename=path.name,
|
|
person_id=person_id, class_name=class_name, date_captured=date_captured,
|
|
)
|
|
|
|
|
|
def load_photos():
|
|
photos = []
|
|
for status in STATUSES:
|
|
folder = CAPTURES_DIR / status
|
|
if not folder.exists():
|
|
continue
|
|
for f in sorted(folder.iterdir()):
|
|
if f.suffix.lower() in IMG_EXTS:
|
|
photos.append(parse_photo(f, status))
|
|
return photos
|
|
|
|
|
|
def load_events_csv(path: Path):
|
|
if not path.exists():
|
|
return []
|
|
rows = []
|
|
with open(path, "r", newline="", encoding="utf-8") as f:
|
|
for row in csv.DictReader(f):
|
|
rows.append(EventRow(
|
|
timestamp=row.get("timestamp", ""),
|
|
track_id=row.get("track_id", ""),
|
|
event_type=row.get("event_type", ""),
|
|
status=row.get("status", ""),
|
|
wearing=row.get("wearing", ""),
|
|
missing=row.get("missing", ""),
|
|
unknown=row.get("unknown", ""),
|
|
photo=row.get("photo", ""),
|
|
path=row.get("path", ""),
|
|
))
|
|
return rows
|
|
|
|
|
|
def move_photo(photo: Photo, new_status: str) -> Photo:
|
|
dst_dir = CAPTURES_DIR / new_status
|
|
dst_dir.mkdir(parents=True, exist_ok=True)
|
|
dst = dst_dir / photo.filename
|
|
shutil.move(str(photo.path), str(dst))
|
|
log.info(f"Moved '{photo.filename}': {photo.status} -> {new_status}")
|
|
return Photo(path=dst, status=new_status, filename=photo.filename,
|
|
person_id=photo.person_id, class_name=photo.class_name,
|
|
date_captured=photo.date_captured)
|
|
|
|
|
|
def rename_photo(photo: Photo, new_name: str) -> Photo:
|
|
if Path(new_name).suffix.lower() not in IMG_EXTS:
|
|
new_name += photo.path.suffix
|
|
dst = photo.path.parent / new_name
|
|
photo.path.rename(dst)
|
|
log.info(f"Renamed: '{photo.filename}' -> '{new_name}'")
|
|
return Photo(path=dst, status=photo.status, filename=new_name,
|
|
person_id=photo.person_id, class_name=photo.class_name,
|
|
date_captured=photo.date_captured)
|
|
|
|
|
|
def assign_id(photo: Photo, pid: str) -> Photo:
|
|
pid = pid.strip().replace(" ", "_")
|
|
dt = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
cls = photo.class_name if photo.class_name != "unknown" else "ppe"
|
|
return rename_photo(photo, f"{pid}_{dt}_{cls}{photo.path.suffix}")
|
|
|
|
|
|
def delete_photo(photo: Photo) -> None:
|
|
photo.path.unlink()
|
|
log.info(f"Deleted: '{photo.filename}' ({photo.status})")
|
|
|
|
|
|
def copy_photo(photo: Photo, dest: Path) -> Path:
|
|
dest.mkdir(parents=True, exist_ok=True)
|
|
dst = dest / photo.filename
|
|
shutil.copy2(str(photo.path), str(dst))
|
|
log.info(f"Copied '{photo.filename}' -> {dst}")
|
|
return dst
|
|
|
|
|
|
def export_csv(photos, output: Path) -> None:
|
|
event_rows = load_events_csv(CAPTURES_DIR / "events.csv")
|
|
|
|
fields = ["photo", "track_id", "event_type", "status", "timestamp",
|
|
"wearing", "missing", "unknown", "missing_notes",
|
|
*CLASS_COLUMNS, "path"]
|
|
|
|
with open(output, "w", newline="", encoding="utf-8") as f:
|
|
w = csv.DictWriter(f, fieldnames=fields)
|
|
w.writeheader()
|
|
|
|
if event_rows:
|
|
for r in event_rows:
|
|
w.writerow({
|
|
"photo": r.photo, "track_id": r.track_id,
|
|
"event_type": r.event_type, "status": r.status,
|
|
"timestamp": r.timestamp, "wearing": r.wearing,
|
|
"missing": r.missing, "unknown": r.unknown,
|
|
"missing_notes": r.missing_notes,
|
|
**r.class_flags, "path": r.path,
|
|
})
|
|
else:
|
|
for p in photos:
|
|
w.writerow({
|
|
"photo": p.filename, "track_id": p.person_id,
|
|
"event_type": "", "status": p.status,
|
|
"timestamp": p.date_captured, "wearing": "",
|
|
"missing": "", "unknown": "", "missing_notes": "",
|
|
**p.class_flags, "path": str(p.path),
|
|
})
|
|
|
|
count = len(event_rows) if event_rows else len(photos)
|
|
log.info(f"CSV exported: {output} ({count} records)")
|
|
|
|
|
|
def print_header(photos):
|
|
counts = {s: sum(1 for p in photos if p.status == s) for s in STATUSES}
|
|
print("\n" + "=" * 66)
|
|
print(_bold(" Saqr - PPE Photo Manager"))
|
|
print("=" * 66)
|
|
print(f" {_cs('SAFE')} {counts['SAFE']:3d} | "
|
|
f"{_cs('PARTIAL')} {counts['PARTIAL']:3d} | "
|
|
f"{_cs('UNSAFE')} {counts['UNSAFE']:3d} | Total: {len(photos)}")
|
|
print("=" * 66)
|
|
|
|
|
|
def print_table(photos):
|
|
print(f"\n {'#':>4} {'STATUS':<8} {'ID':<14} {'DATE':<19} FILENAME")
|
|
print(" " + "-" * 68)
|
|
for i, p in enumerate(photos):
|
|
pid = (p.person_id or "-")[:12]
|
|
date = (p.date_captured or "-")[:17]
|
|
print(f" {i+1:>4} {p.status:<8} {pid:<14} {date:<19} {p.filename[:28]}")
|
|
|
|
|
|
def pick_photo(photos, prompt="Select photo"):
|
|
if not photos:
|
|
print(" No photos found.")
|
|
return None
|
|
print_table(photos)
|
|
try:
|
|
n = int(input(f"\n {prompt} (0=cancel): "))
|
|
if 1 <= n <= len(photos):
|
|
return photos[n - 1]
|
|
except ValueError:
|
|
pass
|
|
return None
|
|
|
|
|
|
def show_details(photo):
|
|
print(f"\n Filename : {photo.filename}")
|
|
print(f" Status : {_cs(photo.status)}")
|
|
print(f" ID : {photo.person_id or '-'}")
|
|
print(f" Date : {photo.date_captured or '-'}")
|
|
print(f" Path : {photo.path}")
|
|
|
|
|
|
def act_list(photos):
|
|
print("\n Filter: [1] All [2] SAFE [3] PARTIAL [4] UNSAFE")
|
|
ch = input(" Choice: ").strip()
|
|
filt = {"2": "SAFE", "3": "PARTIAL", "4": "UNSAFE"}.get(ch)
|
|
sub = [p for p in photos if filt is None or p.status == filt]
|
|
if sub:
|
|
print_table(sub)
|
|
print(f"\n Showing {len(sub)} photo(s).")
|
|
else:
|
|
print(" None.")
|
|
|
|
|
|
def act_view(photos):
|
|
p = pick_photo(photos, "View photo")
|
|
if not p:
|
|
return
|
|
show_details(p)
|
|
img = cv2.imread(str(p.path))
|
|
if img is not None:
|
|
cv2.imshow(f"Saqr - {p.filename}", img)
|
|
print(" Press any key to close.")
|
|
cv2.waitKey(0)
|
|
cv2.destroyAllWindows()
|
|
|
|
|
|
def act_move(photos):
|
|
p = pick_photo(photos, "Move photo")
|
|
if not p:
|
|
return photos
|
|
show_details(p)
|
|
print("\n Move to: [1] SAFE [2] PARTIAL [3] UNSAFE")
|
|
t = {"1": "SAFE", "2": "PARTIAL", "3": "UNSAFE"}.get(input(" Choice: ").strip())
|
|
if not t or t == p.status:
|
|
return photos
|
|
move_photo(p, t)
|
|
print(f" Moved -> {_cs(t)}")
|
|
return load_photos()
|
|
|
|
|
|
def act_rename(photos):
|
|
p = pick_photo(photos, "Rename photo")
|
|
if not p:
|
|
return photos
|
|
name = input(" New filename: ").strip()
|
|
if name:
|
|
rename_photo(p, name)
|
|
return load_photos()
|
|
|
|
|
|
def act_assign_id(photos):
|
|
p = pick_photo(photos, "Assign ID to photo")
|
|
if not p:
|
|
return photos
|
|
pid = input(" Person ID (e.g. W001): ").strip()
|
|
if pid:
|
|
assign_id(p, pid)
|
|
return load_photos()
|
|
|
|
|
|
def act_delete(photos):
|
|
p = pick_photo(photos, "Delete photo")
|
|
if not p:
|
|
return photos
|
|
show_details(p)
|
|
if input(f" Delete '{p.filename}'? (yes/no): ").strip().lower() in ("y", "yes"):
|
|
delete_photo(p)
|
|
print(" Deleted.")
|
|
return load_photos()
|
|
|
|
|
|
def act_download(photos):
|
|
p = pick_photo(photos, "Download/copy photo")
|
|
if not p:
|
|
return
|
|
dest = input(" Destination folder: ").strip()
|
|
if dest:
|
|
dst = copy_photo(p, Path(dest).expanduser())
|
|
print(f" Copied -> {dst}")
|
|
|
|
|
|
def act_export(photos):
|
|
default = PROJECT_ROOT / f"ppe_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
|
|
out = input(f" Output [{default.name}]: ").strip()
|
|
output = Path(out).expanduser() if out else default
|
|
export_csv(photos, output)
|
|
print(f" Exported -> {output}")
|
|
|
|
|
|
def act_update(photos):
|
|
print("\n Re-classify: when PPE compliance changes (UNSAFE -> SAFE etc.)")
|
|
p = pick_photo(photos, "Update photo status")
|
|
if not p:
|
|
return photos
|
|
show_details(p)
|
|
print("\n New status: [1] SAFE [2] PARTIAL [3] UNSAFE")
|
|
t = {"1": "SAFE", "2": "PARTIAL", "3": "UNSAFE"}.get(input(" Choice: ").strip())
|
|
if not t or t == p.status:
|
|
return photos
|
|
move_photo(p, t)
|
|
print(f" Updated -> {_cs(t)}")
|
|
return load_photos()
|
|
|
|
|
|
MENU = """
|
|
[1] List photos
|
|
[2] View photo
|
|
[3] Move photo (change status)
|
|
[4] Rename photo
|
|
[5] Assign person ID
|
|
[6] Delete photo
|
|
[7] Download / Copy photo
|
|
[8] Export report to CSV
|
|
[9] Update status (re-classify)
|
|
[0] Exit
|
|
"""
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="Saqr Photo Manager")
|
|
parser.add_argument("--export", action="store_true", help="Quick CSV export")
|
|
args = parser.parse_args()
|
|
|
|
if not CAPTURES_DIR.exists():
|
|
print("[ERROR] runtime/captures/ not found. Run saqr first.")
|
|
raise SystemExit(1)
|
|
|
|
photos = load_photos()
|
|
|
|
if args.export:
|
|
out = PROJECT_ROOT / f"ppe_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
|
|
export_csv(photos, out)
|
|
print(f"Exported: {out}")
|
|
return
|
|
|
|
log.info("Manager started")
|
|
actions = {
|
|
"1": act_list, "2": act_view, "3": act_move, "4": act_rename,
|
|
"5": act_assign_id, "6": act_delete, "7": act_download,
|
|
"8": act_export, "9": act_update,
|
|
}
|
|
|
|
while True:
|
|
photos = load_photos()
|
|
print_header(photos)
|
|
print(MENU)
|
|
ch = input(" Choice: ").strip()
|
|
|
|
if ch == "0":
|
|
log.info("Manager ended")
|
|
print(" Bye.\n")
|
|
break
|
|
|
|
action = actions.get(ch)
|
|
if action:
|
|
result = action(photos)
|
|
if isinstance(result, list):
|
|
photos = result
|
|
else:
|
|
print(" Unknown option.")
|
|
|
|
input("\n Press Enter to continue...")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|