Sanadv3/vision/face_gallery.py

364 lines
14 KiB
Python

"""Face gallery — pure file IO over data/faces/face_{id}/.
Layout per face:

    face_{id}/
        face_1.jpg   ← samples (≥1 required)
        face_2.jpg
        face_3.png
        meta.json    ← optional: {"name": "...", "description": "...", "added_at": "..."}

`description` is free text the operator writes about the person ("lead
engineer, likes coffee") — it's folded into the Gemini primer turn so
Gemini can reference it when it recognises that face.

No ML — Gemini does the recognition in-context using the samples we feed it
via the primer turn. This module's only jobs are:

  - enumerate enrolled faces
  - serve & accept JPEG/PNG bytes per face
  - rename / describe / delete / zip / load-for-primer

Thread-safe via a single internal RLock.
"""
from __future__ import annotations
import io
import json
import re
import threading
import zipfile
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Iterable
from Project.Sanad.core.logger import get_logger
# Module-level logger obtained from the project's logging helper.
log = get_logger("face_gallery")
# Matches a face directory name of the form "face_<digits>"; group 1 is the
# numeric id.
_DIR_RE = re.compile(r"^face_(\d+)$")
# File suffixes (lower-case, dot included) that count as sample images.
ALLOWED_EXTS = {".jpg", ".jpeg", ".png"}
# Matches a sample filename "face_<digits>.<jpg|jpeg|png>" case-insensitively;
# group 1 is the sample number, group 2 the extension without the dot.
SAMPLE_NAME_RE = re.compile(r"^face_(\d+)\.(jpg|jpeg|png)$", re.IGNORECASE)
@dataclass
class PhotoInfo:
    """Name, size and path of a single photo file.

    NOTE(review): nothing in the visible part of this module constructs or
    consumes this type — presumably it is used by callers elsewhere; confirm
    before changing the field order (it is the positional constructor order).
    """

    name: str  # presumably the bare file name (e.g. "face_1.jpg") — confirm
    size_bytes: int  # presumably the file size in bytes — confirm
    path: Path  # presumably the photo's location on disk — confirm
@dataclass
class FaceEntry:
    """One face known to the gallery: id, optional metadata, sample paths.

    Attributes:
        id: Numeric identifier of the face.
        name: Display name, or None when the face is unnamed.
        added_at: Timestamp string recorded for the face, or None.
        dir: Directory that holds the face's files.
        description: Free-text description, or None.
        sample_paths: Paths of the face's sample images.
    """

    id: int
    name: str | None
    added_at: str | None
    dir: Path
    description: str | None = None
    sample_paths: list[Path] = field(default_factory=list)

    def to_dict(self) -> dict:
        """Return a JSON-serialisable summary of this entry.

        The ``photos`` list holds one ``{"name", "size_bytes"}`` dict per
        sample that can currently be stat'ed. Samples that have vanished
        (or are otherwise unreadable) are skipped rather than raising.
        """
        photos = []
        for sample in self.sample_paths:
            # A single stat() replaces the former exists()+stat() pair: a
            # file removed between the two calls used to raise
            # FileNotFoundError out of to_dict().
            try:
                size = sample.stat().st_size
            except OSError:
                continue
            photos.append({"name": sample.name, "size_bytes": size})
        return {
            "id": self.id,
            "name": self.name,
            "description": self.description,
            "added_at": self.added_at,
            "dir": str(self.dir),
            "photos": photos,
        }
