Merge branch 'ytdl-org:master' into master

This commit is contained in:
afterdelight 2022-11-12 04:57:02 +07:00 committed by GitHub
commit 8eb7a188df
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 1026 additions and 103 deletions

View file

@ -11,6 +11,7 @@ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from youtube_dl.compat import ( from youtube_dl.compat import (
compat_casefold,
compat_getenv, compat_getenv,
compat_setenv, compat_setenv,
compat_etree_Element, compat_etree_Element,
@ -118,9 +119,21 @@ class TestCompat(unittest.TestCase):
<smil xmlns="http://www.w3.org/2001/SMIL20/Language"></smil>''' <smil xmlns="http://www.w3.org/2001/SMIL20/Language"></smil>'''
compat_etree_fromstring(xml) compat_etree_fromstring(xml)
def test_struct_unpack(self): def test_compat_struct_unpack(self):
self.assertEqual(compat_struct_unpack('!B', b'\x00'), (0,)) self.assertEqual(compat_struct_unpack('!B', b'\x00'), (0,))
def test_compat_casefold(self):
    """Check the fallback `compat_casefold` against known Unicode case foldings.

    The test returns early when `compat_str` already provides a native
    `casefold()` method, since only the fallback needs checking.
    """
    if hasattr(compat_str, 'casefold'):
        # don't bother to test str.casefold() (again)
        return
    # thanks https://bugs.python.org/file24232/casefolding.patch
    self.assertEqual(compat_casefold('hello'), 'hello')
    self.assertEqual(compat_casefold('hELlo'), 'hello')
    self.assertEqual(compat_casefold('ß'), 'ss')
    # U+FB01 LATIN SMALL LIGATURE FI, written as an escape so that the
    # character cannot be silently dropped by editors or transports
    self.assertEqual(compat_casefold('\ufb01'), 'fi')
    self.assertEqual(compat_casefold('\u03a3'), '\u03c3')
    self.assertEqual(compat_casefold('A\u0345\u03a3'), 'a\u03b9\u03c3')
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

View file

@ -12,7 +12,9 @@ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# Various small unit tests # Various small unit tests
import io import io
import itertools
import json import json
import re
import xml.etree.ElementTree import xml.etree.ElementTree
from youtube_dl.utils import ( from youtube_dl.utils import (
@ -40,11 +42,14 @@ from youtube_dl.utils import (
get_element_by_attribute, get_element_by_attribute,
get_elements_by_class, get_elements_by_class,
get_elements_by_attribute, get_elements_by_attribute,
get_first,
InAdvancePagedList, InAdvancePagedList,
int_or_none, int_or_none,
intlist_to_bytes, intlist_to_bytes,
is_html, is_html,
join_nonempty,
js_to_json, js_to_json,
LazyList,
limit_length, limit_length,
merge_dicts, merge_dicts,
mimetype2ext, mimetype2ext,
@ -79,6 +84,8 @@ from youtube_dl.utils import (
strip_or_none, strip_or_none,
subtitles_filename, subtitles_filename,
timeconvert, timeconvert,
traverse_obj,
try_call,
unescapeHTML, unescapeHTML,
unified_strdate, unified_strdate,
unified_timestamp, unified_timestamp,
@ -92,6 +99,7 @@ from youtube_dl.utils import (
urlencode_postdata, urlencode_postdata,
urshift, urshift,
update_url_query, update_url_query,
variadic,
version_tuple, version_tuple,
xpath_with_ns, xpath_with_ns,
xpath_element, xpath_element,
@ -112,12 +120,18 @@ from youtube_dl.compat import (
compat_getenv, compat_getenv,
compat_os_name, compat_os_name,
compat_setenv, compat_setenv,
compat_str,
compat_urlparse, compat_urlparse,
compat_parse_qs, compat_parse_qs,
) )
class TestUtil(unittest.TestCase): class TestUtil(unittest.TestCase):
# yt-dlp shim
def assertCountEqual(self, expected, got, msg='count should be the same'):
    """Assert that `expected` and `got` yield the same number of items.

    Only the number of items is compared, not the items themselves.
    Both arguments may be any iterable; each is consumed once.
    """
    expected_count = len(tuple(expected))
    got_count = len(tuple(got))
    return self.assertEqual(expected_count, got_count, msg=msg)
def test_timeconvert(self): def test_timeconvert(self):
self.assertTrue(timeconvert('') is None) self.assertTrue(timeconvert('') is None)
self.assertTrue(timeconvert('bougrg') is None) self.assertTrue(timeconvert('bougrg') is None)
@ -1478,6 +1492,315 @@ Line 1
self.assertEqual(clean_podcast_url('https://www.podtrac.com/pts/redirect.mp3/chtbl.com/track/5899E/traffic.megaphone.fm/HSW7835899191.mp3'), 'https://traffic.megaphone.fm/HSW7835899191.mp3') self.assertEqual(clean_podcast_url('https://www.podtrac.com/pts/redirect.mp3/chtbl.com/track/5899E/traffic.megaphone.fm/HSW7835899191.mp3'), 'https://traffic.megaphone.fm/HSW7835899191.mp3')
self.assertEqual(clean_podcast_url('https://play.podtrac.com/npr-344098539/edge1.pod.npr.org/anon.npr-podcasts/podcast/npr/waitwait/2020/10/20201003_waitwait_wwdtmpodcast201003-015621a5-f035-4eca-a9a1-7c118d90bc3c.mp3'), 'https://edge1.pod.npr.org/anon.npr-podcasts/podcast/npr/waitwait/2020/10/20201003_waitwait_wwdtmpodcast201003-015621a5-f035-4eca-a9a1-7c118d90bc3c.mp3') self.assertEqual(clean_podcast_url('https://play.podtrac.com/npr-344098539/edge1.pod.npr.org/anon.npr-podcasts/podcast/npr/waitwait/2020/10/20201003_waitwait_wwdtmpodcast201003-015621a5-f035-4eca-a9a1-7c118d90bc3c.mp3'), 'https://edge1.pod.npr.org/anon.npr-podcasts/podcast/npr/waitwait/2020/10/20201003_waitwait_wwdtmpodcast201003-015621a5-f035-4eca-a9a1-7c118d90bc3c.mp3')
def test_LazyList(self):
    """Check that `LazyList` gives the same results as a plain list.

    Covers iteration, `exhaust()`, indexing, slicing, truthiness, `len()`,
    `repr()`, string conversion and reversal.
    """
    items = list(range(10))

    self.assertEqual(list(LazyList(items)), items)
    self.assertEqual(LazyList(items).exhaust(), items)

    # an index followed by slices; each must give the same result as on the list
    keys = (
        5,
        slice(5, None),
        slice(None, 5),
        slice(None, None, 2),
        slice(1, None, 2),
        slice(5, None, -1),
        slice(6, 2, -2),
        slice(None, None, -1),
    )
    for key in keys:
        self.assertEqual(LazyList(items)[key], items[key])

    self.assertTrue(LazyList(items))
    self.assertFalse(LazyList(range(0)))
    self.assertEqual(len(LazyList(items)), len(items))
    self.assertEqual(repr(LazyList(items)), repr(items))
    self.assertEqual(compat_str(LazyList(items)), compat_str(items))

    backwards = items[::-1]
    self.assertEqual(list(LazyList(items, reverse=True)), backwards)
    self.assertEqual(list(reversed(LazyList(items))[::-1]), items)
    self.assertEqual(list(reversed(LazyList(items))[1:3:7]), backwards[1:3:7])
def test_LazyList_laziness(self):
    """Check the contents of `LazyList._cache` after individual lookups."""

    def check(lazy, index, expected_value, expected_cache):
        # look up one item, then compare the cache with the expected contents
        self.assertEqual(lazy[index], expected_value)
        self.assertEqual(lazy._cache, list(expected_cache))

    forward = LazyList(range(10))
    check(forward, 0, 0, range(1))
    check(forward, 5, 5, range(6))
    check(forward, -3, 7, range(10))

    backward = LazyList(range(10), reverse=True)
    check(backward, -1, 0, range(1))
    check(backward, 3, 6, range(10))

    # an endless source
    endless = LazyList(itertools.count())
    check(endless, 10, 10, range(11))
    endless = reversed(endless)
    check(endless, -15, 14, range(15))
def test_try_call(self):
    """Check `try_call` with non-callables, `expected_type`, args and kwargs."""

    def total(*x, **kwargs):
        # sum of all positional and keyword argument values
        return sum(x) + sum(kwargs.values())

    self.assertEqual(try_call(None), None,
                     msg='not a fn should give None')
    self.assertEqual(try_call(lambda: 1), 1,
                     msg='int fn with no expected_type should give int')
    self.assertEqual(try_call(lambda: 1, expected_type=int), 1,
                     msg='int fn with expected_type int should give int')
    self.assertEqual(try_call(lambda: 1, expected_type=dict), None,
                     msg='int fn with wrong expected_type should give None')
    self.assertEqual(try_call(total, args=(0, 1, 0, ), expected_type=int), 1,
                     msg='fn should accept arglist')
    self.assertEqual(try_call(total, kwargs={'a': 0, 'b': 1, 'c': 0}, expected_type=int), 1,
                     msg='fn should accept kwargs')
    # same call as the `expected_type=dict` check above; the message
    # previously claimed that no expected_type was given
    self.assertEqual(try_call(lambda: 1, expected_type=dict), None,
                     msg='int fn with expected_type dict should give None')
    self.assertEqual(try_call(lambda x: {}, total, args=(42, ), expected_type=int), 42,
                     msg='expect first int result with expected_type int')
def test_variadic(self):
    """Check what `variadic` returns for `None`, a str, and a str with `allowed_types=dict`."""
    wrapped_none = variadic(None)
    self.assertEqual(wrapped_none, (None, ))

    wrapped_str = variadic('spam')
    self.assertEqual(wrapped_str, ('spam', ))

    # with `allowed_types=dict` the str is expected back unchanged
    unchanged = variadic('spam', allowed_types=dict)
    self.assertEqual(unchanged, 'spam')
def test_traverse_obj(self):
    """Exercise `traverse_obj` over its supported path forms and options.

    Sections: base functionality, `...` (Ellipsis), function keys,
    alternative paths, branch and path nesting, dict keys, `default`,
    `expected_type`, `get_all`, `casesense`, `_traverse_string`,
    `_is_user_input`, and `re.Match` objects as input.
    """
    _TEST_DATA = {
        100: 100,
        1.2: 1.2,
        'str': 'str',
        'None': None,
        '...': Ellipsis,
        'urls': [
            {'index': 0, 'url': 'https://www.example.com/0'},
            {'index': 1, 'url': 'https://www.example.com/1'},
        ],
        'data': (
            {'index': 2},
            {'index': 3},
        ),
        'dict': {},
    }

    # Test base functionality
    self.assertEqual(traverse_obj(_TEST_DATA, ('str',)), 'str',
                     msg='allow tuple path')
    self.assertEqual(traverse_obj(_TEST_DATA, ['str']), 'str',
                     msg='allow list path')
    self.assertEqual(traverse_obj(_TEST_DATA, (value for value in ("str",))), 'str',
                     msg='allow iterable path')
    self.assertEqual(traverse_obj(_TEST_DATA, 'str'), 'str',
                     msg='single items should be treated as a path')
    self.assertEqual(traverse_obj(_TEST_DATA, None), _TEST_DATA)
    self.assertEqual(traverse_obj(_TEST_DATA, 100), 100)
    self.assertEqual(traverse_obj(_TEST_DATA, 1.2), 1.2)

    # Test Ellipsis behavior
    self.assertCountEqual(traverse_obj(_TEST_DATA, Ellipsis),
                          (item for item in _TEST_DATA.values() if item is not None),
                          msg='`...` should give all values except `None`')
    self.assertCountEqual(traverse_obj(_TEST_DATA, ('urls', 0, Ellipsis)), _TEST_DATA['urls'][0].values(),
                          msg='`...` selection for dicts should select all values')
    self.assertEqual(traverse_obj(_TEST_DATA, (Ellipsis, Ellipsis, 'url')),
                     ['https://www.example.com/0', 'https://www.example.com/1'],
                     msg='nested `...` queries should work')
    self.assertCountEqual(traverse_obj(_TEST_DATA, (Ellipsis, Ellipsis, 'index')), range(4),
                          msg='`...` query result should be flattened')

    # Test function as key
    self.assertEqual(traverse_obj(_TEST_DATA, lambda x, y: x == 'urls' and isinstance(y, list)),
                     [_TEST_DATA['urls']],
                     msg='function as query key should perform a filter based on (key, value)')
    self.assertCountEqual(traverse_obj(_TEST_DATA, lambda _, x: isinstance(x[0], compat_str)), {'str'},
                          msg='exceptions in the query function should be caught')

    # Test alternative paths
    self.assertEqual(traverse_obj(_TEST_DATA, 'fail', 'str'), 'str',
                     msg='multiple `paths` should be treated as alternative paths')
    self.assertEqual(traverse_obj(_TEST_DATA, 'str', 100), 'str',
                     msg='alternatives should exit early')
    self.assertEqual(traverse_obj(_TEST_DATA, 'fail', 'fail'), None,
                     msg='alternatives should return `default` if exhausted')
    self.assertEqual(traverse_obj(_TEST_DATA, (Ellipsis, 'fail'), 100), 100,
                     msg='alternatives should track their own branching return')
    self.assertEqual(traverse_obj(_TEST_DATA, ('dict', Ellipsis), ('data', Ellipsis)), list(_TEST_DATA['data']),
                     msg='alternatives on empty objects should search further')

    # Test branch and path nesting
    self.assertEqual(traverse_obj(_TEST_DATA, ('urls', (3, 0), 'url')), ['https://www.example.com/0'],
                     msg='tuple as key should be treated as branches')
    self.assertEqual(traverse_obj(_TEST_DATA, ('urls', [3, 0], 'url')), ['https://www.example.com/0'],
                     msg='list as key should be treated as branches')
    self.assertEqual(traverse_obj(_TEST_DATA, ('urls', ((1, 'fail'), (0, 'url')))), ['https://www.example.com/0'],
                     msg='double nesting in path should be treated as paths')
    self.assertEqual(traverse_obj(['0', [1, 2]], [(0, 1), 0]), [1],
                     msg='do not fail early on branching')
    self.assertCountEqual(traverse_obj(_TEST_DATA, ('urls', ((1, ('fail', 'url')), (0, 'url')))),
                          ['https://www.example.com/0', 'https://www.example.com/1'],
                          msg='triple nesting in path should be treated as branches')
    self.assertEqual(traverse_obj(_TEST_DATA, ('urls', ('fail', (Ellipsis, 'url')))),
                     ['https://www.example.com/0', 'https://www.example.com/1'],
                     msg='ellipsis as branch path start gets flattened')

    # Test dictionary as key
    self.assertEqual(traverse_obj(_TEST_DATA, {0: 100, 1: 1.2}), {0: 100, 1: 1.2},
                     msg='dict key should result in a dict with the same keys')
    self.assertEqual(traverse_obj(_TEST_DATA, {0: ('urls', 0, 'url')}),
                     {0: 'https://www.example.com/0'},
                     msg='dict key should allow paths')
    self.assertEqual(traverse_obj(_TEST_DATA, {0: ('urls', (3, 0), 'url')}),
                     {0: ['https://www.example.com/0']},
                     msg='tuple in dict path should be treated as branches')
    self.assertEqual(traverse_obj(_TEST_DATA, {0: ('urls', ((1, 'fail'), (0, 'url')))}),
                     {0: ['https://www.example.com/0']},
                     msg='double nesting in dict path should be treated as paths')
    self.assertEqual(traverse_obj(_TEST_DATA, {0: ('urls', ((1, ('fail', 'url')), (0, 'url')))}),
                     {0: ['https://www.example.com/1', 'https://www.example.com/0']},
                     msg='triple nesting in dict path should be treated as branches')
    self.assertEqual(traverse_obj(_TEST_DATA, {0: 'fail'}), {},
                     msg='remove `None` values when dict key')
    self.assertEqual(traverse_obj(_TEST_DATA, {0: 'fail'}, default=Ellipsis), {0: Ellipsis},
                     msg='do not remove `None` values if `default`')
    self.assertEqual(traverse_obj(_TEST_DATA, {0: 'dict'}), {0: {}},
                     msg='do not remove empty values when dict key')
    self.assertEqual(traverse_obj(_TEST_DATA, {0: 'dict'}, default=Ellipsis), {0: {}},
                     msg='do not remove empty values when dict key and a default')
    self.assertEqual(traverse_obj(_TEST_DATA, {0: ('dict', Ellipsis)}), {0: []},
                     msg='if branch in dict key not successful, return `[]`')

    # Testing default parameter behavior
    _DEFAULT_DATA = {'None': None, 'int': 0, 'list': []}
    self.assertEqual(traverse_obj(_DEFAULT_DATA, 'fail'), None,
                     msg='default value should be `None`')
    self.assertEqual(traverse_obj(_DEFAULT_DATA, 'fail', 'fail', default=Ellipsis), Ellipsis,
                     msg='chained fails should result in default')
    self.assertEqual(traverse_obj(_DEFAULT_DATA, 'None', 'int'), 0,
                     msg='should not short circuit on `None`')
    self.assertEqual(traverse_obj(_DEFAULT_DATA, 'fail', default=1), 1,
                     msg='invalid dict key should result in `default`')
    self.assertEqual(traverse_obj(_DEFAULT_DATA, 'None', default=1), 1,
                     msg='`None` is a deliberate sentinel and should become `default`')
    self.assertEqual(traverse_obj(_DEFAULT_DATA, ('list', 10)), None,
                     msg='`IndexError` should result in `default`')
    self.assertEqual(traverse_obj(_DEFAULT_DATA, (Ellipsis, 'fail'), default=1), 1,
                     msg='if branched but not successful return `default` if defined, not `[]`')
    self.assertEqual(traverse_obj(_DEFAULT_DATA, (Ellipsis, 'fail'), default=None), None,
                     msg='if branched but not successful return `default` even if `default` is `None`')
    self.assertEqual(traverse_obj(_DEFAULT_DATA, (Ellipsis, 'fail')), [],
                     msg='if branched but not successful return `[]`, not `default`')
    self.assertEqual(traverse_obj(_DEFAULT_DATA, ('list', Ellipsis)), [],
                     msg='if branched but object is empty return `[]`, not `default`')

    # Testing expected_type behavior
    _EXPECTED_TYPE_DATA = {'str': 'str', 'int': 0}
    self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, 'str', expected_type=compat_str), 'str',
                     msg='accept matching `expected_type` type')
    self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, 'str', expected_type=int), None,
                     msg='reject non matching `expected_type` type')
    self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, 'int', expected_type=lambda x: compat_str(x)), '0',
                     msg='transform type using type function')
    self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, 'str',
                                  expected_type=lambda _: 1 / 0), None,
                     msg='wrap expected_type function in try_call')
    self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, Ellipsis, expected_type=compat_str), ['str'],
                     msg='eliminate items that expected_type fails on')

    # Test get_all behavior
    _GET_ALL_DATA = {'key': [0, 1, 2]}
    self.assertEqual(traverse_obj(_GET_ALL_DATA, ('key', Ellipsis), get_all=False), 0,
                     msg='if not `get_all`, return only first matching value')
    self.assertEqual(traverse_obj(_GET_ALL_DATA, Ellipsis, get_all=False), [0, 1, 2],
                     msg='do not overflatten if not `get_all`')

    # Test casesense behavior
    _CASESENSE_DATA = {
        'KeY': 'value0',
        0: {
            'KeY': 'value1',
            0: {'KeY': 'value2'},
        },
        # FULLWIDTH LATIN CAPITAL LETTER K
        '\uff2bey': 'value3',
    }
    self.assertEqual(traverse_obj(_CASESENSE_DATA, 'key'), None,
                     msg='dict keys should be case sensitive unless `casesense`')
    self.assertEqual(traverse_obj(_CASESENSE_DATA, 'keY',
                                  casesense=False), 'value0',
                     msg='allow non matching key case if `casesense`')
    self.assertEqual(traverse_obj(_CASESENSE_DATA, '\uff4bey',  # FULLWIDTH LATIN SMALL LETTER K
                                  casesense=False), 'value3',
                     msg='allow non matching Unicode key case if `casesense`')
    self.assertEqual(traverse_obj(_CASESENSE_DATA, (0, ('keY',)),
                                  casesense=False), ['value1'],
                     msg='allow non matching key case in branch if `casesense`')
    self.assertEqual(traverse_obj(_CASESENSE_DATA, (0, ((0, 'keY'),)),
                                  casesense=False), ['value2'],
                     msg='allow non matching key case in branch path if `casesense`')

    # Test traverse_string behavior
    _TRAVERSE_STRING_DATA = {'str': 'str', 1.2: 1.2}
    self.assertEqual(traverse_obj(_TRAVERSE_STRING_DATA, ('str', 0)), None,
                     msg='do not traverse into string if not `traverse_string`')
    self.assertEqual(traverse_obj(_TRAVERSE_STRING_DATA, ('str', 0),
                                  _traverse_string=True), 's',
                     msg='traverse into string if `traverse_string`')
    self.assertEqual(traverse_obj(_TRAVERSE_STRING_DATA, (1.2, 1),
                                  _traverse_string=True), '.',
                     msg='traverse into converted data if `traverse_string`')
    self.assertEqual(traverse_obj(_TRAVERSE_STRING_DATA, ('str', Ellipsis),
                                  _traverse_string=True), list('str'),
                     msg='`...` branching into string should result in list')
    self.assertEqual(traverse_obj(_TRAVERSE_STRING_DATA, ('str', (0, 2)),
                                  _traverse_string=True), ['s', 'r'],
                     msg='branching into string should result in list')
    self.assertEqual(traverse_obj(_TRAVERSE_STRING_DATA, ('str', lambda _, x: x),
                                  _traverse_string=True), list('str'),
                     msg='function branching into string should result in list')

    # Test is_user_input behavior
    _IS_USER_INPUT_DATA = {'range8': list(range(8))}
    self.assertEqual(traverse_obj(_IS_USER_INPUT_DATA, ('range8', '3'),
                                  _is_user_input=True), 3,
                     msg='allow for string indexing if `is_user_input`')
    self.assertCountEqual(traverse_obj(_IS_USER_INPUT_DATA, ('range8', '3:'),
                                       _is_user_input=True), tuple(range(8))[3:],
                          msg='allow for string slice if `is_user_input`')
    self.assertCountEqual(traverse_obj(_IS_USER_INPUT_DATA, ('range8', ':4:2'),
                                       _is_user_input=True), tuple(range(8))[:4:2],
                          msg='allow step in string slice if `is_user_input`')
    self.assertCountEqual(traverse_obj(_IS_USER_INPUT_DATA, ('range8', ':'),
                                       _is_user_input=True), range(8),
                          msg='`:` should be treated as `...` if `is_user_input`')
    with self.assertRaises(TypeError, msg='too many params should result in error'):
        traverse_obj(_IS_USER_INPUT_DATA, ('range8', ':::'), _is_user_input=True)

    # Test re.Match as input obj
    mobj = re.match(r'^0(12)(?P<group>3)(4)?$', '0123')
    self.assertEqual(traverse_obj(mobj, Ellipsis), [x for x in mobj.groups() if x is not None],
                     msg='`...` on a `re.Match` should give its `groups()`')
    self.assertEqual(traverse_obj(mobj, lambda k, _: k in (0, 2)), ['0123', '3'],
                     msg='function on a `re.Match` should give groupno, value starting at 0')
    self.assertEqual(traverse_obj(mobj, 'group'), '3',
                     msg='str key on a `re.Match` should give group with that name')
    self.assertEqual(traverse_obj(mobj, 2), '3',
                     msg='int key on a `re.Match` should give group with that number')
    self.assertEqual(traverse_obj(mobj, 'gRoUp', casesense=False), '3',
                     msg='str key on a `re.Match` should respect casesense')
    self.assertEqual(traverse_obj(mobj, 'fail'), None,
                     msg='failing str key on a `re.Match` should return `default`')
    self.assertEqual(traverse_obj(mobj, 'gRoUpS', casesense=False), None,
                     msg='failing str key on a `re.Match` should return `default`')
    self.assertEqual(traverse_obj(mobj, 8), None,
                     msg='failing int key on a `re.Match` should return `default`')
