diff --git a/gui/src/main.cpp b/gui/src/main.cpp index faedc7c..36ca884 100644 --- a/gui/src/main.cpp +++ b/gui/src/main.cpp @@ -76,7 +76,7 @@ int main(int argc, char *argv[]) chiaki_connect_video_profile_preset(&connect_info.video_profile, CHIAKI_VIDEO_RESOLUTION_PRESET_720p, - CHIAKI_VIDEO_FPS_PRESET_60); + CHIAKI_VIDEO_FPS_PRESET_30); if(connect_info.registkey.isEmpty() || connect_info.ostype.isEmpty() || connect_info.auth.isEmpty() || connect_info.morning.isEmpty() || connect_info.did.isEmpty()) parser.showHelp(1); diff --git a/lib/include/chiaki/senkusha.h b/lib/include/chiaki/senkusha.h index 0e3e5ec..731c81c 100644 --- a/lib/include/chiaki/senkusha.h +++ b/lib/include/chiaki/senkusha.h @@ -36,6 +36,7 @@ typedef struct senkusha_t bool state_finished; bool state_failed; bool should_stop; + ChiakiSeqNum32 data_ack_seq_num_expected; /** * signaled on change of state_finished or should_stop diff --git a/lib/include/chiaki/takion.h b/lib/include/chiaki/takion.h index b64528e..727a2b4 100644 --- a/lib/include/chiaki/takion.h +++ b/lib/include/chiaki/takion.h @@ -80,6 +80,7 @@ typedef enum { CHIAKI_TAKION_EVENT_TYPE_CONNECTED, CHIAKI_TAKION_EVENT_TYPE_DISCONNECT, CHIAKI_TAKION_EVENT_TYPE_DATA, + CHIAKI_TAKION_EVENT_TYPE_DATA_ACK, CHIAKI_TAKION_EVENT_TYPE_AV } ChiakiTakionEventType; @@ -95,6 +96,11 @@ typedef struct chiaki_takion_event_t size_t buf_size; } data; + struct + { + ChiakiSeqNum32 seq_num; + } data_ack; + ChiakiTakionAVPacket *av; }; } ChiakiTakionEvent; @@ -198,8 +204,10 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send(ChiakiTakion *takion, uint8_t * /** * Thread-safe while Takion is running. + * + * @param optional pointer to write the sequence number of the sent package to */ -CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_message_data(ChiakiTakion *takion, uint8_t chunk_flags, uint16_t channel, uint8_t *buf, size_t buf_size); +CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_message_data(ChiakiTakion *takion, uint8_t chunk_flags, uint16_t channel, uint8_t *buf, size_t buf_size, ChiakiSeqNum32 *seq_num); /** * Thread-safe while Takion is running. diff --git a/lib/include/chiaki/takionsendbuffer.h b/lib/include/chiaki/takionsendbuffer.h index bdf6ffd..3ecff3a 100644 --- a/lib/include/chiaki/takionsendbuffer.h +++ b/lib/include/chiaki/takionsendbuffer.h @@ -63,7 +63,11 @@ CHIAKI_EXPORT void chiaki_takion_send_buffer_fini(ChiakiTakionSendBuffer *send_b * On error, buf is freed immediately. */ CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_buffer_push(ChiakiTakionSendBuffer *send_buffer, ChiakiSeqNum32 seq_num, uint8_t *buf, size_t buf_size); -CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_buffer_ack(ChiakiTakionSendBuffer *send_buffer, ChiakiSeqNum32 seq_num); + +/** + * @param acked_seq_nums optional array of size of at least send_buffer->packets_size where acked seq nums will be stored + */ +CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_buffer_ack(ChiakiTakionSendBuffer *send_buffer, ChiakiSeqNum32 seq_num, ChiakiSeqNum32 *acked_seq_nums, size_t *acked_seq_nums_count); #ifdef __cplusplus } diff --git a/lib/src/senkusha.c b/lib/src/senkusha.c index 6f1d637..539232e 100644 --- a/lib/src/senkusha.c +++ b/lib/src/senkusha.c @@ -28,22 +28,30 @@ #include #include #include +#include #define SENKUSHA_PORT 9297 #define EXPECT_TIMEOUT_MS 5000 +#define SENKUSHA_PING_COUNT_DEFAULT 10 + typedef enum { STATE_IDLE, STATE_TAKION_CONNECT, - STATE_EXPECT_BANG + STATE_EXPECT_BANG, + STATE_EXPECT_DATA_ACK, + STATE_EXPECT_PONG, } SenkushaState; +static ChiakiErrorCode senkusha_run_ping_test(ChiakiSenkusha *senkusha, uint32_t ping_count); static void senkusha_takion_cb(ChiakiTakionEvent *event, void *user); static void senkusha_takion_data(ChiakiSenkusha *senkusha, ChiakiTakionMessageDataType data_type, uint8_t *buf, size_t buf_size); +static void senkusha_takion_data_ack(ChiakiSenkusha *senkusha, ChiakiSeqNum32 seq_num); static ChiakiErrorCode senkusha_send_big(ChiakiSenkusha *senkusha); static ChiakiErrorCode senkusha_send_disconnect(ChiakiSenkusha *senkusha); +static ChiakiErrorCode senkusha_send_echo_command(ChiakiSenkusha *senkusha, bool enable); CHIAKI_EXPORT ChiakiErrorCode chiaki_senkusha_init(ChiakiSenkusha *senkusha, ChiakiSession *session) { @@ -178,7 +186,7 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_senkusha_run(ChiakiSenkusha *senkusha) CHIAKI_LOGI(session->log, "Senkusha successfully received bang"); - // TODO: Do the actual tests + senkusha_run_ping_test(senkusha, SENKUSHA_PING_COUNT_DEFAULT); CHIAKI_LOGI(session->log, "Senkusha is disconnecting"); @@ -193,6 +201,33 @@ quit: return err; } +static ChiakiErrorCode senkusha_run_ping_test(ChiakiSenkusha *senkusha, uint32_t ping_count) +{ + CHIAKI_LOGI(senkusha->log, "Senkusha Ping Test with count %u starting", (unsigned int)ping_count); + + ChiakiErrorCode err = senkusha_send_echo_command(senkusha, true); + if(err != CHIAKI_ERR_SUCCESS) + { + CHIAKI_LOGE(senkusha->log, "Senkusha Ping Test failed because sending echo command (true) failed"); + return err; + } + + CHIAKI_LOGI(senkusha->log, "Senkusha enabled echo"); + + // TODO: do test + + err = senkusha_send_echo_command(senkusha, false); + if(err != CHIAKI_ERR_SUCCESS) + { + CHIAKI_LOGE(senkusha->log, "Senkusha Ping Test failed because sending echo command (false) failed"); + return err; + } + + CHIAKI_LOGI(senkusha->log, "Senkusha disabled echo"); + + return CHIAKI_ERR_SUCCESS; +} + static void senkusha_takion_cb(ChiakiTakionEvent *event, void *user) { ChiakiSenkusha *senkusha = user; @@ -212,6 +247,9 @@ static void senkusha_takion_cb(ChiakiTakionEvent *event, void *user) case CHIAKI_TAKION_EVENT_TYPE_DATA: senkusha_takion_data(senkusha, event->data.data_type, event->data.buf, event->data.buf_size); break; + case CHIAKI_TAKION_EVENT_TYPE_DATA_ACK: + senkusha_takion_data_ack(senkusha, event->data_ack.seq_num); + break; default: break; } @@ -249,6 +287,19 @@ static void senkusha_takion_data(ChiakiSenkusha *senkusha, ChiakiTakionMessageDa chiaki_mutex_unlock(&senkusha->state_mutex); } +static void senkusha_takion_data_ack(ChiakiSenkusha *senkusha, ChiakiSeqNum32 seq_num) +{ + chiaki_mutex_lock(&senkusha->state_mutex); + if(senkusha->state == STATE_EXPECT_DATA_ACK && senkusha->data_ack_seq_num_expected == seq_num) + { + senkusha->state_finished = true; + chiaki_mutex_unlock(&senkusha->state_mutex); + chiaki_cond_signal(&senkusha->state_cond); + } + else + chiaki_mutex_unlock(&senkusha->state_mutex); +} + static ChiakiErrorCode senkusha_send_big(ChiakiSenkusha *senkusha) { tkproto_TakionMessage msg; @@ -276,7 +327,7 @@ static ChiakiErrorCode senkusha_send_big(ChiakiSenkusha *senkusha) } buf_size = stream.bytes_written; - ChiakiErrorCode err = chiaki_takion_send_message_data(&senkusha->takion, 1, 1, buf, buf_size); + ChiakiErrorCode err = chiaki_takion_send_message_data(&senkusha->takion, 1, 1, buf, buf_size, NULL); return err; } @@ -303,9 +354,57 @@ static ChiakiErrorCode senkusha_send_disconnect(ChiakiSenkusha *senkusha) } buf_size = stream.bytes_written; - ChiakiErrorCode err = chiaki_takion_send_message_data(&senkusha->takion, 1, 1, buf, buf_size); + ChiakiErrorCode err = chiaki_takion_send_message_data(&senkusha->takion, 1, 1, buf, buf_size, NULL); return err; } +static ChiakiErrorCode senkusha_send_echo_command(ChiakiSenkusha *senkusha, bool enable) +{ + tkproto_TakionMessage msg; + memset(&msg, 0, sizeof(msg)); + msg.type = tkproto_TakionMessage_PayloadType_SENKUSHA; + msg.has_senkusha_payload = true; + msg.senkusha_payload.command = tkproto_SenkushaPayload_Command_ECHO_COMMAND; + msg.senkusha_payload.has_echo_command = true; + msg.senkusha_payload.echo_command.state = enable; + + uint8_t buf[0x10]; + size_t buf_size; + + pb_ostream_t stream = pb_ostream_from_buffer(buf, sizeof(buf)); + bool pbr = pb_encode(&stream, tkproto_TakionMessage_fields, &msg); + if(!pbr) + { + CHIAKI_LOGE(senkusha->log, "Senkusha echo command protobuf encoding failed"); + return CHIAKI_ERR_UNKNOWN; + } + + buf_size = stream.bytes_written; + senkusha->state = STATE_EXPECT_DATA_ACK; + senkusha->state_finished = false; + senkusha->state_failed = false; + ChiakiErrorCode err = chiaki_takion_send_message_data(&senkusha->takion, 1, 1, buf, buf_size, &senkusha->data_ack_seq_num_expected); + if(err != CHIAKI_ERR_SUCCESS) + { + CHIAKI_LOGE(senkusha->log, "Senkusha failed to send echo command"); + return err; + } + + err = chiaki_cond_timedwait_pred(&senkusha->state_cond, &senkusha->state_mutex, EXPECT_TIMEOUT_MS, state_finished_cond_check, senkusha); + assert(err == CHIAKI_ERR_SUCCESS || err == CHIAKI_ERR_TIMEOUT); + + if(!senkusha->state_finished) + { + if(err == CHIAKI_ERR_TIMEOUT) + CHIAKI_LOGE(senkusha->log, "Senkusha data ack for echo command receive timeout"); + + if(senkusha->should_stop) + err = CHIAKI_ERR_CANCELED; + else + CHIAKI_LOGE(senkusha->log, "Senkusha failed to receive data ack for echo command"); + } + + return err; +} diff --git a/lib/src/session.c b/lib/src/session.c index 5044a9b..4ca70cc 100644 --- a/lib/src/session.c +++ b/lib/src/session.c @@ -257,7 +257,7 @@ static bool session_check_state_pred(void *user) || session->ctrl_session_id_received; } -//#define ENABLE_SENKUSHA +#define ENABLE_SENKUSHA static void *session_thread_func(void *arg) { diff --git a/lib/src/streamconnection.c b/lib/src/streamconnection.c index 4f6c7fb..849fe01 100644 --- a/lib/src/streamconnection.c +++ b/lib/src/streamconnection.c @@ -728,7 +728,7 @@ static ChiakiErrorCode stream_connection_send_big(ChiakiStreamConnection *stream } buf_size = stream.bytes_written; - err = chiaki_takion_send_message_data(&stream_connection->takion, 1, 1, buf, buf_size); + err = chiaki_takion_send_message_data(&stream_connection->takion, 1, 1, buf, buf_size, NULL); return err; } @@ -751,7 +751,7 @@ static ChiakiErrorCode stream_connection_send_streaminfo_ack(ChiakiStreamConnect } buf_size = stream.bytes_written; - return chiaki_takion_send_message_data(&stream_connection->takion, 1, 9, buf, buf_size); + return chiaki_takion_send_message_data(&stream_connection->takion, 1, 9, buf, buf_size, NULL); } static ChiakiErrorCode stream_connection_send_disconnect(ChiakiStreamConnection *stream_connection) @@ -778,7 +778,7 @@ static ChiakiErrorCode stream_connection_send_disconnect(ChiakiStreamConnection CHIAKI_LOGI(stream_connection->log, "StreamConnection sending Disconnect"); buf_size = stream.bytes_written; - ChiakiErrorCode err = chiaki_takion_send_message_data(&stream_connection->takion, 1, 1, buf, buf_size); + ChiakiErrorCode err = chiaki_takion_send_message_data(&stream_connection->takion, 1, 1, buf, buf_size, NULL); return err; } @@ -810,7 +810,7 @@ static ChiakiErrorCode stream_connection_send_heartbeat(ChiakiStreamConnection * return CHIAKI_ERR_UNKNOWN; } - return chiaki_takion_send_message_data(&stream_connection->takion, 1, 1, buf, stream.bytes_written); + return chiaki_takion_send_message_data(&stream_connection->takion, 1, 1, buf, stream.bytes_written, NULL); } CHIAKI_EXPORT ChiakiErrorCode stream_connection_send_corrupt_frame(ChiakiStreamConnection *stream_connection, ChiakiSeqNum16 start, ChiakiSeqNum16 end) @@ -832,5 +832,5 @@ CHIAKI_EXPORT ChiakiErrorCode stream_connection_send_corrupt_frame(ChiakiStreamC } CHIAKI_LOGD(stream_connection->log, "StreamConnection reporting corrupt frame(s) from %u to %u", (unsigned int)start, (unsigned int)end); - return chiaki_takion_send_message_data(&stream_connection->takion, 1, 2, buf, stream.bytes_written); + return chiaki_takion_send_message_data(&stream_connection->takion, 1, 2, buf, stream.bytes_written, NULL); } diff --git a/lib/src/takion.c b/lib/src/takion.c index 9b81272..2567bfb 100644 --- a/lib/src/takion.c +++ b/lib/src/takion.c @@ -349,7 +349,7 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send(ChiakiTakion *takion, uint8_t * return chiaki_takion_send_raw(takion, buf, buf_size); } -CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_message_data(ChiakiTakion *takion, uint8_t chunk_flags, uint16_t channel, uint8_t *buf, size_t buf_size) +CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_message_data(ChiakiTakion *takion, uint8_t chunk_flags, uint16_t channel, uint8_t *buf, size_t buf_size, ChiakiSeqNum32 *seq_num) { // TODO: can we make this more memory-efficient? // TODO: split packet if necessary? @@ -372,10 +372,10 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_message_data(ChiakiTakion *taki err = chiaki_mutex_lock(&takion->seq_num_local_mutex); if(err != CHIAKI_ERR_SUCCESS) return err; - ChiakiSeqNum32 seq_num = takion->seq_num_local++; + ChiakiSeqNum32 seq_num_val = takion->seq_num_local++; chiaki_mutex_unlock(&takion->seq_num_local_mutex); - *((uint32_t *)(msg_payload + 0)) = htonl(seq_num); + *((uint32_t *)(msg_payload + 0)) = htonl(seq_num_val); *((uint16_t *)(msg_payload + 4)) = htons(channel); *((uint16_t *)(msg_payload + 6)) = 0; *(msg_payload + 8) = 0; @@ -389,7 +389,10 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_message_data(ChiakiTakion *taki return err; } - chiaki_takion_send_buffer_push(&takion->send_buffer, seq_num, packet_buf, packet_size); + chiaki_takion_send_buffer_push(&takion->send_buffer, seq_num_val, packet_buf, packet_size); + + if(seq_num) + *seq_num = seq_num_val; return err; } @@ -606,6 +609,7 @@ static void *takion_thread_func(void *user) chiaki_reorder_queue_set_drop_cb(&takion->data_queue, takion_data_drop, takion); + // The send buffer size MUST be consistent with the acked seqnums array size in takion_handle_packet_message_data_ack() if(chiaki_takion_send_buffer_init(&takion->send_buffer, takion, TAKION_SEND_BUFFER_SIZE) != CHIAKI_ERR_SUCCESS) goto error_reoder_queue; @@ -939,8 +943,20 @@ static void takion_handle_packet_message_data_ack(ChiakiTakion *takion, uint8_t if(dup_tsns_count != 0) CHIAKI_LOGW(takion->log, "Takion received data ack with nonzero dup_tsns_count %#x", dup_tsns_count); - //CHIAKI_LOGD(takion->log, "Takion received data ack with seq_num = %#x, something = %#x, size_or_something = %#x, zero = %#x", seq_num, something, size_internal, zero); - chiaki_takion_send_buffer_ack(&takion->send_buffer, cumulative_seq_num); + CHIAKI_LOGV(takion->log, "Takion received data ack with cumulative_seq_num = %#x, a_rwnd = %#x, gap_ack_blocks_count = %#x, dup_tsns_count = %#x", + cumulative_seq_num, a_rwnd, gap_ack_blocks_count, dup_tsns_count); + + ChiakiSeqNum32 acked_seq_nums[TAKION_SEND_BUFFER_SIZE]; + size_t acked_seq_nums_count = 0; + chiaki_takion_send_buffer_ack(&takion->send_buffer, cumulative_seq_num, acked_seq_nums, &acked_seq_nums_count); + + for(size_t i=0; icb(&event, takion->cb_user); + } } /** diff --git a/lib/src/takionsendbuffer.c b/lib/src/takionsendbuffer.c index 90086e6..0532c75 100644 --- a/lib/src/takionsendbuffer.c +++ b/lib/src/takionsendbuffer.c @@ -144,12 +144,15 @@ beach: return err; } -CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_buffer_ack(ChiakiTakionSendBuffer *send_buffer, ChiakiSeqNum32 seq_num) +CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_buffer_ack(ChiakiTakionSendBuffer *send_buffer, ChiakiSeqNum32 seq_num, ChiakiSeqNum32 *acked_seq_nums, size_t *acked_seq_nums_count) { ChiakiErrorCode err = chiaki_mutex_lock(&send_buffer->mutex); if(err != CHIAKI_ERR_SUCCESS) return err; + if(acked_seq_nums_count) + *acked_seq_nums_count = 0; + size_t i; size_t shift = 0; // amount to shift back size_t shift_start = SIZE_MAX; @@ -157,6 +160,9 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_buffer_ack(ChiakiTakionSendBuff { if(send_buffer->packets[i].seq_num == seq_num || chiaki_seq_num_32_lt(send_buffer->packets[i].seq_num, seq_num)) { + if(acked_seq_nums) + acked_seq_nums[(*acked_seq_nums_count)++] = send_buffer->packets[i].seq_num; + free(send_buffer->packets[i].buf); if(shift_start == SIZE_MAX) {