322 lines
13 KiB
Python
322 lines
13 KiB
Python
"""Alarm clock module."""
|
||
|
||
# Модуль будильника.
|
||
# Отвечает за хранение будильников (в JSON файле), их проверку и воспроизведение звука.
|
||
|
||
import json
|
||
import subprocess
|
||
import re
|
||
from datetime import datetime
|
||
from ..core.config import BASE_DIR
|
||
from ..audio.stt import listen
|
||
from ..core.commands import is_stop_command
|
||
from ..core.roman import replace_roman_numerals
|
||
|
||
# Файл базы данных будильников
|
||
ALARM_FILE = BASE_DIR / "data" / "alarms.json"
|
||
# Звуковой файл сигнала
|
||
ALARM_SOUND = BASE_DIR / "assets" / "sounds" / "Apex-1.mp3"
|
||
ASK_ALARM_TIME_PROMPT = "На какое время мне поставить будильник?"
|
||
|
||
|
||
class AlarmClock:
|
||
def __init__(self):
|
||
self.alarms = []
|
||
self.load_alarms()
|
||
|
||
def _normalize_days(self, days):
|
||
if not days:
|
||
return None
|
||
unique = sorted({int(day) for day in days})
|
||
return unique
|
||
|
||
def _days_key(self, days):
|
||
normalized = self._normalize_days(days)
|
||
return tuple(normalized) if normalized else None
|
||
|
||
def _format_days_phrase(self, days):
|
||
normalized = self._normalize_days(days)
|
||
if not normalized:
|
||
return ""
|
||
|
||
day_set = set(normalized)
|
||
if day_set == {0, 1, 2, 3, 4}:
|
||
return "по будням"
|
||
if day_set == {5, 6}:
|
||
return "по выходным"
|
||
if len(day_set) == 7:
|
||
return "каждый день"
|
||
|
||
names = {
|
||
0: "понедельникам",
|
||
1: "вторникам",
|
||
2: "средам",
|
||
3: "четвергам",
|
||
4: "пятницам",
|
||
5: "субботам",
|
||
6: "воскресеньям",
|
||
}
|
||
ordered = [names[d] for d in normalized if d in names]
|
||
if not ordered:
|
||
return ""
|
||
if len(ordered) == 1:
|
||
return f"по {ordered[0]}"
|
||
return "по " + ", ".join(ordered[:-1]) + " и " + ordered[-1]
|
||
|
||
def _extract_alarm_days(self, text: str):
|
||
text = text.lower().replace("ё", "е")
|
||
days = set()
|
||
|
||
if re.search(r"\b(каждый день|ежедневно)\b", text):
|
||
return [0, 1, 2, 3, 4, 5, 6]
|
||
|
||
if re.search(r"\b(по будн|в будн|будние)\b", text):
|
||
days.update([0, 1, 2, 3, 4])
|
||
|
||
if re.search(r"\b(по выходн|в выходн|выходные)\b", text):
|
||
days.update([5, 6])
|
||
|
||
day_patterns = {
|
||
0: r"\bпонедельн\w*\b",
|
||
1: r"\bвторник\w*\b",
|
||
2: r"\bсред\w*\b",
|
||
3: r"\bчетверг\w*\b",
|
||
4: r"\bпятниц\w*\b",
|
||
5: r"\bсуббот\w*\b",
|
||
6: r"\bвоскресен\w*\b",
|
||
}
|
||
for day_idx, pattern in day_patterns.items():
|
||
if re.search(pattern, text):
|
||
days.add(day_idx)
|
||
|
||
return self._normalize_days(days)
|
||
|
||
def load_alarms(self):
|
||
"""Загрузка списка будильников из JSON файла."""
|
||
if ALARM_FILE.exists():
|
||
try:
|
||
with open(ALARM_FILE, "r", encoding="utf-8") as f:
|
||
self.alarms = json.load(f)
|
||
except Exception as e:
|
||
print(f"❌ Ошибка загрузки будильников: {e}")
|
||
self.alarms = []
|
||
|
||
def save_alarms(self):
|
||
"""Сохранение списка будильников в JSON файл."""
|
||
try:
|
||
with open(ALARM_FILE, "w", encoding="utf-8") as f:
|
||
json.dump(self.alarms, f, indent=4)
|
||
except Exception as e:
|
||
print(f"❌ Ошибка сохранения будильников: {e}")
|
||
|
||
def add_alarm(self, hour: int, minute: int):
|
||
"""Добавление нового будильника (или обновление существующего)."""
|
||
return self.add_alarm_with_days(hour, minute, days=None)
|
||
|
||
def add_alarm_with_days(self, hour: int, minute: int, days=None):
|
||
"""Добавление нового будильника (или обновление существующего) с днями недели."""
|
||
days_key = self._days_key(days)
|
||
for alarm in self.alarms:
|
||
if (
|
||
alarm.get("hour") == hour
|
||
and alarm.get("minute") == minute
|
||
and self._days_key(alarm.get("days")) == days_key
|
||
):
|
||
alarm["active"] = True
|
||
alarm["days"] = days_key
|
||
alarm["last_triggered"] = None
|
||
self.save_alarms()
|
||
return
|
||
|
||
self.alarms.append(
|
||
{"hour": hour, "minute": minute, "active": True, "days": days_key}
|
||
)
|
||
self.save_alarms()
|
||
days_phrase = self._format_days_phrase(days_key)
|
||
suffix = f" {days_phrase}" if days_phrase else ""
|
||
print(f"⏰ Будильник установлен на {hour:02d}:{minute:02d}{suffix}")
|
||
|
||
def cancel_all_alarms(self):
|
||
"""Выключение (деактивация) всех будильников."""
|
||
for alarm in self.alarms:
|
||
alarm["active"] = False
|
||
self.save_alarms()
|
||
print("🔕 Все будильники отменены.")
|
||
|
||
def describe_alarms(self) -> str:
|
||
"""Возвращает текстовое описание активных будильников."""
|
||
active = [
|
||
alarm
|
||
for alarm in self.alarms
|
||
if alarm.get("active") and "hour" in alarm and "minute" in alarm
|
||
]
|
||
if not active:
|
||
return "Активных будильников нет."
|
||
|
||
active.sort(key=lambda a: (a["hour"], a["minute"]))
|
||
items = []
|
||
for alarm in active:
|
||
time_str = f"{alarm['hour']:02d}:{alarm['minute']:02d}"
|
||
days_phrase = self._format_days_phrase(alarm.get("days"))
|
||
if days_phrase:
|
||
items.append(f"{days_phrase} в {time_str}")
|
||
else:
|
||
items.append(time_str)
|
||
return "Активные будильники: " + ", ".join(items) + "."
|
||
|
||
def check_alarms(self):
|
||
"""
|
||
Проверка: не пора ли звенеть?
|
||
Вызывается в главном цикле.
|
||
Возвращает True, если будильник сработал.
|
||
"""
|
||
now = datetime.now()
|
||
triggered = False
|
||
|
||
for alarm in self.alarms:
|
||
if alarm["active"]:
|
||
if alarm["hour"] == now.hour and alarm["minute"] == now.minute:
|
||
days = self._normalize_days(alarm.get("days"))
|
||
if days and now.weekday() not in days:
|
||
continue
|
||
|
||
last_triggered = alarm.get("last_triggered")
|
||
if last_triggered:
|
||
try:
|
||
last_dt = datetime.fromisoformat(last_triggered)
|
||
if (
|
||
last_dt.date() == now.date()
|
||
and last_dt.hour == now.hour
|
||
and last_dt.minute == now.minute
|
||
):
|
||
continue
|
||
except Exception:
|
||
pass
|
||
|
||
print(
|
||
f"⏰ ВРЕМЯ БУДИЛЬНИКА: {alarm['hour']:02d}:{alarm['minute']:02d}"
|
||
)
|
||
if not days:
|
||
# Одноразовый будильник, выключаем после срабатывания
|
||
alarm["active"] = False
|
||
alarm["last_triggered"] = now.isoformat()
|
||
triggered = True
|
||
self.trigger_alarm() # Запуск звука и ожидание стоп-слова
|
||
break # Звоним только один за раз
|
||
|
||
if triggered:
|
||
self.save_alarms()
|
||
return True
|
||
return False
|
||
|
||
def trigger_alarm(self):
|
||
"""
|
||
Логика срабатывания будильника.
|
||
Запускает воспроизведение MP3 через mpg123 и слушает команду "Стоп".
|
||
Использует облачное распознавание речи для остановки.
|
||
"""
|
||
print("🔔 БУДИЛЬНИК ЗВОНИТ! (Скажите 'Стоп' или 'Александр стоп')")
|
||
|
||
# Запуск плеера mpg123 в бесконечном цикле (--loop -1)
|
||
cmd = ["mpg123", "-q", "--loop", "-1", str(ALARM_SOUND)]
|
||
|
||
try:
|
||
process = subprocess.Popen(cmd)
|
||
except FileNotFoundError:
|
||
print(
|
||
"❌ Ошибка: mpg123 не найден. Установите его: sudo apt install mpg123"
|
||
)
|
||
return
|
||
|
||
try:
|
||
# Цикл ожидания стоп-команды
|
||
while True:
|
||
text = listen(timeout_seconds=3.0, detection_timeout=3.0, fast_stop=True)
|
||
if text:
|
||
if is_stop_command(text, mode="lenient"):
|
||
print(f"🛑 Будильник остановлен по команде: '{text}'")
|
||
break
|
||
|
||
except Exception as e:
|
||
print(f"❌ Ошибка во время будильника: {e}")
|
||
finally:
|
||
# Обязательно убиваем процесс плеера
|
||
process.terminate()
|
||
try:
|
||
process.wait(timeout=1)
|
||
except subprocess.TimeoutExpired:
|
||
process.kill()
|
||
print("🔕 Будильник выключен.")
|
||
|
||
def parse_command(self, text: str) -> str | None:
|
||
"""
|
||
Парсинг команды установки будильника из текста.
|
||
Примеры: "разбуди в 7:30", "будильник на 8 утра".
|
||
"""
|
||
text = replace_roman_numerals(text.lower())
|
||
if "будильник" not in text and "разбуди" not in text:
|
||
return None
|
||
|
||
if "будильник" in text and re.search(
|
||
r"(какие|какой|список|активн|покажи|сколько|есть ли)", text
|
||
):
|
||
return self.describe_alarms()
|
||
|
||
if "отмени" in text:
|
||
self.cancel_all_alarms()
|
||
return "Хорошо, я отменил все будильники."
|
||
|
||
days = self._extract_alarm_days(text)
|
||
|
||
# Поиск формата "7:30", "7.30"
|
||
match = re.search(r"\b(\d{1,2})[:.-](\d{2})\b", text)
|
||
if match:
|
||
h, m = int(match.group(1)), int(match.group(2))
|
||
if 0 <= h <= 23 and 0 <= m <= 59:
|
||
self.add_alarm_with_days(h, m, days=days)
|
||
days_phrase = self._format_days_phrase(days)
|
||
suffix = f" {days_phrase}" if days_phrase else ""
|
||
return f"Я установил будильник на {h} часов {m} минут{suffix}."
|
||
|
||
# Поиск формата словами "на 7 часов 15 минут"
|
||
match_time = re.search(
|
||
r"на\s+(\d{1,2})(?:\s*(?:часов|часа|час))?(?:\s+(\d{1,2})(?:\s*(?:минут|минуты|минута))?)?",
|
||
text,
|
||
)
|
||
|
||
if match_time:
|
||
h = int(match_time.group(1))
|
||
m = int(match_time.group(2)) if match_time.group(2) else 0
|
||
|
||
# Умная коррекция времени (если говорят "в 8", а сейчас 9, то это скорее 8 вечера или 8 утра завтра)
|
||
# Здесь простая логика AM/PM
|
||
if "вечера" in text and h < 12:
|
||
h += 12
|
||
elif "утра" in text and h == 12:
|
||
h = 0
|
||
|
||
if 0 <= h <= 23 and 0 <= m <= 59:
|
||
self.add_alarm_with_days(h, m, days=days)
|
||
days_phrase = self._format_days_phrase(days)
|
||
suffix = f" {days_phrase}" if days_phrase else ""
|
||
return f"Хорошо, разбужу вас в {h}:{m:02d}{suffix}."
|
||
|
||
if re.search(r"(постав|установ|запусти|включи|разбуди)", text) or text.strip() in {
|
||
"будильник",
|
||
"поставь будильник",
|
||
}:
|
||
return ASK_ALARM_TIME_PROMPT
|
||
|
||
return "Я не понял время для будильника. Пожалуйста, скажите точное время, например 'семь тридцать'."
|
||
|
||
|
||
# Глобальный экземпляр
|
||
_alarm_clock = None
|
||
|
||
|
||
def get_alarm_clock():
|
||
global _alarm_clock
|
||
if _alarm_clock is None:
|
||
_alarm_clock = AlarmClock()
|
||
return _alarm_clock
|