# Marcus/Brain/command_parser.py
"""
command_parser.py — Local command regex patterns + dispatcher
Handles place memory, odometry, session recall, help, examples
"""
import re
import threading
import time
from API.zmq_api import send_vel, gradual_stop, MOVE_MAP, STEP_DURATION_SEC
from API.memory_api import mem, place_save, place_goto, places_list_str
from API.odometry_api import odom, ODOM_AVAILABLE
from API.camera_api import get_frame
from API.llava_api import ask, call_llava, parse_json
from Brain.executor import execute
from Core.motion_state import motion_abort, wait_while_paused
from Core.motion_log import log_motion, print_motion
# ── Compiled patterns ────────────────────────────────────────────────────────
# Place-memory verbs: "remember this as kitchen", "go to door", "forget door",
# "rename door to exit". Captured group(1) is the place name.
_RE_REMEMBER = re.compile(
    r"^(?:remember|save|mark|call|name|label)\s+(?:this|here|current|position)?\s*as\s+(.+)$", re.I)
_RE_GOTO = re.compile(
    r"^(?:go\s+to|navigate\s+to|take\s+me\s+to|move\s+to|return\s+to|head\s+to)\s+(.+)$", re.I)
_RE_FORGET = re.compile(
    r"^(?:forget|delete|remove)\s+(?:place\s+)?(.+)$", re.I)
_RE_RENAME = re.compile(
    r"^rename\s+(.+?)\s+(?:to|as)\s+(.+)$", re.I)
# Odometry-distance motion: "walk 2 m", "move forward 1.5 meters".
_RE_WALK_DIST = re.compile(
    r"^(?:walk|go|move)\s+(?:forward\s+)?(\d+(?:\.\d+)?)\s*m(?:eter(?:s)?)?$", re.I)
# BUGFIX: this previously used r"backward?", which only matches the literal
# strings "backwar"/"backward" — so "walk back 2 m" and "move backwards 1 m"
# never matched and fell through to the slow VLM path these local patterns
# exist to avoid. back(?:wards?)? accepts back / backward / backwards,
# matching the vocabulary _RE_WALK_STEP already uses.
_RE_WALK_BACK = re.compile(
    r"^(?:walk|go|move)\s+back(?:wards?)?\s+(\d+(?:\.\d+)?)\s*m(?:eter(?:s)?)?$", re.I)
# Degree turns: "turn 90 degrees", "turn right 45 deg". group(1)=side (opt),
# group(2)=degrees.
_RE_TURN_DEG = re.compile(
    r"^turn\s+(?:(left|right)\s+)?(\d+(?:\.\d+)?)\s*deg(?:ree(?:s)?)?$", re.I)
# Step-based motion: "walk 1 step", "walk forward 2 steps", "move back 1 step",
# "turn left", "turn right 2 steps". Kept local so these never fall through to
# the VLM — on the Jetson the cold-load is 60-90 s and we don't want to pay
# that for a trivial two-second motion. One step = 2 s of motion at the default
# velocity, matching the undo-loop duration already used below.
_RE_WALK_STEP = re.compile(
    r"^(?:walk|go|move|step)(?:\s+(forward|back(?:ward)?))?\s+(\d+(?:\.\d+)?)\s*steps?$", re.I)
_RE_TURN_STEP = re.compile(
    r"^turn\s+(left|right)(?:\s+(\d+(?:\.\d+)?)\s*steps?)?$", re.I)
# Simple one-shot motion — one word or verb+direction, no counts/units.
# All default to a ~2 s motion at the normal velocity. Kept local so the
# user doesn't eat a 5 s Qwen round-trip for a trivial "go back".
#
# Matches:
#   "left" / "right" / "forward" / "back" / "backward"
#   "go back" / "step back" / "move back" / "walk back" / "run back"
#   "go forward" / "step forward" / "move forward" / "walk forward"
#   "go left" / "move right" / "step left" / etc.
#   "head forward" / "head back"
# Does NOT match multi-word phrases like "walk to the chair" — those
# still fall through to Qwen where they belong.
_RE_SIMPLE_DIR = re.compile(
    r"^(?:(?:walk|go|move|step|run|head)\s+)?"
    r"(forward|back(?:ward)?|left|right)$",
    re.I,
)
# Bare stop / pause words — no need to ask Qwen what "stop" means.
_RE_STOP_SIMPLE = re.compile(
    r"^(?:stop|halt|wait|pause|stay|freeze|hold|stand\s+still|don'?t\s+move)$",
    re.I,
)
# Named patrol route: "patrol: door → desk → exit" (also accepts "patrol/ ...").
# group(1) is the raw route text, split on arrows/commas later.
_RE_PATROL_RT = re.compile(
    r"^patrol[/:]\s*(.+)$", re.I)
# Session-memory recall phrasings.
_RE_LAST_CMD = re.compile(
    r"^(?:last\s+command|what\s+did\s+i\s+(?:say|type)\s+last|repeat\s+last)$", re.I)
