# Sanad_Package_3/visitor_memory.py
# (file-viewer metadata carried over from the paste: 194 lines, 6.9 KiB, Python)

"""VisitorMemory — persistent visitor-profile store (NEW in P3).
SanadV3 has no memory/visitor store, so P3 builds one. File-IO only (no DB, no
ML), mirroring vision/face_gallery.py: a threading.RLock + one JSON file per
profile under data/memories/. Each profile links to a face_gallery face_id so a
recognized VIP can be greeted with remembered attributes/notes.
Profile schema (data/memories/<id>.json):
{ "id", "name", "attributes": {..}, "notes", "tags": [..],
"linked_face_id", "last_seen", "created", "visit_count" }
Kept Python-3.8 compatible. Best-effort + atomic writes; never raises on IO.
"""
from __future__ import annotations
import json
import logging
import os
import re
import threading
import time
from pathlib import Path
from typing import Any, Dict, List, Optional
_log = logging.getLogger("pkg3.memory")
def _now() -> str:
    """Return the current UTC time formatted as ``YYYY-MM-DDTHH:MM:SSZ``."""
    # time.gmtime() gives a UTC struct_time directly, so no datetime/tzinfo
    # handling is needed for this simple ISO-like stamp.
    utc_now = time.gmtime()
    return time.strftime("%Y-%m-%dT%H:%M:%SZ", utc_now)
def _slug(name: str) -> str:
    """Derive a lowercase, filename-friendly id stem from a display name.

    Every run of characters outside ``[a-zA-Z0-9_-]`` collapses into a single
    ``-``; leading/trailing dashes are dropped. When nothing usable is left
    (empty, ``None``, or a name with no ASCII letters/digits) the stem is
    ``"visitor"``.
    """
    lowered = (name or "").strip().lower()
    dashed = re.sub(r"[^a-zA-Z0-9_-]+", "-", lowered)
    stem = dashed.strip("-")
    if not stem:
        return "visitor"
    return stem
