Source code for pylav.extension.m3u.parser

from __future__ import annotations

import datetime
import itertools
import re
from collections.abc import Callable
from typing import Any
from urllib.parse import urljoin as _urljoin

import iso8601

from pylav.extension.m3u import protocols
from pylav.type_hints.generics import ANY_GENERIC_TYPE

# coding: utf-8
# Copyright 2014 Globo.com Player authors. All rights reserved.
# Use of this source code is governed by a MIT License
# license that can be found in the LICENSE file.


"""
http://tools.ietf.org/html/draft-pantos-http-live-streaming-08#section-3.2
http://stackoverflow.com/questions/2785755/how-to-split-but-ignore-separators-in-quoted-strings-in-python
"""
# Captures one attribute chunk: a run made of characters that are neither a comma
# nor a quote character, and/or complete double- or single-quoted strings.
# The parsers in this module call ``.split(...)[1::2]`` on it, which keeps only the
# captured chunks, i.e. the attribute list split on commas that sit outside quotes.
ATTRIBUTELISTPATTERN = re.compile(r"""((?:[^,"']|"[^"]*"|'[^']*')+)""")
# Prefixes that ``is_url`` tests a URI against with ``str.startswith``.
URI_PREFIXES = ("https://", "http://", "s3://", "s3a://", "s3n://")


def cast_date_time(value: str) -> datetime.datetime:
    """Convert a date-time string into a ``datetime`` via ``iso8601.parse_date``."""
    parsed = iso8601.parse_date(value)
    return parsed
def format_date_time(value: datetime.datetime) -> str:
    """Return the ``isoformat()`` rendering of *value*."""
    formatted = value.isoformat()
    return formatted
class ParseError(Exception):
    """Raised when a manifest line cannot be parsed.

    Attributes:
        lineno: 1-based number of the offending line.
        line: Text of the offending line.
    """

    def __init__(self, lineno: int, line: str) -> None:
        # Forward the arguments to ``Exception`` so that ``args`` is populated;
        # without this, ``copy``/``pickle`` try to rebuild the exception by
        # calling ``ParseError()`` with no arguments and fail.
        super().__init__(lineno, line)
        self.lineno = lineno
        self.line = line

    def __str__(self) -> str:
        return f"Syntax error in manifest on line {self.lineno:d}: {self.line}"
def parse(
    content: str,
    strict: bool = False,
    custom_tags_parser: Callable[[str, int, dict, dict], Any] | None = None,
) -> dict[str, Any]:
    """
    Given a M3U8 playlist content returns a dictionary with all data found

    Args:
        content: Full text of the playlist.
        strict: Forwarded to the line processor; when true, a line that no
            handler recognises raises ``ParseError``.
        custom_tags_parser: Optional callable invoked as
            ``custom_tags_parser(line, lineno, data, state)`` for every line that
            starts with ``#``. A truthy return value skips the standard parsing
            of that line.

    Returns:
        The ``data`` dictionary filled in while walking the playlist.
    """
    data = {
        "media_sequence": 0,
        "is_variant": False,
        "is_endlist": False,
        "is_i_frames_only": False,
        "is_independent_segments": False,
        "playlist_type": None,
        "playlists": [],
        "segments": [],
        "iframe_playlists": [],
        "media": [],
        "keys": [],
        "rendition_reports": [],
        "skip": {},
        "part_inf": {},
        "session_data": [],
        "session_keys": [],
    }

    state = {
        "expect_segment": False,
        "expect_playlist": False,
        "current_key": None,
        "current_segment_map": None,
    }

    # Line numbers are 1-based; they are reported in ParseError and handed to
    # the custom tag parser.
    for lineno, raw_line in enumerate(string_to_lines(content), start=1):
        line = raw_line.strip()

        # Call custom parser if needed
        if line.startswith("#") and callable(custom_tags_parser):
            go_to_next_line = custom_tags_parser(line, lineno, data, state)

            # Do not try to parse other standard tags on this line
            # if the custom_tags_parser function returns 'True'
            if go_to_next_line:
                continue

        if line.startswith(protocols.EXT_X_BYTE_RANGE):
            _parse_byterange(line, state)
            state["expect_segment"] = True
            continue

        _process_line(content, data, line, lineno, state, strict)

    # there could be remaining partial segments
    if "segment" in state:
        data["segments"].append(state.pop("segment"))

    return data
