[networking] Add proxy_client_cert, proxy_verify and legacy_proxy_ssl_support options

This commit is contained in:
coletdjnz 2024-09-08 15:55:55 +12:00
parent 46f4c80bc3
commit 4accb0befe
No known key found for this signature in database
GPG key ID: 91984263BB39894A
6 changed files with 171 additions and 18 deletions

View file

@ -19,6 +19,8 @@
from yt_dlp.networking import Request from yt_dlp.networking import Request
from yt_dlp.networking.exceptions import HTTPError, ProxyError, SSLError from yt_dlp.networking.exceptions import HTTPError, ProxyError, SSLError
MTLS_CERT_DIR = os.path.join(TEST_DIR, 'testdata', 'certificate')
class HTTPProxyAuthMixin: class HTTPProxyAuthMixin:
@ -135,6 +137,21 @@ def __init__(self, request, *args, **kwargs):
super().__init__(request, *args, **kwargs) super().__init__(request, *args, **kwargs)
class MTLSHTTPSProxyHandler(HTTPProxyHandler):
def __init__(self, request, *args, **kwargs):
certfn = os.path.join(TEST_DIR, 'testcert.pem')
cacertfn = os.path.join(MTLS_CERT_DIR, 'ca.crt')
sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
sslctx.verify_mode = ssl.CERT_REQUIRED
sslctx.load_verify_locations(cafile=cacertfn)
sslctx.load_cert_chain(certfn, None)
if isinstance(request, ssl.SSLSocket):
request = SSLTransport(request, ssl_context=sslctx, server_side=True)
else:
request = sslctx.wrap_socket(request, server_side=True)
super().__init__(request, *args, **kwargs)
class HTTPConnectProxyHandler(BaseHTTPRequestHandler, HTTPProxyAuthMixin): class HTTPConnectProxyHandler(BaseHTTPRequestHandler, HTTPProxyAuthMixin):
protocol_version = 'HTTP/1.1' protocol_version = 'HTTP/1.1'
default_request_version = 'HTTP/1.1' default_request_version = 'HTTP/1.1'
@ -178,6 +195,23 @@ def do_CONNECT(self):
self.server.close_request(self._original_request) self.server.close_request(self._original_request)
class MTLSHTTPSConnectProxyHandler(HTTPConnectProxyHandler):
def __init__(self, request, *args, **kwargs):
certfn = os.path.join(TEST_DIR, 'testcert.pem')
cacertfn = os.path.join(MTLS_CERT_DIR, 'ca.crt')
sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
sslctx.verify_mode = ssl.CERT_REQUIRED
sslctx.load_verify_locations(cafile=cacertfn)
sslctx.load_cert_chain(certfn, None)
request = sslctx.wrap_socket(request, server_side=True)
self._original_request = request
super().__init__(request, *args, **kwargs)
def do_CONNECT(self):
super().do_CONNECT()
self.server.close_request(self._original_request)
@contextlib.contextmanager @contextlib.contextmanager
def proxy_server(proxy_server_class, request_handler, bind_ip=None, **proxy_server_kwargs): def proxy_server(proxy_server_class, request_handler, bind_ip=None, **proxy_server_kwargs):
server = server_thread = None server = server_thread = None
@ -285,7 +319,7 @@ def test_http_source_address(self, handler, ctx):
@pytest.mark.skip_handler('Urllib', 'urllib does not support https proxies') @pytest.mark.skip_handler('Urllib', 'urllib does not support https proxies')
def test_https(self, handler, ctx): def test_https(self, handler, ctx):
with ctx.http_server(HTTPSProxyHandler) as server_address: with ctx.http_server(HTTPSProxyHandler) as server_address:
with handler(verify=False, proxies={ctx.REQUEST_PROTO: f'https://{server_address}'}) as rh: with handler(proxy_verify=False, verify=False, proxies={ctx.REQUEST_PROTO: f'https://{server_address}'}) as rh:
proxy_info = ctx.proxy_info_request(rh) proxy_info = ctx.proxy_info_request(rh)
assert proxy_info['proxy'] == server_address assert proxy_info['proxy'] == server_address
assert proxy_info['connect'] is False assert proxy_info['connect'] is False
@ -294,10 +328,50 @@ def test_https(self, handler, ctx):
@pytest.mark.skip_handler('Urllib', 'urllib does not support https proxies') @pytest.mark.skip_handler('Urllib', 'urllib does not support https proxies')
def test_https_verify_failed(self, handler, ctx): def test_https_verify_failed(self, handler, ctx):
with ctx.http_server(HTTPSProxyHandler) as server_address: with ctx.http_server(HTTPSProxyHandler) as server_address:
with handler(verify=True, proxies={ctx.REQUEST_PROTO: f'https://{server_address}'}) as rh: with handler(proxy_verify=True, verify=False, proxies={ctx.REQUEST_PROTO: f'https://{server_address}'}) as rh:
# Accept SSLError as may not be feasible to tell if it is proxy or request error. # Accept both ProxyError and SSLError as may not be feasible to tell if it is proxy or request error.
# note: if request proto also does ssl verification, this may also be the error of the request. with pytest.raises((ProxyError, SSLError)):
# Until we can support passing custom cacerts to handlers, we cannot properly test this for all cases. ctx.proxy_info_request(rh)
@pytest.mark.skip_handler('Urllib', 'urllib does not support https proxies')
@pytest.mark.parametrize('proxy_client_cert', [
{'client_certificate': os.path.join(MTLS_CERT_DIR, 'clientwithkey.crt')},
{
'client_certificate': os.path.join(MTLS_CERT_DIR, 'client.crt'),
'client_certificate_key': os.path.join(MTLS_CERT_DIR, 'client.key'),
},
{
'client_certificate': os.path.join(MTLS_CERT_DIR, 'clientwithencryptedkey.crt'),
'client_certificate_password': 'foobar',
},
{
'client_certificate': os.path.join(MTLS_CERT_DIR, 'client.crt'),
'client_certificate_key': os.path.join(MTLS_CERT_DIR, 'clientencrypted.key'),
'client_certificate_password': 'foobar',
},
], ids=['combined_nopass', 'nocombined_nopass', 'combined_pass', 'nocombined_pass'])
def test_https_mtls(self, handler, ctx, proxy_client_cert):
with ctx.http_server(MTLSHTTPSProxyHandler) as server_address:
with handler(
proxy_verify=False,
verify=False,
proxy_client_cert=proxy_client_cert,
proxies={ctx.REQUEST_PROTO: f'https://{server_address}'},
) as rh:
proxy_info = ctx.proxy_info_request(rh)
assert proxy_info['proxy'] == server_address
assert proxy_info['connect'] is False
assert 'Proxy-Authorization' not in proxy_info['headers']
@pytest.mark.skip_handler('Urllib', 'urllib does not support https proxies')
def test_https_mtls_error(self, handler, ctx):
with ctx.http_server(MTLSHTTPSProxyHandler) as server_address:
with handler(
proxy_verify=False,
verify=False,
proxy_client_cert=None,
proxies={ctx.REQUEST_PROTO: f'https://{server_address}'},
) as rh:
with pytest.raises((ProxyError, SSLError)): with pytest.raises((ProxyError, SSLError)):
ctx.proxy_info_request(rh) ctx.proxy_info_request(rh)
@ -331,10 +405,6 @@ def test_http_connect_auth(self, handler, ctx):
assert proxy_info['proxy'] == server_address assert proxy_info['proxy'] == server_address
assert 'Proxy-Authorization' in proxy_info['headers'] assert 'Proxy-Authorization' in proxy_info['headers']
@pytest.mark.skip_handler(
'Requests',
'bug in urllib3 causes unclosed socket: https://github.com/urllib3/urllib3/issues/3374',
)
def test_http_connect_bad_auth(self, handler, ctx): def test_http_connect_bad_auth(self, handler, ctx):
with ctx.http_server(HTTPConnectProxyHandler, username='test', password='test') as server_address: with ctx.http_server(HTTPConnectProxyHandler, username='test', password='test') as server_address:
with handler(verify=False, proxies={ctx.REQUEST_PROTO: f'http://test:bad@{server_address}'}) as rh: with handler(verify=False, proxies={ctx.REQUEST_PROTO: f'http://test:bad@{server_address}'}) as rh:
@ -355,7 +425,7 @@ def test_http_connect_source_address(self, handler, ctx):
@pytest.mark.skipif(urllib3 is None, reason='requires urllib3 to test') @pytest.mark.skipif(urllib3 is None, reason='requires urllib3 to test')
def test_https_connect_proxy(self, handler, ctx): def test_https_connect_proxy(self, handler, ctx):
with ctx.http_server(HTTPSConnectProxyHandler) as server_address: with ctx.http_server(HTTPSConnectProxyHandler) as server_address:
with handler(verify=False, proxies={ctx.REQUEST_PROTO: f'https://{server_address}'}) as rh: with handler(proxy_verify=False, verify=False, proxies={ctx.REQUEST_PROTO: f'https://{server_address}'}) as rh:
proxy_info = ctx.proxy_info_request(rh) proxy_info = ctx.proxy_info_request(rh)
assert proxy_info['proxy'] == server_address assert proxy_info['proxy'] == server_address
assert proxy_info['connect'] is True assert proxy_info['connect'] is True
@ -364,17 +434,57 @@ def test_https_connect_proxy(self, handler, ctx):
@pytest.mark.skipif(urllib3 is None, reason='requires urllib3 to test') @pytest.mark.skipif(urllib3 is None, reason='requires urllib3 to test')
def test_https_connect_verify_failed(self, handler, ctx): def test_https_connect_verify_failed(self, handler, ctx):
with ctx.http_server(HTTPSConnectProxyHandler) as server_address: with ctx.http_server(HTTPSConnectProxyHandler) as server_address:
with handler(verify=True, proxies={ctx.REQUEST_PROTO: f'https://{server_address}'}) as rh: with handler(proxy_verify=True, verify=False, proxies={ctx.REQUEST_PROTO: f'https://{server_address}'}) as rh:
# Accept SSLError as may not be feasible to tell if it is proxy or request error. # Accept both ProxyError and SSLError as may not be feasible to tell if it is proxy or request error.
# note: if request proto also does ssl verification, this may also be the error of the request.
# Until we can support passing custom cacerts to handlers, we cannot properly test this for all cases.
with pytest.raises((ProxyError, SSLError)): with pytest.raises((ProxyError, SSLError)):
ctx.proxy_info_request(rh) ctx.proxy_info_request(rh)
@pytest.mark.skipif(urllib3 is None, reason='requires urllib3 to test') @pytest.mark.skipif(urllib3 is None, reason='requires urllib3 to test')
def test_https_connect_proxy_auth(self, handler, ctx): def test_https_connect_proxy_auth(self, handler, ctx):
with ctx.http_server(HTTPSConnectProxyHandler, username='test', password='test') as server_address: with ctx.http_server(HTTPSConnectProxyHandler, username='test', password='test') as server_address:
with handler(verify=False, proxies={ctx.REQUEST_PROTO: f'https://test:test@{server_address}'}) as rh: with handler(proxy_verify=False, verify=False, proxies={ctx.REQUEST_PROTO: f'https://test:test@{server_address}'}) as rh:
proxy_info = ctx.proxy_info_request(rh) proxy_info = ctx.proxy_info_request(rh)
assert proxy_info['proxy'] == server_address assert proxy_info['proxy'] == server_address
assert 'Proxy-Authorization' in proxy_info['headers'] assert 'Proxy-Authorization' in proxy_info['headers']
@pytest.mark.skipif(urllib3 is None, reason='requires urllib3 to test')
@pytest.mark.parametrize('proxy_client_cert', [
{'client_certificate': os.path.join(MTLS_CERT_DIR, 'clientwithkey.crt')},
{
'client_certificate': os.path.join(MTLS_CERT_DIR, 'client.crt'),
'client_certificate_key': os.path.join(MTLS_CERT_DIR, 'client.key'),
},
{
'client_certificate': os.path.join(MTLS_CERT_DIR, 'clientwithencryptedkey.crt'),
'client_certificate_password': 'foobar',
},
{
'client_certificate': os.path.join(MTLS_CERT_DIR, 'client.crt'),
'client_certificate_key': os.path.join(MTLS_CERT_DIR, 'clientencrypted.key'),
'client_certificate_password': 'foobar',
},
], ids=['combined_nopass', 'nocombined_nopass', 'combined_pass', 'nocombined_pass'])
def test_https_connect_mtls(self, handler, ctx, proxy_client_cert):
with ctx.http_server(MTLSHTTPSConnectProxyHandler) as server_address:
with handler(
proxy_verify=False,
verify=False,
proxy_client_cert=proxy_client_cert,
proxies={ctx.REQUEST_PROTO: f'https://{server_address}'},
) as rh:
proxy_info = ctx.proxy_info_request(rh)
assert proxy_info['proxy'] == server_address
assert proxy_info['connect'] is True
assert 'Proxy-Authorization' not in proxy_info['headers']
@pytest.mark.skipif(urllib3 is None, reason='requires urllib3 to test')
def test_https_connect_mtls_error(self, handler, ctx):
with ctx.http_server(MTLSHTTPSConnectProxyHandler) as server_address:
with handler(
proxy_verify=False,
verify=False,
proxy_client_cert=None,
proxies={ctx.REQUEST_PROTO: f'https://{server_address}'},
) as rh:
with pytest.raises((ProxyError, SSLError)):
ctx.proxy_info_request(rh)

