1306 lines
47 KiB
Python
1306 lines
47 KiB
Python
"""Music controller with Navidrome-first playback and Spotify fallback."""
|
||
|
||
from __future__ import annotations
|
||
|
||
import difflib
|
||
import hashlib
|
||
import json
|
||
import os
|
||
import random
|
||
import re
|
||
import shutil
|
||
import socket
|
||
import subprocess
|
||
import threading
|
||
import time
|
||
from dataclasses import asdict, dataclass
|
||
from pathlib import Path
|
||
from typing import Any, Callable, Optional
|
||
from urllib.parse import urlencode
|
||
|
||
import requests
|
||
|
||
from ..core.config import BASE_DIR, WAKE_WORD_ALIASES
|
||
|
||
try:
|
||
import spotipy
|
||
from spotipy.oauth2 import SpotifyOAuth
|
||
except ImportError:
|
||
spotipy = None
|
||
SpotifyOAuth = None
|
||
|
||
|
||
STATE_FILE = BASE_DIR / "data" / "music_state.json"
|
||
MPV_SOCKET_FILE = BASE_DIR / "data" / "music_mpv.sock"
|
||
|
||
|
||
class NavidromeUnavailableError(Exception):
    """Signal that Navidrome cannot be used at all (network, auth or config failure)."""
|
||
|
||
|
||
class NavidromeCommandError(Exception):
    """Signal a domain-level failure of an otherwise valid command (e.g. unknown genre)."""
|
||
|
||
|
||
@dataclass
class Song:
    """Minimal song payload for queue/state handling.

    Instances are built either from a Navidrome API song object
    (:meth:`from_payload`, identifier under ``"id"``) or from a dictionary
    previously produced by :meth:`to_state` (:meth:`from_state`, identifier
    under ``"song_id"`` with ``"id"`` accepted as a fallback).
    """

    song_id: str
    title: str
    artist: str
    album: str
    duration: int
    path: str
    genre: str

    @staticmethod
    def _text(payload: dict[str, Any], key: str, default: str = "") -> str:
        """Return ``payload[key]`` as a stripped string.

        A missing key and an explicit ``None`` both yield *default*, so a JSON
        ``null`` never turns into the literal text ``"None"``.
        """
        value = payload.get(key)
        if value is None:
            return default
        return str(value).strip()

    @staticmethod
    def _seconds(payload: dict[str, Any]) -> int:
        """Return the ``duration`` field as an int, or 0 when absent or malformed."""
        try:
            return int(payload.get("duration") or 0)
        except (TypeError, ValueError):
            # A corrupted state file must not break queue loading.
            return 0

    @classmethod
    def _build(cls, payload: dict[str, Any], song_id: str) -> "Song":
        """Create a song from *payload* using an already resolved identifier."""
        return cls(
            song_id=song_id,
            title=cls._text(payload, "title", "Неизвестный трек"),
            artist=cls._text(payload, "artist", "Неизвестный артист"),
            album=cls._text(payload, "album"),
            duration=cls._seconds(payload),
            path=cls._text(payload, "path"),
            genre=cls._text(payload, "genre"),
        )

    @classmethod
    def from_payload(cls, payload: dict[str, Any]) -> "Song":
        """Build a song from a Navidrome API song object (identifier key ``"id"``)."""
        return cls._build(payload, cls._text(payload, "id"))

    @classmethod
    def from_state(cls, payload: dict[str, Any]) -> "Song":
        """Build a song from persisted state, preferring ``"song_id"`` over ``"id"``."""
        key = "song_id" if payload.get("song_id") is not None else "id"
        return cls._build(payload, cls._text(payload, key))

    def to_state(self) -> dict[str, Any]:
        """Return a JSON-serialisable dictionary with every field of the song."""
        return asdict(self)
