286 lines
10 KiB
Python
286 lines
10 KiB
Python
import os
|
|
import time
|
|
import json
|
|
import threading
|
|
import traceback
|
|
import numpy as np
|
|
from pathlib import Path
|
|
from dataclasses import dataclass
|
|
|
|
# ==================================================
|
|
# ⚙️ Config (from config/motion_config.json)
|
|
# ==================================================
|
|
BASE_DIR = Path(__file__).resolve().parent
|
|
|
|
try:
|
|
from Project.Sanad.core.config_loader import section as _cfg_section
|
|
_MCFG = _cfg_section("motion", "sanad_arm_controller")
|
|
except Exception:
|
|
_MCFG = {}
|
|
|
|
# Ensure defaults for any missing key
|
|
_MCFG.setdefault("action_cooldown_sec", 1.0)
|
|
_MCFG.setdefault("stability_threshold", 0.06)
|
|
_MCFG.setdefault("gains", {})
|
|
_MCFG["gains"].setdefault("kp_high", 300.0)
|
|
_MCFG["gains"].setdefault("kd_high", 3.0)
|
|
_MCFG["gains"].setdefault("kp_low", 80.0)
|
|
_MCFG["gains"].setdefault("kd_low", 3.0)
|
|
_MCFG["gains"].setdefault("kp_wrist", 40.0)
|
|
_MCFG["gains"].setdefault("kd_wrist", 1.5)
|
|
_MCFG.setdefault("weak_motors", [4, 10, 15, 16, 17, 18, 22, 23, 24, 25])
|
|
_MCFG.setdefault("wrist_motors", [19, 20, 21, 26, 27, 28])
|
|
_MCFG.setdefault("data_subdir", "DataG1")
|
|
|
|
# ==================================================
|
|
# ✅ Option List
|
|
# ==================================================
|
|
@dataclass(frozen=True)
|
|
class TestOption:
|
|
name: str
|
|
id: int
|
|
file: str = ""
|
|
|
|
OPTION_LIST = [
|
|
TestOption(name="release arm", id=0),
|
|
TestOption(name="shake hand", id=1),
|
|
TestOption(name="high five", id=2),
|
|
TestOption(name="hug", id=3),
|
|
TestOption(name="high wave", id=4),
|
|
TestOption(name="clap", id=5),
|
|
TestOption(name="face wave", id=6),
|
|
TestOption(name="left kiss", id=7),
|
|
TestOption(name="heart", id=8),
|
|
TestOption(name="right heart", id=9),
|
|
TestOption(name="hands up", id=10),
|
|
TestOption(name="x-ray", id=11),
|
|
TestOption(name="right hand up", id=12),
|
|
TestOption(name="reject", id=13),
|
|
TestOption(name="right kiss", id=14),
|
|
TestOption(name="two-hand kiss", id=15),
|
|
TestOption(name="release arm recorded", id=30, file="arm_home.jsonl"),
|
|
TestOption(name="laugh", id=23, file="laugh.jsonl"),
|
|
TestOption(name="bird", id=24, file="bird.jsonl"),
|
|
TestOption(name="change battery", id=25, file="change_battery.jsonl"),
|
|
TestOption(name="move hands up", id=26, file="hands_up.jsonl"),
|
|
TestOption(name="move right hand up", id=27, file="right_hand_up.jsonl"),
|
|
TestOption(name="move left hand up", id=28, file="left_hand_up.jsonl"),
|
|
]
|
|
|
|
OPTION_BY_ID = {o.id: o for o in OPTION_LIST}
|
|
OPTION_BY_NAME = {o.name.lower(): o for o in OPTION_LIST}
|
|
|
|
# ==================================================
|
|
# 🦾 Unitree SDK Configuration
|
|
# ==================================================
|
|
try:
|
|
from unitree_sdk2py.core.channel import ChannelFactoryInitialize, ChannelPublisher, ChannelSubscriber
|
|
from unitree_sdk2py.g1.arm.g1_arm_action_client import G1ArmActionClient, action_map
|
|
from unitree_sdk2py.idl.default import unitree_hg_msg_dds__LowCmd_
|
|
from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowCmd_, LowState_
|
|
from unitree_sdk2py.utils.crc import CRC
|
|
_ROBOT_SDK_AVAILABLE = True
|
|
except Exception:
|
|
ChannelFactoryInitialize = None
|
|
G1ArmActionClient = None
|
|
action_map = {}
|
|
LowCmd_ = LowState_ = None
|
|
unitree_hg_msg_dds__LowCmd_ = None
|
|
CRC = None
|
|
_ROBOT_SDK_AVAILABLE = False
|
|
_UNITREE_IMPORT_ERR = traceback.format_exc()
|
|
|
|
# G1 hardware constants — single source in config/core_config.json
|
|
from Project.Sanad.config import G1_NUM_MOTOR, ENABLE_ARM_SDK_INDEX, REPLAY_HZ
|
|
DATA_DIR = BASE_DIR / _MCFG["data_subdir"]
|
|
|
|
ACTION_COOLDOWN_SEC = _MCFG["action_cooldown_sec"]
|
|
STABILITY_THRESHOLD = _MCFG["stability_threshold"]
|
|
|
|
_G = _MCFG["gains"]
|
|
KP_HIGH, KD_HIGH = _G["kp_high"], _G["kd_high"]
|
|
KP_LOW, KD_LOW = _G["kp_low"], _G["kd_low"]
|
|
KP_WRIST, KD_WRIST = _G["kp_wrist"], _G["kd_wrist"]
|
|
WEAK_MOTORS = _MCFG["weak_motors"]
|
|
WRIST_MOTORS = _MCFG["wrist_motors"]
|
|
|
|
|
|
class ArmController:
|
|
def __init__(self, cooldown_sec: float = ACTION_COOLDOWN_SEC):
|
|
self._ready = False
|
|
self.cooldown_sec = float(cooldown_sec)
|
|
self._last_action_time = 0.0
|
|
self.low_state = None
|
|
self.crc = CRC() if CRC else None
|
|
self._pub = None
|
|
self._sub = None
|
|
self._client = None
|
|
self._is_busy = False
|
|
self._init_lock = threading.Lock()
|
|
|
|
def init(self) -> bool:
|
|
with self._init_lock:
|
|
if self._ready:
|
|
return True
|
|
if ChannelFactoryInitialize is None:
|
|
return False
|
|
try:
|
|
ChannelFactoryInitialize(0)
|
|
self._pub = ChannelPublisher("rt/arm_sdk", LowCmd_)
|
|
self._pub.Init()
|
|
self._sub = ChannelSubscriber("rt/lowstate", LowState_)
|
|
self._sub.Init(self._low_state_handler, 10)
|
|
if G1ArmActionClient:
|
|
self._client = G1ArmActionClient()
|
|
self._client.SetTimeout(10.0)
|
|
self._client.Init()
|
|
self._ready = True
|
|
return True
|
|
except Exception:
|
|
return False
|
|
|
|
def _low_state_handler(self, msg: LowState_):
|
|
self.low_state = msg
|
|
|
|
def _cooldown_ok(self) -> bool:
|
|
now = time.time()
|
|
return (now - self._last_action_time) >= self.cooldown_sec
|
|
|
|
def _load_home_pose(self):
|
|
path = DATA_DIR / "arm_home.jsonl"
|
|
try:
|
|
last_q = [0.0] * G1_NUM_MOTOR
|
|
with open(path, "r", encoding="utf-8") as f:
|
|
for line in f:
|
|
d = json.loads(line)
|
|
if "q" in d:
|
|
last_q = d["q"]
|
|
return last_q
|
|
except Exception:
|
|
return [0.0] * G1_NUM_MOTOR
|
|
|
|
def _is_pose_stable(self, target_q):
|
|
if not self.low_state:
|
|
return False
|
|
current_q = np.array([self.low_state.motor_state[i].q for i in range(15, 29)])
|
|
target_arm_q = np.array(target_q[15:29])
|
|
diff = np.abs(current_q - target_arm_q)
|
|
return np.max(diff) < STABILITY_THRESHOLD
|
|
|
|
def _send_frame(self, arm_q, body_lock_q):
|
|
if not self._pub:
|
|
return
|
|
cmd = unitree_hg_msg_dds__LowCmd_()
|
|
cmd.motor_cmd[ENABLE_ARM_SDK_INDEX].q = 1.0
|
|
for i in range(G1_NUM_MOTOR):
|
|
cmd.motor_cmd[i].mode = 1
|
|
cmd.motor_cmd[i].q = arm_q[i] if i >= 15 else body_lock_q[i]
|
|
if i in WEAK_MOTORS:
|
|
cmd.motor_cmd[i].kp, cmd.motor_cmd[i].kd = KP_LOW, KD_LOW
|
|
elif i in WRIST_MOTORS:
|
|
cmd.motor_cmd[i].kp, cmd.motor_cmd[i].kd = KP_WRIST, KD_WRIST
|
|
else:
|
|
cmd.motor_cmd[i].kp, cmd.motor_cmd[i].kd = KP_HIGH, KD_HIGH
|
|
cmd.crc = self.crc.Crc(cmd)
|
|
self._pub.Write(cmd)
|
|
|
|
def _managed_replay(self, filename: str):
|
|
try:
|
|
path = DATA_DIR / filename
|
|
frames = []
|
|
with open(path, "r", encoding="utf-8") as f:
|
|
for line in f:
|
|
d = json.loads(line)
|
|
if "q" in d:
|
|
frames.append(d)
|
|
|
|
if not frames or not self.low_state:
|
|
return
|
|
|
|
body_lock_q = [self.low_state.motor_state[i].q for i in range(G1_NUM_MOTOR)]
|
|
home_q = self._load_home_pose()
|
|
|
|
# 1. Smooth match to start pose (90 frames ≈ 1.5s — prevents jerk)
|
|
start_q = frames[0]["q"]
|
|
ramp_in = 90
|
|
for k in range(ramp_in):
|
|
alpha = k / ramp_in
|
|
interp_q = list(body_lock_q)
|
|
for j in range(15, 29):
|
|
interp_q[j] = (1 - alpha) * body_lock_q[j] + alpha * start_q[j]
|
|
self._send_frame(interp_q, body_lock_q)
|
|
time.sleep(1.0 / REPLAY_HZ)
|
|
|
|
# 2. Play frames
|
|
last_played_q = start_q
|
|
for f in frames:
|
|
self._send_frame(f["q"], body_lock_q)
|
|
last_played_q = f["q"]
|
|
time.sleep(1.0 / REPLAY_HZ)
|
|
|
|
# 3. Smooth return to home
|
|
for k in range(80):
|
|
alpha = k / 80
|
|
interp_home_q = list(body_lock_q)
|
|
for j in range(15, 29):
|
|
interp_home_q[j] = (1 - alpha) * last_played_q[j] + alpha * home_q[j]
|
|
self._send_frame(interp_home_q, body_lock_q)
|
|
time.sleep(1.0 / REPLAY_HZ)
|
|
|
|
# Sensor confirmation
|
|
confirm_start = time.time()
|
|
while time.time() - confirm_start < 2.0:
|
|
if self._is_pose_stable(home_q):
|
|
break
|
|
time.sleep(0.05)
|
|
|
|
finally:
|
|
if self._pub:
|
|
cmd = unitree_hg_msg_dds__LowCmd_()
|
|
cmd.motor_cmd[ENABLE_ARM_SDK_INDEX].q = 0.0
|
|
cmd.crc = self.crc.Crc(cmd)
|
|
for _ in range(5):
|
|
self._pub.Write(cmd)
|
|
time.sleep(0.01)
|
|
|
|
self._is_busy = False
|
|
self._last_action_time = time.time()
|
|
print("🔓 Ready.")
|
|
|
|
def _managed_sdk_action(self, action_name: str):
|
|
try:
|
|
if self._client and action_name in action_map:
|
|
print(f"🤖 SDK START: {action_name}")
|
|
self._client.ExecuteAction(action_map.get(action_name))
|
|
time.sleep(3.5)
|
|
finally:
|
|
self._is_busy = False
|
|
self._last_action_time = time.time()
|
|
print("🔓 Ready.")
|
|
|
|
def trigger_action_by_id(self, action_id: int):
|
|
if not self.init():
|
|
return
|
|
if self._is_busy:
|
|
return
|
|
if not self._cooldown_ok():
|
|
return
|
|
|
|
opt = OPTION_BY_ID.get(int(action_id))
|
|
if opt:
|
|
self._is_busy = True
|
|
if opt.file:
|
|
threading.Thread(target=self._managed_replay, args=(opt.file,), daemon=True).start()
|
|
elif self._client and opt.name in action_map:
|
|
threading.Thread(target=self._managed_sdk_action, args=(opt.name,), daemon=True).start()
|
|
else:
|
|
self._is_busy = False
|
|
|
|
def trigger_action_by_name(self, action_name: str):
|
|
opt = OPTION_BY_NAME.get(action_name.lower())
|
|
if opt:
|
|
self.trigger_action_by_id(opt.id)
|
|
|
|
|
|
ARM = ArmController()
|