Add ChiakiReorderQueue

This commit is contained in:
Florian Märkl 2019-06-27 11:49:55 +02:00
commit 632cf6cf91
No known key found for this signature in database
GPG key ID: 125BC8A5A6A1E857
6 changed files with 490 additions and 3 deletions

View file

@ -24,7 +24,8 @@ set(HEADER_FILES
include/chiaki/seqnum.h
include/chiaki/discovery.h
include/chiaki/congestioncontrol.h
include/chiaki/stoppipe.h)
include/chiaki/stoppipe.h
include/chiaki/reorderqueue.h)
set(SOURCE_FILES
src/common.c
@ -51,7 +52,8 @@ set(SOURCE_FILES
src/frameprocessor.c
src/discovery.c
src/congestioncontrol.c
src/stoppipe.c)
src/stoppipe.c
src/reorderqueue.c)
add_subdirectory(protobuf)
include_directories("${NANOPB_SOURCE_DIR}")

View file

@ -0,0 +1,128 @@
/*
* This file is part of Chiaki.
*
* Chiaki is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Chiaki is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Chiaki. If not, see <https://www.gnu.org/licenses/>.
*/
#ifndef CHIAKI_REORDERQUEUE_H
#define CHIAKI_REORDERQUEUE_H
#include <stdlib.h>
#include "seqnum.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef enum chiaki_reorder_queue_drop_strategy_t {
CHIAKI_REORDER_QUEUE_DROP_STRATEGY_BEGIN, // drop packet with lowest number
CHIAKI_REORDER_QUEUE_DROP_STRATEGY_END // drop packet with highest number
} ChiakiReorderQueueDropStrategy;
typedef struct chiaki_reorder_queue_entry_t
{
void *user;
bool set;
} ChiakiReorderQueueEntry;
typedef void (*ChiakiReorderQueueDropCb)(uint64_t seq_num, void *elem_user, void *cb_user);
typedef bool (*ChiakiReorderQueueSeqNumGt)(uint64_t a, uint64_t b);
typedef bool (*ChiakiReorderQueueSeqNumLt)(uint64_t a, uint64_t b);
typedef uint64_t (*ChiakiReorderQueueSeqNumAdd)(uint64_t a, uint64_t b);
typedef uint64_t (*ChiakiReorderQueueSeqNumSub)(uint64_t a, uint64_t b);
typedef struct chiaki_reorder_queue_t
{
size_t size_exp; // real size = 2^size * sizeof(ChiakiReorderQueueEntry)
ChiakiReorderQueueEntry *queue;
uint64_t begin;
uint64_t count;
ChiakiReorderQueueSeqNumGt seq_num_gt;
ChiakiReorderQueueSeqNumLt seq_num_lt;
ChiakiReorderQueueSeqNumAdd seq_num_add;
ChiakiReorderQueueSeqNumSub seq_num_sub;
ChiakiReorderQueueDropStrategy drop_strategy;
ChiakiReorderQueueDropCb drop_cb;
void *drop_cb_user;
} ChiakiReorderQueue;
/**
* @param size exponent for 2
* @param seq_num_start sequence number of the first expected element
*/
CHIAKI_EXPORT ChiakiErrorCode chiaki_reorder_queue_init(ChiakiReorderQueue *queue, size_t size_exp,
uint64_t seq_num_start, ChiakiReorderQueueSeqNumGt seq_num_gt, ChiakiReorderQueueSeqNumLt seq_num_lt, ChiakiReorderQueueSeqNumAdd seq_num_add);
/**
* Helper to initialize a queue using ChiakiSeqNum16 sequence numbers
*/
CHIAKI_EXPORT ChiakiErrorCode chiaki_reorder_queue_init_16(ChiakiReorderQueue *queue, size_t size_exp, ChiakiSeqNum16 seq_num_start);
/**
* Helper to initialize a queue using ChiakiSeqNum32 sequence numbers
*/
CHIAKI_EXPORT ChiakiErrorCode chiaki_reorder_queue_init_32(ChiakiReorderQueue *queue, size_t size_exp, ChiakiSeqNum32 seq_num_start);
CHIAKI_EXPORT void chiaki_reorder_queue_fini(ChiakiReorderQueue *queue);
static inline void chiaki_reorder_queue_set_drop_strategy(ChiakiReorderQueue *queue, ChiakiReorderQueueDropStrategy drop_strategy)
{
queue->drop_strategy = drop_strategy;
}
static inline void chiaki_reorder_queue_set_drop_cb(ChiakiReorderQueue *queue, ChiakiReorderQueueDropCb cb, void *user)
{
queue->drop_cb = cb;
queue->drop_cb_user = user;
}
static inline size_t chiaki_reorder_queue_size(ChiakiReorderQueue *queue)
{
return 1 << queue->size_exp;
}
static inline uint64_t chiaki_reorder_queue_count(ChiakiReorderQueue *queue)
{
return queue->count;
}
/**
* Push a packet into the queue.
*
* Depending on the set drop strategy, this might drop elements and call the drop callback with the dropped elements.
* The callback will also be called with the new element íf there is already an element with the same sequence number
* or if the sequence number is less than queue->begin, i.e. the next element to be pulled.
*
* @param seq_num
* @param user pointer to be associated with the element
*/
CHIAKI_EXPORT void chiaki_reorder_queue_push(ChiakiReorderQueue *queue, uint64_t seq_num, void *user);
/**
* Pull the next element in order from the queue.
*
* Call this repeatedly until it returns false to get all subsequently available elements.
*
* @param seq_num pointer where the sequence number of the pulled packet is written, undefined contents if false is returned
* @param user pointer where the user pointer of the pulled packet is written, undefined contents if false is returned
* @return true if a packet was pulled in order
*/
CHIAKI_EXPORT bool chiaki_reorder_queue_pull(ChiakiReorderQueue *queue, uint64_t *seq_num, void **user);
#ifdef __cplusplus
}
#endif
#endif // CHIAKI_REORDERQUEUE_H

