Marcus/API/lidar_api.py
2026-04-12 18:50:22 +04:00

283 lines
9.6 KiB
Python

"""
lidar_api.py — Livox Mid-360 LiDAR interface via SlamEngineClient
==================================================================
Background thread polls SLAM data_q at 20Hz.
Exposes obstacle_ahead(), get_slam_pose(), get_nav_cmd(), get_loc_state().
The SLAM subprocess handles all point cloud processing, costmap building,
and safety evaluation internally. We just read the results.
"""
import math
import os
import sys
import threading
import time
import numpy as np
from Core.config_loader import load_config, config_path
from Core.logger import log
_cfg = load_config("LiDAR")
LIDAR_AVAILABLE = False
# ── Shared state (thread-safe) ───────────────────────────────────────────────
_lock = threading.Lock()
_latest_safety = [{}]
_latest_pose = [None] # 4x4 numpy or None
_latest_nav = [{}]
_latest_loc_state = ["IDLE"]
_latest_mode = ["IDLE"]
_latest_perf = [{}]
_lidar_alive = [False]
_client = [None]
_last_frame_time = [0.0]
# ══════════════════════════════════════════════════════════════════════════════
# INIT
# ══════════════════════════════════════════════════════════════════════════════
def init_lidar():
"""
Start SLAM subprocess + poll thread.
Returns True if LiDAR connected successfully.
"""
global LIDAR_AVAILABLE
if not _cfg.get("enabled", True):
print(" [LiDAR] Disabled in config")
return False
slam_dir = config_path(_cfg.get("slam_source_dir", "Lidar"))
if slam_dir not in sys.path:
sys.path.insert(0, slam_dir)
try:
from SLAM_engine import SlamEngineClient
except ImportError as e:
print(f" [LiDAR] SlamEngineClient not found: {e}")
log(f"LiDAR import failed: {e}", "error", "lidar")
return False
try:
client = SlamEngineClient()
_client[0] = client
print(" [LiDAR] SLAM engine created")
log("SLAM engine created", "info", "lidar")
except Exception as e:
print(f" [LiDAR] SLAM init error: {e}")
log(f"SLAM init error: {e}", "error", "lidar")
return False
# Start subprocess + connect to hardware
if _cfg.get("auto_connect", True):
try:
client.start_process()
client.connect()
print(" [LiDAR] Connected to Livox Mid-360")
log("Connected to LiDAR", "info", "lidar")
if _cfg.get("start_localize_only", False):
client.start_localize_only()
print(" [LiDAR] Localize-only mode")
elif _cfg.get("auto_start_mapping", False):
client.start_mapping()
print(" [LiDAR] Mapping started")
except Exception as e:
print(f" [LiDAR] Connect error: {e}")
log(f"LiDAR connect error: {e}", "error", "lidar")
return False
# Start poll thread
_lidar_alive[0] = True
threading.Thread(target=_poll_loop, daemon=True, name="lidar-poll").start()
# Wait for first FRAME (up to 5s)
deadline = time.time() + 5.0
while time.time() < deadline:
if _last_frame_time[0] > 0:
break
time.sleep(0.1)
if _last_frame_time[0] > 0:
LIDAR_AVAILABLE = True
print(" [LiDAR] Receiving data")
log("LiDAR available — receiving frames", "info", "lidar")
else:
print(" [LiDAR] No data yet (will keep trying)")
LIDAR_AVAILABLE = True # still mark available — data may arrive later
return True
# ══════════════════════════════════════════════════════════════════════════════
# POLL LOOP
# ══════════════════════════════════════════════════════════════════════════════
def _poll_loop():
"""Drain data_q, keep latest FRAME payload, update shared state."""
client = _client[0]
if client is None:
return
poll_interval = 1.0 / max(1, _cfg.get("poll_hz", 20))
while _lidar_alive[0]:
try:
# Drain data_q — keep only the latest FRAME
payload = None
while True:
try:
item = client.data_q.get_nowait()
if isinstance(item, tuple) and len(item) >= 2 and item[0] == "FRAME":
payload = item[1]
except Exception:
break
if payload is not None:
_update_state(payload)
# Drain status_q (just log, don't block)
while True:
try:
msg = client.status_q.get_nowait()
if isinstance(msg, tuple) and len(msg) >= 2:
level, text = str(msg[0]), str(msg[1])
if level == "ERROR":
log(f"SLAM: {text}", "error", "lidar")
elif level == "WARN":
log(f"SLAM: {text}", "warning", "lidar")
except Exception:
break
except Exception as e:
log(f"LiDAR poll error: {e}", "error", "lidar")
time.sleep(poll_interval)
def _update_state(payload):
"""Extract safety/pose/nav from FRAME payload into shared state."""
with _lock:
_latest_safety[0] = payload.get("safety", {})
_latest_pose[0] = payload.get("pose")
_latest_nav[0] = payload.get("nav", {})
_latest_loc_state[0] = str(payload.get("loc_state", "IDLE"))
_latest_mode[0] = str(payload.get("mode", "IDLE"))
_latest_perf[0] = payload.get("perf", {})
_last_frame_time[0] = time.time()
# ══════════════════════════════════════════════════════════════════════════════
# PUBLIC API
# ══════════════════════════════════════════════════════════════════════════════
def obstacle_ahead(radius=None):
"""
Returns True if the SLAM safety system detected an obstacle.
Checks safety.emergency OR safety.hold from the latest FRAME.
This uses the SLAM's own costmap + SafetySupervisor — 0.50m collision zone,
localization loss detection, and 0.8s emergency hold.
Safe to call even when LiDAR is not available (returns False).
"""
if not LIDAR_AVAILABLE or _last_frame_time[0] == 0:
return False
with _lock:
safety = _latest_safety[0]
if not safety:
return False
return bool(safety.get("emergency", False)) or bool(safety.get("hold", False))
def get_slam_pose():
"""
Returns (x, y, heading_deg) from the SLAM 4x4 pose matrix.
Returns None if no pose available.
More accurate than dead reckoning (±2cm vs ±10cm).
"""
with _lock:
pose = _latest_pose[0]
if pose is None:
return None
pose = np.asarray(pose, dtype=np.float64)
if pose.shape != (4, 4):
return None
x = float(pose[0, 3])
y = float(pose[1, 3])
yaw = math.degrees(math.atan2(float(pose[1, 0]), float(pose[0, 0])))
return {"x": round(x, 4), "y": round(y, 4), "heading": round(yaw, 2), "source": "slam"}
def get_nav_cmd():
"""
Returns the SLAM path planner's recommended velocity command.
{"linear_mps": float, "angular_rps": float, "blocked": bool, "goal_reached": bool}
Returns None if no nav data.
"""
with _lock:
nav = _latest_nav[0]
cmd = nav.get("cmd") if nav else None
if not cmd or not isinstance(cmd, dict):
return None
return cmd
def get_loc_state():
"""Returns 'TRACKING', 'DEGRADED', 'LOST', or 'RECOVERY'."""
with _lock:
return _latest_loc_state[0]
def get_safety_reasons():
"""Returns list of current safety trigger reasons, or empty list."""
with _lock:
safety = _latest_safety[0]
return safety.get("reasons", []) if safety else []
def get_lidar_status():
"""Full status dict for diagnostics."""
with _lock:
return {
"available": LIDAR_AVAILABLE,
"mode": _latest_mode[0],
"loc_state": _latest_loc_state[0],
"safety": dict(_latest_safety[0]) if _latest_safety[0] else {},
"perf": dict(_latest_perf[0]) if _latest_perf[0] else {},
"last_frame_age": round(time.time() - _last_frame_time[0], 1) if _last_frame_time[0] > 0 else -1,
"pose": get_slam_pose(),
}
def get_client():
"""Return the SlamEngineClient instance (for direct commands)."""
return _client[0]
def stop_lidar():
"""Clean shutdown."""
global LIDAR_AVAILABLE
_lidar_alive[0] = False
LIDAR_AVAILABLE = False
client = _client[0]
if client is not None:
try:
client.stop_process()
except Exception:
pass
log("LiDAR stopped", "info", "lidar")
print(" [LiDAR] Stopped")