from__future__importannotationsimportasyncioimportosimportaiopathfromcashewsimportCachefrompylav.extension.m3u.http_clientimportDefaultHTTPClient,parsed_urlfrompylav.extension.m3u.modelsimportM3U8frompylav.extension.m3u.parserimportis_url# coding: utf-8# Copyright 2014 Globo.com Player authors. All rights reserved.# Use of this source code is governed by a MIT License# license that can be found in the LICENSE file.CACHE=Cache("M3U3CACHE")CACHE.setup("mem://?check_interval=10",size=1_000_000,enable=True)
[docs]@CACHE(ttl=600,prefix="m3u_loads")asyncdefloads(self,content:str,uri:str=None,custom_tags_parser=None):# noqa""" Given a string with a m3u8 content, returns a M3U8 object. Optionally parses a uri to set a correct base_uri on the M3U8 object. Raises ValueError if invalid content """ifuriisNone:returnawaitasyncio.to_thread(M3U8,content,custom_tags_parser=custom_tags_parser)else:parsed_url(uri)
[docs]@CACHE(ttl=600,prefix="m3u_load")asyncdefload(self,# noqauri:str,timeout:float=None,headers:dict[str,str]=None,custom_tags_parser=None,http_client:DefaultHTTPClient=DefaultHTTPClient(),verify_ssl:bool=True,):""" Retrieves the content from a given URI and returns a M3U8 object. Raises ValueError if invalid content or IOError if request fails. """ifheadersisNone:headers={}ifis_url(uri):content,base_uri=awaithttp_client.download(uri=uri,timeout=timeout,headers=headers,verify_ssl=verify_ssl)returnawaitasyncio.to_thread(M3U8,content,base_uri=base_uri,custom_tags_parser=custom_tags_parser)else:returnawaitload_from_file(uri,custom_tags_parser)
[docs]asyncdefload_from_file(uri:str,custom_tags_parser=None):"""Loads a m3u8 file from the filesystem Parameters: ----------- uri: str uri of the m3u8 file custom_tags_parser: callable custom tags parser function """path=aiopath.AsyncPath(uri)asyncwithpath.open("r",encoding="utf8")asfile:raw_content=(awaitfile.read()).strip()base_uri=os.path.dirname(uri)returnM3U8(raw_content,base_uri=base_uri,custom_tags_parser=custom_tags_parser)