Compare commits

...

13 Commits

Author SHA1 Message Date
Bjorn 6c7ca1e275
Merge pull request #750 from bjornbytes/job
Job System
2024-02-28 12:51:25 -08:00
bjorn 224b6a495d Simplify job error string memory stuff;
Comparison against string literal isn't good.  I wish there was a way to
not heap allocate so much though, but exceptions are exceptional anyway.
2024-02-28 12:31:34 -08:00
bjorn 901dd268f2 job copies errors; add variadic version of job_abort;
I caved and depended on malloc/sprintf because I couldn't find anywhere
else good to put it...
2024-02-28 12:31:34 -08:00
bjorn f6af859984 job system error handling;
- Jobs can now throw exceptions using job_abort.  This will store an
  error message on the job and immediately stop the job.  The method
  will never return.  It must be called from the worker thread running
  the job, so in practice it should only be called by the job function.
- After waiting for the job to complete or abort using job_wait, the
  error message can be checked using job_get_error.  This is the raw
  pointer passed to job_abort, caller is responsible for freeing it or
  doing whatever necessary cleanup is required.
- job_free must be used to recycle the job after waiting on it.

Yes it uses longjmp.

There's also some general cleanup, and job_start just does random jobs
when the queue is full instead of returning NULL.
2024-02-28 12:31:34 -08:00
bjorn b32ac1c5a5 Details; 2024-02-28 12:31:34 -08:00
bjorn 1701409763 threads polyfill defines thread_local;
Might need to be removed for C23.
2024-02-28 12:31:34 -08:00
bjorn a4c1d9fce6 Thread module initializes job system;
Worker count can be set from conf.lua.

A negative worker count is relative to the number of cores.

-1 is the default.
2024-02-28 12:31:34 -08:00
bjorn d017ca0fac Details; 2024-02-28 12:31:34 -08:00
bjorn 35b5693a19 Add some more windows atomics; 2024-02-28 12:31:34 -08:00
bjorn e8dc1484d3 Cleanup; 2024-02-28 12:31:34 -08:00
bjorn 98e2e94ea8 Job done counter uses atomic loads/stores; 2024-02-28 12:31:34 -08:00
bjorn 4a175c4cf8 Fix atomic intrinsics; 2024-02-28 12:31:34 -08:00
bjorn 6347114030 Job system; 2024-02-28 12:31:34 -08:00
11 changed files with 255 additions and 19 deletions

View File

