Postpone non-data Takion packets until crypt is available

This commit is contained in:
Florian Märkl 2019-06-27 20:56:11 +02:00
commit a29560eb7f
No known key found for this signature in database
GPG key ID: 125BC8A5A6A1E857
2 changed files with 104 additions and 26 deletions

View file

@ -107,18 +107,29 @@ typedef struct chiaki_takion_connect_info_t
} ChiakiTakionConnectInfo;
typedef enum chiaki_takion_crypt_mode_t {
CHIAKI_TAKION_CRYPT_MODE_NO_CRYPT, // encryption disabled completely, for Senkusha
CHIAKI_TAKION_CRYPT_MODE_PRE_CRYPT, // encryption required, but not yet initialized (during handshake)
CHIAKI_TAKION_CRYPT_MODE_CRYPT // encryption required
} ChiakiTakionCryptMode;
typedef struct chiaki_takion_t
{
ChiakiLog *log;
ChiakiTakionCryptMode crypt_mode;
/**
* Whether encryption should be used.
*
* If false, encryption and MACs are disabled completely.
*
* If true, encryption and MACs will be used depending on whether gkcrypt_local and gkcrypt_remote are non-null, respectively.
* However, if gkcrypt_remote is null, only control data packets are passed to the callback and all other packets are postponed until
* gkcrypt_remote is set, so it has been set, so eventually all MACs will be checked.
*/
bool enable_crypt;
/**
* Array to be temporarily allocated when non-data packets come, enable_crypt is true, but gkcrypt_remote is NULL
* to not ignore any MACs in this period.
*/
struct chiaki_takion_postponed_packet_t *postponed_packets;
size_t postponed_packets_size;
size_t postponed_packets_count;
ChiakiGKCrypt *gkcrypt_local; // if NULL (default), no gmac is calculated and nothing is encrypted
size_t key_pos_local;
@ -145,6 +156,9 @@ typedef struct chiaki_takion_t
CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_connect(ChiakiTakion *takion, ChiakiTakionConnectInfo *info);
CHIAKI_EXPORT void chiaki_takion_close(ChiakiTakion *takion);
/**
* Must be called from within the Takion thread, i.e. inside the callback!
*/
static inline void chiaki_takion_set_crypt(ChiakiTakion *takion, ChiakiGKCrypt *gkcrypt_local, ChiakiGKCrypt *gkcrypt_remote) {
takion->gkcrypt_local = gkcrypt_local;
takion->gkcrypt_remote = gkcrypt_remote;

View file

@ -86,6 +86,8 @@ ssize_t takion_packet_type_key_pos_offset(TakionPacketType type)
#define TAKION_REORDER_QUEUE_SIZE_EXP 4 // => 16 entries
#define TAKION_POSTPONE_PACKETS_SIZE 32
#define MESSAGE_HEADER_SIZE 0x10
typedef enum takion_message_type_a_t {
@ -134,17 +136,26 @@ typedef struct takion_message_payload_init_ack_t
typedef struct
{
uint8_t *packet_buf;
size_t packet_size;
uint8_t type_b;
uint8_t *buf;
size_t buf_size;
uint8_t *payload; // inside packet_buf
size_t payload_size;
uint16_t channel;
} TakionDataPacketEntry;
typedef struct chiaki_takion_postponed_packet_t
{
uint8_t *buf;
size_t buf_size;
} ChiakiTakionPostponedPacket;
static void *takion_thread_func(void *user);
static void takion_handle_packet(ChiakiTakion *takion, uint8_t *buf, size_t buf_size);
static void takion_handle_packet_message(ChiakiTakion *takion, uint8_t *buf, size_t buf_size);
static void takion_handle_packet_message_data(ChiakiTakion *takion, uint8_t type_b, uint8_t *buf, size_t buf_size);
static void takion_handle_packet_message_data(ChiakiTakion *takion, uint8_t *packet_buf, size_t packet_buf_size, uint8_t type_b, uint8_t *payload, size_t payload_size);
static void takion_handle_packet_message_data_ack(ChiakiTakion *takion, uint8_t type_b, uint8_t *buf, size_t buf_size);
static ChiakiErrorCode takion_parse_message(ChiakiTakion *takion, uint8_t *buf, size_t buf_size, TakionMessage *msg);
static void takion_write_message_header(uint8_t *buf, uint32_t tag, uint32_t key_pos, uint8_t type_a, uint8_t type_b, size_t payload_data_size);
@ -177,7 +188,10 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_connect(ChiakiTakion *takion, Chiaki
takion->recv_timeout.tv_usec = 0;
takion->send_retries = 5;
takion->crypt_mode = info->enable_crypt ? CHIAKI_TAKION_CRYPT_MODE_PRE_CRYPT : CHIAKI_TAKION_CRYPT_MODE_NO_CRYPT;
takion->enable_crypt = info->enable_crypt;
takion->postponed_packets = NULL;
takion->postponed_packets_size = 0;
takion->postponed_packets_count = 0;
CHIAKI_LOGI(takion->log, "Takion connecting\n");
@ -484,7 +498,9 @@ static ChiakiErrorCode takion_handshake(ChiakiTakion *takion)
static void takion_data_drop(uint64_t seq_num, void *elem_user, void *cb_user)
{
free(elem_user);
TakionDataPacketEntry *entry = elem_user;
free(entry->packet_buf);
free(entry);
}
static void *takion_thread_func(void *user)
@ -505,6 +521,23 @@ static void *takion_thread_func(void *user)
while(true)
{
if(takion->postponed_packets && takion->gkcrypt_remote)
{
// there are some postponed packets that were waiting until crypt is initialized and it is now :-)
CHIAKI_LOGI(takion->log, "Takion flushing %llu postpone packet(s)\n", (unsigned long long)takion->postponed_packets_count);
for(size_t i=0; i<takion->postponed_packets_count; i++)
{
ChiakiTakionPostponedPacket *packet = &takion->postponed_packets[i];
takion_handle_packet(takion, packet->buf, packet->buf_size);
}
free(takion->postponed_packets);
takion->postponed_packets = NULL;
takion->postponed_packets_size = 0;
takion->postponed_packets_count = 0;
}
size_t received_size = 1500;
uint8_t *buf = malloc(received_size); // TODO: no malloc?
if(!buf)
@ -592,7 +625,33 @@ static ChiakiErrorCode takion_handle_packet_mac(ChiakiTakion *takion, uint8_t ba
return CHIAKI_ERR_SUCCESS;
}
static void takion_postpone_packet(ChiakiTakion *takion, uint8_t *buf, size_t buf_size)
{
if(!takion->postponed_packets)
{
takion->postponed_packets = calloc(TAKION_POSTPONE_PACKETS_SIZE, sizeof(ChiakiTakionPostponedPacket));
if(!takion->postponed_packets)
return;
takion->postponed_packets_size = TAKION_POSTPONE_PACKETS_SIZE;
takion->postponed_packets_count = 0;
}
if(takion->postponed_packets_count >= takion->postponed_packets_size)
{
CHIAKI_LOGE(takion->log, "Should postpone a packet, but there is no space left\n");
return;
}
CHIAKI_LOGI(takion->log, "Postpone packet of size %#llx\n", (unsigned long long)buf_size);
ChiakiTakionPostponedPacket *packet = &takion->postponed_packets[takion->postponed_packets_count++];
packet->buf = buf;
packet->buf_size = buf_size;
}
/**
* @param buf ownership of this buf is taken.
*/
static void takion_handle_packet(ChiakiTakion *takion, uint8_t *buf, size_t buf_size)
{
assert(buf_size > 0);
@ -611,8 +670,13 @@ static void takion_handle_packet(ChiakiTakion *takion, uint8_t *buf, size_t buf_
break;
case TAKION_PACKET_TYPE_VIDEO:
case TAKION_PACKET_TYPE_AUDIO:
takion_handle_packet_av(takion, base_type, buf, buf_size);
free(buf);
if(takion->enable_crypt && !takion->gkcrypt_remote)
takion_postpone_packet(takion, buf, buf_size);
else
{
takion_handle_packet_av(takion, base_type, buf, buf_size);
free(buf);
}
break;
default:
CHIAKI_LOGW(takion->log, "Takion packet with unknown type %#x received\n", base_type);
@ -639,7 +703,7 @@ static void takion_handle_packet_message(ChiakiTakion *takion, uint8_t *buf, siz
switch(msg.type_a)
{
case TAKION_MESSAGE_TYPE_A_DATA:
takion_handle_packet_message_data(takion, msg.type_b, msg.payload, msg.payload_size);
takion_handle_packet_message_data(takion, buf, buf_size, msg.type_b, msg.payload, msg.payload_size);
break;
case TAKION_MESSAGE_TYPE_A_DATA_ACK:
takion_handle_packet_message_data_ack(takion, msg.type_b, msg.payload, msg.payload_size);
@ -662,14 +726,14 @@ static void takion_flush_data_queue(ChiakiTakion *takion)
if(!pulled)
break;
if(entry->buf_size < 9)
if(entry->payload_size < 9)
{
free(entry);
continue;
}
uint16_t zero_a = *((uint16_t *)(entry->buf + 6));
uint8_t data_type = entry->buf[8]; // & 0xf
uint16_t zero_a = *((uint16_t *)(entry->payload + 6));
uint8_t data_type = entry->payload[8]; // & 0xf
if(zero_a != 0)
CHIAKI_LOGW(takion->log, "Takion received data with unexpected nonzero %#x at buf+6\n", zero_a);
@ -681,19 +745,19 @@ static void takion_flush_data_queue(ChiakiTakion *takion)
ChiakiTakionEvent event = { 0 };
event.type = CHIAKI_TAKION_EVENT_TYPE_DATA;
event.data.data_type = (ChiakiTakionMessageDataType)data_type;
event.data.buf = entry->buf + 9;
event.data.buf_size = entry->buf_size - 9;
event.data.buf = entry->payload + 9;
event.data.buf_size = (size_t)(entry->payload_size - 9);
takion->cb(&event, takion->cb_user);
}
}
}
static void takion_handle_packet_message_data(ChiakiTakion *takion, uint8_t type_b, uint8_t *buf, size_t buf_size)
static void takion_handle_packet_message_data(ChiakiTakion *takion, uint8_t *packet_buf, size_t packet_buf_size, uint8_t type_b, uint8_t *payload, size_t payload_size)
{
if(type_b != 1)
CHIAKI_LOGW(takion->log, "Takion received data with type_b = %#x (was expecting %#x)\n", type_b, 1);
if(buf_size < 9)
if(payload_size < 9)
{
CHIAKI_LOGE(takion->log, "Takion received data with a size less than the header size\n");
return;
@ -704,10 +768,10 @@ static void takion_handle_packet_message_data(ChiakiTakion *takion, uint8_t type
return;
entry->type_b = type_b;
entry->buf = buf;
entry->buf_size = buf_size;
entry->channel = ntohs(*((uint16_t *)(buf + 4)));
ChiakiSeqNum32 seq_num = ntohl(*((uint32_t *)(buf + 0)));
entry->payload = payload;
entry->payload_size = payload_size;
entry->channel = ntohs(*((uint16_t *)(payload + 4)));
ChiakiSeqNum32 seq_num = ntohl(*((uint32_t *)(payload + 0)));
chiaki_takion_send_message_data_ack(takion, 0, entry->channel, seq_num);
chiaki_reorder_queue_push(&takion->data_queue, seq_num, entry);