[networking] Remove dot segments during URL normalization (#7662)

This implements RFC3986 5.2.4 remove_dot_segments during the URL normalization process.

Closes #3355, #6526

Authored by: coletdjnz
coletdjnz authored on 2023-07-29 10:40:20 +12:00 (committed via GitHub)
parent a15fcd299e
commit 4bf912282a
8 changed files with 104 additions and 36 deletions
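
For reference, a minimal sketch of what the change does, using the two helpers this commit adds to yt_dlp.utils.networking (the expected values are taken from the new tests further down; the snippet assumes a yt-dlp checkout with this commit applied):

    from yt_dlp.utils.networking import normalize_url, remove_dot_segments

    # remove_dot_segments() applies the RFC 3986 section 5.2.4 algorithm to a path string
    assert remove_dot_segments('/a/b/c/./../../g') == '/a/g'

    # normalize_url() runs it as part of full URL normalization
    # (IDNA-encoding the netloc and percent-escaping path, params, query and fragment)
    assert normalize_url('http://www.example.com/../a/b/../c/./d.html') == 'http://www.example.com/a/c/d.html'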

test/test_networking.py

@@ -173,6 +173,12 @@ def do_GET(self):
             self.send_header('Location', self.path)
             self.send_header('Content-Length', '0')
             self.end_headers()
+        elif self.path == '/redirect_dotsegments':
+            self.send_response(301)
+            # redirect to /headers but with dot segments before
+            self.send_header('Location', '/a/b/./../../headers')
+            self.send_header('Content-Length', '0')
+            self.end_headers()
         elif self.path.startswith('/redirect_'):
             self._redirect()
         elif self.path.startswith('/method'):
@@ -355,6 +361,21 @@ def test_percent_encode(self, handler):
             assert res.status == 200
             res.close()
 
+    @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+    def test_remove_dot_segments(self, handler):
+        with handler() as rh:
+            # This isn't a comprehensive test,
+            # but it should be enough to check whether the handler is removing dot segments
+            res = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/a/b/./../../headers'))
+            assert res.status == 200
+            assert res.url == f'http://127.0.0.1:{self.http_port}/headers'
+            res.close()
+
+            res = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/redirect_dotsegments'))
+            assert res.status == 200
+            assert res.url == f'http://127.0.0.1:{self.http_port}/headers'
+            res.close()
+
     @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
     def test_unicode_path_redirection(self, handler):
         with handler() as rh:

test/test_utils.py

@@ -47,8 +47,6 @@
     encode_base_n,
     encode_compat_str,
     encodeFilename,
-    escape_rfc3986,
-    escape_url,
     expand_path,
     extract_attributes,
     extract_basic_auth,
@@ -132,7 +130,12 @@
     xpath_text,
     xpath_with_ns,
 )
-from yt_dlp.utils.networking import HTTPHeaderDict
+from yt_dlp.utils.networking import (
+    HTTPHeaderDict,
+    escape_rfc3986,
+    normalize_url,
+    remove_dot_segments,
+)
 
 
 class TestUtil(unittest.TestCase):
@@ -933,24 +936,45 @@ def test_escape_rfc3986(self):
         self.assertEqual(escape_rfc3986('foo bar'), 'foo%20bar')
         self.assertEqual(escape_rfc3986('foo%20bar'), 'foo%20bar')
 
-    def test_escape_url(self):
+    def test_normalize_url(self):
         self.assertEqual(
-            escape_url('http://wowza.imust.org/srv/vod/telemb/new/UPLOAD/UPLOAD/20224_IncendieHavré_FD.mp4'),
+            normalize_url('http://wowza.imust.org/srv/vod/telemb/new/UPLOAD/UPLOAD/20224_IncendieHavré_FD.mp4'),
             'http://wowza.imust.org/srv/vod/telemb/new/UPLOAD/UPLOAD/20224_IncendieHavre%CC%81_FD.mp4'
         )
         self.assertEqual(
-            escape_url('http://www.ardmediathek.de/tv/Sturm-der-Liebe/Folge-2036-Zu-Mann-und-Frau-erklärt/Das-Erste/Video?documentId=22673108&bcastId=5290'),
+            normalize_url('http://www.ardmediathek.de/tv/Sturm-der-Liebe/Folge-2036-Zu-Mann-und-Frau-erklärt/Das-Erste/Video?documentId=22673108&bcastId=5290'),
             'http://www.ardmediathek.de/tv/Sturm-der-Liebe/Folge-2036-Zu-Mann-und-Frau-erkl%C3%A4rt/Das-Erste/Video?documentId=22673108&bcastId=5290'
         )
         self.assertEqual(
-            escape_url('http://тест.рф/фрагмент'),
+            normalize_url('http://тест.рф/фрагмент'),
             'http://xn--e1aybc.xn--p1ai/%D1%84%D1%80%D0%B0%D0%B3%D0%BC%D0%B5%D0%BD%D1%82'
         )
         self.assertEqual(
-            escape_url('http://тест.рф/абв?абв=абв#абв'),
+            normalize_url('http://тест.рф/абв?абв=абв#абв'),
             'http://xn--e1aybc.xn--p1ai/%D0%B0%D0%B1%D0%B2?%D0%B0%D0%B1%D0%B2=%D0%B0%D0%B1%D0%B2#%D0%B0%D0%B1%D0%B2'
         )
-        self.assertEqual(escape_url('http://vimeo.com/56015672#at=0'), 'http://vimeo.com/56015672#at=0')
+        self.assertEqual(normalize_url('http://vimeo.com/56015672#at=0'), 'http://vimeo.com/56015672#at=0')
+
+        self.assertEqual(normalize_url('http://www.example.com/../a/b/../c/./d.html'), 'http://www.example.com/a/c/d.html')
+
+    def test_remove_dot_segments(self):
+        self.assertEqual(remove_dot_segments('/a/b/c/./../../g'), '/a/g')
+        self.assertEqual(remove_dot_segments('mid/content=5/../6'), 'mid/6')
+        self.assertEqual(remove_dot_segments('/ad/../cd'), '/cd')
+        self.assertEqual(remove_dot_segments('/ad/../cd/'), '/cd/')
+        self.assertEqual(remove_dot_segments('/..'), '/')
+        self.assertEqual(remove_dot_segments('/./'), '/')
+        self.assertEqual(remove_dot_segments('/./a'), '/a')
+        self.assertEqual(remove_dot_segments('/abc/./.././d/././e/.././f/./../../ghi'), '/ghi')
+        self.assertEqual(remove_dot_segments('/'), '/')
+        self.assertEqual(remove_dot_segments('/t'), '/t')
+        self.assertEqual(remove_dot_segments('t'), 't')
+        self.assertEqual(remove_dot_segments(''), '')
+        self.assertEqual(remove_dot_segments('/../a/b/c'), '/a/b/c')
+        self.assertEqual(remove_dot_segments('../a'), 'a')
+        self.assertEqual(remove_dot_segments('./a'), 'a')
+        self.assertEqual(remove_dot_segments('.'), '')
+        self.assertEqual(remove_dot_segments('////'), '////')
 
     def test_js_to_json_vars_strings(self):
         self.assertDictEqual(

yt_dlp/cookies.py

@@ -33,7 +33,6 @@
 from .utils import (
     Popen,
     error_to_str,
-    escape_url,
     expand_path,
     is_path_like,
     sanitize_url,
@@ -42,6 +41,7 @@
     write_string,
 )
 from .utils._utils import _YDLLogger
+from .utils.networking import normalize_url
 
 CHROMIUM_BASED_BROWSERS = {'brave', 'chrome', 'chromium', 'edge', 'opera', 'vivaldi'}
 SUPPORTED_BROWSERS = CHROMIUM_BASED_BROWSERS | {'firefox', 'safari'}
@@ -1308,7 +1308,7 @@ def prepare_line(line):
 
     def get_cookie_header(self, url):
         """Generate a Cookie HTTP header for a given url"""
-        cookie_req = urllib.request.Request(escape_url(sanitize_url(url)))
+        cookie_req = urllib.request.Request(normalize_url(sanitize_url(url)))
         self.add_cookie_header(cookie_req)
         return cookie_req.get_header('Cookie')
 
@@ -1317,7 +1317,7 @@ def get_cookies_for_url(self, url):
         # Policy `_now` attribute must be set before calling `_cookies_for_request`
         # Ref: https://github.com/python/cpython/blob/3.7/Lib/http/cookiejar.py#L1360
         self._policy._now = self._now = int(time.time())
-        return self._cookies_for_request(urllib.request.Request(escape_url(sanitize_url(url))))
+        return self._cookies_for_request(urllib.request.Request(normalize_url(sanitize_url(url))))
 
     def clear(self, *args, **kwargs):
         with contextlib.suppress(KeyError):

yt_dlp/networking/_urllib.py

@@ -41,7 +41,8 @@
 from ..dependencies import brotli
 from ..socks import ProxyError as SocksProxyError
 from ..socks import sockssocket
-from ..utils import escape_url, update_url_query
+from ..utils import update_url_query
+from ..utils.networking import normalize_url
 
 SUPPORTED_ENCODINGS = ['gzip', 'deflate']
 CONTENT_DECODE_ERRORS = [zlib.error, OSError]
@@ -179,7 +180,7 @@ def http_request(self, req):
         # Since redirects are also affected (e.g. http://www.southpark.de/alle-episoden/s18e09)
         # the code of this workaround has been moved here from YoutubeDL.urlopen()
         url = req.get_full_url()
-        url_escaped = escape_url(url)
+        url_escaped = normalize_url(url)
 
         # Substitute URL if any change after escaping
         if url != url_escaped:
@@ -212,7 +213,7 @@ def http_response(self, req, resp):
         if location:
             # As of RFC 2616 default charset is iso-8859-1 that is respected by python 3
             location = location.encode('iso-8859-1').decode()
-            location_escaped = escape_url(location)
+            location_escaped = normalize_url(location)
             if location != location_escaped:
                 del resp.headers['Location']
                 resp.headers['Location'] = location_escaped

yt_dlp/networking/common.py

@@ -27,10 +27,9 @@
     classproperty,
     deprecation_warning,
     error_to_str,
-    escape_url,
     update_url_query,
 )
-from ..utils.networking import HTTPHeaderDict
+from ..utils.networking import HTTPHeaderDict, normalize_url
 
 if typing.TYPE_CHECKING:
     RequestData = bytes | Iterable[bytes] | typing.IO | None
@@ -372,7 +371,7 @@ def url(self, url):
             raise TypeError('url must be a string')
         elif url.startswith('//'):
             url = 'http:' + url
-        self._url = escape_url(url)
+        self._url = normalize_url(url)
 
     @property
     def method(self):

yt_dlp/utils/_legacy.py

@@ -8,6 +8,8 @@
 import zlib
 
 from ._utils import Popen, decode_base_n, preferredencoding
+from .networking import escape_rfc3986  # noqa: F401
+from .networking import normalize_url as escape_url  # noqa: F401
 from .traversal import traverse_obj
 from ..dependencies import certifi, websockets
 from ..networking._helper import make_ssl_context
@@ -197,7 +199,7 @@ def request_to_url(req):
 
 
 def sanitized_Request(url, *args, **kwargs):
-    from ..utils import escape_url, extract_basic_auth, sanitize_url
+    from ..utils import extract_basic_auth, sanitize_url
     url, auth_header = extract_basic_auth(escape_url(sanitize_url(url)))
     if auth_header is not None:
         headers = args[1] if len(args) >= 2 else kwargs.setdefault('headers', {})

yt_dlp/utils/_utils.py

@@ -2464,23 +2464,6 @@ def lowercase_escape(s):
         s)
 
 
-def escape_rfc3986(s):
-    """Escape non-ASCII characters as suggested by RFC 3986"""
-    return urllib.parse.quote(s, b"%/;:@&=+$,!~*'()?#[]")
-
-
-def escape_url(url):
-    """Escape URL as suggested by RFC 3986"""
-    url_parsed = urllib.parse.urlparse(url)
-    return url_parsed._replace(
-        netloc=url_parsed.netloc.encode('idna').decode('ascii'),
-        path=escape_rfc3986(url_parsed.path),
-        params=escape_rfc3986(url_parsed.params),
-        query=escape_rfc3986(url_parsed.query),
-        fragment=escape_rfc3986(url_parsed.fragment)
-    ).geturl()
-
-
 def parse_qs(url, **kwargs):
     return urllib.parse.parse_qs(urllib.parse.urlparse(url).query, **kwargs)
 

yt_dlp/utils/networking.py

@@ -121,3 +121,41 @@ def clean_headers(headers: HTTPHeaderDict):
     if 'Youtubedl-No-Compression' in headers:  # compat
         del headers['Youtubedl-No-Compression']
         headers['Accept-Encoding'] = 'identity'
+
+
+def remove_dot_segments(path):
+    # Implements RFC3986 5.2.4 remove_dot_segments
+    # Pseudo-code: https://tools.ietf.org/html/rfc3986#section-5.2.4
+    # https://github.com/urllib3/urllib3/blob/ba49f5c4e19e6bca6827282feb77a3c9f937e64b/src/urllib3/util/url.py#L263
+    output = []
+    segments = path.split('/')
+    for s in segments:
+        if s == '.':
+            continue
+        elif s == '..':
+            if output:
+                output.pop()
+        else:
+            output.append(s)
+    if not segments[0] and (not output or output[0]):
+        output.insert(0, '')
+    if segments[-1] in ('.', '..'):
+        output.append('')
+    return '/'.join(output)
+
+
+def escape_rfc3986(s):
+    """Escape non-ASCII characters as suggested by RFC 3986"""
+    return urllib.parse.quote(s, b"%/;:@&=+$,!~*'()?#[]")
+
+
+def normalize_url(url):
+    """Normalize URL as suggested by RFC 3986"""
+    url_parsed = urllib.parse.urlparse(url)
+    return url_parsed._replace(
+        netloc=url_parsed.netloc.encode('idna').decode('ascii'),
+        path=escape_rfc3986(remove_dot_segments(url_parsed.path)),
+        params=escape_rfc3986(url_parsed.params),
+        query=escape_rfc3986(url_parsed.query),
+        fragment=escape_rfc3986(url_parsed.fragment)
+    ).geturl()
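
A note on edge cases (a reading of the implementation above, with expected values taken verbatim from the new test_remove_dot_segments test): the helper accepts relative paths as well as absolute ones, never resolves above the root, and leaves runs of empty segments untouched.

    assert remove_dot_segments('../a') == 'a'      # leading parent reference on a relative path is dropped
    assert remove_dot_segments('/..') == '/'       # cannot climb above the root
    assert remove_dot_segments('////') == '////'   # empty segments are preserved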