first commit

This commit is contained in:
2026-01-02 20:26:44 +03:00
commit 51ed78078b
14 changed files with 841 additions and 0 deletions

39
.gitignore vendored Normal file
View File

@@ -0,0 +1,39 @@
# Python
__pycache__/
*.py[cod]
*$py.class
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Distribution / packaging
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# Vosk models
vosk-model-*/
# PyCharm
.idea/
# VS Code
.vscode/

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1 @@
A copy of license terms is available at https://picovoice.ai/docs/terms-of-use/

1
LICENSE.txt Executable file
View File

@@ -0,0 +1 @@
A copy of license terms is available at https://picovoice.ai/docs/terms-of-use/

67
ai.py Normal file
View File

@@ -0,0 +1,67 @@
"""
AI module for Perplexity API integration.
Sends user queries and receives AI responses.
"""
import requests
from config import PERPLEXITY_API_KEY, PERPLEXITY_MODEL, PERPLEXITY_API_URL
# System prompt sent as the "system" message of every request.
# It asks the model (in Russian) to act as a smart-speaker voice assistant:
# answer briefly and in Russian, avoid long lists and complex formatting, and
# use natural conversational language because the reply will be spoken aloud.
SYSTEM_PROMPT = """Ты — голосовой ассистент умной колонки.
Отвечай кратко, по существу, на русском языке.
Избегай длинных списков и сложного форматирования.
Твои ответы будут озвучены голосом, поэтому пиши естественным разговорным языком."""
def ask_ai(user_message: str) -> str:
    """
    Send a message to Perplexity AI and get a response.

    Every failure mode is converted into a short Russian phrase that can be
    spoken to the user, so this function never raises for network or
    response-format problems.

    Args:
        user_message: User's question or command

    Returns:
        AI response text, or a fallback phrase when the message is empty,
        the request fails/times out, or the reply cannot be parsed.
    """
    # Guard against None as well as blank strings: the text comes from STT.
    if not user_message or not user_message.strip():
        return "Извините, я не расслышал вашу команду."

    print(f"🤖 Запрос к AI: {user_message}")

    headers = {
        "Authorization": f"Bearer {PERPLEXITY_API_KEY}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": PERPLEXITY_MODEL,
        "messages": [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": user_message},
        ],
        "max_tokens": 500,
        "temperature": 0.7,
    }

    try:
        response = requests.post(
            PERPLEXITY_API_URL,
            headers=headers,
            json=payload,
            timeout=30,
        )
        response.raise_for_status()
        data = response.json()
        ai_response = data["choices"][0]["message"]["content"]
        # A null/non-text content would otherwise crash on slicing below and
        # later in the cleaner/TTS stages.
        if not isinstance(ai_response, str):
            raise TypeError(f"unexpected content type: {type(ai_response).__name__}")
        print(f"💬 Ответ AI: {ai_response[:100]}...")
        return ai_response
    except requests.exceptions.Timeout:
        # Must precede RequestException: Timeout is one of its subclasses.
        return "Извините, сервер не отвечает. Попробуйте позже."
    except requests.exceptions.RequestException as e:
        print(f"❌ Ошибка API: {e}")
        return "Произошла ошибка при обращении к AI. Попробуйте ещё раз."
    except (KeyError, IndexError, TypeError, ValueError) as e:
        # TypeError: reply is not the expected dict/list nesting or content is
        # not text. ValueError: body is not valid JSON.
        print(f"❌ Ошибка парсинга ответа: {e}")
        return "Не удалось обработать ответ от AI."

72
cleaner.py Normal file
View File

