# Marcus/Vision/marcus_imgsearch.py
"""
marcus_imgsearch.py — Image-Guided Search
==========================================
Project : Marcus | YS Lootah Technology
Hardware : Unitree G1 EDU + Jetson Orin NX
Purpose : Find a specific person or object by comparing camera frames
to a reference image provided by the user.
How it works
------------
1. User provides a reference image (photo of person or object)
2. Marcus starts rotating while scanning camera frames
3. Every step: LLaVA compares current frame to reference image
4. When match found: robot stops, reports location
5. Optional: YOLO pre-filter speeds up search (find person class first,
then LLaVA verifies it's the right person)
Usage in marcus_brain.py
------------------------
from marcus_imgsearch import ImageSearch
searcher = ImageSearch(get_frame_fn=get_frame, send_vel_fn=send_vel,
gradual_stop_fn=gradual_stop, llava_fn=_call_llava,
yolo_sees_fn=yolo_sees, model=OLLAMA_MODEL)
# Start search with a reference image (base64 JPEG)
result = searcher.search(ref_img_b64, hint="person in blue shirt", max_steps=60)
# result: {"found": True, "position": "center", "steps": 12, "description": "..."}
# Or from a file path
result = searcher.search_from_file("/tmp/target.jpg", hint="kassam")
Standalone test
---------------
python3 Vision/marcus_imgsearch.py --image /path/to/photo.jpg
Date : April 2026
"""
import base64
import io
import json
import os
import re
import sys
import threading
import time
from pathlib import Path
import numpy as np
# Pillow is optional at import time; the image helpers below degrade
# gracefully (return None / pass-through) when it is missing.
try:
    from PIL import Image
    PIL_AVAILABLE = True
except ImportError:
    PIL_AVAILABLE = False
# ══════════════════════════════════════════════════════════════════════════════
# CONFIGURATION (loaded from Config/config_ImageSearch.json)
# ══════════════════════════════════════════════════════════════════════════════
_PROJECT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if _PROJECT_DIR not in sys.path:
    sys.path.insert(0, _PROJECT_DIR)

# Pull tunables from the shared project config loader; fall back to an empty
# dict (and therefore the hard-coded defaults below) when running standalone.
try:
    from Core.config_loader import load_config
    _cfg = load_config("ImageSearch")
except Exception:
    _cfg = {}

DEFAULT_MAX_STEPS = int(_cfg.get("default_max_steps", 60))   # rotation steps before giving up
STEP_DELAY = float(_cfg.get("step_delay_s", 0.15))           # min gap between YOLO checks
ROTATE_SPEED = float(_cfg.get("rotate_speed", 0.25))         # rad/s during search
MIN_STEPS_WARMUP = int(_cfg.get("min_steps_warmup", 3))      # skip first N steps (stale frame)
MATCH_CONFIDENCE_THR = 0.6  # LLaVA confidence threshold (reserved for future scoring)
# ══════════════════════════════════════════════════════════════════════════════
# IMAGE UTILITIES
# ══════════════════════════════════════════════════════════════════════════════
def _load_image_b64(path: str) -> str:
    """Read an image file and return it as a base64-encoded JPEG string.

    Accepts JPEG, PNG, BMP and WEBP. The picture is shrunk to fit within
    336x336 (aspect ratio preserved) so LLaVA processes it efficiently.
    Returns None when Pillow is unavailable or the file cannot be read.
    """
    if not PIL_AVAILABLE:
        print(" [ImgSearch] PIL not available — pip install Pillow")
        return None
    try:
        path = Path(path)
        if not path.exists():
            print(f" [ImgSearch] File not found: {path}")
            return None
        image = Image.open(path).convert("RGB")
        # thumbnail() downsizes in place and never upscales
        image.thumbnail((336, 336), Image.LANCZOS)
        out = io.BytesIO()
        image.save(out, format="JPEG", quality=85)
        return base64.b64encode(out.getvalue()).decode()
    except Exception as e:
        print(f" [ImgSearch] Cannot load image: {e}")
        return None
