selfprivacy-rest-api/selfprivacy_api/graphql/mutations/api_mutations.py

220 lines
6.5 KiB
Python
Raw Normal View History

2022-06-29 17:39:46 +00:00
"""API access mutations"""
# pylint: disable=too-few-public-methods
import datetime
import typing
import strawberry
from strawberry.types import Info
from selfprivacy_api.actions.api_tokens import (
CannotDeleteCallerException,
InvalidExpirationDate,
InvalidUsesLeft,
NotFoundException,
delete_api_token,
get_new_api_recovery_key,
use_mnemonic_recovery_token,
refresh_api_token,
delete_new_device_auth_token,
get_new_device_auth_token,
use_new_device_auth_token,
)
2022-06-29 17:39:46 +00:00
from selfprivacy_api.graphql import IsAuthenticated
2022-07-07 13:53:19 +00:00
from selfprivacy_api.graphql.mutations.mutation_interface import (
GenericMutationReturn,
MutationReturnInterface,
)
2022-06-29 17:39:46 +00:00
@strawberry.type
class ApiKeyMutationReturn(MutationReturnInterface):
    """Mutation result that can carry a generated key."""

    # The mutations in this module set this to the generated key on success
    # and to None when they report a failure.
    key: typing.Optional[str]
2022-07-07 13:53:19 +00:00
2022-07-05 12:11:41 +00:00
@strawberry.type
class DeviceApiTokenMutationReturn(MutationReturnInterface):
    """Mutation result that can carry a device API token."""

    # The mutations in this module set this to the issued token on success
    # and to None when they report a failure.
    token: typing.Optional[str]
2022-07-07 13:53:19 +00:00
2022-06-29 17:39:46 +00:00
@strawberry.input
class RecoveryKeyLimitsInput:
    """Recovery key limits input

    Both fields are optional and default to None when the client omits them.
    """

    # NOTE(review): None presumably means "no limit" for the respective
    # field -- confirm against get_new_api_recovery_key.
    expiration_date: typing.Optional[datetime.datetime] = None
    uses: typing.Optional[int] = None
2022-06-29 17:39:46 +00:00
2022-07-07 13:53:19 +00:00
2022-07-05 12:11:41 +00:00
@strawberry.input
class UseRecoveryKeyInput:
    """Use recovery key input

    Carries the two values that the recovery-key mutation forwards to
    use_mnemonic_recovery_token.
    """

    # The recovery key supplied by the client.
    key: str
    # Name of the device requesting a token. The camelCase spelling is kept
    # because the mutation reads this attribute as `input.deviceName`.
    deviceName: str
2022-07-07 13:53:19 +00:00
2022-07-05 12:11:41 +00:00
@strawberry.input
class UseNewDeviceKeyInput:
    """Use new device key input

    Carries the two values that the new-device authorization mutation
    forwards to use_new_device_auth_token.
    """

    # The new-device key supplied by the client.
    key: str
    # Name of the device requesting a token. The camelCase spelling is kept
    # because the mutation reads this attribute as `input.deviceName`.
    deviceName: str
2022-07-07 13:53:19 +00:00
2022-06-29 17:39:46 +00:00
def _bearer_token_from(info: Info) -> str:
    """Return the caller's token from the request's Authorization header.

    The header lookup defaults to an empty string, so the result is always a
    ``str``; it is empty when the header is absent.
    """
    authorization = info.context["request"].headers.get("Authorization", "")
    return authorization.replace("Bearer ", "")


