Saqr/robot/robot_controller.py

432 lines
19 KiB
Python

"""G1 arm + audio + LowState DDS client owned by the bridge.
Announcements run on a dedicated worker thread. Each queue item is a tuple
``(text, category, key)``. The worker picks WAV playback via
``AudioClient.PlayStream`` when the clip exists under ``assets/audio/`` and
``tts.mode`` allows, otherwise falls back to ``TtsMaker`` with the adaptive
busy-factor backoff for 3104 ("device busy") errors.
"""
from __future__ import annotations
import collections
import datetime
import threading
import time
from typing import Deque, Optional, Tuple
from utils.config import load_config
# Robot configuration is loaded once at import time. Required settings are
# read with [] so a missing entry raises at import instead of at first use.
_ROBOT = load_config("robot")
_TTS = _ROBOT["tts"]
_ARM = _ROBOT["arm"]
# Volume handed to AudioClient.SetVolume when the controller starts.
TTS_VOLUME = _TTS["volume"]
# Playback-time estimate for TtsMaker:
#   max(TTS_MIN_SECONDS, len(text) * TTS_SECONDS_PER_CHAR) * busy factor
TTS_SECONDS_PER_CHAR = _TTS["seconds_per_char"]
TTS_MIN_SECONDS = _TTS["min_seconds"]
# maxlen of the announcement deque owned by RobotController.
TTS_QUEUE_MAX = _TTS["queue_max"]
# Adaptive busy factor: multiplied by UP after a failed TtsMaker request and
# by DOWN after a successful one, always clamped to [MIN, MAX].
TTS_BUSY_FACTOR_MIN = _TTS["busy_factor"]["min"]
TTS_BUSY_FACTOR_MAX = _TTS["busy_factor"]["max"]
TTS_BUSY_FACTOR_UP = _TTS["busy_factor"]["up"]
TTS_BUSY_FACTOR_DOWN = _TTS["busy_factor"]["down"]
# Routing mode used by RobotController._dispatch:
#   tts_only        - always synthesise with TtsMaker
#   recorded_or_tts - play the recorded clip when one exists, else TtsMaker
#   recorded_only   - play recorded clips only; phrases without a clip are skipped
TTS_MODE = _TTS.get("mode", "tts_only") # tts_only | recorded_or_tts | recorded_only
# Names looked up in the SDK action_map by the legacy (non-replay) arm path.
REJECT_ACTION = _ARM["reject_action"]
RELEASE_ACTION = _ARM["release_action"]
# Bug in unitree_sdk2py/g1/audio/g1_audio_client.py: TtsMaker does
#   self.tts_index += self.tts_index   # 0 + 0 = 0, never increments
# We bypass the broken wrapper and call _Call(ROBOT_API_ID_AUDIO_TTS, ...)
# with our own counter so the firmware sees a unique index per request.
ROBOT_API_ID_AUDIO_TTS = 1001
ROBOT_API_ID_AUDIO_STOP_PLAY = 1004
# Audio RPC timeout: the firmware occasionally stalls after a long idle
# period plus prior arm-sdk activity. A short timeout plus a retry recovers
# in seconds instead of waiting out the 10 s bridge default.
TTS_RPC_TIMEOUT_S = 3.0
TTS_RETRY_MAX_ATTEMPTS = 2
TTS_RETRY_DELAY_S = 0.5
# Prior-art reset pattern (G1_Lootah/Audio_Recorder/voice_note.txt and
# g1_tts_arabic.py:50): call AUDIO_STOP_PLAY and let it settle for 300 ms
# before each new audio operation; otherwise the firmware state goes stale
# and subsequent TtsMaker calls time out with rc=3104.
AUDIO_STOP_APP_NAME = "tts"
AUDIO_STOP_SETTLE_S = 0.3
# Recorded arm motions (assets/motions/*.jsonl) replace the high-level
# ExecuteAction('reject') call with a teach-and-replay trajectory.
# The whole "motion" section is optional; every key has a default.
_MOTION = _ROBOT.get("motion", {})
MOTION_ENABLED = bool(_MOTION.get("enabled", True))
MOTION_UNSAFE_FILE = _MOTION.get("unsafe_file", "adnoc1.jsonl")
MOTION_HOME_FILE = _MOTION.get("home_file", "arm_home.jsonl")
MOTION_SPEED = float(_MOTION.get("speed", 1.0))
# One queued announcement: the text to speak plus the optional clip routing
# (category, key) used to look up a recorded WAV.
QueueItem = Tuple[str, Optional[str], Optional[str]] # (text, category, key)
def _ts() -> str:
    """Return the current local time as ``HH:MM:SS.mmm`` for log prefixes.

    The fractional part is truncated (not rounded) to milliseconds.
    """
    current = datetime.datetime.now().time()
    return current.isoformat(timespec="milliseconds")
