"""LLM layer — Qwen 2.5 Instruct via Ollama (default) or self-managed llama.cpp.
|
|
|
|
Phase 3 of the local pipeline. Two backends, selectable via
|
|
`config/local_config.json > llm.backend`:
|
|
|
|
"ollama" — talk to a running `ollama serve` daemon (default).
|
|
No subprocess management, no CUDA build. Just:
|
|
ollama pull qwen2.5:1.5b
|
|
# daemon usually auto-starts; if not: `ollama serve &`
|
|
|
|
"llama_cpp" — launch our own `llama-server` subprocess. Requires
|
|
a CUDA build of llama.cpp and a GGUF file at
|
|
`model/local/<llm.model_subdir>`.
|
|
|
|
Both backends stream tokens and chunk them on sentence delimiters so
|
|
the TTS can start synthesising before the LLM finishes.
|
|
"""
|
|
|
|
from __future__ import annotations

import asyncio
import json
import shutil
import subprocess
import time
from typing import AsyncIterator, Optional

from Project.Sanad.config import MODEL_DIR
from Project.Sanad.core.config_loader import section as _cfg_section
from Project.Sanad.core.logger import get_logger

log = get_logger("local_llm")
_CFG = _cfg_section("local", "llm")

BACKEND = (_CFG.get("backend") or "ollama").strip().lower()

# Ollama
OLLAMA_HOST = _CFG.get("ollama_host", "127.0.0.1")
OLLAMA_PORT = int(_CFG.get("ollama_port", 11434))
OLLAMA_MODEL = _CFG.get("ollama_model", "qwen2.5:1.5b")
OLLAMA_KEEP_ALIVE = _CFG.get("ollama_keep_alive", "5m")

# llama.cpp
MODEL_SUBDIR = _CFG.get("model_subdir", "qwen2.5-1.5b-instruct-q4_k_m.gguf")
SERVER_BIN = _CFG.get("server_binary", "llama-server")
HOST = _CFG.get("host", "127.0.0.1")
PORT = int(_CFG.get("port", 8080))
N_GPU_LAYERS = _CFG.get("n_gpu_layers", 99)
CTX_SIZE = _CFG.get("ctx_size", 2048)
THREADS = _CFG.get("threads", 4)
STARTUP_TIMEOUT = _CFG.get("startup_timeout_sec", 30)

# Shared generation params
REQUEST_TIMEOUT = _CFG.get("request_timeout_sec", 30)
MAX_TOKENS = _CFG.get("max_tokens", 200)
TEMPERATURE = _CFG.get("temperature", 0.7)
TOP_P = _CFG.get("top_p", 0.9)
STOP_SEQS = list(_CFG.get("stop", ["<|im_end|>"]))
CHUNK_DELIMS = _CFG.get("chunk_delimiters", ".,?!؟،")
CHUNK_MIN_CHARS = int(_CFG.get("chunk_min_chars", 8))

LOCAL_MODEL_PATH = MODEL_DIR / "local" / MODEL_SUBDIR

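# Example `llm` section of config/local_config.json (illustrative only; the key
# names mirror the lookups above and the values shown are just this module's
# defaults):
#
#   "llm": {
#     "backend": "ollama",
#     "ollama_model": "qwen2.5:1.5b",
#     "ollama_host": "127.0.0.1",
#     "ollama_port": 11434,
#     "max_tokens": 200,
#     "temperature": 0.7,
#     "chunk_delimiters": ".,?!؟،",
#     "chunk_min_chars": 8
#   }
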
class LlamaServer:
    """Thin wrapper — owns a `llama-server` subprocess (llama_cpp) or is a no-op (ollama)."""

    def __init__(self) -> None:
        self._proc: Optional[subprocess.Popen] = None

    # ─── lifecycle ────────────────────────────────────────

    def start(self) -> None:
        if BACKEND == "ollama":
            self._check_ollama()
            log.info("LLM backend=ollama model=%s (@ %s:%d)",
                     OLLAMA_MODEL, OLLAMA_HOST, OLLAMA_PORT)
            return
        if BACKEND == "llama_cpp":
            self._start_llama_cpp()
            return
        raise RuntimeError(f"unknown llm.backend: {BACKEND!r}")

    def stop(self) -> None:
        if self._proc is None:
            return
        try:
            self._proc.terminate()
            self._proc.wait(timeout=3)
        except subprocess.TimeoutExpired:
            self._proc.kill()
            self._proc.wait(timeout=2)
        except Exception as exc:
            log.warning("llama-server stop error: %s", exc)
        self._proc = None

    def alive(self) -> bool:
        if BACKEND == "ollama":
            return self._ping_ollama()
        return self._proc is not None and self._proc.poll() is None

    # ─── Ollama backend ───────────────────────────────────

    def _check_ollama(self) -> None:
        """Verify the Ollama daemon is running + the model is pulled."""
        import urllib.request
        tags_url = f"http://{OLLAMA_HOST}:{OLLAMA_PORT}/api/tags"
        try:
            with urllib.request.urlopen(tags_url, timeout=3) as r:
                body = json.loads(r.read().decode("utf-8"))
        except Exception as exc:
            raise RuntimeError(
                f"Ollama daemon not reachable at {tags_url} — is `ollama serve` running? ({exc})"
            )
        models = [m.get("name", "") for m in body.get("models", [])]
        if not any(OLLAMA_MODEL in m for m in models):
            raise RuntimeError(
                f"Ollama model {OLLAMA_MODEL!r} not pulled. "
                f"Run: `ollama pull {OLLAMA_MODEL}`. Available: {models}"
            )

    def _ping_ollama(self) -> bool:
        import urllib.request
        try:
            with urllib.request.urlopen(
                f"http://{OLLAMA_HOST}:{OLLAMA_PORT}/api/tags", timeout=1,
            ) as r:
                return r.status == 200
        except Exception:
            return False

    async def _stream_ollama(self, user_text: str, system_prompt: str,
                             cancel: asyncio.Event) -> AsyncIterator[str]:
        import aiohttp
        url = f"http://{OLLAMA_HOST}:{OLLAMA_PORT}/api/generate"
        payload = {
            "model": OLLAMA_MODEL,
            "system": system_prompt,
            "prompt": user_text,
            "stream": True,
            "keep_alive": OLLAMA_KEEP_ALIVE,
            "options": {
                "num_predict": MAX_TOKENS,
                "temperature": TEMPERATURE,
                "top_p": TOP_P,
                "stop": STOP_SEQS,
            },
        }
        buf = ""
        async with aiohttp.ClientSession() as sess:
            try:
                async with sess.post(
                    url, json=payload,
                    timeout=aiohttp.ClientTimeout(total=REQUEST_TIMEOUT)) as resp:
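                    # Ollama's /api/generate streams newline-delimited JSON: each
                    # line is an object roughly like {"response": "...", "done": false},
                    # with a final {"done": true} object (abbreviated here for
                    # orientation; this is what the parsing below relies on).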
                    async for raw in resp.content:
                        if cancel.is_set():
                            log.info("LLM stream cancelled (barge-in)")
                            return
                        line = raw.decode("utf-8", errors="ignore").strip()
                        if not line:
                            continue
                        try:
                            obj = json.loads(line)
                        except json.JSONDecodeError:
                            continue
                        token = obj.get("response", "")
                        if token:
                            buf += token
                            if len(buf) >= CHUNK_MIN_CHARS and buf[-1] in CHUNK_DELIMS:
                                yield buf.strip()
                                buf = ""
                        if obj.get("done"):
                            break
            except asyncio.CancelledError:
                return
            except Exception as exc:
                log.warning("Ollama stream error: %s", exc)
                return
        if buf.strip():
            yield buf.strip()

    # ─── llama.cpp backend ────────────────────────────────

    def _start_llama_cpp(self) -> None:
        if self._proc is not None and self._proc.poll() is None:
            return
        if not LOCAL_MODEL_PATH.exists():
            raise RuntimeError(f"LLM model not found at {LOCAL_MODEL_PATH}")
        bin_path = shutil.which(SERVER_BIN) or SERVER_BIN
        cmd = [
            bin_path,
            "-m", str(LOCAL_MODEL_PATH),
            "--host", HOST,
            "--port", str(PORT),
            "--n-gpu-layers", str(N_GPU_LAYERS),
            "--ctx-size", str(CTX_SIZE),
            "--threads", str(THREADS),
            "--log-disable",
        ]
        log.info("launching llama-server: %s", " ".join(cmd))
        self._proc = subprocess.Popen(
            cmd,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            text=True,
        )
        self._wait_llama_cpp_ready()
        log.info("llama-server ready (pid=%d)", self._proc.pid)

    def _wait_llama_cpp_ready(self) -> None:
        import urllib.request
        deadline = time.time() + STARTUP_TIMEOUT
        url = f"http://{HOST}:{PORT}/health"
        while time.time() < deadline:
            if self._proc and self._proc.poll() is not None:
                stderr = self._proc.stderr.read() if self._proc.stderr else ""
                raise RuntimeError(
                    f"llama-server exited early (code={self._proc.returncode}): {stderr[:500]}"
                )
            try:
                with urllib.request.urlopen(url, timeout=1) as r:
                    if r.status == 200:
                        return
            except Exception:
                time.sleep(0.3)
        raise RuntimeError(f"llama-server did not come up within {STARTUP_TIMEOUT}s")

    async def _stream_llama_cpp(self, user_text: str, system_prompt: str,
                                cancel: asyncio.Event) -> AsyncIterator[str]:
        import aiohttp
        prompt = self._format_chatml_prompt(user_text, system_prompt)
        payload = {
            "prompt": prompt,
            "stream": True,
            "n_predict": MAX_TOKENS,
            "temperature": TEMPERATURE,
            "top_p": TOP_P,
            "stop": STOP_SEQS,
            "cache_prompt": True,
        }
        url = f"http://{HOST}:{PORT}/completion"
        buf = ""
        async with aiohttp.ClientSession() as sess:
            try:
                async with sess.post(
                    url, json=payload,
                    timeout=aiohttp.ClientTimeout(total=REQUEST_TIMEOUT)) as resp:
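                    # llama-server's /completion streams Server-Sent Events: lines
                    # prefixed with "data: " carrying JSON such as
                    # {"content": "...", "stop": false}, ending with a "stop": true
                    # object or a bare "[DONE]" (abbreviated; this is the shape the
                    # parsing below expects).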
                    async for raw in resp.content:
                        if cancel.is_set():
                            log.info("LLM stream cancelled (barge-in)")
                            return
                        line = raw.decode("utf-8", errors="ignore").strip()
                        if not line.startswith("data:"):
                            continue
                        line = line[len("data:"):].strip()
                        if not line or line == "[DONE]":
                            continue
                        try:
                            obj = json.loads(line)
                        except json.JSONDecodeError:
                            continue
                        token = obj.get("content", "")
                        if not token:
                            if obj.get("stop"):
                                break
                            continue
                        buf += token
                        if len(buf) >= CHUNK_MIN_CHARS and buf[-1] in CHUNK_DELIMS:
                            yield buf.strip()
                            buf = ""
            except asyncio.CancelledError:
                return
            except Exception as exc:
                log.warning("llama-server stream error: %s", exc)
                return
        if buf.strip():
            yield buf.strip()

    @staticmethod
    def _format_chatml_prompt(user_text: str, system_prompt: str) -> str:
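        # Renders Qwen's ChatML template. For user_text="hi" and
        # system_prompt="Be brief." the result is (illustrative, \n = newline):
        #   <|im_start|>system\nBe brief.<|im_end|>\n
        #   <|im_start|>user\nhi<|im_end|>\n
        #   <|im_start|>assistant\n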
        return (
            f"<|im_start|>system\n{system_prompt}<|im_end|>\n"
            f"<|im_start|>user\n{user_text}<|im_end|>\n"
            f"<|im_start|>assistant\n"
        )

    # ─── public streaming entry point ─────────────────────

    async def stream(self, user_text: str, system_prompt: str,
                     cancel: asyncio.Event) -> AsyncIterator[str]:
        """Yield sentence-sized text chunks as the LLM generates.

        Chunk boundaries: any char in `CHUNK_DELIMS` AND buffer length
        ≥ `CHUNK_MIN_CHARS`. The final buffer is flushed on completion
        even without a delimiter. If `cancel` is set, the request is
        aborted and the generator returns.
        """
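        # Chunking illustration (assumes the defaults above and that each
        # delimiter arrives at the end of a streamed token):
        # "Sure, let me check. One moment." yields "Sure, let me check." and then
        # "One moment."; the first comma is skipped because the buffer is still
        # shorter than CHUNK_MIN_CHARS at that point.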
        if BACKEND == "ollama":
            async for chunk in self._stream_ollama(user_text, system_prompt, cancel):
                yield chunk
        elif BACKEND == "llama_cpp":
            async for chunk in self._stream_llama_cpp(user_text, system_prompt, cancel):
                yield chunk
        else:
            raise RuntimeError(f"unknown llm.backend: {BACKEND!r}")
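

# Minimal manual smoke test (illustrative addition, not part of the pipeline):
# drives LlamaServer.stream() end to end and prints each chunk. The sample
# question and system prompt are placeholders, not the prompts the pipeline uses.
if __name__ == "__main__":
    async def _demo() -> None:
        server = LlamaServer()
        server.start()
        cancel = asyncio.Event()
        try:
            async for chunk in server.stream(
                "What is the capital of France?",
                "You are a concise assistant.",
                cancel,
            ):
                print(chunk)
        finally:
            server.stop()

    asyncio.run(_demo())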