1202 lines
45 KiB
Python
1202 lines
45 KiB
Python
import copy
|
|
import json
|
|
import os
|
|
from pathlib import Path
|
|
from typing import Any, Dict
|
|
|
|
# Directory one level above the folder that contains this file.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
# Default application data folder inside the project tree.
APP_DATA_DEFAULT = PROJECT_ROOT / "Data"
# Primary settings file; every write helper in this module targets this path.
CONFIG_JSON = APP_DATA_DEFAULT / "Settings" / "config.json"
# Older config locations, tried in this order when CONFIG_JSON cannot be used.
LEGACY_CONFIG_JSONS = (
    APP_DATA_DEFAULT / "config.json",
    PROJECT_ROOT / "Scripts" / "config.json",
)
# First legacy location, exposed under a singular name.
LEGACY_CONFIG_JSON = LEGACY_CONFIG_JSONS[0]
|
|
|
|
# Prompt key -> default WAV file name for every known audio prompt.
# _ensure_audio_prompts_block iterates this mapping, so only keys listed here
# survive normalisation of the "audio_prompts.files" config section.
AUDIO_PROMPT_FILE_DEFAULTS: Dict[str, str] = {
    "welcome_single": "welcome_single.wav",
    "welcome_group": "welcome_group.wav",
    "welcome_returning": "welcome_returning.wav",
    "frame_single": "frame_single.wav",
    "frame_group": "frame_group.wav",
    "confirm_reminder": "confirm_reminder.wav",
    "visitor_left": "visitor_left.wav",
    "declined": "declined.wav",
    "confirm_timeout": "confirm_timeout.wav",
    "session_cancelled": "session_cancelled.wav",
    "framing_timeout": "framing_timeout.wav",
    "countdown_intro": "countdown_intro.wav",
    "count_3": "count_3.wav",
    "count_2": "count_2.wav",
    "count_1": "count_1.wav",
    "smile": "smile.wav",
    "countdown_cancelled": "countdown_cancelled.wav",
    "lost_from_frame": "lost_from_frame.wav",
    "retake_recommended": "retake_recommended.wav",
    "retake_yes": "retake_yes.wav",
    "retake_limit": "retake_limit.wav",
    "photo_saved_thanks": "photo_saved_thanks.wav",
}
|
|
|
|
# Built-in defaults for every config section. Values loaded from config.json
# are layered on top of this mapping with _deep_merge, and the _ensure_*_block
# helpers read their per-key defaults from here.
_FALLBACK: Dict[str, Any] = {
    # Filesystem locations; relative entries are resolved by _as_path.
    "paths": {
        "data_dir": "AI_Photographer/Data/G1",
        "app_data_dir": "AI_Photographer/Data",
        "app_settings_dir": "AI_Photographer/Data/Settings",
        "app_scripts_dir": "AI_Photographer/Data/Scripts",
        "app_runtime_dir": "AI_Photographer/Data/Runtime",
        "app_notes_dir": "AI_Photographer/Data/Notes",
        "audio_prompts_dir": "AI_Photographer/Data/Audio",
        "audio_prompt_records_file": "AI_Photographer/Data/Settings/audio_prompt_records.json",
        "scripts_dir": "AI_Photographer/Scripts",
        "web_dir": "AI_Photographer/Web",
        "photos_dir": "AI_Photographer/photos/Captures",
        "people_dir": "AI_Photographer/photos/people",
        "samples_dir": "AI_Photographer/photos/samples",
        "replay_recordings_dir": "AI_Photographer/Data/G1",
        "replay_recorder_script": "",
        # File name only; joined onto the resolved data_dir.
        "home_file": "arm_home.jsonl",
        "photo_phrases_file": "AI_Photographer/Data/Scripts/photo_command_ai.txt",
        "sanad_script_file": "AI_Photographer/Data/Scripts/sanad_script.txt",
        "runtime_health_file": "AI_Photographer/Data/Runtime/runtime_health.json",
        "autonomous_state_file": "AI_Photographer/Data/Runtime/autonomous_state.json",
        "upload_db": "AI_Photographer/Data/Runtime/upload_db.json",
    },
    # Timing values; each can be overridden by an environment variable of the
    # upper-cased key name (see the TIMING section below).
    "timing": {
        "photo_total_sec": 10.0,
        "photo_thanks_sec": 3.0,
        "photo_delay_sec": 5.0,
        "replay_capture_end_margin_sec": 0.25,
        "loop_rate": 10.0,
        "ai_query_interval": 1.0,
    },
    "server": {"photo_server_port": 8080},
    "gemini": {
        "api_key": "",
        "mic_enabled": True,
        "model": "models/gemini-2.5-flash-native-audio-preview-12-2025",
        "voice_name": "Charon",
        # Returned by load_system_prompt when the script file yields no text.
        "system_prompt_fallback": (
            "You are Sanad (Bousandah), a friendly Emirati photographer assistant. "
            "Speak in UAE dialect (Khaleeji). Be short, energetic, and helpful."
        ),
    },
    "upload": {
        "method": "http",
        "url": "",
        "s3_bucket": "",
        "s3_region": "",
        "s3_key": "",
        "s3_secret": "",
    },
    # Accepted modes are "manual" and "ai"; "command" is mapped to "ai".
    "mode": {
        "default_mode": "manual",
        "current_mode": "manual",
    },
    "replay": {
        # File name joined onto the resolved data_dir.
        "active_file": "photo_G3.jsonl",
    },
    "camera": {
        "camera_index": 0,
        "frame_width": 640,
        "frame_height": 480,
        "fps": 30,
        # NOTE(review): looks like a device-specific serial baked in as the
        # default - confirm it is intended for every deployment.
        "preferred_realsense_serial": "243622071722",
    },
    # Normalised and range-clamped by _ensure_vision_block.
    "vision": {
        "detection_backend": "yolo",
        "yolo_runtime": "ultralytics",
        "yolo_ultralytics_device": "cpu",
        "person_yolo_onnx": "",
        "face_yolo_onnx": "",
        "input_size": 640,
        "person_class_id": 0,
        "person_score_thresh": 0.35,
        "face_score_thresh": 0.35,
        "nms_iou_thresh": 0.45,
        "group_min_people": 3,
        "group_link_distance_px": 220.0,
        "yolo_strict_required": True,
        "gemini_context_hz": 8.0,
        "gemini_context_silent": True,
        "idle_voice_listen_enabled": True,
        "hard_target_lock_enabled": True,
        "retake_prompt_enabled": True,
        "autonomous_greeting_replay_enabled": True,
        "autonomous_greeting_replay_file": "right_hand_up.jsonl",
        "autonomous_capture_replay_enabled": True,
        "retake_max_per_session": 1,
        "framing_headroom_min_ratio": 0.06,
        "framing_headroom_max_ratio": 0.25,
        "framing_eye_line_min_ratio": 0.28,
        "framing_eye_line_max_ratio": 0.48,
        "framing_retake_score_threshold": 0.68,
        "face_recognition_enabled": True,
        "face_recognition_threshold": 0.88,
    },
    # Normalised and range-clamped by _ensure_watchdog_block.
    "watchdog": {
        "ws_initial_backoff_sec": 1.0,
        "ws_max_backoff_sec": 20.0,
        "component_restart_delay_sec": 1.0,
        "camera_capture_retry_count": 2,
        "camera_capture_retry_delay_sec": 0.8,
    },
    # Normalised by _ensure_audio_prompts_block.
    "audio_prompts": {
        "mode": "audio",
        "fallback_to_gemini": True,
        # Deep copy so edits to this section never touch the defaults table.
        "files": copy.deepcopy(AUDIO_PROMPT_FILE_DEFAULTS),
    },
}
|
|
|
|
|
|
def _deep_merge(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]:
|
|
out = dict(base)
|
|
for k, v in override.items():
|
|
if isinstance(v, dict) and isinstance(out.get(k), dict):
|
|
out[k] = _deep_merge(out[k], v)
|
|
else:
|
|
out[k] = v
|
|
return out
|
|
|
|
|
|
def _load_config() -> Dict[str, Any]:
    """Load the first usable config file and layer it over the defaults.

    ``CONFIG_JSON`` is tried first, then every entry of
    ``LEGACY_CONFIG_JSONS``. A candidate is skipped when it does not exist,
    cannot be read or parsed, or does not contain a JSON object.

    Returns:
        The defaults merged with the first usable file, or an independent
        copy of the defaults when no candidate is usable. The defaults table
        itself is never returned, so callers may mutate the result freely.
    """
    for cfg_path in (CONFIG_JSON, *LEGACY_CONFIG_JSONS):
        try:
            # The existence check lives inside the try as well: it can raise
            # (for example on a permission error) and loading is best effort.
            if not cfg_path.exists():
                continue
            raw = json.loads(cfg_path.read_text(encoding="utf-8"))
            if isinstance(raw, dict):
                return _deep_merge(_FALLBACK, raw)
        except Exception:
            # Unreadable or malformed file: fall through to the next candidate.
            continue
    return copy.deepcopy(_FALLBACK)
|
|
|
|
|
|
# Merged configuration snapshot taken at import time. _refresh_cached_cfg
# rebinds it after a write; the module-level constants below are computed once
# from this snapshot and are not recomputed afterwards.
_CFG = _load_config()
|
|
|
|
|
|
def _c(path: str, default: Any = None) -> Any:
    """Fetch a value from the cached config by dotted key path.

    Args:
        path: Keys joined with ``.``, for example ``"timing.loop_rate"``.
        default: Value returned when a segment is missing or when an
            intermediate value is not a dict.

    Returns:
        The value stored under ``path``, or ``default``.
    """
    node: Any = _CFG
    for segment in path.split("."):
        if isinstance(node, dict) and segment in node:
            node = node[segment]
        else:
            return default
    return node
