"""
command_parser.py — Local command regex patterns + dispatcher

Handles place memory, odometry, session recall, help, examples.
"""

import re
import threading
import time

from API.zmq_api import send_vel, gradual_stop, MOVE_MAP, STEP_DURATION_SEC
from API.memory_api import mem, place_save, place_goto, places_list_str
from API.odometry_api import odom, ODOM_AVAILABLE
from API.camera_api import get_frame
from API.llava_api import ask, call_llava, parse_json
from Brain.executor import execute
from Core.motion_state import motion_abort, wait_while_paused
from Core.motion_log import log_motion, print_motion

# ── Compiled patterns ────────────────────────────────────────────────────────

# "remember this as door" / "save as lab" → capture the place name.
_RE_REMEMBER = re.compile(
    r"^(?:remember|save|mark|call|name|label)\s+(?:this|here|current|position)?\s*as\s+(.+)$",
    re.I)

# "go to kitchen" → saved-place navigation (name captured).
_RE_GOTO = re.compile(
    r"^(?:go\s+to|navigate\s+to|take\s+me\s+to|move\s+to|return\s+to|head\s+to)\s+(.+)$",
    re.I)

_RE_FORGET = re.compile(
    r"^(?:forget|delete|remove)\s+(?:place\s+)?(.+)$", re.I)

_RE_RENAME = re.compile(
    r"^rename\s+(.+?)\s+(?:to|as)\s+(.+)$", re.I)

# Metric walking: "walk 2 m", "go forward 1.5 meters".
_RE_WALK_DIST = re.compile(
    r"^(?:walk|go|move)\s+(?:forward\s+)?(\d+(?:\.\d+)?)\s*m(?:eter(?:s)?)?$",
    re.I)

# Metric backward walking: "walk back 2 m", "move backward 1 m".
# BUGFIX: was `backward?` — the `?` binds only to the final `d`, so it
# matched "backwar"/"backward" but NOT the plain "back", and "walk back
# 2 m" fell through to the slow VLM path. `back(?:ward)?` matches both,
# consistent with _RE_WALK_STEP / _RE_SIMPLE_DIR below.
_RE_WALK_BACK = re.compile(
    r"^(?:walk|go|move)\s+back(?:ward)?\s+(\d+(?:\.\d+)?)\s*m(?:eter(?:s)?)?$",
    re.I)

# Parametric turning: "turn 90 degrees", "turn right 45 deg".
_RE_TURN_DEG = re.compile(
    r"^turn\s+(?:(left|right)\s+)?(\d+(?:\.\d+)?)\s*deg(?:ree(?:s)?)?$", re.I)

# Step-based motion: "walk 1 step", "walk forward 2 steps", "move back 1 step",
# "turn left", "turn right 2 steps". Kept local so these never fall through to
# the VLM — on the Jetson the cold-load is 60-90 s and we don't want to pay
# that for a trivial two-second motion. One step = 2 s of motion at the default
# velocity, matching the undo-loop duration already used below.
# "walk 2 steps" / "move back 1 step" — direction group optional (defaults
# to forward in the handler), count may be fractional.
_RE_WALK_STEP = re.compile(
    r"^(?:walk|go|move|step)(?:\s+(forward|back(?:ward)?))?\s+(\d+(?:\.\d+)?)\s*steps?$",
    re.I)

# "turn left" / "turn right 2 steps" — step count optional (defaults to 1).
_RE_TURN_STEP = re.compile(
    r"^turn\s+(left|right)(?:\s+(\d+(?:\.\d+)?)\s*steps?)?$", re.I)

# Simple one-shot motion — one word or verb+direction, no counts/units.
# All default to a ~2 s motion at the normal velocity. Kept local so the
# user doesn't eat a 5 s Qwen round-trip for a trivial "go back".
#
# Matches:
#   "left" / "right" / "forward" / "back" / "backward"
#   "go back" / "step back" / "move back" / "walk back" / "run back"
#   "go forward" / "step forward" / "move forward" / "walk forward"
#   "go left" / "move right" / "step left" / etc.
#   "head forward" / "head back"
# Does NOT match multi-word phrases like "walk to the chair" — those
# still fall through to Qwen where they belong.
_RE_SIMPLE_DIR = re.compile(
    r"^(?:(?:walk|go|move|step|run|head)\s+)?"
    r"(forward|back(?:ward)?|left|right)$",
    re.I,
)

# Bare stop / pause words — no need to ask Qwen what "stop" means.
_RE_STOP_SIMPLE = re.compile(
    r"^(?:stop|halt|wait|pause|stay|freeze|hold|stand\s+still|don'?t\s+move)$",
    re.I,
)

# "patrol: door → desk → exit" — captures the raw route text after the colon.
_RE_PATROL_RT = re.compile(
    r"^patrol[/:]\s*(.+)$", re.I)

# Session-recall queries.
_RE_LAST_CMD = re.compile(
    r"^(?:last\s+command|what\s+did\s+i\s+(?:say|type)\s+last|repeat\s+last)$", re.I)

