# SLAM_engine.py from __future__ import annotations import json import os import subprocess import time import multiprocessing as mp from dataclasses import dataclass from pathlib import Path from typing import Any, Optional from SLAM_Validation import run_startup_self_check def _autodetect_g1_host_ip(default: str = "192.168.123.164") -> str: """ Return the local IPv4 on the G1's 192.168.123.0/24 network. The Livox SDK binds a UDP socket to this address; using the wrong one (e.g. the workstation IP on the Jetson, or vice versa) produces a `bind failed` error storm. Auto-detecting lets the same SLAM_Config.json work on both the Jetson (eth0 = 192.168.123.164) and the workstation (enp3s0 = 192.168.123.222) without manual edits. """ try: out = subprocess.run( ["ip", "-4", "-o", "addr"], capture_output=True, text=True, timeout=2, ).stdout for line in out.splitlines(): for tok in line.split(): if tok.startswith("192.168.123.") and "/" in tok: return tok.split("/")[0] except Exception: pass return default # ------------------------- config loader (jsonl) ------------------------- def load_slam_config() -> dict: """ Loads config from: 1) env SLAM_CONFIG (full path) 2) ./SLAM_Config.json (same folder as this script) """ import json import os from pathlib import Path cfg_path = os.environ.get("SLAM_CONFIG", "").strip() if cfg_path: p = Path(cfg_path) else: p = Path(__file__).resolve().parent / "SLAM_Config.json" if not p.exists(): raise FileNotFoundError(f"Missing config file: {p}") text = p.read_text(encoding="utf-8").strip() if not text: raise RuntimeError("SLAM_Config.json is empty.") return json.loads(text) def _config_base_dir() -> Path: cfg_path = os.environ.get("SLAM_CONFIG", "").strip() if cfg_path: return Path(cfg_path).expanduser().resolve().parent return Path(__file__).resolve().parent def _resolve_from_config_dir(path_value: str | os.PathLike[str]) -> str: p = Path(path_value).expanduser() if p.is_absolute(): return str(p) return str((_config_base_dir() / p).resolve()) # ------------------------- dataclasses ------------------------- @dataclass class EngineConfig: config_file: str host_ip: str max_range: float slam_voxel_size: float pre_downsample_stride: int keyframe_enabled: bool keyframe_min_translation_m: float keyframe_min_rotation_deg: float @dataclass class FilterConfig: voxel_size: float hits_threshold: int window_sec: float strict_sec: float use_strict: bool decay_seconds: float max_voxels: int @dataclass class MapConfig: data_folder: str display_voxel: float save_voxel: float min_points_to_save: int save_extension: str @dataclass class LocalizationConfig: enabled: bool period_sec: float min_new_points: int min_points_for_localize: int voxel_localize: float max_corr_mult: float icp_max_iter: int accept_fitness: float accept_rmse: float ref_display_voxel: float @dataclass class RuntimeConfig: publish_hz: float frame_keep_latest: bool frame_queue_maxsize: int status_queue_maxsize: int cmd_queue_maxsize: int # ------------------------- helpers ------------------------- def _drain_keep_latest(q: mp.Queue) -> None: try: while True: q.get_nowait() except Exception: return def _safe_put(q: mp.Queue, item: Any, keep_latest: bool = False) -> None: try: if keep_latest: _drain_keep_latest(q) q.put_nowait(item) except Exception: pass def build_configs_from_json(cfg: dict) -> tuple[EngineConfig, FilterConfig, MapConfig, LocalizationConfig, RuntimeConfig]: maps_dir = _resolve_from_config_dir(str(cfg["app"]["maps_dir"])) # Resolve the Livox host IP. Accept the literal "auto" (or an env var) so # the same config works on the Jetson and the workstation. Also trigger # auto-detect if the config still holds the old workstation default but # we're clearly not running on the workstation. host_ip_cfg = str(cfg["network"]["default_host_ip"]).strip() env_ip = os.environ.get("LIVOX_HOST_IP", "").strip() if env_ip: resolved_host_ip = env_ip elif host_ip_cfg.lower() == "auto" or not host_ip_cfg: resolved_host_ip = _autodetect_g1_host_ip() else: # If config says 192.168.123.222 but this machine doesn't own that IP, # fall back to autodetect instead of crashing the Livox SDK. detected = _autodetect_g1_host_ip(default=host_ip_cfg) resolved_host_ip = detected if detected != host_ip_cfg else host_ip_cfg eng = EngineConfig( config_file=_resolve_from_config_dir(str(cfg["livox"]["config_file"])), host_ip=resolved_host_ip, max_range=float(cfg["slam"]["max_range"]), slam_voxel_size=float(cfg["slam"]["slam_voxel_size"]), pre_downsample_stride=int(cfg["slam"]["pre_downsample_stride"]), keyframe_enabled=bool(cfg["slam"]["keyframe"]["enabled"]), keyframe_min_translation_m=float(cfg["slam"]["keyframe"]["min_translation_m"]), keyframe_min_rotation_deg=float(cfg["slam"]["keyframe"]["min_rotation_deg"]), ) filt = FilterConfig( voxel_size=float(cfg["filter"]["voxel_size"]), hits_threshold=int(cfg["filter"]["hits_threshold"]), window_sec=float(cfg["filter"]["window_sec"]), strict_sec=float(cfg["filter"]["strict_sec"]), use_strict=bool(cfg["filter"]["use_strict"]), decay_seconds=float(cfg["filter"]["persistence"]["decay_seconds"]), max_voxels=int(cfg["filter"]["persistence"]["max_voxels"]), ) mp_cfg = MapConfig( data_folder=str(maps_dir), display_voxel=float(cfg["map"]["display_voxel"]), save_voxel=float(cfg["map"]["save_voxel"]), min_points_to_save=int(cfg["map"]["min_points_to_save"]), save_extension=str(cfg["map"]["save_extension"]), ) loc = LocalizationConfig( enabled=bool(cfg["localization"]["enabled"]), period_sec=float(cfg["localization"]["period_sec"]), min_new_points=int(cfg["localization"]["min_new_points"]), min_points_for_localize=int(cfg["localization"]["min_points_for_localize"]), voxel_localize=float(cfg["localization"]["voxel_localize"]), max_corr_mult=float(cfg["localization"]["max_corr_mult"]), icp_max_iter=int(cfg["localization"]["icp_max_iter"]), accept_fitness=float(cfg["localization"]["accept_fitness"]), accept_rmse=float(cfg["localization"]["accept_rmse"]), ref_display_voxel=float(cfg["localization"]["ref_display_voxel"]), ) run = RuntimeConfig( publish_hz=float(cfg["runtime"]["publish_hz"]), frame_keep_latest=bool(cfg["runtime"]["queue"]["frame_keep_latest"]), frame_queue_maxsize=int(cfg["runtime"]["queue"]["frame_queue_maxsize"]), status_queue_maxsize=int(cfg["runtime"]["queue"]["status_queue_maxsize"]), cmd_queue_maxsize=int(cfg["runtime"]["queue"]["cmd_queue_maxsize"]), ) return eng, filt, mp_cfg, loc, run # ------------------------- client ------------------------- class SlamEngineClient: def __init__(self): self.cfg_json = load_slam_config() self.self_check = run_startup_self_check(self.cfg_json, _config_base_dir()) self.eng_cfg, self.filt_cfg, self.map_cfg, self.loc_cfg, self.run_cfg = build_configs_from_json(self.cfg_json) ctx = mp.get_context("spawn") self.data_q: mp.Queue = ctx.Queue(maxsize=self.run_cfg.frame_queue_maxsize) self.status_q: mp.Queue = ctx.Queue(maxsize=self.run_cfg.status_queue_maxsize) self.cmd_q: mp.Queue = ctx.Queue(maxsize=self.run_cfg.cmd_queue_maxsize) self.proc: Optional[mp.Process] = None def start_process(self): if self.proc is not None and self.proc.is_alive(): return ctx = mp.get_context("spawn") self.data_q = ctx.Queue(maxsize=self.run_cfg.frame_queue_maxsize) self.status_q = ctx.Queue(maxsize=self.run_cfg.status_queue_maxsize) self.cmd_q = ctx.Queue(maxsize=self.run_cfg.cmd_queue_maxsize) from SLAM_worker import slam_worker self.proc = ctx.Process( target=slam_worker, args=(self.data_q, self.status_q, self.cmd_q, self.eng_cfg, self.filt_cfg, self.map_cfg, self.loc_cfg, self.run_cfg), daemon=True, ) self.proc.start() def stop_process(self): try: self.send("SHUTDOWN") except Exception: pass if self.proc is not None: self.proc.join(timeout=1.0) if self.proc.is_alive(): self.proc.terminate() self.proc.join(timeout=1.0) def send(self, cmd: Any): if self.proc is None or not self.proc.is_alive(): self.start_process() self.cmd_q.put(cmd) def connect(self): self.send("CONNECT") def start_mapping(self): self.send("START") def pause_mapping(self): self.send("PAUSE") def stop_mapping(self): self.send("STOP") def reset_mapping(self): self.send("RESET") def export_map(self, filename_base: str): self.send(("EXPORT", filename_base)) def export_nav(self, filename_base: str): self.send(("EXPORT_NAV", filename_base)) def load_ref_map(self, path: str): self.send(("LOAD_REF", path)) def localize_now(self): self.send("LOCALIZE") def clear_ref(self): self.send("CLEAR_REF") def set_density(self, mode: str): self.send(("SET_DENSITY", str(mode))) def set_min_stable_points(self, value: int): self.send(("SET_MIN_STABLE_POINTS", int(value))) def set_loop_closure(self, enabled: bool): self.send(("SET_LOOP_CLOSURE", bool(enabled))) def set_loc_state_machine(self, enabled: bool): self.send(("SET_LOC_STATE_MACHINE", bool(enabled))) def set_submap_mode(self, enabled: bool, cfg_patch: Optional[dict] = None): payload = {"enabled": bool(enabled)} if isinstance(cfg_patch, dict) and cfg_patch: payload.update(dict(cfg_patch)) self.send(("SET_SUBMAP_MODE", payload)) def set_approx_pose(self, x: float, y: float, z: float = 0.0, yaw_deg: Optional[float] = None): payload = {"x": float(x), "y": float(y), "z": float(z)} if yaw_deg is not None: payload["yaw_deg"] = float(yaw_deg) self.send(("SET_APPROX_POSE", payload)) def clear_approx_pose(self): self.send("CLEAR_APPROX_POSE") def set_autosave(self, enabled: bool, interval_sec: float, base_name: str): self.send( ( "SET_AUTOSAVE", { "enabled": bool(enabled), "interval_sec": float(interval_sec), "base_name": str(base_name), }, ) ) def set_nav_export_cfg(self, cfg_patch: dict): self.send(("SET_NAV_CONFIG", dict(cfg_patch))) def set_nav_runtime_cfg(self, cfg_patch: dict): self.send(("SET_NAV_RUNTIME", dict(cfg_patch))) def set_map_quality_cfg(self, cfg_patch: dict): self.send(("SET_MAP_QUALITY", dict(cfg_patch))) def set_filter_tuning(self, cfg_patch: dict): self.send(("SET_FILTER_TUNING", dict(cfg_patch))) def set_stability_profile(self, profile: str): self.send(("SET_STABILITY_PROFILE", str(profile))) def set_nav_goal(self, x: float, y: float): self.send(("SET_NAV_GOAL", {"x": float(x), "y": float(y)})) def clear_nav_goal(self): self.send("CLEAR_NAV_GOAL") def start_localize_only(self): self.send("START_LOCALIZE_ONLY") def stop_localize_only(self): self.send("STOP_LOCALIZE_ONLY") def record_start(self, base_name: str): self.send(("RECORD_START", {"base_name": str(base_name)})) def record_stop(self, save: bool = True, base_name: str = ""): payload = {"save": bool(save)} if base_name: payload["base_name"] = str(base_name) self.send(("RECORD_STOP", payload)) def mission_start(self, waypoints: list[dict]): self.send(("MISSION_START", {"waypoints": list(waypoints)})) def mission_pause(self): self.send("MISSION_PAUSE") def mission_resume(self): self.send("MISSION_RESUME") def mission_stop(self): self.send("MISSION_STOP") def get_self_check(self) -> dict: return dict(self.self_check) def sensor_prior(self, sensor: str, pose: Any, confidence: float = 1.0, timestamp: Optional[float] = None): self.send( ( "SENSOR_PRIOR", { "sensor": str(sensor), "pose": pose, "confidence": float(confidence), "timestamp": float(time.time()) if timestamp is None else float(timestamp), }, ) )