""" marcus_yolo.py — Marcus Vision Module ======================================= Project : Marcus | YS Lootah Technology Purpose : YOLO-based person + object detection Import this module in marcus_brain.py — runs as background thread Usage (imported): from marcus_yolo import start_yolo, yolo_sees, yolo_count, yolo_closest, yolo_summary Usage (standalone): conda run -n marcus python3 Vision/marcus_yolo.py """ import os import sys import time import threading import json import numpy as np from collections import defaultdict # ── Configuration ───────────────────────────────────────────────────────────── _PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) if _PROJECT_ROOT not in sys.path: sys.path.insert(0, _PROJECT_ROOT) try: from Core.config_loader import load_config _cfg = load_config("Vision") except Exception as _e: print(f" [YOLO] config_Vision.json not loaded ({_e}) — using defaults") _cfg = {} # Logger — every YOLO event also persists to logs/vision.log so post-mortem # audits can reconstruct what the eyes saw, when models loaded/failed, and # when inference errors fired. Helper keeps the terminal output unchanged # (still prints " [YOLO] ..." with the existing indent) but ALSO writes # a clean "[YOLO] ..." line to vision.log via Core.logger. try: from Core.logger import log as _core_log except Exception: _core_log = None def _vlog(msg: str, level: str = "info") -> None: """Print to terminal AND append to logs/vision.log. 
Logger failures are swallowed so any logging glitch can't crash the inference loop.""" print(f" [YOLO] {msg}") if _core_log is not None: try: _core_log(f"[YOLO] {msg}", level, "vision") except Exception: pass YOLO_MODEL_PATH = os.path.join(_PROJECT_ROOT, _cfg.get("yolo_model_path", "Models/yolov8m.pt")) YOLO_CONFIDENCE = float(_cfg.get("yolo_confidence", 0.45)) YOLO_IOU = float(_cfg.get("yolo_iou", 0.45)) YOLO_DEVICE = _cfg.get("yolo_device", "cpu") # "cpu" | "cuda" | "0" | "cuda:N" YOLO_IMG_SIZE = int(_cfg.get("yolo_img_size", 320)) YOLO_HALF = bool(_cfg.get("yolo_half", True)) # FP16 on GPU (ignored on CPU) # FPS cap. On CPU, Orin NX manages ~2-3 FPS of YOLOv8m @ 320px. We throttle # lower so CPU inference doesn't compete with Holosoma for cycles. On CUDA, # value is irrelevant (GPU is fast enough that the existing 0.02 s sleep # already caps at ~21 FPS). YOLO_FPS_CAP = float(_cfg.get("yolo_fps_cap", 2.0)) def _resolve_device(requested: str) -> tuple: """ Resolve the YOLO inference device. Both GPU and CPU are supported. On Jetson Orin NX with Qwen2.5-VL loaded, YOLO on "cuda" takes ~2 GiB of iGPU memory and forces Ollama into a 30/70 CPU/GPU split that crawls vision queries. Matching Marcus_v1's working architecture, the default is now "cpu" — YOLO gets ~1-3 FPS on Orin CPU which is plenty for "is there a person" queries, and Qwen keeps the whole iGPU. Set yolo_device="cuda" only if VLM is disabled (subsystems.vlm=false). Returns (device_str, use_half). Never raises for CPU; raises for a CUDA request only when CUDA is genuinely unavailable. """ req = (requested or "cpu").lower() if req == "cpu": # half-precision only makes sense on GPU; force fp32 on CPU return "cpu", False try: import torch except ImportError as e: raise RuntimeError( "[YOLO] PyTorch not installed — cannot run on CUDA. " "Either install CUDA-enabled torch, or set " "yolo_device='cpu' in Config/config_Vision.json." 
) from e if not torch.cuda.is_available(): raise RuntimeError( "[YOLO] yolo_device='cuda' but torch.cuda.is_available()==False. " "Either fix CUDA (tegrastats, nvcc --version) or set " "yolo_device='cpu' in Config/config_Vision.json." ) dev = req if (req.startswith("cuda") or req == "0") else "cuda" return dev, YOLO_HALF # COCO classes to track (ignore everything else) TRACKED_CLASSES = { "person", "chair", "couch", "bed", "dining table", "bottle", "cup", "laptop", "keyboard", "mouse", "backpack", "handbag", "suitcase", "car", "truck", "motorcycle", "bicycle", "fire hydrant", "stop sign", } # PPE classes — active when custom model loaded PPE_VIOLATION_CLASSES = {"no-helmet", "no_helmet", "no-vest", "no_vest"} # ── Shared state ────────────────────────────────────────────────────────────── _detections_lock = threading.Lock() _latest_detections = [] # list of dicts _yolo_running = [False] # When True, the inference loop skips model forward passes. Used by the VLM # path: Qwen2.5-VL's vision encode needs ~1.5 GiB of iGPU activations and the # Jetson's 15 GiB is shared with YOLO + Holosoma, so concurrent inference # spikes the runner into OOM. Pausing YOLO for the ~1 s the VLM spends on an # image prevents that peak. Model weights stay resident (fast resume). 
_yolo_paused = [False]   # True → inference loop idles; model weights stay loaded
_yolo_fps = [0.0]        # most recent measured inference FPS (updated ~1 Hz)

