GoWelcome/tests/test_statemachine.py

332 lines
12 KiB
Python

"""Behaviour tests for :class:`gowelcome.statemachine.GoWelcomeStateMachine`.
Driven entirely off-robot with a recording fake ``RobotInterface`` and a fake
perception object exposing ``latest()``. Stdlib only -- no cv2, ultralytics,
SDK or hardware. Crafted ``PerceptionResult`` snapshots use a ``ts`` close to
``time.monotonic()`` so the machine treats them as fresh.
"""
from __future__ import annotations
import time
from typing import List, Optional, Tuple
from config import default_config
from gowelcome.statemachine import GoWelcomeStateMachine
from gowelcome.types import Detection, PerceptionResult, RoadInfo, State
from gowelcome.robot.interface import RobotInterface
FRAME_W = 640
FRAME_H = 480
# --------------------------------------------------------------------------- #
# Test doubles
# --------------------------------------------------------------------------- #
class FakeRobot(RobotInterface):
"""Records every command so tests can assert on the issued behaviour."""
def __init__(self) -> None:
self.drives: List[Tuple[float, float, float]] = []
self.stops: int = 0
self.gestures: List[str] = []
self.greetings: int = 0
self.balance_stands: int = 0
self.stand_ups: int = 0
self.damps: int = 0
self.avoidance: Optional[bool] = None
self.shutdowns: int = 0
# --- perception input (unused by the SM; present to satisfy the ABC) ----
def get_frame(self):
return None
def frame_size(self) -> "tuple[int, int]":
return (FRAME_W, FRAME_H)
# --- locomotion ---------------------------------------------------------
def drive(self, vx: float, vy: float, vyaw: float) -> None:
self.drives.append((vx, vy, vyaw))
def stop(self) -> None:
self.stops += 1
def set_avoidance(self, on: bool) -> None:
self.avoidance = on
# --- posture / expression ----------------------------------------------
def balance_stand(self) -> None:
self.balance_stands += 1
def stand_up(self) -> None:
self.stand_ups += 1
def damp(self) -> None:
self.damps += 1
def gesture(self, name: str) -> None:
self.gestures.append(name)
# --- greeting payload ---------------------------------------------------
def play_greeting(self) -> None:
self.greetings += 1
# --- lifecycle ----------------------------------------------------------
def shutdown(self) -> None:
self.shutdowns += 1
class FakePerception:
"""Minimal stand-in for PerceptionThread serving a single snapshot.
By default each ``latest()`` re-stamps the snapshot's ``ts`` to "now" so it
reads as *fresh* on every tick -- mirroring a real perception thread that
keeps republishing. Pass ``keep_ts=True`` to leave ``ts`` untouched (used to
exercise the stale-perception path).
"""
def __init__(
self, result: Optional[PerceptionResult] = None, keep_ts: bool = False
) -> None:
self._result = result
self._keep_ts = keep_ts
def set(self, result: Optional[PerceptionResult]) -> None:
self._result = result
def latest(self) -> Optional[PerceptionResult]:
if self._result is not None and not self._keep_ts:
self._result.ts = time.monotonic()
return self._result
# --------------------------------------------------------------------------- #
# Snapshot builders
# --------------------------------------------------------------------------- #
def _fresh_ts() -> float:
"""A timestamp the state machine will treat as fresh (within timeout)."""
return time.monotonic()
def _person_box(h_px: int, cx: float = FRAME_W / 2.0, conf: float = 0.95) -> Detection:
"""A centred person box of the requested pixel height."""
w_px = 80
return Detection(
label="person",
conf=conf,
x1=int(cx - w_px / 2),
y1=int(FRAME_H / 2 - h_px / 2),
x2=int(cx + w_px / 2),
y2=int(FRAME_H / 2 + h_px / 2),
)
def _result(
persons=None,
dangers=None,
road: Optional[RoadInfo] = None,
) -> PerceptionResult:
persons = persons or []
dangers = dangers or []
return PerceptionResult(
frame_w=FRAME_W,
frame_h=FRAME_H,
detections=list(persons) + list(dangers),
persons=list(persons),
dangers=list(dangers),
road=road,
ts=_fresh_ts(),
seq=1,
)
# --------------------------------------------------------------------------- #
# Tests
# --------------------------------------------------------------------------- #
def test_no_person_wanders_and_drives():
"""With an empty fresh snapshot the machine wanders (drives forward)."""
cfg = default_config()
robot = FakeRobot()
perc = FakePerception(_result())
sm = GoWelcomeStateMachine(robot, perc, cfg)
state = sm.step(1.0 / cfg.loop.rate_hz)
assert state is State.WANDER
assert robot.drives, "WANDER should issue a drive command"
vx, _vy, _vyaw = robot.drives[-1]
assert vx > 0.0 # cruising forward
def test_near_person_progresses_wander_to_approach_to_greet():
"""A big, confident, centred person -> APPROACH then eventually GREET."""
cfg = default_config()
robot = FakeRobot()
# Box tall enough to be 'arrived' immediately (>= stop_height_ratio).
arrived_h = int(cfg.servo.stop_height_ratio * FRAME_H) + 60
perc = FakePerception(_result(persons=[_person_box(arrived_h)]))
sm = GoWelcomeStateMachine(robot, perc, cfg)
dt = 1.0 / cfg.loop.rate_hz
# First tick: a qualifying person -> APPROACH; arriving immediately also
# kicks the machine into GREET within the same handling.
state = sm.step(dt)
assert state in (State.APPROACH, State.GREET)
# Drive the machine forward in time until it greets, then completes.
greeted = False
deadline = time.monotonic() + cfg.greet.settle_time + \
len(cfg.greet.gestures) * cfg.greet.gesture_gap + 2.0
while time.monotonic() < deadline:
st = sm.step(dt)
if st is State.GREET:
greeted = True
if greeted and st is State.WANDER:
break
# Tiny real sleep so monotonic-gated greet sub-sequence advances.
time.sleep(0.01)
assert greeted, "machine should pass through GREET"
assert robot.greetings >= 1, "greeting audio should have been played"
assert robot.gestures, "at least one gesture should have been dispatched"
# The configured gestures are dispatched in order.
assert robot.gestures[: len(cfg.greet.gestures)] == list(cfg.greet.gestures)
def test_low_confidence_person_still_approaches_but_below_conf_is_handled():
"""Persons present -> APPROACH regardless (perception applies the conf gate).
The state machine treats any populated ``persons`` list as a target; the
confidence gate lives in the perception layer. A far (small) box should
APPROACH and drive forward, not greet.
"""
cfg = default_config()
robot = FakeRobot()
far = _person_box(h_px=60) # ratio ~0.125, well below stop ratio
perc = FakePerception(_result(persons=[far]))
sm = GoWelcomeStateMachine(robot, perc, cfg)
state = sm.step(1.0 / cfg.loop.rate_hz)
assert state is State.APPROACH
vx, _vy, _vyaw = robot.drives[-1]
assert vx > 0.0 # approaching, not arrived
assert robot.greetings == 0
def test_danger_box_forces_avoid_danger():
"""A danger detection preempts everything -> AVOID_DANGER + reverse drive."""
cfg = default_config()
robot = FakeRobot()
danger = Detection(label="car", conf=0.9, x1=200, y1=200, x2=440, y2=460)
road = RoadInfo(coverage=0.4, left=0.6, center=0.4, right=0.1)
perc = FakePerception(_result(dangers=[danger], road=road))
sm = GoWelcomeStateMachine(robot, perc, cfg)
state = sm.step(1.0 / cfg.loop.rate_hz)
assert state is State.AVOID_DANGER
# Backup phase reverses (vx < 0) while turning away.
vx, _vy, vyaw = robot.drives[-1]
assert vx < 0.0
# Road is heavy on the LEFT (right < left) -> clearer_side +1 (right clear).
# Steering AWAY from the road = turning right (CW) = negative yaw.
assert vyaw < 0.0
def test_avoid_turns_away_from_road_both_sides():
"""The escape pivot must always turn AWAY from the road-heavy side.
Regression for the clearer_side sign bug: road on the left -> turn right
(vyaw < 0); road on the right -> turn left (vyaw > 0).
"""
cfg = default_config()
danger = Detection(label="car", conf=0.9, x1=200, y1=200, x2=440, y2=460)
# Road heavy on the LEFT -> dog should turn RIGHT (negative yaw).
robot_l = FakeRobot()
road_left = RoadInfo(coverage=0.4, left=0.7, center=0.4, right=0.1)
sm_l = GoWelcomeStateMachine(
robot_l, FakePerception(_result(dangers=[danger], road=road_left)), cfg
)
sm_l.step(1.0 / cfg.loop.rate_hz)
assert sm_l.state is State.AVOID_DANGER
assert robot_l.drives[-1][2] < 0.0 # vyaw < 0 -> turn right, away from left road
# Road heavy on the RIGHT -> dog should turn LEFT (positive yaw).
robot_r = FakeRobot()
road_right = RoadInfo(coverage=0.4, left=0.1, center=0.4, right=0.7)
sm_r = GoWelcomeStateMachine(
robot_r, FakePerception(_result(dangers=[danger], road=road_right)), cfg
)
sm_r.step(1.0 / cfg.loop.rate_hz)
assert sm_r.state is State.AVOID_DANGER
assert robot_r.drives[-1][2] > 0.0 # vyaw > 0 -> turn left, away from right road
def test_danger_overrides_person():
"""When both a person and a danger are present, danger wins."""
cfg = default_config()
robot = FakeRobot()
person = _person_box(int(cfg.servo.stop_height_ratio * FRAME_H) + 60)
danger = Detection(label="truck", conf=0.9, x1=100, y1=150, x2=420, y2=460)
perc = FakePerception(_result(persons=[person], dangers=[danger]))
sm = GoWelcomeStateMachine(robot, perc, cfg)
state = sm.step(1.0 / cfg.loop.rate_hz)
assert state is State.AVOID_DANGER
assert robot.greetings == 0
def test_road_coverage_triggers_avoid_danger():
"""High centre-road coverage (no boxes) still triggers AVOID_DANGER."""
cfg = default_config()
robot = FakeRobot()
trig = cfg.perception.road_trigger_coverage
road = RoadInfo(coverage=0.9, left=0.9, center=trig + 0.1, right=0.2)
perc = FakePerception(_result(road=road))
sm = GoWelcomeStateMachine(robot, perc, cfg)
state = sm.step(1.0 / cfg.loop.rate_hz)
assert state is State.AVOID_DANGER
def test_stale_perception_stops_robot():
"""A snapshot older than the perception timeout -> stop(), no drive."""
cfg = default_config()
robot = FakeRobot()
stale = _result(persons=[_person_box(200)])
# Backdate ts well beyond the timeout (keep_ts so latest() preserves it).
stale.ts = time.monotonic() - (cfg.safety.perception_timeout + 5.0)
perc = FakePerception(stale, keep_ts=True)
sm = GoWelcomeStateMachine(robot, perc, cfg)
sm.step(1.0 / cfg.loop.rate_hz)
assert robot.stops >= 1
assert not robot.drives # never moved on stale data
def test_none_perception_stops_robot():
"""A missing snapshot (latest() -> None) -> stop(), no drive."""
cfg = default_config()
robot = FakeRobot()
perc = FakePerception(None)
sm = GoWelcomeStateMachine(robot, perc, cfg)
sm.step(1.0 / cfg.loop.rate_hz)
assert robot.stops >= 1
assert not robot.drives
def test_dry_run_never_drives():
"""In dry-run mode the machine logs but issues stop() instead of drive()."""
cfg = default_config()
cfg.dry_run = True
robot = FakeRobot()
perc = FakePerception(_result()) # would normally WANDER (drive forward)
sm = GoWelcomeStateMachine(robot, perc, cfg)
sm.step(1.0 / cfg.loop.rate_hz)
assert not robot.drives, "dry_run must suppress drive()"
assert robot.stops >= 1