refactor(backups): api readability reorg

This commit is contained in:
Houkime 2023-06-26 18:00:42 +00:00
parent 273a1935a8
commit 8604caa331

View file

@ -34,125 +34,9 @@ DEFAULT_JSON_PROVIDER = {
class Backups: class Backups:
"""A singleton controller for backups""" """A stateless controller class for backups"""
@staticmethod ### Providers
def set_localfile_repo(file_path: str):
ProviderClass = get_provider(BackupProviderEnum.FILE)
provider = ProviderClass(
login="",
key="",
location=file_path,
repo_id="",
)
Storage.store_provider(provider)
@staticmethod
def get_last_backed_up(service: Service) -> Optional[datetime]:
"""Get a timezone-aware time of the last backup of a service"""
return Storage.get_last_backup_time(service.get_id())
@staticmethod
def get_cached_snapshots_service(service_id: str) -> List[Snapshot]:
snapshots = Storage.get_cached_snapshots()
return [snap for snap in snapshots if snap.service_name == service_id]
@staticmethod
def sync_service_snapshots(service_id: str, snapshots: List[Snapshot]):
for snapshot in snapshots:
if snapshot.service_name == service_id:
Storage.cache_snapshot(snapshot)
for snapshot in Backups.get_cached_snapshots_service(service_id):
if snapshot.id not in [snap.id for snap in snapshots]:
Storage.delete_cached_snapshot(snapshot)
@staticmethod
def enable_autobackup(service: Service):
Storage.set_autobackup(service)
@staticmethod
def _service_ids_to_back_up(time: datetime) -> List[str]:
services = Storage.services_with_autobackup()
return [
id
for id in services
if Backups.is_time_to_backup_service(
id,
time,
)
]
@staticmethod
def services_to_back_up(time: datetime) -> List[Service]:
result = []
for id in Backups._service_ids_to_back_up(time):
service = get_service_by_id(id)
if service is None:
raise ValueError(
"Cannot look up a service scheduled for backup!",
)
result.append(service)
return result
@staticmethod
def is_time_to_backup(time: datetime) -> bool:
"""
Intended as a time validator for huey cron scheduler
of automatic backups
"""
return Backups._service_ids_to_back_up(time) != []
@staticmethod
def is_time_to_backup_service(service_id: str, time: datetime):
period = Backups.autobackup_period_minutes()
if period is None:
return False
if not Storage.is_autobackup_set(service_id):
return False
last_backup = Storage.get_last_backup_time(service_id)
if last_backup is None:
# queue a backup immediately if there are no previous backups
return True
if time > last_backup + timedelta(minutes=period):
return True
return False
@staticmethod
def disable_autobackup(service: Service):
"""also see disable_all_autobackup()"""
Storage.unset_autobackup(service)
@staticmethod
def is_autobackup_enabled(service: Service) -> bool:
return Storage.is_autobackup_set(service.get_id())
@staticmethod
def autobackup_period_minutes() -> Optional[int]:
"""None means autobackup is disabled"""
return Storage.autobackup_period_minutes()
@staticmethod
def set_autobackup_period_minutes(minutes: int):
"""
0 and negative numbers are equivalent to disable.
Setting to a positive number may result in a backup very soon
if some services are not backed up.
"""
if minutes <= 0:
Backups.disable_all_autobackup()
return
Storage.store_autobackup_period_minutes(minutes)
@staticmethod
def disable_all_autobackup():
"""
Disables all automatic backing up,
but does not change per-service settings
"""
Storage.delete_backup_period()
@staticmethod @staticmethod
def provider(): def provider():
@ -175,32 +59,6 @@ class Backups:
) )
Storage.store_provider(provider) Storage.store_provider(provider)
@staticmethod
def construct_provider(
kind: BackupProviderEnum,
login: str,
key: str,
location: str,
repo_id: str = "",
) -> AbstractBackupProvider:
provider_class = get_provider(kind)
return provider_class(
login=login,
key=key,
location=location,
repo_id=repo_id,
)
@staticmethod
def reset(reset_json=True):
Storage.reset()
if reset_json:
try:
Backups.reset_provider_json()
except FileNotFoundError:
# if there is no userdata file, we do not need to reset it
pass
@staticmethod @staticmethod
def lookup_provider() -> AbstractBackupProvider: def lookup_provider() -> AbstractBackupProvider:
@ -223,6 +81,36 @@ class Backups:
Storage.store_provider(none_provider) Storage.store_provider(none_provider)
return none_provider return none_provider
@staticmethod
def construct_provider(
kind: BackupProviderEnum,
login: str,
key: str,
location: str,
repo_id: str = "",
) -> AbstractBackupProvider:
provider_class = get_provider(kind)
return provider_class(
login=login,
key=key,
location=location,
repo_id=repo_id,
)
@staticmethod
def load_provider_redis() -> Optional[AbstractBackupProvider]:
provider_model = Storage.load_provider()
if provider_model is None:
return None
return Backups.construct_provider(
BackupProviderEnum[provider_model.kind],
provider_model.login,
provider_model.key,
provider_model.location,
provider_model.repo_id,
)
@staticmethod @staticmethod
def load_provider_json() -> Optional[AbstractBackupProvider]: def load_provider_json() -> Optional[AbstractBackupProvider]:
with ReadUserData() as user_data: with ReadUserData() as user_data:
@ -261,18 +149,18 @@ class Backups:
user_data["backup"] = DEFAULT_JSON_PROVIDER user_data["backup"] = DEFAULT_JSON_PROVIDER
@staticmethod @staticmethod
def load_provider_redis() -> Optional[AbstractBackupProvider]: def reset(reset_json=True):
provider_model = Storage.load_provider() Storage.reset()
if provider_model is None: if reset_json:
return None try:
return Backups.construct_provider( Backups.reset_provider_json()
BackupProviderEnum[provider_model.kind], except FileNotFoundError:
provider_model.login, # if there is no userdata file, we do not need to reset it
provider_model.key, pass
provider_model.location,
provider_model.repo_id, ### Backup
)
@staticmethod @staticmethod
def back_up(service: Service): def back_up(service: Service):
@ -300,6 +188,8 @@ class Backups:
Jobs.update(job, status=JobStatus.FINISHED) Jobs.update(job, status=JobStatus.FINISHED)
return snapshot return snapshot
### Init
@staticmethod @staticmethod
def init_repo(): def init_repo():
Backups.provider().backupper.init() Backups.provider().backupper.init()
@ -317,6 +207,8 @@ class Backups:
return False return False
### Snapshots
@staticmethod @staticmethod
def get_snapshots(service: Service) -> List[Snapshot]: def get_snapshots(service: Service) -> List[Snapshot]:
snapshots = Backups.get_all_snapshots() snapshots = Backups.get_all_snapshots()
@ -363,6 +255,36 @@ class Backups:
for snapshot in upstream_snapshots: for snapshot in upstream_snapshots:
Storage.cache_snapshot(snapshot) Storage.cache_snapshot(snapshot)
@staticmethod
def service_snapshot_size(snapshot_id: str) -> int:
return Backups.provider().backupper.restored_size(
snapshot_id,
)
@staticmethod
def _store_last_snapshot(service_id: str, snapshot: Snapshot):
"""What do we do with a snapshot that is just made?"""
# non-expiring timestamp of the last
Storage.store_last_timestamp(service_id, snapshot)
# expiring cache entry
Storage.cache_snapshot(snapshot)
@staticmethod
def get_cached_snapshots_service(service_id: str) -> List[Snapshot]:
snapshots = Storage.get_cached_snapshots()
return [snap for snap in snapshots if snap.service_name == service_id]
@staticmethod
def sync_service_snapshots(service_id: str, snapshots: List[Snapshot]):
for snapshot in snapshots:
if snapshot.service_name == service_id:
Storage.cache_snapshot(snapshot)
for snapshot in Backups.get_cached_snapshots_service(service_id):
if snapshot.id not in [snap.id for snap in snapshots]:
Storage.delete_cached_snapshot(snapshot)
### Restoring
# to be deprecated/internalized in favor of restore_snapshot() # to be deprecated/internalized in favor of restore_snapshot()
@staticmethod @staticmethod
def restore_service_from_snapshot(service: Service, snapshot_id: str): def restore_service_from_snapshot(service: Service, snapshot_id: str):
@ -425,11 +347,101 @@ class Backups:
status=JobStatus.FINISHED, status=JobStatus.FINISHED,
) )
### Autobackup
@staticmethod @staticmethod
def service_snapshot_size(snapshot_id: str) -> int: def is_autobackup_enabled(service: Service) -> bool:
return Backups.provider().backupper.restored_size( return Storage.is_autobackup_set(service.get_id())
snapshot_id,
) @staticmethod
def enable_autobackup(service: Service):
Storage.set_autobackup(service)
@staticmethod
def disable_autobackup(service: Service):
"""also see disable_all_autobackup()"""
Storage.unset_autobackup(service)
@staticmethod
def disable_all_autobackup():
"""
Disables all automatic backing up,
but does not change per-service settings
"""
Storage.delete_backup_period()
@staticmethod
def autobackup_period_minutes() -> Optional[int]:
"""None means autobackup is disabled"""
return Storage.autobackup_period_minutes()
@staticmethod
def set_autobackup_period_minutes(minutes: int):
"""
0 and negative numbers are equivalent to disable.
Setting to a positive number may result in a backup very soon
if some services are not backed up.
"""
if minutes <= 0:
Backups.disable_all_autobackup()
return
Storage.store_autobackup_period_minutes(minutes)
@staticmethod
def is_time_to_backup(time: datetime) -> bool:
"""
Intended as a time validator for huey cron scheduler
of automatic backups
"""
return Backups._service_ids_to_back_up(time) != []
@staticmethod
def services_to_back_up(time: datetime) -> List[Service]:
result = []
for id in Backups._service_ids_to_back_up(time):
service = get_service_by_id(id)
if service is None:
raise ValueError(
"Cannot look up a service scheduled for backup!",
)
result.append(service)
return result
@staticmethod
def get_last_backed_up(service: Service) -> Optional[datetime]:
"""Get a timezone-aware time of the last backup of a service"""
return Storage.get_last_backup_time(service.get_id())
@staticmethod
def is_time_to_backup_service(service_id: str, time: datetime):
period = Backups.autobackup_period_minutes()
if period is None:
return False
if not Storage.is_autobackup_set(service_id):
return False
last_backup = Storage.get_last_backup_time(service_id)
if last_backup is None:
# queue a backup immediately if there are no previous backups
return True
if time > last_backup + timedelta(minutes=period):
return True
return False
@staticmethod
def _service_ids_to_back_up(time: datetime) -> List[str]:
services = Storage.services_with_autobackup()
return [
id
for id in services
if Backups.is_time_to_backup_service(
id,
time,
)
]
### Helpers
@staticmethod @staticmethod
def space_usable_for_service(service: Service) -> int: def space_usable_for_service(service: Service) -> int:
@ -442,9 +454,15 @@ class Backups:
return usable_bytes return usable_bytes
@staticmethod @staticmethod
def _store_last_snapshot(service_id: str, snapshot: Snapshot): def set_localfile_repo(file_path: str):
"""What do we do with a snapshot that is just made?""" ProviderClass = get_provider(BackupProviderEnum.FILE)
# non-expiring timestamp of the last provider = ProviderClass(
Storage.store_last_timestamp(service_id, snapshot) login="",
# expiring cache entry key="",
Storage.cache_snapshot(snapshot) location=file_path,
repo_id="",
)
Storage.store_provider(provider)