Source code for pylav.extension.red.utils.required_methods

from __future__ import annotations

import asyncio
import contextlib
import inspect
from pathlib import Path
from types import MethodType

import discord
from discord.ext.commands import CheckFailure
from redbot.core import commands
from redbot.core.data_manager import cog_data_path
from redbot.core.i18n import Translator
from redbot.core.utils.chat_formatting import box
from tabulate import tabulate

from pylav.core.client import Client
from pylav.core.context import PyLavContext
from pylav.exceptions.node import NoNodeAvailableException, NoNodeWithRequestFunctionalityAvailableException
from pylav.extension.red.errors import (
    IncompatibleException,
    MediaPlayerNotFoundError,
    NotDJError,
    UnauthorizedChannelError,
)
from pylav.helpers.format.ascii import EightBitANSI
from pylav.logging import getLogger
from pylav.type_hints.bot import DISCORD_BOT_TYPE, DISCORD_COG_TYPE

_ = Translator("PyLav", Path(__file__))
_LOCK = asyncio.Lock()
LOGGER = getLogger("PyLav.ext.red.utils.overrides")

INCOMPATIBLE_COGS = {}


@commands.command(
    cls=commands.commands._AlwaysAvailableCommand,
    name="plcredits",
    aliases=["pltranslation"],
    i18n=_,
)
async def pylav_credits(context: PyLavContext) -> None:
    """Shows the credits and translation details for the PyLav cogs and shared code"""
    await context.send(
        embed=await context.pylav.construct_embed(
            messageable=context,
            description=_(
                "PyLav was created by {library_author_name_variable_do_not_translate}.\n\n"
                "PyLav source code can be located in {library_url_variable_do_not_translate}\n"
                "PyLav license can be located in {library_license_variable_do_not_translate}\n"
                "PyLav-Cogs source code can be located in {second_library_url_variable_do_not_translate}\n\n"
                "You can join the PyLav support server via {support_server_url_variable_do_not_translate}\n"
                "\n\n"
                "You can help translate PyLav by contributing to our Crowdin project at:\n"
                "{crowdin_project_url_variable_do_not_translate}\n\n\n"
                "Contributors:\n"
                "- {project_contributors_url_variable_do_not_translate}\n"
                "- {second_project_contributors_url_variable_do_not_translate}\n"
                "If you wish to buy me a coffee for my work, you can do so at:\n"
                "{buymeacoffee_url_variable_do_not_translate} or {github_sponsors_url_variable_do_not_translate}"
            ).format(
                library_author_name_variable_do_not_translate="[Draper#6666](https://github.com/Drapersniper)",
                library_url_variable_do_not_translate="https://github.com/PyLav/PyLav",
                library_license_variable_do_not_translate="https://github.com/PyLav/PyLav/blob/develop/LICENSE",
                second_library_url_variable_do_not_translate="https://github.com/PyLav/Red-Cogs",
                support_server_url_variable_do_not_translate="https://discord.com/invite/vnmcXqtgeY",
                crowdin_project_url_variable_do_not_translate="https://crowdin.com/project/pylav",
                project_contributors_url_variable_do_not_translate="https://github.com/PyLav/PyLav/graphs/contributors",
                second_project_contributors_url_variable_do_not_translate="https://github.com/PyLav/Red-Cogs/graphs/contributors",
                buymeacoffee_url_variable_do_not_translate="https://www.buymeacoffee.com/draper",
                github_sponsors_url_variable_do_not_translate="https://github.com/sponsors/Drapersniper",
            ),
        ),
        ephemeral=True,
    )


@commands.command(
    cls=commands.commands._AlwaysAvailableCommand,
    name="plversion",
    aliases=["pylavversion"],
    i18n=_,
)
async def pylav_version(context: PyLavContext) -> None:
    """Show the version of PyLav library"""
    if isinstance(context, discord.Interaction):
        context = await context.client.get_context(context)
    if context.interaction and not context.interaction.response.is_done():
        await context.defer(ephemeral=True)
    data = [
        (EightBitANSI.paint_white("PyLav"), EightBitANSI.paint_blue(context.pylav.lib_version)),
    ]

    await context.send(
        embed=await context.pylav.construct_embed(
            description=box(
                tabulate(
                    data,
                    headers=(
                        EightBitANSI.paint_yellow(_("Library"), bold=True, underline=True),
                        EightBitANSI.paint_yellow(_("Version"), bold=True, underline=True),
                    ),
                    tablefmt="fancy_grid",
                ),
                lang="ansi",
            ),
            messageable=context,
        ),
        ephemeral=True,
    )


def _done_callback(task: asyncio.Task) -> None:
    with contextlib.suppress(asyncio.CancelledError):
        exc = task.exception()
        if exc is not None:
            LOGGER.error("Error in initialize task", exc_info=exc)


