105 lines
2.9 KiB
Python
105 lines
2.9 KiB
Python
"""
|
|
error_events.py
|
|
|
|
Small structured error event sink:
|
|
- In-memory counters per (source, stage)
|
|
- JSONL event log (append-only)
|
|
- JSON counters snapshot for quick dashboard/debug fetch
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import threading
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Any, Dict
|
|
|
|
try:
|
|
from Core import settings as config
|
|
|
|
_STATE_DIR = Path(config.APP_RUNTIME_DIR)
|
|
_LEGACY_STATE_DIR = Path(config.APP_DATA_DIR)
|
|
except Exception:
|
|
_STATE_DIR = Path(__file__).resolve().parents[1] / "Data" / "Runtime"
|
|
_LEGACY_STATE_DIR = Path(__file__).resolve().parents[1] / "Data"
|
|
|
|
_STATE_DIR.mkdir(parents=True, exist_ok=True)
|
|
_COUNTERS_PATH = _STATE_DIR / "error_counters.json"
|
|
_EVENTS_PATH = _STATE_DIR / "error_events.jsonl"
|
|
_LEGACY_COUNTERS_PATH = _LEGACY_STATE_DIR / "error_counters.json"
|
|
|
|
_LOCK = threading.Lock()
|
|
_COUNTERS: Dict[str, int] = {}
|
|
_LAST_EVENT_TS: Dict[str, float] = {}
|
|
_MIN_INTERVAL_SEC = 0.75
|
|
|
|
|
|
def _load_counters() -> Dict[str, int]:
|
|
for candidate in (_COUNTERS_PATH, _LEGACY_COUNTERS_PATH):
|
|
try:
|
|
if candidate.exists():
|
|
raw = json.loads(candidate.read_text(encoding="utf-8"))
|
|
if isinstance(raw, dict):
|
|
out: Dict[str, int] = {}
|
|
for k, v in raw.items():
|
|
try:
|
|
out[str(k)] = int(v)
|
|
except Exception:
|
|
continue
|
|
return out
|
|
except Exception:
|
|
pass
|
|
return {}
|
|
|
|
|
|
with _LOCK:
|
|
_COUNTERS.update(_load_counters())
|
|
|
|
|
|
def record_error(source: str, stage: str, exc: Any = None, context: Dict[str, Any] | None = None) -> int:
|
|
"""
|
|
Record one structured error event.
|
|
Returns current counter value for this error key.
|
|
"""
|
|
now = time.time()
|
|
src = str(source or "unknown")
|
|
stg = str(stage or "unknown")
|
|
key = f"{src}:{stg}"
|
|
msg = "" if exc is None else str(exc)
|
|
payload = {
|
|
"time": now,
|
|
"source": src,
|
|
"stage": stg,
|
|
"message": msg,
|
|
"context": context or {},
|
|
}
|
|
|
|
with _LOCK:
|
|
count = _COUNTERS.get(key, 0) + 1
|
|
_COUNTERS[key] = count
|
|
payload["count"] = count
|
|
|
|
# Throttle repetitive event writes for the same key.
|
|
last = float(_LAST_EVENT_TS.get(key, 0.0))
|
|
should_write_event = (now - last) >= _MIN_INTERVAL_SEC
|
|
if should_write_event:
|
|
_LAST_EVENT_TS[key] = now
|
|
try:
|
|
with _EVENTS_PATH.open("a", encoding="utf-8") as f:
|
|
f.write(json.dumps(payload, ensure_ascii=False) + "\n")
|
|
except Exception:
|
|
pass
|
|
|
|
try:
|
|
_COUNTERS_PATH.write_text(json.dumps(_COUNTERS, ensure_ascii=False, indent=2), encoding="utf-8")
|
|
except Exception:
|
|
pass
|
|
|
|
return count
|
|
|
|
|
|
def get_error_counters() -> Dict[str, int]:
|
|
with _LOCK:
|
|
return dict(_COUNTERS)
|