# AI_Photographer/Core/people_registry.py

from __future__ import annotations
import json
import math
import shutil
import time
import zipfile
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional
import cv2
import numpy as np
from Core import settings as config
PERSON_JSON = "person.json"
FACE_REF_NAME = "face_ref.jpg"
SCENE_REF_NAME = "scene_ref.jpg"
CAPTURE_PREFIX = "capture_"
SUPPORTED_IMAGE_EXTS = (".jpg", ".jpeg", ".png", ".webp")

def people_dir() -> Path:
    path = Path(config.PEOPLE_DIR).resolve()
    path.mkdir(parents=True, exist_ok=True)
    return path

def _now_ts() -> float:
    return time.time()

def _fmt_date(ts: float) -> str:
    return datetime.fromtimestamp(float(ts)).strftime("%Y-%m-%d")

def _person_dirs() -> list[Path]:
    root = people_dir()
    return [p for p in sorted(root.iterdir()) if p.is_dir()]

def _person_json_path(person_dir: Path) -> Path:
    return person_dir / PERSON_JSON

def _read_person(person_dir: Path) -> Optional[Dict[str, Any]]:
    try:
        raw = json.loads(_person_json_path(person_dir).read_text(encoding="utf-8"))
        if not isinstance(raw, dict):
            return None
        return raw
    except Exception:
        return None

def _write_person(person_dir: Path, data: Dict[str, Any]) -> None:
    person_dir.mkdir(parents=True, exist_ok=True)
    _person_json_path(person_dir).write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")

def _list_existing_sequences() -> list[int]:
    out: list[int] = []
    for person_dir in _person_dirs():
        meta = _read_person(person_dir)
        if not meta:
            continue
        try:
            out.append(int(meta.get("sequence", 0)))
        except Exception:
            continue
    return out

def _next_sequence() -> int:
    seqs = _list_existing_sequences()
    return (max(seqs) if seqs else 0) + 1

def _make_person_id(sequence: int, ts: float) -> str:
    return f"guest_{int(sequence):04d}_{datetime.fromtimestamp(ts).strftime('%Y%m%d_%H%M%S')}"

def _display_label(meta: Dict[str, Any]) -> str:
    short_id = str(meta.get("short_id") or meta.get("person_id") or "guest").replace("_", " ")
    created_date = str(meta.get("created_date") or "")
    # Avoid a dangling "from" when the creation date is missing.
    return f"{short_id} from {created_date}" if created_date else short_id

def _clip_box(box: Dict[str, Any], frame_shape: tuple[int, ...], pad_ratio: float = 0.18) -> Optional[tuple[int, int, int, int]]:
    """Expand a pixel box by pad_ratio on each side and clamp it to the frame bounds."""
    if not isinstance(box, dict):
        return None
    try:
        h, w = frame_shape[:2]
        x = float(box.get("x", 0.0))
        y = float(box.get("y", 0.0))
        bw = max(1.0, float(box.get("w", 0.0)))
        bh = max(1.0, float(box.get("h", 0.0)))
        pad_w = bw * float(pad_ratio)
        pad_h = bh * float(pad_ratio)
        x1 = max(0, int(math.floor(x - pad_w)))
        y1 = max(0, int(math.floor(y - pad_h)))
        x2 = min(w, int(math.ceil(x + bw + pad_w)))
        y2 = min(h, int(math.ceil(y + bh + pad_h)))
        if x2 <= x1 or y2 <= y1:
            return None
        return x1, y1, x2, y2
    except Exception:
        return None

def detect_face_box(image_bgr: np.ndarray) -> Optional[Dict[str, int]]:
    """Detect the largest frontal face using OpenCV's bundled Haar cascade."""
    if image_bgr is None or getattr(image_bgr, "size", 0) == 0:
        return None
    try:
        cascade = cv2.CascadeClassifier(str(Path(cv2.data.haarcascades) / "haarcascade_frontalface_default.xml"))
        if cascade.empty():
            return None
        gray = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2GRAY)
        faces = cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(48, 48))
        if len(faces) == 0:
            return None
        # Keep the detection with the largest area.
        x, y, w, h = max(faces, key=lambda item: int(item[2]) * int(item[3]))
        return {"x": int(x), "y": int(y), "w": int(w), "h": int(h)}
    except Exception:
        return None

def _extract_face_crop(frame_bgr: np.ndarray, face_box: Dict[str, Any]) -> Optional[np.ndarray]:
    clipped = _clip_box(face_box, frame_bgr.shape, pad_ratio=0.18)
    if clipped is None:
        return None
    x1, y1, x2, y2 = clipped
    crop = frame_bgr[y1:y2, x1:x2]
    if crop is None or crop.size == 0:
        return None
    return crop.copy()

def _extract_scene_crop(frame_bgr: np.ndarray, subject_box: Optional[Dict[str, Any]]) -> np.ndarray:
    if frame_bgr is None or getattr(frame_bgr, "size", 0) == 0:
        raise ValueError("empty frame")
    if not subject_box:
        return frame_bgr.copy()
    clipped = _clip_box(subject_box, frame_bgr.shape, pad_ratio=0.28)
    if clipped is None:
        return frame_bgr.copy()
    x1, y1, x2, y2 = clipped
    crop = frame_bgr[y1:y2, x1:x2]
    if crop is None or crop.size == 0:
        return frame_bgr.copy()
    return crop.copy()

def _compute_embedding(face_bgr: np.ndarray) -> np.ndarray:
    """Build a naive appearance vector: 48x48 equalized grayscale, zero-mean, L2-normalized.

    This is a lightweight pixel-based stand-in, not a learned face embedding,
    so the recognition threshold should be tuned accordingly.
    """
    gray = cv2.cvtColor(face_bgr, cv2.COLOR_BGR2GRAY)
    gray = cv2.resize(gray, (48, 48), interpolation=cv2.INTER_AREA)
    gray = cv2.equalizeHist(gray)
    vec = gray.astype(np.float32).reshape(-1)
    vec -= float(vec.mean())
    norm = float(np.linalg.norm(vec))
    if norm <= 1e-6:
        return vec
    return vec / norm

def _embedding_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine similarity in [-1, 1]; 0.0 for empty or degenerate vectors."""
    if a.size == 0 or b.size == 0:
        return 0.0
    denom = float(np.linalg.norm(a) * np.linalg.norm(b))
    if denom <= 1e-6:
        return 0.0
    return float(np.dot(a, b) / denom)
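
# Illustrative sanity check (a sketch, not executed by the module): embeddings
# are unit-norm, so any non-degenerate crop scores ~1.0 against itself:
#
#     emb = _compute_embedding(face_bgr)
#     assert abs(_embedding_similarity(emb, emb) - 1.0) < 1e-5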

def _load_embedding(meta: Dict[str, Any]) -> np.ndarray:
    raw = meta.get("embedding") or []
    try:
        arr = np.asarray(raw, dtype=np.float32)
    except Exception:
        arr = np.asarray([], dtype=np.float32)
    return arr

def _save_image(path: Path, image_bgr: np.ndarray) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    cv2.imwrite(str(path), image_bgr)

def _append_sample(person_dir: Path, meta: Dict[str, Any], face_bgr: np.ndarray, scene_bgr: np.ndarray, embedding: np.ndarray, source: str) -> Dict[str, Any]:
    """Store a new face/scene sample and fold its embedding into the stored one."""
    now = _now_ts()
    sample_count = int(meta.get("sample_count", 0)) + 1
    prev_embedding = _load_embedding(meta)
    if prev_embedding.size and prev_embedding.shape == embedding.shape:
        # Blend the stored embedding with the new sample as a running average,
        # then re-normalize to unit length.
        new_embedding = ((prev_embedding * float(sample_count - 1)) + embedding) / float(sample_count)
        norm = float(np.linalg.norm(new_embedding))
        if norm > 1e-6:
            new_embedding = new_embedding / norm
    else:
        new_embedding = embedding
    face_name = f"face_{sample_count:03d}.jpg"
    scene_name = f"scene_{sample_count:03d}.jpg"
    _save_image(person_dir / face_name, face_bgr)
    _save_image(person_dir / scene_name, scene_bgr)
    meta["sample_count"] = sample_count
    meta["times_seen"] = int(meta.get("times_seen", 0)) + 1
    meta["last_seen_at"] = now
    meta["embedding"] = new_embedding.astype(float).tolist()
    meta["last_source"] = str(source or "vision")
    meta["face_images"] = sorted(
        p.name
        for p in person_dir.iterdir()
        if p.is_file() and p.name.startswith("face_") and p.suffix.lower() in SUPPORTED_IMAGE_EXTS
    )
    meta["scene_images"] = sorted(
        p.name
        for p in person_dir.iterdir()
        if p.is_file() and p.name.startswith("scene_") and p.suffix.lower() in SUPPORTED_IMAGE_EXTS
    )
    if not (person_dir / FACE_REF_NAME).exists():
        _save_image(person_dir / FACE_REF_NAME, face_bgr)
    if not (person_dir / SCENE_REF_NAME).exists():
        _save_image(person_dir / SCENE_REF_NAME, scene_bgr)
    _write_person(person_dir, meta)
    return meta

def _create_person(face_bgr: np.ndarray, scene_bgr: np.ndarray, embedding: np.ndarray, source: str) -> Dict[str, Any]:
    """Enroll a new person: allocate the next sequence number, write refs and metadata."""
    now = _now_ts()
    seq = _next_sequence()
    person_id = _make_person_id(seq, now)
    short_id = f"guest_{seq:04d}"
    person_dir = people_dir() / person_id
    _save_image(person_dir / FACE_REF_NAME, face_bgr)
    _save_image(person_dir / SCENE_REF_NAME, scene_bgr)
    meta = {
        "person_id": person_id,
        "short_id": short_id,
        "display_name": f"{short_id} from {_fmt_date(now)}",
        "sequence": seq,
        "created_at": now,
        "created_date": _fmt_date(now),
        "last_seen_at": now,
        "sample_count": 0,
        "times_seen": 0,
        "source": str(source or "vision"),
        "last_source": str(source or "vision"),
        "embedding": embedding.astype(float).tolist(),
        "face_images": [],
        "scene_images": [],
        "captured_photos": [],
    }
    meta = _append_sample(person_dir, meta, face_bgr, scene_bgr, embedding, source=source)
    meta["new_person"] = True
    meta["known_person"] = False
    meta["person_dir"] = str(person_dir)
    meta["display_label"] = _display_label(meta)
    return meta

def recognize_or_enroll(frame_bgr: np.ndarray, face_box: Optional[Dict[str, Any]], subject_box: Optional[Dict[str, Any]] = None, threshold: Optional[float] = None, source: str = "vision") -> Dict[str, Any]:
    """Match the face against every enrolled person; enroll a new one below threshold.

    Performs a linear scan over all person directories, so cost grows with the
    size of the registry.
    """
    if frame_bgr is None or getattr(frame_bgr, "size", 0) == 0:
        return {"ok": False, "error": "empty frame"}
    if face_box is None:
        return {"ok": False, "error": "no face"}
    try:
        face_bgr = _extract_face_crop(frame_bgr, face_box)
        if face_bgr is None:
            return {"ok": False, "error": "invalid face crop"}
        scene_bgr = _extract_scene_crop(frame_bgr, subject_box)
        embedding = _compute_embedding(face_bgr)
        if embedding.size == 0:
            return {"ok": False, "error": "invalid embedding"}
        match_threshold = float(threshold if threshold is not None else config.read_vision_face_recognition_threshold())
        best_meta = None
        best_dir = None
        best_score = -1.0
        for person_dir in _person_dirs():
            meta = _read_person(person_dir)
            if not meta:
                continue
            stored = _load_embedding(meta)
            if stored.size == 0 or stored.shape != embedding.shape:
                continue
            score = _embedding_similarity(embedding, stored)
            if score > best_score:
                best_score = score
                best_meta = meta
                best_dir = person_dir
        if best_meta is not None and best_dir is not None and best_score >= match_threshold:
            best_meta = _append_sample(best_dir, best_meta, face_bgr, scene_bgr, embedding, source=source)
            best_meta["ok"] = True
            best_meta["known_person"] = True
            best_meta["new_person"] = False
            best_meta["match_score"] = float(best_score)
            best_meta["display_label"] = _display_label(best_meta)
            return best_meta
        created = _create_person(face_bgr, scene_bgr, embedding, source=source)
        created["ok"] = True
        created["match_score"] = float(best_score if best_score >= 0.0 else 0.0)
        return created
    except Exception as e:
        return {"ok": False, "error": str(e)}
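
# Typical call site (a sketch; `frame` would come from a camera capture and
# `face_box` from detect_face_box() or an upstream detector in pixel coords):
#
#     face_box = detect_face_box(frame)
#     if face_box is not None:
#         result = recognize_or_enroll(frame, face_box)
#         if result.get("ok"):
#             print(result["display_label"], result.get("match_score"))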

def _load_image_from_bytes(data: bytes) -> Optional[np.ndarray]:
    if not data:
        return None
    arr = np.frombuffer(data, dtype=np.uint8)
    # cv2.imdecode returns None for data it cannot parse.
    return cv2.imdecode(arr, cv2.IMREAD_COLOR)

def import_person_photo(data: bytes, filename: str = "", person_id: str = "") -> Dict[str, Any]:
    """Import an uploaded photo, attaching it to `person_id` if given, else recognize/enroll.

    `filename` is accepted by the API but not currently used.
    """
    image = _load_image_from_bytes(data)
    if image is None:
        return {"ok": False, "error": "invalid image"}
    face_box = detect_face_box(image)
    if face_box is None:
        return {"ok": False, "error": "no face detected in uploaded image"}
    person_id = str(person_id or "").strip()
    if person_id:
        person_dir = people_dir() / person_id
        meta = _read_person(person_dir)
        if not meta:
            return {"ok": False, "error": "person not found"}
        face_bgr = _extract_face_crop(image, face_box)
        if face_bgr is None:
            return {"ok": False, "error": "invalid face crop"}
        scene_bgr = image.copy()
        embedding = _compute_embedding(face_bgr)
        meta = _append_sample(person_dir, meta, face_bgr, scene_bgr, embedding, source="upload")
        meta["ok"] = True
        meta["known_person"] = True
        meta["new_person"] = False
        meta["display_label"] = _display_label(meta)
        return meta
    return recognize_or_enroll(image, face_box, None, source="upload")

def attach_captured_photo(person_id: str, photo_path: str | Path) -> bool:
    """Copy a captured photo into the person's folder and record it in metadata."""
    person_id = str(person_id or "").strip()
    if not person_id:
        return False
    person_dir = people_dir() / person_id
    meta = _read_person(person_dir)
    if not meta:
        return False
    src = Path(photo_path).expanduser()
    if not src.exists() or not src.is_file():
        return False
    dst = person_dir / f"{CAPTURE_PREFIX}{src.name}"
    try:
        shutil.copy2(src, dst)
        photos = meta.get("captured_photos") or []
        if not isinstance(photos, list):
            photos = []
        if dst.name not in photos:
            photos.append(dst.name)
        meta["captured_photos"] = sorted(photos)
        meta["last_seen_at"] = _now_ts()
        _write_person(person_dir, meta)
        return True
    except Exception:
        return False
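
# Example (a sketch; the path is hypothetical): after the camera saves a shot,
# link it to the recognized guest:
#
#     attach_captured_photo(result["person_id"], "/path/to/shot.jpg")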

def list_people() -> list[Dict[str, Any]]:
    """Summarize every enrolled person, most recently seen first."""
    items: list[Dict[str, Any]] = []
    for person_dir in _person_dirs():
        meta = _read_person(person_dir)
        if not meta:
            continue
        face_thumb = person_dir / FACE_REF_NAME
        scene_thumb = person_dir / SCENE_REF_NAME
        items.append(
            {
                "person_id": str(meta.get("person_id") or person_dir.name),
                "short_id": str(meta.get("short_id") or ""),
                "display_name": str(meta.get("display_name") or _display_label(meta)),
                "created_at": float(meta.get("created_at", 0.0) or 0.0),
                "created_date": str(meta.get("created_date") or ""),
                "last_seen_at": float(meta.get("last_seen_at", 0.0) or 0.0),
                "sample_count": int(meta.get("sample_count", 0) or 0),
                "times_seen": int(meta.get("times_seen", 0) or 0),
                "source": str(meta.get("source") or meta.get("last_source") or ""),
                "last_source": str(meta.get("last_source") or ""),
                "face_thumb": face_thumb.name if face_thumb.exists() else "",
                "scene_thumb": scene_thumb.name if scene_thumb.exists() else "",
                "captured_photos": list(meta.get("captured_photos") or []),
                "known": True,
            }
        )
    items.sort(key=lambda item: float(item.get("last_seen_at", 0.0) or 0.0), reverse=True)
    return items

def get_person(person_id: str) -> Optional[Dict[str, Any]]:
    person_dir = people_dir() / str(person_id or "").strip()
    meta = _read_person(person_dir)
    if not meta:
        return None
    meta["person_dir"] = str(person_dir)
    meta["display_label"] = _display_label(meta)
    return meta

def read_person_image(person_id: str, kind: str = "face") -> Optional[tuple[bytes, str, str]]:
    person_id = str(person_id or "").strip()
    if not person_id:
        return None
    person_dir = people_dir() / person_id
    target = person_dir / (SCENE_REF_NAME if kind == "scene" else FACE_REF_NAME)
    if not target.exists():
        return None
    return target.read_bytes(), "image/jpeg", target.name

def delete_person(person_id: str) -> bool:
    person_id = str(person_id or "").strip()
    if not person_id:
        # Guard: an empty id would resolve to the people root and wipe everything.
        return False
    person_dir = people_dir() / person_id
    if not person_dir.exists() or not person_dir.is_dir():
        return False
    shutil.rmtree(person_dir)
    return True

def reset_people() -> int:
    count = 0
    for person_dir in _person_dirs():
        shutil.rmtree(person_dir)
        count += 1
    return count

def export_person_zip(person_id: str, out_path: Path) -> Optional[Path]:
    """Zip a person's folder to out_path, keeping the folder name as the archive root."""
    person_id = str(person_id or "").strip()
    if not person_id:
        return None
    person_dir = people_dir() / person_id
    if not person_dir.exists():
        return None
    out_path.parent.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(out_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for child in sorted(person_dir.rglob("*")):
            if not child.is_file():
                continue
            zf.write(child, arcname=str(child.relative_to(person_dir.parent)))
    return out_path
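

if __name__ == "__main__":
    # Minimal smoke test (a sketch; assumes a webcam at index 0 and that
    # Core.settings points PEOPLE_DIR at a writable location).
    cap = cv2.VideoCapture(0)
    grabbed, frame = cap.read()
    cap.release()
    if not grabbed or frame is None:
        raise SystemExit("no frame captured")
    box = detect_face_box(frame)
    if box is None:
        raise SystemExit("no face detected")
    result = recognize_or_enroll(frame, box, source="smoke_test")
    summary = {k: result.get(k) for k in ("ok", "person_id", "display_label", "match_score", "new_person")}
    print(json.dumps(summary, ensure_ascii=False, indent=2))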