mirror of
https://github.com/yt-dlp/yt-dlp.git
synced 2024-11-16 05:13:21 +00:00
380 lines
16 KiB
Python
380 lines
16 KiB
Python
|
import abc
|
||
|
import base64
|
||
|
import contextlib
|
||
|
import functools
|
||
|
import json
|
||
|
import os
|
||
|
import random
|
||
|
import ssl
|
||
|
import threading
|
||
|
from http.server import BaseHTTPRequestHandler
|
||
|
from socketserver import ThreadingTCPServer
|
||
|
|
||
|
import pytest
|
||
|
|
||
|
from test.helper import http_server_port, verify_address_availability
|
||
|
from test.test_networking import TEST_DIR
|
||
|
from test.test_socks import IPv6ThreadingTCPServer
|
||
|
from yt_dlp.dependencies import urllib3
|
||
|
from yt_dlp.networking import Request
|
||
|
from yt_dlp.networking.exceptions import HTTPError, ProxyError, SSLError
|
||
|
|
||
|
|
||
|
class HTTPProxyAuthMixin:
    """Basic proxy authentication helpers shared by the test proxy handlers.

    The host class is expected to provide ``headers``, ``send_response``,
    ``send_header`` and ``end_headers``.
    """

    def proxy_auth_error(self):
        """Reply with 407 and a Basic challenge. Always returns False."""
        self.send_response(407)
        self.send_header('Proxy-Authenticate', 'Basic realm="test http proxy"')
        self.end_headers()
        return False

    def do_proxy_auth(self, username, password):
        """Check the Proxy-Authorization header against the expected credentials.

        Returns True when no credentials are configured (both are None) or when
        the supplied Basic credentials match. In every other case a 407
        response is sent and False is returned.
        """
        auth_required = username is not None or password is not None
        if not auth_required:
            return True

        header_value = self.headers.get('Proxy-Authorization', None)
        if header_value is None or not header_value.startswith('Basic '):
            return self.proxy_auth_error()

        encoded_credentials = header_value[len('Basic '):]
        try:
            decoded_credentials = base64.b64decode(encoded_credentials).decode()
            auth_username, auth_password = decoded_credentials.split(':', 1)
        except Exception:
            # Undecodable or malformed credentials are treated like wrong ones
            return self.proxy_auth_error()

        # A missing expected username/password is compared as an empty string
        expected_credentials = (username or '', password or '')
        if (auth_username, auth_password) != expected_credentials:
            return self.proxy_auth_error()
        return True
|
||
|
|
||
|
|
||
|
class HTTPProxyHandler(BaseHTTPRequestHandler, HTTPProxyAuthMixin):
    """Plain HTTP proxy handler that answers ``/proxy_info`` requests itself.

    Nothing is forwarded upstream: a GET whose path ends in ``/proxy_info``
    gets a JSON description of how the proxy saw the request, and any other
    path gets a 404.
    """

    def __init__(self, *args, proxy_info=None, username=None, password=None, request_handler=None, **kwargs):
        """
        @param proxy_info       Pre-built info dict to return instead of describing this request
        @param username         Expected proxy username (None together with password disables auth)
        @param password         Expected proxy password
        @param request_handler  Accepted for compatibility with proxy_server(), which always passes it; unused
        """
        # These must be set first: the base initializer already handles the request
        self.username = username
        self.password = password
        self.proxy_info = proxy_info
        super().__init__(*args, **kwargs)

    def do_GET(self):
        """Authenticate, then serve the proxy info JSON (or a 404) and close the request."""
        if not self.do_proxy_auth(self.username, self.password):
            self.server.close_request(self.request)
            return
        if self.path.endswith('/proxy_info'):
            # Encode once so that Content-Length is the size of the bytes actually written
            payload = json.dumps(self.proxy_info or {
                'client_address': self.client_address,
                'connect': False,
                'connect_host': None,
                'connect_port': None,
                'headers': dict(self.headers),
                'path': self.path,
                'proxy': ':'.join(str(y) for y in self.connection.getsockname()),
            }).encode()
            self.send_response(200)
            self.send_header('Content-Type', 'application/json; charset=utf-8')
            self.send_header('Content-Length', str(len(payload)))
            self.end_headers()
            self.wfile.write(payload)
        else:
            self.send_response(404)
            self.end_headers()

        self.server.close_request(self.request)
|
||
|
|
||
|
|
||
|
if not urllib3:
    SSLTransport = None
