""" executor.py — Execute LLaVA movement decisions With LiDAR obstacle interrupt — stops immediately if obstacle detected. """ import time import threading from API.zmq_api import send_vel, gradual_stop, MOVE_MAP, STEP_PAUSE from API.arm_api import ALL_ARM_NAMES, do_arm from Core.motion_state import motion_abort, wait_while_paused def _obstacle_check(): """Check LiDAR safety — returns True if obstacle detected. Safe if LiDAR unavailable.""" try: from API.lidar_api import obstacle_ahead return obstacle_ahead() except ImportError: return False def execute_action(move: str, duration: float): """Execute a single movement step. Stops if LiDAR detects obstacle.""" if move in ALL_ARM_NAMES: do_arm(move) return if move == "stop" or move is None: gradual_stop() return if move in MOVE_MAP: vx, vy, vyaw = MOVE_MAP[move] t0 = time.time() while time.time() - t0 < duration: if motion_abort.is_set(): gradual_stop() print(" [executor] motion_abort set — stopping") return # Honour a soft pause: hold zero velocity and wait until # the user resumes (or aborts). wait_while_paused returns # immediately if not paused. Re-check abort right after. if motion_pause_check(): send_vel(0.0, 0.0, 0.0) wait_while_paused() if motion_abort.is_set(): gradual_stop(); return if _obstacle_check(): gradual_stop() print(" [Safety] LiDAR obstacle — stopping") return send_vel(vx, vy, vyaw) time.sleep(0.05) gradual_stop() time.sleep(STEP_PAUSE) else: print(f" Unknown move: '{move}' — skipping") def move_step(move: str, duration: float): """Lightweight step for goal/patrol loops. Stops if LiDAR detects obstacle.""" if move in MOVE_MAP: vx, vy, vyaw = MOVE_MAP[move] t0 = time.time() while time.time() - t0 < duration: if motion_abort.is_set(): send_vel(0.0, 0.0, 0.0) print(" [executor] motion_abort set — pausing step") return if motion_pause_check(): send_vel(0.0, 0.0, 0.0) wait_while_paused() if motion_abort.is_set(): send_vel(0.0, 0.0, 0.0); return if _obstacle_check(): send_vel(0.0, 0.0, 0.0) print(" [Safety] LiDAR obstacle — pausing step") return send_vel(vx, vy, vyaw) time.sleep(0.05) send_vel(0.0, 0.0, 0.0) time.sleep(0.1) def motion_pause_check() -> bool: """Returns True if a soft-pause is active. Tiny indirection so we don't have to import motion_pause directly into every motion site.""" from Core.motion_state import motion_pause return motion_pause.is_set() def merge_actions(actions: list) -> list: """Merge consecutive same-direction steps into one smooth movement.""" if not actions: return actions merged = [dict(actions[0])] for action in actions[1:]: if action.get("move") == merged[-1].get("move"): merged[-1]["duration"] = merged[-1].get("duration", 0) + action.get("duration", 0) else: merged.append(dict(action)) return merged def execute(d: dict): """Run full LLaVA decision — movements in sequence, then arm in background.""" if d.get("abort"): print(f" ABORT: {d['abort']}") gradual_stop() return speak = d.get("speak", "") actions = merge_actions(d.get("actions", [])) arm_cmd = d.get("arm", None) print(f"Sanad: {speak}") if not actions: gradual_stop() else: for i, action in enumerate(actions): move = action.get("move") dur = float(action.get("duration", 2.0)) print(f" Step {i+1}/{len(actions)}: {move} for {dur}s") execute_action(move, dur) if arm_cmd: print(f" Arm: {arm_cmd}") threading.Thread(target=do_arm, args=(arm_cmd,), daemon=True).start() return speak