From 33f1f91cbed3be2c7986cf77ba700c54b06d6072 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florian=20M=C3=A4rkl?= Date: Thu, 27 Jun 2019 15:59:36 +0200 Subject: [PATCH] Move Takion Handshake to Thread --- lib/include/chiaki/takion.h | 2 + lib/src/senkusha.c | 29 +++++- lib/src/session.c | 4 +- lib/src/streamconnection.c | 29 +++++- lib/src/takion.c | 186 ++++++++++++++++++++---------------- 5 files changed, 158 insertions(+), 92 deletions(-) diff --git a/lib/include/chiaki/takion.h b/lib/include/chiaki/takion.h index 0a600b7..e66ebd7 100644 --- a/lib/include/chiaki/takion.h +++ b/lib/include/chiaki/takion.h @@ -70,6 +70,8 @@ typedef struct chiaki_takion_congestion_packet_t typedef enum { + CHIAKI_TAKION_EVENT_TYPE_CONNECTED, + CHIAKI_TAKION_EVENT_TYPE_DISCONNECT, CHIAKI_TAKION_EVENT_TYPE_DATA, CHIAKI_TAKION_EVENT_TYPE_AV } ChiakiTakionEventType; diff --git a/lib/src/senkusha.c b/lib/src/senkusha.c index fe6c6fd..84c3712 100644 --- a/lib/src/senkusha.c +++ b/lib/src/senkusha.c @@ -44,6 +44,9 @@ typedef struct senkusha_t ChiakiMirai bang_mirai; } Senkusha; +#define MIRAI_REQUEST_CONNECT 1 +#define MIRAI_REQUEST_BANG 2 + static void senkusha_takion_cb(ChiakiTakionEvent *event, void *user); static void senkusha_takion_data(Senkusha *senkusha, ChiakiTakionMessageDataType data_type, uint8_t *buf, size_t buf_size); @@ -74,6 +77,9 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_senkusha_run(ChiakiSession *session) takion_info.cb = senkusha_takion_cb; takion_info.cb_user = &senkusha; + err = chiaki_mirai_request_begin(&senkusha.bang_mirai, MIRAI_REQUEST_CONNECT, true); + assert(err == CHIAKI_ERR_SUCCESS); + err = chiaki_takion_connect(&senkusha.takion, &takion_info); free(takion_info.sa); if(err != CHIAKI_ERR_SUCCESS) @@ -82,9 +88,22 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_senkusha_run(ChiakiSession *session) goto error_bang_mirai; } + err = chiaki_mirai_request_wait(&senkusha.bang_mirai, BIG_TIMEOUT_MS, false); + assert(err == CHIAKI_ERR_SUCCESS || err == CHIAKI_ERR_TIMEOUT); + + if(!senkusha.bang_mirai.response) + { + if(err == CHIAKI_ERR_TIMEOUT) + CHIAKI_LOGE(&session->log, "Senkusha connect timeout\n"); + + CHIAKI_LOGE(&session->log, "Senkusha Takion connect failed\n"); + err = CHIAKI_ERR_UNKNOWN; + goto error_takion; + } + CHIAKI_LOGI(&session->log, "Senkusha sending big\n"); - err = chiaki_mirai_request_begin(&senkusha.bang_mirai, 1, true); + err = chiaki_mirai_request_begin(&senkusha.bang_mirai, MIRAI_REQUEST_BANG, true); assert(err == CHIAKI_ERR_SUCCESS); err = senkusha_send_big(&senkusha); @@ -127,6 +146,12 @@ static void senkusha_takion_cb(ChiakiTakionEvent *event, void *user) Senkusha *senkusha = user; switch(event->type) { + case CHIAKI_TAKION_EVENT_TYPE_CONNECTED: + case CHIAKI_TAKION_EVENT_TYPE_DISCONNECT: + if(senkusha->bang_mirai.request == MIRAI_REQUEST_CONNECT) + { + chiaki_mirai_signal(&senkusha->bang_mirai, event->type == CHIAKI_TAKION_EVENT_TYPE_CONNECTED); + } case CHIAKI_TAKION_EVENT_TYPE_DATA: senkusha_takion_data(senkusha, event->data.data_type, event->data.buf, event->data.buf_size); break; @@ -151,7 +176,7 @@ static void senkusha_takion_data(Senkusha *senkusha, ChiakiTakionMessageDataType return; } - if(senkusha->bang_mirai.request) + if(senkusha->bang_mirai.request == MIRAI_REQUEST_BANG) { if(msg.type != tkproto_TakionMessage_PayloadType_BANG || !msg.has_bang_payload) { diff --git a/lib/src/session.c b/lib/src/session.c index 51cb4ad..cfb14e7 100644 --- a/lib/src/session.c +++ b/lib/src/session.c @@ -158,12 +158,12 @@ static void *session_thread_func(void *arg) CHIAKI_LOGI(&session->log, "Starting Senkusha\n"); - err = chiaki_senkusha_run(session); + /* TODO err = chiaki_senkusha_run(session); if(err != CHIAKI_ERR_SUCCESS) { CHIAKI_LOGE(&session->log, "Senkusha failed\n"); goto quit_ctrl; - } + }*/ // TODO: Senkusha should set that session->mtu = 1454; diff --git a/lib/src/streamconnection.c b/lib/src/streamconnection.c index 72f6655..1643bbb 100644 --- a/lib/src/streamconnection.c +++ b/lib/src/streamconnection.c @@ -45,6 +45,7 @@ typedef enum { STATE_IDLE, + STATE_TAKION_CONNECT, STATE_EXPECT_BANG, STATE_EXPECT_STREAMINFO } StreamConnectionState; @@ -128,7 +129,12 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_stream_connection_run(ChiakiStreamConnectio takion_info.cb = stream_connection_takion_cb; takion_info.cb_user = stream_connection; - // TODO: make this call stoppable + err = chiaki_mutex_lock(&stream_connection->state_mutex); + assert(err == CHIAKI_ERR_SUCCESS); + + stream_connection->state = STATE_TAKION_CONNECT; + stream_connection->state_finished = false; + stream_connection->state_failed = false; err = chiaki_takion_connect(&stream_connection->takion, &takion_info); free(takion_info.sa); if(err != CHIAKI_ERR_SUCCESS) @@ -137,8 +143,13 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_stream_connection_run(ChiakiStreamConnectio return err; } - err = chiaki_mutex_lock(&stream_connection->state_mutex); - assert(err == CHIAKI_ERR_SUCCESS); + err = chiaki_cond_wait_pred(&stream_connection->state_cond, &stream_connection->state_mutex, state_finished_cond_check, stream_connection); + assert(err == CHIAKI_ERR_SUCCESS || err == CHIAKI_ERR_TIMEOUT); + if(err != CHIAKI_ERR_SUCCESS) + { + CHIAKI_LOGE(&session->log, "StreamConnection Takion connect failed\n"); + goto close_takion; + } CHIAKI_LOGI(&session->log, "StreamConnection sending big\n"); @@ -233,6 +244,7 @@ disconnect: chiaki_mutex_unlock(&stream_connection->state_mutex); +close_takion: chiaki_takion_close(&stream_connection->takion); CHIAKI_LOGI(&session->log, "StreamConnection closed takion\n"); @@ -255,6 +267,17 @@ static void stream_connection_takion_cb(ChiakiTakionEvent *event, void *user) ChiakiStreamConnection *stream_connection = user; switch(event->type) { + case CHIAKI_TAKION_EVENT_TYPE_CONNECTED: + case CHIAKI_TAKION_EVENT_TYPE_DISCONNECT: + chiaki_mutex_lock(&stream_connection->state_mutex); + if(stream_connection->state == STATE_TAKION_CONNECT) + { + stream_connection->state_finished = event->type == CHIAKI_TAKION_EVENT_TYPE_CONNECTED; + stream_connection->state_failed = event->type == CHIAKI_TAKION_EVENT_TYPE_DISCONNECT; + chiaki_cond_signal(&stream_connection->state_cond); + } + chiaki_mutex_unlock(&stream_connection->state_mutex); + break; case CHIAKI_TAKION_EVENT_TYPE_DATA: stream_connection_takion_data(stream_connection, event->data.data_type, event->data.buf, event->data.buf_size); break; diff --git a/lib/src/takion.c b/lib/src/takion.c index d9d22e6..f1056fd 100644 --- a/lib/src/takion.c +++ b/lib/src/takion.c @@ -175,7 +175,7 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_connect(ChiakiTakion *takion, Chiaki if(err != CHIAKI_ERR_SUCCESS) { CHIAKI_LOGE(takion->log, "Takion failed to create stop pipe\n"); - return CHIAKI_ERR_UNKNOWN; + return err; } takion->sock = socket(info->sa->sa_family, SOCK_DGRAM, IPPROTO_UDP); @@ -194,90 +194,6 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_connect(ChiakiTakion *takion, Chiaki goto error_sock; } - - // INIT -> - - TakionMessagePayloadInit init_payload; - init_payload.tag0 = takion->tag_local; - init_payload.something = TAKION_LOCAL_SOMETHING; - init_payload.min = TAKION_LOCAL_MIN; - init_payload.max = TAKION_LOCAL_MIN; - init_payload.tag1 = takion->tag_local; - err = takion_send_message_init(takion, &init_payload); - if(err != CHIAKI_ERR_SUCCESS) - { - CHIAKI_LOGE(takion->log, "Takion failed to send init\n"); - ret = err; - goto error_sock; - } - - CHIAKI_LOGI(takion->log, "Takion sent init\n"); - - - // INIT_ACK <- - - TakionMessagePayloadInitAck init_ack_payload; - err = takion_recv_message_init_ack(takion, &init_ack_payload); - if(err != CHIAKI_ERR_SUCCESS) - { - CHIAKI_LOGE(takion->log, "Takion failed to receive init ack\n"); - ret = CHIAKI_ERR_UNKNOWN; - goto error_sock; - } - - if(init_ack_payload.tag == 0) - { - CHIAKI_LOGE(takion->log, "Takion remote tag in init ack is 0\n"); - ret = CHIAKI_ERR_INVALID_RESPONSE; - goto error_sock; - } - - CHIAKI_LOGI(takion->log, "Takion received init ack with remote tag %#x, min: %#x, max: %#x\n", - init_ack_payload.tag, init_ack_payload.min, init_ack_payload.max); - - takion->tag_remote = init_ack_payload.tag; - - if(init_ack_payload.min == 0 || init_ack_payload.max == 0 - || init_ack_payload.min > TAKION_LOCAL_MAX - || init_ack_payload.max < TAKION_LOCAL_MAX) - { - CHIAKI_LOGE(takion->log, "Takion min/max check failed\n"); - ret = CHIAKI_ERR_INVALID_RESPONSE; - goto error_sock; - } - - - - // COOKIE -> - - err = takion_send_message_cookie(takion, init_ack_payload.cookie); - if(err != CHIAKI_ERR_SUCCESS) - { - CHIAKI_LOGE(takion->log, "Takion failed to send cookie\n"); - ret = err; - goto error_sock; - } - - CHIAKI_LOGI(takion->log, "Takion sent cookie\n"); - - - // COOKIE_ACK <- - - err = takion_recv_message_cookie_ack(takion); - if(err != CHIAKI_ERR_SUCCESS) - { - CHIAKI_LOGE(takion->log, "Takion failed to receive cookie ack\n"); - ret = err; - goto error_sock; - } - - CHIAKI_LOGI(takion->log, "Takion received cookie ack\n"); - - - // done! - - CHIAKI_LOGI(takion->log, "Takion connected\n"); - err = chiaki_thread_create(&takion->thread, takion_thread_func, takion); if(r != CHIAKI_ERR_SUCCESS) { @@ -465,10 +381,104 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_congestion(ChiakiTakion *takion return chiaki_takion_send_raw(takion, buf, sizeof(buf)); } +static ChiakiErrorCode takion_handshake(ChiakiTakion *takion) +{ + ChiakiErrorCode err; + + // INIT -> + + TakionMessagePayloadInit init_payload; + init_payload.tag0 = takion->tag_local; + init_payload.something = TAKION_LOCAL_SOMETHING; + init_payload.min = TAKION_LOCAL_MIN; + init_payload.max = TAKION_LOCAL_MIN; + init_payload.tag1 = takion->tag_local; + err = takion_send_message_init(takion, &init_payload); + if(err != CHIAKI_ERR_SUCCESS) + { + CHIAKI_LOGE(takion->log, "Takion failed to send init\n"); + return err; + } + + CHIAKI_LOGI(takion->log, "Takion sent init\n"); + + + // INIT_ACK <- + + TakionMessagePayloadInitAck init_ack_payload; + err = takion_recv_message_init_ack(takion, &init_ack_payload); + if(err != CHIAKI_ERR_SUCCESS) + { + CHIAKI_LOGE(takion->log, "Takion failed to receive init ack\n"); + return err; + } + + if(init_ack_payload.tag == 0) + { + CHIAKI_LOGE(takion->log, "Takion remote tag in init ack is 0\n"); + return CHIAKI_ERR_INVALID_RESPONSE; + } + + CHIAKI_LOGI(takion->log, "Takion received init ack with remote tag %#x, min: %#x, max: %#x\n", + init_ack_payload.tag, init_ack_payload.min, init_ack_payload.max); + + takion->tag_remote = init_ack_payload.tag; + + if(init_ack_payload.min == 0 || init_ack_payload.max == 0 + || init_ack_payload.min > TAKION_LOCAL_MAX + || init_ack_payload.max < TAKION_LOCAL_MAX) + { + CHIAKI_LOGE(takion->log, "Takion min/max check failed\n"); + return CHIAKI_ERR_INVALID_RESPONSE; + } + + + + // COOKIE -> + + err = takion_send_message_cookie(takion, init_ack_payload.cookie); + if(err != CHIAKI_ERR_SUCCESS) + { + CHIAKI_LOGE(takion->log, "Takion failed to send cookie\n"); + return err; + } + + CHIAKI_LOGI(takion->log, "Takion sent cookie\n"); + + + // COOKIE_ACK <- + + err = takion_recv_message_cookie_ack(takion); + if(err != CHIAKI_ERR_SUCCESS) + { + CHIAKI_LOGE(takion->log, "Takion failed to receive cookie ack\n"); + return err; + } + + CHIAKI_LOGI(takion->log, "Takion received cookie ack\n"); + + + // done! + + CHIAKI_LOGI(takion->log, "Takion connected\n"); + + if(takion->cb) + { + ChiakiTakionEvent event = { 0 }; + event.type = CHIAKI_TAKION_EVENT_TYPE_CONNECTED; + takion->cb(&event, takion->cb_user); + } + + return CHIAKI_ERR_SUCCESS; +} + static void *takion_thread_func(void *user) { ChiakiTakion *takion = user; + if(takion_handshake(takion) != CHIAKI_ERR_SUCCESS) + goto beach; + // TODO ChiakiCongestionControl congestion_control; // if(chiaki_congestion_control_start(&congestion_control, takion) != CHIAKI_ERR_SUCCESS) // goto beach; @@ -486,6 +496,12 @@ static void *takion_thread_func(void *user) // chiaki_congestion_control_stop(&congestion_control); beach: + if(takion->cb) + { + ChiakiTakionEvent event = { 0 }; + event.type = CHIAKI_TAKION_EVENT_TYPE_DISCONNECT; + takion->cb(&event, takion->cb_user); + } close(takion->sock); return NULL; }