""" Marcus Server — WebSocket interface to the full Marcus Brain ============================================================= Runs on Jetson Orin NX. Uses the SAME brain as run_marcus.py but accepts commands via WebSocket instead of terminal input. Start: python3 -m Server.marcus_server OR: python3 -m Server.marcus_server --host 0.0.0.0 --port 8765 """ import asyncio import argparse import json import os import subprocess import sys import threading import time PROJECT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) if PROJECT_DIR not in sys.path: sys.path.insert(0, PROJECT_DIR) import websockets from Core.config_loader import load_config from Core.logger import log from Brain.marcus_brain import init_brain, process_command, get_brain_status, shutdown from API.camera_api import get_frame _net = load_config("Network") _cam = load_config("Camera") HOST = "0.0.0.0" PORT = _net.get("websocket_port", 8765) def _camera_config_payload() -> dict: """Current camera parameters for the `camera_config` client event.""" return { "type": "camera_config", "profile": _cam.get("profile", "default"), "width": _cam.get("width", 424), "height": _cam.get("height", 240), "fps": _cam.get("fps", 15), "jpeg_quality": _cam.get("jpeg_quality", 70), "pipeline_active": True, # camera thread is always on in this build "timestamp": time.strftime("%H:%M:%S"), } connected_clients = set() def _get_interface_ips(): """Get IP addresses for eth0 and wlan0.""" import socket ips = {} for iface in ("eth0", "wlan0"): try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.settimeout(0.1) # This doesn't actually send — just resolves the interface IP import fcntl import struct ip = socket.inet_ntoa(fcntl.ioctl( s.fileno(), 0x8915, struct.pack('256s', iface.encode('utf-8')) )[20:24]) ips[iface] = ip s.close() except Exception: pass return ips # ─── WEBSOCKET HANDLER ──────────────────────────────────────────────────────── async def handler(websocket): connected_clients.add(websocket) client = websocket.remote_address print(f"[Server] Client connected: {client}") log(f"Client connected: {client}", "info", "server") # Send status status = get_brain_status() lidar_ok = _check_lidar() await websocket.send(json.dumps({ "type": "status", "model": status["model"], "yolo": status["yolo"], "odometry": status["odometry"], "memory": status["memory"], "camera": status["camera"], "lidar": lidar_ok, "message": "Sanad server ready (full brain)", })) # Also push current camera config so the GUI's Camera tab populates # immediately on connect without needing an explicit get_camera round-trip. await websocket.send(json.dumps(_camera_config_payload())) try: async for message in websocket: data = json.loads(message) msg_type = data.get("type") if msg_type == "command": command = data.get("command", "").strip() if not command: continue ts = time.strftime("%H:%M:%S") print(f"[Nav] Command: '{command}'") # Tell client we're thinking await websocket.send(json.dumps({ "type": "thinking", "command": command, "timestamp": ts, })) # Run command through the full brain (same as run_marcus.py) # This runs in a thread to not block the event loop loop = asyncio.get_event_loop() result = await loop.run_in_executor(None, process_command, command) # Send result back entry = { "type": "decision", "command": command, "action": result.get("action", "NONE"), "speak": result.get("speak", ""), "cmd": result.get("action", "NONE"), "value": "", "elapsed": round(result.get("elapsed", 0), 1), "timestamp": ts, "brain_type": result.get("type", "unknown"), } print(f"[Nav] Result ({entry['elapsed']}s): {entry['action']} — {entry['speak'][:60]}") await websocket.send(json.dumps(entry)) elif msg_type == "capture": frame = get_frame() if frame: await websocket.send(json.dumps({ "type": "capture_result", "ok": True, "data": frame, "timestamp": time.strftime("%H:%M:%S"), })) else: await websocket.send(json.dumps({ "type": "capture_result", "ok": False, "message": "No frame available", })) elif msg_type == "ping": status = get_brain_status() await websocket.send(json.dumps({ "type": "pong", "lidar": _check_lidar(), "status": status, "timestamp": time.strftime("%H:%M:%S"), })) elif msg_type == "get_camera": await websocket.send(json.dumps(_camera_config_payload())) elif msg_type in ("set_camera", "set_resolution"): # The RealSense pipeline is started once at boot with values # from config_Camera.json; hot-switching resolution would # require tearing down and rebuilding the camera thread. # Not implemented today — reply with a clear error so the # GUI surfaces it rather than hanging forever waiting for # a camera_config event that never comes. await websocket.send(json.dumps({ "type": "error", "message": (f"`{msg_type}` is not supported in this build — " "camera parameters are pinned from config_Camera.json " "at startup."), "echo": msg_type, "timestamp": time.strftime("%H:%M:%S"), })) # Still send the current config so the GUI can refresh its UI. await websocket.send(json.dumps(_camera_config_payload())) else: await websocket.send(json.dumps({ "type": "error", "message": f"Unknown message type: {msg_type!r}", "echo": msg_type, "timestamp": time.strftime("%H:%M:%S"), })) except websockets.exceptions.ConnectionClosed: print(f"[Server] Client disconnected: {client}") finally: connected_clients.discard(websocket) # ─── FRAME BROADCAST ───────────────────────────────────────────────────────── async def broadcast_frames(): """Push camera frames to all connected clients at ~10Hz.""" global connected_clients while True: if connected_clients: frame = get_frame() if frame: msg = json.dumps({"type": "frame", "data": frame}) dead = set() for ws in connected_clients.copy(): try: await ws.send(msg) except Exception: dead.add(ws) connected_clients -= dead await asyncio.sleep(0.1) def _check_lidar(): lidar_ip = _net.get("lidar_ip", "192.168.123.120") try: r = subprocess.run(["ping", "-c", "1", "-W", "1", lidar_ip], capture_output=True) return r.returncode == 0 except Exception: return False # ─── MAIN ───────────────────────────────────────────────────────────────────── async def run_server(host: str, port: int): # Initialize the full brain (camera, YOLO, odometry, memory, LLaVA) init_brain() status = get_brain_status() ips = _get_interface_ips() eth_ip = ips.get("eth0", "—") wlan_ip = ips.get("wlan0", "—") print() print("=" * 56) print(" Marcus Server (Full Brain)") print(f" Bind : ws://{host}:{port}") if eth_ip != "—": print(f" eth0 : ws://{eth_ip}:{port}") if wlan_ip != "—": print(f" wlan0 : ws://{wlan_ip}:{port}") print(f" Model : {status['model']}") print(f" YOLO : {'active' if status['yolo'] else 'off'}") print(f" Odometry : {'active' if status['odometry'] else 'off'}") print(f" Memory : {'active' if status['memory'] else 'off'}") print(f" Camera : {status['camera']}") print("=" * 56) log(f"Server starting on ws://{host}:{port}", "info", "server") async with websockets.serve(handler, host, port): print(f"[Server] Listening on ws://{host}:{port}") print("[Server] Waiting for client to connect...") await broadcast_frames() def main(): parser = argparse.ArgumentParser(description="Marcus Server (Full Brain)") parser.add_argument("--host", default=HOST, help=f"Bind address (default: {HOST})") parser.add_argument("--port", type=int, default=PORT, help=f"Port (default: {PORT})") args = parser.parse_args() try: asyncio.run(run_server(args.host, args.port)) except KeyboardInterrupt: pass finally: shutdown() if __name__ == "__main__": main()