Files
smart-speaker/app/audio/wakeword.py

195 lines
7.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Wake word detection module using Porcupine.
Listens for the "Alexandr" wake word.
"""
# Этот модуль отвечает за "уши" ассистента в режиме ожидания.
# Он использует библиотеку Porcupine для эффективного (мало CPU) обнаружения ключевой фразы "Alexandr".
import pvporcupine
import pyaudio
import struct
from ..core.config import (
PORCUPINE_ACCESS_KEY,
PORCUPINE_KEYWORD_PATH,
PORCUPINE_SENSITIVITY,
)
from ..core.audio_manager import get_audio_manager
class WakeWordDetector:
"""Класс для обнаружения wake word с использованием Porcupine."""
def __init__(self):
self.porcupine = None
self.audio_stream = None
self.pa = None
self._stream_closed = True # Флаг состояния потока (закрыт/открыт)
self._last_hit_ts = 0.0
def initialize(self):
"""Инициализация Porcupine и PyAudio."""
# Создаем экземпляр Porcupine с нашим ключом доступа и файлом модели (.ppn)
self.porcupine = pvporcupine.create(
access_key=PORCUPINE_ACCESS_KEY,
keyword_paths=[str(PORCUPINE_KEYWORD_PATH)],
sensitivities=[PORCUPINE_SENSITIVITY],
)
# Используем общий экземпляр PyAudio
self.pa = get_audio_manager().get_pyaudio()
self._open_stream()
print(f"🎤 Ожидание wake word 'Alexandr' (sens={PORCUPINE_SENSITIVITY:.2f})...")
def _open_stream(self):
"""Открытие аудиопотока с микрофона."""
if self.audio_stream and not self._stream_closed:
return # Уже открыт
# Если был открыт старый поток, пробуем закрыть
if self.audio_stream:
try:
self.audio_stream.close()
except Exception:
pass
# Открываем поток с параметрами, которые требует Porcupine
self.audio_stream = self.pa.open(
rate=self.porcupine.sample_rate,
channels=1,
format=pyaudio.paInt16,
input=True,
frames_per_buffer=self.porcupine.frame_length,
)
self._stream_closed = False
def stop_monitoring(self):
"""Явная остановка и закрытие потока (чтобы освободить микрофон для других задач)."""
if self.audio_stream and not self._stream_closed:
try:
self.audio_stream.stop_stream()
self.audio_stream.close()
except Exception:
pass
self._stream_closed = True
def wait_for_wakeword(self, timeout: float = None) -> bool:
"""
Блокирующая функция: ждет, пока не будет услышана фраза "Alexandr"
или пока не истечет timeout.
Args:
timeout: Максимальное время ожидания в секундах. None = ждать бесконечно.
Returns:
True, если фраза обнаружена. False, если вышел таймаут.
"""
import time
if not self.porcupine:
self.initialize()
# Убеждаемся, что поток открыт
self._open_stream()
start_time = time.time()
while True:
# Проверка таймаута
if timeout and (time.time() - start_time > timeout):
return False
# Читаем небольшой кусочек аудио (frame)
pcm = self.audio_stream.read(
self.porcupine.frame_length, exception_on_overflow=False
)
# Конвертируем байты в кортеж чисел (требование Porcupine)
pcm = struct.unpack_from("h" * self.porcupine.frame_length, pcm)
# Обрабатываем фрейм через Porcupine
keyword_index = self.porcupine.process(pcm)
# Если keyword_index >= 0, значит ключевое слово обнаружено
if keyword_index >= 0:
print("✅ Wake word обнаружен!")
# Важно: закрываем поток, чтобы освободить микрофон для STT (Deepgram)
self.stop_monitoring()
return True
def check_wakeword_once(self) -> bool:
"""
Неблокирующая проверка (один кадр).
Используется во время того, как ассистент говорит (TTS),
чтобы проверить, не пытается ли пользователь его перебить.
Returns:
True, если фраза обнаружена прямо сейчас.
"""
import time
if not self.porcupine:
self.initialize()
try:
self._open_stream()
pcm = self.audio_stream.read(
self.porcupine.frame_length, exception_on_overflow=False
)
pcm = struct.unpack_from("h" * self.porcupine.frame_length, pcm)
keyword_index = self.porcupine.process(pcm)
if keyword_index >= 0:
now = time.time()
if now - self._last_hit_ts < 0.2: # Уменьшаем интервал для более быстрой реакции
return False
self._last_hit_ts = now
print("🛑 Wake word обнаружен во время ответа!")
return True
return False
except Exception:
return False
def cleanup(self):
"""Освобождение ресурсов при выходе."""
self.stop_monitoring()
# self.pa.terminate() - Не делаем этого, так как PyAudio общий
if self.porcupine:
self.porcupine.delete()
# Глобальный экземпляр детектора (Singleton)
_detector = None
def get_detector() -> WakeWordDetector:
"""Получить или создать глобальный экземпляр детектора."""
global _detector
if _detector is None:
_detector = WakeWordDetector()
return _detector
def wait_for_wakeword(timeout: float = None) -> bool:
"""Внешняя функция для ожидания wake word."""
return get_detector().wait_for_wakeword(timeout)
def stop_monitoring():
"""Внешняя функция для остановки мониторинга."""
if _detector:
_detector.stop_monitoring()
def cleanup():
"""Внешняя функция очистки ресурсов."""
global _detector
if _detector:
_detector.cleanup()
_detector = None
def check_wakeword_once() -> bool:
"""Внешняя функция для быстрой проверки."""
return get_detector().check_wakeword_once()