114 lines
3.2 KiB
Python
114 lines
3.2 KiB
Python
"""
|
|
Wake word detection module using Porcupine.
|
|
Listens for the "Alexandr" wake word.
|
|
"""
|
|
import pvporcupine
|
|
import pyaudio
|
|
import struct
|
|
from config import PORCUPINE_ACCESS_KEY, PORCUPINE_KEYWORD_PATH
|
|
|
|
|
|
class WakeWordDetector:
|
|
"""Detects wake word using Porcupine."""
|
|
|
|
def __init__(self):
|
|
self.porcupine = None
|
|
self.audio_stream = None
|
|
self.pa = None
|
|
|
|
def initialize(self):
|
|
"""Initialize Porcupine and audio stream."""
|
|
self.porcupine = pvporcupine.create(
|
|
access_key=PORCUPINE_ACCESS_KEY,
|
|
keyword_paths=[str(PORCUPINE_KEYWORD_PATH)]
|
|
)
|
|
|
|
self.pa = pyaudio.PyAudio()
|
|
self.audio_stream = self.pa.open(
|
|
rate=self.porcupine.sample_rate,
|
|
channels=1,
|
|
format=pyaudio.paInt16,
|
|
input=True,
|
|
frames_per_buffer=self.porcupine.frame_length
|
|
)
|
|
print("🎤 Ожидание wake word 'Alexandr'...")
|
|
|
|
def wait_for_wakeword(self) -> bool:
|
|
"""
|
|
Blocks until wake word is detected.
|
|
Returns True when wake word is detected.
|
|
"""
|
|
if not self.porcupine:
|
|
self.initialize()
|
|
|
|
while True:
|
|
pcm = self.audio_stream.read(self.porcupine.frame_length, exception_on_overflow=False)
|
|
pcm = struct.unpack_from("h" * self.porcupine.frame_length, pcm)
|
|
|
|
keyword_index = self.porcupine.process(pcm)
|
|
if keyword_index >= 0:
|
|
print("✅ Wake word обнаружен!")
|
|
return True
|
|
|
|
def check_wakeword_once(self) -> bool:
|
|
"""
|
|
Non-blocking check for wake word.
|
|
Returns True if wake word detected, False otherwise.
|
|
"""
|
|
if not self.porcupine:
|
|
self.initialize()
|
|
|
|
try:
|
|
pcm = self.audio_stream.read(self.porcupine.frame_length, exception_on_overflow=False)
|
|
pcm = struct.unpack_from("h" * self.porcupine.frame_length, pcm)
|
|
|
|
keyword_index = self.porcupine.process(pcm)
|
|
if keyword_index >= 0:
|
|
print("🛑 Wake word обнаружен во время ответа!")
|
|
return True
|
|
return False
|
|
except Exception:
|
|
return False
|
|
|
|
def cleanup(self):
|
|
"""Release resources."""
|
|
if self.audio_stream:
|
|
self.audio_stream.close()
|
|
if self.pa:
|
|
self.pa.terminate()
|
|
if self.porcupine:
|
|
self.porcupine.delete()
|
|
|
|
|
|
# Global instance
|
|
_detector = None
|
|
|
|
|
|
def get_detector() -> WakeWordDetector:
|
|
"""Get or create wake word detector instance."""
|
|
global _detector
|
|
if _detector is None:
|
|
_detector = WakeWordDetector()
|
|
return _detector
|
|
|
|
|
|
def wait_for_wakeword() -> bool:
|
|
"""Wait for wake word detection."""
|
|
return get_detector().wait_for_wakeword()
|
|
|
|
|
|
def cleanup():
|
|
"""Cleanup detector resources."""
|
|
global _detector
|
|
if _detector:
|
|
_detector.cleanup()
|
|
_detector = None
|
|
|
|
|
|
def check_wakeword_once() -> bool:
|
|
"""
|
|
Non-blocking check for wake word.
|
|
Returns True if wake word detected, False otherwise.
|
|
"""
|
|
return get_detector().check_wakeword_once()
|