[docs] async def cog_command_error(self: DISCORD_COG_TYPE, context: PyLavContext, error: Exception) -> None: error = getattr(error, "original", error) unhandled = True if isinstance(error, MediaPlayerNotFoundError): unhandled = False await context.send( embed=await self.pylav.construct_embed( messageable=context, description=_("This command requires that I be in a voice channel before it can be executed."), ), ephemeral=True, ) elif isinstance(error, NoNodeAvailableException): unhandled = False await context.send( embed=await self.pylav.construct_embed( messageable=context, description=_( "PyLavPlayer cog is temporarily unavailable due to an outage with the backend services; please try again later." ), footer=_("There are no nodes available currently.") if await self.bot.is_owner(context.author) else None, ), ephemeral=True, ) elif isinstance(error, NoNodeWithRequestFunctionalityAvailableException): unhandled = False await context.send( embed=await self.pylav.construct_embed( messageable=context, description=_( "PyLavPlayer is currently unable to process tracks belonging to {feature_name_variable_do_not_translate}." ).format(feature_name_variable_do_not_translate=error.feature), footer=_( "There is currently no available Lavalink node with the feature {feature_name_variable_do_not_translate}." ).format(feature_name_variable_do_not_translate=error.feature) if await self.bot.is_owner(context.author) else None, ), ephemeral=True, ) elif isinstance(error, UnauthorizedChannelError): unhandled = False await context.send( embed=await self.pylav.construct_embed( messageable=context, description=_( "This command is unavailable in this channel; please use {channel_name_variable_do_not_translate} instead." ).format( channel_name_variable_do_not_translate=channel.mention if (channel := context.guild.get_channel_or_thread(error.channel)) else error.channel ), ), ephemeral=True, delete_after=10, ) elif isinstance(error, NotDJError): unhandled = False await context.send( embed=await self.pylav.construct_embed( messageable=context, description=_("This command requires you to be a disc jockey."), ), ephemeral=True, delete_after=10, ) if unhandled: if (meth := getattr(self, "__pylav_original_cog_command_error", None)) and ( func := self._get_overridden_method(meth) ): return await discord.utils.maybe_coroutine(func, context, error) else: return await self.bot.on_command_error(context, error, unhandled_by_cog=True) # type: ignore
[docs] async def cog_unload(self: DISCORD_COG_TYPE) -> None: if self._init_task is not None: self._init_task.cancel() client = self.pylav await client.unregister(cog=self) if client._shutting_down: self.bot.remove_command(pylav_credits.qualified_name) self.bot.remove_command(pylav_version.qualified_name) if meth := getattr(self, "__pylav_original_cog_unload", None): return await discord.utils.maybe_coroutine(meth)
[docs] async def cog_before_invoke(self: DISCORD_COG_TYPE, context: PyLavContext): try: await self.pylav.wait_until_ready(timeout=30) except TimeoutError as e: LOGGER.debug("Discarded command due to PyLav not being ready within 30 seconds") LOGGER.verbose( "Discarded command due to PyLav not being ready within 30 seconds - Guild: %s - Command: %s", context.guild, context.command.qualified_name, ) raise CheckFailure(_("PyLav is starting up; please try again in a few minutes.")) from e if meth := getattr(self, "__pylav_original_cog_before_invoke", None): return await discord.utils.maybe_coroutine(meth)
[docs] async def initialize(self: DISCORD_COG_TYPE, *args, **kwargs) -> None: if not self.init_called: await self.pylav.register(self) await self.pylav.initialize() self.init_called = True if meth := getattr(self, "__pylav_original_initialize", None): return await discord.utils.maybe_coroutine(meth, *args, **kwargs)
[docs] async def cog_check(self: DISCORD_COG_TYPE, context: PyLavContext) -> bool: # This cog mock discord objects and sends them on the listener # Due to the potential risk for unexpected behaviour - disabled all commands if this cog is loaded. if any(context.bot.get_cog(name) is not None for name in INCOMPATIBLE_COGS): return False if not (getattr(context.bot, "pylav", None)): return False meth = getattr(self, "__pylav_original_cog_check", None) if not context.guild: return await discord.utils.maybe_coroutine(meth, context) if meth else True if getattr(context, "player", None): config = context.player.config else: config = context.bot.pylav.player_config_manager.get_config(context.guild.id) if (channel_id := await config.fetch_text_channel_id()) != 0 and channel_id != context.channel.id: return False return await discord.utils.maybe_coroutine(meth, context) if meth else True
[docs] def class_factory( bot: DISCORD_BOT_TYPE, cls: type[DISCORD_COG_TYPE], cogargs: tuple[object], cogkwargs: dict[str, object], ) -> DISCORD_COG_TYPE: # sourcery no-metrics """ Creates a new class which inherits from the given class and overrides the following methods: - cog_check - cog_unload - cog_before_invoke - initialize - cog_command_error """ if not bot.get_command(pylav_credits.qualified_name): bot.add_command(pylav_credits) if not bot.get_command(pylav_version.qualified_name): bot.add_command(pylav_version) argspec = inspect.getfullargspec(cls.__init__) if ("bot" in argspec.args or "bot" in argspec.kwonlyargs) and bot not in cogargs: cogkwargs["bot"] = bot cog_instance = cls(*cogargs, **cogkwargs) if not hasattr(cog_instance, "__version__"): cog_instance.__version__ = "0.0.0" cog_instance.pylav = Client(bot=bot, cog=cog_instance, config_folder=cog_data_path(raw_name="PyLav")) cog_instance.bot = bot cog_instance.init_called = False cog_instance._init_task = cls.cog_check cog_instance.lavalink = cog_instance.pylav old_cog_on_command_error = cog_instance._get_overridden_method(cog_instance.cog_command_error) old_cog_unload = cog_instance._get_overridden_method(cog_instance.cog_unload) old_cog_before_invoke = cog_instance._get_overridden_method(cog_instance.cog_before_invoke) old_cog_check = cog_instance._get_overridden_method(cog_instance.cog_check) old_cog_initialize = getattr(cog_instance, "initialize", None) if old_cog_on_command_error: cog_instance.__pylav_original_cog_command_error = old_cog_on_command_error if old_cog_unload: cog_instance.__pylav_original_cog_unload = old_cog_unload if old_cog_before_invoke: cog_instance.__pylav_original_cog_before_invoke = old_cog_before_invoke if old_cog_check: cog_instance.__pylav_original_cog_check = old_cog_check if old_cog_initialize: cog_instance.__pylav_original_initialize = old_cog_initialize cog_instance.cog_command_error = MethodType(cog_command_error, cog_instance) cog_instance.cog_unload = MethodType(cog_unload, cog_instance) cog_instance.cog_before_invoke = MethodType(cog_before_invoke, cog_instance) cog_instance.initialize = MethodType(initialize, cog_instance) cog_instance.cog_check = MethodType(cog_check, cog_instance) return cog_instance
[docs] async def pylav_auto_setup( bot: DISCORD_BOT_TYPE, cog_cls: type[DISCORD_COG_TYPE], cogargs: tuple[object, ...] = None, cogkwargs: dict[str, object] = None, initargs: tuple[object, ...] = None, initkwargs: dict[str, object] = None, ) -> DISCORD_COG_TYPE: """Injects all the methods and attributes to respect PyLav Settings and keep the user experience consistent. Adds `.bot` attribute to the cog instance. Adds `.pylav` attribute to the cog instance and starts up PyLav Overwrites cog_unload method to unregister the cog from Lavalink, calling the original cog_unload method once the PyLav unregister code is run. Overwrites cog_before_invoke To force commands to wait for PyLav to be ready Overwrites cog_check method to check if the cog is allowed to run in the current context. If called within a Guild then we check if we can run as per the PyLav Command channel lock, if this check passes then the original cog_check method is called. Overwrites cog_command_error method to handle PyLav errors raised by the cog, if the cog defines their own cog_command_error method, this will still be called after the built-in PyLav error handling if the error raised was unhandled. Overwrites initialize method to handle PyLav startup, calling the original initialize method once the PyLav initialization code is run, if such method exists. code is run. Args: bot (DISCORD_BOT_TYPE): The bot instance to load the cog instance to. cog_cls (type[DISCORD_COG_TYPE]): The cog class load. cogargs (tuple[object]): The arguments to pass to the cog class. cogkwargs (dict[str, object]): The keyword arguments to pass to the cog class. initargs (tuple[object]): The arguments to pass to the initialize method. initkwargs (dict[str, object]): The keyword arguments to pass to the initialize method. Returns: DISCORD_COG_TYPE: The cog instance loaded to the bot. Example: >>> from pylav.extension.red.utils.required_methods import pylav_auto_setup >>> from discord.ext.commands import Cog >>> class MyCogClass(Cog): ... def __init__(self, bot: DISCORD_BOT_TYPE, special_arg: object): ... self.bot = bot ... self.special_arg = special_arg >>> async def setup(bot: DISCORD_BOT_TYPE) -> None: ... await pylav_auto_setup(bot, MyCogClass, cogargs=(), cogkwargs=dict(special_arg=42), initargs=(), initkwargs=dict()) """ if any(bot.get_cog(name) is not None for name in INCOMPATIBLE_COGS if (_name := name)): raise IncompatibleException( f"{_name} is loaded, this cog is incompatible with PyLav - PyLav will not work as long as this cog is loaded" ) if cogargs is None: cogargs = () if cogkwargs is None: cogkwargs = {} if initargs is None: initargs = () if initkwargs is None: initkwargs = {} async with _LOCK: cog_instance = class_factory(bot, cog_cls, cogargs, cogkwargs) await bot.add_cog(cog_instance) cog_instance._init_task = asyncio.create_task(cog_instance.initialize(*initargs, **initkwargs)) cog_instance._init_task.add_done_callback(_done_callback) return cog_instance