#!/usr/bin/env python3
|
|
"""GoWelcome entrypoint.
|
|
|
|
Wires the perception thread, behaviour state machine, and robot interface
|
|
together and runs the fixed-rate control loop. Parses CLI flags into a
|
|
:class:`config.GoWelcomeConfig`, then:
|
|
|
|
    build_robot -> PerceptionThread.start -> balance_stand ->
    GoWelcomeStateMachine.step() at ``cfg.loop.rate_hz`` until Ctrl-C.
|
|
|
|
Heavy / optional dependencies (``cv2`` for the debug window) are imported
|
|
lazily so the module stays importable on a bare machine. Run ``--mock`` for a
|
|
webcam/video dry-run with no robot present.
|
|
|
|
Examples::
|
|
|
|
python main.py --mock --audio null --source 0 # off-robot
|
|
    python main.py --transport dds --interface eth0   # real Go2 (wired/EDU)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import logging
|
|
import os
|
|
import signal
|
|
import sys
|
|
import time
|
|
from types import FrameType
|
|
from typing import Optional
|
|
|
|
# Bootstrap the import path. GoWelcome may be launched from any working
# directory (directly via `python .../main.py`, or through welcome.sh), so the
# directory holding this file is put at the front of sys.path. That keeps
# `import config` and `import gowelcome` resolvable no matter what the CWD is.
_PROJECT_ROOT = os.path.split(os.path.abspath(__file__))[0]
if _PROJECT_ROOT not in sys.path:
    sys.path.insert(0, _PROJECT_ROOT)
|
|
|
|
from config import GoWelcomeConfig, default_config
|
|
|
|
# Module-level logger for the entrypoint; main() configures the root logger
# (level and format) via logging.basicConfig before anything is emitted.
log = logging.getLogger("gowelcome.main")
|
|
|
|
|
|
def build_arg_parser() -> argparse.ArgumentParser:
    """Create the ``gowelcome`` command-line parser.

    Value-carrying options all default to ``None`` and switches to ``False``,
    so a consumer can distinguish "flag not supplied" from an explicit value.

    Returns:
        A fully populated :class:`argparse.ArgumentParser`.
    """
    parser = argparse.ArgumentParser(
        prog="gowelcome",
        description="Autonomous Unitree Go2 backyard greeter (mapless, reactive).",
    )

    def switch(flag, help_text):
        # Boolean on/off flag: absent -> False, present -> True.
        parser.add_argument(flag, action="store_true", help=help_text)

    def option(flag, help_text, *, kind=str, choices=None):
        # Value flag: absent -> None, so the config default is left untouched.
        parser.add_argument(
            flag, type=kind, choices=choices, default=None, help=help_text,
        )

    # --- backend / transport selection -----------------------------------
    switch("--mock", "Use the MockRobot backend (webcam/video, no hardware).")
    option(
        "--transport",
        "Robot transport: webrtc (default; app protocol, wifi, dog audio) "
        "or dds (official unitree_sdk2py, wired/EDU).",
        choices=("webrtc", "dds"),
    )
    option(
        "--interface",
        "[dds] Host network interface to the Go2 (e.g. eth0, wlan0).",
    )
    option(
        "--robot-ip",
        "[webrtc] Robot IP for localsta (e.g. 192.168.1.50).",
    )
    option(
        "--serial",
        "[webrtc] Robot serial number (for discovery / remote).",
    )
    option(
        "--aes-key",
        "[webrtc] Per-device AES-128 key (32 hex), required on Go2 fw >= 1.1.15.",
    )
    option(
        "--connection",
        "[webrtc] Connection method (default localsta).",
        choices=("localsta", "localap", "remote"),
    )
    option(
        "--audio-method",
        "[webrtc] Greeting playback: audiohub (upload+play, default) or stream (live).",
        choices=("audiohub", "stream"),
    )

    # --- perception --------------------------------------------------------
    option("--device", "Inference device for YOLO (cpu, cuda, cuda:0).")
    option(
        "--model",
        "Path to / name of the YOLO weights (ultralytics auto-downloads).",
    )
    option("--source", "Mock frame source: webcam index or video file path.")

    # --- audio -------------------------------------------------------------
    option("--wav", "Path to the greeting .wav clip.")
    option(
        "--audio",
        "Audio backend: host (default), go2 (experimental), null (silent).",
        choices=("host", "go2", "null"),
    )
    option(
        "--audio-device",
        "Pin a PulseAudio sink for host audio (e.g. a USB/BT speaker on "
        "the dog); plays via 'paplay --device=<sink>'. Empty -> default sink.",
    )

    # --- safety / runtime behaviour ----------------------------------------
    switch(
        "--no-avoidance",
        "Disable the on-robot LiDAR firmware obstacle-avoidance layer.",
    )
    switch("--headless", "Run without the cv2 debug window.")
    switch(
        "--dry-run",
        "Perceive and decide, but never send a non-zero velocity.",
    )
    option(
        "--conf",
        "Person detection confidence threshold for APPROACH.",
        kind=float,
    )

    # --- web dashboard -----------------------------------------------------
    switch(
        "--web",
        "Serve the live annotated camera as MJPEG over HTTP "
        "(open http://<host-ip>:<port>/ in a browser).",
    )
    option(
        "--web-port",
        "Port for the --web dashboard (default 8080).",
        kind=int,
    )

    # --- optional GPS geofence ---------------------------------------------
    switch(
        "--geofence",
        "Enable the OPTIONAL GPS keep-in-area geofence (needs an external "
        "GPS receiver). Default is vision-only containment (road/car avoidance).",
    )
    option(
        "--gps",
        "GPS source for --geofence (default auto; mock simulates one). "
        "Implies --geofence.",
        choices=("auto", "gpsd", "serial", "mock"),
    )
    option(
        "--radius",
        "Geofence keep-in radius in metres (default 15). Implies --geofence.",
        kind=float,
    )

    # --- idle play ---------------------------------------------------------
    option(
        "--play",
        "Idle dog-play intensity (default moderate; changeable via dashboard).",
        choices=("calm", "moderate", "playful"),
    )

    return parser
|
|
|
|
|
|
def config_from_args(args: argparse.Namespace) -> GoWelcomeConfig:
    """Overlay the parsed command line onto the default configuration.

    Starts from ``default_config()`` and touches only the settings whose flag
    was actually given: value options are applied when they are not ``None``,
    switches when they are truthy. Everything else keeps the value defined by
    the config defaults.

    Args:
        args: Namespace produced by the parser from :func:`build_arg_parser`.

    Returns:
        The resulting configuration object.
    """
    cfg = default_config()

    def assign(path, value):
        # Walk the dotted path lazily so a config section is only looked up
        # when its flag was really supplied.
        *sections, field = path.split(".")
        target = cfg
        for section in sections:
            target = getattr(target, section)
        setattr(target, field, value)

    # (argparse dest, dotted attribute on cfg, value written when the switch is set)
    switches = (
        ("mock", "mock", True),
        ("no_avoidance", "safety.use_lidar_avoidance", False),
        ("headless", "headless", True),
        ("dry_run", "dry_run", True),
        ("web", "web.enabled", True),
    )
    for dest, path, value in switches:
        if getattr(args, dest):
            assign(path, value)

    # GPS geofence is opt-in (default: vision-only). --geofence itself, or any
    # flag that only makes sense with it (--gps / --radius), turns it on.
    geofence_requested = (
        args.geofence or args.gps is not None or args.radius is not None
    )
    if geofence_requested:
        cfg.gps.enabled = True
        cfg.geofence.enabled = True

    # (argparse dest, dotted attribute on cfg); None means "flag not given".
    value_overrides = (
        ("transport", "transport"),
        ("interface", "network.interface"),
        ("robot_ip", "webrtc.ip"),
        ("serial", "webrtc.serial_number"),
        ("aes_key", "webrtc.aes_128_key"),
        ("connection", "webrtc.connection_method"),
        ("audio_method", "webrtc.audio_method"),
        ("device", "perception.device"),
        ("model", "perception.model_path"),
        ("source", "camera.mock_source"),
        ("wav", "greet.wav_path"),
        ("audio", "audio.backend"),
        ("audio_device", "audio.output_device"),
        ("conf", "perception.person_conf"),
        ("web_port", "web.port"),
        ("gps", "gps.source"),
        ("radius", "geofence.radius_m"),
        ("play", "play.mode"),
    )
    for dest, path in value_overrides:
        supplied = getattr(args, dest)
        if supplied is not None:
            assign(path, supplied)

    return cfg
|
|
|
|
|
|
def _draw_overlay(frame, result, state) -> None:
    """Draw detections, road coverage and the current state onto ``frame``.

    The frame is modified in place. Boxes from ``result.persons`` are drawn in
    green with a ``person <conf>`` caption, boxes from ``result.dangers`` in
    red with ``<label> <conf>``. Two HUD lines (road coverage, state name) go
    in the top-left corner. ``cv2`` is imported lazily.

    Drawing is best effort: any exception raised while drawing is logged at
    debug level and swallowed. Note the ``cv2`` import itself happens before
    that guard, so a missing ``cv2`` still raises ``ImportError`` to the caller.

    Args:
        frame: Image buffer to annotate.
        result: Perception result; ``persons``, ``dangers`` and ``road`` are
            read via ``getattr`` and may be absent.
        state: Current state; its ``name`` attribute is shown if present,
            otherwise ``str(state)``.
    """
    import cv2  # lazy: optional

    person_colour = (0, 255, 0)
    danger_colour = (0, 0, 255)

    def boxed(det, colour, caption_of):
        # Rectangle first, caption second -- the caption text is only built
        # once the box has been drawn.
        left, top = int(det.x1), int(det.y1)
        cv2.rectangle(frame, (left, top), (int(det.x2), int(det.y2)), colour, 2)
        cv2.putText(
            frame, caption_of(det), (left, max(0, top - 5)),
            cv2.FONT_HERSHEY_SIMPLEX, 0.5, colour, 1, cv2.LINE_AA,
        )

    def hud_line(text, baseline_y, colour):
        cv2.putText(
            frame, text, (10, baseline_y),
            cv2.FONT_HERSHEY_SIMPLEX, 0.6, colour, 2, cv2.LINE_AA,
        )

    try:
        for person in getattr(result, "persons", []) or []:
            boxed(person, person_colour, lambda d: f"person {d.conf:.2f}")
        for danger in getattr(result, "dangers", []) or []:
            boxed(danger, danger_colour, lambda d: f"{d.label} {d.conf:.2f}")

        # getattr on a missing (None) road falls back to 0.0 as well.
        coverage = getattr(getattr(result, "road", None), "coverage", 0.0)
        hud_line(f"road: {coverage:.0%}", 22, (0, 255, 255))

        state_name = getattr(state, "name", str(state))
        hud_line(f"state: {state_name}", 46, (255, 255, 255))
    except Exception:  # pragma: no cover - drawing must never crash the loop
        log.debug("overlay drawing failed", exc_info=True)
|
|
|
|
|
|
def _make_control_handler(sm):
    """Build the dashboard control callback bound to state machine ``sm``."""

    def _control(cmd):
        # ``cmd`` may be None/empty; treat that as "no action".
        action = (cmd or {}).get("action")
        if action == "play_mode":
            ok = sm.set_play_mode(cmd.get("mode", ""))
            return {"ok": ok, "play_mode": sm.play_mode()}
        if action == "pause":
            sm.set_paused(bool(cmd.get("paused", True)))
            return {"ok": True}
        if action == "resume":
            sm.set_paused(False)
            return {"ok": True}
        if action == "estop":
            sm.request_estop()
            return {"ok": True}
        if action == "clear_estop":
            sm.clear_estop()
            return {"ok": True}
        if action == "set_center":
            return {"ok": sm.set_geofence_center()}
        return {"ok": False, "error": f"unknown action {action!r}"}

    return _control


def _make_jpeg_provider(perception, sm, cfg):
    """Build the callable that returns the latest annotated frame as JPEG bytes."""

    def _jpeg_provider():
        result = perception.latest()
        frame = getattr(result, "frame", None)
        if frame is None:
            return None
        try:
            import cv2  # lazy: optional

            canvas = frame.copy()
            _draw_overlay(canvas, result, sm.state)
            ok, buf = cv2.imencode(
                ".jpg", canvas,
                [cv2.IMWRITE_JPEG_QUALITY, int(cfg.web.jpeg_quality)],
            )
            return buf.tobytes() if ok else None
        except Exception:  # pragma: no cover - encode/display failure
            # Still "no frame" for the viewer, but leave a trace for debugging.
            log.debug("dashboard frame encode failed", exc_info=True)
            return None

    return _jpeg_provider


def _stop_quietly(resource, method_name, failure_msg) -> None:
    """Call ``resource.<method_name>()`` if ``resource`` exists; log, never raise."""
    if resource is None:
        return
    try:
        getattr(resource, method_name)()
    except Exception:  # pragma: no cover
        log.exception(failure_msg)


def run(cfg: GoWelcomeConfig) -> int:
    """Run the GoWelcome control loop until interrupted.

    Sequence: install SIGINT/SIGTERM handlers, build the robot, start the
    perception thread, start the optional GPS source, ``balance_stand`` the
    robot, create the state machine, optionally start the MJPEG dashboard,
    then call ``sm.step(dt)`` at ``cfg.loop.rate_hz`` until a stop is
    requested (signal, or ESC in the debug window).

    Whatever was acquired is released in a ``finally`` block (dashboard,
    perception, GPS, robot, debug window), each step guarded so one failing
    stop does not prevent the others. The signal handlers that were in place
    before the call are restored last, so the process does not keep this
    function's flag-setting handlers after it returns.

    Args:
        cfg: Fully resolved configuration.

    Returns:
        ``0`` once the loop has stopped and cleanup has run. Exceptions raised
        during setup or by the loop (other than ``KeyboardInterrupt``)
        propagate after cleanup.
    """
    # Imports deferred until after CLI parsing so ``--help`` works without the
    # full (heavy) dependency tree being importable.
    from gowelcome.perception import PerceptionThread
    from gowelcome.robot.factory import build_robot
    from gowelcome.statemachine import GoWelcomeStateMachine
    from gowelcome.geo import build_gps_source

    log.info("GoWelcome starting (mock=%s, dry_run=%s, headless=%s)",
             cfg.mock, cfg.dry_run, cfg.headless)

    # Pre-bind so the finally block can clean up whatever was acquired, even if
    # setup raises partway through (never leave the dog standing with avoidance
    # claimed and no shutdown).
    robot = None
    perception = None
    gps_source = None
    web_server = None
    window_ready = False

    stop_flag = {"stop": False}

    def _handle_signal(signum: int, _frame: Optional[FrameType]) -> None:
        # Only set the flag here; the actual shutdown happens in the finally
        # block of the main loop (signal handlers must stay minimal).
        log.warning("signal %s received -> stopping", signum)
        stop_flag["stop"] = True

    # Register handlers up front (only possible on the main thread), and
    # remember what was installed before so it can be put back on exit.
    previous_handlers = {}
    try:
        for signum in (signal.SIGINT, signal.SIGTERM):
            previous_handlers[signum] = signal.signal(signum, _handle_signal)
    except ValueError:  # pragma: no cover - run() invoked off the main thread
        log.warning("signal handlers not installed (not on the main thread)")

    try:
        # --- resource acquisition (inside the try so finally always runs) ---
        robot = build_robot(cfg)
        perception = PerceptionThread(robot, cfg)
        perception.start()

        # GPS geofence (optional): build + start the source before the SM.
        gps_source = build_gps_source(cfg)
        if gps_source is not None:
            gps_source.start()
            log.info("GPS geofence active (source=%s, radius=%.1fm)",
                     cfg.gps.source, cfg.geofence.radius_m)

        robot.balance_stand()
        sm = GoWelcomeStateMachine(robot, perception, cfg, gps_source=gps_source)

        # Optional MJPEG web viewer: stream the latest annotated frame over HTTP.
        if cfg.web.enabled:
            try:
                from gowelcome.web import MjpegServer

                web_server = MjpegServer(
                    _make_jpeg_provider(perception, sm, cfg),
                    host=cfg.web.host, port=cfg.web.port,
                    fps=cfg.web.fps, title="GoWelcome",
                    status_provider=sm.status,
                    control_handler=_make_control_handler(sm),
                )
                web_server.start()
                log.info("dashboard: open http://<host-ip>:%d/ in a browser",
                         cfg.web.port)
            except Exception:
                # The dashboard is a convenience; the robot runs without it.
                log.exception("failed to start dashboard; continuing without it")
                web_server = None

        # Clamp the rate so a zero/negative setting cannot divide by zero.
        rate = max(1e-3, float(cfg.loop.rate_hz))
        period = 1.0 / rate
        show_window = (not cfg.headless)

        last = time.monotonic()
        while not stop_flag["stop"]:
            now = time.monotonic()
            dt = now - last
            last = now

            state = sm.step(dt)

            if show_window:
                try:
                    result = perception.latest()
                    frame = getattr(result, "frame", None)
                    if frame is not None:
                        import cv2  # lazy: optional

                        canvas = frame.copy()
                        _draw_overlay(canvas, result, state)
                        cv2.imshow("GoWelcome", canvas)
                        window_ready = True
                        if (cv2.waitKey(1) & 0xFF) == 27:  # ESC
                            log.info("ESC pressed -> stopping")
                            stop_flag["stop"] = True
                except Exception:  # pragma: no cover - no display / no cv2
                    # First failure disables the window for the rest of the run.
                    log.info("debug window unavailable; continuing headless",
                             exc_info=True)
                    show_window = False

            # Sleep the remainder of the loop period.
            elapsed = time.monotonic() - now
            remaining = period - elapsed
            if remaining > 0:
                time.sleep(remaining)
    except KeyboardInterrupt:  # pragma: no cover - belt-and-braces
        log.warning("KeyboardInterrupt -> stopping")
    finally:
        log.info("shutting down")
        _stop_quietly(web_server, "stop", "web_server.stop() failed")
        _stop_quietly(perception, "stop", "perception.stop() failed")
        _stop_quietly(gps_source, "stop", "gps_source.stop() failed")
        _stop_quietly(robot, "shutdown", "robot.shutdown() failed")
        if window_ready:
            try:
                import cv2  # lazy: optional

                cv2.destroyAllWindows()
            except Exception:  # pragma: no cover
                log.debug("cv2.destroyAllWindows() failed", exc_info=True)

        # Restore the caller's handlers only after cleanup, so a second
        # Ctrl-C during shutdown cannot abort robot.shutdown() midway.
        for signum, previous in previous_handlers.items():
            if previous is None:
                # Handler was not installed from Python; signal.signal()
                # cannot re-install it, so leave ours in place.
                continue
            try:
                signal.signal(signum, previous)
            except (ValueError, TypeError, OSError):  # pragma: no cover
                log.debug("could not restore handler for signal %s", signum,
                          exc_info=True)

    log.info("GoWelcome stopped")
    return 0
|
|
|
|
|
|
def main(argv: Optional[list[str]] = None) -> int:
    """Command-line entry point: set up logging, parse flags, run the loop.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv[1:]``.

    Returns:
        The exit code reported by :func:`run`.
    """
    logging.basicConfig(
        datefmt="%H:%M:%S",
        format="%(asctime)s %(levelname)-7s %(name)s: %(message)s",
        level=logging.INFO,
    )
    parser = build_arg_parser()
    namespace = parser.parse_args(argv)
    return run(config_from_args(namespace))
|
|
|
|
|
|
# Script entry: propagate main()'s return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|