Merge pull request 'Adapt API to the NixOS configuration changes' (#79) from remove-rest-flaked into master

Reviewed-on: https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api/pulls/79
This commit is contained in:
Inex Code 2024-01-10 16:53:12 +02:00
commit 0ccb85d5cc
77 changed files with 1249 additions and 2056 deletions

2
.gitignore vendored
View file

@ -148,3 +148,5 @@ cython_debug/
*.db
*.rdb
/result

73
README.md Normal file
View file

@ -0,0 +1,73 @@
# SelfPrivacy GraphQL API which allows app to control your server
## build
```console
$ nix build
```
As a result, you should get the `./result` symlink to a folder (in `/nix/store`) with build contents.
## develop & test
```console
$ nix develop
$ [SP devshell] pytest .
=================================== test session starts =====================================
platform linux -- Python 3.10.11, pytest-7.1.3, pluggy-1.0.0
rootdir: /data/selfprivacy/selfprivacy-rest-api
plugins: anyio-3.5.0, datadir-1.4.1, mock-3.8.2
collected 692 items
tests/test_block_device_utils.py ................. [ 2%]
tests/test_common.py ..... [ 3%]
tests/test_jobs.py ........ [ 4%]
tests/test_model_storage.py .. [ 4%]
tests/test_models.py .. [ 4%]
tests/test_network_utils.py ...... [ 5%]
tests/test_services.py ...... [ 6%]
tests/test_graphql/test_api.py . [ 6%]
tests/test_graphql/test_api_backup.py ............... [ 8%]
tests/test_graphql/test_api_devices.py ................. [ 11%]
tests/test_graphql/test_api_recovery.py ......... [ 12%]
tests/test_graphql/test_api_version.py .. [ 13%]
tests/test_graphql/test_backup.py ............................... [ 21%]
tests/test_graphql/test_localsecret.py ... [ 22%]
tests/test_graphql/test_ssh.py ............ [ 23%]
tests/test_graphql/test_system.py ............................. [ 28%]
tests/test_graphql/test_system_nixos_tasks.py ........ [ 29%]
tests/test_graphql/test_users.py .................................. [ 42%]
tests/test_graphql/test_repository/test_json_tokens_repository.py [ 44%]
tests/test_graphql/test_repository/test_tokens_repository.py .... [ 53%]
tests/test_rest_endpoints/test_auth.py .......................... [ 58%]
tests/test_rest_endpoints/test_system.py ........................ [ 63%]
tests/test_rest_endpoints/test_users.py ................................ [ 76%]
tests/test_rest_endpoints/services/test_bitwarden.py ............ [ 78%]
tests/test_rest_endpoints/services/test_gitea.py .............. [ 80%]
tests/test_rest_endpoints/services/test_mailserver.py ..... [ 81%]
tests/test_rest_endpoints/services/test_nextcloud.py ............ [ 83%]
tests/test_rest_endpoints/services/test_ocserv.py .............. [ 85%]
tests/test_rest_endpoints/services/test_pleroma.py .............. [ 87%]
tests/test_rest_endpoints/services/test_services.py .... [ 88%]
tests/test_rest_endpoints/services/test_ssh.py ..................... [100%]
============================== 692 passed in 352.76s (0:05:52) ===============================
```
If you don't have experimental flakes enabled, you can use the following command:
```console
nix --extra-experimental-features nix-command --extra-experimental-features flakes develop
```
## dependencies and dependant modules
Current flake inherits nixpkgs from NixOS configuration flake. So there is no need to refer to extra nixpkgs dependency if you want to be aligned with exact NixOS configuration.
![diagram](http://www.plantuml.com/plantuml/proxy?src=https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api/raw/branch/master/nix-dependencies-diagram.puml)
Nix code for NixOS service module for API is located in NixOS configuration repository.
## current issues
- It's not clear how to store in this repository information about several compatible NixOS configuration commits, where API application tests pass. Currently, here is only a single `flake.lock`.

33
default.nix Normal file
View file

@ -0,0 +1,33 @@
{ pythonPackages, rev ? "local" }:
pythonPackages.buildPythonPackage rec {
pname = "selfprivacy-graphql-api";
version = rev;
src = builtins.filterSource (p: t: p != ".git" && t != "symlink") ./.;
nativeCheckInputs = [ pythonPackages.pytestCheckHook ];
propagatedBuildInputs = with pythonPackages; [
fastapi
gevent
huey
mnemonic
portalocker
psutil
pydantic
pytest
pytest-datadir
pytest-mock
pytz
redis
setuptools
strawberry-graphql
typing-extensions
uvicorn
];
pythonImportsCheck = [ "selfprivacy_api" ];
doCheck = false;
meta = {
description = ''
SelfPrivacy Server Management API
'';
};
}

26
flake.lock Normal file
View file

@ -0,0 +1,26 @@
{
"nodes": {
"nixpkgs": {
"locked": {
"lastModified": 1702780907,
"narHash": "sha256-blbrBBXjjZt6OKTcYX1jpe9SRof2P9ZYWPzq22tzXAA=",
"owner": "nixos",
"repo": "nixpkgs",
"rev": "1e2e384c5b7c50dbf8e9c441a9e58d85f408b01f",
"type": "github"
},
"original": {
"owner": "nixos",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"nixpkgs": "nixpkgs"
}
}
},
"root": "root",
"version": 7
}

50
flake.nix Normal file
View file

@ -0,0 +1,50 @@
{
description = "SelfPrivacy API flake";
inputs.nixpkgs.url = "github:nixos/nixpkgs";
outputs = { self, nixpkgs, ... }:
let
system = "x86_64-linux";
pkgs = nixpkgs.legacyPackages.${system};
selfprivacy-graphql-api = pkgs.callPackage ./default.nix {
pythonPackages = pkgs.python310Packages;
rev = self.shortRev or self.dirtyShortRev or "dirty";
};
in
{
packages.${system}.default = selfprivacy-graphql-api;
nixosModules.default =
import ./nixos/module.nix self.packages.${system}.default;
devShells.${system}.default = pkgs.mkShell {
packages =
let
# TODO is there a better way to get environment for VS Code?
python3 =
nixpkgs.lib.findFirst (p: p.pname == "python3") (abort "wtf")
self.packages.${system}.default.propagatedBuildInputs;
python-env =
python3.withPackages
(_: self.packages.${system}.default.propagatedBuildInputs);
in
with pkgs; [
python-env
black
rclone
redis
restic
];
shellHook = ''
# envs set with export and as attributes are treated differently.
# for example. printenv <Name> will not fetch the value of an attribute.
export USE_REDIS_PORT=6379
export TEST_MODE=true
pkill redis-server
sleep 2
setsid redis-server --bind 127.0.0.1 --port $USE_REDIS_PORT >/dev/null 2>/dev/null &
# maybe set more env-vars
'';
};
};
nixConfig.bash-prompt = ''\n\[\e[1;32m\][\[\e[0m\]\[\e[1;34m\]SP devshell\[\e[0m\]\[\e[1;32m\]:\w]\$\[\[\e[0m\] '';
}

View file

@ -0,0 +1,22 @@
@startuml
left to right direction
title repositories and flake inputs relations diagram
cloud nixpkgs as nixpkgs_transit
control "<font:monospaced><size:15>nixos-rebuild" as nixos_rebuild
component "SelfPrivacy\nAPI app" as selfprivacy_app
component "SelfPrivacy\nNixOS configuration" as nixos_configuration
note top of nixos_configuration : SelfPrivacy\nAPI service module
nixos_configuration ).. nixpkgs_transit
nixpkgs_transit ..> selfprivacy_app
selfprivacy_app --> nixos_configuration
[nixpkgs] --> nixos_configuration
nixos_configuration -> nixos_rebuild
footer %date("yyyy-MM-dd'T'HH:mmZ")
@enduml

166
nixos/module.nix Normal file
View file

@ -0,0 +1,166 @@
selfprivacy-graphql-api: { config, lib, pkgs, ... }:
let
cfg = config.services.selfprivacy-api;
config-id = "default";
nixos-rebuild = "${config.system.build.nixos-rebuild}/bin/nixos-rebuild";
nix = "${config.nix.package.out}/bin/nix";
in
{
options.services.selfprivacy-api = {
enable = lib.mkOption {
default = true;
type = lib.types.bool;
description = ''
Enable SelfPrivacy API service
'';
};
};
config = lib.mkIf cfg.enable {
users.users."selfprivacy-api" = {
isNormalUser = false;
isSystemUser = true;
extraGroups = [ "opendkim" ];
group = "selfprivacy-api";
};
users.groups."selfprivacy-api".members = [ "selfprivacy-api" ];
systemd.services.selfprivacy-api = {
description = "API Server used to control system from the mobile application";
environment = config.nix.envVars // {
HOME = "/root";
PYTHONUNBUFFERED = "1";
} // config.networking.proxy.envVars;
path = [
"/var/"
"/var/dkim/"
pkgs.coreutils
pkgs.gnutar
pkgs.xz.bin
pkgs.gzip
pkgs.gitMinimal
config.nix.package.out
pkgs.restic
pkgs.mkpasswd
pkgs.util-linux
pkgs.e2fsprogs
pkgs.iproute2
];
after = [ "network-online.target" ];
wantedBy = [ "network-online.target" ];
serviceConfig = {
User = "root";
ExecStart = "${selfprivacy-graphql-api}/bin/app.py";
Restart = "always";
RestartSec = "5";
};
};
systemd.services.selfprivacy-api-worker = {
description = "Task worker for SelfPrivacy API";
environment = config.nix.envVars // {
HOME = "/root";
PYTHONUNBUFFERED = "1";
PYTHONPATH =
pkgs.python310Packages.makePythonPath [ selfprivacy-graphql-api ];
} // config.networking.proxy.envVars;
path = [
"/var/"
"/var/dkim/"
pkgs.coreutils
pkgs.gnutar
pkgs.xz.bin
pkgs.gzip
pkgs.gitMinimal
config.nix.package.out
pkgs.restic
pkgs.mkpasswd
pkgs.util-linux
pkgs.e2fsprogs
pkgs.iproute2
];
after = [ "network-online.target" ];
wantedBy = [ "network-online.target" ];
serviceConfig = {
User = "root";
ExecStart = "${pkgs.python310Packages.huey}/bin/huey_consumer.py selfprivacy_api.task_registry.huey";
Restart = "always";
RestartSec = "5";
};
};
# One shot systemd service to rebuild NixOS using nixos-rebuild
systemd.services.sp-nixos-rebuild = {
description = "nixos-rebuild switch";
environment = config.nix.envVars // {
HOME = "/root";
} // config.networking.proxy.envVars;
# TODO figure out how to get dependencies list reliably
path = [ pkgs.coreutils pkgs.gnutar pkgs.xz.bin pkgs.gzip pkgs.gitMinimal config.nix.package.out ];
# TODO set proper timeout for reboot instead of service restart
serviceConfig = {
User = "root";
WorkingDirectory = "/etc/nixos";
# sync top-level flake with sp-modules sub-flake
# (https://github.com/NixOS/nix/issues/9339)
ExecStartPre = ''
${nix} flake lock --override-input sp-modules path:./sp-modules
'';
ExecStart = ''
${nixos-rebuild} switch --flake .#${config-id}
'';
KillMode = "none";
SendSIGKILL = "no";
};
restartIfChanged = false;
unitConfig.X-StopOnRemoval = false;
};
# One shot systemd service to upgrade NixOS using nixos-rebuild
systemd.services.sp-nixos-upgrade = {
# protection against simultaneous runs
after = [ "sp-nixos-rebuild.service" ];
description = "Upgrade NixOS and SP modules to latest versions";
environment = config.nix.envVars // {
HOME = "/root";
} // config.networking.proxy.envVars;
# TODO figure out how to get dependencies list reliably
path = [ pkgs.coreutils pkgs.gnutar pkgs.xz.bin pkgs.gzip pkgs.gitMinimal config.nix.package.out ];
serviceConfig = {
User = "root";
WorkingDirectory = "/etc/nixos";
# TODO get URL from systemd template parameter?
ExecStartPre = ''
${nix} flake update \
--override-input selfprivacy-nixos-config git+https://git.selfprivacy.org/SelfPrivacy/selfprivacy-nixos-config.git?ref=flakes
'';
ExecStart = ''
${nixos-rebuild} switch --flake .#${config-id}
'';
KillMode = "none";
SendSIGKILL = "no";
};
restartIfChanged = false;
unitConfig.X-StopOnRemoval = false;
};
# One shot systemd service to rollback NixOS using nixos-rebuild
systemd.services.sp-nixos-rollback = {
# protection against simultaneous runs
after = [ "sp-nixos-rebuild.service" "sp-nixos-upgrade.service" ];
description = "Rollback NixOS using nixos-rebuild";
environment = config.nix.envVars // {
HOME = "/root";
} // config.networking.proxy.envVars;
# TODO figure out how to get dependencies list reliably
path = [ pkgs.coreutils pkgs.gnutar pkgs.xz.bin pkgs.gzip pkgs.gitMinimal config.nix.package.out ];
serviceConfig = {
User = "root";
WorkingDirectory = "/etc/nixos";
ExecStart = ''
${nixos-rebuild} switch --rollback --flake .#${config-id}
'';
KillMode = "none";
SendSIGKILL = "no";
};
restartIfChanged = false;
unitConfig.X-StopOnRemoval = false;
};
};
}

View file

@ -31,7 +31,7 @@ def get_ssh_settings() -> UserdataSshSettings:
if "enable" not in data["ssh"]:
data["ssh"]["enable"] = True
if "passwordAuthentication" not in data["ssh"]:
data["ssh"]["passwordAuthentication"] = True
data["ssh"]["passwordAuthentication"] = False
if "rootKeys" not in data["ssh"]:
data["ssh"]["rootKeys"] = []
return UserdataSshSettings(**data["ssh"])

View file

@ -13,7 +13,7 @@ def get_timezone() -> str:
with ReadUserData() as user_data:
if "timezone" in user_data:
return user_data["timezone"]
return "Europe/Uzhgorod"
return "Etc/UTC"
class InvalidTimezone(Exception):

View file

@ -372,7 +372,6 @@ class ResticBackupper(AbstractBackupper):
stderr=subprocess.STDOUT,
shell=False,
) as handle:
# for some reason restore does not support
# nice reporting of progress via json
output = handle.communicate()[0].decode("utf-8")

View file

@ -17,7 +17,6 @@ class UserType(Enum):
@strawberry.type
class User:
user_type: UserType
username: str
# userHomeFolderspace: UserHomeFolderUsage
@ -32,7 +31,6 @@ class UserMutationReturn(MutationReturnInterface):
def get_user_by_username(username: str) -> typing.Optional[User]:
user = users_actions.get_user_by_username(username)
if user is None:
return None

View file

@ -20,6 +20,7 @@ from selfprivacy_api.graphql.mutations.mutation_interface import (
GenericMutationReturn,
)
from selfprivacy_api.graphql.mutations.services_mutations import (
ServiceJobMutationReturn,
ServiceMutationReturn,
ServicesMutations,
)
@ -201,7 +202,7 @@ class DeprecatedServicesMutations:
"services",
)
move_service: ServiceMutationReturn = deprecated_mutation(
move_service: ServiceJobMutationReturn = deprecated_mutation(
ServicesMutations.move_service,
"services",
)

View file

@ -15,7 +15,6 @@ from selfprivacy_api.jobs import Jobs
class Job:
@strawberry.field
def get_jobs(self) -> typing.List[ApiJob]:
Jobs.get_jobs()
return [job_to_api_job(job) for job in Jobs.get_jobs()]

View file

@ -8,37 +8,12 @@ at api.skippedMigrations in userdata.json and populating it
with IDs of the migrations to skip.
Adding DISABLE_ALL to that array disables the migrations module entirely.
"""
from selfprivacy_api.migrations.check_for_failed_binds_migration import (
CheckForFailedBindsMigration,
)
from selfprivacy_api.utils import ReadUserData
from selfprivacy_api.migrations.fix_nixos_config_branch import FixNixosConfigBranch
from selfprivacy_api.migrations.create_tokens_json import CreateTokensJson
from selfprivacy_api.migrations.migrate_to_selfprivacy_channel import (
MigrateToSelfprivacyChannel,
)
from selfprivacy_api.migrations.mount_volume import MountVolume
from selfprivacy_api.migrations.providers import CreateProviderFields
from selfprivacy_api.migrations.modules_in_json import CreateModulesField
from selfprivacy_api.migrations.prepare_for_nixos_2211 import (
MigrateToSelfprivacyChannelFrom2205,
)
from selfprivacy_api.migrations.prepare_for_nixos_2305 import (
MigrateToSelfprivacyChannelFrom2211,
)
from selfprivacy_api.migrations.redis_tokens import LoadTokensToRedis
from selfprivacy_api.utils import ReadUserData, UserDataFiles
from selfprivacy_api.migrations.write_token_to_redis import WriteTokenToRedis
migrations = [
FixNixosConfigBranch(),
CreateTokensJson(),
MigrateToSelfprivacyChannel(),
MountVolume(),
CheckForFailedBindsMigration(),
CreateProviderFields(),
MigrateToSelfprivacyChannelFrom2205(),
MigrateToSelfprivacyChannelFrom2211(),
LoadTokensToRedis(),
CreateModulesField(),
WriteTokenToRedis(),
]
@ -47,7 +22,7 @@ def run_migrations():
Go over all migrations. If they are not skipped in userdata file, run them
if the migration needed.
"""
with ReadUserData() as data:
with ReadUserData(UserDataFiles.SECRETS) as data:
if "api" not in data:
skipped_migrations = []
elif "skippedMigrations" not in data["api"]:

View file

@ -1,48 +0,0 @@
from selfprivacy_api.jobs import JobStatus, Jobs
from selfprivacy_api.migrations.migration import Migration
from selfprivacy_api.utils import WriteUserData
class CheckForFailedBindsMigration(Migration):
"""Mount volume."""
def get_migration_name(self):
return "check_for_failed_binds_migration"
def get_migration_description(self):
return "If binds migration failed, try again."
def is_migration_needed(self):
try:
jobs = Jobs.get_jobs()
# If there is a job with type_id "migrations.migrate_to_binds" and status is not "FINISHED",
# then migration is needed and job is deleted
for job in jobs:
if (
job.type_id == "migrations.migrate_to_binds"
and job.status != JobStatus.FINISHED
):
return True
return False
except Exception as e:
print(e)
return False
def migrate(self):
# Get info about existing volumes
# Write info about volumes to userdata.json
try:
jobs = Jobs.get_jobs()
for job in jobs:
if (
job.type_id == "migrations.migrate_to_binds"
and job.status != JobStatus.FINISHED
):
Jobs.remove(job)
with WriteUserData() as userdata:
userdata["useBinds"] = False
print("Done")
except Exception as e:
print(e)
print("Error mounting volume")

View file

@ -1,58 +0,0 @@
from datetime import datetime
import os
import json
from pathlib import Path
from selfprivacy_api.migrations.migration import Migration
from selfprivacy_api.utils import TOKENS_FILE, ReadUserData
class CreateTokensJson(Migration):
def get_migration_name(self):
return "create_tokens_json"
def get_migration_description(self):
return """Selfprivacy API used a single token in userdata.json for authentication.
This migration creates a new tokens.json file with the old token in it.
This migration runs if the tokens.json file does not exist.
Old token is located at ["api"]["token"] in userdata.json.
tokens.json path is declared in TOKENS_FILE imported from utils.py
tokens.json must have the following format:
{
"tokens": [
{
"token": "token_string",
"name": "Master Token",
"date": "current date from str(datetime.now())",
}
]
}
tokens.json must have 0600 permissions.
"""
def is_migration_needed(self):
return not os.path.exists(TOKENS_FILE)
def migrate(self):
try:
print(f"Creating tokens.json file at {TOKENS_FILE}")
with ReadUserData() as userdata:
token = userdata["api"]["token"]
# Touch tokens.json with 0600 permissions
Path(TOKENS_FILE).touch(mode=0o600)
# Write token to tokens.json
structure = {
"tokens": [
{
"token": token,
"name": "primary_token",
"date": str(datetime.now()),
}
]
}
with open(TOKENS_FILE, "w", encoding="utf-8") as tokens:
json.dump(structure, tokens, indent=4)
print("Done")
except Exception as e:
print(e)
print("Error creating tokens.json")

View file

@ -1,57 +0,0 @@
import os
import subprocess
from selfprivacy_api.migrations.migration import Migration
class FixNixosConfigBranch(Migration):
def get_migration_name(self):
return "fix_nixos_config_branch"
def get_migration_description(self):
return """Mobile SelfPrivacy app introduced a bug in version 0.4.0.
New servers were initialized with a rolling-testing nixos config branch.
This was fixed in app version 0.4.2, but existing servers were not updated.
This migration fixes this by changing the nixos config branch to master.
"""
def is_migration_needed(self):
"""Check the current branch of /etc/nixos and return True if it is rolling-testing"""
current_working_directory = os.getcwd()
try:
os.chdir("/etc/nixos")
nixos_config_branch = subprocess.check_output(
["git", "rev-parse", "--abbrev-ref", "HEAD"], start_new_session=True
)
os.chdir(current_working_directory)
return nixos_config_branch.decode("utf-8").strip() == "rolling-testing"
except subprocess.CalledProcessError:
os.chdir(current_working_directory)
return False
def migrate(self):
"""Affected server pulled the config with the --single-branch flag.
Git config remote.origin.fetch has to be changed, so all branches will be fetched.
Then, fetch all branches, pull and switch to master branch.
"""
print("Fixing Nixos config branch")
current_working_directory = os.getcwd()
try:
os.chdir("/etc/nixos")
subprocess.check_output(
[
"git",
"config",
"remote.origin.fetch",
"+refs/heads/*:refs/remotes/origin/*",
]
)
subprocess.check_output(["git", "fetch", "--all"])
subprocess.check_output(["git", "pull"])
subprocess.check_output(["git", "checkout", "master"])
os.chdir(current_working_directory)
print("Done")
except subprocess.CalledProcessError:
os.chdir(current_working_directory)
print("Error")

View file

@ -1,49 +0,0 @@
import os
import subprocess
from selfprivacy_api.migrations.migration import Migration
class MigrateToSelfprivacyChannel(Migration):
"""Migrate to selfprivacy Nix channel."""
def get_migration_name(self):
return "migrate_to_selfprivacy_channel"
def get_migration_description(self):
return "Migrate to selfprivacy Nix channel."
def is_migration_needed(self):
try:
output = subprocess.check_output(
["nix-channel", "--list"], start_new_session=True
)
output = output.decode("utf-8")
first_line = output.split("\n", maxsplit=1)[0]
return first_line.startswith("nixos") and (
first_line.endswith("nixos-21.11") or first_line.endswith("nixos-21.05")
)
except subprocess.CalledProcessError:
return False
def migrate(self):
# Change the channel and update them.
# Also, go to /etc/nixos directory and make a git pull
current_working_directory = os.getcwd()
try:
print("Changing channel")
os.chdir("/etc/nixos")
subprocess.check_output(
[
"nix-channel",
"--add",
"https://channel.selfprivacy.org/nixos-selfpricacy",
"nixos",
]
)
subprocess.check_output(["nix-channel", "--update"])
subprocess.check_output(["git", "pull"])
os.chdir(current_working_directory)
except subprocess.CalledProcessError:
os.chdir(current_working_directory)
print("Error")

View file

@ -1,50 +0,0 @@
from selfprivacy_api.migrations.migration import Migration
from selfprivacy_api.utils import ReadUserData, WriteUserData
from selfprivacy_api.services import get_all_services
def migrate_services_to_modules():
with WriteUserData() as userdata:
if "modules" not in userdata.keys():
userdata["modules"] = {}
for service in get_all_services():
name = service.get_id()
if name in userdata.keys():
field_content = userdata[name]
userdata["modules"][name] = field_content
del userdata[name]
# If you ever want to get rid of modules field you will need to get rid of this migration
class CreateModulesField(Migration):
"""introduce 'modules' (services) into userdata"""
def get_migration_name(self):
return "modules_in_json"
def get_migration_description(self):
return "Group service settings into a 'modules' field in userdata.json"
def is_migration_needed(self) -> bool:
try:
with ReadUserData() as userdata:
for service in get_all_services():
if service.get_id() in userdata.keys():
return True
if "modules" not in userdata.keys():
return True
return False
except Exception as e:
print(e)
return False
def migrate(self):
# Write info about providers to userdata.json
try:
migrate_services_to_modules()
print("Done")
except Exception as e:
print(e)
print("Error migrating service fields")

View file

@ -1,51 +0,0 @@
import os
import subprocess
from selfprivacy_api.migrations.migration import Migration
from selfprivacy_api.utils import ReadUserData, WriteUserData
from selfprivacy_api.utils.block_devices import BlockDevices
class MountVolume(Migration):
"""Mount volume."""
def get_migration_name(self):
return "mount_volume"
def get_migration_description(self):
return "Mount volume if it is not mounted."
def is_migration_needed(self):
try:
with ReadUserData() as userdata:
return "volumes" not in userdata
except Exception as e:
print(e)
return False
def migrate(self):
# Get info about existing volumes
# Write info about volumes to userdata.json
try:
volumes = BlockDevices().get_block_devices()
# If there is an unmounted volume sdb,
# Write it to userdata.json
is_there_a_volume = False
for volume in volumes:
if volume.name == "sdb":
is_there_a_volume = True
break
with WriteUserData() as userdata:
userdata["volumes"] = []
if is_there_a_volume:
userdata["volumes"].append(
{
"device": "/dev/sdb",
"mountPoint": "/volumes/sdb",
"fsType": "ext4",
}
)
print("Done")
except Exception as e:
print(e)
print("Error mounting volume")

View file

@ -1,58 +0,0 @@
import os
import subprocess
from selfprivacy_api.migrations.migration import Migration
class MigrateToSelfprivacyChannelFrom2205(Migration):
"""Migrate to selfprivacy Nix channel.
For some reason NixOS 22.05 servers initialized with the nixos channel instead of selfprivacy.
This stops us from upgrading to NixOS 22.11
"""
def get_migration_name(self):
return "migrate_to_selfprivacy_channel_from_2205"
def get_migration_description(self):
return "Migrate to selfprivacy Nix channel from NixOS 22.05."
def is_migration_needed(self):
try:
output = subprocess.check_output(
["nix-channel", "--list"], start_new_session=True
)
output = output.decode("utf-8")
first_line = output.split("\n", maxsplit=1)[0]
return first_line.startswith("nixos") and (
first_line.endswith("nixos-22.05")
)
except subprocess.CalledProcessError:
return False
def migrate(self):
# Change the channel and update them.
# Also, go to /etc/nixos directory and make a git pull
current_working_directory = os.getcwd()
try:
print("Changing channel")
os.chdir("/etc/nixos")
subprocess.check_output(
[
"nix-channel",
"--add",
"https://channel.selfprivacy.org/nixos-selfpricacy",
"nixos",
]
)
subprocess.check_output(["nix-channel", "--update"])
nixos_config_branch = subprocess.check_output(
["git", "rev-parse", "--abbrev-ref", "HEAD"], start_new_session=True
)
if nixos_config_branch.decode("utf-8").strip() == "api-redis":
print("Also changing nixos-config branch from api-redis to master")
subprocess.check_output(["git", "checkout", "master"])
subprocess.check_output(["git", "pull"])
os.chdir(current_working_directory)
except subprocess.CalledProcessError:
os.chdir(current_working_directory)
print("Error")

View file

@ -1,58 +0,0 @@
import os
import subprocess
from selfprivacy_api.migrations.migration import Migration
class MigrateToSelfprivacyChannelFrom2211(Migration):
"""Migrate to selfprivacy Nix channel.
For some reason NixOS 22.11 servers initialized with the nixos channel instead of selfprivacy.
This stops us from upgrading to NixOS 23.05
"""
def get_migration_name(self):
return "migrate_to_selfprivacy_channel_from_2211"
def get_migration_description(self):
return "Migrate to selfprivacy Nix channel from NixOS 22.11."
def is_migration_needed(self):
try:
output = subprocess.check_output(
["nix-channel", "--list"], start_new_session=True
)
output = output.decode("utf-8")
first_line = output.split("\n", maxsplit=1)[0]
return first_line.startswith("nixos") and (
first_line.endswith("nixos-22.11")
)
except subprocess.CalledProcessError:
return False
def migrate(self):
# Change the channel and update them.
# Also, go to /etc/nixos directory and make a git pull
current_working_directory = os.getcwd()
try:
print("Changing channel")
os.chdir("/etc/nixos")
subprocess.check_output(
[
"nix-channel",
"--add",
"https://channel.selfprivacy.org/nixos-selfpricacy",
"nixos",
]
)
subprocess.check_output(["nix-channel", "--update"])
nixos_config_branch = subprocess.check_output(
["git", "rev-parse", "--abbrev-ref", "HEAD"], start_new_session=True
)
if nixos_config_branch.decode("utf-8").strip() == "api-redis":
print("Also changing nixos-config branch from api-redis to master")
subprocess.check_output(["git", "checkout", "master"])
subprocess.check_output(["git", "pull"])
os.chdir(current_working_directory)
except subprocess.CalledProcessError:
os.chdir(current_working_directory)
print("Error")

View file

@ -1,43 +0,0 @@
from selfprivacy_api.migrations.migration import Migration
from selfprivacy_api.utils import ReadUserData, WriteUserData
class CreateProviderFields(Migration):
"""Unhardcode providers"""
def get_migration_name(self):
return "create_provider_fields"
def get_migration_description(self):
return "Add DNS, backup and server provider fields to enable user to choose between different clouds and to make the deployment adapt to these preferences."
def is_migration_needed(self):
try:
with ReadUserData() as userdata:
return "dns" not in userdata
except Exception as e:
print(e)
return False
def migrate(self):
# Write info about providers to userdata.json
try:
with WriteUserData() as userdata:
userdata["dns"] = {
"provider": "CLOUDFLARE",
"apiKey": userdata["cloudflare"]["apiKey"],
}
userdata["server"] = {
"provider": "HETZNER",
}
userdata["backup"] = {
"provider": "BACKBLAZE",
"accountId": userdata["backblaze"]["accountId"],
"accountKey": userdata["backblaze"]["accountKey"],
"bucket": userdata["backblaze"]["bucket"],
}
print("Done")
except Exception as e:
print(e)
print("Error migrating provider fields")

View file

@ -1,48 +0,0 @@
from selfprivacy_api.migrations.migration import Migration
from selfprivacy_api.repositories.tokens.json_tokens_repository import (
JsonTokensRepository,
)
from selfprivacy_api.repositories.tokens.redis_tokens_repository import (
RedisTokensRepository,
)
from selfprivacy_api.repositories.tokens.abstract_tokens_repository import (
AbstractTokensRepository,
)
class LoadTokensToRedis(Migration):
"""Load Json tokens into Redis"""
def get_migration_name(self):
return "load_tokens_to_redis"
def get_migration_description(self):
return "Loads access tokens and recovery keys from legacy json file into redis token storage"
def is_repo_empty(self, repo: AbstractTokensRepository) -> bool:
if repo.get_tokens() != []:
return False
if repo.get_recovery_key() is not None:
return False
return True
def is_migration_needed(self):
try:
if not self.is_repo_empty(JsonTokensRepository()) and self.is_repo_empty(
RedisTokensRepository()
):
return True
except Exception as e:
print(e)
return False
def migrate(self):
# Write info about providers to userdata.json
try:
RedisTokensRepository().clone(JsonTokensRepository())
print("Done")
except Exception as e:
print(e)
print("Error migrating access tokens from json to redis")

View file

@ -0,0 +1,63 @@
from datetime import datetime
from typing import Optional
from selfprivacy_api.migrations.migration import Migration
from selfprivacy_api.models.tokens.token import Token
from selfprivacy_api.repositories.tokens.redis_tokens_repository import (
RedisTokensRepository,
)
from selfprivacy_api.repositories.tokens.abstract_tokens_repository import (
AbstractTokensRepository,
)
from selfprivacy_api.utils import ReadUserData, UserDataFiles
class WriteTokenToRedis(Migration):
"""Load Json tokens into Redis"""
def get_migration_name(self):
return "write_token_to_redis"
def get_migration_description(self):
return "Loads the initial token into redis token storage"
def is_repo_empty(self, repo: AbstractTokensRepository) -> bool:
if repo.get_tokens() != []:
return False
return True
def get_token_from_json(self) -> Optional[Token]:
try:
with ReadUserData(UserDataFiles.SECRETS) as userdata:
return Token(
token=userdata["api"]["token"],
device_name="Initial device",
created_at=datetime.now(),
)
except Exception as e:
print(e)
return None
def is_migration_needed(self):
try:
if self.get_token_from_json() is not None and self.is_repo_empty(
RedisTokensRepository()
):
return True
except Exception as e:
print(e)
return False
def migrate(self):
# Write info about providers to userdata.json
try:
token = self.get_token_from_json()
if token is None:
print("No token found in secrets.json")
return
RedisTokensRepository()._store_token(token)
print("Done")
except Exception as e:
print(e)
print("Error migrating access tokens from json to redis")

View file

@ -1,8 +0,0 @@
from selfprivacy_api.repositories.tokens.abstract_tokens_repository import (
AbstractTokensRepository,
)
from selfprivacy_api.repositories.tokens.json_tokens_repository import (
JsonTokensRepository,
)
repository = JsonTokensRepository()

View file

@ -1,153 +0,0 @@
"""
temporary legacy
"""
from typing import Optional
from datetime import datetime, timezone
from selfprivacy_api.utils import UserDataFiles, WriteUserData, ReadUserData
from selfprivacy_api.models.tokens.token import Token
from selfprivacy_api.models.tokens.recovery_key import RecoveryKey
from selfprivacy_api.models.tokens.new_device_key import NewDeviceKey
from selfprivacy_api.repositories.tokens.exceptions import (
TokenNotFound,
)
from selfprivacy_api.repositories.tokens.abstract_tokens_repository import (
AbstractTokensRepository,
)
DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"
class JsonTokensRepository(AbstractTokensRepository):
def get_tokens(self) -> list[Token]:
"""Get the tokens"""
tokens_list = []
with ReadUserData(UserDataFiles.TOKENS) as tokens_file:
for userdata_token in tokens_file["tokens"]:
tokens_list.append(
Token(
token=userdata_token["token"],
device_name=userdata_token["name"],
created_at=userdata_token["date"],
)
)
return tokens_list
def _store_token(self, new_token: Token):
"""Store a token directly"""
with WriteUserData(UserDataFiles.TOKENS) as tokens_file:
tokens_file["tokens"].append(
{
"token": new_token.token,
"name": new_token.device_name,
"date": new_token.created_at.strftime(DATETIME_FORMAT),
}
)
def delete_token(self, input_token: Token) -> None:
"""Delete the token"""
with WriteUserData(UserDataFiles.TOKENS) as tokens_file:
for userdata_token in tokens_file["tokens"]:
if userdata_token["token"] == input_token.token:
tokens_file["tokens"].remove(userdata_token)
return
raise TokenNotFound("Token not found!")
def __key_date_from_str(self, date_string: str) -> datetime:
if date_string is None or date_string == "":
return None
# we assume that we store dates in json as naive utc
utc_no_tz = datetime.fromisoformat(date_string)
utc_with_tz = utc_no_tz.replace(tzinfo=timezone.utc)
return utc_with_tz
def __date_from_tokens_file(
self, tokens_file: object, tokenfield: str, datefield: str
):
date_string = tokens_file[tokenfield].get(datefield)
return self.__key_date_from_str(date_string)
def get_recovery_key(self) -> Optional[RecoveryKey]:
"""Get the recovery key"""
with ReadUserData(UserDataFiles.TOKENS) as tokens_file:
if (
"recovery_token" not in tokens_file
or tokens_file["recovery_token"] is None
):
return
recovery_key = RecoveryKey(
key=tokens_file["recovery_token"].get("token"),
created_at=self.__date_from_tokens_file(
tokens_file, "recovery_token", "date"
),
expires_at=self.__date_from_tokens_file(
tokens_file, "recovery_token", "expiration"
),
uses_left=tokens_file["recovery_token"].get("uses_left"),
)
return recovery_key
def _store_recovery_key(self, recovery_key: RecoveryKey) -> None:
with WriteUserData(UserDataFiles.TOKENS) as tokens_file:
key_expiration: Optional[str] = None
if recovery_key.expires_at is not None:
key_expiration = recovery_key.expires_at.strftime(DATETIME_FORMAT)
tokens_file["recovery_token"] = {
"token": recovery_key.key,
"date": recovery_key.created_at.strftime(DATETIME_FORMAT),
"expiration": key_expiration,
"uses_left": recovery_key.uses_left,
}
def _decrement_recovery_token(self):
"""Decrement recovery key use count by one"""
if self.is_recovery_key_valid():
with WriteUserData(UserDataFiles.TOKENS) as tokens:
if tokens["recovery_token"]["uses_left"] is not None:
tokens["recovery_token"]["uses_left"] -= 1
def _delete_recovery_key(self) -> None:
"""Delete the recovery key"""
with WriteUserData(UserDataFiles.TOKENS) as tokens_file:
if "recovery_token" in tokens_file:
del tokens_file["recovery_token"]
return
def _store_new_device_key(self, new_device_key: NewDeviceKey) -> None:
with WriteUserData(UserDataFiles.TOKENS) as tokens_file:
tokens_file["new_device"] = {
"token": new_device_key.key,
"date": new_device_key.created_at.strftime(DATETIME_FORMAT),
"expiration": new_device_key.expires_at.strftime(DATETIME_FORMAT),
}
def delete_new_device_key(self) -> None:
"""Delete the new device key"""
with WriteUserData(UserDataFiles.TOKENS) as tokens_file:
if "new_device" in tokens_file:
del tokens_file["new_device"]
return
def _get_stored_new_device_key(self) -> Optional[NewDeviceKey]:
"""Retrieves new device key that is already stored."""
with ReadUserData(UserDataFiles.TOKENS) as tokens_file:
if "new_device" not in tokens_file or tokens_file["new_device"] is None:
return
new_device_key = NewDeviceKey(
key=tokens_file["new_device"]["token"],
created_at=self.__date_from_tokens_file(
tokens_file, "new_device", "date"
),
expires_at=self.__date_from_tokens_file(
tokens_file, "new_device", "expiration"
),
)
return new_device_key

View file

@ -3,7 +3,7 @@
import typing
from selfprivacy_api.services.bitwarden import Bitwarden
from selfprivacy_api.services.gitea import Gitea
from selfprivacy_api.services.jitsi import Jitsi
from selfprivacy_api.services.jitsimeet import JitsiMeet
from selfprivacy_api.services.mailserver import MailServer
from selfprivacy_api.services.nextcloud import Nextcloud
from selfprivacy_api.services.pleroma import Pleroma
@ -18,7 +18,7 @@ services: list[Service] = [
Nextcloud(),
Pleroma(),
Ocserv(),
Jitsi(),
JitsiMeet(),
]

View file

@ -244,9 +244,11 @@ def move_service(
progress=95,
)
with WriteUserData() as user_data:
if userdata_location not in user_data:
user_data[userdata_location] = {}
user_data[userdata_location]["location"] = volume.name
if "modules" not in user_data:
user_data["modules"] = {}
if userdata_location not in user_data["modules"]:
user_data["modules"][userdata_location] = {}
user_data["modules"][userdata_location]["location"] = volume.name
# Start service
service.start()
Jobs.update(

View file

@ -1,4 +1,4 @@
"""Class representing Jitsi service"""
"""Class representing Jitsi Meet service"""
import base64
import subprocess
import typing
@ -11,26 +11,26 @@ from selfprivacy_api.services.service import Service, ServiceDnsRecord, ServiceS
from selfprivacy_api.utils import ReadUserData, WriteUserData, get_domain
from selfprivacy_api.utils.block_devices import BlockDevice
import selfprivacy_api.utils.network as network_utils
from selfprivacy_api.services.jitsi.icon import JITSI_ICON
from selfprivacy_api.services.jitsimeet.icon import JITSI_ICON
class Jitsi(Service):
class JitsiMeet(Service):
"""Class representing Jitsi service"""
@staticmethod
def get_id() -> str:
"""Return service id."""
return "jitsi"
return "jitsi-meet"
@staticmethod
def get_display_name() -> str:
"""Return service display name."""
return "Jitsi"
return "JitsiMeet"
@staticmethod
def get_description() -> str:
"""Return service description."""
return "Jitsi is a free and open-source video conferencing solution."
return "Jitsi Meet is a free and open-source video conferencing solution."
@staticmethod
def get_svg_icon() -> str:
@ -123,4 +123,4 @@ class Jitsi(Service):
]
def move_to_volume(self, volume: BlockDevice) -> Job:
raise NotImplementedError("jitsi service is not movable")
raise NotImplementedError("jitsi-meet service is not movable")

View file

@ -21,7 +21,7 @@ class MailServer(Service):
@staticmethod
def get_id() -> str:
return "email"
return "simple-nixos-mailserver"
@staticmethod
def get_display_name() -> str:
@ -173,7 +173,7 @@ class MailServer(Service):
volume,
job,
FolderMoveNames.default_foldermoves(self),
"email",
"simple-nixos-mailserver",
)
return job

View file

@ -225,9 +225,13 @@ class Service(ABC):
return root_device
with utils.ReadUserData() as userdata:
if userdata.get("useBinds", False):
return userdata.get(cls.get_id(), {}).get(
"location",
root_device,
return (
userdata.get("modules", {})
.get(cls.get_id(), {})
.get(
"location",
root_device,
)
)
else:
return root_device

View file

@ -9,10 +9,8 @@ import portalocker
import typing
USERDATA_FILE = "/etc/nixos/userdata/userdata.json"
TOKENS_FILE = "/etc/nixos/userdata/tokens.json"
JOBS_FILE = "/etc/nixos/userdata/jobs.json"
DOMAIN_FILE = "/var/domain"
USERDATA_FILE = "/etc/nixos/userdata.json"
SECRETS_FILE = "/etc/selfprivacy/secrets.json"
DKIM_DIR = "/var/dkim/"
@ -20,15 +18,13 @@ class UserDataFiles(Enum):
"""Enum for userdata files"""
USERDATA = 0
TOKENS = 1
JOBS = 2
SECRETS = 3
def get_domain():
"""Get domain from /var/domain without trailing new line"""
with open(DOMAIN_FILE, "r", encoding="utf-8") as domain_file:
domain = domain_file.readline().rstrip()
return domain
"""Get domain from userdata.json"""
with ReadUserData() as user_data:
return user_data["domain"]
class WriteUserData(object):
@ -37,14 +33,12 @@ class WriteUserData(object):
def __init__(self, file_type=UserDataFiles.USERDATA):
if file_type == UserDataFiles.USERDATA:
self.userdata_file = open(USERDATA_FILE, "r+", encoding="utf-8")
elif file_type == UserDataFiles.TOKENS:
self.userdata_file = open(TOKENS_FILE, "r+", encoding="utf-8")
elif file_type == UserDataFiles.JOBS:
elif file_type == UserDataFiles.SECRETS:
# Make sure file exists
if not os.path.exists(JOBS_FILE):
with open(JOBS_FILE, "w", encoding="utf-8") as jobs_file:
jobs_file.write("{}")
self.userdata_file = open(JOBS_FILE, "r+", encoding="utf-8")
if not os.path.exists(SECRETS_FILE):
with open(SECRETS_FILE, "w", encoding="utf-8") as secrets_file:
secrets_file.write("{}")
self.userdata_file = open(SECRETS_FILE, "r+", encoding="utf-8")
else:
raise ValueError("Unknown file type")
portalocker.lock(self.userdata_file, portalocker.LOCK_EX)
@ -68,14 +62,11 @@ class ReadUserData(object):
def __init__(self, file_type=UserDataFiles.USERDATA):
if file_type == UserDataFiles.USERDATA:
self.userdata_file = open(USERDATA_FILE, "r", encoding="utf-8")
elif file_type == UserDataFiles.TOKENS:
self.userdata_file = open(TOKENS_FILE, "r", encoding="utf-8")
elif file_type == UserDataFiles.JOBS:
# Make sure file exists
if not os.path.exists(JOBS_FILE):
with open(JOBS_FILE, "w", encoding="utf-8") as jobs_file:
jobs_file.write("{}")
self.userdata_file = open(JOBS_FILE, "r", encoding="utf-8")
elif file_type == UserDataFiles.SECRETS:
if not os.path.exists(SECRETS_FILE):
with open(SECRETS_FILE, "w", encoding="utf-8") as secrets_file:
secrets_file.write("{}")
self.userdata_file = open(SECRETS_FILE, "r", encoding="utf-8")
else:
raise ValueError("Unknown file type")
portalocker.lock(self.userdata_file, portalocker.LOCK_SH)

View file

@ -2,14 +2,15 @@
import os
from huey import SqliteHuey
HUEY_DATABASE = "/etc/nixos/userdata/tasks.db"
HUEY_DATABASE = "/etc/selfprivacy/tasks.db"
# Singleton instance containing the huey database.
test_mode = os.environ.get("TEST_MODE")
huey = SqliteHuey(
HUEY_DATABASE,
"selfprivacy-api",
filename=HUEY_DATABASE if not test_mode else None,
immediate=test_mode == "true",
utc=True,
)

View file

@ -1,48 +0,0 @@
{ pkgs ? import <nixos-22.11> { } }:
let
sp-python = pkgs.python310.withPackages (p: with p; [
setuptools
portalocker
pytz
pytest
pytest-mock
pytest-datadir
huey
gevent
mnemonic
coverage
pylint
rope
mypy
pylsp-mypy
pydantic
typing-extensions
psutil
black
fastapi
uvicorn
redis
strawberry-graphql
flake8-bugbear
flake8
]);
in
pkgs.mkShell {
buildInputs = [
sp-python
pkgs.black
pkgs.redis
pkgs.restic
pkgs.rclone
];
shellHook = ''
PYTHONPATH=${sp-python}/${sp-python.sitePackages}
# envs set with export and as attributes are treated differently.
# for example. printenv <Name> will not fetch the value of an attribute.
export USE_REDIS_PORT=6379
pkill redis-server
sleep 2
setsid redis-server --bind 127.0.0.1 --port $USE_REDIS_PORT >/dev/null 2>/dev/null &
# maybe set more env-vars
'';
}

View file

@ -7,28 +7,28 @@ RECOVERY_KEY_VALIDATION_DATETIME = "selfprivacy_api.models.tokens.time.datetime"
DEVICE_KEY_VALIDATION_DATETIME = RECOVERY_KEY_VALIDATION_DATETIME
def five_minutes_into_future_naive():
return datetime.now() + timedelta(minutes=5)
def ten_minutes_into_future_naive():
return datetime.now() + timedelta(minutes=10)
def five_minutes_into_future_naive_utc():
return datetime.utcnow() + timedelta(minutes=5)
def ten_minutes_into_future_naive_utc():
return datetime.utcnow() + timedelta(minutes=10)
def five_minutes_into_future():
return datetime.now(timezone.utc) + timedelta(minutes=5)
def ten_minutes_into_future():
return datetime.now(timezone.utc) + timedelta(minutes=10)
def five_minutes_into_past_naive():
return datetime.now() - timedelta(minutes=5)
def ten_minutes_into_past_naive():
return datetime.now() - timedelta(minutes=10)
def five_minutes_into_past_naive_utc():
return datetime.utcnow() - timedelta(minutes=5)
def ten_minutes_into_past_naive_utc():
return datetime.utcnow() - timedelta(minutes=10)
def five_minutes_into_past():
return datetime.now(timezone.utc) - timedelta(minutes=5)
def ten_minutes_into_past():
return datetime.now(timezone.utc) - timedelta(minutes=10)
class NearFuture(datetime):

View file

@ -9,6 +9,7 @@ from os import path
from os import makedirs
from typing import Generator
from fastapi.testclient import TestClient
from selfprivacy_api.models.tokens.token import Token
from selfprivacy_api.utils.huey import huey
@ -16,22 +17,14 @@ import selfprivacy_api.services as services
from selfprivacy_api.services import get_service_by_id, Service
from selfprivacy_api.services.test_service import DummyService
from selfprivacy_api.models.tokens.token import Token
from selfprivacy_api.repositories.tokens.json_tokens_repository import (
JsonTokensRepository,
)
from selfprivacy_api.repositories.tokens.redis_tokens_repository import (
RedisTokensRepository,
)
from tests.common import read_json
TESTFILE_BODY = "testytest!"
TESTFILE_2_BODY = "testissimo!"
EMPTY_TOKENS_JSON = ' {"tokens": []}'
TOKENS_FILE_CONTENTS = {
"tokens": [
{
@ -47,6 +40,19 @@ TOKENS_FILE_CONTENTS = {
]
}
TOKENS = [
Token(
token="TEST_TOKEN",
device_name="test_token",
created_at=datetime.datetime(2022, 1, 14, 8, 31, 10, 789314),
),
Token(
token="TEST_TOKEN2",
device_name="test_token2",
created_at=datetime.datetime(2022, 1, 14, 8, 31, 10, 789314),
),
]
DEVICE_WE_AUTH_TESTS_WITH = TOKENS_FILE_CONTENTS["tokens"][0]
@ -58,25 +64,6 @@ def global_data_dir():
return path.join(path.dirname(__file__), "data")
@pytest.fixture
def empty_tokens(mocker, tmpdir):
tokenfile = tmpdir / "empty_tokens.json"
with open(tokenfile, "w") as file:
file.write(EMPTY_TOKENS_JSON)
mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=tokenfile)
assert read_json(tokenfile)["tokens"] == []
return tmpdir
@pytest.fixture
def empty_json_repo(empty_tokens):
repo = JsonTokensRepository()
for token in repo.get_tokens():
repo.delete_token(token)
assert repo.get_tokens() == []
return repo
@pytest.fixture
def empty_redis_repo():
repo = RedisTokensRepository()
@ -86,25 +73,14 @@ def empty_redis_repo():
@pytest.fixture
def tokens_file(empty_redis_repo, tmpdir):
"""A state with tokens"""
repo = empty_redis_repo
for token in TOKENS_FILE_CONTENTS["tokens"]:
repo._store_token(
Token(
token=token["token"],
device_name=token["name"],
created_at=token["date"],
)
)
return repo
@pytest.fixture
def jobs_file(mocker, shared_datadir):
"""Mock tokens file."""
mock = mocker.patch("selfprivacy_api.utils.JOBS_FILE", shared_datadir / "jobs.json")
return mock
def redis_repo_with_tokens():
repo = RedisTokensRepository()
repo.reset()
for token in TOKENS:
repo._store_token(token)
assert sorted(repo.get_tokens(), key=lambda x: x.token) == sorted(
TOKENS, key=lambda x: x.token
)
@pytest.fixture
@ -131,14 +107,14 @@ def huey_database(mocker, shared_datadir):
@pytest.fixture
def client(tokens_file, huey_database, jobs_file):
def client(huey_database, redis_repo_with_tokens):
from selfprivacy_api.app import app
return TestClient(app)
@pytest.fixture
def authorized_client(tokens_file, huey_database, jobs_file):
def authorized_client(huey_database, redis_repo_with_tokens):
"""Authorized test client fixture."""
from selfprivacy_api.app import app
@ -150,7 +126,7 @@ def authorized_client(tokens_file, huey_database, jobs_file):
@pytest.fixture
def wrong_auth_client(tokens_file, huey_database, jobs_file):
def wrong_auth_client(huey_database, redis_repo_with_tokens):
"""Wrong token test client fixture."""
from selfprivacy_api.app import app

View file

@ -1 +0,0 @@
{}

View file

@ -1,14 +0,0 @@
{
"tokens": [
{
"token": "TEST_TOKEN",
"name": "test_token",
"date": "2022-01-14 08:31:10.789314"
},
{
"token": "TEST_TOKEN2",
"name": "test_token2",
"date": "2022-01-14 08:31:10.789314"
}
]
}

View file

@ -1,40 +1,20 @@
{
"api": {"token": "TEST_TOKEN", "enableSwagger": false},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"dns": {
"provider": "CLOUDFLARE",
"useStagingACME": false
},
"server": {
"provider": "HETZNER"
},
"domain": "test-domain.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": ["ssh-ed25519 KEY test@pc"]
},
"timezone": "Etc/UTC",
"username": "tester",
"autoUpgrade": {"enable": true, "allowReboot": true},
"useBinds": true,
"timezone": "Europe/Moscow",
"sshKeys": ["ssh-rsa KEY test@pc"],
"dns": {"provider": "CLOUDFLARE", "apiKey": "TOKEN"},
"server": {"provider": "HETZNER"},
"modules": {
"bitwarden": {"enable": true},
"gitea": {"enable": true},
"ocserv": {"enable": true},
"pleroma": {"enable": true},
"jitsi": {"enable": true},
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
}
},
"backup": {
"provider": "BACKBLAZE",
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"sshKeys": [
"ssh-rsa KEY test@pc"
],
"users": [
{
"username": "user1",
@ -51,5 +31,57 @@
"hashedPassword": "HASHED_PASSWORD_3",
"sshKeys": ["ssh-rsa KEY user3@pc"]
}
]
],
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"modules": {
"bitwarden": {
"enable": true,
"location": "sdb"
},
"gitea": {
"enable": true,
"location": "sdb"
},
"jitsi-meet": {
"enable": true
},
"nextcloud": {
"enable": true,
"location": "sdb"
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true,
"location": "sdb"
},
"simple-nixos-mailserver": {
"enable": true,
"location": "sdb"
}
},
"volumes": [
{
"device": "/dev/sdb",
"mountPoint": "/volumes/sdb",
"fsType": "ext4"
}
],
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"backup": {
"provider": "BACKBLAZE",
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
}
}

View file

@ -67,7 +67,7 @@ def only_root_in_userdata(mocker, datadir):
read_json(datadir / "only_root.json")["volumes"][0]["mountPoint"]
== "/volumes/sda1"
)
assert read_json(datadir / "only_root.json")["volumes"][0]["filesystem"] == "ext4"
assert read_json(datadir / "only_root.json")["volumes"][0]["fsType"] == "ext4"
return datadir

