mirror of
https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api.git
synced 2024-11-24 13:01:28 +00:00
163 lines
5.1 KiB
Python
163 lines
5.1 KiB
Python
#!/usr/bin/env python3
|
|
"""Users management module"""
|
|
import subprocess
|
|
import re
|
|
from flask_restful import Resource, reqparse
|
|
|
|
from selfprivacy_api.utils import WriteUserData, ReadUserData, is_username_forbidden
|
|
|
|
|
|
class Users(Resource):
    """Users management"""

    def get(self):
        """
        Get a list of users
        ---
        tags:
            - Users
        security:
            - bearerAuth: []
        responses:
            200:
                description: A list of users
            400:
                description: Bad request
            401:
                description: Unauthorized
        """
        # Local import: keeps this method self-contained without touching the
        # module-level import line.
        from flask_restful import inputs

        parser = reqparse.RequestParser(bundle_errors=True)
        # type=bool would call bool() on the raw query string, which makes
        # "withMainUser=false" truthy. inputs.boolean understands
        # "true"/"false"/"1"/"0" and rejects anything else with a 400.
        parser.add_argument("withMainUser", type=inputs.boolean, required=False)
        args = parser.parse_args()
        # The argument is None when the client omits it; treat that as False.
        with_main_user = bool(args["withMainUser"])

        with ReadUserData() as data:
            # The main user lives in the top-level "username" key, additional
            # users in the optional "users" list.
            users = [data["username"]] if with_main_user else []
            if "users" in data:
                users.extend(user["username"] for user in data["users"])
        return users

    def post(self):
        """
        Create a new user
        ---
        consumes:
            - application/json
        tags:
            - Users
        security:
            - bearerAuth: []
        parameters:
            - in: body
              name: user
              required: true
              description: User to create
              schema:
                  type: object
                  required:
                      - username
                      - password
                  properties:
                      username:
                          type: string
                          description: Unix username. Must be alphanumeric and less than 32 characters
                      password:
                          type: string
                          description: Unix password.
        responses:
            201:
                description: Created user
            400:
                description: Bad request
            401:
                description: Unauthorized
            409:
                description: User already exists
            500:
                description: Password hashing failed
        """
        parser = reqparse.RequestParser(bundle_errors=True)
        parser.add_argument("username", type=str, required=True)
        parser.add_argument("password", type=str, required=True)
        args = parser.parse_args()
        username = args["username"]

        # Validate the username first so that a rejected request never spawns
        # the hashing subprocess.
        if is_username_forbidden(username):
            return {"message": "Username is forbidden"}, 409
        # fullmatch instead of match with "^...$": "$" also matches right
        # before a trailing newline, which would let "name\n" through.
        if not re.fullmatch(r"[a-z_][a-z0-9_]+", username):
            return {"error": "username must be alphanumeric"}, 400
        if len(username) >= 32:
            return {"error": "username must be less than 32 characters"}, 400

        # NOTE(review): the plaintext password is passed on mkpasswd's command
        # line, where other local processes may be able to read it; consider
        # feeding it through stdin instead - confirm with deployment.
        hashing_command = ["mkpasswd", "-m", "sha-512", args["password"]]
        password_hash_process_descriptor = subprocess.Popen(
            hashing_command,
            shell=False,
            stdout=subprocess.PIPE,
            # Keep stderr separate so diagnostics can never end up inside the
            # stored hash.
            stderr=subprocess.PIPE,
        )
        hashed_password = password_hash_process_descriptor.communicate()[0]
        # A failed mkpasswd run must not result in garbage being saved as the
        # user's password hash.
        if password_hash_process_descriptor.returncode != 0:
            return {"error": "Failed to hash password"}, 500
        hashed_password = hashed_password.decode("ascii").rstrip()

        with WriteUserData() as data:
            if "users" not in data:
                data["users"] = []

            # Return 409 if the name collides with the main user or with any
            # already registered additional user.
            if data["username"] == username:
                return {"error": "User already exists"}, 409
            if any(user["username"] == username for user in data["users"]):
                return {"error": "User already exists"}, 409

            data["users"].append(
                {
                    "username": username,
                    "hashedPassword": hashed_password,
                }
            )

        return {"result": 0, "username": username}, 201
|
|
|
|
|
|
class User(Resource):
    """Single user management"""

    def delete(self, username):
        """
        Delete a user
        ---
        tags:
            - Users
        security:
            - bearerAuth: []
        parameters:
            - in: path
              name: username
              required: true
              description: User to delete
              type: string
        responses:
            200:
                description: Deleted user
            400:
                description: Bad request
            401:
                description: Unauthorized
            404:
                description: User not found
        """
        with WriteUserData() as data:
            # The main user is stored in the top-level "username" key and must
            # never be removed through this endpoint.
            if username == data["username"]:
                return {"error": "Cannot delete root user"}, 400

            # The "users" list is optional in the stored data (Users.get and
            # Users.post both tolerate its absence); a missing list simply
            # means there is nobody to delete, not a server error.
            users = data["users"] if "users" in data else []

            # Return 404 if user does not exist
            for user in users:
                if user["username"] == username:
                    # Safe to mutate while iterating: we leave the loop
                    # immediately after the removal.
                    users.remove(user)
                    break
            else:
                return {"error": "User does not exist"}, 404

        return {"result": 0, "username": username}
|