446 lines
17 KiB
Python
446 lines
17 KiB
Python
"""Alarm clock module."""
|
||
|
||
# Модуль будильника.
|
||
# Отвечает за хранение будильников (в JSON файле), их проверку и воспроизведение звука.
|
||
|
||
import json
|
||
import subprocess
|
||
import re
|
||
from datetime import datetime
|
||
from ..core.config import BASE_DIR
|
||
from ..audio.stt import listen
|
||
from ..core.commands import is_stop_command
|
||
from ..core.roman import replace_roman_numerals
|
||
|
||
# Файл базы данных будильников
|
||
ALARM_FILE = BASE_DIR / "data" / "alarms.json"
|
||
# Звуковой файл сигнала
|
||
ALARM_SOUND = BASE_DIR / "assets" / "sounds" / "Apex-1.mp3"
|
||
ASK_ALARM_TIME_PROMPT = "На какое время мне поставить будильник?"
|
||
|
||
_NUMBER_UNITS = {
|
||
"ноль": 0,
|
||
"один": 1,
|
||
"одна": 1,
|
||
"два": 2,
|
||
"две": 2,
|
||
"три": 3,
|
||
"четыре": 4,
|
||
"пять": 5,
|
||
"шесть": 6,
|
||
"семь": 7,
|
||
"восемь": 8,
|
||
"девять": 9,
|
||
}
|
||
_NUMBER_TEENS = {
|
||
"десять": 10,
|
||
"одиннадцать": 11,
|
||
"двенадцать": 12,
|
||
"тринадцать": 13,
|
||
"четырнадцать": 14,
|
||
"пятнадцать": 15,
|
||
"шестнадцать": 16,
|
||
"семнадцать": 17,
|
||
"восемнадцать": 18,
|
||
"девятнадцать": 19,
|
||
}
|
||
_NUMBER_TENS = {
|
||
"двадцать": 20,
|
||
"тридцать": 30,
|
||
"сорок": 40,
|
||
"пятьдесят": 50,
|
||
}
|
||
_PARTS_OF_DAY = {"утра", "дня", "вечера", "ночи"}
|
||
_FILLER_WORDS = {"мне", "меня", "пожалуйста", "на", "в", "во", "к", "и"}
|
||
_HOUR_WORDS = {"час", "часа", "часов"}
|
||
_MINUTE_WORDS = {"минута", "минуту", "минуты", "минут"}
|
||
|
||
|
||
def _parse_number_tokens(tokens, start_index: int):
|
||
if start_index >= len(tokens):
|
||
return None, 0
|
||
|
||
token = tokens[start_index]
|
||
if token.isdigit():
|
||
return int(token), 1
|
||
|
||
if token in _NUMBER_TEENS:
|
||
return _NUMBER_TEENS[token], 1
|
||
|
||
if token in _NUMBER_TENS:
|
||
value = _NUMBER_TENS[token]
|
||
if start_index + 1 < len(tokens):
|
||
next_token = tokens[start_index + 1]
|
||
if next_token in _NUMBER_UNITS:
|
||
value += _NUMBER_UNITS[next_token]
|
||
return value, 2
|
||
return value, 1
|
||
|
||
if token in _NUMBER_UNITS:
|
||
return _NUMBER_UNITS[token], 1
|
||
|
||
return None, 0
|
||
|
||
|
||
def _apply_part_of_day(hour: int, part_of_day: str | None) -> int:
|
||
if not part_of_day:
|
||
return hour
|
||
|
||
if part_of_day == "утра":
|
||
return 0 if hour == 12 else hour
|
||
if part_of_day == "ночи":
|
||
return 0 if hour == 12 else hour
|
||
if part_of_day in {"дня", "вечера"} and hour < 12:
|
||
return hour + 12
|
||
return hour
|
||
|
||
|
||
def _extract_alarm_time_words(text: str):
|
||
tokens = re.findall(r"[a-zа-я0-9]+", text.lower().replace("ё", "е"))
|
||
markers = {"будильник", "разбуди", "поставь", "установи", "включи", "на", "в", "к"}
|
||
|
||
for index, token in enumerate(tokens):
|
||
if token not in markers:
|
||
continue
|
||
|
||
current = index + 1
|
||
while current < len(tokens) and tokens[current] in _FILLER_WORDS:
|
||
current += 1
|
||
|
||
hour, consumed = _parse_number_tokens(tokens, current)
|
||
if hour is None:
|
||
continue
|
||
current += consumed
|
||
|
||
if current < len(tokens) and tokens[current] in _HOUR_WORDS:
|
||
current += 1
|
||
|
||
minute = 0
|
||
if current < len(tokens) and tokens[current] not in _PARTS_OF_DAY:
|
||
parsed_minute, minute_consumed = _parse_number_tokens(tokens, current)
|
||
if parsed_minute is not None:
|
||
minute = parsed_minute
|
||
current += minute_consumed
|
||
if current < len(tokens) and tokens[current] in _MINUTE_WORDS:
|
||
current += 1
|
||
|
||
part_of_day = None
|
||
if current < len(tokens) and tokens[current] in _PARTS_OF_DAY:
|
||
part_of_day = tokens[current]
|
||
|
||
if 0 <= hour <= 23 and 0 <= minute <= 59:
|
||
return _apply_part_of_day(hour, part_of_day), minute
|
||
|
||
return None
|
||
|
||
|
||
class AlarmClock:
|
||
def __init__(self):
|
||
self.alarms = []
|
||
self.load_alarms()
|
||
|
||
def _normalize_days(self, days):
|
||
if not days:
|
||
return None
|
||
unique = sorted({int(day) for day in days})
|
||
return unique
|
||
|
||
def _days_key(self, days):
|
||
normalized = self._normalize_days(days)
|
||
return tuple(normalized) if normalized else None
|
||
|
||
def _format_days_phrase(self, days):
|
||
normalized = self._normalize_days(days)
|
||
if not normalized:
|
||
return ""
|
||
|
||
day_set = set(normalized)
|
||
if day_set == {0, 1, 2, 3, 4}:
|
||
return "по будням"
|
||
if day_set == {5, 6}:
|
||
return "по выходным"
|
||
if len(day_set) == 7:
|
||
return "каждый день"
|
||
|
||
names = {
|
||
0: "понедельникам",
|
||
1: "вторникам",
|
||
2: "средам",
|
||
3: "четвергам",
|
||
4: "пятницам",
|
||
5: "субботам",
|
||
6: "воскресеньям",
|
||
}
|
||
ordered = [names[d] for d in normalized if d in names]
|
||
if not ordered:
|
||
return ""
|
||
if len(ordered) == 1:
|
||
return f"по {ordered[0]}"
|
||
return "по " + ", ".join(ordered[:-1]) + " и " + ordered[-1]
|
||
|
||
def _extract_alarm_days(self, text: str):
|
||
text = text.lower().replace("ё", "е")
|
||
days = set()
|
||
|
||
if re.search(r"\b(каждый день|ежедневно)\b", text):
|
||
return [0, 1, 2, 3, 4, 5, 6]
|
||
|
||
if re.search(r"\b(?:по\s+будн\w*|в\s+будн\w*|будн\w*)\b", text):
|
||
days.update([0, 1, 2, 3, 4])
|
||
|
||
if re.search(r"\b(?:по\s+выходн\w*|в\s+выходн\w*|выходн\w*)\b", text):
|
||
days.update([5, 6])
|
||
|
||
day_patterns = {
|
||
0: r"\bпонедельн\w*\b",
|
||
1: r"\bвторник\w*\b",
|
||
2: r"\bсред\w*\b",
|
||
3: r"\bчетверг\w*\b",
|
||
4: r"\bпятниц\w*\b",
|
||
5: r"\bсуббот\w*\b",
|
||
6: r"\bвоскресен\w*\b",
|
||
}
|
||
for day_idx, pattern in day_patterns.items():
|
||
if re.search(pattern, text):
|
||
days.add(day_idx)
|
||
|
||
return self._normalize_days(days)
|
||
|
||
def load_alarms(self):
|
||
"""Загрузка списка будильников из JSON файла."""
|
||
if ALARM_FILE.exists():
|
||
try:
|
||
with open(ALARM_FILE, "r", encoding="utf-8") as f:
|
||
self.alarms = json.load(f)
|
||
except Exception as e:
|
||
print(f"❌ Ошибка загрузки будильников: {e}")
|
||
self.alarms = []
|
||
|
||
def save_alarms(self):
|
||
"""Сохранение списка будильников в JSON файл."""
|
||
try:
|
||
with open(ALARM_FILE, "w", encoding="utf-8") as f:
|
||
json.dump(self.alarms, f, indent=4)
|
||
except Exception as e:
|
||
print(f"❌ Ошибка сохранения будильников: {e}")
|
||
|
||
def add_alarm(self, hour: int, minute: int):
|
||
"""Добавление нового будильника (или обновление существующего)."""
|
||
return self.add_alarm_with_days(hour, minute, days=None)
|
||
|
||
def add_alarm_with_days(self, hour: int, minute: int, days=None):
|
||
"""Добавление нового будильника (или обновление существующего) с днями недели."""
|
||
days_key = self._days_key(days)
|
||
for alarm in self.alarms:
|
||
if (
|
||
alarm.get("hour") == hour
|
||
and alarm.get("minute") == minute
|
||
and self._days_key(alarm.get("days")) == days_key
|
||
):
|
||
alarm["active"] = True
|
||
alarm["days"] = days_key
|
||
alarm["last_triggered"] = None
|
||
self.save_alarms()
|
||
return
|
||
|
||
self.alarms.append(
|
||
{"hour": hour, "minute": minute, "active": True, "days": days_key}
|
||
)
|
||
self.save_alarms()
|
||
days_phrase = self._format_days_phrase(days_key)
|
||
suffix = f" {days_phrase}" if days_phrase else ""
|
||
print(f"⏰ Будильник установлен на {hour:02d}:{minute:02d}{suffix}")
|
||
|
||
def cancel_all_alarms(self):
|
||
"""Выключение (деактивация) всех будильников."""
|
||
for alarm in self.alarms:
|
||
alarm["active"] = False
|
||
self.save_alarms()
|
||
print("🔕 Все будильники отменены.")
|
||
|
||
def describe_alarms(self) -> str:
|
||
"""Возвращает текстовое описание активных будильников."""
|
||
active = [
|
||
alarm
|
||
for alarm in self.alarms
|
||
if alarm.get("active") and "hour" in alarm and "minute" in alarm
|
||
]
|
||
if not active:
|
||
return "Активных будильников нет."
|
||
|
||
active.sort(key=lambda a: (a["hour"], a["minute"]))
|
||
items = []
|
||
for alarm in active:
|
||
time_str = f"{alarm['hour']:02d}:{alarm['minute']:02d}"
|
||
days_phrase = self._format_days_phrase(alarm.get("days"))
|
||
if days_phrase:
|
||
items.append(f"{days_phrase} в {time_str}")
|
||
else:
|
||
items.append(time_str)
|
||
return "Активные будильники: " + ", ".join(items) + "."
|
||
|
||
def check_alarms(self):
|
||
"""
|
||
Проверка: не пора ли звенеть?
|
||
Вызывается в главном цикле.
|
||
Возвращает True, если будильник сработал.
|
||
"""
|
||
now = datetime.now()
|
||
triggered = False
|
||
|
||
for alarm in self.alarms:
|
||
if alarm["active"]:
|
||
if alarm["hour"] == now.hour and alarm["minute"] == now.minute:
|
||
days = self._normalize_days(alarm.get("days"))
|
||
if days and now.weekday() not in days:
|
||
continue
|
||
|
||
last_triggered = alarm.get("last_triggered")
|
||
if last_triggered:
|
||
try:
|
||
last_dt = datetime.fromisoformat(last_triggered)
|
||
if (
|
||
last_dt.date() == now.date()
|
||
and last_dt.hour == now.hour
|
||
and last_dt.minute == now.minute
|
||
):
|
||
continue
|
||
except Exception:
|
||
pass
|
||
|
||
print(
|
||
f"⏰ ВРЕМЯ БУДИЛЬНИКА: {alarm['hour']:02d}:{alarm['minute']:02d}"
|
||
)
|
||
if not days:
|
||
# Одноразовый будильник, выключаем после срабатывания
|
||
alarm["active"] = False
|
||
alarm["last_triggered"] = now.isoformat()
|
||
triggered = True
|
||
self.trigger_alarm() # Запуск звука и ожидание стоп-слова
|
||
break # Звоним только один за раз
|
||
|
||
if triggered:
|
||
self.save_alarms()
|
||
return True
|
||
return False
|
||
|
||
def trigger_alarm(self):
|
||
"""
|
||
Логика срабатывания будильника.
|
||
Запускает воспроизведение MP3 через mpg123 и слушает команду "Стоп".
|
||
Использует облачное распознавание речи для остановки.
|
||
"""
|
||
print("🔔 БУДИЛЬНИК ЗВОНИТ! (Скажите 'Стоп' или 'Александр стоп')")
|
||
|
||
# Запуск плеера mpg123 в бесконечном цикле (--loop -1)
|
||
cmd = ["mpg123", "-q", "--loop", "-1", str(ALARM_SOUND)]
|
||
|
||
try:
|
||
process = subprocess.Popen(cmd)
|
||
except FileNotFoundError:
|
||
print(
|
||
"❌ Ошибка: mpg123 не найден. Установите его: sudo apt install mpg123"
|
||
)
|
||
return
|
||
|
||
try:
|
||
# Цикл ожидания стоп-команды
|
||
while True:
|
||
text = listen(timeout_seconds=3.0, detection_timeout=3.0, fast_stop=True)
|
||
if text:
|
||
if is_stop_command(text, mode="lenient"):
|
||
print(f"🛑 Будильник остановлен по команде: '{text}'")
|
||
break
|
||
|
||
except Exception as e:
|
||
print(f"❌ Ошибка во время будильника: {e}")
|
||
finally:
|
||
# Обязательно убиваем процесс плеера
|
||
process.terminate()
|
||
try:
|
||
process.wait(timeout=1)
|
||
except subprocess.TimeoutExpired:
|
||
process.kill()
|
||
print("🔕 Будильник выключен.")
|
||
|
||
def parse_command(self, text: str) -> str | None:
|
||
"""
|
||
Парсинг команды установки будильника из текста.
|
||
Примеры: "разбуди в 7:30", "будильник на 8 утра".
|
||
"""
|
||
text = replace_roman_numerals(text.lower())
|
||
if "будильник" not in text and "разбуди" not in text:
|
||
return None
|
||
|
||
if "будильник" in text and re.search(
|
||
r"(какие|какой|список|активн|покажи|сколько|есть ли)", text
|
||
):
|
||
return self.describe_alarms()
|
||
|
||
if "отмени" in text:
|
||
self.cancel_all_alarms()
|
||
return "Хорошо, я отменил все будильники."
|
||
|
||
days = self._extract_alarm_days(text)
|
||
|
||
# Поиск формата "7:30", "7.30" и вариантов с "в/на/к".
|
||
match = re.search(r"(?:\b(?:на|в|во|к)\s+)?(\d{1,2})[:.-](\d{2})\b", text)
|
||
if match:
|
||
h, m = int(match.group(1)), int(match.group(2))
|
||
period_match = re.search(
|
||
r"\b(?:на|в|во|к)?\s*" + re.escape(match.group(0).strip()) + r"\s+(утра|дня|вечера|ночи)\b",
|
||
text,
|
||
)
|
||
part_of_day = period_match.group(1) if period_match else None
|
||
h = _apply_part_of_day(h, part_of_day)
|
||
if 0 <= h <= 23 and 0 <= m <= 59:
|
||
self.add_alarm_with_days(h, m, days=days)
|
||
days_phrase = self._format_days_phrase(days)
|
||
suffix = f" {days_phrase}" if days_phrase else ""
|
||
return f"Я установил будильник на {h} часов {m} минут{suffix}."
|
||
|
||
# Поиск формата цифрами: "в 7 утра", "на 7", "к 6 30"
|
||
match_time = re.search(
|
||
r"(?:\b(?:на|в|во|к)\s+)?(\d{1,2})(?:\s*(?:часов|часа|час))?(?:\s+(\d{1,2})(?:\s*(?:минут|минуты|минута))?)?(?:\s+(утра|дня|вечера|ночи))?\b",
|
||
text,
|
||
)
|
||
|
||
if match_time:
|
||
h = int(match_time.group(1))
|
||
m = int(match_time.group(2)) if match_time.group(2) else 0
|
||
h = _apply_part_of_day(h, match_time.group(3))
|
||
|
||
if 0 <= h <= 23 and 0 <= m <= 59:
|
||
self.add_alarm_with_days(h, m, days=days)
|
||
days_phrase = self._format_days_phrase(days)
|
||
suffix = f" {days_phrase}" if days_phrase else ""
|
||
return f"Хорошо, разбужу вас в {h}:{m:02d}{suffix}."
|
||
|
||
# Поиск формата словами: "в семь утра", "будильник семь тридцать"
|
||
word_time = _extract_alarm_time_words(text)
|
||
if word_time:
|
||
h, m = word_time
|
||
self.add_alarm_with_days(h, m, days=days)
|
||
days_phrase = self._format_days_phrase(days)
|
||
suffix = f" {days_phrase}" if days_phrase else ""
|
||
return f"Хорошо, разбужу вас в {h}:{m:02d}{suffix}."
|
||
|
||
if re.search(r"(постав|установ|запусти|включи|разбуди)", text) or text.strip() in {
|
||
"будильник",
|
||
"поставь будильник",
|
||
}:
|
||
return ASK_ALARM_TIME_PROMPT
|
||
|
||
return "Я не понял время для будильника. Пожалуйста, скажите точное время, например 'семь тридцать'."
|
||
|
||
|
||
# Глобальный экземпляр
|
||
_alarm_clock = None
|
||
|
||
|
||
def get_alarm_clock():
|
||
global _alarm_clock
|
||
if _alarm_clock is None:
|
||
_alarm_clock = AlarmClock()
|
||
return _alarm_clock
|