from __future__ import annotations
import datetime
import itertools
import re
from collections.abc import Callable
from typing import Any
from urllib.parse import urljoin as _urljoin
import iso8601
from pylav.extension.m3u import protocols
from pylav.type_hints.generics import ANY_GENERIC_TYPE
# coding: utf-8
# Copyright 2014 Globo.com Player authors. All rights reserved.
# Use of this source code is governed by a MIT License
# license that can be found in the LICENSE file.
"""
http://tools.ietf.org/html/draft-pantos-http-live-streaming-08#section-3.2
http://stackoverflow.com/questions/2785755/how-to-split-but-ignore-separators-in-quoted-strings-in-python
"""
# Matches one attribute-list item: a run of characters that are not commas or
# quotes, or complete double-/single-quoted strings (which may contain commas).
# The parsers in this module call ``.split(...)[1::2]`` on it to pull out the
# individual ``NAME=value`` chunks of a tag.
ATTRIBUTELISTPATTERN = re.compile(r"""((?:[^,"']|"[^"]*"|'[^']*')+)""")
# URI prefixes that ``is_url`` accepts as marking an absolute URL.
URI_PREFIXES = ("https://", "http://", "s3://", "s3a://", "s3n://")
[docs]
def cast_date_time(value: str) -> datetime.datetime:
    """Parse *value* with ``iso8601.parse_date`` and return the result."""
    parsed = iso8601.parse_date(value)
    return parsed
[docs]
class ParseError(Exception):
    """Signal that a manifest line could not be parsed.

    Args:
        lineno: Line number of the offending line, as counted by the caller.
        line: Text of the offending line.
    """

    def __init__(self, lineno: int, line: str) -> None:
        # Forward the arguments to ``Exception`` so that ``args`` is populated;
        # without this the instance cannot be copied or pickled, because
        # reconstruction calls ``ParseError(*args)``.
        super().__init__(lineno, line)
        self.lineno = lineno
        self.line = line

    def __str__(self) -> str:
        """Return the human-readable message including line number and text."""
        return f"Syntax error in manifest on line {self.lineno:d}: {self.line}"
[docs]
def parse(
    content: str,
    strict: bool = False,
    custom_tags_parser: Callable[[str, int, dict, dict], Any] | None = None,
) -> dict[str, Any]:
    """
    Given a M3U8 playlist content returns a dictionary with all data found

    Args:
        content: Full playlist text. It is split into lines with
            ``string_to_lines`` and each line is stripped before processing.
        strict: Forwarded to the line handlers; when true, a line that no
            handler recognises raises ``ParseError``.
        custom_tags_parser: Optional callable invoked as
            ``custom_tags_parser(line, lineno, data, state)`` for every line
            starting with ``#``. A truthy return value skips the standard
            handling of that line.

    Returns:
        The ``data`` dictionary filled in by the line handlers.
    """
    data = {
        "media_sequence": 0,
        "is_variant": False,
        "is_endlist": False,
        "is_i_frames_only": False,
        "is_independent_segments": False,
        "playlist_type": None,
        "playlists": [],
        "segments": [],
        "iframe_playlists": [],
        "media": [],
        "keys": [],
        "rendition_reports": [],
        "skip": {},
        "part_inf": {},
        "session_data": [],
        "session_keys": [],
    }
    # Scratch state shared by the handlers while walking the playlist.
    state = {
        "expect_segment": False,
        "expect_playlist": False,
        "current_key": None,
        "current_segment_map": None,
    }
    # Line numbers are 1-based and relative to the stripped content.
    for lineno, raw_line in enumerate(string_to_lines(content), start=1):
        line = raw_line.strip()
        # Call custom parser if needed
        if line.startswith("#") and callable(custom_tags_parser):
            # Do not try to parse other standard tags on this line if the
            # custom_tags_parser function returns a truthy value
            if custom_tags_parser(line, lineno, data, state):
                continue
        if line.startswith(protocols.EXT_X_BYTE_RANGE):
            _parse_byterange(line, state)
            state["expect_segment"] = True
            continue
        _process_line(content, data, line, lineno, state, strict)
    # there could be remaining partial segments
    if "segment" in state:
        data["segments"].append(state.pop("segment"))
    return data