View file

@ -772,6 +772,10 @@ def test_certificate_nocombined_pass(self, handler):
'client_certificate_password': 'foobar', 'client_certificate_password': 'foobar',
}) })
def test_mtls_required(self, handler):
with pytest.raises(SSLError):
self._run_test(handler)
@pytest.mark.parametrize('handler', ['CurlCFFI'], indirect=True) @pytest.mark.parametrize('handler', ['CurlCFFI'], indirect=True)
class TestHTTPImpersonateRequestHandler(TestRequestHandlerBase): class TestHTTPImpersonateRequestHandler(TestRequestHandlerBase):

View file

@ -356,7 +356,7 @@ def test_request_headers(self, handler):
'client_certificate_key': os.path.join(MTLS_CERT_DIR, 'clientencrypted.key'), 'client_certificate_key': os.path.join(MTLS_CERT_DIR, 'clientencrypted.key'),
'client_certificate_password': 'foobar', 'client_certificate_password': 'foobar',
}, },
)) ), ids=['combined_nopass', 'nocombined_nopass', 'combined_pass', 'nocombined_pass'])
def test_mtls(self, handler, client_cert): def test_mtls(self, handler, client_cert):
with handler( with handler(
# Disable client-side validation of unacceptable self-signed testcert.pem # Disable client-side validation of unacceptable self-signed testcert.pem
@ -366,6 +366,15 @@ def test_mtls(self, handler, client_cert):
) as rh: ) as rh:
ws_validate_and_send(rh, Request(self.mtls_wss_base_url)).close() ws_validate_and_send(rh, Request(self.mtls_wss_base_url)).close()
def test_mtls_required(self, handler):
with handler(
# Disable client-side validation of unacceptable self-signed testcert.pem
# The test is of a check on the server side, so unaffected
verify=False,
) as rh:
with pytest.raises(SSLError):
ws_validate_and_send(rh, Request(self.mtls_wss_base_url))
def test_request_disable_proxy(self, handler): def test_request_disable_proxy(self, handler):
for proxy_proto in handler._SUPPORTED_PROXY_SCHEMES or ['ws']: for proxy_proto in handler._SUPPORTED_PROXY_SCHEMES or ['ws']:
# Given handler is configured with a proxy # Given handler is configured with a proxy

