selfprivacy-rest-api/selfprivacy_api/resources/users.py

153 lines
4.6 KiB
Python
Raw Normal View History

2021-11-11 18:31:28 +00:00
#!/usr/bin/env python3
2021-11-16 16:14:01 +00:00
"""Users management module"""
2021-11-11 18:31:28 +00:00
import subprocess
import re
2021-11-16 16:14:01 +00:00
from flask_restful import Resource, reqparse
2021-11-11 18:31:28 +00:00
from selfprivacy_api.utils import WriteUserData, ReadUserData, is_username_forbidden
2021-11-22 16:50:50 +00:00
2021-11-11 18:31:28 +00:00
class Users(Resource):
    """Users management"""

    # NOTE: the method docstrings below are Swagger specifications (everything
    # after the `---` marker is YAML), so their wording and indentation matter.

    def get(self):
        """
        Get a list of users
        ---
        tags:
            - Users
        security:
            - bearerAuth: []
        responses:
            200:
                description: A list of users
            401:
                description: Unauthorized
        """
        with ReadUserData() as data:
            # The "users" key may be absent; report an empty list in that case.
            stored_users = data["users"] if "users" in data else []
            return [user["username"] for user in stored_users]

    def post(self):
        """
        Create a new user
        ---
        consumes:
            - application/json
        tags:
            - Users
        security:
            - bearerAuth: []
        parameters:
            - in: body
              name: user
              required: true
              description: User to create
              schema:
                type: object
                required:
                    - username
                    - password
                properties:
                    username:
                        type: string
                        description: Unix username. Must be alphanumeric and less than 32 characters
                    password:
                        type: string
                        description: Unix password.
        responses:
            201:
                description: Created user
            400:
                description: Bad request
            401:
                description: Unauthorized
            409:
                description: User already exists
            500:
                description: Password hashing failed
        """
        parser = reqparse.RequestParser(bundle_errors=True)
        parser.add_argument("username", type=str, required=True)
        parser.add_argument("password", type=str, required=True)
        args = parser.parse_args()
        username = args["username"]

        # Validate the username first so that no hashing subprocess is spawned
        # for a request that is going to be rejected anyway.
        if is_username_forbidden(username):
            return {"message": "Username is forbidden"}, 409
        # fullmatch() instead of match() with "$": "$" also matches just before
        # a trailing newline, which would let "name\n" through.
        # The pattern requires at least two characters, as before.
        if not re.fullmatch(r"[a-z_][a-z0-9_]+", username):
            return {"error": "username must be alphanumeric"}, 400
        # Names of exactly 32 characters are accepted, despite the message.
        if len(username) > 32:
            return {"error": "username must be less than 32 characters"}, 400

        # NOTE(review): the plain-text password is passed as a command-line
        # argument, so it is visible in the process list and a password that
        # starts with "-" is parsed as an option. Consider feeding it through
        # stdin if the installed mkpasswd supports that.
        hashing = subprocess.run(
            ["mkpasswd", "-m", "sha-512", args["password"]],
            shell=False,
            stdout=subprocess.PIPE,
            # Keep stderr separate: merged into stdout, an error message would
            # be stored as if it were the password hash.
            stderr=subprocess.PIPE,
            check=False,
        )
        hashed_password = hashing.stdout.decode("ascii", errors="replace").rstrip()
        if hashing.returncode != 0 or not hashed_password:
            return {"error": "Failed to hash password"}, 500

        with WriteUserData() as data:
            if "users" not in data:
                data["users"] = []

            # Return 409 if the name is already taken, either by the primary
            # account stored under "username" (which the delete endpoint
            # refuses to remove) or by an existing entry in "users".
            if "username" in data and data["username"] == username:
                return {"error": "User already exists"}, 409
            if any(user["username"] == username for user in data["users"]):
                return {"error": "User already exists"}, 409

            data["users"].append(
                {
                    "username": username,
                    "hashedPassword": hashed_password,
                }
            )

        return {"result": 0, "username": username}, 201
2021-11-11 18:31:28 +00:00
2021-11-16 16:14:01 +00:00
class User(Resource):
    """Single user management"""

    # NOTE: the method docstring below is a Swagger specification (everything
    # after the `---` marker is YAML), so its wording and indentation matter.

    def delete(self, username):
        """
        Delete a user
        ---
        tags:
            - Users
        security:
            - bearerAuth: []
        parameters:
            - in: path
              name: username
              required: true
              description: User to delete
              type: string
        responses:
            200:
                description: Deleted user
            400:
                description: Bad request
            401:
                description: Unauthorized
            404:
                description: User not found
        """
        with WriteUserData() as data:
            # The account stored under the top-level "username" key is
            # protected and must never be removed through this endpoint.
            if "username" in data and username == data["username"]:
                return {"error": "Cannot delete root user"}, 400

            # The "users" key may be absent; treat that as "no such user"
            # instead of failing with a KeyError.
            stored_users = data["users"] if "users" in data else []

            # Return 404 if user does not exist
            for user in stored_users:
                if user["username"] == username:
                    # Safe to mutate here: the loop stops right after removal.
                    stored_users.remove(user)
                    break
            else:
                return {"error": "User does not exist"}, 404

        return {"result": 0, "username": username}