_RE_DO_AGAIN = re.compile(
    r"^(?:do\s+that\s+again|repeat|again|redo)$", re.I)
_RE_UNDO = re.compile(
    r"^(?:undo|go\s+back\s+(?:to\s+)?(?:where|from\s+where)\s+(?:you|i)\s+(?:started|were|came)|reverse\s+last|turn\s+back\s+from).*$", re.I)
_RE_LAST_SESS = re.compile(
    r"^(?:last\s+session|what\s+(?:did\s+you\s+do|happened)\s+last\s+(?:session|time)|previous\s+session)$", re.I)
_RE_WHERE = re.compile(
    r"^(?:where\s+am\s+i|current\s+position|my\s+(?:location|position)|position)$", re.I)
_RE_GO_HOME = re.compile(
    r"^(?:go\s+home|return\s+to\s+start|come\s+back\s+home|go\s+back\s+to\s+start)$", re.I)
_RE_SESSION_SUMMARY = re.compile(
    r"^(?:session\s+summary|what\s+happened\s+today|session\s+report)$", re.I)
# Autonomous-mode control: "auto on|off|status|save|summary".
_RE_AUTO = re.compile(
    r"^auto(?:nomous)?\s+(on|off|status|save|summary)$", re.I)
# ── DIRECT-MOTION CANONICALS ─────────────────────────────────────────────
# Canonicals that the dispatcher emits but were previously falling through
# to the LLaVA/Qwen fallback (530s blocking call on Jetson). Each maps
# to a fast, deterministic motion or arm gesture. See Bug #3 in the
# 14-bug audit. [\s_-]+ accepts "turn around", "turn_around", "turn-around".
_RE_TURN_AROUND = re.compile(r"^turn[\s_-]+around$", re.I)
_RE_SIT_DOWN = re.compile(r"^sit[\s_-]+down$", re.I)
_RE_STAND_UP = re.compile(r"^stand[\s_-]+up$", re.I)
_RE_WAVE_HELLO = re.compile(r"^wave(?:[\s_-]+(?:hello|hi))?$", re.I)
_RE_RAISE_ARM = re.compile(r"^raise[\s_-]+(?:right[\s_-]+)?arm$", re.I)
_RE_LOWER_ARM = re.compile(r"^lower[\s_-]+(?:right[\s_-]+)?arm$", re.I)
_RE_STAY_HERE = re.compile(r"^stay[\s_-]+(?:here|in[\s_-]+place)$", re.I)
_RE_POINT_GESTURE = re.compile(r"^point(?:[\s_-]+(?:at|to)[\s_-]+.*)?$", re.I)
# Note: 'move left' / 'move right' canonicals are handled by the existing
# _RE_SIMPLE_DIR (verb=move, direction=left/right via MOVE_MAP). MOVE_MAP
# in config_Navigation.json defines 'left'/'right' as ROTATIONS, not
# strafes — historical decision; if real strafes are wanted later add
# vy axis to MOVE_MAP and a separate strafe handler.
# ── HIGHER-LEVEL MOTIONS ─────────────────────────────────────────────────────
# 'look around' = scan-left + return + scan-right + return; uses execute_action
# so motion_abort halts it cleanly.
_RE_LOOK_AROUND = re.compile(
    r"^(?:look[\s_-]+around|scan[\s_-]+(?:the\s+)?(?:room|area)|survey[\s_-]+(?:the\s+)?surroundings?)$",
    re.I,
)
# 'follow me' = walk forward when YOLO sees a person and they aren't too close,
# else hold. Loop bounded by FOLLOW_ME_MAX_SEC and motion_abort.
_RE_FOLLOW_ME = re.compile(
    r"^(?:follow[\s_-]+me|come[\s_-]+with[\s_-]+me|stick[\s_-]+with[\s_-]+me)$",
    re.I,
)
# Spatial planner: 'walk to/towards the <X>'. Distinct from _RE_GOTO
# above because that one is reserved for place-memory names ('go to
# kitchen' looks up the saved 'kitchen' position). This planner uses
# LLaVA scene grounding to actually steer toward an arbitrary visible
# object — used for unsaved targets like 'the door' or 'the red chair'.
# Verbs: walk / step + to/towards/over to. We do NOT include 'go to'
# here to avoid colliding with place memory. group(1) = target phrase
# with any leading article (the/a/an/that/this) stripped.
_RE_WALK_TO = re.compile(
    r"^(?:(?:walk|step)\s+(?:over\s+)?(?:to|towards?|up\s+to)|approach)\s+"
    r"(?:the\s+|a\s+|an\s+|that\s+|this\s+)?(.+?)$",
    re.I,
)
# Tunable durations for higher-level motions. Kept inline (not config) so a
# hot-edit doesn't risk an import-time crash; tweak if the demo asks for
# longer scan / follow runs.
LOOK_AROUND_TURN_SEC = 1.0   # ~45° at default vyaw
LOOK_AROUND_HOLD_SEC = 0.4   # pause at each extreme so VLM can see
FOLLOW_ME_MAX_SEC = 30.0     # hard upper bound on a 'follow me' session
FOLLOW_ME_STEP_SEC = 0.5     # one forward burst per yolo-poll cycle
FOLLOW_ME_POLL_SEC = 0.2     # how often to re-check yolo when no person
# Spatial planner tunables. The planner asks LLaVA where the target is
# (left/center/right + near/medium/far) every WALK_TO_REPLAN_SEC, then
# turns to centre + walks forward in WALK_TO_STEP_SEC bursts. Total
# session bounded by WALK_TO_MAX_SEC.
WALK_TO_MAX_SEC = 45.0
WALK_TO_REPLAN_SEC = 2.5      # cadence of LLaVA scene queries
WALK_TO_STEP_SEC = 1.0        # forward burst length between re-plans
WALK_TO_TURN_SHORT_SEC = 0.4  # small correction (~18° at default vyaw)
WALK_TO_TURN_WIDE_SEC = 1.0   # larger correction when target is far off
WALK_TO_LLAVA_TIMEOUT_SEC = 15.0  # per-call deadline on the VLM scene query.
# Without this, ollama-python's default 5-min HTTP timeout would let a
# stalled VLM block the entire motion worker — and motion_abort can't
# break out of an in-flight HTTP call.
def _call_llava_with_timeout(prompt, img_b64, num_predict=120,
                             timeout_sec=WALK_TO_LLAVA_TIMEOUT_SEC):
    """Run call_llava in a worker thread with a hard deadline.

    Returns the raw string response, or None on timeout/error.
    We cannot truly cancel the HTTP request (ollama-python doesn't
    expose a cancel handle), so the thread is left to finish on its
    own — but the planner moves on. The orphaned thread will exit
    when the HTTP eventually returns; we rely on it being a daemon
    so it doesn't block process exit.
    """
    outcome = {}

    def _worker():
        # Runs in the daemon thread; stash either the response or the error.
        try:
            outcome["raw"] = call_llava(prompt, img_b64, num_predict=num_predict)
        except Exception as exc:
            outcome["err"] = str(exc)

    worker = threading.Thread(target=_worker, daemon=True)
    worker.start()
    worker.join(timeout=timeout_sec)
    # Deadline expired (thread still running) or the call itself failed.
    if worker.is_alive() or outcome.get("err"):
        return None
    return outcome.get("raw")
