другая структура проекта + beads + александр повтори + комментарии везде + readme

This commit is contained in:
2026-01-09 04:14:50 +03:00
parent 242ead5355
commit ce28fede74
31 changed files with 1654 additions and 1333 deletions

0
app/audio/__init__.py Normal file
View File

147
app/audio/local_stt.py Normal file
View File

@@ -0,0 +1,147 @@
"""
Local offline Speech-to-Text module using Vosk.
Used for simple command detection (like "stop") without internet.
"""
# Модуль локального распознавания речи (Vosk).
# Работает полностью оффлайн (без интернета).
# Используется, когда нужно распознать простые команды (например, "стоп" во время будильника),
# чтобы не тратить трафик и время на обращение к облаку.
import os
import sys
import json
import pyaudio
from vosk import Model, KaldiRecognizer
from ..core.config import VOSK_MODEL_PATH, SAMPLE_RATE
class LocalRecognizer:
"""Класс для работы с Vosk."""
def __init__(self):
self.model = None
self.rec = None
self.pa = None
self.stream = None
def initialize(self):
"""Загрузка модели Vosk."""
if not os.path.exists(VOSK_MODEL_PATH):
print(f"❌ Ошибка: Vosk модель не найдена по пути {VOSK_MODEL_PATH}")
return False
print("📦 Инициализация локального STT (Vosk)...")
# Трюк для подавления вывода логов Vosk в консоль (он очень шумный)
try:
null_fd = os.open(os.devnull, os.O_WRONLY)
old_stderr = os.dup(2)
sys.stderr.flush()
os.dup2(null_fd, 2)
os.close(null_fd)
# Сама загрузка модели
self.model = Model(str(VOSK_MODEL_PATH))
# Возвращаем stderr обратно
os.dup2(old_stderr, 2)
os.close(old_stderr)
except Exception as e:
print(f"Error initializing Vosk: {e}")
return False
self.rec = KaldiRecognizer(self.model, SAMPLE_RATE)
self.pa = pyaudio.PyAudio()
return True
def listen_for_keywords(self, keywords: list, timeout: float = 10.0) -> str:
"""
Слушает микрофон заданное время и проверяет наличие ключевых слов.
Args:
keywords: Список слов, которые мы ждем (например, ["стоп", "хватит"]).
timeout: Сколько секунд слушать.
Returns:
Найденное слово или пустую строку.
"""
if not self.model:
if not self.initialize():
return ""
# Открываем поток микрофона
try:
stream = self.pa.open(
format=pyaudio.paInt16,
channels=1,
rate=SAMPLE_RATE,
input=True,
frames_per_buffer=4096,
)
stream.start_stream()
except Exception as e:
print(f"❌ Ошибка микрофона: {e}")
return ""
import time
start_time = time.time()
print(f"👂 Локальное слушание ожидает: {keywords}")
detected_text = ""
try:
while time.time() - start_time < timeout:
data = stream.read(4096, exception_on_overflow=False)
# Vosk обрабатывает аудио чанками
if self.rec.AcceptWaveform(data):
# Полный результат
res = json.loads(self.rec.Result())
text = res.get("text", "")
if text:
print(f"📝 Локально: {text}")
# Проверяем, есть ли ключевое слово в распознанном тексте
for kw in keywords:
if kw in text:
detected_text = text
break
else:
# Частичный результат (быстрее, чем полный)
res = json.loads(self.rec.PartialResult())
partial = res.get("partial", "")
if partial:
for kw in keywords:
if kw in partial:
detected_text = partial
break
if detected_text:
break
finally:
stream.stop_stream()
stream.close()
return detected_text
def cleanup(self):
if self.pa:
self.pa.terminate()
# Глобальный экземпляр
_local_recognizer = None
def get_local_recognizer():
global _local_recognizer
if _local_recognizer is None:
_local_recognizer = LocalRecognizer()
return _local_recognizer
def listen_for_keywords(keywords: list, timeout: float = 5.0) -> str:
"""Внешняя функция для поиска ключевых слов."""
return get_local_recognizer().listen_for_keywords(keywords, timeout)

