Merge branch 'yt-dlp:master' into fix/youtube/remove-android-as-default

This commit is contained in:
bashonly 2024-05-11 18:27:02 -05:00
commit 1c3028c754
No known key found for this signature in database
GPG key ID: 783F096F253D15B0
37 changed files with 1244 additions and 527 deletions

View file

@ -12,6 +12,9 @@ on:
unix:
default: true
type: boolean
linux_static:
default: true
type: boolean
linux_arm:
default: true
type: boolean
@ -27,9 +30,6 @@ on:
windows32:
default: true
type: boolean
meta_files:
default: true
type: boolean
origin:
required: false
default: ''
@ -52,7 +52,11 @@ on:
default: stable
type: string
unix:
description: yt-dlp, yt-dlp.tar.gz, yt-dlp_linux, yt-dlp_linux.zip
description: yt-dlp, yt-dlp.tar.gz
default: true
type: boolean
linux_static:
description: yt-dlp_linux
default: true
type: boolean
linux_arm:
@ -75,10 +79,6 @@ on:
description: yt-dlp_x86.exe
default: true
type: boolean
meta_files:
description: SHA2-256SUMS, SHA2-512SUMS, _update_spec
default: true
type: boolean
origin:
description: Origin
required: false
@ -112,27 +112,9 @@ jobs:
- uses: actions/setup-python@v5
with:
python-version: "3.10"
- uses: conda-incubator/setup-miniconda@v3
with:
miniforge-variant: Mambaforge
use-mamba: true
channels: conda-forge
auto-update-conda: true
activate-environment: ""
auto-activate-base: false
- name: Install Requirements
run: |
sudo apt -y install zip pandoc man sed
cat > ./requirements.txt << EOF
python=3.10.*
pyinstaller
brotli-python
EOF
python devscripts/install_deps.py --print \
--exclude brotli --exclude brotlicffi \
--include secretstorage >> ./requirements.txt
mamba create -n build --file ./requirements.txt
- name: Prepare
run: |
python devscripts/update-version.py -c "${{ inputs.channel }}" -r "${{ needs.process.outputs.origin }}" "${{ inputs.version }}"
@ -141,30 +123,15 @@ jobs:
- name: Build Unix platform-independent binary
run: |
make all tar
- name: Build Unix standalone binary
shell: bash -l {0}
run: |
unset LD_LIBRARY_PATH # Harmful; set by setup-python
conda activate build
python -m bundle.pyinstaller --onedir
(cd ./dist/yt-dlp_linux && zip -r ../yt-dlp_linux.zip .)
python -m bundle.pyinstaller
mv ./dist/yt-dlp_linux ./yt-dlp_linux
mv ./dist/yt-dlp_linux.zip ./yt-dlp_linux.zip
- name: Verify --update-to
if: vars.UPDATE_TO_VERIFICATION
run: |
binaries=("yt-dlp" "yt-dlp_linux")
for binary in "${binaries[@]}"; do
chmod +x ./${binary}
cp ./${binary} ./${binary}_downgraded
version="$(./${binary} --version)"
./${binary}_downgraded -v --update-to yt-dlp/yt-dlp@2023.03.04
downgraded_version="$(./${binary}_downgraded --version)"
[[ "$version" != "$downgraded_version" ]]
done
chmod +x ./yt-dlp
cp ./yt-dlp ./yt-dlp_downgraded
version="$(./yt-dlp --version)"
./yt-dlp_downgraded -v --update-to yt-dlp/yt-dlp@2023.03.04
downgraded_version="$(./yt-dlp_downgraded --version)"
[[ "$version" != "$downgraded_version" ]]
- name: Upload artifacts
uses: actions/upload-artifact@v4
with:
@ -172,8 +139,39 @@ jobs:
path: |
yt-dlp
yt-dlp.tar.gz
yt-dlp_linux
yt-dlp_linux.zip
compression-level: 0
linux_static:
needs: process
if: inputs.linux_static
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Build static executable
env:
channel: ${{ inputs.channel }}
origin: ${{ needs.process.outputs.origin }}
version: ${{ inputs.version }}
run: |
mkdir ~/build
cd bundle/docker
docker compose up --build static
sudo chown "${USER}:docker" ~/build/yt-dlp_linux
- name: Verify --update-to
if: vars.UPDATE_TO_VERIFICATION
run: |
chmod +x ~/build/yt-dlp_linux
cp ~/build/yt-dlp_linux ~/build/yt-dlp_linux_downgraded
version="$(~/build/yt-dlp_linux --version)"
~/build/yt-dlp_linux_downgraded -v --update-to yt-dlp/yt-dlp@2023.03.04
downgraded_version="$(~/build/yt-dlp_linux_downgraded --version)"
[[ "$version" != "$downgraded_version" ]]
- name: Upload artifacts
uses: actions/upload-artifact@v4
with:
name: build-bin-${{ github.job }}
path: |
~/build/yt-dlp_linux
compression-level: 0
linux_arm:
@ -300,7 +298,7 @@ jobs:
macos_legacy:
needs: process
if: inputs.macos_legacy
runs-on: macos-latest
runs-on: macos-12
steps:
- uses: actions/checkout@v4
@ -447,10 +445,11 @@ jobs:
compression-level: 0
meta_files:
if: inputs.meta_files && always() && !cancelled()
if: always() && !cancelled()
needs:
- process
- unix
- linux_static
- linux_arm
- macos
- macos_legacy

10
bundle/docker/compose.yml Normal file
View file

@ -0,0 +1,10 @@
services:
static:
build: static
environment:
channel: ${channel}
origin: ${origin}
version: ${version}
volumes:
- ~/build:/build
- ../..:/yt-dlp

View file

@ -0,0 +1,21 @@
FROM alpine:3.19 as base
RUN apk --update add --no-cache \
build-base \
python3 \
pipx \
;
RUN pipx install pyinstaller
# Requires above step to prepare the shared venv
RUN ~/.local/share/pipx/shared/bin/python -m pip install -U wheel
RUN apk --update add --no-cache \
scons \
patchelf \
binutils \
;
RUN pipx install staticx
WORKDIR /yt-dlp
COPY entrypoint.sh /entrypoint.sh
ENTRYPOINT /entrypoint.sh

View file

@ -0,0 +1,13 @@
#!/bin/ash
set -e
source ~/.local/share/pipx/venvs/pyinstaller/bin/activate
python -m devscripts.install_deps --include secretstorage
python -m devscripts.make_lazy_extractors
python devscripts/update-version.py -c "${channel}" -r "${origin}" "${version}"
python -m bundle.pyinstaller
deactivate
source ~/.local/share/pipx/venvs/staticx/bin/activate
staticx /yt-dlp/dist/yt-dlp_linux /build/yt-dlp_linux
deactivate

View file

@ -1,4 +1,3 @@
import functools
import inspect
import pytest
@ -10,7 +9,9 @@
@pytest.fixture
def handler(request):
RH_KEY = request.param
RH_KEY = getattr(request, 'param', None)
if not RH_KEY:
return
if inspect.isclass(RH_KEY) and issubclass(RH_KEY, RequestHandler):
handler = RH_KEY
elif RH_KEY in _REQUEST_HANDLERS:
@ -18,9 +19,46 @@ def handler(request):
else:
pytest.skip(f'{RH_KEY} request handler is not available')
return functools.partial(handler, logger=FakeLogger)
class HandlerWrapper(handler):
RH_KEY = handler.RH_KEY
def __init__(self, *args, **kwargs):
super().__init__(logger=FakeLogger, *args, **kwargs)
return HandlerWrapper
def validate_and_send(rh, req):
rh.validate(req)
return rh.send(req)
@pytest.fixture(autouse=True)
def skip_handler(request, handler):
"""usage: pytest.mark.skip_handler('my_handler', 'reason')"""
for marker in request.node.iter_markers('skip_handler'):
if marker.args[0] == handler.RH_KEY:
pytest.skip(marker.args[1] if len(marker.args) > 1 else '')
@pytest.fixture(autouse=True)
def skip_handler_if(request, handler):
"""usage: pytest.mark.skip_handler_if('my_handler', lambda request: True, 'reason')"""
for marker in request.node.iter_markers('skip_handler_if'):
if marker.args[0] == handler.RH_KEY and marker.args[1](request):
pytest.skip(marker.args[2] if len(marker.args) > 2 else '')
@pytest.fixture(autouse=True)
def skip_handlers_if(request, handler):
"""usage: pytest.mark.skip_handlers_if(lambda request, handler: True, 'reason')"""
for marker in request.node.iter_markers('skip_handlers_if'):
if handler and marker.args[0](request, handler):
pytest.skip(marker.args[1] if len(marker.args) > 1 else '')
def pytest_configure(config):
config.addinivalue_line(
"markers", "skip_handler(handler): skip test for the given handler",
)
config.addinivalue_line(
"markers", "skip_handler_if(handler): skip test for the given handler if condition is true"
)
config.addinivalue_line(
"markers", "skip_handlers_if(handler): skip test for handlers when the condition is true"
)

View file

@ -338,3 +338,8 @@ def http_server_port(httpd):
def verify_address_availability(address):
if find_available_port(address) is None:
pytest.skip(f'Unable to bind to source address {address} (address may not exist)')
def validate_and_send(rh, req):
rh.validate(req)
return rh.send(req)

379
test/test_http_proxy.py Normal file
View file

