""" Wake word detection module using Porcupine. Listens for the "Alexandr" wake word. """ # Этот модуль отвечает за "уши" ассистента в режиме ожидания. # Он использует библиотеку Porcupine для эффективного (мало CPU) обнаружения ключевой фразы "Alexandr". import pvporcupine import pyaudio import struct from ..core.config import PORCUPINE_ACCESS_KEY, PORCUPINE_KEYWORD_PATH from ..core.audio_manager import get_audio_manager class WakeWordDetector: """Класс для обнаружения wake word с использованием Porcupine.""" def __init__(self): self.porcupine = None self.audio_stream = None self.pa = None self._stream_closed = True # Флаг состояния потока (закрыт/открыт) self._last_hit_ts = 0.0 def initialize(self): """Инициализация Porcupine и PyAudio.""" # Создаем экземпляр Porcupine с нашим ключом доступа и файлом модели (.ppn) self.porcupine = pvporcupine.create( access_key=PORCUPINE_ACCESS_KEY, keyword_paths=[str(PORCUPINE_KEYWORD_PATH)] ) # Используем общий экземпляр PyAudio self.pa = get_audio_manager().get_pyaudio() self._open_stream() print("🎤 Ожидание wake word 'Alexandr'...") def _open_stream(self): """Открытие аудиопотока с микрофона.""" if self.audio_stream and not self._stream_closed: return # Уже открыт # Если был открыт старый поток, пробуем закрыть if self.audio_stream: try: self.audio_stream.close() except: pass # Открываем поток с параметрами, которые требует Porcupine self.audio_stream = self.pa.open( rate=self.porcupine.sample_rate, channels=1, format=pyaudio.paInt16, input=True, frames_per_buffer=self.porcupine.frame_length, ) self._stream_closed = False def stop_monitoring(self): """Явная остановка и закрытие потока (чтобы освободить микрофон для других задач).""" if self.audio_stream and not self._stream_closed: try: self.audio_stream.stop_stream() self.audio_stream.close() except: pass self._stream_closed = True def wait_for_wakeword(self, timeout: float = None) -> bool: """ Блокирующая функция: ждет, пока не будет услышана фраза "Alexandr" или пока не истечет timeout. Args: timeout: Максимальное время ожидания в секундах. None = ждать бесконечно. Returns: True, если фраза обнаружена. False, если вышел таймаут. """ import time if not self.porcupine: self.initialize() # Убеждаемся, что поток открыт self._open_stream() start_time = time.time() while True: # Проверка таймаута if timeout and (time.time() - start_time > timeout): return False # Читаем небольшой кусочек аудио (frame) pcm = self.audio_stream.read( self.porcupine.frame_length, exception_on_overflow=False ) # Конвертируем байты в кортеж чисел (требование Porcupine) pcm = struct.unpack_from("h" * self.porcupine.frame_length, pcm) # Обрабатываем фрейм через Porcupine keyword_index = self.porcupine.process(pcm) # Если keyword_index >= 0, значит ключевое слово обнаружено if keyword_index >= 0: print("✅ Wake word обнаружен!") # Важно: закрываем поток, чтобы освободить микрофон для STT (Deepgram) self.stop_monitoring() return True def check_wakeword_once(self) -> bool: """ Неблокирующая проверка (один кадр). Используется во время того, как ассистент говорит (TTS), чтобы проверить, не пытается ли пользователь его перебить. Returns: True, если фраза обнаружена прямо сейчас. """ import time if not self.porcupine: self.initialize() try: self._open_stream() pcm = self.audio_stream.read( self.porcupine.frame_length, exception_on_overflow=False ) pcm = struct.unpack_from("h" * self.porcupine.frame_length, pcm) keyword_index = self.porcupine.process(pcm) if keyword_index >= 0: now = time.time() if now - self._last_hit_ts < 0.4: return False self._last_hit_ts = now print("🛑 Wake word обнаружен во время ответа!") return True return False except Exception: return False def cleanup(self): """Освобождение ресурсов при выходе.""" self.stop_monitoring() # self.pa.terminate() - Не делаем этого, так как PyAudio общий if self.porcupine: self.porcupine.delete() # Глобальный экземпляр детектора (Singleton) _detector = None def get_detector() -> WakeWordDetector: """Получить или создать глобальный экземпляр детектора.""" global _detector if _detector is None: _detector = WakeWordDetector() return _detector def wait_for_wakeword(timeout: float = None) -> bool: """Внешняя функция для ожидания wake word.""" return get_detector().wait_for_wakeword(timeout) def stop_monitoring(): """Внешняя функция для остановки мониторинга.""" if _detector: _detector.stop_monitoring() def cleanup(): """Внешняя функция очистки ресурсов.""" global _detector if _detector: _detector.cleanup() _detector = None def check_wakeword_once() -> bool: """Внешняя функция для быстрой проверки.""" return get_detector().check_wakeword_once()