252 lines
9.9 KiB
Python
252 lines
9.9 KiB
Python
"""
|
|
sequences.py — user-defined motion macros (record / save / play).

A sequence is a named list of canonical motion commands. Saved as JSON
under `Data/Sequences/<name>.json`. Created via voice during a recording
session (the user dictates a series of motions, then names the macro);
played back later by the same name. Sequence content is the SAME format
the dispatcher already enqueues — strings like 'turn left 90 degrees',
'walk forward 5 steps', 'sit down' — so playback simply re-enqueues
each one through the existing motion worker.

State model:
    IDLE                 — no recording in progress.
    RECORDING(name)      — capturing dispatched commands into a buffer.
                           The optional `name` is set if the user pre-named
                           the sequence ('start recording as my-greet'); if
                           not, the user must name it on save.
    PLAYING(name, queue) — currently re-issuing commands from `queue`.
                           (Tracked just for status; the work is done by
                           the motion worker, not this module.)

Threading:
    All state mutations are guarded by `_lock`. The recording buffer is
    appended from the dispatcher thread; saved/loaded from any thread.

Public API (used by Voice/marcus_voice.py):
    get_sequences() -> Sequences        — process-wide singleton
    seq.start_recording(name=None)      — begin capture
    seq.cancel_recording()              — drop without saving
    seq.save_recording(name=None)       — finalise + write JSON
    seq.record_command(canonical)       — append to buffer if recording
    seq.play(name) -> list[str] | None  — return commands to enqueue
    seq.list() -> list[str]             — saved names
    seq.delete(name) -> bool            — remove file
    seq.recording_state() -> dict       — for status / persona feedback
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import threading
|
|
import time
|
|
from typing import Optional, List
|
|
|
|
# Resolve Data/Sequences/ off PROJECT_ROOT so this module works whether
# Marcus is run from the repo root or invoked via systemd. PROJECT_ROOT is
# normally provided by Core/env_loader; if that import fails for any reason,
# fall back to the parent of the directory that contains this file.
try:
    from Core.env_loader import PROJECT_ROOT  # type: ignore
except Exception:
    PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

# Directory holding one `<name>.json` file per saved sequence.
_SEQUENCE_DIR = os.path.join(PROJECT_ROOT, "Data", "Sequences")

# A valid sequence name is 1-31 characters: an ASCII letter or digit followed
# by ASCII letters, digits, '_' or '-'.
#   * re.ASCII: without it, re.I makes [a-z] also match the non-ASCII letters
#     'İ', 'ı', 'ſ' and the Kelvin sign, which would leak into file names.
#   * \Z instead of $: `$` also matches just before a trailing newline, so
#     "abc\n" would otherwise be accepted.
_NAME_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{0,30}\Z", re.I | re.ASCII)
|
|
|
|
# Canonicals that are control commands rather than motions; record_command()
# refuses to capture anything found in _NEVER_RECORD.
# The values are loaded from
# Config/language_tables.json::sequence_never_record.canonicals, so excluding
# a new control canonical from recordings is a JSON edit, not a Python change.
from Voice._language_tables import sequence_never_record as _load_never_record

_NEVER_RECORD = _load_never_record()
|
|
|
|
|
|
def _safe_name(name: str) -> Optional[str]:
    """Return the canonical on-disk form of a sequence name, or None.

    The input is stripped, lower-cased and has each space replaced by '-',
    so that 'My-Greet' and 'my-greet' map to the same file (voice
    transcripts often vary capitalisation). The result must match
    `_NAME_RE`; an empty/None input or a non-matching result yields None.
    """
    if name:
        candidate = name.strip().lower().replace(" ", "-")
        if _NAME_RE.match(candidate):
            return candidate
    return None
|
|
|
|
|
|
class Sequences:
    """Recorder and on-disk store for named motion macros.

    Keeps the in-memory recording state (active flag, optional pre-set
    name, command buffer, start timestamp) and reads/writes
    `<name>.json` files under `_SEQUENCE_DIR`. Every access to the
    recording state happens under `self._lock`; `play`, `list` and
    `delete` only touch the filesystem and do not take the lock.
    """

    def __init__(self):
        """Start idle and make sure the sequence directory exists."""
        self._lock = threading.Lock()
        self._recording: bool = False
        self._record_name: Optional[str] = None
        self._record_buf: List[str] = []
        self._record_started_at: float = 0.0
        os.makedirs(_SEQUENCE_DIR, exist_ok=True)

    def _reset_locked(self) -> None:
        """Return to the idle state. The caller must hold `self._lock`."""
        self._recording = False
        self._record_name = None
        self._record_buf = []
        self._record_started_at = 0.0

    # ─── recording ─────────────────────────────────────────

    def start_recording(self, name: Optional[str] = None) -> dict:
        """Begin capturing commands.

        If `name` is given and passes `_safe_name`, the sequence is
        pre-named and `save_recording` can later be called with no
        argument. A name that fails validation is dropped and the
        recording starts unnamed.

        Returns `{"ok": True, "name": <name or None>}` on success, or
        `{"ok": False, "reason": "already recording", "name": ...}` if a
        recording is already in progress (that recording is left as is).
        """
        with self._lock:
            if self._recording:
                return {"ok": False, "reason": "already recording",
                        "name": self._record_name}
            self._recording = True
            self._record_name = _safe_name(name) if name else None
            self._record_buf = []
            self._record_started_at = time.time()
            return {"ok": True, "name": self._record_name}

    def cancel_recording(self) -> dict:
        """Drop the current recording without saving it.

        Returns `{"ok": True, "name": ..., "discarded": <buffer length>}`,
        or `{"ok": False, "reason": "not recording"}` when idle.
        """
        with self._lock:
            if not self._recording:
                return {"ok": False, "reason": "not recording"}
            n = self._record_name
            count = len(self._record_buf)
            self._reset_locked()
            return {"ok": True, "name": n, "discarded": count}

    def save_recording(self, name: Optional[str] = None) -> dict:
        """Persist the buffered commands as `<name>.json` and go idle.

        The name is resolved in this order:
          1. explicit `name` argument, if it passes `_safe_name`
          2. the name set by `start_recording`
          3. neither available → error; the recording stays active.

        An empty buffer is NOT saved (it would produce a dud macro); the
        recording is ended instead. If writing fails, the recording and
        its buffer are kept so the caller can retry.

        Returns `{"ok": True, "name", "count", "path"}` on success, or
        `{"ok": False, "reason": ...}` otherwise.
        """
        with self._lock:
            if not self._recording:
                return {"ok": False, "reason": "not recording"}
            target = _safe_name(name) or self._record_name
            if not target:
                return {"ok": False, "reason": "no name supplied"}
            if not self._record_buf:
                self._reset_locked()
                return {"ok": False,
                        "reason": "empty sequence (nothing recorded)",
                        "name": target}
            payload = {
                "name": target,
                "created_at": self._record_started_at,
                "saved_at": time.time(),
                "commands": list(self._record_buf),
            }
            path = os.path.join(_SEQUENCE_DIR, target + ".json")
            # Write to a sibling temp file and rename it into place, so a
            # failed or interrupted write can never truncate an existing
            # sequence of the same name. The ".tmp" suffix keeps the temp
            # file out of list(), which only reports "*.json".
            tmp_path = path + ".tmp"
            try:
                # The directory may have been removed since __init__.
                os.makedirs(_SEQUENCE_DIR, exist_ok=True)
                with open(tmp_path, "w", encoding="utf-8") as f:
                    json.dump(payload, f, indent=2, ensure_ascii=False)
                os.replace(tmp_path, path)
            except Exception as e:
                # Best-effort cleanup of a partial temp file.
                try:
                    os.remove(tmp_path)
                except OSError:
                    pass
                return {"ok": False, "reason": "write failed: %s" % e}
            count = len(self._record_buf)
            self._reset_locked()
            return {"ok": True, "name": target, "count": count, "path": path}

    def record_command(self, canonical: str) -> bool:
        """Append a canonical to the recording buffer (if recording).

        Returns True if appended, False otherwise. Nothing is appended
        when idle, when the stripped text is empty, or when its
        lower-cased form is in `_NEVER_RECORD` (control commands).
        """
        if not canonical:
            return False
        with self._lock:
            if not self._recording:
                return False
            c = canonical.strip()
            if not c or c.lower() in _NEVER_RECORD:
                return False
            self._record_buf.append(c)
            return True

    # ─── playback ─────────────────────────────────────────

    def play(self, name: str) -> Optional[List[str]]:
        """Load a saved sequence's commands.

        The caller is responsible for enqueueing them through the motion
        worker. Returns the non-blank command strings, or None if the
        name is invalid, the file does not exist, or its content is
        malformed (unreadable, not valid JSON, not a JSON object, or
        "commands" is not a list of strings).
        """
        target = _safe_name(name)
        if not target:
            return None
        path = os.path.join(_SEQUENCE_DIR, target + ".json")
        if not os.path.isfile(path):
            return None
        try:
            with open(path, "r", encoding="utf-8") as f:
                payload = json.load(f)
        except Exception:
            # Any read/parse failure counts as a malformed sequence.
            return None
        # Valid JSON whose top level is a list/string/number has no
        # "commands" key to read; treat it as malformed rather than raise.
        if not isinstance(payload, dict):
            return None
        cmds = payload.get("commands")
        if not isinstance(cmds, list) or not all(isinstance(c, str) for c in cmds):
            return None
        return [c for c in cmds if c.strip()]

    def list(self) -> List[str]:
        """Return the sorted names of saved sequences.

        A name is the stem of any `*.json` entry in `_SEQUENCE_DIR` that
        matches `_NAME_RE`. If the directory cannot be listed, the result
        is an empty list.
        """
        try:
            files = os.listdir(_SEQUENCE_DIR)
        except OSError:
            return []
        out = sorted(
            os.path.splitext(f)[0]
            for f in files
            if f.endswith(".json") and _NAME_RE.match(os.path.splitext(f)[0])
        )
        return out

    def delete(self, name: str) -> bool:
        """Remove a saved sequence file.

        Returns True if the file was removed; False if the name is
        invalid, the file does not exist, or removal failed.
        """
        target = _safe_name(name)
        if not target:
            return False
        path = os.path.join(_SEQUENCE_DIR, target + ".json")
        if not os.path.isfile(path):
            return False
        try:
            os.remove(path)
        except OSError:
            return False
        return True

    # ─── status ───────────────────────────────────────────

    def recording_state(self) -> dict:
        """Snapshot of the recording state for status / persona feedback.

        Keys: "recording" (bool), "name" (pre-set name or None), "count"
        (buffered commands) and "elapsed_sec" (seconds since the
        recording started, 0.0 when idle).
        """
        with self._lock:
            return {
                "recording": self._recording,
                "name": self._record_name,
                "count": len(self._record_buf),
                "elapsed_sec": (time.time() - self._record_started_at
                                if self._recording else 0.0),
            }
|
|
|
|
|
|
# One Sequences object per process: the dispatcher and the worker share it.
_INSTANCE: Optional[Sequences] = None
_INSTANCE_LOCK = threading.Lock()


def get_sequences() -> Sequences:
    """Return the process-wide `Sequences` instance, creating it on first use.

    Creation happens under `_INSTANCE_LOCK`, so concurrent first calls end
    up with the same object.
    """
    global _INSTANCE
    with _INSTANCE_LOCK:
        shared = _INSTANCE
        if shared is None:
            shared = _INSTANCE = Sequences()
    return shared
|
|
|
|
|
|
# Standalone smoke check: record three motions, save them, list, play back
# and delete the result, printing each step's return value.
if __name__ == "__main__":
    demo = Sequences()
    print(demo.start_recording("test-greet"))
    for motion in ("turn right 45 degrees", "wave hello", "turn left 45 degrees"):
        print(demo.record_command(motion))
    print(demo.save_recording())
    print("listed:", demo.list())
    print("play:", demo.play("test-greet"))
    print(demo.delete("test-greet"))
    print("after delete:", demo.list())
|