|
||
|
||
|
||
class SpotifyProvider:
    """Spotify provider kept as fallback when Navidrome is unavailable.

    Every public command returns a user-facing status string; errors raised by
    the Spotify client are folded into that string instead of propagating.
    """

    SCOPES = [
        "user-read-playback-state",
        "user-modify-playback-state",
        "user-read-currently-playing",
        "user-top-read",
        "streaming",
    ]

    def __init__(self) -> None:
        """Create an unconnected provider; the client is built lazily by :meth:`initialize`."""
        self.sp = None
        self._initialized = False
        self._init_error: Optional[str] = None

    def initialize(self) -> bool:
        """Connect to Spotify using credentials from the environment.

        Returns:
            ``True`` when the client is ready, ``False`` otherwise (the reason
            is stored in ``self._init_error`` and printed).
        """
        if self._initialized:
            return True

        if spotipy is None:
            self._init_error = (
                "Библиотека spotipy не установлена. Установите: pip install spotipy"
            )
            print(f"⚠️ Spotify: {self._init_error}")
            return False

        client_id = os.getenv("SPOTIFY_CLIENT_ID")
        client_secret = os.getenv("SPOTIFY_CLIENT_SECRET")
        redirect_uri = os.getenv("SPOTIFY_REDIRECT_URI", "http://localhost:8888/callback")

        if not client_id or not client_secret:
            self._init_error = "Не заданы SPOTIFY_CLIENT_ID и SPOTIFY_CLIENT_SECRET в .env"
            print(f"⚠️ Spotify: {self._init_error}")
            return False

        try:
            auth_manager = SpotifyOAuth(
                client_id=client_id,
                client_secret=client_secret,
                redirect_uri=redirect_uri,
                scope=" ".join(self.SCOPES),
                cache_path=".spotify_cache",
            )
            self.sp = spotipy.Spotify(auth_manager=auth_manager)
            # Cheap authenticated call that proves the credentials work.
            self.sp.current_user()
        except Exception as exc:
            self._init_error = str(exc)
            print(f"❌ Spotify: ошибка инициализации - {exc}")
            return False

        self._initialized = True
        print("✅ Spotify: подключено")
        return True

    def _ensure_initialized(self) -> bool:
        """Return ``True`` when the client is (or could now be) initialised."""
        return self._initialized or self.initialize()

    @staticmethod
    def _artist_names(track: dict[str, Any], first_only: bool = False) -> str:
        """Return the artist names of *track* joined by ``", "``.

        Args:
            track: Spotify track object; a missing or empty ``artists`` list is tolerated.
            first_only: Return only the first artist name.

        Returns:
            The joined names, or an empty string when the track has no named artist.
        """
        names = [
            artist["name"]
            for artist in (track.get("artists") or [])
            if artist.get("name")
        ]
        if first_only:
            names = names[:1]
        return ", ".join(names)

    def _get_active_device(self) -> Optional[str]:
        """Return the id of the active device, else of the first known device, else ``None``."""
        try:
            devices = (self.sp.devices() or {}).get("devices") or []
        except Exception:
            return None

        for device in devices:
            if device.get("is_active"):
                return device.get("id")
        return devices[0].get("id") if devices else None

    def play_music(self, query: Optional[str] = None) -> str:
        """Start playback.

        With *query* the best matching track is played; without it the user's
        top tracks are queued, falling back to resuming whatever Spotify has.
        """
        if not self._ensure_initialized():
            return f"Не удалось подключиться к Spotify. {self._init_error or ''}".strip()

        try:
            device_id = self._get_active_device()

            if query:
                results = self.sp.search(q=query, type="track", limit=1)
                tracks = results.get("tracks", {}).get("items", [])
                if not tracks:
                    return f"Не нашёл песню '{query}'"
                track = tracks[0]
                self.sp.start_playback(device_id=device_id, uris=[track["uri"]])
                # Playback has already started: a track without artists must
                # not turn the confirmation into an error message.
                artist = self._artist_names(track, first_only=True)
                if artist:
                    return f"В Spotify включаю {track['name']} от {artist}"
                return f"В Spotify включаю {track['name']}"

            top_tracks = self.sp.current_user_top_tracks(limit=20, time_range="medium_term")
            if top_tracks and top_tracks.get("items"):
                uris = [track["uri"] for track in top_tracks["items"]]
                self.sp.start_playback(device_id=device_id, uris=uris)
                first_track = top_tracks["items"][0]
                return f"В Spotify включаю музыку. Сейчас {first_track['name']}"

            self.sp.start_playback(device_id=device_id)
            return "В Spotify включаю музыку"
        except Exception as exc:
            return f"Spotify: {exc}"

    def pause_music(self) -> str:
        """Pause playback unconditionally."""
        if not self._ensure_initialized():
            return "Spotify не подключён"
        try:
            self.sp.pause_playback()
            return "Spotify: музыка на паузе"
        except Exception as exc:
            return f"Spotify: {exc}"

    def pause_toggle(self) -> str:
        """Toggle pause for repeated 'pause' voice command."""
        if not self._ensure_initialized():
            return "Spotify не подключён"
        try:
            current = self.sp.current_playback()
            if current and current.get("is_playing"):
                self.sp.pause_playback()
                return "Spotify: музыка на паузе"
            if current and current.get("item"):
                # Something is loaded but not playing: the second "pause" resumes it.
                self.sp.start_playback()
                return "Spotify: продолжаю воспроизведение"
            self.sp.pause_playback()
            return "Spotify: музыка на паузе"
        except Exception as exc:
            return f"Spotify: {exc}"

    def resume_music(self) -> str:
        """Resume playback of the current Spotify context."""
        if not self._ensure_initialized():
            return "Spotify не подключён"
        try:
            self.sp.start_playback()
            return "Spotify: продолжаю воспроизведение"
        except Exception as exc:
            return f"Spotify: {exc}"

    def next_track(self) -> str:
        """Skip to the next track and report what is playing."""
        if not self._ensure_initialized():
            return "Spotify не подключён"
        try:
            self.sp.next_track()
            # Short pause before querying the new track.
            time.sleep(0.5)
            return self.get_current_track() or "Spotify: следующий трек"
        except Exception as exc:
            return f"Spotify: {exc}"

    def previous_track(self) -> str:
        """Go back to the previous track and report what is playing."""
        if not self._ensure_initialized():
            return "Spotify не подключён"
        try:
            self.sp.previous_track()
            # Short pause before querying the new track.
            time.sleep(0.5)
            return self.get_current_track() or "Spotify: предыдущий трек"
        except Exception as exc:
            return f"Spotify: {exc}"

    def get_current_track(self) -> str:
        """Describe the current track and whether it is playing or paused."""
        if not self._ensure_initialized():
            return "Spotify не подключён"
        try:
            current = self.sp.current_playback()
            if current and current.get("item"):
                track = current["item"]
                status = "Сейчас играет" if current.get("is_playing") else "На паузе"
                title = track.get("name", "Трек")
                artists = self._artist_names(track)
                if artists:
                    return f"Spotify. {status}: {title} от {artists}"
                return f"Spotify. {status}: {title}"
            return "Spotify: сейчас ничего не играет"
        except Exception as exc:
            return f"Spotify: {exc}"