_RE_DO_AGAIN = re.compile(
    r"^(?:do\s+that\s+again|repeat|again|redo)$", re.I)

# Undo phrasing requires a "where"-clause for the "go back …" form, so the
# bare "go back" still routes to _RE_SIMPLE_DIR, not undo.
_RE_UNDO = re.compile(
    r"^(?:undo|go\s+back\s+(?:to\s+)?(?:where|from\s+where)\s+(?:you|i)\s+(?:started|were|came)|reverse\s+last|turn\s+back\s+from).*$",
    re.I)

_RE_LAST_SESS = re.compile(
    r"^(?:last\s+session|what\s+(?:did\s+you\s+do|happened)\s+last\s+(?:session|time)|previous\s+session)$",
    re.I)

_RE_WHERE = re.compile(
    r"^(?:where\s+am\s+i|current\s+position|my\s+(?:location|position)|position)$", re.I)

_RE_GO_HOME = re.compile(
    r"^(?:go\s+home|return\s+to\s+start|come\s+back\s+home|go\s+back\s+to\s+start)$", re.I)

_RE_SESSION_SUMMARY = re.compile(
    r"^(?:session\s+summary|what\s+happened\s+today|session\s+report)$", re.I)

# "auto on" / "autonomous status" — subcommand captured in group 1.
_RE_AUTO = re.compile(
    r"^auto(?:nomous)?\s+(on|off|status|save|summary)$", re.I)

# ── DIRECT-MOTION CANONICALS ─────────────────────────────────────────────
# Canonicals that the dispatcher emits but were previously falling through
# to the LLaVA/Qwen fallback (5–30s blocking call on Jetson). Each maps
# to a fast, deterministic motion or arm gesture. See Bug #3 in the
# 14-bug audit. [\s_-]+ accepts "turn around", "turn_around", "turn-around".
_RE_TURN_AROUND = re.compile(r"^turn[\s_-]+around$", re.I)
_RE_SIT_DOWN = re.compile(r"^sit[\s_-]+down$", re.I)
_RE_STAND_UP = re.compile(r"^stand[\s_-]+up$", re.I)
_RE_WAVE_HELLO = re.compile(r"^wave(?:[\s_-]+(?:hello|hi))?$", re.I)
_RE_RAISE_ARM = re.compile(r"^raise[\s_-]+(?:right[\s_-]+)?arm$", re.I)
_RE_LOWER_ARM = re.compile(r"^lower[\s_-]+(?:right[\s_-]+)?arm$", re.I)
_RE_STAY_HERE = re.compile(r"^stay[\s_-]+(?:here|in[\s_-]+place)$", re.I)
_RE_POINT_GESTURE = re.compile(r"^point(?:[\s_-]+(?:at|to)[\s_-]+.*)?$", re.I)

# Note: 'move left' / 'move right' canonicals are handled by the existing
# _RE_SIMPLE_DIR (verb=move, direction=left/right via MOVE_MAP). MOVE_MAP
# in config_Navigation.json defines 'left'/'right' as ROTATIONS, not
# strafes — historical decision; if real strafes are wanted later add
# vy axis to MOVE_MAP and a separate strafe handler.

# ── HIGHER-LEVEL MOTIONS ─────────────────────────────────────────────────────
# 'look around' = scan-left + return + scan-right + return; uses execute_action
# so motion_abort halts it cleanly.
_RE_LOOK_AROUND = re.compile(
    r"^(?:look[\s_-]+around|scan[\s_-]+(?:the\s+)?(?:room|area)|survey[\s_-]+(?:the\s+)?surroundings?)$",
    re.I,
)

# 'follow me' = walk forward when YOLO sees a person and they aren't too close,
# else hold. Loop bounded by FOLLOW_ME_MAX_SEC and motion_abort.
_RE_FOLLOW_ME = re.compile(
    r"^(?:follow[\s_-]+me|come[\s_-]+with[\s_-]+me|stick[\s_-]+with[\s_-]+me)$",
    re.I,
)

# Spatial planner: 'walk to/towards the <object>'. Distinct from _RE_GOTO
# above because that one is reserved for place-memory names ('go to
# kitchen' looks up the saved 'kitchen' position). This planner uses
# LLaVA scene grounding to actually steer toward an arbitrary visible
# object — used for unsaved targets like 'the door' or 'the red chair'.
# Verbs: walk / step + to/towards/over to. We do NOT include 'go to'
# here to avoid colliding with place memory.
_RE_WALK_TO = re.compile(
    r"^(?:(?:walk|step)\s+(?:over\s+)?(?:to|towards?|up\s+to)|approach)\s+"
    r"(?:the\s+|a\s+|an\s+|that\s+|this\s+)?(.+?)$",
    re.I,
)

# Tunable durations for higher-level motions. Kept inline (not config) so a
# hot-edit doesn't risk an import-time crash; tweak if the demo asks for
# longer scan / follow runs.
LOOK_AROUND_TURN_SEC = 1.0   # ~45° at default vyaw
LOOK_AROUND_HOLD_SEC = 0.4   # pause at each extreme so VLM can see
FOLLOW_ME_MAX_SEC = 30.0     # hard upper bound on a 'follow me' session
FOLLOW_ME_STEP_SEC = 0.5     # one forward burst per yolo-poll cycle
FOLLOW_ME_POLL_SEC = 0.2     # how often to re-check yolo when no person