|
|
|
|
|
|
def _env(name: str, default: Any, cast):
|
|
val = os.environ.get(name)
|
|
if val is None or val == "":
|
|
return default
|
|
try:
|
|
return cast(val)
|
|
except Exception:
|
|
return default
|
|
|
|
|
|
def _as_path(rel_or_abs: str) -> Path:
    """Turn a configured path string into a ``Path``.

    ``~`` is expanded. An absolute path is returned as is. A relative path is
    resolved against the parent of ``PROJECT_ROOT`` when its first component
    equals the project folder name, and against ``PROJECT_ROOT`` otherwise.

    Args:
        rel_or_abs: Path text taken from the configuration.

    Returns:
        The expanded path, resolved when it was relative.
    """
    candidate = Path(str(rel_or_abs)).expanduser()
    if candidate.is_absolute():
        return candidate
    segments = candidate.parts
    # A leading project-folder component means the value was written relative
    # to the folder that contains the project.
    starts_with_project = bool(segments) and segments[0] == PROJECT_ROOT.name
    anchor = PROJECT_ROOT.parent if starts_with_project else PROJECT_ROOT
    return (anchor / candidate).resolve()
|
|
|
|
|
|
# ==================================================
# PATHS
# ==================================================
# Every value is read from the merged config through _c and resolved with
# _as_path. The literal second argument of each _c call is only used when the
# key is absent from the merged config.
DATA_DIR = _as_path(_c("paths.data_dir", "Data/G1"))
APP_DATA_DIR = _as_path(_c("paths.app_data_dir", "Data"))
APP_SETTINGS_DIR = _as_path(_c("paths.app_settings_dir", "Data/Settings"))
APP_SCRIPTS_DIR = _as_path(_c("paths.app_scripts_dir", "Data/Scripts"))
APP_RUNTIME_DIR = _as_path(_c("paths.app_runtime_dir", "Data/Runtime"))
APP_NOTES_DIR = _as_path(_c("paths.app_notes_dir", "Data/Notes"))
AUDIO_PROMPTS_DIR = _as_path(
    _c("paths.audio_prompts_dir", "AI_Photographer/Data/Audio")
)
AUDIO_PROMPT_RECORDS_FILE = _as_path(
    _c(
        "paths.audio_prompt_records_file",
        "AI_Photographer/Data/Settings/audio_prompt_records.json",
    )
)
SCRIPTS_DIR = _as_path(_c("paths.scripts_dir", "Scripts"))
WEB_DIR = _as_path(_c("paths.web_dir", "Web"))
PHOTOS_DIR = _as_path(_c("paths.photos_dir", "photos/Captures"))
PEOPLE_DIR = _as_path(_c("paths.people_dir", "photos/people"))
SAMPLES_DIR = _as_path(_c("paths.samples_dir", "photos/samples"))

# Active replay file, joined onto DATA_DIR. write_selected_replay_name rebinds
# this global when the selection changes.
REPLAY_FILE = (DATA_DIR / str(_c("replay.active_file", "photo_G3.jsonl"))).resolve()
REPLAY_RECORDINGS_DIR = _as_path(_c("paths.replay_recordings_dir", "Data/G1"))
_REPLAY_RECORDER_SCRIPT_RAW = str(_c("paths.replay_recorder_script", "") or "").strip()
# A Path when a script is configured, otherwise the empty string (not a Path).
REPLAY_RECORDER_SCRIPT = _as_path(_REPLAY_RECORDER_SCRIPT_RAW) if _REPLAY_RECORDER_SCRIPT_RAW else ""
HOME_FILE = (DATA_DIR / str(_c("paths.home_file", "arm_home.jsonl"))).resolve()
PHOTO_PHRASES_FILE = _as_path(_c("paths.photo_phrases_file", "Data/Scripts/photo_command_ai.txt"))
SANAD_SCRIPT_FILE = _as_path(_c("paths.sanad_script_file", "Data/Scripts/sanad_script.txt"))
RUNTIME_HEALTH_FILE = _as_path(_c("paths.runtime_health_file", "Data/Runtime/runtime_health.json"))
AUTONOMOUS_STATE_FILE = _as_path(_c("paths.autonomous_state_file", "Data/Runtime/autonomous_state.json"))
UPLOAD_DB = _as_path(_c("paths.upload_db", "Data/Runtime/upload_db.json"))
|
|
|
|
# ==================================================
# TIMING / PHOTOGRAPHER
# ==================================================
# Each value comes from the config and can be overridden by the environment
# variable of the same name; an unset, empty, or unparsable variable keeps the
# config value (see _env).
# NOTE(review): float(...) is applied to the config value at import time, so a
# non-numeric config entry raises here instead of falling back - confirm that
# this is intended.
PHOTO_TOTAL_SEC = _env("PHOTO_TOTAL_SEC", float(_c("timing.photo_total_sec", 10.0)), float)
PHOTO_THANKS_SEC = _env("PHOTO_THANKS_SEC", float(_c("timing.photo_thanks_sec", 3.0)), float)
PHOTO_DELAY_SEC = _env("PHOTO_DELAY_SEC", float(_c("timing.photo_delay_sec", 5.0)), float)
REPLAY_CAPTURE_END_MARGIN_SEC = _env(
    "REPLAY_CAPTURE_END_MARGIN_SEC",
    float(_c("timing.replay_capture_end_margin_sec", 0.25)),
    float,
)
LOOP_RATE = _env("LOOP_RATE", float(_c("timing.loop_rate", 10.0)), float)
AI_QUERY_INTERVAL = _env("AI_QUERY_INTERVAL", float(_c("timing.ai_query_interval", 1.0)), float)
|
|
|
|
# ==================================================
# PHOTO SERVER
# ==================================================
# Port from the config, overridable through the PHOTO_SERVER_PORT environment
# variable; an unparsable variable keeps the config value (see _env).
PHOTO_SERVER_PORT = _env("PHOTO_SERVER_PORT", int(_c("server.photo_server_port", 8080)), int)
|
|
|
|
# ==================================================
# GEMINI
# ==================================================
# Gemini key source: Data/Settings/config.json -> gemini.api_key
# The key is read from the config only; no environment override is applied.
GEMINI_API_KEY = str(_c("gemini.api_key", "")).strip()
# Model and voice can be overridden with the GEMINI_MODEL / VOICE_NAME
# environment variables; an empty voice falls back to "Charon".
GEMINI_MODEL = _env("GEMINI_MODEL", str(_c("gemini.model", "models/gemini-2.5-flash-native-audio-preview-12-2025")).strip(), str).strip()
VOICE_NAME = _env("VOICE_NAME", str(_c("gemini.voice_name", "Charon")).strip() or "Charon", str).strip() or "Charon"
SYSTEM_PROMPT_FALLBACK = str(_c("gemini.system_prompt_fallback", _FALLBACK["gemini"]["system_prompt_fallback"]))

# Keep alias for compatibility
MODEL = GEMINI_MODEL
# WebSocket endpoint built once at import time. The API key is embedded in the
# query string, so this value should be treated as a secret.
URI = (
    "wss://generativelanguage.googleapis.com/ws/"
    "google.ai.generativelanguage.v1alpha.GenerativeService.BidiGenerateContent"
    f"?key={GEMINI_API_KEY}"
)
|
|
|
|
|
|
def validate_api_key(key: str) -> bool:
    """Report whether ``key`` looks like a real API key.

    A key is rejected when it is empty after trimming, when it contains the
    placeholder markers ``your_`` or ``replace`` (case-insensitive), or when
    it is shorter than 20 characters.

    Args:
        key: Candidate key; ``None`` is treated like an empty string.

    Returns:
        ``True`` when none of the rejection rules apply.
    """
    candidate = (key or "").strip()
    lowered = candidate.lower()
    is_placeholder = any(marker in lowered for marker in ("your_", "replace"))
    return bool(candidate) and not is_placeholder and len(candidate) >= 20
|
|
|
|
|
|
def load_system_prompt() -> str:
    """Return the system prompt text.

    The content of ``SANAD_SCRIPT_FILE`` is used when the file exists and
    holds non-blank text (a UTF-8 byte-order mark is tolerated). In every
    other case, including any error while reading, ``SYSTEM_PROMPT_FALLBACK``
    is returned.
    """
    try:
        if SANAD_SCRIPT_FILE.exists():
            script_text = SANAD_SCRIPT_FILE.read_text(encoding="utf-8-sig").strip()
        else:
            script_text = ""
    except Exception:
        script_text = ""
    return script_text or SYSTEM_PROMPT_FALLBACK
|
|
|
|
|
|
def read_gemini_mic_enabled() -> bool:
    """Read ``gemini.mic_enabled`` from the config file on disk.

    Returns:
        The stored flag coerced to ``bool``; the built-in default applies
        when the section or the key is missing.
    """
    section = _read_config_json_raw().get("gemini")
    stored = section if isinstance(section, dict) else {}
    return _coerce_bool(stored.get("mic_enabled", _FALLBACK["gemini"]["mic_enabled"]), True)
