mirror of
https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api.git
synced 2024-11-17 08:02:36 +00:00
Inital auth work, untested
This commit is contained in:
parent
aa76f87828
commit
ea696d0f0e
|
@ -9,6 +9,7 @@ flask-swagger-ui
|
|||
pytz
|
||||
huey
|
||||
gevent
|
||||
mnemonic
|
||||
|
||||
pytest
|
||||
coverage
|
||||
|
|
|
@ -13,11 +13,14 @@ from selfprivacy_api.resources.users import User, Users
|
|||
from selfprivacy_api.resources.common import ApiVersion
|
||||
from selfprivacy_api.resources.system import api_system
|
||||
from selfprivacy_api.resources.services import services as api_services
|
||||
from selfprivacy_api.resources.api_auth import auth as api_auth
|
||||
|
||||
from selfprivacy_api.restic_controller.tasks import huey, init_restic
|
||||
|
||||
from selfprivacy_api.migrations import run_migrations
|
||||
|
||||
from selfprivacy_api.utils.auth import is_token_valid
|
||||
|
||||
swagger_blueprint = get_swaggerui_blueprint(
|
||||
"/api/docs", "/api/swagger.json", config={"app_name": "SelfPrivacy API"}
|
||||
)
|
||||
|
@ -29,9 +32,6 @@ def create_app(test_config=None):
|
|||
api = Api(app)
|
||||
|
||||
if test_config is None:
|
||||
app.config["AUTH_TOKEN"] = os.environ.get("AUTH_TOKEN")
|
||||
if app.config["AUTH_TOKEN"] is None:
|
||||
raise ValueError("AUTH_TOKEN is not set")
|
||||
app.config["ENABLE_SWAGGER"] = os.environ.get("ENABLE_SWAGGER", "0")
|
||||
app.config["B2_BUCKET"] = os.environ.get("B2_BUCKET")
|
||||
else:
|
||||
|
@ -40,14 +40,20 @@ def create_app(test_config=None):
|
|||
# Check bearer token
|
||||
@app.before_request
|
||||
def check_auth():
|
||||
# Exclude swagger-ui
|
||||
if not request.path.startswith("/api"):
|
||||
# Exclude swagger-ui, /auth/new_device/authorize, /auth/recovery_token/use
|
||||
if request.path.startswith("/api"):
|
||||
pass
|
||||
elif request.path.startswith("/auth/new_device/authorize"):
|
||||
pass
|
||||
elif request.path.startswith("/auth/recovery_token/use"):
|
||||
pass
|
||||
else:
|
||||
auth = request.headers.get("Authorization")
|
||||
if auth is None:
|
||||
return jsonify({"error": "Missing Authorization header"}), 401
|
||||
|
||||
# Check if token is valid
|
||||
if auth != "Bearer " + app.config["AUTH_TOKEN"]:
|
||||
# Strip Bearer from auth header
|
||||
auth = auth.replace("Bearer ", "")
|
||||
if not is_token_valid(auth):
|
||||
return jsonify({"error": "Invalid token"}), 401
|
||||
|
||||
api.add_resource(ApiVersion, "/api/version")
|
||||
|
@ -56,6 +62,7 @@ def create_app(test_config=None):
|
|||
|
||||
app.register_blueprint(api_system)
|
||||
app.register_blueprint(api_services)
|
||||
app.register_blueprint(api_auth)
|
||||
|
||||
@app.route("/api/swagger.json")
|
||||
def spec():
|
||||
|
|
57
selfprivacy_api/migrations/create_tokens_json.py
Normal file
57
selfprivacy_api/migrations/create_tokens_json.py
Normal file
|
@ -0,0 +1,57 @@
|
|||
from datetime import datetime
|
||||
import os
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
from selfprivacy_api.migrations.migration import Migration
|
||||
from selfprivacy_api.utils import USERDATA_FILE, TOKENS_FILE, ReadUserData
|
||||
|
||||
|
||||
class CreateTokensJson(Migration):
|
||||
def get_migration_name(self):
|
||||
return "create_tokens_json"
|
||||
|
||||
def get_migration_description(self):
|
||||
return """Selfprivacy API used a single token in userdata.json for authentication.
|
||||
This migration creates a new tokens.json file with the old token in it.
|
||||
This migration runs if the tokens.json file does not exist.
|
||||
Old token is located at ["api"]["token"] in userdata.json.
|
||||
tokens.json path is declared in TOKENS_FILE imported from utils.py
|
||||
tokens.json must have the following format:
|
||||
{
|
||||
"tokens": [
|
||||
{
|
||||
"token": "token_string",
|
||||
"name": "Master Token",
|
||||
"date": "current date from str(datetime.now())",
|
||||
}
|
||||
]
|
||||
}
|
||||
tokens.json must have 0600 permissions.
|
||||
"""
|
||||
|
||||
def is_migration_needed(self):
|
||||
return not os.path.exists(TOKENS_FILE)
|
||||
|
||||
def migrate(self):
|
||||
try:
|
||||
with ReadUserData(USERDATA_FILE) as userdata:
|
||||
token = userdata["api"]["token"]
|
||||
# Touch tokens.json with 0600 permissions
|
||||
Path(TOKENS_FILE).touch(mode=0o600)
|
||||
# Write token to tokens.json
|
||||
structure = {
|
||||
"tokens": [
|
||||
{
|
||||
"token": token,
|
||||
"name": "Master Token",
|
||||
"date": str(datetime.now()),
|
||||
}
|
||||
]
|
||||
}
|
||||
with open(TOKENS_FILE, "w") as tokens:
|
||||
json.dump(structure, tokens, indent=4)
|
||||
print("Done")
|
||||
except Exception as e:
|
||||
print(e)
|
||||
print("Error creating tokens.json")
|
14
selfprivacy_api/resources/api_auth/__init__.py
Normal file
14
selfprivacy_api/resources/api_auth/__init__.py
Normal file
|
@ -0,0 +1,14 @@
|
|||
#!/usr/bin/env python3
|
||||
"""API authentication module"""
|
||||
|
||||
from flask import Blueprint
|
||||
from flask_restful import Api
|
||||
|
||||
auth = Blueprint("auth", __name__, url_prefix="/auth")
|
||||
api = Api(auth)
|
||||
|
||||
from . import (
|
||||
new_device,
|
||||
recovery_token,
|
||||
app_tokens,
|
||||
)
|
100
selfprivacy_api/resources/api_auth/app_tokens.py
Normal file
100
selfprivacy_api/resources/api_auth/app_tokens.py
Normal file
|
@ -0,0 +1,100 @@
|
|||
#!/usr/bin/env python3
|
||||
"""App tokens management module"""
|
||||
from flask import request
|
||||
from flask_restful import Resource, reqparse
|
||||
|
||||
from selfprivacy_api.resources.api_auth import api
|
||||
from selfprivacy_api.utils.auth import (
|
||||
delete_token,
|
||||
get_tokens_info,
|
||||
delete_token,
|
||||
refresh_token,
|
||||
is_token_valid,
|
||||
)
|
||||
|
||||
|
||||
class Tokens(Resource):
|
||||
"""Token management class
|
||||
GET returns the list of active devices.
|
||||
DELETE invalidates token unless it is the last one or the caller uses this token.
|
||||
POST refreshes the token of the caller.
|
||||
"""
|
||||
|
||||
def get(self):
|
||||
"""
|
||||
Get current device tokens
|
||||
---
|
||||
tags:
|
||||
- Tokens
|
||||
security:
|
||||
- bearerAuth: []
|
||||
responses:
|
||||
200:
|
||||
description: List of tokens
|
||||
400:
|
||||
description: Bad request
|
||||
"""
|
||||
return get_tokens_info()
|
||||
|
||||
def delete(self):
|
||||
"""
|
||||
Delete token
|
||||
---
|
||||
tags:
|
||||
- Tokens
|
||||
security:
|
||||
- bearerAuth: []
|
||||
parameters:
|
||||
- in: body
|
||||
name: token
|
||||
required: true
|
||||
description: Token to delete
|
||||
schema:
|
||||
type: object
|
||||
properties:
|
||||
token:
|
||||
type: string
|
||||
description: Token to delete
|
||||
responses:
|
||||
200:
|
||||
description: Token deleted
|
||||
400:
|
||||
description: Bad request
|
||||
404:
|
||||
description: Token not found
|
||||
"""
|
||||
parser = reqparse.RequestParser()
|
||||
parser.add_argument("token", type=str, required=True, help="Token to delete")
|
||||
args = parser.parse_args()
|
||||
token = args["token"]
|
||||
if request.headers.get("Authorization") == f"Bearer {token}":
|
||||
return {"message": "Cannot delete caller's token"}, 400
|
||||
if not is_token_valid(token):
|
||||
return {"message": "Token not found"}, 404
|
||||
delete_token(token)
|
||||
return {"message": "Token deleted"}, 200
|
||||
|
||||
def post(self):
|
||||
"""
|
||||
Refresh token
|
||||
---
|
||||
tags:
|
||||
- Tokens
|
||||
security:
|
||||
- bearerAuth: []
|
||||
responses:
|
||||
200:
|
||||
description: Token refreshed
|
||||
400:
|
||||
description: Bad request
|
||||
404:
|
||||
description: Token not found
|
||||
"""
|
||||
# Get token from header
|
||||
token = request.headers.get("Authorization").split(" ")[1]
|
||||
if not is_token_valid(token):
|
||||
return {"message": "Token not found"}, 404
|
||||
return refresh_token(token)
|
||||
|
||||
|
||||
api.add_resource(Tokens, "/tokens")
|
87
selfprivacy_api/resources/api_auth/new_device.py
Normal file
87
selfprivacy_api/resources/api_auth/new_device.py
Normal file
|
@ -0,0 +1,87 @@
|
|||
#!/usr/bin/env python3
|
||||
"""New device auth module"""
|
||||
from flask import request
|
||||
from flask_restful import Resource, reqparse
|
||||
|
||||
from selfprivacy_api.resources.api_auth import api
|
||||
from selfprivacy_api.utils.auth import (
|
||||
get_new_device_auth_token,
|
||||
use_new_device_auth_token,
|
||||
)
|
||||
|
||||
|
||||
class NewDevice(Resource):
|
||||
"""New device auth class
|
||||
POST returns a new token for the caller.
|
||||
"""
|
||||
|
||||
def post(self):
|
||||
"""
|
||||
Get new device token
|
||||
---
|
||||
tags:
|
||||
- Tokens
|
||||
security:
|
||||
- bearerAuth: []
|
||||
responses:
|
||||
200:
|
||||
description: New device token
|
||||
400:
|
||||
description: Bad request
|
||||
"""
|
||||
return get_new_device_auth_token()
|
||||
|
||||
|
||||
class AuthorizeDevice(Resource):
|
||||
"""Authorize device class
|
||||
POST authorizes the caller.
|
||||
"""
|
||||
|
||||
def post(self):
|
||||
"""
|
||||
Authorize device
|
||||
---
|
||||
tags:
|
||||
- Tokens
|
||||
security:
|
||||
- bearerAuth: []
|
||||
parameters:
|
||||
- in: body
|
||||
name: data
|
||||
required: true
|
||||
description: Who is authorizing
|
||||
schema:
|
||||
type: object
|
||||
properties:
|
||||
token:
|
||||
type: string
|
||||
description: Mnemonic token to authorize
|
||||
device:
|
||||
type: string
|
||||
description: Device to authorize
|
||||
responses:
|
||||
200:
|
||||
description: Device authorized
|
||||
400:
|
||||
description: Bad request
|
||||
404:
|
||||
description: Token not found
|
||||
"""
|
||||
parser = reqparse.RequestParser()
|
||||
parser.add_argument(
|
||||
"token", type=str, required=True, help="Mnemonic token to authorize"
|
||||
)
|
||||
parser.add_argument(
|
||||
"device", type=str, required=True, help="Device to authorize"
|
||||
)
|
||||
args = parser.parse_args()
|
||||
auth_token = args["token"]
|
||||
device = args["device"]
|
||||
token = use_new_device_auth_token(auth_token, device)
|
||||
if token is None:
|
||||
return {"message": "Token not found"}, 404
|
||||
return {"message": "Device authorized", "token": token}, 200
|
||||
|
||||
|
||||
api.add_resource(NewDevice, "/new_device")
|
||||
api.add_resource(AuthorizeDevice, "/new_device/authorize")
|
189
selfprivacy_api/resources/api_auth/recovery_token.py
Normal file
189
selfprivacy_api/resources/api_auth/recovery_token.py
Normal file
|
@ -0,0 +1,189 @@
|
|||
#!/usr/bin/env python3
|
||||
"""Recovery token module"""
|
||||
from datetime import datetime
|
||||
from flask import request
|
||||
from flask_restful import Resource, reqparse
|
||||
|
||||
from selfprivacy_api.resources.api_auth import api
|
||||
from selfprivacy_api.utils.auth import (
|
||||
is_recovery_token_exists,
|
||||
is_recovery_token_valid,
|
||||
get_recovery_token_status,
|
||||
generate_recovery_token,
|
||||
use_mnemonic_recoverery_token,
|
||||
)
|
||||
|
||||
|
||||
class RecoveryToken(Resource):
|
||||
"""Recovery token class
|
||||
GET returns the status of the recovery token.
|
||||
POST generates a new recovery token.
|
||||
"""
|
||||
|
||||
def get(self):
|
||||
"""
|
||||
Get recovery token status
|
||||
---
|
||||
tags:
|
||||
- Tokens
|
||||
security:
|
||||
- bearerAuth: []
|
||||
responses:
|
||||
200:
|
||||
description: Recovery token status
|
||||
schema:
|
||||
type: object
|
||||
properties:
|
||||
exists:
|
||||
type: boolean
|
||||
description: Recovery token exists
|
||||
valid:
|
||||
type: boolean
|
||||
description: Recovery token is valid
|
||||
date:
|
||||
type: string
|
||||
description: Recovery token date
|
||||
expiration:
|
||||
type: string
|
||||
description: Recovery token expiration date
|
||||
uses_left:
|
||||
type: integer
|
||||
description: Recovery token uses left
|
||||
400:
|
||||
description: Bad request
|
||||
"""
|
||||
if not is_recovery_token_exists():
|
||||
return {
|
||||
"exists": False,
|
||||
"valid": False,
|
||||
"date": None,
|
||||
"expiration": None,
|
||||
"uses_left": None,
|
||||
}
|
||||
status = get_recovery_token_status()
|
||||
if not is_recovery_token_valid():
|
||||
return {
|
||||
"exists": True,
|
||||
"valid": False,
|
||||
"date": status["date"],
|
||||
"expiration": status["expiration"],
|
||||
"uses_left": status["uses_left"],
|
||||
}
|
||||
return {
|
||||
"exists": True,
|
||||
"valid": True,
|
||||
"date": status["date"],
|
||||
"expiration": status["expiration"],
|
||||
"uses_left": status["uses_left"],
|
||||
}
|
||||
|
||||
def post(self):
|
||||
"""
|
||||
Generate recovery token
|
||||
---
|
||||
tags:
|
||||
- Tokens
|
||||
security:
|
||||
- bearerAuth: []
|
||||
parameters:
|
||||
- in: body
|
||||
name: data
|
||||
required: true
|
||||
description: Token data
|
||||
schema:
|
||||
type: object
|
||||
properties:
|
||||
expiration:
|
||||
type: string
|
||||
description: Token expiration date
|
||||
uses:
|
||||
type: integer
|
||||
description: Token uses
|
||||
responses:
|
||||
200:
|
||||
description: Recovery token generated
|
||||
schema:
|
||||
type: object
|
||||
properties:
|
||||
token:
|
||||
type: string
|
||||
description: Mnemonic recovery token
|
||||
400:
|
||||
description: Bad request
|
||||
"""
|
||||
parser = reqparse.RequestParser()
|
||||
parser.add_argument(
|
||||
"expiration", type=str, required=True, help="Token expiration date"
|
||||
)
|
||||
parser.add_argument("uses", type=int, required=True, help="Token uses")
|
||||
args = parser.parse_args()
|
||||
# Convert expiration date to datetime and return 400 if it is not valid
|
||||
try:
|
||||
expiration = datetime.strptime(args["expiration"], "%Y-%m-%dT%H:%M:%S.%fZ")
|
||||
except ValueError:
|
||||
return {
|
||||
"error": "Invalid expiration date. Use YYYY-MM-DDTHH:MM:SS.SSSZ"
|
||||
}, 400
|
||||
# Generate recovery token
|
||||
token = generate_recovery_token(expiration, args["uses"])
|
||||
return {"token": token}
|
||||
|
||||
|
||||
class UseRecoveryToken(Resource):
|
||||
"""Use recovery token class
|
||||
POST uses the recovery token.
|
||||
"""
|
||||
|
||||
def post(self):
|
||||
"""
|
||||
Use recovery token
|
||||
---
|
||||
tags:
|
||||
- Tokens
|
||||
security:
|
||||
- bearerAuth: []
|
||||
parameters:
|
||||
- in: body
|
||||
name: data
|
||||
required: true
|
||||
description: Token data
|
||||
schema:
|
||||
type: object
|
||||
properties:
|
||||
token:
|
||||
type: string
|
||||
description: Mnemonic recovery token
|
||||
device:
|
||||
type: string
|
||||
description: Device to authorize
|
||||
responses:
|
||||
200:
|
||||
description: Recovery token used
|
||||
schema:
|
||||
type: object
|
||||
properties:
|
||||
token:
|
||||
type: string
|
||||
description: Device authorization token
|
||||
400:
|
||||
description: Bad request
|
||||
404:
|
||||
description: Token not found
|
||||
"""
|
||||
parser = reqparse.RequestParser()
|
||||
parser.add_argument(
|
||||
"token", type=str, required=True, help="Mnemonic recovery token"
|
||||
)
|
||||
parser.add_argument(
|
||||
"device", type=str, required=True, help="Device to authorize"
|
||||
)
|
||||
args = parser.parse_args()
|
||||
# Use recovery token
|
||||
token = use_mnemonic_recoverery_token(args["token"], args["device"])
|
||||
if token is None:
|
||||
return {"error": "Token not found"}, 404
|
||||
return {"token": token}
|
||||
|
||||
|
||||
api.add_resource(RecoveryToken, "/recovery_token")
|
||||
api.add_resource(UseRecoveryToken, "/recovery_token/use")
|
|
@ -1,13 +1,22 @@
|
|||
#!/usr/bin/env python3
|
||||
"""Various utility functions"""
|
||||
from enum import Enum
|
||||
import json
|
||||
import portalocker
|
||||
|
||||
|
||||
USERDATA_FILE = "/etc/nixos/userdata/userdata.json"
|
||||
TOKENS_FILE = "/etc/nixos/userdata/tokens.json"
|
||||
DOMAIN_FILE = "/var/domain"
|
||||
|
||||
|
||||
class UserDataFiles(Enum):
|
||||
"""Enum for userdata files"""
|
||||
|
||||
USERDATA = 0
|
||||
TOKENS = 1
|
||||
|
||||
|
||||
def get_domain():
|
||||
"""Get domain from /var/domain without trailing new line"""
|
||||
with open(DOMAIN_FILE, "r", encoding="utf-8") as domain_file:
|
||||
|
@ -18,8 +27,13 @@ def get_domain():
|
|||
class WriteUserData(object):
|
||||
"""Write userdata.json with lock"""
|
||||
|
||||
def __init__(self):
|
||||
self.userdata_file = open(USERDATA_FILE, "r+", encoding="utf-8")
|
||||
def __init__(self, file_type=UserDataFiles.USERDATA):
|
||||
if file_type == UserDataFiles.USERDATA:
|
||||
self.userdata_file = open(USERDATA_FILE, "r+", encoding="utf-8")
|
||||
elif file_type == UserDataFiles.TOKENS:
|
||||
self.userdata_file = open(TOKENS_FILE, "r+", encoding="utf-8")
|
||||
else:
|
||||
raise ValueError("Unknown file type")
|
||||
portalocker.lock(self.userdata_file, portalocker.LOCK_EX)
|
||||
self.data = json.load(self.userdata_file)
|
||||
|
||||
|
@ -38,8 +52,13 @@ class WriteUserData(object):
|
|||
class ReadUserData(object):
|
||||
"""Read userdata.json with lock"""
|
||||
|
||||
def __init__(self):
|
||||
self.userdata_file = open(USERDATA_FILE, "r", encoding="utf-8")
|
||||
def __init__(self, file_type=UserDataFiles.USERDATA):
|
||||
if file_type == UserDataFiles.USERDATA:
|
||||
self.userdata_file = open(USERDATA_FILE, "r", encoding="utf-8")
|
||||
elif file_type == UserDataFiles.TOKENS:
|
||||
self.userdata_file = open(TOKENS_FILE, "r", encoding="utf-8")
|
||||
else:
|
||||
raise ValueError("Unknown file type")
|
||||
portalocker.lock(self.userdata_file, portalocker.LOCK_SH)
|
||||
self.data = json.load(self.userdata_file)
|
||||
|
273
selfprivacy_api/utils/auth.py
Normal file
273
selfprivacy_api/utils/auth.py
Normal file
|
@ -0,0 +1,273 @@
|
|||
#!/usr/bin/env python3
|
||||
"""Token management utils"""
|
||||
import secrets
|
||||
from datetime import datetime, timedelta
|
||||
import re
|
||||
|
||||
from mnemonic import Mnemonic
|
||||
|
||||
from . import ReadUserData, UserDataFiles, WriteUserData
|
||||
|
||||
"""
|
||||
Token are stored in the tokens.json file.
|
||||
File contains device tokens, recovery token and new device auth token.
|
||||
File structure:
|
||||
{
|
||||
"tokens": [
|
||||
{
|
||||
"token": "device token",
|
||||
"name": "device name",
|
||||
"date": "date of creation",
|
||||
}
|
||||
],
|
||||
"recovery_token": {
|
||||
"token": "recovery token",
|
||||
"date": "date of creation",
|
||||
"expiration": "date of expiration",
|
||||
"uses_left": "number of uses left"
|
||||
},
|
||||
"new_device": {
|
||||
"token": "new device auth token",
|
||||
"date": "date of creation",
|
||||
"expiration": "date of expiration",
|
||||
}
|
||||
}
|
||||
Recovery token may or may not have expiration date and uses_left.
|
||||
There may be no recovery token at all.
|
||||
Device tokens must be unique.
|
||||
"""
|
||||
|
||||
|
||||
def _get_tokens():
|
||||
"""Get all tokens as list of tokens of every device"""
|
||||
with ReadUserData(UserDataFiles.TOKENS) as tokens:
|
||||
return [token["token"] for token in tokens["tokens"]]
|
||||
|
||||
|
||||
def _get_token_names():
|
||||
"""Get all token names"""
|
||||
with ReadUserData(UserDataFiles.TOKENS) as tokens:
|
||||
return [t["name"] for t in tokens["tokens"]]
|
||||
|
||||
|
||||
def _validate_token_name(name):
|
||||
"""Token name must be an alphanumeric string and not empty.
|
||||
Replace invalid characters with '_'
|
||||
If token name exists, add a random number to the end of the name until it is unique.
|
||||
"""
|
||||
if not re.match("^[a-zA-Z0-9]*$", name):
|
||||
name = re.sub("[^a-zA-Z0-9]", "_", name)
|
||||
if name == "":
|
||||
name = "Unknown device"
|
||||
while name in _get_token_names():
|
||||
name += str(secrets.randbelow(10))
|
||||
return name
|
||||
|
||||
|
||||
def is_token_valid(token):
|
||||
"""Check if token is valid"""
|
||||
if token in _get_tokens():
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def get_tokens_info():
|
||||
"""Get all tokens info without tokens themselves"""
|
||||
with ReadUserData(UserDataFiles.TOKENS) as tokens:
|
||||
return [
|
||||
{"name": token["name"], "date": token["date"]} for token in tokens["tokens"]
|
||||
]
|
||||
|
||||
|
||||
def _generate_token():
|
||||
"""Generates new token and makes sure it is unique"""
|
||||
token = secrets.token_urlsafe(32)
|
||||
while token in _get_tokens():
|
||||
token = secrets.token_urlsafe(32)
|
||||
return token
|
||||
|
||||
|
||||
def create_token(name):
|
||||
"""Create new token"""
|
||||
token = _generate_token()
|
||||
with WriteUserData(UserDataFiles.TOKENS) as tokens:
|
||||
tokens["tokens"].append(
|
||||
{
|
||||
"token": token,
|
||||
"name": _validate_token_name(name),
|
||||
"date": str(datetime.now()),
|
||||
}
|
||||
)
|
||||
return token
|
||||
|
||||
|
||||
def delete_token(token):
|
||||
"""Delete token"""
|
||||
with WriteUserData(UserDataFiles.TOKENS) as tokens:
|
||||
tokens["tokens"] = [t for t in tokens["tokens"] if t["token"] != token]
|
||||
|
||||
|
||||
def refresh_token(token):
|
||||
"""Change the token field of the existing token"""
|
||||
with WriteUserData(UserDataFiles.TOKENS) as tokens:
|
||||
for t in tokens["tokens"]:
|
||||
if t["token"] == token:
|
||||
t["token"] = _generate_token()
|
||||
break
|
||||
|
||||
|
||||
def is_recovery_token_exists():
|
||||
"""Check if recovery token exists"""
|
||||
with ReadUserData(UserDataFiles.TOKENS) as tokens:
|
||||
return "recovery_token" in tokens
|
||||
|
||||
|
||||
def is_recovery_token_valid():
|
||||
"""Check if recovery token is valid"""
|
||||
with ReadUserData(UserDataFiles.TOKENS) as tokens:
|
||||
if "recovery_token" not in tokens:
|
||||
return False
|
||||
recovery_token = tokens["recovery_token"]
|
||||
if "uses_left" in recovery_token:
|
||||
if recovery_token["uses_left"] <= 0:
|
||||
return False
|
||||
if "expiration" not in recovery_token:
|
||||
return True
|
||||
return datetime.now() < datetime.strptime(
|
||||
recovery_token["expiration"], "%Y-%m-%d %H:%M:%S.%f"
|
||||
)
|
||||
|
||||
|
||||
def get_recovery_token_status():
|
||||
"""Get recovery token date of creation, expiration and uses left"""
|
||||
with ReadUserData(UserDataFiles.TOKENS) as tokens:
|
||||
if "recovery_token" not in tokens:
|
||||
return None
|
||||
recovery_token = tokens["recovery_token"]
|
||||
return {
|
||||
"date": recovery_token["date"],
|
||||
"expiration": recovery_token["expiration"]
|
||||
if "expiration" in recovery_token
|
||||
else None,
|
||||
"uses_left": recovery_token["uses_left"]
|
||||
if "uses_left" in recovery_token
|
||||
else None,
|
||||
}
|
||||
|
||||
|
||||
def _get_recovery_token():
|
||||
"""Get recovery token"""
|
||||
with ReadUserData(UserDataFiles.TOKENS) as tokens:
|
||||
if "recovery_token" not in tokens:
|
||||
return None
|
||||
return tokens["recovery_token"]["token"]
|
||||
|
||||
|
||||
def generate_recovery_token(expiration=None, uses_left=None):
|
||||
"""Generate a 24 bytes recovery token and return a mneomnic word list.
|
||||
Write a string representation of the recovery token to the tokens.json file.
|
||||
"""
|
||||
# expires must be a date or None
|
||||
# uses_left must be an integer or None
|
||||
if expiration is not None:
|
||||
if not isinstance(expiration, datetime):
|
||||
raise TypeError("expires must be a datetime object")
|
||||
if uses_left is not None:
|
||||
if not isinstance(uses_left, int):
|
||||
raise TypeError("uses_left must be an integer")
|
||||
if uses_left <= 0:
|
||||
raise ValueError("uses_left must be greater than 0")
|
||||
|
||||
recovery_token = secrets.token_bytes(24)
|
||||
recovery_token_str = recovery_token.hex()
|
||||
with WriteUserData(UserDataFiles.TOKENS) as tokens:
|
||||
tokens["recovery_token"] = {
|
||||
"token": recovery_token_str,
|
||||
"date": str(datetime.now()),
|
||||
"expiration": expiration if expiration is not None else None,
|
||||
"uses_left": uses_left if uses_left is not None else None,
|
||||
}
|
||||
return Mnemonic(language="english").to_mnemonic(recovery_token)
|
||||
|
||||
|
||||
def use_mnemonic_recoverery_token(mnemonic_phrase, name):
|
||||
"""Use the recovery token by converting the mnemonic word list to a byte array.
|
||||
If the recovery token if invalid itself, return None
|
||||
If the binary representation of phrase not matches the byte array of the recovery token, return None.
|
||||
If the mnemonic phrase is valid then generate a device token and return it.
|
||||
Substract 1 from uses_left if it exists.
|
||||
mnemonic_phrase is a string representation of the mnemonic word list.
|
||||
"""
|
||||
recovery_token_str = _get_recovery_token()
|
||||
if recovery_token_str is None:
|
||||
return None
|
||||
recovery_token = bytes.fromhex(recovery_token_str)
|
||||
if not Mnemonic(language="english").check(mnemonic_phrase):
|
||||
return None
|
||||
phrase_bytes = Mnemonic(language="english").to_entropy(mnemonic_phrase)
|
||||
if phrase_bytes != recovery_token:
|
||||
return None
|
||||
token = _generate_token()
|
||||
with WriteUserData(UserDataFiles.TOKENS) as tokens:
|
||||
tokens["tokens"].append(
|
||||
{
|
||||
"token": token,
|
||||
"name": _validate_token_name(name),
|
||||
"date": str(datetime.now()),
|
||||
}
|
||||
)
|
||||
if "recovery_token" in tokens:
|
||||
if "uses_left" in tokens["recovery_token"]:
|
||||
tokens["recovery_token"]["uses_left"] -= 1
|
||||
return token
|
||||
|
||||
|
||||
def get_new_device_auth_token():
|
||||
"""Generate a new device auth token which is valid for 10 minutes and return a mnemonic phrase representation
|
||||
Write token to the new_device of the tokens.json file.
|
||||
"""
|
||||
token = secrets.token_bytes(24)
|
||||
token_str = token.hex()
|
||||
with WriteUserData(UserDataFiles.TOKENS) as tokens:
|
||||
tokens["new_device"] = {
|
||||
"token": token_str,
|
||||
"date": str(datetime.now()),
|
||||
"expiration": str(datetime.now() + timedelta(minutes=10)),
|
||||
}
|
||||
return Mnemonic(language="english").to_mnemonic(token)
|
||||
|
||||
|
||||
def _get_new_device_auth_token():
|
||||
"""Get new device auth token. If it is expired, return None"""
|
||||
with ReadUserData(UserDataFiles.TOKENS) as tokens:
|
||||
if "new_device" not in tokens:
|
||||
return None
|
||||
new_device = tokens["new_device"]
|
||||
if "expiration" not in new_device:
|
||||
return None
|
||||
if datetime.now() > datetime.strptime(
|
||||
new_device["expiration"], "%Y-%m-%d %H:%M:%S.%f"
|
||||
):
|
||||
return None
|
||||
return new_device["token"]
|
||||
|
||||
|
||||
def use_new_device_auth_token(mnemonic_phrase, name):
|
||||
"""Use the new device auth token by converting the mnemonic string to a byte array.
|
||||
If the mnemonic phrase is valid then generate a device token and return it.
|
||||
New device auth token must be deleted.
|
||||
"""
|
||||
token_str = _get_new_device_auth_token()
|
||||
if token_str is None:
|
||||
return None
|
||||
token = bytes.fromhex(token_str)
|
||||
if not Mnemonic(language="english").check(mnemonic_phrase):
|
||||
return None
|
||||
phrase_bytes = Mnemonic(language="english").to_entropy(mnemonic_phrase)
|
||||
if phrase_bytes != token:
|
||||
return None
|
||||
token = create_token(name)
|
||||
with WriteUserData(UserDataFiles.TOKENS) as tokens:
|
||||
if "new_device" in tokens:
|
||||
del tokens["new_device"]
|
||||
return token
|
2
setup.py
2
setup.py
|
@ -2,7 +2,7 @@ from setuptools import setup, find_packages
|
|||
|
||||
setup(
|
||||
name="selfprivacy_api",
|
||||
version="1.1.0",
|
||||
version="1.1.1",
|
||||
packages=find_packages(),
|
||||
scripts=[
|
||||
"selfprivacy_api/app.py",
|
||||
|
|
|
@ -4,10 +4,10 @@ from selfprivacy_api.app import create_app
|
|||
|
||||
|
||||
@pytest.fixture
|
||||
def app():
|
||||
def app(mocker, shared_datadir):
|
||||
mocker.patch("selfprivacy_api.utils.TOKENS_FILE", shared_datadir / "tokens.json")
|
||||
app = create_app(
|
||||
{
|
||||
"AUTH_TOKEN": "TEST_TOKEN",
|
||||
"ENABLE_SWAGGER": "0",
|
||||
}
|
||||
)
|
||||
|
|
9
tests/data/tokens.json
Normal file
9
tests/data/tokens.json
Normal file
|
@ -0,0 +1,9 @@
|
|||
{
|
||||
"tokens": [
|
||||
{
|
||||
"token": "TEST_TOKEN",
|
||||
"name": "Test Token",
|
||||
"date": "2022-01-14 08:31:10.789314"
|
||||
}
|
||||
]
|
||||
}
|
9
tests/services/data/tokens.json
Normal file
9
tests/services/data/tokens.json
Normal file
|
@ -0,0 +1,9 @@
|
|||
{
|
||||
"tokens": [
|
||||
{
|
||||
"token": "TEST_TOKEN",
|
||||
"name": "Test Token",
|
||||
"date": "2022-01-14 08:31:10.789314"
|
||||
}
|
||||
]
|
||||
}
|
Loading…
Reference in a new issue