""" Text-to-Speech module using Silero TTS. Generates natural Russian speech with Xenia voice. Supports interruption via wake word detection using threading. """ import torch import sounddevice as sd import numpy as np import threading import time import warnings import re from config import TTS_SPEAKER, TTS_SAMPLE_RATE # Suppress Silero TTS warning about text length warnings.filterwarnings("ignore", message="Text string is longer than 1000 symbols") class TextToSpeech: """Text-to-Speech using Silero TTS with wake word interruption support.""" def __init__(self): self.model = None self.sample_rate = TTS_SAMPLE_RATE self.speaker = TTS_SPEAKER self._interrupted = False self._stop_flag = threading.Event() def initialize(self): """Initialize Silero TTS model.""" print("📦 Загрузка модели Silero TTS v5...") # Load Silero TTS model device = torch.device('cpu') self.model, _ = torch.hub.load( repo_or_dir="snakers4/silero-models", model="silero_tts", language="ru", speaker="v5_ru", ) self.model.to(device) print(f"✅ Модель TTS v5 загружена (голос: {self.speaker})") def _split_text(self, text: str, max_length: int = 900) -> list[str]: """Split text into chunks smaller than max_length.""" if len(text) <= max_length: return [text] chunks = [] # Split by sentence endings, keeping the punctuation # pattern matches [.!?] followed by optional newlines parts = re.split(r"([.!?]+\s*)", text) current_chunk = "" # Reconstruct sentences. re.split with groups returns [text, delimiter, text, delimiter...] # We iterate through parts. If part is a delimiter (matches pattern), we append to previous text. for part in parts: # If the part combined with current_chunk exceeds max_length, save current_chunk if len(current_chunk) + len(part) > max_length: if current_chunk: chunks.append(current_chunk.strip()) current_chunk = "" current_chunk += part # If even a single part is too big (very long sentence without punctuation), force split while len(current_chunk) > max_length: # Try to split by space split_idx = current_chunk.rfind(" ", 0, max_length) if split_idx == -1: # No space found, hard cut split_idx = max_length chunks.append(current_chunk[:split_idx].strip()) current_chunk = current_chunk[split_idx:].lstrip() if current_chunk: chunks.append(current_chunk.strip()) # Filter empty chunks return [c for c in chunks if c] def speak(self, text: str, check_interrupt=None) -> bool: """ Convert text to speech and play it. Args: text: Text to synthesize and speak check_interrupt: Optional callback function that returns True if playback should stop Returns: True if playback completed normally, False if interrupted """ if not text.strip(): return True if not self.model: self.initialize() # Split text into manageable chunks chunks = self._split_text(text) total_chunks = len(chunks) if total_chunks > 1: print(f"🔊 Озвучивание (частей: {total_chunks}): {text[:50]}...") else: print(f"🔊 Озвучивание: {text[:50]}...") self._interrupted = False self._stop_flag.clear() success = True for i, chunk in enumerate(chunks): if self._interrupted: break try: # Generate audio for chunk audio = self.model.apply_tts( text=chunk, speaker=self.speaker, sample_rate=self.sample_rate ) # Convert to numpy array audio_np = audio.numpy() if check_interrupt: # Play with interrupt checking in parallel thread if not self._play_with_interrupt(audio_np, check_interrupt): success = False break else: # Standard playback sd.play(audio_np, self.sample_rate) sd.wait() except Exception as e: print(f"❌ Ошибка TTS (часть {i + 1}/{total_chunks}): {e}") success = False # Continue with next chunk? or break? # Usually if one fails, we might want to try others, but for "too long" error # splitting should solve it. If it fails for other reasons, maybe better to stop. # But let's keep trying subsequent chunks in case it's a specific symbol issue. if success and not self._interrupted: print("✅ Воспроизведение завершено") return True elif self._interrupted: return False else: return False def _check_interrupt_worker(self, check_interrupt): """ Worker thread that continuously checks for interrupt signal. """ while not self._stop_flag.is_set(): try: if check_interrupt(): self._interrupted = True sd.stop() print("⏹️ Воспроизведение прервано!") return except Exception: pass def _play_with_interrupt(self, audio_np: np.ndarray, check_interrupt) -> bool: """ Play audio with interrupt checking in parallel thread. Args: audio_np: Audio data as numpy array check_interrupt: Callback that returns True if should interrupt Returns: True if completed normally, False if interrupted """ # Start interrupt checker thread checker_thread = threading.Thread( target=self._check_interrupt_worker, args=(check_interrupt,), daemon=True ) checker_thread.start() try: # Play audio (non-blocking start) sd.play(audio_np, self.sample_rate) # Wait for playback to finish or interrupt while sd.get_stream().active: if self._interrupted: break time.sleep(0.05) finally: # Signal checker thread to stop self._stop_flag.set() checker_thread.join(timeout=0.5) if self._interrupted: return False return True @property def was_interrupted(self) -> bool: """Check if the last playback was interrupted.""" return self._interrupted # Global instance _tts = None def get_tts() -> TextToSpeech: """Get or create TTS instance.""" global _tts if _tts is None: _tts = TextToSpeech() return _tts def speak(text: str, check_interrupt=None) -> bool: """ Synthesize and speak the given text. Args: text: Text to speak check_interrupt: Optional callback for interrupt checking Returns: True if completed normally, False if interrupted """ return get_tts().speak(text, check_interrupt) def was_interrupted() -> bool: """Check if the last speak() call was interrupted.""" return get_tts().was_interrupted def initialize(): """Pre-initialize TTS model.""" get_tts().initialize()