# Marcus — Full API & Developer Reference

**Project:** Marcus | YS Lootah Technology | Jetson Orin NX + G1 EDU
**Robot persona:** Sanad (wake word + self-intro; project code stays under `Marcus/`)
**Entry points:** `run_marcus.py` (terminal) / `Server/marcus_server.py` (WebSocket)
**Updated:** 2026-04-21

> **What changed since the early draft (April 4):** The project was restructured
> from two monolithic scripts (`marcus_llava.py` + `marcus_yolo.py`) into a
> layered architecture. See `Doc/architecture.md` for the current file tree,
> `Doc/environment.md` for the verified Jetson software stack, `Doc/pipeline.md`
> for end-to-end dataflow, and **`Doc/functions.md` for the authoritative
> function inventory** (always generated from AST — treat it as the source of
> truth for signatures). This reference describes the semantics (usage, JSON
> schemas, examples); cross-check `functions.md` for exact signatures. Recent
> deltas are called out inline below.

### Recent API deltas (2026-04-21)

| Change | Location | Note |
|---|---|---|
| GPU is mandatory for YOLO | `Config/config_Vision.json`, `Vision/marcus_yolo.py` | `yolo_device` defaults to `"cuda"` and is enforced; `_resolve_device()` raises `RuntimeError` on missing CUDA. `yolo_half=true` runs FP16 on Orin (capability 8.7). |
| Ollama model | `Config/config_Brain.json` | Default `ollama_model` is `qwen2.5vl:3b` (not `llava:7b`). |
| Ollama compute-graph caps | `Config/config_Brain.json` | `num_batch=128`, `num_ctx=2048` — required on 16 GB Orin NX to prevent the llama runner OOM. Propagated by `API/llava_api.py` and `Vision/marcus_imgsearch.py` to every `ollama.chat` call. |
| `num_predict_main` lowered | `Config/config_Brain.json` | 200 → 120 (shaves ~400–600 ms per open-ended command; JSON still parses). |
| ZMQ bind moved out of import | `API/zmq_api.py` | `init_zmq()` must be called from the main process before any `send_vel`/`send_cmd`. `init_brain()` does this. Children spawned via `multiprocessing` no longer collide on port 5556. |
| Camera-retry poll | `Brain/marcus_brain.py::_handle_llava` | Replaced `time.sleep(1.0)` with 10×50 ms polls. |
| Conditional scan sleeps | `Navigation/goal_nav.py`, `Autonomous/marcus_autonomous.py` | Removed unconditional per-step naps when real work (YOLO hit, LLaVA call, forward move) already consumed wall time. |
| Image-search step delay | `Vision/marcus_imgsearch.py` | `STEP_DELAY` 0.4 s → 0.15 s. |
| Built-in G1 microphone | `Voice/builtin_mic.py` (new), `API/audio_api.py`, `Config/config_Voice.json` | Mic now reads from UDP multicast `239.168.123.161:5555` (G1 on-board array mic) instead of the Hollyland USB. Config key `mic.backend` defaults to `"builtin_udp"`; set to `"pactl_parec"` to fall back to the old path. |
| Built-in G1 TTS | `Voice/builtin_tts.py` (new), `API/audio_api.py` | `AudioAPI.speak(text)` now calls `client.TtsMaker(text, speaker_id)` directly. No MP3/WAV plumbing, no internet, no edge-tts/Piper. English only — `speak()` refuses non-ASCII to avoid the G1's silent Arabic→Chinese fallback. |
| Voice stack — Gemini Live STT + TtsMaker hybrid | `Voice/audio_io.py`, `Voice/gemini_script.py`, `Voice/turn_recorder.py`, `Voice/marcus_voice.py` | Sanad-pattern port: `AudioIO.from_profile("builtin", audio_client=ac)` builds the G1 mic + speaker; `GeminiBrain` runs Gemini Live `response_modalities=["TEXT"]` in a worker thread; `_dispatch_gemini_command` gates each transcript on the wake word "Sanad" + fuzzy match against `command_vocab`, then forwards to the brain. The brain's reply is spoken by `AudioAPI.speak()` via on-robot TtsMaker — Gemini never speaks. Earlier iterations (faster-whisper / wake_detector / Vosk / Moonshine / full S2S) all removed. Cloud dep: env `MARCUS_GEMINI_API_KEY`. |
| Subsystem flags | `Config/config_Brain.json::subsystems.{lidar, voice, imgsearch, autonomous}` | `init_brain()` skips any subsystem with `false`. Defaults: lidar+voice+autonomous ON, imgsearch OFF. |
| Robot persona → Sanad | Multiple | Wake words `["sanad","sannad","sanat","sunnat"]`; all prompts say "You are Sanad"; banner reads `SANAD AI BRAIN — READY`; hardcoded self-intro says "I am Sanad". Project/file/module names unchanged. |
| Logger rename | `Core/log_backend.py` (was `Core/Logger.py`) | Case-only collision with `Core/logger.py` removed — repo now clones cleanly on macOS/Windows. Public API unchanged: `from Core.logger import log`. |
| Log rotation everywhere | `Core/log_backend.py`, `API/audio_api.py`, `Voice/marcus_voice.py` | All `FileHandler`s swapped for `RotatingFileHandler` (5 MB × 3 backups, tunable via `MARCUS_LOG_MAX_BYTES` / `MARCUS_LOG_BACKUP_COUNT`). Prevents unbounded log growth on the Jetson. `default_logs_dir` pinned to lowercase `logs/`. |
| English-only policy | `Brain/marcus_brain.py`, `Config/marcus_prompts.yaml`, `Config/config_Voice.json` | Arabic talk-pattern and greeting regexes removed; 5.8 KB of Arabic prompt examples stripped from `marcus_prompts.yaml`; Arabic wake words removed from config. `AudioAPI.speak(text, lang='en')` — only `'en'` accepted; non-ASCII is rejected. |
| Dead-code + orphan sweep | `Legacy/marcus_nav.py`, `Config/config_Memory.json` | Deleted. Config count 13 → 12 JSON + 1 YAML. |
| Orphan config keys wired up | `Vision/marcus_imgsearch.py`, `Voice/builtin_mic.py`, `API/camera_api.py`, `Navigation/marcus_odometry.py` | `config_ImageSearch.json` (4 keys), `config_Voice.mic_udp.read_timeout_sec`, `config_Camera.{timeout_ms, stale_threshold_s, reconnect_delay_s}`, `config_Odometry.json` (10 keys) are all read by code now. **0 orphan keys across 156 total.** |
| Subprocess leak fix | `API/audio_api.py::_record_parec` | `Popen` now wrapped in try/finally; orphan `parec` processes can't survive Ctrl-C/exceptions. Last-resort `proc.kill()` catches only `OSError`. |

---

## Table of Contents

1. [Configuration Variables](#1-configuration-variables)
2. [ZMQ — Holosoma Communication](#2-zmq--holosoma-communication)
3. [Camera Functions](#3-camera-functions)
4. [YOLO Vision Module](#4-yolo-vision-module)
5. [LLaVA AI Functions](#5-llava-ai-functions)
6. [Arm SDK](#6-arm-sdk)
7. [Movement Functions](#7-movement-functions)
8. [Prompt Engineering](#8-prompt-engineering)
9. [Goal Navigation](#9-goal-navigation)
10. [Autonomous Patrol](#10-autonomous-patrol)
11. [Main Loop](#11-main-loop)
12. [JSON Schema Reference](#12-json-schema-reference)
13. [Environment & Paths](#13-environment--paths)
14. [Quick Reference Card](#14-quick-reference-card)
15. [Voice API (mic + TTS + STT)](#15-voice-api-mic--tts--stt)

---

## 1. Configuration Variables

All configuration is now **JSON-driven** and lives under `Config/`. Each module loads its config at startup via `Core.config_loader.load_config(name)`.
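
For orientation, a minimal sketch of a module-level load, assuming `load_config(name)` resolves the name to `Config/<name>.json` and returns the parsed JSON as a dict (check `Doc/functions.md` for the exact signature and name argument):

```python
from Core.config_loader import load_config

cfg = load_config("config_Vision")                 # assumed: basename of the JSON file
YOLO_MODEL_PATH = cfg["yolo_model_path"]           # "Models/yolov8m.pt"
YOLO_DEVICE     = cfg.get("yolo_device", "cuda")   # GPU is enforced downstream
YOLO_CONF       = cfg.get("yolo_confidence", 0.45)
```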

**`Config/config_ZMQ.json`** (Holosoma bridge)

| Key | Default | Description |
|---|---|---|
| `zmq_host` | `"127.0.0.1"` | Holosoma ZMQ host |
| `zmq_port` | `5556` | Holosoma ZMQ port |
| `stop_iterations` | `20` | `gradual_stop()` message count |
| `stop_delay` | `0.05` | seconds between stop messages |
| `step_pause` | `0.3` | pause between consecutive action steps |

**`Config/config_Brain.json`** (Ollama VL model)

| Key | Default | Description |
|---|---|---|
| `ollama_model` | `"qwen2.5vl:3b"` | Ollama model tag |
| `max_history` | `6` | conversation turns retained |
| `num_batch` | `128` | llama.cpp batch — **cap, required for Jetson** |
| `num_ctx` | `2048` | llama.cpp KV context length — **cap, required for Jetson** |
| `num_predict_main` | `120` | max tokens for the main command path |
| `num_predict_goal` | `80` | goal-navigation call |
| `num_predict_patrol` | `100` | autonomous patrol call |
| `num_predict_talk` | `80` | talk-only path |
| `num_predict_verify` | `10` | YOLO condition verifier (`yes`/`no`) |

**`Config/config_Vision.json`** (YOLO)

| Key | Default | Description |
|---|---|---|
| `yolo_model_path` | `"Models/yolov8m.pt"` | weights file (auto-fetched if missing) |
| `yolo_confidence` | `0.45` | detection confidence threshold |
| `yolo_iou` | `0.45` | NMS IOU threshold |
| `yolo_device` | `"cuda"` | **GPU required** — `"cpu"` raises `RuntimeError` |
| `yolo_half` | `true` | FP16 inference (Ampere tensor cores) |
| `yolo_img_size` | `320` | inference image size |
| `tracked_classes` | 19 COCO classes | filter for relevant detections |

**`Config/config_Camera.json`**: `424x240 @ 15 fps`, `JPEG quality 70`.
**`Config/config_Voice.json`**: see section 15 below.
**`Config/config_Network.json`**: Jetson eth0/wlan0 IPs, WebSocket port.

---

## 2. ZMQ — Holosoma Communication

### Setup

The bind is no longer an import-time side effect. It runs inside `init_zmq()`, called once by `init_brain()` from the main process. Children (e.g. the LiDAR SLAM worker spawned via `multiprocessing.spawn`) can re-import `API.zmq_api` without rebinding.

```python
# API/zmq_api.py — bind happens here, not at module import
def init_zmq() -> zmq.Socket:
    global ctx, sock
    if sock is not None:
        return sock                    # idempotent
    ctx = zmq.Context()
    sock = ctx.socket(zmq.PUB)
    sock.bind(f"tcp://{ZMQ_HOST}:{ZMQ_PORT}")
    time.sleep(0.5)                    # let SUBs attach
    return sock
```

### `send_vel(vx, vy, vyaw)`

Send a velocity command to Holosoma. Raises `RuntimeError` if `init_zmq()` wasn't called.

```python
def send_vel(vx: float = 0.0, vy: float = 0.0, vyaw: float = 0.0):
    _ensure_sock().send_string(json.dumps({"vel": {"vx": vx, "vy": vy, "vyaw": vyaw}}))
```

| Parameter | Unit | Safe range | Effect |
|-----------|------|-----------|--------|
| `vx` | m/s | -0.2 to 0.4 | Forward (+) / Backward (-) |
| `vy` | m/s | -0.2 to 0.2 | Lateral |
| `vyaw` | rad/s | -0.3 to 0.3 | Turn left (+) / right (-) |

```python
send_vel(vx=0.3)     # walk forward
send_vel(vx=-0.2)    # walk backward
send_vel(vyaw=0.3)   # turn left
send_vel(vyaw=-0.3)  # turn right
send_vel(0, 0, 0)    # zero velocity (use gradual_stop() instead)
```

### `gradual_stop()`

Smooth deceleration to zero over ~1 second.

```python
def gradual_stop():
    for _ in range(STOP_ITERATIONS):   # 20 iterations
        send_vel(0.0, 0.0, 0.0)
        time.sleep(STOP_DELAY)         # 0.05 s each = 1 s total
```

**Always use this instead of a single zero-velocity message.** ZMQ PUB/SUB can drop messages — 20 guarantees delivery.
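
The usual pattern for any timed move is `send_vel` for the duration, then `gradual_stop()`. A minimal sketch, assuming the section-2 helpers are importable from `API/zmq_api.py` (the `timed_move` helper name is illustrative, not part of the API):

```python
import time
from API.zmq_api import init_zmq, send_vel, gradual_stop

def timed_move(vx: float, vy: float, vyaw: float, duration: float):
    """Hold one velocity for `duration` seconds, then decelerate smoothly."""
    end = time.time() + duration
    while time.time() < end:
        send_vel(vx, vy, vyaw)   # re-publish at ~20 Hz; single PUB messages may be dropped
        time.sleep(0.05)
    gradual_stop()

init_zmq()                        # must run in the main process first
timed_move(0.3, 0.0, 0.0, 2.0)    # ~2 s forward walk, then smooth stop
```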

### `send_cmd(cmd)`

```python
def send_cmd(cmd: str):
    sock.send_string(json.dumps({"cmd": cmd}))
```

| Command | Effect |
|---------|--------|
| `"start"` | Activate policy |
| `"walk"` | Switch to walking mode |
| `"stand"` | Return to standing |
| `"stop"` | Deactivate (only after gradual_stop) |

**Startup sequence:**

```python
send_cmd("start"); time.sleep(0.5)
send_cmd("walk");  time.sleep(0.5)
# Now ready for velocity commands
```

---

## 3. Camera Functions

### Architecture

Two consumers share one camera feed:

- `latest_frame_b64[0]` — base64 JPEG for LLaVA
- `_raw_frame[0]` — raw BGR numpy array for YOLO

Both are protected by separate locks (`camera_lock`, `_raw_lock`).

### `camera_loop()`

Background thread — auto-reconnects on USB drops.

```python
def camera_loop():
    while camera_alive[0]:
        pipeline = rs.pipeline()
        cfg = rs.config()            # color stream config
        cfg.enable_stream(rs.stream.color, 424, 240, rs.format.bgr8, 15)
        pipeline.start(cfg)
        while camera_alive[0]:
            frames = pipeline.wait_for_frames(timeout_ms=3000)
            frame = np.asanyarray(...)
            with _raw_lock:
                _raw_frame[0] = frame.copy()              # → YOLO
            with camera_lock:
                latest_frame_b64[0] = encode_jpeg(frame)  # → LLaVA
```

### `get_frame()`

Returns the latest base64 JPEG for LLaVA.

```python
def get_frame():
    with camera_lock:
        return latest_frame_b64[0]   # None if not ready
```

**Camera specs:**

| Property | Value |
|----------|-------|
| Device | RealSense D435I (serial: 243622073459) |
| Capture | 424×240 @ 15fps |
| Format | BGR8 |
| Encoding | JPEG quality 70, base64 UTF-8 |
| Why 424×240 | Reduces USB bandwidth drops during Ollama GPU inference |

---

## 4. YOLO Vision Module

### Import (in marcus_llava.py)

```python
from marcus_yolo import (
    start_yolo, yolo_sees, yolo_count, yolo_closest, yolo_summary,
    yolo_ppe_violations, yolo_person_too_close, yolo_all_classes, yolo_fps,
)

# Start YOLO sharing the camera frame
YOLO_AVAILABLE = start_yolo(raw_frame_ref=_raw_frame, frame_lock=_raw_lock)
```

### `start_yolo(raw_frame_ref, frame_lock)`

Loads the YOLO model and starts the inference background thread.

```python
def start_yolo(raw_frame_ref=None, frame_lock=None) -> bool:
```

Returns `True` on success, `False` if the model fails to load.

### `yolo_sees(class_name, min_confidence)`

```python
yolo_sees("person")      # True if person detected
yolo_sees("chair", 0.6)  # True with stricter confidence
```

Returns `bool`. Instant — no LLaVA call.

### `yolo_count(class_name)`

```python
n = yolo_count("person")   # 0, 1, 2...
```

### `yolo_closest(class_name)`

Returns the `Detection` object with the largest bounding box (closest to robot).

```python
p = yolo_closest("person")
if p:
    print(p.position)           # "left" / "center" / "right"
    print(p.distance_estimate)  # "very close" / "close" / "medium" / "far"
    print(p.confidence)         # 0.0 to 1.0
    print(p.size_ratio)         # fraction of frame area
```

### `yolo_summary()`

```python
yolo_summary()
# → "1 person (center, close) | 2 chairs (right, medium) | 1 laptop (left, far)"
```

### `yolo_ppe_violations()`

```python
violations = yolo_ppe_violations()
# → ["no helmet (left)", "no vest (center)"]
# Requires custom PPE model — returns [] with yolov8m.pt
```

### `yolo_person_too_close(threshold)`

```python
if yolo_person_too_close(threshold=0.25):
    gradual_stop()   # person covers >25% of frame
```

### `yolo_all_classes()`

```python
classes = yolo_all_classes()   # → {"person", "chair", "laptop"}
```

### `yolo_fps()`

```python
print(f"{yolo_fps():.1f}fps")   # e.g. 4.4fps on CPU
```

### Detection class properties

| Property | Type | Description |
|----------|------|-------------|
| `class_name` | str | e.g. "person" |
| `confidence` | float | 0.0 to 1.0 |
| `position` | str | "left" / "center" / "right" |
| `distance_estimate` | str | "very close" / "close" / "medium" / "far" |
| `size_ratio` | float | bbox area / frame area |
| `cx`, `cy` | int | bbox center coordinates |
| `x1, y1, x2, y2` | int | bounding box corners |
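
Putting the queries and the `Detection` fields together, here is a minimal sketch of steering toward the closest person. The function names come from this section and section 2; the steering logic and thresholds are illustrative only, not the project's navigation code:

```python
from marcus_yolo import yolo_closest
# send_vel / gradual_stop from API.zmq_api (section 2)

def step_toward_person() -> bool:
    """Illustrative scan step: turn toward the closest person, stop when close."""
    p = yolo_closest("person")
    if p is None:
        send_vel(vyaw=0.3)                 # nothing seen: keep scanning left
        return False
    if p.distance_estimate in ("close", "very close"):
        gradual_stop()                     # near enough: stop smoothly
        return True
    if p.position == "left":
        send_vel(vyaw=0.3)
    elif p.position == "right":
        send_vel(vyaw=-0.3)
    else:
        send_vel(vx=0.3)                   # centered: walk forward
    return False
```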

---

## 5. LLaVA AI Functions

### `ask(command, img_b64)`

Main command processor.

```python
def ask(command: str, img_b64) -> dict:
```

| Parameter | Description |
|-----------|-------------|
| `command` | Natural language command |
| `img_b64` | Base64 JPEG camera frame |

Returns a dict with `actions`, `arm`, `speak`, `abort`.

**Options:**

```python
options={"temperature": 0.0, "num_predict": 120}   # num_predict_main
```

**Response time:** 4-8s (14s first call warmup)

### `ask_goal(goal, img_b64)`

Used in the goal navigation loop.

```python
def ask_goal(goal: str, img_b64) -> dict:
```

Returns: `reached` (bool), `next_move` (str), `duration` (float), `speak` (str)

### `ask_patrol(img_b64)`

Used in autonomous patrol. Returns: `observation` (str), `alert` (str|None), `next_move` (str), `duration` (float)

### `_call_llava(prompt, img_b64, num_predict)`

Internal helper — sends to the Ollama API.

```python
r = ollama.chat(
    model="qwen2.5vl:3b",   # ollama_model from config_Brain.json
    messages=[{"role": "user", "content": prompt, "images": [img_b64]}],
    options={"temperature": 0.0, "num_predict": num_predict,
             "num_batch": 128, "num_ctx": 2048},   # caps propagated per config_Brain.json
)
```

### `_parse_json(raw)`

Extracts JSON from the LLaVA response. Strips markdown fences automatically.

```python
raw = '```json\n{"move": "left"}\n```'
d = _parse_json(raw)   # → {"move": "left"}
```

---

## 6. Arm SDK

**Class:** `G1ArmActionClient` (from `unitree_sdk2py.g1.arm.g1_arm_action_client`)
**Method:** `ExecuteAction(action_id: int) -> int` (returns 0 on success)

### `do_arm(action)`

```python
def do_arm(action):   # action: str name or int ID
```

### Action ID Map

| Friendly name | Action ID | Description |
|---------------|-----------|-------------|
| `wave` | 26 | High wave |
| `raise_right` | 23 | Right hand up |
| `raise_left` | 15 | Both hands up |
| `both_up` | 15 | Both hands up |
| `clap` | 17 | Clap hands |
| `high_five` | 18 | High five |
| `hug` | 19 | Hug pose |
| `heart` | 20 | Heart shape |
| `right_heart` | 21 | Right hand heart |
| `reject` | 22 | Reject gesture |
| `shake_hand` | 27 | Shake hand |
| `face_wave` | 25 | Wave at face level |
| `lower` | 99 | Release to default |

### Notes

- Runs in background thread — does not block movement
- Error 7404 = robot was moving during arm command — always `gradual_stop()` first (see the sketch below)
- `ALL_ARM_NAMES` set intercepts arm words that LLaVA puts in `actions` list
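
A minimal sketch of the safe calling pattern the notes describe: stop first, then fire the arm action from a background thread so locomotion code isn't blocked. The `wave_safely` helper is illustrative; `do_arm` and `gradual_stop` are the documented functions above.

```python
import threading
# do_arm / gradual_stop as documented above (project helpers)

def wave_safely():
    gradual_stop()                         # avoid error 7404 (robot still moving)
    threading.Thread(target=do_arm,        # friendly name "wave" maps to action ID 26
                     args=("wave",), daemon=True).start()

wave_safely()
```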

---

## 7. Movement Functions

### `execute_action(move, duration)`

Executes a single movement step.

```python
def execute_action(move: str, duration: float):
```

- Intercepts arm names → routes to `do_arm()`
- Calls `gradual_stop()` after each step
- Waits `STEP_PAUSE` (0.3s) between steps

### `_merge_actions(actions)`

Merges consecutive same-direction steps into one smooth movement.

```python
# LLaVA returns:
[{"move": "right", "duration": 1.0},
 {"move": "right", "duration": 1.0},
 {"move": "right", "duration": 1.0},
 {"move": "right", "duration": 1.0},
 {"move": "right", "duration": 1.0}]

# _merge_actions produces:
[{"move": "right", "duration": 5.0}]   # one smooth 5-second rotation
```

### `execute(d)`

Runs a full LLaVA decision.

```python
def execute(d: dict):
    # 1. Check abort
    # 2. _merge_actions() — smooth consecutive steps
    # 3. execute_action() for each step in order
    # 4. do_arm() in background thread
```

### `_move_step(move, duration)`

Lightweight step for goal/patrol loops — no full `gradual_stop()` between checks.

```python
def _move_step(move: str, duration: float):
    # send velocity for duration seconds
    # single zero-vel + 0.1s pause — then immediately check YOLO again
```

### MOVE_MAP

```python
MOVE_MAP = {
    "forward":  ( 0.3, 0.0,  0.0),   # vx m/s
    "backward": (-0.2, 0.0,  0.0),
    "left":     ( 0.0, 0.0,  0.3),   # vyaw rad/s
    "right":    ( 0.0, 0.0, -0.3),
}
```

---

## 8. Prompt Engineering

### MAIN_PROMPT

Controls LLaVA's response format for all standard commands. Key rules embedded in the prompt:

- `actions` is a list — one entry per step
- `arm` is never a move value
- `"90 degrees"` = 5.0s duration
- `"1 step"` = 1.0s duration

**To add arm examples or change behavior — edit the MAIN_PROMPT examples section.**

### GOAL_PROMPT

Used inside `navigate_to_goal()` as LLaVA fallback. Forces `{"reached": bool, "next_move": str, "duration": float, "speak": str}`.

### PATROL_PROMPT

Used inside `patrol()` for scene assessment. Forces `{"observation": str, "alert": str|null, "next_move": str, "duration": float}`.

---

## 9. Goal Navigation

### `navigate_to_goal(goal, max_steps)`

```python
def navigate_to_goal(goal: str, max_steps: int = 40):
```

**Flow:**

1. Extract YOLO target from goal text (`_goal_yolo_target()` — see the sketch after the examples)
2. Move left 0.4s (lightweight step)
3. After `MIN_STEPS_BEFORE_CHECK` (3) steps — check YOLO every step
4. If `yolo_sees(target)` → `gradual_stop()` → print result → return
5. Falls back to LLaVA if class not in YOLO set

**Why minimum steps?** Prevents a false stop from a stale camera frame when the robot hasn't moved yet.

### YOLO class aliases in goals

```python
_GOAL_ALIASES = {
    "guy": "person", "man": "person", "woman": "person",
    "human": "person", "people": "person", "someone": "person",
    "table": "dining table", "sofa": "couch",
}
```

### Examples

```python
navigate_to_goal("stop when you see a person")
navigate_to_goal("keep turning left until you see a guy")
navigate_to_goal("find a chair and stop in front of it")
navigate_to_goal("stop when you are close to the laptop")
navigate_to_goal("stop at the end of the corridor")   # LLaVA fallback
```
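
A minimal sketch of the target-extraction step (step 1 of the flow above). The real `_goal_yolo_target()` lives in the navigation module; this illustrative version only shows the idea: scan the goal text for one of the `_GOAL_ALIASES`, then for a tracked COCO class name.

```python
from typing import Optional, Set

def goal_yolo_target(goal: str, tracked: Set[str]) -> Optional[str]:
    """Illustrative only: return the YOLO class a goal sentence refers to, or None."""
    text = goal.lower()
    for alias, cls in _GOAL_ALIASES.items():   # "guy" → "person", "sofa" → "couch", ...
        if alias in text:
            return cls
    for cls in tracked:                        # direct class-name mention
        if cls in text:
            return cls
    return None                                # no YOLO target → LLaVA fallback
```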

---

## 10. Autonomous Patrol

### `patrol(duration_minutes, alert_callback)`

```python
def patrol(duration_minutes: float = 5.0, alert_callback=None):
```

**Each patrol step:**

1. YOLO PPE violations check (instant)
2. `yolo_person_too_close()` safety check — pauses if True
3. LLaVA scene assessment → navigation decision
4. `_move_step()` to next position

**Custom alert handler:**

```python
def my_alert(text: str):
    print(f"SECURITY: {text}")
    # send notification, sound alarm, etc.

patrol(duration_minutes=10.0, alert_callback=my_alert)
```

---

## 11. Main Loop

```python
while True:
    cmd = input("Command: ").strip()
    if cmd.lower() in ("q", "quit", "exit"):
        break

    # YOLO query — never sent to LLaVA
    if any(w in cmd.lower() for w in ("yolo", "are you using yolo", "vision")):
        print(f"  YOLO: {yolo_summary()} | {yolo_fps():.1f}fps")
        continue

    # Goal navigation
    if cmd.lower().startswith("goal:"):
        navigate_to_goal(cmd[5:].strip())
        continue

    # Patrol
    if cmd.lower() == "patrol":
        patrol(duration_minutes=...)
        continue

    # Standard LLaVA command
    img = get_frame()
    d = ask(cmd, img)
    execute(d)
```

---

## 12. JSON Schema Reference

### Standard command response

```json
{
  "actions": [
    {"move": "forward|backward|left|right|stop", "duration": 2.0},
    {"move": "right", "duration": 2.0}
  ],
  "arm": "wave|raise_right|raise_left|clap|high_five|hug|heart|shake_hand|face_wave|null",
  "speak": "What Marcus says out loud",
  "abort": null
}
```

### Goal navigation response

```json
{
  "reached": false,
  "next_move": "left",
  "duration": 0.5,
  "speak": "I see boxes but no person yet"
}
```

### Patrol assessment response

```json
{
  "observation": "I see a person working at a desk",
  "alert": null,
  "next_move": "forward",
  "duration": 1.0
}
```

### Field definitions

| Field | Type | Values |
|-------|------|--------|
| `move` | str\|null | "forward", "backward", "left", "right", "stop", null |
| `duration` | float | seconds (max 5.0 per step) |
| `arm` | str\|null | action name or null |
| `speak` | str | one sentence |
| `abort` | str\|null | reason string or null |
| `reached` | bool | true only if goal visually confirmed |

---

## 13. Environment & Paths

### Conda environments

| Env | Python | Location | Purpose |
|-----|--------|----------|---------|
| `marcus` | 3.8 | `/home/unitree/miniconda3/envs/marcus` | Marcus brain + YOLO |
| `hsinference` | 3.10 | `~/.holosoma_deps/miniconda3/envs/hsinference` | Holosoma policy |

**Always use the full path:**

```bash
/home/unitree/miniconda3/envs/marcus/bin/python3 ~/Models_marcus/marcus_llava.py
```

### Key file paths

| File | Path |
|------|------|
| Marcus brain | `~/Models_marcus/marcus_llava.py` |
| YOLO module | `~/Models_marcus/marcus_yolo.py` |
| YOLO model | `~/Models_marcus/Model/yolov8m.pt` |
| Loco model | `~/holosoma/.../models/loco/g1_29dof/fastsac_g1_29dof.onnx` |
| LLaVA weights | `~/.ollama/models/` |
| Arm SDK | `~/unitree_sdk2_python/` |

### Python imports

```python
import ollama            # LLaVA via Ollama
import zmq               # Holosoma communication
import json, time, base64, threading, sys, io
import numpy as np
import pyrealsense2 as rs
from PIL import Image
from marcus_yolo import start_yolo, yolo_sees, yolo_summary               # YOLO
from unitree_sdk2py.g1.arm.g1_arm_action_client import G1ArmActionClient  # Arm
```

---

## 14. Quick Reference Card

```
STARTUP:
  Tab 1 (hsinference env): Holosoma locomotion policy
    python3 run_policy.py inference:g1-29dof-loco \
      --task.velocity-input zmq --task.state-input zmq --task.interface eth0
  Tab 2: ollama serve > /tmp/ollama.log 2>&1 &   sleep 3
  Tab 3 (marcus env): conda activate marcus && cd ~/Marcus && python3 run_marcus.py
    (YOLO + voice + LiDAR all start automatically per subsystems flags)

WAKE WORD: "Sanad"

COMMANDS:
  walk forward · turn right · turn left · move back
  turn right 90 degrees · turn left 3 steps
  what do you see · inspect the office
  wave · raise your right arm · clap · high five
  goal: stop when you see a person
  goal: keep turning left until you see a guy
  patrol
  are you using yolo
  q

VELOCITIES:
  forward  vx=+0.3 m/s    backward vx=-0.2 m/s
  left     vyaw=+0.3      right    vyaw=-0.3

KEY FUNCTIONS:
  send_vel(vx, vy, vyaw)   gradual_stop()         send_cmd(str)
  get_frame() → b64        ask(cmd, img) → dict   execute(dict)
  yolo_sees("person")      yolo_summary()         yolo_closest("person")
  navigate_to_goal(goal)   patrol(minutes)        do_arm("wave")

ARM IDs:
  wave=26  raise_right=23  raise_left=15  clap=17  high_five=18
  hug=19   heart=20        reject=22      shake_hand=27

SAFETY:
  gradual_stop() — always — never cut velocity abruptly
  Never send_cmd("stop") while moving
  camera_alive[0] = False — stops camera thread on exit
  Error 7404 — robot was moving during arm command — stop first
```

---

## 15. Voice API (mic + Gemini Live STT + TtsMaker)

Current pipeline: G1 mic → Gemini Live (`response_modalities=["TEXT"]`) → input_transcription → wake-word gate + fuzzy match → brain → on-robot TtsMaker reply. Sanad-pattern port; the only cloud dependency is the Gemini API key. Replaces all prior local-STT attempts (Whisper / Moonshine / Vosk / wake_detector). The full Sanad-style speech-to-speech mode (Gemini speaks back) was tested and removed — keeping TtsMaker as the single voice owner avoids that whole class of audio collisions.

### Mic + Speaker bundle — `Voice.audio_io.AudioIO`

Sanad-pattern factory. `BuiltinMic` joins the G1's UDP multicast audio (16 kHz mono int16). `BuiltinSpeaker` wraps `AudioClient.PlayStream` with 24→16 kHz resampling (built but idle in STT-only mode; TtsMaker owns the speaker via a separate firmware API).

```python
from Voice.audio_io import AudioIO

audio = AudioIO.from_profile("builtin", audio_client=ac)
audio.start()
try:
    pcm = audio.mic.read_chunk(1024)   # 512 samples, ~32 ms
    audio.mic.flush()
finally:
    audio.stop()
```

Config under `config_Voice.json::{mic_udp, speaker}`.

### Mic shim — `Voice.builtin_mic.BuiltinMic`

Backward-compat shim. Subclasses `audio_io.BuiltinMic` and adds `read_seconds(s)` for `AudioAPI.record()`. Old imports of `from Voice.builtin_mic import BuiltinMic` keep working.

### TTS — `Voice.builtin_tts.BuiltinTTS`

Wrapper around `unitree_sdk2py.g1.audio.g1_audio_client.AudioClient.TtsMaker`. English only — refuses non-ASCII input.

```python
from Voice.builtin_tts import BuiltinTTS

tts = BuiltinTTS(audio_client, default_speaker_id=0)
tts.speak("Hello, I am Sanad", block=True)
```

Used by `AudioAPI.speak(text)` internally; application code should call `audio_api.speak(...)` rather than BuiltinTTS directly.

### Gemini Live STT — `Voice.gemini_script.GeminiBrain`

Direct port of Sanad's `gemini/script.py`, configured with `response_modalities=["TEXT"]` so Gemini transcribes but never speaks. Reconnect-safe: 660 s session timeout, exponential backoff cap 30 s, client recreated after 10 consecutive errors. Runs an asyncio loop inside a worker thread; sync `start()/stop()` wrappers.

```python
import os

from Voice.audio_io import AudioIO
from Voice.turn_recorder import TurnRecorder
from Voice.gemini_script import GeminiBrain

audio = AudioIO.from_profile("builtin", audio_client=ac)
audio.start()
rec = TurnRecorder(enabled=True, out_dir="Data/Voice/Recordings/gemini_turns")

def on_transcript(text):
    print("USER:", text)

def on_command(text, lang):
    print("dispatch:", text)

brain = GeminiBrain(
    audio, rec,
    voice_name="Charon",
    system_prompt="...transcriber-role prompt...",
    api_key=os.environ["MARCUS_GEMINI_API_KEY"],
    on_transcript=on_transcript,
    on_command=on_command,
)
brain.start()
# ... later ...
brain.stop()
audio.stop()
```

Config under `config_Voice.json::stt.gemini_*` — model, voice, VAD sensitivity, session lifecycle, persona, recording.
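
For intuition, a minimal sketch of the gating that `_dispatch_gemini_command` and the voice orchestrator (below) apply to each transcript before anything reaches the brain: check for the wake word, strip it, then fuzzy-match against `command_vocab`. The `difflib` cutoff and the helper name are illustrative assumptions, not the actual implementation.

```python
import difflib
from typing import List, Optional

WAKE_WORDS = ["sanad", "sannad", "sanat", "sunnat"]   # config_Voice.json::stt.wake_words

def gate_transcript(text: str, command_vocab: List[str]) -> Optional[str]:
    """Illustrative: return the cleaned command if the wake word is present, else None."""
    words = text.lower().split()
    if not any(w.strip(",.!?") in WAKE_WORDS for w in words):
        return None                                   # no wake word: ignore transcript
    cleaned = " ".join(w for w in words if w.strip(",.!?") not in WAKE_WORDS)
    # fuzzy match against the known command vocabulary (cutoff is illustrative)
    if difflib.get_close_matches(cleaned, command_vocab, n=1, cutoff=0.6):
        return cleaned
    return None

cmd = gate_transcript("Sanad, turn right", ["turn right", "turn left", "walk forward"])
# → "turn right" is forwarded to the brain; transcripts without the wake word are dropped
```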

### Per-turn recorder — `Voice.turn_recorder.TurnRecorder`

Saves a `_user.wav` per turn plus an `index.json` entry with both transcripts. In STT-only mode, no `_robot.wav` is written (Gemini emits text, not audio).

```python
from Voice.turn_recorder import TurnRecorder

rec = TurnRecorder(enabled=True,
                   out_dir="Data/Voice/Recordings/gemini_turns",
                   user_rate=16000, robot_rate=24000)
rec.capture_user(pcm_bytes)
rec.add_user_text("Sanad, turn right")
rec.add_robot_text("Turning right")   # Gemini's text reply (recorded for review, not spoken)
rec.finish_turn()                     # → 20260425_120000_user.wav + index.json append
```

### Voice orchestrator — `Voice.marcus_voice.VoiceModule`

Drives the full pipeline: builds AudioIO + TurnRecorder + GeminiBrain, gates each transcript on the wake word "Sanad", strips it, fuzzy-matches against `command_vocab`, dedups partial transcripts within `command_cooldown_sec`, then forwards the cleaned text to the user-supplied `on_command` callback.

```python
from API.audio_api import AudioAPI
from Voice.marcus_voice import VoiceModule

def on_command(text, lang):
    print(f"heard: {text}")
    # return or call audio_api.speak(reply); flush_mic() is automatic in marcus_brain

audio = AudioAPI()
voice = VoiceModule(audio, on_command=on_command)
voice.start()
# ... later ...
voice.stop()
```

Vocabulary (`wake_words`, `command_vocab`, `garbage_patterns`) is loaded from `config_Voice.json::stt.*` at `VoiceModule.__init__`. All Gemini tunables (model, VAD, session timeouts, persona) live in the same config — no Python edits required. See `Doc/controlling.md` → "Voice" for the tuning-knobs cheat sheet.

`flush_mic()` is a public hook that `Brain/marcus_brain._on_command` calls before AND after `audio_api.speak(reply)` so TtsMaker output isn't transcribed back into Gemini as a fake user utterance. The brain's `init_voice()` wires `on_command` to `process_command(text)` → `flush_mic()` → `audio_api.speak(reply)` → `flush_mic()`.

### AudioAPI — `API.audio_api.AudioAPI`

Orchestration layer. Owns the `AudioClient`, manages mute/unmute, exposes a clean `speak` + `record` API.

```python
from API.audio_api import AudioAPI

audio = AudioAPI()
audio.speak("Hello")            # English only; non-ASCII returns early
pcm = audio.record(seconds=5)   # int16 mono 16 kHz — uses BuiltinMic
audio.play_pcm(pcm)             # raw PCM playback via Unitree RPC
```

Config: `config_Voice.json::tts.backend = "builtin_ttsmaker"`, `mic.backend = "builtin_udp"` (or `"pactl_parec"` to fall back to Hollyland).

---

*Marcus — YS Lootah Technology | Kassam | April 2026*