diff --git a/test/test_downloader_external.py b/test/test_downloader_external.py index 62f7d45d4..c610ba012 100644 --- a/test/test_downloader_external.py +++ b/test/test_downloader_external.py @@ -120,19 +120,22 @@ def test_make_cmd(self): downloader._call_downloader('test', {**TEST_INFO, 'ext': 'mp4'}) self.assertEqual(self._args, [ 'ffmpeg', '-y', '-hide_banner', '-i', 'http://www.example.com/', - '-c', 'copy', '-f', 'mp4', 'file:test']) + '-c', 'copy', '-f', 'mp4', 'file:test', '-progress', 'pipe:1']) # Test cookies arg is added ydl.cookiejar.set_cookie(http.cookiejar.Cookie(**TEST_COOKIE)) downloader._call_downloader('test', {**TEST_INFO, 'ext': 'mp4'}) self.assertEqual(self._args, [ - 'ffmpeg', '-y', '-hide_banner', '-cookies', 'test=ytdlp; path=/; domain=.example.com;\r\n', - '-i', 'http://www.example.com/', '-c', 'copy', '-f', 'mp4', 'file:test']) + 'ffmpeg', '-y', '-hide_banner', '-cookies', + 'test=ytdlp; path=/; domain=.example.com;\r\n', '-i', + 'http://www.example.com/', '-c', 'copy', '-f', 'mp4', + 'file:test', '-progress', 'pipe:1']) # Test with non-url input (ffmpeg reads from stdin '-' for websockets) downloader._call_downloader('test', {'url': 'x', 'ext': 'mp4'}) self.assertEqual(self._args, [ - 'ffmpeg', '-y', '-hide_banner', '-i', 'x', '-c', 'copy', '-f', 'mp4', 'file:test']) + 'ffmpeg', '-y', '-hide_banner', '-i', 'x', '-c', 'copy', '-f', + 'mp4', 'file:test', '-progress', 'pipe:1']) if __name__ == '__main__': diff --git a/yt_dlp/downloader/common.py b/yt_dlp/downloader/common.py index b71d7ee8f..1c7f13106 100644 --- a/yt_dlp/downloader/common.py +++ b/yt_dlp/downloader/common.py @@ -15,6 +15,7 @@ from ..utils import ( IDENTITY, NO_DEFAULT, + FormatProgressInfos, LockingUnsupportedError, Namespace, RetryManager, @@ -25,11 +26,9 @@ format_bytes, join_nonempty, parse_bytes, - remove_start, sanitize_open, shell_quote, timeconvert, - timetuple_from_msec, try_call, ) @@ -115,56 +114,6 @@ def to_screen(self, *args, **kargs): def FD_NAME(cls): return re.sub(r'(?<=[a-z])(?=[A-Z])', '_', cls.__name__[:-2]).lower() - @staticmethod - def format_seconds(seconds): - if seconds is None: - return ' Unknown' - time = timetuple_from_msec(seconds * 1000) - if time.hours > 99: - return '--:--:--' - return '%02d:%02d:%02d' % time[:-1] - - @classmethod - def format_eta(cls, seconds): - return f'{remove_start(cls.format_seconds(seconds), "00:"):>8s}' - - @staticmethod - def calc_percent(byte_counter, data_len): - if data_len is None: - return None - return float(byte_counter) / float(data_len) * 100.0 - - @staticmethod - def format_percent(percent): - return ' N/A%' if percent is None else f'{percent:>5.1f}%' - - @classmethod - def calc_eta(cls, start_or_rate, now_or_remaining, total=NO_DEFAULT, current=NO_DEFAULT): - if total is NO_DEFAULT: - rate, remaining = start_or_rate, now_or_remaining - if None in (rate, remaining): - return None - return int(float(remaining) / rate) - - start, now = start_or_rate, now_or_remaining - if total is None: - return None - if now is None: - now = time.time() - rate = cls.calc_speed(start, now, current) - return rate and int((float(total) - float(current)) / rate) - - @staticmethod - def calc_speed(start, now, bytes): - dif = now - start - if bytes == 0 or dif < 0.001: # One millisecond - return None - return float(bytes) / dif - - @staticmethod - def format_speed(speed): - return ' Unknown B/s' if speed is None else f'{format_bytes(speed):>10s}/s' - @staticmethod def format_retries(retries): return 'inf' if retries == float('inf') else int(retries) @@ -343,18 +292,16 @@ def with_fields(*tups, default=''): return tmpl return default - _format_bytes = lambda k: f'{format_bytes(s.get(k)):>10s}' - if s['status'] == 'finished': if self.params.get('noprogress'): self.to_screen('[download] Download completed') speed = try_call(lambda: s['total_bytes'] / s['elapsed']) s.update({ 'speed': speed, - '_speed_str': self.format_speed(speed).strip(), - '_total_bytes_str': _format_bytes('total_bytes'), - '_elapsed_str': self.format_seconds(s.get('elapsed')), - '_percent_str': self.format_percent(100), + '_speed_str': FormatProgressInfos.format_speed(speed).strip(), + '_total_bytes_str': format_bytes(s.get('total_bytes')), + '_elapsed_str': FormatProgressInfos.format_seconds(s.get('elapsed')), + '_percent_str': FormatProgressInfos.format_percent(100), }) self._report_progress_status(s, join_nonempty( '100%%', @@ -367,16 +314,16 @@ def with_fields(*tups, default=''): return s.update({ - '_eta_str': self.format_eta(s.get('eta')).strip(), - '_speed_str': self.format_speed(s.get('speed')), - '_percent_str': self.format_percent(try_call( + '_eta_str': FormatProgressInfos.format_eta(s.get('eta')), + '_speed_str': FormatProgressInfos.format_speed(s.get('speed')), + '_percent_str': FormatProgressInfos.format_percent(try_call( lambda: 100 * s['downloaded_bytes'] / s['total_bytes'], lambda: 100 * s['downloaded_bytes'] / s['total_bytes_estimate'], lambda: s['downloaded_bytes'] == 0 and 0)), - '_total_bytes_str': _format_bytes('total_bytes'), - '_total_bytes_estimate_str': _format_bytes('total_bytes_estimate'), - '_downloaded_bytes_str': _format_bytes('downloaded_bytes'), - '_elapsed_str': self.format_seconds(s.get('elapsed')), + '_total_bytes_str': format_bytes(s.get('total_bytes')), + '_total_bytes_estimate_str': format_bytes(s.get('total_bytes_estimate')), + '_downloaded_bytes_str': format_bytes(s.get('downloaded_bytes')), + '_elapsed_str': FormatProgressInfos.format_seconds(s.get('elapsed')), }) msg_template = with_fields( diff --git a/yt_dlp/downloader/external.py b/yt_dlp/downloader/external.py index 4ce8a3bf7..0a8464800 100644 --- a/yt_dlp/downloader/external.py +++ b/yt_dlp/downloader/external.py @@ -3,7 +3,6 @@ import os import re import subprocess -import sys import tempfile import time import uuid @@ -11,7 +10,11 @@ from .fragment import FragmentFD from ..compat import functools from ..networking import Request -from ..postprocessor.ffmpeg import EXT_TO_OUT_FORMATS, FFmpegPostProcessor +from ..postprocessor.ffmpeg import ( + EXT_TO_OUT_FORMATS, + FFmpegPostProcessor, + FFmpegProgressTracker, +) from ..utils import ( Popen, RetryManager, @@ -619,26 +622,23 @@ def _call_downloader(self, tmpfilename, info_dict): args = [encodeArgument(opt) for opt in args] args.append(encodeFilename(ffpp._ffmpeg_filename_argument(tmpfilename), True)) + args += ['-progress', 'pipe:1'] self._debug_cmd(args) piped = any(fmt['url'] in ('-', 'pipe:') for fmt in selected_formats) - with Popen(args, stdin=subprocess.PIPE, env=env) as proc: - if piped: - self.on_process_started(proc, proc.stdin) - try: - retval = proc.wait() - except BaseException as e: - # subprocces.run would send the SIGKILL signal to ffmpeg and the - # mp4 file couldn't be played, but if we ask ffmpeg to quit it - # produces a file that is playable (this is mostly useful for live - # streams). Note that Windows is not affected and produces playable - # files (see https://github.com/ytdl-org/youtube-dl/issues/8300). - if isinstance(e, KeyboardInterrupt) and sys.platform != 'win32' and not piped: - proc.communicate_or_kill(b'q') - else: - proc.kill(timeout=None) - raise - return retval + self._debug_cmd(args) + ffmpeg_progress_tracker = FFmpegProgressTracker(info_dict, args, self._ffmpeg_hook) + proc = ffmpeg_progress_tracker.ffmpeg_proc + if piped: + self.on_process_started(proc, proc.stdin) + _, _, return_code = ffmpeg_progress_tracker.run_ffmpeg_subprocess() + return return_code + + def _ffmpeg_hook(self, status, info_dict): + status['downloaded_bytes'] = status.get('outputted', 0) + if status.get('status') == 'ffmpeg_running': + status['status'] = 'downloading' + self._hook_progress(status, info_dict) class AVconvFD(FFmpegFD): diff --git a/yt_dlp/downloader/http.py b/yt_dlp/downloader/http.py index f5237443e..a1f95db29 100644 --- a/yt_dlp/downloader/http.py +++ b/yt_dlp/downloader/http.py @@ -11,6 +11,7 @@ ) from ..utils import ( ContentTooShortError, + FormatProgressInfos, RetryManager, ThrottledDownload, XAttrMetadataError, @@ -293,11 +294,11 @@ def retry(e): before = after # Progress message - speed = self.calc_speed(start, now, byte_counter - ctx.resume_len) + speed = FormatProgressInfos.calc_speed(start, now, byte_counter - ctx.resume_len) if ctx.data_len is None: eta = None else: - eta = self.calc_eta(start, time.time(), ctx.data_len - ctx.resume_len, byte_counter - ctx.resume_len) + eta = FormatProgressInfos.calc_eta(start, time.time(), ctx.data_len - ctx.resume_len, byte_counter - ctx.resume_len) self._hook_progress({ 'status': 'downloading', diff --git a/yt_dlp/downloader/rtmp.py b/yt_dlp/downloader/rtmp.py index 0e0952599..9373f3585 100644 --- a/yt_dlp/downloader/rtmp.py +++ b/yt_dlp/downloader/rtmp.py @@ -5,6 +5,7 @@ from .common import FileDownloader from ..utils import ( + FormatProgressInfos, Popen, check_executable, encodeArgument, @@ -50,8 +51,8 @@ def run_rtmpdump(args): resume_percent = percent resume_downloaded_data_len = downloaded_data_len time_now = time.time() - eta = self.calc_eta(start, time_now, 100 - resume_percent, percent - resume_percent) - speed = self.calc_speed(start, time_now, downloaded_data_len - resume_downloaded_data_len) + eta = FormatProgressInfos.calc_eta(start, time_now, 100 - resume_percent, percent - resume_percent) + speed = FormatProgressInfos.calc_speed(start, time_now, downloaded_data_len - resume_downloaded_data_len) data_len = None if percent > 0: data_len = int(downloaded_data_len * 100 / percent) diff --git a/yt_dlp/postprocessor/common.py b/yt_dlp/postprocessor/common.py index 8cef86c43..371164863 100644 --- a/yt_dlp/postprocessor/common.py +++ b/yt_dlp/postprocessor/common.py @@ -1,15 +1,27 @@ import functools import json import os +import sys +from ..minicurses import ( + BreaklineStatusPrinter, + MultilineLogger, + MultilinePrinter, + QuietMultilinePrinter, +) from ..networking import Request from ..networking.exceptions import HTTPError, network_exceptions from ..utils import ( + FormatProgressInfos, + Namespace, PostProcessingError, RetryManager, _configuration_args, deprecation_warning, encodeFilename, + format_bytes, + join_nonempty, + try_call, ) @@ -56,7 +68,9 @@ def __init__(self, downloader=None): self._progress_hooks = [] self.add_progress_hook(self.report_progress) self.set_downloader(downloader) + self._out_files = self.set_out_files() self.PP_NAME = self.pp_key() + self._prepare_multiline_status() @classmethod def pp_key(cls): @@ -102,6 +116,11 @@ def get_param(self, name, default=None, *args, **kwargs): return self._downloader.params.get(name, default, *args, **kwargs) return default + def set_out_files(self): + if not self._downloader: + return None + return getattr(self._downloader, '_out_files', None) or self._downloader.ydl._out_files + def set_downloader(self, downloader): """Sets the downloader for this PP.""" self._downloader = downloader @@ -173,25 +192,116 @@ def add_progress_hook(self, ph): # See YoutubeDl.py (search for postprocessor_hooks) for a description of this interface self._progress_hooks.append(ph) - def report_progress(self, s): - s['_default_template'] = '%(postprocessor)s %(status)s' % s - if not self._downloader: - return + def report_destination(self, filename): + """Report destination filename.""" + self.to_screen('[processing] Destination: ' + filename) + + def _prepare_multiline_status(self, lines=1): + if self._downloader: + if self._downloader.params.get('noprogress'): + self._multiline = QuietMultilinePrinter() + elif self._downloader.params.get('logger'): + self._multiline = MultilineLogger(self._downloader.params['logger'], lines) + elif self._downloader.params.get('progress_with_newline'): + self._multiline = BreaklineStatusPrinter(self._downloader._out_files.out, lines) + elif hasattr(self._downloader, "_out_files"): + self._multiline = MultilinePrinter(self._downloader._out_files.out, lines, not self._downloader.params.get('quiet')) + else: + self._multiline = MultilinePrinter(sys.stdout, lines, not self._downloader.params.get('quiet')) + self._multiline.allow_colors = self._multiline._HAVE_FULLCAP and not self._downloader.params.get('no_color') + else: + self._multiline = MultilinePrinter(sys.stdout, lines, True) + self._multiline.allow_colors = self._multiline._HAVE_FULLCAP + + def _finish_multiline_status(self): + self._multiline.end() + + ProgressStyles = Namespace( + processed_bytes='light blue', + percent='light blue', + eta='yellow', + speed='green', + elapsed='bold white', + total_bytes='', + total_bytes_estimate='', + ) + + def _report_progress_status(self, s, default_template): + for name, style in self.ProgressStyles.items_: + name = f'_{name}_str' + if name not in s: + continue + s[name] = self._format_progress(s[name], style) + s['_default_template'] = default_template % s progress_dict = s.copy() progress_dict.pop('info_dict') progress_dict = {'info': s['info_dict'], 'progress': progress_dict} - progress_template = self.get_param('progress_template', {}) - tmpl = progress_template.get('postprocess') - if tmpl: - self._downloader.to_screen( - self._downloader.evaluate_outtmpl(tmpl, progress_dict), quiet=False) - + progress_template = self._downloader.params.get('progress_template', {}) + self._multiline.print_at_line(self._downloader.evaluate_outtmpl( + progress_template.get('process') or '[processing] %(progress._default_template)s', + progress_dict), s.get('progress_idx') or 0) self._downloader.to_console_title(self._downloader.evaluate_outtmpl( - progress_template.get('postprocess-title') or 'yt-dlp %(progress._default_template)s', + progress_template.get('download-title') or 'yt-dlp %(progress._default_template)s', progress_dict)) + def _format_progress(self, *args, **kwargs): + return self._downloader._format_text( + self._multiline.stream, self._multiline.allow_colors, *args, **kwargs) + + def report_progress(self, s): + def with_fields(*tups, default=''): + for *fields, tmpl in tups: + if all(s.get(f) is not None for f in fields): + return tmpl + return default + + if not self._downloader: + return + + if s['status'] == 'finished': + if self._downloader.params.get('noprogress'): + self.to_screen('[processing] Download completed') + speed = try_call(lambda: s['total_bytes'] / s['elapsed']) + s.update({ + 'speed': speed, + '_speed_str': FormatProgressInfos.format_speed(speed).strip(), + '_total_bytes_str': format_bytes(s.get('total_bytes')), + '_elapsed_str': FormatProgressInfos.format_seconds(s.get('elapsed')), + '_percent_str': FormatProgressInfos.format_percent(100), + }) + self._report_progress_status(s, join_nonempty( + '100%%', + with_fields(('total_bytes', 'of %(_total_bytes_str)s')), + with_fields(('elapsed', 'in %(_elapsed_str)s')), + with_fields(('speed', 'at %(_speed_str)s')), + delim=' ')) + + if s['status'] != 'processing': + return + + s.update({ + '_eta_str': FormatProgressInfos.format_eta(s.get('eta')), + '_speed_str': FormatProgressInfos.format_speed(s.get('speed')), + '_percent_str': FormatProgressInfos.format_percent(try_call( + lambda: 100 * s['processed_bytes'] / s['total_bytes'], + lambda: 100 * s['processed_bytes'] / s['total_bytes_estimate'], + lambda: s['processed_bytes'] == 0 and 0)), + '_total_bytes_str': format_bytes(s.get('total_bytes')), + '_total_bytes_estimate_str': format_bytes(s.get('total_bytes_estimate')), + '_processed_bytes_str': format_bytes(s.get('processed_bytes')), + '_elapsed_str': FormatProgressInfos.format_seconds(s.get('elapsed')), + }) + + msg_template = with_fields( + ('total_bytes', '%(_percent_str)s of %(_total_bytes_str)s at %(_speed_str)s ETA %(_eta_str)s'), + ('processed_bytes', 'elapsed', '%(_processed_bytes_str)s at %(_speed_str)s (%(_elapsed_str)s)'), + ('processed_bytes', '%(_processed_bytes_str)s at %(_speed_str)s'), + default='%(_percent_str)s at %(_speed_str)s ETA %(_eta_str)s') + + self._report_progress_status(s, msg_template) + def _retry_download(self, err, count, retries): # While this is not an extractor, it behaves similar to one and # so obey extractor_retries and "--retry-sleep extractor" diff --git a/yt_dlp/postprocessor/ffmpeg.py b/yt_dlp/postprocessor/ffmpeg.py index 323f4303c..fa11bb94d 100644 --- a/yt_dlp/postprocessor/ffmpeg.py +++ b/yt_dlp/postprocessor/ffmpeg.py @@ -5,7 +5,10 @@ import os import re import subprocess +import sys import time +from queue import Queue +from threading import Thread from .common import PostProcessor from ..compat import functools, imghdr @@ -23,6 +26,7 @@ encodeFilename, filter_dict, float_or_none, + int_or_none, is_outdated_version, orderedSet, prepend_extension, @@ -326,11 +330,9 @@ def _duration_mismatch(self, d1, d2, tolerance=2): return abs(d1 - d2) > tolerance def run_ffmpeg_multiple_files(self, input_paths, out_path, opts, **kwargs): - return self.real_run_ffmpeg( - [(path, []) for path in input_paths], - [(out_path, opts)], **kwargs) + return self.real_run_ffmpeg([(path, []) for path in input_paths], [(out_path, opts)], **kwargs) - def real_run_ffmpeg(self, input_path_opts, output_path_opts, *, expected_retcodes=(0,)): + def real_run_ffmpeg(self, input_path_opts, output_path_opts, *, expected_retcodes=(0,), info_dict=None): self.check_version() oldest_mtime = min( @@ -350,19 +352,20 @@ def make_args(file, args, name, number): args += self._configuration_args(self.basename, keys) if name == 'i': args.append('-i') - return ( - [encodeArgument(arg) for arg in args] - + [encodeFilename(self._ffmpeg_filename_argument(file), True)]) + return [encodeArgument(arg) for arg in args] + [encodeFilename(self._ffmpeg_filename_argument(file), True)] for arg_type, path_opts in (('i', input_path_opts), ('o', output_path_opts)): cmd += itertools.chain.from_iterable( make_args(path, list(opts), arg_type, i + 1) for i, (path, opts) in enumerate(path_opts) if path) + cmd += ['-progress', 'pipe:1'] self.write_debug('ffmpeg command line: %s' % shell_quote(cmd)) - _, stderr, returncode = Popen.run( - cmd, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE) - if returncode not in variadic(expected_retcodes): + + ffmpeg_progress_tracker = FFmpegProgressTracker(info_dict, cmd, self._ffmpeg_hook, self._downloader) + _, stderr, return_code = ffmpeg_progress_tracker.run_ffmpeg_subprocess() + if return_code not in variadic(expected_retcodes): + stderr = stderr.strip() self.write_debug(stderr) raise FFmpegPostProcessorError(stderr.strip().splitlines()[-1]) for out_path, _ in output_path_opts: @@ -370,7 +373,7 @@ def make_args(file, args, name, number): self.try_utime(out_path, oldest_mtime, oldest_mtime) return stderr - def run_ffmpeg(self, path, out_path, opts, **kwargs): + def run_ffmpeg(self, path, out_path, opts, informations=None, **kwargs): return self.run_ffmpeg_multiple_files([path], out_path, opts, **kwargs) @staticmethod @@ -435,6 +438,12 @@ def _concat_spec(cls, in_files, concat_opts=None): if directive in opts: yield f'{directive} {opts[directive]}\n' + def _ffmpeg_hook(self, status, info_dict): + status['processed_bytes'] = status.get('outputted', 0) + if status.get('status') == 'ffmpeg_running': + status['status'] = 'processing' + self._hook_progress(status, info_dict) + class FFmpegExtractAudioPP(FFmpegPostProcessor): COMMON_AUDIO_EXTS = MEDIA_EXTENSIONS.common_audio + ('wma', ) @@ -469,14 +478,14 @@ def _quality_args(self, codec): return ['-vbr', f'{int(q)}'] return ['-q:a', f'{q}'] - def run_ffmpeg(self, path, out_path, codec, more_opts): + def run_ffmpeg(self, path, out_path, codec, more_opts, informations=None): if codec is None: acodec_opts = [] else: acodec_opts = ['-acodec', codec] opts = ['-vn'] + acodec_opts + more_opts try: - FFmpegPostProcessor.run_ffmpeg(self, path, out_path, opts) + FFmpegPostProcessor.run_ffmpeg(self, path, out_path, opts, informations) except FFmpegPostProcessorError as err: raise PostProcessingError(f'audio conversion failed: {err.msg}') @@ -527,7 +536,7 @@ def run(self, information): return [], information self.to_screen(f'Destination: {new_path}') - self.run_ffmpeg(path, temp_path, acodec, more_opts) + self.run_ffmpeg(path, temp_path, acodec, more_opts, information) os.replace(path, orig_path) os.replace(temp_path, new_path) @@ -570,7 +579,7 @@ def run(self, info): outpath = replace_extension(filename, target_ext, source_ext) self.to_screen(f'{self._ACTION.title()} video from {source_ext} to {target_ext}; Destination: {outpath}') - self.run_ffmpeg(filename, outpath, self._options(target_ext)) + self.run_ffmpeg(filename, outpath, self._options(target_ext), info) info['filepath'] = outpath info['format'] = info['ext'] = target_ext @@ -834,7 +843,7 @@ def run(self, info): if fmt.get('vcodec') != 'none': args.extend(['-map', '%u:v:0' % (i)]) self.to_screen('Merging formats into "%s"' % filename) - self.run_ffmpeg_multiple_files(info['__files_to_merge'], temp_filename, args) + self.run_ffmpeg_multiple_files(info['__files_to_merge'], temp_filename, args, info_dict=info) os.rename(encodeFilename(temp_filename), encodeFilename(filename)) return info['__files_to_merge'], info @@ -1005,7 +1014,7 @@ def run(self, info): else: sub_filenames.append(srt_file) - self.run_ffmpeg(old_file, new_file, ['-f', new_format]) + self.run_ffmpeg(old_file, new_file, ['-f', new_format], info) with open(new_file, encoding='utf-8') as f: subs[lang] = { @@ -1188,3 +1197,237 @@ def run(self, info): 'ext': ie_copy['ext'], }] return files_to_delete, info + + +class FFmpegProgressTracker: + def __init__(self, info_dict, ffmpeg_args, hook_progress, ydl=None): + self.ydl = ydl + self._info_dict = info_dict + self._ffmpeg_args = ffmpeg_args + self._hook_progress = hook_progress + self._stdout_queue, self._stderr_queue = Queue(), Queue() + self._streams, self._stderr_buffer, self._stdout_buffer = ['', ''], '', '' + self._progress_pattern = re.compile(r'''(?x) + (?: + frame=\s(?P\S+)\n + fps=\s(?P\S+)\n + stream\d+_\d+_q=\s(?P\S+)\n + )? + bitrate=\s(?P\S+)\n + total_size=\s(?P\S+)\n + out_time_us=\s(?P\S+)\n + out_time_ms=\s(?P\S+)\n + out_time=\s(?P\S+)\n + dup_frames=\s(?P\S+)\n + drop_frames=\s(?P\S+)\n + speed=\s(?P\S+)\n + progress=\s(?P\S+) + ''') + + if self.ydl: + self.ydl.write_debug(f'ffmpeg command line: {shell_quote(self._ffmpeg_args)}') + self.ffmpeg_proc = Popen(self._ffmpeg_args, universal_newlines=True, + encoding='utf8', stdout=subprocess.PIPE, stderr=subprocess.PIPE) + self._start_time = time.time() + + def trigger_progress_hook(self, dct): + self._status.update(dct) + self._hook_progress(self._status, self._info_dict) + + def run_ffmpeg_subprocess(self): + if self._info_dict and self.ydl: + return self._track_ffmpeg_progress() + return self._run_ffmpeg_without_progress_tracking() + + def _run_ffmpeg_without_progress_tracking(self): + """Simply run ffmpeg and only care about the last stderr, stdout and the retcode""" + stdout, stderr = self.ffmpeg_proc.communicate_or_kill() + retcode = self.ffmpeg_proc.returncode + return stdout, stderr, retcode + + def _track_ffmpeg_progress(self): + """ Track ffmpeg progress in a non blocking way using queues""" + self._start_time = time.time() + # args needed to track ffmpeg progress from stdout + self._duration_to_track, self._total_duration = self._compute_duration_to_track() + self._total_filesize = self._compute_total_filesize(self._duration_to_track, self._total_duration) + self._status = { + 'filename': self._ffmpeg_args[-3].split(":")[-1], + 'status': 'ffmpeg_running', + 'total_bytes': self._total_filesize, + 'elapsed': 0, + 'outputted': 0 + } + + out_listener = Thread( + target=self._enqueue_lines, + args=(self.ffmpeg_proc.stdout, self._stdout_queue), + daemon=True, + ) + err_listener = Thread( + target=self._enqueue_lines, + args=(self.ffmpeg_proc.stderr, self._stderr_queue), + daemon=True, + ) + out_listener.start() + err_listener.start() + + retcode = self._wait_for_ffmpeg() + + self._status.update({ + 'status': 'finished', + 'outputted': self._total_filesize + }) + time.sleep(.5) # Needed if ffmpeg didn't release the file in time for yt-dlp to change its name + return self._streams[0], self._streams[1], retcode + + @staticmethod + def _enqueue_lines(out, queue): + for line in iter(out.readline, ''): + queue.put(line.rstrip()) + out.close() + + def _save_stream(self, lines, to_stderr=False): + if not lines: + return + self._streams[to_stderr] += lines + + self.ydl.to_screen('\r', skip_eol=True) + for msg in lines.splitlines(): + if msg.strip(): + self.ydl.write_debug(f'ffmpeg: {msg}') + + def _handle_lines(self): + if not self._stdout_queue.empty(): + stdout_line = self._stdout_queue.get_nowait() + self._stdout_buffer += stdout_line + '\n' + self._parse_ffmpeg_output() + + if not self._stderr_queue.empty(): + stderr_line = self._stderr_queue.get_nowait() + self._stderr_buffer += stderr_line + + def _wait_for_ffmpeg(self): + retcode = self.ffmpeg_proc.poll() + while retcode is None: + time.sleep(.01) + self._handle_lines() + self._status.update({ + 'elapsed': time.time() - self._start_time + }) + self._hook_progress(self._status, self._info_dict) + retcode = self.ffmpeg_proc.poll() + return retcode + + def _parse_ffmpeg_output(self): + ffmpeg_prog_infos = re.match(self._progress_pattern, self._stdout_buffer) + if not ffmpeg_prog_infos: + return + eta_seconds = self._compute_eta(ffmpeg_prog_infos, self._duration_to_track) + bitrate_int = self._compute_bitrate(ffmpeg_prog_infos.group('bitrate')) + # Not using ffmpeg 'total_size' value as it's imprecise and gives progress percentage over 100 + out_time_second = int_or_none(ffmpeg_prog_infos.group('out_time_us')) // 1_000_000 + try: + outputted_bytes_int = int_or_none(out_time_second / self._duration_to_track * self._total_filesize) + except ZeroDivisionError: + outputted_bytes_int = 0 + self._status.update({ + 'outputted': outputted_bytes_int, + 'speed': bitrate_int, + 'eta': eta_seconds, + }) + self._hook_progress(self._status, self._info_dict) + self._stderr_buffer = re.sub(r'=\s+', '=', self._stderr_buffer) + print(self._stdout_buffer, file=sys.stdout, end='') + print(self._stderr_buffer, file=sys.stderr) + self._stdout_buffer = '' + self._stderr_buffer = '' + + def _compute_total_filesize(self, duration_to_track, total_duration): + if not total_duration: + return 0 + filesize = self._info_dict.get('filesize') + if not filesize: + filesize = self._info_dict.get('filesize_approx', 0) + total_filesize = filesize * duration_to_track // total_duration + return total_filesize + + def _compute_duration_to_track(self): + duration = self._info_dict.get('duration') + if not duration: + return 0, 0 + + start_time, end_time = 0, duration + for i, arg in enumerate(self._ffmpeg_args[:-1]): + next_arg_is_a_timestamp = re.match(r'(?P(-ss|-sseof|-to))', arg) + this_arg_is_a_timestamp = re.match(r'(?P(-ss|-sseof|-to))=(?P\d+)', arg) + if not (next_arg_is_a_timestamp or this_arg_is_a_timestamp): + continue + elif next_arg_is_a_timestamp: + timestamp_seconds = self.ffmpeg_time_string_to_seconds(self._ffmpeg_args[i + 1]) + else: + timestamp_seconds = self.ffmpeg_time_string_to_seconds(this_arg_is_a_timestamp.group('timestamp')) + if next_arg_is_a_timestamp.group('at') == '-ss': + start_time = timestamp_seconds + elif next_arg_is_a_timestamp.group('at') == '-sseof': + start_time = end_time - timestamp_seconds + elif next_arg_is_a_timestamp.group('at') == '-to': + end_time = timestamp_seconds + + duration_to_track = end_time - start_time + if duration_to_track >= 0: + return duration_to_track, duration + return 0, duration + + @staticmethod + def _compute_eta(ffmpeg_prog_infos, duration_to_track): + try: + speed = float_or_none(ffmpeg_prog_infos.group('speed')[:-1]) + out_time_second = int_or_none(ffmpeg_prog_infos.group('out_time_us')) // 1_000_000 + eta_seconds = (duration_to_track - out_time_second) // speed + except (TypeError, ZeroDivisionError): + eta_seconds = 0 + return eta_seconds + + @staticmethod + def ffmpeg_time_string_to_seconds(time_string): + ffmpeg_time_seconds = 0 + hms_parsed = re.match(r"((?P\d+):)?((?P\d+):)?(?P\d+)(\.(?P\d+))?", time_string) + smu_parse = re.match(r"(?P