[core] Fix HTTP headers and cookie handling

- Remove `Cookie` header from `http_headers` immediately after loading into cookiejar
- Restore compat for `--load-info-json` cookies
- Add more tests
- Fix improper passing of Cookie header by `MailRu` extractor

Closes #7558
Authored by: bashonly, pukkandan
This commit is contained in:
bashonly 2023-07-15 15:22:10 -05:00
parent 2b029ca0a9
commit 6c5211cebe
No known key found for this signature in database
GPG key ID: 783F096F253D15B0
5 changed files with 120 additions and 33 deletions

View file

@ -11,7 +11,7 @@
import copy import copy
import json import json
from test.helper import FakeYDL, assertRegexpMatches from test.helper import FakeYDL, assertRegexpMatches, try_rm
from yt_dlp import YoutubeDL from yt_dlp import YoutubeDL
from yt_dlp.compat import compat_os_name from yt_dlp.compat import compat_os_name
from yt_dlp.extractor import YoutubeIE from yt_dlp.extractor import YoutubeIE
@ -24,6 +24,8 @@
int_or_none, int_or_none,
match_filter_func, match_filter_func,
) )
from yt_dlp.utils.traversal import traverse_obj
TEST_URL = 'http://localhost/sample.mp4' TEST_URL = 'http://localhost/sample.mp4'
@ -1227,10 +1229,10 @@ def cookie(name, value, version=None, domain='', path='', secure=False, expires=
_test_url = 'https://yt.dlp/test' _test_url = 'https://yt.dlp/test'
def test(encoded_cookies, cookies, headers=False, round_trip=None, error=None): def test(encoded_cookies, cookies, *, headers=False, round_trip=None, error_re=None):
def _test(): def _test():
ydl.cookiejar.clear() ydl.cookiejar.clear()
ydl._load_cookies(encoded_cookies, from_headers=headers) ydl._load_cookies(encoded_cookies, autoscope=headers)
if headers: if headers:
ydl._apply_header_cookies(_test_url) ydl._apply_header_cookies(_test_url)
data = {'url': _test_url} data = {'url': _test_url}
@ -1245,14 +1247,14 @@ def _test():
ydl.__dict__['_YoutubeDL__header_cookies'] = [] ydl.__dict__['_YoutubeDL__header_cookies'] = []
with self.subTest(msg=encoded_cookies): with self.subTest(msg=encoded_cookies):
if not error: if not error_re:
_test() _test()
return return
with self.assertRaisesRegex(Exception, error): with self.assertRaisesRegex(Exception, error_re):
_test() _test()
test('test=value; Domain=.yt.dlp', [cookie('test', 'value', domain='.yt.dlp')]) test('test=value; Domain=.yt.dlp', [cookie('test', 'value', domain='.yt.dlp')])
test('test=value', [cookie('test', 'value')], error='Unscoped cookies are not allowed') test('test=value', [cookie('test', 'value')], error_re=r'Unscoped cookies are not allowed')
test('cookie1=value1; Domain=.yt.dlp; Path=/test; cookie2=value2; Domain=.yt.dlp; Path=/', [ test('cookie1=value1; Domain=.yt.dlp; Path=/test; cookie2=value2; Domain=.yt.dlp; Path=/', [
cookie('cookie1', 'value1', domain='.yt.dlp', path='/test'), cookie('cookie1', 'value1', domain='.yt.dlp', path='/test'),
cookie('cookie2', 'value2', domain='.yt.dlp', path='/')]) cookie('cookie2', 'value2', domain='.yt.dlp', path='/')])
@ -1265,9 +1267,76 @@ def _test():
round_trip='name=""; Domain=.yt.dlp') round_trip='name=""; Domain=.yt.dlp')
test('test=value', [cookie('test', 'value', domain='.yt.dlp')], headers=True) test('test=value', [cookie('test', 'value', domain='.yt.dlp')], headers=True)
test('cookie1=value; Domain=.yt.dlp; cookie2=value', [], headers=True, error='Invalid syntax') test('cookie1=value; Domain=.yt.dlp; cookie2=value', [], headers=True, error_re=r'Invalid syntax')
ydl.deprecated_feature = ydl.report_error ydl.deprecated_feature = ydl.report_error
test('test=value', [], headers=True, error='Passing cookies as a header is a potential security risk') test('test=value', [], headers=True, error_re=r'Passing cookies as a header is a potential security risk')
def test_infojson_cookies(self):
TEST_FILE = 'test_infojson_cookies.info.json'
TEST_URL = 'https://example.com/example.mp4'
COOKIES = 'a=b; Domain=.example.com; c=d; Domain=.example.com'
COOKIE_HEADER = {'Cookie': 'a=b; c=d'}
ydl = FakeYDL()
ydl.process_info = lambda x: ydl._write_info_json('test', x, TEST_FILE)
def make_info(info_header_cookies=False, fmts_header_cookies=False, cookies_field=False):
fmt = {'url': TEST_URL}
if fmts_header_cookies:
fmt['http_headers'] = COOKIE_HEADER
if cookies_field:
fmt['cookies'] = COOKIES
return _make_result([fmt], http_headers=COOKIE_HEADER if info_header_cookies else None)
def test(initial_info, note):
result = {}
result['processed'] = ydl.process_ie_result(initial_info)
self.assertTrue(ydl.cookiejar.get_cookies_for_url(TEST_URL),
msg=f'No cookies set in cookiejar after initial process when {note}')
ydl.cookiejar.clear()
with open(TEST_FILE) as infojson:
result['loaded'] = ydl.sanitize_info(json.load(infojson), True)
result['final'] = ydl.process_ie_result(result['loaded'].copy(), download=False)
self.assertTrue(ydl.cookiejar.get_cookies_for_url(TEST_URL),
msg=f'No cookies set in cookiejar after final process when {note}')
ydl.cookiejar.clear()
for key in ('processed', 'loaded', 'final'):
info = result[key]
self.assertIsNone(
traverse_obj(info, ((None, ('formats', 0)), 'http_headers', 'Cookie'), casesense=False, get_all=False),
msg=f'Cookie header not removed in {key} result when {note}')
self.assertEqual(
traverse_obj(info, ((None, ('formats', 0)), 'cookies'), get_all=False), COOKIES,
msg=f'No cookies field found in {key} result when {note}')
test({'url': TEST_URL, 'http_headers': COOKIE_HEADER, 'id': '1', 'title': 'x'}, 'no formats field')
test(make_info(info_header_cookies=True), 'info_dict header cokies')
test(make_info(fmts_header_cookies=True), 'format header cookies')
test(make_info(info_header_cookies=True, fmts_header_cookies=True), 'info_dict and format header cookies')
test(make_info(info_header_cookies=True, fmts_header_cookies=True, cookies_field=True), 'all cookies fields')
test(make_info(cookies_field=True), 'cookies format field')
test({'url': TEST_URL, 'cookies': COOKIES, 'id': '1', 'title': 'x'}, 'info_dict cookies field only')
try_rm(TEST_FILE)
def test_add_headers_cookie(self):
def check_for_cookie_header(result):
return traverse_obj(result, ((None, ('formats', 0)), 'http_headers', 'Cookie'), casesense=False, get_all=False)
ydl = FakeYDL({'http_headers': {'Cookie': 'a=b'}})
ydl._apply_header_cookies(_make_result([])['webpage_url']) # Scope to input webpage URL: .example.com
fmt = {'url': 'https://example.com/video.mp4'}
result = ydl.process_ie_result(_make_result([fmt]), download=False)
self.assertIsNone(check_for_cookie_header(result), msg='http_headers cookies in result info_dict')
self.assertEqual(result.get('cookies'), 'a=b; Domain=.example.com', msg='No cookies were set in cookies field')
self.assertIn('a=b', ydl.cookiejar.get_cookie_header(fmt['url']), msg='No cookies were set in cookiejar')
fmt = {'url': 'https://wrong.com/video.mp4'}
result = ydl.process_ie_result(_make_result([fmt]), download=False)
self.assertIsNone(check_for_cookie_header(result), msg='http_headers cookies for wrong domain')
self.assertFalse(result.get('cookies'), msg='Cookies set in cookies field for wrong domain')
self.assertFalse(ydl.cookiejar.get_cookie_header(fmt['url']), msg='Cookies set in cookiejar for wrong domain')
if __name__ == '__main__': if __name__ == '__main__':

