From e7b4c2b6612aaa7a4a514c48369e0ba7a0848fe7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florian=20M=C3=A4rkl?= Date: Sat, 3 Aug 2019 12:31:26 +0200 Subject: [PATCH] Add Stop Pipe to Session --- lib/include/chiaki/http.h | 7 ++++++- lib/include/chiaki/session.h | 2 ++ lib/include/chiaki/stoppipe.h | 2 +- lib/include/chiaki/takion.h | 1 - lib/src/ctrl.c | 2 +- lib/src/discovery.c | 2 +- lib/src/http.c | 9 ++++++++- lib/src/session.c | 38 ++++++++++++++++++++++++++++------- lib/src/stoppipe.c | 12 ++++++++++- lib/src/takion.c | 15 +++++++------- 10 files changed, 68 insertions(+), 22 deletions(-) diff --git a/lib/include/chiaki/http.h b/lib/include/chiaki/http.h index f694921..56c3a6f 100644 --- a/lib/include/chiaki/http.h +++ b/lib/include/chiaki/http.h @@ -19,6 +19,7 @@ #define CHIAKI_HTTP_H #include "common.h" +#include "stoppipe.h" #include @@ -45,7 +46,11 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_http_header_parse(ChiakiHttpHeader **header CHIAKI_EXPORT void chiaki_http_response_fini(ChiakiHttpResponse *response); CHIAKI_EXPORT ChiakiErrorCode chiaki_http_response_parse(ChiakiHttpResponse *response, char *buf, size_t buf_size); -CHIAKI_EXPORT ChiakiErrorCode chiaki_recv_http_header(int sock, char *buf, size_t buf_size, size_t *header_size, size_t *received_size); +/** + * @param stop_pipe optional + * @param timeout_ms only used if stop_pipe is not NULL + */ +CHIAKI_EXPORT ChiakiErrorCode chiaki_recv_http_header(int sock, char *buf, size_t buf_size, size_t *header_size, size_t *received_size, ChiakiStopPipe *stop_pipe, uint64_t timeout_ms); #ifdef __cplusplus diff --git a/lib/include/chiaki/session.h b/lib/include/chiaki/session.h index feb9c51..2f3235c 100644 --- a/lib/include/chiaki/session.h +++ b/lib/include/chiaki/session.h @@ -30,6 +30,7 @@ #include "audioreceiver.h" #include "videoreceiver.h" #include "controller.h" +#include "stoppipe.h" #include #include @@ -132,6 +133,7 @@ typedef struct chiaki_session_t ChiakiCond state_cond; ChiakiMutex state_mutex; + ChiakiStopPipe stop_pipe; bool should_stop; bool ctrl_failed; bool ctrl_session_id_received; diff --git a/lib/include/chiaki/stoppipe.h b/lib/include/chiaki/stoppipe.h index 5b06da3..491674c 100644 --- a/lib/include/chiaki/stoppipe.h +++ b/lib/include/chiaki/stoppipe.h @@ -36,7 +36,7 @@ typedef struct chiaki_stop_pipe_t CHIAKI_EXPORT ChiakiErrorCode chiaki_stop_pipe_init(ChiakiStopPipe *stop_pipe); CHIAKI_EXPORT void chiaki_stop_pipe_fini(ChiakiStopPipe *stop_pipe); CHIAKI_EXPORT void chiaki_stop_pipe_stop(ChiakiStopPipe *stop_pipe); -CHIAKI_EXPORT ChiakiErrorCode chiaki_stop_pipe_select_single(ChiakiStopPipe *stop_pipe, int fd, struct timeval *timeout); +CHIAKI_EXPORT ChiakiErrorCode chiaki_stop_pipe_select_single(ChiakiStopPipe *stop_pipe, int fd, uint64_t timeout_ms); #ifdef __cplusplus } diff --git a/lib/include/chiaki/takion.h b/lib/include/chiaki/takion.h index ab02800..b64528e 100644 --- a/lib/include/chiaki/takion.h +++ b/lib/include/chiaki/takion.h @@ -149,7 +149,6 @@ typedef struct chiaki_takion_t int sock; ChiakiThread thread; ChiakiStopPipe stop_pipe; - struct timeval recv_timeout; uint32_t tag_local; uint32_t tag_remote; diff --git a/lib/src/ctrl.c b/lib/src/ctrl.c index a1d1298..91c02ac 100644 --- a/lib/src/ctrl.c +++ b/lib/src/ctrl.c @@ -371,7 +371,7 @@ static ChiakiErrorCode ctrl_connect(ChiakiCtrl *ctrl) size_t header_size; size_t received_size; - err = chiaki_recv_http_header(sock, buf, sizeof(buf), &header_size, &received_size); + err = chiaki_recv_http_header(sock, buf, sizeof(buf), &header_size, &received_size, NULL, UINT64_MAX); // TODO: stop pipe! if(err != CHIAKI_ERR_SUCCESS) { CHIAKI_LOGE(&session->log, "Failed to receive ctrl request response"); diff --git a/lib/src/discovery.c b/lib/src/discovery.c index e8e70b2..379ba92 100644 --- a/lib/src/discovery.c +++ b/lib/src/discovery.c @@ -148,7 +148,7 @@ static void *discovery_thread_func(void *user) while(1) { - ChiakiErrorCode err = chiaki_stop_pipe_select_single(&thread->stop_pipe, discovery->socket, NULL); + ChiakiErrorCode err = chiaki_stop_pipe_select_single(&thread->stop_pipe, discovery->socket, UINT64_MAX); if(err == CHIAKI_ERR_CANCELED) break; if(err != CHIAKI_ERR_SUCCESS) diff --git a/lib/src/http.c b/lib/src/http.c index bff6093..80516b8 100644 --- a/lib/src/http.c +++ b/lib/src/http.c @@ -132,7 +132,7 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_http_response_parse(ChiakiHttpResponse *res return chiaki_http_header_parse(&response->headers, buf, buf_size); } -CHIAKI_EXPORT ChiakiErrorCode chiaki_recv_http_header(int sock, char *buf, size_t buf_size, size_t *header_size, size_t *received_size) +CHIAKI_EXPORT ChiakiErrorCode chiaki_recv_http_header(int sock, char *buf, size_t buf_size, size_t *header_size, size_t *received_size, ChiakiStopPipe *stop_pipe, uint64_t timeout_ms) { // 0 = "" // 1 = "\r" @@ -146,6 +146,13 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_recv_http_header(int sock, char *buf, size_ *received_size = 0; while(true) { + if(stop_pipe) + { + ChiakiErrorCode err = chiaki_stop_pipe_select_single(stop_pipe, sock, timeout_ms); + if(err != CHIAKI_ERR_SUCCESS) + return err; + } + ssize_t received = recv(sock, buf, buf_size, 0); if(received <= 0) return CHIAKI_ERR_NETWORK; diff --git a/lib/src/session.c b/lib/src/session.c index 3f070e5..a3a01c1 100644 --- a/lib/src/session.c +++ b/lib/src/session.c @@ -41,7 +41,7 @@ #define RP_APPLICATION_REASON_IN_USE 0x80108b10 #define RP_APPLICATION_REASON_CRASH 0x80108b15 -#define SESSION_ID_TIMEOUT_MS 10000 +#define SESSION_EXPECT_TIMEOUT_MS 5000 static void *session_thread_func(void *arg); @@ -63,13 +63,17 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_session_init(ChiakiSession *session, Chiaki if(err != CHIAKI_ERR_SUCCESS) goto error_state_cond; + err = chiaki_stop_pipe_init(&session->stop_pipe); + if(err != CHIAKI_ERR_SUCCESS) + goto error_state_mutex; + session->should_stop = false; err = chiaki_stream_connection_init(&session->stream_connection, session); if(err != CHIAKI_ERR_SUCCESS) { CHIAKI_LOGE(&session->log, "StreamConnection init failed"); - goto error_state_mutex; + goto error_stop_pipe; } int r = getaddrinfo(connect_info->host, NULL, NULL, &session->connect_info.host_addrinfos); @@ -100,6 +104,8 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_session_init(ChiakiSession *session, Chiaki memcpy(session->connect_info.did, connect_info->did, sizeof(session->connect_info.did)); return CHIAKI_ERR_SUCCESS; +error_stop_pipe: + chiaki_stop_pipe_fini(&session->stop_pipe); error_state_mutex: chiaki_mutex_fini(&session->state_mutex); error_state_cond: @@ -113,6 +119,7 @@ CHIAKI_EXPORT void chiaki_session_fini(ChiakiSession *session) if(!session) return; chiaki_stream_connection_fini(&session->stream_connection); + chiaki_stop_pipe_fini(&session->stop_pipe); chiaki_cond_fini(&session->state_cond); chiaki_mutex_fini(&session->state_mutex); free(session->connect_info.regist_key); @@ -127,7 +134,14 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_session_start(ChiakiSession *session) CHIAKI_EXPORT ChiakiErrorCode chiaki_session_stop(ChiakiSession *session) { - // TODO + ChiakiErrorCode err = chiaki_mutex_lock(&session->state_mutex); + assert(err == CHIAKI_ERR_SUCCESS); + + session->should_stop = true; + chiaki_stop_pipe_stop(&session->stop_pipe); + chiaki_cond_signal(&session->state_cond); + + chiaki_mutex_unlock(&session->state_mutex); return CHIAKI_ERR_SUCCESS; } @@ -208,7 +222,7 @@ static void *session_thread_func(void *arg) if(err != CHIAKI_ERR_SUCCESS) QUIT(quit); - chiaki_cond_timedwait_pred(&session->state_cond, &session->state_mutex, SESSION_ID_TIMEOUT_MS, session_check_state_pred, session); + chiaki_cond_timedwait_pred(&session->state_cond, &session->state_mutex, SESSION_EXPECT_TIMEOUT_MS, session_check_state_pred, session); CHECK_STOP(quit_ctrl); if(!session->ctrl_session_id_received) @@ -440,12 +454,22 @@ static bool session_thread_request_session(ChiakiSession *session) size_t header_size; size_t received_size; - ChiakiErrorCode err = chiaki_recv_http_header(session_sock, buf, sizeof(buf), &header_size, &received_size); + chiaki_mutex_unlock(&session->state_mutex); + ChiakiErrorCode err = chiaki_recv_http_header(session_sock, buf, sizeof(buf), &header_size, &received_size, &session->stop_pipe, SESSION_EXPECT_TIMEOUT_MS); + ChiakiErrorCode mutex_err = chiaki_mutex_lock(&session->state_mutex); + assert(mutex_err == CHIAKI_ERR_SUCCESS); if(err != CHIAKI_ERR_SUCCESS) { - CHIAKI_LOGE(&session->log, "Failed to receive session request response"); + if(err == CHIAKI_ERR_CANCELED) + { + session->quit_reason = CHIAKI_QUIT_REASON_STOPPED; + } + else + { + CHIAKI_LOGE(&session->log, "Failed to receive session request response"); + session->quit_reason = CHIAKI_QUIT_REASON_SESSION_REQUEST_UNKNOWN; + } close(session_sock); - session->quit_reason = CHIAKI_QUIT_REASON_SESSION_REQUEST_UNKNOWN; return false; } diff --git a/lib/src/stoppipe.c b/lib/src/stoppipe.c index ca8275d..e85db41 100644 --- a/lib/src/stoppipe.c +++ b/lib/src/stoppipe.c @@ -50,7 +50,7 @@ CHIAKI_EXPORT void chiaki_stop_pipe_stop(ChiakiStopPipe *stop_pipe) write(stop_pipe->fds[1], "\x00", 1); } -CHIAKI_EXPORT ChiakiErrorCode chiaki_stop_pipe_select_single(ChiakiStopPipe *stop_pipe, int fd, struct timeval *timeout) +CHIAKI_EXPORT ChiakiErrorCode chiaki_stop_pipe_select_single(ChiakiStopPipe *stop_pipe, int fd, uint64_t timeout_ms) { fd_set fds; FD_ZERO(&fds); @@ -61,6 +61,16 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_stop_pipe_select_single(ChiakiStopPipe *sto if(stop_pipe->fds[0] > nfds) nfds = stop_pipe->fds[0]; nfds++; + + struct timeval timeout_s; + struct timeval *timeout = NULL; + if(timeout_ms != UINT64_MAX) + { + timeout_s.tv_sec = timeout_ms / 1000; + timeout_s.tv_usec = (timeout_ms % 1000) * 1000; + timeout = &timeout_s; + } + int r = select(nfds, &fds, NULL, NULL, timeout); if(r < 0) return CHIAKI_ERR_UNKNOWN; diff --git a/lib/src/takion.c b/lib/src/takion.c index 158ab78..57660a5 100644 --- a/lib/src/takion.c +++ b/lib/src/takion.c @@ -42,6 +42,7 @@ #define TAKION_PACKET_BASE_TYPE_MASK 0xf +#define TAKION_EXPECT_TIMEOUT_MS 5000 /** * Base type of Takion packets. Lower nibble of the first byte in datagrams. @@ -165,7 +166,7 @@ static ChiakiErrorCode takion_parse_message(ChiakiTakion *takion, uint8_t *buf, static void takion_write_message_header(uint8_t *buf, uint32_t tag, uint32_t key_pos, uint8_t chunk_type, uint8_t chunk_flags, size_t payload_data_size); static ChiakiErrorCode takion_send_message_init(ChiakiTakion *takion, TakionMessagePayloadInit *payload); static ChiakiErrorCode takion_send_message_cookie(ChiakiTakion *takion, uint8_t *cookie); -static ChiakiErrorCode takion_recv(ChiakiTakion *takion, uint8_t *buf, size_t *buf_size, struct timeval *timeout); +static ChiakiErrorCode takion_recv(ChiakiTakion *takion, uint8_t *buf, size_t *buf_size, uint64_t timeout_ms); static ChiakiErrorCode takion_recv_message_init_ack(ChiakiTakion *takion, TakionMessagePayloadInitAck *payload); static ChiakiErrorCode takion_recv_message_cookie_ack(ChiakiTakion *takion); static void takion_handle_packet_av(ChiakiTakion *takion, uint8_t base_type, uint8_t *buf, size_t buf_size); @@ -191,8 +192,6 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_connect(ChiakiTakion *takion, Chiaki if(ret != CHIAKI_ERR_SUCCESS) goto error_gkcrypt_local_mutex; takion->tag_remote = 0; - takion->recv_timeout.tv_sec = 2; - takion->recv_timeout.tv_usec = 0; takion->enable_crypt = info->enable_crypt; takion->postponed_packets = NULL; @@ -666,7 +665,7 @@ static void *takion_thread_func(void *user) uint8_t *buf = malloc(received_size); // TODO: no malloc? if(!buf) break; - ChiakiErrorCode err = takion_recv(takion, buf, &received_size, NULL); + ChiakiErrorCode err = takion_recv(takion, buf, &received_size, UINT64_MAX); if(err != CHIAKI_ERR_SUCCESS) break; uint8_t *resized_buf = realloc(buf, received_size); @@ -697,9 +696,9 @@ beach: } -static ChiakiErrorCode takion_recv(ChiakiTakion *takion, uint8_t *buf, size_t *buf_size, struct timeval *timeout) +static ChiakiErrorCode takion_recv(ChiakiTakion *takion, uint8_t *buf, size_t *buf_size, uint64_t timeout_ms) { - ChiakiErrorCode err = chiaki_stop_pipe_select_single(&takion->stop_pipe, takion->sock, timeout); + ChiakiErrorCode err = chiaki_stop_pipe_select_single(&takion->stop_pipe, takion->sock, timeout_ms); if(err == CHIAKI_ERR_TIMEOUT || err == CHIAKI_ERR_CANCELED) return err; if(err != CHIAKI_ERR_SUCCESS) @@ -1019,7 +1018,7 @@ static ChiakiErrorCode takion_recv_message_init_ack(ChiakiTakion *takion, Takion { uint8_t message[1 + TAKION_MESSAGE_HEADER_SIZE + 0x10 + TAKION_COOKIE_SIZE]; size_t received_size = sizeof(message); - ChiakiErrorCode err = takion_recv(takion, message, &received_size, &takion->recv_timeout); + ChiakiErrorCode err = takion_recv(takion, message, &received_size, TAKION_EXPECT_TIMEOUT_MS); if(err != CHIAKI_ERR_SUCCESS) return err; @@ -1067,7 +1066,7 @@ static ChiakiErrorCode takion_recv_message_cookie_ack(ChiakiTakion *takion) { uint8_t message[1 + TAKION_MESSAGE_HEADER_SIZE]; size_t received_size = sizeof(message); - ChiakiErrorCode err = takion_recv(takion, message, &received_size, &takion->recv_timeout); + ChiakiErrorCode err = takion_recv(takion, message, &received_size, TAKION_EXPECT_TIMEOUT_MS); if(err != CHIAKI_ERR_SUCCESS) return err;