[youtube] Fix feed extraction

In order to extract videos from further pages, we need to get various variables that are in an argument to the `ytcfg.set` call in a script on the feed page.
This commit is contained in:
xarantolus 2020-07-30 16:34:48 +02:00
parent de722d3cd7
commit c449f70965

View file

@ -3309,6 +3309,8 @@ class YoutubeFeedsInfoExtractor(YoutubeBaseInfoExtractor):
Subclasses must define the _FEED_NAME and _PLAYLIST_TITLE properties. Subclasses must define the _FEED_NAME and _PLAYLIST_TITLE properties.
""" """
_LOGIN_REQUIRED = True _LOGIN_REQUIRED = True
_FEED_DATA = r'window\[\"ytInitialData\"\]\W?=\W?({.*?});'
_YTCFG_DATA = r"ytcfg.set\(({.*?})\)"
@property @property
def IE_NAME(self): def IE_NAME(self):
@ -3317,37 +3319,91 @@ def IE_NAME(self):
def _real_initialize(self): def _real_initialize(self):
self._login() self._login()
def _find_videos_in_json(self, extracted):
videos = []
continuation = None
def _real_find(obj):
if obj is None or isinstance(obj, str):
return
if type(obj) is list:
for elem in obj:
_real_find(elem)
if type(obj) is dict:
if "videoId" in obj:
videos.append(obj)
return
if "nextContinuationData" in obj:
nonlocal continuation
continuation = obj["nextContinuationData"]
return
for _, o in obj.items():
_real_find(o)
_real_find(extracted)
return videos, continuation
def _entries(self, page): def _entries(self, page):
# The extraction process is the same as for playlists, but the regex info = []
# for the video ids doesn't contain an index
ids = [] yt_conf = self._parse_json(self._search_regex(self._YTCFG_DATA, page, 'ytcfg.set'), None)
more_widget_html = content_html = page
search_response = self._parse_json(self._search_regex(self._FEED_DATA, page, 'ytInitialData'), None)
for page_num in itertools.count(1): for page_num in itertools.count(1):
matches = re.findall(r'href="\s*/watch\?v=([0-9A-Za-z_-]{11})', content_html) video_info, continuation = self._find_videos_in_json(search_response)
# 'recommended' feed has infinite 'load more' and each new portion spins new_info = []
# the same videos in (sometimes) slightly different order, so we'll check
# for unicity and break when portion has no new videos for v in video_info:
new_ids = list(filter(lambda video_id: video_id not in ids, orderedSet(matches))) v_id = try_get(v, lambda x: x['videoId'])
if not new_ids: if not v_id:
continue
have_video = False
for old in info:
if old['videoId'] == v_id:
have_video = True
break break
ids.extend(new_ids) if not have_video:
new_info.append(v)
for entry in self._ids_to_results(new_ids): if not new_info:
yield entry
mobj = re.search(r'data-uix-load-more-href="/?(?P<more>[^"]+)"', more_widget_html)
if not mobj:
break break
more = self._download_json( info.extend(new_info)
'https://www.youtube.com/%s' % mobj.group('more'), self._PLAYLIST_TITLE,
for video in new_info:
yield self.url_result(try_get(video, lambda x: x['videoId']), YoutubeIE.ie_key(), video_title=try_get(video, lambda x: x['title']['simpleText']))
if not continuation:
break
search_response = self._download_json(
'https://www.youtube.com/browse_ajax', self._PLAYLIST_TITLE,
'Downloading page #%s' % page_num, 'Downloading page #%s' % page_num,
transform_source=uppercase_escape, transform_source=uppercase_escape,
headers=self._YOUTUBE_CLIENT_HEADERS) query={
content_html = more['content_html'] "ctoken": try_get(continuation, lambda x: x["continuation"]),
more_widget_html = more['load_more_widget_html'] "continuation": try_get(continuation, lambda x: x["continuation"]),
"itct": try_get(continuation, lambda x: x["clickTrackingParams"])
},
headers={
"X-YouTube-Client-Name": try_get(yt_conf, lambda x: x["INNERTUBE_CONTEXT_CLIENT_NAME"]),
"X-YouTube-Client-Version": try_get(yt_conf, lambda x: x["INNERTUBE_CONTEXT_CLIENT_VERSION"]),
"X-Youtube-Identity-Token": try_get(yt_conf, lambda x: x["ID_TOKEN"]),
"X-YouTube-Device": try_get(yt_conf, lambda x: x["DEVICE"]),
"X-YouTube-Page-CL": try_get(yt_conf, lambda x: x["PAGE_CL"]),
"X-YouTube-Page-Label": try_get(yt_conf, lambda x: x["PAGE_BUILD_LABEL"]),
"X-YouTube-Variants-Checksum": try_get(yt_conf, lambda x: x["VARIANTS_CHECKSUM"]),
})
def _real_extract(self, url): def _real_extract(self, url):
page = self._download_webpage( page = self._download_webpage(