150 lines
5.4 KiB
Python
150 lines
5.4 KiB
Python
"""Per-person Track dataclass and the greedy nearest-match PersonTracker.

Tracks keep a rolling history of bbox centers so ``is_stationary()`` can gate
announcements: the bridge only fires TTS + arm actions on people standing
still at the checkpoint, not on anyone walking past.
"""
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Tuple
|
|
|
|
from core.compliance import status_from_items
|
|
from core.events import EventLogger, now_iso
|
|
from core.geometry import box_distance
|
|
from core.grouping import PersonCandidate
|
|
from utils.config import load_config
|
|
|
|
# "tracking" section of the "core" configuration; both thresholds below are
# read from it once, at import time, with hard-coded fallbacks.
_TRK = load_config("core")["tracking"]

# Number of most recent bbox centers a Track keeps, and the minimum it needs
# before ``Track.is_stationary()`` can return True (falls back to 15).
STATIONARY_CHECK_FRAMES = int(_TRK.get("stationary_check_frames", 15))

# Largest spread of those centers, along either axis, that still counts as
# standing still (falls back to 40.0; bbox coordinate units, pixels per the
# name).
STATIONARY_TOLERANCE_PX = float(_TRK.get("stationary_tolerance_px", 40.0))
|
|
|
|
|
|
@dataclass
class Track:
    """State kept for one tracked person.

    Besides the latest detection data, a track carries a short rolling
    history of bbox centers that ``is_stationary()`` uses to tell a person
    standing still from one who is moving.
    """

    # Identifier handed out by PersonTracker from its ``next_id`` counter.
    track_id: int
    # Latest bounding box; record_center() averages indices 0/2 and 1/3, so
    # this is presumably (x1, y1, x2, y2) -- confirm against PersonCandidate.
    bbox: Tuple[int, int, int, int]
    # Copy of the detection's ``items`` mapping (presumably item label ->
    # score; confirm against PersonCandidate).
    items: Dict[str, float]
    # Confirmed status, derived from ``items`` via status_from_items().
    status: str
    # Frame index and ISO timestamp of the most recent matched detection.
    last_seen_frame: int = 0
    last_seen_iso: str = ""
    # ISO timestamp taken when the track was created.
    created_iso: str = ""
    # Consecutive updates in which no detection matched this track.
    frames_missing: int = 0
    # The next three fields are not read or written by this module's visible
    # code; they appear to be bookkeeping for callers -- confirm at use sites.
    photo_path: Optional[Path] = None
    announced_status: Optional[str] = None
    event_count: int = 0
    # Debounce state: a candidate status and how many consecutive matched
    # frames have reported it while it differs from ``status``.
    pending_status: Optional[str] = None
    pending_count: int = 0
    # Rolling history of bbox centers (most recent N frames) used to decide
    # whether the person is standing still at the checkpoint.
    recent_centers: List[Tuple[float, float]] = field(default_factory=list)

    def record_center(self) -> None:
        """Append the current bbox center and trim the history to the window.

        The window is ``STATIONARY_CHECK_FRAMES``, clamped to at least 1 so a
        zero or negative configured value cannot empty the history.
        """
        cx = (self.bbox[0] + self.bbox[2]) / 2.0
        cy = (self.bbox[1] + self.bbox[3]) / 2.0
        self.recent_centers.append((cx, cy))

        window = max(1, STATIONARY_CHECK_FRAMES)
        overflow = len(self.recent_centers) - window
        if overflow > 0:
            # Drop every entry beyond the window, oldest first.
            del self.recent_centers[:overflow]

    def is_stationary(self) -> bool:
        """Return True when the bbox center stayed within the tolerance.

        The check covers the last ``STATIONARY_CHECK_FRAMES`` recorded
        centers (window clamped to at least 1). It returns False until that
        many centers exist; afterwards it returns True when the larger of the
        x-range and y-range of those centers is at most
        ``STATIONARY_TOLERANCE_PX``.
        """
        window = max(1, STATIONARY_CHECK_FRAMES)
        if len(self.recent_centers) < window:
            return False

        # Non-empty by construction: at least ``window`` (>= 1) entries exist.
        xs, ys = zip(*self.recent_centers[-window:])
        spread = max(max(xs) - min(xs), max(ys) - min(ys))
        return spread <= STATIONARY_TOLERANCE_PX