@strawberry.type
class ApiMutations:
    """Mutations for recovery keys, device tokens and new-device keys.

    Every mutation reports its outcome through a return object holding
    ``success``, ``message`` and an HTTP-like ``code``.
    """

    @strawberry.mutation(permission_classes=[IsAuthenticated])
    def get_new_recovery_api_key(
        self, limits: typing.Optional[RecoveryKeyLimitsInput] = None
    ) -> ApiKeyMutationReturn:
        """Generate recovery key

        When ``limits`` is omitted, a default ``RecoveryKeyLimitsInput`` (both
        fields None) is used. Returns code 400 when the action rejects the
        expiration date or the number of uses, otherwise code 200 with the
        generated key.
        """
        if limits is None:
            limits = RecoveryKeyLimitsInput()
        try:
            key = get_new_api_recovery_key(limits.expiration_date, limits.uses)
        except InvalidExpirationDate:
            return ApiKeyMutationReturn(
                success=False,
                message="Expiration date must be in the future",
                code=400,
                key=None,
            )
        except InvalidUsesLeft:
            return ApiKeyMutationReturn(
                success=False,
                message="Uses must be greater than 0",
                code=400,
                key=None,
            )
        return ApiKeyMutationReturn(
            success=True,
            message="Recovery key generated",
            code=200,
            key=key,
        )

    # No permission classes: this mutation is how a caller obtains a token.
    # `input` shadows the builtin, but the parameter name is part of the
    # mutation's signature and therefore stays as is.
    @strawberry.mutation()
    def use_recovery_api_key(
        self, input: UseRecoveryKeyInput
    ) -> DeviceApiTokenMutationReturn:
        """Use recovery key

        Exchanges a recovery key for a device token. A None result from
        ``use_mnemonic_recovery_token`` is reported as code 404.
        """
        token = use_mnemonic_recovery_token(input.key, input.deviceName)
        if token is None:
            return DeviceApiTokenMutationReturn(
                success=False,
                message="Recovery key not found",
                code=404,
                token=None,
            )
        return DeviceApiTokenMutationReturn(
            success=True,
            message="Recovery key used",
            code=200,
            token=token,
        )

    @strawberry.mutation(permission_classes=[IsAuthenticated])
    def refresh_device_api_token(self, info: Info) -> DeviceApiTokenMutationReturn:
        """Refresh device api token

        Replaces the token found in the request's Authorization header with
        the one returned by ``refresh_api_token``. Returns code 404 when the
        header carries no token or the action raises ``NotFoundException``.
        """
        token_string = _bearer_token_from(info)
        # The extracted value is always a str, so a missing header shows up
        # as an empty string rather than None.
        if not token_string:
            return DeviceApiTokenMutationReturn(
                success=False,
                message="Token not found",
                code=404,
                token=None,
            )
        try:
            new_token = refresh_api_token(token_string)
        except NotFoundException:
            return DeviceApiTokenMutationReturn(
                success=False,
                message="Token not found",
                code=404,
                token=None,
            )
        return DeviceApiTokenMutationReturn(
            success=True,
            message="Token refreshed",
            code=200,
            token=new_token,
        )

    @strawberry.mutation(permission_classes=[IsAuthenticated])
    def delete_device_api_token(self, device: str, info: Info) -> GenericMutationReturn:
        """Delete device api token

        Passes the caller's own token and ``device`` to ``delete_api_token``.
        Returns code 404 on ``NotFoundException``, code 400 on
        ``CannotDeleteCallerException`` and code 500 with the exception text
        for any other error.
        """
        self_token = _bearer_token_from(info)
        try:
            delete_api_token(self_token, device)
        except NotFoundException:
            return GenericMutationReturn(
                success=False,
                message="Token not found",
                code=404,
            )
        except CannotDeleteCallerException:
            return GenericMutationReturn(
                success=False,
                message="Cannot delete caller token",
                code=400,
            )
        except Exception as error:  # pylint: disable=broad-except
            # Deliberate catch-all: unexpected failures are reported through
            # the mutation result instead of propagating to the caller.
            return GenericMutationReturn(
                success=False,
                message=str(error),
                code=500,
            )
        return GenericMutationReturn(
            success=True,
            message="Token deleted",
            code=200,
        )

    @strawberry.mutation(permission_classes=[IsAuthenticated])
    def get_new_device_api_key(self) -> ApiKeyMutationReturn:
        """Generate device api key

        Returns code 200 with the value produced by
        ``get_new_device_auth_token``.
        """
        key = get_new_device_auth_token()
        return ApiKeyMutationReturn(
            success=True,
            message="Device api key generated",
            code=200,
            key=key,
        )

    @strawberry.mutation(permission_classes=[IsAuthenticated])
    def invalidate_new_device_api_key(self) -> GenericMutationReturn:
        """Invalidate new device api key

        Calls ``delete_new_device_auth_token`` and always reports success.
        """
        delete_new_device_auth_token()
        return GenericMutationReturn(
            success=True,
            message="New device key deleted",
            code=200,
        )

    # No permission classes: this mutation is how a new device obtains its
    # first token. `input` shadows the builtin, but the parameter name is part
    # of the mutation's signature and therefore stays as is.
    @strawberry.mutation()
    def authorize_with_new_device_api_key(
        self, input: UseNewDeviceKeyInput
    ) -> DeviceApiTokenMutationReturn:
        """Authorize with new device api key

        Exchanges a new-device key for a device token. A None result from
        ``use_new_device_auth_token`` is reported as code 404.
        """
        token = use_new_device_auth_token(input.key, input.deviceName)
        if token is None:
            return DeviceApiTokenMutationReturn(
                success=False,
                message="Token not found",
                code=404,
                token=None,
            )
        return DeviceApiTokenMutationReturn(
            success=True,
            message="Token used",
            code=200,
            token=token,
        )