def _numpy_to_b64(frame: np.ndarray, quality: int = 80) -> str:
    """Encode a BGR numpy frame as a base64 JPEG string.

    Returns None when Pillow is missing, the frame is None, or encoding fails.
    """
    if not PIL_AVAILABLE or frame is None:
        return None
    try:
        rgb = frame[:, :, ::-1]  # flip channel order: BGR → RGB
        out = io.BytesIO()
        Image.fromarray(rgb).save(out, format="JPEG", quality=quality)
        return base64.b64encode(out.getvalue()).decode()
    except Exception:
        return None
def _resize_b64(img_b64: str, max_size: int = 336) -> str:
    """Shrink a base64 image so it fits within max_size × max_size.

    On any failure (Pillow missing, empty input, decode error) the input
    is returned unchanged, so callers can use the result unconditionally.
    """
    if not PIL_AVAILABLE or not img_b64:
        return img_b64
    try:
        decoded = base64.b64decode(img_b64)
        image = Image.open(io.BytesIO(decoded)).convert("RGB")
        image.thumbnail((max_size, max_size), Image.LANCZOS)
        out = io.BytesIO()
        image.save(out, format="JPEG", quality=85)
        return base64.b64encode(out.getvalue()).decode()
    except Exception:
        return img_b64
# ══════════════════════════════════════════════════════════════════════════════
# COMPARISON PROMPTS
# ══════════════════════════════════════════════════════════════════════════════
def _build_compare_prompt(hint: str = "") -> str:
"""
Build a LLaVA prompt that compares a reference image to the current camera frame.
The model receives TWO images: [reference, current_frame].
It must answer yes/no whether the target from the reference is visible
in the current frame.
"""
hint_line = f'The target is: "{hint}".' if hint else ""
return f"""You are helping a robot find a specific target.
IMAGE 1 (reference): Shows the target to find.
IMAGE 2 (current camera): Shows what the robot sees right now.
{hint_line}
Is the target from IMAGE 1 visible in IMAGE 2?
Answer ONLY with this JSON:
{{"found": false, "confidence": "low|medium|high", "position": "left|center|right|not visible", "description": "one sentence"}}
JSON:"""
def _build_single_prompt(hint: str) -> str:
"""
Prompt for when only current frame is available (no reference image).
Uses the hint description to search.
"""
return f"""You are a robot scanning for a target.
Target description: "{hint}"
Look at the camera image. Is the target visible?
Answer ONLY with this JSON:
{{"found": false, "confidence": "low|medium|high", "position": "left|center|right|not visible", "description": "one sentence"}}
JSON:"""
# ══════════════════════════════════════════════════════════════════════════════
# IMAGE SEARCH CLASS
# ══════════════════════════════════════════════════════════════════════════════
class ImageSearch:
    """
    Image-guided robot search.
    Rotates Marcus while comparing camera frames to a reference image.
    Uses LLaVA for visual matching, YOLO as optional pre-filter.
    Thread-safe. Supports abort via Ctrl+C.
    Usage:
        searcher = ImageSearch(
            get_frame_fn=get_frame,
            send_vel_fn=send_vel,
            gradual_stop_fn=gradual_stop,
            llava_fn=_call_llava,
            yolo_sees_fn=yolo_sees,
            model="qwen2.5vl:3b"
        )
        result = searcher.search(ref_img_b64, hint="person in blue shirt")
    """

    def __init__(self, get_frame_fn, send_vel_fn, gradual_stop_fn,
                 llava_fn, yolo_sees_fn=None, model="qwen2.5vl:3b"):
        """Store the injected hardware/model callables.

        Args:
            get_frame_fn    : () -> current camera frame, or None if not ready
            send_vel_fn     : send_vel(vyaw=...) — commands rotation velocity
            gradual_stop_fn : () -> None — smoothly stops the robot
            llava_fn        : (prompt, frame, num_predict=...) -> raw reply text
            yolo_sees_fn    : optional (class_name) -> bool detector pre-filter
            model           : Ollama model name used for two-image comparison
        """
        self._get_frame = get_frame_fn
        self._send_vel = send_vel_fn
        self._gradual_stop = gradual_stop_fn
        self._call_llava = llava_fn
        self._yolo_sees = yolo_sees_fn
        self._model = model
        # One-element list so the abort flag is shared mutably with the
        # rotation thread (and visible across abort()/search() calls).
        self._abort = [False]

    def abort(self):
        """Signal the current search to stop."""
        self._abort[0] = True

    @staticmethod
    def _parse_match(raw: str):
        """Extract and parse the JSON object from a raw LLaVA reply.

        Strips markdown code fences, then takes the outermost {...} span.
        Returns the parsed dict, or None when no JSON object is present.
        May raise json.JSONDecodeError on malformed JSON (caller handles it).
        """
        cleaned = raw.replace("```json", "").replace("```", "").strip()
        start = cleaned.find("{")
        end = cleaned.rfind("}") + 1
        if start == -1 or end <= 0:
            return None
        return json.loads(cleaned[start:end])

    def search_from_file(self, image_path: str, hint: str = "",
                         max_steps: int = DEFAULT_MAX_STEPS,
                         direction: str = "left") -> dict:
        """
        Search for a target shown in an image file.
        Args:
            image_path : path to JPEG/PNG reference image
            hint       : text description of the target (optional, improves accuracy)
            max_steps  : max rotation steps
            direction  : "left" or "right"
        Returns:
            {"found": bool, "position": str, "steps": int,
             "description": str, "confidence": str}
        """
        ref_b64 = _load_image_b64(image_path)
        if ref_b64 is None:
            return {"found": False, "position": "error",
                    "steps": 0, "description": "Could not load reference image",
                    "confidence": "none"}
        return self.search(ref_b64, hint=hint, max_steps=max_steps, direction=direction)

    def search(self, ref_img_b64: str = None, hint: str = "",
               max_steps: int = DEFAULT_MAX_STEPS,
               direction: str = "left",
               yolo_prefilter: str = None) -> dict:
        """
        Search for a target by rotating and comparing camera frames.
        Args:
            ref_img_b64   : reference image as base64 JPEG (None = use hint only)
            hint          : text description e.g. "person in blue shirt", "red backpack"
            max_steps     : max steps before giving up (each step ~0.4s)
            direction     : "left" or "right" rotation
            yolo_prefilter: YOLO class to pre-filter (e.g. "person") before LLaVA check
                            None = always use LLaVA on every step
        Returns:
            dict with keys: found, position, steps, description, confidence
        Edge cases:
            - ref_img_b64 None + hint empty → warns, returns not found
            - Camera not ready → waits up to 3s, then skips frame
            - Keyboard interrupt → graceful stop
            - YOLO not available → skips pre-filter, uses LLaVA only
            - LLaVA error → logs and continues
        """
        self._abort[0] = False
        # Validate inputs — with neither a reference nor a hint there is
        # nothing to match against.
        if ref_img_b64 is None and not hint:
            print(" [ImgSearch] ⚠️ No reference image and no hint — cannot search")
            return {"found": False, "position": "error", "steps": 0,
                    "description": "No reference image or hint provided", "confidence": "none"}
        has_ref = ref_img_b64 is not None
        # Choose prompt builder: two-image comparison vs text-only scan.
        if has_ref:
            prompt = _build_compare_prompt(hint)
            print(f"\n [ImgSearch] Reference image provided")
        else:
            prompt = _build_single_prompt(hint)
            print(f"\n [ImgSearch] Text-only search: '{hint}'")
        print(f" [ImgSearch] Direction: {direction} | Max steps: {max_steps}")
        if yolo_prefilter:
            print(f" [ImgSearch] YOLO pre-filter: '{yolo_prefilter}'")
        print(f" [ImgSearch] Starting rotation...\n")

        # Start continuous rotation in a daemon thread: the velocity command
        # must be re-sent continuously or the controller stops turning.
        _keep_rotating = [True]
        vyaw = ROTATE_SPEED if direction == "left" else -ROTATE_SPEED

        def _rotate():
            while _keep_rotating[0]:
                self._send_vel(vyaw=vyaw)
                time.sleep(0.05)

        rot_thread = threading.Thread(target=_rotate, daemon=True)
        rot_thread.start()

        result = {"found": False, "position": "not visible",
                  "steps": 0, "description": "Not found", "confidence": "none"}
        try:
            for step in range(1, max_steps + 1):
                if self._abort[0]:
                    print(" [ImgSearch] Aborted by user")
                    break
                result["steps"] = step
                time.sleep(STEP_DELAY)
                # Warmup — skip first frames (stale)
                if step <= MIN_STEPS_WARMUP:
                    print(f" [{step}/{max_steps}] Warming up...")
                    continue
                # ── YOLO pre-filter ───────────────────────────────────────────
                # Cheap detector gate: only run the expensive LLaVA comparison
                # when the coarse object class is present at all.
                if yolo_prefilter and self._yolo_sees:
                    if not self._yolo_sees(yolo_prefilter):
                        print(f" [{step}/{max_steps}] YOLO: no {yolo_prefilter} — skip LLaVA")
                        continue
                    print(f" [{step}/{max_steps}] YOLO: {yolo_prefilter} found — running LLaVA comparison")
                # ── Get current frame ─────────────────────────────────────────
                current_frame = self._get_frame()
                if current_frame is None:
                    print(f" [{step}/{max_steps}] ⏳ Camera not ready — skipping")
                    # Wait up to 3s for camera (6 × 0.5s)
                    for _ in range(6):
                        time.sleep(0.5)
                        current_frame = self._get_frame()
                        # BUGFIX: explicit None check. `if current_frame:` raises
                        # "truth value of an array is ambiguous" when get_frame
                        # returns a numpy frame (as _numpy_to_b64 suggests it may).
                        if current_frame is not None:
                            break
                    if current_frame is None:
                        continue
                # ── LLaVA comparison ─────────────────────────────────────────
                print(f" [{step}/{max_steps}] 🔍 LLaVA comparing...")
                try:
                    if has_ref:
                        # Pass BOTH images: [reference, current_frame]. Route through
                        # the shared Ollama client (so VLM-off and remote-host config
                        # are honored) and mirror the compute-graph caps.
                        from API.llava_api import NUM_BATCH, NUM_CTX, VLM_ENABLED, _client as _llava_client
                        if not VLM_ENABLED:
                            print(f" [{step}/{max_steps}] VLM disabled — skipping image-match")
                            continue
                        r = _llava_client.chat(
                            model=self._model,
                            messages=[{
                                "role": "user",
                                "content": prompt,
                                "images": [ref_img_b64, current_frame]
                            }],
                            options={
                                "temperature": 0.0,  # deterministic yes/no answer
                                "num_predict": 60,
                                "num_batch": NUM_BATCH,
                                "num_ctx": NUM_CTX,
                            }
                        )
                        raw = r["message"]["content"].strip()
                    else:
                        # Text-only description search
                        raw = self._call_llava(prompt, current_frame, num_predict=60)
                    # Parse response
                    d = self._parse_match(raw)
                    if d is None:
                        print(f" [{step}/{max_steps}] ⚠️ Bad JSON: {raw[:60]}")
                        continue
                    found = d.get("found", False)
                    confidence = d.get("confidence", "low")
                    position = d.get("position", "not visible")
                    description = d.get("description", "")
                    # Some models answer "found" as a string — normalize it.
                    if isinstance(found, str):
                        found = found.lower() in ("true", "yes", "1")
                    print(f" [{step}/{max_steps}] {'✅ MATCH' if found else '❌ no match'} "
                          f"| conf={confidence} | pos={position} | {description[:60]}")
                    # Require at least medium confidence to avoid stopping on
                    # a hallucinated low-confidence match.
                    if found and confidence in ("medium", "high"):
                        result = {
                            "found": True,
                            "position": position,
                            "steps": step,
                            "description": description,
                            "confidence": confidence,
                        }
                        break
                except json.JSONDecodeError:
                    print(f" [{step}/{max_steps}] ⚠️ JSON parse error")
                except Exception as ex:
                    print(f" [{step}/{max_steps}] ⚠️ LLaVA error: {ex}")
        except KeyboardInterrupt:
            print("\n [ImgSearch] Interrupted by user")
        finally:
            # Always stop rotating, even on abort/interrupt/error.
            _keep_rotating[0] = False
            self._gradual_stop()
        # Print summary
        if result["found"]:
            print(f"\n ✅ Target found at step {result['steps']}!")
            print(f" 📍 Position: {result['position']}")
            print(f" 📝 {result['description']}\n")
        else:
            print(f"\n ❌ Target not found after {result['steps']} steps\n")
        return result
