[extractor/AbemaTVTitle] Implement paging (#4376)

Authored by: Lesmiscore
This commit is contained in:
Lesmiscore 2022-07-18 22:06:54 +09:00 committed by GitHub
parent 8ef5af1942
commit bc83b4b06c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -1,5 +1,6 @@
import base64 import base64
import binascii import binascii
import functools
import hashlib import hashlib
import hmac import hmac
import io import io
@ -20,11 +21,11 @@
decode_base_n, decode_base_n,
int_or_none, int_or_none,
intlist_to_bytes, intlist_to_bytes,
OnDemandPagedList,
request_to_url, request_to_url,
time_seconds, time_seconds,
traverse_obj, traverse_obj,
update_url_query, update_url_query,
urljoin,
) )
# NOTE: network handler related code is temporary thing until network stack overhaul PRs are merged (#2861/#2862) # NOTE: network handler related code is temporary thing until network stack overhaul PRs are merged (#2861/#2862)
@ -145,17 +146,106 @@ def abematv_license_open(self, url):
class AbemaTVBaseIE(InfoExtractor): class AbemaTVBaseIE(InfoExtractor):
_USERTOKEN = None
_DEVICE_ID = None
_MEDIATOKEN = None
_SECRETKEY = b'v+Gjs=25Aw5erR!J8ZuvRrCx*rGswhB&qdHd_SYerEWdU&a?3DzN9BRbp5KwY4hEmcj5#fykMjJ=AuWz5GSMY-d@H7DMEh3M@9n2G552Us$$k9cD=3TxwWe86!x#Zyhe'
@classmethod
def _generate_aks(cls, deviceid):
deviceid = deviceid.encode('utf-8')
# add 1 hour and then drop minute and secs
ts_1hour = int((time_seconds(hours=9) // 3600 + 1) * 3600)
time_struct = time.gmtime(ts_1hour)
ts_1hour_str = str(ts_1hour).encode('utf-8')
tmp = None
def mix_once(nonce):
nonlocal tmp
h = hmac.new(cls._SECRETKEY, digestmod=hashlib.sha256)
h.update(nonce)
tmp = h.digest()
def mix_tmp(count):
nonlocal tmp
for i in range(count):
mix_once(tmp)
def mix_twist(nonce):
nonlocal tmp
mix_once(base64.urlsafe_b64encode(tmp).rstrip(b'=') + nonce)
mix_once(cls._SECRETKEY)
mix_tmp(time_struct.tm_mon)
mix_twist(deviceid)
mix_tmp(time_struct.tm_mday % 5)
mix_twist(ts_1hour_str)
mix_tmp(time_struct.tm_hour % 5)
return base64.urlsafe_b64encode(tmp).rstrip(b'=').decode('utf-8')
def _get_device_token(self):
if self._USERTOKEN:
return self._USERTOKEN
AbemaTVBaseIE._DEVICE_ID = str(uuid.uuid4())
aks = self._generate_aks(self._DEVICE_ID)
user_data = self._download_json(
'https://api.abema.io/v1/users', None, note='Authorizing',
data=json.dumps({
'deviceId': self._DEVICE_ID,
'applicationKeySecret': aks,
}).encode('utf-8'),
headers={
'Content-Type': 'application/json',
})
AbemaTVBaseIE._USERTOKEN = user_data['token']
# don't allow adding it 2 times or more, though it's guarded
remove_opener(self._downloader, AbemaLicenseHandler)
add_opener(self._downloader, AbemaLicenseHandler(self))
return self._USERTOKEN
def _get_media_token(self, invalidate=False, to_show=True):
if not invalidate and self._MEDIATOKEN:
return self._MEDIATOKEN
AbemaTVBaseIE._MEDIATOKEN = self._download_json(
'https://api.abema.io/v1/media/token', None, note='Fetching media token' if to_show else False,
query={
'osName': 'android',
'osVersion': '6.0.1',
'osLang': 'ja_JP',
'osTimezone': 'Asia/Tokyo',
'appId': 'tv.abema',
'appVersion': '3.27.1'
}, headers={
'Authorization': f'bearer {self._get_device_token()}',
})['token']
return self._MEDIATOKEN
def _call_api(self, endpoint, video_id, query=None, note='Downloading JSON metadata'):
return self._download_json(
f'https://api.abema.io/{endpoint}', video_id, query=query or {},
note=note,
headers={
'Authorization': f'bearer {self._get_device_token()}',
})
def _extract_breadcrumb_list(self, webpage, video_id): def _extract_breadcrumb_list(self, webpage, video_id):
for jld in re.finditer( for jld in re.finditer(
r'(?is)</span></li></ul><script[^>]+type=(["\']?)application/ld\+json\1[^>]*>(?P<json_ld>.+?)</script>', r'(?is)</span></li></ul><script[^>]+type=(["\']?)application/ld\+json\1[^>]*>(?P<json_ld>.+?)</script>',
webpage): webpage):
jsonld = self._parse_json(jld.group('json_ld'), video_id, fatal=False) jsonld = self._parse_json(jld.group('json_ld'), video_id, fatal=False)
if jsonld: if traverse_obj(jsonld, '@type') != 'BreadcrumbList':
if jsonld.get('@type') != 'BreadcrumbList':
continue continue
trav = traverse_obj(jsonld, ('itemListElement', ..., 'name')) items = traverse_obj(jsonld, ('itemListElement', ..., 'name'))
if trav: if items:
return trav return items
return [] return []
@ -207,87 +297,7 @@ class AbemaTVIE(AbemaTVBaseIE):
}, },
'skip': 'Not supported until yt-dlp implements native live downloader OR AbemaTV can start a local HTTP server', 'skip': 'Not supported until yt-dlp implements native live downloader OR AbemaTV can start a local HTTP server',
}] }]
_USERTOKEN = None
_DEVICE_ID = None
_TIMETABLE = None _TIMETABLE = None
_MEDIATOKEN = None
_SECRETKEY = b'v+Gjs=25Aw5erR!J8ZuvRrCx*rGswhB&qdHd_SYerEWdU&a?3DzN9BRbp5KwY4hEmcj5#fykMjJ=AuWz5GSMY-d@H7DMEh3M@9n2G552Us$$k9cD=3TxwWe86!x#Zyhe'
def _generate_aks(self, deviceid):
deviceid = deviceid.encode('utf-8')
# add 1 hour and then drop minute and secs
ts_1hour = int((time_seconds(hours=9) // 3600 + 1) * 3600)
time_struct = time.gmtime(ts_1hour)
ts_1hour_str = str(ts_1hour).encode('utf-8')
tmp = None
def mix_once(nonce):
nonlocal tmp
h = hmac.new(self._SECRETKEY, digestmod=hashlib.sha256)
h.update(nonce)
tmp = h.digest()
def mix_tmp(count):
nonlocal tmp
for i in range(count):
mix_once(tmp)
def mix_twist(nonce):
nonlocal tmp
mix_once(base64.urlsafe_b64encode(tmp).rstrip(b'=') + nonce)
mix_once(self._SECRETKEY)
mix_tmp(time_struct.tm_mon)
mix_twist(deviceid)
mix_tmp(time_struct.tm_mday % 5)
mix_twist(ts_1hour_str)
mix_tmp(time_struct.tm_hour % 5)
return base64.urlsafe_b64encode(tmp).rstrip(b'=').decode('utf-8')
def _get_device_token(self):
if self._USERTOKEN:
return self._USERTOKEN
self._DEVICE_ID = str(uuid.uuid4())
aks = self._generate_aks(self._DEVICE_ID)
user_data = self._download_json(
'https://api.abema.io/v1/users', None, note='Authorizing',
data=json.dumps({
'deviceId': self._DEVICE_ID,
'applicationKeySecret': aks,
}).encode('utf-8'),
headers={
'Content-Type': 'application/json',
})
self._USERTOKEN = user_data['token']
# don't allow adding it 2 times or more, though it's guarded
remove_opener(self._downloader, AbemaLicenseHandler)
add_opener(self._downloader, AbemaLicenseHandler(self))
return self._USERTOKEN
def _get_media_token(self, invalidate=False, to_show=True):
if not invalidate and self._MEDIATOKEN:
return self._MEDIATOKEN
self._MEDIATOKEN = self._download_json(
'https://api.abema.io/v1/media/token', None, note='Fetching media token' if to_show else False,
query={
'osName': 'android',
'osVersion': '6.0.1',
'osLang': 'ja_JP',
'osTimezone': 'Asia/Tokyo',
'appId': 'tv.abema',
'appVersion': '3.27.1'
}, headers={
'Authorization': 'bearer ' + self._get_device_token()
})['token']
return self._MEDIATOKEN
def _perform_login(self, username, password): def _perform_login(self, username, password):
if '@' in username: # don't strictly check if it's email address or not if '@' in username: # don't strictly check if it's email address or not
@ -301,13 +311,13 @@ def _perform_login(self, username, password):
method: username, method: username,
'password': password 'password': password
}).encode('utf-8'), headers={ }).encode('utf-8'), headers={
'Authorization': 'bearer ' + self._get_device_token(), 'Authorization': f'bearer {self._get_device_token()}',
'Origin': 'https://abema.tv', 'Origin': 'https://abema.tv',
'Referer': 'https://abema.tv/', 'Referer': 'https://abema.tv/',
'Content-Type': 'application/json', 'Content-Type': 'application/json',
}) })
self._USERTOKEN = login_response['token'] AbemaTVBaseIE._USERTOKEN = login_response['token']
self._get_media_token(True) self._get_media_token(True)
def _real_extract(self, url): def _real_extract(self, url):
@ -442,6 +452,7 @@ def _real_extract(self, url):
class AbemaTVTitleIE(AbemaTVBaseIE): class AbemaTVTitleIE(AbemaTVBaseIE):
_VALID_URL = r'https?://abema\.tv/video/title/(?P<id>[^?/]+)' _VALID_URL = r'https?://abema\.tv/video/title/(?P<id>[^?/]+)'
_PAGE_SIZE = 25
_TESTS = [{ _TESTS = [{
'url': 'https://abema.tv/video/title/90-1597', 'url': 'https://abema.tv/video/title/90-1597',
@ -457,18 +468,39 @@ class AbemaTVTitleIE(AbemaTVBaseIE):
'title': '真心が届く~僕とスターのオフィス・ラブ!?~', 'title': '真心が届く~僕とスターのオフィス・ラブ!?~',
}, },
'playlist_mincount': 16, 'playlist_mincount': 16,
}, {
'url': 'https://abema.tv/video/title/25-102',
'info_dict': {
'id': '25-102',
'title': 'ソードアート・オンライン アリシゼーション',
},
'playlist_mincount': 24,
}] }]
def _fetch_page(self, playlist_id, series_version, page):
programs = self._call_api(
f'v1/video/series/{playlist_id}/programs', playlist_id,
note=f'Downloading page {page + 1}',
query={
'seriesVersion': series_version,
'offset': str(page * self._PAGE_SIZE),
'order': 'seq',
'limit': str(self._PAGE_SIZE),
})
yield from (
self.url_result(f'https://abema.tv/video/episode/{x}')
for x in traverse_obj(programs, ('programs', ..., 'id'), default=[]))
def _entries(self, playlist_id, series_version):
return OnDemandPagedList(
functools.partial(self._fetch_page, playlist_id, series_version),
self._PAGE_SIZE)
def _real_extract(self, url): def _real_extract(self, url):
video_id = self._match_id(url) playlist_id = self._match_id(url)
webpage = self._download_webpage(url, video_id) series_info = self._call_api(f'v1/video/series/{playlist_id}', playlist_id)
playlist_title, breadcrumb = None, self._extract_breadcrumb_list(webpage, video_id) return self.playlist_result(
if breadcrumb: self._entries(playlist_id, series_info['version']), playlist_id=playlist_id,
playlist_title = breadcrumb[-1] playlist_title=series_info.get('title'),
playlist_description=series_info.get('content'))
playlist = [
self.url_result(urljoin('https://abema.tv/', mobj.group(1)))
for mobj in re.finditer(r'<li\s*class=".+?EpisodeList.+?"><a\s*href="(/[^"]+?)"', webpage)]
return self.playlist_result(playlist, playlist_title=playlist_title, playlist_id=video_id)