Marcus/Lidar/SLAM_worker.py

3829 lines
178 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# SLAM_worker.py
from __future__ import annotations
import time
import queue
import traceback
import threading
import multiprocessing as mp
from collections import deque
from pathlib import Path
from typing import Optional, Any, Dict, List, Tuple
import numpy as np
from SLAM_engine import (
EngineConfig,
FilterConfig,
MapConfig,
LocalizationConfig,
RuntimeConfig,
load_slam_config,
)
from SLAM_Validation import run_startup_self_check
def _now() -> float:
return time.time()
def _as_bool(v: Any, default: bool = False) -> bool:
if v is None:
return default
if isinstance(v, bool):
return v
if isinstance(v, (int, float)):
return bool(v)
return str(v).strip().lower() in ("1", "true", "yes", "on")
def _drain_keep_latest(q: mp.Queue) -> None:
try:
while True:
q.get_nowait()
except Exception:
return
def _safe_put(q: mp.Queue, item: Any, keep_latest: bool = False) -> None:
try:
if keep_latest:
_drain_keep_latest(q)
q.put_nowait(item)
except Exception:
pass
def slam_worker(
data_q: mp.Queue,
status_q: mp.Queue,
cmd_q: mp.Queue,
eng_cfg: EngineConfig,
filt_cfg: FilterConfig,
map_cfg: MapConfig,
loc_cfg: LocalizationConfig,
run_cfg: RuntimeConfig,
):
# ─────────────────────────────────────────────────────────────────────
# Redirect this subprocess's stdout AND stderr to logs/lidar_sdk.log.
# The Livox C++ SDK writes its `[console] [error] ...` stream to both
# streams, and its `bind failed` / `Create socket ...` messages go to
# stdout. Without this dup2, a disconnected or misconfigured LiDAR
# floods Marcus's terminal with thousands of lines/second. The worker
# communicates status upward through the multiprocessing queues, so
# losing stdout/stderr here is fine — nothing the operator needs to
# see is lost.
# ─────────────────────────────────────────────────────────────────────
import os as _os, sys as _sys
try:
_log_dir = _os.path.join(
_os.path.dirname(_os.path.dirname(_os.path.abspath(__file__))),
"logs",
)
_os.makedirs(_log_dir, exist_ok=True)
_err_path = _os.path.join(_log_dir, "lidar_sdk.log")
_fd = _os.open(_err_path, _os.O_WRONLY | _os.O_CREAT | _os.O_APPEND, 0o644)
_os.dup2(_fd, 1) # replace stdout FD so C++ printf/puts are captured
_os.dup2(_fd, 2) # replace stderr FD for C++ std::cerr / spdlog
_sys.stdout = _os.fdopen(1, "w", buffering=1)
_sys.stderr = _os.fdopen(2, "w", buffering=1)
except Exception:
pass # never crash just because the log redirect failed
# suppress noisy warning from ctypes->numpy conversion
import warnings
warnings.filterwarnings(
"ignore",
message="A builtin ctypes object gave a PEP3118 format string",
category=RuntimeWarning,
)
try:
from livox2_python import Livox2
from kiss_icp.config import load_config
from kiss_icp.pipeline import KissICP
from SLAM_Diagnostics import WorkerDiagnostics
from SLAM_LocalizationService import LocalizationFrameState
from SLAM_Transforms import (
apply_transform_points as _apply_transform_points,
blend_rigid_tf as _blend_rigid_tf_impl,
tf_delta as _tf_delta_impl,
tf_from_xyzyaw as _tf_from_xyzyaw_impl,
to_world_points as _to_world_points,
yaw_deg_from_tf as _yaw_deg_from_tf_impl,
)
from SLAM_Filter import VoxelPersistenceFilter
from SLAM_Filter import FilterConfig as VFilterCfg
from SLAM_Filter import IndoorMapQualityConfig, IndoorMapQualityFilter
from SLAM_MAP import StableMapLayer
from SLAM_MAP import MapConfig as StableMapCfg
from SLAM_LoopClosure import LoopClosureConfig, LoopClosureBackend
from SLAM_PlaceRecognition import PlaceRecognitionConfig, PlaceRecognitionIndex
from SLAM_StateMachine import LocalizationStateConfig, LocalizationStateMachine
from SLAM_Submap import SubmapConfig, LocalGlobalSubmapMapper, SubmapCheckpointer
from SLAM_Session import SessionMemoryConfig, SessionTransformStore
from SLAM_Navigation import NavigationExportConfig, NavigationExporter
from SLAM_Fusion import FusionConfig as SensorFusionConfig, SensorPoseFusion
from SLAM_NavRuntime import (
LiveCostmapConfig,
LiveCostmapRuntime,
GlobalAStarPlanner,
LocalPlannerConfig,
LocalReactivePlanner,
)
from SLAM_Mission import MissionConfig, WaypointMissionManager
from SLAM_Safety import SafetyConfig, SafetySupervisor
except Exception as e:
_safe_put(status_q, ("ERROR", f"Import failed: {e}"))
return
def st(level: str, msg: Any):
    """Ship a (level, message) status record to the parent process, fire-and-forget."""
    record = (level, msg)
    _safe_put(status_q, record)
Path(map_cfg.data_folder).mkdir(parents=True, exist_ok=True)
diagnostics = None
try:
fault_log_path = Path(map_cfg.data_folder) / "SLAM_worker_fault.log"
diagnostics = WorkerDiagnostics(fault_log_path)
except Exception as e:
diagnostics = None
_safe_put(status_q, ("WARN", f"Diagnostics disabled: {e}"))
try:
full_cfg = load_slam_config()
except Exception:
full_cfg = {}
loc_cfg_raw = full_cfg.get("localization", {}) if isinstance(full_cfg, dict) else {}
guard_cfg = full_cfg.get("mapping_guard", {}) if isinstance(full_cfg, dict) else {}
loc_pt2plane_unsafe = _as_bool(
(loc_cfg_raw or {}).get("allow_point_to_plane_unsafe", False),
False,
)
loc_use_point_to_plane = _as_bool(
(loc_cfg_raw or {}).get("use_point_to_plane", False),
False,
) and bool(loc_pt2plane_unsafe)
if _as_bool((loc_cfg_raw or {}).get("use_point_to_plane", False), False) and not loc_pt2plane_unsafe:
_safe_put(
status_q,
(
"WARN",
"Localization point-to-plane disabled for stability. Set localization.allow_point_to_plane_unsafe=true to force it.",
),
)
loc_live_period_sec = max(0.25, float((loc_cfg_raw or {}).get("live_period_sec", 0.55)))
loc_tracking_window_enabled = _as_bool(
(loc_cfg_raw or {}).get("tracking_window_enabled", True),
True,
)
loc_tracking_window_radius_m = max(
2.0,
float((loc_cfg_raw or {}).get("tracking_window_radius_m", 8.0)),
)
loc_tracking_window_min_ref_points = max(
120,
int((loc_cfg_raw or {}).get("tracking_window_min_ref_points", 450)),
)
loc_tracking_window_max_ref_points = max(
int(loc_tracking_window_min_ref_points),
int((loc_cfg_raw or {}).get("tracking_window_max_ref_points", 140000)),
)
loc_global_reloc_anchor_voxel_m = max(
0.4,
float((loc_cfg_raw or {}).get("global_reloc_anchor_voxel_m", 1.4)),
)
loc_global_reloc_max_anchors = max(
8,
int((loc_cfg_raw or {}).get("global_reloc_max_anchors", 24)),
)
loc_global_reloc_yaw_step_deg = float(
np.clip(
float((loc_cfg_raw or {}).get("global_reloc_yaw_step_deg", 60.0)),
15.0,
120.0,
)
)
loc_global_reloc_corr_mult = max(
1.2,
float((loc_cfg_raw or {}).get("global_reloc_corr_mult", 1.9)),
)
loc_global_reloc_coarse_iter = max(
4,
int((loc_cfg_raw or {}).get("global_reloc_coarse_iter", 8)),
)
loc_global_reloc_refine_iter = max(
8,
int((loc_cfg_raw or {}).get("global_reloc_refine_iter", 20)),
)
loc_global_reloc_min_corr = max(
15,
int((loc_cfg_raw or {}).get("global_reloc_min_corr", 25)),
)
loc_bidir_enabled = _as_bool((loc_cfg_raw or {}).get("bidirectional_check_enabled", True), True)
loc_bidir_min_ratio = float(
np.clip(float((loc_cfg_raw or {}).get("bidirectional_min_ratio", 0.18)), 0.01, 1.0)
)
loc_bidir_max_rmse = max(0.05, float((loc_cfg_raw or {}).get("bidirectional_max_rmse", 0.55)))
loc_guess_ttl_sec = max(2.0, float((loc_cfg_raw or {}).get("approx_guess_ttl_sec", 60.0)))
loc_guess_crop_radius_m = max(
2.0,
float((loc_cfg_raw or {}).get("approx_guess_crop_radius_m", 9.0)),
)
loc_guess_default_z_m = float((loc_cfg_raw or {}).get("approx_guess_default_z_m", 0.0))
loc_guess_strict_local = _as_bool(
(loc_cfg_raw or {}).get("approx_guess_strict_local", True),
True,
)
loc_guess_local_radius_m = max(
1.0,
min(
float(loc_guess_crop_radius_m),
float((loc_cfg_raw or {}).get("approx_guess_local_radius_m", 4.5)),
),
)
loc_guess_global_fallback_after_failures = max(
1,
int((loc_cfg_raw or {}).get("approx_guess_global_fallback_after_failures", 6)),
)
loc_guess_yaw_step_deg = float(
np.clip(
float((loc_cfg_raw or {}).get("approx_guess_yaw_step_deg", 30.0)),
10.0,
120.0,
)
)
loc_guess_max_start_offset_m = max(
1.0,
float((loc_cfg_raw or {}).get("approx_guess_max_start_offset_m", 6.0)),
)
loc_guess_bootstrap_min_hits = max(
1,
int((loc_cfg_raw or {}).get("approx_guess_bootstrap_min_hits", 2)),
)
loc_guess_bootstrap_min_fitness = float(
np.clip(
float((loc_cfg_raw or {}).get("approx_guess_bootstrap_min_fitness", 0.34)),
0.0,
1.0,
)
)
loc_guess_bootstrap_min_bidir = float(
np.clip(
float((loc_cfg_raw or {}).get("approx_guess_bootstrap_min_bidir", 0.24)),
0.0,
1.0,
)
)
loc_live_track_strict = _as_bool(
(loc_cfg_raw or {}).get("live_track_strict", True),
True,
)
loc_live_track_min_fit = float(
np.clip(
float((loc_cfg_raw or {}).get("live_track_min_fitness", 0.42)),
0.0,
1.0,
)
)
loc_live_track_min_bidir = float(
np.clip(
float((loc_cfg_raw or {}).get("live_track_min_bidir", 0.28)),
0.0,
1.0,
)
)
loc_live_track_max_step_trans_m = max(
0.02,
float((loc_cfg_raw or {}).get("live_track_max_step_translation_m", 0.20)),
)
loc_live_track_max_step_rot_deg = max(
0.5,
float((loc_cfg_raw or {}).get("live_track_max_step_rotation_deg", 12.0)),
)
loc_live_stack_enabled = _as_bool((loc_cfg_raw or {}).get("live_stack_enabled", True), True)
loc_live_stack_frames = max(1, int((loc_cfg_raw or {}).get("live_stack_frames", 8)))
loc_live_stack_voxel_m = max(
0.04,
float((loc_cfg_raw or {}).get("live_stack_voxel_m", max(0.06, float(loc_cfg.voxel_localize) * 0.7))),
)
loc_live_stack_max_points = max(800, int((loc_cfg_raw or {}).get("live_stack_max_points", 22000)))
loc_live_stack_min_frames = max(1, int((loc_cfg_raw or {}).get("live_stack_min_frames", 2)))
vis_cfg = full_cfg.get("localization_visualization", {}) if isinstance(full_cfg, dict) else {}
loc_vis_enabled = _as_bool((vis_cfg or {}).get("enabled", True), True)
loc_vis_period_sec = max(0.10, float((vis_cfg or {}).get("period_sec", 0.35)))
loc_vis_voxel_m = max(0.08, float((vis_cfg or {}).get("voxel_m", 0.24)))
loc_vis_max_points = max(4000, int((vis_cfg or {}).get("max_points", 28000)))
loc_vis_local_radius_m = max(2.0, float((vis_cfg or {}).get("local_radius_m", 12.0)))
loc_vis_match_dist_m = max(0.03, float((vis_cfg or {}).get("match_dist_m", 0.30)))
loc_vis_near_dist_m = max(loc_vis_match_dist_m + 0.02, float((vis_cfg or {}).get("near_dist_m", 0.60)))
loc_vis_confirm_frames = max(1, int((vis_cfg or {}).get("confirm_frames", 3)))
loc_vis_decay_per_frame = max(1, int((vis_cfg or {}).get("decay_per_frame", 1)))
lidar = None
slam = None
filt = None
stable = None
loop_cfg = LoopClosureConfig.from_dict(full_cfg.get("loop_closure", {}))
loop_backend = LoopClosureBackend(loop_cfg)
loc_state_cfg = LocalizationStateConfig.from_dict(full_cfg.get("state_machine", {}))
loc_state = LocalizationStateMachine(loc_state_cfg)
session_cfg = SessionMemoryConfig.from_dict(full_cfg.get("session_memory", {}))
session_store = SessionTransformStore(map_cfg.data_folder, session_cfg)
nav_cfg = NavigationExportConfig.from_dict(full_cfg.get("navigation_export", {}))
nav_exporter = NavigationExporter(nav_cfg, map_cfg.data_folder)
nav_rt_cfg = LiveCostmapConfig.from_dict(full_cfg.get("navigation_runtime", full_cfg.get("navigation_export", {})))
nav_runtime = LiveCostmapRuntime(nav_rt_cfg)
nav_global_planner = GlobalAStarPlanner(blocked_cost=int(nav_rt_cfg.blocked_cost))
nav_local_planner = LocalReactivePlanner(LocalPlannerConfig.from_dict(full_cfg.get("local_planner", {})))
fusion_cfg = SensorFusionConfig.from_dict(full_cfg.get("fusion", {}))
sensor_fusion = SensorPoseFusion(fusion_cfg)
mission_cfg = MissionConfig.from_dict(full_cfg.get("mission", {}))
mission = WaypointMissionManager(mission_cfg)
safety_cfg = SafetyConfig.from_dict(full_cfg.get("safety", {}))
safety = SafetySupervisor(safety_cfg)
map_quality_cfg = IndoorMapQualityConfig.from_dict(full_cfg.get("map_quality", {}))
map_quality = IndoorMapQualityFilter(map_quality_cfg)
submap_cfg = SubmapConfig.from_dict(full_cfg.get("submap_mapping", {}))
submap_mapper = LocalGlobalSubmapMapper(submap_cfg)
submap_mode_enabled = bool(submap_cfg.enabled)
_submap_raw = full_cfg.get("submap_mapping", {}) or {}
_ckpt_enabled = _as_bool(_submap_raw.get("checkpoint_enabled", True), True)
_ckpt_interval = max(10.0, float(_submap_raw.get("checkpoint_interval_sec", 60.0)))
submap_ckpt = SubmapCheckpointer(str(map_cfg.data_folder), interval_s=_ckpt_interval) if _ckpt_enabled else None
if submap_ckpt is not None and submap_mode_enabled:
if submap_ckpt.load_into(submap_mapper):
st("INFO", f"Submap checkpoint restored from {map_cfg.data_folder}")
place_cfg = PlaceRecognitionConfig.from_dict(full_cfg.get("place_recognition", {}))
place_recog = PlaceRecognitionIndex(place_cfg, Path(__file__).resolve().parent)
livox_dbg_cfg = full_cfg.get("livox_debug", {}) or {}
livox_debug_enabled = _as_bool(livox_dbg_cfg.get("enabled", False), False)
livox_print_every_n = max(1, int(livox_dbg_cfg.get("print_every_n_frames", 20)))
pose_guard_enabled = _as_bool((guard_cfg or {}).get("enabled", True), True)
pose_guard_max_trans = max(0.08, float((guard_cfg or {}).get("max_frame_translation_m", 0.45)))
pose_guard_max_rot_deg = max(2.0, float((guard_cfg or {}).get("max_frame_rotation_deg", 30.0)))
pose_guard_ref_dt = max(0.01, float((guard_cfg or {}).get("reference_dt_sec", 0.06)))
map_lock_cfg = full_cfg.get("mapping_lock", {}) if isinstance(full_cfg, dict) else {}
map_lock_enabled = _as_bool((map_lock_cfg or {}).get("enabled", True), True)
map_lock_period_sec = max(0.5, float((map_lock_cfg or {}).get("period_sec", 1.5)))
map_lock_voxel_m = max(0.05, float((map_lock_cfg or {}).get("voxel_m", 0.22)))
map_lock_max_corr_m = max(0.3, float((map_lock_cfg or {}).get("max_corr_m", 1.2)))
map_lock_min_stable_pts = max(200, int((map_lock_cfg or {}).get("min_stable_points", 700)))
map_lock_accept_fitness = float(np.clip(float((map_lock_cfg or {}).get("accept_fitness", 0.28)), 0.0, 1.0))
map_lock_accept_rmse = max(0.05, float((map_lock_cfg or {}).get("accept_rmse", 0.40)))
map_lock_max_trans_m = max(0.02, float((map_lock_cfg or {}).get("max_step_translation_m", 0.35)))
map_lock_max_rot_deg = max(1.0, float((map_lock_cfg or {}).get("max_step_rotation_deg", 15.0)))
map_lock_apply_damping = _as_bool((map_lock_cfg or {}).get("apply_damping", True), True)
map_lock_apply_alpha_t = float(np.clip(float((map_lock_cfg or {}).get("apply_alpha_translation", 0.42)), 0.05, 1.0))
map_lock_apply_alpha_r = float(np.clip(float((map_lock_cfg or {}).get("apply_alpha_rotation", 0.50)), 0.05, 1.0))
map_lock_apply_max_trans_m = max(0.01, float((map_lock_cfg or {}).get("apply_max_translation_m", 0.08)))
map_lock_apply_max_rot_deg = max(0.5, float((map_lock_cfg or {}).get("apply_max_rotation_deg", 5.0)))
map_lock_window_radius_m = max(0.0, float((map_lock_cfg or {}).get("window_radius_m", 8.0)))
map_lock_min_live_pts = max(60, int((map_lock_cfg or {}).get("min_live_points", 140)))
map_lock_icp_max_iter = max(5, int((map_lock_cfg or {}).get("icp_max_iter", 20)))
map_lock_block_on_reject = _as_bool((map_lock_cfg or {}).get("block_on_reject", True), True)
map_lock_retry_after_sec = float((map_lock_cfg or {}).get("retry_after_reject_sec", 0.20))
map_lock_retry_after_sec = float(np.clip(map_lock_retry_after_sec, 0.05, max(0.05, map_lock_period_sec)))
map_lock_block_on_severe = _as_bool((map_lock_cfg or {}).get("block_on_severe_reject", True), True)
map_lock_severe_fit = float(np.clip(float((map_lock_cfg or {}).get("severe_fit", 0.14)), 0.0, 1.0))
map_lock_severe_rmse = max(0.10, float((map_lock_cfg or {}).get("severe_rmse", 0.75)))
map_lock_severe_trans_m = max(
float(map_lock_max_trans_m),
float((map_lock_cfg or {}).get("severe_trans_m", 0.45)),
)
map_lock_severe_rot_deg = max(
float(map_lock_max_rot_deg),
float((map_lock_cfg or {}).get("severe_rot_deg", 22.0)),
)
rotate_cfg = full_cfg.get("rotate_guard", {}) if isinstance(full_cfg, dict) else {}
rotate_mode_enabled = _as_bool((rotate_cfg or {}).get("enabled", True), True)
rotate_in_place_max_trans_m = max(0.01, float((rotate_cfg or {}).get("in_place_max_trans_m", 0.05)))
rotate_in_place_min_rot_deg = max(0.5, float((rotate_cfg or {}).get("in_place_min_rot_deg", 4.0)))
rotate_pose_guard_max_rot_deg = max(
float(pose_guard_max_rot_deg),
float((rotate_cfg or {}).get("pose_guard_max_rot_deg", 34.0)),
)
rotate_skip_continuity = _as_bool((rotate_cfg or {}).get("skip_continuity", True), True)
rotate_map_lock_period_sec = max(0.05, float((rotate_cfg or {}).get("map_lock_period_sec", 0.25)))
rotate_map_lock_accept_fitness = float(
np.clip(float((rotate_cfg or {}).get("map_lock_accept_fitness", max(0.05, map_lock_accept_fitness - 0.06))), 0.0, 1.0)
)
rotate_map_lock_accept_rmse = max(
float(map_lock_accept_rmse),
float((rotate_cfg or {}).get("map_lock_accept_rmse", map_lock_accept_rmse + 0.10)),
)
rotate_map_lock_max_step_rot_deg = max(
float(map_lock_max_rot_deg),
float((rotate_cfg or {}).get("map_lock_max_step_rot_deg", 14.0)),
)
rotate_map_lock_apply_alpha_t = float(
np.clip(
float((rotate_cfg or {}).get("map_lock_apply_alpha_translation", max(0.05, map_lock_apply_alpha_t * 0.55))),
0.03,
1.0,
)
)
rotate_map_lock_apply_alpha_r = float(
np.clip(
float((rotate_cfg or {}).get("map_lock_apply_alpha_rotation", max(0.05, map_lock_apply_alpha_r * 0.70))),
0.03,
1.0,
)
)
rotate_map_lock_apply_max_trans_m = max(
0.005,
float((rotate_cfg or {}).get("map_lock_apply_max_translation_m", max(0.01, map_lock_apply_max_trans_m * 0.45))),
)
rotate_map_lock_apply_max_rot_deg = max(
0.5,
float((rotate_cfg or {}).get("map_lock_apply_max_rotation_deg", max(1.0, map_lock_apply_max_rot_deg * 0.70))),
)
rotate_freeze_mapping = _as_bool((rotate_cfg or {}).get("freeze_mapping_while_rotating", True), True)
rotate_freeze_min_stable_pts = max(0, int((rotate_cfg or {}).get("freeze_min_stable_points", 250)))
rotate_map_lock_block_on_reject = _as_bool((rotate_cfg or {}).get("block_on_reject", False), False)
rotate_map_lock_block_on_severe = _as_bool((rotate_cfg or {}).get("block_on_severe", True), True)
continuity_cfg = full_cfg.get("continuity_guard", {}) if isinstance(full_cfg, dict) else {}
continuity_enabled = _as_bool((continuity_cfg or {}).get("enabled", True), True)
continuity_voxel_m = max(0.05, float((continuity_cfg or {}).get("voxel_m", 0.24)))
continuity_min_points = max(40, int((continuity_cfg or {}).get("min_points", 120)))
continuity_match_radius_m = max(0.08, float((continuity_cfg or {}).get("match_radius_m", 0.45)))
continuity_min_inlier_ratio = float(np.clip(float((continuity_cfg or {}).get("min_inlier_ratio", 0.35)), 0.01, 1.0))
continuity_max_median_m = max(0.05, float((continuity_cfg or {}).get("max_median_m", 0.30)))
continuity_max_p90_m = max(0.08, float((continuity_cfg or {}).get("max_p90_m", 0.60)))
continuity_recover_after_rejects = max(2, int((continuity_cfg or {}).get("recover_after_rejects", 10)))
cleanup_cfg = full_cfg.get("map_cleanup", {}) if isinstance(full_cfg, dict) else {}
cleanup_enabled = _as_bool((cleanup_cfg or {}).get("enabled", True), True)
cleanup_period_sec = max(0.5, float((cleanup_cfg or {}).get("period_sec", 1.8)))
cleanup_voxel_m = max(0.05, float((cleanup_cfg or {}).get("voxel_m", 0.16)))
cleanup_cluster_radius_m = max(0.12, float((cleanup_cfg or {}).get("cluster_radius_m", 0.42)))
cleanup_min_cluster_points = max(20, int((cleanup_cfg or {}).get("min_cluster_points", 120)))
cleanup_keep_largest_n = max(1, int((cleanup_cfg or {}).get("keep_largest_n", 1)))
cleanup_strict_largest_only = _as_bool((cleanup_cfg or {}).get("strict_largest_only", True), True)
cleanup_min_total_points = max(150, int((cleanup_cfg or {}).get("min_total_points", 800)))
autosave_cfg = full_cfg.get("autosave", {}) or {}
autosave_enabled = _as_bool(autosave_cfg.get("enabled", False), False)
autosave_interval_sec = max(5.0, float(autosave_cfg.get("interval_sec", 90.0)))
autosave_base_name = str(autosave_cfg.get("base_name", "autosave_map"))
last_autosave_t = 0.0
self_check_report = run_startup_self_check(full_cfg, Path(__file__).resolve().parent)
connected = False
mapping_enabled = False
localize_only_enabled = False
worker_mode = "IDLE"
running = True
state_lock = threading.RLock()
# localization
ref_map_path: Optional[str] = None
# publish control
pub_hz = float(run_cfg.publish_hz)
last_pub = 0.0
# keyframe gating
last_key_pose = None
last_frame_t = 0.0
last_guard_pose = None
last_guard_t = 0.0
guard_reject_count = 0
guard_last_warn_t = 0.0
odom_to_map_transform = np.eye(4, dtype=np.float64)
map_lock_last_t = 0.0
map_lock_accept_count = 0
map_lock_reject_count = 0
map_lock_last_warn_t = 0.0
continuity_prev_pts = None
continuity_reject_count = 0
continuity_consecutive_rejects = 0
continuity_last_warn_t = 0.0
rotate_freeze_count = 0
rotate_last_warn_t = 0.0
cleanup_last_t = 0.0
cleanup_removed_total = 0
cleanup_last_info: Dict[str, Any] = {"status": "na"}
# localization cadence/state
last_localize_t = 0.0
last_localize_points = 0
last_localize_transform = np.eye(4, dtype=np.float64)
last_localize_confidence = 0.0
ref_pcd_cache = None
ref_points_cache: Optional[np.ndarray] = None
ref_nav_points_cache: Optional[np.ndarray] = None
ref_nav_cache_src_n: int = 0
ref_cache_path: Optional[str] = None
slam_to_ref_transform = np.eye(4, dtype=np.float64)
slam_to_ref_valid = False
approx_pose_guess_tf: Optional[np.ndarray] = None
approx_pose_guess_until_t: float = 0.0
approx_pose_guess_fail_count: int = 0
approx_pose_guess_success_count: int = 0
ref_match_points_cache: Optional[np.ndarray] = None
ref_match_confirm_hits: Optional[np.ndarray] = None
ref_match_last_pub_t: float = 0.0
ref_match_last_stats: Dict[str, Any] = {}
loc_live_stack: deque[np.ndarray] = deque(maxlen=int(loc_live_stack_frames))
loc_frames = LocalizationFrameState()
# runtime point-density control (LOW/MEDIUM/HIGH)
base_stride = max(1, int(eng_cfg.pre_downsample_stride))
density_mode = "MEDIUM"
live_stride = base_stride
runtime_filter_voxel = float(filt_cfg.voxel_size)
runtime_filter_hit_threshold = int(max(1, filt_cfg.hits_threshold))
runtime_filter_decay = float(filt_cfg.strict_sec if filt_cfg.use_strict else filt_cfg.window_sec)
stability_profile = "BALANCED"
# telemetry/performance
perf_input_count = 0
perf_publish_count = 0
perf_last_rate_t = _now()
perf_last_cpu_wall_t = _now()
perf_last_cpu_proc_t = time.process_time()
perf_input_fps = 0.0
perf_publish_fps = 0.0
perf_icp_ms = 0.0
perf_cpu_pct = 0.0
perf_queue_lag = 0
perf_stable_growth = 0.0
perf_prev_stable_count = 0
perf_prev_stable_t = _now()
# recording / replay capture
recording_enabled = False
recording_frames: List[np.ndarray] = []
recording_poses: List[np.ndarray] = []
recording_base_name = "slam_recording"
recording_max_frames = max(100, int((full_cfg.get("replay", {}) or {}).get("max_record_frames", 6000)))
recording_drop_count = 0
# nav runtime / planner / mission
nav_goal_xy: Optional[Tuple[float, float]] = None
nav_last_update_t = 0.0
nav_last_plan_t = 0.0
nav_plan_period_sec = max(0.15, float((full_cfg.get("navigation_runtime", {}) or {}).get("plan_period_sec", 0.35)))
nav_path_world: List[Tuple[float, float]] = []
latest_live_pts: Optional[np.ndarray] = None
latest_live_localize_pts: Optional[np.ndarray] = None
raw_points_q: queue.Queue = queue.Queue(maxsize=max(2, int(run_cfg.frame_queue_maxsize)))
def _sync_loc_vars_from_service() -> None:
nonlocal odom_to_map_transform, last_localize_transform, last_localize_confidence
nonlocal slam_to_ref_transform, slam_to_ref_valid
odom_to_map_transform = np.array(loc_frames.odom_to_map, dtype=np.float64, copy=True)
last_localize_transform = np.array(loc_frames.last_alignment, dtype=np.float64, copy=True)
last_localize_confidence = float(loc_frames.confidence)
slam_to_ref_transform = np.array(loc_frames.odom_to_ref, dtype=np.float64, copy=True)
slam_to_ref_valid = bool(loc_frames.ref_valid)
def _worker_diag_state() -> Dict[str, Any]:
    """Compact snapshot of the worker's mode flags for the diagnostics fault log."""
    return dict(
        connected=bool(connected),
        mapping_enabled=bool(mapping_enabled),
        localize_only_enabled=bool(localize_only_enabled),
        worker_mode=str(worker_mode),
        ref_map_path=str(ref_map_path) if ref_map_path else "",
        last_localize_points=int(last_localize_points),
        last_localize_t=float(last_localize_t),
        density_mode=str(density_mode),
        stability_profile=str(stability_profile),
        submap_enabled=bool(submap_mode_enabled),
        loop_closure_enabled=bool(loop_cfg.enabled),
        loc_frames=loc_frames.snapshot(),
    )