@@ -0,0 +1,72 @@
"""
Response cleaner module.
Removes markdown formatting and special characters from AI responses.
"""
import re
# Ordered (pattern, replacement) rules applied by clean_response().
# The order matters:
#   * fenced code blocks before inline code, otherwise the inline rule eats
#     part of the fence and the block is never removed;
#   * images before links, otherwise "![alt](url)" degrades to "!alt";
#   * links before bare citations, so "[1](url)" is handled as a link;
#   * horizontal rules and list markers before emphasis, otherwise "***"
#     collapses to "*" and a "* item" bullet is mistaken for an italic opener.
_CLEANUP_RULES = tuple(
    (re.compile(pattern, flags), replacement)
    for pattern, replacement, flags in (
        # Code blocks ```code``` (dropped entirely: not meant to be read aloud)
        (r'```[\s\S]*?```', '', 0),
        # Inline code `code` -> code
        (r'`([^`]+)`', r'\1', 0),
        # Images ![alt](url) (dropped entirely)
        (r'!\[([^\]]*)\]\([^)]+\)', '', 0),
        # Links [text](url) -> text
        (r'\[([^\]]+)\]\([^)]+\)', r'\1', 0),
        # Citation references like [1], [citation needed], [source]
        (r'\[\d+\]', '', 0),
        (r'\[citation\s*needed\]', '', re.IGNORECASE),
        (r'\[source\]', '', re.IGNORECASE),
        # Horizontal rules (---, ***, ___)
        (r'^[-*_]{3,}\s*$', '', re.MULTILINE),
        # Headers # ## ### etc.
        (r'^#{1,6}\s*', '', re.MULTILINE),
        # List markers (-, *, +, numbered)
        (r'^\s*[-*+]\s+', '', re.MULTILINE),
        (r'^\s*\d+\.\s+', '', re.MULTILINE),
        # Blockquotes
        (r'^\s*>\s*', '', re.MULTILINE),
        # Bold **text** and __text__
        (r'\*\*(.+?)\*\*', r'\1', 0),
        (r'__(.+?)__', r'\1', 0),
        # Italic *text* and _text_ (underscores inside words are kept)
        (r'\*(.+?)\*', r'\1', 0),
        (r'(?<!\w)_(.+?)_(?!\w)', r'\1', 0),
        # Strikethrough ~~text~~
        (r'~~(.+?)~~', r'\1', 0),
        # HTML tags if any
        (r'<[^>]+>', '', 0),
        # Extra whitespace
        (r'\n{3,}', '\n\n', 0),
        (r' +', ' ', 0),
    )
)


def clean_response(text: str) -> str:
    """
    Clean AI response from markdown formatting and special characters.

    Args:
        text: Raw AI response with possible markdown

    Returns:
        Clean text suitable for TTS; an empty string for empty/None input
    """
    if not text:
        return ""

    for pattern, replacement in _CLEANUP_RULES:
        text = pattern.sub(replacement, text)

    return text.strip()

33
config.py Normal file
View File

@@ -0,0 +1,33 @@
"""
Configuration module for smart speaker.
Loads environment variables from .env file.
"""
import os
from pathlib import Path
from dotenv import load_dotenv
# Load environment variables (per the module docstring, from a .env file)
# before any os.getenv() call below reads them.
load_dotenv()
# Base paths: directory containing this config module; bundled assets are
# resolved relative to it.
BASE_DIR = Path(__file__).parent
# Perplexity API configuration
# os.getenv() yields None when PERPLEXITY_API_KEY is not set.
PERPLEXITY_API_KEY = os.getenv("PERPLEXITY_API_KEY")
# Model id can be overridden through the PERPLEXITY_MODEL environment variable.
# NOTE(review): confirm the default id below is still offered by the API.
PERPLEXITY_MODEL = os.getenv("PERPLEXITY_MODEL", "llama-3.1-sonar-small-online")
PERPLEXITY_API_URL = "https://api.perplexity.ai/chat/completions"
# Porcupine configuration
# os.getenv() yields None when PORCUPINE_ACCESS_KEY is not set.
PORCUPINE_ACCESS_KEY = os.getenv("PORCUPINE_ACCESS_KEY")
# Wake-word keyword file, expected next to this module.
PORCUPINE_KEYWORD_PATH = BASE_DIR / "Alexandr_en_linux_v4_0_0.ppn"
# Vosk configuration: model directory, expected next to this module.
VOSK_MODEL_PATH = BASE_DIR / "vosk-model-ru-0.42"
# Audio configuration (microphone capture)
SAMPLE_RATE = 16000
CHANNELS = 1
# TTS configuration
TTS_SPEAKER = "xenia"  # Available: aidar, baya, kseniya, xenia, eugene
TTS_SAMPLE_RATE = 48000

