""" marcus_imgsearch.py — Image-Guided Search ========================================== Project : Marcus | YS Lootah Technology Hardware : Unitree G1 EDU + Jetson Orin NX Purpose : Find a specific person or object by comparing camera frames to a reference image provided by the user. How it works ------------ 1. User provides a reference image (photo of person or object) 2. Marcus starts rotating while scanning camera frames 3. Every step: LLaVA compares current frame to reference image 4. When match found: robot stops, reports location 5. Optional: YOLO pre-filter speeds up search (find person class first, then LLaVA verifies it's the right person) Usage in marcus_llava.py ------------------------ from marcus_imgsearch import ImageSearch searcher = ImageSearch(get_frame_fn=get_frame, send_vel_fn=send_vel, gradual_stop_fn=gradual_stop, llava_fn=_call_llava, yolo_sees_fn=yolo_sees, model=OLLAMA_MODEL) # Start search with a reference image (base64 JPEG) result = searcher.search(ref_img_b64, hint="person in blue shirt", max_steps=60) # result: {"found": True, "position": "center", "steps": 12, "description": "..."} # Or from a file path result = searcher.search_from_file("/tmp/target.jpg", hint="kassam") Standalone test --------------- python3 ~/Models_marcus/marcus_imgsearch.py --image /path/to/photo.jpg Date : April 2026 """ import base64 import io import json import time import threading import os import re from pathlib import Path import numpy as np try: from PIL import Image PIL_AVAILABLE = True except ImportError: PIL_AVAILABLE = False # ══════════════════════════════════════════════════════════════════════════════ # CONFIGURATION # ══════════════════════════════════════════════════════════════════════════════ DEFAULT_MAX_STEPS = 60 # max rotation steps before giving up STEP_DELAY = 0.15 # min gap between YOLO checks (was 0.4 — reduced # because the rotation thread paces motion already # and each LLaVA call is 600-1500 ms of real work) ROTATE_SPEED = 0.25 # 
rad/s rotation speed during search MIN_STEPS_WARMUP = 3 # skip first N steps (stale frame) MATCH_CONFIDENCE_THR = 0.6 # LLaVA confidence threshold (not used directly, # but kept for future scoring) # ══════════════════════════════════════════════════════════════════════════════ # IMAGE UTILITIES # ══════════════════════════════════════════════════════════════════════════════ def _load_image_b64(path: str) -> str: """ Load an image file and return as base64 JPEG string. Handles: JPEG, PNG, BMP, WEBP. Resizes to 336x336 max for LLaVA efficiency. Returns None if file not found or unreadable. """ if not PIL_AVAILABLE: print(" [ImgSearch] PIL not available — pip install Pillow") return None try: path = Path(path) if not path.exists(): print(f" [ImgSearch] File not found: {path}") return None img = Image.open(path).convert("RGB") # Resize to max 336x336 keeping aspect ratio img.thumbnail((336, 336), Image.LANCZOS) buf = io.BytesIO() img.save(buf, format="JPEG", quality=85) return base64.b64encode(buf.getvalue()).decode() except Exception as e: print(f" [ImgSearch] Cannot load image: {e}") return None def _numpy_to_b64(frame: np.ndarray, quality: int = 80) -> str: """Convert a BGR numpy frame to base64 JPEG.""" if not PIL_AVAILABLE or frame is None: return None try: img = Image.fromarray(frame[:, :, ::-1]) # BGR → RGB buf = io.BytesIO() img.save(buf, format="JPEG", quality=quality) return base64.b64encode(buf.getvalue()).decode() except Exception: return None def _resize_b64(img_b64: str, max_size: int = 336) -> str: """Resize a base64 image to max_size × max_size.""" if not PIL_AVAILABLE or not img_b64: return img_b64 try: raw = base64.b64decode(img_b64) img = Image.open(io.BytesIO(raw)).convert("RGB") img.thumbnail((max_size, max_size), Image.LANCZOS) buf = io.BytesIO() img.save(buf, format="JPEG", quality=85) return base64.b64encode(buf.getvalue()).decode() except Exception: return img_b64 # ══════════════════════════════════════════════════════════════════════════════ 
# COMPARISON PROMPTS
# ══════════════════════════════════════════════════════════════════════════════