def _clear_raw_points_queue() -> None:
    """Throw away every pending raw LiDAR frame (used on reset / mode change)."""
    while True:
        try:
            raw_points_q.get_nowait()
        except Exception:
            return
# ---------------- init stack ----------------
def init_slam_stack():
    """(Re)create the core pipeline objects: KISS-ICP, persistence filter, stable map.

    Rebinds the enclosing worker's `slam`, `filt` and `stable` via nonlocal,
    so calling this again performs a full in-place reset of the SLAM stack.
    """
    nonlocal slam, filt, stable
    # kiss_icp >= 1.2.x requires a `deskew` positional arg; pre-1.2 doesn't.
    # Try the new signature first, fall back to the old one.
    try:
        cfg = load_config(
            config_file=None,
            deskew=bool(getattr(eng_cfg, "deskew", True)),
            max_range=float(eng_cfg.max_range),
        )
    except TypeError:
        # Older kiss_icp: no `deskew` keyword accepted.
        cfg = load_config(config_file=None, max_range=float(eng_cfg.max_range))
    cfg.mapping.voxel_size = float(eng_cfg.slam_voxel_size)
    slam = KissICP(cfg)
    # Persistence filter uses the runtime-tunable density knobs (LOW/MEDIUM/HIGH
    # profile can change these between resets), not the raw filt_cfg values.
    vf_cfg = VFilterCfg(
        voxel_size=float(runtime_filter_voxel),
        hit_threshold=int(runtime_filter_hit_threshold),
        decay_seconds=float(runtime_filter_decay),
        max_voxels=int(filt_cfg.max_voxels),
    )
    filt = VoxelPersistenceFilter(vf_cfg)
    # Stable map layer: display/save voxel sizes and output location from map_cfg.
    sm_cfg = StableMapCfg(
        display_voxel=float(map_cfg.display_voxel),
        save_voxel=float(map_cfg.save_voxel),
        data_folder=str(map_cfg.data_folder),
        save_extension=str(map_cfg.save_extension),
    )
    stable = StableMapLayer(sm_cfg)
def _run_locked(fn, *args, **kwargs):
    """Invoke *fn* while holding the worker state lock; return its result."""
    state_lock.acquire()
    try:
        return fn(*args, **kwargs)
    finally:
        state_lock.release()
def reset_all():
nonlocal mapping_enabled, localize_only_enabled, worker_mode, last_key_pose
nonlocal last_guard_pose, last_guard_t, guard_reject_count, guard_last_warn_t
nonlocal last_frame_t
nonlocal odom_to_map_transform, map_lock_last_t, map_lock_accept_count, map_lock_reject_count, map_lock_last_warn_t
nonlocal continuity_prev_pts, continuity_reject_count, continuity_consecutive_rejects, continuity_last_warn_t
nonlocal rotate_freeze_count, rotate_last_warn_t
nonlocal cleanup_last_t, cleanup_removed_total, cleanup_last_info
nonlocal last_localize_t, last_localize_points, last_localize_transform, last_localize_confidence
nonlocal ref_pcd_cache, ref_points_cache, ref_nav_points_cache, ref_nav_cache_src_n, ref_cache_path, slam_to_ref_transform, slam_to_ref_valid
nonlocal approx_pose_guess_tf, approx_pose_guess_until_t, approx_pose_guess_fail_count, approx_pose_guess_success_count
nonlocal ref_match_points_cache, ref_match_confirm_hits, ref_match_last_pub_t, ref_match_last_stats
nonlocal loc_live_stack
nonlocal last_autosave_t
nonlocal perf_input_count, perf_publish_count, perf_last_rate_t
nonlocal perf_last_cpu_wall_t, perf_last_cpu_proc_t
nonlocal perf_input_fps, perf_publish_fps, perf_icp_ms, perf_cpu_pct
nonlocal perf_queue_lag, perf_stable_growth, perf_prev_stable_count, perf_prev_stable_t
nonlocal recording_enabled, recording_frames, recording_poses, recording_drop_count
nonlocal nav_goal_xy, nav_last_update_t, nav_last_plan_t, nav_path_world, latest_live_pts, latest_live_localize_pts
init_slam_stack()
mapping_enabled = False
localize_only_enabled = False
worker_mode = "IDLE"
last_frame_t = 0.0
last_key_pose = None
last_guard_pose = None
last_guard_t = 0.0
guard_reject_count = 0
guard_last_warn_t = 0.0
loc_frames.reset()
_sync_loc_vars_from_service()
map_lock_last_t = 0.0
map_lock_accept_count = 0
map_lock_reject_count = 0
map_lock_last_warn_t = 0.0
continuity_prev_pts = None
continuity_reject_count = 0
continuity_consecutive_rejects = 0
continuity_last_warn_t = 0.0
rotate_freeze_count = 0
rotate_last_warn_t = 0.0
cleanup_last_t = 0.0
cleanup_removed_total = 0
cleanup_last_info = {"status": "na"}
last_localize_t = 0.0
last_localize_points = 0
ref_pcd_cache = None
ref_points_cache = None
ref_nav_points_cache = None
ref_nav_cache_src_n = 0
ref_cache_path = None
approx_pose_guess_tf = None
approx_pose_guess_until_t = 0.0
approx_pose_guess_fail_count = 0
approx_pose_guess_success_count = 0
ref_match_points_cache = None
ref_match_confirm_hits = None
ref_match_last_pub_t = 0.0
ref_match_last_stats = {}
loc_live_stack.clear()
loop_backend.reset()
loc_state.reset()
submap_mapper.reset()
if submap_ckpt is not None:
submap_ckpt.delete()
sensor_fusion.reset()
_imu_integrator.reset()
mission.reset()
nav_runtime.reset()
last_autosave_t = 0.0
perf_input_count = 0
perf_publish_count = 0
perf_last_rate_t = _now()
perf_last_cpu_wall_t = _now()
perf_last_cpu_proc_t = time.process_time()
perf_input_fps = 0.0
perf_publish_fps = 0.0
perf_icp_ms = 0.0
perf_cpu_pct = 0.0
perf_queue_lag = 0
perf_stable_growth = 0.0
perf_prev_stable_count = 0
perf_prev_stable_t = _now()
recording_enabled = False
recording_frames = []
recording_poses = []
recording_drop_count = 0
nav_goal_xy = None
nav_last_update_t = 0.0
nav_last_plan_t = 0.0
nav_path_world = []
latest_live_pts = None
latest_live_localize_pts = None
_clear_raw_points_queue()
st("INFO", "RESET done: SLAM + Filter + Stable map cleared. (no save)")
init_slam_stack()
# ---------------- helpers ----------------
def height_colors_fast(pts: np.ndarray, alpha: float = 0.9) -> np.ndarray:
    """Map point heights (z) to RGBA: blue (low) -> green (mid) -> red (high).

    Heights are normalized over roughly [-1.5, +1.5] m into [0, 1]; *alpha*
    fills the fourth channel. Returns an (N, 4) float32 array; None or empty
    input yields a (0, 4) array.
    """
    if pts is None or len(pts) == 0:
        return np.zeros((0, 4), dtype=np.float32)
    t = np.clip((pts[:, 2] + 1.5) / 3.0, 0, 1)
    rgba = np.empty((len(pts), 4), dtype=np.float32)
    rgba[:, 0] = t
    rgba[:, 1] = 1.0 - np.abs(t - 0.5) * 2.0
    rgba[:, 2] = 1.0 - t
    rgba[:, 3] = alpha
    return rgba
def pose_delta_ok(pose, last_pose) -> bool:
    """Keyframe gate: True when motion since *last_pose* is large enough.

    Compares the translation and rotation deltas between two 4x4 poses
    against the configured keyframe thresholds. A missing previous pose
    always passes (first frame becomes a keyframe).
    """
    if last_pose is None:
        return True
    translation = float(np.linalg.norm(pose[:3, 3] - last_pose[:3, 3]))
    rel_rot = pose[:3, :3] @ last_pose[:3, :3].T
    cos_angle = np.clip((np.trace(rel_rot) - 1) / 2, -1, 1)
    rotation_deg = float(np.degrees(np.arccos(cos_angle)))
    if translation >= float(eng_cfg.keyframe_min_translation_m):
        return True
    return rotation_deg >= float(eng_cfg.keyframe_min_rotation_deg)
def to_world_points(points_sensor: np.ndarray, pose: Optional[np.ndarray]) -> np.ndarray:
    """
    Convert sensor-frame points to world frame using pose (world_T_sensor).
    Falls back to input points when pose is unavailable/invalid.
    Thin delegator to SLAM_Transforms.to_world_points.
    """
    return _to_world_points(points_sensor, pose)

def apply_transform_points(points: Optional[np.ndarray], transform: np.ndarray) -> Optional[np.ndarray]:
    """Apply a 4x4 transform to a point array; delegates to SLAM_Transforms.apply_transform_points."""
    return _apply_transform_points(points, transform)

def _tf_delta(prev_tf: np.ndarray, new_tf: np.ndarray) -> Tuple[float, float]:
    """Delta between two 4x4 transforms; delegates to SLAM_Transforms.tf_delta.

    Presumably returns (translation, rotation) magnitudes — confirm in SLAM_Transforms.
    """
    return _tf_delta_impl(prev_tf, new_tf)

def _blend_rigid_tf(prev_tf: np.ndarray, new_tf: np.ndarray, alpha_t: float, alpha_r: float) -> np.ndarray:
    """Blend two rigid transforms with separate translation/rotation weights; delegates to SLAM_Transforms.blend_rigid_tf."""
    return _blend_rigid_tf_impl(prev_tf, new_tf, alpha_t, alpha_r)

def _yaw_deg_from_tf(tf: np.ndarray) -> float:
    """Yaw angle (degrees) of a 4x4 transform; delegates to SLAM_Transforms.yaw_deg_from_tf."""
    return _yaw_deg_from_tf_impl(tf)

def _tf_from_xyzyaw(x: float, y: float, z: float, yaw_deg: float) -> np.ndarray:
    """Build a 4x4 transform from (x, y, z) and yaw degrees; delegates to SLAM_Transforms.tf_from_xyzyaw."""
    return _tf_from_xyzyaw_impl(x, y, z, yaw_deg)
def voxel_downsample_np(points: np.ndarray, voxel_m: float) -> np.ndarray:
    """Keep the first point seen in each voxel of size *voxel_m*, preserving order.

    Returns a float32 (M, 3) array. Malformed or empty input yields an empty
    (0, 3) array; a non-positive voxel size disables downsampling entirely.
    """
    pts = np.asarray(points, dtype=np.float32)
    malformed = pts.ndim != 2 or pts.shape[1] != 3 or len(pts) == 0
    if malformed:
        return np.zeros((0, 3), dtype=np.float32)
    size = float(voxel_m)
    if size <= 0.0:
        return pts
    cells = np.floor(pts / size).astype(np.int32)
    _, keep_idx = np.unique(cells, axis=0, return_index=True)
    keep_idx.sort()
    return pts[keep_idx]
def _rigid_fit_svd(src_pts: np.ndarray, dst_pts: np.ndarray) -> Optional[np.ndarray]:
"""Estimate rigid transform (dst = T * src) with SVD."""
if src_pts is None or dst_pts is None:
return None
if len(src_pts) < 3 or len(dst_pts) < 3:
return None
a = np.asarray(src_pts, dtype=np.float64)
b = np.asarray(dst_pts, dtype=np.float64)
if a.shape != b.shape or a.ndim != 2 or a.shape[1] != 3:
return None
c_a = np.mean(a, axis=0)
c_b = np.mean(b, axis=0)
aa = a - c_a
bb = b - c_b
H = aa.T @ bb
try:
U, _, Vt = np.linalg.svd(H, full_matrices=False)
except Exception:
return None
R = Vt.T @ U.T
if np.linalg.det(R) < 0:
Vt[2, :] *= -1.0
R = Vt.T @ U.T
t = c_b - (R @ c_a)
T = np.eye(4, dtype=np.float64)
T[:3, :3] = R
T[:3, 3] = t
return T
def _icp_point_to_point_np(
    src_pts: np.ndarray,
    tgt_pts: np.ndarray,
    init_tf: Optional[np.ndarray],
    max_corr_m: float,
    max_iter: int,
    min_corr: int = 30,
) -> Dict[str, Any]:
    """
    Pure NumPy/SciPy point-to-point ICP to avoid Open3D native crashes.
    Returns {transform, fitness, rmse, inliers}.

    src_pts / tgt_pts: (N, 3) point sets; init_tf: optional 4x4 initial
    guess (defensively copied); max_corr_m: correspondence distance cutoff
    (clamped to >= 0.05 m); min_corr: minimum inlier count to keep iterating
    (clamped to >= 10). On bad input or missing SciPy, returns identity with
    fitness 0 and rmse 9e9.
    """
    src0 = np.asarray(src_pts, dtype=np.float64)
    tgt = np.asarray(tgt_pts, dtype=np.float64)
    if src0.ndim != 2 or tgt.ndim != 2 or src0.shape[1] != 3 or tgt.shape[1] != 3:
        return {"transform": np.eye(4, dtype=np.float64), "fitness": 0.0, "rmse": 9e9, "inliers": 0}
    if len(src0) < 3 or len(tgt) < 3:
        return {"transform": np.eye(4, dtype=np.float64), "fitness": 0.0, "rmse": 9e9, "inliers": 0}
    try:
        from scipy.spatial import cKDTree
    except Exception:
        # SciPy unavailable: degrade to the same "no alignment" result.
        return {"transform": np.eye(4, dtype=np.float64), "fitness": 0.0, "rmse": 9e9, "inliers": 0}
    T = np.eye(4, dtype=np.float64) if init_tf is None else np.asarray(init_tf, dtype=np.float64).copy()
    if T.shape != (4, 4):
        T = np.eye(4, dtype=np.float64)
    # KD-tree over the (fixed) target; the source is re-warped each iteration.
    tree = cKDTree(tgt)
    max_corr = float(max(0.05, max_corr_m))
    min_corr_n = int(max(10, min_corr))
    last_rmse = 9e9
    last_inliers = 0
    for _ in range(int(max(1, max_iter))):
        # Warp source by the current estimate and find nearest neighbors.
        src_w = (src0 @ T[:3, :3].T) + T[:3, 3]
        d, idx = tree.query(src_w, k=1)
        keep = np.isfinite(d) & (d <= max_corr)
        n_in = int(np.count_nonzero(keep))
        if n_in < min_corr_n:
            break
        src_corr = src_w[keep]
        tgt_corr = tgt[np.asarray(idx[keep], dtype=np.int32)]
        # Closed-form rigid update from the current correspondences.
        delta = _rigid_fit_svd(src_corr, tgt_corr)
        if delta is None:
            break
        # Left-compose: delta maps the already-warped source toward the target.
        T = np.asarray(delta, dtype=np.float64) @ T
        prev_rmse = float(last_rmse)
        # NOTE: rmse is computed from the distances measured *before* this
        # iteration's delta was applied (the correspondences just fitted).
        rmse = float(np.sqrt(np.mean(np.square(d[keep]))))
        last_rmse = rmse
        last_inliers = n_in
        # Tiny improvement => converged.
        if abs(prev_rmse - rmse) < 1e-5:
            break
    # Fitness = fraction of source points with an inlier correspondence.
    fitness = float(last_inliers) / max(1.0, float(len(src0)))
    return {"transform": T, "fitness": float(fitness), "rmse": float(last_rmse), "inliers": int(last_inliers)}
