From 8f42cff06e455ac2ac21f4160c81efe1c425630d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florian=20M=C3=A4rkl?= Date: Sat, 24 Nov 2018 15:26:46 +0100 Subject: [PATCH] Senkusha finished excluding MTU and RTT tests --- lib/include/chiaki/thread.h | 2 + lib/src/senkusha.c | 115 ++++++++++++++++++++++++++++++++---- lib/src/session.c | 2 +- lib/src/takion.c | 20 +++---- lib/src/thread.c | 37 +++++++++++- 5 files changed, 153 insertions(+), 23 deletions(-) diff --git a/lib/include/chiaki/thread.h b/lib/include/chiaki/thread.h index d2a104e..5b6421b 100644 --- a/lib/include/chiaki/thread.h +++ b/lib/include/chiaki/thread.h @@ -24,6 +24,7 @@ extern "C" { #endif +#include #include typedef struct chiaki_thread_t @@ -57,6 +58,7 @@ typedef struct chiaki_cond_t CHIAKI_EXPORT ChiakiErrorCode chiaki_cond_init(ChiakiCond *cond); CHIAKI_EXPORT ChiakiErrorCode chiaki_cond_fini(ChiakiCond *cond); CHIAKI_EXPORT ChiakiErrorCode chiaki_cond_wait(ChiakiCond *cond, ChiakiMutex *mutex); +CHIAKI_EXPORT ChiakiErrorCode chiaki_cond_timedwait(ChiakiCond *cond, ChiakiMutex *mutex, uint64_t timeout_ms); CHIAKI_EXPORT ChiakiErrorCode chiaki_cond_signal(ChiakiCond *cond); CHIAKI_EXPORT ChiakiErrorCode chiaki_cond_broadcast(ChiakiCond *cond); diff --git a/lib/src/senkusha.c b/lib/src/senkusha.c index 8cb7910..3d5f8ea 100644 --- a/lib/src/senkusha.c +++ b/lib/src/senkusha.c @@ -29,26 +29,43 @@ #include #include +#include #include #define SENKUSHA_SOCKET 9297 +#define BIG_TIMEOUT_MS 5000 + typedef struct senkusha_t { ChiakiLog *log; ChiakiTakion takion; + + bool bang_expected; + bool bang_received; + ChiakiMutex bang_mutex; + ChiakiCond bang_cond; } Senkusha; static void senkusha_takion_data(uint8_t *buf, size_t buf_size, void *user); static ChiakiErrorCode senkusha_send_big(Senkusha *senkusha); +static ChiakiErrorCode senkusha_send_disconnect(Senkusha *senkusha); CHIAKI_EXPORT ChiakiErrorCode chiaki_senkusha_run(ChiakiSession *session) { Senkusha senkusha; senkusha.log = &session->log; + senkusha.bang_expected = false; + senkusha.bang_received = false; + ChiakiErrorCode err = chiaki_mutex_init(&senkusha.bang_mutex); + if(err != CHIAKI_ERR_SUCCESS) + return err; + err = chiaki_cond_init(&senkusha.bang_cond); + if(err != CHIAKI_ERR_SUCCESS) + goto error_bang_mutex; ChiakiTakionConnectInfo takion_info; takion_info.log = senkusha.log; @@ -57,7 +74,7 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_senkusha_run(ChiakiSession *session) if(!takion_info.sa) return CHIAKI_ERR_MEMORY; memcpy(takion_info.sa, session->connect_info.host_addrinfo_selected->ai_addr, takion_info.sa_len); - ChiakiErrorCode err = set_port(takion_info.sa, htons(SENKUSHA_SOCKET)); + err = set_port(takion_info.sa, htons(SENKUSHA_SOCKET)); assert(err == CHIAKI_ERR_SUCCESS); takion_info.data_cb = senkusha_takion_data; @@ -68,38 +85,87 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_senkusha_run(ChiakiSession *session) if(err != CHIAKI_ERR_SUCCESS) { CHIAKI_LOGE(&session->log, "Senkusha connect failed\n"); - return err; + goto error_bang_cond; } CHIAKI_LOGI(&session->log, "Senkusha sending big\n"); + err = chiaki_mutex_lock(&senkusha.bang_mutex); + assert(err == CHIAKI_ERR_SUCCESS); + + senkusha.bang_expected = true; + err = senkusha_send_big(&senkusha); if(err != CHIAKI_ERR_SUCCESS) { CHIAKI_LOGE(&session->log, "Senkusha failed to send big\n"); - return err; - // TODO: close takion + goto error_takion; } - while(true) - sleep(1); + err = chiaki_cond_timedwait(&senkusha.bang_cond, &senkusha.bang_mutex, BIG_TIMEOUT_MS); + senkusha.bang_expected = false; + chiaki_mutex_unlock(&senkusha.bang_mutex); - return CHIAKI_ERR_SUCCESS; + if(!senkusha.bang_received) + { + if(err == CHIAKI_ERR_TIMEOUT) + CHIAKI_LOGE(&session->log, "Senkusha bang receive timeout\n"); + + CHIAKI_LOGE(&session->log, "Senkusha didn't receive bang\n"); + err = CHIAKI_ERR_UNKNOWN; + goto error_takion; + } + + CHIAKI_LOGI(&session->log, "Senkusha successfully received bang\n"); + + CHIAKI_LOGI(&session->log, "Senkusha is disconnecting\n"); + + senkusha_send_disconnect(&senkusha); + + err = CHIAKI_ERR_SUCCESS; +error_takion: + chiaki_takion_close(&senkusha.takion); + CHIAKI_LOGI(&session->log, "Senkusha closed takion\n"); +error_bang_cond: + chiaki_cond_init(&senkusha.bang_cond); +error_bang_mutex: + chiaki_mutex_fini(&senkusha.bang_mutex); + return err; } static void senkusha_takion_data(uint8_t *buf, size_t buf_size, void *user) { Senkusha *senkusha = user; - CHIAKI_LOGD(senkusha->log, "Senkusha received data:\n"); - chiaki_log_hexdump(senkusha->log, CHIAKI_LOG_DEBUG, buf, buf_size); + tkproto_TakionMessage msg; + memset(&msg, 0, sizeof(msg)); + + pb_istream_t stream = pb_istream_from_buffer(buf, buf_size); + bool r = pb_decode(&stream, tkproto_TakionMessage_fields, &msg); + if(!r) + { + CHIAKI_LOGE(senkusha->log, "Senkusha failed to decode data protobuf\n"); + return; + } + + if(senkusha->bang_expected) + { + if(msg.type != tkproto_TakionMessage_PayloadType_BANG || !msg.has_bang_payload) + { + CHIAKI_LOGE(senkusha->log, "Senkusha expected bang payload but received something else\n"); + return; + } + chiaki_mutex_lock(&senkusha->bang_mutex); + senkusha->bang_received = true; + chiaki_cond_signal(&senkusha->bang_cond); + chiaki_mutex_unlock(&senkusha->bang_mutex); + } } static ChiakiErrorCode senkusha_send_big(Senkusha *senkusha) { tkproto_TakionMessage msg; memset(&msg, 0, sizeof(msg)); - CHIAKI_LOGD(senkusha->log, "sizeof(tkproto_TakionMessage) = %lu\n", sizeof(tkproto_TakionMessage)); msg.type = tkproto_TakionMessage_PayloadType_BIG; msg.has_big_payload = true; @@ -111,7 +177,7 @@ static ChiakiErrorCode senkusha_send_big(Senkusha *senkusha) msg.big_payload.encrypted_key.arg = ""; msg.big_payload.encrypted_key.funcs.encode = chiaki_pb_encode_string; - uint8_t buf[512]; + uint8_t buf[12]; size_t buf_size; pb_ostream_t stream = pb_ostream_from_buffer(buf, sizeof(buf)); @@ -128,4 +194,31 @@ static ChiakiErrorCode senkusha_send_big(Senkusha *senkusha) return err; } +static ChiakiErrorCode senkusha_send_disconnect(Senkusha *senkusha) +{ + tkproto_TakionMessage msg; + memset(&msg, 0, sizeof(msg)); + + msg.type = tkproto_TakionMessage_PayloadType_DISCONNECT; + msg.has_disconnect_payload = true; + msg.disconnect_payload.reason.arg = "Client Disconnecting"; + msg.disconnect_payload.reason.funcs.encode = chiaki_pb_encode_string; + + uint8_t buf[26]; + size_t buf_size; + + pb_ostream_t stream = pb_ostream_from_buffer(buf, sizeof(buf)); + bool pbr = pb_encode(&stream, tkproto_TakionMessage_fields, &msg); + if(!pbr) + { + CHIAKI_LOGE(senkusha->log, "Senkusha disconnect protobuf encoding failed\n"); + return CHIAKI_ERR_UNKNOWN; + } + + buf_size = stream.bytes_written; + ChiakiErrorCode err = chiaki_takion_send_message_data(&senkusha->takion, 0, 1, 1, buf, buf_size); + + return err; +} + diff --git a/lib/src/session.c b/lib/src/session.c index fe27f00..0e86651 100644 --- a/lib/src/session.c +++ b/lib/src/session.c @@ -164,7 +164,7 @@ static void *session_thread_func(void *arg) } else { - CHIAKI_LOGE(&session->log, "Senkusha completed successfully\n"); + CHIAKI_LOGI(&session->log, "Senkusha completed successfully\n"); } quit_ctrl: diff --git a/lib/src/takion.c b/lib/src/takion.c index 56571ea..795b293 100644 --- a/lib/src/takion.c +++ b/lib/src/takion.c @@ -87,6 +87,8 @@ typedef struct takion_message_payload_init_ack_t static void *takion_thread_func(void *user); static void takion_handle_packet(ChiakiTakion *takion, uint8_t *buf, size_t buf_size); static void takion_handle_packet_message(ChiakiTakion *takion, uint8_t *buf, size_t buf_size); +static void takion_handle_packet_message_data(ChiakiTakion *takion, uint8_t type_b, uint8_t *buf, size_t buf_size); +static void takion_handle_packet_message_data_ack(ChiakiTakion *takion, uint8_t type_b, uint8_t *buf, size_t buf_size); static ChiakiErrorCode takion_parse_message(ChiakiTakion *takion, uint8_t *buf, size_t buf_size, TakionMessage *msg); static void takion_write_message_header(uint8_t *buf, uint32_t tag, uint32_t key_pos, uint8_t type_a, uint8_t type_b, size_t payload_data_size); static ChiakiErrorCode takion_send_message_init(ChiakiTakion *takion, TakionMessagePayloadInit *payload); @@ -247,9 +249,9 @@ error_pipe: CHIAKI_EXPORT void chiaki_takion_close(ChiakiTakion *takion) { - write(takion->stop_pipe[0], "\x00", 1); + write(takion->stop_pipe[1], "\x00", 1); chiaki_thread_join(&takion->thread, NULL); - close(takion->stop_pipe[0]); + close(takion->stop_pipe[1]); } CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_raw(ChiakiTakion *takion, uint8_t *buf, size_t buf_size) @@ -319,7 +321,7 @@ static void *takion_thread_func(void *user) } close(takion->sock); - close(takion->stop_pipe[1]); + close(takion->stop_pipe[0]); return NULL; } @@ -329,20 +331,20 @@ static ChiakiErrorCode takion_recv(ChiakiTakion *takion, uint8_t *buf, size_t *b fd_set fds; FD_ZERO(&fds); FD_SET(takion->sock, &fds); - FD_SET(takion->stop_pipe[1], &fds); + FD_SET(takion->stop_pipe[0], &fds); int nfds = takion->sock; - if(takion->stop_pipe[1] > nfds) - nfds = takion->stop_pipe[1]; + if(takion->stop_pipe[0] > nfds) + nfds = takion->stop_pipe[0]; nfds++; - int r = select(nfds, &fds, NULL, NULL, NULL); + int r = select(nfds, &fds, NULL, NULL, timeout); if(r < 0) { CHIAKI_LOGE(takion->log, "Takion select failed: %s\n", strerror(errno)); return CHIAKI_ERR_UNKNOWN; } - if(FD_ISSET(takion->stop_pipe[1], &fds)) + if(FD_ISSET(takion->stop_pipe[0], &fds)) return CHIAKI_ERR_CANCELED; if(FD_ISSET(takion->sock, &fds)) @@ -384,8 +386,6 @@ static void takion_handle_packet(ChiakiTakion *takion, uint8_t *buf, size_t buf_ } } -static void takion_handle_packet_message_data(ChiakiTakion *takion, uint8_t type_b, uint8_t *buf, size_t buf_size); -static void takion_handle_packet_message_data_ack(ChiakiTakion *takion, uint8_t type_b, uint8_t *buf, size_t buf_size); static void takion_handle_packet_message(ChiakiTakion *takion, uint8_t *buf, size_t buf_size) { diff --git a/lib/src/thread.c b/lib/src/thread.c index c066738..4ab573c 100644 --- a/lib/src/thread.c +++ b/lib/src/thread.c @@ -86,9 +86,23 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_mutex_unlock(ChiakiMutex *mutex) CHIAKI_EXPORT ChiakiErrorCode chiaki_cond_init(ChiakiCond *cond) { - int r = pthread_cond_init(&cond->cond, NULL); + pthread_condattr_t attr; + int r = pthread_condattr_init(&attr); if(r != 0) return CHIAKI_ERR_UNKNOWN; + r = pthread_condattr_setclock(&attr, CLOCK_MONOTONIC); + if(r != 0) + { + pthread_condattr_destroy(&attr); + return CHIAKI_ERR_UNKNOWN; + } + r = pthread_cond_init(&cond->cond, &attr); + if(r != 0) + { + pthread_condattr_destroy(&attr); + return CHIAKI_ERR_UNKNOWN; + } + pthread_condattr_destroy(&attr); return CHIAKI_ERR_SUCCESS; } @@ -110,6 +124,27 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_cond_wait(ChiakiCond *cond, ChiakiMutex *mu return CHIAKI_ERR_SUCCESS; } +CHIAKI_EXPORT ChiakiErrorCode chiaki_cond_timedwait(ChiakiCond *cond, ChiakiMutex *mutex, uint64_t timeout_ms) +{ + struct timespec timeout; + clock_gettime(CLOCK_MONOTONIC, &timeout); + timeout.tv_sec += timeout_ms / 1000; + timeout.tv_nsec += (timeout_ms % 1000) * 1000000; + if(timeout.tv_nsec > 1000000000) + { + timeout.tv_sec += timeout.tv_nsec / 1000000000; + timeout.tv_nsec %= 1000000000; + } + int r = pthread_cond_timedwait(&cond->cond, &mutex->mutex, &timeout); + if(r != 0) + { + if(r == ETIMEDOUT) + return CHIAKI_ERR_TIMEOUT; + return CHIAKI_ERR_UNKNOWN; + } + return CHIAKI_ERR_SUCCESS; +} + CHIAKI_EXPORT ChiakiErrorCode chiaki_cond_signal(ChiakiCond *cond) { int r = pthread_cond_signal(&cond->cond);