diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index d27d9dc..599c2d2 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -24,7 +24,8 @@ set(HEADER_FILES include/chiaki/seqnum.h include/chiaki/discovery.h include/chiaki/congestioncontrol.h - include/chiaki/stoppipe.h) + include/chiaki/stoppipe.h + include/chiaki/reorderqueue.h) set(SOURCE_FILES src/common.c @@ -51,7 +52,8 @@ set(SOURCE_FILES src/frameprocessor.c src/discovery.c src/congestioncontrol.c - src/stoppipe.c) + src/stoppipe.c + src/reorderqueue.c) add_subdirectory(protobuf) include_directories("${NANOPB_SOURCE_DIR}") diff --git a/lib/include/chiaki/reorderqueue.h b/lib/include/chiaki/reorderqueue.h new file mode 100644 index 0000000..8e7b480 --- /dev/null +++ b/lib/include/chiaki/reorderqueue.h @@ -0,0 +1,128 @@ +/* + * This file is part of Chiaki. + * + * Chiaki is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Chiaki is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Chiaki. If not, see . + */ + +#ifndef CHIAKI_REORDERQUEUE_H +#define CHIAKI_REORDERQUEUE_H + +#include + +#include "seqnum.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef enum chiaki_reorder_queue_drop_strategy_t { + CHIAKI_REORDER_QUEUE_DROP_STRATEGY_BEGIN, // drop packet with lowest number + CHIAKI_REORDER_QUEUE_DROP_STRATEGY_END // drop packet with highest number +} ChiakiReorderQueueDropStrategy; + +typedef struct chiaki_reorder_queue_entry_t +{ + void *user; + bool set; +} ChiakiReorderQueueEntry; + +typedef void (*ChiakiReorderQueueDropCb)(uint64_t seq_num, void *elem_user, void *cb_user); +typedef bool (*ChiakiReorderQueueSeqNumGt)(uint64_t a, uint64_t b); +typedef bool (*ChiakiReorderQueueSeqNumLt)(uint64_t a, uint64_t b); +typedef uint64_t (*ChiakiReorderQueueSeqNumAdd)(uint64_t a, uint64_t b); +typedef uint64_t (*ChiakiReorderQueueSeqNumSub)(uint64_t a, uint64_t b); + +typedef struct chiaki_reorder_queue_t +{ + size_t size_exp; // real size = 2^size * sizeof(ChiakiReorderQueueEntry) + ChiakiReorderQueueEntry *queue; + uint64_t begin; + uint64_t count; + ChiakiReorderQueueSeqNumGt seq_num_gt; + ChiakiReorderQueueSeqNumLt seq_num_lt; + ChiakiReorderQueueSeqNumAdd seq_num_add; + ChiakiReorderQueueSeqNumSub seq_num_sub; + ChiakiReorderQueueDropStrategy drop_strategy; + ChiakiReorderQueueDropCb drop_cb; + void *drop_cb_user; +} ChiakiReorderQueue; + +/** + * @param size exponent for 2 + * @param seq_num_start sequence number of the first expected element + */ +CHIAKI_EXPORT ChiakiErrorCode chiaki_reorder_queue_init(ChiakiReorderQueue *queue, size_t size_exp, + uint64_t seq_num_start, ChiakiReorderQueueSeqNumGt seq_num_gt, ChiakiReorderQueueSeqNumLt seq_num_lt, ChiakiReorderQueueSeqNumAdd seq_num_add); + +/** + * Helper to initialize a queue using ChiakiSeqNum16 sequence numbers + */ +CHIAKI_EXPORT ChiakiErrorCode chiaki_reorder_queue_init_16(ChiakiReorderQueue *queue, size_t size_exp, ChiakiSeqNum16 seq_num_start); + +/** + * Helper to initialize a queue using ChiakiSeqNum32 sequence numbers + */ +CHIAKI_EXPORT ChiakiErrorCode chiaki_reorder_queue_init_32(ChiakiReorderQueue *queue, size_t size_exp, ChiakiSeqNum32 seq_num_start); + +CHIAKI_EXPORT void chiaki_reorder_queue_fini(ChiakiReorderQueue *queue); + +static inline void chiaki_reorder_queue_set_drop_strategy(ChiakiReorderQueue *queue, ChiakiReorderQueueDropStrategy drop_strategy) +{ + queue->drop_strategy = drop_strategy; +} + +static inline void chiaki_reorder_queue_set_drop_cb(ChiakiReorderQueue *queue, ChiakiReorderQueueDropCb cb, void *user) +{ + queue->drop_cb = cb; + queue->drop_cb_user = user; +} + +static inline size_t chiaki_reorder_queue_size(ChiakiReorderQueue *queue) +{ + return 1 << queue->size_exp; +} + +static inline uint64_t chiaki_reorder_queue_count(ChiakiReorderQueue *queue) +{ + return queue->count; +} + +/** + * Push a packet into the queue. + * + * Depending on the set drop strategy, this might drop elements and call the drop callback with the dropped elements. + * The callback will also be called with the new element íf there is already an element with the same sequence number + * or if the sequence number is less than queue->begin, i.e. the next element to be pulled. + * + * @param seq_num + * @param user pointer to be associated with the element + */ +CHIAKI_EXPORT void chiaki_reorder_queue_push(ChiakiReorderQueue *queue, uint64_t seq_num, void *user); + +/** + * Pull the next element in order from the queue. + * + * Call this repeatedly until it returns false to get all subsequently available elements. + * + * @param seq_num pointer where the sequence number of the pulled packet is written, undefined contents if false is returned + * @param user pointer where the user pointer of the pulled packet is written, undefined contents if false is returned + * @return true if a packet was pulled in order + */ +CHIAKI_EXPORT bool chiaki_reorder_queue_pull(ChiakiReorderQueue *queue, uint64_t *seq_num, void **user); + +#ifdef __cplusplus +} +#endif + +#endif // CHIAKI_REORDERQUEUE_H diff --git a/lib/src/reorderqueue.c b/lib/src/reorderqueue.c new file mode 100644 index 0000000..ccae4a1 --- /dev/null +++ b/lib/src/reorderqueue.c @@ -0,0 +1,148 @@ +/* + * This file is part of Chiaki. + * + * Chiaki is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Chiaki is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Chiaki. If not, see . + */ + +#include + +#include + +#define gt(a, b) (queue->seq_num_gt((a), (b))) +#define lt(a, b) (queue->seq_num_lt((a), (b))) +#define ge(a, b) ((a) == (b) || gt((a), (b))) +#define le(a, b) ((a) == (b) || lt((a), (b))) +#define add(a, b) (queue->seq_num_add((a), (b))) +#define QUEUE_SIZE (1 << queue->size_exp) +#define IDX_MASK ((1 << queue->size_exp) - 1) +#define idx(seq_num) ((seq_num) & IDX_MASK) + +CHIAKI_EXPORT ChiakiErrorCode chiaki_reorder_queue_init(ChiakiReorderQueue *queue, size_t size_exp, + uint64_t seq_num_start, ChiakiReorderQueueSeqNumGt seq_num_gt, ChiakiReorderQueueSeqNumLt seq_num_lt, ChiakiReorderQueueSeqNumAdd seq_num_add) +{ + queue->size_exp = size_exp; + queue->begin = seq_num_start; + queue->count = 0; + queue->seq_num_gt = seq_num_gt; + queue->seq_num_lt = seq_num_lt; + queue->seq_num_add = seq_num_add; + queue->drop_strategy = CHIAKI_REORDER_QUEUE_DROP_STRATEGY_END; + queue->drop_cb = NULL; + queue->drop_cb_user = NULL; + queue->queue = calloc(1 << size_exp, sizeof(ChiakiReorderQueueEntry)); + if(!queue->queue) + return CHIAKI_ERR_MEMORY; + return CHIAKI_ERR_SUCCESS; +} + +#define REORDER_QUEUE_INIT(bits) \ +static bool seq_num_##bits##_gt(uint64_t a, uint64_t b) { return chiaki_seq_num_##bits##_gt((ChiakiSeqNum##bits)a, (ChiakiSeqNum##bits)b); } \ +static bool seq_num_##bits##_lt(uint64_t a, uint64_t b) { return chiaki_seq_num_##bits##_lt((ChiakiSeqNum##bits)a, (ChiakiSeqNum##bits)b); } \ +static uint64_t seq_num_##bits##_add(uint64_t a, uint64_t b) { return (uint64_t)((ChiakiSeqNum##bits)a + (ChiakiSeqNum##bits)b); } \ +\ +CHIAKI_EXPORT ChiakiErrorCode chiaki_reorder_queue_init_##bits(ChiakiReorderQueue *queue, size_t size_exp, ChiakiSeqNum##bits seq_num_start) \ +{ \ + return chiaki_reorder_queue_init(queue, size_exp, (uint64_t)seq_num_start, \ + seq_num_##bits##_gt, seq_num_##bits##_lt, seq_num_##bits##_add); \ +} + +REORDER_QUEUE_INIT(16) +REORDER_QUEUE_INIT(32) + +CHIAKI_EXPORT void chiaki_reorder_queue_fini(ChiakiReorderQueue *queue) +{ + free(queue->queue); +} + +CHIAKI_EXPORT void chiaki_reorder_queue_push(ChiakiReorderQueue *queue, uint64_t seq_num, void *user) +{ + assert(queue->count <= QUEUE_SIZE); + uint64_t end = add(queue->begin, queue->count); + + if(ge(seq_num, queue->begin) && lt(seq_num, end)) + { + ChiakiReorderQueueEntry *entry = &queue->queue[idx(seq_num)]; + if(entry->set) // received twice + goto drop_it; + entry->user = user; + entry->set = true; + return; + } + + if(lt(seq_num, queue->begin)) + goto drop_it; + + // => ge(seq_num, queue->end) == 1 + assert(ge(seq_num, end)); + + uint64_t free_elems = QUEUE_SIZE - queue->count; + uint64_t total_end = add(end, free_elems); + uint64_t new_end = add(seq_num, 1); + if(lt(total_end, new_end)) + { + if(queue->drop_strategy == CHIAKI_REORDER_QUEUE_DROP_STRATEGY_END) + goto drop_it; + + // drop first until empty or enough space + while(queue->count > 0 && lt(total_end, new_end)) + { + ChiakiReorderQueueEntry *entry = &queue->queue[idx(queue->begin)]; + if(entry->set) + queue->drop_cb(queue->begin, entry->user, queue->drop_cb_user); + queue->begin = add(queue->begin, 1); + queue->count--; + free_elems = QUEUE_SIZE - queue->count; + total_end = add(end, free_elems); + } + + // empty, just shift to the seq_num + if(queue->count == 0) + queue->begin = seq_num; + } + + // move end until new_end + end = add(queue->begin, queue->count); + while(lt(end, new_end)) + { + queue->count++; + queue->queue[idx(end)].set = false; + end = add(queue->begin, queue->count); + assert(queue->count <= QUEUE_SIZE); + } + + ChiakiReorderQueueEntry *entry = &queue->queue[idx(seq_num)]; + entry->set = true; + entry->user = user; + + return; +drop_it: + queue->drop_cb(seq_num, user, queue->drop_cb_user); +} + +CHIAKI_EXPORT bool chiaki_reorder_queue_pull(ChiakiReorderQueue *queue, uint64_t *seq_num, void **user) +{ + assert(queue->count <= QUEUE_SIZE); + if(queue->count == 0) + return false; + + ChiakiReorderQueueEntry *entry = &queue->queue[idx(queue->begin)]; + if(!entry->set) + return false; + + *seq_num = queue->begin; + *user = entry->user; + queue->begin = add(queue->begin, 1); + queue->count--; + return true; +} \ No newline at end of file diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index c843621..7a4e373 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -8,7 +8,8 @@ add_executable(chiaki-unit rpcrypt.c gkcrypt.c takion.c - seqnum.c) + seqnum.c + reorderqueue.c) target_link_libraries(chiaki-unit chiaki-lib munit) diff --git a/test/main.c b/test/main.c index 94e23ec..a8b210c 100644 --- a/test/main.c +++ b/test/main.c @@ -18,6 +18,7 @@ #include extern MunitTest tests_seq_num[]; +extern MunitTest tests_reorder_queue[]; extern MunitTest tests_http[]; extern MunitTest tests_rpcrypt[]; extern MunitTest tests_gkcrypt[]; @@ -31,6 +32,13 @@ static MunitSuite suites[] = { 1, MUNIT_SUITE_OPTION_NONE }, + { + "/reorder_queue", + tests_reorder_queue, + NULL, + 1, + MUNIT_SUITE_OPTION_NONE + }, { "/http", tests_http, diff --git a/test/reorderqueue.c b/test/reorderqueue.c new file mode 100644 index 0000000..e2485fb --- /dev/null +++ b/test/reorderqueue.c @@ -0,0 +1,200 @@ +/* + * This file is part of Chiaki. + * + * Chiaki is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Chiaki is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Chiaki. If not, see . + */ + +#include + +#include + +#define DROP_RECORD_MAX 16 + +typedef struct drop_record_t +{ + uint64_t count[DROP_RECORD_MAX]; + uint64_t seq_num[DROP_RECORD_MAX]; + bool failed; +} DropRecord; + +static void drop(uint64_t seq_num, void *elem_user, void *cb_user) +{ + DropRecord *record = cb_user; + uint64_t v = (uint64_t)elem_user; + if(v > DROP_RECORD_MAX) + { + record->failed = true; + return; + } + record->count[v]++; + record->seq_num[v] = seq_num; +} + +static MunitResult test_reorder_queue_16(const MunitParameter params[], void *test_user) +{ + ChiakiReorderQueue queue; + ChiakiErrorCode err = chiaki_reorder_queue_init_16(&queue, 2, 42); + munit_assert_int(err, ==, CHIAKI_ERR_SUCCESS); + munit_assert_size(chiaki_reorder_queue_size(&queue), ==, 4); + munit_assert_uint64(chiaki_reorder_queue_count(&queue), ==, 0); + + chiaki_reorder_queue_set_drop_strategy(&queue, CHIAKI_REORDER_QUEUE_DROP_STRATEGY_END); + + DropRecord drop_record = { 0 }; + chiaki_reorder_queue_set_drop_cb(&queue, drop, &drop_record); + + uint64_t seq_num = 0; + void *user = NULL; + + // pull from empty + bool pulled = chiaki_reorder_queue_pull(&queue, &seq_num, &user); + munit_assert(!pulled); + munit_assert_uint64(chiaki_reorder_queue_count(&queue), ==, 0); + munit_assert(!drop_record.failed); + + // push one + chiaki_reorder_queue_push(&queue, 42, (void *)0); + munit_assert_uint64(chiaki_reorder_queue_count(&queue), ==, 1); + + // pull one + pulled = chiaki_reorder_queue_pull(&queue, &seq_num, &user); + munit_assert(pulled); + munit_assert_uint64(chiaki_reorder_queue_count(&queue), ==, 0); + munit_assert(!drop_record.failed); + munit_assert_uint64(drop_record.count[0], ==, 0); + munit_assert_uint64((uint64_t)user, ==, 0); + munit_assert_uint64(seq_num, ==, 42); + + // push outdated + chiaki_reorder_queue_push(&queue, 42, (void *)0); + munit_assert_uint64(chiaki_reorder_queue_count(&queue), ==, 0); + munit_assert(!drop_record.failed); + munit_assert_uint64(drop_record.count[0], ==, 1); + munit_assert_uint64(drop_record.seq_num[0], ==, 42); + memset(&drop_record, 0, sizeof(drop_record)); + + // push until full out of order and try to pull in between + chiaki_reorder_queue_push(&queue, 46, (void *)1); + pulled = chiaki_reorder_queue_pull(&queue, &seq_num, &user); + munit_assert(!pulled); + chiaki_reorder_queue_push(&queue, 45, (void *)2); + pulled = chiaki_reorder_queue_pull(&queue, &seq_num, &user); + munit_assert(!pulled); + chiaki_reorder_queue_push(&queue, 44, (void *)3); + pulled = chiaki_reorder_queue_pull(&queue, &seq_num, &user); + munit_assert(!pulled); + chiaki_reorder_queue_push(&queue, 43, (void *)4); + munit_assert(!drop_record.failed); + for(size_t i=0; i