Stabilize Deepgram session cleanup

This commit is contained in:
2026-03-15 16:36:23 +03:00
parent 715d7b0ee0
commit 2088f366b6

View File

@@ -35,6 +35,7 @@ DEEPGRAM_CONNECT_WAIT_SECONDS = 4.0
DEEPGRAM_CONNECT_POLL_SECONDS = 0.001 DEEPGRAM_CONNECT_POLL_SECONDS = 0.001
SENDER_STOP_WAIT_SECONDS = 2.5 SENDER_STOP_WAIT_SECONDS = 2.5
SENDER_FORCE_RELEASE_WAIT_SECONDS = 2.5 SENDER_FORCE_RELEASE_WAIT_SECONDS = 2.5
DEEPGRAM_FINALIZE_TIMEOUT_SECONDS = 1.5
DEEPGRAM_FINALIZATION_GRACE_SECONDS = 0.35 DEEPGRAM_FINALIZATION_GRACE_SECONDS = 0.35
DEEPGRAM_FINISH_TIMEOUT_SECONDS = 4.0 DEEPGRAM_FINISH_TIMEOUT_SECONDS = 4.0
@@ -203,7 +204,9 @@ class SpeechRecognizer:
await asyncio.sleep(0.05) await asyncio.sleep(0.05)
return not thread.is_alive() return not thread.is_alive()
async def _run_blocking_cleanup(self, func, timeout_seconds: float, label: str) -> bool: async def _run_blocking_cleanup(
self, func, timeout_seconds: float, label: str, quiet: bool = False
) -> bool:
"""Запускает потенциально подвисающий cleanup в daemon-thread и ждет ограниченное время.""" """Запускает потенциально подвисающий cleanup в daemon-thread и ждет ограниченное время."""
done_event = threading.Event() done_event = threading.Event()
error_holder = {} error_holder = {}
@@ -224,17 +227,21 @@ class SpeechRecognizer:
await asyncio.sleep(0.05) await asyncio.sleep(0.05)
if not done_event.is_set(): if not done_event.is_set():
if not quiet:
print(f"⚠️ {label} timed out; continuing cleanup.") print(f"⚠️ {label} timed out; continuing cleanup.")
return False return False
error = error_holder.get("error") error = error_holder.get("error")
if error is not None: if error is not None:
if not quiet:
print(f"⚠️ {label} failed: {error}") print(f"⚠️ {label} failed: {error}")
return False return False
return True return True
def _run_blocking_cleanup_sync(self, func, timeout_seconds: float, label: str) -> bool: def _run_blocking_cleanup_sync(
self, func, timeout_seconds: float, label: str, quiet: bool = False
) -> bool:
"""Sync-версия _run_blocking_cleanup() для use-case в listen().""" """Sync-версия _run_blocking_cleanup() для use-case в listen()."""
done_event = threading.Event() done_event = threading.Event()
error_holder = {} error_holder = {}
@@ -252,11 +259,13 @@ class SpeechRecognizer:
done_event.wait(timeout=max(0.0, float(timeout_seconds))) done_event.wait(timeout=max(0.0, float(timeout_seconds)))
if not done_event.is_set(): if not done_event.is_set():
if not quiet:
print(f"⚠️ {label} timed out; continuing cleanup.") print(f"⚠️ {label} timed out; continuing cleanup.")
return False return False
error = error_holder.get("error") error = error_holder.get("error")
if error is not None: if error is not None:
if not quiet:
print(f"⚠️ {label} failed: {error}") print(f"⚠️ {label} failed: {error}")
return False return False
return True return True
@@ -289,6 +298,7 @@ class SpeechRecognizer:
speech_started_event = asyncio.Event() # Речь обнаружена (VAD) speech_started_event = asyncio.Event() # Речь обнаружена (VAD)
last_speech_activity = time.monotonic() last_speech_activity = time.monotonic()
first_speech_activity_at = None first_speech_activity_at = None
session_error = {"message": None}
def mark_speech_activity(): def mark_speech_activity():
nonlocal last_speech_activity, first_speech_activity_at nonlocal last_speech_activity, first_speech_activity_at
@@ -298,6 +308,10 @@ class SpeechRecognizer:
first_speech_activity_at = now first_speech_activity_at = now
speech_started_event.set() speech_started_event.set()
def mark_session_error(message: str):
if not session_error["message"]:
session_error["message"] = str(message)
# --- Обработчики событий Deepgram --- # --- Обработчики событий Deepgram ---
def on_transcript(unused_self, result, **kwargs): def on_transcript(unused_self, result, **kwargs):
"""Вызывается, когда приходит часть текста.""" """Вызывается, когда приходит часть текста."""
@@ -346,6 +360,8 @@ class SpeechRecognizer:
return return
def on_error(unused_self, error, **kwargs): def on_error(unused_self, error, **kwargs):
if stop_event.is_set():
return
print(f"Deepgram Error: {error}") print(f"Deepgram Error: {error}")
try: try:
loop.call_soon_threadsafe(request_stop) loop.call_soon_threadsafe(request_stop)
@@ -425,6 +441,7 @@ class SpeechRecognizer:
except Exception as read_error: except Exception as read_error:
if sender_stop_event.is_set(): if sender_stop_event.is_set():
return return
mark_session_error(f"Audio read error during connect: {read_error}")
print(f"Audio read error during connect: {read_error}") print(f"Audio read error during connect: {read_error}")
with contextlib.suppress(RuntimeError): with contextlib.suppress(RuntimeError):
loop.call_soon_threadsafe(request_stop) loop.call_soon_threadsafe(request_stop)
@@ -436,6 +453,9 @@ class SpeechRecognizer:
return return
if not connect_result["done"]: if not connect_result["done"]:
mark_session_error(
f"Timeout connecting to Deepgram ({DEEPGRAM_CONNECT_WAIT_SECONDS:.1f}s)"
)
print( print(
f"⏰ Timeout connecting to Deepgram ({DEEPGRAM_CONNECT_WAIT_SECONDS:.1f}s)" f"⏰ Timeout connecting to Deepgram ({DEEPGRAM_CONNECT_WAIT_SECONDS:.1f}s)"
) )
@@ -444,11 +464,15 @@ class SpeechRecognizer:
# Проверяем результат подключения # Проверяем результат подключения
if connect_result["error"] is not None: if connect_result["error"] is not None:
mark_session_error(
f"Failed to start Deepgram connection: {connect_result['error']}"
)
print(f"Failed to start Deepgram connection: {connect_result['error']}") print(f"Failed to start Deepgram connection: {connect_result['error']}")
loop.call_soon_threadsafe(request_stop) loop.call_soon_threadsafe(request_stop)
return return
if connect_result["ok"] is False: if connect_result["ok"] is False:
mark_session_error("Failed to start Deepgram connection")
print("Failed to start Deepgram connection") print("Failed to start Deepgram connection")
loop.call_soon_threadsafe(request_stop) loop.call_soon_threadsafe(request_stop)
return return
@@ -471,6 +495,7 @@ class SpeechRecognizer:
except Exception as read_error: except Exception as read_error:
if sender_stop_event.is_set(): if sender_stop_event.is_set():
break break
mark_session_error(f"Audio read error: {read_error}")
print(f"Audio read error: {read_error}") print(f"Audio read error: {read_error}")
with contextlib.suppress(RuntimeError): with contextlib.suppress(RuntimeError):
loop.call_soon_threadsafe(request_stop) loop.call_soon_threadsafe(request_stop)
@@ -484,6 +509,7 @@ class SpeechRecognizer:
time.sleep(0.002) # Уменьшаем задержку для более быстрого реагирования time.sleep(0.002) # Уменьшаем задержку для более быстрого реагирования
except Exception as e: except Exception as e:
mark_session_error(f"Audio send error: {e}")
print(f"Audio send error: {e}") print(f"Audio send error: {e}")
with contextlib.suppress(RuntimeError): with contextlib.suppress(RuntimeError):
loop.call_soon_threadsafe(request_stop) loop.call_soon_threadsafe(request_stop)
@@ -570,15 +596,13 @@ class SpeechRecognizer:
print(f"Error in waiting for events: {e}") print(f"Error in waiting for events: {e}")
request_stop() request_stop()
heard_speech = speech_started_event.is_set()
sender_stopped = await self._wait_for_thread( sender_stopped = await self._wait_for_thread(
sender_thread, sender_thread,
timeout_seconds=max(SENDER_STOP_WAIT_SECONDS, SENDER_FORCE_RELEASE_WAIT_SECONDS), timeout_seconds=max(SENDER_STOP_WAIT_SECONDS, SENDER_FORCE_RELEASE_WAIT_SECONDS),
) )
cleanup_unhealthy = False cleanup_unhealthy = False
if not sender_stopped: if not sender_stopped:
print("⚠️ Audio sender shutdown timed out; continuing cleanup.")
cleanup_unhealthy = True
def force_close_stream(): def force_close_stream():
stream = stream_holder.get("stream") stream = stream_holder.get("stream")
if not stream: if not stream:
@@ -594,6 +618,7 @@ class SpeechRecognizer:
force_close_stream, force_close_stream,
timeout_seconds=SENDER_FORCE_RELEASE_WAIT_SECONDS, timeout_seconds=SENDER_FORCE_RELEASE_WAIT_SECONDS,
label="STT audio stream force close", label="STT audio stream force close",
quiet=True,
) )
# Дадим шанс потоку выйти после принудительного закрытия. # Дадим шанс потоку выйти после принудительного закрытия.
@@ -601,7 +626,14 @@ class SpeechRecognizer:
if not sender_stopped: if not sender_stopped:
cleanup_unhealthy = True cleanup_unhealthy = True
# Небольшая пауза, чтобы получить последние transcript-события перед finish(). # Сначала мягко просим Deepgram дослать хвост распознавания.
if heard_speech:
await self._run_blocking_cleanup(
dg_connection.finalize,
timeout_seconds=DEEPGRAM_FINALIZE_TIMEOUT_SECONDS,
label="Deepgram finalize",
quiet=True,
)
await asyncio.sleep(DEEPGRAM_FINALIZATION_GRACE_SECONDS) await asyncio.sleep(DEEPGRAM_FINALIZATION_GRACE_SECONDS)
# Завершаем соединение и ждем последние результаты # Завершаем соединение и ждем последние результаты
@@ -609,6 +641,7 @@ class SpeechRecognizer:
dg_connection.finish, dg_connection.finish,
timeout_seconds=DEEPGRAM_FINISH_TIMEOUT_SECONDS, timeout_seconds=DEEPGRAM_FINISH_TIMEOUT_SECONDS,
label="Deepgram finish", label="Deepgram finish",
quiet=True,
) )
if not finish_ok: if not finish_ok:
cleanup_unhealthy = True cleanup_unhealthy = True
@@ -617,13 +650,12 @@ class SpeechRecognizer:
if not final_text: if not final_text:
final_text = latest_interim.strip() final_text = latest_interim.strip()
self.transcript = final_text self.transcript = final_text
if session_error["message"] and not final_text:
raise RuntimeError(session_error["message"])
if cleanup_unhealthy: if cleanup_unhealthy:
# Если текст уже получен, не теряем команду пользователя. # Если cleanup подвис, не валим текущую команду и не запускаем ложный retry.
# Но сбрасываем клиента, чтобы следующая STT-сессия стартовала на чистом соединении. # Просто пересоздаем клиента перед следующим прослушиванием.
self.dg_client = None self.dg_client = None
if final_text:
return final_text
raise RuntimeError("Deepgram session cleanup timed out")
return final_text return final_text
def listen( def listen(
@@ -689,6 +721,7 @@ class SpeechRecognizer:
dg_connection.finish, dg_connection.finish,
timeout_seconds=DEEPGRAM_FINISH_TIMEOUT_SECONDS, timeout_seconds=DEEPGRAM_FINISH_TIMEOUT_SECONDS,
label="Deepgram finish (error cleanup)", label="Deepgram finish (error cleanup)",
quiet=True,
) )
except: except:
pass # Игнорируем ошибки при завершении pass # Игнорируем ошибки при завершении