def _process_line(content, data, line, lineno, state, strict):  # sourcery skip: low-code-quality
    """Dispatch a single manifest line to the handler for its tag.

    Tags are tested with ``str.startswith``, so a tag that is a prefix of another
    (e.g. ``EXT_X_CUE_OUT`` / ``EXT_X_CUE_OUT_CONT``) must be tested after the
    longer one; the order of the branches below is therefore significant.

    Args:
        content: Full playlist text; only used to look up the line preceding a
            cue-out tag.
        data: Result dictionary being filled in.
        line: The line to process.
        lineno: 1-based number of ``line``.
        state: Mutable parser state carried from line to line.
        strict: When true, a line that matches nothing raises ``ParseError``.

    Raises:
        ParseError: In strict mode, for an unrecognised line.
    """
    if line.startswith(protocols.EXT_X_BIT_RATE):
        _parse_bitrate(line, state)

    elif line.startswith(protocols.EXT_X_TARGET_DURATION):
        _parse_simple_parameter(line, data, float)

    elif line.startswith(protocols.EXT_X_MEDIA_SEQUENCE):
        _parse_simple_parameter(line, data, int)

    elif line.startswith(protocols.EXT_X_DISCONTINUITY_SEQUENCE):
        _parse_simple_parameter(line, data, int)

    elif line.startswith(protocols.EXT_X_PROGRAM_DATE_TIME):
        _, program_date_time = _parse_simple_parameter_raw_value(line, cast_date_time)
        # Only the first program date time seen is stored on the playlist itself.
        if not data.get("program_date_time"):
            data["program_date_time"] = program_date_time
        state["current_program_date_time"] = program_date_time
        state["program_date_time"] = program_date_time

    elif line.startswith(protocols.EXT_X_DISCONTINUITY):
        state["discontinuity"] = True

    elif line.startswith(protocols.EXT_X_CUE_OUT_CONT):
        _parse_cueout_cont(line, state)
        state["cue_out"] = True

    elif line.startswith(protocols.EXT_X_CUE_OUT):
        # ``lineno`` is 1-based, so the preceding line sits at index ``lineno - 2``.
        # On the very first line that index would be -1 and silently wrap around
        # to the LAST line of the playlist, so fall back to an empty string.
        previous_line = string_to_lines(content)[lineno - 2] if lineno >= 2 else ""
        _parse_cueout(line, state, previous_line)
        state["cue_out_start"] = True
        state["cue_out"] = True

    elif line.startswith(protocols.EXT_X_CUE_IN):
        state["cue_in"] = True

    elif line.startswith(protocols.EXT_X_CUE_SPAN):
        state["cue_out"] = True

    elif line.startswith(protocols.EXT_X_VERSION):
        _parse_simple_parameter(line, data, int)

    elif line.startswith(protocols.EXT_X_ALLOW_CACHE):
        _parse_simple_parameter(line, data)

    elif line.startswith(protocols.EXT_X_KEY):
        key = _parse_key(line)
        state["current_key"] = key
        if key not in data["keys"]:
            data["keys"].append(key)

    elif line.startswith(protocols.EXT_INF):
        _parse_extinf(line, data, state, lineno, strict)
        state["expect_segment"] = True

    elif line.startswith(protocols.EXT_X_STREAM_INF):
        state["expect_playlist"] = True
        _parse_stream_inf(line, data, state)

    elif line.startswith(protocols.EXT_X_I_FRAME_STREAM_INF):
        _parse_i_frame_stream_inf(line, data)

    elif line.startswith(protocols.EXT_X_MEDIA):
        _parse_media(line, data, state)

    elif line.startswith(protocols.EXT_X_PLAYLIST_TYPE):
        _parse_simple_parameter(line, data)

    elif line.startswith(protocols.EXT_I_FRAMES_ONLY):
        data["is_i_frames_only"] = True

    elif line.startswith(protocols.EXT_IS_INDEPENDENT_SEGMENTS):
        data["is_independent_segments"] = True

    elif line.startswith(protocols.EXT_X_END_LIST):
        data["is_endlist"] = True

    elif line.startswith(protocols.EXT_X_MAP):
        quoted_parser = remove_quotes_parser("uri")
        segment_map_info = _parse_attribute_list(protocols.EXT_X_MAP, line, quoted_parser)
        state["current_segment_map"] = segment_map_info
        # left for backward compatibility
        data["segment_map"] = segment_map_info

    elif line.startswith(protocols.EXT_X_START):
        attribute_parser = {"time_offset": float}
        start_info = _parse_attribute_list(protocols.EXT_X_START, line, attribute_parser)
        data["start"] = start_info

    elif line.startswith(protocols.EXT_X_SERVER_CONTROL):
        _parse_server_control(line, data, state)

    elif line.startswith(protocols.EXT_X_PART_INF):
        _parse_part_inf(line, data, state)

    elif line.startswith(protocols.EXT_X_RENDITION_REPORT):
        _parse_rendition_report(line, data, state)

    elif line.startswith(protocols.EXT_X_PART):
        _parse_part(line, data, state)

    elif line.startswith(protocols.EXT_X_SKIP):
        _parse_skip(line, data, state)

    elif line.startswith(protocols.EXT_X_SESSION_DATA):
        _parse_session_data(line, data, state)

    elif line.startswith(protocols.EXT_X_SESSION_KEY):
        _parse_session_key(line, data, state)

    elif line.startswith(protocols.EXT_X_PRELOAD_HINT):
        _parse_preload_hint(line, data, state)

    elif line.startswith(protocols.EXT_X_DATERANGE):
        _parse_daterange(line, data, state)

    elif line.startswith(protocols.EXT_X_GAP):
        state["gap"] = True

    elif line.startswith(protocols.EXT_X_CONTENT_STEERING):
        _parse_content_steering(line, data, state)

    elif line.startswith(protocols.EXT_M3U):
        # We don't parse #EXTM3U, it just has to be present
        pass

    elif line.strip() == "":
        # blank lines are legal
        pass

    elif state["expect_segment"]:
        _parse_ts_chunk(line, data, state)
        state["expect_segment"] = False

    elif state["expect_playlist"]:
        _parse_variant_playlist(line, data, state)
        state["expect_playlist"] = False

    elif strict:
        raise ParseError(lineno, line)


