Marcus/Lidar/SLAM_Diagnostics.py
2026-04-12 18:50:22 +04:00

82 lines
2.5 KiB
Python

from __future__ import annotations
import faulthandler
import json
import time
import traceback
from pathlib import Path
from typing import Any, Dict, Optional
def _safe_jsonable(value: Any) -> Any:
if value is None or isinstance(value, (str, int, float, bool)):
return value
if isinstance(value, dict):
return {str(k): _safe_jsonable(v) for k, v in value.items()}
if isinstance(value, (list, tuple)):
return [_safe_jsonable(v) for v in value]
try:
import numpy as np
if isinstance(value, np.ndarray):
if value.ndim == 0:
return float(value)
return {
"shape": list(value.shape),
"dtype": str(value.dtype),
}
if isinstance(value, (np.floating,)):
return float(value)
if isinstance(value, (np.integer,)):
return int(value)
if isinstance(value, (np.bool_,)):
return bool(value)
except Exception:
pass
return str(value)
class WorkerDiagnostics:
def __init__(self, log_path: Path):
self.log_path = Path(log_path)
self.log_path.parent.mkdir(parents=True, exist_ok=True)
self._fh = open(self.log_path, "a", encoding="utf-8")
self.note("worker start")
try:
faulthandler.enable(file=self._fh, all_threads=True)
except Exception:
pass
def note(self, message: str) -> None:
ts = time.strftime("%Y-%m-%d %H:%M:%S")
self._fh.write(f"\n[{ts}] {message}\n")
self._fh.flush()
def log_state(self, label: str, state: Optional[Dict[str, Any]]) -> None:
payload = _safe_jsonable(state or {})
self.note(f"{label}: {json.dumps(payload, ensure_ascii=True, sort_keys=True)}")
def log_exception(self, label: str, exc: BaseException, state: Optional[Dict[str, Any]] = None) -> None:
self.note(f"{label}: {type(exc).__name__}: {exc}")
if state:
self.log_state("state", state)
self._fh.write(traceback.format_exc())
if not traceback.format_exc().endswith("\n"):
self._fh.write("\n")
self._fh.flush()
def close(self) -> None:
try:
if faulthandler.is_enabled():
faulthandler.disable()
except Exception:
pass
try:
self._fh.flush()
self._fh.close()
except Exception:
pass
def __del__(self) -> None:
self.close()