Refactor StreamConnection Synchronization

This commit is contained in:
Florian Märkl 2019-06-25 15:08:58 +02:00
commit 044159e83d
No known key found for this signature in database
GPG key ID: 125BC8A5A6A1E857
5 changed files with 169 additions and 84 deletions

View file

@ -33,6 +33,10 @@ typedef struct chiaki_congestion_control_t
} ChiakiCongestionControl;
CHIAKI_EXPORT ChiakiErrorCode chiaki_congestion_control_start(ChiakiCongestionControl *control, ChiakiTakion *takion);
/**
* Stop control and join the thread
*/
CHIAKI_EXPORT ChiakiErrorCode chiaki_congestion_control_stop(ChiakiCongestionControl *control);
#ifdef __cplusplus

View file

@ -30,19 +30,45 @@
extern "C" {
#endif
typedef struct chiaki_session_t ChiakiSession;
typedef struct chiaki_stream_connection_t
{
struct chiaki_session_t *session;
ChiakiLog *log;
ChiakiTakion takion;
ChiakiMirai mirai;
uint8_t *ecdh_secret;
ChiakiGKCrypt *gkcrypt_local;
ChiakiGKCrypt *gkcrypt_remote;
ChiakiBoolPredCond stop_cond;
/**
* signaled on change of state_finished or should_stop
*/
ChiakiCond state_cond;
/**
* protects state, state_finished, state_failed and should_stop
*/
ChiakiMutex state_mutex;
int state;
bool state_finished;
bool state_failed;
bool should_stop;
} ChiakiStreamConnection;
CHIAKI_EXPORT ChiakiErrorCode chiaki_stream_connection_run(struct chiaki_session_t *session);
CHIAKI_EXPORT ChiakiErrorCode chiaki_stream_connection_init(ChiakiStreamConnection *stream_connection, ChiakiSession *session);
CHIAKI_EXPORT void chiaki_stream_connection_fini(ChiakiStreamConnection *stream_connection);
/**
* Run stream_connection synchronously
*/
CHIAKI_EXPORT ChiakiErrorCode chiaki_stream_connection_run(ChiakiStreamConnection *stream_connection);
/**
* To be called from a thread other than the one chiaki_stream_connection_run() is running on to stop stream_connection
*/
CHIAKI_EXPORT ChiakiErrorCode chiaki_stream_connection_stop(ChiakiStreamConnection *stream_connection);
#ifdef __cplusplus
}

View file

@ -199,14 +199,24 @@ static void *session_thread_func(void *arg)
goto quit_audio_receiver;
}
err = chiaki_stream_connection_run(session);
err = chiaki_stream_connection_init(&session->stream_connection, session);
if(err != CHIAKI_ERR_SUCCESS)
{
CHIAKI_LOGE(&session->log, "Nagare failed\n");
CHIAKI_LOGE(&session->log, "StreamConnection init failed\n");
goto quit_video_receiver;
}
CHIAKI_LOGI(&session->log, "Nagare completed successfully\n");
err = chiaki_stream_connection_run(&session->stream_connection);
if(err != CHIAKI_ERR_SUCCESS)
{
CHIAKI_LOGE(&session->log, "StreamConnection run failed\n");
goto quit_stream_connection;
}
CHIAKI_LOGI(&session->log, "StreamConnection completed successfully\n");
quit_stream_connection:
chiaki_stream_connection_fini(&session->stream_connection);
quit_video_receiver:
chiaki_video_receiver_free(session->video_receiver);

View file

