Send Senkusha Echo Command

This commit is contained in:
Florian Märkl 2019-08-09 12:44:10 +02:00
commit 4c216db5d2
No known key found for this signature in database
GPG key ID: 125BC8A5A6A1E857
9 changed files with 154 additions and 20 deletions

View file

@ -76,7 +76,7 @@ int main(int argc, char *argv[])
chiaki_connect_video_profile_preset(&connect_info.video_profile,
CHIAKI_VIDEO_RESOLUTION_PRESET_720p,
CHIAKI_VIDEO_FPS_PRESET_60);
CHIAKI_VIDEO_FPS_PRESET_30);
if(connect_info.registkey.isEmpty() || connect_info.ostype.isEmpty() || connect_info.auth.isEmpty() || connect_info.morning.isEmpty() || connect_info.did.isEmpty())
parser.showHelp(1);

View file

@ -36,6 +36,7 @@ typedef struct senkusha_t
bool state_finished;
bool state_failed;
bool should_stop;
ChiakiSeqNum32 data_ack_seq_num_expected;
/**
* signaled on change of state_finished or should_stop

View file

@ -80,6 +80,7 @@ typedef enum {
CHIAKI_TAKION_EVENT_TYPE_CONNECTED,
CHIAKI_TAKION_EVENT_TYPE_DISCONNECT,
CHIAKI_TAKION_EVENT_TYPE_DATA,
CHIAKI_TAKION_EVENT_TYPE_DATA_ACK,
CHIAKI_TAKION_EVENT_TYPE_AV
} ChiakiTakionEventType;
@ -95,6 +96,11 @@ typedef struct chiaki_takion_event_t
size_t buf_size;
} data;
struct
{
ChiakiSeqNum32 seq_num;
} data_ack;
ChiakiTakionAVPacket *av;
};
} ChiakiTakionEvent;
@ -198,8 +204,10 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send(ChiakiTakion *takion, uint8_t *
/**
* Thread-safe while Takion is running.
*
* @param optional pointer to write the sequence number of the sent package to
*/
CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_message_data(ChiakiTakion *takion, uint8_t chunk_flags, uint16_t channel, uint8_t *buf, size_t buf_size);
CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_message_data(ChiakiTakion *takion, uint8_t chunk_flags, uint16_t channel, uint8_t *buf, size_t buf_size, ChiakiSeqNum32 *seq_num);
/**
* Thread-safe while Takion is running.

View file

@ -63,7 +63,11 @@ CHIAKI_EXPORT void chiaki_takion_send_buffer_fini(ChiakiTakionSendBuffer *send_b
* On error, buf is freed immediately.
*/
CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_buffer_push(ChiakiTakionSendBuffer *send_buffer, ChiakiSeqNum32 seq_num, uint8_t *buf, size_t buf_size);
CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_buffer_ack(ChiakiTakionSendBuffer *send_buffer, ChiakiSeqNum32 seq_num);
/**
* @param acked_seq_nums optional array of size of at least send_buffer->packets_size where acked seq nums will be stored
*/
CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_buffer_ack(ChiakiTakionSendBuffer *send_buffer, ChiakiSeqNum32 seq_num, ChiakiSeqNum32 *acked_seq_nums, size_t *acked_seq_nums_count);
#ifdef __cplusplus
}

View file

