Source code for pylav.players.tracks.encoder
from __future__ import annotations
import asyncio
from collections.abc import AsyncIterator
from typing import TYPE_CHECKING, Any
from pylav.constants.node import TRACK_VERSION
from pylav.logging import getLogger
from pylav.utils.vendor.lavalink_py.datarw import DataWriter
if TYPE_CHECKING:
from pylav.nodes.node import Node
LOGGER = getLogger("PyLav.Track.Decoder")
# noinspection SpellCheckingInspection,PyPep8Naming
[docs]
def encode_track(
title: str,
author: str,
length: int,
identifier: str,
isStream: bool,
uri: str | None,
sourceName: str,
artworkUrl: str | None = None,
isrc: str | None = None,
probe: str | None = None,
version: int = TRACK_VERSION,
**kwargs: Any,
) -> str:
writer = DataWriter()
writer.write_version(version)
writer.write_utf(title)
writer.write_utf(author)
writer.write_long(length)
writer.write_utf(identifier)
writer.write_boolean(isStream)
if version >= 2:
writer.write_nullable_utf(uri)
if version >= 3:
writer.write_nullable_utf(artworkUrl)
writer.write_nullable_utf(isrc)
writer.write_utf(sourceName)
match sourceName:
case "local" | "http" if probe is not None:
writer.write_utf(probe)
case "spotify" | "applemusic" | "deezer" if version == 2:
writer.write_nullable_utf(isrc)
writer.write_nullable_utf(artworkUrl)
case "yandexmusic" if version == 2:
writer.write_nullable_utf(artworkUrl)
writer.write_long(0)
return writer.to_base64()
# noinspection SpellCheckingInspection,PyPep8Naming
[docs]
async def async_encoder(
title: str,
author: str,
length: int,
identifier: str,
is_stream: bool,
uri: str | None,
source: str,
version: int = TRACK_VERSION,
artworkUrl: str | None = None,
isrc: str | None = None,
probe: str | None = None,
**kwargs: Any,
) -> str:
return await asyncio.to_thread(
encode_track,
title=title,
author=author,
length=length,
identifier=identifier,
isStream=is_stream,
uri=uri,
sourceName=source,
artworkUrl=artworkUrl,
isrc=isrc,
probe=probe,
version=version,
**kwargs,
)
[docs]
async def async_re_encoder(track: str, node: Node) -> str:
track_obj = await node.fetch_decodetrack(track, raise_on_failure=True)
track_data = track_obj.info.to_dict()
return await async_encoder(**track_data)
[docs]
async def async_bulk_re_encoder(track: list[str], node: Node) -> AsyncIterator[str]:
track_objs = await node.post_decodetracks(track, raise_on_failure=True)
for track_obj in track_objs:
track_data = track_obj.info.to_dict()
yield await async_encoder(**track_data)