class VisitorMemory(object):
    """File-backed store of visitor profiles, one JSON file per profile.

    Each profile is a dict persisted at ``<root>/<id>.json``. The public
    methods are best-effort: IO and decoding problems surface as ``None`` /
    ``False`` / skipped entries rather than exceptions. ``list``, ``get`` and
    all mutations run under one ``threading.RLock`` (re-entrant, so ``update``
    and ``touch`` can call ``get`` while holding it).

    Profile ids are only honoured when ``<id>.json`` is a plain file name;
    ids carrying directory parts (``../x``, absolute paths) are treated as
    unknown so callers cannot reach files outside ``root``.
    """

    def __init__(self, root: Optional[str] = None):
        """Resolve the storage directory and create it if possible.

        Args:
            root: Directory for the profile files. When ``None`` the
                ``SANAD_MEMORIES_DIR`` environment variable is consulted.
                If the result is still empty, ``<BASE_DIR>/data/memories``
                from the project config is used, falling back to
                ``<cwd>/data/memories`` when that import fails.
        """
        if root is None:
            root = os.environ.get("SANAD_MEMORIES_DIR", "")
        if not root:
            try:
                from Project.Sanad.config import BASE_DIR
                root = str(Path(BASE_DIR) / "data" / "memories")
            except Exception:
                root = str(Path.cwd() / "data" / "memories")
        self.root = Path(root)
        self._lock = threading.RLock()
        try:
            self.root.mkdir(parents=True, exist_ok=True)
        except Exception:
            # Best-effort: an unusable root just makes later reads/writes fail softly.
            pass
        # Bumped on every successful write/delete so callers can detect changes.
        self._version = 0

    # -- internal --
    @staticmethod
    def _is_safe_id(pid: Any) -> bool:
        """Return True when ``<pid>.json`` is a bare file name (no directory parts)."""
        file_name = "%s.json" % (pid,)
        # Path(...).name drops any directory/drive component, so a mismatch
        # means the id would resolve somewhere other than directly under root.
        return Path(file_name).name == file_name

    def _path(self, pid: str) -> Path:
        """Return the JSON file path for profile ``pid`` (no validation)."""
        return self.root / ("%s.json" % (pid,))

    def _profile_files(self) -> List[Path]:
        """Return the sorted ``*.json`` files under root, or ``[]`` if listing fails."""
        try:
            return sorted(self.root.glob("*.json"))
        except Exception:
            return []

    def _write(self, pid: str, data: Dict[str, Any]) -> bool:
        """Atomically persist ``data`` as profile ``pid`` (tmp file + replace).

        Best-effort: never raises. Returns True on success; on failure logs,
        removes the temp file if it was created, and returns False. Callers
        are expected to hold ``self._lock``.
        """
        if not self._is_safe_id(pid):
            _log.error("visitor_memory: refusing to write unsafe profile id %r", pid)
            return False
        tmp = self.root / ("%s.json.tmp" % (pid,))
        try:
            tmp.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
            tmp.replace(self._path(pid))
            self._version += 1
            return True
        except Exception:
            _log.exception("visitor_memory: could not persist %s (kept in-memory)", pid)
            try:
                if tmp.exists():
                    tmp.unlink()
            except Exception:
                pass
            return False

    def _unique_id(self, name: str) -> str:
        """Return an unused id derived from ``name``: ``slug``, ``slug-2``, ``slug-3``, ..."""
        base = _slug(name)
        pid = base
        n = 1
        while self._path(pid).exists():
            n += 1
            pid = "%s-%d" % (base, n)
        return pid

    # -- queries --
    def list(self) -> List[Dict[str, Any]]:
        """Return every readable profile, ordered by file name.

        Files that cannot be read or decoded, or whose top-level JSON value is
        not an object, are skipped.
        """
        out = []
        with self._lock:
            for p in self._profile_files():
                try:
                    data = json.loads(p.read_text(encoding="utf-8"))
                except Exception:
                    continue
                # A stray non-object JSON file must not reach callers that use .get().
                if isinstance(data, dict):
                    out.append(data)
        return out

    def get(self, pid: str) -> Optional[Dict[str, Any]]:
        """Return the profile dict for ``pid``, or None if missing, unsafe or unreadable."""
        if not self._is_safe_id(pid):
            return None
        with self._lock:
            p = self._path(pid)
            if not p.exists():
                return None
            try:
                data = json.loads(p.read_text(encoding="utf-8"))
            except Exception:
                return None
        return data if isinstance(data, dict) else None

    def find_by_face(self, face_id: str) -> Optional[Dict[str, Any]]:
        """Return the first profile whose ``linked_face_id`` equals ``face_id``, else None."""
        if not face_id:
            return None
        for prof in self.list():
            if prof.get("linked_face_id") == face_id:
                return prof
        return None

    # -- mutations --
    def add(self, name: str, attributes: Optional[Dict[str, Any]] = None,
            notes: str = "", tags: Optional[List[str]] = None,
            linked_face_id: str = "") -> Dict[str, Any]:
        """Create a profile under a fresh id and return it.

        The profile dict is returned even when persisting fails (the failure
        is logged by ``_write``).
        """
        with self._lock:
            pid = self._unique_id(name)
            # One timestamp for both fields so a new profile has created == last_seen.
            stamp = _now()
            prof = {
                "id": pid, "name": name or pid,
                "attributes": dict(attributes or {}),
                "notes": notes or "", "tags": list(tags or []),
                "linked_face_id": linked_face_id or "",
                "created": stamp, "last_seen": stamp, "visit_count": 0,
            }
            self._write(pid, prof)
            return prof

    def update(self, pid: str, **fields) -> Optional[Dict[str, Any]]:
        """Apply ``fields`` to profile ``pid`` and return the updated dict.

        ``name``/``notes``/``linked_face_id`` are replaced when given and not
        None; a dict ``attributes`` is merged into the stored attributes; a
        list ``tags`` replaces the stored tags. Returns None when the profile
        does not exist.
        """
        with self._lock:
            prof = self.get(pid)
            if prof is None:
                return None
            for k in ("name", "notes", "linked_face_id"):
                if k in fields and fields[k] is not None:
                    prof[k] = fields[k]
            new_attrs = fields.get("attributes")
            if isinstance(new_attrs, dict):
                current = prof.get("attributes")
                if not isinstance(current, dict):
                    # Stored value was missing/null/garbage: start from empty.
                    current = {}
                    prof["attributes"] = current
                current.update(new_attrs)
            new_tags = fields.get("tags")
            if isinstance(new_tags, list):
                prof["tags"] = list(new_tags)
            self._write(pid, prof)
            return prof

    def touch(self, pid: str) -> Optional[Dict[str, Any]]:
        """Record a visit — bump last_seen + visit_count. Returns None if unknown."""
        with self._lock:
            prof = self.get(pid)
            if prof is None:
                return None
            try:
                visits = int(prof.get("visit_count") or 0)
            except (TypeError, ValueError):
                # Corrupt counter in the file: restart counting instead of raising.
                visits = 0
            prof["last_seen"] = _now()
            prof["visit_count"] = visits + 1
            self._write(pid, prof)
            return prof

    def delete(self, pid: str) -> bool:
        """Remove profile ``pid``. Returns True only when a file was actually deleted."""
        if not self._is_safe_id(pid):
            return False
        with self._lock:
            p = self._path(pid)
            if not p.exists():
                return False
            try:
                p.unlink()
            except Exception:
                return False
            self._version += 1
            return True

    # -- greeting primer (feeds personalized greetings to the live session) --
    def load_for_primer(self, limit: int = 12) -> str:
        """Compact text summary of known visitors, for the Gemini primer/persona so
        the robot can greet a recognized VIP by name with remembered context.

        Profiles are ordered most-recently-seen first and cut to ``limit``
        entries. Returns ``""`` when there are no profiles.
        """
        profs = self.list()
        if not profs:
            return ""
        # Coerce to str so a null/non-string last_seen cannot break the sort.
        profs.sort(key=lambda p: str(p.get("last_seen") or ""), reverse=True)
        lines = ["Known visitors you may recognize (greet them personally):"]
        for prof in profs[:limit]:
            attributes = prof.get("attributes")
            if not isinstance(attributes, dict):
                attributes = {}
            attrs = ", ".join("%s=%s" % (k, v) for k, v in attributes.items())
            bits = [str(prof.get("name") or prof.get("id") or "?")]
            if prof.get("linked_face_id"):
                bits.append("face:%s" % (prof["linked_face_id"],))
            if attrs:
                bits.append("(%s)" % attrs)
            if prof.get("notes"):
                bits.append(str(prof["notes"]))
            lines.append(" ".join(bits))
        return "\n".join(lines)

    def status(self) -> Dict[str, Any]:
        """Return a small health summary: file count, root path and change counter."""
        return {"ok": True, "count": len(self._profile_files()),
                "root": str(self.root), "version": self._version}