class FaceGallery:
    """File-system backed gallery rooted at `root` (e.g. data/faces/).

    Each face lives in ``root/face_{id}/`` and holds one or more sample
    images plus an optional ``meta.json``. Public methods that touch the
    directory tree hold a single re-entrant lock while they do so.
    """

    def __init__(self, root: Path | str) -> None:
        """Remember the gallery root; nothing is created on disk yet."""
        self.root = Path(root)
        self._lock = threading.RLock()

    # ── read ────────────────────────────────────────────────

    def _ensure_root(self) -> None:
        """Create the gallery root (and parents) if it does not exist."""
        self.root.mkdir(parents=True, exist_ok=True)

    def _iter_face_dirs(self) -> Iterable[tuple[int, Path]]:
        """Yield ``(face_id, directory)`` for every ``face_<n>`` directory.

        Results are ordered by numeric id (``face_2`` before ``face_10``);
        a plain path sort would order them lexicographically. Yields
        nothing when the root is missing or is not a directory.
        """
        if not self.root.is_dir():
            return
        found: list[tuple[int, Path]] = []
        for child in self.root.iterdir():
            m = _DIR_RE.match(child.name)
            if m and child.is_dir():
                found.append((int(m.group(1)), child))
        # Directory name is the tie-breaker so that e.g. "face_1" and
        # "face_01" (same numeric id) still come out in a stable order.
        found.sort(key=lambda item: (item[0], item[1].name))
        yield from found

    @staticmethod
    def _sample_sort_key(path: Path) -> tuple[int, int, str]:
        """Sort key: numbered ``face_N.ext`` samples by N, then other files by name."""
        m = SAMPLE_NAME_RE.match(path.name)
        if m:
            return (0, int(m.group(1)), path.name)
        return (1, 0, path.name)

    def _samples_in(self, face_dir: Path) -> list[Path]:
        """Return the image files directly inside `face_dir`.

        Only regular files whose suffix is in ``ALLOWED_EXTS`` count.
        Numbered samples are ordered numerically (``face_2`` before
        ``face_10``) so "the first N samples" means what an operator expects.
        """
        samples = [
            p for p in face_dir.iterdir()
            if p.is_file() and p.suffix.lower() in ALLOWED_EXTS
        ]
        samples.sort(key=self._sample_sort_key)
        return samples

    def _meta(self, face_dir: Path) -> tuple[str | None, str | None, str | None]:
        """Return (name, description, added_at) — any may be None.

        Best-effort: a missing, unreadable, malformed or non-object
        ``meta.json`` is treated as "no metadata" instead of raising.
        """
        meta_path = face_dir / "meta.json"
        try:
            data = json.loads(meta_path.read_text(encoding="utf-8"))
        except (OSError, ValueError, RecursionError):
            # OSError: missing/unreadable file. ValueError: bad JSON or bad
            # UTF-8. RecursionError: pathologically nested JSON.
            return None, None, None
        if not isinstance(data, dict):
            # Valid JSON that is not an object (e.g. a list) has no fields;
            # calling .get() on it used to raise AttributeError.
            return None, None, None
        name = data.get("name")
        description = data.get("description")
        # Empty strings are normalised to None for name/description.
        return (name or None), (description or None), data.get("added_at")

    def _build_entry(self, face_id: int, face_dir: Path) -> FaceEntry:
        """Assemble the FaceEntry for one face directory."""
        name, description, added = self._meta(face_dir)
        return FaceEntry(
            id=face_id,
            name=name,
            description=description,
            added_at=added,
            dir=face_dir,
            sample_paths=self._samples_in(face_dir),
        )

    def list(self) -> list[FaceEntry]:
        """Return every enrolled face, ordered by numeric id."""
        with self._lock:
            return [
                self._build_entry(face_id, face_dir)
                for face_id, face_dir in self._iter_face_dirs()
            ]

    def get(self, face_id: int) -> FaceEntry | None:
        """Return the entry for `face_id`, or None if its directory is absent."""
        with self._lock:
            face_dir = self.root / f"face_{face_id}"
            if not face_dir.is_dir():
                return None
            return self._build_entry(face_id, face_dir)

    def get_photo(self, face_id: int, photo_name: str) -> Path | None:
        """Return the path of one photo of a face, or None.

        None is returned when the face does not exist, when `photo_name`
        resolves outside the face directory (path traversal), when the
        target is not a regular file, or when its suffix is not allowed.
        """
        with self._lock:
            face_dir = self.root / f"face_{face_id}"
            if not face_dir.is_dir():
                return None
            candidate = face_dir / photo_name
            try:
                # Reject names such as "../x.jpg" or absolute paths.
                candidate.resolve().relative_to(face_dir.resolve())
            except ValueError:
                return None
            # is_file(), not exists(): a directory named "x.jpg" is not a
            # photo (and could not be unlinked by delete_photo).
            if not candidate.is_file():
                return None
            if candidate.suffix.lower() not in ALLOWED_EXTS:
                return None
            return candidate

    # ── write ───────────────────────────────────────────────

    def next_id(self) -> int:
        """Return one more than the highest existing face id (1 if none)."""
        with self._lock:
            highest = max(
                (face_id for face_id, _ in self._iter_face_dirs()), default=0
            )
            return highest + 1

    def _next_sample_name(self, face_dir: Path, ext: str) -> str:
        """Return next free face_N.<ext> filename inside face_dir.

        N is one more than the highest number among existing samples,
        regardless of their extension.
        """
        highest = 0
        for sample in self._samples_in(face_dir):
            m = SAMPLE_NAME_RE.match(sample.name)
            if m:
                highest = max(highest, int(m.group(1)))
        return f"face_{highest + 1}{ext.lower()}"

    @staticmethod
    def _detect_ext(jpeg_or_png: bytes) -> str:
        """Sniff PNG vs JPEG from the magic bytes.

        Returns ".png" when the data starts with the 8-byte PNG signature
        and ".jpg" for everything else (the data is not validated).
        """
        if jpeg_or_png[:8] == b"\x89PNG\r\n\x1a\n":
            return ".png"
        return ".jpg"

    def _write_meta(self, face_dir: Path, name: str | None,
                    description: str | None = None,
                    added_at: str | None = None) -> None:
        """Write ``meta.json`` for a face, replacing any existing file.

        Falsy `name` / `description` are omitted from the file. When
        `added_at` is falsy the current local time (second precision,
        ISO format) is recorded instead.
        """
        meta: dict[str, str] = {}
        if name:
            meta["name"] = name
        if description:
            meta["description"] = description
        meta["added_at"] = added_at or datetime.now().isoformat(timespec="seconds")
        (face_dir / "meta.json").write_text(
            json.dumps(meta, ensure_ascii=False, indent=2),
            encoding="utf-8",
        )

    def create_face(self, image_bytes_list: list[bytes],
                    name: str | None = None,
                    description: str | None = None) -> FaceEntry:
        """Create a new face_{next_id}/ with one or more samples.

        Samples are stored as ``face_1``, ``face_2``, … with the extension
        sniffed from each image. Name and description are stripped; blank
        values are stored as absent.

        Raises:
            ValueError: if `image_bytes_list` is empty.
        """
        if not image_bytes_list:
            raise ValueError("create_face: empty image list")
        import shutil  # local import, same as delete_face
        clean_name = (name or "").strip() or None
        clean_desc = (description or "").strip() or None
        with self._lock:
            self._ensure_root()
            face_id = self.next_id()
            face_dir = self.root / f"face_{face_id}"
            face_dir.mkdir(parents=True, exist_ok=False)
            try:
                for idx, data in enumerate(image_bytes_list, start=1):
                    ext = self._detect_ext(data)
                    (face_dir / f"face_{idx}{ext}").write_bytes(data)
                self._write_meta(face_dir, clean_name, clean_desc)
            except Exception:
                # Do not leave a half-populated face behind: it would show
                # up in list() with missing samples/metadata.
                shutil.rmtree(face_dir, ignore_errors=True)
                raise
            log.info("Created face_%d (samples=%d, name=%s, desc=%s)",
                     face_id, len(image_bytes_list), clean_name or "(unnamed)",
                     "yes" if clean_desc else "no")
            return self.get(face_id)  # type: ignore[return-value]

    def add_photo(self, face_id: int, image_bytes: bytes) -> str:
        """Append a new sample to an existing face. Returns the filename.

        Raises:
            FileNotFoundError: if the face directory does not exist.
        """
        with self._lock:
            face_dir = self.root / f"face_{face_id}"
            if not face_dir.is_dir():
                raise FileNotFoundError(f"face_{face_id} not found")
            ext = self._detect_ext(image_bytes)
            fname = self._next_sample_name(face_dir, ext)
            (face_dir / fname).write_bytes(image_bytes)
            log.info("Added sample %s to face_%d", fname, face_id)
            return fname

    def rename(self, face_id: int, name: str | None) -> None:
        """Update meta.json with a new name (or clear it if name is empty).

        Preserves the existing description + added_at.

        Raises:
            FileNotFoundError: if the face directory does not exist.
        """
        with self._lock:
            face_dir = self.root / f"face_{face_id}"
            if not face_dir.is_dir():
                raise FileNotFoundError(f"face_{face_id} not found")
            _, description, added = self._meta(face_dir)
            clean = (name or "").strip() or None
            self._write_meta(face_dir, clean, description, added_at=added)
            # The separator keeps the id and the new name apart in the log
            # line (previously rendered as e.g. "face_3Alice").
            log.info("Renamed face_%d -> %s", face_id, clean or "(unnamed)")

    def set_description(self, face_id: int, description: str | None) -> None:
        """Update meta.json with a free-text description (or clear it).

        Preserves the existing name + added_at. The description is folded
        into the Gemini primer turn so Gemini can reference it.

        Raises:
            FileNotFoundError: if the face directory does not exist.
        """
        with self._lock:
            face_dir = self.root / f"face_{face_id}"
            if not face_dir.is_dir():
                raise FileNotFoundError(f"face_{face_id} not found")
            name, _, added = self._meta(face_dir)
            clean = (description or "").strip() or None
            self._write_meta(face_dir, name, clean, added_at=added)
            log.info("Set description for face_%d (%s)", face_id,
                     "cleared" if not clean else f"{len(clean)} chars")

    def delete_photo(self, face_id: int, photo_name: str) -> None:
        """Delete one photo. Refuses if it's the only remaining sample.

        Raises:
            FileNotFoundError: if the face or the photo does not exist.
            ValueError: if the face has at most one sample (checked before
                the photo itself is looked up).
        """
        with self._lock:
            face_dir = self.root / f"face_{face_id}"
            if not face_dir.is_dir():
                raise FileNotFoundError(f"face_{face_id} not found")
            if len(self._samples_in(face_dir)) <= 1:
                raise ValueError(
                    "Cannot delete the only photo — delete the face instead."
                )
            target = self.get_photo(face_id, photo_name)
            if target is None:
                raise FileNotFoundError(f"photo {photo_name} not found")
            target.unlink()
            log.info("Deleted %s from face_%d", photo_name, face_id)

    def delete_face(self, face_id: int) -> None:
        """Delete the entire face_{id}/ folder (including meta.json).

        Raises:
            FileNotFoundError: if the face directory does not exist.
        """
        import shutil
        with self._lock:
            face_dir = self.root / f"face_{face_id}"
            if not face_dir.is_dir():
                raise FileNotFoundError(f"face_{face_id} not found")
            shutil.rmtree(face_dir)
            log.info("Deleted face_%d", face_id)

    def zip_face(self, face_id: int) -> bytes:
        """Return the entire face_{id}/ folder packaged as a ZIP.

        Only regular files directly inside the folder are archived, each
        under ``face_{id}/<filename>``.

        Raises:
            FileNotFoundError: if the face directory does not exist.
        """
        with self._lock:
            face_dir = self.root / f"face_{face_id}"
            if not face_dir.is_dir():
                raise FileNotFoundError(f"face_{face_id} not found")
            buf = io.BytesIO()
            with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_DEFLATED) as zf:
                for p in sorted(face_dir.iterdir()):
                    if p.is_file():
                        zf.write(p, arcname=f"face_{face_id}/{p.name}")
            return buf.getvalue()

    # ── primer support (used by gemini/script.py) ───────────

    def load_for_primer(
        self, max_samples_per_face: int = 3, resize_long_side: int = 256,
    ) -> list[tuple[FaceEntry, list[bytes]]]:
        """Return [(FaceEntry, [jpeg_bytes,…]), …] for Gemini upload.

        Resizes each sample to longest-side <= resize_long_side, re-encodes
        as JPEG (q=85) to keep the token cost manageable. Falls back to
        the raw bytes if PIL isn't available or the image cannot be
        processed. Unreadable or empty sample files are skipped, and faces
        left with no usable sample are omitted.
        """
        # A negative limit would otherwise slice as "all but the last k".
        limit = max(0, max_samples_per_face)
        out: list[tuple[FaceEntry, list[bytes]]] = []
        for entry in self.list():
            jpegs: list[bytes] = []
            for sample in entry.sample_paths[:limit]:
                try:
                    raw = sample.read_bytes()
                except OSError:
                    # The file may have been deleted since list() ran.
                    continue
                if not raw:
                    # A zero-byte file is not an image; do not forward it.
                    continue
                jpegs.append(self._resize_for_primer(raw, resize_long_side) or raw)
            if jpegs:
                out.append((entry, jpegs))
        return out

    @staticmethod
    def _resize_for_primer(raw: bytes, long_side: int) -> bytes | None:
        """Resize image to longest-side ≤ long_side, re-encode JPEG q=85.

        The EXIF orientation is applied to the pixels first, because the
        re-encoded JPEG carries no EXIF and would otherwise come out
        rotated for photos taken on phones.

        Returns None on any failure (caller falls back to raw bytes).
        """
        try:
            from PIL import Image, ImageOps  # type: ignore
        except Exception:
            return None
        try:
            img = Image.open(io.BytesIO(raw))
            img.load()
            img = ImageOps.exif_transpose(img) or img
            if img.mode not in ("RGB", "L"):
                img = img.convert("RGB")
            w, h = img.size
            longest = max(w, h)
            if longest > long_side:
                scale = long_side / longest
                img = img.resize(
                    (max(1, int(w * scale)), max(1, int(h * scale))),
                    Image.LANCZOS,
                )
            buf = io.BytesIO()
            img.save(buf, format="JPEG", quality=85, optimize=True)
            return buf.getvalue()
        except Exception:
            # Deliberately broad: any decode/encode problem means "use the
            # original bytes", never a crash while building the primer.
            return None