119
main.py Normal file
View File

@@ -0,0 +1,119 @@
"""
Smart Speaker - Main Application
Голосовой ассистент с wake word detection, STT, AI и TTS.
Flow:
1. Wait for wake word ("Alexandr")
2. Listen to user speech (STT)
3. Send query to AI (Perplexity)
4. Clean response from markdown
5. Speak response (TTS)
6. Loop back to step 1
"""
import signal
import sys
from wakeword import wait_for_wakeword, cleanup as cleanup_wakeword, check_wakeword_once
from stt import listen, cleanup as cleanup_stt
from ai import ask_ai
from cleaner import clean_response
from tts import speak, initialize as init_tts
from sound_level import set_volume, parse_volume_text
def signal_handler(sig, frame):
    """
    Handle Ctrl+C gracefully.

    Releases the wake word and STT resources and exits with status 0.
    Each cleanup step is isolated so that a failure in one of them neither
    skips the other nor prevents the process from exiting.

    Args:
        sig: Signal number (unused; None when called directly)
        frame: Current stack frame (unused; None when called directly)
    """
    print("\n\n👋 Завершение работы...")
    for release in (cleanup_wakeword, cleanup_stt):
        try:
            release()
        except Exception as e:
            print(f"⚠️ Ошибка при освобождении ресурсов: {e}")
    sys.exit(0)
def _handle_volume_command(user_text: str) -> None:
    """Apply a spoken "громкость <число>" command and speak the outcome."""
    try:
        # Remove "громкость" prefix and strip whitespace
        vol_str = user_text.lower().replace("громкость", "", 1).strip()
        level = parse_volume_text(vol_str)
        if level is None:
            speak("Я не понял число громкости. Скажите число от одного до десяти.")
        elif set_volume(level):
            speak(f"Громкость установлена на {level}")
        else:
            speak("Не удалось установить громкость.")
    except Exception as e:
        print(f"❌ Ошибка громкости: {e}")
        speak("Не удалось изменить громкость.")


def main():
    """
    Main application loop.

    Repeats forever: wait for the wake word, listen to the user, then either
    handle a volume command locally or send the text to the AI, clean the
    answer and speak it. If the spoken answer is interrupted by the wake
    word, the next iteration starts listening immediately.
    """
    # Needed only here to tell a wake-word interruption apart from a TTS
    # failure (speak() returns False for both).
    from tts import was_interrupted

    print("=" * 50)
    print("🔊 УМНАЯ КОЛОНКА")
    print("=" * 50)
    print("Скажите 'Alexandr' для активации")
    print("Нажмите Ctrl+C для выхода")
    print("=" * 50)
    print()

    # Setup signal handler for graceful exit
    signal.signal(signal.SIGINT, signal_handler)

    # Pre-initialize TTS model (takes a few seconds)
    print("⏳ Инициализация...")
    init_tts()
    print()

    # True when the previous answer was cut off by the wake word, so the
    # user is already addressing the assistant.
    skip_wakeword = False

    while True:
        try:
            # Step 1: Wait for wake word
            if not skip_wakeword:
                wait_for_wakeword()
            skip_wakeword = False

            # Step 2: Listen to user speech
            user_text = listen(timeout_seconds=7.0)
            if not user_text:
                speak("Извините, я вас не расслышал. Попробуйте ещё раз.")
                continue

            # Volume commands are handled locally, without calling the AI
            if user_text.lower().startswith("громкость"):
                _handle_volume_command(user_text)
                continue

            # Step 3: Send to AI
            ai_response = ask_ai(user_text)

            # Step 4: Clean response
            clean_text = clean_response(ai_response)

            # Step 5: Speak response (with wake word interrupt support)
            completed = speak(clean_text, check_interrupt=check_wakeword_once)

            if not completed:
                # Only a real wake-word interruption may skip the wake-word
                # wait; after a TTS failure we go back to waiting for it.
                if was_interrupted():
                    print("⏹️ Ответ прерван - слушаю следующий вопрос")
                    skip_wakeword = True
                continue

            print()
            print("-" * 30)
            print()
            # Step 6: Loop continues...
        except KeyboardInterrupt:
            signal_handler(None, None)
        except Exception as e:
            print(f"❌ Ошибка: {e}")
            speak("Произошла ошибка. Попробуйте ещё раз.")


