handle KeyError: 'frameworkUpdates' when the old comment format is served

This commit is contained in:
jakeogh 2024-04-23 14:53:00 -07:00
parent 16cb4fedbe
commit 6083596d50

View file

@ -3353,6 +3353,55 @@ def _extract_comment(self, view_model, entity, parent=None):
return info
def _extract_comment_old(self, comment_renderer, parent=None):
comment_id = comment_renderer.get('commentId')
if not comment_id:
return
info = {
'id': comment_id,
'text': self._get_text(comment_renderer, 'contentText'),
'like_count': self._get_count(comment_renderer, 'voteCount'),
'author_id': traverse_obj(comment_renderer, ('authorEndpoint', 'browseEndpoint', 'browseId', {self.ucid_or_none})),
'author': self._get_text(comment_renderer, 'authorText'),
'author_thumbnail': traverse_obj(comment_renderer, ('authorThumbnail', 'thumbnails', -1, 'url', {url_or_none})),
'parent': parent or 'root',
}
# Timestamp is an estimate calculated from the current time and time_text
time_text = self._get_text(comment_renderer, 'publishedTimeText') or ''
timestamp = self._parse_time_text(time_text)
info.update({
# FIXME: non-standard, but we need a way of showing that it is an estimate.
'_time_text': time_text,
'timestamp': timestamp,
})
info['author_url'] = urljoin(
'https://www.youtube.com', traverse_obj(comment_renderer, ('authorEndpoint', (
('browseEndpoint', 'canonicalBaseUrl'), ('commandMetadata', 'webCommandMetadata', 'url'))),
expected_type=str, get_all=False))
author_is_uploader = traverse_obj(comment_renderer, 'authorIsChannelOwner')
if author_is_uploader is not None:
info['author_is_uploader'] = author_is_uploader
comment_abr = traverse_obj(
comment_renderer, ('actionButtons', 'commentActionButtonsRenderer'), expected_type=dict)
if comment_abr is not None:
info['is_favorited'] = 'creatorHeart' in comment_abr
badges = self._extract_badges([traverse_obj(comment_renderer, 'authorCommentBadge')])
if self._has_badge(badges, BadgeType.VERIFIED):
info['author_is_verified'] = True
is_pinned = traverse_obj(comment_renderer, 'pinnedCommentBadge')
if is_pinned:
info['is_pinned'] = True
return info
def _comment_entries(self, root_continuation_data, ytcfg, video_id, parent=None, tracker=None):
get_single_config_arg = lambda c: self._configuration_arg(c, [''])[0]
@ -3392,16 +3441,30 @@ def extract_thread(contents, entity_payloads):
if not parent and tracker['total_parent_comments'] >= max_parents:
yield
comment_thread_renderer = try_get(content, lambda x: x['commentThreadRenderer'])
view_model = traverse_obj(comment_thread_renderer, ('commentViewModel', 'commentViewModel'))
if not view_model:
view_model = content.get('commentViewModel')
if not view_model:
continue
comment_id = view_model['commentId']
for entity in entity_payloads:
if traverse_obj(entity, ('payload', 'commentEntityPayload', 'properties', 'commentId')) == comment_id:
entity = entity
break
# old comment format
if entity_payloads is None:
comment_renderer = get_first(
(comment_thread_renderer, content), [['commentRenderer', ('comment', 'commentRenderer')]],
expected_type=dict, default={})
comment = self._extract_comment_old(comment_renderer, parent)
if not comment:
continue
comment_id = comment['id']
# new comment format
else:
view_model = traverse_obj(comment_thread_renderer, ('commentViewModel', 'commentViewModel'))
if not view_model:
view_model = content.get('commentViewModel')
if not view_model:
continue
comment_id = view_model['commentId']
for entity in entity_payloads:
if traverse_obj(entity, ('payload', 'commentEntityPayload', 'properties', 'commentId')) == comment_id:
entity = entity
break
comment = self._extract_comment(view_model, entity, parent)
if comment.get('is_pinned'):
@ -3528,10 +3591,16 @@ def extract_thread(contents, entity_payloads):
break
continue
for entry in extract_thread(continuation_items, response['frameworkUpdates']['entityBatchUpdate']['mutations']):
if 'frameworkUpdates' in response:
_iterator = extract_thread(continuation_items, response['frameworkUpdates']['entityBatchUpdate']['mutations'])
else:
_iterator = extract_thread(continuation_items, None)
for entry in _iterator:
if not entry:
return
yield entry
continuation = self._extract_continuation({'contents': continuation_items})
if continuation:
break