|
||
|
||
|
||
class NavidromeProvider:
|
||
"""Primary provider using Navidrome + MPV IPC."""
|
||
|
||
def __init__(self) -> None:
|
||
self.base_url = os.getenv("NAVIDROME_URL", "").strip().rstrip("/")
|
||
self.username = os.getenv("NAVIDROME_USERNAME", "").strip()
|
||
self.password = os.getenv("NAVIDROME_PASSWORD", "")
|
||
|
||
self._initialized = False
|
||
self._init_error = ""
|
||
self._state_lock = threading.Lock()
|
||
self._mpv_process: Optional[subprocess.Popen] = None
|
||
self._mpv_socket = str(MPV_SOCKET_FILE)
|
||
|
||
self._genres_cache: tuple[float, list[str]] = (0.0, [])
|
||
self._folder_index: dict[str, dict[str, Any]] = {}
|
||
self._folder_index_built_at = 0.0
|
||
self._autonext_lock = threading.Lock()
|
||
self._autonext_suppress_until = 0.0
|
||
|
||
self._snapshot_stop = threading.Event()
|
||
self._snapshot_thread = threading.Thread(
|
||
target=self._snapshot_loop,
|
||
name="music-mpv-snapshot",
|
||
daemon=True,
|
||
)
|
||
self._snapshot_thread.start()
|
||
|
||
self._state = self._load_state()
|
||
|
||
def initialize(self) -> bool:
|
||
if self._initialized:
|
||
return True
|
||
|
||
if not self.base_url or not self.username or not self.password:
|
||
self._init_error = (
|
||
"Не заданы NAVIDROME_URL, NAVIDROME_USERNAME или NAVIDROME_PASSWORD"
|
||
)
|
||
return False
|
||
|
||
if shutil.which("mpv") is None:
|
||
self._init_error = "Не найден mpv. Установите: sudo apt install mpv"
|
||
return False
|
||
|
||
try:
|
||
self._request_json("ping.view")
|
||
except Exception as exc:
|
||
self._init_error = f"Ошибка подключения к Navidrome: {exc}"
|
||
return False
|
||
|
||
self._initialized = True
|
||
print("✅ Navidrome: подключено")
|
||
return True
|
||
|
||
def _default_state(self) -> dict[str, Any]:
|
||
return {
|
||
"current_song": None,
|
||
"queue": [],
|
||
"queue_index": -1,
|
||
"position_sec": 0.0,
|
||
"paused": False,
|
||
"last_action": "",
|
||
"source": "navidrome",
|
||
"updated_at": "",
|
||
}
|
||
|
||
def _load_state(self) -> dict[str, Any]:
|
||
state = self._default_state()
|
||
if not STATE_FILE.exists():
|
||
return state
|
||
|
||
try:
|
||
with open(STATE_FILE, "r", encoding="utf-8") as file:
|
||
raw = json.load(file)
|
||
except Exception as exc:
|
||
print(f"⚠️ Музыка: не удалось загрузить состояние: {exc}")
|
||
return state
|
||
|
||
if not isinstance(raw, dict):
|
||
return state
|
||
|
||
for key in state:
|
||
if key in raw:
|
||
state[key] = raw[key]
|
||
|
||
state["queue"] = state["queue"] if isinstance(state["queue"], list) else []
|
||
state["queue_index"] = int(state.get("queue_index") or -1)
|
||
try:
|
||
state["position_sec"] = float(state.get("position_sec") or 0.0)
|
||
except Exception:
|
||
state["position_sec"] = 0.0
|
||
state["paused"] = bool(state.get("paused"))
|
||
return state
|
||
|
||
def _save_state(self) -> None:
|
||
STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
|
||
with self._state_lock:
|
||
payload = dict(self._state)
|
||
try:
|
||
with open(STATE_FILE, "w", encoding="utf-8") as file:
|
||
json.dump(payload, file, ensure_ascii=False, indent=2)
|
||
except Exception as exc:
|
||
print(f"⚠️ Музыка: не удалось сохранить состояние: {exc}")
|
||
|
||
def _set_state(self, **updates: Any) -> None:
|
||
with self._state_lock:
|
||
self._state.update(updates)
|
||
self._state["updated_at"] = time.strftime("%Y-%m-%dT%H:%M:%S")
|
||
self._save_state()
|
||
|
||
def _get_state_value(self, key: str, default: Any = None) -> Any:
|
||
with self._state_lock:
|
||
return self._state.get(key, default)
|
||
|
||
def _current_song(self) -> Optional[Song]:
|
||
payload = self._get_state_value("current_song")
|
||
if not isinstance(payload, dict):
|
||
return None
|
||
song = Song.from_state(payload)
|
||
if not song.song_id:
|
||
return None
|
||
return song
|
||
|
||
def _queue(self) -> list[Song]:
|
||
queue_raw = self._get_state_value("queue", [])
|
||
if not isinstance(queue_raw, list):
|
||
return []
|
||
|
||
songs: list[Song] = []
|
||
for item in queue_raw:
|
||
if isinstance(item, dict):
|
||
song = Song.from_state(item)
|
||
if song.song_id:
|
||
songs.append(song)
|
||
return songs
|
||
|
||
def _set_queue(self, queue: list[Song], index: int) -> None:
|
||
safe_index = max(0, min(index, len(queue) - 1)) if queue else -1
|
||
current_song = queue[safe_index].to_state() if queue else None
|
||
self._set_state(
|
||
queue=[song.to_state() for song in queue],
|
||
queue_index=safe_index,
|
||
current_song=current_song,
|
||
position_sec=0.0,
|
||
paused=False,
|
||
source="navidrome",
|
||
)
|
||
|
||
def _auth_params(self, include_format: bool = True) -> dict[str, str]:
|
||
salt = os.urandom(6).hex()
|
||
token = hashlib.md5(f"{self.password}{salt}".encode("utf-8")).hexdigest()
|
||
params = {
|
||
"u": self.username,
|
||
"t": token,
|
||
"s": salt,
|
||
"v": "1.16.1",
|
||
"c": "alexander-smart-speaker",
|
||
}
|
||
if include_format:
|
||
params["f"] = "json"
|
||
return params
|
||
|
||
def _request_json(
|
||
self,
|
||
endpoint: str,
|
||
params: Optional[dict[str, Any]] = None,
|
||
timeout: int = 20,
|
||
) -> dict[str, Any]:
|
||
if not self.base_url:
|
||
raise NavidromeUnavailableError("NAVIDROME_URL не настроен")
|
||
|
||
url = f"{self.base_url}/rest/{endpoint}"
|
||
full_params = self._auth_params(include_format=True)
|
||
if params:
|
||
full_params.update(params)
|
||
|
||
try:
|
||
response = requests.get(url, params=full_params, timeout=timeout)
|
||
response.raise_for_status()
|
||
payload = response.json()
|
||
except Exception as exc:
|
||
raise NavidromeUnavailableError(f"Navidrome API недоступен: {exc}") from exc
|
||
|
||
subsonic = payload.get("subsonic-response", {})
|
||
if subsonic.get("status") != "ok":
|
||
error = subsonic.get("error", {})
|
||
message = error.get("message") or "Неизвестная ошибка Navidrome"
|
||
raise NavidromeUnavailableError(message)
|
||
return subsonic
|
||
|
||
def _stream_url(self, song_id: str) -> str:
|
||
params = self._auth_params(include_format=False)
|
||
params["id"] = song_id
|
||
query = urlencode(params)
|
||
return f"{self.base_url}/rest/stream.view?{query}"
|
||
|
||
def _ensure_initialized(self) -> None:
|
||
if not self.initialize():
|
||
raise NavidromeUnavailableError(self._init_error or "Navidrome недоступен")
|
||
|
||
def _ensure_list(self, value: Any) -> list[Any]:
|
||
if value is None:
|
||
return []
|
||
if isinstance(value, list):
|
||
return value
|
||
return [value]
|
||
|
||
def _normalize(self, text: str) -> str:
|
||
text = text.lower().replace("ё", "е").strip()
|
||
text = re.sub(r"[^a-zа-я0-9/\\\s_-]+", " ", text)
|
||
text = re.sub(r"\s+", " ", text).strip()
|
||
return text
|
||
|
||
def _fuzzy_candidates(self, query: str, values: list[str]) -> list[str]:
|
||
normalized_query = self._normalize(query)
|
||
if not normalized_query:
|
||
return []
|
||
|
||
scored: list[tuple[float, str]] = []
|
||
for value in values:
|
||
normalized_value = self._normalize(value)
|
||
if not normalized_value:
|
||
continue
|
||
|
||
ratio = difflib.SequenceMatcher(
|
||
None,
|
||
normalized_query,
|
||
normalized_value,
|
||
).ratio()
|
||
score = ratio
|
||
if normalized_query in normalized_value:
|
||
score += 0.4
|
||
if normalized_value in normalized_query:
|
||
score += 0.2
|
||
scored.append((score, value))
|
||
|
||
if not scored:
|
||
return []
|
||
|
||
scored.sort(key=lambda item: item[0], reverse=True)
|
||
best_score = scored[0][0]
|
||
threshold = max(0.45, best_score - 0.08)
|
||
|
||
picked = [value for score, value in scored if score >= threshold]
|
||
if not picked and scored[0][0] >= 0.35:
|
||
return [scored[0][1]]
|
||
return picked
|
||
|
||
def _get_random_queue(self, size: int = 20) -> list[Song]:
|
||
response = self._request_json("getRandomSongs.view", {"size": size})
|
||
songs = self._ensure_list(response.get("randomSongs", {}).get("song"))
|
||
queue = [Song.from_payload(song) for song in songs if song.get("id")]
|
||
if not queue:
|
||
raise NavidromeCommandError("В Navidrome не найдено треков для воспроизведения")
|
||
random.shuffle(queue)
|
||
return queue
|
||
|
||
def _fetch_genres(self) -> list[str]:
|
||
cache_timestamp, genres = self._genres_cache
|
||
if genres and (time.time() - cache_timestamp < 600):
|
||
return genres
|
||
|
||
response = self._request_json("getGenres.view")
|
||
genre_items = self._ensure_list(response.get("genres", {}).get("genre"))
|
||
genre_values = [str(item.get("value", "")).strip() for item in genre_items]
|
||
genre_values = [item for item in genre_values if item]
|
||
self._genres_cache = (time.time(), genre_values)
|
||
return genre_values
|
||
|
||
def _build_folder_index(self) -> None:
|
||
if self._folder_index and (time.time() - self._folder_index_built_at < 900):
|
||
return
|
||
|
||
albums: list[dict[str, Any]] = []
|
||
offset = 0
|
||
page_size = 200
|
||
|
||
while True:
|
||
response = self._request_json(
|
||
"getAlbumList2.view",
|
||
{
|
||
"type": "alphabeticalByName",
|
||
"size": page_size,
|
||
"offset": offset,
|
||
},
|
||
)
|
||
page = self._ensure_list(response.get("albumList2", {}).get("album"))
|
||
if not page:
|
||
break
|
||
albums.extend(page)
|
||
if len(page) < page_size:
|
||
break
|
||
offset += page_size
|
||
|
||
if not albums:
|
||
self._folder_index = {}
|
||
self._folder_index_built_at = time.time()
|
||
return
|
||
|
||
index: dict[str, dict[str, Any]] = {}
|
||
|
||
def add_key(raw_key: str, song: Song) -> None:
|
||
normalized = self._normalize(raw_key)
|
||
if not normalized:
|
||
return
|
||
item = index.setdefault(normalized, {"name": raw_key.strip(), "songs": {}})
|
||
item["songs"][song.song_id] = song
|
||
|
||
for album in albums:
|
||
album_id = str(album.get("id", "")).strip()
|
||
if not album_id:
|
||
continue
|
||
|
||
album_payload = self._request_json("getAlbum.view", {"id": album_id})
|
||
song_items = self._ensure_list(album_payload.get("album", {}).get("song"))
|
||
|
||
for raw_song in song_items:
|
||
if not raw_song.get("id"):
|
||
continue
|
||
song = Song.from_payload(raw_song)
|
||
folder_path = song.path.replace("\\", "/")
|
||
segments = [segment for segment in folder_path.split("/") if segment]
|
||
if len(segments) <= 1:
|
||
continue
|
||
|
||
folders = segments[:-1]
|
||
for idx, segment in enumerate(folders):
|
||
add_key(segment, song)
|
||
add_key("/".join(folders[: idx + 1]), song)
|
||
|
||
self._folder_index = index
|
||
self._folder_index_built_at = time.time()
|
||
|
||
def _songs_by_genre(self, query: str) -> tuple[list[str], list[Song]]:
|
||
genres = self._fetch_genres()
|
||
matched = self._fuzzy_candidates(query, genres)
|
||
if not matched:
|
||
raise NavidromeCommandError(f"Жанр '{query}' не найден в Navidrome")
|
||
|
||
songs_map: dict[str, Song] = {}
|
||
for genre in matched:
|
||
response = self._request_json(
|
||
"getSongsByGenre.view",
|
||
{"genre": genre, "count": 300},
|
||
)
|
||
songs = self._ensure_list(response.get("songsByGenre", {}).get("song"))
|
||
for raw_song in songs:
|
||
if raw_song.get("id"):
|
||
song = Song.from_payload(raw_song)
|
||
songs_map[song.song_id] = song
|
||
|
||
if not songs_map:
|
||
raise NavidromeCommandError(f"В жанре '{matched[0]}' нет доступных треков")
|
||
|
||
songs_list = list(songs_map.values())
|
||
random.shuffle(songs_list)
|
||
return matched, songs_list
|
||
|
||
def _songs_by_folder(self, query: str) -> tuple[list[str], list[Song]]:
|
||
self._build_folder_index()
|
||
if not self._folder_index:
|
||
raise NavidromeCommandError("В Navidrome не удалось построить индекс папок")
|
||
|
||
matched = self._fuzzy_candidates(query, list(self._folder_index.keys()))
|
||
if not matched:
|
||
raise NavidromeCommandError(f"Папка '{query}' не найдена в библиотеке")
|
||
|
||
songs_map: dict[str, Song] = {}
|
||
matched_labels: list[str] = []
|
||
|
||
for key in matched:
|
||
entry = self._folder_index.get(key)
|
||
if not entry:
|
||
continue
|
||
matched_labels.append(entry.get("name", key))
|
||
for song_id, song in entry.get("songs", {}).items():
|
||
songs_map[song_id] = song
|
||
|
||
if not songs_map:
|
||
raise NavidromeCommandError(f"В папке '{query}' нет доступных треков")
|
||
|
||
songs_list = list(songs_map.values())
|
||
random.shuffle(songs_list)
|
||
return matched_labels, songs_list
|
||
|
||
def _search_songs(self, query: str, count: int = 50) -> list[Song]:
|
||
response = self._request_json(
|
||
"search3.view",
|
||
{
|
||
"query": query,
|
||
"songCount": count,
|
||
"artistCount": 0,
|
||
"albumCount": 0,
|
||
},
|
||
)
|
||
songs = self._ensure_list(response.get("searchResult3", {}).get("song"))
|
||
result = [Song.from_payload(song) for song in songs if song.get("id")]
|
||
random.shuffle(result)
|
||
return result
|
||
|
||
def _remove_stale_socket(self) -> None:
|
||
socket_path = Path(self._mpv_socket)
|
||
if socket_path.exists():
|
||
try:
|
||
socket_path.unlink()
|
||
except Exception:
|
||
pass
|
||
|
||
def _is_mpv_alive(self) -> bool:
|
||
if self._mpv_process is None:
|
||
return False
|
||
|
||
exit_code = self._mpv_process.poll()
|
||
if exit_code is None:
|
||
return True
|
||
|
||
self._mpv_process = None
|
||
self._remove_stale_socket()
|
||
return False
|
||
|
||
def _suppress_autonext(self, seconds: float = 2.0) -> None:
|
||
self._autonext_suppress_until = max(self._autonext_suppress_until, time.time() + seconds)
|
||
|
||
def _autonext_allowed(self) -> bool:
|
||
return time.time() >= self._autonext_suppress_until
|
||
|
||
def _auto_advance_if_track_finished(self) -> None:
|
||
if self._is_mpv_alive():
|
||
return
|
||
if not self._autonext_allowed():
|
||
return
|
||
if bool(self._get_state_value("paused", False)):
|
||
return
|
||
|
||
if not self._autonext_lock.acquire(blocking=False):
|
||
return
|
||
|
||
try:
|
||
if self._is_mpv_alive():
|
||
return
|
||
if not self._autonext_allowed():
|
||
return
|
||
if bool(self._get_state_value("paused", False)):
|
||
return
|
||
|
||
queue = self._queue()
|
||
if not queue:
|
||
if self._current_song() is not None:
|
||
self._set_state(current_song=None, position_sec=0.0, paused=False)
|
||
return
|
||
|
||
current_index = int(self._get_state_value("queue_index", -1) or -1)
|
||
next_index = (current_index + 1) % len(queue) if current_index >= 0 else 0
|
||
song = self._play_queue_index(queue, next_index, start_sec=0.0)
|
||
self._set_state(last_action="autonext")
|
||
print(f"⏭️ Автопереход: {song.title} от {song.artist}")
|
||
except Exception as exc:
|
||
self._suppress_autonext(seconds=5.0)
|
||
print(f"⚠️ Автопереход на следующий трек не удался: {exc}")
|
||
finally:
|
||
self._autonext_lock.release()
|
||
|
||
def _mpv_wait_for_ipc(self, timeout: float = 3.0) -> bool:
|
||
deadline = time.time() + timeout
|
||
while time.time() < deadline:
|
||
if os.path.exists(self._mpv_socket):
|
||
return True
|
||
time.sleep(0.05)
|
||
return False
|
||
|
||
def _mpv_ipc(self, command: list[Any]) -> Any:
|
||
if not os.path.exists(self._mpv_socket):
|
||
raise NavidromeUnavailableError("IPC сокет mpv недоступен")
|
||
|
||
request_payload = json.dumps({"command": command}, ensure_ascii=False) + "\n"
|
||
|
||
try:
|
||
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client:
|
||
client.settimeout(1.5)
|
||
client.connect(self._mpv_socket)
|
||
client.sendall(request_payload.encode("utf-8"))
|
||
response = client.recv(8192).decode("utf-8", errors="ignore").strip()
|
||
except Exception as exc:
|
||
raise NavidromeUnavailableError(f"Ошибка IPC mpv: {exc}") from exc
|
||
|
||
if not response:
|
||
return None
|
||
|
||
try:
|
||
payload = json.loads(response)
|
||
except json.JSONDecodeError:
|
||
return None
|
||
|
||
error_value = payload.get("error")
|
||
if error_value not in (None, "success", "property unavailable"):
|
||
raise NavidromeUnavailableError(f"Ошибка mpv: {error_value}")
|
||
return payload.get("data")
|
||
|
||
def _sync_position_snapshot(self) -> None:
|
||
if not self._is_mpv_alive():
|
||
return
|
||
|
||
try:
|
||
paused = bool(self._mpv_ipc(["get_property", "pause"]))
|
||
except Exception:
|
||
paused = bool(self._get_state_value("paused", False))
|
||
|
||
try:
|
||
position = float(self._mpv_ipc(["get_property", "time-pos"]) or 0.0)
|
||
except Exception:
|
||
position = float(self._get_state_value("position_sec", 0.0) or 0.0)
|
||
|
||
self._set_state(paused=paused, position_sec=max(0.0, position))
|
||
|
||
def _snapshot_loop(self) -> None:
|
||
while not self._snapshot_stop.wait(1.0):
|
||
try:
|
||
self._sync_position_snapshot()
|
||
self._auto_advance_if_track_finished()
|
||
except Exception:
|
||
continue
|
||
|
||
def _stop_mpv(self) -> None:
|
||
self._suppress_autonext(seconds=2.0)
|
||
if self._is_mpv_alive():
|
||
try:
|
||
self._mpv_ipc(["quit"])
|
||
except Exception:
|
||
pass
|
||
|
||
if self._mpv_process is not None:
|
||
try:
|
||
self._mpv_process.terminate()
|
||
self._mpv_process.wait(timeout=2)
|
||
except Exception:
|
||
try:
|
||
self._mpv_process.kill()
|
||
except Exception:
|
||
pass
|
||
finally:
|
||
self._mpv_process = None
|
||
|
||
self._remove_stale_socket()
|
||
|
||
def _start_song(self, song: Song, start_sec: float = 0.0) -> None:
|
||
self._ensure_initialized()
|
||
self._suppress_autonext(seconds=2.0)
|
||
self._stop_mpv()
|
||
self._remove_stale_socket()
|
||
|
||
stream_url = self._stream_url(song.song_id)
|
||
command = [
|
||
"mpv",
|
||
"--no-video",
|
||
"--quiet",
|
||
"--force-window=no",
|
||
f"--input-ipc-server={self._mpv_socket}",
|
||
]
|
||
if start_sec > 0:
|
||
command.append(f"--start={start_sec:.3f}")
|
||
command.append(stream_url)
|
||
|
||
self._mpv_process = subprocess.Popen(
|
||
command,
|
||
stdout=subprocess.DEVNULL,
|
||
stderr=subprocess.DEVNULL,
|
||
)
|
||
|
||
if not self._mpv_wait_for_ipc():
|
||
raise NavidromeUnavailableError("Не удалось запустить mpv IPC")
|
||
|
||
self._set_state(
|
||
current_song=song.to_state(),
|
||
position_sec=max(0.0, float(start_sec)),
|
||
paused=False,
|
||
source="navidrome",
|
||
)
|
||
|
||
def _play_queue_index(self, queue: list[Song], index: int, start_sec: float = 0.0) -> Song:
|
||
if not queue:
|
||
raise NavidromeCommandError("Очередь воспроизведения пуста")
|
||
|
||
safe_index = max(0, min(index, len(queue) - 1))
|
||
song = queue[safe_index]
|
||
self._start_song(song, start_sec=start_sec)
|
||
self._set_state(
|
||
queue=[item.to_state() for item in queue],
|
||
queue_index=safe_index,
|
||
current_song=song.to_state(),
|
||
position_sec=max(0.0, float(start_sec)),
|
||
paused=False,
|
||
source="navidrome",
|
||
)
|
||
return song
|
||
|
||
def can_resume_context(self) -> bool:
|
||
if self._is_mpv_alive():
|
||
try:
|
||
paused = bool(self._mpv_ipc(["get_property", "pause"]))
|
||
if paused:
|
||
return True
|
||
except Exception:
|
||
return False
|
||
|
||
current_song = self._current_song()
|
||
if not current_song:
|
||
return False
|
||
|
||
return (
|
||
bool(self._get_state_value("paused", False))
|
||
and str(self._get_state_value("last_action", "")).lower() == "pause"
|
||
)
|
||
|
||
def play_random(self, contextual_resume: bool = False) -> str:
|
||
self._ensure_initialized()
|
||
|
||
if contextual_resume and self.can_resume_context():
|
||
return self.resume()
|
||
|
||
queue = self._get_random_queue(size=20)
|
||
song = self._play_queue_index(queue, 0, start_sec=0.0)
|
||
self._set_state(last_action="play_random")
|
||
return f"Включаю случайный трек: {song.title} от {song.artist}"
|
||
|
||
def play_query(self, query: str) -> str:
|
||
self._ensure_initialized()
|
||
songs = self._search_songs(query, count=50)
|
||
if not songs:
|
||
raise NavidromeCommandError(f"Не нашёл трек по запросу '{query}' в Navidrome")
|
||
|
||
index = random.randrange(len(songs))
|
||
song = self._play_queue_index(songs, index, start_sec=0.0)
|
||
self._set_state(last_action="play_query")
|
||
return f"Включаю {song.title} от {song.artist}"
|
||
|
||
def play_genre(self, genre_query: str) -> str:
|
||
self._ensure_initialized()
|
||
matched_genres, songs = self._songs_by_genre(genre_query)
|
||
song = self._play_queue_index(songs, 0, start_sec=0.0)
|
||
self._set_state(last_action="play_genre")
|
||
genre_label = matched_genres[0]
|
||
return f"Включаю жанр {genre_label}. Сейчас {song.title} от {song.artist}"
|
||
|
||
def play_folder(self, folder_query: str) -> str:
|
||
self._ensure_initialized()
|
||
matched_folders, songs = self._songs_by_folder(folder_query)
|
||
song = self._play_queue_index(songs, 0, start_sec=0.0)
|
||
self._set_state(last_action="play_folder")
|
||
folder_label = matched_folders[0] if matched_folders else folder_query
|
||
return f"Включаю папку {folder_label}. Сейчас {song.title} от {song.artist}"
|
||
|
||
def _pause_impl(self, *, toggle: bool) -> str:
|
||
self._ensure_initialized()
|
||
|
||
if self._is_mpv_alive():
|
||
try:
|
||
paused_now = bool(self._mpv_ipc(["get_property", "pause"]))
|
||
except Exception:
|
||
paused_now = bool(self._get_state_value("paused", False))
|
||
|
||
if paused_now:
|
||
if toggle:
|
||
self._mpv_ipc(["set_property", "pause", False])
|
||
self._set_state(paused=False, last_action="resume")
|
||
song = self._current_song()
|
||
if song:
|
||
return f"Продолжаю: {song.title} от {song.artist}"
|
||
return "Продолжаю воспроизведение"
|
||
self._sync_position_snapshot()
|
||
self._set_state(paused=True, last_action="pause")
|
||
return "Уже на паузе."
|
||
|
||
self._mpv_ipc(["set_property", "pause", True])
|
||
time.sleep(0.1)
|
||
position = float(self._mpv_ipc(["get_property", "time-pos"]) or 0.0)
|
||
self._set_state(paused=True, position_sec=max(0.0, position), last_action="pause")
|
||
return "Пауза. Продолжу с того же места."
|
||
|
||
if self._current_song():
|
||
if bool(self._get_state_value("paused", False)):
|
||
if toggle:
|
||
return self.resume()
|
||
return "Уже на паузе."
|
||
self._set_state(paused=True, last_action="pause")
|
||
return "Пауза сохранена. Продолжу с прошлого места."
|
||
|
||
raise NavidromeCommandError("Сейчас ничего не играет")
|
||
|
||
def pause(self) -> str:
|
||
"""Toggle pause: second 'pause' resumes from stored position."""
|
||
return self._pause_impl(toggle=True)
|
||
|
||
def pause_strict(self) -> str:
|
||
"""Strict pause without auto-resume toggle."""
|
||
return self._pause_impl(toggle=False)
|
||
|
||
def resume(self) -> str:
|
||
self._ensure_initialized()
|
||
|
||
if self._is_mpv_alive():
|
||
paused = bool(self._mpv_ipc(["get_property", "pause"]))
|
||
if paused:
|
||
self._mpv_ipc(["set_property", "pause", False])
|
||
self._set_state(paused=False, last_action="resume")
|
||
song = self._current_song()
|
||
if song:
|
||
return f"Продолжаю: {song.title} от {song.artist}"
|
||
return "Продолжаю воспроизведение"
|
||
return "Музыка уже играет"
|
||
|
||
song = self._current_song()
|
||
if not song:
|
||
raise NavidromeCommandError("Нет сохранённого трека для продолжения")
|
||
|
||
start_sec = float(self._get_state_value("position_sec", 0.0) or 0.0)
|
||
self._start_song(song, start_sec=start_sec)
|
||
self._set_state(paused=False, last_action="resume")
|
||
return f"Продолжаю: {song.title} от {song.artist}"
|
||
|
||
def next_track(self) -> str:
|
||
self._ensure_initialized()
|
||
queue = self._queue()
|
||
|
||
if not queue:
|
||
queue = self._get_random_queue(size=20)
|
||
next_index = 0
|
||
else:
|
||
current_index = int(self._get_state_value("queue_index", 0) or 0)
|
||
next_index = (current_index + 1) % len(queue)
|
||
|
||
song = self._play_queue_index(queue, next_index, start_sec=0.0)
|
||
self._set_state(last_action="next")
|
||
return f"Следующий трек: {song.title} от {song.artist}"
|
||
|
||
def previous_track(self) -> str:
|
||
self._ensure_initialized()
|
||
queue = self._queue()
|
||
|
||
if not queue:
|
||
queue = self._get_random_queue(size=20)
|
||
previous_index = 0
|
||
else:
|
||
current_index = int(self._get_state_value("queue_index", 0) or 0)
|
||
previous_index = (current_index - 1) % len(queue)
|
||
|
||
song = self._play_queue_index(queue, previous_index, start_sec=0.0)
|
||
self._set_state(last_action="previous")
|
||
return f"Предыдущий трек: {song.title} от {song.artist}"
|
||
|
||
def current_track(self) -> str:
    """Describe the current song, its pause status and playback position.

    If mpv is alive the position snapshot is refreshed first. The status and
    the position (formatted as ``M:SS``) are then read from state.
    """
    if self._is_mpv_alive():
        self._sync_position_snapshot()

    track = self._current_song()
    if track is None:
        return "Сейчас ничего не играет"

    is_paused = bool(self._get_state_value("paused", False))
    raw_position = self._get_state_value("position_sec", 0.0) or 0.0
    whole_seconds = int(float(raw_position))
    minutes, seconds = divmod(whole_seconds, 60)

    if is_paused:
        status = "На паузе"
    else:
        status = "Сейчас играет"
    return f"{status}: {track.title} от {track.artist}. Позиция {minutes}:{seconds:02d}"