# ── Detection class ───────────────────────────────────────────────────────────
class Detection:
    """Single YOLO detection result with frame-relative geometry helpers.

    Args:
        class_name : detected class label (COCO or PPE-model name)
        confidence : model confidence, 0.0-1.0
        x1, y1, x2, y2 : bbox corners in pixel coordinates
        frame_w, frame_h : dimensions of the frame the bbox came from
    """

    def __init__(self, class_name, confidence, x1, y1, x2, y2, frame_w, frame_h):
        self.class_name = class_name
        self.confidence = confidence
        self.x1, self.y1 = x1, y1
        self.x2, self.y2 = x2, y2
        # integer center (// keeps pixel coordinates as ints)
        self.cx = (x1 + x2) // 2
        self.cy = (y1 + y2) // 2
        self.width = x2 - x1
        self.height = y2 - y1
        self.area = self.width * self.height
        self.frame_w = frame_w
        self.frame_h = frame_h

    @property
    def size_ratio(self) -> float:
        """Fraction of frame covered — larger = closer."""
        # max(..., 1) guards against a zero-area frame (divide-by-zero)
        return self.area / max(self.frame_w * self.frame_h, 1)

    @property
    def position(self) -> str:
        """left / center / right based on bbox center."""
        third = self.frame_w // 3
        if self.cx < third:
            return "left"
        elif self.cx > third * 2:
            return "right"
        return "center"

    @property
    def distance_estimate(self) -> str:
        """Rough distance bucket from size ratio (monocular heuristic)."""
        r = self.size_ratio
        if r > 0.30:
            return "very close"
        if r > 0.10:
            return "close"
        if r > 0.03:
            return "medium"
        return "far"

    def to_dict(self) -> dict:
        """JSON-friendly summary of this detection."""
        return {
            "class": self.class_name,
            "confidence": round(self.confidence, 2),
            "position": self.position,
            "distance": self.distance_estimate,
            "size_ratio": round(self.size_ratio, 4),
            "bbox": [self.x1, self.y1, self.x2, self.y2],
            "center": [self.cx, self.cy],
        }

    def __repr__(self):
        return (f"Detection({self.class_name} {self.confidence:.0%} "
                f"@ {self.position} {self.distance_estimate})")


# ── Public query API ──────────────────────────────────────────────────────────
def yolo_sees(class_name: str, min_confidence: float = 0.45) -> bool:
    """
    Check if YOLO currently detects a specific class.

    Args:
        class_name     : COCO class e.g. "person", "chair", "bottle"
        min_confidence : minimum confidence threshold (default 0.45)

    Returns:
        True if detected, False otherwise.

    Example:
        if yolo_sees("person"):
            gradual_stop()
    """
    with _detections_lock:
        return any(
            d.class_name.lower() == class_name.lower()
            and d.confidence >= min_confidence
            for d in _latest_detections
        )


def yolo_count(class_name: str) -> int:
    """
    Return number of detected instances of class_name.

    Example:
        n = yolo_count("person")
        print(f"Detected {n} people")
    """
    with _detections_lock:
        return sum(
            1 for d in _latest_detections
            if d.class_name.lower() == class_name.lower()
        )


def yolo_closest(class_name: str = "person"):
    """
    Return the closest Detection instance of class_name (largest bbox).

    Returns:
        Detection object or None.

    Example:
        p = yolo_closest("person")
        if p:
            print(p.position, p.distance_estimate)
    """
    with _detections_lock:
        matches = [d for d in _latest_detections
                   if d.class_name.lower() == class_name.lower()]
        if not matches:
            return None
        # largest size_ratio == largest bbox == (heuristically) closest
        return max(matches, key=lambda d: d.size_ratio)


