"""Robot arm controller — real DDS motor commands and JSONL motion replay. Production-grade replay engine ported from G1_Lootah/Controller/g1_replay_trigger_r2x.py. Features: body-lock, ramp-in/out interpolation, watchdog, speed control, CRC. Falls back gracefully to simulation when the Unitree SDK is unavailable. """ from __future__ import annotations import json import threading import time from dataclasses import dataclass from pathlib import Path from typing import Any from Project.Sanad.config import ( G1_NUM_MOTOR, KD_HIGH, KD_LOW, KD_WRIST, KP_HIGH, KP_LOW, KP_WRIST, MOTIONS_DIR, REPLAY_HZ, WEAK_MOTORS, WRIST_MOTORS, ) from Project.Sanad.core.config_loader import section as _cfg_section from Project.Sanad.core.event_bus import bus from Project.Sanad.core.logger import get_logger log = get_logger("arm_controller") _AC = _cfg_section("motion", "arm_controller") # G1 hardware constants — single source in config/core_config.json from Project.Sanad.config import ENABLE_ARM_SDK_INDEX RAMP_IN_STEPS = _AC.get("ramp_in_steps", 60) # ~1.0s smooth move to start pose RAMP_OUT_STEPS = _AC.get("ramp_out_steps", 180) # ~3.0s smooth return to home SETTLE_HOLD_SEC = _AC.get("settle_hold_sec", 0.5) # hold start pose before replay begins WATCHDOG_TIMEOUT = _AC.get("watchdog_timeout_sec", 0.25) # hold last pose if state stale WATCHDOG_DISABLE_AFTER = _AC.get("watchdog_disable_after_sec", 1.0) # abort if state stale this long ARM_INDICES = range( _AC.get("arm_indices_start", 15), _AC.get("arm_indices_stop", 29), ) # -- SDK import (optional) -- try: from unitree_sdk2py.core.channel import ( ChannelFactoryInitialize, ChannelPublisher, ChannelSubscriber, ) from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowCmd_, LowState_ from unitree_sdk2py.utils.crc import CRC # IDL factory — LowCmd_() with no args fails because the dataclass # has 5 required positional fields. The SDK ships a default factory # named `unitree_hg_msg_dds__LowCmd_` that constructs a fully-zeroed # instance with the right number of motor_cmd entries. try: from unitree_sdk2py.idl.default import unitree_hg_msg_dds__LowCmd_ _make_low_cmd = unitree_hg_msg_dds__LowCmd_ except ImportError: _make_low_cmd = None _HAS_SDK = True except ImportError: _HAS_SDK = False _make_low_cmd = None log.warning("Unitree SDK not available — running in simulation mode") # G1 arm-action client for built-in arm moves (wave, shake_hand, hug, …). # NOTE: do NOT use LocoClient here — LocoClient is the locomotion/body-move # client and its ExecuteAction() doesn't recognise arm-action IDs, so arm # commands become silent no-ops. The correct client is the arm-specific # G1ArmActionClient with the SDK's action_map (name → opcode lookup). try: from unitree_sdk2py.g1.arm.g1_arm_action_client import ( G1ArmActionClient, action_map as _ARM_ACTION_MAP, ) _HAS_ARM_CLIENT = True except ImportError: G1ArmActionClient = None _ARM_ACTION_MAP = {} _HAS_ARM_CLIENT = False @dataclass class Action: name: str id: int file: str = "" # JSONL filename (empty = SDK built-in) category: str = "sdk" # "sdk" | "jsonl" # -- SDK actions (fixed — built into Unitree firmware) -- SDK_ACTIONS: list[Action] = [ Action("release_arm", 0, category="sdk"), Action("shake_hand", 1, category="sdk"), Action("high_five", 2, category="sdk"), Action("hug", 3, category="sdk"), Action("high_wave", 4, category="sdk"), Action("clap", 5, category="sdk"), Action("face_wave", 6, category="sdk"), Action("left_kiss", 7, category="sdk"), Action("heart", 8, category="sdk"), Action("right_heart", 9, category="sdk"), Action("hands_up", 10, category="sdk"), Action("x_ray", 11, category="sdk"), Action("right_hand_up", 12, category="sdk"), Action("reject", 13, category="sdk"), Action("right_kiss", 14, category="sdk"), Action("two_hand_kiss", 15, category="sdk"), ] # Next auto-ID for JSONL actions starts after SDK range. _JSONL_ID_START = _AC.get("jsonl_id_start", 100) def _scan_jsonl_actions() -> list[Action]: """Auto-discover all .jsonl files in data/motions/ and create actions. Called at startup and whenever the dashboard requests a refresh. The action name is derived from the filename (without extension), with underscores replacing hyphens/spaces. """ MOTIONS_DIR.mkdir(parents=True, exist_ok=True) actions = [] for idx, path in enumerate(sorted(MOTIONS_DIR.glob("*.jsonl"))): name = path.stem.replace("-", "_").replace(" ", "_") actions.append(Action( name=name, id=_JSONL_ID_START + idx, file=path.name, category="jsonl", )) return actions def rebuild_action_registry() -> tuple[list[Action], dict[int, Action], dict[str, Action]]: """Rebuild the full action list from SDK + disk scan. Called on startup and refresh.""" jsonl_actions = _scan_jsonl_actions() all_actions = list(SDK_ACTIONS) + jsonl_actions by_id = {a.id: a for a in all_actions} by_name = {a.name: a for a in all_actions} log.info("Action registry: %d SDK + %d JSONL = %d total", len(SDK_ACTIONS), len(jsonl_actions), len(all_actions)) return all_actions, by_id, by_name # Initial build ACTIONS, ACTION_BY_ID, ACTION_BY_NAME = rebuild_action_registry() def _lerp_q(q_start: list[float], q_end: list[float], t: float) -> list[float]: """Linear interpolation between two joint-position vectors, t in [0,1].""" return [a + (b - a) * t for a, b in zip(q_start, q_end)] def _load_frames(path: Path) -> list[dict[str, Any]]: """Read JSONL file, return list of frames with 't' and 'q' keys.""" frames = [] with open(path, "r") as f: for line in f: line = line.strip() if not line: continue data = json.loads(line) if "q" in data: frames.append(data) return frames def _load_home_q(home_file: str = "arm_home.jsonl") -> list[float] | None: path = MOTIONS_DIR / home_file if not path.exists(): return None frames = _load_frames(path) return frames[0]["q"] if frames else None class ArmController: """Thread-safe arm controller with real DDS replay and simulation fallback.""" def __init__(self): self._lock = threading.Lock() self._cancel = threading.Event() self._is_busy = False self._last_action_time = 0.0 self.cooldown_sec = 1.0 self._initialized = False # DDS handles (set in init()) self._arm_pub = None self._state_sub = None self._low_cmd = None self._crc = None self._arm_client = None # Live state from LowState_ subscriber self._current_q: list[float] = [0.0] * G1_NUM_MOTOR self._last_state_time = 0.0 self._state_lock = threading.Lock() self._first_state_event = threading.Event() # Cached motion file metadata self._motion_files_cache: dict[str, dict[str, Any]] = {} # -- initialization -- def init(self, network_interface: str = "lo") -> bool: if self._initialized: return True if not _HAS_SDK: log.info("Simulation mode — DDS init skipped") return False try: ChannelFactoryInitialize(0, network_interface) self._arm_pub = ChannelPublisher("rt/arm_sdk", LowCmd_) self._arm_pub.Init() self._state_sub = ChannelSubscriber("rt/lowstate", LowState_) self._state_sub.Init(self._on_low_state, 10) # IDL types need the SDK's default factory — bare LowCmd_() fails # because the dataclass has 5 required positional fields. if _make_low_cmd is not None: self._low_cmd = _make_low_cmd() else: # Last-resort: try a few constructor signatures try: self._low_cmd = LowCmd_() except TypeError: # Build with explicit zeroed fields from unitree_sdk2py.idl.unitree_hg.msg.dds_ import MotorCmd_ try: from unitree_sdk2py.idl.default import ( unitree_hg_msg_dds__MotorCmd_ as _make_motor_cmd, ) except ImportError: _make_motor_cmd = lambda: MotorCmd_( mode=0, q=0.0, dq=0.0, tau=0.0, kp=0.0, kd=0.0, reserve=0, ) self._low_cmd = LowCmd_( mode_pr=0, mode_machine=0, motor_cmd=[_make_motor_cmd() for _ in range(35)], reserve=[0, 0, 0, 0], crc=0, ) self._crc = CRC() # Arm-specific action client for built-in moves if _HAS_ARM_CLIENT: try: self._arm_client = G1ArmActionClient() self._arm_client.SetTimeout(10.0) self._arm_client.Init() log.info("G1ArmActionClient initialized (%d actions) — built-in moves available", len(_ARM_ACTION_MAP)) except Exception as exc: log.warning("G1ArmActionClient init failed: %s — built-in actions disabled", exc) self._arm_client = None self._initialized = True log.info("DDS initialized on %s", network_interface) except Exception as exc: log.error("DDS init failed: %s", exc) return self._initialized def _on_low_state(self, msg): """Callback from DDS subscriber — updates current joint positions.""" with self._state_lock: self._current_q = [float(msg.motor_state[i].q) for i in range(G1_NUM_MOTOR)] self._last_state_time = time.monotonic() if not self._first_state_event.is_set(): self._first_state_event.set() def wait_for_state(self, timeout: float = 2.0) -> bool: """Block until first LowState_ callback fires (or timeout). Returns True if state received.""" return self._first_state_event.wait(timeout=timeout) # -- internal API exposed for teaching/macro_player (encapsulation boundary) -- def get_current_q(self) -> list[float]: """Public read of current joint positions.""" return self._get_current_q() def send_frame(self, arm_target_q: list[float], body_lock_q: list[float]): """Public single-frame send. Use only inside a controlled playback loop.""" self._send_frame(arm_target_q, body_lock_q) def disable(self): """Public disable — releases arm SDK control.""" self._disable_sdk() def state_age(self) -> float: """Seconds since last LowState_ callback.""" return self._state_age() def _get_current_q(self) -> list[float]: with self._state_lock: return list(self._current_q) def _state_age(self) -> float: with self._state_lock: return time.monotonic() - self._last_state_time if self._last_state_time else 999.0 # -- frame sending (real DDS with CRC) -- def _send_frame(self, arm_target_q: list[float], body_lock_q: list[float]): """Send one motor frame via DDS. Body stays locked, arms get target.""" if not self._initialized or self._low_cmd is None: return # Enable ARM_SDK self._low_cmd.motor_cmd[ENABLE_ARM_SDK_INDEX].q = 1.0 for i in range(G1_NUM_MOTOR): self._low_cmd.motor_cmd[i].mode = 1 self._low_cmd.motor_cmd[i].dq = 0.0 self._low_cmd.motor_cmd[i].tau = 0.0 # Arms get replay data, body stays locked if i >= 15: self._low_cmd.motor_cmd[i].q = arm_target_q[i] else: self._low_cmd.motor_cmd[i].q = body_lock_q[i] # Per-motor gains if i in WEAK_MOTORS: kp, kd = KP_LOW, KD_LOW elif i in WRIST_MOTORS: kp, kd = KP_WRIST, KD_WRIST else: kp, kd = KP_HIGH, KD_HIGH self._low_cmd.motor_cmd[i].kp = kp self._low_cmd.motor_cmd[i].kd = kd self._low_cmd.crc = self._crc.Crc(self._low_cmd) self._arm_pub.Write(self._low_cmd) def _disable_sdk(self): """Send 10 disable frames at 50 Hz — direct port of g1_replay_v4_stable.py:DisableSDK (lines 141-147).""" if not self._initialized or self._low_cmd is None: return self._low_cmd.motor_cmd[ENABLE_ARM_SDK_INDEX].q = 0.0 self._low_cmd.crc = self._crc.Crc(self._low_cmd) for _ in range(10): self._arm_pub.Write(self._low_cmd) time.sleep(0.02) # -- replay engine -- def replay_file(self, filepath: str, speed: float = 1.0): """Play a JSONL motion file with full production replay logic. Args: filepath: Path to .jsonl motion file. speed: Playback speed multiplier (1.0 = normal). """ with self._lock: if self._is_busy: log.warning("replay_file: arm busy, skipping") return self._is_busy = True self._cancel.clear() try: self._replay_file_inner(filepath, speed) finally: with self._lock: self._is_busy = False self._last_action_time = time.monotonic() def _replay_file_inner(self, filepath: str, speed: float = 1.0): """One-for-one port of g1_replay_v4_stable.py:ReplayWithHome.Run(). Five phases — timing and math match the reference exactly: 1. Wait for first LowState_ message (no body-lock from zeros). 2. Load data: home_q (last valid frame of arm_home.jsonl), full_body_lock_q (live snapshot), and the replay frames. 3. MOVE TO START — 60 steps at 60 Hz, alpha = k/steps (starts at 0 = exact current pose, ends at 59/60 just shy of target). 3b. SETTLE HOLD — replaces the reference's human `input("Press Enter to Begin")` pause; gives the physical motors time to reach the commanded start pose before playback so the first real frames don't jerk. 4. PLAY — `for f in frames: if f['t']-t0 >= play_elapsed` frame-select pattern, fixed 1/REPLAY_HZ sleep per iteration. 5. RETURN HOME — 180 steps alpha = k/steps from last_played_q to home_q, body locked. Then DisableSDK (10 frames). """ path = Path(filepath) if not path.is_absolute(): path = MOTIONS_DIR / path if not path.exists(): raise FileNotFoundError(f"Motion file not found: {path}") frames = _load_frames(path) if not frames: log.warning("Empty motion file: %s", path) return if not _HAS_SDK: duration = len(frames) / REPLAY_HZ / speed if speed else len(frames) / REPLAY_HZ log.info("[SIM] Replay %s (%.1fs, %d frames, speed=%.1f)", path.name, duration, len(frames), speed) self._sim_replay(frames, speed) return log.info("Replay %s (%d frames @ %.0f Hz, speed=%.1f)", path.name, len(frames), REPLAY_HZ, speed) # ─── 1. Wait for first LowState ───────────────────────── if not self._first_state_event.is_set(): log.warning("Waiting for first LowState message...") if not self._first_state_event.wait(timeout=2.0): log.error("No LowState received in 2s — refusing to replay (would lock body to zeros)") return # ─── 2. Load data (ref lines 154-166) ─────────────────── home_q = _load_home_q() or [0.0] * G1_NUM_MOTOR full_body_lock_q = self._get_current_q() # snapshot live state interval = 1.0 / REPLAY_HZ file_start_q = frames[0]["q"] # ─── 3. MOVE TO START (ref lines 171-181) ─────────────── log.debug("Moving to start (%d steps)", RAMP_IN_STEPS) for k in range(RAMP_IN_STEPS): if self._cancel.is_set(): self._return_home(full_body_lock_q, full_body_lock_q, home_q) return alpha = k / RAMP_IN_STEPS # 0 .. (RAMP_IN_STEPS-1)/RAMP_IN_STEPS interp_q = list(full_body_lock_q) for j in range(15, G1_NUM_MOTOR): interp_q[j] = (1 - alpha) * full_body_lock_q[j] + alpha * file_start_q[j] self._send_frame(interp_q, full_body_lock_q) time.sleep(interval) # ─── 3b. SETTLE HOLD — replaces reference's Enter pause ─ settle_frames = max(0, int(SETTLE_HOLD_SEC * REPLAY_HZ)) if settle_frames > 0: log.debug("Settle hold (%d frames / %.2fs)", settle_frames, SETTLE_HOLD_SEC) for _ in range(settle_frames): if self._cancel.is_set(): self._return_home(file_start_q, full_body_lock_q, home_q) return self._send_frame(file_start_q, full_body_lock_q) time.sleep(interval) # ─── 4. PLAY (ref lines 183-234) ──────────────────────── log.debug("Playing %d frames", len(frames)) last_played_q = file_start_q play_elapsed = 0.0 last_real = time.monotonic() t0 = frames[0].get("t", 0.0) while True: if self._cancel.is_set(): break # Watchdog — abort if LowState goes stale age = self._state_age() if age > WATCHDOG_DISABLE_AFTER: log.error("Watchdog abort — LowState stale %.2fs", age) self._disable_sdk() return now_real = time.monotonic() dt_real = now_real - last_real last_real = now_real play_elapsed += dt_real * speed # Pick the next frame whose timestamp has elapsed (reference pattern) target_frame = None for f in frames: if f.get("t", 0.0) - t0 >= play_elapsed: target_frame = f break if target_frame is None: break self._send_frame(target_frame["q"], full_body_lock_q) last_played_q = target_frame["q"] time.sleep(interval) # ─── 5. RETURN HOME (ref lines 239-256) + DisableSDK ──── self._return_home(last_played_q, full_body_lock_q, home_q) def _return_home(self, from_q: list[float], body_lock_q: list[float], home_q: list[float]): """Smooth return to home — direct port of g1_replay_v4_stable.py:239-256. 180 steps × (1/60)s = 3s linear ramp on arm motors only (indices 15-28); body motors (0-14) stay locked to `body_lock_q`. Then DisableSDK sends 10 disable-bit frames at 50 Hz. IMPORTANT: the reference's return-home is unconditional — it always runs to completion regardless of why the play loop ended (natural end OR 'q' press). We clear `_cancel` at entry so a user-hit Cancel (which set `_cancel` to break the play loop) doesn't also abort the return ramp. Without this, the arm "snaps" home because the loop exits on the first iteration. """ self._cancel.clear() log.info("Returning home (%d steps / %.1fs)", RAMP_OUT_STEPS, RAMP_OUT_STEPS / REPLAY_HZ) interval = 1.0 / REPLAY_HZ for k in range(RAMP_OUT_STEPS): alpha = k / RAMP_OUT_STEPS # 0 .. (RAMP_OUT_STEPS-1)/RAMP_OUT_STEPS interp_q = list(from_q) for j in range(15, G1_NUM_MOTOR): interp_q[j] = (1 - alpha) * from_q[j] + alpha * home_q[j] self._send_frame(interp_q, body_lock_q) time.sleep(interval) self._disable_sdk() log.info("Home reached, SDK disabled") def _sim_replay(self, frames: list[dict], speed: float): """Simulation replay — emit events, sleep for equivalent duration.""" interval = 1.0 / REPLAY_HZ for i, frame in enumerate(frames): if self._cancel.is_set(): log.info("[SIM] Replay cancelled at frame %d/%d", i, len(frames)) return time.sleep(interval / max(speed, 0.1)) log.info("[SIM] Replay complete") # -- public API -- @property def is_busy(self) -> bool: return self._is_busy def cancel(self): """Cancel the currently running replay. Sets the cancel flag — the play loop in `_replay_file_inner` checks this and breaks out; `_return_home` then runs as the final phase of the same replay (matches the reference's end-of-Run behaviour in g1_replay_v4_stable.py). """ self._cancel.set() def _unused_return_to_home(self, duration_sec: float = 3.0, home_file: str = "arm_home.jsonl") -> None: """Deprecated — replay's own `_return_home` is called automatically when cancel breaks the play loop. Kept here only to preserve any external caller; no new code should use this. """ if not self._initialized or self._low_cmd is None: log.warning("return_to_home: arm controller not initialised") return if not self._first_state_event.wait(timeout=2.0): log.error("return_to_home: no LowState received in 2s — aborting") return home_path = MOTIONS_DIR / home_file if not home_path.exists(): log.warning("return_to_home: %s missing — skipping", home_path.name) return # Use the LAST valid 'q' in the file as the settle pose home_q: list[float] | None = None try: for frame in _load_frames(home_path): q = frame.get("q") if q and len(q) == G1_NUM_MOTOR: home_q = q except Exception as exc: log.warning("return_to_home: reading %s failed: %s", home_path.name, exc) return if home_q is None: log.warning("return_to_home: %s has no valid 'q' frames", home_path.name) return with self._state_lock: start_q = list(self._current_q) body_lock_q = list(start_q) # Let the ramp publish frames even though we just cancelled self._cancel.clear() with self._lock: if self._is_busy: # A pending replay is still winding down — wait a beat log.debug("return_to_home: arm busy, waiting briefly") self._is_busy = True try: steps = max(30, int(duration_sec * REPLAY_HZ)) # ≥ 0.5s ramp dt = 1.0 / REPLAY_HZ log.info("return_to_home: ramp %d steps (%.1fs) → %s", steps, duration_sec, home_file) for k in range(steps): if self._cancel.is_set(): log.info("return_to_home: cancelled mid-ramp") break alpha = (k + 1) / steps interp_q = list(body_lock_q) for j in range(15, G1_NUM_MOTOR): interp_q[j] = (1 - alpha) * start_q[j] + alpha * home_q[j] self._send_frame(interp_q, body_lock_q) time.sleep(dt) log.info("return_to_home: done") finally: with self._lock: self._is_busy = False def refresh_actions(self): """Re-scan data/motions/ and rebuild the action registry.""" global ACTIONS, ACTION_BY_ID, ACTION_BY_NAME ACTIONS, ACTION_BY_ID, ACTION_BY_NAME = rebuild_action_registry() def list_actions(self) -> list[dict[str, Any]]: return [ {"id": a.id, "name": a.name, "file": a.file, "category": a.category} for a in ACTIONS ] def list_motion_files(self) -> list[dict[str, Any]]: """List all JSONL files in data/motions/ with metadata. Caches frame count by (path, mtime) to avoid re-parsing megabytes of JSONL on every dashboard refresh. """ MOTIONS_DIR.mkdir(parents=True, exist_ok=True) result = [] for p in sorted(MOTIONS_DIR.glob("*.jsonl")): stat = p.stat() cache_key = f"{p}:{stat.st_mtime_ns}" cached = self._motion_files_cache.get(cache_key) if cached is None: frames = _load_frames(p) duration = len(frames) / REPLAY_HZ if frames else 0 cached = { "name": p.name, "path": str(p), "frames": len(frames), "duration_sec": round(duration, 2), "size_kb": round(stat.st_size / 1024, 1), } # Drop stale entries for this path before adding new one stale = [k for k in self._motion_files_cache if k.startswith(f"{p}:")] for k in stale: self._motion_files_cache.pop(k, None) self._motion_files_cache[cache_key] = cached result.append(cached) return result def trigger_by_id(self, action_id: int, speed: float = 1.0): action = ACTION_BY_ID.get(action_id) if action is None: raise KeyError(f"Unknown action id: {action_id}") self._execute(action, speed) def trigger_by_name(self, name: str, speed: float = 1.0): action = ACTION_BY_NAME.get(name) if action is None: raise KeyError(f"Unknown action: {name}") self._execute(action, speed) def _execute(self, action: Action, speed: float = 1.0): with self._lock: if self._is_busy: log.warning("Arm busy, skipping %s", action.name) return self._is_busy = True self._cancel.clear() try: bus.emit_sync("motion.action_started", action=action.name) if action.file: self._replay_file_inner(action.file, speed=speed) else: self._run_sdk_action(action) except Exception as exc: log.error("Action %s failed: %s", action.name, exc) finally: with self._lock: self._is_busy = False self._last_action_time = time.monotonic() bus.emit_sync("motion.action_done", action=action.name) def _run_sdk_action(self, action: Action): if not _HAS_SDK: log.info("[SIM] SDK action: %s (id=%d)", action.name, action.id) time.sleep(2.0) return if self._arm_client is None: log.warning( "SDK action %s requested but G1ArmActionClient not available — skipping", action.name, ) return # Sanad's registry uses underscored names ("shake_hand", "x_ray"); # the SDK's action_map is keyed by human-readable forms that mix # spaces and hyphens ("shake hand", "x-ray", "two-hand kiss"). # Try each candidate in turn. name = action.name candidates = [ name, name.replace("_", " "), # shake_hand → shake hand name.replace("_", "-"), # x_ray → x-ray # two-word with specific hyphenation: first token with hyphen, # rest with spaces (matches SDK's "two-hand kiss" pattern) name.replace("_", "-", 1).replace("_", " "), ] sdk_name = next((c for c in candidates if c in _ARM_ACTION_MAP), None) if sdk_name is None: log.warning( "SDK action %s not in G1ArmActionClient action_map — tried %s. keys=%s", action.name, candidates, sorted(_ARM_ACTION_MAP.keys())[:12], ) return opcode = _ARM_ACTION_MAP[sdk_name] log.info("SDK action: %s (opcode=%s)", action.name, opcode) try: self._arm_client.ExecuteAction(opcode) # Built-in arm actions block on the robot side for ~3s; the SDK # call returns immediately. Sleep so we don't hammer it back-to-back. time.sleep(3.0) except Exception as exc: log.error("SDK action %s failed: %s", action.name, exc) def status(self) -> dict[str, Any]: return { "initialized": self._initialized, "sdk_available": _HAS_SDK, "busy": self._is_busy, "state_age_sec": round(self._state_age(), 3), "sdk_actions": len(SDK_ACTIONS), "jsonl_actions": len([a for a in ACTIONS if a.category == "jsonl"]), "total_actions": len(ACTIONS), "total_motion_files": len(list(MOTIONS_DIR.glob("*.jsonl"))) if MOTIONS_DIR.exists() else 0, }