fix(backups): censor out keys from error messages

We do not have any automated sending of errors to Selfprivacy
but it was inconvenient for people who want to send a
screenshot of their error.
This commit is contained in:
Houkime 2024-03-18 17:13:06 +00:00
parent b36701e31c
commit b40df670f8
2 changed files with 102 additions and 3 deletions

View file

@ -172,6 +172,21 @@ class ResticBackupper(AbstractBackupper):
return messages return messages
@staticmethod
def _replace_in_array(array: List[str], target, replacement) -> None:
if target == "":
return
for i, value in enumerate(array):
if target in value:
array[i] = array[i].replace(target, replacement)
def _censor_command(self, command: List[str]) -> List[str]:
result = command.copy()
ResticBackupper._replace_in_array(result, self.key, "CENSORED")
ResticBackupper._replace_in_array(result, LocalBackupSecret.get(), "CENSORED")
return result
@staticmethod @staticmethod
def _get_backup_job(service_name: str) -> Optional[Job]: def _get_backup_job(service_name: str) -> Optional[Job]:
service = get_service_by_id(service_name) service = get_service_by_id(service_name)
@ -218,7 +233,7 @@ class ResticBackupper(AbstractBackupper):
"Could not create a snapshot: ", "Could not create a snapshot: ",
str(error), str(error),
"command: ", "command: ",
backup_command, self._censor_command(backup_command),
) from error ) from error
@staticmethod @staticmethod

View file

@ -14,13 +14,14 @@ from selfprivacy_api.utils.huey import huey
from selfprivacy_api.services.service import ServiceStatus from selfprivacy_api.services.service import ServiceStatus
from selfprivacy_api.graphql.queries.providers import BackupProvider from selfprivacy_api.graphql.queries.providers import BackupProvider as ProviderEnum
from selfprivacy_api.graphql.common_types.backup import ( from selfprivacy_api.graphql.common_types.backup import (
RestoreStrategy, RestoreStrategy,
BackupReason, BackupReason,
) )
from selfprivacy_api.graphql.queries.providers import BackupProvider
from selfprivacy_api.jobs import Jobs, JobStatus from selfprivacy_api.jobs import Job, Jobs, JobStatus
from selfprivacy_api.models.backup.snapshot import Snapshot from selfprivacy_api.models.backup.snapshot import Snapshot
@ -38,6 +39,8 @@ from selfprivacy_api.backup.tasks import (
reload_snapshot_cache, reload_snapshot_cache,
) )
from selfprivacy_api.backup.storage import Storage from selfprivacy_api.backup.storage import Storage
from selfprivacy_api.backup.local_secret import LocalBackupSecret
from selfprivacy_api.backup.jobs import get_backup_fail
REPO_NAME = "test_backup" REPO_NAME = "test_backup"
@ -188,6 +191,87 @@ def test_backup_service(dummy_service, backups):
assert_job_finished(f"services.{id}.backup", count=1) assert_job_finished(f"services.{id}.backup", count=1)
def all_job_text(job: Job) -> str:
# Use when we update to pydantic 2.xxx
# return Job.model_dump_json()
result = ""
if job.status_text is not None:
result += job.status_text
if job.description is not None:
result += job.description
if job.error is not None:
result += job.error
return result
def assert_job_errored(job: Job):
assert job is not None
assert job.status == JobStatus.ERROR
# consider adding a useful error message to an errored-out job
assert job.error is not None
assert job.error != ""
def test_error_censoring_encryptionkey(dummy_service, backups):
# Discard our key to inject a failure
old_key = LocalBackupSecret.get()
LocalBackupSecret.reset()
new_key = LocalBackupSecret.get()
with pytest.raises(ValueError):
# Should fail without correct key
Backups.back_up(dummy_service)
job = get_backup_fail(dummy_service)
assert_job_errored(job)
job_text = all_job_text(job)
assert old_key not in job_text
assert new_key not in job_text
# local backups do not have login key
# assert Backups.provider().key not in job_text
assert "CENSORED" in job_text
def test_error_censoring_loginkey(dummy_service, backups, fp):
# We do not want to screw up our teardown
old_provider = Backups.provider()
secret = "aSecretNYA"
Backups.set_provider(
BackupProvider.BACKBLAZE, login="meow", key=secret, location="moon"
)
assert Backups.provider().key == secret
# We could have called real backblaze but it is kind of not privacy so.
fp.allow_unregistered(True)
fp.register(
["restic", fp.any()],
returncode=1,
stdout="only real cats are allowed",
# We do not want to suddenly call real backblaze even if code changes
occurrences=100,
)
with pytest.raises(ValueError):
Backups.back_up(dummy_service)
job = get_backup_fail(dummy_service)
assert_job_errored(job)
job_text = all_job_text(job)
assert secret not in job_text
assert job_text.count("CENSORED") == 2
# We do not want to screw up our teardown
Storage.store_provider(old_provider)
def test_no_repo(memory_backup): def test_no_repo(memory_backup):
with pytest.raises(ValueError): with pytest.raises(ValueError):
assert memory_backup.backupper.get_snapshots() == [] assert memory_backup.backupper.get_snapshots() == []