1037 lines
51 KiB
Python
1037 lines
51 KiB
Python
"""
|
|
Marcus Navigation GUI Client — runs on Workstation
|
|
Tabs: Navigation | Camera | LiDAR
|
|
Start: python marcus_client.py
|
|
Requires: pip install websockets pillow
|
|
|
|
LiDAR/SLAM runs locally on this workstation (same as SLAM_GUI.py).
|
|
"""
|
|
import tkinter as tk
|
|
from tkinter import ttk, scrolledtext, filedialog
|
|
import asyncio, threading, json, base64, time, sys, os
|
|
from PIL import Image, ImageTk
|
|
import io
|
|
import numpy as np
|
|
import websockets
|
|
|
|
# Load network defaults from Config/config_Network.json. Falls back to sane
|
|
# hardcoded values if the config isn't reachable (the GUI can run standalone
|
|
# from a workstation that isn't on the same filesystem as the project).
|
|
_PROJECT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
|
if _PROJECT_DIR not in sys.path:
|
|
sys.path.insert(0, _PROJECT_DIR)
|
|
try:
|
|
from Core.config_loader import load_config
|
|
_net = load_config("Network")
|
|
JETSON_IP = _net.get("jetson_ip", "192.168.123.164")
|
|
JETSON_PORT = _net.get("websocket_port", 8765)
|
|
except Exception:
|
|
JETSON_IP = "192.168.123.164"
|
|
JETSON_PORT = 8765
|
|
|
|
# ─── SLAM (runs locally on workstation) ───────────────────────────────────────
|
|
SLAM_DIR = os.path.expanduser("~/Robotics_workspace/yslootahtech/G1_Lootah/Lidar")
|
|
if SLAM_DIR not in sys.path:
|
|
sys.path.insert(0, SLAM_DIR)
|
|
|
|
try:
|
|
from SLAM_engine import SlamEngineClient
|
|
SLAM_AVAILABLE = True
|
|
except ImportError:
|
|
SlamEngineClient = None
|
|
SLAM_AVAILABLE = False
|
|
print("[SLAM] Not available — G1_Lootah/Lidar not found or dependencies missing")
|
|
WS_URL = f"ws://{JETSON_IP}:{JETSON_PORT}"
|
|
|
|
# ─── THEME ────────────────────────────────────────────────────────────────────
|
|
BG = "#131C2E"
|
|
BG2 = "#1C2942"
|
|
BG3 = "#243352"
|
|
CARD_BG = "#1E2E4A"
|
|
ACCENT = "#E87722"
|
|
TEAL = "#00838F"
|
|
GREEN = "#2E7D32"
|
|
GREEN_LT = "#66BB6A"
|
|
RED = "#C62828"
|
|
RED_LT = "#EF5350"
|
|
BLUE = "#1565C0"
|
|
PURPLE = "#6A1B9A"
|
|
ORANGE = "#E65100"
|
|
WHITE = "#ECEFF1"
|
|
GRAY = "#78909C"
|
|
DARK = "#0D1520"
|
|
|
|
FONT = ("Segoe UI", 11)
|
|
FONT_SM = ("Segoe UI", 10)
|
|
FONT_LG = ("Segoe UI", 13, "bold")
|
|
FONT_XL = ("Segoe UI", 16, "bold")
|
|
MONO = ("Cascadia Code", 10)
|
|
|
|
# ─── CAMERA ───────────────────────────────────────────────────────────────────
|
|
RESOLUTION_PRESETS = [
|
|
("1080p 15fps", 1920, 1080, 15),
|
|
("720p 30fps", 1280, 720, 30),
|
|
("720p 15fps", 1280, 720, 15),
|
|
("540p 30fps", 960, 540, 30),
|
|
("540p 15fps", 960, 540, 15),
|
|
("480p 60fps", 848, 480, 60),
|
|
("480p 30fps", 640, 480, 30),
|
|
("480p 15fps", 640, 480, 15),
|
|
("240p 30fps", 424, 240, 30),
|
|
]
|
|
|
|
CAMERA_PROFILES = {
|
|
"low": ("LOW", "424x240\n30Hz"),
|
|
"medium": ("MEDIUM", "640x480\n30Hz"),
|
|
"high": ("HIGH", "960x540\n15Hz"),
|
|
"full": ("FULL", "1280x720\n15Hz"),
|
|
}
|
|
|
|
|
|
# ─── SCROLLABLE FRAME HELPER ─────────────────────────────────────────────────
|
|
class ScrollFrame(tk.Frame):
|
|
"""A frame with a vertical scrollbar for content that overflows."""
|
|
def __init__(self, parent, bg=BG, **kw):
|
|
super().__init__(parent, bg=bg, **kw)
|
|
self._canvas = tk.Canvas(self, bg=bg, highlightthickness=0, bd=0)
|
|
self._scrollbar = ttk.Scrollbar(self, orient="vertical", command=self._canvas.yview)
|
|
self.inner = tk.Frame(self._canvas, bg=bg)
|
|
|
|
self.inner.bind("<Configure>", lambda e: self._canvas.configure(scrollregion=self._canvas.bbox("all")))
|
|
self._canvas_window = self._canvas.create_window((0, 0), window=self.inner, anchor="nw")
|
|
self._canvas.configure(yscrollcommand=self._scrollbar.set)
|
|
|
|
self._canvas.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
|
|
self._scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
|
|
|
|
self._canvas.bind("<Configure>", self._on_canvas_resize)
|
|
self.inner.bind("<Enter>", self._bind_mousewheel)
|
|
self.inner.bind("<Leave>", self._unbind_mousewheel)
|
|
|
|
def _on_canvas_resize(self, event):
|
|
self._canvas.itemconfig(self._canvas_window, width=event.width)
|
|
|
|
def _bind_mousewheel(self, event):
|
|
self._canvas.bind_all("<MouseWheel>", self._on_mousewheel)
|
|
self._canvas.bind_all("<Button-4>", self._on_mousewheel)
|
|
self._canvas.bind_all("<Button-5>", self._on_mousewheel)
|
|
|
|
def _unbind_mousewheel(self, event):
|
|
self._canvas.unbind_all("<MouseWheel>")
|
|
self._canvas.unbind_all("<Button-4>")
|
|
self._canvas.unbind_all("<Button-5>")
|
|
|
|
def _on_mousewheel(self, event):
|
|
if event.num == 4:
|
|
self._canvas.yview_scroll(-1, "units")
|
|
elif event.num == 5:
|
|
self._canvas.yview_scroll(1, "units")
|
|
else:
|
|
self._canvas.yview_scroll(int(-1 * (event.delta / 120)), "units")
|
|
|
|
|
|
# ─── STYLED BUTTON ───────────────────────────────────────────────────────────
|
|
def btn(parent, text, bg_color=BG3, fg_color=WHITE, font=FONT_SM, command=None, **kw):
|
|
return tk.Button(parent, text=text, font=font, bg=bg_color, fg=fg_color,
|
|
activebackground=ACCENT, activeforeground=WHITE,
|
|
relief=tk.FLAT, cursor="hand2", command=command, **kw)
|
|
|
|
|
|
class MarcusGUI:
|
|
def __init__(self, root):
|
|
self.root = root
|
|
self.root.title("Marcus — Navigation Control")
|
|
self.root.configure(bg=BG)
|
|
self.root.geometry("1360x860")
|
|
self.root.minsize(1000, 650)
|
|
|
|
self.ws = None
|
|
self.loop = None
|
|
self.connected = False
|
|
self.ws_thread = None
|
|
self.cmd_history = []
|
|
self.history_idx = -1
|
|
self._cam_profile = "medium"
|
|
self._cam_preview_on = True
|
|
|
|
# SLAM (local)
|
|
self._slam = None # type: SlamEngineClient or None
|
|
self._slam_running = False
|
|
self._map_points = []
|
|
self._map_robot_pos = None
|
|
self._map_goal = None
|
|
self._map_scale = 30.0
|
|
|
|
self._build_ui()
|
|
self._start_ws_thread()
|
|
self._start_slam_poll()
|
|
|
|
# ═════════════════════════════════════════════════════════════════════════
|
|
# TOP BAR
|
|
# ═════════════════════════════════════════════════════════════════════════
|
|
def _build_ui(self):
|
|
topbar = tk.Frame(self.root, bg=BG, pady=10)
|
|
topbar.pack(fill=tk.X, padx=16)
|
|
|
|
tk.Label(topbar, text="MARCUS", font=("Segoe UI", 20, "bold"),
|
|
fg=ACCENT, bg=BG).pack(side=tk.LEFT)
|
|
tk.Label(topbar, text=" Navigation Control System | YS Lootah Technology",
|
|
font=FONT, fg=GRAY, bg=BG).pack(side=tk.LEFT)
|
|
|
|
self.lbl_conn = tk.Label(topbar, text=" DISCONNECTED", font=FONT_SM, fg=RED_LT, bg=BG)
|
|
self.lbl_conn.pack(side=tk.RIGHT, padx=10)
|
|
self.lbl_lidar = tk.Label(topbar, text="LiDAR: -", font=FONT_SM, fg=GRAY, bg=BG)
|
|
self.lbl_lidar.pack(side=tk.RIGHT, padx=10)
|
|
self.lbl_model = tk.Label(topbar, text="Model: -", font=FONT_SM, fg=GRAY, bg=BG)
|
|
self.lbl_model.pack(side=tk.RIGHT, padx=10)
|
|
|
|
tk.Frame(self.root, bg=ACCENT, height=2).pack(fill=tk.X)
|
|
|
|
# ── Notebook ──
|
|
style = ttk.Style()
|
|
style.theme_use("clam")
|
|
style.configure("M.TNotebook", background=BG, borderwidth=0)
|
|
style.configure("M.TNotebook.Tab", background=BG2, foreground=WHITE,
|
|
font=("Segoe UI", 12, "bold"), padding=[24, 10])
|
|
style.map("M.TNotebook.Tab",
|
|
background=[("selected", TEAL)], foreground=[("selected", WHITE)])
|
|
|
|
self.notebook = ttk.Notebook(self.root, style="M.TNotebook")
|
|
self.notebook.pack(fill=tk.BOTH, expand=True)
|
|
|
|
self.tab_nav = tk.Frame(self.notebook, bg=BG)
|
|
self.tab_camera = tk.Frame(self.notebook, bg=BG)
|
|
self.tab_lidar = tk.Frame(self.notebook, bg=BG)
|
|
|
|
self.notebook.add(self.tab_nav, text=" Navigation ")
|
|
self.notebook.add(self.tab_camera, text=" Camera ")
|
|
self.notebook.add(self.tab_lidar, text=" LiDAR ")
|
|
|
|
self._build_nav_tab()
|
|
self._build_camera_tab()
|
|
self._build_lidar_tab()
|
|
|
|
# ── Bottom ──
|
|
botbar = tk.Frame(self.root, bg=DARK, pady=6)
|
|
botbar.pack(fill=tk.X)
|
|
self.lbl_status = tk.Label(botbar, text=f"Connecting to {JETSON_IP}:{JETSON_PORT}...",
|
|
font=FONT_SM, fg=GRAY, bg=DARK)
|
|
self.lbl_status.pack(side=tk.LEFT, padx=16)
|
|
|
|
# ═════════════════════════════════════════════════════════════════════════
|
|
# TAB 1: NAVIGATION
|
|
# ═════════════════════════════════════════════════════════════════════════
|
|
def _build_nav_tab(self):
|
|
main = tk.Frame(self.tab_nav, bg=BG)
|
|
main.pack(fill=tk.BOTH, expand=True, padx=16, pady=10)
|
|
|
|
# Left: camera
|
|
left = tk.Frame(main, bg=BG)
|
|
left.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
|
|
|
|
tk.Label(left, text="Live Camera - RealSense D435I",
|
|
font=FONT_SM, fg=GRAY, bg=BG).pack(anchor=tk.W, pady=(0, 6))
|
|
|
|
self.canvas = tk.Label(left, bg=DARK, text="Waiting for stream...",
|
|
fg=GRAY, font=FONT)
|
|
self.canvas.pack(fill=tk.BOTH, expand=True)
|
|
|
|
# Decision overlay
|
|
action_card = tk.Frame(left, bg=CARD_BG, pady=12, padx=16)
|
|
action_card.pack(fill=tk.X, pady=(10, 0))
|
|
tk.Label(action_card, text="Last Decision", font=FONT_SM, fg=GRAY, bg=CARD_BG).pack(anchor=tk.W)
|
|
self.lbl_action = tk.Label(action_card, text="-", font=FONT_XL, fg=ACCENT, bg=CARD_BG)
|
|
self.lbl_action.pack(anchor=tk.W)
|
|
self.lbl_elapsed = tk.Label(action_card, text="", font=FONT_SM, fg=GRAY, bg=CARD_BG)
|
|
self.lbl_elapsed.pack(anchor=tk.W)
|
|
|
|
# Right: controls
|
|
right = tk.Frame(main, bg=BG, width=360)
|
|
right.pack(side=tk.RIGHT, fill=tk.BOTH, padx=(16, 0))
|
|
right.pack_propagate(False)
|
|
|
|
self._section(right, "Navigation Command")
|
|
|
|
cmd_frame = tk.Frame(right, bg=BG)
|
|
cmd_frame.pack(fill=tk.X, pady=(0, 6))
|
|
self.cmd_entry = tk.Entry(cmd_frame, font=FONT, bg=BG3, fg=WHITE,
|
|
insertbackground=WHITE, relief=tk.FLAT, bd=8)
|
|
self.cmd_entry.pack(side=tk.LEFT, fill=tk.X, expand=True)
|
|
self.cmd_entry.bind("<Return>", lambda e: self._send_command())
|
|
self.cmd_entry.bind("<Up>", self._history_up)
|
|
self.cmd_entry.bind("<Down>", self._history_down)
|
|
btn(cmd_frame, "Send", ACCENT, command=self._send_command, padx=16, pady=6,
|
|
font=FONT).pack(side=tk.RIGHT, padx=(8, 0))
|
|
|
|
self._section(right, "Quick Commands")
|
|
qf = tk.Frame(right, bg=BG)
|
|
qf.pack(fill=tk.X, pady=(0, 10))
|
|
quick = [("Forward", "move forward slowly"), ("Left", "turn left 30 degrees"),
|
|
("Right", "turn right 30 degrees"), ("Stop", "stop"),
|
|
("Walk", "walk forward"), ("Halt", "halt and wait"),
|
|
("What?", "what do you see"), ("Status", "__status__")]
|
|
for i, (label, cmd) in enumerate(quick):
|
|
btn(qf, label, command=lambda c=cmd: self._quick_cmd(c), pady=6).grid(
|
|
row=i//4, column=i%4, padx=3, pady=3, sticky="nsew")
|
|
for c in range(4): qf.columnconfigure(c, weight=1)
|
|
|
|
self._section(right, "Decision Log")
|
|
self.log = scrolledtext.ScrolledText(right, font=MONO, bg=DARK, fg=WHITE,
|
|
insertbackground=WHITE, relief=tk.FLAT,
|
|
wrap=tk.WORD, state=tk.DISABLED)
|
|
self.log.pack(fill=tk.BOTH, expand=True)
|
|
for tag, color in [("cmd", ACCENT), ("forward", GREEN_LT), ("turn", "#42A5F5"),
|
|
("stop", RED_LT), ("arrived", ACCENT), ("system", GRAY),
|
|
("error", RED_LT), ("think", "#FFD54F")]:
|
|
self.log.tag_config(tag, foreground=color)
|
|
btn(right, "Clear Log", command=self._clear_log, pady=4).pack(anchor=tk.E, pady=(6, 0))
|
|
|
|
# ═════════════════════════════════════════════════════════════════════════
|
|
# TAB 2: CAMERA
|
|
# ═════════════════════════════════════════════════════════════════════════
|
|
def _build_camera_tab(self):
|
|
main = tk.Frame(self.tab_camera, bg=BG)
|
|
main.pack(fill=tk.BOTH, expand=True, padx=16, pady=10)
|
|
|
|
# Left: preview
|
|
left = tk.Frame(main, bg=BG)
|
|
left.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
|
|
|
|
tk.Label(left, text="Camera Preview", font=FONT_LG, fg=WHITE, bg=BG).pack(anchor=tk.W, pady=(0, 6))
|
|
self.cam_preview = tk.Label(left, bg=DARK, text="Camera preview", fg=GRAY, font=FONT)
|
|
self.cam_preview.pack(fill=tk.BOTH, expand=True)
|
|
|
|
info_bar = tk.Frame(left, bg=CARD_BG, pady=8, padx=16)
|
|
info_bar.pack(fill=tk.X, pady=(10, 0))
|
|
self.lbl_cam_info = tk.Label(info_bar, text="Profile: medium | 640x480 @ 30Hz | Pipeline: -",
|
|
font=MONO, fg=GRAY, bg=CARD_BG)
|
|
self.lbl_cam_info.pack(anchor=tk.W)
|
|
|
|
# Right: scrollable controls
|
|
right_scroll = ScrollFrame(main, bg=BG)
|
|
right_scroll.pack(side=tk.RIGHT, fill=tk.BOTH, padx=(16, 0))
|
|
right_scroll.configure(width=400)
|
|
right = right_scroll.inner
|
|
|
|
# Profile buttons
|
|
self._section(right, "Camera Profile")
|
|
pf = tk.Frame(right, bg=BG)
|
|
pf.pack(fill=tk.X, pady=(0, 10))
|
|
self.cam_profile_btns = {}
|
|
for i, (key, (label, desc)) in enumerate(CAMERA_PROFILES.items()):
|
|
b = btn(pf, f"{label}\n{desc}", bg_color=TEAL if key == "medium" else BG3,
|
|
pady=10, command=lambda k=key: self._set_cam_profile(k))
|
|
b.grid(row=0, column=i, padx=4, pady=4, sticky="nsew")
|
|
self.cam_profile_btns[key] = b
|
|
for c in range(len(CAMERA_PROFILES)): pf.columnconfigure(c, weight=1)
|
|
|
|
# Resolution
|
|
self._section(right, "Custom Resolution")
|
|
rf = tk.Frame(right, bg=BG)
|
|
rf.pack(fill=tk.X, pady=(0, 10))
|
|
|
|
tk.Label(rf, text="Preset:", font=FONT_SM, fg=GRAY, bg=BG).grid(row=0, column=0, sticky="w", padx=(0,4))
|
|
self.cam_preset_var = tk.StringVar(value="640x480 @ 30fps")
|
|
ttk.Combobox(rf, textvariable=self.cam_preset_var, state="readonly",
|
|
values=[f"{w}x{h} @ {f}fps" for _, w, h, f in RESOLUTION_PRESETS],
|
|
width=22).grid(row=0, column=1, columnspan=3, padx=4, pady=4, sticky="ew")
|
|
|
|
for r, (lbl, attr) in enumerate([(("W:"), "cam_w"), ("H:", "cam_h"), ("FPS:", "cam_fps")], 1):
|
|
tk.Label(rf, text=lbl, font=FONT_SM, fg=GRAY, bg=BG).grid(row=r, column=0, sticky="w", padx=(0,4))
|
|
e = tk.Entry(rf, font=FONT_SM, bg=BG3, fg=WHITE, width=8, relief=tk.FLAT, bd=6)
|
|
e.grid(row=r, column=1, padx=4, pady=3, sticky="ew")
|
|
setattr(self, f"{attr}_entry", e)
|
|
|
|
self.cam_w_entry.insert(0, "640")
|
|
self.cam_h_entry.insert(0, "480")
|
|
self.cam_fps_entry.insert(0, "30")
|
|
|
|
btn(rf, "Apply", BLUE, command=self._apply_resolution, pady=6).grid(
|
|
row=3, column=2, columnspan=2, padx=4, pady=4, sticky="ew")
|
|
for c in range(4): rf.columnconfigure(c, weight=1)
|
|
|
|
# Actions
|
|
self._section(right, "Camera Actions")
|
|
for label, color, cmd in [("Capture Photo", GREEN, self._cam_capture),
|
|
("Toggle Preview", TEAL, self._cam_toggle_preview),
|
|
("Refresh Status", BG3, self._cam_refresh)]:
|
|
btn(right, label, color, command=cmd, font=FONT, pady=10).pack(fill=tk.X, padx=4, pady=3)
|
|
|
|
# Status log
|
|
self._section(right, "Camera Status")
|
|
self.cam_status_text = scrolledtext.ScrolledText(right, font=MONO, bg=DARK, fg=WHITE,
|
|
relief=tk.FLAT, wrap=tk.WORD,
|
|
state=tk.DISABLED, height=8)
|
|
self.cam_status_text.pack(fill=tk.X, padx=4, pady=(0, 10))
|
|
|
|
# ═════════════════════════════════════════════════════════════════════════
|
|
# TAB 3: LiDAR — matches SLAM_GUI.py layout exactly
|
|
# ═════════════════════════════════════════════════════════════════════════
|
|
def _build_lidar_tab(self):
|
|
# Colors matching SLAM_GUI.py
|
|
CBG = "#0a0a0a" # card bg (rgba(255,255,255,0.04) on dark)
|
|
CBDR = "#1a1a2e" # card border area
|
|
PILL = "#2b2b2b" # default pill button
|
|
SLAM_ACCENT = "#E87722"
|
|
|
|
main = tk.PanedWindow(self.tab_lidar, orient=tk.HORIZONTAL, bg=BG,
|
|
sashwidth=4, sashrelief=tk.FLAT)
|
|
main.pack(fill=tk.BOTH, expand=True)
|
|
|
|
# ── LEFT: Sidebar (controls) — matches SLAM_GUI sidebar ──
|
|
left_scroll = ScrollFrame(main, bg=CBDR)
|
|
main.add(left_scroll, width=360, minsize=300)
|
|
sb = left_scroll.inner
|
|
sb.configure(bg=CBDR)
|
|
|
|
tk.Label(sb, text="SLAM COMMANDER", font=("Segoe UI", 18, "bold"),
|
|
fg=SLAM_ACCENT, bg=CBDR).pack(anchor=tk.W, padx=14, pady=(14, 10))
|
|
|
|
# --- Card: NETWORK SETTINGS ---
|
|
net = self._slam_card(sb, "NETWORK SETTINGS")
|
|
cf = tk.Frame(net, bg=CBG)
|
|
cf.pack(fill=tk.X, pady=4)
|
|
self.lidar_iface = self._slam_entry(cf, "enp3s0")
|
|
self.lidar_iface.pack(fill=tk.X, pady=2)
|
|
self.lidar_ip = self._slam_entry(cf, "192.168.123.222")
|
|
self.lidar_ip.pack(fill=tk.X, pady=2)
|
|
|
|
r1 = tk.Frame(net, bg=CBG)
|
|
r1.pack(fill=tk.X, pady=4)
|
|
self._pill(r1, "CONNECT", "#1677ff", lambda: self._lidar_cmd("connect")).pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(0,3))
|
|
self._pill(r1, "START", "#21a453", lambda: self._lidar_cmd("start")).pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(0,3))
|
|
self._pill(r1, "LOCALIZE-ONLY", "#7a4cff", lambda: self._lidar_cmd("localize_only")).pack(side=tk.LEFT, fill=tk.X, expand=True)
|
|
|
|
r2 = tk.Frame(net, bg=CBG)
|
|
r2.pack(fill=tk.X, pady=4)
|
|
self._pill(r2, "PAUSE", "#c78a12", lambda: self._lidar_cmd("pause")).pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(0,3))
|
|
self._pill(r2, "STOP", "#d0302f", lambda: self._lidar_cmd("stop")).pack(side=tk.LEFT, fill=tk.X, expand=True)
|
|
|
|
self._pill(net, "RESET (NO SAVE)", "#3a3a3a", lambda: self._lidar_cmd("reset")).pack(fill=tk.X, pady=4)
|
|
|
|
# --- Card: WORKFLOW ---
|
|
wf = self._slam_card(sb, "WORKFLOW (BEFORE RUN)")
|
|
for lbl, cmd in [("MAP NEW PLACE", "workflow_map_new"),
|
|
("NAVIGATE IN MAPPED PLACE", "workflow_nav_map"),
|
|
("LIVE NAV WITH MAP", "workflow_live_nav"),
|
|
("LIVE NAV (NO MAP)", "workflow_live_nav_nomap"),
|
|
("QUICK DEMO", "workflow_demo")]:
|
|
self._pill(wf, lbl, PILL, lambda c=cmd: self._lidar_cmd(c)).pack(fill=tk.X, pady=2)
|
|
|
|
self.lbl_workflow = tk.Label(wf, text="Workflow: NOT_SELECTED", font=MONO, fg=WHITE, bg=CBG)
|
|
self.lbl_workflow.pack(anchor=tk.W, pady=(6, 0))
|
|
|
|
# --- Card: MAP CONTROLS ---
|
|
mc = self._slam_card(sb, "MAP CONTROLS")
|
|
self.lidar_map_name = self._slam_entry(mc, "Work")
|
|
self.lidar_map_name.pack(fill=tk.X, pady=4)
|
|
|
|
self._pill(mc, "SAVE NOW", PILL, lambda: self._lidar_cmd("save_map")).pack(fill=tk.X, pady=2)
|
|
self._pill(mc, "EXPORT NAV", PILL, lambda: self._lidar_cmd("export_nav")).pack(fill=tk.X, pady=2)
|
|
|
|
# Goal
|
|
tk.Label(mc, text="NAVIGATION GOAL", font=FONT_SM, fg="#ffffff99", bg=CBG).pack(anchor=tk.W, pady=(10, 4))
|
|
gr = tk.Frame(mc, bg=CBG)
|
|
gr.pack(fill=tk.X, pady=2)
|
|
self.lidar_goal_x = self._slam_entry(gr, "0.0", width=8)
|
|
self.lidar_goal_x.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(0, 3))
|
|
self.lidar_goal_y = self._slam_entry(gr, "0.0", width=8)
|
|
self.lidar_goal_y.pack(side=tk.LEFT, fill=tk.X, expand=True)
|
|
gb = tk.Frame(mc, bg=CBG)
|
|
gb.pack(fill=tk.X, pady=2)
|
|
self._pill(gb, "SET GOAL", PILL, self._lidar_set_goal).pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(0,3))
|
|
self._pill(gb, "CLEAR", PILL, lambda: self._lidar_cmd("clear_goal")).pack(side=tk.LEFT, fill=tk.X, expand=True)
|
|
|
|
# Mission
|
|
tk.Label(mc, text="MISSION WAYPOINTS", font=FONT_SM, fg="#ffffff99", bg=CBG).pack(anchor=tk.W, pady=(10, 4))
|
|
self.lidar_mission = self._slam_entry(mc, "1.0,0.0; 2.0,0.0")
|
|
self.lidar_mission.pack(fill=tk.X, pady=2)
|
|
mm = tk.Frame(mc, bg=CBG)
|
|
mm.pack(fill=tk.X, pady=2)
|
|
self._pill(mm, "MISSION START", PILL, lambda: self._lidar_cmd("mission_start")).pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(0,2))
|
|
for lbl, cmd in [("PAUSE M", "mission_pause"), ("RESUME M", "mission_resume"), ("STOP M", "mission_stop")]:
|
|
self._pill(mm, lbl, PILL, lambda c=cmd: self._lidar_cmd(c)).pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(2,0))
|
|
|
|
# Density
|
|
tk.Label(mc, text="POINT DENSITY", font=FONT_SM, fg="#ffffff99", bg=CBG).pack(anchor=tk.W, pady=(10, 4))
|
|
dr = tk.Frame(mc, bg=CBG)
|
|
dr.pack(fill=tk.X, pady=2)
|
|
for lbl, val in [("LOW", "low"), ("MEDIUM", "medium"), ("HIGH", "high")]:
|
|
self._pill(dr, lbl, PILL, lambda v=val: self._lidar_cmd(f"density_{v}")).pack(side=tk.LEFT, fill=tk.X, expand=True, padx=1)
|
|
|
|
# --- Card: LOCALIZATION ---
|
|
loc = self._slam_card(sb, "LOCALIZATION")
|
|
self._pill(loc, "LOAD REF MAP", PILL, lambda: self._lidar_cmd("load_ref")).pack(fill=tk.X, pady=2)
|
|
self._pill(loc, "LOCALIZE NOW", PILL, lambda: self._lidar_cmd("localize_now")).pack(fill=tk.X, pady=2)
|
|
self._pill(loc, "CLEAR REF", PILL, lambda: self._lidar_cmd("clear_ref")).pack(fill=tk.X, pady=2)
|
|
|
|
# ── RIGHT: Map + Status (main view) ──
|
|
right = tk.Frame(main, bg=BG)
|
|
main.add(right, minsize=500)
|
|
|
|
# Map canvas
|
|
self.map_canvas = tk.Canvas(right, bg=DARK, highlightthickness=0)
|
|
self.map_canvas.pack(fill=tk.BOTH, expand=True, padx=8, pady=(8, 4))
|
|
self._map_points = []
|
|
self._map_robot_pos = None
|
|
self._map_goal = None
|
|
self._map_scale = 30.0
|
|
self.map_canvas.bind("<Configure>", lambda e: self._render_map(self._map_points, self._map_robot_pos, self._map_goal))
|
|
|
|
# Status strip
|
|
status_strip = tk.Frame(right, bg=BG)
|
|
status_strip.pack(fill=tk.X, padx=8, pady=(0, 4))
|
|
|
|
self.lidar_cards = {}
|
|
for i, (key, title, default, color) in enumerate([
|
|
("status", "Status", "IDLE", GRAY), ("points", "Points", "0", GRAY),
|
|
("pose", "Position", "0, 0, 0", GRAY), ("fps", "FPS", "- / -", GRAY)]):
|
|
card = tk.Frame(status_strip, bg=CARD_BG, padx=14, pady=8)
|
|
card.grid(row=0, column=i, padx=3, pady=3, sticky="nsew")
|
|
tk.Label(card, text=title, font=("Segoe UI", 9), fg=GRAY, bg=CARD_BG).pack(anchor=tk.W)
|
|
lbl = tk.Label(card, text=default, font=FONT_LG, fg=color, bg=CARD_BG)
|
|
lbl.pack(anchor=tk.W)
|
|
self.lidar_cards[key] = lbl
|
|
status_strip.columnconfigure(i, weight=1)
|
|
|
|
# Perf strip
|
|
perf_strip = tk.Frame(right, bg=BG)
|
|
perf_strip.pack(fill=tk.X, padx=8, pady=(0, 4))
|
|
self.lidar_perf = {}
|
|
for i, (key, title, default) in enumerate([
|
|
("icp", "ICP Time", "- ms"), ("cpu", "CPU", "-%"),
|
|
("queue", "Queue Lag", "-"), ("growth", "Growth", "- pts/s")]):
|
|
card = tk.Frame(perf_strip, bg=CARD_BG, padx=10, pady=6)
|
|
card.grid(row=0, column=i, padx=3, pady=2, sticky="nsew")
|
|
tk.Label(card, text=title, font=("Segoe UI", 8), fg=GRAY, bg=CARD_BG).pack(anchor=tk.W)
|
|
lbl = tk.Label(card, text=default, font=FONT_SM, fg=WHITE, bg=CARD_BG)
|
|
lbl.pack(anchor=tk.W)
|
|
self.lidar_perf[key] = lbl
|
|
perf_strip.columnconfigure(i, weight=1)
|
|
|
|
# Loc + Nav + Log row
|
|
bottom = tk.Frame(right, bg=BG)
|
|
bottom.pack(fill=tk.X, padx=8, pady=(0, 8))
|
|
|
|
loc_card = tk.Frame(bottom, bg=CARD_BG, padx=12, pady=8)
|
|
loc_card.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(0, 3))
|
|
tk.Label(loc_card, text="Localization", font=FONT_SM, fg=GRAY, bg=CARD_BG).pack(anchor=tk.W)
|
|
self.lbl_loc_state = tk.Label(loc_card, text="State: -", font=FONT_SM, fg=GRAY, bg=CARD_BG)
|
|
self.lbl_loc_state.pack(anchor=tk.W)
|
|
self.lbl_loc_fitness = tk.Label(loc_card, text="Fitness: - | RMSE: -", font=("Segoe UI", 9), fg=GRAY, bg=CARD_BG)
|
|
self.lbl_loc_fitness.pack(anchor=tk.W)
|
|
self.lbl_loc_match = tk.Label(loc_card, text="Match: -", font=("Segoe UI", 9), fg=GRAY, bg=CARD_BG)
|
|
self.lbl_loc_match.pack(anchor=tk.W)
|
|
|
|
nav_card = tk.Frame(bottom, bg=CARD_BG, padx=12, pady=8)
|
|
nav_card.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(3, 3))
|
|
tk.Label(nav_card, text="Navigation", font=FONT_SM, fg=GRAY, bg=CARD_BG).pack(anchor=tk.W)
|
|
self.lbl_nav_goal = tk.Label(nav_card, text="Goal: -", font=FONT_SM, fg=GRAY, bg=CARD_BG)
|
|
self.lbl_nav_goal.pack(anchor=tk.W)
|
|
self.lbl_nav_cmd = tk.Label(nav_card, text="Cmd: -", font=("Segoe UI", 9), fg=GRAY, bg=CARD_BG)
|
|
self.lbl_nav_cmd.pack(anchor=tk.W)
|
|
|
|
log_card = tk.Frame(bottom, bg=CARD_BG, padx=4, pady=4)
|
|
log_card.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(3, 0))
|
|
self.lidar_log = scrolledtext.ScrolledText(log_card, font=("Cascadia Code", 9), bg=DARK, fg=WHITE,
|
|
relief=tk.FLAT, wrap=tk.WORD,
|
|
state=tk.DISABLED, height=5, width=30)
|
|
self.lidar_log.pack(fill=tk.BOTH, expand=True)
|
|
for tag, c in [("ok", GREEN_LT), ("warn", "#c78a12"), ("err", RED_LT), ("info", GRAY)]:
|
|
self.lidar_log.tag_config(tag, foreground=c)
|
|
|
|
# ─── SLAM_GUI-style widget helpers ─────────────────────────────────────
|
|
def _slam_card(self, parent, title):
|
|
"""Card matching SLAM_GUI.py card_style()."""
|
|
CBG = "#0a0a0a"
|
|
card = tk.Frame(parent, bg=CBG, padx=14, pady=14, highlightbackground="#ffffff10",
|
|
highlightthickness=1)
|
|
card.pack(fill=tk.X, padx=14, pady=6)
|
|
tk.Label(card, text=title, font=("Segoe UI", 11, "bold"), fg="#ffffffc0", bg=CBG).pack(anchor=tk.W, pady=(0, 8))
|
|
return card
|
|
|
|
def _pill(self, parent, text, bg_color, command):
|
|
"""Pill button matching SLAM_GUI.py pill_button()."""
|
|
return tk.Button(parent, text=text, font=("Segoe UI", 10, "bold"),
|
|
bg=bg_color, fg=WHITE, activebackground=ACCENT, activeforeground=WHITE,
|
|
relief=tk.FLAT, cursor="hand2", pady=8, command=command)
|
|
|
|
def _slam_entry(self, parent, default="", width=0):
|
|
"""Entry matching SLAM_GUI.py QLineEdit style."""
|
|
kw = {"width": width} if width else {}
|
|
e = tk.Entry(parent, font=("Segoe UI", 10, "bold"), bg="#00000050", fg=WHITE,
|
|
insertbackground=WHITE, relief=tk.FLAT, bd=8, **kw)
|
|
e.insert(0, default)
|
|
return e
|
|
|
|
# ═════════════════════════════════════════════════════════════════════════
|
|
# HELPERS
|
|
# ═════════════════════════════════════════════════════════════════════════
|
|
def _section(self, parent, title):
|
|
tk.Label(parent, text=title, font=FONT_LG, fg=WHITE, bg=BG).pack(anchor=tk.W, pady=(12, 2), padx=4)
|
|
tk.Frame(parent, bg=BG3, height=1).pack(fill=tk.X, padx=4, pady=(0, 6))
|
|
|
|
# ─── MAP DRAWING ──────────────────────────────────────────────────────────
|
|
def _draw_map_grid(self):
|
|
c = self.map_canvas
|
|
c.delete("grid")
|
|
w = c.winfo_width() or 600
|
|
h = c.winfo_height() or 400
|
|
cx, cy = w // 2, h // 2
|
|
s = self._map_scale
|
|
# Draw grid lines every meter
|
|
grid_color = "#1A2A40"
|
|
for i in range(-20, 21):
|
|
x = cx + i * s
|
|
c.create_line(x, 0, x, h, fill=grid_color, tags="grid")
|
|
y = cy + i * s
|
|
c.create_line(0, y, w, y, fill=grid_color, tags="grid")
|
|
# Axes
|
|
c.create_line(cx, 0, cx, h, fill="#2A4060", width=1, tags="grid")
|
|
c.create_line(0, cy, w, cy, fill="#2A4060", width=1, tags="grid")
|
|
# Origin label
|
|
c.create_text(cx + 4, cy - 8, text="0,0", fill=GRAY, font=("Segoe UI", 8), anchor="w", tags="grid")
|
|
|
|
def _render_map(self, points=None, robot_pos=None, goal=None):
|
|
"""Render 2D top-down map with point cloud, robot, and goal."""
|
|
c = self.map_canvas
|
|
c.update_idletasks()
|
|
w = c.winfo_width() or 600
|
|
h = c.winfo_height() or 400
|
|
cx, cy = w // 2, h // 2
|
|
s = self._map_scale
|
|
|
|
c.delete("all")
|
|
self._draw_map_grid()
|
|
|
|
# Draw points
|
|
if points:
|
|
for px, py in points:
|
|
sx = cx + px * s
|
|
sy = cy - py * s # flip Y (up = positive)
|
|
if 0 <= sx <= w and 0 <= sy <= h:
|
|
c.create_rectangle(sx, sy, sx + 2, sy + 2, fill="#4FC3F7", outline="", tags="pts")
|
|
|
|
# Draw goal
|
|
if goal:
|
|
gx = cx + goal[0] * s
|
|
gy = cy - goal[1] * s
|
|
c.create_oval(gx - 8, gy - 8, gx + 8, gy + 8, outline="#FF5722", width=2, tags="goal")
|
|
c.create_line(gx - 5, gy, gx + 5, gy, fill="#FF5722", width=2, tags="goal")
|
|
c.create_line(gx, gy - 5, gx, gy + 5, fill="#FF5722", width=2, tags="goal")
|
|
|
|
# Draw robot
|
|
if robot_pos:
|
|
rx = cx + robot_pos[0] * s
|
|
ry = cy - robot_pos[1] * s
|
|
c.create_oval(rx - 6, ry - 6, rx + 6, ry + 6, fill="#66BB6A", outline=WHITE, width=2, tags="robot")
|
|
# Direction indicator (forward = up in robot frame)
|
|
c.create_line(rx, ry, rx, ry - 14, fill="#66BB6A", width=2, arrow=tk.LAST, tags="robot")
|
|
|
|
# Scale indicator
|
|
bar_len = s # 1 meter
|
|
c.create_line(10, h - 15, 10 + bar_len, h - 15, fill=WHITE, width=2, tags="scale")
|
|
c.create_text(10 + bar_len / 2, h - 6, text="1m", fill=GRAY, font=("Segoe UI", 8), tags="scale")
|
|
|
|
def _log(self, text, tag="system"):
|
|
self.log.configure(state=tk.NORMAL)
|
|
self.log.insert(tk.END, text + "\n", tag)
|
|
self.log.see(tk.END)
|
|
self.log.configure(state=tk.DISABLED)
|
|
|
|
def _lidar_log(self, text, tag="info"):
|
|
self.lidar_log.configure(state=tk.NORMAL)
|
|
self.lidar_log.insert(tk.END, f"[{time.strftime('%H:%M:%S')}] {text}\n", tag)
|
|
self.lidar_log.see(tk.END)
|
|
self.lidar_log.configure(state=tk.DISABLED)
|
|
|
|
def _cam_log(self, text):
|
|
self.cam_status_text.configure(state=tk.NORMAL)
|
|
self.cam_status_text.insert(tk.END, f"[{time.strftime('%H:%M:%S')}] {text}\n")
|
|
self.cam_status_text.see(tk.END)
|
|
self.cam_status_text.configure(state=tk.DISABLED)
|
|
|
|
def _clear_log(self):
|
|
self.log.configure(state=tk.NORMAL)
|
|
self.log.delete("1.0", tk.END)
|
|
self.log.configure(state=tk.DISABLED)
|
|
|
|
# ─── NAVIGATION ───────────────────────────────────────────────────────────
|
|
def _send_command(self):
|
|
cmd = self.cmd_entry.get().strip()
|
|
if not cmd or not self.connected: return
|
|
self.cmd_entry.delete(0, tk.END)
|
|
self.cmd_history.insert(0, cmd)
|
|
self.history_idx = -1
|
|
if cmd == "__status__":
|
|
asyncio.run_coroutine_threadsafe(self._ws_send({"type": "ping"}), self.loop)
|
|
return
|
|
self._log(f"\n[{time.strftime('%H:%M:%S')}] You: {cmd}", "cmd")
|
|
asyncio.run_coroutine_threadsafe(
|
|
self._ws_send({"type": "command", "command": cmd}), self.loop)
|
|
|
|
def _quick_cmd(self, cmd):
|
|
self.cmd_entry.delete(0, tk.END)
|
|
self.cmd_entry.insert(0, cmd)
|
|
self._send_command()
|
|
|
|
def _history_up(self, event):
|
|
if self.cmd_history and self.history_idx < len(self.cmd_history) - 1:
|
|
self.history_idx += 1
|
|
self.cmd_entry.delete(0, tk.END)
|
|
self.cmd_entry.insert(0, self.cmd_history[self.history_idx])
|
|
|
|
def _history_down(self, event):
|
|
if self.history_idx > 0:
|
|
self.history_idx -= 1
|
|
self.cmd_entry.delete(0, tk.END)
|
|
self.cmd_entry.insert(0, self.cmd_history[self.history_idx])
|
|
elif self.history_idx == 0:
|
|
self.history_idx = -1
|
|
self.cmd_entry.delete(0, tk.END)
|
|
|
|
# ─── CAMERA ───────────────────────────────────────────────────────────────
|
|
def _set_cam_profile(self, profile):
|
|
self._cam_profile = profile
|
|
for k, b in self.cam_profile_btns.items():
|
|
b.configure(bg=TEAL if k == profile else BG3)
|
|
asyncio.run_coroutine_threadsafe(
|
|
self._ws_send({"type": "set_camera", "profile": profile}), self.loop)
|
|
self._cam_log(f"Switching to profile: {profile}")
|
|
|
|
def _apply_resolution(self):
|
|
try:
|
|
w, h, f = int(self.cam_w_entry.get()), int(self.cam_h_entry.get()), int(self.cam_fps_entry.get())
|
|
except ValueError:
|
|
self._cam_log("Invalid resolution values"); return
|
|
asyncio.run_coroutine_threadsafe(
|
|
self._ws_send({"type": "set_resolution", "width": w, "height": h, "fps": f}), self.loop)
|
|
self._cam_log(f"Applying: {w}x{h} @ {f}fps")
|
|
|
|
def _cam_capture(self):
|
|
asyncio.run_coroutine_threadsafe(self._ws_send({"type": "capture"}), self.loop)
|
|
self._cam_log("Capture requested")
|
|
|
|
def _cam_toggle_preview(self):
|
|
self._cam_preview_on = not self._cam_preview_on
|
|
if not self._cam_preview_on:
|
|
self.cam_preview.configure(image="", text="Preview paused")
|
|
self.cam_preview.image = None
|
|
self._cam_log(f"Preview {'ON' if self._cam_preview_on else 'OFF'}")
|
|
|
|
def _cam_refresh(self):
|
|
asyncio.run_coroutine_threadsafe(self._ws_send({"type": "get_camera"}), self.loop)
|
|
self._cam_log("Refreshing...")
|
|
|
|
# ─── LiDAR (local SLAM engine) ───────────────────────────────────────────
|
|
def _ensure_slam(self) -> bool:
|
|
if not SLAM_AVAILABLE:
|
|
self._lidar_log("SLAM not available — check G1_Lootah/Lidar path", "err")
|
|
return False
|
|
if self._slam is None:
|
|
try:
|
|
self._slam = SlamEngineClient()
|
|
self._lidar_log("SLAM engine created", "ok")
|
|
except Exception as e:
|
|
self._lidar_log(f"SLAM init error: {e}", "err")
|
|
return False
|
|
return True
|
|
|
|
def _lidar_cmd(self, cmd):
|
|
if not self._ensure_slam():
|
|
return
|
|
try:
|
|
if cmd == "connect":
|
|
self._slam.start_process()
|
|
self._slam.connect()
|
|
self._slam_running = True
|
|
self._lidar_log("SLAM started and connected", "ok")
|
|
elif cmd == "start":
|
|
self._slam.start_mapping()
|
|
self._lidar_log("Mapping started", "ok")
|
|
elif cmd == "pause":
|
|
self._slam.pause_mapping()
|
|
self._lidar_log("Mapping paused", "warn")
|
|
elif cmd == "stop":
|
|
self._slam.stop_mapping()
|
|
self._lidar_log("Mapping stopped", "info")
|
|
elif cmd == "reset":
|
|
self._slam.stop_process()
|
|
self._slam_running = False
|
|
self._lidar_log("SLAM reset (no save)", "warn")
|
|
elif cmd == "localize_only":
|
|
self._slam.start_process()
|
|
self._slam.connect()
|
|
self._slam_running = True
|
|
self._lidar_log("Localize-only mode", "ok")
|
|
elif cmd == "save_map":
|
|
name = self.lidar_map_name.get().strip() or "Work"
|
|
self._slam.export_map(name)
|
|
self._lidar_log(f"Map saved: {name}", "ok")
|
|
elif cmd == "export_nav":
|
|
name = self.lidar_map_name.get().strip() or "nav"
|
|
self._slam.export_nav(name)
|
|
self._lidar_log(f"Nav exported: {name}", "ok")
|
|
elif cmd == "load_ref":
|
|
path = filedialog.askopenfilename(title="Select Reference Map",
|
|
filetypes=[("PLY files", "*.ply"), ("All", "*.*")])
|
|
if not path: return
|
|
self._slam.load_ref_map(path)
|
|
self._lidar_log(f"Ref loaded: {path}", "ok")
|
|
elif cmd == "localize_now":
|
|
self._slam.localize_now()
|
|
self._lidar_log("Localization triggered", "info")
|
|
elif cmd == "clear_ref":
|
|
self._slam.clear_ref()
|
|
self._lidar_log("Ref cleared", "info")
|
|
elif cmd == "clear_goal":
|
|
self._slam.clear_nav_goal()
|
|
self._map_goal = None
|
|
self._lidar_log("Goal cleared", "info")
|
|
elif cmd == "mission_start":
|
|
waypoints = []
|
|
for pair in self.lidar_mission.get().strip().split(";"):
|
|
parts = pair.strip().split(",")
|
|
if len(parts) == 2:
|
|
try: waypoints.append({"x": float(parts[0]), "y": float(parts[1])})
|
|
except ValueError: pass
|
|
self._slam.mission_start(waypoints)
|
|
self._lidar_log(f"Mission started ({len(waypoints)} waypoints)", "ok")
|
|
elif cmd == "mission_pause":
|
|
self._slam.mission_pause()
|
|
self._lidar_log("Mission paused", "warn")
|
|
elif cmd == "mission_resume":
|
|
self._slam.mission_resume()
|
|
self._lidar_log("Mission resumed", "ok")
|
|
elif cmd == "mission_stop":
|
|
self._slam.mission_stop()
|
|
self._lidar_log("Mission stopped", "info")
|
|
elif cmd.startswith("density_"):
|
|
mode = cmd.replace("density_", "")
|
|
self._slam.set_density(mode)
|
|
self._lidar_log(f"Density: {mode}", "info")
|
|
elif cmd.startswith("workflow_"):
|
|
self._lidar_log(f"Workflow: {cmd}", "info")
|
|
else:
|
|
self._lidar_log(f"Unknown: {cmd}", "warn")
|
|
except Exception as e:
|
|
self._lidar_log(f"SLAM error: {e}", "err")
|
|
|
|
def _lidar_set_goal(self):
|
|
if not self._ensure_slam(): return
|
|
try:
|
|
x, y = float(self.lidar_goal_x.get()), float(self.lidar_goal_y.get())
|
|
except ValueError:
|
|
self._lidar_log("Invalid coordinates", "err"); return
|
|
try:
|
|
self._slam.set_nav_goal(x, y)
|
|
self._map_goal = (x, y)
|
|
self._lidar_log(f"Goal set: ({x}, {y})", "ok")
|
|
except Exception as e:
|
|
self._lidar_log(f"Goal error: {e}", "err")
|
|
|
|
def _start_slam_poll(self):
|
|
"""Poll SLAM data_q and status_q every 200ms to update map and status."""
|
|
self._poll_slam()
|
|
|
|
def _poll_slam(self):
|
|
if self._slam is not None and self._slam_running:
|
|
try:
|
|
self._drain_slam_data()
|
|
except Exception:
|
|
pass
|
|
self.root.after(200, self._poll_slam)
|
|
|
|
def _drain_slam_data(self):
|
|
"""Drain SLAM queues and update map + status displays."""
|
|
slam = self._slam
|
|
if slam is None:
|
|
return
|
|
|
|
# Drain data_q for frame payloads
|
|
payload = None
|
|
try:
|
|
while not slam.data_q.empty():
|
|
item = slam.data_q.get_nowait()
|
|
if isinstance(item, tuple) and len(item) >= 2 and item[0] == "FRAME":
|
|
payload = item[1]
|
|
except Exception:
|
|
pass
|
|
|
|
# Drain status_q
|
|
try:
|
|
while not slam.status_q.empty():
|
|
msg = slam.status_q.get_nowait()
|
|
if isinstance(msg, tuple) and len(msg) >= 2:
|
|
key, val = msg[0], msg[1]
|
|
if isinstance(val, str):
|
|
self._lidar_log(f"{key}: {val}", "info")
|
|
elif key == "LOCALIZE" and isinstance(val, dict):
|
|
fit = val.get("fitness", 0)
|
|
rmse = val.get("rmse", 0)
|
|
self.lbl_loc_fitness.configure(text=f"Fitness: {fit:.3f} | RMSE: {rmse:.4f}")
|
|
elif key == "LOC_STATE":
|
|
ls = str(val)
|
|
c = GREEN_LT if ls == "TRACKING" else RED_LT if ls == "LOST" else ORANGE
|
|
self.lbl_loc_state.configure(text=f"State: {ls}", fg=c)
|
|
elif key == "ERROR":
|
|
self._lidar_log(f"ERROR: {val}", "err")
|
|
elif key == "INFO":
|
|
if isinstance(val, str):
|
|
self._lidar_log(val, "info")
|
|
except Exception:
|
|
pass
|
|
|
|
if payload is None:
|
|
return
|
|
|
|
# Update status cards
|
|
pts = payload.get("stable_points")
|
|
if pts is not None and isinstance(pts, np.ndarray):
|
|
self.lidar_cards["points"].configure(text=f"{len(pts):,}", fg=WHITE)
|
|
# Downsample for map
|
|
if len(pts) > 0:
|
|
stride = max(1, len(pts) // 2000)
|
|
self._map_points = pts[::stride, :2].tolist()
|
|
|
|
pose = payload.get("pose")
|
|
if pose is not None and isinstance(pose, np.ndarray) and pose.shape == (4, 4):
|
|
x, y, z = float(pose[0, 3]), float(pose[1, 3]), float(pose[2, 3])
|
|
self.lidar_cards["pose"].configure(text=f"{x:.2f}, {y:.2f}, {z:.2f}", fg=WHITE)
|
|
self._map_robot_pos = (x, y)
|
|
|
|
mode = payload.get("mode", "")
|
|
if mode:
|
|
s = str(mode).upper()
|
|
self.lidar_cards["status"].configure(text=s,
|
|
fg=GREEN_LT if s == "MAPPING" else BLUE if "LOCALIZ" in s else ORANGE if s == "PAUSED" else GRAY)
|
|
|
|
perf = payload.get("perf", {})
|
|
if perf:
|
|
self.lidar_cards["fps"].configure(
|
|
text=f"{perf.get('input_fps',0):.0f} / {perf.get('publish_fps',0):.0f}", fg=WHITE)
|
|
self.lidar_perf["icp"].configure(text=f"{perf.get('icp_ms',0):.1f} ms")
|
|
self.lidar_perf["cpu"].configure(text=f"{perf.get('cpu_percent',0):.0f}%")
|
|
self.lidar_perf["queue"].configure(text=str(perf.get("queue_lag", 0)))
|
|
self.lidar_perf["growth"].configure(text=f"{perf.get('stable_growth_per_sec',0):.0f} pts/s")
|
|
|
|
loc_state = payload.get("loc_state", "")
|
|
if loc_state:
|
|
c = GREEN_LT if loc_state == "TRACKING" else RED_LT if loc_state == "LOST" else ORANGE
|
|
self.lbl_loc_state.configure(text=f"State: {loc_state}", fg=c)
|
|
|
|
nav = payload.get("nav", {})
|
|
if isinstance(nav, dict):
|
|
goal = nav.get("goal")
|
|
if goal and isinstance(goal, dict):
|
|
self.lbl_nav_goal.configure(text=f"Goal: ({goal.get('x',0):.2f}, {goal.get('y',0):.2f})", fg=WHITE)
|
|
self._map_goal = (goal.get("x", 0), goal.get("y", 0))
|
|
cmd_info = nav.get("cmd")
|
|
if cmd_info and isinstance(cmd_info, dict):
|
|
self.lbl_nav_cmd.configure(
|
|
text=f"Cmd: lin={cmd_info.get('linear_mps','-')} ang={cmd_info.get('angular_rps','-')}")
|
|
|
|
# Render map
|
|
self._render_map(self._map_points, self._map_robot_pos, self._map_goal)
|
|
|
|
# ═════════════════════════════════════════════════════════════════════════
|
|
# WEBSOCKET
|
|
# ═════════════════════════════════════════════════════════════════════════
|
|
def _start_ws_thread(self):
|
|
self.loop = asyncio.new_event_loop()
|
|
self.ws_thread = threading.Thread(target=lambda: (
|
|
asyncio.set_event_loop(self.loop),
|
|
self.loop.run_until_complete(self._ws_connect_loop())), daemon=True)
|
|
self.ws_thread.start()
|
|
|
|
async def _ws_connect_loop(self):
|
|
while True:
|
|
try:
|
|
self._set_status("Connecting...", RED_LT)
|
|
async with websockets.connect(WS_URL) as ws:
|
|
self.ws, self.connected = ws, True
|
|
self._set_status(f"Connected to {JETSON_IP}", GREEN_LT)
|
|
self.root.after(0, lambda: self.lbl_conn.configure(text=" CONNECTED", fg=GREEN_LT))
|
|
await self._ws_receive(ws)
|
|
except Exception as e:
|
|
self.connected, self.ws = False, None
|
|
self._set_status(f"Reconnecting... ({e})", RED_LT)
|
|
self.root.after(0, lambda: self.lbl_conn.configure(text=" DISCONNECTED", fg=RED_LT))
|
|
await asyncio.sleep(3)
|
|
|
|
async def _ws_receive(self, ws):
|
|
async for raw in ws:
|
|
try:
|
|
data = json.loads(raw)
|
|
t = data.get("type")
|
|
if t == "frame":
|
|
self.root.after(0, self._update_frame, data["data"])
|
|
elif t == "status":
|
|
lidar = "ALIVE" if data.get("lidar") else "OFFLINE"
|
|
model = data.get("model", "?")
|
|
self.root.after(0, lambda l=lidar, m=model: (
|
|
self.lbl_lidar.configure(text=f"LiDAR: {l}", fg=GREEN_LT if l == "ALIVE" else RED_LT),
|
|
self.lbl_model.configure(text=f"Model: {m}", fg=TEAL)))
|
|
self._log(f"[{data.get('timestamp','')}] {data.get('message','')}", "system")
|
|
if data.get("camera_config"):
|
|
self.root.after(0, self._update_cam_config, data["camera_config"])
|
|
elif t == "thinking":
|
|
self._log(f" Thinking... ('{data.get('command','')}')", "think")
|
|
self.root.after(0, lambda: self.lbl_action.configure(text="Thinking...", fg="#FFD54F"))
|
|
elif t == "decision":
|
|
a, e, c, ts = data.get("action",""), data.get("elapsed","?"), data.get("cmd",""), data.get("timestamp","")
|
|
tag = "forward" if c=="FORWARD" else "turn" if c in ("LEFT","RIGHT") else "stop" if c=="STOP" else "arrived" if c=="ARRIVED" else "system"
|
|
self._log(f" [{ts}] Decision ({e}s): {a}", tag)
|
|
color = GREEN_LT if c=="FORWARD" else "#42A5F5" if c in ("LEFT","RIGHT") else RED_LT if c=="STOP" else ACCENT
|
|
self.root.after(0, lambda a=a,cl=color,e=e: (
|
|
self.lbl_action.configure(text=a, fg=cl),
|
|
self.lbl_elapsed.configure(text=f"Response: {e}s")))
|
|
elif t == "camera_config":
|
|
self.root.after(0, self._update_cam_config, data)
|
|
elif t == "pong":
|
|
self._log(f" [{data.get('timestamp','')}] Status - LiDAR: {'OK' if data.get('lidar') else 'OFFLINE'}", "system")
|
|
elif t == "error":
|
|
self._log(f" ERROR: {data.get('message','')}", "error")
|
|
except Exception as e:
|
|
self._log(f" Parse error: {e}", "error")
|
|
|
|
async def _ws_send(self, data):
|
|
if self.ws and self.connected:
|
|
try: await self.ws.send(json.dumps(data))
|
|
except Exception as e: self._log(f" Send error: {e}", "error")
|
|
|
|
# ─── UPDATES ──────────────────────────────────────────────────────────────
|
|
def _update_frame(self, b64):
|
|
try:
|
|
img = Image.open(io.BytesIO(base64.b64decode(b64)))
|
|
img_resized = img.resize((640, 480), Image.LANCZOS)
|
|
photo = ImageTk.PhotoImage(img_resized)
|
|
self.canvas.configure(image=photo, text="")
|
|
self.canvas.image = photo
|
|
if self._cam_preview_on:
|
|
p2 = ImageTk.PhotoImage(img_resized)
|
|
self.cam_preview.configure(image=p2, text="")
|
|
self.cam_preview.image = p2
|
|
except Exception as e:
|
|
# Malformed base64 / torn frame / Tk disposed during shutdown —
|
|
# log it so UI freezes become debuggable rather than silent.
|
|
print(f" [Client] _update_frame dropped: {type(e).__name__}: {e}")
|
|
|
|
def _update_cam_config(self, d):
|
|
p, w, h, f = d.get("profile","?"), d.get("width","?"), d.get("height","?"), d.get("fps","?")
|
|
active = d.get("pipeline_active", False)
|
|
self.lbl_cam_info.configure(text=f"Profile: {p} | {w}x{h} @ {f}Hz | Pipeline: {'active' if active else 'stopped'}")
|
|
self._cam_profile = p
|
|
for k, b in self.cam_profile_btns.items():
|
|
b.configure(bg=TEAL if k == p else BG3)
|
|
self._cam_log(f"Camera: {p} ({w}x{h}@{f}) pipeline={'active' if active else 'stopped'}")
|
|
|
|
def _set_status(self, msg, color=GRAY):
|
|
self.root.after(0, lambda: self.lbl_status.configure(text=msg, fg=color))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
root = tk.Tk()
|
|
MarcusGUI(root)
|
|
root.mainloop()
|