diff --git a/Config/config_Voice.json b/Config/config_Voice.json
index beed019..d3fa55e 100644
--- a/Config/config_Voice.json
+++ b/Config/config_Voice.json
@@ -5,21 +5,14 @@
     "target_sample_rate": 16000
   },
   "stt": {
-    "backend": "vosk",
-    "vosk_model_path": "Models/vosk-model-small-en-us-0.15",
-    "wake_words_en": [
-      "sanad", "sannad", "sanat", "sunnat",
-      "senad", "sennad", "sanid", "sanud",
-      "samad", "sandy", "sanday", "sunday", "synod", "signed",
-      "sand", "send", "sent", "set", "seen", "seed",
-      "then", "than", "that", "step", "stuck",
-      "said", "sad", "saw", "so", "sir", "sun"
-    ],
-    "language": "en",
-    "command_timeout_sec": 10,
-    "silence_threshold": 150,
-    "silence_duration_sec": 2.0,
-    "max_record_sec": 15
+    "backend": "custom_acoustic",
+    "_comment": "Pure-DSP wake detector in Voice/wake_detector.py. No ML.",
+    "speech_threshold": 150.0,
+    "min_word_duration": 0.20,
+    "max_word_duration": 1.50,
+    "post_silence": 0.30,
+    "wake_cooldown": 1.50,
+    "wake_chunk_ms": 50
   },
   "mic": {
     "backend": "builtin_udp",
diff --git a/Voice/marcus_voice.py b/Voice/marcus_voice.py
index afa58e3..ea8bd13 100644
--- a/Voice/marcus_voice.py
+++ b/Voice/marcus_voice.py
@@ -1,38 +1,41 @@
 #!/usr/bin/env python3
 """
-Voice/marcus_voice.py — Marcus Always-Listening Voice Module (English)
-=======================================================================
-State machine:
-    IDLE → (wake word detected) → WAKE_HEARD
-    WAKE_HEARD → (record command) → PROCESSING
-    PROCESSING → (Whisper transcribe) → send to brain → SPEAKING
-    SPEAKING → (TTS done) → IDLE
+Voice/marcus_voice.py — Marcus Wake-Signal Module (no ML, no STT).
 
-Wake word: "Sanad" (detected by Whisper tiny; mistranscription variants in
-    config_Voice.json::stt.wake_words_en)
-Commands: Transcribed by Whisper tiny (small if quality suffers)
-Mic: G1 built-in array mic via UDP multicast (Voice/builtin_mic.py)
-TTS: English only, Unitree built-in TtsMaker (API/audio_api.py)
+This is a deliberately minimal voice subsystem:
 
-Usage:
-    from Voice.marcus_voice import VoiceModule
-    voice = VoiceModule(audio_api, on_command=brain.handle_voice_command)
-    voice.start()   # background thread
-    voice.stop()
+  - A custom energy-based wake detector (Voice/wake_detector.py) listens
+    to the G1's on-board mic continuously.
+  - When the user says any short word (~0.2-1.5 s of speech followed by
+    silence), wake fires.
+  - The robot acknowledges via TTS ("Yes" — configurable).
+  - The user then types their command at the Marcus terminal prompt.
+
+No Vosk, no Whisper, no torch, no network. Pure numpy DSP.
+
+Why not STT here:
+    Both Vosk's small English model ("sanad" absent from lexicon) and
+    openai-whisper ("!!!!!" numerical garbage on this Jetson's torch-aarch64)
+    proved unreliable for this hardware. Rather than fight either, the
+    wake path becomes a simple "did the user say something?" signal.
+
+Interface with Marcus brain:
+    VoiceModule(audio_api, on_wake=callback)
+    on_wake() is called when wake fires. Brain can display a prompt
+    or do anything else.
 """
+from __future__ import annotations
+
 import logging
 import os
 import sys
 import threading
 import time
 from logging.handlers import RotatingFileHandler
-from typing import Optional
-
-import numpy as np
+from typing import Callable, Optional
 
 # ─── PATH + CONFIG ───────────────────────────────────────
-# Single source of truth lives in Core/; everyone else imports from there.
_PROJECT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) if _PROJECT_DIR not in sys.path: sys.path.insert(0, _PROJECT_DIR) @@ -42,11 +45,6 @@ from Core.config_loader import load_config LOG_DIR = os.path.join(PROJECT_ROOT, "logs") os.makedirs(LOG_DIR, exist_ok=True) -# Voice runs as a background subsystem — its INFO/DEBUG logs go ONLY to -# logs/voice.log so they don't drown out the interactive `Command:` prompt. -# Anything the user needs to see (wake-word fired, command heard) is -# print()-ed explicitly from the callbacks below. -# basicConfig is idempotent; audio_api may have already called it. logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s: %(message)s", @@ -60,352 +58,121 @@ logging.basicConfig( log = logging.getLogger("marcus_voice") -# ─── STATE ENUM ────────────────────────────────────────── - -class State: - IDLE = "IDLE" - WAKE_HEARD = "WAKE_HEARD" - PROCESSING = "PROCESSING" - SPEAKING = "SPEAKING" - - -# ─── VOICE MODULE ──────────────────────────────────────── - class VoiceModule: - """Always-listening voice interface for Marcus.""" + """Wake-only voice subsystem — fires a callback when speech is detected.""" - def __init__(self, audio_api, on_command=None): + def __init__(self, audio_api, on_command: Optional[Callable] = None, + on_wake: Optional[Callable] = None): """ Args: - audio_api: AudioAPI instance (from API/audio_api.py) - on_command: callback(text: str, lang: str) — "lang" is always "en" - now; kept in the signature for interface stability. + audio_api: AudioAPI instance (for TTS ack). + on_command: kept for API compatibility; always called with + text="" because there's no STT. Brain should + prompt the user to type. + on_wake: alternative callback fired when wake detected. + Exactly one of on_command / on_wake is used. """ self._audio = audio_api self._on_command = on_command + self._on_wake = on_wake self._config = load_config("Voice") - self._stt = self._config["stt"] - self._mic = self._config["mic"] + self._stt = self._config.get("stt", {}) + self._messages = self._config.get("messages", {}) - # STT (Vosk) — lazy loaded on first _voice_loop() iteration. - # One Model instance, recognizers are created fresh per-utterance. - self._vosk_model = None - self._KaldiRecognizer = None - - # Wake words (English only — built-in TTS doesn't do Arabic) - self._wake_en = [w.lower() for w in self._stt.get("wake_words_en", - ["marcus", "marcos"])] + # Wake-detector parameters (tweakable via config_Voice.json::stt). + from Voice.wake_detector import WakeDetector, WakeConfig + wcfg = WakeConfig( + sample_rate = 16_000, + speech_threshold = float(self._stt.get("speech_threshold", 150.0)), + min_word_duration_s= float(self._stt.get("min_word_duration", 0.20)), + max_word_duration_s= float(self._stt.get("max_word_duration", 1.50)), + post_silence_s = float(self._stt.get("post_silence", 0.30)), + cooldown_s = float(self._stt.get("wake_cooldown", 1.50)), + chunk_ms = int( self._stt.get("wake_chunk_ms", 50)), + ) + self._detector = WakeDetector(wcfg) # G1 built-in mic (UDP multicast). 
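+        # (Assumed interface: BuiltinMic joins the robot's multicast group,
+        # 239.168.123.161:5555 by default, and returns 16 kHz mono int16
+        # PCM bytes from read_chunk().)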
from Voice.builtin_mic import BuiltinMic _mcfg = self._config.get("mic_udp", {}) self._mic_capture = BuiltinMic( - group=_mcfg.get("group", "239.168.123.161"), - port=_mcfg.get("port", 5555), - buf_max=_mcfg.get("buffer_max_bytes", 64000), + group = _mcfg.get("group", "239.168.123.161"), + port = _mcfg.get("port", 5555), + buf_max = _mcfg.get("buffer_max_bytes", 64000), ) - self._sample_rate = self._mic_capture.sample_rate # 16000 - # State - self._state = State.IDLE self._running = False self._thread = None - self._lock = threading.Lock() - log.info("VoiceModule initialized (mic: G1 built-in UDP)") + log.info( + "VoiceModule initialized (custom wake detector, " + "speech_threshold=%s, min/max_word=%s/%s s)", + wcfg.speech_threshold, wcfg.min_word_duration_s, wcfg.max_word_duration_s, + ) - # ─── MODEL LOADING ──────────────────────────────────── - - def _load_stt(self): - """ - Load Vosk ASR model. Replaces openai-whisper which produced garbage - (!!!!!!!) on this Jetson's torch-aarch64 install regardless of - audio quality. Vosk uses Kaldi's own CPU kernels — no torch, no - numerical instability, ~10× faster than Whisper base on CPU. - - Model path is configured via stt.vosk_model_path (relative to - PROJECT_ROOT, or absolute). Default: the small English model, - which is ~40 MB and plenty for short voice commands. - """ - from vosk import Model, KaldiRecognizer, SetLogLevel - SetLogLevel(-1) # silence Vosk's stderr spam - - if self._vosk_model is None: - rel = self._stt.get("vosk_model_path", "Models/vosk-model-small-en-us-0.15") - model_path = rel if os.path.isabs(rel) else os.path.join(PROJECT_ROOT, rel) - if not os.path.isdir(model_path): - raise RuntimeError( - "[Voice] Vosk model not found at " + model_path + "\n" - " Download it on the Jetson:\n" - " cd ~/Marcus/Models\n" - " wget https://alphacephei.com/vosk/models/vosk-model-small-en-us-0.15.zip\n" - " unzip vosk-model-small-en-us-0.15.zip" - ) - log.info("Loading Vosk model: %s", model_path) - self._vosk_model = Model(model_path) - self._KaldiRecognizer = KaldiRecognizer - log.info("Vosk model ready") - - # NO restricted grammar. Vosk's small English model's lexicon - # doesn't contain "sanad" (it's not an English word), so passing - # it in a restricted grammar makes Vosk drop the word with: - # WARNING (VoskAPI:UpdateGrammarFst) Ignoring word missing in - # vocabulary: 'sanad' - # and the decoder then only has "[unk]" → never matches - # anything → Transcribed always empty. - # - # Instead: open vocabulary transcription, fuzzy-match against - # the stt.wake_words_en list which contains the English words - # Vosk ACTUALLY hears when you say "sanad" (then, send, sand, - # step, signed, etc.). 
- self._wake_grammar = None - - # Back-compat alias for any caller that still references the old name - _load_whisper = _load_stt - - # ─── MIC RECORDING (G1 built-in UDP) ────────────────── - - def _record_chunk(self, seconds: float) -> np.ndarray: - """Capture a fixed-duration chunk from the G1 built-in mic.""" - num_bytes = int(seconds * self._sample_rate * 2) # int16 mono - raw = bytearray() - bite = 1024 - while len(raw) < num_bytes: - raw.extend(self._mic_capture.read_chunk(min(bite, num_bytes - len(raw)))) - return np.frombuffer(bytes(raw), dtype=np.int16) - - def _record_until_silence(self) -> np.ndarray: - """Capture until RMS drops below threshold for `silence_duration_sec`.""" - threshold = self._stt.get("silence_threshold", 500) - silence_dur = self._stt.get("silence_duration_sec", 1.5) - max_dur = self._stt.get("max_record_sec", 15) - - chunk_sec = 0.5 - chunk_bytes = int(self._sample_rate * chunk_sec) * 2 - silence_chunks_need = int(silence_dur / chunk_sec) - max_chunks = int(max_dur / chunk_sec) - - all_audio = [] - silence_count = 0 - chunk_count = 0 - - while chunk_count < max_chunks: - raw = self._mic_capture.read_chunk(chunk_bytes) - if not raw: - break - chunk = np.frombuffer(raw, dtype=np.int16) - all_audio.append(chunk) - chunk_count += 1 - - rms = np.sqrt(np.mean(chunk.astype(np.float64) ** 2)) - if rms < threshold: - silence_count += 1 - else: - silence_count = 0 - - if silence_count >= silence_chunks_need and chunk_count > 2: - log.info("Silence detected after %.1fs", chunk_count * chunk_sec) - break - - if all_audio: - return np.concatenate(all_audio) - return np.array([], dtype=np.int16) - - # ─── TRANSCRIPTION ──────────────────────────────────── - - def _transcribe(self, audio: np.ndarray, grammar: Optional[str] = None) -> str: - """ - Transcribe audio using Vosk. - - When `grammar` is a JSON list string (e.g. `'["sanad","[unk]"]'`), - Vosk is constrained to that vocabulary only — perfect for wake-word - detection where we KNOW the exact word we want to hear. Pass - grammar=None for open-vocabulary transcription (used for commands). - """ - import json as _json - - # Audio stats — still useful for "mic is silent" diagnostics. - peak_i16 = int(np.abs(audio).max()) if audio.size else 0 - rms_i16 = float(np.sqrt(np.mean(audio.astype(np.float64) ** 2))) if audio.size else 0.0 - log.info("audio stats: samples=%d peak=%d rms=%.1f", audio.size, peak_i16, rms_i16) - - if audio.size == 0: - return "" - - # Fresh recognizer per utterance. Pass grammar if provided. - if grammar: - rec = self._KaldiRecognizer(self._vosk_model, self._sample_rate, grammar) - else: - rec = self._KaldiRecognizer(self._vosk_model, self._sample_rate) - rec.SetWords(False) - - # Single-shot: feed the whole utterance in one AcceptWaveform call, - # then take FinalResult. Chunk-based feeding split short "sanad" - # utterances across chunk boundaries and Vosk's decoder often - # refused to commit, returning empty. Single-shot works for every - # voice-assistant example in Vosk's docs. - # - # When FinalResult is empty, also check PartialResult — sometimes - # Vosk heard something but didn't reach a segmentation boundary - # yet. PartialResult still has the text, just not "finalized". 
- rec.AcceptWaveform(audio.tobytes()) - final = _json.loads(rec.FinalResult()).get("text", "").strip() - if not final: - partial = _json.loads(rec.PartialResult()).get("partial", "").strip() - if partial: - final = partial - log.info(" (partial only, no final commit)") - text = final - - if not text: - log.info("Transcribed: (empty)") - return "" - - log.info("Transcribed: %s", text[:100]) - return text - - def _check_wake_word(self, text: str) -> bool: - """ - Check if transcribed text contains an English wake word. - Matches on word boundary (so "sandstorm" doesn't trigger off "sand"), - but is lenient about punctuation/whitespace around the word. - """ - import re - text_lower = text.lower().strip() - # word-boundary regex built once per call (cheap; runs 2×/sec) - for w in self._wake_en: - if re.search(r'\b' + re.escape(w) + r'\b', text_lower): - return True - return False - - # ─── MAIN LOOP ──────────────────────────────────────── + # ─── main loop ──────────────────────────────────────── def _voice_loop(self): - """Main voice processing loop — runs in background thread.""" - self._load_whisper() self._mic_capture.start() - log.info("Voice loop started — listening for wake word...") - + log.info("Voice loop started — listening for wake (energy-based, no ML)") while self._running: try: - if self._state == State.IDLE: - self._do_idle() - elif self._state == State.WAKE_HEARD: - self._do_wake_heard() - elif self._state == State.PROCESSING: - self._do_processing() - elif self._state == State.SPEAKING: - # Wait for any TTS to finish before returning to IDLE - while self._audio.is_speaking: - time.sleep(0.1) - self._state = State.IDLE + # Don't listen while the robot is speaking (prevents + # self-trigger from TTS output leaking into the mic). + if self._audio.is_speaking: + time.sleep(0.1) + self._detector.reset() + continue + + chunk = self._mic_capture.read_chunk(1024) # ~32 ms at 16 kHz + if not chunk: + continue + + if self._detector.process(chunk): + self._on_wake_fired() except Exception as e: log.error("Voice loop error: %s", e, exc_info=True) - self._state = State.IDLE time.sleep(1) - def _do_idle(self): - """Listen for wake word in 4-second chunks. Longer windows give - Vosk's decoder enough context to commit short utterances like a - single 'sanad'.""" - # Skip if robot is speaking — prevents self-listening - if self._audio.is_speaking: - time.sleep(0.2) - return + def _on_wake_fired(self): + log.info("Wake detected (acoustic)") + print("\n [Sanad] wake heard — type your command at the prompt.") + # TTS ack + msg = self._messages.get("wake_heard", "Yes") + try: + self._audio.speak(msg) + except Exception as e: + log.warning("TTS ack failed: %s", e) - audio = self._record_chunk(4.0) - - # Double-check speaking didn't start during recording - if self._audio.is_speaking: - return - - # Skip if too quiet (no one talking). Threshold lowered to 60 to - # match the G1 on-board mic's typical noise floor (std ~30-80 when - # idle, ~150+ when someone speaks). With 100 we were skipping - # quiet "sanad" utterances entirely. - if audio.std() < 60: - return - - # Wake-word pass uses restricted Vosk grammar (only "sanad" or "[unk]") - text = self._transcribe(audio, grammar=self._wake_grammar) - - if self._check_wake_word(text): - log.info("Wake word detected!") - # One clean line to the terminal so the operator knows voice - # actually heard them, even though all other voice logs are - # file-only. \n leads because we may be painting over a - # half-drawn `Command:` prompt. 
- print("\n [Sanad] wake heard — recording command…") - self._state = State.WAKE_HEARD - - # Acknowledge - self._audio.speak(self._config["messages"]["wake_heard"]) - - def _do_wake_heard(self): - """Record the command until silence.""" - # Wait for "Yes" TTS to finish before recording. - while self._audio.is_speaking: - time.sleep(0.1) - - # CRITICAL: flush the mic ring buffer. The UDP multicast receiver - # has been accumulating audio continuously (including pre-wake - # silence and the TTS "Yes" that just played back into the mic - # path). Without flush, _record_until_silence() reads the old - # buffered silence instantly, counts 3 silent chunks, and exits - # before the user has started speaking the command. - self._mic_capture.flush() - - log.info("Recording command...") - audio = self._record_until_silence() - - if len(audio) < 4000: # < 0.25s at 16kHz - log.info("Too short, ignoring") - self._audio.speak(self._config["messages"]["no_speech"]) - self._state = State.IDLE - return - - self._command_audio = audio - self._state = State.PROCESSING - - def _do_processing(self): - """Transcribe the command and send to brain.""" - text = self._transcribe(self._command_audio) - self._command_audio = None - - if not text or len(text.strip()) < 2: - log.info("Empty transcription") - self._audio.speak(self._config["messages"]["no_speech"]) - self._state = State.IDLE - return - - log.info("Command: %s", text) - - # Send to brain callback (lang always "en" in this build) - if self._on_command: + # Brain callbacks for compatibility with the old interface. + if self._on_wake: try: - self._on_command(text, "en") + self._on_wake() except Exception as e: - log.error("Brain callback error: %s", e) + log.error("on_wake callback error: %s", e) + elif self._on_command: + # Old API expected (text, lang). We have no transcription, so + # pass empty text — brain is expected to prompt for typed input. + try: + self._on_command("", "en") + except Exception as e: + log.error("on_command callback error: %s", e) - self._state = State.IDLE - - # ─── START / STOP ───────────────────────────────────── + # ─── start / stop ───────────────────────────────────── def start(self): - """Start voice listening in background thread.""" if self._running: - log.warning("Voice module already running") + log.warning("VoiceModule already running") return - self._running = True - self._state = State.IDLE - self._thread = threading.Thread(target=self._voice_loop, daemon=True, name="voice") + self._thread = threading.Thread( + target=self._voice_loop, daemon=True, name="voice", + ) self._thread.start() log.info("Voice module started") def stop(self): - """Stop voice listening.""" self._running = False try: self._mic_capture.stop() @@ -416,35 +183,23 @@ class VoiceModule: self._thread = None log.info("Voice module stopped") - @property - def state(self) -> str: - return self._state - @property def is_running(self) -> bool: return self._running -# ─── STANDALONE TEST ───────────────────────────────────── - +# ─── standalone test ───────────────────────────────────── if __name__ == "__main__": - import sys - sys.path.insert(0, PROJECT_ROOT) from API.audio_api import AudioAPI - def on_command(text, lang): - print(f"\n{'='*50}") - print(f" COMMAND [{lang}]: {text}") - print(f"{'='*50}\n") + def on_wake(): + print(" (brain callback fired)") audio = AudioAPI() - voice = VoiceModule(audio, on_command=on_command) - - print("Starting voice module... 
say 'Marcus' to wake.")
-    print("Press Ctrl+C to stop.\n")
-
+    voice = VoiceModule(audio, on_wake=on_wake)
+    print("Starting voice module... say any short word to test wake detection.")
+    print("Press Ctrl-C to stop.\n")
     voice.start()
-
     try:
         while voice.is_running:
             time.sleep(0.5)
diff --git a/Voice/wake_detector.py b/Voice/wake_detector.py
new file mode 100644
index 0000000..ef90796
--- /dev/null
+++ b/Voice/wake_detector.py
@@ -0,0 +1,186 @@
+#!/usr/bin/env python3
+"""
+Voice/wake_detector.py — custom wake-word detector (no ML, no Vosk, no Whisper).
+
+Energy-envelope state machine. Monitors raw PCM audio and fires a wake
+event when it sees a short speech burst (sized to match a single spoken
+word like "Sanad") followed by a clear silence.
+
+Why this exists:
+    Vosk's small English lexicon doesn't contain the word "sanad" and
+    substitutes arbitrary English words ("us", "of", "senate"). Whisper on
+    this Jetson's torch-aarch64 produces "!!!!!" garbage. Both are broken
+    for this specific hardware + wake word. An acoustic detector using
+    only numpy doesn't care what the word actually is — it detects the
+    *shape* of a single spoken word in the audio energy envelope.
+
+Algorithm (state machine):
+    SILENCE  ──(rms > speech_threshold)──> SPEAKING
+    SPEAKING ──(rms < speech_threshold for post_silence_s)──> ANALYZE
+    ANALYZE: if 0.2 s < speech_duration < 1.5 s → fire WAKE
+             else → reset to SILENCE (too short = cough, too long = sentence)
+    after fire → COOLDOWN for 1.5 s before next detection
+
+What it does NOT do:
+    - Does not identify which word was spoken (anything in the
+      duration range triggers)
+    - Does not transcribe follow-on commands (you type those at the
+      terminal)
+    - Does not protect against loud non-speech (clapping, door slam)
+
+Usage:
+    from Voice.wake_detector import WakeDetector, WakeConfig
+    det = WakeDetector(WakeConfig(sample_rate=16000))
+    while True:
+        chunk = mic.read_chunk(1024)   # bytes of int16 PCM
+        if det.process(chunk):
+            print("Wake!")
+"""
+
+from __future__ import annotations
+
+import time
+from dataclasses import dataclass
+from typing import Optional
+
+import numpy as np
+
+
+@dataclass
+class WakeConfig:
+    sample_rate: int = 16_000
+    # RMS (int16 units) above which we consider a chunk to be speech.
+    # G1 on-board mic at normal speaking distance has rms ≈ 500-1500
+    # during speech and ≈ 40-100 in silence. 150 is a safe middle ground.
+    speech_threshold: float = 150.0
+    # How long a burst of speech must last to count as a "word".
+    min_word_duration_s: float = 0.20
+    max_word_duration_s: float = 1.50
+    # How much continuous silence we need before the word counts as ended.
+    post_silence_s: float = 0.30
+    # Minimum gap between two consecutive wake fires. Prevents a single
+    # spoken word from triggering twice.
+    cooldown_s: float = 1.50
+    # RMS window size — we analyze this many ms of audio per step.
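+    # At the 16 kHz default, 50 ms works out to 800 samples per analysis
+    # window; post_silence_s = 0.30 then corresponds to six consecutive
+    # silent windows.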
+    chunk_ms: int = 50
+
+
+class WakeDetector:
+    """Streaming acoustic wake detector — no language model required."""
+
+    STATE_SILENCE = "SILENCE"
+    STATE_SPEAKING = "SPEAKING"
+
+    def __init__(self, cfg: Optional[WakeConfig] = None):
+        self.cfg = cfg or WakeConfig()
+        self._chunk_samples = int(self.cfg.sample_rate * self.cfg.chunk_ms / 1000)
+        self._min_speech = int(self.cfg.min_word_duration_s * self.cfg.sample_rate)
+        self._max_speech = int(self.cfg.max_word_duration_s * self.cfg.sample_rate)
+        self._post_silence = int(self.cfg.post_silence_s * self.cfg.sample_rate)
+
+        self._state = self.STATE_SILENCE
+        self._speech_start = 0       # sample index where current burst began
+        self._silence_run = 0        # consecutive silent samples inside SPEAKING
+        self._sample_cursor = 0      # running sample count since start
+        self._cooldown_until = 0.0   # wall-clock time after which we can fire again
+
+        # A small rolling buffer of leftover samples (when the caller's
+        # chunks don't align with our internal analysis window).
+        self._carry = np.zeros(0, dtype=np.int16)
+
+    # ── public API ────────────────────────────────────────────────
+
+    def process(self, pcm_bytes: bytes) -> bool:
+        """
+        Feed int16 PCM bytes. Returns True once per spoken "word"
+        (short speech burst followed by silence).
+        """
+        if not pcm_bytes:
+            return False
+        incoming = np.frombuffer(pcm_bytes, dtype=np.int16)
+        samples = np.concatenate([self._carry, incoming]) if self._carry.size else incoming
+
+        fired = False
+        n = self._chunk_samples
+        i = 0
+        while i + n <= samples.size:
+            window = samples[i:i + n]
+            if self._step(window):
+                fired = True
+                i += n
+                self._sample_cursor += n
+                # Stop after one fire; leftover samples are carried to the
+                # next call, so each word triggers at most once.
+                break
+            i += n
+            self._sample_cursor += n
+
+        # Keep whatever didn't fit in a full window for next call.
+        self._carry = samples[i:].copy()
+        return fired
+
+    def reset(self) -> None:
+        """Drop all state — call when resuming from a long pause."""
+        self._state = self.STATE_SILENCE
+        self._silence_run = 0
+        self._carry = np.zeros(0, dtype=np.int16)
+
+    # ── internal ──────────────────────────────────────────────────
+
+    def _step(self, window: np.ndarray) -> bool:
+        rms = float(np.sqrt(np.mean(window.astype(np.float64) ** 2)))
+        is_speech = rms > self.cfg.speech_threshold
+
+        now = time.time()
+        if now < self._cooldown_until:
+            return False   # ignore all audio during cooldown
+
+        if self._state == self.STATE_SILENCE:
+            if is_speech:
+                self._state = self.STATE_SPEAKING
+                self._speech_start = self._sample_cursor
+                self._silence_run = 0
+            return False
+
+        # STATE_SPEAKING
+        if is_speech:
+            self._silence_run = 0
+            # Abort if the burst is longer than a single word — user is
+            # just talking, not addressing the robot.
+            if self._sample_cursor - self._speech_start > self._max_speech:
+                self._state = self.STATE_SILENCE
+            return False
+
+        # Silent window inside SPEAKING — accumulate.
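+        # (With the defaults: 0.30 s × 16 kHz = 4800 consecutive silent
+        # samples, i.e. six 50 ms windows, must accumulate before the
+        # burst is analyzed.)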
+        self._silence_run += window.size
+        if self._silence_run >= self._post_silence:
+            # The cursor still points at this window's start, so add
+            # window.size before subtracting the silent run; without it the
+            # burst would be measured one window (chunk_ms) short.
+            burst_end = self._sample_cursor + window.size - self._silence_run
+            speech_len = burst_end - self._speech_start
+            self._state = self.STATE_SILENCE
+            self._silence_run = 0
+            if self._min_speech <= speech_len <= self._max_speech:
+                self._cooldown_until = now + self.cfg.cooldown_s
+                return True
+        return False
+
+
+# ── standalone test ─────────────────────────────────────────────
+
+if __name__ == "__main__":
+    import os
+    import sys
+    _HERE = os.path.dirname(os.path.abspath(__file__))
+    sys.path.insert(0, os.path.dirname(_HERE))
+    from Voice.builtin_mic import BuiltinMic
+
+    print("WakeDetector standalone test — say 'Sanad' a few times.")
+    print("(Ctrl-C to quit)\n")
+    det = WakeDetector()
+    mic = BuiltinMic()
+    mic.start()
+    try:
+        while True:
+            chunk = mic.read_chunk(1024)
+            if det.process(chunk):
+                print(f"  [WAKE]  (t={time.strftime('%H:%M:%S')})")
+    except KeyboardInterrupt:
+        pass
+    finally:
+        mic.stop()
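+
+# Tuning note: if the wake fires on ambient noise, raise stt.speech_threshold
+# in Config/config_Voice.json; if quiet speech is missed, lower it. Logging
+# rms from _step() against a minute of room tone is a quick way to pick a
+# value between the noise floor and normal speech.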