169 lines
5.8 KiB
Python

"""Script/prompt file management — CRUD for sanad_script.txt, sanad_rule.txt, etc."""
from __future__ import annotations
import asyncio
from datetime import datetime
from pathlib import Path
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from Project.Sanad.config import SCRIPTS_DIR
from Project.Sanad.core import persona as _persona
from Project.Sanad.dashboard.routes._safe_io import (
atomic_write_text, MAX_UPLOAD_BYTES,
)
# Router that the script-management endpoints in this module register on.
router = APIRouter()
# Upper bound, in bytes, on UTF-8 encoded script content accepted by the
# save/create endpoints; reuses the shared upload limit.
MAX_SCRIPT_BYTES = MAX_UPLOAD_BYTES
def _safe_path(name: str) -> Path:
    """Resolve a client-supplied script name to a path inside SCRIPTS_DIR.

    Args:
        name: Bare file name (no directory components) sent by the client.

    Returns:
        The resolved absolute path of the named entry inside SCRIPTS_DIR.

    Raises:
        HTTPException: 400 when the name is empty, is "." or "..", contains a
            path separator or NUL byte, cannot be resolved, or resolves to a
            location that is not inside SCRIPTS_DIR.
    """
    cleaned = name.strip()
    if (
        not cleaned
        or cleaned in {".", ".."}
        or any(ch in cleaned for ch in ("/", "\\", "\x00"))
    ):
        raise HTTPException(400, "Invalid script name.")
    root = SCRIPTS_DIR.resolve()
    try:
        path = (root / cleaned).resolve()
    except (OSError, RuntimeError, ValueError) as exc:
        # Unresolvable names (symlink loops, OS-rejected characters, ...) are
        # a client error, not a server fault.
        raise HTTPException(400, "Invalid script name.") from exc
    # Compare whole path components rather than string prefixes: a plain
    # startswith() check would accept a sibling such as "<scripts>_evil/x"
    # when a symlink inside the directory points there.
    if root not in path.parents:
        raise HTTPException(400, "Path traversal denied.")
    return path
@router.get("/")
async def list_scripts():
    """List the regular files in SCRIPTS_DIR, sorted case-insensitively by name.

    Each entry carries its name, size, modification time and whether it is the
    active and/or the default persona. The directory is created if missing.
    """
    SCRIPTS_DIR.mkdir(parents=True, exist_ok=True)
    active = _persona.active_persona_name()
    default = _persona.default_persona_name()

    def _describe(entry):
        # Build the listing record for a single file.
        info = entry.stat()
        stamp = datetime.fromtimestamp(info.st_mtime)
        return {
            "name": entry.name,
            "size_bytes": info.st_size,
            "modified_at": stamp.isoformat(timespec="seconds"),
            "active": entry.name == active,  # the persona Gemini loads now
            "is_default": entry.name == default,  # the fallback persona
        }

    ordered = sorted(SCRIPTS_DIR.iterdir(), key=lambda entry: entry.name.lower())
    # Sub-directories and other non-file entries are left out of the listing.
    files = [_describe(entry) for entry in ordered if entry.is_file()]
    return {
        "path": str(SCRIPTS_DIR),
        "files": files,
        "active": active,
        "default": default,
    }
class ScriptActive(BaseModel):
    # Request body for selecting the active persona script.

    # Script to activate; None, "" or the default name reverts to the default.
    name: str | None = None
    # When true, the live voice is also restarted so the change applies now.
    restart: bool = False
@router.get("/active")
async def get_active():
    """Report the persona Gemini will load and the default it falls back to."""
    current = _persona.active_persona_name()
    fallback = _persona.default_persona_name()
    return {"active": current, "default": fallback}
@router.post("/active")
async def set_active(payload: ScriptActive):
    """Select the persona script Gemini uses.

    With restart=true the live voice session is bounced so the new persona
    takes effect immediately; otherwise it applies on the next voice
    (re)connect. The selection is saved even when the restart fails; in that
    case the response carries ``restarted: false`` and the failure is logged.

    Raises:
        HTTPException: 400 when a non-empty name is not a valid bare script
            name, 404 when the named script does not exist.
    """
    # Hold non-empty names to the same rules as every other endpoint, so a
    # name with path components never reaches the persona layer. Empty and
    # None still mean "revert to default" and are passed through untouched.
    if payload.name and payload.name.strip():
        _safe_path(payload.name)
    try:
        active = _persona.set_active_persona(payload.name)
    except FileNotFoundError as exc:
        raise HTTPException(404, f"Script not found: {payload.name}") from exc

    restarted = False
    if payload.restart:
        try:
            # Imported lazily; any failure here is handled as a failed restart.
            from Project.Sanad.main import live_sub

            if live_sub is not None and hasattr(live_sub, "start"):
                if hasattr(live_sub, "is_running") and live_sub.is_running():
                    await asyncio.to_thread(live_sub.stop)
                    # Brief pause between stop and start.
                    await asyncio.sleep(1.5)
                await asyncio.to_thread(live_sub.start)
                restarted = True
        except Exception:
            # The selection is saved regardless of restart success, but the
            # failure must not vanish: the voice may now be stopped.
            import logging

            logging.getLogger(__name__).exception(
                "Live voice restart failed after selecting persona %r", active
            )
    return {
        "ok": True,
        "active": active,
        "default": _persona.default_persona_name(),
        "restarted": restarted,
    }
