From bcec568ea7556d122b79bf802b1ac3ff814531f6 Mon Sep 17 00:00:00 2001 From: Kenshin Date: Mon, 24 Jan 2022 22:47:14 +0100 Subject: [PATCH] Add ffmpeg progress tracking to FFmpegFD Add ffmpeg progress tracking to FFmpegPostProcessor Apply changes from the code review Fix a bug where the subprocess didn't capture any output thus an empty stdout and stderr were sent back Add missing hooks Revert "Add missing hooks" This reverts commit a359c5ea10bb35b965e80801e736f43cdbcf3294. Add support of -ss=132 timestamp format Infer filename from ffmpeg args instead of info_dic Remove redundant parenthesis and switch from to_stodout to to_screen Add info kwarg with multiple files and ffmpeg to track progress Moved format progress function to util Moved format progress function to util Add progress tracking to postprocessing operations Fix typing error Handle self._downloader is None at __init__ Move format progress functions to utils Move format progress functions to utils Handle case where ydl passed is None Handle case where ydl passed is None Handle case where _multiline isn't initialized Handle case where _multiline isn't initialized Fix streams incorrectly returned Fix case where ydl is nested in the downloader Add progress_hook attribute Fix bug after merge Fix import bugs after merge Catch up with upstream Fix merge errors #1 Adapt tests and implementatation for ffmpeg progress tracking args --- test/test_downloader_external.py | 11 +- yt_dlp/downloader/common.py | 77 ++------- yt_dlp/downloader/external.py | 38 ++--- yt_dlp/downloader/http.py | 5 +- yt_dlp/downloader/rtmp.py | 5 +- yt_dlp/postprocessor/common.py | 132 +++++++++++++-- yt_dlp/postprocessor/ffmpeg.py | 277 +++++++++++++++++++++++++++++-- yt_dlp/utils/_utils.py | 48 ++++++ 8 files changed, 473 insertions(+), 120 deletions(-) diff --git a/test/test_downloader_external.py b/test/test_downloader_external.py index 62f7d45d4..c610ba012 100644 --- a/test/test_downloader_external.py +++ b/test/test_downloader_external.py @@ -120,19 +120,22 @@ def test_make_cmd(self): downloader._call_downloader('test', {**TEST_INFO, 'ext': 'mp4'}) self.assertEqual(self._args, [ 'ffmpeg', '-y', '-hide_banner', '-i', 'http://www.example.com/', - '-c', 'copy', '-f', 'mp4', 'file:test']) + '-c', 'copy', '-f', 'mp4', 'file:test', '-progress', 'pipe:1']) # Test cookies arg is added ydl.cookiejar.set_cookie(http.cookiejar.Cookie(**TEST_COOKIE)) downloader._call_downloader('test', {**TEST_INFO, 'ext': 'mp4'}) self.assertEqual(self._args, [ - 'ffmpeg', '-y', '-hide_banner', '-cookies', 'test=ytdlp; path=/; domain=.example.com;\r\n', - '-i', 'http://www.example.com/', '-c', 'copy', '-f', 'mp4', 'file:test']) + 'ffmpeg', '-y', '-hide_banner', '-cookies', + 'test=ytdlp; path=/; domain=.example.com;\r\n', '-i', + 'http://www.example.com/', '-c', 'copy', '-f', 'mp4', + 'file:test', '-progress', 'pipe:1']) # Test with non-url input (ffmpeg reads from stdin '-' for websockets) downloader._call_downloader('test', {'url': 'x', 'ext': 'mp4'}) self.assertEqual(self._args, [ - 'ffmpeg', '-y', '-hide_banner', '-i', 'x', '-c', 'copy', '-f', 'mp4', 'file:test']) + 'ffmpeg', '-y', '-hide_banner', '-i', 'x', '-c', 'copy', '-f', + 'mp4', 'file:test', '-progress', 'pipe:1']) if __name__ == '__main__': diff --git a/yt_dlp/downloader/common.py b/yt_dlp/downloader/common.py index b71d7ee8f..1c7f13106 100644 --- a/yt_dlp/downloader/common.py +++ b/yt_dlp/downloader/common.py @@ -15,6 +15,7 @@ from ..utils import ( IDENTITY, NO_DEFAULT, + FormatProgressInfos, LockingUnsupportedError, Namespace, RetryManager, @@ -25,11 +26,9 @@ format_bytes, join_nonempty, parse_bytes, - remove_start, sanitize_open, shell_quote, timeconvert, - timetuple_from_msec, try_call, ) @@ -115,56 +114,6 @@ def to_screen(self, *args, **kargs): def FD_NAME(cls): return re.sub(r'(?<=[a-z])(?=[A-Z])', '_', cls.__name__[:-2]).lower() - @staticmethod - def format_seconds(seconds): - if seconds is None: - return ' Unknown' - time = timetuple_from_msec(seconds * 1000) - if time.hours > 99: - return '--:--:--' - return '%02d:%02d:%02d' % time[:-1] - - @classmethod - def format_eta(cls, seconds): - return f'{remove_start(cls.format_seconds(seconds), "00:"):>8s}' - - @staticmethod - def calc_percent(byte_counter, data_len): - if data_len is None: - return None - return float(byte_counter) / float(data_len) * 100.0 - - @staticmethod - def format_percent(percent): - return ' N/A%' if percent is None else f'{percent:>5.1f}%' - - @classmethod - def calc_eta(cls, start_or_rate, now_or_remaining, total=NO_DEFAULT, current=NO_DEFAULT): - if total is NO_DEFAULT: - rate, remaining = start_or_rate, now_or_remaining - if None in (rate, remaining): - return None - return int(float(remaining) / rate) - - start, now = start_or_rate, now_or_remaining - if total is None: - return None - if now is None: - now = time.time() - rate = cls.calc_speed(start, now, current) - return rate and int((float(total) - float(current)) / rate) - - @staticmethod - def calc_speed(start, now, bytes): - dif = now - start - if bytes == 0 or dif < 0.001: # One millisecond - return None - return float(bytes) / dif - - @staticmethod - def format_speed(speed): - return ' Unknown B/s' if speed is None else f'{format_bytes(speed):>10s}/s' - @staticmethod def format_retries(retries): return 'inf' if retries == float('inf') else int(retries) @@ -343,18 +292,16 @@ def with_fields(*tups, default=''): return tmpl return default - _format_bytes = lambda k: f'{format_bytes(s.get(k)):>10s}' - if s['status'] == 'finished': if self.params.get('noprogress'): self.to_screen('[download] Download completed') speed = try_call(lambda: s['total_bytes'] / s['elapsed']) s.update({ 'speed': speed, - '_speed_str': self.format_speed(speed).strip(), - '_total_bytes_str': _format_bytes('total_bytes'), - '_elapsed_str': self.format_seconds(s.get('elapsed')), - '_percent_str': self.format_percent(100), + '_speed_str': FormatProgressInfos.format_speed(speed).strip(), + '_total_bytes_str': format_bytes(s.get('total_bytes')), + '_elapsed_str': FormatProgressInfos.format_seconds(s.get('elapsed')), + '_percent_str': FormatProgressInfos.format_percent(100), }) self._report_progress_status(s, join_nonempty( '100%%', @@ -367,16 +314,16 @@ def with_fields(*tups, default=''): return s.update({ - '_eta_str': self.format_eta(s.get('eta')).strip(), - '_speed_str': self.format_speed(s.get('speed')), - '_percent_str': self.format_percent(try_call( + '_eta_str': FormatProgressInfos.format_eta(s.get('eta')), + '_speed_str': FormatProgressInfos.format_speed(s.get('speed')), + '_percent_str': FormatProgressInfos.format_percent(try_call( lambda: 100 * s['downloaded_bytes'] / s['total_bytes'], lambda: 100 * s['downloaded_bytes'] / s['total_bytes_estimate'], lambda: s['downloaded_bytes'] == 0 and 0)), - '_total_bytes_str': _format_bytes('total_bytes'), - '_total_bytes_estimate_str': _format_bytes('total_bytes_estimate'), - '_downloaded_bytes_str': _format_bytes('downloaded_bytes'), - '_elapsed_str': self.format_seconds(s.get('elapsed')), + '_total_bytes_str': format_bytes(s.get('total_bytes')), + '_total_bytes_estimate_str': format_bytes(s.get('total_bytes_estimate')), + '_downloaded_bytes_str': format_bytes(s.get('downloaded_bytes')), + '_elapsed_str': FormatProgressInfos.format_seconds(s.get('elapsed')), }) msg_template = with_fields( diff --git a/yt_dlp/downloader/external.py b/yt_dlp/downloader/external.py index 4ce8a3bf7..0a8464800 100644 --- a/yt_dlp/downloader/external.py +++ b/yt_dlp/downloader/external.py @@ -3,7 +3,6 @@ import os import re import subprocess -import sys import tempfile import time import uuid @@ -11,7 +10,11 @@ from .fragment import FragmentFD from ..compat import functools from ..networking import Request -from ..postprocessor.ffmpeg import EXT_TO_OUT_FORMATS, FFmpegPostProcessor +from ..postprocessor.ffmpeg import ( + EXT_TO_OUT_FORMATS, + FFmpegPostProcessor, + FFmpegProgressTracker, +) from ..utils import ( Popen, RetryManager, @@ -619,26 +622,23 @@ def _call_downloader(self, tmpfilename, info_dict): args = [encodeArgument(opt) for opt in args] args.append(encodeFilename(ffpp._ffmpeg_filename_argument(tmpfilename), True)) + args += ['-progress', 'pipe:1'] self._debug_cmd(args) piped = any(fmt['url'] in ('-', 'pipe:') for fmt in selected_formats) - with Popen(args, stdin=subprocess.PIPE, env=env) as proc: - if piped: - self.on_process_started(proc, proc.stdin) - try: - retval = proc.wait() - except BaseException as e: - # subprocces.run would send the SIGKILL signal to ffmpeg and the - # mp4 file couldn't be played, but if we ask ffmpeg to quit it - # produces a file that is playable (this is mostly useful for live - # streams). Note that Windows is not affected and produces playable - # files (see https://github.com/ytdl-org/youtube-dl/issues/8300). - if isinstance(e, KeyboardInterrupt) and sys.platform != 'win32' and not piped: - proc.communicate_or_kill(b'q') - else: - proc.kill(timeout=None) - raise - return retval + self._debug_cmd(args) + ffmpeg_progress_tracker = FFmpegProgressTracker(info_dict, args, self._ffmpeg_hook) + proc = ffmpeg_progress_tracker.ffmpeg_proc + if piped: + self.on_process_started(proc, proc.stdin) + _, _, return_code = ffmpeg_progress_tracker.run_ffmpeg_subprocess() + return return_code + + def _ffmpeg_hook(self, status, info_dict): + status['downloaded_bytes'] = status.get('outputted', 0) + if status.get('status') == 'ffmpeg_running': + status['status'] = 'downloading' + self._hook_progress(status, info_dict) class AVconvFD(FFmpegFD): diff --git a/yt_dlp/downloader/http.py b/yt_dlp/downloader/http.py index f5237443e..a1f95db29 100644 --- a/yt_dlp/downloader/http.py +++ b/yt_dlp/downloader/http.py @@ -11,6 +11,7 @@ ) from ..utils import ( ContentTooShortError, + FormatProgressInfos, RetryManager, ThrottledDownload, XAttrMetadataError, @@ -293,11 +294,11 @@ def retry(e): before = after # Progress message - speed = self.calc_speed(start, now, byte_counter - ctx.resume_len) + speed = FormatProgressInfos.calc_speed(start, now, byte_counter - ctx.resume_len) if ctx.data_len is None: eta = None else: - eta = self.calc_eta(start, time.time(), ctx.data_len - ctx.resume_len, byte_counter - ctx.resume_len) + eta = FormatProgressInfos.calc_eta(start, time.time(), ctx.data_len - ctx.resume_len, byte_counter - ctx.resume_len) self._hook_progress({ 'status': 'downloading', diff --git a/yt_dlp/downloader/rtmp.py b/yt_dlp/downloader/rtmp.py index 0e0952599..9373f3585 100644 --- a/yt_dlp/downloader/rtmp.py +++ b/yt_dlp/downloader/rtmp.py @@ -5,6 +5,7 @@ from .common import FileDownloader from ..utils import ( + FormatProgressInfos, Popen, check_executable, encodeArgument, @@ -50,8 +51,8 @@ def run_rtmpdump(args): resume_percent = percent resume_downloaded_data_len = downloaded_data_len time_now = time.time() - eta = self.calc_eta(start, time_now, 100 - resume_percent, percent - resume_percent) - speed = self.calc_speed(start, time_now, downloaded_data_len - resume_downloaded_data_len) + eta = FormatProgressInfos.calc_eta(start, time_now, 100 - resume_percent, percent - resume_percent) + speed = FormatProgressInfos.calc_speed(start, time_now, downloaded_data_len - resume_downloaded_data_len) data_len = None if percent > 0: data_len = int(downloaded_data_len * 100 / percent) diff --git a/yt_dlp/postprocessor/common.py b/yt_dlp/postprocessor/common.py index 8cef86c43..371164863 100644 --- a/yt_dlp/postprocessor/common.py +++ b/yt_dlp/postprocessor/common.py @@ -1,15 +1,27 @@ import functools import json import os +import sys +from ..minicurses import ( + BreaklineStatusPrinter, + MultilineLogger, + MultilinePrinter, + QuietMultilinePrinter, +) from ..networking import Request from ..networking.exceptions import HTTPError, network_exceptions from ..utils import ( + FormatProgressInfos, + Namespace, PostProcessingError, RetryManager, _configuration_args, deprecation_warning, encodeFilename, + format_bytes, + join_nonempty, + try_call, ) @@ -56,7 +68,9 @@ def __init__(self, downloader=None): self._progress_hooks = [] self.add_progress_hook(self.report_progress) self.set_downloader(downloader) + self._out_files = self.set_out_files() self.PP_NAME = self.pp_key() + self._prepare_multiline_status() @classmethod def pp_key(cls): @@ -102,6 +116,11 @@ def get_param(self, name, default=None, *args, **kwargs): return self._downloader.params.get(name, default, *args, **kwargs) return default + def set_out_files(self): + if not self._downloader: + return None + return getattr(self._downloader, '_out_files', None) or self._downloader.ydl._out_files + def set_downloader(self, downloader): """Sets the downloader for this PP.""" self._downloader = downloader @@ -173,25 +192,116 @@ def add_progress_hook(self, ph): # See YoutubeDl.py (search for postprocessor_hooks) for a description of this interface self._progress_hooks.append(ph) - def report_progress(self, s): - s['_default_template'] = '%(postprocessor)s %(status)s' % s - if not self._downloader: - return + def report_destination(self, filename): + """Report destination filename.""" + self.to_screen('[processing] Destination: ' + filename) + + def _prepare_multiline_status(self, lines=1): + if self._downloader: + if self._downloader.params.get('noprogress'): + self._multiline = QuietMultilinePrinter() + elif self._downloader.params.get('logger'): + self._multiline = MultilineLogger(self._downloader.params['logger'], lines) + elif self._downloader.params.get('progress_with_newline'): + self._multiline = BreaklineStatusPrinter(self._downloader._out_files.out, lines) + elif hasattr(self._downloader, "_out_files"): + self._multiline = MultilinePrinter(self._downloader._out_files.out, lines, not self._downloader.params.get('quiet')) + else: + self._multiline = MultilinePrinter(sys.stdout, lines, not self._downloader.params.get('quiet')) + self._multiline.allow_colors = self._multiline._HAVE_FULLCAP and not self._downloader.params.get('no_color') + else: + self._multiline = MultilinePrinter(sys.stdout, lines, True) + self._multiline.allow_colors = self._multiline._HAVE_FULLCAP + + def _finish_multiline_status(self): + self._multiline.end() + + ProgressStyles = Namespace( + processed_bytes='light blue', + percent='light blue', + eta='yellow', + speed='green', + elapsed='bold white', + total_bytes='', + total_bytes_estimate='', + ) + + def _report_progress_status(self, s, default_template): + for name, style in self.ProgressStyles.items_: + name = f'_{name}_str' + if name not in s: + continue + s[name] = self._format_progress(s[name], style) + s['_default_template'] = default_template % s progress_dict = s.copy() progress_dict.pop('info_dict') progress_dict = {'info': s['info_dict'], 'progress': progress_dict} - progress_template = self.get_param('progress_template', {}) - tmpl = progress_template.get('postprocess') - if tmpl: - self._downloader.to_screen( - self._downloader.evaluate_outtmpl(tmpl, progress_dict), quiet=False) - + progress_template = self._downloader.params.get('progress_template', {}) + self._multiline.print_at_line(self._downloader.evaluate_outtmpl( + progress_template.get('process') or '[processing] %(progress._default_template)s', + progress_dict), s.get('progress_idx') or 0) self._downloader.to_console_title(self._downloader.evaluate_outtmpl( - progress_template.get('postprocess-title') or 'yt-dlp %(progress._default_template)s', + progress_template.get('download-title') or 'yt-dlp %(progress._default_template)s', progress_dict)) + def _format_progress(self, *args, **kwargs): + return self._downloader._format_text( + self._multiline.stream, self._multiline.allow_colors, *args, **kwargs) + + def report_progress(self, s): + def with_fields(*tups, default=''): + for *fields, tmpl in tups: + if all(s.get(f) is not None for f in fields): + return tmpl + return default + + if not self._downloader: + return + + if s['status'] == 'finished': + if self._downloader.params.get('noprogress'): + self.to_screen('[processing] Download completed') + speed = try_call(lambda: s['total_bytes'] / s['elapsed']) + s.update({ + 'speed': speed, + '_speed_str': FormatProgressInfos.format_speed(speed).strip(), + '_total_bytes_str': format_bytes(s.get('total_bytes')), + '_elapsed_str': FormatProgressInfos.format_seconds(s.get('elapsed')), + '_percent_str': FormatProgressInfos.format_percent(100), + }) + self._report_progress_status(s, join_nonempty( + '100%%', + with_fields(('total_bytes', 'of %(_total_bytes_str)s')), + with_fields(('elapsed', 'in %(_elapsed_str)s')), + with_fields(('speed', 'at %(_speed_str)s')), + delim=' ')) + + if s['status'] != 'processing': + return + + s.update({ + '_eta_str': FormatProgressInfos.format_eta(s.get('eta')), + '_speed_str': FormatProgressInfos.format_speed(s.get('speed')), + '_percent_str': FormatProgressInfos.format_percent(try_call( + lambda: 100 * s['processed_bytes'] / s['total_bytes'], + lambda: 100 * s['processed_bytes'] / s['total_bytes_estimate'], + lambda: s['processed_bytes'] == 0 and 0)), + '_total_bytes_str': format_bytes(s.get('total_bytes')), + '_total_bytes_estimate_str': format_bytes(s.get('total_bytes_estimate')), + '_processed_bytes_str': format_bytes(s.get('processed_bytes')), + '_elapsed_str': FormatProgressInfos.format_seconds(s.get('elapsed')), + }) + + msg_template = with_fields( + ('total_bytes', '%(_percent_str)s of %(_total_bytes_str)s at %(_speed_str)s ETA %(_eta_str)s'), + ('processed_bytes', 'elapsed', '%(_processed_bytes_str)s at %(_speed_str)s (%(_elapsed_str)s)'), + ('processed_bytes', '%(_processed_bytes_str)s at %(_speed_str)s'), + default='%(_percent_str)s at %(_speed_str)s ETA %(_eta_str)s') + + self._report_progress_status(s, msg_template) + def _retry_download(self, err, count, retries): # While this is not an extractor, it behaves similar to one and # so obey extractor_retries and "--retry-sleep extractor" diff --git a/yt_dlp/postprocessor/ffmpeg.py b/yt_dlp/postprocessor/ffmpeg.py index 323f4303c..fa11bb94d 100644 --- a/yt_dlp/postprocessor/ffmpeg.py +++ b/yt_dlp/postprocessor/ffmpeg.py @@ -5,7 +5,10 @@ import os import re import subprocess +import sys import time +from queue import Queue +from threading import Thread from .common import PostProcessor from ..compat import functools, imghdr @@ -23,6 +26,7 @@ encodeFilename, filter_dict, float_or_none, + int_or_none, is_outdated_version, orderedSet, prepend_extension, @@ -326,11 +330,9 @@ def _duration_mismatch(self, d1, d2, tolerance=2): return abs(d1 - d2) > tolerance def run_ffmpeg_multiple_files(self, input_paths, out_path, opts, **kwargs): - return self.real_run_ffmpeg( - [(path, []) for path in input_paths], - [(out_path, opts)], **kwargs) + return self.real_run_ffmpeg([(path, []) for path in input_paths], [(out_path, opts)], **kwargs) - def real_run_ffmpeg(self, input_path_opts, output_path_opts, *, expected_retcodes=(0,)): + def real_run_ffmpeg(self, input_path_opts, output_path_opts, *, expected_retcodes=(0,), info_dict=None): self.check_version() oldest_mtime = min( @@ -350,19 +352,20 @@ def make_args(file, args, name, number): args += self._configuration_args(self.basename, keys) if name == 'i': args.append('-i') - return ( - [encodeArgument(arg) for arg in args] - + [encodeFilename(self._ffmpeg_filename_argument(file), True)]) + return [encodeArgument(arg) for arg in args] + [encodeFilename(self._ffmpeg_filename_argument(file), True)] for arg_type, path_opts in (('i', input_path_opts), ('o', output_path_opts)): cmd += itertools.chain.from_iterable( make_args(path, list(opts), arg_type, i + 1) for i, (path, opts) in enumerate(path_opts) if path) + cmd += ['-progress', 'pipe:1'] self.write_debug('ffmpeg command line: %s' % shell_quote(cmd)) - _, stderr, returncode = Popen.run( - cmd, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE) - if returncode not in variadic(expected_retcodes): + + ffmpeg_progress_tracker = FFmpegProgressTracker(info_dict, cmd, self._ffmpeg_hook, self._downloader) + _, stderr, return_code = ffmpeg_progress_tracker.run_ffmpeg_subprocess() + if return_code not in variadic(expected_retcodes): + stderr = stderr.strip() self.write_debug(stderr) raise FFmpegPostProcessorError(stderr.strip().splitlines()[-1]) for out_path, _ in output_path_opts: @@ -370,7 +373,7 @@ def make_args(file, args, name, number): self.try_utime(out_path, oldest_mtime, oldest_mtime) return stderr - def run_ffmpeg(self, path, out_path, opts, **kwargs): + def run_ffmpeg(self, path, out_path, opts, informations=None, **kwargs): return self.run_ffmpeg_multiple_files([path], out_path, opts, **kwargs) @staticmethod @@ -435,6 +438,12 @@ def _concat_spec(cls, in_files, concat_opts=None): if directive in opts: yield f'{directive} {opts[directive]}\n' + def _ffmpeg_hook(self, status, info_dict): + status['processed_bytes'] = status.get('outputted', 0) + if status.get('status') == 'ffmpeg_running': + status['status'] = 'processing' + self._hook_progress(status, info_dict) + class FFmpegExtractAudioPP(FFmpegPostProcessor): COMMON_AUDIO_EXTS = MEDIA_EXTENSIONS.common_audio + ('wma', ) @@ -469,14 +478,14 @@ def _quality_args(self, codec): return ['-vbr', f'{int(q)}'] return ['-q:a', f'{q}'] - def run_ffmpeg(self, path, out_path, codec, more_opts): + def run_ffmpeg(self, path, out_path, codec, more_opts, informations=None): if codec is None: acodec_opts = [] else: acodec_opts = ['-acodec', codec] opts = ['-vn'] + acodec_opts + more_opts try: - FFmpegPostProcessor.run_ffmpeg(self, path, out_path, opts) + FFmpegPostProcessor.run_ffmpeg(self, path, out_path, opts, informations) except FFmpegPostProcessorError as err: raise PostProcessingError(f'audio conversion failed: {err.msg}') @@ -527,7 +536,7 @@ def run(self, information): return [], information self.to_screen(f'Destination: {new_path}') - self.run_ffmpeg(path, temp_path, acodec, more_opts) + self.run_ffmpeg(path, temp_path, acodec, more_opts, information) os.replace(path, orig_path) os.replace(temp_path, new_path) @@ -570,7 +579,7 @@ def run(self, info): outpath = replace_extension(filename, target_ext, source_ext) self.to_screen(f'{self._ACTION.title()} video from {source_ext} to {target_ext}; Destination: {outpath}') - self.run_ffmpeg(filename, outpath, self._options(target_ext)) + self.run_ffmpeg(filename, outpath, self._options(target_ext), info) info['filepath'] = outpath info['format'] = info['ext'] = target_ext @@ -834,7 +843,7 @@ def run(self, info): if fmt.get('vcodec') != 'none': args.extend(['-map', '%u:v:0' % (i)]) self.to_screen('Merging formats into "%s"' % filename) - self.run_ffmpeg_multiple_files(info['__files_to_merge'], temp_filename, args) + self.run_ffmpeg_multiple_files(info['__files_to_merge'], temp_filename, args, info_dict=info) os.rename(encodeFilename(temp_filename), encodeFilename(filename)) return info['__files_to_merge'], info @@ -1005,7 +1014,7 @@ def run(self, info): else: sub_filenames.append(srt_file) - self.run_ffmpeg(old_file, new_file, ['-f', new_format]) + self.run_ffmpeg(old_file, new_file, ['-f', new_format], info) with open(new_file, encoding='utf-8') as f: subs[lang] = { @@ -1188,3 +1197,237 @@ def run(self, info): 'ext': ie_copy['ext'], }] return files_to_delete, info + + +class FFmpegProgressTracker: + def __init__(self, info_dict, ffmpeg_args, hook_progress, ydl=None): + self.ydl = ydl + self._info_dict = info_dict + self._ffmpeg_args = ffmpeg_args + self._hook_progress = hook_progress + self._stdout_queue, self._stderr_queue = Queue(), Queue() + self._streams, self._stderr_buffer, self._stdout_buffer = ['', ''], '', '' + self._progress_pattern = re.compile(r'''(?x) + (?: + frame=\s(?P\S+)\n + fps=\s(?P\S+)\n + stream\d+_\d+_q=\s(?P\S+)\n + )? + bitrate=\s(?P\S+)\n + total_size=\s(?P\S+)\n + out_time_us=\s(?P\S+)\n + out_time_ms=\s(?P\S+)\n + out_time=\s(?P\S+)\n + dup_frames=\s(?P\S+)\n + drop_frames=\s(?P\S+)\n + speed=\s(?P\S+)\n + progress=\s(?P\S+) + ''') + + if self.ydl: + self.ydl.write_debug(f'ffmpeg command line: {shell_quote(self._ffmpeg_args)}') + self.ffmpeg_proc = Popen(self._ffmpeg_args, universal_newlines=True, + encoding='utf8', stdout=subprocess.PIPE, stderr=subprocess.PIPE) + self._start_time = time.time() + + def trigger_progress_hook(self, dct): + self._status.update(dct) + self._hook_progress(self._status, self._info_dict) + + def run_ffmpeg_subprocess(self): + if self._info_dict and self.ydl: + return self._track_ffmpeg_progress() + return self._run_ffmpeg_without_progress_tracking() + + def _run_ffmpeg_without_progress_tracking(self): + """Simply run ffmpeg and only care about the last stderr, stdout and the retcode""" + stdout, stderr = self.ffmpeg_proc.communicate_or_kill() + retcode = self.ffmpeg_proc.returncode + return stdout, stderr, retcode + + def _track_ffmpeg_progress(self): + """ Track ffmpeg progress in a non blocking way using queues""" + self._start_time = time.time() + # args needed to track ffmpeg progress from stdout + self._duration_to_track, self._total_duration = self._compute_duration_to_track() + self._total_filesize = self._compute_total_filesize(self._duration_to_track, self._total_duration) + self._status = { + 'filename': self._ffmpeg_args[-3].split(":")[-1], + 'status': 'ffmpeg_running', + 'total_bytes': self._total_filesize, + 'elapsed': 0, + 'outputted': 0 + } + + out_listener = Thread( + target=self._enqueue_lines, + args=(self.ffmpeg_proc.stdout, self._stdout_queue), + daemon=True, + ) + err_listener = Thread( + target=self._enqueue_lines, + args=(self.ffmpeg_proc.stderr, self._stderr_queue), + daemon=True, + ) + out_listener.start() + err_listener.start() + + retcode = self._wait_for_ffmpeg() + + self._status.update({ + 'status': 'finished', + 'outputted': self._total_filesize + }) + time.sleep(.5) # Needed if ffmpeg didn't release the file in time for yt-dlp to change its name + return self._streams[0], self._streams[1], retcode + + @staticmethod + def _enqueue_lines(out, queue): + for line in iter(out.readline, ''): + queue.put(line.rstrip()) + out.close() + + def _save_stream(self, lines, to_stderr=False): + if not lines: + return + self._streams[to_stderr] += lines + + self.ydl.to_screen('\r', skip_eol=True) + for msg in lines.splitlines(): + if msg.strip(): + self.ydl.write_debug(f'ffmpeg: {msg}') + + def _handle_lines(self): + if not self._stdout_queue.empty(): + stdout_line = self._stdout_queue.get_nowait() + self._stdout_buffer += stdout_line + '\n' + self._parse_ffmpeg_output() + + if not self._stderr_queue.empty(): + stderr_line = self._stderr_queue.get_nowait() + self._stderr_buffer += stderr_line + + def _wait_for_ffmpeg(self): + retcode = self.ffmpeg_proc.poll() + while retcode is None: + time.sleep(.01) + self._handle_lines() + self._status.update({ + 'elapsed': time.time() - self._start_time + }) + self._hook_progress(self._status, self._info_dict) + retcode = self.ffmpeg_proc.poll() + return retcode + + def _parse_ffmpeg_output(self): + ffmpeg_prog_infos = re.match(self._progress_pattern, self._stdout_buffer) + if not ffmpeg_prog_infos: + return + eta_seconds = self._compute_eta(ffmpeg_prog_infos, self._duration_to_track) + bitrate_int = self._compute_bitrate(ffmpeg_prog_infos.group('bitrate')) + # Not using ffmpeg 'total_size' value as it's imprecise and gives progress percentage over 100 + out_time_second = int_or_none(ffmpeg_prog_infos.group('out_time_us')) // 1_000_000 + try: + outputted_bytes_int = int_or_none(out_time_second / self._duration_to_track * self._total_filesize) + except ZeroDivisionError: + outputted_bytes_int = 0 + self._status.update({ + 'outputted': outputted_bytes_int, + 'speed': bitrate_int, + 'eta': eta_seconds, + }) + self._hook_progress(self._status, self._info_dict) + self._stderr_buffer = re.sub(r'=\s+', '=', self._stderr_buffer) + print(self._stdout_buffer, file=sys.stdout, end='') + print(self._stderr_buffer, file=sys.stderr) + self._stdout_buffer = '' + self._stderr_buffer = '' + + def _compute_total_filesize(self, duration_to_track, total_duration): + if not total_duration: + return 0 + filesize = self._info_dict.get('filesize') + if not filesize: + filesize = self._info_dict.get('filesize_approx', 0) + total_filesize = filesize * duration_to_track // total_duration + return total_filesize + + def _compute_duration_to_track(self): + duration = self._info_dict.get('duration') + if not duration: + return 0, 0 + + start_time, end_time = 0, duration + for i, arg in enumerate(self._ffmpeg_args[:-1]): + next_arg_is_a_timestamp = re.match(r'(?P(-ss|-sseof|-to))', arg) + this_arg_is_a_timestamp = re.match(r'(?P(-ss|-sseof|-to))=(?P\d+)', arg) + if not (next_arg_is_a_timestamp or this_arg_is_a_timestamp): + continue + elif next_arg_is_a_timestamp: + timestamp_seconds = self.ffmpeg_time_string_to_seconds(self._ffmpeg_args[i + 1]) + else: + timestamp_seconds = self.ffmpeg_time_string_to_seconds(this_arg_is_a_timestamp.group('timestamp')) + if next_arg_is_a_timestamp.group('at') == '-ss': + start_time = timestamp_seconds + elif next_arg_is_a_timestamp.group('at') == '-sseof': + start_time = end_time - timestamp_seconds + elif next_arg_is_a_timestamp.group('at') == '-to': + end_time = timestamp_seconds + + duration_to_track = end_time - start_time + if duration_to_track >= 0: + return duration_to_track, duration + return 0, duration + + @staticmethod + def _compute_eta(ffmpeg_prog_infos, duration_to_track): + try: + speed = float_or_none(ffmpeg_prog_infos.group('speed')[:-1]) + out_time_second = int_or_none(ffmpeg_prog_infos.group('out_time_us')) // 1_000_000 + eta_seconds = (duration_to_track - out_time_second) // speed + except (TypeError, ZeroDivisionError): + eta_seconds = 0 + return eta_seconds + + @staticmethod + def ffmpeg_time_string_to_seconds(time_string): + ffmpeg_time_seconds = 0 + hms_parsed = re.match(r"((?P\d+):)?((?P\d+):)?(?P\d+)(\.(?P\d+))?", time_string) + smu_parse = re.match(r"(?P