def yolo_all_classes() -> set:
    """Return set of all currently detected class names."""
    with _detections_lock:
        return {d.class_name for d in _latest_detections}


def yolo_summary() -> str:
    """
    Return human-readable summary of current detections.

    Returns:
        e.g. "2 persons (left, close) | 1 chair (center, medium)"
    """
    with _detections_lock:
        dets = list(_latest_detections)
    if not dets:
        return "nothing detected"
    counts = defaultdict(list)
    for d in dets:
        counts[d.class_name].append(d)
    parts = []
    for cls, items in counts.items():
        n = len(items)
        name = f"{n} {cls}{'s' if n > 1 else ''}"
        locs = list(set(d.position for d in items))
        # NOTE: distance reported is the first detection's bucket, not the
        # closest — kept for backward-compatible output.
        dist = items[0].distance_estimate
        parts.append(f"{name} ({', '.join(locs)}, {dist})")
    return " | ".join(parts)


def yolo_ppe_violations() -> list:
    """
    Return list of detected PPE violations.
    Requires custom PPE model loaded (not default yolov8m).

    Returns:
        List of violation strings e.g. ["no helmet (left)", "no vest (center)"]
    """
    violations = []
    with _detections_lock:
        for d in _latest_detections:
            cls = d.class_name.lower()
            if cls in ("no-helmet", "no_helmet"):
                violations.append(f"no helmet ({d.position})")
            elif cls in ("no-vest", "no_vest"):
                violations.append(f"no vest ({d.position})")
    return violations


def yolo_person_too_close(threshold: float = 0.25) -> bool:
    """
    Return True if a person occupies more than threshold of the frame.
    Use for safety stop — person is dangerously close.

    Args:
        threshold : size_ratio above which = too close (default 0.25)
    """
    p = yolo_closest("person")
    return p is not None and p.size_ratio > threshold


def yolo_is_running() -> bool:
    """Return True if YOLO inference loop is active."""
    return _yolo_running[0]


def yolo_pause() -> None:
    """
    Stop YOLO forward passes and release PyTorch's CUDA cache back to the
    driver so Ollama's vision encoder has contiguous iGPU memory to allocate
    into. Weights stay resident, so resume is instant.
    """
    _yolo_paused[0] = True
    try:
        import torch
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
    except Exception:
        # best-effort: no torch / no CUDA simply means nothing to release
        pass


def yolo_resume() -> None:
    """Resume YOLO inference after a pause()."""
    _yolo_paused[0] = False


def yolo_is_paused() -> bool:
    """Return True while the inference loop is paused (see yolo_pause)."""
    return _yolo_paused[0]


def yolo_fps() -> float:
    """Return current YOLO inference FPS."""
    return _yolo_fps[0]


# ── YOLO inference loop ───────────────────────────────────────────────────────
def _inference_loop(model, is_ppe: bool, raw_frame_ref, frame_lock,
                    device: str, use_half: bool):
    """Background inference loop. Reads frames, updates _latest_detections.

    Args:
        model         : loaded ultralytics YOLO model
        is_ppe        : True when a custom PPE model is loaded (skip the
                        TRACKED_CLASSES filter)
        raw_frame_ref : shared [frame-or-None] list written by camera thread
        frame_lock    : lock guarding raw_frame_ref
        device        : resolved inference device ("cpu", "cuda", ...)
        use_half      : run FP16 inference (GPU only)
    """
    frame_count = 0
    t_fps = time.time()
    # Minimum wall-clock interval between inferences, in seconds. On CPU this
    # is the main throttle; on CUDA the model itself limits throughput.
    min_period = 1.0 / max(YOLO_FPS_CAP, 0.1)
    last_infer = 0.0

    while _yolo_running[0]:
        if _yolo_paused[0]:
            # FIX: reset the FPS window so paused wall-time doesn't produce a
            # bogus (near-zero) first reading after resume.
            frame_count = 0
            t_fps = time.time()
            time.sleep(0.03)
            continue

        # FPS cap — on CPU especially, we don't want YOLO to hammer the cores
        # that Holosoma's 50 Hz RL policy also needs.
        dt_since = time.time() - last_infer
        if dt_since < min_period:
            time.sleep(min(0.05, min_period - dt_since))
            continue

        with frame_lock:
            frame = raw_frame_ref[0]
        if frame is None:
            time.sleep(0.05)
            continue

        last_infer = time.time()
        try:
            results = model(
                frame, imgsz=YOLO_IMG_SIZE, conf=YOLO_CONFIDENCE,
                iou=YOLO_IOU, device=device, half=use_half, verbose=False
            )[0]
        except Exception as e:
            _vlog(f"Inference error: {e}", "error")
            time.sleep(0.2)
            continue

        h, w = frame.shape[:2]
        dets = []
        for box in results.boxes:
            cls_id = int(box.cls[0])
            class_name = model.names[cls_id]
            confidence = float(box.conf[0])
            # Default COCO model: keep only the classes Marcus cares about.
            # PPE model: keep everything (its whole vocabulary is relevant).
            if not is_ppe and class_name not in TRACKED_CLASSES:
                continue
            x1, y1, x2, y2 = map(int, box.xyxy[0])
            dets.append(Detection(class_name, confidence, x1, y1, x2, y2, w, h))

        # Swap in the new detections atomically.
        with _detections_lock:
            _latest_detections.clear()
            _latest_detections.extend(dets)

        frame_count += 1
        elapsed = time.time() - t_fps
        if elapsed >= 1.0:
            _yolo_fps[0] = round(frame_count / elapsed, 1)
            frame_count = 0
            t_fps = time.time()

        time.sleep(0.02)


