From 74f2592a0889eb145e5ac5e445d5f481947b84f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florian=20M=C3=A4rkl?= Date: Thu, 27 Jun 2019 17:06:49 +0200 Subject: [PATCH] Use ChiakiReorderQueue for Data in Takion --- lib/include/chiaki/takion.h | 3 + lib/src/reorderqueue.c | 16 +++++- lib/src/takion.c | 112 +++++++++++++++++++++++++++++------- lib/src/thread.c | 1 - 4 files changed, 107 insertions(+), 25 deletions(-) diff --git a/lib/include/chiaki/takion.h b/lib/include/chiaki/takion.h index e66ebd7..b5abecd 100644 --- a/lib/include/chiaki/takion.h +++ b/lib/include/chiaki/takion.h @@ -24,6 +24,7 @@ #include "gkcrypt.h" #include "seqnum.h" #include "stoppipe.h" +#include "reorderqueue.h" #include #include @@ -125,6 +126,8 @@ typedef struct chiaki_takion_t ChiakiGKCrypt *gkcrypt_remote; // if NULL (default), remote gmacs are IGNORED (!) and everything is expected to be unencrypted + ChiakiReorderQueue data_queue; + ChiakiTakionCallback cb; void *cb_user; int sock; diff --git a/lib/src/reorderqueue.c b/lib/src/reorderqueue.c index ccae4a1..1cf52b8 100644 --- a/lib/src/reorderqueue.c +++ b/lib/src/reorderqueue.c @@ -62,6 +62,16 @@ REORDER_QUEUE_INIT(32) CHIAKI_EXPORT void chiaki_reorder_queue_fini(ChiakiReorderQueue *queue) { + if(queue->drop_cb) + { + for(uint64_t i=0; icount; i++) + { + uint64_t seq_num = add(queue->begin, i); + ChiakiReorderQueueEntry *entry = &queue->queue[idx(seq_num)]; + if(entry->set) + queue->drop_cb(seq_num, entry->user, queue->drop_cb_user); + } + } free(queue->queue); } @@ -140,8 +150,10 @@ CHIAKI_EXPORT bool chiaki_reorder_queue_pull(ChiakiReorderQueue *queue, uint64_t if(!entry->set) return false; - *seq_num = queue->begin; - *user = entry->user; + if(seq_num) + *seq_num = queue->begin; + if(user) + *user = entry->user; queue->begin = add(queue->begin, 1); queue->count--; return true; diff --git a/lib/src/takion.c b/lib/src/takion.c index f1056fd..2531442 100644 --- a/lib/src/takion.c +++ b/lib/src/takion.c @@ -84,6 +84,8 @@ ssize_t takion_packet_type_key_pos_offset(TakionPacketType type) #define TAKION_LOCAL_MIN 0x64 #define TAKION_LOCAL_MAX 0x64 +#define TAKION_REORDER_QUEUE_SIZE_EXP 4 // => 16 entries + #define MESSAGE_HEADER_SIZE 0x10 typedef enum takion_message_type_a_t { @@ -130,6 +132,14 @@ typedef struct takion_message_payload_init_ack_t uint8_t cookie[TAKION_COOKIE_SIZE]; } TakionMessagePayloadInitAck; +typedef struct +{ + uint8_t type_b; + uint8_t *buf; + size_t buf_size; + uint16_t channel; +} TakionDataPacketEntry; + static void *takion_thread_func(void *user); static void takion_handle_packet(ChiakiTakion *takion, uint8_t *buf, size_t buf_size); @@ -472,6 +482,11 @@ static ChiakiErrorCode takion_handshake(ChiakiTakion *takion) return CHIAKI_ERR_SUCCESS; } +static void takion_data_drop(uint64_t seq_num, void *elem_user, void *cb_user) +{ + free(elem_user); +} + static void *takion_thread_func(void *user) { ChiakiTakion *takion = user; @@ -479,22 +494,37 @@ static void *takion_thread_func(void *user) if(takion_handshake(takion) != CHIAKI_ERR_SUCCESS) goto beach; + if(chiaki_reorder_queue_init_32(&takion->data_queue, TAKION_REORDER_QUEUE_SIZE_EXP, takion->tag_remote) != CHIAKI_ERR_SUCCESS) + goto beach; + + chiaki_reorder_queue_set_drop_cb(&takion->data_queue, takion_data_drop, NULL); + // TODO ChiakiCongestionControl congestion_control; // if(chiaki_congestion_control_start(&congestion_control, takion) != CHIAKI_ERR_SUCCESS) // goto beach; while(true) { - uint8_t buf[1500]; - size_t received_size = sizeof(buf); + size_t received_size = 1500; + uint8_t *buf = malloc(received_size); // TODO: no malloc? + if(!buf) + break; ChiakiErrorCode err = takion_recv(takion, buf, &received_size, NULL); if(err != CHIAKI_ERR_SUCCESS) break; - takion_handle_packet(takion, buf, received_size); + uint8_t *resized_buf = realloc(buf, received_size); + if(!resized_buf) + { + free(buf); + continue; + } + takion_handle_packet(takion, resized_buf, received_size); } // chiaki_congestion_control_stop(&congestion_control); + chiaki_reorder_queue_fini(&takion->data_queue); + beach: if(takion->cb) { @@ -569,7 +599,10 @@ static void takion_handle_packet(ChiakiTakion *takion, uint8_t *buf, size_t buf_ uint8_t base_type = (uint8_t)(buf[0] & 0xf); if(takion_handle_packet_mac(takion, base_type, buf, buf_size) != CHIAKI_ERR_SUCCESS) + { + free(buf); return; + } switch(base_type) { @@ -579,9 +612,11 @@ static void takion_handle_packet(ChiakiTakion *takion, uint8_t *buf, size_t buf_ case TAKION_PACKET_TYPE_VIDEO: case TAKION_PACKET_TYPE_AUDIO: takion_handle_packet_av(takion, base_type, buf, buf_size); + free(buf); break; default: CHIAKI_LOGW(takion->log, "Takion packet with unknown type %#x received\n", base_type); + free(buf); //chiaki_log_hexdump(takion->log, CHIAKI_LOG_WARNING, buf, buf_size); break; } @@ -593,7 +628,10 @@ static void takion_handle_packet_message(ChiakiTakion *takion, uint8_t *buf, siz TakionMessage msg; ChiakiErrorCode err = takion_parse_message(takion, buf+1, buf_size-1, &msg); if(err != CHIAKI_ERR_SUCCESS) + { + free(buf); return; + } //CHIAKI_LOGD(takion->log, "Takion received message with tag %#x, key pos %#x, type (%#x, %#x), payload size %#x, payload:\n", msg.tag, msg.key_pos, msg.type_a, msg.type_b, msg.payload_size); //chiaki_log_hexdump(takion->log, CHIAKI_LOG_DEBUG, buf, buf_size); @@ -605,13 +643,51 @@ static void takion_handle_packet_message(ChiakiTakion *takion, uint8_t *buf, siz break; case TAKION_MESSAGE_TYPE_A_DATA_ACK: takion_handle_packet_message_data_ack(takion, msg.type_b, msg.payload, msg.payload_size); + free(buf); break; default: CHIAKI_LOGW(takion->log, "Takion received message with unknown type_a = %#x\n", msg.type_a); + free(buf); break; } } + +static void takion_flush_data_queue(ChiakiTakion *takion) +{ + while(true) + { + TakionDataPacketEntry *entry; + bool pulled = chiaki_reorder_queue_pull(&takion->data_queue, NULL, (void **)&entry); + if(!pulled) + break; + + if(entry->buf_size < 9) + { + free(entry); + continue; + } + + uint16_t zero_a = *((uint16_t *)(entry->buf + 6)); + uint8_t data_type = entry->buf[8]; // & 0xf + + if(zero_a != 0) + CHIAKI_LOGW(takion->log, "Takion received data with unexpected nonzero %#x at buf+6\n", zero_a); + + if(data_type != CHIAKI_TAKION_MESSAGE_DATA_TYPE_PROTOBUF && data_type != CHIAKI_TAKION_MESSAGE_DATA_TYPE_9) + CHIAKI_LOGW(takion->log, "Takion received data with unexpected data type %#x\n", data_type); + else if(takion->cb) + { + ChiakiTakionEvent event = { 0 }; + event.type = CHIAKI_TAKION_EVENT_TYPE_DATA; + event.data.data_type = (ChiakiTakionMessageDataType)data_type; + event.data.buf = entry->buf + 9; + event.data.buf_size = entry->buf_size - 9; + takion->cb(&event, takion->cb_user); + } + } +} + static void takion_handle_packet_message_data(ChiakiTakion *takion, uint8_t type_b, uint8_t *buf, size_t buf_size) { if(type_b != 1) @@ -623,27 +699,19 @@ static void takion_handle_packet_message_data(ChiakiTakion *takion, uint8_t type return; } - uint32_t seq_num = ntohl(*((uint32_t *)(buf + 0))); - uint16_t channel = ntohs(*((uint16_t *)(buf + 4))); - uint16_t zero_a = *((uint16_t *)(buf + 6)); - uint8_t data_type = buf[8]; // & 0xf + TakionDataPacketEntry *entry = malloc(sizeof(TakionDataPacketEntry)); + if(!entry) + return; - if(zero_a != 0) - CHIAKI_LOGW(takion->log, "Takion received data with unexpected nonzero %#x at buf+6\n", zero_a); + entry->type_b = type_b; + entry->buf = buf; + entry->buf_size = buf_size; + entry->channel = ntohs(*((uint16_t *)(buf + 4))); + ChiakiSeqNum32 seq_num = ntohl(*((uint32_t *)(buf + 0))); - if(data_type != CHIAKI_TAKION_MESSAGE_DATA_TYPE_PROTOBUF && data_type != CHIAKI_TAKION_MESSAGE_DATA_TYPE_9) - CHIAKI_LOGW(takion->log, "Takion received data with unexpected data type %#x\n", data_type); - else if(takion->cb) - { - ChiakiTakionEvent event = { 0 }; - event.type = CHIAKI_TAKION_EVENT_TYPE_DATA; - event.data.data_type = (ChiakiTakionMessageDataType)data_type; - event.data.buf = buf + 9; - event.data.buf_size = buf_size - 9; - takion->cb(&event, takion->cb_user); - } - - chiaki_takion_send_message_data_ack(takion, 0, channel, seq_num); + chiaki_takion_send_message_data_ack(takion, 0, entry->channel, seq_num); + chiaki_reorder_queue_push(&takion->data_queue, seq_num, entry); + takion_flush_data_queue(takion); } static void takion_handle_packet_message_data_ack(ChiakiTakion *takion, uint8_t type_b, uint8_t *buf, size_t buf_size) diff --git a/lib/src/thread.c b/lib/src/thread.c index 38594cb..b727d08 100644 --- a/lib/src/thread.c +++ b/lib/src/thread.c @@ -164,7 +164,6 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_cond_wait_pred(ChiakiCond *cond, ChiakiMute return err; } return CHIAKI_ERR_SUCCESS; - } CHIAKI_EXPORT ChiakiErrorCode chiaki_cond_timedwait_pred(ChiakiCond *cond, ChiakiMutex *mutex, uint64_t timeout_ms, ChiakiCheckPred check_pred, void *check_pred_user)