Reorganize thread code;

- Put channel into thread module file.
- Make thread internals private.
- Handle more thread bookkeeping in thread module instead of Lua API.
- Fix a few race conditions/leaks nobody was probably ever going to hit.
This commit is contained in:
bjorn 2022-12-10 20:11:14 -08:00
parent 06dad182db
commit f013831b73
10 changed files with 246 additions and 235 deletions

View File

@ -575,7 +575,6 @@ endif()
if(LOVR_ENABLE_THREAD)
target_sources(lovr PRIVATE
src/modules/thread/channel.c
src/modules/thread/thread.c
src/api/l_thread.c
src/api/l_thread_channel.c

View File

@ -141,6 +141,7 @@ static int nextEvent(lua_State* L) {
luax_pushtype(L, Thread, event.data.thread.thread);
lua_pushstring(L, event.data.thread.error);
lovrRelease(event.data.thread.thread, lovrThreadDestroy);
free(event.data.thread.error);
return 3;
#endif

View File

@ -1,20 +1,13 @@
#include "api.h"
#include "data/blob.h"
#include "event/event.h"
#include "thread/thread.h"
#include "thread/channel.h"
#include "util.h"
#include <lualib.h>
#include <stdlib.h>
#include <string.h>
static int threadRunner(void* data) {
Thread* thread = (Thread*) data;
lovrRetain(thread);
mtx_lock(&thread->lock);
thread->running = true;
mtx_unlock(&thread->lock);
static char* threadRunner(Thread* thread, Blob* body, Variant* arguments, uint32_t argumentCount) {
lua_State* L = luaL_newstate();
luaL_openlibs(L);
luax_preload(L);
@ -23,42 +16,29 @@ static int threadRunner(void* data) {
lua_pushcfunction(L, luax_getstack);
int errhandler = lua_gettop(L);
if (!luaL_loadbuffer(L, thread->body->data, thread->body->size, "thread")) {
for (uint32_t i = 0; i < thread->argumentCount; i++) {
luax_pushvariant(L, &thread->arguments[i]);
if (!luaL_loadbuffer(L, body->data, body->size, "thread")) {
for (uint32_t i = 0; i < argumentCount; i++) {
luax_pushvariant(L, &arguments[i]);
}
if (!lua_pcall(L, thread->argumentCount, 0, errhandler)) {
mtx_lock(&thread->lock);
thread->running = false;
mtx_unlock(&thread->lock);
lovrRelease(thread, lovrThreadDestroy);
lua_close(L);
return 0;
if (!lua_pcall(L, argumentCount, 0, errhandler)) {
return NULL;
}
}
mtx_lock(&thread->lock);
// Error handling
size_t length;
const char* error = lua_tolstring(L, -1, &length);
const char* message = lua_tolstring(L, -1, &length);
char* error = message ? malloc(length + 1) : NULL;
if (error) {
thread->error = malloc(length + 1);
if (thread->error) {
memcpy(thread->error, error, length + 1);
lovrEventPush((Event) {
.type = EVENT_THREAD_ERROR,
.data.thread = { thread, thread->error }
});
}
memcpy(error, message, length + 1);
lua_close(L);
return error;
}
thread->running = false;
mtx_unlock(&thread->lock);
lovrRelease(thread, lovrThreadDestroy);
lua_close(L);
return 1;
return NULL;
}
static int l_lovrThreadNewThread(lua_State* L) {

View File

@ -1,5 +1,5 @@
#include "api.h"
#include "thread/channel.h"
#include "thread/thread.h"
#include "event/event.h"
#include "util.h"
#include <math.h>

View File

@ -1,4 +1,5 @@
#include "api.h"
#include "event/event.h"
#include "thread/thread.h"
#include "util.h"
@ -32,7 +33,8 @@ static int l_lovrThreadGetError(lua_State* L) {
static int l_lovrThreadIsRunning(lua_State* L) {
Thread* thread = luax_checktype(L, 1, Thread);
lua_pushboolean(L, thread->running);
bool running = lovrThreadIsRunning(thread);
lua_pushboolean(L, running);
return 1;
}

View File

@ -53,6 +53,12 @@ void lovrEventPush(Event event) {
#ifndef LOVR_DISABLE_THREAD
if (event.type == EVENT_THREAD_ERROR) {
lovrRetain(event.data.thread.thread);
size_t length = strlen(event.data.thread.error);
char* copy = malloc(length + 1);
lovrAssert(copy, "Out of memory");
memcpy(copy, event.data.thread.error, length);
copy[length] = '\0';
event.data.thread.error = copy;
}
#endif

View File

@ -1,151 +0,0 @@
#include "thread/channel.h"
#include "event/event.h"
#include "util.h"
#include "lib/tinycthread/tinycthread.h"
#include <stdlib.h>
#include <stddef.h>
#include <math.h>
struct Channel {
uint32_t ref;
mtx_t lock;
cnd_t cond;
arr_t(Variant) messages;
size_t head;
uint64_t sent;
uint64_t received;
uint64_t hash;
};
Channel* lovrChannelCreate(uint64_t hash) {
Channel* channel = calloc(1, sizeof(Channel));
lovrAssert(channel, "Out of memory");
channel->ref = 1;
arr_init(&channel->messages, arr_alloc);
mtx_init(&channel->lock, mtx_plain | mtx_timed);
cnd_init(&channel->cond);
channel->hash = hash;
return channel;
}
void lovrChannelDestroy(void* ref) {
Channel* channel = ref;
lovrChannelClear(channel);
arr_free(&channel->messages);
mtx_destroy(&channel->lock);
cnd_destroy(&channel->cond);
free(channel);
}
bool lovrChannelPush(Channel* channel, Variant* variant, double timeout, uint64_t* id) {
mtx_lock(&channel->lock);
if (channel->messages.length == 0) {
lovrRetain(channel);
}
arr_push(&channel->messages, *variant);
*id = ++channel->sent;
cnd_broadcast(&channel->cond);
if (isnan(timeout) || timeout < 0) {
mtx_unlock(&channel->lock);
return false;
}
while (channel->received < *id && timeout >= 0) {
if (isinf(timeout)) {
cnd_wait(&channel->cond, &channel->lock);
} else {
struct timespec start;
struct timespec until;
struct timespec stop;
timespec_get(&start, TIME_UTC);
double whole, fraction;
fraction = modf(timeout, &whole);
until.tv_sec = start.tv_sec + whole;
until.tv_nsec = start.tv_nsec + fraction * 1e9;
cnd_timedwait(&channel->cond, &channel->lock, &until);
timespec_get(&stop, TIME_UTC);
timeout -= (stop.tv_sec - start.tv_sec) + (stop.tv_nsec - start.tv_nsec) / 1e9;
}
}
bool read = channel->received >= *id;
mtx_unlock(&channel->lock);
return read;
}
bool lovrChannelPop(Channel* channel, Variant* variant, double timeout) {
mtx_lock(&channel->lock);
do {
if (channel->head < channel->messages.length) {
*variant = channel->messages.data[channel->head++];
if (channel->head == channel->messages.length) {
channel->head = channel->messages.length = 0;
lovrRelease(channel, lovrChannelDestroy);
}
channel->received++;
cnd_broadcast(&channel->cond);
mtx_unlock(&channel->lock);
return true;
} else if (isnan(timeout) || timeout < 0) {
mtx_unlock(&channel->lock);
return false;
}
if (isinf(timeout)) {
cnd_wait(&channel->cond, &channel->lock);
} else {
struct timespec start;
struct timespec until;
struct timespec stop;
timespec_get(&start, TIME_UTC);
double whole, fraction;
fraction = modf(timeout, &whole);
until.tv_sec = start.tv_sec + whole;
until.tv_nsec = start.tv_nsec + fraction * 1e9;
cnd_timedwait(&channel->cond, &channel->lock, &until);
timespec_get(&stop, TIME_UTC);
timeout -= (stop.tv_sec - start.tv_sec) + (stop.tv_nsec - start.tv_nsec) / (double) 1e9;
}
} while (1);
}
bool lovrChannelPeek(Channel* channel, Variant* variant) {
mtx_lock(&channel->lock);
if (channel->head < channel->messages.length) {
*variant = channel->messages.data[channel->head];
mtx_unlock(&channel->lock);
return true;
}
mtx_unlock(&channel->lock);
return false;
}
void lovrChannelClear(Channel* channel) {
mtx_lock(&channel->lock);
for (size_t i = channel->head; i < channel->messages.length; i++) {
lovrVariantDestroy(&channel->messages.data[i]);
}
channel->received = channel->sent;
arr_clear(&channel->messages);
channel->head = 0;
cnd_broadcast(&channel->cond);
mtx_unlock(&channel->lock);
}
uint64_t lovrChannelGetCount(Channel* channel) {
mtx_lock(&channel->lock);
uint64_t length = channel->messages.length - channel->head;
mtx_unlock(&channel->lock);
return length;
}
bool lovrChannelHasRead(Channel* channel, uint64_t id) {
mtx_lock(&channel->lock);
bool received = channel->received >= id;
mtx_unlock(&channel->lock);
return received;
}

View File

@ -1,16 +0,0 @@
#include <stdbool.h>
#include <stdint.h>
#pragma once
struct Variant;
typedef struct Channel Channel;
Channel* lovrChannelCreate(uint64_t hash);
void lovrChannelDestroy(void* ref);
bool lovrChannelPush(Channel* channel, struct Variant* variant, double timeout, uint64_t* id);
bool lovrChannelPop(Channel* channel, struct Variant* variant, double timeout);
bool lovrChannelPeek(Channel* channel, struct Variant* variant);
void lovrChannelClear(Channel* channel);
uint64_t lovrChannelGetCount(Channel* channel);
bool lovrChannelHasRead(Channel* channel, uint64_t id);

View File

@ -1,9 +1,35 @@
#include "thread/thread.h"
#include "thread/channel.h"
#include "data/blob.h"
#include "event/event.h"
#include "util.h"
#include "lib/tinycthread/tinycthread.h"
#include <math.h>
#include <stdlib.h>
#include <string.h>
struct Thread {
uint32_t ref;
thrd_t handle;
mtx_t lock;
ThreadFunction* function;
Blob* body;
Variant arguments[MAX_THREAD_ARGUMENTS];
uint32_t argumentCount;
char* error;
bool running;
};
struct Channel {
uint32_t ref;
mtx_t lock;
cnd_t cond;
arr_t(Variant) messages;
size_t head;
uint64_t sent;
uint64_t received;
uint64_t hash;
};
static struct {
bool initialized;
mtx_t channelLock;
@ -47,12 +73,35 @@ Channel* lovrThreadGetChannel(const char* name) {
return channel;
}
Thread* lovrThreadCreate(int (*runner)(void*), Blob* body) {
// Thread
static int threadFunction(void* data) {
Thread* thread = data;
lovrRetain(thread);
char* error = thread->function(thread, thread->body, thread->arguments, thread->argumentCount);
mtx_lock(&thread->lock);
thread->running = false;
if (error) {
thread->error = error;
lovrEventPush((Event) {
.type = EVENT_THREAD_ERROR,
.data.thread = { thread, thread->error }
});
}
mtx_unlock(&thread->lock);
lovrRelease(thread, lovrThreadDestroy);
return 0;
}
Thread* lovrThreadCreate(ThreadFunction* function, Blob* body) {
Thread* thread = calloc(1, sizeof(Thread));
lovrAssert(thread, "Out of memory");
thread->ref = 1;
thread->runner = runner;
thread->body = body;
thread->function = function;
mtx_init(&thread->lock, mtx_plain);
lovrRetain(body);
return thread;
@ -68,20 +117,26 @@ void lovrThreadDestroy(void* ref) {
}
void lovrThreadStart(Thread* thread, Variant* arguments, uint32_t argumentCount) {
bool running = lovrThreadIsRunning(thread);
if (running) {
mtx_lock(&thread->lock);
if (thread->running) {
mtx_unlock(&thread->lock);
return;
}
free(thread->error);
thread->error = NULL;
lovrAssert(argumentCount <= MAX_THREAD_ARGUMENTS, "Too many Thread arguments (max is %d)", MAX_THREAD_ARGUMENTS);
thread->argumentCount = argumentCount;
memcpy(thread->arguments, arguments, argumentCount * sizeof(Variant));
if (thrd_create(&thread->handle, thread->runner, thread) != thrd_success) {
thread->argumentCount = argumentCount;
if (thrd_create(&thread->handle, threadFunction, thread) != thrd_success) {
mtx_unlock(&thread->lock);
lovrThrow("Could not create thread...sorry");
}
thread->running = true;
mtx_unlock(&thread->lock);
}
void lovrThreadWait(Thread* thread) {
@ -89,12 +144,144 @@ void lovrThreadWait(Thread* thread) {
}
bool lovrThreadIsRunning(Thread* thread) {
mtx_lock(&thread->lock);
bool running = thread->running;
mtx_unlock(&thread->lock);
return running;
return thread->running;
}
const char* lovrThreadGetError(Thread* thread) {
return thread->error;
}
// Channel
Channel* lovrChannelCreate(uint64_t hash) {
Channel* channel = calloc(1, sizeof(Channel));
lovrAssert(channel, "Out of memory");
channel->ref = 1;
arr_init(&channel->messages, arr_alloc);
mtx_init(&channel->lock, mtx_plain | mtx_timed);
cnd_init(&channel->cond);
channel->hash = hash;
return channel;
}
void lovrChannelDestroy(void* ref) {
Channel* channel = ref;
lovrChannelClear(channel);
arr_free(&channel->messages);
mtx_destroy(&channel->lock);
cnd_destroy(&channel->cond);
free(channel);
}
bool lovrChannelPush(Channel* channel, Variant* variant, double timeout, uint64_t* id) {
mtx_lock(&channel->lock);
if (channel->messages.length == 0) {
lovrRetain(channel);
}
arr_push(&channel->messages, *variant);
*id = ++channel->sent;
cnd_broadcast(&channel->cond);
if (isnan(timeout) || timeout < 0) {
mtx_unlock(&channel->lock);
return false;
}
while (channel->received < *id && timeout >= 0) {
if (isinf(timeout)) {
cnd_wait(&channel->cond, &channel->lock);
} else {
struct timespec start;
struct timespec until;
struct timespec stop;
timespec_get(&start, TIME_UTC);
double whole, fraction;
fraction = modf(timeout, &whole);
until.tv_sec = start.tv_sec + whole;
until.tv_nsec = start.tv_nsec + fraction * 1e9;
cnd_timedwait(&channel->cond, &channel->lock, &until);
timespec_get(&stop, TIME_UTC);
timeout -= (stop.tv_sec - start.tv_sec) + (stop.tv_nsec - start.tv_nsec) / 1e9;
}
}
bool read = channel->received >= *id;
mtx_unlock(&channel->lock);
return read;
}
bool lovrChannelPop(Channel* channel, Variant* variant, double timeout) {
mtx_lock(&channel->lock);
do {
if (channel->head < channel->messages.length) {
*variant = channel->messages.data[channel->head++];
if (channel->head == channel->messages.length) {
channel->head = channel->messages.length = 0;
lovrRelease(channel, lovrChannelDestroy);
}
channel->received++;
cnd_broadcast(&channel->cond);
mtx_unlock(&channel->lock);
return true;
} else if (isnan(timeout) || timeout < 0) {
mtx_unlock(&channel->lock);
return false;
}
if (isinf(timeout)) {
cnd_wait(&channel->cond, &channel->lock);
} else {
struct timespec start;
struct timespec until;
struct timespec stop;
timespec_get(&start, TIME_UTC);
double whole, fraction;
fraction = modf(timeout, &whole);
until.tv_sec = start.tv_sec + whole;
until.tv_nsec = start.tv_nsec + fraction * 1e9;
cnd_timedwait(&channel->cond, &channel->lock, &until);
timespec_get(&stop, TIME_UTC);
timeout -= (stop.tv_sec - start.tv_sec) + (stop.tv_nsec - start.tv_nsec) / (double) 1e9;
}
} while (1);
}
bool lovrChannelPeek(Channel* channel, Variant* variant) {
mtx_lock(&channel->lock);
if (channel->head < channel->messages.length) {
*variant = channel->messages.data[channel->head];
mtx_unlock(&channel->lock);
return true;
}
mtx_unlock(&channel->lock);
return false;
}
void lovrChannelClear(Channel* channel) {
mtx_lock(&channel->lock);
for (size_t i = channel->head; i < channel->messages.length; i++) {
lovrVariantDestroy(&channel->messages.data[i]);
}
channel->received = channel->sent;
arr_clear(&channel->messages);
channel->head = 0;
cnd_broadcast(&channel->cond);
mtx_unlock(&channel->lock);
}
uint64_t lovrChannelGetCount(Channel* channel) {
mtx_lock(&channel->lock);
uint64_t length = channel->messages.length - channel->head;
mtx_unlock(&channel->lock);
return length;
}
bool lovrChannelHasRead(Channel* channel, uint64_t id) {
mtx_lock(&channel->lock);
bool received = channel->received >= id;
mtx_unlock(&channel->lock);
return received;
}

View File

@ -1,6 +1,3 @@
#include "data/blob.h"
#include "event/event.h"
#include "lib/tinycthread/tinycthread.h"
#include <stdbool.h>
#include <stdint.h>
@ -11,28 +8,34 @@
#define MAX_THREAD_ARGUMENTS 4
struct Channel;
struct Blob;
struct Variant;
typedef struct Thread {
uint32_t ref;
thrd_t handle;
mtx_t lock;
Blob* body;
Variant arguments[MAX_THREAD_ARGUMENTS];
uint32_t argumentCount;
int (*runner)(void*);
char* error;
bool running;
} Thread;
typedef struct Thread Thread;
typedef struct Channel Channel;
bool lovrThreadModuleInit(void);
void lovrThreadModuleDestroy(void);
struct Channel* lovrThreadGetChannel(const char* name);
void lovrThreadRemoveChannel(uint64_t hash);
Thread* lovrThreadCreate(int (*runner)(void*), Blob* body);
// Thread
typedef char* ThreadFunction(Thread* thread, struct Blob* body, struct Variant* arguments, uint32_t argumentCount);
Thread* lovrThreadCreate(ThreadFunction* function, struct Blob* body);
void lovrThreadDestroy(void* ref);
void lovrThreadStart(Thread* thread, Variant* arguments, uint32_t argumentCount);
void lovrThreadStart(Thread* thread, struct Variant* arguments, uint32_t argumentCount);
void lovrThreadWait(Thread* thread);
const char* lovrThreadGetError(Thread* thread);
bool lovrThreadIsRunning(Thread* thread);
const char* lovrThreadGetError(Thread* thread);
// Channel
Channel* lovrChannelCreate(uint64_t hash);
void lovrChannelDestroy(void* ref);
bool lovrChannelPush(Channel* channel, struct Variant* variant, double timeout, uint64_t* id);
bool lovrChannelPop(Channel* channel, struct Variant* variant, double timeout);
bool lovrChannelPeek(Channel* channel, struct Variant* variant);
void lovrChannelClear(Channel* channel);
uint64_t lovrChannelGetCount(Channel* channel);
bool lovrChannelHasRead(Channel* channel, uint64_t id);