def _parse_key(line: str) -> dict[str, Any]:
    """Parse the attribute list of a key tag into a dict of unquoted values."""
    params = ATTRIBUTELISTPATTERN.split(line.replace(f"{protocols.EXT_X_KEY}:", ""))[1::2]
    key = {}
    for param in params:
        name, value = param.split("=", 1)
        key[normalize_attribute(name)] = remove_quotes(value)
    return key


def _parse_extinf(line: str, data: dict, state: dict, lineno: int, strict: bool) -> None:  # noqa
    """Store the duration and title of an ``EXT_INF`` line on the pending segment.

    Raises:
        ParseError: In strict mode, when the line has no comma after the duration.
    """
    chunks = line.replace(f"{protocols.EXT_INF}:", "").split(",", 1)
    if len(chunks) == 2:
        duration, title = chunks
    elif len(chunks) == 1:
        if strict:
            raise ParseError(lineno, line)
        duration = chunks[0]
        title = ""
    if "segment" not in state:
        state["segment"] = {}
    state["segment"]["duration"] = float(duration)  # noqa
    state["segment"]["title"] = title  # noqa


def _parse_ts_chunk(line: str, data: dict, state: dict) -> None:
    """Finish the pending segment using ``line`` as its URI and append it to ``data``."""
    segment = state.pop("segment")
    if state.get("program_date_time"):
        segment["program_date_time"] = state.pop("program_date_time")
    if state.get("current_program_date_time"):
        segment["current_program_date_time"] = state["current_program_date_time"]
        # Advance the running clock by this segment's duration for the next one.
        state["current_program_date_time"] += datetime.timedelta(seconds=segment["duration"])
    segment["uri"] = line
    segment["cue_in"] = state.pop("cue_in", False)
    segment["cue_out"] = state.pop("cue_out", False)
    segment["cue_out_start"] = state.pop("cue_out_start", False)
    # A cue-in consumes the stored cue-out details (pop); otherwise they are only
    # read (get) and stay in the state for the following segments.
    scte_op = state.pop if segment["cue_in"] else state.get
    segment["scte35"] = scte_op("current_cue_out_scte35", None)
    segment["scte35_duration"] = scte_op("current_cue_out_duration", None)
    segment["discontinuity"] = state.pop("discontinuity", False)
    if state.get("current_key"):
        segment["key"] = state["current_key"]
    elif None not in data["keys"]:
        # No key in effect: record a single ``None`` entry in the key list.
        data["keys"].append(None)
    if state.get("current_segment_map"):
        segment["init_section"] = state["current_segment_map"]
    segment["dateranges"] = state.pop("dateranges", None)
    segment["gap_tag"] = state.pop("gap", None)
    data["segments"].append(segment)


