diff --git a/lib/include/chiaki/congestioncontrol.h b/lib/include/chiaki/congestioncontrol.h index 39c2b85..424396a 100644 --- a/lib/include/chiaki/congestioncontrol.h +++ b/lib/include/chiaki/congestioncontrol.h @@ -33,6 +33,10 @@ typedef struct chiaki_congestion_control_t } ChiakiCongestionControl; CHIAKI_EXPORT ChiakiErrorCode chiaki_congestion_control_start(ChiakiCongestionControl *control, ChiakiTakion *takion); + +/** + * Stop control and join the thread + */ CHIAKI_EXPORT ChiakiErrorCode chiaki_congestion_control_stop(ChiakiCongestionControl *control); #ifdef __cplusplus diff --git a/lib/include/chiaki/streamconnection.h b/lib/include/chiaki/streamconnection.h index cf86a50..6f630a6 100644 --- a/lib/include/chiaki/streamconnection.h +++ b/lib/include/chiaki/streamconnection.h @@ -30,19 +30,45 @@ extern "C" { #endif +typedef struct chiaki_session_t ChiakiSession; + typedef struct chiaki_stream_connection_t { struct chiaki_session_t *session; ChiakiLog *log; ChiakiTakion takion; - ChiakiMirai mirai; uint8_t *ecdh_secret; ChiakiGKCrypt *gkcrypt_local; ChiakiGKCrypt *gkcrypt_remote; - ChiakiBoolPredCond stop_cond; + + /** + * signaled on change of state_finished or should_stop + */ + ChiakiCond state_cond; + + /** + * protects state, state_finished, state_failed and should_stop + */ + ChiakiMutex state_mutex; + + int state; + bool state_finished; + bool state_failed; + bool should_stop; } ChiakiStreamConnection; -CHIAKI_EXPORT ChiakiErrorCode chiaki_stream_connection_run(struct chiaki_session_t *session); +CHIAKI_EXPORT ChiakiErrorCode chiaki_stream_connection_init(ChiakiStreamConnection *stream_connection, ChiakiSession *session); +CHIAKI_EXPORT void chiaki_stream_connection_fini(ChiakiStreamConnection *stream_connection); + +/** + * Run stream_connection synchronously + */ +CHIAKI_EXPORT ChiakiErrorCode chiaki_stream_connection_run(ChiakiStreamConnection *stream_connection); + +/** + * To be called from a thread other than the one chiaki_stream_connection_run() is running on to stop stream_connection + */ +CHIAKI_EXPORT ChiakiErrorCode chiaki_stream_connection_stop(ChiakiStreamConnection *stream_connection); #ifdef __cplusplus } diff --git a/lib/src/session.c b/lib/src/session.c index 54855a0..51cb4ad 100644 --- a/lib/src/session.c +++ b/lib/src/session.c @@ -199,14 +199,24 @@ static void *session_thread_func(void *arg) goto quit_audio_receiver; } - err = chiaki_stream_connection_run(session); + err = chiaki_stream_connection_init(&session->stream_connection, session); if(err != CHIAKI_ERR_SUCCESS) { - CHIAKI_LOGE(&session->log, "Nagare failed\n"); + CHIAKI_LOGE(&session->log, "StreamConnection init failed\n"); goto quit_video_receiver; } - CHIAKI_LOGI(&session->log, "Nagare completed successfully\n"); + err = chiaki_stream_connection_run(&session->stream_connection); + if(err != CHIAKI_ERR_SUCCESS) + { + CHIAKI_LOGE(&session->log, "StreamConnection run failed\n"); + goto quit_stream_connection; + } + + CHIAKI_LOGI(&session->log, "StreamConnection completed successfully\n"); + +quit_stream_connection: + chiaki_stream_connection_fini(&session->stream_connection); quit_video_receiver: chiaki_video_receiver_free(session->video_receiver); diff --git a/lib/src/streamconnection.c b/lib/src/streamconnection.c index ab13b15..0918ea2 100644 --- a/lib/src/streamconnection.c +++ b/lib/src/streamconnection.c @@ -44,53 +44,82 @@ typedef enum { - STREAM_CONNECTION_MIRAI_REQUEST_BANG = 0, - STREAM_CONNECTION_MIRAI_REQUEST_STREAMINFO, -} StreamConnectionMiraiRequest; - -typedef enum { - STREAM_CONNECTION_MIRAI_RESPONSE_FAIL = 0, - STREAM_CONNECTION_MIRAI_RESPONSE_SUCCESS = 1 -} StreamConnectionMiraiResponse; - + STATE_IDLE, + STATE_EXPECT_BANG, + STATE_EXPECT_STREAMINFO +} StreamConnectionState; static void stream_connection_takion_data(ChiakiTakionMessageDataType data_type, uint8_t *buf, size_t buf_size, void *user); static ChiakiErrorCode stream_connection_send_big(ChiakiStreamConnection *stream_connection); static ChiakiErrorCode stream_connection_send_disconnect(ChiakiStreamConnection *stream_connection); +static void stream_connection_takion_data_idle(ChiakiStreamConnection *stream_connection, uint8_t *buf, size_t buf_size); static void stream_connection_takion_data_expect_bang(ChiakiStreamConnection *stream_connection, uint8_t *buf, size_t buf_size); static void stream_connection_takion_data_expect_streaminfo(ChiakiStreamConnection *stream_connection, uint8_t *buf, size_t buf_size); static ChiakiErrorCode stream_connection_send_streaminfo_ack(ChiakiStreamConnection *stream_connection); static void stream_connection_takion_av(ChiakiTakionAVPacket *packet, void *user); -static ChiakiErrorCode stream_connection_takion_mac(uint8_t *buf, size_t buf_size, size_t key_pos, uint8_t *mac_out, void *user); static ChiakiErrorCode stream_connection_send_heartbeat(ChiakiStreamConnection *stream_connection); -CHIAKI_EXPORT ChiakiErrorCode chiaki_stream_connection_run(ChiakiSession *session) +CHIAKI_EXPORT ChiakiErrorCode chiaki_stream_connection_init(ChiakiStreamConnection *stream_connection, ChiakiSession *session) { - ChiakiStreamConnection *stream_connection = &session->stream_connection; stream_connection->session = session; stream_connection->log = &session->log; stream_connection->ecdh_secret = NULL; + stream_connection->gkcrypt_remote = NULL; + stream_connection->gkcrypt_local = NULL; - ChiakiErrorCode err = chiaki_bool_pred_cond_init(&stream_connection->stop_cond); + ChiakiErrorCode err = chiaki_mutex_init(&stream_connection->state_mutex); if(err != CHIAKI_ERR_SUCCESS) goto error; - err = chiaki_mirai_init(&stream_connection->mirai); + err = chiaki_cond_init(&stream_connection->state_cond); if(err != CHIAKI_ERR_SUCCESS) - goto error_stop_cond; + goto error_state_mutex; + + stream_connection->state = STATE_IDLE; + stream_connection->state_finished = false; + stream_connection->state_failed = false; + stream_connection->should_stop = false; + + return CHIAKI_ERR_SUCCESS; + +error_state_mutex: + chiaki_mutex_fini(&stream_connection->state_mutex); +error: + return err; +} + +CHIAKI_EXPORT void chiaki_stream_connection_fini(ChiakiStreamConnection *stream_connection) +{ + chiaki_gkcrypt_free(stream_connection->gkcrypt_remote); + chiaki_gkcrypt_free(stream_connection->gkcrypt_local); + + free(stream_connection->ecdh_secret); + + chiaki_cond_fini(&stream_connection->state_cond); + chiaki_mutex_fini(&stream_connection->state_mutex); +} + + +bool state_finished_cond_check(void *user) +{ + ChiakiStreamConnection *stream_connection = user; + return stream_connection->state_finished || stream_connection->should_stop; +} + +CHIAKI_EXPORT ChiakiErrorCode chiaki_stream_connection_run(ChiakiStreamConnection *stream_connection) +{ + ChiakiSession *session = stream_connection->session; + ChiakiErrorCode err; ChiakiTakionConnectInfo takion_info; takion_info.log = stream_connection->log; takion_info.sa_len = session->connect_info.host_addrinfo_selected->ai_addrlen; takion_info.sa = malloc(takion_info.sa_len); if(!takion_info.sa) - { - err = CHIAKI_ERR_MEMORY; - goto error_mirai; - } + return CHIAKI_ERR_MEMORY; memcpy(takion_info.sa, session->connect_info.host_addrinfo_selected->ai_addr, takion_info.sa_len); err = set_port(takion_info.sa, htons(STREAM_CONNECTION_PORT)); assert(err == CHIAKI_ERR_SUCCESS); @@ -100,39 +129,41 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_stream_connection_run(ChiakiSession *sessio takion_info.av_cb = stream_connection_takion_av; takion_info.av_cb_user = stream_connection; + // TODO: make this call stoppable err = chiaki_takion_connect(&stream_connection->takion, &takion_info); free(takion_info.sa); if(err != CHIAKI_ERR_SUCCESS) { CHIAKI_LOGE(&session->log, "StreamConnection connect failed\n"); - goto error_mirai; + return err; } + err = chiaki_mutex_lock(&stream_connection->state_mutex); + assert(err == CHIAKI_ERR_SUCCESS); + CHIAKI_LOGI(&session->log, "StreamConnection sending big\n"); - err = chiaki_mirai_request_begin(&stream_connection->mirai, STREAM_CONNECTION_MIRAI_REQUEST_BANG, true); - assert(err == CHIAKI_ERR_SUCCESS); - + stream_connection->state = STATE_EXPECT_BANG; + stream_connection->state_finished = false; + stream_connection->state_failed = false; err = stream_connection_send_big(stream_connection); if(err != CHIAKI_ERR_SUCCESS) { CHIAKI_LOGE(&session->log, "StreamConnection failed to send big\n"); - goto error_takion; + goto disconnect; } - err = chiaki_mirai_request_wait(&stream_connection->mirai, EXPECT_TIMEOUT_MS, true); + err = chiaki_cond_timedwait_pred(&stream_connection->state_cond, &stream_connection->state_mutex, EXPECT_TIMEOUT_MS, state_finished_cond_check, stream_connection); assert(err == CHIAKI_ERR_SUCCESS || err == CHIAKI_ERR_TIMEOUT); - if(stream_connection->mirai.response != STREAM_CONNECTION_MIRAI_RESPONSE_SUCCESS) + if(!stream_connection->state_finished) { if(err == CHIAKI_ERR_TIMEOUT) CHIAKI_LOGE(&session->log, "StreamConnection bang receive timeout\n"); - chiaki_mirai_request_unlock(&stream_connection->mirai); - CHIAKI_LOGE(&session->log, "StreamConnection didn't receive bang\n"); err = CHIAKI_ERR_UNKNOWN; - goto error_takion; + goto disconnect; } CHIAKI_LOGI(&session->log, "StreamConnection successfully received bang\n"); @@ -142,13 +173,13 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_stream_connection_run(ChiakiSession *sessio if(!stream_connection->gkcrypt_local) { CHIAKI_LOGE(&session->log, "StreamConnection failed to initialize GKCrypt with index 2\n"); - goto error_takion; + goto disconnect; } stream_connection->gkcrypt_remote = chiaki_gkcrypt_new(&session->log, 0 /* TODO */, 3, session->handshake_key, stream_connection->ecdh_secret); if(!stream_connection->gkcrypt_remote) { CHIAKI_LOGE(&session->log, "StreamConnection failed to initialize GKCrypt with index 3\n"); - goto error_gkcrypt_a; + goto disconnect; } // TODO: IMPORTANT!!! @@ -158,31 +189,30 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_stream_connection_run(ChiakiSession *sessio // Also, access to the gkcrypts must be synchronized for key_pos and everything. chiaki_takion_set_crypt(&stream_connection->takion, stream_connection->gkcrypt_local, stream_connection->gkcrypt_remote); - err = chiaki_mirai_request_begin(&stream_connection->mirai, STREAM_CONNECTION_MIRAI_REQUEST_STREAMINFO, false); - assert(err == CHIAKI_ERR_SUCCESS); - err = chiaki_mirai_request_wait(&stream_connection->mirai, EXPECT_TIMEOUT_MS, false); + stream_connection->state = STATE_EXPECT_STREAMINFO; + stream_connection->state_finished = false; + stream_connection->state_failed = false; + err = chiaki_cond_timedwait_pred(&stream_connection->state_cond, &stream_connection->state_mutex, EXPECT_TIMEOUT_MS, state_finished_cond_check, stream_connection); assert(err == CHIAKI_ERR_SUCCESS || err == CHIAKI_ERR_TIMEOUT); - if(stream_connection->mirai.response != STREAM_CONNECTION_MIRAI_RESPONSE_SUCCESS) + if(!stream_connection->state_finished) { if(err == CHIAKI_ERR_TIMEOUT) CHIAKI_LOGE(&session->log, "StreamConnection streaminfo receive timeout\n"); - chiaki_mirai_request_unlock(&stream_connection->mirai); - CHIAKI_LOGE(&session->log, "StreamConnection didn't receive streaminfo\n"); err = CHIAKI_ERR_UNKNOWN; - goto error_takion; + goto disconnect; } CHIAKI_LOGI(&session->log, "StreamConnection successfully received streaminfo\n"); - err = chiaki_bool_pred_cond_lock(&stream_connection->stop_cond); - assert(err == CHIAKI_ERR_SUCCESS); - + stream_connection->state = STATE_IDLE; + stream_connection->state_finished = false; + stream_connection->state_failed = false; while(true) { - err = chiaki_bool_pred_cond_timedwait(&stream_connection->stop_cond, HEARTBEAT_INTERVAL_MS); + err = chiaki_cond_timedwait_pred(&stream_connection->state_cond, &stream_connection->state_mutex, HEARTBEAT_INTERVAL_MS, state_finished_cond_check, stream_connection); if(err != CHIAKI_ERR_TIMEOUT) break; @@ -193,33 +223,33 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_stream_connection_run(ChiakiSession *sessio CHIAKI_LOGI(stream_connection->log, "StreamConnection sent heartbeat\n"); } - err = chiaki_bool_pred_cond_unlock(&stream_connection->stop_cond); - assert(err == CHIAKI_ERR_SUCCESS); - - CHIAKI_LOGI(&session->log, "StreamConnection is disconnecting\n"); - - stream_connection_send_disconnect(stream_connection); - err = CHIAKI_ERR_SUCCESS; - // TODO: can't roll everything back like this, takion has to be closed first always. - chiaki_gkcrypt_free(stream_connection->gkcrypt_remote); -error_gkcrypt_a: - chiaki_gkcrypt_free(stream_connection->gkcrypt_local); -error_takion: +disconnect: + CHIAKI_LOGI(&session->log, "StreamConnection is disconnecting\n"); + stream_connection_send_disconnect(stream_connection); + + if(stream_connection->should_stop) + CHIAKI_LOGI(stream_connection->log, "StreamConnection was requested to stop\n"); + + chiaki_mutex_unlock(&stream_connection->state_mutex); + chiaki_takion_close(&stream_connection->takion); CHIAKI_LOGI(&session->log, "StreamConnection closed takion\n"); -error_mirai: - chiaki_mirai_fini(&stream_connection->mirai); -error_stop_cond: - chiaki_bool_pred_cond_fini(&stream_connection->stop_cond); -error: - free(stream_connection->ecdh_secret); + return err; - - } +CHIAKI_EXPORT ChiakiErrorCode chiaki_stream_connection_stop(ChiakiStreamConnection *stream_connection) +{ + ChiakiErrorCode err = chiaki_mutex_lock(&stream_connection->state_mutex); + if(err != CHIAKI_ERR_SUCCESS) + return err; + stream_connection->should_stop = true; + ChiakiErrorCode unlock_err = chiaki_mutex_unlock(&stream_connection->state_mutex); + err = chiaki_cond_signal(&stream_connection->state_cond); + return err == CHIAKI_ERR_SUCCESS ? unlock_err : err; +} static void stream_connection_takion_data(ChiakiTakionMessageDataType data_type, uint8_t *buf, size_t buf_size, void *user) @@ -229,18 +259,25 @@ static void stream_connection_takion_data(ChiakiTakionMessageDataType data_type, ChiakiStreamConnection *stream_connection = user; - switch(stream_connection->mirai.request) + chiaki_mutex_lock(&stream_connection->state_mutex); + switch(stream_connection->state) { - case STREAM_CONNECTION_MIRAI_REQUEST_BANG: + case STATE_EXPECT_BANG: stream_connection_takion_data_expect_bang(stream_connection, buf, buf_size); - return; - case STREAM_CONNECTION_MIRAI_REQUEST_STREAMINFO: + break; + case STATE_EXPECT_STREAMINFO: stream_connection_takion_data_expect_streaminfo(stream_connection, buf, buf_size); - return; - default: + break; + default: // STATE_IDLE + stream_connection_takion_data_idle(stream_connection, buf, buf_size); break; } + chiaki_mutex_unlock(&stream_connection->state_mutex); +} + +static void stream_connection_takion_data_idle(ChiakiStreamConnection *stream_connection, uint8_t *buf, size_t buf_size) +{ tkproto_TakionMessage msg; memset(&msg, 0, sizeof(msg)); @@ -286,6 +323,8 @@ static void stream_connection_takion_data_expect_bang(ChiakiStreamConnection *st return; } + CHIAKI_LOGD(stream_connection->log, "Got a bang\n"); + if(!msg.bang_payload.version_accepted) { CHIAKI_LOGE(stream_connection->log, "StreamConnection bang remote didn't accept version\n"); @@ -332,10 +371,13 @@ static void stream_connection_takion_data_expect_bang(ChiakiStreamConnection *st goto error; } - chiaki_mirai_signal(&stream_connection->mirai, STREAM_CONNECTION_MIRAI_RESPONSE_SUCCESS); + // stream_connection->state_mutex is expected to be locked by the caller of this function + stream_connection->state_finished = true; + chiaki_cond_signal(&stream_connection->state_cond); return; error: - chiaki_mirai_signal(&stream_connection->mirai, STREAM_CONNECTION_MIRAI_RESPONSE_FAIL); + stream_connection->state_failed = true; + chiaki_cond_signal(&stream_connection->state_cond); } typedef struct decode_resolutions_context_t @@ -410,7 +452,7 @@ static void stream_connection_takion_data_expect_streaminfo(ChiakiStreamConnecti if(audio_header_buf.size != CHIAKI_AUDIO_HEADER_SIZE) { - CHIAKI_LOGE(stream_connection->log, "StreamConnection receoved invalid audio header in streaminfo\n"); + CHIAKI_LOGE(stream_connection->log, "StreamConnection received invalid audio header in streaminfo\n"); goto error; } @@ -426,10 +468,13 @@ static void stream_connection_takion_data_expect_streaminfo(ChiakiStreamConnecti stream_connection_send_streaminfo_ack(stream_connection); - chiaki_mirai_signal(&stream_connection->mirai, STREAM_CONNECTION_MIRAI_RESPONSE_SUCCESS); + // stream_connection->state_mutex is expected to be locked by the caller of this function + stream_connection->state_finished = true; + chiaki_cond_signal(&stream_connection->state_cond); return; error: - chiaki_mirai_signal(&stream_connection->mirai, STREAM_CONNECTION_MIRAI_RESPONSE_FAIL); + stream_connection->state_failed = true; + chiaki_cond_signal(&stream_connection->state_cond); } diff --git a/lib/src/takion.c b/lib/src/takion.c index 9d5c8b7..96882c8 100644 --- a/lib/src/takion.c +++ b/lib/src/takion.c @@ -471,9 +471,9 @@ static void *takion_thread_func(void *user) { ChiakiTakion *takion = user; - ChiakiCongestionControl congestion_control; - if(chiaki_congestion_control_start(&congestion_control, takion) != CHIAKI_ERR_SUCCESS) - goto beach; + // TODO ChiakiCongestionControl congestion_control; + // if(chiaki_congestion_control_start(&congestion_control, takion) != CHIAKI_ERR_SUCCESS) + // goto beach; while(true) { @@ -485,7 +485,7 @@ static void *takion_thread_func(void *user) takion_handle_packet(takion, buf, received_size); } - chiaki_congestion_control_stop(&congestion_control); + // chiaki_congestion_control_stop(&congestion_control); beach: close(takion->sock); @@ -496,7 +496,7 @@ beach: static ChiakiErrorCode takion_recv(ChiakiTakion *takion, uint8_t *buf, size_t *buf_size, struct timeval *timeout) { ChiakiErrorCode err = chiaki_stop_pipe_select_single(&takion->stop_pipe, takion->sock, timeout); - if(err == CHIAKI_ERR_TIMEOUT) + if(err == CHIAKI_ERR_TIMEOUT || err == CHIAKI_ERR_CANCELED) return err; if(err != CHIAKI_ERR_SUCCESS) {