87
app/audio/sound_level.py Normal file
View File

@@ -0,0 +1,87 @@
"""
Volume control module.
Regulates system volume on a scale from 1 to 10.
"""
# Модуль управления громкостью системы.
# Работает через системную утилиту amixer (ALSA) в Linux.
import subprocess
import re
# Карта для перевода слов в цифры ("пять" -> 5)
NUMBER_MAP = {
"один": 1,
"раз": 1,
"два": 2,
"три": 3,
"четыре": 4,
"пять": 5,
"шесть": 6,
"семь": 7,
"восемь": 8,
"девять": 9,
"десять": 10,
}
def set_volume(level: int) -> bool:
"""
Устанавливает системную громкость (шкала 1-10).
1 -> 10%
10 -> 100%
Args:
level: Число от 1 до 10.
Returns:
True, если успешно.
"""
if not isinstance(level, int):
print(
f"❌ Ошибка: Уровень громкости должен быть целым числом, получено {type(level)}"
)
return False
# Ограничение диапазона
if level < 1:
level = 1
elif level > 10:
level = 10
percentage = level * 10
try:
# Вызов команды amixer для изменения громкости Master канала
# -q: quiet (без вывода)
# sset: simple set
cmd = ["amixer", "-q", "sset", "Master", f"{percentage}%"]
subprocess.run(cmd, check=True)
print(f"🔊 Громкость установлена на {level} ({percentage}%)")
return True
except subprocess.CalledProcessError as e:
print(f"❌ Ошибка при установке громкости: {e}")
return False
except Exception as e:
print(f"❌ Неизвестная ошибка громкости: {e}")
return False
def parse_volume_text(text: str) -> int | None:
"""
Пытается найти число громкости в тексте.
Понимает и цифры ("5"), и слова ("пять").
"""
text = text.lower()
# 1. Ищем цифры (1-10)
num_match = re.search(r"\b(10|[1-9])\b", text)
if num_match:
return int(num_match.group())
# 2. Ищем слова из словаря
for word, value in NUMBER_MAP.items():
if word in text:
return value
return None

284
app/audio/stt.py Normal file
View File