def _bidir_match_stats(
src_pts: np.ndarray,
tgt_pts: np.ndarray,
transform: np.ndarray,
max_corr_m: float,
) -> Dict[str, Any]:
stats = {
"ok": False,
"live_to_ref_inlier": 0.0,
"ref_to_live_inlier": 0.0,
"bidir_inlier": 0.0,
"bidir_rmse": 9e9,
}
try:
from scipy.spatial import cKDTree
except Exception:
return stats
try:
src = np.asarray(src_pts, dtype=np.float64)
tgt = np.asarray(tgt_pts, dtype=np.float64)
tf = np.asarray(transform, dtype=np.float64)
if src.ndim != 2 or tgt.ndim != 2 or src.shape[1] != 3 or tgt.shape[1] != 3:
return stats
if len(src) < 8 or len(tgt) < 8 or tf.shape != (4, 4):
return stats
src_w = (src @ tf[:3, :3].T) + tf[:3, 3]
max_corr = max(0.05, float(max_corr_m))
tree_t = cKDTree(tgt)
d_st, _ = tree_t.query(src_w, k=1)
in_st = np.isfinite(d_st) & (d_st <= max_corr)
in_st_ratio = float(np.count_nonzero(in_st)) / max(1.0, float(len(src_w)))
tree_s = cKDTree(src_w)
d_ts, _ = tree_s.query(tgt, k=1)
in_ts = np.isfinite(d_ts) & (d_ts <= max_corr)
in_ts_ratio = float(np.count_nonzero(in_ts)) / max(1.0, float(len(tgt)))
inlier_mix = np.concatenate([d_st[in_st], d_ts[in_ts]], axis=0)
rmse = float(np.sqrt(np.mean(np.square(inlier_mix)))) if len(inlier_mix) > 0 else 9e9
stats.update(
{
"ok": True,
"live_to_ref_inlier": float(in_st_ratio),
"ref_to_live_inlier": float(in_ts_ratio),
"bidir_inlier": float(min(in_st_ratio, in_ts_ratio)),
"bidir_rmse": float(rmse),
}
)
return stats
except Exception:
return stats
def _build_ref_match_visual(
    live_ref_pts: Optional[np.ndarray],
    pose_ref: Optional[np.ndarray],
    now_t: float,
) -> Tuple[Optional[np.ndarray], Optional[np.ndarray], Dict[str, Any]]:
    """Build the reference-map match overlay for visualization.

    Colors a cached, downsampled copy of the reference cloud by how well
    each point near the current pose matches the live scan: grey (base),
    red (unmatched), yellow (near-miss), green (matched), and brighter
    green for points matched persistently over several frames.

    Returns (ref_points, rgba_colors, stats); (None, None, stats) when the
    visual is disabled, no reference is loaded, or the publish period has
    not elapsed yet.
    """
    nonlocal ref_match_points_cache, ref_match_confirm_hits, ref_match_last_pub_t, ref_match_last_stats
    stats: Dict[str, Any] = {"enabled": bool(loc_vis_enabled), "updated": False}
    if not loc_vis_enabled:
        ref_match_last_stats = stats
        return None, None, stats
    if ref_points_cache is None or len(ref_points_cache) == 0:
        stats["reason"] = "no_ref"
        ref_match_last_stats = stats
        return None, None, stats
    # Throttle publishing; between publishes, echo a copy of the last stats.
    if (float(now_t) - float(ref_match_last_pub_t)) < float(loc_vis_period_sec):
        return None, None, dict(ref_match_last_stats)
    ref_match_last_pub_t = float(now_t)
    # Lazily (re)build the downsampled + decimated reference sample and the
    # per-point confirmation counters that go with it.
    if ref_match_points_cache is None or ref_match_confirm_hits is None:
        ref_ds = voxel_downsample_np(np.asarray(ref_points_cache, dtype=np.float32), float(loc_vis_voxel_m))
        if len(ref_ds) > int(loc_vis_max_points):
            step = max(2, int(np.ceil(float(len(ref_ds)) / float(loc_vis_max_points))))
            ref_ds = ref_ds[::step]
        ref_match_points_cache = np.asarray(ref_ds, dtype=np.float32)
        ref_match_confirm_hits = np.zeros((len(ref_match_points_cache),), dtype=np.int16)
    pts_ref = np.asarray(ref_match_points_cache, dtype=np.float32)
    if len(pts_ref) == 0:
        stats["reason"] = "empty_ref_sample"
        ref_match_last_stats = stats
        return pts_ref, np.zeros((0, 4), dtype=np.float32), stats
    # Only points within a radius of the current pose get classified;
    # without a valid pose the whole reference counts as "local".
    local_mask = np.ones((len(pts_ref),), dtype=bool)
    center = None
    if pose_ref is not None:
        pose_np = np.asarray(pose_ref, dtype=np.float64)
        if pose_np.shape == (4, 4):
            center = np.asarray(pose_np[:3, 3], dtype=np.float32)
    if center is not None and np.isfinite(center).all():
        rel = pts_ref - center.reshape((1, 3))
        d2 = np.einsum("ij,ij->i", rel, rel)
        local_mask = d2 <= (float(loc_vis_local_radius_m) * float(loc_vis_local_radius_m))
    # Base color: translucent grey; classified points are recolored below.
    colors = np.zeros((len(pts_ref), 4), dtype=np.float32)
    colors[:, :3] = np.array([0.45, 0.45, 0.45], dtype=np.float32)
    colors[:, 3] = 0.16
    live = None if live_ref_pts is None else np.asarray(live_ref_pts, dtype=np.float32)
    matched = np.zeros((len(pts_ref),), dtype=bool)
    near = np.zeros((len(pts_ref),), dtype=bool)
    unmatched = np.zeros((len(pts_ref),), dtype=bool)
    try:
        if live is not None and live.ndim == 2 and live.shape[1] == 3 and len(live) >= 20 and np.any(local_mask):
            from scipy.spatial import cKDTree
            # Classify each local reference point by its distance to the
            # nearest live point.
            tree = cKDTree(np.asarray(live, dtype=np.float64))
            idx = np.where(local_mask)[0]
            d, _ = tree.query(np.asarray(pts_ref[idx], dtype=np.float64), k=1)
            d = np.asarray(d, dtype=np.float64)
            match_idx = idx[d <= float(loc_vis_match_dist_m)]
            near_idx = idx[(d > float(loc_vis_match_dist_m)) & (d <= float(loc_vis_near_dist_m))]
            unmatch_idx = idx[d > float(loc_vis_near_dist_m)]
            matched[match_idx] = True
            near[near_idx] = True
            unmatched[unmatch_idx] = True
    except Exception as _vis_exc:
        st("WARN", f"[loc_vis] match visual update failed: {_vis_exc}")
    # Hysteresis counters: matched points gain a hit, local unmatched points
    # decay; long-lived matches graduate to "confirmed".
    if ref_match_confirm_hits is None or len(ref_match_confirm_hits) != len(pts_ref):
        ref_match_confirm_hits = np.zeros((len(pts_ref),), dtype=np.int16)
    hits = np.asarray(ref_match_confirm_hits, dtype=np.int16)
    hits[matched] = np.clip(hits[matched] + 1, 0, 32000)
    decay_mask = local_mask & (~matched)
    if np.any(decay_mask):
        hits[decay_mask] = np.maximum(0, hits[decay_mask] - int(loc_vis_decay_per_frame))
    confirmed = local_mask & (hits >= int(loc_vis_confirm_frames))
    ref_match_confirm_hits = hits
    colors[unmatched, :3] = np.array([0.95, 0.15, 0.15], dtype=np.float32)
    colors[unmatched, 3] = 0.22
    colors[near, :3] = np.array([0.96, 0.86, 0.16], dtype=np.float32)
    colors[near, 3] = 0.24
    colors[matched, :3] = np.array([0.15, 0.85, 0.20], dtype=np.float32)
    colors[matched, 3] = 0.24
    colors[confirmed, :3] = np.array([0.05, 1.00, 0.28], dtype=np.float32)
    colors[confirmed, 3] = 0.34
    local_n = int(np.count_nonzero(local_mask))
    stats.update(
        {
            "updated": True,
            "ref_points": int(len(pts_ref)),
            "local_points": int(local_n),
            "matched": int(np.count_nonzero(matched)),
            "near": int(np.count_nonzero(near)),
            "unmatched": int(np.count_nonzero(unmatched)),
            "confirmed": int(np.count_nonzero(confirmed)),
            "matched_ratio": float(np.count_nonzero(matched)) / max(1.0, float(local_n)),
        }
    )
    ref_match_last_stats = dict(stats)
    return pts_ref, colors, stats
