import os import time import json import threading import traceback import numpy as np from pathlib import Path from dataclasses import dataclass # ================================================== # ⚙️ Config (from config/motion_config.json) # ================================================== BASE_DIR = Path(__file__).resolve().parent try: from Project.Sanad.core.config_loader import section as _cfg_section _MCFG = _cfg_section("motion", "sanad_arm_controller") except Exception: _MCFG = {} # Ensure defaults for any missing key _MCFG.setdefault("action_cooldown_sec", 1.0) _MCFG.setdefault("stability_threshold", 0.06) _MCFG.setdefault("gains", {}) _MCFG["gains"].setdefault("kp_high", 300.0) _MCFG["gains"].setdefault("kd_high", 3.0) _MCFG["gains"].setdefault("kp_low", 80.0) _MCFG["gains"].setdefault("kd_low", 3.0) _MCFG["gains"].setdefault("kp_wrist", 40.0) _MCFG["gains"].setdefault("kd_wrist", 1.5) _MCFG.setdefault("weak_motors", [4, 10, 15, 16, 17, 18, 22, 23, 24, 25]) _MCFG.setdefault("wrist_motors", [19, 20, 21, 26, 27, 28]) _MCFG.setdefault("data_subdir", "DataG1") # ================================================== # ✅ Option List # ================================================== @dataclass(frozen=True) class TestOption: name: str id: int file: str = "" OPTION_LIST = [ TestOption(name="release arm", id=0), TestOption(name="shake hand", id=1), TestOption(name="high five", id=2), TestOption(name="hug", id=3), TestOption(name="high wave", id=4), TestOption(name="clap", id=5), TestOption(name="face wave", id=6), TestOption(name="left kiss", id=7), TestOption(name="heart", id=8), TestOption(name="right heart", id=9), TestOption(name="hands up", id=10), TestOption(name="x-ray", id=11), TestOption(name="right hand up", id=12), TestOption(name="reject", id=13), TestOption(name="right kiss", id=14), TestOption(name="two-hand kiss", id=15), TestOption(name="release arm recorded", id=30, file="arm_home.jsonl"), TestOption(name="laugh", id=23, file="laugh.jsonl"), TestOption(name="bird", id=24, file="bird.jsonl"), TestOption(name="change battery", id=25, file="change_battery.jsonl"), TestOption(name="move hands up", id=26, file="hands_up.jsonl"), TestOption(name="move right hand up", id=27, file="right_hand_up.jsonl"), TestOption(name="move left hand up", id=28, file="left_hand_up.jsonl"), ] OPTION_BY_ID = {o.id: o for o in OPTION_LIST} OPTION_BY_NAME = {o.name.lower(): o for o in OPTION_LIST} # ================================================== # 🦾 Unitree SDK Configuration # ================================================== try: from unitree_sdk2py.core.channel import ChannelFactoryInitialize, ChannelPublisher, ChannelSubscriber from unitree_sdk2py.g1.arm.g1_arm_action_client import G1ArmActionClient, action_map from unitree_sdk2py.idl.default import unitree_hg_msg_dds__LowCmd_ from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowCmd_, LowState_ from unitree_sdk2py.utils.crc import CRC _ROBOT_SDK_AVAILABLE = True except Exception: ChannelFactoryInitialize = None G1ArmActionClient = None action_map = {} LowCmd_ = LowState_ = None unitree_hg_msg_dds__LowCmd_ = None CRC = None _ROBOT_SDK_AVAILABLE = False _UNITREE_IMPORT_ERR = traceback.format_exc() # G1 hardware constants — single source in config/core_config.json from Project.Sanad.config import G1_NUM_MOTOR, ENABLE_ARM_SDK_INDEX, REPLAY_HZ DATA_DIR = BASE_DIR / _MCFG["data_subdir"] ACTION_COOLDOWN_SEC = _MCFG["action_cooldown_sec"] STABILITY_THRESHOLD = _MCFG["stability_threshold"] _G = _MCFG["gains"] KP_HIGH, KD_HIGH = _G["kp_high"], _G["kd_high"] KP_LOW, KD_LOW = _G["kp_low"], _G["kd_low"] KP_WRIST, KD_WRIST = _G["kp_wrist"], _G["kd_wrist"] WEAK_MOTORS = _MCFG["weak_motors"] WRIST_MOTORS = _MCFG["wrist_motors"] class ArmController: def __init__(self, cooldown_sec: float = ACTION_COOLDOWN_SEC): self._ready = False self.cooldown_sec = float(cooldown_sec) self._last_action_time = 0.0 self.low_state = None self.crc = CRC() if CRC else None self._pub = None self._sub = None self._client = None self._is_busy = False self._init_lock = threading.Lock() def init(self) -> bool: with self._init_lock: if self._ready: return True if ChannelFactoryInitialize is None: return False try: ChannelFactoryInitialize(0) self._pub = ChannelPublisher("rt/arm_sdk", LowCmd_) self._pub.Init() self._sub = ChannelSubscriber("rt/lowstate", LowState_) self._sub.Init(self._low_state_handler, 10) if G1ArmActionClient: self._client = G1ArmActionClient() self._client.SetTimeout(10.0) self._client.Init() self._ready = True return True except Exception: return False def _low_state_handler(self, msg: LowState_): self.low_state = msg def _cooldown_ok(self) -> bool: now = time.time() return (now - self._last_action_time) >= self.cooldown_sec def _load_home_pose(self): path = DATA_DIR / "arm_home.jsonl" try: last_q = [0.0] * G1_NUM_MOTOR with open(path, "r", encoding="utf-8") as f: for line in f: d = json.loads(line) if "q" in d: last_q = d["q"] return last_q except Exception: return [0.0] * G1_NUM_MOTOR def _is_pose_stable(self, target_q): if not self.low_state: return False current_q = np.array([self.low_state.motor_state[i].q for i in range(15, 29)]) target_arm_q = np.array(target_q[15:29]) diff = np.abs(current_q - target_arm_q) return np.max(diff) < STABILITY_THRESHOLD def _send_frame(self, arm_q, body_lock_q): if not self._pub: return cmd = unitree_hg_msg_dds__LowCmd_() cmd.motor_cmd[ENABLE_ARM_SDK_INDEX].q = 1.0 for i in range(G1_NUM_MOTOR): cmd.motor_cmd[i].mode = 1 cmd.motor_cmd[i].q = arm_q[i] if i >= 15 else body_lock_q[i] if i in WEAK_MOTORS: cmd.motor_cmd[i].kp, cmd.motor_cmd[i].kd = KP_LOW, KD_LOW elif i in WRIST_MOTORS: cmd.motor_cmd[i].kp, cmd.motor_cmd[i].kd = KP_WRIST, KD_WRIST else: cmd.motor_cmd[i].kp, cmd.motor_cmd[i].kd = KP_HIGH, KD_HIGH cmd.crc = self.crc.Crc(cmd) self._pub.Write(cmd) def _managed_replay(self, filename: str): try: path = DATA_DIR / filename frames = [] with open(path, "r", encoding="utf-8") as f: for line in f: d = json.loads(line) if "q" in d: frames.append(d) if not frames or not self.low_state: return body_lock_q = [self.low_state.motor_state[i].q for i in range(G1_NUM_MOTOR)] home_q = self._load_home_pose() # 1. Smooth match to start pose (90 frames ≈ 1.5s — prevents jerk) start_q = frames[0]["q"] ramp_in = 90 for k in range(ramp_in): alpha = k / ramp_in interp_q = list(body_lock_q) for j in range(15, 29): interp_q[j] = (1 - alpha) * body_lock_q[j] + alpha * start_q[j] self._send_frame(interp_q, body_lock_q) time.sleep(1.0 / REPLAY_HZ) # 2. Play frames last_played_q = start_q for f in frames: self._send_frame(f["q"], body_lock_q) last_played_q = f["q"] time.sleep(1.0 / REPLAY_HZ) # 3. Smooth return to home for k in range(80): alpha = k / 80 interp_home_q = list(body_lock_q) for j in range(15, 29): interp_home_q[j] = (1 - alpha) * last_played_q[j] + alpha * home_q[j] self._send_frame(interp_home_q, body_lock_q) time.sleep(1.0 / REPLAY_HZ) # Sensor confirmation confirm_start = time.time() while time.time() - confirm_start < 2.0: if self._is_pose_stable(home_q): break time.sleep(0.05) finally: if self._pub: cmd = unitree_hg_msg_dds__LowCmd_() cmd.motor_cmd[ENABLE_ARM_SDK_INDEX].q = 0.0 cmd.crc = self.crc.Crc(cmd) for _ in range(5): self._pub.Write(cmd) time.sleep(0.01) self._is_busy = False self._last_action_time = time.time() print("🔓 Ready.") def _managed_sdk_action(self, action_name: str): try: if self._client and action_name in action_map: print(f"🤖 SDK START: {action_name}") self._client.ExecuteAction(action_map.get(action_name)) time.sleep(3.5) finally: self._is_busy = False self._last_action_time = time.time() print("🔓 Ready.") def trigger_action_by_id(self, action_id: int): if not self.init(): return if self._is_busy: return if not self._cooldown_ok(): return opt = OPTION_BY_ID.get(int(action_id)) if opt: self._is_busy = True if opt.file: threading.Thread(target=self._managed_replay, args=(opt.file,), daemon=True).start() elif self._client and opt.name in action_map: threading.Thread(target=self._managed_sdk_action, args=(opt.name,), daemon=True).start() else: self._is_busy = False def trigger_action_by_name(self, action_name: str): opt = OPTION_BY_NAME.get(action_name.lower()) if opt: self.trigger_action_by_id(opt.id) ARM = ArmController()