# ── Camera loop for standalone mode ──────────────────────────────────────────
def _camera_loop(raw_frame_ref, frame_lock, cam_alive):
    """Capture RealSense frames when running standalone.

    Reconnects forever on any camera failure until cam_alive[0] goes False.
    """
    import pyrealsense2 as rs
    while cam_alive[0]:
        pipeline = None
        try:
            pipeline = rs.pipeline()
            cfg = rs.config()
            cfg.enable_stream(rs.stream.color, 424, 240, rs.format.bgr8, 15)
            pipeline.start(cfg)
            print("Camera connected ✅")
            while cam_alive[0]:
                frames = pipeline.wait_for_frames(timeout_ms=3000)
                frame = np.asanyarray(frames.get_color_frame().get_data())
                with frame_lock:
                    raw_frame_ref[0] = frame.copy()
        except Exception as e:
            print(f"Camera: {e} — reconnecting...")
            # FIX: pipeline is None when rs.pipeline() itself failed —
            # calling .stop() on None raised AttributeError (uncaught) and
            # killed the camera thread. Guard first, then swallow only the
            # expected RealSense "pipeline not started" error, not every
            # possible failure mode.
            if pipeline is not None:
                try:
                    pipeline.stop()
                except RuntimeError:
                    pass
            time.sleep(2.0)


# ── Start function — called by marcus_brain.py ────────────────────────────────
def start_yolo(raw_frame_ref=None, frame_lock=None):
    """
    Start YOLO inference in background thread.
    Called automatically by marcus_brain.py during startup.
    Shares the camera frame reference from marcus_brain's camera thread.

    Args:
        raw_frame_ref : list[np.ndarray|None] — shared raw BGR frame
        frame_lock    : threading.Lock protecting raw_frame_ref

    Returns:
        True when the inference thread was launched, False on any setup
        failure (missing ultralytics, model load error, bad device config).

    Example (in marcus_brain.py):
        from marcus_yolo import start_yolo, yolo_sees, yolo_summary
        start_yolo(raw_frame_ref=_raw_frame, frame_lock=_raw_lock)
    """
    # FIX: with the documented defaults (None/None) the inference thread
    # crashed on `with frame_lock:` — fall back to private placeholders so a
    # bare start_yolo() is safe (it just never sees a frame).
    if raw_frame_ref is None:
        raw_frame_ref = [None]
    if frame_lock is None:
        frame_lock = threading.Lock()

    try:
        from ultralytics import YOLO
    except ImportError:
        _vlog("ultralytics not installed — pip install ultralytics", "error")
        return False

    _vlog(f"Loading model: {YOLO_MODEL_PATH}")
    try:
        model = YOLO(YOLO_MODEL_PATH)
    except Exception as e:
        _vlog(f"Failed to load model: {e}", "error")
        return False

    names = set(model.names.values())
    is_ppe = bool(names & PPE_VIOLATION_CLASSES)
    device, use_half = _resolve_device(YOLO_DEVICE)

    # Move weights onto the target device once so inferences don't pay a
    # CPU→GPU copy every call. Ultralytics handles FP16 casting via the
    # `half=True` predict kwarg — don't call `.half()` on the inner module,
    # it conflicts with ultralytics' own input dtype preprocess.
    try:
        model.to(device)
    except Exception as e:
        _vlog(f"Could not move model to {device} ({e}) — continuing", "warn")

    gpu_info = ""
    if device != "cpu":
        try:
            import torch
            gpu_info = f" ({torch.cuda.get_device_name(0)})"
        except Exception:
            pass

    _vlog(f"Model loaded ✅ | device: {device}{gpu_info}"
          f"{' | FP16' if use_half else ''} | "
          f"{'PPE model' if is_ppe else f'{len(TRACKED_CLASSES & names)} tracked classes'}")

    _yolo_running[0] = True
    threading.Thread(
        target=_inference_loop,
        args=(model, is_ppe, raw_frame_ref, frame_lock, device, use_half),
        daemon=True
    ).start()
    return True