class RobotController:
"""Owns both the G1 arm action client and the G1 audio (TTS + PlayStream) client."""
def __init__(self, iface: Optional[str], timeout: float, dry_run: bool,
             tts_speaker_id: int, want_lowstate: bool = True):
    """Create the SDK clients, start the TTS worker and subscribe to LowState.

    Args:
        iface: Network interface name passed to ``ChannelFactoryInitialize``;
            a falsy value lets the SDK pick its default.
        timeout: Value passed to ``G1ArmActionClient.SetTimeout``
            (presumably seconds — confirm against the SDK).
        dry_run: When true, no SDK module is imported, no client is
            created and no worker thread is started; the public methods
            only log what they would do.
        tts_speaker_id: Speaker id sent with every TtsMaker request.
        want_lowstate: When true, subscribe to ``rt/lowstate`` so the
            wireless-remote trigger keys work. A failure here is logged
            and leaves ``self.hub`` as ``None``.
    """
    self.dry_run = dry_run
    self.tts_speaker_id = tts_speaker_id
    # SDK handles; all stay None in dry-run mode.
    self.arm_client = None
    self.audio_client = None
    self._action_map = None
    self.hub = None
    self._lowstate_sub = None
    self._player = None # AudioPlayer, lazily initialised
    self._arm_replayer = None # ArmReplayer, lazily initialised
    # Bounded announcement queue; speak() appends, the worker pops.
    self._tts_queue: Deque[QueueItem] = collections.deque(maxlen=TTS_QUEUE_MAX)
    # Wakes the worker when speak() queues an item or shutdown is requested.
    self._tts_event = threading.Event()
    # Set when the worker is not dispatching AND the queue is empty. Cleared
    # on speak(). Callers who want to wait for audio to finish before
    # issuing arm commands (so the firmware doesn't serialise-busy on us)
    # can await this event.
    self._audio_idle = threading.Event()
    self._audio_idle.set()
    self._tts_worker_stop = threading.Event()
    self._tts_worker_thread: Optional[threading.Thread] = None
    # TtsMaker bookkeeping: adaptive busy factor, timing and counters.
    self._tts_busy_factor: float = TTS_BUSY_FACTOR_MIN
    self._tts_last_call_t: float = 0.0
    self._tts_call_count: int = 0
    self._tts_busy_count: int = 0
    # Our own request index (the SDK wrapper's counter never increments).
    self._tts_index: int = 0
    if dry_run:
        print(f"[BRIDGE {_ts()}] DRY RUN — G1 SDK will not be loaded.", flush=True)
        return
    # SDK imports are deferred so dry-run works without the SDK installed.
    from unitree_sdk2py.core.channel import ChannelFactoryInitialize
    from unitree_sdk2py.g1.arm.g1_arm_action_client import (
        G1ArmActionClient,
        action_map,
    )
    from unitree_sdk2py.g1.audio.g1_audio_client import AudioClient
    self._action_map = action_map
    # NOTE(review): the first argument (0) is presumably the DDS domain id —
    # confirm against the SDK documentation.
    if iface:
        ChannelFactoryInitialize(0, iface)
    else:
        ChannelFactoryInitialize(0)
    self.arm_client = G1ArmActionClient()
    self.arm_client.SetTimeout(timeout)
    self.arm_client.Init()
    print(f"[BRIDGE {_ts()}] G1ArmActionClient ready (iface={iface or 'default'})",
          flush=True)
    self.audio_client = AudioClient()
    # Override the bridge-wide 10s with a shorter audio-specific timeout
    # so TtsMaker hangs (rc=3104 after full timeout) recover quickly via
    # the retry loop in _speak_blocking instead of blocking the worker.
    self.audio_client.SetTimeout(TTS_RPC_TIMEOUT_S)
    self.audio_client.Init()
    # Volume is best-effort: a failure is logged and start-up continues.
    try:
        self.audio_client.SetVolume(TTS_VOLUME)
    except Exception as e:
        print(f"[BRIDGE {_ts()}][WARN] AudioClient.SetVolume failed: {e}", flush=True)
    print(f"[BRIDGE {_ts()}] G1 AudioClient ready (speaker_id={tts_speaker_id}, "
          f"tts_mode={TTS_MODE})", flush=True)
    # Pre-recorded clip library (WAVs under assets/audio/).
    from robot.audio_player import AudioPlayer
    self._player = AudioPlayer(self.audio_client)
    # Recorded arm-motion replayer (teach-and-replay trajectories under
    # assets/motions/) — replaces the high-level ExecuteAction path.
    # If construction fails, _arm_replayer stays None and reject() uses the
    # legacy ExecuteAction path instead.
    if MOTION_ENABLED:
        try:
            from robot.arm_replay import ArmReplayer
            self._arm_replayer = ArmReplayer()
            print(f"[BRIDGE {_ts()}] ArmReplayer ready "
                  f"(unsafe_file={MOTION_UNSAFE_FILE}, "
                  f"home_file={MOTION_HOME_FILE}, speed={MOTION_SPEED})",
                  flush=True)
        except Exception as e:
            print(f"[BRIDGE {_ts()}][WARN] ArmReplayer init failed: {e}",
                  flush=True)
    # Daemon thread: it never blocks interpreter exit; shutdown_tts() stops
    # it cooperatively.
    self._tts_worker_thread = threading.Thread(
        target=self._tts_worker_loop, name="TtsWorker", daemon=True,
    )
    self._tts_worker_thread.start()
    if want_lowstate:
        try:
            from unitree_sdk2py.core.channel import ChannelSubscriber
            from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowState_
            from robot.controller import LowStateHub
            self.hub = LowStateHub(watchdog_timeout=0.25)
            self._lowstate_sub = ChannelSubscriber("rt/lowstate", LowState_)
            # NOTE(review): the second argument (10) is presumably the
            # subscriber queue length — confirm against the SDK.
            self._lowstate_sub.Init(self.hub.handler, 10)
            print(f"[BRIDGE {_ts()}] Subscribed to rt/lowstate (wireless remote)",
                  flush=True)
        except Exception as e:
            print(f"[BRIDGE {_ts()}][WARN] LowState subscribe failed: {e}", flush=True)
            print(f"[BRIDGE {_ts()}][WARN] Trigger keys (R2+X / R2+Y) will not work.",
                  flush=True)
            # Drop a half-initialised hub so callers see "no LowState".
            self.hub = None
