Files
smart-speaker/wakeword.py

158 lines
4.5 KiB
Python

"""
Wake word detection module using Porcupine.
Listens for the "Alexandr" wake word.
"""
import pvporcupine
import pyaudio
import struct
from config import PORCUPINE_ACCESS_KEY, PORCUPINE_KEYWORD_PATH
class WakeWordDetector:
    """Detect the wake word in microphone audio using Porcupine.

    The Porcupine engine, the PyAudio host and the input stream are created
    lazily (see ``initialize``) and released by ``cleanup``. After
    ``cleanup`` the instance is back in its pristine state and initializes
    itself again on the next ``wait_for_wakeword`` / ``check_wakeword_once``.
    """

    def __init__(self):
        # Porcupine engine handle; None until initialize() has run.
        self.porcupine = None
        # PyAudio input stream; None while no stream is held.
        self.audio_stream = None
        # PyAudio host object that owns the stream.
        self.pa = None
        # Track state explicitly instead of querying the stream object.
        self._stream_closed = True

    def initialize(self):
        """Create the Porcupine engine and open the microphone stream.

        If the audio side fails after the engine was created, everything
        acquired so far is released again and the exception is re-raised,
        so a later call starts from scratch instead of running with an
        engine but no audio input.
        """
        self.porcupine = pvporcupine.create(
            access_key=PORCUPINE_ACCESS_KEY,
            keyword_paths=[str(PORCUPINE_KEYWORD_PATH)],
        )
        try:
            self.pa = pyaudio.PyAudio()
            self._open_stream()
        except Exception:
            self.cleanup()
            raise
        print("🎤 Ожидание wake word 'Alexandr'...")

    def _open_stream(self):
        """Open the audio input stream unless one is already open."""
        if self.audio_stream is not None and not self._stream_closed:
            return
        if self.audio_stream is not None:
            # A stale handle flagged as closed: release it (best effort)
            # before replacing it with a fresh stream.
            try:
                self.audio_stream.close()
            except Exception:
                pass
            self.audio_stream = None
        # The stream is configured from the engine's own parameters so each
        # read yields exactly one frame in the format Porcupine processes.
        self.audio_stream = self.pa.open(
            rate=self.porcupine.sample_rate,
            channels=1,
            format=pyaudio.paInt16,
            input=True,
            frames_per_buffer=self.porcupine.frame_length,
        )
        self._stream_closed = False

    def stop_monitoring(self):
        """Explicitly stop and close the stream.

        Safe to call when no stream is open. Errors raised while stopping
        or closing are ignored on purpose (best effort), but ``close`` is
        still attempted when ``stop_stream`` fails.
        """
        stream, self.audio_stream = self.audio_stream, None
        was_open = not self._stream_closed
        self._stream_closed = True
        if stream is None or not was_open:
            return
        try:
            try:
                stream.stop_stream()
            finally:
                stream.close()
        except Exception:
            # Shutdown is best effort; only Exception is swallowed so that
            # KeyboardInterrupt / SystemExit still propagate.
            pass

    def _next_frame_has_wakeword(self) -> bool:
        """Read one frame from the open stream and run it through Porcupine.

        Returns:
            True if Porcupine reports a keyword index >= 0 for the frame.
        """
        frame_length = self.porcupine.frame_length
        raw = self.audio_stream.read(frame_length, exception_on_overflow=False)
        # "h" = signed 16-bit sample, matching the paInt16 stream format.
        pcm = struct.unpack_from("h" * frame_length, raw)
        return self.porcupine.process(pcm) >= 0

    def wait_for_wakeword(self, timeout: float = None) -> bool:
        """Block until the wake word is detected or the timeout expires.

        The stream is closed after a detection and left open on timeout.
        The timeout is checked between frames, so the call may overrun it
        by the time needed to read one frame.

        Args:
            timeout: Maximum seconds to wait. None = infinite; 0 returns
                False immediately without reading audio.

        Returns:
            True if wake word detected, False if timeout.
        """
        import time

        if self.porcupine is None:
            self.initialize()
        # Ensure stream is open (it is closed after every detection).
        self._open_stream()
        # Monotonic clock: unaffected by system clock adjustments.
        started = time.monotonic()
        while True:
            if timeout is not None and time.monotonic() - started >= timeout:
                return False
            if self._next_frame_has_wakeword():
                print("✅ Wake word обнаружен!")
                self.stop_monitoring()
                return True

    def check_wakeword_once(self) -> bool:
        """Check a single audio frame for the wake word.

        Reads exactly one frame, so the call returns after at most one
        frame read rather than looping. The stream is left open afterwards.
        Any error while opening, reading or processing is treated as
        "not detected" (best effort); errors from lazy initialization
        still propagate.

        Returns:
            True if wake word detected, False otherwise.
        """
        if self.porcupine is None:
            self.initialize()
        try:
            # Ensure stream is open
            self._open_stream()
            if self._next_frame_has_wakeword():
                print("🛑 Wake word обнаружен во время ответа!")
                return True
            return False
        except Exception:
            return False

    def cleanup(self):
        """Release the stream, the PyAudio host and the Porcupine engine.

        The attributes are reset to None first, so the instance never keeps
        a reference to a released object and can be initialized again.
        Porcupine is deleted even if terminating PyAudio raises.
        """
        self.stop_monitoring()
        pa, self.pa = self.pa, None
        porcupine, self.porcupine = self.porcupine, None
        try:
            if pa is not None:
                pa.terminate()
        finally:
            if porcupine is not None:
                porcupine.delete()
# Module-level singleton shared by the helper functions in this module.
# Stays None until get_detector() creates it; cleanup() resets it to None.
_detector = None
def get_detector() -> WakeWordDetector:
    """Return the shared WakeWordDetector, creating it on first use."""
    global _detector
    if _detector is not None:
        return _detector
    _detector = WakeWordDetector()
    return _detector
def wait_for_wakeword(timeout: float = None) -> bool:
    """Wait for the wake word using the shared detector.

    Args:
        timeout: Forwarded unchanged to the detector's wait_for_wakeword.

    Returns:
        Whatever the detector's wait_for_wakeword returns.
    """
    detector = get_detector()
    return detector.wait_for_wakeword(timeout)
def stop_monitoring():
    """Stop wake word monitoring on the shared detector, if one exists."""
    if not _detector:
        # No detector has been created yet, so there is nothing to stop.
        return
    _detector.stop_monitoring()
def cleanup():
    """Release the shared detector's resources and forget the instance.

    The global is reset even if the detector's cleanup raises, so a
    partially released detector is never handed out again by
    get_detector(); the exception itself still propagates to the caller.
    """
    global _detector
    if not _detector:
        return
    try:
        _detector.cleanup()
    finally:
        _detector = None
def check_wakeword_once() -> bool:
    """Run a single wake word check on the shared detector.

    Returns:
        True if wake word detected, False otherwise.
    """
    detector = get_detector()
    return detector.check_wakeword_once()