@@ -0,0 +1,284 @@
"""
Speech-to-Text module using Deepgram API.
Recognizes speech from microphone using streaming WebSocket.
Supports Russian (default) and English.
"""
# Модуль распознавания речи (STT - Speech-to-Text).
# Использует Deepgram API через веб-сокеты для потокового распознавания в реальном времени.
import asyncio
import time
import pyaudio
import logging
from ..core.config import DEEPGRAM_API_KEY, SAMPLE_RATE
from deepgram import (
DeepgramClient,
DeepgramClientOptions,
LiveTranscriptionEvents,
LiveOptions,
)
import deepgram.clients.common.v1.abstract_sync_websocket as sdk_ws
import websockets.sync.client
# --- Патч (исправление) для библиотеки websockets ---
# По умолчанию Deepgram SDK использует слишком короткий таймаут подключения.
# Это часто вызывает ошибки при медленном SSL рукопожатии.
# Мы подменяем функцию connect, чтобы увеличить таймаут до 30 секунд.
_original_connect = websockets.sync.client.connect
def _patched_connect(*args, **kwargs):
kwargs.setdefault("open_timeout", 30)
kwargs.setdefault("ping_timeout", 30)
kwargs.setdefault("close_timeout", 30)
print(f"DEBUG: Connecting to Deepgram with timeout={kwargs.get('open_timeout')}s")
return _original_connect(*args, **kwargs)
# Применяем патч
sdk_ws.connect = _patched_connect
# Отключаем лишний мусор в логах
logging.getLogger("deepgram").setLevel(logging.WARNING)
class SpeechRecognizer:
"""Класс распознавания речи через Deepgram."""
def __init__(self):
self.dg_client = None
self.pa = None
self.stream = None
self.transcript = ""
def initialize(self):
"""Инициализация клиента Deepgram и PyAudio."""
if not DEEPGRAM_API_KEY:
raise ValueError("DEEPGRAM_API_KEY is not set in environment or config.")
print("📦 Инициализация Deepgram STT...")
config = DeepgramClientOptions(
verbose=logging.WARNING,
)
self.dg_client = DeepgramClient(DEEPGRAM_API_KEY, config)
self.pa = pyaudio.PyAudio()
print("✅ Deepgram клиент готов")
def _get_stream(self):
"""Открывает аудиопоток PyAudio, если он еще не открыт."""
if self.stream is None:
self.stream = self.pa.open(
rate=SAMPLE_RATE,
channels=1,
format=pyaudio.paInt16,
input=True,
frames_per_buffer=4096,
)
return self.stream
async def _process_audio(self, dg_connection, timeout_seconds, detection_timeout):
"""
Асинхронная функция для отправки аудио и получения текста.
Args:
dg_connection: Активное соединение с Deepgram.
timeout_seconds: Общее время прослушивания.
detection_timeout: Время ожидания начала речи.
"""
self.transcript = ""
transcript_parts = []
loop = asyncio.get_running_loop()
stream = self._get_stream()
# События для синхронизации
stop_event = asyncio.Event() # Пора останавливаться
speech_started_event = asyncio.Event() # Речь обнаружена (VAD)
# --- Обработчики событий Deepgram ---
def on_transcript(unused_self, result, **kwargs):
"""Вызывается, когда приходит часть текста."""
sentence = result.channel.alternatives[0].transcript
if len(sentence) == 0:
return
if result.is_final:
# Собираем только финальные (подтвержденные) фразы
transcript_parts.append(sentence)
self.transcript = " ".join(transcript_parts).strip()
def on_speech_started(unused_self, speech_started, **kwargs):
"""Вызывается, когда VAD (Voice Activity Detection) слышит голос."""
loop.call_soon_threadsafe(speech_started_event.set)
def on_utterance_end(unused_self, utterance_end, **kwargs):
"""Вызывается, когда Deepgram решает, что фраза закончилась (пауза)."""
loop.call_soon_threadsafe(stop_event.set)
def on_error(unused_self, error, **kwargs):
print(f"Error: {error}")
loop.call_soon_threadsafe(stop_event.set)
# Подписываемся на события
dg_connection.on(LiveTranscriptionEvents.Transcript, on_transcript)
dg_connection.on(LiveTranscriptionEvents.SpeechStarted, on_speech_started)
dg_connection.on(LiveTranscriptionEvents.UtteranceEnd, on_utterance_end)
dg_connection.on(LiveTranscriptionEvents.Error, on_error)
# Параметры распознавания
options = LiveOptions(
model="nova-2", # Самая быстрая и точная модель
language=self.current_lang,
smart_format=True, # Расстановка знаков препинания
encoding="linear16",
channels=1,
sample_rate=SAMPLE_RATE,
interim_results=True,
utterance_end_ms=1200, # Пауза 1.2с считается концом фразы
vad_events=True,
)
if dg_connection.start(options) is False:
print("Failed to start Deepgram connection")
return
# --- Задача отправки аудио ---
async def send_audio():
chunks_sent = 0
try:
stream.start_stream()
print("🎤 Stream started, sending audio...")
while not stop_event.is_set():
if stream.is_active():
data = stream.read(4096, exception_on_overflow=False)
# Отправка данных (синхронная в этой версии SDK)
dg_connection.send(data)
chunks_sent += 1
if chunks_sent % 50 == 0:
print(f".", end="", flush=True)
# Уступаем время другим задачам
await asyncio.sleep(0.005)
except Exception as e:
print(f"Audio send error: {e}")
finally:
stream.stop_stream()
print(f"\n🛑 Stream stopped. Chunks sent: {chunks_sent}")
sender_task = asyncio.create_task(send_audio())
try:
# 1. Ждем начала речи (если задан detection_timeout)
if detection_timeout:
try:
await asyncio.wait_for(
speech_started_event.wait(), timeout=detection_timeout
)
except asyncio.TimeoutError:
# Если за detection_timeout (5 сек) никто не начал говорить, выходим
stop_event.set()
# 2. Если речь началась (или таймаута нет), ждем завершения (stop_event)
# stop_event сработает либо по UtteranceEnd (пауза), либо по общему таймауту
if not stop_event.is_set():
await asyncio.wait_for(stop_event.wait(), timeout=timeout_seconds)
except asyncio.TimeoutError:
pass # Общий таймаут вышел
stop_event.set()
await sender_task
# Завершаем соединение и ждем последние результаты
dg_connection.finish()
return self.transcript
def listen(
self,
timeout_seconds: float = 7.0,
detection_timeout: float = None,
lang: str = "ru",
) -> str:
"""
Основной метод: слушает микрофон и возвращает текст.
Args:
timeout_seconds: Максимальная длительность фразы.
detection_timeout: Сколько ждать начала речи перед тем как сдаться.
lang: Язык ("ru" или "en").
"""
if not self.dg_client:
self.initialize()
self.current_lang = lang
print(f"🎙️ Слушаю ({lang})...")
last_error = None
# Делаем 2 попытки на случай сбоя сети
for attempt in range(2):
# Создаем новое live подключение для каждой сессии
dg_connection = self.dg_client.listen.live.v("1")
try:
# Запускаем асинхронный процесс обработки
transcript = asyncio.run(
self._process_audio(
dg_connection, timeout_seconds, detection_timeout
)
)
final_text = transcript.strip() if transcript else ""
if final_text:
print(f"📝 Распознано: {final_text}")
return final_text
else:
# Если вернулась пустая строка (тишина), считаем это штатным завершением.
# Не нужно повторять попытку, как при ошибке сети.
return ""
except Exception as e:
last_error = e
if attempt == 0:
print("⚠️ Не удалось подключиться к Deepgram, повторяю...")
time.sleep(1)
if last_error:
print(f"❌ Ошибка STT: {last_error}")
else:
print("⚠️ Речь не распознана")
return ""
def cleanup(self):
"""Очистка ресурсов."""
if self.stream:
self.stream.stop_stream()
self.stream.close()
self.stream = None
if self.pa:
self.pa.terminate()
# Глобальный экземпляр
_recognizer = None
def get_recognizer() -> SpeechRecognizer:
global _recognizer
if _recognizer is None:
_recognizer = SpeechRecognizer()
return _recognizer
def listen(
timeout_seconds: float = 7.0, detection_timeout: float = None, lang: str = "ru"
) -> str:
"""Внешняя функция для прослушивания."""
return get_recognizer().listen(timeout_seconds, detection_timeout, lang)
def cleanup():
"""Внешняя функция очистки."""
global _recognizer
if _recognizer:
_recognizer.cleanup()
_recognizer = None