@ -582,6 +582,7 @@ endif()
if(LOVR_ENABLE_THREAD)
target_sources(lovr PRIVATE
src/core/job.c
src/modules/thread/thread.c
src/api/l_thread.c
src/api/l_thread_channel.c

View File

@ -451,6 +451,7 @@ src += (config.modules.audio or config.modules.data) and 'src/lib/miniaudio/*.c'
src += config.modules.data and 'src/lib/jsmn/*.c' or nil
src += config.modules.data and 'src/lib/minimp3/*.c' or nil
src += config.modules.math and 'src/lib/noise/*.c' or nil
src += config.modules.thread and 'src/core/job.c' or nil
-- embed resource files with xxd

View File

@ -41,6 +41,9 @@ local conf = {
math = {
globals = true
},
thread = {
workers = -1
},
window = {
width = 720,
height = 800,

View File

@ -6,5 +6,4 @@ function lovr.conf(t)
t.headset.supersample = true
t.modules.audio = false
t.modules.physics = false
t.modules.thread = false
end

View File

@ -2,6 +2,7 @@
#include "data/blob.h"
#include "event/event.h"
#include "thread/thread.h"
#include "core/os.h"
#include "util.h"
#include <lualib.h>
#include <stdlib.h>
@ -88,7 +89,21 @@ int luaopen_lovr_thread(lua_State* L) {
luax_register(L, lovrThreadModule);
luax_registertype(L, Thread);
luax_registertype(L, Channel);
lovrThreadModuleInit();
int32_t workers = -1;
luax_pushconf(L);
lua_getfield(L, -1, "thread");
if (lua_istable(L, -1)) {
lua_getfield(L, -1, "workers");
if (lua_type(L, -1) == LUA_TNUMBER) {
workers = lua_tointeger(L, -1);
}
lua_pop(L, 1);
}
lua_pop(L, 2);
lovrThreadModuleInit(workers);
luax_atexit(L, lovrThreadModuleDestroy);
return 1;
}

172
src/core/job.c Normal file
View File

@ -0,0 +1,172 @@
#include "job.h"
#include <stdatomic.h>
#include <threads.h>
#include <setjmp.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define MAX_WORKERS 64
#define MAX_JOBS 1024
// One unit of work.  Slots come from the fixed state.jobs array and are linked
// either into the free pool or into the run queue through `next`.
struct job {
// Next job in whichever singly-linked list (free pool or run queue) holds this slot
job* next;
// Cleared by job_start, set to true by runJob once the function returned or aborted
atomic_uint done;
// Shared storage: `fn` is what job_start stores; runJob reads it and then reuses the
// slot for `error` (NULL unless job_abort/job_vabort stored a message)
union { fn_job* fn; char* error; };
// Opaque pointer passed through to fn
void* arg;
};
// Global job system state (one instance per process).
static struct {
// Backing storage for every job slot
job jobs[MAX_JOBS];
// Handles of the worker threads created by job_init
thrd_t workers[MAX_WORKERS];
// Number of workers that were created successfully (what job_destroy joins)
uint32_t workerCount;
// Run queue: job_start appends at tail, runJob pops from head
job* head;
job* tail;
// Free list of unused job slots
job* pool;
// Workers sleep on this while the run queue is empty
cnd_t hasJob;
// Held while touching head/tail/pool and while workers test quit
mtx_t lock;
// Set by job_destroy to make workers leave their loop
bool quit;
} state;
// Per-thread jump target: armed by setjmp in runJob, used by longjmp in job_abort/job_vabort
static thread_local jmp_buf catch;
// Pops the job at the front of the run queue and executes it on the calling thread.
// Preconditions: the caller holds state.lock and state.head is non-NULL.
// The lock is released before the job function is invoked and is NOT re-acquired.
// NOTE(review): `catch` is a single per-thread buffer and job_start/job_wait also call
// runJob; if a job function calls those, the nested setjmp replaces the outer jump
// target -- confirm whether aborting after such a nested run is meant to be supported.
static void runJob(void) {
job* job = state.head;
state.head = job->next;
// Queue became empty, so there is no tail any more
if (!job->next) state.tail = NULL;
mtx_unlock(&state.lock);
// setjmp yields 0 on the direct call; job_abort/job_vabort longjmp back here with a
// nonzero value, which skips the body and falls through to marking the job done
if (setjmp(catch) == 0) {
// fn and error share storage, so fn has to be read before error is cleared
fn_job* fn = job->fn;
job->error = NULL;
fn(job, job->arg);
}
job->done = true;
}
// Thread entry point for a worker: runs queued jobs until state.quit is set.
// `arg` carries the worker index given to thrd_create and is not used here.
// Always returns 0.
static int workerLoop(void* arg) {
  mtx_lock(&state.lock);

  // Invariant: the lock is held whenever the loop condition is evaluated
  while (!state.quit) {
    if (state.head) {
      runJob(); // releases the lock while the job executes
      mtx_lock(&state.lock);
    } else {
      cnd_wait(&state.hasJob, &state.lock);
    }
  }

  mtx_unlock(&state.lock);
  return 0;
}
bool job_init(uint32_t count) {
mtx_init(&state.lock, mtx_plain);
cnd_init(&state.hasJob);
state.pool = state.jobs;
for (uint32_t i = 0; i < MAX_JOBS - 1; i++) {
state.jobs[i].next = &state.jobs[i + 1];
}
if (count > MAX_WORKERS) count = MAX_WORKERS;
for (uint32_t i = 0; i < count; i++, state.workerCount++) {
if (thrd_create(&state.workers[i], workerLoop, (void*) (uintptr_t) i) != thrd_success) {
return false;
}
}
return true;
}
// Stops and joins every worker thread, destroys the synchronization objects
// and resets the job system state to zero.  Jobs still sitting in the queue
// are not run.
void job_destroy(void) {
  // The quit flag is part of the predicate workers test before cnd_wait, so it
  // must change while the lock is held.  Otherwise a worker that has already
  // seen quit == false but has not started waiting yet would miss the
  // broadcast and sleep forever, hanging thrd_join below.
  mtx_lock(&state.lock);
  state.quit = true;
  cnd_broadcast(&state.hasJob);
  mtx_unlock(&state.lock);

  for (uint32_t i = 0; i < state.workerCount; i++) {
    thrd_join(state.workers[i], NULL);
  }

  cnd_destroy(&state.hasJob);
  mtx_destroy(&state.lock);
  memset(&state, 0, sizeof(state));
}
// Queues fn(job, arg) for execution and returns the job handle.
// When no free job slot is available the calling thread helps out by running
// queued jobs itself (or yields if the queue is empty) until a slot is
// returned to the pool by job_free.  Never returns NULL.
job* job_start(fn_job* fn, void* arg) {
  // Leave this loop holding the lock, with at least one slot in the pool
  for (;;) {
    mtx_lock(&state.lock);
    if (state.pool) {
      break;
    } else if (state.head) {
      runJob(); // releases the lock
    } else {
      // Pool and queue are both empty: every slot is either running or
      // waiting for job_free, so there is nothing to do but wait
      mtx_unlock(&state.lock);
      thrd_yield();
    }
  }

  job* job = state.pool;
  state.pool = job->next;

  job->next = NULL;
  job->done = false;
  job->fn = fn;
  job->arg = arg;

  if (state.tail) {
    state.tail->next = job;
  } else {
    state.head = job;
  }
  state.tail = job;

  // Wake a worker for every job, not only when the queue was empty: if several
  // jobs are queued before the first woken worker gets the lock, the remaining
  // workers would otherwise keep sleeping while work is pending.  Signalling
  // with no waiters is harmless.
  cnd_signal(&state.hasJob);

  mtx_unlock(&state.lock);
  return job;
}
// Stops the running job with an error.  A heap copy of `error` is stored on
// the job (falling back to a copy of "Out of memory" if the allocation fails),
// then control jumps back into runJob on this thread.  Does not return.
void job_abort(job* job, const char* error) {
  size_t size = strlen(error) + 1; // includes the terminator
  char* copy = malloc(size);

  if (copy) {
    memcpy(copy, error, size);
  } else {
    copy = strdup("Out of memory");
  }

  job->error = copy;
  longjmp(catch, 22);
}
// printf-style variant of job_abort: formats the message into a heap buffer
// stored on the job, then jumps back into runJob on this thread.  Does not
// return.  `args` is consumed; the caller still owns the matching va_end.
void job_vabort(job* job, const char* format, va_list args) {
  // A va_list may only be traversed once, so measure the length on a copy and
  // keep the original for the real formatting pass
  va_list measure;
  va_copy(measure, args);
  int length = vsnprintf(NULL, 0, format, measure);
  va_end(measure);

  char* message = NULL;

  if (length < 0) {
    // vsnprintf reported an encoding error, there is no length to allocate for
    message = strdup("Failed to format job error");
  } else {
    size_t size = (size_t) length + 1;
    message = malloc(size);
    if (message) {
      vsnprintf(message, size, format, args);
    } else {
      message = strdup("Out of memory");
    }
  }

  job->error = message;
  longjmp(catch, 22);
}
// Blocks until `job` has finished (returned or aborted).  Instead of sleeping,
// the calling thread runs other queued jobs, and yields when the queue is empty.
void job_wait(job* job) {
  for (;;) {
    if (job->done) {
      return;
    }

    mtx_lock(&state.lock);

    if (!state.head) {
      mtx_unlock(&state.lock);
      thrd_yield();
      continue;
    }

    runJob(); // releases the lock
  }
}
// Returns the message stored by job_abort/job_vabort, or NULL if the job ran
// to completion.  The string belongs to the job and is released by job_free.
// The field shares storage with the job function, so the result is only
// meaningful once the job has run.
const char* job_get_error(job* job) {
  const char* message = job->error;
  return message;
}
// Releases the job's error string (if any) and pushes the slot back onto the
// free pool so job_start can hand it out again.  The error field shares storage
// with the job function, so this is only valid for a job that has already run.
void job_free(job* job) {
  mtx_lock(&state.lock);

  free(job->error); // free(NULL) is a no-op, no guard needed

  // Push onto the front of the free list
  job->next = state.pool;
  state.pool = job;

  mtx_unlock(&state.lock);
}

17
src/core/job.h Normal file
View File

@ -0,0 +1,17 @@
#include <stdarg.h>
#include <stdint.h>
#include <stdbool.h>
#pragma once
// Opaque handle to a queued unit of work
typedef struct job job;
// Signature of a job function: receives its own handle (needed for job_abort) and the user argument
typedef void fn_job(job* job, void* arg);
// Initializes the job system and starts workerCount worker threads (clamped to an
// internal maximum).  Returns false if a worker thread could not be created.
bool job_init(uint32_t workerCount);
// Tells the workers to quit, joins them and resets the job system
void job_destroy(void);
// Queues fn(job, arg) and returns its handle.  If no job slot is free, the calling
// thread runs queued jobs itself until one becomes available.
job* job_start(fn_job* fn, void* arg);
// Aborts the currently running job: stores a copy of `error` on the job and jumps out
// of the job function.  Does not return.  The jump target is per-thread, so this must be
// called on the thread that is running the job.
void job_abort(job* job, const char* error);
// Same as job_abort, with the message built from a printf-style format and va_list
void job_vabort(job* job, const char* format, va_list args);
// Waits until the job has finished or aborted, running other queued jobs in the meantime
void job_wait(job* job);
// After the job has run: the stored error message, or NULL if it did not abort.
// The string is owned by the job and freed by job_free.
const char* job_get_error(job* job);
// Frees the job's error message and recycles the job slot
void job_free(job* job);

View File

@ -92,35 +92,35 @@ typedef uintmax_t atomic_uintmax_t;
// 7.17.7
#define atomic_store(p, x) __atomic_store(p, x, __ATOMIC_SEQ_CST)
#define atomic_store_explicit __atomic_store
#define atomic_store_explicit __atomic_store_n
#define atomic_store(p, x) atomic_store_explicit(p, x, __ATOMIC_SEQ_CST)
#define atomic_load(p, x) __atomic_load(p, x, __ATOMIC_SEQ_CST)
#define atomic_load_explicit __atomic_load
#define atomic_load_explicit __atomic_load_n
#define atomic_load(p) atomic_load_explicit(p, __ATOMIC_SEQ_CST)
#define atomic_exchange(p, x) __atomic_exchange(p, x, __ATOMIC_SEQ_CST)
#define atomic_exchange_explicit __atomic_exchange
#define atomic_exchange_explicit __atomic_exchange_n
#define atomic_exchange(p, x) atomic_exchange_explicit(p, x, __ATOMIC_SEQ_CST)
#define atomic_compare_exchange_strong(p, x, y) __atomic_compare_exchange(p, x, y, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)
#define atomic_compare_exchange_strong_explicit(p, x, y, o1, o2) __atomic_compare_exchange(p, x, y, false, o1, o2)
#define atomic_compare_exchange_strong_explicit(p, x, y, o1, o2) __atomic_compare_exchange_n(p, x, y, false, o1, o2)
#define atomic_compare_exchange_strong(p, x, y) atomic_compare_exchange_strong_explicit(p, x, y, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)
#define atomic_compare_exchange_weak(p, x, y) __atomic_compare_exchange(p, x, y, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)
#define atomic_compare_exchange_weak_explicit(p, x, y, o1, o2) __atomic_compare_exchange(p, x, y, true, o1, o2)
#define atomic_compare_exchange_weak_explicit(p, x, y, o1, o2) __atomic_compare_exchange_n(p, x, y, true, o1, o2)
#define atomic_compare_exchange_weak(p, x, y) atomic_compare_exchange_weak_explicit(p, x, y, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)
#define atomic_fetch_add(p, x) __atomic_fetch_add(p, x, __ATOMIC_SEQ_CST)
#define atomic_fetch_add_explicit __atomic_fetch_add
#define atomic_fetch_add(p, x) atomic_fetch_add_explicit(p, x, __ATOMIC_SEQ_CST)
#define atomic_fetch_sub(p, x) __atomic_fetch_sub(p, x, __ATOMIC_SEQ_CST)
#define atomic_fetch_sub_explicit __atomic_fetch_sub
#define atomic_fetch_sub(p, x) atomic_fetch_sub_explicit(p, x, __ATOMIC_SEQ_CST)
#define atomic_fetch_or(p, x) __atomic_fetch_or(p, x, __ATOMIC_SEQ_CST)
#define atomic_fetch_or_explicit __atomic_fetch_or
#define atomic_fetch_or(p, x) atomic_fetch_or_explicit(p, x, __ATOMIC_SEQ_CST)
#define atomic_fetch_xor(p, x) __atomic_fetch_xor(p, x, __ATOMIC_SEQ_CST)
#define atomic_fetch_xor_explicit __atomic_fetch_xor
#define atomic_fetch_xor(p, x) atomic_fetch_xor_explicit(p, x, __ATOMIC_SEQ_CST)
#define atomic_fetch_and(p, x) __atomic_fetch_and(p, x, __ATOMIC_SEQ_CST)
#define atomic_fetch_and_explicit __atomic_fetch_and
#define atomic_fetch_and(p, x) atomic_fetch_and_explicit(p, x, __ATOMIC_SEQ_CST)
// 7.17.8
@ -145,8 +145,26 @@ _Bool atomic_flag_clear_explicit(volatile atomic_flag*, memory_order);
typedef volatile long atomic_uint;
// Fallback atomics built on volatile long and Interlocked intrinsics.
// Every macro expands to a single parenthesized expression with no trailing
// semicolon, so it can be used like the C11 function it stands in for
// (e.g. as the body of an if with an else branch, or inside a larger expression).
#define atomic_store(p, x) ((void) (*(p) = (x)))
#define atomic_store_explicit(p, x, o) atomic_store(p, x)
#define atomic_load(p) (*(p))
#define atomic_load_explicit(p, o) atomic_load(p)
#define atomic_fetch_add(p, x) _InterlockedExchangeAdd(p, x)
#define atomic_fetch_add_explicit(p, x, o) atomic_fetch_add(p, x)
#define atomic_fetch_sub(p, x) _InterlockedExchangeAdd(p, -(x))
#define atomic_fetch_sub_explicit(p, x, o) atomic_fetch_sub(p, x)
#define atomic_fetch_or(p, x) InterlockedOr(p, x)
#define atomic_fetch_or_explicit(p, x, o) atomic_fetch_or(p, x)
#define atomic_fetch_xor(p, x) InterlockedXor(p, x)
#define atomic_fetch_xor_explicit(p, x, o) atomic_fetch_xor(p, x)
#define atomic_fetch_and(p, x) InterlockedAnd(p, x)
#define atomic_fetch_and_explicit(p, x, o) atomic_fetch_and(p, x)
#define ATOMIC_INT_LOCK_FREE 2

View File

@ -13,11 +13,13 @@ enum { mtx_plain };
typedef HANDLE thrd_t;
typedef CRITICAL_SECTION mtx_t;
typedef CONDITION_VARIABLE cnd_t;
#define thread_local __declspec(thread)
#else
#include <pthread.h>
typedef pthread_t thrd_t;
typedef pthread_mutex_t mtx_t;
typedef pthread_cond_t cnd_t;
#define thread_local _Thread_local
#endif
static inline int thrd_create(thrd_t* thread, thrd_start_t fn, void* arg);

View File

@ -1,6 +1,7 @@
#include "thread/thread.h"
#include "data/blob.h"
#include "event/event.h"
#include "core/job.h"
#include "core/os.h"
#include "util.h"
#include <math.h>
@ -38,10 +39,16 @@ static struct {
map_t channels;
} state;
bool lovrThreadModuleInit(void) {
// Initializes the thread module and the job system.
// `workers` is the number of job worker threads; a negative value is relative
// to the core count reported by os_get_core_count (-1 means one fewer than the
// core count) and the result is clamped to zero.
// Returns false if the module reference count was already nonzero.
bool lovrThreadModuleInit(int32_t workers) {
  if (atomic_fetch_add(&state.ref, 1)) return false;
  mtx_init(&state.channelLock, mtx_plain);
  map_init(&state.channels, 0);

  if (workers < 0) {
    // Do the arithmetic in a signed 64-bit type: adding the unsigned core count
    // straight to a negative int32_t would go through unsigned wraparound and an
    // implementation-defined conversion back to signed
    int64_t relative = (int64_t) os_get_core_count() + workers;
    workers = relative > 0 ? (int32_t) relative : 0;
  }

  job_init((uint32_t) workers);
  return true;
}
@ -54,6 +61,7 @@ void lovrThreadModuleDestroy(void) {
}
mtx_destroy(&state.channelLock);
map_free(&state.channels);
job_destroy();
memset(&state, 0, sizeof(state));
}

View File

@ -14,7 +14,7 @@ struct Variant;
typedef struct Thread Thread;
typedef struct Channel Channel;
bool lovrThreadModuleInit(void);
bool lovrThreadModuleInit(int32_t workers);
void lovrThreadModuleDestroy(void);
struct Channel* lovrThreadGetChannel(const char* name);