import asyncio import base64 import json import pyaudio import websockets import os import array import time import traceback import inspect import functools # ================================================== # ⚙️ CONFIGURATION # ================================================== # توصية: خله في env بدل ما تحطه بالكود: # export GEMINI_API_KEY="YOUR_KEY" API_KEY = os.environ.get("GEMINI_API_KEY", "AIzaSyB8B1AkhWJSq4sNr-Pk8KsVfkxTbuV7kyo") MODEL = "models/gemini-2.5-flash-native-audio-preview-12-2025" URI = ( "wss://generativelanguage.googleapis.com/ws/" "google.ai.generativelanguage.v1alpha.GenerativeService.BidiGenerateContent" f"?key={API_KEY}" ) FORMAT = pyaudio.paInt16 CHANNELS = 1 SEND_SAMPLE_RATE = 16000 RECEIVE_SAMPLE_RATE = 24000 CHUNK_SIZE = 512 VOICE_NAME = "Charon" # ================================================== # ✅ Python 3.8 Compatibility (fix TaskGroup + to_thread) # ================================================== if hasattr(asyncio, "to_thread"): to_thread = asyncio.to_thread else: async def to_thread(func, *args, **kwargs): loop = asyncio.get_running_loop() return await loop.run_in_executor(None, functools.partial(func, *args, **kwargs)) # ================================================== # 🧠 System Prompt Loader # ================================================== def load_system_prompt(): base_dir = os.path.dirname(os.path.abspath(__file__)) path = os.path.join(base_dir, "go2_script.txt") print(f"📂 Looking for script at: {path}") try: with open(path, "r", encoding="utf-8-sig") as f: content = f.read().strip() print("✅ 'Sanad' persona loaded successfully.") return content except FileNotFoundError: print("⚠️ 'go2_script.txt' not found. Using default Emirati persona.") return ( "You are Sanad (Bousandah), a wise and friendly Emirati assistant. " "Speak strictly in the UAE dialect (Khaleeji). " "Be helpful, concise, and use local greetings like 'Marhaba' and 'Ya Khoy'." ) SYSTEM_PROMPT = load_system_prompt() # ================================================== # 🎤 Main Client Class # ================================================== class HamadGeminiVoice: def __init__(self): self.audio_q = None # ✅ create inside run() to bind to the correct loop self.speaking = False self.interrupted = False self.pya = pyaudio.PyAudio() # Smart interruption tuning self.MIN_THRESHOLD = 3000 self.barge_in_threshold = 3000 self.REQUIRED_LOUD_CHUNKS = 5 # --- stability knobs --- self.PREBUFFER_CHUNKS = 4 self.PLAYBACK_TIMEOUT = 0.35 self.BARGE_IN_COOLDOWN = 0.7 self.AI_SPEAK_GRACE = 0.25 self._last_ai_audio_time = 0.0 self._ai_speaking_since = 0.0 self._barge_in_block_until = 0.0 # ✅ Echo-loop protection (reduce self-hearing) self.ECHO_GUARD_SEC = 0.8 self._ignore_input_until = 0.0 self.SEND_SILENCE_WHEN_SPEAKING = True self.SPEAKING_ENERGY_GATE = 0.85 self._silence_pcm = b"\x00" * (CHUNK_SIZE * 2) def audio_energy(self, pcm): try: samples = array.array("h", pcm) if not samples: return 0 return sum(abs(s) for s in samples) // len(samples) except Exception: return 0 def calibrate_mic(self): print("\n🤫 Calibrating Microphone... (Please remain silent)") try: stream = self.pya.open( format=FORMAT, channels=CHANNELS, rate=SEND_SAMPLE_RATE, input=True, frames_per_buffer=CHUNK_SIZE, ) values = [] for _ in range(40): data = stream.read(CHUNK_SIZE, exception_on_overflow=False) values.append(self.audio_energy(data)) stream.stop_stream() stream.close() avg_noise = sum(values) / len(values) self.barge_in_threshold = max(self.MIN_THRESHOLD, avg_noise * 3.0) print(f"✅ Baseline Noise: {avg_noise:.1f}") print(f"✅ Interruption Threshold: {self.barge_in_threshold:.1f}") except Exception as e: print(f"⚠️ Calibration failed: {e}. Using default threshold.") def _ws_connect_kwargs(self): kwargs = {"max_size": None} try: sig = inspect.signature(websockets.connect) if "extra_headers" in sig.parameters: kwargs["extra_headers"] = {"Content-Type": "application/json"} else: kwargs["additional_headers"] = {"Content-Type": "application/json"} except Exception: kwargs["extra_headers"] = {"Content-Type": "application/json"} return kwargs async def run(self): # ✅ create Queue inside the running loop (Python 3.8 fix) self.audio_q = asyncio.Queue() self.calibrate_mic() print(f"\n🚀 Connecting to Gemini ({MODEL})...") async with websockets.connect(URI, **self._ws_connect_kwargs()) as ws: setup_msg = { "setup": { "model": MODEL, "generationConfig": { "responseModalities": ["AUDIO"], "speechConfig": { "voiceConfig": { "prebuiltVoiceConfig": {"voiceName": VOICE_NAME} } }, }, "inputAudioTranscription": {}, "systemInstruction": {"parts": [{"text": SYSTEM_PROMPT}]}, } } await ws.send(json.dumps(setup_msg)) await ws.recv() print("🎙️ Connected! Sanad is listening. (Press Ctrl+C to stop)") # ✅ Python 3.8 fix: replace TaskGroup with gather() tasks = [ asyncio.create_task(self.capture_mic(ws)), asyncio.create_task(self.receive_audio(ws)), asyncio.create_task(self.play_audio()), ] try: await asyncio.gather(*tasks) finally: for t in tasks: t.cancel() async def capture_mic(self, ws): stream = await to_thread( self.pya.open, format=FORMAT, channels=CHANNELS, rate=SEND_SAMPLE_RATE, input=True, frames_per_buffer=CHUNK_SIZE, ) loud_chunks = 0 while True: try: data = await to_thread( stream.read, CHUNK_SIZE, exception_on_overflow=False ) energy = self.audio_energy(data) now = time.time() # --- SMART INTERRUPTION LOGIC (barge-in) --- if self.speaking and (now >= self._barge_in_block_until): if (now - self._ai_speaking_since) >= self.AI_SPEAK_GRACE: if energy > self.barge_in_threshold: loud_chunks += 1 else: loud_chunks = 0 if loud_chunks > self.REQUIRED_LOUD_CHUNKS: print(f"🛑 Interruption! (Energy: {energy})") self.interrupted = True self.speaking = False loud_chunks = 0 self._barge_in_block_until = now + self.BARGE_IN_COOLDOWN # Drain playback queue while not self.audio_q.empty(): try: self.audio_q.get_nowait() except asyncio.QueueEmpty: break # Prevent echo loop: during AI speaking send silence unless user is loud data_to_send = data if self.SEND_SILENCE_WHEN_SPEAKING and self.speaking: gate = self.barge_in_threshold * self.SPEAKING_ENERGY_GATE if energy < gate: data_to_send = self._silence_pcm b64_audio = base64.b64encode(data_to_send).decode("utf-8") msg = { "realtime_input": { "media_chunks": [ { "data": b64_audio, "mime_type": f"audio/pcm;rate={SEND_SAMPLE_RATE}", } ] } } await ws.send(json.dumps(msg)) except websockets.exceptions.ConnectionClosed: print("⚠️ WebSocket closed.") break except Exception as e: print(f"❌ Mic Error: {e}") break async def receive_audio(self, ws): async for msg in ws: try: response = json.loads(msg) server_content = response.get("serverContent", {}) if server_content.get("interrupted"): self.interrupted = False # Print USER transcription (optional), but ignore during AI speaking window to reduce echo prints input_tr = ( server_content.get("inputTranscription") or server_content.get("input_transcription") or server_content.get("inputAudioTranscription") or server_content.get("input_audio_transcription") ) if isinstance(input_tr, dict): text = (input_tr.get("text") or "").strip() if text and (time.time() >= self._ignore_input_until) and (not self.speaking): print(f"📝 USER SAID: {text}") if self.interrupted: continue # AUDIO playback from model model_turn = server_content.get("modelTurn") if model_turn: parts = model_turn.get("parts", []) for part in parts: inline_data = part.get("inlineData") if inline_data: audio_b64 = inline_data.get("data") if audio_b64: now = time.time() if not self.speaking: self._ai_speaking_since = now self.speaking = True self._last_ai_audio_time = now # While AI audio is arriving, ignore mic transcription briefly self._ignore_input_until = now + self.ECHO_GUARD_SEC audio_bytes = base64.b64decode(audio_b64) await self.audio_q.put(audio_bytes) except Exception as e: print(f"❌ Parse Error: {e}") async def play_audio(self): stream = await to_thread( self.pya.open, format=FORMAT, channels=CHANNELS, rate=RECEIVE_SAMPLE_RATE, output=True, frames_per_buffer=CHUNK_SIZE, ) buffered = False while True: try: if self.interrupted: await asyncio.sleep(0.01) continue if self.speaking and not buffered: while ( self.audio_q.qsize() < self.PREBUFFER_CHUNKS and self.speaking and not self.interrupted ): await asyncio.sleep(0.01) buffered = True try: data = await asyncio.wait_for( self.audio_q.get(), timeout=self.PLAYBACK_TIMEOUT ) except asyncio.TimeoutError: if self.audio_q.empty() and (time.time() - self._last_ai_audio_time) > 0.25: self.speaking = False buffered = False continue if data: await to_thread(stream.write, data) if self.audio_q.empty(): if (time.time() - self._last_ai_audio_time) > 0.25: self.speaking = False buffered = False except Exception as e: print(f"❌ Speaker Error: {e}") break if __name__ == "__main__": try: client = HamadGeminiVoice() asyncio.run(client.run()) except KeyboardInterrupt: print("\n👋 Ma'a Salama (Goodbye)!") except Exception as e: print(f"\n❌ Fatal Error: {e}")