def _process_line(content, data, line, lineno, state, strict):  # sourcery skip: low-code-quality
    """Dispatch one stripped playlist line to the handler for its tag.

    The order of the ``startswith`` checks is significant: tags whose name is
    a prefix of another tag (e.g. CUE-OUT vs CUE-OUT-CONT, DISCONTINUITY vs
    DISCONTINUITY-SEQUENCE) must be tested after the longer one.

    Args:
        content: Full playlist text; only used to look up the line preceding
            a cue-out tag.
        data: Result dictionary being filled in.
        line: The current line, already stripped.
        lineno: 1-based number of ``line`` within ``string_to_lines(content)``.
        state: Mutable parser state shared between lines.
        strict: When true, an unrecognised line raises ``ParseError``.

    Raises:
        ParseError: In strict mode, when the line matches no tag and neither
            a segment URI nor a playlist URI is expected.
    """
    if line.startswith(protocols.EXT_X_BIT_RATE):
        _parse_bitrate(line, state)
    elif line.startswith(protocols.EXT_X_TARGET_DURATION):
        _parse_simple_parameter(line, data, float)
    elif line.startswith(protocols.EXT_X_MEDIA_SEQUENCE):
        _parse_simple_parameter(line, data, int)
    elif line.startswith(protocols.EXT_X_DISCONTINUITY_SEQUENCE):
        _parse_simple_parameter(line, data, int)
    elif line.startswith(protocols.EXT_X_PROGRAM_DATE_TIME):
        _, program_date_time = _parse_simple_parameter_raw_value(line, cast_date_time)
        # Only the first program date time seen is recorded on the playlist.
        if not data.get("program_date_time"):
            data["program_date_time"] = program_date_time
        state["current_program_date_time"] = program_date_time
        state["program_date_time"] = program_date_time
    elif line.startswith(protocols.EXT_X_DISCONTINUITY):
        state["discontinuity"] = True
    elif line.startswith(protocols.EXT_X_CUE_OUT_CONT):
        _parse_cueout_cont(line, state)
        state["cue_out"] = True
    elif line.startswith(protocols.EXT_X_CUE_OUT):
        # ``lineno`` is 1-based, so the preceding line lives at index
        # ``lineno - 2``. On the very first line that index would be -1 and
        # silently select the LAST line of the playlist, so use an empty
        # previous line instead.
        previous_line = string_to_lines(content)[lineno - 2] if lineno >= 2 else ""
        _parse_cueout(line, state, previous_line)
        state["cue_out_start"] = True
        state["cue_out"] = True
    elif line.startswith(protocols.EXT_X_CUE_IN):
        state["cue_in"] = True
    elif line.startswith(protocols.EXT_X_CUE_SPAN):
        state["cue_out"] = True
    elif line.startswith(protocols.EXT_X_VERSION):
        _parse_simple_parameter(line, data, int)
    elif line.startswith(protocols.EXT_X_ALLOW_CACHE):
        _parse_simple_parameter(line, data)
    elif line.startswith(protocols.EXT_X_KEY):
        key = _parse_key(line)
        state["current_key"] = key
        if key not in data["keys"]:
            data["keys"].append(key)
    elif line.startswith(protocols.EXT_INF):
        _parse_extinf(line, data, state, lineno, strict)
        state["expect_segment"] = True
    elif line.startswith(protocols.EXT_X_STREAM_INF):
        state["expect_playlist"] = True
        _parse_stream_inf(line, data, state)
    elif line.startswith(protocols.EXT_X_I_FRAME_STREAM_INF):
        _parse_i_frame_stream_inf(line, data)
    elif line.startswith(protocols.EXT_X_MEDIA):
        _parse_media(line, data, state)
    elif line.startswith(protocols.EXT_X_PLAYLIST_TYPE):
        _parse_simple_parameter(line, data)
    elif line.startswith(protocols.EXT_I_FRAMES_ONLY):
        data["is_i_frames_only"] = True
    elif line.startswith(protocols.EXT_IS_INDEPENDENT_SEGMENTS):
        data["is_independent_segments"] = True
    elif line.startswith(protocols.EXT_X_END_LIST):
        data["is_endlist"] = True
    elif line.startswith(protocols.EXT_X_MAP):
        quoted_parser = remove_quotes_parser("uri")
        segment_map_info = _parse_attribute_list(protocols.EXT_X_MAP, line, quoted_parser)
        state["current_segment_map"] = segment_map_info
        # left for backward compatibility
        data["segment_map"] = segment_map_info
    elif line.startswith(protocols.EXT_X_START):
        attribute_parser = {"time_offset": float}
        start_info = _parse_attribute_list(protocols.EXT_X_START, line, attribute_parser)
        data["start"] = start_info
    elif line.startswith(protocols.EXT_X_SERVER_CONTROL):
        _parse_server_control(line, data, state)
    elif line.startswith(protocols.EXT_X_PART_INF):
        _parse_part_inf(line, data, state)
    elif line.startswith(protocols.EXT_X_RENDITION_REPORT):
        _parse_rendition_report(line, data, state)
    elif line.startswith(protocols.EXT_X_PART):
        _parse_part(line, data, state)
    elif line.startswith(protocols.EXT_X_SKIP):
        _parse_skip(line, data, state)
    elif line.startswith(protocols.EXT_X_SESSION_DATA):
        _parse_session_data(line, data, state)
    elif line.startswith(protocols.EXT_X_SESSION_KEY):
        _parse_session_key(line, data, state)
    elif line.startswith(protocols.EXT_X_PRELOAD_HINT):
        _parse_preload_hint(line, data, state)
    elif line.startswith(protocols.EXT_X_DATERANGE):
        _parse_daterange(line, data, state)
    elif line.startswith(protocols.EXT_X_GAP):
        state["gap"] = True
    elif line.startswith(protocols.EXT_X_CONTENT_STEERING):
        _parse_content_steering(line, data, state)
    elif line.startswith(protocols.EXT_M3U):
        # We don't parse #EXTM3U; it just has to be present
        pass
    elif not line.strip():
        # blank lines are legal
        pass
    elif state["expect_segment"]:
        # Any other line following a segment tag is taken as the segment URI.
        _parse_ts_chunk(line, data, state)
        state["expect_segment"] = False
    elif state["expect_playlist"]:
        # Any other line following a stream-inf tag is taken as the variant URI.
        _parse_variant_playlist(line, data, state)
        state["expect_playlist"] = False
    elif strict:
        raise ParseError(lineno, line)
