[fragment,hls,f4m,dash,ism] improve fragment downloading

- resume immediately
- no need to concatenate segments and decrypt them on every resume
- no need to save temp files for segments

and for hls downloader:
- no need to download keys for segments that already downloaded
This commit is contained in:
Remita Amine 2016-06-28 18:07:50 +01:00
parent 58f6ab72ed
commit 75a2485407
7 changed files with 111 additions and 123 deletions

View file

@ -327,6 +327,7 @@ def download(self, filename, info_dict):
os.path.exists(encodeFilename(filename)) os.path.exists(encodeFilename(filename))
) )
if not hasattr(filename, 'write'):
continuedl_and_exists = ( continuedl_and_exists = (
self.params.get('continuedl', True) and self.params.get('continuedl', True) and
os.path.isfile(encodeFilename(filename)) and os.path.isfile(encodeFilename(filename)) and

View file

@ -1,13 +1,7 @@
from __future__ import unicode_literals from __future__ import unicode_literals
import os
from .fragment import FragmentFD from .fragment import FragmentFD
from ..compat import compat_urllib_error from ..compat import compat_urllib_error
from ..utils import (
sanitize_open,
encodeFilename,
)
class DashSegmentsFD(FragmentFD): class DashSegmentsFD(FragmentFD):
@ -28,31 +22,24 @@ def real_download(self, filename, info_dict):
self._prepare_and_start_frag_download(ctx) self._prepare_and_start_frag_download(ctx)
segments_filenames = []
fragment_retries = self.params.get('fragment_retries', 0) fragment_retries = self.params.get('fragment_retries', 0)
skip_unavailable_fragments = self.params.get('skip_unavailable_fragments', True) skip_unavailable_fragments = self.params.get('skip_unavailable_fragments', True)
def process_segment(segment, tmp_filename, num): frag_index = 0
segment_url = segment['url'] for i, segment in enumerate(segments):
segment_name = 'Frag%d' % num frag_index += 1
target_filename = '%s-%s' % (tmp_filename, segment_name) if frag_index <= ctx['frag_index']:
continue
# In DASH, the first segment contains necessary headers to # In DASH, the first segment contains necessary headers to
# generate a valid MP4 file, so always abort for the first segment # generate a valid MP4 file, so always abort for the first segment
fatal = num == 0 or not skip_unavailable_fragments fatal = i == 0 or not skip_unavailable_fragments
count = 0 count = 0
while count <= fragment_retries: while count <= fragment_retries:
try: try:
success = ctx['dl'].download(target_filename, { success, frag_content = self._download_fragment(ctx, segment['url'], info_dict)
'url': segment_url,
'http_headers': info_dict.get('http_headers'),
})
if not success: if not success:
return False return False
down, target_sanitized = sanitize_open(target_filename, 'rb') self._append_fragment(ctx, frag_content)
ctx['dest_stream'].write(down.read())
down.close()
segments_filenames.append(target_sanitized)
break break
except compat_urllib_error.HTTPError as err: except compat_urllib_error.HTTPError as err:
# YouTube may often return 404 HTTP error for a fragment causing the # YouTube may often return 404 HTTP error for a fragment causing the
@ -63,22 +50,14 @@ def process_segment(segment, tmp_filename, num):
# HTTP error. # HTTP error.
count += 1 count += 1
if count <= fragment_retries: if count <= fragment_retries:
self.report_retry_fragment(err, segment_name, count, fragment_retries) self.report_retry_fragment(err, frag_index, count, fragment_retries)
if count > fragment_retries: if count > fragment_retries:
if not fatal: if not fatal:
self.report_skip_fragment(segment_name) self.report_skip_fragment(frag_index)
return True continue
self.report_error('giving up after %s fragment retries' % fragment_retries) self.report_error('giving up after %s fragment retries' % fragment_retries)
return False return False
return True
for i, segment in enumerate(segments):
if not process_segment(segment, ctx['tmpfilename'], i):
return False
self._finish_frag_download(ctx) self._finish_frag_download(ctx)
for segment_file in segments_filenames:
os.remove(encodeFilename(segment_file))
return True return True

View file

@ -3,7 +3,6 @@
import base64 import base64
import io import io
import itertools import itertools
import os
import time import time
from .fragment import FragmentFD from .fragment import FragmentFD
@ -16,9 +15,7 @@
compat_struct_unpack, compat_struct_unpack,
) )
from ..utils import ( from ..utils import (
encodeFilename,
fix_xml_ampersands, fix_xml_ampersands,
sanitize_open,
xpath_text, xpath_text,
) )
@ -366,6 +363,7 @@ def real_download(self, filename, info_dict):
dest_stream = ctx['dest_stream'] dest_stream = ctx['dest_stream']
if ctx['complete_frags_downloaded_bytes'] == 0:
write_flv_header(dest_stream) write_flv_header(dest_stream)
if not live: if not live:
write_metadata_tag(dest_stream, metadata) write_metadata_tag(dest_stream, metadata)
@ -374,9 +372,12 @@ def real_download(self, filename, info_dict):
self._start_frag_download(ctx) self._start_frag_download(ctx)
frags_filenames = [] frag_index = 0
while fragments_list: while fragments_list:
seg_i, frag_i = fragments_list.pop(0) seg_i, frag_i = fragments_list.pop(0)
frag_index += 1
if frag_index <= ctx['frag_index']:
continue
name = 'Seg%d-Frag%d' % (seg_i, frag_i) name = 'Seg%d-Frag%d' % (seg_i, frag_i)
query = [] query = []
if base_url_parsed.query: if base_url_parsed.query:
@ -386,17 +387,10 @@ def real_download(self, filename, info_dict):
if info_dict.get('extra_param_to_segment_url'): if info_dict.get('extra_param_to_segment_url'):
query.append(info_dict['extra_param_to_segment_url']) query.append(info_dict['extra_param_to_segment_url'])
url_parsed = base_url_parsed._replace(path=base_url_parsed.path + name, query='&'.join(query)) url_parsed = base_url_parsed._replace(path=base_url_parsed.path + name, query='&'.join(query))
frag_filename = '%s-%s' % (ctx['tmpfilename'], name)
try: try:
success = ctx['dl'].download(frag_filename, { success, down_data = self._download_fragment(ctx, url_parsed.geturl(), info_dict)
'url': url_parsed.geturl(),
'http_headers': info_dict.get('http_headers'),
})
if not success: if not success:
return False return False
(down, frag_sanitized) = sanitize_open(frag_filename, 'rb')
down_data = down.read()
down.close()
reader = FlvReader(down_data) reader = FlvReader(down_data)
while True: while True:
try: try:
@ -411,12 +405,8 @@ def real_download(self, filename, info_dict):
break break
raise raise
if box_type == b'mdat': if box_type == b'mdat':
dest_stream.write(box_data) self._append_fragment(ctx, box_data)
break break
if live:
os.remove(encodeFilename(frag_sanitized))
else:
frags_filenames.append(frag_sanitized)
except (compat_urllib_error.HTTPError, ) as err: except (compat_urllib_error.HTTPError, ) as err:
if live and (err.code == 404 or err.code == 410): if live and (err.code == 404 or err.code == 410):
# We didn't keep up with the live window. Continue # We didn't keep up with the live window. Continue
@ -436,7 +426,4 @@ def real_download(self, filename, info_dict):
self._finish_frag_download(ctx) self._finish_frag_download(ctx)
for frag_file in frags_filenames:
os.remove(encodeFilename(frag_file))
return True return True

