From 4ea95a6130e7cd62788b5dc05bd932327e744a14 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florian=20M=C3=A4rkl?= Date: Thu, 13 Jun 2019 21:51:28 +0200 Subject: [PATCH] Implement Congestion Control Thread --- lib/CMakeLists.txt | 6 +- lib/include/chiaki/congestioncontrol.h | 44 ++++++++++++ lib/include/chiaki/takion.h | 14 ++++ lib/src/congestioncontrol.c | 97 ++++++++++++++++++++++++++ lib/src/takion.c | 39 ++++++++++- 5 files changed, 197 insertions(+), 3 deletions(-) create mode 100644 lib/include/chiaki/congestioncontrol.h create mode 100644 lib/src/congestioncontrol.c diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index c256b0e..0ba5000 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -22,7 +22,8 @@ set(HEADER_FILES include/chiaki/videoreceiver.h include/chiaki/frameprocessor.h include/chiaki/seqnum.h - include/chiaki/discovery.h) + include/chiaki/discovery.h + include/chiaki/congestioncontrol.h) set(SOURCE_FILES src/common.c @@ -47,7 +48,8 @@ set(SOURCE_FILES src/audioreceiver.c src/videoreceiver.c src/frameprocessor.c - src/discovery.c) + src/discovery.c + src/congestioncontrol.c) add_subdirectory(protobuf) include_directories("${NANOPB_SOURCE_DIR}") diff --git a/lib/include/chiaki/congestioncontrol.h b/lib/include/chiaki/congestioncontrol.h new file mode 100644 index 0000000..a7db7b4 --- /dev/null +++ b/lib/include/chiaki/congestioncontrol.h @@ -0,0 +1,44 @@ +/* + * This file is part of Chiaki. + * + * Chiaki is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Chiaki is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Chiaki. If not, see . + */ + +#ifndef CHIAKI_CONGESTIONCONTROL_H +#define CHIAKI_CONGESTIONCONTROL_H + +#include "takion.h" +#include "thread.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct chiaki_congestion_control_t +{ + ChiakiTakion *takion; + ChiakiThread thread; + ChiakiCond stop_cond; + ChiakiMutex stop_cond_mutex; + bool stop_cond_predicate; +} ChiakiCongestionControl; + +CHIAKI_EXPORT ChiakiErrorCode chiaki_congestion_control_start(ChiakiCongestionControl *control, ChiakiTakion *takion); +CHIAKI_EXPORT ChiakiErrorCode chiaki_congestion_control_stop(ChiakiCongestionControl *control); + +#ifdef __cplusplus +} +#endif + +#endif // CHIAKI_CONGESTIONCONTROL_H diff --git a/lib/include/chiaki/takion.h b/lib/include/chiaki/takion.h index 43d9801..ec9e4ee 100644 --- a/lib/include/chiaki/takion.h +++ b/lib/include/chiaki/takion.h @@ -58,6 +58,14 @@ typedef struct chiaki_takion_av_packet_t } ChiakiTakionAVPacket; +typedef struct chiaki_takion_congestion_packet_t +{ + uint16_t word_0; + uint16_t word_1; + uint16_t word_2; +} ChiakiTakionCongestionPacket; + + typedef void (*ChiakiTakionDataCallback)(ChiakiTakionMessageDataType, uint8_t *buf, size_t buf_size, void *user); typedef void (*ChiakiTakionAVCallback)(ChiakiTakionAVPacket *packet, void *user); @@ -77,8 +85,13 @@ typedef struct chiaki_takion_connect_info_t typedef struct chiaki_takion_t { ChiakiLog *log; + ChiakiGKCrypt *gkcrypt_local; // if NULL (default), no gmac is calculated and nothing is encrypted + size_t key_pos_local; + ChiakiMutex gkcrypt_local_mutex; + ChiakiGKCrypt *gkcrypt_remote; // if NULL (default), remote gmacs are IGNORED (!) and everything is expected to be unencrypted + ChiakiTakionDataCallback data_cb; void *data_cb_user; ChiakiTakionAVCallback av_cb; @@ -99,6 +112,7 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_connect(ChiakiTakion *takion, Chiaki CHIAKI_EXPORT void chiaki_takion_close(ChiakiTakion *takion); CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_raw(ChiakiTakion *takion, uint8_t *buf, size_t buf_size); CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_message_data(ChiakiTakion *takion, uint32_t key_pos, uint8_t type_b, uint16_t channel, uint8_t *buf, size_t buf_size); +CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_congestion(ChiakiTakion *takion, ChiakiTakionCongestionPacket *packet); static inline void chiaki_takion_set_crypt(ChiakiTakion *takion, ChiakiGKCrypt *gkcrypt_local, ChiakiGKCrypt *gkcrypt_remote) { takion->gkcrypt_local = gkcrypt_local; diff --git a/lib/src/congestioncontrol.c b/lib/src/congestioncontrol.c new file mode 100644 index 0000000..50a6a7c --- /dev/null +++ b/lib/src/congestioncontrol.c @@ -0,0 +1,97 @@ +/* + * This file is part of Chiaki. + * + * Chiaki is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Chiaki is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Chiaki. If not, see . + */ + +#include + + +#define CONGESTION_CONTROL_INTERVAL 200 // ms + + +static void *congestion_control_thread_func(void *user) +{ + ChiakiCongestionControl *control = user; + + ChiakiErrorCode err = chiaki_mutex_lock(&control->stop_cond_mutex); + if(err != CHIAKI_ERR_SUCCESS) + return NULL; + + while(true) + { + err = chiaki_cond_timedwait(&control->stop_cond, &control->stop_cond_mutex, CONGESTION_CONTROL_INTERVAL); + if(err != CHIAKI_ERR_SUCCESS && err != CHIAKI_ERR_TIMEOUT) + break; + if(control->stop_cond_predicate) + break; + // TODO: detect non-stop and non-timeout (spurious) wakeup and wait the rest of the timeout + + CHIAKI_LOGD(control->takion->log, "Sending Congestion Control Packet\n"); + ChiakiTakionCongestionPacket packet = { 0 }; // TODO: fill with real values + chiaki_takion_send_congestion(control->takion, &packet); + } + + chiaki_mutex_unlock(&control->stop_cond_mutex); + return NULL; +} + +CHIAKI_EXPORT ChiakiErrorCode chiaki_congestion_control_start(ChiakiCongestionControl *control, ChiakiTakion *takion) +{ + control->takion = takion; + + control->stop_cond_predicate = false; + ChiakiErrorCode err = chiaki_cond_init(&control->stop_cond); + if(err != CHIAKI_ERR_SUCCESS) + return err; + err = chiaki_mutex_init(&control->stop_cond_mutex); + if(err != CHIAKI_ERR_SUCCESS) + { + chiaki_cond_fini(&control->stop_cond); + return err; + } + + err = chiaki_thread_create(&control->thread, congestion_control_thread_func, control); + if(err != CHIAKI_ERR_SUCCESS) + { + chiaki_mutex_fini(&control->stop_cond_mutex); + chiaki_cond_fini(&control->stop_cond); + return err; + } + + return CHIAKI_ERR_SUCCESS; +} + +CHIAKI_EXPORT ChiakiErrorCode chiaki_congestion_control_stop(ChiakiCongestionControl *control) +{ + ChiakiErrorCode err = chiaki_mutex_lock(&control->stop_cond_mutex); + if(err != CHIAKI_ERR_SUCCESS) + return err; + control->stop_cond_predicate = true; + err = chiaki_cond_signal(&control->stop_cond); + if(err != CHIAKI_ERR_SUCCESS) + return err; + err = chiaki_mutex_unlock(&control->stop_cond_mutex); + if(err != CHIAKI_ERR_SUCCESS) + return err; + + err = chiaki_thread_join(&control->thread, NULL); + if(err != CHIAKI_ERR_SUCCESS) + return err; + + chiaki_mutex_fini(&control->stop_cond_mutex); + chiaki_cond_fini(&control->stop_cond); + + return CHIAKI_ERR_SUCCESS; +} diff --git a/lib/src/takion.c b/lib/src/takion.c index 3d164b9..61333b8 100644 --- a/lib/src/takion.c +++ b/lib/src/takion.c @@ -16,6 +16,7 @@ */ #include +#include #include #include @@ -26,6 +27,8 @@ #include + + typedef enum takion_packet_type_t { TAKION_PACKET_TYPE_MESSAGE = 0, TAKION_PACKET_TYPE_VIDEO = 2, @@ -101,10 +104,14 @@ static void takion_handle_packet_av(ChiakiTakion *takion, uint8_t base_type, uin CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_connect(ChiakiTakion *takion, ChiakiTakionConnectInfo *info) { - ChiakiErrorCode ret; + ChiakiErrorCode ret = CHIAKI_ERR_SUCCESS; takion->log = info->log; takion->gkcrypt_local = NULL; + ret = chiaki_mutex_init(&takion->gkcrypt_local_mutex); + if(ret != CHIAKI_ERR_SUCCESS) + return ret; + takion->key_pos_local = 0; takion->gkcrypt_remote = NULL; takion->data_cb = info->data_cb; takion->data_cb_user = info->data_cb_user; @@ -311,11 +318,38 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_message_data_ack(ChiakiTakion * return chiaki_takion_send_raw(takion, buf, sizeof(buf)); } +CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_congestion(ChiakiTakion *takion, ChiakiTakionCongestionPacket *packet) +{ + uint8_t buf[0xf]; + memset(buf, 0, sizeof(buf)); + buf[0] = 5; + *((uint16_t *)(buf + 1)) = htons(packet->word_0); + *((uint16_t *)(buf + 3)) = htons(packet->word_1); + *((uint16_t *)(buf + 5)) = htons(packet->word_2); + + ChiakiErrorCode err = chiaki_mutex_lock(&takion->gkcrypt_local_mutex); + if(err != CHIAKI_ERR_SUCCESS) + return err; + *((uint32_t *)(buf + 0xb)) = htonl((uint32_t)takion->key_pos_local); + err = chiaki_gkcrypt_gmac(takion->gkcrypt_local, takion->key_pos_local, buf, sizeof(buf), buf + 7); + takion->key_pos_local += sizeof(buf); + chiaki_mutex_unlock(&takion->gkcrypt_local_mutex); + if(err != CHIAKI_ERR_SUCCESS) + return err; + + chiaki_log_hexdump(takion->log, CHIAKI_LOG_DEBUG, buf, sizeof(buf)); + + return chiaki_takion_send_raw(takion, buf, sizeof(buf)); +} static void *takion_thread_func(void *user) { ChiakiTakion *takion = user; + ChiakiCongestionControl congestion_control; + if(chiaki_congestion_control_start(&congestion_control, takion) != CHIAKI_ERR_SUCCESS) + goto beach; + while(true) { uint8_t buf[1500]; @@ -326,6 +360,9 @@ static void *takion_thread_func(void *user) takion_handle_packet(takion, buf, received_size); } + chiaki_congestion_control_stop(&congestion_control); + +beach: close(takion->sock); close(takion->stop_pipe[0]); return NULL;