if __name__ == "__main__":
    main()

26
requirements.txt Normal file
View File

@@ -0,0 +1,26 @@
# Smart Speaker Dependencies
# Python 3.12.8
# Wake word detection
pvporcupine>=3.0.0
# Speech-to-Text
vosk>=0.3.45
# Audio
pyaudio>=0.2.14
sounddevice>=0.4.6
# AI API
requests>=2.31.0
# Environment
python-dotenv>=1.0.0
# TTS (Silero)
torch>=2.0.0
torchaudio>=2.0.0
omegaconf>=2.3.0
# Utils
numpy>=1.24.0

70
sound_level.py Normal file
View File

@@ -0,0 +1,70 @@
"""
Volume control module.
Regulates system volume on a scale from 1 to 10.
"""
import subprocess
import re
NUMBER_MAP = {
"один": 1, "раз": 1, "два": 2, "три": 3, "четыре": 4,
"пять": 5, "шесть": 6, "семь": 7, "восемь": 8, "девять": 9, "десять": 10
}
def set_volume(level: int) -> bool:
"""
Set system volume (1-10 corresponding to 10%-100%).
Args:
level: Integer between 1 and 10
Returns:
True if successful, False otherwise
"""
if not isinstance(level, int):
print(f"❌ Ошибка: Уровень громкости должен быть целым числом, получено {type(level)}")
return False
if level < 1:
level = 1
elif level > 10:
level = 10
percentage = level * 10
try:
# Set volume using amixer
# -q: quiet
# sset: set simple control
# Master: control name
# %: percentage
cmd = ["amixer", "-q", "sset", "Master", f"{percentage}%"]
subprocess.run(cmd, check=True)
print(f"🔊 Громкость установлена на {level} ({percentage}%)")
return True
except subprocess.CalledProcessError as e:
print(f"❌ Ошибка при установке громкости: {e}")
return False
except Exception as e:
print(f"❌ Неизвестная ошибка громкости: {e}")
return False
def parse_volume_text(text: str) -> int | None:
"""
Parse volume level from text (digits or Russian words).
Returns integer 1-10 or None if not found.
"""
text = text.lower()
# 1. Check for digits
num_match = re.search(r'\b(10|[1-9])\b', text)
if num_match:
return int(num_match.group())
# 2. Check for words
for word, value in NUMBER_MAP.items():
if word in text:
return value
return None

122
stt.py Normal file
View File

