This commit is contained in:
inexcode 2022-02-16 16:03:38 +03:00
parent f228db5b29
commit c22fe9e8bd
11 changed files with 50 additions and 51 deletions

View file

@ -1,3 +1,13 @@
"""Migrations module.
Migrations module is introduced in v1.1.1 and provides one-shot
migrations which cannot be performed from the NixOS configuration file changes.
These migrations are checked and ran before every start of the API.
You can disable certain migrations if needed by creating an array
at api.skippedMigrations in userdata.json and populating it
with IDs of the migrations to skip.
Adding DISABLE_ALL to that array disables the migrations module entirely.
"""
from selfprivacy_api.utils import ReadUserData
from selfprivacy_api.migrations.fix_nixos_config_branch import FixNixosConfigBranch
from selfprivacy_api.migrations.create_tokens_json import CreateTokensJson
@ -26,7 +36,7 @@ def run_migrations():
try:
if migration.is_migration_needed():
migration.migrate()
except Exception as e:
except Exception as err:
print(f"Error while migrating {migration.get_migration_name()}")
print(e)
print(err)
print("Skipping this migration")

View file

@ -4,7 +4,7 @@ import json
from pathlib import Path
from selfprivacy_api.migrations.migration import Migration
from selfprivacy_api.utils import USERDATA_FILE, TOKENS_FILE, ReadUserData
from selfprivacy_api.utils import TOKENS_FILE, ReadUserData
class CreateTokensJson(Migration):
@ -50,7 +50,7 @@ class CreateTokensJson(Migration):
}
]
}
with open(TOKENS_FILE, "w") as tokens:
with open(TOKENS_FILE, "w", encoding="utf-8") as tokens:
json.dump(structure, tokens, indent=4)
print("Done")
except Exception as e:

View file

@ -24,10 +24,7 @@ class FixNixosConfigBranch(Migration):
["git", "rev-parse", "--abbrev-ref", "HEAD"], start_new_session=True
)
os.chdir(current_working_directory)
if nixos_config_branch.decode("utf-8").strip() == "rolling-testing":
return True
else:
return False
return nixos_config_branch.decode("utf-8").strip() == "rolling-testing"
except subprocess.CalledProcessError:
os.chdir(current_working_directory)
return False

View file

@ -1,16 +1,16 @@
from abc import ABC, abstractmethod
"""
Abstract Migration class
This class is used to define the structure of a migration
Migration has a function is_migration_needed() that returns True or False
Migration has a function migrate() that does the migration
Migration has a function get_migration_name() that returns the migration name
Migration has a function get_migration_description() that returns the migration description
"""
class Migration(ABC):
"""
Abstract Migration class
This class is used to define the structure of a migration
Migration has a function is_migration_needed() that returns True or False
Migration has a function migrate() that does the migration
Migration has a function get_migration_name() that returns the migration name
Migration has a function get_migration_description() that returns the migration description
"""
@abstractmethod
def get_migration_name(self):
pass

View file

@ -7,7 +7,6 @@ from selfprivacy_api.resources.api_auth import api
from selfprivacy_api.utils.auth import (
delete_token,
get_tokens_info,
delete_token,
is_token_name_exists,
is_token_name_pair_valid,
refresh_token,

View file

@ -1,6 +1,5 @@
#!/usr/bin/env python3
"""New device auth module"""
from flask import request
from flask_restful import Resource, reqparse
from selfprivacy_api.resources.api_auth import api

View file

@ -1,7 +1,6 @@
#!/usr/bin/env python3
"""Recovery token module"""
from datetime import datetime
from flask import request
from flask_restful import Resource, reqparse
from selfprivacy_api.resources.api_auth import api

View file

@ -212,17 +212,16 @@ class SSHKeys(Resource):
if "sshKeys" not in data:
data["sshKeys"] = []
return data["sshKeys"]
else:
if "users" not in data:
data["users"] = []
for user in data["users"]:
if user["username"] == username:
if "sshKeys" not in user:
user["sshKeys"] = []
return user["sshKeys"]
return {
"error": "User not found",
}, 404
if "users" not in data:
data["users"] = []
for user in data["users"]:
if user["username"] == username:
if "sshKeys" not in user:
user["sshKeys"] = []
return user["sshKeys"]
return {
"error": "User not found",
}, 404
def post(self, username):
"""

View file

@ -283,6 +283,8 @@ class PythonVersion(Resource):
class PullRepositoryChanges(Resource):
"""Pull NixOS config repository changes"""
def get(self):
"""
Pull Repository Changes
@ -324,12 +326,11 @@ class PullRepositoryChanges(Resource):
"message": "Update completed successfully",
"data": data,
}
elif git_pull_process_descriptor.returncode > 0:
return {
"status": git_pull_process_descriptor.returncode,
"message": "Something went wrong",
"data": data,
}, 500
return {
"status": git_pull_process_descriptor.returncode,
"message": "Something went wrong",
"data": data,
}, 500
api.add_resource(Timezone, "/configuration/timezone")

View file

@ -105,10 +105,7 @@ class ResticController:
"--json",
]
if (
self.state == ResticStates.BACKING_UP
or self.state == ResticStates.RESTORING
):
if self.state in (ResticStates.BACKING_UP, ResticStates.RESTORING):
return
with subprocess.Popen(
backup_listing_command,
@ -129,10 +126,9 @@ class ResticController:
if "Is there a repository at the following location?" in snapshots_list:
self.state = ResticStates.NOT_INITIALIZED
return
else:
self.state = ResticStates.ERROR
self.error_message = snapshots_list
return
self.state = ResticStates.ERROR
self.error_message = snapshots_list
return
def initialize_repository(self):
"""
@ -195,10 +191,7 @@ class ResticController:
"""
backup_status_check_command = ["tail", "-1", "/var/backup.log"]
if (
self.state == ResticStates.NO_KEY
or self.state == ResticStates.NOT_INITIALIZED
):
if self.state in (ResticStates.NO_KEY, ResticStates.NOT_INITIALIZED):
return
# If the log file does not exists

View file

@ -222,7 +222,8 @@ def generate_recovery_token(expiration=None, uses_left=None):
def use_mnemonic_recoverery_token(mnemonic_phrase, name):
"""Use the recovery token by converting the mnemonic word list to a byte array.
If the recovery token if invalid itself, return None
If the binary representation of phrase not matches the byte array of the recovery token, return None.
If the binary representation of phrase not matches
the byte array of the recovery token, return None.
If the mnemonic phrase is valid then generate a device token and return it.
Substract 1 from uses_left if it exists.
mnemonic_phrase is a string representation of the mnemonic word list.
@ -258,7 +259,8 @@ def use_mnemonic_recoverery_token(mnemonic_phrase, name):
def get_new_device_auth_token():
"""Generate a new device auth token which is valid for 10 minutes and return a mnemonic phrase representation
"""Generate a new device auth token which is valid for 10 minutes
and return a mnemonic phrase representation
Write token to the new_device of the tokens.json file.
"""
token = secrets.token_bytes(16)