166 lines
5.6 KiB
Python
166 lines
5.6 KiB
Python
"""
|
|
goal_nav.py — Goal-oriented navigation using YOLO fast-check + LLaVA fallback
|
|
"""
|
|
import re
|
|
import time
|
|
import threading
|
|
|
|
from API.zmq_api import send_vel, gradual_stop
|
|
from API.camera_api import get_frame
|
|
from API.yolo_api import yolo_sees, yolo_closest, yolo_all_classes
|
|
from API.llava_api import call_llava, ask_goal, OLLAMA_MODEL
|
|
from API.memory_api import log_detection
|
|
from Core.config_loader import load_config
|
|
|
|
# Tuning values for goal navigation, all read from the "Navigation" section.
_cfg = load_config("Navigation")

# alias phrase -> YOLO class name; matched by substring against the goal text
GOAL_ALIASES = _cfg["goal_aliases"]
# YOLO class names that are looked for verbatim inside the goal text
YOLO_CLASSES = _cfg["yolo_goal_classes"]
# step budget used by navigate_to_goal when the caller passes max_steps <= 0
MAX_STEPS = _cfg["max_steps"]
# the LLaVA fallback runs on steps that are multiples of this value
MIN_STEPS = _cfg["min_steps_before_check"]
# seconds slept at the start of every scan step
SCAN_INTERVAL = _cfg["scan_interval_s"]
# third argument handed to send_vel while the robot is rotating
ROTATION_SPEED = _cfg["rotation_speed"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _goal_yolo_target(goal: str):
    """Resolve a free-text goal to the YOLO class it mentions, or None.

    The goal is lower-cased and searched by plain substring.  Configured
    alias phrases (GOAL_ALIASES) are tried first, in dict order, and map to
    their class name; after that the YOLO class names themselves
    (YOLO_CLASSES) are tried.  The first hit wins.
    """
    lowered = goal.lower()

    # (needle, class) pairs in priority order: aliases, then literal names
    lookup = list(GOAL_ALIASES.items()) + [(name, name) for name in YOLO_CLASSES]

    return next(
        (cls_name for needle, cls_name in lookup if needle in lowered),
        None,
    )
|
|
|
|
|
|
def _extract_extra_condition(goal: str, yolo_target: str):
|
|
"""
|
|
Pull a compound condition out of a goal string.
|
|
e.g. 'find a person wearing a red shirt' -> 'wearing a red shirt'
|
|
Returns the extra condition text, or None if there is no qualifier.
|
|
"""
|
|
gl = goal.lower()
|
|
# strip the core target noun from the goal to isolate the qualifier
|
|
if yolo_target:
|
|
# remove everything up to and including the target noun
|
|
pattern = re.compile(
|
|
r"(?:find|go to|navigate to|look for|reach|head to)\s+"
|
|
r"(?:a |an |the |some )?" + re.escape(yolo_target),
|
|
re.IGNORECASE,
|
|
)
|
|
remainder = pattern.sub("", gl).strip()
|
|
# clean leftover noise
|
|
remainder = re.sub(r"^[,\s]+", "", remainder)
|
|
if remainder and len(remainder) > 3:
|
|
return remainder
|
|
return None
|
|
|
|
|
|
def _verify_condition(yolo_target: str, condition: str, img_b64) -> bool:
|
|
"""Use LLaVA to verify a compound condition (e.g. 'wearing red shirt')."""
|
|
if not condition or not img_b64:
|
|
return True # no extra condition — YOLO match is enough
|
|
prompt = (
|
|
f"You can see a {yolo_target} in this image. "
|
|
f"Is the following also true: '{condition}'? "
|
|
"Answer ONLY 'yes' or 'no'."
|
|
)
|
|
answer = call_llava(prompt, img_b64, num_predict=10)
|
|
return "yes" in answer.lower()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# main loop
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def navigate_to_goal(goal: str, max_steps: int = 0):
    """
    Rotate-and-scan loop.

    1. A background thread keeps the robot rotating slowly, pausing while
       the optional LiDAR helper reports an obstacle.
    2. Every SCAN_INTERVAL seconds the fast YOLO check fires.
    3. If YOLO spots the target, optionally verify the compound condition.
    4. On every MIN_STEPS-th step without a confirmed YOLO match, fall back
       to LLaVA and ask whether the goal has been reached.

    Args:
        goal: natural-language goal, e.g. 'find a person wearing a red shirt'.
        max_steps: scan-step budget; values <= 0 select MAX_STEPS.

    Returns:
        True if the goal was confirmed by YOLO or LLaVA, False if the step
        budget ran out.  Rotation is always stopped before returning.
    """
    if max_steps <= 0:
        max_steps = MAX_STEPS

    # A zero (or negative) interval from config would make the modulo below
    # raise ZeroDivisionError; treat it as "ask LLaVA on every step".
    llava_every = MIN_STEPS if MIN_STEPS > 0 else 1

    yolo_target = _goal_yolo_target(goal)
    condition = _extract_extra_condition(goal, yolo_target) if yolo_target else None

    if yolo_target:
        print(f" [GoalNav] YOLO target: '{yolo_target}'"
              f"{f' condition: {condition}' if condition else ''}")
    else:
        print(" [GoalNav] No YOLO shortcut — relying on LLaVA")

    # --- continuous rotation thread (with LiDAR obstacle pause) ---
    stop_rotation = threading.Event()

    def _rotate():
        # Resolve the optional LiDAR helper once per run instead of on every
        # tick: a failing import is not cached and would otherwise be
        # retried from scratch each time round the loop.
        try:
            from API.lidar_api import obstacle_ahead
        except ImportError:
            obstacle_ahead = None

        while not stop_rotation.is_set():
            if obstacle_ahead is not None and obstacle_ahead():
                send_vel(0.0, 0.0, 0.0)
                # Event.wait doubles as an interruptible sleep, so the
                # thread exits promptly once the main loop is done.
                stop_rotation.wait(0.2)
            else:
                send_vel(0.0, 0.0, ROTATION_SPEED)
                stop_rotation.wait(0.05)

    rot_thread = threading.Thread(target=_rotate, daemon=True)
    rot_thread.start()

    reached = False
    try:
        for step in range(1, max_steps + 1):
            time.sleep(SCAN_INTERVAL)

            # --- YOLO fast check ---
            if yolo_target and yolo_sees(yolo_target):
                if condition:
                    # a frame is only needed when there is something to verify
                    img_b64 = get_frame()
                    if not img_b64:
                        # without a frame the condition cannot be checked;
                        # do not report success on an unverified match
                        print(f" [GoalNav] YOLO sees {yolo_target} but no frame "
                              f"available to verify '{condition}' — continuing")
                        continue
                    if not _verify_condition(yolo_target, condition, img_b64):
                        print(f" [GoalNav] YOLO sees {yolo_target} but condition "
                              f"'{condition}' not met — continuing")
                        continue

                print(f" [GoalNav] YOLO confirmed '{yolo_target}' at step {step}")
                log_detection(yolo_target, position="goal", distance="close")
                reached = True
                break

            # --- LLaVA fallback (less frequent — every few steps) ---
            if step >= llava_every and step % llava_every == 0:
                img_b64 = get_frame()
                if img_b64:
                    d = ask_goal(goal, img_b64)
                    if d.get("reached"):
                        print(f" [GoalNav] LLaVA says goal reached at step {step}")
                        reached = True
                        break
                    speak = d.get("speak", "")
                    if speak:
                        print(f" [GoalNav] LLaVA: {speak}")

    finally:
        # always halt the rotation thread before braking, even on an exception
        stop_rotation.set()
        rot_thread.join(timeout=1.0)
        gradual_stop()

    if reached:
        print(f" [GoalNav] Arrived: '{goal}'")
    else:
        print(f" [GoalNav] Could not reach '{goal}' within {max_steps} steps")

    return reached
|