Marcus/Client/marcus_client.py

1037 lines
51 KiB
Python

"""
Marcus Navigation GUI Client — runs on Workstation
Tabs: Navigation | Camera | LiDAR
Start: python marcus_client.py
Requires: pip install websockets pillow
LiDAR/SLAM runs locally on this workstation (same as SLAM_GUI.py).
"""
import tkinter as tk
from tkinter import ttk, scrolledtext, filedialog
import asyncio, threading, json, base64, time, sys, os
from PIL import Image, ImageTk
import io
import numpy as np
import websockets
# Load network defaults from Config/config_Network.json. Falls back to sane
# hardcoded values if the config isn't reachable (the GUI can run standalone
# from a workstation that isn't on the same filesystem as the project).
_PROJECT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if _PROJECT_DIR not in sys.path:
sys.path.insert(0, _PROJECT_DIR)
try:
from Core.config_loader import load_config
_net = load_config("Network")
JETSON_IP = _net.get("jetson_ip", "192.168.123.164")
JETSON_PORT = _net.get("websocket_port", 8765)
except Exception:
JETSON_IP = "192.168.123.164"
JETSON_PORT = 8765
# ─── SLAM (runs locally on workstation) ───────────────────────────────────────
SLAM_DIR = os.path.expanduser("~/Robotics_workspace/yslootahtech/G1_Lootah/Lidar")
if SLAM_DIR not in sys.path:
sys.path.insert(0, SLAM_DIR)
try:
from SLAM_engine import SlamEngineClient
SLAM_AVAILABLE = True
except ImportError:
SlamEngineClient = None
SLAM_AVAILABLE = False
print("[SLAM] Not available — G1_Lootah/Lidar not found or dependencies missing")
WS_URL = f"ws://{JETSON_IP}:{JETSON_PORT}"
# ─── THEME ────────────────────────────────────────────────────────────────────
BG = "#131C2E"
BG2 = "#1C2942"
BG3 = "#243352"
CARD_BG = "#1E2E4A"
ACCENT = "#E87722"
TEAL = "#00838F"
GREEN = "#2E7D32"
GREEN_LT = "#66BB6A"
RED = "#C62828"
RED_LT = "#EF5350"
BLUE = "#1565C0"
PURPLE = "#6A1B9A"
ORANGE = "#E65100"
WHITE = "#ECEFF1"
GRAY = "#78909C"
DARK = "#0D1520"
FONT = ("Segoe UI", 11)
FONT_SM = ("Segoe UI", 10)
FONT_LG = ("Segoe UI", 13, "bold")
FONT_XL = ("Segoe UI", 16, "bold")
MONO = ("Cascadia Code", 10)
# ─── CAMERA ───────────────────────────────────────────────────────────────────
RESOLUTION_PRESETS = [
("1080p 15fps", 1920, 1080, 15),
("720p 30fps", 1280, 720, 30),
("720p 15fps", 1280, 720, 15),
("540p 30fps", 960, 540, 30),
("540p 15fps", 960, 540, 15),
("480p 60fps", 848, 480, 60),
("480p 30fps", 640, 480, 30),
("480p 15fps", 640, 480, 15),
("240p 30fps", 424, 240, 30),
]
CAMERA_PROFILES = {
"low": ("LOW", "424x240\n30Hz"),
"medium": ("MEDIUM", "640x480\n30Hz"),
"high": ("HIGH", "960x540\n15Hz"),
"full": ("FULL", "1280x720\n15Hz"),
}
# ─── SCROLLABLE FRAME HELPER ─────────────────────────────────────────────────
class ScrollFrame(tk.Frame):
"""A frame with a vertical scrollbar for content that overflows."""
def __init__(self, parent, bg=BG, **kw):
super().__init__(parent, bg=bg, **kw)
self._canvas = tk.Canvas(self, bg=bg, highlightthickness=0, bd=0)
self._scrollbar = ttk.Scrollbar(self, orient="vertical", command=self._canvas.yview)
self.inner = tk.Frame(self._canvas, bg=bg)
self.inner.bind("<Configure>", lambda e: self._canvas.configure(scrollregion=self._canvas.bbox("all")))
self._canvas_window = self._canvas.create_window((0, 0), window=self.inner, anchor="nw")
self._canvas.configure(yscrollcommand=self._scrollbar.set)
self._canvas.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
self._scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
self._canvas.bind("<Configure>", self._on_canvas_resize)
self.inner.bind("<Enter>", self._bind_mousewheel)
self.inner.bind("<Leave>", self._unbind_mousewheel)
def _on_canvas_resize(self, event):
self._canvas.itemconfig(self._canvas_window, width=event.width)
def _bind_mousewheel(self, event):
self._canvas.bind_all("<MouseWheel>", self._on_mousewheel)
self._canvas.bind_all("<Button-4>", self._on_mousewheel)
self._canvas.bind_all("<Button-5>", self._on_mousewheel)
def _unbind_mousewheel(self, event):
self._canvas.unbind_all("<MouseWheel>")
self._canvas.unbind_all("<Button-4>")
self._canvas.unbind_all("<Button-5>")
def _on_mousewheel(self, event):
if event.num == 4:
self._canvas.yview_scroll(-1, "units")
elif event.num == 5:
self._canvas.yview_scroll(1, "units")
else:
self._canvas.yview_scroll(int(-1 * (event.delta / 120)), "units")
# ─── STYLED BUTTON ───────────────────────────────────────────────────────────
def btn(parent, text, bg_color=BG3, fg_color=WHITE, font=FONT_SM, command=None, **kw):
return tk.Button(parent, text=text, font=font, bg=bg_color, fg=fg_color,
activebackground=ACCENT, activeforeground=WHITE,
relief=tk.FLAT, cursor="hand2", command=command, **kw)
class MarcusGUI:
def __init__(self, root):
self.root = root
self.root.title("Marcus — Navigation Control")
self.root.configure(bg=BG)
self.root.geometry("1360x860")
self.root.minsize(1000, 650)
self.ws = None
self.loop = None
self.connected = False
self.ws_thread = None
self.cmd_history = []
self.history_idx = -1
self._cam_profile = "medium"
self._cam_preview_on = True
# SLAM (local)
self._slam = None # type: SlamEngineClient or None
self._slam_running = False
self._map_points = []
self._map_robot_pos = None
self._map_goal = None
self._map_scale = 30.0
self._build_ui()
self._start_ws_thread()
self._start_slam_poll()
# ═════════════════════════════════════════════════════════════════════════
# TOP BAR
# ═════════════════════════════════════════════════════════════════════════
def _build_ui(self):
topbar = tk.Frame(self.root, bg=BG, pady=10)
topbar.pack(fill=tk.X, padx=16)
tk.Label(topbar, text="MARCUS", font=("Segoe UI", 20, "bold"),
fg=ACCENT, bg=BG).pack(side=tk.LEFT)
tk.Label(topbar, text=" Navigation Control System | YS Lootah Technology",
font=FONT, fg=GRAY, bg=BG).pack(side=tk.LEFT)
self.lbl_conn = tk.Label(topbar, text=" DISCONNECTED", font=FONT_SM, fg=RED_LT, bg=BG)
self.lbl_conn.pack(side=tk.RIGHT, padx=10)
self.lbl_lidar = tk.Label(topbar, text="LiDAR: -", font=FONT_SM, fg=GRAY, bg=BG)
self.lbl_lidar.pack(side=tk.RIGHT, padx=10)
self.lbl_model = tk.Label(topbar, text="Model: -", font=FONT_SM, fg=GRAY, bg=BG)
self.lbl_model.pack(side=tk.RIGHT, padx=10)
tk.Frame(self.root, bg=ACCENT, height=2).pack(fill=tk.X)
# ── Notebook ──
style = ttk.Style()
style.theme_use("clam")
style.configure("M.TNotebook", background=BG, borderwidth=0)
style.configure("M.TNotebook.Tab", background=BG2, foreground=WHITE,
font=("Segoe UI", 12, "bold"), padding=[24, 10])
style.map("M.TNotebook.Tab",
background=[("selected", TEAL)], foreground=[("selected", WHITE)])
self.notebook = ttk.Notebook(self.root, style="M.TNotebook")
self.notebook.pack(fill=tk.BOTH, expand=True)
self.tab_nav = tk.Frame(self.notebook, bg=BG)
self.tab_camera = tk.Frame(self.notebook, bg=BG)
self.tab_lidar = tk.Frame(self.notebook, bg=BG)
self.notebook.add(self.tab_nav, text=" Navigation ")
self.notebook.add(self.tab_camera, text=" Camera ")
self.notebook.add(self.tab_lidar, text=" LiDAR ")
self._build_nav_tab()
self._build_camera_tab()
self._build_lidar_tab()
# ── Bottom ──
botbar = tk.Frame(self.root, bg=DARK, pady=6)
botbar.pack(fill=tk.X)
self.lbl_status = tk.Label(botbar, text=f"Connecting to {JETSON_IP}:{JETSON_PORT}...",
font=FONT_SM, fg=GRAY, bg=DARK)
self.lbl_status.pack(side=tk.LEFT, padx=16)
# ═════════════════════════════════════════════════════════════════════════
# TAB 1: NAVIGATION
# ═════════════════════════════════════════════════════════════════════════
def _build_nav_tab(self):
main = tk.Frame(self.tab_nav, bg=BG)
main.pack(fill=tk.BOTH, expand=True, padx=16, pady=10)
# Left: camera
left = tk.Frame(main, bg=BG)
left.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
tk.Label(left, text="Live Camera - RealSense D435I",
font=FONT_SM, fg=GRAY, bg=BG).pack(anchor=tk.W, pady=(0, 6))
self.canvas = tk.Label(left, bg=DARK, text="Waiting for stream...",
fg=GRAY, font=FONT)
self.canvas.pack(fill=tk.BOTH, expand=True)
# Decision overlay
action_card = tk.Frame(left, bg=CARD_BG, pady=12, padx=16)
action_card.pack(fill=tk.X, pady=(10, 0))
tk.Label(action_card, text="Last Decision", font=FONT_SM, fg=GRAY, bg=CARD_BG).pack(anchor=tk.W)
self.lbl_action = tk.Label(action_card, text="-", font=FONT_XL, fg=ACCENT, bg=CARD_BG)
self.lbl_action.pack(anchor=tk.W)
self.lbl_elapsed = tk.Label(action_card, text="", font=FONT_SM, fg=GRAY, bg=CARD_BG)
self.lbl_elapsed.pack(anchor=tk.W)
# Right: controls
right = tk.Frame(main, bg=BG, width=360)
right.pack(side=tk.RIGHT, fill=tk.BOTH, padx=(16, 0))
right.pack_propagate(False)
self._section(right, "Navigation Command")
cmd_frame = tk.Frame(right, bg=BG)
cmd_frame.pack(fill=tk.X, pady=(0, 6))
self.cmd_entry = tk.Entry(cmd_frame, font=FONT, bg=BG3, fg=WHITE,
insertbackground=WHITE, relief=tk.FLAT, bd=8)
self.cmd_entry.pack(side=tk.LEFT, fill=tk.X, expand=True)
self.cmd_entry.bind("<Return>", lambda e: self._send_command())
self.cmd_entry.bind("<Up>", self._history_up)
self.cmd_entry.bind("<Down>", self._history_down)
btn(cmd_frame, "Send", ACCENT, command=self._send_command, padx=16, pady=6,
font=FONT).pack(side=tk.RIGHT, padx=(8, 0))
self._section(right, "Quick Commands")
qf = tk.Frame(right, bg=BG)
qf.pack(fill=tk.X, pady=(0, 10))
quick = [("Forward", "move forward slowly"), ("Left", "turn left 30 degrees"),
("Right", "turn right 30 degrees"), ("Stop", "stop"),
("Walk", "walk forward"), ("Halt", "halt and wait"),
("What?", "what do you see"), ("Status", "__status__")]
for i, (label, cmd) in enumerate(quick):
btn(qf, label, command=lambda c=cmd: self._quick_cmd(c), pady=6).grid(
row=i//4, column=i%4, padx=3, pady=3, sticky="nsew")
for c in range(4): qf.columnconfigure(c, weight=1)
self._section(right, "Decision Log")
self.log = scrolledtext.ScrolledText(right, font=MONO, bg=DARK, fg=WHITE,
insertbackground=WHITE, relief=tk.FLAT,
wrap=tk.WORD, state=tk.DISABLED)
self.log.pack(fill=tk.BOTH, expand=True)
for tag, color in [("cmd", ACCENT), ("forward", GREEN_LT), ("turn", "#42A5F5"),
("stop", RED_LT), ("arrived", ACCENT), ("system", GRAY),
("error", RED_LT), ("think", "#FFD54F")]:
self.log.tag_config(tag, foreground=color)
btn(right, "Clear Log", command=self._clear_log, pady=4).pack(anchor=tk.E, pady=(6, 0))
# ═════════════════════════════════════════════════════════════════════════
# TAB 2: CAMERA
# ═════════════════════════════════════════════════════════════════════════
def _build_camera_tab(self):
main = tk.Frame(self.tab_camera, bg=BG)
main.pack(fill=tk.BOTH, expand=True, padx=16, pady=10)
# Left: preview
left = tk.Frame(main, bg=BG)
left.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
tk.Label(left, text="Camera Preview", font=FONT_LG, fg=WHITE, bg=BG).pack(anchor=tk.W, pady=(0, 6))
self.cam_preview = tk.Label(left, bg=DARK, text="Camera preview", fg=GRAY, font=FONT)
self.cam_preview.pack(fill=tk.BOTH, expand=True)
info_bar = tk.Frame(left, bg=CARD_BG, pady=8, padx=16)
info_bar.pack(fill=tk.X, pady=(10, 0))
self.lbl_cam_info = tk.Label(info_bar, text="Profile: medium | 640x480 @ 30Hz | Pipeline: -",
font=MONO, fg=GRAY, bg=CARD_BG)
self.lbl_cam_info.pack(anchor=tk.W)
# Right: scrollable controls
right_scroll = ScrollFrame(main, bg=BG)
right_scroll.pack(side=tk.RIGHT, fill=tk.BOTH, padx=(16, 0))
right_scroll.configure(width=400)
right = right_scroll.inner
# Profile buttons
self._section(right, "Camera Profile")
pf = tk.Frame(right, bg=BG)
pf.pack(fill=tk.X, pady=(0, 10))
self.cam_profile_btns = {}
for i, (key, (label, desc)) in enumerate(CAMERA_PROFILES.items()):
b = btn(pf, f"{label}\n{desc}", bg_color=TEAL if key == "medium" else BG3,
pady=10, command=lambda k=key: self._set_cam_profile(k))
b.grid(row=0, column=i, padx=4, pady=4, sticky="nsew")
self.cam_profile_btns[key] = b
for c in range(len(CAMERA_PROFILES)): pf.columnconfigure(c, weight=1)
# Resolution
self._section(right, "Custom Resolution")
rf = tk.Frame(right, bg=BG)
rf.pack(fill=tk.X, pady=(0, 10))
tk.Label(rf, text="Preset:", font=FONT_SM, fg=GRAY, bg=BG).grid(row=0, column=0, sticky="w", padx=(0,4))
self.cam_preset_var = tk.StringVar(value="640x480 @ 30fps")
ttk.Combobox(rf, textvariable=self.cam_preset_var, state="readonly",
values=[f"{w}x{h} @ {f}fps" for _, w, h, f in RESOLUTION_PRESETS],
width=22).grid(row=0, column=1, columnspan=3, padx=4, pady=4, sticky="ew")
for r, (lbl, attr) in enumerate([(("W:"), "cam_w"), ("H:", "cam_h"), ("FPS:", "cam_fps")], 1):
tk.Label(rf, text=lbl, font=FONT_SM, fg=GRAY, bg=BG).grid(row=r, column=0, sticky="w", padx=(0,4))
e = tk.Entry(rf, font=FONT_SM, bg=BG3, fg=WHITE, width=8, relief=tk.FLAT, bd=6)
e.grid(row=r, column=1, padx=4, pady=3, sticky="ew")
setattr(self, f"{attr}_entry", e)
self.cam_w_entry.insert(0, "640")
self.cam_h_entry.insert(0, "480")
self.cam_fps_entry.insert(0, "30")
btn(rf, "Apply", BLUE, command=self._apply_resolution, pady=6).grid(
row=3, column=2, columnspan=2, padx=4, pady=4, sticky="ew")
for c in range(4): rf.columnconfigure(c, weight=1)
# Actions
self._section(right, "Camera Actions")
for label, color, cmd in [("Capture Photo", GREEN, self._cam_capture),
("Toggle Preview", TEAL, self._cam_toggle_preview),
("Refresh Status", BG3, self._cam_refresh)]:
btn(right, label, color, command=cmd, font=FONT, pady=10).pack(fill=tk.X, padx=4, pady=3)
# Status log
self._section(right, "Camera Status")
self.cam_status_text = scrolledtext.ScrolledText(right, font=MONO, bg=DARK, fg=WHITE,
relief=tk.FLAT, wrap=tk.WORD,
state=tk.DISABLED, height=8)
self.cam_status_text.pack(fill=tk.X, padx=4, pady=(0, 10))
# ═════════════════════════════════════════════════════════════════════════
# TAB 3: LiDAR — matches SLAM_GUI.py layout exactly
# ═════════════════════════════════════════════════════════════════════════
def _build_lidar_tab(self):
# Colors matching SLAM_GUI.py
CBG = "#0a0a0a" # card bg (rgba(255,255,255,0.04) on dark)
CBDR = "#1a1a2e" # card border area
PILL = "#2b2b2b" # default pill button
SLAM_ACCENT = "#E87722"
main = tk.PanedWindow(self.tab_lidar, orient=tk.HORIZONTAL, bg=BG,
sashwidth=4, sashrelief=tk.FLAT)
main.pack(fill=tk.BOTH, expand=True)
# ── LEFT: Sidebar (controls) — matches SLAM_GUI sidebar ──
left_scroll = ScrollFrame(main, bg=CBDR)
main.add(left_scroll, width=360, minsize=300)
sb = left_scroll.inner
sb.configure(bg=CBDR)
tk.Label(sb, text="SLAM COMMANDER", font=("Segoe UI", 18, "bold"),
fg=SLAM_ACCENT, bg=CBDR).pack(anchor=tk.W, padx=14, pady=(14, 10))
# --- Card: NETWORK SETTINGS ---
net = self._slam_card(sb, "NETWORK SETTINGS")
cf = tk.Frame(net, bg=CBG)
cf.pack(fill=tk.X, pady=4)
self.lidar_iface = self._slam_entry(cf, "enp3s0")
self.lidar_iface.pack(fill=tk.X, pady=2)
self.lidar_ip = self._slam_entry(cf, "192.168.123.222")
self.lidar_ip.pack(fill=tk.X, pady=2)
r1 = tk.Frame(net, bg=CBG)
r1.pack(fill=tk.X, pady=4)
self._pill(r1, "CONNECT", "#1677ff", lambda: self._lidar_cmd("connect")).pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(0,3))
self._pill(r1, "START", "#21a453", lambda: self._lidar_cmd("start")).pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(0,3))
self._pill(r1, "LOCALIZE-ONLY", "#7a4cff", lambda: self._lidar_cmd("localize_only")).pack(side=tk.LEFT, fill=tk.X, expand=True)
r2 = tk.Frame(net, bg=CBG)
r2.pack(fill=tk.X, pady=4)
self._pill(r2, "PAUSE", "#c78a12", lambda: self._lidar_cmd("pause")).pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(0,3))
self._pill(r2, "STOP", "#d0302f", lambda: self._lidar_cmd("stop")).pack(side=tk.LEFT, fill=tk.X, expand=True)
self._pill(net, "RESET (NO SAVE)", "#3a3a3a", lambda: self._lidar_cmd("reset")).pack(fill=tk.X, pady=4)
# --- Card: WORKFLOW ---
wf = self._slam_card(sb, "WORKFLOW (BEFORE RUN)")
for lbl, cmd in [("MAP NEW PLACE", "workflow_map_new"),
("NAVIGATE IN MAPPED PLACE", "workflow_nav_map"),
("LIVE NAV WITH MAP", "workflow_live_nav"),
("LIVE NAV (NO MAP)", "workflow_live_nav_nomap"),
("QUICK DEMO", "workflow_demo")]:
self._pill(wf, lbl, PILL, lambda c=cmd: self._lidar_cmd(c)).pack(fill=tk.X, pady=2)
self.lbl_workflow = tk.Label(wf, text="Workflow: NOT_SELECTED", font=MONO, fg=WHITE, bg=CBG)
self.lbl_workflow.pack(anchor=tk.W, pady=(6, 0))
# --- Card: MAP CONTROLS ---
mc = self._slam_card(sb, "MAP CONTROLS")
self.lidar_map_name = self._slam_entry(mc, "Work")
self.lidar_map_name.pack(fill=tk.X, pady=4)
self._pill(mc, "SAVE NOW", PILL, lambda: self._lidar_cmd("save_map")).pack(fill=tk.X, pady=2)
self._pill(mc, "EXPORT NAV", PILL, lambda: self._lidar_cmd("export_nav")).pack(fill=tk.X, pady=2)
# Goal
tk.Label(mc, text="NAVIGATION GOAL", font=FONT_SM, fg="#ffffff99", bg=CBG).pack(anchor=tk.W, pady=(10, 4))
gr = tk.Frame(mc, bg=CBG)
gr.pack(fill=tk.X, pady=2)
self.lidar_goal_x = self._slam_entry(gr, "0.0", width=8)
self.lidar_goal_x.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(0, 3))
self.lidar_goal_y = self._slam_entry(gr, "0.0", width=8)
self.lidar_goal_y.pack(side=tk.LEFT, fill=tk.X, expand=True)
gb = tk.Frame(mc, bg=CBG)
gb.pack(fill=tk.X, pady=2)
self._pill(gb, "SET GOAL", PILL, self._lidar_set_goal).pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(0,3))
self._pill(gb, "CLEAR", PILL, lambda: self._lidar_cmd("clear_goal")).pack(side=tk.LEFT, fill=tk.X, expand=True)
# Mission
tk.Label(mc, text="MISSION WAYPOINTS", font=FONT_SM, fg="#ffffff99", bg=CBG).pack(anchor=tk.W, pady=(10, 4))
self.lidar_mission = self._slam_entry(mc, "1.0,0.0; 2.0,0.0")
self.lidar_mission.pack(fill=tk.X, pady=2)
mm = tk.Frame(mc, bg=CBG)
mm.pack(fill=tk.X, pady=2)
self._pill(mm, "MISSION START", PILL, lambda: self._lidar_cmd("mission_start")).pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(0,2))
for lbl, cmd in [("PAUSE M", "mission_pause"), ("RESUME M", "mission_resume"), ("STOP M", "mission_stop")]:
self._pill(mm, lbl, PILL, lambda c=cmd: self._lidar_cmd(c)).pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(2,0))
# Density
tk.Label(mc, text="POINT DENSITY", font=FONT_SM, fg="#ffffff99", bg=CBG).pack(anchor=tk.W, pady=(10, 4))
dr = tk.Frame(mc, bg=CBG)
dr.pack(fill=tk.X, pady=2)
for lbl, val in [("LOW", "low"), ("MEDIUM", "medium"), ("HIGH", "high")]:
self._pill(dr, lbl, PILL, lambda v=val: self._lidar_cmd(f"density_{v}")).pack(side=tk.LEFT, fill=tk.X, expand=True, padx=1)
# --- Card: LOCALIZATION ---
loc = self._slam_card(sb, "LOCALIZATION")
self._pill(loc, "LOAD REF MAP", PILL, lambda: self._lidar_cmd("load_ref")).pack(fill=tk.X, pady=2)
self._pill(loc, "LOCALIZE NOW", PILL, lambda: self._lidar_cmd("localize_now")).pack(fill=tk.X, pady=2)
self._pill(loc, "CLEAR REF", PILL, lambda: self._lidar_cmd("clear_ref")).pack(fill=tk.X, pady=2)
# ── RIGHT: Map + Status (main view) ──
right = tk.Frame(main, bg=BG)
main.add(right, minsize=500)
# Map canvas
self.map_canvas = tk.Canvas(right, bg=DARK, highlightthickness=0)
self.map_canvas.pack(fill=tk.BOTH, expand=True, padx=8, pady=(8, 4))
self._map_points = []
self._map_robot_pos = None
self._map_goal = None
self._map_scale = 30.0
self.map_canvas.bind("<Configure>", lambda e: self._render_map(self._map_points, self._map_robot_pos, self._map_goal))
# Status strip
status_strip = tk.Frame(right, bg=BG)
status_strip.pack(fill=tk.X, padx=8, pady=(0, 4))
self.lidar_cards = {}
for i, (key, title, default, color) in enumerate([
("status", "Status", "IDLE", GRAY), ("points", "Points", "0", GRAY),
("pose", "Position", "0, 0, 0", GRAY), ("fps", "FPS", "- / -", GRAY)]):
card = tk.Frame(status_strip, bg=CARD_BG, padx=14, pady=8)
card.grid(row=0, column=i, padx=3, pady=3, sticky="nsew")
tk.Label(card, text=title, font=("Segoe UI", 9), fg=GRAY, bg=CARD_BG).pack(anchor=tk.W)
lbl = tk.Label(card, text=default, font=FONT_LG, fg=color, bg=CARD_BG)
lbl.pack(anchor=tk.W)
self.lidar_cards[key] = lbl
status_strip.columnconfigure(i, weight=1)
# Perf strip
perf_strip = tk.Frame(right, bg=BG)
perf_strip.pack(fill=tk.X, padx=8, pady=(0, 4))
self.lidar_perf = {}
for i, (key, title, default) in enumerate([
("icp", "ICP Time", "- ms"), ("cpu", "CPU", "-%"),
("queue", "Queue Lag", "-"), ("growth", "Growth", "- pts/s")]):
card = tk.Frame(perf_strip, bg=CARD_BG, padx=10, pady=6)
card.grid(row=0, column=i, padx=3, pady=2, sticky="nsew")
tk.Label(card, text=title, font=("Segoe UI", 8), fg=GRAY, bg=CARD_BG).pack(anchor=tk.W)
lbl = tk.Label(card, text=default, font=FONT_SM, fg=WHITE, bg=CARD_BG)
lbl.pack(anchor=tk.W)
self.lidar_perf[key] = lbl
perf_strip.columnconfigure(i, weight=1)
# Loc + Nav + Log row
bottom = tk.Frame(right, bg=BG)
bottom.pack(fill=tk.X, padx=8, pady=(0, 8))
loc_card = tk.Frame(bottom, bg=CARD_BG, padx=12, pady=8)
loc_card.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(0, 3))
tk.Label(loc_card, text="Localization", font=FONT_SM, fg=GRAY, bg=CARD_BG).pack(anchor=tk.W)
self.lbl_loc_state = tk.Label(loc_card, text="State: -", font=FONT_SM, fg=GRAY, bg=CARD_BG)
self.lbl_loc_state.pack(anchor=tk.W)
self.lbl_loc_fitness = tk.Label(loc_card, text="Fitness: - | RMSE: -", font=("Segoe UI", 9), fg=GRAY, bg=CARD_BG)
self.lbl_loc_fitness.pack(anchor=tk.W)
self.lbl_loc_match = tk.Label(loc_card, text="Match: -", font=("Segoe UI", 9), fg=GRAY, bg=CARD_BG)
self.lbl_loc_match.pack(anchor=tk.W)
nav_card = tk.Frame(bottom, bg=CARD_BG, padx=12, pady=8)
nav_card.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(3, 3))
tk.Label(nav_card, text="Navigation", font=FONT_SM, fg=GRAY, bg=CARD_BG).pack(anchor=tk.W)
self.lbl_nav_goal = tk.Label(nav_card, text="Goal: -", font=FONT_SM, fg=GRAY, bg=CARD_BG)
self.lbl_nav_goal.pack(anchor=tk.W)
self.lbl_nav_cmd = tk.Label(nav_card, text="Cmd: -", font=("Segoe UI", 9), fg=GRAY, bg=CARD_BG)
self.lbl_nav_cmd.pack(anchor=tk.W)
log_card = tk.Frame(bottom, bg=CARD_BG, padx=4, pady=4)
log_card.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(3, 0))
self.lidar_log = scrolledtext.ScrolledText(log_card, font=("Cascadia Code", 9), bg=DARK, fg=WHITE,
relief=tk.FLAT, wrap=tk.WORD,
state=tk.DISABLED, height=5, width=30)
self.lidar_log.pack(fill=tk.BOTH, expand=True)
for tag, c in [("ok", GREEN_LT), ("warn", "#c78a12"), ("err", RED_LT), ("info", GRAY)]:
self.lidar_log.tag_config(tag, foreground=c)
# ─── SLAM_GUI-style widget helpers ─────────────────────────────────────
def _slam_card(self, parent, title):
"""Card matching SLAM_GUI.py card_style()."""
CBG = "#0a0a0a"
card = tk.Frame(parent, bg=CBG, padx=14, pady=14, highlightbackground="#ffffff10",
highlightthickness=1)
card.pack(fill=tk.X, padx=14, pady=6)
tk.Label(card, text=title, font=("Segoe UI", 11, "bold"), fg="#ffffffc0", bg=CBG).pack(anchor=tk.W, pady=(0, 8))
return card
def _pill(self, parent, text, bg_color, command):
"""Pill button matching SLAM_GUI.py pill_button()."""
return tk.Button(parent, text=text, font=("Segoe UI", 10, "bold"),
bg=bg_color, fg=WHITE, activebackground=ACCENT, activeforeground=WHITE,
relief=tk.FLAT, cursor="hand2", pady=8, command=command)
def _slam_entry(self, parent, default="", width=0):
"""Entry matching SLAM_GUI.py QLineEdit style."""
kw = {"width": width} if width else {}
e = tk.Entry(parent, font=("Segoe UI", 10, "bold"), bg="#00000050", fg=WHITE,
insertbackground=WHITE, relief=tk.FLAT, bd=8, **kw)
e.insert(0, default)
return e
# ═════════════════════════════════════════════════════════════════════════
# HELPERS
# ═════════════════════════════════════════════════════════════════════════
def _section(self, parent, title):
tk.Label(parent, text=title, font=FONT_LG, fg=WHITE, bg=BG).pack(anchor=tk.W, pady=(12, 2), padx=4)
tk.Frame(parent, bg=BG3, height=1).pack(fill=tk.X, padx=4, pady=(0, 6))
# ─── MAP DRAWING ──────────────────────────────────────────────────────────
def _draw_map_grid(self):
c = self.map_canvas
c.delete("grid")
w = c.winfo_width() or 600
h = c.winfo_height() or 400
cx, cy = w // 2, h // 2
s = self._map_scale
# Draw grid lines every meter
grid_color = "#1A2A40"
for i in range(-20, 21):
x = cx + i * s
c.create_line(x, 0, x, h, fill=grid_color, tags="grid")
y = cy + i * s
c.create_line(0, y, w, y, fill=grid_color, tags="grid")
# Axes
c.create_line(cx, 0, cx, h, fill="#2A4060", width=1, tags="grid")
c.create_line(0, cy, w, cy, fill="#2A4060", width=1, tags="grid")
# Origin label
c.create_text(cx + 4, cy - 8, text="0,0", fill=GRAY, font=("Segoe UI", 8), anchor="w", tags="grid")
def _render_map(self, points=None, robot_pos=None, goal=None):
"""Render 2D top-down map with point cloud, robot, and goal."""
c = self.map_canvas
c.update_idletasks()
w = c.winfo_width() or 600
h = c.winfo_height() or 400
cx, cy = w // 2, h // 2
s = self._map_scale
c.delete("all")
self._draw_map_grid()
# Draw points
if points:
for px, py in points:
sx = cx + px * s
sy = cy - py * s # flip Y (up = positive)
if 0 <= sx <= w and 0 <= sy <= h:
c.create_rectangle(sx, sy, sx + 2, sy + 2, fill="#4FC3F7", outline="", tags="pts")
# Draw goal
if goal:
gx = cx + goal[0] * s
gy = cy - goal[1] * s
c.create_oval(gx - 8, gy - 8, gx + 8, gy + 8, outline="#FF5722", width=2, tags="goal")
c.create_line(gx - 5, gy, gx + 5, gy, fill="#FF5722", width=2, tags="goal")
c.create_line(gx, gy - 5, gx, gy + 5, fill="#FF5722", width=2, tags="goal")
# Draw robot
if robot_pos:
rx = cx + robot_pos[0] * s
ry = cy - robot_pos[1] * s
c.create_oval(rx - 6, ry - 6, rx + 6, ry + 6, fill="#66BB6A", outline=WHITE, width=2, tags="robot")
# Direction indicator (forward = up in robot frame)
c.create_line(rx, ry, rx, ry - 14, fill="#66BB6A", width=2, arrow=tk.LAST, tags="robot")
# Scale indicator
bar_len = s # 1 meter
c.create_line(10, h - 15, 10 + bar_len, h - 15, fill=WHITE, width=2, tags="scale")
c.create_text(10 + bar_len / 2, h - 6, text="1m", fill=GRAY, font=("Segoe UI", 8), tags="scale")
def _log(self, text, tag="system"):
self.log.configure(state=tk.NORMAL)
self.log.insert(tk.END, text + "\n", tag)
self.log.see(tk.END)
self.log.configure(state=tk.DISABLED)
def _lidar_log(self, text, tag="info"):
self.lidar_log.configure(state=tk.NORMAL)
self.lidar_log.insert(tk.END, f"[{time.strftime('%H:%M:%S')}] {text}\n", tag)
self.lidar_log.see(tk.END)
self.lidar_log.configure(state=tk.DISABLED)
def _cam_log(self, text):
self.cam_status_text.configure(state=tk.NORMAL)
self.cam_status_text.insert(tk.END, f"[{time.strftime('%H:%M:%S')}] {text}\n")
self.cam_status_text.see(tk.END)
self.cam_status_text.configure(state=tk.DISABLED)
def _clear_log(self):
self.log.configure(state=tk.NORMAL)
self.log.delete("1.0", tk.END)
self.log.configure(state=tk.DISABLED)
# ─── NAVIGATION ───────────────────────────────────────────────────────────
def _send_command(self):
cmd = self.cmd_entry.get().strip()
if not cmd or not self.connected: return
self.cmd_entry.delete(0, tk.END)
self.cmd_history.insert(0, cmd)
self.history_idx = -1
if cmd == "__status__":
asyncio.run_coroutine_threadsafe(self._ws_send({"type": "ping"}), self.loop)
return
self._log(f"\n[{time.strftime('%H:%M:%S')}] You: {cmd}", "cmd")
asyncio.run_coroutine_threadsafe(
self._ws_send({"type": "command", "command": cmd}), self.loop)
def _quick_cmd(self, cmd):
self.cmd_entry.delete(0, tk.END)
self.cmd_entry.insert(0, cmd)
self._send_command()
def _history_up(self, event):
if self.cmd_history and self.history_idx < len(self.cmd_history) - 1:
self.history_idx += 1
self.cmd_entry.delete(0, tk.END)
self.cmd_entry.insert(0, self.cmd_history[self.history_idx])
def _history_down(self, event):
if self.history_idx > 0:
self.history_idx -= 1
self.cmd_entry.delete(0, tk.END)
self.cmd_entry.insert(0, self.cmd_history[self.history_idx])
elif self.history_idx == 0:
self.history_idx = -1
self.cmd_entry.delete(0, tk.END)
# ─── CAMERA ───────────────────────────────────────────────────────────────
def _set_cam_profile(self, profile):
self._cam_profile = profile
for k, b in self.cam_profile_btns.items():
b.configure(bg=TEAL if k == profile else BG3)
asyncio.run_coroutine_threadsafe(
self._ws_send({"type": "set_camera", "profile": profile}), self.loop)
self._cam_log(f"Switching to profile: {profile}")
def _apply_resolution(self):
try:
w, h, f = int(self.cam_w_entry.get()), int(self.cam_h_entry.get()), int(self.cam_fps_entry.get())
except ValueError:
self._cam_log("Invalid resolution values"); return
asyncio.run_coroutine_threadsafe(
self._ws_send({"type": "set_resolution", "width": w, "height": h, "fps": f}), self.loop)
self._cam_log(f"Applying: {w}x{h} @ {f}fps")
def _cam_capture(self):
asyncio.run_coroutine_threadsafe(self._ws_send({"type": "capture"}), self.loop)
self._cam_log("Capture requested")
def _cam_toggle_preview(self):
self._cam_preview_on = not self._cam_preview_on
if not self._cam_preview_on:
self.cam_preview.configure(image="", text="Preview paused")
self.cam_preview.image = None
self._cam_log(f"Preview {'ON' if self._cam_preview_on else 'OFF'}")
def _cam_refresh(self):
asyncio.run_coroutine_threadsafe(self._ws_send({"type": "get_camera"}), self.loop)
self._cam_log("Refreshing...")
# ─── LiDAR (local SLAM engine) ───────────────────────────────────────────
def _ensure_slam(self) -> bool:
if not SLAM_AVAILABLE:
self._lidar_log("SLAM not available — check G1_Lootah/Lidar path", "err")
return False
if self._slam is None:
try:
self._slam = SlamEngineClient()
self._lidar_log("SLAM engine created", "ok")
except Exception as e:
self._lidar_log(f"SLAM init error: {e}", "err")
return False
return True
def _lidar_cmd(self, cmd):
if not self._ensure_slam():
return
try:
if cmd == "connect":
self._slam.start_process()
self._slam.connect()
self._slam_running = True
self._lidar_log("SLAM started and connected", "ok")
elif cmd == "start":
self._slam.start_mapping()
self._lidar_log("Mapping started", "ok")
elif cmd == "pause":
self._slam.pause_mapping()
self._lidar_log("Mapping paused", "warn")
elif cmd == "stop":
self._slam.stop_mapping()
self._lidar_log("Mapping stopped", "info")
elif cmd == "reset":
self._slam.stop_process()
self._slam_running = False
self._lidar_log("SLAM reset (no save)", "warn")
elif cmd == "localize_only":
self._slam.start_process()
self._slam.connect()
self._slam_running = True
self._lidar_log("Localize-only mode", "ok")
elif cmd == "save_map":
name = self.lidar_map_name.get().strip() or "Work"
self._slam.export_map(name)
self._lidar_log(f"Map saved: {name}", "ok")
elif cmd == "export_nav":
name = self.lidar_map_name.get().strip() or "nav"
self._slam.export_nav(name)
self._lidar_log(f"Nav exported: {name}", "ok")
elif cmd == "load_ref":
path = filedialog.askopenfilename(title="Select Reference Map",
filetypes=[("PLY files", "*.ply"), ("All", "*.*")])
if not path: return
self._slam.load_ref_map(path)
self._lidar_log(f"Ref loaded: {path}", "ok")
elif cmd == "localize_now":
self._slam.localize_now()
self._lidar_log("Localization triggered", "info")
elif cmd == "clear_ref":
self._slam.clear_ref()
self._lidar_log("Ref cleared", "info")
elif cmd == "clear_goal":
self._slam.clear_nav_goal()
self._map_goal = None
self._lidar_log("Goal cleared", "info")
elif cmd == "mission_start":
waypoints = []
for pair in self.lidar_mission.get().strip().split(";"):
parts = pair.strip().split(",")
if len(parts) == 2:
try: waypoints.append({"x": float(parts[0]), "y": float(parts[1])})
except ValueError: pass
self._slam.mission_start(waypoints)
self._lidar_log(f"Mission started ({len(waypoints)} waypoints)", "ok")
elif cmd == "mission_pause":
self._slam.mission_pause()
self._lidar_log("Mission paused", "warn")
elif cmd == "mission_resume":
self._slam.mission_resume()
self._lidar_log("Mission resumed", "ok")
elif cmd == "mission_stop":
self._slam.mission_stop()
self._lidar_log("Mission stopped", "info")
elif cmd.startswith("density_"):
mode = cmd.replace("density_", "")
self._slam.set_density(mode)
self._lidar_log(f"Density: {mode}", "info")
elif cmd.startswith("workflow_"):
self._lidar_log(f"Workflow: {cmd}", "info")
else:
self._lidar_log(f"Unknown: {cmd}", "warn")
except Exception as e:
self._lidar_log(f"SLAM error: {e}", "err")
def _lidar_set_goal(self):
if not self._ensure_slam(): return
try:
x, y = float(self.lidar_goal_x.get()), float(self.lidar_goal_y.get())
except ValueError:
self._lidar_log("Invalid coordinates", "err"); return
try:
self._slam.set_nav_goal(x, y)
self._map_goal = (x, y)
self._lidar_log(f"Goal set: ({x}, {y})", "ok")
except Exception as e:
self._lidar_log(f"Goal error: {e}", "err")
def _start_slam_poll(self):
"""Poll SLAM data_q and status_q every 200ms to update map and status."""
self._poll_slam()
def _poll_slam(self):
if self._slam is not None and self._slam_running:
try:
self._drain_slam_data()
except Exception:
pass
self.root.after(200, self._poll_slam)
def _drain_slam_data(self):
"""Drain SLAM queues and update map + status displays."""
slam = self._slam
if slam is None:
return
# Drain data_q for frame payloads
payload = None
try:
while not slam.data_q.empty():
item = slam.data_q.get_nowait()
if isinstance(item, tuple) and len(item) >= 2 and item[0] == "FRAME":
payload = item[1]
except Exception:
pass
# Drain status_q
try:
while not slam.status_q.empty():
msg = slam.status_q.get_nowait()
if isinstance(msg, tuple) and len(msg) >= 2:
key, val = msg[0], msg[1]
if isinstance(val, str):
self._lidar_log(f"{key}: {val}", "info")
elif key == "LOCALIZE" and isinstance(val, dict):
fit = val.get("fitness", 0)
rmse = val.get("rmse", 0)
self.lbl_loc_fitness.configure(text=f"Fitness: {fit:.3f} | RMSE: {rmse:.4f}")
elif key == "LOC_STATE":
ls = str(val)
c = GREEN_LT if ls == "TRACKING" else RED_LT if ls == "LOST" else ORANGE
self.lbl_loc_state.configure(text=f"State: {ls}", fg=c)
elif key == "ERROR":
self._lidar_log(f"ERROR: {val}", "err")
elif key == "INFO":
if isinstance(val, str):
self._lidar_log(val, "info")
except Exception:
pass
if payload is None:
return
# Update status cards
pts = payload.get("stable_points")
if pts is not None and isinstance(pts, np.ndarray):
self.lidar_cards["points"].configure(text=f"{len(pts):,}", fg=WHITE)
# Downsample for map
if len(pts) > 0:
stride = max(1, len(pts) // 2000)
self._map_points = pts[::stride, :2].tolist()
pose = payload.get("pose")
if pose is not None and isinstance(pose, np.ndarray) and pose.shape == (4, 4):
x, y, z = float(pose[0, 3]), float(pose[1, 3]), float(pose[2, 3])
self.lidar_cards["pose"].configure(text=f"{x:.2f}, {y:.2f}, {z:.2f}", fg=WHITE)
self._map_robot_pos = (x, y)
mode = payload.get("mode", "")
if mode:
s = str(mode).upper()
self.lidar_cards["status"].configure(text=s,
fg=GREEN_LT if s == "MAPPING" else BLUE if "LOCALIZ" in s else ORANGE if s == "PAUSED" else GRAY)
perf = payload.get("perf", {})
if perf:
self.lidar_cards["fps"].configure(
text=f"{perf.get('input_fps',0):.0f} / {perf.get('publish_fps',0):.0f}", fg=WHITE)
self.lidar_perf["icp"].configure(text=f"{perf.get('icp_ms',0):.1f} ms")
self.lidar_perf["cpu"].configure(text=f"{perf.get('cpu_percent',0):.0f}%")
self.lidar_perf["queue"].configure(text=str(perf.get("queue_lag", 0)))
self.lidar_perf["growth"].configure(text=f"{perf.get('stable_growth_per_sec',0):.0f} pts/s")
loc_state = payload.get("loc_state", "")
if loc_state:
c = GREEN_LT if loc_state == "TRACKING" else RED_LT if loc_state == "LOST" else ORANGE
self.lbl_loc_state.configure(text=f"State: {loc_state}", fg=c)
nav = payload.get("nav", {})
if isinstance(nav, dict):
goal = nav.get("goal")
if goal and isinstance(goal, dict):
self.lbl_nav_goal.configure(text=f"Goal: ({goal.get('x',0):.2f}, {goal.get('y',0):.2f})", fg=WHITE)
self._map_goal = (goal.get("x", 0), goal.get("y", 0))
cmd_info = nav.get("cmd")
if cmd_info and isinstance(cmd_info, dict):
self.lbl_nav_cmd.configure(
text=f"Cmd: lin={cmd_info.get('linear_mps','-')} ang={cmd_info.get('angular_rps','-')}")
# Render map
self._render_map(self._map_points, self._map_robot_pos, self._map_goal)
# ═════════════════════════════════════════════════════════════════════════
# WEBSOCKET
# ═════════════════════════════════════════════════════════════════════════
def _start_ws_thread(self):
self.loop = asyncio.new_event_loop()
self.ws_thread = threading.Thread(target=lambda: (
asyncio.set_event_loop(self.loop),
self.loop.run_until_complete(self._ws_connect_loop())), daemon=True)
self.ws_thread.start()
async def _ws_connect_loop(self):
while True:
try:
self._set_status("Connecting...", RED_LT)
async with websockets.connect(WS_URL) as ws:
self.ws, self.connected = ws, True
self._set_status(f"Connected to {JETSON_IP}", GREEN_LT)
self.root.after(0, lambda: self.lbl_conn.configure(text=" CONNECTED", fg=GREEN_LT))
await self._ws_receive(ws)
except Exception as e:
self.connected, self.ws = False, None
self._set_status(f"Reconnecting... ({e})", RED_LT)
self.root.after(0, lambda: self.lbl_conn.configure(text=" DISCONNECTED", fg=RED_LT))
await asyncio.sleep(3)
async def _ws_receive(self, ws):
async for raw in ws:
try:
data = json.loads(raw)
t = data.get("type")
if t == "frame":
self.root.after(0, self._update_frame, data["data"])
elif t == "status":
lidar = "ALIVE" if data.get("lidar") else "OFFLINE"
model = data.get("model", "?")
self.root.after(0, lambda l=lidar, m=model: (
self.lbl_lidar.configure(text=f"LiDAR: {l}", fg=GREEN_LT if l == "ALIVE" else RED_LT),
self.lbl_model.configure(text=f"Model: {m}", fg=TEAL)))
self._log(f"[{data.get('timestamp','')}] {data.get('message','')}", "system")
if data.get("camera_config"):
self.root.after(0, self._update_cam_config, data["camera_config"])
elif t == "thinking":
self._log(f" Thinking... ('{data.get('command','')}')", "think")
self.root.after(0, lambda: self.lbl_action.configure(text="Thinking...", fg="#FFD54F"))
elif t == "decision":
a, e, c, ts = data.get("action",""), data.get("elapsed","?"), data.get("cmd",""), data.get("timestamp","")
tag = "forward" if c=="FORWARD" else "turn" if c in ("LEFT","RIGHT") else "stop" if c=="STOP" else "arrived" if c=="ARRIVED" else "system"
self._log(f" [{ts}] Decision ({e}s): {a}", tag)
color = GREEN_LT if c=="FORWARD" else "#42A5F5" if c in ("LEFT","RIGHT") else RED_LT if c=="STOP" else ACCENT
self.root.after(0, lambda a=a,cl=color,e=e: (
self.lbl_action.configure(text=a, fg=cl),
self.lbl_elapsed.configure(text=f"Response: {e}s")))
elif t == "camera_config":
self.root.after(0, self._update_cam_config, data)
elif t == "pong":
self._log(f" [{data.get('timestamp','')}] Status - LiDAR: {'OK' if data.get('lidar') else 'OFFLINE'}", "system")
elif t == "error":
self._log(f" ERROR: {data.get('message','')}", "error")
except Exception as e:
self._log(f" Parse error: {e}", "error")
async def _ws_send(self, data):
if self.ws and self.connected:
try: await self.ws.send(json.dumps(data))
except Exception as e: self._log(f" Send error: {e}", "error")
# ─── UPDATES ──────────────────────────────────────────────────────────────
def _update_frame(self, b64):
try:
img = Image.open(io.BytesIO(base64.b64decode(b64)))
img_resized = img.resize((640, 480), Image.LANCZOS)
photo = ImageTk.PhotoImage(img_resized)
self.canvas.configure(image=photo, text="")
self.canvas.image = photo
if self._cam_preview_on:
p2 = ImageTk.PhotoImage(img_resized)
self.cam_preview.configure(image=p2, text="")
self.cam_preview.image = p2
except Exception as e:
# Malformed base64 / torn frame / Tk disposed during shutdown —
# log it so UI freezes become debuggable rather than silent.
print(f" [Client] _update_frame dropped: {type(e).__name__}: {e}")
def _update_cam_config(self, d):
p, w, h, f = d.get("profile","?"), d.get("width","?"), d.get("height","?"), d.get("fps","?")
active = d.get("pipeline_active", False)
self.lbl_cam_info.configure(text=f"Profile: {p} | {w}x{h} @ {f}Hz | Pipeline: {'active' if active else 'stopped'}")
self._cam_profile = p
for k, b in self.cam_profile_btns.items():
b.configure(bg=TEAL if k == p else BG3)
self._cam_log(f"Camera: {p} ({w}x{h}@{f}) pipeline={'active' if active else 'stopped'}")
def _set_status(self, msg, color=GRAY):
self.root.after(0, lambda: self.lbl_status.configure(text=msg, fg=color))
if __name__ == "__main__":
root = tk.Tk()
MarcusGUI(root)
root.mainloop()