"""
marcus_brain.py — Marcus AI Brain Orchestrator
================================================
Shared brain logic for both terminal (run_marcus.py) and server (marcus_server.py).
Usage:
Terminal: python3 run_marcus.py
Server: python3 -m Server.marcus_server (imports init_brain + process_command)
"""
import json
import os
import re
import time
import sys
PROJECT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if PROJECT_DIR not in sys.path:
    sys.path.insert(0, PROJECT_DIR)
from API.zmq_api import init_zmq, send_vel, gradual_stop, send_cmd
from API.camera_api import start_camera, stop_camera, get_frame
from API.yolo_api import (
    init_yolo, yolo_summary, yolo_fps,
    yolo_all_classes, yolo_closest, yolo_sees,
)
from API.odometry_api import init_odometry
from API.memory_api import init_memory, log_cmd, log_detection
from API.llava_api import (
    OLLAMA_MODEL, ask, ask_talk, call_llava, parse_json, add_to_history,
)
from API.imgsearch_api import init_imgsearch, get_searcher
from Core.config_loader import load_config
from Core.logger import log as _log
from Brain.command_parser import try_local_command, init_autonomous
from Brain.executor import execute, execute_action
from Navigation.goal_nav import navigate_to_goal
from Navigation.patrol import patrol
from Autonomous.marcus_autonomous import AutonomousMode
_cfg = load_config("Brain")
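# Illustrative shape of config_Brain.json as consumed by this module (the keys
# are the ones looked up below; the values shown are examples, not the shipped
# defaults):
#
#   {
#     "num_batch": 16,
#     "num_ctx": 1024,
#     "subsystems": {
#       "yolo": true, "lidar": true, "imgsearch": false,
#       "autonomous": true, "voice": true
#     }
#   }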
_TALK_PATTERNS = [
    # Questions
    r"^(?:what|who|where|when|how|why|is|are|do|does|can|tell|describe|explain|show|analyze)\s+",
    # Identity / facts told to the robot
    r"^(?:my name is|i am|call me|that is|that person|note that|remember that)\s+",
    # Acknowledgements
    r"^(?:ok|okay|yes|no|good|nice|great|thanks|thank you|got it|understood|correct)\s*[!.]*$",
]
_NAT_GOAL_RE = re.compile(
    r'^(?:keep\s+(?:turn|rotat|spin)\w*\s+\w+\s+until\s+(?:you\s+)?(?:see|find|spot)\s+.+'
    r'|stop\s+when\s+(?:you\s+)?(?:see|find|spot)\s+.+'
    r'|find\s+(?:a\s+|the\s+|me\s+a\s+)?\w.+'
    r'|look\s+for\s+(?:a\s+|the\s+)?\w.+'
    r'|search\s+for\s+(?:a\s+|the\s+)?\w.+)$',
    re.IGNORECASE
)
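# Commands _NAT_GOAL_RE routes straight to goal navigation (examples derived
# from the pattern above): "keep turning left until you see a chair",
# "stop when you find a person", "find me a bottle", "look for the door",
# "search for a red backpack".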
# ══════════════════════════════════════════════════════════════════════════════
# INIT — called once by both run_marcus.py and marcus_server.py
# ══════════════════════════════════════════════════════════════════════════════
def init_brain():
    """Initialize all subsystems. Call once at startup from the parent process.
    Optional subsystems (lidar / voice / imgsearch / autonomous) are gated on
    `config_Brain.json::subsystems.<name>`. Disabling the ones you don't need
    brings Marcus's boot time down from ~18 s to ~5-7 s.
    """
    subsys = _cfg.get("subsystems", {}) or {}
    # Bind the ZMQ PUB socket before anything tries to publish on it.
    # This is now explicit (previously it happened as an import side effect,
    # which crashed every multiprocessing child that re-imported zmq_api).
    init_zmq()
    raw_frame, raw_lock = start_camera()
    # YOLO is optional on the Jetson: with Qwen2.5-VL loaded, YOLO's ~2 GiB
    # of iGPU memory pushes Ollama into a 30/70 CPU/GPU split and inference
    # crawls. Set subsystems.yolo=false in config_Brain.json to skip it
    # entirely; the VLM can describe the scene directly. Autonomous/patrol
    # features that rely on YOLO degrade gracefully via the yolo_api stubs.
    if subsys.get("yolo", True):
        init_yolo(raw_frame, raw_lock)
    else:
        print(" [YOLO] disabled by config (subsystems.yolo=false) — saves ~2 GiB iGPU for VLM")
    from API.zmq_api import get_socket
    init_odometry(zmq_sock=get_socket())
    init_memory()
    # LiDAR — optional
    if subsys.get("lidar", True):
        try:
            from API.lidar_api import init_lidar
            init_lidar()
        except Exception as e:
            print(f" [LiDAR] Init failed: {e} — continuing without LiDAR")
    else:
        print(" [LiDAR] disabled by config")
    # Image search — optional
    if subsys.get("imgsearch", False):
        init_imgsearch(
            get_frame_fn=get_frame,
            send_vel_fn=send_vel,
            gradual_stop_fn=gradual_stop,
            llava_fn=call_llava,
            yolo_sees_fn=yolo_sees,
            model=OLLAMA_MODEL,
        )
    else:
        print(" [ImgSearch] disabled by config")
    # Autonomous exploration mode — optional
    if subsys.get("autonomous", True):
        from API.memory_api import mem as _mem_ref
        from API.llava_api import PATROL_PROMPT
        auto = AutonomousMode(
            get_frame_fn=get_frame,
            send_vel_fn=send_vel,
            gradual_stop_fn=gradual_stop,
            yolo_sees_fn=yolo_sees,
            yolo_summary_fn=yolo_summary,
            yolo_all_classes_fn=yolo_all_classes,
            yolo_closest_fn=yolo_closest,
            odom_fn=lambda: {"x": 0, "y": 0, "heading": 0},
            call_llava_fn=call_llava,
            patrol_prompt=PATROL_PROMPT,
            mem=_mem_ref,
        )
        from API.odometry_api import odom as _odom_ref, ODOM_AVAILABLE
        if _odom_ref and ODOM_AVAILABLE:
            auto._odom_pos = lambda: {
                "x": _odom_ref._x, "y": _odom_ref._y, "heading": _odom_ref._heading
            }
        init_autonomous(auto)
    else:
        print(" [Autonomous] disabled by config")
    send_cmd("start")
    time.sleep(0.5)
    send_cmd("walk")
    time.sleep(0.5)
    # Voice module — optional
    if subsys.get("voice", True):
        _init_voice()
    else:
        print(" [Voice] disabled by config")
    _log("Brain initialized", "info", "brain")
    # Synchronous warmup — same shape as Marcus_v1's marcus_llava.py. The
    # Python process blocks here for ~60-90 s on the first run so the first
    # real user command doesn't pay the cold-load penalty. One attempt, no
    # retry, no thread. By the time the dashboard prints, Qwen is resident
    # in the iGPU.
    from API.llava_api import VLM_ENABLED, OLLAMA_HOST, _client as _llava_client
    if not VLM_ENABLED:
        print(" [VLM] disabled by config — safe mode (no Ollama load)")
    else:
        host_short = OLLAMA_HOST.replace("http://", "")
        print(f" [VLM] target: {host_short} ({OLLAMA_MODEL})")
        print(" [VLM] Warming up... (loading into iGPU — may take 60-90 s on cold start)")
        try:
            _llava_client.chat(
                model=OLLAMA_MODEL,
                messages=[{"role": "user", "content": "hi"}],
                options={"temperature": 0.0, "num_predict": 5,
                         "num_batch": _cfg.get("num_batch", 16),
                         "num_ctx": _cfg.get("num_ctx", 1024)},
            )
            print(" [VLM] warm — first command will be fast")
        except Exception as _e:
            print(f" [VLM] warmup failed ({_e}) — first command may cold-load")
# Global voice references
_audio_api = None
_voice_module = None
def _init_voice():
    """
    Initialize the voice subsystem: G1 built-in mic + Gemini Live S2S
    (subprocess in the gemini_sdk env, runs Voice/gemini_runner.py). Every
    transcribed user command flows through process_command(); the brain
    decides motion side-effects, but the verbal reply itself comes from
    Gemini's voice through the G1 speaker (NOT TtsMaker — TtsMaker stays
    for non-voice subsystem callers but isn't invoked from this callback).
    """
    global _audio_api, _voice_module
    try:
        from API.audio_api import AudioAPI
        from Voice.marcus_voice import VoiceModule
        _audio_api = AudioAPI()

        # Heuristic filter for unusable Gemini transcripts. Gemini emits
        # `<noise>` literally when audio is non-speech and `.` for empty
        # bursts. These shouldn't pollute the terminal or trigger motion.
        # Bilingual aware: short Arabic motion commands like "اجلس" /
        # "قف" / "تعال" are LEGIT — only reject short non-ASCII-non-Arabic
        # snippets (Korean / Thai / etc — echo/distortion garbage).
        def _is_supported_lang_char(c: str) -> bool:
            # ASCII letters/digits/punct OR Arabic block (U+0600..U+06FF)
            # OR whitespace.
            return c.isascii() or "؀" <= c <= "ۿ" or c.isspace()

        def _is_garbage_transcript(t: str) -> bool:
            stripped = t.strip().strip(".!?,").strip()
            if not stripped:
                return True
            low = stripped.lower()
            if low in ("<noise>", "noise", "yeah", "ok", "okay", "uh", "um", "hmm", "mm"):
                return True
            # Bare single character or all-punctuation transcript (often just ".").
            if all(not c.isalnum() for c in stripped):
                return True
            # Reject only when text is BOTH short AND uses chars from
            # neither English nor Arabic — those are echo/distortion
            # mistranscriptions (Korean/Thai/etc fragments).
            if len(stripped) <= 6:
                if not all(_is_supported_lang_char(c) for c in stripped):
                    return True
            return False
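        # Examples, per the rules above: "<noise>", ".", "um" are rejected;
        # a short Korean/Thai fragment is rejected; "اجلس" (a short Arabic
        # motion command) is kept and dispatched to the brain.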
        def _on_command(text, lang):
            text = (text or "").strip()
            if not text:
                return
            if _is_garbage_transcript(text):
                # Skip silently — neither show nor dispatch to brain.
                return
            print(f' [Sanad] heard: "{text}"')
            try:
                result = process_command(text)
            except Exception as e:
                print(f" [Brain] Error processing voice command: {e}")
                return
            if isinstance(result, dict):
                sp = (result.get("speak") or "").strip()
                act = (result.get("action") or "").strip()
                # In the Gemini S2S architecture Gemini owns the voice; we
                # do NOT call audio_api.speak(sp) here (it would collide with
                # Gemini's own audio reply). Just show the operator what
                # the brain decided so they can correlate motion with
                # Gemini's spoken acknowledgement.
                if act and act not in ("NONE", "TALK"):
                    print(f" [Sanad] doing: {act}" + (f" ({sp[:80]})" if sp else ""))
                elif sp:
                    print(f" [Brain] {sp[:120]}")
            print("Command: ", end="", flush=True)

        _voice_module = VoiceModule(_audio_api, on_command=_on_command)
        _voice_module.start()
        print(" [Voice] listening in background — say \"Sanad\" + your command")
    except Exception as e:
        print(f" [Voice] Init failed: {e} — continuing without voice")
        _audio_api = None
        _voice_module = None
# ══════════════════════════════════════════════════════════════════════════════
# PROCESS COMMAND — shared by terminal loop and WebSocket server
# ══════════════════════════════════════════════════════════════════════════════
def process_command(cmd: str) -> dict:
    """
    Process a single command through the full brain pipeline.
    Returns: {"type": str, "speak": str, "action": str, "elapsed": float}
    Used by both run_marcus.py (terminal) and marcus_server.py (WebSocket).
    """
    cmd = cmd.strip()
    if not cmd:
        return {"type": "empty", "speak": "", "action": "NONE", "elapsed": 0}
    t0 = time.time()
    # ── YOLO status ──────────────────────────────────────────────────────
    if any(w in cmd.lower() for w in ("yolo", "what does yolo", "vision", "using yolo")):
        from API.yolo_api import YOLO_AVAILABLE as _ya
        status = "active" if _ya else "not loaded"
        speak = f"YOLO: {status} | {yolo_summary()} | {yolo_fps():.1f}fps"
        print(f" {speak}")
        log_cmd(cmd, speak)
        return {"type": "status", "speak": speak, "action": "YOLO", "elapsed": 0}
    # ── Image search ─────────────────────────────────────────────────────
    if cmd.lower().startswith("search/"):
        speak = _handle_search(cmd)
        return {"type": "search", "speak": speak, "action": "SEARCH", "elapsed": time.time() - t0}
    # ── Auto-detect natural language goals ───────────────────────────────
    if _NAT_GOAL_RE.match(cmd) and not cmd.lower().startswith("goal/"):
        print(f" [Goal] Auto-detected: '{cmd}'")
        navigate_to_goal(cmd.strip())
        elapsed = time.time() - t0
        log_cmd(cmd, f"Goal navigation: {cmd}", elapsed)
        return {"type": "goal", "speak": f"Goal navigation: {cmd}", "action": "GOAL", "elapsed": elapsed}
    # ── Explicit goal/ ───────────────────────────────────────────────────
    if cmd.lower().startswith("goal/"):
        goal = cmd[5:].strip()
        if goal:
            navigate_to_goal(goal)
            elapsed = time.time() - t0
            log_cmd(cmd, f"Goal navigation: {goal}", elapsed)
            return {"type": "goal", "speak": f"Goal navigation: {goal}", "action": "GOAL", "elapsed": elapsed}
        return {"type": "error", "speak": "Usage: goal/ stop when you see a person", "action": "NONE", "elapsed": 0}
    # ── Autonomous patrol ────────────────────────────────────────────────
    if cmd.lower().startswith("patrol"):
        mins = 5.0
        if " " in cmd:
            try:
                mins = float(cmd.split()[-1])
            except ValueError:
                pass
        patrol(duration_minutes=mins)
        elapsed = time.time() - t0
        log_cmd(cmd, f"Patrol {mins}min", elapsed)
        return {"type": "patrol", "speak": f"Patrol {mins}min complete", "action": "PATROL", "elapsed": elapsed}
    # ── Local commands (place / odom / memory / help) ────────────────────
    if try_local_command(cmd):
        log_cmd(cmd, "local command")
        return {"type": "local", "speak": "Done", "action": "LOCAL", "elapsed": time.time() - t0}
    # ── Talk-only (questions / acknowledgements) ─────────────────────────
    if any(re.match(p, cmd, re.IGNORECASE) for p in _TALK_PATTERNS):
        speak = _handle_talk(cmd)
        return {"type": "talk", "speak": speak, "action": "TALK", "elapsed": time.time() - t0}
    # ── Greeting ─────────────────────────────────────────────────────────
    if re.match(r"^(?:hi+|hey+|hello+|sup|yo+|greetings|good (?:morning|afternoon|evening))\s*[!.]*$",
                cmd, re.IGNORECASE):
        response = "Hello! I am Sanad. How can I help you?"
        print(f"Sanad: {response}")
        add_to_history(cmd, response)
        log_cmd(cmd, response)
        return {"type": "greeting", "speak": response, "action": "GREETING", "elapsed": 0}
    # ── "Come to me" — smart YOLO-tracked approach ──────────────────────
    # Old behaviour: walk forward 2s blindly. New: if YOLO is loaded,
    # locate the person in the camera frame, turn toward them if off-
    # centre, walk forward in short bursts, stop when 'arrived' (person
    # fills enough of the frame to be 'right in front'). Bounded by
    # COME_TO_ME_MAX_SEC; falls back to a 2s blind walk if YOLO isn't
    # available so the command still does *something*.
    #
    # Arrival threshold: yolo_person_too_close(threshold=...) returns
    # True if the person's bbox size_ratio (bbox_area / frame_area)
    # exceeds the threshold. Defaults are calibrated for safety
    # (0.25 = ~1m away, used by patrol). For come_here we want the
    # robot to come MUCH closer — arm's length, ~0.5m — which is
    # roughly size_ratio > 0.32 (lookup in marcus_yolo.distance_estimate:
    # 'very close' starts at 0.30). 0.32 is a safe 'arrived' marker;
    # any higher and the robot might bump into the user.
    if re.match(r"^(?:come(?:\s+back)?(?:\s+to\s+me)?|come\s+here|get\s+closer|approach|move\s+closer)\s*[!.]*$", cmd, re.IGNORECASE):
        from Brain.executor import move_step
        try:
            from API.yolo_api import yolo_sees, yolo_person_too_close, yolo_closest
            from API.yolo_api import _stub_sees as _stub_sees_ref  # type: ignore
            yolo_active = (yolo_sees is not _stub_sees_ref)
        except Exception:
            yolo_active = False
        COME_TO_ME_MAX_SEC = 25.0
        COME_TO_ME_STEP_SEC = 0.6
        COME_TO_ME_TURN_SEC = 0.3  # small correction (~14° at default vyaw)
        COME_TO_ME_POLL_SEC = 0.3
        # Arrival threshold: 0.32 size_ratio ≈ ~0.5m / arm's length.
        # Patrol's 0.25 default is too lenient for come-to-me — it
        # would treat a 1m-away user as 'already arrived'.
        COME_TO_ME_ARRIVED_RATIO = 0.32
        if not yolo_active:
            # Fallback — same behaviour as before for hardware setups
            # without YOLO loaded.
            print(" [ComeToMe] YOLO not loaded — walking forward 2s")
            execute_action("forward", 2.0)
            resp = "Coming to you"
            print(f"Sanad: {resp}")
            add_to_history(cmd, resp)
            log_cmd(cmd, resp)
            return {"type": "move", "speak": resp, "action": "FORWARD 2.0s", "elapsed": 2.0}
        print(f" [ComeToMe] tracking person up to {COME_TO_ME_MAX_SEC:.0f}s — say 'stop' to end")
        deadline = time.time() + COME_TO_ME_MAX_SEC
        last_log = 0.0
        scan_attempts_left = 6  # how many turns before giving up if person never seen
        while time.time() < deadline:
            try:
                from Core.motion_state import motion_abort
                if motion_abort.is_set():
                    print(" [ComeToMe] motion_abort — stopping")
                    break
            except Exception:
                pass
            try:
                seen = bool(yolo_sees("person"))
            except Exception:
                seen = False
            try:
                # IMPORTANT: pass an explicit 'arrived' threshold (0.32)
                # — without it, the default 0.25 (safety patrol's value)
                # would call the user 'too close' when they're a metre
                # away, and the robot would stop without walking. We
                # want it to come arm's length close.
                arrived = bool(yolo_person_too_close(
                    threshold=COME_TO_ME_ARRIVED_RATIO,
                ))
            except Exception:
                arrived = False
            now = time.time()
            if now - last_log > 1.5:
                state = ("arrived (close to person)" if (seen and arrived)
                         else "tracking person" if seen
                         else "scanning for person")
                print(f" [ComeToMe] {state}")
                last_log = now
            if seen and arrived:
                # Made it — robot is close enough to stop in front
                print(" [ComeToMe] arrived in front of person")
                break
            if seen and not arrived:
                # Person is in view but not yet arm's length close —
                # centre on them and walk forward in a burst.
                bearing = "center"
                try:
                    det = yolo_closest("person")
                    if det and getattr(det, "position", None):
                        # position is something like 'left' / 'center' / 'right'
                        bearing = str(det.position).lower()
                except Exception:
                    pass
                if bearing == "left":
                    execute_action("left", COME_TO_ME_TURN_SEC)
                elif bearing == "right":
                    execute_action("right", COME_TO_ME_TURN_SEC)
                # forward burst
                move_step("forward", COME_TO_ME_STEP_SEC)
                scan_attempts_left = 6  # reset
                continue
            # No person — try a small scan turn, then check again
            if scan_attempts_left <= 0:
                print(" [ComeToMe] no person found after scans — giving up")
                break
            scan_attempts_left -= 1
            execute_action("left", COME_TO_ME_TURN_SEC)
            time.sleep(COME_TO_ME_POLL_SEC)
        try:
            from API.zmq_api import gradual_stop as _gs
            _gs()
        except Exception:
            pass
        elapsed = time.time() - t0
        resp = "Coming to you"
        print(f"Sanad: {resp}")
        add_to_history(cmd, resp)
        log_cmd(cmd, resp)
        return {"type": "move", "speak": resp, "action": "COME_TO_ME", "elapsed": elapsed}
    # ── Multi-step compound ──────────────────────────────────────────────
    _multi = re.match(
        r"turn\s+(right|left)\s*(\d+)?\s*(?:deg(?:rees?)?)?\s+(?:and\s+then|then|and)?\s+"
        r"(?:move\s+|go\s+|walk\s+|step\s+)?(back(?:ward)?|forward)\s*(\d+)?\s*(?:steps?|meter)?",
        cmd, re.IGNORECASE)
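    # Phrases the pattern above is intended to catch (examples derived from the
    # regex; missing numbers fall back to 90 degrees / 2 seconds below):
    #   "turn right 90 degrees then move forward 2"
    #   "turn left and then walk backward"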
    if _multi:
        turn_dir = _multi.group(1).lower()
        turn_deg = float(_multi.group(2) or 90)
        walk_dir = "backward" if "back" in _multi.group(3).lower() else "forward"
        walk_dur = float(_multi.group(4) or 2)
        execute_action("right" if turn_dir == "right" else "left", turn_deg / 18.0)
        execute_action(walk_dir, walk_dur)
        resp = f"Turned {turn_dir} {int(turn_deg)} degrees then moved {walk_dir}"
        print(f"Sanad: {resp}")
        add_to_history(cmd, resp)
        log_cmd(cmd, resp)
        return {"type": "move", "speak": resp, "action": f"MULTI {turn_dir}+{walk_dir}", "elapsed": time.time() - t0}
    # ── Standard LLaVA command ───────────────────────────────────────────
    return _handle_llava(cmd)
# ══════════════════════════════════════════════════════════════════════════════
# HANDLERS (_handle_search / _handle_talk return speak text; _handle_llava returns the full result dict)
# ══════════════════════════════════════════════════════════════════════════════
def _handle_search(cmd):
    """Handle the `search/ <path or hint>` command via the image-search subsystem."""
    args = cmd[7:].strip()
    if not args:
        print(" Usage: search/ /path/to/photo.jpg [hint]")
        return "Usage: search/ <path or hint>"
    searcher = get_searcher()
    if not searcher:
        print(" [Search] Image search not available")
        return "Image search not available"
    parts = args.split(None, 1)
    if parts and os.path.exists(parts[0]):
        img_path = parts[0]
        hint = parts[1].strip() if len(parts) > 1 else ""
        yolo_pre = "person" if not hint or "person" in hint.lower() else None
        log_cmd(cmd, f"Image search: {img_path}")
        result = searcher.search_from_file(img_path, hint=hint, yolo_prefilter=yolo_pre)
        return result.get("description", "Search complete")
    hint = args
    yolo_pre = "person" if any(w in hint.lower() for w in ("person", "guy", "man", "woman")) else None
    log_cmd(cmd, f"Image search: {hint}")
    result = searcher.search(ref_img_b64=None, hint=hint, yolo_prefilter=yolo_pre)
    return result.get("description", "Search complete")
def _handle_talk(cmd):
    """Answer a talk-only command (question / acknowledgement), using the current
    camera frame and any remembered facts as context."""
    print("Thinking...")
    try:
        img = get_frame()
        facts_str = ""
        try:
            from API.llava_api import _facts
            if _facts:
                facts_str = "\nKnown facts: " + "; ".join(_facts) + "."
        except ImportError:
            pass
        d = ask_talk(cmd, img, facts=facts_str)
        sp = d.get("speak", "")
        print(f"Sanad: {sp}")
        log_cmd(cmd, sp)
        return sp
    except Exception as ex:
        print(f" Error: {ex}")
        return f"Error: {ex}"
def _handle_llava(cmd):
    """Run a full VLM decision cycle on `cmd` and execute the resulting actions.

    Unlike the other handlers, this returns the full result dict, not just speak text.
    """
    print("Thinking...")
    t0 = time.time()
    img = get_frame()
    # Poll up to 500 ms in 50 ms slices instead of blocking a full second.
    # Returns the moment a frame is available — most drops recover in <100 ms.
    if img is None:
        print(" Waiting for camera...")
        for _ in range(10):
            time.sleep(0.05)
            img = get_frame()
            if img is not None:
                break
    if img is None:
        print(" Camera not ready — command cancelled")
        log_cmd(cmd, "camera not ready")
        return {"type": "error", "speak": "Camera not ready", "action": "NONE", "elapsed": 0}
    d = ask(cmd, img)
    dur = time.time() - t0
    print(f" ({dur:.1f}s) -> {json.dumps(d)}")
    resp = execute(d)
    log_cmd(cmd, resp or "", dur)
    from API.yolo_api import YOLO_AVAILABLE as _ya
    if _ya:
        for cls in yolo_all_classes():
            det = yolo_closest(cls)
            if det:
                log_detection(cls, det.position, det.distance_estimate)
    action_str = d.get("actions", [{}])[0].get("move", "NONE") if d.get("actions") else "NONE"
    return {"type": "decision", "speak": resp or "", "action": action_str.upper(),
            "elapsed": dur, "raw": d}
# ══════════════════════════════════════════════════════════════════════════════
# HELPERS
# ══════════════════════════════════════════════════════════════════════════════
def _strip_ansi(s: str) -> str:
    """Strip ANSI colour escapes for correct visual width calculations."""
    return re.sub(r"\x1b\[[0-9;]*m", "", s)
def get_brain_status() -> dict:
    """Return current brain status for server status message."""
    from API.yolo_api import YOLO_AVAILABLE as _ya
    from API.odometry_api import ODOM_AVAILABLE as _oa
    from API.memory_api import MEMORY_AVAILABLE as _ma
    from API.camera_api import CAM_WIDTH, CAM_HEIGHT, CAM_FPS
    try:
        from API.lidar_api import LIDAR_AVAILABLE as _la, get_loc_state
        lidar_state = get_loc_state() if _la else "off"
    except ImportError:
        _la = False
        lidar_state = "off"
    return {
        "model": OLLAMA_MODEL,
        "yolo": _ya,
        "odometry": _oa,
        "memory": _ma,
        "lidar": _la,
        "lidar_state": lidar_state,
        "voice": _voice_module is not None and _voice_module.is_running,
        "camera": f"{CAM_WIDTH}x{CAM_HEIGHT}@{CAM_FPS}",
    }
def shutdown():
    """Clean shutdown of all subsystems."""
    print("\nShutting down Marcus...")
    # Stop voice module
    if _voice_module and _voice_module.is_running:
        _voice_module.stop()
    # Stop autonomous mode if running
    from Brain.command_parser import _auto
    if _auto and _auto.is_enabled():
        _auto.disable()
    stop_camera()
    gradual_stop()
    send_cmd("stop")
    from API.odometry_api import odom as _o
    if _o:
        _o.stop()
    from API.memory_api import mem as _m
    if _m:
        _m.end_session()
    try:
        from API.lidar_api import stop_lidar
        stop_lidar()
    except Exception:
        pass
    _log("Marcus stopped", "info", "brain")
    print("Marcus stopped.")
# ══════════════════════════════════════════════════════════════════════════════
# TERMINAL MODE — used by run_marcus.py
# ══════════════════════════════════════════════════════════════════════════════
def run_terminal():
    """Run brain with terminal input loop."""
    init_brain()
    # ─── DASHBOARD ───────────────────────────────────────────────────────
    # Separate the boot log from the interactive dashboard with a clear
    # visual break. Print at the end of init so it's always the last thing
    # on screen before the first `Command:` prompt — even if the operator
    # scrolled through a wall of subsystem init messages.
    status = get_brain_status()

    def _fmt(v):
        if v is True:  return "\033[92mON \033[0m"   # green
        if v is False: return "\033[91mOFF\033[0m"   # red
        return str(v)

    W = 58
    LEFT_W = 28
    RIGHT_W = (W - 2) - LEFT_W  # visible chars available in the right column

    def _pad(s: str, width: int) -> str:
        """ljust by visible width, treating ANSI colour escapes as zero-width."""
        visible = len(_strip_ansi(s))
        return s + " " * max(0, width - visible)

    print("\n\n" + "╔" + "═" * (W - 2) + "╗")
    print("║" + _pad(" SANAD — AI BRAIN READY", W - 2) + "║")
    print("╠" + "═" * (W - 2) + "╣")
    from API.llava_api import VLM_ENABLED
    left = [("model", status["model"]),
            ("vlm", _fmt(VLM_ENABLED)),
            ("voice", _fmt(status["voice"])),
            ("camera", status["camera"])]
    right = [("yolo", _fmt(status["yolo"])),
             ("lidar", _fmt(status["lidar"])),
             ("memory", _fmt(status["memory"])),
             ("odometry", _fmt(status["odometry"]))]
    for i in range(max(len(left), len(right))):
        l = f" {left[i][0]:<8}: {left[i][1]}" if i < len(left) else ""
        r = f" {right[i][0]:<8}: {right[i][1]}" if i < len(right) else ""
        print("║" + _pad(l, LEFT_W) + _pad(r, RIGHT_W) + "║")
    print("╠" + "═" * (W - 2) + "╣")
    print("║" + _pad(" Type a command, or say \"Sanad, <command>\".", W - 2) + "║")
    print("║" + _pad(" help · example · yolo · test_tts · auto on/off · q", W - 2) + "║")
    print("╚" + "═" * (W - 2) + "╝\n")
    try:
        while True:
            try:
                cmd = input("Command: ").strip()
            except (EOFError, KeyboardInterrupt):
                break
            if not cmd:
                continue
            if cmd.lower() in ("q", "quit", "exit"):
                break
            if cmd.lower() in ("mute/", "unmute/"):
                # Route through the audio API so the action respects whichever
                # mic backend is active (BuiltinMic flushes the UDP buffer;
                # the legacy pactl path mutes PulseAudio source 3).
                if _audio_api is None:
                    print(" Voice is not initialized")
                    continue
                if cmd.lower() == "mute/":
                    _audio_api._mute_mic()
                    print(" Mic muted")
                else:
                    _audio_api._unmute_mic()
                    print(" Mic unmuted")
                continue
            if cmd.lower().startswith("test_tts"):
                # Probe speaker IDs to find which one speaks English on this
                # firmware. Usage: `test_tts` (runs 0, 1, 2) or `test_tts 1`.
                if _audio_api is None or _audio_api._tts_engine is None:
                    print(" Voice is not initialized")
                    continue
                parts = cmd.split()
                ids = [int(x) for x in parts[1:]] if len(parts) > 1 else [0, 1, 2]
                phrase = "Hello, I am Sanad."
                for sid in ids:
                    print(f" → speaker_id = {sid}")
                    _audio_api._tts_engine.speak(phrase, speaker_id=sid, block=True)
                    time.sleep(0.3)
                print(' Pick the ID that sounded English and set it in')
                print(' Config/config_Voice.json :: tts.builtin_speaker_id')
                continue
            # Structured motion log for terminal-typed commands. The
            # voice path already logs through Voice/marcus_voice's
            # worker; here we cover the OTHER entry-point so every
            # motion (text or voice) shows up in logs/motion.log with
            # parsed direction/magnitude/unit + actual elapsed.
            try:
                from Core.motion_log import log_motion as _mlog
                _mlog("start", cmd, source="text")
                _t0 = time.time()
            except Exception:
                _t0 = time.time()
            result = process_command(cmd)
            try:
                _mlog("complete", cmd, source="text",
                      actual_sec=time.time() - _t0)
            except Exception:
                pass
            sp = result.get("speak", "") if isinstance(result, dict) else ""
            if sp and _audio_api:
                _audio_api.speak(sp)
    except KeyboardInterrupt:
        pass
    shutdown()
if __name__ == "__main__":
    run_terminal()