|
|
|
|
|
|
def write_gemini_mic_enabled(enabled: Any) -> bool:
    """Persist ``gemini.mic_enabled`` and refresh the cached config.

    Args:
        enabled: Requested state; coerced with ``_coerce_bool``.

    Returns:
        The boolean value that was written.
    """
    cfg = _read_config_json_raw()
    section = cfg.get("gemini")
    if not isinstance(section, dict):
        # Missing or malformed section: start a fresh one.
        section = {}
    flag = _coerce_bool(enabled, bool(_FALLBACK["gemini"]["mic_enabled"]))
    section["mic_enabled"] = flag
    cfg["gemini"] = section
    _write_config_json_raw(cfg)
    _refresh_cached_cfg(cfg)
    return bool(flag)
|
|
|
|
|
|
def read_camera_preferred_realsense_serial() -> str:
    """Read ``camera.preferred_realsense_serial`` from the config file.

    Returns:
        The stored serial as a trimmed string. The built-in default is used
        when the section or key is missing; a falsy stored value yields "".
    """
    section = _read_config_json_raw().get("camera")
    stored = section if isinstance(section, dict) else {}
    serial = stored.get("preferred_realsense_serial", _FALLBACK["camera"]["preferred_realsense_serial"])
    return str(serial or "").strip()
|
|
|
|
|
|
def write_camera_preferred_realsense_serial(serial: Any) -> str:
    """Persist ``camera.preferred_realsense_serial`` and refresh the cache.

    Args:
        serial: Serial to store; converted to a trimmed string, with falsy
            values stored as "".

    Returns:
        The string that was written.
    """
    cfg = _read_config_json_raw()
    section = cfg.get("camera")
    if not isinstance(section, dict):
        # Missing or malformed section: start a fresh one.
        section = {}
    cleaned = str(serial or "").strip()
    section["preferred_realsense_serial"] = cleaned
    cfg["camera"] = section
    _write_config_json_raw(cfg)
    _refresh_cached_cfg(cfg)
    return str(cleaned)
|
|
|
|
|
|
def resolve_replay_path(path_value: Any) -> Path:
    """Resolve a replay file reference to a path.

    Args:
        path_value: Replay file name or path; falsy values are treated as an
            empty string.

    Returns:
        The path unchanged (after ``~`` expansion) when it is absolute,
        otherwise the path joined onto ``DATA_DIR`` and resolved.
    """
    target = Path(str(path_value or "").strip()).expanduser()
    return target if target.is_absolute() else (DATA_DIR / target).resolve()
|
|
|
|
|
|
def read_selected_replay_name() -> str:
    """Return the name of the currently selected replay file.

    The value of ``replay.active_file`` is preferred, with backslashes turned
    into forward slashes and leading slashes removed. When that value is
    empty, the name is derived from ``REPLAY_FILE``: relative to ``DATA_DIR``
    if possible, otherwise the bare file name.
    """
    cfg = _ensure_replay_block(_read_config_json_raw())
    chosen = str(cfg.get("replay", {}).get("active_file", "") or "").strip()
    if chosen:
        return chosen.replace("\\", "/").lstrip("/")
    try:
        relative = REPLAY_FILE.resolve().relative_to(DATA_DIR)
    except Exception:
        # REPLAY_FILE is not located under DATA_DIR (or could not be resolved).
        return REPLAY_FILE.name
    return str(relative).replace("\\", "/")
|
|
|
|
|
|
def read_selected_replay_path() -> Path:
    """Return the path of the currently selected replay file."""
    selected_name = read_selected_replay_name()
    return resolve_replay_path(selected_name)
|
|
|
|
|
|
def write_selected_replay_name(name: Any) -> str:
    """Persist the selected replay file and update ``REPLAY_FILE``.

    Args:
        name: Replay file name; trimmed, backslashes turned into forward
            slashes, leading slashes removed.

    Returns:
        The normalised name that was stored in ``replay.active_file``.
    """
    global REPLAY_FILE
    normalized = str(name or "").strip().replace("\\", "/").lstrip("/")
    cfg = _ensure_replay_block(_read_config_json_raw())
    section = cfg.get("replay", {})
    section["active_file"] = normalized
    cfg["replay"] = section
    _write_config_json_raw(cfg)
    _refresh_cached_cfg(cfg)
    try:
        # Keep the module-level path in step with the stored selection.
        REPLAY_FILE = resolve_replay_path(normalized)
    except Exception:
        pass
    return normalized
|
|
|
|
|
|
# ==================================================
# UPLOAD / CLOUD
# ==================================================
# Each setting comes from the "upload" config section and can be overridden by
# the environment variable of the same name; all values are trimmed strings.
UPLOAD_METHOD = _env("UPLOAD_METHOD", str(_c("upload.method", "http")).strip(), str).strip()
UPLOAD_URL = _env("UPLOAD_URL", str(_c("upload.url", "")).strip(), str).strip()
UPLOAD_S3_BUCKET = _env("UPLOAD_S3_BUCKET", str(_c("upload.s3_bucket", "")).strip(), str).strip()
UPLOAD_S3_REGION = _env("UPLOAD_S3_REGION", str(_c("upload.s3_region", "")).strip(), str).strip()
UPLOAD_S3_KEY = _env("UPLOAD_S3_KEY", str(_c("upload.s3_key", "")).strip(), str).strip()
UPLOAD_S3_SECRET = _env("UPLOAD_S3_SECRET", str(_c("upload.s3_secret", "")).strip(), str).strip()
|
|
|
|
# ==================================================
# MODE / CONTROL
# ==================================================
# Start-up mode from the config, overridable through the DEFAULT_MODE
# environment variable. The value is lower-cased before validation so that
# "AI" or "Manual" are accepted, matching _coerce_mode.
DEFAULT_MODE = (
    _env("DEFAULT_MODE", str(_c("mode.default_mode", "manual")).strip(), str).strip().lower()
    or "manual"
)
# "command" is accepted as another spelling of "ai".
if DEFAULT_MODE == "command":
    DEFAULT_MODE = "ai"
# Anything unrecognised falls back to manual mode.
if DEFAULT_MODE not in ("manual", "ai"):
    DEFAULT_MODE = "manual"
|
|
|
|
# Best-effort creation of every directory the application writes to. A
# directory that cannot be created is skipped silently so that importing this
# module never fails because of it.
for _dir in (
    UPLOAD_DB.parent,
    APP_DATA_DIR,
    APP_SETTINGS_DIR,
    APP_SCRIPTS_DIR,
    APP_RUNTIME_DIR,
    APP_NOTES_DIR,
    AUDIO_PROMPTS_DIR,
    AUDIO_PROMPT_RECORDS_FILE.parent,
    PHOTOS_DIR,
    PEOPLE_DIR,
    SAMPLES_DIR,
    REPLAY_RECORDINGS_DIR,
):
    try:
        _dir.mkdir(parents=True, exist_ok=True)
    except Exception:
        pass
|
|
|
|
|
|
def _read_config_json_raw() -> Dict[str, Any]:
    """Return the raw content of the first usable config file.

    ``CONFIG_JSON`` is tried first, then each legacy location. A candidate is
    skipped when it is missing, unreadable, not valid JSON, or not a JSON
    object. No defaults are merged in.

    Returns:
        The parsed JSON object, or an empty dict when nothing is usable.
    """
    for candidate in (CONFIG_JSON, *LEGACY_CONFIG_JSONS):
        try:
            if not candidate.exists():
                continue
            parsed = json.loads(candidate.read_text(encoding="utf-8"))
        except Exception:
            # Best effort: move on to the next location.
            continue
        if isinstance(parsed, dict):
            return parsed
    return {}
|
|
|
|
|
|
def _write_config_json_raw(raw: Dict[str, Any]) -> None:
    """Write ``raw`` to ``CONFIG_JSON`` as indented UTF-8 JSON.

    The payload is written to a sibling temporary file first and then moved
    over the target with ``os.replace``. Readers therefore never observe a
    half-written file; that matters because the loaders in this module treat
    an unparsable config as empty, and the next write would then drop every
    other setting.

    Args:
        raw: JSON-serialisable configuration mapping.

    Raises:
        OSError: If the directory, the temporary file, or the final replace
            fails.
        TypeError: If ``raw`` contains values that ``json.dumps`` cannot
            serialise.
        ValueError: If ``json.dumps`` rejects the data (for example circular
            references).
    """
    CONFIG_JSON.parent.mkdir(parents=True, exist_ok=True)
    # Serialise before touching the filesystem so a bad payload changes nothing.
    payload = json.dumps(raw, ensure_ascii=False, indent=2)
    tmp_path = CONFIG_JSON.with_name(CONFIG_JSON.name + ".tmp")
    try:
        tmp_path.write_text(payload, encoding="utf-8")
        os.replace(tmp_path, CONFIG_JSON)
    except OSError:
        # Do not leave a stray temporary file behind; the original error is
        # the one the caller needs to see.
        try:
            tmp_path.unlink()
        except OSError:
            pass
        raise
