- avoid repeatedly calling `self._configuration_arg`
- avoid unnecessary looping when 'ver' argument is not provided
- add/change error messages
This commit is contained in:
kclauhk 2024-09-28 17:00:22 +08:00
parent 52f73ffc61
commit 05b4719f6c

View file

@ -17,14 +17,12 @@
class ExtremeMusicBaseIE(InfoExtractor): class ExtremeMusicBaseIE(InfoExtractor):
_API_URL = 'https://snapi.extrememusic.com' _API_URL = 'https://snapi.extrememusic.com'
_REQUEST_HEADERS = None _REQUEST_HEADERS = None
_REQUIRE_VERSION = []
def _process_version_arg(self, arg): def _initialize(self, url, video_id, country=None):
self._version_requested = arg self._REQUIRE_VERSION = self._configuration_arg('ver') or self._configuration_arg('version')
# This site serves different versions of the same playlist id due to geo-restriction
def _set_request_headers(self, url, video_id, country=None): # use user's own country code if no code (geo_bypass_country or pre-defined country code) is provided
if not self._REQUEST_HEADERS:
# the site serves different versions of the same playlist id due to geo-restriction,
# so use user's own country code or geo_bypass_country code
if not country: if not country:
country = self._download_webpage('https://ipapi.co/country_code', video_id) country = self._download_webpage('https://ipapi.co/country_code', video_id)
env = self._download_json('https://www.extrememusic.com/env', video_id) env = self._download_json('https://www.extrememusic.com/env', video_id)
@ -37,27 +35,26 @@ def _set_request_headers(self, url, video_id, country=None):
'X-Site-Id': 4, 'X-Site-Id': 4,
'X-Viewer-Country': country.upper(), 'X-Viewer-Country': country.upper(),
} }
return self._REQUEST_HEADERS
def _get_album_data(self, album_id, video_id, fatal=True): def _get_album_data(self, album_id, video_id, fatal=True):
self._process_version_arg(self._configuration_arg('ver') or self._configuration_arg('version'))
album = self._download_json(f'{self._API_URL}/albums/{album_id}', video_id, fatal=fatal, album = self._download_json(f'{self._API_URL}/albums/{album_id}', video_id, fatal=fatal,
note='Downloading album data', headers=self._REQUEST_HEADERS) note='Downloading album data', errnote='Unable to download album data',
headers=self._REQUEST_HEADERS) or {}
if video_id == album_id: if video_id == album_id:
bio = self._download_json(f'{self._API_URL}/albums/{album_id}/bio', video_id, fatal=False, bio = self._download_json(f'{self._API_URL}/albums/{album_id}/bio', video_id, fatal=False,
note='Downloading album data', headers=self._REQUEST_HEADERS) note='Downloading album data', errnote='Unable to download album data',
return merge_dicts(album, bio or {}) headers=self._REQUEST_HEADERS) or {}
return merge_dicts(album, bio)
else: else:
return album return album
def _extract_track(self, album_data, track_id=None, version_id=None): def _extract_track(self, album_data, track_id=None, version_id=None):
album_data = album_data or {}
if 'tracks' in album_data and 'track_sounds' in album_data: if 'tracks' in album_data and 'track_sounds' in album_data:
if not track_id and version_id: if not track_id and version_id:
track_id = traverse_obj(album_data['track_sounds'], track_id = traverse_obj(album_data['track_sounds'],
(lambda _, v: v['id'] == int(version_id), 'track_id', {int}), get_all=False) (lambda _, v: v['id'] == int(version_id), 'track_id', {int}), get_all=False)
track = traverse_obj(album_data['tracks'], if track := traverse_obj(album_data['tracks'],
(lambda _, v: v['id'] == int(track_id), {dict}), get_all=False) (lambda _, v: v['id'] == int(track_id), {dict}), get_all=False):
info = {**traverse_obj(track, { info = {**traverse_obj(track, {
'track': ('title', {str}), 'track': ('title', {str}),
'track_number': ('sort_order', {lambda v: v + 1}, {int}), 'track_number': ('sort_order', {lambda v: v + 1}, {int}),
@ -80,14 +77,13 @@ def _extract_track(self, album_data, track_id=None, version_id=None):
'width': ('width', {int_or_none}), 'width': ('width', {int_or_none}),
'height': ('height', {int_or_none}), 'height': ('height', {int_or_none}),
})) }))
for idx, sound_id in enumerate([version_id] if version_id else track['track_sound_ids']): if not self._REQUIRE_VERSION:
version_id = version_id or traverse_obj(track, 'default_track_sound_id', ('track_sound_ids', 0))
for sound_id in [version_id] if version_id else track['track_sound_ids']:
if sound := traverse_obj(album_data['track_sounds'], if sound := traverse_obj(album_data['track_sounds'],
(lambda _, v: v['id'] == int(sound_id) and v['track_id'] == int(track_id), (lambda _, v: v['id'] == int(sound_id) and v['track_id'] == int(track_id),
{dict}), get_all=False): {dict}), get_all=False):
if (version_id if version_id or any(x in self._REQUIRE_VERSION for x in ['all', sound['version_type'].lower()]):
or (not version_id and ((not self._version_requested and idx == 0)
or 'all' in self._version_requested
or sound['version_type'].lower() in self._version_requested))):
formats = [] formats = []
for audio_url in traverse_obj(sound, ('assets', 'audio', ('preview_url', for audio_url in traverse_obj(sound, ('assets', 'audio', ('preview_url',
'preview_url_hls'))): 'preview_url_hls'))):
@ -124,6 +120,8 @@ def _extract_track(self, album_data, track_id=None, version_id=None):
} }
elif len(entries) == 1: elif len(entries) == 1:
return entries[0] return entries[0]
else:
self.raise_no_formats('Track data not found', video_id=track_id)
return [] return []
@ -210,7 +208,7 @@ class ExtremeMusicIE(ExtremeMusicBaseIE):
def _real_extract(self, url): def _real_extract(self, url):
album_id, track_id, version_id = self._match_valid_url(url).group('album', 'id', 'ver') album_id, track_id, version_id = self._match_valid_url(url).group('album', 'id', 'ver')
self._set_request_headers(url, track_id or version_id, self.get_param('geo_bypass_country') or 'DE') self._initialize(url, track_id or version_id, self.get_param('geo_bypass_country') or 'DE')
album_data = self._get_album_data(album_id, track_id or version_id) album_data = self._get_album_data(album_id, track_id or version_id)
if result := self._extract_track(album_data, track_id, version_id): if result := self._extract_track(album_data, track_id, version_id):
return result return result
@ -243,7 +241,7 @@ class ExtremeMusicAIE(ExtremeMusicBaseIE):
def _real_extract(self, url): def _real_extract(self, url):
album_id = self._match_id(url) album_id = self._match_id(url)
self._set_request_headers(url, album_id, self.get_param('geo_bypass_country') or 'DE') self._initialize(url, album_id, self.get_param('geo_bypass_country') or 'DE')
album_data = self._get_album_data(album_id, album_id) album_data = self._get_album_data(album_id, album_id)
entries = [] entries = []
@ -297,7 +295,7 @@ class ExtremeMusicPIE(ExtremeMusicBaseIE):
def _real_extract(self, url): def _real_extract(self, url):
playlist_id = self._match_id(url) playlist_id = self._match_id(url)
headers = self._set_request_headers(url, playlist_id, self.get_param('geo_bypass_country')) self._initialize(url, playlist_id, self.get_param('geo_bypass_country'))
def playlist_query(playlist_id, offset, limit): def playlist_query(playlist_id, offset, limit):
# playlist api: https://snapi.extrememusic.com/playlists?id={playlist_id}&range={offset}%2C{limit}' # playlist api: https://snapi.extrememusic.com/playlists?id={playlist_id}&range={offset}%2C{limit}'
@ -306,7 +304,7 @@ def playlist_query(playlist_id, offset, limit):
note=f'Downloading item {offset + 1}-{offset + limit}', query={ note=f'Downloading item {offset + 1}-{offset + limit}', query={
'id': playlist_id, 'id': playlist_id,
'range': f'{offset},{limit}', 'range': f'{offset},{limit}',
}, headers=headers) }, headers=self._REQUEST_HEADERS)
thumbnails, entries = [], [] thumbnails, entries = [], []
album_data, track_done, limit = {}, [], 50 album_data, track_done, limit = {}, [], 50
@ -329,11 +327,11 @@ def playlist_query(playlist_id, offset, limit):
else: else:
entries.append(track) entries.append(track)
track_done.append(track_id) track_done.append(track_id)
if len(entries) >= playlist['playlist']['playlist_items_count']: if len(track_done) >= playlist['playlist']['playlist_items_count']:
break break
if entries: if entries:
if len(entries) < playlist['playlist']['playlist_items_count']: if len(track_done) < playlist['playlist']['playlist_items_count']:
self.report_warning('This playlist has geo-restricted items. Try using --xff to specify a different country code.') self.report_warning('This playlist has geo-restricted items. Try using --xff to specify a different country code.')
for image in traverse_obj(playlist['playlist'], ('images', 'square')): for image in traverse_obj(playlist['playlist'], ('images', 'square')):