"""Replay a recorded arm motion on the G1 via low-level joint control. Refactored from ``G1_Lootah/Manual_Recorder/g1_replay_v4_stable.py``: * No interactive prompts / keyboard poller — fully blocking auto play. * Assumes ``ChannelFactoryInitialize`` was already called by the bridge. * Shares the DDS session; owns its own publisher + subscriber handles. Motion files are standard JSONL where each line is either metadata (``{"meta": {...}}``) or a frame (``{"t": , "q": [29 floats]}``). Arms are joints 15–28; body joints 0–14 are held locked at the standing pose while the recorded trajectory plays on the arms. """ from __future__ import annotations import datetime import json import time from pathlib import Path from typing import List, Optional from unitree_sdk2py.core.channel import ChannelPublisher, ChannelSubscriber from unitree_sdk2py.idl.default import unitree_hg_msg_dds__LowCmd_ from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowCmd_, LowState_ from unitree_sdk2py.utils.crc import CRC def _ts() -> str: return datetime.datetime.now().strftime("%H:%M:%S.%f")[:-3] G1_NUM_MOTOR = 29 ENABLE_ARM_SDK_INDEX = 29 REPLAY_HZ = 60.0 # Gains copied from the reference script (ROBOT_ARM.PY standard). KP_HIGH = 300.0 KD_HIGH = 3.0 KP_LOW = 80.0 KD_LOW = 3.0 KP_WRIST = 40.0 KD_WRIST = 1.5 WEAK_MOTORS = {4, 10, 15, 16, 17, 18, 22, 23, 24, 25} WRIST_MOTORS = {19, 20, 21, 26, 27, 28} # Move-to-start and return-to-home phase lengths (frames at REPLAY_HZ). STEPS_TO_START = 60 # ~1 s smooth entry STEPS_TO_HOME = 180 # ~3 s smooth exit class ArmReplayer: """Plays recorded joint trajectories. Construct AFTER ChannelFactoryInitialize.""" def __init__(self): self._arm_pub = ChannelPublisher("rt/arm_sdk", LowCmd_) self._arm_pub.Init() self._state_sub = ChannelSubscriber("rt/lowstate", LowState_) self._state_sub.Init(self._on_state, 10) self._low_state: Optional[LowState_] = None self._first_state = False self._low_cmd = unitree_hg_msg_dds__LowCmd_() self._crc = CRC() # ── DDS callbacks ─────────────────────────────────────────────────────── def _on_state(self, msg: LowState_) -> None: self._low_state = msg self._first_state = True # ── internals ─────────────────────────────────────────────────────────── def _send_frame(self, arm_q: List[float], body_lock_q: List[float]) -> None: self._low_cmd.motor_cmd[ENABLE_ARM_SDK_INDEX].q = 1.0 for i in range(G1_NUM_MOTOR): self._low_cmd.motor_cmd[i].mode = 1 self._low_cmd.motor_cmd[i].dq = 0 self._low_cmd.motor_cmd[i].tau = 0 # Arms follow the replay; body holds its standing pose. self._low_cmd.motor_cmd[i].q = arm_q[i] if i >= 15 else body_lock_q[i] if i in WEAK_MOTORS: self._low_cmd.motor_cmd[i].kp = KP_LOW self._low_cmd.motor_cmd[i].kd = KD_LOW elif i in WRIST_MOTORS: self._low_cmd.motor_cmd[i].kp = KP_WRIST self._low_cmd.motor_cmd[i].kd = KD_WRIST else: self._low_cmd.motor_cmd[i].kp = KP_HIGH self._low_cmd.motor_cmd[i].kd = KD_HIGH self._low_cmd.crc = self._crc.Crc(self._low_cmd) self._arm_pub.Write(self._low_cmd) def _disable_sdk(self) -> None: """Hand arm control back to the internal brain after replay.""" self._low_cmd.motor_cmd[ENABLE_ARM_SDK_INDEX].q = 0.0 self._low_cmd.crc = self._crc.Crc(self._low_cmd) for _ in range(10): self._arm_pub.Write(self._low_cmd) time.sleep(0.02) @staticmethod def _load_home_q(path: Path) -> List[float]: last_q = None with open(path, "r") as f: for line in f: d = json.loads(line) if "q" in d and len(d["q"]) == G1_NUM_MOTOR: last_q = d["q"] if last_q is None: raise ValueError(f"No valid frame in {path}") return last_q @staticmethod def _load_frames(path: Path) -> List[dict]: frames = [] with open(path, "r") as f: for line in f: d = json.loads(line) if "q" in d: frames.append(d) if not frames: raise ValueError(f"No frames in {path}") return frames # ── public API ────────────────────────────────────────────────────────── def play(self, motion_path: Path, home_path: Path, speed: float = 1.0, wait_state_s: float = 3.0) -> bool: """Blocking replay. Returns True on success, False if anything fails. - ``motion_path`` : JSONL of recorded frames - ``home_path`` : JSONL whose last frame is the arm "home" pose - ``speed`` : 1.0 = original speed; 2.0 = twice as fast - ``wait_state_s``: max seconds to wait for the first LowState_ message """ # Wait for the first lowstate so we know the robot's current pose. t0 = time.time() while not self._first_state: if time.time() - t0 > wait_state_s: print(f"[arm_replay {_ts()}][WARN] no LowState within {wait_state_s}s" f" — aborting replay", flush=True) return False time.sleep(0.05) try: home_q = self._load_home_q(home_path) frames = self._load_frames(motion_path) except Exception as e: print(f"[arm_replay {_ts()}][WARN] load failed: {e}", flush=True) return False body_lock_q = [self._low_state.motor_state[i].q for i in range(G1_NUM_MOTOR)] start_q = frames[0]["q"] t_base = frames[0]["t"] print(f"[arm_replay {_ts()}] playing {motion_path.name} " f"({len(frames)} frames, {(frames[-1]['t']-t_base):.1f}s)", flush=True) # 1. Smooth move arms to the trajectory's starting pose. for k in range(STEPS_TO_START): alpha = k / STEPS_TO_START interp = list(body_lock_q) for j in range(15, 29): interp[j] = (1 - alpha) * body_lock_q[j] + alpha * start_q[j] self._send_frame(interp, body_lock_q) time.sleep(1.0 / REPLAY_HZ) # 2. Play the recorded trajectory at `speed`. last_q = start_q t_play = 0.0 t_real = time.time() while True: t_now = time.time() t_play += (t_now - t_real) * speed t_real = t_now target = None for f in frames: if f["t"] - t_base >= t_play: target = f break if target is None: break self._send_frame(target["q"], body_lock_q) last_q = target["q"] time.sleep(1.0 / REPLAY_HZ) # 3. Smooth return to home pose. for k in range(STEPS_TO_HOME): alpha = k / STEPS_TO_HOME interp = list(last_q) for j in range(15, 29): interp[j] = (1 - alpha) * last_q[j] + alpha * home_q[j] self._send_frame(interp, body_lock_q) time.sleep(1.0 / REPLAY_HZ) # 4. Hand arm control back to the internal brain. self._disable_sdk() print(f"[arm_replay {_ts()}] done", flush=True) return True