mirror of
https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api.git
synced 2024-11-29 23:41:28 +00:00
330 lines
11 KiB
Python
330 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
"""Token management utils"""
|
|
import secrets
|
|
from datetime import datetime, timedelta
|
|
import re
|
|
import typing
|
|
|
|
from pydantic import BaseModel
|
|
from mnemonic import Mnemonic
|
|
|
|
from . import ReadUserData, UserDataFiles, WriteUserData, parse_date
|
|
|
|
"""
|
|
Token are stored in the tokens.json file.
|
|
File contains device tokens, recovery token and new device auth token.
|
|
File structure:
|
|
{
|
|
"tokens": [
|
|
{
|
|
"token": "device token",
|
|
"name": "device name",
|
|
"date": "date of creation",
|
|
}
|
|
],
|
|
"recovery_token": {
|
|
"token": "recovery token",
|
|
"date": "date of creation",
|
|
"expiration": "date of expiration",
|
|
"uses_left": "number of uses left"
|
|
},
|
|
"new_device": {
|
|
"token": "new device auth token",
|
|
"date": "date of creation",
|
|
"expiration": "date of expiration",
|
|
}
|
|
}
|
|
Recovery token may or may not have expiration date and uses_left.
|
|
There may be no recovery token at all.
|
|
Device tokens must be unique.
|
|
"""
|
|
|
|
|
|
def _get_tokens():
|
|
"""Get all tokens as list of tokens of every device"""
|
|
with ReadUserData(UserDataFiles.TOKENS) as tokens:
|
|
return [token["token"] for token in tokens["tokens"]]
|
|
|
|
|
|
def _get_token_names():
|
|
"""Get all token names"""
|
|
with ReadUserData(UserDataFiles.TOKENS) as tokens:
|
|
return [t["name"] for t in tokens["tokens"]]
|
|
|
|
|
|
def _validate_token_name(name):
|
|
"""Token name must be an alphanumeric string and not empty.
|
|
Replace invalid characters with '_'
|
|
If token name exists, add a random number to the end of the name until it is unique.
|
|
"""
|
|
if not re.match("^[a-zA-Z0-9]*$", name):
|
|
name = re.sub("[^a-zA-Z0-9]", "_", name)
|
|
if name == "":
|
|
name = "Unknown device"
|
|
while name in _get_token_names():
|
|
name += str(secrets.randbelow(10))
|
|
return name
|
|
|
|
|
|
def is_token_valid(token):
|
|
"""Check if token is valid"""
|
|
if token in _get_tokens():
|
|
return True
|
|
return False
|
|
|
|
|
|
def is_token_name_exists(token_name):
|
|
"""Check if token name exists"""
|
|
with ReadUserData(UserDataFiles.TOKENS) as tokens:
|
|
return token_name in [t["name"] for t in tokens["tokens"]]
|
|
|
|
|
|
def is_token_name_pair_valid(token_name, token):
|
|
"""Check if token name and token pair exists"""
|
|
with ReadUserData(UserDataFiles.TOKENS) as tokens:
|
|
for t in tokens["tokens"]:
|
|
if t["name"] == token_name and t["token"] == token:
|
|
return True
|
|
return False
|
|
|
|
|
|
def get_token_name(token):
|
|
"""Return the name of the token provided"""
|
|
with ReadUserData(UserDataFiles.TOKENS) as tokens:
|
|
for t in tokens["tokens"]:
|
|
if t["token"] == token:
|
|
return t["name"]
|
|
return None
|
|
|
|
|
|
class BasicTokenInfo(BaseModel):
|
|
"""Token info"""
|
|
|
|
name: str
|
|
date: datetime
|
|
|
|
|
|
def get_tokens_info():
|
|
"""Get all tokens info without tokens themselves"""
|
|
with ReadUserData(UserDataFiles.TOKENS) as tokens:
|
|
return [
|
|
BasicTokenInfo(
|
|
name=t["name"],
|
|
date=parse_date(t["date"]),
|
|
)
|
|
for t in tokens["tokens"]
|
|
]
|
|
|
|
|
|
def _generate_token():
|
|
"""Generates new token and makes sure it is unique"""
|
|
token = secrets.token_urlsafe(32)
|
|
while token in _get_tokens():
|
|
token = secrets.token_urlsafe(32)
|
|
return token
|
|
|
|
|
|
def create_token(name):
|
|
"""Create new token"""
|
|
token = _generate_token()
|
|
name = _validate_token_name(name)
|
|
with WriteUserData(UserDataFiles.TOKENS) as tokens:
|
|
tokens["tokens"].append(
|
|
{
|
|
"token": token,
|
|
"name": name,
|
|
"date": str(datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")),
|
|
}
|
|
)
|
|
return token
|
|
|
|
|
|
def delete_token(token_name):
|
|
"""Delete token"""
|
|
with WriteUserData(UserDataFiles.TOKENS) as tokens:
|
|
tokens["tokens"] = [t for t in tokens["tokens"] if t["name"] != token_name]
|
|
|
|
|
|
def refresh_token(token: str) -> typing.Optional[str]:
|
|
"""Change the token field of the existing token"""
|
|
new_token = _generate_token()
|
|
with WriteUserData(UserDataFiles.TOKENS) as tokens:
|
|
for t in tokens["tokens"]:
|
|
if t["token"] == token:
|
|
t["token"] = new_token
|
|
return new_token
|
|
return None
|
|
|
|
|
|
def is_recovery_token_exists():
|
|
"""Check if recovery token exists"""
|
|
with ReadUserData(UserDataFiles.TOKENS) as tokens:
|
|
return "recovery_token" in tokens
|
|
|
|
|
|
def is_recovery_token_valid():
|
|
"""Check if recovery token is valid"""
|
|
with ReadUserData(UserDataFiles.TOKENS) as tokens:
|
|
if "recovery_token" not in tokens:
|
|
return False
|
|
recovery_token = tokens["recovery_token"]
|
|
if "uses_left" in recovery_token and recovery_token["uses_left"] is not None:
|
|
if recovery_token["uses_left"] <= 0:
|
|
return False
|
|
if "expiration" not in recovery_token or recovery_token["expiration"] is None:
|
|
return True
|
|
return datetime.now() < parse_date(recovery_token["expiration"])
|
|
|
|
|
|
def get_recovery_token_status():
|
|
"""Get recovery token date of creation, expiration and uses left"""
|
|
with ReadUserData(UserDataFiles.TOKENS) as tokens:
|
|
if "recovery_token" not in tokens:
|
|
return None
|
|
recovery_token = tokens["recovery_token"]
|
|
return {
|
|
"date": recovery_token["date"],
|
|
"expiration": recovery_token["expiration"]
|
|
if "expiration" in recovery_token
|
|
else None,
|
|
"uses_left": recovery_token["uses_left"]
|
|
if "uses_left" in recovery_token
|
|
else None,
|
|
}
|
|
|
|
|
|
def _get_recovery_token():
|
|
"""Get recovery token"""
|
|
with ReadUserData(UserDataFiles.TOKENS) as tokens:
|
|
if "recovery_token" not in tokens:
|
|
return None
|
|
return tokens["recovery_token"]["token"]
|
|
|
|
|
|
def generate_recovery_token(
|
|
expiration: typing.Optional[datetime], uses_left: typing.Optional[int]
|
|
) -> str:
|
|
"""Generate a 24 bytes recovery token and return a mneomnic word list.
|
|
Write a string representation of the recovery token to the tokens.json file.
|
|
"""
|
|
# expires must be a date or None
|
|
# uses_left must be an integer or None
|
|
if expiration is not None:
|
|
if not isinstance(expiration, datetime):
|
|
raise TypeError("expires must be a datetime object")
|
|
if uses_left is not None:
|
|
if not isinstance(uses_left, int):
|
|
raise TypeError("uses_left must be an integer")
|
|
if uses_left <= 0:
|
|
raise ValueError("uses_left must be greater than 0")
|
|
|
|
recovery_token = secrets.token_bytes(24)
|
|
recovery_token_str = recovery_token.hex()
|
|
with WriteUserData(UserDataFiles.TOKENS) as tokens:
|
|
tokens["recovery_token"] = {
|
|
"token": recovery_token_str,
|
|
"date": str(datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")),
|
|
"expiration": expiration.strftime("%Y-%m-%dT%H:%M:%S.%f")
|
|
if expiration is not None
|
|
else None,
|
|
"uses_left": uses_left if uses_left is not None else None,
|
|
}
|
|
return Mnemonic(language="english").to_mnemonic(recovery_token)
|
|
|
|
|
|
def use_mnemonic_recoverery_token(mnemonic_phrase, name):
|
|
"""Use the recovery token by converting the mnemonic word list to a byte array.
|
|
If the recovery token if invalid itself, return None
|
|
If the binary representation of phrase not matches
|
|
the byte array of the recovery token, return None.
|
|
If the mnemonic phrase is valid then generate a device token and return it.
|
|
Substract 1 from uses_left if it exists.
|
|
mnemonic_phrase is a string representation of the mnemonic word list.
|
|
"""
|
|
if not is_recovery_token_valid():
|
|
return None
|
|
recovery_token_str = _get_recovery_token()
|
|
if recovery_token_str is None:
|
|
return None
|
|
recovery_token = bytes.fromhex(recovery_token_str)
|
|
if not Mnemonic(language="english").check(mnemonic_phrase):
|
|
return None
|
|
phrase_bytes = Mnemonic(language="english").to_entropy(mnemonic_phrase)
|
|
if phrase_bytes != recovery_token:
|
|
return None
|
|
token = _generate_token()
|
|
name = _validate_token_name(name)
|
|
with WriteUserData(UserDataFiles.TOKENS) as tokens:
|
|
tokens["tokens"].append(
|
|
{
|
|
"token": token,
|
|
"name": name,
|
|
"date": str(datetime.now()),
|
|
}
|
|
)
|
|
if "recovery_token" in tokens:
|
|
if (
|
|
"uses_left" in tokens["recovery_token"]
|
|
and tokens["recovery_token"]["uses_left"] is not None
|
|
):
|
|
tokens["recovery_token"]["uses_left"] -= 1
|
|
return token
|
|
|
|
|
|
def get_new_device_auth_token() -> str:
|
|
"""Generate a new device auth token which is valid for 10 minutes
|
|
and return a mnemonic phrase representation
|
|
Write token to the new_device of the tokens.json file.
|
|
"""
|
|
token = secrets.token_bytes(16)
|
|
token_str = token.hex()
|
|
with WriteUserData(UserDataFiles.TOKENS) as tokens:
|
|
tokens["new_device"] = {
|
|
"token": token_str,
|
|
"date": str(datetime.now()),
|
|
"expiration": str(datetime.now() + timedelta(minutes=10)),
|
|
}
|
|
return Mnemonic(language="english").to_mnemonic(token)
|
|
|
|
|
|
def _get_new_device_auth_token():
|
|
"""Get new device auth token. If it is expired, return None"""
|
|
with ReadUserData(UserDataFiles.TOKENS) as tokens:
|
|
if "new_device" not in tokens:
|
|
return None
|
|
new_device = tokens["new_device"]
|
|
if "expiration" not in new_device:
|
|
return None
|
|
expiration = parse_date(new_device["expiration"])
|
|
if datetime.now() > expiration:
|
|
return None
|
|
return new_device["token"]
|
|
|
|
|
|
def delete_new_device_auth_token():
|
|
"""Delete new device auth token"""
|
|
with WriteUserData(UserDataFiles.TOKENS) as tokens:
|
|
if "new_device" in tokens:
|
|
del tokens["new_device"]
|
|
|
|
|
|
def use_new_device_auth_token(mnemonic_phrase, name):
|
|
"""Use the new device auth token by converting the mnemonic string to a byte array.
|
|
If the mnemonic phrase is valid then generate a device token and return it.
|
|
New device auth token must be deleted.
|
|
"""
|
|
token_str = _get_new_device_auth_token()
|
|
if token_str is None:
|
|
return None
|
|
token = bytes.fromhex(token_str)
|
|
if not Mnemonic(language="english").check(mnemonic_phrase):
|
|
return None
|
|
phrase_bytes = Mnemonic(language="english").to_entropy(mnemonic_phrase)
|
|
if phrase_bytes != token:
|
|
return None
|
|
token = create_token(name)
|
|
with WriteUserData(UserDataFiles.TOKENS) as tokens:
|
|
if "new_device" in tokens:
|
|
del tokens["new_device"]
|
|
return token
|