Add Senkusha Ping Test

This commit is contained in:
Florian Märkl 2019-08-10 13:05:07 +02:00
commit ad612d2ce8
No known key found for this signature in database
GPG key ID: 125BC8A5A6A1E857
5 changed files with 121 additions and 10 deletions

View file

@ -37,6 +37,10 @@ typedef struct senkusha_t
bool state_failed;
bool should_stop;
ChiakiSeqNum32 data_ack_seq_num_expected;
uint64_t pong_time_us;
uint16_t ping_test_index;
uint16_t ping_index;
uint32_t ping_tag;
/**
* signaled on change of state_finished or should_stop

View file

@ -26,7 +26,9 @@
extern "C" {
#endif
CHIAKI_EXPORT uint64_t chiaki_time_now_monotonic_ms();
CHIAKI_EXPORT uint64_t chiaki_time_now_monotonic_us();
static inline uint64_t chiaki_time_now_monotonic_ms() { return chiaki_time_now_monotonic_us() / 1000; }
#ifdef __cplusplus
}

View file

@ -17,6 +17,8 @@
#include <chiaki/senkusha.h>
#include <chiaki/session.h>
#include <chiaki/random.h>
#include <chiaki/time.h>
#include <string.h>
#include <assert.h>
@ -36,6 +38,7 @@
#define EXPECT_TIMEOUT_MS 5000
#define SENKUSHA_PING_COUNT_DEFAULT 10
#define EXPECT_PONG_TIMEOUT_MS 1000
typedef enum {
STATE_IDLE,
@ -45,10 +48,11 @@ typedef enum {
STATE_EXPECT_PONG,
} SenkushaState;
static ChiakiErrorCode senkusha_run_ping_test(ChiakiSenkusha *senkusha, uint32_t ping_count);
static ChiakiErrorCode senkusha_run_ping_test(ChiakiSenkusha *senkusha, uint16_t ping_test_index, uint16_t ping_count);
static void senkusha_takion_cb(ChiakiTakionEvent *event, void *user);
static void senkusha_takion_data(ChiakiSenkusha *senkusha, ChiakiTakionMessageDataType data_type, uint8_t *buf, size_t buf_size);
static void senkusha_takion_data_ack(ChiakiSenkusha *senkusha, ChiakiSeqNum32 seq_num);
static void senkusha_takion_av(ChiakiSenkusha *senkusha, ChiakiTakionAVPacket *packet);
static ChiakiErrorCode senkusha_send_big(ChiakiSenkusha *senkusha);
static ChiakiErrorCode senkusha_send_disconnect(ChiakiSenkusha *senkusha);
static ChiakiErrorCode senkusha_send_echo_command(ChiakiSenkusha *senkusha, bool enable);
@ -70,6 +74,9 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_senkusha_init(ChiakiSenkusha *senkusha, Chi
senkusha->state_finished = false;
senkusha->state_failed = false;
senkusha->should_stop = false;
senkusha->data_ack_seq_num_expected = 0;
senkusha->ping_tag = 0;
senkusha->pong_time_us = 0;
return CHIAKI_ERR_SUCCESS;
@ -187,7 +194,7 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_senkusha_run(ChiakiSenkusha *senkusha)
CHIAKI_LOGI(session->log, "Senkusha successfully received bang");
senkusha_run_ping_test(senkusha, SENKUSHA_PING_COUNT_DEFAULT);
senkusha_run_ping_test(senkusha, 0, SENKUSHA_PING_COUNT_DEFAULT);
CHIAKI_LOGI(session->log, "Senkusha is disconnecting");
@ -202,7 +209,7 @@ quit:
return err;
}
static ChiakiErrorCode senkusha_run_ping_test(ChiakiSenkusha *senkusha, uint32_t ping_count)
static ChiakiErrorCode senkusha_run_ping_test(ChiakiSenkusha *senkusha, uint16_t ping_test_index, uint16_t ping_count)
{
CHIAKI_LOGI(senkusha->log, "Senkusha Ping Test with count %u starting", (unsigned int)ping_count);
@ -215,7 +222,66 @@ static ChiakiErrorCode senkusha_run_ping_test(ChiakiSenkusha *senkusha, uint32_t
CHIAKI_LOGI(senkusha->log, "Senkusha enabled echo");
// TODO: do test
for(uint16_t ping_index=0; ping_index<ping_count; ping_index++)
{
CHIAKI_LOGI(senkusha->log, "Senkusha sending Ping %u of test index %u", (unsigned int)ping_index, (unsigned int)ping_test_index);
ChiakiTakionAVPacket av_packet = { 0 };
av_packet.codec = 0xff;
av_packet.is_video = false;
av_packet.frame_index = ping_test_index;
av_packet.unit_index = ping_index;
av_packet.units_in_frame_total = 0x800; // or 0
uint8_t data[0x224];
memset(data, 0, sizeof(data));
size_t header_size;
err = chiaki_takion_v7_av_packet_format_header(data, sizeof(data), &header_size, &av_packet);
if(err != CHIAKI_ERR_SUCCESS)
{
CHIAKI_LOGE(senkusha->log, "Senkusha failed to format AV Header");
return err;
}
uint32_t tag = 0x1337; //chiaki_random_32();
*((uint32_t *)(data + header_size + 4)) = htonl(tag);
senkusha->state = STATE_EXPECT_PONG;
senkusha->state_finished = false;
senkusha->state_failed = false;
senkusha->ping_test_index = ping_test_index;
senkusha->ping_index = ping_index;
senkusha->ping_tag = tag;
uint64_t time_start_us = chiaki_time_now_monotonic_us();
err = chiaki_takion_send_raw(&senkusha->takion, data, sizeof(data));
if(err != CHIAKI_ERR_SUCCESS)
{
CHIAKI_LOGE(senkusha->log, "Senkusha failed to send ping");
return err;
}
err = chiaki_cond_timedwait_pred(&senkusha->state_cond, &senkusha->state_mutex, EXPECT_PONG_TIMEOUT_MS, state_finished_cond_check, senkusha);
assert(err == CHIAKI_ERR_SUCCESS || err == CHIAKI_ERR_TIMEOUT);
if(!senkusha->state_finished)
{
if(err == CHIAKI_ERR_TIMEOUT)
CHIAKI_LOGE(senkusha->log, "Senkusha pong receive timeout");
if(senkusha->should_stop)
return CHIAKI_ERR_CANCELED;
else
CHIAKI_LOGE(senkusha->log, "Senkusha failed to receive pong");
continue;
}
uint64_t delta_us = senkusha->pong_time_us - time_start_us;
CHIAKI_LOGI(senkusha->log, "Senkusha received Pong, RTT = %.3f ms", (float)delta_us * 0.001f);
}
err = senkusha_send_echo_command(senkusha, false);
if(err != CHIAKI_ERR_SUCCESS)
@ -251,6 +317,9 @@ static void senkusha_takion_cb(ChiakiTakionEvent *event, void *user)
case CHIAKI_TAKION_EVENT_TYPE_DATA_ACK:
senkusha_takion_data_ack(senkusha, event->data_ack.seq_num);
break;
case CHIAKI_TAKION_EVENT_TYPE_AV:
senkusha_takion_av(senkusha, event->av);
break;
default:
break;
}
@ -301,6 +370,42 @@ static void senkusha_takion_data_ack(ChiakiSenkusha *senkusha, ChiakiSeqNum32 se
chiaki_mutex_unlock(&senkusha->state_mutex);
}
static void senkusha_takion_av(ChiakiSenkusha *senkusha, ChiakiTakionAVPacket *packet)
{
uint64_t time_us = chiaki_time_now_monotonic_us();
ChiakiErrorCode err = chiaki_mutex_lock(&senkusha->state_mutex);
assert(err == CHIAKI_ERR_SUCCESS);
if(senkusha->state == STATE_EXPECT_PONG)
{
if(packet->frame_index != senkusha->ping_test_index
|| packet->unit_index != senkusha->ping_index
|| packet->data_size < 8)
{
CHIAKI_LOGW(senkusha->log, "Senkusha received invalid Pong %u/%u, size: %#llx",
(unsigned int)packet->frame_index, (unsigned int)packet->unit_index, (unsigned long long)packet->data_size);
goto beach;
}
uint32_t tag = ntohl(*((uint32_t *)(packet->data + 4)));
if(tag != senkusha->ping_tag)
{
CHIAKI_LOGW(senkusha->log, "Senkusha received Pong with invalid tag");
goto beach;
}
senkusha->pong_time_us = time_us;
senkusha->state_finished = true;
chiaki_mutex_unlock(&senkusha->state_mutex);
chiaki_cond_signal(&senkusha->state_cond);
return;
}
beach:
chiaki_mutex_unlock(&senkusha->state_mutex);
}
static ChiakiErrorCode senkusha_send_big(ChiakiSenkusha *senkusha)
{
tkproto_TakionMessage msg;

View file

@ -1152,7 +1152,7 @@ static void takion_handle_packet_av(ChiakiTakion *takion, uint8_t base_type, uin
assert(base_type == TAKION_PACKET_TYPE_VIDEO || base_type == TAKION_PACKET_TYPE_AUDIO);
ChiakiTakionAVPacket packet;
ChiakiErrorCode err = chiaki_takion_v9_av_packet_parse(&packet, buf, buf_size);
ChiakiErrorCode err = takion->av_packet_parse(&packet, buf, buf_size);
if(err != CHIAKI_ERR_SUCCESS)
{
if(err == CHIAKI_ERR_BUF_TOO_SMALL)
@ -1255,7 +1255,7 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_v9_av_packet_parse(ChiakiTakionAVPac
return CHIAKI_ERR_SUCCESS;
}
#define CHIAKI_TAKION_V7_AV_HEADER_SIZE_BASE 0x11
#define CHIAKI_TAKION_V7_AV_HEADER_SIZE_BASE 0x12
#define CHIAKI_TAKION_V7_AV_HEADER_SIZE_VIDEO_ADD 0x3
#define CHIAKI_TAKION_V7_AV_HEADER_SIZE_NALU_INFO_STRUCTS_ADD 0x3
@ -1303,7 +1303,7 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_v7_av_packet_format_header(uint8_t *
cur[2] = 0; // unknown
}
return CHIAKI_ERR_BUF_TOO_SMALL;
return CHIAKI_ERR_SUCCESS;
}
CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_v7_av_packet_parse(ChiakiTakionAVPacket *packet, uint8_t *buf, size_t buf_size)

View file

@ -19,9 +19,9 @@
#include <time.h>
CHIAKI_EXPORT uint64_t chiaki_time_now_monotonic_ms()
CHIAKI_EXPORT uint64_t chiaki_time_now_monotonic_us()
{
struct timespec time;
clock_gettime(CLOCK_MONOTONIC, &time);
return time.tv_sec * 1000 + time.tv_nsec / 1000000;
return time.tv_sec * 1000000 + time.tv_nsec / 1000;
}