Merge pull request 'API 1.1.0 relaese' (#6) from system-configuration into master

Reviewed-on: https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api/pulls/6
This commit is contained in:
inex 2021-12-09 15:24:25 +02:00
commit 0d92be8fb7
53 changed files with 3610 additions and 391 deletions

12
.drone.yml Normal file
View file

@ -0,0 +1,12 @@
kind: pipeline
type: exec
name: default
platform:
os: linux
arch: amd64
steps:
- name: test
commands:
- pytest

View file

@ -6,3 +6,11 @@ setuptools
portalocker
flask-swagger
flask-swagger-ui
pytz
huey
gevent
pytest
coverage
pytest-mock
pytest-datadir

View file

@ -1,6 +1,9 @@
#!/usr/bin/env python3
"""SelfPrivacy server management API"""
import os
from gevent import monkey
from flask import Flask, request, jsonify
from flask_restful import Api
from flask_swagger import swagger
@ -11,18 +14,26 @@ from selfprivacy_api.resources.common import ApiVersion, DecryptDisk
from selfprivacy_api.resources.system import api_system
from selfprivacy_api.resources.services import services as api_services
from selfprivacy_api.restic_controller.tasks import huey, init_restic
swagger_blueprint = get_swaggerui_blueprint(
"/api/docs", "/api/swagger.json", config={"app_name": "SelfPrivacy API"}
)
def create_app():
def create_app(test_config=None):
"""Initiate Flask app and bind routes"""
app = Flask(__name__)
api = Api(app)
app.config["AUTH_TOKEN"] = os.environ.get("AUTH_TOKEN")
app.config["ENABLE_SWAGGER"] = os.environ.get("ENABLE_SWAGGER", "0")
if test_config is None:
app.config["AUTH_TOKEN"] = os.environ.get("AUTH_TOKEN")
if app.config["AUTH_TOKEN"] is None:
raise ValueError("AUTH_TOKEN is not set")
app.config["ENABLE_SWAGGER"] = os.environ.get("ENABLE_SWAGGER", "0")
app.config["B2_BUCKET"] = os.environ.get("B2_BUCKET")
else:
app.config.update(test_config)
# Check bearer token
@app.before_request
@ -49,7 +60,7 @@ def create_app():
def spec():
if app.config["ENABLE_SWAGGER"] == "1":
swag = swagger(app)
swag["info"]["version"] = "1.0.0"
swag["info"]["version"] = "1.1.0"
swag["info"]["title"] = "SelfPrivacy API"
swag["info"]["description"] = "SelfPrivacy API"
swag["securityDefinitions"] = {
@ -71,5 +82,8 @@ def create_app():
if __name__ == "__main__":
monkey.patch_all()
created_app = create_app()
huey.start()
init_restic()
created_app.run(port=5050, debug=False)

View file

@ -24,7 +24,7 @@ class ApiVersion(Resource):
401:
description: Unauthorized
"""
return {"version": "1.0.0"}
return {"version": "1.1.0"}
class DecryptDisk(Resource):

View file

@ -1,10 +1,9 @@
#!/usr/bin/env python3
"""Bitwarden management module"""
import json
import portalocker
from flask_restful import Resource
from selfprivacy_api.resources.services import api
from selfprivacy_api.utils import WriteUserData
class EnableBitwarden(Resource):
@ -24,20 +23,10 @@ class EnableBitwarden(Resource):
401:
description: Unauthorized
"""
with open(
"/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8"
) as userdata_file:
portalocker.lock(userdata_file, portalocker.LOCK_EX)
try:
data = json.load(userdata_file)
if "bitwarden" not in data:
data["bitwarden"] = {}
data["bitwarden"]["enable"] = True
userdata_file.seek(0)
json.dump(data, userdata_file, indent=4)
userdata_file.truncate()
finally:
portalocker.unlock(userdata_file)
with WriteUserData() as data:
if "bitwarden" not in data:
data["bitwarden"] = {}
data["bitwarden"]["enable"] = True
return {
"status": 0,
@ -62,20 +51,10 @@ class DisableBitwarden(Resource):
401:
description: Unauthorized
"""
with open(
"/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8"
) as userdata_file:
portalocker.lock(userdata_file, portalocker.LOCK_EX)
try:
data = json.load(userdata_file)
if "bitwarden" not in data:
data["bitwarden"] = {}
data["bitwarden"]["enable"] = False
userdata_file.seek(0)
json.dump(data, userdata_file, indent=4)
userdata_file.truncate()
finally:
portalocker.unlock(userdata_file)
with WriteUserData() as data:
if "bitwarden" not in data:
data["bitwarden"] = {}
data["bitwarden"]["enable"] = False
return {
"status": 0,

View file

@ -1,10 +1,9 @@
#!/usr/bin/env python3
"""Gitea management module"""
import json
import portalocker
from flask_restful import Resource
from selfprivacy_api.resources.services import api
from selfprivacy_api.utils import WriteUserData
class EnableGitea(Resource):
@ -24,20 +23,10 @@ class EnableGitea(Resource):
401:
description: Unauthorized
"""
with open(
"/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8"
) as userdata_file:
portalocker.lock(userdata_file, portalocker.LOCK_EX)
try:
data = json.load(userdata_file)
if "gitea" not in data:
data["gitea"] = {}
data["gitea"]["enable"] = True
userdata_file.seek(0)
json.dump(data, userdata_file, indent=4)
userdata_file.truncate()
finally:
portalocker.unlock(userdata_file)
with WriteUserData() as data:
if "gitea" not in data:
data["gitea"] = {}
data["gitea"]["enable"] = True
return {
"status": 0,
@ -62,20 +51,10 @@ class DisableGitea(Resource):
401:
description: Unauthorized
"""
with open(
"/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8"
) as userdata_file:
portalocker.lock(userdata_file, portalocker.LOCK_EX)
try:
data = json.load(userdata_file)
if "gitea" not in data:
data["gitea"] = {}
data["gitea"]["enable"] = False
userdata_file.seek(0)
json.dump(data, userdata_file, indent=4)
userdata_file.truncate()
finally:
portalocker.unlock(userdata_file)
with WriteUserData() as data:
if "gitea" not in data:
data["gitea"] = {}
data["gitea"]["enable"] = False
return {
"status": 0,

View file

@ -2,6 +2,7 @@
"""Mail server management module"""
import base64
import subprocess
import os
from flask_restful import Resource
from selfprivacy_api.resources.services import api
@ -25,15 +26,20 @@ class DKIMKey(Resource):
description: DKIM key encoded in base64
401:
description: Unauthorized
404:
description: DKIM key not found
"""
domain = get_domain()
cat_process = subprocess.Popen(
["cat", "/var/dkim/" + domain + ".selector.txt"], stdout=subprocess.PIPE
)
dkim = cat_process.communicate()[0]
dkim = base64.b64encode(dkim)
dkim = str(dkim, "utf-8")
return dkim
if os.path.exists("/var/dkim/" + domain + ".selector.txt"):
cat_process = subprocess.Popen(
["cat", "/var/dkim/" + domain + ".selector.txt"], stdout=subprocess.PIPE
)
dkim = cat_process.communicate()[0]
dkim = base64.b64encode(dkim)
dkim = str(dkim, "utf-8")
return dkim
return "DKIM file not found", 404
api.add_resource(DKIMKey, "/mailserver/dkim")

View file

@ -1,10 +1,9 @@
#!/usr/bin/env python3
"""Nextcloud management module"""
import json
import portalocker
from flask_restful import Resource
from selfprivacy_api.resources.services import api
from selfprivacy_api.utils import WriteUserData
class EnableNextcloud(Resource):
@ -24,20 +23,10 @@ class EnableNextcloud(Resource):
401:
description: Unauthorized
"""
with open(
"/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8"
) as userdata_file:
portalocker.lock(userdata_file, portalocker.LOCK_EX)
try:
data = json.load(userdata_file)
if "nextcloud" not in data:
data["nextcloud"] = {}
data["nextcloud"]["enable"] = True
userdata_file.seek(0)
json.dump(data, userdata_file, indent=4)
userdata_file.truncate()
finally:
portalocker.unlock(userdata_file)
with WriteUserData() as data:
if "nextcloud" not in data:
data["nextcloud"] = {}
data["nextcloud"]["enable"] = True
return {
"status": 0,
@ -62,20 +51,10 @@ class DisableNextcloud(Resource):
401:
description: Unauthorized
"""
with open(
"/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8"
) as userdata_file:
portalocker.lock(userdata_file, portalocker.LOCK_EX)
try:
data = json.load(userdata_file)
if "nextcloud" not in data:
data["nextcloud"] = {}
data["nextcloud"]["enable"] = False
userdata_file.seek(0)
json.dump(data, userdata_file, indent=4)
userdata_file.truncate()
finally:
portalocker.unlock(userdata_file)
with WriteUserData() as data:
if "nextcloud" not in data:
data["nextcloud"] = {}
data["nextcloud"]["enable"] = False
return {
"status": 0,

View file

@ -1,10 +1,9 @@
#!/usr/bin/env python3
"""OpenConnect VPN server management module"""
import json
import portalocker
from flask_restful import Resource
from selfprivacy_api.resources.services import api
from selfprivacy_api.utils import WriteUserData
class EnableOcserv(Resource):
@ -24,20 +23,10 @@ class EnableOcserv(Resource):
401:
description: Unauthorized
"""
with open(
"/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8"
) as userdata_file:
portalocker.lock(userdata_file, portalocker.LOCK_EX)
try:
data = json.load(userdata_file)
if "ocserv" not in data:
data["ocserv"] = {}
data["ocserv"]["enable"] = True
userdata_file.seek(0)
json.dump(data, userdata_file, indent=4)
userdata_file.truncate()
finally:
portalocker.unlock(userdata_file)
with WriteUserData() as data:
if "ocserv" not in data:
data["ocserv"] = {}
data["ocserv"]["enable"] = True
return {
"status": 0,
@ -62,20 +51,10 @@ class DisableOcserv(Resource):
401:
description: Unauthorized
"""
with open(
"/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8"
) as userdata_file:
portalocker.lock(userdata_file, portalocker.LOCK_EX)
try:
data = json.load(userdata_file)
if "ocserv" not in data:
data["ocserv"] = {}
data["ocserv"]["enable"] = False
userdata_file.seek(0)
json.dump(data, userdata_file, indent=4)
userdata_file.truncate()
finally:
portalocker.unlock(userdata_file)
with WriteUserData() as data:
if "ocserv" not in data:
data["ocserv"] = {}
data["ocserv"]["enable"] = False
return {
"status": 0,

View file

@ -1,10 +1,9 @@
#!/usr/bin/env python3
"""Pleroma management module"""
import json
import portalocker
from flask_restful import Resource
from selfprivacy_api.resources.services import api
from selfprivacy_api.utils import WriteUserData
class EnablePleroma(Resource):
@ -24,20 +23,10 @@ class EnablePleroma(Resource):
401:
description: Unauthorized
"""
with open(
"/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8"
) as userdata_file:
portalocker.lock(userdata_file, portalocker.LOCK_EX)
try:
data = json.load(userdata_file)
if "pleroma" not in data:
data["pleroma"] = {}
data["pleroma"]["enable"] = True
userdata_file.seek(0)
json.dump(data, userdata_file, indent=4)
userdata_file.truncate()
finally:
portalocker.unlock(userdata_file)
with WriteUserData() as data:
if "pleroma" not in data:
data["pleroma"] = {}
data["pleroma"]["enable"] = True
return {
"status": 0,
@ -62,20 +51,10 @@ class DisablePleroma(Resource):
401:
description: Unauthorized
"""
with open(
"/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8"
) as userdata_file:
portalocker.lock(userdata_file, portalocker.LOCK_EX)
try:
data = json.load(userdata_file)
if "pleroma" not in data:
data["pleroma"] = {}
data["pleroma"]["enable"] = False
userdata_file.seek(0)
json.dump(data, userdata_file, indent=4)
userdata_file.truncate()
finally:
portalocker.unlock(userdata_file)
with WriteUserData() as data:
if "pleroma" not in data:
data["pleroma"] = {}
data["pleroma"]["enable"] = False
return {
"status": 0,

View file

@ -1,11 +1,11 @@
#!/usr/bin/env python3
"""Backups management module"""
import json
import subprocess
from flask import request
from flask_restful import Resource
from flask_restful import Resource, reqparse
from selfprivacy_api.resources.services import api
from selfprivacy_api.utils import WriteUserData
from selfprivacy_api.restic_controller import tasks as restic_tasks
from selfprivacy_api.restic_controller import ResticController, ResticStates
class ListAllBackups(Resource):
@ -27,25 +27,9 @@ class ListAllBackups(Resource):
401:
description: Unauthorized
"""
repository_name = request.headers.get("X-Repository-Name")
backup_listing_command = [
"restic",
"-r",
f"rclone:backblaze:{repository_name}:/sfbackup",
"snapshots",
"--json",
]
with subprocess.Popen(
backup_listing_command,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
) as backup_listing_process_descriptor:
snapshots_list = backup_listing_process_descriptor.communicate()[0]
return snapshots_list.decode("utf-8")
restic = ResticController()
return restic.snapshot_list
class AsyncCreateBackup(Resource):
@ -66,24 +50,17 @@ class AsyncCreateBackup(Resource):
description: Bad request
401:
description: Unauthorized
409:
description: Backup already in progress
"""
repository_name = request.headers.get("X-Repository-Name")
backup_command = [
"restic",
"-r",
f"rclone:backblaze:{repository_name}:/sfbackup",
"--verbose",
"--json",
"backup",
"/var",
]
with open("/tmp/backup.log", "w", encoding="utf-8") as log_file:
subprocess.Popen(
backup_command, shell=False, stdout=log_file, stderr=subprocess.STDOUT
)
restic = ResticController()
if restic.state is ResticStates.NO_KEY:
return {"error": "No key provided"}, 400
if restic.state is ResticStates.INITIALIZING:
return {"error": "Backup is initializing"}, 400
if restic.state is ResticStates.BACKING_UP:
return {"error": "Backup is already running"}, 409
restic_tasks.start_backup()
return {
"status": 0,
"message": "Backup creation has started",
@ -109,23 +86,39 @@ class CheckBackupStatus(Resource):
401:
description: Unauthorized
"""
backup_status_check_command = ["tail", "-1", "/tmp/backup.log"]
restic = ResticController()
with subprocess.Popen(
backup_status_check_command,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
) as backup_status_check_process_descriptor:
backup_process_status = (
backup_status_check_process_descriptor.communicate()[0].decode("utf-8")
)
return {
"status": restic.state.name,
"progress": restic.progress,
"error_message": restic.error_message,
}
try:
json.loads(backup_process_status)
except ValueError:
return {"message": backup_process_status}
return backup_process_status
class ForceReloadSnapshots(Resource):
"""Force reload snapshots"""
def get(self):
"""
Force reload snapshots
---
tags:
- Backups
security:
- bearerAuth: []
responses:
200:
description: Snapshots reloaded
400:
description: Bad request
401:
description: Unauthorized
"""
restic_tasks.load_snapshots()
return {
"status": 0,
"message": "Snapshots reload started",
}
class AsyncRestoreBackup(Resource):
@ -139,6 +132,18 @@ class AsyncRestoreBackup(Resource):
- Backups
security:
- bearerAuth: []
parameters:
- in: body
required: true
name: backup
description: Backup to restore
schema:
type: object
required:
- backupId
properties:
backupId:
type: string
responses:
200:
description: Backup restoration process started
@ -147,26 +152,88 @@ class AsyncRestoreBackup(Resource):
401:
description: Unauthorized
"""
backup_restoration_command = ["restic", "-r", "rclone:backblaze:sfbackup", "var", "--json"]
parser = reqparse.RequestParser()
parser.add_argument("backupId", type=str, required=True)
args = parser.parse_args()
with open("/tmp/backup.log", "w", encoding="utf-8") as backup_log_file_descriptor:
with subprocess.Popen(
backup_restoration_command,
shell=False,
stdout=subprocess.PIPE,
stderr=backup_log_file_descriptor,
) as backup_restoration_process_descriptor:
backup_restoration_status = (
"Backup restoration procedure started"
)
return {
"status": 0,
"message": backup_restoration_status
}
restic = ResticController()
if restic.state is ResticStates.NO_KEY:
return {"error": "No key provided"}, 400
if restic.state is ResticStates.NOT_INITIALIZED:
return {"error": "Repository is not initialized"}, 400
if restic.state is ResticStates.BACKING_UP:
return {"error": "Backup is already running"}, 409
if restic.state is ResticStates.INITIALIZING:
return {"error": "Repository is initializing"}, 400
if restic.state is ResticStates.RESTORING:
return {"error": "Restore is already running"}, 409
for backup in restic.snapshot_list:
if backup["short_id"] == args["backupId"]:
restic_tasks.restore_from_backup(args["backupId"])
return {
"status": 0,
"message": "Backup restoration procedure started",
}
return {"error": "Backup not found"}, 404
class BackblazeConfig(Resource):
"""Backblaze config"""
def put(self):
"""
Set the new key for backblaze
---
tags:
- Backups
security:
- bearerAuth: []
parameters:
- in: body
required: true
name: backblazeSettings
description: New Backblaze settings
schema:
type: object
required:
- accountId
- accountKey
- bucket
properties:
accountId:
type: string
accountKey:
type: string
bucket:
type: string
responses:
200:
description: New Backblaze settings
400:
description: Bad request
401:
description: Unauthorized
"""
parser = reqparse.RequestParser()
parser.add_argument("accountId", type=str, required=True)
parser.add_argument("accountKey", type=str, required=True)
parser.add_argument("bucket", type=str, required=True)
args = parser.parse_args()
with WriteUserData() as data:
data["backblaze"]["accountId"] = args["accountId"]
data["backblaze"]["accountKey"] = args["accountKey"]
data["backblaze"]["bucket"] = args["bucket"]
restic_tasks.update_keys_from_userdata()
return "New Backblaze settings saved"
api.add_resource(ListAllBackups, "/restic/backup/list")
api.add_resource(AsyncCreateBackup, "/restic/backup/create")
api.add_resource(CheckBackupStatus, "/restic/backup/status")
api.add_resource(AsyncRestoreBackup, "/restic/backup/restore")
api.add_resource(BackblazeConfig, "/restic/backblaze/config")
api.add_resource(ForceReloadSnapshots, "/restic/backup/reload")

View file

@ -1,10 +1,9 @@
#!/usr/bin/env python3
"""SSH management module"""
import json
import portalocker
from flask_restful import Resource, reqparse
from selfprivacy_api.resources.services import api
from selfprivacy_api.utils import WriteUserData, ReadUserData, validate_ssh_public_key
class EnableSSH(Resource):
@ -24,20 +23,10 @@ class EnableSSH(Resource):
401:
description: Unauthorized
"""
with open(
"/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8"
) as userdata_file:
portalocker.lock(userdata_file, portalocker.LOCK_EX)
try:
data = json.load(userdata_file)
if "ssh" not in data:
data["ssh"] = {}
data["ssh"]["enable"] = True
userdata_file.seek(0)
json.dump(data, userdata_file, indent=4)
userdata_file.truncate()
finally:
portalocker.unlock(userdata_file)
with WriteUserData() as data:
if "ssh" not in data:
data["ssh"] = {}
data["ssh"]["enable"] = True
return {
"status": 0,
@ -45,6 +34,82 @@ class EnableSSH(Resource):
}
class SSHSettings(Resource):
"""Enable/disable SSH"""
def get(self):
"""
Get current SSH settings
---
tags:
- SSH
security:
- bearerAuth: []
responses:
200:
description: SSH settings
400:
description: Bad request
"""
with ReadUserData() as data:
if "ssh" not in data:
return {"enable": True, "passwordAuthentication": True}
if "enable" not in data["ssh"]:
data["ssh"]["enable"] = True
if "passwordAuthentication" not in data["ssh"]:
data["ssh"]["passwordAuthentication"] = True
return {
"enable": data["ssh"]["enable"],
"passwordAuthentication": data["ssh"]["passwordAuthentication"],
}
def put(self):
"""
Change SSH settings
---
tags:
- SSH
security:
- bearerAuth: []
parameters:
- name: sshSettings
in: body
required: true
description: SSH settings
schema:
type: object
required:
- enable
- passwordAuthentication
properties:
enable:
type: boolean
passwordAuthentication:
type: boolean
responses:
200:
description: New settings saved
400:
description: Bad request
"""
parser = reqparse.RequestParser()
parser.add_argument("enable", type=bool, required=False)
parser.add_argument("passwordAuthentication", type=bool, required=False)
args = parser.parse_args()
enable = args["enable"]
password_authentication = args["passwordAuthentication"]
with WriteUserData() as data:
if "ssh" not in data:
data["ssh"] = {}
if enable is not None:
data["ssh"]["enable"] = enable
if password_authentication is not None:
data["ssh"]["passwordAuthentication"] = password_authentication
return "SSH settings changed"
class WriteSSHKey(Resource):
"""Write new SSH key"""
@ -89,28 +154,23 @@ class WriteSSHKey(Resource):
public_key = args["public_key"]
with open(
"/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8"
) as userdata_file:
portalocker.lock(userdata_file, portalocker.LOCK_EX)
try:
data = json.load(userdata_file)
if "ssh" not in data:
data["ssh"] = {}
if "rootKeys" not in data["ssh"]:
data["ssh"]["rootKeys"] = []
# Return 409 if key already in array
for key in data["ssh"]["rootKeys"]:
if key == public_key:
return {
"error": "Key already exists",
}, 409
data["ssh"]["rootKeys"].append(public_key)
userdata_file.seek(0)
json.dump(data, userdata_file, indent=4)
userdata_file.truncate()
finally:
portalocker.unlock(userdata_file)
if not validate_ssh_public_key(public_key):
return {
"error": "Invalid key type. Only ssh-ed25519 and ssh-rsa are supported.",
}, 400
with WriteUserData() as data:
if "ssh" not in data:
data["ssh"] = {}
if "rootKeys" not in data["ssh"]:
data["ssh"]["rootKeys"] = []
# Return 409 if key already in array
for key in data["ssh"]["rootKeys"]:
if key == public_key:
return {
"error": "Key already exists",
}, 409
data["ssh"]["rootKeys"].append(public_key)
return {
"status": 0,
@ -118,5 +178,228 @@ class WriteSSHKey(Resource):
}, 201
class SSHKeys(Resource):
"""List SSH keys"""
def get(self, username):
"""
List SSH keys
---
tags:
- SSH
security:
- bearerAuth: []
parameters:
- in: path
name: username
type: string
required: true
description: User to list keys for
responses:
200:
description: SSH keys
401:
description: Unauthorized
"""
with ReadUserData() as data:
if username == "root":
if "ssh" not in data:
data["ssh"] = {}
if "rootKeys" not in data["ssh"]:
data["ssh"]["rootKeys"] = []
return data["ssh"]["rootKeys"]
if username == data["username"]:
if "sshKeys" not in data:
data["sshKeys"] = []
return data["sshKeys"]
else:
if "users" not in data:
data["users"] = []
for user in data["users"]:
if user["name"] == username:
if "sshKeys" not in user:
user["sshKeys"] = []
return user["ssh"]["sshKeys"]
return {
"error": "User not found",
}, 404
def post(self, username):
"""
Add SSH key to the user
---
tags:
- SSH
security:
- bearerAuth: []
parameters:
- in: body
required: true
name: public_key
schema:
type: object
required:
- public_key
properties:
public_key:
type: string
- in: path
name: username
type: string
required: true
description: User to add keys for
responses:
201:
description: SSH key added
401:
description: Unauthorized
404:
description: User not found
409:
description: Key already exists
"""
parser = reqparse.RequestParser()
parser.add_argument(
"public_key", type=str, required=True, help="Key cannot be blank!"
)
args = parser.parse_args()
if username == "root":
return {
"error": "Use /ssh/key/send to add root keys",
}, 400
if not validate_ssh_public_key(args["public_key"]):
return {
"error": "Invalid key type. Only ssh-ed25519 and ssh-rsa are supported.",
}, 400
with WriteUserData() as data:
if username == data["username"]:
if "sshKeys" not in data:
data["sshKeys"] = []
# Return 409 if key already in array
for key in data["sshKeys"]:
if key == args["public_key"]:
return {
"error": "Key already exists",
}, 409
data["sshKeys"].append(args["public_key"])
return {
"message": "New SSH key successfully written",
}, 201
if "users" not in data:
data["users"] = []
for user in data["users"]:
if user["username"] == username:
if "sshKeys" not in user:
user["sshKeys"] = []
# Return 409 if key already in array
for key in user["sshKeys"]:
if key == args["public_key"]:
return {
"error": "Key already exists",
}, 409
user["sshKeys"].append(args["public_key"])
return {
"message": "New SSH key successfully written",
}, 201
return {
"error": "User not found",
}, 404
def delete(self, username):
"""
Delete SSH key
---
tags:
- SSH
security:
- bearerAuth: []
parameters:
- in: body
name: public_key
required: true
description: Key to delete
schema:
type: object
required:
- public_key
properties:
public_key:
type: string
- in: path
name: username
type: string
required: true
description: User to delete keys for
responses:
200:
description: SSH key deleted
401:
description: Unauthorized
404:
description: Key not found
"""
parser = reqparse.RequestParser()
parser.add_argument(
"public_key", type=str, required=True, help="Key cannot be blank!"
)
args = parser.parse_args()
with WriteUserData() as data:
if username == "root":
if "ssh" not in data:
data["ssh"] = {}
if "rootKeys" not in data["ssh"]:
data["ssh"]["rootKeys"] = []
# Return 404 if key not in array
for key in data["ssh"]["rootKeys"]:
if key == args["public_key"]:
data["ssh"]["rootKeys"].remove(key)
return {
"message": "SSH key deleted",
}, 200
return {
"error": "Key not found",
}, 404
if username == data["username"]:
if "sshKeys" not in data:
data["sshKeys"] = []
# Return 404 if key not in array
for key in data["sshKeys"]:
if key == args["public_key"]:
data["sshKeys"].remove(key)
return {
"message": "SSH key deleted",
}, 200
return {
"error": "Key not found",
}, 404
if "users" not in data:
data["users"] = []
for user in data["users"]:
if user["username"] == username:
if "sshKeys" not in user:
user["sshKeys"] = []
# Return 404 if key not in array
for key in user["sshKeys"]:
if key == args["public_key"]:
user["sshKeys"].remove(key)
return {
"message": "SSH key successfully deleted",
}, 200
return {
"error": "Key not found",
}, 404
return {
"error": "User not found",
}, 404
api.add_resource(EnableSSH, "/ssh/enable")
api.add_resource(SSHSettings, "/ssh")
api.add_resource(WriteSSHKey, "/ssh/key/send")
api.add_resource(SSHKeys, "/ssh/keys/<string:username>")

View file

@ -1,59 +0,0 @@
#!/usr/bin/env/python3
"""Update dispatch module"""
import os
import subprocess
from flask_restful import Resource, reqparse
from selfprivacy_api.resources.services import api
class PullRepositoryChanges(Resource):
def get(self):
"""
Pull Repository Changes
---
tags:
- Update
security:
- bearerAuth: []
responses:
200:
description: Got update
201:
description: Nothing to update
401:
description: Unauthorized
500:
description: Something went wrong
"""
git_pull_command = ["git", "pull"]
current_working_directory = os.getcwd()
os.chdir("/etc/nixos")
git_pull_process_descriptor = subprocess.Popen(
git_pull_command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
shell=False
)
git_pull_process_descriptor.communicate()[0]
os.chdir(current_working_directory)
if git_pull_process_descriptor.returncode == 0:
return {
"status": 0,
"message": "Update completed successfully"
}
elif git_pull_process_descriptor.returncode > 0:
return {
"status": git_pull_process_descriptor.returncode,
"message": "Something went wrong"
}, 500
api.add_resource(PullRepositoryChanges, "/update")

View file

@ -1,13 +1,150 @@
#!/usr/bin/env python3
"""System management module"""
import os
import subprocess
import pytz
from flask import Blueprint
from flask_restful import Resource, Api
from flask_restful import Resource, Api, reqparse
from selfprivacy_api.utils import WriteUserData, ReadUserData
api_system = Blueprint("system", __name__, url_prefix="/system")
api = Api(api_system)
class Timezone(Resource):
"""Change timezone of NixOS"""
def get(self):
"""
Get current system timezone
---
tags:
- System
security:
- bearerAuth: []
responses:
200:
description: Timezone
400:
description: Bad request
"""
with ReadUserData() as data:
if "timezone" not in data:
return "Europe/Uzhgorod"
return data["timezone"]
def put(self):
"""
Change system timezone
---
tags:
- System
security:
- bearerAuth: []
parameters:
- name: timezone
in: body
required: true
description: Timezone to set
schema:
type: object
required:
- timezone
properties:
timezone:
type: string
responses:
200:
description: Timezone changed
400:
description: Bad request
"""
parser = reqparse.RequestParser()
parser.add_argument("timezone", type=str, required=True)
timezone = parser.parse_args()["timezone"]
# Check if timezone is a valid tzdata string
if timezone not in pytz.all_timezones:
return {"error": "Invalid timezone"}, 400
with WriteUserData() as data:
data["timezone"] = timezone
return "Timezone changed"
class AutoUpgrade(Resource):
"""Enable/disable automatic upgrades and reboots"""
def get(self):
"""
Get current system autoupgrade settings
---
tags:
- System
security:
- bearerAuth: []
responses:
200:
description: Auto-upgrade settings
400:
description: Bad request
"""
with ReadUserData() as data:
if "autoUpgrade" not in data:
return {"enable": True, "allowReboot": False}
if "enable" not in data["autoUpgrade"]:
data["autoUpgrade"]["enable"] = True
if "allowReboot" not in data["autoUpgrade"]:
data["autoUpgrade"]["allowReboot"] = False
return data["autoUpgrade"]
def put(self):
"""
Change system auto upgrade settings
---
tags:
- System
security:
- bearerAuth: []
parameters:
- name: autoUpgrade
in: body
required: true
description: Auto upgrade settings
schema:
type: object
required:
- enable
- allowReboot
properties:
enable:
type: boolean
allowReboot:
type: boolean
responses:
200:
description: New settings saved
400:
description: Bad request
"""
parser = reqparse.RequestParser()
parser.add_argument("enable", type=bool, required=False)
parser.add_argument("allowReboot", type=bool, required=False)
args = parser.parse_args()
enable = args["enable"]
allow_reboot = args["allowReboot"]
with WriteUserData() as data:
if "autoUpgrade" not in data:
data["autoUpgrade"] = {}
if enable is not None:
data["autoUpgrade"]["enable"] = enable
if allow_reboot is not None:
data["autoUpgrade"]["allowReboot"] = allow_reboot
return "Auto-upgrade settings changed"
class RebuildSystem(Resource):
"""Rebuild NixOS"""
@ -145,9 +282,62 @@ class PythonVersion(Resource):
return subprocess.check_output(["python", "-V"]).decode("utf-8").strip()
class PullRepositoryChanges(Resource):
def get(self):
"""
Pull Repository Changes
---
tags:
- System
security:
- bearerAuth: []
responses:
200:
description: Got update
201:
description: Nothing to update
401:
description: Unauthorized
500:
description: Something went wrong
"""
git_pull_command = ["git", "pull"]
current_working_directory = os.getcwd()
os.chdir("/etc/nixos")
git_pull_process_descriptor = subprocess.Popen(
git_pull_command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
shell=False,
)
data = git_pull_process_descriptor.communicate()[0].decode("utf-8")
os.chdir(current_working_directory)
if git_pull_process_descriptor.returncode == 0:
return {
"status": 0,
"message": "Update completed successfully",
"data": data,
}
elif git_pull_process_descriptor.returncode > 0:
return {
"status": git_pull_process_descriptor.returncode,
"message": "Something went wrong",
"data": data,
}, 500
api.add_resource(Timezone, "/configuration/timezone")
api.add_resource(AutoUpgrade, "/configuration/autoUpgrade")
api.add_resource(RebuildSystem, "/configuration/apply")
api.add_resource(RollbackSystem, "/configuration/rollback")
api.add_resource(UpgradeSystem, "/configuration/upgrade")
api.add_resource(RebootSystem, "/reboot")
api.add_resource(SystemVersion, "/version")
api.add_resource(PythonVersion, "/pythonVersion")
api.add_resource(PullRepositoryChanges, "/configuration/pull")

View file

@ -1,11 +1,11 @@
#!/usr/bin/env python3
"""Users management module"""
import subprocess
import json
import re
import portalocker
from flask_restful import Resource, reqparse
from selfprivacy_api.utils import WriteUserData, ReadUserData
class Users(Resource):
"""Users management"""
@ -24,17 +24,10 @@ class Users(Resource):
401:
description: Unauthorized
"""
with open(
"/etc/nixos/userdata/userdata.json", "r", encoding="utf-8"
) as userdata_file:
portalocker.lock(userdata_file, portalocker.LOCK_SH)
try:
data = json.load(userdata_file)
users = []
for user in data["users"]:
users.append(user["username"])
finally:
portalocker.unlock(userdata_file)
with ReadUserData() as data:
users = []
for user in data["users"]:
users.append(user["username"])
return users
def post(self):
@ -97,32 +90,21 @@ class Users(Resource):
if len(args["username"]) > 32:
return {"error": "username must be less than 32 characters"}, 400
with open(
"/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8"
) as userdata_file:
portalocker.lock(userdata_file, portalocker.LOCK_EX)
try:
data = json.load(userdata_file)
with WriteUserData() as data:
if "users" not in data:
data["users"] = []
if "users" not in data:
data["users"] = []
# Return 400 if user already exists
for user in data["users"]:
if user["username"] == args["username"]:
return {"error": "User already exists"}, 409
# Return 400 if user already exists
for user in data["users"]:
if user["username"] == args["username"]:
return {"error": "User already exists"}, 409
data["users"].append(
{
"username": args["username"],
"hashedPassword": hashed_password,
}
)
userdata_file.seek(0)
json.dump(data, userdata_file, indent=4)
userdata_file.truncate()
finally:
portalocker.unlock(userdata_file)
data["users"].append(
{
"username": args["username"],
"hashedPassword": hashed_password,
}
)
return {"result": 0, "username": args["username"]}, 201
@ -154,29 +136,18 @@ class User(Resource):
404:
description: User not found
"""
with open(
"/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8"
) as userdata_file:
portalocker.lock(userdata_file, portalocker.LOCK_EX)
try:
data = json.load(userdata_file)
# Return 400 if username is not provided
if username is None:
return {"error": "username is required"}, 400
if username == data["username"]:
return {"error": "Cannot delete root user"}, 400
# Return 400 if user does not exist
for user in data["users"]:
if user["username"] == username:
data["users"].remove(user)
break
else:
return {"error": "User does not exist"}, 404
userdata_file.seek(0)
json.dump(data, userdata_file, indent=4)
userdata_file.truncate()
finally:
portalocker.unlock(userdata_file)
with WriteUserData() as data:
# Return 400 if username is not provided
if username is None:
return {"error": "username is required"}, 400
if username == data["username"]:
return {"error": "Cannot delete root user"}, 400
# Return 400 if user does not exist
for user in data["users"]:
if user["username"] == username:
data["users"].remove(user)
break
else:
return {"error": "User does not exist"}, 404
return {"result": 0, "username": username}

View file

@ -0,0 +1,261 @@
"""Restic singleton controller."""
from datetime import datetime
import json
import subprocess
import os
from threading import Lock
from enum import Enum
import portalocker
from selfprivacy_api.utils import ReadUserData
class ResticStates(Enum):
"""Restic states enum."""
NO_KEY = 0
NOT_INITIALIZED = 1
INITIALIZED = 2
BACKING_UP = 3
RESTORING = 4
ERROR = 5
INITIALIZING = 6
class ResticController:
"""
States in wich the restic_controller may be
- no backblaze key
- backblaze key is provided, but repository is not initialized
- backblaze key is provided, repository is initialized
- fetching list of snapshots
- creating snapshot, current progress can be retrieved
- recovering from snapshot
Any ongoing operation acquires the lock
Current state can be fetched with get_state()
"""
_instance = None
_lock = Lock()
_initialized = False
def __new__(cls):
print("new is called!")
if not cls._instance:
with cls._lock:
cls._instance = super(ResticController, cls).__new__(cls)
return cls._instance
def __init__(self):
if self._initialized:
return
self.state = ResticStates.NO_KEY
self.lock = False
self.progress = 0
self._backblaze_account = None
self._backblaze_key = None
self._repository_name = None
self.snapshot_list = []
self.error_message = None
self._initialized = True
print("init is called!")
self.load_configuration()
self.write_rclone_config()
self.load_snapshots()
def load_configuration(self):
"""Load current configuration from user data to singleton."""
with ReadUserData() as user_data:
self._backblaze_account = user_data["backblaze"]["accountId"]
self._backblaze_key = user_data["backblaze"]["accountKey"]
self._repository_name = user_data["backblaze"]["bucket"]
if self._backblaze_account and self._backblaze_key and self._repository_name:
self.state = ResticStates.INITIALIZING
else:
self.state = ResticStates.NO_KEY
def write_rclone_config(self):
"""
Open /root/.config/rclone/rclone.conf with portalocker
and write configuration in the following format:
[backblaze]
type = b2
account = {self.backblaze_account}
key = {self.backblaze_key}
"""
with portalocker.Lock(
"/root/.config/rclone/rclone.conf", "w", timeout=None
) as rclone_config:
rclone_config.write(
f"[backblaze]\n"
f"type = b2\n"
f"account = {self._backblaze_account}\n"
f"key = {self._backblaze_key}\n"
)
def load_snapshots(self):
"""
Load list of snapshots from repository
"""
backup_listing_command = [
"restic",
"-o",
"rclone.args=serve restic --stdio",
"-r",
f"rclone:backblaze:{self._repository_name}/sfbackup",
"snapshots",
"--json",
]
if (
self.state == ResticStates.BACKING_UP
or self.state == ResticStates.RESTORING
):
return
print("preparing to read snapshots")
with subprocess.Popen(
backup_listing_command,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
) as backup_listing_process_descriptor:
snapshots_list = backup_listing_process_descriptor.communicate()[0].decode(
"utf-8"
)
try:
starting_index = snapshots_list.find("[")
json.loads(snapshots_list[starting_index:])
self.snapshot_list = json.loads(snapshots_list[starting_index:])
self.state = ResticStates.INITIALIZED
print(snapshots_list)
except ValueError:
if "Is there a repository at the following location?" in snapshots_list:
self.state = ResticStates.NOT_INITIALIZED
return
else:
self.state = ResticStates.ERROR
self.error_message = snapshots_list
return
def initialize_repository(self):
"""
Initialize repository with restic
"""
initialize_repository_command = [
"restic",
"-o",
"rclone.args=serve restic --stdio",
"-r",
f"rclone:backblaze:{self._repository_name}/sfbackup",
"init",
]
with subprocess.Popen(
initialize_repository_command,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
) as initialize_repository_process_descriptor:
msg = initialize_repository_process_descriptor.communicate()[0].decode(
"utf-8"
)
if initialize_repository_process_descriptor.returncode == 0:
self.state = ResticStates.INITIALIZED
else:
self.state = ResticStates.ERROR
self.error_message = msg
self.state = ResticStates.INITIALIZED
def start_backup(self):
"""
Start backup with restic
"""
backup_command = [
"restic",
"-o",
"rclone.args=serve restic --stdio",
"-r",
f"rclone:backblaze:{self._repository_name}/sfbackup",
"--verbose",
"--json",
"backup",
"/var",
]
with open("/tmp/backup.log", "w", encoding="utf-8") as log_file:
subprocess.Popen(
backup_command,
shell=False,
stdout=log_file,
stderr=subprocess.STDOUT,
)
self.state = ResticStates.BACKING_UP
self.progress = 0
def check_progress(self):
"""
Check progress of ongoing backup operation
"""
backup_status_check_command = ["tail", "-1", "/tmp/backup.log"]
if (
self.state == ResticStates.NO_KEY
or self.state == ResticStates.NOT_INITIALIZED
):
return
# If the log file does not exists
if os.path.exists("/tmp/backup.log") is False:
self.state = ResticStates.INITIALIZED
with subprocess.Popen(
backup_status_check_command,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
) as backup_status_check_process_descriptor:
backup_process_status = (
backup_status_check_process_descriptor.communicate()[0].decode("utf-8")
)
try:
status = json.loads(backup_process_status)
except ValueError:
print(backup_process_status)
self.error_message = backup_process_status
return
if status["message_type"] == "status":
self.progress = status["percent_done"]
self.state = ResticStates.BACKING_UP
elif status["message_type"] == "summary":
self.state = ResticStates.INITIALIZED
self.progress = 0
self.snapshot_list.append(
{
"short_id": status["snapshot_id"],
# Current time in format 2021-12-02T00:02:51.086452543+03:00
"time": datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f%z"),
}
)
def restore_from_backup(self, snapshot_id):
"""
Restore from backup with restic
"""
backup_restoration_command = [
"restic",
"-o",
"rclone.args=serve restic --stdio",
"-r",
f"rclone:backblaze:{self._repository_name}/sfbackup",
"restore",
snapshot_id,
"--target",
"/",
]
self.state = ResticStates.RESTORING
subprocess.run(backup_restoration_command, shell=False)
self.state = ResticStates.INITIALIZED

View file

@ -0,0 +1,72 @@
"""Tasks for the restic controller."""
from huey import crontab
from huey.contrib.mini import MiniHuey
from . import ResticController, ResticStates
huey = MiniHuey()
@huey.task()
def init_restic():
controller = ResticController()
if controller.state == ResticStates.NOT_INITIALIZED:
initialize_repository()
@huey.task()
def update_keys_from_userdata():
controller = ResticController()
controller.load_configuration()
controller.write_rclone_config()
initialize_repository()
# Check every morning at 5:00 AM
@huey.task(crontab(hour=5, minute=0))
def cron_load_snapshots():
controller = ResticController()
controller.load_snapshots()
# Check every morning at 5:00 AM
@huey.task()
def load_snapshots():
controller = ResticController()
controller.load_snapshots()
if controller.state == ResticStates.NOT_INITIALIZED:
load_snapshots.schedule(delay=120)
@huey.task()
def initialize_repository():
controller = ResticController()
if controller.state is not ResticStates.NO_KEY:
controller.initialize_repository()
load_snapshots()
@huey.task()
def fetch_backup_status():
controller = ResticController()
if controller.state is ResticStates.BACKING_UP:
controller.check_progress()
if controller.state is ResticStates.BACKING_UP:
fetch_backup_status.schedule(delay=2)
else:
load_snapshots.schedule(delay=240)
@huey.task()
def start_backup():
controller = ResticController()
if controller.state is ResticStates.NOT_INITIALIZED:
resp = initialize_repository()
resp.get()
controller.start_backup()
fetch_backup_status.schedule(delay=3)
@huey.task()
def restore_from_backup(snapshot):
controller = ResticController()
controller.restore_from_backup(snapshot)

View file

@ -1,5 +1,10 @@
#!/usr/bin/env python3
"""Various utility functions"""
import json
import portalocker
USERDATA_FILE = "/etc/nixos/userdata/userdata.json"
def get_domain():
@ -7,3 +12,47 @@ def get_domain():
with open("/var/domain", "r", encoding="utf-8") as domain_file:
domain = domain_file.readline().rstrip()
return domain
class WriteUserData(object):
"""Write userdata.json with lock"""
def __init__(self):
self.userdata_file = open(USERDATA_FILE, "r+", encoding="utf-8")
portalocker.lock(self.userdata_file, portalocker.LOCK_EX)
self.data = json.load(self.userdata_file)
def __enter__(self):
return self.data
def __exit__(self, exc_type, exc_value, traceback):
if exc_type is None:
self.userdata_file.seek(0)
json.dump(self.data, self.userdata_file, indent=4)
self.userdata_file.truncate()
portalocker.unlock(self.userdata_file)
self.userdata_file.close()
class ReadUserData(object):
"""Read userdata.json with lock"""
def __init__(self):
self.userdata_file = open(USERDATA_FILE, "r", encoding="utf-8")
portalocker.lock(self.userdata_file, portalocker.LOCK_SH)
self.data = json.load(self.userdata_file)
def __enter__(self):
return self.data
def __exit__(self, *args):
portalocker.unlock(self.userdata_file)
self.userdata_file.close()
def validate_ssh_public_key(key):
"""Validate SSH public key. It may be ssh-ed25519 or ssh-rsa."""
if not key.startswith("ssh-ed25519"):
if not key.startswith("ssh-rsa"):
return False
return True

0
tests/__init__.py Normal file
View file

43
tests/conftest.py Normal file
View file

@ -0,0 +1,43 @@
import pytest
from flask import testing
from selfprivacy_api.app import create_app
@pytest.fixture
def app():
app = create_app(
{
"AUTH_TOKEN": "TEST_TOKEN",
"ENABLE_SWAGGER": "0",
}
)
yield app
@pytest.fixture
def client(app):
return app.test_client()
class AuthorizedClient(testing.FlaskClient):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.token = "TEST_TOKEN"
def open(self, *args, **kwargs):
if "headers" not in kwargs:
kwargs["headers"] = {}
kwargs["headers"]["Authorization"] = f"Bearer {self.token}"
return super().open(*args, **kwargs)
@pytest.fixture
def authorized_client(app):
app.test_client_class = AuthorizedClient
return app.test_client()
@pytest.fixture
def runner(app):
return app.test_cli_runner()

View file

@ -0,0 +1,125 @@
import json
import pytest
def read_json(file_path):
with open(file_path, "r") as f:
return json.load(f)
###############################################################################
@pytest.fixture
def bitwarden_off(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_off.json")
assert read_json(datadir / "turned_off.json")["bitwarden"]["enable"] == False
return datadir
@pytest.fixture
def bitwarden_on(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_on.json")
assert read_json(datadir / "turned_on.json")["bitwarden"]["enable"] == True
return datadir
@pytest.fixture
def bitwarden_enable_undefined(mocker, datadir):
mocker.patch(
"selfprivacy_api.utils.USERDATA_FILE", new=datadir / "enable_undefined.json"
)
assert "enable" not in read_json(datadir / "enable_undefined.json")["bitwarden"]
return datadir
@pytest.fixture
def bitwarden_undefined(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "undefined.json")
assert "bitwarden" not in read_json(datadir / "undefined.json")
return datadir
###############################################################################
@pytest.mark.parametrize("endpoint", ["enable", "disable"])
def test_unauthorized(client, bitwarden_off, endpoint):
response = client.post(f"/services/bitwarden/{endpoint}")
assert response.status_code == 401
@pytest.mark.parametrize("endpoint", ["enable", "disable"])
def test_illegal_methods(authorized_client, bitwarden_off, endpoint):
response = authorized_client.get(f"/services/bitwarden/{endpoint}")
assert response.status_code == 405
response = authorized_client.put(f"/services/bitwarden/{endpoint}")
assert response.status_code == 405
response = authorized_client.delete(f"/services/bitwarden/{endpoint}")
assert response.status_code == 405
@pytest.mark.parametrize(
"endpoint,target_file",
[("enable", "turned_on.json"), ("disable", "turned_off.json")],
)
def test_switch_from_off(authorized_client, bitwarden_off, endpoint, target_file):
response = authorized_client.post(f"/services/bitwarden/{endpoint}")
assert response.status_code == 200
assert read_json(bitwarden_off / "turned_off.json") == read_json(
bitwarden_off / target_file
)
@pytest.mark.parametrize(
"endpoint,target_file",
[("enable", "turned_on.json"), ("disable", "turned_off.json")],
)
def test_switch_from_on(authorized_client, bitwarden_on, endpoint, target_file):
response = authorized_client.post(f"/services/bitwarden/{endpoint}")
assert response.status_code == 200
assert read_json(bitwarden_on / "turned_on.json") == read_json(
bitwarden_on / target_file
)
@pytest.mark.parametrize(
"endpoint,target_file",
[("enable", "turned_on.json"), ("disable", "turned_off.json")],
)
def test_switch_twice(authorized_client, bitwarden_off, endpoint, target_file):
response = authorized_client.post(f"/services/bitwarden/{endpoint}")
assert response.status_code == 200
response = authorized_client.post(f"/services/bitwarden/{endpoint}")
assert response.status_code == 200
assert read_json(bitwarden_off / "turned_off.json") == read_json(
bitwarden_off / target_file
)
@pytest.mark.parametrize(
"endpoint,target_file",
[("enable", "turned_on.json"), ("disable", "turned_off.json")],
)
def test_on_attribute_deleted(
authorized_client, bitwarden_enable_undefined, endpoint, target_file
):
response = authorized_client.post(f"/services/bitwarden/{endpoint}")
assert response.status_code == 200
assert read_json(bitwarden_enable_undefined / "enable_undefined.json") == read_json(
bitwarden_enable_undefined / target_file
)
@pytest.mark.parametrize(
"endpoint,target_file",
[("enable", "turned_on.json"), ("disable", "turned_off.json")],
)
def test_on_bitwarden_undefined(
authorized_client, bitwarden_undefined, endpoint, target_file
):
response = authorized_client.post(f"/services/bitwarden/{endpoint}")
assert response.status_code == 200
assert read_json(bitwarden_undefined / "undefined.json") == read_json(
bitwarden_undefined / target_file
)

View file

@ -0,0 +1,51 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
]
}

View file

@ -0,0 +1,52 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": false
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
]
}

View file

@ -0,0 +1,52 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": true
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
]
}

View file

@ -0,0 +1,49 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
]
}

View file

@ -0,0 +1,121 @@
import json
import pytest
def read_json(file_path):
with open(file_path, "r") as f:
return json.load(f)
###############################################################################
@pytest.fixture
def gitea_off(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_off.json")
assert read_json(datadir / "turned_off.json")["gitea"]["enable"] == False
return datadir
@pytest.fixture
def gitea_on(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_on.json")
assert read_json(datadir / "turned_on.json")["gitea"]["enable"] == True
return datadir
@pytest.fixture
def gitea_enable_undefined(mocker, datadir):
mocker.patch(
"selfprivacy_api.utils.USERDATA_FILE", new=datadir / "enable_undefined.json"
)
assert "enable" not in read_json(datadir / "enable_undefined.json")["gitea"]
return datadir
@pytest.fixture
def gitea_undefined(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "undefined.json")
assert "gitea" not in read_json(datadir / "undefined.json")
return datadir
###############################################################################
@pytest.mark.parametrize("endpoint", ["enable", "disable"])
def test_unauthorized(client, gitea_off, endpoint):
response = client.post(f"/services/gitea/{endpoint}")
assert response.status_code == 401
@pytest.mark.parametrize("endpoint", ["enable", "disable"])
def test_illegal_methods(authorized_client, gitea_off, endpoint):
response = authorized_client.get(f"/services/gitea/{endpoint}")
assert response.status_code == 405
response = authorized_client.put(f"/services/gitea/{endpoint}")
assert response.status_code == 405
response = authorized_client.delete(f"/services/gitea/{endpoint}")
assert response.status_code == 405
@pytest.mark.parametrize(
"endpoint,target_file",
[("enable", "turned_on.json"), ("disable", "turned_off.json")],
)
def test_switch_from_off(authorized_client, gitea_off, endpoint, target_file):
response = authorized_client.post(f"/services/gitea/{endpoint}")
assert response.status_code == 200
assert read_json(gitea_off / "turned_off.json") == read_json(
gitea_off / target_file
)
@pytest.mark.parametrize(
"endpoint,target_file",
[("enable", "turned_on.json"), ("disable", "turned_off.json")],
)
def test_switch_from_on(authorized_client, gitea_on, endpoint, target_file):
response = authorized_client.post(f"/services/gitea/{endpoint}")
assert response.status_code == 200
assert read_json(gitea_on / "turned_on.json") == read_json(gitea_on / target_file)
@pytest.mark.parametrize(
"endpoint,target_file",
[("enable", "turned_on.json"), ("disable", "turned_off.json")],
)
def test_switch_twice(authorized_client, gitea_off, endpoint, target_file):
response = authorized_client.post(f"/services/gitea/{endpoint}")
assert response.status_code == 200
response = authorized_client.post(f"/services/gitea/{endpoint}")
assert response.status_code == 200
assert read_json(gitea_off / "turned_off.json") == read_json(
gitea_off / target_file
)
@pytest.mark.parametrize(
"endpoint,target_file",
[("enable", "turned_on.json"), ("disable", "turned_off.json")],
)
def test_on_attribute_deleted(
authorized_client, gitea_enable_undefined, endpoint, target_file
):
response = authorized_client.post(f"/services/gitea/{endpoint}")
assert response.status_code == 200
assert read_json(gitea_enable_undefined / "enable_undefined.json") == read_json(
gitea_enable_undefined / target_file
)
@pytest.mark.parametrize(
"endpoint,target_file",
[("enable", "turned_on.json"), ("disable", "turned_off.json")],
)
def test_on_gitea_undefined(authorized_client, gitea_undefined, endpoint, target_file):
response = authorized_client.post(f"/services/gitea/{endpoint}")
assert response.status_code == 200
assert read_json(gitea_undefined / "undefined.json") == read_json(
gitea_undefined / target_file
)

View file

@ -0,0 +1,51 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": false
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
]
}

View file

@ -0,0 +1,52 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": false
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
]
}

View file

@ -0,0 +1,52 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": false
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": true
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
]
}

View file

@ -0,0 +1,49 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": false
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
]
}

View file

@ -0,0 +1,87 @@
import base64
import json
import pytest
def read_json(file_path):
with open(file_path, "r", encoding="utf-8") as f:
return json.load(f)
###############################################################################
class ProcessMock:
"""Mock subprocess.Popen"""
def __init__(self, args, **kwargs):
self.args = args
self.kwargs = kwargs
def communicate():
return (b"I am a DKIM key", None)
class NoFileMock(ProcessMock):
def communicate():
return (b"", None)
@pytest.fixture
def mock_subproccess_popen(mocker):
mock = mocker.patch("subprocess.Popen", autospec=True, return_value=ProcessMock)
mocker.patch(
"selfprivacy_api.resources.services.mailserver.get_domain",
autospec=True,
return_value="example.com",
)
mocker.patch("os.path.exists", autospec=True, return_value=True)
return mock
@pytest.fixture
def mock_no_file(mocker):
mock = mocker.patch("subprocess.Popen", autospec=True, return_value=NoFileMock)
mocker.patch(
"selfprivacy_api.resources.services.mailserver.get_domain",
autospec=True,
return_value="example.com",
)
mocker.patch("os.path.exists", autospec=True, return_value=False)
return mock
###############################################################################
def test_unauthorized(client, mock_subproccess_popen):
"""Test unauthorized"""
response = client.get("/services/mailserver/dkim")
assert response.status_code == 401
def test_illegal_methods(authorized_client, mock_subproccess_popen):
response = authorized_client.post("/services/mailserver/dkim")
assert response.status_code == 405
response = authorized_client.put("/services/mailserver/dkim")
assert response.status_code == 405
response = authorized_client.delete("/services/mailserver/dkim")
assert response.status_code == 405
def test_dkim_key(authorized_client, mock_subproccess_popen):
"""Test DKIM key"""
response = authorized_client.get("/services/mailserver/dkim")
assert response.status_code == 200
assert base64.b64decode(response.data) == b"I am a DKIM key"
assert mock_subproccess_popen.call_args[0][0] == [
"cat",
"/var/dkim/example.com.selector.txt",
]
def test_no_dkim_key(authorized_client, mock_no_file):
"""Test no DKIM key"""
response = authorized_client.get("/services/mailserver/dkim")
assert response.status_code == 404
assert mock_no_file.called == False

View file

@ -0,0 +1,123 @@
import json
import pytest
def read_json(file_path):
with open(file_path, "r") as f:
return json.load(f)
###############################################################################
@pytest.fixture
def nextcloud_off(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_off.json")
assert read_json(datadir / "turned_off.json")["nextcloud"]["enable"] == False
return datadir
@pytest.fixture
def nextcloud_on(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_on.json")
assert read_json(datadir / "turned_on.json")["nextcloud"]["enable"] == True
return datadir
@pytest.fixture
def nextcloud_enable_undefined(mocker, datadir):
mocker.patch(
"selfprivacy_api.utils.USERDATA_FILE", new=datadir / "enable_undefined.json"
)
assert "enable" not in read_json(datadir / "enable_undefined.json")["nextcloud"]
return datadir
@pytest.fixture
def nextcloud_undefined(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "undefined.json")
assert "nextcloud" not in read_json(datadir / "undefined.json")
return datadir
###############################################################################
@pytest.mark.parametrize("endpoint", ["enable", "disable"])
def test_unauthorized(client, nextcloud_off, endpoint):
response = client.post(f"/services/nextcloud/{endpoint}")
assert response.status_code == 401
@pytest.mark.parametrize("endpoint", ["enable", "disable"])
def test_illegal_methods(authorized_client, nextcloud_off, endpoint):
response = authorized_client.get(f"/services/nextcloud/{endpoint}")
assert response.status_code == 405
response = authorized_client.put(f"/services/nextcloud/{endpoint}")
assert response.status_code == 405
response = authorized_client.delete(f"/services/nextcloud/{endpoint}")
assert response.status_code == 405
@pytest.mark.parametrize(
"endpoint,target_file",
[("enable", "turned_on.json"), ("disable", "turned_off.json")],
)
def test_switch_from_off(authorized_client, nextcloud_off, endpoint, target_file):
response = authorized_client.post(f"/services/nextcloud/{endpoint}")
assert response.status_code == 200
assert read_json(nextcloud_off / "turned_off.json") == read_json(
nextcloud_off / target_file
)
@pytest.mark.parametrize(
"endpoint,target_file",
[("enable", "turned_on.json"), ("disable", "turned_off.json")],
)
def test_switch_from_on(authorized_client, nextcloud_on, endpoint, target_file):
response = authorized_client.post(f"/services/nextcloud/{endpoint}")
assert response.status_code == 200
assert read_json(nextcloud_on / "turned_on.json") == read_json(
nextcloud_on / target_file
)
@pytest.mark.parametrize(
"endpoint,target_file",
[("enable", "turned_on.json"), ("disable", "turned_off.json")],
)
def test_switch_twice(authorized_client, nextcloud_off, endpoint, target_file):
response = authorized_client.post(f"/services/nextcloud/{endpoint}")
assert response.status_code == 200
response = authorized_client.post(f"/services/nextcloud/{endpoint}")
assert response.status_code == 200
assert read_json(nextcloud_off / "turned_off.json") == read_json(
nextcloud_off / target_file
)
@pytest.mark.parametrize(
"endpoint,target_file",
[("enable", "turned_on.json"), ("disable", "turned_off.json")],
)
def test_on_attribute_deleted(
authorized_client, nextcloud_enable_undefined, endpoint, target_file
):
response = authorized_client.post(f"/services/nextcloud/{endpoint}")
assert response.status_code == 200
assert read_json(nextcloud_enable_undefined / "enable_undefined.json") == read_json(
nextcloud_enable_undefined / target_file
)
@pytest.mark.parametrize("endpoint,target", [("enable", True), ("disable", False)])
def test_on_nextcloud_undefined(
authorized_client, nextcloud_undefined, endpoint, target
):
response = authorized_client.post(f"/services/nextcloud/{endpoint}")
assert response.status_code == 200
assert (
read_json(nextcloud_undefined / "undefined.json")["nextcloud"]["enable"]
== target
)

View file

@ -0,0 +1,51 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": false
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN"
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
]
}

View file

@ -0,0 +1,52 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": false
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": false
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
]
}

View file

@ -0,0 +1,52 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": false
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
]
}

View file

@ -0,0 +1,44 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
]
}

View file

@ -0,0 +1,123 @@
import json
import pytest
def read_json(file_path):
with open(file_path, "r") as f:
return json.load(f)
###############################################################################
@pytest.fixture
def ocserv_off(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_off.json")
assert read_json(datadir / "turned_off.json")["ocserv"]["enable"] == False
return datadir
@pytest.fixture
def ocserv_on(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_on.json")
assert read_json(datadir / "turned_on.json")["ocserv"]["enable"] == True
return datadir
@pytest.fixture
def ocserv_enable_undefined(mocker, datadir):
mocker.patch(
"selfprivacy_api.utils.USERDATA_FILE", new=datadir / "enable_undefined.json"
)
assert "enable" not in read_json(datadir / "enable_undefined.json")["ocserv"]
return datadir
@pytest.fixture
def ocserv_undefined(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "undefined.json")
assert "ocserv" not in read_json(datadir / "undefined.json")
return datadir
###############################################################################
@pytest.mark.parametrize("endpoint", ["enable", "disable"])
def test_unauthorized(client, ocserv_off, endpoint):
response = client.post(f"/services/ocserv/{endpoint}")
assert response.status_code == 401
@pytest.mark.parametrize("endpoint", ["enable", "disable"])
def test_illegal_methods(authorized_client, ocserv_off, endpoint):
response = authorized_client.get(f"/services/ocserv/{endpoint}")
assert response.status_code == 405
response = authorized_client.put(f"/services/ocserv/{endpoint}")
assert response.status_code == 405
response = authorized_client.delete(f"/services/ocserv/{endpoint}")
assert response.status_code == 405
@pytest.mark.parametrize(
"endpoint,target_file",
[("enable", "turned_on.json"), ("disable", "turned_off.json")],
)
def test_switch_from_off(authorized_client, ocserv_off, endpoint, target_file):
response = authorized_client.post(f"/services/ocserv/{endpoint}")
assert response.status_code == 200
assert read_json(ocserv_off / "turned_off.json") == read_json(
ocserv_off / target_file
)
@pytest.mark.parametrize(
"endpoint,target_file",
[("enable", "turned_on.json"), ("disable", "turned_off.json")],
)
def test_switch_from_on(authorized_client, ocserv_on, endpoint, target_file):
response = authorized_client.post(f"/services/ocserv/{endpoint}")
assert response.status_code == 200
assert read_json(ocserv_on / "turned_on.json") == read_json(ocserv_on / target_file)
@pytest.mark.parametrize(
"endpoint,target_file",
[("enable", "turned_on.json"), ("disable", "turned_off.json")],
)
def test_switch_twice(authorized_client, ocserv_off, endpoint, target_file):
response = authorized_client.post(f"/services/ocserv/{endpoint}")
assert response.status_code == 200
response = authorized_client.post(f"/services/ocserv/{endpoint}")
assert response.status_code == 200
assert read_json(ocserv_off / "turned_off.json") == read_json(
ocserv_off / target_file
)
@pytest.mark.parametrize(
"endpoint,target_file",
[("enable", "turned_on.json"), ("disable", "turned_off.json")],
)
def test_on_attribute_deleted(
authorized_client, ocserv_enable_undefined, endpoint, target_file
):
response = authorized_client.post(f"/services/ocserv/{endpoint}")
assert response.status_code == 200
assert read_json(ocserv_enable_undefined / "enable_undefined.json") == read_json(
ocserv_enable_undefined / target_file
)
@pytest.mark.parametrize(
"endpoint,target_file",
[("enable", "turned_on.json"), ("disable", "turned_off.json")],
)
def test_on_ocserv_undefined(
authorized_client, ocserv_undefined, endpoint, target_file
):
response = authorized_client.post(f"/services/ocserv/{endpoint}")
assert response.status_code == 200
assert read_json(ocserv_undefined / "undefined.json") == read_json(
ocserv_undefined / target_file
)

View file

@ -0,0 +1,51 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": false
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": false
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
]
}

View file

@ -0,0 +1,52 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": false
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": false
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": false
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
]
}

View file

@ -0,0 +1,52 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": false
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": false
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
]
}

View file

@ -0,0 +1,49 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": false
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": false
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
]
}

View file

@ -0,0 +1,125 @@
import json
import pytest
def read_json(file_path):
with open(file_path, "r") as f:
return json.load(f)
###############################################################################
@pytest.fixture
def pleroma_off(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_off.json")
assert read_json(datadir / "turned_off.json")["pleroma"]["enable"] == False
return datadir
@pytest.fixture
def pleroma_on(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_on.json")
assert read_json(datadir / "turned_on.json")["pleroma"]["enable"] == True
return datadir
@pytest.fixture
def pleroma_enable_undefined(mocker, datadir):
mocker.patch(
"selfprivacy_api.utils.USERDATA_FILE", new=datadir / "enable_undefined.json"
)
assert "enable" not in read_json(datadir / "enable_undefined.json")["pleroma"]
return datadir
@pytest.fixture
def pleroma_undefined(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "undefined.json")
assert "pleroma" not in read_json(datadir / "undefined.json")
return datadir
###############################################################################
@pytest.mark.parametrize("endpoint", ["enable", "disable"])
def test_unauthorized(client, pleroma_off, endpoint):
response = client.post(f"/services/pleroma/{endpoint}")
assert response.status_code == 401
@pytest.mark.parametrize("endpoint", ["enable", "disable"])
def test_illegal_methods(authorized_client, pleroma_off, endpoint):
response = authorized_client.get(f"/services/pleroma/{endpoint}")
assert response.status_code == 405
response = authorized_client.put(f"/services/pleroma/{endpoint}")
assert response.status_code == 405
response = authorized_client.delete(f"/services/pleroma/{endpoint}")
assert response.status_code == 405
@pytest.mark.parametrize(
"endpoint,target_file",
[("enable", "turned_on.json"), ("disable", "turned_off.json")],
)
def test_switch_from_off(authorized_client, pleroma_off, endpoint, target_file):
response = authorized_client.post(f"/services/pleroma/{endpoint}")
assert response.status_code == 200
assert read_json(pleroma_off / "turned_off.json") == read_json(
pleroma_off / target_file
)
@pytest.mark.parametrize(
"endpoint,target_file",
[("enable", "turned_on.json"), ("disable", "turned_off.json")],
)
def test_switch_from_on(authorized_client, pleroma_on, endpoint, target_file):
response = authorized_client.post(f"/services/pleroma/{endpoint}")
assert response.status_code == 200
assert read_json(pleroma_on / "turned_on.json") == read_json(
pleroma_on / target_file
)
@pytest.mark.parametrize(
"endpoint,target_file",
[("enable", "turned_on.json"), ("disable", "turned_off.json")],
)
def test_switch_twice(authorized_client, pleroma_off, endpoint, target_file):
response = authorized_client.post(f"/services/pleroma/{endpoint}")
assert response.status_code == 200
response = authorized_client.post(f"/services/pleroma/{endpoint}")
assert response.status_code == 200
assert read_json(pleroma_off / "turned_off.json") == read_json(
pleroma_off / target_file
)
@pytest.mark.parametrize(
"endpoint,target_file",
[("enable", "turned_on.json"), ("disable", "turned_off.json")],
)
def test_on_attribute_deleted(
authorized_client, pleroma_enable_undefined, endpoint, target_file
):
response = authorized_client.post(f"/services/pleroma/{endpoint}")
assert response.status_code == 200
assert read_json(pleroma_enable_undefined / "enable_undefined.json") == read_json(
pleroma_enable_undefined / target_file
)
@pytest.mark.parametrize(
"endpoint,target_file",
[("enable", "turned_on.json"), ("disable", "turned_off.json")],
)
def test_on_pleroma_undefined(
authorized_client, pleroma_undefined, endpoint, target_file
):
response = authorized_client.post(f"/services/pleroma/{endpoint}")
assert response.status_code == 200
assert read_json(pleroma_undefined / "undefined.json") == read_json(
pleroma_undefined / target_file
)

View file

@ -0,0 +1,51 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": false
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": false
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": false
},
"pleroma": {
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
]
}

View file

@ -0,0 +1,52 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": false
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": false
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": false
},
"pleroma": {
"enable": false
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
]
}

View file

@ -0,0 +1,52 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": false
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": false
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": false
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
]
}

View file

@ -0,0 +1,49 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": false
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": false
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": false
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
]
}

View file

@ -0,0 +1,133 @@
import base64
import json
import pytest
def read_json(file_path):
with open(file_path, "r", encoding="utf-8") as f:
return json.load(f)
def call_args_asserts(mocked_object):
assert mocked_object.call_count == 8
assert mocked_object.call_args_list[0][0][0] == [
"systemctl",
"status",
"dovecot2.service",
]
assert mocked_object.call_args_list[1][0][0] == [
"systemctl",
"status",
"postfix.service",
]
assert mocked_object.call_args_list[2][0][0] == [
"systemctl",
"status",
"nginx.service",
]
assert mocked_object.call_args_list[3][0][0] == [
"systemctl",
"status",
"bitwarden_rs.service",
]
assert mocked_object.call_args_list[4][0][0] == [
"systemctl",
"status",
"gitea.service",
]
assert mocked_object.call_args_list[5][0][0] == [
"systemctl",
"status",
"phpfpm-nextcloud.service",
]
assert mocked_object.call_args_list[6][0][0] == [
"systemctl",
"status",
"ocserv.service",
]
assert mocked_object.call_args_list[7][0][0] == [
"systemctl",
"status",
"pleroma.service",
]
class ProcessMock:
"""Mock subprocess.Popen"""
def __init__(self, args, **kwargs):
self.args = args
self.kwargs = kwargs
def communicate():
return (b"", None)
returncode = 0
class BrokenServiceMock(ProcessMock):
returncode = 3
@pytest.fixture
def mock_subproccess_popen(mocker):
mock = mocker.patch("subprocess.Popen", autospec=True, return_value=ProcessMock)
return mock
@pytest.fixture
def mock_broken_service(mocker):
mock = mocker.patch(
"subprocess.Popen", autospec=True, return_value=BrokenServiceMock
)
return mock
###############################################################################
def test_unauthorized(client, mock_subproccess_popen):
"""Test unauthorized"""
response = client.get("/services/status")
assert response.status_code == 401
def test_illegal_methods(authorized_client, mock_subproccess_popen):
response = authorized_client.post("/services/status")
assert response.status_code == 405
response = authorized_client.put("/services/status")
assert response.status_code == 405
response = authorized_client.delete("/services/status")
assert response.status_code == 405
def test_dkim_key(authorized_client, mock_subproccess_popen):
response = authorized_client.get("/services/status")
assert response.status_code == 200
assert response.get_json() == {
"imap": 0,
"smtp": 0,
"http": 0,
"bitwarden": 0,
"gitea": 0,
"nextcloud": 0,
"ocserv": 0,
"pleroma": 0,
}
call_args_asserts(mock_subproccess_popen)
def test_no_dkim_key(authorized_client, mock_broken_service):
response = authorized_client.get("/services/status")
assert response.status_code == 200
assert response.get_json() == {
"imap": 3,
"smtp": 3,
"http": 3,
"bitwarden": 3,
"gitea": 3,
"nextcloud": 3,
"ocserv": 3,
"pleroma": 3,
}
call_args_asserts(mock_broken_service)

314
tests/services/test_ssh.py Normal file
View file

@ -0,0 +1,314 @@
import json
from os import read
import pytest
def read_json(file_path):
with open(file_path, "r") as f:
return json.load(f)
###############################################################################
@pytest.fixture
def ssh_off(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_off.json")
assert read_json(datadir / "turned_off.json")["ssh"]["enable"] == False
assert (
read_json(datadir / "turned_off.json")["ssh"]["passwordAuthentication"] == True
)
return datadir
@pytest.fixture
def ssh_on(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_on.json")
assert (
read_json(datadir / "turned_off.json")["ssh"]["passwordAuthentication"] == True
)
assert read_json(datadir / "turned_on.json")["ssh"]["enable"] == True
return datadir
@pytest.fixture
def all_off(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "all_off.json")
assert read_json(datadir / "all_off.json")["ssh"]["passwordAuthentication"] == False
assert read_json(datadir / "all_off.json")["ssh"]["enable"] == False
return datadir
@pytest.fixture
def undefined_settings(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "undefined.json")
assert "ssh" not in read_json(datadir / "undefined.json")
return datadir
@pytest.fixture
def root_and_admin_have_keys(mocker, datadir):
mocker.patch(
"selfprivacy_api.utils.USERDATA_FILE",
new=datadir / "root_and_admin_have_keys.json",
)
assert read_json(datadir / "root_and_admin_have_keys.json")["ssh"]["enable"] == True
assert (
read_json(datadir / "root_and_admin_have_keys.json")["ssh"][
"passwordAuthentication"
]
== True
)
assert read_json(datadir / "root_and_admin_have_keys.json")["ssh"]["rootKeys"] == [
"ssh-ed25519 KEY test@pc"
]
assert read_json(datadir / "root_and_admin_have_keys.json")["sshKeys"] == [
"ssh-rsa KEY test@pc"
]
return datadir
###############################################################################
@pytest.mark.parametrize(
"endpoint", ["ssh", "ssh/enable", "ssh/key/send", "ssh/keys/user"]
)
def test_unauthorized(client, ssh_off, endpoint):
response = client.post(f"/services/{endpoint}")
assert response.status_code == 401
def test_legacy_enable(authorized_client, ssh_off):
response = authorized_client.post(f"/services/ssh/enable")
assert response.status_code == 200
assert read_json(ssh_off / "turned_off.json") == read_json(
ssh_off / "turned_on.json"
)
def test_legacy_enable_when_enabled(authorized_client, ssh_on):
response = authorized_client.post(f"/services/ssh/enable")
assert response.status_code == 200
assert read_json(ssh_on / "turned_on.json") == read_json(ssh_on / "turned_on.json")
def test_get_current_settings_ssh_off(authorized_client, ssh_off):
response = authorized_client.get("/services/ssh")
assert response.status_code == 200
assert response.json == {"enable": False, "passwordAuthentication": True}
def test_get_current_settings_ssh_on(authorized_client, ssh_on):
response = authorized_client.get("/services/ssh")
assert response.status_code == 200
assert response.json == {"enable": True, "passwordAuthentication": True}
def test_get_current_settings_all_off(authorized_client, all_off):
response = authorized_client.get("/services/ssh")
assert response.status_code == 200
assert response.json == {"enable": False, "passwordAuthentication": False}
def test_get_current_settings_undefined(authorized_client, undefined_settings):
response = authorized_client.get("/services/ssh")
assert response.status_code == 200
assert response.json == {"enable": True, "passwordAuthentication": True}
available_settings = [
{"enable": True, "passwordAuthentication": True},
{"enable": True, "passwordAuthentication": False},
{"enable": False, "passwordAuthentication": True},
{"enable": False, "passwordAuthentication": False},
{"enable": True},
{"enable": False},
{"passwordAuthentication": True},
{"passwordAuthentication": False},
]
@pytest.mark.parametrize("settings", available_settings)
def test_set_settings_ssh_off(authorized_client, ssh_off, settings):
response = authorized_client.put(f"/services/ssh", json=settings)
assert response.status_code == 200
data = read_json(ssh_off / "turned_off.json")["ssh"]
if "enable" in settings:
assert data["enable"] == settings["enable"]
if "passwordAuthentication" in settings:
assert data["passwordAuthentication"] == settings["passwordAuthentication"]
@pytest.mark.parametrize("settings", available_settings)
def test_set_settings_ssh_on(authorized_client, ssh_on, settings):
response = authorized_client.put(f"/services/ssh", json=settings)
assert response.status_code == 200
data = read_json(ssh_on / "turned_on.json")["ssh"]
if "enable" in settings:
assert data["enable"] == settings["enable"]
if "passwordAuthentication" in settings:
assert data["passwordAuthentication"] == settings["passwordAuthentication"]
@pytest.mark.parametrize("settings", available_settings)
def test_set_settings_all_off(authorized_client, all_off, settings):
response = authorized_client.put(f"/services/ssh", json=settings)
assert response.status_code == 200
data = read_json(all_off / "all_off.json")["ssh"]
if "enable" in settings:
assert data["enable"] == settings["enable"]
if "passwordAuthentication" in settings:
assert data["passwordAuthentication"] == settings["passwordAuthentication"]
@pytest.mark.parametrize("settings", available_settings)
def test_set_settings_undefined(authorized_client, undefined_settings, settings):
response = authorized_client.put(f"/services/ssh", json=settings)
assert response.status_code == 200
data = read_json(undefined_settings / "undefined.json")["ssh"]
if "enable" in settings:
assert data["enable"] == settings["enable"]
if "passwordAuthentication" in settings:
assert data["passwordAuthentication"] == settings["passwordAuthentication"]
def test_add_root_key(authorized_client, ssh_on):
response = authorized_client.put(
f"/services/ssh/key/send", json={"public_key": "ssh-rsa KEY test@pc"}
)
assert response.status_code == 201
assert read_json(ssh_on / "turned_on.json")["ssh"]["rootKeys"] == [
"ssh-rsa KEY test@pc",
]
def test_add_root_key_one_more(authorized_client, root_and_admin_have_keys):
response = authorized_client.put(
f"/services/ssh/key/send", json={"public_key": "ssh-rsa KEY test@pc"}
)
assert response.status_code == 201
assert read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")["ssh"][
"rootKeys"
] == [
"ssh-ed25519 KEY test@pc",
"ssh-rsa KEY test@pc",
]
def test_add_existing_root_key(authorized_client, root_and_admin_have_keys):
response = authorized_client.put(
f"/services/ssh/key/send", json={"public_key": "ssh-ed25519 KEY test@pc"}
)
assert response.status_code == 409
assert read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")["ssh"][
"rootKeys"
] == [
"ssh-ed25519 KEY test@pc",
]
def test_add_invalid_root_key(authorized_client, ssh_on):
response = authorized_client.put(
f"/services/ssh/key/send", json={"public_key": "INVALID KEY test@pc"}
)
assert response.status_code == 400
def test_add_root_key_via_wrong_endpoint(authorized_client, ssh_on):
response = authorized_client.post(
f"/services/ssh/keys/root", json={"public_key": "ssh-rsa KEY test@pc"}
)
assert response.status_code == 400
def test_get_root_key(authorized_client, root_and_admin_have_keys):
response = authorized_client.get(f"/services/ssh/keys/root")
assert response.status_code == 200
assert response.json == ["ssh-ed25519 KEY test@pc"]
def test_get_root_key_when_none(authorized_client, ssh_on):
response = authorized_client.get(f"/services/ssh/keys/root")
assert response.status_code == 200
assert response.json == []
def test_delete_root_key(authorized_client, root_and_admin_have_keys):
response = authorized_client.delete(
f"/services/ssh/keys/root", json={"public_key": "ssh-ed25519 KEY test@pc"}
)
assert response.status_code == 200
assert (
read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")["ssh"][
"rootKeys"
]
== []
)
def test_delete_root_nonexistent_key(authorized_client, root_and_admin_have_keys):
response = authorized_client.delete(
f"/services/ssh/keys/root", json={"public_key": "ssh-rsa KEY test@pc"}
)
assert response.status_code == 404
assert read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")["ssh"][
"rootKeys"
] == [
"ssh-ed25519 KEY test@pc",
]
def test_get_admin_key(authorized_client, root_and_admin_have_keys):
response = authorized_client.get(f"/services/ssh/keys/tester")
assert response.status_code == 200
assert response.json == ["ssh-rsa KEY test@pc"]
def test_get_admin_key_when_none(authorized_client, ssh_on):
response = authorized_client.get(f"/services/ssh/keys/tester")
assert response.status_code == 200
assert response.json == []
def test_delete_admin_key(authorized_client, root_and_admin_have_keys):
response = authorized_client.delete(
f"/services/ssh/keys/tester", json={"public_key": "ssh-rsa KEY test@pc"}
)
assert response.status_code == 200
assert (
read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")["sshKeys"]
== []
)
def test_add_admin_key(authorized_client, ssh_on):
response = authorized_client.post(
f"/services/ssh/keys/tester", json={"public_key": "ssh-rsa KEY test@pc"}
)
assert response.status_code == 201
assert read_json(ssh_on / "turned_on.json")["sshKeys"] == [
"ssh-rsa KEY test@pc",
]
def test_add_admin_key_one_more(authorized_client, root_and_admin_have_keys):
response = authorized_client.post(
f"/services/ssh/keys/tester", json={"public_key": "ssh-rsa KEY_2 test@pc"}
)
assert response.status_code == 201
assert read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")[
"sshKeys"
] == ["ssh-rsa KEY test@pc", "ssh-rsa KEY_2 test@pc"]
def test_add_existing_admin_key(authorized_client, root_and_admin_have_keys):
response = authorized_client.post(
f"/services/ssh/keys/tester", json={"public_key": "ssh-rsa KEY test@pc"}
)
assert response.status_code == 409
assert read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")[
"sshKeys"
] == [
"ssh-rsa KEY test@pc",
]

View file

@ -0,0 +1,52 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": false
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
},
"resticPassword": "PASS",
"ssh": {
"enable": false,
"passwordAuthentication": false,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
]
}

View file

@ -0,0 +1,52 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": false
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
]
}

View file

@ -0,0 +1,46 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": false
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
},
"resticPassword": "PASS",
"ssh": {
"enable": false,
"passwordAuthentication": true
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow"
}

View file

@ -0,0 +1,46 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": false
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow"
}

View file

@ -0,0 +1,45 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": false
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
},
"resticPassword": "PASS",
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
]
}