@ -0,0 +1,379 @@
import abc
import base64
import contextlib
import functools
import json
import os
import random
import ssl
import threading
from http.server import BaseHTTPRequestHandler
from socketserver import ThreadingTCPServer
import pytest
from test.helper import http_server_port, verify_address_availability
from test.test_networking import TEST_DIR
from test.test_socks import IPv6ThreadingTCPServer
from yt_dlp.dependencies import urllib3
from yt_dlp.networking import Request
from yt_dlp.networking.exceptions import HTTPError, ProxyError, SSLError
class HTTPProxyAuthMixin:
def proxy_auth_error(self):
self.send_response(407)
self.send_header('Proxy-Authenticate', 'Basic realm="test http proxy"')
self.end_headers()
return False
def do_proxy_auth(self, username, password):
if username is None and password is None:
return True
proxy_auth_header = self.headers.get('Proxy-Authorization', None)
if proxy_auth_header is None:
return self.proxy_auth_error()
if not proxy_auth_header.startswith('Basic '):
return self.proxy_auth_error()
auth = proxy_auth_header[6:]
try:
auth_username, auth_password = base64.b64decode(auth).decode().split(':', 1)
except Exception:
return self.proxy_auth_error()
if auth_username != (username or '') or auth_password != (password or ''):
return self.proxy_auth_error()
return True
class HTTPProxyHandler(BaseHTTPRequestHandler, HTTPProxyAuthMixin):
def __init__(self, *args, proxy_info=None, username=None, password=None, request_handler=None, **kwargs):
self.username = username
self.password = password
self.proxy_info = proxy_info
super().__init__(*args, **kwargs)
def do_GET(self):
if not self.do_proxy_auth(self.username, self.password):
self.server.close_request(self.request)
return
if self.path.endswith('/proxy_info'):
payload = json.dumps(self.proxy_info or {
'client_address': self.client_address,
'connect': False,
'connect_host': None,
'connect_port': None,
'headers': dict(self.headers),
'path': self.path,
'proxy': ':'.join(str(y) for y in self.connection.getsockname()),
})
self.send_response(200)
self.send_header('Content-Type', 'application/json; charset=utf-8')
self.send_header('Content-Length', str(len(payload)))
self.end_headers()
self.wfile.write(payload.encode())
else:
self.send_response(404)
self.end_headers()
self.server.close_request(self.request)
if urllib3:
import urllib3.util.ssltransport
class SSLTransport(urllib3.util.ssltransport.SSLTransport):
"""
Modified version of urllib3 SSLTransport to support server side SSL
This allows us to chain multiple TLS connections.
"""
def __init__(self, socket, ssl_context, server_hostname=None, suppress_ragged_eofs=True, server_side=False):
self.incoming = ssl.MemoryBIO()
self.outgoing = ssl.MemoryBIO()
self.suppress_ragged_eofs = suppress_ragged_eofs
self.socket = socket
self.sslobj = ssl_context.wrap_bio(
self.incoming,
self.outgoing,
server_hostname=server_hostname,
server_side=server_side
)
self._ssl_io_loop(self.sslobj.do_handshake)
@property
def _io_refs(self):
return self.socket._io_refs
@_io_refs.setter
def _io_refs(self, value):
self.socket._io_refs = value
def shutdown(self, *args, **kwargs):
self.socket.shutdown(*args, **kwargs)
else:
SSLTransport = None
class HTTPSProxyHandler(HTTPProxyHandler):
def __init__(self, request, *args, **kwargs):
certfn = os.path.join(TEST_DIR, 'testcert.pem')
sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
sslctx.load_cert_chain(certfn, None)
if isinstance(request, ssl.SSLSocket):
request = SSLTransport(request, ssl_context=sslctx, server_side=True)
else:
request = sslctx.wrap_socket(request, server_side=True)
super().__init__(request, *args, **kwargs)
class HTTPConnectProxyHandler(BaseHTTPRequestHandler, HTTPProxyAuthMixin):
protocol_version = 'HTTP/1.1'
default_request_version = 'HTTP/1.1'
def __init__(self, *args, username=None, password=None, request_handler=None, **kwargs):
self.username = username
self.password = password
self.request_handler = request_handler
super().__init__(*args, **kwargs)
def do_CONNECT(self):
if not self.do_proxy_auth(self.username, self.password):
self.server.close_request(self.request)
return
self.send_response(200)
self.end_headers()
proxy_info = {
'client_address': self.client_address,
'connect': True,
'connect_host': self.path.split(':')[0],
'connect_port': int(self.path.split(':')[1]),
'headers': dict(self.headers),
'path': self.path,
'proxy': ':'.join(str(y) for y in self.connection.getsockname()),
}
self.request_handler(self.request, self.client_address, self.server, proxy_info=proxy_info)
self.server.close_request(self.request)
class HTTPSConnectProxyHandler(HTTPConnectProxyHandler):
def __init__(self, request, *args, **kwargs):
certfn = os.path.join(TEST_DIR, 'testcert.pem')
sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
sslctx.load_cert_chain(certfn, None)
request = sslctx.wrap_socket(request, server_side=True)
self._original_request = request
super().__init__(request, *args, **kwargs)
def do_CONNECT(self):
super().do_CONNECT()
self.server.close_request(self._original_request)
@contextlib.contextmanager
def proxy_server(proxy_server_class, request_handler, bind_ip=None, **proxy_server_kwargs):
server = server_thread = None
try:
bind_address = bind_ip or '127.0.0.1'
server_type = ThreadingTCPServer if '.' in bind_address else IPv6ThreadingTCPServer
server = server_type(
(bind_address, 0), functools.partial(proxy_server_class, request_handler=request_handler, **proxy_server_kwargs))
server_port = http_server_port(server)
server_thread = threading.Thread(target=server.serve_forever)
server_thread.daemon = True
server_thread.start()
if '.' not in bind_address:
yield f'[{bind_address}]:{server_port}'
else:
yield f'{bind_address}:{server_port}'
finally:
server.shutdown()
server.server_close()
server_thread.join(2.0)
class HTTPProxyTestContext(abc.ABC):
REQUEST_HANDLER_CLASS = None
REQUEST_PROTO = None
def http_server(self, server_class, *args, **kwargs):
return proxy_server(server_class, self.REQUEST_HANDLER_CLASS, *args, **kwargs)
@abc.abstractmethod
def proxy_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs) -> dict:
"""return a dict of proxy_info"""
class HTTPProxyHTTPTestContext(HTTPProxyTestContext):
# Standard HTTP Proxy for http requests
REQUEST_HANDLER_CLASS = HTTPProxyHandler
REQUEST_PROTO = 'http'
def proxy_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs):
request = Request(f'http://{target_domain or "127.0.0.1"}:{target_port or "40000"}/proxy_info', **req_kwargs)
handler.validate(request)
return json.loads(handler.send(request).read().decode())
class HTTPProxyHTTPSTestContext(HTTPProxyTestContext):
# HTTP Connect proxy, for https requests
REQUEST_HANDLER_CLASS = HTTPSProxyHandler
REQUEST_PROTO = 'https'
def proxy_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs):
request = Request(f'https://{target_domain or "127.0.0.1"}:{target_port or "40000"}/proxy_info', **req_kwargs)
handler.validate(request)
return json.loads(handler.send(request).read().decode())
CTX_MAP = {
'http': HTTPProxyHTTPTestContext,
'https': HTTPProxyHTTPSTestContext,
}
@pytest.fixture(scope='module')
def ctx(request):
return CTX_MAP[request.param]()
@pytest.mark.parametrize(
'handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
@pytest.mark.parametrize('ctx', ['http'], indirect=True) # pure http proxy can only support http
class TestHTTPProxy:
def test_http_no_auth(self, handler, ctx):
with ctx.http_server(HTTPProxyHandler) as server_address:
with handler(proxies={ctx.REQUEST_PROTO: f'http://{server_address}'}) as rh:
proxy_info = ctx.proxy_info_request(rh)
assert proxy_info['proxy'] == server_address
assert proxy_info['connect'] is False
assert 'Proxy-Authorization' not in proxy_info['headers']
def test_http_auth(self, handler, ctx):
with ctx.http_server(HTTPProxyHandler, username='test', password='test') as server_address:
with handler(proxies={ctx.REQUEST_PROTO: f'http://test:test@{server_address}'}) as rh:
proxy_info = ctx.proxy_info_request(rh)
assert proxy_info['proxy'] == server_address
assert 'Proxy-Authorization' in proxy_info['headers']
def test_http_bad_auth(self, handler, ctx):
with ctx.http_server(HTTPProxyHandler, username='test', password='test') as server_address:
with handler(proxies={ctx.REQUEST_PROTO: f'http://test:bad@{server_address}'}) as rh:
with pytest.raises(HTTPError) as exc_info:
ctx.proxy_info_request(rh)
assert exc_info.value.response.status == 407
exc_info.value.response.close()
def test_http_source_address(self, handler, ctx):
with ctx.http_server(HTTPProxyHandler) as server_address:
source_address = f'127.0.0.{random.randint(5, 255)}'
verify_address_availability(source_address)
with handler(proxies={ctx.REQUEST_PROTO: f'http://{server_address}'},
source_address=source_address) as rh:
proxy_info = ctx.proxy_info_request(rh)
assert proxy_info['proxy'] == server_address
assert proxy_info['client_address'][0] == source_address
@pytest.mark.skip_handler('Urllib', 'urllib does not support https proxies')
def test_https(self, handler, ctx):
with ctx.http_server(HTTPSProxyHandler) as server_address:
with handler(verify=False, proxies={ctx.REQUEST_PROTO: f'https://{server_address}'}) as rh:
proxy_info = ctx.proxy_info_request(rh)
assert proxy_info['proxy'] == server_address
assert proxy_info['connect'] is False
assert 'Proxy-Authorization' not in proxy_info['headers']
@pytest.mark.skip_handler('Urllib', 'urllib does not support https proxies')
def test_https_verify_failed(self, handler, ctx):
with ctx.http_server(HTTPSProxyHandler) as server_address:
with handler(verify=True, proxies={ctx.REQUEST_PROTO: f'https://{server_address}'}) as rh:
# Accept SSLError as may not be feasible to tell if it is proxy or request error.
# note: if request proto also does ssl verification, this may also be the error of the request.
# Until we can support passing custom cacerts to handlers, we cannot properly test this for all cases.
with pytest.raises((ProxyError, SSLError)):
ctx.proxy_info_request(rh)
def test_http_with_idn(self, handler, ctx):
with ctx.http_server(HTTPProxyHandler) as server_address:
with handler(proxies={ctx.REQUEST_PROTO: f'http://{server_address}'}) as rh:
proxy_info = ctx.proxy_info_request(rh, target_domain='中文.tw')
assert proxy_info['proxy'] == server_address
assert proxy_info['path'].startswith('http://xn--fiq228c.tw')
assert proxy_info['headers']['Host'].split(':', 1)[0] == 'xn--fiq228c.tw'
@pytest.mark.parametrize(
'handler,ctx', [
('Requests', 'https'),
('CurlCFFI', 'https'),
], indirect=True)
class TestHTTPConnectProxy:
def test_http_connect_no_auth(self, handler, ctx):
with ctx.http_server(HTTPConnectProxyHandler) as server_address:
with handler(verify=False, proxies={ctx.REQUEST_PROTO: f'http://{server_address}'}) as rh:
proxy_info = ctx.proxy_info_request(rh)
assert proxy_info['proxy'] == server_address
assert proxy_info['connect'] is True
assert 'Proxy-Authorization' not in proxy_info['headers']
def test_http_connect_auth(self, handler, ctx):
with ctx.http_server(HTTPConnectProxyHandler, username='test', password='test') as server_address:
with handler(verify=False, proxies={ctx.REQUEST_PROTO: f'http://test:test@{server_address}'}) as rh:
proxy_info = ctx.proxy_info_request(rh)
assert proxy_info['proxy'] == server_address
assert 'Proxy-Authorization' in proxy_info['headers']
@pytest.mark.skip_handler(
'Requests',
'bug in urllib3 causes unclosed socket: https://github.com/urllib3/urllib3/issues/3374'
)
def test_http_connect_bad_auth(self, handler, ctx):
with ctx.http_server(HTTPConnectProxyHandler, username='test', password='test') as server_address:
with handler(verify=False, proxies={ctx.REQUEST_PROTO: f'http://test:bad@{server_address}'}) as rh:
with pytest.raises(ProxyError):
ctx.proxy_info_request(rh)
def test_http_connect_source_address(self, handler, ctx):
with ctx.http_server(HTTPConnectProxyHandler) as server_address:
source_address = f'127.0.0.{random.randint(5, 255)}'
verify_address_availability(source_address)
with handler(proxies={ctx.REQUEST_PROTO: f'http://{server_address}'},
source_address=source_address,
verify=False) as rh:
proxy_info = ctx.proxy_info_request(rh)
assert proxy_info['proxy'] == server_address
assert proxy_info['client_address'][0] == source_address
@pytest.mark.skipif(urllib3 is None, reason='requires urllib3 to test')
def test_https_connect_proxy(self, handler, ctx):
with ctx.http_server(HTTPSConnectProxyHandler) as server_address:
with handler(verify=False, proxies={ctx.REQUEST_PROTO: f'https://{server_address}'}) as rh:
proxy_info = ctx.proxy_info_request(rh)
assert proxy_info['proxy'] == server_address
assert proxy_info['connect'] is True
assert 'Proxy-Authorization' not in proxy_info['headers']
@pytest.mark.skipif(urllib3 is None, reason='requires urllib3 to test')
def test_https_connect_verify_failed(self, handler, ctx):
with ctx.http_server(HTTPSConnectProxyHandler) as server_address:
with handler(verify=True, proxies={ctx.REQUEST_PROTO: f'https://{server_address}'}) as rh:
# Accept SSLError as may not be feasible to tell if it is proxy or request error.
# note: if request proto also does ssl verification, this may also be the error of the request.
# Until we can support passing custom cacerts to handlers, we cannot properly test this for all cases.
with pytest.raises((ProxyError, SSLError)):
ctx.proxy_info_request(rh)
@pytest.mark.skipif(urllib3 is None, reason='requires urllib3 to test')
def test_https_connect_proxy_auth(self, handler, ctx):
with ctx.http_server(HTTPSConnectProxyHandler, username='test', password='test') as server_address:
with handler(verify=False, proxies={ctx.REQUEST_PROTO: f'https://test:test@{server_address}'}) as rh:
proxy_info = ctx.proxy_info_request(rh)
assert proxy_info['proxy'] == server_address
assert 'Proxy-Authorization' in proxy_info['headers']

View file

@ -6,6 +6,8 @@
import pytest
from yt_dlp.networking.common import Features
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import gzip
@ -27,8 +29,12 @@
from email.message import Message
from http.cookiejar import CookieJar
from test.conftest import validate_and_send
from test.helper import FakeYDL, http_server_port, verify_address_availability
from test.helper import (
FakeYDL,
http_server_port,
validate_and_send,
verify_address_availability,
)
from yt_dlp.cookies import YoutubeDLCookieJar
from yt_dlp.dependencies import brotli, curl_cffi, requests, urllib3
from yt_dlp.networking import (
@ -62,21 +68,6 @@
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
def _build_proxy_handler(name):
class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
proxy_name = name
def log_message(self, format, *args):
pass
def do_GET(self):
self.send_response(200)
self.send_header('Content-Type', 'text/plain; charset=utf-8')
self.end_headers()
self.wfile.write(f'{self.proxy_name}: {self.path}'.encode())
return HTTPTestRequestHandler
class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
protocol_version = 'HTTP/1.1'
default_request_version = 'HTTP/1.1'
@ -317,8 +308,9 @@ def setup_class(cls):
cls.https_server_thread.start()
@pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
class TestHTTPRequestHandler(TestRequestHandlerBase):
@pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
def test_verify_cert(self, handler):
with handler() as rh:
with pytest.raises(CertificateVerifyError):
@ -329,7 +321,6 @@ def test_verify_cert(self, handler):
assert r.status == 200
r.close()
@pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
def test_ssl_error(self, handler):
# HTTPS server with too old TLS version
# XXX: is there a better way to test this than to create a new server?
@ -347,7 +338,6 @@ def test_ssl_error(self, handler):
validate_and_send(rh, Request(f'https://127.0.0.1:{https_port}/headers'))
assert not issubclass(exc_info.type, CertificateVerifyError)
@pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
def test_percent_encode(self, handler):
with handler() as rh:
# Unicode characters should be encoded with uppercase percent-encoding
@ -359,7 +349,6 @@ def test_percent_encode(self, handler):
assert res.status == 200
res.close()
@pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
@pytest.mark.parametrize('path', [
'/a/b/./../../headers',
'/redirect_dotsegments',
@ -375,15 +364,13 @@ def test_remove_dot_segments(self, handler, path):
assert res.url == f'http://127.0.0.1:{self.http_port}/headers'
res.close()
# Not supported by CurlCFFI (non-standard)
@pytest.mark.parametrize('handler', ['Urllib', 'Requests'], indirect=True)
@pytest.mark.skip_handler('CurlCFFI', 'not supported by curl-cffi (non-standard)')
def test_unicode_path_redirection(self, handler):
with handler() as rh:
r = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/302-non-ascii-redirect'))
assert r.url == f'http://127.0.0.1:{self.http_port}/%E4%B8%AD%E6%96%87.html'
r.close()
@pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
def test_raise_http_error(self, handler):
with handler() as rh:
for bad_status in (400, 500, 599, 302):
@ -393,7 +380,6 @@ def test_raise_http_error(self, handler):
# Should not raise an error
validate_and_send(rh, Request('http://127.0.0.1:%d/gen_200' % self.http_port)).close()
@pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
def test_response_url(self, handler):
with handler() as rh:
# Response url should be that of the last url in redirect chain
@ -405,7 +391,6 @@ def test_response_url(self, handler):
res2.close()
# Covers some basic cases we expect some level of consistency between request handlers for
@pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
@pytest.mark.parametrize('redirect_status,method,expected', [
# A 303 must either use GET or HEAD for subsequent request
(303, 'POST', ('', 'GET', False)),
@ -447,7 +432,6 @@ def test_redirect(self, handler, redirect_status, method, expected):
assert expected[1] == res.headers.get('method')
assert expected[2] == ('content-length' in headers.decode().lower())
@pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
def test_request_cookie_header(self, handler):
# We should accept a Cookie header being passed as in normal headers and handle it appropriately.
with handler() as rh:
@ -480,19 +464,16 @@ def test_request_cookie_header(self, handler):
assert b'cookie: test=ytdlp' not in data.lower()
assert b'cookie: test=test3' in data.lower()
@pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
def test_redirect_loop(self, handler):
with handler() as rh:
with pytest.raises(HTTPError, match='redirect loop'):
validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/redirect_loop'))
@pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
def test_incompleteread(self, handler):
with handler(timeout=2) as rh:
with pytest.raises(IncompleteRead, match='13 bytes read, 234221 more expected'):
validate_and_send(rh, Request('http://127.0.0.1:%d/incompleteread' % self.http_port)).read()
@pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
def test_cookies(self, handler):
cookiejar = YoutubeDLCookieJar()
cookiejar.set_cookie(http.cookiejar.Cookie(
@ -509,7 +490,6 @@ def test_cookies(self, handler):
rh, Request(f'http://127.0.0.1:{self.http_port}/headers', extensions={'cookiejar': cookiejar})).read()
assert b'cookie: test=ytdlp' in data.lower()
@pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
def test_headers(self, handler):
with handler(headers=HTTPHeaderDict({'test1': 'test', 'test2': 'test2'})) as rh:
@ -525,7 +505,6 @@ def test_headers(self, handler):
assert b'test2: test2' not in data
assert b'test3: test3' in data
@pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
def test_read_timeout(self, handler):
with handler() as rh:
# Default timeout is 20 seconds, so this should go through
@ -541,7 +520,6 @@ def test_read_timeout(self, handler):
validate_and_send(
rh, Request(f'http://127.0.0.1:{self.http_port}/timeout_1', extensions={'timeout': 4}))
@pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
def test_connect_timeout(self, handler):
# nothing should be listening on this port
connect_timeout_url = 'http://10.255.255.255'
@ -560,7 +538,6 @@ def test_connect_timeout(self, handler):
rh, Request(connect_timeout_url, extensions={'timeout': 0.01}))
assert 0.01 <= time.time() - now < 20
@pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
def test_source_address(self, handler):
source_address = f'127.0.0.{random.randint(5, 255)}'
# on some systems these loopback addresses we need for testing may not be available
@ -572,13 +549,13 @@ def test_source_address(self, handler):
assert source_address == data
# Not supported by CurlCFFI
@pytest.mark.parametrize('handler', ['Urllib', 'Requests'], indirect=True)
@pytest.mark.skip_handler('CurlCFFI', 'not supported by curl-cffi')
def test_gzip_trailing_garbage(self, handler):
with handler() as rh:
data = validate_and_send(rh, Request(f'http://localhost:{self.http_port}/trailing_garbage')).read().decode()
assert data == '<html><video src="/vid.mp4" /></html>'
@pytest.mark.parametrize('handler', ['Urllib', 'Requests'], indirect=True)
@pytest.mark.skip_handler('CurlCFFI', 'not applicable to curl-cffi')
@pytest.mark.skipif(not brotli, reason='brotli support is not installed')
def test_brotli(self, handler):
with handler() as rh:
@ -589,7 +566,6 @@ def test_brotli(self, handler):
assert res.headers.get('Content-Encoding') == 'br'
assert res.read() == b'<html><video src="/vid.mp4" /></html>'
@pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
def test_deflate(self, handler):
with handler() as rh:
res = validate_and_send(
@ -599,7 +575,6 @@ def test_deflate(self, handler):
assert res.headers.get('Content-Encoding') == 'deflate'
assert res.read() == b'<html><video src="/vid.mp4" /></html>'
@pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
def test_gzip(self, handler):
with handler() as rh:
res = validate_and_send(
@ -609,7 +584,6 @@ def test_gzip(self, handler):
assert res.headers.get('Content-Encoding') == 'gzip'
assert res.read() == b'<html><video src="/vid.mp4" /></html>'
@pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
def test_multiple_encodings(self, handler):
with handler() as rh:
for pair in ('gzip,deflate', 'deflate, gzip', 'gzip, gzip', 'deflate, deflate'):
@ -620,8 +594,7 @@ def test_multiple_encodings(self, handler):
assert res.headers.get('Content-Encoding') == pair
assert res.read() == b'<html><video src="/vid.mp4" /></html>'
# Not supported by curl_cffi
@pytest.mark.parametrize('handler', ['Urllib', 'Requests'], indirect=True)
@pytest.mark.skip_handler('CurlCFFI', 'not supported by curl-cffi')
def test_unsupported_encoding(self, handler):
with handler() as rh:
res = validate_and_send(
@ -631,7 +604,6 @@ def test_unsupported_encoding(self, handler):
assert res.headers.get('Content-Encoding') == 'unsupported'
assert res.read() == b'raw'
@pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
def test_read(self, handler):
with handler() as rh:
res = validate_and_send(
@ -642,83 +614,48 @@ def test_read(self, handler):
assert res.read().decode().endswith('\n\n')
assert res.read() == b''
def test_request_disable_proxy(self, handler):
for proxy_proto in handler._SUPPORTED_PROXY_SCHEMES or ['http']:
# Given the handler is configured with a proxy
with handler(proxies={'http': f'{proxy_proto}://10.255.255.255'}, timeout=5) as rh:
# When a proxy is explicitly set to None for the request
res = validate_and_send(
rh, Request(f'http://127.0.0.1:{self.http_port}/headers', proxies={'http': None}))
# Then no proxy should be used
res.close()
assert res.status == 200
class TestHTTPProxy(TestRequestHandlerBase):
# Note: this only tests http urls over non-CONNECT proxy
@classmethod
def setup_class(cls):
super().setup_class()
# HTTP Proxy server
cls.proxy = http.server.ThreadingHTTPServer(
('127.0.0.1', 0), _build_proxy_handler('normal'))
cls.proxy_port = http_server_port(cls.proxy)
cls.proxy_thread = threading.Thread(target=cls.proxy.serve_forever)
cls.proxy_thread.daemon = True
cls.proxy_thread.start()
# Geo proxy server
cls.geo_proxy = http.server.ThreadingHTTPServer(
('127.0.0.1', 0), _build_proxy_handler('geo'))
cls.geo_port = http_server_port(cls.geo_proxy)
cls.geo_proxy_thread = threading.Thread(target=cls.geo_proxy.serve_forever)
cls.geo_proxy_thread.daemon = True
cls.geo_proxy_thread.start()
@pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
def test_http_proxy(self, handler):
http_proxy = f'http://127.0.0.1:{self.proxy_port}'
geo_proxy = f'http://127.0.0.1:{self.geo_port}'
# Test global http proxy
# Test per request http proxy
# Test per request http proxy disables proxy
url = 'http://foo.com/bar'
# Global HTTP proxy
with handler(proxies={'http': http_proxy}) as rh:
res = validate_and_send(rh, Request(url)).read().decode()
assert res == f'normal: {url}'
# Per request proxy overrides global
res = validate_and_send(rh, Request(url, proxies={'http': geo_proxy})).read().decode()
assert res == f'geo: {url}'
# and setting to None disables all proxies for that request
real_url = f'http://127.0.0.1:{self.http_port}/headers'
res = validate_and_send(
rh, Request(real_url, proxies={'http': None})).read().decode()
assert res != f'normal: {real_url}'
assert 'Accept' in res
@pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
@pytest.mark.skip_handlers_if(
lambda _, handler: Features.NO_PROXY not in handler._SUPPORTED_FEATURES, 'handler does not support NO_PROXY')
def test_noproxy(self, handler):
with handler(proxies={'proxy': f'http://127.0.0.1:{self.proxy_port}'}) as rh:
# NO_PROXY
for no_proxy in (f'127.0.0.1:{self.http_port}', '127.0.0.1', 'localhost'):
nop_response = validate_and_send(
rh, Request(f'http://127.0.0.1:{self.http_port}/headers', proxies={'no': no_proxy})).read().decode(
'utf-8')
assert 'Accept' in nop_response
for proxy_proto in handler._SUPPORTED_PROXY_SCHEMES or ['http']:
# Given the handler is configured with a proxy
with handler(proxies={'http': f'{proxy_proto}://10.255.255.255'}, timeout=5) as rh:
for no_proxy in (f'127.0.0.1:{self.http_port}', '127.0.0.1', 'localhost'):
# When request no proxy includes the request url host
nop_response = validate_and_send(
rh, Request(f'http://127.0.0.1:{self.http_port}/headers', proxies={'no': no_proxy}))
# Then the proxy should not be used
assert nop_response.status == 200
nop_response.close()
@pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
@pytest.mark.skip_handlers_if(
lambda _, handler: Features.ALL_PROXY not in handler._SUPPORTED_FEATURES, 'handler does not support ALL_PROXY')
def test_allproxy(self, handler):
url = 'http://foo.com/bar'
with handler() as rh:
response = validate_and_send(rh, Request(url, proxies={'all': f'http://127.0.0.1:{self.proxy_port}'})).read().decode(
'utf-8')
assert response == f'normal: {url}'
# This is a bit of a hacky test, but it should be enough to check whether the handler is using the proxy.
# 0.1s might not be enough of a timeout if proxy is not used in all cases, but should still get failures.
with handler(proxies={'all': 'http://10.255.255.255'}, timeout=0.1) as rh:
with pytest.raises(TransportError):
validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/headers')).close()
@pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
def test_http_proxy_with_idn(self, handler):
with handler(proxies={
'http': f'http://127.0.0.1:{self.proxy_port}',
}) as rh:
url = 'http://中文.tw/'
response = rh.send(Request(url)).read().decode()
# b'xn--fiq228c' is '中文'.encode('idna')
assert response == 'normal: http://xn--fiq228c.tw/'
with handler(timeout=0.1) as rh:
with pytest.raises(TransportError):
validate_and_send(
rh, Request(
f'http://127.0.0.1:{self.http_port}/headers', proxies={'all': 'http://10.255.255.255'})).close()
@pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
class TestClientCertificate:
@classmethod
def setup_class(cls):
@ -745,27 +682,23 @@ def _run_test(self, handler, **handler_kwargs):
) as rh:
validate_and_send(rh, Request(f'https://127.0.0.1:{self.port}/video.html')).read().decode()
@pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
def test_certificate_combined_nopass(self, handler):
self._run_test(handler, client_cert={
'client_certificate': os.path.join(self.certdir, 'clientwithkey.crt'),
})
@pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
def test_certificate_nocombined_nopass(self, handler):
self._run_test(handler, client_cert={
'client_certificate': os.path.join(self.certdir, 'client.crt'),
'client_certificate_key': os.path.join(self.certdir, 'client.key'),
})
@pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
def test_certificate_combined_pass(self, handler):
self._run_test(handler, client_cert={
'client_certificate': os.path.join(self.certdir, 'clientwithencryptedkey.crt'),
'client_certificate_password': 'foobar',
})
@pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
def test_certificate_nocombined_pass(self, handler):
self._run_test(handler, client_cert={
'client_certificate': os.path.join(self.certdir, 'client.crt'),
@ -785,6 +718,25 @@ def test_supported_impersonate_targets(self, handler):
assert res.status == 200
assert std_headers['user-agent'].lower() not in res.read().decode().lower()
def test_response_extensions(self, handler):
with handler() as rh:
for target in rh.supported_targets:
request = Request(
f'http://127.0.0.1:{self.http_port}/gen_200', extensions={'impersonate': target})
res = validate_and_send(rh, request)
assert res.extensions['impersonate'] == rh._get_request_target(request)
def test_http_error_response_extensions(self, handler):
with handler() as rh:
for target in rh.supported_targets:
request = Request(
f'http://127.0.0.1:{self.http_port}/gen_404', extensions={'impersonate': target})
try:
validate_and_send(rh, request)
except HTTPError as e:
res = e.response
assert res.extensions['impersonate'] == rh._get_request_target(request)
class TestRequestHandlerMisc:
"""Misc generic tests for request handlers, not related to request or validation testing"""
@ -805,8 +757,8 @@ def test_remove_logging_handler(self, handler, logger_name):
assert len(logging_handlers) == before_count
@pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
class TestUrllibRequestHandler(TestRequestHandlerBase):
@pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
def test_file_urls(self, handler):
# See https://github.com/ytdl-org/youtube-dl/issues/8227
tf = tempfile.NamedTemporaryFile(delete=False)
@ -828,7 +780,6 @@ def test_file_urls(self, handler):
os.unlink(tf.name)
@pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
def test_http_error_returns_content(self, handler):
# urllib HTTPError will try close the underlying response if reference to the HTTPError object is lost
def get_response():
@ -841,7 +792,6 @@ def get_response():
assert get_response().read() == b'<html></html>'
@pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
def test_verify_cert_error_text(self, handler):
# Check the output of the error message
with handler() as rh:
@ -851,7 +801,6 @@ def test_verify_cert_error_text(self, handler):
):
validate_and_send(rh, Request(f'https://127.0.0.1:{self.https_port}/headers'))
@pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
@pytest.mark.parametrize('req,match,version_check', [
# https://github.com/python/cpython/blob/987b712b4aeeece336eed24fcc87a950a756c3e2/Lib/http/client.py#L1256
# bpo-39603: Check implemented in 3.7.9+, 3.8.5+
@ -1183,7 +1132,7 @@ class HTTPSupportedRH(ValidationRH):
]
PROXY_SCHEME_TESTS = [
# scheme, expected to fail
# proxy scheme, expected to fail
('Urllib', 'http', [
('http', False),
('https', UnsupportedRequest),
@ -1209,30 +1158,41 @@ class HTTPSupportedRH(ValidationRH):
('socks5', False),
('socks5h', False),
]),
('Websockets', 'ws', [
('http', UnsupportedRequest),
('https', UnsupportedRequest),
('socks4', False),
('socks4a', False),
('socks5', False),
('socks5h', False),
]),
(NoCheckRH, 'http', [('http', False)]),
(HTTPSupportedRH, 'http', [('http', UnsupportedRequest)]),
('Websockets', 'ws', [('http', UnsupportedRequest)]),
(NoCheckRH, 'http', [('http', False)]),
(HTTPSupportedRH, 'http', [('http', UnsupportedRequest)]),
]
PROXY_KEY_TESTS = [
# key, expected to fail
('Urllib', [
('all', False),
('unrelated', False),
# proxy key, proxy scheme, expected to fail
('Urllib', 'http', [
('all', 'http', False),
('unrelated', 'http', False),
]),
('Requests', [
('all', False),
('unrelated', False),
('Requests', 'http', [
('all', 'http', False),
('unrelated', 'http', False),
]),
('CurlCFFI', [
('all', False),
('unrelated', False),
('CurlCFFI', 'http', [
('all', 'http', False),
('unrelated', 'http', False),
]),
(NoCheckRH, [('all', False)]),
(HTTPSupportedRH, [('all', UnsupportedRequest)]),
(HTTPSupportedRH, [('no', UnsupportedRequest)]),
('Websockets', 'ws', [
('all', 'socks5', False),
('unrelated', 'socks5', False),
]),
(NoCheckRH, 'http', [('all', 'http', False)]),
(HTTPSupportedRH, 'http', [('all', 'http', UnsupportedRequest)]),
(HTTPSupportedRH, 'http', [('no', 'http', UnsupportedRequest)]),
]
EXTENSION_TESTS = [
@ -1274,28 +1234,54 @@ class HTTPSupportedRH(ValidationRH):
]),
]
@pytest.mark.parametrize('handler,fail,scheme', [
('Urllib', False, 'http'),
('Requests', False, 'http'),
('CurlCFFI', False, 'http'),
('Websockets', False, 'ws')
], indirect=['handler'])
def test_no_proxy(self, handler, fail, scheme):
run_validation(handler, fail, Request(f'{scheme}://', proxies={'no': '127.0.0.1,github.com'}))
run_validation(handler, fail, Request(f'{scheme}://'), proxies={'no': '127.0.0.1,github.com'})
@pytest.mark.parametrize('handler,scheme', [
('Urllib', 'http'),
(HTTPSupportedRH, 'http'),
('Requests', 'http'),
('CurlCFFI', 'http'),
('Websockets', 'ws')
], indirect=['handler'])
def test_empty_proxy(self, handler, scheme):
run_validation(handler, False, Request(f'{scheme}://', proxies={scheme: None}))
run_validation(handler, False, Request(f'{scheme}://'), proxies={scheme: None})
@pytest.mark.parametrize('proxy_url', ['//example.com', 'example.com', '127.0.0.1', '/a/b/c'])
@pytest.mark.parametrize('handler,scheme', [
('Urllib', 'http'),
(HTTPSupportedRH, 'http'),
('Requests', 'http'),
('CurlCFFI', 'http'),
('Websockets', 'ws')
], indirect=['handler'])
def test_invalid_proxy_url(self, handler, scheme, proxy_url):
run_validation(handler, UnsupportedRequest, Request(f'{scheme}://', proxies={scheme: proxy_url}))
@pytest.mark.parametrize('handler,scheme,fail,handler_kwargs', [
(handler_tests[0], scheme, fail, handler_kwargs)
for handler_tests in URL_SCHEME_TESTS
for scheme, fail, handler_kwargs in handler_tests[1]
], indirect=['handler'])
def test_url_scheme(self, handler, scheme, fail, handler_kwargs):
run_validation(handler, fail, Request(f'{scheme}://'), **(handler_kwargs or {}))
@pytest.mark.parametrize('handler,fail', [('Urllib', False), ('Requests', False), ('CurlCFFI', False)], indirect=['handler'])
def test_no_proxy(self, handler, fail):
run_validation(handler, fail, Request('http://', proxies={'no': '127.0.0.1,github.com'}))
run_validation(handler, fail, Request('http://'), proxies={'no': '127.0.0.1,github.com'})
@pytest.mark.parametrize('handler,proxy_key,fail', [
(handler_tests[0], proxy_key, fail)
@pytest.mark.parametrize('handler,scheme,proxy_key,proxy_scheme,fail', [
(handler_tests[0], handler_tests[1], proxy_key, proxy_scheme, fail)
for handler_tests in PROXY_KEY_TESTS
for proxy_key, fail in handler_tests[1]
for proxy_key, proxy_scheme, fail in handler_tests[2]
], indirect=['handler'])
def test_proxy_key(self, handler, proxy_key, fail):
run_validation(handler, fail, Request('http://', proxies={proxy_key: 'http://example.com'}))
run_validation(handler, fail, Request('http://'), proxies={proxy_key: 'http://example.com'})
def test_proxy_key(self, handler, scheme, proxy_key, proxy_scheme, fail):
run_validation(handler, fail, Request(f'{scheme}://', proxies={proxy_key: f'{proxy_scheme}://example.com'}))
run_validation(handler, fail, Request(f'{scheme}://'), proxies={proxy_key: f'{proxy_scheme}://example.com'})
@pytest.mark.parametrize('handler,req_scheme,scheme,fail', [
(handler_tests[0], handler_tests[1], scheme, fail)
@ -1306,16 +1292,6 @@ def test_proxy_scheme(self, handler, req_scheme, scheme, fail):
run_validation(handler, fail, Request(f'{req_scheme}://', proxies={req_scheme: f'{scheme}://example.com'}))
run_validation(handler, fail, Request(f'{req_scheme}://'), proxies={req_scheme: f'{scheme}://example.com'})
@pytest.mark.parametrize('handler', ['Urllib', HTTPSupportedRH, 'Requests', 'CurlCFFI'], indirect=True)
def test_empty_proxy(self, handler):
run_validation(handler, False, Request('http://', proxies={'http': None}))
run_validation(handler, False, Request('http://'), proxies={'http': None})
@pytest.mark.parametrize('proxy_url', ['//example.com', 'example.com', '127.0.0.1', '/a/b/c'])
@pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
def test_invalid_proxy_url(self, handler, proxy_url):
run_validation(handler, UnsupportedRequest, Request('http://', proxies={'http': proxy_url}))
@pytest.mark.parametrize('handler,scheme,extensions,fail', [
(handler_tests[0], handler_tests[1], extensions, fail)
for handler_tests in EXTENSION_TESTS

View file

@ -2059,7 +2059,22 @@ def test_extract_basic_auth(self):
assert extract_basic_auth('http://user:pass@foo.bar') == ('http://foo.bar', 'Basic dXNlcjpwYXNz')
@unittest.skipUnless(compat_os_name == 'nt', 'Only relevant on Windows')
def test_Popen_windows_escaping(self):
def test_windows_escaping(self):
tests = [
'test"&',
'%CMDCMDLINE:~-1%&',
'a\nb',
'"',
'\\',
'!',
'^!',
'a \\ b',
'a \\" b',
'a \\ b\\',
# We replace \r with \n
('a\r\ra', 'a\n\na'),
]
def run_shell(args):
stdout, stderr, error = Popen.run(
args, text=True, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
@ -2067,15 +2082,15 @@ def run_shell(args):
assert not error
return stdout
# Test escaping
assert run_shell(['echo', 'test"&']) == '"test""&"\n'
assert run_shell(['echo', '%CMDCMDLINE:~-1%&']) == '"%CMDCMDLINE:~-1%&"\n'
assert run_shell(['echo', 'a\nb']) == '"a"\n"b"\n'
assert run_shell(['echo', '"']) == '""""\n'
assert run_shell(['echo', '\\']) == '\\\n'
# Test if delayed expansion is disabled
assert run_shell(['echo', '^!']) == '"^!"\n'
assert run_shell('echo "^!"') == '"^!"\n'
for argument in tests:
if isinstance(argument, str):
expected = argument
else:
argument, expected = argument
args = [sys.executable, '-c', 'import sys; print(end=sys.argv[1])', argument, 'end']
assert run_shell(args) == expected
assert run_shell(shell_quote(args, shell=True)) == expected
if __name__ == '__main__':

View file

@ -7,6 +7,7 @@
import pytest
from test.helper import verify_address_availability
from yt_dlp.networking.common import Features
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
@ -18,7 +19,7 @@
import ssl
import threading
from yt_dlp import socks
from yt_dlp import socks, traverse_obj
from yt_dlp.cookies import YoutubeDLCookieJar
from yt_dlp.dependencies import websockets
from yt_dlp.networking import Request
@ -114,6 +115,7 @@ def ws_validate_and_send(rh, req):
@pytest.mark.skipif(not websockets, reason='websockets must be installed to test websocket request handlers')
@pytest.mark.parametrize('handler', ['Websockets'], indirect=True)
class TestWebsSocketRequestHandlerConformance:
@classmethod
def setup_class(cls):
@ -129,7 +131,6 @@ def setup_class(cls):
cls.mtls_wss_thread, cls.mtls_wss_port = create_mtls_wss_websocket_server()
cls.mtls_wss_base_url = f'wss://127.0.0.1:{cls.mtls_wss_port}'
@pytest.mark.parametrize('handler', ['Websockets'], indirect=True)
def test_basic_websockets(self, handler):
with handler() as rh:
ws = ws_validate_and_send(rh, Request(self.ws_base_url))
@ -141,7 +142,6 @@ def test_basic_websockets(self, handler):
# https://www.rfc-editor.org/rfc/rfc6455.html#section-5.6
@pytest.mark.parametrize('msg,opcode', [('str', 1), (b'bytes', 2)])
@pytest.mark.parametrize('handler', ['Websockets'], indirect=True)
def test_send_types(self, handler, msg, opcode):
with handler() as rh:
ws = ws_validate_and_send(rh, Request(self.ws_base_url))
@ -149,7 +149,6 @@ def test_send_types(self, handler, msg, opcode):
assert int(ws.recv()) == opcode
ws.close()
@pytest.mark.parametrize('handler', ['Websockets'], indirect=True)
def test_verify_cert(self, handler):
with handler() as rh:
with pytest.raises(CertificateVerifyError):
@ -160,14 +159,12 @@ def test_verify_cert(self, handler):
assert ws.status == 101
ws.close()
@pytest.mark.parametrize('handler', ['Websockets'], indirect=True)
def test_ssl_error(self, handler):
with handler(verify=False) as rh:
with pytest.raises(SSLError, match=r'ssl(?:v3|/tls) alert handshake failure') as exc_info:
ws_validate_and_send(rh, Request(self.bad_wss_host))
assert not issubclass(exc_info.type, CertificateVerifyError)
@pytest.mark.parametrize('handler', ['Websockets'], indirect=True)
@pytest.mark.parametrize('path,expected', [
# Unicode characters should be encoded with uppercase percent-encoding
('/中文', '/%E4%B8%AD%E6%96%87'),
@ -182,7 +179,6 @@ def test_percent_encode(self, handler, path, expected):
assert ws.status == 101
ws.close()
@pytest.mark.parametrize('handler', ['Websockets'], indirect=True)
def test_remove_dot_segments(self, handler):
with handler() as rh:
# This isn't a comprehensive test,
@ -195,7 +191,6 @@ def test_remove_dot_segments(self, handler):
# We are restricted to known HTTP status codes in http.HTTPStatus
# Redirects are not supported for websockets
@pytest.mark.parametrize('handler', ['Websockets'], indirect=True)
@pytest.mark.parametrize('status', (200, 204, 301, 302, 303, 400, 500, 511))
def test_raise_http_error(self, handler, status):
with handler() as rh:
@ -203,7 +198,6 @@ def test_raise_http_error(self, handler, status):
ws_validate_and_send(rh, Request(f'{self.ws_base_url}/gen_{status}'))
assert exc_info.value.status == status
@pytest.mark.parametrize('handler', ['Websockets'], indirect=True)
@pytest.mark.parametrize('params,extensions', [
({'timeout': sys.float_info.min}, {}),
({}, {'timeout': sys.float_info.min}),
@ -213,7 +207,6 @@ def test_timeout(self, handler, params, extensions):
with pytest.raises(TransportError):
ws_validate_and_send(rh, Request(self.ws_base_url, extensions=extensions))
@pytest.mark.parametrize('handler', ['Websockets'], indirect=True)
def test_cookies(self, handler):
cookiejar = YoutubeDLCookieJar()
cookiejar.set_cookie(http.cookiejar.Cookie(
@ -239,7 +232,6 @@ def test_cookies(self, handler):
assert json.loads(ws.recv())['cookie'] == 'test=ytdlp'
ws.close()
@pytest.mark.parametrize('handler', ['Websockets'], indirect=True)
def test_source_address(self, handler):
source_address = f'127.0.0.{random.randint(5, 255)}'
verify_address_availability(source_address)
@ -249,7 +241,6 @@ def test_source_address(self, handler):
assert source_address == ws.recv()
ws.close()
@pytest.mark.parametrize('handler', ['Websockets'], indirect=True)
def test_response_url(self, handler):
with handler() as rh:
url = f'{self.ws_base_url}/something'
@ -257,7 +248,6 @@ def test_response_url(self, handler):
assert ws.url == url
ws.close()
@pytest.mark.parametrize('handler', ['Websockets'], indirect=True)
def test_request_headers(self, handler):
with handler(headers=HTTPHeaderDict({'test1': 'test', 'test2': 'test2'})) as rh:
# Global Headers
@ -293,7 +283,6 @@ def test_request_headers(self, handler):
'client_certificate_password': 'foobar',
}
))
@pytest.mark.parametrize('handler', ['Websockets'], indirect=True)
def test_mtls(self, handler, client_cert):
with handler(
# Disable client-side validation of unacceptable self-signed testcert.pem
@ -303,6 +292,44 @@ def test_mtls(self, handler, client_cert):
) as rh:
ws_validate_and_send(rh, Request(self.mtls_wss_base_url)).close()
def test_request_disable_proxy(self, handler):
for proxy_proto in handler._SUPPORTED_PROXY_SCHEMES or ['ws']:
# Given handler is configured with a proxy
with handler(proxies={'ws': f'{proxy_proto}://10.255.255.255'}, timeout=5) as rh:
# When a proxy is explicitly set to None for the request
ws = ws_validate_and_send(rh, Request(self.ws_base_url, proxies={'http': None}))
# Then no proxy should be used
assert ws.status == 101
ws.close()
@pytest.mark.skip_handlers_if(
lambda _, handler: Features.NO_PROXY not in handler._SUPPORTED_FEATURES, 'handler does not support NO_PROXY')
def test_noproxy(self, handler):
for proxy_proto in handler._SUPPORTED_PROXY_SCHEMES or ['ws']:
# Given the handler is configured with a proxy
with handler(proxies={'ws': f'{proxy_proto}://10.255.255.255'}, timeout=5) as rh:
for no_proxy in (f'127.0.0.1:{self.ws_port}', '127.0.0.1', 'localhost'):
# When request no proxy includes the request url host
ws = ws_validate_and_send(rh, Request(self.ws_base_url, proxies={'no': no_proxy}))
# Then the proxy should not be used
assert ws.status == 101
ws.close()
@pytest.mark.skip_handlers_if(
lambda _, handler: Features.ALL_PROXY not in handler._SUPPORTED_FEATURES, 'handler does not support ALL_PROXY')
def test_allproxy(self, handler):
supported_proto = traverse_obj(handler._SUPPORTED_PROXY_SCHEMES, 0, default='ws')
# This is a bit of a hacky test, but it should be enough to check whether the handler is using the proxy.
# 0.1s might not be enough of a timeout if proxy is not used in all cases, but should still get failures.
with handler(proxies={'all': f'{supported_proto}://10.255.255.255'}, timeout=0.1) as rh:
with pytest.raises(TransportError):
ws_validate_and_send(rh, Request(self.ws_base_url)).close()
with handler(timeout=0.1) as rh:
with pytest.raises(TransportError):
ws_validate_and_send(
rh, Request(self.ws_base_url, proxies={'all': f'{supported_proto}://10.255.255.255'})).close()
def create_fake_ws_connection(raised):
import websockets.sync.client

View file

@ -2136,6 +2136,11 @@ def _filter(f):
def _check_formats(self, formats):
for f in formats:
working = f.get('__working')
if working is not None:
if working:
yield f
continue
self.to_screen('[info] Testing format %s' % f['format_id'])
path = self.get_output_path('temp')
if not self._ensure_dir_exists(f'{path}/'):
@ -2152,33 +2157,44 @@ def _check_formats(self, formats):
os.remove(temp_file.name)
except OSError:
self.report_warning('Unable to delete temporary file "%s"' % temp_file.name)
f['__working'] = success
if success:
yield f
else:
self.to_screen('[info] Unable to download format %s. Skipping...' % f['format_id'])
def _select_formats(self, formats, selector):
return list(selector({
'formats': formats,
'has_merged_format': any('none' not in (f.get('acodec'), f.get('vcodec')) for f in formats),
'incomplete_formats': (all(f.get('vcodec') == 'none' for f in formats) # No formats with video
or all(f.get('acodec') == 'none' for f in formats)), # OR, No formats with audio
}))
def _default_format_spec(self, info_dict, download=True):
download = download and not self.params.get('simulate')
prefer_best = download and (
self.params['outtmpl']['default'] == '-'
or info_dict.get('is_live') and not self.params.get('live_from_start'))
def can_merge():
merger = FFmpegMergerPP(self)
return merger.available and merger.can_merge()
prefer_best = (
not self.params.get('simulate')
and download
and (
not can_merge()
or info_dict.get('is_live') and not self.params.get('live_from_start')
or self.params['outtmpl']['default'] == '-'))
compat = (
prefer_best
or self.params.get('allow_multiple_audio_streams', False)
or 'format-spec' in self.params['compat_opts'])
if not prefer_best and download and not can_merge():
prefer_best = True
formats = self._get_formats(info_dict)
evaluate_formats = lambda spec: self._select_formats(formats, self.build_format_selector(spec))
if evaluate_formats('b/bv+ba') != evaluate_formats('bv*+ba/b'):
self.report_warning('ffmpeg not found. The downloaded format may not be the best available. '
'Installing ffmpeg is strongly recommended: https://github.com/yt-dlp/yt-dlp#dependencies')
return (
'best/bestvideo+bestaudio' if prefer_best
else 'bestvideo*+bestaudio/best' if not compat
else 'bestvideo+bestaudio/best')
compat = (self.params.get('allow_multiple_audio_streams')
or 'format-spec' in self.params['compat_opts'])
return ('best/bestvideo+bestaudio' if prefer_best
else 'bestvideo+bestaudio/best' if compat
else 'bestvideo*+bestaudio/best')
def build_format_selector(self, format_spec):
def syntax_error(note, start):
@ -2928,12 +2944,7 @@ def is_wellformed(f):
self.write_debug(f'Default format spec: {req_format}')
format_selector = self.build_format_selector(req_format)
formats_to_download = list(format_selector({
'formats': formats,
'has_merged_format': any('none' not in (f.get('acodec'), f.get('vcodec')) for f in formats),
'incomplete_formats': (all(f.get('vcodec') == 'none' for f in formats) # No formats with video
or all(f.get('acodec') == 'none' for f in formats)), # OR, No formats with audio
}))
formats_to_download = self._select_formats(formats, format_selector)
if interactive_format_selection and not formats_to_download:
self.report_error('Requested format is not available', tb=False, is_error=False)
continue
@ -3060,7 +3071,7 @@ def process_subtitles(self, video_id, normal_subtitles, automatic_captions):
f = formats[-1]
self.report_warning(
'No subtitle format found matching "%s" for language %s, '
'using %s' % (formats_query, lang, f['ext']))
'using %s. Use --list-subs for a list of available subtitles' % (formats_query, lang, f['ext']))
subs[lang] = f
return subs

View file

@ -347,6 +347,11 @@ def _process_chrome_cookie(decryptor, host_key, name, value, encrypted_value, pa
if value is None:
return is_encrypted, None
# In chrome, session cookies have expires_utc set to 0
# In our cookie-store, cookies that do not expire should have expires set to None
if not expires_utc:
expires_utc = None
return is_encrypted, http.cookiejar.Cookie(
version=0, name=name, value=value, port=None, port_specified=False,
domain=host_key, domain_specified=bool(host_key), domain_initial_dot=host_key.startswith('.'),

View file

@ -387,7 +387,11 @@
ComedyCentralIE,
ComedyCentralTVIE,
)
from .commonmistakes import CommonMistakesIE, UnicodeBOMIE
from .commonmistakes import (
BlobIE,
CommonMistakesIE,
UnicodeBOMIE,
)
from .commonprotocols import (
MmsIE,
RtmpIE,

View file

@ -39,7 +39,7 @@ class AluraIE(InfoExtractor):
def _real_extract(self, url):
course, video_id = self._match_valid_url(url)
course, video_id = self._match_valid_url(url).group('course_name', 'id')
video_url = self._VIDEO_URL % (course, video_id)
video_dict = self._download_json(video_url, video_id, 'Searching for videos')
@ -52,7 +52,7 @@ def _real_extract(self, url):
formats = []
for video_obj in video_dict:
video_url_m3u8 = video_obj.get('link')
video_url_m3u8 = video_obj.get('mp4')
video_format = self._extract_m3u8_formats(
video_url_m3u8, None, 'mp4', entry_protocol='m3u8_native',
m3u8_id='hls', fatal=False)

View file

@ -93,11 +93,11 @@ def extract_formats(self, play_info):
return formats
def _download_playinfo(self, video_id, cid):
def _download_playinfo(self, video_id, cid, headers=None):
return self._download_json(
'https://api.bilibili.com/x/player/playurl', video_id,
query={'bvid': video_id, 'cid': cid, 'fnval': 4048},
note=f'Downloading video formats for cid {cid}')['data']
note=f'Downloading video formats for cid {cid}', headers=headers)['data']
def json2srt(self, json_data):
srt_data = ''
@ -493,7 +493,8 @@ class BiliBiliIE(BilibiliBaseIE):
def _real_extract(self, url):
video_id = self._match_id(url)
webpage, urlh = self._download_webpage_handle(url, video_id)
headers = self.geo_verification_headers()
webpage, urlh = self._download_webpage_handle(url, video_id, headers=headers)
if not self._match_valid_url(urlh.url):
return self.url_result(urlh.url)
@ -531,7 +532,7 @@ def _real_extract(self, url):
self._download_json(
'https://api.bilibili.com/x/player/pagelist', video_id,
fatal=False, query={'bvid': video_id, 'jsonp': 'jsonp'},
note='Extracting videos in anthology'),
note='Extracting videos in anthology', headers=headers),
'data', expected_type=list) or []
is_anthology = len(page_list_json) > 1
@ -552,7 +553,7 @@ def _real_extract(self, url):
festival_info = {}
if is_festival:
play_info = self._download_playinfo(video_id, cid)
play_info = self._download_playinfo(video_id, cid, headers=headers)
festival_info = traverse_obj(initial_state, {
'uploader': ('videoInfo', 'upName'),
@ -666,14 +667,15 @@ class BiliBiliBangumiIE(BilibiliBaseIE):
def _real_extract(self, url):
episode_id = self._match_id(url)
webpage = self._download_webpage(url, episode_id)
headers = self.geo_verification_headers()
webpage = self._download_webpage(url, episode_id, headers=headers)
if '您所在的地区无法观看本片' in webpage:
raise GeoRestrictedError('This video is restricted')
elif '正在观看预览,大会员免费看全片' in webpage:
self.raise_login_required('This video is for premium members only')
headers = {'Referer': url, **self.geo_verification_headers()}
headers['Referer'] = url
play_info = self._download_json(
'https://api.bilibili.com/pgc/player/web/v2/playurl', episode_id,
'Extracting episode', query={'fnval': '4048', 'ep_id': episode_id},
@ -724,7 +726,7 @@ def _real_extract(self, url):
'duration': float_or_none(play_info.get('timelength'), scale=1000),
'subtitles': self.extract_subtitles(episode_id, episode_info.get('cid'), aid=aid),
'__post_extractor': self.extract_comments(aid),
'http_headers': headers,
'http_headers': {'Referer': url},
}
@ -1043,15 +1045,17 @@ def fetch_page(page_idx):
try:
response = self._download_json('https://api.bilibili.com/x/space/wbi/arc/search',
playlist_id, note=f'Downloading page {page_idx}', query=query)
playlist_id, note=f'Downloading page {page_idx}', query=query,
headers={'referer': url})
except ExtractorError as e:
if isinstance(e.cause, HTTPError) and e.cause.status == 412:
raise ExtractorError(
'Request is blocked by server (412), please add cookies, wait and try later.', expected=True)
raise
if response['code'] == -401:
if response['code'] in (-352, -401):
raise ExtractorError(
'Request is blocked by server (401), please add cookies, wait and try later.', expected=True)
f'Request is blocked by server ({-response["code"]}), '
'please add cookies, wait and try later.', expected=True)
return response['data']
def get_metadata(page_data):

View file

@ -1,7 +1,11 @@
import json
import urllib.parse
from .common import InfoExtractor
from .youtube import YoutubeIE
from ..utils import (
ExtractorError,
bug_reports_message,
int_or_none,
qualities,
str_or_none,
@ -162,9 +166,19 @@ def _extract_formats(self, player_urls, video_id):
def _real_extract(self, url):
user, post_id = self._match_valid_url(url).group('user', 'post_id')
auth_headers = {}
auth_cookie = self._get_cookies('https://boosty.to/').get('auth')
if auth_cookie is not None:
try:
auth_data = json.loads(urllib.parse.unquote(auth_cookie.value))
auth_headers['Authorization'] = f'Bearer {auth_data["accessToken"]}'
except (json.JSONDecodeError, KeyError):
self.report_warning(f'Failed to extract token from auth cookie{bug_reports_message()}')
post = self._download_json(
f'https://api.boosty.to/v1/blog/{user}/post/{post_id}', post_id,
note='Downloading post data', errnote='Unable to download post data')
note='Downloading post data', errnote='Unable to download post data', headers=auth_headers)
post_title = post.get('title')
if not post_title:
@ -202,7 +216,9 @@ def _real_extract(self, url):
'thumbnail': (('previewUrl', 'defaultPreview'), {url_or_none}),
}, get_all=False)})
if not entries:
if not entries and not post.get('hasAccess'):
self.raise_login_required('This post requires a subscription', metadata_available=True)
elif not entries:
raise ExtractorError('No videos found', expected=True)
if len(entries) == 1:
return entries[0]

View file

@ -40,7 +40,7 @@ class CanalAlphaIE(InfoExtractor):
'id': '24484',
'ext': 'mp4',
'title': 'Ces innovations qui veulent rendre lagriculture plus durable',
'description': 'md5:3de3f151180684621e85be7c10e4e613',
'description': 'md5:85d594a3b5dc6ccfc4a85aba6e73b129',
'thumbnail': 'https://static.canalalpha.ch/poster/magazine/magazine_10236.jpg',
'upload_date': '20211026',
'duration': 360,
@ -58,14 +58,25 @@ class CanalAlphaIE(InfoExtractor):
'duration': 360,
},
'params': {'skip_download': True}
}, {
'url': 'https://www.canalalpha.ch/play/le-journal/topic/33500/encore-des-mesures-deconomie-dans-le-jura',
'info_dict': {
'id': '33500',
'ext': 'mp4',
'title': 'Encore des mesures d\'économie dans le Jura',
'description': 'md5:938b5b556592f2d1b9ab150268082a80',
'thumbnail': 'https://static.canalalpha.ch/poster/news/news_46665.jpg',
'upload_date': '20240411',
'duration': 105,
},
}]
def _real_extract(self, url):
id = self._match_id(url)
webpage = self._download_webpage(url, id)
video_id = self._match_id(url)
webpage = self._download_webpage(url, video_id)
data_json = self._parse_json(self._search_regex(
r'window\.__SERVER_STATE__\s?=\s?({(?:(?!};)[^"]|"([^"]|\\")*")+})\s?;',
webpage, 'data_json'), id)['1']['data']['data']
webpage, 'data_json'), video_id)['1']['data']['data']
manifests = try_get(data_json, lambda x: x['video']['manifests'], expected_type=dict) or {}
subtitles = {}
formats = [{
@ -75,15 +86,17 @@ def _real_extract(self, url):
'height': try_get(video, lambda x: x['res']['height'], expected_type=int),
} for video in try_get(data_json, lambda x: x['video']['mp4'], expected_type=list) or [] if video.get('$url')]
if manifests.get('hls'):
m3u8_frmts, m3u8_subs = self._parse_m3u8_formats_and_subtitles(manifests['hls'], video_id=id)
formats.extend(m3u8_frmts)
subtitles = self._merge_subtitles(subtitles, m3u8_subs)
fmts, subs = self._extract_m3u8_formats_and_subtitles(
manifests['hls'], video_id, m3u8_id='hls', fatal=False)
formats.extend(fmts)
self._merge_subtitles(subs, target=subtitles)
if manifests.get('dash'):
dash_frmts, dash_subs = self._parse_mpd_formats_and_subtitles(manifests['dash'])
formats.extend(dash_frmts)
subtitles = self._merge_subtitles(subtitles, dash_subs)
fmts, subs = self._extract_mpd_formats_and_subtitles(
manifests['dash'], video_id, mpd_id='dash', fatal=False)
formats.extend(fmts)
self._merge_subtitles(subs, target=subtitles)
return {
'id': id,
'id': video_id,
'title': data_json.get('title').strip(),
'description': clean_html(dict_get(data_json, ('longDesc', 'shortDesc'))),
'thumbnail': data_json.get('poster'),

View file

@ -151,7 +151,7 @@ def _real_extract(self, url):
class CBCPlayerIE(InfoExtractor):
IE_NAME = 'cbc.ca:player'
_VALID_URL = r'(?:cbcplayer:|https?://(?:www\.)?cbc\.ca/(?:player/play/|i/caffeine/syndicate/\?mediaId=))(?P<id>(?:\d\.)?\d+)'
_VALID_URL = r'(?:cbcplayer:|https?://(?:www\.)?cbc\.ca/(?:player/play/(?:video/)?|i/caffeine/syndicate/\?mediaId=))(?P<id>(?:\d\.)?\d+)'
_TESTS = [{
'url': 'http://www.cbc.ca/player/play/2683190193',
'md5': '64d25f841ddf4ddb28a235338af32e2c',
@ -277,6 +277,28 @@ class CBCPlayerIE(InfoExtractor):
'location': 'Canada',
'media_type': 'Full Program',
},
}, {
'url': 'https://www.cbc.ca/player/play/video/1.7194274',
'md5': '188b96cf6bdcb2540e178a6caa957128',
'info_dict': {
'id': '2334524995812',
'ext': 'mp4',
'title': '#TheMoment a rare white spirit moose was spotted in Alberta',
'description': 'md5:18ae269a2d0265c5b0bbe4b2e1ac61a3',
'timestamp': 1714788791,
'duration': 77.678,
'subtitles': {'eng': [{'ext': 'vtt', 'protocol': 'm3u8_native'}]},
'thumbnail': 'https://thumbnails.cbc.ca/maven_legacy/thumbnails/201/543/THE_MOMENT.jpg',
'uploader': 'CBCC-NEW',
'chapters': 'count:0',
'upload_date': '20240504',
'categories': 'count:3',
'series': 'The National',
'tags': 'count:15',
'creators': ['encoder'],
'location': 'Canada',
'media_type': 'Excerpt',
},
}, {
'url': 'cbcplayer:1.7159484',
'only_matching': True,

View file

@ -40,3 +40,19 @@ def _real_extract(self, url):
'Your URL starts with a Byte Order Mark (BOM). '
'Removing the BOM and looking for "%s" ...' % real_url)
return self.url_result(real_url)
class BlobIE(InfoExtractor):
IE_DESC = False
_VALID_URL = r'blob:'
_TESTS = [{
'url': 'blob:https://www.youtube.com/4eb3d090-a761-46e6-8083-c32016a36e3b',
'only_matching': True,
}]
def _real_extract(self, url):
raise ExtractorError(
'You\'ve asked yt-dlp to download a blob URL. '
'A blob URL exists only locally in your browser. '
'It is not possible for yt-dlp to access it.', expected=True)

View file

@ -53,15 +53,19 @@ def _set_auth_info(self, response):
CrunchyrollBaseIE._AUTH_EXPIRY = time_seconds(seconds=traverse_obj(response, ('expires_in', {float_or_none}), default=300) - 10)
def _request_token(self, headers, data, note='Requesting token', errnote='Failed to request token'):
try: # TODO: Add impersonation support here
try:
return self._download_json(
f'{self._BASE_URL}/auth/v1/token', None, note=note, errnote=errnote,
headers=headers, data=urlencode_postdata(data))
headers=headers, data=urlencode_postdata(data), impersonate=True)
except ExtractorError as error:
if not isinstance(error.cause, HTTPError) or error.cause.status != 403:
raise
if target := error.cause.response.extensions.get('impersonate'):
raise ExtractorError(f'Got HTTP Error 403 when using impersonate target "{target}"')
raise ExtractorError(
'Request blocked by Cloudflare; navigate to Crunchyroll in your browser, '
'Request blocked by Cloudflare. '
'Install the required impersonation dependency if possible, '
'or else navigate to Crunchyroll in your browser, '
'then pass the fresh cookies (with --cookies-from-browser or --cookies) '
'and your browser\'s User-Agent (with --user-agent)', expected=True)
@ -394,10 +398,11 @@ def entries():
if not self._IS_PREMIUM and traverse_obj(response, (f'{object_type}_metadata', 'is_premium_only')):
message = f'This {object_type} is for premium members only'
if CrunchyrollBaseIE._REFRESH_TOKEN:
raise ExtractorError(message, expected=True)
self.raise_login_required(message, method='password')
result['formats'], result['subtitles'] = self._extract_stream(internal_id)
self.raise_no_formats(message, expected=True, video_id=internal_id)
else:
self.raise_login_required(message, method='password', metadata_available=True)
else:
result['formats'], result['subtitles'] = self._extract_stream(internal_id)
result['chapters'] = self._extract_chapters(internal_id)
@ -583,14 +588,16 @@ def _real_extract(self, url):
if not response:
raise ExtractorError(f'No video with id {internal_id} could be found (possibly region locked?)', expected=True)
result = self._transform_music_response(response)
if not self._IS_PREMIUM and response.get('isPremiumOnly'):
message = f'This {response.get("type") or "media"} is for premium members only'
if CrunchyrollBaseIE._REFRESH_TOKEN:
raise ExtractorError(message, expected=True)
self.raise_login_required(message, method='password')
result = self._transform_music_response(response)
result['formats'], _ = self._extract_stream(f'music/{internal_id}', internal_id)
self.raise_no_formats(message, expected=True, video_id=internal_id)
else:
self.raise_login_required(message, method='password', metadata_available=True)
else:
result['formats'], _ = self._extract_stream(f'music/{internal_id}', internal_id)
return result

View file

@ -94,13 +94,14 @@ def get_item(type_, preference):
class EuroParlWebstreamIE(InfoExtractor):
_VALID_URL = r'''(?x)
https?://multimedia\.europarl\.europa\.eu/[^/#?]+/
(?:(?!video)[^/#?]+/[\w-]+_)(?P<id>[\w-]+)
https?://multimedia\.europarl\.europa\.eu/
(?:\w+/)?webstreaming/(?:[\w-]+_)?(?P<id>[\w-]+)
'''
_TESTS = [{
'url': 'https://multimedia.europarl.europa.eu/pl/webstreaming/plenary-session_20220914-0900-PLENARY',
'info_dict': {
'id': '62388b15-d85b-4add-99aa-ba12ccf64f0d',
'display_id': '20220914-0900-PLENARY',
'ext': 'mp4',
'title': 'Plenary session',
'release_timestamp': 1663139069,
@ -125,6 +126,7 @@ class EuroParlWebstreamIE(InfoExtractor):
'url': 'https://multimedia.europarl.europa.eu/en/webstreaming/committee-on-culture-and-education_20230301-1130-COMMITTEE-CULT',
'info_dict': {
'id': '7355662c-8eac-445e-4bb9-08db14b0ddd7',
'display_id': '20230301-1130-COMMITTEE-CULT',
'ext': 'mp4',
'release_date': '20230301',
'title': 'Committee on Culture and Education',
@ -142,6 +144,19 @@ class EuroParlWebstreamIE(InfoExtractor):
'live_status': 'is_live',
},
'skip': 'Not live anymore'
}, {
'url': 'https://multimedia.europarl.europa.eu/en/webstreaming/20240320-1345-SPECIAL-PRESSER',
'info_dict': {
'id': 'c1f11567-5b52-470a-f3e1-08dc3c216ace',
'display_id': '20240320-1345-SPECIAL-PRESSER',
'ext': 'mp4',
'release_date': '20240320',
'title': 'md5:7c6c814cac55dea5e2d87bf8d3db2234',
'release_timestamp': 1710939767,
}
}, {
'url': 'https://multimedia.europarl.europa.eu/webstreaming/briefing-for-media-on-2024-european-elections_20240429-1000-SPECIAL-OTHER',
'only_matching': True,
}]
def _real_extract(self, url):
@ -166,6 +181,7 @@ def _real_extract(self, url):
return {
'id': json_info['id'],
'display_id': display_id,
'title': traverse_obj(webpage_nextjs, (('mediaItem', 'title'), ('title', )), get_all=False),
'formats': formats,
'subtitles': subtitles,

View file

@ -1,7 +1,8 @@
import re
from .cloudflarestream import CloudflareStreamIE
from .common import InfoExtractor
from ..utils import traverse_obj
from ..utils.traversal import traverse_obj
class HytaleIE(InfoExtractor):
@ -49,7 +50,7 @@ def _real_extract(self, url):
entries = [
self.url_result(
f'https://cloudflarestream.com/{video_hash}/manifest/video.mpd?parentOrigin=https%3A%2F%2Fhytale.com',
title=self._titles.get(video_hash), url_transparent=True)
CloudflareStreamIE, title=self._titles.get(video_hash), url_transparent=True)
for video_hash in re.findall(
r'<stream\s+class\s*=\s*"ql-video\s+cf-stream"\s+src\s*=\s*"([a-f0-9]{32})"',
webpage)

View file

@ -1,6 +1,12 @@
from .common import InfoExtractor
from ..networking.exceptions import HTTPError
from ..utils import ExtractorError, UserNotLive, int_or_none, url_or_none
from ..utils import (
ExtractorError,
UserNotLive,
int_or_none,
str_or_none,
url_or_none,
)
from ..utils.traversal import traverse_obj
@ -9,17 +15,20 @@ class MixchIE(InfoExtractor):
_VALID_URL = r'https?://(?:www\.)?mixch\.tv/u/(?P<id>\d+)'
_TESTS = [{
'url': 'https://mixch.tv/u/16236849/live',
'url': 'https://mixch.tv/u/16943797/live',
'skip': 'don\'t know if this live persists',
'info_dict': {
'id': '16236849',
'title': '24配信シェア⭕投票🙏💦',
'comment_count': 13145,
'view_count': 28348,
'timestamp': 1636189377,
'uploader': '🦥伊咲👶🏻#フレアワ',
'uploader_id': '16236849',
}
'id': '16943797',
'ext': 'mp4',
'title': '#EntView #カリナ #セブチ 2024-05-05 06:58',
'comment_count': int,
'view_count': int,
'timestamp': 1714726805,
'uploader': 'Ent.View K-news🎶💕',
'uploader_id': '16943797',
'live_status': 'is_live',
'upload_date': '20240503',
},
}, {
'url': 'https://mixch.tv/u/16137876/live',
'only_matching': True,
@ -48,8 +57,20 @@ def _real_extract(self, url):
'protocol': 'm3u8',
}],
'is_live': True,
'__post_extractor': self.extract_comments(video_id),
}
def _get_comments(self, video_id):
yield from traverse_obj(self._download_json(
f'https://mixch.tv/api-web/lives/{video_id}/messages', video_id,
note='Downloading comments', errnote='Failed to download comments'), (..., {
'author': ('name', {str}),
'author_id': ('user_id', {str_or_none}),
'id': ('message_id', {str}, {lambda x: x or None}),
'text': ('body', {str}),
'timestamp': ('created', {int}),
}))
class MixchArchiveIE(InfoExtractor):
IE_NAME = 'mixch:archive'

View file

@ -561,7 +561,8 @@ def _real_extract(self, url):
'timestamp': ('createTime', {self.kilo_or_none}),
})
if not self._yes_playlist(info['songs'] and program_id, info['mainSong']['id']):
if not self._yes_playlist(
info['songs'] and program_id, info['mainSong']['id'], playlist_label='program', video_label='song'):
formats = self.extract_formats(info['mainSong'])
return {

View file

@ -5,7 +5,6 @@
merge_dicts,
parse_count,
url_or_none,
urljoin,
)
from ..utils.traversal import traverse_obj
@ -16,8 +15,7 @@ class NFBBaseIE(InfoExtractor):
def _extract_ep_data(self, webpage, video_id, fatal=False):
return self._search_json(
r'const\s+episodesData\s*=', webpage, 'episode data', video_id,
contains_pattern=r'\[\s*{(?s:.+)}\s*\]', fatal=fatal) or []
r'episodesData\s*:', webpage, 'episode data', video_id, fatal=fatal) or {}
def _extract_ep_info(self, data, video_id, slug=None):
info = traverse_obj(data, (lambda _, v: video_id in v['embed_url'], {
@ -224,18 +222,14 @@ def _real_extract(self, url):
# type_ can change from film to serie(s) after redirect; new slug may have episode number
type_, slug = self._match_valid_url(urlh.url).group('type', 'id')
embed_url = urljoin(f'https://www.{site}.ca', self._html_search_regex(
r'<[^>]+\bid=["\']player-iframe["\'][^>]*\bsrc=["\']([^"\']+)', webpage, 'embed url'))
video_id = self._match_id(embed_url) # embed url has unique slug
player = self._download_webpage(embed_url, video_id, 'Downloading player page')
if 'MESSAGE_GEOBLOCKED' in player:
self.raise_geo_restricted(countries=self._GEO_COUNTRIES)
player_data = self._search_json(
r'window\.PLAYER_OPTIONS\[[^\]]+\]\s*=', webpage, 'player data', slug)
video_id = self._match_id(player_data['overlay']['url']) # overlay url always has unique slug
formats, subtitles = self._extract_m3u8_formats_and_subtitles(
self._html_search_regex(r'source:\s*\'([^\']+)', player, 'm3u8 url'),
video_id, 'mp4', m3u8_id='hls')
player_data['source'], video_id, 'mp4', m3u8_id='hls')
if dv_source := self._html_search_regex(r'dvSource:\s*\'([^\']+)', player, 'dv', default=None):
if dv_source := url_or_none(player_data.get('dvSource')):
fmts, subs = self._extract_m3u8_formats_and_subtitles(
dv_source, video_id, 'mp4', m3u8_id='dv', preference=-2, fatal=False)
for fmt in fmts:
@ -246,17 +240,16 @@ def _real_extract(self, url):
info = {
'id': video_id,
'title': self._html_search_regex(
r'<[^>]+\bid=["\']titleHeader["\'][^>]*>\s*<h1[^>]*>\s*([^<]+?)\s*</h1>',
r'["\']nfb_version_title["\']\s*:\s*["\']([^"\']+)',
webpage, 'title', default=None),
'description': self._html_search_regex(
r'<[^>]+\bid=["\']tabSynopsis["\'][^>]*>\s*<p[^>]*>\s*([^<]+)',
webpage, 'description', default=None),
'thumbnail': self._html_search_regex(
r'poster:\s*\'([^\']+)', player, 'thumbnail', default=None),
'thumbnail': url_or_none(player_data.get('poster')),
'uploader': self._html_search_regex(
r'<[^>]+\bitemprop=["\']name["\'][^>]*>([^<]+)', webpage, 'uploader', default=None),
r'<[^>]+\bitemprop=["\']director["\'][^>]*>([^<]+)', webpage, 'uploader', default=None),
'release_year': int_or_none(self._html_search_regex(
r'<[^>]+\bitemprop=["\']datePublished["\'][^>]*>([^<]+)',
r'["\']nfb_version_year["\']\s*:\s*["\']([^"\']+)',
webpage, 'release_year', default=None)),
} if type_ == 'film' else self._extract_ep_info(self._extract_ep_data(webpage, video_id, slug), video_id)

View file

@ -219,7 +219,29 @@ class PatreonIE(PatreonBaseIE):
'thumbnail': r're:^https?://.+',
},
'params': {'skip_download': 'm3u8'},
}, {
# multiple attachments/embeds
'url': 'https://www.patreon.com/posts/holy-wars-solos-100601977',
'playlist_count': 3,
'info_dict': {
'id': '100601977',
'title': '"Holy Wars" (Megadeth) Solos Transcription & Lesson/Analysis',
'description': 'md5:d099ab976edfce6de2a65c2b169a88d3',
'uploader': 'Bradley Hall',
'uploader_id': '24401883',
'uploader_url': 'https://www.patreon.com/bradleyhallguitar',
'channel_id': '3193932',
'channel_url': 'https://www.patreon.com/bradleyhallguitar',
'channel_follower_count': int,
'timestamp': 1710777855,
'upload_date': '20240318',
'like_count': int,
'comment_count': int,
'thumbnail': r're:^https?://.+',
},
'skip': 'Patron-only content',
}]
_RETURN_TYPE = 'video'
def _real_extract(self, url):
video_id = self._match_id(url)
@ -234,58 +256,54 @@ def _real_extract(self, url):
'include': 'audio,user,user_defined_tags,campaign,attachments_media',
})
attributes = post['data']['attributes']
title = attributes['title'].strip()
image = attributes.get('image') or {}
info = {
'id': video_id,
'title': title,
'description': clean_html(attributes.get('content')),
'thumbnail': image.get('large_url') or image.get('url'),
'timestamp': parse_iso8601(attributes.get('published_at')),
'like_count': int_or_none(attributes.get('like_count')),
'comment_count': int_or_none(attributes.get('comment_count')),
}
can_view_post = traverse_obj(attributes, 'current_user_can_view')
if can_view_post and info['comment_count']:
info['__post_extractor'] = self.extract_comments(video_id)
info = traverse_obj(attributes, {
'title': ('title', {str.strip}),
'description': ('content', {clean_html}),
'thumbnail': ('image', ('large_url', 'url'), {url_or_none}, any),
'timestamp': ('published_at', {parse_iso8601}),
'like_count': ('like_count', {int_or_none}),
'comment_count': ('comment_count', {int_or_none}),
})
for i in post.get('included', []):
i_type = i.get('type')
if i_type == 'media':
media_attributes = i.get('attributes') or {}
download_url = media_attributes.get('download_url')
entries = []
idx = 0
for include in traverse_obj(post, ('included', lambda _, v: v['type'])):
include_type = include['type']
if include_type == 'media':
media_attributes = traverse_obj(include, ('attributes', {dict})) or {}
download_url = url_or_none(media_attributes.get('download_url'))
ext = mimetype2ext(media_attributes.get('mimetype'))
# if size_bytes is None, this media file is likely unavailable
# See: https://github.com/yt-dlp/yt-dlp/issues/4608
size_bytes = int_or_none(media_attributes.get('size_bytes'))
if download_url and ext in KNOWN_EXTENSIONS and size_bytes is not None:
# XXX: what happens if there are multiple attachments?
return {
**info,
idx += 1
entries.append({
'id': f'{video_id}-{idx}',
'ext': ext,
'filesize': size_bytes,
'url': download_url,
}
elif i_type == 'user':
user_attributes = i.get('attributes')
if user_attributes:
info.update({
'uploader': user_attributes.get('full_name'),
'uploader_id': str_or_none(i.get('id')),
'uploader_url': user_attributes.get('url'),
})
elif i_type == 'post_tag':
info.setdefault('tags', []).append(traverse_obj(i, ('attributes', 'value')))
elif include_type == 'user':
info.update(traverse_obj(include, {
'uploader': ('attributes', 'full_name', {str}),
'uploader_id': ('id', {str_or_none}),
'uploader_url': ('attributes', 'url', {url_or_none}),
}))
elif i_type == 'campaign':
info.update({
'channel': traverse_obj(i, ('attributes', 'title')),
'channel_id': str_or_none(i.get('id')),
'channel_url': traverse_obj(i, ('attributes', 'url')),
'channel_follower_count': int_or_none(traverse_obj(i, ('attributes', 'patron_count'))),
})
elif include_type == 'post_tag':
if post_tag := traverse_obj(include, ('attributes', 'value', {str})):
info.setdefault('tags', []).append(post_tag)
elif include_type == 'campaign':
info.update(traverse_obj(include, {
'channel': ('attributes', 'title', {str}),
'channel_id': ('id', {str_or_none}),
'channel_url': ('attributes', 'url', {url_or_none}),
'channel_follower_count': ('attributes', 'patron_count', {int_or_none}),
}))
# handle Vimeo embeds
if traverse_obj(attributes, ('embed', 'provider')) == 'Vimeo':
@ -296,36 +314,50 @@ def _real_extract(self, url):
v_url, video_id, 'Checking Vimeo embed URL',
headers={'Referer': 'https://patreon.com/'},
fatal=False, errnote=False):
return self.url_result(
entries.append(self.url_result(
VimeoIE._smuggle_referrer(v_url, 'https://patreon.com/'),
VimeoIE, url_transparent=True, **info)
VimeoIE, url_transparent=True))
embed_url = traverse_obj(attributes, ('embed', 'url', {url_or_none}))
if embed_url and self._request_webpage(embed_url, video_id, 'Checking embed URL', fatal=False, errnote=False):
return self.url_result(embed_url, **info)
entries.append(self.url_result(embed_url))
post_file = traverse_obj(attributes, 'post_file')
post_file = traverse_obj(attributes, ('post_file', {dict}))
if post_file:
name = post_file.get('name')
ext = determine_ext(name)
if ext in KNOWN_EXTENSIONS:
return {
**info,
entries.append({
'id': video_id,
'ext': ext,
'url': post_file['url'],
}
})
elif name == 'video' or determine_ext(post_file.get('url')) == 'm3u8':
formats, subtitles = self._extract_m3u8_formats_and_subtitles(post_file['url'], video_id)
return {
**info,
entries.append({
'id': video_id,
'formats': formats,
'subtitles': subtitles,
}
})
if can_view_post is False:
can_view_post = traverse_obj(attributes, 'current_user_can_view')
comments = None
if can_view_post and info.get('comment_count'):
comments = self.extract_comments(video_id)
if not entries and can_view_post is False:
self.raise_no_formats('You do not have access to this post', video_id=video_id, expected=True)
else:
elif not entries:
self.raise_no_formats('No supported media found in this post', video_id=video_id, expected=True)
elif len(entries) == 1:
info.update(entries[0])
else:
for entry in entries:
entry.update(info)
return self.playlist_result(entries, video_id, **info, __post_extractor=comments)
info['id'] = video_id
info['__post_extractor'] = comments
return info
def _get_comments(self, post_id):

View file

@ -361,7 +361,7 @@ def extract_count(key):
'like_count': extract_count('favoritings') or extract_count('likes'),
'comment_count': extract_count('comment'),
'repost_count': extract_count('reposts'),
'genre': info.get('genre'),
'genres': traverse_obj(info, ('genre', {str}, {lambda x: x or None}, all)),
'formats': formats if not extract_flat else None
}
@ -395,10 +395,10 @@ class SoundcloudIE(SoundcloudBaseIE):
_TESTS = [
{
'url': 'http://soundcloud.com/ethmusic/lostin-powers-she-so-heavy',
'md5': 'ebef0a451b909710ed1d7787dddbf0d7',
'md5': 'de9bac153e7427a7333b4b0c1b6a18d2',
'info_dict': {
'id': '62986583',
'ext': 'mp3',
'ext': 'opus',
'title': 'Lostin Powers - She so Heavy (SneakPreview) Adrian Ackers Blueprint 1',
'description': 'No Downloads untill we record the finished version this weekend, i was too pumped n i had to post it , earl is prolly gonna b hella p.o\'d',
'uploader': 'E.T. ExTerrestrial Music',
@ -411,6 +411,9 @@ class SoundcloudIE(SoundcloudBaseIE):
'like_count': int,
'comment_count': int,
'repost_count': int,
'thumbnail': 'https://i1.sndcdn.com/artworks-000031955188-rwb18x-original.jpg',
'uploader_url': 'https://soundcloud.com/ethmusic',
'genres': [],
}
},
# geo-restricted
@ -418,7 +421,7 @@ class SoundcloudIE(SoundcloudBaseIE):
'url': 'https://soundcloud.com/the-concept-band/goldrushed-mastered?in=the-concept-band/sets/the-royal-concept-ep',
'info_dict': {
'id': '47127627',
'ext': 'mp3',
'ext': 'opus',
'title': 'Goldrushed',
'description': 'From Stockholm Sweden\r\nPovel / Magnus / Filip / David\r\nwww.theroyalconcept.com',
'uploader': 'The Royal Concept',
@ -431,6 +434,9 @@ class SoundcloudIE(SoundcloudBaseIE):
'like_count': int,
'comment_count': int,
'repost_count': int,
'uploader_url': 'https://soundcloud.com/the-concept-band',
'thumbnail': 'https://i1.sndcdn.com/artworks-v8bFHhXm7Au6-0-original.jpg',
'genres': ['Alternative'],
},
},
# private link
@ -452,6 +458,9 @@ class SoundcloudIE(SoundcloudBaseIE):
'like_count': int,
'comment_count': int,
'repost_count': int,
'uploader_url': 'https://soundcloud.com/jaimemf',
'thumbnail': 'https://a1.sndcdn.com/images/default_avatar_large.png',
'genres': ['youtubedl'],
},
},
# private link (alt format)
@ -473,6 +482,9 @@ class SoundcloudIE(SoundcloudBaseIE):
'like_count': int,
'comment_count': int,
'repost_count': int,
'uploader_url': 'https://soundcloud.com/jaimemf',
'thumbnail': 'https://a1.sndcdn.com/images/default_avatar_large.png',
'genres': ['youtubedl'],
},
},
# downloadable song
@ -482,6 +494,21 @@ class SoundcloudIE(SoundcloudBaseIE):
'info_dict': {
'id': '343609555',
'ext': 'wav',
'title': 'The Following',
'description': '',
'uploader': '80M',
'uploader_id': '312384765',
'uploader_url': 'https://soundcloud.com/the80m',
'upload_date': '20170922',
'timestamp': 1506120436,
'duration': 397.228,
'thumbnail': 'https://i1.sndcdn.com/artworks-000243916348-ktoo7d-original.jpg',
'license': 'all-rights-reserved',
'like_count': int,
'comment_count': int,
'repost_count': int,
'view_count': int,
'genres': ['Dance & EDM'],
},
},
# private link, downloadable format
@ -503,6 +530,9 @@ class SoundcloudIE(SoundcloudBaseIE):
'like_count': int,
'comment_count': int,
'repost_count': int,
'thumbnail': 'https://i1.sndcdn.com/artworks-000240712245-kedn4p-original.jpg',
'uploader_url': 'https://soundcloud.com/oriuplift',
'genres': ['Trance'],
},
},
# no album art, use avatar pic for thumbnail
@ -525,6 +555,8 @@ class SoundcloudIE(SoundcloudBaseIE):
'like_count': int,
'comment_count': int,
'repost_count': int,
'uploader_url': 'https://soundcloud.com/garyvee',
'genres': [],
},
'params': {
'skip_download': True,
@ -532,13 +564,13 @@ class SoundcloudIE(SoundcloudBaseIE):
},
{
'url': 'https://soundcloud.com/giovannisarani/mezzo-valzer',
'md5': 'e22aecd2bc88e0e4e432d7dcc0a1abf7',
'md5': '8227c3473a4264df6b02ad7e5b7527ac',
'info_dict': {
'id': '583011102',
'ext': 'mp3',
'ext': 'opus',
'title': 'Mezzo Valzer',
'description': 'md5:4138d582f81866a530317bae316e8b61',
'uploader': 'Micronie',
'description': 'md5:f4d5f39d52e0ccc2b4f665326428901a',
'uploader': 'Giovanni Sarani',
'uploader_id': '3352531',
'timestamp': 1551394171,
'upload_date': '20190228',
@ -549,6 +581,8 @@ class SoundcloudIE(SoundcloudBaseIE):
'like_count': int,
'comment_count': int,
'repost_count': int,
'genres': ['Piano'],
'uploader_url': 'https://soundcloud.com/giovannisarani',
},
},
{

View file

@ -2,85 +2,88 @@
from .common import InfoExtractor
from ..utils import (
clean_html,
determine_ext,
extract_attributes,
get_element_by_class,
get_element_html_by_class,
int_or_none,
parse_duration,
traverse_obj,
try_get,
url_or_none,
)
from ..utils.traversal import traverse_obj
class TV5MondePlusIE(InfoExtractor):
IE_DESC = 'TV5MONDE+'
_VALID_URL = r'https?://(?:www\.)?(?:tv5mondeplus|revoir\.tv5monde)\.com/toutes-les-videos/[^/]+/(?P<id>[^/?#]+)'
IE_NAME = 'TV5MONDE'
_VALID_URL = r'https?://(?:www\.)?tv5monde\.com/tv/video/(?P<id>[^/?#]+)'
_TESTS = [{
# movie
'url': 'https://revoir.tv5monde.com/toutes-les-videos/cinema/les-novices',
'md5': 'c86f60bf8b75436455b1b205f9745955',
# documentary
'url': 'https://www.tv5monde.com/tv/video/65931-baudouin-l-heritage-d-un-roi-baudouin-l-heritage-d-un-roi',
'md5': 'd2a708902d3df230a357c99701aece05',
'info_dict': {
'id': 'ZX0ipMyFQq_6D4BA7b',
'display_id': 'les-novices',
'id': '3FPa7JMu21_6D4BA7b',
'display_id': '65931-baudouin-l-heritage-d-un-roi-baudouin-l-heritage-d-un-roi',
'ext': 'mp4',
'title': 'Les novices',
'description': 'md5:2e7c33ba3ad48dabfcc2a956b88bde2b',
'upload_date': '20230821',
'thumbnail': 'https://revoir.tv5monde.com/uploads/media/video_thumbnail/0738/60/01e952b7ccf36b7c6007ec9131588954ab651de9.jpeg',
'duration': 5177,
'episode': 'Les novices',
'title': "Baudouin, l'héritage d'un roi",
'thumbnail': 'https://psi.tv5monde.com/upsilon-images/960x540/6f/baudouin-f49c6b0e.jpg',
'duration': 4842,
'upload_date': '20240130',
'timestamp': 1706641242,
'episode': "BAUDOUIN, L'HERITAGE D'UN ROI",
'description': 'md5:78125c74a5cac06d7743a2d09126edad',
'series': "Baudouin, l'héritage d'un roi",
},
}, {
# series episode
'url': 'https://revoir.tv5monde.com/toutes-les-videos/series-fictions/opj-les-dents-de-la-terre-2',
'url': 'https://www.tv5monde.com/tv/video/52952-toute-la-vie-mardi-23-mars-2021',
'md5': 'f5e09637cadd55639c05874e22eb56bf',
'info_dict': {
'id': 'wJ0eeEPozr_6D4BA7b',
'display_id': 'opj-les-dents-de-la-terre-2',
'id': 'obRRZ8m6g9_6D4BA7b',
'display_id': '52952-toute-la-vie-mardi-23-mars-2021',
'ext': 'mp4',
'title': "OPJ - Les dents de la Terre (2)",
'description': 'md5:288f87fd68d993f814e66e60e5302d9d',
'upload_date': '20230823',
'series': 'OPJ',
'episode': 'Les dents de la Terre (2)',
'duration': 2877,
'thumbnail': 'https://dl-revoir.tv5monde.com/images/1a/5753448.jpg'
'title': 'Toute la vie',
'description': 'md5:a824a2e1dfd94cf45fa379a1fb43ce65',
'thumbnail': 'https://psi.tv5monde.com/media/image/960px/5880553.jpg',
'duration': 2526,
'upload_date': '20230721',
'timestamp': 1689971646,
'series': 'Toute la vie',
'episode': 'Mardi 23 mars 2021',
},
}, {
# movie
'url': 'https://revoir.tv5monde.com/toutes-les-videos/cinema/ceux-qui-travaillent',
'md5': '32fa0cde16a4480d1251502a66856d5f',
'url': 'https://www.tv5monde.com/tv/video/8771-ce-fleuve-qui-nous-charrie-ce-fleuve-qui-nous-charrie-p001-ce-fleuve-qui-nous-charrie',
'md5': '87cefc34e10a6bf4f7823cccd7b36eb2',
'info_dict': {
'id': 'dc57a011-ec4b-4648-2a9a-4f03f8352ed3',
'display_id': 'ceux-qui-travaillent',
'id': 'DOcfvdLKXL_6D4BA7b',
'display_id': '8771-ce-fleuve-qui-nous-charrie-ce-fleuve-qui-nous-charrie-p001-ce-fleuve-qui-nous-charrie',
'ext': 'mp4',
'title': 'Ceux qui travaillent',
'description': 'md5:570e8bb688036ace873b2d50d24c026d',
'upload_date': '20210819',
'title': 'Ce fleuve qui nous charrie',
'description': 'md5:62ba3f875343c7fc4082bdfbbc1be992',
'thumbnail': 'https://psi.tv5monde.com/media/image/960px/5476617.jpg',
'duration': 5300,
'upload_date': '20210822',
'timestamp': 1629594105,
'episode': 'CE FLEUVE QUI NOUS CHARRIE-P001-CE FLEUVE QUI NOUS CHARRIE',
'series': 'Ce fleuve qui nous charrie',
},
'skip': 'no longer available',
}, {
# series episode
'url': 'https://revoir.tv5monde.com/toutes-les-videos/series-fictions/vestiaires-caro-actrice',
# news
'url': 'https://www.tv5monde.com/tv/video/70402-tv5monde-le-journal-edition-du-08-05-24-11h',
'md5': 'c62977d6d10754a2ecebba70ad370479',
'info_dict': {
'id': '9e9d599e-23af-6915-843e-ecbf62e97925',
'display_id': 'vestiaires-caro-actrice',
'id': 'LgQFrOCNsc_6D4BA7b',
'display_id': '70402-tv5monde-le-journal-edition-du-08-05-24-11h',
'ext': 'mp4',
'title': "Vestiaires - Caro actrice",
'description': 'md5:db15d2e1976641e08377f942778058ea',
'upload_date': '20210819',
'series': "Vestiaires",
'episode': 'Caro actrice',
'title': 'TV5MONDE, le journal',
'description': 'md5:777dc209eaa4423b678477c36b0b04a8',
'thumbnail': 'https://psi.tv5monde.com/media/image/960px/6184105.jpg',
'duration': 854,
'upload_date': '20240508',
'timestamp': 1715159640,
'series': 'TV5MONDE, le journal',
'episode': 'EDITION DU 08/05/24 - 11H',
},
'params': {
'skip_download': True,
},
'skip': 'no longer available',
}, {
'url': 'https://revoir.tv5monde.com/toutes-les-videos/series-fictions/neuf-jours-en-hiver-neuf-jours-en-hiver',
'only_matching': True,
}, {
'url': 'https://revoir.tv5monde.com/toutes-les-videos/info-societe/le-journal-de-la-rts-edition-du-30-01-20-19h30',
'only_matching': True,
}]
_GEO_BYPASS = False
@ -98,7 +101,6 @@ def _real_extract(self, url):
if ">Ce programme n'est malheureusement pas disponible pour votre zone géographique.<" in webpage:
self.raise_geo_restricted(countries=['FR'])
title = episode = self._html_search_regex(r'<h1>([^<]+)', webpage, 'title')
vpl_data = extract_attributes(self._search_regex(
r'(<[^>]+class="video_player_loader"[^>]+>)',
webpage, 'video player loader'))
@ -147,26 +149,7 @@ def process_video_files(v):
process_video_files(video_files)
metadata = self._parse_json(
vpl_data['data-metadata'], display_id)
duration = (int_or_none(try_get(metadata, lambda x: x['content']['duration']))
or parse_duration(self._html_search_meta('duration', webpage)))
description = self._html_search_regex(
r'(?s)<div[^>]+class=["\']episode-texte[^>]+>(.+?)</div>', webpage,
'description', fatal=False)
series = self._html_search_regex(
r'<p[^>]+class=["\']episode-emission[^>]+>([^<]+)', webpage,
'series', default=None)
if series and series != title:
title = '%s - %s' % (series, title)
upload_date = self._search_regex(
r'(?:date_publication|publish_date)["\']\s*:\s*["\'](\d{4}_\d{2}_\d{2})',
webpage, 'upload date', default=None)
if upload_date:
upload_date = upload_date.replace('_', '')
vpl_data.get('data-metadata') or '{}', display_id, fatal=False)
if not video_id:
video_id = self._search_regex(
@ -175,16 +158,20 @@ def process_video_files(v):
default=display_id)
return {
**traverse_obj(metadata, ('content', {
'id': ('id', {str}),
'title': ('title', {str}),
'episode': ('title', {str}),
'series': ('series', {str}),
'timestamp': ('publishDate_ts', {int_or_none}),
'duration': ('duration', {int_or_none}),
})),
'id': video_id,
'display_id': display_id,
'title': title,
'description': description,
'thumbnail': vpl_data.get('data-image'),
'duration': duration,
'upload_date': upload_date,
'title': clean_html(get_element_by_class('main-title', webpage)),
'description': clean_html(get_element_by_class('text', get_element_html_by_class('ep-summary', webpage) or '')),
'thumbnail': url_or_none(vpl_data.get('data-image')),
'formats': formats,
'subtitles': self._extract_subtitles(self._parse_json(
traverse_obj(vpl_data, ('data-captions', {str}), default='{}'), display_id, fatal=False)),
'series': series,
'episode': episode,
}