|
|
|
|
|
|
def _coerce_mode(value: Any) -> str:
|
|
m = str(value or "").strip().lower()
|
|
if m == "command":
|
|
return "ai"
|
|
if m not in ("manual", "ai"):
|
|
return "manual"
|
|
return m
|
|
|
|
|
|
def _coerce_bool(value: Any, default: bool = False) -> bool:
|
|
if isinstance(value, bool):
|
|
return value
|
|
if isinstance(value, (int, float)):
|
|
return bool(value)
|
|
if isinstance(value, str):
|
|
return value.strip().lower() in ("1", "true", "yes", "on", "y")
|
|
return bool(default)
|
|
|
|
|
|
def _coerce_detection_backend(value: Any) -> str:
|
|
v = str(value or "").strip().lower()
|
|
if v in ("yolo", "normal"):
|
|
return v
|
|
return "yolo"
|
|
|
|
|
|
def _coerce_audio_prompt_mode(value: Any) -> str:
|
|
v = str(value or "").strip().lower()
|
|
if v in ("audio", "gemini"):
|
|
return v
|
|
return "audio"
|
|
|
|
|
|
def _coerce_yolo_runtime(value: Any) -> str:
|
|
v = str(value or "").strip().lower()
|
|
if v in ("ultralytics", "opencv"):
|
|
return v
|
|
return "ultralytics"
|
|
|
|
|
|
def _coerce_int(value: Any, default: int, min_v: int | None = None, max_v: int | None = None) -> int:
|
|
try:
|
|
out = int(value)
|
|
except Exception:
|
|
out = int(default)
|
|
if min_v is not None:
|
|
out = max(min_v, out)
|
|
if max_v is not None:
|
|
out = min(max_v, out)
|
|
return out
|
|
|
|
|
|
def _coerce_float(value: Any, default: float, min_v: float | None = None, max_v: float | None = None) -> float:
|
|
try:
|
|
out = float(value)
|
|
except Exception:
|
|
out = float(default)
|
|
if min_v is not None:
|
|
out = max(min_v, out)
|
|
if max_v is not None:
|
|
out = min(max_v, out)
|
|
return out
|
|
|
|
|
|
def _ensure_mode_block(raw: Dict[str, Any]) -> Dict[str, Any]:
    """Make sure ``raw`` holds a valid ``"mode"`` section.

    An empty or non-dict ``raw`` is replaced by a copy of the defaults. The
    section's ``default_mode`` and ``current_mode`` are normalised with
    ``_coerce_mode``; a missing ``current_mode`` takes the default mode.

    Args:
        raw: Raw config mapping; modified in place when it is a non-empty
            dict.

    Returns:
        The mapping that now contains the normalised section.
    """
    if not isinstance(raw, dict) or not raw:
        raw = copy.deepcopy(_FALLBACK)
    section = raw.get("mode")
    if not isinstance(section, dict):
        section = {}
    startup_mode = _coerce_mode(section.get("default_mode", DEFAULT_MODE))
    active_mode = _coerce_mode(section.get("current_mode", startup_mode))
    section.update(default_mode=startup_mode, current_mode=active_mode)
    raw["mode"] = section
    return raw
|
|
|
|
|
|
def _ensure_replay_block(raw: Dict[str, Any]) -> Dict[str, Any]:
    """Make sure ``raw`` holds a ``"replay"`` section with an active file.

    An empty or non-dict ``raw`` is replaced by a copy of the defaults. A
    missing or blank ``active_file`` is replaced by the built-in default.

    Args:
        raw: Raw config mapping; modified in place when it is a non-empty
            dict.

    Returns:
        The mapping that now contains the normalised section.
    """
    if not isinstance(raw, dict) or not raw:
        raw = copy.deepcopy(_FALLBACK)
    section = raw.get("replay")
    if not isinstance(section, dict):
        section = {}
    fallback_name = _FALLBACK["replay"]["active_file"]
    configured = str(section.get("active_file", fallback_name) or "").strip()
    section["active_file"] = configured if configured else str(fallback_name)
    raw["replay"] = section
    return raw
|
|
|
|
|
|
def _ensure_vision_block(raw: Dict[str, Any]) -> Dict[str, Any]:
    """Make sure ``raw`` holds a fully normalised ``"vision"`` section.

    An empty or non-dict ``raw`` is replaced by a copy of the defaults. Every
    setting handled here is filled from the built-in defaults when missing,
    coerced to its expected type and clamped to its allowed range. Keys this
    function does not handle are left untouched.

    Args:
        raw: Raw config mapping; modified in place when it is a non-empty
            dict.

    Returns:
        The mapping that now contains the normalised section.
    """
    if not isinstance(raw, dict) or not raw:
        raw = copy.deepcopy(_FALLBACK)
    section = raw.get("vision")
    if not isinstance(section, dict):
        section = {}
    defaults = _FALLBACK["vision"]

    def stored(key: str) -> Any:
        # Current value of ``key``, or its built-in default when absent.
        return section.get(key, defaults[key])

    def set_flag(key: str) -> None:
        # Normalise a boolean setting in place.
        section[key] = _coerce_bool(stored(key), bool(defaults[key]))

    def set_ratio(key: str, low: float, high: float) -> None:
        # Normalise a float setting in place, clamped to [low, high].
        section[key] = _coerce_float(stored(key), float(defaults[key]), min_v=low, max_v=high)

    # The assignment order below is kept stable because it determines the key
    # order of sections that are created from scratch.
    section["detection_backend"] = _coerce_detection_backend(stored("detection_backend"))
    section["yolo_runtime"] = _coerce_yolo_runtime(stored("yolo_runtime"))
    device = str(stored("yolo_ultralytics_device") or "").strip()
    section["yolo_ultralytics_device"] = device if device else "cpu"

    try:
        min_people = int(stored("group_min_people"))
    except Exception:
        min_people = int(defaults["group_min_people"])
    # The group size is never allowed below two.
    section["group_min_people"] = max(2, min_people)

    set_flag("yolo_strict_required")

    try:
        context_hz = float(stored("gemini_context_hz"))
    except Exception:
        context_hz = float(defaults["gemini_context_hz"])
    section["gemini_context_hz"] = max(0.5, min(30.0, context_hz))

    for flag_key in (
        "gemini_context_silent",
        "idle_voice_listen_enabled",
        "hard_target_lock_enabled",
        "retake_prompt_enabled",
        "autonomous_greeting_replay_enabled",
    ):
        set_flag(flag_key)

    greeting_file = str(stored("autonomous_greeting_replay_file") or "").strip()
    section["autonomous_greeting_replay_file"] = (
        greeting_file if greeting_file else str(defaults["autonomous_greeting_replay_file"])
    )
    set_flag("autonomous_capture_replay_enabled")
    section["retake_max_per_session"] = _coerce_int(
        stored("retake_max_per_session"),
        int(defaults["retake_max_per_session"]),
        min_v=0,
        max_v=5,
    )

    set_ratio("framing_headroom_min_ratio", 0.0, 0.8)
    set_ratio("framing_headroom_max_ratio", 0.0, 0.95)
    if section["framing_headroom_max_ratio"] <= section["framing_headroom_min_ratio"]:
        # Keep the upper headroom bound above the lower one.
        section["framing_headroom_max_ratio"] = min(0.95, section["framing_headroom_min_ratio"] + 0.05)

    set_ratio("framing_eye_line_min_ratio", 0.0, 0.9)
    set_ratio("framing_eye_line_max_ratio", 0.05, 1.0)
    if section["framing_eye_line_max_ratio"] <= section["framing_eye_line_min_ratio"]:
        # Keep the upper eye-line bound above the lower one.
        section["framing_eye_line_max_ratio"] = min(1.0, section["framing_eye_line_min_ratio"] + 0.05)

    set_ratio("framing_retake_score_threshold", 0.0, 1.0)
    set_flag("face_recognition_enabled")
    set_ratio("face_recognition_threshold", 0.5, 0.995)

    raw["vision"] = section
    return raw
|
|
|
|
|
|
def _ensure_watchdog_block(raw: Dict[str, Any]) -> Dict[str, Any]:
    """Make sure ``raw`` holds a fully normalised ``"watchdog"`` section.

    An empty or non-dict ``raw`` is replaced by a copy of the defaults. Each
    setting is filled from the built-in defaults when missing, coerced to a
    number and clamped to its allowed range; the maximum WebSocket backoff is
    raised to the initial backoff when it would otherwise be smaller.

    Args:
        raw: Raw config mapping; modified in place when it is a non-empty
            dict.

    Returns:
        The mapping that now contains the normalised section.
    """
    if not isinstance(raw, dict) or not raw:
        raw = copy.deepcopy(_FALLBACK)
    section = raw.get("watchdog")
    if not isinstance(section, dict):
        section = {}
    defaults = _FALLBACK["watchdog"]

    def set_seconds(key: str, low: float, high: float) -> None:
        # Normalise a float setting in place, clamped to [low, high].
        section[key] = _coerce_float(
            section.get(key, defaults[key]),
            float(defaults[key]),
            min_v=low,
            max_v=high,
        )

    set_seconds("ws_initial_backoff_sec", 0.1, 60.0)
    set_seconds("ws_max_backoff_sec", 0.5, 300.0)
    if section["ws_max_backoff_sec"] < section["ws_initial_backoff_sec"]:
        # The backoff ceiling must not be lower than the starting delay.
        section["ws_max_backoff_sec"] = section["ws_initial_backoff_sec"]

    set_seconds("component_restart_delay_sec", 0.1, 20.0)
    section["camera_capture_retry_count"] = _coerce_int(
        section.get("camera_capture_retry_count", defaults["camera_capture_retry_count"]),
        int(defaults["camera_capture_retry_count"]),
        min_v=0,
        max_v=10,
    )
    set_seconds("camera_capture_retry_delay_sec", 0.0, 30.0)

    raw["watchdog"] = section
    return raw
