Files
smart-speaker/app/audio/stt.py

516 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Speech-to-Text module using Deepgram API.
Recognizes speech from microphone using streaming WebSocket.
Supports Russian (default) and English.
"""
# Модуль распознавания речи (STT - Speech-to-Text).
# Использует Deepgram API через веб-сокеты для потокового распознавания в реальном времени.
import asyncio
import re
import time
import pyaudio
import logging
from datetime import datetime, timedelta
from ..core.config import DEEPGRAM_API_KEY, SAMPLE_RATE
from deepgram import (
DeepgramClient,
DeepgramClientOptions,
LiveTranscriptionEvents,
LiveOptions,
)
import deepgram.clients.common.v1.abstract_sync_websocket as sdk_ws
import websockets.sync.client
from ..core.audio_manager import get_audio_manager
# --- Патч (исправление) для библиотеки websockets ---
# Явно задаём таймауты подключения, чтобы не зависать на долгом handshake.
_original_connect = websockets.sync.client.connect
DEEPGRAM_CONNECT_TIMEOUT_SECONDS = 3.0
DEEPGRAM_CONNECT_WAIT_SECONDS = 1.5
DEEPGRAM_CONNECT_POLL_SECONDS = 0.001
def _patched_connect(*args, **kwargs):
# Принудительно задаём короткие таймауты, даже если SDK передал свои (например, 30с).
kwargs["open_timeout"] = DEEPGRAM_CONNECT_TIMEOUT_SECONDS
kwargs["ping_timeout"] = DEEPGRAM_CONNECT_TIMEOUT_SECONDS
kwargs["close_timeout"] = DEEPGRAM_CONNECT_TIMEOUT_SECONDS
print(f"DEBUG: Connecting to Deepgram with timeout={kwargs.get('open_timeout')}s")
return _original_connect(*args, **kwargs)
# Применяем патч
sdk_ws.connect = _patched_connect
# Отключаем лишний мусор в логах
logging.getLogger("deepgram").setLevel(logging.WARNING)
# Базовые пороги для остановки STT
INITIAL_SILENCE_TIMEOUT_SECONDS = 5.0
POST_SPEECH_SILENCE_TIMEOUT_SECONDS = 3.5
# Длинный защитный предел, чтобы не обрывать обычную длинную фразу.
# Фактическое завершение происходит примерно после 3.5 сек тишины после речи.
MAX_ACTIVE_SPEECH_SECONDS = 300.0
_FAST_STOP_UTTERANCE_RE = re.compile(
r"^(?:(?:александр|алесандр|alexander|alexandr)\s+)?"
r"(?:стоп|хватит|перестань|прекрати|замолчи|тихо|пауза)"
r"(?:\s+(?:пожалуйста|please))?$",
flags=re.IGNORECASE,
)
def _normalize_command_text(text: str) -> str:
normalized = text.lower().replace("ё", "е")
normalized = re.sub(r"[^\w\s]+", " ", normalized, flags=re.UNICODE)
normalized = re.sub(r"\s+", " ", normalized, flags=re.UNICODE).strip()
return normalized
def _is_fast_stop_utterance(text: str) -> bool:
normalized = _normalize_command_text(text)
if not normalized:
return False
return _FAST_STOP_UTTERANCE_RE.fullmatch(normalized) is not None
class SpeechRecognizer:
"""Класс распознавания речи через Deepgram."""
def __init__(self):
self.dg_client = None
self.pa = None
self.stream = None
self.transcript = ""
self.last_successful_operation = datetime.now()
def initialize(self):
"""Инициализация клиента Deepgram и PyAudio."""
if not DEEPGRAM_API_KEY:
raise ValueError("DEEPGRAM_API_KEY is not set in environment or config.")
print("📦 Инициализация Deepgram STT...")
config = DeepgramClientOptions(
verbose=logging.WARNING,
)
try:
self.dg_client = DeepgramClient(DEEPGRAM_API_KEY, config)
except Exception as e:
print(f"❌ Ошибка при создании клиента Deepgram: {e}")
raise
self.pa = get_audio_manager().get_pyaudio()
print("✅ Deepgram клиент готов")
# Обновляем время последней успешной операции
self.last_successful_operation = datetime.now()
def check_connection_health(self):
"""Проверяет здоровье соединения и при необходимости пересоздает клиента."""
# Проверяем, прошло ли больше 15 минут с последней успешной операции
if datetime.now() - self.last_successful_operation > timedelta(minutes=15):
print("🔄 Обновление соединения Deepgram для предотвращения таймаута...")
try:
# Очищаем старый клиент
if self.stream:
self.stream.stop_stream()
self.stream.close()
self.stream = None
# Создаем новый клиент
self.dg_client = None
self.initialize()
print("✅ Соединение Deepgram обновлено")
except Exception as e:
print(f"⚠️ Ошибка при обновлении соединения: {e}")
def _get_stream(self):
"""Открывает аудиопоток PyAudio, если он еще не открыт."""
if self.stream is None:
self.stream = self.pa.open(
rate=SAMPLE_RATE,
channels=1,
format=pyaudio.paInt16,
input=True,
frames_per_buffer=4096,
)
return self.stream
async def _process_audio(
self, dg_connection, timeout_seconds, detection_timeout, fast_stop
):
"""
Асинхронная функция для отправки аудио и получения текста.
Args:
dg_connection: Активное соединение с Deepgram.
timeout_seconds: Аварийный лимит длительности активной речи.
detection_timeout: Время ожидания начала речи.
fast_stop: Если True, короткая стоп-фраза завершает STT после 1с тишины.
"""
self.transcript = ""
transcript_parts = []
loop = asyncio.get_running_loop()
stream = self._get_stream()
effective_detection_timeout = (
detection_timeout
if detection_timeout is not None
else INITIAL_SILENCE_TIMEOUT_SECONDS
)
# События для синхронизации
stop_event = asyncio.Event() # Пора останавливаться
speech_started_event = asyncio.Event() # Речь обнаружена (VAD)
last_speech_activity = time.monotonic()
first_speech_activity_at = None
def mark_speech_activity():
nonlocal last_speech_activity, first_speech_activity_at
now = time.monotonic()
last_speech_activity = now
if first_speech_activity_at is None:
first_speech_activity_at = now
speech_started_event.set()
# --- Обработчики событий Deepgram ---
def on_transcript(unused_self, result, **kwargs):
"""Вызывается, когда приходит часть текста."""
sentence = result.channel.alternatives[0].transcript
if len(sentence) == 0:
return
try:
loop.call_soon_threadsafe(mark_speech_activity)
except RuntimeError:
pass
if fast_stop:
if _is_fast_stop_utterance(sentence):
self.transcript = sentence.strip()
try:
loop.call_soon_threadsafe(stop_event.set)
except RuntimeError:
pass
return
if result.is_final:
# Собираем только финальные (подтвержденные) фразы
transcript_parts.append(sentence)
self.transcript = " ".join(transcript_parts).strip()
def on_speech_started(unused_self, speech_started, **kwargs):
"""Вызывается, когда VAD (Voice Activity Detection) слышит голос."""
try:
loop.call_soon_threadsafe(mark_speech_activity)
except RuntimeError:
# Event loop might be closed, ignore
pass
def on_utterance_end(unused_self, utterance_end, **kwargs):
"""Вызывается, когда Deepgram решает, что фраза закончилась (пауза)."""
# Не останавливаемся мгновенно на событии Deepgram.
# Остановка управляется локальным порогом тишины POST_SPEECH_SILENCE_TIMEOUT_SECONDS.
return
def on_error(unused_self, error, **kwargs):
print(f"Deepgram Error: {error}")
try:
loop.call_soon_threadsafe(stop_event.set)
except RuntimeError:
# Event loop might be closed, ignore
pass
# Подписываемся на события
dg_connection.on(LiveTranscriptionEvents.Transcript, on_transcript)
dg_connection.on(LiveTranscriptionEvents.SpeechStarted, on_speech_started)
dg_connection.on(LiveTranscriptionEvents.UtteranceEnd, on_utterance_end)
dg_connection.on(LiveTranscriptionEvents.Error, on_error)
# Параметры распознавания
options = LiveOptions(
model="nova-2", # Самая быстрая и точная модель
language=self.current_lang,
smart_format=True, # Расстановка знаков препинания
encoding="linear16",
channels=1,
sample_rate=SAMPLE_RATE,
interim_results=True,
utterance_end_ms=int(POST_SPEECH_SILENCE_TIMEOUT_SECONDS * 1000),
vad_events=True,
# Сглаженный порог endpointing, чтобы не резать речь на коротких паузах.
endpointing=int(POST_SPEECH_SILENCE_TIMEOUT_SECONDS * 1000),
)
# --- Задача отправки аудио с буферизацией ---
async def send_audio():
chunks_sent = 0
audio_buffer = [] # Буфер для накопления звука во время подключения
try:
# 1. Сразу начинаем захват звука, не дожидаясь сети!
stream.start_stream()
print("🎤 Stream started (buffering)...")
# 2. Запускаем подключение к Deepgram в фоне (через ThreadPool, т.к. start() блокирующий)
# Но в данном SDK start() возвращает bool, он может быть блокирующим.
# Deepgram Python SDK v3+ start() делает handshake.
connect_future = loop.run_in_executor(
None, lambda: dg_connection.start(options)
)
# Пока подключаемся, копим данные.
# Ждём коротко: если сеть подвисла, быстрее перезапускаем попытку.
connect_deadline = time.monotonic() + DEEPGRAM_CONNECT_WAIT_SECONDS
while (
not connect_future.done()
and time.monotonic() < connect_deadline
):
if stream.is_active():
data = stream.read(4096, exception_on_overflow=False)
audio_buffer.append(data)
await asyncio.sleep(DEEPGRAM_CONNECT_POLL_SECONDS)
if not connect_future.done():
print(
f"⏰ Timeout connecting to Deepgram ({DEEPGRAM_CONNECT_WAIT_SECONDS:.1f}s)"
)
stop_event.set()
return
# Проверяем результат подключения
if connect_future.result() is False:
print("Failed to start Deepgram connection")
stop_event.set()
return
print(f"🚀 Connected! Sending buffer ({len(audio_buffer)} chunks)...")
# 3. Отправляем накопленный буфер
for chunk in audio_buffer:
dg_connection.send(chunk)
chunks_sent += 1
audio_buffer = None # Освобождаем память
# 4. Продолжаем стримить в реальном времени до события остановки.
while not stop_event.is_set():
if stream.is_active():
data = stream.read(4096, exception_on_overflow=False)
dg_connection.send(data)
chunks_sent += 1
if chunks_sent % 50 == 0:
print(".", end="", flush=True)
await asyncio.sleep(0.002) # Уменьшаем задержку для более быстрого реагирования
except Exception as e:
print(f"Audio send error: {e}")
finally:
if stream.is_active():
stream.stop_stream()
print(f"\n🛑 Stream stopped. Chunks sent: {chunks_sent}")
sender_task = asyncio.create_task(send_audio())
if False: # dg_connection.start(options) перенесен внутрь send_audio
pass
try:
# 1. Ждем начала речи (если задан detection_timeout)
if (
effective_detection_timeout
and effective_detection_timeout > 0
and not stop_event.is_set()
):
speech_wait_task = asyncio.create_task(speech_started_event.wait())
stop_wait_task = asyncio.create_task(stop_event.wait())
try:
done, pending = await asyncio.wait(
{speech_wait_task, stop_wait_task},
timeout=effective_detection_timeout,
return_when=asyncio.FIRST_COMPLETED,
)
finally:
for task in (speech_wait_task, stop_wait_task):
if not task.done():
task.cancel()
await asyncio.gather(
speech_wait_task, stop_wait_task, return_exceptions=True
)
if not done:
# Если за detection_timeout никто не начал говорить, выходим
stop_event.set()
# 2. После старта речи завершаем только по тишине POST_SPEECH_SILENCE_TIMEOUT_SECONDS.
# Добавляем длинный защитный лимит, чтобы сессия не зависла навсегда.
if not stop_event.is_set():
max_active_speech_seconds = max(
timeout_seconds if timeout_seconds else 0.0,
MAX_ACTIVE_SPEECH_SECONDS,
)
while not stop_event.is_set():
now = time.monotonic()
if speech_started_event.is_set():
if (
now - last_speech_activity
>= POST_SPEECH_SILENCE_TIMEOUT_SECONDS
):
stop_event.set()
break
if (
first_speech_activity_at is not None
and now - first_speech_activity_at
>= max_active_speech_seconds
):
print("⏱️ Достигнут защитный лимит активного прослушивания.")
stop_event.set()
break
await asyncio.sleep(0.05)
except asyncio.TimeoutError:
pass # Общий таймаут вышел
except Exception as e:
print(f"Error in waiting for events: {e}")
stop_event.set()
try:
await sender_task
except Exception as e:
print(f"Error waiting for sender task: {e}")
# Завершаем соединение и ждем последние результаты
try:
dg_connection.finish()
except Exception as e:
print(f"Error finishing connection: {e}")
return self.transcript
def listen(
self,
timeout_seconds: float = 7.0,
detection_timeout: float = INITIAL_SILENCE_TIMEOUT_SECONDS,
lang: str = "ru",
fast_stop: bool = False,
) -> str:
"""
Основной метод: слушает микрофон и возвращает текст.
Args:
timeout_seconds: Защитный лимит длительности активной речи.
detection_timeout: Сколько ждать начала речи перед тем как сдаться.
lang: Язык ("ru" или "en").
fast_stop: Быстрое завершение для коротких stop-команд.
"""
if not self.dg_client:
self.initialize()
# Проверяем здоровье соединения перед началом прослушивания
self.check_connection_health()
self.current_lang = lang
print(f"🎙️ Слушаю ({lang})...")
last_error = None
# Делаем 3 попытки на случай сбоя сети
for attempt in range(3):
dg_connection = None
try:
# Создаем новое live подключение для каждой сессии
dg_connection = self.dg_client.listen.live.v("1")
# Запускаем асинхронный процесс обработки
transcript = asyncio.run(
self._process_audio(
dg_connection, timeout_seconds, detection_timeout, fast_stop
)
)
final_text = transcript.strip() if transcript else ""
if final_text:
print(f"📝 Распознано: {final_text}")
# Обновляем время последней успешной операции
self.last_successful_operation = datetime.now()
return final_text
else:
# Если вернулась пустая строка (тишина), считаем это штатным завершением.
# Не нужно повторять попытку, как при ошибке сети.
# Все равно обновляем время последней успешной операции
self.last_successful_operation = datetime.now()
return ""
except Exception as e:
last_error = e
print(f"Attempt {attempt + 1} failed: {e}")
# Закрываем соединение, если оно было создано
if dg_connection:
try:
dg_connection.finish()
except:
pass # Игнорируем ошибки при завершении
if attempt < 2: # Не ждем после последней попытки
print(f"⚠️ Не удалось подключиться к Deepgram, попытка {attempt + 1}/3, повторяю...")
time.sleep(1) # Уменьшаем задержку между попытками
if last_error:
print(f"❌ Ошибка STT после всех попыток: {last_error}")
else:
print("⚠️ Речь не распознана")
return ""
def cleanup(self):
"""Очистка ресурсов."""
if self.stream:
try:
if self.stream.is_active():
self.stream.stop_stream()
except Exception as e:
print(f"Ошибка при остановке потока: {e}")
try:
self.stream.close()
except Exception as e:
print(f"Ошибка при закрытии потока: {e}")
self.stream = None
# self.pa.terminate() - Используем общий менеджер
# Сбросим клиента для принудительного переподключения
self.dg_client = None
# Глобальный экземпляр
_recognizer = None
def get_recognizer() -> SpeechRecognizer:
global _recognizer
if _recognizer is None:
_recognizer = SpeechRecognizer()
return _recognizer
def listen(
timeout_seconds: float = 7.0,
detection_timeout: float = INITIAL_SILENCE_TIMEOUT_SECONDS,
lang: str = "ru",
fast_stop: bool = False,
) -> str:
"""Внешняя функция для прослушивания."""
return get_recognizer().listen(timeout_seconds, detection_timeout, lang, fast_stop)
def cleanup():
"""Внешняя функция очистки."""
global _recognizer
if _recognizer:
try:
_recognizer.cleanup()
except Exception as e:
print(f"Ошибка при очистке STT: {e}")
_recognizer = None