def compute_map_lock_correction(
    points_sensor: np.ndarray,
    pose_map: Optional[np.ndarray],
    now_t: float,
    overrides: Optional[Dict[str, Any]] = None,
) -> Tuple[Optional[np.ndarray], Dict[str, Any]]:
    """Periodically ICP-align the live sensor frame to the stable map and
    propose a small world-frame correction transform.

    Returns (correction_4x4 | None, info). The correction is only returned
    when the ICP result passes the fitness / RMSE / step-size acceptance
    gates; info always records what happened (skip / accepted / rejected).
    """
    nonlocal map_lock_last_t
    info: Dict[str, Any] = {"status": "skip", "ran": False}
    # Per-call overrides fall back to the configured map-lock defaults;
    # each value is clamped to a sane floor.
    ov = overrides if isinstance(overrides, dict) else {}
    period_sec_local = max(0.05, float(ov.get("period_sec", map_lock_period_sec)))
    max_corr_local = max(0.20, float(ov.get("max_corr_m", map_lock_max_corr_m)))
    accept_fit_local = float(np.clip(float(ov.get("accept_fitness", map_lock_accept_fitness)), 0.0, 1.0))
    accept_rmse_local = max(0.05, float(ov.get("accept_rmse", map_lock_accept_rmse)))
    max_step_trans_local = max(0.01, float(ov.get("max_step_translation_m", map_lock_max_trans_m)))
    max_step_rot_local = max(0.5, float(ov.get("max_step_rotation_deg", map_lock_max_rot_deg)))
    icp_iter_local = max(5, int(ov.get("icp_max_iter", map_lock_icp_max_iter)))
    if (not map_lock_enabled) or (pose_map is None):
        return None, info
    # Rate limit: run at most once per period.
    if (now_t - float(map_lock_last_t)) < float(period_sec_local):
        return None, info
    map_lock_last_t = float(now_t)
    try:
        stable_all = stable.get_points()
    except Exception:
        stable_all = None
    n_stable = 0 if stable_all is None else int(len(stable_all))
    if stable_all is None or n_stable < int(map_lock_min_stable_pts):
        info.update({"status": "skip", "reason": "few_stable", "stable_points": int(n_stable)})
        return None, info
    src_sensor = np.asarray(points_sensor, dtype=np.float32)
    if src_sensor.ndim != 2 or src_sensor.shape[1] != 3 or len(src_sensor) < int(map_lock_min_live_pts):
        info.update({"status": "skip", "reason": "few_live", "live_points": int(len(src_sensor))})
        return None, info
    pose_np = np.asarray(pose_map, dtype=np.float64)
    if pose_np.shape != (4, 4):
        info.update({"status": "skip", "reason": "bad_pose"})
        return None, info
    src_world = to_world_points(src_sensor, pose_np)
    tgt_world = np.asarray(stable_all, dtype=np.float32)
    # Crop the stable map to a window around the current pose to keep ICP
    # local and cheap — but only if enough points survive the crop.
    if float(map_lock_window_radius_m) > 0.0 and len(tgt_world) > 0:
        center = np.asarray(pose_np[:3, 3], dtype=np.float32)
        rel = tgt_world - center
        d2 = np.einsum("ij,ij->i", rel, rel)
        win2 = float(map_lock_window_radius_m) * float(map_lock_window_radius_m)
        keep = d2 <= win2
        min_local = max(120, int(map_lock_min_stable_pts) // 3)
        if int(np.count_nonzero(keep)) >= min_local:
            tgt_world = tgt_world[keep]
    src_ds = voxel_downsample_np(src_world, float(map_lock_voxel_m))
    tgt_ds = voxel_downsample_np(tgt_world, float(map_lock_voxel_m))
    if len(src_ds) < 60 or len(tgt_ds) < 120:
        info.update(
            {
                "status": "skip",
                "reason": "few_downsampled",
                "src_points": int(len(src_ds)),
                "tgt_points": int(len(tgt_ds)),
            }
        )
        return None, info
    # Robust in-process point-to-point ICP (avoids Open3D native crashes seen in map-lock path).
    try:
        from scipy.spatial import cKDTree
    except Exception:
        info.update({"status": "skip", "reason": "scipy_missing"})
        return None, info
    src_icp = np.asarray(src_ds, dtype=np.float64)
    tgt_icp = np.asarray(tgt_ds, dtype=np.float64)
    tree = cKDTree(tgt_icp)
    max_corr = float(max_corr_local)
    min_corr = max(40, int(0.15 * len(src_icp)))

    def _rigid_fit(src_pts: np.ndarray, dst_pts: np.ndarray) -> Optional[np.ndarray]:
        # Kabsch/SVD best-fit rigid transform (dst ~= T @ src); None if degenerate.
        if len(src_pts) < 3 or len(dst_pts) < 3:
            return None
        c_src = np.mean(src_pts, axis=0)
        c_dst = np.mean(dst_pts, axis=0)
        src0 = src_pts - c_src
        dst0 = dst_pts - c_dst
        H = src0.T @ dst0
        try:
            U, _, Vt = np.linalg.svd(H, full_matrices=False)
        except Exception:
            return None
        R = Vt.T @ U.T
        if np.linalg.det(R) < 0:
            # Reflection solution: flip the last singular vector.
            Vt[-1, :] *= -1.0
            R = Vt.T @ U.T
        t = c_dst - (R @ c_src)
        T = np.eye(4, dtype=np.float64)
        T[:3, :3] = R
        T[:3, 3] = t
        return T

    # Iterative refinement: accumulate the per-iteration rigid fit into corr.
    corr = np.eye(4, dtype=np.float64)
    converged = False
    for _ in range(int(icp_iter_local)):
        src_w = (src_icp @ corr[:3, :3].T) + corr[:3, 3]
        dist, idx = tree.query(src_w, k=1)
        mask = np.isfinite(dist) & (dist <= max_corr)
        if int(np.count_nonzero(mask)) < min_corr:
            break
        src_in = src_w[mask]
        dst_in = tgt_icp[idx[mask]]
        delta = _rigid_fit(src_in, dst_in)
        if delta is None:
            break
        corr = delta @ corr
        step_t = float(np.linalg.norm(delta[:3, 3]))
        step_r = float(
            np.degrees(
                np.arccos(np.clip((np.trace(delta[:3, :3]) - 1.0) * 0.5, -1.0, 1.0))
            )
        )
        # Converged when the incremental step is negligible.
        if step_t < 0.001 and step_r < 0.08:
            converged = True
            break
    # Final inlier/fitness statistics under the accumulated correction.
    src_final = (src_icp @ corr[:3, :3].T) + corr[:3, 3]
    dist_f, _ = tree.query(src_final, k=1)
    mask_f = np.isfinite(dist_f) & (dist_f <= max_corr)
    inliers = int(np.count_nonzero(mask_f))
    if inliers < min_corr:
        info.update(
            {
                "ran": True,
                "status": "rejected",
                "reason": "few_inliers",
                "src_points": int(len(src_icp)),
                "tgt_points": int(len(tgt_icp)),
            }
        )
        return None, info
    fit = float(inliers) / float(max(1, len(src_icp)))
    rmse = float(np.sqrt(np.mean((dist_f[mask_f]) ** 2)))
    dpos = float(np.linalg.norm(corr[:3, 3]))
    drot = float(np.degrees(np.arccos(np.clip((np.trace(corr[:3, :3]) - 1.0) * 0.5, -1.0, 1.0))))
    # Acceptance gates: good fit, low residual, and a small total correction.
    ok = bool(
        fit >= float(accept_fit_local)
        and rmse <= float(accept_rmse_local)
        and dpos <= float(max_step_trans_local)
        and drot <= float(max_step_rot_local)
    )
    info.update(
        {
            "ran": True,
            "status": "accepted" if ok else "rejected",
            "fitness": fit,
            "rmse": rmse,
            "dpos_m": dpos,
            "drot_deg": drot,
            "src_points": int(len(src_ds)),
            "tgt_points": int(len(tgt_ds)),
            "converged": bool(converged),
            "inliers": int(inliers),
            "period_sec": float(period_sec_local),
            "accept_fit": float(accept_fit_local),
            "accept_rmse": float(accept_rmse_local),
        }
    )
    return (corr if ok else None), info
def check_frame_continuity(points_world: np.ndarray) -> Tuple[bool, Dict[str, Any], Optional[np.ndarray]]:
    """Gate a new world-frame scan by its overlap with the previous one.

    Returns (accept, info, downsampled_points). The frame is accepted when
    the check is disabled, still bootstrapping, the KD-tree query fails, or
    the nearest-neighbour overlap statistics stay within the configured
    thresholds.
    """
    report: Dict[str, Any] = {"enabled": bool(continuity_enabled), "status": "skip"}
    frame = np.asarray(points_world, dtype=np.float32)
    frame_ds = voxel_downsample_np(frame, float(continuity_voxel_m))
    min_pts = int(continuity_min_points)
    if (not continuity_enabled) or frame_ds is None or len(frame_ds) < min_pts:
        report.update({"status": "skip", "reason": "disabled_or_few_points", "points": int(len(frame_ds))})
        return True, report, frame_ds
    if continuity_prev_pts is None or len(continuity_prev_pts) < min_pts:
        # No usable previous frame yet: accept and let the caller seed it.
        report.update({"status": "bootstrap", "points": int(len(frame_ds))})
        return True, report, frame_ds
    try:
        from scipy.spatial import cKDTree
        nn_dist, _ = cKDTree(np.asarray(continuity_prev_pts, dtype=np.float64)).query(
            np.asarray(frame_ds, dtype=np.float64), k=1
        )
    except Exception as e:
        report.update({"status": "skip", "reason": f"tree_error:{e}"})
        return True, report, frame_ds
    finite = np.isfinite(nn_dist)
    if int(np.count_nonzero(finite)) < min_pts:
        report.update({"status": "skip", "reason": "few_matches"})
        return True, report, frame_ds
    dd = nn_dist[finite]
    inlier_ratio = float(np.mean(dd <= float(continuity_match_radius_m)))
    median_d = float(np.median(dd))
    p90_d = float(np.percentile(dd, 90.0))
    ok = bool(
        inlier_ratio >= float(continuity_min_inlier_ratio)
        and median_d <= float(continuity_max_median_m)
        and p90_d <= float(continuity_max_p90_m)
    )
    report.update(
        {
            "status": "ok" if ok else "reject",
            "points": int(len(frame_ds)),
            "inlier_ratio": float(inlier_ratio),
            "median_m": float(median_d),
            "p90_m": float(p90_d),
        }
    )
    return ok, report, frame_ds
def cleanup_map_islands(points_world: np.ndarray) -> Tuple[np.ndarray, Dict[str, Any]]:
    """Drop small disconnected clusters ("islands") from the world map.

    Connected components are found on a voxel-downsampled copy via KD-tree
    radius queries. The largest components are kept (plus, unless strict
    mode is on, any component above the size floor); each full-resolution
    point survives when its nearest downsampled point lies in a kept
    component. Returns (cleaned_points, info).
    """
    cloud = np.asarray(points_world, dtype=np.float32)
    report: Dict[str, Any] = {
        "enabled": bool(cleanup_enabled),
        "status": "skip",
        "before": int(len(cloud)),
        "after": int(len(cloud)),
        "removed": 0,
    }
    if (not cleanup_enabled) or len(cloud) < int(cleanup_min_total_points):
        report["reason"] = "disabled_or_few_points"
        return cloud, report
    sample = voxel_downsample_np(cloud, float(cleanup_voxel_m))
    if len(sample) < int(cleanup_min_cluster_points):
        report["reason"] = "few_downsampled"
        return cloud, report
    try:
        from scipy.spatial import cKDTree
        tree = cKDTree(np.asarray(sample, dtype=np.float64))
    except Exception as e:
        report["reason"] = f"tree_error:{e}"
        return cloud, report
    # Flood-fill connected components over the downsampled sample.
    total = int(len(sample))
    seen = np.zeros((total,), dtype=bool)
    labels = np.full((total,), -1, dtype=np.int32)
    comp_sizes: List[int] = []
    for seed in range(total):
        if seen[seed]:
            continue
        stack = [seed]
        seen[seed] = True
        component: List[int] = []
        while stack:
            cur = int(stack.pop())
            component.append(cur)
            for nb in tree.query_ball_point(sample[cur], r=float(cleanup_cluster_radius_m)):
                nb_i = int(nb)
                if not seen[nb_i]:
                    seen[nb_i] = True
                    stack.append(nb_i)
        label = len(comp_sizes)
        comp_sizes.append(len(component))
        for member in component:
            labels[member] = int(label)
    if not comp_sizes:
        report["reason"] = "no_components"
        return cloud, report
    by_size = sorted(range(len(comp_sizes)), key=lambda c: int(comp_sizes[c]), reverse=True)
    kept = set(by_size[: int(cleanup_keep_largest_n)])
    if not cleanup_strict_largest_only:
        for label, size in enumerate(comp_sizes):
            if int(size) >= int(cleanup_min_cluster_points):
                kept.add(int(label))
    if len(kept) == len(comp_sizes):
        report.update({"status": "ok", "components": int(len(comp_sizes)), "kept_components": int(len(kept))})
        return cloud, report
    # Map original points to nearest downsampled component and keep only selected components.
    _, nearest = tree.query(np.asarray(cloud, dtype=np.float64), k=1)
    keep_mask = np.isin(labels[np.asarray(nearest, dtype=np.int32)], np.asarray(list(kept), dtype=np.int32))
    survivors = cloud[keep_mask]
    report.update(
        {
            "status": "ok",
            "components": int(len(comp_sizes)),
            "kept_components": int(len(kept)),
            "before": int(len(cloud)),
            "after": int(len(survivors)),
            "removed": int(len(cloud) - len(survivors)),
        }
    )
    return survivors, report
def push_loc_state() -> None:
    """Publish the current localization state upward and to diagnostics."""
    snapshot = loc_state.snapshot()
    st("INFO", {"LOC_STATE": snapshot})
    if diagnostics is not None:
        diagnostics.log_state("LOC_STATE", _worker_diag_state())
def push_worker_mode() -> None:
    """Publish the current worker mode flags upward and to diagnostics."""
    payload = {
        "mode": str(worker_mode),
        "mapping": bool(mapping_enabled),
        "localize_only": bool(localize_only_enabled),
    }
    st("INFO", {"MODE": payload})
    if diagnostics is not None:
        diagnostics.log_state("MODE", _worker_diag_state())
def _place_profile_allowed() -> bool:
    """True when place recognition is allowed for the current stability profile."""
    normalized = str(stability_profile).upper().strip()
    return bool(place_recog.profile_allowed(normalized))
# ---------------- localization ----------------
def do_localize(
force: bool = False,
source: str = "MANUAL",
src_override: Optional[np.ndarray] = None,
) -> Optional[Dict[str, Any]]:
nonlocal ref_map_path, last_localize_t, last_localize_points, last_localize_transform, last_localize_confidence
nonlocal ref_pcd_cache, ref_points_cache, ref_nav_points_cache, ref_nav_cache_src_n, ref_cache_path
nonlocal slam_to_ref_transform, slam_to_ref_valid
nonlocal approx_pose_guess_tf, approx_pose_guess_until_t, approx_pose_guess_fail_count, approx_pose_guess_success_count
now = _now()
src_tag = str(source).upper().strip()
use_live_src = src_override is not None
if not loc_cfg.enabled:
if force:
st("ERROR", "LOCALIZE disabled in config.")
loc_state.update(None, now)
push_loc_state()
return None
if not ref_map_path:
if force:
st("ERROR", "LOCALIZE: load a reference map first.")
loc_state.update(None, now)
push_loc_state()
return None
try:
period = max(0.1, float(loc_cfg.period_sec))
if use_live_src:
# Live relocalization should be frequent but not every frame to avoid jitter.
period = min(period, float(loc_live_period_sec))
if (not force) and ((now - last_localize_t) < period):
return None
req_loc = int(loc_cfg.min_points_for_localize)
if use_live_src:
cur = np.asarray(src_override, dtype=np.float32)
cur_n = int(len(cur))
req_live = max(80, min(req_loc, 1000))
if cur_n < req_live:
if force:
st("ERROR", f"LOCALIZE: not enough live points ({cur_n}/{req_live}).")
loc_state.update(None, now)
push_loc_state()
return None
else:
cur = stable.get_save_points()
cur_n = 0 if cur is None else int(len(cur))
if cur is None or cur_n < req_loc:
if force:
st("ERROR", f"LOCALIZE: not enough stable points ({cur_n}/{req_loc}). Start mapping first.")
loc_state.update(None, now)
push_loc_state()
return None
if (not force) and ((cur_n - int(last_localize_points)) < int(loc_cfg.min_new_points)):
return None
import open3d as o3d
if ref_pcd_cache is None or ref_cache_path != ref_map_path:
ref_pcd_cache = o3d.io.read_point_cloud(ref_map_path)
if hasattr(ref_pcd_cache, "points"):
ref_points_cache = np.asarray(ref_pcd_cache.points, dtype=np.float32).copy()
else:
ref_points_cache = np.asarray(ref_pcd_cache, dtype=np.float32).reshape((-1, 3)).copy()
ref_nav_points_cache = None
ref_nav_cache_src_n = 0
ref_cache_path = ref_map_path
if ref_points_cache is None:
if hasattr(ref_pcd_cache, "points"):
ref_points_cache = np.asarray(ref_pcd_cache.points, dtype=np.float32).copy()
else:
ref_points_cache = np.asarray(ref_pcd_cache, dtype=np.float32).reshape((-1, 3)).copy()
ref_nav_points_cache = None
ref_nav_cache_src_n = 0
ref_np_full = np.asarray(ref_points_cache, dtype=np.float32)
if len(ref_np_full) < int(loc_cfg.min_points_for_localize):
if force:
st("ERROR", "LOCALIZE: reference map has too few points.")
loc_state.update(None, now)
push_loc_state()
return None
if bool(place_cfg.enabled) and _place_profile_allowed():
if (not bool(place_recog.index_ready)) or (str(place_recog.ref_path or "") != str(ref_map_path)):
pr_info = place_recog.build(ref_np_full, ref_map_path)
st("INFO", {"PLACE_RECOG": pr_info})
cur_np_full = np.asarray(cur, dtype=np.float32)
if cur_np_full.ndim != 2 or cur_np_full.shape[1] != 3:
loc_state.update(None, now)
push_loc_state()
return None
# Once localized, keep tracking in a local window around the last pose
# to avoid jumping to similar structures elsewhere in a large map.
tracking_locked = bool(
use_live_src
and slam_to_ref_valid
and src_tag not in ("ON_LOAD", "RECOVERY")
and str(loc_state.state).upper().strip() in ("TRACKING", "DEGRADED")
)
ref_np_track = np.asarray(ref_np_full, dtype=np.float32)
tracking_window_used = False
tracking_window_reason = "full_ref"
if tracking_locked and loc_tracking_window_enabled and len(ref_np_track) > 0:
try:
center = np.asarray(last_localize_transform[:3, 3], dtype=np.float32)
if np.isfinite(center).all():
rel = ref_np_track - center
d2 = np.einsum("ij,ij->i", rel, rel)
win2 = float(loc_tracking_window_radius_m) * float(loc_tracking_window_radius_m)
keep = d2 <= win2
n_keep = int(np.count_nonzero(keep))
if n_keep >= int(loc_tracking_window_min_ref_points):
ref_np_track = ref_np_track[keep]
tracking_window_used = True
tracking_window_reason = "window"
else:
tracking_window_reason = "window_too_few"
except Exception:
tracking_window_reason = "window_error"
if len(ref_np_track) > int(loc_tracking_window_max_ref_points):
step = max(2, int(np.ceil(float(len(ref_np_track)) / float(loc_tracking_window_max_ref_points))))
ref_np_track = ref_np_track[::step]
if tracking_window_used:
tracking_window_reason = "window_decimated"
else:
tracking_window_reason = "full_ref_decimated"
voxel = float(loc_cfg.voxel_localize)
if use_live_src:
voxel = min(voxel, 0.16)
max_corr = max(0.5, float(loc_cfg.max_corr_mult) * voxel)
if use_live_src:
max_corr = min(max_corr, 0.95)
accept_fit_target = float(loc_cfg.accept_fitness)
accept_rmse_target = float(loc_cfg.accept_rmse)
guess_bootstrap = bool(
approx_pose_guess_tf is not None
and float(now) <= float(approx_pose_guess_until_t)
and (not slam_to_ref_valid)
)
guess_strict_local_active = bool(
guess_bootstrap and (not slam_to_ref_valid) and bool(loc_guess_strict_local)
)
if use_live_src:
# Slightly stricter acceptance in live mode to reduce wobble while turning.
accept_fit_target = max(0.34, min(0.98, accept_fit_target + 0.08))
accept_rmse_target = min(accept_rmse_target, 0.26)
if guess_bootstrap:
# If user provided approximate location, allow slight environmental changes.
max_corr = min(1.15, max_corr * 1.15)
accept_fit_target = max(0.22, accept_fit_target - 0.04)
accept_rmse_target = max(accept_rmse_target, 0.34)
init = np.array(last_localize_transform, dtype=np.float64, copy=True)
approx_guess_used = False
approx_guess_active = bool(guess_bootstrap)
if approx_guess_active and not slam_to_ref_valid:
init = np.asarray(approx_pose_guess_tf, dtype=np.float64).copy()
approx_guess_used = True
# Bootstrap with local reference crop around approximate click.
try:
center_guess = np.asarray(init[:3, 3], dtype=np.float32)
rel = ref_np_track - center_guess.reshape((1, 3))
d2 = np.einsum("ij,ij->i", rel, rel)
local_r2 = float(loc_guess_local_radius_m) * float(loc_guess_local_radius_m)
wide_r2 = float(loc_guess_crop_radius_m) * float(loc_guess_crop_radius_m)
keep_local = d2 <= local_r2
n_local = int(np.count_nonzero(keep_local))
min_local_n = int(loc_tracking_window_min_ref_points)
if n_local >= min_local_n:
ref_np_track = ref_np_track[keep_local]
tracking_window_used = True
tracking_window_reason = "approx_local_crop"
else:
keep_wide = d2 <= wide_r2
n_wide = int(np.count_nonzero(keep_wide))
min_wide_n = max(120, min_local_n // 2) if guess_strict_local_active else min_local_n
if n_wide >= min_wide_n:
ref_np_track = ref_np_track[keep_wide]
tracking_window_used = True
tracking_window_reason = "approx_wide_crop"
else:
tracking_window_reason = "approx_crop_too_few"
except Exception:
tracking_window_reason = "approx_crop_error"
src_d = voxel_downsample_np(cur_np_full, voxel)
tgt_d = voxel_downsample_np(ref_np_track, voxel)
if len(src_d) < 40 or len(tgt_d) < 40:
if force:
st("ERROR", "LOCALIZE: not enough downsampled points for ICP.")
loc_state.update(None, now)
push_loc_state()
return None
# Coarse-to-fine ICP improves convergence when the initial offset is large.
coarse_voxel = max(voxel, 0.35)
src_c = voxel_downsample_np(cur_np_full, coarse_voxel)
coarse_tgt_src = ref_np_track if (tracking_locked or approx_guess_used) else ref_np_full
tgt_c = voxel_downsample_np(coarse_tgt_src, coarse_voxel)
def _run_local_icp_chain(init_tf: np.ndarray) -> Dict[str, Any]:
seed = np.asarray(init_tf, dtype=np.float64).copy()
if len(src_c) > 40 and len(tgt_c) > 40:
coarse = _icp_point_to_point_np(
src_c,
tgt_c,
seed,
max_corr_m=max_corr * 1.6,
max_iter=max(15, int(loc_cfg.icp_max_iter) // 2),
min_corr=30,
)
seed = np.array(coarse.get("transform", seed), dtype=np.float64, copy=True)
return _icp_point_to_point_np(
src_d,
tgt_d,
seed,
max_corr_m=max_corr,
max_iter=max(10, int(loc_cfg.icp_max_iter)),
min_corr=35,
)
reg = _run_local_icp_chain(init)
yaw_hypotheses = 0
place_query: Dict[str, Any] = {"ok": False, "reason": "disabled"}
if approx_guess_active and not slam_to_ref_valid and float(loc_guess_yaw_step_deg) < 180.0:
base_yaw = float(_yaw_deg_from_tf(init))
yaw_offsets = np.arange(-180.0, 180.0, float(loc_guess_yaw_step_deg), dtype=np.float64)
best_reg = dict(reg)
for yaw_off in yaw_offsets:
if abs(float(yaw_off)) < 1e-6:
continue
cand = np.asarray(init, dtype=np.float64).copy()
rz = _tf_from_xyzyaw(0.0, 0.0, 0.0, base_yaw + float(yaw_off))
cand[:3, :3] = np.asarray(rz[:3, :3], dtype=np.float64)
rr = _run_local_icp_chain(cand)
yaw_hypotheses += 1
better = (float(rr.get("fitness", 0.0)) > float(best_reg.get("fitness", 0.0))) or (
float(rr.get("fitness", 0.0)) == float(best_reg.get("fitness", 0.0))
and float(rr.get("rmse", 9e9)) < float(best_reg.get("rmse", 9e9))
)
if better:
best_reg = rr
reg = dict(best_reg)
used_recovery = False
def accepted(rr: Dict[str, Any]) -> bool:
return bool(
float(rr.get("fitness", 0.0)) >= float(accept_fit_target)
and float(rr.get("rmse", 9e9)) <= float(accept_rmse_target)
)
# Global relocalization fallback:
# only for bootstrap/lost states, not for normal LIVE tracking.
allow_global_reloc = bool(
(not slam_to_ref_valid)
or src_tag in ("ON_LOAD", "RECOVERY")
or str(loc_state.state).upper().strip() in ("LOST", "RECOVERY")
)
global_reloc_blocked_by_guess = False
if guess_strict_local_active and (not slam_to_ref_valid):
if int(approx_pose_guess_fail_count) < int(loc_guess_global_fallback_after_failures):
allow_global_reloc = False
global_reloc_blocked_by_guess = True
if (not accepted(reg)) and allow_global_reloc:
src_np = np.asarray(src_d, dtype=np.float64)
# Search against the full reference map to recover from wrong initial position.
tgt_global = voxel_downsample_np(ref_np_full, max(float(voxel), 0.24))
tgt_np = np.asarray(tgt_global, dtype=np.float64)
if len(src_np) > 20 and len(tgt_np) > 40:
c_src = np.mean(src_np, axis=0)
anchors = np.zeros((0, 3), dtype=np.float64)
use_place_retrieval = bool(place_cfg.enabled and _place_profile_allowed())
if use_place_retrieval:
approx_center = None
if approx_pose_guess_tf is not None:
try:
approx_center = np.asarray(approx_pose_guess_tf[:3, 3], dtype=np.float32)
except Exception:
approx_center = None
place_query = place_recog.query(cur_np_full, approx_center=approx_center)
if bool(place_query.get("ok", False)):
anchors = np.asarray(
[c.get("center", [0.0, 0.0, 0.0]) for c in place_query.get("candidates", [])],
dtype=np.float64,
).reshape((-1, 3))
if len(anchors) == 0:
anchors = voxel_downsample_np(
np.asarray(tgt_np, dtype=np.float32),
max(float(loc_global_reloc_anchor_voxel_m), float(voxel) * 3.0),
)
anchors = np.asarray(anchors, dtype=np.float64)
if len(anchors) == 0:
anchors = np.asarray([np.mean(tgt_np, axis=0)], dtype=np.float64)
if len(anchors) > int(loc_global_reloc_max_anchors):
step = max(1, int(np.ceil(float(len(anchors)) / float(loc_global_reloc_max_anchors))))
anchors = anchors[::step][: int(loc_global_reloc_max_anchors)]
yaw_step = float(loc_global_reloc_yaw_step_deg)
yaw_candidates = list(np.arange(-180.0, 180.0, yaw_step, dtype=np.float64))
if 0.0 not in yaw_candidates:
yaw_candidates.append(0.0)
best = dict(reg)
for anchor in anchors:
anchor_v = np.asarray(anchor, dtype=np.float64)
for yaw_deg in yaw_candidates:
yaw = np.deg2rad(float(yaw_deg))
cy = float(np.cos(yaw))
sy = float(np.sin(yaw))
rz = np.array(
[[cy, -sy, 0.0], [sy, cy, 0.0], [0.0, 0.0, 1.0]],
dtype=np.float64,
)
init_global = np.eye(4, dtype=np.float64)
init_global[:3, :3] = rz
init_global[:3, 3] = anchor_v - (rz @ c_src)
rr = _icp_point_to_point_np(
src_np,
tgt_np,
init_global,
max_corr_m=float(max_corr) * float(loc_global_reloc_corr_mult),
max_iter=int(loc_global_reloc_coarse_iter),
min_corr=int(loc_global_reloc_min_corr),
)
better = (float(rr.get("fitness", 0.0)) > float(best.get("fitness", 0.0))) or (
float(rr.get("fitness", 0.0)) == float(best.get("fitness", 0.0))
and float(rr.get("rmse", 9e9)) < float(best.get("rmse", 9e9))
)
if better:
best = rr
# Refine the best global candidate.
best_tf = np.asarray(best.get("transform", np.eye(4, dtype=np.float64)), dtype=np.float64)
best_refined = _icp_point_to_point_np(
src_np,
tgt_np,
best_tf,
max_corr_m=float(max_corr) * max(1.2, float(loc_global_reloc_corr_mult) * 0.8),
max_iter=int(loc_global_reloc_refine_iter),
min_corr=max(20, int(loc_global_reloc_min_corr)),
)
if (float(best_refined.get("fitness", 0.0)) > float(best.get("fitness", 0.0))) or (
float(best_refined.get("fitness", 0.0)) == float(best.get("fitness", 0.0))
and float(best_refined.get("rmse", 9e9)) < float(best.get("rmse", 9e9))
):
best = best_refined
if (float(best.get("fitness", 0.0)) > float(reg.get("fitness", 0.0))) or (
float(best.get("fitness", 0.0)) == float(reg.get("fitness", 0.0))
and float(best.get("rmse", 9e9)) < float(reg.get("rmse", 9e9))
):
reg = best
used_recovery = True
reg_tf = np.asarray(reg.get("transform", np.eye(4, dtype=np.float64)), dtype=np.float64)
fit = float(reg.get("fitness", 0.0))
rmse = float(reg.get("rmse", 9e9))
acc = accepted(reg)
temporal_guard_reject = False
bootstrap_pending = False
bootstrap_ok = False
guess_offset_m = -1.0
if approx_guess_active and (approx_pose_guess_tf is not None):
try:
guess_center = np.asarray(approx_pose_guess_tf[:3, 3], dtype=np.float64)
guess_offset_m = float(np.linalg.norm(np.asarray(reg_tf[:3, 3], dtype=np.float64) - guess_center))
except Exception:
guess_offset_m = -1.0
if guess_strict_local_active and guess_offset_m >= 0.0:
if guess_offset_m > float(loc_guess_max_start_offset_m):
acc = False
bidir = _bidir_match_stats(src_d, tgt_d, reg_tf, max_corr_m=max_corr)
if bool(loc_bidir_enabled) and bool(bidir.get("ok", False)):
if (
float(bidir.get("bidir_inlier", 0.0)) < float(loc_bidir_min_ratio)
or float(bidir.get("bidir_rmse", 9e9)) > float(loc_bidir_max_rmse)
):
acc = False
if use_live_src and slam_to_ref_valid and src_tag not in ("RECOVERY", "ON_LOAD"):
dtr_m, drot_deg = _tf_delta(last_localize_transform, reg_tf)
max_step_t = float(loc_live_track_max_step_trans_m) if (localize_only_enabled and loc_live_track_strict) else 0.35
max_step_r = float(loc_live_track_max_step_rot_deg) if (localize_only_enabled and loc_live_track_strict) else 22.0
if (dtr_m > max_step_t or drot_deg > max_step_r) and (fit < (accept_fit_target + 0.10)):
acc = False
temporal_guard_reject = True
if use_live_src and slam_to_ref_valid and src_tag not in ("RECOVERY", "ON_LOAD") and acc:
# Keep tracking lock conservative to prevent map-frame drift.
if localize_only_enabled and loc_live_track_strict:
live_fit_min = max(float(loc_live_track_min_fit), float(accept_fit_target) + 0.02)
else:
live_fit_min = max(0.38, float(accept_fit_target) + 0.04)
if fit < live_fit_min:
acc = False
temporal_guard_reject = True
elif bool(loc_bidir_enabled) and bool(bidir.get("ok", False)):
if localize_only_enabled and loc_live_track_strict:
live_bidir_min = max(float(loc_live_track_min_bidir), float(loc_bidir_min_ratio))
else:
live_bidir_min = max(0.24, float(loc_bidir_min_ratio) + 0.06)
if float(bidir.get("bidir_inlier", 0.0)) < live_bidir_min:
acc = False
temporal_guard_reject = True
if approx_guess_active and (not slam_to_ref_valid):
bootstrap_ok = bool(acc)
if bootstrap_ok and (fit < float(loc_guess_bootstrap_min_fitness)):
bootstrap_ok = False
if bootstrap_ok and bool(bidir.get("ok", False)):
if float(bidir.get("bidir_inlier", 0.0)) < float(loc_guess_bootstrap_min_bidir):
bootstrap_ok = False
if bootstrap_ok and guess_offset_m >= 0.0:
if guess_offset_m > float(loc_guess_max_start_offset_m):
bootstrap_ok = False
if bootstrap_ok:
approx_pose_guess_success_count += 1
# Seed next bootstrap attempt from this best candidate.
loc_frames.last_alignment = np.array(reg_tf, dtype=np.float64, copy=True)
_sync_loc_vars_from_service()
if int(approx_pose_guess_success_count) < int(loc_guess_bootstrap_min_hits):
bootstrap_pending = True
acc = False
else:
approx_pose_guess_success_count = 0
fit_floor = max(1e-6, float(loc_state_cfg.lost_fitness))
fit_span = max(1e-6, float(accept_fit_target) - fit_floor)
fit_score = float(np.clip((fit - fit_floor) / fit_span, 0.0, 1.0))
rmse_cap = max(1e-6, float(accept_rmse_target) * 1.5)
rmse_score = float(np.clip((rmse_cap - rmse) / rmse_cap, 0.0, 1.0))
confidence = float(np.clip(0.62 * fit_score + 0.38 * rmse_score, 0.0, 1.0))
if acc:
confidence = max(confidence, 0.75)
if acc and use_live_src and slam_to_ref_valid and src_tag != "RECOVERY":
# Low-pass localization transform in live mode to reduce visual map shake.
alpha = 0.45 if fit >= (accept_fit_target + 0.08) else 0.30
reg_tf = _blend_rigid_tf(last_localize_transform, reg_tf, alpha_t=alpha, alpha_r=alpha)
sigma_xy = float(max(0.02, rmse * (1.3 - fit_score)))
sigma_z = float(max(0.03, rmse * (1.6 - fit_score)))
sigma_yaw = float(max(0.02, (1.0 - fit_score) * 0.35))
result: Dict[str, Any] = {
"fitness": fit,
"rmse": rmse,
"max_corr": float(max_corr),
"accepted": acc,
"transform": np.array(reg_tf, dtype=np.float64),
"source": src_tag,
"src_points": int(len(src_d)),
"ref_points": int(len(tgt_d)),
"ref_points_full": int(len(ref_np_full)),
"tracking_window_used": bool(tracking_window_used),
"tracking_window_reason": str(tracking_window_reason),
"recovery_used": bool(used_recovery),
"temporal_guard_reject": bool(temporal_guard_reject),
"yaw_hypotheses": int(yaw_hypotheses),
"confidence": float(confidence),
"covariance_diag": [sigma_xy, sigma_xy, sigma_z, sigma_yaw, sigma_yaw, sigma_yaw],
"live_only": bool(use_live_src),
"global_reloc_allowed": bool(allow_global_reloc),
"global_reloc_blocked_by_guess": bool(global_reloc_blocked_by_guess),
"bidir_ok": bool(bidir.get("ok", False)),
"inlier_live_to_ref": float(bidir.get("live_to_ref_inlier", 0.0)),
"inlier_ref_to_live": float(bidir.get("ref_to_live_inlier", 0.0)),
"inlier_bidir": float(bidir.get("bidir_inlier", 0.0)),
"bidir_rmse": float(bidir.get("bidir_rmse", 9e9)),
"approx_guess_used": bool(approx_guess_used),
"approx_guess_fail_count": int(approx_pose_guess_fail_count),
"approx_guess_success_count": int(approx_pose_guess_success_count),
"approx_guess_offset_m": float(guess_offset_m),
"bootstrap_pending": bool(bootstrap_pending),
"place_retrieval_ok": bool(place_query.get("ok", False)),
"place_retrieval_reason": str(place_query.get("reason", "na")),
"place_retrieval_backend": str(place_query.get("backend", place_recog.backend_active)),
"place_retrieval_top_score": float(place_query.get("top_score", 0.0)),
"place_retrieval_candidates": int(len(place_query.get("candidates", []))),
}
last_localize_t = now
if not use_live_src:
last_localize_points = cur_n
if result["accepted"]:
prev_tf = np.array(slam_to_ref_transform, dtype=np.float64, copy=True) if slam_to_ref_valid else None
loc_frames.set_reference_alignment(reg_tf, confidence)
_sync_loc_vars_from_service()
if use_live_src and prev_tf is not None and submap_mode_enabled and submap_mapper.has_points:
try:
delta_tf = np.asarray(slam_to_ref_transform, dtype=np.float64) @ np.linalg.inv(prev_tf)
dt_m, dr_deg = _tf_delta(np.eye(4, dtype=np.float64), delta_tf)
if dt_m > 1.20 or dr_deg > 45.0:
# Large relocalization jump: reset local/global buffer to avoid ghost structures.
submap_mapper.reset()
elif dt_m > 1e-4 or dr_deg > 1e-3:
submap_mapper.apply_correction(delta_tf)
except Exception as _sub_exc:
st("WARN", f"[submap] correction after relocalization failed: {_sub_exc}")
approx_pose_guess_tf = None
approx_pose_guess_until_t = 0.0
approx_pose_guess_fail_count = 0
approx_pose_guess_success_count = 0
session_store.record_success(
ref_map_path,
last_localize_transform,
float(result["fitness"]),
float(result["rmse"]),
)
else:
if approx_guess_active and (not slam_to_ref_valid):
if not bootstrap_pending:
approx_pose_guess_fail_count += 1
loc_frames.update_confidence(max(0.0, min(1.0, loc_frames.confidence * 0.85)))
_sync_loc_vars_from_service()
safety.mark_localization(bool(result["accepted"]), now)
loc_state.update(result, now)
result["state"] = str(loc_state.state)
st("INFO", {"LOCALIZE": result})
push_loc_state()
return result
except Exception as e:
safety.mark_localization(False, now)
loc_state.update(None, now)
push_loc_state()
if diagnostics is not None:
diagnostics.log_exception("LOCALIZE failed", e, _worker_diag_state())
st("ERROR", f"LOCALIZE failed: {e}")
return None
def maybe_auto_localize(src_live: Optional[np.ndarray] = None):
    """Run a periodic localization pass against the reference map and, when
    the localization state machine reports tracking loss, force a RECOVERY
    localization attempt."""
    if not (loc_cfg.enabled and ref_map_path):
        return
    live_pass = src_live is not None and localize_only_enabled
    if live_pass:
        do_localize(force=False, source="LIVE_ONLY", src_override=src_live)
    else:
        do_localize(force=False, source="AUTO")
    now_t = _now()
    if loc_state.should_recover(now_t):
        loc_state.enter_recovery(now_t)
        push_loc_state()
        recovery_src = src_live if localize_only_enabled else None
        do_localize(force=True, source="RECOVERY", src_override=recovery_src)
def apply_density_mode(mode: str):
nonlocal density_mode, live_stride
m = str(mode).upper().strip()
if m == "LOW":
density_mode = "LOW"
live_stride = max(2, base_stride * 4)
elif m == "HIGH":
density_mode = "HIGH"
live_stride = 1
else:
density_mode = "MEDIUM"
live_stride = base_stride
st("INFO", {"DENSITY": {"mode": density_mode, "stride": int(live_stride)}})
def _rebuild_filter_runtime(keep_existing: bool = True) -> None:
nonlocal filt
seed_pts = None
if keep_existing and stable is not None:
try:
seed_pts = stable.get_points()
except Exception:
seed_pts = None
vf_cfg = VFilterCfg(
voxel_size=float(runtime_filter_voxel),
hit_threshold=int(runtime_filter_hit_threshold),
decay_seconds=float(runtime_filter_decay),
max_voxels=int(filt_cfg.max_voxels),
)
new_f = VoxelPersistenceFilter(vf_cfg)
if seed_pts is not None and len(seed_pts) > 0 and hasattr(new_f, "seed_stable_points"):
try:
new_f.seed_stable_points(
seed_pts,
now=_now(),
hit_count=int(runtime_filter_hit_threshold),
clear_existing=True,
)
except Exception:
pass
filt = new_f
def apply_filter_tuning(payload: Any) -> None:
nonlocal runtime_filter_voxel, runtime_filter_hit_threshold, runtime_filter_decay
if not isinstance(payload, dict):
st("WARN", "SET_FILTER_TUNING ignored: payload must be dict.")
return
try:
runtime_filter_voxel = max(0.02, float(payload.get("voxel_size", runtime_filter_voxel)))
except Exception:
pass
try:
runtime_filter_hit_threshold = max(1, int(payload.get("hit_threshold", runtime_filter_hit_threshold)))
except Exception:
pass
try:
runtime_filter_decay = max(0.5, float(payload.get("decay_seconds", runtime_filter_decay)))
except Exception:
pass
_rebuild_filter_runtime(keep_existing=True)
st(
"INFO",
{
"FILTER_TUNING": {
"voxel_size": float(runtime_filter_voxel),
"hit_threshold": int(runtime_filter_hit_threshold),
"decay_seconds": float(runtime_filter_decay),
"profile": str(stability_profile),
}
},
)
def apply_stability_profile(profile_name: Any) -> None:
nonlocal stability_profile
p = str(profile_name or "BALANCED").upper().strip()
presets = {
# Keep map history much longer during exploration so points do not fade while moving away.
"MAP_NEW": {"hit_threshold": 2, "decay_seconds": 1800.0, "voxel_size": 0.10, "recommended_density": "MEDIUM"},
"LOCALIZE_MAP": {"hit_threshold": 3, "decay_seconds": 45.0, "voxel_size": 0.20, "recommended_density": "MEDIUM"},
"LIVE_NAV_MAP": {"hit_threshold": 2, "decay_seconds": 18.0, "voxel_size": 0.20, "recommended_density": "MEDIUM"},
"LIVE_NAV_NO_MAP": {"hit_threshold": 2, "decay_seconds": 14.0, "voxel_size": 0.18, "recommended_density": "HIGH"},
"QUICK_DEMO": {"hit_threshold": 2, "decay_seconds": 8.0, "voxel_size": 0.20, "recommended_density": "HIGH"},
"BALANCED": {
"voxel_size": float(max(0.02, filt_cfg.voxel_size)),
"hit_threshold": int(max(1, filt_cfg.hits_threshold)),
"decay_seconds": float(max(0.5, filt_cfg.strict_sec if filt_cfg.use_strict else filt_cfg.window_sec)),
"recommended_density": "MEDIUM",
},
}
if p not in presets:
st("WARN", f"Unknown stability profile: {p}.")
return
stability_profile = p
preset = presets[p]
apply_filter_tuning(
{
"voxel_size": float(preset.get("voxel_size", runtime_filter_voxel)),
"hit_threshold": int(preset["hit_threshold"]),
"decay_seconds": float(preset["decay_seconds"]),
}
)
apply_density_mode(str(preset.get("recommended_density", density_mode)))
st(
"INFO",
{
"WORKFLOW_PROFILE": {
"name": str(stability_profile),
"recommended_density": str(preset.get("recommended_density", density_mode)),
}
},
)
def _submap_profile_allowed() -> bool:
    """Return True when the active stability profile appears in the submap
    config's apply_profiles list (case- and whitespace-insensitive)."""
    wanted = str(stability_profile).upper().strip()
    return wanted in {str(x).upper().strip() for x in tuple(submap_cfg.apply_profiles)}
def apply_min_stable_points(value: Any) -> None:
    """Set the minimum stable-point count shared by map saving, localization,
    and nav export; the value is clamped to [50, 5_000_000]."""
    try:
        count = int(value)
    except Exception:
        st("WARN", f"SET_MIN_STABLE_POINTS ignored: invalid value {value!r}")
        return
    count = int(np.clip(count, 50, 5_000_000))
    # A single threshold drives saving, localization, and nav export alike.
    map_cfg.min_points_to_save = count
    loc_cfg.min_points_for_localize = count
    nav_cfg.min_points = count
    try:
        nav_exporter.update_config(nav_cfg)
    except Exception:
        pass
    st("INFO", {"MIN_STABLE_POINTS": {"value": int(count)}})
def apply_loop_closure_mode(enabled: Any) -> None:
    """Toggle loop-closure detection on both the config and the backend."""
    flag = _as_bool(enabled, False)
    loop_cfg.enabled = bool(flag)
    loop_backend.cfg.enabled = bool(flag)
    st("INFO", {"LOOP_MODE": {"enabled": bool(flag)}})
def apply_loc_state_machine_mode(enabled: Any) -> None:
    """Enable/disable the localization state machine and broadcast the new
    state to listeners."""
    flag = _as_bool(enabled, True)
    loc_state_cfg.enabled = bool(flag)
    loc_state.set_enabled(bool(flag))
    st("INFO", {"LOC_MACHINE": {"enabled": bool(flag)}})
    push_loc_state()
def apply_submap_mode(payload: Any) -> None:
nonlocal submap_cfg, submap_mapper, submap_mode_enabled
merged = {
"enabled": bool(submap_mode_enabled),
"local_window_frames": int(submap_cfg.local_window_frames),
"local_voxel_m": float(submap_cfg.local_voxel_m),
"global_voxel_m": float(submap_cfg.global_voxel_m),
"merge_period_sec": float(submap_cfg.merge_period_sec),
"merge_min_translation_m": float(submap_cfg.merge_min_translation_m),
"merge_min_rotation_deg": float(submap_cfg.merge_min_rotation_deg),
"max_global_points": int(submap_cfg.max_global_points),
"display_max_points": int(submap_cfg.display_max_points),
"apply_profiles": list(submap_cfg.apply_profiles),
}
if isinstance(payload, dict):
merged.update(payload)
elif payload is not None:
merged["enabled"] = _as_bool(payload, bool(submap_mode_enabled))
new_cfg = SubmapConfig.from_dict(merged)
keep_pts = True
if isinstance(payload, dict):
keep_pts = _as_bool(payload.get("keep_points", True), True)
submap_cfg = new_cfg
submap_mode_enabled = bool(new_cfg.enabled)
submap_mapper.set_config(new_cfg, keep_points=bool(keep_pts and submap_mode_enabled))
if not submap_mode_enabled:
submap_mapper.reset()
st(
"INFO",
{
"SUBMAP_MODE": {
"enabled": bool(submap_mode_enabled),
"profile_allowed": bool(_submap_profile_allowed()),
"apply_profiles": list(submap_cfg.apply_profiles),
"local_window_frames": int(submap_cfg.local_window_frames),
"local_voxel_m": float(submap_cfg.local_voxel_m),
"global_voxel_m": float(submap_cfg.global_voxel_m),
}
},
)
def apply_approx_pose(payload: Any) -> None:
nonlocal approx_pose_guess_tf, approx_pose_guess_until_t
nonlocal approx_pose_guess_fail_count, approx_pose_guess_success_count
nonlocal last_localize_transform
if payload is None:
approx_pose_guess_tf = None
approx_pose_guess_until_t = 0.0
approx_pose_guess_fail_count = 0
approx_pose_guess_success_count = 0
st("INFO", {"APPROX_POSE": {"set": False}})
return
if not isinstance(payload, dict):
st("WARN", "SET_APPROX_POSE ignored: payload must be dict.")
return
try:
x = float(payload.get("x"))
y = float(payload.get("y"))
except Exception:
st("WARN", "SET_APPROX_POSE ignored: x,y are required.")
return
z = float(payload.get("z", loc_guess_default_z_m))
yaw_deg = payload.get("yaw_deg", None)
if yaw_deg is None:
yaw_deg = _yaw_deg_from_tf(last_localize_transform)
else:
yaw_deg = float(yaw_deg)
tf = _tf_from_xyzyaw(x, y, z, yaw_deg)
approx_pose_guess_tf = np.asarray(tf, dtype=np.float64)
approx_pose_guess_until_t = float(_now() + float(loc_guess_ttl_sec))
approx_pose_guess_fail_count = 0
approx_pose_guess_success_count = 0
st(
"INFO",
{
"APPROX_POSE": {
"set": True,
"x": float(x),
"y": float(y),
"z": float(z),
"yaw_deg": float(yaw_deg),
"ttl_sec": float(loc_guess_ttl_sec),
}
},
)
def apply_autosave(payload: Any) -> None:
nonlocal autosave_enabled, autosave_interval_sec, autosave_base_name
if not isinstance(payload, dict):
st("WARN", "SET_AUTOSAVE ignored: payload must be dict.")
return
autosave_enabled = _as_bool(payload.get("enabled", autosave_enabled), autosave_enabled)
try:
autosave_interval_sec = max(5.0, float(payload.get("interval_sec", autosave_interval_sec)))
except Exception:
pass
base_in = str(payload.get("base_name", autosave_base_name)).strip()
if base_in:
autosave_base_name = base_in
st(
"INFO",
{
"AUTOSAVE": {
"enabled": bool(autosave_enabled),
"interval_sec": float(autosave_interval_sec),
"base_name": str(autosave_base_name),
}
},
)
def apply_nav_config(payload: Any) -> None:
nonlocal nav_cfg, nav_rt_cfg, nav_runtime, nav_global_planner
if not isinstance(payload, dict):
st("WARN", "SET_NAV_CONFIG ignored: payload must be dict.")
return
candidate = nav_cfg.patched(payload)
if float(candidate.z_max_m) <= float(candidate.z_min_m):
st("WARN", "SET_NAV_CONFIG ignored: z_max_m must be greater than z_min_m.")
return
nav_cfg = candidate
nav_exporter.update_config(nav_cfg)
nav_rt_cfg = nav_rt_cfg.patched(
{
"resolution_m": float(nav_cfg.resolution_m),
"z_min_m": float(nav_cfg.z_min_m),
"z_max_m": float(nav_cfg.z_max_m),
"padding_m": float(nav_cfg.padding_m),
"inflation_radius_m": float(nav_cfg.inflation_radius_m),
}
)
nav_runtime = LiveCostmapRuntime(nav_rt_cfg)
nav_global_planner = GlobalAStarPlanner(blocked_cost=int(nav_rt_cfg.blocked_cost))
st(
"INFO",
{
"NAV_CONFIG": {
"min_points": int(nav_cfg.min_points),
"resolution_m": float(nav_cfg.resolution_m),
"z_min_m": float(nav_cfg.z_min_m),
"z_max_m": float(nav_cfg.z_max_m),
"inflation_radius_m": float(nav_cfg.inflation_radius_m),
"padding_m": float(nav_cfg.padding_m),
}
},
)
def apply_map_quality_config(payload: Any) -> None:
nonlocal map_quality_cfg, map_quality
if not isinstance(payload, dict):
st("WARN", "SET_MAP_QUALITY ignored: payload must be dict.")
return
merged = {
"enabled": map_quality_cfg.enabled,
"near_min_range_m": map_quality_cfg.near_min_range_m,
"ray_consistency_enabled": map_quality_cfg.ray_consistency_enabled,
"ray_bin_deg": map_quality_cfg.ray_bin_deg,
"ray_elev_bin_deg": map_quality_cfg.ray_elev_bin_deg,
"ray_max_behind_m": map_quality_cfg.ray_max_behind_m,
"ray_keep_n": map_quality_cfg.ray_keep_n,
"world_z_clip_enabled": map_quality_cfg.world_z_clip_enabled,
"world_z_min_m": map_quality_cfg.world_z_min_m,
"world_z_max_m": map_quality_cfg.world_z_max_m,
"outlier_filter_enabled": map_quality_cfg.outlier_filter_enabled,
"outlier_voxel_m": map_quality_cfg.outlier_voxel_m,
"outlier_min_points": map_quality_cfg.outlier_min_points,
}
merged.update(payload)
map_quality_cfg = IndoorMapQualityConfig.from_dict(merged)
map_quality = IndoorMapQualityFilter(map_quality_cfg)
st(
"INFO",
{
"MAP_QUALITY": {
"enabled": bool(map_quality_cfg.enabled),
"near_min_range_m": float(map_quality_cfg.near_min_range_m),
"ray_consistency_enabled": bool(map_quality_cfg.ray_consistency_enabled),
"ray_bin_deg": float(map_quality_cfg.ray_bin_deg),
"ray_elev_bin_deg": float(map_quality_cfg.ray_elev_bin_deg),
"ray_max_behind_m": float(map_quality_cfg.ray_max_behind_m),
"ray_keep_n": int(map_quality_cfg.ray_keep_n),
"world_z_clip_enabled": bool(map_quality_cfg.world_z_clip_enabled),
"world_z_min_m": float(map_quality_cfg.world_z_min_m),
"world_z_max_m": float(map_quality_cfg.world_z_max_m),
"outlier_filter_enabled": bool(map_quality_cfg.outlier_filter_enabled),
"outlier_voxel_m": float(map_quality_cfg.outlier_voxel_m),
"outlier_min_points": int(map_quality_cfg.outlier_min_points),
}
},
)
def apply_nav_runtime_config(payload: Any) -> None:
nonlocal nav_rt_cfg, nav_runtime, nav_global_planner, nav_path_world, nav_last_plan_t, nav_last_update_t
if not isinstance(payload, dict):
st("WARN", "SET_NAV_RUNTIME ignored: payload must be dict.")
return
nav_rt_cfg = nav_rt_cfg.patched(payload)
nav_runtime = LiveCostmapRuntime(nav_rt_cfg)
nav_global_planner = GlobalAStarPlanner(blocked_cost=int(nav_rt_cfg.blocked_cost))
nav_path_world = []
nav_last_plan_t = 0.0
nav_last_update_t = 0.0
st(
"INFO",
{
"NAV_RUNTIME": {
"enabled": bool(nav_rt_cfg.enabled),
"resolution_m": float(nav_rt_cfg.resolution_m),
"z_min_m": float(nav_rt_cfg.z_min_m),
"z_max_m": float(nav_rt_cfg.z_max_m),
"inflation_radius_m": float(nav_rt_cfg.inflation_radius_m),
"dynamic_decay_sec": float(nav_rt_cfg.dynamic_decay_sec),
"dynamic_min_hits": int(nav_rt_cfg.dynamic_min_hits),
"blocked_cost": int(nav_rt_cfg.blocked_cost),
}
},
)
def apply_nav_goal(payload: Any) -> None:
nonlocal nav_goal_xy, nav_path_world, nav_last_plan_t
if not isinstance(payload, dict):
st("WARN", "SET_NAV_GOAL ignored: payload must be dict.")
return
try:
x = float(payload.get("x"))
y = float(payload.get("y"))
except Exception:
st("WARN", "SET_NAV_GOAL ignored: x/y required.")
return
nav_goal_xy = (x, y)
nav_path_world = []
nav_last_plan_t = 0.0
st("INFO", {"NAV_GOAL": {"x": float(x), "y": float(y)}})
def clear_nav_goal() -> None:
nonlocal nav_goal_xy, nav_path_world
nav_goal_xy = None
nav_path_world = []
st("INFO", {"NAV_GOAL": None})
def mission_start(payload: Any) -> None:
nonlocal nav_goal_xy, nav_path_world, nav_last_plan_t
if not isinstance(payload, dict):
st("WARN", "MISSION_START ignored: payload must be dict.")
return
raw = payload.get("waypoints", [])
waypoints: List[Tuple[float, float]] = []
if isinstance(raw, list):
for wp in raw:
if isinstance(wp, dict):
try:
waypoints.append((float(wp.get("x")), float(wp.get("y"))))
except Exception:
continue
elif isinstance(wp, (list, tuple)) and len(wp) >= 2:
try:
waypoints.append((float(wp[0]), float(wp[1])))
except Exception:
continue
mission.start(waypoints=waypoints)
goal = mission.current_goal()
nav_goal_xy = None if goal is None else (float(goal[0]), float(goal[1]))
nav_path_world = []
nav_last_plan_t = 0.0
st("INFO", {"MISSION": mission.snapshot()})
def mission_pause() -> None:
    """Pause the active mission and publish its snapshot."""
    mission.pause()
    st("INFO", {"MISSION": mission.snapshot()})
def mission_resume() -> None:
    """Resume a paused mission and publish its snapshot."""
    mission.resume()
    st("INFO", {"MISSION": mission.snapshot()})
def mission_stop() -> None:
nonlocal nav_goal_xy, nav_path_world
mission.stop()
nav_goal_xy = None
nav_path_world = []
st("INFO", {"MISSION": mission.snapshot()})
def _safe_record_base(name: str) -> str:
base = (name or "").strip() or "slam_recording"
for bad in ("/", "\\", ":", "*", "?", "\"", "<", ">", "|"):
base = base.replace(bad, "_")
return base
def record_start(payload: Any) -> None:
nonlocal recording_enabled, recording_frames, recording_poses, recording_base_name, recording_drop_count
base = recording_base_name
if isinstance(payload, dict):
b = str(payload.get("base_name", "")).strip()
if b:
base = b
recording_base_name = _safe_record_base(base)
recording_frames = []
recording_poses = []
recording_drop_count = 0
recording_enabled = True
st(
"INFO",
{
"RECORDING": {
"enabled": True,
"base_name": str(recording_base_name),
"max_frames": int(recording_max_frames),
}
},
)
def record_stop(payload: Any) -> None:
nonlocal recording_enabled, recording_frames, recording_poses, recording_base_name
save = True
base = recording_base_name
if isinstance(payload, dict):
save = _as_bool(payload.get("save", True), True)
b = str(payload.get("base_name", "")).strip()
if b:
base = b
recording_enabled = False
if (not save) or len(recording_frames) == 0:
st(
"INFO",
{
"RECORDING": {
"enabled": False,
"frames": int(len(recording_frames)),
"saved": False,
}
},
)
recording_frames = []
recording_poses = []
return
base = _safe_record_base(base)
stamp = time.strftime("%Y%m%d_%H%M%S", time.localtime())
out_path = Path(map_cfg.data_folder) / f"{base}_{stamp}.npz"
try:
frames_obj = np.empty((len(recording_frames),), dtype=object)
for i, fr in enumerate(recording_frames):
frames_obj[i] = np.asarray(fr, dtype=np.float32)
if len(recording_poses) == len(recording_frames) and len(recording_poses) > 0:
poses_np = np.stack(recording_poses, axis=0).astype(np.float32)
np.savez_compressed(out_path, frames=frames_obj, poses=poses_np)
else:
np.savez_compressed(out_path, frames=frames_obj)
st(
"INFO",
{
"RECORD_SAVED": {
"path": str(out_path),
"frames": int(len(recording_frames)),
"dropped": int(recording_drop_count),
}
},
)
except Exception as e:
st("ERROR", f"RECORD save failed: {e}")
recording_frames = []
recording_poses = []
def _queue_lag() -> int:
    """Best-effort depth of the inbound data queue; returns -1 when the count
    is unavailable (mp.Queue.qsize can raise NotImplementedError on some
    platforms — notably macOS)."""
    try:
        return int(data_q.qsize())
    except Exception:
        return -1
def _update_perf_rollup(now_t: float, stable_count: int) -> None:
nonlocal perf_input_count, perf_publish_count, perf_last_rate_t
nonlocal perf_last_cpu_wall_t, perf_last_cpu_proc_t
nonlocal perf_input_fps, perf_publish_fps, perf_cpu_pct
nonlocal perf_queue_lag, perf_stable_growth, perf_prev_stable_count, perf_prev_stable_t
dt_rates = float(now_t - perf_last_rate_t)
if dt_rates >= 0.8:
perf_input_fps = float(perf_input_count) / max(dt_rates, 1e-6)
perf_publish_fps = float(perf_publish_count) / max(dt_rates, 1e-6)
perf_input_count = 0
perf_publish_count = 0
perf_last_rate_t = now_t
dt_cpu = float(now_t - perf_last_cpu_wall_t)
if dt_cpu >= 0.8:
cpu_now = float(time.process_time())
perf_cpu_pct = 100.0 * float(cpu_now - perf_last_cpu_proc_t) / max(dt_cpu, 1e-6)
perf_last_cpu_proc_t = cpu_now
perf_last_cpu_wall_t = now_t
dt_growth = float(now_t - perf_prev_stable_t)
if dt_growth >= 0.8:
perf_stable_growth = float(stable_count - perf_prev_stable_count) / max(dt_growth, 1e-6)
perf_prev_stable_count = int(stable_count)
perf_prev_stable_t = now_t
perf_queue_lag = _queue_lag()
def maybe_autosave(now_t: float) -> None:
nonlocal last_autosave_t
if not autosave_enabled or not mapping_enabled:
return
if (now_t - float(last_autosave_t)) < float(autosave_interval_sec):
return
try:
if cleanup_enabled:
full_pts = stable.get_points()
if full_pts is not None and len(full_pts) >= int(cleanup_min_total_points):
cleaned, _ = cleanup_map_islands(full_pts)
if cleaned is not None and len(cleaned) >= int(map_cfg.min_points_to_save):
stable.set_points(cleaned)
pts = stable.get_save_points()
n = 0 if pts is None else int(len(pts))
req = int(map_cfg.min_points_to_save)
if n < req:
last_autosave_t = now_t
return
out_path = stable.export_map(str(autosave_base_name))
last_autosave_t = now_t
st("INFO", {"AUTOSAVED": {"path": str(out_path), "points": int(n)}})
except Exception as e:
last_autosave_t = now_t
st("WARN", f"AUTOSAVE failed: {e}")
# Checkpoint the submap alongside autosave
if submap_ckpt is not None and submap_mode_enabled:
submap_ckpt.save(submap_mapper)
# ---------------- lidar frame processor ----------------
def process_points(points: np.ndarray):
nonlocal last_pub, mapping_enabled, localize_only_enabled, worker_mode, last_key_pose, last_frame_t
nonlocal last_guard_pose, last_guard_t, guard_reject_count, guard_last_warn_t
nonlocal odom_to_map_transform, map_lock_last_t, map_lock_accept_count, map_lock_reject_count, map_lock_last_warn_t
nonlocal continuity_prev_pts, continuity_reject_count, continuity_consecutive_rejects, continuity_last_warn_t
nonlocal rotate_freeze_count, rotate_last_warn_t
nonlocal cleanup_last_t, cleanup_removed_total, cleanup_last_info
nonlocal slam_to_ref_transform, slam_to_ref_valid, last_localize_transform
nonlocal perf_input_count, perf_publish_count, perf_icp_ms
nonlocal recording_enabled, recording_frames, recording_poses, recording_drop_count
nonlocal nav_goal_xy, nav_last_update_t, nav_last_plan_t, nav_path_world
nonlocal latest_live_pts, latest_live_localize_pts
nonlocal ref_points_cache, ref_nav_points_cache, ref_nav_cache_src_n
if points is None or len(points) == 0:
return
perf_input_count += 1
pts = points.astype(np.float32, copy=True)
pts[:, 1] *= -1
pts[:, 2] *= -1
# Reject invalid values and out-of-range returns before SLAM.
finite_mask = np.isfinite(pts).all(axis=1)
if not np.all(finite_mask):
pts = pts[finite_mask]
if len(pts) == 0:
return
r2 = np.einsum("ij,ij->i", pts, pts)
min_r = float(map_quality_cfg.near_min_range_m if map_quality_cfg.enabled else 0.15)
min_r2 = min_r * min_r
max_r = float(eng_cfg.max_range)
max_r2 = max_r * max_r if max_r > 0 else np.inf
range_mask = (r2 >= min_r2) & (r2 <= max_r2)
if not np.all(range_mask):
pts = pts[range_mask]
if len(pts) < 40:
return
# pre-downsample stride from config
stride = int(live_stride)
if stride > 1:
pts = pts[::stride]
if map_quality_cfg.enabled:
pts = map_quality.apply_sensor(pts)
if len(pts) < 40:
return
latest_live_pts = np.array(pts, dtype=np.float32, copy=True)
latest_live_localize_pts = None
pose = None
stable_pts_display = None
stable_cols = None
fusion_info: Dict[str, Any] = {}
nav_runtime_info = None
nav_info: Dict[str, Any] = {}
safety_info: Dict[str, Any] = {}
map_lock_info: Dict[str, Any] = {"status": "skip", "ran": False}
continuity_info: Dict[str, Any] = {"status": "na"}
cleanup_info: Dict[str, Any] = dict(cleanup_last_info) if isinstance(cleanup_last_info, dict) else {"status": "na"}
rotate_freeze_active = False
submap_info: Dict[str, Any] = submap_mapper.status(
active=False,
reason="idle",
profile=str(stability_profile),
profile_allowed=_submap_profile_allowed(),
)
ref_match_points_pub = None
ref_match_colors_pub = None
ref_match_stats: Dict[str, Any] = {}
stable_points_in_global_frame = False
mode_now = str(worker_mode)
if mapping_enabled:
now_t = _now()
dt = 0.1 if last_frame_t <= 0 else float(np.clip(now_t - last_frame_t, 0.02, 0.25))
last_frame_t = now_t
ts = np.linspace(0.0, dt, len(pts), dtype=np.float64)
t_icp0 = _now()
slam.register_frame(pts, ts)
perf_icp_ms = 1000.0 * (_now() - t_icp0)
odom_pose = slam.last_pose if hasattr(slam, "last_pose") else getattr(slam, "pose", None)
if odom_pose is not None:
odom_pose, fusion_info = sensor_fusion.fuse_pose(np.asarray(odom_pose, dtype=np.float64), now=now_t)
try:
pose = np.asarray(odom_to_map_transform, dtype=np.float64) @ np.asarray(odom_pose, dtype=np.float64)
except Exception:
pose = np.asarray(odom_pose, dtype=np.float64)
# Reject obviously unstable jumps to avoid "wall on wall" artifacts.
unstable_jump = False
trans_step_for_mode = 0.0
rot_step_for_mode = 0.0
rotating_in_place = False
if pose_guard_enabled and pose is not None:
pose_np = np.asarray(pose, dtype=np.float64)
if last_guard_pose is not None:
dpos = pose_np[:3, 3] - last_guard_pose[:3, 3]
trans_step = float(np.linalg.norm(dpos))
rrel = pose_np[:3, :3] @ last_guard_pose[:3, :3].T
rot_step = float(np.degrees(np.arccos(np.clip((np.trace(rrel) - 1.0) * 0.5, -1.0, 1.0))))
trans_step_for_mode = float(trans_step)
rot_step_for_mode = float(rot_step)
if rotate_mode_enabled and trans_step_for_mode <= float(rotate_in_place_max_trans_m) and rot_step_for_mode >= float(rotate_in_place_min_rot_deg):
rotating_in_place = True
scale = max(1.0, float(dt) / max(1e-3, float(pose_guard_ref_dt)))
max_t = float(pose_guard_max_trans) * scale
max_r = float(pose_guard_max_rot_deg) * scale
if rotating_in_place:
max_r = max(max_r, float(rotate_pose_guard_max_rot_deg) * scale)
if trans_step > max_t or rot_step > max_r:
unstable_jump = True
guard_reject_count += 1
pose = np.array(last_guard_pose, dtype=np.float64, copy=True)
stable_pts_display = stable.get_display_points()
if (now_t - guard_last_warn_t) > 1.0:
guard_last_warn_t = now_t
st(
"WARN",
(
"MOTION_GUARD rejected frame: "
f"dpos={trans_step:.2f}m>{max_t:.2f}m or drot={rot_step:.1f}deg>{max_r:.1f}deg "
f"(count={guard_reject_count})"
),
)
else:
last_guard_pose = np.array(pose_np, dtype=np.float64, copy=True)
last_guard_t = now_t
else:
last_guard_pose = np.array(pose_np, dtype=np.float64, copy=True)
last_guard_t = now_t
elif pose is not None and rotate_mode_enabled and last_key_pose is not None:
# Fallback rotate-in-place detector when pose-guard is disabled.
try:
dpos = np.asarray(pose, dtype=np.float64)[:3, 3] - np.asarray(last_key_pose, dtype=np.float64)[:3, 3]
trans_step_for_mode = float(np.linalg.norm(dpos))
rrel = np.asarray(pose, dtype=np.float64)[:3, :3] @ np.asarray(last_key_pose, dtype=np.float64)[:3, :3].T
rot_step_for_mode = float(np.degrees(np.arccos(np.clip((np.trace(rrel) - 1.0) * 0.5, -1.0, 1.0))))
rotating_in_place = bool(
trans_step_for_mode <= float(rotate_in_place_max_trans_m)
and rot_step_for_mode >= float(rotate_in_place_min_rot_deg)
)
except Exception:
rotating_in_place = False
if not unstable_jump:
map_lock_block_frame = False
# Periodic scan-to-map correction keeps the accumulated map anchored
# and reduces duplicated structures when traversing long paths.
if pose is not None:
ml_overrides = None
if rotating_in_place and rotate_mode_enabled:
ml_overrides = {
"period_sec": float(rotate_map_lock_period_sec),
"accept_fitness": float(rotate_map_lock_accept_fitness),
"accept_rmse": float(rotate_map_lock_accept_rmse),
"max_step_rotation_deg": float(rotate_map_lock_max_step_rot_deg),
}
corr_lock, map_lock_info = compute_map_lock_correction(pts, pose, now_t, overrides=ml_overrides)
if corr_lock is not None:
corr_np = np.asarray(corr_lock, dtype=np.float64)
corr_apply = np.asarray(corr_np, dtype=np.float64, copy=True)
if map_lock_apply_damping:
alpha_t = float(map_lock_apply_alpha_t)
alpha_r = float(map_lock_apply_alpha_r)
max_t_apply = float(map_lock_apply_max_trans_m)
max_r_apply = float(map_lock_apply_max_rot_deg)
if rotating_in_place and rotate_mode_enabled:
alpha_t = float(rotate_map_lock_apply_alpha_t)
alpha_r = float(rotate_map_lock_apply_alpha_r)
max_t_apply = float(rotate_map_lock_apply_max_trans_m)
max_r_apply = float(rotate_map_lock_apply_max_rot_deg)
corr_apply = _blend_rigid_tf(
np.eye(4, dtype=np.float64),
corr_apply,
alpha_t=float(alpha_t),
alpha_r=float(alpha_r),
)
tvec = np.asarray(corr_apply[:3, 3], dtype=np.float64)
tnorm = float(np.linalg.norm(tvec))
if tnorm > float(max_t_apply) and tnorm > 1e-9:
corr_apply[:3, 3] = tvec * (float(max_t_apply) / tnorm)
dpos_apply, drot_apply = _tf_delta(np.eye(4, dtype=np.float64), corr_apply)
if drot_apply > float(max_r_apply) and drot_apply > 1e-6:
corr_apply = _blend_rigid_tf(
np.eye(4, dtype=np.float64),
corr_apply,
alpha_t=1.0,
alpha_r=float(max_r_apply) / float(drot_apply),
)
else:
dpos_apply, drot_apply = _tf_delta(np.eye(4, dtype=np.float64), corr_apply)
map_lock_info["applied_dpos_m"] = float(dpos_apply)
map_lock_info["applied_drot_deg"] = float(drot_apply)
map_lock_info["damped"] = bool(map_lock_apply_damping)
pose = corr_apply @ np.asarray(pose, dtype=np.float64)
loc_frames.apply_map_correction(corr_apply)
_sync_loc_vars_from_service()
if last_guard_pose is not None:
last_guard_pose = corr_apply @ np.asarray(last_guard_pose, dtype=np.float64)
if last_key_pose is not None:
last_key_pose = corr_apply @ np.asarray(last_key_pose, dtype=np.float64)
map_lock_accept_count += 1
elif bool(map_lock_info.get("ran", False)):
map_lock_reject_count += 1
block_on_reject_now = bool(map_lock_block_on_reject)
if rotating_in_place and rotate_mode_enabled:
block_on_reject_now = bool(rotate_map_lock_block_on_reject)
if block_on_reject_now:
map_lock_block_frame = True
# Retry map-lock quickly after a reject to recover alignment faster.
map_lock_last_t = float(now_t - map_lock_period_sec + map_lock_retry_after_sec)
if (now_t - map_lock_last_warn_t) > 1.0:
map_lock_last_warn_t = now_t
fit_r = float(map_lock_info.get("fitness", 0.0))
rmse_r = float(map_lock_info.get("rmse", 9e9))
dpos_r = float(map_lock_info.get("dpos_m", 0.0))
drot_r = float(map_lock_info.get("drot_deg", 0.0))
st(
"WARN",
(
"MAP_LOCK rejected; skipping frame integration: "
f"fit={fit_r:.2f}, rmse={rmse_r:.2f}, "
f"dpos={dpos_r:.2f}m, drot={drot_r:.1f}deg"
),
)
block_on_severe_now = bool(map_lock_block_on_severe)
if rotating_in_place and rotate_mode_enabled:
block_on_severe_now = bool(rotate_map_lock_block_on_severe)
if block_on_severe_now:
fit_r = float(map_lock_info.get("fitness", 0.0))
rmse_r = float(map_lock_info.get("rmse", 9e9))
dpos_r = float(map_lock_info.get("dpos_m", 0.0))
drot_r = float(map_lock_info.get("drot_deg", 0.0))
severe_reject = bool(
fit_r <= float(map_lock_severe_fit)
or rmse_r >= float(map_lock_severe_rmse)
or dpos_r >= float(map_lock_severe_trans_m)
or drot_r >= float(map_lock_severe_rot_deg)
)
if severe_reject:
map_lock_block_frame = True
if (now_t - map_lock_last_warn_t) > 1.0:
map_lock_last_warn_t = now_t
st(
"WARN",
(
"MAP_LOCK severe reject; skipping frame integration: "
f"fit={fit_r:.2f}, rmse={rmse_r:.2f}, "
f"dpos={dpos_r:.2f}m, drot={drot_r:.1f}deg"
),
)
# keyframe gating from config
if map_lock_block_frame:
stable_pts_display = stable.get_display_points()
elif eng_cfg.keyframe_enabled and pose is not None and not pose_delta_ok(pose, last_key_pose):
# skip filter/map update
stable_pts_display = stable.get_display_points()
else:
if pose is not None:
last_key_pose = pose
points_world = to_world_points(pts, pose)
stable_now = stable.get_points()
stable_now_n = 0 if stable_now is None else int(len(stable_now))
rotate_freeze_active = bool(
rotate_mode_enabled
and rotate_freeze_mapping
and rotating_in_place
and stable_now_n >= int(rotate_freeze_min_stable_pts)
)
if rotate_freeze_active:
rotate_freeze_count += 1
continuity_consecutive_rejects = 0
continuity_pts_ds = voxel_downsample_np(points_world, float(continuity_voxel_m))
continuity_prev_pts = (
np.array(continuity_pts_ds, dtype=np.float32, copy=True)
if continuity_pts_ds is not None and len(continuity_pts_ds) > 0
else continuity_prev_pts
)
continuity_info = {
"status": "skip_rotate_freeze",
"trans_step_m": float(trans_step_for_mode),
"rot_step_deg": float(rot_step_for_mode),
"stable_points": int(stable_now_n),
"freeze_count": int(rotate_freeze_count),
}
stable_pts_display = stable.get_display_points()
if (now_t - rotate_last_warn_t) > 1.0:
rotate_last_warn_t = now_t
st(
"INFO",
(
"ROTATE_FREEZE active; skipping map integration during in-place turn "
f"(stable={stable_now_n}, dpos={trans_step_for_mode:.3f}m, drot={rot_step_for_mode:.1f}deg)"
),
)
else:
if rotating_in_place and rotate_mode_enabled and rotate_skip_continuity:
continuity_ok = True
continuity_pts_ds = voxel_downsample_np(points_world, float(continuity_voxel_m))
continuity_info = {
"status": "skip_rotating_in_place",
"trans_step_m": float(trans_step_for_mode),
"rot_step_deg": float(rot_step_for_mode),
}
else:
continuity_ok, continuity_info, continuity_pts_ds = check_frame_continuity(points_world)
if not continuity_ok:
continuity_reject_count += 1
continuity_consecutive_rejects += 1
if continuity_consecutive_rejects >= int(continuity_recover_after_rejects):
continuity_ok = True
continuity_consecutive_rejects = 0
continuity_info = {
"status": "resync_after_rejects",
"reject_streak": int(continuity_recover_after_rejects),
"trans_step_m": float(trans_step_for_mode),
"rot_step_deg": float(rot_step_for_mode),
}
continuity_prev_pts = (
np.array(continuity_pts_ds, dtype=np.float32, copy=True)
if continuity_pts_ds is not None and len(continuity_pts_ds) > 0
else continuity_prev_pts
)
st(
"WARN",
(
"CONTINUITY auto-resync after repeated rejects; "
f"recover_after={int(continuity_recover_after_rejects)}"
),
)
if not continuity_ok:
stable_pts_display = stable.get_display_points()
# Trigger a quick map-lock retry after continuity rejection.
map_lock_last_t = float(now_t - map_lock_period_sec + map_lock_retry_after_sec)
if (now_t - continuity_last_warn_t) > 1.0:
continuity_last_warn_t = now_t
st(
"WARN",
(
"CONTINUITY rejected; skipping frame integration: "
f"inlier={float(continuity_info.get('inlier_ratio', 0.0)):.2f}, "
f"med={float(continuity_info.get('median_m', 0.0)):.2f}m, "
f"p90={float(continuity_info.get('p90_m', 0.0)):.2f}m "
f"(count={int(continuity_reject_count)})"
),
)
else:
continuity_consecutive_rejects = 0
continuity_prev_pts = (
np.array(continuity_pts_ds, dtype=np.float32, copy=True)
if continuity_pts_ds is not None and len(continuity_pts_ds) > 0
else None
)
if map_quality_cfg.enabled:
points_world = map_quality.apply_world(points_world)
filt.update(points_world)
stable_pts = filt.get_stable_points()
stable.set_points(stable_pts)
if (
cleanup_enabled
and stable_pts is not None
and len(stable_pts) >= int(cleanup_min_total_points)
and (now_t - float(cleanup_last_t)) >= float(cleanup_period_sec)
):
cleanup_last_t = float(now_t)
cleaned_pts, cleanup_info = cleanup_map_islands(stable_pts)
cleanup_last_info = dict(cleanup_info)
removed_now = int(cleanup_info.get("removed", 0))
if removed_now > 0:
cleanup_removed_total += int(removed_now)
stable.set_points(cleaned_pts)
if hasattr(filt, "seed_stable_points"):
try:
filt.seed_stable_points(
cleaned_pts,
now=now_t,
hit_count=int(runtime_filter_hit_threshold),
clear_existing=True,
)
except Exception as _cln_exc:
st("WARN", f"[cleanup] filter re-seed after cleanup failed: {_cln_exc}")
st(
"INFO",
{
"MAP_CLEANUP": {
**cleanup_info,
"removed_total": int(cleanup_removed_total),
}
},
)
else:
cleanup_info = dict(cleanup_last_info) if isinstance(cleanup_last_info, dict) else {"status": "na"}
stable_pts_display = stable.get_display_points()
# Lightweight loop closure + pose-graph optimization.
if pose is not None and loop_cfg.enabled:
loop_res = loop_backend.process_frame(pts, np.asarray(pose, dtype=np.float64))
if loop_res.loop_detected or loop_res.optimized or ("rejected" in loop_res.info):
st(
"INFO",
{
"LOOP": {
"detected": bool(loop_res.loop_detected),
"optimized": bool(loop_res.optimized),
**loop_res.info,
}
},
)
if loop_res.optimized:
corr = np.asarray(loop_res.correction, dtype=np.float64)
# Apply correction to map and reseed filter for consistency.
stable_all = stable.get_points()
corrected = apply_transform_points(stable_all, corr)
if corrected is not None:
stable.set_points(corrected)
if hasattr(filt, "seed_stable_points"):
filt.seed_stable_points(
corrected,
now=_now(),
hit_count=int(filt_cfg.hits_threshold),
clear_existing=True,
)
stable_pts_display = stable.get_display_points()
loc_frames.apply_loop_correction(corr, update_reference=True)
_sync_loc_vars_from_service()
if last_guard_pose is not None:
last_guard_pose = corr @ np.asarray(last_guard_pose, dtype=np.float64)
if last_key_pose is not None:
last_key_pose = corr @ np.asarray(last_key_pose, dtype=np.float64)
pose = corr @ np.asarray(pose, dtype=np.float64)
maybe_auto_localize()
maybe_autosave(now_t)
if submap_ckpt is not None and submap_mode_enabled:
submap_ckpt.maybe_save(submap_mapper)
elif localize_only_enabled:
now_t = _now()
dt = 0.1 if last_frame_t <= 0 else float(np.clip(now_t - last_frame_t, 0.02, 0.25))
last_frame_t = now_t
ts = np.linspace(0.0, dt, len(pts), dtype=np.float64)
t_icp0 = _now()
slam.register_frame(pts, ts)
perf_icp_ms = 1000.0 * (_now() - t_icp0)
odom_pose = slam.last_pose if hasattr(slam, "last_pose") else getattr(slam, "pose", None)
if odom_pose is not None:
odom_pose, fusion_info = sensor_fusion.fuse_pose(np.asarray(odom_pose, dtype=np.float64), now=now_t)
pose = np.asarray(odom_pose, dtype=np.float64)
else:
pose = None
live_for_localize = np.asarray(pts, dtype=np.float32)
if pose is not None:
live_for_localize = to_world_points(pts, pose)
if loc_live_stack_enabled:
frame_ds = voxel_downsample_np(live_for_localize, float(loc_live_stack_voxel_m))
if frame_ds is not None and len(frame_ds) > 0:
loc_live_stack.append(np.asarray(frame_ds, dtype=np.float32))
if len(loc_live_stack) >= int(loc_live_stack_min_frames):
cat = np.concatenate(list(loc_live_stack), axis=0)
stack_pts = voxel_downsample_np(cat, float(loc_live_stack_voxel_m))
if len(stack_pts) > int(loc_live_stack_max_points):
step = max(2, int(np.ceil(float(len(stack_pts)) / float(loc_live_stack_max_points))))
stack_pts = stack_pts[::step][: int(loc_live_stack_max_points)]
if stack_pts is not None and len(stack_pts) > 0:
live_for_localize = np.asarray(stack_pts, dtype=np.float32)
latest_live_localize_pts = np.array(live_for_localize, dtype=np.float32, copy=True)
maybe_auto_localize(src_live=live_for_localize)
loc_state_name = str(loc_state.state).upper().strip()
submap_active = bool(
submap_mode_enabled
and _submap_profile_allowed()
and slam_to_ref_valid
and loc_state_name in ("TRACKING", "DEGRADED")
and float(last_localize_confidence) >= 0.45
)
if submap_active:
if pose is not None and np.asarray(pose).shape == (4, 4):
pose_ref = np.asarray(slam_to_ref_transform, dtype=np.float64) @ np.asarray(pose, dtype=np.float64)
pts_ref = to_world_points(pts, pose_ref)
else:
pts_ref = apply_transform_points(pts, slam_to_ref_transform)
pose_ref = (
np.asarray(slam_to_ref_transform, dtype=np.float64)
if np.asarray(slam_to_ref_transform).shape == (4, 4)
else None
)
if pts_ref is not None and len(pts_ref) > 0:
submap_info = submap_mapper.integrate(pts_ref, now=now_t, pose_world=pose_ref)
stable_pts_display = submap_mapper.get_display_points()
stable_points_in_global_frame = True
else:
stable_pts_display = live_for_localize
submap_info = submap_mapper.status(
active=False,
reason="empty_live",
profile=str(stability_profile),
profile_allowed=True,
)
else:
if (not submap_mode_enabled) or (not _submap_profile_allowed()) or (not slam_to_ref_valid):
if submap_mapper.has_points and ((not _submap_profile_allowed()) or (not submap_mode_enabled) or (not slam_to_ref_valid)):
submap_mapper.reset()
stable_pts_display = live_for_localize
submap_info = submap_mapper.status(
active=False,
reason="disabled_or_not_localized",
profile=str(stability_profile),
profile_allowed=_submap_profile_allowed(),
)
# Keep pose in odom frame; publish path maps it to reference when localized.
else:
stable_pts_display = stable.get_display_points()
if recording_enabled:
if len(recording_frames) < int(recording_max_frames):
recording_frames.append(np.array(pts, dtype=np.float32, copy=True))
if pose is not None and np.asarray(pose).shape == (4, 4):
recording_poses.append(np.array(pose, dtype=np.float32, copy=True))
else:
recording_drop_count += 1
recording_enabled = False
st(
"WARN",
(
"RECORDING auto-stopped: reached max frames "
f"({int(recording_max_frames)})."
),
)
# publish throttling
t = _now()
if pub_hz > 0:
if (t - last_pub) < (1.0 / pub_hz):
return
last_pub = t
stable_pts_pub = stable_pts_display
pose_pub = pose
if slam_to_ref_valid and not stable_points_in_global_frame:
stable_pts_pub = apply_transform_points(stable_pts_display, slam_to_ref_transform)
if pose is not None:
try:
pose_pub = slam_to_ref_transform @ np.asarray(pose, dtype=np.float64)
except Exception:
pose_pub = pose
if stable_pts_pub is not None and len(stable_pts_pub) > 0:
if localize_only_enabled:
# Blue live overlay for map localization workflows.
stable_cols = np.zeros((len(stable_pts_pub), 4), dtype=np.float32)
stable_cols[:, 0] = 0.18
stable_cols[:, 1] = 0.58
stable_cols[:, 2] = 1.00
stable_cols[:, 3] = 0.72
else:
alpha_live = 0.90
stable_cols = height_colors_fast(stable_pts_pub, alpha=float(alpha_live))
live_world_for_nav = None
if localize_only_enabled:
if slam_to_ref_valid:
if pose is not None and np.asarray(pose).shape == (4, 4):
try:
pose_ref_live = np.asarray(slam_to_ref_transform, dtype=np.float64) @ np.asarray(pose, dtype=np.float64)
live_world_for_nav = to_world_points(pts, pose_ref_live)
except Exception:
live_world_for_nav = apply_transform_points(pts, slam_to_ref_transform)
else:
live_world_for_nav = apply_transform_points(pts, slam_to_ref_transform)
else:
if pose_pub is not None:
live_world_for_nav = to_world_points(pts, np.asarray(pose_pub, dtype=np.float64))
elif slam_to_ref_valid:
live_world_for_nav = apply_transform_points(pts, slam_to_ref_transform)
if localize_only_enabled:
pose_ref_for_vis = None
if slam_to_ref_valid and pose_pub is not None and np.asarray(pose_pub).shape == (4, 4):
pose_ref_for_vis = np.asarray(pose_pub, dtype=np.float64)
elif slam_to_ref_valid:
pose_ref_for_vis = np.asarray(slam_to_ref_transform, dtype=np.float64)
ref_match_points_pub, ref_match_colors_pub, ref_match_stats = _build_ref_match_visual(
live_world_for_nav,
pose_ref_for_vis,
_now(),
)
else:
ref_match_stats = {"enabled": bool(loc_vis_enabled), "updated": False, "reason": "not_localize_mode"}
nav_static_points = stable_pts_pub
if localize_only_enabled and ref_points_cache is not None and len(ref_points_cache) > 0:
try:
src_n = int(len(ref_points_cache))
if ref_nav_points_cache is None or int(ref_nav_cache_src_n) != src_n:
nav_voxel = max(0.08, float(nav_rt_cfg.resolution_m) * 1.25)
nav_pts = voxel_downsample_np(np.asarray(ref_points_cache, dtype=np.float32), float(nav_voxel))
if nav_pts is None:
nav_pts = np.zeros((0, 3), dtype=np.float32)
if len(nav_pts) > 300000:
step = max(2, int(np.ceil(float(len(nav_pts)) / 300000.0)))
nav_pts = nav_pts[::step]
ref_nav_points_cache = np.asarray(nav_pts, dtype=np.float32)
ref_nav_cache_src_n = src_n
if ref_nav_points_cache is not None and len(ref_nav_points_cache) > 0:
nav_static_points = ref_nav_points_cache
except Exception as _nav_exc:
st("WARN", f"[nav] static point cache update failed: {_nav_exc}")
if (t - nav_last_update_t) >= 0.10:
nav_runtime_info = nav_runtime.update(nav_static_points, live_world_for_nav, t)
nav_last_update_t = t
mission_snapshot = mission.snapshot()
if pose_pub is not None and np.asarray(pose_pub).shape == (4, 4):
mission_snapshot = mission.update_pose(float(pose_pub[0, 3]), float(pose_pub[1, 3]))
goal_from_mission = mission.current_goal()
if goal_from_mission is not None:
nav_goal_xy = (float(goal_from_mission[0]), float(goal_from_mission[1]))
elif mission_snapshot.get("completed", False):
nav_goal_xy = None
nav_path_world = []
if nav_goal_xy is not None and pose_pub is not None and nav_runtime.grid is not None:
if (t - nav_last_plan_t) >= float(nav_plan_period_sec) or len(nav_path_world) <= 2:
grid = nav_runtime.grid
try:
nav_path_world = nav_global_planner.plan(
np.asarray(grid["costmap"], dtype=np.uint8),
np.asarray(grid["origin_xy"], dtype=np.float32),
float(grid["resolution_m"]),
(float(pose_pub[0, 3]), float(pose_pub[1, 3])),
(float(nav_goal_xy[0]), float(nav_goal_xy[1])),
)
except Exception as _astar_exc:
st("WARN", f"[nav] A* planning failed: {_astar_exc}")
nav_path_world = []
nav_last_plan_t = t
elif mission.current_goal() is None:
nav_path_world = []
nav_cmd = {"linear_mps": 0.0, "angular_rps": 0.0, "goal_reached": False, "blocked": False}
if pose_pub is not None and len(nav_path_world) > 0:
nav_cmd = nav_local_planner.compute_command(np.asarray(pose_pub, dtype=np.float64), nav_path_world, nav_runtime)
if bool(nav_cmd.get("goal_reached", False)):
mission_snapshot = mission.update_pose(float(pose_pub[0, 3]), float(pose_pub[1, 3]))
next_goal = mission.current_goal()
nav_goal_xy = None if next_goal is None else (float(next_goal[0]), float(next_goal[1]))
nav_path_world = []
safety_info = safety.evaluate(t, pose_pub, str(loc_state.state), nav_runtime, nav_cmd)
grid_now = nav_runtime.grid
nav_info = {
"mode": mode_now,
"goal": [float(nav_goal_xy[0]), float(nav_goal_xy[1])] if nav_goal_xy is not None else None,
"path_points": int(len(nav_path_world)),
"path_preview": (
[[float(x), float(y)] for (x, y) in nav_path_world[:: max(1, len(nav_path_world) // 60)]]
if len(nav_path_world) > 0
else []
),
"cmd": dict(safety_info.get("cmd", nav_cmd)),
"mission": mission_snapshot,
"costmap": (
{
"shape": [int(grid_now["shape_hw"][0]), int(grid_now["shape_hw"][1])],
"resolution_m": float(grid_now["resolution_m"]),
"origin_xy": [float(grid_now["origin_xy"][0]), float(grid_now["origin_xy"][1])],
"static_cells": int(grid_now["static_cells"]),
"dynamic_cells": int(grid_now["dynamic_cells"]),
"inflated_cells": int(grid_now["inflated_cells"]),
}
if isinstance(grid_now, dict)
else nav_runtime_info
),
}
stable_count = 0 if stable_pts_pub is None else int(len(stable_pts_pub))
_update_perf_rollup(t, stable_count)
perf_payload = {
"input_fps": float(perf_input_fps),
"publish_fps": float(perf_publish_fps),
"icp_ms": float(perf_icp_ms),
"queue_lag": int(perf_queue_lag),
"stable_growth_per_sec": float(perf_stable_growth),
"cpu_percent": float(perf_cpu_pct),
}
payload = {
"stable_points": stable_pts_pub,
"stable_colors": stable_cols,
"pose": pose_pub,
"filter": filt.stats() if hasattr(filt, "stats") else {},
"workflow_profile": str(stability_profile),
"map_lock": {
"enabled": bool(map_lock_enabled),
"accept_count": int(map_lock_accept_count),
"reject_count": int(map_lock_reject_count),
"rotating_in_place": bool(rotating_in_place) if mapping_enabled else False,
"freeze_active": bool(rotate_freeze_active) if mapping_enabled else False,
"freeze_count": int(rotate_freeze_count),
"last": dict(map_lock_info) if isinstance(map_lock_info, dict) else {},
},
"continuity": {
"enabled": bool(continuity_enabled),
"reject_count": int(continuity_reject_count),
"reject_streak": int(continuity_consecutive_rejects),
"recover_after_rejects": int(continuity_recover_after_rejects),
"last": dict(continuity_info) if isinstance(continuity_info, dict) else {},
},
"submap": dict(submap_info) if isinstance(submap_info, dict) else {},
"ref_match_points": ref_match_points_pub,
"ref_match_colors": ref_match_colors_pub,
"ref_match": dict(ref_match_stats) if isinstance(ref_match_stats, dict) else {},
"cleanup": {
"enabled": bool(cleanup_enabled),
"removed_total": int(cleanup_removed_total),
"last": dict(cleanup_info) if isinstance(cleanup_info, dict) else {},
},
"localized": bool(slam_to_ref_valid),
"loc_state": str(loc_state.state),
"loc_confidence": float(last_localize_confidence),
"fusion": fusion_info,
"perf": perf_payload,
"mode": str(worker_mode),
"nav": nav_info,
"safety": safety_info,
"recording": {
"enabled": bool(recording_enabled),
"frames": int(len(recording_frames)),
"dropped": int(recording_drop_count),
},
}
perf_publish_count += 1
_safe_put(data_q, ("FRAME", payload), keep_latest=bool(run_cfg.frame_keep_latest))
# ---------------- lidar callback (lightweight) ----------------
# Keep SDK callback minimal: enqueue latest frame and process in worker loop.
# ---------------- IMU orientation integrator ----------------
# Complementary filter: gyro integrates at full 200 Hz; accelerometer
# corrects roll/pitch drift at a low gain (alpha ≈ 0.97).
# Yaw is gyro-only (no magnetometer) — drifts slowly, but far less than
# KISS-ICP yaw on fast rotations.
# Only used when fusion.enabled = true in config.
class _ImuIntegrator:
__slots__ = ("R", "last_t", "alpha", "last_pose_translation")
def __init__(self, alpha: float = 0.97) -> None:
self.R = np.eye(3, dtype=np.float64)
self.last_t: float = 0.0
self.alpha = float(np.clip(alpha, 0.80, 0.999))
self.last_pose_translation = np.zeros(3, dtype=np.float64)
def reset(self) -> None:
self.R = np.eye(3, dtype=np.float64)
self.last_t = 0.0
self.last_pose_translation[:] = 0.0
def update(self, gyro: np.ndarray, acc: np.ndarray, t: float) -> np.ndarray:
"""Returns a 4×4 pose. Translation comes from last known LiDAR pose."""
if self.last_t <= 0.0:
self.last_t = t
return np.eye(4, dtype=np.float64)
dt = float(t - self.last_t)
if dt <= 0.0 or dt > 0.5:
self.last_t = t
return np.eye(4, dtype=np.float64)
self.last_t = t
# 1 — Gyro integration via Rodrigues
omega = np.asarray(gyro, dtype=np.float64)
angle = float(np.linalg.norm(omega)) * dt
if angle > 1e-9:
axis = omega / (float(np.linalg.norm(omega)) + 1e-15)
K = np.array([[0.0, -axis[2], axis[1]],
[axis[2], 0.0, -axis[0]],
[-axis[1], axis[0], 0.0]], dtype=np.float64)
dR = np.eye(3, dtype=np.float64) + np.sin(angle) * K + (1.0 - np.cos(angle)) * (K @ K)
self.R = self.R @ dR
# 2 — Accelerometer correction for roll/pitch (complementary filter)
a = np.asarray(acc, dtype=np.float64)
a_norm = float(np.linalg.norm(a))
if 7.5 < a_norm < 12.5: # only trust near 1 g; reject dynamic spikes
a_unit = a / a_norm
g_in_sensor = self.R.T @ np.array([0.0, 0.0, -1.0]) # expected gravity direction
cross = np.cross(a_unit, g_in_sensor)
cross_norm = float(np.linalg.norm(cross))
if cross_norm > 1e-6:
corr_angle = float(np.arcsin(np.clip(cross_norm, -1.0, 1.0))) * (1.0 - self.alpha)
axis_c = cross / cross_norm
Kc = np.array([[0.0, -axis_c[2], axis_c[1]],
[axis_c[2], 0.0, -axis_c[0]],
[-axis_c[1], axis_c[0], 0.0]], dtype=np.float64)
dR_c = np.eye(3, dtype=np.float64) + np.sin(corr_angle) * Kc + (1.0 - np.cos(corr_angle)) * (Kc @ Kc)
self.R = dR_c @ self.R
# Re-orthogonalise to prevent numerical drift
try:
U, _, Vt = np.linalg.svd(self.R, full_matrices=False)
self.R = U @ Vt
except np.linalg.LinAlgError:
self.R = np.eye(3, dtype=np.float64)
T = np.eye(4, dtype=np.float64)
T[:3, :3] = self.R
T[:3, 3] = self.last_pose_translation # inject LiDAR translation so fusion doesn't shift position
return T
_imu_integrator = _ImuIntegrator(alpha=float(
((full_cfg.get("fusion", {}) or {}).get("imu_complementary_alpha", 0.97))
))
def on_imu(gyro: np.ndarray, acc: np.ndarray, timestamp: float) -> None:
if not fusion_cfg.enabled:
return
try:
# Keep the integrator's translation in sync with the latest LiDAR pose
with state_lock:
cur_tf = np.array(odom_to_map_transform, dtype=np.float64, copy=True)
_imu_integrator.last_pose_translation = cur_tf[:3, 3].copy()
imu_pose = _imu_integrator.update(gyro, acc, timestamp)
sensor_fusion.update_prior(
sensor="imu",
pose=imu_pose,
confidence=0.85,
timestamp=timestamp,
)
except Exception as _imu_exc:
pass # IMU is advisory; never let it crash the worker
    # Processing frames in the worker loop (rather than here) avoids running
    # heavy ICP/Open3D inside the Livox SDK thread.
def on_points(points: np.ndarray):
if points is None or len(points) == 0:
return
try:
pts = np.asarray(points, dtype=np.float32)
if pts.ndim != 2 or pts.shape[1] != 3:
return
except Exception:
return
_safe_put(raw_points_q, np.array(pts, dtype=np.float32, copy=True), keep_latest=True)
# ---------------- commands ----------------
def do_connect():
nonlocal lidar, connected
if connected:
st("INFO", "Already connected.")
return
cfg_path = Path(eng_cfg.config_file)
if not cfg_path.exists():
st("ERROR", f"Config missing: {cfg_path}")
return
# Rewrite the per-host IPs inside mid360_config.json before the SDK
# reads them. The shipped file has the workstation IP hardcoded
# four times (cmd_data_ip / push_msg_ip / point_data_ip / imu_data_ip),
# which makes Livox's SDK bind() fail on any machine that isn't the
# workstation. host_ip here is the value resolved by SLAM_engine
# (either config, env, or auto-detected off 192.168.123.0/24).
try:
_mcfg = json.loads(cfg_path.read_text())
_hni = _mcfg.get("MID360", {}).get("host_net_info", {})
for _k in ("cmd_data_ip", "push_msg_ip", "point_data_ip", "imu_data_ip"):
if _k in _hni:
_hni[_k] = str(eng_cfg.host_ip)
cfg_path.write_text(json.dumps(_mcfg, indent=2))
except Exception as _e:
st("WARN", f"could not update mid360_config.json host IPs: {_e}")
try:
lidar = Livox2(
str(cfg_path),
host_ip=str(eng_cfg.host_ip),
debug=bool(livox_debug_enabled),
print_every_n_frames=int(livox_print_every_n),
)
lidar.handle_points = on_points
lidar.handle_imu = on_imu
connected = True
st("INFO", f"CONNECTED to Livox (host_ip={eng_cfg.host_ip})")
except Exception as e:
connected = False
err = str(e)
up = err.upper()
if ("SDKINIT" in up) or ("BIND" in up) or ("SOCKET" in up) or ("ADDRESS ALREADY IN USE" in up):
err = (
f"{err}\n"
"Hint: likely UDP port conflict or wrong host IP.\n"
"1) Close other Livox/SLAM apps using ports 56101-56501.\n"
"2) Ensure GUI host IP matches this PC NIC on LiDAR network.\n"
"3) Verify mid360_config.json host_net_info *_ip and *_port values."
)
st("ERROR", f"Connect failed: {err}")
def do_start():
nonlocal mapping_enabled, localize_only_enabled, worker_mode
nonlocal last_guard_pose, last_guard_t, guard_reject_count
nonlocal continuity_prev_pts, continuity_reject_count, continuity_consecutive_rejects, continuity_last_warn_t
nonlocal rotate_freeze_count, rotate_last_warn_t
nonlocal cleanup_last_t, cleanup_last_info
nonlocal last_frame_t, loc_live_stack, latest_live_localize_pts
if not connected:
st("ERROR", "START denied: not connected.")
return
if self_check_report.get("errors"):
st("ERROR", "START denied: startup self-check has errors. Fix config paths first.")
return
mapping_enabled = True
localize_only_enabled = False
last_frame_t = 0.0
loc_live_stack.clear()
latest_live_localize_pts = None
last_guard_pose = None
last_guard_t = 0.0
guard_reject_count = 0
continuity_prev_pts = None
continuity_reject_count = 0
continuity_consecutive_rejects = 0
continuity_last_warn_t = 0.0
rotate_freeze_count = 0
rotate_last_warn_t = 0.0
cleanup_last_t = 0.0
cleanup_last_info = {"status": "na"}
if not _submap_profile_allowed():
submap_mapper.reset()
worker_mode = "MAPPING"
st("INFO", "MAPPING STARTED.")
push_worker_mode()
def do_start_localize_only():
nonlocal mapping_enabled, localize_only_enabled, worker_mode, last_localize_t, last_localize_confidence
nonlocal last_key_pose, last_guard_pose, last_guard_t, guard_reject_count, last_frame_t
nonlocal odom_to_map_transform, continuity_prev_pts, continuity_consecutive_rejects
nonlocal continuity_reject_count, rotate_freeze_count, loc_live_stack
nonlocal slam_to_ref_valid, latest_live_localize_pts
if not connected:
st("ERROR", "LOCALIZE_ONLY denied: not connected.")
return
if not ref_map_path:
st("ERROR", "LOCALIZE_ONLY denied: load reference map first.")
return
# Start localize-only from a fresh odometry frame to avoid stale-map offsets.
init_slam_stack()
mapping_enabled = False
localize_only_enabled = True
last_frame_t = 0.0
loc_frames.reset()
_sync_loc_vars_from_service()
last_key_pose = None
last_guard_pose = None
last_guard_t = 0.0
guard_reject_count = 0
continuity_prev_pts = None
continuity_consecutive_rejects = 0
continuity_reject_count = 0
rotate_freeze_count = 0
loc_live_stack.clear()
latest_live_localize_pts = None
submap_mapper.reset()
worker_mode = "LOCALIZE_ONLY"
last_localize_t = 0.0
st("INFO", "LOCALIZE_ONLY STARTED.")
push_worker_mode()
def do_pause():
nonlocal mapping_enabled, localize_only_enabled, worker_mode
nonlocal loc_live_stack, latest_live_localize_pts
mapping_enabled = False
localize_only_enabled = False
loc_live_stack.clear()
latest_live_localize_pts = None
worker_mode = "PAUSED"
st("INFO", "MAPPING PAUSED.")
push_worker_mode()
def do_stop():
nonlocal mapping_enabled, localize_only_enabled, worker_mode
nonlocal loc_live_stack, latest_live_localize_pts
mapping_enabled = False
localize_only_enabled = False
loc_live_stack.clear()
latest_live_localize_pts = None
worker_mode = "STOPPED"
st("INFO", "MAPPING STOPPED.")
push_worker_mode()
def do_export(filename_base: str):
try:
# Final cleanup pass: flush any stray islands that arrived since the last
# periodic cleanup (which runs every cleanup_period_sec seconds).
if cleanup_enabled:
full_pts = stable.get_points()
if full_pts is not None and len(full_pts) >= int(cleanup_min_total_points):
cleaned, _ = cleanup_map_islands(full_pts)
if cleaned is not None and len(cleaned) >= int(map_cfg.min_points_to_save):
stable.set_points(cleaned)
pts = stable.get_save_points()
n = 0 if pts is None else len(pts)
req = int(map_cfg.min_points_to_save)
if pts is None or n < req:
st("ERROR", f"EXPORT refused: stable points {n}/{req}. Keep mapping and try again.")
return
out_path = stable.export_map(filename_base)
st("INFO", f"SAVED: {out_path}")
except Exception as e:
st("ERROR", f"EXPORT failed: {e}")
def do_export_nav(filename_base: str):
try:
pts = stable.get_save_points()
n_total = 0 if pts is None else len(pts)
req = int(nav_cfg.min_points)
if pts is None or n_total < req:
st("ERROR", f"EXPORT_NAV refused: stable points {n_total}/{req}.")
return
n_band = int(nav_exporter.count_nav_points(pts))
if n_band < req:
st(
"ERROR",
(
"EXPORT_NAV refused: nav-band points "
f"{n_band}/{req} in z[{float(nav_cfg.z_min_m):.2f}, {float(nav_cfg.z_max_m):.2f}] m."
),
)
return
result = nav_exporter.export(filename_base, pts)
st("INFO", {"NAV_EXPORTED": result})
except Exception as e:
st("ERROR", f"EXPORT_NAV failed: {e}")
def do_load_ref(path_value: str):
nonlocal ref_map_path, ref_pcd_cache, ref_points_cache, ref_nav_points_cache, ref_nav_cache_src_n, ref_cache_path
nonlocal last_localize_t, last_localize_points, last_localize_transform, last_localize_confidence
nonlocal slam_to_ref_transform, slam_to_ref_valid
nonlocal approx_pose_guess_tf, approx_pose_guess_until_t
nonlocal approx_pose_guess_fail_count, approx_pose_guess_success_count
nonlocal ref_match_points_cache, ref_match_confirm_hits, ref_match_last_pub_t, ref_match_last_stats
nonlocal loc_live_stack, last_frame_t, latest_live_localize_pts
ref_map_path = str(path_value)
ref_pcd_cache = None
ref_points_cache = None
ref_nav_points_cache = None
ref_nav_cache_src_n = 0
ref_cache_path = None
ref_match_points_cache = None
ref_match_confirm_hits = None
ref_match_last_pub_t = 0.0
ref_match_last_stats = {}
place_recog.reset()
last_frame_t = 0.0
loc_live_stack.clear()
latest_live_localize_pts = None
submap_mapper.reset()
last_localize_t = 0.0
last_localize_points = 0
loc_frames.reset_reference()
_sync_loc_vars_from_service()
approx_pose_guess_tf = None
approx_pose_guess_until_t = 0.0
approx_pose_guess_fail_count = 0
approx_pose_guess_success_count = 0
loc_state.reset()
st("INFO", f"REF loaded: {Path(ref_map_path).name}")
st("INFO", {"PLACE_RECOG": place_recog.snapshot()})
prior_tf = session_store.get_transform(ref_map_path)
if prior_tf is not None:
loc_frames.last_alignment = np.array(prior_tf, dtype=np.float64, copy=True)
_sync_loc_vars_from_service()
st("INFO", {"SESSION": {"prior_loaded": True, "ref": Path(ref_map_path).name}})
# In map-localization workflows we wait for fresh live scans after START,
# instead of trying to localize immediately from stale/non-live points.
if mapping_enabled and (not localize_only_enabled):
do_localize(force=False, source="ON_LOAD")
push_loc_state()
def do_manual_localize():
src_live = latest_live_localize_pts if localize_only_enabled else None
if localize_only_enabled and (src_live is None or int(len(src_live)) < 80):
st("INFO", "LOCALIZE: waiting for live scan...")
return
do_localize(force=True, source="MANUAL", src_override=src_live)
def do_clear_ref():
nonlocal ref_map_path, ref_pcd_cache, ref_points_cache, ref_nav_points_cache, ref_nav_cache_src_n, ref_cache_path
nonlocal last_localize_t, last_localize_points, last_localize_transform, last_localize_confidence
nonlocal slam_to_ref_transform, slam_to_ref_valid
nonlocal localize_only_enabled, worker_mode
nonlocal approx_pose_guess_tf, approx_pose_guess_until_t
nonlocal approx_pose_guess_fail_count, approx_pose_guess_success_count
nonlocal ref_match_points_cache, ref_match_confirm_hits, ref_match_last_pub_t, ref_match_last_stats
nonlocal loc_live_stack, last_frame_t, latest_live_localize_pts
ref_map_path = None
ref_pcd_cache = None
ref_points_cache = None
ref_nav_points_cache = None
ref_nav_cache_src_n = 0
ref_cache_path = None
ref_match_points_cache = None
ref_match_confirm_hits = None
ref_match_last_pub_t = 0.0
ref_match_last_stats = {}
place_recog.reset()
last_frame_t = 0.0
loc_live_stack.clear()
latest_live_localize_pts = None
submap_mapper.reset()
last_localize_t = 0.0
last_localize_points = 0
loc_frames.reset_reference()
_sync_loc_vars_from_service()
approx_pose_guess_tf = None
approx_pose_guess_until_t = 0.0
approx_pose_guess_fail_count = 0
approx_pose_guess_success_count = 0
localize_only_enabled = False
if not mapping_enabled:
worker_mode = "IDLE"
loc_state.reset()
st("INFO", "REF cleared.")
push_worker_mode()
push_loc_state()
# ── One-time startup: announce readiness and push the default/configured
# settings into every subsystem so the worker starts from a known state. ──
st("INFO", "Worker ready. CONNECT then START.")
st("INFO", {"SELF_CHECK": self_check_report, "source": "worker"})
# Baseline runtime profile: medium point density, balanced stability.
apply_density_mode("MEDIUM")
apply_stability_profile("BALANCED")
apply_min_stable_points(map_cfg.min_points_to_save)
apply_loop_closure_mode(loop_cfg.enabled)
apply_loc_state_machine_mode(loc_state_cfg.enabled)
# Submap pipeline parameters from submap_cfg; keep_points=False on the
# initial push (no accumulated points to preserve yet).
apply_submap_mode(
    {
        "enabled": submap_mode_enabled,
        "local_window_frames": submap_cfg.local_window_frames,
        "local_voxel_m": submap_cfg.local_voxel_m,
        "global_voxel_m": submap_cfg.global_voxel_m,
        "merge_period_sec": submap_cfg.merge_period_sec,
        "merge_min_translation_m": submap_cfg.merge_min_translation_m,
        "merge_min_rotation_deg": submap_cfg.merge_min_rotation_deg,
        "max_global_points": submap_cfg.max_global_points,
        "display_max_points": submap_cfg.display_max_points,
        "apply_profiles": list(submap_cfg.apply_profiles),
        "keep_points": False,
    }
)
# Periodic autosave of the in-progress map.
apply_autosave(
    {
        "enabled": autosave_enabled,
        "interval_sec": autosave_interval_sec,
        "base_name": autosave_base_name,
    }
)
# Navigation-grid export settings (used by EXPORT_NAV).
apply_nav_config(
    {
        "min_points": nav_cfg.min_points,
        "resolution_m": nav_cfg.resolution_m,
        "z_min_m": nav_cfg.z_min_m,
        "z_max_m": nav_cfg.z_max_m,
        "inflation_radius_m": nav_cfg.inflation_radius_m,
        "padding_m": nav_cfg.padding_m,
    }
)
# Map-quality filtering: near-range rejection, per-ray consistency,
# world-Z clipping and voxel outlier removal.
apply_map_quality_config(
    {
        "enabled": map_quality_cfg.enabled,
        "near_min_range_m": map_quality_cfg.near_min_range_m,
        "ray_consistency_enabled": map_quality_cfg.ray_consistency_enabled,
        "ray_bin_deg": map_quality_cfg.ray_bin_deg,
        "ray_elev_bin_deg": map_quality_cfg.ray_elev_bin_deg,
        "ray_max_behind_m": map_quality_cfg.ray_max_behind_m,
        "ray_keep_n": map_quality_cfg.ray_keep_n,
        "world_z_clip_enabled": map_quality_cfg.world_z_clip_enabled,
        "world_z_min_m": map_quality_cfg.world_z_min_m,
        "world_z_max_m": map_quality_cfg.world_z_max_m,
        "outlier_filter_enabled": map_quality_cfg.outlier_filter_enabled,
        "outlier_voxel_m": map_quality_cfg.outlier_voxel_m,
        "outlier_min_points": map_quality_cfg.outlier_min_points,
    }
)
# Runtime (online) navigation costmap settings, including dynamic-obstacle
# decay and hit thresholds.
apply_nav_runtime_config(
    {
        "enabled": nav_rt_cfg.enabled,
        "resolution_m": nav_rt_cfg.resolution_m,
        "z_min_m": nav_rt_cfg.z_min_m,
        "z_max_m": nav_rt_cfg.z_max_m,
        "padding_m": nav_rt_cfg.padding_m,
        "inflation_radius_m": nav_rt_cfg.inflation_radius_m,
        "dynamic_decay_sec": nav_rt_cfg.dynamic_decay_sec,
        "dynamic_min_hits": nav_rt_cfg.dynamic_min_hits,
        "blocked_cost": nav_rt_cfg.blocked_cost,
    }
)
# Publish the initial mission/localization/mode snapshots to the UI.
st("INFO", {"MISSION": mission.snapshot()})
push_loc_state()
push_worker_mode()
# ── Main worker loop: alternately (1) service one command from cmd_q and
# (2) drain raw_points_q to the newest frame and process it. All handlers
# run under _run_locked to serialize against other worker state access. ──
while running:
    try:
        cmd = None
        try:
            # Short timeout keeps the loop responsive to incoming points
            # even when no commands arrive.
            cmd = cmd_q.get(timeout=0.02)
        except queue.Empty:
            cmd = None
        if cmd:
            # Commands arrive either as bare strings or as
            # (NAME, payload) tuples; normalize the name.
            if isinstance(cmd, tuple):
                name = str(cmd[0]).upper().strip()
                payload = cmd[1] if len(cmd) > 1 else None
            else:
                name = str(cmd).upper().strip()
                payload = None
            # Dispatch table: each command maps to a locked handler call.
            if name == "CONNECT":
                _run_locked(do_connect)
            elif name == "START":
                _run_locked(do_start)
            elif name == "START_LOCALIZE_ONLY":
                _run_locked(do_start_localize_only)
            elif name == "STOP_LOCALIZE_ONLY":
                # Localize-only stop shares the general stop path.
                _run_locked(do_stop)
            elif name == "PAUSE":
                _run_locked(do_pause)
            elif name == "STOP":
                _run_locked(do_stop)
            elif name == "RESET":
                _run_locked(reset_all)
            elif name == "EXPORT":
                _run_locked(do_export, str(payload or "map_robot"))
            elif name == "EXPORT_NAV":
                _run_locked(do_export_nav, str(payload or "map_robot"))
            elif name == "LOAD_REF":
                _run_locked(do_load_ref, str(payload))
            elif name == "LOCALIZE":
                _run_locked(do_manual_localize)
            elif name == "CLEAR_REF":
                _run_locked(do_clear_ref)
            elif name == "SET_DENSITY":
                _run_locked(apply_density_mode, str(payload or "MEDIUM"))
            elif name == "SET_MIN_STABLE_POINTS":
                _run_locked(apply_min_stable_points, payload)
            elif name == "SET_LOOP_CLOSURE":
                _run_locked(apply_loop_closure_mode, payload)
            elif name == "SET_LOC_STATE_MACHINE":
                _run_locked(apply_loc_state_machine_mode, payload)
            elif name == "SET_SUBMAP_MODE":
                _run_locked(apply_submap_mode, payload)
            elif name == "SET_APPROX_POSE":
                _run_locked(apply_approx_pose, payload if isinstance(payload, dict) else None)
            elif name == "CLEAR_APPROX_POSE":
                # Clearing is modeled as applying a None pose.
                _run_locked(apply_approx_pose, None)
            elif name == "SET_AUTOSAVE":
                _run_locked(apply_autosave, payload if isinstance(payload, dict) else {})
            elif name == "SET_NAV_CONFIG":
                _run_locked(apply_nav_config, payload if isinstance(payload, dict) else {})
            elif name == "SET_NAV_RUNTIME":
                _run_locked(apply_nav_runtime_config, payload if isinstance(payload, dict) else {})
            elif name == "SET_NAV_GOAL":
                _run_locked(apply_nav_goal, payload if isinstance(payload, dict) else {})
            elif name == "CLEAR_NAV_GOAL":
                _run_locked(clear_nav_goal)
            elif name == "SET_MAP_QUALITY":
                _run_locked(apply_map_quality_config, payload if isinstance(payload, dict) else {})
            elif name == "SET_FILTER_TUNING":
                _run_locked(apply_filter_tuning, payload if isinstance(payload, dict) else {})
            elif name == "SET_STABILITY_PROFILE":
                _run_locked(apply_stability_profile, payload)
            elif name == "RECORD_START":
                _run_locked(record_start, payload if isinstance(payload, dict) else {})
            elif name == "RECORD_STOP":
                _run_locked(record_stop, payload if isinstance(payload, dict) else {})
            elif name == "MISSION_START":
                _run_locked(mission_start, payload if isinstance(payload, dict) else {})
            elif name == "MISSION_PAUSE":
                _run_locked(mission_pause)
            elif name == "MISSION_RESUME":
                _run_locked(mission_resume)
            elif name == "MISSION_STOP":
                _run_locked(mission_stop)
            elif name == "SENSOR_PRIOR":
                # External pose prior (e.g. from another sensor). The
                # closure captures `payload` and is invoked immediately,
                # so there is no late-binding hazard here.
                if isinstance(payload, dict):
                    def _apply_sensor_prior():
                        ok = sensor_fusion.update_prior(
                            sensor=str(payload.get("sensor", "unknown")),
                            pose=payload.get("pose"),
                            confidence=float(payload.get("confidence", 1.0)),
                            timestamp=float(payload.get("timestamp", _now())),
                        )
                        if not ok:
                            st("WARN", "SENSOR_PRIOR ignored: invalid payload.")
                    _run_locked(_apply_sensor_prior)
                else:
                    st("WARN", "SENSOR_PRIOR ignored: payload must be dict.")
            elif name == "SHUTDOWN":
                # Ends the while-loop; cleanup happens after it.
                running = False
            else:
                st("WARN", f"Unknown command: {name}")
        # Drain the raw-points queue, keeping only the newest frame so a
        # slow processing step never works on stale scans.
        latest_pts = None
        try:
            while True:
                latest_pts = raw_points_q.get_nowait()
        except queue.Empty:
            pass
        if latest_pts is not None:
            _run_locked(process_points, latest_pts)
    except KeyboardInterrupt:
        break
    except Exception as e:
        # Never let a single bad frame/command kill the worker: log it
        # (to diagnostics if available, and upward via the status queue)
        # and keep looping.
        if diagnostics is not None:
            diagnostics.log_exception("Worker loop error", e, _worker_diag_state())
        st("ERROR", f"Loop error: {e}\n{traceback.format_exc()}")
# ── Shutdown path (loop exited via SHUTDOWN or KeyboardInterrupt). Each
# step is best-effort: a failure in one must not block the others. ──
try:
    # Release the LiDAR device/SDK first so no new frames arrive.
    if lidar is not None and hasattr(lidar, "shutdown"):
        lidar.shutdown()
except Exception:
    pass
try:
    # Flush any in-progress recording to disk before exiting.
    if recording_enabled and len(recording_frames) > 0:
        record_stop({"save": True, "base_name": recording_base_name})
except Exception:
    pass
try:
    if diagnostics is not None:
        diagnostics.close()
except Exception:
    pass
st("INFO", "Worker shutdown.")