# AI_Photographer/Core/audio_prompts.py
# 2026-04-12 18:52:37 +04:00
#
# 371 lines
# 13 KiB
# Python

from __future__ import annotations
import json
import re
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List
from Core import settings as config
# Default spoken text for every supported audio prompt, keyed by prompt key.
# Membership in this mapping is what `_clean_key` uses to decide whether a
# prompt key is known.
DEFAULT_PROMPT_TEXTS: Dict[str, str] = {
    "welcome_single": "Hello, welcome. We will take a photo together. Would you like a photo?",
    "welcome_group": "Hello everyone, welcome. We will take a photo together. Would your group like a photo?",
    "welcome_returning": "Welcome back. Would you like another photo?",
    "frame_single": "Great. Please stand with me in front of the camera, stay in the center, and look at the camera.",
    "frame_group": "Great. Please stand with me in front of the camera, stay together in the center, and look at the camera.",
    "confirm_reminder": "Please say yes photo to continue, or no photo to cancel.",
    "visitor_left": "No worries. I will wait here for the next visitor.",
    "declined": "No problem. We can do it anytime.",
    "confirm_timeout": "No problem. I will wait here. Come back anytime for a photo.",
    "session_cancelled": "Okay. Session cancelled.",
    "framing_timeout": "I still need a better frame. Please step in front of me and say yes photo when ready.",
    "countdown_intro": "Look at the camera, stay ready, hold your pose with me, keep still, keep your smile soft, and in a moment I will count down for the photo.",
    "count_3": "Three.",
    "count_2": "Two.",
    "count_1": "One.",
    "smile": "Smile.",
    "countdown_cancelled": "Countdown cancelled.",
    "lost_from_frame": "I lost you from the frame. Let us try again.",
    "retake_recommended": "Photo captured. I recommend a retake. Say yes photo to retake, or no photo to keep this one.",
    "retake_yes": "Great. Let us retake. Hold your pose.",
    "retake_limit": "Retake limit reached. Keeping the current photo.",
    "photo_saved_thanks": "Thank you. Photo saved. Do not forget to check your photos.",
}
# All prompt keys, in the declaration order of the mapping above;
# `list_audio_prompts` iterates this tuple.
PROMPT_KEYS = tuple(DEFAULT_PROMPT_TEXTS.keys())
# Resolved path of the record index file; this is the file
# `save_record_index` writes and the first one `load_record_index` tries.
RECORD_INDEX_PATH = config.AUDIO_PROMPT_RECORDS_FILE.resolve()
# Second candidate read by `load_record_index` when the primary index is
# missing or unusable. It is only ever read in this module, never written.
LEGACY_RECORD_INDEX_PATH = (config.AUDIO_PROMPTS_DIR / "records.json").resolve()
def _clean_key(key: str) -> str:
    """Normalize *key* and confirm that it names a known audio prompt.

    Falsy input is treated as an empty string and surrounding whitespace is
    stripped before the lookup.

    Raises:
        KeyError: if the normalized key is not in ``DEFAULT_PROMPT_TEXTS``.
    """
    normalized = str(key or "").strip()
    if normalized in DEFAULT_PROMPT_TEXTS:
        return normalized
    raise KeyError(f"unknown audio prompt key: {normalized}")
def _safe_filename(name: str) -> str:
cleaned = re.sub(r"[^A-Za-z0-9._-]+", "_", str(name or "").strip()).strip("._")
if not cleaned:
raise ValueError("invalid prompt filename")
if not cleaned.lower().endswith(".wav"):
raise ValueError("audio prompt files must be .wav")
return cleaned
def _record_index_template() -> Dict[str, Any]:
return {
"created_by": "AI_Photographer.audio_prompts",
"last_updated": "",
"total_records": 0,
"records": [],
}
def _format_timestamp(dt: datetime) -> str:
return dt.isoformat(timespec="seconds")
def _audio_duration_seconds(audio_bytes: bytes, sample_rate: int, channels: int, sample_width: int) -> float:
frame_size = max(1, int(sample_width) * max(1, int(channels)))
rate = max(1, int(sample_rate))
return len(audio_bytes) / float(rate * frame_size)
def _build_file_info(path: Path, audio_bytes: bytes, sample_rate: int, channels: int, sample_width: int) -> Dict[str, Any]:
    """Describe one stored audio file as a plain dict for the record index.

    The size is taken from the file on disk when *path* exists and from
    ``len(audio_bytes)`` otherwise. The duration is always derived from
    *audio_bytes* and the given format parameters, never from the file.
    """
    if path.exists():
        size_bytes = int(path.stat().st_size)
    else:
        size_bytes = int(len(audio_bytes))

    stored_path = _stored_project_path(path)
    megabytes = round(size_bytes / (1024 * 1024), 3)
    duration = _audio_duration_seconds(audio_bytes, sample_rate, channels, sample_width)

    info: Dict[str, Any] = {}
    info["path"] = stored_path
    info["name"] = path.name
    info["size_bytes"] = size_bytes
    info["size_mb"] = megabytes
    info["duration_seconds"] = round(duration, 3)
    info["sample_rate"] = int(sample_rate)
    info["channels"] = int(channels)
    info["sample_width_bytes"] = int(sample_width)
    return info
