from __future__ import annotations
import asyncio
import contextlib
import datetime
import pathlib
import typing
from collections import namedtuple
from typing import TYPE_CHECKING
import asyncpg
import discord
from pylav.constants.config import (
TASK_TIMER_UPDATE_BUNDLED_EXTERNAL_PLAYLISTS_DAYS,
TASK_TIMER_UPDATE_BUNDLED_PLAYLISTS_DAYS,
TASK_TIMER_UPDATE_EXTERNAL_PLAYLISTS_DAYS,
)
from pylav.constants.playlists import (
BUNDLED_DEEZER_PLAYLIST_IDS,
BUNDLED_EXTERNAL_PLAYLISTS,
BUNDLED_PLAYLIST_IDS,
BUNDLED_PYLAV_PLAYLISTS,
BUNDLED_SPOTIFY_PLAYLIST_IDS,
)
from pylav.core.context import PyLavContext
from pylav.exceptions.database import EntryNotFoundException
from pylav.helpers.time import get_now_utc
from pylav.logging import getLogger
from pylav.nodes.api.responses.rest_api import PlaylistResponse
from pylav.players.query.obj import Query
from pylav.players.tracks.obj import Track
from pylav.storage.database.tables.playlists import PlaylistRow
from pylav.storage.models.playlist import Playlist
from pylav.type_hints.bot import DISCORD_BOT_TYPE
from pylav.type_hints.dict_typing import JSON_DICT_TYPE
if TYPE_CHECKING:
from pylav.core.client import Client
LOGGER = getLogger("PyLav.Database.Controller.Playlist")
# Use redbot's Translator for user-facing strings when redbot is importable.
try:
    from redbot.core.i18n import Translator  # type: ignore

    _ = Translator("PyLav", pathlib.Path(__file__))
except ImportError:
    # redbot is not installed: keep the `Translator` name defined and make `_`
    # an identity function so `_("...")` calls in this module still work.
    Translator = None

    def _(string: str) -> str:
        return string
[docs]
class PlaylistController:
__slots__ = ("_client",)
def __init__(self, client: Client) -> None:
self._client = client
@property
def client(self) -> Client:
return self._client
[docs]
@staticmethod
def get_playlist(**kwargs: typing.Any) -> Playlist:
if identifier := kwargs.pop("identifier", kwargs.pop("id", None)):
return Playlist(id=identifier)
else:
raise ValueError("Playlist identifier not provided")
[docs]
async def get_bundled_playlists(self) -> list[Playlist]:
    """Return a ``Playlist`` for every id in ``BUNDLED_PLAYLIST_IDS`` whose ``exists()`` check passes."""
    found: list[Playlist] = []
    for bundled_id in BUNDLED_PLAYLIST_IDS:
        candidate = self.get_playlist(identifier=bundled_id)
        if candidate and await candidate.exists():
            found.append(candidate)
    return found
[docs]
async def get_playlist_by_name(self, playlist_name: str, limit: int = None) -> list[Playlist]:
    """Find playlists whose name contains ``playlist_name`` (``ilike`` substring match).

    Args:
        playlist_name: Text to look for inside playlist names.
        limit: Optional cap on the number of rows fetched; ``None`` means no limit is applied.

    Returns:
        One ``Playlist`` per matching row.

    Raises:
        EntryNotFoundException: If the query returns no rows.
    """
    pattern = f"%{playlist_name.lower()}%"
    query = PlaylistRow.select(PlaylistRow.id).where(PlaylistRow.name.ilike(pattern))
    query = query.output(load_json=True, nested=True)
    if limit is not None:
        query = query.limit(limit)
    rows = await query
    if rows:
        return [self.get_playlist(**row) for row in rows]
    message = _("Playlist with the name {playlist_name_variable_do_not_translate} was not found.").format(
        playlist_name_variable_do_not_translate=playlist_name
    )
    raise EntryNotFoundException(message)
