Marcus/Core/motion_state.py


"""
motion_state.py: shared motion-control signals.

Two module-level threading.Events used by Voice/marcus_voice.py to
control in-progress motion. The producer is VoiceModule's motion worker;
the consumers are the brain motion loops in Brain/command_parser.py,
Brain/executor.py, Navigation/marcus_odometry.py, and Navigation/patrol.py.

  motion_abort: hard stop. Loops break out and run gradual_stop().
                Set by 'stop'/'cancel'/etc. Cleared by the worker
                at the start of each new non-stop command, so a
                stale set never kills the next motion.

  motion_pause: soft hold. Loops do NOT break; they hold zero
                velocity and wait for the event to clear, then
                resume at the loop iteration where they paused.
                While paused they also check motion_abort every
                ~50 ms, so a pause-then-stop combo aborts cleanly.

The helper `wait_while_paused` centralizes the pause-polling loop so
brain call sites don't each repeat it. The per-iteration stanza at each
site is:

    if motion_abort.is_set(): break
    wait_while_paused()
    if motion_abort.is_set(): break
    send_vel(...)
    time.sleep(0.05)

Single source of truth: placed in Core/ so both Voice/* and Brain/*
can import it without circular dependencies.
"""
from __future__ import annotations

import threading
import time

# Hard stop: consumer loops break out when this is set.
motion_abort: threading.Event = threading.Event()
# Soft hold: consumer loops wait in place while this is set.
motion_pause: threading.Event = threading.Event()


def wait_while_paused(poll_sec: float = 0.05) -> None:
    """Block while motion_pause is set. Returns when pause clears OR
    motion_abort fires (so the caller can immediately bail out via
    its existing motion_abort check)."""
    while motion_pause.is_set() and not motion_abort.is_set():
        time.sleep(poll_sec)
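
# Illustrative sketch only, not part of the module: one way a consumer loop
# and a producer might exercise the two events, following the stanza in the
# module docstring. `run_motion_loop`, `demo_pause_then_stop`, and the `sent`
# list (a stand-in for send_vel) are hypothetical names introduced here. The
# events and the helper are redefined so the sketch runs on its own; real
# call sites would presumably import them from this module instead.

```python
import threading
import time

# Stand-ins for the module-level events and helper, repeated here only so
# the sketch is self-contained.
motion_abort = threading.Event()
motion_pause = threading.Event()


def wait_while_paused(poll_sec: float = 0.05) -> None:
    while motion_pause.is_set() and not motion_abort.is_set():
        time.sleep(poll_sec)


def run_motion_loop(steps: int, sent: list, tick_sec: float = 0.01) -> str:
    """Hypothetical consumer loop; appending to `sent` stands in for send_vel()."""
    for i in range(steps):
        if motion_abort.is_set():      # hard stop requested before this tick
            return "aborted"
        wait_while_paused(poll_sec=tick_sec)
        if motion_abort.is_set():      # stop arrived while we were paused
            return "aborted"
        sent.append(i)                 # placeholder for send_vel(...)
        time.sleep(tick_sec)
    return "completed"


def demo_pause_then_stop() -> tuple:
    """Hypothetical producer side: pause a running loop, then stop it."""
    # A new non-stop command starts from clean events, so a stale set
    # cannot kill this motion.
    motion_abort.clear()
    motion_pause.clear()

    sent: list = []
    result: dict = {}
    worker = threading.Thread(
        target=lambda: result.update(status=run_motion_loop(1000, sent))
    )
    worker.start()

    time.sleep(0.05)
    motion_pause.set()                 # soft hold
    time.sleep(0.05)                   # give the loop time to reach the hold
    held_at = len(sent)
    time.sleep(0.05)
    still_held = len(sent) == held_at  # no new commands while paused

    motion_abort.set()                 # 'stop' while paused
    worker.join(timeout=2.0)
    return result.get("status"), still_held, worker.is_alive()


if __name__ == "__main__":
    print(demo_pause_then_stop())
```

# In this sketch the loop leaves the pause through the motion_abort branch of
# wait_while_paused, and the second motion_abort check turns that into an
# exit before any further velocity command is sent.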