|
|
|
|
|
|
def _ensure_audio_prompts_block(raw: Dict[str, Any]) -> Dict[str, Any]:
    """Make sure ``raw`` holds a normalised ``"audio_prompts"`` section.

    An empty or non-dict ``raw`` is replaced by a copy of the defaults. The
    mode and the Gemini fallback flag are coerced, and ``files`` is rebuilt
    with exactly the keys of ``AUDIO_PROMPT_FILE_DEFAULTS``: configured names
    are trimmed, use forward slashes and lose leading slashes, and blank
    names fall back to the default file name.

    Args:
        raw: Raw config mapping; modified in place when it is a non-empty
            dict.

    Returns:
        The mapping that now contains the normalised section.
    """
    if not isinstance(raw, dict) or not raw:
        raw = copy.deepcopy(_FALLBACK)
    section = raw.get("audio_prompts")
    if not isinstance(section, dict):
        section = {}
    defaults = _FALLBACK["audio_prompts"]

    section["mode"] = _coerce_audio_prompt_mode(section.get("mode", defaults["mode"]))
    section["fallback_to_gemini"] = _coerce_bool(
        section.get("fallback_to_gemini", defaults["fallback_to_gemini"]),
        bool(defaults["fallback_to_gemini"]),
    )

    configured_files = section.get("files")
    if not isinstance(configured_files, dict):
        configured_files = {}

    def clean_name(prompt_key: str, default_name: str) -> str:
        # Normalised file name for one prompt, or its default when blank.
        candidate = str(configured_files.get(prompt_key, default_name) or "")
        candidate = candidate.strip().replace("\\", "/").lstrip("/")
        return candidate if candidate else default_name

    section["files"] = {
        prompt_key: clean_name(prompt_key, default_name)
        for prompt_key, default_name in AUDIO_PROMPT_FILE_DEFAULTS.items()
    }
    raw["audio_prompts"] = section
    return raw
|
|
|
|
|
|
def _refresh_cached_cfg(raw: Dict[str, Any]) -> None:
    """Rebuild the cached config from the defaults plus ``raw``.

    Args:
        raw: Raw config mapping; anything that is not a dict is treated as
            empty.
    """
    global _CFG
    overrides = raw if isinstance(raw, dict) else {}
    _CFG = _deep_merge(_FALLBACK, overrides)
|
|
|
|
|
|
def read_runtime_mode() -> str:
    """Return the current runtime mode stored in the config file.

    Returns:
        ``"manual"`` or ``"ai"``; a missing current mode falls back to the
        default mode.
    """
    section = _ensure_mode_block(_read_config_json_raw()).get("mode", {})
    fallback_mode = section.get("default_mode", DEFAULT_MODE)
    return _coerce_mode(section.get("current_mode", fallback_mode))
|
|
|
|
|
|
def write_runtime_mode(mode: str) -> str:
    """Persist the current runtime mode and refresh the cached config.

    Args:
        mode: Requested mode; normalised with ``_coerce_mode``.

    Returns:
        The normalised mode that was written.
    """
    cfg = _ensure_mode_block(_read_config_json_raw())
    section = cfg.get("mode", {})
    section["current_mode"] = _coerce_mode(mode)
    cfg["mode"] = section
    _write_config_json_raw(cfg)
    _refresh_cached_cfg(cfg)
    return section["current_mode"]
|
|
|
|
|
|
def read_vision_detector_backend() -> str:
    """Return ``vision.detection_backend`` from the config file.

    Returns:
        ``"yolo"`` or ``"normal"``.
    """
    section = _ensure_vision_block(_read_config_json_raw()).get("vision", {})
    return _coerce_detection_backend(section.get("detection_backend", "yolo"))
|
|
|
|
|
|
def write_vision_detector_backend(backend: str) -> str:
    """Persist ``vision.detection_backend`` and refresh the cached config.

    Args:
        backend: Requested backend; normalised with
            ``_coerce_detection_backend``.

    Returns:
        The normalised backend that was written.
    """
    cfg = _ensure_vision_block(_read_config_json_raw())
    section = cfg.get("vision", {})
    section["detection_backend"] = _coerce_detection_backend(backend)
    cfg["vision"] = section
    _write_config_json_raw(cfg)
    _refresh_cached_cfg(cfg)
    return str(section["detection_backend"])
|
|
|
|
|
|
def read_vision_yolo_runtime() -> str:
    """Return ``vision.yolo_runtime`` from the config file.

    Returns:
        ``"ultralytics"`` or ``"opencv"``.
    """
    section = _ensure_vision_block(_read_config_json_raw()).get("vision", {})
    configured = section.get("yolo_runtime", _FALLBACK["vision"]["yolo_runtime"])
    return _coerce_yolo_runtime(configured)
|
|
|
|
|
|
def read_vision_yolo_ultralytics_device() -> str:
    """Return ``vision.yolo_ultralytics_device`` from the config file.

    Returns:
        The trimmed device string, or ``"cpu"`` when it is blank.
    """
    section = _ensure_vision_block(_read_config_json_raw()).get("vision", {})
    configured = section.get("yolo_ultralytics_device", _FALLBACK["vision"]["yolo_ultralytics_device"])
    device = str(configured or "").strip()
    return device if device else "cpu"
|
|
|
|
|
|
def read_vision_yolo_strict_required() -> bool:
    """Return ``vision.yolo_strict_required`` from the config file."""
    section = _ensure_vision_block(_read_config_json_raw()).get("vision", {})
    configured = section.get("yolo_strict_required", _FALLBACK["vision"]["yolo_strict_required"])
    return _coerce_bool(configured, True)
|
|
|
|
|
|
def read_vision_gemini_context_hz() -> float:
    """Return ``vision.gemini_context_hz`` from the config file.

    Returns:
        The configured rate limited to the range 0.5 to 30.0; the built-in
        default is used when the stored value cannot be converted.
    """
    section = _ensure_vision_block(_read_config_json_raw()).get("vision", {})
    fallback_hz = _FALLBACK["vision"]["gemini_context_hz"]
    try:
        rate = float(section.get("gemini_context_hz", fallback_hz))
    except Exception:
        rate = float(fallback_hz)
    capped = min(30.0, rate)
    return max(0.5, capped)
|
|
|
|
|
|
def read_vision_gemini_context_silent() -> bool:
    """Return ``vision.gemini_context_silent`` from the config file."""
    section = _ensure_vision_block(_read_config_json_raw()).get("vision", {})
    configured = section.get("gemini_context_silent", _FALLBACK["vision"]["gemini_context_silent"])
    return _coerce_bool(configured, True)
|
|
|
|
|
|
def read_vision_idle_voice_listen_enabled() -> bool:
    """Return ``vision.idle_voice_listen_enabled`` from the config file."""
    section = _ensure_vision_block(_read_config_json_raw()).get("vision", {})
    configured = section.get(
        "idle_voice_listen_enabled",
        _FALLBACK["vision"]["idle_voice_listen_enabled"],
    )
    return _coerce_bool(configured, True)
|
|
|
|
|
|
def _write_vision_cfg_value(key: str, value: Any, *, bool_field: bool = False) -> Any:
    """Store one ``vision`` setting, persist the config and refresh the cache.

    The vision and watchdog sections are normalised before the write, and the
    vision section is normalised again afterwards, so the stored value may
    differ from ``value`` when it is clamped or coerced.

    Args:
        key: Name of the setting inside the ``vision`` section.
        value: New value.
        bool_field: When true, ``value`` is coerced with ``_coerce_bool``
            before it is stored.

    Returns:
        The value stored under ``key`` after normalisation.
    """
    cfg = _ensure_watchdog_block(_ensure_vision_block(_read_config_json_raw()))
    section = cfg.get("vision", {})
    if bool_field:
        fallback_flag = bool(_FALLBACK["vision"].get(key, False))
        section[key] = _coerce_bool(value, fallback_flag)
    else:
        section[key] = value
    cfg["vision"] = section
    # Re-run normalisation so the new value is validated like all the others.
    cfg = _ensure_vision_block(cfg)
    _write_config_json_raw(cfg)
    _refresh_cached_cfg(cfg)
    return cfg.get("vision", {}).get(key)
|
|
|
|
|
|
def read_vision_hard_target_lock_enabled() -> bool:
    """Return ``vision.hard_target_lock_enabled`` from the config file."""
    section = _ensure_vision_block(_read_config_json_raw()).get("vision", {})
    return _coerce_bool(section.get("hard_target_lock_enabled", True), True)
|
|
|
|
|
|
def write_vision_hard_target_lock_enabled(enabled: Any) -> bool:
    """Persist ``vision.hard_target_lock_enabled`` and return the stored flag."""
    stored_flag = _write_vision_cfg_value("hard_target_lock_enabled", enabled, bool_field=True)
    return bool(stored_flag)
|
|
|
|
|
|
def read_vision_retake_prompt_enabled() -> bool:
    """Return ``vision.retake_prompt_enabled`` from the config file."""
    section = _ensure_vision_block(_read_config_json_raw()).get("vision", {})
    return _coerce_bool(section.get("retake_prompt_enabled", True), True)
|
|
|
|
|
|
def write_vision_retake_prompt_enabled(enabled: Any) -> bool:
    """Persist ``vision.retake_prompt_enabled`` and return the stored flag."""
    stored_flag = _write_vision_cfg_value("retake_prompt_enabled", enabled, bool_field=True)
    return bool(stored_flag)