@ -28,22 +28,30 @@
#include <pb_encode.h>
#include <pb_decode.h>
#include <pb.h>
#include <chiaki/takion.h>
#define SENKUSHA_PORT 9297
#define EXPECT_TIMEOUT_MS 5000
#define SENKUSHA_PING_COUNT_DEFAULT 10
typedef enum {
STATE_IDLE,
STATE_TAKION_CONNECT,
STATE_EXPECT_BANG
STATE_EXPECT_BANG,
STATE_EXPECT_DATA_ACK,
STATE_EXPECT_PONG,
} SenkushaState;
static ChiakiErrorCode senkusha_run_ping_test(ChiakiSenkusha *senkusha, uint32_t ping_count);
static void senkusha_takion_cb(ChiakiTakionEvent *event, void *user);
static void senkusha_takion_data(ChiakiSenkusha *senkusha, ChiakiTakionMessageDataType data_type, uint8_t *buf, size_t buf_size);
static void senkusha_takion_data_ack(ChiakiSenkusha *senkusha, ChiakiSeqNum32 seq_num);
static ChiakiErrorCode senkusha_send_big(ChiakiSenkusha *senkusha);
static ChiakiErrorCode senkusha_send_disconnect(ChiakiSenkusha *senkusha);
static ChiakiErrorCode senkusha_send_echo_command(ChiakiSenkusha *senkusha, bool enable);
CHIAKI_EXPORT ChiakiErrorCode chiaki_senkusha_init(ChiakiSenkusha *senkusha, ChiakiSession *session)
{
@ -178,7 +186,7 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_senkusha_run(ChiakiSenkusha *senkusha)
CHIAKI_LOGI(session->log, "Senkusha successfully received bang");
// TODO: Do the actual tests
senkusha_run_ping_test(senkusha, SENKUSHA_PING_COUNT_DEFAULT);
CHIAKI_LOGI(session->log, "Senkusha is disconnecting");
@ -193,6 +201,33 @@ quit:
return err;
}
static ChiakiErrorCode senkusha_run_ping_test(ChiakiSenkusha *senkusha, uint32_t ping_count)
{
CHIAKI_LOGI(senkusha->log, "Senkusha Ping Test with count %u starting", (unsigned int)ping_count);
ChiakiErrorCode err = senkusha_send_echo_command(senkusha, true);
if(err != CHIAKI_ERR_SUCCESS)
{
CHIAKI_LOGE(senkusha->log, "Senkusha Ping Test failed because sending echo command (true) failed");
return err;
}
CHIAKI_LOGI(senkusha->log, "Senkusha enabled echo");
// TODO: do test
err = senkusha_send_echo_command(senkusha, false);
if(err != CHIAKI_ERR_SUCCESS)
{
CHIAKI_LOGE(senkusha->log, "Senkusha Ping Test failed because sending echo command (false) failed");
return err;
}
CHIAKI_LOGI(senkusha->log, "Senkusha disabled echo");
return CHIAKI_ERR_SUCCESS;
}
static void senkusha_takion_cb(ChiakiTakionEvent *event, void *user)
{
ChiakiSenkusha *senkusha = user;
@ -212,6 +247,9 @@ static void senkusha_takion_cb(ChiakiTakionEvent *event, void *user)
case CHIAKI_TAKION_EVENT_TYPE_DATA:
senkusha_takion_data(senkusha, event->data.data_type, event->data.buf, event->data.buf_size);
break;
case CHIAKI_TAKION_EVENT_TYPE_DATA_ACK:
senkusha_takion_data_ack(senkusha, event->data_ack.seq_num);
break;
default:
break;
}
@ -249,6 +287,19 @@ static void senkusha_takion_data(ChiakiSenkusha *senkusha, ChiakiTakionMessageDa
chiaki_mutex_unlock(&senkusha->state_mutex);
}
static void senkusha_takion_data_ack(ChiakiSenkusha *senkusha, ChiakiSeqNum32 seq_num)
{
chiaki_mutex_lock(&senkusha->state_mutex);
if(senkusha->state == STATE_EXPECT_DATA_ACK && senkusha->data_ack_seq_num_expected == seq_num)
{
senkusha->state_finished = true;
chiaki_mutex_unlock(&senkusha->state_mutex);
chiaki_cond_signal(&senkusha->state_cond);
}
else
chiaki_mutex_unlock(&senkusha->state_mutex);
}
static ChiakiErrorCode senkusha_send_big(ChiakiSenkusha *senkusha)
{
tkproto_TakionMessage msg;
@ -276,7 +327,7 @@ static ChiakiErrorCode senkusha_send_big(ChiakiSenkusha *senkusha)
}
buf_size = stream.bytes_written;
ChiakiErrorCode err = chiaki_takion_send_message_data(&senkusha->takion, 1, 1, buf, buf_size);
ChiakiErrorCode err = chiaki_takion_send_message_data(&senkusha->takion, 1, 1, buf, buf_size, NULL);
return err;
}
@ -303,9 +354,57 @@ static ChiakiErrorCode senkusha_send_disconnect(ChiakiSenkusha *senkusha)
}
buf_size = stream.bytes_written;
ChiakiErrorCode err = chiaki_takion_send_message_data(&senkusha->takion, 1, 1, buf, buf_size);
ChiakiErrorCode err = chiaki_takion_send_message_data(&senkusha->takion, 1, 1, buf, buf_size, NULL);
return err;
}
static ChiakiErrorCode senkusha_send_echo_command(ChiakiSenkusha *senkusha, bool enable)
{
tkproto_TakionMessage msg;
memset(&msg, 0, sizeof(msg));
msg.type = tkproto_TakionMessage_PayloadType_SENKUSHA;
msg.has_senkusha_payload = true;
msg.senkusha_payload.command = tkproto_SenkushaPayload_Command_ECHO_COMMAND;
msg.senkusha_payload.has_echo_command = true;
msg.senkusha_payload.echo_command.state = enable;
uint8_t buf[0x10];
size_t buf_size;
pb_ostream_t stream = pb_ostream_from_buffer(buf, sizeof(buf));
bool pbr = pb_encode(&stream, tkproto_TakionMessage_fields, &msg);
if(!pbr)
{
CHIAKI_LOGE(senkusha->log, "Senkusha echo command protobuf encoding failed");
return CHIAKI_ERR_UNKNOWN;
}
buf_size = stream.bytes_written;
senkusha->state = STATE_EXPECT_DATA_ACK;
senkusha->state_finished = false;
senkusha->state_failed = false;
ChiakiErrorCode err = chiaki_takion_send_message_data(&senkusha->takion, 1, 1, buf, buf_size, &senkusha->data_ack_seq_num_expected);
if(err != CHIAKI_ERR_SUCCESS)
{
CHIAKI_LOGE(senkusha->log, "Senkusha failed to send echo command");
return err;
}
err = chiaki_cond_timedwait_pred(&senkusha->state_cond, &senkusha->state_mutex, EXPECT_TIMEOUT_MS, state_finished_cond_check, senkusha);
assert(err == CHIAKI_ERR_SUCCESS || err == CHIAKI_ERR_TIMEOUT);
if(!senkusha->state_finished)
{
if(err == CHIAKI_ERR_TIMEOUT)
CHIAKI_LOGE(senkusha->log, "Senkusha data ack for echo command receive timeout");
if(senkusha->should_stop)
err = CHIAKI_ERR_CANCELED;
else
CHIAKI_LOGE(senkusha->log, "Senkusha failed to receive data ack for echo command");
}
return err;
}

