mirror of
https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api.git
synced 2025-02-07 08:30:38 +00:00
fix: corrected from comment from pr
This commit is contained in:
parent
eda5923cc6
commit
c0f8b9026b
|
@ -1,6 +1,7 @@
|
|||
import re
|
||||
import subprocess
|
||||
from subprocess import Popen, PIPE
|
||||
from typing import Tuple, Iterable, IO, Any, Optional
|
||||
|
||||
from selfprivacy_api.utils.huey import huey
|
||||
|
||||
|
@ -14,7 +15,7 @@ RESULT_WAS_NOT_FOUND_ERROR = "We are sorry, garbage collection result was not fo
|
|||
CLEAR_COMPLETED = "Cleaning completed."
|
||||
|
||||
|
||||
def run_nix_store_print_dead() -> PIPE:
|
||||
def run_nix_store_print_dead() -> IO[Any]:
|
||||
subprocess.run(
|
||||
["nix-env", "-p", "/nix/var/nix/profiles/system", "--delete-generations old"],
|
||||
check=False,
|
||||
|
@ -24,14 +25,13 @@ def run_nix_store_print_dead() -> PIPE:
|
|||
"utf-8"
|
||||
)
|
||||
|
||||
|
||||
def run_nix_collect_garbage() -> subprocess.Popen:
|
||||
def run_nix_collect_garbage() -> Optional[IO[bytes]]:
|
||||
return subprocess.Popen(
|
||||
["nix-store", "--gc"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT
|
||||
).stdout
|
||||
|
||||
|
||||
def parse_line(line):
|
||||
def parse_line(line: str):
|
||||
"""
|
||||
We parse the string for the presence of a final line,
|
||||
with the final amount of space cleared.
|
||||
|
@ -58,7 +58,7 @@ def parse_line(line):
|
|||
)
|
||||
|
||||
|
||||
def process_stream(job, stream, total_dead_packages):
|
||||
def process_stream(job: Job, stream: Iterable[bytes], total_dead_packages: int) -> None:
|
||||
completed_packages = 0
|
||||
prev_progress = 0
|
||||
|
||||
|
@ -89,7 +89,7 @@ def process_stream(job, stream, total_dead_packages):
|
|||
)
|
||||
|
||||
|
||||
def get_dead_packages(output):
|
||||
def get_dead_packages(output) -> Tuple[int, int]:
|
||||
dead = len(re.findall("/nix/store/", output))
|
||||
percent = 0
|
||||
if dead != 0:
|
||||
|
|
|
@ -18,9 +18,6 @@ from selfprivacy_api.jobs.nix_collect_garbage import (
|
|||
RESULT_WAS_NOT_FOUND_ERROR,
|
||||
)
|
||||
|
||||
pytest_plugins = ("pytest_asyncio",)
|
||||
|
||||
|
||||
OUTPUT_PRINT_DEAD = """
|
||||
finding garbage collector roots...
|
||||
determining live/dead paths...
|
||||
|
@ -102,7 +99,7 @@ def test_get_dead_packages_zero(job_reset):
|
|||
assert get_dead_packages("") == (0, 0)
|
||||
|
||||
|
||||
RUN_NIX_COLLECT_GARBAGE_QUERY = """
|
||||
RUN_NIX_COLLECT_GARBAGE_MUTATION = """
|
||||
mutation CollectGarbage {
|
||||
system {
|
||||
nixCollectGarbage {
|
||||
|
@ -133,7 +130,7 @@ def test_graphql_nix_collect_garbage(authorized_client):
|
|||
response = authorized_client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
"query": RUN_NIX_COLLECT_GARBAGE_QUERY,
|
||||
"query": RUN_NIX_COLLECT_GARBAGE_MUTATION,
|
||||
},
|
||||
)
|
||||
|
||||
|
|
Loading…
Reference in a new issue