[postprocessor,cleanup] Create _download_json

This commit is contained in:
pukkandan 2022-03-25 08:31:45 +05:30
parent ae72962643
commit a3f2445e29
No known key found for this signature in database
GPG key ID: 7EEE9E1E817D0A39
2 changed files with 31 additions and 28 deletions

View file

@ -1,13 +1,18 @@
from __future__ import unicode_literals from __future__ import unicode_literals
import functools import functools
import itertools
import json
import os import os
import time
import urllib.error
from ..compat import compat_str
from ..utils import ( from ..utils import (
_configuration_args, _configuration_args,
encodeFilename, encodeFilename,
network_exceptions,
PostProcessingError, PostProcessingError,
sanitized_Request,
write_string, write_string,
) )
@ -63,7 +68,7 @@ def __init__(self, downloader=None):
@classmethod @classmethod
def pp_key(cls): def pp_key(cls):
name = cls.__name__[:-2] name = cls.__name__[:-2]
return compat_str(name[6:]) if name[:6].lower() == 'ffmpeg' else name return name[6:] if name[:6].lower() == 'ffmpeg' else name
def to_screen(self, text, prefix=True, *args, **kwargs): def to_screen(self, text, prefix=True, *args, **kwargs):
tag = '[%s] ' % self.PP_NAME if prefix else '' tag = '[%s] ' % self.PP_NAME if prefix else ''
@ -180,6 +185,28 @@ def report_progress(self, s):
progress_template.get('postprocess-title') or 'yt-dlp %(progress._default_template)s', progress_template.get('postprocess-title') or 'yt-dlp %(progress._default_template)s',
progress_dict)) progress_dict))
def _download_json(self, url, *, expected_http_errors=(404,)):
# While this is not an extractor, it behaves similar to one and
# so obey extractor_retries and sleep_interval_requests
max_retries = self.get_param('extractor_retries', 3)
sleep_interval = self.get_param('sleep_interval_requests') or 0
self.write_debug(f'{self.PP_NAME} query: {url}')
for retries in itertools.count():
try:
rsp = self._downloader.urlopen(sanitized_Request(url))
return json.loads(rsp.read().decode(rsp.info().get_param('charset') or 'utf-8'))
except network_exceptions as e:
if isinstance(e, urllib.error.HTTPError) and e.code in expected_http_errors:
return None
if retries < max_retries:
self.report_warning(f'{e}. Retrying...')
if sleep_interval > 0:
self.to_screen(f'Sleeping {sleep_interval} seconds ...')
time.sleep(sleep_interval)
continue
raise PostProcessingError(f'Unable to communicate with {self.PP_NAME} API: {e}')
class AudioConversionError(PostProcessingError): class AudioConversionError(PostProcessingError):
pass pass

View file

@ -1,12 +1,9 @@
from hashlib import sha256 from hashlib import sha256
import itertools
import json import json
import re import re
import time
from .ffmpeg import FFmpegPostProcessor from .ffmpeg import FFmpegPostProcessor
from ..compat import compat_urllib_parse_urlencode, compat_HTTPError from ..compat import compat_urllib_parse_urlencode
from ..utils import PostProcessingError, network_exceptions, sanitized_Request
class SponsorBlockPP(FFmpegPostProcessor): class SponsorBlockPP(FFmpegPostProcessor):
@ -94,28 +91,7 @@ def _get_sponsor_segments(self, video_id, service):
'categories': json.dumps(self._categories), 'categories': json.dumps(self._categories),
'actionTypes': json.dumps(['skip', 'poi']) 'actionTypes': json.dumps(['skip', 'poi'])
}) })
self.write_debug(f'SponsorBlock query: {url}') for d in self._download_json(url) or []:
for d in self._get_json(url):
if d['videoID'] == video_id: if d['videoID'] == video_id:
return d['segments'] return d['segments']
return [] return []
def _get_json(self, url):
# While this is not an extractor, it behaves similar to one and
# so obey extractor_retries and sleep_interval_requests
max_retries = self.get_param('extractor_retries', 3)
sleep_interval = self.get_param('sleep_interval_requests') or 0
for retries in itertools.count():
try:
rsp = self._downloader.urlopen(sanitized_Request(url))
return json.loads(rsp.read().decode(rsp.info().get_param('charset') or 'utf-8'))
except network_exceptions as e:
if isinstance(e, compat_HTTPError) and e.code == 404:
return []
if retries < max_retries:
self.report_warning(f'{e}. Retrying...')
if sleep_interval > 0:
self.to_screen(f'Sleeping {sleep_interval} seconds ...')
time.sleep(sleep_interval)
continue
raise PostProcessingError(f'Unable to communicate with SponsorBlock API: {e}')