View file

@ -1,10 +1,9 @@
import functools
import re
from .common import InfoExtractor
from ..utils import (
float_or_none,
int_or_none,
smuggle_url,
strip_or_none,
)
from ..utils import float_or_none, int_or_none, smuggle_url, strip_or_none
from ..utils.traversal import traverse_obj
class TVAIE(InfoExtractor):
@ -49,11 +48,20 @@ class QubIE(InfoExtractor):
'info_dict': {
'id': '6084352463001',
'ext': 'mp4',
'title': 'Épisode 01',
'title': 'Ép 01. Mon dernier jour',
'uploader_id': '5481942443001',
'upload_date': '20190907',
'timestamp': 1567899756,
'description': 'md5:9c0d7fbb90939420c651fd977df90145',
'thumbnail': r're:https://.+\.jpg',
'episode': 'Ép 01. Mon dernier jour',
'episode_number': 1,
'tags': ['alerte amber', 'alerte amber saison 1', 'surdemande'],
'duration': 2625.963,
'season': 'Season 1',
'season_number': 1,
'series': 'Alerte Amber',
'channel': 'TVA',
},
}, {
'url': 'https://www.qub.ca/tele/video/lcn-ca-vous-regarde-rev-30s-ap369664-1009357943',
@ -64,22 +72,24 @@ class QubIE(InfoExtractor):
def _real_extract(self, url):
entity_id = self._match_id(url)
entity = self._download_json(
'https://www.qub.ca/proxy/pfu/content-delivery-service/v1/entities',
entity_id, query={'id': entity_id})
webpage = self._download_webpage(url, entity_id)
entity = self._search_nextjs_data(webpage, entity_id)['props']['initialProps']['pageProps']['fallbackData']
video_id = entity['videoId']
episode = strip_or_none(entity.get('name'))
return {
'_type': 'url_transparent',
'url': f'https://videos.tva.ca/details/_{video_id}',
'ie_key': TVAIE.ie_key(),
'id': video_id,
'title': episode,
# 'url': self.BRIGHTCOVE_URL_TEMPLATE % entity['referenceId'],
'url': 'https://videos.tva.ca/details/_' + video_id,
'description': entity.get('longDescription'),
'duration': float_or_none(entity.get('durationMillis'), 1000),
'episode': episode,
'episode_number': int_or_none(entity.get('episodeNumber')),
# 'ie_key': 'BrightcoveNew',
'ie_key': TVAIE.ie_key(),
**traverse_obj(entity, {
'description': ('longDescription', {str}),
'duration': ('durationMillis', {functools.partial(float_or_none, scale=1000)}),
'channel': ('knownEntities', 'channel', 'name', {str}),
'series': ('knownEntities', 'videoShow', 'name', {str}),
'season_number': ('slug', {lambda x: re.search(r'/s(?:ai|ea)son-(\d+)/', x)}, 1, {int_or_none}),
'episode_number': ('episodeNumber', {int_or_none}),
}),
}

View file

@ -451,6 +451,7 @@ def _real_extract(self, url):
info_page, 'view count', default=None))
formats = []
subtitles = {}
for format_id, format_url in data.items():
format_url = url_or_none(format_url)
if not format_url or not format_url.startswith(('http', '//', 'rtmp')):
@ -462,12 +463,21 @@ def _real_extract(self, url):
formats.append({
'format_id': format_id,
'url': format_url,
'ext': 'mp4',
'source_preference': 1,
'height': height,
})
elif format_id == 'hls':
formats.extend(self._extract_m3u8_formats(
fmts, subs = self._extract_m3u8_formats_and_subtitles(
format_url, video_id, 'mp4', 'm3u8_native',
m3u8_id=format_id, fatal=False, live=is_live))
m3u8_id=format_id, fatal=False, live=is_live)
formats.extend(fmts)
self._merge_subtitles(subs, target=subtitles)
elif format_id.startswith('dash_'):
fmts, subs = self._extract_mpd_formats_and_subtitles(
format_url, video_id, mpd_id=format_id, fatal=False)
formats.extend(fmts)
self._merge_subtitles(subs, target=subtitles)
elif format_id == 'rtmp':
formats.append({
'format_id': format_id,
@ -475,7 +485,6 @@ def _real_extract(self, url):
'ext': 'flv',
})
subtitles = {}
for sub in data.get('subs') or {}:
subtitles.setdefault(sub.get('lang', 'en'), []).append({
'ext': sub.get('title', '.srt').split('.')[-1],
@ -496,6 +505,7 @@ def _real_extract(self, url):
'comment_count': int_or_none(mv_data.get('commcount')),
'is_live': is_live,
'subtitles': subtitles,
'_format_sort_fields': ('res', 'source'),
}

View file

@ -12,6 +12,7 @@
jwt_decode_hs256,
traverse_obj,
try_call,
url_basename,
url_or_none,
urlencode_postdata,
variadic,
@ -194,8 +195,7 @@ def _real_extract(self, url):
return {
'id': video_id,
'formats': self._get_formats(video_data, (
(('protocolHls', 'url'), ('chromecastUrls', ...)), {url_or_none}), video_id),
'formats': self._get_formats(video_data, ('protocolHls', 'url', {url_or_none}), video_id),
**traverse_obj(metadata, {
'title': ('displayName', {str}),
'description': ('description', {str}),
@ -259,6 +259,10 @@ class WrestleUniversePPVIE(WrestleUniverseBaseIE):
'params': {
'skip_download': 'm3u8',
},
}, {
'note': 'manifest provides live-a (partial) and live-b (full) streams',
'url': 'https://www.wrestle-universe.com/en/lives/umc99R9XsexXrxr9VjTo9g',
'only_matching': True,
}]
_API_PATH = 'events'
@ -285,12 +289,16 @@ def _real_extract(self, url):
video_data, decrypt = self._call_encrypted_api(
video_id, ':watchArchive', 'watch archive', data={'method': 1})
info['formats'] = self._get_formats(video_data, (
('hls', None), ('urls', 'chromecastUrls'), ..., {url_or_none}), video_id)
# 'chromecastUrls' can be only partial videos, avoid
info['formats'] = self._get_formats(video_data, ('hls', (('urls', ...), 'url'), {url_or_none}), video_id)
for f in info['formats']:
# bitrates are exaggerated in PPV playlists, so avoid wrong/huge filesize_approx values
if f.get('tbr'):
f['tbr'] = int(f['tbr'] / 2.5)
# prefer variants with the same basename as the master playlist to avoid partial streams
f['format_id'] = url_basename(f['url']).partition('.')[0]
if not f['format_id'].startswith(url_basename(f['manifest_url']).partition('.')[0]):
f['preference'] = -10
hls_aes_key = traverse_obj(video_data, ('hls', 'key', {decrypt}))
if hls_aes_key:

View file

@ -259,15 +259,15 @@ def _real_extract(self, url):
webpage = self._download_webpage(redirect, video_id, note='Redirecting')
data_json = self._search_json(
r'("data"\s*:|data\s*=)', webpage, 'metadata', video_id, contains_pattern=r'{["\']_*serverState_*video.+}')
serverstate = self._search_regex(r'(_+serverState_+video-site_[^_]+_+)',
webpage, 'server state').replace('State', 'Settings')
serverstate = self._search_regex(r'(_+serverState_+video-site_[^_]+_+)', webpage, 'server state')
uploader = self._search_regex(r'(<a\s*class=["\']card-channel-link[^"\']+["\'][^>]+>)',
webpage, 'uploader', default='<a>')
uploader_name = extract_attributes(uploader).get('aria-label')
video_json = try_get(data_json, lambda x: x[serverstate]['exportData']['video'], dict)
stream_urls = try_get(video_json, lambda x: x['video']['streams'])
item_id = traverse_obj(data_json, (serverstate, 'videoViewer', 'openedItemId', {str}))
video_json = traverse_obj(data_json, (serverstate, 'videoViewer', 'items', item_id, {dict})) or {}
formats, subtitles = [], {}
for s_url in stream_urls:
for s_url in traverse_obj(video_json, ('video', 'streams', ..., {url_or_none})):
ext = determine_ext(s_url)
if ext == 'mpd':
fmts, subs = self._extract_mpd_formats_and_subtitles(s_url, video_id, mpd_id='dash')

View file

@ -72,15 +72,15 @@ class YouPornIE(InfoExtractor):
'id': '16290308',
'age_limit': 18,
'categories': [],
'description': 'md5:00ea70f642f431c379763c17c2f396bc',
'description': str, # TODO: detect/remove SEO spam description in ytdl backport
'display_id': 'tinderspecial-trailer1',
'duration': 298.0,
'ext': 'mp4',
'upload_date': '20201123',
'uploader': 'Ersties',
'tags': [],
'thumbnail': 'https://fi1.ypncdn.com/202011/23/16290308/original/8/tinderspecial-trailer1-8(m=eaAaaEPbaaaa).jpg',
'timestamp': 1606089600,
'thumbnail': r're:https://.+\.jpg',
'timestamp': 1606147564,
'title': 'Tinder In Real Life',
'view_count': int,
}
@ -88,11 +88,17 @@ class YouPornIE(InfoExtractor):
def _real_extract(self, url):
video_id, display_id = self._match_valid_url(url).group('id', 'display_id')
definitions = self._download_json(
f'https://www.youporn.com/api/video/media_definitions/{video_id}/', display_id or video_id)
self._set_cookie('.youporn.com', 'age_verified', '1')
webpage = self._download_webpage(f'https://www.youporn.com/watch/{video_id}', video_id)
definitions = self._search_json(r'\bplayervars\s*:', webpage, 'player vars', video_id)['mediaDefinitions']
def get_format_data(data, f):
return traverse_obj(data, lambda _, v: v['format'] == f and url_or_none(v['videoUrl']))
def get_format_data(data, stream_type):
info_url = traverse_obj(data, (lambda _, v: v['format'] == stream_type, 'videoUrl', {url_or_none}, any))
if not info_url:
return []
return traverse_obj(
self._download_json(info_url, video_id, f'Downloading {stream_type} info JSON', fatal=False),
lambda _, v: v['format'] == stream_type and url_or_none(v['videoUrl']))
formats = []
# Try to extract only the actual master m3u8 first, avoiding the duplicate single resolution "master" m3u8s
@ -123,10 +129,6 @@ def get_format_data(data, f):
f['height'] = height
formats.append(f)
webpage = self._download_webpage(
'http://www.youporn.com/watch/%s' % video_id, display_id,
headers={'Cookie': 'age_verified=1'})
title = self._html_search_regex(
r'(?s)<div[^>]+class=["\']watchVideoTitle[^>]+>(.+?)</div>',
webpage, 'title', default=None) or self._og_search_title(

View file

@ -21,7 +21,7 @@
TransportError,
)
from .impersonate import ImpersonateRequestHandler, ImpersonateTarget
from ..dependencies import curl_cffi
from ..dependencies import curl_cffi, certifi
from ..utils import int_or_none
if curl_cffi is None:
@ -132,6 +132,16 @@ def _check_extensions(self, extensions):
extensions.pop('cookiejar', None)
extensions.pop('timeout', None)
def send(self, request: Request) -> Response:
target = self._get_request_target(request)
try:
response = super().send(request)
except HTTPError as e:
e.response.extensions['impersonate'] = target
raise
response.extensions['impersonate'] = target
return response
def _send(self, request: Request):
max_redirects_exceeded = False
session: curl_cffi.requests.Session = self._get_instance(
@ -156,6 +166,13 @@ def _send(self, request: Request):
# See: https://curl.se/libcurl/c/CURLOPT_HTTPPROXYTUNNEL.html
session.curl.setopt(CurlOpt.HTTPPROXYTUNNEL, 1)
# curl_cffi does not currently set these for proxies
session.curl.setopt(CurlOpt.PROXY_CAINFO, certifi.where())
if not self.verify:
session.curl.setopt(CurlOpt.PROXY_SSL_VERIFYPEER, 0)
session.curl.setopt(CurlOpt.PROXY_SSL_VERIFYHOST, 0)
headers = self._get_impersonate_headers(request)
if self._client_cert:
@ -203,7 +220,10 @@ def _send(self, request: Request):
max_redirects_exceeded = True
curl_response = e.response
elif e.code == CurlECode.PROXY:
elif (
e.code == CurlECode.PROXY
or (e.code == CurlECode.RECV_ERROR and 'Received HTTP code 407 from proxy after CONNECT' in str(e))
):
raise ProxyError(cause=e) from e
else:
raise TransportError(cause=e) from e

View file

@ -497,6 +497,7 @@ class Response(io.IOBase):
@param headers: response headers.
@param status: Response HTTP status code. Default is 200 OK.
@param reason: HTTP status reason. Will use built-in reasons based on status code if not provided.
@param extensions: Dictionary of handler-specific response extensions.
"""
def __init__(
@ -505,7 +506,9 @@ def __init__(
url: str,
headers: Mapping[str, str],
status: int = 200,
reason: str = None):
reason: str = None,
extensions: dict = None
):
self.fp = fp
self.headers = Message()
@ -517,6 +520,7 @@ def __init__(
self.reason = reason or HTTPStatus(status).phrase
except ValueError:
self.reason = None
self.extensions = extensions or {}
def readable(self):
return self.fp.readable()

View file

@ -69,6 +69,10 @@ def _get_variant_and_executable_path():
# Ref: https://en.wikipedia.org/wiki/Uname#Examples
if machine[1:] in ('x86', 'x86_64', 'amd64', 'i386', 'i686'):
machine = '_x86' if platform.architecture()[0][:2] == '32' else ''
# sys.executable returns a /tmp/ path for staticx builds (linux_static)
# Ref: https://staticx.readthedocs.io/en/latest/usage.html#run-time-information
if static_exe_path := os.getenv('STATICX_PROG_PATH'):
path = static_exe_path
return f'{remove_end(sys.platform, "32")}{machine}_exe', path
path = os.path.dirname(__file__)

View file

@ -1638,16 +1638,14 @@ def get_filesystem_encoding():
return encoding if encoding is not None else 'utf-8'
_WINDOWS_QUOTE_TRANS = str.maketrans({'"': '\\"', '\\': '\\\\'})
_WINDOWS_QUOTE_TRANS = str.maketrans({'"': R'\"'})
_CMD_QUOTE_TRANS = str.maketrans({
# Keep quotes balanced by replacing them with `""` instead of `\\"`
'"': '""',
# Requires a variable `=` containing `"^\n\n"` (set in `utils.Popen`)
# These require an env-variable `=` containing `"^\n\n"` (set in `utils.Popen`)
# `=` should be unique since variables containing `=` cannot be set using cmd
'\n': '%=%',
# While we are only required to escape backslashes immediately before quotes,
# we instead escape all of 'em anyways to be consistent
'\\': '\\\\',
'\r': '%=%',
# Use zero length variable replacement so `%` doesn't get expanded
# `cd` is always set as long as extensions are enabled (`/E:ON` in `utils.Popen`)
'%': '%%cd:~,%',
@ -1656,19 +1654,14 @@ def get_filesystem_encoding():
def shell_quote(args, *, shell=False):
args = list(variadic(args))
if any(isinstance(item, bytes) for item in args):
deprecation_warning('Passing bytes to utils.shell_quote is deprecated')
encoding = get_filesystem_encoding()
for index, item in enumerate(args):
if isinstance(item, bytes):
args[index] = item.decode(encoding)
if compat_os_name != 'nt':
return shlex.join(args)
trans = _CMD_QUOTE_TRANS if shell else _WINDOWS_QUOTE_TRANS
return ' '.join(
s if re.fullmatch(r'[\w#$*\-+./:?@\\]+', s, re.ASCII) else s.translate(trans).join('""')
s if re.fullmatch(r'[\w#$*\-+./:?@\\]+', s, re.ASCII)
else re.sub(r'(\\+)("|$)', r'\1\1\2', s).translate(trans).join('""')
for s in args)