View file

@ -53,6 +53,14 @@ def test_get_cookie_header(self):
header = cookiejar.get_cookie_header('https://www.foobar.foobar') header = cookiejar.get_cookie_header('https://www.foobar.foobar')
self.assertIn('HTTPONLY_COOKIE', header) self.assertIn('HTTPONLY_COOKIE', header)
def test_get_cookies_for_url(self):
cookiejar = YoutubeDLCookieJar('./test/testdata/cookies/session_cookies.txt')
cookiejar.load(ignore_discard=True, ignore_expires=True)
cookies = cookiejar.get_cookies_for_url('https://www.foobar.foobar/')
self.assertEqual(len(cookies), 2)
cookies = cookiejar.get_cookies_for_url('https://foobar.foobar/')
self.assertFalse(cookies)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

View file

@ -680,14 +680,15 @@ def process_color_policy(stream):
self.params['compat_opts'] = set(self.params.get('compat_opts', ())) self.params['compat_opts'] = set(self.params.get('compat_opts', ()))
self.params['http_headers'] = HTTPHeaderDict(std_headers, self.params.get('http_headers')) self.params['http_headers'] = HTTPHeaderDict(std_headers, self.params.get('http_headers'))
self.__header_cookies = []
self._load_cookies(self.params['http_headers'].get('Cookie')) # compat
self.params['http_headers'].pop('Cookie', None)
self._request_director = self.build_request_director( self._request_director = self.build_request_director(
sorted(_REQUEST_HANDLERS.values(), key=lambda rh: rh.RH_NAME.lower())) sorted(_REQUEST_HANDLERS.values(), key=lambda rh: rh.RH_NAME.lower()))
if auto_init and auto_init != 'no_verbose_header': if auto_init and auto_init != 'no_verbose_header':
self.print_debug_header() self.print_debug_header()
self.__header_cookies = []
self._load_cookies(traverse_obj(self.params.get('http_headers'), 'cookie', casesense=False)) # compat
def check_deprecated(param, option, suggestion): def check_deprecated(param, option, suggestion):
if self.params.get(param) is not None: if self.params.get(param) is not None:
self.report_warning(f'{option} is deprecated. Use {suggestion} instead') self.report_warning(f'{option} is deprecated. Use {suggestion} instead')
@ -1645,18 +1646,19 @@ def progress(msg):
self.to_screen('') self.to_screen('')
raise raise
def _load_cookies(self, data, *, from_headers=True): def _load_cookies(self, data, *, autoscope=True):
"""Loads cookies from a `Cookie` header """Loads cookies from a `Cookie` header
This tries to work around the security vulnerability of passing cookies to every domain. This tries to work around the security vulnerability of passing cookies to every domain.
See: https://github.com/yt-dlp/yt-dlp/security/advisories/GHSA-v8mc-9377-rwjj See: https://github.com/yt-dlp/yt-dlp/security/advisories/GHSA-v8mc-9377-rwjj
The unscoped cookies are saved for later to be stored in the jar with a limited scope.
@param data The Cookie header as string to load the cookies from @param data The Cookie header as string to load the cookies from
@param from_headers If `False`, allows Set-Cookie syntax in the cookie string (at least a domain will be required) @param autoscope If `False`, scope cookies using Set-Cookie syntax and error for cookie without domains
If `True`, save cookies for later to be stored in the jar with a limited scope
If a URL, save cookies in the jar with the domain of the URL
""" """
for cookie in LenientSimpleCookie(data).values(): for cookie in LenientSimpleCookie(data).values():
if from_headers and any(cookie.values()): if autoscope and any(cookie.values()):
raise ValueError('Invalid syntax in Cookie Header') raise ValueError('Invalid syntax in Cookie Header')
domain = cookie.get('domain') or '' domain = cookie.get('domain') or ''
@ -1670,17 +1672,23 @@ def _load_cookies(self, data, *, from_headers=True):
if domain: if domain:
self.cookiejar.set_cookie(prepared_cookie) self.cookiejar.set_cookie(prepared_cookie)
elif from_headers: elif autoscope is True:
self.deprecated_feature( self.deprecated_feature(
'Passing cookies as a header is a potential security risk; ' 'Passing cookies as a header is a potential security risk; '
'they will be scoped to the domain of the downloaded urls. ' 'they will be scoped to the domain of the downloaded urls. '
'Please consider loading cookies from a file or browser instead.') 'Please consider loading cookies from a file or browser instead.')
self.__header_cookies.append(prepared_cookie) self.__header_cookies.append(prepared_cookie)
elif autoscope:
self.report_warning(
'The extractor result contains an unscoped cookie as an HTTP header. '
f'If you are using yt-dlp with an input URL{bug_reports_message(before=",")}',
only_once=True)
self._apply_header_cookies(autoscope, [prepared_cookie])
else: else:
self.report_error('Unscoped cookies are not allowed; please specify some sort of scoping', self.report_error('Unscoped cookies are not allowed; please specify some sort of scoping',
tb=False, is_error=False) tb=False, is_error=False)
def _apply_header_cookies(self, url): def _apply_header_cookies(self, url, cookies=None):
"""Applies stray header cookies to the provided url """Applies stray header cookies to the provided url
This loads header cookies and scopes them to the domain provided in `url`. This loads header cookies and scopes them to the domain provided in `url`.
@ -1691,7 +1699,7 @@ def _apply_header_cookies(self, url):
if not parsed.hostname: if not parsed.hostname:
return return
for cookie in map(copy.copy, self.__header_cookies): for cookie in map(copy.copy, cookies or self.__header_cookies):
cookie.domain = f'.{parsed.hostname}' cookie.domain = f'.{parsed.hostname}'
self.cookiejar.set_cookie(cookie) self.cookiejar.set_cookie(cookie)
@ -2481,9 +2489,16 @@ def restore_last_token(self):
parsed_selector = _parse_format_selection(iter(TokenIterator(tokens))) parsed_selector = _parse_format_selection(iter(TokenIterator(tokens)))
return _build_selector_function(parsed_selector) return _build_selector_function(parsed_selector)
def _calc_headers(self, info_dict): def _calc_headers(self, info_dict, load_cookies=False):
res = HTTPHeaderDict(self.params['http_headers'], info_dict.get('http_headers')) res = HTTPHeaderDict(self.params['http_headers'], info_dict.get('http_headers'))
clean_headers(res) clean_headers(res)
if load_cookies: # For --load-info-json
self._load_cookies(res.get('Cookie'), autoscope=info_dict['url']) # compat
self._load_cookies(info_dict.get('cookies'), autoscope=False)
# The `Cookie` header is removed to prevent leaks and unscoped cookies.
# See: https://github.com/yt-dlp/yt-dlp/security/advisories/GHSA-v8mc-9377-rwjj
res.pop('Cookie', None)
cookies = self.cookiejar.get_cookies_for_url(info_dict['url']) cookies = self.cookiejar.get_cookies_for_url(info_dict['url'])
if cookies: if cookies:
encoder = LenientSimpleCookie() encoder = LenientSimpleCookie()
@ -2762,7 +2777,12 @@ def is_wellformed(f):
and info_dict.get('duration') and format.get('tbr') and info_dict.get('duration') and format.get('tbr')
and not format.get('filesize') and not format.get('filesize_approx')): and not format.get('filesize') and not format.get('filesize_approx')):
format['filesize_approx'] = int(info_dict['duration'] * format['tbr'] * (1024 / 8)) format['filesize_approx'] = int(info_dict['duration'] * format['tbr'] * (1024 / 8))
format['http_headers'] = self._calc_headers(collections.ChainMap(format, info_dict)) format['http_headers'] = self._calc_headers(collections.ChainMap(format, info_dict), load_cookies=True)
# Safeguard against old/insecure infojson when using --load-info-json
if info_dict.get('http_headers'):
info_dict['http_headers'] = HTTPHeaderDict(info_dict['http_headers'])
info_dict['http_headers'].pop('Cookie', None)
# This is copied to http_headers by the above _calc_headers and can now be removed # This is copied to http_headers by the above _calc_headers and can now be removed
if '__x_forwarded_for_ip' in info_dict: if '__x_forwarded_for_ip' in info_dict:
@ -3508,8 +3528,6 @@ def download_with_info_file(self, info_filename):
infos = [self.sanitize_info(info, self.params.get('clean_infojson', True)) infos = [self.sanitize_info(info, self.params.get('clean_infojson', True))
for info in variadic(json.loads('\n'.join(f)))] for info in variadic(json.loads('\n'.join(f)))]
for info in infos: for info in infos:
self._load_cookies(info.get('cookies'), from_headers=False)
self._load_cookies(traverse_obj(info.get('http_headers'), 'Cookie', casesense=False)) # compat
try: try:
self.__download_wrapper(self.process_ie_result)(info, download=True) self.__download_wrapper(self.process_ie_result)(info, download=True)
except (DownloadError, EntryNotInPlaylist, ReExtractInfo) as e: except (DownloadError, EntryNotInPlaylist, ReExtractInfo) as e:

View file

@ -32,7 +32,6 @@
timetuple_from_msec, timetuple_from_msec,
try_call, try_call,
) )
from ..utils.traversal import traverse_obj
class FileDownloader: class FileDownloader:
@ -453,11 +452,6 @@ def download(self, filename, info_dict, subtitle=False):
self.to_screen(f'[download] Sleeping {sleep_interval:.2f} seconds ...') self.to_screen(f'[download] Sleeping {sleep_interval:.2f} seconds ...')
time.sleep(sleep_interval) time.sleep(sleep_interval)
# Filter the `Cookie` header from the info_dict to prevent leaks.
# See: https://github.com/yt-dlp/yt-dlp/security/advisories/GHSA-v8mc-9377-rwjj
info_dict['http_headers'] = dict(traverse_obj(info_dict, (
'http_headers', {dict.items}, lambda _, pair: pair[0].lower() != 'cookie'))) or None
ret = self.real_download(filename, info_dict) ret = self.real_download(filename, info_dict)
self._finish_multiline_status() self._finish_multiline_status()
return ret, True return ret, True

View file

@ -1,6 +1,7 @@
import itertools import itertools
import json import json
import re import re
import urllib.parse
from .common import InfoExtractor from .common import InfoExtractor
from ..compat import compat_urllib_parse_unquote from ..compat import compat_urllib_parse_unquote
@ -140,17 +141,15 @@ def _real_extract(self, url):
'http://api.video.mail.ru/videos/%s.json?new=1' % video_id, 'http://api.video.mail.ru/videos/%s.json?new=1' % video_id,
video_id, 'Downloading video JSON') video_id, 'Downloading video JSON')
headers = {}
video_key = self._get_cookies('https://my.mail.ru').get('video_key') video_key = self._get_cookies('https://my.mail.ru').get('video_key')
if video_key:
headers['Cookie'] = 'video_key=%s' % video_key.value
formats = [] formats = []
for f in video_data['videos']: for f in video_data['videos']:
video_url = f.get('url') video_url = f.get('url')
if not video_url: if not video_url:
continue continue
if video_key:
self._set_cookie(urllib.parse.urlparse(video_url).hostname, 'video_key', video_key.value)
format_id = f.get('key') format_id = f.get('key')
height = int_or_none(self._search_regex( height = int_or_none(self._search_regex(
r'^(\d+)[pP]$', format_id, 'height', default=None)) if format_id else None r'^(\d+)[pP]$', format_id, 'height', default=None)) if format_id else None
@ -158,7 +157,6 @@ def _real_extract(self, url):
'url': video_url, 'url': video_url,
'format_id': format_id, 'format_id': format_id,
'height': height, 'height': height,
'http_headers': headers,
}) })
meta_data = video_data['meta'] meta_data = video_data['meta']