else:
    import urllib3.util.ssltransport

    class SSLTransport(urllib3.util.ssltransport.SSLTransport):
        """
        Variant of urllib3's SSLTransport that can also act as the server side of a TLS session

        With it, one TLS connection can be layered on top of another.
        """

        def __init__(self, socket, ssl_context, server_hostname=None, suppress_ragged_eofs=True, server_side=False):
            self.socket = socket
            self.suppress_ragged_eofs = suppress_ragged_eofs

            # Memory BIOs handed to wrap_bio() below
            self.incoming = ssl.MemoryBIO()
            self.outgoing = ssl.MemoryBIO()
            self.sslobj = ssl_context.wrap_bio(
                self.incoming,
                self.outgoing,
                server_hostname=server_hostname,
                server_side=server_side,
            )

            # Run the handshake through the inherited I/O loop
            self._ssl_io_loop(self.sslobj.do_handshake)

        @property
        def _io_refs(self):
            # Delegated to the wrapped socket
            return self.socket._io_refs

        @_io_refs.setter
        def _io_refs(self, value):
            self.socket._io_refs = value

        def shutdown(self, *args, **kwargs):
            """Forward shutdown() to the wrapped socket."""
            self.socket.shutdown(*args, **kwargs)
|
||
|
|
||
|
|
||
|
class HTTPSProxyHandler(HTTPProxyHandler):
    """HTTPProxyHandler served over TLS using the test certificate."""

    def __init__(self, request, *args, **kwargs):
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        context.load_cert_chain(os.path.join(TEST_DIR, 'testcert.pem'), None)

        if not isinstance(request, ssl.SSLSocket):
            tls_request = context.wrap_socket(request, server_side=True)
        else:
            # The connection is already TLS: layer the second session on top via SSLTransport
            tls_request = SSLTransport(request, ssl_context=context, server_side=True)

        super().__init__(tls_request, *args, **kwargs)
|
||
|
|
||
|
|
||
|
class HTTPConnectProxyHandler(BaseHTTPRequestHandler, HTTPProxyAuthMixin):
    """HTTP CONNECT proxy handler.

    After a successful CONNECT the tunnelled connection is not forwarded
    anywhere. It is handed to ``request_handler`` together with a
    ``proxy_info`` dict describing the CONNECT request.
    """

    protocol_version = 'HTTP/1.1'
    default_request_version = 'HTTP/1.1'

    def __init__(self, *args, username=None, password=None, request_handler=None, **kwargs):
        """
        @param username         Expected proxy username (None together with password disables auth)
        @param password         Expected proxy password
        @param request_handler  Handler class invoked on the connection once the tunnel is established
        """
        # These must be set first: the base initializer already handles the request
        self.username = username
        self.password = password
        self.request_handler = request_handler
        super().__init__(*args, **kwargs)

    def do_CONNECT(self):
        """Authenticate, acknowledge the tunnel and delegate the connection to request_handler."""
        if not self.do_proxy_auth(self.username, self.password):
            self.server.close_request(self.request)
            return
        self.send_response(200)
        self.end_headers()

        # Split on the LAST colon so that hosts which contain colons themselves
        # (IPv6 literals such as "[::1]:443") still yield the right host and port
        connect_host, _, connect_port = self.path.rpartition(':')
        proxy_info = {
            'client_address': self.client_address,
            'connect': True,
            'connect_host': connect_host,
            'connect_port': int(connect_port),
            'headers': dict(self.headers),
            'path': self.path,
            'proxy': ':'.join(str(y) for y in self.connection.getsockname()),
        }
        self.request_handler(self.request, self.client_address, self.server, proxy_info=proxy_info)
        self.server.close_request(self.request)
|
||
|
|
||
|
|
||
|
class HTTPSConnectProxyHandler(HTTPConnectProxyHandler):
    """HTTPConnectProxyHandler whose proxy connection itself is served over TLS."""

    def __init__(self, request, *args, **kwargs):
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        context.load_cert_chain(os.path.join(TEST_DIR, 'testcert.pem'), None)
        tls_request = context.wrap_socket(request, server_side=True)

        # Remembered so that do_CONNECT can close the TLS-wrapped socket afterwards
        self._original_request = tls_request
        super().__init__(tls_request, *args, **kwargs)

    def do_CONNECT(self):
        """Run the regular CONNECT handling, then close the TLS-wrapped proxy socket."""
        super().do_CONNECT()
        self.server.close_request(self._original_request)
