thread api abstraction. fix deadlock in debug code

This commit is contained in:
Nick Kelsey 2017-06-12 12:05:51 -07:00
parent e1076a2e97
commit f736fab382
7 changed files with 179 additions and 99 deletions

View file

@ -47,14 +47,14 @@ struct hdhomerun_debug_message_t
struct hdhomerun_debug_t struct hdhomerun_debug_t
{ {
pthread_t thread; thread_task_t thread;
volatile bool enabled; volatile bool enabled;
volatile bool terminate; volatile bool terminate;
char *prefix; char *prefix;
pthread_mutex_t print_lock; thread_mutex_t print_lock;
pthread_mutex_t queue_lock; thread_mutex_t queue_lock;
pthread_mutex_t send_lock; thread_mutex_t send_lock;
thread_cond_t queue_cond; thread_cond_t queue_cond;
struct hdhomerun_debug_message_t *queue_head; struct hdhomerun_debug_message_t *queue_head;
@ -68,7 +68,7 @@ struct hdhomerun_debug_t
struct hdhomerun_sock_t *sock; struct hdhomerun_sock_t *sock;
}; };
static THREAD_FUNC_PREFIX hdhomerun_debug_thread_execute(void *arg); static void hdhomerun_debug_thread_execute(void *arg);
struct hdhomerun_debug_t *hdhomerun_debug_create(void) struct hdhomerun_debug_t *hdhomerun_debug_create(void)
{ {
@ -77,12 +77,12 @@ struct hdhomerun_debug_t *hdhomerun_debug_create(void)
return NULL; return NULL;
} }
pthread_mutex_init(&dbg->print_lock, NULL); thread_mutex_init(&dbg->print_lock);
pthread_mutex_init(&dbg->queue_lock, NULL); thread_mutex_init(&dbg->queue_lock);
pthread_mutex_init(&dbg->send_lock, NULL); thread_mutex_init(&dbg->send_lock);
thread_cond_init(&dbg->queue_cond); thread_cond_init(&dbg->queue_cond);
if (pthread_create(&dbg->thread, NULL, &hdhomerun_debug_thread_execute, dbg) != 0) { if (!thread_task_create(&dbg->thread, &hdhomerun_debug_thread_execute, dbg)) {
free(dbg); free(dbg);
return NULL; return NULL;
} }
@ -97,7 +97,8 @@ void hdhomerun_debug_destroy(struct hdhomerun_debug_t *dbg)
} }
dbg->terminate = true; dbg->terminate = true;
pthread_join(dbg->thread, NULL); thread_cond_signal(&dbg->queue_cond);
thread_task_join(dbg->thread);
if (dbg->prefix) { if (dbg->prefix) {
free(dbg->prefix); free(dbg->prefix);
@ -113,9 +114,9 @@ void hdhomerun_debug_destroy(struct hdhomerun_debug_t *dbg)
} }
thread_cond_dispose(&dbg->queue_cond); thread_cond_dispose(&dbg->queue_cond);
pthread_mutex_dispose(&dbg->print_lock); thread_mutex_dispose(&dbg->print_lock);
pthread_mutex_dispose(&dbg->queue_lock); thread_mutex_dispose(&dbg->queue_lock);
pthread_mutex_dispose(&dbg->send_lock); thread_mutex_dispose(&dbg->send_lock);
free(dbg); free(dbg);
} }
@ -143,10 +144,10 @@ void hdhomerun_debug_close(struct hdhomerun_debug_t *dbg, uint64_t timeout)
hdhomerun_debug_flush(dbg, timeout); hdhomerun_debug_flush(dbg, timeout);
} }
pthread_mutex_lock(&dbg->send_lock); thread_mutex_lock(&dbg->send_lock);
hdhomerun_debug_close_internal(dbg); hdhomerun_debug_close_internal(dbg);
dbg->connect_delay = 0; dbg->connect_delay = 0;
pthread_mutex_unlock(&dbg->send_lock); thread_mutex_unlock(&dbg->send_lock);
} }
void hdhomerun_debug_set_filename(struct hdhomerun_debug_t *dbg, const char *filename) void hdhomerun_debug_set_filename(struct hdhomerun_debug_t *dbg, const char *filename)
@ -155,15 +156,15 @@ void hdhomerun_debug_set_filename(struct hdhomerun_debug_t *dbg, const char *fil
return; return;
} }
pthread_mutex_lock(&dbg->send_lock); thread_mutex_lock(&dbg->send_lock);
if (!filename && !dbg->file_name) { if (!filename && !dbg->file_name) {
pthread_mutex_unlock(&dbg->send_lock); thread_mutex_unlock(&dbg->send_lock);
return; return;
} }
if (filename && dbg->file_name) { if (filename && dbg->file_name) {
if (strcmp(filename, dbg->file_name) == 0) { if (strcmp(filename, dbg->file_name) == 0) {
pthread_mutex_unlock(&dbg->send_lock); thread_mutex_unlock(&dbg->send_lock);
return; return;
} }
} }
@ -179,7 +180,7 @@ void hdhomerun_debug_set_filename(struct hdhomerun_debug_t *dbg, const char *fil
dbg->file_name = strdup(filename); dbg->file_name = strdup(filename);
} }
pthread_mutex_unlock(&dbg->send_lock); thread_mutex_unlock(&dbg->send_lock);
} }
void hdhomerun_debug_set_prefix(struct hdhomerun_debug_t *dbg, const char *prefix) void hdhomerun_debug_set_prefix(struct hdhomerun_debug_t *dbg, const char *prefix)
@ -188,7 +189,7 @@ void hdhomerun_debug_set_prefix(struct hdhomerun_debug_t *dbg, const char *prefi
return; return;
} }
pthread_mutex_lock(&dbg->print_lock); thread_mutex_lock(&dbg->print_lock);
if (dbg->prefix) { if (dbg->prefix) {
free(dbg->prefix); free(dbg->prefix);
@ -199,7 +200,7 @@ void hdhomerun_debug_set_prefix(struct hdhomerun_debug_t *dbg, const char *prefi
dbg->prefix = strdup(prefix); dbg->prefix = strdup(prefix);
} }
pthread_mutex_unlock(&dbg->print_lock); thread_mutex_unlock(&dbg->print_lock);
} }
void hdhomerun_debug_enable(struct hdhomerun_debug_t *dbg) void hdhomerun_debug_enable(struct hdhomerun_debug_t *dbg)
@ -242,15 +243,15 @@ void hdhomerun_debug_flush(struct hdhomerun_debug_t *dbg, uint64_t timeout)
timeout = getcurrenttime() + timeout; timeout = getcurrenttime() + timeout;
while (getcurrenttime() < timeout) { while (getcurrenttime() < timeout) {
pthread_mutex_lock(&dbg->queue_lock); thread_mutex_lock(&dbg->queue_lock);
struct hdhomerun_debug_message_t *message = dbg->queue_head; struct hdhomerun_debug_message_t *message = dbg->queue_head;
pthread_mutex_unlock(&dbg->queue_lock); thread_mutex_unlock(&dbg->queue_lock);
if (!message) { if (!message) {
return; return;
} }
msleep_approx(10); msleep_approx(16);
} }
} }
@ -273,6 +274,8 @@ void hdhomerun_debug_vprintf(struct hdhomerun_debug_t *dbg, const char *fmt, va_
return; return;
} }
message->next = NULL;
char *ptr = message->buffer; char *ptr = message->buffer;
char *end = message->buffer + sizeof(message->buffer) - 2; char *end = message->buffer + sizeof(message->buffer) - 2;
*end = 0; *end = 0;
@ -289,14 +292,14 @@ void hdhomerun_debug_vprintf(struct hdhomerun_debug_t *dbg, const char *fmt, va_
/* /*
* Debug prefix. * Debug prefix.
*/ */
pthread_mutex_lock(&dbg->print_lock); thread_mutex_lock(&dbg->print_lock);
if (dbg->prefix) { if (dbg->prefix) {
hdhomerun_sprintf(ptr, end, "%s ", dbg->prefix); hdhomerun_sprintf(ptr, end, "%s ", dbg->prefix);
ptr = strchr(ptr, 0); ptr = strchr(ptr, 0);
} }
pthread_mutex_unlock(&dbg->print_lock); thread_mutex_unlock(&dbg->print_lock);
/* /*
* Message text. * Message text.
@ -314,21 +317,21 @@ void hdhomerun_debug_vprintf(struct hdhomerun_debug_t *dbg, const char *fmt, va_
/* /*
* Enqueue. * Enqueue.
*/ */
pthread_mutex_lock(&dbg->queue_lock); thread_mutex_lock(&dbg->queue_lock);
message->next = NULL;
if (dbg->queue_tail) { if (dbg->queue_tail) {
dbg->queue_tail->next = message; dbg->queue_tail->next = message;
dbg->queue_tail = message;
} else { } else {
dbg->queue_head = message; dbg->queue_head = message;
dbg->queue_tail = message;
} }
dbg->queue_tail = message;
dbg->queue_depth++; dbg->queue_depth++;
pthread_mutex_unlock(&dbg->queue_lock); bool signal_thread = dbg->enabled || (dbg->queue_depth > 1024 + 100);
if (dbg->enabled) { thread_mutex_unlock(&dbg->queue_lock);
if (signal_thread) {
thread_cond_signal(&dbg->queue_cond); thread_cond_signal(&dbg->queue_cond);
} }
} }
@ -393,7 +396,7 @@ static bool hdhomerun_debug_output_message_sock(struct hdhomerun_debug_t *dbg, s
static bool hdhomerun_debug_output_message(struct hdhomerun_debug_t *dbg, struct hdhomerun_debug_message_t *message) static bool hdhomerun_debug_output_message(struct hdhomerun_debug_t *dbg, struct hdhomerun_debug_message_t *message)
{ {
pthread_mutex_lock(&dbg->send_lock); thread_mutex_lock(&dbg->send_lock);
bool ret; bool ret;
if (dbg->file_name) { if (dbg->file_name) {
@ -402,13 +405,13 @@ static bool hdhomerun_debug_output_message(struct hdhomerun_debug_t *dbg, struct
ret = hdhomerun_debug_output_message_sock(dbg, message); ret = hdhomerun_debug_output_message_sock(dbg, message);
} }
pthread_mutex_unlock(&dbg->send_lock); thread_mutex_unlock(&dbg->send_lock);
return ret; return ret;
} }
static void hdhomerun_debug_pop_and_free_message(struct hdhomerun_debug_t *dbg) static void hdhomerun_debug_pop_and_free_message(struct hdhomerun_debug_t *dbg)
{ {
pthread_mutex_lock(&dbg->queue_lock); thread_mutex_lock(&dbg->queue_lock);
struct hdhomerun_debug_message_t *message = dbg->queue_head; struct hdhomerun_debug_message_t *message = dbg->queue_head;
dbg->queue_head = message->next; dbg->queue_head = message->next;
@ -417,20 +420,20 @@ static void hdhomerun_debug_pop_and_free_message(struct hdhomerun_debug_t *dbg)
} }
dbg->queue_depth--; dbg->queue_depth--;
pthread_mutex_unlock(&dbg->queue_lock); thread_mutex_unlock(&dbg->queue_lock);
free(message); free(message);
} }
static THREAD_FUNC_PREFIX hdhomerun_debug_thread_execute(void *arg) static void hdhomerun_debug_thread_execute(void *arg)
{ {
struct hdhomerun_debug_t *dbg = (struct hdhomerun_debug_t *)arg; struct hdhomerun_debug_t *dbg = (struct hdhomerun_debug_t *)arg;
while (!dbg->terminate) { while (!dbg->terminate) {
pthread_mutex_lock(&dbg->queue_lock); thread_mutex_lock(&dbg->queue_lock);
struct hdhomerun_debug_message_t *message = dbg->queue_head; struct hdhomerun_debug_message_t *message = dbg->queue_head;
uint32_t queue_depth = dbg->queue_depth; uint32_t queue_depth = dbg->queue_depth;
pthread_mutex_unlock(&dbg->queue_lock); thread_mutex_unlock(&dbg->queue_lock);
if (!message) { if (!message) {
thread_cond_wait(&dbg->queue_cond); thread_cond_wait(&dbg->queue_cond);
@ -454,6 +457,4 @@ static THREAD_FUNC_PREFIX hdhomerun_debug_thread_execute(void *arg)
hdhomerun_debug_pop_and_free_message(dbg); hdhomerun_debug_pop_and_free_message(dbg);
} }
return 0;
} }

View file

@ -1,7 +1,7 @@
/* /*
* hdhomerun_discover.c * hdhomerun_discover.c
* *
* Copyright © 2006-2015 Silicondust USA Inc. <www.silicondust.com>. * Copyright © 2006-2017 Silicondust USA Inc. <www.silicondust.com>.
* *
* This library is free software; you can redistribute it and/or * This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public * modify it under the terms of the GNU Lesser General Public
@ -427,7 +427,7 @@ int hdhomerun_discover_find_devices_v2(struct hdhomerun_discover_t *ds, uint32_t
if (getcurrenttime() >= timeout) { if (getcurrenttime() >= timeout) {
break; break;
} }
msleep_approx(10); msleep_approx(16);
continue; continue;
} }

View file

@ -1,7 +1,7 @@
/* /*
* hdhomerun_os_posix.c * hdhomerun_os_posix.c
* *
* Copyright © 2006-2016 Silicondust USA Inc. <www.silicondust.com>. * Copyright © 2006-2017 Silicondust USA Inc. <www.silicondust.com>.
* *
* This library is free software; you can redistribute it and/or * This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public * modify it under the terms of the GNU Lesser General Public
@ -124,8 +124,59 @@ void msleep_minimum(uint64_t ms)
} }
} }
void pthread_mutex_dispose(pthread_mutex_t *mutex) struct thread_task_execute_args_t {
thread_task_func_t func;
void *arg;
};
static void *thread_task_execute(void *arg)
{ {
struct thread_task_execute_args_t *execute_args = (struct thread_task_execute_args_t *)arg;
execute_args->func(execute_args->arg);
free(execute_args);
return NULL;
}
bool thread_task_create(thread_task_t *tid, thread_task_func_t func, void *arg)
{
struct thread_task_execute_args_t *execute_args = (struct thread_task_execute_args_t *)malloc(sizeof(struct thread_task_execute_args_t));
if (!execute_args) {
return false;
}
execute_args->func = func;
execute_args->arg = arg;
if (pthread_create(tid, NULL, thread_task_execute, execute_args) != 0) {
free(execute_args);
return false;
}
return true;
}
void thread_task_join(thread_task_t tid)
{
pthread_join(tid, NULL);
}
void thread_mutex_init(thread_mutex_t *mutex)
{
pthread_mutex_init(mutex, NULL);
}
void thread_mutex_dispose(pthread_mutex_t *mutex)
{
}
void thread_mutex_lock(thread_mutex_t *mutex)
{
pthread_mutex_lock(mutex);
}
void thread_mutex_unlock(thread_mutex_t *mutex)
{
pthread_mutex_unlock(mutex);
} }
void thread_cond_init(thread_cond_t *cond) void thread_cond_init(thread_cond_t *cond)

View file

@ -1,7 +1,7 @@
/* /*
* hdhomerun_os_posix.h * hdhomerun_os_posix.h
* *
* Copyright © 2006-2015 Silicondust USA Inc. <www.silicondust.com>. * Copyright © 2006-2017 Silicondust USA Inc. <www.silicondust.com>.
* *
* This library is free software; you can redistribute it and/or * This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public * modify it under the terms of the GNU Lesser General Public
@ -41,6 +41,9 @@
#include <pthread.h> #include <pthread.h>
typedef void (*sig_t)(int); typedef void (*sig_t)(int);
typedef void (*thread_task_func_t)(void *arg);
typedef pthread_t thread_task_t;
typedef pthread_mutex_t thread_mutex_t;
typedef struct { typedef struct {
volatile bool signaled; volatile bool signaled;
@ -49,8 +52,6 @@ typedef struct {
} thread_cond_t; } thread_cond_t;
#define LIBHDHOMERUN_API #define LIBHDHOMERUN_API
#define THREAD_FUNC_PREFIX void *
#define THREAD_FUNC_RESULT NULL
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
@ -61,7 +62,13 @@ extern LIBHDHOMERUN_API uint64_t getcurrenttime(void);
extern LIBHDHOMERUN_API void msleep_approx(uint64_t ms); extern LIBHDHOMERUN_API void msleep_approx(uint64_t ms);
extern LIBHDHOMERUN_API void msleep_minimum(uint64_t ms); extern LIBHDHOMERUN_API void msleep_minimum(uint64_t ms);
extern LIBHDHOMERUN_API void pthread_mutex_dispose(pthread_mutex_t *mutex); extern LIBHDHOMERUN_API bool thread_task_create(thread_task_t *tid, thread_task_func_t func, void *arg);
extern LIBHDHOMERUN_API void thread_task_join(thread_task_t tid);
extern LIBHDHOMERUN_API void thread_mutex_init(thread_mutex_t *mutex);
extern LIBHDHOMERUN_API void thread_mutex_dispose(thread_mutex_t *mutex);
extern LIBHDHOMERUN_API void thread_mutex_lock(thread_mutex_t *mutex);
extern LIBHDHOMERUN_API void thread_mutex_unlock(thread_mutex_t *mutex);
extern LIBHDHOMERUN_API void thread_cond_init(thread_cond_t *cond); extern LIBHDHOMERUN_API void thread_cond_init(thread_cond_t *cond);
extern LIBHDHOMERUN_API void thread_cond_dispose(thread_cond_t *cond); extern LIBHDHOMERUN_API void thread_cond_dispose(thread_cond_t *cond);

View file

@ -1,7 +1,7 @@
/* /*
* hdhomerun_os_windows.c * hdhomerun_os_windows.c
* *
* Copyright © 2006-2015 Silicondust USA Inc. <www.silicondust.com>. * Copyright © 2006-2017 Silicondust USA Inc. <www.silicondust.com>.
* *
* This library is free software; you can redistribute it and/or * This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public * modify it under the terms of the GNU Lesser General Public
@ -73,44 +73,67 @@ void msleep_minimum(uint64_t ms)
} }
} }
int pthread_create(pthread_t *tid, void *attr, LPTHREAD_START_ROUTINE start, void *arg) struct thread_task_execute_args_t {
thread_task_func_t func;
void *arg;
};
static DWORD WINAPI thread_task_execute(void *arg)
{ {
*tid = CreateThread(NULL, 0, start, arg, 0, NULL); struct thread_task_execute_args_t *execute_args = (struct thread_task_execute_args_t *)arg;
if (!*tid) { execute_args->func(execute_args->arg);
return (int)GetLastError(); free(execute_args);
}
return 0; return 0;
} }
int pthread_join(pthread_t tid, void **value_ptr) bool thread_task_create(thread_task_t *tid, thread_task_func_t func, void *arg)
{
struct thread_task_execute_args_t *execute_args = (struct thread_task_execute_args_t *)malloc(sizeof(struct thread_task_execute_args_t));
if (!execute_args) {
return false;
}
execute_args->func = func;
execute_args->arg = arg;
*tid = CreateThread(NULL, 0, thread_task_execute, execute_args, 0, NULL);
if (!*tid) {
free(execute_args);
return false;
}
return true;
}
void thread_task_join(thread_task_t tid)
{ {
while (1) { while (1) {
DWORD ExitCode = 0; DWORD ExitCode = 0;
if (!GetExitCodeThread(tid, &ExitCode)) { if (!GetExitCodeThread(tid, &ExitCode)) {
return (int)GetLastError(); return;
} }
if (ExitCode != STILL_ACTIVE) { if (ExitCode != STILL_ACTIVE) {
return 0; return;
} }
} }
} }
void pthread_mutex_init(pthread_mutex_t *mutex, void *attr) void thread_mutex_init(thread_mutex_t *mutex)
{ {
*mutex = CreateMutex(NULL, false, NULL); *mutex = CreateMutex(NULL, false, NULL);
} }
void pthread_mutex_dispose(pthread_mutex_t *mutex) void thread_mutex_dispose(thread_mutex_t *mutex)
{ {
CloseHandle(*mutex); CloseHandle(*mutex);
} }
void pthread_mutex_lock(pthread_mutex_t *mutex) void thread_mutex_lock(thread_mutex_t *mutex)
{ {
WaitForSingleObject(*mutex, INFINITE); WaitForSingleObject(*mutex, INFINITE);
} }
void pthread_mutex_unlock(pthread_mutex_t *mutex) void thread_mutex_unlock(thread_mutex_t *mutex)
{ {
ReleaseMutex(*mutex); ReleaseMutex(*mutex);
} }

View file

@ -1,7 +1,7 @@
/* /*
* hdhomerun_os_windows.h * hdhomerun_os_windows.h
* *
* Copyright © 2006-2015 Silicondust USA Inc. <www.silicondust.com>. * Copyright © 2006-2017 Silicondust USA Inc. <www.silicondust.com>.
* *
* This library is free software; you can redistribute it and/or * This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public * modify it under the terms of the GNU Lesser General Public
@ -57,8 +57,9 @@
#endif #endif
typedef void (*sig_t)(int); typedef void (*sig_t)(int);
typedef HANDLE pthread_t; typedef void (*thread_task_func_t)(void *arg);
typedef HANDLE pthread_mutex_t; typedef HANDLE thread_task_t;
typedef HANDLE thread_mutex_t;
typedef HANDLE thread_cond_t; typedef HANDLE thread_cond_t;
#if !defined(va_copy) #if !defined(va_copy)
@ -71,8 +72,6 @@ typedef HANDLE thread_cond_t;
#define strncasecmp _strnicmp #define strncasecmp _strnicmp
#define fseeko _fseeki64 #define fseeko _fseeki64
#define ftello _ftelli64 #define ftello _ftelli64
#define THREAD_FUNC_PREFIX DWORD WINAPI
#define THREAD_FUNC_RESULT 0
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
@ -83,12 +82,13 @@ extern LIBHDHOMERUN_API uint64_t getcurrenttime(void);
extern LIBHDHOMERUN_API void msleep_approx(uint64_t ms); extern LIBHDHOMERUN_API void msleep_approx(uint64_t ms);
extern LIBHDHOMERUN_API void msleep_minimum(uint64_t ms); extern LIBHDHOMERUN_API void msleep_minimum(uint64_t ms);
extern LIBHDHOMERUN_API int pthread_create(pthread_t *tid, void *attr, LPTHREAD_START_ROUTINE start, void *arg); extern LIBHDHOMERUN_API bool thread_task_create(thread_task_t *tid, thread_task_func_t func, void *arg);
extern LIBHDHOMERUN_API int pthread_join(pthread_t tid, void **value_ptr); extern LIBHDHOMERUN_API void thread_task_join(thread_task_t tid);
extern LIBHDHOMERUN_API void pthread_mutex_init(pthread_mutex_t *mutex, void *attr);
extern LIBHDHOMERUN_API void pthread_mutex_dispose(pthread_mutex_t *mutex); extern LIBHDHOMERUN_API void thread_mutex_init(thread_mutex_t *mutex);
extern LIBHDHOMERUN_API void pthread_mutex_lock(pthread_mutex_t *mutex); extern LIBHDHOMERUN_API void thread_mutex_dispose(thread_mutex_t *mutex);
extern LIBHDHOMERUN_API void pthread_mutex_unlock(pthread_mutex_t *mutex); extern LIBHDHOMERUN_API void thread_mutex_lock(thread_mutex_t *mutex);
extern LIBHDHOMERUN_API void thread_mutex_unlock(thread_mutex_t *mutex);
extern LIBHDHOMERUN_API void thread_cond_init(thread_cond_t *cond); extern LIBHDHOMERUN_API void thread_cond_init(thread_cond_t *cond);
extern LIBHDHOMERUN_API void thread_cond_dispose(thread_cond_t *cond); extern LIBHDHOMERUN_API void thread_cond_dispose(thread_cond_t *cond);

View file

@ -21,7 +21,7 @@
#include "hdhomerun.h" #include "hdhomerun.h"
struct hdhomerun_video_sock_t { struct hdhomerun_video_sock_t {
pthread_mutex_t lock; thread_mutex_t lock;
struct hdhomerun_debug_t *dbg; struct hdhomerun_debug_t *dbg;
struct hdhomerun_sock_t *sock; struct hdhomerun_sock_t *sock;
@ -36,7 +36,7 @@ struct hdhomerun_video_sock_t {
size_t buffer_size; size_t buffer_size;
size_t advance; size_t advance;
pthread_t thread; thread_task_t thread;
volatile bool terminate; volatile bool terminate;
volatile uint32_t packet_count; volatile uint32_t packet_count;
@ -49,7 +49,7 @@ struct hdhomerun_video_sock_t {
volatile uint8_t sequence[0x2000]; volatile uint8_t sequence[0x2000];
}; };
static THREAD_FUNC_PREFIX hdhomerun_video_thread_execute(void *arg); static void hdhomerun_video_thread_execute(void *arg);
struct hdhomerun_video_sock_t *hdhomerun_video_create(uint16_t listen_port, bool allow_port_reuse, size_t buffer_size, struct hdhomerun_debug_t *dbg) struct hdhomerun_video_sock_t *hdhomerun_video_create(uint16_t listen_port, bool allow_port_reuse, size_t buffer_size, struct hdhomerun_debug_t *dbg)
{ {
@ -61,7 +61,7 @@ struct hdhomerun_video_sock_t *hdhomerun_video_create(uint16_t listen_port, bool
} }
vs->dbg = dbg; vs->dbg = dbg;
pthread_mutex_init(&vs->lock, NULL); thread_mutex_init(&vs->lock);
/* Reset sequence tracking. */ /* Reset sequence tracking. */
hdhomerun_video_flush(vs); hdhomerun_video_flush(vs);
@ -98,7 +98,7 @@ struct hdhomerun_video_sock_t *hdhomerun_video_create(uint16_t listen_port, bool
} }
/* Start thread. */ /* Start thread. */
if (pthread_create(&vs->thread, NULL, &hdhomerun_video_thread_execute, vs) != 0) { if (!thread_task_create(&vs->thread, &hdhomerun_video_thread_execute, vs)) {
hdhomerun_debug_printf(dbg, "hdhomerun_video_create: failed to start thread\n"); hdhomerun_debug_printf(dbg, "hdhomerun_video_create: failed to start thread\n");
goto error; goto error;
} }
@ -115,7 +115,7 @@ error:
free(vs->buffer); free(vs->buffer);
} }
pthread_mutex_dispose(&vs->lock); thread_mutex_dispose(&vs->lock);
free(vs); free(vs);
return NULL; return NULL;
@ -124,10 +124,10 @@ error:
void hdhomerun_video_destroy(struct hdhomerun_video_sock_t *vs) void hdhomerun_video_destroy(struct hdhomerun_video_sock_t *vs)
{ {
vs->terminate = true; vs->terminate = true;
pthread_join(vs->thread, NULL); thread_task_join(vs->thread);
hdhomerun_sock_destroy(vs->sock); hdhomerun_sock_destroy(vs->sock);
pthread_mutex_dispose(&vs->lock); thread_mutex_dispose(&vs->lock);
free(vs->buffer); free(vs->buffer);
free(vs); free(vs);
@ -135,7 +135,7 @@ void hdhomerun_video_destroy(struct hdhomerun_video_sock_t *vs)
void hdhomerun_video_set_keepalive(struct hdhomerun_video_sock_t *vs, uint32_t remote_addr, uint16_t remote_port, uint32_t lockkey) void hdhomerun_video_set_keepalive(struct hdhomerun_video_sock_t *vs, uint32_t remote_addr, uint16_t remote_port, uint32_t lockkey)
{ {
pthread_mutex_lock(&vs->lock); thread_mutex_lock(&vs->lock);
vs->keepalive_addr = remote_addr; vs->keepalive_addr = remote_addr;
vs->keepalive_port = remote_port; vs->keepalive_port = remote_port;
@ -145,7 +145,7 @@ void hdhomerun_video_set_keepalive(struct hdhomerun_video_sock_t *vs, uint32_t r
vs->keepalive_start = true; vs->keepalive_start = true;
} }
pthread_mutex_unlock(&vs->lock); thread_mutex_unlock(&vs->lock);
} }
struct hdhomerun_sock_t *hdhomerun_video_get_sock(struct hdhomerun_video_sock_t *vs) struct hdhomerun_sock_t *hdhomerun_video_get_sock(struct hdhomerun_video_sock_t *vs)
@ -244,12 +244,12 @@ static void hdhomerun_video_parse_rtp(struct hdhomerun_video_sock_t *vs, struct
static void hdhomerun_video_thread_send_keepalive(struct hdhomerun_video_sock_t *vs) static void hdhomerun_video_thread_send_keepalive(struct hdhomerun_video_sock_t *vs)
{ {
pthread_mutex_lock(&vs->lock); thread_mutex_lock(&vs->lock);
uint32_t keepalive_lockkey = vs->keepalive_lockkey; uint32_t keepalive_lockkey = vs->keepalive_lockkey;
uint32_t keepalive_addr = vs->keepalive_addr; uint32_t keepalive_addr = vs->keepalive_addr;
uint16_t keepalive_port = vs->keepalive_port; uint16_t keepalive_port = vs->keepalive_port;
vs->keepalive_start = false; vs->keepalive_start = false;
pthread_mutex_unlock(&vs->lock); thread_mutex_unlock(&vs->lock);
if ((keepalive_addr == 0) || (keepalive_port == 0)) { if ((keepalive_addr == 0) || (keepalive_port == 0)) {
return; return;
@ -261,7 +261,7 @@ static void hdhomerun_video_thread_send_keepalive(struct hdhomerun_video_sock_t
hdhomerun_sock_sendto(vs->sock, keepalive_addr, keepalive_port, pkt.start, pkt.end - pkt.start, 25); hdhomerun_sock_sendto(vs->sock, keepalive_addr, keepalive_port, pkt.start, pkt.end - pkt.start, 25);
} }
static THREAD_FUNC_PREFIX hdhomerun_video_thread_execute(void *arg) static void hdhomerun_video_thread_execute(void *arg)
{ {
struct hdhomerun_video_sock_t *vs = (struct hdhomerun_video_sock_t *)arg; struct hdhomerun_video_sock_t *vs = (struct hdhomerun_video_sock_t *)arg;
uint64_t send_time = getcurrenttime(); uint64_t send_time = getcurrenttime();
@ -294,7 +294,7 @@ static THREAD_FUNC_PREFIX hdhomerun_video_thread_execute(void *arg)
continue; continue;
} }
pthread_mutex_lock(&vs->lock); thread_mutex_lock(&vs->lock);
/* Store in ring buffer. */ /* Store in ring buffer. */
size_t head = vs->head; size_t head = vs->head;
@ -320,21 +320,19 @@ static THREAD_FUNC_PREFIX hdhomerun_video_thread_execute(void *arg)
/* Check for buffer overflow. */ /* Check for buffer overflow. */
if (head == vs->tail) { if (head == vs->tail) {
vs->overflow_error_count++; vs->overflow_error_count++;
pthread_mutex_unlock(&vs->lock); thread_mutex_unlock(&vs->lock);
continue; continue;
} }
vs->head = head; vs->head = head;
pthread_mutex_unlock(&vs->lock); thread_mutex_unlock(&vs->lock);
} }
return THREAD_FUNC_RESULT;
} }
uint8_t *hdhomerun_video_recv(struct hdhomerun_video_sock_t *vs, size_t max_size, size_t *pactual_size) uint8_t *hdhomerun_video_recv(struct hdhomerun_video_sock_t *vs, size_t max_size, size_t *pactual_size)
{ {
pthread_mutex_lock(&vs->lock); thread_mutex_lock(&vs->lock);
size_t head = vs->head; size_t head = vs->head;
size_t tail = vs->tail; size_t tail = vs->tail;
@ -351,7 +349,7 @@ uint8_t *hdhomerun_video_recv(struct hdhomerun_video_sock_t *vs, size_t max_size
if (head == tail) { if (head == tail) {
vs->advance = 0; vs->advance = 0;
*pactual_size = 0; *pactual_size = 0;
pthread_mutex_unlock(&vs->lock); thread_mutex_unlock(&vs->lock);
return NULL; return NULL;
} }
@ -359,7 +357,7 @@ uint8_t *hdhomerun_video_recv(struct hdhomerun_video_sock_t *vs, size_t max_size
if (size == 0) { if (size == 0) {
vs->advance = 0; vs->advance = 0;
*pactual_size = 0; *pactual_size = 0;
pthread_mutex_unlock(&vs->lock); thread_mutex_unlock(&vs->lock);
return NULL; return NULL;
} }
@ -376,13 +374,13 @@ uint8_t *hdhomerun_video_recv(struct hdhomerun_video_sock_t *vs, size_t max_size
*pactual_size = size; *pactual_size = size;
uint8_t *result = vs->buffer + tail; uint8_t *result = vs->buffer + tail;
pthread_mutex_unlock(&vs->lock); thread_mutex_unlock(&vs->lock);
return result; return result;
} }
void hdhomerun_video_flush(struct hdhomerun_video_sock_t *vs) void hdhomerun_video_flush(struct hdhomerun_video_sock_t *vs)
{ {
pthread_mutex_lock(&vs->lock); thread_mutex_lock(&vs->lock);
vs->tail = vs->head; vs->tail = vs->head;
vs->advance = 0; vs->advance = 0;
@ -400,7 +398,7 @@ void hdhomerun_video_flush(struct hdhomerun_video_sock_t *vs)
vs->sequence_error_count = 0; vs->sequence_error_count = 0;
vs->overflow_error_count = 0; vs->overflow_error_count = 0;
pthread_mutex_unlock(&vs->lock); thread_mutex_unlock(&vs->lock);
} }
void hdhomerun_video_debug_print_stats(struct hdhomerun_video_sock_t *vs) void hdhomerun_video_debug_print_stats(struct hdhomerun_video_sock_t *vs)
@ -419,7 +417,7 @@ void hdhomerun_video_get_stats(struct hdhomerun_video_sock_t *vs, struct hdhomer
{ {
memset(stats, 0, sizeof(struct hdhomerun_video_stats_t)); memset(stats, 0, sizeof(struct hdhomerun_video_stats_t));
pthread_mutex_lock(&vs->lock); thread_mutex_lock(&vs->lock);
stats->packet_count = vs->packet_count; stats->packet_count = vs->packet_count;
stats->network_error_count = vs->network_error_count; stats->network_error_count = vs->network_error_count;
@ -427,5 +425,5 @@ void hdhomerun_video_get_stats(struct hdhomerun_video_sock_t *vs, struct hdhomer
stats->sequence_error_count = vs->sequence_error_count; stats->sequence_error_count = vs->sequence_error_count;
stats->overflow_error_count = vs->overflow_error_count; stats->overflow_error_count = vs->overflow_error_count;
pthread_mutex_unlock(&vs->lock); thread_mutex_unlock(&vs->lock);
} }