selfprivacy-rest-api/selfprivacy_api/utils/__init__.py

190 lines
5.5 KiB
Python
Raw Normal View History

2021-11-11 18:31:28 +00:00
#!/usr/bin/env python3
2021-11-16 16:14:01 +00:00
"""Various utility functions"""
2022-06-24 17:08:58 +00:00
import datetime
2022-01-14 05:38:53 +00:00
from enum import Enum
2021-11-22 16:50:50 +00:00
import json
2022-07-05 05:14:37 +00:00
import os
import subprocess
2021-11-22 16:50:50 +00:00
import portalocker
2021-11-16 16:14:01 +00:00
2021-11-11 18:31:28 +00:00
# Path of the JSON file selected by UserDataFiles.USERDATA (the default for
# the WriteUserData / ReadUserData helpers below).
USERDATA_FILE = "/etc/nixos/userdata/userdata.json"
2022-01-14 05:38:53 +00:00
# Path of the JSON file selected by UserDataFiles.TOKENS.
TOKENS_FILE = "/etc/nixos/userdata/tokens.json"
# Path of the JSON file selected by UserDataFiles.JOBS; the userdata helpers
# create it with the content "[]" when it does not exist yet.
JOBS_FILE = "/etc/nixos/userdata/jobs.json"
# File whose first line is returned by get_domain().
DOMAIN_FILE = "/var/domain"
2021-11-30 21:53:39 +00:00
2022-01-14 05:38:53 +00:00
class UserDataFiles(Enum):
    """Selector for the JSON file a userdata helper should operate on."""

    # Main userdata file (USERDATA_FILE).
    USERDATA = 0
    # Tokens file (TOKENS_FILE).
    TOKENS = 1
    # Jobs file (JOBS_FILE).
    JOBS = 2
2022-01-14 05:38:53 +00:00
2021-11-11 18:31:28 +00:00
def get_domain():
    """Return the first line of DOMAIN_FILE with trailing whitespace removed."""
    with open(DOMAIN_FILE, "r", encoding="utf-8") as handle:
        first_line = handle.readline()
    # rstrip() drops the trailing newline (and any other trailing whitespace).
    return first_line.rstrip()
2021-11-22 16:50:50 +00:00
class WriteUserData(object):
    """Context manager for read-modify-write access to a userdata JSON file.

    Constructing the object opens the file chosen by ``file_type`` in "r+"
    mode, takes an exclusive lock (``portalocker.LOCK_EX``) on it and parses
    its content into ``self.data``. ``__enter__`` hands that parsed object to
    the ``with`` body; when the body finishes without an exception the
    (possibly mutated) object is written back as JSON with ``indent=4``.
    If the body raised, nothing is written. In both cases the lock is
    released and the file is closed.

    Args:
        file_type: A ``UserDataFiles`` member selecting the file. For
            ``UserDataFiles.JOBS`` the file is created with the content
            ``"[]"`` if it does not exist yet.

    Raises:
        ValueError: If ``file_type`` is not a known ``UserDataFiles`` member.
    """

    def __init__(self, file_type=UserDataFiles.USERDATA):
        if file_type == UserDataFiles.USERDATA:
            path = USERDATA_FILE
        elif file_type == UserDataFiles.TOKENS:
            path = TOKENS_FILE
        elif file_type == UserDataFiles.JOBS:
            path = JOBS_FILE
            # Make sure file exists
            if not os.path.isfile(path):
                with open(path, "w", encoding="utf-8") as jobs_file:
                    jobs_file.write("[]")
        else:
            raise ValueError("Unknown file type")

        self.userdata_file = open(path, "r+", encoding="utf-8")
        try:
            portalocker.lock(self.userdata_file, portalocker.LOCK_EX)
            self.data = json.load(self.userdata_file)
        except BaseException:
            # __exit__ never runs when the constructor fails, so the handle
            # has to be closed here or it would leak (together with the lock
            # held through it) until garbage collection.
            self.userdata_file.close()
            raise

    def __enter__(self):
        return self.data

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            if exc_type is None:
                # Serialize before touching the file: if the data cannot be
                # encoded, the existing content stays intact instead of being
                # partially overwritten.
                serialized = json.dumps(self.data, indent=4)
                self.userdata_file.seek(0)
                self.userdata_file.write(serialized)
                # Drop leftover bytes when the new content is shorter.
                self.userdata_file.truncate()
        finally:
            # Always release the lock and the handle, even if writing failed.
            try:
                portalocker.unlock(self.userdata_file)
            finally:
                self.userdata_file.close()
class ReadUserData(object):
    """Context manager for read-only access to a userdata JSON file.

    Constructing the object opens the file chosen by ``file_type`` in "r"
    mode, takes a shared lock (``portalocker.LOCK_SH``) on it and parses its
    content into ``self.data``. ``__enter__`` returns that parsed object;
    leaving the ``with`` block releases the lock and closes the file.
    Nothing is ever written back to the selected file.

    Args:
        file_type: A ``UserDataFiles`` member selecting the file. For
            ``UserDataFiles.JOBS`` the file is created with the content
            ``"[]"`` if it does not exist yet.

    Raises:
        ValueError: If ``file_type`` is not a known ``UserDataFiles`` member.
    """

    def __init__(self, file_type=UserDataFiles.USERDATA):
        if file_type == UserDataFiles.USERDATA:
            path = USERDATA_FILE
        elif file_type == UserDataFiles.TOKENS:
            path = TOKENS_FILE
        elif file_type == UserDataFiles.JOBS:
            path = JOBS_FILE
            # Make sure file exists
            if not os.path.isfile(path):
                with open(path, "w", encoding="utf-8") as jobs_file:
                    jobs_file.write("[]")
        else:
            raise ValueError("Unknown file type")

        self.userdata_file = open(path, "r", encoding="utf-8")
        try:
            portalocker.lock(self.userdata_file, portalocker.LOCK_SH)
            self.data = json.load(self.userdata_file)
        except BaseException:
            # __exit__ never runs when the constructor fails, so the handle
            # has to be closed here or it would leak (together with the lock
            # held through it) until garbage collection.
            self.userdata_file.close()
            raise

    def __enter__(self) -> dict:
        return self.data

    def __exit__(self, *args):
        # Close the handle even if releasing the lock raises.
        try:
            portalocker.unlock(self.userdata_file)
        finally:
            self.userdata_file.close()