@ -44,53 +44,82 @@
typedef enum {
STREAM_CONNECTION_MIRAI_REQUEST_BANG = 0,
STREAM_CONNECTION_MIRAI_REQUEST_STREAMINFO,
} StreamConnectionMiraiRequest;
typedef enum {
STREAM_CONNECTION_MIRAI_RESPONSE_FAIL = 0,
STREAM_CONNECTION_MIRAI_RESPONSE_SUCCESS = 1
} StreamConnectionMiraiResponse;
STATE_IDLE,
STATE_EXPECT_BANG,
STATE_EXPECT_STREAMINFO
} StreamConnectionState;
static void stream_connection_takion_data(ChiakiTakionMessageDataType data_type, uint8_t *buf, size_t buf_size, void *user);
static ChiakiErrorCode stream_connection_send_big(ChiakiStreamConnection *stream_connection);
static ChiakiErrorCode stream_connection_send_disconnect(ChiakiStreamConnection *stream_connection);
static void stream_connection_takion_data_idle(ChiakiStreamConnection *stream_connection, uint8_t *buf, size_t buf_size);
static void stream_connection_takion_data_expect_bang(ChiakiStreamConnection *stream_connection, uint8_t *buf, size_t buf_size);
static void stream_connection_takion_data_expect_streaminfo(ChiakiStreamConnection *stream_connection, uint8_t *buf, size_t buf_size);
static ChiakiErrorCode stream_connection_send_streaminfo_ack(ChiakiStreamConnection *stream_connection);
static void stream_connection_takion_av(ChiakiTakionAVPacket *packet, void *user);
static ChiakiErrorCode stream_connection_takion_mac(uint8_t *buf, size_t buf_size, size_t key_pos, uint8_t *mac_out, void *user);
static ChiakiErrorCode stream_connection_send_heartbeat(ChiakiStreamConnection *stream_connection);
CHIAKI_EXPORT ChiakiErrorCode chiaki_stream_connection_run(ChiakiSession *session)
CHIAKI_EXPORT ChiakiErrorCode chiaki_stream_connection_init(ChiakiStreamConnection *stream_connection, ChiakiSession *session)
{
ChiakiStreamConnection *stream_connection = &session->stream_connection;
stream_connection->session = session;
stream_connection->log = &session->log;
stream_connection->ecdh_secret = NULL;
stream_connection->gkcrypt_remote = NULL;
stream_connection->gkcrypt_local = NULL;
ChiakiErrorCode err = chiaki_bool_pred_cond_init(&stream_connection->stop_cond);
ChiakiErrorCode err = chiaki_mutex_init(&stream_connection->state_mutex);
if(err != CHIAKI_ERR_SUCCESS)
goto error;
err = chiaki_mirai_init(&stream_connection->mirai);
err = chiaki_cond_init(&stream_connection->state_cond);
if(err != CHIAKI_ERR_SUCCESS)
goto error_stop_cond;
goto error_state_mutex;
stream_connection->state = STATE_IDLE;
stream_connection->state_finished = false;
stream_connection->state_failed = false;
stream_connection->should_stop = false;
return CHIAKI_ERR_SUCCESS;
error_state_mutex:
chiaki_mutex_fini(&stream_connection->state_mutex);
error:
return err;
}
CHIAKI_EXPORT void chiaki_stream_connection_fini(ChiakiStreamConnection *stream_connection)
{
chiaki_gkcrypt_free(stream_connection->gkcrypt_remote);
chiaki_gkcrypt_free(stream_connection->gkcrypt_local);
free(stream_connection->ecdh_secret);
chiaki_cond_fini(&stream_connection->state_cond);
chiaki_mutex_fini(&stream_connection->state_mutex);
}
bool state_finished_cond_check(void *user)
{
ChiakiStreamConnection *stream_connection = user;
return stream_connection->state_finished || stream_connection->should_stop;
}
CHIAKI_EXPORT ChiakiErrorCode chiaki_stream_connection_run(ChiakiStreamConnection *stream_connection)
{
ChiakiSession *session = stream_connection->session;
ChiakiErrorCode err;
ChiakiTakionConnectInfo takion_info;
takion_info.log = stream_connection->log;
takion_info.sa_len = session->connect_info.host_addrinfo_selected->ai_addrlen;
takion_info.sa = malloc(takion_info.sa_len);
if(!takion_info.sa)
{
err = CHIAKI_ERR_MEMORY;
goto error_mirai;
}
return CHIAKI_ERR_MEMORY;
memcpy(takion_info.sa, session->connect_info.host_addrinfo_selected->ai_addr, takion_info.sa_len);
err = set_port(takion_info.sa, htons(STREAM_CONNECTION_PORT));
assert(err == CHIAKI_ERR_SUCCESS);
@ -100,39 +129,41 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_stream_connection_run(ChiakiSession *sessio
takion_info.av_cb = stream_connection_takion_av;
takion_info.av_cb_user = stream_connection;
// TODO: make this call stoppable
err = chiaki_takion_connect(&stream_connection->takion, &takion_info);
free(takion_info.sa);
if(err != CHIAKI_ERR_SUCCESS)
{
CHIAKI_LOGE(&session->log, "StreamConnection connect failed\n");
goto error_mirai;
return err;
}
err = chiaki_mutex_lock(&stream_connection->state_mutex);
assert(err == CHIAKI_ERR_SUCCESS);
CHIAKI_LOGI(&session->log, "StreamConnection sending big\n");
err = chiaki_mirai_request_begin(&stream_connection->mirai, STREAM_CONNECTION_MIRAI_REQUEST_BANG, true);
assert(err == CHIAKI_ERR_SUCCESS);
stream_connection->state = STATE_EXPECT_BANG;
stream_connection->state_finished = false;
stream_connection->state_failed = false;
err = stream_connection_send_big(stream_connection);
if(err != CHIAKI_ERR_SUCCESS)
{
CHIAKI_LOGE(&session->log, "StreamConnection failed to send big\n");
goto error_takion;
goto disconnect;
}
err = chiaki_mirai_request_wait(&stream_connection->mirai, EXPECT_TIMEOUT_MS, true);
err = chiaki_cond_timedwait_pred(&stream_connection->state_cond, &stream_connection->state_mutex, EXPECT_TIMEOUT_MS, state_finished_cond_check, stream_connection);
assert(err == CHIAKI_ERR_SUCCESS || err == CHIAKI_ERR_TIMEOUT);
if(stream_connection->mirai.response != STREAM_CONNECTION_MIRAI_RESPONSE_SUCCESS)
if(!stream_connection->state_finished)
{
if(err == CHIAKI_ERR_TIMEOUT)
CHIAKI_LOGE(&session->log, "StreamConnection bang receive timeout\n");
chiaki_mirai_request_unlock(&stream_connection->mirai);
CHIAKI_LOGE(&session->log, "StreamConnection didn't receive bang\n");
err = CHIAKI_ERR_UNKNOWN;
goto error_takion;
goto disconnect;
}
CHIAKI_LOGI(&session->log, "StreamConnection successfully received bang\n");
@ -142,13 +173,13 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_stream_connection_run(ChiakiSession *sessio
if(!stream_connection->gkcrypt_local)
{
CHIAKI_LOGE(&session->log, "StreamConnection failed to initialize GKCrypt with index 2\n");
goto error_takion;
goto disconnect;
}
stream_connection->gkcrypt_remote = chiaki_gkcrypt_new(&session->log, 0 /* TODO */, 3, session->handshake_key, stream_connection->ecdh_secret);
if(!stream_connection->gkcrypt_remote)
{
CHIAKI_LOGE(&session->log, "StreamConnection failed to initialize GKCrypt with index 3\n");
goto error_gkcrypt_a;
goto disconnect;
}
// TODO: IMPORTANT!!!
@ -158,31 +189,30 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_stream_connection_run(ChiakiSession *sessio
// Also, access to the gkcrypts must be synchronized for key_pos and everything.
chiaki_takion_set_crypt(&stream_connection->takion, stream_connection->gkcrypt_local, stream_connection->gkcrypt_remote);
err = chiaki_mirai_request_begin(&stream_connection->mirai, STREAM_CONNECTION_MIRAI_REQUEST_STREAMINFO, false);
assert(err == CHIAKI_ERR_SUCCESS);
err = chiaki_mirai_request_wait(&stream_connection->mirai, EXPECT_TIMEOUT_MS, false);
stream_connection->state = STATE_EXPECT_STREAMINFO;
stream_connection->state_finished = false;
stream_connection->state_failed = false;
err = chiaki_cond_timedwait_pred(&stream_connection->state_cond, &stream_connection->state_mutex, EXPECT_TIMEOUT_MS, state_finished_cond_check, stream_connection);
assert(err == CHIAKI_ERR_SUCCESS || err == CHIAKI_ERR_TIMEOUT);
if(stream_connection->mirai.response != STREAM_CONNECTION_MIRAI_RESPONSE_SUCCESS)
if(!stream_connection->state_finished)
{
if(err == CHIAKI_ERR_TIMEOUT)
CHIAKI_LOGE(&session->log, "StreamConnection streaminfo receive timeout\n");
chiaki_mirai_request_unlock(&stream_connection->mirai);
CHIAKI_LOGE(&session->log, "StreamConnection didn't receive streaminfo\n");
err = CHIAKI_ERR_UNKNOWN;
goto error_takion;
goto disconnect;
}
CHIAKI_LOGI(&session->log, "StreamConnection successfully received streaminfo\n");
err = chiaki_bool_pred_cond_lock(&stream_connection->stop_cond);
assert(err == CHIAKI_ERR_SUCCESS);
stream_connection->state = STATE_IDLE;
stream_connection->state_finished = false;
stream_connection->state_failed = false;
while(true)
{
err = chiaki_bool_pred_cond_timedwait(&stream_connection->stop_cond, HEARTBEAT_INTERVAL_MS);
err = chiaki_cond_timedwait_pred(&stream_connection->state_cond, &stream_connection->state_mutex, HEARTBEAT_INTERVAL_MS, state_finished_cond_check, stream_connection);
if(err != CHIAKI_ERR_TIMEOUT)
break;
@ -193,33 +223,33 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_stream_connection_run(ChiakiSession *sessio
CHIAKI_LOGI(stream_connection->log, "StreamConnection sent heartbeat\n");
}
err = chiaki_bool_pred_cond_unlock(&stream_connection->stop_cond);
assert(err == CHIAKI_ERR_SUCCESS);
CHIAKI_LOGI(&session->log, "StreamConnection is disconnecting\n");
stream_connection_send_disconnect(stream_connection);
err = CHIAKI_ERR_SUCCESS;
// TODO: can't roll everything back like this, takion has to be closed first always.
chiaki_gkcrypt_free(stream_connection->gkcrypt_remote);
error_gkcrypt_a:
chiaki_gkcrypt_free(stream_connection->gkcrypt_local);
error_takion:
disconnect:
CHIAKI_LOGI(&session->log, "StreamConnection is disconnecting\n");
stream_connection_send_disconnect(stream_connection);
if(stream_connection->should_stop)
CHIAKI_LOGI(stream_connection->log, "StreamConnection was requested to stop\n");
chiaki_mutex_unlock(&stream_connection->state_mutex);
chiaki_takion_close(&stream_connection->takion);
CHIAKI_LOGI(&session->log, "StreamConnection closed takion\n");
error_mirai:
chiaki_mirai_fini(&stream_connection->mirai);
error_stop_cond:
chiaki_bool_pred_cond_fini(&stream_connection->stop_cond);
error:
free(stream_connection->ecdh_secret);
return err;
}
CHIAKI_EXPORT ChiakiErrorCode chiaki_stream_connection_stop(ChiakiStreamConnection *stream_connection)
{
ChiakiErrorCode err = chiaki_mutex_lock(&stream_connection->state_mutex);
if(err != CHIAKI_ERR_SUCCESS)
return err;
stream_connection->should_stop = true;
ChiakiErrorCode unlock_err = chiaki_mutex_unlock(&stream_connection->state_mutex);
err = chiaki_cond_signal(&stream_connection->state_cond);
return err == CHIAKI_ERR_SUCCESS ? unlock_err : err;
}
static void stream_connection_takion_data(ChiakiTakionMessageDataType data_type, uint8_t *buf, size_t buf_size, void *user)
@ -229,18 +259,25 @@ static void stream_connection_takion_data(ChiakiTakionMessageDataType data_type,
ChiakiStreamConnection *stream_connection = user;
switch(stream_connection->mirai.request)
chiaki_mutex_lock(&stream_connection->state_mutex);
switch(stream_connection->state)
{
case STREAM_CONNECTION_MIRAI_REQUEST_BANG:
case STATE_EXPECT_BANG:
stream_connection_takion_data_expect_bang(stream_connection, buf, buf_size);
return;
case STREAM_CONNECTION_MIRAI_REQUEST_STREAMINFO:
break;
case STATE_EXPECT_STREAMINFO:
stream_connection_takion_data_expect_streaminfo(stream_connection, buf, buf_size);
return;
default:
break;
default: // STATE_IDLE
stream_connection_takion_data_idle(stream_connection, buf, buf_size);
break;
}
chiaki_mutex_unlock(&stream_connection->state_mutex);
}
static void stream_connection_takion_data_idle(ChiakiStreamConnection *stream_connection, uint8_t *buf, size_t buf_size)
{
tkproto_TakionMessage msg;
memset(&msg, 0, sizeof(msg));
@ -286,6 +323,8 @@ static void stream_connection_takion_data_expect_bang(ChiakiStreamConnection *st
return;
}
CHIAKI_LOGD(stream_connection->log, "Got a bang\n");
if(!msg.bang_payload.version_accepted)
{
CHIAKI_LOGE(stream_connection->log, "StreamConnection bang remote didn't accept version\n");
@ -332,10 +371,13 @@ static void stream_connection_takion_data_expect_bang(ChiakiStreamConnection *st
goto error;
}
chiaki_mirai_signal(&stream_connection->mirai, STREAM_CONNECTION_MIRAI_RESPONSE_SUCCESS);
// stream_connection->state_mutex is expected to be locked by the caller of this function
stream_connection->state_finished = true;
chiaki_cond_signal(&stream_connection->state_cond);
return;
error:
chiaki_mirai_signal(&stream_connection->mirai, STREAM_CONNECTION_MIRAI_RESPONSE_FAIL);
stream_connection->state_failed = true;
chiaki_cond_signal(&stream_connection->state_cond);
}
typedef struct decode_resolutions_context_t
@ -410,7 +452,7 @@ static void stream_connection_takion_data_expect_streaminfo(ChiakiStreamConnecti
if(audio_header_buf.size != CHIAKI_AUDIO_HEADER_SIZE)
{
CHIAKI_LOGE(stream_connection->log, "StreamConnection receoved invalid audio header in streaminfo\n");
CHIAKI_LOGE(stream_connection->log, "StreamConnection received invalid audio header in streaminfo\n");
goto error;
}
@ -426,10 +468,13 @@ static void stream_connection_takion_data_expect_streaminfo(ChiakiStreamConnecti
stream_connection_send_streaminfo_ack(stream_connection);
chiaki_mirai_signal(&stream_connection->mirai, STREAM_CONNECTION_MIRAI_RESPONSE_SUCCESS);
// stream_connection->state_mutex is expected to be locked by the caller of this function
stream_connection->state_finished = true;
chiaki_cond_signal(&stream_connection->state_cond);
return;
error:
chiaki_mirai_signal(&stream_connection->mirai, STREAM_CONNECTION_MIRAI_RESPONSE_FAIL);
stream_connection->state_failed = true;
chiaki_cond_signal(&stream_connection->state_cond);
}

View file

@ -471,9 +471,9 @@ static void *takion_thread_func(void *user)
{
ChiakiTakion *takion = user;
ChiakiCongestionControl congestion_control;
if(chiaki_congestion_control_start(&congestion_control, takion) != CHIAKI_ERR_SUCCESS)
goto beach;
// TODO ChiakiCongestionControl congestion_control;
// if(chiaki_congestion_control_start(&congestion_control, takion) != CHIAKI_ERR_SUCCESS)
// goto beach;
while(true)
{
@ -485,7 +485,7 @@ static void *takion_thread_func(void *user)
takion_handle_packet(takion, buf, received_size);
}
chiaki_congestion_control_stop(&congestion_control);
// chiaki_congestion_control_stop(&congestion_control);
beach:
close(takion->sock);
@ -496,7 +496,7 @@ beach:
static ChiakiErrorCode takion_recv(ChiakiTakion *takion, uint8_t *buf, size_t *buf_size, struct timeval *timeout)
{
ChiakiErrorCode err = chiaki_stop_pipe_select_single(&takion->stop_pipe, takion->sock, timeout);
if(err == CHIAKI_ERR_TIMEOUT)
if(err == CHIAKI_ERR_TIMEOUT || err == CHIAKI_ERR_CANCELED)
return err;
if(err != CHIAKI_ERR_SUCCESS)
{