View file

@ -257,7 +257,7 @@ static bool session_check_state_pred(void *user)
|| session->ctrl_session_id_received;
}
//#define ENABLE_SENKUSHA
#define ENABLE_SENKUSHA
static void *session_thread_func(void *arg)
{

View file

@ -728,7 +728,7 @@ static ChiakiErrorCode stream_connection_send_big(ChiakiStreamConnection *stream
}
buf_size = stream.bytes_written;
err = chiaki_takion_send_message_data(&stream_connection->takion, 1, 1, buf, buf_size);
err = chiaki_takion_send_message_data(&stream_connection->takion, 1, 1, buf, buf_size, NULL);
return err;
}
@ -751,7 +751,7 @@ static ChiakiErrorCode stream_connection_send_streaminfo_ack(ChiakiStreamConnect
}
buf_size = stream.bytes_written;
return chiaki_takion_send_message_data(&stream_connection->takion, 1, 9, buf, buf_size);
return chiaki_takion_send_message_data(&stream_connection->takion, 1, 9, buf, buf_size, NULL);
}
static ChiakiErrorCode stream_connection_send_disconnect(ChiakiStreamConnection *stream_connection)
@ -778,7 +778,7 @@ static ChiakiErrorCode stream_connection_send_disconnect(ChiakiStreamConnection
CHIAKI_LOGI(stream_connection->log, "StreamConnection sending Disconnect");
buf_size = stream.bytes_written;
ChiakiErrorCode err = chiaki_takion_send_message_data(&stream_connection->takion, 1, 1, buf, buf_size);
ChiakiErrorCode err = chiaki_takion_send_message_data(&stream_connection->takion, 1, 1, buf, buf_size, NULL);
return err;
}
@ -810,7 +810,7 @@ static ChiakiErrorCode stream_connection_send_heartbeat(ChiakiStreamConnection *
return CHIAKI_ERR_UNKNOWN;
}
return chiaki_takion_send_message_data(&stream_connection->takion, 1, 1, buf, stream.bytes_written);
return chiaki_takion_send_message_data(&stream_connection->takion, 1, 1, buf, stream.bytes_written, NULL);
}
CHIAKI_EXPORT ChiakiErrorCode stream_connection_send_corrupt_frame(ChiakiStreamConnection *stream_connection, ChiakiSeqNum16 start, ChiakiSeqNum16 end)
@ -832,5 +832,5 @@ CHIAKI_EXPORT ChiakiErrorCode stream_connection_send_corrupt_frame(ChiakiStreamC
}
CHIAKI_LOGD(stream_connection->log, "StreamConnection reporting corrupt frame(s) from %u to %u", (unsigned int)start, (unsigned int)end);
return chiaki_takion_send_message_data(&stream_connection->takion, 1, 2, buf, stream.bytes_written);
return chiaki_takion_send_message_data(&stream_connection->takion, 1, 2, buf, stream.bytes_written, NULL);
}

View file

