568 lines
25 KiB
Python
568 lines
25 KiB
Python
"""LocoController — manual G1 locomotion via the Unitree LocoClient (N2 Phase 1).
|
|
|
|
Ported from the proven scripts in G1_Lootah/Controller (g1_mode_controller.py,
|
|
keyboard_controller.py, hanger_boot_sequence.py). Design notes:
|
|
|
|
* **One DDS init per process.** The arm controller owns the single
|
|
`ChannelFactoryInitialize(0, nic)` (motion/arm_controller.py). This class
|
|
NEVER initialises DDS — it lazily builds its `LocoClient` /
|
|
`MotionSwitcherClient` only after `arm._initialized` is True.
|
|
* **Default DISARMED.** `_armed` starts False every boot and gates every WRITE
|
|
method. Reads (status / fsm / joints), E-STOP and disarm are ALWAYS allowed.
|
|
* **StopMove watchdog.** Continuous `Move(..., True)` never self-terminates, so a
|
|
daemon thread StopMoves if no `move()` refresh arrives within
|
|
`watchdog_timeout_sec`. The frontend re-sends setpoints at ~10 Hz, so a tab
|
|
close / network drop trips the watchdog within the timeout.
|
|
* **Velocity caps.** Symmetric clamp on vx/vy/vyaw — Walk 0.6, Run 1.2.
|
|
* **Allow-anytime-warn.** move/step never hard-block on FSM; if not walk-ready
|
|
they still execute but return a `warning`.
|
|
* **Sim fallback.** When `unitree_sdk2py` is absent (workstation), every write
|
|
returns `{"simulated": True}` (never raises) so the whole UI is testable.
|
|
|
|
SDK facts confirmed from source — do not "fix" them:
|
|
* `LocoClient.Move(vx, vy, vyaw, True)` — the continuous-mode kwarg is misspelled
|
|
`continous_move` (one n); we pass it POSITIONALLY to avoid a TypeError.
|
|
* `LocoClient` has NO StandUp()/Squat() — use SetFsmId(4)/SetFsmId(2).
|
|
* FSM id / mode are read via the private RPC `bot._Call(7001/7002, "{}")`.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import threading
|
|
import time
|
|
from typing import Any, Optional
|
|
|
|
from Project.Sanad.core.config_loader import section as _cfg_section
|
|
from Project.Sanad.core.logger import get_logger
|
|
|
|
log = get_logger("loco_controller")
|
|
|
|
# -- SDK import (optional) -----------------------------------------------------
|
|
try:
|
|
from unitree_sdk2py.g1.loco.g1_loco_client import LocoClient
|
|
from unitree_sdk2py.comm.motion_switcher.motion_switcher_client import (
|
|
MotionSwitcherClient,
|
|
)
|
|
_HAS_SDK = True
|
|
except ImportError:
|
|
LocoClient = None
|
|
MotionSwitcherClient = None
|
|
_HAS_SDK = False
|
|
log.warning("Unitree SDK not available — LocoController in simulation mode")
|
|
|
|
# LocoClient general RPC api-ids for FSM read-back (stable across SDK builds).
|
|
ROBOT_API_ID_LOCO_GET_FSM_ID = 7001
|
|
ROBOT_API_ID_LOCO_GET_FSM_MODE = 7002
|
|
|
|
# G1 29-DoF joint names for indices 12-28 (0-11 legs, 12-14 waist, 15-21 left
|
|
# arm, 22-28 right arm). Used by the Diagnostics joint read-out.
|
|
JOINT_NAMES = {
|
|
12: "WAIST_YAW", 13: "WAIST_ROLL", 14: "WAIST_PITCH",
|
|
15: "L_SHOULDER_PITCH", 16: "L_SHOULDER_ROLL", 17: "L_SHOULDER_YAW",
|
|
18: "L_ELBOW", 19: "L_WRIST_ROLL", 20: "L_WRIST_PITCH", 21: "L_WRIST_YAW",
|
|
22: "R_SHOULDER_PITCH", 23: "R_SHOULDER_ROLL", 24: "R_SHOULDER_YAW",
|
|
25: "R_ELBOW", 26: "R_WRIST_ROLL", 27: "R_WRIST_PITCH", 28: "R_WRIST_YAW",
|
|
}
|
|
|
|
# Discrete step pad — (vx, vy, vyaw) sign per direction; magnitude is
|
|
# step_speed_frac * cap_walk (a gentle single step).
|
|
_STEP_DIRS = {
|
|
"forward": (1.0, 0.0, 0.0),
|
|
"backward": (-1.0, 0.0, 0.0),
|
|
"slide_left": (0.0, 1.0, 0.0),
|
|
"slide_right": (0.0, -1.0, 0.0),
|
|
"rotate_left": (0.0, 0.0, 1.0),
|
|
"rotate_right": (0.0, 0.0, -1.0),
|
|
}
|
|
|
|
_POSTURES = (
|
|
"zero_torque", "damp", "stand_up", "squat", "sit",
|
|
"low_stand", "high_stand", "lie_to_stand",
|
|
)
|
|
|
|
|
|
class LocoController:
|
|
"""Thread-safe manual locomotion control with a simulation fallback."""
|
|
|
|
def __init__(self, arm=None):
|
|
self._arm = arm # shared ArmController (owns the ONE DDS init)
|
|
self._bot = None # LocoClient (lazy)
|
|
self._msc = None # MotionSwitcherClient (lazy)
|
|
self._lc_ready = False
|
|
self._lock = threading.RLock() # serialise all loco client WRITE calls
|
|
self._armed = False # in-memory MANUAL gate — OFF every boot
|
|
|
|
self._cur_v = (0.0, 0.0, 0.0) # last commanded (vx, vy, vyaw)
|
|
self._teleop_active = False
|
|
self._last_msc_mode: Optional[str] = None
|
|
|
|
# watchdog
|
|
self._last_move_ts = 0.0
|
|
self._wd_thread: Optional[threading.Thread] = None
|
|
self._wd_stop = threading.Event()
|
|
self._wd_stop.set() # not running until armed
|
|
# Monotonic stop-generation counter, bumped under _lock by
|
|
# estop/stop/disarm. move()/step()/prep_mode() capture it at start and
|
|
# bail the instant it changes — so E-STOP preempts an in-flight motion
|
|
# immediately AND can never be silently "un-cancelled" by a concurrent
|
|
# command (a lock-free Event clear() could; an int compare under the
|
|
# lock cannot).
|
|
self._stop_gen = 0
|
|
# Serializes the discrete blocking operations (step/prep_mode) so two
|
|
# can't overlap and interleave Move commands. Continuous teleop move()
|
|
# is intentionally NOT guarded by this.
|
|
self._discrete_busy = False
|
|
|
|
cfg = _cfg_section("motion", "loco_controller")
|
|
self._cap_walk = float(cfg.get("cap_walk", 0.6))
|
|
self._cap_run = float(cfg.get("cap_run", 1.2))
|
|
self._lin_step = float(cfg.get("lin_step", 0.05))
|
|
self._ang_step = float(cfg.get("ang_step", 0.2))
|
|
self._wd_timeout = float(cfg.get("watchdog_timeout_sec", 0.5))
|
|
self._block_window = float(cfg.get("arm_block_window_sec", 1.5))
|
|
self._step_dur = float(cfg.get("step_duration_sec", 0.6))
|
|
self._step_frac = float(cfg.get("step_speed_frac", 0.5))
|
|
self._loco_timeout = float(cfg.get("loco_timeout_sec", 10.0))
|
|
self._msc_timeout = float(cfg.get("msc_timeout_sec", 5.0))
|
|
|
|
# ── client lifecycle ─────────────────────────────────────────────────────
|
|
|
|
def _ensure_client(self) -> bool:
|
|
"""Lazily build LocoClient + MotionSwitcherClient. Returns readiness.
|
|
|
|
Never initialises DDS — requires the shared arm to have already run the
|
|
single ChannelFactoryInitialize.
|
|
"""
|
|
if not _HAS_SDK:
|
|
return False
|
|
if self._lc_ready:
|
|
return True
|
|
if self._arm is None or not getattr(self._arm, "_initialized", False):
|
|
return False
|
|
with self._lock:
|
|
if self._lc_ready:
|
|
return True
|
|
try:
|
|
bot = LocoClient()
|
|
bot.SetTimeout(self._loco_timeout)
|
|
bot.Init()
|
|
msc = MotionSwitcherClient()
|
|
msc.SetTimeout(self._msc_timeout)
|
|
msc.Init()
|
|
self._bot = bot
|
|
self._msc = msc
|
|
self._lc_ready = True
|
|
log.info("LocoClient + MotionSwitcherClient ready")
|
|
except Exception as exc:
|
|
log.error("LocoClient init failed: %s", exc)
|
|
self._lc_ready = False
|
|
return self._lc_ready
|
|
|
|
def _safe_call(self, name: str, fn, *a, **kw):
|
|
try:
|
|
return True, fn(*a, **kw)
|
|
except Exception as exc:
|
|
log.error("%s failed: %s", name, exc)
|
|
return False, None
|
|
|
|
def _rpc_get_int(self, api_id: int):
|
|
bot = self._bot
|
|
if bot is None:
|
|
return None
|
|
try:
|
|
code, data = bot._Call(api_id, "{}")
|
|
if code == 0 and data:
|
|
return json.loads(data).get("data")
|
|
except Exception:
|
|
pass
|
|
return None
|
|
|
|
@staticmethod
|
|
def _clamp(v: float, cap: float) -> float:
|
|
return max(-cap, min(cap, float(v)))
|
|
|
|
# ── FSM / readiness ──────────────────────────────────────────────────────
|
|
|
|
def fsm_id(self):
|
|
return self._rpc_get_int(ROBOT_API_ID_LOCO_GET_FSM_ID)
|
|
|
|
def fsm_mode(self):
|
|
return self._rpc_get_int(ROBOT_API_ID_LOCO_GET_FSM_MODE)
|
|
|
|
def _walk_ready_warning(self) -> Optional[str]:
|
|
"""allow-anytime-warn: None when ready, else a human message."""
|
|
if not self._lc_ready:
|
|
return None
|
|
fid = self.fsm_id()
|
|
fmode = self.fsm_mode()
|
|
if fid == 200 and fmode not in (None, 2):
|
|
return None
|
|
return (f"Robot not in walk-ready FSM (id={fid}, mode={fmode}). "
|
|
f"Command sent anyway.")
|
|
|
|
# ── arm flag + watchdog ──────────────────────────────────────────────────
|
|
|
|
def is_armed(self) -> bool:
|
|
return self._armed
|
|
|
|
def movement_active(self) -> bool:
|
|
"""True when the robot may be walking: manual armed, teleop active, OR a
|
|
move/step issued within the block window. Used as the arm's motion-block
|
|
predicate so the arm never replays while the robot is (or just was)
|
|
moving — regardless of whether the MANUAL gate or the GEMINI gate
|
|
(Phase 3 voice dispatch, which calls move/step directly) triggered it."""
|
|
if self._armed or self._teleop_active:
|
|
return True
|
|
return (time.monotonic() - self._last_move_ts) < self._block_window
|
|
|
|
def arm_movement(self) -> dict:
|
|
"""Unlock manual control. Cancels any in-flight arm motion first so the
|
|
arm and locomotion are never active simultaneously (movement wins)."""
|
|
try:
|
|
if self._arm is not None and getattr(self._arm, "is_busy", False):
|
|
log.info("arming movement — cancelling in-flight arm motion")
|
|
self._arm.cancel()
|
|
except Exception:
|
|
log.exception("arm.cancel() on arm_movement failed")
|
|
with self._lock:
|
|
self._armed = True
|
|
self._start_watchdog()
|
|
log.info("movement ARMED")
|
|
return {"ok": True, "armed": True}
|
|
|
|
def disarm_movement(self) -> dict:
|
|
with self._lock:
|
|
self._stop_gen += 1 # break any in-flight step/prep/move
|
|
self._armed = False
|
|
self._teleop_active = False
|
|
self._wd_stop.set()
|
|
try:
|
|
self._raw_stop()
|
|
except Exception:
|
|
log.exception("StopMove on disarm failed")
|
|
log.info("movement DISARMED")
|
|
return {"ok": True, "armed": False}
|
|
|
|
def _start_watchdog(self):
|
|
self._wd_stop.clear()
|
|
if self._wd_thread is None or not self._wd_thread.is_alive():
|
|
self._wd_thread = threading.Thread(
|
|
target=self._watchdog_loop, daemon=True, name="loco-watchdog")
|
|
self._wd_thread.start()
|
|
|
|
def _watchdog_loop(self):
|
|
period = max(0.02, min(0.1, self._wd_timeout / 2.0))
|
|
while not self._wd_stop.is_set():
|
|
fire = False
|
|
# Read-and-decide under the lock (atomic check-then-act); the actual
|
|
# StopMove runs after release so the critical section stays tiny.
|
|
with self._lock:
|
|
if self._teleop_active and (time.monotonic() - self._last_move_ts) > self._wd_timeout:
|
|
self._teleop_active = False
|
|
fire = True
|
|
if fire:
|
|
log.warning("watchdog: teleop setpoint stale (>%.2fs) — StopMove",
|
|
self._wd_timeout)
|
|
try:
|
|
self._raw_stop()
|
|
except Exception:
|
|
log.exception("watchdog StopMove failed")
|
|
self._wd_stop.wait(period)
|
|
|
|
def _raw_stop(self) -> bool:
|
|
"""Issue StopMove if the client is up; no-op in sim. Lock-light."""
|
|
if not self._lc_ready or self._bot is None:
|
|
return False
|
|
with self._lock:
|
|
ok, _ = self._safe_call("StopMove", self._bot.StopMove)
|
|
return ok
|
|
|
|
# ── movement ─────────────────────────────────────────────────────────────
|
|
|
|
def move(self, vx: float, vy: float, vyaw: float, run: bool = False) -> dict:
|
|
cap = self._cap_run if run else self._cap_walk
|
|
cvx, cvy, cvyaw = self._clamp(vx, cap), self._clamp(vy, cap), self._clamp(vyaw, cap)
|
|
capped = (cvx, cvy, cvyaw) != (float(vx), float(vy), float(vyaw))
|
|
warning = self._walk_ready_warning()
|
|
sent = {"vx": cvx, "vy": cvy, "vyaw": cvyaw}
|
|
|
|
with self._lock:
|
|
my_gen = self._stop_gen # capture under lock
|
|
|
|
if not self._ensure_client():
|
|
with self._lock: # sim: record intent for UI/watchdog
|
|
self._cur_v = (cvx, cvy, cvyaw)
|
|
self._last_move_ts = time.monotonic()
|
|
self._teleop_active = True
|
|
self._start_watchdog()
|
|
return {"ok": True, "sent": sent, "capped": capped,
|
|
"warning": warning, "simulated": True}
|
|
with self._lock:
|
|
# If an E-STOP / stop / disarm landed since we captured my_gen, do NOT
|
|
# (re)command velocity — and do NOT stamp the motion flags (so a
|
|
# cancelled tick doesn't extend the arm-block window).
|
|
if self._stop_gen != my_gen:
|
|
return {"ok": False, "cancelled": True, "sent": sent,
|
|
"capped": capped, "warning": warning, "simulated": False}
|
|
self._cur_v = (cvx, cvy, cvyaw)
|
|
self._last_move_ts = time.monotonic()
|
|
self._teleop_active = True
|
|
self._safe_call("SetBalanceMode", self._bot.SetBalanceMode, 1)
|
|
ok, _ = self._safe_call("Move", self._bot.Move, cvx, cvy, cvyaw, True)
|
|
self._start_watchdog()
|
|
return {"ok": bool(ok), "sent": sent, "capped": capped,
|
|
"warning": warning, "simulated": False}
|
|
|
|
def stop_move(self) -> dict:
|
|
"""Halt translation/rotation. Allowed even when disarmed."""
|
|
with self._lock:
|
|
self._stop_gen += 1
|
|
self._teleop_active = False
|
|
if not self._ensure_client():
|
|
return {"ok": True, "simulated": True}
|
|
ok = self._raw_stop()
|
|
return {"ok": bool(ok), "simulated": False}
|
|
|
|
def estop(self) -> dict:
|
|
"""Emergency stop = StopMove only (no Damp / FSM change → keeps posture).
|
|
ALWAYS allowed, even disarmed and in sim. Bumps the stop generation so any
|
|
in-flight move()/step()/prep_mode() bails immediately (no lock wait)."""
|
|
with self._lock:
|
|
self._stop_gen += 1
|
|
self._teleop_active = False
|
|
self._cur_v = (0.0, 0.0, 0.0)
|
|
if not self._ensure_client():
|
|
log.warning("E-STOP (sim)")
|
|
return {"ok": True, "simulated": True}
|
|
ok = self._raw_stop()
|
|
log.warning("E-STOP — StopMove issued")
|
|
return {"ok": bool(ok), "simulated": False}
|
|
|
|
def step(self, direction: str) -> dict:
|
|
"""Discrete one-step pad: Move for step_duration then StopMove.
|
|
Blocking (~step_duration); call via asyncio.to_thread from the route.
|
|
|
|
The sleep loop does NOT hold self._lock, so E-STOP / StopMove (which take
|
|
the lock briefly) preempt it immediately; the loop also bails the moment
|
|
the stop generation changes."""
|
|
if direction not in _STEP_DIRS:
|
|
return {"ok": False, "reason": f"unknown direction: {direction}"}
|
|
sx, sy, syaw = _STEP_DIRS[direction]
|
|
k = self._cap_walk * self._step_frac
|
|
vx, vy, vyaw = sx * k, sy * k, syaw * k
|
|
warning = self._walk_ready_warning()
|
|
with self._lock:
|
|
if self._discrete_busy:
|
|
return {"ok": False, "dir": direction, "reason": "busy",
|
|
"warning": warning, "simulated": not self._lc_ready}
|
|
self._discrete_busy = True
|
|
my_gen = self._stop_gen
|
|
self._last_move_ts = time.monotonic()
|
|
self._teleop_active = True
|
|
self._start_watchdog()
|
|
if not self._ensure_client():
|
|
with self._lock:
|
|
self._teleop_active = False
|
|
self._discrete_busy = False
|
|
return {"ok": True, "dir": direction, "warning": warning, "simulated": True}
|
|
try:
|
|
with self._lock:
|
|
if self._stop_gen != my_gen: # stopped before we began
|
|
return {"ok": False, "dir": direction, "cancelled": True,
|
|
"warning": warning, "simulated": False}
|
|
self._safe_call("SetBalanceMode", self._bot.SetBalanceMode, 1)
|
|
self._safe_call("Move", self._bot.Move, vx, vy, vyaw, True)
|
|
t_end = time.monotonic() + self._step_dur
|
|
while time.monotonic() < t_end:
|
|
if self._stop_gen != my_gen:
|
|
break
|
|
with self._lock:
|
|
self._last_move_ts = time.monotonic() # keep watchdog fed
|
|
time.sleep(0.05)
|
|
finally:
|
|
with self._lock:
|
|
self._safe_call("StopMove", self._bot.StopMove)
|
|
self._teleop_active = False
|
|
self._discrete_busy = False
|
|
return {"ok": True, "dir": direction, "warning": warning, "simulated": False}
|
|
|
|
# ── postures / modes ─────────────────────────────────────────────────────
|
|
|
|
def prep_mode(self) -> dict:
|
|
"""PREP — StopMove → Damp → StandUp(FSM4) → height ramp → BalanceStand(0).
|
|
Exact order from g1_mode_controller.prep_mode, minus the blocking input().
|
|
Blocking (~1s); call via asyncio.to_thread."""
|
|
if not self._ensure_client():
|
|
return {"ok": True, "mode": "prep", "simulated": True}
|
|
with self._lock:
|
|
if self._discrete_busy:
|
|
return {"ok": False, "mode": "prep", "reason": "busy", "simulated": False}
|
|
self._discrete_busy = True
|
|
my_gen = self._stop_gen
|
|
self._safe_call("StopMove", self._bot.StopMove)
|
|
self._safe_call("Damp", self._bot.Damp)
|
|
self._safe_call("SetFsmId(4)", self._bot.SetFsmId, 4)
|
|
try:
|
|
# Height ramp OUTSIDE the lock so E-STOP can preempt at any time.
|
|
h = 0.02
|
|
while h <= 0.5 + 1e-9:
|
|
if self._stop_gen != my_gen:
|
|
log.warning("PREP cancelled (E-STOP)")
|
|
return {"ok": False, "mode": "prep", "cancelled": True, "simulated": False}
|
|
with self._lock:
|
|
self._safe_call("SetStandHeight", self._bot.SetStandHeight, round(h, 3))
|
|
time.sleep(0.03)
|
|
h += 0.02
|
|
with self._lock:
|
|
self._safe_call("BalanceStand", self._bot.BalanceStand, 0)
|
|
self._safe_call("SetStandHeight", self._bot.SetStandHeight, 0.22)
|
|
finally:
|
|
with self._lock:
|
|
self._discrete_busy = False
|
|
log.info("PREP complete")
|
|
return {"ok": True, "mode": "prep", "simulated": False}
|
|
|
|
def ready_start_mode(self) -> dict:
|
|
"""READY = PREP then Start (FSM 200 / balance engaged)."""
|
|
self.prep_mode()
|
|
if not self._ensure_client():
|
|
return {"ok": True, "mode": "ready", "simulated": True}
|
|
with self._lock:
|
|
if hasattr(self._bot, "Start"):
|
|
ok, _ = self._safe_call("Start", self._bot.Start)
|
|
else:
|
|
ok, _ = self._safe_call("SetFsmId(200)", self._bot.SetFsmId, 200)
|
|
log.info("READY/START complete")
|
|
return {"ok": bool(ok), "mode": "ready", "simulated": False}
|
|
|
|
def posture(self, name: str) -> dict:
|
|
if name not in _POSTURES:
|
|
return {"ok": False, "reason": f"unknown posture: {name}"}
|
|
if not self._ensure_client():
|
|
return {"ok": True, "posture": name, "simulated": True}
|
|
bot = self._bot
|
|
with self._lock:
|
|
if name == "zero_torque":
|
|
ok, _ = self._safe_call("ZeroTorque", bot.ZeroTorque)
|
|
elif name == "damp":
|
|
ok, _ = self._safe_call("Damp", bot.Damp)
|
|
elif name == "stand_up":
|
|
ok, _ = self._safe_call("SetFsmId(4)", bot.SetFsmId, 4)
|
|
elif name == "squat":
|
|
ok, _ = self._safe_call("SetFsmId(2)", bot.SetFsmId, 2)
|
|
elif name == "sit":
|
|
ok, _ = self._safe_call("Sit", bot.Sit)
|
|
elif name == "low_stand":
|
|
ok, _ = self._safe_call("LowStand", bot.LowStand)
|
|
elif name == "high_stand":
|
|
ok, _ = self._safe_call("HighStand", bot.HighStand)
|
|
elif name == "lie_to_stand":
|
|
if hasattr(bot, "Lie2StandUp"):
|
|
ok, _ = self._safe_call("Lie2StandUp", bot.Lie2StandUp)
|
|
else:
|
|
ok, _ = self._safe_call("SetFsmId(702)", bot.SetFsmId, 702)
|
|
else: # unreachable (guarded above)
|
|
ok = False
|
|
return {"ok": bool(ok), "posture": name, "simulated": False}
|
|
|
|
def set_balance_mode(self, mode: int) -> dict:
|
|
if not self._ensure_client():
|
|
return {"ok": True, "balance_mode": int(mode), "simulated": True}
|
|
with self._lock:
|
|
ok, _ = self._safe_call("SetBalanceMode", self._bot.SetBalanceMode, int(mode))
|
|
return {"ok": bool(ok), "balance_mode": int(mode), "simulated": False}
|
|
|
|
def set_stand_height(self, h: float) -> dict:
|
|
if not self._ensure_client():
|
|
return {"ok": True, "height": float(h), "simulated": True}
|
|
with self._lock:
|
|
ok, _ = self._safe_call("SetStandHeight", self._bot.SetStandHeight, float(h))
|
|
return {"ok": bool(ok), "height": float(h), "simulated": False}
|
|
|
|
# ── MotionSwitcher ───────────────────────────────────────────────────────
|
|
|
|
def msc_check(self) -> dict:
|
|
if not self._ensure_client() or self._msc is None:
|
|
return {"mode_name": None, "simulated": not self._lc_ready}
|
|
try:
|
|
ret = self._msc.CheckMode()
|
|
name = None
|
|
if isinstance(ret, tuple) and len(ret) >= 2 and isinstance(ret[1], dict):
|
|
name = ret[1].get("name")
|
|
elif isinstance(ret, dict):
|
|
name = ret.get("name")
|
|
self._last_msc_mode = name
|
|
return {"mode_name": name}
|
|
except Exception as exc:
|
|
log.error("msc_check failed: %s", exc)
|
|
return {"mode_name": None}
|
|
|
|
def msc_select_ai(self) -> dict:
|
|
if not self._ensure_client() or self._msc is None:
|
|
return {"ok": True, "simulated": True}
|
|
with self._lock:
|
|
ok, _ = self._safe_call("SelectMode(ai)", self._msc.SelectMode, "ai")
|
|
return {"ok": bool(ok), "simulated": False}
|
|
|
|
def msc_release(self) -> dict:
|
|
if not self._ensure_client() or self._msc is None:
|
|
return {"ok": True, "simulated": True}
|
|
with self._lock:
|
|
ok, _ = self._safe_call("ReleaseMode", self._msc.ReleaseMode)
|
|
return {"ok": bool(ok), "simulated": False}
|
|
|
|
def reconnect(self) -> dict:
|
|
"""Drop and rebuild Loco + MSC clients (does NOT re-init the DDS factory)."""
|
|
with self._lock:
|
|
self._bot = None
|
|
self._msc = None
|
|
self._lc_ready = False
|
|
ok = self._ensure_client()
|
|
return {"ok": bool(ok), "lc_ready": self._lc_ready}
|
|
|
|
# ── reads ────────────────────────────────────────────────────────────────
|
|
|
|
def joints(self) -> dict:
|
|
q: list = []
|
|
try:
|
|
if self._arm is not None:
|
|
q = self._arm.get_current_q()
|
|
except Exception:
|
|
q = []
|
|
out = []
|
|
for idx in range(12, 29):
|
|
val = q[idx] if idx < len(q) else 0.0
|
|
out.append({"idx": idx, "name": JOINT_NAMES.get(idx, f"motor_{idx}"),
|
|
"q": float(val)})
|
|
return {"joints": out}
|
|
|
|
def status(self) -> dict:
|
|
# Polling /status lazily brings up the client once arm DDS is ready.
|
|
self._ensure_client()
|
|
fid = self.fsm_id() if self._lc_ready else None
|
|
fmode = self.fsm_mode() if self._lc_ready else None
|
|
walk_ready = bool(self._lc_ready and fid == 200 and fmode not in (None, 2))
|
|
return {
|
|
"sdk_available": _HAS_SDK,
|
|
"lc_ready": self._lc_ready,
|
|
"armed": self._armed,
|
|
"fsm_id": fid,
|
|
"fsm_mode": fmode,
|
|
"walk_ready": walk_ready,
|
|
"msc_mode": self._last_msc_mode,
|
|
"teleop_active": self._teleop_active,
|
|
"last_velocity": {"vx": self._cur_v[0], "vy": self._cur_v[1], "vyaw": self._cur_v[2]},
|
|
"caps": {"walk": self._cap_walk, "run": self._cap_run},
|
|
"arm_initialized": bool(self._arm is not None and getattr(self._arm, "_initialized", False)),
|
|
}
|
|
|
|
# ── shutdown helper ──────────────────────────────────────────────────────
|
|
|
|
def shutdown(self):
|
|
"""Best-effort StopMove + disarm for process shutdown."""
|
|
try:
|
|
self.estop()
|
|
finally:
|
|
self.disarm_movement()
|