def _parse_key(line: str) -> dict[str, Any]:
    """Parse a key tag line into a dict of normalized names to unquoted values."""
    attribute_text = line.replace(f"{protocols.EXT_X_KEY}:", "")
    # Odd indices of the split result are the captured ``NAME=value`` chunks.
    name_value_pairs = (chunk.split("=", 1) for chunk in ATTRIBUTELISTPATTERN.split(attribute_text)[1::2])
    return {normalize_attribute(name): remove_quotes(value) for name, value in name_value_pairs}
def _parse_extinf(line: str, data: dict, state: dict, lineno: int, strict: bool) -> None:  # noqa
    """Record the duration and title of an EXTINF line on the pending segment.

    Args:
        line: The stripped EXTINF line.
        data: Result dictionary (unused here).
        state: Parser state; ``state["segment"]`` is created if missing and
            receives the ``duration`` (float) and ``title`` entries.
        lineno: Line number, used for error reporting.
        strict: When true, a line without the ``,`` separating duration and
            title raises ``ParseError``.

    Raises:
        ParseError: In strict mode, when the title separator is missing.
        ValueError: When the duration is not a valid float.
    """
    # Strip only the leading tag (count=1): the title is free text and must
    # be preserved even if it happens to repeat the tag text.
    chunks = line.replace(f"{protocols.EXT_INF}:", "", 1).split(",", 1)
    if len(chunks) == 2:
        duration, title = chunks
    else:
        # ``split(",", 1)`` yields one or two items, so this is the
        # "duration only" form.
        if strict:
            raise ParseError(lineno, line)
        duration, title = chunks[0], ""
    segment = state.setdefault("segment", {})
    segment["duration"] = float(duration)  # noqa
    segment["title"] = title  # noqa