# Spatial planner tunables. The planner asks LLaVA where the target is
# (left/center/right + near/medium/far) every WALK_TO_REPLAN_SEC, then
# turns to centre + walks forward in WALK_TO_STEP_SEC bursts. Total
# session bounded by WALK_TO_MAX_SEC.
WALK_TO_MAX_SEC = 45.0
WALK_TO_REPLAN_SEC = 2.5         # cadence of LLaVA scene queries
WALK_TO_STEP_SEC = 1.0           # forward burst length between re-plans
WALK_TO_TURN_SHORT_SEC = 0.4     # small correction (~18° at default vyaw)
WALK_TO_TURN_WIDE_SEC = 1.0      # larger correction when target is far off
WALK_TO_LLAVA_TIMEOUT_SEC = 15.0  # per-call deadline on the VLM scene query.
                                  # Without this, ollama-python's default
                                  # 5-min HTTP timeout would let a stalled
                                  # VLM block the entire motion worker
                                  # — and motion_abort can't break out of
                                  # an in-flight HTTP call.


def _call_llava_with_timeout(prompt, img_b64, num_predict=120,
                             timeout_sec=WALK_TO_LLAVA_TIMEOUT_SEC):
    """Run call_llava in a worker thread with a hard deadline.

    Returns the raw string response, or None on timeout/error. We cannot
    truly cancel the HTTP request (ollama-python doesn't expose a cancel
    handle), so the thread is left to finish on its own — but the planner
    moves on. The orphaned thread will exit when the HTTP eventually
    returns; we rely on it being a daemon so it doesn't block process exit.
    """
    result = {"raw": None, "err": None}

    def _runner():
        # Worker body: one blocking VLM call; exceptions are captured,
        # never propagated into the main thread.
        try:
            result["raw"] = call_llava(prompt, img_b64, num_predict=num_predict)
        except Exception as e:
            result["err"] = str(e)

    th = threading.Thread(target=_runner, daemon=True)
    th.start()
    th.join(timeout=timeout_sec)
    if th.is_alive():
        return None  # timeout — thread abandoned (daemon)
    if result["err"]:
        return None  # call raised; treated same as timeout by callers
    return result["raw"]


# Autonomous mode instance — set by init_autonomous()
_auto = None


def init_autonomous(auto_instance):
    """Wire in the AutonomousMode instance from marcus_brain."""
    global _auto
    _auto = auto_instance


