#!/usr/bin/env python3 """GoWelcome entrypoint. Wires the perception thread, behaviour state machine, and robot interface together and runs the fixed-rate control loop. Parses CLI flags into a :class:`config.GoWelcomeConfig`, then: build_robot -> balance_stand -> PerceptionThread.start -> GoWelcomeStateMachine.step() at ``cfg.loop.rate_hz`` until Ctrl-C. Heavy / optional dependencies (``cv2`` for the debug window) are imported lazily so the module stays importable on a bare machine. Run ``--mock`` for a webcam/video dry-run with no robot present. Examples:: python main.py --mock --audio null --source 0 # off-robot python main.py --interface eth0 # real Go2 """ from __future__ import annotations import argparse import logging import os import signal import sys import time from types import FrameType from typing import Optional # Make GoWelcome runnable from ANY working directory: ensure the project root # (this file's directory) is importable so `import config` / `gowelcome` resolve # regardless of where `python .../main.py` (or welcome.sh) is invoked from. _PROJECT_ROOT = os.path.dirname(os.path.abspath(__file__)) if _PROJECT_ROOT not in sys.path: sys.path.insert(0, _PROJECT_ROOT) from config import GoWelcomeConfig, default_config log = logging.getLogger("gowelcome.main") def build_arg_parser() -> argparse.ArgumentParser: """Construct the CLI parser for GoWelcome.""" p = argparse.ArgumentParser( prog="gowelcome", description="Autonomous Unitree Go2 backyard greeter (mapless, reactive).", ) p.add_argument( "--mock", action="store_true", help="Use the MockRobot backend (webcam/video, no hardware).", ) p.add_argument( "--transport", type=str, choices=("webrtc", "dds"), default=None, help="Robot transport: webrtc (default; app protocol, wifi, dog audio) " "or dds (official unitree_sdk2py, wired/EDU).", ) p.add_argument( "--interface", type=str, default=None, help="[dds] Host network interface to the Go2 (e.g. eth0, wlan0).", ) p.add_argument( "--robot-ip", type=str, default=None, help="[webrtc] Robot IP for localsta (e.g. 192.168.1.50).", ) p.add_argument( "--serial", type=str, default=None, help="[webrtc] Robot serial number (for discovery / remote).", ) p.add_argument( "--aes-key", type=str, default=None, help="[webrtc] Per-device AES-128 key (32 hex), required on Go2 fw >= 1.1.15.", ) p.add_argument( "--connection", type=str, choices=("localsta", "localap", "remote"), default=None, help="[webrtc] Connection method (default localsta).", ) p.add_argument( "--audio-method", type=str, choices=("audiohub", "stream"), default=None, help="[webrtc] Greeting playback: audiohub (upload+play, default) or stream (live).", ) p.add_argument( "--device", type=str, default=None, help="Inference device for YOLO (cpu, cuda, cuda:0).", ) p.add_argument( "--model", type=str, default=None, help="Path to / name of the YOLO weights (ultralytics auto-downloads).", ) p.add_argument( "--source", type=str, default=None, help="Mock frame source: webcam index or video file path.", ) p.add_argument( "--wav", type=str, default=None, help="Path to the greeting .wav clip.", ) p.add_argument( "--audio", type=str, choices=("host", "go2", "null"), default=None, help="Audio backend: host (default), go2 (experimental), null (silent).", ) p.add_argument( "--audio-device", type=str, default=None, help="Pin a PulseAudio sink for host audio (e.g. a USB/BT speaker on " "the dog); plays via 'paplay --device='. Empty -> default sink.", ) p.add_argument( "--no-avoidance", action="store_true", help="Disable the on-robot LiDAR firmware obstacle-avoidance layer.", ) p.add_argument( "--headless", action="store_true", help="Run without the cv2 debug window.", ) p.add_argument( "--dry-run", action="store_true", help="Perceive and decide, but never send a non-zero velocity.", ) p.add_argument( "--conf", type=float, default=None, help="Person detection confidence threshold for APPROACH.", ) p.add_argument( "--web", action="store_true", help="Serve the live annotated camera as MJPEG over HTTP " "(open http://:/ in a browser).", ) p.add_argument( "--web-port", type=int, default=None, help="Port for the --web dashboard (default 8080).", ) p.add_argument( "--geofence", action="store_true", help="Enable the OPTIONAL GPS keep-in-area geofence (needs an external " "GPS receiver). Default is vision-only containment (road/car avoidance).", ) p.add_argument( "--gps", type=str, choices=("auto", "gpsd", "serial", "mock"), default=None, help="GPS source for --geofence (default auto; mock simulates one). " "Implies --geofence.", ) p.add_argument( "--radius", type=float, default=None, help="Geofence keep-in radius in metres (default 15). Implies --geofence.", ) p.add_argument( "--play", type=str, choices=("calm", "moderate", "playful"), default=None, help="Idle dog-play intensity (default moderate; changeable via dashboard).", ) return p def config_from_args(args: argparse.Namespace) -> GoWelcomeConfig: """Fold parsed CLI ``args`` into a fresh default config. Only flags the user actually supplied override the dataclass defaults, so ``config.py`` remains the single source of truth for everything else. """ cfg = default_config() if args.mock: cfg.mock = True if args.transport is not None: cfg.transport = args.transport if args.interface is not None: cfg.network.interface = args.interface if args.robot_ip is not None: cfg.webrtc.ip = args.robot_ip if args.serial is not None: cfg.webrtc.serial_number = args.serial if args.aes_key is not None: cfg.webrtc.aes_128_key = args.aes_key if args.connection is not None: cfg.webrtc.connection_method = args.connection if args.audio_method is not None: cfg.webrtc.audio_method = args.audio_method if args.device is not None: cfg.perception.device = args.device if args.model is not None: cfg.perception.model_path = args.model if args.source is not None: cfg.camera.mock_source = args.source if args.wav is not None: cfg.greet.wav_path = args.wav if args.audio is not None: cfg.audio.backend = args.audio if args.audio_device is not None: cfg.audio.output_device = args.audio_device if args.no_avoidance: cfg.safety.use_lidar_avoidance = False if args.headless: cfg.headless = True if args.dry_run: cfg.dry_run = True if args.conf is not None: cfg.perception.person_conf = args.conf if args.web: cfg.web.enabled = True if args.web_port is not None: cfg.web.port = args.web_port # GPS geofence is opt-in (default: vision-only). Any of these flags enable it. if args.geofence or args.gps is not None or args.radius is not None: cfg.gps.enabled = True cfg.geofence.enabled = True if args.gps is not None: cfg.gps.source = args.gps if args.radius is not None: cfg.geofence.radius_m = args.radius if args.play is not None: cfg.play.mode = args.play return cfg def _draw_overlay(frame, result, state) -> None: """Annotate ``frame`` in place with detections, road coverage and state. Lazily imports ``cv2``. Person boxes are green, danger boxes red. Best effort only -- any drawing error is swallowed so the loop never dies on a rendering issue. """ import cv2 # lazy: optional try: for det in getattr(result, "persons", []) or []: cv2.rectangle( frame, (int(det.x1), int(det.y1)), (int(det.x2), int(det.y2)), (0, 255, 0), 2, ) cv2.putText( frame, f"person {det.conf:.2f}", (int(det.x1), max(0, int(det.y1) - 5)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1, cv2.LINE_AA, ) for det in getattr(result, "dangers", []) or []: cv2.rectangle( frame, (int(det.x1), int(det.y1)), (int(det.x2), int(det.y2)), (0, 0, 255), 2, ) cv2.putText( frame, f"{det.label} {det.conf:.2f}", (int(det.x1), max(0, int(det.y1) - 5)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA, ) road = getattr(result, "road", None) coverage = getattr(road, "coverage", 0.0) if road is not None else 0.0 cv2.putText( frame, f"road: {coverage:.0%}", (10, 22), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2, cv2.LINE_AA, ) state_name = getattr(state, "name", str(state)) cv2.putText( frame, f"state: {state_name}", (10, 46), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2, cv2.LINE_AA, ) except Exception: # pragma: no cover - drawing must never crash the loop log.debug("overlay drawing failed", exc_info=True) def run(cfg: GoWelcomeConfig) -> int: """Run the GoWelcome control loop until interrupted. Returns an exit code.""" # Imports deferred until after CLI parsing so ``--help`` works without the # full (heavy) dependency tree being importable. from gowelcome.perception import PerceptionThread from gowelcome.robot.factory import build_robot from gowelcome.statemachine import GoWelcomeStateMachine from gowelcome.geo import build_gps_source log.info("GoWelcome starting (mock=%s, dry_run=%s, headless=%s)", cfg.mock, cfg.dry_run, cfg.headless) # Pre-bind so the finally block can clean up whatever was acquired, even if # setup raises partway through (never leave the dog standing with avoidance # claimed and no shutdown). robot = None perception = None gps_source = None web_server = None window_ready = False stop_flag = {"stop": False} def _handle_signal(signum: int, _frame: Optional[FrameType]) -> None: # Only set the flag here; the actual shutdown happens in the finally # block of the main loop (signal handlers must stay minimal). log.warning("signal %s received -> stopping", signum) stop_flag["stop"] = True # Register handlers up front (only possible on the main thread). try: signal.signal(signal.SIGINT, _handle_signal) signal.signal(signal.SIGTERM, _handle_signal) except ValueError: # pragma: no cover - run() invoked off the main thread log.warning("signal handlers not installed (not on the main thread)") try: # --- resource acquisition (inside the try so finally always runs) --- robot = build_robot(cfg) perception = PerceptionThread(robot, cfg) perception.start() # GPS geofence (optional): build + start the source before the SM. gps_source = build_gps_source(cfg) if gps_source is not None: gps_source.start() log.info("GPS geofence active (source=%s, radius=%.1fm)", cfg.gps.source, cfg.geofence.radius_m) robot.balance_stand() sm = GoWelcomeStateMachine(robot, perception, cfg, gps_source=gps_source) def _control(cmd): action = (cmd or {}).get("action") if action == "play_mode": ok = sm.set_play_mode(cmd.get("mode", "")) return {"ok": ok, "play_mode": sm.play_mode()} if action == "pause": sm.set_paused(bool(cmd.get("paused", True))) return {"ok": True} if action == "resume": sm.set_paused(False) return {"ok": True} if action == "estop": sm.request_estop() return {"ok": True} if action == "clear_estop": sm.clear_estop() return {"ok": True} if action == "set_center": return {"ok": sm.set_geofence_center()} return {"ok": False, "error": f"unknown action {action!r}"} # Optional MJPEG web viewer: stream the latest annotated frame over HTTP. if cfg.web.enabled: def _jpeg_provider(): result = perception.latest() frame = getattr(result, "frame", None) if result is not None else None if frame is None: return None try: import cv2 # lazy: optional canvas = frame.copy() _draw_overlay(canvas, result, sm.state) ok, buf = cv2.imencode( ".jpg", canvas, [cv2.IMWRITE_JPEG_QUALITY, int(cfg.web.jpeg_quality)], ) return buf.tobytes() if ok else None except Exception: # pragma: no cover - encode/display failure return None try: from gowelcome.web import MjpegServer web_server = MjpegServer( _jpeg_provider, host=cfg.web.host, port=cfg.web.port, fps=cfg.web.fps, title="GoWelcome", status_provider=sm.status, control_handler=_control, ) web_server.start() log.info("dashboard: open http://:%d/ in a browser", cfg.web.port) except Exception: log.exception("failed to start dashboard; continuing without it") web_server = None rate = max(1e-3, float(cfg.loop.rate_hz)) period = 1.0 / rate show_window = (not cfg.headless) last = time.monotonic() while not stop_flag["stop"]: now = time.monotonic() dt = now - last last = now state = sm.step(dt) if show_window: try: result = perception.latest() frame = getattr(result, "frame", None) if result is not None else None if frame is not None: import cv2 # lazy: optional canvas = frame.copy() _draw_overlay(canvas, result, state) cv2.imshow("GoWelcome", canvas) window_ready = True if (cv2.waitKey(1) & 0xFF) == 27: # ESC log.info("ESC pressed -> stopping") stop_flag["stop"] = True except Exception: # pragma: no cover - no display / no cv2 log.info("debug window unavailable; continuing headless", exc_info=True) show_window = False # Sleep the remainder of the loop period. elapsed = time.monotonic() - now remaining = period - elapsed if remaining > 0: time.sleep(remaining) except KeyboardInterrupt: # pragma: no cover - belt-and-braces log.warning("KeyboardInterrupt -> stopping") finally: log.info("shutting down") if web_server is not None: try: web_server.stop() except Exception: # pragma: no cover log.exception("web_server.stop() failed") if perception is not None: try: perception.stop() except Exception: # pragma: no cover log.exception("perception.stop() failed") if gps_source is not None: try: gps_source.stop() except Exception: # pragma: no cover log.exception("gps_source.stop() failed") if robot is not None: try: robot.shutdown() except Exception: # pragma: no cover log.exception("robot.shutdown() failed") if window_ready: try: import cv2 # lazy: optional cv2.destroyAllWindows() except Exception: # pragma: no cover pass log.info("GoWelcome stopped") return 0 def main(argv: Optional[list[str]] = None) -> int: """Parse args, configure logging, and run. Returns a process exit code.""" logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)-7s %(name)s: %(message)s", datefmt="%H:%M:%S", ) args = build_arg_parser().parse_args(argv) cfg = config_from_args(args) return run(cfg) if __name__ == "__main__": sys.exit(main())