Marcus/Core/motion_log.py

219 lines
9.7 KiB
Python

"""
motion_log.py — structured per-motion event log.
Drop-in: brain handlers and worker call log_motion(event, cmd, ...)
and the helper auto-parses direction/magnitude/unit from the canonical
string. The worker barely needs to know what it's logging.
Every motion the brain or worker executes writes ONE line to
`logs/motion.log` capturing:
timestamp | canonical | direction mag unit | target | actual | outcome | source
So a single voice command produces a paired START + END line:
17:45:01 | turn left 19 degrees | dir=left mag=19 unit=deg | target=2.4s | - | start | voice
17:45:03 | turn left 19 degrees | dir=left mag=19 unit=deg | target=2.4s | actual=2.41s | complete | voice
Or for a parameterless motion:
17:46:00 | turn right | dir=right mag=- unit=- | target=2.0s | - | start | voice
17:46:02 | turn right | dir=right mag=- unit=- | target=2.0s | actual=2.02s | complete | voice
This is independent of voice.log and transcript.log:
- voice.log = per-thread debug detail (dispatch, worker, watchdog,
reconnect)
- transcript.log = user-facing dialogue (what user said vs. what
Gemini said vs. what canonical fired)
- motion.log = ONE LINE PER MOTION with parsed params + actual
outcome — for post-hoc auditing of physical movements
API:
log_motion(event, canonical, direction=..., magnitude=..., unit=...,
target_sec=..., actual_sec=..., source="voice"|"text",
outcome="start"|"complete"|"interrupted"|"error", **extra)
Events (event field):
'start' — brain handler is about to issue motor commands.
target_sec is the planned duration; actual is None.
'complete' — brain returned normally; actual_sec set.
'interrupted' — motion_abort fired during execution.
'error' — brain raised an exception; reason in extra.
"""
from __future__ import annotations
import logging
import os
import re
from logging.handlers import RotatingFileHandler
# ─── canonical parser ─────────────────────────────────────────────
#
# Map a canonical command string to (direction, magnitude, unit) so
# the structured log auto-fills these fields when the caller doesn't
# pass them explicitly.
_RE_TURN_DEG_DIR = re.compile(r"^turn\s+(left|right)\s+(\d+(?:\.\d+)?)\s*deg(?:rees?)?$", re.I)
_RE_TURN_DEG = re.compile(r"^turn\s+(\d+(?:\.\d+)?)\s*deg(?:rees?)?$", re.I)
_RE_TURN_STEP_DIR = re.compile(r"^turn\s+(left|right)\s+(\d+(?:\.\d+)?)\s*steps?$", re.I)
_RE_TURN_DIR = re.compile(r"^turn\s+(left|right|around)$", re.I)
_RE_WALK_STEP = re.compile(r"^walk\s+(?:(forward|back(?:ward)?)\s+)?(\d+(?:\.\d+)?)\s*steps?$", re.I)
_RE_WALK_M = re.compile(r"^walk\s+(?:(forward|back(?:ward)?)\s+)?(\d+(?:\.\d+)?)\s*m(?:eter(?:s)?)?$", re.I)
_RE_MOVE_DIR = re.compile(r"^move\s+(forward|back(?:ward)?|left|right)$", re.I)
_RE_WALK_TO = re.compile(r"^walk\s+to\s+(.+)$", re.I)
def parse_canonical(cmd: str) -> dict:
"""Parse a canonical command into structured motion fields.
Returns {direction, magnitude, unit, kind}. Best-effort — if the
canonical doesn't match any known shape, all fields are '-' and
kind='other'."""
out = {"direction": "-", "magnitude": "-", "unit": "-", "kind": "other"}
if not cmd:
return out
s = cmd.strip().lower()
m = _RE_TURN_DEG_DIR.match(s)
if m:
return {"direction": m.group(1), "magnitude": m.group(2),
"unit": "deg", "kind": "turn"}
m = _RE_TURN_DEG.match(s)
if m:
return {"direction": "-", "magnitude": m.group(1),
"unit": "deg", "kind": "turn"}
m = _RE_TURN_STEP_DIR.match(s)
if m:
return {"direction": m.group(1), "magnitude": m.group(2),
"unit": "step", "kind": "turn"}
m = _RE_TURN_DIR.match(s)
if m:
return {"direction": m.group(1), "magnitude": "-",
"unit": "-", "kind": "turn"}
m = _RE_WALK_STEP.match(s)
if m:
d = (m.group(1) or "forward").lower()
if d.startswith("back"): d = "backward"
return {"direction": d, "magnitude": m.group(2),
"unit": "step", "kind": "walk"}
m = _RE_WALK_M.match(s)
if m:
d = (m.group(1) or "forward").lower()
if d.startswith("back"): d = "backward"
return {"direction": d, "magnitude": m.group(2),
"unit": "m", "kind": "walk"}
m = _RE_MOVE_DIR.match(s)
if m:
d = m.group(1).lower()
if d.startswith("back"): d = "backward"
return {"direction": d, "magnitude": "-",
"unit": "-", "kind": "move"}
m = _RE_WALK_TO.match(s)
if m:
return {"direction": "tracked", "magnitude": m.group(1).strip(),
"unit": "target", "kind": "walk_to"}
# Bare canonicals
if s == "stop": return {"direction": "-", "magnitude": "-", "unit": "-", "kind": "stop"}
if s == "sit down": return {"direction": "-", "magnitude": "-", "unit": "-", "kind": "posture"}
if s == "stand up": return {"direction": "-", "magnitude": "-", "unit": "-", "kind": "posture"}
if s == "wave hello":return {"direction": "-", "magnitude": "-", "unit": "-", "kind": "arm"}
if s == "raise arm": return {"direction": "right", "magnitude": "-", "unit": "-", "kind": "arm"}
if s == "lower arm": return {"direction": "right", "magnitude": "-", "unit": "-", "kind": "arm"}
if s == "come here": return {"direction": "tracked", "magnitude": "-", "unit": "yolo", "kind": "come"}
if s == "follow me": return {"direction": "tracked", "magnitude": "-", "unit": "yolo", "kind": "follow"}
if s == "look around":return {"direction": "scan", "magnitude": "-", "unit": "-", "kind": "look"}
if s == "stay here": return {"direction": "-", "magnitude": "-", "unit": "-", "kind": "stay"}
if s == "go home": return {"direction": "home", "magnitude": "-", "unit": "-", "kind": "nav"}
if s == "patrol": return {"direction": "scan", "magnitude": "-", "unit": "-", "kind": "patrol"}
if s == "pause motion": return {"direction": "-", "magnitude": "-", "unit": "-", "kind": "pause"}
if s == "resume motion": return {"direction": "-", "magnitude": "-", "unit": "-", "kind": "resume"}
return out
try:
from Core.env_loader import PROJECT_ROOT # type: ignore
except Exception:
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
_LOG_DIR = os.path.join(PROJECT_ROOT, "logs")
os.makedirs(_LOG_DIR, exist_ok=True)
_LOG_PATH = os.path.join(_LOG_DIR, "motion.log")
_logger = logging.getLogger("marcus.motion")
_logger.setLevel(logging.INFO)
_logger.propagate = False
if not _logger.handlers:
_h = RotatingFileHandler(
_LOG_PATH, maxBytes=2_000_000, backupCount=3, encoding="utf-8",
)
# Use pipe-separator format so the file is grep/awk friendly.
_h.setFormatter(logging.Formatter("%(asctime)s | %(message)s"))
_logger.addHandler(_h)
def log_motion(event: str,
canonical: str,
direction=None,
magnitude=None,
unit=None,
target_sec=None,
actual_sec=None,
source: str = "voice",
**extra) -> None:
"""Emit one structured line describing a motion event.
direction/magnitude/unit auto-fill from parse_canonical(canonical)
if not passed — so the worker just calls log_motion('start', cmd)
and the parser handles the rest. Override any field by passing it
explicitly (useful when the brain knows odom-measured actuals).
Fields render with fixed width for grep/awk friendliness. Extra
kwargs become trailing 'k=v' tokens.
"""
def _f(v):
if v is None: return "-"
if isinstance(v, float): return "{:.2f}".format(v)
return str(v)
canonical = (canonical or "").strip() or "-"
parsed = parse_canonical(canonical) if canonical != "-" else {}
direction = direction if direction is not None else parsed.get("direction", "-")
magnitude = magnitude if magnitude is not None else parsed.get("magnitude", "-")
unit = unit if unit is not None else parsed.get("unit", "-")
parts = [
"{:35s}".format(canonical[:35]),
"dir={:8s}".format(_f(direction)[:8]),
"mag={:8s}".format(_f(magnitude)[:8]),
"unit={:6s}".format(_f(unit)[:6]),
"target={:>7s}".format(_f(target_sec) + "s" if target_sec is not None else "-"),
"actual={:>7s}".format(_f(actual_sec) + "s" if actual_sec is not None else "-"),
"{:11s}".format(event[:11]),
"{:5s}".format(source[:5]),
]
if extra:
parts.append(" ".join("{}={}".format(k, _f(v)) for k, v in extra.items()))
_logger.info(" | ".join(parts))
# Convenience tee so console + log show the same one-liner
def print_motion(event: str, canonical: str, **kw) -> None:
"""Like log_motion but ALSO prints to stdout so the operator sees
the line in the terminal dashboard. Use for the high-traffic
'start' and 'complete' events; use log_motion alone for the noisier
intermediate events."""
log_motion(event, canonical, **kw)
direction = kw.get("direction", "-")
magnitude = kw.get("magnitude", "-")
unit = kw.get("unit", "-")
target = kw.get("target_sec")
actual = kw.get("actual_sec")
src = kw.get("source", "voice")
line = " [Motion {:9s}] {:30s} dir={} mag={} unit={}".format(
event, canonical[:30], direction, magnitude, unit,
)
if target is not None:
line += " target={:.2f}s".format(float(target))
if actual is not None:
line += " actual={:.2f}s".format(float(actual))
print(line)