@@ -0,0 +1,122 @@
"""
Speech-to-Text module using Vosk.
Recognizes Russian speech from microphone.
"""
import json
import pyaudio
from vosk import Model, KaldiRecognizer
from config import VOSK_MODEL_PATH, SAMPLE_RATE
class SpeechRecognizer:
    """Speech recognizer using Vosk."""

    # Number of audio frames read from the microphone per chunk
    CHUNK_SIZE = 4096
    # Silent chunks tolerated after speech has started
    # (10 chunks of 4096 frames at 16 kHz is about 2.5 seconds)
    MAX_SILENCE_CHUNKS = 10

    def __init__(self):
        self.model = None
        self.recognizer = None
        self.pa = None
        self.stream = None

    def initialize(self):
        """Initialize Vosk model and audio stream."""
        print("📦 Загрузка модели Vosk...")
        self.model = Model(str(VOSK_MODEL_PATH))
        self.recognizer = KaldiRecognizer(self.model, SAMPLE_RATE)
        self.recognizer.SetWords(True)
        self.pa = pyaudio.PyAudio()
        self.stream = self.pa.open(
            rate=SAMPLE_RATE,
            channels=1,
            format=pyaudio.paInt16,
            input=True,
            frames_per_buffer=self.CHUNK_SIZE
        )
        print("✅ Модель Vosk загружена")

    def listen(self, timeout_seconds: float = 5.0) -> str:
        """
        Listen to microphone and transcribe speech.

        Stops at the first non-empty recognized phrase, when the timeout
        elapses, or when the speaker falls silent after having started to
        speak. Silence before any speech does not end listening early, so
        the user has the whole timeout to begin talking.

        Args:
            timeout_seconds: Maximum time to listen for speech

        Returns:
            Transcribed text from speech (empty string if nothing recognized)
        """
        if self.model is None:
            self.initialize()

        print("🎙️ Слушаю... (говорите)")

        # Reset recognizer for new recognition
        self.recognizer = KaldiRecognizer(self.model, SAMPLE_RATE)

        chunks_to_read = int(SAMPLE_RATE * timeout_seconds / self.CHUNK_SIZE)
        silent_chunks = 0
        heard_speech = False

        for _ in range(chunks_to_read):
            data = self.stream.read(self.CHUNK_SIZE, exception_on_overflow=False)

            if self.recognizer.AcceptWaveform(data):
                result = json.loads(self.recognizer.Result())
                text = result.get("text", "").strip()
                if text:
                    print(f"📝 Распознано: {text}")
                    return text
                silent_chunks += 1
            elif json.loads(self.recognizer.PartialResult()).get("partial", ""):
                # A non-empty partial result means speech is in progress
                heard_speech = True
                silent_chunks = 0
            else:
                silent_chunks += 1

            # Stop if too much silence after speech
            if heard_speech and silent_chunks > self.MAX_SILENCE_CHUNKS:
                break

        # Get final result
        result = json.loads(self.recognizer.FinalResult())
        text = result.get("text", "").strip()
        if text:
            print(f"📝 Распознано: {text}")
        else:
            print("⚠️ Речь не распознана")
        return text

    def cleanup(self):
        """
        Release resources.

        Safe to call more than once: handles are dropped after being
        released, and the model reference is cleared so that a later
        listen() re-initializes instead of reading from a closed stream.
        """
        if self.stream is not None:
            self.stream.close()
            self.stream = None
        if self.pa is not None:
            self.pa.terminate()
            self.pa = None
        self.recognizer = None
        self.model = None
# Module-wide recognizer, created lazily on first use
_recognizer = None


def get_recognizer() -> SpeechRecognizer:
    """Return the shared SpeechRecognizer, constructing it if none exists yet."""
    global _recognizer
    if _recognizer is not None:
        return _recognizer
    _recognizer = SpeechRecognizer()
    return _recognizer
def listen(timeout_seconds: float = 5.0) -> str:
    """
    Transcribe speech from the microphone using the shared recognizer.

    Args:
        timeout_seconds: Maximum time to listen for speech

    Returns:
        Transcribed text
    """
    recognizer = get_recognizer()
    return recognizer.listen(timeout_seconds)
def cleanup():
    """Release the shared recognizer's resources and forget the instance."""
    global _recognizer
    if not _recognizer:
        # Nothing was ever created, so there is nothing to release
        return
    _recognizer.cleanup()
    _recognizer = None

178
tts.py Normal file
View File

