from __future__ import annotations
import asyncio
import collections
import contextlib
import datetime
import pathlib
import random
import time
from collections.abc import Coroutine
from itertools import islice
from typing import TYPE_CHECKING, Any, Literal
import asyncpg
import discord
from apscheduler.jobstores.base import JobLookupError
from dacite import from_dict
from discord import VoiceProtocol
from discord.abc import Messageable
from pylav.constants.config import DEFAULT_SEARCH_SOURCE, ENABLE_NODE_RESUMING
from pylav.constants.coordinates import REGION_TO_COUNTRY_COORDINATE_MAPPING
from pylav.constants.regex import VOICE_CHANNEL_ENDPOINT
from pylav.enums.plugins.sponsorblock import SegmentCategory
from pylav.events.node import NodeChangedEvent
from pylav.events.player import (
FiltersAppliedEvent,
PlayerAutoDisconnectedAloneEvent,
PlayerAutoDisconnectedEmptyQueueEvent,
PlayerAutoPausedEvent,
PlayerAutoResumedEvent,
PlayerDisconnectedEvent,
PlayerMovedEvent,
PlayerPausedEvent,
PlayerRepeatEvent,
PlayerRestoredEvent,
PlayerResumedEvent,
PlayerStoppedEvent,
PlayerUpdateEvent,
PlayerVolumeChangedEvent,
QuickPlayEvent,
)
from pylav.events.queue import (
QueueEndEvent,
QueueShuffledEvent,
QueueTrackPositionChangedEvent,
QueueTracksAddedEvent,
QueueTracksRemovedEvent,
)
from pylav.events.track import (
TrackAutoPlayEvent,
TrackEndEvent,
TrackExceptionEvent,
TrackPreviousRequestedEvent,
TrackResumedEvent,
TrackSeekEvent,
TrackSkippedEvent,
TrackStuckEvent,
)
from pylav.exceptions.database import EntryNotFoundException
from pylav.exceptions.node import (
NodeHasNoFiltersException,
NoNodeAvailableException,
NoNodeWithRequestFunctionalityAvailableException,
)
from pylav.exceptions.request import HTTPException
from pylav.exceptions.track import TrackNotFoundException
from pylav.extension.radio import RadioBrowser
from pylav.helpers.format.strings import format_time_dd_hh_mm_ss, format_time_string, shorten_string
from pylav.helpers.time import get_now_utc
from pylav.logging import getLogger
from pylav.nodes.api.responses.exceptions import LavalinkException
from pylav.nodes.api.responses.player import State
from pylav.nodes.api.responses.rest_api import LavalinkPlayer
from pylav.nodes.api.responses.track import Track as APITrack
from pylav.nodes.api.responses.websocket import TrackException
from pylav.nodes.node import Node
from pylav.players.filters import (
ChannelMix,
Distortion,
Echo,
Equalizer,
Karaoke,
LowPass,
Reverb,
Rotation,
Timescale,
Tremolo,
Vibrato,
Volume,
)
from pylav.players.filters.misc import FilterMixin
from pylav.players.query.obj import Query
from pylav.players.tracks.obj import Track
from pylav.players.utils import PlayerQueue, TrackHistoryQueue
from pylav.storage.models.player.config import PlayerConfig
from pylav.storage.models.player.state import PlayerState
from pylav.storage.models.playlist import Playlist
from pylav.type_hints.bot import DISCORD_BOT_TYPE, DISCORD_INTERACTION_TYPE
from pylav.type_hints.dict_typing import JSON_DICT_TYPE
try:
from redbot.core.i18n import Translator
_ = Translator("PyLav", pathlib.Path(__file__))
except ImportError:
Translator = None
def _(string: str) -> str:
return string
if TYPE_CHECKING:
from pylav.core.client import Client
from pylav.players.manager import PlayerController
[docs]
class Player(VoiceProtocol):
__slots__ = (
"ready",
"bot",
"client",
"_channel",
"channel_id",
"node",
"player_manager",
"_original_node",
"_voice_state",
"_region",
"_coordinates",
"_connected",
"connected_at",
"_paused",
"_config",
"stopped",
"_last_update",
"_last_position",
"position_timestamp",
"_ping",
"queue",
"history",
"current",
"_post_init_completed",
"_autoplay_playlist",
"_restored",
"_effect_enabled",
"_volume",
"_equalizer",
"_karaoke",
"_timescale",
"_tremolo",
"_vibrato",
"_rotation",
"_distortion",
"_lowpass",
"_echo",
"_reverb",
"_channelmix",
"_extras",
"_last_alone_paused_check",
"_was_alone_paused",
"_last_alone_dc_check",
"_last_empty_queue_check",
"_waiting_for_node",
"last_track",
"next_track",
"_global_config",
"_pylav",
"_discord_session_id",
"_logger",
"_last_track_stuck_check",
"_last_track_stuck_position",
"_paused_position",
)
_config: PlayerConfig
_global_config: PlayerConfig
_pylav: Client
def __init__(
self,
client: DISCORD_BOT_TYPE,
channel: discord.channel.VocalGuildChannel,
*,
node: Node = None,
):
super().__init__(client, channel)
self.__adding_lock = asyncio.Lock()
self.__reconnect_lock = asyncio.Lock()
self.__playing_lock = asyncio.Lock()
self.ready = asyncio.Event()
self.bot = self.client = client
self._channel = None
self.channel = channel
self.channel_id = channel.id
self.node: Node = node
self._logger = getLogger(f"PyLav.Player-{channel.guild.id}")
self.player_manager: PlayerController = None # type: ignore
self._original_node: Node = None # type: ignore
self._voice_state = {}
self._region = channel.rtc_region or "unknown_pylav"
self._coordinates = REGION_TO_COUNTRY_COORDINATE_MAPPING.get(self._region, (0, 0))
self._connected = False
self.connected_at = get_now_utc()
self.last_track: Track | None = None
self.next_track: Track | None = None
self._hashed_voice_state = None
self._discord_session_id = None
self._user_data = {}
self._paused = False
self.stopped = False
self._last_update = 0
self._last_position = 0
self._paused_position = 0
self.position_timestamp = 0
self._ping = 0
self.queue: PlayerQueue[Track] = PlayerQueue()
self.history: TrackHistoryQueue[Track] = TrackHistoryQueue(maxsize=100)
self.current: Track | None = None
self._post_init_completed = False
self._autoplay_playlist: Playlist | None = None
self._restored = False
# Filters
self._effect_enabled: bool = False
self._volume: Volume = Volume.default()
self._equalizer: Equalizer = Equalizer.default()
self._karaoke: Karaoke = Karaoke.default()
self._timescale: Timescale = Timescale.default()
self._tremolo: Tremolo = Tremolo.default()
self._vibrato: Vibrato = Vibrato.default()
self._rotation: Rotation = Rotation.default()
self._distortion: Distortion = Distortion.default()
self._echo: Echo = Echo.default()
self._reverb: Reverb = Reverb.default()
self._low_pass: LowPass = LowPass.default()
self._channel_mix: ChannelMix = ChannelMix.default()
self._config = None # type: ignore
self._extras = {}
self._last_alone_paused_check = 0
self._was_alone_paused = False
self._last_alone_dc_check = 0
self._last_empty_queue_check = 0
self._last_track_stuck_check = 0
self._last_track_stuck_position = -1
self._waiting_for_node = asyncio.Event()
def __hash__(self):
return hash((self.channel.guild.id, self.channel_id))
def __eq__(self, other):
return isinstance(other, self.__class__) and self.__hash__() == other.__hash__()
def __repr__(self):
return (
f"<Player id={self.guild.id} "
f"channel={self.channel.id} "
f"playing={self.is_active} "
f"queue={self.queue.size()} "
f"node={self.node}>"
)
[docs]
def add_voice_to_payload(self, payload: JSON_DICT_TYPE) -> JSON_DICT_TYPE:
if not payload:
payload = {}
if {"sessionId", "token", "endpoint"} == self._voice_state.keys():
payload["voice"] = self._voice_state
return payload
[docs]
async def post_init(
self,
node: Node,
player_manager: PlayerController,
config: PlayerConfig,
pylav: Client,
requester: discord.Member = None,
) -> None:
# sourcery no-metrics
if self._post_init_completed:
return
self._pylav = pylav
self.player_manager = player_manager
self.node = node
self._config = config
self._global_config = player_manager.global_config
self._extras = await config.fetch_extras()
self._post_init_completed = True
player_state = await self.player_manager.client.player_state_db_manager.fetch_player(self.channel.guild.id)
if player_state:
try:
async with asyncio.timeout(10):
await self.restore(player=player_state, requester=requester or self.guild.me)
self._logger.verbose("Player restored in postinit - %s", self)
except Exception as e:
self._logger.error("Failed to restore player in postinit - %s", e)
await self._apply_filters_to_new_player(config, player_manager)
finally:
await self.player_manager.client.player_state_db_manager.delete_player(self.channel.guild.id)
else:
await self._apply_filters_to_new_player(config, player_manager)
await self._create_job_for_player()
self.ready.set()
async def _create_job_for_player(self) -> None:
now_time = get_now_utc()
self.player_manager.client.scheduler.add_job(
self.auto_dc_task,
trigger="interval",
seconds=5,
max_instances=1,
id=f"{self.bot.user.id}-{self.guild.id}-auto_dc_task",
replace_existing=True,
coalesce=True,
next_run_time=now_time + datetime.timedelta(seconds=3),
)
self.player_manager.client.scheduler.add_job(
self.auto_empty_queue_task,
trigger="interval",
seconds=5,
max_instances=1,
id=f"{self.bot.user.id}-{self.guild.id}-auto_empty_queue_task",
replace_existing=True,
coalesce=True,
next_run_time=now_time + datetime.timedelta(seconds=2),
)
self.player_manager.client.scheduler.add_job(
self.auto_pause_task,
trigger="interval",
seconds=5,
max_instances=1,
id=f"{self.bot.user.id}-{self.guild.id}-auto_pause_task",
replace_existing=True,
coalesce=True,
next_run_time=now_time + datetime.timedelta(seconds=1),
)
self.player_manager.client.scheduler.add_job(
self.auto_resume_task,
trigger="interval",
seconds=5,
max_instances=1,
id=f"{self.bot.user.id}-{self.guild.id}-auto_resume_task",
replace_existing=True,
coalesce=True,
next_run_time=now_time + datetime.timedelta(seconds=4),
)
self.player_manager.client.scheduler.add_job(
self.auto_save_task,
trigger="interval",
seconds=10,
max_instances=1,
id=f"{self.bot.user.id}-{self.guild.id}-auto_save_task",
replace_existing=True,
coalesce=True,
next_run_time=now_time + datetime.timedelta(seconds=10),
)
async def _apply_filters_to_new_player(self, config: PlayerConfig, player_manager: PlayerController) -> None:
self._volume = Volume(await player_manager.client.player_config_manager.get_volume(self.guild.id))
effects = await config.fetch_effects()
if (v := effects.get("volume", None)) and (f := Volume.from_dict(v)):
self._volume = f
if (
self.node.has_filter("equalizer")
and (eq := effects.get("equalizer", None))
and (f := Equalizer.from_dict(eq)) # noqa
):
self._equalizer = f
if (
self.node.has_filter("karaoke")
and (k := effects.get("karaoke", None))
and (f := Karaoke.from_dict(k)) # noqa
):
self._karaoke = f
if (
self.node.has_filter("timescale")
and (ts := effects.get("timescale", None))
and (f := Timescale.from_dict(ts)) # noqa
):
self._timescale = f
if (
self.node.has_filter("tremolo")
and (tr := effects.get("tremolo", None))
and (f := Tremolo.from_dict(tr)) # noqa
):
self._tremolo = f
if (
self.node.has_filter("vibrato")
and (vb := effects.get("vibrato", None))
and (f := Vibrato.from_dict(vb)) # noqa
):
self._vibrato = f
if (
self.node.has_filter("rotation")
and (ro := effects.get("rotation", None))
and (f := Rotation.from_dict(ro)) # noqa
):
self._rotation = f
if (
self.node.has_filter("distortion")
and (di := effects.get("distortion", None))
and (f := Distortion.from_dict(di)) # noqa
):
self._distortion = f
if (
self.node.has_filter("lowPass")
and (lo := effects.get("lowpass", None))
and (f := LowPass.from_dict(lo)) # noqa
):
self._low_pass = f
if (
self.node.has_filter("channelMix")
and (ch := effects.get("channel_mix", None))
and (f := ChannelMix.from_dict(ch)) # noqa
):
self._channel_mix = f
if self.node.has_filter("echo") and (echo := effects.get("echo", None)) and (f := Echo.from_dict(echo)): # noqa
self._echo = f
if (
self.node.has_filter("reverb")
and (reverb := effects.get("reverb", None))
and (f := Reverb.from_dict(reverb))
): # noqa
self._reverb = f
payload = {}
if any(
[
self.equalizer,
self.karaoke,
self.timescale,
self.tremolo,
self.vibrato,
self.rotation,
self.distortion,
self.low_pass,
self.channel_mix,
self.echo,
]
):
payload["filters"] = self.node.get_filter_payload(
player=self,
equalizer=self.equalizer,
karaoke=self.karaoke,
timescale=self.timescale,
tremolo=self.tremolo,
vibrato=self.vibrato,
rotation=self.rotation,
distortion=self.distortion,
low_pass=self.low_pass,
channel_mix=self.channel_mix,
pluginFilters=dict(echo=self.echo),
)
if self.volume_filter:
payload["volume"] = self.volume
if payload:
await self.node.patch_session_player(guild_id=self.guild.id, payload=payload)
[docs]
async def update_current_duration(self) -> Track | None:
if not self.current:
return
if await self.current.is_spotify() or await self.current.is_apple_music():
await asyncio.sleep(1)
api_player = await self.fetch_node_player()
track = api_player.track
if track and self.current._processed.info.length != track.info.length:
object.__setattr__(self.current._processed.info, "length", track.info.length)
return self.current
@property
def paused(self) -> bool:
return self._paused
@paused.setter
def paused(self, value: bool) -> None:
self._paused = value
if value:
self._paused_position = self._last_position + ((time.time() * 1000) - self._last_update)
else:
self._paused_position = 0
@property
def channel(self) -> discord.channel.VocalGuildChannel:
return self._channel
@channel.setter
def channel(self, value: discord.channel.VocalGuildChannel) -> None:
if isinstance(value, (discord.VoiceChannel, discord.StageChannel)):
self._channel = value
self._region = value.rtc_region or "unknown_pylav"
self._coordinates = REGION_TO_COUNTRY_COORDINATE_MAPPING.get(self._region, (0, 0))
@property
def coordinates(self) -> tuple[int, int]:
return self._coordinates
@property
def ping(self) -> int:
return self._ping
@property
def region(self) -> str | None:
return self._region
@property
def config(self) -> PlayerConfig:
return self._config
@property
def lavalink(self) -> Client:
return self._pylav
@property
def pylav(self) -> Client:
return self._pylav
@property
def radio(self) -> RadioBrowser:
return self.pylav.radio_browser
[docs]
def vote_node_down(self) -> int:
return -1 if (self.node is None or not self.is_active) else self.node.down_vote(self)
[docs]
def voted(self) -> bool:
return self.node.voted(self)
[docs]
def unvote_node_down(self) -> int:
return -1 if (self.node is None or not self.is_active) else not self.node.down_unvote(self)
[docs]
async def text_channel(self) -> discord.abc.MessageableChannel:
return self.guild.get_channel_or_thread(await self.config.fetch_text_channel_id())
[docs]
async def set_text_channel(self, value: discord.abc.MessageableChannel) -> None:
await self.config.update_text_channel_id(text_channel_id=value.id if value else 0)
[docs]
async def notify_channel(self) -> discord.abc.MessageableChannel:
return self.guild.get_channel_or_thread(await self.config.fetch_notify_channel_id())
[docs]
async def set_notify_channel(self, value: discord.abc.MessageableChannel) -> None:
await self.config.update_notify_channel_id(notify_channel_id=value.id if value else 0)
[docs]
async def forced_vc(self) -> discord.abc.MessageableChannel:
return self.guild.get_channel_or_thread(await self.config.fetch_forced_channel_id())
[docs]
async def set_forced_vc(self, value: discord.abc.MessageableChannel) -> None:
await self.config.update_forced_channel_id(forced_channel_id=value.id)
[docs]
async def self_deaf(self) -> bool | None:
return (
await self.player_manager.client.player_config_manager.get_self_deaf(self.guild.id)
if self.player_manager
else None
)
[docs]
async def is_repeating(self) -> bool:
"""Whether the player is repeating tracks"""
if await self.config.fetch_repeat_queue() is True:
return True
return await self.config.fetch_repeat_current()
[docs]
async def autoplay_enabled(self) -> bool:
"""Whether autoplay is enabled"""
return bool(
await self.player_manager.client.player_config_manager.get_auto_play(self.guild.id) is True
and await self.get_auto_playlist() is not None
)
@property
def volume(self) -> int:
"""
The current volume.
"""
return self._volume.get_int_value()
@property
def volume_filter(self) -> Volume:
"""The currently applied Volume filter"""
return self._volume
@property
def equalizer(self) -> Equalizer:
"""The currently applied Equalizer filter"""
return self._equalizer
@property
def karaoke(self) -> Karaoke:
"""The currently applied Karaoke filter"""
return self._karaoke
@property
def timescale(self) -> Timescale:
"""The currently applied Timescale filter"""
return self._timescale
@property
def tremolo(self) -> Tremolo:
"""The currently applied Tremolo filter"""
return self._tremolo
@property
def vibrato(self) -> Vibrato:
"""The currently applied Vibrato filter"""
return self._vibrato
@property
def rotation(self) -> Rotation:
"""The currently applied Rotation filter"""
return self._rotation
@property
def distortion(self) -> Distortion:
"""The currently applied Distortion filter"""
return self._distortion
@property
def echo(self) -> Echo:
"""The currently applied Echo filter"""
return self._echo
@property
def reverb(self) -> Reverb:
"""The currently applied Echo filter"""
return self._reverb
@property
def low_pass(self) -> LowPass:
"""The currently applied Low Pass filter"""
return self._low_pass
@property
def channel_mix(self) -> ChannelMix:
"""The currently applied Channel Mix filter"""
return self._channel_mix
@property
def filters(self) -> list[FilterMixin]:
"""A list of all filters"""
return [
self.equalizer,
self.karaoke,
self.timescale,
self.tremolo,
self.vibrato,
self.rotation,
self.distortion,
self.echo,
self.low_pass,
self.channel_mix,
]
@property
def has_effects(self):
return any(f.changed for f in self.filters)
@property
def guild(self) -> discord.Guild:
return self.channel.guild
@property
def is_playing(self) -> bool:
"""Returns the player's track state"""
return self.is_active and not self.paused
@property
def is_active(self) -> bool:
"""Returns the player's track state"""
return self.is_connected and self.current is not None and not self.stopped
@property
def is_connected(self) -> bool:
"""Returns whether the player is connected to a voice-channel or not"""
return self.channel_id is not None
@property
def is_empty(self) -> bool:
"""Returns whether the player is empty or not"""
return sum(not i.bot for i in self.channel.members) == 0
[docs]
async def position(self) -> float:
"""Returns the position in the track, adjusted for delta since last update and the Timescale filter"""
if not self.is_active:
return 0
if self.paused:
return min(
self.timescale.adjust_position(self._paused_position)
if self.timescale.changed
else self._paused_position,
await self.current.duration(),
)
difference = time.time() * 1000 - self._last_update
position = self._last_position + difference
return min(
self.timescale.adjust_position(position) if self.timescale.changed else position,
await self.current.duration(),
)
@property
def estimated_position(self) -> float:
"""Returns the position in the track, adjusted for delta since last update"""
if not self.is_active:
return 0
if self.paused:
return min(
self.timescale.adjust_position(self._paused_position)
if self.timescale.changed
else self._paused_position,
self.current._duration,
)
difference = time.time() * 1000 - self._last_update
position = self._last_position + difference
return min(
self.timescale.adjust_position(position) if self.timescale.changed else position,
self.current._duration,
)
[docs]
async def fetch_player_stats(self, return_position: bool = False):
try:
player = await self.fetch_node_player()
except Exception: # noqa
return
if isinstance(player, HTTPException):
return
self._last_position = player.track.info.position if player.track else 0
self._last_update = time.time() * 1000
self.paused = player.paused
self._volume = Volume(player.volume)
self._connected = player.state.connected
self._ping = player.state.ping
if self.current:
self.current.last_known_position = self._last_position
object.__setattr__(self.current._processed.info, "length", player.track.info.length)
if return_position:
return player.track.info.position or 0 if player.track else 0
[docs]
async def fetch_position(self, skip_fetch: bool = False) -> float:
"""Returns the position in the track"""
pos = await self.position()
if skip_fetch:
return pos
try:
if not self.current:
return pos
# await self.fetch_player_stats()
except Exception: # noqa
return pos
return pos
[docs]
async def auto_pause_task(self):
with contextlib.suppress(
asyncio.exceptions.CancelledError,
):
if not self.ready.is_set():
return
if not self.is_connected:
self._logger.trace(
"Auto Pause task for %s fired while player is not connected to a voice channel - discarding",
self,
)
return
if (
(not self.paused)
and self.is_empty
and (
feature := await self.player_manager.client.player_config_manager.get_alone_pause(
guild_id=self.guild.id
)
).enabled
):
if not self._last_alone_paused_check:
self._logger.verbose(
"Auto Pause task for %s - Player is alone - starting countdown",
self,
)
self._last_alone_paused_check = time.time()
if (self._last_alone_paused_check + feature.time) <= time.time():
self._logger.debug(
"Auto Pause task for %s - Player in an empty channel for longer than %s seconds - Pausing",
self,
feature.time,
)
await self.set_pause(pause=True, requester=self.guild.me)
self._was_alone_paused = True
self._last_alone_paused_check = 0
self.player_manager.client.dispatch_event(PlayerAutoPausedEvent(self))
else:
self._last_alone_paused_check = 0
[docs]
async def auto_resume_task(self):
with contextlib.suppress(
asyncio.exceptions.CancelledError,
):
if not self.ready.is_set():
return
if not self._was_alone_paused:
self._logger.trace(
"Auto Resume task for %s fired while player is auto paused - discarding",
self,
)
return
if (
self.paused
and not self.is_empty
and (
feature := await self.player_manager.client.player_config_manager.get_alone_pause(
guild_id=self.guild.id
)
).enabled
):
self._logger.debug(
"Auto Resume task for %s - Player in an non-empty channel - Resuming",
self,
feature.time,
)
await self.set_pause(pause=False, requester=self.guild.me)
self._was_alone_paused = False
self.player_manager.client.dispatch_event(PlayerAutoResumedEvent(self))
[docs]
async def auto_dc_task(self):
with contextlib.suppress(
asyncio.exceptions.CancelledError, NoNodeAvailableException, asyncpg.exceptions.CannotConnectNowError
):
if not self.ready.is_set():
return
if not self.is_connected:
self._logger.trace(
"Auto Disconnect task fired while player is not connected to a voice channel - discarding",
)
return
if (
self.is_empty
and (
feature := await self.player_manager.client.player_config_manager.get_alone_dc(
guild_id=self.guild.id
)
).enabled
):
if not self._last_alone_dc_check:
self._logger.verbose(
"Auto Disconnect task - Player is alone - starting countdown",
)
self._last_alone_dc_check = time.time()
if (self._last_alone_dc_check + feature.time) <= time.time():
self._logger.debug(
"Auto Disconnect task - Player in an empty channel for longer than %s seconds "
"- Disconnecting",
feature.time,
)
await self.disconnect(requester=self.guild.me)
self._last_alone_dc_check = 0
self.player_manager.client.dispatch_event(PlayerAutoDisconnectedEmptyQueueEvent(self))
else:
self._last_alone_dc_check = 0
[docs]
async def auto_empty_queue_task(self):
with contextlib.suppress(
asyncio.exceptions.CancelledError, NoNodeAvailableException, asyncpg.exceptions.CannotConnectNowError
):
if not self.ready.is_set():
return
if not self.is_connected:
self._logger.trace(
"Auto Empty Queue task fired while player is not connected to a voice channel - discarding",
)
return
if self.current:
self._logger.trace("Auto Empty Queue task - Current track is not empty - discarding")
return
if (
self.queue.empty()
and (
feature := await self.player_manager.client.player_config_manager.get_empty_queue_dc(
guild_id=self.guild.id
)
).enabled
):
if not self._last_empty_queue_check:
self._logger.verbose(
"Auto Empty Queue task - Queue is empty - starting countdown",
)
self._last_empty_queue_check = time.time()
if (self._last_empty_queue_check + feature.time) <= time.time():
self._logger.debug(
"Auto Empty Queue task - Queue is empty for longer than %s seconds "
"- Stopping and disconnecting",
feature.time,
)
await self.stop(requester=self.guild.me)
await self.disconnect(requester=self.guild.me)
self._last_empty_queue_check = 0
self.player_manager.client.dispatch_event(PlayerAutoDisconnectedAloneEvent(self))
else:
self._last_empty_queue_check = 0
[docs]
async def auto_save_task(self):
with contextlib.suppress(
asyncio.exceptions.CancelledError, NoNodeAvailableException, asyncpg.exceptions.CannotConnectNowError
):
if not self.is_active:
self._logger.trace(
"Auto save task for %s fired while player is not active - discarding",
self,
)
return
self._logger.trace("Auto save task for %s - Saving the player at %s", self, get_now_utc())
await self.save()
[docs]
async def change_to_best_node(
self, feature: str = None, ops: bool = True, forced: bool = True, skip_position_fetch: bool = False
) -> Node | None:
"""
Returns the best node to play the current track.
Returns
-------
:class:`Node`
"""
if feature is None and self.current:
feature = await self.current.requires_capability()
node = await self.node.node_manager.find_best_node(
region=self.region, feature=feature, coordinates=self.coordinates
)
if not node:
self._logger.warning(
"No node with %s functionality available - Waiting for one to become available!", feature
)
node = await self.node.node_manager.find_best_node(
region=self.region, feature=feature, coordinates=self.coordinates, wait=True
)
if feature and not node:
self._logger.warning(
"No node with %s functionality available after one temporarily became available!", feature
)
raise NoNodeWithRequestFunctionalityAvailableException(
f"No node with {feature} functionality available", feature
)
if node != self.node or (not ops) or forced:
await self.change_node(node, ops=ops, skip_position_fetch=skip_position_fetch, forced=forced)
return node
[docs]
async def change_to_best_node_diff_region(
self, feature: str = None, ops: bool = True, skip_position_fetch: bool = False, forced: bool = False
) -> Node | None:
"""
Returns the best node to play the current track in a different region.
Returns
-------
:class:`Node`
"""
if feature is None and self.current:
feature = await self.current.requires_capability()
node = await self.node.node_manager.find_best_node(
not_region=self.region, feature=feature, coordinates=self.coordinates
)
if not node:
self._logger.warning(
"No node with %s functionality available - Waiting for one to become available!", feature
)
node = await self.node.node_manager.find_best_node(
region=self.region, feature=feature, coordinates=self.coordinates, wait=True
)
if feature and not node:
self._logger.warning(
"No node with %s functionality available after one temporarily became available!", feature
)
raise NoNodeWithRequestFunctionalityAvailableException(
f"No node with {feature} functionality available", feature
)
if node != self.node or (not ops) or forced:
await self.change_node(node, ops=ops, skip_position_fetch=skip_position_fetch, forced=forced)
return node
[docs]
def store(
self,
key: str,
value: Any,
) -> None:
"""
Stores a value in the player's memory storage.
Parameters
----------
value: Any
The value to store.
key: str
The key to store the value under.
"""
self._user_data[key] = value
[docs]
def fetch(self, key: object, default: Any = None) -> Any:
"""
Retrieves the related value from the stored user data.
Parameters
----------
key: :class:`object`
The key to fetch.
default: Optional[:class:`any`]
The object that should be returned if the key doesn't exist. Defaults to `None`.
Returns
-------
:class:`any`
"""
return self._user_data.get(key, default)
[docs]
def delete(self, key: object) -> None:
"""
Removes an item from the stored user data.
Parameters
----------
key: :class:`object`
The key to delete.
"""
with contextlib.suppress(KeyError):
del self._user_data[key]
[docs]
async def on_voice_server_update(self, data: dict) -> None:
if "token" in data and data["token"]:
self._voice_state.update({"token": data["token"]})
if "endpoint" in data and data["endpoint"]:
self._voice_state.update({"endpoint": data["endpoint"]})
if match := VOICE_CHANNEL_ENDPOINT.match(data["endpoint"]):
self._region = match.group("region").replace("-", "_")
self._coordinates = REGION_TO_COUNTRY_COORDINATE_MAPPING.get(self._region, (0, 0))
await self._dispatch_voice_update()
[docs]
async def on_voice_state_update(self, data: dict) -> None:
"""|coro|
An abstract method that is called when the client's voice state
has changed. This corresponds to ``VOICE_STATE_UPDATE``.
Parameters
------------
data: :class:`dict`
The raw :ddocs:`voice state payload <resources/voice#voice-state-object>`.
"""
self._voice_state.update({"sessionId": data["session_id"]})
self._discord_session_id = data["session_id"]
self.channel_id = data["channel_id"]
if not self.channel_id: # We're disconnecting
await self.disconnect(force=True, requester=self.guild.me)
return
if self.channel_id != int(self.channel_id):
self.channel = self.guild.get_channel(int(self.channel_id))
# Ensure we're in the correct voice channel
if (vc := await self.forced_vc()) and vc.id != int(self.channel_id):
self._logger.debug(
"Player was moved to %s, which is different than the forced voice channel; Moving to %s",
self.channel_id,
vc.id,
)
await self.move_to(channel=vc, requester=self.guild.me)
return
await self._dispatch_voice_update()
async def _dispatch_voice_update(self) -> None:
if {"sessionId", "token", "endpoint"} == self._voice_state.keys():
existing_session = await self.fetch_node_player()
if isinstance(existing_session, HTTPException) or (
existing_session.voice.sessionId != self._voice_state["sessionId"]
or existing_session.voice.token != self._voice_state["token"]
or existing_session.voice.endpoint != self._voice_state["endpoint"]
):
await self.node.patch_session_player(self.guild.id, payload={"voice": self._voice_state})
self._waiting_for_node.set()
self._hashed_voice_state = hash(tuple(self._voice_state.items()))
async def _query_to_track(
self,
requester: int,
track: Track | APITrack | dict | str | None,
query: Query = None,
) -> Track:
if not isinstance(track, Track):
track = await Track.build_track(
node=self.node, data=track, query=query, requester=requester, player_instance=self
)
else:
track._requester = requester
track._player = self
return track
[docs]
async def add(
self,
requester: int,
track: Track | APITrack | dict | str | None,
index: int = None,
query: Query = None,
) -> None:
"""
Adds a track to the queue.
Parameters
----------
requester: :class:`int`
The ID of the user who requested the track.
track: Union[:class:`Track`, :class:`dict`]
The track to add. Accepts either an Track or
a dict representing a track returned from Lavalink.
index: Optional[:class:`int`]
The index at which to add the track.
If index is left unspecified, the default behaviour is to append the track. Defaults to `None`.
query: Optional[:class:`Query`]
The query that was used to search for the track.
Returns
-------
:class:`None`
"""
async with self.__adding_lock:
at = await self._query_to_track(requester, track, query)
await self.queue.put([at], index=index)
if index is None:
await self.maybe_shuffle_queue(requester=requester)
self.next_track = None if self.queue.empty() else self.queue.raw_queue.popleft()
self.node.dispatch_event(QueueTracksAddedEvent(self, self.guild.get_member(requester), [at]))
[docs]
async def bulk_add(
self,
tracks_and_queries: list[Track | APITrack | dict | str | list[tuple[Track | APITrack | dict | str, Query]]],
requester: int,
index: int = None,
) -> None:
"""
Adds multiple tracks to the queue.
Parameters
----------
tracks_and_queries: list[Track | dict | str | list[tuple[Track | dict | str, Query]]]
A list of tuples containing the track and query.
requester: :class:`int`
The ID of the user who requested the tracks.
index: Optional[:class:`int`]
The index at which to add the tracks.
"""
async with self.__adding_lock:
output = []
is_list = isinstance(tracks_and_queries[0], (list, tuple))
for entry in tracks_and_queries:
track, query = entry if is_list else (entry, None)
track = await self._query_to_track(requester, track, query)
output.append(track)
await self.queue.put(output, index=index)
if index is None:
await self.maybe_shuffle_queue(requester=requester)
self.next_track = None if self.queue.empty() else self.queue.raw_queue.popleft()
self.node.dispatch_event(QueueTracksAddedEvent(self, self.guild.get_member(requester), output))
[docs]
async def previous(self, requester: discord.Member, bypass_cache: bool = False) -> None:
async with self.__playing_lock:
if self.history.empty():
raise TrackNotFoundException(_("There are no tracks currently in the player history."))
self.stopped = False
track = await self.history.get()
if self.current:
self.last_track = self.current
if await track.query() and not self.node.has_source(await track.requires_capability()):
self.current = None
await self.change_to_best_node(feature=await track.requires_capability(), skip_position_fetch=True)
self.current = track
payload = {"encodedTrack": track.encoded}
if self.volume_filter:
payload["volume"] = self.volume
await self.node.patch_session_player(guild_id=self.guild.id, payload=payload, no_replace=False)
self.node.dispatch_event(TrackPreviousRequestedEvent(self, requester, track))
[docs]
async def quick_play(
self,
requester: discord.Member,
track: Track | APITrack | dict | str | None,
query: Query,
no_replace: bool = False,
bypass_cache: bool = False,
) -> None:
async with self.__playing_lock:
track = await Track.build_track(
node=self.node, data=track, query=query, requester=requester.id, player_instance=self
)
self.next_track = None
self.last_track = None
self.stopped = False
if self.current:
self.current.timestamp = self.fetch_position()
await self.queue.put([self.current], 0)
self.next_track = self.current
self.last_track = self.current
if await track.query() and not self.node.has_source(await track.requires_capability()):
self.current = None
await self.change_to_best_node(feature=await track.requires_capability(), skip_position_fetch=True)
self.current = track
if self.next_track is None and not self.queue.empty():
self.next_track = self.queue.raw_queue.popleft()
payload = {"encodedTrack": track.encoded}
if self.volume_filter:
payload["volume"] = self.volume
await self.node.patch_session_player(guild_id=self.guild.id, payload=payload, no_replace=no_replace)
self.node.dispatch_event(QuickPlayEvent(self, requester, track))
[docs]
def next(self, requester: discord.Member = None, node: Node = None) -> Coroutine[Any, Any, None]:
return self.play(None, None, requester or self.bot.user, node=node) # type: ignore
[docs]
async def play(
self,
track: Track | APITrack | dict | str | None,
query: Query | None,
requester: discord.Member,
start_time: int = 0,
end_time: int = None,
no_replace: bool = False,
bypass_cache: bool = False,
node: Node = None,
) -> None: # sourcery skip: low-code-quality
"""Plays the given track.
Parameters
----------
track: Optional[Union[:class:`Track`, :class:`dict`]]
The track to play. If left unspecified, this will default
to the first track in the queue. Defaults to `None` so plays the next
song in queue. Accepts either an Track or a dict representing a track
returned from Lavalink.
start_time: Optional[:class:`int`]
Setting that determines the number of milliseconds to offset the track by.
If left unspecified, it will start the track at its beginning. Defaults to `0`,
which is the normal start time.
end_time: Optional[:class:`int`]
Settings that determines the number of milliseconds the track will stop playing.
By default, track plays until it ends as per encoded data. Defaults to `0`, which is
the normal end time.
no_replace: Optional[:class:`bool`]
If set to true, operation will be ignored if a track is already playing or paused.
Defaults to `False`
query: Optional[:class:`Query`]
The query that was used to search for the track.
requester: :class:`discord.Member`
The member that requested the track.
bypass_cache: Optional[:class:`bool`]
If set to true, the track will not be looked up in the cache. Defaults to `False`.
node: Optional[:class:`Node`]
The node to use. Defaults the best available node with the needed feature.
"""
# sourcery no-metrics
async with self.__playing_lock:
auto_play, payload = await self._on_play_reset()
if track is not None and isinstance(track, (Track, APITrack, dict, str, type(None))):
track = await Track.build_track(
node=self.node, data=track, query=query, requester=requester.id, player_instance=self
)
if self.current:
await self._process_repeat_on_play()
if self.current:
await self.history.put([self.current], discard=True)
self.last_track = self.current
self.current = None
if not track:
auto_play, track, returned = await self._process_play_no_track(auto_play, track)
if returned:
return
if await track.query() is None:
track._query = await Query.from_base64(track.encoded, lazy=True)
if node:
if self.node != node:
await self.change_node(node)
else:
try:
await self.change_to_best_node(feature=await track.requires_capability(), skip_position_fetch=True)
except NoNodeWithRequestFunctionalityAvailableException as exc:
await self._process_error_on_play(exc, track)
return
track._node = self.node
await self._process_partial_payload(end_time, payload, start_time, track)
if no_replace is None:
no_replace = False
if not isinstance(no_replace, bool):
raise TypeError("no_replace must be a bool")
if not track.encoded:
return await self.play(None, None, requester or self.bot.user, node=node)
self.current = track
self.next_track = None if self.queue.empty() else self.queue.raw_queue.popleft()
payload["encodedTrack"] = track.encoded
if self.volume_filter:
payload["volume"] = self.volume
await self.node.patch_session_player(guild_id=self.guild.id, payload=payload, no_replace=no_replace)
if auto_play:
self.node.dispatch_event(TrackAutoPlayEvent(player=self, track=track))
async def _process_partial_payload(self, end_time, payload, start_time, track: Track):
if start_time or track.timestamp:
await self._process_payload_position(payload, start_time, track)
if end_time is not None:
await self._process_payload_end_time(end_time, payload, track)
async def _process_payload_end_time(self, end_time, payload, track: Track):
if not isinstance(end_time, int) or not 0 <= end_time <= self.timescale.reverse_position(
await track.duration()
):
raise ValueError(
"end_time must be an int with a value equal to, or greater than 0, and less than the track duration"
)
payload["endTime"] = int(end_time)
async def _process_payload_position(self, payload, start_time, track: Track):
start_time = start_time or track.timestamp
if not isinstance(start_time, int) or not 0 <= start_time <= self.timescale.reverse_position(
await track.duration()
):
raise ValueError(
"start_time must be an int with a value equal to, "
"or greater than 0, and less than the track duration"
)
payload["position"] = int(start_time or track.timestamp)
async def _process_partial_playlist(
self, track: Track, requester: int | discord.Member | None
) -> bool | list[Track]:
try:
tracks = await track.search_all(self, requester.id if requester else self.client.user.id)
if not tracks:
raise TrackNotFoundException(f"No tracks found for query {await track.query_identifier()}")
except TrackNotFoundException as exc:
if not track:
raise TrackNotFoundException from exc
await self._process_error_on_play(exc, track)
return False
return tracks
async def _on_play_reset(self):
payload = {}
self._last_update = 0
self._last_position = 0
self.position_timestamp = 0
self.paused = False
self.stopped = False
auto_play = False
self.next_track = None
self.last_track = None
return auto_play, payload
async def _process_play_no_track(self, auto_play, track):
if self.queue.empty():
if await self.autoplay_enabled() and (
available_tracks := await (await self.get_auto_playlist()).fetch_tracks()
):
auto_play, track = await self._process_autoplay_on_play(available_tracks)
else:
await self.stop(
requester=self.guild.get_member(self.node.node_manager.client.bot.user.id)
) # Also sets current to None.
self.history.clear()
self.last_track = None
self.node.dispatch_event(QueueEndEvent(self))
return auto_play, track, True
else:
track = await self.queue.get()
return auto_play, track, False
async def _process_error_on_play(self, exc: Exception, track: Track) -> None:
event = TrackExceptionEvent(
self,
track,
self.node,
event_object=TrackException(
op="event",
guildId=str(self.guild.id),
type="TrackExceptionEvent",
track=await track.fetch_full_track_data(),
exception=LavalinkException(cause=str(exc), message=str(exc), severity="suspicious"),
),
)
self.node.dispatch_event(event)
await self._handle_event(event)
async def _process_autoplay_on_play(self, available_tracks):
available_tracks = {track["encoded"]: track for track in available_tracks}
if tracks_not_in_history := list(set(available_tracks) - set(self.history.raw_b64s)):
track = await Track.build_track(
node=self.node,
data=available_tracks[random.choice(list(tracks_not_in_history))],
query=None,
requester=self.client.user.id,
player_instance=self,
)
else:
track = await Track.build_track(
node=self.node,
data=available_tracks[random.choice(list(available_tracks))],
query=None,
requester=self.client.user.id,
player_instance=self,
)
auto_play = True
self.next_track = None
return auto_play, track
async def _process_repeat_on_play(self):
if await self.config.fetch_repeat_current():
await self.add(self.current.requester_id, self.current)
elif await self.config.fetch_repeat_queue():
await self.add(self.current.requester_id, self.current, index=-1)
[docs]
async def resume(self, requester: discord.Member = None):
self._last_update = 0
self.stopped = False
self._last_position = 0
payload = {
"encodedTrack": self.current.encoded,
"position": int(self.current.last_known_position if self.current else await self.fetch_position()),
}
if self.volume_filter:
payload["volume"] = self.volume
await self.node.patch_session_player(guild_id=self.guild.id, payload=payload, no_replace=False)
self.node.dispatch_event(PlayerResumedEvent(player=self, requester=requester or self.client.user.id))
[docs]
async def skip(self, requester: discord.Member) -> None:
"""Plays the next track in the queue, if any"""
previous_track = self.current
previous_position = await self.fetch_position()
# Send a Stop OP to clear the buffer for avoid a small continuation on playback after skip fires
payload = {"encodedTrack": None}
await self.node.patch_session_player(guild_id=self.guild.id, payload=payload)
await self.next(requester=requester)
if previous_track:
self.node.dispatch_event(TrackSkippedEvent(self, requester, previous_track, previous_position))
[docs]
async def set_repeat(
self, op_type: Literal["current", "queue", "disable"], repeat: bool, requester: discord.Member
) -> None:
"""
Sets the player's repeat state.
Parameters
----------
repeat: :class:`bool`
Whether to repeat the player or not.
op_type: :class:`str`
The type of repeat to set. Can be either ``"current"`` or ``"queue"`` or ``disable``.
requester: :class:`discord.Member`
The member who requested the repeat change.
"""
current_after = current_before = await self.config.fetch_repeat_current()
queue_after = queue_before = await self.config.fetch_repeat_queue()
if op_type == "disable":
await self.config.update_repeat_current(False)
await self.config.update_repeat_queue(False)
queue_after = False
current_after = False
elif op_type == "current":
current_after = repeat
await self.config.update_repeat_current(current_after)
await self.config.update_repeat_queue(False)
elif op_type == "queue":
queue_after = repeat
await self.config.update_repeat_current(False)
await self.config.update_repeat_queue(queue_after)
else:
raise ValueError(f"op_type must be either 'current' or 'queue' or `disable` not `{op_type}`")
self.node.dispatch_event(
PlayerRepeatEvent(self, requester, op_type, queue_before, queue_after, current_before, current_after)
)
[docs]
async def set_shuffle(self, shuffle: bool) -> None:
"""
Sets the player's shuffle state.
Parameters
----------
shuffle: :class:`bool`
Whether to shuffle the player or not.
"""
if await self.player_manager.global_config.fetch_shuffle() is False:
return
await self.config.update_shuffle(shuffle)
[docs]
async def set_auto_shuffle(self, shuffle: bool) -> None:
"""
Sets the player's auto shuffle state.
Parameters
----------
shuffle: :class:`bool`
Whether to shuffle the player or not.
"""
if await self.player_manager.global_config.fetch_auto_shuffle() is False:
return
await self.config.update_auto_shuffle(shuffle)
[docs]
async def set_pause(self, pause: bool, requester: discord.Member) -> None:
"""
Sets the player's paused state.
Parameters
----------
pause: :class:`bool`
Whether to pause the player or not.
requester: :class:`discord.Member`
The member who requested the pause.
"""
payload = {"paused": pause}
await self.node.patch_session_player(guild_id=self.guild.id, payload=payload)
self.paused = pause
self._was_alone_paused = False
if self.paused:
self.node.dispatch_event(PlayerPausedEvent(self, requester))
else:
self.node.dispatch_event(TrackResumedEvent(self, track=self.current, requester=requester))
[docs]
async def set_volume(self, vol: int | float | Volume, requester: discord.Member) -> None:
"""
Sets the player's volume
Note
----
A limit of 1000 is imposed by Lavalink. (This function also inforces a globally and server set limit.)
Parameters
----------
vol: :class:`int`
The new volume level.
requester: :class:`discord.Member`
The member who requested the volume change.
"""
max_volume = await self.player_manager.client.player_config_manager.get_max_volume(self.guild.id)
volume = max([min([vol, max_volume]), 0])
if volume == self.volume:
return
await self.config.update_volume(volume)
self._volume = Volume(volume)
payload = {"volume": volume}
await self.node.patch_session_player(guild_id=self.guild.id, payload=payload)
self.node.dispatch_event(PlayerVolumeChangedEvent(self, requester, self.volume, volume))
[docs]
async def seek(self, position: float, requester: discord.Member, with_filter: bool = False) -> None:
"""
Seeks to a given position in the track.
Parameters
----------
position: :class:`int`
The new position to seek to in milliseconds.
with_filter: :class:`bool`
Whether to apply the filter or not.
requester: :class:`discord.Member`
The member who requested the seek.
"""
if self.current and await self.current.is_seekable():
if with_filter:
position = await self.fetch_position()
position = max([min([position, await self.current.duration()]), 0])
if self.timescale.changed:
position = self.timescale.reverse_position(position)
self.node.dispatch_event(
TrackSeekEvent(self, requester, self.current, before=await self.fetch_position(), after=position)
)
payload = {"position": int(position)}
await self.node.patch_session_player(guild_id=self.guild.id, payload=payload)
self._last_update = time.time() * 1000
self._last_position = position
async def _handle_event(self, event) -> None:
"""
Handles the given event as necessary.
Parameters
----------
event: :class:`Event`
The event that will be handled.
"""
if event.node.identifier != self.node.identifier:
return
if (
isinstance(event, TrackEndEvent)
and event.reason == "finished"
or isinstance(event, TrackExceptionEvent)
or isinstance(event, TrackStuckEvent)
):
self.last_track = self.current
await self.next()
self.next_track = None if self.queue.empty() else self.queue.raw_queue.popleft()
async def _update_state(self, state: State) -> None:
"""
Updates the position of the player.
Parameters
----------
state: :class:`dict`
The state that is given to update.
"""
self._last_update = time.time() * 1000
self._last_position = state.position
self.position_timestamp = state.time
self._ping = state.ping
if self.current:
self.current.last_known_position = self._last_position
event = PlayerUpdateEvent(self, self._last_position, self.position_timestamp)
self.node.dispatch_event(event)
[docs]
async def change_node(
self, node: Node, ops: bool = True, forced: bool = False, skip_position_fetch: bool = False
) -> None:
"""
Changes the player's node
Parameters
----------
node: :class:`Node`
The node the player is changed to.
ops: :class:`bool`
Whether to change apply the volume and filter ops on change.
forced: :class:`bool`
Whether to force the change
skip_position_fetch: :class:`bool`
Whether to skip the position fetch.
"""
if node == self.node and self.node.available and ops and not forced:
return
payload = {}
old_node = self.node
position = await self.fetch_position(skip_fetch=skip_position_fetch)
if self.timescale.changed:
position = self.timescale.reverse_position(position)
self.node = node
await asyncio.wait_for(self._waiting_for_node.wait(), timeout=None)
await node.websocket.wait_until_ready()
if old_node.available and node.session_id != old_node.session_id:
await old_node.delete_session_player(self.guild.id)
if self._voice_state:
await self._dispatch_voice_update()
if node.session_id != old_node.session_id and self.node.supports_sponsorblock:
await self.add_sponsorblock_categories()
if ops:
if self.current:
payload = {"encodedTrack": self.current.encoded, "position": int(position)}
if self.paused:
payload["paused"] = self.paused
self._last_update = time.time() * 1000
if self.has_effects:
payload["filters"] = node.get_filter_payload(
player=self,
equalizer=self.equalizer,
karaoke=self.karaoke,
timescale=self.timescale,
tremolo=self.tremolo,
vibrato=self.vibrato,
rotation=self.rotation,
distortion=self.distortion,
low_pass=self.low_pass,
channel_mix=self.channel_mix,
pluginFilters=dict(echo=self.echo),
)
if self.volume_filter:
payload["volume"] = self.volume
if old_node.identifier != node.identifier:
node.dispatch_event(NodeChangedEvent(self, old_node, node))
if payload:
await node.patch_session_player(guild_id=self.guild.id, payload=payload)
[docs]
async def connect(
self,
*,
timeout: float = 2.0,
reconnect: bool = False,
self_mute: bool = False,
self_deaf: bool = True,
requester: discord.Member = None,
) -> None:
"""
Connects the player to the voice channel.
Parameters
----------
timeout: :class:`float`
The timeout for the connection.
reconnect: :class:`bool`
Whether the player should reconnect if the connection is lost.
self_mute: :class:`bool`
Whether the player should be muted.
self_deaf: :class:`bool`
Whether the player should be deafened.
requester: :class:`discord.Member`
The member requesting the connection.
"""
async with self.__reconnect_lock:
await self.guild.change_voice_state(
channel=self.channel,
self_mute=self_mute,
self_deaf=deaf if (deaf := await self.self_deaf()) is True else self_deaf,
)
self._connected = True
self.connected_at = get_now_utc()
self._logger.debug(
"Connected to voice channel"
if self.guild.me not in self.channel.members
else "Reconnected to voice channel"
)
[docs]
async def reconnect(self):
if self.__reconnect_lock.locked():
return
async with self.__reconnect_lock:
self._waiting_for_node.clear()
shard = self.bot.get_shard(self.guild.shard_id)
while shard.is_closed():
await asyncio.sleep(1)
await self.guild.change_voice_state(
channel=None,
self_mute=False,
self_deaf=False,
)
await self.guild.change_voice_state(
channel=self.channel,
self_mute=False,
self_deaf=await self.self_deaf(),
)
self._connected = True
self.connected_at = get_now_utc()
await asyncio.wait_for(self._waiting_for_node.wait(), timeout=None)
await self.change_to_best_node(forced=True, skip_position_fetch=True)
self._logger.debug("Reconnected to voice channel")
[docs]
async def disconnect(
self, *, force: bool = False, requester: discord.Member | None, maybe_resuming: bool = False
) -> None:
try:
if self.is_active:
await self.save()
if (not maybe_resuming) and self.node.can_resume and self.channel_id is not None:
await self.guild.change_voice_state(channel=None)
self.node.dispatch_event(PlayerDisconnectedEvent(self, requester))
self._logger.debug("Disconnected from voice channel")
finally:
self._connected = False
self.queue.clear()
self.history.clear()
self.last_track = None
self.next_track = None
self.stopped = True
self.current = None
with contextlib.suppress(ValueError):
await self.player_manager.remove(self.channel.guild.id)
if not maybe_resuming:
await self.node.delete_session_player(self.guild.id)
with contextlib.suppress(JobLookupError):
self.player_manager.client.scheduler.remove_job(
job_id=f"{self.bot.user.id}-{self.guild.id}-auto_dc_task"
)
with contextlib.suppress(JobLookupError):
self.player_manager.client.scheduler.remove_job(
job_id=f"{self.bot.user.id}-{self.guild.id}-auto_empty_queue_task"
)
with contextlib.suppress(JobLookupError):
self.player_manager.client.scheduler.remove_job(
job_id=f"{self.bot.user.id}-{self.guild.id}-auto_pause_task"
)
with contextlib.suppress(JobLookupError):
self.player_manager.client.scheduler.remove_job(
job_id=f"{self.bot.user.id}-{self.guild.id}-auto_save_task"
)
self.cleanup()
[docs]
async def stop(self, requester: discord.Member) -> None:
"""Stops the player"""
payload = {"encodedTrack": None}
await self.node.patch_session_player(guild_id=self.guild.id, payload=payload)
self.node.dispatch_event(PlayerStoppedEvent(self, requester))
self.current = None
self.queue.clear()
self.next_track = None
self.stopped = True
await self.player_manager.client.player_state_db_manager.delete_player(self.channel.guild.id)
[docs]
async def move_to(
self,
requester: discord.Member,
channel: discord.channel.VocalGuildChannel,
self_mute: bool = False,
self_deaf: bool = True,
) -> discord.channel.VocalGuildChannel | None:
"""|coro|
Moves the player to a different voice channel.
Parameters
-----------
channel: :class:`discord.channel.VocalGuildChannel`
The channel to move to. Must be a voice channel.
self_mute: :class:`bool`
Indicates if the player should be self-muted on move.
self_deaf: :class:`bool`
Indicates if the player should be self-deafened on move.
requester: :class:`discord.Member`
The member requesting to move the player.
"""
if self.config and (vc := await self.forced_vc()) and channel.id != vc.id: # noqa
channel = vc
self._logger.debug("Player has a forced VC enabled replacing channel arg with it")
if channel == self.channel:
return
old_channel = self.channel
self._logger.debug("Moving from %s to voice channel: %s", self.channel.id, channel.id)
self.channel = channel
self_deaf = deaf if (deaf := await self.self_deaf()) is True else self_deaf
if self.guild.me not in self.channel.members:
await self.guild.change_voice_state(channel=self.channel, self_mute=self_mute, self_deaf=self_deaf)
self._connected = True
self.node.dispatch_event(PlayerMovedEvent(self, requester, old_channel, self.channel))
return channel
[docs]
async def self_deafen(self, toggle: bool) -> None:
"""|coro|
Deafens the player.
Parameters
-----------
toggle: :class:`bool`
Indicates if the player should be deafened.
"""
await self.config.update_self_deaf(toggle)
await self.guild.change_voice_state(self_deaf=toggle, channel=self.channel)
[docs]
async def set_volume_filter(self, requester: discord.Member, volume: Volume) -> None:
"""
Sets the volume of Lavalink.
Parameters
----------
volume : Volume
Volume to set
requester : discord.Member
Raises
------
ValueError
If the volume is not between 0 and 1000
NodeHasNoFiltersException
If the node does not have specified filter enabled
"""
if not self.node.has_filter("volume"):
raise NodeHasNoFiltersException(_("Current node has the volume filter feature disabled."))
max_volume = await self.player_manager.client.player_config_manager.get_max_volume(self.guild.id)
if volume.get_int_value() > max_volume:
volume = Volume(max_volume)
await self.set_filters(
volume=volume,
requester=requester,
)
[docs]
async def set_equalizer(self, requester: discord.Member, equalizer: Equalizer, forced: bool = False) -> None:
"""
Sets the Equalizer of Lavalink.
Parameters
----------
equalizer : Equalizer
Equalizer to set
forced : bool
Whether to force the equalizer to be set resetting any other filters currently applied
requester : discord.Member
The member who requested the equalizer to be set
"""
if not self.node.has_filter("equalizer"):
raise NodeHasNoFiltersException(_("Current node has the equalizer feature disabled."))
await self.set_filters(
equalizer=equalizer,
reset_not_set=forced,
requester=requester,
)
[docs]
async def set_karaoke(self, requester: discord.Member, karaoke: Karaoke, forced: bool = False) -> None:
"""
Sets the Karaoke of Lavalink.
Parameters
----------
karaoke : Karaoke
Karaoke to set
forced : bool
Whether to force the karaoke to be set resetting any other filters currently applied
requester: discord.Member
The member who requested the karaoke
"""
if not self.node.has_filter("karaoke"):
raise NodeHasNoFiltersException(_("Current node has the karaoke feature disabled"))
await self.set_filters(
karaoke=karaoke,
reset_not_set=forced,
requester=requester,
)
[docs]
async def set_timescale(self, requester: discord.Member, timescale: Timescale, forced: bool = False) -> None:
"""
Sets the Timescale of Lavalink.
Parameters
----------
timescale : Timescale
Timescale to set
forced : bool
Whether to force the timescale to be set resetting any other filters currently applied
requester: discord.Member
The member who requested the timescale
"""
if not self.node.has_filter("timescale"):
raise NodeHasNoFiltersException(_("Current node has the timescale feature disabled."))
await self.set_filters(
timescale=timescale,
reset_not_set=forced,
requester=requester,
)
[docs]
async def set_tremolo(self, requester: discord.Member, tremolo: Tremolo, forced: bool = False) -> None:
"""
Sets the Tremolo of Lavalink.
Parameters
----------
tremolo : Tremolo
Tremolo to set
forced : bool
Whether to force the tremolo to be set resetting any other filters currently applied
requester: discord.Member
The member who requested the tremolo
"""
if not self.node.has_filter("tremolo"):
raise NodeHasNoFiltersException(_("Current node has the tremolo feature disabled."))
await self.set_filters(
tremolo=tremolo,
reset_not_set=forced,
requester=requester,
)
[docs]
async def set_vibrato(self, requester: discord.Member, vibrato: Vibrato, forced: bool = False) -> None:
"""
Sets the Vibrato of Lavalink.
Parameters
----------
vibrato : Vibrato
Vibrato to set
forced : bool
Whether to force the vibrato to be set resetting any other filters currently applied
requester: discord.Member
The member who requested the vibrato
"""
if not self.node.has_filter("vibrato"):
raise NodeHasNoFiltersException(_("Current node has the vibrato feature disabled."))
await self.set_filters(
vibrato=vibrato,
reset_not_set=forced,
requester=requester,
)
[docs]
async def set_rotation(self, requester: discord.Member, rotation: Rotation, forced: bool = False) -> None:
"""
Sets the Rotation of Lavalink.
Parameters
----------
rotation : Rotation
Rotation to set
forced : bool
Whether to force the rotation to be set resetting any other filters currently applied
requester: discord.Member
The member who requested the rotation
"""
if not self.node.has_filter("rotation"):
raise NodeHasNoFiltersException(_("Current node has the rotation feature disabled."))
await self.set_filters(
rotation=rotation,
reset_not_set=forced,
requester=requester,
)
[docs]
async def set_distortion(self, requester: discord.Member, distortion: Distortion, forced: bool = False) -> None:
"""
Sets the Distortion of Lavalink.
Parameters
----------
distortion : Distortion
Distortion to set
forced : bool
Whether to force the distortion to be set resetting any other filters currently applied
requester: discord.Member
The member who requested the distortion
"""
if not self.node.has_filter("distortion"):
raise NodeHasNoFiltersException(_("Current node has the distortion feature disabled."))
await self.set_filters(
distortion=distortion,
reset_not_set=forced,
requester=requester,
)
[docs]
async def set_low_pass(self, requester: discord.Member, low_pass: LowPass, forced: bool = False) -> None:
"""
Sets the LowPass of Lavalink.
Parameters
----------
low_pass : LowPass
LowPass to set
forced : bool
Whether to force the low_pass to be set resetting any other filters currently applied
requester : discord.Member
Member who requested the filter change
"""
if not self.node.has_filter("lowPass"):
raise NodeHasNoFiltersException(_("Current node has the low-pass feature disabled."))
await self.set_filters(
low_pass=low_pass,
reset_not_set=forced,
requester=requester,
)
[docs]
async def set_echo(self, requester: discord.Member, echo: Echo, forced: bool = False) -> None:
"""
Sets the Echo of Lavalink.
Parameters
----------
echo : Echo
Echo to set
forced : bool
Whether to force the low_pass to be set resetting any other filters currently applied
requester : discord.Member
Member who requested the filter change
"""
if not self.node.has_filter("echo"):
raise NodeHasNoFiltersException(_("Current node has the echo feature disabled."))
await self.set_filters(
echo=echo,
reset_not_set=forced,
requester=requester,
)
[docs]
async def set_reverb(self, requester: discord.Member, reverb: Reverb, forced: bool = False) -> None:
"""
Sets the Reverb of Lavalink.
Parameters
----------
reverb : Reverb
Reverb to set
forced : bool
Whether to force the low_pass to be set resetting any other filters currently applied
requester : discord.Member
Member who requested the filter change
"""
if not self.node.has_filter("reverb"):
raise NodeHasNoFiltersException(_("Current node has the reverb feature disabled."))
await self.set_filters(
reverb=reverb,
reset_not_set=forced,
requester=requester,
)
[docs]
async def apply_nightcore(self, requester: discord.Member) -> None:
"""
Applies the NightCore filter to the player.
Parameters
----------
requester : discord.Member
Member who requested the filter change
"""
if not self.node.has_filter("equalizer"):
raise NodeHasNoFiltersException(_("Current node has the equalizer feature disabled."))
if not self.node.has_filter("timescale"):
raise NodeHasNoFiltersException(_("Current node has the timescale feature disabled."))
await self.set_filters(
requester=requester,
low_pass=None,
equalizer=Equalizer(
levels=[
{"band": 0, "gain": -0.075},
{"band": 1, "gain": 0.125},
{"band": 2, "gain": 0.125},
],
name="Nightcore",
),
karaoke=self.karaoke or None,
tremolo=self.tremolo or None,
vibrato=self.vibrato or None,
distortion=self.distortion or None,
timescale=Timescale(speed=1.0, pitch=0.95, rate=1.3),
channel_mix=self.channel_mix or None,
echo=self.echo or None,
reset_not_set=True,
)
[docs]
async def remove_nightcore(self, requester: discord.Member) -> None:
"""
Removes the NightCore filter from the player.
Parameters
----------
requester : discord.Member
Member who requested the filter change
"""
await self.set_filters(
requester=requester,
low_pass=self.low_pass or None,
equalizer=None,
timescale=None,
reset_not_set=True,
karaoke=self.karaoke or None,
tremolo=self.tremolo or None,
vibrato=self.vibrato or None,
distortion=self.distortion or None,
channel_mix=self.channel_mix or None,
echo=self.echo or None,
)
[docs]
async def apply_vaporwave(self, requester: discord.Member) -> None:
"""
Applies the Vaporwave filter to the player.
Parameters
----------
requester : discord.Member
Member who requested the filter change
"""
if not self.node.has_filter("equalizer"):
raise NodeHasNoFiltersException(_("Current node has the equalizer feature disabled."))
if not self.node.has_filter("timescale"):
raise NodeHasNoFiltersException(_("Current node has the timescale feature disabled."))
await self.set_filters(
requester=requester,
low_pass=None,
equalizer=Equalizer(
levels=[
{"band": 0, "gain": 0.25},
{"band": 1, "gain": 0.2},
{"band": 2, "gain": 0.2},
],
name="Vaporwave",
),
karaoke=self.karaoke or None,
tremolo=self.tremolo or None,
vibrato=self.vibrato or None,
distortion=self.distortion or None,
timescale=Timescale(speed=1.0, pitch=1.0, rate=0.7),
channel_mix=self.channel_mix or None,
echo=self.echo or None,
reset_not_set=True,
)
[docs]
async def remove_vaporwave(self, requester: discord.Member) -> None:
"""
Removes the Vaporwave filter from the player.
Parameters
----------
requester : discord.Member
Member who requested the filter change
"""
await self.set_filters(
requester=requester,
low_pass=self.low_pass or None,
equalizer=None,
timescale=None,
reset_not_set=True,
karaoke=self.karaoke or None,
tremolo=self.tremolo or None,
vibrato=self.vibrato or None,
distortion=self.distortion or None,
channel_mix=self.channel_mix or None,
echo=self.echo or None,
)
[docs]
async def set_channel_mix(self, requester: discord.Member, channel_mix: ChannelMix, forced: bool = False) -> None:
"""
Sets the ChannelMix of Lavalink.
Parameters
----------
channel_mix : ChannelMix
ChannelMix to set
forced : bool
Whether to force the channel_mix to be set resetting any other filters currently applied
requester : discord.Member
The member who requested the channel_mix
"""
if not self.node.has_filter("channelMix"):
raise NodeHasNoFiltersException(_("Current node has the channel-mix feature disabled."))
await self.set_filters(
channel_mix=channel_mix,
reset_not_set=forced,
requester=requester,
)
[docs]
async def set_filters(
self,
*,
requester: discord.Member,
volume: Volume = None,
equalizer: Equalizer = None,
karaoke: Karaoke = None,
timescale: Timescale = None,
tremolo: Tremolo = None,
vibrato: Vibrato = None,
rotation: Rotation = None,
distortion: Distortion = None,
low_pass: LowPass = None,
channel_mix: ChannelMix = None,
echo: Echo = None,
reset_not_set: bool = False,
): # sourcery skip: low-code-quality
"""
Sets the filters of Lavalink.
Parameters
----------
volume : Volume
Volume to set
equalizer : Equalizer
Equalizer to set
karaoke : Karaoke
Karaoke to set
timescale : Timescale
Timescale to set
tremolo : Tremolo
Tremolo to set
vibrato : Vibrato
Vibrato to set
rotation : Rotation
Rotation to set
distortion : Distortion
Distortion to set
low_pass : LowPass
LowPass to set
channel_mix : ChannelMix
ChannelMix to set
echo: Echo
Echo to set
reset_not_set : bool
Whether to reset any filters that are not set
requester : discord.Member
Member who requested the filters to be set
"""
if volume and not self.node.has_filter("volume"):
volume = None
if equalizer and not self.node.has_filter("equalizer"):
equalizer = None
if karaoke and not self.node.has_filter("karaoke"):
karaoke = None
if timescale and not self.node.has_filter("timescale"):
timescale = None
if tremolo and not self.node.has_filter("tremolo"):
tremolo = None
if vibrato and not self.node.has_filter("vibrato"):
vibrato = None
if rotation and not self.node.has_filter("rotation"):
rotation = None
if distortion and not self.node.has_filter("distortion"):
distortion = None
if low_pass and not self.node.has_filter("lowPass"):
low_pass = None
if channel_mix and not self.node.has_filter("channelMix"):
channel_mix = None
if echo and not self.node.has_filter("echo"):
echo = None
changed = await self._set_filter_variables(
False,
channel_mix,
distortion,
echo,
equalizer,
karaoke,
low_pass,
rotation,
timescale,
tremolo,
vibrato,
volume,
)
self._effect_enabled = changed
if reset_not_set:
kwargs = await self._process_filters_reset_not_set(
channel_mix,
distortion,
echo,
equalizer,
karaoke,
low_pass,
rotation,
timescale,
tremolo,
vibrato,
volume,
)
else:
kwargs = {
"volume": volume or self.volume_filter or None,
"equalizer": equalizer or self.equalizer or None,
"karaoke": karaoke or self.karaoke or None,
"timescale": timescale or self.timescale or None,
"tremolo": tremolo or self.tremolo or None,
"vibrato": vibrato or self.vibrato or None,
"rotation": rotation or self.rotation or None,
"distortion": distortion or self.distortion or None,
"low_pass": low_pass or self.low_pass or None,
"channel_mix": channel_mix or self.channel_mix or None,
"pluginFilters": {
"echo": echo or self.echo or None,
},
}
if not volume:
kwargs.pop("volume", None)
position = await self.fetch_position()
if self.timescale.changed:
position = self.timescale.reverse_position(position)
payload = {
"filters": self.node.get_filter_payload(
player=self,
reset_no_set=reset_not_set,
**kwargs,
),
"position": int(position),
}
await self.node.patch_session_player(self.guild.id, payload=payload)
kwargs.pop("reset_not_set", None)
kwargs.pop("requester", None)
self.node.dispatch_event(FiltersAppliedEvent(player=self, requester=requester, node=self.node, **kwargs))
async def _process_filters_reset_not_set(
self,
channel_mix,
distortion,
echo,
equalizer,
karaoke,
low_pass,
rotation,
timescale,
tremolo,
vibrato,
volume,
reverb,
):
kwargs = {
"volume": volume or self.volume_filter,
"equalizer": equalizer,
"karaoke": karaoke,
"timescale": timescale,
"tremolo": tremolo,
"vibrato": vibrato,
"rotation": rotation,
"distortion": distortion,
"low_pass": low_pass,
"channel_mix": channel_mix,
"pluginFilters": {"echo": echo, "reverb": reverb},
}
if not equalizer:
self._equalizer = self._equalizer.default()
if not karaoke:
self._karaoke = self._karaoke.default()
if not timescale:
self._timescale = self._timescale.default()
if not tremolo:
self._tremolo = self._tremolo.default()
if not vibrato:
self._vibrato = self._vibrato.default()
if not rotation:
self._rotation = self._rotation.default()
if not distortion:
self._distortion = self._distortion.default()
if not low_pass:
self._low_pass = self._low_pass.default()
if not channel_mix:
self._channel_mix = self._channel_mix.default()
if not echo:
self._echo = self._echo.default()
if not reverb:
self._reverb = self._reverb.default()
return kwargs
async def _set_filter_variables(
self,
changed,
channel_mix,
distortion,
echo,
equalizer,
karaoke,
low_pass,
rotation,
timescale,
tremolo,
vibrato,
volume,
reverb,
):
if volume and self.node.has_filter("volume"):
self._volume = volume
if equalizer and self.node.has_filter("equalizer"):
self._equalizer = equalizer
changed = True
if karaoke and self.node.has_filter("karaoke"):
self._karaoke = karaoke
changed = True
if timescale and self.node.has_filter("timescale"):
self._timescale = timescale
changed = True
if tremolo and self.node.has_filter("tremolo"):
self._tremolo = tremolo
changed = True
if vibrato and self.node.has_filter("vibrato"):
self._vibrato = vibrato
changed = True
if rotation and self.node.has_filter("rotation"):
self._rotation = rotation
changed = True
if distortion and self.node.has_filter("distortion"):
self._distortion = distortion
changed = True
if low_pass and self.node.has_filter("low_pass"):
self._low_pass = low_pass
changed = True
if channel_mix and self.node.has_filter("channel_mix"):
self._channel_mix = channel_mix
changed = True
if echo and self.node.has_filter("echo"):
self._echo = echo
changed = True
if reverb and self.node.has_filter("reverb"):
self._reverb = reverb
changed = True
return changed
@staticmethod
async def _process_skip_segments() -> list[str]:
return SegmentCategory.get_category_list_value()
[docs]
async def draw_time(self) -> str:
paused = self.paused
position = await self.fetch_position()
duration = None if not self.current else await self.current.duration()
dur = duration or position
sections = 12
loc_time = round((position / dur if dur != 0 else position) * sections)
bar = "\N{BOX DRAWINGS HEAVY HORIZONTAL}"
seek = "\N{RADIO BUTTON}"
msg = (
"\N{DOUBLE VERTICAL BAR}\N{VARIATION SELECTOR-16}"
if paused
else "\N{BLACK RIGHT-POINTING TRIANGLE}\N{VARIATION SELECTOR-16}"
)
for i in range(sections):
msg += seek if i == loc_time else bar
return msg
[docs]
async def get_currently_playing_message(
self,
embed: bool = True,
messageable: Messageable | DISCORD_INTERACTION_TYPE = None,
progress: bool = True,
show_help: bool = False,
) -> dict[str, discord.Embed | str | discord.File]: # sourcery skip: use-fstring-for-formatting
if not embed:
return {"content": ""}
queue_list = ""
if not progress:
arrow = ""
pos = ""
else:
arrow = await self.draw_time()
position = await self.fetch_position()
pos = format_time_dd_hh_mm_ss(position)
current = self.current
dur = (
None
if current is None
else _("LIVE")
if await current.stream()
else format_time_dd_hh_mm_ss(await current.duration())
)
if self.timescale.changed:
dur += "*"
pos += "*"
current_track_description = await current.get_track_display_name(with_url=True) if current else None
next_track_description = (
await self.next_track.get_track_display_name(with_url=True) if self.next_track else None
)
previous_track_description = (
await self.last_track.get_track_display_name(with_url=True) if self.last_track else None
)
queue_list = await self._process_np_embed_initial_description(
arrow, current, current_track_description, dur, pos, queue_list, progress
)
page = await self.node.node_manager.client.construct_embed(
title=discord.utils.escape_markdown(
_("Now Playing in {server_name_variable_do_not_translate}").format(
server_name_variable_do_not_translate=self.guild.name
)
),
description=queue_list,
messageable=messageable,
)
if current and (url := await current.artworkUrl()):
page.set_thumbnail(url=url)
await self._process_np_embed_prev_track(page, previous_track_description)
await self._process_np_embed_next_track(next_track_description, page)
await self._process_now_playing_embed_footer(page, show_help)
kwargs = {"embed": page}
if current and (artwork := await current.get_embedded_artwork()):
kwargs["file"] = artwork
return kwargs
@staticmethod
async def _process_np_embed_initial_description(
arrow, current, current_track_description, dur, pos, queue_list, progress
):
if current is None:
return queue_list
# sourcery skip: use-fstring-for-formatting
if await current.stream():
queue_list += "**{}:**\n".format(discord.utils.escape_markdown(_("Currently livestreaming")))
else:
queue_list += _("Playing: ")
queue_list += f"{current_track_description}\n"
queue_list += "{translation}: **{current}**".format(
current=current.requester.mention, translation=discord.utils.escape_markdown(_("Requester"))
)
if progress:
queue_list += f"\n\n{arrow}`{pos}`/`{dur}`\n\n"
else:
queue_list += "\n{dur}\n\n\n".format(
dur=_("Duration: {track_duration_variable_do_not_translate}").format(
track_duration_variable_do_not_translate=f"`{dur}`"
)
)
return queue_list
async def _process_np_embed_prev_track(self, page, previous_track_description):
if previous_track_description:
val = f"{previous_track_description}\n"
val += "{translation}: `{duration}`\n".format(
duration=shorten_string(max_length=100, string=_("LIVE"))
if await self.last_track.stream()
else (
format_time_dd_hh_mm_ss(await self.last_track.duration()) + ("*" if self.timescale.changed else "")
),
translation=discord.utils.escape_markdown(_("Duration")),
)
if rq := self.last_track.requester:
val += "{translation}: **{rq}**\n\n".format(
rq=rq.mention, translation=discord.utils.escape_markdown(_("Requester"))
)
page.add_field(name=_("Previous Track"), value=val)
async def _process_np_embed_next_track(self, next_track_description, page):
if next_track_description:
val = f"{next_track_description}\n"
val += "{translation}: `{duration}`\n".format(
duration=_("LIVE")
if await self.next_track.stream()
else format_time_dd_hh_mm_ss(await self.next_track.duration())
+ ("*" if self.timescale.changed else ""),
translation=discord.utils.escape_markdown(_("Duration")),
)
if rq := self.next_track.requester:
val += "{translation}: **{rq}**\n\n".format(
rq=rq.mention, translation=discord.utils.escape_markdown(_("Requester"))
)
page.add_field(name=_("Next Track"), value=val)
async def _process_now_playing_embed_footer(self, page, show_help):
queue_dur = await self.queue_duration()
queue_total_duration = format_time_string(queue_dur // 1000)
if self.timescale.changed:
queue_total_duration += "*"
track_count = self.queue.qsize()
match track_count:
case 1:
text = _("1 track, {queue_total_duration_variable_do_not_translate} remaining\n").format(
queue_total_duration_variable_do_not_translate=queue_total_duration
)
case 0:
text = _("0 tracks, {queue_total_duration_variable_do_not_translate} remaining\n").format(
queue_total_duration_variable_do_not_translate=queue_total_duration
)
case __:
text = _(
"{track_count_variable_do_not_translate} tracks, {queue_total_duration_variable_do_not_translate} remaining\n"
).format(
track_count_variable_do_not_translate=track_count,
queue_total_duration_variable_do_not_translate=queue_total_duration,
)
autoplay_emoji, repeat_emoji, filter_emoji = await self._process_embed_emojis()
text += "{translation}: {repeat_emoji}".format(repeat_emoji=repeat_emoji, translation=_("Repeating"))
text += "{space}{translation}: {autoplay_emoji}".format(
space=(" | " if text else ""), autoplay_emoji=autoplay_emoji, translation=_("Auto Play")
)
text += "{space}{translation}: {filter_emoji}".format(
space=(" | " if text else ""), filter_emoji=filter_emoji, translation=_("Effects")
)
text += "{space}{translation}: {volume}".format(
space=(" | " if text else ""),
volume=_("{volume_variable_do_not_translate}%").format(volume_variable_do_not_translate=self.volume),
translation=_("Volume"),
)
if show_help:
text += _(
"\n\nYou can search specific services by using the following prefixes:\n"
"{deezer_service_variable_do_not_translate} - Deezer\n"
"{spotify_service_variable_do_not_translate} - Spotify\n"
"{apple_music_service_variable_do_not_translate} - Apple Music\n"
"{youtube_music_service_variable_do_not_translate} - YouTube Music\n"
"{youtube_service_variable_do_not_translate} - YouTube\n"
"{soundcloud_service_variable_do_not_translate} - SoundCloud\n"
"{yandex_music_service_variable_do_not_translate} - Yandex Music\n"
"Example: {example_variable_do_not_translate}.\n\n"
"If no prefix is used I will default to {fallback_service_variable_do_not_translate}\n"
).format(
fallback_service_variable_do_not_translate=f"`{DEFAULT_SEARCH_SOURCE}:`",
deezer_service_variable_do_not_translate="'dzsearch:' ",
spotify_service_variable_do_not_translate="'spsearch:' ",
apple_music_service_variable_do_not_translate="'amsearch:' ",
youtube_music_service_variable_do_not_translate="'ytmsearch:'",
youtube_service_variable_do_not_translate="'ytsearch:' ",
soundcloud_service_variable_do_not_translate="'scsearch:' ",
yandex_music_service_variable_do_not_translate="'ymsearch:' ",
example_variable_do_not_translate=f"'{DEFAULT_SEARCH_SOURCE}:Hello Adele'",
)
page.set_footer(text=text)
async def _process_embed_emojis(self):
if not await self.is_repeating():
repeat_emoji = "\N{CROSS MARK}"
elif await self.config.fetch_repeat_queue():
repeat_emoji = "\N{CLOCKWISE RIGHTWARDS AND LEFTWARDS OPEN CIRCLE ARROWS}"
else:
repeat_emoji = "\N{CLOCKWISE RIGHTWARDS AND LEFTWARDS OPEN CIRCLE ARROWS WITH CIRCLED ONE OVERLAY}"
autoplay_emoji = "\N{WHITE HEAVY CHECK MARK}" if await self.autoplay_enabled() else "\N{CROSS MARK}"
filter_emoji = "\N{WHITE HEAVY CHECK MARK}" if self.has_effects else "\N{CROSS MARK}"
return autoplay_emoji, repeat_emoji, filter_emoji
[docs]
async def get_queue_page(
self,
page_index: int,
per_page: int,
total_pages: int,
embed: bool = True,
messageable: Messageable | DISCORD_INTERACTION_TYPE = None,
history: bool = False,
) -> dict[str, discord.Embed | str | discord.File]:
if not embed:
return {"content": ""}
queue = self.history if history else self.queue
queue_list = ""
start_index = page_index * per_page
end_index = start_index + per_page
tracks = list(islice(queue.raw_queue, start_index, end_index))
arrow = await self.draw_time()
position = await self.fetch_position()
pos = format_time_dd_hh_mm_ss(position)
current = self.current
dur = (
None
if current is None
else _("LIVE")
if await current.stream()
else format_time_dd_hh_mm_ss(await current.duration())
)
if self.timescale.changed:
pos += "*"
dur += "*"
queue_list = await self._process_queue_embed_initial_description(arrow, current, dur, pos, queue_list)
queue_list = await self._process_queue_embed_maybe_shuffle(history, queue_list, tracks)
queue_list = await self._process_queue_tracks(history, queue_list, start_index, tracks)
if history:
title = discord.utils.escape_markdown(
_("Recently Played for {server_name_variable_do_not_translate}").format(
server_name_variable_do_not_translate=self.guild.name
)
)
else:
title = discord.utils.escape_markdown(
_("Queue for {server_name_variable_do_not_translate}").format(
server_name_variable_do_not_translate=self.guild.name
)
)
page = await self.node.node_manager.client.construct_embed(
title=title,
description=queue_list,
messageable=messageable,
)
if current and (url := await current.artworkUrl()):
page.set_thumbnail(url=url)
queue_dur = await self.queue_duration(history=history)
queue_total_duration = format_time_string(queue_dur // 1000)
if self.timescale.changed:
queue_total_duration += "*"
await self._process_queue_embed_footer(page, page_index, queue, queue_total_duration, total_pages)
kwargs = {"embed": page}
if current and (artwork := await current.get_embedded_artwork()):
kwargs["file"] = artwork
return kwargs
@staticmethod
async def _process_queue_embed_initial_description(arrow, current, dur, pos, queue_list):
if current is None:
return queue_list
current_track_description = await current.get_track_display_name(with_url=True)
if await current.stream():
queue_list += "**{translation}:**\n".format(
translation=discord.utils.escape_markdown(_("Currently livestreaming"))
)
else:
queue_list += "{translation}: ".format(translation=discord.utils.escape_markdown(_("Playing")))
queue_list += f"{current_track_description}\n"
queue_list += "{translation}: **{current}**".format(
current=current.requester.mention, translation=discord.utils.escape_markdown(_("Requester"))
)
queue_list += f"\n\n{arrow}`{pos}`/`{dur}`\n\n"
return queue_list
async def _process_queue_embed_maybe_shuffle(self, history, queue_list, tracks):
if (
len(tracks)
and not history
and (await self.player_manager.client.player_config_manager.get_auto_shuffle(self.guild.id)) is True
):
queue_list += "__{translation}__\n\n".format(
translation=discord.utils.escape_markdown(
_("Queue order will change every time a track is added due to auto shuffle being enabled.")
)
)
return queue_list
async def _process_queue_embed_footer(self, page, page_index, queue, queue_total_duration, total_pages):
track_number = queue.qsize()
match track_number:
case 1:
text = _("Page 1 / 1 | 1 track, {queue_total_duration_variable_do_not_translate} remaining\n").format(
queue_total_duration_variable_do_not_translate=queue_total_duration,
)
case 0:
text = _("Page 1 / 1 | 0 tracks, {queue_total_duration_variable_do_not_translate} remaining\n").format(
queue_total_duration_variable_do_not_translate=queue_total_duration,
)
case __:
text = _(
"Page {current_page_variable_do_not_translate} / {total_pages_variable_do_not_translate} | {track_number_variable_do_not_translate} tracks, {queue_total_duration_variable_do_not_translate} remaining\n"
).format(
current_page_variable_do_not_translate=page_index + 1,
total_pages_variable_do_not_translate=total_pages,
track_number_variable_do_not_translate=track_number,
queue_total_duration_variable_do_not_translate=queue_total_duration,
)
autoplay_emoji, repeat_emoji, filter_emoji = await self._process_embed_emojis()
text += "{translation}: {repeat_emoji}".format(repeat_emoji=repeat_emoji, translation=_("Repeating"))
text += "{space}{translation}: {autoplay_emoji}".format(
space=(" | " if text else ""), autoplay_emoji=autoplay_emoji, translation=_("Auto Play")
)
text += "{space}{translation}: {filter_emoji}".format(
space=(" | " if text else ""), filter_emoji=filter_emoji, translation=_("Effects")
)
text += "{space}{translation}: {volume}".format(
space=(" | " if text else ""),
volume=_("{volume_variable_do_not_translate}%").format(volume_variable_do_not_translate=self.volume),
translation=_("Volume"),
)
page.set_footer(text=text)
async def _process_queue_tracks(self, history, queue_list, start_index, tracks):
if tracks:
padding = len(str(start_index + len(tracks)))
for track_idx, track in enumerate(tracks, start=start_index + 1):
queue_list = await self._process_single_queue_track(history, padding, queue_list, track, track_idx)
return queue_list
@staticmethod
async def _process_single_queue_track(history, padding, queue_list, track, track_idx):
track_description = await track.get_track_display_name(max_length=50, with_url=True)
diff = padding - len(str(track_idx))
queue_list += f"`{track_idx}.{' ' * diff}` {track_description}"
if history and track.requester:
queue_list += f" - **{track.requester.mention}**"
queue_list += "\n"
return queue_list
[docs]
async def queue_duration(self, history: bool = False) -> int:
queue = self.history if history else self.queue
dur = [await track.duration() for track in queue.raw_queue if not await track.stream()]
queue_dur = sum(dur)
if queue.empty():
queue_dur = 0
if history:
return queue_dur
try:
remain = 0 if await self.current.stream() else (await self.current.duration() - await self.fetch_position())
except AttributeError:
remain = 0
return remain + queue_dur
[docs]
async def remove_from_queue(
self,
track: Track,
requester: discord.Member,
duplicates: bool = False,
) -> int:
if self.queue.empty():
return 0
tracks, count = await self.queue.remove(track, duplicates=duplicates)
self.next_track = None if self.queue.empty() else self.queue.raw_queue.popleft()
self.node.dispatch_event(QueueTracksRemovedEvent(player=self, requester=requester, tracks=tracks))
return count
[docs]
async def move_track(
self,
queue_number: int,
requester: discord.Member,
new_index: int = None,
) -> Track | None:
if self.queue.empty():
return None
track = await self.queue.get(queue_number)
await self.queue.put([track], new_index)
self.next_track = None if self.queue.empty() else self.queue.raw_queue.popleft()
self.node.dispatch_event(
QueueTrackPositionChangedEvent(
before=queue_number, after=new_index, track=track, player=self, requester=requester
)
)
return track
[docs]
async def maybe_shuffle_queue(self, requester: int) -> None:
if (await self.player_manager.client.player_config_manager.get_auto_shuffle(self.guild.id)) is False:
return
await self.shuffle_queue(requester)
[docs]
async def shuffle_queue(self, requester: int) -> None:
self.node.dispatch_event(QueueShuffledEvent(player=self, requester=self.guild.get_member(requester)))
await self.queue.shuffle()
self.next_track = None if self.queue.empty() else self.queue.raw_queue.popleft()
[docs]
async def set_autoplay_playlist(self, playlist: int | Playlist) -> None:
if isinstance(playlist, int):
await self.config.update_auto_play_playlist_id(playlist)
else:
await self.config.update_auto_play_playlist_id(playlist.id)
[docs]
async def get_auto_playlist(self) -> Playlist | None:
try:
return await self.player_manager.client.playlist_db_manager.get_playlist_by_id(
await self.config.fetch_auto_play_playlist_id()
)
except EntryNotFoundException:
return None
[docs]
async def set_autoplay(self, autoplay: bool) -> None:
await self.config.update_auto_play(autoplay)
[docs]
async def to_dict(self) -> dict:
"""
Returns a dict representation of the player.
"""
data = await self.config.fetch_all()
position = await self.position()
if self.timescale.changed:
position = self.timescale.reverse_position(position)
return {
"id": int(self.guild.id),
"channel_id": self.channel.id,
"current": await self.current.to_dict() if self.current else None,
"text_channel_id": data["text_channel_id"],
"notify_channel_id": data["notify_channel_id"],
"forced_channel_id": data["forced_channel_id"],
"paused": self.paused,
"repeat_queue": data["repeat_queue"],
"repeat_current": data["repeat_current"],
"shuffle": data["shuffle"],
"auto_shuffle": data["auto_shuffle"],
"auto_play": data["auto_play"],
"auto_play_playlist_id": data["auto_play_playlist_id"],
"volume": self.volume,
"position": position,
"playing": self.is_active,
"queue": [] if self.queue.empty() else [await t.to_dict() for t in self.queue.raw_queue],
"history": [] if self.history.empty() else [await t.to_dict() for t in self.history.raw_queue],
"effect_enabled": self._effect_enabled,
"effects": {
"volume": self._volume.to_dict(),
"equalizer": self._equalizer.to_dict(),
"karaoke": self._karaoke.to_dict(),
"timescale": self._timescale.to_dict(),
"tremolo": self._tremolo.to_dict(),
"vibrato": self._vibrato.to_dict(),
"rotation": self._rotation.to_dict(),
"distortion": self._distortion.to_dict(),
"low_pass": self._low_pass.to_dict(),
"channel_mix": self._channel_mix.to_dict(),
"echo": self._echo.to_dict(),
"reverb": self._reverb.to_dict(),
},
"self_deaf": data["self_deaf"],
"extras": {
"last_track": await self.last_track.to_dict() if self.last_track else None,
"next_track": await self.next_track.to_dict() if self.next_track else None,
"was_alone_paused": self._was_alone_paused,
},
}
[docs]
async def save(self) -> None:
if self.is_active:
await self.node.node_manager.client.player_state_db_manager.save_player(await self.to_dict())
[docs]
async def restore(self, player: PlayerState, requester: discord.User | discord.Member) -> None:
# sourcery no-metrics
if self._restored is True:
return
self._was_alone_paused = player.extras.get("was_alone_paused", False)
current, last_track, next_track, restoring_session = await self._process_restore_current_tracks(player)
self.last_track = last_track
self.next_track = next_track
self.current = None
self.paused = player.paused
await self._process_restore_autoplaylist(player)
self._last_position = player.position
history, queue = await self._process_restore_queues(player)
self.queue.raw_queue = collections.deque(queue)
self.queue.raw_b64s = [t.encoded for t in queue if t.encoded]
self.history.raw_queue = collections.deque(history)
self.history.raw_b64s = [t.encoded for t in history]
self._effect_enabled = player.effect_enabled
await self._process_restore_filters(player)
self.current = current
if self.current is None and ENABLE_NODE_RESUMING:
self.stopped = (not await self.autoplay_enabled()) and not self.queue.qsize()
else:
self.stopped = (not await self.autoplay_enabled()) and not self.queue.qsize() and not self.current
await self.change_to_best_node(ops=False, skip_position_fetch=True)
await self._process_restore_rest_call(restoring_session)
self.last_track = last_track
self._restored = True
await self.player_manager.client.player_state_db_manager.delete_player(guild_id=self.guild.id)
self.node.dispatch_event(PlayerRestoredEvent(self, requester))
self._logger.verbose("Player restored - %s", self)
async def _process_restore_autoplaylist(self, player: PlayerState) -> None:
if self._autoplay_playlist is None:
try:
self._autoplay_playlist = (
await self.player_manager.client.playlist_db_manager.get_playlist_by_id(
player.auto_play_playlist_id
)
if player.auto_play_playlist_id
else None
)
except EntryNotFoundException:
# Set playlist no longer exists, reset to the bundled playlist - stop player crashing on creation
await self.set_autoplay_playlist(1)
async def _process_restore_rest_call(self, restoring_session: bool) -> None:
payload = {}
if self.paused:
payload["paused"] = self.paused
if self.current and not restoring_session:
payload |= {"encodedTrack": self.current.encoded, "position": int(self._last_position)}
self._last_update = time.time() * 1000
if self.stopped and not restoring_session:
payload |= {"encodedTrack": None}
self._last_update = time.time() * 1000
if self.has_effects:
payload["filters"] = self.node.get_filter_payload(
player=self,
equalizer=self.equalizer,
karaoke=self.karaoke,
timescale=self.timescale,
tremolo=self.tremolo,
vibrato=self.vibrato,
rotation=self.rotation,
distortion=self.distortion,
low_pass=self.low_pass,
channel_mix=self.channel_mix,
pluginFilters=dict(echo=self.echo),
)
if self.volume_filter:
payload["volume"] = self.volume
if payload:
await self.node.patch_session_player(guild_id=self.guild.id, payload=payload)
async def _process_restore_queues(self, player):
queue = await self._generate_queue(player.queue)
history = await self._generate_queue(player.history)
return history, queue
async def _generate_queue(self, raw_queue):
queue_raw = (
[
{
"data": t.pop("encoded", None),
"query": t.pop("query"),
"full_track_data": t.pop("full_track_data", None),
"lazy": True,
**t.pop("extra", {}),
**t,
}
for t in raw_queue
]
if raw_queue
else []
)
encoded_list = [track["data"] for track in queue_raw if track["full_track_data"] is None]
full_track_data = [track["full_track_data"] for track in queue_raw if track["full_track_data"] is not None]
if encoded_list:
track_objects = await self.node.post_decodetracks(encoded_list)
track_objects_mapping = (
{track.encoded: track for track in track_objects} if isinstance(track_objects, list) else {}
)
else:
track_objects_mapping = {}
queue = []
if track_objects_mapping:
for i, track in enumerate(queue_raw, start=0):
query = await Query.from_string(track.pop("query"))
lazy = track.pop("lazy")
if track["data"] is not None and track["data"] in track_objects_mapping:
data = track_objects_mapping[track.pop("data")]
else:
data = track.pop("data")
new_track = await Track.build_track(
node=self.node, query=query, lazy=lazy, data=data, **track, player_instance=self
)
if new_track:
queue.append(new_track)
if (not track_objects_mapping) or full_track_data:
queue = (
[
(
await Track.build_track(
node=self.node,
data=t_full or t_data,
query=await Query.from_string(t.pop("query"), lazy=True),
lazy=t.pop("lazy") and not t_full,
**t,
player_instance=self,
)
)
for t in queue_raw
if [(t_full := t.pop("full_track_list", None)), (t_data := t.pop("data", None)), False]
and t_data not in track_objects_mapping
and t_full
or t_data
]
if queue_raw
else []
)
return queue
async def _process_restore_current_tracks(self, player):
restoring_session = False
if player.current:
player_api = await self.node.fetch_session_player(guild_id=self.guild.id)
if isinstance(player_api, LavalinkPlayer):
if player_api.track:
current = await Track.build_track(
node=self.node,
data=player_api.track,
**player.current.pop("extra"),
**player.current,
player_instance=self,
)
restoring_session = True
else:
current = None
elif full_track := player.current.pop("full_track_data", None):
current = await Track.build_track(
node=self.node,
data=from_dict(data_class=APITrack, data=full_track),
lazy=True,
query=await Query.from_string(player.current.pop("query")),
**player.current.pop("extra"),
**player.current,
player_instance=self,
)
else:
current = await Track.build_track(
node=self.node,
data=player.current.pop("encoded", None),
lazy=True,
query=await Query.from_string(player.current.pop("query")),
**player.current.pop("extra"),
**player.current,
player_instance=self,
)
else:
current = None
if n_track := player.extras.get("next_track", {}):
if full_track := n_track.pop("full_track_data", None):
next_track = await Track.build_track(
node=self.node,
data=from_dict(data_class=APITrack, data=full_track),
lazy=True,
query=await Query.from_string(n_track.pop("query")),
**n_track.pop("extra"),
**n_track,
player_instance=self,
)
else:
next_track = await Track.build_track(
node=self.node,
data=n_track.pop("encoded", None),
lazy=True,
query=await Query.from_string(n_track.pop("query")),
**n_track.pop("extra"),
**n_track,
player_instance=self,
)
else:
next_track = None
if l_track := player.extras.get("last_track", {}):
if full_track := l_track.pop("full_track_data", None):
last_track = await Track.build_track(
node=self.node,
data=from_dict(data_class=APITrack, data=full_track),
lazy=True,
query=await Query.from_string(l_track.pop("query")),
**l_track.pop("extra"),
**l_track,
player_instance=self,
)
else:
last_track = await Track.build_track(
node=self.node,
data=l_track.pop("encoded", None),
lazy=True,
query=await Query.from_string(l_track.pop("query")),
**l_track.pop("extra"),
**l_track,
player_instance=self,
)
else:
last_track = None
return current, last_track, next_track, restoring_session
async def _process_restore_filters(self, player):
effects = player.effects
if (v := effects.get("volume", None)) and (f := Volume.from_dict(v)):
self._volume = f
if (
self.node.has_filter("equalizer")
and (v := effects.get("equalizer", None))
and (f := Equalizer.from_dict(v))
):
self._equalizer = f
if self.node.has_filter("karaoke") and (v := effects.get("karaoke", None)) and (f := Karaoke.from_dict(v)):
self._karaoke = f
if (
self.node.has_filter("timescale")
and (v := effects.get("timescale", None))
and (f := Timescale.from_dict(v))
):
self._timescale = f
if self.node.has_filter("tremolo") and (v := effects.get("tremolo", None)) and (f := Tremolo.from_dict(v)):
self._tremolo = f
if self.node.has_filter("vibrato") and (v := effects.get("vibrato", None)) and (f := Vibrato.from_dict(v)):
self._vibrato = f
if self.node.has_filter("rotation") and (v := effects.get("rotation", None)) and (f := Rotation.from_dict(v)):
self._rotation = f
if (
self.node.has_filter("distortion")
and (v := effects.get("distortion", None))
and (f := Distortion.from_dict(v))
):
self._distortion = f
if self.node.has_filter("lowPass") and (v := effects.get("low_pass", None)) and (f := LowPass.from_dict(v)):
self._low_pass = f
if (
self.node.has_filter("channelMix")
and (v := effects.get("channel_mix", None))
and (f := ChannelMix.from_dict(v))
):
self._channel_mix = f
if self.node.has_filter("echo") and (v := effects.get("echo", None)) and (f := Echo.from_dict(v)):
self._echo = f
if self.node.has_filter("reverb") and (v := effects.get("reverb", None)) and (f := Reverb.from_dict(v)):
self._reverb = f
[docs]
async def fetch_node_player(self) -> LavalinkPlayer | HTTPException:
return await self.node.fetch_session_player(self.guild.id)