Marcus/Navigation/marcus_odometry.py

820 lines
30 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
marcus_odometry.py — Precise Distance & Position Module
=========================================================
Project : Marcus | YS Lootah Technology
Hardware : Unitree G1 EDU + Jetson Orin NX
Purpose : Track robot position precisely and execute accurate movements.
Two sources (priority order):
1. ROS2 /dog_odom — joint encoder data → ±2cm accuracy
2. Dead reckoning — velocity × time integration → ±10cm accuracy
Import in marcus_brain.py
--------------------------
from marcus_odometry import Odometry
odom = Odometry()
odom.start(zmq_sock=sock) ← pass existing ZMQ socket
odom.walk_distance(1.0) ← walk exactly 1 meter
odom.turn_degrees(90) ← turn exactly 90° left
odom.turn_degrees(-90) ← turn exactly 90° right
odom.get_position() ← {x, y, heading, source}
odom.reset() ← zero at current location
Standalone test
---------------
conda run -n marcus python3 Navigation/marcus_odometry.py
Date : April 2026
"""
import os
import sys
import time
import math
import json
import threading
import zmq
# ══════════════════════════════════════════════════════════════════════════════
# CONFIGURATION — loaded from Config/config_Odometry.json
# ══════════════════════════════════════════════════════════════════════════════
_PROJECT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if _PROJECT_DIR not in sys.path:
sys.path.insert(0, _PROJECT_DIR)
try:
from Core.config_loader import load_config
_cfg = load_config("Odometry")
except Exception:
_cfg = {}
ZMQ_HOST = str(_cfg.get("zmq_host", "127.0.0.1"))
ZMQ_PORT = int(_cfg.get("zmq_port", 5556))
ROS2_ODOM_TOPIC = str(_cfg.get("ros2_odom_topic", "/dog_odom"))
ODOM_INTERFACE = str(_cfg.get("odom_interface", "eth0"))
# Movement defaults
DEFAULT_WALK_SPEED = float(_cfg.get("default_walk_speed", 0.25)) # m/s — slower = more accurate
DEFAULT_TURN_SPEED = float(_cfg.get("default_turn_speed", 0.25)) # rad/s
DIST_TOLERANCE = float(_cfg.get("dist_tolerance", 0.05)) # meters — stop within 5cm
ANGLE_TOLERANCE = float(_cfg.get("angle_tolerance", 2.0)) # degrees — stop within 2°
SAFETY_TIMEOUT_MULT = float(_cfg.get("safety_timeout_mult", 3.0)) # timeout = (distance/speed) × this
ODOM_STALE_WARN = 1.0 # warn if odom not updated for this many seconds (internal only)
# Dead reckoning
DR_UPDATE_HZ = int(_cfg.get("dr_update_hz", 20)) # integration rate
# ══════════════════════════════════════════════════════════════════════════════
# ODOMETRY CLASS
# ══════════════════════════════════════════════════════════════════════════════
class Odometry:
"""
Precise robot position tracking and movement control.
Thread-safe. Start with odom.start(zmq_sock).
If ROS2 is not available, falls back to dead reckoning automatically.
Usage:
odom = Odometry()
odom.start(zmq_sock=sock)
odom.walk_distance(1.0)
odom.turn_degrees(90)
pos = odom.get_position()
"""
def __init__(self):
self._lock = threading.Lock()
self._alive = [False]
self._sock = None # ZMQ PUB socket (shared from marcus_brain)
# Position state
self._x = 0.0 # meters from start
self._y = 0.0 # meters from start
self._heading = 0.0 # degrees — 0=start, +left, -right
self._vx = 0.0 # current velocity (for dead reckoning)
self._vy = 0.0
self._vyaw = 0.0
self._source = "none" # "ros2" or "dead_reckoning"
self._last_update = 0.0 # timestamp of last position update
# Start position snapshot (for return_to_start)
self._start_x = 0.0
self._start_y = 0.0
self._start_heading = 0.0
# Running flag
self._running = False
# ── STARTUP ───────────────────────────────────────────────────────────────
def start(self, zmq_sock=None) -> bool:
"""
Start odometry tracking in background thread.
Args:
zmq_sock : existing ZMQ PUB socket from marcus_brain.py.
If None, creates its own socket on ZMQ_PORT.
Passing the shared socket avoids port conflicts.
Returns:
True always — runs in either ROS2 or dead reckoning mode.
Edge cases:
- ZMQ port already in use → catches and warns
- Called twice → second call is no-op with warning
- ROS2 not sourced → falls back to dead reckoning automatically
"""
if self._running:
print(" [Odom] ⚠️ Already running — ignoring duplicate start()")
return True
# Wire up ZMQ socket
if zmq_sock is not None:
self._sock = zmq_sock
else:
self._init_own_zmq()
if self._sock is None:
print(" [Odom] ⚠️ No ZMQ socket — movement functions disabled")
return False
# Reset position
self._reset_state()
self._alive[0] = True
self._running = True
# Try ROS2 first
ros2_ok = self._try_start_ros2()
if not ros2_ok:
# Fall back to dead reckoning
threading.Thread(
target=self._dead_reckoning_loop,
daemon=True,
name="odom-dr"
).start()
print(" [Odom] Dead reckoning mode — accuracy ±10cm")
else:
print(" [Odom] ROS2 /dog_odom mode — accuracy ±2cm")
time.sleep(0.5)
return True
def stop(self):
"""Stop odometry tracking."""
self._alive[0] = False
self._running = False
print(" [Odom] Stopped")
def _init_own_zmq(self):
"""Create own ZMQ socket if no shared socket provided."""
try:
ctx = zmq.Context()
sock = ctx.socket(zmq.PUB)
sock.bind(f"tcp://{ZMQ_HOST}:{ZMQ_PORT}")
self._sock = sock
time.sleep(0.5)
except zmq.ZMQError as e:
if "Address already in use" in str(e):
print(f" [Odom] ⚠️ ZMQ port {ZMQ_PORT} already in use.")
print(" [Odom] Pass zmq_sock=sock from marcus_brain.py instead.")
else:
print(f" [Odom] ⚠️ ZMQ error: {e}")
self._sock = None
def _reset_state(self):
"""Reset position to zero."""
with self._lock:
self._x = self._y = self._heading = 0.0
self._vx = self._vy = self._vyaw = 0.0
self._start_x = self._start_y = self._start_heading = 0.0
self._last_update = time.time()
# ── ROS2 ODOMETRY ─────────────────────────────────────────────────────────
def _try_start_ros2(self) -> bool:
"""
Try to subscribe to /dog_odom via ROS2.
Returns True if ROS2 is available, False otherwise.
Edge cases:
- ROS2 not installed → ImportError caught
- ROS2 not sourced → caught
- Topic not publishing → timeout after 2s
"""
try:
import rclpy
# Skip ROS2 on this system — DDS init causes bad_alloc
# due to memory conflict with Holosoma + YOLO
# Dead reckoning fallback is used instead
return False
rclpy.init()
except Exception:
return False
# Quick check: can we create a node?
try:
from rclpy.node import Node
from nav_msgs.msg import Odometry as OdomMsg
class _OdomNode(Node):
def __init__(self, parent):
super().__init__("marcus_odom")
self._parent = parent
self.create_subscription(
OdomMsg, ROS2_ODOM_TOPIC,
self._callback, 10
)
def _callback(self, msg):
pos = msg.pose.pose.position
ori = msg.pose.pose.orientation
twist = msg.twist.twist
# Quaternion → yaw
siny = 2.0 * (ori.w * ori.z + ori.x * ori.y)
cosy = 1.0 - 2.0 * (ori.y**2 + ori.z**2)
yaw = math.degrees(math.atan2(siny, cosy))
with self._parent._lock:
self._parent._x = pos.x
self._parent._y = pos.y
self._parent._heading = yaw
self._parent._vx = twist.linear.x
self._parent._vy = twist.linear.y
self._parent._vyaw = twist.angular.z
self._parent._source = "ros2"
self._parent._last_update = time.time()
node = _OdomNode(self)
def _spin():
while self._alive[0]:
rclpy.spin_once(node, timeout_sec=0.02)
# Start spin thread and wait 2s for first message
t = threading.Thread(target=_spin, daemon=True, name="odom-ros2")
t.start()
deadline = time.time() + 2.0
while time.time() < deadline:
time.sleep(0.1)
if self._source == "ros2":
return True
# No message received — ROS2 available but topic silent
print(" [Odom] ⚠️ /dog_odom topic not publishing — using dead reckoning")
return False
except Exception as e:
return False
# ── DEAD RECKONING ────────────────────────────────────────────────────────
def _dead_reckoning_loop(self):
"""
Estimate position by integrating velocity × time.
Updates position at DR_UPDATE_HZ.
Edge cases:
- Very large dt (system sleep) → capped at 0.5s to avoid jumps
- Heading wrap-around at 360°/0° → handled by modulo
"""
dt_target = 1.0 / DR_UPDATE_HZ
t_last = time.time()
while self._alive[0]:
time.sleep(dt_target)
now = time.time()
dt = min(now - t_last, 0.5) # cap at 0.5s to prevent position jumps
t_last = now
with self._lock:
heading_rad = math.radians(self._heading)
vx = self._vx
vy = self._vy
vyaw = self._vyaw
# World-frame position update
self._x += (vx * math.cos(heading_rad) - vy * math.sin(heading_rad)) * dt
self._y += (vx * math.sin(heading_rad) + vy * math.cos(heading_rad)) * dt
self._heading = (self._heading + math.degrees(vyaw) * dt) % 360
self._source = "dead_reckoning"
self._last_update = now
# ── VELOCITY CONTROL ──────────────────────────────────────────────────────
def _send_vel(self, vx: float = 0.0, vy: float = 0.0, vyaw: float = 0.0):
"""Send velocity to Holosoma. Also updates dead reckoning velocity state."""
if self._sock:
try:
self._sock.send_string(json.dumps({"vel": {"vx": vx, "vy": vy, "vyaw": vyaw}}))
except Exception as e:
print(f" [Odom] ⚠️ ZMQ send error: {e}")
# Update dead reckoning state
with self._lock:
self._vx = vx
self._vy = vy
self._vyaw = vyaw
def _gradual_stop(self, steps: int = 20):
"""Smooth stop — 20 zero-velocity messages over ~1 second."""
for _ in range(steps):
self._send_vel(0.0, 0.0, 0.0)
time.sleep(0.05)
def _check_stale(self):
"""Warn if odometry hasn't been updated recently."""
if time.time() - self._last_update > ODOM_STALE_WARN:
print(f" [Odom] ⚠️ Position data may be stale (>{ODOM_STALE_WARN:.0f}s since last update)")
# ── PUBLIC MOVEMENT API ───────────────────────────────────────────────────
def get_position(self) -> dict:
"""
Return current robot position.
Returns:
{"x": float, "y": float, "heading": float,
"source": "ros2"|"dead_reckoning"|"none"}
x, y in meters from start position.
heading in degrees (0 = start direction, +left, -right, 0-360).
"""
with self._lock:
return {
"x": round(self._x, 4),
"y": round(self._y, 4),
"heading": round(self._heading, 2),
"source": self._source,
}
def get_distance_from_start(self) -> float:
"""Return Euclidean distance in meters from the starting position."""
with self._lock:
dx = self._x - self._start_x
dy = self._y - self._start_y
return round(math.sqrt(dx**2 + dy**2), 3)
def reset(self):
"""
Reset odometry to zero at current physical location.
Use when robot is at a known reference point.
"""
with self._lock:
self._x = self._y = self._heading = 0.0
self._start_x = self._start_y = self._start_heading = 0.0
print(" [Odom] ✅ Reset to (0, 0, 0°)")
def is_running(self) -> bool:
return self._running
def status_str(self) -> str:
"""Return formatted status string for display."""
pos = self.get_position()
return (f"x={pos['x']:.2f}m y={pos['y']:.2f}m "
f"heading={pos['heading']:.1f}° source={pos['source']}")
def walk_distance(self, meters: float, speed: float = None,
direction: str = "forward") -> bool:
"""
Walk a precise distance using odometry feedback.
Args:
meters : distance in meters (must be > 0)
speed : m/s (default DEFAULT_WALK_SPEED = 0.25)
direction : "forward" or "backward"
Returns:
True if target reached within tolerance.
False if timeout, invalid input, or odometry not running.
Edge cases:
- meters <= 0 → rejected
- speed <= 0 or > 0.5 → clamped with warning
- Odometry not running → time-based fallback with warning
- Safety timeout = (meters / speed) × SAFETY_TIMEOUT_MULT
- Keyboard interrupt → gradual_stop then re-raise
"""
# Validate inputs
if meters <= 0:
print(f" [Odom] ⚠️ Invalid distance: {meters}m — must be positive")
return False
if speed is None:
speed = DEFAULT_WALK_SPEED
if speed <= 0:
print(f" [Odom] ⚠️ Speed must be positive — using default {DEFAULT_WALK_SPEED}")
speed = DEFAULT_WALK_SPEED
elif speed > 0.5:
print(f" [Odom] ⚠️ Speed {speed} exceeds safe limit — clamping to 0.4 m/s")
speed = 0.4
if not self._running:
print(" [Odom] ⚠️ Odometry not started — using time-based fallback")
return self._time_based_walk(meters, speed, direction)
self._check_stale()
vx = speed if direction == "forward" else -speed
with self._lock:
start_x = self._x
start_y = self._y
timeout = (meters / speed) * SAFETY_TIMEOUT_MULT
t0 = time.time()
print(f" [Odom] Walking {meters:.2f}m {direction} at {speed:.2f} m/s "
f"(timeout: {timeout:.1f}s)")
try:
while time.time() - t0 < timeout:
with self._lock:
dx = self._x - start_x
dy = self._y - start_y
covered = math.sqrt(dx**2 + dy**2)
if covered >= meters - DIST_TOLERANCE:
self._gradual_stop()
print(f" [Odom] ✅ Walked {covered:.3f}m (target {meters:.2f}m)")
return True
self._send_vel(vx=vx)
time.sleep(0.05)
except KeyboardInterrupt:
print(" [Odom] Walk interrupted by user")
self._gradual_stop()
raise
self._gradual_stop()
with self._lock:
dx = self._x - start_x
dy = self._y - start_y
covered = math.sqrt(dx**2 + dy**2)
print(f" [Odom] ⚠️ Timeout — walked {covered:.3f}m of {meters:.2f}m")
return False
def _time_based_walk(self, meters: float, speed: float,
direction: str = "forward") -> bool:
"""
Time-based fallback when odometry is not running.
Accuracy ~±10cm.
"""
print(f" [Odom] Time-based fallback: {meters:.2f}m ≈ {meters/speed:.1f}s")
vx = speed if direction == "forward" else -speed
duration = meters / speed
t0 = time.time()
try:
while time.time() - t0 < duration:
self._send_vel(vx=vx)
time.sleep(0.05)
except KeyboardInterrupt:
self._gradual_stop()
raise
self._gradual_stop()
print(f" [Odom] ✅ Time-based walk complete (~{meters:.2f}m)")
return True
def turn_degrees(self, degrees: float, speed: float = None) -> bool:
"""
Turn a precise number of degrees using odometry feedback.
Args:
degrees : degrees to turn.
Positive = left (counter-clockwise)
Negative = right (clockwise)
speed : angular speed in rad/s (default DEFAULT_TURN_SPEED)
Returns:
True if reached target within tolerance.
False if invalid input, timeout, or odometry not running.
Edge cases:
- degrees == 0 → no-op
- |degrees| > 360 → clamped to 360 with warning
- speed <= 0 → default
- speed > 0.5 → clamped
- Handles 0°/360° wrap-around correctly
- Keyboard interrupt → graceful stop
"""
if degrees == 0:
print(" [Odom] ⚠️ Turn 0° — nothing to do")
return True
if abs(degrees) > 360:
print(f" [Odom] ⚠️ Turn {degrees}° > 360 — clamping to {360 * (1 if degrees > 0 else -1)}°")
degrees = 360.0 * (1 if degrees > 0 else -1)
if speed is None:
speed = DEFAULT_TURN_SPEED
if speed <= 0:
speed = DEFAULT_TURN_SPEED
elif speed > 0.5:
print(f" [Odom] ⚠️ Turn speed clamped to 0.4 rad/s")
speed = 0.4
if not self._running:
print(" [Odom] ⚠️ Odometry not started — using time-based fallback")
return self._time_based_turn(degrees, speed)
self._check_stale()
vyaw = speed if degrees > 0 else -speed
target_abs = abs(degrees)
with self._lock:
start_heading = self._heading
timeout = (target_abs / math.degrees(speed)) * SAFETY_TIMEOUT_MULT
t0 = time.time()
direction = "left" if degrees > 0 else "right"
print(f" [Odom] Turning {degrees:.1f}° {direction} at {math.degrees(speed):.1f}°/s "
f"(timeout: {timeout:.1f}s)")
try:
while time.time() - t0 < timeout:
with self._lock:
current = self._heading
# Compute angular delta — handle wrap-around
delta = abs(current - start_heading)
if delta > 180:
delta = 360 - delta
if delta >= target_abs - ANGLE_TOLERANCE:
self._gradual_stop()
print(f" [Odom] ✅ Turned {delta:.1f}° (target {target_abs:.1f}°)")
return True
self._send_vel(vyaw=vyaw)
time.sleep(0.05)
except KeyboardInterrupt:
print(" [Odom] Turn interrupted by user")
self._gradual_stop()
raise
self._gradual_stop()
print(f" [Odom] ⚠️ Timeout on turn")
return False
def _time_based_turn(self, degrees: float, speed: float) -> bool:
"""Time-based turn fallback. ~±5° accuracy."""
target_abs = abs(degrees)
vyaw = speed if degrees > 0 else -speed
duration = target_abs / math.degrees(speed)
print(f" [Odom] Time-based fallback: {degrees:.1f}° ≈ {duration:.1f}s")
t0 = time.time()
try:
while time.time() - t0 < duration:
self._send_vel(vyaw=vyaw)
time.sleep(0.05)
except KeyboardInterrupt:
self._gradual_stop()
raise
self._gradual_stop()
print(f" [Odom] ✅ Time-based turn complete (~{degrees:.1f}°)")
return True
def navigate_to(self, target_x: float, target_y: float,
target_heading: float = None,
speed: float = None) -> bool:
"""
Navigate to absolute coordinates.
Steps:
1. Rotate to face target direction
2. Walk straight-line distance to target
3. Optionally rotate to target_heading
Args:
target_x, target_y : destination in meters (same frame as odometry)
target_heading : final heading in degrees (None = don't adjust)
speed : walk speed in m/s
Edge cases:
- Already at target → skip walk, only rotate if needed
- Odometry not running → time-based fallback (imprecise)
- target_heading None → skip final rotation
"""
with self._lock:
curr_x = self._x
curr_y = self._y
curr_h = self._heading
dx = target_x - curr_x
dy = target_y - curr_y
dist = math.sqrt(dx**2 + dy**2)
print(f" [Odom] Navigate to ({target_x:.2f}, {target_y:.2f}) — "
f"dist={dist:.2f}m")
if dist < DIST_TOLERANCE:
print(" [Odom] Already at target position")
else:
# Rotate to face target
angle_to_target = math.degrees(math.atan2(dy, dx))
rotation_needed = angle_to_target - curr_h
# Normalize to -180..180
while rotation_needed > 180: rotation_needed -= 360
while rotation_needed < -180: rotation_needed += 360
if abs(rotation_needed) > ANGLE_TOLERANCE:
self.turn_degrees(rotation_needed, speed=DEFAULT_TURN_SPEED)
time.sleep(0.3)
self.walk_distance(dist, speed=speed)
time.sleep(0.3)
# Rotate to final heading if specified
if target_heading is not None:
with self._lock:
curr_h = self._heading
final_rot = target_heading - curr_h
while final_rot > 180: final_rot -= 360
while final_rot < -180: final_rot += 360
if abs(final_rot) > ANGLE_TOLERANCE:
self.turn_degrees(final_rot)
print(f" [Odom] ✅ Arrived at ({target_x:.2f}, {target_y:.2f})")
return True
def return_to_start(self, speed: float = None) -> bool:
"""
Navigate back to the position where start() was called.
Edge cases:
- Already at start → no movement
- Odometry not running → cannot navigate (warn)
"""
if not self._running:
print(" [Odom] ⚠️ Cannot return to start — odometry not running")
return False
print(" [Odom] Returning to start position...")
return self.navigate_to(
self._start_x, self._start_y,
self._start_heading, speed=speed
)
def patrol_route(self, waypoints: list,
speed: float = None,
loop: bool = False) -> bool:
"""
Walk through a list of waypoints in order.
Args:
waypoints : list of dicts with keys x, y, heading (all optional)
e.g. [{"x": 1.0, "y": 0.0, "heading": 90}]
speed : travel speed in m/s
loop : if True, return to first waypoint after last
Returns:
True if all waypoints reached.
Edge cases:
- Empty waypoints → warns
- Waypoint missing x/y → skipped with warning
- Keyboard interrupt → stops gracefully at current position
- loop=True appends waypoint[0] to end of route
"""
if not waypoints:
print(" [Odom] ⚠️ patrol_route: empty waypoints list")
return False
route = list(waypoints)
if loop:
route.append(waypoints[0])
print(f" [Odom] Patrol route: {len(route)} waypoints | loop={loop}")
try:
for i, wp in enumerate(route, 1):
if "x" not in wp or "y" not in wp:
print(f" [Odom] ⚠️ Waypoint {i} missing x/y — skipping")
continue
print(f" [Odom] Waypoint {i}/{len(route)}: ({wp['x']:.2f}, {wp['y']:.2f})")
heading = wp.get("heading", None)
self.navigate_to(wp["x"], wp["y"], heading, speed=speed)
time.sleep(0.5)
except KeyboardInterrupt:
print("\n [Odom] Patrol route interrupted by user")
self._gradual_stop()
return False
print(" [Odom] ✅ Patrol route complete")
return True
def __repr__(self):
pos = self.get_position()
return (f"Odometry(x={pos['x']:.2f}, y={pos['y']:.2f}, "
f"h={pos['heading']:.1f}°, source={pos['source']}, "
f"running={self._running})")
# ══════════════════════════════════════════════════════════════════════════════
# STANDALONE TEST
# ══════════════════════════════════════════════════════════════════════════════
if __name__ == "__main__":
import sys
print("Marcus Odometry — Standalone Test")
print("===================================\n")
odom = Odometry()
ok = odom.start()
time.sleep(1.5)
print(f"\nInitial state: {odom}")
print()
print("Commands:")
print(" walk <m> — walk precisely N meters")
print(" back <m> — walk backward N meters")
print(" turn <deg> — turn precisely N degrees (+left, -right)")
print(" status — current position")
print(" reset — zero position")
print(" goto <x> <y> — navigate to coordinates")
print(" home — return to start")
print(" square — walk 1m×1m square")
print(" q — quit")
print()
while True:
try:
raw = input("Odom: ").strip().lower()
except (EOFError, KeyboardInterrupt):
break
if not raw:
continue
parts = raw.split()
cmd = parts[0]
if cmd == "q":
break
elif cmd == "status":
print(f" {odom.status_str()}")
print(f" Distance from start: {odom.get_distance_from_start():.3f}m")
elif cmd == "reset":
odom.reset()
elif cmd == "walk" and len(parts) >= 2:
try:
odom.walk_distance(float(parts[1]))
except ValueError:
print(" Usage: walk <meters>")
elif cmd == "back" and len(parts) >= 2:
try:
odom.walk_distance(float(parts[1]), direction="backward")
except ValueError:
print(" Usage: back <meters>")
elif cmd == "turn" and len(parts) >= 2:
try:
odom.turn_degrees(float(parts[1]))
except ValueError:
print(" Usage: turn <degrees> (+left, -right)")
elif cmd == "goto" and len(parts) >= 3:
try:
odom.navigate_to(float(parts[1]), float(parts[2]))
except ValueError:
print(" Usage: goto <x> <y>")
elif cmd == "home":
odom.return_to_start()
elif cmd == "square":
print(" Walking 1m × 1m square...")
odom.walk_distance(1.0)
odom.turn_degrees(90)
odom.walk_distance(1.0)
odom.turn_degrees(90)
odom.walk_distance(1.0)
odom.turn_degrees(90)
odom.walk_distance(1.0)
odom.turn_degrees(90)
print(" ✅ Square complete")
else:
print(f" Unknown: {raw}")
odom.stop()
print("Odometry stopped.")