|
||
|
||
|
||
class MusicController:
    """Voice command router for music providers.

    Every command is attempted on Navidrome first. Spotify is used only when
    the Navidrome call raises ``NavidromeUnavailableError``; a
    ``NavidromeCommandError`` is returned to the caller as plain text.
    """

    # Punctuation/whitespace trimmed from both ends of a free-form query.
    _QUERY_EDGE_CHARS = r"[\"'`«»“”()\[\]{}<>\-–—.,!?;:…/\\\s]+"
    _QUERY_LEADING_RE = re.compile("^" + _QUERY_EDGE_CHARS)
    _QUERY_TRAILING_RE = re.compile(_QUERY_EDGE_CHARS + "$")
    _WHITESPACE_RE = re.compile(r"\s+")
    _TRAILING_PUNCT_RE = re.compile(r"[.!?,;:…]+$")

    # Exact-phrase transport commands (matched against normalized text).
    _PAUSE_RE = re.compile(
        r"^(пауза|поставь на паузу|останови|стоп музыка|выключи музыку|pause)$"
    )
    _RESUME_RE = re.compile(
        r"^(продолжи|продолжай|возобнови|сними с паузы|resume|continue|играй дальше)$"
    )
    _NEXT_RE = re.compile(
        r"^(следующ(ий|ая|ее)?( трек)?|дальше|скип|пропусти|next|skip)$"
    )
    _PREVIOUS_RE = re.compile(
        r"^(предыдущ(ий|ая|ее)?( трек)?|назад|верни( назад)?|previous|back)$"
    )
    _CURRENT_RE = re.compile(
        r"^(что( сейчас)? играет|какая песня|что за (песня|трек|музыка)|what.*(playing|song)|current track)$"
    )

    # Play commands: genre, folder, "just play something", and free-form query.
    _GENRE_RES = (
        re.compile(r"^(?:включи|поставь|играй)\s+(?:жанр|genre)\s+(.+)$"),
        re.compile(r"^включи\s+музыку\s+жанра\s+(.+)$"),
    )
    _FOLDER_RES = (
        re.compile(r"^(?:включи|поставь|играй)\s+(?:из\s+)?папк(?:у|е|и|а)?\s+(.+)$"),
        re.compile(r"^включи\s+музыку\s+из\s+папки\s+(.+)$"),
    )
    _PLAY_ANYTHING_RE = re.compile(
        r"^(?:play music|включи музыку|поставь музыку|играй музыку)[.!?,;:…]*$"
    )
    _PLAY_QUERY_RE = re.compile(r"^(?:включи|поставь|играй|play)\s+(.+)$")
    # Generic nouns removed from a play query ("включи песню X" -> "X").
    _FILLER_WORDS_RE = re.compile(r"\b(музыку|песню|трек|song|track|music)\b")

    def __init__(self) -> None:
        """Create both providers and compile the wake-word prefix matcher."""
        self.navidrome = NavidromeProvider()
        self.spotify = SpotifyProvider()
        self._wakeword_prefix_re = self._build_wakeword_prefix_re(WAKE_WORD_ALIASES)

    @staticmethod
    def _build_wakeword_prefix_re(aliases: Any) -> Optional[re.Pattern[str]]:
        """Compile a regex matching any wake-word alias at the start of a command.

        Args:
            aliases: Iterable of alias strings; empty/blank entries are ignored.

        Returns:
            A case-insensitive pattern that matches one alias followed by
            separators (or end of text), or ``None`` when no usable alias exists.
        """
        normalized = {
            alias.lower().replace("ё", "е").strip()
            for alias in aliases
            if alias and alias.strip()
        }
        if not normalized:
            return None
        # Longest first so that a longer alias wins over an alias that is its prefix.
        ordered = sorted(normalized, key=len, reverse=True)
        alternation = "|".join(re.escape(alias) for alias in ordered)
        return re.compile(
            rf"^(?:{alternation})(?:[\s,.:;!?-]+|$)",
            flags=re.IGNORECASE,
        )

    def _with_fallback(
        self,
        nav_action: Callable[[], str],
        spotify_action: Callable[[], str],
    ) -> str:
        """Run ``nav_action``; fall back to ``spotify_action`` if Navidrome is unavailable.

        A ``NavidromeCommandError`` is a domain-level answer and is returned as
        text without touching Spotify. Exceptions raised by ``spotify_action``
        are not handled here.
        """
        try:
            return nav_action()
        except NavidromeCommandError as exc:
            return str(exc)
        except NavidromeUnavailableError as exc:
            spotify_response = spotify_action()
            return (
                "Navidrome недоступен, переключаюсь на Spotify. "
                f"{spotify_response}"
                f" (Причина: {exc})"
            )

    def pause_for_stop_word(self) -> Optional[str]:
        """
        Pause music for generic stop-words ("стоп", "хватит", etc).
        Returns response text only when something was really paused.
        """
        try:
            return self.navidrome.pause_strict()
        except (NavidromeCommandError, NavidromeUnavailableError):
            # Deliberate best effort: Navidrome had nothing to pause, try Spotify.
            pass

        spotify_response = self.spotify.pause_music()
        spotify_lower = spotify_response.lower()
        # Spotify's reply is treated as a real pause only if it mentions one.
        if "пауз" in spotify_lower or "pause" in spotify_lower:
            return spotify_response
        return None

    def _is_pause_command(self, text: str) -> bool:
        """Return True if normalized *text* is exactly a pause/stop phrase."""
        return bool(self._PAUSE_RE.search(text))

    def _is_resume_command(self, text: str) -> bool:
        """Return True if normalized *text* is exactly a resume phrase."""
        return bool(self._RESUME_RE.search(text))

    def _is_next_command(self, text: str) -> bool:
        """Return True if normalized *text* is exactly a next-track phrase."""
        return bool(self._NEXT_RE.search(text))

    def _is_previous_command(self, text: str) -> bool:
        """Return True if normalized *text* is exactly a previous-track phrase."""
        return bool(self._PREVIOUS_RE.search(text))

    def _is_current_command(self, text: str) -> bool:
        """Return True if normalized *text* asks what is currently playing."""
        return bool(self._CURRENT_RE.search(text))

    def _normalize_command_text(self, text: str) -> str:
        """Lower-case *text*, fold "ё" to "е", and strip wake word and trailing punctuation."""
        normalized = text.lower().replace("ё", "е").strip()
        normalized = self._WHITESPACE_RE.sub(" ", normalized)
        if self._wakeword_prefix_re is not None:
            normalized = self._wakeword_prefix_re.sub("", normalized, count=1).strip()
        # STT often appends trailing punctuation: "включи музыку."
        normalized = self._TRAILING_PUNCT_RE.sub("", normalized).strip()
        return normalized

    def _sanitize_query(self, query: str) -> Optional[str]:
        """Trim surrounding punctuation from *query*.

        Returns:
            The cleaned query, or ``None`` when nothing searchable remains
            (empty string or no letter/digit at all).
        """
        value = self._WHITESPACE_RE.sub(" ", query.strip())
        value = self._QUERY_LEADING_RE.sub("", value)
        value = self._QUERY_TRAILING_RE.sub("", value)
        if not value:
            return None
        # Any Unicode letter or digit counts, so titles/artists written in
        # scripts other than Latin/Cyrillic are not silently discarded.
        if not any(char.isalnum() for char in value):
            return None
        return value

    def _first_sanitized_group(self, patterns: Any, text: str) -> Optional[str]:
        """Return the sanitized first group of the first pattern matching *text*."""
        for pattern in patterns:
            match = pattern.search(text)
            if match:
                return self._sanitize_query(match.group(1))
        return None

    def _extract_genre_query(self, text: str) -> Optional[str]:
        """Return the genre named in a "play genre ..." command, else ``None``."""
        return self._first_sanitized_group(self._GENRE_RES, text)

    def _extract_folder_query(self, text: str) -> Optional[str]:
        """Return the folder named in a "play folder ..." command, else ``None``."""
        return self._first_sanitized_group(self._FOLDER_RES, text)

    def _extract_play_query(self, text: str) -> tuple[bool, Optional[str]]:
        """Detect a generic play command.

        Returns:
            ``(matched, query)``. ``matched`` tells whether *text* is a play
            command at all; ``query`` is the cleaned search text, or ``None``
            when the command does not name anything specific.
        """
        if self._PLAY_ANYTHING_RE.search(text):
            return True, None

        match = self._PLAY_QUERY_RE.search(text)
        if match is None:
            return False, None

        query = self._FILLER_WORDS_RE.sub(" ", match.group(1).strip())
        return True, self._sanitize_query(query)

    def _dispatch_play(self, query: Optional[str]) -> str:
        """Play *query* when given, otherwise start contextual random playback."""
        if query:
            return self._with_fallback(
                lambda: self.navidrome.play_query(query),
                lambda: self.spotify.play_music(query),
            )
        return self._with_fallback(
            lambda: self.navidrome.play_random(contextual_resume=True),
            lambda: self.spotify.play_music(None),
        )

    def _dispatch_genre(self, genre: str) -> str:
        """Play *genre* on Navidrome; on Spotify it is used as a plain search query."""
        return self._with_fallback(
            lambda: self.navidrome.play_genre(genre),
            lambda: self.spotify.play_music(genre),
        )

    def _dispatch_folder(self, folder: str) -> str:
        """Play *folder* on Navidrome; on Spotify it is used as a plain search query."""
        return self._with_fallback(
            lambda: self.navidrome.play_folder(folder),
            lambda: self.spotify.play_music(folder),
        )

    def _dispatch_pause(self) -> str:
        """Run the pause command (Navidrome ``pause`` / Spotify ``pause_toggle``)."""
        return self._with_fallback(self.navidrome.pause, self.spotify.pause_toggle)

    def _dispatch_resume(self) -> str:
        """Run the resume command on the active provider."""
        return self._with_fallback(self.navidrome.resume, self.spotify.resume_music)

    def _dispatch_next(self) -> str:
        """Skip to the next track on the active provider."""
        return self._with_fallback(self.navidrome.next_track, self.spotify.next_track)

    def _dispatch_previous(self) -> str:
        """Go back to the previous track on the active provider."""
        return self._with_fallback(
            self.navidrome.previous_track,
            self.spotify.previous_track,
        )

    def _dispatch_current(self) -> str:
        """Report the current track from the active provider."""
        return self._with_fallback(
            self.navidrome.current_track,
            self.spotify.get_current_track,
        )

    def handle_semantic_action(self, action: str, query: Optional[str] = None) -> Optional[str]:
        """Execute an already-classified music action.

        Args:
            action: Action name (case-insensitive), e.g. ``"play"``,
                ``"play_genre"``, ``"pause"``, ``"next"``, ``"status"``.
            query: Optional free-form argument (song, genre or folder name).

        Returns:
            The response text, or ``None`` when the action is unknown or a
            ``play_query``/``search`` action has no usable query.
        """
        normalized_action = str(action or "").strip().lower()
        normalized_query = self._sanitize_query(str(query)) if query else None

        if normalized_action in {"play", "play_random", "random"}:
            return self._dispatch_play(normalized_query)

        if normalized_action in {"play_query", "search"}:
            if not normalized_query:
                return None
            return self._dispatch_play(normalized_query)

        if normalized_action == "play_genre":
            if not normalized_query:
                return "Уточни жанр, который нужно включить."
            return self._dispatch_genre(normalized_query)

        if normalized_action == "play_folder":
            if not normalized_query:
                return "Уточни папку, из которой нужно включить музыку."
            return self._dispatch_folder(normalized_query)

        if normalized_action in {"pause", "stop"}:
            return self._dispatch_pause()

        if normalized_action in {"resume", "continue"}:
            return self._dispatch_resume()

        if normalized_action in {"next", "skip"}:
            return self._dispatch_next()

        if normalized_action in {"previous", "back"}:
            return self._dispatch_previous()

        if normalized_action in {"current", "status", "what_playing"}:
            return self._dispatch_current()

        return None

    def parse_command(self, text: str) -> Optional[str]:
        """Parse raw recognized speech and execute the music command it contains.

        Matching order matters: genre and folder commands are checked before
        the exact-phrase transport commands, and the generic "play <query>"
        pattern is tried last because it would also match the earlier forms.

        Returns:
            The response text, or ``None`` if *text* is not a music command.
        """
        if not text:
            return None
        text_lower = self._normalize_command_text(text)
        if not text_lower:
            return None

        genre_query = self._extract_genre_query(text_lower)
        if genre_query:
            return self._dispatch_genre(genre_query)

        folder_query = self._extract_folder_query(text_lower)
        if folder_query:
            return self._dispatch_folder(folder_query)

        transport_commands = (
            (self._is_pause_command, self._dispatch_pause),
            (self._is_resume_command, self._dispatch_resume),
            (self._is_next_command, self._dispatch_next),
            (self._is_previous_command, self._dispatch_previous),
            (self._is_current_command, self._dispatch_current),
        )
        for is_match, run in transport_commands:
            if is_match(text_lower):
                return run()

        play_matched, play_query = self._extract_play_query(text_lower)
        if play_matched:
            return self._dispatch_play(play_query)

        return None
|
||
|
||
|
||
# Lazily created process-wide controller; populated by get_music_controller().
_music_controller: Optional[MusicController] = None


def get_music_controller() -> MusicController:
    """Return the shared MusicController, creating it on first use."""
    global _music_controller
    controller = _music_controller
    if controller is None:
        controller = MusicController()
        _music_controller = controller
    return controller
|