265
app/audio/tts.py Normal file
View File

@@ -0,0 +1,265 @@
"""
Text-to-Speech module using Silero TTS.
Generates natural Russian speech.
Supports interruption via wake word detection using threading.
"""
# Модуль синтеза речи (TTS - Text-to-Speech).
# Использует нейросеть Silero TTS для качественной русской речи.
# Также поддерживает прерывание речи, если пользователь скажет "Alexandr".
import torch
import sounddevice as sd
import numpy as np
import threading
import time
import warnings
import re
from ..core.config import TTS_SPEAKER, TTS_EN_SPEAKER, TTS_SAMPLE_RATE
# Подавляем предупреждения Silero о длинном тексте (мы сами его режем)
warnings.filterwarnings("ignore", message="Text string is longer than 1000 symbols")
class TextToSpeech:
"""Класс синтеза речи с поддержкой прерывания."""
def __init__(self):
self.model_ru = None
self.model_en = None
self.sample_rate = TTS_SAMPLE_RATE
self.speaker_ru = TTS_SPEAKER
self.speaker_en = TTS_EN_SPEAKER
self._interrupted = False
self._stop_flag = threading.Event()
def _load_model(self, language: str):
"""
Загрузка и кэширование модели Silero TTS.
Загружается один раз при первом обращении.
"""
device = torch.device("cpu") # Работаем на процессоре (достаточно быстро)
if language == "en":
if self.model_en:
return self.model_en
print("📦 Загрузка модели Silero TTS (en)...")
model, _ = torch.hub.load(
repo_or_dir="snakers4/silero-models",
model="silero_tts",
language="en",
speaker="v3_en",
)
model.to(device)
self.model_en = model
return model
# По умолчанию русский
if self.model_ru:
return self.model_ru
print("📦 Загрузка модели Silero TTS (ru)...")
model, _ = torch.hub.load(
repo_or_dir="snakers4/silero-models",
model="silero_tts",
language="ru",
speaker="v5_ru",
)
model.to(device)
self.model_ru = model
return model
def initialize(self):
"""Предварительная инициализация (прогрев) русской модели."""
self._load_model("ru")
def _split_text(self, text: str, max_length: int = 900) -> list[str]:
"""
Разбивает длинный текст на части (чанки), так как Silero не принимает >1000 символов.
Старается разбивать по предложениям (.!?).
"""
if len(text) <= max_length:
return [text]
chunks = []
# Разбиваем по знакам препинания, сохраняя их
parts = re.split(r"([.!?]+\s*)", text)
current_chunk = ""
for part in parts:
# Если добавление части превысит лимит, сохраняем текущий кусок
if len(current_chunk) + len(part) > max_length:
if current_chunk:
chunks.append(current_chunk.strip())
current_chunk = ""
current_chunk += part
# Если даже одна часть огромная (нет знаков препинания), режем жестко по пробелам
while len(current_chunk) > max_length:
split_idx = current_chunk.rfind(" ", 0, max_length)
if split_idx == -1:
split_idx = max_length # Если нет пробелов, режем посередине слова
chunks.append(current_chunk[:split_idx].strip())
current_chunk = current_chunk[split_idx:].lstrip()
if current_chunk:
chunks.append(current_chunk.strip())
return [c for c in chunks if c]
def speak(self, text: str, check_interrupt=None, language: str = "ru") -> bool:
"""
Основная функция: генерирует аудио и воспроизводит его.
Args:
text: Текст для озвучки.
check_interrupt: Функция, возвращающая True, если надо прерваться (например, check_wakeword_once).
language: "ru" или "en".
Returns:
True, если договорил до конца.
False, если был прерван.
"""
if not text.strip():
return True
# Выбор модели
if language == "en":
model = self._load_model("en")
speaker = self.speaker_en
else:
model = self._load_model("ru")
speaker = self.speaker_ru
# Проверка наличия спикера в модели (защита от ошибок конфига)
if hasattr(model, "speakers") and speaker not in model.speakers:
if model.speakers:
speaker = model.speakers[0]
# Разбиваем текст на куски
chunks = self._split_text(text)
total_chunks = len(chunks)
if total_chunks > 1:
print(f"🔊 Озвучивание (частей: {total_chunks}): {text[:50]}...")
else:
print(f"🔊 Озвучивание: {text[:50]}...")
self._interrupted = False
self._stop_flag.clear()
success = True
for i, chunk in enumerate(chunks):
if self._interrupted:
break
try:
# Генерация аудио (тензор)
audio = model.apply_tts(
text=chunk, speaker=speaker, sample_rate=self.sample_rate
)
# Конвертация в numpy массив для sounddevice
audio_np = audio.numpy()
if check_interrupt:
# Воспроизведение с проверкой прерывания (сложная логика)
if not self._play_with_interrupt(audio_np, check_interrupt):
success = False
break
else:
# Обычное воспроизведение (блокирующее)
sd.play(audio_np, self.sample_rate)
sd.wait()
except Exception as e:
print(f"❌ Ошибка TTS (часть {i + 1}/{total_chunks}): {e}")
success = False
if success and not self._interrupted:
print("✅ Воспроизведение завершено")
return True
elif self._interrupted:
return False
else:
return False
def _check_interrupt_worker(self, check_interrupt):
"""
Фоновая функция для потока: постоянно опрашивает check_interrupt.
Если вернуло True -> останавливаем звук.
"""
while not self._stop_flag.is_set():
try:
if check_interrupt():
self._interrupted = True
sd.stop() # Немедленная остановка звука
print("⏹️ Воспроизведение прервано!")
return
except Exception:
pass
def _play_with_interrupt(self, audio_np: np.ndarray, check_interrupt) -> bool:
"""
Воспроизводит аудио, параллельно проверяя условие прерывания в отдельном потоке.
"""
# Запускаем поток-наблюдатель
checker_thread = threading.Thread(
target=self._check_interrupt_worker, args=(check_interrupt,), daemon=True
)
checker_thread.start()
try:
# Запускаем воспроизведение (неблокирующее)
sd.play(audio_np, self.sample_rate)
# Ждем окончания воспроизведения в цикле
while sd.get_stream().active:
if self._interrupted:
break
time.sleep(0.05)
finally:
# Сообщаем потоку-наблюдателю, что пора завершаться
self._stop_flag.set()
checker_thread.join(timeout=0.5)
if self._interrupted:
return False
return True
@property
def was_interrupted(self) -> bool:
"""Был ли прерван последний вызов speak."""
return self._interrupted
# Глобальный экземпляр TTS
_tts = None
def get_tts() -> TextToSpeech:
"""Получить или создать экземпляр TTS."""
global _tts
if _tts is None:
_tts = TextToSpeech()
return _tts
def speak(text: str, check_interrupt=None, language: str = "ru") -> bool:
"""Внешняя функция для озвучивания."""
return get_tts().speak(text, check_interrupt, language)
def was_interrupted() -> bool:
"""Проверка флага прерывания."""
return get_tts().was_interrupted
def initialize():
"""Предварительная загрузка моделей."""
get_tts().initialize()