def try_local_command(cmd: str) -> bool:
    """
    Handle local commands (place, odom, memory, help).
    Returns True if handled, False if not matched (send to LLaVA).
    """
    # ── PLACE MEMORY ─────────────────────────────────────────────────────
    m = _RE_REMEMBER.match(cmd)
    if m:
        place_save(m.group(1).strip())
        return True

    m = _RE_GOTO.match(cmd)
    if m:
        name = m.group(1).strip()
        if name.lower() in ("start", "home", "beginning"):
            if odom and ODOM_AVAILABLE:
                odom.return_to_start()
            else:
                print(" [Places] Odometry not running — cannot return to start")
            return True
        # Try saved-place memory first. If place_goto returns False
        # (no such place), fall through to the spatial planner — the
        # user probably meant a visible object like 'the door' that
        # was never saved as a place. Do this by rewriting the cmd
        # to the planner's input shape and re-dispatching through
        # try_local_command via _RE_WALK_TO below; we DON'T return
        # True here so the rest of try_local_command runs.
        if place_goto(name):
            return True
        # Re-target this same call through the spatial planner. We
        # mutate the local 'cmd' string, then fall through to the
        # _RE_WALK_TO branch later in this function (see further down).
        print(f" [Places] '{name}' not saved — trying spatial planner")
        cmd = "walk to " + name
        # Note: don't return — let control fall through to _RE_WALK_TO.

    m = _RE_FORGET.match(cmd)
    if m:
        if mem:
            mem.delete_place(m.group(1).strip())
        else:
            print(" [Places] Memory not available")
        return True

    m = _RE_RENAME.match(cmd)
    if m:
        if mem:
            mem.rename_place(m.group(1).strip(), m.group(2).strip())
        else:
            print(" [Places] Memory not available")
        return True

    if re.match(r"^(?:places|list\s+places|what\s+places|show\s+(?:places|locations)|known\s+places)$",
                cmd, re.I):
        print(places_list_str())
        return True

    # ── ODOMETRY MOVEMENT ────────────────────────────────────────────────
    m = _RE_WALK_DIST.match(cmd)
    if m:
        meters = float(m.group(1))
        if odom:
            odom.walk_distance(meters)
        else:
            # Open-loop fallback: drive for distance/velocity seconds.
            vx, _, _ = MOVE_MAP["forward"]
            t0 = time.time()
            while time.time() - t0 < meters / abs(vx):
                if motion_abort.is_set():
                    break
                # FIX: honor pause here too — every other motion loop in
                # this dispatcher calls wait_while_paused(); this one and
                # the two below didn't, so 'pause' was ignored mid-walk.
                wait_while_paused()
                if motion_abort.is_set():
                    break
                send_vel(vx=vx)
                time.sleep(0.05)
            gradual_stop()
        return True

    m = _RE_WALK_BACK.match(cmd)
    if m:
        meters = float(m.group(1))
        if odom:
            odom.walk_distance(meters, direction="backward")
        else:
            vx, _, _ = MOVE_MAP["backward"]
            t0 = time.time()
            while time.time() - t0 < meters / abs(vx):
                if motion_abort.is_set():
                    break
                wait_while_paused()  # FIX: see _RE_WALK_DIST note above
                if motion_abort.is_set():
                    break
                send_vel(vx=vx)
                time.sleep(0.05)
            gradual_stop()
        return True

    m = _RE_TURN_DEG.match(cmd)
    if m:
        direction = m.group(1)
        degrees = float(m.group(2))
        if direction and direction.lower() == "right":
            degrees = -degrees
        if odom:
            odom.turn_degrees(degrees)
        else:
            # vyaw magnitude comes from MOVE_MAP["left"]; duration is
            # abs(degrees)/(vyaw_deg_per_sec). vyaw in config is rad/s.
            # NOTE(review): assumes MOVE_MAP["left"]'s vyaw sign means
            # positive degrees == left turn — confirm against config.
            _, _, vyaw_mag = MOVE_MAP["left"]
            vyaw_deg_per_sec = abs(vyaw_mag) * 180.0 / 3.14159265  # ≈ π
            vyaw = vyaw_mag if degrees > 0 else -vyaw_mag
            duration = abs(degrees) / vyaw_deg_per_sec
            t0 = time.time()
            while time.time() - t0 < duration:
                if motion_abort.is_set():
                    break
                wait_while_paused()  # FIX: see _RE_WALK_DIST note above
                if motion_abort.is_set():
                    break
                send_vel(vyaw=vyaw)
                time.sleep(0.05)
            gradual_stop()
        return True

    m = _RE_WALK_STEP.match(cmd)
    if m:
        direction = (m.group(1) or "forward").lower()
        if direction.startswith("back"):
            direction = "backward"
        # Step count may be fractional ('walk 3.5 steps' / 'walk half
        # a step' after fraction normalization). Convert to float for
        # the duration calc; STEP_DURATION_SEC × 3.5 = 7s of motion.
        steps = float(m.group(2))
        vx, _, _ = MOVE_MAP[direction]
        duration = STEP_DURATION_SEC * steps
        t0 = time.time()
        while time.time() - t0 < duration:
            if motion_abort.is_set():
                break
            wait_while_paused()
            if motion_abort.is_set():
                break
            send_vel(vx=vx)
            time.sleep(0.05)
        gradual_stop()
        return True

    m = _RE_TURN_STEP.match(cmd)
    if m:
        direction = m.group(1).lower()
        # Fractional step count for sub-step turns (e.g. 'turn left 0.5
        # steps' from 'turn left half a step').
        steps = float(m.group(2)) if m.group(2) else 1.0
        _, _, vyaw = MOVE_MAP[direction]
        duration = STEP_DURATION_SEC * steps
        t0 = time.time()
        while time.time() - t0 < duration:
            if motion_abort.is_set():
                break
            wait_while_paused()
            if motion_abort.is_set():
                break
            send_vel(vyaw=vyaw)
            time.sleep(0.05)
        gradual_stop()
        return True

    # ── BARE / SIMPLE DIRECTIONAL COMMANDS ───────────────────────────────
    # "left", "right", "forward", "back", "go back", "move forward",
    # "step right", "walk back", "head forward" — any one-word direction
    # or verb+direction with no explicit count. Duration and velocities
    # come entirely from config_Navigation.json (MOVE_MAP +
    # step_duration_sec) — no magic numbers here.
    m = _RE_SIMPLE_DIR.match(cmd)
    if m:
        direction = m.group(1).lower()
        if direction.startswith("back"):
            direction = "backward"
        vx, vy, vyaw = MOVE_MAP[direction]
        t0 = time.time()
        while time.time() - t0 < STEP_DURATION_SEC:
            if motion_abort.is_set():
                break
            wait_while_paused()
            if motion_abort.is_set():
                break
            send_vel(vx=vx, vy=vy, vyaw=vyaw)
            time.sleep(0.05)
        gradual_stop()
        return True

    # ── BARE STOP / PAUSE ─────────────────────────────────────────────────
    m = _RE_STOP_SIMPLE.match(cmd)
    if m:
        gradual_stop()
        return True

    # ── DIRECT-MOTION CANONICALS (no LLaVA fallback) ────────────────────
    # These canonicals were previously falling through to ask(cmd, img) →
    # Ollama → 5–30s blocking call. We handle them deterministically here.
    # Bug #3 in the 14-bug audit; see README of that fix for rationale.

    # 'turn around' = 180° turn. Synthesize the brain call as the
    # parametric form so we reuse the existing _RE_TURN_DEG path
    # (which has motion_abort / motion_pause / odom support).
    if _RE_TURN_AROUND.match(cmd):
        return try_local_command("turn 180 degrees")

    # 'wave hello' / 'wave' → arm gesture via do_arm(). The arm_api
    # is currently a stub (arm_available=false) so this prints a clear
    # message rather than calling Ollama. Once GR00T is wired the gesture
    # will fire automatically — no change here needed.
    if _RE_WAVE_HELLO.match(cmd):
        from API.arm_api import do_arm
        do_arm("wave")
        return True

    if _RE_RAISE_ARM.match(cmd):
        from API.arm_api import do_arm
        do_arm("raise_right")  # default to right; alias is in config_Arm.json
        return True

    if _RE_LOWER_ARM.match(cmd):
        from API.arm_api import do_arm
        do_arm("lower")
        return True

    if _RE_POINT_GESTURE.match(cmd):
        from API.arm_api import do_arm
        # 'point at X' has no canonical gesture in the arm vocabulary —
        # closest match is 'right_up' which extends the right arm forward.
        # Use 'high_wave' (alias=wave) only if 'right_up' isn't registered.
        from API.arm_api import ARM_ALIASES, ARM_ACTIONS
        for choice in ("right_up", "raise_right", "high_wave", "wave"):
            if choice in ARM_ALIASES or choice in ARM_ACTIONS:
                do_arm(choice)
                break
        else:
            print(" [Arm] no point/raise gesture configured — skipping")
        return True

    # 'sit down' / 'stand up' need posture commands the SDK doesn't
    # currently expose through marcus's ZMQ velocity bridge. Print a
    # clear log line and gradual_stop (safety) instead of taking 20s
    # to have Qwen invent something. When G1 SportClient.StandUp() /
    # Sit() are wired into a posture_api, replace these with real calls.
    if _RE_SIT_DOWN.match(cmd):
        gradual_stop()
        print(" [Posture] sit_down not yet wired — gradual_stop only")
        return True

    if _RE_STAND_UP.match(cmd):
        gradual_stop()
        print(" [Posture] stand_up not yet wired — gradual_stop only")
        return True

    # 'stay here' / 'stay in place' — explicit hold. Just stops; the
    # robot maintains its standing posture by SDK default.
    if _RE_STAY_HERE.match(cmd):
        gradual_stop()
        print(" [Posture] holding position")
        return True

    # ── LOOK AROUND ───────────────────────────────────────────────────────
    # Scan left ~45° → centre → right ~90° → centre. Each leg goes through
    # execute_action which polls motion_abort, so 'stop' mid-scan halts
    # cleanly. No-op fallback if MOVE_MAP doesn't define left/right (shouldn't
    # happen on a real config but keep us defensive).
    if _RE_LOOK_AROUND.match(cmd):
        from Brain.executor import execute_action  # late import — avoids
                                                   # circular at module load
        try:
            execute_action("left", LOOK_AROUND_TURN_SEC)
            if motion_abort.is_set():
                return True
            time.sleep(LOOK_AROUND_HOLD_SEC)
            if motion_abort.is_set():
                return True
            execute_action("right", LOOK_AROUND_TURN_SEC * 2)
            if motion_abort.is_set():
                return True
            time.sleep(LOOK_AROUND_HOLD_SEC)
            if motion_abort.is_set():
                return True
            execute_action("left", LOOK_AROUND_TURN_SEC)
        except Exception as e:
            print(f" [LookAround] failed: {e}")
        return True

    # ── FOLLOW ME ─────────────────────────────────────────────────────────
    # Watch for a person in the camera frame; when one is detected and not
    # already too close, advance forward in a short burst. If the person is
    # too close, hold. If no person seen, hold and re-poll. Bounded by
    # FOLLOW_ME_MAX_SEC and motion_abort. yolo_api uses fallback stubs when
    # YOLO isn't loaded — in that case we give up immediately and tell the
    # operator (no person tracking = no point walking blindly forward).
    if _RE_FOLLOW_ME.match(cmd):
        from Brain.executor import move_step
        try:
            from API.yolo_api import yolo_sees, yolo_person_too_close
        except Exception as e:
            print(f" [FollowMe] yolo_api unavailable: {e}")
            return True
        # Probe for actual YOLO availability — both the stub `yolo_sees`
        # and the stub `yolo_person_too_close` always return False, so a
        # follow_me run with no real YOLO would just hold forever.
        try:
            from API.yolo_api import _stub_sees as _stub_sees_ref  # type: ignore
            stubbed = (yolo_sees is _stub_sees_ref)
        except Exception:
            stubbed = False
        if stubbed:
            print(" [FollowMe] YOLO not loaded — cannot track persons. "
                  "Skipping. (init_yolo() at startup to enable.)")
            return True
        print(f" [FollowMe] tracking up to {FOLLOW_ME_MAX_SEC:.0f}s — "
              "say 'stop' to end")
        deadline = time.time() + FOLLOW_ME_MAX_SEC
        last_log = 0.0
        while time.time() < deadline:
            if motion_abort.is_set():
                print(" [FollowMe] motion_abort set — ending")
                gradual_stop()
                break
            try:
                see_person = bool(yolo_sees("person"))
            except Exception:
                see_person = False
            try:
                too_close = bool(yolo_person_too_close())
            except Exception:
                too_close = False
            # Throttled state logging (at most every 2 s).
            now = time.time()
            if now - last_log > 2.0:
                state = ("person too close" if (see_person and too_close)
                         else "tracking" if see_person else "no person")
                print(f" [FollowMe] {state}")
                last_log = now
            if see_person and not too_close:
                move_step("forward", FOLLOW_ME_STEP_SEC)
            else:
                gradual_stop()
                time.sleep(FOLLOW_ME_POLL_SEC)
        gradual_stop()
        print(" [FollowMe] done")
        return True

    # ── WALK TO TARGET (spatial planner) ─────────────────────────────────
    # LLaVA-grounded approach to an arbitrary visible object. This is a
    # best-effort planner — accuracy depends entirely on the VLM. Loop:
    #   1. Capture a frame, ask LLaVA: "Is <target> visible? bearing
    #      (left/center/right)? distance (near/medium/far)?"
    #   2. If not visible: scan left/right small turn, retry up to N times.
    #   3. If visible: turn-toward (small or wide based on bearing) then
    #      walk forward for WALK_TO_STEP_SEC.
    #   4. Done when distance == 'near' OR motion_abort OR timeout.
    # All motion goes through move_step / execute_action which already
    # poll motion_abort and motion_pause, so 'stop'/'pause' work normally.
    m = _RE_WALK_TO.match(cmd)
    if m:
        target = m.group(1).strip().rstrip(".!?,").strip()
        if not target:
            return True
        # If the target is a known place name, defer to place_goto —
        # the user probably meant the saved place, not the visible
        # object with that name. (Belt-and-braces with _RE_GOTO above.)
        if mem and target.lower() in {p.lower() for p in (mem.list_places() or [])}:
            place_goto(target)
            return True
        from Brain.executor import move_step, execute_action
        print(f" [WalkTo] target = {target!r}; max {WALK_TO_MAX_SEC:.0f}s")
        deadline = time.time() + WALK_TO_MAX_SEC
        scan_attempts_left = 4   # how many turns before giving up if unseen
        last_replan = 0.0
        # Kill-switch on the LLaVA retry loop. Without this, an unresponsive
        # Ollama (server crashed / model OOM / network wedged) makes the
        # planner grind for the full WALK_TO_MAX_SEC, hammering with retries
        # every 0.5 s and producing nothing but noise. Bail after a small
        # consecutive-timeout count so the user gets a fast 'unable to plan'
        # rather than a 45 s silent wait. Field-tested cap.
        consecutive_llava_timeouts = 0
        LLAVA_TIMEOUT_GIVE_UP = 3
        while time.time() < deadline:
            if motion_abort.is_set():
                print(" [WalkTo] motion_abort set — ending")
                gradual_stop()
                break
            # Re-plan via LLaVA at each cycle. Caching one query per
            # WALK_TO_REPLAN_SEC keeps token cost in check.
            now = time.time()
            if now - last_replan < WALK_TO_REPLAN_SEC and last_replan > 0:
                time.sleep(0.1)
                continue
            last_replan = now
            img_b64 = get_frame()
            if not img_b64:
                print(" [WalkTo] no camera frame — pausing")
                time.sleep(0.5)
                continue
            prompt = (
                f"You are guiding a robot toward the {target!r}. Look at "
                f"the image. Answer ONLY in JSON, no other text:\n"
                f"{{\"visible\": true|false, \"bearing\": \"left|center|right\", "
                f"\"distance\": \"near|medium|far\", \"reason\": \"<short>\"}}\n"
                f"- 'visible' = whether the {target!r} is in the frame\n"
                f"- 'bearing' = which side of the frame it's on\n"
                f"- 'distance' = rough estimate based on apparent size\n"
                f"- 'near' means within ~1 metre (almost stopping distance)"
            )
            # Wrap LLaVA in a thread-timeout: ollama-python's default
            # HTTP timeout is ~5 minutes, which would let a stalled VLM
            # block the entire motion worker far longer than the watchdog
            # would tolerate. WALK_TO_LLAVA_TIMEOUT_SEC = 15s gives the
            # VLM enough room to answer at idle while keeping abort
            # responsive (next iteration re-checks motion_abort).
            if motion_abort.is_set():
                break
            raw = _call_llava_with_timeout(prompt, img_b64, num_predict=120)
            if raw is None:
                consecutive_llava_timeouts += 1
                print(f" [WalkTo] LLaVA timeout/error "
                      f"({consecutive_llava_timeouts}/{LLAVA_TIMEOUT_GIVE_UP}) "
                      f"— retrying")
                if consecutive_llava_timeouts >= LLAVA_TIMEOUT_GIVE_UP:
                    print(f" [WalkTo] LLaVA unresponsive after "
                          f"{LLAVA_TIMEOUT_GIVE_UP} timeouts — giving up. "
                          f"Check Ollama (curl http://localhost:11434/api/version).")
                    gradual_stop()
                    return True
                time.sleep(0.5)
                continue
            consecutive_llava_timeouts = 0
            obs = parse_json(raw) or {}
            visible = bool(obs.get("visible"))
            bearing = (obs.get("bearing") or "").lower()
            distance = (obs.get("distance") or "").lower()
            reason = obs.get("reason", "")
            print(f" [WalkTo] visible={visible} bearing={bearing} "
                  f"distance={distance} ({reason[:60]})")
            if motion_abort.is_set():
                break
            if not visible:
                if scan_attempts_left <= 0:
                    print(f" [WalkTo] {target!r} not found after scans — "
                          "giving up")
                    break
                scan_attempts_left -= 1
                # Small turn left to widen the search arc; next iteration
                # the LLaVA call will see the new view.
                execute_action("left", WALK_TO_TURN_SHORT_SEC)
                continue
            scan_attempts_left = 4  # reset once we see it again
            # Reached: stop and report
            if distance == "near":
                gradual_stop()
                print(f" [WalkTo] arrived at {target!r}")
                return True
            # Steer toward target
            if bearing == "left":
                turn_sec = (WALK_TO_TURN_WIDE_SEC if distance == "far"
                            else WALK_TO_TURN_SHORT_SEC)
                execute_action("left", turn_sec)
            elif bearing == "right":
                turn_sec = (WALK_TO_TURN_WIDE_SEC if distance == "far"
                            else WALK_TO_TURN_SHORT_SEC)
                execute_action("right", turn_sec)
            # bearing == 'center' or unknown → just walk forward
            if motion_abort.is_set():
                break
            # Forward burst
            move_step("forward", WALK_TO_STEP_SEC)
        gradual_stop()
        return True

    # ── NAMED PATROL ROUTE ───────────────────────────────────────────────
    m = _RE_PATROL_RT.match(cmd)
    if m:
        raw_route = m.group(1)
        # Split on arrows, commas, semicolons, or runs of 2+ spaces.
        names = re.split(r"[→,;]+|\s{2,}", raw_route)
        names = [n.strip() for n in names if n.strip()]
        if not names:
            print(" Usage: patrol: door → desk → exit")
            return True
        if not mem:
            print(" [Places] Memory not available")
            return True
        waypoints, missing = [], []
        for name in names:
            place = mem.get_place(name)
            if place is None:
                missing.append(name)
            elif not place.get("has_odom"):
                print(f" [Places] '{name}' has no coordinates — skipping")
            else:
                waypoints.append({"x": place["x"], "y": place["y"],
                                  "heading": place["heading"], "name": name})
        if missing:
            print(f" [Places] Unknown places: {', '.join(missing)}")
        if not waypoints:
            print(" [Places] No valid waypoints — patrol cancelled")
            return True
        if odom:
            print(f" [Places] Named patrol: {' → '.join(n['name'] for n in waypoints)}")
            odom.patrol_route(waypoints)
        else:
            print(" [Places] Odometry not running")
        return True

    # ── SESSION MEMORY RECALL ────────────────────────────────────────────
    if _RE_LAST_CMD.match(cmd):
        if mem:
            last = mem.get_last_command()
            print(f" Last command: '{last}'" if last else " No commands logged yet")
        else:
            print(" Memory not available")
        return True

    if _RE_UNDO.match(cmd):
        if not mem:
            print(" Memory not available — cannot undo")
            return True
        recent = mem.get_last_n_commands(5)
        # Map of undoable phrases → the direction that reverses them.
        move_words = {"turn right": ("left", 1), "turn left": ("right", -1),
                      "walk forward": ("backward", 1), "move forward": ("backward", 1),
                      "move back": ("forward", 1), "walk backward": ("forward", 1)}
        for c in reversed(recent):
            cl = c.lower()
            for phrase, (reverse_dir, _) in move_words.items():
                if phrase in cl:
                    print(f" Undoing: '{c}' → reversing with '{reverse_dir}'")
                    vx, vy, vyaw = MOVE_MAP[reverse_dir]
                    t0 = time.time()
                    while time.time() - t0 < STEP_DURATION_SEC:
                        # FIX: this was the only motion loop in the file
                        # that never polled motion_abort / wait_while_paused
                        # — 'stop' could not interrupt an undo motion.
                        if motion_abort.is_set():
                            break
                        wait_while_paused()
                        if motion_abort.is_set():
                            break
                        send_vel(vx=vx, vy=vy, vyaw=vyaw)
                        time.sleep(0.05)
                    gradual_stop()
                    return True
        print(" No movement command to undo")
        return True

    if _RE_DO_AGAIN.match(cmd):
        if not mem:
            print(" Memory not available — cannot repeat")
            return True
        recent = mem.get_last_n_commands(5)
        repeat = None
        # Find the most recent command that isn't itself a repeat/recall.
        for c in reversed(recent):
            if not _RE_DO_AGAIN.match(c) and not _RE_LAST_CMD.match(c):
                repeat = c
                break
        if repeat:
            print(f" Repeating: '{repeat}'")
            if try_local_command(repeat):
                return True
            # Not a local command — send directly to LLaVA
            print("Thinking...")
            img = get_frame()
            if img:
                d = ask(repeat, img)
                execute(d)
            return True
        else:
            print(" No previous command to repeat")
        return True

    if _RE_LAST_SESS.match(cmd):
        if mem:
            print(mem.last_session_summary())
        else:
            print(" Memory not available")
        return True

    if _RE_SESSION_SUMMARY.match(cmd):
        if mem:
            print(f" Session: {mem._session_id}")
            print(f" Duration: {mem.session_duration_str()}")
            print(f" Commands: {mem.commands_count()}")
            print(f" Places: {mem.places_count()}")
            detections = mem.get_session_detections()
            # FIX: drop None — a detection dict without a "class" key put
            # None in the set and ', '.join() raised TypeError.
            classes = {d.get("class") for d in detections} - {None}
            print(f" Detected: {', '.join(classes) if classes else 'nothing yet'}")
        else:
            print(" Memory not available")
        return True

    if _RE_WHERE.match(cmd):
        if odom and ODOM_AVAILABLE:
            print(f" Position: {odom.status_str()}")
            print(f" Distance from start: {odom.get_distance_from_start():.2f}m")
        else:
            print(" Odometry not running — position unknown")
        return True

    if _RE_GO_HOME.match(cmd):
        if odom and ODOM_AVAILABLE:
            odom.return_to_start()
        else:
            print(" Odometry not running — cannot navigate home")
        return True

    # ── AUTONOMOUS MODE ──────────────────────────────────────────────────
    m = _RE_AUTO.match(cmd)
    if m:
        subcmd = m.group(1).lower()
        if _auto is None:
            print(" [Auto] Autonomous mode not initialized")
            return True
        if subcmd == "on":
            _auto.enable()
        elif subcmd == "off":
            _auto.disable()
        elif subcmd == "status":
            _auto.status()
        elif subcmd == "save":
            _auto.save_snapshot()
        elif subcmd == "summary":
            if _auto.is_enabled():
                _auto.status()
            else:
                print(" [Auto] Not running — use 'auto on' to start")
        return True

    # ── LIDAR STATUS ─────────────────────────────────────────────────────
    if re.match(r"^(?:lidar|lidar\s+status|slam\s+status)$", cmd, re.I):
        try:
            from API.lidar_api import LIDAR_AVAILABLE, get_lidar_status
            if not LIDAR_AVAILABLE:
                print(" LiDAR: not available")
            else:
                s = get_lidar_status()
                print(f" LiDAR: {s['mode']} | loc: {s['loc_state']} | "
                      f"frame age: {s['last_frame_age']}s")
                if s.get("pose"):
                    p = s["pose"]
                    print(f" SLAM pose: x={p['x']:.2f} y={p['y']:.2f} h={p['heading']:.1f}")
                safety = s.get("safety", {})
                if safety.get("emergency"):
                    print(f" EMERGENCY: {safety.get('reasons', [])}")
                perf = s.get("perf", {})
                if perf:
                    print(f" FPS: {perf.get('input_fps', 0):.0f} in / "
                          f"ICP: {perf.get('icp_ms', 0):.1f}ms / "
                          f"CPU: {perf.get('cpu_percent', 0):.0f}%")
        except ImportError:
            print(" LiDAR: module not loaded")
        return True

    # ── HELP / EXAMPLES ──────────────────────────────────────────────────
    if re.match(r"^(?:help[/]|help|commands|menu|[?][/]|[?])$", cmd, re.I):
        _print_help()
        return True

    if re.match(r"^(?:example[/]|examples[/]|ex[/]|example|examples|ex|show examples)$", cmd, re.I):
        _print_examples()
        return True

    return False


def _print_help():
    """Print the command reference menu."""
    print("""
 MARCUS — COMMAND HELP
 Movement: turn left/right, walk forward/back, walk 1 meter, turn 90 degrees
 Vision: what do you see, yolo
 Goals: goal/ stop when you see a person
 Places: remember this as door, go to door, places, forget door
 Patrol: patrol, patrol: door → desk → exit
 Session: last command, do that again, last session, session summary
 Search: search/ /path/to/photo.jpg [hint], search/ person in blue
 Auto: auto on, auto off, auto status, auto save, auto summary
 LiDAR: lidar, lidar status
 System: help, example, yolo, q""")


def _print_examples():
    """Print worked usage examples."""
    print("""
 MARCUS — USAGE EXAMPLES
 turn left | turn right 90 degrees | walk forward | walk 1 meter
 what do you see | describe what is in front of you
 goal/ stop when you see a person | goal/ stop when you see a laptop
 remember this as door | go to door | places | forget door
 patrol | patrol: door → desk → window
 last command | do that again | last session | session summary""")