diff --git a/CMakeLists.txt b/CMakeLists.txt index 4060cc07..f4852abd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -582,6 +582,7 @@ endif() if(LOVR_ENABLE_THREAD) target_sources(lovr PRIVATE + src/core/job.c src/modules/thread/thread.c src/api/l_thread.c src/api/l_thread_channel.c diff --git a/Tupfile.lua b/Tupfile.lua index ea8d2a25..138bd054 100644 --- a/Tupfile.lua +++ b/Tupfile.lua @@ -451,6 +451,7 @@ src += (config.modules.audio or config.modules.data) and 'src/lib/miniaudio/*.c' src += config.modules.data and 'src/lib/jsmn/*.c' or nil src += config.modules.data and 'src/lib/minimp3/*.c' or nil src += config.modules.math and 'src/lib/noise/*.c' or nil +src += config.modules.thread and 'src/core/job.c' or nil -- embed resource files with xxd diff --git a/src/core/job.c b/src/core/job.c new file mode 100644 index 00000000..e9a007c0 --- /dev/null +++ b/src/core/job.c @@ -0,0 +1,133 @@ +#include "job.h" +#include +#include +#include + +#define MAX_WORKERS 64 +#define MAX_JOBS 1024 + +struct job { + job* next; + atomic_bool done; + fn_job* fn; + void* arg; +}; + +static struct { + job jobs[MAX_JOBS]; + thrd_t workers[MAX_WORKERS]; + uint32_t workerCount; + job* freeJob; + job* nextJob; + job* lastJob; + cnd_t hasJob; + mtx_t lock; + bool done; +} state; + +static int worker_loop(void* arg) { + for (;;) { + mtx_lock(&state.lock); + while (!state.nextJob && !state.done) { + cnd_wait(&state.hasJob, &state.lock); + } + + if (state.done) { + break; + } + + job* job = state.nextJob; + state.nextJob = job->next; + if (!state.nextJob) state.lastJob = NULL; + mtx_unlock(&state.lock); + + job->fn(job->arg); + job->done = true; + } + + mtx_unlock(&state.lock); + return 0; +} + +bool job_init(uint32_t count) { + if (mtx_init(&state.lock, mtx_plain) != thrd_success) return false; + if (cnd_init(&state.hasJob) != thrd_success) return false; + + state.freeJob = state.jobs; + for (uint32_t i = 0; i < MAX_JOBS - 1; i++) { + state.jobs[i].next = &state.jobs[i + 1]; + } + + if (count > MAX_WORKERS) count = MAX_WORKERS; + for (uint32_t i = 0; i < count; i++, state.workerCount++) { + if (thrd_create(&state.workers[i], worker_loop, (void*) (uintptr_t) i) != thrd_success) { + return false; + } + } + + return true; +} + +void job_destroy(void) { + state.done = true; + cnd_broadcast(&state.hasJob); + for (uint32_t i = 0; i < state.workerCount; i++) { + thrd_join(state.workers[i], NULL); + } + cnd_destroy(&state.hasJob); + mtx_destroy(&state.lock); + memset(&state, 0, sizeof(state)); +} + +job* job_start(fn_job* fn, void* arg) { + if (!state.freeJob) { + return NULL; + } + + mtx_lock(&state.lock); + + if (!state.freeJob) { + mtx_unlock(&state.lock); + return NULL; + } + + job* job = state.freeJob; + state.freeJob = job->next; + if (!state.nextJob) state.nextJob = job; + if (state.lastJob) state.lastJob->next = job; + state.lastJob = job; + + job->next = NULL; + job->done = false; + job->fn = fn; + job->arg = arg; + + mtx_unlock(&state.lock); + cnd_signal(&state.hasJob); + return job; +} + +void job_wait(job* job) { + mtx_lock(&state.lock); + + while (!job->done) { + struct job* task = state.nextJob; + if (task) state.nextJob = task->next; + if (!state.nextJob) state.lastJob = NULL; + mtx_unlock(&state.lock); + + if (task) { + task->fn(task->arg); + task->done = true; + } else { + thrd_yield(); + } + + mtx_lock(&state.lock); + } + + job->next = state.freeJob; + state.freeJob = job; + + mtx_unlock(&state.lock); +} diff --git a/src/core/job.h b/src/core/job.h new file mode 100644 index 00000000..b04c5ae1 --- /dev/null +++ b/src/core/job.h @@ -0,0 +1,13 @@ +#include +#include + +#pragma once + +typedef struct job job; + +typedef void fn_job(void* arg); + +bool job_init(uint32_t workerCount); +void job_destroy(void); +job* job_start(fn_job* fn, void* arg); +void job_wait(job* job);