Saqr/robot/arm_replay.py

196 lines
7.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Replay a recorded arm motion on the G1 via low-level joint control.
Refactored from ``G1_Lootah/Manual_Recorder/g1_replay_v4_stable.py``:
* No interactive prompts / keyboard poller — fully blocking auto play.
* Assumes ``ChannelFactoryInitialize`` was already called by the bridge.
* Shares the DDS session; owns its own publisher + subscriber handles.
Motion files are standard JSONL where each line is either metadata
(``{"meta": {...}}``) or a frame (``{"t": <seconds>, "q": [29 floats]}``).
Arms are joints 1528; body joints 014 are held locked at the standing
pose while the recorded trajectory plays on the arms.
"""
from __future__ import annotations
import datetime
import json
import time
from pathlib import Path
from typing import List, Optional
from unitree_sdk2py.core.channel import ChannelPublisher, ChannelSubscriber
from unitree_sdk2py.idl.default import unitree_hg_msg_dds__LowCmd_
from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowCmd_, LowState_
from unitree_sdk2py.utils.crc import CRC
def _ts() -> str:
return datetime.datetime.now().strftime("%H:%M:%S.%f")[:-3]
G1_NUM_MOTOR = 29
ENABLE_ARM_SDK_INDEX = 29
REPLAY_HZ = 60.0
# Gains copied from the reference script (ROBOT_ARM.PY standard).
KP_HIGH = 300.0
KD_HIGH = 3.0
KP_LOW = 80.0
KD_LOW = 3.0
KP_WRIST = 40.0
KD_WRIST = 1.5
WEAK_MOTORS = {4, 10, 15, 16, 17, 18, 22, 23, 24, 25}
WRIST_MOTORS = {19, 20, 21, 26, 27, 28}
# Move-to-start and return-to-home phase lengths (frames at REPLAY_HZ).
STEPS_TO_START = 60 # ~1 s smooth entry
STEPS_TO_HOME = 180 # ~3 s smooth exit
class ArmReplayer:
"""Plays recorded joint trajectories. Construct AFTER ChannelFactoryInitialize."""
def __init__(self):
self._arm_pub = ChannelPublisher("rt/arm_sdk", LowCmd_)
self._arm_pub.Init()
self._state_sub = ChannelSubscriber("rt/lowstate", LowState_)
self._state_sub.Init(self._on_state, 10)
self._low_state: Optional[LowState_] = None
self._first_state = False
self._low_cmd = unitree_hg_msg_dds__LowCmd_()
self._crc = CRC()
# ── DDS callbacks ───────────────────────────────────────────────────────
def _on_state(self, msg: LowState_) -> None:
self._low_state = msg
self._first_state = True
# ── internals ───────────────────────────────────────────────────────────
def _send_frame(self, arm_q: List[float], body_lock_q: List[float]) -> None:
self._low_cmd.motor_cmd[ENABLE_ARM_SDK_INDEX].q = 1.0
for i in range(G1_NUM_MOTOR):
self._low_cmd.motor_cmd[i].mode = 1
self._low_cmd.motor_cmd[i].dq = 0
self._low_cmd.motor_cmd[i].tau = 0
# Arms follow the replay; body holds its standing pose.
self._low_cmd.motor_cmd[i].q = arm_q[i] if i >= 15 else body_lock_q[i]
if i in WEAK_MOTORS:
self._low_cmd.motor_cmd[i].kp = KP_LOW
self._low_cmd.motor_cmd[i].kd = KD_LOW
elif i in WRIST_MOTORS:
self._low_cmd.motor_cmd[i].kp = KP_WRIST
self._low_cmd.motor_cmd[i].kd = KD_WRIST
else:
self._low_cmd.motor_cmd[i].kp = KP_HIGH
self._low_cmd.motor_cmd[i].kd = KD_HIGH
self._low_cmd.crc = self._crc.Crc(self._low_cmd)
self._arm_pub.Write(self._low_cmd)
def _disable_sdk(self) -> None:
"""Hand arm control back to the internal brain after replay."""
self._low_cmd.motor_cmd[ENABLE_ARM_SDK_INDEX].q = 0.0
self._low_cmd.crc = self._crc.Crc(self._low_cmd)
for _ in range(10):
self._arm_pub.Write(self._low_cmd)
time.sleep(0.02)
@staticmethod
def _load_home_q(path: Path) -> List[float]:
last_q = None
with open(path, "r") as f:
for line in f:
d = json.loads(line)
if "q" in d and len(d["q"]) == G1_NUM_MOTOR:
last_q = d["q"]
if last_q is None:
raise ValueError(f"No valid frame in {path}")
return last_q
@staticmethod
def _load_frames(path: Path) -> List[dict]:
frames = []
with open(path, "r") as f:
for line in f:
d = json.loads(line)
if "q" in d:
frames.append(d)
if not frames:
raise ValueError(f"No frames in {path}")
return frames
# ── public API ──────────────────────────────────────────────────────────
def play(self, motion_path: Path, home_path: Path,
speed: float = 1.0, wait_state_s: float = 3.0) -> bool:
"""Blocking replay. Returns True on success, False if anything fails.
- ``motion_path`` : JSONL of recorded frames
- ``home_path`` : JSONL whose last frame is the arm "home" pose
- ``speed`` : 1.0 = original speed; 2.0 = twice as fast
- ``wait_state_s``: max seconds to wait for the first LowState_ message
"""
# Wait for the first lowstate so we know the robot's current pose.
t0 = time.time()
while not self._first_state:
if time.time() - t0 > wait_state_s:
print(f"[arm_replay {_ts()}][WARN] no LowState within {wait_state_s}s"
f" — aborting replay", flush=True)
return False
time.sleep(0.05)
try:
home_q = self._load_home_q(home_path)
frames = self._load_frames(motion_path)
except Exception as e:
print(f"[arm_replay {_ts()}][WARN] load failed: {e}", flush=True)
return False
body_lock_q = [self._low_state.motor_state[i].q for i in range(G1_NUM_MOTOR)]
start_q = frames[0]["q"]
t_base = frames[0]["t"]
print(f"[arm_replay {_ts()}] playing {motion_path.name} "
f"({len(frames)} frames, {(frames[-1]['t']-t_base):.1f}s)", flush=True)
# 1. Smooth move arms to the trajectory's starting pose.
for k in range(STEPS_TO_START):
alpha = k / STEPS_TO_START
interp = list(body_lock_q)
for j in range(15, 29):
interp[j] = (1 - alpha) * body_lock_q[j] + alpha * start_q[j]
self._send_frame(interp, body_lock_q)
time.sleep(1.0 / REPLAY_HZ)
# 2. Play the recorded trajectory at `speed`.
last_q = start_q
t_play = 0.0
t_real = time.time()
while True:
t_now = time.time()
t_play += (t_now - t_real) * speed
t_real = t_now
target = None
for f in frames:
if f["t"] - t_base >= t_play:
target = f
break
if target is None:
break
self._send_frame(target["q"], body_lock_q)
last_q = target["q"]
time.sleep(1.0 / REPLAY_HZ)
# 3. Smooth return to home pose.
for k in range(STEPS_TO_HOME):
alpha = k / STEPS_TO_HOME
interp = list(last_q)
for j in range(15, 29):
interp[j] = (1 - alpha) * last_q[j] + alpha * home_q[j]
self._send_frame(interp, body_lock_q)
time.sleep(1.0 / REPLAY_HZ)
# 4. Hand arm control back to the internal brain.
self._disable_sdk()
print(f"[arm_replay {_ts()}] done", flush=True)
return True