[docs]
async def get_playlist_by_id(self, playlist_id: int | str) -> Playlist:
    """Return the playlist with the given id.

    Args:
        playlist_id: The id, either as an int or as a string that ``int()`` can parse.

    Raises:
        EntryNotFoundException: If a ``ValueError`` is raised while converting or
            querying, or if no row with that id exists.
    """
    try:
        playlist_id = int(playlist_id)
        row_exists = await PlaylistRow.exists().where(PlaylistRow.id == playlist_id)
    except ValueError as e:
        raise EntryNotFoundException(f"Playlist with id {playlist_id} not found") from e
    if not row_exists:
        raise EntryNotFoundException(f"Playlist with id {playlist_id} not found")
    return self.get_playlist(identifier=playlist_id)
[docs]
async def get_playlist_by_name_or_id(self, playlist_name_or_id: int | str, limit: int = None) -> list[Playlist]:
    """Look a playlist up by id first and fall back to a name search.

    Args:
        playlist_name_or_id: A playlist id (int or numeric string) or a name fragment.
        limit: Forwarded to the name search only; ``None`` means no limit.

    Returns:
        A single-element list when the id lookup succeeds, otherwise the
        name-search results.

    Raises:
        EntryNotFoundException: If neither the id lookup nor the name search finds anything.
    """
    try:
        return [await self.get_playlist_by_id(playlist_name_or_id)]
    except EntryNotFoundException:
        # The name search calls ``.lower()`` on its argument, so an int that matched
        # no id must be turned into text before being used as a search term.
        return await self.get_playlist_by_name(str(playlist_name_or_id), limit=limit)
[docs]
async def get_playlists_by_author(self, author: int, return_empty: bool = True) -> list[Playlist]:
    """Return the playlists whose ``author`` column equals ``author``.

    Args:
        author: Author id to filter on.
        return_empty: When true, an empty result is returned as ``[]``;
            when false, an empty result raises instead.

    Raises:
        EntryNotFoundException: If nothing matches and ``return_empty`` is false.
    """
    query = PlaylistRow.select(PlaylistRow.id).where(PlaylistRow.author == author)
    rows = await query.output(load_json=True, nested=True)
    if not rows and not return_empty:
        raise EntryNotFoundException(f"Playlist with author {author} not found")
    return [self.get_playlist(**row) for row in rows]
[docs]
async def get_playlists_by_scope(self, scope: int, return_empty: bool = True) -> list[Playlist]:
    """Return the playlists whose ``scope`` column equals ``scope``.

    Args:
        scope: Scope id to filter on.
        return_empty: When true, an empty result is returned as ``[]``;
            when false, an empty result raises instead.

    Raises:
        EntryNotFoundException: If nothing matches and ``return_empty`` is false.
    """
    query = PlaylistRow.select(PlaylistRow.id).where(PlaylistRow.scope == scope)
    rows = await query.output(load_json=True, nested=True)
    if not rows and not return_empty:
        raise EntryNotFoundException(f"Playlist with scope {scope} not found")
    return [self.get_playlist(**row) for row in rows]
[docs]
async def get_all_playlists(self) -> typing.AsyncIterator[Playlist]:
    """Yield a ``Playlist`` for every row in the playlist table.

    All ids are fetched with a single query before the first item is yielded.
    """
    rows = await PlaylistRow.select(PlaylistRow.id).output(load_json=True, nested=True)
    for row in rows:
        yield self.get_playlist(**row)
