From e3e606de12ea138825754290542559b888f72bb5 Mon Sep 17 00:00:00 2001 From: Pritam Das <49360491+pritam20ps05@users.noreply.github.com> Date: Fri, 15 Jul 2022 22:14:43 +0530 Subject: [PATCH] [extractor/instagram] Fix post/story extractors (#4074) Closes #4343, #3077, #2736, #3002 Authored by: pritam20ps05, pukkandan --- yt_dlp/extractor/instagram.py | 185 ++++++++++++++++++---------------- 1 file changed, 99 insertions(+), 86 deletions(-) diff --git a/yt_dlp/extractor/instagram.py b/yt_dlp/extractor/instagram.py index 5a824b5003..04afacb904 100644 --- a/yt_dlp/extractor/instagram.py +++ b/yt_dlp/extractor/instagram.py @@ -1,17 +1,17 @@ -import itertools import hashlib +import itertools import json import re import time +import urllib.error from .common import InfoExtractor -from ..compat import ( - compat_HTTPError, -) from ..utils import ( ExtractorError, - format_field, + decode_base_n, + encode_base_n, float_or_none, + format_field, get_element_by_attribute, int_or_none, lowercase_escape, @@ -22,6 +22,18 @@ urlencode_postdata, ) +_ENCODING_CHARS = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_' + + +def _pk_to_id(id): + """Source: https://stackoverflow.com/questions/24437823/getting-instagram-post-url-from-media-id""" + return encode_base_n(int(id.split('_')[0]), table=_ENCODING_CHARS) + + +def _id_to_pk(shortcode): + """Convert a shortcode to a numeric value""" + return decode_base_n(shortcode[:11], table=_ENCODING_CHARS) + class InstagramBaseIE(InfoExtractor): _NETRC_MACHINE = 'instagram' @@ -156,6 +168,15 @@ def _extract_product(self, product_info): if isinstance(product_info, list): product_info = product_info[0] + comment_data = traverse_obj(product_info, ('edge_media_to_parent_comment', 'edges')) + comments = [{ + 'author': traverse_obj(comment_dict, ('node', 'owner', 'username')), + 'author_id': traverse_obj(comment_dict, ('node', 'owner', 'id')), + 'id': traverse_obj(comment_dict, ('node', 'id')), + 'text': traverse_obj(comment_dict, 
('node', 'text')), + 'timestamp': traverse_obj(comment_dict, ('node', 'created_at'), expected_type=int_or_none), + } for comment_dict in comment_data] if comment_data else None + user_info = product_info.get('user') or {} info_dict = { 'id': product_info.get('code') or product_info.get('id'), @@ -168,6 +189,7 @@ def _extract_product(self, product_info): 'view_count': int_or_none(product_info.get('view_count')), 'like_count': int_or_none(product_info.get('like_count')), 'comment_count': int_or_none(product_info.get('comment_count')), + 'comments': comments, 'http_headers': { 'Referer': 'https://www.instagram.com/', } @@ -214,23 +236,9 @@ class InstagramIOSIE(InfoExtractor): 'add_ie': ['Instagram'] }] - def _get_id(self, id): - """Source: https://stackoverflow.com/questions/24437823/getting-instagram-post-url-from-media-id""" - chrs = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_' - media_id = int(id.split('_')[0]) - shortened_id = '' - while media_id > 0: - r = media_id % 64 - media_id = (media_id - r) // 64 - shortened_id = chrs[r] + shortened_id - return shortened_id - def _real_extract(self, url): - return { - '_type': 'url_transparent', - 'url': f'http://instagram.com/tv/{self._get_id(self._match_id(url))}/', - 'ie_key': 'Instagram', - } + video_id = _pk_to_id(self._match_id(url)) + return self.url_result(f'http://instagram.com/tv/{video_id}', InstagramIE, video_id) class InstagramIE(InstagramBaseIE): @@ -358,39 +366,49 @@ def _extract_embed_url(webpage): def _real_extract(self, url): video_id, url = self._match_valid_url(url).group('id', 'url') - webpage, urlh = self._download_webpage_handle(url, video_id) - if 'www.instagram.com/accounts/login' in urlh.geturl(): - self.report_warning('Main webpage is locked behind the login page. 
' - 'Retrying with embed webpage (Note that some metadata might be missing)') - webpage = self._download_webpage( - 'https://www.instagram.com/p/%s/embed/' % video_id, video_id, note='Downloading embed webpage') - - shared_data = self._parse_json( - self._search_regex( - r'window\._sharedData\s*=\s*({.+?});', - webpage, 'shared data', default='{}'), - video_id, fatal=False) - media = traverse_obj( - shared_data, - ('entry_data', 'PostPage', 0, 'graphql', 'shortcode_media'), - ('entry_data', 'PostPage', 0, 'media'), - expected_type=dict) - - # _sharedData.entry_data.PostPage is empty when authenticated (see - # https://github.com/ytdl-org/youtube-dl/pull/22880) + general_info = self._download_json( + f'https://www.instagram.com/graphql/query/?query_hash=9f8827793ef34641b2fb195d4d41151c' + f'&variables=%7B"shortcode":"{video_id}",' + '"parent_comment_count":10,"has_threaded_comments":true}', video_id, fatal=False, errnote=False, + headers={ + 'Accept': '*', + 'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36', + 'Authority': 'www.instagram.com', + 'Referer': 'https://www.instagram.com', + 'x-ig-app-id': '936619743392459', + }) + media = traverse_obj(general_info, ('data', 'shortcode_media')) or {} if not media: - additional_data = self._parse_json( - self._search_regex( - r'window\.__additionalDataLoaded\s*\(\s*[^,]+,\s*({.+?})\s*\);', - webpage, 'additional data', default='{}'), - video_id, fatal=False) - product_item = traverse_obj(additional_data, ('items', 0), expected_type=dict) - if product_item: - return self._extract_product(product_item) - media = traverse_obj(additional_data, ('graphql', 'shortcode_media'), 'shortcode_media', expected_type=dict) or {} + self.report_warning('General metadata extraction failed', video_id) - if not media and 'www.instagram.com/accounts/login' in urlh.geturl(): - self.raise_login_required('You need to log in to access this content') + info = self._download_json( 
+ f'https://i.instagram.com/api/v1/media/{_id_to_pk(video_id)}/info/', video_id, + fatal=False, note='Downloading video info', errnote=False, headers={ + 'Accept': '*', + 'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36', + 'Authority': 'www.instagram.com', + 'Referer': 'https://www.instagram.com', + 'x-ig-app-id': '936619743392459', + }) + if info: + media.update(info['items'][0]) + return self._extract_product(media) + + webpage = self._download_webpage( + f'https://www.instagram.com/p/{video_id}/embed/', video_id, + note='Downloading embed webpage', fatal=False) + if not webpage: + self.raise_login_required('Requested content was not found, the content might be private') + + additional_data = self._search_json( + r'window\.__additionalDataLoaded\s*\(\s*[^,]+,\s*', webpage, 'additional data', video_id, fatal=False) + product_item = traverse_obj(additional_data, ('items', 0), expected_type=dict) + if product_item: + media.update(product_item) + return self._extract_product(media) + + media.update(traverse_obj( + additional_data, ('graphql', 'shortcode_media'), 'shortcode_media', expected_type=dict) or {}) username = traverse_obj(media, ('owner', 'username')) or self._search_regex( r'"owner"\s*:\s*{\s*"username"\s*:\s*"(.+?)"', webpage, 'username', fatal=False) @@ -519,7 +537,7 @@ def _extract_graphql(self, data, url): except ExtractorError as e: # if it's an error caused by a bad query, and there are # more GIS templates to try, ignore it and keep trying - if isinstance(e.cause, compat_HTTPError) and e.cause.code == 403: + if isinstance(e.cause, urllib.error.HTTPError) and e.cause.code == 403: if gis_tmpl != gis_tmpls[-1]: continue raise @@ -629,41 +647,36 @@ class InstagramStoryIE(InstagramBaseIE): def _real_extract(self, url): username, story_id = self._match_valid_url(url).groups() - - story_info_url = f'{username}/{story_id}/?__a=1' if username == 'highlights' else f'{username}/?__a=1' - 
story_info = self._download_json(f'https://www.instagram.com/stories/{story_info_url}', story_id, headers={ - 'X-IG-App-ID': 936619743392459, - 'X-ASBD-ID': 198387, - 'X-IG-WWW-Claim': 0, - 'X-Requested-With': 'XMLHttpRequest', - 'Referer': url, - }) - user_id = story_info['user']['id'] - highlight_title = traverse_obj(story_info, ('highlight', 'title')) + story_info = self._download_webpage(url, story_id) + user_info = self._search_json(r'"user":', story_info, 'user info', story_id, fatal=False) + if not user_info: + self.raise_login_required('This content is unreachable') + user_id = user_info.get('id') story_info_url = user_id if username != 'highlights' else f'highlight:{story_id}' - videos = self._download_json(f'https://i.instagram.com/api/v1/feed/reels_media/?reel_ids={story_info_url}', story_id, headers={ - 'X-IG-App-ID': 936619743392459, - 'X-ASBD-ID': 198387, - 'X-IG-WWW-Claim': 0, - })['reels'] + videos = traverse_obj(self._download_json( + f'https://i.instagram.com/api/v1/feed/reels_media/?reel_ids={story_info_url}', + story_id, errnote=False, fatal=False, headers={ + 'X-IG-App-ID': 936619743392459, + 'X-ASBD-ID': 198387, + 'X-IG-WWW-Claim': 0, + }), 'reels') + if not videos: + self.raise_login_required('You need to log in to access this content') - full_name = traverse_obj(videos, ('user', 'full_name')) - - user_info = {} - if not (username and username != 'highlights' and full_name): - user_info = self._download_json( - f'https://i.instagram.com/api/v1/users/{user_id}/info/', story_id, headers={ - 'User-Agent': 'Mozilla/5.0 (Linux; Android 11; SM-A505F Build/RP1A.200720.012; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/96.0.4664.45 Mobile Safari/537.36 Instagram 214.1.0.29.120 Android (30/11; 450dpi; 1080x2122; samsung; SM-A505F; a50; exynos9610; en_US; 333717274)', - }, note='Downloading user info') - - username = traverse_obj(user_info, ('user', 'username')) or username - full_name = traverse_obj(user_info, ('user', 'full_name')) or 
full_name + full_name = traverse_obj(videos, (f'highlight:{story_id}', 'user', 'full_name'), (str(user_id), 'user', 'full_name')) + story_title = traverse_obj(videos, (f'highlight:{story_id}', 'title')) + if not story_title: + story_title = f'Story by {username}' highlights = traverse_obj(videos, (f'highlight:{story_id}', 'items'), (str(user_id), 'items')) - return self.playlist_result([{ - **self._extract_product(highlight), - 'title': f'Story by {username}', - 'uploader': full_name, - 'uploader_id': user_id, - } for highlight in highlights], playlist_id=story_id, playlist_title=highlight_title) + info_data = [] + for highlight in highlights: + highlight_data = self._extract_product(highlight) + if highlight_data.get('formats'): + info_data.append({ + **highlight_data, + 'uploader': full_name, + 'uploader_id': user_id, + }) + return self.playlist_result(info_data, playlist_id=story_id, playlist_title=story_title)