From 2088f366b6b76e0f7908c5fb4bfa4563b2f5750c Mon Sep 17 00:00:00 2001 From: future Date: Sun, 15 Mar 2026 16:36:23 +0300 Subject: [PATCH] Stabilize Deepgram session cleanup --- app/audio/stt.py | 65 ++++++++++++++++++++++++++++++++++++------------ 1 file changed, 49 insertions(+), 16 deletions(-) diff --git a/app/audio/stt.py b/app/audio/stt.py index 1c4483b..7070363 100644 --- a/app/audio/stt.py +++ b/app/audio/stt.py @@ -35,6 +35,7 @@ DEEPGRAM_CONNECT_WAIT_SECONDS = 4.0 DEEPGRAM_CONNECT_POLL_SECONDS = 0.001 SENDER_STOP_WAIT_SECONDS = 2.5 SENDER_FORCE_RELEASE_WAIT_SECONDS = 2.5 +DEEPGRAM_FINALIZE_TIMEOUT_SECONDS = 1.5 DEEPGRAM_FINALIZATION_GRACE_SECONDS = 0.35 DEEPGRAM_FINISH_TIMEOUT_SECONDS = 4.0 @@ -203,7 +204,9 @@ class SpeechRecognizer: await asyncio.sleep(0.05) return not thread.is_alive() - async def _run_blocking_cleanup(self, func, timeout_seconds: float, label: str) -> bool: + async def _run_blocking_cleanup( + self, func, timeout_seconds: float, label: str, quiet: bool = False + ) -> bool: """Запускает потенциально подвисающий cleanup в daemon-thread и ждет ограниченное время.""" done_event = threading.Event() error_holder = {} @@ -224,17 +227,21 @@ class SpeechRecognizer: await asyncio.sleep(0.05) if not done_event.is_set(): - print(f"⚠️ {label} timed out; continuing cleanup.") + if not quiet: + print(f"⚠️ {label} timed out; continuing cleanup.") return False error = error_holder.get("error") if error is not None: - print(f"⚠️ {label} failed: {error}") + if not quiet: + print(f"⚠️ {label} failed: {error}") return False return True - def _run_blocking_cleanup_sync(self, func, timeout_seconds: float, label: str) -> bool: + def _run_blocking_cleanup_sync( + self, func, timeout_seconds: float, label: str, quiet: bool = False + ) -> bool: """Sync-версия _run_blocking_cleanup() для use-case в listen().""" done_event = threading.Event() error_holder = {} @@ -252,12 +259,14 @@ class SpeechRecognizer: done_event.wait(timeout=max(0.0, float(timeout_seconds))) if not done_event.is_set(): - print(f"⚠️ {label} timed out; continuing cleanup.") + if not quiet: + print(f"⚠️ {label} timed out; continuing cleanup.") return False error = error_holder.get("error") if error is not None: - print(f"⚠️ {label} failed: {error}") + if not quiet: + print(f"⚠️ {label} failed: {error}") return False return True @@ -289,6 +298,7 @@ class SpeechRecognizer: speech_started_event = asyncio.Event() # Речь обнаружена (VAD) last_speech_activity = time.monotonic() first_speech_activity_at = None + session_error = {"message": None} def mark_speech_activity(): nonlocal last_speech_activity, first_speech_activity_at @@ -298,6 +308,10 @@ class SpeechRecognizer: first_speech_activity_at = now speech_started_event.set() + def mark_session_error(message: str): + if not session_error["message"]: + session_error["message"] = str(message) + # --- Обработчики событий Deepgram --- def on_transcript(unused_self, result, **kwargs): """Вызывается, когда приходит часть текста.""" @@ -346,6 +360,8 @@ class SpeechRecognizer: return def on_error(unused_self, error, **kwargs): + if stop_event.is_set(): + return print(f"Deepgram Error: {error}") try: loop.call_soon_threadsafe(request_stop) @@ -425,6 +441,7 @@ class SpeechRecognizer: except Exception as read_error: if sender_stop_event.is_set(): return + mark_session_error(f"Audio read error during connect: {read_error}") print(f"Audio read error during connect: {read_error}") with contextlib.suppress(RuntimeError): loop.call_soon_threadsafe(request_stop) @@ -436,6 +453,9 @@ class SpeechRecognizer: return if not connect_result["done"]: + mark_session_error( + f"Timeout connecting to Deepgram ({DEEPGRAM_CONNECT_WAIT_SECONDS:.1f}s)" + ) print( f"⏰ Timeout connecting to Deepgram ({DEEPGRAM_CONNECT_WAIT_SECONDS:.1f}s)" ) @@ -444,11 +464,15 @@ class SpeechRecognizer: # Проверяем результат подключения if connect_result["error"] is not None: + mark_session_error( + f"Failed to start Deepgram connection: {connect_result['error']}" + ) print(f"Failed to start Deepgram connection: {connect_result['error']}") loop.call_soon_threadsafe(request_stop) return if connect_result["ok"] is False: + mark_session_error("Failed to start Deepgram connection") print("Failed to start Deepgram connection") loop.call_soon_threadsafe(request_stop) return @@ -471,6 +495,7 @@ class SpeechRecognizer: except Exception as read_error: if sender_stop_event.is_set(): break + mark_session_error(f"Audio read error: {read_error}") print(f"Audio read error: {read_error}") with contextlib.suppress(RuntimeError): loop.call_soon_threadsafe(request_stop) @@ -484,6 +509,7 @@ class SpeechRecognizer: time.sleep(0.002) # Уменьшаем задержку для более быстрого реагирования except Exception as e: + mark_session_error(f"Audio send error: {e}") print(f"Audio send error: {e}") with contextlib.suppress(RuntimeError): loop.call_soon_threadsafe(request_stop) @@ -570,15 +596,13 @@ class SpeechRecognizer: print(f"Error in waiting for events: {e}") request_stop() + heard_speech = speech_started_event.is_set() sender_stopped = await self._wait_for_thread( sender_thread, timeout_seconds=max(SENDER_STOP_WAIT_SECONDS, SENDER_FORCE_RELEASE_WAIT_SECONDS), ) cleanup_unhealthy = False if not sender_stopped: - print("⚠️ Audio sender shutdown timed out; continuing cleanup.") - cleanup_unhealthy = True - def force_close_stream(): stream = stream_holder.get("stream") if not stream: @@ -594,6 +618,7 @@ class SpeechRecognizer: force_close_stream, timeout_seconds=SENDER_FORCE_RELEASE_WAIT_SECONDS, label="STT audio stream force close", + quiet=True, ) # Дадим шанс потоку выйти после принудительного закрытия. @@ -601,14 +626,22 @@ class SpeechRecognizer: if not sender_stopped: cleanup_unhealthy = True - # Небольшая пауза, чтобы получить последние transcript-события перед finish(). - await asyncio.sleep(DEEPGRAM_FINALIZATION_GRACE_SECONDS) + # Сначала мягко просим Deepgram дослать хвост распознавания. + if heard_speech: + await self._run_blocking_cleanup( + dg_connection.finalize, + timeout_seconds=DEEPGRAM_FINALIZE_TIMEOUT_SECONDS, + label="Deepgram finalize", + quiet=True, + ) + await asyncio.sleep(DEEPGRAM_FINALIZATION_GRACE_SECONDS) # Завершаем соединение и ждем последние результаты finish_ok = await self._run_blocking_cleanup( dg_connection.finish, timeout_seconds=DEEPGRAM_FINISH_TIMEOUT_SECONDS, label="Deepgram finish", + quiet=True, ) if not finish_ok: cleanup_unhealthy = True @@ -617,13 +650,12 @@ class SpeechRecognizer: if not final_text: final_text = latest_interim.strip() self.transcript = final_text + if session_error["message"] and not final_text: + raise RuntimeError(session_error["message"]) if cleanup_unhealthy: - # Если текст уже получен, не теряем команду пользователя. - # Но сбрасываем клиента, чтобы следующая STT-сессия стартовала на чистом соединении. + # Если cleanup подвис, не валим текущую команду и не запускаем ложный retry. + # Просто пересоздаем клиента перед следующим прослушиванием. self.dg_client = None - if final_text: - return final_text - raise RuntimeError("Deepgram session cleanup timed out") return final_text def listen( @@ -689,6 +721,7 @@ class SpeechRecognizer: dg_connection.finish, timeout_seconds=DEEPGRAM_FINISH_TIMEOUT_SECONDS, label="Deepgram finish (error cleanup)", + quiet=True, ) except: pass # Игнорируем ошибки при завершении