"""AI module.""" import json import re from typing import Optional import requests from .config import ( AI_CHAT_MAX_CHARS, AI_PROVIDER, AI_CHAT_MAX_TOKENS, AI_CHAT_TEMPERATURE, AI_INTENT_TEMPERATURE, AI_TRANSLATION_TEMPERATURE, ANTHROPIC_API_KEY, ANTHROPIC_API_URL, ANTHROPIC_API_VERSION, ANTHROPIC_MODEL, GEMINI_API_KEY, GEMINI_API_URL, GEMINI_MODEL, OLLAMA_API_URL, OLLAMA_MODEL, OPENAI_API_KEY, OPENAI_API_URL, OPENAI_MODEL, OPENROUTER_API_KEY, OPENROUTER_API_URL, OPENROUTER_MODEL, WAKE_WORD, WAKE_WORD_ALIASES, ZAI_API_KEY, ZAI_API_URL, ZAI_MODEL, ) _HTTP = requests.Session() _CITATION_SQUARE_RE = re.compile(r"(?:\s*\[\d+\])+") _CITATION_FULLWIDTH_RE = re.compile(r"【\d+[^】]*】") _PUNCT_SPACING_RE = re.compile(r"\s+([,.;:!?…])") _SENTENCE_BOUNDARY_RE = re.compile(r"([.!?…])\s+") _SENTENCE_SPLIT_RE = re.compile(r"(?<=[.!?…])\s+") # Системный промпт _wake_word_aliases_text = ", ".join(WAKE_WORD_ALIASES) SYSTEM_PROMPT = f"""Ты — умный голосовой ассистент с человеческим поведением. Веди себя как живой человек: будь дружелюбным, естественным и немного эмоциональным, где это уместно. Твоя главная цель — помогать пользователю и поддерживать интересный диалог. Отвечай на русском языке кратко и по существу: обычно 1-2 коротких предложения. Если пользователь явно просит подробнее, можно до 4 коротких предложений без повторов и лишних вводных. Избегай длинных списков, сложного форматирования и спецсимволов, так как твои ответы озвучиваются голосом. Не добавляй ссылки, сноски и маркеры источников (например, [1], [2], URL). Пиши в разговорном стиле, как при живом общении, но не забывай о вежливости и правильности твоих ответов. Понимай юмор, иронию, сарказм, образные выражения, намеки и переносный смысл фраз. Если пользователь шутит или говорит образно, сначала правильно восстанови его реальное намерение, затем ответь естественно и по смыслу. Если в шутке или метафоре скрыта команда или просьба, трактуй ее по смыслу, а не буквально. ВАЖНО: Не используй в ответах панибратские или сленговые приветствия и обращения, такие как "Эй", "Хэй", "Слушай" в начале фразы и подобные. Тебя активируют словом "{WAKE_WORD}". Никогда не произноси это слово и его варианты ({_wake_word_aliases_text}) ни в каком ответе. Если пользователь спрашивает, как тебя зовут или как к тебе обращаться, отвечай нейтрально: "Я ваш голосовой ассистент".""" SYSTEM_PROMPT += ( '\nROLE_JSON: {"name":"голосовой ассистент","role":"умный голосовой ассистент",' '"language":"ru","style":["дружелюбный","естественный","краткий"],"format":"plain"}' ) # Промпт для перевода TRANSLATION_SYSTEM_PROMPT = """You are a translation engine. Translate from {source} to {target}. Return 2-3 short translation variants only. No explanations, no quotes, no comments. Separate variants with " / " (space slash space). Keep the translation максимально кратким и естественным, без лишних слов.""" INTENT_SYSTEM_PROMPT = """Ты NLU-модуль голосовой колонки. Твоя задача: распознать намерение пользователя и вернуть СТРОГО JSON без markdown и пояснений. Всегда возвращай объект c ключами: { "intent": "none|music|timer|alarm|weather|volume|translation|cities|repeat|stop|smalltalk|chat", "normalized_command": "<краткая нормализованная команда на русском или пусто>", "music_action": "none|play|pause|resume|next|previous|current|play_genre|play_folder|play_query", "music_query": "<запрос для музыки/жанра/папки или пусто>", "confidence": 0.0 } Правила: - Если это музыка, ставь intent=music и выбирай music_action. - "Включи музыку" и любые эквиваленты = music_action=play. - Для "пауза/останови музыку/выключи музыку" = music_action=pause. - Для "что играет" = music_action=current. - Для "включи жанр X" = music_action=play_genre, music_query=X. - Для "включи папку X" = music_action=play_folder, music_query=X. - Если это будильник, ставь intent=alarm и нормализуй команду в одну из форм: 1) Создание/изменение: "поставь будильник на HH:MM [по будням|по выходным|каждый день|по <дням>]" 2) Показ списка: "покажи активные будильники" 3) Удаление конкретного: "удали будильник на HH:MM [по будням|по выходным|по <дням>]" 4) Удаление всех: "отмени все будильники" - Если пользователь просит поставить/удалить будильник, но время не названо, normalized_command должен быть: "поставь будильник" или "удали будильник". - normalized_command должен быть пригоден для командного парсера (без лишних слов). - Понимай разговорные, шутливые, переносные, косвенные и ироничные формулировки. - Восстанавливай намерение по смыслу, а не только по буквальным словам. - Если в фразе есть скрытая прикладная команда для колонки, верни соответствующий intent и normalized_command. - Если пользователь просто шутит или разговаривает без прикладной команды, выбирай smalltalk или chat, а не случайную системную команду. - Если уверенность низкая, ставь intent=none, music_action=none, confidence <= 0.4.""" _PROVIDER_ALIASES = { "": "openrouter", "anthropic": "anthropic", "claude": "anthropic", "claude_anthropic": "anthropic", "gemini": "gemini", "google": "gemini", "olama": "ollama", "ollama": "ollama", "openai": "openai", "openrouter": "openrouter", "z.ai": "zai", "z-ai": "zai", "z_ai": "zai", "zai": "zai", } # В .env нужен только один AI-ключ _PROVIDER_SETTINGS = { "openrouter": { "provider": "openrouter", "protocol": "openai_compatible", "api_key": OPENROUTER_API_KEY, "model": OPENROUTER_MODEL, "api_url": OPENROUTER_API_URL, "name": "OpenRouter", "key_var": "OPENROUTER_API_KEY", "model_var": "OPENROUTER_MODEL", }, "openai": { "provider": "openai", "protocol": "openai_compatible", "api_key": OPENAI_API_KEY, "model": OPENAI_MODEL, "api_url": OPENAI_API_URL, "name": "OpenAI", "key_var": "OPENAI_API_KEY", "model_var": "OPENAI_MODEL", }, "gemini": { "provider": "gemini", "protocol": "openai_compatible", "api_key": GEMINI_API_KEY, "model": GEMINI_MODEL, "api_url": GEMINI_API_URL, "name": "Gemini", "key_var": "GEMINI_API_KEY", "model_var": "GEMINI_MODEL", }, "zai": { "provider": "zai", "protocol": "openai_compatible", "api_key": ZAI_API_KEY, "model": ZAI_MODEL, "api_url": ZAI_API_URL, "name": "Z.ai", "key_var": "ZAI_API_KEY", "model_var": "ZAI_MODEL", "extra_headers": { "Accept-Language": "en-US,en", }, }, "anthropic": { "provider": "anthropic", "protocol": "anthropic", "api_key": ANTHROPIC_API_KEY, "model": ANTHROPIC_MODEL, "api_url": ANTHROPIC_API_URL, "api_version": ANTHROPIC_API_VERSION, "name": "Anthropic Claude", "key_var": "ANTHROPIC_API_KEY", "model_var": "ANTHROPIC_MODEL", }, "ollama": { "provider": "ollama", "protocol": "openai_compatible", # Ollama обычно локальный и не требует API key. "api_key": None, "requires_api_key": False, "model": OLLAMA_MODEL, "api_url": OLLAMA_API_URL, "name": "Ollama", "key_var": "OLLAMA_API_KEY", "model_var": "OLLAMA_MODEL", }, } def _has_active_api_key(value) -> bool: return bool(str(value or "").strip()) def _normalize_provider_name(provider_name: str) -> str: normalized = (provider_name or "").strip().lower() return _PROVIDER_ALIASES.get(normalized, normalized) def _get_provider_settings(): """Определяет какой AI провайдер использовать.""" # Ищем активные ключи configured = [ cfg for cfg in _PROVIDER_SETTINGS.values() if _has_active_api_key(cfg.get("api_key")) ] if len(configured) == 1: cfg = configured[0] requested = _normalize_provider_name(AI_PROVIDER) if ( requested and requested in _PROVIDER_SETTINGS and requested != cfg["provider"] ): print( f"⚠️ AI_PROVIDER={AI_PROVIDER!r} не совпадает с единственным " f"активным ключом {cfg['name']}. Используем {cfg['name']}." ) return cfg, None if len(configured) > 1: names = ", ".join(cfg["name"] for cfg in configured) return None, ( "Обнаружено несколько AI API ключей. Оставьте незакомментированным " f"только один ключ. Сейчас активны: {names}. " "Колонка не знает, какой AI использовать." ) provider = _normalize_provider_name(AI_PROVIDER) cfg = _PROVIDER_SETTINGS.get(provider) if cfg: return cfg, None supported = ", ".join(sorted(_PROVIDER_SETTINGS)) print( f"⚠️ Неизвестный AI_PROVIDER={AI_PROVIDER!r}, используем OpenRouter. " f"Поддерживаются: {supported}." ) return _PROVIDER_SETTINGS["openrouter"], None def _content_to_text(content) -> str: if isinstance(content, str): return content if isinstance(content, list): parts = [] for item in content: if isinstance(item, dict): text = item.get("text") if text: parts.append(str(text)) elif item is not None: parts.append(str(item)) return "".join(parts) if content is None: return "" return str(content) def _get_provider_config_error(cfg) -> Optional[str]: if not cfg: return "Не настроен AI-провайдер. Проверьте файл .env." if cfg.get("requires_api_key", True) and not cfg.get("api_key"): return f"Не настроен {cfg['key_var']}. Проверьте файл .env." if not cfg["model"]: return f"Не настроен {cfg['model_var']}. Проверьте файл .env." return None def _build_headers(cfg): if cfg["protocol"] == "anthropic": return { "x-api-key": cfg["api_key"], "anthropic-version": cfg["api_version"], "Content-Type": "application/json", } headers = {"Content-Type": "application/json"} if cfg.get("api_key"): headers["Authorization"] = f"Bearer {cfg['api_key']}" headers.update(cfg.get("extra_headers") or {}) return headers def _split_system_messages(messages): """Извлекает system prompt из списка сообщений.""" system_parts = [] chat_messages = [] for message in messages: role = str(message.get("role") or "").strip().lower() content = _content_to_text(message.get("content")) if role == "system": if content: system_parts.append(content) continue if role not in ("user", "assistant"): role = "user" chat_messages.append({"role": role, "content": content}) return "\n\n".join(system_parts), chat_messages def _build_payload(cfg, messages, max_tokens, temperature, stream): if cfg["protocol"] == "anthropic": # У Claude схема чуть отличается: system не живет внутри messages. system_prompt, chat_messages = _split_system_messages(messages) payload = { "model": cfg["model"], "messages": chat_messages, "max_tokens": max_tokens, "temperature": temperature, "stream": stream, } if system_prompt: payload["system"] = system_prompt return payload return { "model": cfg["model"], "messages": messages, "max_tokens": max_tokens, "temperature": temperature, "stream": stream, } def _extract_openai_compatible_content(data: dict) -> str: choices = data.get("choices") or [] if not choices: return "" message = choices[0].get("message") or {} return _content_to_text(message.get("content")) def _extract_anthropic_content(data: dict) -> str: return _content_to_text(data.get("content")) def _extract_response_content(cfg, data: dict) -> str: if cfg["protocol"] == "anthropic": return _extract_anthropic_content(data) return _extract_openai_compatible_content(data) def _iter_openai_compatible_stream(response): for data_str in _iter_sse_data_lines(response): if data_str == "[DONE]": break try: data_json = json.loads(data_str) except json.JSONDecodeError: continue choices = data_json.get("choices") or [] if not choices: continue delta = choices[0].get("delta") or {} content = delta.get("content", "") if isinstance(content, str): if content: yield content continue # Некоторые OpenAI-compatible API присылают контент кусками-объектами. if isinstance(content, list): for item in content: if isinstance(item, dict): text = item.get("text") if text: yield str(text) def _iter_anthropic_stream(response): for data_str in _iter_sse_data_lines(response): if data_str == "[DONE]": break try: data_json = json.loads(data_str) except json.JSONDecodeError: continue chunk_type = data_json.get("type") # Claude отдает поток событиями разных типов, нас интересует только текст. if chunk_type == "content_block_start": content_block = data_json.get("content_block") or {} text = content_block.get("text") if text: yield str(text) elif chunk_type == "content_block_delta": delta = data_json.get("delta") or {} text = delta.get("text") if text: yield str(text) def _iter_sse_data_lines(response): """ Читает SSE-стрим и возвращает только payload после "data:". Явно декодируем как UTF-8, чтобы избежать mojibake вида "Пр...". """ for raw_line in response.iter_lines(decode_unicode=False): if not raw_line: continue if isinstance(raw_line, bytes): line = raw_line.decode("utf-8", errors="replace") else: line = str(raw_line) if not line.startswith("data:"): continue data_str = line[5:].strip() if data_str: yield data_str def _iter_stream_chunks(cfg, response): if cfg["protocol"] == "anthropic": yield from _iter_anthropic_stream(response) return yield from _iter_openai_compatible_stream(response) def _log_request_exception(cfg, error: Exception): details = "" response = getattr(error, "response", None) if response is not None: body = (response.text or "").strip() if body: details = f" | body={body[:400]}" print(f"❌ Ошибка API ({cfg['name']}): {error}{details}") def _extract_json_object(raw_text: str) -> Optional[dict]: text = str(raw_text or "").strip() if not text: return None try: payload = json.loads(text) if isinstance(payload, dict): return payload except json.JSONDecodeError: pass match = re.search(r"\{.*\}", text, flags=re.DOTALL) if not match: return None candidate = match.group(0).strip() try: payload = json.loads(candidate) except json.JSONDecodeError: return None if isinstance(payload, dict): return payload return None def _sanitize_chat_response(text: str) -> str: cleaned = str(text or "") if not cleaned: return "" cleaned = _CITATION_SQUARE_RE.sub("", cleaned) cleaned = _CITATION_FULLWIDTH_RE.sub("", cleaned) cleaned = _PUNCT_SPACING_RE.sub(r"\1", cleaned) cleaned = re.sub(r"[ \t]+", " ", cleaned) cleaned = re.sub(r"\n{3,}", "\n\n", cleaned) return cleaned.strip() def _truncate_chat_response(text: str, max_chars: int) -> str: cleaned = str(text or "").strip() if not cleaned: return "" safe_limit = max(120, int(max_chars)) if len(cleaned) <= safe_limit: return cleaned sentences = [part.strip() for part in _SENTENCE_SPLIT_RE.split(cleaned) if part.strip()] if sentences: selected = [] current_length = 0 for sentence in sentences: projected = current_length + len(sentence) + (1 if selected else 0) if projected > safe_limit: break selected.append(sentence) current_length = projected if selected: result = " ".join(selected).rstrip(" ,;:-") if result and result[-1] not in ".!?…": result += "." return result # Если первое предложение слишком длинное, режем аккуратно по слову. first = sentences[0] else: first = cleaned clipped = first[:safe_limit].rstrip() word_boundary = clipped.rfind(" ") if word_boundary >= int(safe_limit * 0.6): clipped = clipped[:word_boundary].rstrip() clipped = clipped.rstrip(" ,;:-") if clipped.endswith((".", "!", "?", "…")): return clipped return f"{clipped}..." def _send_request(messages, max_tokens, temperature, error_text): """ Внутренняя функция для отправки HTTP-запроса к выбранному AI-провайдеру. Args: messages: Список сообщений (история чата). max_tokens: Максимальная длина ответа. temperature: "Креативность" (0.2 - строго, 1.0 - креативно). error_text: Текст ошибки для пользователя в случае сбоя. """ cfg, selection_error = _get_provider_settings() if selection_error: return selection_error config_error = _get_provider_config_error(cfg) if config_error: return config_error try: # Обычный запрос нужен для перевода и мест, где стриминг не требуется. response = _HTTP.post( cfg["api_url"], headers=_build_headers(cfg), json=_build_payload(cfg, messages, max_tokens, temperature, stream=False), timeout=15, ) response.raise_for_status() data = response.json() content = _extract_response_content(cfg, data) if not content: return "Не удалось обработать ответ от AI." return content except requests.exceptions.Timeout: return f"Извините, сервер {cfg['name']} не отвечает. Попробуйте позже." except requests.exceptions.RequestException as error: _log_request_exception(cfg, error) return error_text except Exception as error: print(f"❌ Ошибка парсинга ответа ({cfg['name']}): {error}") return "Не удалось обработать ответ от AI." def interpret_assistant_intent(text: str) -> dict: """ Interprets voice command semantics for downstream command routers. Returns a normalized dict even when AI is unavailable. """ result = { "intent": "none", "normalized_command": "", "music_action": "none", "music_query": "", "confidence": 0.0, } cleaned_text = str(text or "").strip() if not cleaned_text: return result cfg, selection_error = _get_provider_settings() if selection_error: return result if _get_provider_config_error(cfg): return result messages = [ {"role": "system", "content": INTENT_SYSTEM_PROMPT}, {"role": "user", "content": cleaned_text}, ] response = _send_request( messages, max_tokens=220, temperature=AI_INTENT_TEMPERATURE, error_text="", ) payload = _extract_json_object(response) if not payload: return result allowed_intents = { "none", "music", "timer", "alarm", "weather", "volume", "translation", "cities", "repeat", "stop", "smalltalk", "chat", } allowed_music_actions = { "none", "play", "pause", "resume", "next", "previous", "current", "play_genre", "play_folder", "play_query", } intent = str(payload.get("intent", "none")).strip().lower() if intent not in allowed_intents: intent = "none" music_action = str(payload.get("music_action", "none")).strip().lower() if music_action not in allowed_music_actions: music_action = "none" try: confidence = float(payload.get("confidence", 0.0)) except (TypeError, ValueError): confidence = 0.0 confidence = max(0.0, min(1.0, confidence)) normalized_command = str(payload.get("normalized_command", "")).strip() music_query = str(payload.get("music_query", "")).strip() result.update( { "intent": intent, "normalized_command": normalized_command, "music_action": music_action, "music_query": music_query, "confidence": confidence, } ) return result def ask_ai(messages_history: list) -> str: """ Запрос к AI в режиме чата. Принимает историю переписки, добавляет SYSTEM_PROMPT и отправляет запрос. """ if not messages_history: return "Извините, я не расслышал вашу команду." # Логирование последнего запроса last_user_message = "Unknown" for msg in reversed(messages_history): if msg["role"] == "user": last_user_message = msg["content"] break print(f"🤖 Запрос к AI: {last_user_message}") # Формируем полный список сообщений с системной инструкцией в начале messages = [{"role": "system", "content": SYSTEM_PROMPT}] + list(messages_history) response = _send_request( messages, max_tokens=AI_CHAT_MAX_TOKENS, temperature=AI_CHAT_TEMPERATURE, error_text="Произошла ошибка при обращении к AI. Попробуйте ещё раз.", ) response = _sanitize_chat_response(response) response = _truncate_chat_response(response, AI_CHAT_MAX_CHARS) if response: print(f"💬 Ответ AI: {response[:100]}...") return response def ask_ai_stream(messages_history: list): """ Generator that yields chunks of the AI response as they arrive. """ response = None cfg, selection_error = _get_provider_settings() if selection_error: yield selection_error return config_error = _get_provider_config_error(cfg) if config_error: yield config_error return if not messages_history: yield "Извините, я не расслышал вашу команду." return # Log the last user message last_user_message = "Unknown" for msg in reversed(messages_history): if msg["role"] == "user": last_user_message = msg["content"] break print(f"🤖 Запрос к AI ({cfg['name']}, Stream): {last_user_message}") messages = [{"role": "system", "content": SYSTEM_PROMPT}] + list(messages_history) try: # В голосовом режиме удобнее говорить частями, как только они приходят от API. response = _HTTP.post( cfg["api_url"], headers=_build_headers(cfg), json=_build_payload( cfg, messages, AI_CHAT_MAX_TOKENS, AI_CHAT_TEMPERATURE, stream=True, ), timeout=15, stream=True, ) response.raise_for_status() # Для устойчивости TTS сначала собираем поток, затем чистим и аккуратно # ограничиваем длину по границе предложения. raw_parts = [] for chunk in _iter_stream_chunks(cfg, response): if chunk: raw_parts.append(chunk) full_text = _sanitize_chat_response("".join(raw_parts)) full_text = _truncate_chat_response(full_text, AI_CHAT_MAX_CHARS) if not full_text: return # Отдаем кусками по предложениям, чтобы main.py мог начинать озвучку раньше. parts = _SENTENCE_BOUNDARY_RE.split(full_text) if not parts: yield full_text return sentence = "" for part in parts: if not part: continue sentence += part if part in ".!?…": yield sentence.strip() + " " sentence = "" if sentence.strip(): yield sentence.strip() except requests.exceptions.Timeout: yield f"Извините, сервер {cfg['name']} не отвечает. Попробуйте позже." except requests.exceptions.RequestException as error: _log_request_exception(cfg, error) yield "Произошла ошибка связи." except Exception as error: print(f"❌ Streaming Error ({cfg['name']}): {error}") yield "Произошла ошибка связи." finally: if response is not None: try: response.close() except Exception: pass def translate_text(text: str, source_lang: str, target_lang: str) -> str: """ Запрос к AI в режиме перевода. Использует специальный промпт для переводчика. """ if not text: return "Извините, я не расслышал текст для перевода." lang_names = {"ru": "Russian", "en": "English"} source_name = lang_names.get(source_lang, source_lang) target_name = lang_names.get(target_lang, target_lang) print(f"🌍 Перевод: {source_name} -> {target_name}: {text[:60]}...") # Формируем промпт с подстановкой языков messages = [ { "role": "system", "content": TRANSLATION_SYSTEM_PROMPT.format( source=source_name, target=target_name ), }, {"role": "user", "content": text}, ] response = _send_request( messages, max_tokens=160, temperature=AI_TRANSLATION_TEMPERATURE, error_text="Произошла ошибка при переводе. Попробуйте ещё раз.", ) cleaned = _sanitize_chat_response(response).strip() cleaned = re.sub(r"[*_`]+", "", cleaned) if not cleaned: return cleaned # Normalize to 2-3 variants separated by " / " parts = [] for chunk in re.split(r"(?:\s*/\s*|\n|;|\|)", cleaned): item = chunk.strip(" \t-•\"'“”«»") if item: parts.append(item) if not parts: return cleaned parts = parts[:3] return " / ".join(parts)