|
||
|
|
||
|
|
||
|
@contextlib.contextmanager
def proxy_server(proxy_server_class, request_handler, bind_ip=None, **proxy_server_kwargs):
    """Run a threaded proxy server for the duration of the ``with`` block.

    @param proxy_server_class   Handler class for incoming proxy connections; it is
                                instantiated with request_handler and proxy_server_kwargs
                                as extra keyword arguments
    @param request_handler      Passed to proxy_server_class as ``request_handler``
    @param bind_ip              Address to bind to (default 127.0.0.1). An address without
                                a "." is treated as IPv6
    @param proxy_server_kwargs  Further keyword arguments for proxy_server_class

    Yields the server address as ``host:port`` (``[host]:port`` for IPv6).
    The server listens on an OS-assigned port.
    """
    server = server_thread = None
    thread_started = False
    try:
        bind_address = bind_ip or '127.0.0.1'
        is_ipv4 = '.' in bind_address
        server_type = ThreadingTCPServer if is_ipv4 else IPv6ThreadingTCPServer
        server = server_type(
            (bind_address, 0), functools.partial(proxy_server_class, request_handler=request_handler, **proxy_server_kwargs))
        server_port = http_server_port(server)
        server_thread = threading.Thread(target=server.serve_forever)
        server_thread.daemon = True
        server_thread.start()
        thread_started = True
        if is_ipv4:
            yield f'{bind_address}:{server_port}'
        else:
            yield f'[{bind_address}]:{server_port}'
    finally:
        # Only tear down what was actually set up, so that a failure during
        # startup propagates instead of being masked by the cleanup itself
        if server is not None:
            if thread_started:
                # shutdown() waits for serve_forever() to finish and would
                # block forever if the serving thread had never been started
                server.shutdown()
            server.server_close()
        if thread_started:
            server_thread.join(2.0)
|
||
|
|
||
|
|
||
|
class HTTPProxyTestContext(abc.ABC):
    """Base class tying a request protocol to the handler class serving it behind the proxy."""

    # Overridden by the concrete contexts
    REQUEST_HANDLER_CLASS = None
    REQUEST_PROTO = None

    def http_server(self, server_class, *args, **kwargs):
        """Start a proxy server of server_class backed by this context's request handler class."""
        request_handler = self.REQUEST_HANDLER_CLASS
        return proxy_server(server_class, request_handler, *args, **kwargs)

    @abc.abstractmethod
    def proxy_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs) -> dict:
        """Send a proxy_info request through handler and return the proxy_info dict"""
|
||
|
|
||
|
|
||
|
class HTTPProxyHTTPTestContext(HTTPProxyTestContext):
    # Standard HTTP Proxy for http requests
    REQUEST_HANDLER_CLASS = HTTPProxyHandler
    REQUEST_PROTO = 'http'

    def proxy_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs):
        """Request http://<target>/proxy_info through handler and return the decoded JSON."""
        host = target_domain or '127.0.0.1'
        port = target_port or '40000'
        request = Request(f'http://{host}:{port}/proxy_info', **req_kwargs)
        handler.validate(request)
        response = handler.send(request)
        return json.loads(response.read().decode())
|
||
|
|
||
|
|
||
|
class HTTPProxyHTTPSTestContext(HTTPProxyTestContext):
    # HTTP Connect proxy, for https requests
    REQUEST_HANDLER_CLASS = HTTPSProxyHandler
    REQUEST_PROTO = 'https'

    def proxy_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs):
        """Request https://<target>/proxy_info through handler and return the decoded JSON."""
        host = target_domain or '127.0.0.1'
        port = target_port or '40000'
        request = Request(f'https://{host}:{port}/proxy_info', **req_kwargs)
        handler.validate(request)
        response = handler.send(request)
        return json.loads(response.read().decode())
|
||
|
|
||
|
|
||
|
# Test context classes keyed by the request protocol they declare ('http' / 'https')
CTX_MAP = {
    context_class.REQUEST_PROTO: context_class
    for context_class in (HTTPProxyHTTPTestContext, HTTPProxyHTTPSTestContext)
}
|
||
|
|
||
|
|
||
|
@pytest.fixture(scope='module')
def ctx(request):
    """Instantiate the test context registered in CTX_MAP under the parametrized name."""
    context_class = CTX_MAP[request.param]
    return context_class()