def _build_compare_prompt(hint: str = "") -> str:
    """
    Build a LLaVA prompt that compares a reference image to the current
    camera frame.

    The model receives TWO images: [reference, current_frame].  It must
    answer in JSON whether the target from the reference is visible in the
    current frame.

    Args:
        hint : optional text description of the target, embedded in the prompt.
    """
    hint_line = f'The target is: "{hint}".' if hint else ""
    return f"""You are helping a robot find a specific target.
IMAGE 1 (reference): Shows the target to find.
IMAGE 2 (current camera): Shows what the robot sees right now.
{hint_line}
Is the target from IMAGE 1 visible in IMAGE 2?
Answer ONLY with this JSON:
{{"found": false, "confidence": "low|medium|high", "position": "left|center|right|not visible", "description": "one sentence"}}
JSON:"""


def _build_single_prompt(hint: str) -> str:
    """
    Prompt for when only the current frame is available (no reference image).

    Uses the hint description alone to search.
    """
    return f"""You are a robot scanning for a target.
Target description: "{hint}"
Look at the camera image. Is the target visible?
Answer ONLY with this JSON:
{{"found": false, "confidence": "low|medium|high", "position": "left|center|right|not visible", "description": "one sentence"}}
JSON:"""


# ══════════════════════════════════════════════════════════════════════════════
# IMAGE SEARCH CLASS
# ══════════════════════════════════════════════════════════════════════════════

