From ab740fbe589a15379776d3d0018ae4ae8a469633 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florian=20M=C3=A4rkl?= Date: Sat, 10 Aug 2019 22:36:31 +0200 Subject: [PATCH] Add Senkusha MTU In Test --- lib/include/chiaki/senkusha.h | 1 + lib/src/senkusha.c | 173 ++++++++++++++++++++++++++++++++-- lib/src/session.c | 1 + lib/src/takion.c | 11 +++ 4 files changed, 180 insertions(+), 6 deletions(-) diff --git a/lib/include/chiaki/senkusha.h b/lib/include/chiaki/senkusha.h index c4137a1..a31f2bc 100644 --- a/lib/include/chiaki/senkusha.h +++ b/lib/include/chiaki/senkusha.h @@ -41,6 +41,7 @@ typedef struct senkusha_t uint16_t ping_test_index; uint16_t ping_index; uint32_t ping_tag; + uint32_t mtu_id; /** * signaled on change of state_finished or should_stop diff --git a/lib/src/senkusha.c b/lib/src/senkusha.c index df2260c..5a5572b 100644 --- a/lib/src/senkusha.c +++ b/lib/src/senkusha.c @@ -46,9 +46,11 @@ typedef enum { STATE_EXPECT_BANG, STATE_EXPECT_DATA_ACK, STATE_EXPECT_PONG, + STATE_EXPECT_MTU } SenkushaState; static ChiakiErrorCode senkusha_run_ping_test(ChiakiSenkusha *senkusha, uint16_t ping_test_index, uint16_t ping_count); +static ChiakiErrorCode senkusha_run_mtu_in_test(ChiakiSenkusha *senkusha, uint32_t min, uint32_t max, uint32_t retries, uint64_t timeout_ms, uint32_t *mtu); static void senkusha_takion_cb(ChiakiTakionEvent *event, void *user); static void senkusha_takion_data(ChiakiSenkusha *senkusha, ChiakiTakionMessageDataType data_type, uint8_t *buf, size_t buf_size); static void senkusha_takion_data_ack(ChiakiSenkusha *senkusha, ChiakiSeqNum32 seq_num); @@ -56,6 +58,9 @@ static void senkusha_takion_av(ChiakiSenkusha *senkusha, ChiakiTakionAVPacket *p static ChiakiErrorCode senkusha_send_big(ChiakiSenkusha *senkusha); static ChiakiErrorCode senkusha_send_disconnect(ChiakiSenkusha *senkusha); static ChiakiErrorCode senkusha_send_echo_command(ChiakiSenkusha *senkusha, bool enable); +static ChiakiErrorCode senkusha_send_mtu_command(ChiakiSenkusha *senkusha, tkproto_SenkushaMtuCommand *command); +static ChiakiErrorCode senkusha_send_client_mtu_command(ChiakiSenkusha *senkusha, tkproto_SenkushaClientMtuCommand *command); +static ChiakiErrorCode senkusha_send_data_wait_for_ack(ChiakiSenkusha *senkusha, uint8_t *buf, size_t buf_size); CHIAKI_EXPORT ChiakiErrorCode chiaki_senkusha_init(ChiakiSenkusha *senkusha, ChiakiSession *session) { @@ -194,14 +199,28 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_senkusha_run(ChiakiSenkusha *senkusha) CHIAKI_LOGI(session->log, "Senkusha successfully received bang"); - senkusha_run_ping_test(senkusha, 0, SENKUSHA_PING_COUNT_DEFAULT); + err = senkusha_run_ping_test(senkusha, 0, SENKUSHA_PING_COUNT_DEFAULT); + if(err != CHIAKI_ERR_SUCCESS) + { + CHIAKI_LOGE(senkusha->log, "Senkusha Ping Test failed"); + goto disconnect; + } + // TODO: timeout should be measured rtt * 5 + uint32_t mtu_in; + err = senkusha_run_mtu_in_test(senkusha, 576, 1454, 3, 200, &mtu_in); + if(err != CHIAKI_ERR_SUCCESS) + { + CHIAKI_LOGE(senkusha->log, "Senkusha MTU test failed"); + goto disconnect; + } + +disconnect: CHIAKI_LOGI(session->log, "Senkusha is disconnecting"); senkusha_send_disconnect(senkusha); chiaki_mutex_unlock(&senkusha->state_mutex); - err = CHIAKI_ERR_SUCCESS; quit_takion: chiaki_takion_close(&senkusha->takion); CHIAKI_LOGI(session->log, "Senkusha closed takion"); @@ -295,6 +314,80 @@ static ChiakiErrorCode senkusha_run_ping_test(ChiakiSenkusha *senkusha, uint16_t return CHIAKI_ERR_SUCCESS; } +static ChiakiErrorCode senkusha_run_mtu_in_test(ChiakiSenkusha *senkusha, uint32_t min, uint32_t max, uint32_t retries, uint64_t timeout_ms, uint32_t *mtu) +{ + CHIAKI_LOGI(senkusha->log, "Senkusha starting MTU test with min %u, max %u, retries %u, timeout %llu ms", + (unsigned int)min, (unsigned int)max, (unsigned int)retries, (unsigned long long)timeout_ms); + + uint32_t cur = max; + uint32_t request_id = 0; + while(max > min) + { + bool success = false; + for(uint32_t attempt=0; attemptstate = STATE_EXPECT_MTU; + senkusha->state_finished = false; + senkusha->mtu_id = ++request_id; + + tkproto_SenkushaMtuCommand mtu_cmd; + mtu_cmd.id = request_id; + mtu_cmd.mtu_req = cur; + mtu_cmd.num = 1; + ChiakiErrorCode err = senkusha_send_mtu_command(senkusha, &mtu_cmd); + if(err != CHIAKI_ERR_SUCCESS) + { + CHIAKI_LOGE(senkusha->log, "Senkusha failed to send MTU command"); + return err; + } + + CHIAKI_LOGI(senkusha->log, "Senkusha MTU request %u (min %u, max %u), id %u, attempt %u", + (unsigned int)cur, (unsigned int)min, (unsigned int)max, (unsigned int)request_id, (unsigned int)attempt); + + err = chiaki_cond_timedwait_pred(&senkusha->state_cond, &senkusha->state_mutex, timeout_ms, state_finished_cond_check, senkusha); + assert(err == CHIAKI_ERR_SUCCESS || err == CHIAKI_ERR_TIMEOUT); + + if(!senkusha->state_finished) + { + if(err == CHIAKI_ERR_TIMEOUT) + { + CHIAKI_LOGI(senkusha->log, "Senkusha MTU %u timeout", (unsigned int)cur); + continue; + } + + if(senkusha->should_stop) + return CHIAKI_ERR_CANCELED; + else + CHIAKI_LOGE(senkusha->log, "Senkusha failed to receive MTU response"); + } + + CHIAKI_LOGI(senkusha->log, "Senkusha MTU %u success", (unsigned int)cur); + success = true; + break; + } + + if(success) + min = cur + 1; + else + max = cur - 1; + cur = min + (max - min) / 2; + } + + CHIAKI_LOGI(senkusha->log, "Senkusha determined MTU %u", (unsigned int)max); + *mtu = max; + + /*tkproto_SenkushaClientMtuCommand client_mtu_cmd; + client_mtu_cmd.id = 2; + client_mtu_cmd.state = false; + client_mtu_cmd.mtu_req = 1454; + client_mtu_cmd.has_mtu_down = true; + client_mtu_cmd.mtu_down = 1454; + ChiakiErrorCode err = senkusha_send_client_mtu_command(senkusha, &client_mtu_cmd); + CHIAKI_LOGD(senkusha->log, "MTU result: %d\n", err);*/ + + return CHIAKI_ERR_SUCCESS; +} + static void senkusha_takion_cb(ChiakiTakionEvent *event, void *user) { ChiakiSenkusha *senkusha = user; @@ -401,6 +494,25 @@ static void senkusha_takion_av(ChiakiSenkusha *senkusha, ChiakiTakionAVPacket *p chiaki_cond_signal(&senkusha->state_cond); return; } + else if(senkusha->state == STATE_EXPECT_MTU) + { + //CHIAKI_LOGD(senkusha->log, "Senkusha received av while expecting mtu"); + //chiaki_log_hexdump(senkusha->log, CHIAKI_LOG_DEBUG, packet->data, packet->data_size); + //CHIAKI_LOGD(senkusha->log, "packet index: %u, frame index: %u, unit index: %u, units in frame: %u", packet->packet_index, packet->frame_index, packet->unit_index, packet->units_in_frame_total); + + if(!packet->is_video + || packet->frame_index != senkusha->mtu_id) + { + CHIAKI_LOGW(senkusha->log, "Senkusha received invalid MTU response %u, size: %#llx, is video: %d", + (unsigned int)packet->frame_index, (unsigned long long)packet->data_size, packet->is_video ? 1 : 0); + goto beach; + } + + senkusha->state_finished = true; + chiaki_mutex_unlock(&senkusha->state_mutex); + chiaki_cond_signal(&senkusha->state_cond); + return; + } beach: chiaki_mutex_unlock(&senkusha->state_mutex); @@ -477,8 +589,6 @@ static ChiakiErrorCode senkusha_send_echo_command(ChiakiSenkusha *senkusha, bool msg.senkusha_payload.echo_command.state = enable; uint8_t buf[0x10]; - size_t buf_size; - pb_ostream_t stream = pb_ostream_from_buffer(buf, sizeof(buf)); bool pbr = pb_encode(&stream, tkproto_TakionMessage_fields, &msg); if(!pbr) @@ -487,11 +597,61 @@ static ChiakiErrorCode senkusha_send_echo_command(ChiakiSenkusha *senkusha, bool return CHIAKI_ERR_UNKNOWN; } - buf_size = stream.bytes_written; + return senkusha_send_data_wait_for_ack(senkusha, buf, stream.bytes_written); +} + +static ChiakiErrorCode senkusha_send_mtu_command(ChiakiSenkusha *senkusha, tkproto_SenkushaMtuCommand *command) +{ + tkproto_TakionMessage msg; + memset(&msg, 0, sizeof(msg)); + + msg.type = tkproto_TakionMessage_PayloadType_SENKUSHA; + msg.has_senkusha_payload = true; + msg.senkusha_payload.command = tkproto_SenkushaPayload_Command_MTU_COMMAND; + msg.senkusha_payload.has_mtu_command = true; + memcpy(&msg.senkusha_payload.mtu_command, command, sizeof(msg.senkusha_payload.mtu_command)); + + uint8_t buf[0x20]; + pb_ostream_t stream = pb_ostream_from_buffer(buf, sizeof(buf)); + bool pbr = pb_encode(&stream, tkproto_TakionMessage_fields, &msg); + if(!pbr) + { + CHIAKI_LOGE(senkusha->log, "Senkusha mtu command protobuf encoding failed"); + return CHIAKI_ERR_UNKNOWN; + } + + return chiaki_takion_send_message_data(&senkusha->takion, 1, 8, buf, stream.bytes_written, NULL); +} + +static ChiakiErrorCode senkusha_send_client_mtu_command(ChiakiSenkusha *senkusha, tkproto_SenkushaClientMtuCommand *command) +{ + tkproto_TakionMessage msg; + memset(&msg, 0, sizeof(msg)); + + msg.type = tkproto_TakionMessage_PayloadType_SENKUSHA; + msg.has_senkusha_payload = true; + msg.senkusha_payload.command = tkproto_SenkushaPayload_Command_CLIENT_MTU_COMMAND; + msg.senkusha_payload.has_client_mtu_command = true; + memcpy(&msg.senkusha_payload.client_mtu_command, command, sizeof(msg.senkusha_payload.client_mtu_command)); + + uint8_t buf[0x20]; + pb_ostream_t stream = pb_ostream_from_buffer(buf, sizeof(buf)); + bool pbr = pb_encode(&stream, tkproto_TakionMessage_fields, &msg); + if(!pbr) + { + CHIAKI_LOGE(senkusha->log, "Senkusha client mtu command protobuf encoding failed"); + return CHIAKI_ERR_UNKNOWN; + } + + return senkusha_send_data_wait_for_ack(senkusha, buf, stream.bytes_written); +} + +static ChiakiErrorCode senkusha_send_data_wait_for_ack(ChiakiSenkusha *senkusha, uint8_t *buf, size_t buf_size) +{ senkusha->state = STATE_EXPECT_DATA_ACK; senkusha->state_finished = false; senkusha->state_failed = false; - ChiakiErrorCode err = chiaki_takion_send_message_data(&senkusha->takion, 1, 1, buf, buf_size, &senkusha->data_ack_seq_num_expected); + ChiakiErrorCode err = chiaki_takion_send_message_data(&senkusha->takion, 1, 8, buf, buf_size, &senkusha->data_ack_seq_num_expected); if(err != CHIAKI_ERR_SUCCESS) { CHIAKI_LOGE(senkusha->log, "Senkusha failed to send echo command"); @@ -514,3 +674,4 @@ static ChiakiErrorCode senkusha_send_echo_command(ChiakiSenkusha *senkusha, bool return err; } + diff --git a/lib/src/session.c b/lib/src/session.c index 655382d..231036c 100644 --- a/lib/src/session.c +++ b/lib/src/session.c @@ -470,6 +470,7 @@ static bool session_thread_request_session(ChiakiSession *session) set_port(sa, htons(SESSION_PORT)); + // TODO: this can block, make cancelable somehow int r = getnameinfo(sa, ai->ai_addrlen, session->connect_info.hostname, sizeof(session->connect_info.hostname), NULL, 0, 0); if(r != 0) { diff --git a/lib/src/takion.c b/lib/src/takion.c index ac776fa..ffcf89f 100644 --- a/lib/src/takion.c +++ b/lib/src/takion.c @@ -27,6 +27,8 @@ #include #include +#include + // VERY similar to SCTP, see RFC 4960 @@ -239,6 +241,15 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_connect(ChiakiTakion *takion, Chiaki goto error_sock; } + const int mtu_discover_val = IP_PMTUDISC_DO; + r = setsockopt(takion->sock, IPPROTO_IP, IP_MTU_DISCOVER, &mtu_discover_val, sizeof(mtu_discover_val)); + if(r < 0) + { + CHIAKI_LOGE(takion->log, "Takion failed to setsockopt IP_MTU_DISCOVER: %s", strerror(errno)); + ret = CHIAKI_ERR_NETWORK; + goto error_sock; + } + r = connect(takion->sock, info->sa, info->sa_len); if(r < 0) {