Marcus/Lidar/SLAM_NavRuntime.py
2026-04-12 18:50:22 +04:00

515 lines
19 KiB
Python

from __future__ import annotations
import heapq
import math
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional, Tuple
import numpy as np
def _safe_float(v: Any, default: float) -> float:
try:
return float(v)
except Exception:
return float(default)
def _safe_int(v: Any, default: int) -> int:
try:
return int(v)
except Exception:
return int(default)
def _as_bool(v: Any, default: bool) -> bool:
if v is None:
return default
if isinstance(v, bool):
return v
if isinstance(v, (int, float)):
return bool(v)
return str(v).strip().lower() in ("1", "true", "yes", "on")
def _inflate_binary(mask: np.ndarray, radius_cells: int) -> np.ndarray:
if radius_cells <= 0:
return mask.copy()
h, w = mask.shape
out = mask.copy()
rr = int(radius_cells) * int(radius_cells)
offsets: List[Tuple[int, int]] = []
for dy in range(-radius_cells, radius_cells + 1):
for dx in range(-radius_cells, radius_cells + 1):
if (dy * dy + dx * dx) <= rr:
offsets.append((dy, dx))
for dy, dx in offsets:
ys = max(0, -dy)
ye = min(h, h - dy)
xs = max(0, -dx)
xe = min(w, w - dx)
yd = max(0, dy)
xd = max(0, dx)
out[yd : yd + (ye - ys), xd : xd + (xe - xs)] |= mask[ys:ye, xs:xe]
return out
@dataclass
class LiveCostmapConfig:
enabled: bool = True
resolution_m: float = 0.10
z_min_m: float = -0.40
z_max_m: float = 1.20
padding_m: float = 0.80
inflation_radius_m: float = 0.25
dynamic_decay_sec: float = 1.5
dynamic_min_hits: int = 2
blocked_cost: int = 220
max_width_cells: int = 900
max_height_cells: int = 900
@staticmethod
def from_dict(d: Dict[str, Any] | None) -> "LiveCostmapConfig":
src = d or {}
return LiveCostmapConfig(
enabled=_as_bool(src.get("enabled", True), True),
resolution_m=max(0.02, _safe_float(src.get("resolution_m", 0.10), 0.10)),
z_min_m=_safe_float(src.get("z_min_m", -0.40), -0.40),
z_max_m=_safe_float(src.get("z_max_m", 1.20), 1.20),
padding_m=max(0.0, _safe_float(src.get("padding_m", 0.80), 0.80)),
inflation_radius_m=max(0.0, _safe_float(src.get("inflation_radius_m", 0.25), 0.25)),
dynamic_decay_sec=max(0.2, _safe_float(src.get("dynamic_decay_sec", 1.5), 1.5)),
dynamic_min_hits=max(1, _safe_int(src.get("dynamic_min_hits", 2), 2)),
blocked_cost=int(np.clip(_safe_int(src.get("blocked_cost", 220), 220), 100, 255)),
max_width_cells=max(100, _safe_int(src.get("max_width_cells", 900), 900)),
max_height_cells=max(100, _safe_int(src.get("max_height_cells", 900), 900)),
)
def patched(self, patch: Dict[str, Any] | None) -> "LiveCostmapConfig":
src = patch or {}
return LiveCostmapConfig.from_dict(
{
"enabled": src.get("enabled", self.enabled),
"resolution_m": src.get("resolution_m", self.resolution_m),
"z_min_m": src.get("z_min_m", self.z_min_m),
"z_max_m": src.get("z_max_m", self.z_max_m),
"padding_m": src.get("padding_m", self.padding_m),
"inflation_radius_m": src.get("inflation_radius_m", self.inflation_radius_m),
"dynamic_decay_sec": src.get("dynamic_decay_sec", self.dynamic_decay_sec),
"dynamic_min_hits": src.get("dynamic_min_hits", self.dynamic_min_hits),
"blocked_cost": src.get("blocked_cost", self.blocked_cost),
"max_width_cells": src.get("max_width_cells", self.max_width_cells),
"max_height_cells": src.get("max_height_cells", self.max_height_cells),
}
)
class LiveCostmapRuntime:
"""
Maintains real-time static + dynamic + inflated obstacle layers.
Static layer comes from SLAM stable points; dynamic layer comes from live points.
"""
def __init__(self, cfg: LiveCostmapConfig):
self.cfg = cfg
self._dynamic_cells: Dict[Tuple[int, int], Tuple[int, float]] = {}
self._grid: Optional[Dict[str, Any]] = None
def reset(self) -> None:
self._dynamic_cells.clear()
self._grid = None
def _clip_nav_band(self, points: Optional[np.ndarray]) -> np.ndarray:
if points is None:
return np.zeros((0, 3), dtype=np.float32)
pts = np.asarray(points, dtype=np.float32)
if pts.ndim != 2 or pts.shape[1] < 3 or len(pts) == 0:
return np.zeros((0, 3), dtype=np.float32)
zmin = float(self.cfg.z_min_m)
zmax = float(self.cfg.z_max_m)
m = (pts[:, 2] >= zmin) & (pts[:, 2] <= zmax)
return pts[m, :3]
@staticmethod
def _grid_indices(xy: np.ndarray, min_xy: np.ndarray, res: float, w: int, h: int) -> Tuple[np.ndarray, np.ndarray]:
gx = np.floor((xy[:, 0] - min_xy[0]) / res).astype(np.int32)
gy = np.floor((xy[:, 1] - min_xy[1]) / res).astype(np.int32)
gx = np.clip(gx, 0, w - 1)
gy = np.clip(gy, 0, h - 1)
return gx, gy
def world_to_grid(self, x: float, y: float) -> Optional[Tuple[int, int]]:
if self._grid is None:
return None
origin = np.asarray(self._grid["origin_xy"], dtype=np.float64)
res = float(self._grid["resolution_m"])
shape = tuple(self._grid["shape_hw"])
h, w = int(shape[0]), int(shape[1])
gx = int(math.floor((float(x) - origin[0]) / res))
gy = int(math.floor((float(y) - origin[1]) / res))
if gx < 0 or gy < 0 or gx >= w or gy >= h:
return None
return gx, gy
def grid_to_world(self, gx: int, gy: int) -> Optional[Tuple[float, float]]:
if self._grid is None:
return None
origin = np.asarray(self._grid["origin_xy"], dtype=np.float64)
res = float(self._grid["resolution_m"])
x = origin[0] + (float(gx) + 0.5) * res
y = origin[1] + (float(gy) + 0.5) * res
return float(x), float(y)
def cost_at_world(self, x: float, y: float) -> int:
if self._grid is None:
return 0
idx = self.world_to_grid(x, y)
if idx is None:
return 255
gx, gy = idx
cost = np.asarray(self._grid["costmap"], dtype=np.uint8)
return int(cost[gy, gx])
def occupied_near(self, x: float, y: float, radius_m: float, cost_thresh: Optional[int] = None) -> bool:
if self._grid is None:
return False
thresh = int(self.cfg.blocked_cost if cost_thresh is None else cost_thresh)
idx = self.world_to_grid(x, y)
if idx is None:
return True
gx, gy = idx
cost = np.asarray(self._grid["costmap"], dtype=np.uint8)
h, w = cost.shape
rc = int(math.ceil(max(0.0, float(radius_m)) / float(self._grid["resolution_m"])))
ys = max(0, gy - rc)
ye = min(h, gy + rc + 1)
xs = max(0, gx - rc)
xe = min(w, gx + rc + 1)
return bool(np.any(cost[ys:ye, xs:xe] >= thresh))
def _compute_bounds(self, static_pts: np.ndarray, live_pts: np.ndarray) -> Optional[Tuple[np.ndarray, np.ndarray]]:
has_static = len(static_pts) > 0
has_live = len(live_pts) > 0
if not has_static and not has_live:
return None
if has_static and has_live:
all_xy = np.vstack((static_pts[:, :2], live_pts[:, :2]))
elif has_static:
all_xy = static_pts[:, :2]
else:
all_xy = live_pts[:, :2]
pad = float(self.cfg.padding_m)
mn = all_xy.min(axis=0) - pad
mx = all_xy.max(axis=0) + pad
return mn.astype(np.float64), mx.astype(np.float64)
def _clamp_grid_size(self, mn: np.ndarray, mx: np.ndarray, res: float) -> Tuple[np.ndarray, np.ndarray, float, int, int]:
span = np.maximum(mx - mn, res)
w = int(np.ceil(span[0] / res)) + 1
h = int(np.ceil(span[1] / res)) + 1
res_out = float(res)
if w > int(self.cfg.max_width_cells) or h > int(self.cfg.max_height_cells):
sx = float(w) / float(self.cfg.max_width_cells)
sy = float(h) / float(self.cfg.max_height_cells)
scale = max(sx, sy)
res_out = res_out * scale
span = np.maximum(mx - mn, res_out)
w = int(np.ceil(span[0] / res_out)) + 1
h = int(np.ceil(span[1] / res_out)) + 1
return mn, mx, float(res_out), int(w), int(h)
def update(
self,
stable_points_world: Optional[np.ndarray],
live_points_world: Optional[np.ndarray],
now: float,
) -> Optional[Dict[str, Any]]:
if not self.cfg.enabled:
return None
static_pts = self._clip_nav_band(stable_points_world)
live_pts = self._clip_nav_band(live_points_world)
bounds = self._compute_bounds(static_pts, live_pts)
if bounds is None:
# decay dynamic memory even when there is no input
self._dynamic_cells = {
k: v
for k, v in self._dynamic_cells.items()
if (float(now) - float(v[1])) <= float(self.cfg.dynamic_decay_sec)
}
return None
mn, mx = bounds
mn, mx, res, w, h = self._clamp_grid_size(mn, mx, float(self.cfg.resolution_m))
static_occ = np.zeros((h, w), dtype=bool)
if len(static_pts) > 0:
gx, gy = self._grid_indices(static_pts[:, :2], mn, res, w, h)
static_occ[gy, gx] = True
# age out dynamic memory first
max_age = float(self.cfg.dynamic_decay_sec)
new_dyn: Dict[Tuple[int, int], Tuple[int, float]] = {}
for (cx, cy), (hits, ts) in self._dynamic_cells.items():
if (float(now) - float(ts)) <= max_age and 0 <= cx < w and 0 <= cy < h:
new_dyn[(cx, cy)] = (int(hits), float(ts))
self._dynamic_cells = new_dyn
if len(live_pts) > 0:
gx_l, gy_l = self._grid_indices(live_pts[:, :2], mn, res, w, h)
for gx, gy in zip(gx_l.tolist(), gy_l.tolist()):
if static_occ[gy, gx]:
continue
k = (int(gx), int(gy))
old_hits, _ = self._dynamic_cells.get(k, (0, float(now)))
self._dynamic_cells[k] = (int(old_hits) + 1, float(now))
dynamic_occ = np.zeros((h, w), dtype=bool)
min_hits = int(self.cfg.dynamic_min_hits)
for (gx, gy), (hits, ts) in self._dynamic_cells.items():
if hits >= min_hits and (float(now) - float(ts)) <= max_age:
if 0 <= gx < w and 0 <= gy < h:
dynamic_occ[gy, gx] = True
occ = static_occ | dynamic_occ
inf_cells = int(np.ceil(float(self.cfg.inflation_radius_m) / res))
inflated = _inflate_binary(occ, inf_cells)
cost = np.zeros((h, w), dtype=np.uint8)
cost[inflated] = 180
cost[dynamic_occ] = 220
cost[static_occ] = 255
out = {
"costmap": cost,
"static_mask": static_occ,
"dynamic_mask": dynamic_occ,
"inflated_mask": inflated,
"origin_xy": mn.astype(np.float32),
"resolution_m": float(res),
"shape_hw": [int(h), int(w)],
"static_cells": int(np.count_nonzero(static_occ)),
"dynamic_cells": int(np.count_nonzero(dynamic_occ)),
"inflated_cells": int(np.count_nonzero(inflated)),
}
self._grid = out
return {
"shape": [int(h), int(w)],
"resolution_m": float(res),
"origin_xy": [float(mn[0]), float(mn[1])],
"static_cells": int(out["static_cells"]),
"dynamic_cells": int(out["dynamic_cells"]),
"inflated_cells": int(out["inflated_cells"]),
}
@property
def grid(self) -> Optional[Dict[str, Any]]:
return self._grid
class GlobalAStarPlanner:
def __init__(self, blocked_cost: int = 220):
self.blocked_cost = int(np.clip(int(blocked_cost), 100, 255))
@staticmethod
def _heur(a: Tuple[int, int], b: Tuple[int, int]) -> float:
return float(math.hypot(float(a[0] - b[0]), float(a[1] - b[1])))
@staticmethod
def _neighbors(x: int, y: int, w: int, h: int) -> Iterable[Tuple[int, int, float]]:
for dy in (-1, 0, 1):
for dx in (-1, 0, 1):
if dx == 0 and dy == 0:
continue
nx = x + dx
ny = y + dy
if 0 <= nx < w and 0 <= ny < h:
step = 1.41421356 if (dx != 0 and dy != 0) else 1.0
yield nx, ny, step
def plan(
self,
costmap: np.ndarray,
origin_xy: np.ndarray,
resolution_m: float,
start_xy: Tuple[float, float],
goal_xy: Tuple[float, float],
max_expansions: int = 120000,
) -> List[Tuple[float, float]]:
cost = np.asarray(costmap, dtype=np.uint8)
if cost.ndim != 2:
return []
h, w = cost.shape
origin = np.asarray(origin_xy, dtype=np.float64)
res = float(resolution_m)
def to_grid(x: float, y: float) -> Optional[Tuple[int, int]]:
gx = int(math.floor((float(x) - origin[0]) / res))
gy = int(math.floor((float(y) - origin[1]) / res))
if gx < 0 or gy < 0 or gx >= w or gy >= h:
return None
return gx, gy
def to_world(gx: int, gy: int) -> Tuple[float, float]:
return (
float(origin[0] + (float(gx) + 0.5) * res),
float(origin[1] + (float(gy) + 0.5) * res),
)
s = to_grid(float(start_xy[0]), float(start_xy[1]))
g = to_grid(float(goal_xy[0]), float(goal_xy[1]))
if s is None or g is None:
return []
if int(cost[s[1], s[0]]) >= self.blocked_cost or int(cost[g[1], g[0]]) >= self.blocked_cost:
return []
open_heap: List[Tuple[float, float, Tuple[int, int]]] = []
heapq.heappush(open_heap, (self._heur(s, g), 0.0, s))
came_from: Dict[Tuple[int, int], Tuple[int, int]] = {}
g_score: Dict[Tuple[int, int], float] = {s: 0.0}
closed: Dict[Tuple[int, int], bool] = {}
expansions = 0
while open_heap:
_, cur_g, cur = heapq.heappop(open_heap)
if closed.get(cur, False):
continue
closed[cur] = True
if cur == g:
break
expansions += 1
if expansions >= int(max_expansions):
import sys
print(
f"[A*] expansion limit {max_expansions} reached; no path found "
f"from {start_xy} to {goal_xy}",
file=sys.stderr,
)
return []
cx, cy = cur
for nx, ny, step in self._neighbors(cx, cy, w, h):
if int(cost[ny, nx]) >= self.blocked_cost:
continue
ng = float(cur_g + step + (float(cost[ny, nx]) / 255.0) * 2.0)
node = (nx, ny)
if ng < float(g_score.get(node, 1e18)):
g_score[node] = ng
came_from[node] = cur
f = ng + self._heur(node, g)
heapq.heappush(open_heap, (f, ng, node))
if g not in came_from and g != s:
return []
path_cells: List[Tuple[int, int]] = [g]
cur = g
visited_back: set = {g}
while cur != s:
nxt = came_from.get(cur)
if nxt is None or nxt in visited_back:
return []
visited_back.add(nxt)
cur = nxt
path_cells.append(cur)
path_cells.reverse()
return [to_world(px, py) for (px, py) in path_cells]
@dataclass
class LocalPlannerConfig:
lookahead_m: float = 0.8
max_linear_mps: float = 0.6
max_angular_rps: float = 1.3
goal_tolerance_m: float = 0.30
collision_probe_m: float = 0.6
@staticmethod
def from_dict(d: Dict[str, Any] | None) -> "LocalPlannerConfig":
src = d or {}
return LocalPlannerConfig(
lookahead_m=max(0.1, _safe_float(src.get("lookahead_m", 0.8), 0.8)),
max_linear_mps=max(0.05, _safe_float(src.get("max_linear_mps", 0.6), 0.6)),
max_angular_rps=max(0.1, _safe_float(src.get("max_angular_rps", 1.3), 1.3)),
goal_tolerance_m=max(0.05, _safe_float(src.get("goal_tolerance_m", 0.30), 0.30)),
collision_probe_m=max(0.1, _safe_float(src.get("collision_probe_m", 0.6), 0.6)),
)
class LocalReactivePlanner:
def __init__(self, cfg: LocalPlannerConfig):
self.cfg = cfg
@staticmethod
def _yaw_from_pose(pose: np.ndarray) -> float:
# ZYX yaw from rotation matrix
return float(math.atan2(float(pose[1, 0]), float(pose[0, 0])))
@staticmethod
def _wrap_pi(a: float) -> float:
while a > math.pi:
a -= 2.0 * math.pi
while a < -math.pi:
a += 2.0 * math.pi
return a
def compute_command(
self,
pose_world: np.ndarray,
path_world: List[Tuple[float, float]],
runtime: LiveCostmapRuntime,
) -> Dict[str, Any]:
cmd = {
"linear_mps": 0.0,
"angular_rps": 0.0,
"goal_reached": False,
"blocked": False,
}
if pose_world is None or np.asarray(pose_world).shape != (4, 4):
return cmd
if not path_world:
return cmd
p = np.asarray(pose_world, dtype=np.float64)
x = float(p[0, 3])
y = float(p[1, 3])
yaw = self._yaw_from_pose(p)
goal_x, goal_y = float(path_world[-1][0]), float(path_world[-1][1])
dist_goal = float(math.hypot(goal_x - x, goal_y - y))
if dist_goal <= float(self.cfg.goal_tolerance_m):
cmd["goal_reached"] = True
return cmd
target = path_world[-1]
for wx, wy in path_world:
d = float(math.hypot(float(wx) - x, float(wy) - y))
if d >= float(self.cfg.lookahead_m):
target = (float(wx), float(wy))
break
dx = float(target[0] - x)
dy = float(target[1] - y)
target_yaw = float(math.atan2(dy, dx))
yaw_err = self._wrap_pi(target_yaw - yaw)
ang = float(np.clip(1.5 * yaw_err, -float(self.cfg.max_angular_rps), float(self.cfg.max_angular_rps)))
lin_scale = max(0.0, 1.0 - min(1.0, abs(yaw_err) / math.pi))
lin = float(self.cfg.max_linear_mps) * lin_scale
# Probe short horizon for immediate collision.
n_probe = 6
step = float(self.cfg.collision_probe_m) / float(n_probe)
for i in range(1, n_probe + 1):
px = x + float(i) * step * math.cos(yaw)
py = y + float(i) * step * math.sin(yaw)
if runtime.cost_at_world(px, py) >= int(runtime.cfg.blocked_cost):
lin = 0.0
cmd["blocked"] = True
break
cmd["linear_mps"] = float(lin)
cmd["angular_rps"] = float(ang)
return cmd