457 lines
16 KiB
Python
457 lines
16 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import math
|
|
import shutil
|
|
import time
|
|
import zipfile
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Any, Dict, Optional
|
|
|
|
import cv2
|
|
import numpy as np
|
|
|
|
from Core import settings as config
|
|
|
|
|
|
PERSON_JSON = "person.json"
|
|
FACE_REF_NAME = "face_ref.jpg"
|
|
SCENE_REF_NAME = "scene_ref.jpg"
|
|
CAPTURE_PREFIX = "capture_"
|
|
SUPPORTED_IMAGE_EXTS = (".jpg", ".jpeg", ".png", ".webp")
|
|
|
|
|
|
def people_dir() -> Path:
    """Return the absolute people-storage root, creating it on first use."""
    root = Path(config.PEOPLE_DIR).resolve()
    root.mkdir(parents=True, exist_ok=True)
    return root
|
|
|
|
|
|
def _now_ts() -> float:
|
|
return time.time()
|
|
|
|
|
|
def _fmt_date(ts: float) -> str:
|
|
return datetime.fromtimestamp(float(ts)).strftime("%Y-%m-%d")
|
|
|
|
|
|
def _person_dirs() -> list[Path]:
    """List immediate subdirectories of the people root, sorted by name."""
    root = people_dir()
    return [child for child in sorted(root.iterdir()) if child.is_dir()]
|
|
|
|
|
|
def _person_json_path(person_dir: Path) -> Path:
    """Path of the metadata JSON file inside a person directory."""
    return person_dir.joinpath(PERSON_JSON)
|
|
|
|
|
|
def _read_person(person_dir: Path) -> Optional[Dict[str, Any]]:
    """Load a person's metadata dict.

    Returns None when the file is missing, unreadable, or does not parse
    to a JSON object.
    """
    try:
        text = _person_json_path(person_dir).read_text(encoding="utf-8")
        parsed = json.loads(text)
    except Exception:
        # Best-effort read: any I/O or parse failure means "no record".
        return None
    return parsed if isinstance(parsed, dict) else None
|
|
|
|
|
|
def _write_person(person_dir: Path, data: Dict[str, Any]) -> None:
    """Serialize metadata to person.json, creating the directory if needed."""
    person_dir.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(data, ensure_ascii=False, indent=2)
    _person_json_path(person_dir).write_text(payload, encoding="utf-8")
|
|
|
|
|
|
def _list_existing_sequences() -> list[int]:
    """Collect the integer 'sequence' field from every readable person record."""
    sequences: list[int] = []
    for directory in _person_dirs():
        record = _read_person(directory)
        if not record:
            continue
        try:
            sequences.append(int(record.get("sequence", 0)))
        except Exception:
            # A malformed sequence value skips that record rather than
            # aborting the whole scan.
            continue
    return sequences
|
|
|
|
|
|
def _next_sequence() -> int:
    """Return one past the highest sequence on disk (1 when none exist)."""
    existing = _list_existing_sequences()
    if not existing:
        return 1
    return max(existing) + 1