def test_get_first(self):
    """Check that `get_first` skips a `None` entry and returns the next value."""
    candidates = [{'a': None}, {'a': 'spam'}]
    found = get_first(candidates, 'a')
    self.assertEqual(found, 'spam')
def test_join_nonempty(self):
    """Check `join_nonempty` with plain values and with a `from_dict` lookup."""
    plain = join_nonempty('a', 'b')
    self.assertEqual(plain, 'a-b')

    # with `from_dict`, the expected result is built from the dict's values
    # for the given keys, with the `[]` and `None` values left out
    lookup = {'a': 'c', 'c': [], 'b': 'd', 'd': None}
    looked_up = join_nonempty('a', 'b', 'c', 'd', from_dict=lookup)
    self.assertEqual(looked_up, 'c-d')
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

View file

@ -1,8 +1,20 @@
# coding: utf-8 # coding: utf-8
from __future__ import unicode_literals from __future__ import unicode_literals
from .compat import compat_str from .compat import (
compat_str,
compat_chr,
)
# Below is included the text of icu/CaseFolding.txt retrieved from
# https://github.com/unicode-org/icu/blob/main/icu4c/source/data/unidata/CaseFolding.txt
# In case newly foldable Unicode characters are defined, paste the new version
# of the text inside the ''' marks.
# The text is expected to have only blank lines and lines with 1st character #,
# all ignored, and fold definitions like this:
# `from_hex_code; space_separated_to_hex_code_list; comment`
_map_str = '''
# CaseFolding-15.0.0.txt # CaseFolding-15.0.0.txt
# Date: 2022-02-02, 23:35:35 GMT # Date: 2022-02-02, 23:35:35 GMT
# © 2022 Unicode®, Inc. # © 2022 Unicode®, Inc.
@ -65,7 +77,6 @@ from .compat import compat_str
# have the value C for the status field, and the code point itself for the mapping field. # have the value C for the status field, and the code point itself for the mapping field.
# ================================================================= # =================================================================
_map_str = '''
0041; C; 0061; # LATIN CAPITAL LETTER A 0041; C; 0061; # LATIN CAPITAL LETTER A
0042; C; 0062; # LATIN CAPITAL LETTER B 0042; C; 0062; # LATIN CAPITAL LETTER B
0043; C; 0063; # LATIN CAPITAL LETTER C 0043; C; 0063; # LATIN CAPITAL LETTER C
@ -1627,17 +1638,30 @@ FF3A; C; FF5A; # FULLWIDTH LATIN CAPITAL LETTER Z
1E920; C; 1E942; # ADLAM CAPITAL LETTER KPO 1E920; C; 1E942; # ADLAM CAPITAL LETTER KPO
1E921; C; 1E943; # ADLAM CAPITAL LETTER SHA 1E921; C; 1E943; # ADLAM CAPITAL LETTER SHA
''' '''
def _parse_unichr(s):
    """Return the character whose code point is given by the hex string `s`."""
    code_point = int(s, 16)
    try:
        char = compat_chr(code_point)
    except ValueError:
        # work around "unichr() arg not in range(0x10000) (narrow Python build)"
        # by decoding an 8-digit `\U` escape for the same code point
        char = ('\\U%08x' % code_point).decode('unicode-escape')
    return char
