"""Wrapper for Livox-SDK **2** (push-mode, no broadcast). Tested against Livox-SDK2 1.2.x – build it first: git clone https://github.com/Livox-SDK/Livox-SDK2.git cd Livox-SDK2 && mkdir build && cd build cmake .. -DCMAKE_BUILD_TYPE=Release && make -j$(nproc) sudo make install # installs liblivox_lidar_sdk.so → /usr/local/lib Create a JSON config (see livox_lidar_quick_start/mid360_config.json) that points the LiDAR to *your* host-IP (192.168.123.222) and save it e.g. as ``mid360_config.json`` in this repo. Pass that path to ``Livox2``. """ from __future__ import annotations import ctypes as _C import json import os import sys import threading import time from ctypes import ( POINTER, c_char_p, c_uint8, c_uint16, c_uint32, c_float, c_bool, ) from pathlib import Path from typing import Optional import numpy as np # ---------------- dynamic library ------------------------------------------------ def _load_lib(): for name in ( "liblivox_lidar_sdk_shared.so", "liblivox_lidar_sdk.so", "livox_lidar_sdk.dll", # Windows ): try: return _C.cdll.LoadLibrary(name) except OSError: continue raise OSError( "liblivox_lidar_sdk shared library not found. Build & install " "Livox-SDK2 first (see wrapper docstring)." ) _lib = _load_lib() # ---------------- ctypes mapping -------------------------------------------------- class _LivoxLidarEthernetPacket(_C.Structure): _pack_ = 1 _fields_ = [ ("version", c_uint8), ("length", c_uint16), ("time_interval", c_uint16), ("dot_num", c_uint16), ("udp_cnt", c_uint16), ("frame_cnt", c_uint8), ("data_type", c_uint8), ("time_type", c_uint8), ("rsvd", c_uint8 * 12), ("crc32", c_uint32), ("timestamp", c_uint8 * 8), ("data", c_uint8 * 1), ] class _ImuRawPoint(_C.Structure): """MID-360 IMU sample (data_type == 0). 200 Hz, 6-DOF BMI088.""" _pack_ = 1 _fields_ = [ ("gyro_x", c_float), # rad/s ("gyro_y", c_float), ("gyro_z", c_float), ("acc_x", c_float), # m/s² ("acc_y", c_float), ("acc_z", c_float), ] class _CartesianHighPoint(_C.Structure): _pack_ = 1 _fields_ = [ ("x", _C.c_int32), ("y", _C.c_int32), ("z", _C.c_int32), ("reflectivity", c_uint8), ("tag", c_uint8), ] # Callback typedef _PointCb = _C.CFUNCTYPE(None, c_uint32, c_uint8, POINTER(_LivoxLidarEthernetPacket), _C.c_void_p) # Info change callback class _LivoxLidarInfo(_C.Structure): _fields_ = [ ("dev_type", c_uint8), ("sn", _C.c_char * 16), ("lidar_ip", _C.c_char * 16), ] _InfoChangeCb = _C.CFUNCTYPE(None, c_uint32, POINTER(_LivoxLidarInfo), _C.c_void_p) # --------------------------------------------------------------------------- # Additional API we use for push-mode # --------------------------------------------------------------------------- _lib.SetLivoxLidarInfoChangeCallback.argtypes = (_InfoChangeCb, _C.c_void_p) _lib.SetLivoxLidarWorkMode.argtypes = (c_uint32, c_uint8, _C.c_void_p, _C.c_void_p) _lib.SetLivoxLidarWorkMode.restype = c_uint32 _lib.EnableLivoxLidarPointSend.argtypes = (c_uint32, _C.c_void_p, _C.c_void_p) _lib.EnableLivoxLidarPointSend.restype = c_uint32 _lib.SetLivoxLidarPclDataType.argtypes = (c_uint32, c_uint8, _C.c_void_p, _C.c_void_p) # Point-cloud observer (interface side; lets SDK join multicast) _lib.LivoxLidarAddPointCloudObserver.argtypes = (_PointCb, _C.c_void_p) _lib.LivoxLidarAddPointCloudObserver.restype = c_uint16 # ---------------- function prototypes ------------------------------------------- _lib.LivoxLidarSdkInit.argtypes = (c_char_p, c_char_p, _C.c_void_p) _lib.LivoxLidarSdkInit.restype = c_bool _lib.LivoxLidarSdkStart.argtypes = () _lib.LivoxLidarSdkStart.restype = c_bool _lib.LivoxLidarSdkUninit.argtypes = () _lib.LivoxLidarSdkUninit.restype = None _lib.SetLivoxLidarPointCloudCallBack.argtypes = (_PointCb, _C.c_void_p) # ---------------- Pythonic wrapper ---------------------------------------------- class Livox2: """Minimal wrapper around Livox-SDK2 push-mode pipeline.""" def __init__(self, config_path: str | Path, host_ip: str, *, frame_time: float = 0.20, frame_packets: int = 120, debug: bool = True, print_every_n_frames: int = 1): self._config_path = os.fspath(config_path).encode() if not _lib.LivoxLidarSdkInit(self._config_path, host_ip.encode(), None): raise RuntimeError("LivoxLidarSdkInit failed – check config path & JSON") # Register callback *before* starting threads (matches vendor sample) self._cb = _PointCb(self._on_packet) _lib.SetLivoxLidarPointCloudCallBack(self._cb, None) # start SDK threads _lib.LivoxLidarSdkStart() # Register info-change callback to learn lidar handle once, then start it. self._info_cb = _InfoChangeCb(self._on_info_change) _lib.SetLivoxLidarInfoChangeCallback(self._info_cb, None) self._running = True # Aggregation parameters for pseudo-frames self._frame_time = float(frame_time) self._frame_packets = int(frame_packets) self._debug = bool(debug) self._print_every_n_frames = max(1, int(print_every_n_frames)) self._frame_print_counter = 0 self._frame_state: dict = {} self._frame_lock = threading.Lock() # ------------------------------------------------------------------ def spin(self): try: while self._running: time.sleep(0.01) except KeyboardInterrupt: pass finally: self.shutdown() def shutdown(self): if self._running: _lib.LivoxLidarSdkUninit() self._running = False # ------------------------------------------------------------------ def handle_points(self, xyz: np.ndarray): # noqa: D401 if self._debug: print(f"frame {len(xyz)} pts") def handle_imu(self, gyro: np.ndarray, acc: np.ndarray, timestamp: float) -> None: # noqa: D401 """Override to receive IMU data. Parameters ---------- gyro : (3,) float32 – angular velocity in rad/s (x, y, z) acc : (3,) float32 – linear acceleration in m/s² (x, y, z) timestamp : float – time.time() when packet was received """ # ------------------------------------------------------------------ def _on_packet(self, handle: int, dev_type: int, pkt_ptr, _client): pkt = pkt_ptr.contents n = pkt.dot_num if n == 0: return if pkt.data_type == 0: # IMU samples (200 Hz, BMI088) _ArrI = _ImuRawPoint * n samples = _C.cast(pkt.data, POINTER(_ArrI)).contents arr = np.ctypeslib.as_array(samples) # Average all samples in the packet (typically 1–5 at 200 Hz) gyro = np.array( [arr["gyro_x"].mean(), arr["gyro_y"].mean(), arr["gyro_z"].mean()], dtype=np.float32, ) acc = np.array( [arr["acc_x"].mean(), arr["acc_y"].mean(), arr["acc_z"].mean()], dtype=np.float32, ) try: self.handle_imu(gyro, acc, time.time()) except Exception as exc: print(f"[Livox2] Exception in handle_imu: {exc}", file=sys.stderr) return if pkt.data_type == 1: # Cartesian High _Arr = _CartesianHighPoint * n points = _C.cast(pkt.data, POINTER(_Arr)).contents arr = np.ctypeslib.as_array(points) xyz = np.stack((arr["x"], arr["y"], arr["z"]), axis=1).astype(np.float32) / 1000.0 elif pkt.data_type == 2: # Cartesian Low (int16, cm) class _LowPoint(_C.Structure): _fields_ = [ ("x", _C.c_int16), ("y", _C.c_int16), ("z", _C.c_int16), ("reflectivity", c_uint8), ("tag", c_uint8), ] _ArrL = _LowPoint * n pts = _C.cast(pkt.data, POINTER(_ArrL)).contents arr = np.ctypeslib.as_array(pts) xyz = np.stack((arr["x"], arr["y"], arr["z"]), axis=1).astype(np.float32) / 100.0 else: return # -------------------------------------------------------------- # Aggregate packets belonging to the same "frame" (full 360°) # -------------------------------------------------------------- # Each UDP packet contains only a tiny slice of a full scan – for the # MID-360 that's merely 96 points. Feeding such sparse subsets into a # SLAM backend like KISS-ICP is ineffective and typically produces an # empty map. The packet header provides a monotonically increasing # `frame_cnt` field which we can use to group packets that belong to # the same rotation. We buffer points until the counter changes, then # emit the *previous* frame in one batch via ``handle_points``. # # A small dictionary maps → current frame accumulator so # that multi-lidar setups would still work (although untested). # -------------------------------------------------------------- # ------------------------------------------------------------------ # Aggregate packets for ~1 full rotation (≈50 ms @ 20 Hz) # Lock only protects buffer state; handle_points is called outside. # ------------------------------------------------------------------ frame_xyz = None elapsed = 0.0 with self._frame_lock: buf, last_t = self._frame_state.get(handle, ([], time.time())) buf.append(xyz) now = time.time() elapsed = now - last_t # Heuristic flush conditions: either 0.2 s have passed (≈4 full scans # at 20 Hz) *or* we already gathered ≥ 120 packets (~12 k points). if elapsed >= self._frame_time or len(buf) >= self._frame_packets: frame_xyz = np.concatenate(buf, axis=0) self._frame_state[handle] = ([], now) else: self._frame_state[handle] = (buf, last_t) if frame_xyz is not None: self._frame_print_counter += 1 if self._debug and (self._frame_print_counter % self._print_every_n_frames == 0): print(f"[Livox2] frame {frame_xyz.shape[0]} pts (Δt={elapsed*1000:.1f} ms)") try: self.handle_points(frame_xyz) except Exception as exc: print("Exception in handle_points:", exc, file=sys.stderr) # ------------------------------------------------------------------ def _on_info_change(self, handle: int, info_ptr, _client): if self._debug: print(f"[Livox2] InfoChange handle={handle}") # Set work mode to NORMAL (1) to begin emitting points. kNormal = 1 _lib.SetLivoxLidarWorkMode(handle, kNormal, None, None) # Ensure point-cloud sending is enabled _lib.EnableLivoxLidarPointSend(handle, None, None) # Ensure data type is Cartesian High (1) _lib.SetLivoxLidarPclDataType(handle, 1, None, None) if __name__ == "__main__": cfg = Path("mid360_config.json") if not cfg.exists(): # generate a bare-bones config for 192.168.123.222 host_ip = os.environ.get("HOST_IP", "192.168.123.222") data = { "MID360": { "lidar_net_info": { "cmd_data_port": 56100, "push_msg_port": 56200, "point_data_port": 56300, "imu_data_port": 56400, "log_data_port": 56500, }, "host_net_info": [ { "host_ip": host_ip, "multicast_ip": "224.1.1.5", "cmd_data_port": 56101, "push_msg_port": 56201, "point_data_port": 56301, "imu_data_port": 56401, "log_data_port": 56501, } ], } } cfg.write_text(json.dumps(data, indent=2)) print("[Livox2] Wrote default mid360_config.json with host_ip", host_ip) lidar = Livox2(cfg, host_ip="192.168.123.222") lidar.spin()