def _parse_attribute_list(prefix: str, line: str, atribute_parser: dict) -> dict:
    """Parse ``NAME=value`` pairs from ``line`` after removing ``prefix:``.

    Names are normalised with ``normalize_attribute``; a value is converted with
    ``atribute_parser[name]`` when such an entry exists and kept as the raw
    string otherwise.
    """
    params = ATTRIBUTELISTPATTERN.split(line.replace(f"{prefix}:", ""))[1::2]

    attributes = {}
    for param in params:
        name, value = param.split("=", 1)
        name = normalize_attribute(name)

        if name in atribute_parser:
            value = atribute_parser[name](value)

        attributes[name] = value

    return attributes


def _parse_stream_inf(line: str, data: dict, state: dict) -> None:
    """Mark the playlist as a variant and stash the stream info for the next URI line."""
    data["is_variant"] = True
    data["media_sequence"] = None
    attribute_parser = remove_quotes_parser("codecs", "audio", "video", "subtitles", "closed_captions", "pathway_id")
    attribute_parser["program_id"] = int
    # Goes through float first, so a value such as "1280000.0" is accepted.
    attribute_parser["bandwidth"] = lambda x: int(float(x))
    attribute_parser["average_bandwidth"] = int
    attribute_parser["frame_rate"] = float
    attribute_parser["video_range"] = str
    attribute_parser["hdcp_level"] = str
    state["stream_info"] = _parse_attribute_list(protocols.EXT_X_STREAM_INF, line, attribute_parser)


def _parse_i_frame_stream_inf(line: str, data: dict) -> None:
    """Append an I-frame playlist entry (URI plus remaining stream info) to ``data``."""
    attribute_parser = remove_quotes_parser("codecs", "uri", "pathway_id")
    attribute_parser["program_id"] = int
    attribute_parser["bandwidth"] = int
    attribute_parser["average_bandwidth"] = int
    attribute_parser["video_range"] = str
    attribute_parser["hdcp_level"] = str
    iframe_stream_info = _parse_attribute_list(protocols.EXT_X_I_FRAME_STREAM_INF, line, attribute_parser)
    iframe_playlist = {"uri": iframe_stream_info.pop("uri"), "iframe_stream_info": iframe_stream_info}

    data["iframe_playlists"].append(iframe_playlist)


def _parse_media(line: str, data: dict, state: dict) -> None:  # noqa
    """Append the parsed media attributes to ``data["media"]``."""
    quoted = remove_quotes_parser(
        "uri", "group_id", "language", "assoc_language", "name", "instream_id", "characteristics", "channels"
    )
    media = _parse_attribute_list(protocols.EXT_X_MEDIA, line, quoted)
    data["media"].append(media)


def _parse_variant_playlist(line: str, data: dict, state: dict) -> None:
    """Append a variant playlist using ``line`` as URI and the stashed stream info."""
    playlist = {"uri": line, "stream_info": state.pop("stream_info")}

    data["playlists"].append(playlist)


def _parse_bitrate(line: str, state: dict) -> None:
    """Store the raw bitrate string on the pending segment."""
    if "segment" not in state:
        state["segment"] = {}
    state["segment"]["bitrate"] = line.replace(f"{protocols.EXT_X_BIT_RATE}:", "")


