Add Stop Pipe to Session

This commit is contained in:
Florian Märkl 2019-08-03 12:31:26 +02:00
commit e7b4c2b661
No known key found for this signature in database
GPG key ID: 125BC8A5A6A1E857
10 changed files with 68 additions and 22 deletions

View file

@ -19,6 +19,7 @@
#define CHIAKI_HTTP_H
#include "common.h"
#include "stoppipe.h"
#include <stdlib.h>
@ -45,7 +46,11 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_http_header_parse(ChiakiHttpHeader **header
CHIAKI_EXPORT void chiaki_http_response_fini(ChiakiHttpResponse *response);
CHIAKI_EXPORT ChiakiErrorCode chiaki_http_response_parse(ChiakiHttpResponse *response, char *buf, size_t buf_size);
CHIAKI_EXPORT ChiakiErrorCode chiaki_recv_http_header(int sock, char *buf, size_t buf_size, size_t *header_size, size_t *received_size);
/**
* @param stop_pipe optional
* @param timeout_ms only used if stop_pipe is not NULL
*/
CHIAKI_EXPORT ChiakiErrorCode chiaki_recv_http_header(int sock, char *buf, size_t buf_size, size_t *header_size, size_t *received_size, ChiakiStopPipe *stop_pipe, uint64_t timeout_ms);
#ifdef __cplusplus

View file

@ -30,6 +30,7 @@
#include "audioreceiver.h"
#include "videoreceiver.h"
#include "controller.h"
#include "stoppipe.h"
#include <stdint.h>
#include <netdb.h>
@ -132,6 +133,7 @@ typedef struct chiaki_session_t
ChiakiCond state_cond;
ChiakiMutex state_mutex;
ChiakiStopPipe stop_pipe;
bool should_stop;
bool ctrl_failed;
bool ctrl_session_id_received;

View file

@ -36,7 +36,7 @@ typedef struct chiaki_stop_pipe_t
CHIAKI_EXPORT ChiakiErrorCode chiaki_stop_pipe_init(ChiakiStopPipe *stop_pipe);
CHIAKI_EXPORT void chiaki_stop_pipe_fini(ChiakiStopPipe *stop_pipe);
CHIAKI_EXPORT void chiaki_stop_pipe_stop(ChiakiStopPipe *stop_pipe);
CHIAKI_EXPORT ChiakiErrorCode chiaki_stop_pipe_select_single(ChiakiStopPipe *stop_pipe, int fd, struct timeval *timeout);
CHIAKI_EXPORT ChiakiErrorCode chiaki_stop_pipe_select_single(ChiakiStopPipe *stop_pipe, int fd, uint64_t timeout_ms);
#ifdef __cplusplus
}

View file

@ -149,7 +149,6 @@ typedef struct chiaki_takion_t
int sock;
ChiakiThread thread;
ChiakiStopPipe stop_pipe;
struct timeval recv_timeout;
uint32_t tag_local;
uint32_t tag_remote;

View file

