# Source file: selfprivacy-rest-api/tests/test_jobs.py

# pylint: disable=redefined-outer-name
# pylint: disable=unused-argument
import pytest
from selfprivacy_api.jobs import Jobs, JobStatus
import selfprivacy_api.jobs as jobsmodule
# 2022-11-30 14:31:37 +00:00
def test_add_reset(jobs_with_one_job):
    """Resetting a storage that holds one job leaves the job list empty."""
    storage = jobs_with_one_job
    storage.reset()
    remaining = storage.get_jobs()
    assert remaining == []
# 2022-11-30 14:17:53 +00:00
# 2022-11-30 15:12:42 +00:00
def test_minimal_update(jobs_with_one_job):
    """Updating only the status of the single job keeps it in the job list."""
    storage = jobs_with_one_job
    only_job = storage.get_jobs()[0]

    storage.update(job=only_job, status=JobStatus.ERROR)

    listed = storage.get_jobs()
    assert listed == [only_job]
def test_remove_update_nonexistent(jobs_with_one_job):
    """Update a job after it has been removed from the storage.

    Pins the current contract: ``update`` returns a value equal to the job
    it was given, even though that job is no longer listed.
    """
    storage = jobs_with_one_job
    removed_job = storage.get_jobs()[0]

    storage.remove(removed_job)
    assert storage.get_jobs() == []

    returned = storage.update(job=removed_job, status=JobStatus.ERROR)
    # Even though we might consider changing this behavior.
    assert returned == removed_job
# 2022-11-30 14:31:37 +00:00
def test_jobs(jobs_with_one_job):
    """Drive the single job through RUNNING and FINISHED states.

    Checks that:
    * the storage is not busy while the job has its initial status,
    * after an update to ``JobStatus.RUNNING`` the job is still listed and
      the storage reports busy,
    * with ``jobsmodule.JOB_EXPIRATION_SECONDS`` set to 0, an update to
      ``JobStatus.FINISHED`` leaves the job list empty.
    """
    jobs = jobs_with_one_job
    test_job = jobs.get_jobs()[0]
    assert not jobs.is_busy()

    jobs.update(
        job=test_job,
        name="Write Tests",
        description="An oddly satisfying experience",
        status=JobStatus.RUNNING,
        status_text="Status text",
        progress=50,
    )
    assert jobs.get_jobs() == [test_job]
    assert jobs.is_busy()

    # The expiration constant is a module-level global shared by every test
    # in the session. Restore it in ``finally`` so that a failing assertion
    # (or an exception raised by ``update``) cannot leave it at 0 and change
    # the behavior of unrelated tests that run afterwards.
    backup = jobsmodule.JOB_EXPIRATION_SECONDS
    jobsmodule.JOB_EXPIRATION_SECONDS = 0
    try:
        jobs.update(
            job=test_job,
            status=JobStatus.FINISHED,
            status_text="Yaaay!",
            progress=100,
        )
        assert jobs.get_jobs() == []
    finally:
        jobsmodule.JOB_EXPIRATION_SECONDS = backup
@pytest.fixture
def jobs():
    """Yield a ``Jobs`` instance that is reset before and after the test.

    The fixture also asserts that the job list is empty right after the
    initial reset, so every test starts from a known-empty storage.
    """
    storage = Jobs()
    storage.reset()
    assert storage.get_jobs() == []

    yield storage

    # Teardown: clear whatever the test left behind.
    storage.reset()
# 2022-11-30 14:31:37 +00:00
@pytest.fixture
def jobs_with_one_job(jobs):
    """Return the ``jobs`` storage after adding exactly one test job to it.

    The fixture asserts that the freshly added job is the only one listed
    before handing the storage to the test.
    """
    job_fields = {
        "type_id": "test",
        "name": "Test job",
        "description": "This is a test job.",
        "status": JobStatus.CREATED,
        "status_text": "Status text",
        "progress": 0,
    }
    created = jobs.add(**job_fields)
    assert jobs.get_jobs() == [created]
    return jobs