def _stored_project_path(path: Path) -> str:
    """Return the string form of *path* that is stored in the record index.

    When the resolved path lies under ``config.PROJECT_ROOT`` the result is
    ``"<project root name>/<relative posix path>"``. If that cannot be
    computed for any reason, the resolved absolute path is returned instead.
    """
    absolute = Path(path).resolve()
    try:
        relative = absolute.relative_to(config.PROJECT_ROOT)
        stored = f"{config.PROJECT_ROOT.name}/{relative.as_posix()}"
    except Exception:
        # Best effort: fall back to the absolute path instead of failing.
        stored = str(absolute)
    return stored
def prompt_text(key: str) -> str:
    """Return the default spoken text for prompt *key*.

    Raises:
        KeyError: if *key* is not a known prompt key.
    """
    validated = _clean_key(key)
    return DEFAULT_PROMPT_TEXTS[validated]
def prompt_filename(key: str) -> str:
    """Return the filename ``config.read_audio_prompt_filename`` gives for *key*.

    Raises:
        KeyError: if *key* is not a known prompt key.
    """
    validated = _clean_key(key)
    return config.read_audio_prompt_filename(validated)
def prompt_path(key: str) -> Path:
    """Return the resolved path of the speaker recording for *key*.

    The path is ``config.AUDIO_PROMPTS_DIR`` joined with the configured
    filename; the file itself is not required to exist.
    """
    unresolved = config.AUDIO_PROMPTS_DIR / prompt_filename(key)
    return unresolved.resolve()
def raw_prompt_filename(key: str) -> str:
    """Return the raw-output filename paired with the speaker file of *key*.

    The name is the speaker filename's stem followed by ``_raw.wav``.
    """
    speaker_name = prompt_filename(_clean_key(key))
    stem = Path(speaker_name).stem
    return f"{stem}_raw.wav"
def raw_prompt_path(key: str) -> Path:
    """Return the resolved path of the raw-output file for *key*.

    The file lives in ``config.AUDIO_PROMPTS_DIR``; it is not required to exist.
    """
    unresolved = config.AUDIO_PROMPTS_DIR / raw_prompt_filename(key)
    return unresolved.resolve()
def prompt_exists(key: str) -> bool:
    """Tell whether the speaker recording for *key* is present on disk."""
    speaker_path = prompt_path(key)
    return speaker_path.exists()
def load_record_index() -> Dict[str, Any]:
    """Load the record index, falling back to the legacy file, then to empty.

    ``RECORD_INDEX_PATH`` is tried first and ``LEGACY_RECORD_INDEX_PATH``
    second. A candidate is used only if it exists, parses as a JSON object
    with a list under ``"records"``, and reconciles without error. Any
    failure moves on to the next candidate. When no candidate is usable an
    empty template is returned.
    """
    for index_path in (RECORD_INDEX_PATH, LEGACY_RECORD_INDEX_PATH):
        if index_path.exists():
            try:
                loaded = json.loads(index_path.read_text(encoding="utf-8"))
                well_formed = isinstance(loaded, dict) and isinstance(loaded.get("records"), list)
                if not well_formed:
                    raise ValueError("invalid record index structure")
                # Fill in header fields that older or hand-edited files may lack.
                header_defaults = (
                    ("created_by", "AI_Photographer.audio_prompts"),
                    ("last_updated", ""),
                    ("total_records", len(loaded.get("records", []))),
                )
                for field, fallback in header_defaults:
                    loaded.setdefault(field, fallback)
                return reconcile_record_index(loaded)
            except Exception:
                # Unreadable or malformed index: try the next candidate.
                continue
    return _record_index_template()