View file

@ -1,59 +1,59 @@
{
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
"dns": {
"provider": "CLOUDFLARE",
"useStagingACME": false
},
"bitwarden": {
"enable": true
"server": {
"provider": "HETZNER"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"domain": "test-domain.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
"timezone": "Etc/UTC",
"username": "tester",
"useBinds": true,
"sshKeys": [
"ssh-rsa KEY test@pc"
],
"users": [],
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"resticPassword": "PASS",
"modules": {
"bitwarden": {
"enable": true,
"location": "sdb"
},
"gitea": {
"enable": true,
"location": "sdb"
},
"jitsi-meet": {
"enable": true
},
"nextcloud": {
"enable": true,
"location": "sdb"
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true,
"location": "sdb"
},
"simple-nixos-mailserver": {
"enable": true,
"location": "sdb"
}
},
"volumes": [],
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
],
"dns": {
"provider": "CLOUDFLARE",
"apiKey": "TOKEN"
},
"server": {
"provider": "HETZNER"
},
"backup": {
"provider": "BACKBLAZE",
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"volumes": [
]
}
}

View file

@ -1,64 +1,65 @@
{
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
"dns": {
"provider": "CLOUDFLARE",
"useStagingACME": false
},
"bitwarden": {
"enable": true
"server": {
"provider": "HETZNER"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"domain": "test-domain.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
"timezone": "Etc/UTC",
"username": "tester",
"useBinds": true,
"sshKeys": [
"ssh-rsa KEY test@pc"
],
"users": [],
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"resticPassword": "PASS",
"modules": {
"bitwarden": {
"enable": true,
"location": "sdb"
},
"gitea": {
"enable": true,
"location": "sdb"
},
"jitsi-meet": {
"enable": true
},
"nextcloud": {
"enable": true,
"location": "sdb"
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true,
"location": "sdb"
},
"simple-nixos-mailserver": {
"enable": true,
"location": "sdb"
}
},
"volumes": [
{
"device": "/dev/sda1",
"mountPoint": "/volumes/sda1",
"fsType": "ext4"
}
],
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
],
"volumes": [
{
"device": "/dev/sda1",
"mountPoint": "/volumes/sda1",
"filesystem": "ext4"
}
],
"dns": {
"provider": "CLOUDFLARE",
"apiKey": "TOKEN"
},
"server": {
"provider": "HETZNER"
},
"backup": {
"provider": "BACKBLAZE",
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
}
}

View file

@ -1,57 +1,58 @@
{
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
"dns": {
"provider": "CLOUDFLARE",
"useStagingACME": false
},
"bitwarden": {
"enable": true
"server": {
"provider": "HETZNER"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"domain": "test-domain.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
"timezone": "Etc/UTC",
"username": "tester",
"useBinds": true,
"sshKeys": [
"ssh-rsa KEY test@pc"
],
"users": [],
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"modules": {
"bitwarden": {
"enable": true,
"location": "sdb"
},
"gitea": {
"enable": true,
"location": "sdb"
},
"jitsi-meet": {
"enable": true
},
"nextcloud": {
"enable": true,
"location": "sdb"
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true,
"location": "sdb"
},
"simple-nixos-mailserver": {
"enable": true,
"location": "sdb"
}
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
],
"dns": {
"provider": "CLOUDFLARE",
"apiKey": "TOKEN"
},
"server": {
"provider": "HETZNER"
},
"backup": {
"provider": "BACKBLAZE",
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
}
}

View file

@ -12,7 +12,7 @@ DKIM_FILE_CONTENT = b'selector._domainkey\tIN\tTXT\t( "v=DKIM1; k=rsa; "\n\t "p
@pytest.fixture
def dkim_file(mocker, domain_file, tmpdir):
def dkim_file(mocker, tmpdir, generic_userdata):
domain = get_domain()
assert domain is not None
assert domain != ""
@ -27,14 +27,6 @@ def dkim_file(mocker, domain_file, tmpdir):
return dkim_path
@pytest.fixture
def domain_file(mocker):
# TODO: move to conftest. Challenge: it does not behave with "/" like pytest datadir does
domain_path = path.join(global_data_dir(), "domain")
mocker.patch("selfprivacy_api.utils.DOMAIN_FILE", domain_path)
return domain_path
@pytest.fixture
def no_dkim_file(dkim_file):
os.remove(dkim_file)
@ -45,7 +37,7 @@ def no_dkim_file(dkim_file):
###############################################################################
def test_get_dkim_key(domain_file, dkim_file):
def test_get_dkim_key(dkim_file):
"""Test DKIM key"""
dkim_key = get_dkim_key("test-domain.tld")
assert (
@ -54,7 +46,7 @@ def test_get_dkim_key(domain_file, dkim_file):
)
def test_no_dkim_key(domain_file, no_dkim_file):
def test_no_dkim_key(no_dkim_file):
"""Test no DKIM key"""
dkim_key = get_dkim_key("test-domain.tld")
assert dkim_key is None

View file

@ -9,7 +9,7 @@ from tests.test_graphql.test_api_recovery import API_RECOVERY_QUERY
from tests.test_graphql.test_api_version import API_VERSION_QUERY
def test_graphql_get_entire_api_data(authorized_client, tokens_file):
def test_graphql_get_entire_api_data(authorized_client):
response = authorized_client.post(
"/graphql",
json={

View file

@ -7,7 +7,7 @@ from tests.common import (
NearFuture,
generate_api_query,
)
from tests.conftest import DEVICE_WE_AUTH_TESTS_WITH, TOKENS_FILE_CONTENTS
from tests.conftest import DEVICE_WE_AUTH_TESTS_WITH
from tests.test_graphql.common import (
get_data,
assert_empty,
@ -66,11 +66,11 @@ def graphql_authorize_new_device(client, mnemonic_key, device_name) -> str:
return token
def test_graphql_tokens_info(authorized_client, tokens_file):
def test_graphql_tokens_info(authorized_client):
assert_original(authorized_client)
def test_graphql_tokens_info_unauthorized(client, tokens_file):
def test_graphql_tokens_info_unauthorized(client):
response = request_devices(client)
assert_empty(response)
@ -88,7 +88,7 @@ mutation DeleteToken($device: String!) {
"""
def test_graphql_delete_token_unauthorized(client, tokens_file):
def test_graphql_delete_token_unauthorized(client):
response = client.post(
"/graphql",
json={
@ -101,7 +101,7 @@ def test_graphql_delete_token_unauthorized(client, tokens_file):
assert_empty(response)
def test_graphql_delete_token(authorized_client, tokens_file):
def test_graphql_delete_token(authorized_client):
test_devices = ORIGINAL_DEVICES.copy()
device_to_delete = test_devices.pop(1)
assert device_to_delete != DEVICE_WE_AUTH_TESTS_WITH
@ -121,7 +121,7 @@ def test_graphql_delete_token(authorized_client, tokens_file):
assert_same(devices, test_devices)
def test_graphql_delete_self_token(authorized_client, tokens_file):
def test_graphql_delete_self_token(authorized_client):
response = authorized_client.post(
"/graphql",
json={
@ -137,7 +137,6 @@ def test_graphql_delete_self_token(authorized_client, tokens_file):
def test_graphql_delete_nonexistent_token(
authorized_client,
tokens_file,
):
response = authorized_client.post(
"/graphql",
@ -167,7 +166,7 @@ mutation RefreshToken {
"""
def test_graphql_refresh_token_unauthorized(client, tokens_file):
def test_graphql_refresh_token_unauthorized(client):
response = client.post(
"/graphql",
json={"query": REFRESH_TOKEN_MUTATION},
@ -175,7 +174,7 @@ def test_graphql_refresh_token_unauthorized(client, tokens_file):
assert_empty(response)
def test_graphql_refresh_token(authorized_client, client, tokens_file):
def test_graphql_refresh_token(authorized_client, client):
caller_name_and_date = graphql_get_caller_token_info(authorized_client)
response = authorized_client.post(
"/graphql",
@ -206,7 +205,6 @@ mutation NewDeviceKey {
def test_graphql_get_new_device_auth_key_unauthorized(
client,
tokens_file,
):
response = client.post(
"/graphql",
@ -230,7 +228,6 @@ mutation InvalidateNewDeviceKey {
def test_graphql_invalidate_new_device_token_unauthorized(
client,
tokens_file,
):
response = client.post(
"/graphql",
@ -244,7 +241,7 @@ def test_graphql_invalidate_new_device_token_unauthorized(
assert_empty(response)
def test_graphql_get_and_delete_new_device_key(client, authorized_client, tokens_file):
def test_graphql_get_and_delete_new_device_key(client, authorized_client):
mnemonic_key = graphql_get_new_device_key(authorized_client)
response = authorized_client.post(
@ -271,7 +268,7 @@ mutation AuthorizeWithNewDeviceKey($input: UseNewDeviceKeyInput!) {
"""
def test_graphql_get_and_authorize_new_device(client, authorized_client, tokens_file):
def test_graphql_get_and_authorize_new_device(client, authorized_client):
mnemonic_key = graphql_get_new_device_key(authorized_client)
old_devices = graphql_get_devices(authorized_client)
@ -282,16 +279,14 @@ def test_graphql_get_and_authorize_new_device(client, authorized_client, tokens_
assert "new_device" in [device["name"] for device in new_devices]
def test_graphql_authorize_new_device_with_invalid_key(
client, authorized_client, tokens_file
):
def test_graphql_authorize_new_device_with_invalid_key(client, authorized_client):
response = graphql_try_auth_new_device(client, "invalid_token", "new_device")
assert_errorcode(get_data(response)["api"]["authorizeWithNewDeviceApiKey"], 404)
assert_original(authorized_client)
def test_graphql_get_and_authorize_used_key(client, authorized_client, tokens_file):
def test_graphql_get_and_authorize_used_key(client, authorized_client):
mnemonic_key = graphql_get_new_device_key(authorized_client)
graphql_authorize_new_device(client, mnemonic_key, "new_device")
@ -304,7 +299,7 @@ def test_graphql_get_and_authorize_used_key(client, authorized_client, tokens_fi
def test_graphql_get_and_authorize_key_after_12_minutes(
client, authorized_client, tokens_file, mocker
client, authorized_client, mocker
):
mnemonic_key = graphql_get_new_device_key(authorized_client)
mock = mocker.patch(DEVICE_KEY_VALIDATION_DATETIME, NearFuture)
@ -315,7 +310,6 @@ def test_graphql_get_and_authorize_key_after_12_minutes(
def test_graphql_authorize_without_token(
client,
tokens_file,
):
response = client.post(
"/graphql",

View file

@ -14,9 +14,9 @@ from tests.common import (
)
# Graphql API's output should be timezone-naive
from tests.common import five_minutes_into_future_naive_utc as five_minutes_into_future
from tests.common import five_minutes_into_future as five_minutes_into_future_tz
from tests.common import five_minutes_into_past_naive_utc as five_minutes_into_past
from tests.common import ten_minutes_into_future_naive_utc as ten_minutes_into_future
from tests.common import ten_minutes_into_future as ten_minutes_into_future_tz
from tests.common import ten_minutes_into_past_naive_utc as ten_minutes_into_past
from tests.test_graphql.common import (
assert_empty,
@ -111,12 +111,12 @@ def graphql_use_recovery_key(client, key, device_name):
return token
def test_graphql_recovery_key_status_unauthorized(client, tokens_file):
def test_graphql_recovery_key_status_unauthorized(client):
response = request_recovery_status(client)
assert_empty(response)
def test_graphql_recovery_key_status_when_none_exists(authorized_client, tokens_file):
def test_graphql_recovery_key_status_when_none_exists(authorized_client):
status = graphql_recovery_status(authorized_client)
assert status["exists"] is False
assert status["valid"] is False
@ -152,7 +152,7 @@ mutation TestUseRecoveryKey($input: UseRecoveryKeyInput!) {
"""
def test_graphql_generate_recovery_key(client, authorized_client, tokens_file):
def test_graphql_generate_recovery_key(client, authorized_client):
key = graphql_make_new_recovery_key(authorized_client)
status = graphql_recovery_status(authorized_client)
@ -168,10 +168,10 @@ def test_graphql_generate_recovery_key(client, authorized_client, tokens_file):
@pytest.mark.parametrize(
"expiration_date", [five_minutes_into_future(), five_minutes_into_future_tz()]
"expiration_date", [ten_minutes_into_future(), ten_minutes_into_future_tz()]
)
def test_graphql_generate_recovery_key_with_expiration_date(
client, authorized_client, tokens_file, expiration_date: datetime
client, authorized_client, expiration_date: datetime
):
key = graphql_make_new_recovery_key(authorized_client, expires_at=expiration_date)
@ -192,10 +192,8 @@ def test_graphql_generate_recovery_key_with_expiration_date(
graphql_use_recovery_key(client, key, "new_test_token2")
def test_graphql_use_recovery_key_after_expiration(
client, authorized_client, tokens_file, mocker
):
expiration_date = five_minutes_into_future()
def test_graphql_use_recovery_key_after_expiration(client, authorized_client, mocker):
expiration_date = ten_minutes_into_future()
key = graphql_make_new_recovery_key(authorized_client, expires_at=expiration_date)
# Timewarp to after it expires
@ -220,10 +218,8 @@ def test_graphql_use_recovery_key_after_expiration(
assert status["usesLeft"] is None
def test_graphql_generate_recovery_key_with_expiration_in_the_past(
authorized_client, tokens_file
):
expiration_date = five_minutes_into_past()
def test_graphql_generate_recovery_key_with_expiration_in_the_past(authorized_client):
expiration_date = ten_minutes_into_past()
response = request_make_new_recovery_key(
authorized_client, expires_at=expiration_date
)
@ -235,9 +231,7 @@ def test_graphql_generate_recovery_key_with_expiration_in_the_past(
assert graphql_recovery_status(authorized_client)["exists"] is False
def test_graphql_generate_recovery_key_with_invalid_time_format(
authorized_client, tokens_file
):
def test_graphql_generate_recovery_key_with_invalid_time_format(authorized_client):
expiration_date = "invalid_time_format"
expiration_date_str = expiration_date
@ -256,10 +250,7 @@ def test_graphql_generate_recovery_key_with_invalid_time_format(
assert graphql_recovery_status(authorized_client)["exists"] is False
def test_graphql_generate_recovery_key_with_limited_uses(
authorized_client, client, tokens_file
):
def test_graphql_generate_recovery_key_with_limited_uses(authorized_client, client):
mnemonic_key = graphql_make_new_recovery_key(authorized_client, uses=2)
status = graphql_recovery_status(authorized_client)
@ -292,9 +283,7 @@ def test_graphql_generate_recovery_key_with_limited_uses(
assert_errorcode(output, 404)
def test_graphql_generate_recovery_key_with_negative_uses(
authorized_client, tokens_file
):
def test_graphql_generate_recovery_key_with_negative_uses(authorized_client):
response = request_make_new_recovery_key(authorized_client, uses=-1)
output = get_data(response)["api"]["getNewRecoveryApiKey"]
@ -303,7 +292,7 @@ def test_graphql_generate_recovery_key_with_negative_uses(
assert graphql_recovery_status(authorized_client)["exists"] is False
def test_graphql_generate_recovery_key_with_zero_uses(authorized_client, tokens_file):
def test_graphql_generate_recovery_key_with_zero_uses(authorized_client):
response = request_make_new_recovery_key(authorized_client, uses=0)
output = get_data(response)["api"]["getNewRecoveryApiKey"]

View file

@ -503,7 +503,7 @@ def test_move_same_volume(authorized_client, dummy_service):
def test_mailservice_cannot_enable_disable(authorized_client):
mailservice = get_service_by_id("email")
mailservice = get_service_by_id("simple-nixos-mailserver")
mutation_response = api_enable(authorized_client, mailservice)
data = get_data(mutation_response)["services"]["enableService"]

View file

@ -308,7 +308,6 @@ original_settings = [
def test_graphql_readwrite_ssh_settings(
authorized_client, some_users, settings, original_settings
):
# Userdata-related tests like undefined fields are in actions-level tests.
output = api_set_ssh_settings_dict(authorized_client, original_settings)
assert_includes(api_ssh_settings(authorized_client), output)
@ -334,7 +333,6 @@ forbidden_settings = [
def test_graphql_readwrite_ssh_settings_partial(
authorized_client, some_users, settings, original_settings
):
output = api_set_ssh_settings_dict(authorized_client, original_settings)
with pytest.raises(Exception):
output = api_set_ssh_settings_dict(authorized_client, settings)

View file

@ -1,43 +1,17 @@
{
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
"dns": {
"provider": "CLOUDFLARE",
"useStagingACME": false
},
"bitwarden": {
"enable": false
"server": {
"provider": "HETZNER"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"domain": "test-domain.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"username": "tester",
"useBinds": true,
"sshKeys": [
"ssh-rsa KEY test@pc"
],
@ -60,17 +34,50 @@
"hashedPassword": "HASHED_PASSWORD_3"
}
],
"dns": {
"provider": "CLOUDFLARE",
"apiKey": "TOKEN"
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"server": {
"provider": "HETZNER"
"modules": {
"bitwarden": {
"enable": true,
"location": "sdb"
},
"gitea": {
"enable": true,
"location": "sdb"
},
"jitsi-meet": {
"enable": true
},
"nextcloud": {
"enable": true,
"location": "sdb"
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true,
"location": "sdb"
},
"simple-nixos-mailserver": {
"enable": true,
"location": "sdb"
}
},
"backup": {
"provider": "BACKBLAZE",
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
"volumes": [
{
"device": "/dev/sdb",
"mountPoint": "/volumes/sdb",
"fsType": "ext4"
}
],
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
}
}

View file

@ -9,18 +9,12 @@ from tests.test_graphql.common import assert_empty
from tests.test_dkim import no_dkim_file, dkim_file
@pytest.fixture
def domain_file(mocker, datadir):
mocker.patch("selfprivacy_api.utils.DOMAIN_FILE", datadir / "domain")
return datadir
@pytest.fixture
def turned_on(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_on.json")
assert read_json(datadir / "turned_on.json")["autoUpgrade"]["enable"] == True
assert read_json(datadir / "turned_on.json")["autoUpgrade"]["allowReboot"] == True
assert read_json(datadir / "turned_on.json")["timezone"] == "Europe/Moscow"
assert read_json(datadir / "turned_on.json")["timezone"] == "Etc/UTC"
return datadir
@ -29,7 +23,7 @@ def turned_off(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_off.json")
assert read_json(datadir / "turned_off.json")["autoUpgrade"]["enable"] == False
assert read_json(datadir / "turned_off.json")["autoUpgrade"]["allowReboot"] == False
assert read_json(datadir / "turned_off.json")["timezone"] == "Europe/Moscow"
assert read_json(datadir / "turned_off.json")["timezone"] == "Etc/UTC"
return datadir
@ -251,7 +245,7 @@ def is_dns_record_in_array(records, dns_record) -> bool:
def test_graphql_get_domain(
authorized_client, domain_file, mock_get_ip4, mock_get_ip6, turned_on, mock_dkim_key
authorized_client, mock_get_ip4, mock_get_ip6, turned_on, mock_dkim_key
):
"""Test get domain"""
response = authorized_client.post(
@ -262,7 +256,9 @@ def test_graphql_get_domain(
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["system"]["domainInfo"]["domain"] == "test.tld"
assert (
response.json()["data"]["system"]["domainInfo"]["domain"] == "test-domain.tld"
)
assert (
response.json()["data"]["system"]["domainInfo"]["hostname"] == "test-instance"
)
@ -335,7 +331,6 @@ def test_graphql_get_domain(
def test_graphql_get_domain_no_dkim(
authorized_client,
domain_file,
mock_get_ip4,
mock_get_ip6,
no_dkim_file,
@ -384,7 +379,7 @@ def test_graphql_get_timezone(authorized_client, turned_on):
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["system"]["settings"]["timezone"] == "Europe/Moscow"
assert response.json()["data"]["system"]["settings"]["timezone"] == "Etc/UTC"
def test_graphql_get_timezone_on_undefined(authorized_client, undefined_config):
@ -397,9 +392,7 @@ def test_graphql_get_timezone_on_undefined(authorized_client, undefined_config):
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert (
response.json()["data"]["system"]["settings"]["timezone"] == "Europe/Uzhgorod"
)
assert response.json()["data"]["system"]["settings"]["timezone"] == "Etc/UTC"
API_CHANGE_TIMEZONE_MUTATION = """
@ -423,7 +416,7 @@ def test_graphql_change_timezone_unauthorized(client, turned_on):
json={
"query": API_CHANGE_TIMEZONE_MUTATION,
"variables": {
"timezone": "Europe/Moscow",
"timezone": "Etc/UTC",
},
},
)
@ -495,7 +488,7 @@ def test_graphql_change_timezone_without_timezone(authorized_client, turned_on):
assert response.json()["data"]["system"]["changeTimezone"]["message"] is not None
assert response.json()["data"]["system"]["changeTimezone"]["code"] == 400
assert response.json()["data"]["system"]["changeTimezone"]["timezone"] is None
assert read_json(turned_on / "turned_on.json")["timezone"] == "Europe/Moscow"
assert read_json(turned_on / "turned_on.json")["timezone"] == "Etc/UTC"
def test_graphql_change_timezone_with_invalid_timezone(authorized_client, turned_on):
@ -515,7 +508,7 @@ def test_graphql_change_timezone_with_invalid_timezone(authorized_client, turned
assert response.json()["data"]["system"]["changeTimezone"]["message"] is not None
assert response.json()["data"]["system"]["changeTimezone"]["code"] == 400
assert response.json()["data"]["system"]["changeTimezone"]["timezone"] is None
assert read_json(turned_on / "turned_on.json")["timezone"] == "Europe/Moscow"
assert read_json(turned_on / "turned_on.json")["timezone"] == "Etc/UTC"
API_GET_AUTO_UPGRADE_SETTINGS_QUERY = """

View file

@ -1 +0,0 @@
test-domain.tld

View file

@ -1,55 +1,62 @@
{
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
"dns": {
"provider": "CLOUDFLARE",
"useStagingACME": false
},
"bitwarden": {
"enable": true
"server": {
"provider": "HETZNER"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"domain": "test-domain.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
"timezone": "Etc/UTC",
"username": "tester",
"useBinds": true,
"sshKeys": [
"ssh-rsa KEY test@pc"
],
"users": [],
"autoUpgrade": {},
"modules": {
"bitwarden": {
"enable": true,
"location": "sdb"
},
"gitea": {
"enable": true,
"location": "sdb"
},
"jitsi-meet": {
"enable": true
},
"nextcloud": {
"enable": true,
"location": "sdb"
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true,
"location": "sdb"
},
"simple-nixos-mailserver": {
"enable": true,
"location": "sdb"
}
},
"resticPassword": "PASS",
"volumes": [
{
"device": "/dev/sdb",
"mountPoint": "/volumes/sdb",
"fsType": "ext4"
}
],
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
],
"dns": {
"provider": "CLOUDFLARE",
"apiKey": "TOKEN"
},
"server": {
"provider": "HETZNER"
},
"backup": {
"provider": "BACKBLAZE",
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
}
}

View file

@ -1,57 +1,65 @@
{
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
"dns": {
"provider": "CLOUDFLARE",
"useStagingACME": false
},
"bitwarden": {
"enable": true
"server": {
"provider": "HETZNER"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"domain": "test-domain.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
"timezone": "Etc/UTC",
"username": "tester",
"useBinds": true,
"sshKeys": [
"ssh-rsa KEY test@pc"
],
"users": [],
"autoUpgrade": {
"enable": false,
"allowReboot": false
},
"resticPassword": "PASS",
"modules": {
"bitwarden": {
"enable": true,
"location": "sdb"
},
"gitea": {
"enable": true,
"location": "sdb"
},
"jitsi-meet": {
"enable": true
},
"nextcloud": {
"enable": true,
"location": "sdb"
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true,
"location": "sdb"
},
"simple-nixos-mailserver": {
"enable": true,
"location": "sdb"
}
},
"volumes": [
{
"device": "/dev/sdb",
"mountPoint": "/volumes/sdb",
"fsType": "ext4"
}
],
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": false,
"allowReboot": false
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
],
"dns": {
"provider": "CLOUDFLARE",
"apiKey": "TOKEN"
},
"server": {
"provider": "HETZNER"
},
"backup": {
"provider": "BACKBLAZE",
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
}
}

View file

@ -1,62 +1,65 @@
{
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
"dns": {
"provider": "CLOUDFLARE",
"useStagingACME": false
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"server": {
"provider": "HETZNER"
},
"domain": "test-domain.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"resticPassword": "PASS",
"timezone": "Etc/UTC",
"username": "tester",
"useBinds": true,
"sshKeys": [
"ssh-rsa KEY test@pc"
],
"users": [],
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"modules": {
"bitwarden": {
"enable": true,
"location": "sdb"
},
"gitea": {
"enable": true,
"location": "sdb"
},
"jitsi-meet": {
"enable": true
},
"nextcloud": {
"enable": true,
"location": "sdb"
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true,
"location": "sdb"
},
"simple-nixos-mailserver": {
"enable": true,
"location": "sdb"
}
},
"volumes": [
{
"device": "/dev/sdb",
"mountPoint": "/volumes/sdb",
"fsType": "ext4"
}
],
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"modules": {
"gitea": {
"enable": true
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"jitsi": {
"enable": true
},
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
},
"bitwarden": {
"enable": true
}
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
],
"dns": {
"provider": "CLOUDFLARE",
"apiKey": "TOKEN"
},
"server": {
"provider": "HETZNER"
},
"backup": {
"provider": "BACKBLAZE",
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
}
}

View file

@ -1,52 +1,60 @@
{
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
"dns": {
"provider": "CLOUDFLARE",
"useStagingACME": false
},
"bitwarden": {
"enable": true
"server": {
"provider": "HETZNER"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"domain": "test-domain.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
"username": "tester",
"useBinds": true,
"sshKeys": [
"ssh-rsa KEY test@pc"
],
"users": [],
"modules": {
"bitwarden": {
"enable": true,
"location": "sdb"
},
"gitea": {
"enable": true,
"location": "sdb"
},
"jitsi-meet": {
"enable": true
},
"nextcloud": {
"enable": true,
"location": "sdb"
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true,
"location": "sdb"
},
"simple-nixos-mailserver": {
"enable": true,
"location": "sdb"
}
},
"resticPassword": "PASS",
"volumes": [
{
"device": "/dev/sdb",
"mountPoint": "/volumes/sdb",
"fsType": "ext4"
}
],
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"sshKeys": [
"ssh-rsa KEY test@pc"
],
"dns": {
"provider": "CLOUDFLARE",
"apiKey": "TOKEN"
},
"server": {
"provider": "HETZNER"
},
"backup": {
"provider": "BACKBLAZE",
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
}
}

View file

@ -4,12 +4,6 @@
import pytest
@pytest.fixture
def domain_file(mocker, datadir):
mocker.patch("selfprivacy_api.utils.DOMAIN_FILE", datadir / "domain")
return datadir
class ProcessMock:
"""Mock subprocess.Popen"""

View file

@ -254,7 +254,6 @@ def test_graphql_get_one_user_unauthorized(client, one_user, mock_subprocess_pop
def test_graphql_get_one_user(authorized_client, one_user, mock_subprocess_popen):
response = authorized_client.post(
"/graphql",
json={
@ -275,7 +274,6 @@ def test_graphql_get_one_user(authorized_client, one_user, mock_subprocess_popen
def test_graphql_get_some_user_undefined(authorized_client, undefined_settings):
response = authorized_client.post(
"/graphql",
json={

View file

@ -1,59 +1,65 @@
{
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
"dns": {
"provider": "CLOUDFLARE",
"useStagingACME": false
},
"bitwarden": {
"enable": false
"server": {
"provider": "HETZNER"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"domain": "test-domain.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
"timezone": "Etc/UTC",
"username": "tester",
"useBinds": true,
"sshKeys": [
"ssh-rsa KEY test@pc"
],
"users": [],
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"resticPassword": "PASS",
"modules": {
"bitwarden": {
"enable": true,
"location": "sdb"
},
"gitea": {
"enable": true,
"location": "sdb"
},
"jitsi-meet": {
"enable": true
},
"nextcloud": {
"enable": true,
"location": "sdb"
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true,
"location": "sdb"
},
"simple-nixos-mailserver": {
"enable": true,
"location": "sdb"
}
},
"volumes": [
{
"device": "/dev/sdb",
"mountPoint": "/volumes/sdb",
"fsType": "ext4"
}
],
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
],
"users": [
],
"dns": {
"provider": "CLOUDFLARE",
"apiKey": "TOKEN"
},
"server": {
"provider": "HETZNER"
},
"backup": {
"provider": "BACKBLAZE",
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
}
}

View file

@ -1,43 +1,17 @@
{
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
"dns": {
"provider": "CLOUDFLARE",
"useStagingACME": false
},
"bitwarden": {
"enable": false
"server": {
"provider": "HETZNER"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"domain": "test-domain.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"timezone": "Etc/UTC",
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"useBinds": true,
"sshKeys": [
"ssh-rsa KEY test@pc"
],
@ -50,17 +24,50 @@
]
}
],
"dns": {
"provider": "CLOUDFLARE",
"apiKey": "TOKEN"
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"server": {
"provider": "HETZNER"
"modules": {
"bitwarden": {
"enable": true,
"location": "sdb"
},
"gitea": {
"enable": true,
"location": "sdb"
},
"jitsi-meet": {
"enable": true
},
"nextcloud": {
"enable": true,
"location": "sdb"
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true,
"location": "sdb"
},
"simple-nixos-mailserver": {
"enable": true,
"location": "sdb"
}
},
"backup": {
"provider": "BACKBLAZE",
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
"volumes": [
{
"device": "/dev/sdb",
"mountPoint": "/volumes/sdb",
"fsType": "ext4"
}
],
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
}
}

View file

@ -1,43 +1,17 @@
{
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
"dns": {
"provider": "CLOUDFLARE",
"useStagingACME": false
},
"bitwarden": {
"enable": false
"server": {
"provider": "HETZNER"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"domain": "test-domain.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"timezone": "Etc/UTC",
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"useBinds": true,
"sshKeys": [
"ssh-rsa KEY test@pc"
],
@ -60,17 +34,50 @@
"hashedPassword": "HASHED_PASSWORD_3"
}
],
"dns": {
"provider": "CLOUDFLARE",
"apiKey": "TOKEN"
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"server": {
"provider": "HETZNER"
"modules": {
"bitwarden": {
"enable": true,
"location": "sdb"
},
"gitea": {
"enable": true,
"location": "sdb"
},
"jitsi-meet": {
"enable": true
},
"nextcloud": {
"enable": true,
"location": "sdb"
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true,
"location": "sdb"
},
"simple-nixos-mailserver": {
"enable": true,
"location": "sdb"
}
},
"backup": {
"provider": "BACKBLAZE",
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
"volumes": [
{
"device": "/dev/sdb",
"mountPoint": "/volumes/sdb",
"fsType": "ext4"
}
],
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
}
}

View file

@ -1,57 +1,64 @@
{
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
"dns": {
"provider": "CLOUDFLARE",
"useStagingACME": false
},
"bitwarden": {
"enable": false
"server": {
"provider": "HETZNER"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"domain": "test-domain.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
"timezone": "Etc/UTC",
"username": "tester",
"useBinds": true,
"sshKeys": [
"ssh-rsa KEY test@pc"
],
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"resticPassword": "PASS",
"modules": {
"bitwarden": {
"enable": true,
"location": "sdb"
},
"gitea": {
"enable": true,
"location": "sdb"
},
"jitsi-meet": {
"enable": true
},
"nextcloud": {
"enable": true,
"location": "sdb"
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true,
"location": "sdb"
},
"simple-nixos-mailserver": {
"enable": true,
"location": "sdb"
}
},
"volumes": [
{
"device": "/dev/sdb",
"mountPoint": "/volumes/sdb",
"fsType": "ext4"
}
],
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
],
"dns": {
"provider": "CLOUDFLARE",
"apiKey": "TOKEN"
},
"server": {
"provider": "HETZNER"
},
"backup": {
"provider": "BACKBLAZE",
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
}
}

View file

@ -1,60 +0,0 @@
import pytest
from selfprivacy_api.migrations.modules_in_json import CreateModulesField
from selfprivacy_api.utils import ReadUserData, WriteUserData
from selfprivacy_api.services import get_all_services
@pytest.fixture()
def stray_services(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "strays.json")
return datadir
@pytest.fixture()
def empty_json(generic_userdata):
with WriteUserData() as data:
data.clear()
with ReadUserData() as data:
assert len(data.keys()) == 0
return
def test_modules_empty_json(empty_json):
with ReadUserData() as data:
assert "modules" not in data.keys()
assert CreateModulesField().is_migration_needed()
CreateModulesField().migrate()
assert not CreateModulesField().is_migration_needed()
with ReadUserData() as data:
assert "modules" in data.keys()
@pytest.mark.parametrize("modules_field", [True, False])
def test_modules_stray_services(modules_field, stray_services):
if not modules_field:
with WriteUserData() as data:
del data["modules"]
assert CreateModulesField().is_migration_needed()
CreateModulesField().migrate()
for service in get_all_services():
# assumes we do not tolerate previous format
assert service.is_enabled()
if service.get_id() == "email":
continue
with ReadUserData() as data:
assert service.get_id() in data["modules"].keys()
assert service.get_id() not in data.keys()
assert not CreateModulesField().is_migration_needed()
def test_modules_no_migration_on_generic_data(generic_userdata):
assert not CreateModulesField().is_migration_needed()

View file

@ -1,23 +0,0 @@
{
"bitwarden": {
"enable": true
},
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
},
"gitea": {
"enable": true
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"jitsi": {
"enable": true
},
"modules": {}
}

View file

@ -1,245 +0,0 @@
# pylint: disable=redefined-outer-name
# pylint: disable=unused-argument
# pylint: disable=missing-function-docstring
"""
tests that restrict json token repository implementation
"""
import pytest
from datetime import datetime
from selfprivacy_api.models.tokens.token import Token
from selfprivacy_api.repositories.tokens.exceptions import (
TokenNotFound,
RecoveryKeyNotFound,
NewDeviceKeyNotFound,
)
from selfprivacy_api.repositories.tokens.json_tokens_repository import (
JsonTokensRepository,
)
from tests.common import read_json
from test_tokens_repository import (
mock_recovery_key_generate,
mock_generate_token,
mock_new_device_key_generate,
)
ORIGINAL_TOKEN_CONTENT = [
{
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
"name": "primary_token",
"date": "2022-07-15 17:41:31.675698",
},
{
"token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68",
"name": "second_token",
"date": "2022-07-15 17:41:31.675698Z",
},
{
"token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8",
"name": "third_token",
"date": "2022-07-15T17:41:31.675698Z",
},
{
"token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM",
"name": "forth_token",
"date": "2022-07-15T17:41:31.675698",
},
]
EMPTY_KEYS_JSON = """
{
"tokens": [
{
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
"name": "primary_token",
"date": "2022-07-15 17:41:31.675698"
}
]
}
"""
@pytest.fixture
def tokens(mocker, datadir):
mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=datadir / "tokens.json")
assert read_json(datadir / "tokens.json")["tokens"] == ORIGINAL_TOKEN_CONTENT
return datadir
@pytest.fixture
def empty_keys(mocker, tmpdir):
tokens_file = tmpdir / "empty_keys.json"
with open(tokens_file, "w") as file:
file.write(EMPTY_KEYS_JSON)
mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=tokens_file)
assert read_json(tokens_file)["tokens"] == [
{
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
"name": "primary_token",
"date": "2022-07-15 17:41:31.675698",
}
]
return tmpdir
@pytest.fixture
def null_keys(mocker, datadir):
mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=datadir / "null_keys.json")
assert read_json(datadir / "null_keys.json")["recovery_token"] is None
assert read_json(datadir / "null_keys.json")["new_device"] is None
return datadir
def test_delete_token(tokens):
repo = JsonTokensRepository()
input_token = Token(
token="KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
device_name="primary_token",
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
)
repo.delete_token(input_token)
assert read_json(tokens / "tokens.json")["tokens"] == [
{
"token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68",
"name": "second_token",
"date": "2022-07-15 17:41:31.675698Z",
},
{
"token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8",
"name": "third_token",
"date": "2022-07-15T17:41:31.675698Z",
},
{
"token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM",
"name": "forth_token",
"date": "2022-07-15T17:41:31.675698",
},
]
def test_delete_not_found_token(tokens):
repo = JsonTokensRepository()
input_token = Token(
token="imbadtoken",
device_name="primary_token",
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
)
with pytest.raises(TokenNotFound):
assert repo.delete_token(input_token) is None
assert read_json(tokens / "tokens.json")["tokens"] == ORIGINAL_TOKEN_CONTENT
def test_create_recovery_key(tokens, mock_recovery_key_generate):
repo = JsonTokensRepository()
assert repo.create_recovery_key(uses_left=1, expiration=None) is not None
assert read_json(tokens / "tokens.json")["recovery_token"] == {
"token": "889bf49c1d3199d71a2e704718772bd53a422020334db051",
"date": "2022-07-15T17:41:31.675698",
"expiration": None,
"uses_left": 1,
}
def test_use_mnemonic_recovery_key_when_null(null_keys):
repo = JsonTokensRepository()
with pytest.raises(RecoveryKeyNotFound):
assert (
repo.use_mnemonic_recovery_key(
mnemonic_phrase="captain ribbon toddler settle symbol minute step broccoli bless universe divide bulb",
device_name="primary_token",
)
is None
)
def test_use_mnemonic_recovery_key(tokens, mock_generate_token):
repo = JsonTokensRepository()
assert repo.use_mnemonic_recovery_key(
mnemonic_phrase="uniform clarify napkin bid dress search input armor police cross salon because myself uphold slice bamboo hungry park",
device_name="newdevice",
) == Token(
token="ur71mC4aiI6FIYAN--cTL-38rPHS5D6NuB1bgN_qKF4",
device_name="newdevice",
created_at=datetime(2022, 11, 14, 6, 6, 32, 777123),
)
assert read_json(tokens / "tokens.json")["tokens"] == [
{
"date": "2022-07-15 17:41:31.675698",
"name": "primary_token",
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
},
{
"token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68",
"name": "second_token",
"date": "2022-07-15 17:41:31.675698Z",
},
{
"token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8",
"name": "third_token",
"date": "2022-07-15T17:41:31.675698Z",
},
{
"token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM",
"name": "forth_token",
"date": "2022-07-15T17:41:31.675698",
},
{
"date": "2022-11-14T06:06:32.777123",
"name": "newdevice",
"token": "ur71mC4aiI6FIYAN--cTL-38rPHS5D6NuB1bgN_qKF4",
},
]
assert read_json(tokens / "tokens.json")["recovery_token"] == {
"date": "2022-11-11T11:48:54.228038",
"expiration": None,
"token": "ed653e4b8b042b841d285fa7a682fa09e925ddb2d8906f54",
"uses_left": 1,
}
def test_get_new_device_key(tokens, mock_new_device_key_generate):
repo = JsonTokensRepository()
assert repo.get_new_device_key() is not None
assert read_json(tokens / "tokens.json")["new_device"] == {
"date": "2022-07-15T17:41:31.675698",
"expiration": "2022-07-15T17:41:31.675698",
"token": "43478d05b35e4781598acd76e33832bb",
}
def test_delete_new_device_key(tokens):
repo = JsonTokensRepository()
assert repo.delete_new_device_key() is None
assert "new_device" not in read_json(tokens / "tokens.json")
def test_delete_new_device_key_when_empty(empty_keys):
repo = JsonTokensRepository()
repo.delete_new_device_key()
assert "new_device" not in read_json(empty_keys / "empty_keys.json")
def test_use_mnemonic_new_device_key_when_null(null_keys):
repo = JsonTokensRepository()
with pytest.raises(NewDeviceKeyNotFound):
assert (
repo.use_mnemonic_new_device_key(
device_name="imnew",
mnemonic_phrase="captain ribbon toddler settle symbol minute step broccoli bless universe divide bulb",
)
is None
)

View file

@ -1,9 +0,0 @@
{
"tokens": [
{
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
"name": "primary_token",
"date": "2022-07-15 17:41:31.675698"
}
]
}

View file

@ -1,26 +0,0 @@
{
"tokens": [
{
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
"name": "primary_token",
"date": "2022-07-15 17:41:31.675698"
},
{
"token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68",
"name": "second_token",
"date": "2022-07-15 17:41:31.675698Z"
},
{
"token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8",
"name": "third_token",
"date": "2022-07-15T17:41:31.675698Z"
},
{
"token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM",
"name": "forth_token",
"date": "2022-07-15T17:41:31.675698"
}
],
"recovery_token": null,
"new_device": null
}

View file

@ -1,35 +0,0 @@
{
"tokens": [
{
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
"name": "primary_token",
"date": "2022-07-15 17:41:31.675698"
},
{
"token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68",
"name": "second_token",
"date": "2022-07-15 17:41:31.675698Z"
},
{
"token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8",
"name": "third_token",
"date": "2022-07-15T17:41:31.675698Z"
},
{
"token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM",
"name": "forth_token",
"date": "2022-07-15T17:41:31.675698"
}
],
"recovery_token": {
"token": "ed653e4b8b042b841d285fa7a682fa09e925ddb2d8906f54",
"date": "2022-11-11T11:48:54.228038",
"expiration": null,
"uses_left": 2
},
"new_device": {
"token": "2237238de23dc71ab558e317bdb8ff8e",
"date": "2022-10-26 20:50:47.973212",
"expiration": "2022-10-26 21:00:47.974153"
}
}

View file

@ -17,9 +17,6 @@ from selfprivacy_api.repositories.tokens.exceptions import (
NewDeviceKeyNotFound,
)
from selfprivacy_api.repositories.tokens.json_tokens_repository import (
JsonTokensRepository,
)
from selfprivacy_api.repositories.tokens.redis_tokens_repository import (
RedisTokensRepository,
)
@ -27,7 +24,7 @@ from selfprivacy_api.repositories.tokens.abstract_tokens_repository import (
AbstractTokensRepository,
)
from tests.common import five_minutes_into_past, five_minutes_into_future
from tests.common import ten_minutes_into_past, ten_minutes_into_future
ORIGINAL_DEVICE_NAMES = [
@ -133,10 +130,8 @@ def mock_recovery_key_generate(mocker):
return mock
@pytest.fixture(params=["json", "redis"])
def empty_repo(request, empty_json_repo, empty_redis_repo):
if request.param == "json":
return empty_json_repo
@pytest.fixture(params=["redis"])
def empty_repo(request, empty_redis_repo):
if request.param == "redis":
return empty_redis_repo
# return empty_json_repo
@ -363,7 +358,7 @@ def test_use_mnemonic_expired_recovery_key(
some_tokens_repo,
):
repo = some_tokens_repo
expiration = five_minutes_into_past()
expiration = ten_minutes_into_past()
assert repo.create_recovery_key(uses_left=2, expiration=expiration) is not None
recovery_key = repo.get_recovery_key()
# TODO: do not ignore timezone once json backend is deleted
@ -543,7 +538,7 @@ def test_use_mnemonic_expired_new_device_key(
some_tokens_repo,
):
repo = some_tokens_repo
expiration = five_minutes_into_past()
expiration = ten_minutes_into_past()
key = repo.get_new_device_key()
assert key is not None
@ -582,24 +577,3 @@ def assert_identical(
assert token in tokens_b
assert repo_a.get_recovery_key() == repo_b.get_recovery_key()
assert repo_a._get_stored_new_device_key() == repo_b._get_stored_new_device_key()
def clone_to_redis(repo: JsonTokensRepository):
other_repo = RedisTokensRepository()
other_repo.clone(repo)
assert_identical(repo, other_repo)
# we cannot easily parametrize this unfortunately, since some_tokens and empty_repo cannot coexist
def test_clone_json_to_redis_empty(empty_repo):
repo = empty_repo
if isinstance(repo, JsonTokensRepository):
clone_to_redis(repo)
def test_clone_json_to_redis_full(some_tokens_repo):
repo = some_tokens_repo
if isinstance(repo, JsonTokensRepository):
repo.get_new_device_key()
repo.create_recovery_key(five_minutes_into_future(), 2)
clone_to_redis(repo)

View file

@ -1,9 +0,0 @@
{
"tokens": [
{
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
"name": "primary_token",
"date": "2022-07-15 17:41:31.675698"
}
]
}

View file

@ -1,26 +0,0 @@
{
"tokens": [
{
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
"name": "primary_token",
"date": "2022-07-15 17:41:31.675698"
},
{
"token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68",
"name": "second_token",
"date": "2022-07-15 17:41:31.675698Z"
},
{
"token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8",
"name": "third_token",
"date": "2022-07-15T17:41:31.675698Z"
},
{
"token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM",
"name": "forth_token",
"date": "2022-07-15T17:41:31.675698"
}
],
"recovery_token": null,
"new_device": null
}

View file

@ -1,35 +0,0 @@
{
"tokens": [
{
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
"name": "primary_token",
"date": "2022-07-15 17:41:31.675698"
},
{
"token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68",
"name": "second_token",
"date": "2022-07-15 17:41:31.675698Z"
},
{
"token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8",
"name": "third_token",
"date": "2022-07-15T17:41:31.675698Z"
},
{
"token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM",
"name": "forth_token",
"date": "2022-07-15T17:41:31.675698"
}
],
"recovery_token": {
"token": "ed653e4b8b042b841d285fa7a682fa09e925ddb2d8906f54",
"date": "2022-11-11T11:48:54.228038",
"expiration": null,
"uses_left": 2
},
"new_device": {
"token": "2237238de23dc71ab558e317bdb8ff8e",
"date": "2022-10-26 20:50:47.973212",
"expiration": "2022-10-26 21:00:47.974153"
}
}

View file

@ -19,7 +19,7 @@ from selfprivacy_api.services.test_service import DummyService
from selfprivacy_api.services.service import Service, ServiceStatus, StoppedService
from selfprivacy_api.services import get_enabled_services
from tests.test_dkim import domain_file, dkim_file, no_dkim_file
from tests.test_dkim import dkim_file, no_dkim_file
def test_unimplemented_folders_raises():

View file

@ -100,7 +100,7 @@ def test_read_json(possibly_undefined_ssh_settings):
assert get_ssh_settings().enable == data["ssh"]["enable"]
if "passwordAuthentication" not in data["ssh"].keys():
assert get_ssh_settings().passwordAuthentication is True
assert get_ssh_settings().passwordAuthentication is False
else:
assert (
get_ssh_settings().passwordAuthentication
@ -111,7 +111,6 @@ def test_read_json(possibly_undefined_ssh_settings):
def test_enabling_disabling_writes_json(
possibly_undefined_ssh_settings, ssh_enable_spectrum, password_auth_spectrum
):
original_enable = get_raw_json_ssh_setting("enable")
original_password_auth = get_raw_json_ssh_setting("passwordAuthentication")
@ -352,7 +351,6 @@ def test_read_user_keys_from_json(generic_userdata, username):
@pytest.mark.parametrize("username", regular_users)
def test_adding_user_key_writes_json(generic_userdata, username):
with WriteUserData() as data:
user_index = find_user_index_in_json_users(data["users"], username)
del data["users"][user_index]["sshKeys"]

View file

@ -1,5 +1,6 @@
from selfprivacy_api.utils import ReadUserData, WriteUserData
from selfprivacy_api.actions.users import delete_user
"""
A place for user storage tests and other user tests that are not Graphql-specific.
"""
@ -8,19 +9,19 @@ from selfprivacy_api.actions.users import delete_user
# It was born in order to not lose things that REST API tests checked for
# In the future, user storage tests that are not dependent on actual API (graphql or otherwise) go here.
def test_delete_user_writes_json(generic_userdata):
delete_user("user2")
with ReadUserData() as data:
assert data["users"] == [
{
"username": "user1",
"hashedPassword": "HASHED_PASSWORD_1",
"sshKeys": ["ssh-rsa KEY user1@pc"]
},
{
"username": "user3",
"hashedPassword": "HASHED_PASSWORD_3",
"sshKeys": ["ssh-rsa KEY user3@pc"]
}
]
{
"username": "user1",
"hashedPassword": "HASHED_PASSWORD_1",
"sshKeys": ["ssh-rsa KEY user1@pc"],
},
{
"username": "user3",
"hashedPassword": "HASHED_PASSWORD_3",
"sshKeys": ["ssh-rsa KEY user3@pc"],
},
]