Use ChiakiReorderQueue for Data in Takion

This commit is contained in:
Florian Märkl 2019-06-27 17:06:49 +02:00
commit 74f2592a08
No known key found for this signature in database
GPG key ID: 125BC8A5A6A1E857
4 changed files with 107 additions and 25 deletions

View file

@ -24,6 +24,7 @@
#include "gkcrypt.h"
#include "seqnum.h"
#include "stoppipe.h"
#include "reorderqueue.h"
#include <netinet/in.h>
#include <stdbool.h>
@ -125,6 +126,8 @@ typedef struct chiaki_takion_t
ChiakiGKCrypt *gkcrypt_remote; // if NULL (default), remote gmacs are IGNORED (!) and everything is expected to be unencrypted
ChiakiReorderQueue data_queue;
ChiakiTakionCallback cb;
void *cb_user;
int sock;

View file

@ -62,6 +62,16 @@ REORDER_QUEUE_INIT(32)
CHIAKI_EXPORT void chiaki_reorder_queue_fini(ChiakiReorderQueue *queue)
{
if(queue->drop_cb)
{
for(uint64_t i=0; i<queue->count; i++)
{
uint64_t seq_num = add(queue->begin, i);
ChiakiReorderQueueEntry *entry = &queue->queue[idx(seq_num)];
if(entry->set)
queue->drop_cb(seq_num, entry->user, queue->drop_cb_user);
}
}
free(queue->queue);
}
@ -140,8 +150,10 @@ CHIAKI_EXPORT bool chiaki_reorder_queue_pull(ChiakiReorderQueue *queue, uint64_t
if(!entry->set)
return false;
*seq_num = queue->begin;
*user = entry->user;
if(seq_num)
*seq_num = queue->begin;
if(user)
*user = entry->user;
queue->begin = add(queue->begin, 1);
queue->count--;
return true;

View file

@ -84,6 +84,8 @@ ssize_t takion_packet_type_key_pos_offset(TakionPacketType type)
#define TAKION_LOCAL_MIN 0x64
#define TAKION_LOCAL_MAX 0x64
#define TAKION_REORDER_QUEUE_SIZE_EXP 4 // => 16 entries
#define MESSAGE_HEADER_SIZE 0x10
typedef enum takion_message_type_a_t {
@ -130,6 +132,14 @@ typedef struct takion_message_payload_init_ack_t
uint8_t cookie[TAKION_COOKIE_SIZE];
} TakionMessagePayloadInitAck;
typedef struct
{
uint8_t type_b;
uint8_t *buf;
size_t buf_size;
uint16_t channel;
} TakionDataPacketEntry;
static void *takion_thread_func(void *user);
static void takion_handle_packet(ChiakiTakion *takion, uint8_t *buf, size_t buf_size);
@ -472,6 +482,11 @@ static ChiakiErrorCode takion_handshake(ChiakiTakion *takion)
return CHIAKI_ERR_SUCCESS;
}
static void takion_data_drop(uint64_t seq_num, void *elem_user, void *cb_user)
{
free(elem_user);
}
static void *takion_thread_func(void *user)
{
ChiakiTakion *takion = user;
@ -479,22 +494,37 @@ static void *takion_thread_func(void *user)
if(takion_handshake(takion) != CHIAKI_ERR_SUCCESS)
goto beach;
if(chiaki_reorder_queue_init_32(&takion->data_queue, TAKION_REORDER_QUEUE_SIZE_EXP, takion->tag_remote) != CHIAKI_ERR_SUCCESS)
goto beach;
chiaki_reorder_queue_set_drop_cb(&takion->data_queue, takion_data_drop, NULL);
// TODO ChiakiCongestionControl congestion_control;
// if(chiaki_congestion_control_start(&congestion_control, takion) != CHIAKI_ERR_SUCCESS)
// goto beach;
while(true)
{
uint8_t buf[1500];
size_t received_size = sizeof(buf);
size_t received_size = 1500;
uint8_t *buf = malloc(received_size); // TODO: no malloc?
if(!buf)
break;
ChiakiErrorCode err = takion_recv(takion, buf, &received_size, NULL);
if(err != CHIAKI_ERR_SUCCESS)
break;
takion_handle_packet(takion, buf, received_size);
uint8_t *resized_buf = realloc(buf, received_size);
if(!resized_buf)
{
free(buf);
continue;
}
takion_handle_packet(takion, resized_buf, received_size);
}
// chiaki_congestion_control_stop(&congestion_control);
chiaki_reorder_queue_fini(&takion->data_queue);
beach:
if(takion->cb)
{
@ -569,7 +599,10 @@ static void takion_handle_packet(ChiakiTakion *takion, uint8_t *buf, size_t buf_
uint8_t base_type = (uint8_t)(buf[0] & 0xf);
if(takion_handle_packet_mac(takion, base_type, buf, buf_size) != CHIAKI_ERR_SUCCESS)
{
free(buf);
return;
}
switch(base_type)
{
@ -579,9 +612,11 @@ static void takion_handle_packet(ChiakiTakion *takion, uint8_t *buf, size_t buf_
case TAKION_PACKET_TYPE_VIDEO:
case TAKION_PACKET_TYPE_AUDIO:
takion_handle_packet_av(takion, base_type, buf, buf_size);
free(buf);
break;
default:
CHIAKI_LOGW(takion->log, "Takion packet with unknown type %#x received\n", base_type);
free(buf);
//chiaki_log_hexdump(takion->log, CHIAKI_LOG_WARNING, buf, buf_size);
break;
}
@ -593,7 +628,10 @@ static void takion_handle_packet_message(ChiakiTakion *takion, uint8_t *buf, siz
TakionMessage msg;
ChiakiErrorCode err = takion_parse_message(takion, buf+1, buf_size-1, &msg);
if(err != CHIAKI_ERR_SUCCESS)
{
free(buf);
return;
}
//CHIAKI_LOGD(takion->log, "Takion received message with tag %#x, key pos %#x, type (%#x, %#x), payload size %#x, payload:\n", msg.tag, msg.key_pos, msg.type_a, msg.type_b, msg.payload_size);
//chiaki_log_hexdump(takion->log, CHIAKI_LOG_DEBUG, buf, buf_size);
@ -605,13 +643,51 @@ static void takion_handle_packet_message(ChiakiTakion *takion, uint8_t *buf, siz
break;
case TAKION_MESSAGE_TYPE_A_DATA_ACK:
takion_handle_packet_message_data_ack(takion, msg.type_b, msg.payload, msg.payload_size);
free(buf);
break;
default:
CHIAKI_LOGW(takion->log, "Takion received message with unknown type_a = %#x\n", msg.type_a);
free(buf);
break;
}
}
static void takion_flush_data_queue(ChiakiTakion *takion)
{
while(true)
{
TakionDataPacketEntry *entry;
bool pulled = chiaki_reorder_queue_pull(&takion->data_queue, NULL, (void **)&entry);
if(!pulled)
break;
if(entry->buf_size < 9)
{
free(entry);
continue;
}
uint16_t zero_a = *((uint16_t *)(entry->buf + 6));
uint8_t data_type = entry->buf[8]; // & 0xf
if(zero_a != 0)
CHIAKI_LOGW(takion->log, "Takion received data with unexpected nonzero %#x at buf+6\n", zero_a);
if(data_type != CHIAKI_TAKION_MESSAGE_DATA_TYPE_PROTOBUF && data_type != CHIAKI_TAKION_MESSAGE_DATA_TYPE_9)
CHIAKI_LOGW(takion->log, "Takion received data with unexpected data type %#x\n", data_type);
else if(takion->cb)
{
ChiakiTakionEvent event = { 0 };
event.type = CHIAKI_TAKION_EVENT_TYPE_DATA;
event.data.data_type = (ChiakiTakionMessageDataType)data_type;
event.data.buf = entry->buf + 9;
event.data.buf_size = entry->buf_size - 9;
takion->cb(&event, takion->cb_user);
}
}
}
static void takion_handle_packet_message_data(ChiakiTakion *takion, uint8_t type_b, uint8_t *buf, size_t buf_size)
{
if(type_b != 1)
@ -623,27 +699,19 @@ static void takion_handle_packet_message_data(ChiakiTakion *takion, uint8_t type
return;
}
uint32_t seq_num = ntohl(*((uint32_t *)(buf + 0)));
uint16_t channel = ntohs(*((uint16_t *)(buf + 4)));
uint16_t zero_a = *((uint16_t *)(buf + 6));
uint8_t data_type = buf[8]; // & 0xf
TakionDataPacketEntry *entry = malloc(sizeof(TakionDataPacketEntry));
if(!entry)
return;
if(zero_a != 0)
CHIAKI_LOGW(takion->log, "Takion received data with unexpected nonzero %#x at buf+6\n", zero_a);
entry->type_b = type_b;
entry->buf = buf;
entry->buf_size = buf_size;
entry->channel = ntohs(*((uint16_t *)(buf + 4)));
ChiakiSeqNum32 seq_num = ntohl(*((uint32_t *)(buf + 0)));
if(data_type != CHIAKI_TAKION_MESSAGE_DATA_TYPE_PROTOBUF && data_type != CHIAKI_TAKION_MESSAGE_DATA_TYPE_9)
CHIAKI_LOGW(takion->log, "Takion received data with unexpected data type %#x\n", data_type);
else if(takion->cb)
{
ChiakiTakionEvent event = { 0 };
event.type = CHIAKI_TAKION_EVENT_TYPE_DATA;
event.data.data_type = (ChiakiTakionMessageDataType)data_type;
event.data.buf = buf + 9;
event.data.buf_size = buf_size - 9;
takion->cb(&event, takion->cb_user);
}
chiaki_takion_send_message_data_ack(takion, 0, channel, seq_num);
chiaki_takion_send_message_data_ack(takion, 0, entry->channel, seq_num);
chiaki_reorder_queue_push(&takion->data_queue, seq_num, entry);
takion_flush_data_queue(takion);
}
static void takion_handle_packet_message_data_ack(ChiakiTakion *takion, uint8_t type_b, uint8_t *buf, size_t buf_size)

View file

@ -164,7 +164,6 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_cond_wait_pred(ChiakiCond *cond, ChiakiMute
return err;
}
return CHIAKI_ERR_SUCCESS;
}
CHIAKI_EXPORT ChiakiErrorCode chiaki_cond_timedwait_pred(ChiakiCond *cond, ChiakiMutex *mutex, uint64_t timeout_ms, ChiakiCheckPred check_pred, void *check_pred_user)