180
app/audio/wakeword.py Normal file
View File

@@ -0,0 +1,180 @@
"""
Wake word detection module using Porcupine.
Listens for the "Alexandr" wake word.
"""
# Этот модуль отвечает за "уши" ассистента в режиме ожидания.
# Он использует библиотеку Porcupine для эффективного (мало CPU) обнаружения ключевой фразы "Alexandr".
import pvporcupine
import pyaudio
import struct
from ..core.config import PORCUPINE_ACCESS_KEY, PORCUPINE_KEYWORD_PATH
class WakeWordDetector:
"""Класс для обнаружения wake word с использованием Porcupine."""
def __init__(self):
self.porcupine = None
self.audio_stream = None
self.pa = None
self._stream_closed = True # Флаг состояния потока (закрыт/открыт)
def initialize(self):
"""Инициализация Porcupine и PyAudio."""
# Создаем экземпляр Porcupine с нашим ключом доступа и файлом модели (.ppn)
self.porcupine = pvporcupine.create(
access_key=PORCUPINE_ACCESS_KEY, keyword_paths=[str(PORCUPINE_KEYWORD_PATH)]
)
self.pa = pyaudio.PyAudio()
self._open_stream()
print("🎤 Ожидание wake word 'Alexandr'...")
def _open_stream(self):
"""Открытие аудиопотока с микрофона."""
if self.audio_stream and not self._stream_closed:
return # Уже открыт
# Если был открыт старый поток, пробуем закрыть
if self.audio_stream:
try:
self.audio_stream.close()
except:
pass
# Открываем поток с параметрами, которые требует Porcupine
self.audio_stream = self.pa.open(
rate=self.porcupine.sample_rate,
channels=1,
format=pyaudio.paInt16,
input=True,
frames_per_buffer=self.porcupine.frame_length,
)
self._stream_closed = False
def stop_monitoring(self):
"""Явная остановка и закрытие потока (чтобы освободить микрофон для других задач)."""
if self.audio_stream and not self._stream_closed:
try:
self.audio_stream.stop_stream()
self.audio_stream.close()
except:
pass
self._stream_closed = True
def wait_for_wakeword(self, timeout: float = None) -> bool:
"""
Блокирующая функция: ждет, пока не будет услышана фраза "Alexandr"
или пока не истечет timeout.
Args:
timeout: Максимальное время ожидания в секундах. None = ждать бесконечно.
Returns:
True, если фраза обнаружена. False, если вышел таймаут.
"""
import time
if not self.porcupine:
self.initialize()
# Убеждаемся, что поток открыт
self._open_stream()
start_time = time.time()
while True:
# Проверка таймаута
if timeout and (time.time() - start_time > timeout):
return False
# Читаем небольшой кусочек аудио (frame)
pcm = self.audio_stream.read(
self.porcupine.frame_length, exception_on_overflow=False
)
# Конвертируем байты в кортеж чисел (требование Porcupine)
pcm = struct.unpack_from("h" * self.porcupine.frame_length, pcm)
# Обрабатываем фрейм через Porcupine
keyword_index = self.porcupine.process(pcm)
# Если keyword_index >= 0, значит ключевое слово обнаружено
if keyword_index >= 0:
print("✅ Wake word обнаружен!")
# Важно: закрываем поток, чтобы освободить микрофон для STT (Deepgram)
self.stop_monitoring()
return True
def check_wakeword_once(self) -> bool:
"""
Неблокирующая проверка (один кадр).
Используется во время того, как ассистент говорит (TTS),
чтобы проверить, не пытается ли пользователь его перебить.
Returns:
True, если фраза обнаружена прямо сейчас.
"""
if not self.porcupine:
self.initialize()
try:
self._open_stream()
pcm = self.audio_stream.read(
self.porcupine.frame_length, exception_on_overflow=False
)
pcm = struct.unpack_from("h" * self.porcupine.frame_length, pcm)
keyword_index = self.porcupine.process(pcm)
if keyword_index >= 0:
print("🛑 Wake word обнаружен во время ответа!")
return True
return False
except Exception:
return False
def cleanup(self):
"""Освобождение ресурсов при выходе."""
self.stop_monitoring()
if self.pa:
self.pa.terminate()
if self.porcupine:
self.porcupine.delete()
# Глобальный экземпляр детектора (Singleton)
_detector = None
def get_detector() -> WakeWordDetector:
"""Получить или создать глобальный экземпляр детектора."""
global _detector
if _detector is None:
_detector = WakeWordDetector()
return _detector
def wait_for_wakeword(timeout: float = None) -> bool:
"""Внешняя функция для ожидания wake word."""
return get_detector().wait_for_wakeword(timeout)
def stop_monitoring():
"""Внешняя функция для остановки мониторинга."""
if _detector:
_detector.stop_monitoring()
def cleanup():
"""Внешняя функция очистки ресурсов."""
global _detector
if _detector:
_detector.cleanup()
_detector = None
def check_wakeword_once() -> bool:
"""Внешняя функция для быстрой проверки."""
return get_detector().check_wakeword_once()