[docs]
async def get_external_playlists(self, *ids: int, ignore_ids: list[int] = None) -> typing.AsyncIterator[Playlist]:
    """Yield the playlists that have a URL set, optionally narrowed by id.

    Args:
        *ids: When given, only playlists with one of these ids are considered.
        ignore_ids: Ids to exclude from the result; ``None`` or empty excludes nothing.

    Yields:
        A ``Playlist`` for every matching row.
    """
    condition = PlaylistRow.url.is_not_null()
    if ids:
        # Use ``is_in`` in every branch (one branch previously called ``in_``), and only
        # add the filter when there is at least one id so no empty IN clause is built.
        condition = condition & PlaylistRow.id.is_in(list(ids))
    if ignore_ids:
        condition = condition & PlaylistRow.id.not_in(ignore_ids)
    query = PlaylistRow.select(PlaylistRow.id).output(load_json=True, nested=True).where(condition)
    for entry in await query:
        yield self.get_playlist(**entry)
[docs]
async def create_or_update_playlist(
    self,
    identifier: int,
    scope: int,
    author: int,
    name: str,
    url: str | None = None,
    tracks: list[str | JSON_DICT_TYPE | Track] = None,
) -> Playlist:
    """Write the given fields to the playlist with ``identifier`` via ``bulk_update``.

    Args:
        identifier: Id of the playlist to write.
        scope: Scope id to store.
        author: Author id to store.
        name: Playlist name to store.
        url: Optional source URL to store.
        tracks: Tracks to store; a falsy value is stored as an empty list.

    Returns:
        The ``Playlist`` object that was updated.
    """
    fields = {
        "scope": scope,
        "author": author,
        "name": name,
        "url": url,
        "tracks": tracks if tracks else [],
    }
    target = self.get_playlist(identifier=identifier)
    await target.bulk_update(**fields)
    return target
[docs]
async def delete_playlist(self, playlist_id: int) -> None:
    """Call ``delete()`` on the playlist object for ``playlist_id``."""
    doomed = self.get_playlist(identifier=playlist_id)
    await doomed.delete()
[docs]
async def create_or_update_global_playlist(
    self, identifier: int, author: int, name: str, url: str | None = None, tracks: list[str | Track] = None
) -> Playlist:
    """Create or update a playlist whose scope is the bot user's id.

    All other arguments are forwarded unchanged to ``create_or_update_playlist``.
    """
    bot_user_id = self._client.bot.user.id
    payload = dict(identifier=identifier, scope=bot_user_id, author=author, name=name, url=url, tracks=tracks)
    return await self.create_or_update_playlist(**payload)
[docs]
async def create_or_update_user_playlist(
    self, identifier: int, author: int, name: str, url: str | None = None, tracks: list[str | Track] = None
) -> Playlist:
    """Create or update a playlist whose scope is its author's id.

    All other arguments are forwarded unchanged to ``create_or_update_playlist``.
    """
    payload = dict(identifier=identifier, scope=author, author=author, name=name, url=url, tracks=tracks)
    return await self.create_or_update_playlist(**payload)
[docs]
async def create_or_update_channel_playlist(
    self,
    channel: discord.abc.MessageableChannel,
    author: int,
    name: str,
    url: str | None = None,
    tracks: list[str] = None,
) -> Playlist:
    """Create or update the playlist tied to a channel.

    The channel's id is used as both the playlist identifier and its scope;
    the remaining arguments are forwarded to ``create_or_update_playlist``.
    """
    channel_id = channel.id
    payload = dict(identifier=channel_id, scope=channel_id, author=author, name=name, url=url, tracks=tracks)
    return await self.create_or_update_playlist(**payload)
[docs]
async def create_or_update_guild_playlist(
    self, guild: discord.Guild, author: int, name: str, url: str | None = None, tracks: list[str] = None
) -> Playlist:
    """Create or update the playlist tied to a guild.

    The guild's id is used as both the playlist identifier and its scope;
    the remaining arguments are forwarded to ``create_or_update_playlist``.
    """
    guild_id = guild.id
    payload = dict(identifier=guild_id, scope=guild_id, author=author, name=name, url=url, tracks=tracks)
    return await self.create_or_update_playlist(**payload)