class ScriptLoad(BaseModel):
    # Request body for loading a script.

    # Bare file name of the script to read.
    name: str
@router.post("/load")
async def load_script(payload: ScriptLoad):
    """Return the text content and metadata of one script.

    The file is decoded as UTF-8, with a leading BOM stripped if present.

    Raises:
        HTTPException: 400 for an invalid name, 404 when no regular file of
            that name exists, 422 when the file is not valid UTF-8 text.
    """
    path = _safe_path(payload.name)
    # is_file() rather than exists(): a directory of that name is not a script
    # and would otherwise fail in read_text() with an unhandled error.
    if not path.is_file():
        raise HTTPException(404, f"Script not found: {payload.name}")
    try:
        content = path.read_text(encoding="utf-8-sig")
        st = path.stat()
    except FileNotFoundError as exc:
        # The file was removed between the check above and the read.
        raise HTTPException(404, f"Script not found: {payload.name}") from exc
    except UnicodeDecodeError as exc:
        raise HTTPException(
            422, f"Script is not valid UTF-8 text: {payload.name}"
        ) from exc
    return {
        "name": path.name,
        "content": content,
        "size_bytes": st.st_size,
        "modified_at": datetime.fromtimestamp(st.st_mtime).isoformat(timespec="seconds"),
    }
class ScriptSave(BaseModel):
    # Request body for saving a script.

    # Bare file name of the script to write.
    name: str
    # Full text to store in the file.
    content: str
@router.post("/save")
async def save_script(payload: ScriptSave):
    """Write the given content to the named script via atomic_write_text.

    Raises:
        HTTPException: 413 when the UTF-8 encoded content exceeds
            MAX_SCRIPT_BYTES, 400 when the name is invalid.
    """
    encoded_size = len(payload.content.encode("utf-8"))
    if encoded_size > MAX_SCRIPT_BYTES:
        raise HTTPException(413, f"Content too large (max {MAX_SCRIPT_BYTES} bytes).")
    target = _safe_path(payload.name)
    SCRIPTS_DIR.mkdir(parents=True, exist_ok=True)
    atomic_write_text(target, payload.content)
    # Report the size actually on disk after the write.
    written = target.stat().st_size
    return {"ok": True, "name": target.name, "size_bytes": written}
class ScriptCreate(BaseModel):
    # Request body for creating a new script.

    # Bare file name of the script to create.
    name: str
    # Initial text; an empty file is created when omitted.
    content: str = ""
@router.post("/create")
async def create_script(payload: ScriptCreate):
    """Create a new script file, refusing to overwrite an existing one.

    Raises:
        HTTPException: 413 when the UTF-8 encoded content exceeds
            MAX_SCRIPT_BYTES, 400 when the name is invalid, 409 when a file
            of that name already exists.
    """
    encoded_size = len(payload.content.encode("utf-8"))
    if encoded_size > MAX_SCRIPT_BYTES:
        raise HTTPException(413, f"Content too large (max {MAX_SCRIPT_BYTES} bytes).")
    target = _safe_path(payload.name)
    already_there = target.exists()
    if already_there:
        raise HTTPException(409, f"File already exists: {payload.name}")
    SCRIPTS_DIR.mkdir(parents=True, exist_ok=True)
    atomic_write_text(target, payload.content)
    return {"ok": True, "name": target.name}
class ScriptRename(BaseModel):
    # Request body for renaming a script.

    # Current bare file name.
    old_name: str
    # Desired bare file name.
    new_name: str
@router.post("/rename")
async def rename_script(payload: ScriptRename):
    """Rename a script within SCRIPTS_DIR.

    Raises:
        HTTPException: 400 when either name is invalid, 404 when the source
            does not exist, 409 when the source is the default persona or the
            destination already exists.
    """
    old = _safe_path(payload.old_name)
    new = _safe_path(payload.new_name)
    if not old.exists():
        raise HTTPException(404, f"Not found: {payload.old_name}")
    # Renaming the default persona removes the fallback file just as deleting
    # it would, so apply the same protection the delete endpoint has.
    if old.name == _persona.default_persona_name():
        raise HTTPException(409, f"Cannot rename the default persona ({old.name}).")
    if new.exists():
        raise HTTPException(409, f"Already exists: {payload.new_name}")
    old.rename(new)
    return {"ok": True, "old_name": payload.old_name, "new_name": new.name}
class ScriptDelete(BaseModel):
    # Request body for deleting a script.

    # Bare file name of the script to remove.
    name: str
@router.post("/delete")
async def delete_script(payload: ScriptDelete):
    """Remove a script file; the default persona is protected.

    Raises:
        HTTPException: 400 when the name is invalid, 404 when the file does
            not exist, 409 when it is the default persona.
    """
    target = _safe_path(payload.name)
    if not target.exists():
        raise HTTPException(404, f"Not found: {payload.name}")
    protected = _persona.default_persona_name()
    if target.name == protected:
        raise HTTPException(409, f"Cannot delete the default persona ({target.name}).")
    target.unlink()
    # No cleanup of the active selection is done here: if the deleted file
    # was the active one, resolution falls back to the default automatically.
    return {"ok": True, "deleted": payload.name}