Files
smart-speaker/app/core/ai.py
2026-04-09 21:03:02 +03:00

826 lines
30 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""AI module."""
import json
import re
from typing import Optional
import requests
from .config import (
AI_CHAT_MAX_CHARS,
AI_PROVIDER,
AI_CHAT_MAX_TOKENS,
AI_CHAT_TEMPERATURE,
AI_INTENT_TEMPERATURE,
AI_TRANSLATION_TEMPERATURE,
ANTHROPIC_API_KEY,
ANTHROPIC_API_URL,
ANTHROPIC_API_VERSION,
ANTHROPIC_MODEL,
GEMINI_API_KEY,
GEMINI_API_URL,
GEMINI_MODEL,
OLLAMA_API_URL,
OLLAMA_MODEL,
OPENAI_API_KEY,
OPENAI_API_URL,
OPENAI_MODEL,
OPENROUTER_API_KEY,
OPENROUTER_API_URL,
OPENROUTER_MODEL,
WAKE_WORD,
WAKE_WORD_ALIASES,
ZAI_API_KEY,
ZAI_API_URL,
ZAI_MODEL,
)
_HTTP = requests.Session()
_CITATION_SQUARE_RE = re.compile(r"(?:\s*\[\d+\])+")
_CITATION_FULLWIDTH_RE = re.compile(r"\d+[^】]*】")
_PUNCT_SPACING_RE = re.compile(r"\s+([,.;:!?…])")
_SENTENCE_BOUNDARY_RE = re.compile(r"([.!?…])\s+")
_SENTENCE_SPLIT_RE = re.compile(r"(?<=[.!?…])\s+")
# Системный промпт
_wake_word_aliases_text = ", ".join(WAKE_WORD_ALIASES)
SYSTEM_PROMPT = f"""Ты — умный голосовой ассистент с человеческим поведением.
Веди себя как живой человек: будь дружелюбным, естественным и немного эмоциональным, где это уместно.
Твоя главная цель — помогать пользователю и поддерживать интересный диалог.
Отвечай на русском языке кратко и по существу: обычно 1-2 коротких предложения.
Если пользователь явно просит подробнее, можно до 4 коротких предложений без повторов и лишних вводных.
Избегай длинных списков, сложного форматирования и спецсимволов, так как твои ответы озвучиваются голосом.
Не добавляй ссылки, сноски и маркеры источников (например, [1], [2], URL).
Пиши в разговорном стиле, как при живом общении, но не забывай о вежливости и правильности твоих ответов.
Понимай юмор, иронию, сарказм, образные выражения, намеки и переносный смысл фраз.
Если пользователь шутит или говорит образно, сначала правильно восстанови его реальное намерение, затем ответь естественно и по смыслу.
Если в шутке или метафоре скрыта команда или просьба, трактуй ее по смыслу, а не буквально.
ВАЖНО: Не используй в ответах панибратские или сленговые приветствия и обращения, такие как "Эй", "Хэй", "Слушай" в начале фразы и подобные.
Тебя активируют словом "{WAKE_WORD}". Никогда не произноси это слово и его варианты ({_wake_word_aliases_text}) ни в каком ответе.
Если пользователь спрашивает, как тебя зовут или как к тебе обращаться, отвечай нейтрально: "Я ваш голосовой ассистент"."""
SYSTEM_PROMPT += (
'\nROLE_JSON: {"name":"голосовой ассистент","role":"умный голосовой ассистент",'
'"language":"ru","style":["дружелюбный","естественный","краткий"],"format":"plain"}'
)
# Промпт для перевода
TRANSLATION_SYSTEM_PROMPT = """You are a translation engine.
Translate from {source} to {target}.
Return 2-3 short translation variants only.
No explanations, no quotes, no comments.
Separate variants with " / " (space slash space).
Keep the translation максимально кратким и естественным, без лишних слов."""
INTENT_SYSTEM_PROMPT = """Ты NLU-модуль голосовой колонки.
Твоя задача: распознать намерение пользователя и вернуть СТРОГО JSON без markdown и пояснений.
Всегда возвращай объект c ключами:
{
"intent": "none|music|timer|alarm|weather|volume|translation|cities|repeat|stop|smalltalk|chat",
"normalized_command": "<краткая нормализованная команда на русском или пусто>",
"music_action": "none|play|pause|resume|next|previous|current|play_genre|play_folder|play_query",
"music_query": "<запрос для музыки/жанра/папки или пусто>",
"confidence": 0.0
}
Правила:
- Если это музыка, ставь intent=music и выбирай music_action.
- "Включи музыку" и любые эквиваленты = music_action=play.
- Для "пауза/останови музыку/выключи музыку" = music_action=pause.
- Для "что играет" = music_action=current.
- Для "включи жанр X" = music_action=play_genre, music_query=X.
- Для "включи папку X" = music_action=play_folder, music_query=X.
- Если это будильник, ставь intent=alarm и нормализуй команду в одну из форм:
1) Создание/изменение: "поставь будильник на HH:MM [по будням|по выходным|каждый день|по <дням>]"
2) Показ списка: "покажи активные будильники"
3) Удаление конкретного: "удали будильник на HH:MM [по будням|по выходным|по <дням>]"
4) Удаление всех: "отмени все будильники"
- Если пользователь просит поставить/удалить будильник, но время не названо, normalized_command должен быть:
"поставь будильник" или "удали будильник".
- normalized_command должен быть пригоден для командного парсера (без лишних слов).
- Понимай разговорные, шутливые, переносные, косвенные и ироничные формулировки.
- Восстанавливай намерение по смыслу, а не только по буквальным словам.
- Если в фразе есть скрытая прикладная команда для колонки, верни соответствующий intent и normalized_command.
- Если пользователь просто шутит или разговаривает без прикладной команды, выбирай smalltalk или chat, а не случайную системную команду.
- Если уверенность низкая, ставь intent=none, music_action=none, confidence <= 0.4."""
_PROVIDER_ALIASES = {
"": "openrouter",
"anthropic": "anthropic",
"claude": "anthropic",
"claude_anthropic": "anthropic",
"gemini": "gemini",
"google": "gemini",
"olama": "ollama",
"ollama": "ollama",
"openai": "openai",
"openrouter": "openrouter",
"z.ai": "zai",
"z-ai": "zai",
"z_ai": "zai",
"zai": "zai",
}
# В .env нужен только один AI-ключ
_PROVIDER_SETTINGS = {
"openrouter": {
"provider": "openrouter",
"protocol": "openai_compatible",
"api_key": OPENROUTER_API_KEY,
"model": OPENROUTER_MODEL,
"api_url": OPENROUTER_API_URL,
"name": "OpenRouter",
"key_var": "OPENROUTER_API_KEY",
"model_var": "OPENROUTER_MODEL",
},
"openai": {
"provider": "openai",
"protocol": "openai_compatible",
"api_key": OPENAI_API_KEY,
"model": OPENAI_MODEL,
"api_url": OPENAI_API_URL,
"name": "OpenAI",
"key_var": "OPENAI_API_KEY",
"model_var": "OPENAI_MODEL",
},
"gemini": {
"provider": "gemini",
"protocol": "openai_compatible",
"api_key": GEMINI_API_KEY,
"model": GEMINI_MODEL,
"api_url": GEMINI_API_URL,
"name": "Gemini",
"key_var": "GEMINI_API_KEY",
"model_var": "GEMINI_MODEL",
},
"zai": {
"provider": "zai",
"protocol": "openai_compatible",
"api_key": ZAI_API_KEY,
"model": ZAI_MODEL,
"api_url": ZAI_API_URL,
"name": "Z.ai",
"key_var": "ZAI_API_KEY",
"model_var": "ZAI_MODEL",
"extra_headers": {
"Accept-Language": "en-US,en",
},
},
"anthropic": {
"provider": "anthropic",
"protocol": "anthropic",
"api_key": ANTHROPIC_API_KEY,
"model": ANTHROPIC_MODEL,
"api_url": ANTHROPIC_API_URL,
"api_version": ANTHROPIC_API_VERSION,
"name": "Anthropic Claude",
"key_var": "ANTHROPIC_API_KEY",
"model_var": "ANTHROPIC_MODEL",
},
"ollama": {
"provider": "ollama",
"protocol": "openai_compatible",
# Ollama обычно локальный и не требует API key.
"api_key": None,
"requires_api_key": False,
"model": OLLAMA_MODEL,
"api_url": OLLAMA_API_URL,
"name": "Ollama",
"key_var": "OLLAMA_API_KEY",
"model_var": "OLLAMA_MODEL",
},
}
def _has_active_api_key(value) -> bool:
return bool(str(value or "").strip())
def _normalize_provider_name(provider_name: str) -> str:
normalized = (provider_name or "").strip().lower()
return _PROVIDER_ALIASES.get(normalized, normalized)
def _get_provider_settings():
"""Определяет какой AI провайдер использовать."""
# Ищем активные ключи
configured = [
cfg
for cfg in _PROVIDER_SETTINGS.values()
if _has_active_api_key(cfg.get("api_key"))
]
if len(configured) == 1:
cfg = configured[0]
requested = _normalize_provider_name(AI_PROVIDER)
if (
requested
and requested in _PROVIDER_SETTINGS
and requested != cfg["provider"]
):
print(
f"⚠️ AI_PROVIDER={AI_PROVIDER!r} не совпадает с единственным "
f"активным ключом {cfg['name']}. Используем {cfg['name']}."
)
return cfg, None
if len(configured) > 1:
names = ", ".join(cfg["name"] for cfg in configured)
return None, (
"Обнаружено несколько AI API ключей. Оставьте незакомментированным "
f"только один ключ. Сейчас активны: {names}. "
"Колонка не знает, какой AI использовать."
)
provider = _normalize_provider_name(AI_PROVIDER)
cfg = _PROVIDER_SETTINGS.get(provider)
if cfg:
return cfg, None
supported = ", ".join(sorted(_PROVIDER_SETTINGS))
print(
f"⚠️ Неизвестный AI_PROVIDER={AI_PROVIDER!r}, используем OpenRouter. "
f"Поддерживаются: {supported}."
)
return _PROVIDER_SETTINGS["openrouter"], None
def _content_to_text(content) -> str:
if isinstance(content, str):
return content
if isinstance(content, list):
parts = []
for item in content:
if isinstance(item, dict):
text = item.get("text")
if text:
parts.append(str(text))
elif item is not None:
parts.append(str(item))
return "".join(parts)
if content is None:
return ""
return str(content)
def _get_provider_config_error(cfg) -> Optional[str]:
if not cfg:
return "Не настроен AI-провайдер. Проверьте файл .env."
if cfg.get("requires_api_key", True) and not cfg.get("api_key"):
return f"Не настроен {cfg['key_var']}. Проверьте файл .env."
if not cfg["model"]:
return f"Не настроен {cfg['model_var']}. Проверьте файл .env."
return None
def _build_headers(cfg):
if cfg["protocol"] == "anthropic":
return {
"x-api-key": cfg["api_key"],
"anthropic-version": cfg["api_version"],
"Content-Type": "application/json",
}
headers = {"Content-Type": "application/json"}
if cfg.get("api_key"):
headers["Authorization"] = f"Bearer {cfg['api_key']}"
headers.update(cfg.get("extra_headers") or {})
return headers
def _split_system_messages(messages):
"""Извлекает system prompt из списка сообщений."""
system_parts = []
chat_messages = []
for message in messages:
role = str(message.get("role") or "").strip().lower()
content = _content_to_text(message.get("content"))
if role == "system":
if content:
system_parts.append(content)
continue
if role not in ("user", "assistant"):
role = "user"
chat_messages.append({"role": role, "content": content})
return "\n\n".join(system_parts), chat_messages
def _build_payload(cfg, messages, max_tokens, temperature, stream):
if cfg["protocol"] == "anthropic":
# У Claude схема чуть отличается: system не живет внутри messages.
system_prompt, chat_messages = _split_system_messages(messages)
payload = {
"model": cfg["model"],
"messages": chat_messages,
"max_tokens": max_tokens,
"temperature": temperature,
"stream": stream,
}
if system_prompt:
payload["system"] = system_prompt
return payload
return {
"model": cfg["model"],
"messages": messages,
"max_tokens": max_tokens,
"temperature": temperature,
"stream": stream,
}
def _extract_openai_compatible_content(data: dict) -> str:
choices = data.get("choices") or []
if not choices:
return ""
message = choices[0].get("message") or {}
return _content_to_text(message.get("content"))
def _extract_anthropic_content(data: dict) -> str:
return _content_to_text(data.get("content"))
def _extract_response_content(cfg, data: dict) -> str:
if cfg["protocol"] == "anthropic":
return _extract_anthropic_content(data)
return _extract_openai_compatible_content(data)
def _iter_openai_compatible_stream(response):
for data_str in _iter_sse_data_lines(response):
if data_str == "[DONE]":
break
try:
data_json = json.loads(data_str)
except json.JSONDecodeError:
continue
choices = data_json.get("choices") or []
if not choices:
continue
delta = choices[0].get("delta") or {}
content = delta.get("content", "")
if isinstance(content, str):
if content:
yield content
continue
# Некоторые OpenAI-compatible API присылают контент кусками-объектами.
if isinstance(content, list):
for item in content:
if isinstance(item, dict):
text = item.get("text")
if text:
yield str(text)
def _iter_anthropic_stream(response):
for data_str in _iter_sse_data_lines(response):
if data_str == "[DONE]":
break
try:
data_json = json.loads(data_str)
except json.JSONDecodeError:
continue
chunk_type = data_json.get("type")
# Claude отдает поток событиями разных типов, нас интересует только текст.
if chunk_type == "content_block_start":
content_block = data_json.get("content_block") or {}
text = content_block.get("text")
if text:
yield str(text)
elif chunk_type == "content_block_delta":
delta = data_json.get("delta") or {}
text = delta.get("text")
if text:
yield str(text)
def _iter_sse_data_lines(response):
"""
Читает SSE-стрим и возвращает только payload после "data:".
Явно декодируем как UTF-8, чтобы избежать mojibake вида "Пр...".
"""
for raw_line in response.iter_lines(decode_unicode=False):
if not raw_line:
continue
if isinstance(raw_line, bytes):
line = raw_line.decode("utf-8", errors="replace")
else:
line = str(raw_line)
if not line.startswith("data:"):
continue
data_str = line[5:].strip()
if data_str:
yield data_str
def _iter_stream_chunks(cfg, response):
if cfg["protocol"] == "anthropic":
yield from _iter_anthropic_stream(response)
return
yield from _iter_openai_compatible_stream(response)
def _log_request_exception(cfg, error: Exception):
details = ""
response = getattr(error, "response", None)
if response is not None:
body = (response.text or "").strip()
if body:
details = f" | body={body[:400]}"
print(f"❌ Ошибка API ({cfg['name']}): {error}{details}")
def _extract_json_object(raw_text: str) -> Optional[dict]:
text = str(raw_text or "").strip()
if not text:
return None
try:
payload = json.loads(text)
if isinstance(payload, dict):
return payload
except json.JSONDecodeError:
pass
match = re.search(r"\{.*\}", text, flags=re.DOTALL)
if not match:
return None
candidate = match.group(0).strip()
try:
payload = json.loads(candidate)
except json.JSONDecodeError:
return None
if isinstance(payload, dict):
return payload
return None
def _sanitize_chat_response(text: str) -> str:
cleaned = str(text or "")
if not cleaned:
return ""
cleaned = _CITATION_SQUARE_RE.sub("", cleaned)
cleaned = _CITATION_FULLWIDTH_RE.sub("", cleaned)
cleaned = _PUNCT_SPACING_RE.sub(r"\1", cleaned)
cleaned = re.sub(r"[ \t]+", " ", cleaned)
cleaned = re.sub(r"\n{3,}", "\n\n", cleaned)
return cleaned.strip()
def _truncate_chat_response(text: str, max_chars: int) -> str:
cleaned = str(text or "").strip()
if not cleaned:
return ""
safe_limit = max(120, int(max_chars))
if len(cleaned) <= safe_limit:
return cleaned
sentences = [part.strip() for part in _SENTENCE_SPLIT_RE.split(cleaned) if part.strip()]
if sentences:
selected = []
current_length = 0
for sentence in sentences:
projected = current_length + len(sentence) + (1 if selected else 0)
if projected > safe_limit:
break
selected.append(sentence)
current_length = projected
if selected:
result = " ".join(selected).rstrip(" ,;:-")
if result and result[-1] not in ".!?…":
result += "."
return result
# Если первое предложение слишком длинное, режем аккуратно по слову.
first = sentences[0]
else:
first = cleaned
clipped = first[:safe_limit].rstrip()
word_boundary = clipped.rfind(" ")
if word_boundary >= int(safe_limit * 0.6):
clipped = clipped[:word_boundary].rstrip()
clipped = clipped.rstrip(" ,;:-")
if clipped.endswith((".", "!", "?", "")):
return clipped
return f"{clipped}..."
def _send_request(messages, max_tokens, temperature, error_text):
"""
Внутренняя функция для отправки HTTP-запроса к выбранному AI-провайдеру.
Args:
messages: Список сообщений (история чата).
max_tokens: Максимальная длина ответа.
temperature: "Креативность" (0.2 - строго, 1.0 - креативно).
error_text: Текст ошибки для пользователя в случае сбоя.
"""
cfg, selection_error = _get_provider_settings()
if selection_error:
return selection_error
config_error = _get_provider_config_error(cfg)
if config_error:
return config_error
try:
# Обычный запрос нужен для перевода и мест, где стриминг не требуется.
response = _HTTP.post(
cfg["api_url"],
headers=_build_headers(cfg),
json=_build_payload(cfg, messages, max_tokens, temperature, stream=False),
timeout=15,
)
response.raise_for_status()
data = response.json()
content = _extract_response_content(cfg, data)
if not content:
return "Не удалось обработать ответ от AI."
return content
except requests.exceptions.Timeout:
return f"Извините, сервер {cfg['name']} не отвечает. Попробуйте позже."
except requests.exceptions.RequestException as error:
_log_request_exception(cfg, error)
return error_text
except Exception as error:
print(f"❌ Ошибка парсинга ответа ({cfg['name']}): {error}")
return "Не удалось обработать ответ от AI."
def interpret_assistant_intent(text: str) -> dict:
"""
Interprets voice command semantics for downstream command routers.
Returns a normalized dict even when AI is unavailable.
"""
result = {
"intent": "none",
"normalized_command": "",
"music_action": "none",
"music_query": "",
"confidence": 0.0,
}
cleaned_text = str(text or "").strip()
if not cleaned_text:
return result
cfg, selection_error = _get_provider_settings()
if selection_error:
return result
if _get_provider_config_error(cfg):
return result
messages = [
{"role": "system", "content": INTENT_SYSTEM_PROMPT},
{"role": "user", "content": cleaned_text},
]
response = _send_request(
messages,
max_tokens=220,
temperature=AI_INTENT_TEMPERATURE,
error_text="",
)
payload = _extract_json_object(response)
if not payload:
return result
allowed_intents = {
"none",
"music",
"timer",
"alarm",
"weather",
"volume",
"translation",
"cities",
"repeat",
"stop",
"smalltalk",
"chat",
}
allowed_music_actions = {
"none",
"play",
"pause",
"resume",
"next",
"previous",
"current",
"play_genre",
"play_folder",
"play_query",
}
intent = str(payload.get("intent", "none")).strip().lower()
if intent not in allowed_intents:
intent = "none"
music_action = str(payload.get("music_action", "none")).strip().lower()
if music_action not in allowed_music_actions:
music_action = "none"
try:
confidence = float(payload.get("confidence", 0.0))
except (TypeError, ValueError):
confidence = 0.0
confidence = max(0.0, min(1.0, confidence))
normalized_command = str(payload.get("normalized_command", "")).strip()
music_query = str(payload.get("music_query", "")).strip()
result.update(
{
"intent": intent,
"normalized_command": normalized_command,
"music_action": music_action,
"music_query": music_query,
"confidence": confidence,
}
)
return result
def ask_ai(messages_history: list) -> str:
"""
Запрос к AI в режиме чата.
Принимает историю переписки, добавляет SYSTEM_PROMPT и отправляет запрос.
"""
if not messages_history:
return "Извините, я не расслышал вашу команду."
# Логирование последнего запроса
last_user_message = "Unknown"
for msg in reversed(messages_history):
if msg["role"] == "user":
last_user_message = msg["content"]
break
print(f"🤖 Запрос к AI: {last_user_message}")
# Формируем полный список сообщений с системной инструкцией в начале
messages = [{"role": "system", "content": SYSTEM_PROMPT}] + list(messages_history)
response = _send_request(
messages,
max_tokens=AI_CHAT_MAX_TOKENS,
temperature=AI_CHAT_TEMPERATURE,
error_text="Произошла ошибка при обращении к AI. Попробуйте ещё раз.",
)
response = _sanitize_chat_response(response)
response = _truncate_chat_response(response, AI_CHAT_MAX_CHARS)
if response:
print(f"💬 Ответ AI: {response[:100]}...")
return response
def ask_ai_stream(messages_history: list):
"""
Generator that yields chunks of the AI response as they arrive.
"""
response = None
cfg, selection_error = _get_provider_settings()
if selection_error:
yield selection_error
return
config_error = _get_provider_config_error(cfg)
if config_error:
yield config_error
return
if not messages_history:
yield "Извините, я не расслышал вашу команду."
return
# Log the last user message
last_user_message = "Unknown"
for msg in reversed(messages_history):
if msg["role"] == "user":
last_user_message = msg["content"]
break
print(f"🤖 Запрос к AI ({cfg['name']}, Stream): {last_user_message}")
messages = [{"role": "system", "content": SYSTEM_PROMPT}] + list(messages_history)
try:
# В голосовом режиме удобнее говорить частями, как только они приходят от API.
response = _HTTP.post(
cfg["api_url"],
headers=_build_headers(cfg),
json=_build_payload(
cfg,
messages,
AI_CHAT_MAX_TOKENS,
AI_CHAT_TEMPERATURE,
stream=True,
),
timeout=15,
stream=True,
)
response.raise_for_status()
# Для устойчивости TTS сначала собираем поток, затем чистим и аккуратно
# ограничиваем длину по границе предложения.
raw_parts = []
for chunk in _iter_stream_chunks(cfg, response):
if chunk:
raw_parts.append(chunk)
full_text = _sanitize_chat_response("".join(raw_parts))
full_text = _truncate_chat_response(full_text, AI_CHAT_MAX_CHARS)
if not full_text:
return
# Отдаем кусками по предложениям, чтобы main.py мог начинать озвучку раньше.
parts = _SENTENCE_BOUNDARY_RE.split(full_text)
if not parts:
yield full_text
return
sentence = ""
for part in parts:
if not part:
continue
sentence += part
if part in ".!?…":
yield sentence.strip() + " "
sentence = ""
if sentence.strip():
yield sentence.strip()
except requests.exceptions.Timeout:
yield f"Извините, сервер {cfg['name']} не отвечает. Попробуйте позже."
except requests.exceptions.RequestException as error:
_log_request_exception(cfg, error)
yield "Произошла ошибка связи."
except Exception as error:
print(f"❌ Streaming Error ({cfg['name']}): {error}")
yield "Произошла ошибка связи."
finally:
if response is not None:
try:
response.close()
except Exception:
pass
def translate_text(text: str, source_lang: str, target_lang: str) -> str:
"""
Запрос к AI в режиме перевода.
Использует специальный промпт для переводчика.
"""
if not text:
return "Извините, я не расслышал текст для перевода."
lang_names = {"ru": "Russian", "en": "English"}
source_name = lang_names.get(source_lang, source_lang)
target_name = lang_names.get(target_lang, target_lang)
print(f"🌍 Перевод: {source_name} -> {target_name}: {text[:60]}...")
# Формируем промпт с подстановкой языков
messages = [
{
"role": "system",
"content": TRANSLATION_SYSTEM_PROMPT.format(
source=source_name, target=target_name
),
},
{"role": "user", "content": text},
]
response = _send_request(
messages,
max_tokens=160,
temperature=AI_TRANSLATION_TEMPERATURE,
error_text="Произошла ошибка при переводе. Попробуйте ещё раз.",
)
cleaned = _sanitize_chat_response(response).strip()
cleaned = re.sub(r"[*_`]+", "", cleaned)
if not cleaned:
return cleaned
# Normalize to 2-3 variants separated by " / "
parts = []
for chunk in re.split(r"(?:\s*/\s*|\n|;|\|)", cleaned):
item = chunk.strip(" \t-•\"'“”«»")
if item:
parts.append(item)
if not parts:
return cleaned
parts = parts[:3]
return " / ".join(parts)