528 lines
21 KiB
Python
528 lines
21 KiB
Python
"""The reactive behaviour brain for GoWelcome.
|
|
|
|
Owns high-level arbitration: it reads the latest perception snapshot and GPS
|
|
fix, decides which :class:`~gowelcome.types.State` should be active, and drives
|
|
the robot. Reactive and mapless -- every decision is a function of the most
|
|
recent perception + GPS plus a few monotonic timers.
|
|
|
|
Arbitration priority inside :meth:`step` (highest first):
|
|
e-stop / pause (dashboard) -- hard override, halt
|
|
stale perception -- halt until fresh frames
|
|
AVOID_DANGER (cars / road) -- immediate collision safety wins
|
|
GPS hold (no fix while geofencing) -- don't roam blind
|
|
finish an in-progress GREET -- brief, stationary
|
|
BOUNDARY (geofence) -- home back; never chase out of the area
|
|
APPROACH a person
|
|
WANDER + idle dog-play
|
|
|
|
The single public entry point is :meth:`step`, which runs one control tick and
|
|
never blocks. Dashboard control methods (``set_play_mode`` / ``set_paused`` /
|
|
``request_estop`` / ``set_geofence_center`` / ``status``) are thread-safe.
|
|
|
|
Velocity convention: vx forward+, vy left+, vyaw CCW/left+ (rad/s).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import math
|
|
import random
|
|
import threading
|
|
import time
|
|
from typing import Optional
|
|
|
|
from config import GoWelcomeConfig
|
|
from gowelcome.types import Detection, PerceptionResult, State
|
|
from gowelcome.robot.interface import RobotInterface
|
|
from gowelcome.perception.vision_thread import PerceptionThread
|
|
from gowelcome.control.servoing import VisualServo
|
|
from gowelcome.geo import GeoFence, GpsSource
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
_PLAY_MODES = ("calm", "moderate", "playful")
|
|
|
|
|
|
class GoWelcomeStateMachine:
|
|
"""Reactive five-state behaviour arbiter for the Go2 backyard greeter.
|
|
|
|
Args:
|
|
robot: Locomotion / posture / audio backend (real or mock).
|
|
perception: Source of the latest :class:`PerceptionResult` snapshot.
|
|
cfg: Fully-populated :class:`GoWelcomeConfig`.
|
|
gps_source: Optional GPS reader; when present and ``cfg.geofence.enabled``,
|
|
the geofence BOUNDARY behaviour is active.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
robot: RobotInterface,
|
|
perception: PerceptionThread,
|
|
cfg: GoWelcomeConfig,
|
|
gps_source: Optional[GpsSource] = None,
|
|
) -> None:
|
|
self._robot = robot
|
|
self._perception = perception
|
|
self._cfg = cfg
|
|
self._servo = VisualServo(cfg.servo)
|
|
|
|
self._state: State = State.WANDER
|
|
|
|
# --- geofence (optional) -----------------------------------------
|
|
self._gps = gps_source
|
|
self._fence: Optional[GeoFence] = (
|
|
GeoFence(cfg.geofence)
|
|
if (cfg.geofence.enabled and gps_source is not None)
|
|
else None
|
|
)
|
|
self._fix = None # latest valid GpsFix (or None)
|
|
self._gps_warned = False
|
|
|
|
# --- cross-state bookkeeping -------------------------------------
|
|
self._wander_t = 0.0
|
|
self._last_greet_time = float("-inf")
|
|
self._last_cmd = (0.0, 0.0, 0.0)
|
|
|
|
# --- APPROACH ----------------------------------------------------
|
|
self._lost_frames = 0
|
|
|
|
# --- GREET -------------------------------------------------------
|
|
self._greet_start = 0.0
|
|
self._greet_audio_played = False
|
|
self._greet_gestures_done = 0
|
|
|
|
# --- AVOID_DANGER ------------------------------------------------
|
|
self._avoid_start = 0.0
|
|
self._avoid_side = 1
|
|
self._avoid_clear_count = 0
|
|
|
|
# --- dog-play ----------------------------------------------------
|
|
self._play_until = 0.0 # while now < this, hold still and perform a trick
|
|
self._next_play_t = 0.0 # when to trigger the next trick
|
|
|
|
# --- runtime controls (dashboard) -------------------------------
|
|
self._ctrl_lock = threading.Lock()
|
|
self._play_mode = cfg.play.mode if cfg.play.mode in _PLAY_MODES else "moderate"
|
|
self._paused = False
|
|
self._estop = False
|
|
self._damped = False
|
|
|
|
self._schedule_next_play(time.monotonic())
|
|
logger.info(
|
|
"GoWelcomeStateMachine initialised (geofence=%s, play=%s)",
|
|
self._fence is not None, self._play_mode,
|
|
)
|
|
|
|
# ================================================================== #
|
|
# Public API
|
|
# ================================================================== #
|
|
@property
|
|
def state(self) -> State:
|
|
return self._state
|
|
|
|
def step(self, dt: float) -> State:
|
|
"""Run one control tick; returns the (possibly new) current state."""
|
|
now = time.monotonic()
|
|
|
|
# 0. Dashboard hard overrides.
|
|
if self._is_estopped():
|
|
if not self._damped:
|
|
self._robot.damp()
|
|
self._damped = True
|
|
self._command_stop()
|
|
return self._state
|
|
self._damped = False
|
|
if self._is_paused():
|
|
self._command_stop()
|
|
return self._state
|
|
|
|
perc = self._perception.latest()
|
|
|
|
# 1. Stale / missing perception -> halt, hold state.
|
|
if perc is None or (now - perc.ts) > self._cfg.safety.perception_timeout:
|
|
self._command_stop()
|
|
return self._state
|
|
|
|
# 2. Update the GPS fix + capture the geofence centre.
|
|
self._update_gps()
|
|
|
|
# 3. DANGER (cars / road) -- immediate collision safety, always wins.
|
|
danger = self._is_danger(perc)
|
|
if danger and self._state is not State.AVOID_DANGER:
|
|
self._set_state(State.AVOID_DANGER, perc=perc)
|
|
if self._state is State.AVOID_DANGER:
|
|
self._run_avoid(perc, now, dt)
|
|
return self._state
|
|
|
|
# 4. GPS gating: if geofencing but we don't know where we are, hold.
|
|
if self._fence is not None:
|
|
if not self._fence.has_center:
|
|
self._hold("waiting for GPS fix to set geofence centre")
|
|
return self._state
|
|
if self._fix is None and self._cfg.geofence.no_fix_behavior == "stop":
|
|
self._hold("GPS fix lost -> holding (geofence safety)")
|
|
return self._state
|
|
|
|
# 5. Finish an in-progress GREET (stationary, brief).
|
|
if self._state is State.GREET:
|
|
self._run_greet(now)
|
|
return self._state
|
|
|
|
# 6. BOUNDARY: near/over the geofence edge -> home back to centre.
|
|
if self._fence is not None and self._fix is not None and \
|
|
self._fence.breached(self._fix.lat, self._fix.lon):
|
|
if self._state is not State.BOUNDARY:
|
|
self._set_state(State.BOUNDARY, perc=perc)
|
|
if self._state is State.BOUNDARY:
|
|
self._run_boundary(now, dt)
|
|
return self._state
|
|
|
|
# 7. A qualifying person -> APPROACH (respecting the re-greet cooldown).
|
|
cooled = (now - self._last_greet_time) >= self._cfg.greet.cooldown
|
|
if perc.persons and cooled:
|
|
if self._state is not State.APPROACH:
|
|
self._set_state(State.APPROACH, perc=perc)
|
|
self._run_approach(perc, now, dt)
|
|
return self._state
|
|
|
|
# 8. Fallback -> WANDER (+ idle dog-play).
|
|
if self._state is not State.WANDER:
|
|
self._set_state(State.WANDER, perc=perc)
|
|
self._run_wander(now, dt, perc)
|
|
return self._state
|
|
|
|
# --- dashboard controls (thread-safe) -----------------------------
|
|
def set_play_mode(self, mode: str) -> bool:
|
|
"""Set the idle dog-play intensity (calm/moderate/playful)."""
|
|
mode = (mode or "").strip().lower()
|
|
if mode not in _PLAY_MODES:
|
|
return False
|
|
with self._ctrl_lock:
|
|
self._play_mode = mode
|
|
logger.info("play mode -> %s", mode)
|
|
return True
|
|
|
|
def play_mode(self) -> str:
|
|
with self._ctrl_lock:
|
|
return self._play_mode
|
|
|
|
def set_paused(self, paused: bool) -> None:
|
|
with self._ctrl_lock:
|
|
self._paused = bool(paused)
|
|
logger.info("paused -> %s", paused)
|
|
|
|
def request_estop(self) -> None:
|
|
with self._ctrl_lock:
|
|
self._estop = True
|
|
logger.warning("E-STOP requested")
|
|
|
|
def clear_estop(self) -> None:
|
|
with self._ctrl_lock:
|
|
self._estop = False
|
|
logger.info("E-STOP cleared")
|
|
|
|
def set_geofence_center(self, lat: Optional[float] = None,
|
|
lon: Optional[float] = None) -> bool:
|
|
"""Re-centre the geofence here (uses the current fix if lat/lon omitted)."""
|
|
if self._fence is None:
|
|
return False
|
|
if lat is None or lon is None:
|
|
if self._fix is None:
|
|
return False
|
|
lat, lon = self._fix.lat, self._fix.lon
|
|
self._fence.set_center(lat, lon)
|
|
logger.info("geofence centre set to (%.6f, %.6f)", lat, lon)
|
|
return True
|
|
|
|
def status(self) -> dict:
|
|
"""A JSON-serialisable snapshot for the dashboard (thread-safe reads)."""
|
|
fix = self._fix
|
|
snap = {
|
|
"state": self._state.value,
|
|
"play_mode": self.play_mode(),
|
|
"paused": self._is_paused(),
|
|
"estop": self._is_estopped(),
|
|
"last_cmd": {"vx": round(self._last_cmd[0], 3),
|
|
"vy": round(self._last_cmd[1], 3),
|
|
"vyaw": round(self._last_cmd[2], 3)},
|
|
"gps": None,
|
|
"geofence": None,
|
|
}
|
|
if fix is not None:
|
|
snap["gps"] = {"lat": fix.lat, "lon": fix.lon,
|
|
"sats": fix.num_sats, "hdop": fix.hdop,
|
|
"course_deg": fix.course_deg}
|
|
if self._fence is not None and self._fence.has_center:
|
|
g = {"center": self._fence.center, "radius_m": self._cfg.geofence.radius_m}
|
|
if fix is not None:
|
|
d = self._fence.distance_from_center(fix.lat, fix.lon)
|
|
g["distance_m"] = round(d, 1) if d is not None else None
|
|
g["inside"] = not self._fence.breached(fix.lat, fix.lon)
|
|
snap["geofence"] = g
|
|
return snap
|
|
|
|
# ================================================================== #
|
|
# Transition management
|
|
# ================================================================== #
|
|
def _set_state(self, new: State, perc: Optional[PerceptionResult] = None) -> None:
|
|
if new is self._state:
|
|
return
|
|
old = self._state
|
|
self._state = new
|
|
logger.info("State transition: %s -> %s", old.value, new.value)
|
|
now = time.monotonic()
|
|
|
|
if new is State.WANDER:
|
|
self._servo.reset()
|
|
self._lost_frames = 0
|
|
self._schedule_next_play(now)
|
|
elif new is State.APPROACH:
|
|
self._servo.reset()
|
|
self._lost_frames = 0
|
|
elif new is State.GREET:
|
|
self._greet_start = now
|
|
self._greet_audio_played = False
|
|
self._greet_gestures_done = 0
|
|
self._command_stop()
|
|
self._robot.balance_stand()
|
|
elif new is State.AVOID_DANGER:
|
|
self._avoid_start = now
|
|
self._avoid_clear_count = 0
|
|
if perc is not None and perc.road is not None:
|
|
self._avoid_side = -perc.road.clearer_side
|
|
else:
|
|
self._avoid_side = 1
|
|
elif new is State.BOUNDARY:
|
|
self._servo.reset()
|
|
|
|
# ================================================================== #
|
|
# Shared helpers
|
|
# ================================================================== #
|
|
def _is_danger(self, perc: PerceptionResult) -> bool:
|
|
if perc.dangers:
|
|
return True
|
|
road = perc.road
|
|
return road is not None and \
|
|
road.center >= self._cfg.perception.road_trigger_coverage
|
|
|
|
def _update_gps(self) -> None:
|
|
"""Refresh the latest valid fix and capture the geofence centre."""
|
|
if self._gps is None:
|
|
self._fix = None
|
|
return
|
|
fix = self._gps.latest()
|
|
self._fix = fix
|
|
if fix is not None and self._fence is not None:
|
|
self._fence.maybe_capture_center(fix.lat, fix.lon)
|
|
|
|
def _command_drive(self, vx: float, vy: float, vyaw: float) -> None:
|
|
"""Issue a locomotion command, honouring ``dry_run`` + feeding the GPS sim."""
|
|
self._last_cmd = (vx, vy, vyaw)
|
|
if self._cfg.dry_run:
|
|
logger.info("[dry_run] drive(vx=%.3f, vy=%.3f, vyaw=%.3f) suppressed",
|
|
vx, vy, vyaw)
|
|
self._robot.stop()
|
|
eff = (0.0, 0.0, 0.0)
|
|
else:
|
|
self._robot.drive(vx, vy, vyaw)
|
|
eff = (vx, vy, vyaw)
|
|
if self._gps is not None:
|
|
try:
|
|
self._gps.on_command(*eff)
|
|
except Exception: # pragma: no cover
|
|
pass
|
|
|
|
def _command_stop(self) -> None:
|
|
self._last_cmd = (0.0, 0.0, 0.0)
|
|
self._robot.stop()
|
|
if self._gps is not None:
|
|
try:
|
|
self._gps.on_command(0.0, 0.0, 0.0)
|
|
except Exception: # pragma: no cover
|
|
pass
|
|
|
|
def _hold(self, why: str) -> None:
|
|
if not self._gps_warned:
|
|
logger.warning("%s", why)
|
|
self._gps_warned = True
|
|
self._command_stop()
|
|
|
|
def _is_paused(self) -> bool:
|
|
with self._ctrl_lock:
|
|
return self._paused
|
|
|
|
def _is_estopped(self) -> bool:
|
|
with self._ctrl_lock:
|
|
return self._estop
|
|
|
|
# ================================================================== #
|
|
# WANDER (+ dog-play)
|
|
# ================================================================== #
|
|
def _run_wander(self, now: float, dt: float,
|
|
perc: Optional[PerceptionResult] = None) -> None:
|
|
# While performing a trick, hold still so the gesture completes.
|
|
if now < self._play_until:
|
|
self._command_stop()
|
|
return
|
|
self._wander_t += dt
|
|
wc = self._cfg.wander
|
|
vx = wc.forward_speed
|
|
phase = 2.0 * math.pi * self._wander_t / wc.yaw_sweep_period
|
|
vyaw = wc.yaw_sweep_rate * math.sin(phase)
|
|
|
|
# Vision-only area containment: veer away from the road/cars and slow
|
|
# down BEFORE reaching the hard AVOID_DANGER trigger, so the dog keeps
|
|
# its distance from the road rather than only reacting at the edge.
|
|
yaw_bias, speed_scale = self._road_car_steer(perc)
|
|
vx *= speed_scale
|
|
vyaw += yaw_bias
|
|
|
|
self._command_drive(vx, 0.0, vyaw)
|
|
# Only do idle tricks when not actively keeping away from road/cars.
|
|
if yaw_bias == 0.0 and speed_scale >= 0.99:
|
|
self._maybe_play(now)
|
|
|
|
def _road_car_steer(self, perc: Optional[PerceptionResult]):
|
|
"""Soft 'keep away from road/cars' steer for WANDER.
|
|
|
|
Returns ``(yaw_bias_rad_s, forward_speed_scale)``. The road repulsion
|
|
veers toward the clearer side and slows as pavement fills the view; the
|
|
car repulsion veers away from the nearest detected vehicle (even far
|
|
ones the hard AVOID_DANGER ignores). ``+vyaw`` = turn left/CCW.
|
|
"""
|
|
ac = self._cfg.avoid
|
|
if not ac.soft_avoid_enabled or perc is None:
|
|
return 0.0, 1.0
|
|
yaw_bias = 0.0
|
|
speed_scale = 1.0
|
|
|
|
road = perc.road
|
|
if road is not None and \
|
|
max(road.left, road.center, road.right) >= ac.soft_road_coverage:
|
|
# road heavier on the left -> (right-left) < 0 -> negative yaw = turn right.
|
|
yaw_bias += ac.road_repulsion_gain * (road.right - road.left)
|
|
speed_scale *= max(0.15, 1.0 - ac.road_slowdown_gain * road.center)
|
|
|
|
cars = [d for d in perc.detections
|
|
if d.label in self._cfg.perception.danger_classes]
|
|
if cars:
|
|
car = max(cars, key=lambda d: d.area) # most prominent / nearest
|
|
# car on the right (offset>0) -> positive yaw = turn left, away from it.
|
|
yaw_bias += ac.car_repulsion_gain * car.horizontal_offset(perc.frame_w)
|
|
speed_scale *= max(0.3, 1.0 - car.height_ratio(perc.frame_h))
|
|
|
|
return yaw_bias, speed_scale
|
|
|
|
def _maybe_play(self, now: float) -> None:
|
|
if self._cfg.dry_run or now < self._next_play_t:
|
|
return
|
|
actions = self._cfg.play.actions
|
|
if not actions:
|
|
return
|
|
action = actions[random.randrange(len(actions))]
|
|
logger.info("PLAY: %s", action)
|
|
self._robot.gesture(action)
|
|
self._play_until = now + self._cfg.play.action_hold
|
|
self._schedule_next_play(now + self._cfg.play.action_hold)
|
|
|
|
def _schedule_next_play(self, after: float) -> None:
|
|
pc = self._cfg.play
|
|
base = pc.interval_for(self.play_mode())
|
|
j = base * pc.jitter
|
|
self._next_play_t = after + base + random.uniform(-j, j)
|
|
|
|
# ================================================================== #
|
|
# APPROACH
|
|
# ================================================================== #
|
|
def _run_approach(self, perc: PerceptionResult, now: float, dt: float) -> None:
|
|
target: Optional[Detection] = perc.biggest_person()
|
|
if target is None:
|
|
self._lost_frames += 1
|
|
if self._lost_frames > self._cfg.loop.person_lost_frames:
|
|
logger.info("APPROACH lost person -> WANDER")
|
|
self._set_state(State.WANDER, perc=perc)
|
|
self._run_wander(now, dt, perc)
|
|
return
|
|
self._command_drive(0.0, 0.0, 0.0)
|
|
return
|
|
self._lost_frames = 0
|
|
vx, vyaw, arrived = self._servo.compute(target, perc.frame_w, perc.frame_h, dt)
|
|
self._command_drive(vx, 0.0, vyaw)
|
|
if arrived:
|
|
logger.info("APPROACH arrived -> GREET")
|
|
self._set_state(State.GREET, perc=perc)
|
|
self._run_greet(time.monotonic())
|
|
|
|
# ================================================================== #
|
|
# GREET
|
|
# ================================================================== #
|
|
def _greet_total_duration(self) -> float:
|
|
gc = self._cfg.greet
|
|
return gc.settle_time + len(gc.gestures) * gc.gesture_gap
|
|
|
|
def _run_greet(self, now: float) -> None:
|
|
gc = self._cfg.greet
|
|
elapsed = now - self._greet_start
|
|
if not self._greet_audio_played:
|
|
self._robot.play_greeting()
|
|
self._greet_audio_played = True
|
|
while self._greet_gestures_done < len(gc.gestures):
|
|
due_at = gc.settle_time + self._greet_gestures_done * gc.gesture_gap
|
|
if elapsed < due_at:
|
|
break
|
|
name = gc.gestures[self._greet_gestures_done]
|
|
logger.info("GREET gesture %r", name)
|
|
self._robot.gesture(name)
|
|
self._greet_gestures_done += 1
|
|
if self._greet_gestures_done >= len(gc.gestures) and \
|
|
elapsed >= self._greet_total_duration():
|
|
self._last_greet_time = now
|
|
logger.info("GREET complete -> WANDER (cooldown %.1fs)", gc.cooldown)
|
|
self._set_state(State.WANDER, perc=None)
|
|
|
|
# ================================================================== #
|
|
# AVOID_DANGER
|
|
# ================================================================== #
|
|
def _run_avoid(self, perc: PerceptionResult, now: float, dt: float) -> None:
|
|
ac = self._cfg.avoid
|
|
elapsed = now - self._avoid_start
|
|
if elapsed < ac.backup_duration:
|
|
self._command_drive(-ac.backup_speed, 0.0, self._avoid_side * ac.turn_rate)
|
|
else:
|
|
self._command_drive(0.0, 0.0, self._avoid_side * ac.turn_rate)
|
|
if self._is_danger(perc):
|
|
self._avoid_clear_count = 0
|
|
else:
|
|
self._avoid_clear_count += 1
|
|
if self._avoid_clear_count >= ac.clear_frames:
|
|
logger.info("AVOID_DANGER clear -> WANDER")
|
|
self._set_state(State.WANDER, perc=perc)
|
|
self._run_wander(now, dt, perc)
|
|
|
|
# ================================================================== #
|
|
# BOUNDARY (geofence homing)
|
|
# ================================================================== #
|
|
def _run_boundary(self, now: float, dt: float) -> None:
|
|
fix = self._fix
|
|
if fix is None or self._fence is None:
|
|
# Lost GPS mid-return: hold (don't roam blind near the edge).
|
|
self._command_stop()
|
|
return
|
|
if self._fence.cleared(fix.lat, fix.lon):
|
|
logger.info("BOUNDARY cleared -> WANDER")
|
|
self._set_state(State.WANDER, perc=None)
|
|
self._run_wander(now, dt)
|
|
return
|
|
gc = self._cfg.geofence
|
|
# GPS course-over-ground is pure noise at low ground speed, so a stale/
|
|
# garbage course could momentarily steer the wrong way. Below the speed
|
|
# threshold, hand homing_yaw course=None -> its gentle re-acquire turn,
|
|
# which drives forward to regain a valid track instead of trusting junk.
|
|
course = (
|
|
fix.course_deg
|
|
if (fix.speed_mps is not None and fix.speed_mps >= gc.min_speed_for_course)
|
|
else None
|
|
)
|
|
vyaw = self._fence.homing_yaw(fix.lat, fix.lon, course)
|
|
self._command_drive(gc.return_speed, 0.0, vyaw)
|