feat: refine assistant logic and update docs

This commit is contained in:
future
2026-04-09 21:03:02 +03:00
parent ebe79c3692
commit 42c064a274
19 changed files with 1958 additions and 492 deletions

View File

@@ -19,12 +19,14 @@ import sounddevice as sd
import torch
from ..core.audio_manager import get_audio_manager
from ..core.config import TTS_EN_SPEAKER, TTS_SAMPLE_RATE, TTS_SPEAKER
from ..core.config import TTS_EN_SPEAKER, TTS_SAMPLE_RATE, TTS_SPEAKER, TTS_SPEED
# Подавляем предупреждения Silero о длинном тексте (мы сами его режем)
warnings.filterwarnings("ignore", message="Text string is longer than 1000 symbols")
_EN_WORD_RE = re.compile(r"[A-Za-z][A-Za-z0-9'-]*")
_MIXED_TTS_BUFFERED_SWITCHES = 3
_INTERRUPT_POLL_SECONDS = 0.01
class TextToSpeech:
@@ -34,6 +36,7 @@ class TextToSpeech:
self.model_ru = None
self.model_en = None
self.sample_rate = TTS_SAMPLE_RATE
self.speed_factor = float(TTS_SPEED)
self.speaker_ru = TTS_SPEAKER
self.speaker_en = TTS_EN_SPEAKER
self._interrupted = False
@@ -41,6 +44,23 @@ class TextToSpeech:
self._audio_manager = None
self._output_device_index = None
def _apply_speed(self, audio_np: np.ndarray) -> np.ndarray:
"""Применяет небольшой time-stretch без изменения остальной логики TTS."""
audio = np.asarray(audio_np, dtype=np.float32)
if audio.size == 0:
return audio
speed = max(0.85, min(1.15, float(self.speed_factor)))
if abs(speed - 1.0) < 0.01:
return audio
# speed < 1.0 -> медленнее (длина массива больше), speed > 1.0 -> быстрее.
target_length = max(1, int(round(audio.size / speed)))
x_old = np.arange(audio.size, dtype=np.float32)
x_new = np.linspace(0.0, float(max(0, audio.size - 1)), target_length)
stretched = np.interp(x_new, x_old, audio)
return np.asarray(stretched, dtype=np.float32)
def _load_model(self, language: str):
"""
Загрузка и кэширование модели Silero TTS.
@@ -52,21 +72,12 @@ class TextToSpeech:
if self.model_en:
return self.model_en
print("📦 Загрузка модели Silero TTS (en)...")
try:
model, _ = torch.hub.load(
repo_or_dir="snakers4/silero-models",
model="silero_tts",
language="en",
speaker="v5_en",
)
except Exception as exc:
print(f"⚠️ Не удалось загрузить v5_en, пробую v3_en: {exc}")
model, _ = torch.hub.load(
repo_or_dir="snakers4/silero-models",
model="silero_tts",
language="en",
speaker="v3_en",
)
model, _ = torch.hub.load(
repo_or_dir="snakers4/silero-models",
model="silero_tts",
language="en",
speaker="v3_en",
)
model.to(device)
self.model_en = model
return model
@@ -185,28 +196,7 @@ class TextToSpeech:
if not text.strip():
return True
# Выбор модели
if language == "en":
model = self._load_model("en")
speaker = self.speaker_en
else:
model = self._load_model("ru")
speaker = self.speaker_ru
# Проверка наличия спикера в модели (защита от ошибок конфига).
# Для русского языка сохраняем мужской голос по умолчанию.
if hasattr(model, "speakers") and model.speakers:
if language == "ru":
male_speakers = ("eugene", "aidar")
if speaker not in model.speakers or speaker not in male_speakers:
for candidate in male_speakers:
if candidate in model.speakers:
speaker = candidate
break
else:
speaker = model.speakers[0]
elif speaker not in model.speakers:
speaker = model.speakers[0]
model, speaker = self._get_model_and_speaker(language)
# Разбиваем текст на куски
chunks = self._split_text(text)
@@ -233,7 +223,7 @@ class TextToSpeech:
)
# Конвертация в numpy массив для sounddevice
audio_np = audio.numpy()
audio_np = self._apply_speed(audio.numpy())
if check_interrupt:
if not self._play_audio_with_interrupt(audio_np, check_interrupt):
@@ -256,10 +246,104 @@ class TextToSpeech:
else:
return False
def _get_model_and_speaker(self, language: str):
    """Return the loaded TTS model and a voice that is valid for that model.

    Falls back gracefully when the configured speaker is missing from the
    model; for Russian a male default voice is preserved.
    """
    # Model + configured voice per language.
    if language == "en":
        model, speaker = self._load_model("en"), self.speaker_en
    else:
        model, speaker = self._load_model("ru"), self.speaker_ru
    # Guard against config errors: the speaker must exist in the model.
    available = getattr(model, "speakers", None)
    if available:
        if language == "ru":
            male_voices = ("eugene", "aidar")
            if speaker not in available or speaker not in male_voices:
                # Prefer the first available male voice, else any voice.
                chosen = next((v for v in male_voices if v in available), None)
                speaker = chosen if chosen is not None else available[0]
        elif speaker not in available:
            speaker = available[0]
    return model, speaker
def _synthesize_language_audio(self, text: str, language: str) -> np.ndarray | None:
    """Build the audio for a single-language text without playing it back.

    Returns an empty float32 array for blank text, None when synthesis was
    interrupted mid-way, otherwise the concatenated speed-adjusted audio.
    """
    if not text.strip():
        return np.asarray([], dtype=np.float32)
    model, speaker = self._get_model_and_speaker(language)
    pieces = []
    for piece_text in self._split_text(text):
        # Bail out between chunks so an interrupt is honored quickly.
        if self._interrupted:
            return None
        waveform = model.apply_tts(
            text=piece_text, speaker=speaker, sample_rate=self.sample_rate
        )
        pieces.append(self._apply_speed(waveform.numpy()))
    if not pieces:
        return np.asarray([], dtype=np.float32)
    return np.concatenate(pieces)
def _count_language_switches(self, segments: list[tuple[str, str]]) -> int:
if len(segments) < 2:
return 0
return sum(
1
for idx in range(1, len(segments))
if segments[idx - 1][1] != segments[idx][1]
)
def _speak_mixed_buffered(
    self, segments: list[tuple[str, str]], check_interrupt=None
) -> bool:
    """Synthesize all mixed RU/EN segments first, then play one continuous stream.

    Returns False when interrupted or when synthesis fails; True when there
    was nothing to say or playback completed.
    """
    print(f"🔊 Mixed TTS: буферизация сегментов ({len(segments)} шт.)")
    self._interrupted = False
    self._stop_flag.clear()
    total = len(segments)
    collected = []
    for number, (segment_text, segment_lang) in enumerate(segments, start=1):
        if not segment_text.strip():
            continue
        # Check for an external interrupt before each (slow) synthesis step.
        if check_interrupt and check_interrupt():
            self._interrupted = True
            return False
        try:
            synthesized = self._synthesize_language_audio(
                segment_text, language=segment_lang
            )
        except Exception as exc:
            print(f"❌ Ошибка mixed TTS (сегмент {number}/{total}): {exc}")
            return False
        if synthesized is None:
            # Synthesis itself detected an interruption.
            return False
        if synthesized.size:
            collected.append(synthesized)
    if not collected:
        return True
    combined = np.concatenate(collected)
    if check_interrupt:
        return self._play_audio_with_interrupt(combined, check_interrupt)
    return self._play_audio_blocking(combined)
def _speak_mixed(
self, segments: list[tuple[str, str]], check_interrupt=None
) -> bool:
"""Озвучивание текста с переключением RU/EN по сегментам."""
if self._count_language_switches(segments) >= _MIXED_TTS_BUFFERED_SWITCHES:
return self._speak_mixed_buffered(
segments, check_interrupt=check_interrupt
)
for segment, lang in segments:
if not segment.strip():
continue
@@ -390,6 +474,7 @@ class TextToSpeech:
return
except Exception:
pass
time.sleep(_INTERRUPT_POLL_SECONDS)
def _play_with_interrupt_sounddevice(
self, audio_np: np.ndarray, check_interrupt
@@ -407,11 +492,18 @@ class TextToSpeech:
# Запускаем воспроизведение (неблокирующее)
sd.play(audio_np, self.sample_rate)
# Ждем окончания воспроизведения в цикле
while sd.get_stream().active:
# Ждем окончания воспроизведения в цикле.
while True:
if self._interrupted:
break
time.sleep(0.02) # Уменьшаем задержку для более быстрого реагирования
stream = sd.get_stream()
if stream is None or not stream.active:
break
time.sleep(0.02)
if not self._interrupted:
# Добираем хвост буфера даже если stream.active мигнул в False чуть раньше.
sd.wait()
finally:
# Сообщаем потоку-наблюдателю, что пора завершаться