[docs]
async def create_or_update_vc_playlist(
    self,
    vc: discord.channel.VocalGuildChannel,
    author: int,
    name: str,
    url: str | None = None,
    tracks: list[str] = None,
) -> Playlist:
    """Create or update the playlist tied to a voice channel.

    The voice channel's id is used as both the playlist identifier and its scope;
    the remaining arguments are forwarded to ``create_or_update_playlist``.
    """
    vc_id = vc.id
    payload = dict(identifier=vc_id, scope=vc_id, author=author, name=name, url=url, tracks=tracks)
    return await self.create_or_update_playlist(**payload)
[docs]
async def get_all_for_user(
    self,
    requester: int,
    empty: bool = False,
    *,
    vc: discord.channel.VocalGuildChannel = None,
    guild: discord.Guild = None,
    channel: discord.abc.MessageableChannel = None,
) -> tuple[list[Playlist], list[Playlist], list[Playlist], list[Playlist], list[Playlist]]:
    """
    Collect the playlists scoped to the bot, the requester and the given context.

    Args:
        requester: User id whose own scope is queried.
        empty: When true, playlists whose ``size()`` is falsy are left out;
            when false, no size check is made and every playlist is kept.
        vc: Optional voice channel whose scope is queried.
        guild: Optional guild whose scope is queried.
        channel: Optional channel whose scope is queried.

    Returns:
        A 5-tuple of lists in the order: global, user, guild, channel, vc.
        Lists for context objects that were not supplied are empty.
    """

    async def _collect(scope_id: int) -> list[Playlist]:
        # Fetch everything in the scope, then apply the optional size filter.
        candidates = await self.get_playlists_by_scope(scope=scope_id, return_empty=True)
        kept = []
        for playlist in candidates:
            if not empty or await playlist.size():
                kept.append(playlist)
        return kept

    global_playlists = await _collect(self._client.bot.user.id)
    user_playlists = await _collect(requester)
    vc_playlists = await _collect(vc.id) if vc is not None else []
    guild_playlists = await _collect(guild.id) if guild is not None else []
    channel_playlists = await _collect(channel.id) if channel is not None else []
    return global_playlists, user_playlists, guild_playlists, channel_playlists, vc_playlists
[docs]
async def get_manageable_playlists(
    self, requester: discord.abc.User, bot: DISCORD_BOT_TYPE, *, name_or_id: str | None = None
) -> list[Playlist]:
    """Return the playlists for which ``can_manage(requester, bot)`` is truthy.

    Args:
        requester: User whose permissions are checked.
        bot: Bot instance passed through to ``can_manage``.
        name_or_id: When truthy, only playlists matching this name or id are
            considered; otherwise every playlist is considered.

    Returns:
        The manageable playlists, or an empty list when the lookup finds nothing.
    """
    try:
        if name_or_id:
            candidates = await self.get_playlist_by_name_or_id(name_or_id)
        else:
            candidates = [p async for p in self.get_all_playlists()]
    except EntryNotFoundException:
        candidates = []
    return [
        playlist for playlist in candidates if await playlist.can_manage(requester=requester, bot=bot)
    ]
async def _update_bundled_playlists(self, playlist_id, url, source, name, old_time_stamp):
    """Refresh one bundled playlist from its YAML URL.

    On failure (an exception, or a falsy result from ``Playlist.from_yaml``) the
    scheduled next-execution value is set back to ``old_time_stamp``.

    Args:
        playlist_id: Id of the bundled playlist.
        url: Location passed to ``Playlist.from_yaml``.
        source: Source label, used only in log messages.
        name: Playlist name, used only in log messages.
        old_time_stamp: Value to restore as the next execution time on failure.
    """
    bot_user_id = self._client.bot.user.id
    try:
        # ``from_yaml`` is given a lightweight stand-in carrying only ``message`` and ``author``.
        context_stub = namedtuple("PyLavContext", "message author")
        ctx = typing.cast(
            PyLavContext,
            context_stub(message=discord.Object(id=playlist_id), author=discord.Object(id=bot_user_id)),
        )
        playlist = await Playlist.from_yaml(context=ctx, url=url, scope=self._client.bot.user.id)
        LOGGER.info("Updating bundled playlist - %s - %s", playlist_id, f"[{source}] {name}")
    except Exception as exc:
        LOGGER.error(
            "Built-in playlist couldn't be parsed - %s, report this error",
            f"[{source}] {name}",
            exc_info=exc,
        )
        playlist = None
    if playlist:
        return
    # noinspection PyProtectedMember
    await self.client._config.update_next_execution_update_bundled_playlists(old_time_stamp)