_map = dict( _map = dict(
(unichr(int(from_, 16)), ''.join((unichr(int(v, 16)) for v in to_.split(' ')))) (_parse_unichr(from_), ''.join(map(_parse_unichr, to_.split(' '))))
for from_, type_, to_, _ in ( for from_, type_, to_, _ in (
l.split('; ', 3) for l in _map_str.splitlines() if l) l.split('; ', 3) for l in _map_str.splitlines() if l and not l[0] == '#')
if type_ in ('C', 'F')) if type_ in ('C', 'F'))
del _map_str del _map_str
def casefold(s):
    """Return `s` with each character replaced by its entry in `_map`.

    Characters without an entry are kept unchanged. `s` must be a
    `compat_str`; this is enforced with an assertion.
    """
    assert isinstance(s, compat_str)
    folded = [_map.get(ch, ch) for ch in s]
    return ''.join(folded)
# `__all__` has to list exported names as strings, not the objects themselves,
# otherwise `from <module> import *` fails with a TypeError.
# `casefold.__name__` is used instead of a literal because this module
# enables `unicode_literals`, and a function's `__name__` is always the
# interpreter's native `str` type.
__all__ = [
    casefold.__name__,
]

View file

@ -12,35 +12,21 @@ from ..utils import (
ExtractorError, ExtractorError,
float_or_none, float_or_none,
sanitized_Request, sanitized_Request,
unescapeHTML, str_or_none,
update_url_query, traverse_obj,
urlencode_postdata, urlencode_postdata,
USER_AGENTS, USER_AGENTS,
) )
class CeskaTelevizeIE(InfoExtractor): class CeskaTelevizeIE(InfoExtractor):
_VALID_URL = r'https?://(?:www\.)?ceskatelevize\.cz/ivysilani/(?:[^/?#&]+/)*(?P<id>[^/#?]+)' _VALID_URL = r'https?://(?:www\.)?ceskatelevize\.cz/(?:ivysilani|porady|zive)/(?:[^/?#&]+/)*(?P<id>[^/#?]+)'
_TESTS = [{ _TESTS = [{
'url': 'http://www.ceskatelevize.cz/ivysilani/ivysilani/10441294653-hyde-park-civilizace/214411058091220',
'info_dict': {
'id': '61924494877246241',
'ext': 'mp4',
'title': 'Hyde Park Civilizace: Život v Grónsku',
'description': 'md5:3fec8f6bb497be5cdb0c9e8781076626',
'thumbnail': r're:^https?://.*\.jpg',
'duration': 3350,
},
'params': {
# m3u8 download
'skip_download': True,
},
}, {
'url': 'http://www.ceskatelevize.cz/ivysilani/10441294653-hyde-park-civilizace/215411058090502/bonus/20641-bonus-01-en', 'url': 'http://www.ceskatelevize.cz/ivysilani/10441294653-hyde-park-civilizace/215411058090502/bonus/20641-bonus-01-en',
'info_dict': { 'info_dict': {
'id': '61924494877028507', 'id': '61924494877028507',
'ext': 'mp4', 'ext': 'mp4',
'title': 'Hyde Park Civilizace: Bonus 01 - En', 'title': 'Bonus 01 - En - Hyde Park Civilizace',
'description': 'English Subtittles', 'description': 'English Subtittles',
'thumbnail': r're:^https?://.*\.jpg', 'thumbnail': r're:^https?://.*\.jpg',
'duration': 81.3, 'duration': 81.3,
@ -51,31 +37,111 @@ class CeskaTelevizeIE(InfoExtractor):
}, },
}, { }, {
# live stream # live stream
'url': 'http://www.ceskatelevize.cz/ivysilani/zive/ct4/', 'url': 'http://www.ceskatelevize.cz/zive/ct1/',
'info_dict': { 'info_dict': {
'id': 402, 'id': '102',
'ext': 'mp4', 'ext': 'mp4',
'title': r're:^ČT Sport \d{4}-\d{2}-\d{2} \d{2}:\d{2}$', 'title': r'ČT1 - živé vysílání online',
'description': 'Sledujte živé vysílání kanálu ČT1 online. Vybírat si můžete i z dalších kanálů České televize na kterémkoli z vašich zařízení.',
'is_live': True, 'is_live': True,
}, },
'params': { 'params': {
# m3u8 download # m3u8 download
'skip_download': True, 'skip_download': True,
}, },
'skip': 'Georestricted to Czech Republic', }, {
# another
'url': 'http://www.ceskatelevize.cz/ivysilani/zive/ct4/',
'only_matching': True,
'info_dict': {
'id': 402,
'ext': 'mp4',
'title': r're:^ČT Sport \d{4}-\d{2}-\d{2} \d{2}:\d{2}$',
'is_live': True,
},
# 'skip': 'Georestricted to Czech Republic',
}, { }, {
'url': 'http://www.ceskatelevize.cz/ivysilani/embed/iFramePlayer.php?hash=d6a3e1370d2e4fa76296b90bad4dfc19673b641e&IDEC=217 562 22150/0004&channelID=1&width=100%25', 'url': 'http://www.ceskatelevize.cz/ivysilani/embed/iFramePlayer.php?hash=d6a3e1370d2e4fa76296b90bad4dfc19673b641e&IDEC=217 562 22150/0004&channelID=1&width=100%25',
'only_matching': True, 'only_matching': True,
}, {
# video with 18+ caution trailer
'url': 'http://www.ceskatelevize.cz/porady/10520528904-queer/215562210900007-bogotart/',
'info_dict': {
'id': '215562210900007-bogotart',
'title': 'Bogotart - Queer',
'description': 'Hlavní město Kolumbie v doprovodu queer umělců. Vroucí svět plný vášně, sebevědomí, ale i násilí a bolesti',
},
'playlist': [{
'info_dict': {
'id': '61924494877311053',
'ext': 'mp4',
'title': 'Bogotart - Queer (Varování 18+)',
'duration': 11.9,
},
}, {
'info_dict': {
'id': '61924494877068022',
'ext': 'mp4',
'title': 'Bogotart - Queer (Queer)',
'thumbnail': r're:^https?://.*\.jpg',
'duration': 1558.3,
},
}],
'params': {
# m3u8 download
'skip_download': True,
},
}, {
# iframe embed
'url': 'http://www.ceskatelevize.cz/porady/10614999031-neviditelni/21251212048/',
'only_matching': True,
}] }]
def _search_nextjs_data(self, webpage, video_id, **kw):
    """Find the `__NEXT_DATA__` script in `webpage` and parse its content as JSON.

    The same keyword arguments `kw` are forwarded to both
    `_search_regex` and `_parse_json`.
    """
    next_data_re = r'(?s)<script[^>]+id=[\'"]__NEXT_DATA__[\'"][^>]*>([^<]+)</script>'
    raw_json = self._search_regex(next_data_re, webpage, 'next.js data', **kw)
    return self._parse_json(raw_json, video_id, **kw)