def _parse_ts_chunk(line: str, data: dict, state: dict) -> None:
    """Finish the pending segment using *line* as its URI and append it to ``data["segments"]``.

    Per-segment flags are popped from *state*; the running program date
    time is advanced by the segment duration.
    """
    segment = state.pop("segment")
    if state.get("program_date_time"):
        segment["program_date_time"] = state.pop("program_date_time")
    running_pdt = state.get("current_program_date_time")
    if running_pdt:
        segment["current_program_date_time"] = running_pdt
        state["current_program_date_time"] = running_pdt + datetime.timedelta(seconds=segment["duration"])
    cue_in = state.pop("cue_in", False)
    segment.update(
        uri=line,
        cue_in=cue_in,
        cue_out=state.pop("cue_out", False),
        cue_out_start=state.pop("cue_out_start", False),
    )
    # A cue-in consumes the stored cue-out details; otherwise they are only read.
    fetch = state.pop if cue_in else state.get
    segment["scte35"] = fetch("current_cue_out_scte35", None)
    segment["scte35_duration"] = fetch("current_cue_out_duration", None)
    segment["discontinuity"] = state.pop("discontinuity", False)
    active_key = state.get("current_key")
    if active_key:
        segment["key"] = active_key
    elif None not in data["keys"]:
        # Record (once) that there are segments without a key.
        data["keys"].append(None)
    init_section = state.get("current_segment_map")
    if init_section:
        segment["init_section"] = init_section
    segment["dateranges"] = state.pop("dateranges", None)
    segment["gap_tag"] = state.pop("gap", None)
    data["segments"].append(segment)
def _parse_attribute_list(prefix: str, line: str, atribute_parser: dict) -> dict:
    """Parse the attribute list that follows ``prefix:`` in *line*.

    Attribute names are normalized with ``normalize_attribute``; a value is
    passed through ``atribute_parser[name]`` when such an entry exists and is
    kept as the raw string otherwise.
    """
    payload = line.replace(f"{prefix}:", "")
    attributes = {}
    # Odd indices of the split result are the captured ``NAME=value`` chunks.
    for chunk in ATTRIBUTELISTPATTERN.split(payload)[1::2]:
        raw_name, raw_value = chunk.split("=", 1)
        key = normalize_attribute(raw_name)
        attributes[key] = atribute_parser[key](raw_value) if key in atribute_parser else raw_value
    return attributes
def _parse_stream_inf(line: str, data: dict, state: dict) -> None:
    """Mark the playlist as a variant and stash the parsed stream info in *state*."""
    data["is_variant"] = True
    data["media_sequence"] = None
    converters = remove_quotes_parser("codecs", "audio", "video", "subtitles", "closed_captions", "pathway_id")
    converters.update(
        {
            "program_id": int,
            # Goes through float first, so values written like "1280000.0" are accepted.
            "bandwidth": lambda x: int(float(x)),
            "average_bandwidth": int,
            "frame_rate": float,
            "video_range": str,
            "hdcp_level": str,
        }
    )
    state["stream_info"] = _parse_attribute_list(protocols.EXT_X_STREAM_INF, line, converters)
def _parse_i_frame_stream_inf(line: str, data: dict) -> None:
    """Parse an I-frame stream-inf line and append it to ``data["iframe_playlists"]``."""
    converters = remove_quotes_parser("codecs", "uri", "pathway_id")
    converters.update(
        {
            "program_id": int,
            "bandwidth": int,
            "average_bandwidth": int,
            "video_range": str,
            "hdcp_level": str,
        }
    )
    stream_info = _parse_attribute_list(protocols.EXT_X_I_FRAME_STREAM_INF, line, converters)
    # The URI is lifted out of the attribute dict into its own entry.
    uri = stream_info.pop("uri")
    data["iframe_playlists"].append({"uri": uri, "iframe_stream_info": stream_info})
def _parse_media(line: str, data: dict, state: dict) -> None:  # noqa
    """Parse a media tag line and append its attributes to ``data["media"]``."""
    unquoted_fields = (
        "uri",
        "group_id",
        "language",
        "assoc_language",
        "name",
        "instream_id",
        "characteristics",
        "channels",
    )
    converters = remove_quotes_parser(*unquoted_fields)
    data["media"].append(_parse_attribute_list(protocols.EXT_X_MEDIA, line, converters))
def _parse_variant_playlist(line: str, data: dict, state: dict) -> None:
    """Pair *line* (the variant URI) with the pending stream info and store it in ``data["playlists"]``."""
    stream_info = state.pop("stream_info")
    data["playlists"].append({"uri": line, "stream_info": stream_info})