# ── Public API ──────────────────────────────────────────────────────────
def speak(self, text: str,
          category: Optional[str] = None, key: Optional[str] = None) -> None:
    """Queue an announcement for the worker thread without blocking.

    Freshness policy: a new announcement **cancels** any in-progress clip
    play and **replaces** any queued item. This keeps audio synchronised
    with the latest event — otherwise a stale phrase from an earlier event
    could finish playing AFTER the newer arm motion has completed.

    Args:
        text: Phrase to speak (used by TtsMaker and in log lines).
        category: Optional clip category for recorded playback.
        key: Optional clip key within ``category``.

    In dry-run mode the request is only logged. When no audio client
    exists the call is a silent no-op. A request identical to the item
    currently at the tail of the queue is dropped.
    """
    if self.dry_run:
        print(f"[BRIDGE {_ts()}] (dry) would speak({text!r}, category={category!r}, "
              f"key={key!r})", flush=True)
        return
    if self.audio_client is None:
        return

    item: QueueItem = (text, category, key)

    # Drop adjacent duplicates (same text + routing). Peek EAFP-style: the
    # worker thread may popleft() the last item between an emptiness check
    # and the index, which would raise IndexError into the caller.
    try:
        pending = self._tts_queue[-1]
    except IndexError:
        pending = None
    if pending == item:
        return

    # Freshness: throw away any stale queued item and cancel the current
    # play so the new item takes over immediately.
    self._tts_queue.clear()
    if self._player is not None:
        self._player.cancel()
    self._tts_queue.append(item)
    self._audio_idle.clear()
    self._tts_event.set()
def wait_for_audio_done(self, timeout: float = 15.0) -> bool:
    """Wait until the announcement worker reports that audio is idle.

    "Idle" means the worker found its queue empty and is not dispatching
    an item. Callers use this before issuing a low-level arm command so
    that no audio is in flight (avoids firmware bus contention).

    Args:
        timeout: Maximum time to wait, in seconds.

    Returns:
        True if audio became idle within ``timeout``, False on timeout.
    """
    became_idle = self._audio_idle.wait(timeout)
    return became_idle
