selfprivacy-rest-api/selfprivacy_api/resources/users.py

182 lines
5.7 KiB
Python
Raw Normal View History

2021-11-11 18:31:28 +00:00
#!/usr/bin/env python3
2021-11-16 16:14:01 +00:00
"""Users management module"""
2021-11-11 18:31:28 +00:00
import subprocess
import json
import re
2021-11-16 16:14:01 +00:00
import portalocker
from flask_restful import Resource, reqparse
2021-11-11 18:31:28 +00:00
class Users(Resource):
    """Users management"""

    def get(self):
        """
        Get a list of users
        ---
        tags:
            - Users
        security:
            - bearerAuth: []
        responses:
            200:
                description: A list of users
            401:
                description: Unauthorized
        """
        with open(
            "/etc/nixos/userdata/userdata.json", "r", encoding="utf-8"
        ) as userdata_file:
            # Shared lock: concurrent readers are fine, writers are excluded.
            portalocker.lock(userdata_file, portalocker.LOCK_SH)
            try:
                data = json.load(userdata_file)
            finally:
                portalocker.unlock(userdata_file)
        # post() creates the "users" list on demand, so the key may be absent
        # until the first user is added; report that as an empty list.
        return [user["username"] for user in data.get("users", [])]

    def post(self):
        """
        Create a new user
        ---
        consumes:
            - application/json
        tags:
            - Users
        security:
            - bearerAuth: []
        parameters:
            - in: body
              name: user
              required: true
              description: User to create
              schema:
                  type: object
                  required:
                      - username
                      - password
                  properties:
                      username:
                          type: string
                          description: Unix username. Must be alphanumeric and less than 32 characters
                      password:
                          type: string
                          description: Unix password.
        responses:
            201:
                description: Created user
            400:
                description: Bad request
            401:
                description: Unauthorized
            409:
                description: User already exists
            500:
                description: Password hashing failed
        """
        parser = reqparse.RequestParser(bundle_errors=True)
        parser.add_argument("username", type=str, required=True)
        parser.add_argument("password", type=str, required=True)
        args = parser.parse_args()
        username = args["username"]

        # Validate the username before spawning mkpasswd, so rejected
        # requests never pay for (or depend on) the hashing subprocess.
        # fullmatch is used because "$" in re.match also matches just before
        # a trailing newline, which would let "name\n" through.
        if not re.fullmatch(r"[a-z_][a-z0-9_]+", username):
            return {"error": "username must be alphanumeric"}, 400
        if len(username) > 32:
            return {"error": "username must be less than 32 characters"}, 400

        # "--" ends option parsing so a password starting with "-" is hashed
        # instead of being interpreted as an mkpasswd option.
        # NOTE(review): the password is still visible in the process list
        # while mkpasswd runs; consider feeding it through stdin instead.
        hashing_command = ["mkpasswd", "-m", "sha-512", "--", args["password"]]
        hashing_result = subprocess.run(
            hashing_command,
            shell=False,
            stdout=subprocess.PIPE,
            # Keep stderr apart from stdout so diagnostics can never end up
            # being stored as the password hash.
            stderr=subprocess.PIPE,
            check=False,
        )
        # The return code is checked by hand (rather than check=True) so the
        # resulting error does not embed the command line, which contains
        # the plain-text password.
        if hashing_result.returncode != 0:
            return {"error": "Password hashing failed"}, 500
        hashed_password = hashing_result.stdout.decode("ascii").rstrip()

        with open(
            "/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8"
        ) as userdata_file:
            # Exclusive lock: the file is read, modified and rewritten.
            portalocker.lock(userdata_file, portalocker.LOCK_EX)
            try:
                data = json.load(userdata_file)
                # Create the list first; iterating data["users"] before
                # checking for the key raised KeyError on a fresh file.
                users = data.setdefault("users", [])
                # The top-level "username" is the primary account that
                # User.delete refuses to remove; treat it as taken as well.
                name_taken = username == data.get("username") or any(
                    user["username"] == username for user in users
                )
                if name_taken:
                    return {"error": "User already exists"}, 409
                users.append(
                    {
                        "username": username,
                        "hashedPassword": hashed_password,
                    }
                )
                # Rewrite in place: rewind, dump, then drop any leftover
                # bytes from the previous (possibly longer) content.
                userdata_file.seek(0)
                json.dump(data, userdata_file, indent=4)
                userdata_file.truncate()
            finally:
                portalocker.unlock(userdata_file)

        return {"result": 0, "username": username}, 201
2021-11-11 18:31:28 +00:00
2021-11-16 16:14:01 +00:00
class User(Resource):
    """Single user management"""

    def delete(self, username):
        """
        Delete a user
        ---
        tags:
            - Users
        security:
            - bearerAuth: []
        parameters:
            - in: path
              name: username
              required: true
              description: User to delete
              type: string
        responses:
            200:
                description: Deleted user
            400:
                description: Bad request
            401:
                description: Unauthorized
            404:
                description: User not found
        """
        # Reject a missing username before touching (and locking) the file.
        if username is None:
            return {"error": "username is required"}, 400

        with open(
            "/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8"
        ) as userdata_file:
            # Exclusive lock: the file is read, modified and rewritten.
            portalocker.lock(userdata_file, portalocker.LOCK_EX)
            try:
                data = json.load(userdata_file)
                # The top-level "username" entry is the primary account and
                # must never be removed. .get() avoids a KeyError when the
                # key is absent from the file.
                if username == data.get("username"):
                    return {"error": "Cannot delete root user"}, 400
                # A file without a "users" list simply has no user to
                # delete: answer 404 instead of failing with KeyError.
                users = data.get("users", [])
                for user in users:
                    if user["username"] == username:
                        # Safe while iterating: the loop exits right away.
                        users.remove(user)
                        break
                else:
                    # Return 404 if user does not exist
                    return {"error": "User does not exist"}, 404
                # Rewrite in place: rewind, dump, then drop any leftover
                # bytes from the previous (longer) content.
                userdata_file.seek(0)
                json.dump(data, userdata_file, indent=4)
                userdata_file.truncate()
            finally:
                portalocker.unlock(userdata_file)

        return {"result": 0, "username": username}