148
lib/src/reorderqueue.c Normal file
View file

@ -0,0 +1,148 @@
/*
* This file is part of Chiaki.
*
* Chiaki is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Chiaki is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Chiaki. If not, see <https://www.gnu.org/licenses/>.
*/
#include <chiaki/reorderqueue.h>
#include <assert.h>
#define gt(a, b) (queue->seq_num_gt((a), (b)))
#define lt(a, b) (queue->seq_num_lt((a), (b)))
#define ge(a, b) ((a) == (b) || gt((a), (b)))
#define le(a, b) ((a) == (b) || lt((a), (b)))
#define add(a, b) (queue->seq_num_add((a), (b)))
#define QUEUE_SIZE (1 << queue->size_exp)
#define IDX_MASK ((1 << queue->size_exp) - 1)
#define idx(seq_num) ((seq_num) & IDX_MASK)
CHIAKI_EXPORT ChiakiErrorCode chiaki_reorder_queue_init(ChiakiReorderQueue *queue, size_t size_exp,
uint64_t seq_num_start, ChiakiReorderQueueSeqNumGt seq_num_gt, ChiakiReorderQueueSeqNumLt seq_num_lt, ChiakiReorderQueueSeqNumAdd seq_num_add)
{
queue->size_exp = size_exp;
queue->begin = seq_num_start;
queue->count = 0;
queue->seq_num_gt = seq_num_gt;
queue->seq_num_lt = seq_num_lt;
queue->seq_num_add = seq_num_add;
queue->drop_strategy = CHIAKI_REORDER_QUEUE_DROP_STRATEGY_END;
queue->drop_cb = NULL;
queue->drop_cb_user = NULL;
queue->queue = calloc(1 << size_exp, sizeof(ChiakiReorderQueueEntry));
if(!queue->queue)
return CHIAKI_ERR_MEMORY;
return CHIAKI_ERR_SUCCESS;
}
#define REORDER_QUEUE_INIT(bits) \
static bool seq_num_##bits##_gt(uint64_t a, uint64_t b) { return chiaki_seq_num_##bits##_gt((ChiakiSeqNum##bits)a, (ChiakiSeqNum##bits)b); } \
static bool seq_num_##bits##_lt(uint64_t a, uint64_t b) { return chiaki_seq_num_##bits##_lt((ChiakiSeqNum##bits)a, (ChiakiSeqNum##bits)b); } \
static uint64_t seq_num_##bits##_add(uint64_t a, uint64_t b) { return (uint64_t)((ChiakiSeqNum##bits)a + (ChiakiSeqNum##bits)b); } \
\
CHIAKI_EXPORT ChiakiErrorCode chiaki_reorder_queue_init_##bits(ChiakiReorderQueue *queue, size_t size_exp, ChiakiSeqNum##bits seq_num_start) \
{ \
return chiaki_reorder_queue_init(queue, size_exp, (uint64_t)seq_num_start, \
seq_num_##bits##_gt, seq_num_##bits##_lt, seq_num_##bits##_add); \
}
REORDER_QUEUE_INIT(16)
REORDER_QUEUE_INIT(32)
CHIAKI_EXPORT void chiaki_reorder_queue_fini(ChiakiReorderQueue *queue)
{
free(queue->queue);
}
CHIAKI_EXPORT void chiaki_reorder_queue_push(ChiakiReorderQueue *queue, uint64_t seq_num, void *user)
{
assert(queue->count <= QUEUE_SIZE);
uint64_t end = add(queue->begin, queue->count);
if(ge(seq_num, queue->begin) && lt(seq_num, end))
{
ChiakiReorderQueueEntry *entry = &queue->queue[idx(seq_num)];
if(entry->set) // received twice
goto drop_it;
entry->user = user;
entry->set = true;
return;
}
if(lt(seq_num, queue->begin))
goto drop_it;
// => ge(seq_num, queue->end) == 1
assert(ge(seq_num, end));
uint64_t free_elems = QUEUE_SIZE - queue->count;
uint64_t total_end = add(end, free_elems);
uint64_t new_end = add(seq_num, 1);
if(lt(total_end, new_end))
{
if(queue->drop_strategy == CHIAKI_REORDER_QUEUE_DROP_STRATEGY_END)
goto drop_it;
// drop first until empty or enough space
while(queue->count > 0 && lt(total_end, new_end))
{
ChiakiReorderQueueEntry *entry = &queue->queue[idx(queue->begin)];
if(entry->set)
queue->drop_cb(queue->begin, entry->user, queue->drop_cb_user);
queue->begin = add(queue->begin, 1);
queue->count--;
free_elems = QUEUE_SIZE - queue->count;
total_end = add(end, free_elems);
}
// empty, just shift to the seq_num
if(queue->count == 0)
queue->begin = seq_num;
}
// move end until new_end
end = add(queue->begin, queue->count);
while(lt(end, new_end))
{
queue->count++;
queue->queue[idx(end)].set = false;
end = add(queue->begin, queue->count);
assert(queue->count <= QUEUE_SIZE);
}
ChiakiReorderQueueEntry *entry = &queue->queue[idx(seq_num)];
entry->set = true;
entry->user = user;
return;
drop_it:
queue->drop_cb(seq_num, user, queue->drop_cb_user);
}
CHIAKI_EXPORT bool chiaki_reorder_queue_pull(ChiakiReorderQueue *queue, uint64_t *seq_num, void **user)
{
assert(queue->count <= QUEUE_SIZE);
if(queue->count == 0)
return false;
ChiakiReorderQueueEntry *entry = &queue->queue[idx(queue->begin)];
if(!entry->set)
return false;
*seq_num = queue->begin;
*user = entry->user;
queue->begin = add(queue->begin, 1);
queue->count--;
return true;
}