""" Speech-to-Text module using Deepgram API. Recognizes speech from microphone using streaming WebSocket. Supports Russian (default) and English. """ # Модуль распознавания речи (STT - Speech-to-Text). # Использует Deepgram API через веб-сокеты для потокового распознавания в реальном времени. import asyncio import re import time import pyaudio import logging import contextlib import threading from datetime import datetime, timedelta from ..core.config import DEEPGRAM_API_KEY, SAMPLE_RATE, WAKE_WORD_ALIASES from deepgram import ( DeepgramClient, DeepgramClientOptions, LiveTranscriptionEvents, LiveOptions, ) import deepgram.clients.common.v1.abstract_sync_websocket as sdk_ws import websockets.sync.client from ..core.audio_manager import get_audio_manager # --- Патч (исправление) для библиотеки websockets --- # Явно задаём таймауты подключения, чтобы не зависать на долгом handshake. _original_connect = websockets.sync.client.connect DEEPGRAM_CONNECT_TIMEOUT_SECONDS = 3.0 DEEPGRAM_CONNECT_WAIT_SECONDS = 4.0 DEEPGRAM_CONNECT_POLL_SECONDS = 0.001 SENDER_STOP_WAIT_SECONDS = 2.5 SENDER_FORCE_RELEASE_WAIT_SECONDS = 2.5 DEEPGRAM_FINALIZATION_GRACE_SECONDS = 0.35 DEEPGRAM_FINISH_TIMEOUT_SECONDS = 4.0 def _patched_connect(*args, **kwargs): # Принудительно задаём короткие таймауты, даже если SDK передал свои (например, 30с). kwargs["open_timeout"] = DEEPGRAM_CONNECT_TIMEOUT_SECONDS kwargs["ping_timeout"] = DEEPGRAM_CONNECT_TIMEOUT_SECONDS kwargs["close_timeout"] = DEEPGRAM_CONNECT_TIMEOUT_SECONDS print(f"DEBUG: Connecting to Deepgram with timeout={kwargs.get('open_timeout')}s") return _original_connect(*args, **kwargs) # Применяем патч sdk_ws.connect = _patched_connect # Отключаем лишний мусор в логах logging.getLogger("deepgram").setLevel(logging.WARNING) # Базовые пороги для остановки STT INITIAL_SILENCE_TIMEOUT_SECONDS = 5.0 POST_SPEECH_SILENCE_TIMEOUT_SECONDS = 2.0 # Длинный защитный предел, чтобы не обрывать обычную длинную фразу. # Фактическое завершение происходит примерно после 2.0 сек тишины после речи. MAX_ACTIVE_SPEECH_SECONDS = 300.0 _FAST_STOP_UTTERANCE_RE = re.compile( r"^(?:(?:" + "|".join(re.escape(alias) for alias in WAKE_WORD_ALIASES) + r")\s+)?" r"(?:стоп|хватит|перестань|прекрати|замолчи|тихо|пауза)" r"(?:\s+(?:пожалуйста|please))?$", flags=re.IGNORECASE, ) def _normalize_command_text(text: str) -> str: normalized = text.lower().replace("ё", "е") normalized = re.sub(r"[^\w\s]+", " ", normalized, flags=re.UNICODE) normalized = re.sub(r"\s+", " ", normalized, flags=re.UNICODE).strip() return normalized def _is_fast_stop_utterance(text: str) -> bool: normalized = _normalize_command_text(text) if not normalized: return False return _FAST_STOP_UTTERANCE_RE.fullmatch(normalized) is not None class SpeechRecognizer: """Класс распознавания речи через Deepgram.""" def __init__(self): self.dg_client = None self.pa = None self.audio_manager = None self.stream = None self.transcript = "" self.last_successful_operation = datetime.now() self._input_device_index = None self._stream_sample_rate = SAMPLE_RATE def initialize(self): """Инициализация клиента Deepgram и PyAudio.""" if not DEEPGRAM_API_KEY: raise ValueError("DEEPGRAM_API_KEY is not set in environment or config.") print("📦 Инициализация Deepgram STT...") config = DeepgramClientOptions( verbose=logging.WARNING, ) try: self.dg_client = DeepgramClient(DEEPGRAM_API_KEY, config) except Exception as e: print(f"❌ Ошибка при создании клиента Deepgram: {e}") raise self.audio_manager = get_audio_manager() self.pa = self.audio_manager.get_pyaudio() self._input_device_index = self.audio_manager.get_input_device_index() print("✅ Deepgram клиент готов") # Обновляем время последней успешной операции self.last_successful_operation = datetime.now() def check_connection_health(self): """Проверяет здоровье соединения и при необходимости пересоздает клиента.""" # Проверяем, прошло ли больше 15 минут с последней успешной операции if datetime.now() - self.last_successful_operation > timedelta(minutes=15): print("🔄 Обновление соединения Deepgram для предотвращения таймаута...") try: # Очищаем старый клиент if self.stream: self.stream.stop_stream() self.stream.close() self.stream = None # Создаем новый клиент self.dg_client = None self.initialize() print("✅ Соединение Deepgram обновлено") except Exception as e: print(f"⚠️ Ошибка при обновлении соединения: {e}") def _get_stream(self): """Открывает аудиопоток PyAudio, если он еще не открыт.""" if self.stream is None: if self.audio_manager is None: self.audio_manager = get_audio_manager() self.stream, self._input_device_index, self._stream_sample_rate = ( self.audio_manager.open_input_stream( rate=SAMPLE_RATE, channels=1, format=pyaudio.paInt16, frames_per_buffer=4096, preferred_index=self._input_device_index, fallback_rates=[48000, 44100, 32000, 22050, 16000, 8000], ) ) if self._stream_sample_rate != SAMPLE_RATE: print( f"⚠️ STT mic stream uses fallback rate={self._stream_sample_rate} " f"(requested {SAMPLE_RATE})" ) return self.stream def _open_stream_for_session(self): """Открывает отдельный входной поток для одной STT-сессии.""" if self.audio_manager is None: self.audio_manager = get_audio_manager() stream, self._input_device_index, sample_rate = self.audio_manager.open_input_stream( rate=SAMPLE_RATE, channels=1, format=pyaudio.paInt16, frames_per_buffer=4096, preferred_index=self._input_device_index, fallback_rates=[48000, 44100, 32000, 22050, 16000, 8000], ) if sample_rate != SAMPLE_RATE: print( f"⚠️ STT mic stream uses fallback rate={sample_rate} " f"(requested {SAMPLE_RATE})" ) return stream, int(sample_rate) def _stop_stream_quietly(self): if not self.stream: return try: if self.stream.is_active(): self.stream.stop_stream() except Exception: pass def _release_stream(self): if not self.stream: return self._stop_stream_quietly() try: self.stream.close() except Exception: pass self.stream = None async def _wait_for_thread(self, thread, timeout_seconds: float) -> bool: """Асинхронно ждет завершения daemon-thread без блокировки event loop.""" deadline = time.monotonic() + timeout_seconds while thread.is_alive() and time.monotonic() < deadline: await asyncio.sleep(0.05) return not thread.is_alive() async def _run_blocking_cleanup(self, func, timeout_seconds: float, label: str) -> bool: """Запускает потенциально подвисающий cleanup в daemon-thread и ждет ограниченное время.""" done_event = threading.Event() error_holder = {} def runner(): try: func() except Exception as exc: error_holder["error"] = exc finally: done_event.set() thread = threading.Thread(target=runner, daemon=True, name=label) thread.start() deadline = time.monotonic() + timeout_seconds while not done_event.is_set() and time.monotonic() < deadline: await asyncio.sleep(0.05) if not done_event.is_set(): print(f"⚠️ {label} timed out; continuing cleanup.") return False error = error_holder.get("error") if error is not None: print(f"⚠️ {label} failed: {error}") return False return True def _run_blocking_cleanup_sync(self, func, timeout_seconds: float, label: str) -> bool: """Sync-версия _run_blocking_cleanup() для use-case в listen().""" done_event = threading.Event() error_holder = {} def runner(): try: func() except Exception as exc: error_holder["error"] = exc finally: done_event.set() thread = threading.Thread(target=runner, daemon=True, name=label) thread.start() done_event.wait(timeout=max(0.0, float(timeout_seconds))) if not done_event.is_set(): print(f"⚠️ {label} timed out; continuing cleanup.") return False error = error_holder.get("error") if error is not None: print(f"⚠️ {label} failed: {error}") return False return True async def _process_audio( self, dg_connection, timeout_seconds, detection_timeout, fast_stop ): """ Асинхронная функция для отправки аудио и получения текста. Args: dg_connection: Активное соединение с Deepgram. timeout_seconds: Аварийный лимит длительности активной речи. detection_timeout: Время ожидания начала речи. fast_stop: Если True, короткая стоп-фраза завершает STT после 1с тишины. """ self.transcript = "" transcript_parts = [] latest_interim = "" loop = asyncio.get_running_loop() effective_detection_timeout = ( detection_timeout if detection_timeout is not None else INITIAL_SILENCE_TIMEOUT_SECONDS ) # События для синхронизации stop_event = asyncio.Event() # Пора останавливаться speech_started_event = asyncio.Event() # Речь обнаружена (VAD) last_speech_activity = time.monotonic() first_speech_activity_at = None def mark_speech_activity(): nonlocal last_speech_activity, first_speech_activity_at now = time.monotonic() last_speech_activity = now if first_speech_activity_at is None: first_speech_activity_at = now speech_started_event.set() # --- Обработчики событий Deepgram --- def on_transcript(unused_self, result, **kwargs): """Вызывается, когда приходит часть текста.""" nonlocal latest_interim sentence = result.channel.alternatives[0].transcript if len(sentence) == 0: return sentence = sentence.strip() if not sentence: return try: loop.call_soon_threadsafe(mark_speech_activity) except RuntimeError: pass if fast_stop: if _is_fast_stop_utterance(sentence): self.transcript = sentence try: loop.call_soon_threadsafe(request_stop) except RuntimeError: pass return if result.is_final: # Собираем только финальные (подтвержденные) фразы transcript_parts.append(sentence) self.transcript = " ".join(transcript_parts).strip() latest_interim = "" else: # Fallback: некоторые сессии завершаются без is_final. latest_interim = sentence def on_speech_started(unused_self, speech_started, **kwargs): """Вызывается, когда VAD (Voice Activity Detection) слышит голос.""" try: loop.call_soon_threadsafe(mark_speech_activity) except RuntimeError: # Event loop might be closed, ignore pass def on_utterance_end(unused_self, utterance_end, **kwargs): """Вызывается, когда Deepgram решает, что фраза закончилась (пауза).""" # Не останавливаемся мгновенно на событии Deepgram. # Остановка управляется локальным порогом тишины POST_SPEECH_SILENCE_TIMEOUT_SECONDS. return def on_error(unused_self, error, **kwargs): print(f"Deepgram Error: {error}") try: loop.call_soon_threadsafe(request_stop) except RuntimeError: # Event loop might be closed, ignore pass # Подписываемся на события dg_connection.on(LiveTranscriptionEvents.Transcript, on_transcript) dg_connection.on(LiveTranscriptionEvents.SpeechStarted, on_speech_started) dg_connection.on(LiveTranscriptionEvents.UtteranceEnd, on_utterance_end) dg_connection.on(LiveTranscriptionEvents.Error, on_error) # --- Задача отправки аудио с буферизацией --- sender_stop_event = threading.Event() stream_holder = {"stream": None} def request_stop(): stop_event.set() sender_stop_event.set() def send_audio(): chunks_sent = 0 audio_buffer = [] # Буфер для накопления звука во время подключения stream = None try: stream, stream_sample_rate = self._open_stream_for_session() stream_holder["stream"] = stream options = LiveOptions( model="nova-2", # Самая быстрая и точная модель language=self.current_lang, smart_format=True, # Расстановка знаков препинания encoding="linear16", channels=1, sample_rate=stream_sample_rate, interim_results=True, utterance_end_ms=int(POST_SPEECH_SILENCE_TIMEOUT_SECONDS * 1000), vad_events=True, # Сглаженный порог endpointing, чтобы не резать речь на коротких паузах. endpointing=int(POST_SPEECH_SILENCE_TIMEOUT_SECONDS * 1000), ) # 1. Сразу начинаем захват звука, не дожидаясь сети! stream.start_stream() print("🎤 Stream started (buffering)...") # 2. Запускаем подключение к Deepgram в фоне (через ThreadPool, т.к. start() блокирующий) # Но в данном SDK start() возвращает bool, он может быть блокирующим. # Deepgram Python SDK v3+ start() делает handshake. connect_result = {"done": False, "ok": None, "error": None} def start_connection(): try: connect_result["ok"] = dg_connection.start(options) except Exception as exc: connect_result["error"] = exc finally: connect_result["done"] = True connect_thread = threading.Thread( target=start_connection, daemon=True ) connect_thread.start() # Пока подключаемся, копим данные. # Ждём коротко: если сеть подвисла, быстрее перезапускаем попытку. connect_deadline = time.monotonic() + DEEPGRAM_CONNECT_WAIT_SECONDS while ( not connect_result["done"] and time.monotonic() < connect_deadline and not sender_stop_event.is_set() ): if stream.is_active(): try: data = stream.read(4096, exception_on_overflow=False) except Exception as read_error: if sender_stop_event.is_set(): return print(f"Audio read error during connect: {read_error}") with contextlib.suppress(RuntimeError): loop.call_soon_threadsafe(request_stop) return audio_buffer.append(data) time.sleep(DEEPGRAM_CONNECT_POLL_SECONDS) if sender_stop_event.is_set(): return if not connect_result["done"]: print( f"⏰ Timeout connecting to Deepgram ({DEEPGRAM_CONNECT_WAIT_SECONDS:.1f}s)" ) loop.call_soon_threadsafe(request_stop) return # Проверяем результат подключения if connect_result["error"] is not None: print(f"Failed to start Deepgram connection: {connect_result['error']}") loop.call_soon_threadsafe(request_stop) return if connect_result["ok"] is False: print("Failed to start Deepgram connection") loop.call_soon_threadsafe(request_stop) return print(f"🚀 Connected! Sending buffer ({len(audio_buffer)} chunks)...") # 3. Отправляем накопленный буфер for chunk in audio_buffer: dg_connection.send(chunk) chunks_sent += 1 audio_buffer = None # Освобождаем память # 4. Продолжаем стримить в реальном времени до события остановки. while not sender_stop_event.is_set(): if not stream.is_active(): break try: data = stream.read(4096, exception_on_overflow=False) except Exception as read_error: if sender_stop_event.is_set(): break print(f"Audio read error: {read_error}") with contextlib.suppress(RuntimeError): loop.call_soon_threadsafe(request_stop) break if sender_stop_event.is_set(): break dg_connection.send(data) chunks_sent += 1 if chunks_sent % 50 == 0: print(".", end="", flush=True) time.sleep(0.002) # Уменьшаем задержку для более быстрого реагирования except Exception as e: print(f"Audio send error: {e}") with contextlib.suppress(RuntimeError): loop.call_soon_threadsafe(request_stop) finally: with contextlib.suppress(Exception): if stream and stream.is_active(): stream.stop_stream() with contextlib.suppress(Exception): if stream: stream.close() stream_holder["stream"] = None print(f"\n🛑 Stream stopped. Chunks sent: {chunks_sent}") sender_thread = threading.Thread( target=send_audio, daemon=True, name="deepgram-audio-sender", ) sender_thread.start() if False: # dg_connection.start(options) перенесен внутрь send_audio pass try: # 1. Ждем начала речи (если задан detection_timeout) if ( effective_detection_timeout and effective_detection_timeout > 0 and not stop_event.is_set() ): speech_wait_task = asyncio.create_task(speech_started_event.wait()) stop_wait_task = asyncio.create_task(stop_event.wait()) try: done, pending = await asyncio.wait( {speech_wait_task, stop_wait_task}, timeout=effective_detection_timeout, return_when=asyncio.FIRST_COMPLETED, ) finally: for task in (speech_wait_task, stop_wait_task): if not task.done(): task.cancel() await asyncio.gather( speech_wait_task, stop_wait_task, return_exceptions=True ) if not done: # Если за detection_timeout никто не начал говорить, выходим request_stop() # 2. После старта речи завершаем только по тишине POST_SPEECH_SILENCE_TIMEOUT_SECONDS. # Добавляем длинный защитный лимит, чтобы сессия не зависла навсегда. if not stop_event.is_set(): max_active_speech_seconds = max( timeout_seconds if timeout_seconds else 0.0, MAX_ACTIVE_SPEECH_SECONDS, ) while not stop_event.is_set(): now = time.monotonic() if speech_started_event.is_set(): if ( now - last_speech_activity >= POST_SPEECH_SILENCE_TIMEOUT_SECONDS ): request_stop() break if ( first_speech_activity_at is not None and now - first_speech_activity_at >= max_active_speech_seconds ): print("⏱️ Достигнут защитный лимит активного прослушивания.") request_stop() break await asyncio.sleep(0.05) except asyncio.TimeoutError: pass # Общий таймаут вышел except Exception as e: print(f"Error in waiting for events: {e}") request_stop() sender_stopped = await self._wait_for_thread( sender_thread, timeout_seconds=max(SENDER_STOP_WAIT_SECONDS, SENDER_FORCE_RELEASE_WAIT_SECONDS), ) cleanup_unhealthy = False if not sender_stopped: print("⚠️ Audio sender shutdown timed out; continuing cleanup.") cleanup_unhealthy = True def force_close_stream(): stream = stream_holder.get("stream") if not stream: return with contextlib.suppress(Exception): if stream.is_active(): stream.stop_stream() with contextlib.suppress(Exception): stream.close() stream_holder["stream"] = None await self._run_blocking_cleanup( force_close_stream, timeout_seconds=SENDER_FORCE_RELEASE_WAIT_SECONDS, label="STT audio stream force close", ) # Дадим шанс потоку выйти после принудительного закрытия. sender_stopped = await self._wait_for_thread(sender_thread, timeout_seconds=0.6) if not sender_stopped: cleanup_unhealthy = True # Небольшая пауза, чтобы получить последние transcript-события перед finish(). await asyncio.sleep(DEEPGRAM_FINALIZATION_GRACE_SECONDS) # Завершаем соединение и ждем последние результаты finish_ok = await self._run_blocking_cleanup( dg_connection.finish, timeout_seconds=DEEPGRAM_FINISH_TIMEOUT_SECONDS, label="Deepgram finish", ) if not finish_ok: cleanup_unhealthy = True final_text = self.transcript.strip() if not final_text: final_text = latest_interim.strip() self.transcript = final_text if cleanup_unhealthy: # Если текст уже получен, не теряем команду пользователя. # Но сбрасываем клиента, чтобы следующая STT-сессия стартовала на чистом соединении. self.dg_client = None if final_text: return final_text raise RuntimeError("Deepgram session cleanup timed out") return final_text def listen( self, timeout_seconds: float = 7.0, detection_timeout: float = INITIAL_SILENCE_TIMEOUT_SECONDS, lang: str = "ru", fast_stop: bool = False, ) -> str: """ Основной метод: слушает микрофон и возвращает текст. Args: timeout_seconds: Защитный лимит длительности активной речи. detection_timeout: Сколько ждать начала речи перед тем как сдаться. lang: Язык ("ru" или "en"). fast_stop: Быстрое завершение для коротких stop-команд. """ if not self.dg_client: self.initialize() # Проверяем здоровье соединения перед началом прослушивания self.check_connection_health() self.current_lang = lang print(f"🎙️ Слушаю ({lang})...") last_error = None # Делаем 3 попытки на случай сбоя сети for attempt in range(3): dg_connection = None try: # Создаем новое live подключение для каждой сессии dg_connection = self.dg_client.listen.live.v("1") # Запускаем асинхронный процесс обработки transcript = asyncio.run( self._process_audio( dg_connection, timeout_seconds, detection_timeout, fast_stop ) ) final_text = transcript.strip() if transcript else "" if final_text: print(f"📝 Распознано: {final_text}") # Обновляем время последней успешной операции self.last_successful_operation = datetime.now() return final_text else: # Если вернулась пустая строка (тишина), считаем это штатным завершением. # Не нужно повторять попытку, как при ошибке сети. # Все равно обновляем время последней успешной операции self.last_successful_operation = datetime.now() return "" except Exception as e: last_error = e print(f"Attempt {attempt + 1} failed: {e}") # Закрываем соединение, если оно было создано if dg_connection: try: self._run_blocking_cleanup_sync( dg_connection.finish, timeout_seconds=DEEPGRAM_FINISH_TIMEOUT_SECONDS, label="Deepgram finish (error cleanup)", ) except: pass # Игнорируем ошибки при завершении # Принудительно сбрасываем клиента, чтобы след. попытка не унаследовала # подвисшее соединение SDK. self.dg_client = None with contextlib.suppress(Exception): self.initialize() if attempt < 2: # Не ждем после последней попытки print(f"⚠️ Не удалось подключиться к Deepgram, попытка {attempt + 1}/3, повторяю...") time.sleep(1) # Уменьшаем задержку между попытками if last_error: print(f"❌ Ошибка STT после всех попыток: {last_error}") else: print("⚠️ Речь не распознана") return "" def cleanup(self): """Очистка ресурсов.""" if self.stream: try: if self.stream.is_active(): self.stream.stop_stream() except Exception as e: print(f"Ошибка при остановке потока: {e}") try: self.stream.close() except Exception as e: print(f"Ошибка при закрытии потока: {e}") self.stream = None # self.pa.terminate() - Используем общий менеджер # Сбросим клиента для принудительного переподключения self.dg_client = None # Глобальный экземпляр _recognizer = None def get_recognizer() -> SpeechRecognizer: global _recognizer if _recognizer is None: _recognizer = SpeechRecognizer() return _recognizer def listen( timeout_seconds: float = 7.0, detection_timeout: float = INITIAL_SILENCE_TIMEOUT_SECONDS, lang: str = "ru", fast_stop: bool = False, ) -> str: """Внешняя функция для прослушивания.""" return get_recognizer().listen(timeout_seconds, detection_timeout, lang, fast_stop) def cleanup(): """Внешняя функция очистки.""" global _recognizer if _recognizer: try: _recognizer.cleanup() except Exception as e: print(f"Ошибка при очистке STT: {e}") _recognizer = None