Saqr/saqr/core/tracking.py

119 lines
4.0 KiB
Python

"""Per-person Track dataclass and the greedy nearest-match PersonTracker."""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from saqr.core.compliance import status_from_items
from saqr.core.events import EventLogger, now_iso
from saqr.core.geometry import box_distance
from saqr.core.grouping import PersonCandidate
@dataclass
class Track:
    """Mutable state kept for one tracked person.

    The first four fields are required and positional; the remaining fields
    have defaults. PersonTracker in this module creates instances and
    updates the position, item, status and bookkeeping fields every frame.
    """

    # --- identity and latest observation -------------------------------
    # Unique id; PersonTracker hands these out from an increasing counter.
    track_id: int
    # Bounding box copied from the most recent matched detection.
    # NOTE(review): coordinate convention (corners vs. x/y/w/h) is not
    # visible in this file -- confirm against saqr.core.geometry.
    bbox: Tuple[int, int, int, int]
    # Copy of the detection's ``items`` mapping.
    # Presumably item label -> confidence score; verify against PersonCandidate.
    items: Dict[str, float]
    # Confirmed status string, derived via status_from_items by the tracker.
    status: str

    # --- bookkeeping -----------------------------------------------------
    # Frame index of the last frame in which this track was matched.
    last_seen_frame: int = 0
    # Timestamp string (from now_iso) of the last match.
    last_seen_iso: str = ""
    # Timestamp string (from now_iso) taken when the track was created.
    created_iso: str = ""
    # Consecutive updates without a match; 0 means seen in the latest update.
    frames_missing: int = 0

    # --- fields not written by the tracker in this file -------------------
    # Presumably maintained by callers (snapshot / announcement / event
    # code) -- confirm at the call sites.
    photo_path: Optional[Path] = None
    announced_status: Optional[str] = None
    event_count: int = 0

    # --- status debounce ---------------------------------------------------
    # Candidate status that differs from ``status`` and is awaiting
    # confirmation, or None when no change is pending.
    pending_status: Optional[str] = None
    # Number of consecutive matched frames that reported ``pending_status``.
    pending_count: int = 0
class PersonTracker:
    """Greedy nearest-match tracker mapping per-frame detections to tracks.

    Each call to ``update`` walks the detections in the order given and
    assigns every detection to the closest not-yet-used track within
    ``match_distance``; detections with no such track start a new track.
    Status changes are debounced: a differing status must be reported on
    ``status_confirm_frames`` consecutive matched frames before it replaces
    the track's confirmed status. Tracks that go unmatched for more than
    ``max_missing`` consecutive updates are dropped.
    """

    def __init__(
        self,
        event_logger: EventLogger,
        max_missing: int = 90,
        match_distance: float = 250.0,
        status_confirm_frames: int = 5,
    ):
        """Create an empty tracker.

        Args:
            event_logger: Stored on the instance as ``event_logger``; the
                methods of this class do not call it themselves.
            max_missing: A track is removed once its ``frames_missing``
                exceeds this value.
            match_distance: Largest ``box_distance`` at which a detection
                may still be matched to an existing track (inclusive).
            status_confirm_frames: Consecutive matched frames a new status
                must persist before it is committed. Values below 1 are
                clamped to 1.
        """
        self.event_logger = event_logger
        self.max_missing = max_missing
        self.match_distance = match_distance
        # At least one confirming frame is always required.
        self.status_confirm_frames = max(1, status_confirm_frames)
        self.tracks: Dict[int, Track] = {}
        self.next_id = 1

    def _new_track(self, person: PersonCandidate, frame_idx: int) -> Track:
        """Register and return a fresh track for an unmatched detection."""
        # Take the timestamp once so created_iso and last_seen_iso are
        # guaranteed identical for a brand-new track.
        stamp = now_iso()
        track = Track(
            track_id=self.next_id,
            bbox=person.bbox,
            items=dict(person.items),  # copy: do not alias the detection's dict
            status=status_from_items(person.items),
            last_seen_frame=frame_idx,
            last_seen_iso=stamp,
            created_iso=stamp,
        )
        self.next_id += 1
        self.tracks[track.track_id] = track
        return track

    def _match(self, person: PersonCandidate, used: set) -> Optional[Track]:
        """Return the closest unused track within ``match_distance``.

        Args:
            person: Detection to match.
            used: Track ids already claimed in the current update.

        Returns:
            The nearest eligible track, or ``None`` when no unused track is
            within ``match_distance``. On equal distances the track met
            first while iterating ``self.tracks`` wins (strict ``<``).
        """
        best, best_dist = None, float("inf")
        for tid, track in self.tracks.items():
            if tid in used:
                continue
            dist = box_distance(track.bbox, person.bbox)
            if dist < best_dist and dist <= self.match_distance:
                best_dist = dist
                best = track
        return best

    def _refresh(
        self, track: Track, person: PersonCandidate, frame_idx: int
    ) -> bool:
        """Fold a matched detection into ``track`` and debounce its status.

        Returns:
            ``True`` when this observation committed a new confirmed status,
            ``False`` otherwise.
        """
        new_status = status_from_items(person.items)
        track.bbox = person.bbox
        track.items = dict(person.items)
        track.last_seen_frame = frame_idx
        track.last_seen_iso = now_iso()
        track.frames_missing = 0

        if new_status == track.status:
            # Observation agrees with the confirmed status: drop any
            # half-confirmed change.
            track.pending_status = None
            track.pending_count = 0
            return False

        if track.pending_status == new_status:
            track.pending_count += 1
        else:
            # A different candidate restarts the confirmation count.
            track.pending_status = new_status
            track.pending_count = 1

        if track.pending_count < self.status_confirm_frames:
            return False

        track.status = new_status
        track.pending_status = None
        track.pending_count = 0
        return True

    def _age_unmatched(self, used: set) -> None:
        """Increment ``frames_missing`` on unmatched tracks and evict stale ones."""
        stale = []
        for tid, track in self.tracks.items():
            if tid in used:
                continue
            track.frames_missing += 1
            if track.frames_missing > self.max_missing:
                stale.append(tid)
        # Delete after the loop; the dict must not change size while iterated.
        for tid in stale:
            del self.tracks[tid]

    def update(
        self, people: List[PersonCandidate], frame_idx: int
    ) -> Tuple[List[Track], List[Track]]:
        """Advance the tracker by one frame of detections.

        Args:
            people: Detections for this frame, processed in list order
                (matching is greedy, so order can affect assignment).
            frame_idx: Index of the frame, stored as ``last_seen_frame`` on
                every track touched.

        Returns:
            ``(created, changed)``: tracks newly created by this call, and
            existing tracks whose confirmed status changed in this call.
        """
        used: set = set()
        created: List[Track] = []
        changed: List[Track] = []
        for person in people:
            track = self._match(person, used)
            if track is None:
                track = self._new_track(person, frame_idx)
                created.append(track)
            elif self._refresh(track, person, frame_idx):
                changed.append(track)
            # New and matched tracks alike are claimed for this frame, so
            # they cannot be matched again or aged as missing.
            used.add(track.track_id)
        self._age_unmatched(used)
        return created, changed

    def visible_tracks(self) -> List[Track]:
        """Return the tracks that were matched or created in the latest update."""
        return [t for t in self.tracks.values() if t.frames_missing == 0]