# replay_engine.py
"""Replay recorded joint trajectories on the ``rt/arm_sdk`` channel.

A replay file is JSON-lines: one object per line carrying a joint vector under
``"q"`` and a timestamp under ``"t"``; an optional ``"meta"`` entry may mark a
frame as a trigger point.  ``ReplayWithHome.run`` blends from the current pose
to the first recorded pose, plays the recording, blends to a home pose and
finally releases SDK control.
"""
import json
import time
from pathlib import Path
from threading import Thread

from unitree_sdk2py.core.channel import ChannelPublisher
from unitree_sdk2py.idl.default import unitree_hg_msg_dds__LowCmd_
from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowCmd_
from unitree_sdk2py.utils.crc import CRC

from Core.Logger import Logs

G1_NUM_MOTOR = 29
# motor_cmd slot whose ``q`` is written as 1.0 while commanding and 0.0 to
# release control (see _send_frame / _disable_sdk).
ENABLE_ARM_SDK_INDEX = 29
# Joints at or above this index follow the replay target; lower joints are
# held at the pose captured when the replay started.
ARM_FIRST_INDEX = 15
REPLAY_HZ = 60.0
# Number of ticks used to blend from the current pose to the first frame.
MOVE_TO_START_STEPS = 60

# kp/kd values written to each motor command, selected per joint group.
KP_HIGH = 300.0
KD_HIGH = 3.0
KP_LOW = 80.0
KD_LOW = 3.0
KP_WRIST = 40.0
KD_WRIST = 1.5

WEAK_MOTORS = [4, 10, 15, 16, 17, 18, 22, 23, 24, 25]
WRIST_MOTORS = [19, 20, 21, 26, 27, 28]

sanad_logger = Logs()
sanad_logger.LogEngine("G1_Logs", "replay_engine")


