Compare commits


117 Commits

Author SHA1 Message Date
Inex Code 524adaa8bc add nix-collect-garbage endpoint (#112)
Continuation of the broken #21

Co-authored-by: dettlaff <dettlaff@riseup.net>
Co-authored-by: def <dettlaff@riseup.net>
Co-authored-by: Houkime <>
Reviewed-on: https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api/pulls/112
Reviewed-by: houkime <houkime@protonmail.com>
2024-05-01 16:10:39 +03:00
houkime 5e93e6499f Merge pull request 'redis-huey' (#84) from redis-huey into master
Reviewed-on: https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api/pulls/84
Reviewed-by: Inex Code <inex.code@selfprivacy.org>
2024-03-20 14:19:07 +02:00
houkime 3302fe2818 Merge pull request 'Censor out secret keys from backup error messages' (#108) from censor-errors into master
Reviewed-on: https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api/pulls/108
Reviewed-by: Inex Code <inex.code@selfprivacy.org>
2024-03-20 14:18:39 +02:00
Houkime 9ee72c1fcb test(huey): increase the timeout so that the VM gets it in time 2024-03-20 09:02:10 +00:00
Houkime 28556bd22d test(backups): move errored job checker into common test utils 2024-03-18 17:40:48 +00:00
Houkime c5b227226c fix(backups): do not rely on obscure behaviour 2024-03-18 17:33:45 +00:00
Inex Code 5ec677339b Merge pull request 'docs(api): add a CI badge' (#107) from ci-badge into master
Reviewed-on: https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api/pulls/107
Reviewed-by: Inex Code <inex.code@selfprivacy.org>
2024-03-18 19:28:31 +02:00
Houkime f2446dcee2 docs(api): add missing dollar sign 2024-03-18 19:28:20 +02:00
Houkime 97960f77f2 docs(api): use title case in README 2024-03-18 19:28:20 +02:00
Houkime 677ed27773 docs(api): add a CI badge 2024-03-18 19:28:20 +02:00
Houkime b40df670f8 fix(backups): censor out keys from error messages
We do not have any automated sending of errors to SelfPrivacy,
but the uncensored keys were inconvenient for people who want to send a
screenshot of their error.
2024-03-18 17:15:40 +00:00
Houkime b36701e31c style(api): enable pydantic support in mypy 2024-03-18 17:11:27 +00:00
Houkime b39558ea1f fix(backups): report error in the error field of the job 2024-03-18 17:00:55 +00:00
Houkime 6f38b2309f fix(huey): adapt to new VM test environment 2024-03-18 12:18:55 +00:00
Houkime baf7843349 test(huey): only import test task if it is a test 2024-03-18 12:18:55 +00:00
Houkime 8e48a5ad5f test(huey): add a scheduling test (expected-fails for now) 2024-03-18 12:18:55 +00:00
Houkime fde461b4b9 test(huey): test that redis socket connection works 2024-03-18 12:18:55 +00:00
Houkime 9954737791 use kill() instead of terminate in huey tests 2024-03-18 12:18:55 +00:00
Houkime 2b19633cbd test(huey): break out preparing the environment vars
I did this for testing Redis sockets too, but I guess that will wait for
another time. Somehow it worked even without an actual Redis socket, which was
unsettling. I don't know yet how best to make Redis create sockets in
arbitrary temporary directories without starting another Redis instance.
2024-03-18 12:18:55 +00:00
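For reference, a minimal sketch of the missing piece described above: starting a throwaway redis-server on a Unix socket inside a temporary directory. This is not part of the changeset; `--port 0` and `--unixsocket` are standard redis-server options, and the connection call in the comment assumes the redis-py client.

```python
import subprocess
import tempfile
import time
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    socket_path = Path(tmp) / "redis.sock"
    # --port 0 disables TCP entirely; only the Unix socket is served
    server = subprocess.Popen(
        ["redis-server", "--port", "0", "--unixsocket", str(socket_path)]
    )
    try:
        for _ in range(50):  # wait up to ~5 s for the socket file to appear
            if socket_path.exists():
                break
            time.sleep(0.1)
        # a test could now connect with redis.Redis(unix_socket_path=str(socket_path))
    finally:
        server.terminate()
        server.wait()
```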
Houkime 83592b7bf4 feature(huey): use RedisHuey 2024-03-18 12:18:55 +00:00
houkime efc6b47cfe Merge pull request 'rebuild-when-moving' (#101) from rebuild-when-moving into master
Reviewed-on: https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api/pulls/101
Reviewed-by: Inex Code <inex.code@selfprivacy.org>
2024-03-18 14:14:08 +02:00
Houkime b2edfe784a refactor(service): add return typing to DNSrecord conversion and comments 2024-03-18 11:44:53 +00:00
Houkime 6e29da4a4f test(service): test moving with rebuilding via fp 2024-03-18 11:32:02 +00:00
Houkime 12b2153b7c test(service): do not call bash needlessly (it screwed up with fp) 2024-03-18 11:32:02 +00:00
Houkime 8c8c9a51cc refactor(service): visually break down the move function a bit 2024-03-18 11:32:02 +00:00
Houkime fed5735b24 refactor(service): break out DNS records into a separate resolver field 2024-03-18 11:32:02 +00:00
Houkime b257d7f39e fix(service): FAILING TESTS, rebuild when moving 2024-03-18 11:32:02 +00:00
Houkime 70a0287794 refactor(service): move finishing the job out of moving function 2024-03-18 11:32:02 +00:00
Houkime 534d965cab refactor(service): break out sync rebuilding 2024-03-18 11:32:02 +00:00
Houkime f333e791e1 refactor(service): break out ServiceStatus and ServiceDNSRecord 2024-03-18 11:32:02 +00:00
houkime 962e8d5ca7 Merge pull request 'CI: run pytest and coverage tests inside ephemeral VM in the "builder" VM (nested)' (#103) from ci-vm-for-pytest into master
Reviewed-on: https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api/pulls/103
Reviewed-by: houkime <houkime@protonmail.com>
2024-03-18 12:07:54 +02:00
Alexander Tomokhov 5e29816c84 ci: delete USE_REDIS_PORT environment variable 2024-03-16 00:18:01 +04:00
Alexander Tomokhov 53ec774c90 flake: VM test: remove Redis service port number setting 2024-03-15 16:23:21 +04:00
Inex Code bda21b7507 fix: Mark md5 as not used for security 2024-03-15 16:14:31 +04:00
Inex Code 2d5ac51c06 fix: future mock are now more in the future 2024-03-15 16:14:31 +04:00
Alexander Tomokhov 61b9a00cea ci: run pytest and coverage as part of nix flake check in VM 2024-03-15 16:14:31 +04:00
houkime edcc7860e4 Merge pull request 'chore(api): update nixpkgs version and add a script to do it' (#104) from update-nixpkgs into master
Reviewed-on: https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api/pulls/104
Reviewed-by: Inex Code <inex.code@selfprivacy.org>
2024-03-15 13:07:08 +02:00
Houkime 64da8503dd chore(api): update nixpkgs version and add a script to do it 2024-03-15 11:01:34 +00:00
houkime d464f3b82d Merge pull request 'flake VM: add additional /dev/vdb disk with empty ext4 FS' (#102) from vm-disk into master
Reviewed-on: https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api/pulls/102
Reviewed-by: Inex Code <inex.code@selfprivacy.org>
Reviewed-by: houkime <houkime@protonmail.com>
2024-03-15 11:42:37 +02:00
Alexander Tomokhov bddc6d1831 flake: VM: add one more disk (/dev/vdc) volume with empty ext4 FS 2024-03-14 07:07:23 +04:00
Alexander Tomokhov 5d01c25f3b flake: VM: add additional disk with empty ext4 FS 2024-03-08 14:43:31 +04:00
Alexander Tomokhov 69774ba186 flake: small optimization: mkShell => mkShellNoCC 2024-03-08 14:43:31 +04:00
Inex Code 1f1fcc223b fix: division by zero 2024-03-07 23:29:37 +03:00
Inex Code a543f6da2a chore: Bump version to 3.1.0 2024-03-07 23:12:45 +03:00
Inex Code cf2f153cfe Merge pull request 'feat: Basic tracking of the NixOS rebuilds' (#98) from system-rebuild-tracking into master
Reviewed-on: https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api/pulls/98
Reviewed-by: houkime <houkime@protonmail.com>
2024-03-06 18:12:21 +02:00
Inex Code 0eff0ef735 fix: move_service task path 2024-03-06 18:43:55 +03:00
Houkime 7dae81530e test(services): clean up tests 2024-03-06 18:40:05 +03:00
Houkime fd43a6ccf1 doc(services): explain the Owned Path raison d'être after trying to remove it 2024-03-06 18:40:05 +03:00
Houkime eeef2891c9 fix(services): fix merge bug 2024-03-06 18:40:05 +03:00
Houkime 3f9d2b2481 refactor(services): remove too many imports and cleanup 2024-03-06 18:40:05 +03:00
Houkime 305e5cc2c3 refactor(services): introduce Bind class and test moving deeper 2024-03-06 18:40:05 +03:00
Houkime 1e51f51844 feature(backups): intermittent commit for binds, to be replaced 2024-03-06 18:40:05 +03:00
Houkime 235c59b556 refactor(services): break out location construction when moving 2024-03-06 18:40:05 +03:00
Houkime ddca1b0cde refactor(services): fix type annotation 2024-03-06 18:40:05 +03:00
Houkime c22802f693 fix(services): check for possible None progress when moving folders 2024-03-06 18:40:05 +03:00
Houkime 17a1e34c0d feature(services): check before moving task and before move itself 2024-03-06 18:40:05 +03:00
Houkime d7ef2ed09a refactor(services): make moving a part of generic service functionality 2024-03-06 18:39:27 +03:00
Houkime 7fd09982a4 fix(services): a better error message 2024-03-06 18:39:27 +03:00
Houkime b054235d96 test(services): remove unused json 2024-03-06 18:39:27 +03:00
Houkime 2519a50aac test(services): merge def and current service tests 2024-03-06 18:39:27 +03:00
Houkime d34db3d661 fix(services): report moving errors fully 2024-03-06 18:39:27 +03:00
Houkime 28fdf8fb49 refactor(service_mover): decompose the giant move_service 2024-03-06 18:39:27 +03:00
def 18327ffa85 test: remove unused mocks, fix tests naming 2024-03-06 18:39:27 +03:00
def b5183948af fix: service tests 2024-03-06 18:39:27 +03:00
def e01b8ed8f0 add test_api_services.py 2024-03-06 18:39:27 +03:00
def 5cd1e28632 add storage tests 2024-03-06 18:39:27 +03:00
Inex Code f895f2a38b refactor: Return last 10 log lines when system rebuild failed 2024-03-06 18:33:55 +03:00
Inex Code 8a607b9609 Merge pull request 'def_tests_reworked' (#88) from def_tests_reworked into master
Reviewed-on: https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api/pulls/88
Reviewed-by: Inex Code <inex.code@selfprivacy.org>
2024-03-05 16:40:15 +02:00
Inex Code c733cfeb9e Merge remote-tracking branch 'origin/system-rebuild-tracking' into system-rebuild-tracking 2024-03-05 14:41:43 +03:00
Inex Code 71433da424 refactor: move systemd functions to utils 2024-03-05 11:55:52 +03:00
Houkime ee7c41e0c2 test(services): clean up tests 2024-03-04 17:37:26 +00:00
Houkime 1bed9d87ca doc(services): explain the Owned Path raison d'être after trying to remove it 2024-03-04 17:16:08 +00:00
Houkime 2c1c783b5e fix(services): fix merge bug 2024-03-04 14:26:26 +00:00
Houkime 8402f66a33 refactor(services): remove too many imports and cleanup 2024-03-04 14:12:44 +00:00
Houkime 1599f601a2 refactor(services): introduce Bind class and test moving deeper 2024-03-04 14:12:44 +00:00
Houkime 0068272382 feature(backups): intermittent commit for binds, to be replaced 2024-03-04 14:12:43 +00:00
Houkime 18934a53e6 refactor(services): break out location construction when moving 2024-03-04 14:12:43 +00:00
Houkime baaf3299ce refactor(services): fix type annotation 2024-03-04 14:12:43 +00:00
Houkime f059c83b57 fix(services): check for possible None progress when moving folders 2024-03-04 14:12:43 +00:00
Houkime fb41c092f1 feature(services): check before moving task and before move itself 2024-03-04 14:12:37 +00:00
Houkime c947922a5d refactor(services): make moving a part of generic service functionality 2024-03-04 13:30:03 +00:00
Houkime b22dfc0469 fix(services): a better error message 2024-03-04 13:30:03 +00:00
Houkime b3c7e2fa9e test(services): remove unused json 2024-03-04 13:30:03 +00:00
Houkime 6cd1d27902 test(services): merge def and current service tests 2024-03-04 13:30:03 +00:00
Houkime e42da357fb fix(services): report moving errors fully 2024-03-04 13:30:03 +00:00
Houkime 2863dd9763 refactor(service_mover): decompose the giant move_service 2024-03-04 13:30:03 +00:00
def 0309e6b76e test: remove unused mocks, fix tests naming 2024-03-04 13:30:03 +00:00
def f4739d4539 fix: service tests 2024-03-04 13:30:03 +00:00
def 20c089154d add test_api_services.py 2024-03-04 13:30:03 +00:00
def e703206e9d add storage tests 2024-03-04 13:30:03 +00:00
Inex Code 96f8aad146 Merge branch 'master' into system-rebuild-tracking 2024-03-04 10:54:43 +02:00
Inex Code 0e94590420 Merge pull request 'simplify autobackups tasking to avoid deadlocks' (#97) from fix-autobackup-typing into master
Reviewed-on: https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api/pulls/97
Reviewed-by: Inex Code <inex.code@selfprivacy.org>
2024-03-03 23:46:15 +02:00
Inex Code 36d026a8ca style: Formatting 2024-03-04 00:45:45 +03:00
Inex Code 8cb812be56 chore: Remove debug leftover 2024-03-03 12:00:07 +03:00
Houkime 7ccf495958 refactor(backups): remove excessive format-strings 2024-03-01 13:59:43 +00:00
Houkime f840a6e204 feature(devshell): add pyflakes to catch missing imports 2024-03-01 13:55:02 +00:00
Houkime f5d7666614 refactor(backups): remove excessive imports 2024-03-01 13:54:10 +00:00
Houkime 76f5b57c86 refactor(jobs): add explicit return statements 2024-03-01 12:44:08 +00:00
Houkime bf33fff20d fix(backups): finish the autobackup job 2024-03-01 12:44:08 +00:00
Houkime 742bb239e7 fix(backups): simplify autobackups to avoid deadlocks 2024-03-01 12:44:08 +00:00
Inex Code e16f4499f8 Merge pull request 'fix(dns): Ignore link-local IPv6 address' (#99) from inex/fix-linklocal-ipv6 into master
Reviewed-on: https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api/pulls/99
2024-03-01 14:13:15 +02:00
Inex Code 5616dbe77a style: rename ip6 addresses variable 2024-03-01 15:06:32 +03:00
Inex Code bbec9d9d33 refactor: use ipaddress library for ip validation 2024-03-01 14:58:28 +03:00
Inex Code a4327fa669 fix(dns): Ignore link-local IPv6 address 2024-03-01 03:21:31 +03:00
Inex Code 2443ae0144 chore: Remove version flavor 2024-02-26 22:51:31 +03:00
Inex Code c63552241c tests: Cover upgrade and rebuild task 2024-02-26 22:49:32 +03:00
Inex Code d8666fa179 Merge commit '4757bedc4ec62d3577fd1f259abbe34ba6dce893' into system-rebuild-tracking 2024-02-26 18:27:54 +03:00
Inex Code 25c691104f fix: non-0 exit status of is-active 2024-02-12 18:58:27 +03:00
Inex Code 1a34558e23 chore: Shorten the output on status_text 2024-02-12 18:54:32 +03:00
Inex Code c851c3d193 chore: more debugging output 2024-02-12 18:53:14 +03:00
Inex Code ad069a2ad2 fix: wrong unit name again 2024-02-12 18:47:37 +03:00
Inex Code b98c020f23 fix: wrong systemd unit used 2024-02-12 18:41:24 +03:00
Inex Code 94456af7d4 fix: debugging 2024-02-12 18:34:55 +03:00
Inex Code ab1ca6e59c fix: register huey task 2024-02-12 18:27:32 +03:00
Inex Code 00bcca0f99 fix: invalid setuptools version 2024-02-12 18:24:54 +03:00
Inex Code 56de00226a chore: Testing env 2024-02-12 18:21:09 +03:00
Inex Code 2019da1e10 feat: Track the status of the nixos rebuild systemd unit 2024-02-12 18:17:18 +03:00
63 changed files with 2451 additions and 905 deletions


@@ -5,18 +5,11 @@ name: default
steps:
- name: Run Tests and Generate Coverage Report
commands:
- kill $(ps aux | grep 'redis-server 127.0.0.1:6389' | awk '{print $2}') || true
- redis-server --bind 127.0.0.1 --port 6389 >/dev/null &
# We do not care about persistence on CI
- sleep 10
- redis-cli -h 127.0.0.1 -p 6389 config set stop-writes-on-bgsave-error no
- coverage run -m pytest -q
- coverage xml
- nix flake check -L
- sonar-scanner -Dsonar.projectKey=SelfPrivacy-REST-API -Dsonar.sources=. -Dsonar.host.url=http://analyzer.lan:9000 -Dsonar.login="$SONARQUBE_TOKEN"
environment:
SONARQUBE_TOKEN:
from_secret: SONARQUBE_TOKEN
USE_REDIS_PORT: 6389
- name: Run Bandit Checks

.gitignore (vendored): executable file → normal file, 0 lines changed

.mypy.ini: new file, 2 lines added

@@ -0,0 +1,2 @@
[mypy]
plugins = pydantic.mypy
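The plugin teaches mypy about the `__init__` that pydantic synthesizes at runtime, so field names (and, with stricter plugin settings, field types) are checked statically. A small illustration with a hypothetical model, not taken from this repository:

```python
from pydantic import BaseModel

class Volume(BaseModel):  # hypothetical example model
    name: str
    fsavail: int

# With "plugins = pydantic.mypy", mypy knows the synthesized __init__,
# so a misspelled field name is reported at type-check time:
volume = Volume(name="sdb", fsavil=1024)  # error: unexpected keyword argument "fsavil"
```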


@@ -1,6 +1,8 @@
# SelfPrivacy GraphQL API which allows app to control your server
## build
![CI status](https://ci.selfprivacy.org/api/badges/SelfPrivacy/selfprivacy-rest-api/status.svg)
## Build
```console
$ nix build
@@ -8,7 +10,7 @@ $ nix build
In case of successful build, you should get the `./result` symlink to a folder (in `/nix/store`) with build contents.
## develop
## Develop
```console
$ nix develop
@@ -21,10 +23,10 @@ Type "help", "copyright", "credits" or "license" for more information.
If you don't have experimental flakes enabled, you can use the following command:
```console
nix --extra-experimental-features nix-command --extra-experimental-features flakes develop
$ nix --extra-experimental-features nix-command --extra-experimental-features flakes develop
```
## testing
## Testing
Run the test suite by running coverage with pytest inside an ephemeral NixOS VM with redis service enabled:
```console
@@ -61,7 +63,7 @@ $ TMPDIR=".nixos-vm-tmp-dir" nix run .#checks.x86_64-linux.default.driverInterac
Option `-L`/`--print-build-logs` is optional for all nix commands. It tells nix to print each log line one after another instead of overwriting a single one.
## dependencies and dependant modules
## Dependencies and Dependant Modules
This flake depends on a single Nix flake input - nixpkgs repository. nixpkgs repository is used for all software packages used to build, run API service, tests, etc.
@@ -85,6 +87,6 @@ $ nix flake metadata git+https://git.selfprivacy.org/SelfPrivacy/selfprivacy-nix
Nix code for NixOS service module for API is located in NixOS configuration repository.
## troubleshooting
## Troubleshooting
Sometimes commands inside `nix develop` refuse to work properly if the calling shell lacks `LANG` environment variable. Try to set it before entering `nix develop`.


@@ -2,11 +2,11 @@
"nodes": {
"nixpkgs": {
"locked": {
"lastModified": 1702780907,
"narHash": "sha256-blbrBBXjjZt6OKTcYX1jpe9SRof2P9ZYWPzq22tzXAA=",
"lastModified": 1709677081,
"narHash": "sha256-tix36Y7u0rkn6mTm0lA45b45oab2cFLqAzDbJxeXS+c=",
"owner": "nixos",
"repo": "nixpkgs",
"rev": "1e2e384c5b7c50dbf8e9c441a9e58d85f408b01f",
"rev": "880992dcc006a5e00dd0591446fdf723e6a51a64",
"type": "github"
},
"original": {


@@ -19,12 +19,15 @@
pytest
pytest-datadir
pytest-mock
pytest-subprocess
black
mypy
pylsp-mypy
python-lsp-black
python-lsp-server
pyflakes
typer # for strawberry
types-redis # for mypy
] ++ strawberry-graphql.optional-dependencies.cli));
vmtest-src-dir = "/root/source";
@@ -38,7 +41,7 @@
[testing in NixOS VM]
nixos-test-driver - run an interactive NixOS VM with with all dependencies
nixos-test-driver - run an interactive NixOS VM with all dependencies included and 2 disk volumes
pytest-vm - run pytest in an ephemeral NixOS VM with Redis, accepting pytest arguments
'';
in
@@ -76,7 +79,7 @@
};
nixosModules.default =
import ./nixos/module.nix self.packages.${system}.default;
devShells.${system}.default = pkgs.mkShell {
devShells.${system}.default = pkgs.mkShellNoCC {
name = "SP API dev shell";
packages = with pkgs; [
nixpkgs-fmt
@@ -111,38 +114,47 @@
"black --check ${self.outPath} > $out";
default =
pkgs.testers.runNixOSTest {
imports = [{
name = "default";
nodes.machine = { lib, pkgs, ... }: {
imports = [{
boot.consoleLogLevel = lib.mkForce 3;
documentation.enable = false;
services.journald.extraConfig = lib.mkForce "";
services.redis.servers.sp-api = {
enable = true;
save = [ ];
port = 6379; # FIXME
settings.notify-keyspace-events = "KEA";
};
environment.systemPackages = with pkgs; [
python-env
# TODO: these can be passed via wrapper script around app
rclone
restic
];
environment.variables.TEST_MODE = "true";
systemd.tmpfiles.settings.src.${vmtest-src-dir}.L.argument =
self.outPath;
}];
name = "default";
nodes.machine = { lib, pkgs, ... }: {
# 2 additional disks (1024 MiB and 200 MiB) with empty ext4 FS
virtualisation.emptyDiskImages = [ 1024 200 ];
virtualisation.fileSystems."/volumes/vdb" = {
autoFormat = true;
device = "/dev/vdb"; # this name is chosen by QEMU, not here
fsType = "ext4";
noCheck = true;
};
testScript = ''
start_all()
machine.succeed("cd ${vmtest-src-dir} && coverage run --data-file=/tmp/.coverage -m pytest -p no:cacheprovider -v >&2")
machine.succeed("coverage xml --rcfile=${vmtest-src-dir}/.coveragerc --data-file=/tmp/.coverage >&2")
machine.copy_from_vm("coverage.xml", ".")
machine.succeed("coverage report >&2")
'';
}];
virtualisation.fileSystems."/volumes/vdc" = {
autoFormat = true;
device = "/dev/vdc"; # this name is chosen by QEMU, not here
fsType = "ext4";
noCheck = true;
};
boot.consoleLogLevel = lib.mkForce 3;
documentation.enable = false;
services.journald.extraConfig = lib.mkForce "";
services.redis.servers.sp-api = {
enable = true;
save = [ ];
settings.notify-keyspace-events = "KEA";
};
environment.systemPackages = with pkgs; [
python-env
# TODO: these can be passed via wrapper script around app
rclone
restic
];
environment.variables.TEST_MODE = "true";
systemd.tmpfiles.settings.src.${vmtest-src-dir}.L.argument =
self.outPath;
};
testScript = ''
start_all()
machine.succeed("cd ${vmtest-src-dir} && coverage run --data-file=/tmp/.coverage -m pytest -p no:cacheprovider -v >&2")
machine.succeed("coverage xml --rcfile=${vmtest-src-dir}/.coveragerc --data-file=/tmp/.coverage >&2")
machine.copy_from_vm("coverage.xml", ".")
machine.succeed("coverage report >&2")
'';
};
};
};


@@ -0,0 +1,34 @@
from selfprivacy_api.utils.block_devices import BlockDevices
from selfprivacy_api.jobs import Jobs, Job
from selfprivacy_api.services import get_service_by_id
from selfprivacy_api.services.tasks import move_service as move_service_task
class ServiceNotFoundError(Exception):
pass
class VolumeNotFoundError(Exception):
pass
def move_service(service_id: str, volume_name: str) -> Job:
service = get_service_by_id(service_id)
if service is None:
raise ServiceNotFoundError(f"No such service:{service_id}")
volume = BlockDevices().get_block_device(volume_name)
if volume is None:
raise VolumeNotFoundError(f"No such volume:{volume_name}")
service.assert_can_move(volume)
job = Jobs.add(
type_id=f"services.{service.get_id()}.move",
name=f"Move {service.get_display_name()}",
description=f"Moving {service.get_display_name()} data to {volume.name}",
)
move_service_task(service, volume, job)
return job


@@ -4,6 +4,8 @@ import subprocess
import pytz
from typing import Optional, List
from pydantic import BaseModel
from selfprivacy_api.jobs import Job, JobStatus, Jobs
from selfprivacy_api.jobs.upgrade_system import rebuild_system_task
from selfprivacy_api.utils import WriteUserData, ReadUserData
@@ -87,10 +89,16 @@ def run_blocking(cmd: List[str], new_session: bool = False) -> str:
return stdout
def rebuild_system() -> int:
def rebuild_system() -> Job:
"""Rebuild the system"""
run_blocking(["systemctl", "start", "sp-nixos-rebuild.service"], new_session=True)
return 0
job = Jobs.add(
type_id="system.nixos.rebuild",
name="Rebuild system",
description="Applying the new system configuration by building the new NixOS generation.",
status=JobStatus.CREATED,
)
rebuild_system_task(job)
return job
def rollback_system() -> int:
@@ -99,10 +107,16 @@ def rollback_system() -> int:
return 0
def upgrade_system() -> int:
def upgrade_system() -> Job:
"""Upgrade the system"""
run_blocking(["systemctl", "start", "sp-nixos-upgrade.service"], new_session=True)
return 0
job = Jobs.add(
type_id="system.nixos.upgrade",
name="Upgrade system",
description="Upgrading the system to the latest version.",
status=JobStatus.CREATED,
)
rebuild_system_task(job, upgrade=True)
return job
def reboot_system() -> None:


@@ -259,7 +259,7 @@ class Backups:
Backups._prune_auto_snaps(service)
service.post_restore()
except Exception as error:
Jobs.update(job, status=JobStatus.ERROR, status_text=str(error))
Jobs.update(job, status=JobStatus.ERROR, error=str(error))
raise error
Jobs.update(job, status=JobStatus.FINISHED)


@@ -172,6 +172,21 @@ class ResticBackupper(AbstractBackupper):
return messages
@staticmethod
def _replace_in_array(array: List[str], target, replacement) -> None:
if target == "":
return
for i, value in enumerate(array):
if target in value:
array[i] = array[i].replace(target, replacement)
def _censor_command(self, command: List[str]) -> List[str]:
result = command.copy()
ResticBackupper._replace_in_array(result, self.key, "CENSORED")
ResticBackupper._replace_in_array(result, LocalBackupSecret.get(), "CENSORED")
return result
@staticmethod
def _get_backup_job(service_name: str) -> Optional[Job]:
service = get_service_by_id(service_name)
@@ -218,7 +233,7 @@
"Could not create a snapshot: ",
str(error),
"command: ",
backup_command,
self._censor_command(backup_command),
) from error
@staticmethod

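Based on `_replace_in_array` and `_censor_command` above, censoring is a plain substring replacement inside each argv element. A self-contained illustration (the key value and the restic arguments are hypothetical):

```python
from typing import List

def replace_in_array(array: List[str], target: str, replacement: str) -> None:
    """Same logic as ResticBackupper._replace_in_array above."""
    if target == "":
        return
    for i, value in enumerate(array):
        if target in value:
            array[i] = array[i].replace(target, replacement)

command = ["restic", "-o", "s3.secret-access-key=abc123", "backup", "/var/lib/bitwarden"]
replace_in_array(command, "abc123", "CENSORED")
assert command == ["restic", "-o", "s3.secret-access-key=CENSORED", "backup", "/var/lib/bitwarden"]
```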

@@ -14,6 +14,10 @@ def backup_job_type(service: Service) -> str:
return f"{job_type_prefix(service)}.backup"
def autobackup_job_type() -> str:
return "backups.autobackup"
def restore_job_type(service: Service) -> str:
return f"{job_type_prefix(service)}.restore"
@@ -36,6 +40,17 @@ def is_something_running_for(service: Service) -> bool:
return len(running_jobs) != 0
def add_autobackup_job(services: List[Service]) -> Job:
service_names = [s.get_display_name() for s in services]
pretty_service_list: str = ", ".join(service_names)
job = Jobs.add(
type_id=autobackup_job_type(),
name="Automatic backup",
description=f"Scheduled backup for services: {pretty_service_list}",
)
return job
def add_backup_job(service: Service) -> Job:
if is_something_running_for(service):
message = (
@@ -78,12 +93,14 @@ def get_job_by_type(type_id: str) -> Optional[Job]:
JobStatus.RUNNING,
]:
return job
return None
def get_failed_job_by_type(type_id: str) -> Optional[Job]:
for job in Jobs.get_jobs():
if job.type_id == type_id and job.status == JobStatus.ERROR:
return job
return None
def get_backup_job(service: Service) -> Optional[Job]:


@@ -21,6 +21,8 @@ PROVIDER_MAPPING: dict[BackupProviderEnum, Type[AbstractBackupProvider]] = {
def get_provider(
provider_type: BackupProviderEnum,
) -> Type[AbstractBackupProvider]:
if provider_type not in PROVIDER_MAPPING.keys():
raise LookupError("could not look up provider", provider_type)
return PROVIDER_MAPPING[provider_type]


@@ -12,9 +12,9 @@ from selfprivacy_api.models.backup.snapshot import Snapshot
from selfprivacy_api.utils.huey import huey
from huey import crontab
from selfprivacy_api.services.service import Service
from selfprivacy_api.services import get_service_by_id
from selfprivacy_api.backup import Backups
from selfprivacy_api.backup.jobs import add_autobackup_job
from selfprivacy_api.jobs import Jobs, JobStatus, Job
@@ -72,26 +72,44 @@
return True
def do_autobackup():
def do_autobackup() -> None:
"""
Body of the autobackup task, broken out so it can be tested.
For some reason, we cannot launch periodic huey tasks
inside tests.
"""
time = datetime.utcnow().replace(tzinfo=timezone.utc)
for service in Backups.services_to_back_up(time):
handle = start_backup(service.get_id(), BackupReason.AUTO)
# To be on safe side, we do not do it in parallel
handle(blocking=True)
services_to_back_up = Backups.services_to_back_up(time)
if not services_to_back_up:
return
job = add_autobackup_job(services_to_back_up)
progress_per_service = 100 // len(services_to_back_up)
progress = 0
Jobs.update(job, JobStatus.RUNNING, progress=progress)
for service in services_to_back_up:
try:
Backups.back_up(service, BackupReason.AUTO)
except Exception as error:
Jobs.update(
job,
status=JobStatus.ERROR,
error=type(error).__name__ + ": " + str(error),
)
return
progress = progress + progress_per_service
Jobs.update(job, JobStatus.RUNNING, progress=progress)
Jobs.update(job, JobStatus.FINISHED)
@huey.periodic_task(validate_datetime=validate_datetime)
def automatic_backup() -> bool:
def automatic_backup() -> None:
"""
The worker periodic task that starts the automatic backup process.
"""
do_autobackup()
return True
@huey.periodic_task(crontab(hour="*/" + str(SNAPSHOT_CACHE_TTL_HOURS)))

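A quick worked check of the progress arithmetic in `do_autobackup` above (the service count is hypothetical): integer division means the bar may stop just short of 100, which is why the explicit terminal `Jobs.update(job, JobStatus.FINISHED)` matters.

```python
services_count = 3
progress_per_service = 100 // services_count  # 33 (integer division)
final_progress = progress_per_service * services_count  # 99, not 100
# The closing Jobs.update(job, JobStatus.FINISHED) marks completion
# regardless of the rounded progress value.
```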

@@ -27,4 +27,4 @@ async def get_token_header(
def get_api_version() -> str:
"""Get API version"""
return "3.0.1"
return "3.1.0"


@@ -2,6 +2,7 @@ import typing
import strawberry
# TODO: use https://strawberry.rocks/docs/integrations/pydantic when it is stable
@strawberry.type
class DnsRecord:
"""DNS record"""


@@ -1,13 +1,17 @@
from enum import Enum
import typing
import strawberry
from typing import Optional, List
import datetime
import strawberry
from selfprivacy_api.graphql.common_types.backup import BackupReason
from selfprivacy_api.graphql.common_types.dns import DnsRecord
from selfprivacy_api.services import get_service_by_id, get_services_by_location
from selfprivacy_api.services import Service as ServiceInterface
from selfprivacy_api.services import ServiceDnsRecord
from selfprivacy_api.utils.block_devices import BlockDevices
from selfprivacy_api.utils.network import get_ip4, get_ip6
def get_usages(root: "StorageVolume") -> list["StorageUsageInterface"]:
@@ -32,8 +36,8 @@ class StorageVolume:
used_space: str
root: bool
name: str
model: typing.Optional[str]
serial: typing.Optional[str]
model: Optional[str]
serial: Optional[str]
type: str
@strawberry.field
@@ -45,7 +49,7 @@
@strawberry.interface
class StorageUsageInterface:
used_space: str
volume: typing.Optional[StorageVolume]
volume: Optional[StorageVolume]
title: str
@@ -53,7 +57,7 @@
class ServiceStorageUsage(StorageUsageInterface):
"""Storage usage for a service"""
service: typing.Optional["Service"]
service: Optional["Service"]
@strawberry.enum
@@ -85,6 +89,20 @@ def get_storage_usage(root: "Service") -> ServiceStorageUsage:
)
# TODO: This won't be needed when deriving DnsRecord via strawberry pydantic integration
# https://strawberry.rocks/docs/integrations/pydantic
# Remove when the link above says it got stable.
def service_dns_to_graphql(record: ServiceDnsRecord) -> DnsRecord:
return DnsRecord(
record_type=record.type,
name=record.name,
content=record.content,
ttl=record.ttl,
priority=record.priority,
display_name=record.display_name,
)
@strawberry.type
class Service:
id: str
@@ -97,16 +115,26 @@ class Service:
can_be_backed_up: bool
backup_description: str
status: ServiceStatusEnum
url: typing.Optional[str]
dns_records: typing.Optional[typing.List[DnsRecord]]
url: Optional[str]
@strawberry.field
def dns_records(self) -> Optional[List[DnsRecord]]:
service = get_service_by_id(self.id)
if service is None:
raise LookupError(f"no service {self.id}. Should be unreachable")
raw_records = service.get_dns_records(get_ip4(), get_ip6())
dns_records = [service_dns_to_graphql(record) for record in raw_records]
return dns_records
@strawberry.field
def storage_usage(self) -> ServiceStorageUsage:
"""Get storage usage for a service"""
return get_storage_usage(self)
# TODO: fill this
@strawberry.field
def backup_snapshots(self) -> typing.Optional[typing.List["SnapshotInfo"]]:
def backup_snapshots(self) -> Optional[List["SnapshotInfo"]]:
return None
@@ -132,21 +160,10 @@ def service_to_graphql_service(service: ServiceInterface) -> Service:
backup_description=service.get_backup_description(),
status=ServiceStatusEnum(service.get_status().value),
url=service.get_url(),
dns_records=[
DnsRecord(
record_type=record.type,
name=record.name,
content=record.content,
ttl=record.ttl,
priority=record.priority,
display_name=record.display_name,
)
for record in service.get_dns_records()
],
)
def get_volume_by_id(volume_id: str) -> typing.Optional[StorageVolume]:
def get_volume_by_id(volume_id: str) -> Optional[StorageVolume]:
"""Get volume by id"""
volume = BlockDevices().get_block_device(volume_id)
if volume is None:


@@ -6,17 +6,24 @@ from selfprivacy_api.graphql import IsAuthenticated
from selfprivacy_api.graphql.common_types.jobs import job_to_api_job
from selfprivacy_api.jobs import JobStatus
from selfprivacy_api.graphql.common_types.service import (
Service,
service_to_graphql_service,
)
from traceback import format_tb as format_traceback
from selfprivacy_api.graphql.mutations.mutation_interface import (
GenericJobMutationReturn,
GenericMutationReturn,
)
from selfprivacy_api.graphql.common_types.service import (
Service,
service_to_graphql_service,
)
from selfprivacy_api.actions.services import (
move_service,
ServiceNotFoundError,
VolumeNotFoundError,
)
from selfprivacy_api.services import get_service_by_id
from selfprivacy_api.utils.block_devices import BlockDevices
@strawberry.type
@@ -60,7 +67,7 @@ class ServicesMutations:
except Exception as e:
return ServiceMutationReturn(
success=False,
message=format_error(e),
message=pretty_error(e),
code=400,
)
@@ -86,7 +93,7 @@
except Exception as e:
return ServiceMutationReturn(
success=False,
message=format_error(e),
message=pretty_error(e),
code=400,
)
return ServiceMutationReturn(
@@ -153,31 +160,32 @@
@strawberry.mutation(permission_classes=[IsAuthenticated])
def move_service(self, input: MoveServiceInput) -> ServiceJobMutationReturn:
"""Move service."""
# We need a service instance for a reply later
service = get_service_by_id(input.service_id)
if service is None:
return ServiceJobMutationReturn(
success=False,
message="Service not found.",
message=f"Service does not exist: {input.service_id}",
code=404,
)
# TODO: make serviceImmovable and BlockdeviceNotFound exceptions
# in the move_to_volume() function and handle them here
if not service.is_movable():
try:
job = move_service(input.service_id, input.location)
except (ServiceNotFoundError, VolumeNotFoundError) as e:
return ServiceJobMutationReturn(
success=False,
message="Service is not movable.",
message=pretty_error(e),
code=404,
)
except Exception as e:
return ServiceJobMutationReturn(
success=False,
message=pretty_error(e),
code=400,
service=service_to_graphql_service(service),
)
volume = BlockDevices().get_block_device(input.location)
if volume is None:
return ServiceJobMutationReturn(
success=False,
message="Volume not found.",
code=404,
service=service_to_graphql_service(service),
)
job = service.move_to_volume(volume)
if job.status in [JobStatus.CREATED, JobStatus.RUNNING]:
return ServiceJobMutationReturn(
success=True,
@@ -197,12 +205,13 @@
else:
return ServiceJobMutationReturn(
success=False,
message=f"Service move failure: {job.status_text}",
message=f"While moving service and performing the step '{job.status_text}', error occured: {job.error}",
code=400,
service=service_to_graphql_service(service),
job=job_to_api_job(job),
)
def format_error(e: Exception) -> str:
return type(e).__name__ + ": " + str(e)
def pretty_error(e: Exception) -> str:
traceback = "/r".join(format_traceback(e.__traceback__))
return type(e).__name__ + ": " + str(e) + ": " + traceback


@@ -3,12 +3,17 @@
import typing
import strawberry
from selfprivacy_api.graphql import IsAuthenticated
from selfprivacy_api.graphql.common_types.jobs import job_to_api_job
from selfprivacy_api.graphql.mutations.mutation_interface import (
GenericJobMutationReturn,
GenericMutationReturn,
MutationReturnInterface,
GenericJobMutationReturn,
)
import selfprivacy_api.actions.system as system_actions
from selfprivacy_api.graphql.common_types.jobs import job_to_api_job
from selfprivacy_api.jobs.nix_collect_garbage import start_nix_collect_garbage
import selfprivacy_api.actions.ssh as ssh_actions
@@ -114,16 +119,17 @@
)
@strawberry.mutation(permission_classes=[IsAuthenticated])
def run_system_rebuild(self) -> GenericMutationReturn:
def run_system_rebuild(self) -> GenericJobMutationReturn:
try:
system_actions.rebuild_system()
return GenericMutationReturn(
job = system_actions.rebuild_system()
return GenericJobMutationReturn(
success=True,
message="Starting rebuild system",
message="Starting system rebuild",
code=200,
job=job_to_api_job(job),
)
except system_actions.ShellException as e:
return GenericMutationReturn(
return GenericJobMutationReturn(
success=False,
message=str(e),
code=500,
@@ -135,7 +141,7 @@
try:
return GenericMutationReturn(
success=True,
message="Starting rebuild system",
message="Starting system rollback",
code=200,
)
except system_actions.ShellException as e:
@@ -146,16 +152,17 @@
)
@strawberry.mutation(permission_classes=[IsAuthenticated])
def run_system_upgrade(self) -> GenericMutationReturn:
system_actions.upgrade_system()
def run_system_upgrade(self) -> GenericJobMutationReturn:
try:
return GenericMutationReturn(
job = system_actions.upgrade_system()
return GenericJobMutationReturn(
success=True,
message="Starting rebuild system",
message="Starting system upgrade",
code=200,
job=job_to_api_job(job),
)
except system_actions.ShellException as e:
return GenericMutationReturn(
return GenericJobMutationReturn(
success=False,
message=str(e),
code=500,
@@ -191,3 +198,14 @@
message=f"Failed to pull repository changes:\n{result.data}",
code=500,
)
@strawberry.mutation(permission_classes=[IsAuthenticated])
def nix_collect_garbage(self) -> GenericJobMutationReturn:
job = start_nix_collect_garbage()
return GenericJobMutationReturn(
success=True,
code=200,
message="Garbage collector started...",
job=job_to_api_job(job),
)

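A hedged usage sketch for the new `nix_collect_garbage` mutation, assuming strawberry's default snake_case-to-camelCase conversion, a `/graphql` route, and bearer-token auth; the endpoint URL and the field names inside `job { ... }` are illustrative, not taken from this diff.

```python
import requests  # any HTTP client works; requests is just for illustration

query = """
mutation {
  nixCollectGarbage {
    success
    message
    code
    job { uid status progress }
  }
}
"""
response = requests.post(
    "https://api.example.org/graphql",  # hypothetical endpoint URL
    json={"query": query},
    headers={"Authorization": "Bearer <token>"},
)
print(response.json())
```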

@@ -268,6 +268,20 @@ class Jobs:
return False
def report_progress(progress: int, job: Job, status_text: str) -> None:
"""
A terse way to call a common operation, for readability.
job.report_progress() would be even better,
but it would go against how this file is written.
"""
Jobs.update(
job=job,
status=JobStatus.RUNNING,
status_text=status_text,
progress=progress,
)
def _redis_key_from_uuid(uuid_string) -> str:
return "jobs:" + str(uuid_string)


@@ -67,8 +67,8 @@ def move_folder(
try:
data_path.mkdir(mode=0o750, parents=True, exist_ok=True)
except Exception as e:
print(f"Error creating data path: {e}")
except Exception as error:
print(f"Error creating data path: {error}")
return
try:


@@ -0,0 +1,147 @@
import re
import subprocess
from typing import Tuple, Iterable
from selfprivacy_api.utils.huey import huey
from selfprivacy_api.jobs import JobStatus, Jobs, Job
class ShellException(Exception):
"""Shell-related errors"""
COMPLETED_WITH_ERROR = "Error occurred, please report this to the support chat."
RESULT_WAS_NOT_FOUND_ERROR = (
"We are sorry, garbage collection result was not found. "
"Something went wrong, please report this to the support chat."
)
CLEAR_COMPLETED = "Garbage collection completed."
def delete_old_gens_and_return_dead_report() -> str:
subprocess.run(
["nix-env", "-p", "/nix/var/nix/profiles/system", "--delete-generations old"],
check=False,
)
result = subprocess.check_output(["nix-store", "--gc", "--print-dead"]).decode(
"utf-8"
)
return " " if result is None else result
def run_nix_collect_garbage() -> Iterable[bytes]:
process = subprocess.Popen(
["nix-store", "--gc"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
return process.stdout if process.stdout else iter([])
def parse_line(job: Job, line: str) -> Job:
"""
We parse the output for the presence of the final summary line,
which reports the total amount of space freed.
Simply put, we are looking for a line like:
"1537 store paths deleted, 339.84 MiB freed".
"""
pattern = re.compile(r"[+-]?\d+\.\d+ \w+(?= freed)")
match = re.search(pattern, line)
if match is None:
raise ShellException("nix returned gibberish output")
else:
Jobs.update(
job=job,
status=JobStatus.FINISHED,
status_text=CLEAR_COMPLETED,
result=f"{match.group(0)} have been cleared",
)
return job
def process_stream(job: Job, stream: Iterable[bytes], total_dead_packages: int) -> None:
completed_packages = 0
prev_progress = 0
for line in stream:
line = line.decode("utf-8")
if "deleting '/nix/store/" in line:
completed_packages += 1
percent = int((completed_packages / total_dead_packages) * 100)
if percent - prev_progress >= 5:
Jobs.update(
job=job,
status=JobStatus.RUNNING,
progress=percent,
status_text="Cleaning...",
)
prev_progress = percent
elif "store paths deleted," in line:
parse_line(job, line)
def get_dead_packages(output) -> Tuple[int, float]:
dead = len(re.findall("/nix/store/", output))
percent = 0
if dead != 0:
percent = 100 / dead
return dead, percent
@huey.task()
def calculate_and_clear_dead_paths(job: Job):
Jobs.update(
job=job,
status=JobStatus.RUNNING,
progress=0,
status_text="Calculate the number of dead packages...",
)
dead_packages, package_equal_to_percent = get_dead_packages(
delete_old_gens_and_return_dead_report()
)
if dead_packages == 0:
Jobs.update(
job=job,
status=JobStatus.FINISHED,
status_text="Nothing to clear",
result="System is clear",
)
return True
Jobs.update(
job=job,
status=JobStatus.RUNNING,
progress=0,
status_text=f"Found {dead_packages} packages to remove!",
)
stream = run_nix_collect_garbage()
try:
process_stream(job, stream, dead_packages)
except ShellException as error:
Jobs.update(
job=job,
status=JobStatus.ERROR,
status_text=COMPLETED_WITH_ERROR,
error=RESULT_WAS_NOT_FOUND_ERROR,
)
def start_nix_collect_garbage() -> Job:
job = Jobs.add(
type_id="maintenance.collect_nix_garbage",
name="Collect garbage",
description="Cleaning up unused packages",
)
calculate_and_clear_dead_paths(job=job)
return job

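To make the regex in `parse_line` concrete: it keeps only the size figure that precedes " freed" in nix-store's summary line, exactly as in the docstring's example.

```python
import re

pattern = re.compile(r"[+-]?\d+\.\d+ \w+(?= freed)")
line = "1537 store paths deleted, 339.84 MiB freed"
match = re.search(pattern, line)
assert match is not None
assert match.group(0) == "339.84 MiB"
```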

@@ -0,0 +1,136 @@
"""
A task to start the system upgrade or rebuild by starting a systemd unit.
After starting, track the status of the systemd unit and update the Job
status accordingly.
"""
import subprocess
from selfprivacy_api.utils.huey import huey
from selfprivacy_api.jobs import JobStatus, Jobs, Job
from selfprivacy_api.utils.waitloop import wait_until_true
from selfprivacy_api.utils.systemd import (
get_service_status,
get_last_log_lines,
ServiceStatus,
)
START_TIMEOUT = 60 * 5
START_INTERVAL = 1
RUN_TIMEOUT = 60 * 60
RUN_INTERVAL = 5
def check_if_started(unit_name: str):
"""Check if the systemd unit has started"""
try:
status = get_service_status(unit_name)
if status == ServiceStatus.ACTIVE:
return True
return False
except subprocess.CalledProcessError:
return False
def check_running_status(job: Job, unit_name: str):
"""Check if the systemd unit is running"""
try:
status = get_service_status(unit_name)
if status == ServiceStatus.INACTIVE:
Jobs.update(
job=job,
status=JobStatus.FINISHED,
result="System rebuilt.",
progress=100,
)
return True
if status == ServiceStatus.FAILED:
log_lines = get_last_log_lines(unit_name, 10)
Jobs.update(
job=job,
status=JobStatus.ERROR,
error="System rebuild failed. Last log lines:\n" + "\n".join(log_lines),
)
return True
if status == ServiceStatus.ACTIVE:
log_lines = get_last_log_lines(unit_name, 1)
Jobs.update(
job=job,
status=JobStatus.RUNNING,
status_text=log_lines[0] if len(log_lines) > 0 else "",
)
return False
return False
except subprocess.CalledProcessError:
return False
def rebuild_system(job: Job, upgrade: bool = False):
"""
Broken out to allow calling it synchronously.
We cannot just block until the task is done, because that
would require a second worker, which we do not have.
"""
unit_name = "sp-nixos-upgrade.service" if upgrade else "sp-nixos-rebuild.service"
try:
command = ["systemctl", "start", unit_name]
subprocess.run(
command,
check=True,
start_new_session=True,
shell=False,
)
Jobs.update(
job=job,
status=JobStatus.RUNNING,
status_text="Starting the system rebuild...",
)
# Wait for the systemd unit to start
try:
wait_until_true(
lambda: check_if_started(unit_name),
timeout_sec=START_TIMEOUT,
interval=START_INTERVAL,
)
except TimeoutError:
log_lines = get_last_log_lines(unit_name, 10)
Jobs.update(
job=job,
status=JobStatus.ERROR,
error="System rebuild timed out. Last log lines:\n"
+ "\n".join(log_lines),
)
return
Jobs.update(
job=job,
status=JobStatus.RUNNING,
status_text="Rebuilding the system...",
)
# Wait for the systemd unit to finish
try:
wait_until_true(
lambda: check_running_status(job, unit_name),
timeout_sec=RUN_TIMEOUT,
interval=RUN_INTERVAL,
)
except TimeoutError:
log_lines = get_last_log_lines(unit_name, 10)
Jobs.update(
job=job,
status=JobStatus.ERROR,
error="System rebuild timed out. Last log lines:\n"
+ "\n".join(log_lines),
)
return
except subprocess.CalledProcessError as e:
Jobs.update(
job=job,
status=JobStatus.ERROR,
status_text=str(e),
)
@huey.task()
def rebuild_system_task(job: Job, upgrade: bool = False):
"""Rebuild the system"""
rebuild_system(job, upgrade)

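`wait_until_true` is imported from `selfprivacy_api.utils.waitloop` but its body is not part of this diff; a minimal polling helper consistent with how it is called above might look like this (a sketch, not the repository's actual implementation):

```python
import time
from typing import Callable

def wait_until_true(
    predicate: Callable[[], bool],
    timeout_sec: int = 60,
    interval: float = 1.0,
) -> None:
    """Poll predicate() every `interval` seconds; raise TimeoutError on timeout."""
    deadline = time.monotonic() + timeout_sec
    while time.monotonic() < deadline:
        if predicate():
            return
        time.sleep(interval)
    raise TimeoutError("Condition was not met within the timeout")
```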

@@ -11,9 +11,13 @@ Adding DISABLE_ALL to that array disables the migrations module entirely.
from selfprivacy_api.utils import ReadUserData, UserDataFiles
from selfprivacy_api.migrations.write_token_to_redis import WriteTokenToRedis
from selfprivacy_api.migrations.check_for_system_rebuild_jobs import (
CheckForSystemRebuildJobs,
)
migrations = [
WriteTokenToRedis(),
CheckForSystemRebuildJobs(),
]


@@ -0,0 +1,47 @@
from selfprivacy_api.migrations.migration import Migration
from selfprivacy_api.jobs import JobStatus, Jobs
class CheckForSystemRebuildJobs(Migration):
"""Check if there are unfinished system rebuild jobs and finish them"""
def get_migration_name(self):
return "check_for_system_rebuild_jobs"
def get_migration_description(self):
return "Check if there are unfinished system rebuild jobs and finish them"
def is_migration_needed(self):
# Check if there are any unfinished system rebuild jobs
for job in Jobs.get_jobs():
if (
job.type_id
in [
"system.nixos.rebuild",
"system.nixos.upgrade",
]
) and job.status in [
JobStatus.CREATED,
JobStatus.RUNNING,
]:
return True
def migrate(self):
# As the API is restarted, we assume that the jobs are finished
for job in Jobs.get_jobs():
if (
job.type_id
in [
"system.nixos.rebuild",
"system.nixos.upgrade",
]
) and job.status in [
JobStatus.CREATED,
JobStatus.RUNNING,
]:
Jobs.update(
job=job,
status=JobStatus.FINISHED,
result="System rebuilt.",
progress=100,
)


@@ -0,0 +1,24 @@
from enum import Enum
from typing import Optional
from pydantic import BaseModel
class ServiceStatus(Enum):
"""Enum for service status"""
ACTIVE = "ACTIVE"
RELOADING = "RELOADING"
INACTIVE = "INACTIVE"
FAILED = "FAILED"
ACTIVATING = "ACTIVATING"
DEACTIVATING = "DEACTIVATING"
OFF = "OFF"
class ServiceDnsRecord(BaseModel):
type: str
name: str
content: str
ttl: int
display_name: str
priority: Optional[int] = None


@@ -30,7 +30,7 @@ class RedisTokensRepository(AbstractTokensRepository):
@staticmethod
def token_key_for_device(device_name: str):
md5_hash = md5()
md5_hash = md5(usedforsecurity=False)
md5_hash.update(bytes(device_name, "utf-8"))
digest = md5_hash.hexdigest()
return TOKENS_PREFIX + digest


@@ -56,14 +56,18 @@ def get_all_required_dns_records() -> list[ServiceDnsRecord]:
ttl=3600,
display_name="SelfPrivacy API",
),
ServiceDnsRecord(
type="AAAA",
name="api",
content=ip6,
ttl=3600,
display_name="SelfPrivacy API (IPv6)",
),
]
if ip6 is not None:
dns_records.append(
ServiceDnsRecord(
type="AAAA",
name="api",
content=ip6,
ttl=3600,
display_name="SelfPrivacy API (IPv6)",
)
)
for service in get_enabled_services():
dns_records += service.get_dns_records()
dns_records += service.get_dns_records(ip4, ip6)
return dns_records


@@ -1,15 +1,12 @@
"""Class representing Bitwarden service"""
import base64
import subprocess
import typing
from typing import Optional, List
from selfprivacy_api.jobs import Job, Jobs
from selfprivacy_api.services.generic_service_mover import FolderMoveNames, move_service
from selfprivacy_api.services.generic_status_getter import get_service_status
from selfprivacy_api.services.service import Service, ServiceDnsRecord, ServiceStatus
from selfprivacy_api.utils import ReadUserData, WriteUserData, get_domain
from selfprivacy_api.utils.block_devices import BlockDevice
import selfprivacy_api.utils.network as network_utils
from selfprivacy_api.utils import get_domain
from selfprivacy_api.utils.systemd import get_service_status
from selfprivacy_api.services.service import Service, ServiceStatus
from selfprivacy_api.services.bitwarden.icon import BITWARDEN_ICON
@@ -41,11 +38,15 @@ class Bitwarden(Service):
return "vaultwarden"
@staticmethod
def get_url() -> typing.Optional[str]:
def get_url() -> Optional[str]:
"""Return service url."""
domain = get_domain()
return f"https://password.{domain}"
@staticmethod
def get_subdomain() -> Optional[str]:
return "password"
@staticmethod
def is_movable() -> bool:
return True
@@ -96,42 +97,5 @@
return ""
@staticmethod
def get_folders() -> typing.List[str]:
def get_folders() -> List[str]:
return ["/var/lib/bitwarden", "/var/lib/bitwarden_rs"]
@staticmethod
def get_dns_records() -> typing.List[ServiceDnsRecord]:
"""Return list of DNS records for Bitwarden service."""
return [
ServiceDnsRecord(
type="A",
name="password",
content=network_utils.get_ip4(),
ttl=3600,
display_name="Bitwarden",
),
ServiceDnsRecord(
type="AAAA",
name="password",
content=network_utils.get_ip6(),
ttl=3600,
display_name="Bitwarden (IPv6)",
),
]
def move_to_volume(self, volume: BlockDevice) -> Job:
job = Jobs.add(
type_id="services.bitwarden.move",
name="Move Bitwarden",
description=f"Moving Bitwarden data to {volume.name}",
)
move_service(
self,
volume,
job,
FolderMoveNames.default_foldermoves(self),
"bitwarden",
)
return job


@@ -1,260 +0,0 @@
"""Generic handler for moving services"""
from __future__ import annotations
import subprocess
import time
import pathlib
import shutil
from pydantic import BaseModel
from selfprivacy_api.jobs import Job, JobStatus, Jobs
from selfprivacy_api.utils.huey import huey
from selfprivacy_api.utils.block_devices import BlockDevice
from selfprivacy_api.utils import ReadUserData, WriteUserData
from selfprivacy_api.services.service import Service, ServiceStatus
from selfprivacy_api.services.owned_path import OwnedPath
class FolderMoveNames(BaseModel):
name: str
bind_location: str
owner: str
group: str
@staticmethod
def from_owned_path(path: OwnedPath) -> FolderMoveNames:
return FolderMoveNames(
name=FolderMoveNames.get_foldername(path.path),
bind_location=path.path,
owner=path.owner,
group=path.group,
)
@staticmethod
def get_foldername(path: str) -> str:
return path.split("/")[-1]
@staticmethod
def default_foldermoves(service: Service) -> list[FolderMoveNames]:
return [
FolderMoveNames.from_owned_path(folder)
for folder in service.get_owned_folders()
]
@huey.task()
def move_service(
service: Service,
volume: BlockDevice,
job: Job,
folder_names: list[FolderMoveNames],
userdata_location: str,
):
"""Move a service to another volume."""
job = Jobs.update(
job=job,
status_text="Performing pre-move checks...",
status=JobStatus.RUNNING,
)
service_name = service.get_display_name()
with ReadUserData() as user_data:
if not user_data.get("useBinds", False):
Jobs.update(
job=job,
status=JobStatus.ERROR,
error="Server is not using binds.",
)
return
# Check if we are on the same volume
old_volume = service.get_drive()
if old_volume == volume.name:
Jobs.update(
job=job,
status=JobStatus.ERROR,
error=f"{service_name} is already on this volume.",
)
return
# Check if there is enough space on the new volume
if int(volume.fsavail) < service.get_storage_usage():
Jobs.update(
job=job,
status=JobStatus.ERROR,
error="Not enough space on the new volume.",
)
return
# Make sure the volume is mounted
if not volume.is_root() and f"/volumes/{volume.name}" not in volume.mountpoints:
Jobs.update(
job=job,
status=JobStatus.ERROR,
error="Volume is not mounted.",
)
return
# Make sure current actual directory exists and if its user and group are correct
for folder in folder_names:
if not pathlib.Path(f"/volumes/{old_volume}/{folder.name}").exists():
Jobs.update(
job=job,
status=JobStatus.ERROR,
error=f"{service_name} is not found.",
)
return
if not pathlib.Path(f"/volumes/{old_volume}/{folder.name}").is_dir():
Jobs.update(
job=job,
status=JobStatus.ERROR,
error=f"{service_name} is not a directory.",
)
return
if (
not pathlib.Path(f"/volumes/{old_volume}/{folder.name}").owner()
== folder.owner
):
Jobs.update(
job=job,
status=JobStatus.ERROR,
error=f"{service_name} owner is not {folder.owner}.",
)
return
# Stop service
Jobs.update(
job=job,
status=JobStatus.RUNNING,
status_text=f"Stopping {service_name}...",
progress=5,
)
service.stop()
# Wait for the service to stop, check every second
# If it does not stop in 30 seconds, abort
for _ in range(30):
if service.get_status() not in (
ServiceStatus.ACTIVATING,
ServiceStatus.DEACTIVATING,
):
break
time.sleep(1)
else:
Jobs.update(
job=job,
status=JobStatus.ERROR,
error=f"{service_name} did not stop in 30 seconds.",
)
return
# Unmount old volume
Jobs.update(
job=job,
status_text="Unmounting old folder...",
status=JobStatus.RUNNING,
progress=10,
)
for folder in folder_names:
try:
subprocess.run(
["umount", folder.bind_location],
check=True,
)
except subprocess.CalledProcessError:
Jobs.update(
job=job,
status=JobStatus.ERROR,
error="Unable to unmount old volume.",
)
return
# Move data to new volume and set correct permissions
Jobs.update(
job=job,
status_text="Moving data to new volume...",
status=JobStatus.RUNNING,
progress=20,
)
current_progress = 20
folder_percentage = 50 // len(folder_names)
for folder in folder_names:
shutil.move(
f"/volumes/{old_volume}/{folder.name}",
f"/volumes/{volume.name}/{folder.name}",
)
Jobs.update(
job=job,
status_text="Moving data to new volume...",
status=JobStatus.RUNNING,
progress=current_progress + folder_percentage,
)
Jobs.update(
job=job,
status_text=f"Making sure {service_name} owns its files...",
status=JobStatus.RUNNING,
progress=70,
)
for folder in folder_names:
try:
subprocess.run(
[
"chown",
"-R",
f"{folder.owner}:{folder.group}",
f"/volumes/{volume.name}/{folder.name}",
],
check=True,
)
except subprocess.CalledProcessError as error:
print(error.output)
Jobs.update(
job=job,
status=JobStatus.RUNNING,
error=f"Unable to set ownership of new volume. {service_name} may not be able to access its files. Continuing anyway.",
)
# Mount new volume
Jobs.update(
job=job,
status_text=f"Mounting {service_name} data...",
status=JobStatus.RUNNING,
progress=90,
)
for folder in folder_names:
try:
subprocess.run(
[
"mount",
"--bind",
f"/volumes/{volume.name}/{folder.name}",
folder.bind_location,
],
check=True,
)
except subprocess.CalledProcessError as error:
print(error.output)
Jobs.update(
job=job,
status=JobStatus.ERROR,
error="Unable to mount new volume.",
)
return
# Update userdata
Jobs.update(
job=job,
status_text="Finishing move...",
status=JobStatus.RUNNING,
progress=95,
)
with WriteUserData() as user_data:
if "modules" not in user_data:
user_data["modules"] = {}
if userdata_location not in user_data["modules"]:
user_data["modules"][userdata_location] = {}
user_data["modules"][userdata_location]["location"] = volume.name
# Start service
service.start()
Jobs.update(
job=job,
status=JobStatus.FINISHED,
result=f"{service_name} moved successfully.",
status_text=f"Starting {service_name}...",
progress=100,
)


@@ -1,15 +1,12 @@
"""Class representing Bitwarden service"""
import base64
import subprocess
import typing
from typing import Optional, List
from selfprivacy_api.jobs import Job, Jobs
from selfprivacy_api.services.generic_service_mover import FolderMoveNames, move_service
from selfprivacy_api.services.generic_status_getter import get_service_status
from selfprivacy_api.services.service import Service, ServiceDnsRecord, ServiceStatus
from selfprivacy_api.utils import ReadUserData, WriteUserData, get_domain
from selfprivacy_api.utils.block_devices import BlockDevice
import selfprivacy_api.utils.network as network_utils
from selfprivacy_api.utils import get_domain
from selfprivacy_api.utils.systemd import get_service_status
from selfprivacy_api.services.service import Service, ServiceStatus
from selfprivacy_api.services.gitea.icon import GITEA_ICON
@@ -37,11 +34,15 @@ class Gitea(Service):
return base64.b64encode(GITEA_ICON.encode("utf-8")).decode("utf-8")
@staticmethod
def get_url() -> typing.Optional[str]:
def get_url() -> Optional[str]:
"""Return service url."""
domain = get_domain()
return f"https://git.{domain}"
@staticmethod
def get_subdomain() -> Optional[str]:
return "git"
@staticmethod
def is_movable() -> bool:
return True
@@ -91,41 +92,5 @@
return ""
@staticmethod
def get_folders() -> typing.List[str]:
def get_folders() -> List[str]:
return ["/var/lib/gitea"]
@staticmethod
def get_dns_records() -> typing.List[ServiceDnsRecord]:
return [
ServiceDnsRecord(
type="A",
name="git",
content=network_utils.get_ip4(),
ttl=3600,
display_name="Gitea",
),
ServiceDnsRecord(
type="AAAA",
name="git",
content=network_utils.get_ip6(),
ttl=3600,
display_name="Gitea (IPv6)",
),
]
def move_to_volume(self, volume: BlockDevice) -> Job:
job = Jobs.add(
type_id="services.gitea.move",
name="Move Gitea",
description=f"Moving Gitea data to {volume.name}",
)
move_service(
self,
volume,
job,
FolderMoveNames.default_foldermoves(self),
"gitea",
)
return job


@@ -1,16 +1,15 @@
"""Class representing Jitsi Meet service"""
import base64
import subprocess
import typing
from typing import Optional, List
from selfprivacy_api.jobs import Job
from selfprivacy_api.services.generic_status_getter import (
from selfprivacy_api.utils.systemd import (
get_service_status_from_several_units,
)
from selfprivacy_api.services.service import Service, ServiceDnsRecord, ServiceStatus
from selfprivacy_api.utils import ReadUserData, WriteUserData, get_domain
from selfprivacy_api.services.service import Service, ServiceStatus
from selfprivacy_api.utils import get_domain
from selfprivacy_api.utils.block_devices import BlockDevice
import selfprivacy_api.utils.network as network_utils
from selfprivacy_api.services.jitsimeet.icon import JITSI_ICON
@@ -38,11 +37,15 @@ class JitsiMeet(Service):
return base64.b64encode(JITSI_ICON.encode("utf-8")).decode("utf-8")
@staticmethod
def get_url() -> typing.Optional[str]:
def get_url() -> Optional[str]:
"""Return service url."""
domain = get_domain()
return f"https://meet.{domain}"
@staticmethod
def get_subdomain() -> Optional[str]:
return "meet"
@staticmethod
def is_movable() -> bool:
return False
@@ -98,29 +101,8 @@
return ""
@staticmethod
def get_folders() -> typing.List[str]:
def get_folders() -> List[str]:
return ["/var/lib/jitsi-meet"]
@staticmethod
def get_dns_records() -> typing.List[ServiceDnsRecord]:
ip4 = network_utils.get_ip4()
ip6 = network_utils.get_ip6()
return [
ServiceDnsRecord(
type="A",
name="meet",
content=ip4,
ttl=3600,
display_name="Jitsi",
),
ServiceDnsRecord(
type="AAAA",
name="meet",
content=ip6,
ttl=3600,
display_name="Jitsi (IPv6)",
),
]
def move_to_volume(self, volume: BlockDevice) -> Job:
raise NotImplementedError("jitsi-meet service is not movable")

View File

@ -2,17 +2,13 @@
import base64
import subprocess
import typing
from typing import Optional, List
from selfprivacy_api.jobs import Job, Jobs
from selfprivacy_api.services.generic_service_mover import FolderMoveNames, move_service
from selfprivacy_api.services.generic_status_getter import (
from selfprivacy_api.utils.systemd import (
get_service_status_from_several_units,
)
from selfprivacy_api.services.service import Service, ServiceDnsRecord, ServiceStatus
from selfprivacy_api import utils
from selfprivacy_api.utils.block_devices import BlockDevice
import selfprivacy_api.utils.network as network_utils
from selfprivacy_api.services.mailserver.icon import MAILSERVER_ICON
@ -40,10 +36,14 @@ class MailServer(Service):
return "virtualMail"
@staticmethod
def get_url() -> typing.Optional[str]:
def get_url() -> Optional[str]:
"""Return service url."""
return None
@staticmethod
def get_subdomain() -> Optional[str]:
return None
@staticmethod
def is_movable() -> bool:
return True
@ -102,20 +102,18 @@ class MailServer(Service):
return ""
@staticmethod
def get_folders() -> typing.List[str]:
def get_folders() -> List[str]:
return ["/var/vmail", "/var/sieve"]
@staticmethod
def get_dns_records() -> typing.List[ServiceDnsRecord]:
@classmethod
def get_dns_records(cls, ip4: str, ip6: Optional[str]) -> List[ServiceDnsRecord]:
domain = utils.get_domain()
dkim_record = utils.get_dkim_key(domain)
ip4 = network_utils.get_ip4()
ip6 = network_utils.get_ip6()
if dkim_record is None:
return []
return [
dns_records = [
ServiceDnsRecord(
type="A",
name=domain,
@ -123,13 +121,6 @@ class MailServer(Service):
ttl=3600,
display_name="Root Domain",
),
ServiceDnsRecord(
type="AAAA",
name=domain,
content=ip6,
ttl=3600,
display_name="Root Domain (IPv6)",
),
ServiceDnsRecord(
type="MX",
name=domain,
@ -161,19 +152,14 @@ class MailServer(Service):
),
]
def move_to_volume(self, volume: BlockDevice) -> Job:
job = Jobs.add(
type_id="services.email.move",
name="Move Mail Server",
description=f"Moving mailserver data to {volume.name}",
)
move_service(
self,
volume,
job,
FolderMoveNames.default_foldermoves(self),
"simple-nixos-mailserver",
)
return job
if ip6 is not None:
dns_records.append(
ServiceDnsRecord(
type="AAAA",
name=domain,
content=ip6,
ttl=3600,
display_name="Root Domain (IPv6)",
),
)
return dns_records
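For illustration, a sketch of how a caller might use this new signature (the addresses are hypothetical placeholders, and a DKIM key is assumed to be configured, since the method returns an empty list when dkim_record is None):

# Hypothetical usage sketch, not part of the diff.
ip4 = "203.0.113.10"   # e.g. obtained from network_utils.get_ip4()
ip6 = None             # get_ip6() may now return None
records = MailServer.get_dns_records(ip4, ip6)
# With ip6 = None, no AAAA record is emitted:
assert all(record.type != "AAAA" for record in records)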

View File

@ -0,0 +1,72 @@
"""Generic handler for moving services"""
from __future__ import annotations
import shutil
from typing import List
from selfprivacy_api.jobs import Job, report_progress
from selfprivacy_api.utils.block_devices import BlockDevice
from selfprivacy_api.services.owned_path import Bind
class MoveError(Exception):
"""Move of the data has failed"""
def check_volume(volume: BlockDevice, space_needed: int) -> None:
# Check if there is enough space on the new volume
if int(volume.fsavail) < space_needed:
raise MoveError("Not enough space on the new volume.")
# Make sure the volume is mounted
if not volume.is_root() and f"/volumes/{volume.name}" not in volume.mountpoints:
raise MoveError("Volume is not mounted.")
def check_binds(volume_name: str, binds: List[Bind]) -> None:
# Make sure the current directory exists and that its user and group are correct
for bind in binds:
bind.validate()
def unbind_folders(owned_folders: List[Bind]) -> None:
for folder in owned_folders:
folder.unbind()
# May be moved into Bind
def move_data_to_volume(
binds: List[Bind],
new_volume: BlockDevice,
job: Job,
) -> List[Bind]:
current_progress = job.progress
if current_progress is None:
current_progress = 0
progress_per_folder = 50 // len(binds)
for bind in binds:
old_location = bind.location_at_volume()
bind.drive = new_volume
new_location = bind.location_at_volume()
try:
shutil.move(old_location, new_location)
except Exception as error:
raise MoveError(
f"could not move {old_location} to {new_location} : {str(error)}"
) from error
progress = current_progress + progress_per_folder
report_progress(progress, job, "Moving data to new volume...")
return binds
def ensure_folder_ownership(folders: List[Bind]) -> None:
for folder in folders:
folder.ensure_ownership()
def bind_folders(folders: List[Bind]):
for folder in folders:
folder.bind()
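Taken together, the helpers above compose into a move roughly as follows (a minimal sketch; real callers also stop the service first and report progress, see Service.do_move_to_volume later in this diff):

# Sketch of the intended call order, assuming `service` exposes binds()
# and `new_volume` is a BlockDevice.
binds = service.binds()
check_volume(new_volume, space_needed=service.get_storage_usage())
check_binds(service.get_drive(), binds)
unbind_folders(binds)
binds = move_data_to_volume(binds, new_volume, job)
ensure_folder_ownership(binds)
bind_folders(binds)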

View File

@ -1,14 +1,14 @@
"""Class representing Nextcloud service."""
import base64
import subprocess
import typing
from typing import Optional, List
from selfprivacy_api.utils import get_domain
from selfprivacy_api.jobs import Job, Jobs
from selfprivacy_api.services.generic_service_mover import FolderMoveNames, move_service
from selfprivacy_api.services.generic_status_getter import get_service_status
from selfprivacy_api.services.service import Service, ServiceDnsRecord, ServiceStatus
from selfprivacy_api.utils import ReadUserData, WriteUserData, get_domain
from selfprivacy_api.utils.block_devices import BlockDevice
import selfprivacy_api.utils.network as network_utils
from selfprivacy_api.utils.systemd import get_service_status
from selfprivacy_api.services.service import Service, ServiceStatus
from selfprivacy_api.services.nextcloud.icon import NEXTCLOUD_ICON
@ -36,11 +36,15 @@ class Nextcloud(Service):
return base64.b64encode(NEXTCLOUD_ICON.encode("utf-8")).decode("utf-8")
@staticmethod
def get_url() -> typing.Optional[str]:
def get_url() -> Optional[str]:
"""Return service url."""
domain = get_domain()
return f"https://cloud.{domain}"
@staticmethod
def get_subdomain() -> Optional[str]:
return "cloud"
@staticmethod
def is_movable() -> bool:
return True
@ -96,39 +100,5 @@ class Nextcloud(Service):
return ""
@staticmethod
def get_folders() -> typing.List[str]:
def get_folders() -> List[str]:
return ["/var/lib/nextcloud"]
@staticmethod
def get_dns_records() -> typing.List[ServiceDnsRecord]:
return [
ServiceDnsRecord(
type="A",
name="cloud",
content=network_utils.get_ip4(),
ttl=3600,
display_name="Nextcloud",
),
ServiceDnsRecord(
type="AAAA",
name="cloud",
content=network_utils.get_ip6(),
ttl=3600,
display_name="Nextcloud (IPv6)",
),
]
def move_to_volume(self, volume: BlockDevice) -> Job:
job = Jobs.add(
type_id="services.nextcloud.move",
name="Move Nextcloud",
description=f"Moving Nextcloud to volume {volume.name}",
)
move_service(
self,
volume,
job,
FolderMoveNames.default_foldermoves(self),
"nextcloud",
)
return job

View File

@ -3,12 +3,10 @@ import base64
import subprocess
import typing
from selfprivacy_api.jobs import Job
from selfprivacy_api.services.generic_status_getter import get_service_status
from selfprivacy_api.services.service import Service, ServiceDnsRecord, ServiceStatus
from selfprivacy_api.utils import ReadUserData, WriteUserData
from selfprivacy_api.utils.systemd import get_service_status
from selfprivacy_api.services.service import Service, ServiceStatus
from selfprivacy_api.utils.block_devices import BlockDevice
from selfprivacy_api.services.ocserv.icon import OCSERV_ICON
import selfprivacy_api.utils.network as network_utils
class Ocserv(Service):
@ -35,6 +33,10 @@ class Ocserv(Service):
"""Return service url."""
return None
@staticmethod
def get_subdomain() -> typing.Optional[str]:
return "vpn"
@staticmethod
def is_movable() -> bool:
return False
@ -79,25 +81,6 @@ class Ocserv(Service):
def get_logs():
return ""
@staticmethod
def get_dns_records() -> typing.List[ServiceDnsRecord]:
return [
ServiceDnsRecord(
type="A",
name="vpn",
content=network_utils.get_ip4(),
ttl=3600,
display_name="OpenConnect VPN",
),
ServiceDnsRecord(
type="AAAA",
name="vpn",
content=network_utils.get_ip6(),
ttl=3600,
display_name="OpenConnect VPN (IPv6)",
),
]
@staticmethod
def get_folders() -> typing.List[str]:
return []

View File

@ -1,7 +1,126 @@
from __future__ import annotations
import subprocess
import pathlib
from pydantic import BaseModel
from os.path import exists
from selfprivacy_api.utils.block_devices import BlockDevice, BlockDevices
# tests override it to a tmpdir
VOLUMES_PATH = "/volumes"
class BindError(Exception):
pass
class OwnedPath(BaseModel):
"""
A convenient interface for explicitly defining ownership of service folders.
One overrides Service.get_owned_paths() for this.
Why does this exist?
One could use Bind to define ownership, but then one would also need to handle
the drive, which is unnecessary and produces code duplication.
It is also somewhat semantically wrong to fold OwnedPath into Bind
instead of just user and group, because owner and group in Bind are applied to
the original folder on the drive, not to the binding path. But maybe that is
fine, since they are technically both owned; this is still undecided.
"""
path: str
owner: str
group: str
class Bind:
"""
A directory that resides on some volume and is bind-mounted into the filesystem where we need it.
Used for storing service data.
"""
def __init__(self, binding_path: str, owner: str, group: str, drive: BlockDevice):
self.binding_path = binding_path
self.owner = owner
self.group = group
self.drive = drive
# TODO: delete owned path interface from Service
@staticmethod
def from_owned_path(path: OwnedPath, drive_name: str) -> Bind:
drive = BlockDevices().get_block_device(drive_name)
if drive is None:
raise BindError(f"No such drive: {drive_name}")
return Bind(
binding_path=path.path, owner=path.owner, group=path.group, drive=drive
)
def bind_foldername(self) -> str:
return self.binding_path.split("/")[-1]
def location_at_volume(self) -> str:
return f"{VOLUMES_PATH}/{self.drive.name}/{self.bind_foldername()}"
def validate(self) -> None:
path = pathlib.Path(self.location_at_volume())
if not path.exists():
raise BindError(f"directory {path} is not found.")
if not path.is_dir():
raise BindError(f"{path} is not a directory.")
if path.owner() != self.owner:
raise BindError(f"{path} is not owned by {self.owner}.")
def bind(self) -> None:
if not exists(self.binding_path):
raise BindError(f"cannot bind to a non-existing path: {self.binding_path}")
source = self.location_at_volume()
target = self.binding_path
try:
subprocess.run(
["mount", "--bind", source, target],
stderr=subprocess.PIPE,
check=True,
)
except subprocess.CalledProcessError as error:
print(error.stderr)
raise BindError(f"Unable to bind {source} to {target} :{error.stderr}")
def unbind(self) -> None:
if not exists(self.binding_path):
raise BindError(f"cannot unbind a non-existing path: {self.binding_path}")
try:
subprocess.run(
# umount -l ?
["umount", self.binding_path],
check=True,
)
except subprocess.CalledProcessError as error:
raise BindError(f"Unable to unmount folder {self.binding_path}.") from error
def ensure_ownership(self) -> None:
true_location = self.location_at_volume()
try:
subprocess.run(
[
"chown",
"-R",
f"{self.owner}:{self.group}",
# Could we just chown the bound location instead?
true_location,
],
check=True,
stderr=subprocess.PIPE,
)
except subprocess.CalledProcessError as error:
print(error.stderr)
error_message = (
f"Unable to set ownership of {true_location} :{error.stderr}"
)
raise BindError(error_message)
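A short usage sketch for OwnedPath and Bind together (the drive name and paths are hypothetical; from_owned_path raises BindError if the drive is unknown to BlockDevices):

# Hypothetical example: wrap a service folder and rebind it.
owned = OwnedPath(path="/var/lib/example", owner="example", group="example")
bind = Bind.from_owned_path(owned, drive_name="sda2")
bind.ensure_ownership()  # chown -R the folder at its location on the volume
bind.validate()          # raises BindError on a missing folder or wrong owner
bind.bind()              # mount --bind the volume folder onto /var/lib/example
bind.unbind()            # umount the binding path again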

View File

@ -1,15 +1,14 @@
"""Class representing Nextcloud service."""
import base64
import subprocess
import typing
from selfprivacy_api.jobs import Job, Jobs
from selfprivacy_api.services.generic_service_mover import FolderMoveNames, move_service
from selfprivacy_api.services.generic_status_getter import get_service_status
from selfprivacy_api.services.service import Service, ServiceDnsRecord, ServiceStatus
from typing import Optional, List
from selfprivacy_api.utils import get_domain
from selfprivacy_api.services.owned_path import OwnedPath
from selfprivacy_api.utils import ReadUserData, WriteUserData, get_domain
from selfprivacy_api.utils.block_devices import BlockDevice
import selfprivacy_api.utils.network as network_utils
from selfprivacy_api.utils.systemd import get_service_status
from selfprivacy_api.services.service import Service, ServiceStatus
from selfprivacy_api.services.pleroma.icon import PLEROMA_ICON
@ -33,11 +32,15 @@ class Pleroma(Service):
return base64.b64encode(PLEROMA_ICON.encode("utf-8")).decode("utf-8")
@staticmethod
def get_url() -> typing.Optional[str]:
def get_url() -> Optional[str]:
"""Return service url."""
domain = get_domain()
return f"https://social.{domain}"
@staticmethod
def get_subdomain() -> Optional[str]:
return "social"
@staticmethod
def is_movable() -> bool:
return True
@ -82,10 +85,10 @@ class Pleroma(Service):
return ""
@staticmethod
def get_owned_folders() -> typing.List[OwnedPath]:
def get_owned_folders() -> List[OwnedPath]:
"""
Get a list of occupied directories with ownership info
pleroma has folders that are owned by different users
Pleroma has folders that are owned by different users
"""
return [
OwnedPath(
@ -99,37 +102,3 @@ class Pleroma(Service):
group="postgres",
),
]
@staticmethod
def get_dns_records() -> typing.List[ServiceDnsRecord]:
return [
ServiceDnsRecord(
type="A",
name="social",
content=network_utils.get_ip4(),
ttl=3600,
display_name="Pleroma",
),
ServiceDnsRecord(
type="AAAA",
name="social",
content=network_utils.get_ip6(),
ttl=3600,
display_name="Pleroma (IPv6)",
),
]
def move_to_volume(self, volume: BlockDevice) -> Job:
job = Jobs.add(
type_id="services.pleroma.move",
name="Move Pleroma",
description=f"Moving Pleroma to volume {volume.name}",
)
move_service(
self,
volume,
job,
FolderMoveNames.default_foldermoves(self),
"pleroma",
)
return job

View File

@ -1,43 +1,32 @@
"""Abstract class for a service running on a server"""
from abc import ABC, abstractmethod
from enum import Enum
import typing
from pydantic import BaseModel
from selfprivacy_api.jobs import Job
from typing import List, Optional
from selfprivacy_api import utils
from selfprivacy_api.utils import ReadUserData, WriteUserData
from selfprivacy_api.utils.waitloop import wait_until_true
from selfprivacy_api.utils.block_devices import BlockDevice, BlockDevices
from selfprivacy_api.jobs import Job, Jobs, JobStatus, report_progress
from selfprivacy_api.jobs.upgrade_system import rebuild_system
from selfprivacy_api.models.services import ServiceStatus, ServiceDnsRecord
from selfprivacy_api.services.generic_size_counter import get_storage_usage
from selfprivacy_api.services.owned_path import OwnedPath
from selfprivacy_api import utils
from selfprivacy_api.utils.waitloop import wait_until_true
from selfprivacy_api.utils import ReadUserData, WriteUserData, get_domain
from selfprivacy_api.services.owned_path import OwnedPath, Bind
from selfprivacy_api.services.moving import (
check_binds,
check_volume,
unbind_folders,
bind_folders,
ensure_folder_ownership,
MoveError,
move_data_to_volume,
)
DEFAULT_START_STOP_TIMEOUT = 5 * 60
class ServiceStatus(Enum):
"""Enum for service status"""
ACTIVE = "ACTIVE"
RELOADING = "RELOADING"
INACTIVE = "INACTIVE"
FAILED = "FAILED"
ACTIVATING = "ACTIVATING"
DEACTIVATING = "DEACTIVATING"
OFF = "OFF"
class ServiceDnsRecord(BaseModel):
type: str
name: str
content: str
ttl: int
display_name: str
priority: typing.Optional[int] = None
class Service(ABC):
"""
Service here is some software that is hosted on the server and
@ -78,14 +67,22 @@ class Service(ABC):
@staticmethod
@abstractmethod
def get_url() -> typing.Optional[str]:
def get_url() -> Optional[str]:
"""
The url of the service if it is accessible from the internet browser.
"""
pass
@staticmethod
@abstractmethod
def get_subdomain() -> Optional[str]:
"""
The assigned primary subdomain for this service.
"""
pass
@classmethod
def get_user(cls) -> typing.Optional[str]:
def get_user(cls) -> Optional[str]:
"""
The user that owns the service's files.
Defaults to the service's id.
@ -93,7 +90,7 @@ class Service(ABC):
return cls.get_id()
@classmethod
def get_group(cls) -> typing.Optional[str]:
def get_group(cls) -> Optional[str]:
"""
The group that owns the service's files.
Defaults to the service's user.
@ -209,10 +206,32 @@ class Service(ABC):
storage_used += get_storage_usage(folder)
return storage_used
@staticmethod
@abstractmethod
def get_dns_records() -> typing.List[ServiceDnsRecord]:
pass
@classmethod
def get_dns_records(cls, ip4: str, ip6: Optional[str]) -> List[ServiceDnsRecord]:
subdomain = cls.get_subdomain()
display_name = cls.get_display_name()
if subdomain is None:
return []
dns_records = [
ServiceDnsRecord(
type="A",
name=subdomain,
content=ip4,
ttl=3600,
display_name=display_name,
)
]
if ip6 is not None:
dns_records.append(
ServiceDnsRecord(
type="AAAA",
name=subdomain,
content=ip6,
ttl=3600,
display_name=f"{display_name} (IPv6)",
)
)
return dns_records
@classmethod
def get_drive(cls) -> str:
@ -237,7 +256,7 @@ class Service(ABC):
return root_device
@classmethod
def get_folders(cls) -> typing.List[str]:
def get_folders(cls) -> List[str]:
"""
Get a plain list of occupied directories.
By default, extracts info from the overridden get_owned_folders()
@ -249,7 +268,7 @@ class Service(ABC):
return [owned_folder.path for owned_folder in cls.get_owned_folders()]
@classmethod
def get_owned_folders(cls) -> typing.List[OwnedPath]:
def get_owned_folders(cls) -> List[OwnedPath]:
"""
Get a list of occupied directories with ownership info.
By default, extracts info from the overridden get_folders()
@ -264,19 +283,137 @@ class Service(ABC):
def get_foldername(path: str) -> str:
return path.split("/")[-1]
@abstractmethod
def move_to_volume(self, volume: BlockDevice) -> Job:
"""Cannot raise errors.
Returns errors as an errored out Job instead."""
pass
# TODO: with better json utils, it can be one line, and not a separate function
@classmethod
def set_location(cls, volume: BlockDevice):
"""
Only changes userdata
"""
service_id = cls.get_id()
with WriteUserData() as user_data:
if "modules" not in user_data:
user_data["modules"] = {}
if service_id not in user_data["modules"]:
user_data["modules"][service_id] = {}
user_data["modules"][service_id]["location"] = volume.name
def binds(self) -> List[Bind]:
owned_folders = self.get_owned_folders()
return [
Bind.from_owned_path(folder, self.get_drive()) for folder in owned_folders
]
def assert_can_move(self, new_volume):
"""
Checks whether the service can be moved to the new volume.
Raises errors if it cannot.
"""
service_name = self.get_display_name()
if not self.is_movable():
raise MoveError(f"{service_name} is not movable")
with ReadUserData() as user_data:
if not user_data.get("useBinds", False):
raise MoveError("Server is not using binds.")
current_volume_name = self.get_drive()
if current_volume_name == new_volume.name:
raise MoveError(f"{service_name} is already on volume {new_volume}")
check_volume(new_volume, space_needed=self.get_storage_usage())
binds = self.binds()
if binds == []:
raise MoveError("nothing to move")
check_binds(current_volume_name, binds)
def do_move_to_volume(
self,
new_volume: BlockDevice,
job: Job,
):
"""
Move a service to another volume.
Note: It may be much simpler to write it per bind, but a bit less safe?
"""
service_name = self.get_display_name()
binds = self.binds()
report_progress(10, job, "Unmounting folders from old volume...")
unbind_folders(binds)
report_progress(20, job, "Moving data to new volume...")
binds = move_data_to_volume(binds, new_volume, job)
report_progress(70, job, f"Making sure {service_name} owns its files...")
try:
ensure_folder_ownership(binds)
except Exception as error:
# We have already logged it via print; we additionally record it in the job's error field.
# We are continuing anyway, since Job has no warning field.
Jobs.update(
job,
JobStatus.RUNNING,
error=f"Service {service_name} will not be able to write files: "
+ str(error),
)
report_progress(90, job, f"Mounting {service_name} data...")
bind_folders(binds)
report_progress(95, job, f"Finishing moving {service_name}...")
self.set_location(new_volume)
def move_to_volume(self, volume: BlockDevice, job: Job) -> Job:
service_name = self.get_display_name()
report_progress(0, job, "Performing pre-move checks...")
self.assert_can_move(volume)
report_progress(5, job, f"Stopping {service_name}...")
assert self is not None
with StoppedService(self):
report_progress(9, job, "Stopped service, starting the move...")
self.do_move_to_volume(volume, job)
report_progress(98, job, "Move complete, rebuilding...")
rebuild_system(job, upgrade=False)
Jobs.update(
job=job,
status=JobStatus.FINISHED,
result=f"{service_name} moved successfully.",
status_text=f"Starting {service_name}...",
progress=100,
)
return job
@classmethod
def owned_path(cls, path: str):
"""A default guess on folder ownership"""
"""Default folder ownership"""
service_name = cls.get_display_name()
try:
owner = cls.get_user()
if owner is None:
# TODO: assume root?
# (if we do not want to make assumptions, maybe user should not be declared optional?)
raise LookupError(f"no user for service: {service_name}")
group = cls.get_group()
if group is None:
raise LookupError(f"no group for service: {service_name}")
except Exception as error:
raise LookupError(
f"when deciding a bind for folder {path} of service {service_name}, error: {str(error)}"
)
return OwnedPath(
path=path,
owner=cls.get_user(),
group=cls.get_group(),
owner=owner,
group=group,
)
def pre_backup(self):
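To see how the new defaults fit together, here is a hypothetical minimal subclass (not from the diff): with get_subdomain() defined, the generic get_dns_records() emits an A record, plus an AAAA record only when an IPv6 address is available, and move_to_volume() is inherited instead of being re-implemented per service:

class ExampleService(Service):
    """Hypothetical service relying on the new generic defaults."""

    @staticmethod
    def get_display_name() -> str:
        return "Example"

    @staticmethod
    def get_subdomain() -> Optional[str]:
        return "example"

    @staticmethod
    def get_folders() -> List[str]:
        return ["/var/lib/example"]

    # ...the remaining abstract methods are elided for brevity...

records = ExampleService.get_dns_records(ip4="203.0.113.10", ip6=None)
assert [record.type for record in records] == ["A"]  # AAAA skipped when ip6 is None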

View File

@ -0,0 +1,22 @@
from selfprivacy_api.services import Service
from selfprivacy_api.utils.block_devices import BlockDevice
from selfprivacy_api.utils.huey import huey
from selfprivacy_api.jobs import Job, Jobs, JobStatus
@huey.task()
def move_service(service: Service, new_volume: BlockDevice, job: Job) -> bool:
"""
Move a service's folders to a new physical volume.
Does not raise exceptions (we cannot handle exceptions from tasks).
Reports all errors via the job.
"""
try:
service.move_to_volume(new_volume, job)
except Exception as e:
Jobs.update(
job=job,
status=JobStatus.ERROR,
error=type(e).__name__ + " " + str(e),
)
return True
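A caller-side sketch of handing a move off to the queue (the job setup mirrors the per-service move code removed elsewhere in this diff; shown here only for illustration):

# Hypothetical caller: create the tracking job, then enqueue the task.
job = Jobs.add(
    type_id=f"services.{service.get_id()}.move",
    name=f"Move {service.get_display_name()}",
    description=f"Moving {service.get_display_name()} data to {volume.name}",
)
move_service(service, volume, job)  # runs on the huey worker; errors land in job.error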

View File

@ -8,11 +8,9 @@ from os import path
# from enum import Enum
from selfprivacy_api.jobs import Job, Jobs, JobStatus
from selfprivacy_api.services.service import Service, ServiceDnsRecord, ServiceStatus
from selfprivacy_api.jobs import Job
from selfprivacy_api.services.service import Service, ServiceStatus
from selfprivacy_api.utils.block_devices import BlockDevice
from selfprivacy_api.services.generic_service_mover import move_service, FolderMoveNames
import selfprivacy_api.utils.network as network_utils
from selfprivacy_api.services.test_service.icon import BITWARDEN_ICON
@ -65,6 +63,10 @@ class DummyService(Service):
domain = "test.com"
return f"https://password.{domain}"
@staticmethod
def get_subdomain() -> typing.Optional[str]:
return "password"
@classmethod
def is_movable(cls) -> bool:
return cls.movable
@ -86,7 +88,7 @@ class DummyService(Service):
@classmethod
def set_status(cls, status: ServiceStatus):
with open(cls.status_file(), "w") as file:
status_string = file.write(status.value)
file.write(status.value)
@classmethod
def get_status(cls) -> ServiceStatus:
@ -99,16 +101,17 @@ class DummyService(Service):
cls, new_status: ServiceStatus, delay_sec: float
):
"""simulating a delay on systemd side"""
status_file = cls.status_file()
if delay_sec == 0:
cls.set_status(new_status)
return
status_file = cls.status_file()
command = [
"bash",
"-c",
f" sleep {delay_sec} && echo {new_status.value} > {status_file}",
]
handle = subprocess.Popen(command)
if delay_sec == 0:
handle.communicate()
subprocess.Popen(command)
@classmethod
def set_backuppable(cls, new_value: bool) -> None:
@ -185,43 +188,9 @@ class DummyService(Service):
def get_folders(cls) -> List[str]:
return cls.folders
@staticmethod
def get_dns_records() -> typing.List[ServiceDnsRecord]:
"""Return list of DNS records for Bitwarden service."""
return [
ServiceDnsRecord(
type="A",
name="password",
content=network_utils.get_ip4(),
ttl=3600,
display_name="Test Service",
),
ServiceDnsRecord(
type="AAAA",
name="password",
content=network_utils.get_ip6(),
ttl=3600,
display_name="Test Service (IPv6)",
),
]
def move_to_volume(self, volume: BlockDevice) -> Job:
job = Jobs.add(
type_id=f"services.{self.get_id()}.move",
name=f"Move {self.get_display_name()}",
description=f"Moving {self.get_display_name()} data to {volume.name}",
)
def do_move_to_volume(self, volume: BlockDevice, job: Job) -> Job:
if self.simulate_moving is False:
# completely generic code, TODO: make it the default impl.
move_service(
self,
volume,
job,
FolderMoveNames.default_foldermoves(self),
self.get_id(),
)
return super(DummyService, self).do_move_to_volume(volume, job)
else:
Jobs.update(job, status=JobStatus.FINISHED)
self.set_drive(volume.name)
return job
self.set_drive(volume.name)
return job

View File

@ -1,4 +1,14 @@
from os import environ
from selfprivacy_api.utils.huey import huey
from selfprivacy_api.jobs.test import test_job
from selfprivacy_api.backup.tasks import *
from selfprivacy_api.services.generic_service_mover import move_service
from selfprivacy_api.services.tasks import move_service
from selfprivacy_api.jobs.upgrade_system import rebuild_system_task
from selfprivacy_api.jobs.test import test_job
from selfprivacy_api.jobs.nix_collect_garbage import calculate_and_clear_dead_paths
if environ.get("TEST_MODE"):
from tests.test_huey import sum

View File

@ -4,6 +4,8 @@ import subprocess
import json
import typing
from pydantic import BaseModel
from selfprivacy_api.utils import WriteUserData
from selfprivacy_api.utils.singleton_metaclass import SingletonMetaclass
@ -169,6 +171,9 @@ class BlockDevice:
return False
# TODO: SingletonMetaclass messes with tests and is able to persist state
# between them. If you see very weird test crosstalk, that is probably why.
# I am not sure it NEEDS to be SingletonMetaclass.
class BlockDevices(metaclass=SingletonMetaclass):
"""Singleton holding all Block devices"""

View File

@ -1,16 +1,24 @@
"""MiniHuey singleton."""
import os
from huey import SqliteHuey
from os import environ
from huey import RedisHuey
from selfprivacy_api.utils.redis_pool import RedisPool
HUEY_DATABASE_NUMBER = 10
def immediate() -> bool:
if environ.get("HUEY_QUEUES_FOR_TESTS"):
return False
if environ.get("TEST_MODE"):
return True
return False
HUEY_DATABASE = "/etc/selfprivacy/tasks.db"
# Singleton instance containing the huey database.
test_mode = os.environ.get("TEST_MODE")
huey = SqliteHuey(
huey = RedisHuey(
"selfprivacy-api",
filename=HUEY_DATABASE if not test_mode else None,
immediate=test_mode == "true",
url=RedisPool.connection_url(dbnumber=HUEY_DATABASE_NUMBER),
immediate=immediate(),
utc=True,
)
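As a sketch, the net effect of immediate() on task execution (note that the huey instance is constructed at import time, so these environment variables must be set before the module is imported):

# HUEY_QUEUES_FOR_TESTS takes precedence over TEST_MODE.
import os
os.environ["TEST_MODE"] = "true"
assert immediate() is True   # tasks run inline during tests
os.environ["HUEY_QUEUES_FOR_TESTS"] = "yes"
assert immediate() is False  # real queueing even in test mode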

View File

@ -2,6 +2,7 @@
"""Network utils"""
import subprocess
import re
import ipaddress
from typing import Optional
@ -17,13 +18,15 @@ def get_ip4() -> str:
return ip4.group(1) if ip4 else ""
def get_ip6() -> str:
def get_ip6() -> Optional[str]:
"""Get IPv6 address"""
try:
ip6 = subprocess.check_output(["ip", "addr", "show", "dev", "eth0"]).decode(
"utf-8"
)
ip6 = re.search(r"inet6 (\S+)\/\d+", ip6)
ip6_addresses = subprocess.check_output(
["ip", "addr", "show", "dev", "eth0"]
).decode("utf-8")
ip6_addresses = re.findall(r"inet6 (\S+)\/\d+", ip6_addresses)
for address in ip6_addresses:
if ipaddress.IPv6Address(address).is_global:
return address
except subprocess.CalledProcessError:
ip6 = None
return ip6.group(1) if ip6 else ""
return None
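The is_global check is what filters out loopback, link-local, and documentation addresses; for instance:

import ipaddress

assert ipaddress.IPv6Address("fe80::1").is_global is False      # link-local
assert ipaddress.IPv6Address("2001:db8::1").is_global is False  # documentation range
assert ipaddress.IPv6Address("2a01:4f8::1").is_global is True   # routable address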

View File

@ -1,8 +1,8 @@
"""
Redis pool module for selfprivacy_api
"""
from os import environ
import redis
from selfprivacy_api.utils.singleton_metaclass import SingletonMetaclass
REDIS_SOCKET = "/run/redis-sp-api/redis.sock"
@ -14,20 +14,20 @@ class RedisPool(metaclass=SingletonMetaclass):
"""
def __init__(self):
if "USE_REDIS_PORT" in environ:
self._pool = redis.ConnectionPool(
host="127.0.0.1",
port=int(environ["USE_REDIS_PORT"]),
decode_responses=True,
)
else:
self._pool = redis.ConnectionPool.from_url(
f"unix://{REDIS_SOCKET}",
decode_responses=True,
)
self._pool = redis.ConnectionPool.from_url(
RedisPool.connection_url(dbnumber=0),
decode_responses=True,
)
self._pubsub_connection = self.get_connection()
@staticmethod
def connection_url(dbnumber: int) -> str:
"""
redis://[[username]:[password]]@localhost:6379/0
unix://[username@]/path/to/socket.sock?db=0[&password=password]
"""
return f"unix://{REDIS_SOCKET}?db={dbnumber}"
def get_connection(self):
"""
Get a connection from the pool.
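Given the fixed socket path above, connection_url yields the same URL scheme that the RedisHuey instance now consumes, e.g.:

url = RedisPool.connection_url(dbnumber=0)
assert url == "unix:///run/redis-sp-api/redis.sock?db=0"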

View File

@ -1,16 +1,17 @@
"""Generic service status fetcher using systemctl"""
import subprocess
from typing import List
from selfprivacy_api.services.service import ServiceStatus
from selfprivacy_api.models.services import ServiceStatus
def get_service_status(service: str) -> ServiceStatus:
def get_service_status(unit: str) -> ServiceStatus:
"""
Return service status from systemd.
Use systemctl show to get the status of a service.
Get ActiveState from the output.
"""
service_status = subprocess.check_output(["systemctl", "show", service])
service_status = subprocess.check_output(["systemctl", "show", unit])
if b"LoadState=not-found" in service_status:
return ServiceStatus.OFF
if b"ActiveState=active" in service_status:
@ -58,3 +59,24 @@ def get_service_status_from_several_units(services: list[str]) -> ServiceStatus:
if ServiceStatus.ACTIVE in service_statuses:
return ServiceStatus.ACTIVE
return ServiceStatus.OFF
def get_last_log_lines(service: str, lines_count: int) -> List[str]:
if lines_count < 1:
raise ValueError("lines_count must be greater than 0")
try:
logs = subprocess.check_output(
[
"journalctl",
"-u",
service,
"-n",
str(lines_count),
"-o",
"cat",
],
shell=False,
).decode("utf-8")
return logs.splitlines()
except subprocess.CalledProcessError:
return []
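Usage is straightforward; the unit name below is only an example:

# Returns [] if journalctl exits with an error.
tail = get_last_log_lines("selfprivacy-api.service", lines_count=10)
for line in tail:
    print(line)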

2
setup.py Executable file → Normal file
View File

@ -2,7 +2,7 @@ from setuptools import setup, find_packages
setup(
name="selfprivacy_api",
version="3.0.1",
version="3.1.0",
packages=find_packages(),
scripts=[
"selfprivacy_api/app.py",

4
sync-nixpkgs.sh Executable file
View File

@ -0,0 +1,4 @@
#!/usr/bin/bash
# sync the version of nixpkgs used in the repo with one set in nixos-config
nix flake lock --override-input nixpkgs nixpkgs --inputs-from 'git+https://git.selfprivacy.org/SelfPrivacy/selfprivacy-nixos-config.git?ref=flakes'

View File

@ -2,21 +2,23 @@ import json
from datetime import datetime, timezone, timedelta
from mnemonic import Mnemonic
from selfprivacy_api.jobs import Job, JobStatus
# For expiration tests. If this causes headaches, consider freezegun.
RECOVERY_KEY_VALIDATION_DATETIME = "selfprivacy_api.models.tokens.time.datetime"
DEVICE_KEY_VALIDATION_DATETIME = RECOVERY_KEY_VALIDATION_DATETIME
def ten_minutes_into_future_naive():
return datetime.now() + timedelta(minutes=10)
def ten_hours_into_future_naive():
return datetime.now() + timedelta(hours=10)
def ten_minutes_into_future_naive_utc():
return datetime.utcnow() + timedelta(minutes=10)
def ten_hours_into_future_naive_utc():
return datetime.utcnow() + timedelta(hours=10)
def ten_minutes_into_future():
return datetime.now(timezone.utc) + timedelta(minutes=10)
def ten_hours_into_future():
return datetime.now(timezone.utc) + timedelta(hours=10)
def ten_minutes_into_past_naive():
@ -34,11 +36,11 @@ def ten_minutes_into_past():
class NearFuture(datetime):
@classmethod
def now(cls, tz=None):
return datetime.now(tz) + timedelta(minutes=13)
return datetime.now(tz) + timedelta(hours=13)
@classmethod
def utcnow(cls):
return datetime.utcnow() + timedelta(minutes=13)
return datetime.utcnow() + timedelta(hours=13)
def read_json(file_path):
@ -79,3 +81,12 @@ def assert_recovery_recent(time_generated: str):
assert datetime.fromisoformat(time_generated) - timedelta(seconds=5) < datetime.now(
timezone.utc
)
def assert_job_errored(job: Job):
assert job is not None
assert job.status == JobStatus.ERROR
# consider adding a useful error message to an errored-out job
assert job.error is not None
assert job.error != ""

View File

@ -4,6 +4,7 @@
import os
import pytest
import datetime
import subprocess
from os import path
from os import makedirs
@ -98,23 +99,14 @@ def generic_userdata(mocker, tmpdir):
@pytest.fixture
def huey_database(mocker, shared_datadir):
"""Mock huey database."""
mock = mocker.patch(
"selfprivacy_api.utils.huey.HUEY_DATABASE", shared_datadir / "huey.db"
)
return mock
@pytest.fixture
def client(huey_database, redis_repo_with_tokens):
def client(redis_repo_with_tokens):
from selfprivacy_api.app import app
return TestClient(app)
@pytest.fixture
def authorized_client(huey_database, redis_repo_with_tokens):
def authorized_client(redis_repo_with_tokens):
"""Authorized test client fixture."""
from selfprivacy_api.app import app
@ -126,7 +118,7 @@ def authorized_client(huey_database, redis_repo_with_tokens):
@pytest.fixture
def wrong_auth_client(huey_database, redis_repo_with_tokens):
def wrong_auth_client(redis_repo_with_tokens):
"""Wrong token test client fixture."""
from selfprivacy_api.app import app
@ -136,7 +128,19 @@ def wrong_auth_client(huey_database, redis_repo_with_tokens):
@pytest.fixture()
def raw_dummy_service(tmpdir):
def volume_folders(tmpdir, mocker):
volumes_dir = path.join(tmpdir, "volumes")
makedirs(volumes_dir)
volumenames = ["sda1", "sda2"]
for d in volumenames:
service_dir = path.join(volumes_dir, d)
makedirs(service_dir)
mock = mocker.patch("selfprivacy_api.services.owned_path.VOLUMES_PATH", volumes_dir)
@pytest.fixture()
def raw_dummy_service(tmpdir) -> DummyService:
dirnames = ["test_service", "also_test_service"]
service_dirs = []
for d in dirnames:
@ -161,11 +165,38 @@ def raw_dummy_service(tmpdir):
return service
def ensure_user_exists(user: str):
try:
output = subprocess.check_output(
["useradd", "-U", user], stderr=subprocess.PIPE, shell=False
)
except subprocess.CalledProcessError as error:
if b"already exists" not in error.stderr:
raise error
try:
output = subprocess.check_output(
["useradd", user], stderr=subprocess.PIPE, shell=False
)
except subprocess.CalledProcessError as error:
assert b"already exists" in error.stderr
return
raise ValueError("could not create user", user)
@pytest.fixture()
def dummy_service(
tmpdir, raw_dummy_service, generic_userdata
) -> Generator[Service, None, None]:
service = raw_dummy_service
user = service.get_user()
# TODO: use create_user from users actions. But it will need NixOS to be present
# and to react to our changes to files.
# from selfprivacy_api.actions.users import create_user
# create_user(user, "yay, it is me")
ensure_user_exists(user)
# register our service
services.services.append(service)

View File

@ -14,11 +14,11 @@ from selfprivacy_api.graphql.common_types.backup import (
from selfprivacy_api.backup import Backups, Snapshot
from selfprivacy_api.backup.tasks import (
prune_autobackup_snapshots,
automatic_backup,
do_autobackup,
)
from selfprivacy_api.backup.jobs import autobackup_job_type
from tests.test_backup import backups
from tests.test_backup import backups, assert_job_finished
from tests.test_graphql.test_services import only_dummy_service
@ -74,6 +74,7 @@ def test_autobackup_taskbody(backups, only_dummy_service):
backup_period = 13 # minutes
assert Backups.get_all_snapshots() == []
assert_job_finished(autobackup_job_type(), count=0)
Backups.set_autobackup_period_minutes(backup_period)
assert Backups.is_time_to_backup_service(dummy_service, now)
@ -88,6 +89,8 @@ def test_autobackup_taskbody(backups, only_dummy_service):
assert snapshots[0].service_name == dummy_service.get_id()
assert snapshots[0].reason == BackupReason.AUTO
assert_job_finished(autobackup_job_type(), count=1)
def test_autobackup_timer_periods(backups, dummy_service):
now = datetime.now(timezone.utc)

View File

@ -14,13 +14,14 @@ from selfprivacy_api.utils.huey import huey
from selfprivacy_api.services.service import ServiceStatus
from selfprivacy_api.graphql.queries.providers import BackupProvider
from selfprivacy_api.graphql.queries.providers import BackupProvider as ProviderEnum
from selfprivacy_api.graphql.common_types.backup import (
RestoreStrategy,
BackupReason,
)
from selfprivacy_api.graphql.queries.providers import BackupProvider
from selfprivacy_api.jobs import Jobs, JobStatus
from selfprivacy_api.jobs import Job, Jobs, JobStatus
from selfprivacy_api.models.backup.snapshot import Snapshot
@ -38,6 +39,10 @@ from selfprivacy_api.backup.tasks import (
reload_snapshot_cache,
)
from selfprivacy_api.backup.storage import Storage
from selfprivacy_api.backup.local_secret import LocalBackupSecret
from selfprivacy_api.backup.jobs import get_backup_fail
from tests.common import assert_job_errored
REPO_NAME = "test_backup"
@ -188,6 +193,78 @@ def test_backup_service(dummy_service, backups):
assert_job_finished(f"services.{id}.backup", count=1)
def all_job_text(job: Job) -> str:
# Use when we update to pydantic 2.xxx
# return Job.model_dump_json()
result = ""
if job.status_text is not None:
result += job.status_text
if job.description is not None:
result += job.description
if job.error is not None:
result += job.error
return result
def test_error_censoring_encryptionkey(dummy_service, backups):
# Discard our key to inject a failure
old_key = LocalBackupSecret.get()
LocalBackupSecret.reset()
new_key = LocalBackupSecret.get()
with pytest.raises(ValueError):
# Should fail without correct key
Backups.back_up(dummy_service)
job = get_backup_fail(dummy_service)
assert_job_errored(job)
job_text = all_job_text(job)
assert old_key not in job_text
assert new_key not in job_text
# local backups do not have login key
# assert Backups.provider().key not in job_text
assert "CENSORED" in job_text
def test_error_censoring_loginkey(dummy_service, backups, fp):
# We do not want to screw up our teardown
old_provider = Backups.provider()
secret = "aSecretNYA"
Backups.set_provider(
ProviderEnum.BACKBLAZE, login="meow", key=secret, location="moon"
)
assert Backups.provider().key == secret
# We could have called real Backblaze, but that would not be very privacy-friendly.
fp.allow_unregistered(True)
fp.register(
["restic", fp.any()],
returncode=1,
stdout="only real cats are allowed",
# We do not want to suddenly call real Backblaze even if the code changes
occurrences=100,
)
with pytest.raises(ValueError):
Backups.back_up(dummy_service)
job = get_backup_fail(dummy_service)
assert_job_errored(job)
job_text = all_job_text(job)
assert secret not in job_text
assert job_text.count("CENSORED") == 2
# We do not want to screw up our teardown
Storage.store_provider(old_provider)
def test_no_repo(memory_backup):
with pytest.raises(ValueError):
assert memory_backup.backupper.get_snapshots() == []

92
tests/test_binds.py Normal file
View File

@ -0,0 +1,92 @@
import pytest
from os import mkdir, rmdir
from os.path import join, exists
from tests.conftest import ensure_user_exists
from tests.test_graphql.test_services import mock_lsblk_devices
from selfprivacy_api.services.owned_path import Bind, BindError
from selfprivacy_api.utils.block_devices import BlockDevices
from selfprivacy_api.utils.waitloop import wait_until_true
BINDTESTS_USER = "binduser"
TESTFILE_CONTENTS = "testissimo"
TESTFILE_NAME = "testfile"
@pytest.fixture()
def bind_user():
ensure_user_exists(BINDTESTS_USER)
return BINDTESTS_USER
def prepare_test_bind(tmpdir, bind_user) -> Bind:
test_binding_name = "bindy_dir"
binding_path = join(tmpdir, test_binding_name)
drive = BlockDevices().get_block_device("sda2")
assert drive is not None
bind = Bind(
binding_path=binding_path, owner=bind_user, group=bind_user, drive=drive
)
source_dir = bind.location_at_volume()
mkdir(source_dir)
mkdir(binding_path)
testfile_path = join(source_dir, TESTFILE_NAME)
with open(testfile_path, "w") as file:
file.write(TESTFILE_CONTENTS)
return bind
def test_bind_unbind(volume_folders, tmpdir, bind_user, mock_lsblk_devices):
bind = prepare_test_bind(tmpdir, bind_user)
bind.ensure_ownership()
bind.validate()
testfile_path = join(bind.location_at_volume(), TESTFILE_NAME)
assert exists(testfile_path)
with open(testfile_path, "r") as file:
assert file.read() == TESTFILE_CONTENTS
bind.bind()
testfile_binding_path = join(bind.binding_path, TESTFILE_NAME)
assert exists(testfile_path)
with open(testfile_path, "r") as file:
assert file.read() == TESTFILE_CONTENTS
bind.unbind()
# wait_until_true(lambda : not exists(testfile_binding_path), timeout_sec=2)
assert not exists(testfile_binding_path)
assert exists(bind.binding_path)
def test_bind_nonexistent_target(volume_folders, tmpdir, bind_user, mock_lsblk_devices):
bind = prepare_test_bind(tmpdir, bind_user)
bind.ensure_ownership()
bind.validate()
rmdir(bind.binding_path)
with pytest.raises(BindError):
bind.bind()
def test_unbind_nonexistent_target(
volume_folders, tmpdir, bind_user, mock_lsblk_devices
):
bind = prepare_test_bind(tmpdir, bind_user)
bind.ensure_ownership()
bind.validate()
bind.bind()
bind.binding_path = "/bogus"
with pytest.raises(BindError):
bind.unbind()

View File

@ -410,6 +410,7 @@ def lsblk_full_mock(mocker):
mock = mocker.patch(
"subprocess.check_output", autospec=True, return_value=FULL_LSBLK_OUTPUT
)
BlockDevices().update()
return mock

View File

@ -14,9 +14,9 @@ from tests.common import (
)
# GraphQL API's output should be timezone-naive
from tests.common import ten_minutes_into_future_naive_utc as ten_minutes_into_future
from tests.common import ten_minutes_into_future as ten_minutes_into_future_tz
from tests.common import ten_minutes_into_past_naive_utc as ten_minutes_into_past
from tests.common import ten_hours_into_future_naive_utc as ten_hours_into_future
from tests.common import ten_hours_into_future as ten_hours_into_future_tz
from tests.common import ten_minutes_into_past_naive_utc as ten_hours_into_past
from tests.test_graphql.common import (
assert_empty,
@ -168,7 +168,7 @@ def test_graphql_generate_recovery_key(client, authorized_client):
@pytest.mark.parametrize(
"expiration_date", [ten_minutes_into_future(), ten_minutes_into_future_tz()]
"expiration_date", [ten_hours_into_future(), ten_hours_into_future_tz()]
)
def test_graphql_generate_recovery_key_with_expiration_date(
client, authorized_client, expiration_date: datetime
@ -193,7 +193,7 @@ def test_graphql_generate_recovery_key_with_expiration_date(
def test_graphql_use_recovery_key_after_expiration(client, authorized_client, mocker):
expiration_date = ten_minutes_into_future()
expiration_date = ten_hours_into_future()
key = graphql_make_new_recovery_key(authorized_client, expires_at=expiration_date)
# Timewarp to after it expires
@ -219,7 +219,7 @@ def test_graphql_use_recovery_key_after_expiration(client, authorized_client, mo
def test_graphql_generate_recovery_key_with_expiration_in_the_past(authorized_client):
expiration_date = ten_minutes_into_past()
expiration_date = ten_hours_into_past()
response = request_make_new_recovery_key(
authorized_client, expires_at=expiration_date
)

View File

@ -0,0 +1,268 @@
import pytest
class BlockDeviceMockReturnTrue:
"""Mock BlockDevices"""
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
def mount(self):
return True
def unmount(self):
return True
def resize(self):
return True
returncode = 0
class BlockDevicesMockReturnTrue:
def get_block_device(name: str): # type: ignore
return BlockDeviceMockReturnTrue()
def __new__(cls, *args, **kwargs):
pass
def __init__(self):
pass
class BlockDevicesMockReturnNone:
def get_block_device(name: str): # type: ignore
return None
def __new__(cls, *args, **kwargs):
pass
def __init__(self):
pass
@pytest.fixture
def mock_block_devices_return_true(mocker):
mock = mocker.patch(
"selfprivacy_api.graphql.mutations.storage_mutations.BlockDevices",
# "selfprivacy_api.utils.block_devices.BlockDevices",
autospec=True,
return_value=BlockDevicesMockReturnTrue,
)
return mock
@pytest.fixture
def mock_block_devices_return_none(mocker):
mock = mocker.patch(
"selfprivacy_api.utils.block_devices.BlockDevices",
autospec=True,
return_value=BlockDevicesMockReturnNone,
)
return mock
@pytest.fixture
def mock_block_device_return_true(mocker):
mock = mocker.patch(
"selfprivacy_api.utils.block_devices.BlockDevice",
autospec=True,
return_value=BlockDeviceMockReturnTrue,
)
return mock
API_RESIZE_VOLUME_MUTATION = """
mutation resizeVolume($name: String!) {
resizeVolume(name: $name) {
success
message
code
}
}
"""
def test_graphql_resize_volume_unauthorized_client(
client, mock_block_devices_return_true
):
response = client.post(
"/graphql",
json={
"query": API_RESIZE_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
def test_graphql_resize_volume_nonexistent_block_device(
authorized_client, mock_block_devices_return_none
):
response = authorized_client.post(
"/graphql",
json={
"query": API_RESIZE_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["resizeVolume"]["code"] == 404
assert response.json()["data"]["resizeVolume"]["message"] is not None
assert response.json()["data"]["resizeVolume"]["success"] is False
def test_graphql_resize_volume(authorized_client, mock_block_devices_return_true):
response = authorized_client.post(
"/graphql",
json={
"query": API_RESIZE_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["resizeVolume"]["code"] == 200
assert response.json()["data"]["resizeVolume"]["message"] is not None
assert response.json()["data"]["resizeVolume"]["success"] is True
API_MOUNT_VOLUME_MUTATION = """
mutation mountVolume($name: String!) {
mountVolume(name: $name) {
success
message
code
}
}
"""
def test_graphql_mount_volume_unauthorized_client(
client, mock_block_device_return_true
):
response = client.post(
"/graphql",
json={
"query": API_MOUNT_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
def test_graphql_mount_already_mounted_volume(
authorized_client, mock_block_devices_return_none
):
response = authorized_client.post(
"/graphql",
json={
"query": API_MOUNT_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["mountVolume"]["code"] == 404
assert response.json()["data"]["mountVolume"]["message"] is not None
assert response.json()["data"]["mountVolume"]["success"] is False
def test_graphql_mount_not_found_volume(
authorized_client, mock_block_devices_return_none
):
response = authorized_client.post(
"/graphql",
json={
"query": API_MOUNT_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["mountVolume"]["code"] == 404
assert response.json()["data"]["mountVolume"]["message"] is not None
assert response.json()["data"]["mountVolume"]["success"] is False
def test_graphql_mount_volume(authorized_client, mock_block_devices_return_true):
response = authorized_client.post(
"/graphql",
json={
"query": API_MOUNT_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["mountVolume"]["code"] == 200
assert response.json()["data"]["mountVolume"]["message"] is not None
assert response.json()["data"]["mountVolume"]["success"] is True
API_UNMOUNT_VOLUME_MUTATION = """
mutation unmountVolume($name: String!) {
unmountVolume(name: $name) {
success
message
code
}
}
"""
def test_graphql_unmount_volume_unauthorized_client(
client, mock_block_devices_return_true
):
response = client.post(
"/graphql",
json={
"query": API_UNMOUNT_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
def test_graphql_unmount_not_found_volume(
authorized_client, mock_block_devices_return_none
):
response = authorized_client.post(
"/graphql",
json={
"query": API_UNMOUNT_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["unmountVolume"]["code"] == 404
assert response.json()["data"]["unmountVolume"]["message"] is not None
assert response.json()["data"]["unmountVolume"]["success"] is False
def test_graphql_unmount_volume(authorized_client, mock_block_devices_return_true):
response = authorized_client.post(
"/graphql",
json={
"query": API_UNMOUNT_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["unmountVolume"]["code"] == 200
assert response.json()["data"]["unmountVolume"]["message"] is not None
assert response.json()["data"]["unmountVolume"]["success"] is True

View File

@ -0,0 +1,229 @@
# pylint: disable=redefined-outer-name
# pylint: disable=unused-argument
# pylint: disable=missing-function-docstring
import pytest
from selfprivacy_api.utils.huey import huey
from selfprivacy_api.jobs import JobStatus, Jobs
from tests.test_graphql.common import (
get_data,
assert_ok,
assert_empty,
)
from selfprivacy_api.jobs.nix_collect_garbage import (
get_dead_packages,
parse_line,
ShellException,
)
OUTPUT_PRINT_DEAD = """
finding garbage collector roots...
determining live/dead paths...
/nix/store/02k8pmw00p7p7mf2dg3n057771w7liia-python3.10-cchardet-2.1.7
/nix/store/03vc6dznx8njbvyd3gfhfa4n5j4lvhbl-python3.10-async-timeout-4.0.2
/nix/store/03ybv2dvfk7c3cpb527y5kzf6i35ch41-python3.10-pycparser-2.21
/nix/store/04dn9slfqwhqisn1j3jv531lms9w5wlj-python3.10-hypothesis-6.50.1.drv
/nix/store/04hhx2z1iyi3b48hxykiw1g03lp46jk7-python-remove-bin-bytecode-hook
"""
OUTPUT_COLLECT_GARBAGE = """
removing old generations of profile /nix/var/nix/profiles/per-user/def/channels
finding garbage collector roots...
deleting garbage...
deleting '/nix/store/02k8pmw00p7p7mf2dg3n057771w7liia-python3.10-cchardet-2.1.7'
deleting '/nix/store/03vc6dznx8njbvyd3gfhfa4n5j4lvhbl-python3.10-async-timeout-4.0.2'
deleting '/nix/store/03ybv2dvfk7c3cpb527y5kzf6i35ch41-python3.10-pycparser-2.21'
deleting '/nix/store/04dn9slfqwhqisn1j3jv531lms9w5wlj-python3.10-hypothesis-6.50.1.drv'
deleting '/nix/store/04hhx2z1iyi3b48hxykiw1g03lp46jk7-python-remove-bin-bytecode-hook'
deleting unused links...
note: currently hard linking saves -0.00 MiB
190 store paths deleted, 425.51 MiB freed
"""
OUTPUT_COLLECT_GARBAGE_ZERO_TRASH = """
removing old generations of profile /nix/var/nix/profiles/per-user/def/profile
removing old generations of profile /nix/var/nix/profiles/per-user/def/channels
finding garbage collector roots...
deleting garbage...
deleting unused links...
note: currently hard linking saves 0.00 MiB
0 store paths deleted, 0.00 MiB freed
"""
# ---
def test_parse_line():
txt = "note: currently hard linking saves -0.00 MiB 190 store paths deleted, 425.51 MiB freed"
job = Jobs.add(
name="name",
type_id="parse_line",
description="description",
)
output = parse_line(job, txt)
assert output.result == "425.51 MiB have been cleared"
assert output.status == JobStatus.FINISHED
assert output.error is None
def test_parse_line_with_blank_line():
txt = ""
job = Jobs.add(
name="name",
type_id="parse_line",
description="description",
)
with pytest.raises(ShellException):
output = parse_line(job, txt)
def test_get_dead_packages():
assert get_dead_packages(OUTPUT_PRINT_DEAD) == (5, 20.0)
def test_get_dead_packages_zero():
assert get_dead_packages("") == (0, 0)
RUN_NIX_COLLECT_GARBAGE_MUTATION = """
mutation CollectGarbage {
system {
nixCollectGarbage {
success
message
code
job {
uid,
typeId,
name,
description,
status,
statusText,
progress,
createdAt,
updatedAt,
finishedAt,
error,
result,
}
}
}
}
"""
def test_graphql_nix_collect_garbage(authorized_client, fp):
assert huey.immediate is True
fp.register(
["nix-env", "-p", "/nix/var/nix/profiles/system", "--delete-generations old"],
stdout="",
)
fp.register(["nix-store", "--gc", "--print-dead"], stdout=OUTPUT_PRINT_DEAD)
fp.register(["nix-store", "--gc"], stdout=OUTPUT_COLLECT_GARBAGE)
response = authorized_client.post(
"/graphql",
json={
"query": RUN_NIX_COLLECT_GARBAGE_MUTATION,
},
)
output = get_data(response)["system"]["nixCollectGarbage"]
assert_ok(output)
assert output["job"] is not None
assert output["job"]["status"] == "FINISHED"
assert output["job"]["error"] is None
assert (
fp.call_count(
[
"nix-env",
"-p",
"/nix/var/nix/profiles/system",
"--delete-generations old",
]
)
== 1
)
assert fp.call_count(["nix-store", "--gc", "--print-dead"]) == 1
assert fp.call_count(["nix-store", "--gc"]) == 1
def test_graphql_nix_collect_garbage_return_zero_trash(authorized_client, fp):
assert huey.immediate is True
fp.register(
["nix-env", "-p", "/nix/var/nix/profiles/system", "--delete-generations old"],
stdout="",
)
fp.register(["nix-store", "--gc", "--print-dead"], stdout=OUTPUT_PRINT_DEAD)
fp.register(["nix-store", "--gc"], stdout=OUTPUT_COLLECT_GARBAGE_ZERO_TRASH)
response = authorized_client.post(
"/graphql",
json={
"query": RUN_NIX_COLLECT_GARBAGE_MUTATION,
},
)
output = get_data(response)["system"]["nixCollectGarbage"]
assert_ok(output)
assert output["job"] is not None
assert output["job"]["status"] == "FINISHED"
assert output["job"]["error"] is None
assert (
fp.call_count(
[
"nix-env",
"-p",
"/nix/var/nix/profiles/system",
"--delete-generations old",
]
)
== 1
)
assert fp.call_count(["nix-store", "--gc", "--print-dead"]) == 1
assert fp.call_count(["nix-store", "--gc"]) == 1
def test_graphql_nix_collect_garbage_not_authorized_client(client, fp):
assert huey.immediate is True
fp.register(
["nix-env", "-p", "/nix/var/nix/profiles/system", "--delete-generations old"],
stdout="",
)
fp.register(["nix-store", "--gc", "--print-dead"], stdout=OUTPUT_PRINT_DEAD)
fp.register(["nix-store", "--gc"], stdout=OUTPUT_COLLECT_GARBAGE)
response = client.post(
"/graphql",
json={
"query": RUN_NIX_COLLECT_GARBAGE_MUTATION,
},
)
assert_empty(response)
assert (
fp.call_count(
[
"nix-env",
"-p",
"/nix/var/nix/profiles/system",
"--delete-generations old",
]
)
== 0
)
assert fp.call_count(["nix-store", "--gc", "--print-dead"]) == 0
assert fp.call_count(["nix-store", "--gc"]) == 0

View File

@ -1,5 +1,8 @@
import pytest
import shutil
from typing import Generator
from os import mkdir
from selfprivacy_api.utils.block_devices import BlockDevices
@ -10,6 +13,74 @@ from selfprivacy_api.services.test_service import DummyService
from tests.common import generate_service_query
from tests.test_graphql.common import assert_empty, assert_ok, get_data
from tests.test_graphql.test_system_nixos_tasks import prepare_nixos_rebuild_calls
LSBLK_BLOCKDEVICES_DICTS = [
{
"name": "sda1",
"path": "/dev/sda1",
"fsavail": "4614107136",
"fssize": "19814920192",
"fstype": "ext4",
"fsused": "14345314304",
"mountpoints": ["/nix/store", "/"],
"label": None,
"uuid": "ec80c004-baec-4a2c-851d-0e1807135511",
"size": 20210236928,
"model": None,
"serial": None,
"type": "part",
},
{
"name": "sda2",
"path": "/dev/sda2",
"fsavail": "4614107136",
"fssize": "19814920192",
"fstype": "ext4",
"fsused": "14345314304",
"mountpoints": ["/home"],
"label": None,
"uuid": "deadbeef-baec-4a2c-851d-0e1807135511",
"size": 20210236928,
"model": None,
"serial": None,
"type": "part",
},
]
@pytest.fixture()
def mock_lsblk_devices(mocker):
mock = mocker.patch(
"selfprivacy_api.utils.block_devices.BlockDevices.lsblk_device_dicts",
autospec=True,
return_value=LSBLK_BLOCKDEVICES_DICTS,
)
BlockDevices().update()
assert BlockDevices().lsblk_device_dicts() == LSBLK_BLOCKDEVICES_DICTS
devices = BlockDevices().get_block_devices()
assert len(devices) == 2
names = [device.name for device in devices]
assert "sda1" in names
assert "sda2" in names
return mock
@pytest.fixture()
def dummy_service_with_binds(dummy_service, mock_lsblk_devices, volume_folders):
binds = dummy_service.binds()
for bind in binds:
path = bind.binding_path
shutil.move(bind.binding_path, bind.location_at_volume())
mkdir(bind.binding_path)
bind.ensure_ownership()
bind.validate()
bind.bind()
return dummy_service
@pytest.fixture()
@ -23,6 +94,16 @@ def only_dummy_service(dummy_service) -> Generator[DummyService, None, None]:
service_module.services.extend(back_copy)
@pytest.fixture()
def mock_check_volume(mocker):
mock = mocker.patch(
"selfprivacy_api.services.service.check_volume",
autospec=True,
return_value=None,
)
return mock
API_START_MUTATION = """
mutation TestStartService($service_id: String!) {
services {
@ -465,23 +546,36 @@ def test_disable_enable(authorized_client, only_dummy_service):
def test_move_immovable(authorized_client, only_dummy_service):
dummy_service = only_dummy_service
dummy_service.set_movable(False)
mutation_response = api_move(authorized_client, dummy_service, "sda1")
root = BlockDevices().get_root_block_device()
mutation_response = api_move(authorized_client, dummy_service, root.name)
data = get_data(mutation_response)["services"]["moveService"]
assert_errorcode(data, 400)
try:
assert "not movable" in data["message"]
except AssertionError:
raise ValueError("wrong type of error?: ", data["message"])
# is there any meaning in returning the service here?
assert data["service"] is not None
assert data["job"] is None
def test_move_no_such_service(authorized_client, only_dummy_service):
mutation_response = api_move_by_name(authorized_client, "bogus_service", "sda1")
data = get_data(mutation_response)["services"]["moveService"]
assert_errorcode(data, 404)
assert data["service"] is None
assert data["job"] is None
def test_move_no_such_volume(authorized_client, only_dummy_service):
dummy_service = only_dummy_service
mutation_response = api_move(authorized_client, dummy_service, "bogus_volume")
data = get_data(mutation_response)["services"]["moveService"]
assert_notfound(data)
# is there any meaning in returning the service here?
assert data["service"] is not None
assert data["service"] is None
assert data["job"] is None
@ -499,7 +593,66 @@ def test_move_same_volume(authorized_client, dummy_service):
# is there any meaning in returning the service here?
assert data["service"] is not None
assert data["job"] is not None
# We do not create a job if the task is not created
assert data["job"] is None
def test_graphql_move_service_without_folders_on_old_volume(
authorized_client,
generic_userdata,
mock_lsblk_devices,
dummy_service: DummyService,
):
target = "sda1"
BlockDevices().update()
assert BlockDevices().get_block_device(target) is not None
dummy_service.set_simulated_moves(False)
dummy_service.set_drive("sda2")
mutation_response = api_move(authorized_client, dummy_service, target)
data = get_data(mutation_response)["services"]["moveService"]
assert_errorcode(data, 400)
assert "sda2/test_service is not found" in data["message"]
def test_graphql_move_service(
authorized_client, generic_userdata, mock_check_volume, dummy_service_with_binds, fp
):
dummy_service = dummy_service_with_binds
origin = "sda1"
target = "sda2"
assert BlockDevices().get_block_device(target) is not None
assert BlockDevices().get_block_device(origin) is not None
dummy_service.set_drive(origin)
dummy_service.set_simulated_moves(False)
unit_name = "sp-nixos-rebuild.service"
rebuild_command = ["systemctl", "start", unit_name]
prepare_nixos_rebuild_calls(fp, unit_name)
# We will be mounting and remounting folders
mount_command = ["mount", fp.any()]
unmount_command = ["umount", fp.any()]
fp.pass_command(mount_command, 2)
fp.pass_command(unmount_command, 2)
# We will be changing ownership
chown_command = ["chown", fp.any()]
fp.pass_command(chown_command, 2)
mutation_response = api_move(authorized_client, dummy_service, target)
data = get_data(mutation_response)["services"]["moveService"]
assert_ok(data)
assert data["service"] is not None
assert fp.call_count(rebuild_command) == 1
assert fp.call_count(mount_command) == 2
assert fp.call_count(unmount_command) == 2
assert fp.call_count(chown_command) == 2
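# Note on the counts above: moving a service is expected to trigger exactly
# one NixOS rebuild via systemd, while each bound folder is unmounted,
# remounted and chowned once -- the counts of 2 suggest the dummy service has
# two binds (an inference from the assertions, not stated explicitly here).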
def test_mailservice_cannot_enable_disable(authorized_client):

View File

@ -3,6 +3,9 @@
# pylint: disable=missing-function-docstring
import pytest
from selfprivacy_api.jobs import JobStatus, Jobs
from tests.test_graphql.common import assert_empty, assert_ok, get_data
class ProcessMock:
"""Mock subprocess.Popen"""
@ -37,6 +40,13 @@ def mock_subprocess_check_output(mocker):
return mock
@pytest.fixture
def mock_sleep_intervals(mocker):
mock_start = mocker.patch("selfprivacy_api.jobs.upgrade_system.START_INTERVAL", 0)
mock_run = mocker.patch("selfprivacy_api.jobs.upgrade_system.RUN_INTERVAL", 0)
return (mock_start, mock_run)
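# The fixture above zeroes the polling intervals so tests do not sleep for
# real. A sketch of the kind of wait loop this presumably speeds up (the loop
# body is an assumption, not the actual upgrade_system implementation):
import time
from typing import Callable

def wait_until(predicate: Callable[[], bool], interval: float) -> None:
    # Poll a predicate, sleeping `interval` seconds between checks;
    # mock_sleep_intervals patches the interval constants to 0.
    while not predicate():
        time.sleep(interval)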
API_REBUILD_SYSTEM_MUTATION = """
mutation rebuildSystem {
system {
@ -44,46 +54,14 @@ mutation rebuildSystem {
success
message
code
job {
uid
}
}
}
}
"""
def test_graphql_system_rebuild_unauthorized(client, mock_subprocess_popen):
"""Test system rebuild without authorization"""
response = client.post(
"/graphql",
json={
"query": API_REBUILD_SYSTEM_MUTATION,
},
)
assert response.status_code == 200
assert response.json().get("data") is None
assert mock_subprocess_popen.call_count == 0
def test_graphql_system_rebuild(authorized_client, mock_subprocess_popen):
"""Test system rebuild"""
response = authorized_client.post(
"/graphql",
json={
"query": API_REBUILD_SYSTEM_MUTATION,
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["system"]["runSystemRebuild"]["success"] is True
assert response.json()["data"]["system"]["runSystemRebuild"]["message"] is not None
assert response.json()["data"]["system"]["runSystemRebuild"]["code"] == 200
assert mock_subprocess_popen.call_count == 1
assert mock_subprocess_popen.call_args[0][0] == [
"systemctl",
"start",
"sp-nixos-rebuild.service",
]
API_UPGRADE_SYSTEM_MUTATION = """
mutation upgradeSystem {
system {
@ -91,44 +69,144 @@ mutation upgradeSystem {
success
message
code
job {
uid
}
}
}
}
"""
def test_graphql_system_upgrade_unauthorized(client, mock_subprocess_popen):
"""Test system upgrade without authorization"""
@pytest.mark.parametrize("action", ["rebuild", "upgrade"])
def test_graphql_system_rebuild_unauthorized(client, fp, action):
"""Test system rebuild without authorization"""
query = (
API_REBUILD_SYSTEM_MUTATION
if action == "rebuild"
else API_UPGRADE_SYSTEM_MUTATION
)
response = client.post(
"/graphql",
json={
"query": API_UPGRADE_SYSTEM_MUTATION,
"query": query,
},
)
assert response.status_code == 200
assert response.json().get("data") is None
assert mock_subprocess_popen.call_count == 0
assert_empty(response)
assert fp.call_count([fp.any()]) == 0
def test_graphql_system_upgrade(authorized_client, mock_subprocess_popen):
"""Test system upgrade"""
def prepare_nixos_rebuild_calls(fp, unit_name):
# Start the unit
fp.register(["systemctl", "start", unit_name])
# Wait for it to start
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=inactive")
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=inactive")
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=active")
# Check its execution
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=active")
fp.register(
["journalctl", "-u", unit_name, "-n", "1", "-o", "cat"],
stdout="Starting rebuild...",
)
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=active")
fp.register(
["journalctl", "-u", unit_name, "-n", "1", "-o", "cat"], stdout="Rebuilding..."
)
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=inactive")
@pytest.mark.parametrize("action", ["rebuild", "upgrade"])
def test_graphql_system_rebuild(authorized_client, fp, action, mock_sleep_intervals):
"""Test system rebuild"""
unit_name = f"sp-nixos-{action}.service"
query = (
API_REBUILD_SYSTEM_MUTATION
if action == "rebuild"
else API_UPGRADE_SYSTEM_MUTATION
)
prepare_nixos_rebuild_calls(fp, unit_name)
response = authorized_client.post(
"/graphql",
json={
"query": API_UPGRADE_SYSTEM_MUTATION,
"query": query,
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["system"]["runSystemUpgrade"]["success"] is True
assert response.json()["data"]["system"]["runSystemUpgrade"]["message"] is not None
assert response.json()["data"]["system"]["runSystemUpgrade"]["code"] == 200
assert mock_subprocess_popen.call_count == 1
assert mock_subprocess_popen.call_args[0][0] == [
"systemctl",
"start",
"sp-nixos-upgrade.service",
]
data = get_data(response)["system"][f"runSystem{action.capitalize()}"]
assert_ok(data)
assert fp.call_count(["systemctl", "start", unit_name]) == 1
assert fp.call_count(["systemctl", "show", unit_name]) == 6
job_id = response.json()["data"]["system"][f"runSystem{action.capitalize()}"][
"job"
]["uid"]
assert Jobs.get_job(job_id).status == JobStatus.FINISHED
assert Jobs.get_job(job_id).type_id == f"system.nixos.{action}"
@pytest.mark.parametrize("action", ["rebuild", "upgrade"])
def test_graphql_system_rebuild_failed(
authorized_client, fp, action, mock_sleep_intervals
):
"""Test system rebuild"""
unit_name = f"sp-nixos-{action}.service"
query = (
API_REBUILD_SYSTEM_MUTATION
if action == "rebuild"
else API_UPGRADE_SYSTEM_MUTATION
)
# Start the unit
fp.register(["systemctl", "start", unit_name])
# Wait for it to start
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=inactive")
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=inactive")
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=active")
# Check its execution
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=active")
fp.register(
["journalctl", "-u", unit_name, "-n", "1", "-o", "cat"],
stdout="Starting rebuild...",
)
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=active")
fp.register(
["journalctl", "-u", unit_name, "-n", "1", "-o", "cat"], stdout="Rebuilding..."
)
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=failed")
fp.register(
["journalctl", "-u", unit_name, "-n", "10", "-o", "cat"], stdout="Some error"
)
response = authorized_client.post(
"/graphql",
json={
"query": query,
},
)
data = get_data(response)["system"][f"runSystem{action.capitalize()}"]
assert_ok(data)
assert fp.call_count(["systemctl", "start", unit_name]) == 1
assert fp.call_count(["systemctl", "show", unit_name]) == 6
job_id = response.json()["data"]["system"][f"runSystem{action.capitalize()}"][
"job"
]["uid"]
assert Jobs.get_job(job_id).status == JobStatus.ERROR
assert Jobs.get_job(job_id).type_id == f"system.nixos.{action}"
API_ROLLBACK_SYSTEM_MUTATION = """

tests/test_huey.py Normal file
View File

@ -0,0 +1,132 @@
import pytest
import redis
from typing import List
import subprocess
from subprocess import Popen, check_output, TimeoutExpired
from os import environ, path, set_blocking
from io import BufferedReader
from huey.exceptions import HueyException
from selfprivacy_api.utils.huey import huey, immediate, HUEY_DATABASE_NUMBER
from selfprivacy_api.utils.redis_pool import RedisPool, REDIS_SOCKET
@huey.task()
def sum(a: int, b: int) -> int:
return a + b
def reset_huey_storage():
huey.storage = huey.create_storage()
def flush_huey_redis_forcefully():
url = RedisPool.connection_url(HUEY_DATABASE_NUMBER)
pool = redis.ConnectionPool.from_url(url, decode_responses=True)
connection = redis.Redis(connection_pool=pool)
connection.flushdb()
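# For context: RedisPool.connection_url(HUEY_DATABASE_NUMBER) is expected to
# yield a unix-socket URL pointing at REDIS_SOCKET (the exact URL format is
# an assumption), so flushdb() above clears the same database the huey
# consumer connects to.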
# TODO: may be useful elsewhere too; move to utils or tests/common if it gets reused
def read_all_ready_output(stream: BufferedReader) -> str:
set_blocking(stream.fileno(), False)
output: List[bytes] = []
while True:
line = stream.readline()
if line == b"":
break
else:
output.append(line)
set_blocking(stream.fileno(), True)
result = b"".join(output)
return result.decode("utf-8")
@pytest.fixture()
def not_immediate():
assert environ["TEST_MODE"] == "true"
old_immediate = huey.immediate
environ["HUEY_QUEUES_FOR_TESTS"] = "Yes"
huey.immediate = False
assert huey.immediate is False
yield
del environ["HUEY_QUEUES_FOR_TESTS"]
huey.immediate = old_immediate
assert huey.immediate == old_immediate
@pytest.fixture()
def huey_socket_consumer(not_immediate):
"""
Same as above, but with socketed redis
"""
flush_huey_redis_forcefully()
command = ["huey_consumer.py", "selfprivacy_api.task_registry.huey"]
# First assert that consumer does not fail by itself
# Not sure yet how to do this more elegantly
try:
check_output(command, timeout=2)
except TimeoutExpired:
pass
# Then open it for real
consumer_handle = Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
assert path.exists(REDIS_SOCKET)
yield consumer_handle
consumer_handle.kill()
def test_huey_over_redis_socket(huey_socket_consumer):
assert huey.immediate is False
assert immediate() is False
assert "unix" in RedisPool.connection_url(HUEY_DATABASE_NUMBER)
try:
assert (
RedisPool.connection_url(HUEY_DATABASE_NUMBER)
in huey.storage_kwargs.values()
)
except AssertionError:
raise ValueError(
"our test-side huey does not connect over socket: ", huey.storage_kwargs
)
result = sum(2, 5)
try:
assert result(blocking=True, timeout=100) == 7
except HueyException as error:
if "timed out" in str(error):
output = read_all_ready_output(huey_socket_consumer.stdout)
errorstream = read_all_ready_output(huey_socket_consumer.stderr)
raise TimeoutError(
f"Huey timed out: {str(error)}",
f"Consumer output: {output}",
f"Consumer errorstream: {errorstream}",
)
else:
raise error
@pytest.mark.xfail(reason="cannot yet schedule with sockets for some reason")
def test_huey_schedule(huey_queues_socket):
# We do not schedule tasks anywhere, but it is concerning that this fails.
sum.schedule((2, 5), delay=10)
try:
assert len(huey.scheduled()) == 1
except AssertionError:
raise ValueError("have wrong amount of scheduled tasks", huey.scheduled())

View File

@ -8,6 +8,19 @@ import pytest
from selfprivacy_api.utils.network import get_ip4, get_ip6
OUTPUT_STRING = b"""
2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc fq_codel state UP group default qlen 1000
link/ether 96:00:00:f1:34:ae brd ff:ff:ff:ff:ff:ff
altname enp0s3
altname ens3
inet 157.90.247.192/32 brd 157.90.247.192 scope global dynamic eth0
valid_lft 46061sec preferred_lft 35261sec
inet6 fe80::9400:ff:fef1:34ae/64 scope link
valid_lft forever preferred_lft forever
inet6 2a01:4f8:c17:7e3d::2/64 scope global
valid_lft forever preferred_lft forever
"""
OUTPUT_STRING_WITOUT_IP6 = b"""
2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc fq_codel state UP group default qlen 1000
link/ether 96:00:00:f1:34:ae brd ff:ff:ff:ff:ff:ff
altname enp0s3
@ -31,6 +44,14 @@ def ip_process_mock(mocker):
return mock
@pytest.fixture
def ip_process_mock_without_ip6(mocker):
mock = mocker.patch(
"subprocess.check_output", autospec=True, return_value=OUTPUT_STRING_WITOUT_IP6
)
return mock
@pytest.fixture
def failed_ip_process_mock(mocker):
mock = mocker.patch(
@ -62,24 +83,29 @@ def test_get_ip4(ip_process_mock):
def test_get_ip6(ip_process_mock):
"""Test get IPv6 address"""
ip6 = get_ip6()
assert ip6 == "fe80::9400:ff:fef1:34ae"
assert ip6 == "2a01:4f8:c17:7e3d::2"
def test_failed_get_ip4(failed_ip_process_mock):
ip4 = get_ip4()
assert ip4 is ""
assert ip4 == ""
def test_failed_get_ip6(failed_ip_process_mock):
ip6 = get_ip6()
assert ip6 is ""
assert ip6 is None
def test_failed_get_ip6_when_none(ip_process_mock_without_ip6):
ip6 = get_ip6()
assert ip6 is None
def test_failed_subprocess_get_ip4(failed_subprocess_call):
ip4 = get_ip4()
assert ip4 is ""
assert ip4 == ""
def test_failed_subprocess_get_ip6(failed_subprocess_call):
ip6 = get_ip6()
assert ip6 is ""
assert ip6 is None
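# Note on the changed expectations above: get_ip6 now prefers the
# global-scope address over the link-local fe80:: one, and returns None
# rather than "" when no global IPv6 address is present (presumably by
# matching only "inet6 ... scope global" lines in the `ip` output -- the
# parsing detail is an assumption).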

View File

@ -24,7 +24,7 @@ from selfprivacy_api.repositories.tokens.abstract_tokens_repository import (
AbstractTokensRepository,
)
from tests.common import ten_minutes_into_past, ten_minutes_into_future
from tests.common import ten_minutes_into_past, ten_hours_into_future
ORIGINAL_DEVICE_NAMES = [

View File

@ -13,7 +13,6 @@ from selfprivacy_api.services.bitwarden import Bitwarden
from selfprivacy_api.services.pleroma import Pleroma
from selfprivacy_api.services.mailserver import MailServer
from selfprivacy_api.services.owned_path import OwnedPath
from selfprivacy_api.services.generic_service_mover import FolderMoveNames
from selfprivacy_api.services.test_service import DummyService
from selfprivacy_api.services.service import Service, ServiceStatus, StoppedService
@ -81,21 +80,6 @@ def test_paths_from_owned_paths():
]
def test_foldermoves_from_ownedpaths():
owned = OwnedPath(
path="var/lib/bitwarden",
group="vaultwarden",
owner="vaultwarden",
)
assert FolderMoveNames.from_owned_path(owned) == FolderMoveNames(
name="bitwarden",
bind_location="var/lib/bitwarden",
group="vaultwarden",
owner="vaultwarden",
)
def test_enabling_disabling_reads_json(dummy_service: DummyService):
with WriteUserData() as data:
data["modules"][dummy_service.get_id()]["enable"] = False
@ -168,13 +152,14 @@ def test_enabling_disabling_writes_json(
# more detailed testing of this is in test_graphql/test_system.py
# Using the same random global IPs as the test_network_utils
def test_mailserver_with_dkim_returns_some_dns(dkim_file):
records = MailServer().get_dns_records()
records = MailServer().get_dns_records("157.90.247.192", "2a01:4f8:c17:7e3d::2")
assert len(records) > 0
def test_mailserver_with_no_dkim_returns_no_dns(no_dkim_file):
assert MailServer().get_dns_records() == []
assert MailServer().get_dns_records("157.90.247.192", "2a01:4f8:c17:7e3d::2") == []
def test_services_enabled_by_default(generic_userdata):