219 lines
9.7 KiB
Python
219 lines
9.7 KiB
Python
"""
|
|
motion_log.py — structured per-motion event log.
|
|
|
|
Drop-in: brain handlers and worker call log_motion(event, cmd, ...)
|
|
and the helper auto-parses direction/magnitude/unit from the canonical
|
|
string. The worker barely needs to know what it's logging.
|
|
|
|
Every motion the brain or worker executes writes ONE line to
|
|
`logs/motion.log` capturing:
|
|
|
|
timestamp | canonical | direction mag unit | target | actual | outcome | source
|
|
|
|
So a single voice command produces a paired START + END line:
|
|
|
|
17:45:01 | turn left 19 degrees | dir=left mag=19 unit=deg | target=2.4s | - | start | voice
|
|
17:45:03 | turn left 19 degrees | dir=left mag=19 unit=deg | target=2.4s | actual=2.41s | complete | voice
|
|
|
|
Or for a parameterless motion:
|
|
|
|
17:46:00 | turn right | dir=right mag=- unit=- | target=2.0s | - | start | voice
|
|
17:46:02 | turn right | dir=right mag=- unit=- | target=2.0s | actual=2.02s | complete | voice
|
|
|
|
This is independent of voice.log and transcript.log:
|
|
- voice.log = per-thread debug detail (dispatch, worker, watchdog,
|
|
reconnect)
|
|
- transcript.log = user-facing dialogue (what user said vs. what
|
|
Gemini said vs. what canonical fired)
|
|
- motion.log = ONE LINE PER MOTION with parsed params + actual
|
|
outcome — for post-hoc auditing of physical movements
|
|
|
|
API:
|
|
    log_motion(event, canonical, direction=..., magnitude=..., unit=...,
               target_sec=..., actual_sec=..., source="voice"|"text",
               **extra)
    where event is "start"|"complete"|"interrupted"|"error" (it fills the
    outcome column of the log line) and **extra becomes trailing k=v tokens.
|
|
|
|
Events (event field):
|
|
'start' — brain handler is about to issue motor commands.
|
|
target_sec is the planned duration; actual is None.
|
|
'complete' — brain returned normally; actual_sec set.
|
|
'interrupted' — motion_abort fired during execution.
|
|
'error' — brain raised an exception; reason in extra.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
import re
|
|
from logging.handlers import RotatingFileHandler
|
|
|
|
|
|
# ─── canonical parser ─────────────────────────────────────────────
|
|
#
|
|
# Map a canonical command string to (direction, magnitude, unit) so
|
|
# the structured log auto-fills these fields when the caller doesn't
|
|
# pass them explicitly.
|
|
|
|
_RE_TURN_DEG_DIR = re.compile(r"^turn\s+(left|right)\s+(\d+(?:\.\d+)?)\s*deg(?:rees?)?$", re.I)
_RE_TURN_DEG = re.compile(r"^turn\s+(\d+(?:\.\d+)?)\s*deg(?:rees?)?$", re.I)
_RE_TURN_STEP_DIR = re.compile(r"^turn\s+(left|right)\s+(\d+(?:\.\d+)?)\s*steps?$", re.I)
_RE_TURN_DIR = re.compile(r"^turn\s+(left|right|around)$", re.I)
_RE_WALK_STEP = re.compile(r"^walk\s+(?:(forward|back(?:ward)?)\s+)?(\d+(?:\.\d+)?)\s*steps?$", re.I)
_RE_WALK_M = re.compile(r"^walk\s+(?:(forward|back(?:ward)?)\s+)?(\d+(?:\.\d+)?)\s*m(?:eter(?:s)?)?$", re.I)
_RE_MOVE_DIR = re.compile(r"^move\s+(forward|back(?:ward)?|left|right)$", re.I)
_RE_WALK_TO = re.compile(r"^walk\s+to\s+(.+)$", re.I)

# Parameterless canonicals -> (direction, unit, kind). Magnitude is always "-".
_BARE_CANONICALS = {
    "stop": ("-", "-", "stop"),
    "sit down": ("-", "-", "posture"),
    "stand up": ("-", "-", "posture"),
    "wave hello": ("-", "-", "arm"),
    "raise arm": ("right", "-", "arm"),
    "lower arm": ("right", "-", "arm"),
    "come here": ("tracked", "yolo", "come"),
    "follow me": ("tracked", "yolo", "follow"),
    "look around": ("scan", "-", "look"),
    "stay here": ("-", "-", "stay"),
    "go home": ("home", "-", "nav"),
    "patrol": ("scan", "-", "patrol"),
    "pause motion": ("-", "-", "pause"),
    "resume motion": ("-", "-", "resume"),
}


def _motion_fields(direction: str = "-", magnitude: str = "-",
                   unit: str = "-", kind: str = "other") -> dict:
    """Build a fresh result dict (callers may mutate it safely)."""
    return {"direction": direction, "magnitude": magnitude,
            "unit": unit, "kind": kind}


def _walk_direction(raw) -> str:
    """Normalise a walk/move direction: missing -> forward, back* -> backward."""
    direction = (raw or "forward").lower()
    return "backward" if direction.startswith("back") else direction


def parse_canonical(cmd: str) -> dict:
    """Parse a canonical command into structured motion fields.

    Args:
        cmd: canonical command string, e.g. "turn left 19 degrees".
            Matching is case-insensitive and ignores surrounding
            whitespace.

    Returns:
        A new dict with keys ``direction``, ``magnitude``, ``unit`` and
        ``kind``. Magnitude is returned as the matched text, not a number.
        Best-effort: if ``cmd`` is empty, is not a string, or matches no
        known shape, every field is '-' and kind is 'other'.
    """
    # Best-effort contract: never raise on odd input (None, numbers, bytes).
    if not cmd or not isinstance(cmd, str):
        return _motion_fields()
    s = cmd.strip().lower()

    # Order matters: the most specific turn shapes are tried first.
    m = _RE_TURN_DEG_DIR.match(s)
    if m:
        return _motion_fields(m.group(1), m.group(2), "deg", "turn")

    m = _RE_TURN_DEG.match(s)
    if m:
        return _motion_fields("-", m.group(1), "deg", "turn")

    m = _RE_TURN_STEP_DIR.match(s)
    if m:
        return _motion_fields(m.group(1), m.group(2), "step", "turn")

    m = _RE_TURN_DIR.match(s)
    if m:
        return _motion_fields(m.group(1), "-", "-", "turn")

    m = _RE_WALK_STEP.match(s)
    if m:
        return _motion_fields(_walk_direction(m.group(1)), m.group(2),
                              "step", "walk")

    m = _RE_WALK_M.match(s)
    if m:
        return _motion_fields(_walk_direction(m.group(1)), m.group(2),
                              "m", "walk")

    m = _RE_MOVE_DIR.match(s)
    if m:
        return _motion_fields(_walk_direction(m.group(1)), "-", "-", "move")

    m = _RE_WALK_TO.match(s)
    if m:
        return _motion_fields("tracked", m.group(1).strip(),
                              "target", "walk_to")

    # Bare canonicals. Collapse internal whitespace runs so they are as
    # tolerant as the regex shapes above (which all use \s+).
    bare = _BARE_CANONICALS.get(" ".join(s.split()))
    if bare is not None:
        direction, unit, kind = bare
        return _motion_fields(direction, "-", unit, kind)

    return _motion_fields()
|
|
|
|
# Resolve the project root: prefer the shared env loader, otherwise fall
# back to the parent of the directory that contains this file.
try:
    from Core.env_loader import PROJECT_ROOT  # type: ignore
except Exception:
    PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

# motion.log lives under <PROJECT_ROOT>/logs; the directory is created on import.
_LOG_DIR = os.path.join(PROJECT_ROOT, "logs")
_LOG_PATH = os.path.join(_LOG_DIR, "motion.log")
os.makedirs(_LOG_DIR, exist_ok=True)

# Dedicated logger that does not propagate to the root logger.
_logger = logging.getLogger("marcus.motion")
_logger.propagate = False
_logger.setLevel(logging.INFO)

# Attach the rotating file handler only once, even if this module is
# imported again while the logger already has a handler.
if not _logger.handlers:
    _h = RotatingFileHandler(
        filename=_LOG_PATH,
        maxBytes=2_000_000,
        backupCount=3,
        encoding="utf-8",
    )
    # Pipe-separated format keeps the file grep/awk friendly.
    _h.setFormatter(logging.Formatter(fmt="%(asctime)s | %(message)s"))
    _logger.addHandler(_h)
|
|
|
|
|
|
def log_motion(event: str,
               canonical: str,
               direction=None,
               magnitude=None,
               unit=None,
               target_sec=None,
               actual_sec=None,
               source: str = "voice",
               **extra) -> None:
    """Emit one structured line describing a motion event.

    direction/magnitude/unit auto-fill from parse_canonical(canonical)
    if not passed — so the worker just calls log_motion('start', cmd)
    and the parser handles the rest. Override any field by passing it
    explicitly (useful when the brain knows odom-measured actuals).

    Args:
        event: event label written to the log line (truncated to 11 chars).
        canonical: canonical command string (truncated to 35 chars);
            empty/None is logged as '-'.
        direction, magnitude, unit: optional overrides for parsed fields.
        target_sec, actual_sec: durations; floats render with 2 decimals,
            None renders as '-'.
        source: origin label (truncated to 5 chars).
        **extra: rendered as trailing space-separated 'k=v' tokens.

    Fields render with fixed width for grep/awk friendliness. A None or
    non-string event/source/canonical is stringified rather than raising,
    so a logging call cannot crash the motion path.
    """
    def _text(value) -> str:
        # None -> "-", floats -> 2 decimals, anything else -> str().
        if value is None:
            return "-"
        if isinstance(value, float):
            return "{:.2f}".format(value)
        return str(value)

    def _seconds(value) -> str:
        # Durations carry an "s" suffix; a missing duration is a bare "-".
        return "-" if value is None else _text(value) + "s"

    canonical = (str(canonical).strip() if canonical else "") or "-"
    parsed = parse_canonical(canonical) if canonical != "-" else {}
    if direction is None:
        direction = parsed.get("direction", "-")
    if magnitude is None:
        magnitude = parsed.get("magnitude", "-")
    if unit is None:
        unit = parsed.get("unit", "-")

    fields = [
        "{:35s}".format(canonical[:35]),
        "dir={:8s}".format(_text(direction)[:8]),
        "mag={:8s}".format(_text(magnitude)[:8]),
        "unit={:6s}".format(_text(unit)[:6]),
        "target={:>7s}".format(_seconds(target_sec)),
        "actual={:>7s}".format(_seconds(actual_sec)),
        # _text() guards against None / non-str labels (e.g. an exception).
        "{:11s}".format(_text(event)[:11]),
        "{:5s}".format(_text(source)[:5]),
    ]
    if extra:
        fields.append(
            " ".join("{}={}".format(key, _text(val)) for key, val in extra.items())
        )
    _logger.info(" | ".join(fields))
|
|
|
|
|
|
# Convenience tee so console + log show the same one-liner
|
|
def print_motion(event: str, canonical: str, **kw) -> None:
    """Like log_motion but ALSO prints to stdout so the operator sees
    the line in the terminal dashboard. Use for the high-traffic
    'start' and 'complete' events; use log_motion alone for the noisier
    intermediate events.

    Args:
        event: event label, forwarded to log_motion and shown in the
            console prefix.
        canonical: canonical command string; empty/None prints as '-'.
        **kw: forwarded unchanged to log_motion. direction / magnitude /
            unit / target_sec / actual_sec are also echoed on the console.

    direction/magnitude/unit that are missing (or None) are filled from
    parse_canonical(canonical), the same way log_motion fills them, so the
    console line and the log line agree.
    """
    log_motion(event, canonical, **kw)

    # Mirror log_motion's normalisation so both outputs describe the same motion.
    label = (str(canonical).strip() if canonical else "") or "-"
    parsed = parse_canonical(label) if label != "-" else {}

    def _field(name):
        # An explicit non-None kwarg wins; otherwise use the parsed value.
        value = kw.get(name)
        return value if value is not None else parsed.get(name, "-")

    line = " [Motion {:9s}] {:30s} dir={} mag={} unit={}".format(
        str(event), label[:30],
        _field("direction"), _field("magnitude"), _field("unit"),
    )
    target = kw.get("target_sec")
    if target is not None:
        line += " target={:.2f}s".format(float(target))
    actual = kw.get("actual_sec")
    if actual is not None:
        line += " actual={:.2f}s".format(float(actual))
    print(line)
|