[docs]
async def update_bundled_playlists(self, *playlist_ids: int) -> None:
    """Refresh the bundled PyLav playlists in batches and schedule the next run.

    Args:
        *playlist_ids: Ids to restrict the update to. Ids that are not keys of
            ``BUNDLED_PYLAV_PLAYLISTS`` are ignored; if none remain, every
            bundled playlist is updated.

    ``CancelledError`` and asyncpg's ``CannotConnectNowError`` raised anywhere in
    the body are suppressed, so the update is best-effort.
    """
    # NOTICE: Update the BUNDLED_PLAYLIST_IDS constant in the constants.py file
    batch_size = 10
    with contextlib.suppress(asyncio.exceptions.CancelledError, asyncpg.exceptions.CannotConnectNowError):
        await self.client.node_manager.wait_until_ready()
        # noinspection PyProtectedMember
        await self.client._maybe_wait_until_bundled_node(await self.client.managed_node_is_enabled())
        # noinspection PyProtectedMember
        old_time_stamp = await self.client._config.fetch_next_execution_update_bundled_playlists()
        id_filtered = {
            playlist_id: BUNDLED_PYLAV_PLAYLISTS[playlist_id]
            for playlist_id in playlist_ids
            if playlist_id in BUNDLED_PYLAV_PLAYLISTS
        }
        if not id_filtered:
            id_filtered = BUNDLED_PYLAV_PLAYLISTS
        tasks = []
        for playlist_id, (name, url, source) in id_filtered.items():
            tasks.append(self._update_bundled_playlists(playlist_id, url, source, name, old_time_stamp))
            # Flush on the pending count: a running counter starting at 0 would
            # fire on the very first item and run it as a batch of one.
            if len(tasks) >= batch_size:
                await asyncio.gather(*tasks)
                tasks = []
        if tasks:
            await asyncio.gather(*tasks)
        # noinspection PyProtectedMember
        await self.client._config.update_next_execution_update_bundled_playlists(
            get_now_utc() + datetime.timedelta(days=TASK_TIMER_UPDATE_BUNDLED_PLAYLISTS_DAYS)
        )
        # noinspection PyProtectedMember
        self.client._wait_for_playlists.set()
        LOGGER.info("Finished updating bundled playlists")