# ══════════════════════════════════════════════════════════════════════════════
# WIRE INTO marcus_brain.py — add to main loop
# ══════════════════════════════════════════════════════════════════════════════
"""
Add to marcus_brain.py imports:
from marcus_imgsearch import ImageSearch
Add after Memory init:
_img_searcher = ImageSearch(
get_frame_fn = get_frame,
send_vel_fn = send_vel,
gradual_stop_fn = gradual_stop,
llava_fn = _call_llava,
yolo_sees_fn = yolo_sees,
model = OLLAMA_MODEL,
)
Add to main loop (before standard LLaVA command):
# Image search — "search/ path/to/photo.jpg [hint]"
# or "search/ hint text only"
if cmd.lower().startswith("search/"):
args = cmd[7:].strip()
# Check if first arg is a file path
parts = args.split(None, 1)
if parts and os.path.exists(parts[0]):
img_path = parts[0]
hint = parts[1] if len(parts) > 1 else ""
print(f" [Search] Reference image: {img_path}")
print(f" [Search] Hint: '{hint}'")
_img_searcher.search_from_file(
img_path, hint=hint, yolo_prefilter="person" if "person" in hint or not hint else None
)
else:
# Text description only
hint = args
print(f" [Search] Text search: '{hint}'")
_img_searcher.search(ref_img_b64=None, hint=hint)
continue
"""
# ══════════════════════════════════════════════════════════════════════════════
# STANDALONE TEST
# ══════════════════════════════════════════════════════════════════════════════
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Marcus Image Search — standalone test")
    parser.add_argument("--image", type=str, default=None, help="Reference image path")
    parser.add_argument("--hint", type=str, default="", help="Text description of target")
    parser.add_argument("--steps", type=int, default=20, help="Max search steps")
    parser.add_argument("--test", action="store_true", help="Run self-test without robot")
    args = parser.parse_args()

    if args.test:
        # Exercise the prompt builders and image loader without any hardware.
        print("=== Self-test (no robot) ===\n")
        print("1. Prompt builder test:")
        with_hint = _build_compare_prompt("person in blue shirt")
        print(f" With hint: {with_hint[:80]}...")
        no_hint = _build_compare_prompt()
        print(f" No hint: {no_hint[:80]}...")
        print("\n2. Single prompt test:")
        single = _build_single_prompt("red backpack near the door")
        print(f" {single[:80]}...")
        if args.image:
            print("\n3. Image loading test:")
            encoded = _load_image_b64(args.image)
            if encoded:
                # Rough decoded size: base64 expands by 4/3.
                size = len(encoded) * 3 // 4 // 1024
                print(f" Loaded: ~{size}KB")
            else:
                print(" Failed to load")
        print("\nSelf-test complete.")
    elif args.image or args.hint:
        # A real search needs the robot stack — point the user at the brain.
        print("Real search requires robot hardware.")
        print("Import ImageSearch into marcus_brain.py instead.")
        print(f" image: {args.image}")
        print(f" hint: {args.hint}")
    else:
        parser.print_help()