""" Speech-to-Text module using Deepgram API. Recognizes speech from microphone using streaming WebSocket. Supports Russian (default) and English. """ # Модуль распознавания речи (STT - Speech-to-Text). # Использует Deepgram API через веб-сокеты для потокового распознавания в реальном времени. import asyncio import time import pyaudio import logging from datetime import datetime, timedelta from ..core.config import DEEPGRAM_API_KEY, SAMPLE_RATE from deepgram import ( DeepgramClient, DeepgramClientOptions, LiveTranscriptionEvents, LiveOptions, ) import deepgram.clients.common.v1.abstract_sync_websocket as sdk_ws import websockets.sync.client from ..core.audio_manager import get_audio_manager # --- Патч (исправление) для библиотеки websockets --- # По умолчанию Deepgram SDK использует слишком короткий таймаут подключения. # Это часто вызывает ошибки при медленном SSL рукопожатии. # Мы подменяем функцию connect, чтобы увеличить таймаут до 30 секунд. _original_connect = websockets.sync.client.connect def _patched_connect(*args, **kwargs): kwargs.setdefault("open_timeout", 30) kwargs.setdefault("ping_timeout", 30) kwargs.setdefault("close_timeout", 30) print(f"DEBUG: Connecting to Deepgram with timeout={kwargs.get('open_timeout')}s") return _original_connect(*args, **kwargs) # Применяем патч sdk_ws.connect = _patched_connect # Отключаем лишний мусор в логах logging.getLogger("deepgram").setLevel(logging.WARNING) class SpeechRecognizer: """Класс распознавания речи через Deepgram.""" def __init__(self): self.dg_client = None self.pa = None self.stream = None self.transcript = "" self.last_successful_operation = datetime.now() def initialize(self): """Инициализация клиента Deepgram и PyAudio.""" if not DEEPGRAM_API_KEY: raise ValueError("DEEPGRAM_API_KEY is not set in environment or config.") print("📦 Инициализация Deepgram STT...") config = DeepgramClientOptions( verbose=logging.WARNING, ) try: self.dg_client = DeepgramClient(DEEPGRAM_API_KEY, config) except Exception as e: print(f"❌ Ошибка при создании клиента Deepgram: {e}") raise self.pa = get_audio_manager().get_pyaudio() print("✅ Deepgram клиент готов") # Обновляем время последней успешной операции self.last_successful_operation = datetime.now() def check_connection_health(self): """Проверяет здоровье соединения и при необходимости пересоздает клиента.""" # Проверяем, прошло ли больше 15 минут с последней успешной операции if datetime.now() - self.last_successful_operation > timedelta(minutes=15): print("🔄 Обновление соединения Deepgram для предотвращения таймаута...") try: # Очищаем старый клиент if self.stream: self.stream.stop_stream() self.stream.close() self.stream = None # Создаем новый клиент self.dg_client = None self.initialize() print("✅ Соединение Deepgram обновлено") except Exception as e: print(f"⚠️ Ошибка при обновлении соединения: {e}") def _get_stream(self): """Открывает аудиопоток PyAudio, если он еще не открыт.""" if self.stream is None: self.stream = self.pa.open( rate=SAMPLE_RATE, channels=1, format=pyaudio.paInt16, input=True, frames_per_buffer=4096, ) return self.stream async def _process_audio(self, dg_connection, timeout_seconds, detection_timeout): """ Асинхронная функция для отправки аудио и получения текста. Args: dg_connection: Активное соединение с Deepgram. timeout_seconds: Общее время прослушивания. detection_timeout: Время ожидания начала речи. """ self.transcript = "" transcript_parts = [] loop = asyncio.get_running_loop() stream = self._get_stream() # События для синхронизации stop_event = asyncio.Event() # Пора останавливаться speech_started_event = asyncio.Event() # Речь обнаружена (VAD) # --- Обработчики событий Deepgram --- def on_transcript(unused_self, result, **kwargs): """Вызывается, когда приходит часть текста.""" sentence = result.channel.alternatives[0].transcript if len(sentence) == 0: return if result.is_final: # Собираем только финальные (подтвержденные) фразы transcript_parts.append(sentence) self.transcript = " ".join(transcript_parts).strip() def on_speech_started(unused_self, speech_started, **kwargs): """Вызывается, когда VAD (Voice Activity Detection) слышит голос.""" try: loop.call_soon_threadsafe(speech_started_event.set) except RuntimeError: # Event loop might be closed, ignore pass def on_utterance_end(unused_self, utterance_end, **kwargs): """Вызывается, когда Deepgram решает, что фраза закончилась (пауза).""" try: loop.call_soon_threadsafe(stop_event.set) except RuntimeError: # Event loop might be closed, ignore pass def on_error(unused_self, error, **kwargs): print(f"Deepgram Error: {error}") try: loop.call_soon_threadsafe(stop_event.set) except RuntimeError: # Event loop might be closed, ignore pass # Подписываемся на события dg_connection.on(LiveTranscriptionEvents.Transcript, on_transcript) dg_connection.on(LiveTranscriptionEvents.SpeechStarted, on_speech_started) dg_connection.on(LiveTranscriptionEvents.UtteranceEnd, on_utterance_end) dg_connection.on(LiveTranscriptionEvents.Error, on_error) # Параметры распознавания options = LiveOptions( model="nova-2", # Самая быстрая и точная модель language=self.current_lang, smart_format=True, # Расстановка знаков препинания encoding="linear16", channels=1, sample_rate=SAMPLE_RATE, interim_results=True, utterance_end_ms=1000, # Пауза 1.0с считается концом фразы (было 1.2) vad_events=True, # Добавляем параметры таймаута для долгой работы endpointing=300, # Таймаут в миллисекундах для автоматического завершения ) # --- Задача отправки аудио с буферизацией --- async def send_audio(): chunks_sent = 0 audio_buffer = [] # Буфер для накопления звука во время подключения try: # 1. Сразу начинаем захват звука, не дожидаясь сети! stream.start_stream() print("🎤 Stream started (buffering)...") # 2. Запускаем подключение к Deepgram в фоне (через ThreadPool, т.к. start() блокирующий) # Но в данном SDK start() возвращает bool, он может быть блокирующим. # Deepgram Python SDK v3+ start() делает handshake. connect_future = loop.run_in_executor( None, lambda: dg_connection.start(options) ) # Пока подключаемся, копим данные timeout_count = 0 max_timeout = 5000 # Максимальное количество итераций ожидания (около 2.5 секунд при 0.0005 задержке) while not connect_future.done() and timeout_count < max_timeout: if stream.is_active(): data = stream.read(4096, exception_on_overflow=False) audio_buffer.append(data) await asyncio.sleep(0.0005) # Уменьшаем задержку для более быстрой обработки timeout_count += 1 if timeout_count >= max_timeout: print("⏰ Timeout connecting to Deepgram") return # Проверяем результат подключения if connect_future.result() is False: print("Failed to start Deepgram connection") return print(f"🚀 Connected! Sending buffer ({len(audio_buffer)} chunks)...") # 3. Отправляем накопленный буфер for chunk in audio_buffer: dg_connection.send(chunk) chunks_sent += 1 audio_buffer = None # Освобождаем память # 4. Продолжаем стримить в реальном времени stream_timeout = 0 max_stream_timeout = int(timeout_seconds / 0.002) # Примерный таймаут в зависимости от timeout_seconds while not stop_event.is_set() and stream_timeout < max_stream_timeout: if stream.is_active(): data = stream.read(4096, exception_on_overflow=False) dg_connection.send(data) chunks_sent += 1 if chunks_sent % 50 == 0: print(".", end="", flush=True) await asyncio.sleep(0.002) # Уменьшаем задержку для более быстрого реагирования stream_timeout += 1 except Exception as e: print(f"Audio send error: {e}") finally: if stream.is_active(): stream.stop_stream() print(f"\n🛑 Stream stopped. Chunks sent: {chunks_sent}") sender_task = asyncio.create_task(send_audio()) if False: # dg_connection.start(options) перенесен внутрь send_audio pass try: # 1. Ждем начала речи (если задан detection_timeout) if detection_timeout: try: await asyncio.wait_for( speech_started_event.wait(), timeout=detection_timeout ) except asyncio.TimeoutError: # Если за detection_timeout никто не начал говорить, выходим stop_event.set() # 2. Если речь началась (или таймаута нет), ждем завершения (stop_event) # stop_event сработает либо по UtteranceEnd (пауза), либо по общему таймауту if not stop_event.is_set(): await asyncio.wait_for(stop_event.wait(), timeout=timeout_seconds) except asyncio.TimeoutError: pass # Общий таймаут вышел except Exception as e: print(f"Error in waiting for events: {e}") stop_event.set() try: await sender_task except Exception as e: print(f"Error waiting for sender task: {e}") # Завершаем соединение и ждем последние результаты try: dg_connection.finish() except Exception as e: print(f"Error finishing connection: {e}") return self.transcript def listen( self, timeout_seconds: float = 7.0, detection_timeout: float = None, lang: str = "ru", ) -> str: """ Основной метод: слушает микрофон и возвращает текст. Args: timeout_seconds: Максимальная длительность фразы. detection_timeout: Сколько ждать начала речи перед тем как сдаться. lang: Язык ("ru" или "en"). """ if not self.dg_client: self.initialize() # Проверяем здоровье соединения перед началом прослушивания self.check_connection_health() self.current_lang = lang print(f"🎙️ Слушаю ({lang})...") last_error = None # Делаем 3 попытки на случай сбоя сети for attempt in range(3): dg_connection = None try: # Создаем новое live подключение для каждой сессии dg_connection = self.dg_client.listen.live.v("1") # Запускаем асинхронный процесс обработки transcript = asyncio.run( self._process_audio( dg_connection, timeout_seconds, detection_timeout ) ) final_text = transcript.strip() if transcript else "" if final_text: print(f"📝 Распознано: {final_text}") # Обновляем время последней успешной операции self.last_successful_operation = datetime.now() return final_text else: # Если вернулась пустая строка (тишина), считаем это штатным завершением. # Не нужно повторять попытку, как при ошибке сети. # Все равно обновляем время последней успешной операции self.last_successful_operation = datetime.now() return "" except Exception as e: last_error = e print(f"Attempt {attempt + 1} failed: {e}") # Закрываем соединение, если оно было создано if dg_connection: try: dg_connection.finish() except: pass # Игнорируем ошибки при завершении if attempt < 2: # Не ждем после последней попытки print(f"⚠️ Не удалось подключиться к Deepgram, попытка {attempt + 1}/3, повторяю...") time.sleep(1) # Уменьшаем задержку между попытками if last_error: print(f"❌ Ошибка STT после всех попыток: {last_error}") else: print("⚠️ Речь не распознана") return "" def cleanup(self): """Очистка ресурсов.""" if self.stream: try: if self.stream.is_active(): self.stream.stop_stream() except Exception as e: print(f"Ошибка при остановке потока: {e}") try: self.stream.close() except Exception as e: print(f"Ошибка при закрытии потока: {e}") self.stream = None # self.pa.terminate() - Используем общий менеджер # Сбросим клиента для принудительного переподключения self.dg_client = None # Глобальный экземпляр _recognizer = None def get_recognizer() -> SpeechRecognizer: global _recognizer if _recognizer is None: _recognizer = SpeechRecognizer() return _recognizer def listen( timeout_seconds: float = 7.0, detection_timeout: float = None, lang: str = "ru" ) -> str: """Внешняя функция для прослушивания.""" return get_recognizer().listen(timeout_seconds, detection_timeout, lang) def cleanup(): """Внешняя функция очистки.""" global _recognizer if _recognizer: try: _recognizer.cleanup() except Exception as e: print(f"Ошибка при очистке STT: {e}") _recognizer = None