99 lines
3.8 KiB
Python
99 lines
3.8 KiB
Python
"""Recognition state file — atomic JSON I/O shared by parent + child.
|
|
|
|
The dashboard (parent process) writes this file on every toggle / face
|
|
gallery change; the Gemini child (`gemini/script.py`) polls it at 1 Hz
|
|
to flip its in-memory flags without a session restart.
|
|
|
|
Format (data/.recognition_state.json):
    {
      "vision_enabled": bool,
      "face_rec_enabled": bool,
      "gallery_version": int,       # bumped on every face CRUD
      "zone_rec_enabled": bool,     # N3 — zones/places knowledge toggle
      "zones_version": int,         # bumped on every zone/place CRUD
      "nav_target_zone_id": int,    # active "go here" destination (0 = none)
      "nav_target_place_id": int,
      "movement_enabled": bool,     # N2 — Gemini-driven locomotion gate
      "record_enabled": bool        # auto-record every conversation turn (default true)
    }
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import tempfile
|
|
from dataclasses import asdict, dataclass
|
|
from pathlib import Path
|
|
|
|
|
|
@dataclass
class RecognitionState:
    """In-memory form of the recognition state file.

    Every field has a default, so ``RecognitionState()`` is the state used
    when nothing has been persisted yet. Field order is significant: it
    fixes both the positional constructor signature and the key order that
    ``dataclasses.asdict`` produces.
    """

    # Vision and face-recognition switches, plus the face-gallery counter.
    vision_enabled: bool = False
    face_rec_enabled: bool = False
    gallery_version: int = 0

    # N3 — zones/places knowledge (zone → place → linked faces).
    zone_rec_enabled: bool = False
    zones_version: int = 0

    # Active "go here" destination. A 0/0 pair means no destination is set;
    # the actual locomotion is wired by N2.
    nav_target_zone_id: int = 0
    nav_target_place_id: int = 0

    # N2 — gate for Gemini-driven locomotion. Defaults to OFF for safety.
    movement_enabled: bool = False

    # Auto-record every conversation turn to data/recordings/. Defaults to ON
    # to match historical behavior. Toggled live from the Live Gemini panel;
    # the child syncs TurnRecorder.enabled to it without a session restart.
    record_enabled: bool = True
|
|
|
|
|
|
def read(path: Path) -> RecognitionState:
    """Return the persisted state, falling back to defaults instead of raising.

    Args:
        path: Location of the JSON state file.

    Returns:
        A ``RecognitionState`` built from the file. A default-constructed
        state is returned when the file is missing, unreadable, not valid
        UTF-8 / JSON, or does not hold a JSON object at the top level.
        Individual keys that are absent fall back to that field's default,
        and integer fields whose stored value cannot be converted with
        ``int()`` fall back to 0.
    """
    try:
        raw = json.loads(Path(path).read_text(encoding="utf-8"))
    except (OSError, ValueError, RecursionError):
        # OSError covers FileNotFoundError. ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError (bad UTF-8 bytes).
        # RecursionError is what the JSON parser raises on absurdly deep
        # nesting, which is just another flavour of corrupt file here.
        return RecognitionState()
    if not isinstance(raw, dict):
        return RecognitionState()

    def _as_int(key: str) -> int:
        # A hand-edited or partially written file may hold null, a list, a
        # non-numeric string, NaN or Infinity in an integer slot. int() would
        # raise on those, so degrade that one field to 0 and keep the rest.
        try:
            return int(raw.get(key, 0))
        except (TypeError, ValueError, OverflowError):
            return 0

    return RecognitionState(
        vision_enabled=bool(raw.get("vision_enabled", False)),
        face_rec_enabled=bool(raw.get("face_rec_enabled", False)),
        gallery_version=_as_int("gallery_version"),
        zone_rec_enabled=bool(raw.get("zone_rec_enabled", False)),
        zones_version=_as_int("zones_version"),
        nav_target_zone_id=_as_int("nav_target_zone_id"),
        nav_target_place_id=_as_int("nav_target_place_id"),
        movement_enabled=bool(raw.get("movement_enabled", False)),
        record_enabled=bool(raw.get("record_enabled", True)),
    )
|
|
|
|
|
|
def write(path: Path, state: RecognitionState) -> None:
    """Persist *state* as JSON at *path* without exposing a partial file.

    The JSON is written to a temporary file created in the destination
    directory and then moved over *path* with ``os.replace``. Missing parent
    directories are created first.

    Args:
        path: Destination of the state file.
        state: Dataclass instance to serialise via ``dataclasses.asdict``.

    Raises:
        Exception: Whatever the write or rename raised, re-raised unchanged
            after a best-effort removal of the temporary file.
    """
    target = Path(path)
    folder = target.parent
    folder.mkdir(parents=True, exist_ok=True)
    # The temp file lives next to the target so os.replace never has to
    # cross directories.
    handle, scratch = tempfile.mkstemp(
        dir=str(folder), prefix=f".{target.name}.", suffix=".tmp"
    )
    try:
        with os.fdopen(handle, "w", encoding="utf-8") as out:
            json.dump(asdict(state), out, ensure_ascii=False, indent=2)
        os.replace(scratch, target)
    except Exception:
        # Do not leave the temp file behind; the cleanup itself is
        # best-effort so the original error is the one that propagates.
        try:
            os.unlink(scratch)
        except OSError:
            pass
        raise
|
|
|
|
|
|
def mutate(path: Path, **changes) -> RecognitionState:
    """Load the state at *path*, apply *changes*, save it, and return it.

    Args:
        path: Location of the state file.
        **changes: Attribute names mapped to their new values. Names that
            are not attributes of the loaded state are skipped silently.

    Returns:
        The updated state object that was written back.
    """
    state = read(path)
    for name, value in changes.items():
        if not hasattr(state, name):
            # Unknown keys are skipped rather than rejected.
            continue
        setattr(state, name, value)
    write(path, state)
    return state
|