# ══════════════════════════════════════════════════════════════════════════════
# STANDALONE MODE — run directly for testing
# ══════════════════════════════════════════════════════════════════════════════
if __name__ == "__main__":
    raw_frame_ref = [None]
    frame_lock = threading.Lock()
    cam_alive = [True]

    # Start camera
    threading.Thread(
        target=_camera_loop,
        args=(raw_frame_ref, frame_lock, cam_alive),
        daemon=True
    ).start()
    time.sleep(3.0)

    # Start YOLO
    ok = start_yolo(raw_frame_ref=raw_frame_ref, frame_lock=frame_lock)
    if not ok:
        print("YOLO failed to start. Exiting.")
        sys.exit(1)  # FIX: sys.exit, not the site-injected exit() helper
    time.sleep(2.0)

    print()
    print("╔══════════════════════════════════════════════╗")
    print("║ MARCUS VISION — YOLO ACTIVE ║")
    print("╠══════════════════════════════════════════════╣")
    print(f"║ Model : {YOLO_MODEL_PATH[-36:]:<36}║")
    print(f"║ Conf : {YOLO_CONFIDENCE:<36}║")
    _dev, _half = _resolve_device(YOLO_DEVICE)
    _dev_label = f"{_dev}{' FP16' if _half else ''}"
    print(f"║ Device: {_dev_label:<36}║")
    print("╠══════════════════════════════════════════════╣")
    print("║ what — describe scene ║")
    print("║ person — detect people ║")
    print("║ ppe — check PPE violations ║")
    print("║ count — count instances ║")
    print("║ closest — closest instance info ║")
    print("║ all — all detections ║")
    print("║ fps — inference speed ║")
    print("║ q — quit ║")
    print("╚══════════════════════════════════════════════╝")
    print()

    while True:
        try:
            cmd = input("Vision: ").strip().lower()
        except (EOFError, KeyboardInterrupt):
            break
        if not cmd:
            continue
        if cmd == "q":
            break
        elif cmd == "what":
            print(f" {yolo_summary()}")
        elif cmd == "person":
            # FIX: snapshot under the lock — the old code iterated
            # _latest_detections while the inference thread could mutate it.
            with _detections_lock:
                people = [d for d in _latest_detections
                          if d.class_name == "person"]
            if not people:
                print(" No person detected")
            else:
                for i, p in enumerate(people, 1):
                    print(f" Person {i}: {p.position}, {p.distance_estimate} "
                          f"({p.confidence:.0%})")
        elif cmd == "ppe":
            v = yolo_ppe_violations()
            print(f" {'No violations' if not v else chr(10).join(v)}")
        elif cmd.startswith("count "):
            cls = cmd[6:].strip()
            print(f" {yolo_count(cls)} {cls}(s) detected")
        elif cmd.startswith("closest "):
            cls = cmd[8:].strip()
            d = yolo_closest(cls)
            if d:
                print(f" Closest {cls}: {d.position}, {d.distance_estimate} "
                      f"({d.confidence:.0%})")
            else:
                print(f" No {cls} detected")
        elif cmd == "all":
            with _detections_lock:
                dets = list(_latest_detections)
            if not dets:
                print(" Nothing detected")
            else:
                for d in dets:
                    print(f" {d}")
        elif cmd == "fps":
            print(f" {yolo_fps():.1f} fps")
        else:
            print(f" Unknown: {cmd}")

    cam_alive[0] = False
    _yolo_running[0] = False
    print("Marcus Vision stopped.")