Merge pull request 'Decomposition' (#2) from inex/selfprivacy-rest-api:master into master

Reviewed-on: https://git.selfprivacy.org/ilchub/selfprivacy-rest-api/pulls/2
This commit is contained in:
Illia Chub 2021-11-12 11:22:22 +02:00
commit dbb4c10956
22 changed files with 832 additions and 564 deletions

148
.gitignore vendored
View file

@ -1 +1,147 @@
users.nix
users.nix
### Flask ###
instance/*
!instance/.gitignore
.webassets-cache
.env
### Flask.Python Stack ###
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# End of https://www.toptal.com/developers/gitignore/api/flask

557
main.py
View file

@ -1,557 +0,0 @@
#!/usr/bin/env python3
from flask import Flask, jsonify, request, json
from flask_restful import Resource, Api, reqparse
import base64
import pandas as pd
import ast
import subprocess
import os
import fileinput
app = Flask(__name__)
api = Api(app)
@app.route("/systemVersion", methods=["GET"])
def Uname():
uname = subprocess.check_output(["uname", "-arm"])
return jsonify(uname)
@app.route("/getDKIM", methods=["GET"])
def GetDkimKey():
with open("/var/domain") as domainFile:
domain = domainFile.readline()
domain = domain.rstrip("\n")
catProcess = subprocess.Popen(["cat", "/var/dkim/" + domain + ".selector.txt"], stdout=subprocess.PIPE)
dkim = catProcess.communicate()[0]
dkim = base64.b64encode(dkim)
dkim = str(dkim, 'utf-8')
print(dkim)
response = app.response_class(
response=json.dumps(dkim),
status=200,
mimetype='application/json'
)
return response
@app.route("/pythonVersion", methods=["GET"])
def GetPythonVersion():
pythonVersion = subprocess.check_output(["python","--version"])
return jsonify(pythonVersion)
@app.route("/system/configuration/apply", methods=["GET"])
def RebuildSystem():
rebuildResult = subprocess.Popen(["nixos-rebuild","switch"])
rebuildResult.communicate()[0]
return jsonify(
status=rebuildResult.returncode
)
@app.route("/system/configuration/rollback", methods=["GET"])
def RollbackSystem():
rollbackResult = subprocess.Popen(["nixos-rebuild","switch","--rollback"])
rollbackResult.communicate()[0]
return jsonify(rollbackResult.returncode)
@app.route("/system/upgrade", methods=["GET"])
def UpgradeSystem():
upgradeResult = subprocess.Popen(["nixos-rebuild","switch","--upgrade"])
upgradeResult.communicate()[0]
return jsonify(
status=upgradeResult.returncode
)
@app.route("/users/create", methods=["POST"])
def CreateUser():
rawPassword = request.headers.get("X-Password")
hashingCommand = '''
mkpasswd -m sha-512 {0}
'''.format(rawPassword)
passwordHashProcessDescriptor = subprocess.Popen(hashingCommand, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
hashedPassword = passwordHashProcessDescriptor.communicate()[0]
hashedPassword = hashedPassword.decode("ascii")
hashedPassword = hashedPassword.rstrip()
print("[TRACE] {0}".format(hashedPassword))
print("[INFO] Opening /etc/nixos/users.nix...", sep="")
readOnlyFileDescriptor = open("/etc/nixos/users.nix", "r")
print("done")
fileContent = list()
index = int(0)
print("[INFO] Reading file content...", sep="")
while True:
line = readOnlyFileDescriptor.readline()
if not line:
break
else:
fileContent.append(line)
print("[DEBUG] Read line!")
userTemplate = """
#begin
\"{0}\" = {{
isNormalUser = true;
hashedPassword = \"{1}\";
}};
#end
""".format(request.headers.get("X-User"), hashedPassword)
mailUserTemplate = """
\"{0}@{2}\" = {{
hashedPassword =
\"{1}\";
catchAll = [ \"{2}\" ];
sieveScript = ''
require [\"fileinto\", \"mailbox\"];
if header :contains \"Chat-Version\" \"1.0\"
{{
fileinto :create \"DeltaChat\";
stop;
}}
'';
}};""".format(request.headers.get("X-User"), hashedPassword, request.headers.get("X-Domain"))
for line in fileContent:
index += 1
if line.startswith(" #begin"):
print("[DEBUG] Found user configuration snippet match!")
print("[INFO] Writing new user configuration snippet to memory...", sep="")
fileContent.insert(index-1, userTemplate)
print("done")
break
print("[INFO] Writing data from memory to file...", sep="")
readWriteFileDescriptor = open("/etc/nixos/users.nix", "w")
userConfigurationWriteOperationResult = readWriteFileDescriptor.writelines(fileContent)
print("done")
readOnlyFileDescriptor.close()
readWriteFileDescriptor.close()
print("[INFO] Opening /etc/nixos/mailserver/system/mailserver.nix.nix for reading...", sep="")
readOnlyFileDescriptor = open("/etc/nixos/mailserver/system/mailserver.nix")
print("done")
fileContent = list()
index = int(0)
while True:
line = readOnlyFileDescriptor.readline()
if not line:
break
else:
fileContent.append(line)
print("[DEBUG] Read line!")
for line in fileContent:
if line.startswith(" loginAccounts = {"):
print("[DEBUG] Found mailuser configuration snippet match!")
print("[INFO] Writing new user configuration snippet to memory...", sep="")
fileContent.insert(index+1, mailUserTemplate)
print("done")
break
index += 1
readWriteFileDescriptor = open("/etc/nixos/mailserver/system/mailserver.nix", "w")
mailUserConfigurationWriteOperationResult = readWriteFileDescriptor.writelines(fileContent)
return jsonify(
result=0,
descriptor0 = userConfigurationWriteOperationResult,
descriptor1 = mailUserConfigurationWriteOperationResult
)
@app.route("/deleteUser", methods=["DELETE"])
def deleteUser():
user = subprocess.Popen(["userdel",request.headers.get("X-User")])
user.communicate()[0]
return jsonify(user.returncode)
@app.route("/services/status", methods=["GET"])
def GetServiceStatus():
imapService = subprocess.Popen(["systemctl", "status", "dovecot2.service"])
imapService.communicate()[0]
smtpService = subprocess.Popen(["systemctl", "status", "postfix.service"])
smtpService.communicate()[0]
httpService = subprocess.Popen(["systemctl", "status", "nginx.service"])
httpService.communicate()[0]
bitwardenService = subprocess.Popen(["systemctl", "status", "bitwarden_rs.service"])
bitwardenService.communicate()[0]
giteaService = subprocess.Popen(["systemctl", "status", "gitea.service"])
giteaService.communicate()[0]
nextcloudService = subprocess.Popen(["systemctl", "status", "phpfpm-nextcloud.service"])
nextcloudService.communicate()[0]
ocservService = subprocess.Popen(["systemctl", "status", "ocserv.service"])
ocservService.communicate()[0]
pleromaService = subprocess.Popen(["systemctl", "status", "pleroma.service"])
pleromaService.communicate()[0]
return jsonify(
imap=imapService.returncode,
smtp=smtpService.returncode,
http=httpService.returncode,
bitwarden=bitwardenService.returncode,
gitea=giteaService.returncode,
nextcloud=nextcloudService.returncode,
ocserv=ocservService.returncode,
pleroma=pleromaService.returncode
)
@app.route("/decryptDisk", methods=["POST"])
def RequestDiskDecryption():
decryptionCommand = '''
echo -n {0} | cryptsetup luksOpen /dev/sdb decryptedVar'''.format(request.headers.get("X-Decryption-Key"))
decryptionService = subprocess.Popen(decryptionCommand, shell=True, stdout=subprocess.PIPE)
decryptionService.communicate()
return jsonify(
status=decryptionService.returncode
)
@app.route("/services/ssh/enable", methods=["POST"])
def EnableSSH():
readOnlyFileDescriptor = open("/etc/nixos/configuration.nix", "rt")
fileContent = readOnlyFileDescriptor.read()
fileContent = fileContent.replace("enable = false;", "enable = true;")
readOnlyFileDescriptor.close()
readWriteFileDescriptor = open("/etc/nixos/configuration.nix", "wt")
writeOperationDescriptor = readWriteFileDescriptor.write(fileContent)
readWriteFileDescriptor.close()
return jsonify(
status=0,
descriptor=writeOperationDescriptor
)
# Bitwarden
@app.route("/services/bitwarden/enable", methods=["POST"])
def EnableBitwarden():
readOnlyFileDescriptor = open("/etc/nixos/passmgr/bitwarden.nix", "rt")
fileContent = readOnlyFileDescriptor.read()
fileContent = fileContent.replace("enable = false;", "enable = true;")
readOnlyFileDescriptor.close()
readWriteFileDescriptor = open("/etc/nixos/passmgr/bitwarden.nix", "wt")
writeOperationDescriptor = readWriteFileDescriptor.write(fileContent)
readWriteFileDescriptor.close()
return jsonify(
status=0,
descriptor=writeOperationDescriptor
)
@app.route("/services/bitwarden/disable", methods=["POST"])
def DisableBitwarden():
readOnlyFileDescriptor = open("/etc/nixos/passmgr/bitwarden.nix", "rt")
fileContent = readOnlyFileDescriptor.read()
fileContent = fileContent.replace("enable = true;", "enable = false;")
readOnlyFileDescriptor.close()
readWriteFileDescriptor = open("/etc/nixos/passmgr/bitwarden.nix", "wt")
writeOperationDescriptor = readWriteFileDescriptor.write(fileContent)
readWriteFileDescriptor.close()
return jsonify(
status=0,
descriptor=writeOperationDescriptor
)
#Gitea
@app.route("/services/gitea/disable", methods=["POST"])
def DisableGitea():
readOnlyFileDescriptor = open("/etc/nixos/git/gitea.nix", "rt")
fileContent = readOnlyFileDescriptor.read()
fileContent = fileContent.replace("enable = true;", "enable = false;")
readOnlyFileDescriptor.close()
readWriteFileDescriptor = open("/etc/nixos/git/gitea.nix", "wt")
writeOperationDescriptor = readWriteFileDescriptor.write(fileContent)
readWriteFileDescriptor.close()
return jsonify(
status=0,
descriptor=writeOperationDescriptor
)
@app.route("/services/gitea/enable", methods=["POST"])
def EnableGitea():
readOnlyFileDescriptor = open("/etc/nixos/git/gitea.nix", "rt")
fileContent = readOnlyFileDescriptor.read()
fileContent = fileContent.replace("enable = false;", "enable = true;")
readOnlyFileDescriptor.close()
readWriteFileDescriptor = open("/etc/nixos/git/gitea.nix", "wt")
writeOperationDescriptor = readWriteFileDescriptor.write(fileContent)
readWriteFileDescriptor.close()
return jsonify(
status=0,
descriptor=writeOperationDescriptor
)
#Nextcloud
@app.route("/services/nextcloud/disable", methods=["POST"])
def DisableNextcloud():
readOnlyFileDescriptor = open("/etc/nixos/nextcloud/nextcloud.nix", "rt")
fileContent = readOnlyFileDescriptor.read()
fileContent = fileContent.replace("enable = true;", "enable = false;")
readOnlyFileDescriptor.close()
readWriteFileDescriptor = open("/etc/nixos/nextcloud/nextcloud.nix", "wt")
writeOperationDescriptor = readWriteFileDescriptor.write(fileContent)
readWriteFileDescriptor.close()
return jsonify(
status=0,
descriptor=writeOperationDescriptor
)
@app.route("/services/nextcloud/enable", methods=["POST"])
def EnableNextcloud():
readOnlyFileDescriptor = open("/etc/nixos/nextcloud/nextcloud.nix", "rt")
fileContent = readOnlyFileDescriptor.read()
fileContent = fileContent.replace("enable = false;", "enable = true;")
readOnlyFileDescriptor.close()
readWriteFileDescriptor = open("/etc/nixos/nextcloud/nextcloud.nix", "wt")
writeOperationDescriptor = readWriteFileDescriptor.write(fileContent)
readWriteFileDescriptor.close()
return jsonify(
status=0,
descriptor=writeOperationDescriptor
)
#Pleroma
@app.route("/services/pleroma/disable", methods=["POST"])
def DisablePleroma():
readOnlyFileDescriptor = open("/etc/nixos/social/pleroma.nix", "rt")
fileContent = readOnlyFileDescriptor.read()
fileContent = fileContent.replace("enable = true;", "enable = false;")
readOnlyFileDescriptor.close()
readWriteFileDescriptor = open("/etc/nixos/social/pleroma.nix", "wt")
writeOperationDescriptor = readWriteFileDescriptor.write(fileContent)
readWriteFileDescriptor.close()
return jsonify(
status=0,
descriptor=writeOperationDescriptor
)
@app.route("/services/pleroma/enable", methods=["POST"])
def EnablePleroma():
readOnlyFileDescriptor = open("/etc/nixos/social/pleroma.nix", "rt")
fileContent = readOnlyFileDescriptor.read()
fileContent = fileContent.replace("enable = false;", "enable = true;")
readOnlyFileDescriptor.close()
readWriteFileDescriptor = open("/etc/nixos/social/pleroma.nix", "wt")
writeOperationDescriptor = readWriteFileDescriptor.write(fileContent)
readWriteFileDescriptor.close()
return jsonify(
status=0,
descriptor=writeOperationDescriptor
)
#Ocserv
@app.route("/services/ocserv/disable", methods=["POST"])
def DisableOcserv():
readOnlyFileDescriptor = open("/etc/nixos/vpn/ocserv.nix", "rt")
fileContent = readOnlyFileDescriptor.read()
fileContent = fileContent.replace("enable = true;", "enable = false;")
readOnlyFileDescriptor.close()
readWriteFileDescriptor = open("/etc/nixos/vpn/ocserv.nix", "wt")
writeOperationDescriptor = readWriteFileDescriptor.write(fileContent)
readWriteFileDescriptor.close()
return jsonify(
status=0,
descriptor=writeOperationDescriptor
)
@app.route("/services/ocserv/enable", methods=["POST"])
def EnableOcserv():
readOnlyFileDescriptor = open("/etc/nixos/vpn/ocserv.nix", "rt")
fileContent = readOnlyFileDescriptor.read()
fileContent = fileContent.replace("enable = true;", "enable = false;")
readOnlyFileDescriptor.close()
readWriteFileDescriptor = open("/etc/nixos/vpn/ocserv.nix", "wt")
writeOperationDescriptor = readWriteFileDescriptor.write(fileContent)
readWriteFileDescriptor.close()
return jsonify(
status=0,
descriptor=writeOperationDescriptor
)
@app.route("/services/restic/backup/list", methods=["GET"])
def ListAllBackups():
backupListingCommand = '''
restic -r b2:{0}:/sfbackup snapshots --password-file /var/lib/restic/rpass --json
'''.format(request.headers.get("X-Repository-Name"))
backupListingProcessDescriptor = subprocess.Popen(backupListingCommand, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
snapshotsList = backupListingProcessDescriptor.communicate()[0]
return snapshotsList
@app.route("/services/restic/backup/create", methods=["PUT"])
def CreateSingleBackup():
backupCommand = '''
restic -r b2:{0}:/sfbackup --verbose backup /var --password-file /var/lib/restic/rpass
'''.format(request.headers.get("X-Repository-Name"))
backupProcessDescriptor = subprocess.Popen(backupCommand, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
return jsonify(
status=0,
message="Backup creation has started"
)
@app.route("/services/ssh/key/send", methods=["PUT"])
def ReadKey():
requestBody = request.get_json()
publicKey = requestBody.data(["public_key"])
print("[INFO] Opening /etc/nixos/configuration.nix...", sep="")
readOnlyFileDescriptor = open("/etc/nixos/configuration.nix", "r")
print("done")
fileContent = list()
index = int(0)
print("[INFO] Reading file content...", sep="")
while True:
line = readOnlyFileDescriptor.readline()
if not line:
break
else:
fileContent.append(line)
print("[DEBUG] Read line!")
for line in fileContent:
index += 1
if "openssh.authorizedKeys.keys = [" in line:
print("[DEBUG] Found SSH key configuration snippet match!")
print("[INFO] Writing new SSH key", sep="")
fileContent.append(index, "\n \"" + publicKey + "\"")
print("done")
break
print("[INFO] Writing data from memory to file...", sep="")
readWriteFileDescriptor = open("/etc/nixos/configuration.nix", "w")
print("done")
operationResult = readWriteFileDescriptor.writelines(fileContent)
return jsonify(
result=0,
descriptor = operationResult
)
if __name__ == '__main__':
app.run(port=5050, debug=False)

3
pyproject.toml Normal file
View file

@ -0,0 +1,3 @@
[build-system]
requires = ["setuptools", "wheel"]
build-backend = "setuptools.build_meta"

View file

@ -1,6 +1,5 @@
wheel
flask
flask_restful
flask-restful
flask_socketio
pandas
setuptools

View file

26
selfprivacy_api/app.py Normal file
View file

@ -0,0 +1,26 @@
#!/usr/bin/env python3
from flask import Flask
from flask_restful import Api
from selfprivacy_api.resources.users import Users
from selfprivacy_api.resources.common import DecryptDisk
def create_app():
app = Flask(__name__)
api = Api(app)
api.add_resource(Users, "/users")
api.add_resource(DecryptDisk, "/decryptDisk")
from selfprivacy_api.resources.system import api_system
from selfprivacy_api.resources.services import services as api_services
app.register_blueprint(api_system)
app.register_blueprint(api_services)
return app
if __name__ == "__main__":
app = create_app()
app.run(port=5050, debug=False)

View file

View file

@ -0,0 +1,20 @@
#!/usr/bin/env python3
from flask import Flask, jsonify, request, json
from flask_restful import Resource
import subprocess
from selfprivacy_api.utils import get_domain
# Decrypt disk
class DecryptDisk(Resource):
def post(self):
decryptionCommand = """
echo -n {0} | cryptsetup luksOpen /dev/sdb decryptedVar""".format(
request.headers.get("X-Decryption-Key")
)
decryptionService = subprocess.Popen(
decryptionCommand, shell=True, stdout=subprocess.PIPE
)
decryptionService.communicate()
return {"status": decryptionService.returncode}

View file

@ -0,0 +1,17 @@
from flask import Blueprint
from flask_restful import Api
services = Blueprint("services", __name__, url_prefix="/services")
api = Api(services)
from . import (
bitwarden,
gitea,
mailserver,
main,
nextcloud,
ocserv,
pleroma,
restic,
ssh,
)

View file

@ -0,0 +1,43 @@
#!/usr/bin/env python3
from flask_restful import Resource
from selfprivacy_api.resources.services import api
# Enable Bitwarden
class EnableBitwarden(Resource):
def post(self):
readOnlyFileDescriptor = open("/etc/nixos/passmgr/bitwarden.nix", "rt")
fileContent = readOnlyFileDescriptor.read()
fileContent = fileContent.replace("enable = false;", "enable = true;")
readOnlyFileDescriptor.close()
readWriteFileDescriptor = open("/etc/nixos/passmgr/bitwarden.nix", "wt")
writeOperationDescriptor = readWriteFileDescriptor.write(fileContent)
readWriteFileDescriptor.close()
return {
"status": 0,
"descriptor": writeOperationDescriptor,
"message": "Bitwarden enabled",
}
# Disable Bitwarden
class DisableBitwarden(Resource):
def post(self):
readOnlyFileDescriptor = open("/etc/nixos/passmgr/bitwarden.nix", "rt")
fileContent = readOnlyFileDescriptor.read()
fileContent = fileContent.replace("enable = true;", "enable = false;")
readOnlyFileDescriptor.close()
readWriteFileDescriptor = open("/etc/nixos/passmgr/bitwarden.nix", "wt")
writeOperationDescriptor = readWriteFileDescriptor.write(fileContent)
readWriteFileDescriptor.close()
return {
"status": 0,
"descriptor": writeOperationDescriptor,
"message": "Bitwarden disabled",
}
api.add_resource(EnableBitwarden, "/bitwarden/enable")
api.add_resource(DisableBitwarden, "/bitwarden/disable")

View file

@ -0,0 +1,43 @@
#!/usr/bin/env python3
from flask_restful import Resource
from selfprivacy_api.resources.services import api
# Enable Gitea
class EnableGitea(Resource):
def post(self):
readOnlyFileDescriptor = open("/etc/nixos/git/gitea.nix", "rt")
fileContent = readOnlyFileDescriptor.read()
fileContent = fileContent.replace("enable = false;", "enable = true;")
readOnlyFileDescriptor.close()
readWriteFileDescriptor = open("/etc/nixos/git/gitea.nix", "wt")
writeOperationDescriptor = readWriteFileDescriptor.write(fileContent)
readWriteFileDescriptor.close()
return {
"status": 0,
"descriptor": writeOperationDescriptor,
"message": "Gitea enabled",
}
# Disable Gitea
class DisableGitea(Resource):
def post(self):
readOnlyFileDescriptor = open("/etc/nixos/git/gitea.nix", "rt")
fileContent = readOnlyFileDescriptor.read()
fileContent = fileContent.replace("enable = true;", "enable = false;")
readOnlyFileDescriptor.close()
readWriteFileDescriptor = open("/etc/nixos/git/gitea.nix", "wt")
writeOperationDescriptor = readWriteFileDescriptor.write(fileContent)
readWriteFileDescriptor.close()
return {
"status": 0,
"descriptor": writeOperationDescriptor,
"message": "Gitea disabled",
}
api.add_resource(EnableGitea, "/gitea/enable")
api.add_resource(DisableGitea, "/gitea/disable")

View file

@ -0,0 +1,23 @@
#!/usr/bin/env python3
from flask_restful import Resource
import base64
import subprocess
from selfprivacy_api.resources.services import api
from selfprivacy_api.utils import get_domain
# Get DKIM key from file
class DKIMKey(Resource):
def get(self):
domain = get_domain()
catProcess = subprocess.Popen(
["cat", "/var/dkim/" + domain + ".selector.txt"], stdout=subprocess.PIPE
)
dkim = catProcess.communicate()[0]
dkim = base64.b64encode(dkim)
dkim = str(dkim, "utf-8")
return dkim
api.add_resource(DKIMKey, "/mailserver/dkim")

View file

@ -0,0 +1,43 @@
#!/usr/bin/env python3
from flask_restful import Resource, Api
import subprocess
from . import api
# Get service status
class ServiceStatus(Resource):
def get(self):
imapService = subprocess.Popen(["systemctl", "status", "dovecot2.service"])
imapService.communicate()[0]
smtpService = subprocess.Popen(["systemctl", "status", "postfix.service"])
smtpService.communicate()[0]
httpService = subprocess.Popen(["systemctl", "status", "nginx.service"])
httpService.communicate()[0]
bitwardenService = subprocess.Popen(
["systemctl", "status", "bitwarden_rs.service"]
)
bitwardenService.communicate()[0]
giteaService = subprocess.Popen(["systemctl", "status", "gitea.service"])
giteaService.communicate()[0]
nextcloudService = subprocess.Popen(
["systemctl", "status", "phpfpm-nextcloud.service"]
)
nextcloudService.communicate()[0]
ocservService = subprocess.Popen(["systemctl", "status", "ocserv.service"])
ocservService.communicate()[0]
pleromaService = subprocess.Popen(["systemctl", "status", "pleroma.service"])
pleromaService.communicate()[0]
return {
"imap": imapService.returncode,
"smtp": smtpService.returncode,
"http": httpService.returncode,
"bitwarden": bitwardenService.returncode,
"gitea": giteaService.returncode,
"nextcloud": nextcloudService.returncode,
"ocserv": ocservService.returncode,
"pleroma": pleromaService.returncode,
}
api.add_resource(ServiceStatus, "/status")

View file

@ -0,0 +1,43 @@
#!/usr/bin/env python3
from flask_restful import Resource
from selfprivacy_api.resources.services import api
# Enable Nextcloud
class EnableNextcloud(Resource):
def post(self):
readOnlyFileDescriptor = open("/etc/nixos/nextcloud/nextcloud.nix", "rt")
fileContent = readOnlyFileDescriptor.read()
fileContent = fileContent.replace("enable = false;", "enable = true;")
readOnlyFileDescriptor.close()
readWriteFileDescriptor = open("/etc/nixos/nextcloud/nextcloud.nix", "wt")
writeOperationDescriptor = readWriteFileDescriptor.write(fileContent)
readWriteFileDescriptor.close()
return {
"status": 0,
"descriptor": writeOperationDescriptor,
"message": "Nextcloud enabled",
}
# Disable Nextcloud
class DisableNextcloud(Resource):
def post(self):
readOnlyFileDescriptor = open("/etc/nixos/nextcloud/nextcloud.nix", "rt")
fileContent = readOnlyFileDescriptor.read()
fileContent = fileContent.replace("enable = true;", "enable = false;")
readOnlyFileDescriptor.close()
readWriteFileDescriptor = open("/etc/nixos/nextcloud/nextcloud.nix", "wt")
writeOperationDescriptor = readWriteFileDescriptor.write(fileContent)
readWriteFileDescriptor.close()
return {
"status": 0,
"descriptor": writeOperationDescriptor,
"message": "Nextcloud disabled",
}
api.add_resource(EnableNextcloud, "/nextcloud/enable")
api.add_resource(DisableNextcloud, "/nextcloud/disable")

View file

@ -0,0 +1,43 @@
#!/usr/bin/env python3
from flask_restful import Resource
from selfprivacy_api.resources.services import api
# Enable OpenConnect VPN server
class EnableOcserv(Resource):
def post(self):
readOnlyFileDescriptor = open("/etc/nixos/vpn/ocserv.nix", "rt")
fileContent = readOnlyFileDescriptor.read()
fileContent = fileContent.replace("enable = false;", "enable = true;")
readOnlyFileDescriptor.close()
readWriteFileDescriptor = open("/etc/nixos/vpn/ocserv.nix", "wt")
writeOperationDescriptor = readWriteFileDescriptor.write(fileContent)
readWriteFileDescriptor.close()
return {
"status": 0,
"descriptor": writeOperationDescriptor,
"message": "OpenConnect VPN server enabled",
}
# Disable OpenConnect VPN server
class DisableOcserv(Resource):
def post(self):
readOnlyFileDescriptor = open("/etc/nixos/vpn/ocserv.nix", "rt")
fileContent = readOnlyFileDescriptor.read()
fileContent = fileContent.replace("enable = true;", "enable = false;")
readOnlyFileDescriptor.close()
readWriteFileDescriptor = open("/etc/nixos/vpn/ocserv.nix", "wt")
writeOperationDescriptor = readWriteFileDescriptor.write(fileContent)
readWriteFileDescriptor.close()
return {
"status": 0,
"descriptor": writeOperationDescriptor,
"message": "OpenConnect VPN server disabled",
}
api.add_resource(EnableOcserv, "/ocserv/enable")
api.add_resource(DisableOcserv, "/ocserv/disable")

View file

@ -0,0 +1,43 @@
#!/usr/bin/env python3
from flask_restful import Resource
from selfprivacy_api.resources.services import api
# Enable Pleroma
class EnablePleroma(Resource):
def post(self):
readOnlyFileDescriptor = open("/etc/nixos/social/pleroma.nix", "rt")
fileContent = readOnlyFileDescriptor.read()
fileContent = fileContent.replace("enable = false;", "enable = true;")
readOnlyFileDescriptor.close()
readWriteFileDescriptor = open("/etc/nixos/social/pleroma.nix", "wt")
writeOperationDescriptor = readWriteFileDescriptor.write(fileContent)
readWriteFileDescriptor.close()
return {
"status": 0,
"descriptor": writeOperationDescriptor,
"message": "Pleroma enabled",
}
# Disable Pleroma
class DisablePleroma(Resource):
def post(self):
readOnlyFileDescriptor = open("/etc/nixos/social/pleroma.nix", "rt")
fileContent = readOnlyFileDescriptor.read()
fileContent = fileContent.replace("enable = true;", "enable = false;")
readOnlyFileDescriptor.close()
readWriteFileDescriptor = open("/etc/nixos/social/pleroma.nix", "wt")
writeOperationDescriptor = readWriteFileDescriptor.write(fileContent)
readWriteFileDescriptor.close()
return {
"status": 0,
"descriptor": writeOperationDescriptor,
"message": "Pleroma disabled",
}
api.add_resource(EnablePleroma, "/pleroma/enable")
api.add_resource(DisablePleroma, "/pleroma/disable")

View file

@ -0,0 +1,51 @@
#!/usr/bin/env python3
from flask import request
from flask_restful import Resource
import subprocess
from selfprivacy_api.resources.services import api
# List all restic backups
class ListAllBackups(Resource):
def get(self):
backupListingCommand = """
restic -r b2:{0}:/sfbackup snapshots --password-file /var/lib/restic/rpass --json
""".format(
request.headers.get("X-Repository-Name")
)
backupListingProcessDescriptor = subprocess.Popen(
backupListingCommand,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
snapshotsList = backupListingProcessDescriptor.communicate()[0]
return snapshotsList.decode("utf-8")
# Create a new restic backup
class CreateBackup(Resource):
def put(self):
backupCommand = """
restic -r b2:{0}:/sfbackup --verbose backup /var --password-file /var/lib/restic/rpass
""".format(
request.headers.get("X-Repository-Name")
)
backupProcessDescriptor = subprocess.Popen(
backupCommand, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
commandOutput = backupProcessDescriptor.communicate()[0]
return {
"status": 0,
"message": "Backup creation has started",
}
api.add_resource(ListAllBackups, "/restic/backup/list")
api.add_resource(CreateBackup, "/restic/backup/create")

View file

@ -0,0 +1,75 @@
#!/usr/bin/env python3
from flask import Blueprint, request
from flask_restful import Resource, reqparse
from selfprivacy_api.resources.services import api
# Enable SSH
class EnableSSH(Resource):
def post(self):
readOnlyFileDescriptor = open("/etc/nixos/configuration.nix", "rt")
fileContent = readOnlyFileDescriptor.read()
fileContent = fileContent.replace("enable = false;", "enable = true;")
readOnlyFileDescriptor.close()
readWriteFileDescriptor = open("/etc/nixos/configuration.nix", "wt")
writeOperationDescriptor = readWriteFileDescriptor.write(fileContent)
readWriteFileDescriptor.close()
return {
"status": 0,
"descriptor": writeOperationDescriptor,
"message": "SSH enabled",
}
# Write new SSH key
class WriteSSHKey(Resource):
def put(self):
parser = reqparse.RequestParser()
parser.add_argument(
"public_key", type=str, required=True, help="Key cannot be blank!"
)
args = parser.parse_args()
publicKey = args["public_key"]
print("[INFO] Opening /etc/nixos/configuration.nix...", sep="")
readOnlyFileDescriptor = open("/etc/nixos/configuration.nix", "r")
print("done")
fileContent = list()
index = int(0)
print("[INFO] Reading file content...", sep="")
while True:
line = readOnlyFileDescriptor.readline()
if not line:
break
else:
fileContent.append(line)
print("[DEBUG] Read line!")
for line in fileContent:
index += 1
if "openssh.authorizedKeys.keys = [" in line:
print("[DEBUG] Found SSH key configuration snippet match!")
print("[INFO] Writing new SSH key", sep="")
fileContent.append('\n "' + publicKey + '"')
print("done")
break
print("[INFO] Writing data from memory to file...", sep="")
readWriteFileDescriptor = open("/etc/nixos/configuration.nix", "w")
print("done")
operationResult = readWriteFileDescriptor.writelines(fileContent)
return {
"status": 0,
"descriptor": operationResult,
"message": "New SSH key successfully written to /etc/nixos/configuration.nix",
}
api.add_resource(EnableSSH, "/ssh/enable")
api.add_resource(WriteSSHKey, "/ssh/key/send")

View file

@ -0,0 +1,53 @@
#!/usr/bin/env python3
from flask import Blueprint
from flask_restful import Resource, Api
import subprocess
api_system = Blueprint("system", __name__, url_prefix="/system")
api = Api(api_system)
# Rebuild NixOS
class RebuildSystem(Resource):
def get(self):
rebuildResult = subprocess.Popen(["nixos-rebuild", "switch"])
rebuildResult.communicate()[0]
return rebuildResult.returncode
# Rollback NixOS
class RollbackSystem(Resource):
def get(self):
rollbackResult = subprocess.Popen(["nixos-rebuild", "switch", "--rollback"])
rollbackResult.communicate()[0]
return rollbackResult.returncode
# Upgrade NixOS
class UpgradeSystem(Resource):
def get(self):
upgradeResult = subprocess.Popen(["nixos-rebuild", "switch", "--upgrade"])
upgradeResult.communicate()[0]
return upgradeResult.returncode
# Get system version from uname
class SystemVersion(Resource):
def get(self):
return {
"system_version": subprocess.check_output(["uname", "-a"])
.decode("utf-8")
.strip()
}
# Get python version
class PythonVersion(Resource):
def get(self):
return subprocess.check_output(["python", "-V"]).decode("utf-8").strip()
api.add_resource(RebuildSystem, "/configuration/apply")
api.add_resource(RollbackSystem, "/configuration/rollback")
api.add_resource(UpgradeSystem, "/upgrade")
api.add_resource(SystemVersion, "/version")
api.add_resource(PythonVersion, "/pythonVersion")

View file

@ -0,0 +1,146 @@
#!/usr/bin/env python3
from flask import Blueprint, jsonify, request
from flask_restful import Resource, Api
import subprocess
from selfprivacy_api import resources
api_users = Blueprint("api_users", __name__)
api = Api(api_users)
# Create a new user
class Users(Resource):
def post(self):
rawPassword = request.headers.get("X-Password")
hashingCommand = """
mkpasswd -m sha-512 {0}
""".format(
rawPassword
)
passwordHashProcessDescriptor = subprocess.Popen(
hashingCommand, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
hashedPassword = passwordHashProcessDescriptor.communicate()[0]
hashedPassword = hashedPassword.decode("ascii")
hashedPassword = hashedPassword.rstrip()
print("[TRACE] {0}".format(hashedPassword))
print("[INFO] Opening /etc/nixos/users.nix...", sep="")
readOnlyFileDescriptor = open("/etc/nixos/users.nix", "r")
print("done")
fileContent = list()
index = int(0)
print("[INFO] Reading file content...", sep="")
while True:
line = readOnlyFileDescriptor.readline()
if not line:
break
else:
fileContent.append(line)
print("[DEBUG] Read line!")
userTemplate = """
#begin
\"{0}\" = {{
isNormalUser = true;
hashedPassword = \"{1}\";
}};
#end
""".format(
request.headers.get("X-User"), hashedPassword
)
mailUserTemplate = """
\"{0}@{2}\" = {{
hashedPassword =
\"{1}\";
catchAll = [ \"{2}\" ];
sieveScript = ''
require [\"fileinto\", \"mailbox\"];
if header :contains \"Chat-Version\" \"1.0\"
{{
fileinto :create \"DeltaChat\";
stop;
}}
'';
}};""".format(
request.headers.get("X-User"),
hashedPassword,
request.headers.get("X-Domain"),
)
for line in fileContent:
index += 1
if line.startswith(" #begin"):
print("[DEBUG] Found user configuration snippet match!")
print(
"[INFO] Writing new user configuration snippet to memory...", sep=""
)
fileContent.insert(index - 1, userTemplate)
print("done")
break
print("[INFO] Writing data from memory to file...", sep="")
readWriteFileDescriptor = open("/etc/nixos/users.nix", "w")
userConfigurationWriteOperationResult = readWriteFileDescriptor.writelines(
fileContent
)
print("done")
readOnlyFileDescriptor.close()
readWriteFileDescriptor.close()
print(
"[INFO] Opening /etc/nixos/mailserver/system/mailserver.nix.nix for reading...",
sep="",
)
readOnlyFileDescriptor = open("/etc/nixos/mailserver/system/mailserver.nix")
print("done")
fileContent = list()
index = int(0)
while True:
line = readOnlyFileDescriptor.readline()
if not line:
break
else:
fileContent.append(line)
print("[DEBUG] Read line!")
for line in fileContent:
if line.startswith(" loginAccounts = {"):
print("[DEBUG] Found mailuser configuration snippet match!")
print(
"[INFO] Writing new user configuration snippet to memory...", sep=""
)
fileContent.insert(index + 1, mailUserTemplate)
print("done")
break
index += 1
readWriteFileDescriptor = open(
"/etc/nixos/mailserver/system/mailserver.nix", "w"
)
mailUserConfigurationWriteOperationResult = readWriteFileDescriptor.writelines(
fileContent
)
return {
"result": 0,
"descriptor0": userConfigurationWriteOperationResult,
"descriptor1": mailUserConfigurationWriteOperationResult,
}
def delete(self):
user = subprocess.Popen(["userdel", request.headers.get("X-User")])
user.communicate()[0]
return user.returncode

7
selfprivacy_api/utils.py Normal file
View file

@ -0,0 +1,7 @@
#!/usr/bin/env python3
# Get domain from /var/domain without trailing new line
def get_domain():
with open("/var/domain", "r") as f:
domain = f.readline().rstrip()
return domain

View file

@ -1,7 +1,8 @@
from distutils.core import setup
from setuptools import setup, find_packages
setup(
name='selfprivacy-api',
version='1.0.1',
scripts=['main.py',],
name='selfprivacy_api',
version='1.1.0',
packages=find_packages(),
scripts=['selfprivacy_api/app.py',],
)