def _parse_bitrate(line: str, state: dict) -> None:
    """Store the raw (string) bitrate value of *line* on the pending segment."""
    pending = state.setdefault("segment", {})
    pending["bitrate"] = line.replace(f"{protocols.EXT_X_BIT_RATE}:", "")
def _parse_byterange(line: str, state: dict) -> None:
    """Store the raw (string) byte-range value of *line* on the pending segment."""
    pending = state.setdefault("segment", {})
    pending["byterange"] = line.replace(f"{protocols.EXT_X_BYTE_RANGE}:", "")
def _parse_simple_parameter_raw_value(line: str, cast_to: Callable = str, normalize=False):
    """Split a ``#EXT-X-NAME:value`` line into ``(normalized_name, cast_to(value))``.

    When *normalize* is true the value is stripped and lower-cased before
    being passed to *cast_to*.
    """
    tag, raw_value = line.split(":", 1)
    name = normalize_attribute(tag.replace("#EXT-X-", ""))
    prepared = raw_value.strip().lower() if normalize else raw_value
    return name, cast_to(prepared)
def _parse_and_set_simple_parameter_raw_value(line: str, data: dict, cast_to: type = str, normalize=False):
    """Parse a simple ``NAME:value`` tag, store it in *data* under its name, and return the stored value."""
    name, parsed = _parse_simple_parameter_raw_value(line, cast_to, normalize)
    data[name] = parsed
    return data[name]
def _parse_simple_parameter(line: str, data: dict, cast_to: type = str):
    """Parse a simple tag with value normalization enabled and store it in *data*."""
    return _parse_and_set_simple_parameter_raw_value(line, data, cast_to, normalize=True)
def _parse_cueout_cont(line: str, state) -> None:
    """Extract ``Duration`` and ``SCTE35`` from a cue-out continuation line into *state*.

    Lines without a ``:`` or without both fields leave *state* untouched.
    """
    _tag, separator, value = line.partition(":")
    if not separator:
        return
    match = re.match(".*Duration=(.*),SCTE35=(.*)$", value)
    if match is None:
        return
    state["current_cue_out_duration"] = match.group(1)
    state["current_cue_out_scte35"] = match.group(2)
def _cueout_no_duration(line: str) -> None | tuple[None, None]:
    """Return ``(None, None)`` when *line* is exactly the bare cue-out tag, else ``None``."""
    # this needs to be called first since line.split in all other
    # parsers will throw a ValueError if passed just this tag
    return (None, None) if line == protocols.EXT_X_CUE_OUT else None
def _cueout_elemental(line: str, state: dict, prevline: str) -> None | tuple[str, str]:  # noqa
    """Return ``(scte35, value)`` when *prevline* carries an ``EXT-OATCLS-SCTE35`` payload.

    ``value`` is everything after the first ``:`` of *line*. Returns ``None``
    when *prevline* does not match.
    """
    _tag, value = line.split(":", 1)
    match = re.match(".*EXT-OATCLS-SCTE35:(.*)$", prevline)
    if match is None:
        return None
    return match.group(1), value
def _cueout_envivio(line: str, state: dict, prevline: str) -> None | tuple[str, str]:  # noqa
    """Return ``(cue, duration)`` for a cue-out value of the form ``DURATION=...,...,CUE="..."``.

    Returns ``None`` when the value does not have that form.
    """
    _tag, value = line.split(":", 1)
    match = re.match('.*DURATION=(.*),.*,CUE="(.*)"', value)
    if match is None:
        return None
    return match.group(2), match.group(1)
def _cueout_duration(line: str) -> None | tuple[None, str]:
    """Return ``(None, duration)`` for a cue-out value starting with ``DURATION=``, else ``None``."""
    # this needs to be called after _cueout_elemental
    # as it would capture those cues incompletely
    # This was added separately rather than modifying "simple"
    _tag, value = line.split(":", 1)
    match = re.match(r"DURATION=(.*)", value)
    if match is None:
        return None
    return None, match.group(1)
def _cueout_simple(line) -> None | tuple[None, str]:
    """Return ``(None, duration)`` when the cue-out value is a bare number, else ``None``."""
    # this needs to be called after _cueout_elemental
    # as it would capture those cues incompletely
    _tag, value = line.split(":", 1)
    match = re.match(r"^(\d+(?:\.\d)?\d*)$", value)
    if match is None:
        return None
    return None, match.group(1)
