203 lines
7.0 KiB
Python
203 lines
7.0 KiB
Python
"""
goal_nav.py — Goal-oriented navigation using YOLO fast-check + LLaVA fallback
"""
|
||
import re
|
||
import time
|
||
import threading
|
||
|
||
from API.zmq_api import send_vel, gradual_stop
|
||
from API.camera_api import get_frame
|
||
from API.yolo_api import yolo_sees, yolo_closest, yolo_all_classes
|
||
from API.llava_api import call_llava, ask_goal, OLLAMA_MODEL
|
||
from API.memory_api import log_detection
|
||
from Core.config_loader import load_config
|
||
|
||
# Persist navigation events to logs/navigation.log so a tour-guide / patrol
# session leaves a post-mortem audit trail (every goal, every step, every
# YOLO-vs-LLaVA arbitration). Terminal output is unchanged — _nlog prints
# the same indented " [GoalNav] ..." line callers expect AND mirrors it
# to the log. Logger failures are swallowed so logging glitches can never
# crash the navigation loop.
|
||
try:
|
||
from Core.logger import log as _core_log
|
||
except Exception:
|
||
_core_log = None
|
||
|
||
|
||
def _nlog(msg: str, level: str = "info") -> None:
    """Echo a navigation event to the terminal and mirror it to the log.

    The message is printed with a "[GoalNav]" tag and, when the project
    logger was importable, forwarded to it under the "navigation" log name.

    Args:
        msg: Human-readable event text.
        level: Severity label handed through to the project logger.
    """
    tagged = f"[GoalNav] {msg}"
    print(f" {tagged}")

    if _core_log is None:
        return

    try:
        _core_log(tagged, level, "navigation")
    except Exception:
        # Best-effort by design: a logging failure must never interrupt
        # the navigation loop that called us.
        pass
|
||
|
||
|
||
# Navigation tuning is read once, at import time, from the "Navigation"
# configuration section. A missing key raises KeyError during import.
_cfg = load_config("Navigation")

(
    GOAL_ALIASES,    # mapping of goal phrase -> YOLO class, matched by substring
    YOLO_CLASSES,    # YOLO class names that may appear verbatim in a goal
    MAX_STEPS,       # default step budget for navigate_to_goal
    MIN_STEPS,       # LLaVA fallback runs on every MIN_STEPS-th step
    SCAN_INTERVAL,   # sleep (passed to time.sleep) on steps where no check ran
    ROTATION_SPEED,  # third argument of send_vel while rotating
) = (
    _cfg[key]
    for key in (
        "goal_aliases",
        "yolo_goal_classes",
        "max_steps",
        "min_steps_before_check",
        "scan_interval_s",
        "rotation_speed",
    )
)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# helpers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _goal_yolo_target(goal: str):
    """Resolve a free-text goal to the YOLO class it refers to.

    Aliases from GOAL_ALIASES are tried first (in mapping order), then the
    raw class names in YOLO_CLASSES. Matching is a case-insensitive
    substring test against the goal text.

    Returns:
        The matching YOLO class, or None when nothing in the goal matches.
    """
    lowered = goal.lower()

    # Aliases win over literal class names.
    for phrase in GOAL_ALIASES:
        if phrase in lowered:
            return GOAL_ALIASES[phrase]

    # Otherwise the first class name mentioned verbatim, if any.
    return next((name for name in YOLO_CLASSES if name in lowered), None)