async def _update_bundled_external_playlist(
    self, playlist_id, album_playlist, identifier, name, old_time_stamp, retry: int = 0
):
    """Refresh one bundled Spotify/Deezer playlist from its remote source.

    Args:
        playlist_id: Id of the bundled playlist; must be in the bundled Spotify
            or Deezer id sets, otherwise nothing is done.
        album_playlist: URL path segment placed before ``identifier``.
        identifier: Remote identifier placed at the end of the URL.
        name: Fallback display name, used when the response carries no name.
        old_time_stamp: Next-execution value restored when the fetch fails.
        retry: Current retry depth; the call returns immediately once it exceeds 3.
    """
    is_deezer = False
    # Retry budget exhausted: give up silently.
    if retry > 3:
        return
    # Skip sources the client is not configured for.
    # noinspection PyProtectedMember
    if (playlist_id in BUNDLED_SPOTIFY_PLAYLIST_IDS and not self.client._spotify_auth) or (
        playlist_id in BUNDLED_DEEZER_PLAYLIST_IDS and not self.client._has_deezer_support
    ):
        return
    elif playlist_id in BUNDLED_SPOTIFY_PLAYLIST_IDS:
        url = f"https://open.spotify.com/{album_playlist}/{identifier}"
    elif playlist_id in BUNDLED_DEEZER_PLAYLIST_IDS:
        url = f"https://www.deezer.com/en/{album_playlist}/{identifier}"
        is_deezer = True
    else:
        LOGGER.debug("Unknown playlist id: %s", playlist_id)
        return
    tracks_raw = []
    data = None
    try:
        LOGGER.info("Updating bundled external playlist - %s - %s", playlist_id, name)
        query = await Query.from_string(url)
        data: PlaylistResponse = await self.client.get_tracks(query, bypass_cache=True)
        # Prefer the name reported by the source; fall back to the bundled name.
        name = (
            f"[{query.source_abbreviation}] {data.data.info.name}"
            if data.data.info.name
            else f"[{query.source_abbreviation}] {name}"
        )
        tracks_raw = data.data.tracks
    except Exception as exc:
        # Deezer failures with no response at all are retried after a short pause.
        if (not data) and is_deezer:
            await asyncio.sleep(1)
            LOGGER.debug("Retrying Deezer playlist - %s (%s) (%s)", name, playlist_id, url)
            await self._update_bundled_external_playlist(
                playlist_id, album_playlist, identifier, name, old_time_stamp, retry=retry + 1
            )
            return
        else:
            LOGGER.error(
                "Built-in external playlist couldn't be parsed - %s, report this error", name, exc_info=exc
            )
            LOGGER.debug("Built-in external playlist couldn't be parsed - %s (%r)", name, data, exc_info=exc)
            # Discard any partial response so the failure branch below runs.
            data = None
    if not data:
        # Restore the previous schedule value since this playlist was not refreshed.
        # noinspection PyProtectedMember
        await self.client._config.update_next_execution_update_bundled_external_playlists(old_time_stamp)
        return
    if tracks_raw:
        await self.create_or_update_global_playlist(
            identifier=playlist_id, name=name, tracks=tracks_raw, author=self._client.bot.user.id, url=url
        )
    else:
        # A successful response without tracks removes the stored playlist.
        await self.delete_playlist(playlist_id=playlist_id)
[docs]
async def update_bundled_external_playlists(self, *playlist_ids: int) -> None:
    """Refresh the bundled external playlists in batches and schedule the next run.

    Args:
        *playlist_ids: Ids to restrict the update to. Ids that are not keys of
            ``BUNDLED_EXTERNAL_PLAYLISTS`` are ignored; if none remain, every
            bundled external playlist is updated.

    ``CancelledError`` and asyncpg's ``CannotConnectNowError`` raised anywhere in
    the body are suppressed, so the update is best-effort.
    """
    batch_size = 10
    with contextlib.suppress(asyncio.exceptions.CancelledError, asyncpg.exceptions.CannotConnectNowError):
        await self.client.node_manager.wait_until_ready()
        # noinspection PyProtectedMember
        await self.client._maybe_wait_until_bundled_node(await self.client.managed_node_is_enabled())
        # noinspection PyProtectedMember
        old_time_stamp = await self.client._config.fetch_next_execution_update_bundled_external_playlists()
        # NOTICE: Update the BUNDLED_PLAYLIST_IDS constant in the constants.py file
        id_filtered = {
            playlist_id: BUNDLED_EXTERNAL_PLAYLISTS[playlist_id]
            for playlist_id in playlist_ids
            if playlist_id in BUNDLED_EXTERNAL_PLAYLISTS
        }
        if not id_filtered:
            id_filtered = BUNDLED_EXTERNAL_PLAYLISTS
        tasks = []
        for playlist_id, (identifier, name, album_playlist) in id_filtered.items():
            tasks.append(
                self._update_bundled_external_playlist(
                    playlist_id, album_playlist, identifier, name, old_time_stamp
                )
            )
            # Flush on the pending count: a running counter starting at 0 would
            # fire on the very first item and run it as a batch of one.
            if len(tasks) >= batch_size:
                await asyncio.gather(*tasks)
                tasks = []
        if tasks:
            await asyncio.gather(*tasks)
        # noinspection PyProtectedMember
        await self.client._config.update_next_execution_update_bundled_external_playlists(
            get_now_utc() + datetime.timedelta(days=TASK_TIMER_UPDATE_BUNDLED_EXTERNAL_PLAYLISTS_DAYS)
        )
        LOGGER.info("Finished updating bundled external playlists")