|
|
|
|
|
|
def _make_person_id(sequence: int, ts: float) -> str:
|
|
return f"guest_{int(sequence):04d}_{datetime.fromtimestamp(ts).strftime('%Y%m%d_%H%M%S')}"
|
|
|
|
|
|
def _display_label(meta: Dict[str, Any]) -> str:
|
|
short_id = str(meta.get("short_id") or meta.get("person_id") or "guest").replace("_", " ")
|
|
created_date = str(meta.get("created_date") or "")
|
|
return f"{short_id} from {created_date}".strip()
|
|
|
|
|
|
def _clip_box(box: Dict[str, Any], frame_shape: tuple[int, ...], pad_ratio: float = 0.18) -> Optional[tuple[int, int, int, int]]:
|
|
if not isinstance(box, dict):
|
|
return None
|
|
try:
|
|
h, w = frame_shape[:2]
|
|
x = float(box.get("x", 0.0))
|
|
y = float(box.get("y", 0.0))
|
|
bw = max(1.0, float(box.get("w", 0.0)))
|
|
bh = max(1.0, float(box.get("h", 0.0)))
|
|
pad_w = bw * float(pad_ratio)
|
|
pad_h = bh * float(pad_ratio)
|
|
x1 = max(0, int(math.floor(x - pad_w)))
|
|
y1 = max(0, int(math.floor(y - pad_h)))
|
|
x2 = min(w, int(math.ceil(x + bw + pad_w)))
|
|
y2 = min(h, int(math.ceil(y + bh + pad_h)))
|
|
if x2 <= x1 or y2 <= y1:
|
|
return None
|
|
return x1, y1, x2, y2
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def detect_face_box(image_bgr: np.ndarray) -> Optional[Dict[str, int]]:
    """Detect the largest frontal face in a BGR image using OpenCV's Haar cascade.

    Returns a pixel-space box {x, y, w, h}, or None when the image is empty,
    the cascade fails to load, no face is found, or OpenCV raises.
    """
    if image_bgr is None or getattr(image_bgr, "size", 0) == 0:
        return None
    try:
        # The cascade XML ships with the cv2 package under cv2.data.haarcascades.
        cascade = cv2.CascadeClassifier(str(Path(cv2.data.haarcascades) / "haarcascade_frontalface_default.xml"))
        if cascade.empty():
            return None
        gray = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2GRAY)
        faces = cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(48, 48))
        if len(faces) == 0:
            return None
        # Keep the detection with the largest pixel area.
        x, y, w, h = max(faces, key=lambda item: int(item[2]) * int(item[3]))
        return {"x": int(x), "y": int(y), "w": int(w), "h": int(h)}
    except Exception:
        # Best-effort detector: any OpenCV failure is reported as "no face".
        return None
|
|
|
|
|
|
def _extract_face_crop(frame_bgr: np.ndarray, face_box: Dict[str, Any]) -> Optional[np.ndarray]:
    """Cut a padded face region out of the frame; None when the box is unusable."""
    region = _clip_box(face_box, frame_bgr.shape, pad_ratio=0.18)
    if region is None:
        return None
    x1, y1, x2, y2 = region
    patch = frame_bgr[y1:y2, x1:x2]
    if patch is None or patch.size == 0:
        return None
    # Copy so the crop stays valid after the source frame buffer is reused.
    return patch.copy()
|
|
|
|
|
|
def _extract_scene_crop(frame_bgr: np.ndarray, subject_box: Optional[Dict[str, Any]]) -> np.ndarray:
    """Return a padded crop around the subject, or a full-frame copy as fallback.

    Raises ValueError for an empty frame. A missing box or any clipping
    failure degrades to a copy of the whole frame rather than erroring out.
    """
    if frame_bgr is None or getattr(frame_bgr, "size", 0) == 0:
        raise ValueError("empty frame")
    if not subject_box:
        return frame_bgr.copy()
    region = _clip_box(subject_box, frame_bgr.shape, pad_ratio=0.28)
    if region is None:
        return frame_bgr.copy()
    x1, y1, x2, y2 = region
    patch = frame_bgr[y1:y2, x1:x2]
    if patch is None or patch.size == 0:
        return frame_bgr.copy()
    return patch.copy()
|
|
|
|
|
|
def _compute_embedding(face_bgr: np.ndarray) -> np.ndarray:
    """Produce a lightweight face descriptor from a 48x48 equalized grayscale patch.

    The flattened vector is mean-centered and L2-normalized so dot products
    between descriptors act as cosine similarity.
    """
    gray = cv2.cvtColor(face_bgr, cv2.COLOR_BGR2GRAY)
    gray = cv2.resize(gray, (48, 48), interpolation=cv2.INTER_AREA)
    # Histogram equalization reduces sensitivity to lighting changes.
    gray = cv2.equalizeHist(gray)
    vec = gray.astype(np.float32).reshape(-1)
    vec -= float(vec.mean())
    norm = float(np.linalg.norm(vec))
    if norm <= 1e-6:
        # Degenerate (near-flat) patch: return unnormalized to avoid dividing by ~0.
        return vec
    return vec / norm
