Source code for pylav.players.manager

from __future__ import annotations

import asyncio
import contextlib
import pathlib
from collections.abc import Iterator
from typing import TYPE_CHECKING

import asyncpg
import discord

from pylav.constants.config import ENABLE_NODE_RESUMING
from pylav.events.player import PlayerConnectedEvent
from pylav.exceptions.node import NoNodeAvailableException
from pylav.helpers.format.strings import shorten_string
from pylav.logging import getLogger
from pylav.nodes.node import Node
from pylav.players.player import Player
from pylav.players.query.obj import Query
from pylav.storage.models.player.config import PlayerConfig
from pylav.storage.models.player.state import PlayerState

try:
    from redbot.core.i18n import Translator

    _ = Translator("PyLav", pathlib.Path(__file__))
except ImportError:
    Translator = None

    def _(string: str) -> str:
        return string


if TYPE_CHECKING:
    from pylav.core.client import Client

LOGGER = getLogger("PyLav.PlayerManager")


[docs] class PlayerController: """Represents the player manager that contains all the players. len(x): Returns the total amount of cached players. iter(x): Returns an iterator of all the players cached. Attributes ---------- default_player_class: :class:`BasePlayer` The player that the player manager is initialized with. bot: :class:`discord.Client` The client that the player manager is initialized with. client: :class:`Client` The client that the player manager is initialized with. """ __slots__ = ("_players", "default_player_class", "bot", "client", "_global_player_config") _global_player_config: PlayerConfig def __init__(self, lavalink: Client, player: type[Player] = Player): if not issubclass(player, Player): raise ValueError("Player must implement Player") self.client = lavalink self.bot = lavalink.bot self._players: dict[int, Player] = {} self.default_player_class = player def __len__(self): return len(self._players)
[docs] def __iter__(self) -> Iterator[tuple[int, Player]]: """Returns an iterator that yields a tuple of (guild_id, player)""" yield from self.players.items()
@property def players(self) -> dict[int, Player]: """Returns a dictionary of all players in manager.""" return self._players @property def global_config(self) -> PlayerConfig: return self._global_player_config @property def connected_players(self) -> list[Player]: """Returns a list of all the connected players""" return [p for p in self.players.values() if p.is_connected] @property def playing_players(self) -> list[Player]: """Returns a list of all the playing players""" return [p for p in self.players.values() if p.is_active] @property def not_playing_players(self) -> list[Player]: """Returns a list of all the not playing players""" return [p for p in self.players.values() if not p.is_active] @property def paused_players(self) -> list[Player]: """Returns a list of all the paused players""" return [p for p in self.players.values() if p.paused] @property def empty_players(self) -> list[Player]: """Returns a list of all the empty players""" return [p for p in self.players.values() if p.is_empty]
[docs] async def initialize(self): self._global_player_config = self.client.player_config_manager.get_global_config() self.client.scheduler.add_job( self.update_bot_activity, trigger="interval", seconds=5, max_instances=1, replace_existing=True, name="update_bot_activity", coalesce=True, id=f"{self.bot.user.id}-update_bot_activity", )
[docs] async def destroy(self, guild_id: int, requester: discord.Member | None): """ Removes a player from cache, and also Lavalink if applicable. Ensure you have disconnected the given guild_id from the voicechannel first, if connected. Warning ------- This should only be used if you know what you're doing. Players should never be destroyed unless they have been moved to another :class:`Node`. Parameters ---------- guild_id: int The guild_id associated with the player to remove. requester: :class:`discord.Member` The member requesting the player to be removed. """ if guild_id not in self.players: return player = self.players.pop(guild_id) await player.disconnect(requester=requester, maybe_resuming=ENABLE_NODE_RESUMING) # noinspection PyProtectedMember player.node._logger.debug("Successfully destroyed player %s", guild_id)
[docs] async def save_and_restore(self, guild_id: int): await asyncio.sleep(5) if player := self.players.pop(guild_id, None): await player.save() await player.disconnect(requester=self.client.bot.user) player_state = await self.client.player_state_db_manager.fetch_player(guild_id) if player_state: await self._restore_player(player_state)
[docs] def find_all(self, predicate=None): """Returns a list of players that match the given predicate. Parameters ---------- predicate: Optional[:class:`function`] A predicate to return specific players. Defaults to `None`. Returns ------- List[:class:`Player`] """ return [p for p in self.players.values() if bool(predicate(p))] if predicate else list(self.players.values())
[docs] async def remove(self, guild_id: int) -> None: """Removes a player from the internal cache. Parameters ---------- guild_id: :class:`int` The player that will be removed. """ if guild_id in self.players: player = self.players.pop(guild_id) player.cleanup()
[docs] def get(self, guild_id: int) -> Player: """ Gets a player from cache. Parameters ---------- guild_id: :class:`int` The guild_id associated with the player to get. Returns ------- Optional[:class:`Player`] """ return self.players.get(guild_id)
[docs] async def create( self, channel: discord.channel.VocalGuildChannel, endpoint: str = None, node: Node = None, self_deaf: bool = None, requester: discord.Member = None, feature: str | None = None, ) -> Player: """ Creates a player if one doesn't exist with the given information. If node is provided, a player will be created on that node. If endpoint is provided, PyLav will attempt to parse the region from the endpoint and return a node in the parsed region. If node, region and endpoint are left unspecified, or region/endpoint selection fails, PyLav will fall back to the node with the lowest penalty. Region can be omitted if node is specified and vice-versa. Parameters ---------- channel: :class:`discord.channel.VocalGuildChannel` The voice channel to connect to. endpoint: :class:`str` The address of the Discord voice server. Defaults to `None`. node: :class:`Node` The node to put the player on. Defaults to `None` and a node with the lowest penalty is chosen. requester: :class:`discord.Member` The member requesting the player. Defaults to `None`. feature: Optional[:class:`str`] The feature to look for for the initial Node. Defaults to `None`. self_deaf: :class:`bool` Whether the player should deafen themselves. Defaults to `False`. Returns ------- :class:`Player` """ if p := self.players.get(channel.guild.id): if channel.id != p.channel_id: await p.move_to(requester, channel) return p region = self.client.node_manager.get_region(endpoint) best_node = node or await self.client.node_manager.find_best_node(region, feature=feature or None) if not best_node: raise NoNodeAvailableException("No available nodes!") player_config = self.client.player_config_manager.get_config(channel.guild.id) forced_channel_id = await player_config.fetch_forced_channel_id() self_deafen = await self.client.player_config_manager.get_self_deaf(channel.guild.id) if forced_channel_id != 0: act_channel = channel.guild.get_channel_or_thread(forced_channel_id) else: act_channel = channel player: Player = await act_channel.connect( cls=Player, self_deaf=self_deafen if self_deaf is None else self_deaf # type: ignore ) self.players[channel.guild.id] = player try: best_node = node or await self.client.node_manager.find_best_node( region, feature=feature or None, coordinates=player.coordinates ) if not best_node: raise NoNodeAvailableException(_("There are no nodes available currently.")) await player.post_init( node=best_node, player_manager=self, config=player_config, pylav=self.client, requester=requester ) await player.move_to( requester, channel=player.channel, self_deaf=self_deafen if self_deaf is None else self_deaf ) best_node.dispatch_event(PlayerConnectedEvent(player, requester or self.client.bot.user)) # noinspection PyProtectedMember best_node._logger.debug("Successfully created player for %s", channel.guild.id) return player except Exception: LOGGER.error("Failed to create player for %s", channel.guild.id) LOGGER.debug("Error in create player for %s", channel.guild.id, exc_info=True) await player.disconnect(requester=requester) raise
[docs] async def save_all_players(self) -> None: LOGGER.debug("Saving player states") await self.client.player_state_db_manager.save_players( [await p.to_dict() for p in self.connected_players if p.is_active] )
[docs] async def restore_player_states(self) -> None: # noinspection PyProtectedMember await asyncio.wait_for(self.client._wait_for_playlists.wait(), timeout=600) LOGGER.info("Restoring player states") while not self.client.node_manager.available_nodes: await asyncio.sleep(1) tasks = [ asyncio.create_task(self._restore_player(p)) async for p in self.client.player_state_db_manager.fetch_all_players() ] await asyncio.gather(*tasks, return_exceptions=True) LOGGER.info("Restored %s player states", len(self.players))
async def _restore_player(self, player_state: PlayerState) -> None: player = self.players.get(player_state.id) if player is not None: # Player was started before restore LOGGER.debug("Player %s initialized before restore, skipping restore", player_state.id) await self.client.player_state_db_manager.delete_player(guild_id=player_state.id) return channel = self.client.bot.get_channel(player_state.channel_id) if not channel: # Channel does not exist anymore LOGGER.debug("Channel for %s could not be found, skipping player restore", player_state.id) await self.client.player_state_db_manager.delete_player(guild_id=player_state.id) return if not player_state.current: # Player was empty LOGGER.debug("Player %s does not have a current track, skipping restore", player_state.id) await self.client.player_state_db_manager.delete_player(guild_id=player_state.id) return requester = self.client.bot.user try: async with asyncio.timeout(10): discord_player = await self.create( channel=channel, requester=requester, feature=(await Query.from_base64(player_state.current["encoded"], lazy=True)).requires_capability, self_deaf=player_state.self_deaf, ) except Exception: LOGGER.exception("Failed to restore player %s - %s", player_state.id, player_state.channel_id) raise # noinspection PyProtectedMember if not discord_player._restored: await discord_player.restore(player_state, requester)
[docs] async def shutdown(self) -> None: LOGGER.info("Shutting down all players") tasks = [ asyncio.create_task(self.destroy(guild_id=guild_id, requester=self.client.bot.user)) for guild_id in self.players ] await asyncio.gather(*tasks, return_exceptions=True)
[docs] async def update_bot_activity(self) -> None: """ Updates the bot's activity. """ with contextlib.suppress(asyncio.exceptions.CancelledError, asyncpg.exceptions.CannotConnectNowError): if not await self.client.lib_db_manager.get_config().fetch_update_bot_activity(): return playing_players = len(self.playing_players) activities = self.bot.guilds[0].me.activities activity = discord.utils.find(lambda a: a.type == discord.ActivityType.listening, activities) if playing_players > 1: if (not activity) or _("Music in {number_of_servers_variable_do_not_translate} servers").format( number_of_servers_variable_do_not_translate=playing_players ) not in activity.name: LOGGER.verbose("Updating bot activity to %s", f"Listening to Music in {playing_players} servers") await self.bot.change_presence( activity=discord.Activity( type=discord.ActivityType.listening, name=shorten_string( max_length=100, string=_("Music in {number_of_servers_variable_do_not_translate} servers").format( number_of_servers_variable_do_not_translate=playing_players ), ), ) ) elif playing_players == 1: current_player = self.playing_players[0] if current_player.current is None: return track_name = await current_player.current.get_track_display_name( max_length=40, author=True, unformatted=True, escape=False, ) if activity and track_name in activity.name: return LOGGER.verbose("Updating bot activity to %s", f"Listening to {track_name}") await self.bot.change_presence( activity=discord.Activity(type=discord.ActivityType.listening, name=track_name) ) elif playing_players == 0 and activity: LOGGER.verbose("Removing bot activity") await self.bot.change_presence(activity=None)