mirror of
https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api.git
synced 2024-11-26 14:01:30 +00:00
feature(backups): automatic backup
This commit is contained in:
parent
ec85f060f8
commit
4018dca184
|
@ -52,17 +52,29 @@ class Backups:
|
||||||
def enable_autobackup(service: Service):
|
def enable_autobackup(service: Service):
|
||||||
Storage.set_autobackup(service)
|
Storage.set_autobackup(service)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _service_ids_to_back_up(time: datetime) -> List[str]:
|
||||||
|
services = Storage.services_with_autobackup()
|
||||||
|
return [id for id in services if Backups.is_time_to_backup_service(id, time)]
|
||||||
|
|
||||||
|
# untestable until the dummy service is registered
|
||||||
|
@staticmethod
|
||||||
|
def services_to_back_up(time: datetime) -> List[Service]:
|
||||||
|
result = []
|
||||||
|
for id in Backups._service_ids_to_back_up(time):
|
||||||
|
service = get_service_by_id(id)
|
||||||
|
if service is None:
|
||||||
|
raise ValueError("Cannot look up a service scheduled for backup!")
|
||||||
|
result.append(service)
|
||||||
|
return result
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def is_time_to_backup(time: datetime) -> bool:
|
def is_time_to_backup(time: datetime) -> bool:
|
||||||
"""
|
"""
|
||||||
Intended as a time validator for huey cron scheduler of automatic backups
|
Intended as a time validator for huey cron scheduler of automatic backups
|
||||||
"""
|
"""
|
||||||
|
|
||||||
enabled_services = Storage.services_with_autobackup()
|
return Backups._service_ids_to_back_up(time) != []
|
||||||
for service_id in enabled_services:
|
|
||||||
if Backups.is_time_to_backup_service(service_id, time):
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def is_time_to_backup_service(service_id: str, time: datetime):
|
def is_time_to_backup_service(service_id: str, time: datetime):
|
||||||
|
|
|
@ -1,9 +1,31 @@
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
from selfprivacy_api.utils.huey import huey
|
from selfprivacy_api.utils.huey import huey
|
||||||
from selfprivacy_api.services.service import Service
|
from selfprivacy_api.services.service import Service
|
||||||
from selfprivacy_api.backup import Backups
|
from selfprivacy_api.backup import Backups
|
||||||
|
|
||||||
|
|
||||||
|
def validate_datetime(dt: datetime):
|
||||||
|
# dt = datetime.now(timezone.utc)
|
||||||
|
if dt.timetz is None:
|
||||||
|
raise ValueError(
|
||||||
|
"""
|
||||||
|
huey passed in the timezone-unaware time!
|
||||||
|
Post it in support chat or maybe try uncommenting a line above
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
return Backups.is_time_to_backup(dt)
|
||||||
|
|
||||||
|
|
||||||
# huey tasks need to return something
|
# huey tasks need to return something
|
||||||
@huey.task()
|
@huey.task()
|
||||||
def start_backup(service: Service) -> bool:
|
def start_backup(service: Service) -> bool:
|
||||||
Backups.back_up(service)
|
Backups.back_up(service)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
@huey.periodic_task(validate_datetime=validate_datetime)
|
||||||
|
def automatic_backup():
|
||||||
|
time = datetime.now()
|
||||||
|
for service in Backups.services_to_back_up(time):
|
||||||
|
start_backup(service)
|
||||||
|
|
Loading…
Reference in a new issue