|
|
|
|
|
|
def _embedding_similarity(a: np.ndarray, b: np.ndarray) -> float:
|
|
if a.size == 0 or b.size == 0:
|
|
return 0.0
|
|
denom = float(np.linalg.norm(a) * np.linalg.norm(b))
|
|
if denom <= 1e-6:
|
|
return 0.0
|
|
return float(np.dot(a, b) / denom)
|
|
|
|
|
|
def _load_embedding(meta: Dict[str, Any]) -> np.ndarray:
|
|
raw = meta.get("embedding") or []
|
|
try:
|
|
arr = np.asarray(raw, dtype=np.float32)
|
|
except Exception:
|
|
arr = np.asarray([], dtype=np.float32)
|
|
return arr
|
|
|
|
|
|
def _save_image(path: Path, image_bgr: np.ndarray) -> None:
    """Write an image to disk, creating parent directories as needed.

    NOTE(review): cv2.imwrite's boolean result is ignored, so a failed
    write is silent — consistent with the module's best-effort style, but
    worth confirming against callers' expectations.
    """
    destination = str(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    cv2.imwrite(destination, image_bgr)
|
|
|
|
|
|
def _append_sample(person_dir: Path, meta: Dict[str, Any], face_bgr: np.ndarray, scene_bgr: np.ndarray, embedding: np.ndarray, source: str) -> Dict[str, Any]:
    """Record one new face/scene sample for a person and persist the result.

    Saves numbered face_NNN.jpg / scene_NNN.jpg images, folds the new
    embedding into a running mean, refreshes bookkeeping fields, and writes
    the updated metadata back to person.json. Returns the mutated `meta`.
    """
    now = _now_ts()
    sample_count = int(meta.get("sample_count", 0)) + 1
    prev_embedding = _load_embedding(meta)
    if prev_embedding.size and prev_embedding.shape == embedding.shape:
        # Incremental mean over all samples so far, then re-normalized so the
        # stored vector stays (near) unit length for cosine matching.
        new_embedding = ((prev_embedding * float(sample_count - 1)) + embedding) / float(sample_count)
        norm = float(np.linalg.norm(new_embedding))
        if norm > 1e-6:
            new_embedding = new_embedding / norm
    else:
        # First sample, or a shape mismatch with the stored vector: adopt the
        # fresh embedding outright.
        new_embedding = embedding

    face_name = f"face_{sample_count:03d}.jpg"
    scene_name = f"scene_{sample_count:03d}.jpg"
    _save_image(person_dir / face_name, face_bgr)
    _save_image(person_dir / scene_name, scene_bgr)

    meta["sample_count"] = sample_count
    meta["times_seen"] = int(meta.get("times_seen", 0)) + 1
    meta["last_seen_at"] = now
    meta["embedding"] = new_embedding.astype(float).tolist()
    meta["last_source"] = str(source or "vision")
    # Re-scan the directory so the lists reflect every image actually on
    # disk, not only the ones added during this call.
    meta["face_images"] = sorted(
        [p.name for p in person_dir.iterdir() if p.is_file() and p.name.startswith("face_") and p.suffix.lower() in SUPPORTED_IMAGE_EXTS]
    )
    meta["scene_images"] = sorted(
        [p.name for p in person_dir.iterdir() if p.is_file() and p.name.startswith("scene_") and p.suffix.lower() in SUPPORTED_IMAGE_EXTS]
    )
    # Reference thumbnails are written once and then left stable.
    if not (person_dir / FACE_REF_NAME).exists():
        _save_image(person_dir / FACE_REF_NAME, face_bgr)
    if not (person_dir / SCENE_REF_NAME).exists():
        _save_image(person_dir / SCENE_REF_NAME, scene_bgr)
    _write_person(person_dir, meta)
    return meta
|
|
|
|
|
|
def _create_person(face_bgr: np.ndarray, scene_bgr: np.ndarray, embedding: np.ndarray, source: str) -> Dict[str, Any]:
    """Enroll a brand-new person: allocate an id, write reference images and
    initial metadata, then record the first sample via _append_sample.

    Returns the metadata dict augmented with response-only fields
    (new_person / known_person / person_dir / display_label) on top of what
    was persisted to person.json.
    """
    now = _now_ts()
    seq = _next_sequence()
    person_id = _make_person_id(seq, now)
    short_id = f"guest_{seq:04d}"
    person_dir = people_dir() / person_id
    _save_image(person_dir / FACE_REF_NAME, face_bgr)
    _save_image(person_dir / SCENE_REF_NAME, scene_bgr)
    meta = {
        "person_id": person_id,
        "short_id": short_id,
        "display_name": f"{short_id} from {_fmt_date(now)}",
        "sequence": seq,
        "created_at": now,
        "created_date": _fmt_date(now),
        "last_seen_at": now,
        # Counters start at zero; _append_sample below bumps both to 1.
        "sample_count": 0,
        "times_seen": 0,
        "source": str(source or "vision"),
        "last_source": str(source or "vision"),
        "embedding": embedding.astype(float).tolist(),
        "face_images": [],
        "scene_images": [],
        "captured_photos": [],
    }
    meta = _append_sample(person_dir, meta, face_bgr, scene_bgr, embedding, source=source)
    meta["new_person"] = True
    meta["known_person"] = False
    meta["person_dir"] = str(person_dir)
    meta["display_label"] = _display_label(meta)
    return meta
|
|
|
|
|
|
def recognize_or_enroll(frame_bgr: np.ndarray, face_box: Optional[Dict[str, Any]], subject_box: Optional[Dict[str, Any]] = None, threshold: Optional[float] = None, source: str = "vision") -> Dict[str, Any]:
    """Match a detected face against stored people, or enroll a new person.

    Crops face and scene regions from `frame_bgr`, computes an embedding,
    then linearly scans all person directories for the highest cosine
    similarity. A best score at or above `threshold` (defaulting to the
    configured recognition threshold) appends a sample to that person;
    otherwise a new person is created. Always returns a dict carrying an
    "ok" flag; failures carry an "error" message instead of raising.
    """
    if frame_bgr is None or getattr(frame_bgr, "size", 0) == 0:
        return {"ok": False, "error": "empty frame"}
    if face_box is None:
        return {"ok": False, "error": "no face"}

    try:
        face_bgr = _extract_face_crop(frame_bgr, face_box)
        if face_bgr is None:
            return {"ok": False, "error": "invalid face crop"}
        scene_bgr = _extract_scene_crop(frame_bgr, subject_box)
        embedding = _compute_embedding(face_bgr)
        if embedding.size == 0:
            return {"ok": False, "error": "invalid embedding"}

        match_threshold = float(threshold if threshold is not None else config.read_vision_face_recognition_threshold())
        best_meta = None
        best_dir = None
        best_score = -1.0

        # Linear scan over all enrolled people; keep the best cosine match.
        # Records with missing or shape-mismatched embeddings are skipped.
        for person_dir in _person_dirs():
            meta = _read_person(person_dir)
            if not meta:
                continue
            stored = _load_embedding(meta)
            if stored.size == 0 or stored.shape != embedding.shape:
                continue
            score = _embedding_similarity(embedding, stored)
            if score > best_score:
                best_score = score
                best_meta = meta
                best_dir = person_dir

        if best_meta is not None and best_dir is not None and best_score >= match_threshold:
            best_meta = _append_sample(best_dir, best_meta, face_bgr, scene_bgr, embedding, source=source)
            best_meta["ok"] = True
            best_meta["known_person"] = True
            best_meta["new_person"] = False
            best_meta["match_score"] = float(best_score)
            best_meta["display_label"] = _display_label(best_meta)
            return best_meta

        # No match above threshold: enroll as a new person. The reported
        # match_score is the best sub-threshold score, floored at 0.0.
        created = _create_person(face_bgr, scene_bgr, embedding, source=source)
        created["ok"] = True
        created["match_score"] = float(best_score if best_score >= 0.0 else 0.0)
        return created

    except Exception as e:
        # Surface any unexpected failure as a structured error response.
        return {"ok": False, "error": str(e)}
|
|
|
|
|
|
def _load_image_from_bytes(data: bytes) -> Optional[np.ndarray]:
    """Decode raw bytes into a BGR image; None for empty or undecodable data."""
    if not data:
        return None
    buffer = np.frombuffer(data, dtype=np.uint8)
    # cv2.imdecode yields None on failure, which is passed straight through.
    return cv2.imdecode(buffer, cv2.IMREAD_COLOR)
|
|
|
|
|
|
def import_person_photo(data: bytes, filename: str = "", person_id: str = "") -> Dict[str, Any]:
    """Ingest an uploaded photo, attaching it to a person or enrolling a new one.

    When `person_id` is given, the sample is appended to that person directly
    with no similarity check; otherwise the photo goes through
    recognize_or_enroll. `filename` is accepted for interface compatibility
    but is currently unused. Returns a result dict with an "ok" flag and an
    "error" message on failure.
    """
    image = _load_image_from_bytes(data)
    if image is None:
        return {"ok": False, "error": "invalid image"}
    face_box = detect_face_box(image)
    if face_box is None:
        return {"ok": False, "error": "no face detected in uploaded image"}
    person_id = str(person_id or "").strip()
    if person_id:
        person_dir = people_dir() / person_id
        meta = _read_person(person_dir)
        if not meta:
            return {"ok": False, "error": "person not found"}
        face_bgr = _extract_face_crop(image, face_box)
        if face_bgr is None:
            return {"ok": False, "error": "invalid face crop"}
        # Uploads carry no subject box; the full image stands in for the scene.
        scene_bgr = image.copy()
        embedding = _compute_embedding(face_bgr)
        meta = _append_sample(person_dir, meta, face_bgr, scene_bgr, embedding, source="upload")
        meta["ok"] = True
        meta["known_person"] = True
        meta["new_person"] = False
        meta["display_label"] = _display_label(meta)
        return meta
    return recognize_or_enroll(image, face_box, None, source="upload")
|
|
|
|
|
|
def attach_captured_photo(person_id: str, photo_path: str | Path) -> bool:
    """Copy an externally captured photo into a person's folder and index it.

    Returns True on success; False when the person record or the source file
    is missing, or when the copy / metadata update fails.
    """
    person_id = str(person_id or "").strip()
    if not person_id:
        return False
    person_dir = people_dir() / person_id
    meta = _read_person(person_dir)
    if not meta:
        return False
    source_file = Path(photo_path).expanduser()
    if not source_file.exists() or not source_file.is_file():
        return False
    target = person_dir / f"{CAPTURE_PREFIX}{source_file.name}"
    try:
        shutil.copy2(source_file, target)
        photos = meta.get("captured_photos") or []
        if not isinstance(photos, list):
            photos = []
        # Names are de-duplicated and kept sorted for stable listings.
        if target.name not in photos:
            photos.append(target.name)
        meta["captured_photos"] = sorted(photos)
        meta["last_seen_at"] = _now_ts()
        _write_person(person_dir, meta)
    except Exception:
        return False
    return True
|
|
|
|
|
|
def list_people() -> list[Dict[str, Any]]:
    """Summarize every stored person for listings, most recently seen first.

    Directories whose person.json is missing or unreadable are skipped.
    Field access is defensive (`or` fallbacks plus numeric coercion) so a
    partially corrupted record still yields a row.
    """
    items: list[Dict[str, Any]] = []
    for person_dir in _person_dirs():
        meta = _read_person(person_dir)
        if not meta:
            continue
        face_thumb = person_dir / FACE_REF_NAME
        scene_thumb = person_dir / SCENE_REF_NAME
        items.append(
            {
                "person_id": str(meta.get("person_id") or person_dir.name),
                "short_id": str(meta.get("short_id") or ""),
                "display_name": str(meta.get("display_name") or _display_label(meta)),
                "created_at": float(meta.get("created_at", 0.0) or 0.0),
                "created_date": str(meta.get("created_date") or ""),
                "last_seen_at": float(meta.get("last_seen_at", 0.0) or 0.0),
                "sample_count": int(meta.get("sample_count", 0) or 0),
                "times_seen": int(meta.get("times_seen", 0) or 0),
                "source": str(meta.get("source") or meta.get("last_source") or ""),
                "last_source": str(meta.get("last_source") or ""),
                # Thumb fields hold just the filename; empty string when the
                # reference image is absent on disk.
                "face_thumb": face_thumb.name if face_thumb.exists() else "",
                "scene_thumb": scene_thumb.name if scene_thumb.exists() else "",
                "captured_photos": list(meta.get("captured_photos") or []),
                "known": True,
            }
        )
    # Newest activity first.
    items.sort(key=lambda item: float(item.get("last_seen_at", 0.0) or 0.0), reverse=True)
    return items
|
|
|
|
|
|
def get_person(person_id: str) -> Optional[Dict[str, Any]]:
    """Fetch one person's metadata plus directory path and display label."""
    record_dir = people_dir() / str(person_id or "").strip()
    record = _read_person(record_dir)
    if not record:
        return None
    record["person_dir"] = str(record_dir)
    record["display_label"] = _display_label(record)
    return record
|
|
|
|
|
|
def read_person_image(person_id: str, kind: str = "face") -> Optional[tuple[bytes, str, str]]:
    """Return (bytes, mime_type, filename) for a person's reference image.

    `kind` selects the scene reference when exactly "scene"; any other value
    falls back to the face reference. None when the id is blank or the file
    does not exist.
    """
    person_id = str(person_id or "").strip()
    if not person_id:
        return None
    person_dir = people_dir() / person_id
    ref_name = SCENE_REF_NAME if kind == "scene" else FACE_REF_NAME
    target = person_dir / ref_name
    if not target.exists():
        return None
    # Reference files carry a .jpg name, hence the fixed JPEG mime type.
    return target.read_bytes(), "image/jpeg", target.name
|
|
|
|
|
|
def delete_person(person_id: str) -> bool:
    """Delete a person's folder and everything in it.

    Fixes two defects in the original implementation:
    - an empty/whitespace id resolved to the people root itself, so
      delete_person("") would rmtree EVERY stored person;
    - ids containing path separators or '..' could resolve outside the
      people root and delete an arbitrary directory.
    Returns True when a direct child directory of the people root existed
    and was removed; False otherwise.
    """
    person_id = str(person_id or "").strip()
    if not person_id:
        return False
    root = people_dir()  # already resolved to an absolute path
    person_dir = (root / person_id).resolve()
    # Refuse anything that is not an existing direct child of the root.
    if person_dir.parent != root or not person_dir.is_dir():
        return False
    shutil.rmtree(person_dir)
    return True
|
|
|
|
|
|
def reset_people() -> int:
    """Remove every person directory and return how many were deleted."""
    removed = 0
    for person_dir in _person_dirs():
        shutil.rmtree(person_dir)
        removed += 1
    return removed
|
|
|
|
|
|
def export_person_zip(person_id: str, out_path: Path) -> Optional[Path]:
    """Archive one person's folder into a zip file at out_path.

    Entries are stored relative to the people root, so they stay prefixed
    with the person's directory name inside the archive. Returns out_path,
    or None when the id is blank or the person does not exist.
    """
    person_id = str(person_id or "").strip()
    if not person_id:
        return None
    person_dir = people_dir() / person_id
    if not person_dir.exists():
        return None
    out_path.parent.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(out_path, "w", compression=zipfile.ZIP_DEFLATED) as archive:
        for entry in sorted(person_dir.rglob("*")):
            if entry.is_file():
                archive.write(entry, arcname=str(entry.relative_to(person_dir.parent)))
    return out_path
|