def _parse_byterange(line: str, state: dict) -> None:
    """Store the raw byte-range string on the pending segment."""
    if "segment" not in state:
        state["segment"] = {}
    state["segment"]["byterange"] = line.replace(f"{protocols.EXT_X_BYTE_RANGE}:", "")


def _parse_simple_parameter_raw_value(line: str, cast_to: Callable = str, normalize=False):
    """Split ``line`` at the first colon and return ``(name, cast_to(value))``.

    The name has ``#EXT-X-`` removed and is normalised; with ``normalize`` the
    value is stripped and lower-cased before being cast.
    """
    param, value = line.split(":", 1)
    param = normalize_attribute(param.replace("#EXT-X-", ""))
    if normalize:
        value = value.strip().lower()
    return param, cast_to(value)


def _parse_and_set_simple_parameter_raw_value(line: str, data: dict, cast_to: type = str, normalize=False):
    """Parse a simple ``TAG:value`` line, store it in ``data`` and return the value."""
    param, value = _parse_simple_parameter_raw_value(line, cast_to, normalize)
    data[param] = value
    return data[param]


def _parse_simple_parameter(line: str, data: dict, cast_to: type = str):
    """Parse a simple ``TAG:value`` line with value normalisation and store it in ``data``."""
    return _parse_and_set_simple_parameter_raw_value(line, data, cast_to, True)


def _parse_cueout_cont(line: str, state) -> None:
    """Record duration and SCTE35 payload from a cue-out continuation line, if present."""
    elements = line.split(":", 1)
    if len(elements) != 2:
        return
    param, value = elements
    if res := re.match(".*Duration=(.*),SCTE35=(.*)$", value):
        state["current_cue_out_duration"] = res[1]
        state["current_cue_out_scte35"] = res[2]


def _cueout_no_duration(line: str) -> None | tuple[None, None]:
    """Return ``(None, None)`` when ``line`` is the bare cue-out tag, else ``None``."""
    # this needs to be called first since line.split in all other
    # parsers will throw a ValueError if passed just this tag
    if line == protocols.EXT_X_CUE_OUT:
        return None, None


def _cueout_elemental(line: str, state: dict, prevline: str) -> None | tuple[str, str]:  # noqa
    """Return ``(scte35, value)`` when ``prevline`` carries an ``EXT-OATCLS-SCTE35`` payload."""
    param, value = line.split(":", 1)
    if res := re.match(".*EXT-OATCLS-SCTE35:(.*)$", prevline):
        return res[1], value
    else:
        return None


def _cueout_envivio(line: str, state: dict, prevline: str) -> None | tuple[str, str]:  # noqa
    """Return ``(cue, duration)`` for a value of the form ``DURATION=...,...,CUE="..."``."""
    param, value = line.split(":", 1)
    if res := re.match('.*DURATION=(.*),.*,CUE="(.*)"', value):
        return res[2], res[1]
    else:
        return None


def _cueout_duration(line: str) -> None | tuple[None, str]:
    """Return ``(None, duration)`` for a value of the form ``DURATION=...``."""
    # this needs to be called after _cueout_elemental
    # as it would capture those cues incompletely
    # This was added separately rather than modifying "simple"
    param, value = line.split(":", 1)
    if res := re.match(r"DURATION=(.*)", value):
        return None, res[1]


def _cueout_simple(line) -> None | tuple[None, str]:
    """Return ``(None, duration)`` when the value is a bare decimal number."""
    # this needs to be called after _cueout_elemental
    # as it would capture those cues incompletely
    param, value = line.split(":", 1)
    if res := re.match(r"^(\d+(?:\.\d)?\d*)$", value):
        return None, res[1]


def _parse_cueout(line: str, state: dict, prevline) -> None:
    """Try each cue-out format in order and record the first match in ``state``."""
    # ``or`` short-circuits, so the bare-tag check runs before any parser that
    # would fail to split a line without a colon.
    if _cueout_state := (
        _cueout_no_duration(line)
        or _cueout_elemental(line, state, prevline)
        or _cueout_envivio(line, state, prevline)
        or _cueout_duration(line)
        or _cueout_simple(line)
    ):
        state["current_cue_out_scte35"] = _cueout_state[0]
        state["current_cue_out_duration"] = _cueout_state[1]


