test(backups): test reinitting repository

This commit is contained in:
Houkime 2023-06-16 13:43:41 +00:00
parent b3724e240e
commit 33c60f971d
3 changed files with 73 additions and 7 deletions

View file

@ -135,7 +135,7 @@ class Backups:
@staticmethod @staticmethod
def set_provider(kind: str, login: str, key: str, location: str, repo_id: str = ""): def set_provider(kind: str, login: str, key: str, location: str, repo_id: str = ""):
provider = Backups.construct_provider(kind, login, key, location, id) provider = Backups.construct_provider(kind, login, key, location, repo_id)
Storage.store_provider(provider) Storage.store_provider(provider)
@staticmethod @staticmethod

View file

@ -48,15 +48,17 @@ class BackupMutations:
self, repository: InitializeRepositoryInput self, repository: InitializeRepositoryInput
) -> GenericBackupConfigReturn: ) -> GenericBackupConfigReturn:
"""Initialize a new repository""" """Initialize a new repository"""
provider = Backups.construct_provider( Backups.set_provider(
kind=repository.provider, kind=repository.provider.value,
login=repository.login, login=repository.login,
key=repository.password, key=repository.password,
location=repository.location_name, location=repository.location_name,
repo_id=repository.location_id, repo_id=repository.location_id,
) )
Backups.set_provider(provider)
Backups.init_repo() Backups.init_repo()
return GenericBackupConfigReturn(
success=True, message="", code="200", configuration=Backup().configuration()
)
@strawberry.mutation(permission_classes=[IsAuthenticated]) @strawberry.mutation(permission_classes=[IsAuthenticated])
def remove_repository(self) -> GenericBackupConfigReturn: def remove_repository(self) -> GenericBackupConfigReturn:
@ -73,9 +75,7 @@ class BackupMutations:
return Backup.configuration() return Backup.configuration()
@strawberry.mutation(permission_classes=[IsAuthenticated]) @strawberry.mutation(permission_classes=[IsAuthenticated])
def start_backup( def start_backup(self, service_id: str) -> GenericJobButationReturn:
self, service_id: typing.Optional[str] = None
) -> GenericJobButationReturn:
"""Start backup""" """Start backup"""
service = get_service_by_id(service_id) service = get_service_by_id(service_id)

View file

@ -1,3 +1,4 @@
from os import path
from tests.test_graphql.test_backup import dummy_service, backups, raw_dummy_service from tests.test_graphql.test_backup import dummy_service, backups, raw_dummy_service
from tests.common import generate_backup_query from tests.common import generate_backup_query
@ -5,6 +6,23 @@ from tests.common import generate_backup_query
from selfprivacy_api.graphql.common_types.service import service_to_graphql_service from selfprivacy_api.graphql.common_types.service import service_to_graphql_service
from selfprivacy_api.jobs import Jobs, JobStatus from selfprivacy_api.jobs import Jobs, JobStatus
API_INIT_MUTATION = """
mutation TestInitRepo($input: InitializeRepositoryInput!) {
initializeRepository(repository: $input) {
success
message
code
configuration {
provider
encryptionKey
isInitialized
autobackupPeriod
locationName
locationId
}
}
}
"""
API_RESTORE_MUTATION = """ API_RESTORE_MUTATION = """
mutation TestRestoreService($snapshot_id: String!) { mutation TestRestoreService($snapshot_id: String!) {
@ -67,6 +85,32 @@ def api_backup(authorized_client, service):
return response return response
def api_init_without_key(
authorized_client, kind, login, password, location_name, location_id
):
response = authorized_client.post(
"/graphql",
json={
"query": API_INIT_MUTATION,
"variables": {
"input": {
"provider": kind,
"locationId": location_id,
"locationName": location_name,
"login": login,
"password": password,
}
},
},
)
return response
def assert_ok(data):
assert data["code"] == 200
assert data["success"] is True
def get_data(response): def get_data(response):
assert response.status_code == 200 assert response.status_code == 200
response = response.json() response = response.json()
@ -127,3 +171,25 @@ def test_restore(authorized_client, dummy_service):
job = data["job"] job = data["job"]
assert Jobs.get_job(job["uid"]).status == JobStatus.FINISHED assert Jobs.get_job(job["uid"]).status == JobStatus.FINISHED
def test_reinit(authorized_client, dummy_service, tmpdir):
test_repo_path = path.join(tmpdir, "not_at_all_sus")
response = api_init_without_key(
authorized_client, "FILE", "", "", test_repo_path, ""
)
data = get_data(response)["initializeRepository"]
assert_ok(data)
configuration = data["configuration"]
assert configuration["provider"] == "FILE"
assert configuration["locationId"] == ""
assert configuration["locationName"] == test_repo_path
assert len(configuration["encryptionKey"]) > 1
assert configuration["isInitialized"] is True
response = api_backup(authorized_client, dummy_service)
data = get_data(response)["startBackup"]
assert data["success"] is True
job = data["job"]
assert Jobs.get_job(job["uid"]).status == JobStatus.FINISHED