commit 79873d79f76c2c7d05a701785b2a6ec22322717f Author: kassam Date: Sun Apr 12 19:05:32 2026 +0400 Initial project commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..77fb19b --- /dev/null +++ b/.gitignore @@ -0,0 +1,10 @@ +dataset/ +runs/ +models/ +captures/ +Logs/ +__pycache__/ +*.pyc +*.pt +*.pth +*.log diff --git a/Config/logging.json b/Config/logging.json new file mode 100644 index 0000000..3ff6ad5 --- /dev/null +++ b/Config/logging.json @@ -0,0 +1,11 @@ +{ + "level": "INFO", + "format": "%(asctime)s | %(name)s | %(levelname)s | %(message)s", + "file": true, + "console": true, + "categories": { + "Training": "INFO", + "Inference": "INFO", + "Manager": "INFO" + } +} diff --git a/DEPLOY.md b/DEPLOY.md new file mode 100644 index 0000000..26a191e --- /dev/null +++ b/DEPLOY.md @@ -0,0 +1,489 @@ +# Saqr PPE Detection - Deployment Guide +## Unitree G1 Robot + Intel RealSense D435I + +--- + +## Robot Details + +| Item | Value | +|------|-------| +| Robot | Unitree G1 Humanoid | +| IP | `192.168.123.164` | +| User | `unitree` | +| OS | Ubuntu 20.04 (aarch64 / Jetson) | +| Python | 3.10 (conda env: `teleimager`) | +| Camera | Intel RealSense D435I | +| Serial | `243622073459` | +| Port | USB 3.2 @ `/dev/video0` | + +--- + +## Step 1: Train the Model (Dev Machine) + +```bash +cd ~/Robotics_workspace/AI/Saqr +conda activate AI_MSI_yolo +python train.py --dataset dataset --epochs 100 --batch 16 +``` + +Verify model exists: +```bash +ls -lh models/saqr_best.pt +# Expected: ~5.3 MB +``` + +--- + +## Step 2: Deploy to Robot (Dev Machine) + +### Option A: Auto deploy +```bash +cd ~/Robotics_workspace/AI/Saqr +./deploy.sh +``` + +### Option B: Manual SCP +```bash +# Create folders +ssh unitree@192.168.123.164 "mkdir -p ~/Saqr/{models,captures/{SAFE,PARTIAL,UNSAFE},Config,Logs}" + +# Copy project files +scp saqr.py saqr_g1_bridge.py detect.py manager.py logger.py gui.py requirements.txt deploy.sh DEPLOY.md \ + unitree@192.168.123.164:~/Saqr/ + +# 
Copy config +scp Config/logging.json unitree@192.168.123.164:~/Saqr/Config/ + +# Copy trained model (5.3 MB) +scp models/saqr_best.pt unitree@192.168.123.164:~/Saqr/models/ +``` + +--- + +## Step 3: Install Dependencies (Robot) + +```bash +ssh unitree@192.168.123.164 +``` + +### Fix system clock (required for SSL/pip): +```bash +sudo date -s "2026-04-10 15:00:00" +``` + +### Install into teleimager conda env: +```bash +conda activate teleimager +python -m pip install ultralytics opencv-python-headless numpy PyYAML +``` + +If pip fails (SSL errors), install offline from dev machine: +```bash +# On dev machine: +mkdir -p /tmp/saqr_pkgs +pip download ultralytics opencv-python-headless numpy PyYAML \ + -d /tmp/saqr_pkgs --python-version 3.10 --platform manylinux2014_aarch64 --only-binary=:all: +scp -r /tmp/saqr_pkgs unitree@192.168.123.164:/tmp/saqr_pkgs + +# On robot: +conda activate teleimager +python -m pip install --no-index --find-links=/tmp/saqr_pkgs ultralytics opencv-python-headless numpy PyYAML +``` + +### Install Jetson GPU PyTorch (for CUDA acceleration): +```bash +# Remove pip PyTorch (wrong CUDA version) +python -m pip uninstall torch torchvision -y + +# Install Jetson-specific PyTorch for JetPack 5.1 / CUDA 11.4 +python -m pip install --no-cache-dir \ + https://developer.download.nvidia.com/compute/redist/jp/v51/pytorch/torch-2.1.0a0+41361538.nv23.06-cp310-cp310-linux_aarch64.whl + +python -m pip install --no-cache-dir \ + https://developer.download.nvidia.com/compute/redist/jp/v51/pytorch/torchvision-0.16.1a0+5e8e2f1-cp310-cp310-linux_aarch64.whl +``` + +### Fix Qt / Display (choose one): + +**A) At the robot's physical terminal (monitor connected):** +```bash +xhost +local: +export DISPLAY=:0 +export QT_QPA_PLATFORM=xcb +``` + +**B) Via SSH with X11 forwarding:** +```bash +# From dev machine: +ssh -X unitree@192.168.123.164 +export QT_QPA_PLATFORM=xcb +``` + +**C) Headless / no display (SSH without -X):** +```bash +export QT_QPA_PLATFORM=offscreen +# 
Always add --headless flag when running saqr.py +``` + +**Make permanent:** +```bash +echo 'export QT_QPA_PLATFORM=offscreen' >> ~/.bashrc +source ~/.bashrc +``` + +**Common error:** `Invalid MIT-MAGIC-COOKIE-1 key` or `could not connect to display :0` +This means you're in SSH without X11 auth. Either use `ssh -X`, run `xhost +local:` on the physical terminal, or switch to headless mode. + +### Fix system clock (required for pip/SSL): +```bash +sudo date -s "2026-04-10 16:00:00" +``` + +### Verify install: +```bash +python -c "from ultralytics import YOLO; print('ultralytics OK')" +python -c "import torch; print('CUDA:', torch.cuda.is_available())" +python -c "import cv2; print('opencv OK')" +``` + +--- + +## Step 4: Run PPE Detection (Robot) + +### Option A: OpenCV + RealSense RGB (recommended, no pyrealsense2 needed): +```bash +conda activate teleimager +cd ~/Saqr + +# === WITH DISPLAY (physical monitor on robot) === +xhost + +export DISPLAY=:0 +python saqr.py --source /dev/video2 --model models/saqr_best.pt + +# === HEADLESS via SSH (no display, saves captures + CSV) === +export QT_QPA_PLATFORM=offscreen +python saqr.py --source /dev/video2 --model models/saqr_best.pt --headless +``` + +**Note:** `/dev/video2` is the RealSense D435I RGB camera accessed directly via OpenCV V4L2. +No pyrealsense2 SDK needed. Pure OpenCV frames (640x480 BGR). + +### Option B: RealSense SDK (pyrealsense2): +```bash +python saqr.py --source realsense --model models/saqr_best.pt --headless +python saqr.py --source realsense:243622073459 --model models/saqr_best.pt --headless +``` + +### Option C: GUI (dev machine only, not on robot): +```bash +# On your dev machine (not the robot): +python gui.py --source 0 --model models/saqr_best.pt +``` +**Note:** gui.py requires PySide6 and a display. It will NOT work on the headless Jetson robot. 
+ +### With OpenCV camera index: +```bash +python saqr.py --source 0 --model models/saqr_best.pt --headless +``` + +### With V4L2 device path: +```bash +python saqr.py --source /dev/video0 --model models/saqr_best.pt --headless +``` + +### With GUI (if display connected): +```bash +python gui.py --source realsense --model models/saqr_best.pt +``` + +### Simple detection (no tracking): +```bash +python detect.py --source realsense --model models/saqr_best.pt +``` + +--- + +## Step 4b: Run with G1 TTS + Reject Action (Bridge) + +`saqr_g1_bridge.py` spawns `saqr.py`, parses its event stream, and drives the +G1 **onboard TTS** and the G1 **arm action client** on each per-person status +transition: + +| Transition | TTS (speaker_id=2, English) | Arm action | +|------------|------------------------------|------------| +| → UNSAFE | "Not safe! Please wear your protective equipment." | `reject` (id=13) + auto `release arm` | +| → SAFE | "Safe." | — | +| → PARTIAL | — | — | + +Requires `unitree_sdk2py` installed on the robot and a reachable DDS bus on +`eth0`. The bridge uses a single `ChannelFactoryInitialize` for both clients. + +### Headless + MJPEG stream (recommended over SSH): +```bash +conda activate marcus # or teleimager — whichever env has unitree_sdk2py +cd ~/Saqr +python3 saqr_g1_bridge.py --iface eth0 --source realsense --headless -- --stream 8080 +``` +Then open `http://192.168.123.164:8080` in your laptop browser. + +### With live OpenCV window (physical monitor on robot): +```bash +xhost +local: >/dev/null 2>&1 +DISPLAY=:0 python3 saqr_g1_bridge.py --iface eth0 --source realsense +``` +`q` in the window quits; Ctrl+C in the terminal is also forwarded to Saqr. + +### Dry run (no TTS, no motion — just see decisions): +```bash +python3 saqr_g1_bridge.py --dry-run --source realsense --headless +``` + +### Bridge CLI flags: + +| Flag | Default | Description | +|------|---------|-------------| +| `--iface` | *(default DDS)* | DDS network interface, e.g. 
`eth0` | +| `--timeout` | `10.0` | Arm/Audio client timeout (seconds) | +| `--cooldown` | `8.0` | Per-(id, status) seconds before re-triggering | +| `--release-after` | `2.0` | Seconds before auto `release arm` (0 = never) | +| `--speaker-id` | `2` | G1 `TtsMaker` speaker_id (2 = English on current firmware) | +| `--dry-run` | off | Parse events but never call the SDK | +| `--source` | — | Pass through to saqr (`0` / `realsense` / `/dev/video2` / path) | +| `--headless` | off | Pass `--headless` to saqr | +| `--saqr-conf` | — | Pass `--conf` to saqr | +| `--imgsz` | — | Pass `--imgsz` to saqr | +| `--device` | — | Pass `--device` to saqr (`cpu` / `0` / `cuda:0`) | +| `-- ` | — | Everything after `--` is forwarded raw to saqr | + +### Speaker-id reference + +speaker_ids are **locked to a language** — they do NOT auto-detect input text. +On current G1 firmware, `speaker_id=0` is Chinese regardless of what you feed +it. Speaker 2 was confirmed English by running Sanad mode 6 +(`voice_example.py 6`). If the robot's firmware changes, re-scan: +```bash +# On the robot (in a conda env with unitree_sdk2py): +python3 ~/Sanad/voice_example.py 6 +``` +and pass the new id with `--speaker-id N`. + +### What successful output looks like: +``` +[BRIDGE] G1ArmActionClient ready (iface=eth0) +[BRIDGE] G1 AudioClient ready (speaker_id=2) +[BRIDGE] launching: /.../python3 -u /home/unitree/Saqr/saqr.py --source realsense --headless +... +ID 0001 | NEW | SAFE | wearing: helmet, vest | missing: none | ... +[BRIDGE] tts -> 'Safe.' +ID 0002 | NEW | UNSAFE | wearing: none | missing: vest | ... +[BRIDGE] tts -> 'Not safe! Please wear your protective equipment.' 
+[BRIDGE] -> reject +[BRIDGE] -> release arm +``` + +--- + +## Step 5: Check Results (Robot) + +### Live status: +```bash +cat ~/Saqr/captures/result.csv +``` + +### Event history (audit log): +```bash +cat ~/Saqr/captures/events.csv +``` + +### Captured photos: +```bash +ls ~/Saqr/captures/SAFE/ +ls ~/Saqr/captures/PARTIAL/ +ls ~/Saqr/captures/UNSAFE/ +``` + +### Export CSV report: +```bash +cd ~/Saqr +python manager.py --export +``` + +### Download results to dev machine: +```bash +# From dev machine +scp -r unitree@192.168.123.164:~/Saqr/captures/ ./captures_from_robot/ +scp unitree@192.168.123.164:~/Saqr/captures/events.csv ./events_robot.csv +``` + +--- + +## Camera Source Options + +| Source | Command | Description | +|--------|---------|-------------| +| `/dev/video2` | `--source /dev/video2` | **RGB camera via OpenCV (recommended)** | +| `realsense` | `--source realsense` | RealSense D435I via pyrealsense2 SDK | +| `realsense:SERIAL` | `--source realsense:243622073459` | Specific RealSense by serial | +| `/dev/video4` | `--source /dev/video4` | Second RGB stream (if available) | +| `0` | `--source 0` | First OpenCV camera index | +| `video.mp4` | `--source video.mp4` | Video file | +| `image.jpg` | `--source image.jpg` | Single image | + +### G1 Robot V4L2 Device Map (RealSense D435I): +``` +/dev/video0 - Stereo module (infrared) - won't open with OpenCV +/dev/video1 - Stereo metadata +/dev/video2 - RGB camera (640x480) ← USE THIS +/dev/video3 - RGB metadata +/dev/video4 - RGB camera (secondary stream) +``` + +### Detect cameras on robot: +```bash +# Find working RGB cameras +python -c " +import cv2 +for i in range(10): + cap = cv2.VideoCapture(f'/dev/video{i}', cv2.CAP_V4L2) + if cap.isOpened(): + ret, frame = cap.read() + if ret and frame is not None: + print(f'/dev/video{i}: {frame.shape} OK') + else: + print(f'/dev/video{i}: opened but no frame') + cap.release() +" + +# RealSense devices +rs-enumerate-devices | grep "Serial Number" +``` + +--- + +## 
Tuning Parameters + +| Parameter | Default | Flag | Description | +|-----------|---------|------|-------------| +| Confidence | 0.35 | `--conf 0.35` | Lower = more detections, higher = fewer false positives | +| Max Missing | 90 | `--max-missing 90` | Frames before track deleted (~3s at 30fps) | +| Match Distance | 250 | `--match-distance 250` | Pixels for track matching | +| Confirm Frames | 5 | `--status-confirm-frames 5` | Frames to confirm a status change | + +### Recommended for G1 patrol: +```bash +python saqr.py --source realsense --model models/saqr_best.pt --headless \ + --conf 0.30 --max-missing 120 --match-distance 300 --status-confirm-frames 7 +``` + +--- + +## Compliance Rules + +| Status | Condition | Color | +|--------|-----------|-------| +| SAFE | Helmet AND vest detected, no violations | Green | +| PARTIAL | Only helmet OR only vest detected | Yellow | +| UNSAFE | `no-helmet` or `no-vest` detected, or nothing detected | Red | + +--- + +## Output Files + +| File | Location | Description | +|------|----------|-------------| +| `result.csv` | `captures/result.csv` | Current state of all tracked persons | +| `events.csv` | `captures/events.csv` | Audit log (NEW / STATUS_CHANGE events) | +| Person crops | `captures/SAFE/*.jpg` | Cropped images of compliant workers | +| Person crops | `captures/PARTIAL/*.jpg` | Workers with incomplete PPE | +| Person crops | `captures/UNSAFE/*.jpg` | Workers violating PPE rules | +| Logs | `Logs/Inference/saqr.log` | Runtime log | + +--- + +## Project Files + +| File | Purpose | +|------|---------| +| `saqr.py` | Main PPE tracking + detection (RealSense + OpenCV) | +| `saqr_g1_bridge.py` | Saqr → G1 bridge (onboard TTS + `reject` arm action on UNSAFE/SAFE transitions) | +| `detect.py` | Simple detection without tracking | +| `gui.py` | PySide6 desktop GUI | +| `manager.py` | Photo management CLI + CSV export | +| `train.py` | YOLO model training | +| `logger.py` | Centralized logging | +| `deploy.sh` | One-command 
deploy to robot | +| `Config/logging.json` | Log settings | + +--- + +## Troubleshooting + +### RealSense not detected +```bash +# Check USB connection +lsusb | grep Intel + +# Re-enumerate +rs-enumerate-devices | head -10 + +# Reset USB (if needed) +sudo usbreset /dev/bus/usb/002/002 +``` + +### Camera not opening +```bash +# Test RealSense directly +python -c " +import pyrealsense2 as rs +pipe = rs.pipeline() +cfg = rs.config() +cfg.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30) +pipe.start(cfg) +frames = pipe.wait_for_frames() +print('Frame:', frames.get_color_frame().get_width(), 'x', frames.get_color_frame().get_height()) +pipe.stop() +" + +# Test OpenCV fallback +python -c "import cv2; c=cv2.VideoCapture(0); print('OK' if c.isOpened() else 'FAIL'); c.release()" + +# Try different source +python saqr.py --source /dev/video0 --model models/saqr_best.pt --headless +``` + +### ModuleNotFoundError: ultralytics +```bash +# Check you're in the right conda env +which python +# Should show: /home/unitree/miniconda3/envs/teleimager/bin/python + +# Install to the correct env +python -m pip install ultralytics +``` + +### System clock wrong (SSL errors) +```bash +sudo date -s "2026-04-10 15:00:00" +``` + +### Model not found +```bash +ls ~/Saqr/models/ +# Should show: saqr_best.pt (~5.3 MB) +``` + +### Low FPS on Jetson +```bash +# Use smaller confidence to reduce load +python saqr.py --source realsense --conf 0.5 --headless + +# Or use headless opencv +export DISPLAY= +python saqr.py --source realsense --headless +``` + +### Too many duplicate track IDs +```bash +# Increase tolerance +python saqr.py --source realsense --max-missing 150 --match-distance 300 --headless +``` diff --git a/deploy.sh b/deploy.sh new file mode 100755 index 0000000..6da8361 --- /dev/null +++ b/deploy.sh @@ -0,0 +1,129 @@ +#!/bin/bash +# ============================================================================ +# Saqr PPE Detection - Deploy to Unitree G1 +# 
============================================================================ +# +# Usage (from your dev machine): +# ./deploy.sh # deploy + install deps +# ./deploy.sh --run # deploy + install + start detection +# ./deploy.sh --run --source 0 # deploy + start with camera 0 +# +# ============================================================================ + +set -e + +# ── Robot config ────────────────────────────────────────────────────────────── +ROBOT_IP="${ROBOT_IP:-192.168.123.164}" +ROBOT_USER="${ROBOT_USER:-unitree}" +ROBOT_ENV="${ROBOT_ENV:-teleimager}" +REMOTE_DIR="/home/${ROBOT_USER}/Saqr" +SSH_OPTS="-o StrictHostKeyChecking=no -o ConnectTimeout=10" + +# ── Parse args ──────────────────────────────────────────────────────────────── +RUN_AFTER=false +SOURCE="0" +MODEL="models/saqr_best.pt" +HEADLESS=false + +while [[ $# -gt 0 ]]; do + case $1 in + --run) RUN_AFTER=true; shift ;; + --source) SOURCE="$2"; shift 2 ;; + --model) MODEL="$2"; shift 2 ;; + --headless) HEADLESS=true; shift ;; + --ip) ROBOT_IP="$2"; shift 2 ;; + *) echo "Unknown arg: $1"; exit 1 ;; + esac +done + +LOCAL_DIR="$(cd "$(dirname "$0")" && pwd)" + +echo "============================================" +echo " Saqr PPE - Deploy to Unitree G1" +echo "============================================" +echo " Robot : ${ROBOT_USER}@${ROBOT_IP}" +echo " Env : ${ROBOT_ENV}" +echo " Remote: ${REMOTE_DIR}" +echo "============================================" + +# ── Step 1: Test connection ─────────────────────────────────────────────────── +echo "" +echo "[1/5] Testing SSH connection..." +ssh ${SSH_OPTS} ${ROBOT_USER}@${ROBOT_IP} "echo 'Connected OK'" || { + echo "[ERROR] Cannot reach ${ROBOT_IP}. Is the robot on?" + exit 1 +} + +# ── Step 2: Create remote directory ─────────────────────────────────────────── +echo "[2/5] Creating remote directory..." 
+ssh ${SSH_OPTS} ${ROBOT_USER}@${ROBOT_IP} "mkdir -p ${REMOTE_DIR}/{models,captures/{SAFE,PARTIAL,UNSAFE},Config,Logs}" + +# ── Step 3: Copy project files ──────────────────────────────────────────────── +echo "[3/5] Copying project files..." + +# Python files +for f in saqr.py train.py detect.py manager.py logger.py gui.py requirements.txt; do + if [ -f "${LOCAL_DIR}/${f}" ]; then + scp ${SSH_OPTS} "${LOCAL_DIR}/${f}" ${ROBOT_USER}@${ROBOT_IP}:${REMOTE_DIR}/ + fi +done + +# Config +scp ${SSH_OPTS} "${LOCAL_DIR}/Config/logging.json" ${ROBOT_USER}@${ROBOT_IP}:${REMOTE_DIR}/Config/ + +# Trained model (this is the big file) +if [ -f "${LOCAL_DIR}/models/saqr_best.pt" ]; then + echo " Uploading trained model (saqr_best.pt)..." + scp ${SSH_OPTS} "${LOCAL_DIR}/models/saqr_best.pt" ${ROBOT_USER}@${ROBOT_IP}:${REMOTE_DIR}/models/ +else + echo " [WARN] models/saqr_best.pt not found - train first!" +fi + +# Base model (for retraining on robot if needed) +if [ -f "${LOCAL_DIR}/models/yolo11n.pt" ]; then + scp ${SSH_OPTS} "${LOCAL_DIR}/models/yolo11n.pt" ${ROBOT_USER}@${ROBOT_IP}:${REMOTE_DIR}/models/ +fi + +echo " Files copied." + +# ── Step 4: Install dependencies ────────────────────────────────────────────── +echo "[4/5] Installing dependencies on robot..." +ssh ${SSH_OPTS} ${ROBOT_USER}@${ROBOT_IP} << 'INSTALL_EOF' + source ~/miniconda3/etc/profile.d/conda.sh + conda activate teleimager + + pip install -q ultralytics opencv-python numpy PyYAML 2>/dev/null + echo " Dependencies OK" +INSTALL_EOF + +# ── Step 5: Optionally run ──────────────────────────────────────────────────── +if [ "$RUN_AFTER" = true ]; then + echo "[5/5] Starting Saqr PPE detection on robot..." 
+ + HEADLESS_FLAG="" + if [ "$HEADLESS" = true ]; then + HEADLESS_FLAG="--headless" + fi + + ssh ${SSH_OPTS} -t ${ROBOT_USER}@${ROBOT_IP} << RUN_EOF + source ~/miniconda3/etc/profile.d/conda.sh + conda activate teleimager + cd ${REMOTE_DIR} + python saqr.py --source ${SOURCE} --model ${MODEL} ${HEADLESS_FLAG} +RUN_EOF +else + echo "[5/5] Skipped (use --run to start after deploy)" + echo "" + echo "============================================" + echo " Deployed! SSH in to run:" + echo "============================================" + echo "" + echo " ssh ${ROBOT_USER}@${ROBOT_IP}" + echo " conda activate teleimager" + echo " cd ${REMOTE_DIR}" + echo " python saqr.py --source 0 --model models/saqr_best.pt" + echo "" + echo " Or with GUI:" + echo " python gui.py --source 0 --model models/saqr_best.pt" + echo "" +fi diff --git a/detect.py b/detect.py new file mode 100644 index 0000000..7ef3cda --- /dev/null +++ b/detect.py @@ -0,0 +1,147 @@ +""" +Saqr - PPE Detection | Simple Detection (no tracking) +======================================================== +Single-pass YOLO inference: draw PPE boxes on frame, no person tracking. +Green = PPE worn, Red = PPE missing. 
"""
Saqr - PPE Detection | Simple Detection (no tracking)
========================================================
Single-pass YOLO inference: draw PPE boxes on frame, no person tracking.
Green = PPE worn, Red = PPE missing.

