refactor: nix-collect-garbage is now pure

This commit is contained in:
def 2022-12-13 05:44:52 +04:00
parent 3ecd522bd2
commit 510b94039e
2 changed files with 110 additions and 53 deletions

View file

@ -5,31 +5,17 @@ from selfprivacy_api.jobs import Job, JobStatus, Jobs
from selfprivacy_api.utils.huey import huey
@huey.task()
def nix_collect_garbage(job: Job):
def run_nix_store_print_dead():
return subprocess.check_output(["nix-store", "--gc", "--print-dead"])
Jobs.update(
job=job,
status=JobStatus.RUNNING,
progress=0,
status_text="Сalculate the number of dead packages...",
def run_nix_collect_garbage():
return subprocess.Popen(
["nix-collect-garbage", "-d"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
output = subprocess.check_output(
["nix-store --gc --print-dead", "--gc", "--print-dead"]
)
dead_packages = len(re.findall("/nix/store/", output.decode("utf-8")))
package_equal_to_percent = 100 / dead_packages
Jobs.update(
job=job,
status=JobStatus.RUNNING,
progress=0,
status_text=f"Found {dead_packages} packages to remove!",
)
def _parse_line(line):
def parse_line(line, job: Job):
pattern = re.compile(r"[+-]?\d+\.\d+ \w+ freed")
match = re.search(
pattern,
@ -54,7 +40,12 @@ def nix_collect_garbage(job: Job):
result=f"{match.group(0)} have been cleared",
)
def _stream_process(process):
def stream_process(
process,
package_equal_to_percent,
job: Job,
):
go = process.poll() is None
percent = 0
@ -70,11 +61,45 @@ def nix_collect_garbage(job: Job):
)
elif "store paths deleted," in line:
_parse_line(line)
parse_line(line, job)
return go
process = subprocess.Popen(
["nix-collect-garbage", "-d"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT
@huey.task()
def nix_collect_garbage(
job: Job,
run_nix_store=run_nix_store_print_dead,
run_nix_collect=run_nix_collect_garbage,
): # innocent as a pure function
Jobs.update(
job=job,
status=JobStatus.RUNNING,
progress=0,
status_text="Сalculate the number of dead packages...",
)
_stream_process(process)
output = run_nix_store()
dead_packages = len(re.findall("/nix/store/", output.decode("utf-8")))
if dead_packages == 0:
Jobs.update(
job=job,
status=JobStatus.FINISHED,
progress=100,
status_text="Nothing to clear",
result="System is clear",
)
package_equal_to_percent = 100 / dead_packages
Jobs.update(
job=job,
status=JobStatus.RUNNING,
progress=0,
status_text=f"Found {dead_packages} packages to remove!",
)
stream_process(run_nix_collect, package_equal_to_percent, job)

View file

@ -0,0 +1,32 @@
# pylint: disable=redefined-outer-name
# pylint: disable=unused-argument
# pylint: disable=missing-function-docstring
import pytest
from selfprivacy_api.jobs.nix_collect_garbage import nix_collect_garbage
created_at: datetime.datetime
updated_at: datetime.datetime
uid: UUID
type_id: str
name: str
description: str
status: JobStatus
def test_nix_collect_garbage(job(
created_at = "2019-12-04",
updated_at = "2019-12-04",
uid = UUID,
type_id = "typeid",
name = "name",
description: "desc",
status = status(CREATED = "CREATED"),
)):
assert nix_collect_garbage() is not None