Add Takion Send Buffer

This commit is contained in:
Florian Märkl 2019-07-07 17:11:54 +02:00
commit 2071ad46bb
No known key found for this signature in database
GPG key ID: 125BC8A5A6A1E857
7 changed files with 421 additions and 49 deletions

View file

@ -29,7 +29,9 @@ set(HEADER_FILES
include/chiaki/discoveryservice.h
include/chiaki/feedback.h
include/chiaki/feedbacksender.h
include/chiaki/controller.h)
include/chiaki/controller.h
include/chiaki/takionsendbuffer.h
include/chiaki/time.h)
set(SOURCE_FILES
src/common.c
@ -61,7 +63,9 @@ set(SOURCE_FILES
src/discoveryservice.c
src/feedback.c
src/feedbacksender.c
src/controller.c)
src/controller.c
src/takionsendbuffer.c
src/time.c)
add_subdirectory(protobuf)
include_directories("${NANOPB_SOURCE_DIR}")

View file

@ -26,6 +26,7 @@
#include "stoppipe.h"
#include "reorderqueue.h"
#include "feedback.h"
#include "takionsendbuffer.h"
#include <netinet/in.h>
#include <stdbool.h>
@ -103,12 +104,10 @@ typedef struct chiaki_takion_connect_info_t
socklen_t sa_len;
ChiakiTakionCallback cb;
void *cb_user;
void *av_cb_user;
bool enable_crypt;
} ChiakiTakionConnectInfo;
typedef struct chiaki_takion_t
{
ChiakiLog *log;
@ -139,6 +138,7 @@ typedef struct chiaki_takion_t
ChiakiGKCrypt *gkcrypt_remote; // if NULL (default), remote gmacs are IGNORED (!) and everything is expected to be unencrypted
ChiakiReorderQueue data_queue;
ChiakiTakionSendBuffer send_buffer;
ChiakiTakionCallback cb;
void *cb_user;
@ -146,10 +146,9 @@ typedef struct chiaki_takion_t
ChiakiThread thread;
ChiakiStopPipe stop_pipe;
struct timeval recv_timeout;
int send_retries;
uint32_t tag_local;
uint32_t tag_remote;
uint32_t seq_num_local;
ChiakiSeqNum32 seq_num_local;
/**
* Advertised Receiver Window Credit
@ -184,8 +183,6 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_crypt_advance_key_pos(ChiakiTakion *
*/
CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_raw(ChiakiTakion *takion, const uint8_t *buf, size_t buf_size);
CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_packet_mac(ChiakiGKCrypt *crypt, uint8_t *buf, size_t buf_size, uint8_t *mac_out, uint8_t *mac_old_out, ChiakiTakionPacketKeyPos *key_pos_out);
/**
* Calculate the MAC for the packet depending on the type derived from the first byte in buf,
* assign MAC inside buf at the respective position and send the packet.

View file

@ -0,0 +1,64 @@
/*
* This file is part of Chiaki.
*
* Chiaki is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Chiaki is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Chiaki. If not, see <https://www.gnu.org/licenses/>.
*/
#ifndef CHIAKI_TAKIONSENDBUFFER_H
#define CHIAKI_TAKIONSENDBUFFER_H
#include "common.h"
#include "log.h"
#include "thread.h"
#include "seqnum.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct chiaki_takion_t ChiakiTakion;
typedef struct chiaki_takion_send_buffer_packet_t ChiakiTakionSendBufferPacket;
typedef struct chiaki_takion_send_buffer_t
{
ChiakiLog *log;
ChiakiTakion *takion;
ChiakiTakionSendBufferPacket *packets;
size_t packets_size; // allocated size
size_t packets_count; // current count
ChiakiMutex mutex;
ChiakiCond cond;
bool should_stop;
ChiakiThread thread;
} ChiakiTakionSendBuffer;
CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_buffer_init(ChiakiTakionSendBuffer *send_buffer, ChiakiTakion *takion, size_t size);
CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_buffer_start(ChiakiTakionSendBuffer *send_buffer);
CHIAKI_EXPORT void chiaki_takion_send_buffer_fini(ChiakiTakionSendBuffer *send_buffer);
/**
* @param buf ownership of this is taken by the ChiakiTakionSendBuffer, which will free it automatically later!
*/
CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_buffer_push(ChiakiTakionSendBuffer *send_buffer, ChiakiSeqNum32 seq_num, uint8_t *buf, size_t buf_size);
CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_buffer_ack(ChiakiTakionSendBuffer *send_buffer, ChiakiSeqNum32 seq_num);
#ifdef __cplusplus
}
#endif
#endif // CHIAKI_TAKIONSENDBUFFER_H