Usage:
    python detect.py --source 0
    python detect.py --source image.jpg --model models/saqr_best.pt
"""

import argparse
import time
from pathlib import Path

import cv2
from ultralytics import YOLO

from logger import get_logger

log = get_logger("Inference", "detect")

# Global inference config (set by main()); forwarded to every model(...) call
# so --device / --half / --imgsz take effect uniformly.
_INFER_KWARGS: dict = {"device": "cpu", "half": False, "imgsz": 640}

VIOLATION = {"no-helmet", "no-vest", "no-boots", "no-gloves", "no-goggles"}
COMPLIANT = {"helmet", "vest", "boots", "gloves", "goggles"}
GREEN = (0, 200, 0)
RED = (0, 0, 220)
BLUE = (200, 100, 0)
WHITE = (255, 255, 255)


def box_color(label: str):
    """Map a class label to a BGR draw colour.

    Red for PPE violations ("no-*" classes), green for compliant PPE items,
    blue for any other class (e.g. person).
    """
    if label in VIOLATION:
        return RED
    if label in COMPLIANT:
        return GREEN
    return BLUE


def draw_boxes(frame, results, model):
    """Draw one labelled rectangle per detection onto *frame* (in place).

    *results* is a single ultralytics Results object; *model.names* maps
    class ids to label strings.
    """
    for box in results.boxes:
        cls_id = int(box.cls)
        label = model.names[cls_id]
        conf = float(box.conf)
        x1, y1, x2, y2 = map(int, box.xyxy[0])
        color = box_color(label)

        cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
        text = f"{label} {conf:.2f}"
        (tw, th), _ = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.48, 1)
        # Keep the label strip inside the frame when the box touches the top edge.
        y_t = max(y1, th + 6)
        cv2.rectangle(frame, (x1, y_t - th - 4), (x1 + tw + 4, y_t), color, -1)
        cv2.putText(frame, text, (x1 + 2, y_t - 3),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.48, WHITE, 1, cv2.LINE_AA)


def run_video(model, source, conf):
    """Stream inference over a camera index or video file.

    Shows an annotated window with an FPS overlay; 'q' quits, 's' saves the
    current annotated frame to detect_saved.jpg.
    """
    cap = cv2.VideoCapture(int(source) if source.isdigit() else source)
    if not cap.isOpened():
        log.error(f"Cannot open: {source}")
        return

    print("Running - q to quit, s to save.")
    prev = time.time()
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        results = model(frame, conf=conf, verbose=False, **_INFER_KWARGS)[0]
        draw_boxes(frame, results, model)

        fps = 1.0 / max(time.time() - prev, 1e-9)
        prev = time.time()
        cv2.putText(frame, f"FPS: {fps:.1f}", (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, WHITE, 2, cv2.LINE_AA)

        cv2.imshow("Saqr Detect", frame)
        key = cv2.waitKey(1) & 0xFF
        if key == ord("q"):
            break
        if key == ord("s"):
            cv2.imwrite("detect_saved.jpg", frame)
            print("Saved: detect_saved.jpg")

    cap.release()
    cv2.destroyAllWindows()


def run_image(model, path, conf):
    """Run inference on a single image; save and display the annotated copy.

    Output is written as <stem>_detect.jpg in the current directory.

    FIX: forward **_INFER_KWARGS exactly like run_video does — previously the
    --device / --half / --imgsz CLI flags were silently ignored for still
    images and inference always ran with ultralytics defaults.
    """
    frame = cv2.imread(path)
    if frame is None:
        log.error(f"Cannot read: {path}")
        return
    results = model(frame, conf=conf, verbose=False, **_INFER_KWARGS)[0]
    draw_boxes(frame, results, model)
    out = Path(path).stem + "_detect.jpg"
    cv2.imwrite(out, frame)
    print(f"Saved: {out}")
    cv2.imshow("Saqr Detect", frame)
    cv2.waitKey(0)
    cv2.destroyAllWindows()


def main():
    """CLI entry point: parse args, resolve the model path, dispatch to
    run_video (camera index / video file) or run_image (existing file)."""
    parser = argparse.ArgumentParser(description="Saqr simple PPE detection")
    parser.add_argument("--source", default="0")
    parser.add_argument("--model", default="models/saqr_best.pt")
    parser.add_argument("--conf", type=float, default=0.35)
    parser.add_argument("--device", default="0", help="'cpu', '0', 'cuda:0'")
    parser.add_argument("--half", action="store_true", help="FP16 inference")
    parser.add_argument("--imgsz", type=int, default=320, help="Inference size")
    args = parser.parse_args()

    global _INFER_KWARGS
    _INFER_KWARGS = {"device": args.device, "half": args.half, "imgsz": args.imgsz}
    try:
        import torch
        # Gracefully fall back to CPU (and disable FP16, which needs CUDA)
        # when a GPU device was requested but CUDA is unavailable.
        if not torch.cuda.is_available() and args.device != "cpu":
            log.warning("CUDA unavailable - falling back to CPU")
            _INFER_KWARGS["device"] = "cpu"
            _INFER_KWARGS["half"] = False
    except ImportError:
        pass

    # Resolve the model relative to this file first, then as given.
    root = Path(__file__).parent
    model_path = root / args.model
    if not model_path.exists():
        model_path = Path(args.model)
    if not model_path.exists():
        log.error(f"Model not found: {args.model}")
        raise SystemExit(1)

    model = YOLO(str(model_path))
    src = args.source
    if src.isdigit() or Path(src).suffix.lower() in {".mp4", ".avi", ".mov", ".mkv"}:
        run_video(model, src, args.conf)
    elif Path(src).exists():
        run_image(model, src, args.conf)
    else:
        log.error(f"Source not found: {src}")
        raise SystemExit(1)


if __name__ == "__main__":
    main()
--git a/gui.py b/gui.py new file mode 100644 index 0000000..ad56a5b --- /dev/null +++ b/gui.py @@ -0,0 +1,531 @@ +""" +Saqr - PPE Safety Tracking | GUI Application +================================================= +PySide6 desktop GUI for real-time PPE compliance monitoring. + +Features: + - Live camera feed with PPE detection overlays + - Start / Stop / Source selection + - Real-time SAFE / PARTIAL / UNSAFE counters + - Track list with per-person status + - Event log panel + - Confidence & tracking parameter controls + - Capture gallery sidebar + - G1 robot camera support (RealSense / V4L2 /dev/videoX) + +Usage: + python gui.py + python gui.py --model models/saqr_best.pt + python gui.py --source 1 +""" + +from __future__ import annotations + +import sys +import time +from pathlib import Path +from datetime import datetime +from typing import Dict, List, Optional + +import cv2 +import numpy as np +from PySide6.QtCore import Qt, QThread, Signal, Slot, QTimer, QSize +from PySide6.QtGui import QImage, QPixmap, QFont, QColor, QIcon +from PySide6.QtWidgets import ( + QApplication, QMainWindow, QWidget, QLabel, QPushButton, + QVBoxLayout, QHBoxLayout, QGridLayout, QGroupBox, QComboBox, + QSlider, QSpinBox, QDoubleSpinBox, QTextEdit, QSplitter, + QFrame, QScrollArea, QFileDialog, QMessageBox, QStatusBar, +) + +from ultralytics import YOLO + +# Import Saqr core modules +from saqr import ( + collect_detections, group_detections_to_people, status_from_items, + split_wearing_missing, PersonTracker, EventLogger, Track, + save_track_image, emit_event, write_result_csv, draw_track, + draw_counters, setup_capture_dirs, resolve_model_path, + clamp_bbox, STATUSES, EVENTS_CSV, RESULT_CSV, ROOT, CAPTURES_DIR, + now_iso, +) + +from logger import get_logger + +log = get_logger("Inference", "gui") + + +# ── Camera backends (from AI_Photographer patterns) ────────────────────────── +def list_cameras(max_idx: int = 10) -> List[str]: + """Scan for available camera devices.""" + sources = 
[] + # V4L2 devices + for i in range(max_idx): + dev = f"/dev/video{i}" + if Path(dev).exists(): + sources.append(dev) + # Fallback numeric indices + if not sources: + for i in range(4): + cap = cv2.VideoCapture(i) + if cap.isOpened(): + sources.append(str(i)) + cap.release() + return sources if sources else ["0"] + + +def open_camera(source: str, width: int = 640, height: int = 480, fps: int = 30): + """Open camera with V4L2 backend and MJPEG codec (G1 compatible).""" + if source.startswith("/dev/video"): + cap = cv2.VideoCapture(source, cv2.CAP_V4L2) + elif source.isdigit(): + cap = cv2.VideoCapture(int(source)) + else: + cap = cv2.VideoCapture(source) + + if cap.isOpened(): + cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) + cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"MJPG")) + cap.set(cv2.CAP_PROP_FRAME_WIDTH, width) + cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height) + cap.set(cv2.CAP_PROP_FPS, fps) + return cap + + +# ── Detection Worker Thread ─────────────────────────────────────────────────── +class DetectionWorker(QThread): + frame_ready = Signal(np.ndarray, list) # annotated frame, visible tracks + event_fired = Signal(str) # event message string + stats_updated = Signal(dict) # {SAFE: n, PARTIAL: n, UNSAFE: n, fps: f} + + def __init__(self, parent=None): + super().__init__(parent) + self._running = False + self.model: Optional[YOLO] = None + self.source = "0" + self.conf = 0.35 + self.max_missing = 90 + self.match_distance = 250.0 + self.status_confirm = 5 + self.capture_dirs: Dict[str, Path] = {} + + def configure(self, model_path: str, source: str, conf: float, + max_missing: int, match_dist: float, status_confirm: int): + self.source = source + self.conf = conf + self.max_missing = max_missing + self.match_distance = match_dist + self.status_confirm = status_confirm + self.capture_dirs = setup_capture_dirs(ROOT) + if self.model is None or str(model_path) != getattr(self, '_last_model', ''): + self.model = YOLO(model_path) + self._last_model = str(model_path) 
+ + def run(self): + self._running = True + cap = open_camera(self.source) + if not cap.isOpened(): + self.event_fired.emit(f"[ERROR] Cannot open camera: {self.source}") + return + + ok, first = cap.read() + if not ok: + self.event_fired.emit("[ERROR] Cannot read first frame") + cap.release() + return + + event_logger = EventLogger(EVENTS_CSV) + tracker = PersonTracker( + event_logger=event_logger, + max_missing=self.max_missing, + match_distance=self.match_distance, + status_confirm_frames=self.status_confirm, + ) + + self.event_fired.emit(f"Session started | source={self.source}") + prev = time.time() + frame_idx = 0 + frame = first + + while self._running: + frame_idx += 1 + h, w = frame.shape[:2] + annotated = frame.copy() + + try: + detections = collect_detections(frame, self.model, self.conf) + candidates = group_detections_to_people(detections, w, h) + created, changed = tracker.update(candidates, frame_idx) + visible = tracker.visible_tracks() + + created_ids = {t.track_id for t in created} + changed_ids = {t.track_id for t in changed} + event_ids = created_ids | changed_ids + + for track in visible: + save_track_image(frame, track, self.capture_dirs) + if track.track_id in event_ids: + ev_type = "NEW" if track.track_id in created_ids else "STATUS_CHANGE" + wearing, missing, unknown = split_wearing_missing(track.items) + msg = ( + f"ID {track.track_id:04d} | {ev_type} | {track.status} | " + f"W: {', '.join(wearing) or 'none'} | " + f"M: {', '.join(missing) or 'none'}" + ) + self.event_fired.emit(msg) + emit_event(track, event_logger, ev_type) + draw_track(annotated, track) + + # Write CSV periodically + if frame_idx % 30 == 0: + write_result_csv(list(tracker.tracks.values()), RESULT_CSV) + + except Exception as e: + self.event_fired.emit(f"[ERROR] Frame {frame_idx}: {e}") + visible = tracker.visible_tracks() + + now_t = time.time() + fps = 1.0 / max(now_t - prev, 1e-9) + prev = now_t + + draw_counters(annotated, visible, fps) + + # Emit signals + counts = 
{s: 0 for s in STATUSES} + for t in visible: + counts[t.status] += 1 + counts["fps"] = fps + counts["tracks"] = len(visible) + + self.frame_ready.emit(annotated, visible) + self.stats_updated.emit(counts) + + ret, frame = cap.read() + if not ret: + break + + cap.release() + write_result_csv(list(tracker.tracks.values()), RESULT_CSV) + self.event_fired.emit("Session ended.") + + def stop(self): + self._running = False + + +# ── Helpers ─────────────────────────────────────────────────────────────────── +def cv_to_qpixmap(frame: np.ndarray, max_w: int = 960, max_h: int = 720) -> QPixmap: + h, w, ch = frame.shape + rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + qimg = QImage(rgb.data, w, h, ch * w, QImage.Format.Format_RGB888) + pix = QPixmap.fromImage(qimg) + return pix.scaled(max_w, max_h, Qt.AspectRatioMode.KeepAspectRatio, + Qt.TransformationMode.SmoothTransformation) + + +# ── Main Window ─────────────────────────────────────────────────────────────── +class SaqrWindow(QMainWindow): + def __init__(self, default_model: str = "models/saqr_best.pt", + default_source: str = "0"): + super().__init__() + self.setWindowTitle("Saqr - PPE Safety Tracking") + self.setMinimumSize(1200, 700) + self._default_model = default_model + self._default_source = default_source + + self.worker: Optional[DetectionWorker] = None + self._build_ui() + self._scan_cameras() + + def _build_ui(self): + central = QWidget() + self.setCentralWidget(central) + main_layout = QHBoxLayout(central) + + # ── Left: Controls ──────────────────────────────────────────────── + left = QVBoxLayout() + left.setMaximumWidth = 300 + + # Model + model_grp = QGroupBox("Model") + model_lay = QVBoxLayout(model_grp) + self.model_label = QLabel(self._default_model) + self.model_label.setWordWrap(True) + btn_model = QPushButton("Browse...") + btn_model.clicked.connect(self._browse_model) + model_lay.addWidget(self.model_label) + model_lay.addWidget(btn_model) + left.addWidget(model_grp) + + # Camera + cam_grp = 
QGroupBox("Camera Source") + cam_lay = QVBoxLayout(cam_grp) + self.cam_combo = QComboBox() + btn_refresh = QPushButton("Refresh") + btn_refresh.clicked.connect(self._scan_cameras) + cam_lay.addWidget(self.cam_combo) + cam_lay.addWidget(btn_refresh) + left.addWidget(cam_grp) + + # Parameters + param_grp = QGroupBox("Parameters") + param_lay = QGridLayout(param_grp) + + param_lay.addWidget(QLabel("Confidence:"), 0, 0) + self.conf_spin = QDoubleSpinBox() + self.conf_spin.setRange(0.1, 0.9) + self.conf_spin.setSingleStep(0.05) + self.conf_spin.setValue(0.35) + param_lay.addWidget(self.conf_spin, 0, 1) + + param_lay.addWidget(QLabel("Max Missing:"), 1, 0) + self.missing_spin = QSpinBox() + self.missing_spin.setRange(10, 300) + self.missing_spin.setValue(90) + param_lay.addWidget(self.missing_spin, 1, 1) + + param_lay.addWidget(QLabel("Match Dist:"), 2, 0) + self.dist_spin = QDoubleSpinBox() + self.dist_spin.setRange(50, 500) + self.dist_spin.setSingleStep(10) + self.dist_spin.setValue(250) + param_lay.addWidget(self.dist_spin, 2, 1) + + param_lay.addWidget(QLabel("Confirm Frames:"), 3, 0) + self.confirm_spin = QSpinBox() + self.confirm_spin.setRange(1, 20) + self.confirm_spin.setValue(5) + param_lay.addWidget(self.confirm_spin, 3, 1) + + left.addWidget(param_grp) + + # Start / Stop + btn_lay = QHBoxLayout() + self.btn_start = QPushButton("Start") + self.btn_start.setStyleSheet("background-color: #2ecc71; color: white; font-weight: bold; padding: 8px;") + self.btn_start.clicked.connect(self._start) + self.btn_stop = QPushButton("Stop") + self.btn_stop.setStyleSheet("background-color: #e74c3c; color: white; font-weight: bold; padding: 8px;") + self.btn_stop.clicked.connect(self._stop) + self.btn_stop.setEnabled(False) + btn_lay.addWidget(self.btn_start) + btn_lay.addWidget(self.btn_stop) + left.addLayout(btn_lay) + + # Status counters + stats_grp = QGroupBox("Live Status") + stats_lay = QGridLayout(stats_grp) + self.lbl_fps = QLabel("FPS: -") + self.lbl_safe = 
QLabel("SAFE: 0") + self.lbl_partial = QLabel("PARTIAL: 0") + self.lbl_unsafe = QLabel("UNSAFE: 0") + self.lbl_tracks = QLabel("TRACKS: 0") + + self.lbl_safe.setStyleSheet("color: #27ae60; font-weight: bold; font-size: 14px;") + self.lbl_partial.setStyleSheet("color: #f39c12; font-weight: bold; font-size: 14px;") + self.lbl_unsafe.setStyleSheet("color: #e74c3c; font-weight: bold; font-size: 14px;") + self.lbl_tracks.setStyleSheet("color: #3498db; font-weight: bold; font-size: 14px;") + + stats_lay.addWidget(self.lbl_fps, 0, 0) + stats_lay.addWidget(self.lbl_tracks, 0, 1) + stats_lay.addWidget(self.lbl_safe, 1, 0) + stats_lay.addWidget(self.lbl_partial, 1, 1) + stats_lay.addWidget(self.lbl_unsafe, 2, 0, 1, 2) + left.addWidget(stats_grp) + + left.addStretch() + + # ── Centre: Video feed ──────────────────────────────────────────── + centre = QVBoxLayout() + self.video_label = QLabel("No camera feed") + self.video_label.setAlignment(Qt.AlignmentFlag.AlignCenter) + self.video_label.setStyleSheet( + "background-color: #1a1a2e; color: #666; font-size: 18px; border-radius: 8px;" + ) + self.video_label.setMinimumSize(640, 480) + centre.addWidget(self.video_label) + + # ── Right: Event log ────────────────────────────────────────────── + right = QVBoxLayout() + log_grp = QGroupBox("Event Log") + log_lay = QVBoxLayout(log_grp) + self.event_log = QTextEdit() + self.event_log.setReadOnly(True) + self.event_log.setMaximumWidth(380) + self.event_log.setStyleSheet( + "background-color: #0d1117; color: #c9d1d9; font-family: monospace; font-size: 11px;" + ) + log_lay.addWidget(self.event_log) + + btn_clear = QPushButton("Clear Log") + btn_clear.clicked.connect(self.event_log.clear) + log_lay.addWidget(btn_clear) + + btn_export = QPushButton("Export CSV Report") + btn_export.clicked.connect(self._export_csv) + log_lay.addWidget(btn_export) + + right.addWidget(log_grp) + + # ── Assemble ────────────────────────────────────────────────────── + left_widget = QWidget() + 
left_widget.setLayout(left) + left_widget.setFixedWidth(260) + + centre_widget = QWidget() + centre_widget.setLayout(centre) + + right_widget = QWidget() + right_widget.setLayout(right) + right_widget.setFixedWidth(380) + + main_layout.addWidget(left_widget) + main_layout.addWidget(centre_widget, stretch=1) + main_layout.addWidget(right_widget) + + # Status bar + self.statusBar().showMessage("Ready - load a model and start detection") + + def _scan_cameras(self): + self.cam_combo.clear() + sources = list_cameras() + self.cam_combo.addItems(sources) + # Set default + idx = self.cam_combo.findText(self._default_source) + if idx >= 0: + self.cam_combo.setCurrentIndex(idx) + elif self.cam_combo.count() > 0: + # Try to add the default as custom + self.cam_combo.addItem(self._default_source) + self.cam_combo.setCurrentIndex(self.cam_combo.count() - 1) + + def _browse_model(self): + path, _ = QFileDialog.getOpenFileName( + self, "Select YOLO Model", str(ROOT / "models"), "Model Files (*.pt)" + ) + if path: + self.model_label.setText(path) + + def _start(self): + model_path = self.model_label.text() + if not Path(model_path).exists(): + # Try relative to ROOT + full = ROOT / model_path + if not full.exists(): + QMessageBox.critical(self, "Error", f"Model not found:\n{model_path}") + return + model_path = str(full) + + source = self.cam_combo.currentText() + + self.worker = DetectionWorker() + self.worker.configure( + model_path=model_path, + source=source, + conf=self.conf_spin.value(), + max_missing=self.missing_spin.value(), + match_dist=self.dist_spin.value(), + status_confirm=self.confirm_spin.value(), + ) + self.worker.frame_ready.connect(self._on_frame) + self.worker.event_fired.connect(self._on_event) + self.worker.stats_updated.connect(self._on_stats) + self.worker.finished.connect(self._on_finished) + self.worker.start() + + self.btn_start.setEnabled(False) + self.btn_stop.setEnabled(True) + self.statusBar().showMessage(f"Running | source={source} | 
conf={self.conf_spin.value()}") + log.info(f"GUI session started | source={source}") + + def _stop(self): + if self.worker and self.worker.isRunning(): + self.worker.stop() + self.worker.wait(3000) + self.btn_start.setEnabled(True) + self.btn_stop.setEnabled(False) + self.statusBar().showMessage("Stopped") + + @Slot(np.ndarray, list) + def _on_frame(self, frame: np.ndarray, visible: list): + pix = cv_to_qpixmap(frame, self.video_label.width(), self.video_label.height()) + self.video_label.setPixmap(pix) + + @Slot(str) + def _on_event(self, msg: str): + ts = datetime.now().strftime("%H:%M:%S") + color = "#c9d1d9" + if "UNSAFE" in msg: + color = "#f85149" + elif "SAFE" in msg and "UNSAFE" not in msg: + color = "#3fb950" + elif "PARTIAL" in msg: + color = "#d29922" + elif "ERROR" in msg: + color = "#f85149" + + self.event_log.append(f'[{ts}] {msg}') + # Auto-scroll + self.event_log.verticalScrollBar().setValue( + self.event_log.verticalScrollBar().maximum() + ) + + @Slot(dict) + def _on_stats(self, stats: dict): + self.lbl_fps.setText(f"FPS: {stats.get('fps', 0):.1f}") + self.lbl_safe.setText(f"SAFE: {stats.get('SAFE', 0)}") + self.lbl_partial.setText(f"PARTIAL: {stats.get('PARTIAL', 0)}") + self.lbl_unsafe.setText(f"UNSAFE: {stats.get('UNSAFE', 0)}") + self.lbl_tracks.setText(f"TRACKS: {stats.get('tracks', 0)}") + + def _on_finished(self): + self.btn_start.setEnabled(True) + self.btn_stop.setEnabled(False) + self.statusBar().showMessage("Session ended") + + def _export_csv(self): + path, _ = QFileDialog.getSaveFileName( + self, "Export CSV", str(ROOT / f"ppe_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"), + "CSV Files (*.csv)" + ) + if path: + from manager import export_csv, load_photos + export_csv(load_photos(), Path(path)) + self._on_event(f"Exported: {path}") + + def closeEvent(self, event): + self._stop() + event.accept() + + +# ── Entry point ─────────────────────────────────────────────────────────────── +def main(): + import argparse + parser = 
argparse.ArgumentParser(description="Saqr PPE GUI") + parser.add_argument("--model", default="models/saqr_best.pt") + parser.add_argument("--source", default="0") + args = parser.parse_args() + + app = QApplication(sys.argv) + app.setStyle("Fusion") + + # Dark theme + from PySide6.QtGui import QPalette + palette = QPalette() + palette.setColor(QPalette.ColorRole.Window, QColor(30, 30, 46)) + palette.setColor(QPalette.ColorRole.WindowText, QColor(205, 214, 244)) + palette.setColor(QPalette.ColorRole.Base, QColor(24, 24, 37)) + palette.setColor(QPalette.ColorRole.AlternateBase, QColor(30, 30, 46)) + palette.setColor(QPalette.ColorRole.Text, QColor(205, 214, 244)) + palette.setColor(QPalette.ColorRole.Button, QColor(49, 50, 68)) + palette.setColor(QPalette.ColorRole.ButtonText, QColor(205, 214, 244)) + palette.setColor(QPalette.ColorRole.Highlight, QColor(137, 180, 250)) + palette.setColor(QPalette.ColorRole.HighlightedText, QColor(30, 30, 46)) + app.setPalette(palette) + + win = SaqrWindow(default_model=args.model, default_source=args.source) + win.show() + sys.exit(app.exec()) + + +if __name__ == "__main__": + main() diff --git a/logger.py b/logger.py new file mode 100644 index 0000000..63a8a7d --- /dev/null +++ b/logger.py @@ -0,0 +1,63 @@ +from __future__ import annotations + +import json +import logging +from pathlib import Path +from typing import Dict + +_LOGGER_CACHE: Dict[str, logging.Logger] = {} + +_ROOT = Path(__file__).resolve().parent + + +def _load_log_cfg() -> dict: + cfg_path = _ROOT / "Config" / "logging.json" + try: + with open(cfg_path, "r") as f: + return json.load(f) + except Exception: + return {} + + +def _level_from_name(name: str) -> int: + return getattr(logging, str(name).upper(), logging.INFO) + + +def get_logger(category: str, name: str) -> logging.Logger: + """Return a cached logger that writes to Logs//.log.""" + key = f"{category}.{name}" + if key in _LOGGER_CACHE: + return _LOGGER_CACHE[key] + + log_cfg = _load_log_cfg() + + log_dir = 
_ROOT / "Logs" / category + log_dir.mkdir(parents=True, exist_ok=True) + + logger = logging.getLogger(key) + level_name = ( + log_cfg.get("categories", {}).get(category) + or log_cfg.get("level", "INFO") + ) + logger.setLevel(_level_from_name(level_name)) + logger.propagate = False + + if logger.handlers: + logger.handlers.clear() + + fmt = logging.Formatter( + log_cfg.get("format", "%(asctime)s | %(name)s | %(levelname)s | %(message)s") + ) + + if log_cfg.get("file", True): + fh = logging.FileHandler(log_dir / f"{name}.log", encoding="utf-8") + fh.setFormatter(fmt) + logger.addHandler(fh) + + if log_cfg.get("console", False): + sh = logging.StreamHandler() + sh.setFormatter(fmt) + logger.addHandler(sh) + + _LOGGER_CACHE[key] = logger + return logger diff --git a/manager.py b/manager.py new file mode 100644 index 0000000..6f6aabf --- /dev/null +++ b/manager.py @@ -0,0 +1,437 @@ +""" +Saqr - PPE Detection | Photo Manager +======================================== +Interactive CLI to manage captured PPE photos. + +Features: list, view, move, rename, assign ID, delete, + download/copy, export CSV, update status. 
+ +Usage: + python manager.py # interactive menu + python manager.py --export # quick CSV export +""" + +from __future__ import annotations + +import argparse +import csv +import shutil +from dataclasses import dataclass +from datetime import datetime +from pathlib import Path + +import cv2 + +from logger import get_logger + +log = get_logger("Manager", "manager") + +ROOT = Path(__file__).parent +CAPTURES_DIR = ROOT / "captures" +STATUSES = ("SAFE", "PARTIAL", "UNSAFE") +IMG_EXTS = {".jpg", ".jpeg", ".png", ".bmp", ".webp"} + +CLASS_COLUMNS = [ + "boots", "gloves", "goggles", "helmet", + "no-boots", "no-gloves", "no-goggles", "no-helmet", "no-vest", "vest", +] + +# ANSI colours +_C = {"SAFE": "\033[92m", "PARTIAL": "\033[93m", "UNSAFE": "\033[91m", + "BOLD": "\033[1m", "RESET": "\033[0m"} + +def _cs(s): return f"{_C.get(s, '')}{s}{_C['RESET']}" +def _bold(s): return f"{_C['BOLD']}{s}{_C['RESET']}" + + +# ── Data models ─────────────────────────────────────────────────────────────── +@dataclass +class Photo: + path: Path + status: str + filename: str + person_id: str = "" + class_name: str = "unknown" + date_captured: str = "" + + @property + def class_flags(self) -> dict[str, int]: + flags = {c: 0 for c in CLASS_COLUMNS} + stem = self.filename.lower() + for c in CLASS_COLUMNS: + if c in stem: + flags[c] = 1 + return flags + + +@dataclass +class EventRow: + """One row from captures/events.csv (written by saqr.py).""" + timestamp: str + track_id: str + event_type: str + status: str + wearing: str + missing: str + unknown: str + photo: str + path: str + + @property + def class_flags(self) -> dict[str, int]: + worn = {c.strip() for c in self.wearing.split(",") if c.strip()} + return {c: (1 if c in worn else 0) for c in CLASS_COLUMNS} + + @property + def missing_notes(self) -> str: + items = [c.strip() for c in self.missing.split(",") if c.strip()] + return "Missing " + ", ".join(items) if items else "Compliant" + + +# ── Parsing & Loading 
───────────────────────────────────────────────────────── +def parse_photo(path: Path, status: str) -> Photo: + stem = path.stem + parts = stem.split("_") + person_id = "" + date_captured = "" + class_name = "unknown" + + # Try to extract track_NNNN format + if stem.startswith("track_") and len(parts) >= 2 and parts[1].isdigit(): + person_id = f"track_{parts[1]}" + elif len(parts) >= 2 and parts[0].isdigit() and parts[1].isdigit(): + try: + date_captured = datetime.strptime( + f"{parts[0]}_{parts[1]}", "%Y%m%d_%H%M%S" + ).strftime("%Y-%m-%d %H:%M:%S") + except ValueError: + pass + if len(parts) > 3: + class_name = "_".join(parts[3:]) + + return Photo( + path=path, status=status, filename=path.name, + person_id=person_id, class_name=class_name, date_captured=date_captured, + ) + + +def load_photos() -> list[Photo]: + photos = [] + for status in STATUSES: + folder = CAPTURES_DIR / status + if not folder.exists(): + continue + for f in sorted(folder.iterdir()): + if f.suffix.lower() in IMG_EXTS: + photos.append(parse_photo(f, status)) + return photos + + +def load_events_csv(path: Path) -> list[EventRow]: + if not path.exists(): + return [] + rows = [] + with open(path, "r", newline="", encoding="utf-8") as f: + for row in csv.DictReader(f): + rows.append(EventRow( + timestamp=row.get("timestamp", ""), + track_id=row.get("track_id", ""), + event_type=row.get("event_type", ""), + status=row.get("status", ""), + wearing=row.get("wearing", ""), + missing=row.get("missing", ""), + unknown=row.get("unknown", ""), + photo=row.get("photo", ""), + path=row.get("path", ""), + )) + return rows + + +# ── Core operations ─────────────────────────────────────────────────────────── +def move_photo(photo: Photo, new_status: str) -> Photo: + dst_dir = CAPTURES_DIR / new_status + dst_dir.mkdir(parents=True, exist_ok=True) + dst = dst_dir / photo.filename + shutil.move(str(photo.path), str(dst)) + log.info(f"Moved '{photo.filename}': {photo.status} -> {new_status}") + return 
Photo(path=dst, status=new_status, filename=photo.filename,
                 person_id=photo.person_id, class_name=photo.class_name,
                 date_captured=photo.date_captured)


