Marcus/Navigation/goal_nav.py

181 lines
6.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
goal_nav.py — Goal-oriented navigation using YOLO fast-check + LLaVA fallback
"""
import re
import time
import threading
from API.zmq_api import send_vel, gradual_stop
from API.camera_api import get_frame
from API.yolo_api import yolo_sees, yolo_closest, yolo_all_classes
from API.llava_api import call_llava, ask_goal, OLLAMA_MODEL
from API.memory_api import log_detection
from Core.config_loader import load_config
# Navigation settings, read once at import time from the "Navigation" config
# section. A missing key raises KeyError here, at import.
_cfg = load_config("Navigation")
# Mapping of goal phrase (alias) -> YOLO class name; _goal_yolo_target()
# checks each alias as a substring of the lower-cased goal.
GOAL_ALIASES = _cfg["goal_aliases"]
# Iterable of YOLO class names that may appear verbatim in a goal string.
YOLO_CLASSES = _cfg["yolo_goal_classes"]
# Default step budget for navigate_to_goal() when the caller passes max_steps <= 0.
MAX_STEPS = _cfg["max_steps"]
# The LLaVA fallback in navigate_to_goal() fires on steps that are multiples of this.
MIN_STEPS = _cfg["min_steps_before_check"]
# Seconds slept (time.sleep) after a step in which neither YOLO nor LLaVA did work.
SCAN_INTERVAL = _cfg["scan_interval_s"]
# Passed as the third argument of send_vel() while scanning.
# NOTE(review): presumably a yaw rate; units are not visible here — confirm.
ROTATION_SPEED = _cfg["rotation_speed"]
# ---------------------------------------------------------------------------
# helpers
# ---------------------------------------------------------------------------
def _goal_yolo_target(goal: str):
    """
    Resolve a free-text goal to the YOLO class it refers to.

    The lower-cased goal is searched by plain substring: first for every
    alias in GOAL_ALIASES (in mapping order, yielding the aliased class),
    then for every class name in YOLO_CLASSES. The first hit wins.

    Returns the matched class, or None when nothing in the goal matches.
    """
    text = goal.lower()

    def _candidates():
        # Aliases take precedence over literal class names.
        yield from GOAL_ALIASES.items()
        for class_name in YOLO_CLASSES:
            yield class_name, class_name

    for needle, resolved in _candidates():
        if needle in text:
            return resolved
    return None
def _extract_extra_condition(goal: str, yolo_target: str):
"""
Pull a compound condition out of a goal string.
e.g. 'find a person wearing a red shirt' -> 'wearing a red shirt'
Returns the extra condition text, or None if there is no qualifier.
"""
gl = goal.lower()
# strip the core target noun from the goal to isolate the qualifier
if yolo_target:
# remove everything up to and including the target noun
pattern = re.compile(
r"(?:find|go to|navigate to|look for|reach|head to)\s+"
r"(?:a |an |the |some )?" + re.escape(yolo_target),
re.IGNORECASE,
)
remainder = pattern.sub("", gl).strip()
# clean leftover noise
remainder = re.sub(r"^[,\s]+", "", remainder)
if remainder and len(remainder) > 3:
return remainder
return None
def _verify_condition(yolo_target: str, condition: str, img_b64) -> bool:
"""Use LLaVA to verify a compound condition (e.g. 'wearing red shirt')."""
if not condition or not img_b64:
return True # no extra condition — YOLO match is enough
prompt = (
f"You can see a {yolo_target} in this image. "
f"Is the following also true: '{condition}'? "
"Answer ONLY 'yes' or 'no'."
)
answer = call_llava(prompt, img_b64, num_predict=10)
return "yes" in answer.lower()
# ---------------------------------------------------------------------------
# main loop
# ---------------------------------------------------------------------------
def navigate_to_goal(goal: str, max_steps: int = 0):
    """
    Rotate-and-scan loop that searches for ``goal``.

    1. A background thread keeps the robot rotating, pausing whenever the
       optional LiDAR module reports an obstacle ahead.
    2. Each step runs the fast YOLO check (when the goal maps to a YOLO class).
    3. If YOLO spots the target, the compound condition (if any) is verified
       with LLaVA before the goal counts as reached.
    4. On every MIN_STEPS-th step LLaVA is asked about the goal directly.
    5. A step in which neither check did any work sleeps SCAN_INTERVAL seconds.

    Args:
        goal: Natural-language description of what to find.
        max_steps: Step budget; values <= 0 select the configured MAX_STEPS.

    Returns:
        True if YOLO or LLaVA reported the goal reached, False if the step
        budget ran out. Rotation is always stopped before returning, also
        when an exception propagates.
    """
    if max_steps <= 0:
        max_steps = MAX_STEPS

    yolo_target = _goal_yolo_target(goal)
    condition = _extract_extra_condition(goal, yolo_target) if yolo_target else None
    if yolo_target:
        print(f" [GoalNav] YOLO target: '{yolo_target}'"
              f"{f' condition: {condition}' if condition else ''}")
    else:
        print(" [GoalNav] No YOLO shortcut — relying on LLaVA")

    # Resolve the optional LiDAR hook once. Importing inside the rotation
    # loop would redo the import lookup on every ~50 ms tick, and a missing
    # module would be searched for again each time.
    try:
        from API.lidar_api import obstacle_ahead
    except ImportError:
        obstacle_ahead = None

    # A non-positive config value would make `step % MIN_STEPS` raise
    # ZeroDivisionError (or behave oddly); treat it as "ask on every step".
    llava_every = max(1, MIN_STEPS)

    # --- continuous rotation thread (with LiDAR obstacle pause) ---
    stop_rotation = threading.Event()

    def _rotate():
        while not stop_rotation.is_set():
            if obstacle_ahead is not None and obstacle_ahead():
                send_vel(0.0, 0.0, 0.0)
                pause = 0.2
            else:
                send_vel(0.0, 0.0, ROTATION_SPEED)
                pause = 0.05
            # wait() instead of sleep() so the thread exits as soon as the
            # main loop signals it to stop.
            stop_rotation.wait(pause)

    rot_thread = threading.Thread(target=_rotate, daemon=True)
    rot_thread.start()

    reached = False
    try:
        for step in range(1, max_steps + 1):
            # Track whether real work happened this iteration. If it did,
            # the work itself already ate wall time — don't pay an extra
            # SCAN_INTERVAL nap on top.
            did_work = False

            # --- YOLO fast check ---
            if yolo_target and yolo_sees(yolo_target):
                img_b64 = get_frame()
                did_work = True
                if not condition or _verify_condition(yolo_target, condition, img_b64):
                    print(f" [GoalNav] YOLO confirmed '{yolo_target}' at step {step}")
                    log_detection(yolo_target, position="goal", distance="close")
                    reached = True
                    break
                print(f" [GoalNav] YOLO sees {yolo_target} but condition "
                      f"'{condition}' not met — continuing")
                # fall through: the LLaVA fallback below may still fire

            # --- LLaVA fallback (less frequent — every few steps) ---
            if step % llava_every == 0:
                img_b64 = get_frame()
                if img_b64:
                    did_work = True
                    # Tolerate a falsy reply instead of crashing on .get().
                    d = ask_goal(goal, img_b64) or {}
                    if d.get("reached"):
                        print(f" [GoalNav] LLaVA says goal reached at step {step}")
                        reached = True
                        break
                    speak = d.get("speak", "")
                    if speak:
                        print(f" [GoalNav] LLaVA: {speak}")

            # Only pay the scan interval when nothing happened this step.
            # If YOLO hit or LLaVA fired, they already took 50-1000 ms.
            if not did_work:
                time.sleep(SCAN_INTERVAL)
    finally:
        # Always stop the rotation thread and bring the robot to rest,
        # even if a check above raised.
        stop_rotation.set()
        rot_thread.join(timeout=1.0)
        gradual_stop()

    if reached:
        print(f" [GoalNav] Arrived: '{goal}'")
    else:
        print(f" [GoalNav] Could not reach '{goal}' within {max_steps} steps")
    return reached