@ -349,7 +349,7 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send(ChiakiTakion *takion, uint8_t *
return chiaki_takion_send_raw(takion, buf, buf_size);
}
CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_message_data(ChiakiTakion *takion, uint8_t chunk_flags, uint16_t channel, uint8_t *buf, size_t buf_size)
CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_message_data(ChiakiTakion *takion, uint8_t chunk_flags, uint16_t channel, uint8_t *buf, size_t buf_size, ChiakiSeqNum32 *seq_num)
{
// TODO: can we make this more memory-efficient?
// TODO: split packet if necessary?
@ -372,10 +372,10 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_message_data(ChiakiTakion *taki
err = chiaki_mutex_lock(&takion->seq_num_local_mutex);
if(err != CHIAKI_ERR_SUCCESS)
return err;
ChiakiSeqNum32 seq_num = takion->seq_num_local++;
ChiakiSeqNum32 seq_num_val = takion->seq_num_local++;
chiaki_mutex_unlock(&takion->seq_num_local_mutex);
*((uint32_t *)(msg_payload + 0)) = htonl(seq_num);
*((uint32_t *)(msg_payload + 0)) = htonl(seq_num_val);
*((uint16_t *)(msg_payload + 4)) = htons(channel);
*((uint16_t *)(msg_payload + 6)) = 0;
*(msg_payload + 8) = 0;
@ -389,7 +389,10 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_message_data(ChiakiTakion *taki
return err;
}
chiaki_takion_send_buffer_push(&takion->send_buffer, seq_num, packet_buf, packet_size);
chiaki_takion_send_buffer_push(&takion->send_buffer, seq_num_val, packet_buf, packet_size);
if(seq_num)
*seq_num = seq_num_val;
return err;
}
@ -606,6 +609,7 @@ static void *takion_thread_func(void *user)
chiaki_reorder_queue_set_drop_cb(&takion->data_queue, takion_data_drop, takion);
// The send buffer size MUST be consistent with the acked seqnums array size in takion_handle_packet_message_data_ack()
if(chiaki_takion_send_buffer_init(&takion->send_buffer, takion, TAKION_SEND_BUFFER_SIZE) != CHIAKI_ERR_SUCCESS)
goto error_reoder_queue;
@ -939,8 +943,20 @@ static void takion_handle_packet_message_data_ack(ChiakiTakion *takion, uint8_t
if(dup_tsns_count != 0)
CHIAKI_LOGW(takion->log, "Takion received data ack with nonzero dup_tsns_count %#x", dup_tsns_count);
//CHIAKI_LOGD(takion->log, "Takion received data ack with seq_num = %#x, something = %#x, size_or_something = %#x, zero = %#x", seq_num, something, size_internal, zero);
chiaki_takion_send_buffer_ack(&takion->send_buffer, cumulative_seq_num);
CHIAKI_LOGV(takion->log, "Takion received data ack with cumulative_seq_num = %#x, a_rwnd = %#x, gap_ack_blocks_count = %#x, dup_tsns_count = %#x",
cumulative_seq_num, a_rwnd, gap_ack_blocks_count, dup_tsns_count);
ChiakiSeqNum32 acked_seq_nums[TAKION_SEND_BUFFER_SIZE];
size_t acked_seq_nums_count = 0;
chiaki_takion_send_buffer_ack(&takion->send_buffer, cumulative_seq_num, acked_seq_nums, &acked_seq_nums_count);
for(size_t i=0; i<acked_seq_nums_count; i++)
{
ChiakiTakionEvent event = { 0 };
event.type = CHIAKI_TAKION_EVENT_TYPE_DATA_ACK;
event.data_ack.seq_num = acked_seq_nums[i];
takion->cb(&event, takion->cb_user);
}
}
/**

View file

@ -144,12 +144,15 @@ beach:
return err;
}
CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_buffer_ack(ChiakiTakionSendBuffer *send_buffer, ChiakiSeqNum32 seq_num)
CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_buffer_ack(ChiakiTakionSendBuffer *send_buffer, ChiakiSeqNum32 seq_num, ChiakiSeqNum32 *acked_seq_nums, size_t *acked_seq_nums_count)
{
ChiakiErrorCode err = chiaki_mutex_lock(&send_buffer->mutex);
if(err != CHIAKI_ERR_SUCCESS)
return err;
if(acked_seq_nums_count)
*acked_seq_nums_count = 0;
size_t i;
size_t shift = 0; // amount to shift back
size_t shift_start = SIZE_MAX;
@ -157,6 +160,9 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_buffer_ack(ChiakiTakionSendBuff
{
if(send_buffer->packets[i].seq_num == seq_num || chiaki_seq_num_32_lt(send_buffer->packets[i].seq_num, seq_num))
{
if(acked_seq_nums)
acked_seq_nums[(*acked_seq_nums_count)++] = send_buffer->packets[i].seq_num;
free(send_buffer->packets[i].buf);
if(shift_start == SIZE_MAX)
{