def _real_extract(self, url): def _real_extract(self, url):
playlist_id = self._match_id(url) playlist_id = self._match_id(url)
webpage, urlh = self._download_webpage_handle(url, playlist_id)
parsed_url = compat_urllib_parse_urlparse(urlh.geturl())
site_name = self._og_search_property('site_name', webpage, fatal=False, default='Česká televize')
playlist_title = self._og_search_title(webpage, default=None)
if site_name and playlist_title:
playlist_title = re.split(r'\s*[—|]\s*%s' % (site_name, ), playlist_title, 1)[0]
playlist_description = self._og_search_description(webpage, default=None)
if playlist_description:
playlist_description = playlist_description.replace('\xa0', ' ')
webpage = self._download_webpage(url, playlist_id) type_ = 'IDEC'
if re.search(r'(^/porady|/zive)/', parsed_url.path):
next_data = self._search_nextjs_data(webpage, playlist_id)
if '/zive/' in parsed_url.path:
idec = traverse_obj(next_data, ('props', 'pageProps', 'data', 'liveBroadcast', 'current', 'idec'), get_all=False)
else:
idec = traverse_obj(next_data, ('props', 'pageProps', 'data', ('show', 'mediaMeta'), 'idec'), get_all=False)
if not idec:
idec = traverse_obj(next_data, ('props', 'pageProps', 'data', 'videobonusDetail', 'bonusId'), get_all=False)
if idec:
type_ = 'bonus'
if not idec:
raise ExtractorError('Failed to find IDEC id')
iframe_hash = self._download_webpage(
'https://www.ceskatelevize.cz/v-api/iframe-hash/',
playlist_id, note='Getting IFRAME hash')
query = {'hash': iframe_hash, 'origin': 'iVysilani', 'autoStart': 'true', type_: idec, }
webpage = self._download_webpage(
'https://www.ceskatelevize.cz/ivysilani/embed/iFramePlayer.php',
playlist_id, note='Downloading player', query=query)
NOT_AVAILABLE_STRING = 'This content is not available at your territory due to limited copyright.' NOT_AVAILABLE_STRING = 'This content is not available at your territory due to limited copyright.'
if '%s</p>' % NOT_AVAILABLE_STRING in webpage: if '%s</p>' % NOT_AVAILABLE_STRING in webpage:
raise ExtractorError(NOT_AVAILABLE_STRING, expected=True) self.raise_geo_restricted(NOT_AVAILABLE_STRING)
if any(not_found in webpage for not_found in ('Neplatný parametr pro videopřehrávač', 'IDEC nebyl nalezen', )):
raise ExtractorError('no video with IDEC available', video_id=idec, expected=True)
type_ = None type_ = None
episode_id = None episode_id = None
@ -100,7 +166,7 @@ class CeskaTelevizeIE(InfoExtractor):
data = { data = {
'playlist[0][type]': type_, 'playlist[0][type]': type_,
'playlist[0][id]': episode_id, 'playlist[0][id]': episode_id,
'requestUrl': compat_urllib_parse_urlparse(url).path, 'requestUrl': parsed_url.path,
'requestSource': 'iVysilani', 'requestSource': 'iVysilani',
} }
@ -108,7 +174,7 @@ class CeskaTelevizeIE(InfoExtractor):
for user_agent in (None, USER_AGENTS['Safari']): for user_agent in (None, USER_AGENTS['Safari']):
req = sanitized_Request( req = sanitized_Request(
'https://www.ceskatelevize.cz/ivysilani/ajax/get-client-playlist', 'https://www.ceskatelevize.cz/ivysilani/ajax/get-client-playlist/',
data=urlencode_postdata(data)) data=urlencode_postdata(data))
req.add_header('Content-type', 'application/x-www-form-urlencoded') req.add_header('Content-type', 'application/x-www-form-urlencoded')
@ -130,9 +196,6 @@ class CeskaTelevizeIE(InfoExtractor):
req = sanitized_Request(compat_urllib_parse_unquote(playlist_url)) req = sanitized_Request(compat_urllib_parse_unquote(playlist_url))
req.add_header('Referer', url) req.add_header('Referer', url)
playlist_title = self._og_search_title(webpage, default=None)
playlist_description = self._og_search_description(webpage, default=None)
playlist = self._download_json(req, playlist_id, fatal=False) playlist = self._download_json(req, playlist_id, fatal=False)
if not playlist: if not playlist:
continue continue
@ -167,7 +230,7 @@ class CeskaTelevizeIE(InfoExtractor):
entries[num]['formats'].extend(formats) entries[num]['formats'].extend(formats)
continue continue
item_id = item.get('id') or item['assetId'] item_id = str_or_none(item.get('id') or item['assetId'])
title = item['title'] title = item['title']
duration = float_or_none(item.get('duration')) duration = float_or_none(item.get('duration'))
@ -181,8 +244,6 @@ class CeskaTelevizeIE(InfoExtractor):
if playlist_len == 1: if playlist_len == 1:
final_title = playlist_title or title final_title = playlist_title or title
if is_live:
final_title = self._live_title(final_title)
else: else:
final_title = '%s (%s)' % (playlist_title, title) final_title = '%s (%s)' % (playlist_title, title)
@ -200,6 +261,8 @@ class CeskaTelevizeIE(InfoExtractor):
for e in entries: for e in entries:
self._sort_formats(e['formats']) self._sort_formats(e['formats'])
if len(entries) == 1:
return entries[0]
return self.playlist_result(entries, playlist_id, playlist_title, playlist_description) return self.playlist_result(entries, playlist_id, playlist_title, playlist_description)
def _get_subtitles(self, episode_id, subs): def _get_subtitles(self, episode_id, subs):
@ -236,54 +299,3 @@ class CeskaTelevizeIE(InfoExtractor):
yield line yield line
return '\r\n'.join(_fix_subtitle(subtitles)) return '\r\n'.join(_fix_subtitle(subtitles))
class CeskaTelevizePoradyIE(InfoExtractor):
    """Resolve ceskatelevize.cz ``/porady/`` pages to their embedded player.

    The page itself carries no media; the player URL found in the page is
    handed over to ``CeskaTelevizeIE`` for the actual extraction.
    """

    _VALID_URL = r'https?://(?:www\.)?ceskatelevize\.cz/porady/(?:[^/?#&]+/)*(?P<id>[^/#?]+)'
    _TESTS = [{
        # video with 18+ caution trailer
        'url': 'http://www.ceskatelevize.cz/porady/10520528904-queer/215562210900007-bogotart/',
        'info_dict': {
            'id': '215562210900007-bogotart',
            'title': 'Queer: Bogotart',
            'description': 'Alternativní průvodce současným queer světem',
        },
        'playlist': [{
            'info_dict': {
                'id': '61924494876844842',
                'ext': 'mp4',
                'title': 'Queer: Bogotart (Varování 18+)',
                'duration': 10.2,
            },
        }, {
            'info_dict': {
                'id': '61924494877068022',
                'ext': 'mp4',
                'title': 'Queer: Bogotart (Queer)',
                'thumbnail': r're:^https?://.*\.jpg',
                'duration': 1558.3,
            },
        }],
        'params': {
            # m3u8 download
            'skip_download': True,
        },
    }, {
        # iframe embed
        'url': 'http://www.ceskatelevize.cz/porady/10614999031-neviditelni/21251212048/',
        'only_matching': True,
    }]

    def _real_extract(self, url):
        """Find the player URL in the page and delegate to CeskaTelevizeIE."""
        video_id = self._match_id(url)
        webpage = self._download_webpage(url, video_id)

        # Tried in order: a <span data-url=...> holder, then an <iframe> embed
        player_url_patterns = (
            r'<span[^>]*\bdata-url=(["\'])(?P<url>(?:(?!\1).)+)\1',
            r'<iframe[^>]+\bsrc=(["\'])(?P<url>(?:https?:)?//(?:www\.)?ceskatelevize\.cz/ivysilani/embed/iFramePlayer\.php.*?)\1',
        )
        raw_player_url = self._search_regex(
            player_url_patterns, webpage, 'iframe player url', group='url')

        # The URL comes out of an HTML attribute, so decode entities before
        # adding the autoStart query parameter
        data_url = update_url_query(
            unescapeHTML(raw_player_url), query={'autoStart': 'true'})

        return self.url_result(data_url, ie=CeskaTelevizeIE.ie_key())

