Marcus/Voice/sequences.py

252 lines
9.9 KiB
Python

"""
sequences.py — user-defined motion macros (record / save / play).
A sequence is a named list of canonical motion commands. Saved as JSON
under `Data/Sequences/<name>.json`. Created via voice during a recording
session (the user dictates a series of motions, then names the macro);
played back later by the same name. Sequence content is the SAME format
the dispatcher already enqueues — strings like 'turn left 90 degrees',
'walk forward 5 steps', 'sit down' — so playback simply re-enqueues
each one through the existing motion worker.
State model:
IDLE — no recording in progress.
RECORDING(name) — capturing dispatched commands into a buffer.
The optional `name` is set if the user pre-named
the sequence ('start recording as my-greet'); if
not, the user must name it on save.
PLAYING(name, queue) — currently re-issuing commands from `queue`.
(Tracked just for status; the work is done by
the motion worker, not this module.)
Threading:
All state mutations are guarded by `_lock`. The recording buffer is
appended from the dispatcher thread; saved/loaded from any thread.
Public API (used by Voice/marcus_voice.py):
Sequences() — singleton
seq.start_recording(name=None) — begin capture
seq.cancel_recording() — drop without saving
seq.save_recording(name) — finalise + write JSON
seq.record_command(canonical) — append to buffer if recording
seq.play(name) -> list[str] | None — return commands to enqueue
seq.list() -> list[str] — saved names
seq.delete(name) -> bool — remove file
seq.recording_state() -> dict — for status / persona feedback
"""
from __future__ import annotations
import json
import os
import re
import threading
import time
from typing import Optional, List
# Resolve Data/Sequences/ off PROJECT_ROOT so this module works whether
# Marcus is run from the repo root or invoked via systemd. PROJECT_ROOT
# is set by Core/env_loader.
try:
from Core.env_loader import PROJECT_ROOT # type: ignore
except Exception:
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
_SEQUENCE_DIR = os.path.join(PROJECT_ROOT, "Data", "Sequences")
_NAME_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{0,30}$", re.I)
# Skip these canonicals when capturing — control commands, not motions.
# Loaded from Config/language_tables.json::sequence_never_record.canonicals.
# Adding a new control canonical that should be excluded from recordings
# is a JSON edit, not a Python change.
from Voice._language_tables import sequence_never_record as _load_never_record
_NEVER_RECORD = _load_never_record()
def _safe_name(name: str) -> Optional[str]:
"""Normalise + validate a sequence name. Returns None if invalid.
Forces lower-case so 'My-Greet' and 'my-greet' refer to the same
file — voice transcripts often vary capitalisation."""
if not name:
return None
n = name.strip().lower().replace(" ", "-")
if not _NAME_RE.match(n):
return None
return n
class Sequences:
def __init__(self):
self._lock = threading.Lock()
self._recording: bool = False
self._record_name: Optional[str] = None
self._record_buf: List[str] = []
self._record_started_at: float = 0.0
os.makedirs(_SEQUENCE_DIR, exist_ok=True)
# ─── recording ─────────────────────────────────────────
def start_recording(self, name: Optional[str] = None) -> dict:
"""Begin capturing. If `name` provided + valid, sequence is
pre-named (save_recording can be called with no arg). Returns
a small dict suitable for logging / state-injection."""
with self._lock:
if self._recording:
return {"ok": False, "reason": "already recording",
"name": self._record_name}
self._recording = True
self._record_name = _safe_name(name) if name else None
self._record_buf = []
self._record_started_at = time.time()
return {"ok": True, "name": self._record_name}
def cancel_recording(self) -> dict:
with self._lock:
if not self._recording:
return {"ok": False, "reason": "not recording"}
n = self._record_name
count = len(self._record_buf)
self._recording = False
self._record_name = None
self._record_buf = []
self._record_started_at = 0.0
return {"ok": True, "name": n, "discarded": count}
def save_recording(self, name: Optional[str] = None) -> dict:
"""Persist the buffered commands as <name>.json. Resolves the
name in this order:
1. explicit `name` arg
2. name set by start_recording
3. None → error.
Empty buffers are NOT saved (would produce a dud macro)."""
with self._lock:
if not self._recording:
return {"ok": False, "reason": "not recording"}
target = _safe_name(name) or self._record_name
if not target:
return {"ok": False, "reason": "no name supplied"}
if not self._record_buf:
self._recording = False
self._record_name = None
return {"ok": False, "reason": "empty sequence (nothing recorded)",
"name": target}
payload = {
"name": target,
"created_at": self._record_started_at,
"saved_at": time.time(),
"commands": list(self._record_buf),
}
path = os.path.join(_SEQUENCE_DIR, target + ".json")
try:
with open(path, "w", encoding="utf-8") as f:
json.dump(payload, f, indent=2, ensure_ascii=False)
except Exception as e:
return {"ok": False, "reason": "write failed: %s" % e}
count = len(self._record_buf)
self._recording = False
self._record_name = None
self._record_buf = []
self._record_started_at = 0.0
return {"ok": True, "name": target, "count": count, "path": path}
def record_command(self, canonical: str) -> bool:
"""Append a canonical to the recording buffer (if recording).
Returns True if appended, False otherwise. Skips control commands
(see _NEVER_RECORD)."""
if not canonical:
return False
with self._lock:
if not self._recording:
return False
c = canonical.strip()
if not c or c.lower() in _NEVER_RECORD:
return False
self._record_buf.append(c)
return True
# ─── playback ─────────────────────────────────────────
def play(self, name: str) -> Optional[List[str]]:
"""Load the sequence's commands. Caller is responsible for
enqueueing them through the motion worker. Returns None if the
sequence doesn't exist or is malformed."""
target = _safe_name(name)
if not target:
return None
path = os.path.join(_SEQUENCE_DIR, target + ".json")
if not os.path.isfile(path):
return None
try:
with open(path, "r", encoding="utf-8") as f:
payload = json.load(f)
except Exception:
return None
cmds = payload.get("commands")
if not isinstance(cmds, list) or not all(isinstance(c, str) for c in cmds):
return None
return [c for c in cmds if c.strip()]
def list(self) -> List[str]:
"""Sorted list of saved sequence names."""
try:
files = os.listdir(_SEQUENCE_DIR)
except FileNotFoundError:
return []
out = sorted(
os.path.splitext(f)[0]
for f in files
if f.endswith(".json") and _NAME_RE.match(os.path.splitext(f)[0])
)
return out
def delete(self, name: str) -> bool:
target = _safe_name(name)
if not target:
return False
path = os.path.join(_SEQUENCE_DIR, target + ".json")
if not os.path.isfile(path):
return False
try:
os.remove(path)
return True
except Exception:
return False
# ─── status ───────────────────────────────────────────
def recording_state(self) -> dict:
with self._lock:
return {
"recording": self._recording,
"name": self._record_name,
"count": len(self._record_buf),
"elapsed_sec": (time.time() - self._record_started_at
if self._recording else 0.0),
}
# Process-wide singleton — the dispatcher and the worker share one.
_INSTANCE: Optional[Sequences] = None
_INSTANCE_LOCK = threading.Lock()
def get_sequences() -> Sequences:
global _INSTANCE
with _INSTANCE_LOCK:
if _INSTANCE is None:
_INSTANCE = Sequences()
return _INSTANCE
# Standalone smoke check
if __name__ == "__main__":
s = Sequences()
print(s.start_recording("test-greet"))
print(s.record_command("turn right 45 degrees"))
print(s.record_command("wave hello"))
print(s.record_command("turn left 45 degrees"))
print(s.save_recording())
print("listed:", s.list())
print("play:", s.play("test-greet"))
print(s.delete("test-greet"))
print("after delete:", s.list())