# Source code for pylav.players.query.local_files

from __future__ import annotations

import os
import pathlib
import sys
import typing
from collections.abc import AsyncIterator

import aiopath  # type: ignore
import asyncstdlib
import discord

# Optional integration: when ``redbot`` is importable, ``_`` is its Translator
# instance for this module; otherwise ``_`` is an identity function so that
# ``_("...")`` call sites keep working without it.
try:
    from redbot.core.i18n import Translator  # type: ignore

    _ = Translator("PyLav", pathlib.Path(__file__))
except ImportError:
    # Keep the name bound so later references to ``Translator`` do not raise NameError.
    Translator = None

    def _(string: str) -> str:
        """Return ``string`` unchanged (fallback used when ``redbot`` is unavailable)."""
        return string


if typing.TYPE_CHECKING:
    from pylav.players.query.obj import Query


# File extensions (lower-case, with the leading dot) grouped by support level.
# The groups are concatenated into ``ALL_EXTENSIONS`` below, which is what the
# folder listing in this module checks ``path.suffix.lower()`` against.
__FULLY_SUPPORTED_MUSIC = (".mp3", ".flac", ".ogg")
__PARTIALLY_SUPPORTED_MUSIC_EXT = (
    ".m3u",
    ".m4a",
    ".aac",
    ".ra",
    ".wav",
    ".opus",
    ".wma",
    ".ts",
    ".au",
    ".m3u8",
    ".pls",
    ".pylav",
    # These do not work
    # NOTE(review): ".mka" is listed here as not working but is enabled in the
    #  video tuple below - confirm which is intended.
    # ".mid",
    # ".mka",
    # ".amr",
    # ".aiff",
    # ".ac3",
    # ".voc",
    # ".dsf",
)
__PARTIALLY_SUPPORTED_VIDEO_EXT = (
    ".mp4",
    ".mov",
    ".flv",
    ".webm",
    ".mkv",
    ".wmv",
    ".3gp",
    ".m4v",
    ".mk3d",  # https://github.com/Devoxin/lavaplayer
    ".mka",  # https://github.com/Devoxin/lavaplayer
    ".mks",  # https://github.com/Devoxin/lavaplayer
    # These do not work
    # ".vob",
    # ".mts",
    # ".avi",
    # ".mpg",
    # ".mpeg",
    # ".swf",
)

__PLUGIN_SUPPORTED_EXT = (
    # https://github.com/esmBot/lava-xm-plugin
    # More formats can be supported, but there are too many to list them all;
    #  they will be added as people show up needing them.
    #  Full list of supported formats: https://github.com/libxmp/libxmp/tree/master/docs/formats
    ".mod",
    ".s3m",
    ".xm",
    ".it",
    ".ahx",
    # -------------------------------
)
# Everything that is not in the "fully supported" group.
__PARTIALLY_SUPPORTED_EXTENSION = (
    __PARTIALLY_SUPPORTED_MUSIC_EXT + __PARTIALLY_SUPPORTED_VIDEO_EXT + __PLUGIN_SUPPORTED_EXT
)

# Public tuple of every extension accepted when listing files in a folder.
ALL_EXTENSIONS = __FULLY_SUPPORTED_MUSIC + __PARTIALLY_SUPPORTED_EXTENSION

# Module-level root folder; ``None`` until ``LocalFile.add_root_folder`` rebinds it.
_ROOT_FOLDER: aiopath.AsyncPath | None = None