|
|
|
|
|
|
def read_vision_autonomous_greeting_replay_enabled() -> bool:
    """Return ``vision.autonomous_greeting_replay_enabled`` from the config file."""
    fallback_flag = _FALLBACK["vision"]["autonomous_greeting_replay_enabled"]
    section = _ensure_vision_block(_read_config_json_raw()).get("vision", {})
    configured = section.get("autonomous_greeting_replay_enabled", fallback_flag)
    return _coerce_bool(configured, bool(fallback_flag))
|
|
|
|
|
|
def read_vision_autonomous_greeting_replay_file() -> str:
    """Return ``vision.autonomous_greeting_replay_file`` from the config file.

    Returns:
        The trimmed file name, or the built-in default when it is blank.
    """
    fallback_name = _FALLBACK["vision"]["autonomous_greeting_replay_file"]
    section = _ensure_vision_block(_read_config_json_raw()).get("vision", {})
    configured = str(section.get("autonomous_greeting_replay_file", fallback_name) or "").strip()
    return configured if configured else str(fallback_name)
|
|
|
|
|
|
def read_vision_autonomous_capture_replay_enabled() -> bool:
    """Return the ``vision.autonomous_capture_replay_enabled`` flag.

    The value is looked up in the raw config (after ``_ensure_vision_block``);
    the matching ``_FALLBACK["vision"]`` entry is used both when the key is
    missing and as the default given to ``_coerce_bool``.
    """
    cfg = _ensure_vision_block(_read_config_json_raw())
    vision_section = cfg.get("vision", {})
    default = _FALLBACK["vision"]["autonomous_capture_replay_enabled"]
    configured = vision_section.get("autonomous_capture_replay_enabled", default)
    return _coerce_bool(configured, bool(default))
|
|
|
|
|
|
def write_vision_autonomous_greeting_replay_enabled(enabled: Any) -> bool:
    """Store ``enabled`` under ``vision.autonomous_greeting_replay_enabled``.

    Delegates to ``_write_vision_cfg_value`` with ``bool_field=True`` and
    returns whatever that helper reports back, converted with ``bool``.
    """
    stored = _write_vision_cfg_value(
        "autonomous_greeting_replay_enabled",
        enabled,
        bool_field=True,
    )
    return bool(stored)
|
|
|
|
|
|
def write_vision_autonomous_greeting_replay_file(filename: Any) -> str:
    """Store the greeting replay file name under the ``vision`` config block.

    ``filename`` is stringified, stripped, has backslashes turned into forward
    slashes and leading slashes removed. An empty result is replaced by the
    ``_FALLBACK["vision"]`` entry. The updated config is passed through
    ``_ensure_vision_block`` again, written with ``_write_config_json_raw``
    and handed to ``_refresh_cached_cfg``.

    Returns the value found in the config after that second
    ``_ensure_vision_block`` pass, as a string.
    """
    cfg = _ensure_vision_block(_read_config_json_raw())
    vision_section = cfg.get("vision", {})

    normalized = str(filename or "").strip().replace("\\", "/").lstrip("/")
    if not normalized:
        normalized = str(_FALLBACK["vision"]["autonomous_greeting_replay_file"])
    vision_section["autonomous_greeting_replay_file"] = normalized

    cfg["vision"] = vision_section
    cfg = _ensure_vision_block(cfg)
    _write_config_json_raw(cfg)
    _refresh_cached_cfg(cfg)

    saved = cfg.get("vision", {}).get("autonomous_greeting_replay_file", "")
    return str(saved)
|
|
|
|
|
|
def write_vision_autonomous_capture_replay_enabled(enabled: Any) -> bool:
    """Store ``enabled`` under ``vision.autonomous_capture_replay_enabled``.

    Delegates to ``_write_vision_cfg_value`` with ``bool_field=True`` and
    returns whatever that helper reports back, converted with ``bool``.
    """
    stored = _write_vision_cfg_value(
        "autonomous_capture_replay_enabled",
        enabled,
        bool_field=True,
    )
    return bool(stored)
|
|
|
|
|
|
def read_vision_retake_max_per_session() -> int:
    """Return ``vision.retake_max_per_session`` from the raw config.

    The ``_FALLBACK["vision"]`` entry is used when the key is missing and as
    the default for ``_coerce_int``, which is called with ``min_v=0`` and
    ``max_v=5``.
    """
    cfg = _ensure_vision_block(_read_config_json_raw())
    vision_section = cfg.get("vision", {})
    default = _FALLBACK["vision"]["retake_max_per_session"]
    configured = vision_section.get("retake_max_per_session", default)
    return _coerce_int(configured, int(default), min_v=0, max_v=5)
|
|
|
|
|
|
def read_vision_framing_headroom_min_ratio() -> float:
    """Return ``vision.framing_headroom_min_ratio`` from the raw config.

    The ``_FALLBACK["vision"]`` entry is used when the key is missing and as
    the default for ``_coerce_float``, which is called with ``min_v=0.0`` and
    ``max_v=0.8``.
    """
    cfg = _ensure_vision_block(_read_config_json_raw())
    vision_section = cfg.get("vision", {})
    default = _FALLBACK["vision"]["framing_headroom_min_ratio"]
    configured = vision_section.get("framing_headroom_min_ratio", default)
    return _coerce_float(configured, float(default), min_v=0.0, max_v=0.8)
|
|
|
|
|
|
def read_vision_framing_headroom_max_ratio() -> float:
    """Return ``vision.framing_headroom_max_ratio`` from the raw config.

    The ``_FALLBACK["vision"]`` entry is used when the key is missing and as
    the default for ``_coerce_float``, which is called with ``min_v=0.0`` and
    ``max_v=0.95``.
    """
    cfg = _ensure_vision_block(_read_config_json_raw())
    vision_section = cfg.get("vision", {})
    default = _FALLBACK["vision"]["framing_headroom_max_ratio"]
    configured = vision_section.get("framing_headroom_max_ratio", default)
    return _coerce_float(configured, float(default), min_v=0.0, max_v=0.95)
|
|
|
|
|
|
def read_vision_framing_eye_line_min_ratio() -> float:
    """Return ``vision.framing_eye_line_min_ratio`` from the raw config.

    The ``_FALLBACK["vision"]`` entry is used when the key is missing and as
    the default for ``_coerce_float``, which is called with ``min_v=0.0`` and
    ``max_v=0.9``.
    """
    cfg = _ensure_vision_block(_read_config_json_raw())
    vision_section = cfg.get("vision", {})
    default = _FALLBACK["vision"]["framing_eye_line_min_ratio"]
    configured = vision_section.get("framing_eye_line_min_ratio", default)
    return _coerce_float(configured, float(default), min_v=0.0, max_v=0.9)
|
|
|
|
|
|
def read_vision_framing_eye_line_max_ratio() -> float:
    """Return ``vision.framing_eye_line_max_ratio`` from the raw config.

    The ``_FALLBACK["vision"]`` entry is used when the key is missing and as
    the default for ``_coerce_float``, which is called with ``min_v=0.05`` and
    ``max_v=1.0``.
    """
    cfg = _ensure_vision_block(_read_config_json_raw())
    vision_section = cfg.get("vision", {})
    default = _FALLBACK["vision"]["framing_eye_line_max_ratio"]
    configured = vision_section.get("framing_eye_line_max_ratio", default)
    return _coerce_float(configured, float(default), min_v=0.05, max_v=1.0)
|
|
|
|
|
|
def read_vision_framing_retake_score_threshold() -> float:
    """Return ``vision.framing_retake_score_threshold`` from the raw config.

    The ``_FALLBACK["vision"]`` entry is used when the key is missing and as
    the default for ``_coerce_float``, which is called with ``min_v=0.0`` and
    ``max_v=1.0``.
    """
    cfg = _ensure_vision_block(_read_config_json_raw())
    vision_section = cfg.get("vision", {})
    default = _FALLBACK["vision"]["framing_retake_score_threshold"]
    configured = vision_section.get("framing_retake_score_threshold", default)
    return _coerce_float(configured, float(default), min_v=0.0, max_v=1.0)
|
|
|
|
|
|
def read_vision_face_recognition_enabled() -> bool:
    """Return the ``vision.face_recognition_enabled`` flag.

    The value is looked up in the raw config (after ``_ensure_vision_block``);
    the matching ``_FALLBACK["vision"]`` entry is used both when the key is
    missing and as the default given to ``_coerce_bool``.
    """
    cfg = _ensure_vision_block(_read_config_json_raw())
    vision_section = cfg.get("vision", {})
    default = _FALLBACK["vision"]["face_recognition_enabled"]
    configured = vision_section.get("face_recognition_enabled", default)
    return _coerce_bool(configured, bool(default))
|
|
|
|
|
|
def write_vision_face_recognition_enabled(enabled: Any) -> bool:
    """Store ``enabled`` under ``vision.face_recognition_enabled``.

    Delegates to ``_write_vision_cfg_value`` with ``bool_field=True`` and
    returns whatever that helper reports back, converted with ``bool``.
    """
    stored = _write_vision_cfg_value(
        "face_recognition_enabled",
        enabled,
        bool_field=True,
    )
    return bool(stored)
|
|
|
|
|
|
def read_vision_face_recognition_threshold() -> float:
    """Return ``vision.face_recognition_threshold`` from the raw config.

    The ``_FALLBACK["vision"]`` entry is used when the key is missing and as
    the default for ``_coerce_float``, which is called with ``min_v=0.5`` and
    ``max_v=0.995``.
    """
    cfg = _ensure_vision_block(_read_config_json_raw())
    vision_section = cfg.get("vision", {})
    default = _FALLBACK["vision"]["face_recognition_threshold"]
    configured = vision_section.get("face_recognition_threshold", default)
    return _coerce_float(configured, float(default), min_v=0.5, max_v=0.995)