def shutdown_tts(self):
    """Ask the announcement worker to stop and wait briefly for it to exit.

    Sets the stop flag, wakes the worker in case it is waiting for work,
    then joins the thread for at most one second. A worker that is still
    busy after that second is left running (it is a daemon thread).
    """
    for flag in (self._tts_worker_stop, self._tts_event):
        flag.set()
    worker = self._tts_worker_thread
    if worker is None:
        # Dry-run mode: no worker was ever started.
        return
    worker.join(timeout=1.0)
# ── Worker thread ───────────────────────────────────────────────────────
def _tts_worker_loop(self):
    """Run the announcement worker until ``_tts_worker_stop`` is set.

    Each iteration either dispatches the oldest queued item or, when the
    queue is empty, marks audio as idle and waits up to 200 ms for
    ``speak()`` (or shutdown) to signal new work.

    A failure while dispatching one announcement is logged and the loop
    continues: this is the only consumer of the queue, so letting an
    exception escape would silently end all further audio and leave
    ``_audio_idle`` cleared after the next ``speak()``.
    """
    while not self._tts_worker_stop.is_set():
        if not self._tts_queue:
            # Queue drained AND no dispatch in progress → audio is idle.
            self._audio_idle.set()
            self._tts_event.wait(timeout=0.2)
            self._tts_event.clear()
            continue
        try:
            item = self._tts_queue.popleft()
        except IndexError:
            # speak() cleared the queue between the check and the pop.
            continue
        try:
            self._dispatch(*item)
        except Exception as e:
            # Thread boundary: drop this phrase, keep the worker alive.
            print(f"[BRIDGE {_ts()}][ERR] announcement {item!r} failed: {e!r}",
                  flush=True)
def _dispatch(self, text: str, category: Optional[str], key: Optional[str]):
    """Play one queue item as a recorded clip or via TtsMaker, per ``tts.mode``.

    A recorded clip is used when the mode permits clips, both ``category``
    and ``key`` are given, and the player has that clip. If the clip
    fails to play, ``recorded_or_tts`` falls back to TtsMaker while
    ``recorded_only`` drops the phrase. Without a usable clip,
    ``recorded_only`` skips the phrase and every other mode uses TtsMaker.
    """
    recorded_only = TTS_MODE == "recorded_only"
    clips_enabled = recorded_only or TTS_MODE == "recorded_or_tts"

    clip_available = False
    if clips_enabled and category is not None and key is not None:
        if self._player is not None:
            clip_available = self._player.has(category, key)

    if clip_available:
        print(f"[BRIDGE {_ts()}] play -> {category}/{key!r} "
              f"(text={text!r})", flush=True)
        started = time.monotonic()
        played = self._player.play(category, key)
        elapsed = time.monotonic() - started
        if played:
            print(f"[BRIDGE {_ts()}] play done ({elapsed*1000:.0f} ms)", flush=True)
            return
        # Playback failed; the mode decides whether TtsMaker may take over.
        if recorded_only:
            print(f"[BRIDGE {_ts()}][WARN] play failed and tts.mode=recorded_only "
                  f"— dropping phrase silently", flush=True)
            return
        print(f"[BRIDGE {_ts()}][WARN] play failed; falling back to TtsMaker",
              flush=True)
    elif recorded_only:
        # No clip exists for this phrase and the user opted out of TtsMaker.
        print(f"[BRIDGE {_ts()}] skip (recorded_only, no clip for "
              f"{category}/{key!r}): {text!r}", flush=True)
        return

    self._speak_blocking(text)
# ── TtsMaker path (fallback + legacy) ───────────────────────────────────
def _estimate_tts_seconds(self, text: str) -> float:
    """Estimate how long TtsMaker needs to speak ``text``.

    The per-character estimate is floored at ``TTS_MIN_SECONDS`` and then
    scaled by the current adaptive busy factor.
    """
    by_length = len(text) * TTS_SECONDS_PER_CHAR
    floored = by_length if by_length > TTS_MIN_SECONDS else TTS_MIN_SECONDS
    return floored * self._tts_busy_factor