|
||
|
|
||
|
|
||
|
@pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
@pytest.mark.parametrize('ctx', ['http'], indirect=True)  # pure http proxy can only support http
class TestHTTPProxy:
    """Requests sent through a plain (non-CONNECT) HTTP proxy."""

    def test_http_no_auth(self, handler, ctx):
        """Without credentials the proxy is used and no Proxy-Authorization header is sent."""
        with ctx.http_server(HTTPProxyHandler) as server_address:
            proxies = {ctx.REQUEST_PROTO: f'http://{server_address}'}
            with handler(proxies=proxies) as rh:
                proxy_info = ctx.proxy_info_request(rh)
                assert proxy_info['proxy'] == server_address
                assert proxy_info['connect'] is False
                assert 'Proxy-Authorization' not in proxy_info['headers']

    def test_http_auth(self, handler, ctx):
        """Credentials from the proxy URL are sent to the proxy."""
        with ctx.http_server(HTTPProxyHandler, username='test', password='test') as server_address:
            proxies = {ctx.REQUEST_PROTO: f'http://test:test@{server_address}'}
            with handler(proxies=proxies) as rh:
                proxy_info = ctx.proxy_info_request(rh)
                assert proxy_info['proxy'] == server_address
                assert 'Proxy-Authorization' in proxy_info['headers']

    def test_http_bad_auth(self, handler, ctx):
        """Wrong credentials surface as an HTTPError carrying the proxy's 407 response."""
        with ctx.http_server(HTTPProxyHandler, username='test', password='test') as server_address:
            proxies = {ctx.REQUEST_PROTO: f'http://test:bad@{server_address}'}
            with handler(proxies=proxies) as rh:
                with pytest.raises(HTTPError) as exc_info:
                    ctx.proxy_info_request(rh)
                assert exc_info.value.response.status == 407
                exc_info.value.response.close()

    def test_http_source_address(self, handler, ctx):
        """The configured source address is the one the proxy sees the client connect from."""
        with ctx.http_server(HTTPProxyHandler) as server_address:
            source_address = f'127.0.0.{random.randint(5, 255)}'
            verify_address_availability(source_address)
            proxies = {ctx.REQUEST_PROTO: f'http://{server_address}'}
            with handler(proxies=proxies, source_address=source_address) as rh:
                proxy_info = ctx.proxy_info_request(rh)
                assert proxy_info['proxy'] == server_address
                assert proxy_info['client_address'][0] == source_address

    @pytest.mark.skip_handler('Urllib', 'urllib does not support https proxies')
    def test_https(self, handler, ctx):
        """The proxy itself can be reached over https when verification is disabled."""
        with ctx.http_server(HTTPSProxyHandler) as server_address:
            proxies = {ctx.REQUEST_PROTO: f'https://{server_address}'}
            with handler(verify=False, proxies=proxies) as rh:
                proxy_info = ctx.proxy_info_request(rh)
                assert proxy_info['proxy'] == server_address
                assert proxy_info['connect'] is False
                assert 'Proxy-Authorization' not in proxy_info['headers']

    @pytest.mark.skip_handler('Urllib', 'urllib does not support https proxies')
    def test_https_verify_failed(self, handler, ctx):
        """With verification enabled the request through the https proxy fails."""
        with ctx.http_server(HTTPSProxyHandler) as server_address:
            proxies = {ctx.REQUEST_PROTO: f'https://{server_address}'}
            with handler(verify=True, proxies=proxies) as rh:
                # Accept SSLError as may not be feasible to tell if it is proxy or request error.
                # note: if request proto also does ssl verification, this may also be the error of the request.
                # Until we can support passing custom cacerts to handlers, we cannot properly test this for all cases.
                with pytest.raises((ProxyError, SSLError)):
                    ctx.proxy_info_request(rh)

    def test_http_with_idn(self, handler, ctx):
        """An internationalized target domain reaches the proxy in punycode form."""
        with ctx.http_server(HTTPProxyHandler) as server_address:
            proxies = {ctx.REQUEST_PROTO: f'http://{server_address}'}
            with handler(proxies=proxies) as rh:
                proxy_info = ctx.proxy_info_request(rh, target_domain='中文.tw')
                assert proxy_info['proxy'] == server_address
                assert proxy_info['path'].startswith('http://xn--fiq228c.tw')
                assert proxy_info['headers']['Host'].split(':', 1)[0] == 'xn--fiq228c.tw'
|
||
|
|
||
|
|
||
|
@pytest.mark.parametrize(
    'handler,ctx', [
        ('Requests', 'https'),
        ('CurlCFFI', 'https'),
    ], indirect=True)