|
||
|
||
|
||
def _extract_extra_condition(goal: str, yolo_target: str):
|
||
"""
|
||
Pull a compound condition out of a goal string.
|
||
e.g. 'find a person wearing a red shirt' -> 'wearing a red shirt'
|
||
Returns the extra condition text, or None if there is no qualifier.
|
||
"""
|
||
gl = goal.lower()
|
||
# strip the core target noun from the goal to isolate the qualifier
|
||
if yolo_target:
|
||
# remove everything up to and including the target noun
|
||
pattern = re.compile(
|
||
r"(?:find|go to|navigate to|look for|reach|head to)\s+"
|
||
r"(?:a |an |the |some )?" + re.escape(yolo_target),
|
||
re.IGNORECASE,
|
||
)
|
||
remainder = pattern.sub("", gl).strip()
|
||
# clean leftover noise
|
||
remainder = re.sub(r"^[,\s]+", "", remainder)
|
||
if remainder and len(remainder) > 3:
|
||
return remainder
|
||
return None
|
||
|
||
|
||
def _verify_condition(yolo_target: str, condition: str, img_b64) -> bool:
|
||
"""Use LLaVA to verify a compound condition (e.g. 'wearing red shirt')."""
|
||
if not condition or not img_b64:
|
||
return True # no extra condition — YOLO match is enough
|
||
prompt = (
|
||
f"You can see a {yolo_target} in this image. "
|
||
f"Is the following also true: '{condition}'? "
|
||
"Answer ONLY 'yes' or 'no'."
|
||
)
|
||
answer = call_llava(prompt, img_b64, num_predict=10)
|
||
return "yes" in answer.lower()
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# main loop
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def navigate_to_goal(goal: str, max_steps: int = 0) -> bool:
    """
    Rotate in place and scan until the goal is seen or the step budget ends.

    A daemon thread keeps sending the rotation command (pausing while the
    optional LiDAR helper reports an obstacle ahead) while the main loop:

      1. runs the fast YOLO check each step, if the goal maps to a YOLO class;
      2. on a YOLO hit, has LLaVA verify any extra qualifier in the goal;
      3. on every MIN_STEPS-th step, asks LLaVA whether the goal is reached;
      4. sleeps SCAN_INTERVAL only on steps where neither check did any work.

    Rotation is always stopped (flag set, thread joined, gradual_stop())
    before returning, including when an exception escapes the loop.

    Args:
        goal: Natural-language goal, e.g. 'find a person wearing a red shirt'.
        max_steps: Maximum number of scan steps; values <= 0 fall back to
            the configured MAX_STEPS.

    Returns:
        True if YOLO (plus the optional condition check) or the LLaVA
        fallback reported the goal as reached, otherwise False.
    """
    if max_steps <= 0:
        max_steps = MAX_STEPS

    yolo_target = _goal_yolo_target(goal)
    condition = _extract_extra_condition(goal, yolo_target) if yolo_target else None

    if yolo_target:
        condition_note = f" condition: {condition}" if condition else ""
        _nlog(f"YOLO target: '{yolo_target}'{condition_note}")
    else:
        _nlog("No YOLO shortcut — relying on LLaVA")

    # A configured min_steps_before_check of 0 would make the modulo in the
    # loop raise ZeroDivisionError on the first step; treat it as "ask LLaVA
    # on every step". Non-zero values keep their previous cadence.
    llava_every = MIN_STEPS or 1

    # --- continuous rotation thread (with LiDAR obstacle pause) ---
    # An Event (instead of a polled flag) lets the waits below end the moment
    # the main loop finishes, so shutdown is not delayed by a pending sleep.
    stop_rotation = threading.Event()

    def _rotate():
        # Resolve the optional LiDAR helper once. A failed import is not
        # cached by Python, so retrying it on every 50 ms tick would repeat
        # the whole module search each time.
        try:
            from API.lidar_api import obstacle_ahead
        except ImportError:
            obstacle_ahead = None  # no LiDAR support: never pause

        while not stop_rotation.is_set():
            blocked = False
            if obstacle_ahead is not None:
                try:
                    blocked = obstacle_ahead()
                except ImportError:
                    # Same contract as before: an ImportError surfacing from
                    # the probe itself counts as "no obstacle".
                    blocked = False

            if blocked:
                send_vel(0.0, 0.0, 0.0)
                stop_rotation.wait(0.2)
            else:
                send_vel(0.0, 0.0, ROTATION_SPEED)
                stop_rotation.wait(0.05)

    rot_thread = threading.Thread(target=_rotate, daemon=True)
    rot_thread.start()

    reached = False
    try:
        for step in range(1, max_steps + 1):
            # Track whether real work happened this iteration. If it did,
            # the work itself already ate wall time — don't pay an extra
            # SCAN_INTERVAL nap on top.
            did_work = False

            # --- YOLO fast check ---
            if yolo_target and yolo_sees(yolo_target):
                did_work = True
                # A frame is only needed when LLaVA has a qualifier to check.
                # NOTE(review): _verify_condition returns True when the frame
                # is missing, so a camera failure counts as confirmation —
                # confirm that is intended.
                if condition and not _verify_condition(
                    yolo_target, condition, get_frame()
                ):
                    _nlog(f"YOLO sees {yolo_target} but condition "
                          f"'{condition}' not met — continuing")
                    # fall through: the LLaVA fallback may still run below
                else:
                    _nlog(f"YOLO confirmed '{yolo_target}' at step {step}")
                    log_detection(yolo_target, position="goal", distance="close")
                    reached = True
                    break

            # --- LLaVA fallback (less frequent — every few steps) ---
            if step >= MIN_STEPS and step % llava_every == 0:
                img_b64 = get_frame()
                if img_b64:
                    did_work = True
                    verdict = ask_goal(goal, img_b64)
                    if verdict.get("reached"):
                        _nlog(f"LLaVA says goal reached at step {step}")
                        reached = True
                        break
                    speak = verdict.get("speak", "")
                    if speak:
                        _nlog(f"LLaVA: {speak}")

            # Only pay the scan interval when nothing happened this step.
            if not did_work:
                time.sleep(SCAN_INTERVAL)

    finally:
        # Stop commanding rotation before braking so the worker cannot
        # re-issue a turn command after gradual_stop() has started.
        stop_rotation.set()
        rot_thread.join(timeout=1.0)
        gradual_stop()

    if reached:
        _nlog(f"Arrived: '{goal}'")
    else:
        _nlog(f"Could not reach '{goal}' within {max_steps} steps", "warn")

    return reached
|