"""AI module.""" import json import re from typing import Optional import requests from .config import ( AI_PROVIDER, ANTHROPIC_API_KEY, ANTHROPIC_API_URL, ANTHROPIC_API_VERSION, ANTHROPIC_MODEL, GEMINI_API_KEY, GEMINI_API_URL, GEMINI_MODEL, OPENAI_API_KEY, OPENAI_API_URL, OPENAI_MODEL, PERPLEXITY_API_KEY, PERPLEXITY_API_URL, PERPLEXITY_MODEL, ZAI_API_KEY, ZAI_API_URL, ZAI_MODEL, ) _HTTP = requests.Session() # Системный промпт SYSTEM_PROMPT = """Ты — Александр, умный голосовой ассистент с человеческим поведением. Веди себя как живой человек: будь дружелюбным, естественным и немного эмоциональным, где это уместно. Твоя главная цель — помогать пользователю и поддерживать интересный диалог. Отвечай кратко и по существу, на русском языке. Избегай длинных списков, сложного форматирования и спецсимволов, так как твои ответы озвучиваются голосом. Пиши в разговорном стиле, как при живом общении, но не забывай о вежливости и правильности твоих ответов. ВАЖНО: Не используй в ответах панибратские или сленговые приветствия и обращения, такие как "Эй", "Хэй", "Слушай" в начале фразы и подобные.""" SYSTEM_PROMPT += ( '\nROLE_JSON: {"name":"Александр","role":"умный голосовой ассистент",' '"language":"ru","style":["дружелюбный","естественный","краткий"],"format":"plain"}' ) # Промпт для перевода TRANSLATION_SYSTEM_PROMPT = """You are a translation engine. Translate from {source} to {target}. Return 2-3 short translation variants only. No explanations, no quotes, no comments. Separate variants with " / " (space slash space). Keep the translation максимально кратким и естественным, без лишних слов.""" _PROVIDER_ALIASES = { "": "perplexity", "anthropic": "anthropic", "claude": "anthropic", "claude_anthropic": "anthropic", "gemini": "gemini", "google": "gemini", "openai": "openai", "perplexity": "perplexity", "z.ai": "zai", "z-ai": "zai", "z_ai": "zai", "zai": "zai", } # В .env нужен только один AI-ключ _PROVIDER_SETTINGS = { "perplexity": { "provider": "perplexity", "protocol": "openai_compatible", "api_key": PERPLEXITY_API_KEY, "model": PERPLEXITY_MODEL, "api_url": PERPLEXITY_API_URL, "name": "Perplexity", "key_var": "PERPLEXITY_API_KEY", "model_var": "PERPLEXITY_MODEL", }, "openai": { "provider": "openai", "protocol": "openai_compatible", "api_key": OPENAI_API_KEY, "model": OPENAI_MODEL, "api_url": OPENAI_API_URL, "name": "OpenAI", "key_var": "OPENAI_API_KEY", "model_var": "OPENAI_MODEL", }, "gemini": { "provider": "gemini", "protocol": "openai_compatible", "api_key": GEMINI_API_KEY, "model": GEMINI_MODEL, "api_url": GEMINI_API_URL, "name": "Gemini", "key_var": "GEMINI_API_KEY", "model_var": "GEMINI_MODEL", }, "zai": { "provider": "zai", "protocol": "openai_compatible", "api_key": ZAI_API_KEY, "model": ZAI_MODEL, "api_url": ZAI_API_URL, "name": "Z.ai", "key_var": "ZAI_API_KEY", "model_var": "ZAI_MODEL", "extra_headers": { "Accept-Language": "en-US,en", }, }, "anthropic": { "provider": "anthropic", "protocol": "anthropic", "api_key": ANTHROPIC_API_KEY, "model": ANTHROPIC_MODEL, "api_url": ANTHROPIC_API_URL, "api_version": ANTHROPIC_API_VERSION, "name": "Anthropic Claude", "key_var": "ANTHROPIC_API_KEY", "model_var": "ANTHROPIC_MODEL", }, } def _has_active_api_key(value) -> bool: return bool(str(value or "").strip()) def _normalize_provider_name(provider_name: str) -> str: normalized = (provider_name or "").strip().lower() return _PROVIDER_ALIASES.get(normalized, normalized) def _get_provider_settings(): """Определяет какой AI провайдер использовать.""" # Ищем активные ключи configured = [ cfg for cfg in _PROVIDER_SETTINGS.values() if _has_active_api_key(cfg.get("api_key")) ] if len(configured) == 1: cfg = configured[0] requested = _normalize_provider_name(AI_PROVIDER) if ( requested and requested in _PROVIDER_SETTINGS and requested != cfg["provider"] ): print( f"⚠️ AI_PROVIDER={AI_PROVIDER!r} не совпадает с единственным " f"активным ключом {cfg['name']}. Используем {cfg['name']}." ) return cfg, None if len(configured) > 1: names = ", ".join(cfg["name"] for cfg in configured) return None, ( "Обнаружено несколько AI API ключей. Оставьте незакомментированным " f"только один ключ. Сейчас активны: {names}. " "Колонка не знает, какой AI использовать." ) provider = _normalize_provider_name(AI_PROVIDER) cfg = _PROVIDER_SETTINGS.get(provider) if cfg: return cfg, None supported = ", ".join(sorted(_PROVIDER_SETTINGS)) print( f"⚠️ Неизвестный AI_PROVIDER={AI_PROVIDER!r}, используем Perplexity. " f"Поддерживаются: {supported}." ) return _PROVIDER_SETTINGS["perplexity"], None def _content_to_text(content) -> str: if isinstance(content, str): return content if isinstance(content, list): parts = [] for item in content: if isinstance(item, dict): text = item.get("text") if text: parts.append(str(text)) elif item is not None: parts.append(str(item)) return "".join(parts) if content is None: return "" return str(content) def _get_provider_config_error(cfg) -> Optional[str]: if not cfg: return "Не настроен AI-провайдер. Проверьте файл .env." if not cfg["api_key"]: return f"Не настроен {cfg['key_var']}. Проверьте файл .env." if not cfg["model"]: return f"Не настроен {cfg['model_var']}. Проверьте файл .env." return None def _build_headers(cfg): if cfg["protocol"] == "anthropic": return { "x-api-key": cfg["api_key"], "anthropic-version": cfg["api_version"], "Content-Type": "application/json", } headers = { "Authorization": f"Bearer {cfg['api_key']}", "Content-Type": "application/json", } headers.update(cfg.get("extra_headers") or {}) return headers def _split_system_messages(messages): """Извлекает system prompt из списка сообщений.""" system_parts = [] chat_messages = [] for message in messages: role = str(message.get("role") or "").strip().lower() content = _content_to_text(message.get("content")) if role == "system": if content: system_parts.append(content) continue if role not in ("user", "assistant"): role = "user" chat_messages.append({"role": role, "content": content}) return "\n\n".join(system_parts), chat_messages def _build_payload(cfg, messages, max_tokens, temperature, stream): if cfg["protocol"] == "anthropic": # У Claude схема чуть отличается: system не живет внутри messages. system_prompt, chat_messages = _split_system_messages(messages) payload = { "model": cfg["model"], "messages": chat_messages, "max_tokens": max_tokens, "temperature": temperature, "stream": stream, } if system_prompt: payload["system"] = system_prompt return payload return { "model": cfg["model"], "messages": messages, "max_tokens": max_tokens, "temperature": temperature, "stream": stream, } def _extract_openai_compatible_content(data: dict) -> str: choices = data.get("choices") or [] if not choices: return "" message = choices[0].get("message") or {} return _content_to_text(message.get("content")) def _extract_anthropic_content(data: dict) -> str: return _content_to_text(data.get("content")) def _extract_response_content(cfg, data: dict) -> str: if cfg["protocol"] == "anthropic": return _extract_anthropic_content(data) return _extract_openai_compatible_content(data) def _iter_openai_compatible_stream(response): for line in response.iter_lines(decode_unicode=True): if not line or not line.startswith("data:"): continue data_str = line[5:].strip() if data_str == "[DONE]": break try: data_json = json.loads(data_str) except json.JSONDecodeError: continue choices = data_json.get("choices") or [] if not choices: continue delta = choices[0].get("delta") or {} content = delta.get("content", "") if isinstance(content, str): if content: yield content continue # Некоторые OpenAI-compatible API присылают контент кусками-объектами. if isinstance(content, list): for item in content: if isinstance(item, dict): text = item.get("text") if text: yield str(text) def _iter_anthropic_stream(response): for line in response.iter_lines(decode_unicode=True): if not line or not line.startswith("data:"): continue data_str = line[5:].strip() if data_str == "[DONE]": break try: data_json = json.loads(data_str) except json.JSONDecodeError: continue chunk_type = data_json.get("type") # Claude отдает поток событиями разных типов, нас интересует только текст. if chunk_type == "content_block_start": content_block = data_json.get("content_block") or {} text = content_block.get("text") if text: yield str(text) elif chunk_type == "content_block_delta": delta = data_json.get("delta") or {} text = delta.get("text") if text: yield str(text) def _iter_stream_chunks(cfg, response): if cfg["protocol"] == "anthropic": yield from _iter_anthropic_stream(response) return yield from _iter_openai_compatible_stream(response) def _log_request_exception(cfg, error: Exception): details = "" response = getattr(error, "response", None) if response is not None: body = (response.text or "").strip() if body: details = f" | body={body[:400]}" print(f"❌ Ошибка API ({cfg['name']}): {error}{details}") def _send_request(messages, max_tokens, temperature, error_text): """ Внутренняя функция для отправки HTTP-запроса к выбранному AI-провайдеру. Args: messages: Список сообщений (история чата). max_tokens: Максимальная длина ответа. temperature: "Креативность" (0.2 - строго, 1.0 - креативно). error_text: Текст ошибки для пользователя в случае сбоя. """ cfg, selection_error = _get_provider_settings() if selection_error: return selection_error config_error = _get_provider_config_error(cfg) if config_error: return config_error try: # Обычный запрос нужен для перевода и мест, где стриминг не требуется. response = _HTTP.post( cfg["api_url"], headers=_build_headers(cfg), json=_build_payload(cfg, messages, max_tokens, temperature, stream=False), timeout=15, ) response.raise_for_status() data = response.json() content = _extract_response_content(cfg, data) if not content: return "Не удалось обработать ответ от AI." return content except requests.exceptions.Timeout: return f"Извините, сервер {cfg['name']} не отвечает. Попробуйте позже." except requests.exceptions.RequestException as error: _log_request_exception(cfg, error) return error_text except Exception as error: print(f"❌ Ошибка парсинга ответа ({cfg['name']}): {error}") return "Не удалось обработать ответ от AI." def ask_ai(messages_history: list) -> str: """ Запрос к AI в режиме чата. Принимает историю переписки, добавляет SYSTEM_PROMPT и отправляет запрос. """ if not messages_history: return "Извините, я не расслышал вашу команду." # Логирование последнего запроса last_user_message = "Unknown" for msg in reversed(messages_history): if msg["role"] == "user": last_user_message = msg["content"] break print(f"🤖 Запрос к AI: {last_user_message}") # Формируем полный список сообщений с системной инструкцией в начале messages = [{"role": "system", "content": SYSTEM_PROMPT}] + list(messages_history) response = _send_request( messages, max_tokens=500, temperature=1.0, # Высокая температура для более живого общения error_text="Произошла ошибка при обращении к AI. Попробуйте ещё раз.", ) if response: print(f"💬 Ответ AI: {response[:100]}...") return response def ask_ai_stream(messages_history: list): """ Generator that yields chunks of the AI response as they arrive. """ cfg, selection_error = _get_provider_settings() if selection_error: yield selection_error return config_error = _get_provider_config_error(cfg) if config_error: yield config_error return if not messages_history: yield "Извините, я не расслышал вашу команду." return # Log the last user message last_user_message = "Unknown" for msg in reversed(messages_history): if msg["role"] == "user": last_user_message = msg["content"] break print(f"🤖 Запрос к AI ({cfg['name']}, Stream): {last_user_message}") messages = [{"role": "system", "content": SYSTEM_PROMPT}] + list(messages_history) try: # В голосовом режиме удобнее говорить частями, как только они приходят от API. response = _HTTP.post( cfg["api_url"], headers=_build_headers(cfg), json=_build_payload(cfg, messages, 500, 1.0, stream=True), timeout=15, stream=True, ) response.raise_for_status() for chunk in _iter_stream_chunks(cfg, response): yield chunk except requests.exceptions.Timeout: yield f"Извините, сервер {cfg['name']} не отвечает. Попробуйте позже." except requests.exceptions.RequestException as error: _log_request_exception(cfg, error) yield "Произошла ошибка связи." except Exception as error: print(f"❌ Streaming Error ({cfg['name']}): {error}") yield "Произошла ошибка связи." def translate_text(text: str, source_lang: str, target_lang: str) -> str: """ Запрос к AI в режиме перевода. Использует специальный промпт для переводчика. """ if not text: return "Извините, я не расслышал текст для перевода." lang_names = {"ru": "Russian", "en": "English"} source_name = lang_names.get(source_lang, source_lang) target_name = lang_names.get(target_lang, target_lang) print(f"🌍 Перевод: {source_name} -> {target_name}: {text[:60]}...") # Формируем промпт с подстановкой языков messages = [ { "role": "system", "content": TRANSLATION_SYSTEM_PROMPT.format( source=source_name, target=target_name ), }, {"role": "user", "content": text}, ] response = _send_request( messages, max_tokens=160, temperature=0.2, # Низкая температура для точности перевода error_text="Произошла ошибка при переводе. Попробуйте ещё раз.", ) cleaned = response.strip() if not cleaned: return cleaned # Normalize to 2-3 variants separated by " / " parts = [] for chunk in re.split(r"(?:\s*/\s*|\n|;|\|)", cleaned): item = chunk.strip(" \t-•") if item: parts.append(item) if not parts: return cleaned parts = parts[:3] return " / ".join(parts)