def reconcile_record_index(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Drop stale records from *payload* and refresh the stored file paths.

    A record is kept only when it is a dict, its ``record_name`` is a known
    prompt key, and the speaker recording for that key exists on disk. For
    kept records the ``path``/``name`` of the speaker file are refreshed, and
    the raw-output info is either refreshed or removed depending on whether
    the raw file exists.

    Malformed pieces (a non-dict record, a non-dict ``files`` or ``timeline``
    value, a missing ``records`` list) are skipped or repaired instead of
    raising, so one bad entry cannot invalidate the whole index.

    *payload* and its record dicts are modified in place; the same *payload*
    object is returned with ``records``, ``total_records`` and
    ``last_updated`` rewritten.
    """
    kept: List[Dict[str, Any]] = []
    for entry in payload.get("records") or []:
        # Hand-edited or corrupted indexes may contain non-dict items.
        if not isinstance(entry, dict):
            continue
        key = str(entry.get("record_name", "") or "").strip()
        if key not in DEFAULT_PROMPT_TEXTS:
            continue
        speaker_path = prompt_path(key)
        if not speaker_path.exists():
            continue
        raw_path = raw_prompt_path(key)

        files = entry.get("files")
        if not isinstance(files, dict):
            # Repair a missing or malformed "files" section.
            files = {}
            entry["files"] = files

        speaker_info = files.get("speaker_recording")
        if isinstance(speaker_info, dict):
            speaker_info["path"] = _stored_project_path(speaker_path)
            speaker_info["name"] = speaker_path.name

        raw_info = files.get("gemini_raw_output")
        if isinstance(raw_info, dict):
            if raw_path.exists():
                raw_info["path"] = _stored_project_path(raw_path)
                raw_info["name"] = raw_path.name
            else:
                # The raw file is gone, so its metadata is no longer valid.
                files.pop("gemini_raw_output", None)
        kept.append(entry)

    last_updated = ""
    if kept:
        timeline = kept[-1].get("timeline")
        if isinstance(timeline, dict):
            last_updated = timeline.get("saved_at", "")

    payload["records"] = kept
    payload["total_records"] = len(kept)
    payload["last_updated"] = last_updated
    return payload
def save_record_index(payload: Dict[str, Any]) -> None:
    """Reconcile *payload* and write it to ``RECORD_INDEX_PATH`` as JSON.

    A shallow copy of *payload* is reconciled, so the caller's top-level dict
    keeps its own ``records``/``total_records``/``last_updated`` values.
    The record dicts are shared with the copy and may be updated in place.
    """
    config.AUDIO_PROMPTS_DIR.mkdir(parents=True, exist_ok=True)
    # The index file is configured separately from the prompts directory, so
    # its own parent directory must exist too or write_text() would fail.
    RECORD_INDEX_PATH.parent.mkdir(parents=True, exist_ok=True)
    normalized = reconcile_record_index(dict(payload))
    RECORD_INDEX_PATH.write_text(json.dumps(normalized, ensure_ascii=False, indent=2), encoding="utf-8")
def _record_entry_map() -> Dict[str, Dict[str, Any]]:
    """Return the loaded index records keyed by their stripped ``record_name``.

    Records with an empty name are left out; when a name occurs more than
    once the later record wins.
    """
    records = load_record_index().get("records", [])
    named = ((str(record.get("record_name", "") or "").strip(), record) for record in records)
    return {name: record for name, record in named if name}
def upsert_record_entry(entry: Dict[str, Any]) -> None:
    """Insert *entry* into the saved index, replacing any record with its key.

    The new entry is appended at the end of the record list and the index is
    written back through ``save_record_index``.

    Raises:
        KeyError: if the entry's ``record_name`` is not a known prompt key.
    """
    key = _clean_key(str(entry.get("record_name", "") or ""))
    payload = load_record_index()

    survivors = []
    for existing in payload.get("records", []):
        existing_key = str(existing.get("record_name", "") or "").strip()
        if existing_key != key:
            survivors.append(existing)
    survivors.append(entry)

    payload["records"] = survivors
    payload["total_records"] = len(survivors)
    timeline = entry.get("timeline", {})
    payload["last_updated"] = timeline.get("saved_at", "")
    save_record_index(payload)
def delete_record_entry(key: str) -> None:
    """Remove every record for prompt *key* from the saved index.

    ``total_records`` and ``last_updated`` are recomputed from the remaining
    records before the index is written back.

    Raises:
        KeyError: if *key* is not a known prompt key.
    """
    clean_key = _clean_key(key)
    payload = load_record_index()

    remaining = []
    for item in payload.get("records", []):
        item_key = str(item.get("record_name", "") or "").strip()
        if item_key != clean_key:
            remaining.append(item)

    payload["records"] = remaining
    payload["total_records"] = len(remaining)
    if remaining:
        payload["last_updated"] = remaining[-1].get("timeline", {}).get("saved_at", "")
    else:
        payload["last_updated"] = ""
    save_record_index(payload)
def list_audio_prompts() -> List[dict]:
    """Return one status dict per prompt key, in ``PROMPT_KEYS`` order.

    Each dict combines the default text, the configured filenames, on-disk
    facts about the speaker file (existence, size, mtime) and metadata taken
    from the record index (raw size, replay count, save time, durations).
    The full index record is included under ``"record"``.
    """
    fallback_to_gemini = bool(config.read_audio_prompts_fallback_to_gemini())
    record_map = _record_entry_map()
    listing: List[dict] = []

    for key in PROMPT_KEYS:
        speaker_path = prompt_path(key)
        raw_path = raw_prompt_path(key)
        speaker_present = speaker_path.exists()
        raw_present = raw_path.exists()

        # Size and mtime come from the file itself; they are zero when absent.
        if speaker_present:
            stat_result = speaker_path.stat()
            size = int(stat_result.st_size)
            mtime = float(stat_result.st_mtime)
        else:
            size = 0
            mtime = 0.0

        record = record_map.get(key, {})
        if isinstance(record, dict):
            speaker_meta = record.get("files", {}).get("speaker_recording", {})
            raw_meta = record.get("files", {}).get("gemini_raw_output", {})
            timeline = record.get("timeline", {})
        else:
            speaker_meta = {}
            raw_meta = {}
            timeline = {}

        item = {
            "key": key,
            "text": prompt_text(key),
            "filename": prompt_filename(key),
            "exists": bool(speaker_present),
            "size": size,
            "mtime": mtime,
            "raw_filename": raw_path.name,
            "raw_exists": bool(raw_present),
            # The raw size is reported from the index record, not from disk.
            "raw_size": int(raw_meta.get("size_bytes", 0) or 0),
            "replay_count": int(record.get("replay_count", 0) or 0),
            "saved_at": str(timeline.get("saved_at", "") or ""),
            "speaker_duration_seconds": float(speaker_meta.get("duration_seconds", 0.0) or 0.0),
            "raw_duration_seconds": float(raw_meta.get("duration_seconds", 0.0) or 0.0),
            "record": record,
            "fallback_to_gemini": fallback_to_gemini,
        }
        listing.append(item)

    return listing
def save_audio_prompt_bundle(
    key: str,
    speaker_data: bytes,
    filename: str = "",
    *,
    raw_data: bytes | None = None,
    text: str = "",
    model: str = "",
    voice_name: str = "",
    replay_count: int = 1,
    speaker_rate: int = 24000,
    speaker_channels: int = 1,
    raw_rate: int = 24000,
    raw_channels: int = 1,
    sample_width: int = 2,
    capture_device: str = "",
    sink: str = "",
    source: str = "",
    monitor_source: str = "",
) -> dict:
    """Store a prompt recording (and optional raw output) and index it.

    The speaker audio is written to ``config.AUDIO_PROMPTS_DIR`` under the
    sanitized *filename* (or the currently configured filename when
    *filename* is empty), the configured filename for *key* is updated, and a
    record describing the files is upserted into the record index.

    Args:
        key: Prompt key; must be a known prompt key.
        speaker_data: Bytes written as the speaker recording.
        filename: Desired ``.wav`` filename; sanitized before use.
        raw_data: Optional bytes written as the raw-output file. When falsy,
            any existing raw-output file for the prompt is removed.
        text, model, voice_name: Stored in the record; empty values fall back
            to the default prompt text, ``config.GEMINI_MODEL`` and
            ``config.VOICE_NAME`` respectively.
        replay_count: Stored in the record as an int.
        speaker_rate, speaker_channels, raw_rate, raw_channels, sample_width:
            Audio format parameters used for the recorded file metadata.
        capture_device, sink, source, monitor_source: Stored verbatim under
            the record's ``audio_capture`` section.

    Returns:
        A dict with ``ok``, ``key``, ``filename``, ``raw_filename``, ``path``
        and the new ``record``.

    Raises:
        KeyError: if *key* is unknown.
        ValueError: if the filename is empty after sanitizing or not ``.wav``.
    """
    clean_key = _clean_key(key)
    safe_filename = _safe_filename(filename or prompt_filename(clean_key))
    # Capture the previous locations before the configured filename changes.
    old_target = prompt_path(clean_key)
    old_raw = raw_prompt_path(clean_key)
    target = (config.AUDIO_PROMPTS_DIR / safe_filename).resolve()
    target.parent.mkdir(parents=True, exist_ok=True)

    # Write the new recording before touching the old one, so a failed write
    # (disk full, permissions) cannot leave the prompt without any audio.
    target.write_bytes(speaker_data)
    config.write_audio_prompt_filename(clean_key, safe_filename)
    raw_target = raw_prompt_path(clean_key)
    if raw_data:
        raw_target.write_bytes(raw_data)
    elif raw_target.exists():
        try:
            raw_target.unlink()
        except OSError:
            # Best effort: a leftover raw file must not fail the save.
            pass

    # Remove files left over from a previous filename. Paths that are now in
    # use are skipped: after a rename the old raw name can equal the new
    # speaker name (or the old speaker name the new raw name), and deleting
    # it would destroy the data that was just written.
    in_use = {target, raw_target}
    for stale in (old_target, old_raw):
        if stale in in_use or not stale.exists():
            continue
        try:
            stale.unlink()
        except OSError:
            # Best effort: an undeletable stale file must not fail the save.
            pass

    now = datetime.now()
    entry = {
        "record_name": clean_key,
        "text": str(text or prompt_text(clean_key)).strip(),
        "model": str(model or config.GEMINI_MODEL),
        "voice_name": str(voice_name or config.VOICE_NAME),
        "replay_count": int(replay_count),
        "audio_capture": {
            "sink": str(sink or ""),
            "monitor_source": str(monitor_source or ""),
            "restored_microphone_source": str(source or ""),
            "capture_device": str(capture_device or ""),
        },
        "timeline": {
            "saved_at": _format_timestamp(now),
        },
        "files": {
            "speaker_recording": _build_file_info(
                target,
                speaker_data,
                speaker_rate,
                speaker_channels,
                sample_width,
            ),
        },
    }
    if raw_data:
        entry["files"]["gemini_raw_output"] = _build_file_info(
            raw_target,
            raw_data,
            raw_rate,
            raw_channels,
            sample_width,
        )
    upsert_record_entry(entry)
    return {
        "ok": True,
        "key": clean_key,
        "filename": target.name,
        "raw_filename": raw_target.name if raw_data else "",
        "path": str(target),
        "record": entry,
    }
def save_audio_prompt(key: str, data: bytes, filename: str) -> dict:
    """Save *data* as the speaker recording for *key* with default metadata.

    Delegates to ``save_audio_prompt_bundle`` with the default prompt text,
    a replay count of 0 and no raw-output data.
    """
    default_text = prompt_text(key)
    return save_audio_prompt_bundle(
        key,
        data,
        filename=filename,
        text=default_text,
        replay_count=0,
    )
def delete_audio_prompt(key: str) -> dict:
    """Delete the speaker and raw files for *key* and drop its index record.

    Files that are already absent are skipped. The returned dict reports the
    paths that were targeted, whether or not a file existed there.

    Raises:
        KeyError: if *key* is not a known prompt key.
    """
    clean_key = _clean_key(key)
    target = prompt_path(clean_key)
    raw_target = raw_prompt_path(clean_key)

    for doomed in (target, raw_target):
        if doomed.exists():
            doomed.unlink()
    delete_record_entry(clean_key)

    summary = {"ok": True, "key": clean_key}
    summary["filename"] = prompt_filename(clean_key)
    summary["deleted"] = str(target)
    summary["deleted_raw"] = str(raw_target)
    return summary
def read_audio_prompt_bytes(key: str) -> tuple[Path, bytes]:
    """Return the speaker recording path for *key* together with its bytes.

    Raises:
        KeyError: if *key* is not a known prompt key.
        FileNotFoundError: if the speaker recording does not exist.
    """
    clean_key = _clean_key(key)
    target = prompt_path(clean_key)
    if target.exists():
        return target, target.read_bytes()
    raise FileNotFoundError(f"audio prompt not found for key: {clean_key}")