def _parse_server_control(line: str, data, state) -> None:  # noqa
    """Store the parsed server-control attributes in ``data["server_control"]``."""
    attribute_parser = {
        "can_block_reload": str,
        "hold_back": float,
        "part_hold_back": float,
        "can_skip_until": float,
        "can_skip_dateranges": str,
    }

    data["server_control"] = _parse_attribute_list(protocols.EXT_X_SERVER_CONTROL, line, attribute_parser)


def _parse_part_inf(line: str, data: dict, state: dict) -> None:  # noqa
    """Store the parsed part-inf attributes in ``data["part_inf"]``."""
    attribute_parser = {"part_target": float}

    data["part_inf"] = _parse_attribute_list(protocols.EXT_X_PART_INF, line, attribute_parser)


def _parse_rendition_report(line: str, data: dict, state: dict) -> None:  # noqa
    """Append the parsed rendition report to ``data["rendition_reports"]``."""
    attribute_parser = remove_quotes_parser("uri")
    attribute_parser["last_msn"] = int
    attribute_parser["last_part"] = int

    rendition_report = _parse_attribute_list(protocols.EXT_X_RENDITION_REPORT, line, attribute_parser)

    data["rendition_reports"].append(rendition_report)


def _parse_part(line: str, data: dict, state: dict) -> None:  # noqa
    """Parse a partial segment and append it to the pending segment's ``parts`` list."""
    attribute_parser = remove_quotes_parser("uri")
    attribute_parser["duration"] = float
    attribute_parser["independent"] = str
    attribute_parser["gap"] = str
    attribute_parser["byterange"] = str

    part = _parse_attribute_list(protocols.EXT_X_PART, line, attribute_parser)

    # this should always be true according to spec
    if state.get("current_program_date_time"):
        part["program_date_time"] = state["current_program_date_time"]
        state["current_program_date_time"] += datetime.timedelta(seconds=part["duration"])

    part["dateranges"] = state.pop("dateranges", None)
    part["gap_tag"] = state.pop("gap", None)

    if "segment" not in state:
        state["segment"] = {}
    segment = state["segment"]
    if "parts" not in segment:
        segment["parts"] = []

    segment["parts"].append(part)


def _parse_skip(line: str, data: dict, state: dict) -> None:  # noqa
    """Store the parsed skip attributes in ``data["skip"]``."""
    attribute_parser = remove_quotes_parser("recently_removed_dateranges")
    attribute_parser["skipped_segments"] = int

    data["skip"] = _parse_attribute_list(protocols.EXT_X_SKIP, line, attribute_parser)


def _parse_session_data(line: str, data: dict, state: dict) -> None:  # noqa
    """Append the parsed session data to ``data["session_data"]``."""
    quoted = remove_quotes_parser("data_id", "value", "uri", "language")
    session_data = _parse_attribute_list(protocols.EXT_X_SESSION_DATA, line, quoted)
    data["session_data"].append(session_data)


def _parse_session_key(line: str, data: dict, state: dict) -> None:  # noqa
    """Append the parsed session key (unquoted values) to ``data["session_keys"]``."""
    params = ATTRIBUTELISTPATTERN.split(line.replace(f"{protocols.EXT_X_SESSION_KEY}:", ""))[1::2]
    key = {}
    for param in params:
        name, value = param.split("=", 1)
        key[normalize_attribute(name)] = remove_quotes(value)
    data["session_keys"].append(key)


def _parse_preload_hint(line: str, data: dict, state: dict) -> None:  # noqa
    """Store the parsed preload hint in ``data["preload_hint"]``."""
    attribute_parser = remove_quotes_parser("uri")
    attribute_parser["type"] = str
    attribute_parser["byterange_start"] = int
    attribute_parser["byterange_length"] = int

    data["preload_hint"] = _parse_attribute_list(protocols.EXT_X_PRELOAD_HINT, line, attribute_parser)