def _tts_maker_call(self, text: str, speaker_id: int) -> int:
    """Send one TTS request straight to the audio RPC and return its code.

    This replaces the SDK's ``TtsMaker`` wrapper, whose request index
    never increments (see the ROBOT_API_ID_AUDIO_TTS note). The index is
    advanced before every request, including retries.

    Args:
        text: Phrase to synthesise.
        speaker_id: Speaker id forwarded in the request.

    Returns:
        The return code from ``AudioClient._Call``.
    """
    import json

    request_index = self._tts_index + 1
    self._tts_index = request_index
    payload = {
        "index": request_index,
        "text": text,
        "speaker_id": speaker_id,
    }
    rc, _reply = self.audio_client._Call(ROBOT_API_ID_AUDIO_TTS, json.dumps(payload))
    return rc
def _speak_blocking(self, text: str):
    """Speak ``text`` through TtsMaker and block for its estimated duration.

    Steps, in order:
      1. Log the request with the duration estimate and queue depth.
      2. Reset the firmware audio state (AUDIO_STOP_PLAY), then settle.
      3. Send the TTS request, retrying up to TTS_RETRY_MAX_ATTEMPTS
         times while the return code is non-zero.
      4. Adapt the busy factor: up after failure, down after success.
      5. Sleep for whatever is left of the estimated playback time.

    An exception from the TTS request aborts immediately: no retry, no
    busy-factor change and no trailing sleep.
    """
    import json as _json

    if self.audio_client is None:
        return

    started = time.monotonic()
    if self._tts_last_call_t:
        gap_s = started - self._tts_last_call_t
    else:
        gap_s = -1.0  # sentinel: no earlier call to measure against
    estimate_s = self._estimate_tts_seconds(text)
    backlog = len(self._tts_queue)
    self._tts_call_count += 1
    gap_label = " n/a" if gap_s < 0 else f"{gap_s:5.2f}s"
    print(
        f"[BRIDGE {_ts()}] tts -> {text!r} "
        f"(est={estimate_s:.2f}s, gap={gap_label}, "
        f"busy_x={self._tts_busy_factor:.2f}, "
        f"q={backlog}, idx={self._tts_index + 1})",
        flush=True,
    )

    # Firmware reset: AUDIO_STOP_PLAY clears any residual audio stream
    # state from the prior TtsMaker/PlayStream, followed by a 300 ms
    # settle before the new request. Without this the firmware times out
    # the next call with rc=3104 after successful-then-parallel-arm events.
    # See G1_Lootah/Audio_Recorder/voice_note.txt line 11.
    stop_started = time.monotonic()
    try:
        stop_param = _json.dumps({"app_name": AUDIO_STOP_APP_NAME})
        self.audio_client._Call(ROBOT_API_ID_AUDIO_STOP_PLAY, stop_param)
    except Exception as exc:
        print(f"[BRIDGE {_ts()}][WARN] AUDIO_STOP_PLAY raised: {exc}", flush=True)
    stop_elapsed = time.monotonic() - stop_started
    if stop_elapsed > 0.5:
        print(f"[BRIDGE {_ts()}][WARN] AUDIO_STOP_PLAY slow "
              f"({stop_elapsed*1000:.0f}ms) — firmware busy", flush=True)
    time.sleep(AUDIO_STOP_SETTLE_S)

    rc = -1
    attempt_elapsed = 0.0
    retries_started = time.monotonic()
    for attempt in range(1, TTS_RETRY_MAX_ATTEMPTS + 1):
        attempt_started = time.monotonic()
        try:
            rc = self._tts_maker_call(text, self.tts_speaker_id)
        except Exception as exc:
            print(f"[BRIDGE {_ts()}][ERR] TtsMaker raised: {exc}", flush=True)
            return
        attempt_elapsed = time.monotonic() - attempt_started
        if rc == 0:
            if attempt > 1:
                print(f"[BRIDGE {_ts()}] TtsMaker recovered on attempt "
                      f"{attempt}/{TTS_RETRY_MAX_ATTEMPTS}", flush=True)
            break
        if attempt >= TTS_RETRY_MAX_ATTEMPTS:
            # Last attempt failed; the summary below reports it.
            break
        print(f"[BRIDGE {_ts()}][WARN] TtsMaker rc={rc} "
              f"(call {attempt_elapsed*1000:.0f}ms) — retrying in "
              f"{TTS_RETRY_DELAY_S:.1f}s ({attempt}/"
              f"{TTS_RETRY_MAX_ATTEMPTS})", flush=True)
        time.sleep(TTS_RETRY_DELAY_S)
    total_elapsed = time.monotonic() - retries_started

    if rc == 0:
        # Success: relax the busy factor towards its floor.
        self._tts_busy_factor = max(
            TTS_BUSY_FACTOR_MIN, self._tts_busy_factor * TTS_BUSY_FACTOR_DOWN
        )
    else:
        # Failure: count it and stretch future estimates, up to the cap.
        self._tts_busy_count += 1
        self._tts_busy_factor = min(
            TTS_BUSY_FACTOR_MAX, self._tts_busy_factor * TTS_BUSY_FACTOR_UP
        )
        print(
            f"[BRIDGE {_ts()}][WARN] TtsMaker rc={rc} after "
            f"{TTS_RETRY_MAX_ATTEMPTS} attempt(s) ({total_elapsed*1000:.0f}ms "
            f"total; busy_x -> {self._tts_busy_factor:.2f}) — phrase dropped",
            flush=True,
        )

    self._tts_last_call_t = time.monotonic()
    leftover = estimate_s - total_elapsed
    if leftover > 0:
        time.sleep(leftover)
