diff --git a/lib/include/chiaki/senkusha.h b/lib/include/chiaki/senkusha.h index 731c81c..c4137a1 100644 --- a/lib/include/chiaki/senkusha.h +++ b/lib/include/chiaki/senkusha.h @@ -37,6 +37,10 @@ typedef struct senkusha_t bool state_failed; bool should_stop; ChiakiSeqNum32 data_ack_seq_num_expected; + uint64_t pong_time_us; + uint16_t ping_test_index; + uint16_t ping_index; + uint32_t ping_tag; /** * signaled on change of state_finished or should_stop diff --git a/lib/include/chiaki/time.h b/lib/include/chiaki/time.h index e2cdb08..ef39af8 100644 --- a/lib/include/chiaki/time.h +++ b/lib/include/chiaki/time.h @@ -26,7 +26,9 @@ extern "C" { #endif -CHIAKI_EXPORT uint64_t chiaki_time_now_monotonic_ms(); +CHIAKI_EXPORT uint64_t chiaki_time_now_monotonic_us(); + +static inline uint64_t chiaki_time_now_monotonic_ms() { return chiaki_time_now_monotonic_us() / 1000; } #ifdef __cplusplus } diff --git a/lib/src/senkusha.c b/lib/src/senkusha.c index 251be2e..df2260c 100644 --- a/lib/src/senkusha.c +++ b/lib/src/senkusha.c @@ -17,6 +17,8 @@ #include #include +#include +#include #include #include @@ -36,6 +38,7 @@ #define EXPECT_TIMEOUT_MS 5000 #define SENKUSHA_PING_COUNT_DEFAULT 10 +#define EXPECT_PONG_TIMEOUT_MS 1000 typedef enum { STATE_IDLE, @@ -45,10 +48,11 @@ typedef enum { STATE_EXPECT_PONG, } SenkushaState; -static ChiakiErrorCode senkusha_run_ping_test(ChiakiSenkusha *senkusha, uint32_t ping_count); +static ChiakiErrorCode senkusha_run_ping_test(ChiakiSenkusha *senkusha, uint16_t ping_test_index, uint16_t ping_count); static void senkusha_takion_cb(ChiakiTakionEvent *event, void *user); static void senkusha_takion_data(ChiakiSenkusha *senkusha, ChiakiTakionMessageDataType data_type, uint8_t *buf, size_t buf_size); static void senkusha_takion_data_ack(ChiakiSenkusha *senkusha, ChiakiSeqNum32 seq_num); +static void senkusha_takion_av(ChiakiSenkusha *senkusha, ChiakiTakionAVPacket *packet); static ChiakiErrorCode senkusha_send_big(ChiakiSenkusha *senkusha); static ChiakiErrorCode senkusha_send_disconnect(ChiakiSenkusha *senkusha); static ChiakiErrorCode senkusha_send_echo_command(ChiakiSenkusha *senkusha, bool enable); @@ -70,6 +74,9 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_senkusha_init(ChiakiSenkusha *senkusha, Chi senkusha->state_finished = false; senkusha->state_failed = false; senkusha->should_stop = false; + senkusha->data_ack_seq_num_expected = 0; + senkusha->ping_tag = 0; + senkusha->pong_time_us = 0; return CHIAKI_ERR_SUCCESS; @@ -187,7 +194,7 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_senkusha_run(ChiakiSenkusha *senkusha) CHIAKI_LOGI(session->log, "Senkusha successfully received bang"); - senkusha_run_ping_test(senkusha, SENKUSHA_PING_COUNT_DEFAULT); + senkusha_run_ping_test(senkusha, 0, SENKUSHA_PING_COUNT_DEFAULT); CHIAKI_LOGI(session->log, "Senkusha is disconnecting"); @@ -202,7 +209,7 @@ quit: return err; } -static ChiakiErrorCode senkusha_run_ping_test(ChiakiSenkusha *senkusha, uint32_t ping_count) +static ChiakiErrorCode senkusha_run_ping_test(ChiakiSenkusha *senkusha, uint16_t ping_test_index, uint16_t ping_count) { CHIAKI_LOGI(senkusha->log, "Senkusha Ping Test with count %u starting", (unsigned int)ping_count); @@ -215,7 +222,66 @@ static ChiakiErrorCode senkusha_run_ping_test(ChiakiSenkusha *senkusha, uint32_t CHIAKI_LOGI(senkusha->log, "Senkusha enabled echo"); - // TODO: do test + for(uint16_t ping_index=0; ping_indexlog, "Senkusha sending Ping %u of test index %u", (unsigned int)ping_index, (unsigned int)ping_test_index); + + ChiakiTakionAVPacket av_packet = { 0 }; + av_packet.codec = 0xff; + av_packet.is_video = false; + av_packet.frame_index = ping_test_index; + av_packet.unit_index = ping_index; + av_packet.units_in_frame_total = 0x800; // or 0 + + uint8_t data[0x224]; + memset(data, 0, sizeof(data)); + + size_t header_size; + err = chiaki_takion_v7_av_packet_format_header(data, sizeof(data), &header_size, &av_packet); + if(err != CHIAKI_ERR_SUCCESS) + { + CHIAKI_LOGE(senkusha->log, "Senkusha failed to format AV Header"); + return err; + } + + uint32_t tag = 0x1337; //chiaki_random_32(); + *((uint32_t *)(data + header_size + 4)) = htonl(tag); + + senkusha->state = STATE_EXPECT_PONG; + senkusha->state_finished = false; + senkusha->state_failed = false; + senkusha->ping_test_index = ping_test_index; + senkusha->ping_index = ping_index; + senkusha->ping_tag = tag; + + uint64_t time_start_us = chiaki_time_now_monotonic_us(); + + err = chiaki_takion_send_raw(&senkusha->takion, data, sizeof(data)); + if(err != CHIAKI_ERR_SUCCESS) + { + CHIAKI_LOGE(senkusha->log, "Senkusha failed to send ping"); + return err; + } + + err = chiaki_cond_timedwait_pred(&senkusha->state_cond, &senkusha->state_mutex, EXPECT_PONG_TIMEOUT_MS, state_finished_cond_check, senkusha); + assert(err == CHIAKI_ERR_SUCCESS || err == CHIAKI_ERR_TIMEOUT); + + if(!senkusha->state_finished) + { + if(err == CHIAKI_ERR_TIMEOUT) + CHIAKI_LOGE(senkusha->log, "Senkusha pong receive timeout"); + + if(senkusha->should_stop) + return CHIAKI_ERR_CANCELED; + else + CHIAKI_LOGE(senkusha->log, "Senkusha failed to receive pong"); + + continue; + } + + uint64_t delta_us = senkusha->pong_time_us - time_start_us; + CHIAKI_LOGI(senkusha->log, "Senkusha received Pong, RTT = %.3f ms", (float)delta_us * 0.001f); + } err = senkusha_send_echo_command(senkusha, false); if(err != CHIAKI_ERR_SUCCESS) @@ -251,6 +317,9 @@ static void senkusha_takion_cb(ChiakiTakionEvent *event, void *user) case CHIAKI_TAKION_EVENT_TYPE_DATA_ACK: senkusha_takion_data_ack(senkusha, event->data_ack.seq_num); break; + case CHIAKI_TAKION_EVENT_TYPE_AV: + senkusha_takion_av(senkusha, event->av); + break; default: break; } @@ -301,6 +370,42 @@ static void senkusha_takion_data_ack(ChiakiSenkusha *senkusha, ChiakiSeqNum32 se chiaki_mutex_unlock(&senkusha->state_mutex); } +static void senkusha_takion_av(ChiakiSenkusha *senkusha, ChiakiTakionAVPacket *packet) +{ + uint64_t time_us = chiaki_time_now_monotonic_us(); + + ChiakiErrorCode err = chiaki_mutex_lock(&senkusha->state_mutex); + assert(err == CHIAKI_ERR_SUCCESS); + + if(senkusha->state == STATE_EXPECT_PONG) + { + if(packet->frame_index != senkusha->ping_test_index + || packet->unit_index != senkusha->ping_index + || packet->data_size < 8) + { + CHIAKI_LOGW(senkusha->log, "Senkusha received invalid Pong %u/%u, size: %#llx", + (unsigned int)packet->frame_index, (unsigned int)packet->unit_index, (unsigned long long)packet->data_size); + goto beach; + } + + uint32_t tag = ntohl(*((uint32_t *)(packet->data + 4))); + if(tag != senkusha->ping_tag) + { + CHIAKI_LOGW(senkusha->log, "Senkusha received Pong with invalid tag"); + goto beach; + } + + senkusha->pong_time_us = time_us; + senkusha->state_finished = true; + chiaki_mutex_unlock(&senkusha->state_mutex); + chiaki_cond_signal(&senkusha->state_cond); + return; + } + +beach: + chiaki_mutex_unlock(&senkusha->state_mutex); +} + static ChiakiErrorCode senkusha_send_big(ChiakiSenkusha *senkusha) { tkproto_TakionMessage msg; diff --git a/lib/src/takion.c b/lib/src/takion.c index a20f687..ac776fa 100644 --- a/lib/src/takion.c +++ b/lib/src/takion.c @@ -1152,7 +1152,7 @@ static void takion_handle_packet_av(ChiakiTakion *takion, uint8_t base_type, uin assert(base_type == TAKION_PACKET_TYPE_VIDEO || base_type == TAKION_PACKET_TYPE_AUDIO); ChiakiTakionAVPacket packet; - ChiakiErrorCode err = chiaki_takion_v9_av_packet_parse(&packet, buf, buf_size); + ChiakiErrorCode err = takion->av_packet_parse(&packet, buf, buf_size); if(err != CHIAKI_ERR_SUCCESS) { if(err == CHIAKI_ERR_BUF_TOO_SMALL) @@ -1255,7 +1255,7 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_v9_av_packet_parse(ChiakiTakionAVPac return CHIAKI_ERR_SUCCESS; } -#define CHIAKI_TAKION_V7_AV_HEADER_SIZE_BASE 0x11 +#define CHIAKI_TAKION_V7_AV_HEADER_SIZE_BASE 0x12 #define CHIAKI_TAKION_V7_AV_HEADER_SIZE_VIDEO_ADD 0x3 #define CHIAKI_TAKION_V7_AV_HEADER_SIZE_NALU_INFO_STRUCTS_ADD 0x3 @@ -1303,7 +1303,7 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_v7_av_packet_format_header(uint8_t * cur[2] = 0; // unknown } - return CHIAKI_ERR_BUF_TOO_SMALL; + return CHIAKI_ERR_SUCCESS; } CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_v7_av_packet_parse(ChiakiTakionAVPacket *packet, uint8_t *buf, size_t buf_size) diff --git a/lib/src/time.c b/lib/src/time.c index 6ca5faa..d4ab6d8 100644 --- a/lib/src/time.c +++ b/lib/src/time.c @@ -19,9 +19,9 @@ #include -CHIAKI_EXPORT uint64_t chiaki_time_now_monotonic_ms() +CHIAKI_EXPORT uint64_t chiaki_time_now_monotonic_us() { struct timespec time; clock_gettime(CLOCK_MONOTONIC, &time); - return time.tv_sec * 1000 + time.tv_nsec / 1000000; + return time.tv_sec * 1000000 + time.tv_nsec / 1000; }