def _parse_daterange(line: str, date: dict, state: dict) -> None:  # noqa
    """Queue the parsed date range in ``state["dateranges"]``.

    The second parameter is unused; its name (``date``) is kept as-is so the
    signature does not change.
    """
    attribute_parser = remove_quotes_parser("id", "class", "start_date", "end_date")
    attribute_parser["duration"] = float
    attribute_parser["planned_duration"] = float
    attribute_parser["end_on_next"] = str
    attribute_parser["scte35_cmd"] = str
    attribute_parser["scte35_out"] = str
    attribute_parser["scte35_in"] = str

    parsed = _parse_attribute_list(protocols.EXT_X_DATERANGE, line, attribute_parser)

    if "dateranges" not in state:
        state["dateranges"] = []

    state["dateranges"].append(parsed)


def _parse_content_steering(line: str, data: dict, state: dict) -> None:  # noqa
    """Store the parsed content-steering attributes in ``data["content_steering"]``."""
    attribute_parser = remove_quotes_parser("server_uri", "pathway_id")

    data["content_steering"] = _parse_attribute_list(protocols.EXT_X_CONTENT_STEERING, line, attribute_parser)
def string_to_lines(string: str) -> list[str]:
    """Trim surrounding whitespace from *string* and split it into lines."""
    trimmed = string.strip()
    return trimmed.splitlines()
def remove_quotes_parser(*attrs: str) -> dict[str, Callable]:
    """Build a parser table that maps every name in *attrs* to ``remove_quotes``."""
    return {attr: remove_quotes for attr in attrs}
def remove_quotes(string: str) -> str:
    """
    Remove one pair of matching surrounding quotes from string.

    Only a leading and trailing quote of the same kind are removed, and only
    when the string is at least two characters long, so a lone quote character
    or a mismatched pair is returned untouched.

    Ex.:
      "foo" -> foo
      'foo' -> foo
      'foo  -> 'foo
      "foo' -> "foo'
    """
    quotes = ('"', "'")
    if len(string) >= 2 and string[0] in quotes and string[0] == string[-1]:
        return string[1:-1]
    return string
def normalize_attribute(attribute: str) -> str:
    """Turn an attribute name into a key: hyphens to underscores, lower-cased, trimmed."""
    underscored = attribute.replace("-", "_")
    lowered = underscored.lower()
    return lowered.strip()
def is_url(uri: str) -> bool:
    """Tell whether *uri*, rendered as text, starts with one of ``URI_PREFIXES``."""
    text = f"{uri}"
    return text.startswith(URI_PREFIXES)
def urljoin(base: str, url: str) -> str:
    """Join *base* and *url* with the standard ``urljoin`` while keeping ``//`` runs.

    Each ``://`` is temporarily swapped for a placeholder character, a NUL
    placeholder is inserted between every remaining pair of adjacent slashes,
    the ``://`` is restored, the two parts are joined, and the NUL placeholders
    are removed from the result.
    (Presumably this stops the standard join from discarding empty path segments.)
    """

    def _shield(text: str) -> str:
        # Hide the scheme separator so that only the other "//" runs get padded.
        shielded = text.replace("://", "\1")
        # Loop because one pass over "///" still leaves a "//" behind.
        while "//" in shielded:
            shielded = shielded.replace("//", "/\0/")
        return shielded.replace("\1", "://")

    joined = _urljoin(_shield(base), _shield(url))
    return joined.replace("\0", "")
def get_segment_custom_value(state: dict, key: str, default: ANY_GENERIC_TYPE = None) -> ANY_GENERIC_TYPE | Any:
    """
    Look up a custom value stored on the segment currently held in ``state``.

    Returns ``default`` when there is no pending segment, when the segment has
    no ``custom_parser_values`` mapping, or when ``key`` is missing from it.
    Intended to be used together with custom_tags_parser.
    """
    if "segment" in state and "custom_parser_values" in state["segment"]:
        custom_values = state["segment"]["custom_parser_values"]
        return custom_values.get(key, default)
    return default
def save_segment_custom_value(state: dict, key: str, value: Any) -> None:
    """
    Store a custom value on the segment currently held in ``state``.

    The ``segment`` entry and its ``custom_parser_values`` mapping are created
    on demand. Intended to be used together with custom_tags_parser.
    """
    segment = state.setdefault("segment", {})
    custom_values = segment.setdefault("custom_parser_values", {})
    custom_values[key] = value