194 lines
6.9 KiB
Python
194 lines
6.9 KiB
Python
"""VisitorMemory — persistent visitor-profile store (NEW in P3).

SanadV3 has no memory/visitor store, so P3 builds one. File-IO only (no DB, no
ML), mirroring vision/face_gallery.py: a threading.RLock + one JSON file per
profile under data/memories/. Each profile links to a face_gallery face_id so a
recognized VIP can be greeted with remembered attributes/notes.

Profile schema (data/memories/<id>.json):
    { "id", "name", "attributes": {..}, "notes", "tags": [..],
      "linked_face_id", "last_seen", "created", "visit_count" }

Kept Python-3.8 compatible. Best-effort + atomic writes; never raises on IO.
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import threading
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
# Module-level logger (channel "pkg3.memory"); VisitorMemory._write uses it to
# record persistence failures instead of raising.
_log = logging.getLogger("pkg3.memory")
|
|
|
|
|
|
def _now() -> str:
|
|
# ISO-ish UTC without importing datetime.now-with-tz gymnastics.
|
|
return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
|
|
|
|
|
|
def _slug(name: str) -> str:
|
|
s = re.sub(r"[^a-zA-Z0-9_-]+", "-", (name or "").strip().lower()).strip("-")
|
|
return s or "visitor"
|
|
|
|
|
|
class VisitorMemory(object):
    """File-backed store of visitor profiles, one JSON file per profile.

    Each profile is a dict saved at ``<root>/<id>.json``. File reads and all
    mutations run under a ``threading.RLock``. The store is best-effort: IO
    problems are reported through return values (``None`` / ``False``) and the
    log rather than raised to the caller.
    """

    def __init__(self, root: Optional[str] = None):
        """Resolve the storage directory and create it if it is missing.

        Args:
            root: Directory for the profile files. When ``None`` the
                ``SANAD_MEMORIES_DIR`` environment variable is used; when that
                (or *root*) is empty the store falls back to
                ``<BASE_DIR>/data/memories`` from ``Project.Sanad.config`` and
                finally to ``<cwd>/data/memories``.
        """
        if root is None:
            root = os.environ.get("SANAD_MEMORIES_DIR", "")
        if not root:
            try:
                # Imported lazily so the store still works outside the project.
                from Project.Sanad.config import BASE_DIR
                root = str(Path(BASE_DIR) / "data" / "memories")
            except Exception:
                root = str(Path.cwd() / "data" / "memories")
        self.root = Path(root)
        self._lock = threading.RLock()
        # Bumped on every successful write/delete; exposed through status().
        self._version = 0
        try:
            self.root.mkdir(parents=True, exist_ok=True)
        except Exception:
            # Stay usable (reads simply find nothing) but leave a trace.
            _log.warning("visitor_memory: could not create %s", self.root,
                         exc_info=True)

    # -- internal --
    @staticmethod
    def _is_safe_id(pid: Any) -> bool:
        """Return True when *pid* names a single file directly inside the root.

        Rejects empty ids, ``.``/``..``, NUL bytes and anything containing a
        path separator, so an id can never address a file outside the store.
        """
        text = "%s" % (pid,)
        if text in ("", ".", "..") or "\x00" in text:
            return False
        return os.path.basename(text) == text

    @staticmethod
    def _read(path: Path) -> Optional[Dict[str, Any]]:
        """Load one profile file; return None if unreadable or not a JSON object."""
        try:
            data = json.loads(path.read_text(encoding="utf-8"))
        except Exception:
            return None
        # Callers rely on dict methods, so lists/scalars are not profiles.
        return data if isinstance(data, dict) else None

    def _path(self, pid: str) -> Path:
        """Return the JSON file path for profile id *pid*."""
        return self.root / ("%s.json" % pid)

    def _write(self, pid: str, data: Dict[str, Any]) -> bool:
        """Atomically persist *data* as the profile *pid* (tmp file + replace).

        Best-effort: never raises. On failure the error is logged, the tmp file
        is removed if present, and False is returned. A successful write bumps
        the store version and returns True.
        """
        tmp = self.root / ("%s.json.tmp" % pid)
        try:
            tmp.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
            tmp.replace(self._path(pid))
            self._version += 1
            return True
        except Exception:
            _log.exception("visitor_memory: could not persist %s (kept in-memory)", pid)
            try:
                if tmp.exists():
                    tmp.unlink()
            except Exception:
                pass
            return False

    def _unique_id(self, name: str) -> str:
        """Derive an unused profile id from *name* (``slug``, ``slug-2``, ...)."""
        base = _slug(name)
        pid = base
        n = 1
        while self._path(pid).exists():
            n += 1
            pid = "%s-%d" % (base, n)
        return pid

    # -- queries --
    def list(self) -> List[Dict[str, Any]]:
        """Return every readable profile, ordered by file path.

        Files that cannot be read, are not valid JSON, or do not hold a JSON
        object are skipped.
        """
        profiles = []
        with self._lock:
            try:
                paths = sorted(self.root.glob("*.json"))
            except OSError:
                paths = []
            for path in paths:
                prof = self._read(path)
                if prof is not None:
                    profiles.append(prof)
        return profiles

    def get(self, pid: str) -> Optional[Dict[str, Any]]:
        """Return the profile stored under *pid*, or None.

        None covers a missing file, an unreadable/invalid file and an id that
        would point outside the store directory.
        """
        with self._lock:
            if not self._is_safe_id(pid):
                return None
            return self._read(self._path(pid))

    def find_by_face(self, face_id: str) -> Optional[Dict[str, Any]]:
        """Return the first profile whose ``linked_face_id`` equals *face_id*.

        Returns None for an empty *face_id* or when nothing matches.
        """
        if not face_id:
            return None
        for prof in self.list():
            if prof.get("linked_face_id") == face_id:
                return prof
        return None

    # -- mutations --
    def add(self, name: str, attributes: Optional[Dict[str, Any]] = None,
            notes: str = "", tags: Optional[List[str]] = None,
            linked_face_id: str = "") -> Dict[str, Any]:
        """Create a new profile and persist it.

        The id is derived from *name* and made unique within the store. The
        new profile dict is returned even if the write to disk failed (the
        failure is logged by ``_write``).
        """
        with self._lock:
            pid = self._unique_id(name)
            # One timestamp so "created" and "last_seen" always agree.
            stamp = _now()
            prof = {
                "id": pid, "name": name or pid,
                "attributes": dict(attributes or {}),
                "notes": notes or "", "tags": list(tags or []),
                "linked_face_id": linked_face_id or "",
                "created": stamp, "last_seen": stamp, "visit_count": 0,
            }
            self._write(pid, prof)
            return prof

    def update(self, pid: str, **fields) -> Optional[Dict[str, Any]]:
        """Apply *fields* to an existing profile and persist it.

        Recognized fields: ``name``, ``notes``, ``linked_face_id`` (replaced
        when not None), ``attributes`` (dict, merged into the existing ones)
        and ``tags`` (list, replaces the existing ones). Other keys are
        ignored. Returns the updated profile, or None if *pid* is unknown.
        """
        with self._lock:
            prof = self.get(pid)
            if prof is None:
                return None
            for key in ("name", "notes", "linked_face_id"):
                if fields.get(key) is not None:
                    prof[key] = fields[key]
            new_attrs = fields.get("attributes")
            if isinstance(new_attrs, dict):
                merged = prof.get("attributes")
                if not isinstance(merged, dict):
                    # Stored value was null/garbage; start from a clean dict.
                    merged = {}
                merged.update(new_attrs)
                prof["attributes"] = merged
            new_tags = fields.get("tags")
            if isinstance(new_tags, list):
                # Copy so later caller-side mutation cannot alter the result.
                prof["tags"] = list(new_tags)
            self._write(pid, prof)
            return prof

    def touch(self, pid: str) -> Optional[Dict[str, Any]]:
        """Record a visit: refresh ``last_seen`` and increment ``visit_count``.

        Returns the updated profile, or None if *pid* is unknown.
        """
        with self._lock:
            prof = self.get(pid)
            if prof is None:
                return None
            try:
                visits = int(prof.get("visit_count") or 0)
            except (TypeError, ValueError):
                # A corrupted counter restarts from zero instead of raising.
                visits = 0
            prof["last_seen"] = _now()
            prof["visit_count"] = visits + 1
            self._write(pid, prof)
            return prof

    def delete(self, pid: str) -> bool:
        """Remove the profile *pid*; return True only if a file was deleted."""
        with self._lock:
            if not self._is_safe_id(pid):
                return False
            try:
                self._path(pid).unlink()
            except FileNotFoundError:
                return False
            except Exception:
                _log.exception("visitor_memory: could not delete %s", pid)
                return False
            self._version += 1
            return True

    # -- greeting primer (feeds personalized greetings to the live session) --
    def load_for_primer(self, limit: int = 12) -> str:
        """Build a compact text summary of known visitors for the greeting primer.

        Profiles are ordered most-recently-seen first and at most *limit* are
        listed, one bullet line each: name (falling back to the id), linked
        face id, attributes and notes when present. Returns "" when the store
        holds no profiles.
        """
        profs = self.list()
        if not profs:
            return ""
        # str(... or "") keeps the sort total even if last_seen is null/non-str.
        profs.sort(key=lambda p: str(p.get("last_seen") or ""), reverse=True)
        lines = ["Known visitors you may recognize (greet them personally):"]
        for prof in profs[:max(0, limit)]:
            attributes = prof.get("attributes")
            if not isinstance(attributes, dict):
                attributes = {}
            attrs = ", ".join("%s=%s" % (k, v) for k, v in attributes.items())
            label = prof.get("name") or prof.get("id") or "?"
            bits = ["%s" % (label,)]
            face_id = prof.get("linked_face_id")
            if face_id:
                bits.append("face:%s" % (face_id,))
            if attrs:
                bits.append("(%s)" % attrs)
            notes = prof.get("notes")
            if notes:
                bits.append("— %s" % (notes,))
            lines.append(" • " + " ".join(bits))
        return "\n".join(lines)

    def status(self) -> Dict[str, Any]:
        """Return a small health summary: file count, root path and version."""
        try:
            count = sum(1 for _ in self.root.glob("*.json"))
        except OSError:
            count = 0
        return {"ok": True, "count": count,
                "root": str(self.root), "version": self._version}
|