"""VisitorMemory — persistent visitor-profile store (NEW in P3).

SanadV3 has no memory/visitor store, so P3 builds one. File-IO only (no DB,
no ML), mirroring vision/face_gallery.py: a threading.RLock + one JSON file
per profile under data/memories/. Each profile links to a face_gallery
face_id so a recognized VIP can be greeted with remembered attributes/notes.

Profile schema (data/memories/<id>.json):
    { "id", "name", "attributes": {..}, "notes", "tags": [..],
      "linked_face_id", "last_seen", "created", "visit_count" }

Kept Python-3.8 compatible. Best-effort + atomic writes; never raises on IO.
"""

from __future__ import annotations

import json
import logging
import os
import re
import threading
import time
from pathlib import Path
from typing import Any, Dict, List, Optional

_log = logging.getLogger("pkg3.memory")


def _now() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ``."""
    # ISO-ish UTC without importing datetime.now-with-tz gymnastics.
    return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())


def _slug(name: str) -> str:
    """Turn a display name into a lowercase, filesystem-friendly id stem.

    Runs of characters outside ``[a-zA-Z0-9_-]`` collapse to a single ``-``;
    leading/trailing dashes are stripped. Falls back to ``"visitor"`` when
    nothing usable remains (empty, ``None`` or all-symbol names).
    """
    # str() so a non-string name (e.g. an int) cannot raise on .strip().
    text = str(name or "").strip().lower()
    s = re.sub(r"[^a-zA-Z0-9_-]+", "-", text).strip("-")
    return s or "visitor"


def _is_safe_id(pid: Any) -> bool:
    """Return True when *pid* is a plain file-name stem.

    Ids are interpolated into a path under the store root, so anything that
    could address another directory (path separators, a drive prefix, NUL)
    is rejected, as are empty and non-string ids.
    """
    if not isinstance(pid, str) or not pid:
        return False
    if "\x00" in pid or "/" in pid or "\\" in pid:
        return False
    # basename() also strips a Windows drive prefix such as "C:name".
    return os.path.basename(pid) == pid


class VisitorMemory:
    """One-JSON-file-per-profile visitor store rooted at ``self.root``.

    Mutations and reads are serialized by a re-entrant lock. ``_version`` is
    bumped on every successful write or delete and reported by ``status()``.
    """

    def __init__(self, root: Optional[str] = None):
        """Resolve the storage directory and create it if possible.

        Resolution order: the explicit *root* argument, the
        ``SANAD_MEMORIES_DIR`` environment variable, ``BASE_DIR/data/memories``
        from the project config, then ``<cwd>/data/memories``.
        """
        if root is None:
            root = os.environ.get("SANAD_MEMORIES_DIR", "")
        if not root:
            try:
                from Project.Sanad.config import BASE_DIR
                root = str(Path(BASE_DIR) / "data" / "memories")
            except Exception:
                # Project config not importable: fall back to the cwd.
                root = str(Path.cwd() / "data" / "memories")
        self.root = Path(root)
        self._lock = threading.RLock()
        self._version = 0
        try:
            self.root.mkdir(parents=True, exist_ok=True)
        except Exception:
            # Best-effort: stay constructible, but leave a trace in the log.
            _log.warning("visitor_memory: could not create %s", self.root,
                         exc_info=True)

    # -- internal --
    def _path(self, pid: str) -> Path:
        """Return the JSON file path for profile id *pid*."""
        return self.root / ("%s.json" % pid)

    def _read(self, path: Path) -> Optional[Dict[str, Any]]:
        """Load one profile file; None if missing, unreadable or not an object."""
        try:
            data = json.loads(path.read_text(encoding="utf-8"))
        except Exception:
            return None
        # Callers use dict methods on profiles, so other JSON values
        # (lists, strings, numbers) are treated like corrupt files.
        return data if isinstance(data, dict) else None

    def _write(self, pid: str, data: Dict[str, Any]) -> bool:
        """Atomic write (tmp+replace), best-effort — never raises on IO
        (matches the get/list/delete contract).

        Returns False + cleans up the tmp file on failure. Also returns
        False, without touching the disk, for an id that is not a plain
        file-name stem.
        """
        if not _is_safe_id(pid):
            _log.error("visitor_memory: refusing unsafe profile id %r", pid)
            return False
        tmp = self.root / ("%s.json.tmp" % pid)
        try:
            # Re-create the directory in case it was missing at init time
            # or has been removed since.
            self.root.mkdir(parents=True, exist_ok=True)
            tmp.write_text(json.dumps(data, ensure_ascii=False, indent=2),
                           encoding="utf-8")
            tmp.replace(self._path(pid))
            self._version += 1
            return True
        except Exception:
            _log.exception("visitor_memory: could not persist %s (kept in-memory)", pid)
            try:
                if tmp.exists():
                    tmp.unlink()
            except Exception:
                pass
            return False

    def _unique_id(self, name: str) -> str:
        """Return a slug of *name* that has no profile file yet.

        The first collision yields ``<slug>-2``, then ``<slug>-3``, and so on.
        """
        base = _slug(name)
        pid = base
        n = 1
        while self._path(pid).exists():
            n += 1
            pid = "%s-%d" % (base, n)
        return pid

    # -- queries --
    def list(self) -> List[Dict[str, Any]]:
        """Return every readable profile, ordered by file path.

        Files that cannot be read, are not valid JSON, or do not contain a
        JSON object are skipped.
        """
        out = []
        with self._lock:
            try:
                paths = sorted(self.root.glob("*.json"))
            except OSError:
                paths = []
            for p in paths:
                prof = self._read(p)
                if prof is not None:
                    out.append(prof)
        return out

    def get(self, pid: str) -> Optional[Dict[str, Any]]:
        """Return the profile stored under *pid*, or None.

        None is returned for an unsafe id, a missing file, or a file that
        does not hold a JSON object.
        """
        if not _is_safe_id(pid):
            return None
        with self._lock:
            return self._read(self._path(pid))

    def find_by_face(self, face_id: str) -> Optional[Dict[str, Any]]:
        """Return the first profile whose ``linked_face_id`` equals *face_id*."""
        if not face_id:
            return None
        for prof in self.list():
            if prof.get("linked_face_id") == face_id:
                return prof
        return None

    # -- mutations --
    def add(self, name: str, attributes: Optional[Dict[str, Any]] = None,
            notes: str = "", tags: Optional[List[str]] = None,
            linked_face_id: str = "") -> Dict[str, Any]:
        """Create a profile under a fresh id derived from *name* and return it.

        The profile dict is returned even if persisting it failed (the
        failure is logged by ``_write``).
        """
        with self._lock:
            pid = self._unique_id(name)
            # One timestamp so "created" and "last_seen" always agree.
            stamp = _now()
            prof = {
                "id": pid,
                "name": name or pid,
                "attributes": dict(attributes or {}),
                "notes": notes or "",
                "tags": list(tags or []),
                "linked_face_id": linked_face_id or "",
                "created": stamp,
                "last_seen": stamp,
                "visit_count": 0,
            }
            self._write(pid, prof)
            return prof

    def update(self, pid: str, **fields) -> Optional[Dict[str, Any]]:
        """Apply *fields* to profile *pid* and return the updated profile.

        ``name``, ``notes`` and ``linked_face_id`` are replaced when given
        and not None; an ``attributes`` dict is merged into the stored
        attributes; a ``tags`` list replaces the stored tags. Other keyword
        arguments are ignored. Returns None when the profile does not exist.
        """
        with self._lock:
            prof = self.get(pid)
            if prof is None:
                return None
            for key in ("name", "notes", "linked_face_id"):
                value = fields.get(key)
                if value is not None:
                    prof[key] = value
            new_attrs = fields.get("attributes")
            if isinstance(new_attrs, dict):
                current = prof.get("attributes")
                if not isinstance(current, dict):
                    # Stored value missing or corrupted: start from empty.
                    current = {}
                current.update(new_attrs)
                prof["attributes"] = current
            new_tags = fields.get("tags")
            if isinstance(new_tags, list):
                # Copy so later caller-side mutation cannot alias the profile.
                prof["tags"] = list(new_tags)
            self._write(pid, prof)
            return prof

    def touch(self, pid: str) -> Optional[Dict[str, Any]]:
        """Record a visit — bump last_seen + visit_count.

        A missing or non-numeric stored ``visit_count`` is treated as 0.
        Returns None when the profile does not exist.
        """
        with self._lock:
            prof = self.get(pid)
            if prof is None:
                return None
            try:
                visits = int(prof.get("visit_count") or 0)
            except (TypeError, ValueError):
                visits = 0
            prof["last_seen"] = _now()
            prof["visit_count"] = visits + 1
            self._write(pid, prof)
            return prof

    def delete(self, pid: str) -> bool:
        """Remove profile *pid*; True only if a file was actually deleted."""
        if not _is_safe_id(pid):
            return False
        with self._lock:
            try:
                self._path(pid).unlink()
            except FileNotFoundError:
                return False
            except Exception:
                _log.warning("visitor_memory: could not delete %s", pid,
                             exc_info=True)
                return False
            self._version += 1
            return True

    # -- greeting primer (feeds personalized greetings to the live session) --
    def load_for_primer(self, limit: int = 12) -> str:
        """Compact text summary of known visitors, for the Gemini primer/persona
        so the robot can greet a recognized VIP by name with remembered context.

        Lists at most *limit* profiles, most recently seen first. Returns an
        empty string when there is nothing to list.
        """
        ordered = sorted(self.list(),
                         key=lambda p: str(p.get("last_seen") or ""),
                         reverse=True)
        # Clamp so a negative limit cannot slice from the tail.
        chosen = ordered[:max(0, limit)]
        if not chosen:
            return ""
        lines = ["Known visitors you may recognize (greet them personally):"]
        for prof in chosen:
            attr_map = prof.get("attributes")
            if not isinstance(attr_map, dict):
                attr_map = {}
            attrs = ", ".join("%s=%s" % (k, v) for k, v in attr_map.items())
            # str() keeps " ".join from raising on a non-string stored name.
            bits = [str(prof.get("name") or prof.get("id") or "?")]
            if prof.get("linked_face_id"):
                bits.append("face:%s" % prof["linked_face_id"])
            if attrs:
                bits.append("(%s)" % attrs)
            if prof.get("notes"):
                bits.append("— %s" % prof["notes"])
            lines.append(" • " + " ".join(bits))
        return "\n".join(lines)

    def status(self) -> Dict[str, Any]:
        """Return a small health dict: ok flag, file count, root and version.

        ``count`` is the number of ``*.json`` files under the root, which
        may include files that ``list()`` skips as unreadable.
        """
        try:
            count = sum(1 for _ in self.root.glob("*.json"))
        except OSError:
            count = 0
        return {"ok": True, "count": count, "root": str(self.root),
                "version": self._version}