Saqr/core/tracking.py

150 lines
5.4 KiB
Python

"""Per-person Track dataclass and the greedy nearest-match PersonTracker.
Tracks keep a rolling history of bbox centers so ``is_stationary()`` can gate
announcements: the bridge only fires TTS + arm actions on people standing
still at the checkpoint, not on anyone walking past.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from core.compliance import status_from_items
from core.events import EventLogger, now_iso
from core.geometry import box_distance
from core.grouping import PersonCandidate
from utils.config import load_config
# Tracking settings: the "tracking" entry of the "core" config, read once when
# this module is imported.
_TRK = load_config("core")["tracking"]
# Size of the rolling bbox-center history each Track keeps, and the number of
# samples Track.is_stationary() needs before it can return True. 15 is passed
# as the .get() default for a missing key.
STATIONARY_CHECK_FRAMES = int(_TRK.get("stationary_check_frames", 15))
# Largest x- or y-spread of those centers that still counts as standing still.
# 40.0 is passed as the .get() default; presumably pixels, per the key name.
STATIONARY_TOLERANCE_PX = float(_TRK.get("stationary_tolerance_px", 40.0))
@dataclass
class Track:
    """State kept for one tracked person.

    ``PersonTracker`` creates a ``Track`` for every unmatched candidate and
    refreshes ``bbox``, ``items``, the ``last_seen_*`` fields and the status
    debounce counters on each later match. ``recent_centers`` is a rolling
    window of bbox centers that ``is_stationary()`` inspects.
    """

    # Identifier assigned by the tracker.
    track_id: int
    # Latest bounding box. Indices 0/2 are averaged for the x center and
    # indices 1/3 for the y center (see ``record_center``).
    bbox: Tuple[int, int, int, int]
    # Copy of the matched candidate's ``items`` mapping.
    items: Dict[str, float]
    # Confirmed status string, derived from ``items`` by the tracker.
    status: str
    # Frame index and timestamp string of the most recent match.
    last_seen_frame: int = 0
    last_seen_iso: str = ""
    # Timestamp string taken when the track was created.
    created_iso: str = ""
    # Consecutive tracker updates in which this track found no match.
    frames_missing: int = 0
    # The next three fields are not written anywhere in this module;
    # presumably maintained by callers -- confirm before relying on them.
    photo_path: Optional[Path] = None
    announced_status: Optional[str] = None
    event_count: int = 0
    # Status debounce: a candidate status and how many consecutive matches
    # have reported it (managed by ``PersonTracker.update``).
    pending_status: Optional[str] = None
    pending_count: int = 0
    # Rolling history of bbox centers (most recent N frames) used to decide
    # whether the person is standing still at the checkpoint.
    recent_centers: List[Tuple[float, float]] = field(default_factory=list)

    def record_center(self) -> None:
        """Append the current bbox center and trim the history to the window.

        Every entry beyond ``STATIONARY_CHECK_FRAMES`` is dropped in a single
        step, so a history that was already over-long shrinks to the window
        instead of losing only one entry per call.
        """
        cx = (self.bbox[0] + self.bbox[2]) / 2.0
        cy = (self.bbox[1] + self.bbox[3]) / 2.0
        self.recent_centers.append((cx, cy))
        overflow = len(self.recent_centers) - STATIONARY_CHECK_FRAMES
        if overflow > 0:
            del self.recent_centers[:overflow]

    def is_stationary(self) -> bool:
        """Return True when the bbox center has stayed within tolerance.

        The larger of the x-spread and y-spread over the last
        ``STATIONARY_CHECK_FRAMES`` recorded centers must not exceed
        ``STATIONARY_TOLERANCE_PX``. Returns False while fewer centers than
        that have been recorded, and also when the configured window is not
        positive (there is then nothing to measure).
        """
        window = STATIONARY_CHECK_FRAMES
        if window <= 0 or len(self.recent_centers) < window:
            return False
        # Only the trailing window counts, even if the list holds more.
        recent = self.recent_centers[-window:]
        xs = [center[0] for center in recent]
        ys = [center[1] for center in recent]
        spread = max(max(xs) - min(xs), max(ys) - min(ys))
        return spread <= STATIONARY_TOLERANCE_PX
class PersonTracker:
    """Greedy nearest-match tracker that turns per-frame people into Tracks.

    Each ``update()`` call pairs every incoming candidate with the closest
    not-yet-used track within ``match_distance`` (as measured by
    ``box_distance``), creates a new track when none qualifies, debounces
    status changes, and drops tracks that have gone unmatched for more than
    ``max_missing`` consecutive updates.
    """

    def __init__(
        self,
        event_logger: EventLogger,
        max_missing: int = 90,
        match_distance: float = 250.0,
        status_confirm_frames: int = 5,
    ):
        """Configure the tracker.

        Args:
            event_logger: Stored on the instance; not used by the methods
                of this class.
            max_missing: A track is removed once its ``frames_missing``
                count exceeds this value.
            match_distance: Largest ``box_distance`` at which a candidate may
                be matched to an existing track.
            status_confirm_frames: Consecutive matches that must report the
                same new status before it replaces the confirmed one.
                Clamped to at least 1.
        """
        self.event_logger = event_logger
        self.max_missing = max_missing
        self.match_distance = match_distance
        self.status_confirm_frames = max(1, status_confirm_frames)
        self.tracks: Dict[int, Track] = {}
        self.next_id = 1

    def _new_track(self, person: PersonCandidate, frame_idx: int) -> Track:
        """Create, register and return a track for an unmatched candidate."""
        # One timestamp for both fields so that creation time and first
        # sighting cannot differ.
        stamp = now_iso()
        track = Track(
            track_id=self.next_id,
            bbox=person.bbox,
            items=dict(person.items),
            status=status_from_items(person.items),
            last_seen_frame=frame_idx,
            last_seen_iso=stamp,
            created_iso=stamp,
        )
        self.next_id += 1
        self.tracks[track.track_id] = track
        return track

    def _match(self, person: PersonCandidate, used: set) -> Optional[Track]:
        """Return the closest unused track within ``match_distance``, or None.

        ``used`` holds the ids of tracks already claimed during the current
        update; those are skipped so one track is never matched twice.
        """
        best, best_dist = None, float("inf")
        for tid, track in self.tracks.items():
            if tid in used:
                continue
            dist = box_distance(track.bbox, person.bbox)
            if dist < best_dist and dist <= self.match_distance:
                best_dist = dist
                best = track
        return best

    def _confirm_status(self, track: Track, new_status: str) -> bool:
        """Debounce a status observation; return True when the status flips."""
        if new_status == track.status:
            # The observation agrees with the confirmed status: drop any
            # pending candidate.
            track.pending_status = None
            track.pending_count = 0
            return False
        if track.pending_status == new_status:
            track.pending_count += 1
        else:
            # A different candidate restarts the count.
            track.pending_status = new_status
            track.pending_count = 1
        if track.pending_count < self.status_confirm_frames:
            return False
        track.status = new_status
        track.pending_status = None
        track.pending_count = 0
        return True

    def _prune_missing(self, used: set) -> None:
        """Age every track not in ``used`` and delete those past the limit."""
        stale = []
        for tid, track in self.tracks.items():
            if tid in used:
                continue
            track.frames_missing += 1
            if track.frames_missing > self.max_missing:
                stale.append(tid)
        # Delete after the loop: the dict must not change size while it is
        # being iterated.
        for tid in stale:
            del self.tracks[tid]

    def update(
        self, people: List[PersonCandidate], frame_idx: int
    ) -> Tuple[List[Track], List[Track]]:
        """Fold one frame's candidates into the track table.

        Args:
            people: Candidates for this frame, processed in list order
                (matching is greedy, so earlier candidates pick first).
            frame_idx: Index stored as ``last_seen_frame`` on every track
                created or matched by this call.

        Returns:
            ``(created, changed)``: the tracks created by this call, and the
            existing tracks whose confirmed status changed during it.
        """
        used: set = set()
        created: List[Track] = []
        changed: List[Track] = []
        for person in people:
            track = self._match(person, used)
            if track is None:
                track = self._new_track(person, frame_idx)
                created.append(track)
            else:
                new_status = status_from_items(person.items)
                track.bbox = person.bbox
                track.items = dict(person.items)
                track.last_seen_frame = frame_idx
                track.last_seen_iso = now_iso()
                track.frames_missing = 0
                if self._confirm_status(track, new_status):
                    changed.append(track)
            # New and matched tracks alike record their center and become
            # unavailable to later candidates in this frame.
            track.record_center()
            used.add(track.track_id)
        self._prune_missing(used)
        return created, changed

    def visible_tracks(self) -> List[Track]:
        """Return the tracks that were matched in the most recent update."""
        return [t for t in self.tracks.values() if t.frames_missing == 0]