def _parse_cueout(line: str, state: dict, prevline) -> None:
    """Try each cue-out format in turn and store the first result in *state*.

    The order matters: the bare-tag check must run first, and the more
    specific formats must run before the generic duration ones.
    """
    cue = (
        _cueout_no_duration(line)
        or _cueout_elemental(line, state, prevline)
        or _cueout_envivio(line, state, prevline)
        or _cueout_duration(line)
        or _cueout_simple(line)
    )
    if not cue:
        return
    state["current_cue_out_scte35"], state["current_cue_out_duration"] = cue
def _parse_server_control(line: str, data, state) -> None:  # noqa
    """Parse a server-control line into ``data["server_control"]``."""
    converters = dict(
        can_block_reload=str,
        hold_back=float,
        part_hold_back=float,
        can_skip_until=float,
        can_skip_dateranges=str,
    )
    data["server_control"] = _parse_attribute_list(protocols.EXT_X_SERVER_CONTROL, line, converters)
def _parse_part_inf(line: str, data: dict, state: dict) -> None:  # noqa
    """Parse a part-inf line into ``data["part_inf"]`` (``part_target`` as float)."""
    converters = {"part_target": float}
    data["part_inf"] = _parse_attribute_list(protocols.EXT_X_PART_INF, line, converters)
def _parse_rendition_report(line: str, data: dict, state: dict) -> None:  # noqa
    """Parse a rendition-report line and append it to ``data["rendition_reports"]``."""
    converters = remove_quotes_parser("uri")
    converters.update(last_msn=int, last_part=int)
    report = _parse_attribute_list(protocols.EXT_X_RENDITION_REPORT, line, converters)
    data["rendition_reports"].append(report)
def _parse_part(line: str, data: dict, state: dict) -> None:  # noqa
    """Parse a partial-segment line and attach it to the pending segment's ``parts`` list."""
    converters = remove_quotes_parser("uri")
    converters.update(duration=float, independent=str, gap=str, byterange=str)
    part = _parse_attribute_list(protocols.EXT_X_PART, line, converters)
    # this should always be true according to spec
    running_pdt = state.get("current_program_date_time")
    if running_pdt:
        part["program_date_time"] = running_pdt
        # Advance the running clock by this part's duration.
        state["current_program_date_time"] = running_pdt + datetime.timedelta(seconds=part["duration"])
    part["dateranges"] = state.pop("dateranges", None)
    part["gap_tag"] = state.pop("gap", None)
    state.setdefault("segment", {}).setdefault("parts", []).append(part)
def _parse_skip(line: str, data: dict, state: dict) -> None:  # noqa
    """Parse a skip line into ``data["skip"]`` (``skipped_segments`` as int)."""
    converters = remove_quotes_parser("recently_removed_dateranges")
    converters.update(skipped_segments=int)
    data["skip"] = _parse_attribute_list(protocols.EXT_X_SKIP, line, converters)
def _parse_session_data(line: str, data: dict, state: dict) -> None:  # noqa
    """Parse a session-data line and append it to ``data["session_data"]``."""
    converters = remove_quotes_parser("data_id", "value", "uri", "language")
    data["session_data"].append(_parse_attribute_list(protocols.EXT_X_SESSION_DATA, line, converters))
def _parse_session_key(line: str, data: dict, state: dict) -> None:  # noqa
    """Parse a session-key line (all values unquoted) and append it to ``data["session_keys"]``."""
    attribute_text = line.replace(f"{protocols.EXT_X_SESSION_KEY}:", "")
    # Odd indices of the split result are the captured ``NAME=value`` chunks.
    name_value_pairs = (chunk.split("=", 1) for chunk in ATTRIBUTELISTPATTERN.split(attribute_text)[1::2])
    session_key = {normalize_attribute(name): remove_quotes(value) for name, value in name_value_pairs}
    data["session_keys"].append(session_key)
def _parse_preload_hint(line: str, data: dict, state: dict) -> None:  # noqa
    """Parse a preload-hint line into ``data["preload_hint"]``."""
    converters = remove_quotes_parser("uri")
    converters.update(
        {
            "type": str,
            "byterange_start": int,
            "byterange_length": int,
        }
    )
    data["preload_hint"] = _parse_attribute_list(protocols.EXT_X_PRELOAD_HINT, line, converters)