[docs]
async def update_external_playlists(self, *playlist_ids: int) -> None:
    """Refresh non-bundled playlists that have a URL, in batches, and schedule the next run.

    Args:
        *playlist_ids: Ids to restrict the update to; when empty, every external
            playlist is considered. Ids in ``BUNDLED_PLAYLIST_IDS`` are always excluded.

    ``CancelledError`` and asyncpg's ``CannotConnectNowError`` raised anywhere in
    the body are suppressed, so the update is best-effort.
    """
    batch_size = 10
    with contextlib.suppress(asyncio.exceptions.CancelledError, asyncpg.exceptions.CannotConnectNowError):
        await self.client.node_manager.wait_until_ready()
        # noinspection PyProtectedMember
        await self.client._maybe_wait_until_bundled_node(await self.client.managed_node_is_enabled())
        tasks = []
        async for playlist in self.get_external_playlists(*playlist_ids, ignore_ids=BUNDLED_PLAYLIST_IDS):
            tasks.append(self._update_external_playlist(playlist))
            # Flush on the pending count: a running counter starting at 0 would
            # fire on the very first item and run it as a batch of one.
            if len(tasks) >= batch_size:
                await asyncio.gather(*tasks)
                tasks = []
        if tasks:
            await asyncio.gather(*tasks)
        # noinspection PyProtectedMember
        await self.client._config.update_next_execution_update_external_playlists(
            get_now_utc() + datetime.timedelta(days=TASK_TIMER_UPDATE_EXTERNAL_PLAYLISTS_DAYS)
        )
        LOGGER.info("Finished updating external playlists")
async def _update_external_playlist(self, playlist, retry: int = 0):
    """Re-fetch one external playlist from its URL and store the new tracks and name.

    Args:
        playlist: Playlist object providing ``fetch_name``, ``fetch_url``,
            ``update_tracks`` and ``update_name``.
        retry: Current retry depth. Deezer fetches that fail without any
            response are retried while this is below 3; after that the failure
            is logged like any other.
    """
    max_retries = 3
    name = await playlist.fetch_name()
    url = await playlist.fetch_url()
    query = await Query.from_string(url)
    response = None
    try:
        LOGGER.info("Updating external playlist - %s (%s)", name, playlist.id)
        response: PlaylistResponse = await self.client.get_tracks(
            query,
            bypass_cache=True,
        )
        tracks_raw = response.data.tracks
        new_name = response.data.info.name
        new_name = f"[{query.source_abbreviation}] {new_name}" if new_name else None
        if tracks_raw:
            await playlist.update_tracks(tracks=tracks_raw)
        if new_name and new_name != name:
            await playlist.update_name(new_name)
    except Exception as exc:
        # Bound the retries: without the depth check a Deezer playlist that keeps
        # failing would recurse forever.
        if (not response) and query.is_deezer and retry < max_retries:
            await asyncio.sleep(1)
            await self._update_external_playlist(playlist, retry=retry + 1)
            return
        LOGGER.error(
            "External playlist couldn't be updated - %s (%s), report this error",
            name,
            playlist.id,
            exc_info=exc,
        )
[docs]
@staticmethod
async def count() -> int:
    """Return the row count reported by ``PlaylistRow.count()``."""
    total = await PlaylistRow.count()
    return total