Files
smart-speaker/app/core/ai.py

535 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""AI module with pluggable providers."""
# Модуль общения с искусственным интеллектом.
# Обрабатывает запросы пользователя и переводы через выбранный API-провайдер.
import json
import re
from typing import Optional
import requests
from .config import (
AI_PROVIDER,
ANTHROPIC_API_KEY,
ANTHROPIC_API_URL,
ANTHROPIC_API_VERSION,
ANTHROPIC_MODEL,
GEMINI_API_KEY,
GEMINI_API_URL,
GEMINI_MODEL,
OPENAI_API_KEY,
OPENAI_API_URL,
OPENAI_MODEL,
PERPLEXITY_API_KEY,
PERPLEXITY_API_URL,
PERPLEXITY_MODEL,
ZAI_API_KEY,
ZAI_API_URL,
ZAI_MODEL,
)
_HTTP = requests.Session()
# Системный промпт (инструкция) для AI.
# Задает личность ассистента: имя "Александр", стиль общения, краткость.
SYSTEM_PROMPT = """Ты — Александр, умный голосовой ассистент с человеческим поведением.
Веди себя как живой человек: будь дружелюбным, естественным и немного эмоциональным, где это уместно.
Твоя главная цель — помогать пользователю и поддерживать интересный диалог.
Отвечай кратко и по существу, на русском языке.
Избегай длинных списков, сложного форматирования и спецсимволов, так как твои ответы озвучиваются голосом.
Пиши в разговорном стиле, как при живом общении, но не забывай о вежливости и правильности твоих ответов.
ВАЖНО: Не используй в ответах панибратские или сленговые приветствия и обращения, такие как "Эй", "Хэй", "Слушай" в начале фразы и подобные."""
SYSTEM_PROMPT += (
'\nROLE_JSON: {"name":"Александр","role":"умный голосовой ассистент",'
'"language":"ru","style":["дружелюбный","естественный","краткий"],"format":"plain"}'
)
# Системный промпт для режима переводчика.
# Требует возвращать ТОЛЬКО перевод, без лишних слов ("Конечно, вот перевод...").
TRANSLATION_SYSTEM_PROMPT = """You are a translation engine.
Translate from {source} to {target}.
Return 2-3 short translation variants only.
No explanations, no quotes, no comments.
Separate variants with " / " (space slash space).
Keep the translation максимально кратким и естественным, без лишних слов."""
_PROVIDER_ALIASES = {
"": "perplexity",
"anthropic": "anthropic",
"claude": "anthropic",
"claude_anthropic": "anthropic",
"gemini": "gemini",
"google": "gemini",
"openai": "openai",
"perplexity": "perplexity",
"z.ai": "zai",
"z-ai": "zai",
"z_ai": "zai",
"zai": "zai",
}
# В реальном .env у пользователя должен быть активен только один AI-ключ.
# Поэтому настройки храним в одном словаре, а ниже отдельно проверяем конфликт
# конфигурации, чтобы ассистент не делал "лучшее предположение" молча.
_PROVIDER_SETTINGS = {
"perplexity": {
"provider": "perplexity",
"protocol": "openai_compatible",
"api_key": PERPLEXITY_API_KEY,
"model": PERPLEXITY_MODEL,
"api_url": PERPLEXITY_API_URL,
"name": "Perplexity",
"key_var": "PERPLEXITY_API_KEY",
"model_var": "PERPLEXITY_MODEL",
},
"openai": {
"provider": "openai",
"protocol": "openai_compatible",
"api_key": OPENAI_API_KEY,
"model": OPENAI_MODEL,
"api_url": OPENAI_API_URL,
"name": "OpenAI",
"key_var": "OPENAI_API_KEY",
"model_var": "OPENAI_MODEL",
},
"gemini": {
"provider": "gemini",
"protocol": "openai_compatible",
"api_key": GEMINI_API_KEY,
"model": GEMINI_MODEL,
"api_url": GEMINI_API_URL,
"name": "Gemini",
"key_var": "GEMINI_API_KEY",
"model_var": "GEMINI_MODEL",
},
"zai": {
"provider": "zai",
"protocol": "openai_compatible",
"api_key": ZAI_API_KEY,
"model": ZAI_MODEL,
"api_url": ZAI_API_URL,
"name": "Z.ai",
"key_var": "ZAI_API_KEY",
"model_var": "ZAI_MODEL",
"extra_headers": {
"Accept-Language": "en-US,en",
},
},
"anthropic": {
"provider": "anthropic",
"protocol": "anthropic",
"api_key": ANTHROPIC_API_KEY,
"model": ANTHROPIC_MODEL,
"api_url": ANTHROPIC_API_URL,
"api_version": ANTHROPIC_API_VERSION,
"name": "Anthropic Claude",
"key_var": "ANTHROPIC_API_KEY",
"model_var": "ANTHROPIC_MODEL",
},
}
def _has_active_api_key(value) -> bool:
return bool(str(value or "").strip())
def _normalize_provider_name(provider_name: str) -> str:
normalized = (provider_name or "").strip().lower()
return _PROVIDER_ALIASES.get(normalized, normalized)
def _get_provider_settings():
# Сначала ищем реально активные ключи. Это главный источник истины:
# если ключ один, используем именно его, даже если AI_PROVIDER указан иначе.
configured = [
cfg
for cfg in _PROVIDER_SETTINGS.values()
if _has_active_api_key(cfg.get("api_key"))
]
if len(configured) == 1:
cfg = configured[0]
requested = _normalize_provider_name(AI_PROVIDER)
if requested and requested in _PROVIDER_SETTINGS and requested != cfg["provider"]:
print(
f"⚠️ AI_PROVIDER={AI_PROVIDER!r} не совпадает с единственным "
f"активным ключом {cfg['name']}. Используем {cfg['name']}."
)
return cfg, None
if len(configured) > 1:
names = ", ".join(cfg["name"] for cfg in configured)
return None, (
"Обнаружено несколько AI API ключей. Оставьте незакомментированным "
f"только один ключ. Сейчас активны: {names}. "
"Колонка не знает, какой AI использовать."
)
provider = _normalize_provider_name(AI_PROVIDER)
cfg = _PROVIDER_SETTINGS.get(provider)
if cfg:
return cfg, None
supported = ", ".join(sorted(_PROVIDER_SETTINGS))
print(
f"⚠️ Неизвестный AI_PROVIDER={AI_PROVIDER!r}, используем Perplexity. "
f"Поддерживаются: {supported}."
)
return _PROVIDER_SETTINGS["perplexity"], None
def _content_to_text(content) -> str:
if isinstance(content, str):
return content
if isinstance(content, list):
parts = []
for item in content:
if isinstance(item, dict):
text = item.get("text")
if text:
parts.append(str(text))
elif item is not None:
parts.append(str(item))
return "".join(parts)
if content is None:
return ""
return str(content)
def _get_provider_config_error(cfg) -> Optional[str]:
if not cfg:
return "Не настроен AI-провайдер. Проверьте файл .env."
if not cfg["api_key"]:
return f"Не настроен {cfg['key_var']}. Проверьте файл .env."
if not cfg["model"]:
return f"Не настроен {cfg['model_var']}. Проверьте файл .env."
return None
def _build_headers(cfg):
if cfg["protocol"] == "anthropic":
return {
"x-api-key": cfg["api_key"],
"anthropic-version": cfg["api_version"],
"Content-Type": "application/json",
}
headers = {
"Authorization": f"Bearer {cfg['api_key']}",
"Content-Type": "application/json",
}
headers.update(cfg.get("extra_headers") or {})
return headers
def _split_system_messages(messages):
system_parts = []
chat_messages = []
for message in messages:
role = str(message.get("role") or "").strip().lower()
content = _content_to_text(message.get("content"))
if role == "system":
if content:
system_parts.append(content)
continue
if role not in ("user", "assistant"):
role = "user"
chat_messages.append({"role": role, "content": content})
# Anthropic хранит системную инструкцию отдельно от обычной истории чата.
return "\n\n".join(system_parts), chat_messages
def _build_payload(cfg, messages, max_tokens, temperature, stream):
if cfg["protocol"] == "anthropic":
# У Claude схема чуть отличается: system не живет внутри messages.
system_prompt, chat_messages = _split_system_messages(messages)
payload = {
"model": cfg["model"],
"messages": chat_messages,
"max_tokens": max_tokens,
"temperature": temperature,
"stream": stream,
}
if system_prompt:
payload["system"] = system_prompt
return payload
return {
"model": cfg["model"],
"messages": messages,
"max_tokens": max_tokens,
"temperature": temperature,
"stream": stream,
}
def _extract_openai_compatible_content(data: dict) -> str:
choices = data.get("choices") or []
if not choices:
return ""
message = choices[0].get("message") or {}
return _content_to_text(message.get("content"))
def _extract_anthropic_content(data: dict) -> str:
return _content_to_text(data.get("content"))
def _extract_response_content(cfg, data: dict) -> str:
if cfg["protocol"] == "anthropic":
return _extract_anthropic_content(data)
return _extract_openai_compatible_content(data)
def _iter_openai_compatible_stream(response):
for line in response.iter_lines(decode_unicode=True):
if not line or not line.startswith("data:"):
continue
data_str = line[5:].strip()
if data_str == "[DONE]":
break
try:
data_json = json.loads(data_str)
except json.JSONDecodeError:
continue
choices = data_json.get("choices") or []
if not choices:
continue
delta = choices[0].get("delta") or {}
content = delta.get("content", "")
if isinstance(content, str):
if content:
yield content
continue
# Некоторые OpenAI-compatible API присылают контент кусками-объектами.
if isinstance(content, list):
for item in content:
if isinstance(item, dict):
text = item.get("text")
if text:
yield str(text)
def _iter_anthropic_stream(response):
for line in response.iter_lines(decode_unicode=True):
if not line or not line.startswith("data:"):
continue
data_str = line[5:].strip()
if data_str == "[DONE]":
break
try:
data_json = json.loads(data_str)
except json.JSONDecodeError:
continue
chunk_type = data_json.get("type")
# Claude отдает поток событиями разных типов, нас интересует только текст.
if chunk_type == "content_block_start":
content_block = data_json.get("content_block") or {}
text = content_block.get("text")
if text:
yield str(text)
elif chunk_type == "content_block_delta":
delta = data_json.get("delta") or {}
text = delta.get("text")
if text:
yield str(text)
def _iter_stream_chunks(cfg, response):
if cfg["protocol"] == "anthropic":
yield from _iter_anthropic_stream(response)
return
yield from _iter_openai_compatible_stream(response)
def _log_request_exception(cfg, error: Exception):
details = ""
response = getattr(error, "response", None)
if response is not None:
body = (response.text or "").strip()
if body:
details = f" | body={body[:400]}"
print(f"❌ Ошибка API ({cfg['name']}): {error}{details}")
def _send_request(messages, max_tokens, temperature, error_text):
"""
Внутренняя функция для отправки HTTP-запроса к выбранному AI-провайдеру.
Args:
messages: Список сообщений (история чата).
max_tokens: Максимальная длина ответа.
temperature: "Креативность" (0.2 - строго, 1.0 - креативно).
error_text: Текст ошибки для пользователя в случае сбоя.
"""
cfg, selection_error = _get_provider_settings()
if selection_error:
return selection_error
config_error = _get_provider_config_error(cfg)
if config_error:
return config_error
try:
# Обычный запрос нужен для перевода и мест, где стриминг не требуется.
response = _HTTP.post(
cfg["api_url"],
headers=_build_headers(cfg),
json=_build_payload(cfg, messages, max_tokens, temperature, stream=False),
timeout=15,
)
response.raise_for_status()
data = response.json()
content = _extract_response_content(cfg, data)
if not content:
return "Не удалось обработать ответ от AI."
return content
except requests.exceptions.Timeout:
return f"Извините, сервер {cfg['name']} не отвечает. Попробуйте позже."
except requests.exceptions.RequestException as error:
_log_request_exception(cfg, error)
return error_text
except Exception as error:
print(f"❌ Ошибка парсинга ответа ({cfg['name']}): {error}")
return "Не удалось обработать ответ от AI."
def ask_ai(messages_history: list) -> str:
"""
Запрос к AI в режиме чата.
Принимает историю переписки, добавляет SYSTEM_PROMPT и отправляет запрос.
"""
if not messages_history:
return "Извините, я не расслышал вашу команду."
# Логирование последнего запроса
last_user_message = "Unknown"
for msg in reversed(messages_history):
if msg["role"] == "user":
last_user_message = msg["content"]
break
print(f"🤖 Запрос к AI: {last_user_message}")
# Формируем полный список сообщений с системной инструкцией в начале
messages = [{"role": "system", "content": SYSTEM_PROMPT}] + list(messages_history)
response = _send_request(
messages,
max_tokens=500,
temperature=1.0, # Высокая температура для более живого общения
error_text="Произошла ошибка при обращении к AI. Попробуйте ещё раз.",
)
if response:
print(f"💬 Ответ AI: {response[:100]}...")
return response
def ask_ai_stream(messages_history: list):
"""
Generator that yields chunks of the AI response as they arrive.
"""
cfg, selection_error = _get_provider_settings()
if selection_error:
yield selection_error
return
config_error = _get_provider_config_error(cfg)
if config_error:
yield config_error
return
if not messages_history:
yield "Извините, я не расслышал вашу команду."
return
# Log the last user message
last_user_message = "Unknown"
for msg in reversed(messages_history):
if msg["role"] == "user":
last_user_message = msg["content"]
break
print(f"🤖 Запрос к AI ({cfg['name']}, Stream): {last_user_message}")
messages = [{"role": "system", "content": SYSTEM_PROMPT}] + list(messages_history)
try:
# В голосовом режиме удобнее говорить частями, как только они приходят от API.
response = _HTTP.post(
cfg["api_url"],
headers=_build_headers(cfg),
json=_build_payload(cfg, messages, 500, 1.0, stream=True),
timeout=15,
stream=True,
)
response.raise_for_status()
for chunk in _iter_stream_chunks(cfg, response):
yield chunk
except requests.exceptions.Timeout:
yield f"Извините, сервер {cfg['name']} не отвечает. Попробуйте позже."
except requests.exceptions.RequestException as error:
_log_request_exception(cfg, error)
yield "Произошла ошибка связи."
except Exception as error:
print(f"❌ Streaming Error ({cfg['name']}): {error}")
yield "Произошла ошибка связи."
def translate_text(text: str, source_lang: str, target_lang: str) -> str:
"""
Запрос к AI в режиме перевода.
Использует специальный промпт для переводчика.
"""
if not text:
return "Извините, я не расслышал текст для перевода."
lang_names = {"ru": "Russian", "en": "English"}
source_name = lang_names.get(source_lang, source_lang)
target_name = lang_names.get(target_lang, target_lang)
print(f"🌍 Перевод: {source_name} -> {target_name}: {text[:60]}...")
# Формируем промпт с подстановкой языков
messages = [
{
"role": "system",
"content": TRANSLATION_SYSTEM_PROMPT.format(
source=source_name, target=target_name
),
},
{"role": "user", "content": text},
]
response = _send_request(
messages,
max_tokens=160,
temperature=0.2, # Низкая температура для точности перевода
error_text="Произошла ошибка при переводе. Попробуйте ещё раз.",
)
cleaned = response.strip()
if not cleaned:
return cleaned
# Normalize to 2-3 variants separated by " / "
parts = []
for chunk in re.split(r"(?:\s*/\s*|\n|;|\|)", cleaned):
item = chunk.strip(" \t-•")
if item:
parts.append(item)
if not parts:
return cleaned
parts = parts[:3]
return " / ".join(parts)