|
|
|
|
|
|
def write_vision_face_recognition_threshold(value: Any) -> float:
    """Store the face recognition threshold under the ``vision`` config block.

    ``value`` is run through ``_coerce_float`` (default taken from
    ``_FALLBACK["vision"]``, ``min_v=0.5``, ``max_v=0.995``) before being
    stored. The updated config is passed through ``_ensure_vision_block``
    again, written with ``_write_config_json_raw`` and handed to
    ``_refresh_cached_cfg``.

    Returns the threshold found in the config after that second
    ``_ensure_vision_block`` pass, as a float.
    """
    cfg = _ensure_vision_block(_read_config_json_raw())
    vision_section = cfg.get("vision", {})

    threshold = _coerce_float(
        value,
        float(_FALLBACK["vision"]["face_recognition_threshold"]),
        min_v=0.5,
        max_v=0.995,
    )
    vision_section["face_recognition_threshold"] = threshold

    cfg["vision"] = vision_section
    cfg = _ensure_vision_block(cfg)
    _write_config_json_raw(cfg)
    _refresh_cached_cfg(cfg)

    saved = cfg.get("vision", {}).get(
        "face_recognition_threshold",
        _FALLBACK["vision"]["face_recognition_threshold"],
    )
    return float(saved)
|
|
|
|
|
|
def read_watchdog_ws_initial_backoff_sec() -> float:
    """Return ``watchdog.ws_initial_backoff_sec`` from the raw config.

    The raw config is passed through ``_ensure_watchdog_block`` first. The
    ``_FALLBACK["watchdog"]`` entry is used when the key is missing and as the
    default for ``_coerce_float`` (``min_v=0.1``, ``max_v=60.0``).
    """
    cfg = _ensure_watchdog_block(_read_config_json_raw())
    watchdog_section = cfg.get("watchdog", {})
    default = _FALLBACK["watchdog"]["ws_initial_backoff_sec"]
    configured = watchdog_section.get("ws_initial_backoff_sec", default)
    return _coerce_float(configured, float(default), min_v=0.1, max_v=60.0)
|
|
|
|
|
|
def read_watchdog_ws_max_backoff_sec() -> float:
    """Return ``watchdog.ws_max_backoff_sec`` from the raw config.

    The raw config is passed through ``_ensure_watchdog_block`` first. The
    ``_FALLBACK["watchdog"]`` entry is used when the key is missing and as the
    default for ``_coerce_float`` (``min_v=0.5``, ``max_v=300.0``).
    """
    cfg = _ensure_watchdog_block(_read_config_json_raw())
    watchdog_section = cfg.get("watchdog", {})
    default = _FALLBACK["watchdog"]["ws_max_backoff_sec"]
    configured = watchdog_section.get("ws_max_backoff_sec", default)
    return _coerce_float(configured, float(default), min_v=0.5, max_v=300.0)
|
|
|
|
|
|
def read_watchdog_component_restart_delay_sec() -> float:
    """Return ``watchdog.component_restart_delay_sec`` from the raw config.

    The raw config is passed through ``_ensure_watchdog_block`` first. The
    ``_FALLBACK["watchdog"]`` entry is used when the key is missing and as the
    default for ``_coerce_float`` (``min_v=0.1``, ``max_v=20.0``).
    """
    cfg = _ensure_watchdog_block(_read_config_json_raw())
    watchdog_section = cfg.get("watchdog", {})
    default = _FALLBACK["watchdog"]["component_restart_delay_sec"]
    configured = watchdog_section.get("component_restart_delay_sec", default)
    return _coerce_float(configured, float(default), min_v=0.1, max_v=20.0)
|
|
|
|
|
|
def read_watchdog_camera_capture_retry_count() -> int:
    """Return ``watchdog.camera_capture_retry_count`` from the raw config.

    The raw config is passed through ``_ensure_watchdog_block`` first. The
    ``_FALLBACK["watchdog"]`` entry is used when the key is missing and as the
    default for ``_coerce_int`` (``min_v=0``, ``max_v=10``).
    """
    cfg = _ensure_watchdog_block(_read_config_json_raw())
    watchdog_section = cfg.get("watchdog", {})
    default = _FALLBACK["watchdog"]["camera_capture_retry_count"]
    configured = watchdog_section.get("camera_capture_retry_count", default)
    return _coerce_int(configured, int(default), min_v=0, max_v=10)
|
|
|
|
|
|
def read_watchdog_camera_capture_retry_delay_sec() -> float:
    """Return ``watchdog.camera_capture_retry_delay_sec`` from the raw config.

    The raw config is passed through ``_ensure_watchdog_block`` first. The
    ``_FALLBACK["watchdog"]`` entry is used when the key is missing and as the
    default for ``_coerce_float`` (``min_v=0.0``, ``max_v=30.0``).
    """
    cfg = _ensure_watchdog_block(_read_config_json_raw())
    watchdog_section = cfg.get("watchdog", {})
    default = _FALLBACK["watchdog"]["camera_capture_retry_delay_sec"]
    configured = watchdog_section.get("camera_capture_retry_delay_sec", default)
    return _coerce_float(configured, float(default), min_v=0.0, max_v=30.0)
|
|
|
|
|
|
def read_audio_prompts_fallback_to_gemini() -> bool:
    """Return the ``audio_prompts.fallback_to_gemini`` flag from the raw config.

    The raw config is passed through ``_ensure_audio_prompts_block`` first. A
    missing key is read as ``True``, which is also the default handed to
    ``_coerce_bool``.
    """
    cfg = _ensure_audio_prompts_block(_read_config_json_raw())
    prompts_section = cfg.get("audio_prompts", {})
    flag = prompts_section.get("fallback_to_gemini", True)
    return _coerce_bool(flag, True)
|
|
|
|
|
|
def read_audio_prompt_mode() -> str:
    """Return the ``audio_prompts.mode`` setting from the raw config.

    The raw config is passed through ``_ensure_audio_prompts_block`` first. A
    missing key is read as ``"audio"``; the value is then normalized by
    ``_coerce_audio_prompt_mode``.
    """
    cfg = _ensure_audio_prompts_block(_read_config_json_raw())
    prompts_section = cfg.get("audio_prompts", {})
    configured_mode = prompts_section.get("mode", "audio")
    return _coerce_audio_prompt_mode(configured_mode)
|
|
|
|
|
|
def write_audio_prompt_mode(mode: Any) -> str:
    """Store the audio prompt mode under ``audio_prompts.mode``.

    ``mode`` is normalized by ``_coerce_audio_prompt_mode`` before being
    stored. The updated config is written with ``_write_config_json_raw`` and
    handed to ``_refresh_cached_cfg``.

    Returns the stored mode as a string.
    """
    cfg = _ensure_audio_prompts_block(_read_config_json_raw())
    prompts_section = cfg.get("audio_prompts", {})

    prompts_section["mode"] = _coerce_audio_prompt_mode(mode)
    cfg["audio_prompts"] = prompts_section

    _write_config_json_raw(cfg)
    _refresh_cached_cfg(cfg)

    saved_mode = prompts_section["mode"]
    return str(saved_mode)
|
|
|
|
|
|
def write_audio_prompts_fallback_to_gemini(enabled: Any) -> bool:
    """Store ``enabled`` under ``audio_prompts.fallback_to_gemini``.

    ``enabled`` is run through ``_coerce_bool`` with a default of ``True``
    before being stored. The updated config is written with
    ``_write_config_json_raw`` and handed to ``_refresh_cached_cfg``.

    Returns the stored flag as a bool.
    """
    cfg = _ensure_audio_prompts_block(_read_config_json_raw())
    prompts_section = cfg.get("audio_prompts", {})

    prompts_section["fallback_to_gemini"] = _coerce_bool(enabled, True)
    cfg["audio_prompts"] = prompts_section

    _write_config_json_raw(cfg)
    _refresh_cached_cfg(cfg)

    saved_flag = prompts_section["fallback_to_gemini"]
    return bool(saved_flag)
|
|
|
|
|
|
def read_audio_prompt_files() -> Dict[str, str]:
    """Return the file name configured for every known audio prompt key.

    The result has exactly the keys of ``AUDIO_PROMPT_FILE_DEFAULTS``. Each
    value comes from ``audio_prompts.files`` in the raw config, stringified,
    stripped, with backslashes turned into forward slashes and leading
    slashes removed. A missing, falsy or empty entry yields the default name
    for that key.
    """

    def _clean(name: Any) -> str:
        # Same normalization for every entry: trim, unify separators, and
        # drop leading slashes.
        return str(name or "").strip().replace("\\", "/").lstrip("/")

    cfg = _ensure_audio_prompts_block(_read_config_json_raw())
    configured_files = cfg.get("audio_prompts", {}).get("files", {})
    return {
        prompt_key: _clean(configured_files.get(prompt_key, default_name)) or default_name
        for prompt_key, default_name in AUDIO_PROMPT_FILE_DEFAULTS.items()
    }