|
|
|
|
|
|
class PersonTracker:
    """Greedy nearest-match tracker that keeps one ``Track`` per person.

    Each ``update()`` call pairs the frame's detections with existing tracks
    by ``box_distance``, debounces status changes over several frames, and
    drops tracks that have gone unmatched for too long.
    """

    def __init__(
        self,
        event_logger: EventLogger,
        max_missing: int = 90,
        match_distance: float = 250.0,
        status_confirm_frames: int = 5,
    ):
        """Set up an empty tracker.

        Args:
            event_logger: Stored as ``self.event_logger``; no method of this
                class uses it directly.
            max_missing: A track is deleted once its ``frames_missing``
                exceeds this value.
            match_distance: Largest ``box_distance`` between a track and a
                detection that still counts as a match.
            status_confirm_frames: Consecutive matched frames that must
                report the same new status before it replaces the track's
                status. Values below 1 are treated as 1.
        """
        self.event_logger = event_logger
        self.max_missing = max_missing
        self.match_distance = match_distance
        self.status_confirm_frames = max(1, status_confirm_frames)
        self.tracks: Dict[int, Track] = {}
        self.next_id = 1

    def _new_track(self, person: PersonCandidate, frame_idx: int) -> Track:
        """Create a track for an unmatched detection, register and return it."""
        # Read the clock once: two separate calls could stamp the track as
        # last seen slightly before it was created.
        stamp = now_iso()
        track = Track(
            track_id=self.next_id,
            bbox=person.bbox,
            items=dict(person.items),
            status=status_from_items(person.items),
            last_seen_frame=frame_idx,
            last_seen_iso=stamp,
            created_iso=stamp,
        )
        self.next_id += 1
        self.tracks[track.track_id] = track
        return track

    def _match(self, person: PersonCandidate, used: set) -> Optional[Track]:
        """Return the closest unused track within ``match_distance``, if any.

        Tracks whose id is in ``used`` are skipped. On equal distances the
        track encountered first in ``self.tracks`` wins.
        """
        best, best_dist = None, float("inf")
        for tid, track in self.tracks.items():
            if tid in used:
                continue
            dist = box_distance(track.bbox, person.bbox)
            if dist < best_dist and dist <= self.match_distance:
                best_dist = dist
                best = track
        return best

    def _debounce_status(self, track: Track, new_status: str) -> bool:
        """Feed one status observation into the track; True if status changed."""
        if new_status == track.status:
            # Observation agrees with the confirmed status: drop any candidate.
            track.pending_status = None
            track.pending_count = 0
            return False

        if track.pending_status == new_status:
            track.pending_count += 1
        else:
            # A different candidate restarts the count.
            track.pending_status = new_status
            track.pending_count = 1

        if track.pending_count < self.status_confirm_frames:
            return False

        track.status = new_status
        track.pending_status = None
        track.pending_count = 0
        return True

    def _apply_detection(
        self, track: Track, person: PersonCandidate, frame_idx: int
    ) -> bool:
        """Refresh a matched track from ``person``; True if status changed."""
        new_status = status_from_items(person.items)
        track.bbox = person.bbox
        track.items = dict(person.items)
        track.last_seen_frame = frame_idx
        track.last_seen_iso = now_iso()
        track.frames_missing = 0
        return self._debounce_status(track, new_status)

    def _expire_missing(self, used: set) -> None:
        """Age every track not in ``used`` and delete those past ``max_missing``."""
        stale = []
        for tid, track in self.tracks.items():
            if tid in used:
                continue
            track.frames_missing += 1
            if track.frames_missing > self.max_missing:
                stale.append(tid)
        # Delete after the loop; the dict must not change size while iterated.
        for tid in stale:
            del self.tracks[tid]

    def update(
        self, people: List[PersonCandidate], frame_idx: int
    ) -> Tuple[List[Track], List[Track]]:
        """Fold one frame's detections into the tracker.

        Detections are processed in the order given; each takes the nearest
        track not already claimed in this frame, or starts a new track.

        Args:
            people: Detections for this frame.
            frame_idx: Index of this frame, stored on every touched track.

        Returns:
            ``(created, changed)``: tracks created in this call, and existing
            tracks whose status change was confirmed in this call.
        """
        used: set = set()
        created: List[Track] = []
        changed: List[Track] = []

        for person in people:
            track = self._match(person, used)
            if track is None:
                track = self._new_track(person, frame_idx)
                created.append(track)
            elif self._apply_detection(track, person, frame_idx):
                changed.append(track)

            # New and matched tracks alike record their center and are
            # claimed for the rest of this frame.
            track.record_center()
            used.add(track.track_id)

        self._expire_missing(used)
        return created, changed

    def visible_tracks(self) -> List[Track]:
        """Return the tracks that were matched or created in the latest update."""
        return [t for t in self.tracks.values() if t.frames_missing == 0]
|