View file

@ -70,6 +70,7 @@ from ..utils import (
str_or_none, str_or_none,
str_to_int, str_to_int,
strip_or_none, strip_or_none,
try_get,
unescapeHTML, unescapeHTML,
unified_strdate, unified_strdate,
unified_timestamp, unified_timestamp,
@ -2713,7 +2714,7 @@ class InfoExtractor(object):
def _find_jwplayer_data(self, webpage, video_id=None, transform_source=js_to_json): def _find_jwplayer_data(self, webpage, video_id=None, transform_source=js_to_json):
mobj = re.search( mobj = re.search(
r'(?s)jwplayer\((?P<quote>[\'"])[^\'" ]+(?P=quote)\)(?!</script>).*?\.setup\s*\((?P<options>[^)]+)\)', r'''(?s)jwplayer\s*\(\s*(?P<q>'|")(?!(?P=q)).+(?P=q)\s*\)(?!</script>).*?\.\s*setup\s*\(\s*(?P<options>(?:\([^)]*\)|[^)])+)\s*\)''',
webpage) webpage)
if mobj: if mobj:
try: try:
@ -2734,9 +2735,14 @@ class InfoExtractor(object):
def _parse_jwplayer_data(self, jwplayer_data, video_id=None, require_title=True, def _parse_jwplayer_data(self, jwplayer_data, video_id=None, require_title=True,
m3u8_id=None, mpd_id=None, rtmp_params=None, base_url=None): m3u8_id=None, mpd_id=None, rtmp_params=None, base_url=None):
flat_pl = try_get(jwplayer_data, lambda x: x.get('playlist') or True)
if flat_pl is None:
# not even a dict
return []
# JWPlayer backward compatibility: flattened playlists # JWPlayer backward compatibility: flattened playlists
# https://github.com/jwplayer/jwplayer/blob/v7.4.3/src/js/api/config.js#L81-L96 # https://github.com/jwplayer/jwplayer/blob/v7.4.3/src/js/api/config.js#L81-L96
if 'playlist' not in jwplayer_data: if flat_pl is True:
jwplayer_data = {'playlist': [jwplayer_data]} jwplayer_data = {'playlist': [jwplayer_data]}
entries = [] entries = []
@ -2784,6 +2790,13 @@ class InfoExtractor(object):
'timestamp': int_or_none(video_data.get('pubdate')), 'timestamp': int_or_none(video_data.get('pubdate')),
'duration': float_or_none(jwplayer_data.get('duration') or video_data.get('duration')), 'duration': float_or_none(jwplayer_data.get('duration') or video_data.get('duration')),
'subtitles': subtitles, 'subtitles': subtitles,
'alt_title': clean_html(video_data.get('subtitle')), # attributes used e.g. by Tele5 ...
'genre': clean_html(video_data.get('genre')),
'channel': clean_html(dict_get(video_data, ('category', 'channel'))),
'season_number': int_or_none(video_data.get('season')),
'episode_number': int_or_none(video_data.get('episode')),
'release_year': int_or_none(video_data.get('releasedate')),
'age_limit': int_or_none(video_data.get('age_restriction')),
} }
# https://github.com/jwplayer/jwplayer/blob/master/src/js/utils/validator.js#L32 # https://github.com/jwplayer/jwplayer/blob/master/src/js/utils/validator.js#L32
if len(formats) == 1 and re.search(r'^(?:http|//).*(?:youtube\.com|youtu\.be)/.+', formats[0]['url']): if len(formats) == 1 and re.search(r'^(?:http|//).*(?:youtube\.com|youtu\.be)/.+', formats[0]['url']):
@ -2792,7 +2805,9 @@ class InfoExtractor(object):
'url': formats[0]['url'], 'url': formats[0]['url'],
}) })
else: else:
self._sort_formats(formats) # avoid exception in case of only sttls
if formats:
self._sort_formats(formats)
entry['formats'] = formats entry['formats'] = formats
entries.append(entry) entries.append(entry)
if len(entries) == 1: if len(entries) == 1:
@ -2802,7 +2817,7 @@ class InfoExtractor(object):
def _parse_jwplayer_formats(self, jwplayer_sources_data, video_id=None, def _parse_jwplayer_formats(self, jwplayer_sources_data, video_id=None,
m3u8_id=None, mpd_id=None, rtmp_params=None, base_url=None): m3u8_id=None, mpd_id=None, rtmp_params=None, base_url=None):
urls = [] urls = set()
formats = [] formats = []
for source in jwplayer_sources_data: for source in jwplayer_sources_data:
if not isinstance(source, dict): if not isinstance(source, dict):
@ -2811,14 +2826,14 @@ class InfoExtractor(object):
base_url, self._proto_relative_url(source.get('file'))) base_url, self._proto_relative_url(source.get('file')))
if not source_url or source_url in urls: if not source_url or source_url in urls:
continue continue
urls.append(source_url) urls.add(source_url)
source_type = source.get('type') or '' source_type = source.get('type') or ''
ext = mimetype2ext(source_type) or determine_ext(source_url) ext = mimetype2ext(source_type) or determine_ext(source_url)
if source_type == 'hls' or ext == 'm3u8': if source_type == 'hls' or ext == 'm3u8' or 'format=m3u8-aapl' in source_url:
formats.extend(self._extract_m3u8_formats( formats.extend(self._extract_m3u8_formats(
source_url, video_id, 'mp4', entry_protocol='m3u8_native', source_url, video_id, 'mp4', entry_protocol='m3u8_native',
m3u8_id=m3u8_id, fatal=False)) m3u8_id=m3u8_id, fatal=False))
elif source_type == 'dash' or ext == 'mpd': elif source_type == 'dash' or ext == 'mpd' or 'format=mpd-time-csf' in source_url:
formats.extend(self._extract_mpd_formats( formats.extend(self._extract_mpd_formats(
source_url, video_id, mpd_id=mpd_id, fatal=False)) source_url, video_id, mpd_id=mpd_id, fatal=False))
elif ext == 'smil': elif ext == 'smil':
@ -2833,20 +2848,23 @@ class InfoExtractor(object):
'ext': ext, 'ext': ext,
}) })
else: else:
format_id = str_or_none(source.get('label'))
height = int_or_none(source.get('height')) height = int_or_none(source.get('height'))
if height is None: if height is None and format_id:
# Often no height is provided but there is a label in # Often no height is provided but there is a label in
# format like "1080p", "720p SD", or 1080. # format like "1080p", "720p SD", or 1080.
height = int_or_none(self._search_regex( height = parse_resolution(format_id).get('height')
r'^(\d{3,4})[pP]?(?:\b|$)', compat_str(source.get('label') or ''),
'height', default=None))
a_format = { a_format = {
'url': source_url, 'url': source_url,
'width': int_or_none(source.get('width')), 'width': int_or_none(source.get('width')),
'height': height, 'height': height,
'tbr': int_or_none(source.get('bitrate'), scale=1000), 'tbr': int_or_none(source.get('bitrate'), scale=1000),
'filesize': int_or_none(source.get('filesize')),
'ext': ext, 'ext': ext,
} }
if format_id:
a_format['format_id'] = format_id
if source_url.startswith('rtmp'): if source_url.startswith('rtmp'):
a_format['ext'] = 'flv' a_format['ext'] = 'flv'
# See com/longtailvideo/jwplayer/media/RTMPMediaProvider.as # See com/longtailvideo/jwplayer/media/RTMPMediaProvider.as

