"""
|
|
marcus_brain.py — Marcus AI Brain Orchestrator
|
|
================================================
|
|
Shared brain logic for both terminal (run_marcus.py) and server (marcus_server.py).
|
|
|
|
Usage:
|
|
Terminal: python3 run_marcus.py
|
|
Server: python3 -m Server.marcus_server (imports init_brain + process_command)
|
|
"""
import json
import os
import re
import sys
import time

PROJECT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if PROJECT_DIR not in sys.path:
    sys.path.insert(0, PROJECT_DIR)

from API.zmq_api import init_zmq, send_vel, gradual_stop, send_cmd
from API.camera_api import start_camera, stop_camera, get_frame
from API.yolo_api import (
    init_yolo, yolo_summary, yolo_fps,
    yolo_all_classes, yolo_closest, yolo_sees,
)
from API.odometry_api import init_odometry
from API.memory_api import init_memory, log_cmd, log_detection
from API.llava_api import (
    OLLAMA_MODEL, ask, ask_talk, call_llava, parse_json, add_to_history,
)
from API.imgsearch_api import init_imgsearch, get_searcher
from Core.config_loader import load_config
from Core.logger import log as _log

from Brain.command_parser import try_local_command, init_autonomous
from Brain.executor import execute, execute_action
from Navigation.goal_nav import navigate_to_goal
from Navigation.patrol import patrol
from Autonomous.marcus_autonomous import AutonomousMode

_cfg = load_config("Brain")
_TALK_PATTERNS = [
    # Questions
    r"^(?:what|who|where|when|how|why|is|are|do|does|can|tell|describe|explain|show|analyze)\s+",
    # Identity / facts told to the robot
    r"^(?:my name is|i am|call me|that is|that person|note that|remember that)\s+",
    # Acknowledgements
    r"^(?:ok|okay|yes|no|good|nice|great|thanks|thank you|got it|understood|correct)\s*[!.]*$",
]
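# Illustrative matches, one per group above: "what is that",
# "my name is Alex", "thanks!".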
_NAT_GOAL_RE = re.compile(
    r'^(?:keep\s+(?:turn|rotat|spin)\w*\s+\w+\s+until\s+(?:you\s+)?(?:see|find|spot)\s+.+'
    r'|stop\s+when\s+(?:you\s+)?(?:see|find|spot)\s+.+'
    r'|find\s+(?:a\s+|the\s+|me\s+a\s+)?\w.+'
    r'|look\s+for\s+(?:a\s+|the\s+)?\w.+'
    r'|search\s+for\s+(?:a\s+|the\s+)?\w.+)$',
    re.IGNORECASE
)
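# Phrases this pattern routes to goal navigation (illustrative):
#   "keep turning right until you see a person"
#   "stop when you see the door"
#   "find a chair" / "look for the exit" / "search for my bag"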
# ══════════════════════════════════════════════════════════════════════════════
# INIT — called once by both run_marcus.py and marcus_server.py
# ══════════════════════════════════════════════════════════════════════════════

def init_brain():
    """Initialize all subsystems. Call once at startup from the parent process.

    Optional subsystems (lidar / voice / imgsearch / autonomous) are gated on
    `config_Brain.json::subsystems.<name>`. Disabling the ones you don't need
    brings Marcus's boot time down from ~18 s to ~5-7 s.
    """
    subsys = _cfg.get("subsystems", {}) or {}
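    # Illustrative shape of the gating block in config_Brain.json — key names
    # taken from the lookups below; the values shown are the in-code defaults:
    #   "subsystems": {"lidar": true, "imgsearch": false,
    #                  "autonomous": true, "voice": true}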
    # Bind the ZMQ PUB socket before anything tries to publish on it.
    # This is now explicit (previously it happened as an import side effect,
    # which crashed every multiprocessing child that re-imported zmq_api).
    init_zmq()

    raw_frame, raw_lock = start_camera()
    init_yolo(raw_frame, raw_lock)

    from API.zmq_api import get_socket
    init_odometry(zmq_sock=get_socket())

    init_memory()
    # LiDAR — optional
    if subsys.get("lidar", True):
        try:
            from API.lidar_api import init_lidar
            init_lidar()
        except Exception as e:
            print(f" [LiDAR] Init failed: {e} — continuing without LiDAR")
    else:
        print(" [LiDAR] disabled by config")

    # Image search — optional
    if subsys.get("imgsearch", False):
        init_imgsearch(
            get_frame_fn=get_frame,
            send_vel_fn=send_vel,
            gradual_stop_fn=gradual_stop,
            llava_fn=call_llava,
            yolo_sees_fn=yolo_sees,
            model=OLLAMA_MODEL,
        )
    else:
        print(" [ImgSearch] disabled by config")
    # Autonomous exploration mode — optional
    if subsys.get("autonomous", True):
        from API.memory_api import mem as _mem_ref
        from API.llava_api import PATROL_PROMPT
        auto = AutonomousMode(
            get_frame_fn=get_frame,
            send_vel_fn=send_vel,
            gradual_stop_fn=gradual_stop,
            yolo_sees_fn=yolo_sees,
            yolo_summary_fn=yolo_summary,
            yolo_all_classes_fn=yolo_all_classes,
            yolo_closest_fn=yolo_closest,
            odom_fn=lambda: {"x": 0, "y": 0, "heading": 0},
            call_llava_fn=call_llava,
            patrol_prompt=PATROL_PROMPT,
            mem=_mem_ref,
        )
        # Give autonomous mode live pose data when odometry is up
        # (the constructor default above is a zeroed placeholder).
        from API.odometry_api import odom as _odom_ref, ODOM_AVAILABLE
        if _odom_ref and ODOM_AVAILABLE:
            auto._odom_pos = lambda: {
                "x": _odom_ref._x, "y": _odom_ref._y, "heading": _odom_ref._heading
            }
        init_autonomous(auto)
    else:
        print(" [Autonomous] disabled by config")
send_cmd("start")
|
|
time.sleep(0.5)
|
|
send_cmd("walk")
|
|
time.sleep(0.5)
|
|
|
|
# Voice module — optional
|
|
if subsys.get("voice", True):
|
|
_init_voice()
|
|
else:
|
|
print(" [Voice] disabled by config")
|
|
|
|
_log("Brain initialized", "info", "brain")
|
|
|
|
# Report VLM config only — no warmup thread. This matches Marcus_v1's
|
|
# concept: the first real VLM command performs the cold-load synchronously
|
|
# inside ollama.chat(), which takes ~60-90 s once on the Jetson and is
|
|
# fast for every subsequent call. A background warmup thread races with
|
|
# YOLO/camera/audio/Holosoma startup and with user input, and on a
|
|
# 16 GB unified-memory board that race is what triggers the OOM killer.
|
|
from API.llava_api import VLM_ENABLED, OLLAMA_HOST
|
|
if not VLM_ENABLED:
|
|
print(" [VLM] disabled by config — safe mode (no Ollama load)")
|
|
else:
|
|
host_short = OLLAMA_HOST.replace("http://", "")
|
|
print(f" [VLM] target: {host_short} ({OLLAMA_MODEL}) "
|
|
f"— first vision command will cold-load (~60-90 s)")
|
|
|
|
|
|
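# Minimal embedding sketch — mirrors what the module docstring says
# marcus_server.py does; the WebSocket wiring itself is assumed here:
#
#   from Brain.marcus_brain import init_brain, process_command
#   init_brain()
#   result = process_command("find a chair")
#   print(result["speak"], result["action"], f"{result['elapsed']:.1f}s")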
# Global voice references
_audio_api = None
_voice_module = None
def _init_voice():
    """
    Initialize the voice subsystem: G1 built-in mic + Whisper STT + G1
    built-in TtsMaker for replies. Every transcribed command flows through
    process_command(), and the resulting `speak` string is sent to the G1
    speaker.
    """
    global _audio_api, _voice_module
    try:
        from API.audio_api import AudioAPI
        from Voice.marcus_voice import VoiceModule

        _audio_api = AudioAPI()

        def _on_command(text, lang):
            text = (text or "").strip()
            if not text:
                return
            # One clean, distinctive line so the operator can see exactly
            # what Whisper transcribed before the brain reacts. Everything
            # else from the voice subsystem is file-only.
            print(f' [Sanad] heard: "{text}"')
            try:
                result = process_command(text)
            except Exception as e:
                print(f" [Brain] Error processing voice command: {e}")
                return
            if isinstance(result, dict):
                sp = (result.get("speak") or "").strip()
                if sp and _audio_api:
                    _audio_api.speak(sp)
            # Redraw the Command: prompt that our print clobbered
            print("Command: ", end="", flush=True)

        _voice_module = VoiceModule(_audio_api, on_command=_on_command)
        _voice_module.start()
        print(" [Voice] listening in background — say \"Sanad\" + your command")
    except Exception as e:
        print(f" [Voice] Init failed: {e} — continuing without voice")
        _audio_api = None
        _voice_module = None
# ══════════════════════════════════════════════════════════════════════════════
# PROCESS COMMAND — shared by terminal loop and WebSocket server
# ══════════════════════════════════════════════════════════════════════════════

def process_command(cmd: str) -> dict:
    """
    Process a single command through the full brain pipeline.
    Returns: {"type": str, "speak": str, "action": str, "elapsed": float}

    Used by both run_marcus.py (terminal) and marcus_server.py (WebSocket).
    """
    cmd = cmd.strip()
    if not cmd:
        return {"type": "empty", "speak": "", "action": "NONE", "elapsed": 0}

    t0 = time.time()
    # ── YOLO status ──────────────────────────────────────────────────────
    if any(w in cmd.lower() for w in ("yolo", "what does yolo", "vision", "using yolo")):
        from API.yolo_api import YOLO_AVAILABLE as _ya
        status = "active" if _ya else "not loaded"
        speak = f"YOLO: {status} | {yolo_summary()} | {yolo_fps():.1f}fps"
        print(f" {speak}")
        log_cmd(cmd, speak)
        return {"type": "status", "speak": speak, "action": "YOLO", "elapsed": 0}
    # ── Image search ─────────────────────────────────────────────────────
    if cmd.lower().startswith("search/"):
        speak = _handle_search(cmd)
        return {"type": "search", "speak": speak, "action": "SEARCH", "elapsed": time.time() - t0}
    # ── Auto-detect natural-language goals ───────────────────────────────
    if _NAT_GOAL_RE.match(cmd) and not cmd.lower().startswith("goal/"):
        print(f" [Goal] Auto-detected: '{cmd}'")
        navigate_to_goal(cmd)
        elapsed = time.time() - t0
        log_cmd(cmd, f"Goal navigation: {cmd}", elapsed)
        return {"type": "goal", "speak": f"Goal navigation: {cmd}", "action": "GOAL", "elapsed": elapsed}
    # ── Explicit goal/ ───────────────────────────────────────────────────
    if cmd.lower().startswith("goal/"):
        goal = cmd[5:].strip()  # drop the "goal/" prefix
        if goal:
            navigate_to_goal(goal)
            elapsed = time.time() - t0
            log_cmd(cmd, f"Goal navigation: {goal}", elapsed)
            return {"type": "goal", "speak": f"Goal navigation: {goal}", "action": "GOAL", "elapsed": elapsed}
        return {"type": "error", "speak": "Usage: goal/ stop when you see a person", "action": "NONE", "elapsed": 0}
    # ── Autonomous patrol ────────────────────────────────────────────────
    if cmd.lower().startswith("patrol"):
        mins = 5.0  # default duration; "patrol 10" patrols for 10 minutes
        if " " in cmd:
            try:
                mins = float(cmd.split()[-1])
            except ValueError:
                pass
        patrol(duration_minutes=mins)
        elapsed = time.time() - t0
        log_cmd(cmd, f"Patrol {mins}min", elapsed)
        return {"type": "patrol", "speak": f"Patrol {mins}min complete", "action": "PATROL", "elapsed": elapsed}
    # ── Local commands (place / odom / memory / help) ────────────────────
    if try_local_command(cmd):
        log_cmd(cmd, "local command")
        return {"type": "local", "speak": "Done", "action": "LOCAL", "elapsed": time.time() - t0}
    # ── Talk-only (questions / acknowledgements) ─────────────────────────
    if any(re.match(p, cmd, re.IGNORECASE) for p in _TALK_PATTERNS):
        speak = _handle_talk(cmd)
        return {"type": "talk", "speak": speak, "action": "TALK", "elapsed": time.time() - t0}
    # ── Greeting ─────────────────────────────────────────────────────────
    if re.match(r"^(?:hi+|hey+|hello+|sup|yo+|greetings|good (?:morning|afternoon|evening))\s*[!.]*$",
                cmd, re.IGNORECASE):
        response = "Hello! I am Sanad. How can I help you?"
        print(f"Sanad: {response}")
        add_to_history(cmd, response)
        log_cmd(cmd, response)
        return {"type": "greeting", "speak": response, "action": "GREETING", "elapsed": 0}
# ── "Come to me" shortcut ────────────────────────────────────────────
|
|
if re.match(r"^(?:come(?:\s+back)?(?:\s+to\s+me)?|come\s+here|get\s+closer|approach|move\s+closer)\s*[!.]*$", cmd, re.IGNORECASE):
|
|
execute_action("forward", 2.0)
|
|
resp = "Coming to you"
|
|
print(f"Sanad: {resp}")
|
|
add_to_history(cmd, resp)
|
|
log_cmd(cmd, resp)
|
|
return {"type": "move", "speak": resp, "action": "FORWARD 2.0s", "elapsed": 2.0}
|
|
|
|
    # ── Multi-step compound, e.g. "turn left 45 degrees then move forward 3" ──
    _multi = re.match(
        r"turn\s+(right|left)\s*(\d+)?\s*(?:deg(?:rees?)?)?\s+(?:and\s+then|then|and)?\s+"
        r"(?:move\s+|go\s+|walk\s+|step\s+)?(back(?:ward)?|forward)\s*(\d+)?\s*(?:steps?|meter)?",
        cmd, re.IGNORECASE)
    if _multi:
        turn_dir = _multi.group(1).lower()
        turn_deg = float(_multi.group(2) or 90)  # default: 90 degrees
        walk_dir = "backward" if "back" in _multi.group(3).lower() else "forward"
        walk_dur = float(_multi.group(4) or 2)   # default: 2 s of walking
        # Convert degrees into a turn duration, assuming ~18°/s of yaw.
        execute_action("right" if turn_dir == "right" else "left", turn_deg / 18.0)
        execute_action(walk_dir, walk_dur)
        resp = f"Turned {turn_dir} {int(turn_deg)} degrees then moved {walk_dir}"
        print(f"Sanad: {resp}")
        add_to_history(cmd, resp)
        log_cmd(cmd, resp)
        return {"type": "move", "speak": resp, "action": f"MULTI {turn_dir}+{walk_dir}", "elapsed": time.time() - t0}
    # ── Standard LLaVA command ───────────────────────────────────────────
    return _handle_llava(cmd)
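# Routing cheatsheet (illustrative inputs → "type" of the returned dict):
#   "yolo"                      -> "status"
#   "find a chair"              -> "goal"     (matched by _NAT_GOAL_RE)
#   "patrol 10"                 -> "patrol"
#   "what do you see"           -> "talk"
#   "hello"                     -> "greeting"
#   "turn left then go forward" -> "move"
#   anything else               -> "decision" (full LLaVA round-trip)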
# ══════════════════════════════════════════════════════════════════════════════
# HANDLERS — _handle_search and _handle_talk return speak text;
# _handle_llava returns the full result dict
# ══════════════════════════════════════════════════════════════════════════════
def _handle_search(cmd):
    args = cmd[7:].strip()  # drop the "search/" prefix
    if not args:
        print(" Usage: search/ /path/to/photo.jpg [hint]")
        return "Usage: search/ <path or hint>"
    searcher = get_searcher()
    if not searcher:
        print(" [Search] Image search not available")
        return "Image search not available"
    parts = args.split(None, 1)
    if parts and os.path.exists(parts[0]):
        # First token is a real file: reference-image search; the rest is the hint.
        img_path = parts[0]
        hint = parts[1].strip() if len(parts) > 1 else ""
        yolo_pre = "person" if not hint or "person" in hint.lower() else None
        log_cmd(cmd, f"Image search: {img_path}")
        result = searcher.search_from_file(img_path, hint=hint, yolo_prefilter=yolo_pre)
        return result.get("description", "Search complete")
    # No file given: treat the whole argument string as a text hint.
    hint = args
    yolo_pre = "person" if any(w in hint.lower() for w in ("person", "guy", "man", "woman")) else None
    log_cmd(cmd, f"Image search: {hint}")
    result = searcher.search(ref_img_b64=None, hint=hint, yolo_prefilter=yolo_pre)
    return result.get("description", "Search complete")
def _handle_talk(cmd):
    print("Thinking...")
    try:
        img = get_frame()
        facts_str = ""
        try:
            from API.llava_api import _facts
            if _facts:
                facts_str = "\nKnown facts: " + "; ".join(_facts) + "."
        except ImportError:
            pass
        d = ask_talk(cmd, img, facts=facts_str)
        sp = d.get("speak", "")
        print(f"Sanad: {sp}")
        log_cmd(cmd, sp)
        return sp
    except Exception as ex:
        print(f" Error: {ex}")
        return f"Error: {ex}"
def _handle_llava(cmd):
    print("Thinking...")
    t0 = time.time()
    img = get_frame()

    # Poll up to 500 ms in 50 ms slices instead of blocking a full second.
    # Returns the moment a frame is available — most drops recover in <100 ms.
    if img is None:
        print(" Waiting for camera...")
        for _ in range(10):
            time.sleep(0.05)
            img = get_frame()
            if img is not None:
                break

    if img is None:
        print(" Camera not ready — command cancelled")
        log_cmd(cmd, "camera not ready")
        return {"type": "error", "speak": "Camera not ready", "action": "NONE", "elapsed": 0}

    d = ask(cmd, img)
    dur = time.time() - t0
    print(f" ({dur:.1f}s) -> {json.dumps(d)}")
    resp = execute(d)
    log_cmd(cmd, resp or "", dur)

    # Log the closest detection of every visible class alongside the command.
    from API.yolo_api import YOLO_AVAILABLE as _ya
    if _ya:
        for cls in yolo_all_classes():
            det = yolo_closest(cls)
            if det:
                log_detection(cls, det.position, det.distance_estimate)

    action_str = d.get("actions", [{}])[0].get("move", "NONE") if d.get("actions") else "NONE"
    return {"type": "decision", "speak": resp or "", "action": action_str.upper(),
            "elapsed": dur, "raw": d}
# ══════════════════════════════════════════════════════════════════════════════
# HELPERS
# ══════════════════════════════════════════════════════════════════════════════

def _strip_ansi(s: str) -> str:
    """Strip ANSI colour escapes for correct visual width calculations."""
    return re.sub(r"\x1b\[[0-9;]*m", "", s)
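# e.g. _strip_ansi("\x1b[92mON \x1b[0m") == "ON "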
def get_brain_status() -> dict:
    """Return current brain status for the server status message."""
    from API.yolo_api import YOLO_AVAILABLE as _ya
    from API.odometry_api import ODOM_AVAILABLE as _oa
    from API.memory_api import MEMORY_AVAILABLE as _ma
    from API.camera_api import CAM_WIDTH, CAM_HEIGHT, CAM_FPS
    try:
        from API.lidar_api import LIDAR_AVAILABLE as _la, get_loc_state
        lidar_state = get_loc_state() if _la else "off"
    except ImportError:
        _la = False
        lidar_state = "off"
    return {
        "model": OLLAMA_MODEL,
        "yolo": _ya,
        "odometry": _oa,
        "memory": _ma,
        "lidar": _la,
        "lidar_state": lidar_state,
        "voice": _voice_module is not None and _voice_module.is_running,
        "camera": f"{CAM_WIDTH}x{CAM_HEIGHT}@{CAM_FPS}",
    }
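# Example payload (all values illustrative, not actual defaults):
#   {"model": "llava:13b", "yolo": True, "odometry": True, "memory": True,
#    "lidar": False, "lidar_state": "off", "voice": True,
#    "camera": "640x480@30"}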
def shutdown():
    """Clean shutdown of all subsystems."""
    print("\nShutting down Marcus...")
    # Stop voice module
    if _voice_module and _voice_module.is_running:
        _voice_module.stop()
    # Stop autonomous mode if running
    from Brain.command_parser import _auto
    if _auto and _auto.is_enabled():
        _auto.disable()
    stop_camera()
    gradual_stop()
    send_cmd("stop")

    from API.odometry_api import odom as _o
    if _o:
        _o.stop()

    from API.memory_api import mem as _m
    if _m:
        _m.end_session()

    try:
        from API.lidar_api import stop_lidar
        stop_lidar()
    except Exception:
        pass

    _log("Marcus stopped", "info", "brain")
    print("Marcus stopped.")
# ══════════════════════════════════════════════════════════════════════════════
# TERMINAL MODE — used by run_marcus.py
# ══════════════════════════════════════════════════════════════════════════════

def run_terminal():
    """Run the brain with a terminal input loop."""
    init_brain()

    # ─── DASHBOARD ───────────────────────────────────────────────────────
    # Separate the boot log from the interactive dashboard with a clear
    # visual break. Print at the end of init so it's always the last thing
    # on screen before the first `Command:` prompt — even if the operator
    # scrolled through a wall of subsystem init messages.
    status = get_brain_status()

    def _fmt(v):
        if v is True: return "\033[92mON \033[0m"   # green
        if v is False: return "\033[91mOFF\033[0m"  # red
        return str(v)

    W = 58
    LEFT_W = 28
    RIGHT_W = (W - 2) - LEFT_W  # visible chars available in the right column

    def _pad(s: str, width: int) -> str:
        """ljust by visible width, treating ANSI colour escapes as zero-width."""
        visible = len(_strip_ansi(s))
        return s + " " * max(0, width - visible)

    print("\n\n" + "╔" + "═" * (W - 2) + "╗")
    print("║" + _pad(" SANAD — AI BRAIN READY", W - 2) + "║")
    print("╠" + "═" * (W - 2) + "╣")
    from API.llava_api import VLM_ENABLED
    left = [("model", status["model"]),
            ("vlm", _fmt(VLM_ENABLED)),
            ("voice", _fmt(status["voice"])),
            ("camera", status["camera"])]
    right = [("yolo", _fmt(status["yolo"])),
             ("lidar", _fmt(status["lidar"])),
             ("memory", _fmt(status["memory"])),
             ("odometry", _fmt(status["odometry"]))]
    for i in range(max(len(left), len(right))):
        l = f" {left[i][0]:<8}: {left[i][1]}" if i < len(left) else ""
        r = f" {right[i][0]:<8}: {right[i][1]}" if i < len(right) else ""
        print("║" + _pad(l, LEFT_W) + _pad(r, RIGHT_W) + "║")
    print("╠" + "═" * (W - 2) + "╣")
    print("║" + _pad(" Type a command, or say \"Sanad, <command>\".", W - 2) + "║")
    print("║" + _pad(" help · example · yolo · test_tts · auto on/off · q", W - 2) + "║")
    print("╚" + "═" * (W - 2) + "╝\n")
    try:
        while True:
            try:
                cmd = input("Command: ").strip()
            except (EOFError, KeyboardInterrupt):
                break
            if not cmd:
                continue
            if cmd.lower() in ("q", "quit", "exit"):
                break
            if cmd.lower() in ("mute/", "unmute/"):
                # Route through the audio API so the action respects whichever
                # mic backend is active (BuiltinMic flushes the UDP buffer;
                # the legacy pactl path mutes PulseAudio source 3).
                if _audio_api is None:
                    print(" Voice is not initialized")
                    continue
                if cmd.lower() == "mute/":
                    _audio_api._mute_mic()
                    print(" Mic muted")
                else:
                    _audio_api._unmute_mic()
                    print(" Mic unmuted")
                continue
            if cmd.lower().startswith("test_tts"):
                # Probe speaker IDs to find which one speaks English on this
                # firmware. Usage: `test_tts` (runs 0, 1, 2) or `test_tts 1`.
                if _audio_api is None or _audio_api._tts_engine is None:
                    print(" Voice is not initialized")
                    continue
                parts = cmd.split()
                ids = [int(x) for x in parts[1:]] if len(parts) > 1 else [0, 1, 2]
                phrase = "Hello, I am Sanad."
                for sid in ids:
                    print(f" → speaker_id = {sid}")
                    _audio_api._tts_engine.speak(phrase, speaker_id=sid, block=True)
                    time.sleep(0.3)
                print(' Pick the ID that sounded English and set it in')
                print(' Config/config_Voice.json :: tts.builtin_speaker_id')
                continue
            result = process_command(cmd)
            sp = result.get("speak", "") if isinstance(result, dict) else ""
            if sp and _audio_api:
                _audio_api.speak(sp)
    except KeyboardInterrupt:
        pass

    shutdown()
if __name__ == "__main__":
|
|
run_terminal()
|