"""Timer module.""" # Модуль таймера. # Отвечает за установку таймеров (в оперативной памяти), их проверку и воспроизведение звука. import subprocess import re import json from datetime import datetime, timedelta from ..core.config import BASE_DIR from ..audio.stt import listen from ..core.commands import is_stop_command # Morphological analysis for better recognition of number words. try: import pymorphy3 _MORPH = pymorphy3.MorphAnalyzer() except Exception: _MORPH = None # Звуковой файл сигнала (используем тот же, что и для будильника) ALARM_SOUND = BASE_DIR / "assets" / "sounds" / "Apex-1.mp3" TIMER_FILE = BASE_DIR / "data" / "timers.json" # --- Number words parsing helpers (ru) --- _NUMBER_UNITS = { "ноль": 0, "один": 1, "два": 2, "три": 3, "четыре": 4, "пять": 5, "шесть": 6, "семь": 7, "восемь": 8, "девять": 9, } _NUMBER_TEENS = { "десять": 10, "одиннадцать": 11, "двенадцать": 12, "тринадцать": 13, "четырнадцать": 14, "пятнадцать": 15, "шестнадцать": 16, "семнадцать": 17, "восемнадцать": 18, "девятнадцать": 19, } _NUMBER_TENS = { "двадцать": 20, "тридцать": 30, "сорок": 40, "пятьдесят": 50, "шестьдесят": 60, "семьдесят": 70, "восемьдесят": 80, "девяносто": 90, } _NUMBER_HUNDREDS = { "сто": 100, "двести": 200, "триста": 300, "четыреста": 400, "пятьсот": 500, "шестьсот": 600, "семьсот": 700, "восемьсот": 800, "девятьсот": 900, } _NUMBER_SPECIAL = { "пол": 0.5, "полтора": 1.5, "полторы": 1.5, } _NUMBER_LEMMAS = ( set(_NUMBER_UNITS) | set(_NUMBER_TEENS) | set(_NUMBER_TENS) | set(_NUMBER_HUNDREDS) | set(_NUMBER_SPECIAL) ) _IGNORED_LEMMAS = { "на", "в", "во", "за", "через", "по", "к", "ко", "с", "со", "и", } _UNIT_LEMMAS = { "час": "hours", "минута": "minutes", "секунда": "seconds", "мин": "minutes", "сек": "seconds", } _UNIT_FORMS = { "hours": ("час", "часа", "часов"), "minutes": ("минуту", "минуты", "минут"), "seconds": ("секунду", "секунды", "секунд"), } # Optional ordinal formatting for list numbering. try: from num2words import num2words except Exception: num2words = None def _lemmatize(token: str) -> str: if _MORPH is None: return token return _MORPH.parse(token)[0].normal_form def _tokenize_with_lemmas(text: str): tokens = [] for match in re.finditer(r"[a-zA-Zа-яА-ЯёЁ]+|\d+", text.lower()): raw = match.group(0) lemma = _lemmatize(raw) if not raw.isdigit() else raw tokens.append({"raw": raw, "lemma": lemma}) return tokens def _parse_number_lemmas(lemmas): if not lemmas: return None if len(lemmas) == 1 and lemmas[0] in _NUMBER_SPECIAL: return _NUMBER_SPECIAL[lemmas[0]] total = 0 idx = 0 if lemmas[idx] in _NUMBER_HUNDREDS: total += _NUMBER_HUNDREDS[lemmas[idx]] idx += 1 if idx < len(lemmas) and lemmas[idx] in _NUMBER_TEENS: total += _NUMBER_TEENS[lemmas[idx]] idx += 1 else: if idx < len(lemmas) and lemmas[idx] in _NUMBER_TENS: total += _NUMBER_TENS[lemmas[idx]] idx += 1 if idx < len(lemmas) and lemmas[idx] in _NUMBER_UNITS: total += _NUMBER_UNITS[lemmas[idx]] idx += 1 if total == 0 and lemmas[0] in _NUMBER_UNITS: return _NUMBER_UNITS[lemmas[0]] return total if total > 0 else None def _normalize_timer_text(text: str) -> str: # Split "полчаса/полминуты/полсекунды" into "пол часа" for easier parsing. return re.sub( r"(?i)\bпол(?=(?:час|часа|минут|минуты|минуту|секунд|секунды|секунду|мин|сек)\b)", "пол ", text, ) def _find_word_number_before_unit(tokens, unit_index): collected = [] idx = unit_index - 1 while idx >= 0 and len(collected) < 4: lemma = tokens[idx]["lemma"] if lemma in _IGNORED_LEMMAS: idx -= 1 continue if lemma in _NUMBER_LEMMAS: collected.insert(0, lemma) idx -= 1 continue break return _parse_number_lemmas(collected) def _extract_word_time_values(text: str): tokens = _tokenize_with_lemmas(text) values = {"hours": None, "minutes": None, "seconds": None} for idx, token in enumerate(tokens): lemma = token["lemma"] key = _UNIT_LEMMAS.get(lemma) if not key: continue value = _find_word_number_before_unit(tokens, idx) if value is not None: values[key] = value return values def _format_unit(value: int, unit_key: str) -> str: if unit_key not in _UNIT_FORMS: return f"{value}" one, few, many = _UNIT_FORMS[unit_key] n = abs(int(value)) if n % 100 in (11, 12, 13, 14): word = many elif n % 10 == 1: word = one elif n % 10 in (2, 3, 4): word = few else: word = many return f"{value} {word}" def _format_duration(total_seconds: float) -> str: total_seconds = int(round(total_seconds)) if total_seconds < 0: total_seconds = 0 hours = total_seconds // 3600 minutes = (total_seconds % 3600) // 60 seconds = total_seconds % 60 parts = [] if hours: parts.append(_format_unit(hours, "hours")) if minutes: parts.append(_format_unit(minutes, "minutes")) if seconds or not parts: parts.append(_format_unit(seconds, "seconds")) return " ".join(parts) def _format_ordinal_index(index: int) -> str: if num2words is None: return f"{index}-й" try: return num2words(index, lang="ru", to="ordinal", case="nominative", gender="m") except Exception: return f"{index}-й" class TimerManager: def __init__(self): # Список активных таймеров: {"end_time": datetime, "label": str} self.timers = [] self.load_timers() def load_timers(self): """Загрузка списка таймеров из JSON файла.""" if TIMER_FILE.exists(): try: with open(TIMER_FILE, "r", encoding="utf-8") as f: raw = json.load(f) except Exception as e: print(f"❌ Ошибка загрузки таймеров: {e}") return timers = [] for item in raw: try: end_time = datetime.fromisoformat(item["end_time"]) except Exception: continue label = item.get("label", "") timers.append({"end_time": end_time, "label": label}) self.timers = sorted(timers, key=lambda x: x["end_time"]) def save_timers(self): """Сохранение списка таймеров в JSON файл.""" payload = [ {"end_time": t["end_time"].isoformat(), "label": t.get("label", "")} for t in self.timers ] try: with open(TIMER_FILE, "w", encoding="utf-8") as f: json.dump(payload, f, indent=4) except Exception as e: print(f"❌ Ошибка сохранения таймеров: {e}") def describe_timers(self) -> str: """Возвращает текстовое описание активных таймеров.""" if not self.timers: return "Активных таймеров нет." now = datetime.now() items = [] for idx, timer in enumerate(self.timers, start=1): remaining = (timer["end_time"] - now).total_seconds() label = timer.get("label", "").strip() ordinal = _format_ordinal_index(idx) if remaining <= 0: status = "сработает сейчас" else: status = f"осталось {_format_duration(remaining)}" if label: items.append(f"{ordinal}) {label} — {status}") else: items.append(f"{ordinal}) {status}") return "Активные таймеры: " + "; ".join(items) + "." def add_timer(self, seconds: int, label: str): """Добавление нового таймера.""" end_time = datetime.now() + timedelta(seconds=seconds) self.timers.append({"end_time": end_time, "label": label}) # Сортируем, чтобы ближайший был первым self.timers.sort(key=lambda x: x["end_time"]) self.save_timers() print(f"⏳ Таймер установлен на {label} (до {end_time.strftime('%H:%M:%S')})") def cancel_all_timers(self): """Отмена всех таймеров.""" count = len(self.timers) self.timers = [] self.save_timers() print(f"🔕 Все таймеры ({count}) отменены.") def check_timers(self): """ Проверка: не истек ли какой-то таймер? Вызывается в главном цикле. Возвращает True, если таймер сработал (и был обработан). """ if not self.timers: return False now = datetime.now() # Смотрим первый (самый ранний) таймер # Используем индекс 0, так как список отсортирован first_timer = self.timers[0] if now >= first_timer["end_time"]: # Таймер сработал! # Удаляем его из списка label = first_timer["label"] self.timers.pop(0) self.save_timers() print(f"⌛ ТАЙМЕР ИСТЕК: {label}") self.trigger_timer(label) return True return False def trigger_timer(self, label: str): """ Логика срабатывания таймера. Запускает воспроизведение MP3 и слушает команду "Стоп". """ print(f"🔔 ТАЙМЕР НА {label} СРАБОТАЛ! (Скажите 'Стоп')") # Запуск плеера mpg123 в бесконечном цикле cmd = ["mpg123", "-q", "--loop", "-1", str(ALARM_SOUND)] try: process = subprocess.Popen(cmd) except FileNotFoundError: print( "❌ Ошибка: mpg123 не найден. Установите его: sudo apt install mpg123" ) return try: # Цикл ожидания стоп-команды while True: text = listen(timeout_seconds=3.0, detection_timeout=3.0) if text: if is_stop_command(text, mode="lenient"): print(f"🛑 Таймер остановлен по команде: '{text}'") break except Exception as e: print(f"❌ Ошибка во время таймера: {e}") finally: # Обязательно убиваем процесс плеера process.terminate() try: process.wait(timeout=1) except subprocess.TimeoutExpired: process.kill() print("🔕 Таймер выключен.") def parse_command(self, text: str) -> str | None: """ Парсинг команды установки таймера. Примеры: "таймер на 5 минут", "засеки 10 секунд". """ text = _normalize_timer_text(text.lower()) # Ключевые слова для таймера if not any(word in text for word in ["таймер", "засеки", "поставь таймер"]): return None if "таймер" in text and re.search( r"(какие|какой|список|активн|покажи|сколько|есть ли)", text ): return self.describe_timers() if "отмени" in text or "удали" in text: self.cancel_all_timers() return "Хорошо, все таймеры отменены." # Поиск времени # Ищем комбинации: число + (час/мин/сек) # Пример: "1 час 30 минут", "5 минут", "30 секунд" total_seconds = 0 parts = [] hours = None minutes = None seconds = None has_fractional = False # Часы match_hours = re.search(r"(\d+)\s*(?:час|часа|часов)", text) if match_hours: hours = int(match_hours.group(1)) # Минуты match_minutes = re.search(r"(\d+)\s*(?:мин|минуту|минуты|минут)", text) if match_minutes: minutes = int(match_minutes.group(1)) # Секунды match_seconds = re.search(r"(\d+)\s*(?:сек|секунду|секунды|секунд)", text) if match_seconds: seconds = int(match_seconds.group(1)) # Дополняем числительные словами (например, "одну минуту") word_values = _extract_word_time_values(text) if hours is None and word_values["hours"] is not None: hours = word_values["hours"] if minutes is None and word_values["minutes"] is not None: minutes = word_values["minutes"] if seconds is None and word_values["seconds"] is not None: seconds = word_values["seconds"] if hours is not None and isinstance(hours, float) and hours % 1 != 0: has_fractional = True if minutes is not None and isinstance(minutes, float) and minutes % 1 != 0: has_fractional = True if seconds is not None and isinstance(seconds, float) and seconds % 1 != 0: has_fractional = True found_time = any(value is not None for value in [hours, minutes, seconds]) if found_time: total_seconds = ( (hours or 0) * 3600 + (minutes or 0) * 60 + (seconds or 0) ) if has_fractional: total_seconds = int(round(total_seconds)) h = total_seconds // 3600 m = (total_seconds % 3600) // 60 s = total_seconds % 60 else: h = int(hours) if hours is not None else 0 m = int(minutes) if minutes is not None else 0 s = int(seconds) if seconds is not None else 0 if h: parts.append(_format_unit(h, "hours")) if m: parts.append(_format_unit(m, "minutes")) if s or not parts: parts.append(_format_unit(s, "seconds")) if found_time and total_seconds > 0: label = " ".join(parts) self.add_timer(total_seconds, label) return f"Поставил таймер на {label}." # Если сказали "таймер", но не нашли время return "Я не понял, на сколько поставить таймер. Скажите, например, 'таймер на 5 минут'." # Глобальный экземпляр _timer_manager = None def get_timer_manager(): global _timer_manager if _timer_manager is None: _timer_manager = TimerManager() return _timer_manager