Merge pull request 'feat: Basic tracking of the NixOS rebuilds' (#98) from system-rebuild-tracking into master

Reviewed-on: https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api/pulls/98
Reviewed-by: houkime <houkime@protonmail.com>
This commit is contained in:
Inex Code 2024-03-06 18:12:21 +02:00
commit cf2f153cfe
16 changed files with 373 additions and 80 deletions

View file

@ -19,6 +19,7 @@
pytest
pytest-datadir
pytest-mock
pytest-subprocess
black
mypy
pylsp-mypy

View file

@ -4,6 +4,8 @@ import subprocess
import pytz
from typing import Optional, List
from pydantic import BaseModel
from selfprivacy_api.jobs import Job, JobStatus, Jobs
from selfprivacy_api.jobs.upgrade_system import rebuild_system_task
from selfprivacy_api.utils import WriteUserData, ReadUserData
@ -87,10 +89,16 @@ def run_blocking(cmd: List[str], new_session: bool = False) -> str:
return stdout
def rebuild_system() -> int:
def rebuild_system() -> Job:
"""Rebuild the system"""
run_blocking(["systemctl", "start", "sp-nixos-rebuild.service"], new_session=True)
return 0
job = Jobs.add(
type_id="system.nixos.rebuild",
name="Rebuild system",
description="Applying the new system configuration by building the new NixOS generation.",
status=JobStatus.CREATED,
)
rebuild_system_task(job)
return job
def rollback_system() -> int:
@ -99,10 +107,16 @@ def rollback_system() -> int:
return 0
def upgrade_system() -> int:
def upgrade_system() -> Job:
"""Upgrade the system"""
run_blocking(["systemctl", "start", "sp-nixos-upgrade.service"], new_session=True)
return 0
job = Jobs.add(
type_id="system.nixos.upgrade",
name="Upgrade system",
description="Upgrading the system to the latest version.",
status=JobStatus.CREATED,
)
rebuild_system_task(job, upgrade=True)
return job
def reboot_system() -> None:

View file

@ -3,7 +3,9 @@
import typing
import strawberry
from selfprivacy_api.graphql import IsAuthenticated
from selfprivacy_api.graphql.common_types.jobs import job_to_api_job
from selfprivacy_api.graphql.mutations.mutation_interface import (
GenericJobMutationReturn,
GenericMutationReturn,
MutationReturnInterface,
)
@ -114,16 +116,17 @@ class SystemMutations:
)
@strawberry.mutation(permission_classes=[IsAuthenticated])
def run_system_rebuild(self) -> GenericMutationReturn:
def run_system_rebuild(self) -> GenericJobMutationReturn:
try:
system_actions.rebuild_system()
return GenericMutationReturn(
job = system_actions.rebuild_system()
return GenericJobMutationReturn(
success=True,
message="Starting rebuild system",
message="Starting system rebuild",
code=200,
job=job_to_api_job(job),
)
except system_actions.ShellException as e:
return GenericMutationReturn(
return GenericJobMutationReturn(
success=False,
message=str(e),
code=500,
@ -135,7 +138,7 @@ class SystemMutations:
try:
return GenericMutationReturn(
success=True,
message="Starting rebuild system",
message="Starting system rollback",
code=200,
)
except system_actions.ShellException as e:
@ -146,16 +149,17 @@ class SystemMutations:
)
@strawberry.mutation(permission_classes=[IsAuthenticated])
def run_system_upgrade(self) -> GenericMutationReturn:
system_actions.upgrade_system()
def run_system_upgrade(self) -> GenericJobMutationReturn:
try:
return GenericMutationReturn(
job = system_actions.upgrade_system()
return GenericJobMutationReturn(
success=True,
message="Starting rebuild system",
message="Starting system upgrade",
code=200,
job=job_to_api_job(job),
)
except system_actions.ShellException as e:
return GenericMutationReturn(
return GenericJobMutationReturn(
success=False,
message=str(e),
code=500,

View file

@ -0,0 +1,126 @@
"""
A task to start the system upgrade or rebuild by starting a systemd unit.
After starting, track the status of the systemd unit and update the Job
status accordingly.
"""
import subprocess
from selfprivacy_api.utils.huey import huey
from selfprivacy_api.jobs import JobStatus, Jobs, Job
from selfprivacy_api.utils.waitloop import wait_until_true
from selfprivacy_api.utils.systemd import (
get_service_status,
get_last_log_lines,
ServiceStatus,
)
START_TIMEOUT = 60 * 5
START_INTERVAL = 1
RUN_TIMEOUT = 60 * 60
RUN_INTERVAL = 5
def check_if_started(unit_name: str):
"""Check if the systemd unit has started"""
try:
status = get_service_status(unit_name)
if status == ServiceStatus.ACTIVE:
return True
return False
except subprocess.CalledProcessError:
return False
def check_running_status(job: Job, unit_name: str):
"""Check if the systemd unit is running"""
try:
status = get_service_status(unit_name)
if status == ServiceStatus.INACTIVE:
Jobs.update(
job=job,
status=JobStatus.FINISHED,
result="System rebuilt.",
progress=100,
)
return True
if status == ServiceStatus.FAILED:
log_lines = get_last_log_lines(unit_name, 10)
Jobs.update(
job=job,
status=JobStatus.ERROR,
error="System rebuild failed. Last log lines:\n" + "\n".join(log_lines),
)
return True
if status == ServiceStatus.ACTIVE:
log_lines = get_last_log_lines(unit_name, 1)
Jobs.update(
job=job,
status=JobStatus.RUNNING,
status_text=log_lines[0] if len(log_lines) > 0 else "",
)
return False
return False
except subprocess.CalledProcessError:
return False
@huey.task()
def rebuild_system_task(job: Job, upgrade: bool = False):
"""Rebuild the system"""
unit_name = "sp-nixos-upgrade.service" if upgrade else "sp-nixos-rebuild.service"
try:
command = ["systemctl", "start", unit_name]
subprocess.run(
command,
check=True,
start_new_session=True,
shell=False,
)
Jobs.update(
job=job,
status=JobStatus.RUNNING,
status_text="Starting the system rebuild...",
)
# Wait for the systemd unit to start
try:
wait_until_true(
lambda: check_if_started(unit_name),
timeout_sec=START_TIMEOUT,
interval=START_INTERVAL,
)
except TimeoutError:
log_lines = get_last_log_lines(unit_name, 10)
Jobs.update(
job=job,
status=JobStatus.ERROR,
error="System rebuild timed out. Last log lines:\n"
+ "\n".join(log_lines),
)
return
Jobs.update(
job=job,
status=JobStatus.RUNNING,
status_text="Rebuilding the system...",
)
# Wait for the systemd unit to finish
try:
wait_until_true(
lambda: check_running_status(job, unit_name),
timeout_sec=RUN_TIMEOUT,
interval=RUN_INTERVAL,
)
except TimeoutError:
log_lines = get_last_log_lines(unit_name, 10)
Jobs.update(
job=job,
status=JobStatus.ERROR,
error="System rebuild timed out. Last log lines:\n"
+ "\n".join(log_lines),
)
return
except subprocess.CalledProcessError as e:
Jobs.update(
job=job,
status=JobStatus.ERROR,
status_text=str(e),
)

View file

@ -11,9 +11,13 @@ Adding DISABLE_ALL to that array disables the migrations module entirely.
from selfprivacy_api.utils import ReadUserData, UserDataFiles
from selfprivacy_api.migrations.write_token_to_redis import WriteTokenToRedis
from selfprivacy_api.migrations.check_for_system_rebuild_jobs import (
CheckForSystemRebuildJobs,
)
migrations = [
WriteTokenToRedis(),
CheckForSystemRebuildJobs(),
]

View file

@ -0,0 +1,47 @@
from selfprivacy_api.migrations.migration import Migration
from selfprivacy_api.jobs import JobStatus, Jobs
class CheckForSystemRebuildJobs(Migration):
"""Check if there are unfinished system rebuild jobs and finish them"""
def get_migration_name(self):
return "check_for_system_rebuild_jobs"
def get_migration_description(self):
return "Check if there are unfinished system rebuild jobs and finish them"
def is_migration_needed(self):
# Check if there are any unfinished system rebuild jobs
for job in Jobs.get_jobs():
if (
job.type_id
in [
"system.nixos.rebuild",
"system.nixos.upgrade",
]
) and job.status in [
JobStatus.CREATED,
JobStatus.RUNNING,
]:
return True
def migrate(self):
# As the API is restarted, we assume that the jobs are finished
for job in Jobs.get_jobs():
if (
job.type_id
in [
"system.nixos.rebuild",
"system.nixos.upgrade",
]
) and job.status in [
JobStatus.CREATED,
JobStatus.RUNNING,
]:
Jobs.update(
job=job,
status=JobStatus.FINISHED,
result="System rebuilt.",
progress=100,
)

View file

@ -5,7 +5,7 @@ from typing import Optional, List
from selfprivacy_api.utils import get_domain
from selfprivacy_api.services.generic_status_getter import get_service_status
from selfprivacy_api.utils.systemd import get_service_status
from selfprivacy_api.services.service import Service, ServiceStatus
from selfprivacy_api.services.bitwarden.icon import BITWARDEN_ICON

View file

@ -5,7 +5,7 @@ from typing import Optional, List
from selfprivacy_api.utils import get_domain
from selfprivacy_api.services.generic_status_getter import get_service_status
from selfprivacy_api.utils.systemd import get_service_status
from selfprivacy_api.services.service import Service, ServiceStatus
from selfprivacy_api.services.gitea.icon import GITEA_ICON

View file

@ -4,7 +4,7 @@ import subprocess
from typing import Optional, List
from selfprivacy_api.jobs import Job
from selfprivacy_api.services.generic_status_getter import (
from selfprivacy_api.utils.systemd import (
get_service_status_from_several_units,
)
from selfprivacy_api.services.service import Service, ServiceStatus

View file

@ -4,7 +4,7 @@ import base64
import subprocess
from typing import Optional, List
from selfprivacy_api.services.generic_status_getter import (
from selfprivacy_api.utils.systemd import (
get_service_status_from_several_units,
)
from selfprivacy_api.services.service import Service, ServiceDnsRecord, ServiceStatus

View file

@ -6,7 +6,7 @@ from typing import Optional, List
from selfprivacy_api.utils import get_domain
from selfprivacy_api.jobs import Job, Jobs
from selfprivacy_api.services.generic_status_getter import get_service_status
from selfprivacy_api.utils.systemd import get_service_status
from selfprivacy_api.services.service import Service, ServiceStatus
from selfprivacy_api.services.nextcloud.icon import NEXTCLOUD_ICON

View file

@ -3,7 +3,7 @@ import base64
import subprocess
import typing
from selfprivacy_api.jobs import Job
from selfprivacy_api.services.generic_status_getter import get_service_status
from selfprivacy_api.utils.systemd import get_service_status
from selfprivacy_api.services.service import Service, ServiceStatus
from selfprivacy_api.utils.block_devices import BlockDevice
from selfprivacy_api.services.ocserv.icon import OCSERV_ICON

View file

@ -6,7 +6,7 @@ from typing import Optional, List
from selfprivacy_api.utils import get_domain
from selfprivacy_api.services.owned_path import OwnedPath
from selfprivacy_api.services.generic_status_getter import get_service_status
from selfprivacy_api.utils.systemd import get_service_status
from selfprivacy_api.services.service import Service, ServiceStatus
from selfprivacy_api.services.pleroma.icon import PLEROMA_ICON

View file

@ -1,4 +1,5 @@
from selfprivacy_api.utils.huey import huey
from selfprivacy_api.jobs.test import test_job
from selfprivacy_api.backup.tasks import *
from selfprivacy_api.services.generic_service_mover import move_service
from selfprivacy_api.services.tasks import move_service
from selfprivacy_api.jobs.upgrade_system import rebuild_system_task

View file

@ -1,5 +1,6 @@
"""Generic service status fetcher using systemctl"""
import subprocess
from typing import List
from selfprivacy_api.services.service import ServiceStatus
@ -58,3 +59,24 @@ def get_service_status_from_several_units(services: list[str]) -> ServiceStatus:
if ServiceStatus.ACTIVE in service_statuses:
return ServiceStatus.ACTIVE
return ServiceStatus.OFF
def get_last_log_lines(service: str, lines_count: int) -> List[str]:
if lines_count < 1:
raise ValueError("lines_count must be greater than 0")
try:
logs = subprocess.check_output(
[
"journalctl",
"-u",
service,
"-n",
str(lines_count),
"-o",
"cat",
],
shell=False,
).decode("utf-8")
return logs.splitlines()
except subprocess.CalledProcessError:
return []

View file

@ -3,6 +3,9 @@
# pylint: disable=missing-function-docstring
import pytest
from selfprivacy_api.jobs import JobStatus, Jobs
from tests.test_graphql.common import assert_empty, assert_ok, get_data
class ProcessMock:
"""Mock subprocess.Popen"""
@ -37,6 +40,13 @@ def mock_subprocess_check_output(mocker):
return mock
@pytest.fixture
def mock_sleep_intervals(mocker):
mock_start = mocker.patch("selfprivacy_api.jobs.upgrade_system.START_INTERVAL", 0)
mock_run = mocker.patch("selfprivacy_api.jobs.upgrade_system.RUN_INTERVAL", 0)
return (mock_start, mock_run)
API_REBUILD_SYSTEM_MUTATION = """
mutation rebuildSystem {
system {
@ -44,46 +54,14 @@ mutation rebuildSystem {
success
message
code
job {
uid
}
}
}
}
"""
def test_graphql_system_rebuild_unauthorized(client, mock_subprocess_popen):
"""Test system rebuild without authorization"""
response = client.post(
"/graphql",
json={
"query": API_REBUILD_SYSTEM_MUTATION,
},
)
assert response.status_code == 200
assert response.json().get("data") is None
assert mock_subprocess_popen.call_count == 0
def test_graphql_system_rebuild(authorized_client, mock_subprocess_popen):
"""Test system rebuild"""
response = authorized_client.post(
"/graphql",
json={
"query": API_REBUILD_SYSTEM_MUTATION,
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["system"]["runSystemRebuild"]["success"] is True
assert response.json()["data"]["system"]["runSystemRebuild"]["message"] is not None
assert response.json()["data"]["system"]["runSystemRebuild"]["code"] == 200
assert mock_subprocess_popen.call_count == 1
assert mock_subprocess_popen.call_args[0][0] == [
"systemctl",
"start",
"sp-nixos-rebuild.service",
]
API_UPGRADE_SYSTEM_MUTATION = """
mutation upgradeSystem {
system {
@ -91,44 +69,140 @@ mutation upgradeSystem {
success
message
code
job {
uid
}
}
}
}
"""
def test_graphql_system_upgrade_unauthorized(client, mock_subprocess_popen):
"""Test system upgrade without authorization"""
@pytest.mark.parametrize("action", ["rebuild", "upgrade"])
def test_graphql_system_rebuild_unauthorized(client, fp, action):
"""Test system rebuild without authorization"""
query = (
API_REBUILD_SYSTEM_MUTATION
if action == "rebuild"
else API_UPGRADE_SYSTEM_MUTATION
)
response = client.post(
"/graphql",
json={
"query": API_UPGRADE_SYSTEM_MUTATION,
"query": query,
},
)
assert response.status_code == 200
assert response.json().get("data") is None
assert mock_subprocess_popen.call_count == 0
assert_empty(response)
assert fp.call_count([fp.any()]) == 0
def test_graphql_system_upgrade(authorized_client, mock_subprocess_popen):
"""Test system upgrade"""
@pytest.mark.parametrize("action", ["rebuild", "upgrade"])
def test_graphql_system_rebuild(authorized_client, fp, action, mock_sleep_intervals):
"""Test system rebuild"""
unit_name = f"sp-nixos-{action}.service"
query = (
API_REBUILD_SYSTEM_MUTATION
if action == "rebuild"
else API_UPGRADE_SYSTEM_MUTATION
)
# Start the unit
fp.register(["systemctl", "start", unit_name])
# Wait for it to start
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=inactive")
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=inactive")
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=active")
# Check its exectution
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=active")
fp.register(
["journalctl", "-u", unit_name, "-n", "1", "-o", "cat"],
stdout="Starting rebuild...",
)
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=active")
fp.register(
["journalctl", "-u", unit_name, "-n", "1", "-o", "cat"], stdout="Rebuilding..."
)
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=inactive")
response = authorized_client.post(
"/graphql",
json={
"query": API_UPGRADE_SYSTEM_MUTATION,
"query": query,
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["system"]["runSystemUpgrade"]["success"] is True
assert response.json()["data"]["system"]["runSystemUpgrade"]["message"] is not None
assert response.json()["data"]["system"]["runSystemUpgrade"]["code"] == 200
assert mock_subprocess_popen.call_count == 1
assert mock_subprocess_popen.call_args[0][0] == [
"systemctl",
"start",
"sp-nixos-upgrade.service",
]
data = get_data(response)["system"][f"runSystem{action.capitalize()}"]
assert_ok(data)
assert fp.call_count(["systemctl", "start", unit_name]) == 1
assert fp.call_count(["systemctl", "show", unit_name]) == 6
job_id = response.json()["data"]["system"][f"runSystem{action.capitalize()}"][
"job"
]["uid"]
assert Jobs.get_job(job_id).status == JobStatus.FINISHED
assert Jobs.get_job(job_id).type_id == f"system.nixos.{action}"
@pytest.mark.parametrize("action", ["rebuild", "upgrade"])
def test_graphql_system_rebuild_failed(
authorized_client, fp, action, mock_sleep_intervals
):
"""Test system rebuild"""
unit_name = f"sp-nixos-{action}.service"
query = (
API_REBUILD_SYSTEM_MUTATION
if action == "rebuild"
else API_UPGRADE_SYSTEM_MUTATION
)
# Start the unit
fp.register(["systemctl", "start", unit_name])
# Wait for it to start
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=inactive")
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=inactive")
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=active")
# Check its exectution
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=active")
fp.register(
["journalctl", "-u", unit_name, "-n", "1", "-o", "cat"],
stdout="Starting rebuild...",
)
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=active")
fp.register(
["journalctl", "-u", unit_name, "-n", "1", "-o", "cat"], stdout="Rebuilding..."
)
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=failed")
fp.register(
["journalctl", "-u", unit_name, "-n", "10", "-o", "cat"], stdout="Some error"
)
response = authorized_client.post(
"/graphql",
json={
"query": query,
},
)
data = get_data(response)["system"][f"runSystem{action.capitalize()}"]
assert_ok(data)
assert fp.call_count(["systemctl", "start", unit_name]) == 1
assert fp.call_count(["systemctl", "show", unit_name]) == 6
job_id = response.json()["data"]["system"][f"runSystem{action.capitalize()}"][
"job"
]["uid"]
assert Jobs.get_job(job_id).status == JobStatus.ERROR
assert Jobs.get_job(job_id).type_id == f"system.nixos.{action}"
API_ROLLBACK_SYSTEM_MUTATION = """