class Error(PyLavException):
    """Root exception type for every error raised by this module."""
class RDNSLookupError(Error):
    """Raised when there was a problem with performing reverse dns lookup for ip."""

    # Declare exactly the attributes that ``__init__`` assigns:
    #   ip        -- the address whose reverse lookup failed
    #   error_msg -- the formatted message also passed to the base exception
    # (The previous declaration listed a ``port`` attribute that was never set
    # and omitted ``error_msg``.)
    __slots__ = ("ip", "error_msg")

    def __init__(self, ip: str) -> None:
        """Store the failing address and build the exception message.

        Args:
            ip: The IP address for which the reverse DNS lookup failed.
        """
        self.ip = ip
        self.error_msg = f"There was a problem with performing reverse dns lookup for ip: {ip}"
        super().__init__(self.error_msg)
@CACHE(ttl=3600, prefix="fetch_servers")
async def fetch_servers() -> set[str]:
    """Get the names of all currently available `Radio Browser` servers.

    Queries the ``/json/servers`` endpoint of ``all.api.radio-browser.info`` and
    collects the ``name`` field of every entry in the response.

    Returns:
        set[str]: The server names reported by the endpoint, or an empty set
        when the endpoint could not be resolved or connected to.
    """
    try:
        async with aiohttp.ClientSession(json_serialize=json.dumps) as session:
            async with session.get("http://all.api.radio-browser.info/json/servers") as response:
                data = await response.json(loads=json.loads)
    # aiohttp re-raises resolver/connection failures as ``ClientConnectionError``
    # subclasses, so catching ``socket.gaierror`` alone would let a DNS outage
    # propagate instead of taking this "no servers available" fallback.
    except (socket.gaierror, aiohttp.ClientConnectionError):
        return set()
    else:
        return {server["name"] for server in data}
@CACHE(ttl=300, prefix="pick_base_url")
async def pick_base_url(session: aiohttp.ClientSession) -> URL | None:
    """Pick a base url for the RadioBrowser API.

    Each server name returned by ``fetch_servers`` is probed in turn by
    requesting its ``/json/stats`` endpoint over HTTPS.

    Args:
        session: The client session used to probe the candidate hosts.

    Returns:
        The ``https://<host>`` URL of the first host that answers with HTTP
        200, or ``None`` when no servers are known or none answers
        successfully.
    """
    servers = await fetch_servers()
    if not servers:
        LOGGER.warning("RadioBrowser API seems to be down at the moment, disabling Radio functionality.")
        return None
    for host in servers:
        base_url = f"https://{host}"
        try:
            async with session.get(f"{base_url}/json/stats") as response:
                if response.status == 200:
                    return URL(base_url)
                LOGGER.verbose("Error interacting with %s: %s", host, response.status)
        except Exception as exc:
            # Best-effort probe: a failing host must not abort the search, but
            # record why it failed so the final error below can be diagnosed.
            LOGGER.verbose("Error interacting with %s: %s", host, exc)
    LOGGER.error("All the following hosts for the RadioBrowser API are broken: %s", ", ".join(servers))
    return None