2021-11-23 18:32:51 +00:00
def validate_ssh_public_key(key):
    """Check that *key* begins with a supported SSH key type.

    Only the prefix is inspected: the key is accepted when it starts with
    "ssh-ed25519" or "ssh-rsa"; the rest of the string is not examined.

    Returns:
        True if the prefix matches one of the two key types, False otherwise.
    """
    accepted_prefixes = ("ssh-ed25519", "ssh-rsa")
    return key.startswith(accepted_prefixes)
def is_username_forbidden(username):
    """Return True when *username* is on the forbidden list.

    A name is forbidden when it starts with one of the forbidden prefixes
    ("systemd", "nixbld") or is exactly equal to one of the listed names.
    The comparison is case-sensitive.
    """
    reserved_prefixes = ("systemd", "nixbld")
    reserved_names = (
        "root",
        "messagebus",
        "postfix",
        "polkituser",
        "dovecot2",
        "dovenull",
        "nginx",
        "postgres",
        "prosody",
        "opendkim",
        "rspamd",
        "sshd",
        "selfprivacy-api",
        "restic",
        "redis",
        "pleroma",
        "ocserv",
        "nextcloud",
        "memcached",
        "knot-resolver",
        "gitea",
        "bitwarden_rs",
        "vaultwarden",
        "acme",
        "virtualMail",
        "nobody",
    )
    # Prefix match first, then exact match against the fixed list.
    if username.startswith(reserved_prefixes):
        return True
    return username in reserved_names
2022-06-24 17:08:58 +00:00
2022-06-24 18:14:20 +00:00
2022-06-24 17:08:58 +00:00
def parse_date(date_str: str) -> datetime.datetime:
    """Parse a timestamp string with mandatory fractional seconds.

    Accepted layouts, tried in this order:
    - %Y-%m-%d %H:%M:%S.%fZ
    - %Y-%m-%d %H:%M:%S.%f
    - %Y-%m-%dT%H:%M:%S.%fZ
    - %Y-%m-%dT%H:%M:%S.%f

    The trailing "Z" is matched as a literal character, so the returned
    datetime carries no timezone information.

    Raises:
        ValueError: If the string matches none of the layouts.
    """
    accepted_layouts = (
        "%Y-%m-%d %H:%M:%S.%fZ",
        "%Y-%m-%d %H:%M:%S.%f",
        "%Y-%m-%dT%H:%M:%S.%fZ",
        "%Y-%m-%dT%H:%M:%S.%f",
    )
    for layout in accepted_layouts:
        try:
            return datetime.datetime.strptime(date_str, layout)
        except ValueError:
            # Not this layout; try the next one.
            continue
    raise ValueError("Invalid date string")
2022-07-05 05:14:37 +00:00
2022-07-07 13:53:19 +00:00
2022-07-05 05:14:37 +00:00
def get_dkim_key(domain):
    """Get DKIM key from /var/dkim/<domain>.selector.txt

    The file is read directly instead of spawning ``cat``. That removes the
    gap between an existence check and the read, and a failed read can no
    longer come back as an empty string.

    Args:
        domain: Domain name used to build the file name.

    Returns:
        The file content decoded as UTF-8, or None when the file cannot be
        opened or read (for example because it does not exist).
    """
    # NOTE(review): ``domain`` is spliced into the path unchanged; if it can
    # come from untrusted input it should be validated by the caller (e.g.
    # reject "/" and "..") - confirm against call sites.
    dkim_path = "/var/dkim/" + domain + ".selector.txt"
    try:
        with open(dkim_path, "rb") as dkim_file:
            raw_key = dkim_file.read()
    except (OSError, ValueError):
        # OSError: missing or unreadable file. ValueError: a path open()
        # rejects outright (embedded NUL byte). Both mean "no key here".
        return None
    return str(raw_key, "utf-8")
def hash_password(password):
    """Hash *password* with ``mkpasswd -m sha-512`` and return the hash.

    Args:
        password: Plain-text password to hash.

    Returns:
        The text printed by mkpasswd on stdout, decoded as ASCII with
        trailing whitespace removed.

    Raises:
        subprocess.CalledProcessError: If mkpasswd exits with a non-zero
            status. Previously its error output was returned as if it were
            the hash.
    """
    # "--" ends option parsing, so a password starting with "-" is treated as
    # the password and not as a mkpasswd command-line option.
    # NOTE(review): the password is still passed as a command-line argument
    # and is therefore visible in the process list while mkpasswd runs;
    # consider feeding it through stdin once the input limits of the
    # deployed mkpasswd are confirmed.
    hashing_command = ["mkpasswd", "-m", "sha-512", "--", password]
    completed = subprocess.run(
        hashing_command,
        shell=False,
        stdout=subprocess.PIPE,
        # Keep stderr separate so diagnostics never end up in the hash.
        stderr=subprocess.PIPE,
        check=True,
    )
    return completed.stdout.decode("ascii").rstrip()