Support ITVX

This commit is contained in:
dirkf 2023-01-25 17:58:29 +00:00 committed by GitHub
commit 5aae4eec82
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -2,81 +2,37 @@
from __future__ import unicode_literals
import json
import re
from .common import InfoExtractor
from .brightcove import BrightcoveNewIE
from ..compat import (
compat_HTTPError,
compat_kwargs,
compat_str,
)
from ..utils import ( # noqa: F401
base_url,
from ..utils import (
clean_html,
determine_ext,
extract_attributes,
ExtractorError,
get_element_by_class,
JSON_LD_RE,
get_element_by_attribute,
int_or_none,
merge_dicts,
parse_duration,
parse_iso8601,
smuggle_url,
try_get,
std_headers,
strip_or_none,
traverse_obj,
url_or_none,
url_basename,
urljoin,
)
class ITVBaseIE(InfoExtractor):
# enforce default UA that ITV doesn't block
_VANILLA_UA = 'Mozilla/5.0'
def _request_webpage(self, *args, **kwargs):
headers = kwargs.get('headers', {})
if 'User-Agent' not in headers:
headers['User-Agent'] = self._VANILLA_UA
kwargs.update({'headers': headers, })
kwargs = compat_kwargs(kwargs)
return super(ITVBaseIE, self)._request_webpage(*args, **kwargs)
class ITVIE(ITVBaseIE):
_VALID_URL = r'https?://(?:www\.)?itv\.com/hub/[^/]+/(?P<id>[0-9a-zA-Z]+)'
class ITVIE(InfoExtractor):
_VALID_URL = r'https?://(?:www\.)?itv\.com/(?:(?P<w>watch)|hub)/[^/]+/(?(w)[\w-]+/)(?P<id>\w+)'
_GEO_BYPASS = False
_GEO_COUNTRIES = ['GB']
_IE_DESC = 'ITVX'
_TESTS = [{
'url': 'https://www.itv.com/hub/plebs/2a1873a0002',
'info_dict': {
'id': '2a1873a0002',
'ext': 'mp4',
'title': 'Plebs - The Orgy',
'description': 'md5:4d7159af53ebd5b36e8b3ec82a41fdb4',
'series': 'Plebs',
'season_number': 1,
'episode_number': 1,
'thumbnail': r're:https?://hubimages\.itv\.com/episode/2_1873_0002'
},
'params': {
# m3u8 download
'skip_download': True,
},
}, {
'url': 'https://www.itv.com/hub/the-jonathan-ross-show/2a1166a0209',
'info_dict': {
'id': '2a1166a0209',
'ext': 'mp4',
'title': 'The Jonathan Ross Show - Series 17 - Episode 8',
'description': 'md5:3023dcdd375db1bc9967186cdb3f1399',
'series': 'The Jonathan Ross Show',
'episode_number': 8,
'season_number': 17,
'thumbnail': r're:https?://hubimages\.itv\.com/episode/2(?:_\d{4}){2}'
},
'params': {
# m3u8 download
'skip_download': True,
},
# while it redirects to ITVX
'url': 'https://www.itv.com/hub/liar/2a4547a0012',
'only_matching': True,
}, {
# unavailable via data-playlist-url
'url': 'https://www.itv.com/hub/through-the-keyhole/2a2271a0033',
@ -89,209 +45,232 @@ class ITVIE(ITVBaseIE):
# ContentUnavailable
'url': 'https://www.itv.com/hub/whos-doing-the-dishes/2a2898a0024',
'only_matching': True,
}]
}, {
'url': 'https://www.itv.com/watch/vera/1a7314/1a7314a0014',
'md5': 'bd0ad666b2c058fffe7d036785880064',
'info_dict': {
'id': '1a7314a0014',
'ext': 'mp4',
'title': 'Vera - Series 3 - Episode 4 - Prodigal Son',
'description': 'Vera and her team investigate the fatal stabbing of an ex-Met police officer outside a busy Newcastle nightclub - but there aren\'t many clues.',
'timestamp': 1653591600,
'upload_date': '20220526',
'uploader': 'ITVX',
'thumbnail': r're:https://\w+\.itv\.com/images/(?:\w+/)+\d+x\d+\?',
'duration': 5340.8,
'age_limit': 16,
'series': 'Vera',
'series_number': 3,
'episode': 'Prodigal Son',
'episode_number': 4,
'channel': 'ITV3',
'categories': list,
},
'params': {
# m3u8 download
'skip_download': True,
},
}, {
# Latest ITV news bulletin: details change daily
'url': 'https://www.itv.com/watch/news/varies-but-is-not-checked/6js5d0f',
'info_dict': {
'id': '6js5d0f',
'ext': 'mp4',
'title': r're:The latest ITV News headlines - \S.+',
'description': r'''re:.* today's top stories from the ITV News team.$''',
'timestamp': int,
'upload_date': r're:2\d\d\d(?:0[1-9]|1[0-2])(?:[012][1-9]|3[01])',
'uploader': 'ITVX',
'thumbnail': r're:https://images\.ctfassets\.net/(?:\w+/)+[\w.]+\.(?:jpg|png)',
'duration': float,
'age_limit': None,
},
'params': {
# variable download
'skip_download': True,
},
}
]
def _generate_api_headers(self, hmac):
return merge_dicts({
def _og_extract(self, webpage, require_title=False):
return {
'title': self._og_search_title(webpage, fatal=require_title),
'description': self._og_search_description(webpage, default=None),
'thumbnail': self._og_search_thumbnail(webpage, default=None),
'uploader': self._og_search_property('site_name', webpage, default=None),
}
def _search_nextjs_data(self, webpage, video_id, **kw):
transform_source = kw.pop('transform_source', None)
fatal = kw.pop('fatal', True)
return self._parse_json(
self._search_regex(
r'''<script\b[^>]+\bid=('|")__NEXT_DATA__\1[^>]*>(?P<js>[^<]+)</script>''',
webpage, 'next.js data', group='js', fatal=fatal, **kw),
video_id, transform_source=transform_source, fatal=fatal)
def _real_extract(self, url):
video_id = self._match_id(url)
params = self._downloader.params
v_headers = {}
if (
'user_agent' not in params
and not any(re.match(r'(?i)user-agent\s*:', h)
for h in (params.get('headers') or []))):
v_headers['User-agent'] = 'Mozilla/5.0'
webpage = self._download_webpage(url, video_id, headers=v_headers)
# now quite different params!
params = extract_attributes(self._search_regex(
r'''(<[^>]+\b(?:class|data-testid)\s*=\s*("|')genie-container\2[^>]*>)''',
webpage, 'params'))
ios_playlist_url = traverse_obj(
params, 'data-video-id', 'data-video-playlist',
get_all=False, expected_type=url_or_none)
headers = self.geo_verification_headers()
headers.update({
'Accept': 'application/vnd.itv.vod.playlist.v2+json',
'Content-Type': 'application/json',
'hmac': hmac.upper(),
}, self.geo_verification_headers())
def _call_api(self, video_id, playlist_url, headers, platform_tag, featureset, fatal=True):
return self._download_json(
playlist_url, video_id, data=json.dumps({
})
ios_playlist = self._download_json(
ios_playlist_url, video_id, data=json.dumps({
'user': {
'itvUserId': '',
'entitlements': [],
'token': ''
},
'device': {
'manufacturer': 'Safari',
'model': '5',
'manufacturer': 'Mobile Safari',
'model': '5.1',
'os': {
'name': 'Windows NT',
'version': '6.1',
'type': 'desktop'
'name': 'iOS',
'version': '5.0',
'type': ' mobile'
}
},
'client': {
'version': '4.1',
'id': 'browser'
'id': 'browser',
'supportsAdPods': True,
'service': 'itv.x',
'appversion': '2.43.28',
},
'variantAvailability': {
'player': 'hls',
'featureset': {
'min': featureset,
'max': featureset
'min': ['hls', 'aes', 'outband-webvtt'],
'max': ['hls', 'aes', 'outband-webvtt']
},
'platformTag': platform_tag
'platformTag': 'mobile'
}
}).encode(), headers=headers, fatal=fatal)
}).encode(), headers=headers)
video_data = ios_playlist['Playlist']['Video']
ios_base_url = traverse_obj(video_data, 'Base', expected_type=url_or_none)
def _get_subtitles(self, video_id, variants, ios_playlist_url, headers, *args, **kwargs):
subtitles = {}
# Prefer last matching featureset
# See: https://github.com/yt-dlp/yt-dlp/issues/986
platform_tag_subs, featureset_subs = next(
((platform_tag, featureset)
for platform_tag, featuresets in reversed(tuple(variants.items())) for featureset in featuresets
if try_get(featureset, lambda x: x[2]) == 'outband-webvtt'),
(None, None))
media_url = (
(lambda u: url_or_none(urljoin(ios_base_url, u)))
if ios_base_url else url_or_none)
if platform_tag_subs and featureset_subs:
subs_playlist = self._call_api(
video_id, ios_playlist_url, headers, platform_tag_subs, featureset_subs, fatal=False)
subs = try_get(subs_playlist, lambda x: x['Playlist']['Video']['Subtitles'], list) or []
for sub in subs:
if not isinstance(sub, dict):
continue
href = url_or_none(sub.get('Href'))
if not href:
continue
subtitles.setdefault('en', []).append({'url': href})
return subtitles
def _real_extract(self, url):
video_id = self._match_id(url)
webpage, urlh = self._download_webpage_handle(url, video_id, expected_status=404)
title = (
self._html_search_meta(['og:title', 'twitter:title'], webpage)
or self._html_search(r'(?s)<title\b[^>]*>(.+?)(?:-\s+ITV\s+Hub\s*)?</title\b', 'title', webpage))
if any(sorry in title for sorry in ('Episode not available', "We're really sorry")):
raise ExtractorError(
'%s not found; %s said: %s' % (video_id, self.IE_NAME, title),
expected=True)
if urlh.getcode() == 404:
raise compat_HTTPError(
urlh.geturl(), 404, '%s (%s: %s)' % (urlh.msg or 'Not Found', video_id, title, ), urlh.headers, None)
params = extract_attributes(self._search_regex(
r'(?s)(<[^>]+id="video"[^>]*>)', webpage, 'params'))
variants = self._parse_json(
try_get(params, lambda x: x['data-video-variants'], compat_str) or '{}',
video_id, fatal=False)
# Prefer last matching featureset
# See: https://github.com/yt-dlp/yt-dlp/issues/986
platform_tag_video, featureset_video = next(
((platform_tag, featureset)
for platform_tag, featuresets in reversed(tuple(variants.items())) for featureset in featuresets
if set(try_get(featureset, lambda x: x[:2]) or {}) == {'aes', 'hls'}),
(None, None))
if not platform_tag_video or not featureset_video:
raise ExtractorError('No downloads available', expected=True, video_id=video_id)
ios_playlist_url = params.get('data-video-playlist') or params['data-video-id']
headers = self._generate_api_headers(params['data-video-hmac'])
ios_playlist = self._call_api(
video_id, ios_playlist_url, headers, platform_tag_video, featureset_video)
video_data = try_get(ios_playlist, lambda x: x['Playlist']['Video'], dict) or {}
ios_base_url = video_data.get('Base')
formats = []
for media_file in (video_data.get('MediaFiles') or []):
href = media_file.get('Href')
for media_file in traverse_obj(video_data, 'MediaFiles', expected_type=list) or []:
href = traverse_obj(media_file, 'Href', expected_type=media_url)
if not href:
continue
if ios_base_url:
href = ios_base_url + href
ext = determine_ext(href)
if ext == 'm3u8':
formats.extend(self._extract_m3u8_formats(
href, video_id, 'mp4', entry_protocol='m3u8_native',
href, video_id, 'mp4', entry_protocol='m3u8',
m3u8_id='hls', fatal=False))
else:
formats.append({
'url': href,
})
self._sort_formats(formats)
info = self._search_json_ld(webpage, video_id, default={})
if not info:
json_ld = self._parse_json(self._search_regex(
JSON_LD_RE, webpage, 'JSON-LD', '{}',
group='json_ld'), video_id, fatal=False)
if json_ld and json_ld.get('@type') == 'BreadcrumbList':
for ile in (json_ld.get('itemListElement:') or []):
item = ile.get('item:') or {}
if item.get('@type') == 'TVEpisode':
item['@context'] = 'http://schema.org'
info = self._json_ld(item, video_id, fatal=False) or {}
break
thumbnails = []
thumbnail_url = try_get(params, lambda x: x['data-video-posterframe'], compat_str)
if thumbnail_url:
thumbnails.extend([{
'url': thumbnail_url.format(width=1920, height=1080, quality=100, blur=0, bg='false'),
'width': 1920,
'height': 1080,
}, {
'url': urljoin(base_url(thumbnail_url), url_basename(thumbnail_url)),
'preference': -2
}])
thumbnail_url = self._html_search_meta(['og:image', 'twitter:image'], webpage, default=None)
if thumbnail_url:
thumbnails.append({
'url': thumbnail_url,
subtitles = {}
for sub in traverse_obj(video_data, 'Subtitles', expected_type=list) or []:
href = traverse_obj(sub, 'Href', expected_type=url_or_none)
if not href:
continue
subtitles.setdefault('en', []).append({
'url': href,
'ext': determine_ext(href, 'vtt'),
})
self._remove_duplicate_formats(thumbnails)
# TODO: remove this once utils.py is updated
def parse_duration(s): # noqa: F811
from re import sub
from ..utils import parse_duration as utils_parse_duration
return utils_parse_duration(
sub(r':(\d{3,})$', r'.\1', s or '') or None)
next_data = self._search_nextjs_data(webpage, video_id, fatal=False, default='{}')
video_data.update(traverse_obj(next_data, ('props', 'pageProps', ('title', 'episode')), expected_type=dict)[0] or {})
title = traverse_obj(video_data, 'headerTitle', 'episodeTitle')
info = self._og_extract(webpage, require_title=not title)
tn = info.pop('thumbnail', None)
if tn:
info['thumbnails'] = [{'url': tn}]
# num. episode title
num_ep_title = video_data.get('numberedEpisodeTitle')
if not num_ep_title:
num_ep_title = clean_html(get_element_by_attribute('data-testid', 'episode-hero-description-strong', webpage))
num_ep_title = num_ep_title and num_ep_title.rstrip(' -')
ep_title = strip_or_none(
video_data.get('episodeTitle')
or (num_ep_title.split('.', 1)[-1] if num_ep_title else None))
title = title or re.sub(r'\s+-\s+ITVX$', '', info['title'])
if ep_title and ep_title != title:
title = title + ' - ' + ep_title
def get_thumbnails():
tns = []
for w, x in (traverse_obj(video_data, ('imagePresets'), expected_type=dict) or {}).items():
if isinstance(x, dict):
for y, z in x.items():
tns.append({'id': w + '_' + y, 'url': z})
return tns or None
video_str = lambda *x: traverse_obj(
video_data, *x, get_all=False, expected_type=strip_or_none)
return merge_dicts({
'id': video_id,
'title': title,
'formats': formats,
'subtitles': self.extract_subtitles(video_id, variants, ios_playlist_url, headers),
'duration': parse_duration(video_data.get('Duration')),
'description': clean_html(get_element_by_class('episode-info__synopsis', webpage)),
'thumbnails': thumbnails,
'http_headers': {'User-Agent': self._VANILLA_UA, },
'subtitles': subtitles,
# parsing hh:mm:ss:nnn not yet patched
'duration': parse_duration(re.sub(r'(\d{2})(:)(\d{3}$)', r'\1.\3', video_data.get('Duration') or '')),
'description': video_str('synopsis'),
'timestamp': traverse_obj(video_data, 'broadcastDateTime', 'dateTime', expected_type=parse_iso8601),
'thumbnails': get_thumbnails(),
'series': video_str('showTitle', 'programmeTitle'),
'series_number': int_or_none(video_data.get('seriesNumber')),
'episode': ep_title,
'episode_number': int_or_none((num_ep_title or '').split('.')[0]),
'channel': video_str('channel'),
'categories': traverse_obj(video_data, ('categories', 'formatted'), expected_type=list),
'age_limit': {False: 16, True: 0}.get(video_data.get('isChildrenCategory')),
}, info)
class ITVBTCCIE(ITVBaseIE):
_VALID_URL = r'https?://(?:www\.)?itv\.com/(?:news|btcc)/(?:[^/]+/)*(?P<id>[^/?#&]+)'
_TESTS = [{
'url': 'https://www.itv.com/btcc/articles/btcc-2019-brands-hatch-gp-race-action',
class ITVBTCCIE(InfoExtractor):
_VALID_URL = r'https?://(?:www\.)?itv\.com/btcc/(?:[^/]+/)*(?P<id>[^/?#&]+)'
_TEST = {
'url': 'http://www.itv.com/btcc/races/btcc-2018-all-the-action-from-brands-hatch',
'info_dict': {
'id': 'btcc-2019-brands-hatch-gp-race-action',
'title': 'BTCC 2019: Brands Hatch GP race action',
'id': 'btcc-2018-all-the-action-from-brands-hatch',
'title': 'BTCC 2018: All the action from Brands Hatch',
},
'playlist_count': 12,
}, {
'url': 'https://www.itv.com/news/2021-10-27/i-have-to-protect-the-country-says-rishi-sunak-as-uk-faces-interest-rate-hike',
'info_dict': {
'id': 'i-have-to-protect-the-country-says-rishi-sunak-as-uk-faces-interest-rate-hike',
'title': 'md5:6ef054dd9f069330db3dcc66cb772d32'
},
'playlist_count': 4
}]
BRIGHTCOVE_URL_TEMPLATE = 'http://players.brightcove.net/%s/%s_default/index.html?videoId=%s'
'playlist_mincount': 9,
}
BRIGHTCOVE_URL_TEMPLATE = 'http://players.brightcove.net/1582188683001/HkiHLnNRx_default/index.html?videoId=%s'
def _real_extract(self, url):
playlist_id = self._match_id(url)
webpage = self._download_webpage(url, playlist_id)
json_map = try_get(self._parse_json(self._html_search_regex(
'(?s)<script[^>]+id=[\'"]__NEXT_DATA__[^>]*>([^<]+)</script>', webpage, 'json_map'), playlist_id),
lambda x: x['props']['pageProps']['article']['body']['content']) or []
entries = []
for video in json_map:
if not any(video['data'].get(attr) == 'Brightcove' for attr in ('name', 'type')):
continue
video_id = video['data']['id']
account_id = video['data']['accountId']
player_id = video['data']['playerId']
entries.append(self.url_result(
smuggle_url(self.BRIGHTCOVE_URL_TEMPLATE % (account_id, player_id, video_id), {
entries = [
self.url_result(
smuggle_url(self.BRIGHTCOVE_URL_TEMPLATE % video_id, {
# ITV does not like some GB IP ranges, so here are some
# IP blocks it accepts
'geo_ip_blocks': [
@ -299,7 +278,8 @@ class ITVBTCCIE(ITVBaseIE):
],
'referrer': url,
}),
ie=BrightcoveNewIE.ie_key(), video_id=video_id))
ie=BrightcoveNewIE.ie_key(), video_id=video_id)
for video_id in re.findall(r'data-video-id=["\'](\d+)', webpage)]
title = self._og_search_title(webpage, fatal=False)