mirror of
https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api.git
synced 2025-01-05 23:54:19 +00:00
Merge pull request 'autobackup-job-ttl' (#62) from autobackup-job-ttl into master
Reviewed-on: https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api/pulls/62 Reviewed-by: Inex Code <inex.code@selfprivacy.org>
This commit is contained in:
commit
badd619dd0
|
@ -56,6 +56,8 @@ BACKUP_PROVIDER_ENVS = {
|
|||
"location": "BACKUP_LOCATION",
|
||||
}
|
||||
|
||||
AUTOBACKUP_JOB_EXPIRATION_SECONDS = 60 * 60 # one hour
|
||||
|
||||
|
||||
class NotDeadError(AssertionError):
|
||||
"""
|
||||
|
@ -316,6 +318,8 @@ class Backups:
|
|||
raise error
|
||||
|
||||
Jobs.update(job, status=JobStatus.FINISHED)
|
||||
if reason in [BackupReason.AUTO, BackupReason.PRE_RESTORE]:
|
||||
Jobs.set_expiration(job, AUTOBACKUP_JOB_EXPIRATION_SECONDS)
|
||||
return snapshot
|
||||
|
||||
@staticmethod
|
||||
|
|
|
@ -224,6 +224,14 @@ class Jobs:
|
|||
|
||||
return job
|
||||
|
||||
@staticmethod
|
||||
def set_expiration(job: Job, expiration_seconds: int) -> Job:
|
||||
redis = RedisPool().get_connection()
|
||||
key = _redis_key_from_uuid(job.uid)
|
||||
if redis.exists(key):
|
||||
redis.expire(key, expiration_seconds)
|
||||
return job
|
||||
|
||||
@staticmethod
|
||||
def get_job(uid: str) -> typing.Optional[Job]:
|
||||
"""
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
# pylint: disable=redefined-outer-name
|
||||
# pylint: disable=unused-argument
|
||||
import pytest
|
||||
from time import sleep
|
||||
|
||||
from selfprivacy_api.jobs import Jobs, JobStatus
|
||||
import selfprivacy_api.jobs as jobsmodule
|
||||
|
@ -49,6 +50,20 @@ def test_remove_get_nonexistent(jobs_with_one_job):
|
|||
assert jobs_with_one_job.get_job(uid_str) is None
|
||||
|
||||
|
||||
def test_set_zeroing_ttl(jobs_with_one_job):
|
||||
test_job = jobs_with_one_job.get_jobs()[0]
|
||||
jobs_with_one_job.set_expiration(test_job, 0)
|
||||
assert jobs_with_one_job.get_jobs() == []
|
||||
|
||||
|
||||
def test_not_zeroing_ttl(jobs_with_one_job):
|
||||
test_job = jobs_with_one_job.get_jobs()[0]
|
||||
jobs_with_one_job.set_expiration(test_job, 1)
|
||||
assert len(jobs_with_one_job.get_jobs()) == 1
|
||||
sleep(1.2)
|
||||
assert len(jobs_with_one_job.get_jobs()) == 0
|
||||
|
||||
|
||||
def test_jobs(jobs_with_one_job):
|
||||
jobs = jobs_with_one_job
|
||||
test_job = jobs_with_one_job.get_jobs()[0]
|
||||
|
|
Loading…
Reference in a new issue