job system error handling;

- Jobs can now throw exceptions using job_abort.  This will store an
  error message on the job and immediately stop the job.  The method
  will never return.  It must be called from the worker thread running
  the job, so in practice it should only be called by the job function.
- After waiting for the job to complete or abort using job_wait, the
  error message can be checked using job_get_error.  This is the raw
  pointer passed to job_abort, caller is responsible for freeing it or
  doing whatever necessary cleanup is required.
- job_free must be used to recycle the job after waiting on it.

Yes it uses longjmp.

There's also some general cleanup, and job_start just does random jobs
when the queue is full instead of returning NULL.
This commit is contained in:
bjorn 2024-01-05 14:24:52 -08:00
parent b32ac1c5a5
commit f6af859984
2 changed files with 53 additions and 30 deletions

View File

@ -1,6 +1,7 @@
#include "job.h"
#include <stdatomic.h>
#include <threads.h>
#include <setjmp.h>
#include <string.h>
#define MAX_WORKERS 64
@ -9,7 +10,7 @@
struct job {
job* next;
atomic_uint done;
fn_job* fn;
union { fn_job* fn; const char* error; };
void* arg;
};
@ -22,28 +23,40 @@ static struct {
job* pool;
cnd_t hasJob;
mtx_t lock;
bool done;
bool quit;
} state;
static int worker_loop(void* arg) {
static thread_local jmp_buf catch;
// Must hold lock, this will unlock it, state.head must exist
static void runJob(void) {
job* job = state.head;
state.head = job->next;
if (!job->next) state.tail = NULL;
mtx_unlock(&state.lock);
if (setjmp(catch) == 0) {
fn_job* fn = job->fn;
job->error = NULL;
fn(job, job->arg);
}
job->done = true;
}
static int workerLoop(void* arg) {
for (;;) {
mtx_lock(&state.lock);
while (!state.head && !state.done) {
while (!state.head && !state.quit) {
cnd_wait(&state.hasJob, &state.lock);
}
if (state.done) {
if (state.quit) {
break;
}
job* job = state.head;
state.head = job->next;
if (!job->next) state.tail = NULL;
mtx_unlock(&state.lock);
job->fn(job->arg);
job->done = true;
runJob();
}
mtx_unlock(&state.lock);
@ -61,7 +74,7 @@ bool job_init(uint32_t count) {
if (count > MAX_WORKERS) count = MAX_WORKERS;
for (uint32_t i = 0; i < count; i++, state.workerCount++) {
if (thrd_create(&state.workers[i], worker_loop, (void*) (uintptr_t) i) != thrd_success) {
if (thrd_create(&state.workers[i], workerLoop, (void*) (uintptr_t) i) != thrd_success) {
return false;
}
}
@ -70,7 +83,7 @@ bool job_init(uint32_t count) {
}
void job_destroy(void) {
state.done = true;
state.quit = true;
cnd_broadcast(&state.hasJob);
for (uint32_t i = 0; i < state.workerCount; i++) {
thrd_join(state.workers[i], NULL);
@ -81,15 +94,17 @@ void job_destroy(void) {
}
job* job_start(fn_job* fn, void* arg) {
if (!state.pool) {
return NULL;
}
for (;;) {
mtx_lock(&state.lock);
mtx_lock(&state.lock);
if (!state.pool) {
mtx_unlock(&state.lock);
return NULL;
if (state.pool) {
break;
} else if (state.head) {
runJob();
} else { // Might not be a job to do if worker count is bigger than pool size
mtx_unlock(&state.lock);
thrd_yield();
}
}
job* job = state.pool;
@ -113,23 +128,29 @@ job* job_start(fn_job* fn, void* arg) {
return job;
}
void job_abort(job* job, const char* error) {
job->error = error;
longjmp(catch, 22);
}
void job_wait(job* job) {
while (!job->done) {
mtx_lock(&state.lock);
if (state.head) {
struct job* task = state.head;
state.head = task->next;
if (!task->next) state.tail = NULL;
mtx_unlock(&state.lock);
task->fn(task->arg);
task->done = true;
runJob();
} else {
mtx_unlock(&state.lock);
thrd_yield();
}
}
}
char* job_get_error(job* job) {
return (char*) job->error;
}
void job_free(job* job) {
mtx_lock(&state.lock);
job->next = state.pool;
state.pool = job;

View File

@ -4,10 +4,12 @@
#pragma once
typedef struct job job;
typedef void fn_job(void* arg);
typedef void fn_job(job* job, void* arg);
bool job_init(uint32_t workerCount);
void job_destroy(void);
job* job_start(fn_job* fn, void* arg);
void job_abort(job* job, const char* error);
void job_wait(job* job);
char* job_get_error(job* job);
void job_free(job* job);