View file

@ -187,7 +187,7 @@ def _send(self, request: Request):
# curl_cffi does not currently set these for proxies # curl_cffi does not currently set these for proxies
session.curl.setopt(CurlOpt.PROXY_CAINFO, certifi.where()) session.curl.setopt(CurlOpt.PROXY_CAINFO, certifi.where())
if not self.verify: if not self.proxy_verify:
session.curl.setopt(CurlOpt.PROXY_SSL_VERIFYPEER, 0) session.curl.setopt(CurlOpt.PROXY_SSL_VERIFYPEER, 0)
session.curl.setopt(CurlOpt.PROXY_SSL_VERIFYHOST, 0) session.curl.setopt(CurlOpt.PROXY_SSL_VERIFYHOST, 0)
@ -202,6 +202,15 @@ def _send(self, request: Request):
if client_certificate_password: if client_certificate_password:
session.curl.setopt(CurlOpt.KEYPASSWD, client_certificate_password) session.curl.setopt(CurlOpt.KEYPASSWD, client_certificate_password)
if self._proxy_client_cert:
session.curl.setopt(CurlOpt.PROXY_SSLCERT, self._proxy_client_cert['client_certificate'])
proxy_client_certificate_key = self._proxy_client_cert.get('client_certificate_key')
proxy_client_certificate_password = self._proxy_client_cert.get('client_certificate_password')
if proxy_client_certificate_key:
session.curl.setopt(CurlOpt.PROXY_SSLKEY, proxy_client_certificate_key)
if proxy_client_certificate_password:
session.curl.setopt(CurlOpt.PROXY_KEYPASSWD, proxy_client_certificate_password)
timeout = self._calculate_timeout(request) timeout = self._calculate_timeout(request)
# set CURLOPT_LOW_SPEED_LIMIT and CURLOPT_LOW_SPEED_TIME to act as a read timeout. [1] # set CURLOPT_LOW_SPEED_LIMIT and CURLOPT_LOW_SPEED_TIME to act as a read timeout. [1]
@ -243,6 +252,8 @@ def _send(self, request: Request):
or (e.code == CurlECode.RECV_ERROR and 'CONNECT' in str(e)) or (e.code == CurlECode.RECV_ERROR and 'CONNECT' in str(e))
): ):
raise ProxyError(cause=e) from e raise ProxyError(cause=e) from e
elif e.code == CurlECode.RECV_ERROR and 'SSL' in str(e):
raise SSLError(cause=e) from e
else: else:
raise TransportError(cause=e) from e raise TransportError(cause=e) from e