# AutonomousMode instance, injected at startup via init_autonomous().
_auto = None


def init_autonomous(auto_instance):
    """Register the AutonomousMode instance created by marcus_brain.

    Stored in the module-global ``_auto`` so the 'auto on/off/...'
    command branch of try_local_command can drive it.
    """
    global _auto
    _auto = auto_instance
def try_local_command(cmd: str) -> bool:
    """
    Handle local commands (place, odom, memory, help).
    Returns True if handled, False if not matched (send to LLaVA).

    Fixes in this revision:
    - patrol announcement joined waypoint names with '' (printed
      "doordeskexit"); now joined with ' → '.
    - the undo motion loop now polls motion_abort like every other
      timed send_vel loop, so 'stop' can interrupt an undo.
    """
    # ── PLACE MEMORY ─────────────────────────────────────────────────────
    m = _RE_REMEMBER.match(cmd)
    if m:
        place_save(m.group(1).strip())
        return True
    m = _RE_GOTO.match(cmd)
    if m:
        name = m.group(1).strip()
        if name.lower() in ("start", "home", "beginning"):
            if odom and ODOM_AVAILABLE:
                odom.return_to_start()
            else:
                print(" [Places] Odometry not running — cannot return to start")
            return True
        # Try saved-place memory first. If place_goto returns False
        # (no such place), fall through to the spatial planner — the
        # user probably meant a visible object like 'the door' that
        # was never saved as a place. Rewrite cmd to the planner's
        # input shape and let control reach _RE_WALK_TO below; we
        # deliberately do NOT return here.
        if place_goto(name):
            return True
        print(f" [Places] '{name}' not saved — trying spatial planner")
        cmd = "walk to " + name
        # Note: don't return — let control fall through to _RE_WALK_TO.
    m = _RE_FORGET.match(cmd)
    if m:
        if mem:
            mem.delete_place(m.group(1).strip())
        else:
            print(" [Places] Memory not available")
        return True
    m = _RE_RENAME.match(cmd)
    if m:
        if mem:
            mem.rename_place(m.group(1).strip(), m.group(2).strip())
        else:
            print(" [Places] Memory not available")
        return True
    if re.match(r"^(?:places|list\s+places|what\s+places|show\s+(?:places|locations)|known\s+places)$", cmd, re.I):
        print(places_list_str())
        return True
    # ── ODOMETRY MOVEMENT ────────────────────────────────────────────────
    m = _RE_WALK_DIST.match(cmd)
    if m:
        meters = float(m.group(1))
        if odom:
            odom.walk_distance(meters)
        else:
            # No odometry: dead-reckon by time at the configured velocity.
            vx, _, _ = MOVE_MAP["forward"]
            t0 = time.time()
            while time.time() - t0 < meters / abs(vx):
                if motion_abort.is_set(): break
                send_vel(vx=vx)
                time.sleep(0.05)
            gradual_stop()
        return True
    m = _RE_WALK_BACK.match(cmd)
    if m:
        meters = float(m.group(1))
        if odom:
            odom.walk_distance(meters, direction="backward")
        else:
            vx, _, _ = MOVE_MAP["backward"]
            t0 = time.time()
            while time.time() - t0 < meters / abs(vx):
                if motion_abort.is_set(): break
                send_vel(vx=vx)
                time.sleep(0.05)
            gradual_stop()
        return True
    m = _RE_TURN_DEG.match(cmd)
    if m:
        direction = m.group(1)
        degrees = float(m.group(2))
        if direction and direction.lower() == "right":
            degrees = -degrees
        if odom:
            odom.turn_degrees(degrees)
        else:
            # vyaw magnitude comes from MOVE_MAP["left"]; duration is
            # abs(degrees)/(vyaw_deg_per_sec). vyaw in config is rad/s.
            _, _, vyaw_mag = MOVE_MAP["left"]
            vyaw_deg_per_sec = abs(vyaw_mag) * 180.0 / 3.14159265
            vyaw = vyaw_mag if degrees > 0 else -vyaw_mag
            duration = abs(degrees) / vyaw_deg_per_sec
            t0 = time.time()
            while time.time() - t0 < duration:
                if motion_abort.is_set(): break
                send_vel(vyaw=vyaw)
                time.sleep(0.05)
            gradual_stop()
        return True
    m = _RE_WALK_STEP.match(cmd)
    if m:
        direction = (m.group(1) or "forward").lower()
        if direction.startswith("back"):
            direction = "backward"
        # Step count may be fractional ('walk 3.5 steps' / 'walk half
        # a step' after fraction normalization). Convert to float for
        # the duration calc; STEP_DURATION_SEC × 3.5 = 7s of motion.
        steps = float(m.group(2))
        vx, _, _ = MOVE_MAP[direction]
        duration = STEP_DURATION_SEC * steps
        t0 = time.time()
        while time.time() - t0 < duration:
            if motion_abort.is_set(): break
            wait_while_paused()
            if motion_abort.is_set(): break
            send_vel(vx=vx)
            time.sleep(0.05)
        gradual_stop()
        return True
    m = _RE_TURN_STEP.match(cmd)
    if m:
        direction = m.group(1).lower()
        # Fractional step count for sub-step turns (e.g. 'turn left 0.5
        # steps' from 'turn left half a step').
        steps = float(m.group(2)) if m.group(2) else 1.0
        _, _, vyaw = MOVE_MAP[direction]
        duration = STEP_DURATION_SEC * steps
        t0 = time.time()
        while time.time() - t0 < duration:
            if motion_abort.is_set(): break
            wait_while_paused()
            if motion_abort.is_set(): break
            send_vel(vyaw=vyaw)
            time.sleep(0.05)
        gradual_stop()
        return True
    # ── BARE / SIMPLE DIRECTIONAL COMMANDS ───────────────────────────────
    # "left", "right", "forward", "back", "go back", "move forward",
    # "step right", "walk back", "head forward" — any one-word direction
    # or verb+direction with no explicit count. Duration and velocities
    # come entirely from config_Navigation.json (MOVE_MAP +
    # step_duration_sec) — no magic numbers here.
    m = _RE_SIMPLE_DIR.match(cmd)
    if m:
        direction = m.group(1).lower()
        if direction.startswith("back"):
            direction = "backward"
        vx, vy, vyaw = MOVE_MAP[direction]
        t0 = time.time()
        while time.time() - t0 < STEP_DURATION_SEC:
            if motion_abort.is_set(): break
            wait_while_paused()
            if motion_abort.is_set(): break
            send_vel(vx=vx, vy=vy, vyaw=vyaw)
            time.sleep(0.05)
        gradual_stop()
        return True
    # ── BARE STOP / PAUSE ─────────────────────────────────────────────────
    m = _RE_STOP_SIMPLE.match(cmd)
    if m:
        gradual_stop()
        return True
    # ── DIRECT-MOTION CANONICALS (no LLaVA fallback) ────────────────────
    # These canonicals were previously falling through to ask(cmd, img) →
    # Ollama → 530s blocking call. We handle them deterministically here.
    # Bug #3 in the 14-bug audit; see README of that fix for rationale.
    # 'turn around' = 180° turn. Synthesize the brain call as the
    # parametric form so we reuse the existing _RE_TURN_DEG path
    # (which has motion_abort / motion_pause / odom support).
    if _RE_TURN_AROUND.match(cmd):
        return try_local_command("turn 180 degrees")
    # 'wave hello' / 'wave' → arm gesture via do_arm(). The arm_api
    # is currently a stub (arm_available=false) so this prints a clear
    # message rather than calling Ollama. Once GR00T is wired the gesture
    # will fire automatically — no change here needed.
    if _RE_WAVE_HELLO.match(cmd):
        from API.arm_api import do_arm
        do_arm("wave")
        return True
    if _RE_RAISE_ARM.match(cmd):
        from API.arm_api import do_arm
        do_arm("raise_right")  # default to right; alias is in config_Arm.json
        return True
    if _RE_LOWER_ARM.match(cmd):
        from API.arm_api import do_arm
        do_arm("lower")
        return True
    if _RE_POINT_GESTURE.match(cmd):
        from API.arm_api import do_arm
        # 'point at X' has no canonical gesture in the arm vocabulary —
        # closest match is 'right_up' which extends the right arm forward.
        # Use 'high_wave' (alias=wave) only if 'right_up' isn't registered.
        from API.arm_api import ARM_ALIASES, ARM_ACTIONS
        for choice in ("right_up", "raise_right", "high_wave", "wave"):
            if choice in ARM_ALIASES or choice in ARM_ACTIONS:
                do_arm(choice)
                break
        else:
            print(" [Arm] no point/raise gesture configured — skipping")
        return True
    # 'sit down' / 'stand up' need posture commands the SDK doesn't
    # currently expose through marcus's ZMQ velocity bridge. Print a
    # clear log line and gradual_stop (safety) instead of taking 20s
    # to have Qwen invent something. When G1 SportClient.StandUp() /
    # Sit() are wired into a posture_api, replace these with real calls.
    if _RE_SIT_DOWN.match(cmd):
        gradual_stop()
        print(" [Posture] sit_down not yet wired — gradual_stop only")
        return True
    if _RE_STAND_UP.match(cmd):
        gradual_stop()
        print(" [Posture] stand_up not yet wired — gradual_stop only")
        return True
    # 'stay here' / 'stay in place' — explicit hold. Just stops; the
    # robot maintains its standing posture by SDK default.
    if _RE_STAY_HERE.match(cmd):
        gradual_stop()
        print(" [Posture] holding position")
        return True
    # ── LOOK AROUND ───────────────────────────────────────────────────────
    # Scan left ~45° → centre → right ~90° → centre. Each leg goes through
    # execute_action which polls motion_abort, so 'stop' mid-scan halts
    # cleanly.
    if _RE_LOOK_AROUND.match(cmd):
        from Brain.executor import execute_action  # late import — avoids
        #                                            circular at module load
        try:
            execute_action("left", LOOK_AROUND_TURN_SEC)
            if motion_abort.is_set(): return True
            time.sleep(LOOK_AROUND_HOLD_SEC)
            if motion_abort.is_set(): return True
            execute_action("right", LOOK_AROUND_TURN_SEC * 2)
            if motion_abort.is_set(): return True
            time.sleep(LOOK_AROUND_HOLD_SEC)
            if motion_abort.is_set(): return True
            execute_action("left", LOOK_AROUND_TURN_SEC)
        except Exception as e:
            print(f" [LookAround] failed: {e}")
        return True
    # ── FOLLOW ME ─────────────────────────────────────────────────────────
    # Watch for a person in the camera frame; when one is detected and not
    # already too close, advance forward in a short burst. If the person is
    # too close, hold. If no person seen, hold and re-poll. Bounded by
    # FOLLOW_ME_MAX_SEC and motion_abort. yolo_api uses fallback stubs when
    # YOLO isn't loaded — in that case we give up immediately and tell the
    # operator (no person tracking = no point walking blindly forward).
    if _RE_FOLLOW_ME.match(cmd):
        from Brain.executor import move_step
        try:
            from API.yolo_api import yolo_sees, yolo_person_too_close
        except Exception as e:
            print(f" [FollowMe] yolo_api unavailable: {e}")
            return True
        # Probe for actual YOLO availability — both the stub `yolo_sees`
        # and the stub `yolo_person_too_close` always return False, so a
        # follow_me run with no real YOLO would just hold forever.
        try:
            from API.yolo_api import _stub_sees as _stub_sees_ref  # type: ignore
            stubbed = (yolo_sees is _stub_sees_ref)
        except Exception:
            stubbed = False
        if stubbed:
            print(" [FollowMe] YOLO not loaded — cannot track persons. "
                  "Skipping. (init_yolo() at startup to enable.)")
            return True
        print(f" [FollowMe] tracking up to {FOLLOW_ME_MAX_SEC:.0f}s — "
              "say 'stop' to end")
        deadline = time.time() + FOLLOW_ME_MAX_SEC
        last_log = 0.0
        while time.time() < deadline:
            if motion_abort.is_set():
                print(" [FollowMe] motion_abort set — ending")
                gradual_stop()
                break
            try:
                see_person = bool(yolo_sees("person"))
            except Exception:
                see_person = False
            try:
                too_close = bool(yolo_person_too_close())
            except Exception:
                too_close = False
            now = time.time()
            if now - last_log > 2.0:
                state = ("person too close" if (see_person and too_close)
                         else "tracking" if see_person
                         else "no person")
                print(f" [FollowMe] {state}")
                last_log = now
            if see_person and not too_close:
                move_step("forward", FOLLOW_ME_STEP_SEC)
            else:
                gradual_stop()
            time.sleep(FOLLOW_ME_POLL_SEC)
        gradual_stop()
        print(" [FollowMe] done")
        return True
    # ── WALK TO TARGET (spatial planner) ─────────────────────────────────
    # LLaVA-grounded approach to an arbitrary visible object. This is a
    # best-effort planner — accuracy depends entirely on the VLM. Loop:
    #   1. Capture a frame, ask LLaVA: "Is <target> visible? bearing
    #      (left/center/right)? distance (near/medium/far)?"
    #   2. If not visible: scan left small turn, retry up to N times.
    #   3. If visible: turn-toward (small or wide based on bearing) then
    #      walk forward for WALK_TO_STEP_SEC.
    #   4. Done when distance == 'near' OR motion_abort OR timeout.
    # All motion goes through move_step / execute_action which already
    # poll motion_abort and motion_pause, so 'stop'/'pause' work normally.
    m = _RE_WALK_TO.match(cmd)
    if m:
        target = m.group(1).strip().rstrip(".!?,").strip()
        if not target:
            return True
        # If the target is a known place name, defer to place_goto —
        # the user probably meant the saved place, not the visible
        # object with that name. (Belt-and-braces with _RE_GOTO above.)
        if mem and target.lower() in {p.lower() for p in (mem.list_places() or [])}:
            place_goto(target)
            return True
        from Brain.executor import move_step, execute_action
        print(f" [WalkTo] target = {target!r}; max {WALK_TO_MAX_SEC:.0f}s")
        deadline = time.time() + WALK_TO_MAX_SEC
        scan_attempts_left = 4  # how many turns before giving up if unseen
        last_replan = 0.0
        # Kill-switch on the LLaVA retry loop. Without this, an unresponsive
        # Ollama (server crashed / model OOM / network wedged) makes the
        # planner grind for the full WALK_TO_MAX_SEC, hammering with retries
        # every 0.5 s and producing nothing but noise. Bail after a small
        # consecutive-timeout count so the user gets a fast 'unable to plan'
        # rather than a 45 s silent wait. Field-tested cap.
        consecutive_llava_timeouts = 0
        LLAVA_TIMEOUT_GIVE_UP = 3
        while time.time() < deadline:
            if motion_abort.is_set():
                print(" [WalkTo] motion_abort set — ending")
                gradual_stop()
                break
            # Re-plan via LLaVA at each cycle. Caching one query per
            # WALK_TO_REPLAN_SEC keeps token cost in check.
            now = time.time()
            if now - last_replan < WALK_TO_REPLAN_SEC and last_replan > 0:
                time.sleep(0.1)
                continue
            last_replan = now
            img_b64 = get_frame()
            if not img_b64:
                print(" [WalkTo] no camera frame — pausing")
                time.sleep(0.5)
                continue
            prompt = (
                f"You are guiding a robot toward the {target!r}. Look at "
                f"the image. Answer ONLY in JSON, no other text:\n"
                f"{{\"visible\": true|false, \"bearing\": \"left|center|right\", "
                f"\"distance\": \"near|medium|far\", \"reason\": \"<brief>\"}}\n"
                f"- 'visible' = whether the {target!r} is in the frame\n"
                f"- 'bearing' = which side of the frame it's on\n"
                f"- 'distance' = rough estimate based on apparent size\n"
                f"- 'near' means within ~1 metre (almost stopping distance)"
            )
            # Wrap LLaVA in a thread-timeout: ollama-python's default
            # HTTP timeout is ~5 minutes, which would let a stalled VLM
            # block the entire motion worker far longer than the watchdog
            # would tolerate. WALK_TO_LLAVA_TIMEOUT_SEC = 15s gives the
            # VLM enough room to answer at idle while keeping abort
            # responsive (next iteration re-checks motion_abort).
            if motion_abort.is_set(): break
            raw = _call_llava_with_timeout(prompt, img_b64, num_predict=120)
            if raw is None:
                consecutive_llava_timeouts += 1
                print(f" [WalkTo] LLaVA timeout/error "
                      f"({consecutive_llava_timeouts}/{LLAVA_TIMEOUT_GIVE_UP}) "
                      f"— retrying")
                if consecutive_llava_timeouts >= LLAVA_TIMEOUT_GIVE_UP:
                    print(f" [WalkTo] LLaVA unresponsive after "
                          f"{LLAVA_TIMEOUT_GIVE_UP} timeouts — giving up. "
                          f"Check Ollama (curl http://localhost:11434/api/version).")
                    gradual_stop()
                    return True
                time.sleep(0.5)
                continue
            consecutive_llava_timeouts = 0
            obs = parse_json(raw) or {}
            visible = bool(obs.get("visible"))
            bearing = (obs.get("bearing") or "").lower()
            distance = (obs.get("distance") or "").lower()
            reason = obs.get("reason", "")
            print(f" [WalkTo] visible={visible} bearing={bearing} "
                  f"distance={distance} ({reason[:60]})")
            if motion_abort.is_set(): break
            if not visible:
                if scan_attempts_left <= 0:
                    print(f" [WalkTo] {target!r} not found after scans — "
                          "giving up")
                    break
                scan_attempts_left -= 1
                # Small turn left to widen the search arc; next iteration
                # the LLaVA call will see the new view.
                execute_action("left", WALK_TO_TURN_SHORT_SEC)
                continue
            scan_attempts_left = 4  # reset once we see it again
            # Reached: stop and report
            if distance == "near":
                gradual_stop()
                print(f" [WalkTo] arrived at {target!r}")
                return True
            # Steer toward target
            if bearing == "left":
                turn_sec = (WALK_TO_TURN_WIDE_SEC if distance == "far"
                            else WALK_TO_TURN_SHORT_SEC)
                execute_action("left", turn_sec)
            elif bearing == "right":
                turn_sec = (WALK_TO_TURN_WIDE_SEC if distance == "far"
                            else WALK_TO_TURN_SHORT_SEC)
                execute_action("right", turn_sec)
            # bearing == 'center' or unknown → just walk forward
            if motion_abort.is_set(): break
            # Forward burst
            move_step("forward", WALK_TO_STEP_SEC)
        gradual_stop()
        return True
    # ── NAMED PATROL ROUTE ───────────────────────────────────────────────
    m = _RE_PATROL_RT.match(cmd)
    if m:
        raw_route = m.group(1)
        names = re.split(r"[→,;]+|\s{2,}", raw_route)
        names = [n.strip() for n in names if n.strip()]
        if not names:
            print(" Usage: patrol: door → desk → exit")
            return True
        if not mem:
            print(" [Places] Memory not available")
            return True
        waypoints, missing = [], []
        for name in names:
            place = mem.get_place(name)
            if place is None:
                missing.append(name)
            elif not place.get("has_odom"):
                print(f" [Places] '{name}' has no coordinates — skipping")
            else:
                waypoints.append({"x": place["x"], "y": place["y"], "heading": place["heading"], "name": name})
        if missing:
            print(f" [Places] Unknown places: {', '.join(missing)}")
        if not waypoints:
            print(" [Places] No valid waypoints — patrol cancelled")
            return True
        if odom:
            # BUGFIX: was ''.join(...), which printed the waypoint names
            # mashed together ("doordeskexit"). Join with the same arrow
            # the user typed.
            print(f" [Places] Named patrol: {' → '.join(n['name'] for n in waypoints)}")
            odom.patrol_route(waypoints)
        else:
            print(" [Places] Odometry not running")
        return True
    # ── SESSION MEMORY RECALL ────────────────────────────────────────────
    if _RE_LAST_CMD.match(cmd):
        if mem:
            last = mem.get_last_command()
            print(f" Last command: '{last}'" if last else " No commands logged yet")
        else:
            print(" Memory not available")
        return True
    if _RE_UNDO.match(cmd):
        if not mem:
            print(" Memory not available — cannot undo")
            return True
        recent = mem.get_last_n_commands(5)
        move_words = {"turn right": ("left", 1), "turn left": ("right", -1),
                      "walk forward": ("backward", 1), "move forward": ("backward", 1),
                      "move back": ("forward", 1), "walk backward": ("forward", 1)}
        for c in reversed(recent):
            cl = c.lower()
            for phrase, (reverse_dir, _) in move_words.items():
                if phrase in cl:
                    print(f" Undoing: '{c}' → reversing with '{reverse_dir}'")
                    vx, vy, vyaw = MOVE_MAP[reverse_dir]
                    t0 = time.time()
                    while time.time() - t0 < STEP_DURATION_SEC:
                        # FIX: poll motion_abort like every other timed
                        # send_vel loop so 'stop' can interrupt an undo.
                        if motion_abort.is_set(): break
                        send_vel(vx=vx, vy=vy, vyaw=vyaw)
                        time.sleep(0.05)
                    gradual_stop()
                    return True
        print(" No movement command to undo")
        return True
    if _RE_DO_AGAIN.match(cmd):
        if not mem:
            print(" Memory not available — cannot repeat")
            return True
        recent = mem.get_last_n_commands(5)
        repeat = None
        for c in reversed(recent):
            if not _RE_DO_AGAIN.match(c) and not _RE_LAST_CMD.match(c):
                repeat = c
                break
        if repeat:
            print(f" Repeating: '{repeat}'")
            if try_local_command(repeat):
                return True
            # Not a local command — send directly to LLaVA
            print("Thinking...")
            img = get_frame()
            if img:
                d = ask(repeat, img)
                execute(d)
            return True
        else:
            print(" No previous command to repeat")
            return True
    if _RE_LAST_SESS.match(cmd):
        if mem:
            print(mem.last_session_summary())
        else:
            print(" Memory not available")
        return True
    if _RE_SESSION_SUMMARY.match(cmd):
        if mem:
            print(f" Session: {mem._session_id}")
            print(f" Duration: {mem.session_duration_str()}")
            print(f" Commands: {mem.commands_count()}")
            print(f" Places: {mem.places_count()}")
            detections = mem.get_session_detections()
            classes = {d.get("class") for d in detections}
            print(f" Detected: {', '.join(classes) if classes else 'nothing yet'}")
        else:
            print(" Memory not available")
        return True
    if _RE_WHERE.match(cmd):
        if odom and ODOM_AVAILABLE:
            print(f" Position: {odom.status_str()}")
            print(f" Distance from start: {odom.get_distance_from_start():.2f}m")
        else:
            print(" Odometry not running — position unknown")
        return True
    if _RE_GO_HOME.match(cmd):
        if odom and ODOM_AVAILABLE:
            odom.return_to_start()
        else:
            print(" Odometry not running — cannot navigate home")
        return True
    # ── AUTONOMOUS MODE ──────────────────────────────────────────────────
    m = _RE_AUTO.match(cmd)
    if m:
        subcmd = m.group(1).lower()
        if _auto is None:
            print(" [Auto] Autonomous mode not initialized")
            return True
        if subcmd == "on":
            _auto.enable()
        elif subcmd == "off":
            _auto.disable()
        elif subcmd == "status":
            _auto.status()
        elif subcmd == "save":
            _auto.save_snapshot()
        elif subcmd == "summary":
            if _auto.is_enabled():
                _auto.status()
            else:
                print(" [Auto] Not running — use 'auto on' to start")
        return True
    # ── LIDAR STATUS ─────────────────────────────────────────────────────
    if re.match(r"^(?:lidar|lidar\s+status|slam\s+status)$", cmd, re.I):
        try:
            from API.lidar_api import LIDAR_AVAILABLE, get_lidar_status
            if not LIDAR_AVAILABLE:
                print(" LiDAR: not available")
            else:
                s = get_lidar_status()
                print(f" LiDAR: {s['mode']} | loc: {s['loc_state']} | "
                      f"frame age: {s['last_frame_age']}s")
                if s.get("pose"):
                    p = s["pose"]
                    print(f" SLAM pose: x={p['x']:.2f} y={p['y']:.2f} h={p['heading']:.1f}")
                safety = s.get("safety", {})
                if safety.get("emergency"):
                    print(f" EMERGENCY: {safety.get('reasons', [])}")
                perf = s.get("perf", {})
                if perf:
                    print(f" FPS: {perf.get('input_fps', 0):.0f} in / "
                          f"ICP: {perf.get('icp_ms', 0):.1f}ms / "
                          f"CPU: {perf.get('cpu_percent', 0):.0f}%")
        except ImportError:
            print(" LiDAR: module not loaded")
        return True
    # ── HELP / EXAMPLES ──────────────────────────────────────────────────
    if re.match(r"^(?:help[/]|help|commands|menu|[?][/]|[?])$", cmd, re.I):
        _print_help()
        return True
    if re.match(r"^(?:example[/]|examples[/]|ex[/]|example|examples|ex|show examples)$", cmd, re.I):
        _print_examples()
        return True
    return False
def _print_help():
    """Print the one-screen command cheat-sheet shown for 'help' / '?'."""
    print("""
MARCUS — COMMAND HELP
Movement: turn left/right, walk forward/back, walk 1 meter, turn 90 degrees
Vision: what do you see, yolo
Goals: goal/ stop when you see a person
Places: remember this as door, go to door, places, forget door
Patrol: patrol, patrol: door → desk → exit
Session: last command, do that again, last session, session summary
Search: search/ /path/to/photo.jpg [hint], search/ person in blue
Auto: auto on, auto off, auto status, auto save, auto summary
LiDAR: lidar, lidar status
System: help, example, yolo, q""")
def _print_examples():
    """Print concrete sample commands shown for 'example' / 'ex'."""
    print("""
MARCUS — USAGE EXAMPLES
turn left | turn right 90 degrees | walk forward | walk 1 meter
what do you see | describe what is in front of you
goal/ stop when you see a person | goal/ stop when you see a laptop
remember this as door | go to door | places | forget door
patrol | patrol: door → desk → window
last command | do that again | last session | session summary""")