diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index 10dfcb6..07a5412 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -29,7 +29,9 @@ set(HEADER_FILES include/chiaki/discoveryservice.h include/chiaki/feedback.h include/chiaki/feedbacksender.h - include/chiaki/controller.h) + include/chiaki/controller.h + include/chiaki/takionsendbuffer.h + include/chiaki/time.h) set(SOURCE_FILES src/common.c @@ -61,7 +63,9 @@ set(SOURCE_FILES src/discoveryservice.c src/feedback.c src/feedbacksender.c - src/controller.c) + src/controller.c + src/takionsendbuffer.c + src/time.c) add_subdirectory(protobuf) include_directories("${NANOPB_SOURCE_DIR}") diff --git a/lib/include/chiaki/takion.h b/lib/include/chiaki/takion.h index fe43ca1..6849768 100644 --- a/lib/include/chiaki/takion.h +++ b/lib/include/chiaki/takion.h @@ -26,6 +26,7 @@ #include "stoppipe.h" #include "reorderqueue.h" #include "feedback.h" +#include "takionsendbuffer.h" #include #include @@ -103,12 +104,10 @@ typedef struct chiaki_takion_connect_info_t socklen_t sa_len; ChiakiTakionCallback cb; void *cb_user; - void *av_cb_user; bool enable_crypt; } ChiakiTakionConnectInfo; - typedef struct chiaki_takion_t { ChiakiLog *log; @@ -139,6 +138,7 @@ typedef struct chiaki_takion_t ChiakiGKCrypt *gkcrypt_remote; // if NULL (default), remote gmacs are IGNORED (!) and everything is expected to be unencrypted ChiakiReorderQueue data_queue; + ChiakiTakionSendBuffer send_buffer; ChiakiTakionCallback cb; void *cb_user; @@ -146,10 +146,9 @@ typedef struct chiaki_takion_t ChiakiThread thread; ChiakiStopPipe stop_pipe; struct timeval recv_timeout; - int send_retries; uint32_t tag_local; uint32_t tag_remote; - uint32_t seq_num_local; + ChiakiSeqNum32 seq_num_local; /** * Advertised Receiver Window Credit @@ -184,8 +183,6 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_crypt_advance_key_pos(ChiakiTakion * */ CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_raw(ChiakiTakion *takion, const uint8_t *buf, size_t buf_size); -CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_packet_mac(ChiakiGKCrypt *crypt, uint8_t *buf, size_t buf_size, uint8_t *mac_out, uint8_t *mac_old_out, ChiakiTakionPacketKeyPos *key_pos_out); - /** * Calculate the MAC for the packet depending on the type derived from the first byte in buf, * assign MAC inside buf at the respective position and send the packet. diff --git a/lib/include/chiaki/takionsendbuffer.h b/lib/include/chiaki/takionsendbuffer.h new file mode 100644 index 0000000..35f6335 --- /dev/null +++ b/lib/include/chiaki/takionsendbuffer.h @@ -0,0 +1,64 @@ +/* + * This file is part of Chiaki. + * + * Chiaki is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Chiaki is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Chiaki. If not, see . + */ + +#ifndef CHIAKI_TAKIONSENDBUFFER_H +#define CHIAKI_TAKIONSENDBUFFER_H + +#include "common.h" +#include "log.h" +#include "thread.h" +#include "seqnum.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct chiaki_takion_t ChiakiTakion; + +typedef struct chiaki_takion_send_buffer_packet_t ChiakiTakionSendBufferPacket; + +typedef struct chiaki_takion_send_buffer_t +{ + ChiakiLog *log; + ChiakiTakion *takion; + + ChiakiTakionSendBufferPacket *packets; + size_t packets_size; // allocated size + size_t packets_count; // current count + + ChiakiMutex mutex; + ChiakiCond cond; + bool should_stop; + ChiakiThread thread; +} ChiakiTakionSendBuffer; + + +CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_buffer_init(ChiakiTakionSendBuffer *send_buffer, ChiakiTakion *takion, size_t size); +CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_buffer_start(ChiakiTakionSendBuffer *send_buffer); +CHIAKI_EXPORT void chiaki_takion_send_buffer_fini(ChiakiTakionSendBuffer *send_buffer); + +/** + * @param buf ownership of this is taken by the ChiakiTakionSendBuffer, which will free it automatically later! + */ +CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_buffer_push(ChiakiTakionSendBuffer *send_buffer, ChiakiSeqNum32 seq_num, uint8_t *buf, size_t buf_size); +CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_buffer_ack(ChiakiTakionSendBuffer *send_buffer, ChiakiSeqNum32 seq_num); + +#ifdef __cplusplus +} +#endif + +#endif // CHIAKI_TAKIONSENDBUFFER_H diff --git a/lib/include/chiaki/time.h b/lib/include/chiaki/time.h new file mode 100644 index 0000000..e2cdb08 --- /dev/null +++ b/lib/include/chiaki/time.h @@ -0,0 +1,35 @@ +/* + * This file is part of Chiaki. + * + * Chiaki is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Chiaki is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Chiaki. If not, see . + */ + +#ifndef CHIAKI_TIME_H +#define CHIAKI_TIME_H + +#include "common.h" + +#include + +#ifdef __cplusplus +extern "C" { +#endif + +CHIAKI_EXPORT uint64_t chiaki_time_now_monotonic_ms(); + +#ifdef __cplusplus +} +#endif + +#endif // CHIAKI_TIME_H diff --git a/lib/src/takion.c b/lib/src/takion.c index 4a6d93a..25b3701 100644 --- a/lib/src/takion.c +++ b/lib/src/takion.c @@ -29,6 +29,19 @@ // VERY similar to SCTP, see RFC 4960 +#define TAKION_A_RWND 0x19000 +#define TAKION_OUTBOUND_STREAMS 0x64 +#define TAKION_INBOUND_STREAMS 0x64 + +#define TAKION_REORDER_QUEUE_SIZE_EXP 4 // => 16 entries +#define TAKION_SEND_BUFFER_SIZE 16 + +#define TAKION_POSTPONE_PACKETS_SIZE 32 + +#define TAKION_MESSAGE_HEADER_SIZE 0x10 + +#define TAKION_PACKET_BASE_TYPE_MASK 0xf + /** * Base type of Takion packets. Lower nibble of the first byte in datagrams. @@ -80,18 +93,6 @@ ssize_t takion_packet_type_key_pos_offset(TakionPacketType type) } } -#define TAKION_A_RWND 0x19000 -#define TAKION_OUTBOUND_STREAMS 0x64 -#define TAKION_INBOUND_STREAMS 0x64 - -#define TAKION_REORDER_QUEUE_SIZE_EXP 4 // => 16 entries - -#define TAKION_POSTPONE_PACKETS_SIZE 32 - -#define MESSAGE_HEADER_SIZE 0x10 - -#define TAKION_PACKET_BASE_TYPE_MASK 0xf - typedef enum takion_chunk_type_t { TAKION_CHUNK_TYPE_DATA = 0, TAKION_CHUNK_TYPE_INIT = 1, @@ -159,7 +160,7 @@ static void takion_handle_packet(ChiakiTakion *takion, uint8_t *buf, size_t buf_ static ChiakiErrorCode takion_handle_packet_mac(ChiakiTakion *takion, uint8_t base_type, uint8_t *buf, size_t buf_size); static void takion_handle_packet_message(ChiakiTakion *takion, uint8_t *buf, size_t buf_size); static void takion_handle_packet_message_data(ChiakiTakion *takion, uint8_t *packet_buf, size_t packet_buf_size, uint8_t type_b, uint8_t *payload, size_t payload_size); -static void takion_handle_packet_message_data_ack(ChiakiTakion *takion, uint8_t type_b, uint8_t *buf, size_t buf_size); +static void takion_handle_packet_message_data_ack(ChiakiTakion *takion, uint8_t flags, uint8_t *buf, size_t buf_size); static ChiakiErrorCode takion_parse_message(ChiakiTakion *takion, uint8_t *buf, size_t buf_size, TakionMessage *msg); static void takion_write_message_header(uint8_t *buf, uint32_t tag, uint32_t key_pos, uint8_t chunk_type, uint8_t chunk_flags, size_t payload_data_size); static ChiakiErrorCode takion_send_message_init(ChiakiTakion *takion, TakionMessagePayloadInit *payload); @@ -189,7 +190,6 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_connect(ChiakiTakion *takion, Chiaki takion->tag_remote = 0; takion->recv_timeout.tv_sec = 2; takion->recv_timeout.tv_usec = 0; - takion->send_retries = 5; takion->enable_crypt = info->enable_crypt; takion->postponed_packets = NULL; @@ -287,7 +287,7 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_raw(ChiakiTakion *takion, const } -CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_packet_mac(ChiakiGKCrypt *crypt, uint8_t *buf, size_t buf_size, uint8_t *mac_out, uint8_t *mac_old_out, ChiakiTakionPacketKeyPos *key_pos_out) +static ChiakiErrorCode chiaki_takion_packet_mac(ChiakiGKCrypt *crypt, uint8_t *buf, size_t buf_size, uint8_t *mac_out, uint8_t *mac_old_out, ChiakiTakionPacketKeyPos *key_pos_out) { if(buf_size < 1) return CHIAKI_ERR_BUF_TOO_SMALL; @@ -351,7 +351,7 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_message_data(ChiakiTakion *taki if(err != CHIAKI_ERR_SUCCESS) return err; - size_t packet_size = 1 + MESSAGE_HEADER_SIZE + 9 + buf_size; + size_t packet_size = 1 + TAKION_MESSAGE_HEADER_SIZE + 9 + buf_size; uint8_t *packet_buf = malloc(packet_size); if(!packet_buf) return CHIAKI_ERR_MEMORY; @@ -359,22 +359,30 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_message_data(ChiakiTakion *taki takion_write_message_header(packet_buf + 1, takion->tag_remote, key_pos, TAKION_CHUNK_TYPE_DATA, flags, 9 + buf_size); - uint8_t *msg_payload = packet_buf + 1 + MESSAGE_HEADER_SIZE; - *((uint32_t *)(msg_payload + 0)) = htonl(takion->seq_num_local++); + uint8_t *msg_payload = packet_buf + 1 + TAKION_MESSAGE_HEADER_SIZE; + ChiakiSeqNum32 seq_num = takion->seq_num_local++; + *((uint32_t *)(msg_payload + 0)) = htonl(seq_num); *((uint16_t *)(msg_payload + 4)) = htons(channel); *((uint16_t *)(msg_payload + 6)) = 0; *(msg_payload + 8) = 0; memcpy(msg_payload + 9, buf, buf_size); - // TODO: instead of just sending and forgetting about it, make sure to receive data ack, resend if necessary, etc. - err = chiaki_takion_send(takion, packet_buf, packet_size); - free(packet_buf); + err = chiaki_takion_send(takion, packet_buf, packet_size); // will alter packet_buf with gmac + if(err != CHIAKI_ERR_SUCCESS) + { + CHIAKI_LOGE(takion->log, "Takion failed to send data packet: %s\n", chiaki_error_string(err)); + free(packet_buf); + return err; + } + + chiaki_takion_send_buffer_push(&takion->send_buffer, seq_num, packet_buf, packet_size); + return err; } static ChiakiErrorCode chiaki_takion_send_message_data_ack(ChiakiTakion *takion, uint32_t seq_num) { - uint8_t buf[1 + MESSAGE_HEADER_SIZE + 0xc]; + uint8_t buf[1 + TAKION_MESSAGE_HEADER_SIZE + 0xc]; buf[0] = TAKION_PACKET_TYPE_CONTROL; size_t key_pos; @@ -384,7 +392,7 @@ static ChiakiErrorCode chiaki_takion_send_message_data_ack(ChiakiTakion *takion, takion_write_message_header(buf + 1, takion->tag_remote, key_pos, TAKION_CHUNK_TYPE_DATA_ACK, 0, 0xc); - uint8_t *data_ack = buf + 1 + MESSAGE_HEADER_SIZE; + uint8_t *data_ack = buf + 1 + TAKION_MESSAGE_HEADER_SIZE; *((uint32_t *)(data_ack + 0)) = htonl(seq_num); *((uint32_t *)(data_ack + 4)) = htonl(takion->a_rwnd); *((uint16_t *)(data_ack + 8)) = 0; @@ -559,20 +567,13 @@ static ChiakiErrorCode takion_handshake(ChiakiTakion *takion, uint32_t *seq_num_ CHIAKI_LOGI(takion->log, "Takion connected\n"); - if(takion->cb) - { - ChiakiTakionEvent event = { 0 }; - event.type = CHIAKI_TAKION_EVENT_TYPE_CONNECTED; - takion->cb(&event, takion->cb_user); - } - return CHIAKI_ERR_SUCCESS; } static void takion_data_drop(uint64_t seq_num, void *elem_user, void *cb_user) { ChiakiTakion *takion = cb_user; - CHIAKI_LOGE(takion->log, "Takion dropping data with seq num %llx\n", (unsigned long long)seq_num); + CHIAKI_LOGE(takion->log, "Takion dropping data with seq num %#llx\n", (unsigned long long)seq_num); TakionDataPacketEntry *entry = elem_user; free(entry->packet_buf); free(entry); @@ -591,10 +592,20 @@ static void *takion_thread_func(void *user) chiaki_reorder_queue_set_drop_cb(&takion->data_queue, takion_data_drop, takion); + if(chiaki_takion_send_buffer_init(&takion->send_buffer, takion, TAKION_SEND_BUFFER_SIZE) != CHIAKI_ERR_SUCCESS) + goto error_reoder_queue; + // TODO ChiakiCongestionControl congestion_control; // if(chiaki_congestion_control_start(&congestion_control, takion) != CHIAKI_ERR_SUCCESS) // goto beach; + if(takion->cb) + { + ChiakiTakionEvent event = { 0 }; + event.type = CHIAKI_TAKION_EVENT_TYPE_CONNECTED; + takion->cb(&event, takion->cb_user); + } + bool crypt_available = takion->gkcrypt_remote ? true : false; while(true) @@ -656,6 +667,9 @@ static void *takion_thread_func(void *user) // chiaki_congestion_control_stop(&congestion_control); + chiaki_takion_send_buffer_fini(&takion->send_buffer); + +error_reoder_queue: chiaki_reorder_queue_fini(&takion->data_queue); beach: @@ -879,7 +893,7 @@ static void takion_handle_packet_message_data(ChiakiTakion *takion, uint8_t *pac takion_flush_data_queue(takion); } -static void takion_handle_packet_message_data_ack(ChiakiTakion *takion, uint8_t type_b, uint8_t *buf, size_t buf_size) +static void takion_handle_packet_message_data_ack(ChiakiTakion *takion, uint8_t flags, uint8_t *buf, size_t buf_size) { if(buf_size != 0xc) { @@ -888,7 +902,7 @@ static void takion_handle_packet_message_data_ack(ChiakiTakion *takion, uint8_t } uint32_t seq_num = ntohl(*((uint32_t *)(buf + 0))); - uint32_t something = ntohl(*((uint32_t *)(buf + 4))); + uint32_t a_rwnd = ntohl(*((uint32_t *)(buf + 4))); uint16_t size_internal = ntohs(*((uint16_t *)(buf + 8))); uint16_t zero = ntohs(*((uint16_t *)(buf + 0xa))); @@ -903,8 +917,8 @@ static void takion_handle_packet_message_data_ack(ChiakiTakion *takion, uint8_t if(zero != 0) CHIAKI_LOGW(takion->log, "Takion received data ack with nonzero %#x at buf+0xa\n", zero); - // TODO: check seq_num, etc. //CHIAKI_LOGD(takion->log, "Takion received data ack with seq_num = %#x, something = %#x, size_or_something = %#x, zero = %#x\n", seq_num, something, size_internal, zero); + chiaki_takion_send_buffer_ack(&takion->send_buffer, seq_num); } /** @@ -926,7 +940,7 @@ static void takion_write_message_header(uint8_t *buf, uint32_t tag, uint32_t key static ChiakiErrorCode takion_parse_message(ChiakiTakion *takion, uint8_t *buf, size_t buf_size, TakionMessage *msg) { - if(buf_size < MESSAGE_HEADER_SIZE) + if(buf_size < TAKION_MESSAGE_HEADER_SIZE) { CHIAKI_LOGE(takion->log, "Takion message received that is too short\n"); return CHIAKI_ERR_INVALID_DATA; @@ -963,11 +977,11 @@ static ChiakiErrorCode takion_parse_message(ChiakiTakion *takion, uint8_t *buf, static ChiakiErrorCode takion_send_message_init(ChiakiTakion *takion, TakionMessagePayloadInit *payload) { - uint8_t message[1 + MESSAGE_HEADER_SIZE + 0x10]; + uint8_t message[1 + TAKION_MESSAGE_HEADER_SIZE + 0x10]; message[0] = TAKION_PACKET_TYPE_CONTROL; takion_write_message_header(message + 1, takion->tag_remote, 0, TAKION_CHUNK_TYPE_INIT, 0, 0x10); - uint8_t *pl = message + 1 + MESSAGE_HEADER_SIZE; + uint8_t *pl = message + 1 + TAKION_MESSAGE_HEADER_SIZE; *((uint32_t *)(pl + 0)) = htonl(payload->tag); *((uint32_t *)(pl + 4)) = htonl(payload->a_rwnd); *((uint16_t *)(pl + 8)) = htons(payload->outbound_streams); @@ -981,10 +995,10 @@ static ChiakiErrorCode takion_send_message_init(ChiakiTakion *takion, TakionMess static ChiakiErrorCode takion_send_message_cookie(ChiakiTakion *takion, uint8_t *cookie) { - uint8_t message[1 + MESSAGE_HEADER_SIZE + TAKION_COOKIE_SIZE]; + uint8_t message[1 + TAKION_MESSAGE_HEADER_SIZE + TAKION_COOKIE_SIZE]; message[0] = TAKION_PACKET_TYPE_CONTROL; takion_write_message_header(message + 1, takion->tag_remote, 0, TAKION_CHUNK_TYPE_COOKIE, 0, TAKION_COOKIE_SIZE); - memcpy(message + 1 + MESSAGE_HEADER_SIZE, cookie, TAKION_COOKIE_SIZE); + memcpy(message + 1 + TAKION_MESSAGE_HEADER_SIZE, cookie, TAKION_COOKIE_SIZE); return chiaki_takion_send_raw(takion, message, sizeof(message)); } @@ -992,7 +1006,7 @@ static ChiakiErrorCode takion_send_message_cookie(ChiakiTakion *takion, uint8_t static ChiakiErrorCode takion_recv_message_init_ack(ChiakiTakion *takion, TakionMessagePayloadInitAck *payload) { - uint8_t message[1 + MESSAGE_HEADER_SIZE + 0x10 + TAKION_COOKIE_SIZE]; + uint8_t message[1 + TAKION_MESSAGE_HEADER_SIZE + 0x10 + TAKION_COOKIE_SIZE]; size_t received_size = sizeof(message); ChiakiErrorCode err = takion_recv(takion, message, &received_size, &takion->recv_timeout); if(err != CHIAKI_ERR_SUCCESS) @@ -1040,7 +1054,7 @@ static ChiakiErrorCode takion_recv_message_init_ack(ChiakiTakion *takion, Takion static ChiakiErrorCode takion_recv_message_cookie_ack(ChiakiTakion *takion) { - uint8_t message[1 + MESSAGE_HEADER_SIZE]; + uint8_t message[1 + TAKION_MESSAGE_HEADER_SIZE]; size_t received_size = sizeof(message); ChiakiErrorCode err = takion_recv(takion, message, &received_size, &takion->recv_timeout); if(err != CHIAKI_ERR_SUCCESS) diff --git a/lib/src/takionsendbuffer.c b/lib/src/takionsendbuffer.c new file mode 100644 index 0000000..2cf6431 --- /dev/null +++ b/lib/src/takionsendbuffer.c @@ -0,0 +1,231 @@ +/* + * This file is part of Chiaki. + * + * Chiaki is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Chiaki is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Chiaki. If not, see . + */ + +#include +#include +#include + +#include +#include + + +#define TAKION_DATA_RESEND_TIMEOUT_MS 200 +#define TAKION_DATA_RESEND_WAKEUP_TIMEOUT_MS (TAKION_DATA_RESEND_TIMEOUT_MS/2) +#define TAKION_DATA_RESEND_TRIES_MAX 10 + + +struct chiaki_takion_send_buffer_packet_t +{ + ChiakiSeqNum32 seq_num; + uint64_t tries; + uint64_t last_send_ms; // chiaki_time_now_monotonic_ms() + uint8_t *buf; + size_t buf_size; +}; // ChiakiTakionSendBufferPacket + +static void *takion_send_buffer_thread_func(void *user); + +CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_buffer_init(ChiakiTakionSendBuffer *send_buffer, ChiakiTakion *takion, size_t size) +{ + send_buffer->takion = takion; + send_buffer->log = takion->log; + + send_buffer->packets = calloc(size, sizeof(ChiakiTakionSendBufferPacket)); + if(!send_buffer->packets) + return CHIAKI_ERR_MEMORY; + send_buffer->packets_size = size; + send_buffer->packets_count = 0; + + send_buffer->should_stop = false; + + ChiakiErrorCode err = chiaki_mutex_init(&send_buffer->mutex, false); + if(err != CHIAKI_ERR_SUCCESS) + goto error_packets; + + err = chiaki_cond_init(&send_buffer->cond); + if(err != CHIAKI_ERR_SUCCESS) + goto error_mutex; + + err = chiaki_thread_create(&send_buffer->thread, takion_send_buffer_thread_func, send_buffer); + if(err != CHIAKI_ERR_SUCCESS) + goto error_cond; + + return CHIAKI_ERR_SUCCESS; +error_cond: + chiaki_cond_fini(&send_buffer->cond); +error_mutex: + chiaki_mutex_fini(&send_buffer->mutex); +error_packets: + free(send_buffer->packets); + return err; +} + +CHIAKI_EXPORT void chiaki_takion_send_buffer_fini(ChiakiTakionSendBuffer *send_buffer) +{ + ChiakiErrorCode err = chiaki_mutex_lock(&send_buffer->mutex); + assert(err == CHIAKI_ERR_SUCCESS); + send_buffer->should_stop = true; + chiaki_mutex_unlock(&send_buffer->mutex); + err = chiaki_cond_signal(&send_buffer->cond); + assert(err == CHIAKI_ERR_SUCCESS); + err = chiaki_thread_join(&send_buffer->thread, NULL); + assert(err == CHIAKI_ERR_SUCCESS); + + for(size_t i=0; ipackets_count; i++) + free(send_buffer->packets[i].buf); + + chiaki_cond_fini(&send_buffer->cond); + chiaki_mutex_fini(&send_buffer->mutex); + free(send_buffer->packets); +} + +CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_buffer_push(ChiakiTakionSendBuffer *send_buffer, ChiakiSeqNum32 seq_num, uint8_t *buf, size_t buf_size) +{ + ChiakiErrorCode err = chiaki_mutex_lock(&send_buffer->mutex); + if(err != CHIAKI_ERR_SUCCESS) + return err; + + if(send_buffer->packets_count >= send_buffer->packets_size) + { + CHIAKI_LOGE(send_buffer->log, "Takion Send Buffer overflow\n"); + err = CHIAKI_ERR_OVERFLOW; + goto beach; + } + + for(size_t i=0; ipackets_count; i++) + { + if(send_buffer->packets[i].seq_num == seq_num) + { + CHIAKI_LOGE(send_buffer->log, "Tried to push duplicate seqnum into Takion Send Buffer\n"); + err = CHIAKI_ERR_INVALID_DATA; + goto beach; + } + } + + ChiakiTakionSendBufferPacket *packet = &send_buffer->packets[send_buffer->packets_count++]; + packet->seq_num = seq_num; + packet->tries = 0; + packet->last_send_ms = chiaki_time_now_monotonic_ms(); + packet->buf = buf; + packet->buf_size = buf_size; + + CHIAKI_LOGD(send_buffer->log, "Pushed seq num %#llx into Takion Send Buffer\n", (unsigned long long)seq_num); + + if(send_buffer->packets_count == 1) + { + // buffer was empty before, so it will sleep without timeout => WAKE UP!! + chiaki_cond_signal(&send_buffer->cond); + } + +beach: + chiaki_mutex_unlock(&send_buffer->mutex); + return err; +} + +CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_buffer_ack(ChiakiTakionSendBuffer *send_buffer, ChiakiSeqNum32 seq_num) +{ + ChiakiErrorCode err = chiaki_mutex_lock(&send_buffer->mutex); + if(err != CHIAKI_ERR_SUCCESS) + return err; + + size_t i; + for(i=0; ipackets_count; i++) + { + if(send_buffer->packets[i].seq_num == seq_num) + break; + } + + if(i == send_buffer->packets_count) + { + CHIAKI_LOGW(send_buffer->log, "Takion Send Buffer got ack for seqnum not in buffer\n"); + goto beach; + } + + free(send_buffer->packets[i].buf); + + if(i < send_buffer->packets_count - 1) + memmove(send_buffer->packets + i, send_buffer->packets + i + 1, send_buffer->packets_count - i - 1); + + send_buffer->packets_count--; + + CHIAKI_LOGD(send_buffer->log, "Acked seq num %#llx from Takion Send Buffer\n", (unsigned long long)seq_num); + +beach: + chiaki_mutex_unlock(&send_buffer->mutex); + return err; +} + +static void takion_send_buffer_resend(ChiakiTakionSendBuffer *send_buffer); + +static bool takion_send_buffer_check_pred_packets(void *user) +{ + ChiakiTakionSendBuffer *send_buffer = user; + return send_buffer->should_stop; +} + +static bool takion_send_buffer_check_pred_no_packets(void *user) +{ + ChiakiTakionSendBuffer *send_buffer = user; + return send_buffer->should_stop || send_buffer->packets_count; +} + +static void *takion_send_buffer_thread_func(void *user) +{ + ChiakiTakionSendBuffer *send_buffer = user; + + ChiakiErrorCode err = chiaki_mutex_lock(&send_buffer->mutex); + if(err != CHIAKI_ERR_SUCCESS) + return NULL; + + while(true) + { + if(send_buffer->packets_count) // if there are packets, wait with timeout + err = chiaki_cond_timedwait_pred(&send_buffer->cond, &send_buffer->mutex, TAKION_DATA_RESEND_WAKEUP_TIMEOUT_MS, takion_send_buffer_check_pred_packets, send_buffer); + else // if not, wait without timeout, but also wakeup if packets become available + err = chiaki_cond_wait_pred(&send_buffer->cond, &send_buffer->mutex, takion_send_buffer_check_pred_no_packets, send_buffer); + + if(err != CHIAKI_ERR_SUCCESS && err != CHIAKI_ERR_TIMEOUT) + break; + + if(send_buffer->should_stop) + break; + + takion_send_buffer_resend(send_buffer); + } + + chiaki_mutex_unlock(&send_buffer->mutex); + + return NULL; +} + +static void takion_send_buffer_resend(ChiakiTakionSendBuffer *send_buffer) +{ + uint64_t now = chiaki_time_now_monotonic_ms(); + + for(size_t i=0; ipackets_count; i++) + { + ChiakiTakionSendBufferPacket *packet = &send_buffer->packets[i]; + if(now - packet->last_send_ms > TAKION_DATA_RESEND_TIMEOUT_MS) + { + CHIAKI_LOGI(send_buffer->log, "Takion Send Buffer re-sending packet with seqnum %#llx, tries: %llu\n", (unsigned long long)packet->seq_num, (unsigned long long)packet->tries); + packet->last_send_ms = now; + chiaki_takion_send_raw(send_buffer->takion, packet->buf, packet->buf_size); + packet->tries++; + // TODO: check tries and disconnect if necessary + } + } +} diff --git a/lib/src/time.c b/lib/src/time.c new file mode 100644 index 0000000..6ca5faa --- /dev/null +++ b/lib/src/time.c @@ -0,0 +1,27 @@ +/* + * This file is part of Chiaki. + * + * Chiaki is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Chiaki is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Chiaki. If not, see . + */ + +#include + +#include + +CHIAKI_EXPORT uint64_t chiaki_time_now_monotonic_ms() +{ + struct timespec time; + clock_gettime(CLOCK_MONOTONIC, &time); + return time.tv_sec * 1000 + time.tv_nsec / 1000000; +}