View file

@ -2,6 +2,7 @@
import os import os
import time import time
import io
from .common import FileDownloader from .common import FileDownloader
from .http import HttpFD from .http import HttpFD
@ -10,6 +11,7 @@
encodeFilename, encodeFilename,
sanitize_open, sanitize_open,
sanitized_Request, sanitized_Request,
compat_str,
) )
@ -30,13 +32,13 @@ class FragmentFD(FileDownloader):
Skip unavailable fragments (DASH and hlsnative only) Skip unavailable fragments (DASH and hlsnative only)
""" """
def report_retry_fragment(self, err, fragment_name, count, retries): def report_retry_fragment(self, err, frag_index, count, retries):
self.to_screen( self.to_screen(
'[download] Got server HTTP error: %s. Retrying fragment %s (attempt %d of %s)...' '[download] Got server HTTP error: %s. Retrying fragment %d (attempt %d of %s)...'
% (error_to_compat_str(err), fragment_name, count, self.format_retries(retries))) % (error_to_compat_str(err), frag_index, count, self.format_retries(retries)))
def report_skip_fragment(self, fragment_name): def report_skip_fragment(self, frag_index):
self.to_screen('[download] Skipping fragment %s...' % fragment_name) self.to_screen('[download] Skipping fragment %d...' % frag_index)
def _prepare_url(self, info_dict, url): def _prepare_url(self, info_dict, url):
headers = info_dict.get('http_headers') headers = info_dict.get('http_headers')
@ -46,6 +48,25 @@ def _prepare_and_start_frag_download(self, ctx):
self._prepare_frag_download(ctx) self._prepare_frag_download(ctx)
self._start_frag_download(ctx) self._start_frag_download(ctx)
def _download_fragment(self, ctx, frag_url, info_dict, headers=None):
down = io.BytesIO()
success = ctx['dl'].download(down, {
'url': frag_url,
'http_headers': headers or info_dict.get('http_headers'),
})
if not success:
return False, None
frag_content = down.getvalue()
down.close()
return True, frag_content
def _append_fragment(self, ctx, frag_content):
ctx['dest_stream'].write(frag_content)
if not (ctx.get('live') or ctx['tmpfilename'] == '-'):
frag_index_stream, _ = sanitize_open(ctx['tmpfilename'] + '.fragindex', 'w')
frag_index_stream.write(compat_str(ctx['frag_index']))
frag_index_stream.close()
def _prepare_frag_download(self, ctx): def _prepare_frag_download(self, ctx):
if 'live' not in ctx: if 'live' not in ctx:
ctx['live'] = False ctx['live'] = False
@ -66,11 +87,26 @@ def _prepare_frag_download(self, ctx):
} }
) )
tmpfilename = self.temp_name(ctx['filename']) tmpfilename = self.temp_name(ctx['filename'])
dest_stream, tmpfilename = sanitize_open(tmpfilename, 'wb') open_mode = 'wb'
resume_len = 0
frag_index = 0
# Establish possible resume length
if os.path.isfile(encodeFilename(tmpfilename)):
open_mode = 'ab'
resume_len = os.path.getsize(encodeFilename(tmpfilename))
if os.path.isfile(encodeFilename(tmpfilename + '.fragindex')):
frag_index_stream, _ = sanitize_open(tmpfilename + '.fragindex', 'r')
frag_index = int(frag_index_stream.read())
frag_index_stream.close()
dest_stream, tmpfilename = sanitize_open(tmpfilename, open_mode)
ctx.update({ ctx.update({
'dl': dl, 'dl': dl,
'dest_stream': dest_stream, 'dest_stream': dest_stream,
'tmpfilename': tmpfilename, 'tmpfilename': tmpfilename,
'frag_index': frag_index,
# Total complete fragments downloaded so far in bytes
'complete_frags_downloaded_bytes': resume_len,
}) })
def _start_frag_download(self, ctx): def _start_frag_download(self, ctx):
@ -79,8 +115,8 @@ def _start_frag_download(self, ctx):
# hook # hook
state = { state = {
'status': 'downloading', 'status': 'downloading',
'downloaded_bytes': 0, 'downloaded_bytes': ctx['complete_frags_downloaded_bytes'],
'frag_index': 0, 'frag_index': ctx['frag_index'],
'frag_count': total_frags, 'frag_count': total_frags,
'filename': ctx['filename'], 'filename': ctx['filename'],
'tmpfilename': ctx['tmpfilename'], 'tmpfilename': ctx['tmpfilename'],
@ -89,8 +125,6 @@ def _start_frag_download(self, ctx):
start = time.time() start = time.time()
ctx.update({ ctx.update({
'started': start, 'started': start,
# Total complete fragments downloaded so far in bytes
'complete_frags_downloaded_bytes': 0,
# Amount of fragment's bytes downloaded by the time of the previous # Amount of fragment's bytes downloaded by the time of the previous
# frag progress hook invocation # frag progress hook invocation
'prev_frag_downloaded_bytes': 0, 'prev_frag_downloaded_bytes': 0,
@ -111,6 +145,7 @@ def frag_progress_hook(s):
if s['status'] == 'finished': if s['status'] == 'finished':
state['frag_index'] += 1 state['frag_index'] += 1
ctx['frag_index'] = state['frag_index']
state['downloaded_bytes'] += frag_total_bytes - ctx['prev_frag_downloaded_bytes'] state['downloaded_bytes'] += frag_total_bytes - ctx['prev_frag_downloaded_bytes']
ctx['complete_frags_downloaded_bytes'] = state['downloaded_bytes'] ctx['complete_frags_downloaded_bytes'] = state['downloaded_bytes']
ctx['prev_frag_downloaded_bytes'] = 0 ctx['prev_frag_downloaded_bytes'] = 0
@ -132,6 +167,8 @@ def frag_progress_hook(s):
def _finish_frag_download(self, ctx): def _finish_frag_download(self, ctx):
ctx['dest_stream'].close() ctx['dest_stream'].close()
if os.path.isfile(encodeFilename(ctx['tmpfilename'] + '.fragindex')):
os.remove(encodeFilename(ctx['tmpfilename'] + '.fragindex'))
elapsed = time.time() - ctx['started'] elapsed = time.time() - ctx['started']
self.try_rename(ctx['tmpfilename'], ctx['filename']) self.try_rename(ctx['tmpfilename'], ctx['filename'])
fsize = os.path.getsize(encodeFilename(ctx['filename'])) fsize = os.path.getsize(encodeFilename(ctx['filename']))

View file

@ -1,6 +1,5 @@
from __future__ import unicode_literals from __future__ import unicode_literals
import os.path
import re import re
import binascii import binascii
try: try:
@ -18,8 +17,6 @@
compat_struct_pack, compat_struct_pack,
) )
from ..utils import ( from ..utils import (
encodeFilename,
sanitize_open,
parse_m3u8_attributes, parse_m3u8_attributes,
update_url_query, update_url_query,
) )
@ -103,17 +100,18 @@ def real_download(self, filename, info_dict):
media_sequence = 0 media_sequence = 0
decrypt_info = {'METHOD': 'NONE'} decrypt_info = {'METHOD': 'NONE'}
byte_range = {} byte_range = {}
frags_filenames = [] frag_index = 0
for line in s.splitlines(): for line in s.splitlines():
line = line.strip() line = line.strip()
if line: if line:
if not line.startswith('#'): if not line.startswith('#'):
frag_index += 1
if frag_index <= ctx['frag_index']:
continue
frag_url = ( frag_url = (
line line
if re.match(r'^https?://', line) if re.match(r'^https?://', line)
else compat_urlparse.urljoin(man_url, line)) else compat_urlparse.urljoin(man_url, line))
frag_name = 'Frag%d' % i
frag_filename = '%s-%s' % (ctx['tmpfilename'], frag_name)
if extra_query: if extra_query:
frag_url = update_url_query(frag_url, extra_query) frag_url = update_url_query(frag_url, extra_query)
count = 0 count = 0
@ -122,15 +120,10 @@ def real_download(self, filename, info_dict):
headers['Range'] = 'bytes=%d-%d' % (byte_range['start'], byte_range['end']) headers['Range'] = 'bytes=%d-%d' % (byte_range['start'], byte_range['end'])
while count <= fragment_retries: while count <= fragment_retries:
try: try:
success = ctx['dl'].download(frag_filename, { success, frag_content = self._download_fragment(
'url': frag_url, ctx, frag_url, info_dict, headers)
'http_headers': headers,
})
if not success: if not success:
return False return False
down, frag_sanitized = sanitize_open(frag_filename, 'rb')
frag_content = down.read()
down.close()
break break
except compat_urllib_error.HTTPError as err: except compat_urllib_error.HTTPError as err:
# Unavailable (possibly temporary) fragments may be served. # Unavailable (possibly temporary) fragments may be served.
@ -139,28 +132,29 @@ def real_download(self, filename, info_dict):
# https://github.com/rg3/youtube-dl/issues/10448). # https://github.com/rg3/youtube-dl/issues/10448).
count += 1 count += 1
if count <= fragment_retries: if count <= fragment_retries:
self.report_retry_fragment(err, frag_name, count, fragment_retries) self.report_retry_fragment(err, frag_index, count, fragment_retries)
if count > fragment_retries: if count > fragment_retries:
if skip_unavailable_fragments: if skip_unavailable_fragments:
i += 1 i += 1
media_sequence += 1 media_sequence += 1
self.report_skip_fragment(frag_name) self.report_skip_fragment(frag_index)
continue continue
self.report_error( self.report_error(
'giving up after %s fragment retries' % fragment_retries) 'giving up after %s fragment retries' % fragment_retries)
return False return False
if decrypt_info['METHOD'] == 'AES-128': if decrypt_info['METHOD'] == 'AES-128':
iv = decrypt_info.get('IV') or compat_struct_pack('>8xq', media_sequence) iv = decrypt_info.get('IV') or compat_struct_pack('>8xq', media_sequence)
decrypt_info['KEY'] = decrypt_info.get('KEY') or self.ydl.urlopen(decrypt_info['URI']).read()
frag_content = AES.new( frag_content = AES.new(
decrypt_info['KEY'], AES.MODE_CBC, iv).decrypt(frag_content) decrypt_info['KEY'], AES.MODE_CBC, iv).decrypt(frag_content)
ctx['dest_stream'].write(frag_content) self._append_fragment(ctx, frag_content)
frags_filenames.append(frag_sanitized)
# We only download the first fragment during the test # We only download the first fragment during the test
if test: if test:
break break
i += 1 i += 1
media_sequence += 1 media_sequence += 1
elif line.startswith('#EXT-X-KEY'): elif line.startswith('#EXT-X-KEY'):
decrypt_url = decrypt_info.get('URI')
decrypt_info = parse_m3u8_attributes(line[11:]) decrypt_info = parse_m3u8_attributes(line[11:])
if decrypt_info['METHOD'] == 'AES-128': if decrypt_info['METHOD'] == 'AES-128':
if 'IV' in decrypt_info: if 'IV' in decrypt_info:
@ -170,7 +164,8 @@ def real_download(self, filename, info_dict):
man_url, decrypt_info['URI']) man_url, decrypt_info['URI'])
if extra_query: if extra_query:
decrypt_info['URI'] = update_url_query(decrypt_info['URI'], extra_query) decrypt_info['URI'] = update_url_query(decrypt_info['URI'], extra_query)
decrypt_info['KEY'] = self.ydl.urlopen(decrypt_info['URI']).read() if decrypt_url != decrypt_info['URI']:
decrypt_info['KEY'] = None
elif line.startswith('#EXT-X-MEDIA-SEQUENCE'): elif line.startswith('#EXT-X-MEDIA-SEQUENCE'):
media_sequence = int(line[22:]) media_sequence = int(line[22:])
elif line.startswith('#EXT-X-BYTERANGE'): elif line.startswith('#EXT-X-BYTERANGE'):
@ -183,7 +178,4 @@ def real_download(self, filename, info_dict):
self._finish_frag_download(ctx) self._finish_frag_download(ctx)
for frag_file in frags_filenames:
os.remove(encodeFilename(frag_file))
return True return True

View file

@ -20,10 +20,14 @@
class HttpFD(FileDownloader): class HttpFD(FileDownloader):
def real_download(self, filename, info_dict): def real_download(self, filename_or_stream, info_dict):
url = info_dict['url'] url = info_dict['url']
tmpfilename = self.temp_name(filename) filename = filename_or_stream
stream = None stream = None
if hasattr(filename_or_stream, 'write'):
stream = filename_or_stream
filename = '-'
tmpfilename = self.temp_name(filename)
# Do not include the Accept-Encoding header # Do not include the Accept-Encoding header
headers = {'Youtubedl-no-compression': 'True'} headers = {'Youtubedl-no-compression': 'True'}

View file

@ -1,6 +1,5 @@
from __future__ import unicode_literals from __future__ import unicode_literals
import os
import time import time
import struct import struct
import binascii import binascii
@ -8,10 +7,6 @@
from .fragment import FragmentFD from .fragment import FragmentFD
from ..compat import compat_urllib_error from ..compat import compat_urllib_error
from ..utils import (
sanitize_open,
encodeFilename,
)
u8 = struct.Struct(b'>B') u8 = struct.Struct(b'>B')
@ -225,16 +220,15 @@ def real_download(self, filename, info_dict):
self._prepare_and_start_frag_download(ctx) self._prepare_and_start_frag_download(ctx)
segments_filenames = []
fragment_retries = self.params.get('fragment_retries', 0) fragment_retries = self.params.get('fragment_retries', 0)
skip_unavailable_fragments = self.params.get('skip_unavailable_fragments', True) skip_unavailable_fragments = self.params.get('skip_unavailable_fragments', True)
track_written = False track_written = False
frag_index = 0
for i, segment in enumerate(segments): for i, segment in enumerate(segments):
segment_url = segment['url'] frag_index += 1
segment_name = 'Frag%d' % i if frag_index <= ctx['frag_index']:
target_filename = '%s-%s' % (ctx['tmpfilename'], segment_name) continue
count = 0 count = 0
while count <= fragment_retries: while count <= fragment_retries:
try: try:
@ -242,33 +236,27 @@ def real_download(self, filename, info_dict):
'url': segment_url, 'url': segment_url,
'http_headers': info_dict.get('http_headers'), 'http_headers': info_dict.get('http_headers'),
}) })
success, frag_content = self._download_fragment(ctx, segment['url'], info_dict)
if not success: if not success:
return False return False
down, target_sanitized = sanitize_open(target_filename, 'rb')
down_data = down.read()
if not track_written: if not track_written:
tfhd_data = extract_box_data(down_data, [b'moof', b'traf', b'tfhd']) tfhd_data = extract_box_data(frag_content, [b'moof', b'traf', b'tfhd'])
info_dict['_download_params']['track_id'] = u32.unpack(tfhd_data[4:8])[0] info_dict['_download_params']['track_id'] = u32.unpack(tfhd_data[4:8])[0]
write_piff_header(ctx['dest_stream'], info_dict['_download_params']) write_piff_header(ctx['dest_stream'], info_dict['_download_params'])
track_written = True track_written = True
ctx['dest_stream'].write(down_data) self._append_fragment(ctx, frag_content)
down.close()
segments_filenames.append(target_sanitized)
break break
except compat_urllib_error.HTTPError as err: except compat_urllib_error.HTTPError as err:
count += 1 count += 1
if count <= fragment_retries: if count <= fragment_retries:
self.report_retry_fragment(err, segment_name, count, fragment_retries) self.report_retry_fragment(err, frag_index, count, fragment_retries)
if count > fragment_retries: if count > fragment_retries:
if skip_unavailable_fragments: if skip_unavailable_fragments:
self.report_skip_fragment(segment_name) self.report_skip_fragment(frag_index)
continue continue
self.report_error('giving up after %s fragment retries' % fragment_retries) self.report_error('giving up after %s fragment retries' % fragment_retries)
return False return False
self._finish_frag_download(ctx) self._finish_frag_download(ctx)
for segment_file in segments_filenames:
os.remove(encodeFilename(segment_file))
return True return True