Merge pull request 'Censor out secret keys from backup error messages' (#108) from censor-errors into master

Reviewed-on: https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api/pulls/108
Reviewed-by: Inex Code <inex.code@selfprivacy.org>
This commit is contained in:
houkime 2024-03-20 14:18:39 +02:00
commit 3302fe2818
6 changed files with 111 additions and 4 deletions

2
.mypy.ini Normal file
View file

@ -0,0 +1,2 @@
[mypy]
plugins = pydantic.mypy

View file

@ -259,7 +259,7 @@ class Backups:
Backups._prune_auto_snaps(service)
service.post_restore()
except Exception as error:
Jobs.update(job, status=JobStatus.ERROR, status_text=str(error))
Jobs.update(job, status=JobStatus.ERROR, error=str(error))
raise error
Jobs.update(job, status=JobStatus.FINISHED)

View file

@ -172,6 +172,21 @@ class ResticBackupper(AbstractBackupper):
return messages
@staticmethod
def _replace_in_array(array: List[str], target, replacement) -> None:
if target == "":
return
for i, value in enumerate(array):
if target in value:
array[i] = array[i].replace(target, replacement)
def _censor_command(self, command: List[str]) -> List[str]:
result = command.copy()
ResticBackupper._replace_in_array(result, self.key, "CENSORED")
ResticBackupper._replace_in_array(result, LocalBackupSecret.get(), "CENSORED")
return result
@staticmethod
def _get_backup_job(service_name: str) -> Optional[Job]:
service = get_service_by_id(service_name)
@ -218,7 +233,7 @@ class ResticBackupper(AbstractBackupper):
"Could not create a snapshot: ",
str(error),
"command: ",
backup_command,
self._censor_command(backup_command),
) from error
@staticmethod

View file

@ -21,6 +21,8 @@ PROVIDER_MAPPING: dict[BackupProviderEnum, Type[AbstractBackupProvider]] = {
def get_provider(
provider_type: BackupProviderEnum,
) -> Type[AbstractBackupProvider]:
if provider_type not in PROVIDER_MAPPING.keys():
raise LookupError("could not look up provider", provider_type)
return PROVIDER_MAPPING[provider_type]

View file

@ -2,6 +2,8 @@ import json
from datetime import datetime, timezone, timedelta
from mnemonic import Mnemonic
from selfprivacy_api.jobs import Job, JobStatus
# for expiration tests. If headache, consider freezegun
RECOVERY_KEY_VALIDATION_DATETIME = "selfprivacy_api.models.tokens.time.datetime"
DEVICE_KEY_VALIDATION_DATETIME = RECOVERY_KEY_VALIDATION_DATETIME
@ -79,3 +81,12 @@ def assert_recovery_recent(time_generated: str):
assert datetime.fromisoformat(time_generated) - timedelta(seconds=5) < datetime.now(
timezone.utc
)
def assert_job_errored(job: Job):
assert job is not None
assert job.status == JobStatus.ERROR
# consider adding a useful error message to an errored-out job
assert job.error is not None
assert job.error != ""

View file

@ -14,13 +14,14 @@ from selfprivacy_api.utils.huey import huey
from selfprivacy_api.services.service import ServiceStatus
from selfprivacy_api.graphql.queries.providers import BackupProvider
from selfprivacy_api.graphql.queries.providers import BackupProvider as ProviderEnum
from selfprivacy_api.graphql.common_types.backup import (
RestoreStrategy,
BackupReason,
)
from selfprivacy_api.graphql.queries.providers import BackupProvider
from selfprivacy_api.jobs import Jobs, JobStatus
from selfprivacy_api.jobs import Job, Jobs, JobStatus
from selfprivacy_api.models.backup.snapshot import Snapshot
@ -38,6 +39,10 @@ from selfprivacy_api.backup.tasks import (
reload_snapshot_cache,
)
from selfprivacy_api.backup.storage import Storage
from selfprivacy_api.backup.local_secret import LocalBackupSecret
from selfprivacy_api.backup.jobs import get_backup_fail
from tests.common import assert_job_errored
REPO_NAME = "test_backup"
@ -188,6 +193,78 @@ def test_backup_service(dummy_service, backups):
assert_job_finished(f"services.{id}.backup", count=1)
def all_job_text(job: Job) -> str:
# Use when we update to pydantic 2.xxx
# return Job.model_dump_json()
result = ""
if job.status_text is not None:
result += job.status_text
if job.description is not None:
result += job.description
if job.error is not None:
result += job.error
return result
def test_error_censoring_encryptionkey(dummy_service, backups):
# Discard our key to inject a failure
old_key = LocalBackupSecret.get()
LocalBackupSecret.reset()
new_key = LocalBackupSecret.get()
with pytest.raises(ValueError):
# Should fail without correct key
Backups.back_up(dummy_service)
job = get_backup_fail(dummy_service)
assert_job_errored(job)
job_text = all_job_text(job)
assert old_key not in job_text
assert new_key not in job_text
# local backups do not have login key
# assert Backups.provider().key not in job_text
assert "CENSORED" in job_text
def test_error_censoring_loginkey(dummy_service, backups, fp):
# We do not want to screw up our teardown
old_provider = Backups.provider()
secret = "aSecretNYA"
Backups.set_provider(
ProviderEnum.BACKBLAZE, login="meow", key=secret, location="moon"
)
assert Backups.provider().key == secret
# We could have called real backblaze but it is kind of not privacy so.
fp.allow_unregistered(True)
fp.register(
["restic", fp.any()],
returncode=1,
stdout="only real cats are allowed",
# We do not want to suddenly call real backblaze even if code changes
occurrences=100,
)
with pytest.raises(ValueError):
Backups.back_up(dummy_service)
job = get_backup_fail(dummy_service)
assert_job_errored(job)
job_text = all_job_text(job)
assert secret not in job_text
assert job_text.count("CENSORED") == 2
# We do not want to screw up our teardown
Storage.store_provider(old_provider)
def test_no_repo(memory_backup):
with pytest.raises(ValueError):
assert memory_backup.backupper.get_snapshots() == []