493 lines
16 KiB
Python
493 lines
16 KiB
Python
"""Timer module."""
|
||
|
||
# Модуль таймера.
|
||
# Отвечает за установку таймеров (в оперативной памяти), их проверку и воспроизведение звука.
|
||
|
||
import subprocess
|
||
import re
|
||
import json
|
||
from datetime import datetime, timedelta
|
||
from ..core.config import BASE_DIR
|
||
from ..audio.stt import listen
|
||
from ..core.commands import is_stop_command
|
||
|
||
# Morphological analysis for better recognition of number words.
|
||
try:
|
||
import pymorphy3
|
||
|
||
_MORPH = pymorphy3.MorphAnalyzer()
|
||
except Exception:
|
||
_MORPH = None
|
||
|
||
# Звуковой файл сигнала (используем тот же, что и для будильника)
|
||
ALARM_SOUND = BASE_DIR / "assets" / "sounds" / "Apex-1.mp3"
|
||
TIMER_FILE = BASE_DIR / "data" / "timers.json"
|
||
|
||
# --- Number words parsing helpers (ru) ---
|
||
_NUMBER_UNITS = {
|
||
"ноль": 0,
|
||
"один": 1,
|
||
"два": 2,
|
||
"три": 3,
|
||
"четыре": 4,
|
||
"пять": 5,
|
||
"шесть": 6,
|
||
"семь": 7,
|
||
"восемь": 8,
|
||
"девять": 9,
|
||
}
|
||
_NUMBER_TEENS = {
|
||
"десять": 10,
|
||
"одиннадцать": 11,
|
||
"двенадцать": 12,
|
||
"тринадцать": 13,
|
||
"четырнадцать": 14,
|
||
"пятнадцать": 15,
|
||
"шестнадцать": 16,
|
||
"семнадцать": 17,
|
||
"восемнадцать": 18,
|
||
"девятнадцать": 19,
|
||
}
|
||
_NUMBER_TENS = {
|
||
"двадцать": 20,
|
||
"тридцать": 30,
|
||
"сорок": 40,
|
||
"пятьдесят": 50,
|
||
"шестьдесят": 60,
|
||
"семьдесят": 70,
|
||
"восемьдесят": 80,
|
||
"девяносто": 90,
|
||
}
|
||
_NUMBER_HUNDREDS = {
|
||
"сто": 100,
|
||
"двести": 200,
|
||
"триста": 300,
|
||
"четыреста": 400,
|
||
"пятьсот": 500,
|
||
"шестьсот": 600,
|
||
"семьсот": 700,
|
||
"восемьсот": 800,
|
||
"девятьсот": 900,
|
||
}
|
||
_NUMBER_SPECIAL = {
|
||
"пол": 0.5,
|
||
"полтора": 1.5,
|
||
"полторы": 1.5,
|
||
}
|
||
_NUMBER_LEMMAS = (
|
||
set(_NUMBER_UNITS)
|
||
| set(_NUMBER_TEENS)
|
||
| set(_NUMBER_TENS)
|
||
| set(_NUMBER_HUNDREDS)
|
||
| set(_NUMBER_SPECIAL)
|
||
)
|
||
_IGNORED_LEMMAS = {
|
||
"на",
|
||
"в",
|
||
"во",
|
||
"за",
|
||
"через",
|
||
"по",
|
||
"к",
|
||
"ко",
|
||
"с",
|
||
"со",
|
||
"и",
|
||
}
|
||
_UNIT_LEMMAS = {
|
||
"час": "hours",
|
||
"минута": "minutes",
|
||
"секунда": "seconds",
|
||
"мин": "minutes",
|
||
"сек": "seconds",
|
||
}
|
||
_UNIT_FORMS = {
|
||
"hours": ("час", "часа", "часов"),
|
||
"minutes": ("минуту", "минуты", "минут"),
|
||
"seconds": ("секунду", "секунды", "секунд"),
|
||
}
|
||
|
||
# Optional ordinal formatting for list numbering.
|
||
try:
|
||
from num2words import num2words
|
||
except Exception:
|
||
num2words = None
|
||
|
||
|
||
def _lemmatize(token: str) -> str:
|
||
if _MORPH is None:
|
||
return token
|
||
return _MORPH.parse(token)[0].normal_form
|
||
|
||
|
||
def _tokenize_with_lemmas(text: str):
|
||
tokens = []
|
||
for match in re.finditer(r"[a-zA-Zа-яА-ЯёЁ]+|\d+", text.lower()):
|
||
raw = match.group(0)
|
||
lemma = _lemmatize(raw) if not raw.isdigit() else raw
|
||
tokens.append({"raw": raw, "lemma": lemma})
|
||
return tokens
|
||
|
||
|
||
def _parse_number_lemmas(lemmas):
|
||
if not lemmas:
|
||
return None
|
||
|
||
if len(lemmas) == 1 and lemmas[0] in _NUMBER_SPECIAL:
|
||
return _NUMBER_SPECIAL[lemmas[0]]
|
||
|
||
total = 0
|
||
idx = 0
|
||
|
||
if lemmas[idx] in _NUMBER_HUNDREDS:
|
||
total += _NUMBER_HUNDREDS[lemmas[idx]]
|
||
idx += 1
|
||
|
||
if idx < len(lemmas) and lemmas[idx] in _NUMBER_TEENS:
|
||
total += _NUMBER_TEENS[lemmas[idx]]
|
||
idx += 1
|
||
else:
|
||
if idx < len(lemmas) and lemmas[idx] in _NUMBER_TENS:
|
||
total += _NUMBER_TENS[lemmas[idx]]
|
||
idx += 1
|
||
if idx < len(lemmas) and lemmas[idx] in _NUMBER_UNITS:
|
||
total += _NUMBER_UNITS[lemmas[idx]]
|
||
idx += 1
|
||
|
||
if total == 0 and lemmas[0] in _NUMBER_UNITS:
|
||
return _NUMBER_UNITS[lemmas[0]]
|
||
|
||
return total if total > 0 else None
|
||
|
||
|
||
def _normalize_timer_text(text: str) -> str:
|
||
# Split "полчаса/полминуты/полсекунды" into "пол часа" for easier parsing.
|
||
return re.sub(
|
||
r"(?i)\bпол(?=(?:час|часа|минут|минуты|минуту|секунд|секунды|секунду|мин|сек)\b)",
|
||
"пол ",
|
||
text,
|
||
)
|
||
|
||
|
||
def _find_word_number_before_unit(tokens, unit_index):
|
||
collected = []
|
||
idx = unit_index - 1
|
||
while idx >= 0 and len(collected) < 4:
|
||
lemma = tokens[idx]["lemma"]
|
||
if lemma in _IGNORED_LEMMAS:
|
||
idx -= 1
|
||
continue
|
||
if lemma in _NUMBER_LEMMAS:
|
||
collected.insert(0, lemma)
|
||
idx -= 1
|
||
continue
|
||
break
|
||
return _parse_number_lemmas(collected)
|
||
|
||
|
||
def _extract_word_time_values(text: str):
|
||
tokens = _tokenize_with_lemmas(text)
|
||
values = {"hours": None, "minutes": None, "seconds": None}
|
||
|
||
for idx, token in enumerate(tokens):
|
||
lemma = token["lemma"]
|
||
key = _UNIT_LEMMAS.get(lemma)
|
||
if not key:
|
||
continue
|
||
value = _find_word_number_before_unit(tokens, idx)
|
||
if value is not None:
|
||
values[key] = value
|
||
return values
|
||
|
||
|
||
def _format_unit(value: int, unit_key: str) -> str:
|
||
if unit_key not in _UNIT_FORMS:
|
||
return f"{value}"
|
||
|
||
one, few, many = _UNIT_FORMS[unit_key]
|
||
n = abs(int(value))
|
||
if n % 100 in (11, 12, 13, 14):
|
||
word = many
|
||
elif n % 10 == 1:
|
||
word = one
|
||
elif n % 10 in (2, 3, 4):
|
||
word = few
|
||
else:
|
||
word = many
|
||
return f"{value} {word}"
|
||
|
||
|
||
def _format_duration(total_seconds: float) -> str:
|
||
total_seconds = int(round(total_seconds))
|
||
if total_seconds < 0:
|
||
total_seconds = 0
|
||
|
||
hours = total_seconds // 3600
|
||
minutes = (total_seconds % 3600) // 60
|
||
seconds = total_seconds % 60
|
||
|
||
parts = []
|
||
if hours:
|
||
parts.append(_format_unit(hours, "hours"))
|
||
if minutes:
|
||
parts.append(_format_unit(minutes, "minutes"))
|
||
if seconds or not parts:
|
||
parts.append(_format_unit(seconds, "seconds"))
|
||
return " ".join(parts)
|
||
|
||
|
||
def _format_ordinal_index(index: int) -> str:
|
||
if num2words is None:
|
||
return f"{index}-й"
|
||
try:
|
||
return num2words(index, lang="ru", to="ordinal", case="nominative", gender="m")
|
||
except Exception:
|
||
return f"{index}-й"
|
||
|
||
|
||
class TimerManager:
|
||
def __init__(self):
|
||
# Список активных таймеров: {"end_time": datetime, "label": str}
|
||
self.timers = []
|
||
self.load_timers()
|
||
|
||
def load_timers(self):
|
||
"""Загрузка списка таймеров из JSON файла."""
|
||
if TIMER_FILE.exists():
|
||
try:
|
||
with open(TIMER_FILE, "r", encoding="utf-8") as f:
|
||
raw = json.load(f)
|
||
except Exception as e:
|
||
print(f"❌ Ошибка загрузки таймеров: {e}")
|
||
return
|
||
|
||
timers = []
|
||
for item in raw:
|
||
try:
|
||
end_time = datetime.fromisoformat(item["end_time"])
|
||
except Exception:
|
||
continue
|
||
label = item.get("label", "")
|
||
timers.append({"end_time": end_time, "label": label})
|
||
|
||
self.timers = sorted(timers, key=lambda x: x["end_time"])
|
||
|
||
def save_timers(self):
|
||
"""Сохранение списка таймеров в JSON файл."""
|
||
payload = [
|
||
{"end_time": t["end_time"].isoformat(), "label": t.get("label", "")}
|
||
for t in self.timers
|
||
]
|
||
try:
|
||
with open(TIMER_FILE, "w", encoding="utf-8") as f:
|
||
json.dump(payload, f, indent=4)
|
||
except Exception as e:
|
||
print(f"❌ Ошибка сохранения таймеров: {e}")
|
||
|
||
def describe_timers(self) -> str:
|
||
"""Возвращает текстовое описание активных таймеров."""
|
||
if not self.timers:
|
||
return "Активных таймеров нет."
|
||
|
||
now = datetime.now()
|
||
items = []
|
||
for idx, timer in enumerate(self.timers, start=1):
|
||
remaining = (timer["end_time"] - now).total_seconds()
|
||
label = timer.get("label", "").strip()
|
||
ordinal = _format_ordinal_index(idx)
|
||
if remaining <= 0:
|
||
status = "сработает сейчас"
|
||
else:
|
||
status = f"осталось {_format_duration(remaining)}"
|
||
|
||
if label:
|
||
items.append(f"{ordinal}) {label} — {status}")
|
||
else:
|
||
items.append(f"{ordinal}) {status}")
|
||
|
||
return "Активные таймеры: " + "; ".join(items) + "."
|
||
|
||
def add_timer(self, seconds: int, label: str):
|
||
"""Добавление нового таймера."""
|
||
end_time = datetime.now() + timedelta(seconds=seconds)
|
||
self.timers.append({"end_time": end_time, "label": label})
|
||
# Сортируем, чтобы ближайший был первым
|
||
self.timers.sort(key=lambda x: x["end_time"])
|
||
self.save_timers()
|
||
print(f"⏳ Таймер установлен на {label} (до {end_time.strftime('%H:%M:%S')})")
|
||
|
||
def cancel_all_timers(self):
|
||
"""Отмена всех таймеров."""
|
||
count = len(self.timers)
|
||
self.timers = []
|
||
self.save_timers()
|
||
print(f"🔕 Все таймеры ({count}) отменены.")
|
||
|
||
def check_timers(self):
|
||
"""
|
||
Проверка: не истек ли какой-то таймер?
|
||
Вызывается в главном цикле.
|
||
Возвращает True, если таймер сработал (и был обработан).
|
||
"""
|
||
if not self.timers:
|
||
return False
|
||
|
||
now = datetime.now()
|
||
# Смотрим первый (самый ранний) таймер
|
||
# Используем индекс 0, так как список отсортирован
|
||
first_timer = self.timers[0]
|
||
|
||
if now >= first_timer["end_time"]:
|
||
# Таймер сработал!
|
||
# Удаляем его из списка
|
||
label = first_timer["label"]
|
||
self.timers.pop(0)
|
||
self.save_timers()
|
||
|
||
print(f"⌛ ТАЙМЕР ИСТЕК: {label}")
|
||
self.trigger_timer(label)
|
||
return True
|
||
|
||
return False
|
||
|
||
def trigger_timer(self, label: str):
|
||
"""
|
||
Логика срабатывания таймера.
|
||
Запускает воспроизведение MP3 и слушает команду "Стоп".
|
||
"""
|
||
print(f"🔔 ТАЙМЕР НА {label} СРАБОТАЛ! (Скажите 'Стоп')")
|
||
|
||
# Запуск плеера mpg123 в бесконечном цикле
|
||
cmd = ["mpg123", "-q", "--loop", "-1", str(ALARM_SOUND)]
|
||
|
||
try:
|
||
process = subprocess.Popen(cmd)
|
||
except FileNotFoundError:
|
||
print(
|
||
"❌ Ошибка: mpg123 не найден. Установите его: sudo apt install mpg123"
|
||
)
|
||
return
|
||
|
||
try:
|
||
# Цикл ожидания стоп-команды
|
||
while True:
|
||
text = listen(timeout_seconds=3.0, detection_timeout=3.0)
|
||
if text:
|
||
if is_stop_command(text, mode="lenient"):
|
||
print(f"🛑 Таймер остановлен по команде: '{text}'")
|
||
break
|
||
|
||
except Exception as e:
|
||
print(f"❌ Ошибка во время таймера: {e}")
|
||
finally:
|
||
# Обязательно убиваем процесс плеера
|
||
process.terminate()
|
||
try:
|
||
process.wait(timeout=1)
|
||
except subprocess.TimeoutExpired:
|
||
process.kill()
|
||
print("🔕 Таймер выключен.")
|
||
|
||
def parse_command(self, text: str) -> str | None:
|
||
"""
|
||
Парсинг команды установки таймера.
|
||
Примеры: "таймер на 5 минут", "засеки 10 секунд".
|
||
"""
|
||
text = _normalize_timer_text(text.lower())
|
||
|
||
# Ключевые слова для таймера
|
||
if not any(word in text for word in ["таймер", "засеки", "поставь таймер"]):
|
||
return None
|
||
|
||
if "таймер" in text and re.search(
|
||
r"(какие|какой|список|активн|покажи|сколько|есть ли)", text
|
||
):
|
||
return self.describe_timers()
|
||
|
||
if "отмени" in text or "удали" in text:
|
||
self.cancel_all_timers()
|
||
return "Хорошо, все таймеры отменены."
|
||
|
||
# Поиск времени
|
||
# Ищем комбинации: число + (час/мин/сек)
|
||
# Пример: "1 час 30 минут", "5 минут", "30 секунд"
|
||
|
||
total_seconds = 0
|
||
parts = []
|
||
hours = None
|
||
minutes = None
|
||
seconds = None
|
||
has_fractional = False
|
||
|
||
# Часы
|
||
match_hours = re.search(r"(\d+)\s*(?:час|часа|часов)", text)
|
||
if match_hours:
|
||
hours = int(match_hours.group(1))
|
||
|
||
# Минуты
|
||
match_minutes = re.search(r"(\d+)\s*(?:мин|минуту|минуты|минут)", text)
|
||
if match_minutes:
|
||
minutes = int(match_minutes.group(1))
|
||
|
||
# Секунды
|
||
match_seconds = re.search(r"(\d+)\s*(?:сек|секунду|секунды|секунд)", text)
|
||
if match_seconds:
|
||
seconds = int(match_seconds.group(1))
|
||
|
||
# Дополняем числительные словами (например, "одну минуту")
|
||
word_values = _extract_word_time_values(text)
|
||
if hours is None and word_values["hours"] is not None:
|
||
hours = word_values["hours"]
|
||
if minutes is None and word_values["minutes"] is not None:
|
||
minutes = word_values["minutes"]
|
||
if seconds is None and word_values["seconds"] is not None:
|
||
seconds = word_values["seconds"]
|
||
|
||
if hours is not None and isinstance(hours, float) and hours % 1 != 0:
|
||
has_fractional = True
|
||
if minutes is not None and isinstance(minutes, float) and minutes % 1 != 0:
|
||
has_fractional = True
|
||
if seconds is not None and isinstance(seconds, float) and seconds % 1 != 0:
|
||
has_fractional = True
|
||
|
||
found_time = any(value is not None for value in [hours, minutes, seconds])
|
||
if found_time:
|
||
total_seconds = (
|
||
(hours or 0) * 3600 + (minutes or 0) * 60 + (seconds or 0)
|
||
)
|
||
if has_fractional:
|
||
total_seconds = int(round(total_seconds))
|
||
h = total_seconds // 3600
|
||
m = (total_seconds % 3600) // 60
|
||
s = total_seconds % 60
|
||
else:
|
||
h = int(hours) if hours is not None else 0
|
||
m = int(minutes) if minutes is not None else 0
|
||
s = int(seconds) if seconds is not None else 0
|
||
|
||
if h:
|
||
parts.append(_format_unit(h, "hours"))
|
||
if m:
|
||
parts.append(_format_unit(m, "minutes"))
|
||
if s or not parts:
|
||
parts.append(_format_unit(s, "seconds"))
|
||
|
||
if found_time and total_seconds > 0:
|
||
label = " ".join(parts)
|
||
self.add_timer(total_seconds, label)
|
||
return f"Поставил таймер на {label}."
|
||
|
||
# Если сказали "таймер", но не нашли время
|
||
return "Я не понял, на сколько поставить таймер. Скажите, например, 'таймер на 5 минут'."
|
||
|
||
|
||
# Глобальный экземпляр
|
||
_timer_manager = None
|
||
|
||
|
||
def get_timer_manager():
|
||
global _timer_manager
|
||
if _timer_manager is None:
|
||
_timer_manager = TimerManager()
|
||
return _timer_manager
|