def load_home_pose(home_path: Path):
    """Return the home joint vector stored in a JSON-lines file.

    The ``"q"`` list of the *last* record holding exactly ``G1_NUM_MOTOR``
    values is used.  Blank lines, lines that are not valid JSON and records
    that are not objects are skipped rather than aborting the load.

    Args:
        home_path: Path of the JSON-lines home file.

    Returns:
        The stored joint list, or ``[0.0] * G1_NUM_MOTOR`` when the file is
        missing or contains no usable record.
    """
    last_valid_q = None
    try:
        with open(home_path, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    record = json.loads(line)
                except ValueError:
                    continue
                if not isinstance(record, dict):
                    continue
                q = record.get("q")
                if isinstance(q, list) and len(q) == G1_NUM_MOTOR:
                    last_valid_q = q
    except FileNotFoundError:
        sanad_logger.print_and_log(f"⚠️ Home file not found: {home_path}", "warning")
    else:
        if last_valid_q:
            sanad_logger.print_and_log(f"✅ Loaded Home Pose from {home_path}", "info")
            return last_valid_q
        sanad_logger.print_and_log(f"⚠️ Home file has no valid q: {home_path}", "warning")

    sanad_logger.print_and_log("⚠️ Using Default Home (Arms at 0.0)", "warning")
    return [0.0] * G1_NUM_MOTOR


class ReplayWithHome:
    """Plays a recorded trajectory and then returns the arms to a home pose.

    ``hub`` is expected to expose ``first_state``, ``low_state``,
    ``last_state_time``, ``fresh()`` and either ``hard_cancel_combo()`` or
    ``combo_r2l1()``; those are the only members this class touches.
    """

    def __init__(self, hub, watchdog_disable_after=1.0):
        """Create the publisher and remember the watchdog timeout.

        Args:
            hub: State/controller hub (see class docstring).
            watchdog_disable_after: Seconds without a state update after
                which playback or homing is aborted and control released.
        """
        self.hub = hub
        self.low_cmd = unitree_hg_msg_dds__LowCmd_()
        self.crc = CRC()
        self.arm_pub = ChannelPublisher("rt/arm_sdk", LowCmd_)
        self.arm_pub.Init()
        self.watchdog_disable_after = float(watchdog_disable_after)
        self.is_playing = False

    def _cancel(self) -> bool:
        """Return True when the hub reports the cancel button combo."""
        if hasattr(self.hub, "hard_cancel_combo"):
            return bool(self.hub.hard_cancel_combo())
        return bool(self.hub.combo_r2l1())

    @staticmethod
    def _blend_arms(start_q, end_q, alpha):
        """Return a copy of ``start_q`` with arm joints blended toward ``end_q``."""
        blended = list(start_q)
        for j in range(ARM_FIRST_INDEX, G1_NUM_MOTOR):
            blended[j] = (1 - alpha) * start_q[j] + alpha * end_q[j]
        return blended

    @staticmethod
    def _load_frames(replay_file: Path):
        """Read usable frames from a JSON-lines replay file.

        A record is kept only if it is an object with a numeric ``"t"`` and a
        ``"q"`` list of at least ``G1_NUM_MOTOR`` entries, so that a damaged
        record cannot raise in the middle of playback.
        """
        frames = []
        with open(replay_file, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    record = json.loads(line)
                except ValueError:
                    continue
                if not isinstance(record, dict):
                    continue
                q = record.get("q")
                t = record.get("t")
                if not isinstance(q, list) or len(q) < G1_NUM_MOTOR:
                    continue
                if isinstance(t, bool) or not isinstance(t, (int, float)):
                    continue
                frames.append(record)
        return frames

    @staticmethod
    def _has_trigger(frame) -> bool:
        """Return True if ``frame["meta"]`` marks a trigger.

        Supported forms: a string containing "trigger" (case-insensitive) or
        a dict with a truthy ``"trigger"`` entry.
        """
        meta = frame.get("meta")
        if isinstance(meta, str):
            return "trigger" in meta.lower()
        if isinstance(meta, dict):
            return bool(meta.get("trigger"))
        return False

    @staticmethod
    def _fire_trigger(frame, trigger_callback):
        """Run ``trigger_callback(frame)`` on a daemon thread (non-blocking)."""
        try:
            # Pass the frame through ``args`` so the thread sees this frame,
            # not whatever a closed-over loop variable holds when it starts.
            Thread(target=trigger_callback, args=(frame,), daemon=True).start()
            sanad_logger.print_and_log("🔔 Trigger callback invoked (replay).", "info")
        except Exception as _e:
            sanad_logger.print_and_log(f"Trigger callback error: {_e}", "warning")

    def _send_frame(self, arm_target_q, body_lock_q):
        """Publish one command frame.

        Joints from ``ARM_FIRST_INDEX`` up take their position from
        ``arm_target_q``; the remaining joints are held at ``body_lock_q``.
        """
        self.low_cmd.motor_cmd[ENABLE_ARM_SDK_INDEX].q = 1.0
        for i in range(G1_NUM_MOTOR):
            motor = self.low_cmd.motor_cmd[i]
            motor.mode = 1
            motor.dq = 0
            motor.tau = 0
            motor.q = arm_target_q[i] if i >= ARM_FIRST_INDEX else body_lock_q[i]
            if i in WEAK_MOTORS:
                motor.kp = KP_LOW
                motor.kd = KD_LOW
            elif i in WRIST_MOTORS:
                motor.kp = KP_WRIST
                motor.kd = KD_WRIST
            else:
                motor.kp = KP_HIGH
                motor.kd = KD_HIGH
        self.low_cmd.crc = self.crc.Crc(self.low_cmd)
        self.arm_pub.Write(self.low_cmd)

    def _disable_sdk(self):
        """Clear the enable slot and publish the command ten times."""
        sanad_logger.print_and_log("🔌 Disabling SDK...", "info")
        self.low_cmd.motor_cmd[ENABLE_ARM_SDK_INDEX].q = 0.0
        self.low_cmd.crc = self.crc.Crc(self.low_cmd)
        for _ in range(10):
            self.arm_pub.Write(self.low_cmd)
            time.sleep(0.02)

    def _return_home(self, last_q, body_lock_q, home_q, steps=180):
        """Blend the arm joints from ``last_q`` to ``home_q`` over ``steps`` ticks.

        If no state update has arrived for ``watchdog_disable_after`` seconds
        the SDK is disabled and the method returns early.  On ticks where the
        hub is not fresh the most recently commanded pose is re-sent; such
        ticks still count toward ``steps``.
        """
        sanad_logger.print_and_log("🏡 Returning arms to HOME...", "info")
        held_q = list(last_q)
        for k in range(steps):
            stale_for = time.time() - self.hub.last_state_time
            if stale_for > self.watchdog_disable_after:
                sanad_logger.print_and_log("🛑 WATCHDOG: lost during home. Disable SDK.", "error")
                self._disable_sdk()
                return
            if self.hub.fresh():
                # (k + 1) so the final tick commands exactly ``home_q``.
                held_q = self._blend_arms(last_q, home_q, (k + 1) / steps)
            self._send_frame(held_q, body_lock_q)
            time.sleep(1.0 / REPLAY_HZ)
        sanad_logger.print_and_log("✅ Home Reached.", "info")

    def run(self, replay_file: Path, home_file: Path, speed: float, trigger_callback=None):
        """Play ``replay_file`` and return the arms to the home pose.

        Args:
            replay_file: JSON-lines recording with ``"q"`` and ``"t"`` per line.
            home_file: JSON-lines file read by ``load_home_pose``.
            speed: Playback rate multiplier; must be greater than zero.
            trigger_callback: Optional callable invoked (on a daemon thread,
                with the frame dict) once for every trigger-marked frame
                that playback reaches or passes.

        ``is_playing`` is True for the duration of the call.  Exceptions are
        logged and followed by an attempt to disable the SDK; they are not
        re-raised.
        """
        self.is_playing = True
        cancel = False
        try:
            sanad_logger.print_and_log(f"🎬 Replay: {replay_file}", "info")

            speed = float(speed)
            if not speed > 0:
                # With a non-positive rate the playback clock never advances
                # and the play loop would hold the first frame indefinitely.
                sanad_logger.print_and_log(f"❌ Invalid replay speed: {speed}", "error")
                return

            while not self.hub.first_state:
                time.sleep(0.05)

            home_q = load_home_pose(home_file)
            body_lock_q = [self.hub.low_state.motor_state[i].q for i in range(G1_NUM_MOTOR)]

            frames = self._load_frames(replay_file)
            if not frames:
                sanad_logger.print_and_log("❌ No frames found.", "error")
                return

            start_t = frames[0]["t"]
            file_start_q = frames[0]["q"]
            # Nothing has been commanded yet, so the pose to hold (or to home
            # from after an early cancel) is the measured one.
            last_played_q = list(body_lock_q)

            # Move to start
            for k in range(MOVE_TO_START_STEPS):
                if self._cancel():
                    cancel = True
                    sanad_logger.print_and_log("🛑 CANCEL during move-to-start.", "warning")
                    break
                if self.hub.fresh():
                    alpha = (k + 1) / MOVE_TO_START_STEPS
                    last_played_q = self._blend_arms(body_lock_q, file_start_q, alpha)
                self._send_frame(last_played_q, body_lock_q)
                time.sleep(1.0 / REPLAY_HZ)

            # Play
            if not cancel:
                sanad_logger.print_and_log("▶️ Playing...", "info")
                play_elapsed = 0.0
                last_real = time.time()
                cursor = 0          # first frame not yet overtaken by the clock
                next_trigger = 0    # first frame not yet checked for a trigger
                frame_count = len(frames)

                while True:
                    if self._cancel():
                        cancel = True
                        sanad_logger.print_and_log("🛑 CANCEL during replay.", "warning")
                        break

                    stale_for = time.time() - self.hub.last_state_time
                    if stale_for > self.watchdog_disable_after:
                        sanad_logger.print_and_log("🛑 WATCHDOG: lost. Disable SDK.", "error")
                        self._disable_sdk()
                        return

                    if not self.hub.fresh():
                        self._send_frame(last_played_q, body_lock_q)
                        time.sleep(1.0 / REPLAY_HZ)
                        continue

                    now = time.time()
                    # Clamp so a backwards wall-clock step cannot rewind playback.
                    dt = max(0.0, now - last_real)
                    last_real = now
                    play_elapsed += dt * speed

                    # The playback clock never decreases, so resuming the scan
                    # at ``cursor`` selects the same frame as a scan from 0.
                    while cursor < frame_count and frames[cursor]["t"] - start_t < play_elapsed:
                        cursor += 1

                    # Check every frame reached or passed since the last tick
                    # exactly once, so a trigger neither repeats nor is lost.
                    stop = min(cursor + 1, frame_count)
                    if trigger_callback:
                        for idx in range(next_trigger, stop):
                            if self._has_trigger(frames[idx]):
                                self._fire_trigger(frames[idx], trigger_callback)
                    next_trigger = max(next_trigger, stop)

                    if cursor >= frame_count:
                        break

                    target = frames[cursor]
                    self._send_frame(target["q"], body_lock_q)
                    last_played_q = target["q"]
                    time.sleep(1.0 / REPLAY_HZ)

            # Always home
            self._return_home(last_played_q, body_lock_q, home_q, steps=180)
            self._disable_sdk()

        except Exception as e:
            sanad_logger.print_and_log(f"❌ Replay exception: {e}", "error")
            try:
                self._disable_sdk()
            except Exception as disable_err:
                sanad_logger.print_and_log(f"⚠️ Failed to disable SDK: {disable_err}", "warning")
        finally:
            self.is_playing = False