mirror of
https://github.com/yt-dlp/yt-dlp.git
synced 2024-11-28 02:51:29 +00:00
[twitch] Add basic support for two-factor authentication
This commit is contained in:
parent
bcefc59279
commit
c64c03be35
|
@ -12,7 +12,6 @@
|
||||||
compat_str,
|
compat_str,
|
||||||
compat_urllib_parse_urlencode,
|
compat_urllib_parse_urlencode,
|
||||||
compat_urllib_parse_urlparse,
|
compat_urllib_parse_urlparse,
|
||||||
compat_urlparse,
|
|
||||||
)
|
)
|
||||||
from ..utils import (
|
from ..utils import (
|
||||||
clean_html,
|
clean_html,
|
||||||
|
@ -24,6 +23,7 @@
|
||||||
parse_iso8601,
|
parse_iso8601,
|
||||||
update_url_query,
|
update_url_query,
|
||||||
urlencode_postdata,
|
urlencode_postdata,
|
||||||
|
urljoin,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -32,7 +32,7 @@ class TwitchBaseIE(InfoExtractor):
|
||||||
|
|
||||||
_API_BASE = 'https://api.twitch.tv'
|
_API_BASE = 'https://api.twitch.tv'
|
||||||
_USHER_BASE = 'https://usher.ttvnw.net'
|
_USHER_BASE = 'https://usher.ttvnw.net'
|
||||||
_LOGIN_URL = 'http://www.twitch.tv/login'
|
_LOGIN_URL = 'https://www.twitch.tv/login'
|
||||||
_CLIENT_ID = 'jzkbprff40iqj646a697cyrvl0zt2m6'
|
_CLIENT_ID = 'jzkbprff40iqj646a697cyrvl0zt2m6'
|
||||||
_NETRC_MACHINE = 'twitch'
|
_NETRC_MACHINE = 'twitch'
|
||||||
|
|
||||||
|
@ -64,35 +64,22 @@ def fail(message):
|
||||||
raise ExtractorError(
|
raise ExtractorError(
|
||||||
'Unable to login. Twitch said: %s' % message, expected=True)
|
'Unable to login. Twitch said: %s' % message, expected=True)
|
||||||
|
|
||||||
login_page, handle = self._download_webpage_handle(
|
def post_login_form(page, urlh, note, data):
|
||||||
self._LOGIN_URL, None, 'Downloading login page')
|
form = self._hidden_inputs(page)
|
||||||
|
form.update(data)
|
||||||
# Some TOR nodes and public proxies are blocked completely
|
|
||||||
if 'blacklist_message' in login_page:
|
|
||||||
fail(clean_html(login_page))
|
|
||||||
|
|
||||||
login_form = self._hidden_inputs(login_page)
|
|
||||||
|
|
||||||
login_form.update({
|
|
||||||
'username': username,
|
|
||||||
'password': password,
|
|
||||||
})
|
|
||||||
|
|
||||||
redirect_url = handle.geturl()
|
|
||||||
|
|
||||||
|
page_url = urlh.geturl()
|
||||||
post_url = self._search_regex(
|
post_url = self._search_regex(
|
||||||
r'<form[^>]+action=(["\'])(?P<url>.+?)\1', login_page,
|
r'<form[^>]+action=(["\'])(?P<url>.+?)\1', page,
|
||||||
'post url', default=redirect_url, group='url')
|
'post url', default=page_url, group='url')
|
||||||
|
post_url = urljoin(page_url, post_url)
|
||||||
|
|
||||||
if not post_url.startswith('http'):
|
headers = {'Referer': page_url}
|
||||||
post_url = compat_urlparse.urljoin(redirect_url, post_url)
|
|
||||||
|
|
||||||
headers = {'Referer': redirect_url}
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
response = self._download_json(
|
response = self._download_json(
|
||||||
post_url, None, 'Logging in as %s' % username,
|
post_url, None, note,
|
||||||
data=urlencode_postdata(login_form),
|
data=urlencode_postdata(form),
|
||||||
headers=headers)
|
headers=headers)
|
||||||
except ExtractorError as e:
|
except ExtractorError as e:
|
||||||
if isinstance(e.cause, compat_HTTPError) and e.cause.code == 400:
|
if isinstance(e.cause, compat_HTTPError) and e.cause.code == 400:
|
||||||
|
@ -102,10 +89,38 @@ def fail(message):
|
||||||
raise
|
raise
|
||||||
|
|
||||||
if response.get('redirect'):
|
if response.get('redirect'):
|
||||||
self._download_webpage(
|
redirect_url = urljoin(post_url, response['redirect'])
|
||||||
response['redirect'], None, 'Downloading login redirect page',
|
return self._download_webpage_handle(
|
||||||
|
redirect_url, None, 'Downloading login redirect page',
|
||||||
headers=headers)
|
headers=headers)
|
||||||
|
|
||||||
|
login_page, handle = self._download_webpage_handle(
|
||||||
|
self._LOGIN_URL, None, 'Downloading login page')
|
||||||
|
|
||||||
|
# Some TOR nodes and public proxies are blocked completely
|
||||||
|
if 'blacklist_message' in login_page:
|
||||||
|
fail(clean_html(login_page))
|
||||||
|
|
||||||
|
login_data = {
|
||||||
|
'username': username,
|
||||||
|
'password': password,
|
||||||
|
}
|
||||||
|
redirect_res = post_login_form(
|
||||||
|
login_page, handle, 'Logging in as %s' % username, login_data)
|
||||||
|
|
||||||
|
if not redirect_res:
|
||||||
|
return
|
||||||
|
redirect_page, handle = redirect_res
|
||||||
|
|
||||||
|
if re.search(r'(?i)<form[^>]+id="two-factor-submit"', redirect_page) is not None:
|
||||||
|
# TODO: Add mechanism to request an SMS or phone call
|
||||||
|
tfa_token = self._get_tfa_info('two-factor authentication token')
|
||||||
|
tfa_data = {
|
||||||
|
'authy_token': tfa_token,
|
||||||
|
'remember_2fa': 'true',
|
||||||
|
}
|
||||||
|
post_login_form(redirect_page, handle, 'Submitting TFA token', tfa_data)
|
||||||
|
|
||||||
def _prefer_source(self, formats):
|
def _prefer_source(self, formats):
|
||||||
try:
|
try:
|
||||||
source = next(f for f in formats if f['format_id'] == 'Source')
|
source = next(f for f in formats if f['format_id'] == 'Source')
|
||||||
|
|
Loading…
Reference in a new issue