class LocalFile:
    """Wrapper around a filesystem path that is expected to live under a shared root folder.

    The root folder is stored on the class (``_ROOT_FOLDER``) and must be set with
    :meth:`add_root_folder` before any instance is created. :meth:`initialize`
    makes the path absolute and checks that it is located under the root folder.
    """

    __slots__ = (
        "_path",
        "_parent",
        "__init",
    )
    # Shared by every instance; rebound by ``add_root_folder``.
    _ROOT_FOLDER: aiopath.AsyncPath | None = _ROOT_FOLDER

    def __init__(self, path: str | pathlib.Path | aiopath.AsyncPath):
        """Wrap ``path``.

        Raises:
            RuntimeError: If no root folder has been configured yet.
        """
        if self._ROOT_FOLDER is None:
            # noinspection SpellCheckingInspection
            raise RuntimeError(
                _("Root folder is not initialized, call `{python_function_variable_do_not_translate}`.").format(
                    python_function_variable_do_not_translate="await Client.update_localtracks_folder(folder: str | pathlib.Path)"
                )
            )
        self._path: aiopath.AsyncPath = aiopath.AsyncPath(path)
        self._parent = self._path.parent
        self.__init = False

    def __str__(self) -> str:
        return str(self._path)

    def __repr__(self) -> str:
        return f"<LocalFile path={self._path}>"

    async def initialize(self) -> None:
        """Make the wrapped path absolute and validate it against the root folder.

        Calling this more than once is a no-op after the first success.

        Raises:
            ValueError: Propagated from ``relative_to`` when the path is not
                located under the root folder.
        """
        if self.__init:
            return
        self._path = await discord.utils.maybe_coroutine(self._path.absolute)
        # The result is discarded on purpose: the call is only used for its ValueError.
        self._path.relative_to(self.root_folder)
        self.__init = True

    @classmethod
    async def add_root_folder(cls, path: str | pathlib.Path | aiopath.AsyncPath, *, create: bool = True) -> None:
        """Set the root folder on both the module and the class.

        Args:
            path: The new root folder.
            create: When true, create the folder (and missing parents) if needed.
        """
        global _ROOT_FOLDER
        _ROOT_FOLDER = cls._ROOT_FOLDER = aiopath.AsyncPath(path)
        if create:
            await _ROOT_FOLDER.mkdir(parents=True, exist_ok=True)

    @property
    def path(self) -> aiopath.AsyncPath:
        """The wrapped path as an ``aiopath.AsyncPath``."""
        return self._path

    @property
    def sync_path(self) -> pathlib.Path:
        """The wrapped path converted to a ``pathlib.Path``."""
        return pathlib.Path(self._path)

    @property
    def root_folder(self) -> aiopath.AsyncPath:
        """The root folder currently stored on the class."""
        return self._ROOT_FOLDER

    @property
    def parent(self) -> aiopath.AsyncPath:
        """Parent of the path given to ``__init__`` (not refreshed by ``initialize``)."""
        return self._parent

    @property
    def name(self) -> str:
        """Folder name for directories, file stem (no extension) otherwise."""
        # Properties cannot await, so this uses a synchronous ``is_dir`` call.
        return typing.cast(str, self._path.name if pathlib.Path(self._path).is_dir() else self._path.stem)

    @property
    def extension(self) -> str:
        """The path's suffix, including the leading dot (empty when there is none)."""
        return typing.cast(str, self._path.suffix)

    @property
    def initialized(self) -> bool:
        """Whether :meth:`initialize` has completed successfully."""
        return self.__init

    @staticmethod
    def _tail(string: str, length: int) -> str:
        """Return at most the last ``length`` characters of ``string``."""
        # ``string[-0:]`` is the whole string, so a non-positive budget needs its own branch.
        return string[-length:] if length > 0 else ""

    async def _to_string_user_no_string(
        self,
        path: aiopath.AsyncPath,
        length: int | None = None,
        add_ellipsis: bool = False,
        with_emoji: bool = False,
        no_extension: bool = False,
        is_album: bool = False,
    ) -> str:
        """Build the display string from the path's own name.

        Used by :meth:`to_string_user` when stripping the root folder left
        nothing to show.

        Args:
            path: Path whose name (or stem) is displayed.
            length: Maximum number of characters to keep, counted from the end;
                ``None`` disables truncation.
            add_ellipsis: When the name had to be truncated, replace its first
                three remaining characters with a horizontal ellipsis.
            with_emoji: Prefix the result with a folder or music-notes emoji.
            no_extension: For files, use the stem instead of the full name.
            is_album: Selects the folder emoji instead of the music-notes emoji.

        Returns:
            The formatted name.
        """
        string = typing.cast(str, path.name if await self.path.is_dir() else path.stem if no_extension else path.name)
        if string.startswith(("/", "\\")):
            string = string[1:]
        if length is not None:
            # Decide whether we overflow *before* truncating; testing afterwards
            # can never be true and would make the ellipsis unreachable.
            truncated = len(string) > length
            string = self._tail(string, length)
            if add_ellipsis and truncated and string:
                string = f"\N{HORIZONTAL ELLIPSIS}{string[3:].strip()}"
        if with_emoji:
            emoji = "\N{FILE FOLDER}" if is_album else "\N{MULTIPLE MUSICAL NOTES}"
            string = f"{emoji}{string}"
        return string

    async def to_string_user(
        self,
        length: int | None = None,
        name_only: bool = False,
        add_ellipsis: bool = False,
        with_emoji: bool = False,
        no_extension: bool = False,
        is_album: bool = False,
    ) -> str:
        """Return a user-facing string for this path.

        Args:
            length: Character budget for the result; ``None`` means unlimited.
                One character is reserved for the emoji when ``with_emoji`` is set.
            name_only: Show only the final path component instead of the path
                with the root folder prefix removed.
            add_ellipsis: Prefix truncated text with a horizontal ellipsis.
            with_emoji: Prefix the result with a folder or music-notes emoji.
            no_extension: Drop the file extension (ignored for directories).
            is_album: Selects the folder emoji instead of the music-notes emoji.

        Returns:
            The formatted string.
        """
        if with_emoji and length is not None:
            length -= 1
        path = typing.cast(aiopath.AsyncPath, await discord.utils.maybe_coroutine(self.path.absolute))
        is_dir = await self.path.is_dir()
        if name_only:
            # ``stem`` already drops the extension; stripping the suffix again
            # would turn "a.mp3.mp3" into "a".
            string = typing.cast(str, path.name if is_dir else path.stem if no_extension else path.name)
        else:
            root = typing.cast(aiopath.AsyncPath, await discord.utils.maybe_coroutine(self.root_folder.absolute))
            # Only a leading root is removed; ``str.replace`` would also delete
            # later occurrences of the root string inside the path.
            string = str(path).removeprefix(str(root))
            if no_extension and not is_dir:
                string = string.removesuffix(path.suffix)
        if not string:
            # Nothing left after removing the root: fall back to the bare name.
            return await self._to_string_user_no_string(path, length, add_ellipsis, with_emoji, no_extension, is_album)
        if length is not None:
            string = await self._to_string_user_no_length(string, add_ellipsis, length, name_only, no_extension)
        if with_emoji:
            emoji = "\N{FILE FOLDER}" if is_album else "\N{MULTIPLE MUSICAL NOTES}"
            string = f"{emoji}{string}"
        return string

    @staticmethod
    async def _to_string_user_no_length(
        string: str,
        add_ellipsis: bool = False,
        length: int | None = None,
        name_only: bool = False,
        no_extension: bool = False,
    ) -> str:
        """Normalise ``string`` as a path and trim it to ``length`` characters.

        Args:
            string: Path-like text to format.
            add_ellipsis: Prefix the kept tail with a horizontal ellipsis when
                trimming happens (the ellipsis is added on top of ``length``).
            length: Maximum number of characters kept from the end; ``None``
                disables trimming.
            name_only: Keep only the last two path components.
            no_extension: If ``string`` resolves to an existing file, use its stem
                for the last component.

        Returns:
            The formatted string without a leading path separator.
        """
        temp_path = aiopath.AsyncPath(string)
        parts = list(temp_path.parts)
        if await temp_path.is_file():
            parts[-1] = temp_path.stem if no_extension else temp_path.name
        string = os.path.join(*(parts[-2:] if name_only else parts))
        if string.startswith(("/", "\\")):
            string = string[1:]
        if length is not None and len(string) > length:
            tail = LocalFile._tail(string, length)
            string = f"\N{HORIZONTAL ELLIPSIS}{tail.strip()}" if add_ellipsis else tail
        return string

    async def files_in_folder(self, show_folders: bool = False) -> AsyncIterator[Query]:
        """Yield queries for the entries directly inside this folder.

        If this path is not a directory, its parent folder is listed instead.

        Args:
            show_folders: Also yield sub-folders located under the root folder.
        """
        parent = self.path if await self.path.is_dir() else self.path.parent
        async for path in self._get_entries_in_folder(parent, show_folders=show_folders):
            yield path

    async def _get_entries_in_folder(
        self, folder: aiopath.AsyncPath, recursive: bool = False, show_folders: bool = False, folder_only: bool = False
    ) -> AsyncIterator[Query]:
        """Yield queries for the entries of ``folder``, ordered by their string path.

        Args:
            folder: Directory to list.
            recursive: Yield each sub-folder and then descend into it.
            show_folders: In non-recursive mode, yield sub-folders that are
                located under the root folder.
            folder_only: Skip files entirely.
        """

        async def _key(path_file: aiopath.AsyncPath) -> str:
            return f"{path_file}"

        # Imported here rather than at module level (module level only imports it
        # under TYPE_CHECKING) - presumably to avoid a circular import.
        from pylav.players.query.obj import Query

        # ``nsmallest`` with n=sys.maxsize returns every entry in ascending key order.
        for path in await asyncstdlib.heapq.nsmallest(asyncstdlib.iter(folder.iterdir()), n=sys.maxsize, key=_key):
            if await path.is_dir():
                if recursive:
                    # NOTE(review): in recursive mode folders are yielded regardless of
                    #  ``show_folders`` and without the root-folder check - confirm intended.
                    yield await Query.from_string(path)
                    async for p in self._get_entries_in_folder(
                        path, recursive=recursive, show_folders=show_folders, folder_only=folder_only
                    ):
                        yield p
                elif show_folders and path.is_relative_to(self._ROOT_FOLDER):
                    yield await Query.from_string(path)
            elif (not folder_only) and await path.is_file():
                if path.suffix.lower() in ALL_EXTENSIONS and path.is_relative_to(self._ROOT_FOLDER):
                    yield await Query.from_string(path)

    async def files_in_tree(self, show_folders: bool = False) -> AsyncIterator[Query]:
        """Yield queries for every entry below this folder, recursively.

        If this path is not a directory, its parent folder is walked instead.

        Args:
            show_folders: Forwarded to the folder listing helper.
        """
        parent = self.path if await self.path.is_dir() else self.path.parent
        async for path in self._get_entries_in_folder(parent, recursive=True, show_folders=show_folders):
            yield path

    async def folders_in_tree(self) -> AsyncIterator[Query]:
        """Yield queries for every sub-folder below this folder, recursively.

        If this path is not a directory, its parent folder is walked instead.
        """
        parent = self.path if await self.path.is_dir() else self.path.parent
        async for path in self._get_entries_in_folder(parent, recursive=True, show_folders=True, folder_only=True):
            yield path