332 lines
12 KiB
Python
332 lines
12 KiB
Python
"""Behaviour tests for :class:`gowelcome.statemachine.GoWelcomeStateMachine`.
|
|
|
|
Driven entirely off-robot with a recording fake ``RobotInterface`` and a fake
|
|
perception object exposing ``latest()``. Stdlib only -- no cv2, ultralytics,
|
|
SDK or hardware. Crafted ``PerceptionResult`` snapshots use a ``ts`` close to
|
|
``time.monotonic()`` so the machine treats them as fresh.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import time
|
|
from typing import List, Optional, Tuple
|
|
|
|
from config import default_config
|
|
from gowelcome.statemachine import GoWelcomeStateMachine
|
|
from gowelcome.types import Detection, PerceptionResult, RoadInfo, State
|
|
from gowelcome.robot.interface import RobotInterface
|
|
|
|
|
|
FRAME_W = 640
|
|
FRAME_H = 480
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Test doubles
|
|
# --------------------------------------------------------------------------- #
|
|
class FakeRobot(RobotInterface):
|
|
"""Records every command so tests can assert on the issued behaviour."""
|
|
|
|
def __init__(self) -> None:
|
|
self.drives: List[Tuple[float, float, float]] = []
|
|
self.stops: int = 0
|
|
self.gestures: List[str] = []
|
|
self.greetings: int = 0
|
|
self.balance_stands: int = 0
|
|
self.stand_ups: int = 0
|
|
self.damps: int = 0
|
|
self.avoidance: Optional[bool] = None
|
|
self.shutdowns: int = 0
|
|
|
|
# --- perception input (unused by the SM; present to satisfy the ABC) ----
|
|
def get_frame(self):
|
|
return None
|
|
|
|
def frame_size(self) -> "tuple[int, int]":
|
|
return (FRAME_W, FRAME_H)
|
|
|
|
# --- locomotion ---------------------------------------------------------
|
|
def drive(self, vx: float, vy: float, vyaw: float) -> None:
|
|
self.drives.append((vx, vy, vyaw))
|
|
|
|
def stop(self) -> None:
|
|
self.stops += 1
|
|
|
|
def set_avoidance(self, on: bool) -> None:
|
|
self.avoidance = on
|
|
|
|
# --- posture / expression ----------------------------------------------
|
|
def balance_stand(self) -> None:
|
|
self.balance_stands += 1
|
|
|
|
def stand_up(self) -> None:
|
|
self.stand_ups += 1
|
|
|
|
def damp(self) -> None:
|
|
self.damps += 1
|
|
|
|
def gesture(self, name: str) -> None:
|
|
self.gestures.append(name)
|
|
|
|
# --- greeting payload ---------------------------------------------------
|
|
def play_greeting(self) -> None:
|
|
self.greetings += 1
|
|
|
|
# --- lifecycle ----------------------------------------------------------
|
|
def shutdown(self) -> None:
|
|
self.shutdowns += 1
|
|
|
|
|
|
class FakePerception:
|
|
"""Minimal stand-in for PerceptionThread serving a single snapshot.
|
|
|
|
By default each ``latest()`` re-stamps the snapshot's ``ts`` to "now" so it
|
|
reads as *fresh* on every tick -- mirroring a real perception thread that
|
|
keeps republishing. Pass ``keep_ts=True`` to leave ``ts`` untouched (used to
|
|
exercise the stale-perception path).
|
|
"""
|
|
|
|
def __init__(
|
|
self, result: Optional[PerceptionResult] = None, keep_ts: bool = False
|
|
) -> None:
|
|
self._result = result
|
|
self._keep_ts = keep_ts
|
|
|
|
def set(self, result: Optional[PerceptionResult]) -> None:
|
|
self._result = result
|
|
|
|
def latest(self) -> Optional[PerceptionResult]:
|
|
if self._result is not None and not self._keep_ts:
|
|
self._result.ts = time.monotonic()
|
|
return self._result
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Snapshot builders
|
|
# --------------------------------------------------------------------------- #
|
|
def _fresh_ts() -> float:
|
|
"""A timestamp the state machine will treat as fresh (within timeout)."""
|
|
return time.monotonic()
|
|
|
|
|
|
def _person_box(h_px: int, cx: float = FRAME_W / 2.0, conf: float = 0.95) -> Detection:
|
|
"""A centred person box of the requested pixel height."""
|
|
w_px = 80
|
|
return Detection(
|
|
label="person",
|
|
conf=conf,
|
|
x1=int(cx - w_px / 2),
|
|
y1=int(FRAME_H / 2 - h_px / 2),
|
|
x2=int(cx + w_px / 2),
|
|
y2=int(FRAME_H / 2 + h_px / 2),
|
|
)
|
|
|
|
|
|
def _result(
|
|
persons=None,
|
|
dangers=None,
|
|
road: Optional[RoadInfo] = None,
|
|
) -> PerceptionResult:
|
|
persons = persons or []
|
|
dangers = dangers or []
|
|
return PerceptionResult(
|
|
frame_w=FRAME_W,
|
|
frame_h=FRAME_H,
|
|
detections=list(persons) + list(dangers),
|
|
persons=list(persons),
|
|
dangers=list(dangers),
|
|
road=road,
|
|
ts=_fresh_ts(),
|
|
seq=1,
|
|
)
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Tests
|
|
# --------------------------------------------------------------------------- #
|
|
def test_no_person_wanders_and_drives():
|
|
"""With an empty fresh snapshot the machine wanders (drives forward)."""
|
|
cfg = default_config()
|
|
robot = FakeRobot()
|
|
perc = FakePerception(_result())
|
|
sm = GoWelcomeStateMachine(robot, perc, cfg)
|
|
|
|
state = sm.step(1.0 / cfg.loop.rate_hz)
|
|
assert state is State.WANDER
|
|
assert robot.drives, "WANDER should issue a drive command"
|
|
vx, _vy, _vyaw = robot.drives[-1]
|
|
assert vx > 0.0 # cruising forward
|
|
|
|
|
|
def test_near_person_progresses_wander_to_approach_to_greet():
|
|
"""A big, confident, centred person -> APPROACH then eventually GREET."""
|
|
cfg = default_config()
|
|
robot = FakeRobot()
|
|
|
|
# Box tall enough to be 'arrived' immediately (>= stop_height_ratio).
|
|
arrived_h = int(cfg.servo.stop_height_ratio * FRAME_H) + 60
|
|
perc = FakePerception(_result(persons=[_person_box(arrived_h)]))
|
|
sm = GoWelcomeStateMachine(robot, perc, cfg)
|
|
|
|
dt = 1.0 / cfg.loop.rate_hz
|
|
|
|
# First tick: a qualifying person -> APPROACH; arriving immediately also
|
|
# kicks the machine into GREET within the same handling.
|
|
state = sm.step(dt)
|
|
assert state in (State.APPROACH, State.GREET)
|
|
|
|
# Drive the machine forward in time until it greets, then completes.
|
|
greeted = False
|
|
deadline = time.monotonic() + cfg.greet.settle_time + \
|
|
len(cfg.greet.gestures) * cfg.greet.gesture_gap + 2.0
|
|
while time.monotonic() < deadline:
|
|
st = sm.step(dt)
|
|
if st is State.GREET:
|
|
greeted = True
|
|
if greeted and st is State.WANDER:
|
|
break
|
|
# Tiny real sleep so monotonic-gated greet sub-sequence advances.
|
|
time.sleep(0.01)
|
|
|
|
assert greeted, "machine should pass through GREET"
|
|
assert robot.greetings >= 1, "greeting audio should have been played"
|
|
assert robot.gestures, "at least one gesture should have been dispatched"
|
|
# The configured gestures are dispatched in order.
|
|
assert robot.gestures[: len(cfg.greet.gestures)] == list(cfg.greet.gestures)
|
|
|
|
|
|
def test_low_confidence_person_still_approaches_but_below_conf_is_handled():
|
|
"""Persons present -> APPROACH regardless (perception applies the conf gate).
|
|
|
|
The state machine treats any populated ``persons`` list as a target; the
|
|
confidence gate lives in the perception layer. A far (small) box should
|
|
APPROACH and drive forward, not greet.
|
|
"""
|
|
cfg = default_config()
|
|
robot = FakeRobot()
|
|
far = _person_box(h_px=60) # ratio ~0.125, well below stop ratio
|
|
perc = FakePerception(_result(persons=[far]))
|
|
sm = GoWelcomeStateMachine(robot, perc, cfg)
|
|
|
|
state = sm.step(1.0 / cfg.loop.rate_hz)
|
|
assert state is State.APPROACH
|
|
vx, _vy, _vyaw = robot.drives[-1]
|
|
assert vx > 0.0 # approaching, not arrived
|
|
assert robot.greetings == 0
|
|
|
|
|
|
def test_danger_box_forces_avoid_danger():
|
|
"""A danger detection preempts everything -> AVOID_DANGER + reverse drive."""
|
|
cfg = default_config()
|
|
robot = FakeRobot()
|
|
danger = Detection(label="car", conf=0.9, x1=200, y1=200, x2=440, y2=460)
|
|
road = RoadInfo(coverage=0.4, left=0.6, center=0.4, right=0.1)
|
|
perc = FakePerception(_result(dangers=[danger], road=road))
|
|
sm = GoWelcomeStateMachine(robot, perc, cfg)
|
|
|
|
state = sm.step(1.0 / cfg.loop.rate_hz)
|
|
assert state is State.AVOID_DANGER
|
|
# Backup phase reverses (vx < 0) while turning away.
|
|
vx, _vy, vyaw = robot.drives[-1]
|
|
assert vx < 0.0
|
|
# Road is heavy on the LEFT (right < left) -> clearer_side +1 (right clear).
|
|
# Steering AWAY from the road = turning right (CW) = negative yaw.
|
|
assert vyaw < 0.0
|
|
|
|
|
|
def test_avoid_turns_away_from_road_both_sides():
|
|
"""The escape pivot must always turn AWAY from the road-heavy side.
|
|
|
|
Regression for the clearer_side sign bug: road on the left -> turn right
|
|
(vyaw < 0); road on the right -> turn left (vyaw > 0).
|
|
"""
|
|
cfg = default_config()
|
|
danger = Detection(label="car", conf=0.9, x1=200, y1=200, x2=440, y2=460)
|
|
|
|
# Road heavy on the LEFT -> dog should turn RIGHT (negative yaw).
|
|
robot_l = FakeRobot()
|
|
road_left = RoadInfo(coverage=0.4, left=0.7, center=0.4, right=0.1)
|
|
sm_l = GoWelcomeStateMachine(
|
|
robot_l, FakePerception(_result(dangers=[danger], road=road_left)), cfg
|
|
)
|
|
sm_l.step(1.0 / cfg.loop.rate_hz)
|
|
assert sm_l.state is State.AVOID_DANGER
|
|
assert robot_l.drives[-1][2] < 0.0 # vyaw < 0 -> turn right, away from left road
|
|
|
|
# Road heavy on the RIGHT -> dog should turn LEFT (positive yaw).
|
|
robot_r = FakeRobot()
|
|
road_right = RoadInfo(coverage=0.4, left=0.1, center=0.4, right=0.7)
|
|
sm_r = GoWelcomeStateMachine(
|
|
robot_r, FakePerception(_result(dangers=[danger], road=road_right)), cfg
|
|
)
|
|
sm_r.step(1.0 / cfg.loop.rate_hz)
|
|
assert sm_r.state is State.AVOID_DANGER
|
|
assert robot_r.drives[-1][2] > 0.0 # vyaw > 0 -> turn left, away from right road
|
|
|
|
|
|
def test_danger_overrides_person():
|
|
"""When both a person and a danger are present, danger wins."""
|
|
cfg = default_config()
|
|
robot = FakeRobot()
|
|
person = _person_box(int(cfg.servo.stop_height_ratio * FRAME_H) + 60)
|
|
danger = Detection(label="truck", conf=0.9, x1=100, y1=150, x2=420, y2=460)
|
|
perc = FakePerception(_result(persons=[person], dangers=[danger]))
|
|
sm = GoWelcomeStateMachine(robot, perc, cfg)
|
|
|
|
state = sm.step(1.0 / cfg.loop.rate_hz)
|
|
assert state is State.AVOID_DANGER
|
|
assert robot.greetings == 0
|
|
|
|
|
|
def test_road_coverage_triggers_avoid_danger():
|
|
"""High centre-road coverage (no boxes) still triggers AVOID_DANGER."""
|
|
cfg = default_config()
|
|
robot = FakeRobot()
|
|
trig = cfg.perception.road_trigger_coverage
|
|
road = RoadInfo(coverage=0.9, left=0.9, center=trig + 0.1, right=0.2)
|
|
perc = FakePerception(_result(road=road))
|
|
sm = GoWelcomeStateMachine(robot, perc, cfg)
|
|
|
|
state = sm.step(1.0 / cfg.loop.rate_hz)
|
|
assert state is State.AVOID_DANGER
|
|
|
|
|
|
def test_stale_perception_stops_robot():
|
|
"""A snapshot older than the perception timeout -> stop(), no drive."""
|
|
cfg = default_config()
|
|
robot = FakeRobot()
|
|
stale = _result(persons=[_person_box(200)])
|
|
# Backdate ts well beyond the timeout (keep_ts so latest() preserves it).
|
|
stale.ts = time.monotonic() - (cfg.safety.perception_timeout + 5.0)
|
|
perc = FakePerception(stale, keep_ts=True)
|
|
sm = GoWelcomeStateMachine(robot, perc, cfg)
|
|
|
|
sm.step(1.0 / cfg.loop.rate_hz)
|
|
assert robot.stops >= 1
|
|
assert not robot.drives # never moved on stale data
|
|
|
|
|
|
def test_none_perception_stops_robot():
|
|
"""A missing snapshot (latest() -> None) -> stop(), no drive."""
|
|
cfg = default_config()
|
|
robot = FakeRobot()
|
|
perc = FakePerception(None)
|
|
sm = GoWelcomeStateMachine(robot, perc, cfg)
|
|
|
|
sm.step(1.0 / cfg.loop.rate_hz)
|
|
assert robot.stops >= 1
|
|
assert not robot.drives
|
|
|
|
|
|
def test_dry_run_never_drives():
|
|
"""In dry-run mode the machine logs but issues stop() instead of drive()."""
|
|
cfg = default_config()
|
|
cfg.dry_run = True
|
|
robot = FakeRobot()
|
|
perc = FakePerception(_result()) # would normally WANDER (drive forward)
|
|
sm = GoWelcomeStateMachine(robot, perc, cfg)
|
|
|
|
sm.step(1.0 / cfg.loop.rate_hz)
|
|
assert not robot.drives, "dry_run must suppress drive()"
|
|
assert robot.stops >= 1
|