@ -371,7 +371,7 @@ static ChiakiErrorCode ctrl_connect(ChiakiCtrl *ctrl)
size_t header_size;
size_t received_size;
err = chiaki_recv_http_header(sock, buf, sizeof(buf), &header_size, &received_size);
err = chiaki_recv_http_header(sock, buf, sizeof(buf), &header_size, &received_size, NULL, UINT64_MAX); // TODO: stop pipe!
if(err != CHIAKI_ERR_SUCCESS)
{
CHIAKI_LOGE(&session->log, "Failed to receive ctrl request response");

View file

@ -148,7 +148,7 @@ static void *discovery_thread_func(void *user)
while(1)
{
ChiakiErrorCode err = chiaki_stop_pipe_select_single(&thread->stop_pipe, discovery->socket, NULL);
ChiakiErrorCode err = chiaki_stop_pipe_select_single(&thread->stop_pipe, discovery->socket, UINT64_MAX);
if(err == CHIAKI_ERR_CANCELED)
break;
if(err != CHIAKI_ERR_SUCCESS)

View file

@ -132,7 +132,7 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_http_response_parse(ChiakiHttpResponse *res
return chiaki_http_header_parse(&response->headers, buf, buf_size);
}
CHIAKI_EXPORT ChiakiErrorCode chiaki_recv_http_header(int sock, char *buf, size_t buf_size, size_t *header_size, size_t *received_size)
CHIAKI_EXPORT ChiakiErrorCode chiaki_recv_http_header(int sock, char *buf, size_t buf_size, size_t *header_size, size_t *received_size, ChiakiStopPipe *stop_pipe, uint64_t timeout_ms)
{
// 0 = ""
// 1 = "\r"
@ -146,6 +146,13 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_recv_http_header(int sock, char *buf, size_
*received_size = 0;
while(true)
{
if(stop_pipe)
{
ChiakiErrorCode err = chiaki_stop_pipe_select_single(stop_pipe, sock, timeout_ms);
if(err != CHIAKI_ERR_SUCCESS)
return err;
}
ssize_t received = recv(sock, buf, buf_size, 0);
if(received <= 0)
return CHIAKI_ERR_NETWORK;

View file

@ -41,7 +41,7 @@
#define RP_APPLICATION_REASON_IN_USE 0x80108b10
#define RP_APPLICATION_REASON_CRASH 0x80108b15
#define SESSION_ID_TIMEOUT_MS 10000
#define SESSION_EXPECT_TIMEOUT_MS 5000
static void *session_thread_func(void *arg);
@ -63,13 +63,17 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_session_init(ChiakiSession *session, Chiaki
if(err != CHIAKI_ERR_SUCCESS)
goto error_state_cond;
err = chiaki_stop_pipe_init(&session->stop_pipe);
if(err != CHIAKI_ERR_SUCCESS)
goto error_state_mutex;
session->should_stop = false;
err = chiaki_stream_connection_init(&session->stream_connection, session);
if(err != CHIAKI_ERR_SUCCESS)
{
CHIAKI_LOGE(&session->log, "StreamConnection init failed");
goto error_state_mutex;
goto error_stop_pipe;
}
int r = getaddrinfo(connect_info->host, NULL, NULL, &session->connect_info.host_addrinfos);
@ -100,6 +104,8 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_session_init(ChiakiSession *session, Chiaki
memcpy(session->connect_info.did, connect_info->did, sizeof(session->connect_info.did));
return CHIAKI_ERR_SUCCESS;
error_stop_pipe:
chiaki_stop_pipe_fini(&session->stop_pipe);
error_state_mutex:
chiaki_mutex_fini(&session->state_mutex);
error_state_cond:
@ -113,6 +119,7 @@ CHIAKI_EXPORT void chiaki_session_fini(ChiakiSession *session)
if(!session)
return;
chiaki_stream_connection_fini(&session->stream_connection);
chiaki_stop_pipe_fini(&session->stop_pipe);
chiaki_cond_fini(&session->state_cond);
chiaki_mutex_fini(&session->state_mutex);
free(session->connect_info.regist_key);
@ -127,7 +134,14 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_session_start(ChiakiSession *session)
CHIAKI_EXPORT ChiakiErrorCode chiaki_session_stop(ChiakiSession *session)
{
// TODO
ChiakiErrorCode err = chiaki_mutex_lock(&session->state_mutex);
assert(err == CHIAKI_ERR_SUCCESS);
session->should_stop = true;
chiaki_stop_pipe_stop(&session->stop_pipe);
chiaki_cond_signal(&session->state_cond);
chiaki_mutex_unlock(&session->state_mutex);
return CHIAKI_ERR_SUCCESS;
}
@ -208,7 +222,7 @@ static void *session_thread_func(void *arg)
if(err != CHIAKI_ERR_SUCCESS)
QUIT(quit);
chiaki_cond_timedwait_pred(&session->state_cond, &session->state_mutex, SESSION_ID_TIMEOUT_MS, session_check_state_pred, session);
chiaki_cond_timedwait_pred(&session->state_cond, &session->state_mutex, SESSION_EXPECT_TIMEOUT_MS, session_check_state_pred, session);
CHECK_STOP(quit_ctrl);
if(!session->ctrl_session_id_received)
@ -440,12 +454,22 @@ static bool session_thread_request_session(ChiakiSession *session)
size_t header_size;
size_t received_size;
ChiakiErrorCode err = chiaki_recv_http_header(session_sock, buf, sizeof(buf), &header_size, &received_size);
chiaki_mutex_unlock(&session->state_mutex);
ChiakiErrorCode err = chiaki_recv_http_header(session_sock, buf, sizeof(buf), &header_size, &received_size, &session->stop_pipe, SESSION_EXPECT_TIMEOUT_MS);
ChiakiErrorCode mutex_err = chiaki_mutex_lock(&session->state_mutex);
assert(mutex_err == CHIAKI_ERR_SUCCESS);
if(err != CHIAKI_ERR_SUCCESS)
{
CHIAKI_LOGE(&session->log, "Failed to receive session request response");
if(err == CHIAKI_ERR_CANCELED)
{
session->quit_reason = CHIAKI_QUIT_REASON_STOPPED;
}
else
{
CHIAKI_LOGE(&session->log, "Failed to receive session request response");
session->quit_reason = CHIAKI_QUIT_REASON_SESSION_REQUEST_UNKNOWN;
}
close(session_sock);
session->quit_reason = CHIAKI_QUIT_REASON_SESSION_REQUEST_UNKNOWN;
return false;
}

View file

@ -50,7 +50,7 @@ CHIAKI_EXPORT void chiaki_stop_pipe_stop(ChiakiStopPipe *stop_pipe)
write(stop_pipe->fds[1], "\x00", 1);
}
CHIAKI_EXPORT ChiakiErrorCode chiaki_stop_pipe_select_single(ChiakiStopPipe *stop_pipe, int fd, struct timeval *timeout)
CHIAKI_EXPORT ChiakiErrorCode chiaki_stop_pipe_select_single(ChiakiStopPipe *stop_pipe, int fd, uint64_t timeout_ms)
{
fd_set fds;
FD_ZERO(&fds);
@ -61,6 +61,16 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_stop_pipe_select_single(ChiakiStopPipe *sto
if(stop_pipe->fds[0] > nfds)
nfds = stop_pipe->fds[0];
nfds++;
struct timeval timeout_s;
struct timeval *timeout = NULL;
if(timeout_ms != UINT64_MAX)
{
timeout_s.tv_sec = timeout_ms / 1000;
timeout_s.tv_usec = (timeout_ms % 1000) * 1000;
timeout = &timeout_s;
}
int r = select(nfds, &fds, NULL, NULL, timeout);
if(r < 0)
return CHIAKI_ERR_UNKNOWN;

View file

@ -42,6 +42,7 @@
#define TAKION_PACKET_BASE_TYPE_MASK 0xf
#define TAKION_EXPECT_TIMEOUT_MS 5000
/**
* Base type of Takion packets. Lower nibble of the first byte in datagrams.
@ -165,7 +166,7 @@ static ChiakiErrorCode takion_parse_message(ChiakiTakion *takion, uint8_t *buf,
static void takion_write_message_header(uint8_t *buf, uint32_t tag, uint32_t key_pos, uint8_t chunk_type, uint8_t chunk_flags, size_t payload_data_size);
static ChiakiErrorCode takion_send_message_init(ChiakiTakion *takion, TakionMessagePayloadInit *payload);
static ChiakiErrorCode takion_send_message_cookie(ChiakiTakion *takion, uint8_t *cookie);
static ChiakiErrorCode takion_recv(ChiakiTakion *takion, uint8_t *buf, size_t *buf_size, struct timeval *timeout);
static ChiakiErrorCode takion_recv(ChiakiTakion *takion, uint8_t *buf, size_t *buf_size, uint64_t timeout_ms);
static ChiakiErrorCode takion_recv_message_init_ack(ChiakiTakion *takion, TakionMessagePayloadInitAck *payload);
static ChiakiErrorCode takion_recv_message_cookie_ack(ChiakiTakion *takion);
static void takion_handle_packet_av(ChiakiTakion *takion, uint8_t base_type, uint8_t *buf, size_t buf_size);
@ -191,8 +192,6 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_connect(ChiakiTakion *takion, Chiaki
if(ret != CHIAKI_ERR_SUCCESS)
goto error_gkcrypt_local_mutex;
takion->tag_remote = 0;
takion->recv_timeout.tv_sec = 2;
takion->recv_timeout.tv_usec = 0;
takion->enable_crypt = info->enable_crypt;
takion->postponed_packets = NULL;
@ -666,7 +665,7 @@ static void *takion_thread_func(void *user)
uint8_t *buf = malloc(received_size); // TODO: no malloc?
if(!buf)
break;
ChiakiErrorCode err = takion_recv(takion, buf, &received_size, NULL);
ChiakiErrorCode err = takion_recv(takion, buf, &received_size, UINT64_MAX);
if(err != CHIAKI_ERR_SUCCESS)
break;
uint8_t *resized_buf = realloc(buf, received_size);
@ -697,9 +696,9 @@ beach:
}
static ChiakiErrorCode takion_recv(ChiakiTakion *takion, uint8_t *buf, size_t *buf_size, struct timeval *timeout)
static ChiakiErrorCode takion_recv(ChiakiTakion *takion, uint8_t *buf, size_t *buf_size, uint64_t timeout_ms)
{
ChiakiErrorCode err = chiaki_stop_pipe_select_single(&takion->stop_pipe, takion->sock, timeout);
ChiakiErrorCode err = chiaki_stop_pipe_select_single(&takion->stop_pipe, takion->sock, timeout_ms);
if(err == CHIAKI_ERR_TIMEOUT || err == CHIAKI_ERR_CANCELED)
return err;
if(err != CHIAKI_ERR_SUCCESS)
@ -1019,7 +1018,7 @@ static ChiakiErrorCode takion_recv_message_init_ack(ChiakiTakion *takion, Takion
{
uint8_t message[1 + TAKION_MESSAGE_HEADER_SIZE + 0x10 + TAKION_COOKIE_SIZE];
size_t received_size = sizeof(message);
ChiakiErrorCode err = takion_recv(takion, message, &received_size, &takion->recv_timeout);
ChiakiErrorCode err = takion_recv(takion, message, &received_size, TAKION_EXPECT_TIMEOUT_MS);
if(err != CHIAKI_ERR_SUCCESS)
return err;
@ -1067,7 +1066,7 @@ static ChiakiErrorCode takion_recv_message_cookie_ack(ChiakiTakion *takion)
{
uint8_t message[1 + TAKION_MESSAGE_HEADER_SIZE];
size_t received_size = sizeof(message);
ChiakiErrorCode err = takion_recv(takion, message, &received_size, &takion->recv_timeout);
ChiakiErrorCode err = takion_recv(takion, message, &received_size, TAKION_EXPECT_TIMEOUT_MS);
if(err != CHIAKI_ERR_SUCCESS)
return err;