@@ -0,0 +1,178 @@
"""
Text-to-Speech module using Silero TTS.
Generates natural Russian speech with Xenia voice.
Supports interruption via wake word detection using threading.
"""
import torch
import sounddevice as sd
import numpy as np
import threading
import time
from config import TTS_SPEAKER, TTS_SAMPLE_RATE
class TextToSpeech:
    """Text-to-Speech using Silero TTS with wake word interruption support."""

    def __init__(self):
        self.model = None
        self.sample_rate = TTS_SAMPLE_RATE
        self.speaker = TTS_SPEAKER
        # Set by the checker thread when playback was cut short
        self._interrupted = False
        # Tells the checker thread that playback is over
        self._stop_flag = threading.Event()

    def initialize(self):
        """Initialize Silero TTS model."""
        print("📦 Загрузка модели Silero TTS...")
        # Load Silero TTS model
        self.model, _ = torch.hub.load(
            repo_or_dir='snakers4/silero-models',
            model='silero_tts',
            language='ru',
            speaker='v4_ru'
        )
        print(f"✅ Модель TTS загружена (голос: {self.speaker})")

    def speak(self, text: str, check_interrupt=None) -> bool:
        """
        Convert text to speech and play it.

        Args:
            text: Text to synthesize and speak
            check_interrupt: Optional callback function that returns True if playback should stop

        Returns:
            True if playback completed normally (or there was nothing to
            say), False if it was interrupted or synthesis/playback failed.
            Use was_interrupted to tell the two False cases apart.
        """
        if not text or not text.strip():
            return True

        if self.model is None:
            self.initialize()

        print(f"🔊 Озвучивание: {text[:50]}...")
        self._interrupted = False
        self._stop_flag.clear()

        try:
            # Generate audio
            audio = self.model.apply_tts(
                text=text,
                speaker=self.speaker,
                sample_rate=self.sample_rate
            )
            # Convert to numpy array
            audio_np = audio.numpy()

            if check_interrupt:
                # Play with interrupt checking in parallel thread
                return self._play_with_interrupt(audio_np, check_interrupt)

            # Standard playback
            sd.play(audio_np, self.sample_rate)
            sd.wait()
            print("✅ Воспроизведение завершено")
            return True
        except Exception as e:
            print(f"❌ Ошибка TTS: {e}")
            return False

    def _check_interrupt_worker(self, check_interrupt):
        """
        Worker thread that continuously checks for interrupt signal.

        Runs until the stop flag is set or an interrupt is detected. A
        detection that arrives after the stop flag was set is ignored, so a
        finished playback is never reported as interrupted.
        """
        error_reported = False
        while not self._stop_flag.is_set():
            try:
                if not check_interrupt():
                    continue
                if self._stop_flag.is_set():
                    # Playback already ended while the check was running
                    return
                self._interrupted = True
                sd.stop()
                print("⏹️ Воспроизведение прервано!")
                return
            except Exception as e:
                # Best effort: a failing check must not break playback, but
                # report it once and back off instead of spinning silently.
                if not error_reported:
                    print(f"⚠️ Ошибка проверки прерывания: {e}")
                    error_reported = True
                self._stop_flag.wait(0.1)

    def _play_with_interrupt(self, audio_np: np.ndarray, check_interrupt) -> bool:
        """
        Play audio with interrupt checking in parallel thread.

        Args:
            audio_np: Audio data as numpy array
            check_interrupt: Callback that returns True if should interrupt

        Returns:
            True if completed normally, False if interrupted
        """
        # Start playback first (non-blocking) so the checker can never call
        # sd.stop() before there is anything to stop.
        sd.play(audio_np, self.sample_rate)

        checker_thread = threading.Thread(
            target=self._check_interrupt_worker,
            args=(check_interrupt,),
            daemon=True
        )
        checker_thread.start()

        try:
            # Wait for playback to finish or interrupt
            while sd.get_stream().active:
                if self._interrupted:
                    break
                time.sleep(0.05)
        finally:
            # Signal checker thread to stop
            self._stop_flag.set()
            checker_thread.join(timeout=0.5)

        if self._interrupted:
            return False
        print("✅ Воспроизведение завершено")
        return True

    @property
    def was_interrupted(self) -> bool:
        """Check if the last playback was interrupted."""
        return self._interrupted
# Module-wide TTS engine, created lazily on first use
_tts = None


def get_tts() -> TextToSpeech:
    """Return the shared TextToSpeech instance, constructing it if none exists yet."""
    global _tts
    if _tts is not None:
        return _tts
    _tts = TextToSpeech()
    return _tts
def speak(text: str, check_interrupt=None) -> bool:
    """
    Speak the given text through the shared TTS engine.

    Args:
        text: Text to speak
        check_interrupt: Optional callback for interrupt checking

    Returns:
        The result of the engine's speak(): True if completed normally,
        False otherwise
    """
    engine = get_tts()
    return engine.speak(text, check_interrupt)