View file

@ -208,10 +208,7 @@ from .ccc import (
from .ccma import CCMAIE from .ccma import CCMAIE
from .cctv import CCTVIE from .cctv import CCTVIE
from .cda import CDAIE from .cda import CDAIE
from .ceskatelevize import ( from .ceskatelevize import CeskaTelevizeIE
CeskaTelevizeIE,
CeskaTelevizePoradyIE,
)
from .channel9 import Channel9IE from .channel9 import Channel9IE
from .charlierose import CharlieRoseIE from .charlierose import CharlieRoseIE
from .chaturbate import ChaturbateIE from .chaturbate import ChaturbateIE
@ -913,6 +910,10 @@ from .parliamentliveuk import ParliamentLiveUKIE
from .patreon import PatreonIE from .patreon import PatreonIE
from .pbs import PBSIE from .pbs import PBSIE
from .pearvideo import PearVideoIE from .pearvideo import PearVideoIE
from .peekvids import (
PeekVidsIE,
PlayVidsIE,
)
from .peertube import PeerTubeIE from .peertube import PeerTubeIE
from .people import PeopleIE from .people import PeopleIE
from .performgroup import PerformGroupIE from .performgroup import PerformGroupIE

View file

@ -0,0 +1,193 @@
# coding: utf-8
from __future__ import unicode_literals
import re
from .common import InfoExtractor
from ..utils import (
ExtractorError,
get_element_by_class,
int_or_none,
merge_dicts,
url_or_none,
)
class PeekVidsIE(InfoExtractor):
    """Extractor for peekvids.com watch pages and ``embed?v=`` URLs.

    Subclasses may override ``_DOMAIN`` (host used for the source-list
    request) and ``_get_detail()`` (selects the HTML block holding the
    description, categories and tags).
    """

    _VALID_URL = r'''(?x)
        https?://(?:www\.)?peekvids\.com/
        (?:(?:[^/?#]+/){2}|embed/?\?(?:[^#]*&)?v=)
        (?P<id>[^/?&#]*)
    '''
    _TESTS = [{
        'url': 'https://peekvids.com/pc/dane-jones-cute-redhead-with-perfect-tits-with-mini-vamp/BSyLMbN0YCd',
        'md5': '2ff6a357a9717dc9dc9894b51307e9a2',
        'info_dict': {
            'id': '1262717',
            'display_id': 'BSyLMbN0YCd',
            'title': ' Dane Jones - Cute redhead with perfect tits with Mini Vamp',
            'ext': 'mp4',
            'thumbnail': r're:^https?://.*\.jpg$',
            'description': 'md5:0a61df3620de26c0af8963b1a730cd69',
            'timestamp': 1642579329,
            'upload_date': '20220119',
            'duration': 416,
            'view_count': int,
            'age_limit': 18,
            'uploader': 'SEXYhub.com',
            'categories': list,
            'tags': list,
        },
    }]
    _DOMAIN = 'www.peekvids.com'

    def _get_detail(self, html):
        """Return the inner HTML of the video detail block, or None if absent."""
        return get_element_by_class('detail-video-block', html)

    def _real_extract(self, url):
        """Build the info dict for the video page at `url`.

        Raises ExtractorError (expected) when the site answers with its
        rate-limit page instead of the video page.
        """
        video_id = self._match_id(url)
        # 429 is accepted so that the rate-limit page can be detected below
        webpage = self._download_webpage(url, video_id, expected_status=429)
        if '>Rate Limit Exceeded' in webpage:
            raise ExtractorError(
                '[%s] %s: %s' % (self.IE_NAME, video_id, 'You are suspected as a bot. Wait, or pass the captcha test on the site and provide --cookies.'),
                expected=True)

        title = self._html_search_regex(r'(?s)<h1\b[^>]*>(.+?)</h1>', webpage, 'title')

        # The ID from the URL becomes the display ID; the ID used from here
        # on is the one carried by the <video> element
        display_id = video_id
        video_id = self._search_regex(r'(?s)<video\b[^>]+\bdata-id\s*=\s*["\']?([\w-]+)', webpage, 'short video ID')
        srcs = self._download_json(
            'https://%s/v-alt/%s' % (self._DOMAIN, video_id), video_id,
            note='Downloading list of source files')

        formats = []
        for src_key, src_value in srcs.items():
            # Wanted keys look like "data-src720": the digits serve as both
            # format ID and height
            f_match = src_value and re.match(r'^data-src(\d{3,})$', src_key)
            f_url = url_or_none(src_value) if f_match else None
            if not f_url:
                continue
            f_id = f_match.group(1)
            formats.append({
                'url': f_url,
                'format_id': f_id,
                'height': int_or_none(f_id),
            })

        if not formats:
            # No recognisable keys: fall back to every usable URL in the list.
            # The loop variable must not be called `url`: on Python 2 a list
            # comprehension variable leaks into the function scope and would
            # clobber the `url` argument that is still needed below.
            formats = [
                {'url': f_url}
                for f_url in map(url_or_none, srcs.values()) if f_url]
        self._sort_formats(formats)

        info = self._search_json_ld(webpage, video_id, expected_type='VideoObject', default={})
        info.pop('url', None)
        # may not have found the thumbnail if it was in a list in the ld+json
        info.setdefault('thumbnail', self._og_search_thumbnail(webpage))
        detail = self._get_detail(webpage) or ''
        # Take the text of the detail block up to the ld+json description
        # (if that text is followed by a tag) or up to the first <ul>
        info['description'] = self._html_search_regex(
            r'(?s)(.+?)(?:%s\s*<|<ul\b)' % (re.escape(info.get('description', '')), ),
            detail, 'description', default=None) or None
        # Drop a trailing ", ..." / "- ..." segment from the title
        info['title'] = re.sub(r'\s*[,-][^,-]+$', '', info.get('title') or title) or self._generic_title(url)

        def cat_tags(name, html):
            # Whitespace-separated items following a "<span>name:</span>" label
            text = self._html_search_regex(
                r'(?s)<span\b[^>]*>\s*%s\s*:\s*</span>(.+?)</li>' % (re.escape(name), ),
                html, name, default='')
            return [x for x in re.split(r'\s+', text) if x]

        return merge_dicts({
            'id': video_id,
            'display_id': display_id,
            'age_limit': 18,
            'formats': formats,
            'categories': cat_tags('Categories', detail),
            'tags': cat_tags('Tags', detail),
            'uploader': self._html_search_regex(r'[Uu]ploaded\s+by\s(.+?)"', webpage, 'uploader', default=None),
        }, info)
class PlayVidsIE(PeekVidsIE):
    """Extractor for playvids.com, reusing the PeekVidsIE extraction logic.

    Only the URL pattern, the host used for the source-list request and the
    class name of the detail block differ from the parent.
    """

    _VALID_URL = r'https?://(?:www\.)?playvids\.com/(?:embed/|\w\w?/)?(?P<id>[^/?#]*)'
    _TESTS = [{
        'url': 'https://www.playvids.com/U3pBrYhsjXM/pc/dane-jones-cute-redhead-with-perfect-tits-with-mini-vamp',
        'md5': '2f12e50213dd65f142175da633c4564c',
        'info_dict': {
            'id': '1978030',
            'display_id': 'U3pBrYhsjXM',
            'title': ' Dane Jones - Cute redhead with perfect tits with Mini Vamp',
            'ext': 'mp4',
            'thumbnail': r're:^https?://.*\.jpg$',
            'description': 'md5:0a61df3620de26c0af8963b1a730cd69',
            'timestamp': 1640435839,
            'upload_date': '20211225',
            'duration': 416,
            'view_count': int,
            'age_limit': 18,
            'uploader': 'SEXYhub.com',
            'categories': list,
            'tags': list,
        },
    }, {
        'url': 'https://www.playvids.com/es/U3pBrYhsjXM/pc/dane-jones-cute-redhead-with-perfect-tits-with-mini-vamp',
        'only_matching': True,
    }, {
        'url': 'https://www.playvids.com/embed/U3pBrYhsjXM',
        'only_matching': True,
    }, {
        'url': 'https://www.playvids.com/bKmGLe3IwjZ/sv/brazzers-800-phone-sex-madison-ivy-always-on-the-line',
        'md5': 'e783986e596cafbf46411a174ab42ba6',
        'info_dict': {
            'id': '762385',
            'display_id': 'bKmGLe3IwjZ',
            'ext': 'mp4',
            'title': 'Brazzers - 1 800 Phone Sex: Madison Ivy Always On The Line 6',
            'description': 'md5:bdcd2db2b8ad85831a491d7c8605dcef',
            'timestamp': 1516958544,
            'upload_date': '20180126',
            'thumbnail': r're:^https?://.*\.jpg$',
            'duration': 480,
            'uploader': 'Brazzers',
            'age_limit': 18,
            'view_count': int,
            'categories': list,
            'tags': list,
        },
    }, {
        'url': 'https://www.playvids.com/v/47iUho33toY',
        'md5': 'b056b5049d34b648c1e86497cf4febce',
        'info_dict': {
            'id': '700621',
            'display_id': '47iUho33toY',
            'ext': 'mp4',
            'title': 'KATEE OWEN STRIPTIASE IN SEXY RED LINGERIE',
            'description': None,
            'timestamp': 1507052209,
            'upload_date': '20171003',
            'thumbnail': r're:^https?://.*\.jpg$',
            'duration': 332,
            'uploader': 'Cacerenele',
            'age_limit': 18,
            'view_count': int,
            'categories': list,
            'tags': list,
        }
    }, {
        'url': 'https://www.playvids.com/z3_7iwWCmqt/sexy-teen-filipina-striptease-beautiful-pinay-bargirl-strips-and-dances',
        'md5': 'efa09be9f031314b7b7e3bc6510cd0df',
        'info_dict': {
            'id': '1523518',
            'display_id': 'z3_7iwWCmqt',
            'ext': 'mp4',
            'title': 'SEXY TEEN FILIPINA STRIPTEASE - Beautiful Pinay Bargirl Strips and Dances',
            'description': None,
            'timestamp': 1607470323,
            'upload_date': '20201208',
            'thumbnail': r're:^https?://.*\.jpg$',
            'duration': 593,
            'uploader': 'yorours',
            'age_limit': 18,
            'view_count': int,
            'categories': list,
            'tags': list,
        },
    }]
    _DOMAIN = 'www.playvids.com'

    def _get_detail(self, html):
        """Return the inner HTML of the detail block, or None if absent."""
        return get_element_by_class('detail-block', html)

View file

@ -43,6 +43,7 @@ from .compat import (
compat_HTTPError, compat_HTTPError,
compat_basestring, compat_basestring,
compat_chr, compat_chr,
compat_collections_abc,
compat_cookiejar, compat_cookiejar,
compat_ctypes_WINFUNCTYPE, compat_ctypes_WINFUNCTYPE,
compat_etree_fromstring, compat_etree_fromstring,
@ -1685,6 +1686,7 @@ USER_AGENTS = {
NO_DEFAULT = object() NO_DEFAULT = object()
IDENTITY = lambda x: x
ENGLISH_MONTH_NAMES = [ ENGLISH_MONTH_NAMES = [
'January', 'February', 'March', 'April', 'May', 'June', 'January', 'February', 'March', 'April', 'May', 'June',
@ -3867,6 +3869,105 @@ def detect_exe_version(output, version_re=None, unrecognized='present'):
return unrecognized return unrecognized
class LazyList(compat_collections_abc.Sequence):
    """Lazy immutable list from an iterable

    Items are pulled from the wrapped iterable only when needed and are kept
    in an internal cache, so each item is produced at most once.
    Anything that needs the end of the sequence (negative indices, open-ended
    slices, len(), repr(), reversed iteration) consumes the whole iterable,
    so must not be used with infinite iterables.

    Note that slices of a LazyList are lists and not LazyList"""

    class IndexError(IndexError):
        """IndexError raised by LazyList lookups, with the original error as cause"""

        def __init__(self, cause=None):
            if cause:
                # reproduce `raise from`
                self.__cause__ = cause
            # Inside a method the bare name `IndexError` is the builtin, not
            # this nested class, so name the nested class explicitly
            super(LazyList.IndexError, self).__init__()

    def __init__(self, iterable, **kwargs):
        """
        @param iterable   Source of the items
        Keyword arguments:
        @param reverse    If true, present the items in reverse order
        @param _cache     Internal: cache list to share with another LazyList
        """
        # kwarg-only
        reverse = kwargs.get('reverse', False)
        _cache = kwargs.get('_cache')

        self._iterable = iter(iterable)
        self._cache = [] if _cache is None else _cache
        self._reversed = reverse

    def __iter__(self):
        if self._reversed:
            # We need to consume the entire iterable to iterate in reverse
            for item in self.exhaust():
                yield item
            return
        # Replay what has been seen already, then continue pulling (and
        # caching) fresh items
        for item in self._cache:
            yield item
        for item in self._iterable:
            self._cache.append(item)
            yield item

    def _exhaust(self):
        """Pull all remaining items into the cache and return the cache (in source order)"""
        self._cache.extend(self._iterable)
        self._iterable = []  # Discard the emptied iterable to make it pickle-able
        return self._cache

    def exhaust(self):
        """Evaluate the entire iterable"""
        return self._exhaust()[::-1 if self._reversed else 1]

    @staticmethod
    def _reverse_index(x):
        """Map an index of the reversed view to an index from the other end (None stays None)"""
        return None if x is None else ~x

    def __getitem__(self, idx):
        if isinstance(idx, slice):
            if self._reversed:
                idx = slice(self._reverse_index(idx.start), self._reverse_index(idx.stop), -(idx.step or 1))
            start, stop, step = idx.start, idx.stop, idx.step or 1
        elif isinstance(idx, int):
            if self._reversed:
                idx = self._reverse_index(idx)
            start, stop, step = idx, idx, 0
        else:
            raise TypeError('indices must be integers or slices')
        if ((start or 0) < 0 or (stop or 0) < 0
                or (start is None and step < 0)
                or (stop is None and step > 0)):
            # We need to consume the entire iterable to be able to slice from the end
            # Obviously, never use this with infinite iterables
            self._exhaust()
            try:
                return self._cache[idx]
            except IndexError as e:
                raise self.IndexError(e)
        # Number of items still missing from the cache to reach the highest
        # position touched by the request
        n = max(start or 0, stop or 0) - len(self._cache) + 1
        if n > 0:
            self._cache.extend(itertools.islice(self._iterable, n))
        try:
            return self._cache[idx]
        except IndexError as e:
            raise self.IndexError(e)

    def __bool__(self):
        # Only one item has to be looked up to decide emptiness (for a
        # reversed list that lookup still consumes the iterable)
        try:
            self[-1] if self._reversed else self[0]
        except self.IndexError:
            return False
        return True

    # Python 2 spells the truth-value hook `__nonzero__`; without it
    # truthiness falls back to __len__() and exhausts the whole iterable
    __nonzero__ = __bool__

    def __len__(self):
        self._exhaust()
        return len(self._cache)

    def __reversed__(self):
        # The new object shares both the underlying iterator and the cache
        return type(self)(self._iterable, reverse=not self._reversed, _cache=self._cache)

    def __copy__(self):
        # The copy shares both the underlying iterator and the cache
        return type(self)(self._iterable, reverse=self._reversed, _cache=self._cache)

    def __repr__(self):
        # repr and str should mimic a list. So we exhaust the iterable
        return repr(self.exhaust())

    def __str__(self):
        return repr(self.exhaust())
class PagedList(object): class PagedList(object):
def __len__(self): def __len__(self):
# This is only useful for tests # This is only useful for tests
@ -4092,6 +4193,10 @@ def multipart_encode(data, boundary=None):
return out, content_type return out, content_type
def variadic(x, allowed_types=(compat_str, bytes, dict)):
    """Return `x` itself if it is an iterable to be used as-is, else `(x,)`

    Non-iterables, and iterables that are instances of `allowed_types`
    (by default strings, bytes and dicts), are treated as single values
    and wrapped in a 1-tuple.
    """
    if not isinstance(x, compat_collections_abc.Iterable):
        return (x,)
    if isinstance(x, allowed_types):
        return (x,)
    return x
def dict_get(d, key_or_keys, default=None, skip_false_values=True): def dict_get(d, key_or_keys, default=None, skip_false_values=True):
if isinstance(key_or_keys, (list, tuple)): if isinstance(key_or_keys, (list, tuple)):
for key in key_or_keys: for key in key_or_keys:
@ -4102,6 +4207,23 @@ def dict_get(d, key_or_keys, default=None, skip_false_values=True):
return d.get(key_or_keys, default) return d.get(key_or_keys, default)
def try_call(*funcs, **kwargs):
    """Call each of `funcs` in turn and return the first acceptable result

    A call that raises AttributeError, KeyError, TypeError, IndexError or
    ZeroDivisionError is skipped; any other exception propagates.

    Keyword arguments:
    @param expected_type  If given, only a result that is an instance of it
                          is accepted
    @param args           Positional arguments passed to every function
    @param kwargs         Keyword arguments passed to every function
    @returns              The first accepted result, or None if there is none
    """
    # parameter defaults
    expected_type = kwargs.get('expected_type')
    call_args = kwargs.get('args', [])
    call_kwargs = kwargs.get('kwargs', {})

    skipped_errors = (AttributeError, KeyError, TypeError, IndexError, ZeroDivisionError)
    for func in funcs:
        try:
            result = func(*call_args, **call_kwargs)
        except skipped_errors:
            continue
        if expected_type is not None and not isinstance(result, expected_type):
            continue
        return result
def try_get(src, getter, expected_type=None): def try_get(src, getter, expected_type=None):
if not isinstance(getter, (list, tuple)): if not isinstance(getter, (list, tuple)):
getter = [getter] getter = [getter]
@ -5835,3 +5957,220 @@ def clean_podcast_url(url):
st\.fm # https://podsights.com/docs/ st\.fm # https://podsights.com/docs/
)/e )/e
)/''', '', url) )/''', '', url)
def traverse_obj(obj, *paths, **kwargs):
    """
    Safely traverse nested `dict`s and `Sequence`s

    >>> obj = [{}, {"key": "value"}]
    >>> traverse_obj(obj, (1, "key"))
    "value"

    Each of the provided `paths` is tested and the first producing a valid result will be returned.
    The next path will also be tested if the path branched but no results could be found.
    Supported values for traversal are `Mapping`, `Sequence` and `re.Match`.
    A value of None is treated as the absence of a value.

    The paths will be wrapped in `variadic`, so that `'key'` is conveniently the same as `('key', )`.

    The keys in the path can be one of:
        - `None`:           Return the current object.
        - `str`/`int`:      Return `obj[key]`. For `re.Match`, return `obj.group(key)`.
        - `slice`:          Return the slice `obj[key]` as a single value.
                            It is looked up like an index and does not mark
                            the path as branched.
        - `Ellipsis`:       Branch out and return a list of all values.
        - `tuple`/`list`:   Branch out and return a list of all matching values.
                            Read as: `[traverse_obj(obj, branch) for branch in branches]`.
        - `function`:       Branch out and return values filtered by the function.
                            Read as: `[value for key, value in obj if function(key, value)]`.
                            For `Sequence`s, `key` is the index of the value.
        - `dict`            Transform the current object and return a matching dict.
                            Read as: `{key: traverse_obj(obj, path) for key, path in dct.items()}`.

        `tuple`, `list`, and `dict` all support nested paths and branches.

    @params paths           Paths which to traverse by.
    Keyword arguments:
    @param default          Value to return if the paths do not match.
    @param expected_type    If a `type`, only accept final values of this type.
                            If any other callable, try to call the function on each result.
    @param get_all          If `False`, return the first matching result, otherwise all matching ones.
    @param casesense        If `False`, consider string dictionary keys as case insensitive.

    The following are only meant to be used by YoutubeDL.prepare_outtmpl and are not part of the API

    @param _is_user_input   Whether the keys are generated from user input.
                            If `True` strings get converted to `int`/`slice` if needed.
    @param _traverse_string Whether to traverse into objects as strings.
                            If `True`, any non-compatible object will first be
                            converted into a string and then traversed into.


    @returns                The result of the object traversal.
                            If successful, `get_all=True`, and the path branches at least once,
                            then a list of results is returned instead.
                            A list is always returned if the last path branches and no `default` is given.
    """

    # parameter defaults
    default = kwargs.get('default', NO_DEFAULT)
    expected_type = kwargs.get('expected_type')
    get_all = kwargs.get('get_all', True)
    casesense = kwargs.get('casesense', True)
    _is_user_input = kwargs.get('_is_user_input', False)
    _traverse_string = kwargs.get('_traverse_string', False)

    # instant compat
    # (shadows the builtin `str` for the rest of this function, including
    # the nested helpers)
    str = compat_str

    # Strings and bytes are sequences too, but are not traversed as such
    is_sequence = lambda x: isinstance(x, compat_collections_abc.Sequence) and not isinstance(x, (str, bytes))
    # stand-in until compat_re_Match is added
    compat_re_Match = type(re.match('a', 'a'))
    # stand-in until casefold.py is added
    try:
        ''.casefold()
        compat_casefold = lambda s: s.casefold()
    except AttributeError:
        compat_casefold = lambda s: s.lower()
    # Casefold string keys only; other keys are passed through unchanged
    casefold = lambda k: compat_casefold(k) if isinstance(k, str) else k

    # type_test maps a candidate result to the value to return, or None to
    # reject it
    if isinstance(expected_type, type):
        type_test = lambda val: val if isinstance(val, expected_type) else None
    else:
        type_test = lambda val: try_call(expected_type or IDENTITY, args=(val,))

    def from_iterable(iterables):
        # chain.from_iterable(['ABC', 'DEF']) --> A B C D E F
        for it in iterables:
            for item in it:
                yield item

    def apply_key(key, obj):
        # Generator yielding every value selected by a single `key` in `obj`;
        # the branches are tested in the order documented above
        if obj is None:
            return

        elif key is None:
            yield obj

        elif isinstance(key, (list, tuple)):
            # Each element is a complete sub-path applied to the same object
            for branch in key:
                _, result = apply_path(obj, branch)
                for item in result:
                    yield item

        elif key is Ellipsis:
            # Any other object yields nothing
            result = []
            if isinstance(obj, compat_collections_abc.Mapping):
                result = obj.values()
            elif is_sequence(obj):
                result = obj
            elif isinstance(obj, compat_re_Match):
                result = obj.groups()
            elif _traverse_string:
                result = str(obj)
            for item in result:
                yield item

        elif callable(key):
            if is_sequence(obj):
                iter_obj = enumerate(obj)
            elif isinstance(obj, compat_collections_abc.Mapping):
                iter_obj = obj.items()
            elif isinstance(obj, compat_re_Match):
                # Index 0 is the whole match, followed by the groups
                iter_obj = enumerate(itertools.chain([obj.group()], obj.groups()))
            elif _traverse_string:
                iter_obj = enumerate(str(obj))
            else:
                return
            # try_call() returns None if the filter raises one of the
            # exceptions it handles, so such items are dropped
            for item in (v for k, v in iter_obj if try_call(key, args=(k, v))):
                yield item

        elif isinstance(key, dict):
            iter_obj = ((k, _traverse_obj(obj, v)) for k, v in key.items())
            # Keys without a result are left out unless a `default` was given
            yield dict((k, v if v is not None else default) for k, v in iter_obj
                       if v is not None or default is not NO_DEFAULT)

        elif isinstance(obj, compat_collections_abc.Mapping):
            # In the case-insensitive case `key` has already been casefolded
            # by apply_path(); an exact match still takes precedence
            yield (obj.get(key) if casesense or (key in obj)
                   else next((v for k, v in obj.items() if casefold(k) == key), None))

        elif isinstance(obj, compat_re_Match):
            if isinstance(key, int) or casesense:
                try:
                    yield obj.group(key)
                    return
                except IndexError:
                    pass
            if not isinstance(key, str):
                return

            # Case-insensitive lookup among the named groups
            yield next((v for k, v in obj.groupdict().items() if casefold(k) == key), None)

        else:
            if _is_user_input:
                # 'N' --> index, 'A:B[:C]' --> slice
                key = (int_or_none(key) if ':' not in key
                       else slice(*map(int_or_none, key.split(':'))))

            if not isinstance(key, (int, slice)):
                return

            if not is_sequence(obj):
                if not _traverse_string:
                    return
                obj = str(obj)

            try:
                yield obj[key]
            except IndexError:
                pass

    def apply_path(start_obj, path):
        # Apply the keys of `path` one after another, feeding the values
        # selected by each key into the next; returns the tuple
        # (has_branched, iterable of results)
        objs = (start_obj,)
        has_branched = False

        for key in variadic(path):
            if _is_user_input and key == ':':
                key = Ellipsis

            if not casesense and isinstance(key, str):
                key = compat_casefold(key)

            if key is Ellipsis or isinstance(key, (list, tuple)) or callable(key):
                has_branched = True

            key_func = functools.partial(apply_key, key)
            objs = from_iterable(map(key_func, objs))

        return has_branched, objs

    def _traverse_obj(obj, path, use_list=True):
        # Resolve one path: a list of all results if the path branched and
        # `get_all` is set, else the first result (None if there is none)
        has_branched, results = apply_path(obj, path)
        results = LazyList(x for x in map(type_test, results) if x is not None)

        if get_all and has_branched:
            return results.exhaust() if results or use_list else None

        return results[0] if results else None

    for index, path in enumerate(paths, 1):
        # An empty list is an acceptable result only for the last path and
        # only if no default was given
        use_list = default is NO_DEFAULT and index == len(paths)
        result = _traverse_obj(obj, path, use_list)
        if result is not None:
            return result

    return None if default is NO_DEFAULT else default
def get_first(obj, keys, **kwargs):
    """Return the first result of traversing `keys` in any of the values of `obj`

    Equivalent to `traverse_obj(obj, (Ellipsis, <keys...>), get_all=False)`;
    further keyword arguments are passed on to `traverse_obj`.
    """
    path = (Ellipsis,) + tuple(variadic(keys))
    return traverse_obj(obj, path, get_all=False, **kwargs)
def join_nonempty(*values, **kwargs):
    """Join the truthy items of `values`, converted to strings, with a delimiter

    Keyword arguments:
    @param delim      The delimiter, by default '-'
    @param from_dict  If given, each value is taken as a path that is looked
                      up in this object with `traverse_obj` first
    """
    # parameter defaults
    delim = kwargs.get('delim', '-')
    from_dict = kwargs.get('from_dict')

    if from_dict is not None:
        values = [traverse_obj(from_dict, variadic(v)) for v in values]
    parts = [compat_str(v) for v in values if v]
    return delim.join(parts)