class ImageSearch:
    """
    Image-guided robot search.

    Rotates Marcus while comparing camera frames to a reference image.
    Uses LLaVA for visual matching, YOLO as an optional pre-filter.
    Abortable via abort() or Ctrl+C.

    Usage:
        searcher = ImageSearch(
            get_frame_fn=get_frame,
            send_vel_fn=send_vel,
            gradual_stop_fn=gradual_stop,
            llava_fn=_call_llava,
            yolo_sees_fn=yolo_sees,
            model="qwen2.5vl:3b"
        )
        result = searcher.search(ref_img_b64, hint="person in blue shirt")
    """

    def __init__(self, get_frame_fn, send_vel_fn, gradual_stop_fn,
                 llava_fn, yolo_sees_fn=None, model="qwen2.5vl:3b"):
        # Injected hardware/model callables — this class owns no hardware itself.
        self._get_frame = get_frame_fn        # () -> current camera frame (or None)
        self._send_vel = send_vel_fn          # (vyaw=...) -> None, one velocity tick
        self._gradual_stop = gradual_stop_fn  # () -> None, smooth stop
        self._call_llava = llava_fn           # (prompt, frame, num_predict=...) -> str
        self._yolo_sees = yolo_sees_fn        # optional (class_name) -> bool
        self._model = model
        # Single-element list so the rotation thread and abort() share one flag.
        self._abort = [False]

    def abort(self):
        """Signal the current search to stop."""
        self._abort[0] = True

    def search_from_file(self, image_path: str, hint: str = "",
                         max_steps: int = DEFAULT_MAX_STEPS,
                         direction: str = "left") -> dict:
        """
        Search for a target shown in an image file.

        Args:
            image_path : path to JPEG/PNG reference image
            hint       : text description of the target (optional, improves accuracy)
            max_steps  : max rotation steps
            direction  : "left" or "right"

        Returns:
            {"found": bool, "position": str, "steps": int,
             "description": str, "confidence": str}
        """
        ref_b64 = _load_image_b64(image_path)
        if ref_b64 is None:
            return {"found": False, "position": "error", "steps": 0,
                    "description": "Could not load reference image",
                    "confidence": "none"}
        return self.search(ref_b64, hint=hint, max_steps=max_steps,
                           direction=direction)

    def search(self, ref_img_b64: str = None, hint: str = "",
               max_steps: int = DEFAULT_MAX_STEPS,
               direction: str = "left",
               yolo_prefilter: str = None) -> dict:
        """
        Search for a target by rotating and comparing camera frames.

        Args:
            ref_img_b64   : reference image as base64 JPEG (None = use hint only)
            hint          : text description e.g. "person in blue shirt", "red backpack"
            max_steps     : max steps before giving up (each step ~0.4s)
            direction     : "left" or "right" rotation
            yolo_prefilter: YOLO class to pre-filter (e.g. "person") before LLaVA check
                            None = always use LLaVA on every step

        Returns:
            dict with keys: found, position, steps, description, confidence

        Edge cases:
        - ref_img_b64 None + hint empty → warns, returns not found
        - Camera not ready → waits up to 3s, then skips frame
        - Keyboard interrupt → graceful stop
        - YOLO not available → skips pre-filter, uses LLaVA only
        - LLaVA error → logs and continues
        """
        self._abort[0] = False

        # Validate inputs
        if ref_img_b64 is None and not hint:
            print(" [ImgSearch] ⚠️ No reference image and no hint — cannot search")
            return {"found": False, "position": "error", "steps": 0,
                    "description": "No reference image or hint provided",
                    "confidence": "none"}

        has_ref = ref_img_b64 is not None

        # Choose prompt builder
        if has_ref:
            prompt = _build_compare_prompt(hint)
            print(f"\n [ImgSearch] Reference image provided")
        else:
            prompt = _build_single_prompt(hint)
            print(f"\n [ImgSearch] Text-only search: '{hint}'")

        print(f" [ImgSearch] Direction: {direction} | Max steps: {max_steps}")
        if yolo_prefilter:
            print(f" [ImgSearch] YOLO pre-filter: '{yolo_prefilter}'")
        print(f" [ImgSearch] Starting rotation...\n")

        # Start continuous rotation in a background thread; the list is a
        # mutable flag shared with the closure so the finally-block can stop it.
        _keep_rotating = [True]
        vyaw = ROTATE_SPEED if direction == "left" else -ROTATE_SPEED

        def _rotate():
            while _keep_rotating[0]:
                self._send_vel(vyaw=vyaw)
                time.sleep(0.05)

        rot_thread = threading.Thread(target=_rotate, daemon=True)
        rot_thread.start()

        result = {"found": False, "position": "not visible", "steps": 0,
                  "description": "Not found", "confidence": "none"}

        try:
            for step in range(1, max_steps + 1):
                if self._abort[0]:
                    print(" [ImgSearch] Aborted by user")
                    break
                result["steps"] = step
                time.sleep(STEP_DELAY)

                # Warmup — skip first frames (stale)
                if step <= MIN_STEPS_WARMUP:
                    print(f" [{step}/{max_steps}] Warming up...")
                    continue

                # ── YOLO pre-filter ───────────────────────────────────────────
                if yolo_prefilter and self._yolo_sees:
                    if not self._yolo_sees(yolo_prefilter):
                        print(f" [{step}/{max_steps}] YOLO: no {yolo_prefilter} — skip LLaVA")
                        continue
                    print(f" [{step}/{max_steps}] YOLO: {yolo_prefilter} found — running LLaVA comparison")

                # ── Get current frame ─────────────────────────────────────────
                current_frame = self._get_frame()
                if current_frame is None:
                    print(f" [{step}/{max_steps}] ⏳ Camera not ready — skipping")
                    # Wait up to 3s for camera
                    for _ in range(6):
                        time.sleep(0.5)
                        current_frame = self._get_frame()
                        # BUG FIX: was `if current_frame:` — ambiguous truth
                        # value raises ValueError if get_frame returns a numpy
                        # array, and is inconsistent with the `is None` checks
                        # used everywhere else in this method.
                        if current_frame is not None:
                            break
                    if current_frame is None:
                        continue

                # ── LLaVA comparison ──────────────────────────────────────────
                print(f" [{step}/{max_steps}] 🔍 LLaVA comparing...")
                try:
                    if has_ref:
                        # Pass BOTH images: [reference, current_frame]
                        # num_batch/num_ctx mirror llava_api.py — without these
                        # caps the compute graph OOMs the runner on Jetson.
                        import ollama as _ollama
                        from API.llava_api import NUM_BATCH, NUM_CTX
                        r = _ollama.chat(
                            model=self._model,
                            messages=[{
                                "role": "user",
                                "content": prompt,
                                "images": [ref_img_b64, current_frame]
                            }],
                            options={
                                "temperature": 0.0,
                                "num_predict": 60,
                                "num_batch": NUM_BATCH,
                                "num_ctx": NUM_CTX,
                            }
                        )
                        raw = r["message"]["content"].strip()
                    else:
                        # Text-only description search
                        raw = self._call_llava(prompt, current_frame, num_predict=60)

                    # Parse response: strip markdown fences, then take the
                    # outermost {...} span (models often wrap the JSON in prose).
                    raw_clean = raw.replace("```json", "").replace("```", "").strip()
                    s = raw_clean.find("{")
                    e = raw_clean.rfind("}") + 1
                    d = json.loads(raw_clean[s:e]) if s != -1 and e > 0 else None
                    if d is None:
                        print(f" [{step}/{max_steps}] ⚠️ Bad JSON: {raw[:60]}")
                        continue

                    found = d.get("found", False)
                    confidence = d.get("confidence", "low")
                    position = d.get("position", "not visible")
                    description = d.get("description", "")
                    # The model sometimes answers "true"/"yes" as a string.
                    if isinstance(found, str):
                        found = found.lower() in ("true", "yes", "1")

                    print(f" [{step}/{max_steps}] {'✅ MATCH' if found else '❌ no match'} "
                          f"| conf={confidence} | pos={position} | {description[:60]}")

                    # Only accept confident matches; "low" keeps searching.
                    if found and confidence in ("medium", "high"):
                        result = {
                            "found": True,
                            "position": position,
                            "steps": step,
                            "description": description,
                            "confidence": confidence,
                        }
                        break

                except json.JSONDecodeError:
                    print(f" [{step}/{max_steps}] ⚠️ JSON parse error")
                except Exception as ex:
                    print(f" [{step}/{max_steps}] ⚠️ LLaVA error: {ex}")

        except KeyboardInterrupt:
            print("\n [ImgSearch] Interrupted by user")
        finally:
            # Always stop the rotation thread and bring the robot to rest.
            _keep_rotating[0] = False
            self._gradual_stop()

        # Print summary
        if result["found"]:
            print(f"\n ✅ Target found at step {result['steps']}!")
            print(f" 📍 Position: {result['position']}")
            print(f" 📝 {result['description']}\n")
        else:
            print(f"\n ❌ Target not found after {result['steps']} steps\n")

        return result