def rename_photo(photo: Photo, new_name: str) -> Photo:
    """Rename a photo in place, keeping its original extension if omitted."""
    if Path(new_name).suffix.lower() not in IMG_EXTS:
        new_name += photo.path.suffix
    dst = photo.path.parent / new_name
    photo.path.rename(dst)
    log.info(f"Renamed: '{photo.filename}' -> '{new_name}'")
    return Photo(path=dst, status=photo.status, filename=new_name,
                 person_id=photo.person_id, class_name=photo.class_name,
                 date_captured=photo.date_captured)


def assign_id(photo: Photo, pid: str) -> Photo:
    """Rename the photo to '<pid>_<timestamp>_<class>.<ext>' (spaces -> '_')."""
    pid = pid.strip().replace(" ", "_")
    dt = datetime.now().strftime("%Y%m%d_%H%M%S")
    cls = photo.class_name if photo.class_name != "unknown" else "ppe"
    return rename_photo(photo, f"{pid}_{dt}_{cls}{photo.path.suffix}")


def delete_photo(photo: Photo) -> None:
    """Delete the photo file from disk (no confirmation here — caller asks)."""
    photo.path.unlink()
    log.info(f"Deleted: '{photo.filename}' ({photo.status})")


def copy_photo(photo: Photo, dest: Path) -> Path:
    """Copy the photo (with metadata, via copy2) into dest; create dest if needed."""
    dest.mkdir(parents=True, exist_ok=True)
    dst = dest / photo.filename
    shutil.copy2(str(photo.path), str(dst))
    log.info(f"Copied '{photo.filename}' -> {dst}")
    return dst


