mirror of
https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api.git
synced 2024-11-17 08:02:36 +00:00
554 lines
16 KiB
Python
Executable file
554 lines
16 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
from flask import Flask, jsonify, request, json
|
|
from flask_restful import Resource, Api, reqparse
|
|
import base64
|
|
import pandas as pd
|
|
import ast
|
|
import subprocess
|
|
import os
|
|
import fileinput
|
|
|
|
|
|
app = Flask(__name__)
|
|
api = Api(app)
|
|
|
|
|
|
@app.route("/systemVersion", methods=["GET"])
|
|
def Uname():
|
|
uname = subprocess.check_output(["uname", "-arm"])
|
|
return jsonify(uname)
|
|
|
|
|
|
@app.route("/getDKIM", methods=["GET"])
|
|
def GetDkimKey():
|
|
with open("/var/domain") as domainFile:
|
|
domain = domainFile.readline()
|
|
domain = domain.rstrip("\n")
|
|
catProcess = subprocess.Popen(["cat", "/var/dkim/" + domain + ".selector.txt"], stdout=subprocess.PIPE)
|
|
dkim = catProcess.communicate()[0]
|
|
dkim = base64.b64encode(dkim)
|
|
dkim = str(dkim, 'utf-8')
|
|
print(dkim)
|
|
response = app.response_class(
|
|
response=json.dumps(dkim),
|
|
status=200,
|
|
mimetype='application/json'
|
|
)
|
|
return response
|
|
|
|
|
|
@app.route("/pythonVersion", methods=["GET"])
|
|
def GetPythonVersion():
|
|
pythonVersion = subprocess.check_output(["python","--version"])
|
|
return jsonify(pythonVersion)
|
|
|
|
|
|
@app.route("/system/configuration/apply", methods=["GET"])
|
|
def RebuildSystem():
|
|
rebuildResult = subprocess.Popen(["nixos-rebuild","switch"])
|
|
rebuildResult.communicate()[0]
|
|
return jsonify(
|
|
status=rebuildResult.returncode
|
|
)
|
|
|
|
|
|
@app.route("/system/configuration/rollback", methods=["GET"])
|
|
def RollbackSystem():
|
|
rollbackResult = subprocess.Popen(["nixos-rebuild","switch","--rollback"])
|
|
rollbackResult.communicate()[0]
|
|
return jsonify(rollbackResult.returncode)
|
|
|
|
|
|
@app.route("/system/upgrade", methods=["GET"])
|
|
def UpgradeSystem():
|
|
upgradeResult = subprocess.Popen(["nixos-rebuild","switch","--upgrade"])
|
|
upgradeResult.communicate()[0]
|
|
return jsonify(
|
|
status=upgradeResult.returncode
|
|
)
|
|
|
|
|
|
@app.route("/users/create", methods=["POST"])
|
|
def CreateUser():
|
|
|
|
rawPassword = request.headers.get("X-Password")
|
|
hashingCommand = '''
|
|
mkpasswd -m sha-512 {0}
|
|
'''.format(rawPassword)
|
|
passwordHashProcessDescriptor = subprocess.Popen(hashingCommand, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
|
|
hashedPassword = passwordHashProcessDescriptor.communicate()[0]
|
|
hashedPassword = hashedPassword.decode("ascii")
|
|
hashedPassword = hashedPassword.rstrip()
|
|
|
|
print("[TRACE] {0}".format(hashedPassword))
|
|
|
|
print("[INFO] Opening /etc/nixos/users.nix...", sep="")
|
|
readOnlyFileDescriptor = open("/etc/nixos/users.nix", "r")
|
|
print("done")
|
|
fileContent = list()
|
|
index = int(0)
|
|
|
|
print("[INFO] Reading file content...", sep="")
|
|
|
|
while True:
|
|
line = readOnlyFileDescriptor.readline()
|
|
|
|
if not line:
|
|
break
|
|
else:
|
|
fileContent.append(line)
|
|
print("[DEBUG] Read line!")
|
|
|
|
|
|
userTemplate = """
|
|
|
|
#begin
|
|
\"{0}\" = {{
|
|
isNormalUser = true;
|
|
hashedPassword = \"{1}\";
|
|
}};
|
|
#end
|
|
""".format(request.headers.get("X-User"), hashedPassword)
|
|
|
|
mailUserTemplate = """
|
|
\"{0}@{2}\" = {{
|
|
hashedPassword =
|
|
\"{1}\";
|
|
catchAll = [ \"{2}\" ];
|
|
|
|
sieveScript = ''
|
|
require [\"fileinto\", \"mailbox\"];
|
|
if header :contains \"Chat-Version\" \"1.0\"
|
|
{{
|
|
fileinto :create \"DeltaChat\";
|
|
stop;
|
|
}}
|
|
'';
|
|
}};""".format(request.headers.get("X-User"), hashedPassword, request.headers.get("X-Domain"))
|
|
|
|
for line in fileContent:
|
|
index += 1
|
|
if line.startswith(" #begin"):
|
|
print("[DEBUG] Found user configuration snippet match!")
|
|
print("[INFO] Writing new user configuration snippet to memory...", sep="")
|
|
fileContent.insert(index-1, userTemplate)
|
|
print("done")
|
|
break
|
|
|
|
print("[INFO] Writing data from memory to file...", sep="")
|
|
readWriteFileDescriptor = open("/etc/nixos/users.nix", "w")
|
|
userConfigurationWriteOperationResult = readWriteFileDescriptor.writelines(fileContent)
|
|
print("done")
|
|
|
|
readOnlyFileDescriptor.close()
|
|
readWriteFileDescriptor.close()
|
|
|
|
print("[INFO] Opening /etc/nixos/mailserver/system/mailserver.nix.nix for reading...", sep="")
|
|
readOnlyFileDescriptor = open("/etc/nixos/mailserver/system/mailserver.nix")
|
|
print("done")
|
|
|
|
fileContent = list()
|
|
index = int(0)
|
|
|
|
while True:
|
|
line = readOnlyFileDescriptor.readline()
|
|
|
|
if not line:
|
|
break
|
|
else:
|
|
fileContent.append(line)
|
|
print("[DEBUG] Read line!")
|
|
|
|
for line in fileContent:
|
|
if line.startswith(" loginAccounts = {"):
|
|
print("[DEBUG] Found mailuser configuration snippet match!")
|
|
print("[INFO] Writing new user configuration snippet to memory...", sep="")
|
|
fileContent.insert(index+1, mailUserTemplate)
|
|
print("done")
|
|
break
|
|
index += 1
|
|
|
|
readWriteFileDescriptor = open("/etc/nixos/mailserver/system/mailserver.nix", "w")
|
|
|
|
mailUserConfigurationWriteOperationResult = readWriteFileDescriptor.writelines(fileContent)
|
|
|
|
return jsonify(
|
|
result=0,
|
|
descriptor0 = userConfigurationWriteOperationResult,
|
|
descriptor1 = mailUserConfigurationWriteOperationResult
|
|
)
|
|
|
|
|
|
@app.route("/deleteUser", methods=["DELETE"])
|
|
def deleteUser():
|
|
user = subprocess.Popen(["userdel",request.headers.get("X-User")])
|
|
user.communicate()[0]
|
|
return jsonify(user.returncode)
|
|
|
|
|
|
@app.route("/services/status", methods=["GET"])
|
|
|
|
def GetServiceStatus():
|
|
imapService = subprocess.Popen(["systemctl", "status", "dovecot2.service"])
|
|
imapService.communicate()[0]
|
|
smtpService = subprocess.Popen(["systemctl", "status", "postfix.service"])
|
|
smtpService.communicate()[0]
|
|
httpService = subprocess.Popen(["systemctl", "status", "nginx.service"])
|
|
httpService.communicate()[0]
|
|
bitwardenService = subprocess.Popen(["systemctl", "status", "bitwarden_rs.service"])
|
|
bitwardenService.communicate()[0]
|
|
giteaService = subprocess.Popen(["systemctl", "status", "gitea.service"])
|
|
giteaService.communicate()[0]
|
|
nextcloudService = subprocess.Popen(["systemctl", "status", "phpfpm-nextcloud.service"])
|
|
nextcloudService.communicate()[0]
|
|
ocservService = subprocess.Popen(["systemctl", "status", "ocserv.service"])
|
|
ocservService.communicate()[0]
|
|
pleromaService = subprocess.Popen(["systemctl", "status", "pleroma.service"])
|
|
pleromaService.communicate()[0]
|
|
|
|
return jsonify(
|
|
imap=imapService.returncode,
|
|
smtp=smtpService.returncode,
|
|
http=httpService.returncode,
|
|
bitwarden=bitwardenService.returncode,
|
|
gitea=giteaService.returncode,
|
|
nextcloud=nextcloudService.returncode,
|
|
ocserv=ocservService.returncode,
|
|
pleroma=pleromaService.returncode
|
|
)
|
|
|
|
|
|
@app.route("/decryptDisk", methods=["POST"])
|
|
def RequestDiskDecryption():
|
|
|
|
decryptionCommand = '''
|
|
echo -n {0} | cryptsetup luksOpen /dev/sdb decryptedVar'''.format(request.headers.get("X-Decryption-Key"))
|
|
|
|
decryptionService = subprocess.Popen(decryptionCommand, shell=True, stdout=subprocess.PIPE)
|
|
decryptionService.communicate()
|
|
return jsonify(
|
|
status=decryptionService.returncode
|
|
)
|
|
|
|
|
|
@app.route("/services/ssh/enable", methods=["POST"])
|
|
|
|
def EnableSSH():
|
|
readOnlyFileDescriptor = open("/etc/nixos/configuration.nix", "rt")
|
|
|
|
|
|
fileContent = readOnlyFileDescriptor.read()
|
|
|
|
fileContent = fileContent.replace("enable = false;", "enable = true;")
|
|
readOnlyFileDescriptor.close()
|
|
|
|
readWriteFileDescriptor = open("/etc/nixos/configuration.nix", "wt")
|
|
|
|
writeOperationDescriptor = readWriteFileDescriptor.write(fileContent)
|
|
readWriteFileDescriptor.close()
|
|
|
|
return jsonify(
|
|
status=0,
|
|
descriptor=writeOperationDescriptor
|
|
|
|
)
|
|
|
|
# Bitwarden
|
|
|
|
@app.route("/services/bitwarden/enable", methods=["POST"])
|
|
|
|
def EnableBitwarden():
|
|
readOnlyFileDescriptor = open("/etc/nixos/passmgr/bitwarden.nix", "rt")
|
|
|
|
|
|
fileContent = readOnlyFileDescriptor.read()
|
|
|
|
fileContent = fileContent.replace("enable = false;", "enable = true;")
|
|
readOnlyFileDescriptor.close()
|
|
|
|
readWriteFileDescriptor = open("/etc/nixos/passmgr/bitwarden.nix", "wt")
|
|
|
|
writeOperationDescriptor = readWriteFileDescriptor.write(fileContent)
|
|
readWriteFileDescriptor.close()
|
|
|
|
return jsonify(
|
|
status=0,
|
|
descriptor=writeOperationDescriptor
|
|
)
|
|
|
|
@app.route("/services/bitwarden/disable", methods=["POST"])
|
|
|
|
def DisableBitwarden():
|
|
|
|
readOnlyFileDescriptor = open("/etc/nixos/passmgr/bitwarden.nix", "rt")
|
|
|
|
|
|
fileContent = readOnlyFileDescriptor.read()
|
|
|
|
fileContent = fileContent.replace("enable = true;", "enable = false;")
|
|
readOnlyFileDescriptor.close()
|
|
|
|
readWriteFileDescriptor = open("/etc/nixos/passmgr/bitwarden.nix", "wt")
|
|
|
|
writeOperationDescriptor = readWriteFileDescriptor.write(fileContent)
|
|
readWriteFileDescriptor.close()
|
|
|
|
return jsonify(
|
|
status=0,
|
|
descriptor=writeOperationDescriptor
|
|
)
|
|
|
|
|
|
#Gitea
|
|
|
|
@app.route("/services/gitea/disable", methods=["POST"])
|
|
|
|
def DisableGitea():
|
|
readOnlyFileDescriptor = open("/etc/nixos/git/gitea.nix", "rt")
|
|
|
|
|
|
fileContent = readOnlyFileDescriptor.read()
|
|
|
|
fileContent = fileContent.replace("enable = true;", "enable = false;")
|
|
readOnlyFileDescriptor.close()
|
|
|
|
readWriteFileDescriptor = open("/etc/nixos/git/gitea.nix", "wt")
|
|
|
|
writeOperationDescriptor = readWriteFileDescriptor.write(fileContent)
|
|
readWriteFileDescriptor.close()
|
|
|
|
return jsonify(
|
|
status=0,
|
|
descriptor=writeOperationDescriptor
|
|
)
|
|
|
|
@app.route("/services/gitea/enable", methods=["POST"])
|
|
|
|
def EnableGitea():
|
|
readOnlyFileDescriptor = open("/etc/nixos/git/gitea.nix", "rt")
|
|
|
|
|
|
fileContent = readOnlyFileDescriptor.read()
|
|
|
|
fileContent = fileContent.replace("enable = false;", "enable = true;")
|
|
readOnlyFileDescriptor.close()
|
|
|
|
readWriteFileDescriptor = open("/etc/nixos/git/gitea.nix", "wt")
|
|
|
|
writeOperationDescriptor = readWriteFileDescriptor.write(fileContent)
|
|
readWriteFileDescriptor.close()
|
|
|
|
return jsonify(
|
|
status=0,
|
|
descriptor=writeOperationDescriptor
|
|
)
|
|
|
|
#Nextcloud
|
|
|
|
@app.route("/services/nextcloud/disable", methods=["POST"])
|
|
|
|
def DisableNextcloud():
|
|
readOnlyFileDescriptor = open("/etc/nixos/nextcloud/nextcloud.nix", "rt")
|
|
|
|
|
|
fileContent = readOnlyFileDescriptor.read()
|
|
|
|
fileContent = fileContent.replace("enable = true;", "enable = false;")
|
|
readOnlyFileDescriptor.close()
|
|
|
|
readWriteFileDescriptor = open("/etc/nixos/nextcloud/nextcloud.nix", "wt")
|
|
|
|
writeOperationDescriptor = readWriteFileDescriptor.write(fileContent)
|
|
readWriteFileDescriptor.close()
|
|
|
|
return jsonify(
|
|
status=0,
|
|
descriptor=writeOperationDescriptor
|
|
)
|
|
|
|
@app.route("/services/nextcloud/enable", methods=["POST"])
|
|
|
|
def EnableNextcloud():
|
|
readOnlyFileDescriptor = open("/etc/nixos/nextcloud/nextcloud.nix", "rt")
|
|
|
|
|
|
fileContent = readOnlyFileDescriptor.read()
|
|
|
|
fileContent = fileContent.replace("enable = false;", "enable = true;")
|
|
readOnlyFileDescriptor.close()
|
|
|
|
readWriteFileDescriptor = open("/etc/nixos/nextcloud/nextcloud.nix", "wt")
|
|
|
|
writeOperationDescriptor = readWriteFileDescriptor.write(fileContent)
|
|
readWriteFileDescriptor.close()
|
|
|
|
return jsonify(
|
|
status=0,
|
|
descriptor=writeOperationDescriptor
|
|
)
|
|
|
|
#Pleroma
|
|
|
|
@app.route("/services/pleroma/disable", methods=["POST"])
|
|
|
|
def DisablePleroma():
|
|
readOnlyFileDescriptor = open("/etc/nixos/social/pleroma.nix", "rt")
|
|
|
|
|
|
fileContent = readOnlyFileDescriptor.read()
|
|
|
|
fileContent = fileContent.replace("enable = true;", "enable = false;")
|
|
readOnlyFileDescriptor.close()
|
|
|
|
readWriteFileDescriptor = open("/etc/nixos/social/pleroma.nix", "wt")
|
|
|
|
writeOperationDescriptor = readWriteFileDescriptor.write(fileContent)
|
|
readWriteFileDescriptor.close()
|
|
|
|
return jsonify(
|
|
status=0,
|
|
descriptor=writeOperationDescriptor
|
|
)
|
|
|
|
@app.route("/services/pleroma/enable", methods=["POST"])
|
|
|
|
def EnablePleroma():
|
|
readOnlyFileDescriptor = open("/etc/nixos/social/pleroma.nix", "rt")
|
|
|
|
|
|
fileContent = readOnlyFileDescriptor.read()
|
|
|
|
fileContent = fileContent.replace("enable = false;", "enable = true;")
|
|
readOnlyFileDescriptor.close()
|
|
|
|
readWriteFileDescriptor = open("/etc/nixos/social/pleroma.nix", "wt")
|
|
|
|
writeOperationDescriptor = readWriteFileDescriptor.write(fileContent)
|
|
readWriteFileDescriptor.close()
|
|
|
|
return jsonify(
|
|
status=0,
|
|
descriptor=writeOperationDescriptor
|
|
)
|
|
|
|
#Ocserv
|
|
|
|
@app.route("/services/ocserv/disable", methods=["POST"])
|
|
|
|
def DisableOcserv():
|
|
readOnlyFileDescriptor = open("/etc/nixos/vpn/ocserv.nix", "rt")
|
|
|
|
|
|
fileContent = readOnlyFileDescriptor.read()
|
|
|
|
fileContent = fileContent.replace("enable = true;", "enable = false;")
|
|
readOnlyFileDescriptor.close()
|
|
|
|
readWriteFileDescriptor = open("/etc/nixos/vpn/ocserv.nix", "wt")
|
|
|
|
writeOperationDescriptor = readWriteFileDescriptor.write(fileContent)
|
|
readWriteFileDescriptor.close()
|
|
|
|
return jsonify(
|
|
status=0,
|
|
descriptor=writeOperationDescriptor
|
|
)
|
|
|
|
@app.route("/services/ocserv/enable", methods=["POST"])
|
|
|
|
def EnableOcserv():
|
|
readOnlyFileDescriptor = open("/etc/nixos/vpn/ocserv.nix", "rt")
|
|
|
|
|
|
fileContent = readOnlyFileDescriptor.read()
|
|
|
|
fileContent = fileContent.replace("enable = true;", "enable = false;")
|
|
readOnlyFileDescriptor.close()
|
|
|
|
readWriteFileDescriptor = open("/etc/nixos/vpn/ocserv.nix", "wt")
|
|
|
|
writeOperationDescriptor = readWriteFileDescriptor.write(fileContent)
|
|
readWriteFileDescriptor.close()
|
|
|
|
return jsonify(
|
|
status=0,
|
|
descriptor=writeOperationDescriptor
|
|
)
|
|
|
|
@app.route("/services/restic/backup/list", methods=["GET"])
|
|
|
|
def ListAllBackups():
|
|
backupListingProcessDescriptor = subprocess.Popen(["restic", "-r", "b2:" +
|
|
request.headers.get("X-Repository-Name") + ":/sfbackup",
|
|
"snapshots", "list", "--password-file", "/var/lib/restic/rpass", "--json"
|
|
], shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
|
|
|
|
snapshotsList = backupListingProcessDescriptor.communicate()[0]
|
|
|
|
return snapshotsList
|
|
|
|
|
|
|
|
|
|
@app.route("/services/restic/backup/create", methods=["PUT"])
|
|
|
|
def CreateSingleBackup():
|
|
|
|
backupCommand = '''
|
|
restic -r b2:{0}:/sfbackup --verbose backup /var --password-file /var/lib/restic/rpass
|
|
'''.format(request.headers.get("X-Repository-Name"))
|
|
|
|
backupProcessDescriptor = subprocess.Popen(backupCommand, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
|
|
|
|
commandOutput=backupProcessDescriptor.communicate()[0]
|
|
|
|
return commandOutput
|
|
|
|
@app.route("/services/ssh/key/send", methods=["PUT"])
|
|
|
|
def ReadKey():
|
|
|
|
requestBody = request.get_json()
|
|
|
|
publicKey = requestBody.data(["public_key"])
|
|
|
|
|
|
print("[INFO] Opening /etc/nixos/configuration.nix...", sep="")
|
|
readOnlyFileDescriptor = open("/etc/nixos/configuration.nix", "r")
|
|
print("done")
|
|
fileContent = list()
|
|
index = int(0)
|
|
|
|
print("[INFO] Reading file content...", sep="")
|
|
|
|
while True:
|
|
line = readOnlyFileDescriptor.readline()
|
|
|
|
if not line:
|
|
break
|
|
else:
|
|
fileContent.append(line)
|
|
print("[DEBUG] Read line!")
|
|
|
|
for line in fileContent:
|
|
index += 1
|
|
if "openssh.authorizedKeys.keys = [" in line:
|
|
print("[DEBUG] Found SSH key configuration snippet match!")
|
|
print("[INFO] Writing new SSH key", sep="")
|
|
fileContent.append(index, "\n \"" + publicKey + "\"")
|
|
print("done")
|
|
break
|
|
|
|
print("[INFO] Writing data from memory to file...", sep="")
|
|
readWriteFileDescriptor = open("/etc/nixos/configuration.nix", "w")
|
|
print("done")
|
|
operationResult = readWriteFileDescriptor.writelines(fileContent)
|
|
|
|
|
|
return jsonify(
|
|
result=0,
|
|
descriptor = operationResult
|
|
)
|
|
|
|
if __name__ == '__main__':
|
|
app.run(port=5050, debug=False) |