def _parse_daterange(line: str, date: dict, state: dict) -> None:  # noqa
    """Parse a daterange line and queue it in ``state["dateranges"]``.

    The second parameter (spelled ``date``) is not used by this function.
    """
    converters = remove_quotes_parser("id", "class", "start_date", "end_date")
    converters.update(
        {
            "duration": float,
            "planned_duration": float,
            "end_on_next": str,
            "scte35_cmd": str,
            "scte35_out": str,
            "scte35_in": str,
        }
    )
    daterange = _parse_attribute_list(protocols.EXT_X_DATERANGE, line, converters)
    state.setdefault("dateranges", []).append(daterange)
def _parse_content_steering(line: str, data: dict, state: dict) -> None:  # noqa
    """Parse a content-steering line into ``data["content_steering"]``."""
    converters = remove_quotes_parser("server_uri", "pathway_id")
    steering = _parse_attribute_list(protocols.EXT_X_CONTENT_STEERING, line, converters)
    data["content_steering"] = steering
[docs]
def string_to_lines(string: str) -> list[str]:
    """Strip surrounding whitespace from *string* and split it into lines."""
    trimmed = string.strip()
    return trimmed.splitlines()
[docs]
def remove_quotes_parser(*attrs: str) -> dict[str, Callable]:
    """Build a new parser dict mapping every name in *attrs* to ``remove_quotes``."""
    return dict.fromkeys(attrs, remove_quotes)
[docs]
def remove_quotes(string: str) -> str:
    """
    Remove one pair of matching surrounding quotes from string.

    Only a string of at least two characters that starts and ends with the
    same quote character is unquoted; anything else is returned unchanged.
    Ex.:
    "foo" -> foo
    'foo' -> foo
    'foo -> 'foo
    "foo' -> "foo'
    " -> "
    """
    quotes = ('"', "'")
    # The length check stops a lone quote character from counting as both the
    # opening and the closing quote; comparing first and last rejects
    # mismatched pairs.
    if len(string) >= 2 and string[0] in quotes and string[0] == string[-1]:
        return string[1:-1]
    return string
[docs]
def normalize_attribute(attribute: str) -> str:
    """Turn an attribute name into a dict key: dashes to underscores, lower-cased, stripped."""
    underscored = attribute.replace("-", "_")
    lowered = underscored.lower()
    return lowered.strip()
[docs]
def is_url(uri: str) -> bool:
    """Return whether the string form of *uri* starts with one of ``URI_PREFIXES``."""
    text = f"{uri}"
    return text.startswith(URI_PREFIXES)
[docs]
def urljoin(base: str, url: str) -> str:
    """Join *url* onto *base* with ``urllib.parse.urljoin`` while keeping ``//`` runs inside paths.

    The scheme separator ``://`` is temporarily swapped for a placeholder,
    every remaining ``//`` gets a ``\\0`` filler between the slashes, and the
    fillers are removed again from the joined result.
    """

    def _shield(text: str) -> str:
        # Hide the scheme separator so it is not mistaken for an empty segment.
        text = text.replace("://", "\1")
        while "//" in text:
            text = text.replace("//", "/\0/")
        return text.replace("\1", "://")

    shielded_base = _shield(base)
    shielded_url = _shield(url)
    return _urljoin(shielded_base, shielded_url).replace("\0", "")
[docs]
def get_segment_custom_value(state: dict, key: str, default: ANY_GENERIC_TYPE = None) -> ANY_GENERIC_TYPE | Any:
    """
    Look up a custom value stored on the pending segment.

    Returns *default* when there is no pending segment, when it has no
    custom values, or when *key* is not among them.
    Useful together with custom_tags_parser.
    """
    try:
        custom_values = state["segment"]["custom_parser_values"]
    except KeyError:
        return default
    return custom_values.get(key, default)
[docs]
def save_segment_custom_value(state: dict, key: str, value: Any) -> None:
    """
    Store a custom value on the pending segment, creating the segment and
    its ``custom_parser_values`` dict if they do not exist yet.
    Useful together with custom_tags_parser.
    """
    pending = state.setdefault("segment", {})
    custom_values = pending.setdefault("custom_parser_values", {})
    custom_values[key] = value