def export_csv(photos: list[Photo], output: Path) -> None:
    """Write the PPE report CSV.

    Prefers the richer per-event rows from captures/events.csv (written by
    saqr.py); falls back to one row per photo file when no events exist.
    """
    event_rows = load_events_csv(CAPTURES_DIR / "events.csv")

    fields = ["photo", "track_id", "event_type", "status", "timestamp",
              "wearing", "missing", "unknown", "missing_notes",
              *CLASS_COLUMNS, "path"]

    with open(output, "w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=fields)
        w.writeheader()

        if event_rows:
            # Audit-log path: one row per recorded event.
            for r in event_rows:
                w.writerow({
                    "photo": r.photo, "track_id": r.track_id,
                    "event_type": r.event_type, "status": r.status,
                    "timestamp": r.timestamp, "wearing": r.wearing,
                    "missing": r.missing, "unknown": r.unknown,
                    "missing_notes": r.missing_notes,
                    **r.class_flags, "path": r.path,
                })
        else:
            # Fallback path: derive rows from the photo files themselves.
            for p in photos:
                w.writerow({
                    "photo": 
p.filename, "track_id": p.person_id, + "event_type": "", "status": p.status, + "timestamp": p.date_captured, "wearing": "", + "missing": "", "unknown": "", "missing_notes": "", + **p.class_flags, "path": str(p.path), + }) + + count = len(event_rows) if event_rows else len(photos) + log.info(f"CSV exported: {output} ({count} records)") + + +# ── Display ─────────────────────────────────────────────────────────────────── +def print_header(photos): + counts = {s: sum(1 for p in photos if p.status == s) for s in STATUSES} + print("\n" + "=" * 66) + print(_bold(" Saqr - PPE Photo Manager")) + print("=" * 66) + print(f" {_cs('SAFE')} {counts['SAFE']:3d} | " + f"{_cs('PARTIAL')} {counts['PARTIAL']:3d} | " + f"{_cs('UNSAFE')} {counts['UNSAFE']:3d} | Total: {len(photos)}") + print("=" * 66) + + +def print_table(photos): + print(f"\n {'#':>4} {'STATUS':<8} {'ID':<14} {'DATE':<19} FILENAME") + print(" " + "-" * 68) + for i, p in enumerate(photos): + pid = (p.person_id or "-")[:12] + date = (p.date_captured or "-")[:17] + print(f" {i+1:>4} {p.status:<8} {pid:<14} {date:<19} {p.filename[:28]}") + + +def pick_photo(photos, prompt="Select photo") -> Photo | None: + if not photos: + print(" No photos found.") + return None + print_table(photos) + try: + n = int(input(f"\n {prompt} (0=cancel): ")) + if 1 <= n <= len(photos): + return photos[n - 1] + except ValueError: + pass + return None + + +def show_details(photo): + print(f"\n Filename : {photo.filename}") + print(f" Status : {_cs(photo.status)}") + print(f" ID : {photo.person_id or '-'}") + print(f" Date : {photo.date_captured or '-'}") + print(f" Path : {photo.path}") + + +# ── Menu actions ────────────────────────────────────────────────────────────── +def act_list(photos): + print("\n Filter: [1] All [2] SAFE [3] PARTIAL [4] UNSAFE") + ch = input(" Choice: ").strip() + filt = {"2": "SAFE", "3": "PARTIAL", "4": "UNSAFE"}.get(ch) + sub = [p for p in photos if filt is None or p.status == filt] + if sub: + print_table(sub) + 
print(f"\n Showing {len(sub)} photo(s).") + else: + print(" None.") + + +def act_view(photos): + p = pick_photo(photos, "View photo") + if not p: + return + show_details(p) + img = cv2.imread(str(p.path)) + if img is not None: + cv2.imshow(f"Saqr - {p.filename}", img) + print(" Press any key to close.") + cv2.waitKey(0) + cv2.destroyAllWindows() + + +def act_move(photos): + p = pick_photo(photos, "Move photo") + if not p: + return photos + show_details(p) + print(f"\n Move to: [1] SAFE [2] PARTIAL [3] UNSAFE") + t = {"1": "SAFE", "2": "PARTIAL", "3": "UNSAFE"}.get(input(" Choice: ").strip()) + if not t or t == p.status: + return photos + move_photo(p, t) + print(f" Moved -> {_cs(t)}") + return load_photos() + + +def act_rename(photos): + p = pick_photo(photos, "Rename photo") + if not p: + return photos + name = input(" New filename: ").strip() + if name: + rename_photo(p, name) + return load_photos() + + +def act_assign_id(photos): + p = pick_photo(photos, "Assign ID to photo") + if not p: + return photos + pid = input(" Person ID (e.g. W001): ").strip() + if pid: + assign_id(p, pid) + return load_photos() + + +def act_delete(photos): + p = pick_photo(photos, "Delete photo") + if not p: + return photos + show_details(p) + if input(f" Delete '{p.filename}'? 
(yes/no): ").strip().lower() in ("y", "yes"): + delete_photo(p) + print(" Deleted.") + return load_photos() + + +def act_download(photos): + p = pick_photo(photos, "Download/copy photo") + if not p: + return + dest = input(" Destination folder: ").strip() + if dest: + dst = copy_photo(p, Path(dest).expanduser()) + print(f" Copied -> {dst}") + + +def act_export(photos): + default = ROOT / f"ppe_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" + out = input(f" Output [{default.name}]: ").strip() + output = Path(out).expanduser() if out else default + export_csv(photos, output) + print(f" Exported -> {output}") + + +def act_update(photos): + print("\n Re-classify: when PPE compliance changes (UNSAFE -> SAFE etc.)") + p = pick_photo(photos, "Update photo status") + if not p: + return photos + show_details(p) + print(f"\n New status: [1] SAFE [2] PARTIAL [3] UNSAFE") + t = {"1": "SAFE", "2": "PARTIAL", "3": "UNSAFE"}.get(input(" Choice: ").strip()) + if not t or t == p.status: + return photos + move_photo(p, t) + print(f" Updated -> {_cs(t)}") + return load_photos() + + +# ── Main menu ───────────────────────────────────────────────────────────────── +MENU = """ + [1] List photos + [2] View photo + [3] Move photo (change status) + [4] Rename photo + [5] Assign person ID + [6] Delete photo + [7] Download / Copy photo + [8] Export report to CSV + [9] Update status (re-classify) + [0] Exit +""" + + +def run(): + parser = argparse.ArgumentParser(description="Saqr Photo Manager") + parser.add_argument("--export", action="store_true", help="Quick CSV export") + args = parser.parse_args() + + if not CAPTURES_DIR.exists(): + print(f"[ERROR] captures/ not found. 
Run saqr.py first.") + raise SystemExit(1) + + photos = load_photos() + + if args.export: + out = ROOT / f"ppe_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" + export_csv(photos, out) + print(f"Exported: {out}") + return + + log.info("Manager started") + actions = { + "1": act_list, "2": act_view, "3": act_move, "4": act_rename, + "5": act_assign_id, "6": act_delete, "7": act_download, + "8": act_export, "9": act_update, + } + + while True: + photos = load_photos() + print_header(photos) + print(MENU) + ch = input(" Choice: ").strip() + + if ch == "0": + log.info("Manager ended") + print(" Bye.\n") + break + + action = actions.get(ch) + if action: + result = action(photos) + if isinstance(result, list): + photos = result + else: + print(" Unknown option.") + + input("\n Press Enter to continue...") + + +if __name__ == "__main__": + run() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..ad1d250 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +ultralytics>=8.0.0 +opencv-python +numpy +PyYAML +PySide6>=6.5.0 diff --git a/run_local.sh b/run_local.sh new file mode 100755 index 0000000..feb2803 --- /dev/null +++ b/run_local.sh @@ -0,0 +1,110 @@ +#!/bin/bash +# ============================================================================ +# Saqr PPE Detection - Run on Local Laptop +# ============================================================================ +# +# Usage: +# ./run_local.sh # webcam 0 +# ./run_local.sh --source 1 # webcam 1 +# ./run_local.sh --source video.mp4 # video file +# ./run_local.sh --gui # PySide6 GUI +# ./run_local.sh --detect # simple detection (no tracking) +# +# ============================================================================ + +set -e + +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" +cd "$SCRIPT_DIR" + +# ── Defaults ────────────────────────────────────────────────────────────────── +SOURCE="0" +MODEL="models/saqr_best.pt" +CONF="0.35" +MODE="saqr" # saqr | gui | detect +HEADLESS=false 
+MAX_MISSING=90 +MATCH_DIST=250 +CONFIRM=5 + +# ── Parse args ──────────────────────────────────────────────────────────────── +while [[ $# -gt 0 ]]; do + case $1 in + --source) SOURCE="$2"; shift 2 ;; + --model) MODEL="$2"; shift 2 ;; + --conf) CONF="$2"; shift 2 ;; + --gui) MODE="gui"; shift ;; + --detect) MODE="detect"; shift ;; + --headless) HEADLESS=true; shift ;; + --max-missing) MAX_MISSING="$2"; shift 2 ;; + --match-distance) MATCH_DIST="$2"; shift 2 ;; + --confirm) CONFIRM="$2"; shift 2 ;; + *) echo "Unknown arg: $1"; exit 1 ;; + esac +done + +# ── Check model ─────────────────────────────────────────────────────────────── +if [ ! -f "$MODEL" ]; then + echo "[ERROR] Model not found: $MODEL" + echo "" + echo " Train first:" + echo " conda activate AI_MSI_yolo" + echo " python train.py --dataset dataset --epochs 100" + echo "" + echo " Or specify a different model:" + echo " ./run_local.sh --model /path/to/model.pt" + exit 1 +fi + +# ── Activate conda ──────────────────────────────────────────────────────────── +if command -v conda &>/dev/null; then + source "$(conda info --base)/etc/profile.d/conda.sh" 2>/dev/null || true + conda activate AI_MSI_yolo 2>/dev/null || true +fi + +echo "============================================" +echo " Saqr PPE Detection - Local Laptop" +echo "============================================" +echo " Mode : $MODE" +echo " Source : $SOURCE" +echo " Model : $MODEL" +echo " Conf : $CONF" +echo "============================================" +echo "" + +# ── Run ─────────────────────────────────────────────────────────────────────── +HEADLESS_FLAG="" +if [ "$HEADLESS" = true ]; then + HEADLESS_FLAG="--headless" +fi + +case $MODE in + saqr) + echo "Starting PPE tracking..." + echo " Press q to quit, s to save frame." 
+ echo "" + python saqr.py \ + --source "$SOURCE" \ + --model "$MODEL" \ + --conf "$CONF" \ + --max-missing "$MAX_MISSING" \ + --match-distance "$MATCH_DIST" \ + --status-confirm-frames "$CONFIRM" \ + $HEADLESS_FLAG + ;; + gui) + echo "Starting GUI..." + python gui.py \ + --source "$SOURCE" \ + --model "$MODEL" + ;; + detect) + echo "Starting simple detection (no tracking)..." + echo " Press q to quit, s to save frame." + echo "" + python detect.py \ + --source "$SOURCE" \ + --model "$MODEL" \ + --conf "$CONF" + ;; +esac diff --git a/run_robot.sh b/run_robot.sh new file mode 100755 index 0000000..e4741aa --- /dev/null +++ b/run_robot.sh @@ -0,0 +1,116 @@ +#!/bin/bash +# ============================================================================ +# Saqr PPE Detection - Run on Unitree G1 Robot +# ============================================================================ +# +# Run on the robot's physical terminal (with monitor): +# ./run_robot.sh +# ./run_robot.sh --headless # no display +# ./run_robot.sh --source realsense # use pyrealsense2 SDK +# +# ============================================================================ + +set -e + +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" +cd "$SCRIPT_DIR" + +# ── Defaults ────────────────────────────────────────────────────────────────── +SOURCE="realsense" +MODEL="models/saqr_best.pt" +CONF="0.35" +HEADLESS=false +MAX_MISSING=120 +MATCH_DIST=300 +CONFIRM=7 +DEVICE="0" +IMGSZ=320 +HALF=true +STREAM_PORT=0 + +# ── Parse args ──────────────────────────────────────────────────────────────── +while [[ $# -gt 0 ]]; do + case $1 in + --source) SOURCE="$2"; shift 2 ;; + --model) MODEL="$2"; shift 2 ;; + --conf) CONF="$2"; shift 2 ;; + --headless) HEADLESS=true; shift ;; + --max-missing) MAX_MISSING="$2"; shift 2 ;; + --match-distance) MATCH_DIST="$2"; shift 2 ;; + --confirm) CONFIRM="$2"; shift 2 ;; + --device) DEVICE="$2"; shift 2 ;; + --imgsz) IMGSZ="$2"; shift 2 ;; + --no-half) HALF=false; shift ;; + --stream) 
STREAM_PORT="$2"; shift 2 ;; + --cpu) DEVICE="cpu"; HALF=false; shift ;; + *) echo "Unknown arg: $1"; exit 1 ;; + esac +done + +# ── Check model ─────────────────────────────────────────────────────────────── +if [ ! -f "$MODEL" ]; then + echo "[ERROR] Model not found: $MODEL" + echo " Deploy from dev machine: ./deploy.sh" + exit 1 +fi + +# ── Activate conda ──────────────────────────────────────────────────────────── +source ~/miniconda3/etc/profile.d/conda.sh 2>/dev/null || true +conda activate teleimager 2>/dev/null || true + +# ── Fix clock if needed ────────────────────────────────────────────────────── +YEAR=$(date +%Y) +if [ "$YEAR" -lt 2025 ]; then + echo "[WARN] System clock is wrong (year=$YEAR). Fixing..." + echo "123" | sudo -S date -s "2026-04-10 16:00:00" 2>/dev/null || true +fi + +# ── Setup display ───────────────────────────────────────────────────────────── +if [ "$HEADLESS" = true ]; then + export QT_QPA_PLATFORM=offscreen + HEADLESS_FLAG="--headless" + echo "Mode: HEADLESS (no display, results saved to captures/)" +else + xhost + >/dev/null 2>&1 || true + export DISPLAY=:0 + HEADLESS_FLAG="" + echo "Mode: DISPLAY (OpenCV window on monitor)" +fi + +HALF_FLAG="" +if [ "$HALF" = true ]; then + HALF_FLAG="--half" +fi + +STREAM_FLAG="" +if [ "$STREAM_PORT" -gt 0 ]; then + STREAM_FLAG="--stream $STREAM_PORT" +fi + +echo "============================================" +echo " Saqr PPE Detection - Unitree G1 Robot" +echo "============================================" +echo " Source : $SOURCE" +echo " Model : $MODEL" +echo " Device : $DEVICE (half=$HALF, imgsz=$IMGSZ)" +echo " Conf : $CONF" +echo " Stream : ${STREAM_PORT:-disabled}" +echo " Camera : RealSense D435I" +echo "============================================" +echo "" +echo " Press q to quit, s to save frame." 
+echo "" + +# ── Run ─────────────────────────────────────────────────────────────────────── +python saqr.py \ + --source "$SOURCE" \ + --model "$MODEL" \ + --conf "$CONF" \ + --max-missing "$MAX_MISSING" \ + --match-distance "$MATCH_DIST" \ + --status-confirm-frames "$CONFIRM" \ + --device "$DEVICE" \ + --imgsz "$IMGSZ" \ + $HALF_FLAG \ + $STREAM_FLAG \ + $HEADLESS_FLAG diff --git a/saqr.py b/saqr.py new file mode 100644 index 0000000..52b0d87 --- /dev/null +++ b/saqr.py @@ -0,0 +1,909 @@ +""" +Saqr - PPE Safety Tracking +=========================== +Real-time PPE monitoring with person tracking. + +Pipeline: + 1. YOLO detection -> PPE bounding boxes (helmet, no-helmet, vest, ...) + 2. Heuristic grouping -> cluster nearby PPE boxes into person candidates + 3. Person tracker -> assign stable IDs across frames + 4. Compliance check -> SAFE / PARTIAL / UNSAFE per person + 5. Auto-capture -> save latest crop per tracked person + 6. CSV logging -> result.csv (current state) + events.csv (audit log) + +Compliance rules (helmet + vest focus): + SAFE = helmet AND vest detected, no violations + PARTIAL = only one of helmet / vest detected + UNSAFE = no-helmet or no-vest detected, or nothing detected + +Usage: + python saqr.py --source 0 # webcam (OpenCV) + python saqr.py --source realsense # Intel RealSense D435I + python saqr.py --source 1 --model models/saqr_best.pt + python saqr.py --source video.mp4 --headless +""" + +from __future__ import annotations + +import argparse +import csv +import math +import shutil +import time +from dataclasses import dataclass, field +from datetime import datetime +from pathlib import Path +from typing import Dict, List, Optional, Tuple + +import threading +from http.server import HTTPServer, BaseHTTPRequestHandler + +import cv2 +import numpy as np +from ultralytics import YOLO + +from logger import get_logger + +# Optional RealSense support +try: + import pyrealsense2 as rs + HAS_REALSENSE = True +except ImportError: + HAS_REALSENSE = 
False + +log = get_logger("Inference", "saqr") + +# ── Paths ───────────────────────────────────────────────────────────────────── +ROOT = Path(__file__).resolve().parent +CAPTURES_DIR = ROOT / "captures" +RESULT_CSV = CAPTURES_DIR / "result.csv" +EVENTS_CSV = CAPTURES_DIR / "events.csv" + +# ── Colours ─────────────────────────────────────────────────────────────────── +GREEN = (0, 200, 0) +YELLOW = (0, 200, 255) +RED = (0, 0, 220) +WHITE = (255, 255, 255) +BLACK = (0, 0, 0) +GRAY = (120, 120, 120) +CYAN = (200, 200, 0) + +# ── PPE class definitions ──────────────────────────────────────────────────── +STATUSES = ("SAFE", "PARTIAL", "UNSAFE") + +CLASS_ORDER = [ + "boots", "gloves", "goggles", "helmet", + "no-boots", "no-gloves", "no-goggles", "no-helmet", "no-vest", "vest", +] +PPE_SET = set(CLASS_ORDER) + +# Positive -> Negative mapping +POSITIVE_TO_NEGATIVE = { + "helmet": "no-helmet", + "vest": "no-vest", + "boots": "no-boots", + "gloves": "no-gloves", + "goggles": "no-goggles", +} +PPE_DISPLAY_ORDER = ["helmet", "vest", "gloves", "goggles", "boots"] + + +# ── Data classes ────────────────────────────────────────────────────────────── +@dataclass +class PPEItem: + label: str + conf: float + bbox: Tuple[int, int, int, int] # x1, y1, x2, y2 + + +@dataclass +class PersonCandidate: + bbox: Tuple[int, int, int, int] + items: Dict[str, float] # label -> best confidence + detections: List[PPEItem] = field(default_factory=list) + + +@dataclass +class Track: + track_id: int + bbox: Tuple[int, int, int, int] + items: Dict[str, float] + status: str + last_seen_frame: int = 0 + last_seen_iso: str = "" + created_iso: str = "" + frames_missing: int = 0 + photo_path: Optional[Path] = None + announced_status: Optional[str] = None + event_count: int = 0 + pending_status: Optional[str] = None + pending_count: int = 0 + + +# ── Utilities ───────────────────────────────────────────────────────────────── +def now_iso() -> str: + return datetime.now().isoformat(timespec="seconds") + 

def clamp_bbox(bbox, w, h):
    """Clip a (x1, y1, x2, y2) box to the frame rectangle [0, w] x [0, h]."""
    left, top, right, bottom = bbox
    return max(0, left), max(0, top), min(w, right), min(h, bottom)


def expand_bbox(bbox, w, h, sx=0.8, sy=1.5):
    """Grow a box around its integer centre by factors (1+sx, 1+sy), clipped to the frame."""
    left, top, right, bottom = bbox
    width, height = right - left, bottom - top
    mid_x, mid_y = (left + right) // 2, (top + bottom) // 2
    grown_w, grown_h = int(width * (1 + sx)), int(height * (1 + sy))
    new_left = max(0, mid_x - grown_w // 2)
    new_top = max(0, mid_y - grown_h // 2)
    return new_left, new_top, min(w, new_left + grown_w), min(h, new_top + grown_h)


def merge_boxes(a, b):
    """Smallest box covering both a and b."""
    ax1, ay1, ax2, ay2 = a
    bx1, by1, bx2, by2 = b
    return (min(ax1, bx1), min(ay1, by1), max(ax2, bx2), max(ay2, by2))


def box_center(bbox):
    """Geometric centre (floats) of a box."""
    x1, y1, x2, y2 = bbox
    return (x1 + x2) / 2, (y1 + y2) / 2


def box_distance(a, b) -> float:
    """Euclidean distance between the centres of boxes a and b."""
    (ax, ay), (bx, by) = box_center(a), box_center(b)
    return math.hypot(ax - bx, ay - by)


def resolve_model_path(root: Path, model_arg: str) -> Path:
    """Find model weights with fallback: arg -> root/arg -> models/arg."""
    search = (
        Path(model_arg),
        root / model_arg,
        root / "models" / Path(model_arg).name,
    )
    for candidate in search:
        if candidate.exists():
            return candidate
    raise FileNotFoundError(f"Model not found: {model_arg}")


# ── Detection ─────────────────────────────────────────────────────────────────
# Global inference config (set by main(), read by collect_detections)
_INFER_KWARGS: Dict = {"device": "cpu", "half": False, "imgsz": 640}


def collect_detections(frame, model: YOLO, conf: float) -> List[PPEItem]:
    """Run YOLO once on a frame and keep only detections with a known PPE label."""
    prediction = model(frame, conf=conf, verbose=False, **_INFER_KWARGS)[0]
    kept: List[PPEItem] = []
    for box in prediction.boxes:
        name = model.names[int(box.cls)]
        if name in PPE_SET:
            x1, y1, x2, y2 = (int(v) for v in box.xyxy[0])
            kept.append(PPEItem(label=name, conf=float(box.conf), bbox=(x1, y1, x2, y2)))
    return kept


# ── Grouping: PPE items -> Person candidates ─────────────────────────────────
def should_merge(candidate: PersonCandidate, item: PPEItem) -> bool:
    """Heuristic: is this PPE item close enough to belong to the candidate?"""
    cand_x1, cand_y1, cand_x2, cand_y2 = candidate.bbox
    item_x1, item_y1, item_x2, item_y2 = item.bbox
    cand_w, cand_h = cand_x2 - cand_x1, cand_y2 - cand_y1
    item_w, item_h = item_x2 - item_x1, item_y2 - item_y1

    # Centre-to-centre offsets, tolerances scale with the larger box plus a
    # fixed margin; vertical tolerance is looser (people are tall).
    dx = abs((item_x1 + item_x2) / 2 - (cand_x1 + cand_x2) / 2)
    dy = abs((item_y1 + item_y2) / 2 - (cand_y1 + cand_y2) / 2)
    return dx <= max(cand_w, item_w) * 1.2 + 40 and dy <= max(cand_h, item_h) * 1.8 + 50


def group_detections_to_people(detections: List[PPEItem], w: int, h: int) -> List[PersonCandidate]:
    """Cluster PPE detections into person candidates (2-pass merge)."""
    if not detections:
        return []

    # Pass 1: greedy — each detection joins the first compatible candidate,
    # growing its box as it goes; otherwise it seeds a new candidate.
    people: List[PersonCandidate] = []
    for det in detections:
        target = next((p for p in people if should_merge(p, det)), None)
        if target is None:
            people.append(PersonCandidate(
                bbox=det.bbox,
                items={det.label: det.conf},
                detections=[det],
            ))
        else:
            target.bbox = merge_boxes(target.bbox, det.bbox)
            target.items[det.label] = max(target.items.get(det.label, 0.0), det.conf)
            target.detections.append(det)

    # Pass 2: repeatedly fold together candidates whose centres lie within
    # 0.55x of the keeper's larger side, until a full sweep makes no merge.
    dirty = True
    while dirty:
        dirty = False
        folded: List[PersonCandidate] = []
        for person in people:
            absorbed = False
            for keeper in folded:
                keep_w = keeper.bbox[2] - keeper.bbox[0]
                keep_h = keeper.bbox[3] - keeper.bbox[1]
                if box_distance(keeper.bbox, person.bbox) <= max(keep_w, keep_h) * 0.55:
                    keeper.bbox = merge_boxes(keeper.bbox, person.bbox)
                    for label, conf in person.items.items():
                        keeper.items[label] = max(keeper.items.get(label, 0.0), conf)
                    keeper.detections.extend(person.detections)
                    dirty = True
                    absorbed = True
                    break
            if not absorbed:
                folded.append(person)
        people = folded

    # Expand each person bbox for better crop coverage
    for person in people:
        person.bbox = expand_bbox(person.bbox, w, h)

    return people


# ── Status logic (helmet + vest focus) ────────────────────────────────────────
+def status_from_items(items: Dict[str, float]) -> str: + has_helmet = items.get("helmet", 0.0) > items.get("no-helmet", 0.0) and items.get("helmet", 0.0) > 0 + has_vest = items.get("vest", 0.0) > items.get("no-vest", 0.0) and items.get("vest", 0.0) > 0 + no_helmet = items.get("no-helmet", 0.0) > 0 + no_vest = items.get("no-vest", 0.0) > 0 + + if no_helmet or no_vest: + return "UNSAFE" + if has_helmet and has_vest: + return "SAFE" + if has_helmet or has_vest: + return "PARTIAL" + return "UNSAFE" + + +def split_wearing_missing(items: Dict[str, float]) -> Tuple[List[str], List[str], List[str]]: + wearing, missing, unknown = [], [], [] + for pos in PPE_DISPLAY_ORDER: + neg = POSITIVE_TO_NEGATIVE[pos] + pos_conf = items.get(pos, 0.0) + neg_conf = items.get(neg, 0.0) + if pos_conf > neg_conf and pos_conf > 0: + wearing.append(pos) + elif neg_conf >= pos_conf and neg_conf > 0: + missing.append(pos) + else: + unknown.append(pos) + return wearing, missing, unknown + + +# ── CSV Writers ─────────────────────────────────────────────────────────────── +class EventLogger: + FIELDS = ["timestamp", "track_id", "event_type", "status", + "wearing", "missing", "unknown", "photo", "path"] + + def __init__(self, path: Path): + self.path = path + self.path.parent.mkdir(parents=True, exist_ok=True) + if not self.path.exists(): + with open(self.path, "w", newline="", encoding="utf-8") as f: + csv.DictWriter(f, fieldnames=self.FIELDS).writeheader() + + def append(self, row: Dict[str, str]) -> None: + with open(self.path, "a", newline="", encoding="utf-8") as f: + csv.DictWriter(f, fieldnames=self.FIELDS).writerow(row) + + +def write_result_csv(tracks: List[Track], output: Path) -> None: + output.parent.mkdir(parents=True, exist_ok=True) + fields = ["photo", "track_id", "status", "last_seen", + "wearing", "missing", "unknown", *CLASS_ORDER, "path"] + rows = [] + for track in sorted(tracks, key=lambda t: t.track_id): + wearing, missing, unknown = split_wearing_missing(track.items) + row = 
{ + "photo": track.photo_path.name if track.photo_path else "", + "track_id": track.track_id, + "status": track.status, + "last_seen": track.last_seen_iso, + "wearing": ", ".join(wearing), + "missing": ", ".join(missing), + "unknown": ", ".join(unknown), + "path": str(track.photo_path) if track.photo_path else "", + } + for cls in CLASS_ORDER: + row[cls] = 1 if track.items.get(cls, 0.0) > 0 else 0 + rows.append(row) + + with open(output, "w", newline="", encoding="utf-8") as f: + w = csv.DictWriter(f, fieldnames=fields) + w.writeheader() + w.writerows(rows) + + +# ── Person Tracker ──────────────────────────────────────────────────────────── +class PersonTracker: + def __init__( + self, + event_logger: EventLogger, + max_missing: int = 90, + match_distance: float = 250.0, + status_confirm_frames: int = 5, + ): + self.event_logger = event_logger + self.max_missing = max_missing + self.match_distance = match_distance + self.status_confirm_frames = max(1, status_confirm_frames) + self.tracks: Dict[int, Track] = {} + self.next_id = 1 + + def _new_track(self, person: PersonCandidate, frame_idx: int) -> Track: + track = Track( + track_id=self.next_id, + bbox=person.bbox, + items=dict(person.items), + status=status_from_items(person.items), + last_seen_frame=frame_idx, + last_seen_iso=now_iso(), + created_iso=now_iso(), + ) + self.next_id += 1 + self.tracks[track.track_id] = track + return track + + def _match(self, person: PersonCandidate, used: set[int]) -> Optional[Track]: + best, best_dist = None, float("inf") + for tid, track in self.tracks.items(): + if tid in used: + continue + dist = box_distance(track.bbox, person.bbox) + if dist < best_dist and dist <= self.match_distance: + best_dist = dist + best = track + return best + + def update(self, people: List[PersonCandidate], frame_idx: int): + used: set[int] = set() + created: List[Track] = [] + changed: List[Track] = [] + + for person in people: + track = self._match(person, used) + if track is None: + track = 
self._new_track(person, frame_idx) + created.append(track) + else: + new_status = status_from_items(person.items) + track.bbox = person.bbox + track.items = dict(person.items) + track.last_seen_frame = frame_idx + track.last_seen_iso = now_iso() + track.frames_missing = 0 + + if new_status != track.status: + if track.pending_status == new_status: + track.pending_count += 1 + else: + track.pending_status = new_status + track.pending_count = 1 + if track.pending_count >= self.status_confirm_frames: + track.status = new_status + track.pending_status = None + track.pending_count = 0 + changed.append(track) + else: + track.pending_status = None + track.pending_count = 0 + + used.add(track.track_id) + + # Age and prune missing tracks + stale = [] + for tid, track in self.tracks.items(): + if tid not in used: + track.frames_missing += 1 + if track.frames_missing > self.max_missing: + stale.append(tid) + for tid in stale: + del self.tracks[tid] + + return created, changed + + def visible_tracks(self) -> List[Track]: + return [t for t in self.tracks.values() if t.frames_missing == 0] + + +# ── Track image + event ─────────────────────────────────────────────────────── +def save_track_image(frame, track: Track, capture_dirs: Dict[str, Path]) -> Optional[Path]: + h, w = frame.shape[:2] + x1, y1, x2, y2 = clamp_bbox(track.bbox, w, h) + if x2 <= x1 or y2 <= y1: + return None + crop = frame[y1:y2, x1:x2] + if crop.size == 0: + return None + + target = capture_dirs[track.status] / f"track_{track.track_id:04d}.jpg" + # Move old image if status folder changed + if track.photo_path and track.photo_path != target and track.photo_path.exists(): + try: + track.photo_path.unlink() + except OSError: + pass + + cv2.imwrite(str(target), crop) + track.photo_path = target + return target + + +def emit_event( + track: Track, + event_logger: EventLogger, + event_type: str = "STATUS_CHANGE", + force: bool = False, +) -> None: + if track.photo_path is None: + return + if not force and 
track.announced_status == track.status: + return + + wearing, missing, unknown = split_wearing_missing(track.items) + msg = ( + f"ID {track.track_id:04d} | {event_type} | {track.status} | " + f"wearing: {', '.join(wearing) or 'none'} | " + f"missing: {', '.join(missing) or 'none'} | " + f"unknown: {', '.join(unknown) or 'none'}" + ) + print(msg, flush=True) + + event_logger.append({ + "timestamp": now_iso(), + "track_id": str(track.track_id), + "event_type": event_type, + "status": track.status, + "wearing": ", ".join(wearing), + "missing": ", ".join(missing), + "unknown": ", ".join(unknown), + "photo": track.photo_path.name if track.photo_path else "", + "path": str(track.photo_path) if track.photo_path else "", + }) + track.announced_status = track.status + track.event_count += 1 + + +# ── Drawing ─────────────────────────────────────────────────────────────────── +def status_color(status: str) -> Tuple: + return {"SAFE": GREEN, "PARTIAL": YELLOW, "UNSAFE": RED}.get(status, GRAY) + + +def draw_track(frame, track: Track): + x1, y1, x2, y2 = track.bbox + color = status_color(track.status) + cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2) + + wearing, missing, unknown = split_wearing_missing(track.items) + line1 = f"ID {track.track_id:04d} {track.status}" + w_str = ", ".join(wearing) if wearing else "none" + m_str = ", ".join(missing) if missing else "-" + line2 = f"W:{w_str} M:{m_str}" + + (tw1, th1), _ = cv2.getTextSize(line1, cv2.FONT_HERSHEY_SIMPLEX, 0.55, 1) + (tw2, th2), _ = cv2.getTextSize(line2, cv2.FONT_HERSHEY_SIMPLEX, 0.40, 1) + tw = max(tw1, tw2) + 8 + total_h = th1 + th2 + 12 + y_top = max(0, y1 - total_h - 2) + + cv2.rectangle(frame, (x1, y_top), (x1 + tw, y1), color, -1) + cv2.putText(frame, line1, (x1 + 4, y_top + th1 + 2), + cv2.FONT_HERSHEY_SIMPLEX, 0.55, WHITE, 1, cv2.LINE_AA) + cv2.putText(frame, line2, (x1 + 4, y_top + th1 + th2 + 8), + cv2.FONT_HERSHEY_SIMPLEX, 0.40, WHITE, 1, cv2.LINE_AA) + + +def draw_counters(frame, tracks: List[Track], 
fps: float): + counts = {s: 0 for s in STATUSES} + for t in tracks: + counts[t.status] += 1 + + lines = [ + (f"FPS: {fps:.1f}", WHITE), + (f"SAFE {counts['SAFE']}", GREEN), + (f"PARTIAL {counts['PARTIAL']}", YELLOW), + (f"UNSAFE {counts['UNSAFE']}", RED), + (f"TRACKS {len(tracks)}", CYAN), + ] + y = 24 + for text, color in lines: + cv2.putText(frame, text, (10, y), cv2.FONT_HERSHEY_SIMPLEX, 0.7, BLACK, 4, cv2.LINE_AA) + cv2.putText(frame, text, (10, y), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2, cv2.LINE_AA) + y += 28 + + +# ── Frame processing ────────────────────────────────────────────────────────── +def process_frame( + frame, + model: YOLO, + tracker: PersonTracker, + frame_idx: int, + conf: float, + capture_dirs: Dict[str, Path], + write_csv: bool = True, +): + annotated = frame.copy() + h, w = annotated.shape[:2] + + detections = collect_detections(frame, model, conf) + candidates = group_detections_to_people(detections, w, h) + created, changed = tracker.update(candidates, frame_idx) + visible = tracker.visible_tracks() + + created_ids = {t.track_id for t in created} + changed_ids = {t.track_id for t in changed} + event_ids = created_ids | changed_ids + + for track in visible: + save_track_image(frame, track, capture_dirs) + if track.track_id in event_ids: + ev_type = "NEW" if track.track_id in created_ids else "STATUS_CHANGE" + emit_event(track, tracker.event_logger, ev_type) + draw_track(annotated, track) + + if write_csv: + write_result_csv(list(tracker.tracks.values()), RESULT_CSV) + + return annotated, visible + + +# ── MJPEG Stream Server (view on laptop browser) ───────────────────────────── +_stream_frame: Optional[bytes] = None +_stream_lock = threading.Lock() + + +class MJPEGHandler(BaseHTTPRequestHandler): + def do_GET(self): + if self.path == "/": + self.send_response(200) + self.send_header("Content-Type", "text/html") + self.end_headers() + self.wfile.write(b'' + b'' + b'') + elif self.path == "/stream": + self.send_response(200) + 
self.send_header("Content-Type", "multipart/x-mixed-replace; boundary=frame") + self.end_headers() + while True: + with _stream_lock: + jpeg = _stream_frame + if jpeg is None: + time.sleep(0.03) + continue + try: + self.wfile.write(b"--frame\r\n" + b"Content-Type: image/jpeg\r\n\r\n" + jpeg + b"\r\n") + except BrokenPipeError: + break + else: + self.send_error(404) + + def log_message(self, format, *args): + pass # silence per-request logs + + +def start_stream_server(port: int = 8080): + server = HTTPServer(("0.0.0.0", port), MJPEGHandler) + t = threading.Thread(target=server.serve_forever, daemon=True) + t.start() + log.info(f"MJPEG stream server started on http://0.0.0.0:{port}") + return server + + +def update_stream_frame(frame): + global _stream_frame + _, jpeg = cv2.imencode(".jpg", frame, [cv2.IMWRITE_JPEG_QUALITY, 70]) + with _stream_lock: + _stream_frame = jpeg.tobytes() + + +# ── Camera / video ──────────────────────────────────────────────────────────── +class RealSenseCapture: + """Wraps pyrealsense2 pipeline with an OpenCV-like read() interface.""" + + def __init__(self, width: int = 640, height: int = 480, fps: int = 30, + serial: Optional[str] = None): + if not HAS_REALSENSE: + raise RuntimeError("pyrealsense2 not installed") + self.pipeline = rs.pipeline() + cfg = rs.config() + if serial: + cfg.enable_device(serial) + cfg.enable_stream(rs.stream.color, width, height, rs.format.bgr8, fps) + self.profile = self.pipeline.start(cfg) + self._open = True + dev = self.profile.get_device() + log.info(f"RealSense opened | {dev.get_info(rs.camera_info.name)} " + f"serial={dev.get_info(rs.camera_info.serial_number)} " + f"{width}x{height}@{fps}") + + def isOpened(self) -> bool: + return self._open + + def read(self): + if not self._open: + return False, None + try: + frames = self.pipeline.wait_for_frames(timeout_ms=3000) + color = frames.get_color_frame() + if not color: + return False, None + return True, np.asanyarray(color.get_data()) + except Exception: 
+ return False, None + + def release(self): + if self._open: + self.pipeline.stop() + self._open = False + + +def open_capture(source: str): + # RealSense source: "realsense" or "realsense:SERIAL" + if source.lower().startswith("realsense"): + serial = None + if ":" in source: + serial = source.split(":", 1)[1] + return RealSenseCapture(width=640, height=480, fps=30, serial=serial) + + if str(source).isdigit(): + idx = int(source) + cap = cv2.VideoCapture(idx) + if cap.isOpened(): + return cap + cap = cv2.VideoCapture(idx, cv2.CAP_ANY) + if cap.isOpened(): + return cap + cap = cv2.VideoCapture(idx, cv2.CAP_V4L2) + return cap + + # V4L2 device path + if source.startswith("/dev/video"): + cap = cv2.VideoCapture(source, cv2.CAP_V4L2) + if cap.isOpened(): + cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) + cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"MJPG")) + cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640) + cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480) + cap.set(cv2.CAP_PROP_FPS, 30) + return cap + + return cv2.VideoCapture(source) + + +def setup_capture_dirs(base: Path) -> Dict[str, Path]: + dirs = {} + for s in STATUSES: + d = base / "captures" / s + d.mkdir(parents=True, exist_ok=True) + dirs[s] = d + return dirs + + +def run_video( + model: YOLO, + source: str, + conf: float, + capture_dirs: Dict[str, Path], + show_gui: bool, + csv_every_frame: bool, + max_missing: int, + match_distance: float, + status_confirm_frames: int, + stream_port: int = 0, +) -> None: + cap = open_capture(source) + if not cap.isOpened(): + log.error(f"Cannot open source: {source}") + return + + ok, first = cap.read() + if not ok or first is None or first.size == 0: + log.error(f"Cannot read first frame from source: {source}") + cap.release() + return + + event_logger = EventLogger(EVENTS_CSV) + tracker = PersonTracker( + event_logger=event_logger, + max_missing=max_missing, + match_distance=match_distance, + status_confirm_frames=status_confirm_frames, + ) + + # Start MJPEG stream server if requested + if 
stream_port > 0: + start_stream_server(stream_port) + + log.info(f"Session started | source={source}") + if show_gui: + print("Running - press q to quit, s to save frame.") + + prev = time.time() + frame_idx = 0 + frame = first + + while True: + frame_idx += 1 + try: + annotated, visible = process_frame( + frame, model, tracker, frame_idx, conf, + capture_dirs, write_csv=csv_every_frame, + ) + except Exception as e: + log.exception(f"Frame error #{frame_idx}: {e}") + annotated = frame + visible = tracker.visible_tracks() + + now_t = time.time() + fps = 1.0 / max(now_t - prev, 1e-9) + prev = now_t + + draw_counters(annotated, visible, fps) + + # Send to stream + if stream_port > 0: + update_stream_frame(annotated) + + if show_gui: + cv2.imshow("Saqr PPE Tracking", annotated) + key = cv2.waitKey(1) & 0xFF + if key == ord("q"): + break + if key == ord("s"): + cv2.imwrite("saved_frame.jpg", annotated) + log.info("Frame saved: saved_frame.jpg") + + ret, frame = cap.read() + if not ret: + break + + cap.release() + if show_gui: + cv2.destroyAllWindows() + + # Final CSV write + write_result_csv(list(tracker.tracks.values()), RESULT_CSV) + log.info(f"Session ended | frames={frame_idx} tracks_created={tracker.next_id - 1}") + + +def run_image(model: YOLO, path: str, conf: float, capture_dirs: Dict[str, Path], show_gui: bool): + frame = cv2.imread(path) + if frame is None: + log.error(f"Cannot read image: {path}") + return + + event_logger = EventLogger(EVENTS_CSV) + tracker = PersonTracker(event_logger=event_logger) + + annotated, visible = process_frame(frame, model, tracker, 1, conf, capture_dirs) + draw_counters(annotated, visible, 0.0) + + out = Path(path).stem + "_saqr.jpg" + cv2.imwrite(out, annotated) + log.info(f"Result saved: {out}") + + if show_gui: + cv2.imshow("Saqr PPE Tracking", annotated) + cv2.waitKey(0) + cv2.destroyAllWindows() + + +# ── CLI ─────────────────────────────────────────────────────────────────────── +def main(): + parser = 
argparse.ArgumentParser(description="Saqr PPE detection with tracking") + parser.add_argument("--source", default="0", + help="0/1=webcam, realsense, realsense:SERIAL, /dev/videoX, or video path") + parser.add_argument("--model", default="models/saqr_best.pt", + help="Trained YOLO weights") + parser.add_argument("--conf", type=float, default=0.35, + help="Detection confidence threshold") + parser.add_argument("--max-missing", type=int, default=90, + help="Frames to keep a lost track alive") + parser.add_argument("--match-distance", type=float, default=250.0, + help="Max pixel distance for track matching") + parser.add_argument("--status-confirm-frames", type=int, default=5, + help="Frames needed to confirm a status change") + parser.add_argument("--headless", action="store_true", + help="Disable OpenCV GUI window") + parser.add_argument("--stream", type=int, default=0, metavar="PORT", + help="Start MJPEG stream on this port (e.g. --stream 8080)") + parser.add_argument("--csv-on-exit", action="store_true", + help="Write result.csv only at session end") + # GPU / inference tuning + parser.add_argument("--device", default="0", + help="Device: 'cpu', '0' (first GPU), 'cuda:0', etc.") + parser.add_argument("--half", action="store_true", + help="Enable FP16 inference (Jetson / RTX GPUs)") + parser.add_argument("--imgsz", type=int, default=320, + help="Inference image size (320 fast, 640 accurate)") + args = parser.parse_args() + + # ── Configure global inference kwargs ──────────────────────────────── + global _INFER_KWARGS + _INFER_KWARGS = { + "device": args.device, + "half": args.half, + "imgsz": args.imgsz, + } + + # ── Log CUDA status ────────────────────────────────────────────────── + try: + import torch + if torch.cuda.is_available(): + dev_name = torch.cuda.get_device_name(0) + log.info(f"CUDA available: {dev_name} | torch={torch.__version__} | " + f"cuda={torch.version.cuda}") + else: + log.warning("CUDA not available - running on CPU (slow)") + if args.device 
!= "cpu": + log.warning(f"Falling back to CPU (you requested device={args.device})") + _INFER_KWARGS["device"] = "cpu" + _INFER_KWARGS["half"] = False + except ImportError: + log.warning("PyTorch not found") + + log.info(f"Inference config: device={_INFER_KWARGS['device']} " + f"half={_INFER_KWARGS['half']} imgsz={_INFER_KWARGS['imgsz']}") + + capture_dirs = setup_capture_dirs(ROOT) + + try: + model_path = resolve_model_path(ROOT, args.model) + except FileNotFoundError as e: + log.error(str(e)) + log.error("Train first: python train.py --dataset dataset") + raise SystemExit(1) + + log.info(f"Loading model: {model_path}") + model = YOLO(str(model_path)) + log.info(f"Classes: {list(model.names.values())}") + + source = args.source + is_live = ( + source.isdigit() + or source.lower().startswith("realsense") + or source.startswith("/dev/video") + ) + is_video_file = source.lower().endswith( + (".mp4", ".avi", ".mov", ".mkv", ".webm") + ) + + if is_live or is_video_file: + run_video( + model, source, args.conf, capture_dirs, + show_gui=not args.headless, + csv_every_frame=not args.csv_on_exit, + max_missing=args.max_missing, + match_distance=args.match_distance, + status_confirm_frames=args.status_confirm_frames, + stream_port=args.stream, + ) + elif Path(source).exists(): + run_image(model, source, args.conf, capture_dirs, show_gui=not args.headless) + else: + log.error(f"Source not found: {source}") + raise SystemExit(1) + + +if __name__ == "__main__": + main() diff --git a/saqr_g1_bridge.py b/saqr_g1_bridge.py new file mode 100644 index 0000000..f95089b --- /dev/null +++ b/saqr_g1_bridge.py @@ -0,0 +1,376 @@ +#!/usr/bin/env python3 +""" +saqr_g1_bridge.py + +Bridge between Saqr PPE detection and the Unitree G1 robot. + +Spawns Saqr (saqr.py in this same folder) as a subprocess, parses its event +stream, and on each per-person status transition: + + * UNSAFE -> announce "Not safe!" 
via the G1 onboard TtsMaker (English, + speaker_id=2) AND run the 'reject' arm action (id=13). + * SAFE -> announce "Safe!" via the G1 onboard TtsMaker. No arm motion. + * PARTIAL -> nothing. + +Both DDS clients (G1ArmActionClient and G1 AudioClient) share a single +ChannelFactoryInitialize call. The TTS speaker_id was identified by running +Project/Sanad/voice_example.py mode 6 — speaker_id=2 is English on current +G1 firmware (speaker_id=0 is Chinese regardless of input text). + +Saqr event line format (from emit_event in saqr.py): + ID 0001 | NEW | UNSAFE | wearing: ... | missing: ... | unknown: ... + ID 0001 | STATUS_CHANGE | SAFE | wearing: ... | missing: ... | unknown: ... + +Usage: + # default: webcam, default DDS interface + python3 saqr_g1_bridge.py + + # on the robot + python3 saqr_g1_bridge.py --iface eth0 --source realsense --headless + + # dry run (no robot movement / TTS, just print decisions) + python3 saqr_g1_bridge.py --dry-run + + # forward extra args to saqr.py after a `--` + python3 saqr_g1_bridge.py --iface eth0 -- --conf 0.4 --imgsz 640 +""" + +from __future__ import annotations + +import argparse +import os +import re +import signal +import subprocess +import sys +import threading +import time +from pathlib import Path +from typing import Dict, Optional + + +# ── Defaults ───────────────────────────────────────────────────────────────── +HERE = Path(__file__).resolve().parent +SAQR_DIR = HERE # bridge lives next to saqr.py +SAQR_SCRIPT = SAQR_DIR / "saqr.py" + +DANGER_STATUS = "UNSAFE" +SAFE_STATUS = "SAFE" +REJECT_ACTION = "reject" +RELEASE_ACTION = "release arm" + +# G1 onboard TtsMaker (see Project/Sanad/voice_example.py mode 6). +# speaker_id=2 was confirmed English on current G1 firmware. +TTS_SPEAKER_ID = 2 +TTS_VOLUME = 100 + +TTS_TEXT_SAFE = "Safe to enter. Have a good day." +TTS_UNSAFE_WITH_MISSING = ( + "Please stop. Wear your proper safety equipment. You are missing {items}." +) +TTS_UNSAFE_GENERIC = ( + "Please stop. 
Wear your proper safety equipment." +) + +# ID NNNN | EVENT_TYPE | STATUS | wearing: ... | missing: ... | unknown: ... +EVENT_RE = re.compile( + r"^ID\s+(?P\d+)\s*\|\s*" + r"(?PNEW|STATUS_CHANGE)\s*\|\s*" + r"(?PSAFE|PARTIAL|UNSAFE)\s*\|\s*" + r"wearing:\s*(?P[^|]*?)\s*\|\s*" + r"missing:\s*(?P[^|]*?)\s*\|\s*" + r"unknown:\s*(?P.*?)\s*$" +) + + +def _parse_list_field(s: str) -> list: + """Parse 'helmet, vest' or 'none' into a list of items.""" + s = (s or "").strip() + if not s or s.lower() == "none": + return [] + return [x.strip() for x in s.split(",") if x.strip()] + + +def _human_join(items: list) -> str: + """Join a list in natural English: 'helmet and vest', 'a, b, and c'.""" + if not items: + return "" + if len(items) == 1: + return items[0] + if len(items) == 2: + return f"{items[0]} and {items[1]}" + return ", ".join(items[:-1]) + f", and {items[-1]}" + + +def build_unsafe_tts(missing: list) -> str: + if not missing: + return TTS_UNSAFE_GENERIC + return TTS_UNSAFE_WITH_MISSING.format(items=_human_join(missing)) + + +# ── G1 robot controller (lazy import: SDK only loaded when not in dry-run) ─── +class RobotController: + """Owns both the G1 arm action client and the G1 audio (TTS) client. + + A single ChannelFactoryInitialize call is shared by both clients. 
+ """ + + def __init__( + self, + iface: Optional[str], + timeout: float, + dry_run: bool, + tts_speaker_id: int, + ): + self.dry_run = dry_run + self.tts_speaker_id = tts_speaker_id + self.arm_client = None + self.audio_client = None + self._action_map = None + + if dry_run: + print("[BRIDGE] DRY RUN — G1 SDK will not be loaded.", flush=True) + return + + from unitree_sdk2py.core.channel import ChannelFactoryInitialize + from unitree_sdk2py.g1.arm.g1_arm_action_client import ( + G1ArmActionClient, + action_map, + ) + from unitree_sdk2py.g1.audio.g1_audio_client import AudioClient + + self._action_map = action_map + + if iface: + ChannelFactoryInitialize(0, iface) + else: + ChannelFactoryInitialize(0) + + self.arm_client = G1ArmActionClient() + self.arm_client.SetTimeout(timeout) + self.arm_client.Init() + print(f"[BRIDGE] G1ArmActionClient ready (iface={iface or 'default'})", + flush=True) + + self.audio_client = AudioClient() + self.audio_client.SetTimeout(timeout) + self.audio_client.Init() + try: + self.audio_client.SetVolume(TTS_VOLUME) + except Exception as e: + print(f"[BRIDGE][WARN] AudioClient.SetVolume failed: {e}", flush=True) + print(f"[BRIDGE] G1 AudioClient ready (speaker_id={tts_speaker_id})", + flush=True) + + # ── TTS ───────────────────────────────────────────────────────────────── + def speak(self, text: str): + if self.dry_run: + print(f"[BRIDGE] (dry) would TtsMaker({text!r}, " + f"speaker_id={self.tts_speaker_id})", flush=True) + return + if self.audio_client is None: + return + try: + print(f"[BRIDGE] tts -> {text!r}", flush=True) + code = self.audio_client.TtsMaker(text, self.tts_speaker_id) + if code != 0: + print(f"[BRIDGE][WARN] TtsMaker return code = {code}", flush=True) + except Exception as e: + print(f"[BRIDGE][ERR] TtsMaker failed: {e}", flush=True) + + # ── Arm ───────────────────────────────────────────────────────────────── + def reject(self, release_after: float): + if self.dry_run: + print(f"[BRIDGE] (dry) would run 
'{REJECT_ACTION}' " + f"then release after {release_after:.1f}s", flush=True) + return + if self.arm_client is None or self._action_map is None: + return + if REJECT_ACTION not in self._action_map: + print(f"[BRIDGE][ERR] '{REJECT_ACTION}' not in SDK action_map", + flush=True) + return + print(f"[BRIDGE] -> {REJECT_ACTION}", flush=True) + self.arm_client.ExecuteAction(self._action_map[REJECT_ACTION]) + if release_after > 0: + time.sleep(release_after) + print(f"[BRIDGE] -> {RELEASE_ACTION}", flush=True) + self.arm_client.ExecuteAction(self._action_map[RELEASE_ACTION]) + + +# ── Bridge ─────────────────────────────────────────────────────────────────── +class Bridge: + def __init__( + self, + robot: RobotController, + cooldown_s: float, + release_after_s: float, + ): + self.robot = robot + self.cooldown_s = cooldown_s + self.release_after_s = release_after_s + self.last_status: Dict[int, str] = {} + # Per-id cooldown is keyed by (track_id, status) so a SAFE announce + # and an UNSAFE announce don't share the same timer. + self.last_trigger_t: Dict[tuple[int, str], float] = {} + self._lock = threading.Lock() + + def handle_line(self, line: str): + line = line.rstrip() + if not line: + return + # Always echo Saqr output so the user still sees the live stream. + print(line, flush=True) + + m = EVENT_RE.match(line) + if not m: + return + + track_id = int(m.group("id")) + status = m.group("status") + missing = _parse_list_field(m.group("missing")) + + with self._lock: + prev = self.last_status.get(track_id) + self.last_status[track_id] = status + + # Only SAFE / UNSAFE transitions trigger the robot. PARTIAL is silent. + if status not in (DANGER_STATUS, SAFE_STATUS): + return + + # Only fire on transitions, not on every NEW/STATUS_CHANGE for the + # same status. 
+ if prev == status: + return + + now = time.time() + last_t = self.last_trigger_t.get((track_id, status), 0.0) + if (now - last_t) < self.cooldown_s: + return + self.last_trigger_t[(track_id, status)] = now + + # Run robot actions outside the lock so we don't block parsing. + try: + if status == DANGER_STATUS: + self.robot.speak(build_unsafe_tts(missing)) + self.robot.reject(release_after=self.release_after_s) + else: # SAFE + self.robot.speak(TTS_TEXT_SAFE) + except Exception as e: + print(f"[BRIDGE][ERR] robot action failed: {e}", flush=True) + + +# ── Saqr subprocess management ─────────────────────────────────────────────── +def build_saqr_cmd(saqr_extra_args: list[str]) -> list[str]: + if not SAQR_SCRIPT.exists(): + sys.exit(f"[BRIDGE][FATAL] saqr.py not found at: {SAQR_SCRIPT}") + # -u for unbuffered stdout (so events arrive line-by-line). + return [sys.executable, "-u", str(SAQR_SCRIPT), *saqr_extra_args] + + +def split_argv(argv: list[str]) -> tuple[list[str], list[str]]: + """Split bridge args from saqr passthrough args at the first '--'.""" + if "--" in argv: + idx = argv.index("--") + return argv[:idx], argv[idx + 1 :] + return argv, [] + + +def main(): + bridge_argv, saqr_extra = split_argv(sys.argv[1:]) + + ap = argparse.ArgumentParser( + description="Bridge Saqr PPE events to the G1 arm 'reject' action." + ) + ap.add_argument("--iface", default=None, + help="DDS network interface (e.g. enp3s0). 
                         Optional.")
    ap.add_argument("--timeout", type=float, default=10.0,
                    help="G1 arm client timeout (seconds).")
    ap.add_argument("--cooldown", type=float, default=8.0,
                    help="Per-track-id seconds before reject can re-trigger.")
    ap.add_argument("--release-after", type=float, default=2.0,
                    help="Seconds before auto-running 'release arm' (0 = never).")
    ap.add_argument("--dry-run", action="store_true",
                    help="Parse and decide but never call the SDK.")
    ap.add_argument("--speaker-id", type=int, default=TTS_SPEAKER_ID,
                    help=f"G1 TtsMaker speaker_id (default {TTS_SPEAKER_ID}, English).")

    # Convenience pass-throughs to saqr.py (you can also use `-- ...`).
    ap.add_argument("--source", default=None,
                    help="Saqr --source (0/realsense/path). Default: leave to saqr.")
    ap.add_argument("--headless", action="store_true",
                    help="Pass --headless to saqr.")
    ap.add_argument("--saqr-conf", type=float, default=None,
                    help="Pass --conf to saqr.")
    ap.add_argument("--imgsz", type=int, default=None,
                    help="Pass --imgsz to saqr.")
    ap.add_argument("--device", default=None,
                    help="Pass --device to saqr (e.g. cpu / 0 / cuda:0).")

    args = ap.parse_args(bridge_argv)

    # Build saqr args from convenience flags + raw passthrough.
    saqr_args: list[str] = []
    if args.source is not None:
        saqr_args += ["--source", args.source]
    if args.headless:
        saqr_args += ["--headless"]
    if args.saqr_conf is not None:
        saqr_args += ["--conf", str(args.saqr_conf)]
    if args.imgsz is not None:
        saqr_args += ["--imgsz", str(args.imgsz)]
    if args.device is not None:
        saqr_args += ["--device", args.device]
    saqr_args += saqr_extra

    robot = RobotController(
        iface=args.iface,
        timeout=args.timeout,
        dry_run=args.dry_run,
        tts_speaker_id=args.speaker_id,
    )
    bridge = Bridge(
        robot=robot,
        cooldown_s=args.cooldown,
        release_after_s=args.release_after,
    )

    cmd = build_saqr_cmd(saqr_args)
    print(f"[BRIDGE] launching: {' '.join(cmd)}", flush=True)
    print(f"[BRIDGE] cwd: {SAQR_DIR}", flush=True)

    env = os.environ.copy()
    # Belt-and-braces with the `-u` flag: child must never block-buffer.
    env["PYTHONUNBUFFERED"] = "1"

    proc = subprocess.Popen(
        cmd,
        cwd=str(SAQR_DIR),
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # merge stderr so tracebacks are echoed too
        bufsize=1,                 # line-buffered (valid because text=True)
        text=True,
        env=env,
    )

    def _forward_signal(signum, _frame):
        # Forward Ctrl-C / SIGTERM to the child so saqr shuts down cleanly;
        # the read loop below then ends when the child's stdout closes.
        print(f"[BRIDGE] signal {signum} -> stopping saqr", flush=True)
        try:
            proc.send_signal(signum)
        except Exception:
            pass  # child may already be gone

    signal.signal(signal.SIGINT, _forward_signal)
    signal.signal(signal.SIGTERM, _forward_signal)

    try:
        assert proc.stdout is not None
        for line in proc.stdout:
            bridge.handle_line(line)
    finally:
        rc = proc.wait()
        print(f"[BRIDGE] saqr exited rc={rc}", flush=True)
        # NOTE(review): raising SystemExit inside `finally` replaces any
        # in-flight exception; presumably intentional here so the bridge's
        # exit code always mirrors saqr's — confirm.
        sys.exit(rc)


if __name__ == "__main__":
    main()
diff --git a/train.py b/train.py
new file mode 100644
index 0000000..a3b159a
--- /dev/null
+++ b/train.py
@@ -0,0 +1,118 @@
"""
Saqr - PPE Detection | Training Script
=========================================
Train YOLO11n object detection on a PPE dataset (10 classes).
+ +Classes: helmet, no-helmet, vest, no-vest, boots, no-boots, + gloves, no-gloves, goggles, no-goggles + +Usage: + python train.py --dataset dataset + python train.py --dataset dataset --epochs 50 --batch 8 +""" + +import argparse +import shutil +from pathlib import Path + +import yaml + +from logger import get_logger + +log = get_logger("Training", "train") + +EXPECTED_CLASSES = [ + "boots", "gloves", "goggles", "helmet", "no-boots", + "no-gloves", "no-goggles", "no-helmet", "no-vest", "vest", +] + + +def fix_data_yaml(dataset_root: Path) -> Path: + """Ensure data.yaml has correct absolute paths for each split.""" + yaml_path = dataset_root / "data.yaml" + if not yaml_path.exists(): + log.error(f"data.yaml not found at {yaml_path}") + raise SystemExit(1) + + with open(yaml_path) as f: + cfg = yaml.safe_load(f) + + changed = False + for key, subdir in [("train", "train"), ("val", "valid"), ("test", "test")]: + img_dir = dataset_root / subdir / "images" + if img_dir.exists() and cfg.get(key) != str(img_dir): + cfg[key] = str(img_dir) + changed = True + + if "path" not in cfg or cfg["path"] != str(dataset_root): + cfg["path"] = str(dataset_root) + changed = True + + if changed: + with open(yaml_path, "w") as f: + yaml.dump(cfg, f, default_flow_style=False) + log.info(f"Fixed data.yaml paths -> {yaml_path}") + + log.info(f"Classes ({cfg.get('nc', '?')}): {cfg.get('names', [])}") + return yaml_path + + +def main(): + parser = argparse.ArgumentParser(description="Train Saqr PPE detector (YOLO11n)") + parser.add_argument("--dataset", default="dataset", + help="Root folder containing data.yaml + train/valid/test") + parser.add_argument("--epochs", type=int, default=100) + parser.add_argument("--imgsz", type=int, default=640) + parser.add_argument("--batch", type=int, default=16) + parser.add_argument("--model", default="yolo11n.pt", + help="Base YOLO model (auto-downloaded if not present)") + parser.add_argument("--name", default="saqr_det") + 
parser.add_argument("--device", default="0", + help="Training device: 'cpu', '0', 'cuda:0', etc.") + args = parser.parse_args() + + root = Path(__file__).parent + dataset_root = root / args.dataset + if not dataset_root.exists(): + log.error(f"Dataset folder not found: {dataset_root}") + raise SystemExit(1) + + yaml_path = fix_data_yaml(dataset_root) + + from ultralytics import YOLO + + log.info(f"Loading base model: {args.model}") + model = YOLO(args.model) + + log.info(f"Training | epochs={args.epochs} imgsz={args.imgsz} " + f"batch={args.batch} device={args.device}") + model.train( + data=str(yaml_path), + epochs=args.epochs, + imgsz=args.imgsz, + batch=args.batch, + device=args.device, + name=args.name, + project=str(root / "runs" / "train"), + exist_ok=True, + ) + + # Copy best/last weights to models/ + models_dir = root / "models" + models_dir.mkdir(exist_ok=True) + weights_dir = root / "runs" / "train" / args.name / "weights" + + for name in ("best.pt", "last.pt"): + src = weights_dir / name + dst = models_dir / f"saqr_{name}" + if src.exists(): + shutil.copy(src, dst) + log.info(f"Saved: {dst}") + + metrics = model.val() + log.info(f"mAP50={metrics.box.map50:.4f} mAP50-95={metrics.box.map:.4f}") + log.info("Next: python saqr.py --source 0") + + +if __name__ == "__main__": + main() diff --git a/use case catalogue.pdf b/use case catalogue.pdf new file mode 100644 index 0000000..d35aea1 Binary files /dev/null and b/use case catalogue.pdf differ diff --git a/view_stream.py b/view_stream.py new file mode 100644 index 0000000..967b705 --- /dev/null +++ b/view_stream.py @@ -0,0 +1,48 @@ +""" +Saqr - View robot PPE stream on laptop via OpenCV +=================================================== +Connects to the robot's MJPEG stream and displays in an OpenCV window. 
+ +Usage: + python view_stream.py + python view_stream.py --ip 192.168.123.164 + python view_stream.py --ip 10.255.254.86 --port 8080 +""" + +import argparse +import cv2 + +def main(): + parser = argparse.ArgumentParser(description="View Saqr PPE stream from robot") + parser.add_argument("--ip", default="192.168.123.164", help="Robot IP address") + parser.add_argument("--port", default="8080", help="Stream port") + args = parser.parse_args() + + url = f"http://{args.ip}:{args.port}/stream" + print(f"Connecting to {url} ...") + + cap = cv2.VideoCapture(url) + if not cap.isOpened(): + print(f"[ERROR] Cannot connect to {url}") + print(f" Try: python view_stream.py --ip 10.255.254.86") + return + + print("Connected! Press q to quit.") + + while True: + ret, frame = cap.read() + if not ret: + print("Stream lost, reconnecting...") + cap.release() + cap = cv2.VideoCapture(url) + continue + + cv2.imshow("Saqr PPE - Robot Stream", frame) + if cv2.waitKey(1) & 0xFF == ord("q"): + break + + cap.release() + cv2.destroyAllWindows() + +if __name__ == "__main__": + main()