ускоренная работа
This commit is contained in:
@@ -13,7 +13,7 @@ import sys
|
||||
import json
|
||||
import pyaudio
|
||||
from vosk import Model, KaldiRecognizer
|
||||
from ..core.config import VOSK_MODEL_PATH, SAMPLE_RATE
|
||||
from config import VOSK_MODEL_PATH, SAMPLE_RATE
|
||||
|
||||
|
||||
class LocalRecognizer:
|
||||
|
||||
@@ -20,6 +20,7 @@ from deepgram import (
|
||||
)
|
||||
import deepgram.clients.common.v1.abstract_sync_websocket as sdk_ws
|
||||
import websockets.sync.client
|
||||
from ..core.audio_manager import get_audio_manager
|
||||
|
||||
# --- Патч (исправление) для библиотеки websockets ---
|
||||
# По умолчанию Deepgram SDK использует слишком короткий таймаут подключения.
|
||||
@@ -63,7 +64,7 @@ class SpeechRecognizer:
|
||||
)
|
||||
self.dg_client = DeepgramClient(DEEPGRAM_API_KEY, config)
|
||||
|
||||
self.pa = pyaudio.PyAudio()
|
||||
self.pa = get_audio_manager().get_pyaudio()
|
||||
print("✅ Deepgram клиент готов")
|
||||
|
||||
def _get_stream(self):
|
||||
@@ -135,38 +136,71 @@ class SpeechRecognizer:
|
||||
channels=1,
|
||||
sample_rate=SAMPLE_RATE,
|
||||
interim_results=True,
|
||||
utterance_end_ms=1200, # Пауза 1.2с считается концом фразы
|
||||
utterance_end_ms=1000, # Пауза 1.0с считается концом фразы (было 1.2)
|
||||
vad_events=True,
|
||||
)
|
||||
|
||||
if dg_connection.start(options) is False:
|
||||
print("Failed to start Deepgram connection")
|
||||
return
|
||||
|
||||
# --- Задача отправки аудио ---
|
||||
# --- Задача отправки аудио с буферизацией ---
|
||||
async def send_audio():
|
||||
chunks_sent = 0
|
||||
audio_buffer = [] # Буфер для накопления звука во время подключения
|
||||
|
||||
try:
|
||||
# 1. Сразу начинаем захват звука, не дожидаясь сети!
|
||||
stream.start_stream()
|
||||
print("🎤 Stream started, sending audio...")
|
||||
print("🎤 Stream started (buffering)...")
|
||||
|
||||
# 2. Запускаем подключение к Deepgram в фоне (через ThreadPool, т.к. start() блокирующий)
|
||||
# Но в данном SDK start() возвращает bool, он может быть блокирующим.
|
||||
# Deepgram Python SDK v3+ start() делает handshake.
|
||||
|
||||
connect_future = loop.run_in_executor(
|
||||
None, lambda: dg_connection.start(options)
|
||||
)
|
||||
|
||||
# Пока подключаемся, копим данные
|
||||
while not connect_future.done():
|
||||
if stream.is_active():
|
||||
data = stream.read(4096, exception_on_overflow=False)
|
||||
audio_buffer.append(data)
|
||||
await asyncio.sleep(0.001)
|
||||
|
||||
# Проверяем результат подключения
|
||||
if connect_future.result() is False:
|
||||
print("Failed to start Deepgram connection")
|
||||
return
|
||||
|
||||
print(f"🚀 Connected! Sending buffer ({len(audio_buffer)} chunks)...")
|
||||
|
||||
# 3. Отправляем накопленный буфер
|
||||
for chunk in audio_buffer:
|
||||
dg_connection.send(chunk)
|
||||
chunks_sent += 1
|
||||
|
||||
audio_buffer = None # Освобождаем память
|
||||
|
||||
# 4. Продолжаем стримить в реальном времени
|
||||
while not stop_event.is_set():
|
||||
if stream.is_active():
|
||||
data = stream.read(4096, exception_on_overflow=False)
|
||||
# Отправка данных (синхронная в этой версии SDK)
|
||||
dg_connection.send(data)
|
||||
chunks_sent += 1
|
||||
if chunks_sent % 50 == 0:
|
||||
print(f".", end="", flush=True)
|
||||
# Уступаем время другим задачам
|
||||
await asyncio.sleep(0.005)
|
||||
|
||||
except Exception as e:
|
||||
print(f"Audio send error: {e}")
|
||||
finally:
|
||||
stream.stop_stream()
|
||||
if stream.is_active():
|
||||
stream.stop_stream()
|
||||
print(f"\n🛑 Stream stopped. Chunks sent: {chunks_sent}")
|
||||
|
||||
sender_task = asyncio.create_task(send_audio())
|
||||
|
||||
if False: # dg_connection.start(options) перенесен внутрь send_audio
|
||||
pass
|
||||
|
||||
try:
|
||||
# 1. Ждем начала речи (если задан detection_timeout)
|
||||
if detection_timeout:
|
||||
@@ -254,8 +288,7 @@ class SpeechRecognizer:
|
||||
self.stream.stop_stream()
|
||||
self.stream.close()
|
||||
self.stream = None
|
||||
if self.pa:
|
||||
self.pa.terminate()
|
||||
# self.pa.terminate() - Используем общий менеджер
|
||||
|
||||
|
||||
# Глобальный экземпляр
|
||||
|
||||
@@ -10,6 +10,7 @@ import pvporcupine
|
||||
import pyaudio
|
||||
import struct
|
||||
from ..core.config import PORCUPINE_ACCESS_KEY, PORCUPINE_KEYWORD_PATH
|
||||
from ..core.audio_manager import get_audio_manager
|
||||
|
||||
|
||||
class WakeWordDetector:
|
||||
@@ -28,7 +29,8 @@ class WakeWordDetector:
|
||||
access_key=PORCUPINE_ACCESS_KEY, keyword_paths=[str(PORCUPINE_KEYWORD_PATH)]
|
||||
)
|
||||
|
||||
self.pa = pyaudio.PyAudio()
|
||||
# Используем общий экземпляр PyAudio
|
||||
self.pa = get_audio_manager().get_pyaudio()
|
||||
self._open_stream()
|
||||
print("🎤 Ожидание wake word 'Alexandr'...")
|
||||
|
||||
@@ -138,8 +140,7 @@ class WakeWordDetector:
|
||||
def cleanup(self):
|
||||
"""Освобождение ресурсов при выходе."""
|
||||
self.stop_monitoring()
|
||||
if self.pa:
|
||||
self.pa.terminate()
|
||||
# self.pa.terminate() - Не делаем этого, так как PyAudio общий
|
||||
if self.porcupine:
|
||||
self.porcupine.delete()
|
||||
|
||||
|
||||
@@ -93,6 +93,63 @@ def ask_ai(messages_history: list) -> str:
|
||||
return response
|
||||
|
||||
|
||||
def ask_ai_stream(messages_history: list):
|
||||
"""
|
||||
Generator that yields chunks of the AI response as they arrive.
|
||||
"""
|
||||
if not messages_history:
|
||||
yield "Извините, я не расслышал вашу команду."
|
||||
return
|
||||
|
||||
# Log the last user message
|
||||
last_user_message = "Unknown"
|
||||
for msg in reversed(messages_history):
|
||||
if msg["role"] == "user":
|
||||
last_user_message = msg["content"]
|
||||
break
|
||||
print(f"🤖 Запрос к AI (Stream): {last_user_message}")
|
||||
|
||||
messages = [{"role": "system", "content": SYSTEM_PROMPT}] + list(messages_history)
|
||||
|
||||
headers = {
|
||||
"Authorization": f"Bearer {PERPLEXITY_API_KEY}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
payload = {
|
||||
"model": PERPLEXITY_MODEL,
|
||||
"messages": messages,
|
||||
"max_tokens": 500,
|
||||
"temperature": 1.0,
|
||||
"stream": True, # Enable streaming
|
||||
}
|
||||
|
||||
try:
|
||||
response = requests.post(
|
||||
PERPLEXITY_API_URL, headers=headers, json=payload, timeout=30, stream=True
|
||||
)
|
||||
response.raise_for_status()
|
||||
|
||||
import json
|
||||
|
||||
for line in response.iter_lines():
|
||||
if line:
|
||||
line_text = line.decode("utf-8")
|
||||
if line_text.startswith("data: "):
|
||||
data_str = line_text[6:] # Skip "data: "
|
||||
if data_str == "[DONE]":
|
||||
break
|
||||
try:
|
||||
data_json = json.loads(data_str)
|
||||
content = data_json["choices"][0]["delta"].get("content", "")
|
||||
if content:
|
||||
yield content
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
except Exception as e:
|
||||
print(f"❌ Streaming Error: {e}")
|
||||
yield "Произошла ошибка связи."
|
||||
|
||||
|
||||
def translate_text(text: str, source_lang: str, target_lang: str) -> str:
|
||||
"""
|
||||
Запрос к AI в режиме перевода.
|
||||
|
||||
27
app/core/audio_manager.py
Normal file
27
app/core/audio_manager.py
Normal file
@@ -0,0 +1,27 @@
|
||||
import pyaudio
|
||||
import threading
|
||||
|
||||
|
||||
class AudioManager:
|
||||
_instance = None
|
||||
_lock = threading.Lock()
|
||||
|
||||
def __new__(cls):
|
||||
with cls._lock:
|
||||
if cls._instance is None:
|
||||
cls._instance = super(AudioManager, cls).__new__(cls)
|
||||
cls._instance.pa = pyaudio.PyAudio()
|
||||
print("🔊 AudioManager: PyAudio initialized (Global)")
|
||||
return cls._instance
|
||||
|
||||
def get_pyaudio(self):
|
||||
return self.pa
|
||||
|
||||
def cleanup(self):
|
||||
if self.pa:
|
||||
self.pa.terminate()
|
||||
self.pa = None
|
||||
|
||||
|
||||
def get_audio_manager():
|
||||
return AudioManager()
|
||||
@@ -44,6 +44,14 @@ PREPOSITION_CASES = {
|
||||
"перед": "ablt",
|
||||
"за": "ablt",
|
||||
"между": "ablt",
|
||||
"около": "gent",
|
||||
"против": "gent",
|
||||
"вместо": "gent",
|
||||
"кроме": "gent",
|
||||
"из-за": "gent",
|
||||
"сквозь": "accs",
|
||||
"через": "accs",
|
||||
"про": "accs",
|
||||
}
|
||||
|
||||
# Соответствие падежей pymorphy и библиотеки num2words
|
||||
@@ -60,6 +68,13 @@ PYMORPHY_TO_NUM2WORDS = {
|
||||
"loc2": "prepositional",
|
||||
}
|
||||
|
||||
# Соответствие родов pymorphy и num2words
|
||||
PYMORPHY_TO_GENDER = {
|
||||
"masc": "m",
|
||||
"femn": "f",
|
||||
"neut": "n",
|
||||
}
|
||||
|
||||
# Названия месяцев в родительном падеже (для поиска дат в тексте)
|
||||
MONTHS_GENITIVE = [
|
||||
"января",
|
||||
@@ -123,6 +138,12 @@ def numbers_to_words(text: str) -> str:
|
||||
|
||||
nw_case = PYMORPHY_TO_NUM2WORDS.get(case_tag, "nominative")
|
||||
|
||||
# FIX: Pymorphy часто определяет "год" как accs (винительный), что для num2words
|
||||
# превращается в родительный (для одушевленных?), давая "2024 года".
|
||||
# Если предлога нет, принудительно ставим именительный.
|
||||
if not prep and year_word.lower().startswith("год"):
|
||||
nw_case = "nominative"
|
||||
|
||||
# Конвертируем число в порядковое числительное (тысяча девятьсот девяносто девятом)
|
||||
words = convert_number(
|
||||
year_str, context_type="ordinal", case=nw_case, gender="m"
|
||||
@@ -171,9 +192,9 @@ def numbers_to_words(text: str) -> str:
|
||||
prefix = f"{prep} " if prep else ""
|
||||
return f"{prefix}{words} {month_word}"
|
||||
|
||||
# Конкатенация regex для месяцев (ВАЖНО: month_regex должен быть вставлен в строку)
|
||||
# Конкатенация regex для месяцев (FIX: используем f-строку)
|
||||
text = re.sub(
|
||||
r"(?i)\b((?:с|к|до|от|на|по)\s+)?(\d{1,2})\s+({month_regex})\b",
|
||||
rf"(?i)\b((?:с|к|до|от|на|по)\s+)?(\d{{1,2}})\s+({month_regex})\b",
|
||||
replace_date_match,
|
||||
text,
|
||||
)
|
||||
@@ -182,20 +203,41 @@ def numbers_to_words(text: str) -> str:
|
||||
def replace_cardinal_match(match):
|
||||
prep = match.group(1)
|
||||
num_str = match.group(2)
|
||||
next_word = match.group(3)
|
||||
|
||||
case = "nominative"
|
||||
gender = "m"
|
||||
|
||||
if prep:
|
||||
morph_case = get_case_from_preposition(prep.strip())
|
||||
if morph_case:
|
||||
case = PYMORPHY_TO_NUM2WORDS.get(morph_case, "nominative")
|
||||
|
||||
words = convert_number(num_str, context_type="cardinal", case=case)
|
||||
# Если есть следующее слово, проверяем его род (для "2 минуты" -> "две")
|
||||
if next_word:
|
||||
word_clean = next_word.strip()
|
||||
parsed = morph.parse(word_clean)[0]
|
||||
if "NOUN" in parsed.tag:
|
||||
morph_gender = parsed.tag.gender
|
||||
gender = PYMORPHY_TO_GENDER.get(morph_gender, "m")
|
||||
|
||||
words = convert_number(
|
||||
num_str, context_type="cardinal", case=case, gender=gender
|
||||
)
|
||||
|
||||
# Если конвертация вернула пустую строку (сбой?), возвращаем цифры
|
||||
if not words:
|
||||
words = num_str
|
||||
|
||||
prefix = f"{prep} " if prep else ""
|
||||
# suffix removed (lookahead)
|
||||
return f"{prefix}{words}"
|
||||
|
||||
# Регулярка теперь захватывает (опционально) следующее слово для определения рода
|
||||
|
||||
preps_list = "|".join(map(re.escape, PREPOSITION_CASES.keys()))
|
||||
text = re.sub(
|
||||
r"(?i)\b((?:в|на|о|об|обо|при|у|от|до|из|с|со|без|для|вокруг|после|к|ко|по|над|под|перед|за|между)\s+)?(\d+(?:[.,]\d+)?)\b",
|
||||
rf"(?i)\b((?:{preps_list})\s+)?(\d+(?:[.,]\d+)?)(?=(\s+[а-яА-ЯёЁ]+))?\b",
|
||||
replace_cardinal_match,
|
||||
text,
|
||||
)
|
||||
@@ -234,13 +276,13 @@ def clean_response(text: str, language: str = "ru") -> str:
|
||||
# Удаление заголовков Markdown (# Header)
|
||||
text = re.sub(r"^#{1,6}\s*", "", text, flags=re.MULTILINE)
|
||||
|
||||
# Удаление картинок  -> удаляем полностью
|
||||
text = re.sub(r"!\x5B([^\x5D]*)\x5D\([^)]+\)", "", text)
|
||||
|
||||
# Удаление ссылок [text](url) -> оставляем только text
|
||||
# \x5B = [, \x5D = ]
|
||||
text = re.sub(r"\x5B([^\x5D]+)\x5D\([^)]+\)", r"\1", text)
|
||||
|
||||
# Удаление картинок  -> удаляем полностью
|
||||
text = re.sub(r"!\x5B([^\x5D]*)\x5D\([^)]+\)", "", text)
|
||||
|
||||
# Удаление inline кода `code`
|
||||
text = re.sub(r"`([^`]+)`", r"\1", text)
|
||||
|
||||
|
||||
131
app/main.py
131
app/main.py
@@ -16,8 +16,15 @@ Flow:
|
||||
|
||||
import signal
|
||||
import sys
|
||||
import threading
|
||||
import queue
|
||||
import re
|
||||
import os
|
||||
from collections import deque
|
||||
|
||||
# Для воспроизведения звуков (mp3)
|
||||
from pygame import mixer
|
||||
|
||||
# Импорт наших модулей
|
||||
from .audio.wakeword import (
|
||||
wait_for_wakeword,
|
||||
@@ -26,7 +33,7 @@ from .audio.wakeword import (
|
||||
stop_monitoring as stop_wakeword_monitoring,
|
||||
)
|
||||
from .audio.stt import listen, cleanup as cleanup_stt, get_recognizer
|
||||
from .core.ai import ask_ai, translate_text
|
||||
from .core.ai import ask_ai, ask_ai_stream, translate_text
|
||||
from .core.cleaner import clean_response
|
||||
from .audio.tts import speak, initialize as init_tts
|
||||
from .audio.sound_level import set_volume, parse_volume_text
|
||||
@@ -124,8 +131,19 @@ def main():
|
||||
# Устанавливаем перехватчик Ctrl+C
|
||||
signal.signal(signal.SIGINT, signal_handler)
|
||||
|
||||
# Предварительная инициализация моделей (занимает пару секунд при старте)
|
||||
# Предварительная инициализация моделей
|
||||
print("⏳ Инициализация моделей...")
|
||||
|
||||
# Инициализация звуковой системы для эффектов
|
||||
mixer.init()
|
||||
ding_sound_path = "assets/sounds/ding.wav"
|
||||
ding_sound = None
|
||||
if os.path.exists(ding_sound_path):
|
||||
ding_sound = mixer.Sound(ding_sound_path)
|
||||
ding_sound.set_volume(0.3)
|
||||
else:
|
||||
print(f"⚠️ Звук {ding_sound_path} не найден")
|
||||
|
||||
get_recognizer().initialize() # Подключение к Deepgram
|
||||
init_tts() # Загрузка нейросети для синтеза речи (Silero)
|
||||
alarm_clock = get_alarm_clock() # Загрузка будильников
|
||||
@@ -163,6 +181,10 @@ def main():
|
||||
if not detected:
|
||||
continue
|
||||
|
||||
# Воспроизводим звук активации
|
||||
if ding_sound:
|
||||
ding_sound.play()
|
||||
|
||||
# Фраза услышана! Слушаем команду пользователя (7 секунд тишины макс)
|
||||
user_text = listen(timeout_seconds=7.0)
|
||||
else:
|
||||
@@ -205,7 +227,8 @@ def main():
|
||||
]
|
||||
# Проверяем точное совпадение или если фраза начинается с "повтори" (но не "повтори за мной")
|
||||
if user_text_lower in repeat_phrases or (
|
||||
user_text_lower.startswith("повтори") and "за мной" not in user_text_lower
|
||||
user_text_lower.startswith("повтори")
|
||||
and "за мной" not in user_text_lower
|
||||
):
|
||||
if last_response:
|
||||
print(f"🔁 Повторяю: {last_response}")
|
||||
@@ -296,39 +319,103 @@ def main():
|
||||
print("⏹️ Перевод прерван - слушаю следующий вопрос")
|
||||
continue
|
||||
|
||||
# --- Шаг 3: Запрос к AI (обычный чат) ---
|
||||
# --- Шаг 3: Запрос к AI (Streaming) ---
|
||||
# Добавляем сообщение пользователя в историю
|
||||
chat_history.append({"role": "user", "content": user_text})
|
||||
|
||||
# Отправляем историю диалога в Perplexity
|
||||
ai_response = ask_ai(list(chat_history))
|
||||
# Очередь для предложений, которые нужно озвучить
|
||||
tts_q = queue.Queue()
|
||||
# Флаг прерывания для worker-а
|
||||
interrupt_event = threading.Event()
|
||||
|
||||
# Добавляем ответ AI в историю
|
||||
chat_history.append({"role": "assistant", "content": ai_response})
|
||||
def tts_worker():
|
||||
"""Фоновый поток, читающий предложения из очереди и озвучивающий их."""
|
||||
while True:
|
||||
item = tts_q.get()
|
||||
if item is None: # Poison pill (сигнал остановки)
|
||||
tts_q.task_done()
|
||||
break
|
||||
|
||||
# --- Шаг 4: Очистка ответа ---
|
||||
# Убираем Markdown (**жирный**, *курсив*) и готовим числа для озвучки
|
||||
clean_text = clean_response(ai_response, language="ru")
|
||||
text, lang = item
|
||||
|
||||
# Сохраняем последний ответ для функции "еще раз"
|
||||
last_response = clean_text
|
||||
# Если уже было прерывание, просто пропускаем (чистим очередь)
|
||||
if interrupt_event.is_set():
|
||||
tts_q.task_done()
|
||||
continue
|
||||
|
||||
# --- Шаг 5: Озвучка ответа ---
|
||||
# check_interrupt=check_wakeword_once позволяет прервать речь, сказав "Alexandr"
|
||||
completed = speak(
|
||||
clean_text, check_interrupt=check_wakeword_once, language="ru"
|
||||
)
|
||||
# Озвучиваем
|
||||
completed = speak(
|
||||
text, check_interrupt=check_wakeword_once, language=lang
|
||||
)
|
||||
|
||||
# После озвучки обязательно закрываем поток микрофона, который открывался для проверки прерывания
|
||||
if not completed:
|
||||
interrupt_event.set() # Сообщаем всем, что нас перебили
|
||||
|
||||
tts_q.task_done()
|
||||
|
||||
# Запускаем поток озвучки
|
||||
worker_thread = threading.Thread(target=tts_worker, daemon=True)
|
||||
worker_thread.start()
|
||||
|
||||
full_response = ""
|
||||
buffer = ""
|
||||
|
||||
try:
|
||||
# Получаем генератор потока от AI
|
||||
stream_generator = ask_ai_stream(list(chat_history))
|
||||
|
||||
print("🤖 AI говорит: ", end="", flush=True)
|
||||
|
||||
for chunk in stream_generator:
|
||||
# Если в процессе генерации нас перебили (на ранних фразах), прерываем получение
|
||||
if interrupt_event.is_set():
|
||||
break
|
||||
|
||||
buffer += chunk
|
||||
full_response += chunk
|
||||
print(chunk, end="", flush=True)
|
||||
|
||||
# Проверяем на конец предложения (. ! ? + пробел или конец строки)
|
||||
# Эвристика: ищем знаки препинания, после которых идет пробел или перевод строки
|
||||
if re.search(r"[.!?\n]+(?:\s|$)", buffer):
|
||||
# Очищаем и отправляем в очередь
|
||||
clean_chunk = clean_response(buffer, language="ru")
|
||||
if clean_chunk.strip():
|
||||
tts_q.put((clean_chunk, "ru"))
|
||||
buffer = ""
|
||||
|
||||
# Отправляем остаток (если есть)
|
||||
if buffer.strip() and not interrupt_event.is_set():
|
||||
clean_chunk = clean_response(buffer, language="ru")
|
||||
if clean_chunk.strip():
|
||||
tts_q.put((clean_chunk, "ru"))
|
||||
|
||||
except Exception as e:
|
||||
print(f"\n❌ Ошибка стриминга: {e}")
|
||||
speak("Произошла ошибка при получении ответа.")
|
||||
|
||||
# Ждем, пока все договорится
|
||||
# Добавляем poison pill, чтобы поток завершился, когда очередь пуста
|
||||
tts_q.put(None)
|
||||
worker_thread.join()
|
||||
|
||||
print() # Перенос строки после вывода AI
|
||||
|
||||
# Добавляем полный ответ AI в историю
|
||||
chat_history.append({"role": "assistant", "content": full_response})
|
||||
|
||||
# Сохраняем для "повтори"
|
||||
last_response = clean_response(full_response, language="ru")
|
||||
|
||||
# После озвучки обязательно закрываем поток микрофона
|
||||
stop_wakeword_monitoring()
|
||||
|
||||
# Включаем режим диалога (следующий запрос можно говорить без имени)
|
||||
skip_wakeword = True
|
||||
|
||||
if not completed:
|
||||
if interrupt_event.is_set():
|
||||
print("⏹️ Ответ прерван - слушаю следующий вопрос")
|
||||
# Если перебили, значит есть новый вопрос, сразу слушаем его (цикл перезапустится)
|
||||
pass
|
||||
# Если перебили, цикл перезапустится и skip_wakeword уже True
|
||||
|
||||
print()
|
||||
print("-" * 30)
|
||||
|
||||
BIN
assets/sounds/ding.wav
Normal file
BIN
assets/sounds/ding.wav
Normal file
Binary file not shown.
@@ -71,3 +71,4 @@ urllib3==2.6.2
|
||||
vosk==0.3.45
|
||||
websockets==15.0.1
|
||||
yarl==1.22.0
|
||||
pygame
|
||||
|
||||
28
scripts/generate_ding.py
Normal file
28
scripts/generate_ding.py
Normal file
@@ -0,0 +1,28 @@
|
||||
import wave
|
||||
import math
|
||||
import struct
|
||||
|
||||
|
||||
def generate_ding(filename="assets/sounds/ding.wav", frequency=800, duration=0.15):
|
||||
sample_rate = 44100
|
||||
n_frames = int(sample_rate * duration)
|
||||
|
||||
with wave.open(filename, "w") as wav_file:
|
||||
wav_file.setnchannels(1)
|
||||
wav_file.setsampwidth(2)
|
||||
wav_file.setframerate(sample_rate)
|
||||
|
||||
data = []
|
||||
for i in range(n_frames):
|
||||
# Затухающая синусоида
|
||||
t = i / sample_rate
|
||||
value = int(
|
||||
32767.0 * math.sin(2 * math.pi * frequency * t) * (1 - t / duration)
|
||||
)
|
||||
data.append(struct.pack("<h", value))
|
||||
|
||||
wav_file.writeframes(b"".join(data))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
generate_ding()
|
||||
Reference in New Issue
Block a user