# ── Arm ─────────────────────────────────────────────────────────────────
def reject(self, release_after: float):
    """Trigger the UNSAFE arm motion.

    Preferred path: when an ``ArmReplayer`` was created at start-up, play
    the recorded teach-and-replay trajectory ``MOTION_UNSAFE_FILE`` (with
    ``MOTION_HOME_FILE`` as the home trajectory) from ``assets/motions/``.
    If either file is missing the motion is skipped with a warning; it
    does NOT fall back to the legacy path.

    Legacy path (no replayer): run the high-level ``REJECT_ACTION`` and,
    when ``release_after`` is positive, sleep that long and then run
    ``RELEASE_ACTION``. Every action that will be needed is validated
    against the SDK ``action_map`` before the arm moves, so a missing
    release action can no longer leave the arm raised.

    Args:
        release_after: Seconds to hold the reject pose before releasing
            (legacy path only). Zero or negative means "do not release".
    """
    if self.dry_run:
        if self._arm_replayer is not None or MOTION_ENABLED:
            print(f"[BRIDGE {_ts()}] (dry) would play motion "
                  f"{MOTION_UNSAFE_FILE}", flush=True)
        else:
            print(f"[BRIDGE {_ts()}] (dry) would run '{REJECT_ACTION}' "
                  f"then release after {release_after:.1f}s", flush=True)
        return

    # Preferred path: play a recorded motion.
    if self._arm_replayer is not None:
        from core.paths import PROJECT_ROOT
        motions_dir = PROJECT_ROOT / "assets" / "motions"
        motion_path = motions_dir / MOTION_UNSAFE_FILE
        home_path = motions_dir / MOTION_HOME_FILE
        if not motion_path.exists() or not home_path.exists():
            print(f"[BRIDGE {_ts()}][WARN] motion files missing under "
                  f"{motions_dir} — skipping arm motion", flush=True)
            return
        print(f"[BRIDGE {_ts()}] -> play_motion {MOTION_UNSAFE_FILE}",
              flush=True)
        t0 = time.monotonic()
        try:
            self._arm_replayer.play(motion_path, home_path, speed=MOTION_SPEED)
        except Exception as e:
            print(f"[BRIDGE {_ts()}][ERR] play_motion failed: {e}",
                  flush=True)
            return
        dt = time.monotonic() - t0
        print(f"[BRIDGE {_ts()}] motion done ({dt*1000:.0f} ms)", flush=True)
        return

    # Legacy fallback: high-level reject + release arm action.
    if self.arm_client is None or self._action_map is None:
        return

    # Validate everything up front: discovering a missing release action
    # only after the reject pose has been struck would strand the arm.
    required = [REJECT_ACTION]
    if release_after > 0:
        required.append(RELEASE_ACTION)
    missing = [name for name in required if name not in self._action_map]
    if missing:
        for name in missing:
            print(f"[BRIDGE {_ts()}][ERR] '{name}' not in SDK action_map",
                  flush=True)
        return

    print(f"[BRIDGE {_ts()}] -> {REJECT_ACTION}", flush=True)
    self.arm_client.ExecuteAction(self._action_map[REJECT_ACTION])
    if release_after > 0:
        time.sleep(release_after)
        print(f"[BRIDGE {_ts()}] -> {RELEASE_ACTION}", flush=True)
        self.arm_client.ExecuteAction(self._action_map[RELEASE_ACTION])