"""Per-person Track dataclass and the greedy nearest-match PersonTracker. Tracks keep a rolling history of bbox centers so ``is_stationary()`` can gate announcements: the bridge only fires TTS + arm actions on people standing still at the checkpoint, not on anyone walking past. """ from __future__ import annotations from dataclasses import dataclass, field from pathlib import Path from typing import Dict, List, Optional, Tuple from core.compliance import status_from_items from core.events import EventLogger, now_iso from core.geometry import box_distance from core.grouping import PersonCandidate from utils.config import load_config _TRK = load_config("core")["tracking"] STATIONARY_CHECK_FRAMES = int(_TRK.get("stationary_check_frames", 15)) STATIONARY_TOLERANCE_PX = float(_TRK.get("stationary_tolerance_px", 40.0)) @dataclass class Track: track_id: int bbox: Tuple[int, int, int, int] items: Dict[str, float] status: str last_seen_frame: int = 0 last_seen_iso: str = "" created_iso: str = "" frames_missing: int = 0 photo_path: Optional[Path] = None announced_status: Optional[str] = None event_count: int = 0 pending_status: Optional[str] = None pending_count: int = 0 # Rolling history of bbox centers (most recent N frames) used to decide # whether the person is standing still at the checkpoint. recent_centers: List[Tuple[float, float]] = field(default_factory=list) def record_center(self) -> None: cx = (self.bbox[0] + self.bbox[2]) / 2.0 cy = (self.bbox[1] + self.bbox[3]) / 2.0 self.recent_centers.append((cx, cy)) if len(self.recent_centers) > STATIONARY_CHECK_FRAMES: del self.recent_centers[0] def is_stationary(self) -> bool: """True when the bbox center hasn't moved more than the configured tolerance across the last ``STATIONARY_CHECK_FRAMES`` frames.""" if len(self.recent_centers) < STATIONARY_CHECK_FRAMES: return False xs = [c[0] for c in self.recent_centers] ys = [c[1] for c in self.recent_centers] spread = max(max(xs) - min(xs), max(ys) - min(ys)) return spread <= STATIONARY_TOLERANCE_PX class PersonTracker: def __init__( self, event_logger: EventLogger, max_missing: int = 90, match_distance: float = 250.0, status_confirm_frames: int = 5, ): self.event_logger = event_logger self.max_missing = max_missing self.match_distance = match_distance self.status_confirm_frames = max(1, status_confirm_frames) self.tracks: Dict[int, Track] = {} self.next_id = 1 def _new_track(self, person: PersonCandidate, frame_idx: int) -> Track: track = Track( track_id=self.next_id, bbox=person.bbox, items=dict(person.items), status=status_from_items(person.items), last_seen_frame=frame_idx, last_seen_iso=now_iso(), created_iso=now_iso(), ) self.next_id += 1 self.tracks[track.track_id] = track return track def _match(self, person: PersonCandidate, used: set) -> Optional[Track]: best, best_dist = None, float("inf") for tid, track in self.tracks.items(): if tid in used: continue dist = box_distance(track.bbox, person.bbox) if dist < best_dist and dist <= self.match_distance: best_dist = dist best = track return best def update(self, people: List[PersonCandidate], frame_idx: int): used: set = set() created: List[Track] = [] changed: List[Track] = [] for person in people: track = self._match(person, used) if track is None: track = self._new_track(person, frame_idx) created.append(track) else: new_status = status_from_items(person.items) track.bbox = person.bbox track.items = dict(person.items) track.last_seen_frame = frame_idx track.last_seen_iso = now_iso() track.frames_missing = 0 if new_status != track.status: if track.pending_status == new_status: track.pending_count += 1 else: track.pending_status = new_status track.pending_count = 1 if track.pending_count >= self.status_confirm_frames: track.status = new_status track.pending_status = None track.pending_count = 0 changed.append(track) else: track.pending_status = None track.pending_count = 0 track.record_center() used.add(track.track_id) stale = [] for tid, track in self.tracks.items(): if tid not in used: track.frames_missing += 1 if track.frames_missing > self.max_missing: stale.append(tid) for tid in stale: del self.tracks[tid] return created, changed def visible_tracks(self) -> List[Track]: return [t for t in self.tracks.values() if t.frames_missing == 0]