View file

@ -301,6 +301,7 @@ def _create_instance(self, cookiejar, legacy_ssl_support=None):
session = RequestsSession() session = RequestsSession()
http_adapter = RequestsHTTPAdapter( http_adapter = RequestsHTTPAdapter(
ssl_context=self._make_sslcontext(legacy_ssl_support=legacy_ssl_support), ssl_context=self._make_sslcontext(legacy_ssl_support=legacy_ssl_support),
proxy_ssl_context=self._make_proxy_sslcontext(),
source_address=self.source_address, source_address=self.source_address,
max_retries=urllib3.util.retry.Retry(False), max_retries=urllib3.util.retry.Retry(False),
) )

View file

@ -187,10 +187,14 @@ class RequestHandler(abc.ABC):
@param source_address: Client-side IP address to bind to for requests. @param source_address: Client-side IP address to bind to for requests.
@param verbose: Print debug request and traffic information to stdout. @param verbose: Print debug request and traffic information to stdout.
@param prefer_system_certs: Whether to prefer system certificates over other means (e.g. certifi). @param prefer_system_certs: Whether to prefer system certificates over other means (e.g. certifi).
@param client_cert: SSL client certificate configuration. @param client_cert: SSL client certificate configuration.z
dict with {client_certificate, client_certificate_key, client_certificate_password}
@param proxy_client_cert: SSL client certificate configuration for proxy connections.
dict with {client_certificate, client_certificate_key, client_certificate_password} dict with {client_certificate, client_certificate_key, client_certificate_password}
@param verify: Verify SSL certificates @param verify: Verify SSL certificates
@param proxy_verify: Verify SSL certificates of proxy connections
@param legacy_ssl_support: Enable legacy SSL options such as legacy server connect and older cipher support. @param legacy_ssl_support: Enable legacy SSL options such as legacy server connect and older cipher support.
@param legacy_proxy_ssl_support: Enable legacy SSL options such as legacy server connect and older cipher support for proxy connections.
Some configuration options may be available for individual Requests too. In this case, Some configuration options may be available for individual Requests too. In this case,
either the Request configuration option takes precedence or they are merged. either the Request configuration option takes precedence or they are merged.
@ -230,8 +234,11 @@ def __init__(
verbose: bool = False, verbose: bool = False,
prefer_system_certs: bool = False, prefer_system_certs: bool = False,
client_cert: dict[str, str | None] | None = None, client_cert: dict[str, str | None] | None = None,
proxy_client_cert: dict[str, str | None] | None = None,
verify: bool = True, verify: bool = True,
proxy_verify: bool = True,
legacy_ssl_support: bool = False, legacy_ssl_support: bool = False,
legacy_proxy_ssl_support: bool = False,
**_, **_,
): ):
@ -244,8 +251,11 @@ def __init__(
self.verbose = verbose self.verbose = verbose
self.prefer_system_certs = prefer_system_certs self.prefer_system_certs = prefer_system_certs
self._client_cert = client_cert or {} self._client_cert = client_cert or {}
self._proxy_client_cert = proxy_client_cert or {}
self.verify = verify self.verify = verify
self.proxy_verify = proxy_verify
self.legacy_ssl_support = legacy_ssl_support self.legacy_ssl_support = legacy_ssl_support
self.legacy_proxy_ssl_support = legacy_proxy_ssl_support
super().__init__() super().__init__()
def _make_sslcontext(self, legacy_ssl_support=None): def _make_sslcontext(self, legacy_ssl_support=None):
@ -256,6 +266,14 @@ def _make_sslcontext(self, legacy_ssl_support=None):
**self._client_cert, **self._client_cert,
) )
def _make_proxy_sslcontext(self, legacy_ssl_support=None):
return make_ssl_context(
verify=self.proxy_verify,
legacy_support=legacy_ssl_support if legacy_ssl_support is not None else self.legacy_proxy_ssl_support,
use_certifi=not self.prefer_system_certs,
**self._proxy_client_cert,
)
def _merge_headers(self, request_headers): def _merge_headers(self, request_headers):
return HTTPHeaderDict(self.headers, request_headers) return HTTPHeaderDict(self.headers, request_headers)