mirror of
https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api.git
synced 2024-10-31 17:17:17 +00:00
refactor: Adapt API to the NixOS configuration changes
This commit is contained in:
parent
1e9744227b
commit
b6f436d8b3
|
@ -31,7 +31,7 @@ def get_ssh_settings() -> UserdataSshSettings:
|
|||
if "enable" not in data["ssh"]:
|
||||
data["ssh"]["enable"] = True
|
||||
if "passwordAuthentication" not in data["ssh"]:
|
||||
data["ssh"]["passwordAuthentication"] = True
|
||||
data["ssh"]["passwordAuthentication"] = False
|
||||
if "rootKeys" not in data["ssh"]:
|
||||
data["ssh"]["rootKeys"] = []
|
||||
return UserdataSshSettings(**data["ssh"])
|
||||
|
|
|
@ -13,7 +13,7 @@ def get_timezone() -> str:
|
|||
with ReadUserData() as user_data:
|
||||
if "timezone" in user_data:
|
||||
return user_data["timezone"]
|
||||
return "Europe/Uzhgorod"
|
||||
return "Etc/UTC"
|
||||
|
||||
|
||||
class InvalidTimezone(Exception):
|
||||
|
|
|
@ -372,7 +372,6 @@ class ResticBackupper(AbstractBackupper):
|
|||
stderr=subprocess.STDOUT,
|
||||
shell=False,
|
||||
) as handle:
|
||||
|
||||
# for some reason restore does not support
|
||||
# nice reporting of progress via json
|
||||
output = handle.communicate()[0].decode("utf-8")
|
||||
|
|
|
@ -17,7 +17,6 @@ class UserType(Enum):
|
|||
|
||||
@strawberry.type
|
||||
class User:
|
||||
|
||||
user_type: UserType
|
||||
username: str
|
||||
# userHomeFolderspace: UserHomeFolderUsage
|
||||
|
@ -32,7 +31,6 @@ class UserMutationReturn(MutationReturnInterface):
|
|||
|
||||
|
||||
def get_user_by_username(username: str) -> typing.Optional[User]:
|
||||
|
||||
user = users_actions.get_user_by_username(username)
|
||||
if user is None:
|
||||
return None
|
||||
|
|
|
@ -15,7 +15,6 @@ from selfprivacy_api.jobs import Jobs
|
|||
class Job:
|
||||
@strawberry.field
|
||||
def get_jobs(self) -> typing.List[ApiJob]:
|
||||
|
||||
Jobs.get_jobs()
|
||||
|
||||
return [job_to_api_job(job) for job in Jobs.get_jobs()]
|
||||
|
|
|
@ -8,37 +8,12 @@ at api.skippedMigrations in userdata.json and populating it
|
|||
with IDs of the migrations to skip.
|
||||
Adding DISABLE_ALL to that array disables the migrations module entirely.
|
||||
"""
|
||||
from selfprivacy_api.migrations.check_for_failed_binds_migration import (
|
||||
CheckForFailedBindsMigration,
|
||||
)
|
||||
from selfprivacy_api.utils import ReadUserData
|
||||
from selfprivacy_api.migrations.fix_nixos_config_branch import FixNixosConfigBranch
|
||||
from selfprivacy_api.migrations.create_tokens_json import CreateTokensJson
|
||||
from selfprivacy_api.migrations.migrate_to_selfprivacy_channel import (
|
||||
MigrateToSelfprivacyChannel,
|
||||
)
|
||||
from selfprivacy_api.migrations.mount_volume import MountVolume
|
||||
from selfprivacy_api.migrations.providers import CreateProviderFields
|
||||
from selfprivacy_api.migrations.modules_in_json import CreateModulesField
|
||||
from selfprivacy_api.migrations.prepare_for_nixos_2211 import (
|
||||
MigrateToSelfprivacyChannelFrom2205,
|
||||
)
|
||||
from selfprivacy_api.migrations.prepare_for_nixos_2305 import (
|
||||
MigrateToSelfprivacyChannelFrom2211,
|
||||
)
|
||||
from selfprivacy_api.migrations.redis_tokens import LoadTokensToRedis
|
||||
|
||||
from selfprivacy_api.utils import ReadUserData, UserDataFiles
|
||||
from selfprivacy_api.migrations.write_token_to_redis import WriteTokenToRedis
|
||||
|
||||
migrations = [
|
||||
FixNixosConfigBranch(),
|
||||
CreateTokensJson(),
|
||||
MigrateToSelfprivacyChannel(),
|
||||
MountVolume(),
|
||||
CheckForFailedBindsMigration(),
|
||||
CreateProviderFields(),
|
||||
MigrateToSelfprivacyChannelFrom2205(),
|
||||
MigrateToSelfprivacyChannelFrom2211(),
|
||||
LoadTokensToRedis(),
|
||||
CreateModulesField(),
|
||||
WriteTokenToRedis(),
|
||||
]
|
||||
|
||||
|
||||
|
@ -47,7 +22,7 @@ def run_migrations():
|
|||
Go over all migrations. If they are not skipped in userdata file, run them
|
||||
if the migration needed.
|
||||
"""
|
||||
with ReadUserData() as data:
|
||||
with ReadUserData(UserDataFiles.SECRETS) as data:
|
||||
if "api" not in data:
|
||||
skipped_migrations = []
|
||||
elif "skippedMigrations" not in data["api"]:
|
||||
|
|
|
@ -1,48 +0,0 @@
|
|||
from selfprivacy_api.jobs import JobStatus, Jobs
|
||||
|
||||
from selfprivacy_api.migrations.migration import Migration
|
||||
from selfprivacy_api.utils import WriteUserData
|
||||
|
||||
|
||||
class CheckForFailedBindsMigration(Migration):
|
||||
"""Mount volume."""
|
||||
|
||||
def get_migration_name(self):
|
||||
return "check_for_failed_binds_migration"
|
||||
|
||||
def get_migration_description(self):
|
||||
return "If binds migration failed, try again."
|
||||
|
||||
def is_migration_needed(self):
|
||||
try:
|
||||
jobs = Jobs.get_jobs()
|
||||
# If there is a job with type_id "migrations.migrate_to_binds" and status is not "FINISHED",
|
||||
# then migration is needed and job is deleted
|
||||
for job in jobs:
|
||||
if (
|
||||
job.type_id == "migrations.migrate_to_binds"
|
||||
and job.status != JobStatus.FINISHED
|
||||
):
|
||||
return True
|
||||
return False
|
||||
except Exception as e:
|
||||
print(e)
|
||||
return False
|
||||
|
||||
def migrate(self):
|
||||
# Get info about existing volumes
|
||||
# Write info about volumes to userdata.json
|
||||
try:
|
||||
jobs = Jobs.get_jobs()
|
||||
for job in jobs:
|
||||
if (
|
||||
job.type_id == "migrations.migrate_to_binds"
|
||||
and job.status != JobStatus.FINISHED
|
||||
):
|
||||
Jobs.remove(job)
|
||||
with WriteUserData() as userdata:
|
||||
userdata["useBinds"] = False
|
||||
print("Done")
|
||||
except Exception as e:
|
||||
print(e)
|
||||
print("Error mounting volume")
|
|
@ -1,58 +0,0 @@
|
|||
from datetime import datetime
|
||||
import os
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
from selfprivacy_api.migrations.migration import Migration
|
||||
from selfprivacy_api.utils import TOKENS_FILE, ReadUserData
|
||||
|
||||
|
||||
class CreateTokensJson(Migration):
|
||||
def get_migration_name(self):
|
||||
return "create_tokens_json"
|
||||
|
||||
def get_migration_description(self):
|
||||
return """Selfprivacy API used a single token in userdata.json for authentication.
|
||||
This migration creates a new tokens.json file with the old token in it.
|
||||
This migration runs if the tokens.json file does not exist.
|
||||
Old token is located at ["api"]["token"] in userdata.json.
|
||||
tokens.json path is declared in TOKENS_FILE imported from utils.py
|
||||
tokens.json must have the following format:
|
||||
{
|
||||
"tokens": [
|
||||
{
|
||||
"token": "token_string",
|
||||
"name": "Master Token",
|
||||
"date": "current date from str(datetime.now())",
|
||||
}
|
||||
]
|
||||
}
|
||||
tokens.json must have 0600 permissions.
|
||||
"""
|
||||
|
||||
def is_migration_needed(self):
|
||||
return not os.path.exists(TOKENS_FILE)
|
||||
|
||||
def migrate(self):
|
||||
try:
|
||||
print(f"Creating tokens.json file at {TOKENS_FILE}")
|
||||
with ReadUserData() as userdata:
|
||||
token = userdata["api"]["token"]
|
||||
# Touch tokens.json with 0600 permissions
|
||||
Path(TOKENS_FILE).touch(mode=0o600)
|
||||
# Write token to tokens.json
|
||||
structure = {
|
||||
"tokens": [
|
||||
{
|
||||
"token": token,
|
||||
"name": "primary_token",
|
||||
"date": str(datetime.now()),
|
||||
}
|
||||
]
|
||||
}
|
||||
with open(TOKENS_FILE, "w", encoding="utf-8") as tokens:
|
||||
json.dump(structure, tokens, indent=4)
|
||||
print("Done")
|
||||
except Exception as e:
|
||||
print(e)
|
||||
print("Error creating tokens.json")
|
|
@ -1,57 +0,0 @@
|
|||
import os
|
||||
import subprocess
|
||||
|
||||
from selfprivacy_api.migrations.migration import Migration
|
||||
|
||||
|
||||
class FixNixosConfigBranch(Migration):
|
||||
def get_migration_name(self):
|
||||
return "fix_nixos_config_branch"
|
||||
|
||||
def get_migration_description(self):
|
||||
return """Mobile SelfPrivacy app introduced a bug in version 0.4.0.
|
||||
New servers were initialized with a rolling-testing nixos config branch.
|
||||
This was fixed in app version 0.4.2, but existing servers were not updated.
|
||||
This migration fixes this by changing the nixos config branch to master.
|
||||
"""
|
||||
|
||||
def is_migration_needed(self):
|
||||
"""Check the current branch of /etc/nixos and return True if it is rolling-testing"""
|
||||
current_working_directory = os.getcwd()
|
||||
try:
|
||||
os.chdir("/etc/nixos")
|
||||
nixos_config_branch = subprocess.check_output(
|
||||
["git", "rev-parse", "--abbrev-ref", "HEAD"], start_new_session=True
|
||||
)
|
||||
os.chdir(current_working_directory)
|
||||
return nixos_config_branch.decode("utf-8").strip() == "rolling-testing"
|
||||
except subprocess.CalledProcessError:
|
||||
os.chdir(current_working_directory)
|
||||
return False
|
||||
|
||||
def migrate(self):
|
||||
"""Affected server pulled the config with the --single-branch flag.
|
||||
Git config remote.origin.fetch has to be changed, so all branches will be fetched.
|
||||
Then, fetch all branches, pull and switch to master branch.
|
||||
"""
|
||||
print("Fixing Nixos config branch")
|
||||
current_working_directory = os.getcwd()
|
||||
try:
|
||||
os.chdir("/etc/nixos")
|
||||
|
||||
subprocess.check_output(
|
||||
[
|
||||
"git",
|
||||
"config",
|
||||
"remote.origin.fetch",
|
||||
"+refs/heads/*:refs/remotes/origin/*",
|
||||
]
|
||||
)
|
||||
subprocess.check_output(["git", "fetch", "--all"])
|
||||
subprocess.check_output(["git", "pull"])
|
||||
subprocess.check_output(["git", "checkout", "master"])
|
||||
os.chdir(current_working_directory)
|
||||
print("Done")
|
||||
except subprocess.CalledProcessError:
|
||||
os.chdir(current_working_directory)
|
||||
print("Error")
|
|
@ -1,49 +0,0 @@
|
|||
import os
|
||||
import subprocess
|
||||
|
||||
from selfprivacy_api.migrations.migration import Migration
|
||||
|
||||
|
||||
class MigrateToSelfprivacyChannel(Migration):
|
||||
"""Migrate to selfprivacy Nix channel."""
|
||||
|
||||
def get_migration_name(self):
|
||||
return "migrate_to_selfprivacy_channel"
|
||||
|
||||
def get_migration_description(self):
|
||||
return "Migrate to selfprivacy Nix channel."
|
||||
|
||||
def is_migration_needed(self):
|
||||
try:
|
||||
output = subprocess.check_output(
|
||||
["nix-channel", "--list"], start_new_session=True
|
||||
)
|
||||
output = output.decode("utf-8")
|
||||
first_line = output.split("\n", maxsplit=1)[0]
|
||||
return first_line.startswith("nixos") and (
|
||||
first_line.endswith("nixos-21.11") or first_line.endswith("nixos-21.05")
|
||||
)
|
||||
except subprocess.CalledProcessError:
|
||||
return False
|
||||
|
||||
def migrate(self):
|
||||
# Change the channel and update them.
|
||||
# Also, go to /etc/nixos directory and make a git pull
|
||||
current_working_directory = os.getcwd()
|
||||
try:
|
||||
print("Changing channel")
|
||||
os.chdir("/etc/nixos")
|
||||
subprocess.check_output(
|
||||
[
|
||||
"nix-channel",
|
||||
"--add",
|
||||
"https://channel.selfprivacy.org/nixos-selfpricacy",
|
||||
"nixos",
|
||||
]
|
||||
)
|
||||
subprocess.check_output(["nix-channel", "--update"])
|
||||
subprocess.check_output(["git", "pull"])
|
||||
os.chdir(current_working_directory)
|
||||
except subprocess.CalledProcessError:
|
||||
os.chdir(current_working_directory)
|
||||
print("Error")
|
|
@ -1,50 +0,0 @@
|
|||
from selfprivacy_api.migrations.migration import Migration
|
||||
from selfprivacy_api.utils import ReadUserData, WriteUserData
|
||||
from selfprivacy_api.services import get_all_services
|
||||
|
||||
|
||||
def migrate_services_to_modules():
|
||||
with WriteUserData() as userdata:
|
||||
if "modules" not in userdata.keys():
|
||||
userdata["modules"] = {}
|
||||
|
||||
for service in get_all_services():
|
||||
name = service.get_id()
|
||||
if name in userdata.keys():
|
||||
field_content = userdata[name]
|
||||
userdata["modules"][name] = field_content
|
||||
del userdata[name]
|
||||
|
||||
|
||||
# If you ever want to get rid of modules field you will need to get rid of this migration
|
||||
class CreateModulesField(Migration):
|
||||
"""introduce 'modules' (services) into userdata"""
|
||||
|
||||
def get_migration_name(self):
|
||||
return "modules_in_json"
|
||||
|
||||
def get_migration_description(self):
|
||||
return "Group service settings into a 'modules' field in userdata.json"
|
||||
|
||||
def is_migration_needed(self) -> bool:
|
||||
try:
|
||||
with ReadUserData() as userdata:
|
||||
for service in get_all_services():
|
||||
if service.get_id() in userdata.keys():
|
||||
return True
|
||||
|
||||
if "modules" not in userdata.keys():
|
||||
return True
|
||||
return False
|
||||
except Exception as e:
|
||||
print(e)
|
||||
return False
|
||||
|
||||
def migrate(self):
|
||||
# Write info about providers to userdata.json
|
||||
try:
|
||||
migrate_services_to_modules()
|
||||
print("Done")
|
||||
except Exception as e:
|
||||
print(e)
|
||||
print("Error migrating service fields")
|
|
@ -1,51 +0,0 @@
|
|||
import os
|
||||
import subprocess
|
||||
|
||||
from selfprivacy_api.migrations.migration import Migration
|
||||
from selfprivacy_api.utils import ReadUserData, WriteUserData
|
||||
from selfprivacy_api.utils.block_devices import BlockDevices
|
||||
|
||||
|
||||
class MountVolume(Migration):
|
||||
"""Mount volume."""
|
||||
|
||||
def get_migration_name(self):
|
||||
return "mount_volume"
|
||||
|
||||
def get_migration_description(self):
|
||||
return "Mount volume if it is not mounted."
|
||||
|
||||
def is_migration_needed(self):
|
||||
try:
|
||||
with ReadUserData() as userdata:
|
||||
return "volumes" not in userdata
|
||||
except Exception as e:
|
||||
print(e)
|
||||
return False
|
||||
|
||||
def migrate(self):
|
||||
# Get info about existing volumes
|
||||
# Write info about volumes to userdata.json
|
||||
try:
|
||||
volumes = BlockDevices().get_block_devices()
|
||||
# If there is an unmounted volume sdb,
|
||||
# Write it to userdata.json
|
||||
is_there_a_volume = False
|
||||
for volume in volumes:
|
||||
if volume.name == "sdb":
|
||||
is_there_a_volume = True
|
||||
break
|
||||
with WriteUserData() as userdata:
|
||||
userdata["volumes"] = []
|
||||
if is_there_a_volume:
|
||||
userdata["volumes"].append(
|
||||
{
|
||||
"device": "/dev/sdb",
|
||||
"mountPoint": "/volumes/sdb",
|
||||
"fsType": "ext4",
|
||||
}
|
||||
)
|
||||
print("Done")
|
||||
except Exception as e:
|
||||
print(e)
|
||||
print("Error mounting volume")
|
|
@ -1,58 +0,0 @@
|
|||
import os
|
||||
import subprocess
|
||||
|
||||
from selfprivacy_api.migrations.migration import Migration
|
||||
|
||||
|
||||
class MigrateToSelfprivacyChannelFrom2205(Migration):
|
||||
"""Migrate to selfprivacy Nix channel.
|
||||
For some reason NixOS 22.05 servers initialized with the nixos channel instead of selfprivacy.
|
||||
This stops us from upgrading to NixOS 22.11
|
||||
"""
|
||||
|
||||
def get_migration_name(self):
|
||||
return "migrate_to_selfprivacy_channel_from_2205"
|
||||
|
||||
def get_migration_description(self):
|
||||
return "Migrate to selfprivacy Nix channel from NixOS 22.05."
|
||||
|
||||
def is_migration_needed(self):
|
||||
try:
|
||||
output = subprocess.check_output(
|
||||
["nix-channel", "--list"], start_new_session=True
|
||||
)
|
||||
output = output.decode("utf-8")
|
||||
first_line = output.split("\n", maxsplit=1)[0]
|
||||
return first_line.startswith("nixos") and (
|
||||
first_line.endswith("nixos-22.05")
|
||||
)
|
||||
except subprocess.CalledProcessError:
|
||||
return False
|
||||
|
||||
def migrate(self):
|
||||
# Change the channel and update them.
|
||||
# Also, go to /etc/nixos directory and make a git pull
|
||||
current_working_directory = os.getcwd()
|
||||
try:
|
||||
print("Changing channel")
|
||||
os.chdir("/etc/nixos")
|
||||
subprocess.check_output(
|
||||
[
|
||||
"nix-channel",
|
||||
"--add",
|
||||
"https://channel.selfprivacy.org/nixos-selfpricacy",
|
||||
"nixos",
|
||||
]
|
||||
)
|
||||
subprocess.check_output(["nix-channel", "--update"])
|
||||
nixos_config_branch = subprocess.check_output(
|
||||
["git", "rev-parse", "--abbrev-ref", "HEAD"], start_new_session=True
|
||||
)
|
||||
if nixos_config_branch.decode("utf-8").strip() == "api-redis":
|
||||
print("Also changing nixos-config branch from api-redis to master")
|
||||
subprocess.check_output(["git", "checkout", "master"])
|
||||
subprocess.check_output(["git", "pull"])
|
||||
os.chdir(current_working_directory)
|
||||
except subprocess.CalledProcessError:
|
||||
os.chdir(current_working_directory)
|
||||
print("Error")
|
|
@ -1,58 +0,0 @@
|
|||
import os
|
||||
import subprocess
|
||||
|
||||
from selfprivacy_api.migrations.migration import Migration
|
||||
|
||||
|
||||
class MigrateToSelfprivacyChannelFrom2211(Migration):
|
||||
"""Migrate to selfprivacy Nix channel.
|
||||
For some reason NixOS 22.11 servers initialized with the nixos channel instead of selfprivacy.
|
||||
This stops us from upgrading to NixOS 23.05
|
||||
"""
|
||||
|
||||
def get_migration_name(self):
|
||||
return "migrate_to_selfprivacy_channel_from_2211"
|
||||
|
||||
def get_migration_description(self):
|
||||
return "Migrate to selfprivacy Nix channel from NixOS 22.11."
|
||||
|
||||
def is_migration_needed(self):
|
||||
try:
|
||||
output = subprocess.check_output(
|
||||
["nix-channel", "--list"], start_new_session=True
|
||||
)
|
||||
output = output.decode("utf-8")
|
||||
first_line = output.split("\n", maxsplit=1)[0]
|
||||
return first_line.startswith("nixos") and (
|
||||
first_line.endswith("nixos-22.11")
|
||||
)
|
||||
except subprocess.CalledProcessError:
|
||||
return False
|
||||
|
||||
def migrate(self):
|
||||
# Change the channel and update them.
|
||||
# Also, go to /etc/nixos directory and make a git pull
|
||||
current_working_directory = os.getcwd()
|
||||
try:
|
||||
print("Changing channel")
|
||||
os.chdir("/etc/nixos")
|
||||
subprocess.check_output(
|
||||
[
|
||||
"nix-channel",
|
||||
"--add",
|
||||
"https://channel.selfprivacy.org/nixos-selfpricacy",
|
||||
"nixos",
|
||||
]
|
||||
)
|
||||
subprocess.check_output(["nix-channel", "--update"])
|
||||
nixos_config_branch = subprocess.check_output(
|
||||
["git", "rev-parse", "--abbrev-ref", "HEAD"], start_new_session=True
|
||||
)
|
||||
if nixos_config_branch.decode("utf-8").strip() == "api-redis":
|
||||
print("Also changing nixos-config branch from api-redis to master")
|
||||
subprocess.check_output(["git", "checkout", "master"])
|
||||
subprocess.check_output(["git", "pull"])
|
||||
os.chdir(current_working_directory)
|
||||
except subprocess.CalledProcessError:
|
||||
os.chdir(current_working_directory)
|
||||
print("Error")
|
|
@ -1,43 +0,0 @@
|
|||
from selfprivacy_api.migrations.migration import Migration
|
||||
from selfprivacy_api.utils import ReadUserData, WriteUserData
|
||||
|
||||
|
||||
class CreateProviderFields(Migration):
|
||||
"""Unhardcode providers"""
|
||||
|
||||
def get_migration_name(self):
|
||||
return "create_provider_fields"
|
||||
|
||||
def get_migration_description(self):
|
||||
return "Add DNS, backup and server provider fields to enable user to choose between different clouds and to make the deployment adapt to these preferences."
|
||||
|
||||
def is_migration_needed(self):
|
||||
try:
|
||||
with ReadUserData() as userdata:
|
||||
return "dns" not in userdata
|
||||
except Exception as e:
|
||||
print(e)
|
||||
return False
|
||||
|
||||
def migrate(self):
|
||||
# Write info about providers to userdata.json
|
||||
try:
|
||||
with WriteUserData() as userdata:
|
||||
userdata["dns"] = {
|
||||
"provider": "CLOUDFLARE",
|
||||
"apiKey": userdata["cloudflare"]["apiKey"],
|
||||
}
|
||||
userdata["server"] = {
|
||||
"provider": "HETZNER",
|
||||
}
|
||||
userdata["backup"] = {
|
||||
"provider": "BACKBLAZE",
|
||||
"accountId": userdata["backblaze"]["accountId"],
|
||||
"accountKey": userdata["backblaze"]["accountKey"],
|
||||
"bucket": userdata["backblaze"]["bucket"],
|
||||
}
|
||||
|
||||
print("Done")
|
||||
except Exception as e:
|
||||
print(e)
|
||||
print("Error migrating provider fields")
|
|
@ -1,48 +0,0 @@
|
|||
from selfprivacy_api.migrations.migration import Migration
|
||||
|
||||
from selfprivacy_api.repositories.tokens.json_tokens_repository import (
|
||||
JsonTokensRepository,
|
||||
)
|
||||
from selfprivacy_api.repositories.tokens.redis_tokens_repository import (
|
||||
RedisTokensRepository,
|
||||
)
|
||||
from selfprivacy_api.repositories.tokens.abstract_tokens_repository import (
|
||||
AbstractTokensRepository,
|
||||
)
|
||||
|
||||
|
||||
class LoadTokensToRedis(Migration):
|
||||
"""Load Json tokens into Redis"""
|
||||
|
||||
def get_migration_name(self):
|
||||
return "load_tokens_to_redis"
|
||||
|
||||
def get_migration_description(self):
|
||||
return "Loads access tokens and recovery keys from legacy json file into redis token storage"
|
||||
|
||||
def is_repo_empty(self, repo: AbstractTokensRepository) -> bool:
|
||||
if repo.get_tokens() != []:
|
||||
return False
|
||||
if repo.get_recovery_key() is not None:
|
||||
return False
|
||||
return True
|
||||
|
||||
def is_migration_needed(self):
|
||||
try:
|
||||
if not self.is_repo_empty(JsonTokensRepository()) and self.is_repo_empty(
|
||||
RedisTokensRepository()
|
||||
):
|
||||
return True
|
||||
except Exception as e:
|
||||
print(e)
|
||||
return False
|
||||
|
||||
def migrate(self):
|
||||
# Write info about providers to userdata.json
|
||||
try:
|
||||
RedisTokensRepository().clone(JsonTokensRepository())
|
||||
|
||||
print("Done")
|
||||
except Exception as e:
|
||||
print(e)
|
||||
print("Error migrating access tokens from json to redis")
|
63
selfprivacy_api/migrations/write_token_to_redis.py
Normal file
63
selfprivacy_api/migrations/write_token_to_redis.py
Normal file
|
@ -0,0 +1,63 @@
|
|||
from datetime import datetime
|
||||
from typing import Optional
|
||||
from selfprivacy_api.migrations.migration import Migration
|
||||
from selfprivacy_api.models.tokens.token import Token
|
||||
|
||||
from selfprivacy_api.repositories.tokens.redis_tokens_repository import (
|
||||
RedisTokensRepository,
|
||||
)
|
||||
from selfprivacy_api.repositories.tokens.abstract_tokens_repository import (
|
||||
AbstractTokensRepository,
|
||||
)
|
||||
from selfprivacy_api.utils import ReadUserData, UserDataFiles
|
||||
|
||||
|
||||
class WriteTokenToRedis(Migration):
|
||||
"""Load Json tokens into Redis"""
|
||||
|
||||
def get_migration_name(self):
|
||||
return "write_token_to_redis"
|
||||
|
||||
def get_migration_description(self):
|
||||
return "Loads the initial token into redis token storage"
|
||||
|
||||
def is_repo_empty(self, repo: AbstractTokensRepository) -> bool:
|
||||
if repo.get_tokens() != []:
|
||||
return False
|
||||
return True
|
||||
|
||||
def get_token_from_json(self) -> Optional[Token]:
|
||||
try:
|
||||
with ReadUserData(UserDataFiles.SECRETS) as userdata:
|
||||
return Token(
|
||||
token=userdata["api"]["token"],
|
||||
device_name="Initial device",
|
||||
created_at=datetime.now(),
|
||||
)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
return None
|
||||
|
||||
def is_migration_needed(self):
|
||||
try:
|
||||
if self.get_token_from_json() is not None and self.is_repo_empty(
|
||||
RedisTokensRepository()
|
||||
):
|
||||
return True
|
||||
except Exception as e:
|
||||
print(e)
|
||||
return False
|
||||
|
||||
def migrate(self):
|
||||
# Write info about providers to userdata.json
|
||||
try:
|
||||
token = self.get_token_from_json()
|
||||
if token is None:
|
||||
print("No token found in secrets.json")
|
||||
return
|
||||
RedisTokensRepository()._store_token(token)
|
||||
|
||||
print("Done")
|
||||
except Exception as e:
|
||||
print(e)
|
||||
print("Error migrating access tokens from json to redis")
|
|
@ -1,8 +0,0 @@
|
|||
from selfprivacy_api.repositories.tokens.abstract_tokens_repository import (
|
||||
AbstractTokensRepository,
|
||||
)
|
||||
from selfprivacy_api.repositories.tokens.json_tokens_repository import (
|
||||
JsonTokensRepository,
|
||||
)
|
||||
|
||||
repository = JsonTokensRepository()
|
|
@ -1,153 +0,0 @@
|
|||
"""
|
||||
temporary legacy
|
||||
"""
|
||||
from typing import Optional
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from selfprivacy_api.utils import UserDataFiles, WriteUserData, ReadUserData
|
||||
from selfprivacy_api.models.tokens.token import Token
|
||||
from selfprivacy_api.models.tokens.recovery_key import RecoveryKey
|
||||
from selfprivacy_api.models.tokens.new_device_key import NewDeviceKey
|
||||
from selfprivacy_api.repositories.tokens.exceptions import (
|
||||
TokenNotFound,
|
||||
)
|
||||
from selfprivacy_api.repositories.tokens.abstract_tokens_repository import (
|
||||
AbstractTokensRepository,
|
||||
)
|
||||
|
||||
|
||||
DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"
|
||||
|
||||
|
||||
class JsonTokensRepository(AbstractTokensRepository):
|
||||
def get_tokens(self) -> list[Token]:
|
||||
"""Get the tokens"""
|
||||
tokens_list = []
|
||||
|
||||
with ReadUserData(UserDataFiles.TOKENS) as tokens_file:
|
||||
for userdata_token in tokens_file["tokens"]:
|
||||
tokens_list.append(
|
||||
Token(
|
||||
token=userdata_token["token"],
|
||||
device_name=userdata_token["name"],
|
||||
created_at=userdata_token["date"],
|
||||
)
|
||||
)
|
||||
|
||||
return tokens_list
|
||||
|
||||
def _store_token(self, new_token: Token):
|
||||
"""Store a token directly"""
|
||||
with WriteUserData(UserDataFiles.TOKENS) as tokens_file:
|
||||
tokens_file["tokens"].append(
|
||||
{
|
||||
"token": new_token.token,
|
||||
"name": new_token.device_name,
|
||||
"date": new_token.created_at.strftime(DATETIME_FORMAT),
|
||||
}
|
||||
)
|
||||
|
||||
def delete_token(self, input_token: Token) -> None:
|
||||
"""Delete the token"""
|
||||
with WriteUserData(UserDataFiles.TOKENS) as tokens_file:
|
||||
for userdata_token in tokens_file["tokens"]:
|
||||
if userdata_token["token"] == input_token.token:
|
||||
tokens_file["tokens"].remove(userdata_token)
|
||||
return
|
||||
|
||||
raise TokenNotFound("Token not found!")
|
||||
|
||||
def __key_date_from_str(self, date_string: str) -> datetime:
|
||||
if date_string is None or date_string == "":
|
||||
return None
|
||||
# we assume that we store dates in json as naive utc
|
||||
utc_no_tz = datetime.fromisoformat(date_string)
|
||||
utc_with_tz = utc_no_tz.replace(tzinfo=timezone.utc)
|
||||
return utc_with_tz
|
||||
|
||||
def __date_from_tokens_file(
|
||||
self, tokens_file: object, tokenfield: str, datefield: str
|
||||
):
|
||||
date_string = tokens_file[tokenfield].get(datefield)
|
||||
return self.__key_date_from_str(date_string)
|
||||
|
||||
def get_recovery_key(self) -> Optional[RecoveryKey]:
|
||||
"""Get the recovery key"""
|
||||
with ReadUserData(UserDataFiles.TOKENS) as tokens_file:
|
||||
|
||||
if (
|
||||
"recovery_token" not in tokens_file
|
||||
or tokens_file["recovery_token"] is None
|
||||
):
|
||||
return
|
||||
|
||||
recovery_key = RecoveryKey(
|
||||
key=tokens_file["recovery_token"].get("token"),
|
||||
created_at=self.__date_from_tokens_file(
|
||||
tokens_file, "recovery_token", "date"
|
||||
),
|
||||
expires_at=self.__date_from_tokens_file(
|
||||
tokens_file, "recovery_token", "expiration"
|
||||
),
|
||||
uses_left=tokens_file["recovery_token"].get("uses_left"),
|
||||
)
|
||||
|
||||
return recovery_key
|
||||
|
||||
def _store_recovery_key(self, recovery_key: RecoveryKey) -> None:
|
||||
with WriteUserData(UserDataFiles.TOKENS) as tokens_file:
|
||||
key_expiration: Optional[str] = None
|
||||
if recovery_key.expires_at is not None:
|
||||
key_expiration = recovery_key.expires_at.strftime(DATETIME_FORMAT)
|
||||
tokens_file["recovery_token"] = {
|
||||
"token": recovery_key.key,
|
||||
"date": recovery_key.created_at.strftime(DATETIME_FORMAT),
|
||||
"expiration": key_expiration,
|
||||
"uses_left": recovery_key.uses_left,
|
||||
}
|
||||
|
||||
def _decrement_recovery_token(self):
|
||||
"""Decrement recovery key use count by one"""
|
||||
if self.is_recovery_key_valid():
|
||||
with WriteUserData(UserDataFiles.TOKENS) as tokens:
|
||||
if tokens["recovery_token"]["uses_left"] is not None:
|
||||
tokens["recovery_token"]["uses_left"] -= 1
|
||||
|
||||
def _delete_recovery_key(self) -> None:
|
||||
"""Delete the recovery key"""
|
||||
with WriteUserData(UserDataFiles.TOKENS) as tokens_file:
|
||||
if "recovery_token" in tokens_file:
|
||||
del tokens_file["recovery_token"]
|
||||
return
|
||||
|
||||
def _store_new_device_key(self, new_device_key: NewDeviceKey) -> None:
|
||||
with WriteUserData(UserDataFiles.TOKENS) as tokens_file:
|
||||
tokens_file["new_device"] = {
|
||||
"token": new_device_key.key,
|
||||
"date": new_device_key.created_at.strftime(DATETIME_FORMAT),
|
||||
"expiration": new_device_key.expires_at.strftime(DATETIME_FORMAT),
|
||||
}
|
||||
|
||||
def delete_new_device_key(self) -> None:
|
||||
"""Delete the new device key"""
|
||||
with WriteUserData(UserDataFiles.TOKENS) as tokens_file:
|
||||
if "new_device" in tokens_file:
|
||||
del tokens_file["new_device"]
|
||||
return
|
||||
|
||||
def _get_stored_new_device_key(self) -> Optional[NewDeviceKey]:
|
||||
"""Retrieves new device key that is already stored."""
|
||||
with ReadUserData(UserDataFiles.TOKENS) as tokens_file:
|
||||
if "new_device" not in tokens_file or tokens_file["new_device"] is None:
|
||||
return
|
||||
|
||||
new_device_key = NewDeviceKey(
|
||||
key=tokens_file["new_device"]["token"],
|
||||
created_at=self.__date_from_tokens_file(
|
||||
tokens_file, "new_device", "date"
|
||||
),
|
||||
expires_at=self.__date_from_tokens_file(
|
||||
tokens_file, "new_device", "expiration"
|
||||
),
|
||||
)
|
||||
return new_device_key
|
|
@ -3,7 +3,7 @@
|
|||
import typing
|
||||
from selfprivacy_api.services.bitwarden import Bitwarden
|
||||
from selfprivacy_api.services.gitea import Gitea
|
||||
from selfprivacy_api.services.jitsi import Jitsi
|
||||
from selfprivacy_api.services.jitsimeet import JitsiMeet
|
||||
from selfprivacy_api.services.mailserver import MailServer
|
||||
from selfprivacy_api.services.nextcloud import Nextcloud
|
||||
from selfprivacy_api.services.pleroma import Pleroma
|
||||
|
@ -18,7 +18,7 @@ services: list[Service] = [
|
|||
Nextcloud(),
|
||||
Pleroma(),
|
||||
Ocserv(),
|
||||
Jitsi(),
|
||||
JitsiMeet(),
|
||||
]
|
||||
|
||||
|
||||
|
|
|
@ -244,9 +244,11 @@ def move_service(
|
|||
progress=95,
|
||||
)
|
||||
with WriteUserData() as user_data:
|
||||
if userdata_location not in user_data:
|
||||
user_data[userdata_location] = {}
|
||||
user_data[userdata_location]["location"] = volume.name
|
||||
if "modules" not in user_data:
|
||||
user_data["modules"] = {}
|
||||
if userdata_location not in user_data["modules"]:
|
||||
user_data["modules"][userdata_location] = {}
|
||||
user_data["modules"][userdata_location]["location"] = volume.name
|
||||
# Start service
|
||||
service.start()
|
||||
Jobs.update(
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
"""Class representing Jitsi service"""
|
||||
"""Class representing Jitsi Meet service"""
|
||||
import base64
|
||||
import subprocess
|
||||
import typing
|
||||
|
@ -11,26 +11,26 @@ from selfprivacy_api.services.service import Service, ServiceDnsRecord, ServiceS
|
|||
from selfprivacy_api.utils import ReadUserData, WriteUserData, get_domain
|
||||
from selfprivacy_api.utils.block_devices import BlockDevice
|
||||
import selfprivacy_api.utils.network as network_utils
|
||||
from selfprivacy_api.services.jitsi.icon import JITSI_ICON
|
||||
from selfprivacy_api.services.jitsimeet.icon import JITSI_ICON
|
||||
|
||||
|
||||
class Jitsi(Service):
|
||||
class JitsiMeet(Service):
|
||||
"""Class representing Jitsi service"""
|
||||
|
||||
@staticmethod
|
||||
def get_id() -> str:
|
||||
"""Return service id."""
|
||||
return "jitsi"
|
||||
return "jitsi-meet"
|
||||
|
||||
@staticmethod
|
||||
def get_display_name() -> str:
|
||||
"""Return service display name."""
|
||||
return "Jitsi"
|
||||
return "JitsiMeet"
|
||||
|
||||
@staticmethod
|
||||
def get_description() -> str:
|
||||
"""Return service description."""
|
||||
return "Jitsi is a free and open-source video conferencing solution."
|
||||
return "Jitsi Meet is a free and open-source video conferencing solution."
|
||||
|
||||
@staticmethod
|
||||
def get_svg_icon() -> str:
|
||||
|
@ -123,4 +123,4 @@ class Jitsi(Service):
|
|||
]
|
||||
|
||||
def move_to_volume(self, volume: BlockDevice) -> Job:
|
||||
raise NotImplementedError("jitsi service is not movable")
|
||||
raise NotImplementedError("jitsi-meet service is not movable")
|
|
@ -21,7 +21,7 @@ class MailServer(Service):
|
|||
|
||||
@staticmethod
|
||||
def get_id() -> str:
|
||||
return "email"
|
||||
return "simple-nixos-mailserver"
|
||||
|
||||
@staticmethod
|
||||
def get_display_name() -> str:
|
||||
|
@ -173,7 +173,7 @@ class MailServer(Service):
|
|||
volume,
|
||||
job,
|
||||
FolderMoveNames.default_foldermoves(self),
|
||||
"email",
|
||||
"simple-nixos-mailserver",
|
||||
)
|
||||
|
||||
return job
|
||||
|
|
|
@ -225,10 +225,14 @@ class Service(ABC):
|
|||
return root_device
|
||||
with utils.ReadUserData() as userdata:
|
||||
if userdata.get("useBinds", False):
|
||||
return userdata.get(cls.get_id(), {}).get(
|
||||
return (
|
||||
userdata.get("modules", {})
|
||||
.get(cls.get_id(), {})
|
||||
.get(
|
||||
"location",
|
||||
root_device,
|
||||
)
|
||||
)
|
||||
else:
|
||||
return root_device
|
||||
|
||||
|
|
|
@ -9,10 +9,8 @@ import portalocker
|
|||
import typing
|
||||
|
||||
|
||||
USERDATA_FILE = "/etc/nixos/userdata/userdata.json"
|
||||
TOKENS_FILE = "/etc/nixos/userdata/tokens.json"
|
||||
JOBS_FILE = "/etc/nixos/userdata/jobs.json"
|
||||
DOMAIN_FILE = "/var/domain"
|
||||
USERDATA_FILE = "/etc/nixos/userdata.json"
|
||||
SECRETS_FILE = "/etc/selfprivacy/secrets.json"
|
||||
DKIM_DIR = "/var/dkim/"
|
||||
|
||||
|
||||
|
@ -20,15 +18,13 @@ class UserDataFiles(Enum):
|
|||
"""Enum for userdata files"""
|
||||
|
||||
USERDATA = 0
|
||||
TOKENS = 1
|
||||
JOBS = 2
|
||||
SECRETS = 3
|
||||
|
||||
|
||||
def get_domain():
|
||||
"""Get domain from /var/domain without trailing new line"""
|
||||
with open(DOMAIN_FILE, "r", encoding="utf-8") as domain_file:
|
||||
domain = domain_file.readline().rstrip()
|
||||
return domain
|
||||
"""Get domain from userdata.json"""
|
||||
with ReadUserData() as user_data:
|
||||
return user_data["domain"]
|
||||
|
||||
|
||||
class WriteUserData(object):
|
||||
|
@ -37,14 +33,12 @@ class WriteUserData(object):
|
|||
def __init__(self, file_type=UserDataFiles.USERDATA):
|
||||
if file_type == UserDataFiles.USERDATA:
|
||||
self.userdata_file = open(USERDATA_FILE, "r+", encoding="utf-8")
|
||||
elif file_type == UserDataFiles.TOKENS:
|
||||
self.userdata_file = open(TOKENS_FILE, "r+", encoding="utf-8")
|
||||
elif file_type == UserDataFiles.JOBS:
|
||||
elif file_type == UserDataFiles.SECRETS:
|
||||
# Make sure file exists
|
||||
if not os.path.exists(JOBS_FILE):
|
||||
with open(JOBS_FILE, "w", encoding="utf-8") as jobs_file:
|
||||
jobs_file.write("{}")
|
||||
self.userdata_file = open(JOBS_FILE, "r+", encoding="utf-8")
|
||||
if not os.path.exists(SECRETS_FILE):
|
||||
with open(SECRETS_FILE, "w", encoding="utf-8") as secrets_file:
|
||||
secrets_file.write("{}")
|
||||
self.userdata_file = open(SECRETS_FILE, "r+", encoding="utf-8")
|
||||
else:
|
||||
raise ValueError("Unknown file type")
|
||||
portalocker.lock(self.userdata_file, portalocker.LOCK_EX)
|
||||
|
@ -68,14 +62,11 @@ class ReadUserData(object):
|
|||
def __init__(self, file_type=UserDataFiles.USERDATA):
|
||||
if file_type == UserDataFiles.USERDATA:
|
||||
self.userdata_file = open(USERDATA_FILE, "r", encoding="utf-8")
|
||||
elif file_type == UserDataFiles.TOKENS:
|
||||
self.userdata_file = open(TOKENS_FILE, "r", encoding="utf-8")
|
||||
elif file_type == UserDataFiles.JOBS:
|
||||
# Make sure file exists
|
||||
if not os.path.exists(JOBS_FILE):
|
||||
with open(JOBS_FILE, "w", encoding="utf-8") as jobs_file:
|
||||
jobs_file.write("{}")
|
||||
self.userdata_file = open(JOBS_FILE, "r", encoding="utf-8")
|
||||
elif file_type == UserDataFiles.SECRETS:
|
||||
if not os.path.exists(SECRETS_FILE):
|
||||
with open(SECRETS_FILE, "w", encoding="utf-8") as secrets_file:
|
||||
secrets_file.write("{}")
|
||||
self.userdata_file = open(SECRETS_FILE, "r", encoding="utf-8")
|
||||
else:
|
||||
raise ValueError("Unknown file type")
|
||||
portalocker.lock(self.userdata_file, portalocker.LOCK_SH)
|
||||
|
|
|
@ -2,14 +2,15 @@
|
|||
import os
|
||||
from huey import SqliteHuey
|
||||
|
||||
HUEY_DATABASE = "/etc/nixos/userdata/tasks.db"
|
||||
HUEY_DATABASE = "/etc/selfprivacy/tasks.db"
|
||||
|
||||
# Singleton instance containing the huey database.
|
||||
|
||||
test_mode = os.environ.get("TEST_MODE")
|
||||
|
||||
huey = SqliteHuey(
|
||||
HUEY_DATABASE,
|
||||
"selfprivacy-api",
|
||||
filename=HUEY_DATABASE if not test_mode else None,
|
||||
immediate=test_mode == "true",
|
||||
utc=True,
|
||||
)
|
||||
|
|
|
@ -7,28 +7,28 @@ RECOVERY_KEY_VALIDATION_DATETIME = "selfprivacy_api.models.tokens.time.datetime"
|
|||
DEVICE_KEY_VALIDATION_DATETIME = RECOVERY_KEY_VALIDATION_DATETIME
|
||||
|
||||
|
||||
def five_minutes_into_future_naive():
|
||||
return datetime.now() + timedelta(minutes=5)
|
||||
def ten_minutes_into_future_naive():
|
||||
return datetime.now() + timedelta(minutes=10)
|
||||
|
||||
|
||||
def five_minutes_into_future_naive_utc():
|
||||
return datetime.utcnow() + timedelta(minutes=5)
|
||||
def ten_minutes_into_future_naive_utc():
|
||||
return datetime.utcnow() + timedelta(minutes=10)
|
||||
|
||||
|
||||
def five_minutes_into_future():
|
||||
return datetime.now(timezone.utc) + timedelta(minutes=5)
|
||||
def ten_minutes_into_future():
|
||||
return datetime.now(timezone.utc) + timedelta(minutes=10)
|
||||
|
||||
|
||||
def five_minutes_into_past_naive():
|
||||
return datetime.now() - timedelta(minutes=5)
|
||||
def ten_minutes_into_past_naive():
|
||||
return datetime.now() - timedelta(minutes=10)
|
||||
|
||||
|
||||
def five_minutes_into_past_naive_utc():
|
||||
return datetime.utcnow() - timedelta(minutes=5)
|
||||
def ten_minutes_into_past_naive_utc():
|
||||
return datetime.utcnow() - timedelta(minutes=10)
|
||||
|
||||
|
||||
def five_minutes_into_past():
|
||||
return datetime.now(timezone.utc) - timedelta(minutes=5)
|
||||
def ten_minutes_into_past():
|
||||
return datetime.now(timezone.utc) - timedelta(minutes=10)
|
||||
|
||||
|
||||
class NearFuture(datetime):
|
||||
|
|
|
@ -9,6 +9,7 @@ from os import path
|
|||
from os import makedirs
|
||||
from typing import Generator
|
||||
from fastapi.testclient import TestClient
|
||||
from selfprivacy_api.models.tokens.token import Token
|
||||
|
||||
from selfprivacy_api.utils.huey import huey
|
||||
|
||||
|
@ -16,22 +17,14 @@ import selfprivacy_api.services as services
|
|||
from selfprivacy_api.services import get_service_by_id, Service
|
||||
from selfprivacy_api.services.test_service import DummyService
|
||||
|
||||
from selfprivacy_api.models.tokens.token import Token
|
||||
from selfprivacy_api.repositories.tokens.json_tokens_repository import (
|
||||
JsonTokensRepository,
|
||||
)
|
||||
from selfprivacy_api.repositories.tokens.redis_tokens_repository import (
|
||||
RedisTokensRepository,
|
||||
)
|
||||
|
||||
from tests.common import read_json
|
||||
|
||||
TESTFILE_BODY = "testytest!"
|
||||
TESTFILE_2_BODY = "testissimo!"
|
||||
|
||||
EMPTY_TOKENS_JSON = ' {"tokens": []}'
|
||||
|
||||
|
||||
TOKENS_FILE_CONTENTS = {
|
||||
"tokens": [
|
||||
{
|
||||
|
@ -47,6 +40,19 @@ TOKENS_FILE_CONTENTS = {
|
|||
]
|
||||
}
|
||||
|
||||
TOKENS = [
|
||||
Token(
|
||||
token="TEST_TOKEN",
|
||||
device_name="test_token",
|
||||
created_at=datetime.datetime(2022, 1, 14, 8, 31, 10, 789314),
|
||||
),
|
||||
Token(
|
||||
token="TEST_TOKEN2",
|
||||
device_name="test_token2",
|
||||
created_at=datetime.datetime(2022, 1, 14, 8, 31, 10, 789314),
|
||||
),
|
||||
]
|
||||
|
||||
DEVICE_WE_AUTH_TESTS_WITH = TOKENS_FILE_CONTENTS["tokens"][0]
|
||||
|
||||
|
||||
|
@ -58,25 +64,6 @@ def global_data_dir():
|
|||
return path.join(path.dirname(__file__), "data")
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def empty_tokens(mocker, tmpdir):
|
||||
tokenfile = tmpdir / "empty_tokens.json"
|
||||
with open(tokenfile, "w") as file:
|
||||
file.write(EMPTY_TOKENS_JSON)
|
||||
mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=tokenfile)
|
||||
assert read_json(tokenfile)["tokens"] == []
|
||||
return tmpdir
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def empty_json_repo(empty_tokens):
|
||||
repo = JsonTokensRepository()
|
||||
for token in repo.get_tokens():
|
||||
repo.delete_token(token)
|
||||
assert repo.get_tokens() == []
|
||||
return repo
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def empty_redis_repo():
|
||||
repo = RedisTokensRepository()
|
||||
|
@ -86,25 +73,14 @@ def empty_redis_repo():
|
|||
|
||||
|
||||
@pytest.fixture
|
||||
def tokens_file(empty_redis_repo, tmpdir):
|
||||
"""A state with tokens"""
|
||||
repo = empty_redis_repo
|
||||
for token in TOKENS_FILE_CONTENTS["tokens"]:
|
||||
repo._store_token(
|
||||
Token(
|
||||
token=token["token"],
|
||||
device_name=token["name"],
|
||||
created_at=token["date"],
|
||||
def redis_repo_with_tokens():
|
||||
repo = RedisTokensRepository()
|
||||
repo.reset()
|
||||
for token in TOKENS:
|
||||
repo._store_token(token)
|
||||
assert sorted(repo.get_tokens(), key=lambda x: x.token) == sorted(
|
||||
TOKENS, key=lambda x: x.token
|
||||
)
|
||||
)
|
||||
return repo
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def jobs_file(mocker, shared_datadir):
|
||||
"""Mock tokens file."""
|
||||
mock = mocker.patch("selfprivacy_api.utils.JOBS_FILE", shared_datadir / "jobs.json")
|
||||
return mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
|
@ -131,14 +107,14 @@ def huey_database(mocker, shared_datadir):
|
|||
|
||||
|
||||
@pytest.fixture
|
||||
def client(tokens_file, huey_database, jobs_file):
|
||||
def client(huey_database, redis_repo_with_tokens):
|
||||
from selfprivacy_api.app import app
|
||||
|
||||
return TestClient(app)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def authorized_client(tokens_file, huey_database, jobs_file):
|
||||
def authorized_client(huey_database, redis_repo_with_tokens):
|
||||
"""Authorized test client fixture."""
|
||||
from selfprivacy_api.app import app
|
||||
|
||||
|
@ -150,7 +126,7 @@ def authorized_client(tokens_file, huey_database, jobs_file):
|
|||
|
||||
|
||||
@pytest.fixture
|
||||
def wrong_auth_client(tokens_file, huey_database, jobs_file):
|
||||
def wrong_auth_client(huey_database, redis_repo_with_tokens):
|
||||
"""Wrong token test client fixture."""
|
||||
from selfprivacy_api.app import app
|
||||
|
||||
|
|
|
@ -1 +0,0 @@
|
|||
{}
|
|
@ -1,14 +0,0 @@
|
|||
{
|
||||
"tokens": [
|
||||
{
|
||||
"token": "TEST_TOKEN",
|
||||
"name": "test_token",
|
||||
"date": "2022-01-14 08:31:10.789314"
|
||||
},
|
||||
{
|
||||
"token": "TEST_TOKEN2",
|
||||
"name": "test_token2",
|
||||
"date": "2022-01-14 08:31:10.789314"
|
||||
}
|
||||
]
|
||||
}
|
|
@ -1,40 +1,20 @@
|
|||
{
|
||||
"api": {"token": "TEST_TOKEN", "enableSwagger": false},
|
||||
"databasePassword": "PASSWORD",
|
||||
"domain": "test.tld",
|
||||
"dns": {
|
||||
"provider": "CLOUDFLARE",
|
||||
"useStagingACME": false
|
||||
},
|
||||
"server": {
|
||||
"provider": "HETZNER"
|
||||
},
|
||||
"domain": "test-domain.tld",
|
||||
"hashedMasterPassword": "HASHED_PASSWORD",
|
||||
"hostname": "test-instance",
|
||||
"resticPassword": "PASS",
|
||||
"ssh": {
|
||||
"enable": true,
|
||||
"passwordAuthentication": true,
|
||||
"rootKeys": ["ssh-ed25519 KEY test@pc"]
|
||||
},
|
||||
"timezone": "Etc/UTC",
|
||||
"username": "tester",
|
||||
"autoUpgrade": {"enable": true, "allowReboot": true},
|
||||
"useBinds": true,
|
||||
"timezone": "Europe/Moscow",
|
||||
"sshKeys": ["ssh-rsa KEY test@pc"],
|
||||
"dns": {"provider": "CLOUDFLARE", "apiKey": "TOKEN"},
|
||||
"server": {"provider": "HETZNER"},
|
||||
"modules": {
|
||||
"bitwarden": {"enable": true},
|
||||
"gitea": {"enable": true},
|
||||
"ocserv": {"enable": true},
|
||||
"pleroma": {"enable": true},
|
||||
"jitsi": {"enable": true},
|
||||
"nextcloud": {
|
||||
"adminPassword": "ADMIN",
|
||||
"databasePassword": "ADMIN",
|
||||
"enable": true
|
||||
}
|
||||
},
|
||||
"backup": {
|
||||
"provider": "BACKBLAZE",
|
||||
"accountId": "ID",
|
||||
"accountKey": "KEY",
|
||||
"bucket": "selfprivacy"
|
||||
},
|
||||
"sshKeys": [
|
||||
"ssh-rsa KEY test@pc"
|
||||
],
|
||||
"users": [
|
||||
{
|
||||
"username": "user1",
|
||||
|
@ -51,5 +31,57 @@
|
|||
"hashedPassword": "HASHED_PASSWORD_3",
|
||||
"sshKeys": ["ssh-rsa KEY user3@pc"]
|
||||
}
|
||||
],
|
||||
"autoUpgrade": {
|
||||
"enable": true,
|
||||
"allowReboot": true
|
||||
},
|
||||
"modules": {
|
||||
"bitwarden": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"gitea": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"jitsi-meet": {
|
||||
"enable": true
|
||||
},
|
||||
"nextcloud": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"ocserv": {
|
||||
"enable": true
|
||||
},
|
||||
"pleroma": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"simple-nixos-mailserver": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
}
|
||||
},
|
||||
"volumes": [
|
||||
{
|
||||
"device": "/dev/sdb",
|
||||
"mountPoint": "/volumes/sdb",
|
||||
"fsType": "ext4"
|
||||
}
|
||||
],
|
||||
"ssh": {
|
||||
"enable": true,
|
||||
"passwordAuthentication": true,
|
||||
"rootKeys": [
|
||||
"ssh-ed25519 KEY test@pc"
|
||||
]
|
||||
},
|
||||
"backup": {
|
||||
"provider": "BACKBLAZE",
|
||||
"accountId": "ID",
|
||||
"accountKey": "KEY",
|
||||
"bucket": "selfprivacy"
|
||||
}
|
||||
}
|
||||
|
|
|
@ -67,7 +67,7 @@ def only_root_in_userdata(mocker, datadir):
|
|||
read_json(datadir / "only_root.json")["volumes"][0]["mountPoint"]
|
||||
== "/volumes/sda1"
|
||||
)
|
||||
assert read_json(datadir / "only_root.json")["volumes"][0]["filesystem"] == "ext4"
|
||||
assert read_json(datadir / "only_root.json")["volumes"][0]["fsType"] == "ext4"
|
||||
return datadir
|
||||
|
||||
|
||||
|
|
|
@ -1,59 +1,59 @@
|
|||
{
|
||||
"api": {
|
||||
"token": "TEST_TOKEN",
|
||||
"enableSwagger": false
|
||||
"dns": {
|
||||
"provider": "CLOUDFLARE",
|
||||
"useStagingACME": false
|
||||
},
|
||||
"bitwarden": {
|
||||
"enable": true
|
||||
"server": {
|
||||
"provider": "HETZNER"
|
||||
},
|
||||
"databasePassword": "PASSWORD",
|
||||
"domain": "test.tld",
|
||||
"domain": "test-domain.tld",
|
||||
"hashedMasterPassword": "HASHED_PASSWORD",
|
||||
"hostname": "test-instance",
|
||||
"nextcloud": {
|
||||
"adminPassword": "ADMIN",
|
||||
"databasePassword": "ADMIN",
|
||||
"timezone": "Etc/UTC",
|
||||
"username": "tester",
|
||||
"useBinds": true,
|
||||
"sshKeys": [
|
||||
"ssh-rsa KEY test@pc"
|
||||
],
|
||||
"users": [],
|
||||
"autoUpgrade": {
|
||||
"enable": true,
|
||||
"allowReboot": true
|
||||
},
|
||||
"modules": {
|
||||
"bitwarden": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"gitea": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"jitsi-meet": {
|
||||
"enable": true
|
||||
},
|
||||
"resticPassword": "PASS",
|
||||
"nextcloud": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"ocserv": {
|
||||
"enable": true
|
||||
},
|
||||
"pleroma": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"simple-nixos-mailserver": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
}
|
||||
},
|
||||
"volumes": [],
|
||||
"ssh": {
|
||||
"enable": true,
|
||||
"passwordAuthentication": true,
|
||||
"rootKeys": [
|
||||
"ssh-ed25519 KEY test@pc"
|
||||
]
|
||||
},
|
||||
"username": "tester",
|
||||
"gitea": {
|
||||
"enable": false
|
||||
},
|
||||
"ocserv": {
|
||||
"enable": true
|
||||
},
|
||||
"pleroma": {
|
||||
"enable": true
|
||||
},
|
||||
"autoUpgrade": {
|
||||
"enable": true,
|
||||
"allowReboot": true
|
||||
},
|
||||
"timezone": "Europe/Moscow",
|
||||
"sshKeys": [
|
||||
"ssh-rsa KEY test@pc"
|
||||
],
|
||||
"dns": {
|
||||
"provider": "CLOUDFLARE",
|
||||
"apiKey": "TOKEN"
|
||||
},
|
||||
"server": {
|
||||
"provider": "HETZNER"
|
||||
},
|
||||
"backup": {
|
||||
"provider": "BACKBLAZE",
|
||||
"accountId": "ID",
|
||||
"accountKey": "KEY",
|
||||
"bucket": "selfprivacy"
|
||||
},
|
||||
"volumes": [
|
||||
]
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,64 +1,65 @@
|
|||
{
|
||||
"api": {
|
||||
"token": "TEST_TOKEN",
|
||||
"enableSwagger": false
|
||||
"dns": {
|
||||
"provider": "CLOUDFLARE",
|
||||
"useStagingACME": false
|
||||
},
|
||||
"bitwarden": {
|
||||
"enable": true
|
||||
"server": {
|
||||
"provider": "HETZNER"
|
||||
},
|
||||
"databasePassword": "PASSWORD",
|
||||
"domain": "test.tld",
|
||||
"domain": "test-domain.tld",
|
||||
"hashedMasterPassword": "HASHED_PASSWORD",
|
||||
"hostname": "test-instance",
|
||||
"nextcloud": {
|
||||
"adminPassword": "ADMIN",
|
||||
"databasePassword": "ADMIN",
|
||||
"timezone": "Etc/UTC",
|
||||
"username": "tester",
|
||||
"useBinds": true,
|
||||
"sshKeys": [
|
||||
"ssh-rsa KEY test@pc"
|
||||
],
|
||||
"users": [],
|
||||
"autoUpgrade": {
|
||||
"enable": true,
|
||||
"allowReboot": true
|
||||
},
|
||||
"modules": {
|
||||
"bitwarden": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"gitea": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"jitsi-meet": {
|
||||
"enable": true
|
||||
},
|
||||
"resticPassword": "PASS",
|
||||
"nextcloud": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"ocserv": {
|
||||
"enable": true
|
||||
},
|
||||
"pleroma": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"simple-nixos-mailserver": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
}
|
||||
},
|
||||
"volumes": [
|
||||
{
|
||||
"device": "/dev/sda1",
|
||||
"mountPoint": "/volumes/sda1",
|
||||
"fsType": "ext4"
|
||||
}
|
||||
],
|
||||
"ssh": {
|
||||
"enable": true,
|
||||
"passwordAuthentication": true,
|
||||
"rootKeys": [
|
||||
"ssh-ed25519 KEY test@pc"
|
||||
]
|
||||
},
|
||||
"username": "tester",
|
||||
"gitea": {
|
||||
"enable": false
|
||||
},
|
||||
"ocserv": {
|
||||
"enable": true
|
||||
},
|
||||
"pleroma": {
|
||||
"enable": true
|
||||
},
|
||||
"autoUpgrade": {
|
||||
"enable": true,
|
||||
"allowReboot": true
|
||||
},
|
||||
"timezone": "Europe/Moscow",
|
||||
"sshKeys": [
|
||||
"ssh-rsa KEY test@pc"
|
||||
],
|
||||
"volumes": [
|
||||
{
|
||||
"device": "/dev/sda1",
|
||||
"mountPoint": "/volumes/sda1",
|
||||
"filesystem": "ext4"
|
||||
}
|
||||
],
|
||||
"dns": {
|
||||
"provider": "CLOUDFLARE",
|
||||
"apiKey": "TOKEN"
|
||||
},
|
||||
"server": {
|
||||
"provider": "HETZNER"
|
||||
},
|
||||
"backup": {
|
||||
"provider": "BACKBLAZE",
|
||||
"accountId": "ID",
|
||||
"accountKey": "KEY",
|
||||
"bucket": "selfprivacy"
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,57 +1,58 @@
|
|||
{
|
||||
"api": {
|
||||
"token": "TEST_TOKEN",
|
||||
"enableSwagger": false
|
||||
"dns": {
|
||||
"provider": "CLOUDFLARE",
|
||||
"useStagingACME": false
|
||||
},
|
||||
"bitwarden": {
|
||||
"enable": true
|
||||
"server": {
|
||||
"provider": "HETZNER"
|
||||
},
|
||||
"databasePassword": "PASSWORD",
|
||||
"domain": "test.tld",
|
||||
"domain": "test-domain.tld",
|
||||
"hashedMasterPassword": "HASHED_PASSWORD",
|
||||
"hostname": "test-instance",
|
||||
"nextcloud": {
|
||||
"adminPassword": "ADMIN",
|
||||
"databasePassword": "ADMIN",
|
||||
"timezone": "Etc/UTC",
|
||||
"username": "tester",
|
||||
"useBinds": true,
|
||||
"sshKeys": [
|
||||
"ssh-rsa KEY test@pc"
|
||||
],
|
||||
"users": [],
|
||||
"autoUpgrade": {
|
||||
"enable": true,
|
||||
"allowReboot": true
|
||||
},
|
||||
"modules": {
|
||||
"bitwarden": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"gitea": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"jitsi-meet": {
|
||||
"enable": true
|
||||
},
|
||||
"resticPassword": "PASS",
|
||||
"nextcloud": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"ocserv": {
|
||||
"enable": true
|
||||
},
|
||||
"pleroma": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"simple-nixos-mailserver": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
}
|
||||
},
|
||||
"ssh": {
|
||||
"enable": true,
|
||||
"passwordAuthentication": true,
|
||||
"rootKeys": [
|
||||
"ssh-ed25519 KEY test@pc"
|
||||
]
|
||||
},
|
||||
"username": "tester",
|
||||
"gitea": {
|
||||
"enable": false
|
||||
},
|
||||
"ocserv": {
|
||||
"enable": true
|
||||
},
|
||||
"pleroma": {
|
||||
"enable": true
|
||||
},
|
||||
"autoUpgrade": {
|
||||
"enable": true,
|
||||
"allowReboot": true
|
||||
},
|
||||
"timezone": "Europe/Moscow",
|
||||
"sshKeys": [
|
||||
"ssh-rsa KEY test@pc"
|
||||
],
|
||||
"dns": {
|
||||
"provider": "CLOUDFLARE",
|
||||
"apiKey": "TOKEN"
|
||||
},
|
||||
"server": {
|
||||
"provider": "HETZNER"
|
||||
},
|
||||
"backup": {
|
||||
"provider": "BACKBLAZE",
|
||||
"accountId": "ID",
|
||||
"accountKey": "KEY",
|
||||
"bucket": "selfprivacy"
|
||||
}
|
||||
}
|
||||
|
|
|
@ -12,7 +12,7 @@ DKIM_FILE_CONTENT = b'selector._domainkey\tIN\tTXT\t( "v=DKIM1; k=rsa; "\n\t "p
|
|||
|
||||
|
||||
@pytest.fixture
|
||||
def dkim_file(mocker, domain_file, tmpdir):
|
||||
def dkim_file(mocker, tmpdir, generic_userdata):
|
||||
domain = get_domain()
|
||||
assert domain is not None
|
||||
assert domain != ""
|
||||
|
@ -27,14 +27,6 @@ def dkim_file(mocker, domain_file, tmpdir):
|
|||
return dkim_path
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def domain_file(mocker):
|
||||
# TODO: move to conftest. Challenge: it does not behave with "/" like pytest datadir does
|
||||
domain_path = path.join(global_data_dir(), "domain")
|
||||
mocker.patch("selfprivacy_api.utils.DOMAIN_FILE", domain_path)
|
||||
return domain_path
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def no_dkim_file(dkim_file):
|
||||
os.remove(dkim_file)
|
||||
|
@ -45,7 +37,7 @@ def no_dkim_file(dkim_file):
|
|||
###############################################################################
|
||||
|
||||
|
||||
def test_get_dkim_key(domain_file, dkim_file):
|
||||
def test_get_dkim_key(dkim_file):
|
||||
"""Test DKIM key"""
|
||||
dkim_key = get_dkim_key("test-domain.tld")
|
||||
assert (
|
||||
|
@ -54,7 +46,7 @@ def test_get_dkim_key(domain_file, dkim_file):
|
|||
)
|
||||
|
||||
|
||||
def test_no_dkim_key(domain_file, no_dkim_file):
|
||||
def test_no_dkim_key(no_dkim_file):
|
||||
"""Test no DKIM key"""
|
||||
dkim_key = get_dkim_key("test-domain.tld")
|
||||
assert dkim_key is None
|
||||
|
|
|
@ -9,7 +9,7 @@ from tests.test_graphql.test_api_recovery import API_RECOVERY_QUERY
|
|||
from tests.test_graphql.test_api_version import API_VERSION_QUERY
|
||||
|
||||
|
||||
def test_graphql_get_entire_api_data(authorized_client, tokens_file):
|
||||
def test_graphql_get_entire_api_data(authorized_client):
|
||||
response = authorized_client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
|
|
|
@ -7,7 +7,7 @@ from tests.common import (
|
|||
NearFuture,
|
||||
generate_api_query,
|
||||
)
|
||||
from tests.conftest import DEVICE_WE_AUTH_TESTS_WITH, TOKENS_FILE_CONTENTS
|
||||
from tests.conftest import DEVICE_WE_AUTH_TESTS_WITH
|
||||
from tests.test_graphql.common import (
|
||||
get_data,
|
||||
assert_empty,
|
||||
|
@ -66,11 +66,11 @@ def graphql_authorize_new_device(client, mnemonic_key, device_name) -> str:
|
|||
return token
|
||||
|
||||
|
||||
def test_graphql_tokens_info(authorized_client, tokens_file):
|
||||
def test_graphql_tokens_info(authorized_client):
|
||||
assert_original(authorized_client)
|
||||
|
||||
|
||||
def test_graphql_tokens_info_unauthorized(client, tokens_file):
|
||||
def test_graphql_tokens_info_unauthorized(client):
|
||||
response = request_devices(client)
|
||||
assert_empty(response)
|
||||
|
||||
|
@ -88,7 +88,7 @@ mutation DeleteToken($device: String!) {
|
|||
"""
|
||||
|
||||
|
||||
def test_graphql_delete_token_unauthorized(client, tokens_file):
|
||||
def test_graphql_delete_token_unauthorized(client):
|
||||
response = client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
|
@ -101,7 +101,7 @@ def test_graphql_delete_token_unauthorized(client, tokens_file):
|
|||
assert_empty(response)
|
||||
|
||||
|
||||
def test_graphql_delete_token(authorized_client, tokens_file):
|
||||
def test_graphql_delete_token(authorized_client):
|
||||
test_devices = ORIGINAL_DEVICES.copy()
|
||||
device_to_delete = test_devices.pop(1)
|
||||
assert device_to_delete != DEVICE_WE_AUTH_TESTS_WITH
|
||||
|
@ -121,7 +121,7 @@ def test_graphql_delete_token(authorized_client, tokens_file):
|
|||
assert_same(devices, test_devices)
|
||||
|
||||
|
||||
def test_graphql_delete_self_token(authorized_client, tokens_file):
|
||||
def test_graphql_delete_self_token(authorized_client):
|
||||
response = authorized_client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
|
@ -137,7 +137,6 @@ def test_graphql_delete_self_token(authorized_client, tokens_file):
|
|||
|
||||
def test_graphql_delete_nonexistent_token(
|
||||
authorized_client,
|
||||
tokens_file,
|
||||
):
|
||||
response = authorized_client.post(
|
||||
"/graphql",
|
||||
|
@ -167,7 +166,7 @@ mutation RefreshToken {
|
|||
"""
|
||||
|
||||
|
||||
def test_graphql_refresh_token_unauthorized(client, tokens_file):
|
||||
def test_graphql_refresh_token_unauthorized(client):
|
||||
response = client.post(
|
||||
"/graphql",
|
||||
json={"query": REFRESH_TOKEN_MUTATION},
|
||||
|
@ -175,7 +174,7 @@ def test_graphql_refresh_token_unauthorized(client, tokens_file):
|
|||
assert_empty(response)
|
||||
|
||||
|
||||
def test_graphql_refresh_token(authorized_client, client, tokens_file):
|
||||
def test_graphql_refresh_token(authorized_client, client):
|
||||
caller_name_and_date = graphql_get_caller_token_info(authorized_client)
|
||||
response = authorized_client.post(
|
||||
"/graphql",
|
||||
|
@ -206,7 +205,6 @@ mutation NewDeviceKey {
|
|||
|
||||
def test_graphql_get_new_device_auth_key_unauthorized(
|
||||
client,
|
||||
tokens_file,
|
||||
):
|
||||
response = client.post(
|
||||
"/graphql",
|
||||
|
@ -230,7 +228,6 @@ mutation InvalidateNewDeviceKey {
|
|||
|
||||
def test_graphql_invalidate_new_device_token_unauthorized(
|
||||
client,
|
||||
tokens_file,
|
||||
):
|
||||
response = client.post(
|
||||
"/graphql",
|
||||
|
@ -244,7 +241,7 @@ def test_graphql_invalidate_new_device_token_unauthorized(
|
|||
assert_empty(response)
|
||||
|
||||
|
||||
def test_graphql_get_and_delete_new_device_key(client, authorized_client, tokens_file):
|
||||
def test_graphql_get_and_delete_new_device_key(client, authorized_client):
|
||||
mnemonic_key = graphql_get_new_device_key(authorized_client)
|
||||
|
||||
response = authorized_client.post(
|
||||
|
@ -271,7 +268,7 @@ mutation AuthorizeWithNewDeviceKey($input: UseNewDeviceKeyInput!) {
|
|||
"""
|
||||
|
||||
|
||||
def test_graphql_get_and_authorize_new_device(client, authorized_client, tokens_file):
|
||||
def test_graphql_get_and_authorize_new_device(client, authorized_client):
|
||||
mnemonic_key = graphql_get_new_device_key(authorized_client)
|
||||
old_devices = graphql_get_devices(authorized_client)
|
||||
|
||||
|
@ -282,16 +279,14 @@ def test_graphql_get_and_authorize_new_device(client, authorized_client, tokens_
|
|||
assert "new_device" in [device["name"] for device in new_devices]
|
||||
|
||||
|
||||
def test_graphql_authorize_new_device_with_invalid_key(
|
||||
client, authorized_client, tokens_file
|
||||
):
|
||||
def test_graphql_authorize_new_device_with_invalid_key(client, authorized_client):
|
||||
response = graphql_try_auth_new_device(client, "invalid_token", "new_device")
|
||||
assert_errorcode(get_data(response)["api"]["authorizeWithNewDeviceApiKey"], 404)
|
||||
|
||||
assert_original(authorized_client)
|
||||
|
||||
|
||||
def test_graphql_get_and_authorize_used_key(client, authorized_client, tokens_file):
|
||||
def test_graphql_get_and_authorize_used_key(client, authorized_client):
|
||||
mnemonic_key = graphql_get_new_device_key(authorized_client)
|
||||
|
||||
graphql_authorize_new_device(client, mnemonic_key, "new_device")
|
||||
|
@ -304,7 +299,7 @@ def test_graphql_get_and_authorize_used_key(client, authorized_client, tokens_fi
|
|||
|
||||
|
||||
def test_graphql_get_and_authorize_key_after_12_minutes(
|
||||
client, authorized_client, tokens_file, mocker
|
||||
client, authorized_client, mocker
|
||||
):
|
||||
mnemonic_key = graphql_get_new_device_key(authorized_client)
|
||||
mock = mocker.patch(DEVICE_KEY_VALIDATION_DATETIME, NearFuture)
|
||||
|
@ -315,7 +310,6 @@ def test_graphql_get_and_authorize_key_after_12_minutes(
|
|||
|
||||
def test_graphql_authorize_without_token(
|
||||
client,
|
||||
tokens_file,
|
||||
):
|
||||
response = client.post(
|
||||
"/graphql",
|
||||
|
|
|
@ -14,9 +14,9 @@ from tests.common import (
|
|||
)
|
||||
|
||||
# Graphql API's output should be timezone-naive
|
||||
from tests.common import five_minutes_into_future_naive_utc as five_minutes_into_future
|
||||
from tests.common import five_minutes_into_future as five_minutes_into_future_tz
|
||||
from tests.common import five_minutes_into_past_naive_utc as five_minutes_into_past
|
||||
from tests.common import ten_minutes_into_future_naive_utc as ten_minutes_into_future
|
||||
from tests.common import ten_minutes_into_future as ten_minutes_into_future_tz
|
||||
from tests.common import ten_minutes_into_past_naive_utc as ten_minutes_into_past
|
||||
|
||||
from tests.test_graphql.common import (
|
||||
assert_empty,
|
||||
|
@ -111,12 +111,12 @@ def graphql_use_recovery_key(client, key, device_name):
|
|||
return token
|
||||
|
||||
|
||||
def test_graphql_recovery_key_status_unauthorized(client, tokens_file):
|
||||
def test_graphql_recovery_key_status_unauthorized(client):
|
||||
response = request_recovery_status(client)
|
||||
assert_empty(response)
|
||||
|
||||
|
||||
def test_graphql_recovery_key_status_when_none_exists(authorized_client, tokens_file):
|
||||
def test_graphql_recovery_key_status_when_none_exists(authorized_client):
|
||||
status = graphql_recovery_status(authorized_client)
|
||||
assert status["exists"] is False
|
||||
assert status["valid"] is False
|
||||
|
@ -152,7 +152,7 @@ mutation TestUseRecoveryKey($input: UseRecoveryKeyInput!) {
|
|||
"""
|
||||
|
||||
|
||||
def test_graphql_generate_recovery_key(client, authorized_client, tokens_file):
|
||||
def test_graphql_generate_recovery_key(client, authorized_client):
|
||||
key = graphql_make_new_recovery_key(authorized_client)
|
||||
|
||||
status = graphql_recovery_status(authorized_client)
|
||||
|
@ -168,10 +168,10 @@ def test_graphql_generate_recovery_key(client, authorized_client, tokens_file):
|
|||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"expiration_date", [five_minutes_into_future(), five_minutes_into_future_tz()]
|
||||
"expiration_date", [ten_minutes_into_future(), ten_minutes_into_future_tz()]
|
||||
)
|
||||
def test_graphql_generate_recovery_key_with_expiration_date(
|
||||
client, authorized_client, tokens_file, expiration_date: datetime
|
||||
client, authorized_client, expiration_date: datetime
|
||||
):
|
||||
key = graphql_make_new_recovery_key(authorized_client, expires_at=expiration_date)
|
||||
|
||||
|
@ -192,10 +192,8 @@ def test_graphql_generate_recovery_key_with_expiration_date(
|
|||
graphql_use_recovery_key(client, key, "new_test_token2")
|
||||
|
||||
|
||||
def test_graphql_use_recovery_key_after_expiration(
|
||||
client, authorized_client, tokens_file, mocker
|
||||
):
|
||||
expiration_date = five_minutes_into_future()
|
||||
def test_graphql_use_recovery_key_after_expiration(client, authorized_client, mocker):
|
||||
expiration_date = ten_minutes_into_future()
|
||||
key = graphql_make_new_recovery_key(authorized_client, expires_at=expiration_date)
|
||||
|
||||
# Timewarp to after it expires
|
||||
|
@ -220,10 +218,8 @@ def test_graphql_use_recovery_key_after_expiration(
|
|||
assert status["usesLeft"] is None
|
||||
|
||||
|
||||
def test_graphql_generate_recovery_key_with_expiration_in_the_past(
|
||||
authorized_client, tokens_file
|
||||
):
|
||||
expiration_date = five_minutes_into_past()
|
||||
def test_graphql_generate_recovery_key_with_expiration_in_the_past(authorized_client):
|
||||
expiration_date = ten_minutes_into_past()
|
||||
response = request_make_new_recovery_key(
|
||||
authorized_client, expires_at=expiration_date
|
||||
)
|
||||
|
@ -235,9 +231,7 @@ def test_graphql_generate_recovery_key_with_expiration_in_the_past(
|
|||
assert graphql_recovery_status(authorized_client)["exists"] is False
|
||||
|
||||
|
||||
def test_graphql_generate_recovery_key_with_invalid_time_format(
|
||||
authorized_client, tokens_file
|
||||
):
|
||||
def test_graphql_generate_recovery_key_with_invalid_time_format(authorized_client):
|
||||
expiration_date = "invalid_time_format"
|
||||
expiration_date_str = expiration_date
|
||||
|
||||
|
@ -256,10 +250,7 @@ def test_graphql_generate_recovery_key_with_invalid_time_format(
|
|||
assert graphql_recovery_status(authorized_client)["exists"] is False
|
||||
|
||||
|
||||
def test_graphql_generate_recovery_key_with_limited_uses(
|
||||
authorized_client, client, tokens_file
|
||||
):
|
||||
|
||||
def test_graphql_generate_recovery_key_with_limited_uses(authorized_client, client):
|
||||
mnemonic_key = graphql_make_new_recovery_key(authorized_client, uses=2)
|
||||
|
||||
status = graphql_recovery_status(authorized_client)
|
||||
|
@ -292,9 +283,7 @@ def test_graphql_generate_recovery_key_with_limited_uses(
|
|||
assert_errorcode(output, 404)
|
||||
|
||||
|
||||
def test_graphql_generate_recovery_key_with_negative_uses(
|
||||
authorized_client, tokens_file
|
||||
):
|
||||
def test_graphql_generate_recovery_key_with_negative_uses(authorized_client):
|
||||
response = request_make_new_recovery_key(authorized_client, uses=-1)
|
||||
|
||||
output = get_data(response)["api"]["getNewRecoveryApiKey"]
|
||||
|
@ -303,7 +292,7 @@ def test_graphql_generate_recovery_key_with_negative_uses(
|
|||
assert graphql_recovery_status(authorized_client)["exists"] is False
|
||||
|
||||
|
||||
def test_graphql_generate_recovery_key_with_zero_uses(authorized_client, tokens_file):
|
||||
def test_graphql_generate_recovery_key_with_zero_uses(authorized_client):
|
||||
response = request_make_new_recovery_key(authorized_client, uses=0)
|
||||
|
||||
output = get_data(response)["api"]["getNewRecoveryApiKey"]
|
||||
|
|
|
@ -503,7 +503,7 @@ def test_move_same_volume(authorized_client, dummy_service):
|
|||
|
||||
|
||||
def test_mailservice_cannot_enable_disable(authorized_client):
|
||||
mailservice = get_service_by_id("email")
|
||||
mailservice = get_service_by_id("simple-nixos-mailserver")
|
||||
|
||||
mutation_response = api_enable(authorized_client, mailservice)
|
||||
data = get_data(mutation_response)["services"]["enableService"]
|
||||
|
|
|
@ -308,7 +308,6 @@ original_settings = [
|
|||
def test_graphql_readwrite_ssh_settings(
|
||||
authorized_client, some_users, settings, original_settings
|
||||
):
|
||||
|
||||
# Userdata-related tests like undefined fields are in actions-level tests.
|
||||
output = api_set_ssh_settings_dict(authorized_client, original_settings)
|
||||
assert_includes(api_ssh_settings(authorized_client), output)
|
||||
|
@ -334,7 +333,6 @@ forbidden_settings = [
|
|||
def test_graphql_readwrite_ssh_settings_partial(
|
||||
authorized_client, some_users, settings, original_settings
|
||||
):
|
||||
|
||||
output = api_set_ssh_settings_dict(authorized_client, original_settings)
|
||||
with pytest.raises(Exception):
|
||||
output = api_set_ssh_settings_dict(authorized_client, settings)
|
||||
|
|
|
@ -1,43 +1,17 @@
|
|||
{
|
||||
"api": {
|
||||
"token": "TEST_TOKEN",
|
||||
"enableSwagger": false
|
||||
"dns": {
|
||||
"provider": "CLOUDFLARE",
|
||||
"useStagingACME": false
|
||||
},
|
||||
"bitwarden": {
|
||||
"enable": false
|
||||
"server": {
|
||||
"provider": "HETZNER"
|
||||
},
|
||||
"databasePassword": "PASSWORD",
|
||||
"domain": "test.tld",
|
||||
"domain": "test-domain.tld",
|
||||
"hashedMasterPassword": "HASHED_PASSWORD",
|
||||
"hostname": "test-instance",
|
||||
"nextcloud": {
|
||||
"adminPassword": "ADMIN",
|
||||
"databasePassword": "ADMIN",
|
||||
"enable": true
|
||||
},
|
||||
"resticPassword": "PASS",
|
||||
"ssh": {
|
||||
"enable": true,
|
||||
"passwordAuthentication": true,
|
||||
"rootKeys": [
|
||||
"ssh-ed25519 KEY test@pc"
|
||||
]
|
||||
},
|
||||
"username": "tester",
|
||||
"gitea": {
|
||||
"enable": false
|
||||
},
|
||||
"ocserv": {
|
||||
"enable": true
|
||||
},
|
||||
"pleroma": {
|
||||
"enable": true
|
||||
},
|
||||
"autoUpgrade": {
|
||||
"enable": true,
|
||||
"allowReboot": true
|
||||
},
|
||||
"timezone": "Europe/Moscow",
|
||||
"username": "tester",
|
||||
"useBinds": true,
|
||||
"sshKeys": [
|
||||
"ssh-rsa KEY test@pc"
|
||||
],
|
||||
|
@ -60,17 +34,50 @@
|
|||
"hashedPassword": "HASHED_PASSWORD_3"
|
||||
}
|
||||
],
|
||||
"dns": {
|
||||
"provider": "CLOUDFLARE",
|
||||
"apiKey": "TOKEN"
|
||||
"autoUpgrade": {
|
||||
"enable": true,
|
||||
"allowReboot": true
|
||||
},
|
||||
"server": {
|
||||
"provider": "HETZNER"
|
||||
"modules": {
|
||||
"bitwarden": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"backup": {
|
||||
"provider": "BACKBLAZE",
|
||||
"accountId": "ID",
|
||||
"accountKey": "KEY",
|
||||
"bucket": "selfprivacy"
|
||||
"gitea": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"jitsi-meet": {
|
||||
"enable": true
|
||||
},
|
||||
"nextcloud": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"ocserv": {
|
||||
"enable": true
|
||||
},
|
||||
"pleroma": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"simple-nixos-mailserver": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
}
|
||||
},
|
||||
"volumes": [
|
||||
{
|
||||
"device": "/dev/sdb",
|
||||
"mountPoint": "/volumes/sdb",
|
||||
"fsType": "ext4"
|
||||
}
|
||||
],
|
||||
"ssh": {
|
||||
"enable": true,
|
||||
"passwordAuthentication": true,
|
||||
"rootKeys": [
|
||||
"ssh-ed25519 KEY test@pc"
|
||||
]
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,18 +9,12 @@ from tests.test_graphql.common import assert_empty
|
|||
from tests.test_dkim import no_dkim_file, dkim_file
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def domain_file(mocker, datadir):
|
||||
mocker.patch("selfprivacy_api.utils.DOMAIN_FILE", datadir / "domain")
|
||||
return datadir
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def turned_on(mocker, datadir):
|
||||
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_on.json")
|
||||
assert read_json(datadir / "turned_on.json")["autoUpgrade"]["enable"] == True
|
||||
assert read_json(datadir / "turned_on.json")["autoUpgrade"]["allowReboot"] == True
|
||||
assert read_json(datadir / "turned_on.json")["timezone"] == "Europe/Moscow"
|
||||
assert read_json(datadir / "turned_on.json")["timezone"] == "Etc/UTC"
|
||||
return datadir
|
||||
|
||||
|
||||
|
@ -29,7 +23,7 @@ def turned_off(mocker, datadir):
|
|||
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_off.json")
|
||||
assert read_json(datadir / "turned_off.json")["autoUpgrade"]["enable"] == False
|
||||
assert read_json(datadir / "turned_off.json")["autoUpgrade"]["allowReboot"] == False
|
||||
assert read_json(datadir / "turned_off.json")["timezone"] == "Europe/Moscow"
|
||||
assert read_json(datadir / "turned_off.json")["timezone"] == "Etc/UTC"
|
||||
return datadir
|
||||
|
||||
|
||||
|
@ -251,7 +245,7 @@ def is_dns_record_in_array(records, dns_record) -> bool:
|
|||
|
||||
|
||||
def test_graphql_get_domain(
|
||||
authorized_client, domain_file, mock_get_ip4, mock_get_ip6, turned_on, mock_dkim_key
|
||||
authorized_client, mock_get_ip4, mock_get_ip6, turned_on, mock_dkim_key
|
||||
):
|
||||
"""Test get domain"""
|
||||
response = authorized_client.post(
|
||||
|
@ -262,7 +256,9 @@ def test_graphql_get_domain(
|
|||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json().get("data") is not None
|
||||
assert response.json()["data"]["system"]["domainInfo"]["domain"] == "test.tld"
|
||||
assert (
|
||||
response.json()["data"]["system"]["domainInfo"]["domain"] == "test-domain.tld"
|
||||
)
|
||||
assert (
|
||||
response.json()["data"]["system"]["domainInfo"]["hostname"] == "test-instance"
|
||||
)
|
||||
|
@ -335,7 +331,6 @@ def test_graphql_get_domain(
|
|||
|
||||
def test_graphql_get_domain_no_dkim(
|
||||
authorized_client,
|
||||
domain_file,
|
||||
mock_get_ip4,
|
||||
mock_get_ip6,
|
||||
no_dkim_file,
|
||||
|
@ -384,7 +379,7 @@ def test_graphql_get_timezone(authorized_client, turned_on):
|
|||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json().get("data") is not None
|
||||
assert response.json()["data"]["system"]["settings"]["timezone"] == "Europe/Moscow"
|
||||
assert response.json()["data"]["system"]["settings"]["timezone"] == "Etc/UTC"
|
||||
|
||||
|
||||
def test_graphql_get_timezone_on_undefined(authorized_client, undefined_config):
|
||||
|
@ -397,9 +392,7 @@ def test_graphql_get_timezone_on_undefined(authorized_client, undefined_config):
|
|||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json().get("data") is not None
|
||||
assert (
|
||||
response.json()["data"]["system"]["settings"]["timezone"] == "Europe/Uzhgorod"
|
||||
)
|
||||
assert response.json()["data"]["system"]["settings"]["timezone"] == "Etc/UTC"
|
||||
|
||||
|
||||
API_CHANGE_TIMEZONE_MUTATION = """
|
||||
|
@ -423,7 +416,7 @@ def test_graphql_change_timezone_unauthorized(client, turned_on):
|
|||
json={
|
||||
"query": API_CHANGE_TIMEZONE_MUTATION,
|
||||
"variables": {
|
||||
"timezone": "Europe/Moscow",
|
||||
"timezone": "Etc/UTC",
|
||||
},
|
||||
},
|
||||
)
|
||||
|
@ -495,7 +488,7 @@ def test_graphql_change_timezone_without_timezone(authorized_client, turned_on):
|
|||
assert response.json()["data"]["system"]["changeTimezone"]["message"] is not None
|
||||
assert response.json()["data"]["system"]["changeTimezone"]["code"] == 400
|
||||
assert response.json()["data"]["system"]["changeTimezone"]["timezone"] is None
|
||||
assert read_json(turned_on / "turned_on.json")["timezone"] == "Europe/Moscow"
|
||||
assert read_json(turned_on / "turned_on.json")["timezone"] == "Etc/UTC"
|
||||
|
||||
|
||||
def test_graphql_change_timezone_with_invalid_timezone(authorized_client, turned_on):
|
||||
|
@ -515,7 +508,7 @@ def test_graphql_change_timezone_with_invalid_timezone(authorized_client, turned
|
|||
assert response.json()["data"]["system"]["changeTimezone"]["message"] is not None
|
||||
assert response.json()["data"]["system"]["changeTimezone"]["code"] == 400
|
||||
assert response.json()["data"]["system"]["changeTimezone"]["timezone"] is None
|
||||
assert read_json(turned_on / "turned_on.json")["timezone"] == "Europe/Moscow"
|
||||
assert read_json(turned_on / "turned_on.json")["timezone"] == "Etc/UTC"
|
||||
|
||||
|
||||
API_GET_AUTO_UPGRADE_SETTINGS_QUERY = """
|
||||
|
|
|
@ -1 +0,0 @@
|
|||
test-domain.tld
|
|
@ -1,55 +1,62 @@
|
|||
{
|
||||
"api": {
|
||||
"token": "TEST_TOKEN",
|
||||
"enableSwagger": false
|
||||
"dns": {
|
||||
"provider": "CLOUDFLARE",
|
||||
"useStagingACME": false
|
||||
},
|
||||
"bitwarden": {
|
||||
"enable": true
|
||||
"server": {
|
||||
"provider": "HETZNER"
|
||||
},
|
||||
"databasePassword": "PASSWORD",
|
||||
"domain": "test.tld",
|
||||
"domain": "test-domain.tld",
|
||||
"hashedMasterPassword": "HASHED_PASSWORD",
|
||||
"hostname": "test-instance",
|
||||
"nextcloud": {
|
||||
"adminPassword": "ADMIN",
|
||||
"databasePassword": "ADMIN",
|
||||
"timezone": "Etc/UTC",
|
||||
"username": "tester",
|
||||
"useBinds": true,
|
||||
"sshKeys": [
|
||||
"ssh-rsa KEY test@pc"
|
||||
],
|
||||
"users": [],
|
||||
"autoUpgrade": {},
|
||||
"modules": {
|
||||
"bitwarden": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"gitea": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"jitsi-meet": {
|
||||
"enable": true
|
||||
},
|
||||
"resticPassword": "PASS",
|
||||
"nextcloud": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"ocserv": {
|
||||
"enable": true
|
||||
},
|
||||
"pleroma": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"simple-nixos-mailserver": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
}
|
||||
},
|
||||
"volumes": [
|
||||
{
|
||||
"device": "/dev/sdb",
|
||||
"mountPoint": "/volumes/sdb",
|
||||
"fsType": "ext4"
|
||||
}
|
||||
],
|
||||
"ssh": {
|
||||
"enable": true,
|
||||
"passwordAuthentication": true,
|
||||
"rootKeys": [
|
||||
"ssh-ed25519 KEY test@pc"
|
||||
]
|
||||
},
|
||||
"username": "tester",
|
||||
"gitea": {
|
||||
"enable": false
|
||||
},
|
||||
"ocserv": {
|
||||
"enable": true
|
||||
},
|
||||
"pleroma": {
|
||||
"enable": true
|
||||
},
|
||||
"autoUpgrade": {
|
||||
},
|
||||
"timezone": "Europe/Moscow",
|
||||
"sshKeys": [
|
||||
"ssh-rsa KEY test@pc"
|
||||
],
|
||||
"dns": {
|
||||
"provider": "CLOUDFLARE",
|
||||
"apiKey": "TOKEN"
|
||||
},
|
||||
"server": {
|
||||
"provider": "HETZNER"
|
||||
},
|
||||
"backup": {
|
||||
"provider": "BACKBLAZE",
|
||||
"accountId": "ID",
|
||||
"accountKey": "KEY",
|
||||
"bucket": "selfprivacy"
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,57 +1,65 @@
|
|||
{
|
||||
"api": {
|
||||
"token": "TEST_TOKEN",
|
||||
"enableSwagger": false
|
||||
"dns": {
|
||||
"provider": "CLOUDFLARE",
|
||||
"useStagingACME": false
|
||||
},
|
||||
"bitwarden": {
|
||||
"enable": true
|
||||
"server": {
|
||||
"provider": "HETZNER"
|
||||
},
|
||||
"databasePassword": "PASSWORD",
|
||||
"domain": "test.tld",
|
||||
"domain": "test-domain.tld",
|
||||
"hashedMasterPassword": "HASHED_PASSWORD",
|
||||
"hostname": "test-instance",
|
||||
"nextcloud": {
|
||||
"adminPassword": "ADMIN",
|
||||
"databasePassword": "ADMIN",
|
||||
"timezone": "Etc/UTC",
|
||||
"username": "tester",
|
||||
"useBinds": true,
|
||||
"sshKeys": [
|
||||
"ssh-rsa KEY test@pc"
|
||||
],
|
||||
"users": [],
|
||||
"autoUpgrade": {
|
||||
"enable": false,
|
||||
"allowReboot": false
|
||||
},
|
||||
"modules": {
|
||||
"bitwarden": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"gitea": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"jitsi-meet": {
|
||||
"enable": true
|
||||
},
|
||||
"resticPassword": "PASS",
|
||||
"nextcloud": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"ocserv": {
|
||||
"enable": true
|
||||
},
|
||||
"pleroma": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"simple-nixos-mailserver": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
}
|
||||
},
|
||||
"volumes": [
|
||||
{
|
||||
"device": "/dev/sdb",
|
||||
"mountPoint": "/volumes/sdb",
|
||||
"fsType": "ext4"
|
||||
}
|
||||
],
|
||||
"ssh": {
|
||||
"enable": true,
|
||||
"passwordAuthentication": true,
|
||||
"rootKeys": [
|
||||
"ssh-ed25519 KEY test@pc"
|
||||
]
|
||||
},
|
||||
"username": "tester",
|
||||
"gitea": {
|
||||
"enable": false
|
||||
},
|
||||
"ocserv": {
|
||||
"enable": true
|
||||
},
|
||||
"pleroma": {
|
||||
"enable": true
|
||||
},
|
||||
"autoUpgrade": {
|
||||
"enable": false,
|
||||
"allowReboot": false
|
||||
},
|
||||
"timezone": "Europe/Moscow",
|
||||
"sshKeys": [
|
||||
"ssh-rsa KEY test@pc"
|
||||
],
|
||||
"dns": {
|
||||
"provider": "CLOUDFLARE",
|
||||
"apiKey": "TOKEN"
|
||||
},
|
||||
"server": {
|
||||
"provider": "HETZNER"
|
||||
},
|
||||
"backup": {
|
||||
"provider": "BACKBLAZE",
|
||||
"accountId": "ID",
|
||||
"accountKey": "KEY",
|
||||
"bucket": "selfprivacy"
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,62 +1,65 @@
|
|||
{
|
||||
"api": {
|
||||
"token": "TEST_TOKEN",
|
||||
"enableSwagger": false
|
||||
"dns": {
|
||||
"provider": "CLOUDFLARE",
|
||||
"useStagingACME": false
|
||||
},
|
||||
"databasePassword": "PASSWORD",
|
||||
"domain": "test.tld",
|
||||
"server": {
|
||||
"provider": "HETZNER"
|
||||
},
|
||||
"domain": "test-domain.tld",
|
||||
"hashedMasterPassword": "HASHED_PASSWORD",
|
||||
"hostname": "test-instance",
|
||||
"resticPassword": "PASS",
|
||||
"timezone": "Etc/UTC",
|
||||
"username": "tester",
|
||||
"useBinds": true,
|
||||
"sshKeys": [
|
||||
"ssh-rsa KEY test@pc"
|
||||
],
|
||||
"users": [],
|
||||
"autoUpgrade": {
|
||||
"enable": true,
|
||||
"allowReboot": true
|
||||
},
|
||||
"modules": {
|
||||
"bitwarden": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"gitea": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"jitsi-meet": {
|
||||
"enable": true
|
||||
},
|
||||
"nextcloud": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"ocserv": {
|
||||
"enable": true
|
||||
},
|
||||
"pleroma": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"simple-nixos-mailserver": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
}
|
||||
},
|
||||
"volumes": [
|
||||
{
|
||||
"device": "/dev/sdb",
|
||||
"mountPoint": "/volumes/sdb",
|
||||
"fsType": "ext4"
|
||||
}
|
||||
],
|
||||
"ssh": {
|
||||
"enable": true,
|
||||
"passwordAuthentication": true,
|
||||
"rootKeys": [
|
||||
"ssh-ed25519 KEY test@pc"
|
||||
]
|
||||
},
|
||||
"username": "tester",
|
||||
"modules": {
|
||||
"gitea": {
|
||||
"enable": true
|
||||
},
|
||||
"ocserv": {
|
||||
"enable": true
|
||||
},
|
||||
"pleroma": {
|
||||
"enable": true
|
||||
},
|
||||
"jitsi": {
|
||||
"enable": true
|
||||
},
|
||||
"nextcloud": {
|
||||
"adminPassword": "ADMIN",
|
||||
"databasePassword": "ADMIN",
|
||||
"enable": true
|
||||
},
|
||||
"bitwarden": {
|
||||
"enable": true
|
||||
}
|
||||
},
|
||||
"autoUpgrade": {
|
||||
"enable": true,
|
||||
"allowReboot": true
|
||||
},
|
||||
"timezone": "Europe/Moscow",
|
||||
"sshKeys": [
|
||||
"ssh-rsa KEY test@pc"
|
||||
],
|
||||
"dns": {
|
||||
"provider": "CLOUDFLARE",
|
||||
"apiKey": "TOKEN"
|
||||
},
|
||||
"server": {
|
||||
"provider": "HETZNER"
|
||||
},
|
||||
"backup": {
|
||||
"provider": "BACKBLAZE",
|
||||
"accountId": "ID",
|
||||
"accountKey": "KEY",
|
||||
"bucket": "selfprivacy"
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,52 +1,60 @@
|
|||
{
|
||||
"api": {
|
||||
"token": "TEST_TOKEN",
|
||||
"enableSwagger": false
|
||||
"dns": {
|
||||
"provider": "CLOUDFLARE",
|
||||
"useStagingACME": false
|
||||
},
|
||||
"bitwarden": {
|
||||
"enable": true
|
||||
"server": {
|
||||
"provider": "HETZNER"
|
||||
},
|
||||
"databasePassword": "PASSWORD",
|
||||
"domain": "test.tld",
|
||||
"domain": "test-domain.tld",
|
||||
"hashedMasterPassword": "HASHED_PASSWORD",
|
||||
"hostname": "test-instance",
|
||||
"nextcloud": {
|
||||
"adminPassword": "ADMIN",
|
||||
"databasePassword": "ADMIN",
|
||||
"username": "tester",
|
||||
"useBinds": true,
|
||||
"sshKeys": [
|
||||
"ssh-rsa KEY test@pc"
|
||||
],
|
||||
"users": [],
|
||||
"modules": {
|
||||
"bitwarden": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"gitea": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"jitsi-meet": {
|
||||
"enable": true
|
||||
},
|
||||
"resticPassword": "PASS",
|
||||
"nextcloud": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"ocserv": {
|
||||
"enable": true
|
||||
},
|
||||
"pleroma": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"simple-nixos-mailserver": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
}
|
||||
},
|
||||
"volumes": [
|
||||
{
|
||||
"device": "/dev/sdb",
|
||||
"mountPoint": "/volumes/sdb",
|
||||
"fsType": "ext4"
|
||||
}
|
||||
],
|
||||
"ssh": {
|
||||
"enable": true,
|
||||
"passwordAuthentication": true,
|
||||
"rootKeys": [
|
||||
"ssh-ed25519 KEY test@pc"
|
||||
]
|
||||
},
|
||||
"username": "tester",
|
||||
"gitea": {
|
||||
"enable": false
|
||||
},
|
||||
"ocserv": {
|
||||
"enable": true
|
||||
},
|
||||
"pleroma": {
|
||||
"enable": true
|
||||
},
|
||||
"sshKeys": [
|
||||
"ssh-rsa KEY test@pc"
|
||||
],
|
||||
"dns": {
|
||||
"provider": "CLOUDFLARE",
|
||||
"apiKey": "TOKEN"
|
||||
},
|
||||
"server": {
|
||||
"provider": "HETZNER"
|
||||
},
|
||||
"backup": {
|
||||
"provider": "BACKBLAZE",
|
||||
"accountId": "ID",
|
||||
"accountKey": "KEY",
|
||||
"bucket": "selfprivacy"
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,12 +4,6 @@
|
|||
import pytest
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def domain_file(mocker, datadir):
|
||||
mocker.patch("selfprivacy_api.utils.DOMAIN_FILE", datadir / "domain")
|
||||
return datadir
|
||||
|
||||
|
||||
class ProcessMock:
|
||||
"""Mock subprocess.Popen"""
|
||||
|
||||
|
|
|
@ -254,7 +254,6 @@ def test_graphql_get_one_user_unauthorized(client, one_user, mock_subprocess_pop
|
|||
|
||||
|
||||
def test_graphql_get_one_user(authorized_client, one_user, mock_subprocess_popen):
|
||||
|
||||
response = authorized_client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
|
@ -275,7 +274,6 @@ def test_graphql_get_one_user(authorized_client, one_user, mock_subprocess_popen
|
|||
|
||||
|
||||
def test_graphql_get_some_user_undefined(authorized_client, undefined_settings):
|
||||
|
||||
response = authorized_client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
|
|
|
@ -1,59 +1,65 @@
|
|||
{
|
||||
"api": {
|
||||
"token": "TEST_TOKEN",
|
||||
"enableSwagger": false
|
||||
"dns": {
|
||||
"provider": "CLOUDFLARE",
|
||||
"useStagingACME": false
|
||||
},
|
||||
"bitwarden": {
|
||||
"enable": false
|
||||
"server": {
|
||||
"provider": "HETZNER"
|
||||
},
|
||||
"databasePassword": "PASSWORD",
|
||||
"domain": "test.tld",
|
||||
"domain": "test-domain.tld",
|
||||
"hashedMasterPassword": "HASHED_PASSWORD",
|
||||
"hostname": "test-instance",
|
||||
"nextcloud": {
|
||||
"adminPassword": "ADMIN",
|
||||
"databasePassword": "ADMIN",
|
||||
"timezone": "Etc/UTC",
|
||||
"username": "tester",
|
||||
"useBinds": true,
|
||||
"sshKeys": [
|
||||
"ssh-rsa KEY test@pc"
|
||||
],
|
||||
"users": [],
|
||||
"autoUpgrade": {
|
||||
"enable": true,
|
||||
"allowReboot": true
|
||||
},
|
||||
"modules": {
|
||||
"bitwarden": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"gitea": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"jitsi-meet": {
|
||||
"enable": true
|
||||
},
|
||||
"resticPassword": "PASS",
|
||||
"nextcloud": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"ocserv": {
|
||||
"enable": true
|
||||
},
|
||||
"pleroma": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"simple-nixos-mailserver": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
}
|
||||
},
|
||||
"volumes": [
|
||||
{
|
||||
"device": "/dev/sdb",
|
||||
"mountPoint": "/volumes/sdb",
|
||||
"fsType": "ext4"
|
||||
}
|
||||
],
|
||||
"ssh": {
|
||||
"enable": true,
|
||||
"passwordAuthentication": true,
|
||||
"rootKeys": [
|
||||
"ssh-ed25519 KEY test@pc"
|
||||
]
|
||||
},
|
||||
"username": "tester",
|
||||
"gitea": {
|
||||
"enable": false
|
||||
},
|
||||
"ocserv": {
|
||||
"enable": true
|
||||
},
|
||||
"pleroma": {
|
||||
"enable": true
|
||||
},
|
||||
"autoUpgrade": {
|
||||
"enable": true,
|
||||
"allowReboot": true
|
||||
},
|
||||
"timezone": "Europe/Moscow",
|
||||
"sshKeys": [
|
||||
"ssh-rsa KEY test@pc"
|
||||
],
|
||||
"users": [
|
||||
],
|
||||
"dns": {
|
||||
"provider": "CLOUDFLARE",
|
||||
"apiKey": "TOKEN"
|
||||
},
|
||||
"server": {
|
||||
"provider": "HETZNER"
|
||||
},
|
||||
"backup": {
|
||||
"provider": "BACKBLAZE",
|
||||
"accountId": "ID",
|
||||
"accountKey": "KEY",
|
||||
"bucket": "selfprivacy"
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,43 +1,17 @@
|
|||
{
|
||||
"api": {
|
||||
"token": "TEST_TOKEN",
|
||||
"enableSwagger": false
|
||||
"dns": {
|
||||
"provider": "CLOUDFLARE",
|
||||
"useStagingACME": false
|
||||
},
|
||||
"bitwarden": {
|
||||
"enable": false
|
||||
"server": {
|
||||
"provider": "HETZNER"
|
||||
},
|
||||
"databasePassword": "PASSWORD",
|
||||
"domain": "test.tld",
|
||||
"domain": "test-domain.tld",
|
||||
"hashedMasterPassword": "HASHED_PASSWORD",
|
||||
"hostname": "test-instance",
|
||||
"nextcloud": {
|
||||
"adminPassword": "ADMIN",
|
||||
"databasePassword": "ADMIN",
|
||||
"enable": true
|
||||
},
|
||||
"resticPassword": "PASS",
|
||||
"ssh": {
|
||||
"enable": true,
|
||||
"passwordAuthentication": true,
|
||||
"rootKeys": [
|
||||
"ssh-ed25519 KEY test@pc"
|
||||
]
|
||||
},
|
||||
"timezone": "Etc/UTC",
|
||||
"username": "tester",
|
||||
"gitea": {
|
||||
"enable": false
|
||||
},
|
||||
"ocserv": {
|
||||
"enable": true
|
||||
},
|
||||
"pleroma": {
|
||||
"enable": true
|
||||
},
|
||||
"autoUpgrade": {
|
||||
"enable": true,
|
||||
"allowReboot": true
|
||||
},
|
||||
"timezone": "Europe/Moscow",
|
||||
"useBinds": true,
|
||||
"sshKeys": [
|
||||
"ssh-rsa KEY test@pc"
|
||||
],
|
||||
|
@ -50,17 +24,50 @@
|
|||
]
|
||||
}
|
||||
],
|
||||
"dns": {
|
||||
"provider": "CLOUDFLARE",
|
||||
"apiKey": "TOKEN"
|
||||
"autoUpgrade": {
|
||||
"enable": true,
|
||||
"allowReboot": true
|
||||
},
|
||||
"server": {
|
||||
"provider": "HETZNER"
|
||||
"modules": {
|
||||
"bitwarden": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"backup": {
|
||||
"provider": "BACKBLAZE",
|
||||
"accountId": "ID",
|
||||
"accountKey": "KEY",
|
||||
"bucket": "selfprivacy"
|
||||
"gitea": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"jitsi-meet": {
|
||||
"enable": true
|
||||
},
|
||||
"nextcloud": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"ocserv": {
|
||||
"enable": true
|
||||
},
|
||||
"pleroma": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"simple-nixos-mailserver": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
}
|
||||
},
|
||||
"volumes": [
|
||||
{
|
||||
"device": "/dev/sdb",
|
||||
"mountPoint": "/volumes/sdb",
|
||||
"fsType": "ext4"
|
||||
}
|
||||
],
|
||||
"ssh": {
|
||||
"enable": true,
|
||||
"passwordAuthentication": true,
|
||||
"rootKeys": [
|
||||
"ssh-ed25519 KEY test@pc"
|
||||
]
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,43 +1,17 @@
|
|||
{
|
||||
"api": {
|
||||
"token": "TEST_TOKEN",
|
||||
"enableSwagger": false
|
||||
"dns": {
|
||||
"provider": "CLOUDFLARE",
|
||||
"useStagingACME": false
|
||||
},
|
||||
"bitwarden": {
|
||||
"enable": false
|
||||
"server": {
|
||||
"provider": "HETZNER"
|
||||
},
|
||||
"databasePassword": "PASSWORD",
|
||||
"domain": "test.tld",
|
||||
"domain": "test-domain.tld",
|
||||
"hashedMasterPassword": "HASHED_PASSWORD",
|
||||
"hostname": "test-instance",
|
||||
"nextcloud": {
|
||||
"adminPassword": "ADMIN",
|
||||
"databasePassword": "ADMIN",
|
||||
"enable": true
|
||||
},
|
||||
"resticPassword": "PASS",
|
||||
"ssh": {
|
||||
"enable": true,
|
||||
"passwordAuthentication": true,
|
||||
"rootKeys": [
|
||||
"ssh-ed25519 KEY test@pc"
|
||||
]
|
||||
},
|
||||
"timezone": "Etc/UTC",
|
||||
"username": "tester",
|
||||
"gitea": {
|
||||
"enable": false
|
||||
},
|
||||
"ocserv": {
|
||||
"enable": true
|
||||
},
|
||||
"pleroma": {
|
||||
"enable": true
|
||||
},
|
||||
"autoUpgrade": {
|
||||
"enable": true,
|
||||
"allowReboot": true
|
||||
},
|
||||
"timezone": "Europe/Moscow",
|
||||
"useBinds": true,
|
||||
"sshKeys": [
|
||||
"ssh-rsa KEY test@pc"
|
||||
],
|
||||
|
@ -60,17 +34,50 @@
|
|||
"hashedPassword": "HASHED_PASSWORD_3"
|
||||
}
|
||||
],
|
||||
"dns": {
|
||||
"provider": "CLOUDFLARE",
|
||||
"apiKey": "TOKEN"
|
||||
"autoUpgrade": {
|
||||
"enable": true,
|
||||
"allowReboot": true
|
||||
},
|
||||
"server": {
|
||||
"provider": "HETZNER"
|
||||
"modules": {
|
||||
"bitwarden": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"backup": {
|
||||
"provider": "BACKBLAZE",
|
||||
"accountId": "ID",
|
||||
"accountKey": "KEY",
|
||||
"bucket": "selfprivacy"
|
||||
"gitea": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"jitsi-meet": {
|
||||
"enable": true
|
||||
},
|
||||
"nextcloud": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"ocserv": {
|
||||
"enable": true
|
||||
},
|
||||
"pleroma": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"simple-nixos-mailserver": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
}
|
||||
},
|
||||
"volumes": [
|
||||
{
|
||||
"device": "/dev/sdb",
|
||||
"mountPoint": "/volumes/sdb",
|
||||
"fsType": "ext4"
|
||||
}
|
||||
],
|
||||
"ssh": {
|
||||
"enable": true,
|
||||
"passwordAuthentication": true,
|
||||
"rootKeys": [
|
||||
"ssh-ed25519 KEY test@pc"
|
||||
]
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,57 +1,64 @@
|
|||
{
|
||||
"api": {
|
||||
"token": "TEST_TOKEN",
|
||||
"enableSwagger": false
|
||||
"dns": {
|
||||
"provider": "CLOUDFLARE",
|
||||
"useStagingACME": false
|
||||
},
|
||||
"bitwarden": {
|
||||
"enable": false
|
||||
"server": {
|
||||
"provider": "HETZNER"
|
||||
},
|
||||
"databasePassword": "PASSWORD",
|
||||
"domain": "test.tld",
|
||||
"domain": "test-domain.tld",
|
||||
"hashedMasterPassword": "HASHED_PASSWORD",
|
||||
"hostname": "test-instance",
|
||||
"nextcloud": {
|
||||
"adminPassword": "ADMIN",
|
||||
"databasePassword": "ADMIN",
|
||||
"timezone": "Etc/UTC",
|
||||
"username": "tester",
|
||||
"useBinds": true,
|
||||
"sshKeys": [
|
||||
"ssh-rsa KEY test@pc"
|
||||
],
|
||||
"autoUpgrade": {
|
||||
"enable": true,
|
||||
"allowReboot": true
|
||||
},
|
||||
"modules": {
|
||||
"bitwarden": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"gitea": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"jitsi-meet": {
|
||||
"enable": true
|
||||
},
|
||||
"resticPassword": "PASS",
|
||||
"nextcloud": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"ocserv": {
|
||||
"enable": true
|
||||
},
|
||||
"pleroma": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
},
|
||||
"simple-nixos-mailserver": {
|
||||
"enable": true,
|
||||
"location": "sdb"
|
||||
}
|
||||
},
|
||||
"volumes": [
|
||||
{
|
||||
"device": "/dev/sdb",
|
||||
"mountPoint": "/volumes/sdb",
|
||||
"fsType": "ext4"
|
||||
}
|
||||
],
|
||||
"ssh": {
|
||||
"enable": true,
|
||||
"passwordAuthentication": true,
|
||||
"rootKeys": [
|
||||
"ssh-ed25519 KEY test@pc"
|
||||
]
|
||||
},
|
||||
"username": "tester",
|
||||
"gitea": {
|
||||
"enable": false
|
||||
},
|
||||
"ocserv": {
|
||||
"enable": true
|
||||
},
|
||||
"pleroma": {
|
||||
"enable": true
|
||||
},
|
||||
"autoUpgrade": {
|
||||
"enable": true,
|
||||
"allowReboot": true
|
||||
},
|
||||
"timezone": "Europe/Moscow",
|
||||
"sshKeys": [
|
||||
"ssh-rsa KEY test@pc"
|
||||
],
|
||||
"dns": {
|
||||
"provider": "CLOUDFLARE",
|
||||
"apiKey": "TOKEN"
|
||||
},
|
||||
"server": {
|
||||
"provider": "HETZNER"
|
||||
},
|
||||
"backup": {
|
||||
"provider": "BACKBLAZE",
|
||||
"accountId": "ID",
|
||||
"accountKey": "KEY",
|
||||
"bucket": "selfprivacy"
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,60 +0,0 @@
|
|||
import pytest
|
||||
|
||||
from selfprivacy_api.migrations.modules_in_json import CreateModulesField
|
||||
from selfprivacy_api.utils import ReadUserData, WriteUserData
|
||||
from selfprivacy_api.services import get_all_services
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def stray_services(mocker, datadir):
|
||||
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "strays.json")
|
||||
return datadir
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def empty_json(generic_userdata):
|
||||
with WriteUserData() as data:
|
||||
data.clear()
|
||||
|
||||
with ReadUserData() as data:
|
||||
assert len(data.keys()) == 0
|
||||
|
||||
return
|
||||
|
||||
|
||||
def test_modules_empty_json(empty_json):
|
||||
with ReadUserData() as data:
|
||||
assert "modules" not in data.keys()
|
||||
|
||||
assert CreateModulesField().is_migration_needed()
|
||||
|
||||
CreateModulesField().migrate()
|
||||
assert not CreateModulesField().is_migration_needed()
|
||||
|
||||
with ReadUserData() as data:
|
||||
assert "modules" in data.keys()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("modules_field", [True, False])
|
||||
def test_modules_stray_services(modules_field, stray_services):
|
||||
if not modules_field:
|
||||
with WriteUserData() as data:
|
||||
del data["modules"]
|
||||
assert CreateModulesField().is_migration_needed()
|
||||
|
||||
CreateModulesField().migrate()
|
||||
|
||||
for service in get_all_services():
|
||||
# assumes we do not tolerate previous format
|
||||
assert service.is_enabled()
|
||||
if service.get_id() == "email":
|
||||
continue
|
||||
with ReadUserData() as data:
|
||||
assert service.get_id() in data["modules"].keys()
|
||||
assert service.get_id() not in data.keys()
|
||||
|
||||
assert not CreateModulesField().is_migration_needed()
|
||||
|
||||
|
||||
def test_modules_no_migration_on_generic_data(generic_userdata):
|
||||
assert not CreateModulesField().is_migration_needed()
|
|
@ -1,23 +0,0 @@
|
|||
{
|
||||
"bitwarden": {
|
||||
"enable": true
|
||||
},
|
||||
"nextcloud": {
|
||||
"adminPassword": "ADMIN",
|
||||
"databasePassword": "ADMIN",
|
||||
"enable": true
|
||||
},
|
||||
"gitea": {
|
||||
"enable": true
|
||||
},
|
||||
"ocserv": {
|
||||
"enable": true
|
||||
},
|
||||
"pleroma": {
|
||||
"enable": true
|
||||
},
|
||||
"jitsi": {
|
||||
"enable": true
|
||||
},
|
||||
"modules": {}
|
||||
}
|
|
@ -1,245 +0,0 @@
|
|||
# pylint: disable=redefined-outer-name
|
||||
# pylint: disable=unused-argument
|
||||
# pylint: disable=missing-function-docstring
|
||||
"""
|
||||
tests that restrict json token repository implementation
|
||||
"""
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
from selfprivacy_api.models.tokens.token import Token
|
||||
from selfprivacy_api.repositories.tokens.exceptions import (
|
||||
TokenNotFound,
|
||||
RecoveryKeyNotFound,
|
||||
NewDeviceKeyNotFound,
|
||||
)
|
||||
from selfprivacy_api.repositories.tokens.json_tokens_repository import (
|
||||
JsonTokensRepository,
|
||||
)
|
||||
|
||||
from tests.common import read_json
|
||||
from test_tokens_repository import (
|
||||
mock_recovery_key_generate,
|
||||
mock_generate_token,
|
||||
mock_new_device_key_generate,
|
||||
)
|
||||
|
||||
ORIGINAL_TOKEN_CONTENT = [
|
||||
{
|
||||
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
|
||||
"name": "primary_token",
|
||||
"date": "2022-07-15 17:41:31.675698",
|
||||
},
|
||||
{
|
||||
"token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68",
|
||||
"name": "second_token",
|
||||
"date": "2022-07-15 17:41:31.675698Z",
|
||||
},
|
||||
{
|
||||
"token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8",
|
||||
"name": "third_token",
|
||||
"date": "2022-07-15T17:41:31.675698Z",
|
||||
},
|
||||
{
|
||||
"token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM",
|
||||
"name": "forth_token",
|
||||
"date": "2022-07-15T17:41:31.675698",
|
||||
},
|
||||
]
|
||||
|
||||
EMPTY_KEYS_JSON = """
|
||||
{
|
||||
"tokens": [
|
||||
{
|
||||
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
|
||||
"name": "primary_token",
|
||||
"date": "2022-07-15 17:41:31.675698"
|
||||
}
|
||||
]
|
||||
}
|
||||
"""
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def tokens(mocker, datadir):
|
||||
mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=datadir / "tokens.json")
|
||||
assert read_json(datadir / "tokens.json")["tokens"] == ORIGINAL_TOKEN_CONTENT
|
||||
return datadir
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def empty_keys(mocker, tmpdir):
|
||||
tokens_file = tmpdir / "empty_keys.json"
|
||||
with open(tokens_file, "w") as file:
|
||||
file.write(EMPTY_KEYS_JSON)
|
||||
mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=tokens_file)
|
||||
assert read_json(tokens_file)["tokens"] == [
|
||||
{
|
||||
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
|
||||
"name": "primary_token",
|
||||
"date": "2022-07-15 17:41:31.675698",
|
||||
}
|
||||
]
|
||||
return tmpdir
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def null_keys(mocker, datadir):
|
||||
mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=datadir / "null_keys.json")
|
||||
assert read_json(datadir / "null_keys.json")["recovery_token"] is None
|
||||
assert read_json(datadir / "null_keys.json")["new_device"] is None
|
||||
return datadir
|
||||
|
||||
|
||||
def test_delete_token(tokens):
|
||||
repo = JsonTokensRepository()
|
||||
input_token = Token(
|
||||
token="KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
|
||||
device_name="primary_token",
|
||||
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
|
||||
)
|
||||
|
||||
repo.delete_token(input_token)
|
||||
assert read_json(tokens / "tokens.json")["tokens"] == [
|
||||
{
|
||||
"token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68",
|
||||
"name": "second_token",
|
||||
"date": "2022-07-15 17:41:31.675698Z",
|
||||
},
|
||||
{
|
||||
"token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8",
|
||||
"name": "third_token",
|
||||
"date": "2022-07-15T17:41:31.675698Z",
|
||||
},
|
||||
{
|
||||
"token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM",
|
||||
"name": "forth_token",
|
||||
"date": "2022-07-15T17:41:31.675698",
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
def test_delete_not_found_token(tokens):
|
||||
repo = JsonTokensRepository()
|
||||
input_token = Token(
|
||||
token="imbadtoken",
|
||||
device_name="primary_token",
|
||||
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
|
||||
)
|
||||
with pytest.raises(TokenNotFound):
|
||||
assert repo.delete_token(input_token) is None
|
||||
|
||||
assert read_json(tokens / "tokens.json")["tokens"] == ORIGINAL_TOKEN_CONTENT
|
||||
|
||||
|
||||
def test_create_recovery_key(tokens, mock_recovery_key_generate):
|
||||
repo = JsonTokensRepository()
|
||||
|
||||
assert repo.create_recovery_key(uses_left=1, expiration=None) is not None
|
||||
assert read_json(tokens / "tokens.json")["recovery_token"] == {
|
||||
"token": "889bf49c1d3199d71a2e704718772bd53a422020334db051",
|
||||
"date": "2022-07-15T17:41:31.675698",
|
||||
"expiration": None,
|
||||
"uses_left": 1,
|
||||
}
|
||||
|
||||
|
||||
def test_use_mnemonic_recovery_key_when_null(null_keys):
|
||||
repo = JsonTokensRepository()
|
||||
|
||||
with pytest.raises(RecoveryKeyNotFound):
|
||||
assert (
|
||||
repo.use_mnemonic_recovery_key(
|
||||
mnemonic_phrase="captain ribbon toddler settle symbol minute step broccoli bless universe divide bulb",
|
||||
device_name="primary_token",
|
||||
)
|
||||
is None
|
||||
)
|
||||
|
||||
|
||||
def test_use_mnemonic_recovery_key(tokens, mock_generate_token):
|
||||
repo = JsonTokensRepository()
|
||||
|
||||
assert repo.use_mnemonic_recovery_key(
|
||||
mnemonic_phrase="uniform clarify napkin bid dress search input armor police cross salon because myself uphold slice bamboo hungry park",
|
||||
device_name="newdevice",
|
||||
) == Token(
|
||||
token="ur71mC4aiI6FIYAN--cTL-38rPHS5D6NuB1bgN_qKF4",
|
||||
device_name="newdevice",
|
||||
created_at=datetime(2022, 11, 14, 6, 6, 32, 777123),
|
||||
)
|
||||
|
||||
assert read_json(tokens / "tokens.json")["tokens"] == [
|
||||
{
|
||||
"date": "2022-07-15 17:41:31.675698",
|
||||
"name": "primary_token",
|
||||
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
|
||||
},
|
||||
{
|
||||
"token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68",
|
||||
"name": "second_token",
|
||||
"date": "2022-07-15 17:41:31.675698Z",
|
||||
},
|
||||
{
|
||||
"token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8",
|
||||
"name": "third_token",
|
||||
"date": "2022-07-15T17:41:31.675698Z",
|
||||
},
|
||||
{
|
||||
"token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM",
|
||||
"name": "forth_token",
|
||||
"date": "2022-07-15T17:41:31.675698",
|
||||
},
|
||||
{
|
||||
"date": "2022-11-14T06:06:32.777123",
|
||||
"name": "newdevice",
|
||||
"token": "ur71mC4aiI6FIYAN--cTL-38rPHS5D6NuB1bgN_qKF4",
|
||||
},
|
||||
]
|
||||
assert read_json(tokens / "tokens.json")["recovery_token"] == {
|
||||
"date": "2022-11-11T11:48:54.228038",
|
||||
"expiration": None,
|
||||
"token": "ed653e4b8b042b841d285fa7a682fa09e925ddb2d8906f54",
|
||||
"uses_left": 1,
|
||||
}
|
||||
|
||||
|
||||
def test_get_new_device_key(tokens, mock_new_device_key_generate):
|
||||
repo = JsonTokensRepository()
|
||||
|
||||
assert repo.get_new_device_key() is not None
|
||||
assert read_json(tokens / "tokens.json")["new_device"] == {
|
||||
"date": "2022-07-15T17:41:31.675698",
|
||||
"expiration": "2022-07-15T17:41:31.675698",
|
||||
"token": "43478d05b35e4781598acd76e33832bb",
|
||||
}
|
||||
|
||||
|
||||
def test_delete_new_device_key(tokens):
|
||||
repo = JsonTokensRepository()
|
||||
|
||||
assert repo.delete_new_device_key() is None
|
||||
assert "new_device" not in read_json(tokens / "tokens.json")
|
||||
|
||||
|
||||
def test_delete_new_device_key_when_empty(empty_keys):
|
||||
repo = JsonTokensRepository()
|
||||
|
||||
repo.delete_new_device_key()
|
||||
assert "new_device" not in read_json(empty_keys / "empty_keys.json")
|
||||
|
||||
|
||||
def test_use_mnemonic_new_device_key_when_null(null_keys):
|
||||
repo = JsonTokensRepository()
|
||||
|
||||
with pytest.raises(NewDeviceKeyNotFound):
|
||||
assert (
|
||||
repo.use_mnemonic_new_device_key(
|
||||
device_name="imnew",
|
||||
mnemonic_phrase="captain ribbon toddler settle symbol minute step broccoli bless universe divide bulb",
|
||||
)
|
||||
is None
|
||||
)
|
|
@ -1,9 +0,0 @@
|
|||
{
|
||||
"tokens": [
|
||||
{
|
||||
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
|
||||
"name": "primary_token",
|
||||
"date": "2022-07-15 17:41:31.675698"
|
||||
}
|
||||
]
|
||||
}
|
|
@ -1,26 +0,0 @@
|
|||
{
|
||||
"tokens": [
|
||||
{
|
||||
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
|
||||
"name": "primary_token",
|
||||
"date": "2022-07-15 17:41:31.675698"
|
||||
},
|
||||
{
|
||||
"token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68",
|
||||
"name": "second_token",
|
||||
"date": "2022-07-15 17:41:31.675698Z"
|
||||
},
|
||||
{
|
||||
"token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8",
|
||||
"name": "third_token",
|
||||
"date": "2022-07-15T17:41:31.675698Z"
|
||||
},
|
||||
{
|
||||
"token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM",
|
||||
"name": "forth_token",
|
||||
"date": "2022-07-15T17:41:31.675698"
|
||||
}
|
||||
],
|
||||
"recovery_token": null,
|
||||
"new_device": null
|
||||
}
|
|
@ -1,35 +0,0 @@
|
|||
{
|
||||
"tokens": [
|
||||
{
|
||||
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
|
||||
"name": "primary_token",
|
||||
"date": "2022-07-15 17:41:31.675698"
|
||||
},
|
||||
{
|
||||
"token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68",
|
||||
"name": "second_token",
|
||||
"date": "2022-07-15 17:41:31.675698Z"
|
||||
},
|
||||
{
|
||||
"token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8",
|
||||
"name": "third_token",
|
||||
"date": "2022-07-15T17:41:31.675698Z"
|
||||
},
|
||||
{
|
||||
"token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM",
|
||||
"name": "forth_token",
|
||||
"date": "2022-07-15T17:41:31.675698"
|
||||
}
|
||||
],
|
||||
"recovery_token": {
|
||||
"token": "ed653e4b8b042b841d285fa7a682fa09e925ddb2d8906f54",
|
||||
"date": "2022-11-11T11:48:54.228038",
|
||||
"expiration": null,
|
||||
"uses_left": 2
|
||||
},
|
||||
"new_device": {
|
||||
"token": "2237238de23dc71ab558e317bdb8ff8e",
|
||||
"date": "2022-10-26 20:50:47.973212",
|
||||
"expiration": "2022-10-26 21:00:47.974153"
|
||||
}
|
||||
}
|
|
@ -17,9 +17,6 @@ from selfprivacy_api.repositories.tokens.exceptions import (
|
|||
NewDeviceKeyNotFound,
|
||||
)
|
||||
|
||||
from selfprivacy_api.repositories.tokens.json_tokens_repository import (
|
||||
JsonTokensRepository,
|
||||
)
|
||||
from selfprivacy_api.repositories.tokens.redis_tokens_repository import (
|
||||
RedisTokensRepository,
|
||||
)
|
||||
|
@ -27,7 +24,7 @@ from selfprivacy_api.repositories.tokens.abstract_tokens_repository import (
|
|||
AbstractTokensRepository,
|
||||
)
|
||||
|
||||
from tests.common import five_minutes_into_past, five_minutes_into_future
|
||||
from tests.common import ten_minutes_into_past, ten_minutes_into_future
|
||||
|
||||
|
||||
ORIGINAL_DEVICE_NAMES = [
|
||||
|
@ -133,10 +130,8 @@ def mock_recovery_key_generate(mocker):
|
|||
return mock
|
||||
|
||||
|
||||
@pytest.fixture(params=["json", "redis"])
|
||||
def empty_repo(request, empty_json_repo, empty_redis_repo):
|
||||
if request.param == "json":
|
||||
return empty_json_repo
|
||||
@pytest.fixture(params=["redis"])
|
||||
def empty_repo(request, empty_redis_repo):
|
||||
if request.param == "redis":
|
||||
return empty_redis_repo
|
||||
# return empty_json_repo
|
||||
|
@ -363,7 +358,7 @@ def test_use_mnemonic_expired_recovery_key(
|
|||
some_tokens_repo,
|
||||
):
|
||||
repo = some_tokens_repo
|
||||
expiration = five_minutes_into_past()
|
||||
expiration = ten_minutes_into_past()
|
||||
assert repo.create_recovery_key(uses_left=2, expiration=expiration) is not None
|
||||
recovery_key = repo.get_recovery_key()
|
||||
# TODO: do not ignore timezone once json backend is deleted
|
||||
|
@ -543,7 +538,7 @@ def test_use_mnemonic_expired_new_device_key(
|
|||
some_tokens_repo,
|
||||
):
|
||||
repo = some_tokens_repo
|
||||
expiration = five_minutes_into_past()
|
||||
expiration = ten_minutes_into_past()
|
||||
|
||||
key = repo.get_new_device_key()
|
||||
assert key is not None
|
||||
|
@ -582,24 +577,3 @@ def assert_identical(
|
|||
assert token in tokens_b
|
||||
assert repo_a.get_recovery_key() == repo_b.get_recovery_key()
|
||||
assert repo_a._get_stored_new_device_key() == repo_b._get_stored_new_device_key()
|
||||
|
||||
|
||||
def clone_to_redis(repo: JsonTokensRepository):
|
||||
other_repo = RedisTokensRepository()
|
||||
other_repo.clone(repo)
|
||||
assert_identical(repo, other_repo)
|
||||
|
||||
|
||||
# we cannot easily parametrize this unfortunately, since some_tokens and empty_repo cannot coexist
|
||||
def test_clone_json_to_redis_empty(empty_repo):
|
||||
repo = empty_repo
|
||||
if isinstance(repo, JsonTokensRepository):
|
||||
clone_to_redis(repo)
|
||||
|
||||
|
||||
def test_clone_json_to_redis_full(some_tokens_repo):
|
||||
repo = some_tokens_repo
|
||||
if isinstance(repo, JsonTokensRepository):
|
||||
repo.get_new_device_key()
|
||||
repo.create_recovery_key(five_minutes_into_future(), 2)
|
||||
clone_to_redis(repo)
|
||||
|
|
|
@ -1,9 +0,0 @@
|
|||
{
|
||||
"tokens": [
|
||||
{
|
||||
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
|
||||
"name": "primary_token",
|
||||
"date": "2022-07-15 17:41:31.675698"
|
||||
}
|
||||
]
|
||||
}
|
|
@ -1,26 +0,0 @@
|
|||
{
|
||||
"tokens": [
|
||||
{
|
||||
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
|
||||
"name": "primary_token",
|
||||
"date": "2022-07-15 17:41:31.675698"
|
||||
},
|
||||
{
|
||||
"token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68",
|
||||
"name": "second_token",
|
||||
"date": "2022-07-15 17:41:31.675698Z"
|
||||
},
|
||||
{
|
||||
"token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8",
|
||||
"name": "third_token",
|
||||
"date": "2022-07-15T17:41:31.675698Z"
|
||||
},
|
||||
{
|
||||
"token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM",
|
||||
"name": "forth_token",
|
||||
"date": "2022-07-15T17:41:31.675698"
|
||||
}
|
||||
],
|
||||
"recovery_token": null,
|
||||
"new_device": null
|
||||
}
|
|
@ -1,35 +0,0 @@
|
|||
{
|
||||
"tokens": [
|
||||
{
|
||||
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
|
||||
"name": "primary_token",
|
||||
"date": "2022-07-15 17:41:31.675698"
|
||||
},
|
||||
{
|
||||
"token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68",
|
||||
"name": "second_token",
|
||||
"date": "2022-07-15 17:41:31.675698Z"
|
||||
},
|
||||
{
|
||||
"token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8",
|
||||
"name": "third_token",
|
||||
"date": "2022-07-15T17:41:31.675698Z"
|
||||
},
|
||||
{
|
||||
"token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM",
|
||||
"name": "forth_token",
|
||||
"date": "2022-07-15T17:41:31.675698"
|
||||
}
|
||||
],
|
||||
"recovery_token": {
|
||||
"token": "ed653e4b8b042b841d285fa7a682fa09e925ddb2d8906f54",
|
||||
"date": "2022-11-11T11:48:54.228038",
|
||||
"expiration": null,
|
||||
"uses_left": 2
|
||||
},
|
||||
"new_device": {
|
||||
"token": "2237238de23dc71ab558e317bdb8ff8e",
|
||||
"date": "2022-10-26 20:50:47.973212",
|
||||
"expiration": "2022-10-26 21:00:47.974153"
|
||||
}
|
||||
}
|
|
@ -19,7 +19,7 @@ from selfprivacy_api.services.test_service import DummyService
|
|||
from selfprivacy_api.services.service import Service, ServiceStatus, StoppedService
|
||||
from selfprivacy_api.services import get_enabled_services
|
||||
|
||||
from tests.test_dkim import domain_file, dkim_file, no_dkim_file
|
||||
from tests.test_dkim import dkim_file, no_dkim_file
|
||||
|
||||
|
||||
def test_unimplemented_folders_raises():
|
||||
|
|
|
@ -100,7 +100,7 @@ def test_read_json(possibly_undefined_ssh_settings):
|
|||
assert get_ssh_settings().enable == data["ssh"]["enable"]
|
||||
|
||||
if "passwordAuthentication" not in data["ssh"].keys():
|
||||
assert get_ssh_settings().passwordAuthentication is True
|
||||
assert get_ssh_settings().passwordAuthentication is False
|
||||
else:
|
||||
assert (
|
||||
get_ssh_settings().passwordAuthentication
|
||||
|
@ -111,7 +111,6 @@ def test_read_json(possibly_undefined_ssh_settings):
|
|||
def test_enabling_disabling_writes_json(
|
||||
possibly_undefined_ssh_settings, ssh_enable_spectrum, password_auth_spectrum
|
||||
):
|
||||
|
||||
original_enable = get_raw_json_ssh_setting("enable")
|
||||
original_password_auth = get_raw_json_ssh_setting("passwordAuthentication")
|
||||
|
||||
|
@ -352,7 +351,6 @@ def test_read_user_keys_from_json(generic_userdata, username):
|
|||
|
||||
@pytest.mark.parametrize("username", regular_users)
|
||||
def test_adding_user_key_writes_json(generic_userdata, username):
|
||||
|
||||
with WriteUserData() as data:
|
||||
user_index = find_user_index_in_json_users(data["users"], username)
|
||||
del data["users"][user_index]["sshKeys"]
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
from selfprivacy_api.utils import ReadUserData, WriteUserData
|
||||
from selfprivacy_api.actions.users import delete_user
|
||||
|
||||
"""
|
||||
A place for user storage tests and other user tests that are not Graphql-specific.
|
||||
"""
|
||||
|
@ -8,6 +9,7 @@ from selfprivacy_api.actions.users import delete_user
|
|||
# It was born in order to not lose things that REST API tests checked for
|
||||
# In the future, user storage tests that are not dependent on actual API (graphql or otherwise) go here.
|
||||
|
||||
|
||||
def test_delete_user_writes_json(generic_userdata):
|
||||
delete_user("user2")
|
||||
with ReadUserData() as data:
|
||||
|
@ -15,12 +17,11 @@ def test_delete_user_writes_json(generic_userdata):
|
|||
{
|
||||
"username": "user1",
|
||||
"hashedPassword": "HASHED_PASSWORD_1",
|
||||
"sshKeys": ["ssh-rsa KEY user1@pc"]
|
||||
"sshKeys": ["ssh-rsa KEY user1@pc"],
|
||||
},
|
||||
{
|
||||
"username": "user3",
|
||||
"hashedPassword": "HASHED_PASSWORD_3",
|
||||
"sshKeys": ["ssh-rsa KEY user3@pc"]
|
||||
}
|
||||
"sshKeys": ["ssh-rsa KEY user3@pc"],
|
||||
},
|
||||
]
|
||||
|
||||
|
|
Loading…
Reference in a new issue