def was_interrupted() -> bool:
    """Report whether the most recent speak() call was interrupted."""
    engine = get_tts()
    return engine.was_interrupted
def initialize():
    """Load the TTS model ahead of the first speak() call."""
    engine = get_tts()
    engine.initialize()

113
wakeword.py Normal file
View File

@@ -0,0 +1,113 @@
"""
Wake word detection module using Porcupine.
Listens for the "Alexandr" wake word.
"""
import pvporcupine
import pyaudio
import struct
from config import PORCUPINE_ACCESS_KEY, PORCUPINE_KEYWORD_PATH
class WakeWordDetector:
    """Detects wake word using Porcupine."""

    def __init__(self):
        self.porcupine = None
        self.audio_stream = None
        self.pa = None

    def initialize(self):
        """Initialize Porcupine and audio stream."""
        self.porcupine = pvporcupine.create(
            access_key=PORCUPINE_ACCESS_KEY,
            keyword_paths=[str(PORCUPINE_KEYWORD_PATH)]
        )
        self.pa = pyaudio.PyAudio()
        self.audio_stream = self.pa.open(
            rate=self.porcupine.sample_rate,
            channels=1,
            format=pyaudio.paInt16,
            input=True,
            frames_per_buffer=self.porcupine.frame_length
        )

    def _next_frame_has_wakeword(self) -> bool:
        """Read one frame from the microphone and run it through Porcupine."""
        frame_length = self.porcupine.frame_length
        raw = self.audio_stream.read(frame_length, exception_on_overflow=False)
        # The stream is opened as paInt16, hence one "h" per sample
        pcm = struct.unpack_from("h" * frame_length, raw)
        return self.porcupine.process(pcm) >= 0

    def wait_for_wakeword(self) -> bool:
        """
        Blocks until wake word is detected.

        Returns:
            True when wake word is detected
        """
        if self.porcupine is None:
            self.initialize()

        # Announced on every wait, not just on first initialization
        print("🎤 Ожидание wake word 'Alexandr'...")
        while not self._next_frame_has_wakeword():
            pass
        print("✅ Wake word обнаружен!")
        return True

    def check_wakeword_once(self) -> bool:
        """
        Check a single audio frame for the wake word.

        Blocks only for as long as it takes to read one frame, which makes
        it suitable for polling in a loop.

        Returns:
            True if wake word detected, False otherwise (including when the
            frame could not be read or processed)
        """
        if self.porcupine is None:
            self.initialize()

        try:
            detected = self._next_frame_has_wakeword()
        except Exception:
            # Deliberate best effort: a failed poll is treated as "no wake
            # word" so that the caller's polling loop keeps running.
            return False

        if detected:
            print("🛑 Wake word обнаружен во время ответа!")
        return detected

    def cleanup(self):
        """
        Release resources.

        Safe to call more than once: every handle is dropped after being
        released, so a later call re-initializes instead of using a closed
        stream or a deleted Porcupine handle.
        """
        if self.audio_stream is not None:
            self.audio_stream.close()
            self.audio_stream = None
        if self.pa is not None:
            self.pa.terminate()
            self.pa = None
        if self.porcupine is not None:
            self.porcupine.delete()
            self.porcupine = None
# Module-wide wake word detector, created lazily on first use
_detector = None


def get_detector() -> WakeWordDetector:
    """Return the shared WakeWordDetector, constructing it if none exists yet."""
    global _detector
    if _detector is not None:
        return _detector
    _detector = WakeWordDetector()
    return _detector
def wait_for_wakeword() -> bool:
    """Block on the shared detector until the wake word is heard."""
    detector = get_detector()
    return detector.wait_for_wakeword()
def cleanup():
    """Release the shared detector's resources and forget the instance."""
    global _detector
    if not _detector:
        # Nothing was ever created, so there is nothing to release
        return
    _detector.cleanup()
    _detector = None
def check_wakeword_once() -> bool:
    """
    Run a single wake word check on the shared detector.

    Returns:
        True if wake word detected, False otherwise
    """
    detector = get_detector()
    return detector.check_wakeword_once()