|
|
|
|
|
|
def read_audio_prompt_filename(key: str) -> str:
    """Return the configured file name for a single audio prompt key.

    ``key`` is stringified and stripped before the lookup.

    Raises:
        KeyError: if the key is not present in ``AUDIO_PROMPT_FILE_DEFAULTS``.
    """
    prompt_key = str(key or "").strip()
    if prompt_key in AUDIO_PROMPT_FILE_DEFAULTS:
        configured = read_audio_prompt_files()
        return str(configured.get(prompt_key, AUDIO_PROMPT_FILE_DEFAULTS[prompt_key]))
    raise KeyError(f"unknown audio prompt key: {prompt_key}")
|
|
|
|
|
|
def write_audio_prompt_filename(key: str, filename: Any) -> str:
    """Store the file name for one audio prompt key in ``audio_prompts.files``.

    ``key`` is stringified and stripped. ``filename`` is stringified,
    stripped, has backslashes turned into forward slashes and leading slashes
    removed; an empty result is replaced by the default name for that key.
    The updated config is written with ``_write_config_json_raw`` and handed
    to ``_refresh_cached_cfg``.

    Returns the stored file name as a string.

    Raises:
        KeyError: if the key is not present in ``AUDIO_PROMPT_FILE_DEFAULTS``.
    """
    prompt_key = str(key or "").strip()
    if prompt_key not in AUDIO_PROMPT_FILE_DEFAULTS:
        raise KeyError(f"unknown audio prompt key: {prompt_key}")

    cfg = _ensure_audio_prompts_block(_read_config_json_raw())
    prompts_section = cfg.get("audio_prompts", {})
    files_section = prompts_section.get("files", {})

    normalized = str(filename or "").strip().replace("\\", "/").lstrip("/")
    if not normalized:
        normalized = AUDIO_PROMPT_FILE_DEFAULTS[prompt_key]
    files_section[prompt_key] = normalized

    prompts_section["files"] = files_section
    cfg["audio_prompts"] = prompts_section
    _write_config_json_raw(cfg)
    _refresh_cached_cfg(cfg)

    return str(files_section[prompt_key])
|
|
|
|
|
|
# Ensure the mode, vision, watchdog and audio-prompt blocks exist in
# config.json. This runs at import time and is best-effort: a failure must not
# stop the module from loading, so it is logged rather than raised.
try:
    _raw_cfg = _read_config_json_raw()
    _normalized = _ensure_mode_block(_raw_cfg)
    _normalized = _ensure_vision_block(_normalized)
    _normalized = _ensure_watchdog_block(_normalized)
    _normalized = _ensure_audio_prompts_block(_normalized)
    # NOTE(review): this comparison only detects a change if the _ensure_*
    # helpers return a new object instead of mutating _raw_cfg in place;
    # confirm against their definitions.
    if _normalized != _raw_cfg or not CONFIG_JSON.exists():
        _write_config_json_raw(_normalized)
    # Refresh unconditionally so the cached config matches the normalized one
    # whether or not a write was needed.
    _refresh_cached_cfg(_normalized)
except Exception:
    # Previously swallowed silently; keep going, but leave a trace so a broken
    # or unwritable config file can be diagnosed.
    import logging

    logging.getLogger(__name__).warning(
        "config.json normalization failed; continuing without it",
        exc_info=True,
    )
|
|
|
|
# ==================================================
# CAMERA
# ==================================================
# Each constant is computed once at import time: the `camera.*` value from
# `_c` is cast to int and passed to `_env` along with an upper-case name.
# NOTE(review): `_env` presumably lets that environment variable override the
# config value; confirm against its definition.
CAMERA_INDEX = _env(
    "CAMERA_INDEX",
    int(_c("camera.camera_index", 0)),
    int,
)
FRAME_WIDTH = _env(
    "FRAME_WIDTH",
    int(_c("camera.frame_width", 640)),
    int,
)
FRAME_HEIGHT = _env(
    "FRAME_HEIGHT",
    int(_c("camera.frame_height", 480)),
    int,
)
FPS = _env(
    "FPS",
    int(_c("camera.fps", 30)),
    int,
)
|
|
|
|
# ==================================================
# VISION / DETECTOR
# ==================================================
# All values below are computed once at import time from the `vision.*` keys.

# Detector backend, runtime and model paths.
VISION_DETECTION_BACKEND = _coerce_detection_backend(
    _c("vision.detection_backend", "yolo")
)
VISION_YOLO_RUNTIME = _coerce_yolo_runtime(
    _c("vision.yolo_runtime", "ultralytics")
)
# A falsy or blank device string falls back to "cpu".
VISION_YOLO_ULTRALYTICS_DEVICE = (
    str(_c("vision.yolo_ultralytics_device", "cpu") or "").strip() or "cpu"
)
VISION_PERSON_YOLO_ONNX = str(_c("vision.person_yolo_onnx", "")).strip()
VISION_FACE_YOLO_ONNX = str(_c("vision.face_yolo_onnx", "")).strip()

# Detector tuning values, passed through `_env` with a DETECTOR_* name.
# NOTE(review): `_env` presumably lets that environment variable override the
# config value; confirm against its definition.
VISION_INPUT_SIZE = _env(
    "DETECTOR_INPUT_SIZE",
    int(_c("vision.input_size", 640)),
    int,
)
VISION_PERSON_CLASS_ID = _env(
    "DETECTOR_PERSON_CLASS_ID",
    int(_c("vision.person_class_id", 0)),
    int,
)
VISION_PERSON_SCORE_THRESH = _env("DETECTOR_PERSON_SCORE_THRESH", float(_c("vision.person_score_thresh", 0.35)), float)
VISION_FACE_SCORE_THRESH = _env("DETECTOR_FACE_SCORE_THRESH", float(_c("vision.face_score_thresh", 0.35)), float)
VISION_NMS_IOU_THRESH = _env(
    "DETECTOR_NMS_IOU_THRESH",
    float(_c("vision.nms_iou_thresh", 0.45)),
    float,
)
VISION_GROUP_MIN_PEOPLE = _env(
    "DETECTOR_GROUP_MIN_PEOPLE",
    int(_c("vision.group_min_people", 3)),
    int,
)
VISION_GROUP_LINK_DISTANCE_PX = _env("DETECTOR_GROUP_LINK_DISTANCE_PX", float(_c("vision.group_link_distance_px", 220.0)), float)

# Behavior flags and the Gemini context rate (never below 0.5).
VISION_YOLO_STRICT_REQUIRED = _coerce_bool(
    _c("vision.yolo_strict_required", True),
    True,
)
VISION_GEMINI_CONTEXT_HZ = max(
    0.5,
    float(_c("vision.gemini_context_hz", 8.0)),
)
VISION_GEMINI_CONTEXT_SILENT = _coerce_bool(
    _c("vision.gemini_context_silent", True),
    True,
)
VISION_HARD_TARGET_LOCK_ENABLED = _coerce_bool(
    _c("vision.hard_target_lock_enabled", True),
    True,
)
VISION_RETAKE_PROMPT_ENABLED = _coerce_bool(
    _c("vision.retake_prompt_enabled", True),
    True,
)
VISION_RETAKE_MAX_PER_SESSION = _coerce_int(
    _c("vision.retake_max_per_session", 1),
    1,
    min_v=0,
    max_v=5,
)

# Framing ratios and the retake score threshold.
VISION_FRAMING_HEADROOM_MIN_RATIO = _coerce_float(
    _c("vision.framing_headroom_min_ratio", 0.06),
    0.06,
    min_v=0.0,
    max_v=0.8,
)
VISION_FRAMING_HEADROOM_MAX_RATIO = _coerce_float(
    _c("vision.framing_headroom_max_ratio", 0.25),
    0.25,
    min_v=0.0,
    max_v=0.95,
)
VISION_FRAMING_EYE_LINE_MIN_RATIO = _coerce_float(
    _c("vision.framing_eye_line_min_ratio", 0.28),
    0.28,
    min_v=0.0,
    max_v=0.9,
)
VISION_FRAMING_EYE_LINE_MAX_RATIO = _coerce_float(
    _c("vision.framing_eye_line_max_ratio", 0.48),
    0.48,
    min_v=0.05,
    max_v=1.0,
)
VISION_FRAMING_RETAKE_SCORE_THRESHOLD = _coerce_float(
    _c("vision.framing_retake_score_threshold", 0.68),
    0.68,
    min_v=0.0,
    max_v=1.0,
)
|
|
|
|
# ==================================================
# WATCHDOG
# ==================================================
# Computed once at import time from the `watchdog.*` keys. Each value goes
# through `_coerce_float` / `_coerce_int` with the literal default and the
# min_v / max_v bounds shown.
WATCHDOG_WS_INITIAL_BACKOFF_SEC = _coerce_float(
    _c("watchdog.ws_initial_backoff_sec", 1.0),
    1.0,
    min_v=0.1,
    max_v=60.0,
)
WATCHDOG_WS_MAX_BACKOFF_SEC = _coerce_float(
    _c("watchdog.ws_max_backoff_sec", 20.0),
    20.0,
    min_v=0.5,
    max_v=300.0,
)
WATCHDOG_COMPONENT_RESTART_DELAY_SEC = _coerce_float(_c("watchdog.component_restart_delay_sec", 1.0), 1.0, min_v=0.1, max_v=20.0)
WATCHDOG_CAMERA_CAPTURE_RETRY_COUNT = _coerce_int(
    _c("watchdog.camera_capture_retry_count", 2),
    2,
    min_v=0,
    max_v=10,
)
WATCHDOG_CAMERA_CAPTURE_RETRY_DELAY_SEC = _coerce_float(_c("watchdog.camera_capture_retry_delay_sec", 0.8), 0.8, min_v=0.0, max_v=30.0)
|