35
lib/include/chiaki/time.h Normal file
View file

@ -0,0 +1,35 @@
/*
* This file is part of Chiaki.
*
* Chiaki is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Chiaki is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Chiaki. If not, see <https://www.gnu.org/licenses/>.
*/
#ifndef CHIAKI_TIME_H
#define CHIAKI_TIME_H
#include "common.h"
#include <stdint.h>
#ifdef __cplusplus
extern "C" {
#endif
CHIAKI_EXPORT uint64_t chiaki_time_now_monotonic_ms();
#ifdef __cplusplus
}
#endif
#endif // CHIAKI_TIME_H

View file

@ -29,6 +29,19 @@
// VERY similar to SCTP, see RFC 4960
#define TAKION_A_RWND 0x19000
#define TAKION_OUTBOUND_STREAMS 0x64
#define TAKION_INBOUND_STREAMS 0x64
#define TAKION_REORDER_QUEUE_SIZE_EXP 4 // => 16 entries
#define TAKION_SEND_BUFFER_SIZE 16
#define TAKION_POSTPONE_PACKETS_SIZE 32
#define TAKION_MESSAGE_HEADER_SIZE 0x10
#define TAKION_PACKET_BASE_TYPE_MASK 0xf
/**
* Base type of Takion packets. Lower nibble of the first byte in datagrams.
@ -80,18 +93,6 @@ ssize_t takion_packet_type_key_pos_offset(TakionPacketType type)
}
}
#define TAKION_A_RWND 0x19000
#define TAKION_OUTBOUND_STREAMS 0x64
#define TAKION_INBOUND_STREAMS 0x64
#define TAKION_REORDER_QUEUE_SIZE_EXP 4 // => 16 entries
#define TAKION_POSTPONE_PACKETS_SIZE 32
#define MESSAGE_HEADER_SIZE 0x10
#define TAKION_PACKET_BASE_TYPE_MASK 0xf
typedef enum takion_chunk_type_t {
TAKION_CHUNK_TYPE_DATA = 0,
TAKION_CHUNK_TYPE_INIT = 1,
@ -159,7 +160,7 @@ static void takion_handle_packet(ChiakiTakion *takion, uint8_t *buf, size_t buf_
static ChiakiErrorCode takion_handle_packet_mac(ChiakiTakion *takion, uint8_t base_type, uint8_t *buf, size_t buf_size);
static void takion_handle_packet_message(ChiakiTakion *takion, uint8_t *buf, size_t buf_size);
static void takion_handle_packet_message_data(ChiakiTakion *takion, uint8_t *packet_buf, size_t packet_buf_size, uint8_t type_b, uint8_t *payload, size_t payload_size);
static void takion_handle_packet_message_data_ack(ChiakiTakion *takion, uint8_t type_b, uint8_t *buf, size_t buf_size);
static void takion_handle_packet_message_data_ack(ChiakiTakion *takion, uint8_t flags, uint8_t *buf, size_t buf_size);
static ChiakiErrorCode takion_parse_message(ChiakiTakion *takion, uint8_t *buf, size_t buf_size, TakionMessage *msg);
static void takion_write_message_header(uint8_t *buf, uint32_t tag, uint32_t key_pos, uint8_t chunk_type, uint8_t chunk_flags, size_t payload_data_size);
static ChiakiErrorCode takion_send_message_init(ChiakiTakion *takion, TakionMessagePayloadInit *payload);
@ -189,7 +190,6 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_connect(ChiakiTakion *takion, Chiaki
takion->tag_remote = 0;
takion->recv_timeout.tv_sec = 2;
takion->recv_timeout.tv_usec = 0;
takion->send_retries = 5;
takion->enable_crypt = info->enable_crypt;
takion->postponed_packets = NULL;
@ -287,7 +287,7 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_raw(ChiakiTakion *takion, const
}
CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_packet_mac(ChiakiGKCrypt *crypt, uint8_t *buf, size_t buf_size, uint8_t *mac_out, uint8_t *mac_old_out, ChiakiTakionPacketKeyPos *key_pos_out)
static ChiakiErrorCode chiaki_takion_packet_mac(ChiakiGKCrypt *crypt, uint8_t *buf, size_t buf_size, uint8_t *mac_out, uint8_t *mac_old_out, ChiakiTakionPacketKeyPos *key_pos_out)
{
if(buf_size < 1)
return CHIAKI_ERR_BUF_TOO_SMALL;
@ -351,7 +351,7 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_message_data(ChiakiTakion *taki
if(err != CHIAKI_ERR_SUCCESS)
return err;
size_t packet_size = 1 + MESSAGE_HEADER_SIZE + 9 + buf_size;
size_t packet_size = 1 + TAKION_MESSAGE_HEADER_SIZE + 9 + buf_size;
uint8_t *packet_buf = malloc(packet_size);
if(!packet_buf)
return CHIAKI_ERR_MEMORY;
@ -359,22 +359,30 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_message_data(ChiakiTakion *taki
takion_write_message_header(packet_buf + 1, takion->tag_remote, key_pos, TAKION_CHUNK_TYPE_DATA, flags, 9 + buf_size);
uint8_t *msg_payload = packet_buf + 1 + MESSAGE_HEADER_SIZE;
*((uint32_t *)(msg_payload + 0)) = htonl(takion->seq_num_local++);
uint8_t *msg_payload = packet_buf + 1 + TAKION_MESSAGE_HEADER_SIZE;
ChiakiSeqNum32 seq_num = takion->seq_num_local++;
*((uint32_t *)(msg_payload + 0)) = htonl(seq_num);
*((uint16_t *)(msg_payload + 4)) = htons(channel);
*((uint16_t *)(msg_payload + 6)) = 0;
*(msg_payload + 8) = 0;
memcpy(msg_payload + 9, buf, buf_size);
// TODO: instead of just sending and forgetting about it, make sure to receive data ack, resend if necessary, etc.
err = chiaki_takion_send(takion, packet_buf, packet_size);
free(packet_buf);
err = chiaki_takion_send(takion, packet_buf, packet_size); // will alter packet_buf with gmac
if(err != CHIAKI_ERR_SUCCESS)
{
CHIAKI_LOGE(takion->log, "Takion failed to send data packet: %s\n", chiaki_error_string(err));
free(packet_buf);
return err;
}
chiaki_takion_send_buffer_push(&takion->send_buffer, seq_num, packet_buf, packet_size);
return err;
}
static ChiakiErrorCode chiaki_takion_send_message_data_ack(ChiakiTakion *takion, uint32_t seq_num)
{
uint8_t buf[1 + MESSAGE_HEADER_SIZE + 0xc];
uint8_t buf[1 + TAKION_MESSAGE_HEADER_SIZE + 0xc];
buf[0] = TAKION_PACKET_TYPE_CONTROL;
size_t key_pos;
@ -384,7 +392,7 @@ static ChiakiErrorCode chiaki_takion_send_message_data_ack(ChiakiTakion *takion,
takion_write_message_header(buf + 1, takion->tag_remote, key_pos, TAKION_CHUNK_TYPE_DATA_ACK, 0, 0xc);
uint8_t *data_ack = buf + 1 + MESSAGE_HEADER_SIZE;
uint8_t *data_ack = buf + 1 + TAKION_MESSAGE_HEADER_SIZE;
*((uint32_t *)(data_ack + 0)) = htonl(seq_num);
*((uint32_t *)(data_ack + 4)) = htonl(takion->a_rwnd);
*((uint16_t *)(data_ack + 8)) = 0;
@ -559,20 +567,13 @@ static ChiakiErrorCode takion_handshake(ChiakiTakion *takion, uint32_t *seq_num_
CHIAKI_LOGI(takion->log, "Takion connected\n");
if(takion->cb)
{
ChiakiTakionEvent event = { 0 };
event.type = CHIAKI_TAKION_EVENT_TYPE_CONNECTED;
takion->cb(&event, takion->cb_user);
}
return CHIAKI_ERR_SUCCESS;
}
static void takion_data_drop(uint64_t seq_num, void *elem_user, void *cb_user)
{
ChiakiTakion *takion = cb_user;
CHIAKI_LOGE(takion->log, "Takion dropping data with seq num %llx\n", (unsigned long long)seq_num);
CHIAKI_LOGE(takion->log, "Takion dropping data with seq num %#llx\n", (unsigned long long)seq_num);
TakionDataPacketEntry *entry = elem_user;
free(entry->packet_buf);
free(entry);
@ -591,10 +592,20 @@ static void *takion_thread_func(void *user)
chiaki_reorder_queue_set_drop_cb(&takion->data_queue, takion_data_drop, takion);
if(chiaki_takion_send_buffer_init(&takion->send_buffer, takion, TAKION_SEND_BUFFER_SIZE) != CHIAKI_ERR_SUCCESS)
goto error_reoder_queue;
// TODO ChiakiCongestionControl congestion_control;
// if(chiaki_congestion_control_start(&congestion_control, takion) != CHIAKI_ERR_SUCCESS)
// goto beach;
if(takion->cb)
{
ChiakiTakionEvent event = { 0 };
event.type = CHIAKI_TAKION_EVENT_TYPE_CONNECTED;
takion->cb(&event, takion->cb_user);
}
bool crypt_available = takion->gkcrypt_remote ? true : false;
while(true)
@ -656,6 +667,9 @@ static void *takion_thread_func(void *user)
// chiaki_congestion_control_stop(&congestion_control);
chiaki_takion_send_buffer_fini(&takion->send_buffer);
error_reoder_queue:
chiaki_reorder_queue_fini(&takion->data_queue);
beach:
@ -879,7 +893,7 @@ static void takion_handle_packet_message_data(ChiakiTakion *takion, uint8_t *pac
takion_flush_data_queue(takion);
}
static void takion_handle_packet_message_data_ack(ChiakiTakion *takion, uint8_t type_b, uint8_t *buf, size_t buf_size)
static void takion_handle_packet_message_data_ack(ChiakiTakion *takion, uint8_t flags, uint8_t *buf, size_t buf_size)
{
if(buf_size != 0xc)
{
@ -888,7 +902,7 @@ static void takion_handle_packet_message_data_ack(ChiakiTakion *takion, uint8_t
}
uint32_t seq_num = ntohl(*((uint32_t *)(buf + 0)));
uint32_t something = ntohl(*((uint32_t *)(buf + 4)));
uint32_t a_rwnd = ntohl(*((uint32_t *)(buf + 4)));
uint16_t size_internal = ntohs(*((uint16_t *)(buf + 8)));
uint16_t zero = ntohs(*((uint16_t *)(buf + 0xa)));
@ -903,8 +917,8 @@ static void takion_handle_packet_message_data_ack(ChiakiTakion *takion, uint8_t
if(zero != 0)
CHIAKI_LOGW(takion->log, "Takion received data ack with nonzero %#x at buf+0xa\n", zero);
// TODO: check seq_num, etc.
//CHIAKI_LOGD(takion->log, "Takion received data ack with seq_num = %#x, something = %#x, size_or_something = %#x, zero = %#x\n", seq_num, something, size_internal, zero);
chiaki_takion_send_buffer_ack(&takion->send_buffer, seq_num);
}
/**
@ -926,7 +940,7 @@ static void takion_write_message_header(uint8_t *buf, uint32_t tag, uint32_t key
static ChiakiErrorCode takion_parse_message(ChiakiTakion *takion, uint8_t *buf, size_t buf_size, TakionMessage *msg)
{
if(buf_size < MESSAGE_HEADER_SIZE)
if(buf_size < TAKION_MESSAGE_HEADER_SIZE)
{
CHIAKI_LOGE(takion->log, "Takion message received that is too short\n");
return CHIAKI_ERR_INVALID_DATA;
@ -963,11 +977,11 @@ static ChiakiErrorCode takion_parse_message(ChiakiTakion *takion, uint8_t *buf,
static ChiakiErrorCode takion_send_message_init(ChiakiTakion *takion, TakionMessagePayloadInit *payload)
{
uint8_t message[1 + MESSAGE_HEADER_SIZE + 0x10];
uint8_t message[1 + TAKION_MESSAGE_HEADER_SIZE + 0x10];
message[0] = TAKION_PACKET_TYPE_CONTROL;
takion_write_message_header(message + 1, takion->tag_remote, 0, TAKION_CHUNK_TYPE_INIT, 0, 0x10);
uint8_t *pl = message + 1 + MESSAGE_HEADER_SIZE;
uint8_t *pl = message + 1 + TAKION_MESSAGE_HEADER_SIZE;
*((uint32_t *)(pl + 0)) = htonl(payload->tag);
*((uint32_t *)(pl + 4)) = htonl(payload->a_rwnd);
*((uint16_t *)(pl + 8)) = htons(payload->outbound_streams);
@ -981,10 +995,10 @@ static ChiakiErrorCode takion_send_message_init(ChiakiTakion *takion, TakionMess
static ChiakiErrorCode takion_send_message_cookie(ChiakiTakion *takion, uint8_t *cookie)
{
uint8_t message[1 + MESSAGE_HEADER_SIZE + TAKION_COOKIE_SIZE];
uint8_t message[1 + TAKION_MESSAGE_HEADER_SIZE + TAKION_COOKIE_SIZE];
message[0] = TAKION_PACKET_TYPE_CONTROL;
takion_write_message_header(message + 1, takion->tag_remote, 0, TAKION_CHUNK_TYPE_COOKIE, 0, TAKION_COOKIE_SIZE);
memcpy(message + 1 + MESSAGE_HEADER_SIZE, cookie, TAKION_COOKIE_SIZE);
memcpy(message + 1 + TAKION_MESSAGE_HEADER_SIZE, cookie, TAKION_COOKIE_SIZE);
return chiaki_takion_send_raw(takion, message, sizeof(message));
}
@ -992,7 +1006,7 @@ static ChiakiErrorCode takion_send_message_cookie(ChiakiTakion *takion, uint8_t
static ChiakiErrorCode takion_recv_message_init_ack(ChiakiTakion *takion, TakionMessagePayloadInitAck *payload)
{
uint8_t message[1 + MESSAGE_HEADER_SIZE + 0x10 + TAKION_COOKIE_SIZE];
uint8_t message[1 + TAKION_MESSAGE_HEADER_SIZE + 0x10 + TAKION_COOKIE_SIZE];
size_t received_size = sizeof(message);
ChiakiErrorCode err = takion_recv(takion, message, &received_size, &takion->recv_timeout);
if(err != CHIAKI_ERR_SUCCESS)
@ -1040,7 +1054,7 @@ static ChiakiErrorCode takion_recv_message_init_ack(ChiakiTakion *takion, Takion
static ChiakiErrorCode takion_recv_message_cookie_ack(ChiakiTakion *takion)
{
uint8_t message[1 + MESSAGE_HEADER_SIZE];
uint8_t message[1 + TAKION_MESSAGE_HEADER_SIZE];
size_t received_size = sizeof(message);
ChiakiErrorCode err = takion_recv(takion, message, &received_size, &takion->recv_timeout);
if(err != CHIAKI_ERR_SUCCESS)

231
lib/src/takionsendbuffer.c Normal file
View file

@ -0,0 +1,231 @@
/*
* This file is part of Chiaki.
*
* Chiaki is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Chiaki is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Chiaki. If not, see <https://www.gnu.org/licenses/>.
*/
#include <chiaki/takionsendbuffer.h>
#include <chiaki/takion.h>
#include <chiaki/time.h>
#include <string.h>
#include <assert.h>
#define TAKION_DATA_RESEND_TIMEOUT_MS 200
#define TAKION_DATA_RESEND_WAKEUP_TIMEOUT_MS (TAKION_DATA_RESEND_TIMEOUT_MS/2)
#define TAKION_DATA_RESEND_TRIES_MAX 10
struct chiaki_takion_send_buffer_packet_t
{
ChiakiSeqNum32 seq_num;
uint64_t tries;
uint64_t last_send_ms; // chiaki_time_now_monotonic_ms()
uint8_t *buf;
size_t buf_size;
}; // ChiakiTakionSendBufferPacket
static void *takion_send_buffer_thread_func(void *user);
CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_buffer_init(ChiakiTakionSendBuffer *send_buffer, ChiakiTakion *takion, size_t size)
{
send_buffer->takion = takion;
send_buffer->log = takion->log;
send_buffer->packets = calloc(size, sizeof(ChiakiTakionSendBufferPacket));
if(!send_buffer->packets)
return CHIAKI_ERR_MEMORY;
send_buffer->packets_size = size;
send_buffer->packets_count = 0;
send_buffer->should_stop = false;
ChiakiErrorCode err = chiaki_mutex_init(&send_buffer->mutex, false);
if(err != CHIAKI_ERR_SUCCESS)
goto error_packets;
err = chiaki_cond_init(&send_buffer->cond);
if(err != CHIAKI_ERR_SUCCESS)
goto error_mutex;
err = chiaki_thread_create(&send_buffer->thread, takion_send_buffer_thread_func, send_buffer);
if(err != CHIAKI_ERR_SUCCESS)
goto error_cond;
return CHIAKI_ERR_SUCCESS;
error_cond:
chiaki_cond_fini(&send_buffer->cond);
error_mutex:
chiaki_mutex_fini(&send_buffer->mutex);
error_packets:
free(send_buffer->packets);
return err;
}
CHIAKI_EXPORT void chiaki_takion_send_buffer_fini(ChiakiTakionSendBuffer *send_buffer)
{
ChiakiErrorCode err = chiaki_mutex_lock(&send_buffer->mutex);
assert(err == CHIAKI_ERR_SUCCESS);
send_buffer->should_stop = true;
chiaki_mutex_unlock(&send_buffer->mutex);
err = chiaki_cond_signal(&send_buffer->cond);
assert(err == CHIAKI_ERR_SUCCESS);
err = chiaki_thread_join(&send_buffer->thread, NULL);
assert(err == CHIAKI_ERR_SUCCESS);
for(size_t i=0; i<send_buffer->packets_count; i++)
free(send_buffer->packets[i].buf);
chiaki_cond_fini(&send_buffer->cond);
chiaki_mutex_fini(&send_buffer->mutex);
free(send_buffer->packets);
}
CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_buffer_push(ChiakiTakionSendBuffer *send_buffer, ChiakiSeqNum32 seq_num, uint8_t *buf, size_t buf_size)
{
ChiakiErrorCode err = chiaki_mutex_lock(&send_buffer->mutex);
if(err != CHIAKI_ERR_SUCCESS)
return err;
if(send_buffer->packets_count >= send_buffer->packets_size)
{
CHIAKI_LOGE(send_buffer->log, "Takion Send Buffer overflow\n");
err = CHIAKI_ERR_OVERFLOW;
goto beach;
}
for(size_t i=0; i<send_buffer->packets_count; i++)
{
if(send_buffer->packets[i].seq_num == seq_num)
{
CHIAKI_LOGE(send_buffer->log, "Tried to push duplicate seqnum into Takion Send Buffer\n");
err = CHIAKI_ERR_INVALID_DATA;
goto beach;
}
}
ChiakiTakionSendBufferPacket *packet = &send_buffer->packets[send_buffer->packets_count++];
packet->seq_num = seq_num;
packet->tries = 0;
packet->last_send_ms = chiaki_time_now_monotonic_ms();
packet->buf = buf;
packet->buf_size = buf_size;
CHIAKI_LOGD(send_buffer->log, "Pushed seq num %#llx into Takion Send Buffer\n", (unsigned long long)seq_num);
if(send_buffer->packets_count == 1)
{
// buffer was empty before, so it will sleep without timeout => WAKE UP!!
chiaki_cond_signal(&send_buffer->cond);
}
beach:
chiaki_mutex_unlock(&send_buffer->mutex);
return err;
}
CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_buffer_ack(ChiakiTakionSendBuffer *send_buffer, ChiakiSeqNum32 seq_num)
{
ChiakiErrorCode err = chiaki_mutex_lock(&send_buffer->mutex);
if(err != CHIAKI_ERR_SUCCESS)
return err;
size_t i;
for(i=0; i<send_buffer->packets_count; i++)
{
if(send_buffer->packets[i].seq_num == seq_num)
break;
}
if(i == send_buffer->packets_count)
{
CHIAKI_LOGW(send_buffer->log, "Takion Send Buffer got ack for seqnum not in buffer\n");
goto beach;
}
free(send_buffer->packets[i].buf);
if(i < send_buffer->packets_count - 1)
memmove(send_buffer->packets + i, send_buffer->packets + i + 1, send_buffer->packets_count - i - 1);
send_buffer->packets_count--;
CHIAKI_LOGD(send_buffer->log, "Acked seq num %#llx from Takion Send Buffer\n", (unsigned long long)seq_num);
beach:
chiaki_mutex_unlock(&send_buffer->mutex);
return err;
}
static void takion_send_buffer_resend(ChiakiTakionSendBuffer *send_buffer);
static bool takion_send_buffer_check_pred_packets(void *user)
{
ChiakiTakionSendBuffer *send_buffer = user;
return send_buffer->should_stop;
}
static bool takion_send_buffer_check_pred_no_packets(void *user)
{
ChiakiTakionSendBuffer *send_buffer = user;
return send_buffer->should_stop || send_buffer->packets_count;
}
static void *takion_send_buffer_thread_func(void *user)
{
ChiakiTakionSendBuffer *send_buffer = user;
ChiakiErrorCode err = chiaki_mutex_lock(&send_buffer->mutex);
if(err != CHIAKI_ERR_SUCCESS)
return NULL;
while(true)
{
if(send_buffer->packets_count) // if there are packets, wait with timeout
err = chiaki_cond_timedwait_pred(&send_buffer->cond, &send_buffer->mutex, TAKION_DATA_RESEND_WAKEUP_TIMEOUT_MS, takion_send_buffer_check_pred_packets, send_buffer);
else // if not, wait without timeout, but also wakeup if packets become available
err = chiaki_cond_wait_pred(&send_buffer->cond, &send_buffer->mutex, takion_send_buffer_check_pred_no_packets, send_buffer);
if(err != CHIAKI_ERR_SUCCESS && err != CHIAKI_ERR_TIMEOUT)
break;
if(send_buffer->should_stop)
break;
takion_send_buffer_resend(send_buffer);
}
chiaki_mutex_unlock(&send_buffer->mutex);
return NULL;
}
static void takion_send_buffer_resend(ChiakiTakionSendBuffer *send_buffer)
{
uint64_t now = chiaki_time_now_monotonic_ms();
for(size_t i=0; i<send_buffer->packets_count; i++)
{
ChiakiTakionSendBufferPacket *packet = &send_buffer->packets[i];
if(now - packet->last_send_ms > TAKION_DATA_RESEND_TIMEOUT_MS)
{
CHIAKI_LOGI(send_buffer->log, "Takion Send Buffer re-sending packet with seqnum %#llx, tries: %llu\n", (unsigned long long)packet->seq_num, (unsigned long long)packet->tries);
packet->last_send_ms = now;
chiaki_takion_send_raw(send_buffer->takion, packet->buf, packet->buf_size);
packet->tries++;
// TODO: check tries and disconnect if necessary
}
}
}

27
lib/src/time.c Normal file
View file

@ -0,0 +1,27 @@
/*
* This file is part of Chiaki.
*
* Chiaki is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Chiaki is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Chiaki. If not, see <https://www.gnu.org/licenses/>.
*/
#include <chiaki/time.h>
#include <time.h>
CHIAKI_EXPORT uint64_t chiaki_time_now_monotonic_ms()
{
struct timespec time;
clock_gettime(CLOCK_MONOTONIC, &time);
return time.tv_sec * 1000 + time.tv_nsec / 1000000;
}