Add Senkusha MTU In Test

This commit is contained in:
Florian Märkl 2019-08-10 22:36:31 +02:00
commit ab740fbe58
No known key found for this signature in database
GPG key ID: 125BC8A5A6A1E857
4 changed files with 180 additions and 6 deletions

View file

@ -41,6 +41,7 @@ typedef struct senkusha_t
uint16_t ping_test_index;
uint16_t ping_index;
uint32_t ping_tag;
uint32_t mtu_id;
/**
* signaled on change of state_finished or should_stop

View file

@ -46,9 +46,11 @@ typedef enum {
STATE_EXPECT_BANG,
STATE_EXPECT_DATA_ACK,
STATE_EXPECT_PONG,
STATE_EXPECT_MTU
} SenkushaState;
static ChiakiErrorCode senkusha_run_ping_test(ChiakiSenkusha *senkusha, uint16_t ping_test_index, uint16_t ping_count);
static ChiakiErrorCode senkusha_run_mtu_in_test(ChiakiSenkusha *senkusha, uint32_t min, uint32_t max, uint32_t retries, uint64_t timeout_ms, uint32_t *mtu);
static void senkusha_takion_cb(ChiakiTakionEvent *event, void *user);
static void senkusha_takion_data(ChiakiSenkusha *senkusha, ChiakiTakionMessageDataType data_type, uint8_t *buf, size_t buf_size);
static void senkusha_takion_data_ack(ChiakiSenkusha *senkusha, ChiakiSeqNum32 seq_num);
@ -56,6 +58,9 @@ static void senkusha_takion_av(ChiakiSenkusha *senkusha, ChiakiTakionAVPacket *p
static ChiakiErrorCode senkusha_send_big(ChiakiSenkusha *senkusha);
static ChiakiErrorCode senkusha_send_disconnect(ChiakiSenkusha *senkusha);
static ChiakiErrorCode senkusha_send_echo_command(ChiakiSenkusha *senkusha, bool enable);
static ChiakiErrorCode senkusha_send_mtu_command(ChiakiSenkusha *senkusha, tkproto_SenkushaMtuCommand *command);
static ChiakiErrorCode senkusha_send_client_mtu_command(ChiakiSenkusha *senkusha, tkproto_SenkushaClientMtuCommand *command);
static ChiakiErrorCode senkusha_send_data_wait_for_ack(ChiakiSenkusha *senkusha, uint8_t *buf, size_t buf_size);
CHIAKI_EXPORT ChiakiErrorCode chiaki_senkusha_init(ChiakiSenkusha *senkusha, ChiakiSession *session)
{
@ -194,14 +199,28 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_senkusha_run(ChiakiSenkusha *senkusha)
CHIAKI_LOGI(session->log, "Senkusha successfully received bang");
senkusha_run_ping_test(senkusha, 0, SENKUSHA_PING_COUNT_DEFAULT);
err = senkusha_run_ping_test(senkusha, 0, SENKUSHA_PING_COUNT_DEFAULT);
if(err != CHIAKI_ERR_SUCCESS)
{
CHIAKI_LOGE(senkusha->log, "Senkusha Ping Test failed");
goto disconnect;
}
// TODO: timeout should be measured rtt * 5
uint32_t mtu_in;
err = senkusha_run_mtu_in_test(senkusha, 576, 1454, 3, 200, &mtu_in);
if(err != CHIAKI_ERR_SUCCESS)
{
CHIAKI_LOGE(senkusha->log, "Senkusha MTU test failed");
goto disconnect;
}
disconnect:
CHIAKI_LOGI(session->log, "Senkusha is disconnecting");
senkusha_send_disconnect(senkusha);
chiaki_mutex_unlock(&senkusha->state_mutex);
err = CHIAKI_ERR_SUCCESS;
quit_takion:
chiaki_takion_close(&senkusha->takion);
CHIAKI_LOGI(session->log, "Senkusha closed takion");
@ -295,6 +314,80 @@ static ChiakiErrorCode senkusha_run_ping_test(ChiakiSenkusha *senkusha, uint16_t
return CHIAKI_ERR_SUCCESS;
}
static ChiakiErrorCode senkusha_run_mtu_in_test(ChiakiSenkusha *senkusha, uint32_t min, uint32_t max, uint32_t retries, uint64_t timeout_ms, uint32_t *mtu)
{
CHIAKI_LOGI(senkusha->log, "Senkusha starting MTU test with min %u, max %u, retries %u, timeout %llu ms",
(unsigned int)min, (unsigned int)max, (unsigned int)retries, (unsigned long long)timeout_ms);
uint32_t cur = max;
uint32_t request_id = 0;
while(max > min)
{
bool success = false;
for(uint32_t attempt=0; attempt<retries; attempt++)
{
senkusha->state = STATE_EXPECT_MTU;
senkusha->state_finished = false;
senkusha->mtu_id = ++request_id;
tkproto_SenkushaMtuCommand mtu_cmd;
mtu_cmd.id = request_id;
mtu_cmd.mtu_req = cur;
mtu_cmd.num = 1;
ChiakiErrorCode err = senkusha_send_mtu_command(senkusha, &mtu_cmd);
if(err != CHIAKI_ERR_SUCCESS)
{
CHIAKI_LOGE(senkusha->log, "Senkusha failed to send MTU command");
return err;
}
CHIAKI_LOGI(senkusha->log, "Senkusha MTU request %u (min %u, max %u), id %u, attempt %u",
(unsigned int)cur, (unsigned int)min, (unsigned int)max, (unsigned int)request_id, (unsigned int)attempt);
err = chiaki_cond_timedwait_pred(&senkusha->state_cond, &senkusha->state_mutex, timeout_ms, state_finished_cond_check, senkusha);
assert(err == CHIAKI_ERR_SUCCESS || err == CHIAKI_ERR_TIMEOUT);
if(!senkusha->state_finished)
{
if(err == CHIAKI_ERR_TIMEOUT)
{
CHIAKI_LOGI(senkusha->log, "Senkusha MTU %u timeout", (unsigned int)cur);
continue;
}
if(senkusha->should_stop)
return CHIAKI_ERR_CANCELED;
else
CHIAKI_LOGE(senkusha->log, "Senkusha failed to receive MTU response");
}
CHIAKI_LOGI(senkusha->log, "Senkusha MTU %u success", (unsigned int)cur);
success = true;
break;
}
if(success)
min = cur + 1;
else
max = cur - 1;
cur = min + (max - min) / 2;
}
CHIAKI_LOGI(senkusha->log, "Senkusha determined MTU %u", (unsigned int)max);
*mtu = max;
/*tkproto_SenkushaClientMtuCommand client_mtu_cmd;
client_mtu_cmd.id = 2;
client_mtu_cmd.state = false;
client_mtu_cmd.mtu_req = 1454;
client_mtu_cmd.has_mtu_down = true;
client_mtu_cmd.mtu_down = 1454;
ChiakiErrorCode err = senkusha_send_client_mtu_command(senkusha, &client_mtu_cmd);
CHIAKI_LOGD(senkusha->log, "MTU result: %d\n", err);*/
return CHIAKI_ERR_SUCCESS;
}
static void senkusha_takion_cb(ChiakiTakionEvent *event, void *user)
{
ChiakiSenkusha *senkusha = user;
@ -401,6 +494,25 @@ static void senkusha_takion_av(ChiakiSenkusha *senkusha, ChiakiTakionAVPacket *p
chiaki_cond_signal(&senkusha->state_cond);
return;
}
else if(senkusha->state == STATE_EXPECT_MTU)
{
//CHIAKI_LOGD(senkusha->log, "Senkusha received av while expecting mtu");
//chiaki_log_hexdump(senkusha->log, CHIAKI_LOG_DEBUG, packet->data, packet->data_size);
//CHIAKI_LOGD(senkusha->log, "packet index: %u, frame index: %u, unit index: %u, units in frame: %u", packet->packet_index, packet->frame_index, packet->unit_index, packet->units_in_frame_total);
if(!packet->is_video
|| packet->frame_index != senkusha->mtu_id)
{
CHIAKI_LOGW(senkusha->log, "Senkusha received invalid MTU response %u, size: %#llx, is video: %d",
(unsigned int)packet->frame_index, (unsigned long long)packet->data_size, packet->is_video ? 1 : 0);
goto beach;
}
senkusha->state_finished = true;
chiaki_mutex_unlock(&senkusha->state_mutex);
chiaki_cond_signal(&senkusha->state_cond);
return;
}
beach:
chiaki_mutex_unlock(&senkusha->state_mutex);
@ -477,8 +589,6 @@ static ChiakiErrorCode senkusha_send_echo_command(ChiakiSenkusha *senkusha, bool
msg.senkusha_payload.echo_command.state = enable;
uint8_t buf[0x10];
size_t buf_size;
pb_ostream_t stream = pb_ostream_from_buffer(buf, sizeof(buf));
bool pbr = pb_encode(&stream, tkproto_TakionMessage_fields, &msg);
if(!pbr)
@ -487,11 +597,61 @@ static ChiakiErrorCode senkusha_send_echo_command(ChiakiSenkusha *senkusha, bool
return CHIAKI_ERR_UNKNOWN;
}
buf_size = stream.bytes_written;
return senkusha_send_data_wait_for_ack(senkusha, buf, stream.bytes_written);
}
static ChiakiErrorCode senkusha_send_mtu_command(ChiakiSenkusha *senkusha, tkproto_SenkushaMtuCommand *command)
{
tkproto_TakionMessage msg;
memset(&msg, 0, sizeof(msg));
msg.type = tkproto_TakionMessage_PayloadType_SENKUSHA;
msg.has_senkusha_payload = true;
msg.senkusha_payload.command = tkproto_SenkushaPayload_Command_MTU_COMMAND;
msg.senkusha_payload.has_mtu_command = true;
memcpy(&msg.senkusha_payload.mtu_command, command, sizeof(msg.senkusha_payload.mtu_command));
uint8_t buf[0x20];
pb_ostream_t stream = pb_ostream_from_buffer(buf, sizeof(buf));
bool pbr = pb_encode(&stream, tkproto_TakionMessage_fields, &msg);
if(!pbr)
{
CHIAKI_LOGE(senkusha->log, "Senkusha mtu command protobuf encoding failed");
return CHIAKI_ERR_UNKNOWN;
}
return chiaki_takion_send_message_data(&senkusha->takion, 1, 8, buf, stream.bytes_written, NULL);
}
static ChiakiErrorCode senkusha_send_client_mtu_command(ChiakiSenkusha *senkusha, tkproto_SenkushaClientMtuCommand *command)
{
tkproto_TakionMessage msg;
memset(&msg, 0, sizeof(msg));
msg.type = tkproto_TakionMessage_PayloadType_SENKUSHA;
msg.has_senkusha_payload = true;
msg.senkusha_payload.command = tkproto_SenkushaPayload_Command_CLIENT_MTU_COMMAND;
msg.senkusha_payload.has_client_mtu_command = true;
memcpy(&msg.senkusha_payload.client_mtu_command, command, sizeof(msg.senkusha_payload.client_mtu_command));
uint8_t buf[0x20];
pb_ostream_t stream = pb_ostream_from_buffer(buf, sizeof(buf));
bool pbr = pb_encode(&stream, tkproto_TakionMessage_fields, &msg);
if(!pbr)
{
CHIAKI_LOGE(senkusha->log, "Senkusha client mtu command protobuf encoding failed");
return CHIAKI_ERR_UNKNOWN;
}
return senkusha_send_data_wait_for_ack(senkusha, buf, stream.bytes_written);
}
static ChiakiErrorCode senkusha_send_data_wait_for_ack(ChiakiSenkusha *senkusha, uint8_t *buf, size_t buf_size)
{
senkusha->state = STATE_EXPECT_DATA_ACK;
senkusha->state_finished = false;
senkusha->state_failed = false;
ChiakiErrorCode err = chiaki_takion_send_message_data(&senkusha->takion, 1, 1, buf, buf_size, &senkusha->data_ack_seq_num_expected);
ChiakiErrorCode err = chiaki_takion_send_message_data(&senkusha->takion, 1, 8, buf, buf_size, &senkusha->data_ack_seq_num_expected);
if(err != CHIAKI_ERR_SUCCESS)
{
CHIAKI_LOGE(senkusha->log, "Senkusha failed to send echo command");
@ -514,3 +674,4 @@ static ChiakiErrorCode senkusha_send_echo_command(ChiakiSenkusha *senkusha, bool
return err;
}

View file

@ -470,6 +470,7 @@ static bool session_thread_request_session(ChiakiSession *session)
set_port(sa, htons(SESSION_PORT));
// TODO: this can block, make cancelable somehow
int r = getnameinfo(sa, ai->ai_addrlen, session->connect_info.hostname, sizeof(session->connect_info.hostname), NULL, 0, 0);
if(r != 0)
{

View file

@ -27,6 +27,8 @@
#include <string.h>
#include <assert.h>
#include <netinet/ip.h>
// VERY similar to SCTP, see RFC 4960
@ -239,6 +241,15 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_connect(ChiakiTakion *takion, Chiaki
goto error_sock;
}
const int mtu_discover_val = IP_PMTUDISC_DO;
r = setsockopt(takion->sock, IPPROTO_IP, IP_MTU_DISCOVER, &mtu_discover_val, sizeof(mtu_discover_val));
if(r < 0)
{
CHIAKI_LOGE(takion->log, "Takion failed to setsockopt IP_MTU_DISCOVER: %s", strerror(errno));
ret = CHIAKI_ERR_NETWORK;
goto error_sock;
}
r = connect(takion->sock, info->sa, info->sa_len);
if(r < 0)
{