# ══════════════════════════════════════════════════════════════════════════════
# WIRE INTO marcus_llava.py — add to main loop
# ══════════════════════════════════════════════════════════════════════════════
"""
Add to marcus_llava.py imports:

    from marcus_imgsearch import ImageSearch

Add after Memory init:

    _img_searcher = ImageSearch(
        get_frame_fn    = get_frame,
        send_vel_fn     = send_vel,
        gradual_stop_fn = gradual_stop,
        llava_fn        = _call_llava,
        yolo_sees_fn    = yolo_sees,
        model           = OLLAMA_MODEL,
    )

Add to main loop (before standard LLaVA command):

    # Image search — "search/ path/to/photo.jpg [hint]"
    #             or "search/ hint text only"
    if cmd.lower().startswith("search/"):
        args = cmd[7:].strip()
        # Check if first arg is a file path
        parts = args.split(None, 1)
        if parts and os.path.exists(parts[0]):
            img_path = parts[0]
            hint = parts[1] if len(parts) > 1 else ""
            print(f" [Search] Reference image: {img_path}")
            print(f" [Search] Hint: '{hint}'")
            _img_searcher.search_from_file(
                img_path, hint=hint,
                yolo_prefilter="person" if "person" in hint or not hint else None
            )
        else:
            # Text description only
            hint = args
            print(f" [Search] Text search: '{hint}'")
            _img_searcher.search(ref_img_b64=None, hint=hint)
        continue
"""


# ══════════════════════════════════════════════════════════════════════════════
# STANDALONE TEST
# ══════════════════════════════════════════════════════════════════════════════

if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(description="Marcus Image Search — standalone test")
    parser.add_argument("--image", type=str, default=None, help="Reference image path")
    parser.add_argument("--hint", type=str, default="", help="Text description of target")
    parser.add_argument("--steps", type=int, default=20, help="Max search steps")
    parser.add_argument("--test", action="store_true", help="Run self-test without robot")
    args = parser.parse_args()

    if args.test:
        # Test image loading and prompt building — no robot hardware needed.
        print("=== Self-test (no robot) ===\n")
        print("1. Prompt builder test:")
        p = _build_compare_prompt("person in blue shirt")
        print(f" With hint: {p[:80]}...")
        p2 = _build_compare_prompt()
        print(f" No hint: {p2[:80]}...")
        print("\n2. Single prompt test:")
        p3 = _build_single_prompt("red backpack near the door")
        print(f" {p3[:80]}...")
        if args.image:
            print("\n3. Image loading test:")
            b64 = _load_image_b64(args.image)
            if b64:
                # Approximate decoded size: base64 inflates by 4/3.
                size = len(b64) * 3 // 4 // 1024
                print(f" Loaded: ~{size}KB")
            else:
                print(" Failed to load")
        print("\nSelf-test complete.")
    elif args.image or args.hint:
        # Real search — needs robot hardware
        print("Real search requires robot hardware.")
        print("Import ImageSearch into marcus_llava.py instead.")
        print(f" image: {args.image}")
        print(f" hint: {args.hint}")
    else:
        parser.print_help()