Move Takion Handshake to Thread

This commit is contained in:
Florian Märkl 2019-06-27 15:59:36 +02:00
commit 33f1f91cbe
No known key found for this signature in database
GPG key ID: 125BC8A5A6A1E857
5 changed files with 158 additions and 92 deletions

View file

@ -70,6 +70,8 @@ typedef struct chiaki_takion_congestion_packet_t
typedef enum {
CHIAKI_TAKION_EVENT_TYPE_CONNECTED,
CHIAKI_TAKION_EVENT_TYPE_DISCONNECT,
CHIAKI_TAKION_EVENT_TYPE_DATA,
CHIAKI_TAKION_EVENT_TYPE_AV
} ChiakiTakionEventType;

View file

@ -44,6 +44,9 @@ typedef struct senkusha_t
ChiakiMirai bang_mirai;
} Senkusha;
#define MIRAI_REQUEST_CONNECT 1
#define MIRAI_REQUEST_BANG 2
static void senkusha_takion_cb(ChiakiTakionEvent *event, void *user);
static void senkusha_takion_data(Senkusha *senkusha, ChiakiTakionMessageDataType data_type, uint8_t *buf, size_t buf_size);
@ -74,6 +77,9 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_senkusha_run(ChiakiSession *session)
takion_info.cb = senkusha_takion_cb;
takion_info.cb_user = &senkusha;
err = chiaki_mirai_request_begin(&senkusha.bang_mirai, MIRAI_REQUEST_CONNECT, true);
assert(err == CHIAKI_ERR_SUCCESS);
err = chiaki_takion_connect(&senkusha.takion, &takion_info);
free(takion_info.sa);
if(err != CHIAKI_ERR_SUCCESS)
@ -82,9 +88,22 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_senkusha_run(ChiakiSession *session)
goto error_bang_mirai;
}
err = chiaki_mirai_request_wait(&senkusha.bang_mirai, BIG_TIMEOUT_MS, false);
assert(err == CHIAKI_ERR_SUCCESS || err == CHIAKI_ERR_TIMEOUT);
if(!senkusha.bang_mirai.response)
{
if(err == CHIAKI_ERR_TIMEOUT)
CHIAKI_LOGE(&session->log, "Senkusha connect timeout\n");
CHIAKI_LOGE(&session->log, "Senkusha Takion connect failed\n");
err = CHIAKI_ERR_UNKNOWN;
goto error_takion;
}
CHIAKI_LOGI(&session->log, "Senkusha sending big\n");
err = chiaki_mirai_request_begin(&senkusha.bang_mirai, 1, true);
err = chiaki_mirai_request_begin(&senkusha.bang_mirai, MIRAI_REQUEST_BANG, true);
assert(err == CHIAKI_ERR_SUCCESS);
err = senkusha_send_big(&senkusha);
@ -127,6 +146,12 @@ static void senkusha_takion_cb(ChiakiTakionEvent *event, void *user)
Senkusha *senkusha = user;
switch(event->type)
{
case CHIAKI_TAKION_EVENT_TYPE_CONNECTED:
case CHIAKI_TAKION_EVENT_TYPE_DISCONNECT:
if(senkusha->bang_mirai.request == MIRAI_REQUEST_CONNECT)
{
chiaki_mirai_signal(&senkusha->bang_mirai, event->type == CHIAKI_TAKION_EVENT_TYPE_CONNECTED);
}
case CHIAKI_TAKION_EVENT_TYPE_DATA:
senkusha_takion_data(senkusha, event->data.data_type, event->data.buf, event->data.buf_size);
break;
@ -151,7 +176,7 @@ static void senkusha_takion_data(Senkusha *senkusha, ChiakiTakionMessageDataType
return;
}
if(senkusha->bang_mirai.request)
if(senkusha->bang_mirai.request == MIRAI_REQUEST_BANG)
{
if(msg.type != tkproto_TakionMessage_PayloadType_BANG || !msg.has_bang_payload)
{

View file

@ -158,12 +158,12 @@ static void *session_thread_func(void *arg)
CHIAKI_LOGI(&session->log, "Starting Senkusha\n");
err = chiaki_senkusha_run(session);
/* TODO err = chiaki_senkusha_run(session);
if(err != CHIAKI_ERR_SUCCESS)
{
CHIAKI_LOGE(&session->log, "Senkusha failed\n");
goto quit_ctrl;
}
}*/
// TODO: Senkusha should set that
session->mtu = 1454;

View file

@ -45,6 +45,7 @@
typedef enum {
STATE_IDLE,
STATE_TAKION_CONNECT,
STATE_EXPECT_BANG,
STATE_EXPECT_STREAMINFO
} StreamConnectionState;
@ -128,7 +129,12 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_stream_connection_run(ChiakiStreamConnectio
takion_info.cb = stream_connection_takion_cb;
takion_info.cb_user = stream_connection;
// TODO: make this call stoppable
err = chiaki_mutex_lock(&stream_connection->state_mutex);
assert(err == CHIAKI_ERR_SUCCESS);
stream_connection->state = STATE_TAKION_CONNECT;
stream_connection->state_finished = false;
stream_connection->state_failed = false;
err = chiaki_takion_connect(&stream_connection->takion, &takion_info);
free(takion_info.sa);
if(err != CHIAKI_ERR_SUCCESS)
@ -137,8 +143,13 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_stream_connection_run(ChiakiStreamConnectio
return err;
}
err = chiaki_mutex_lock(&stream_connection->state_mutex);
assert(err == CHIAKI_ERR_SUCCESS);
err = chiaki_cond_wait_pred(&stream_connection->state_cond, &stream_connection->state_mutex, state_finished_cond_check, stream_connection);
assert(err == CHIAKI_ERR_SUCCESS || err == CHIAKI_ERR_TIMEOUT);
if(err != CHIAKI_ERR_SUCCESS)
{
CHIAKI_LOGE(&session->log, "StreamConnection Takion connect failed\n");
goto close_takion;
}
CHIAKI_LOGI(&session->log, "StreamConnection sending big\n");
@ -233,6 +244,7 @@ disconnect:
chiaki_mutex_unlock(&stream_connection->state_mutex);
close_takion:
chiaki_takion_close(&stream_connection->takion);
CHIAKI_LOGI(&session->log, "StreamConnection closed takion\n");
@ -255,6 +267,17 @@ static void stream_connection_takion_cb(ChiakiTakionEvent *event, void *user)
ChiakiStreamConnection *stream_connection = user;
switch(event->type)
{
case CHIAKI_TAKION_EVENT_TYPE_CONNECTED:
case CHIAKI_TAKION_EVENT_TYPE_DISCONNECT:
chiaki_mutex_lock(&stream_connection->state_mutex);
if(stream_connection->state == STATE_TAKION_CONNECT)
{
stream_connection->state_finished = event->type == CHIAKI_TAKION_EVENT_TYPE_CONNECTED;
stream_connection->state_failed = event->type == CHIAKI_TAKION_EVENT_TYPE_DISCONNECT;
chiaki_cond_signal(&stream_connection->state_cond);
}
chiaki_mutex_unlock(&stream_connection->state_mutex);
break;
case CHIAKI_TAKION_EVENT_TYPE_DATA:
stream_connection_takion_data(stream_connection, event->data.data_type, event->data.buf, event->data.buf_size);
break;

View file

@ -175,7 +175,7 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_connect(ChiakiTakion *takion, Chiaki
if(err != CHIAKI_ERR_SUCCESS)
{
CHIAKI_LOGE(takion->log, "Takion failed to create stop pipe\n");
return CHIAKI_ERR_UNKNOWN;
return err;
}
takion->sock = socket(info->sa->sa_family, SOCK_DGRAM, IPPROTO_UDP);
@ -194,90 +194,6 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_connect(ChiakiTakion *takion, Chiaki
goto error_sock;
}
// INIT ->
TakionMessagePayloadInit init_payload;
init_payload.tag0 = takion->tag_local;
init_payload.something = TAKION_LOCAL_SOMETHING;
init_payload.min = TAKION_LOCAL_MIN;
init_payload.max = TAKION_LOCAL_MIN;
init_payload.tag1 = takion->tag_local;
err = takion_send_message_init(takion, &init_payload);
if(err != CHIAKI_ERR_SUCCESS)
{
CHIAKI_LOGE(takion->log, "Takion failed to send init\n");
ret = err;
goto error_sock;
}
CHIAKI_LOGI(takion->log, "Takion sent init\n");
// INIT_ACK <-
TakionMessagePayloadInitAck init_ack_payload;
err = takion_recv_message_init_ack(takion, &init_ack_payload);
if(err != CHIAKI_ERR_SUCCESS)
{
CHIAKI_LOGE(takion->log, "Takion failed to receive init ack\n");
ret = CHIAKI_ERR_UNKNOWN;
goto error_sock;
}
if(init_ack_payload.tag == 0)
{
CHIAKI_LOGE(takion->log, "Takion remote tag in init ack is 0\n");
ret = CHIAKI_ERR_INVALID_RESPONSE;
goto error_sock;
}
CHIAKI_LOGI(takion->log, "Takion received init ack with remote tag %#x, min: %#x, max: %#x\n",
init_ack_payload.tag, init_ack_payload.min, init_ack_payload.max);
takion->tag_remote = init_ack_payload.tag;
if(init_ack_payload.min == 0 || init_ack_payload.max == 0
|| init_ack_payload.min > TAKION_LOCAL_MAX
|| init_ack_payload.max < TAKION_LOCAL_MAX)
{
CHIAKI_LOGE(takion->log, "Takion min/max check failed\n");
ret = CHIAKI_ERR_INVALID_RESPONSE;
goto error_sock;
}
// COOKIE ->
err = takion_send_message_cookie(takion, init_ack_payload.cookie);
if(err != CHIAKI_ERR_SUCCESS)
{
CHIAKI_LOGE(takion->log, "Takion failed to send cookie\n");
ret = err;
goto error_sock;
}
CHIAKI_LOGI(takion->log, "Takion sent cookie\n");
// COOKIE_ACK <-
err = takion_recv_message_cookie_ack(takion);
if(err != CHIAKI_ERR_SUCCESS)
{
CHIAKI_LOGE(takion->log, "Takion failed to receive cookie ack\n");
ret = err;
goto error_sock;
}
CHIAKI_LOGI(takion->log, "Takion received cookie ack\n");
// done!
CHIAKI_LOGI(takion->log, "Takion connected\n");
err = chiaki_thread_create(&takion->thread, takion_thread_func, takion);
if(r != CHIAKI_ERR_SUCCESS)
{
@ -465,10 +381,104 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_congestion(ChiakiTakion *takion
return chiaki_takion_send_raw(takion, buf, sizeof(buf));
}
static ChiakiErrorCode takion_handshake(ChiakiTakion *takion)
{
ChiakiErrorCode err;
// INIT ->
TakionMessagePayloadInit init_payload;
init_payload.tag0 = takion->tag_local;
init_payload.something = TAKION_LOCAL_SOMETHING;
init_payload.min = TAKION_LOCAL_MIN;
init_payload.max = TAKION_LOCAL_MIN;
init_payload.tag1 = takion->tag_local;
err = takion_send_message_init(takion, &init_payload);
if(err != CHIAKI_ERR_SUCCESS)
{
CHIAKI_LOGE(takion->log, "Takion failed to send init\n");
return err;
}
CHIAKI_LOGI(takion->log, "Takion sent init\n");
// INIT_ACK <-
TakionMessagePayloadInitAck init_ack_payload;
err = takion_recv_message_init_ack(takion, &init_ack_payload);
if(err != CHIAKI_ERR_SUCCESS)
{
CHIAKI_LOGE(takion->log, "Takion failed to receive init ack\n");
return err;
}
if(init_ack_payload.tag == 0)
{
CHIAKI_LOGE(takion->log, "Takion remote tag in init ack is 0\n");
return CHIAKI_ERR_INVALID_RESPONSE;
}
CHIAKI_LOGI(takion->log, "Takion received init ack with remote tag %#x, min: %#x, max: %#x\n",
init_ack_payload.tag, init_ack_payload.min, init_ack_payload.max);
takion->tag_remote = init_ack_payload.tag;
if(init_ack_payload.min == 0 || init_ack_payload.max == 0
|| init_ack_payload.min > TAKION_LOCAL_MAX
|| init_ack_payload.max < TAKION_LOCAL_MAX)
{
CHIAKI_LOGE(takion->log, "Takion min/max check failed\n");
return CHIAKI_ERR_INVALID_RESPONSE;
}
// COOKIE ->
err = takion_send_message_cookie(takion, init_ack_payload.cookie);
if(err != CHIAKI_ERR_SUCCESS)
{
CHIAKI_LOGE(takion->log, "Takion failed to send cookie\n");
return err;
}
CHIAKI_LOGI(takion->log, "Takion sent cookie\n");
// COOKIE_ACK <-
err = takion_recv_message_cookie_ack(takion);
if(err != CHIAKI_ERR_SUCCESS)
{
CHIAKI_LOGE(takion->log, "Takion failed to receive cookie ack\n");
return err;
}
CHIAKI_LOGI(takion->log, "Takion received cookie ack\n");
// done!
CHIAKI_LOGI(takion->log, "Takion connected\n");
if(takion->cb)
{
ChiakiTakionEvent event = { 0 };
event.type = CHIAKI_TAKION_EVENT_TYPE_CONNECTED;
takion->cb(&event, takion->cb_user);
}
return CHIAKI_ERR_SUCCESS;
}
static void *takion_thread_func(void *user)
{
ChiakiTakion *takion = user;
if(takion_handshake(takion) != CHIAKI_ERR_SUCCESS)
goto beach;
// TODO ChiakiCongestionControl congestion_control;
// if(chiaki_congestion_control_start(&congestion_control, takion) != CHIAKI_ERR_SUCCESS)
// goto beach;
@ -486,6 +496,12 @@ static void *takion_thread_func(void *user)
// chiaki_congestion_control_stop(&congestion_control);
beach:
if(takion->cb)
{
ChiakiTakionEvent event = { 0 };
event.type = CHIAKI_TAKION_EVENT_TYPE_DISCONNECT;
takion->cb(&event, takion->cb_user);
}
close(takion->sock);
return NULL;
}