class TestHTTPConnectProxy:
    """https requests tunnelled through an HTTP CONNECT proxy."""

    def test_http_connect_no_auth(self, handler, ctx):
        """Without credentials the tunnel is established and no Proxy-Authorization header is sent."""
        with ctx.http_server(HTTPConnectProxyHandler) as server_address:
            proxies = {ctx.REQUEST_PROTO: f'http://{server_address}'}
            with handler(verify=False, proxies=proxies) as rh:
                proxy_info = ctx.proxy_info_request(rh)
                assert proxy_info['proxy'] == server_address
                assert proxy_info['connect'] is True
                assert 'Proxy-Authorization' not in proxy_info['headers']

    def test_http_connect_auth(self, handler, ctx):
        """Credentials from the proxy URL are sent with the CONNECT request."""
        with ctx.http_server(HTTPConnectProxyHandler, username='test', password='test') as server_address:
            proxies = {ctx.REQUEST_PROTO: f'http://test:test@{server_address}'}
            with handler(verify=False, proxies=proxies) as rh:
                proxy_info = ctx.proxy_info_request(rh)
                assert proxy_info['proxy'] == server_address
                assert 'Proxy-Authorization' in proxy_info['headers']

    @pytest.mark.skip_handler(
        'Requests',
        'bug in urllib3 causes unclosed socket: https://github.com/urllib3/urllib3/issues/3374'
    )
    def test_http_connect_bad_auth(self, handler, ctx):
        """Wrong credentials for the CONNECT request raise ProxyError."""
        with ctx.http_server(HTTPConnectProxyHandler, username='test', password='test') as server_address:
            proxies = {ctx.REQUEST_PROTO: f'http://test:bad@{server_address}'}
            with handler(verify=False, proxies=proxies) as rh:
                with pytest.raises(ProxyError):
                    ctx.proxy_info_request(rh)

    def test_http_connect_source_address(self, handler, ctx):
        """The configured source address is the one the proxy sees the client connect from."""
        with ctx.http_server(HTTPConnectProxyHandler) as server_address:
            source_address = f'127.0.0.{random.randint(5, 255)}'
            verify_address_availability(source_address)
            proxies = {ctx.REQUEST_PROTO: f'http://{server_address}'}
            with handler(proxies=proxies, source_address=source_address, verify=False) as rh:
                proxy_info = ctx.proxy_info_request(rh)
                assert proxy_info['proxy'] == server_address
                assert proxy_info['client_address'][0] == source_address

    @pytest.mark.skipif(urllib3 is None, reason='requires urllib3 to test')
    def test_https_connect_proxy(self, handler, ctx):
        """The CONNECT proxy itself can be reached over https when verification is disabled."""
        with ctx.http_server(HTTPSConnectProxyHandler) as server_address:
            proxies = {ctx.REQUEST_PROTO: f'https://{server_address}'}
            with handler(verify=False, proxies=proxies) as rh:
                proxy_info = ctx.proxy_info_request(rh)
                assert proxy_info['proxy'] == server_address
                assert proxy_info['connect'] is True
                assert 'Proxy-Authorization' not in proxy_info['headers']

    @pytest.mark.skipif(urllib3 is None, reason='requires urllib3 to test')
    def test_https_connect_verify_failed(self, handler, ctx):
        """With verification enabled the request through the https CONNECT proxy fails."""
        with ctx.http_server(HTTPSConnectProxyHandler) as server_address:
            proxies = {ctx.REQUEST_PROTO: f'https://{server_address}'}
            with handler(verify=True, proxies=proxies) as rh:
                # Accept SSLError as may not be feasible to tell if it is proxy or request error.
                # note: if request proto also does ssl verification, this may also be the error of the request.
                # Until we can support passing custom cacerts to handlers, we cannot properly test this for all cases.
                with pytest.raises((ProxyError, SSLError)):
                    ctx.proxy_info_request(rh)

    @pytest.mark.skipif(urllib3 is None, reason='requires urllib3 to test')
    def test_https_connect_proxy_auth(self, handler, ctx):
        """Credentials from the proxy URL are sent to the https CONNECT proxy."""
        with ctx.http_server(HTTPSConnectProxyHandler, username='test', password='test') as server_address:
            proxies = {ctx.REQUEST_PROTO: f'https://test:test@{server_address}'}
            with handler(verify=False, proxies=proxies) as rh:
                proxy_info = ctx.proxy_info_request(rh)
                assert proxy_info['proxy'] == server_address
                assert 'Proxy-Authorization' in proxy_info['headers']
|