Files
smart-speaker/app/features/alarm.py

322 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Alarm clock module."""
# Модуль будильника.
# Отвечает за хранение будильников (в JSON файле), их проверку и воспроизведение звука.
import json
import subprocess
import re
from datetime import datetime
from ..core.config import BASE_DIR
from ..audio.stt import listen
from ..core.commands import is_stop_command
from ..core.roman import replace_roman_numerals
# Файл базы данных будильников
ALARM_FILE = BASE_DIR / "data" / "alarms.json"
# Звуковой файл сигнала
ALARM_SOUND = BASE_DIR / "assets" / "sounds" / "Apex-1.mp3"
ASK_ALARM_TIME_PROMPT = "На какое время мне поставить будильник?"
class AlarmClock:
def __init__(self):
self.alarms = []
self.load_alarms()
def _normalize_days(self, days):
if not days:
return None
unique = sorted({int(day) for day in days})
return unique
def _days_key(self, days):
normalized = self._normalize_days(days)
return tuple(normalized) if normalized else None
def _format_days_phrase(self, days):
normalized = self._normalize_days(days)
if not normalized:
return ""
day_set = set(normalized)
if day_set == {0, 1, 2, 3, 4}:
return "по будням"
if day_set == {5, 6}:
return "по выходным"
if len(day_set) == 7:
return "каждый день"
names = {
0: "понедельникам",
1: "вторникам",
2: "средам",
3: "четвергам",
4: "пятницам",
5: "субботам",
6: "воскресеньям",
}
ordered = [names[d] for d in normalized if d in names]
if not ordered:
return ""
if len(ordered) == 1:
return f"по {ordered[0]}"
return "по " + ", ".join(ordered[:-1]) + " и " + ordered[-1]
def _extract_alarm_days(self, text: str):
text = text.lower().replace("ё", "е")
days = set()
if re.search(r"\b(каждый день|ежедневно)\b", text):
return [0, 1, 2, 3, 4, 5, 6]
if re.search(r"\b(по будн|в будн|будние)\b", text):
days.update([0, 1, 2, 3, 4])
if re.search(r"\b(по выходн|в выходн|выходные)\b", text):
days.update([5, 6])
day_patterns = {
0: r"\bпонедельн\w*\b",
1: r"\bвторник\w*\b",
2: r"\bсред\w*\b",
3: r"\етверг\w*\b",
4: r"\bпятниц\w*\b",
5: r"\bсуббот\w*\b",
6: r"\оскресен\w*\b",
}
for day_idx, pattern in day_patterns.items():
if re.search(pattern, text):
days.add(day_idx)
return self._normalize_days(days)
def load_alarms(self):
"""Загрузка списка будильников из JSON файла."""
if ALARM_FILE.exists():
try:
with open(ALARM_FILE, "r", encoding="utf-8") as f:
self.alarms = json.load(f)
except Exception as e:
print(f"❌ Ошибка загрузки будильников: {e}")
self.alarms = []
def save_alarms(self):
"""Сохранение списка будильников в JSON файл."""
try:
with open(ALARM_FILE, "w", encoding="utf-8") as f:
json.dump(self.alarms, f, indent=4)
except Exception as e:
print(f"❌ Ошибка сохранения будильников: {e}")
def add_alarm(self, hour: int, minute: int):
"""Добавление нового будильника (или обновление существующего)."""
return self.add_alarm_with_days(hour, minute, days=None)
def add_alarm_with_days(self, hour: int, minute: int, days=None):
"""Добавление нового будильника (или обновление существующего) с днями недели."""
days_key = self._days_key(days)
for alarm in self.alarms:
if (
alarm.get("hour") == hour
and alarm.get("minute") == minute
and self._days_key(alarm.get("days")) == days_key
):
alarm["active"] = True
alarm["days"] = days_key
alarm["last_triggered"] = None
self.save_alarms()
return
self.alarms.append(
{"hour": hour, "minute": minute, "active": True, "days": days_key}
)
self.save_alarms()
days_phrase = self._format_days_phrase(days_key)
suffix = f" {days_phrase}" if days_phrase else ""
print(f"⏰ Будильник установлен на {hour:02d}:{minute:02d}{suffix}")
def cancel_all_alarms(self):
"""Выключение (деактивация) всех будильников."""
for alarm in self.alarms:
alarm["active"] = False
self.save_alarms()
print("🔕 Все будильники отменены.")
def describe_alarms(self) -> str:
"""Возвращает текстовое описание активных будильников."""
active = [
alarm
for alarm in self.alarms
if alarm.get("active") and "hour" in alarm and "minute" in alarm
]
if not active:
return "Активных будильников нет."
active.sort(key=lambda a: (a["hour"], a["minute"]))
items = []
for alarm in active:
time_str = f"{alarm['hour']:02d}:{alarm['minute']:02d}"
days_phrase = self._format_days_phrase(alarm.get("days"))
if days_phrase:
items.append(f"{days_phrase} в {time_str}")
else:
items.append(time_str)
return "Активные будильники: " + ", ".join(items) + "."
def check_alarms(self):
"""
Проверка: не пора ли звенеть?
Вызывается в главном цикле.
Возвращает True, если будильник сработал.
"""
now = datetime.now()
triggered = False
for alarm in self.alarms:
if alarm["active"]:
if alarm["hour"] == now.hour and alarm["minute"] == now.minute:
days = self._normalize_days(alarm.get("days"))
if days and now.weekday() not in days:
continue
last_triggered = alarm.get("last_triggered")
if last_triggered:
try:
last_dt = datetime.fromisoformat(last_triggered)
if (
last_dt.date() == now.date()
and last_dt.hour == now.hour
and last_dt.minute == now.minute
):
continue
except Exception:
pass
print(
f"⏰ ВРЕМЯ БУДИЛЬНИКА: {alarm['hour']:02d}:{alarm['minute']:02d}"
)
if not days:
# Одноразовый будильник, выключаем после срабатывания
alarm["active"] = False
alarm["last_triggered"] = now.isoformat()
triggered = True
self.trigger_alarm() # Запуск звука и ожидание стоп-слова
break # Звоним только один за раз
if triggered:
self.save_alarms()
return True
return False
def trigger_alarm(self):
"""
Логика срабатывания будильника.
Запускает воспроизведение MP3 через mpg123 и слушает команду "Стоп".
Использует облачное распознавание речи для остановки.
"""
print("🔔 БУДИЛЬНИК ЗВОНИТ! (Скажите 'Стоп' или 'Александр стоп')")
# Запуск плеера mpg123 в бесконечном цикле (--loop -1)
cmd = ["mpg123", "-q", "--loop", "-1", str(ALARM_SOUND)]
try:
process = subprocess.Popen(cmd)
except FileNotFoundError:
print(
"❌ Ошибка: mpg123 не найден. Установите его: sudo apt install mpg123"
)
return
try:
# Цикл ожидания стоп-команды
while True:
text = listen(timeout_seconds=3.0, detection_timeout=3.0, fast_stop=True)
if text:
if is_stop_command(text, mode="lenient"):
print(f"🛑 Будильник остановлен по команде: '{text}'")
break
except Exception as e:
print(f"❌ Ошибка во время будильника: {e}")
finally:
# Обязательно убиваем процесс плеера
process.terminate()
try:
process.wait(timeout=1)
except subprocess.TimeoutExpired:
process.kill()
print("🔕 Будильник выключен.")
def parse_command(self, text: str) -> str | None:
"""
Парсинг команды установки будильника из текста.
Примеры: "разбуди в 7:30", "будильник на 8 утра".
"""
text = replace_roman_numerals(text.lower())
if "будильник" not in text and "разбуди" not in text:
return None
if "будильник" in text and re.search(
r"(какие|какой|список|активн|покажи|сколько|есть ли)", text
):
return self.describe_alarms()
if "отмени" in text:
self.cancel_all_alarms()
return "Хорошо, я отменил все будильники."
days = self._extract_alarm_days(text)
# Поиск формата "7:30", "7.30"
match = re.search(r"\b(\d{1,2})[:.-](\d{2})\b", text)
if match:
h, m = int(match.group(1)), int(match.group(2))
if 0 <= h <= 23 and 0 <= m <= 59:
self.add_alarm_with_days(h, m, days=days)
days_phrase = self._format_days_phrase(days)
suffix = f" {days_phrase}" if days_phrase else ""
return f"Я установил будильник на {h} часов {m} минут{suffix}."
# Поиск формата словами "на 7 часов 15 минут"
match_time = re.search(
r"на\s+(\d{1,2})(?:\s*(?:часов|часа|час))?(?:\s+(\d{1,2})(?:\s*(?:минут|минуты|минута))?)?",
text,
)
if match_time:
h = int(match_time.group(1))
m = int(match_time.group(2)) if match_time.group(2) else 0
# Умная коррекция времени (если говорят "в 8", а сейчас 9, то это скорее 8 вечера или 8 утра завтра)
# Здесь простая логика AM/PM
if "вечера" in text and h < 12:
h += 12
elif "утра" in text and h == 12:
h = 0
if 0 <= h <= 23 and 0 <= m <= 59:
self.add_alarm_with_days(h, m, days=days)
days_phrase = self._format_days_phrase(days)
suffix = f" {days_phrase}" if days_phrase else ""
return f"Хорошо, разбужу вас в {h}:{m:02d}{suffix}."
if re.search(r"(постав|установ|запусти|включи|разбуди)", text) or text.strip() in {
"будильник",
"поставь будильник",
}:
return ASK_ALARM_TIME_PROMPT
return "Я не понял время для будильника. Пожалуйста, скажите точное время, например 'семь тридцать'."
# Глобальный экземпляр
_alarm_clock = None
def get_alarm_clock():
global _alarm_clock
if _alarm_clock is None:
_alarm_clock = AlarmClock()
return _alarm_clock