""" Wake word detection module using Porcupine. Listens for the configured wake word. """ # Этот модуль отвечает за "уши" ассистента в режиме ожидания. # Он использует библиотеку Porcupine для эффективного (мало CPU) обнаружения ключевой фразы. import pvporcupine import pyaudio import struct import numpy as np from ..core.config import ( PORCUPINE_ACCESS_KEY, PORCUPINE_KEYWORD_PATH, PORCUPINE_SENSITIVITY, WAKE_WORD, ) from ..core.audio_manager import get_audio_manager class WakeWordDetector: """Класс для обнаружения wake word с использованием Porcupine.""" def __init__(self): self.porcupine = None self.audio_stream = None self.pa = None self._audio_manager = None self._input_device_index = None self._capture_sample_rate = None self._capture_frame_length = None self._resampled_pcm_buffer = np.array([], dtype=np.int16) self._stream_closed = True # Флаг состояния потока (закрыт/открыт) self._last_hit_ts = 0.0 def initialize(self): """Инициализация Porcupine и PyAudio.""" # Создаем экземпляр Porcupine с нашим ключом доступа и файлом модели (.ppn) self.porcupine = pvporcupine.create( access_key=PORCUPINE_ACCESS_KEY, keyword_paths=[str(PORCUPINE_KEYWORD_PATH)], sensitivities=[PORCUPINE_SENSITIVITY], ) # Используем общий экземпляр PyAudio self._audio_manager = get_audio_manager() self.pa = self._audio_manager.get_pyaudio() self._open_stream() print( f"🎤 Ожидание wake word '{WAKE_WORD}' " f"(sens={PORCUPINE_SENSITIVITY:.2f}, mic_rate={self._capture_sample_rate})..." ) def _open_stream(self): """Открытие аудиопотока с микрофона.""" if self.audio_stream and not self._stream_closed: return # Уже открыт # Если был открыт старый поток, пробуем закрыть if self.audio_stream: try: self.audio_stream.close() except Exception: pass target_rate = int(self.porcupine.sample_rate) fallback_rates = [48000, 44100, 32000, 22050, 16000] self.audio_stream, self._input_device_index, actual_rate = self._audio_manager.open_input_stream( rate=target_rate, channels=1, format=pyaudio.paInt16, frames_per_buffer=self.porcupine.frame_length, preferred_index=self._input_device_index, fallback_rates=fallback_rates, ) self._capture_sample_rate = int(actual_rate) self._capture_frame_length = max( 64, int( round( self.porcupine.frame_length * self._capture_sample_rate / target_rate ) ), ) self._resampled_pcm_buffer = np.array([], dtype=np.int16) self._stream_closed = False def stop_monitoring(self): """Явная остановка и закрытие потока (чтобы освободить микрофон для других задач).""" if self.audio_stream and not self._stream_closed: try: self.audio_stream.stop_stream() self.audio_stream.close() except Exception: pass self._stream_closed = True def _resample_to_target_rate(self, pcm: np.ndarray) -> np.ndarray: target_rate = int(self.porcupine.sample_rate) source_rate = int(self._capture_sample_rate or target_rate) if source_rate == target_rate: return pcm if pcm.size == 0: return np.array([], dtype=np.int16) target_length = max(1, int(round(pcm.size * target_rate / source_rate))) x_old = np.arange(pcm.size, dtype=np.float32) x_new = np.linspace(0.0, float(max(0, pcm.size - 1)), target_length) resampled = np.interp(x_new, x_old, pcm.astype(np.float32)) return np.asarray(resampled, dtype=np.int16) def _read_porcupine_frame(self): target_length = int(self.porcupine.frame_length) if self._capture_sample_rate == self.porcupine.sample_rate: pcm = self.audio_stream.read(target_length, exception_on_overflow=False) return np.asarray(struct.unpack_from("h" * target_length, pcm), dtype=np.int16) while self._resampled_pcm_buffer.size < target_length: raw = self.audio_stream.read( self._capture_frame_length, exception_on_overflow=False ) captured = np.frombuffer(raw, dtype=np.int16) converted = self._resample_to_target_rate(captured) if converted.size: self._resampled_pcm_buffer = np.concatenate( (self._resampled_pcm_buffer, converted) ) frame = self._resampled_pcm_buffer[:target_length] self._resampled_pcm_buffer = self._resampled_pcm_buffer[target_length:] return frame def wait_for_wakeword(self, timeout: float = None) -> bool: """ Блокирующая функция: ждет, пока не будет услышана wake word или пока не истечет timeout. Args: timeout: Максимальное время ожидания в секундах. None = ждать бесконечно. Returns: True, если фраза обнаружена. False, если вышел таймаут. """ import time if not self.porcupine: self.initialize() # Убеждаемся, что поток открыт self._open_stream() start_time = time.time() while True: # Проверка таймаута if timeout and (time.time() - start_time > timeout): return False # Читаем небольшой кусочек аудио (frame) pcm = self._read_porcupine_frame() # Обрабатываем фрейм через Porcupine keyword_index = self.porcupine.process(pcm.tolist()) # Если keyword_index >= 0, значит ключевое слово обнаружено if keyword_index >= 0: print("✅ Wake word обнаружен!") # Важно: закрываем поток, чтобы освободить микрофон для STT (Deepgram) self.stop_monitoring() return True def check_wakeword_once(self) -> bool: """ Неблокирующая проверка (один кадр). Используется во время того, как ассистент говорит (TTS), чтобы проверить, не пытается ли пользователь его перебить. Returns: True, если фраза обнаружена прямо сейчас. """ import time if not self.porcupine: self.initialize() try: self._open_stream() pcm = self._read_porcupine_frame() keyword_index = self.porcupine.process(pcm.tolist()) if keyword_index >= 0: now = time.time() if now - self._last_hit_ts < 0.2: # Уменьшаем интервал для более быстрой реакции return False self._last_hit_ts = now print("🛑 Wake word обнаружен во время ответа!") return True return False except Exception: return False def cleanup(self): """Освобождение ресурсов при выходе.""" self.stop_monitoring() # self.pa.terminate() - Не делаем этого, так как PyAudio общий if self.porcupine: self.porcupine.delete() # Глобальный экземпляр детектора (Singleton) _detector = None def get_detector() -> WakeWordDetector: """Получить или создать глобальный экземпляр детектора.""" global _detector if _detector is None: _detector = WakeWordDetector() return _detector def wait_for_wakeword(timeout: float = None) -> bool: """Внешняя функция для ожидания wake word.""" return get_detector().wait_for_wakeword(timeout) def stop_monitoring(): """Внешняя функция для остановки мониторинга.""" if _detector: _detector.stop_monitoring() def cleanup(): """Внешняя функция очистки ресурсов.""" global _detector if _detector: _detector.cleanup() _detector = None def check_wakeword_once() -> bool: """Внешняя функция для быстрой проверки.""" return get_detector().check_wakeword_once()