[vlive:channel] Fix extraction

Based on https://github.com/ytdl-org/youtube-dl/pull/29866
Closes #749, #927, https://github.com/ytdl-org/youtube-dl/issues/29837
Authored by kikuyan, pukkandan
This commit is contained in:
pukkandan 2021-10-22 23:13:06 +05:30
parent ad0090d0d2
commit 457f6d6866
No known key found for this signature in database
GPG key ID: 0F00D95A001F4698

View file

@ -17,17 +17,65 @@
strip_or_none, strip_or_none,
try_get, try_get,
urlencode_postdata, urlencode_postdata,
url_or_none,
) )
class VLiveBaseIE(NaverBaseIE): class VLiveBaseIE(NaverBaseIE):
_APP_ID = '8c6cc7b45d2568fb668be6e05b6e5a3b' _NETRC_MACHINE = 'vlive'
_logged_in = False
def _real_initialize(self):
if not self._logged_in:
VLiveBaseIE._logged_in = self._login()
def _login(self):
email, password = self._get_login_info()
if email is None:
return False
LOGIN_URL = 'https://www.vlive.tv/auth/email/login'
self._request_webpage(
LOGIN_URL, None, note='Downloading login cookies')
self._download_webpage(
LOGIN_URL, None, note='Logging in',
data=urlencode_postdata({'email': email, 'pwd': password}),
headers={
'Referer': LOGIN_URL,
'Content-Type': 'application/x-www-form-urlencoded'
})
login_info = self._download_json(
'https://www.vlive.tv/auth/loginInfo', None,
note='Checking login status',
headers={'Referer': 'https://www.vlive.tv/home'})
if not try_get(login_info, lambda x: x['message']['login'], bool):
raise ExtractorError('Unable to log in', expected=True)
return True
def _call_api(self, path_template, video_id, fields=None, query_add={}, note=None):
if note is None:
note = 'Downloading %s JSON metadata' % path_template.split('/')[-1].split('-')[0]
query = {'appId': '8c6cc7b45d2568fb668be6e05b6e5a3b', 'gcc': 'KR', 'platformType': 'PC'}
if fields:
query['fields'] = fields
if query_add:
query.update(query_add)
try:
return self._download_json(
'https://www.vlive.tv/globalv-web/vam-web/' + path_template % video_id, video_id,
note, headers={'Referer': 'https://www.vlive.tv/'}, query=query)
except ExtractorError as e:
if isinstance(e.cause, compat_HTTPError) and e.cause.code == 403:
self.raise_login_required(json.loads(e.cause.read().decode('utf-8'))['message'])
raise
class VLiveIE(VLiveBaseIE): class VLiveIE(VLiveBaseIE):
IE_NAME = 'vlive' IE_NAME = 'vlive'
_VALID_URL = r'https?://(?:(?:www|m)\.)?vlive\.tv/(?:video|embed)/(?P<id>[0-9]+)' _VALID_URL = r'https?://(?:(?:www|m)\.)?vlive\.tv/(?:video|embed)/(?P<id>[0-9]+)'
_NETRC_MACHINE = 'vlive'
_TESTS = [{ _TESTS = [{
'url': 'http://www.vlive.tv/video/1326', 'url': 'http://www.vlive.tv/video/1326',
'md5': 'cc7314812855ce56de70a06a27314983', 'md5': 'cc7314812855ce56de70a06a27314983',
@ -81,53 +129,6 @@ class VLiveIE(VLiveBaseIE):
'playlist_mincount': 120 'playlist_mincount': 120
}] }]
def _real_initialize(self):
self._login()
def _login(self):
email, password = self._get_login_info()
if None in (email, password):
return
def is_logged_in():
login_info = self._download_json(
'https://www.vlive.tv/auth/loginInfo', None,
note='Downloading login info',
headers={'Referer': 'https://www.vlive.tv/home'})
return try_get(
login_info, lambda x: x['message']['login'], bool) or False
LOGIN_URL = 'https://www.vlive.tv/auth/email/login'
self._request_webpage(
LOGIN_URL, None, note='Downloading login cookies')
self._download_webpage(
LOGIN_URL, None, note='Logging in',
data=urlencode_postdata({'email': email, 'pwd': password}),
headers={
'Referer': LOGIN_URL,
'Content-Type': 'application/x-www-form-urlencoded'
})
if not is_logged_in():
raise ExtractorError('Unable to log in', expected=True)
def _call_api(self, path_template, video_id, fields=None, limit=None):
query = {'appId': self._APP_ID, 'gcc': 'KR', 'platformType': 'PC'}
if fields:
query['fields'] = fields
if limit:
query['limit'] = limit
try:
return self._download_json(
'https://www.vlive.tv/globalv-web/vam-web/' + path_template % video_id, video_id,
'Downloading %s JSON metadata' % path_template.split('/')[-1].split('-')[0],
headers={'Referer': 'https://www.vlive.tv/'}, query=query)
except ExtractorError as e:
if isinstance(e.cause, compat_HTTPError) and e.cause.code == 403:
self.raise_login_required(json.loads(e.cause.read().decode('utf-8'))['message'])
raise
def _real_extract(self, url): def _real_extract(self, url):
video_id = self._match_id(url) video_id = self._match_id(url)
@ -150,7 +151,7 @@ def _real_extract(self, url):
playlist_count = str_or_none(playlist.get('totalCount')) playlist_count = str_or_none(playlist.get('totalCount'))
playlist = self._call_api( playlist = self._call_api(
'playlist/v1.0/playlist-%s/posts', playlist_id, 'data', limit=playlist_count) 'playlist/v1.0/playlist-%s/posts', playlist_id, 'data', {'limit': playlist_count})
entries = [] entries = []
for video_data in playlist['data']: for video_data in playlist['data']:
@ -216,7 +217,7 @@ def get_common_fields():
raise ExtractorError('Unknown status ' + status) raise ExtractorError('Unknown status ' + status)
class VLivePostIE(VLiveIE): class VLivePostIE(VLiveBaseIE):
IE_NAME = 'vlive:post' IE_NAME = 'vlive:post'
_VALID_URL = r'https?://(?:(?:www|m)\.)?vlive\.tv/post/(?P<id>\d-\d+)' _VALID_URL = r'https?://(?:(?:www|m)\.)?vlive\.tv/post/(?P<id>\d-\d+)'
_TESTS = [{ _TESTS = [{
@ -238,8 +239,6 @@ class VLivePostIE(VLiveIE):
'playlist_count': 1, 'playlist_count': 1,
}] }]
_FVIDEO_TMPL = 'fvideo/v1.0/fvideo-%%s/%s' _FVIDEO_TMPL = 'fvideo/v1.0/fvideo-%%s/%s'
_SOS_TMPL = _FVIDEO_TMPL % 'sosPlayInfo'
_INKEY_TMPL = _FVIDEO_TMPL % 'inKey'
def _real_extract(self, url): def _real_extract(self, url):
post_id = self._match_id(url) post_id = self._match_id(url)
@ -266,7 +265,7 @@ def _real_extract(self, url):
entry = None entry = None
if upload_type == 'SOS': if upload_type == 'SOS':
download = self._call_api( download = self._call_api(
self._SOS_TMPL, video_id)['videoUrl']['download'] self._FVIDEO_TMPL % 'sosPlayInfo', video_id)['videoUrl']['download']
formats = [] formats = []
for f_id, f_url in download.items(): for f_id, f_url in download.items():
formats.append({ formats.append({
@ -284,7 +283,7 @@ def _real_extract(self, url):
vod_id = upload_info.get('videoId') vod_id = upload_info.get('videoId')
if not vod_id: if not vod_id:
continue continue
inkey = self._call_api(self._INKEY_TMPL, video_id)['inKey'] inkey = self._call_api(self._FVIDEO_TMPL % 'inKey', video_id)['inKey']
entry = self._extract_video_info(video_id, vod_id, inkey) entry = self._extract_video_info(video_id, vod_id, inkey)
if entry: if entry:
entry['title'] = '%s_part%s' % (title, idx) entry['title'] = '%s_part%s' % (title, idx)
@ -295,7 +294,7 @@ def _real_extract(self, url):
class VLiveChannelIE(VLiveBaseIE): class VLiveChannelIE(VLiveBaseIE):
IE_NAME = 'vlive:channel' IE_NAME = 'vlive:channel'
_VALID_URL = r'https?://(?:channels\.vlive\.tv|(?:(?:www|m)\.)?vlive\.tv/channel)/(?P<id>[0-9A-Z]+)' _VALID_URL = r'https?://(?:channels\.vlive\.tv|(?:(?:www|m)\.)?vlive\.tv/channel)/(?P<channel_id>[0-9A-Z]+)(?:/board/(?P<posts_id>\d+))?'
_TESTS = [{ _TESTS = [{
'url': 'http://channels.vlive.tv/FCD4B', 'url': 'http://channels.vlive.tv/FCD4B',
'info_dict': { 'info_dict': {
@ -306,78 +305,58 @@ class VLiveChannelIE(VLiveBaseIE):
}, { }, {
'url': 'https://www.vlive.tv/channel/FCD4B', 'url': 'https://www.vlive.tv/channel/FCD4B',
'only_matching': True, 'only_matching': True,
}, {
'url': 'https://www.vlive.tv/channel/FCD4B/board/3546',
'info_dict': {
'id': 'FCD4B-3546',
'title': 'MAMAMOO - Star Board',
},
'playlist_mincount': 880
}] }]
def _call_api(self, path, channel_key_suffix, channel_value, note, query): def _entries(self, posts_id, board_name):
q = { if board_name:
'app_id': self._APP_ID, posts_path = 'post/v1.0/board-%s/posts'
'channel' + channel_key_suffix: channel_value, query_add = {'limit': 100, 'sortType': 'LATEST'}
} else:
q.update(query) posts_path = 'post/v1.0/channel-%s/starPosts'
return self._download_json( query_add = {'limit': 100}
'http://api.vfan.vlive.tv/vproxy/channelplus/' + path,
channel_value, note='Downloading ' + note, query=q)['result']
def _real_extract(self, url):
channel_code = self._match_id(url)
channel_seq = self._call_api(
'decodeChannelCode', 'Code', channel_code,
'decode channel code', {})['channelSeq']
channel_name = None
entries = []
for page_num in itertools.count(1): for page_num in itertools.count(1):
video_list = self._call_api( video_list = self._call_api(
'getChannelVideoList', 'Seq', channel_seq, posts_path, posts_id, 'channel{channelName},contentType,postId,title,url', query_add,
'channel list page #%d' % page_num, { note=f'Downloading playlist page {page_num}')
# Large values of maxNumOfRows (~300 or above) may cause
# empty responses (see [1]), e.g. this happens for [2] that
# has more than 300 videos.
# 1. https://github.com/ytdl-org/youtube-dl/issues/13830
# 2. http://channels.vlive.tv/EDBF.
'maxNumOfRows': 100,
'pageNo': page_num
}
)
if not channel_name: for video in try_get(video_list, lambda x: x['data'], list) or []:
channel_name = try_get( video_id = str(video.get('postId'))
video_list, video_title = str_or_none(video.get('title'))
lambda x: x['channelInfo']['channelName'], video_url = url_or_none(video.get('url'))
compat_str) if not all((video_id, video_title, video_url)) or video.get('contentType') != 'VIDEO':
continue
channel_name = try_get(video, lambda x: x['channel']['channelName'], compat_str)
yield self.url_result(video_url, VLivePostIE.ie_key(), video_id, video_title, channel=channel_name)
videos = try_get( after = try_get(video_list, lambda x: x['paging']['nextParams']['after'], compat_str)
video_list, lambda x: x['videoList'], list) if not after:
if not videos:
break break
query_add['after'] = after
for video in videos: def _real_extract(self, url):
video_id = video.get('videoSeq') channel_id, posts_id = self._match_valid_url(url).groups()
video_type = video.get('videoType')
if not video_id or not video_type: board_name = None
continue if posts_id:
video_id = compat_str(video_id) board = self._call_api(
'board/v1.0/board-%s', posts_id, 'title,boardType')
board_name = board.get('title') or 'Unknown'
if board.get('boardType') not in ('STAR', 'VLIVE_PLUS'):
raise ExtractorError(f'Board {board_name!r} is not supported', expected=True)
if video_type in ('PLAYLIST'): entries = self._entries(posts_id or channel_id, board_name)
first_video_id = try_get( first_video = next(entries)
video, channel_name = first_video['channel']
lambda x: x['videoPlaylist']['videoList'][0]['videoSeq'], int)
if not first_video_id:
continue
entries.append(
self.url_result(
'http://www.vlive.tv/video/%s' % first_video_id,
ie=VLiveIE.ie_key(), video_id=first_video_id))
else:
entries.append(
self.url_result(
'http://www.vlive.tv/video/%s' % video_id,
ie=VLiveIE.ie_key(), video_id=video_id))
return self.playlist_result( return self.playlist_result(
entries, channel_code, channel_name) itertools.chain([first_video], entries),
f'{channel_id}-{posts_id}' if posts_id else channel_id,
f'{channel_name} - {board_name}' if channel_name and board_name else channel_name)