Add Congestion Control

This commit is contained in:
Florian Märkl 2020-11-18 20:15:24 +01:00
commit ffb8851835
No known key found for this signature in database
GPG key ID: 125BC8A5A6A1E857
16 changed files with 269 additions and 60 deletions

View file

@ -17,7 +17,6 @@ for Linux, FreeBSD, OpenBSD, Android, macOS, Windows, Nintendo Switch and potent
Everything necessary for a full streaming session, including the initial Everything necessary for a full streaming session, including the initial
registration and wakeup of the console, is supported. registration and wakeup of the console, is supported.
The following features however are yet to be implemented: The following features however are yet to be implemented:
* Congestion Control
* H264 Error Concealment (FEC and active error recovery however are implemented) * H264 Error Concealment (FEC and active error recovery however are implemented)
* Touchpad support (Triggering the Touchpad Button is currently possible from the keyboard though) * Touchpad support (Triggering the Touchpad Button is currently possible from the keyboard though)
* Rumble * Rumble

View file

@ -21,6 +21,7 @@ set(HEADER_FILES
include/chiaki/video.h include/chiaki/video.h
include/chiaki/videoreceiver.h include/chiaki/videoreceiver.h
include/chiaki/frameprocessor.h include/chiaki/frameprocessor.h
include/chiaki/packetstats.h
include/chiaki/seqnum.h include/chiaki/seqnum.h
include/chiaki/discovery.h include/chiaki/discovery.h
include/chiaki/congestioncontrol.h include/chiaki/congestioncontrol.h
@ -59,6 +60,7 @@ set(SOURCE_FILES
src/audioreceiver.c src/audioreceiver.c
src/videoreceiver.c src/videoreceiver.c
src/frameprocessor.c src/frameprocessor.c
src/packetstats.c
src/discovery.c src/discovery.c
src/congestioncontrol.c src/congestioncontrol.c
src/stoppipe.c src/stoppipe.c

View file

@ -8,6 +8,7 @@
#include "audio.h" #include "audio.h"
#include "takion.h" #include "takion.h"
#include "thread.h" #include "thread.h"
#include "packetstats.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
@ -33,19 +34,20 @@ typedef struct chiaki_audio_receiver_t
ChiakiMutex mutex; ChiakiMutex mutex;
ChiakiSeqNum16 frame_index_prev; ChiakiSeqNum16 frame_index_prev;
bool frame_index_startup; // whether frame_index_prev has definitely not wrapped yet bool frame_index_startup; // whether frame_index_prev has definitely not wrapped yet
ChiakiPacketStats *packet_stats;
} ChiakiAudioReceiver; } ChiakiAudioReceiver;
CHIAKI_EXPORT ChiakiErrorCode chiaki_audio_receiver_init(ChiakiAudioReceiver *audio_receiver, struct chiaki_session_t *session); CHIAKI_EXPORT ChiakiErrorCode chiaki_audio_receiver_init(ChiakiAudioReceiver *audio_receiver, struct chiaki_session_t *session, ChiakiPacketStats *packet_stats);
CHIAKI_EXPORT void chiaki_audio_receiver_fini(ChiakiAudioReceiver *audio_receiver); CHIAKI_EXPORT void chiaki_audio_receiver_fini(ChiakiAudioReceiver *audio_receiver);
CHIAKI_EXPORT void chiaki_audio_receiver_stream_info(ChiakiAudioReceiver *audio_receiver, ChiakiAudioHeader *audio_header); CHIAKI_EXPORT void chiaki_audio_receiver_stream_info(ChiakiAudioReceiver *audio_receiver, ChiakiAudioHeader *audio_header);
CHIAKI_EXPORT void chiaki_audio_receiver_av_packet(ChiakiAudioReceiver *audio_receiver, ChiakiTakionAVPacket *packet); CHIAKI_EXPORT void chiaki_audio_receiver_av_packet(ChiakiAudioReceiver *audio_receiver, ChiakiTakionAVPacket *packet);
static inline ChiakiAudioReceiver *chiaki_audio_receiver_new(struct chiaki_session_t *session) static inline ChiakiAudioReceiver *chiaki_audio_receiver_new(struct chiaki_session_t *session, ChiakiPacketStats *packet_stats)
{ {
ChiakiAudioReceiver *audio_receiver = CHIAKI_NEW(ChiakiAudioReceiver); ChiakiAudioReceiver *audio_receiver = CHIAKI_NEW(ChiakiAudioReceiver);
if(!audio_receiver) if(!audio_receiver)
return NULL; return NULL;
ChiakiErrorCode err = chiaki_audio_receiver_init(audio_receiver, session); ChiakiErrorCode err = chiaki_audio_receiver_init(audio_receiver, session, packet_stats);
if(err != CHIAKI_ERR_SUCCESS) if(err != CHIAKI_ERR_SUCCESS)
{ {
free(audio_receiver); free(audio_receiver);

View file

@ -5,6 +5,7 @@
#include "takion.h" #include "takion.h"
#include "thread.h" #include "thread.h"
#include "packetstats.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
@ -13,11 +14,12 @@ extern "C" {
typedef struct chiaki_congestion_control_t typedef struct chiaki_congestion_control_t
{ {
ChiakiTakion *takion; ChiakiTakion *takion;
ChiakiPacketStats *stats;
ChiakiThread thread; ChiakiThread thread;
ChiakiBoolPredCond stop_cond; ChiakiBoolPredCond stop_cond;
} ChiakiCongestionControl; } ChiakiCongestionControl;
CHIAKI_EXPORT ChiakiErrorCode chiaki_congestion_control_start(ChiakiCongestionControl *control, ChiakiTakion *takion); CHIAKI_EXPORT ChiakiErrorCode chiaki_congestion_control_start(ChiakiCongestionControl *control, ChiakiTakion *takion, ChiakiPacketStats *stats);
/** /**
* Stop control and join the thread * Stop control and join the thread

View file

@ -5,6 +5,7 @@
#include "common.h" #include "common.h"
#include "takion.h" #include "takion.h"
#include "packetstats.h"
#include <stdint.h> #include <stdint.h>
#include <stdbool.h> #include <stdbool.h>
@ -13,6 +14,16 @@
extern "C" { extern "C" {
#endif #endif
typedef struct chiaki_stream_stats_t
{
uint64_t frames;
uint64_t bytes;
} ChiakiStreamStats;
CHIAKI_EXPORT void chiaki_stream_stats_reset(ChiakiStreamStats *stats);
CHIAKI_EXPORT void chiaki_stream_stats_frame(ChiakiStreamStats *stats, uint64_t size);
CHIAKI_EXPORT uint64_t chiaki_stream_stats_bitrate(ChiakiStreamStats *stats, uint64_t framerate);
struct chiaki_frame_unit_t; struct chiaki_frame_unit_t;
typedef struct chiaki_frame_unit_t ChiakiFrameUnit; typedef struct chiaki_frame_unit_t ChiakiFrameUnit;
@ -29,6 +40,8 @@ typedef struct chiaki_frame_processor_t
unsigned int units_fec_received; unsigned int units_fec_received;
ChiakiFrameUnit *unit_slots; ChiakiFrameUnit *unit_slots;
size_t unit_slots_size; size_t unit_slots_size;
bool flushed; // whether we have already flushed the current frame, i.e. are only interested in stats, not data.
ChiakiStreamStats stream_stats;
} ChiakiFrameProcessor; } ChiakiFrameProcessor;
typedef enum chiaki_frame_flush_result_t { typedef enum chiaki_frame_flush_result_t {
@ -41,6 +54,7 @@ typedef enum chiaki_frame_flush_result_t {
CHIAKI_EXPORT void chiaki_frame_processor_init(ChiakiFrameProcessor *frame_processor, ChiakiLog *log); CHIAKI_EXPORT void chiaki_frame_processor_init(ChiakiFrameProcessor *frame_processor, ChiakiLog *log);
CHIAKI_EXPORT void chiaki_frame_processor_fini(ChiakiFrameProcessor *frame_processor); CHIAKI_EXPORT void chiaki_frame_processor_fini(ChiakiFrameProcessor *frame_processor);
CHIAKI_EXPORT void chiaki_frame_processor_report_packet_stats(ChiakiFrameProcessor *frame_processor, ChiakiPacketStats *packet_stats);
CHIAKI_EXPORT ChiakiErrorCode chiaki_frame_processor_alloc_frame(ChiakiFrameProcessor *frame_processor, ChiakiTakionAVPacket *packet); CHIAKI_EXPORT ChiakiErrorCode chiaki_frame_processor_alloc_frame(ChiakiFrameProcessor *frame_processor, ChiakiTakionAVPacket *packet);
CHIAKI_EXPORT ChiakiErrorCode chiaki_frame_processor_put_unit(ChiakiFrameProcessor *frame_processor, ChiakiTakionAVPacket *packet); CHIAKI_EXPORT ChiakiErrorCode chiaki_frame_processor_put_unit(ChiakiFrameProcessor *frame_processor, ChiakiTakionAVPacket *packet);

View file

@ -0,0 +1,38 @@
// SPDX-License-Identifier: LicenseRef-GPL-3.0-or-later-OpenSSL
#ifndef CHIAKI_PACKETSTATS_H
#define CHIAKI_PACKETSTATS_H
#include "thread.h"
#include "seqnum.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct chiaki_packet_stats_t
{
ChiakiMutex mutex;
// For generations of packets, i.e. where we know the number of expected packets per generation
uint64_t gen_received;
uint64_t gen_lost;
// For sequential packets, i.e. where packets are identified by a sequence number
ChiakiSeqNum16 seq_min; // sequence number that was max at the last reset
ChiakiSeqNum16 seq_max; // currently maximal sequence number
uint64_t seq_received; // total received packets since the last reset
} ChiakiPacketStats;
CHIAKI_EXPORT ChiakiErrorCode chiaki_packet_stats_init(ChiakiPacketStats *stats);
CHIAKI_EXPORT void chiaki_packet_stats_fini(ChiakiPacketStats *stats);
CHIAKI_EXPORT void chiaki_packet_stats_reset(ChiakiPacketStats *stats);
CHIAKI_EXPORT void chiaki_packet_stats_push_generation(ChiakiPacketStats *stats, uint64_t received, uint64_t lost);
CHIAKI_EXPORT void chiaki_packet_stats_push_seq(ChiakiPacketStats *stats, ChiakiSeqNum16 seq_num);
CHIAKI_EXPORT void chiaki_packet_stats_get(ChiakiPacketStats *stats, bool reset, uint64_t *received, uint64_t *lost);
#ifdef __cplusplus
}
#endif
#endif // CHIAKI_PACKETSTATS_H

View file

@ -12,8 +12,6 @@
#include "takion.h" #include "takion.h"
#include "ecdh.h" #include "ecdh.h"
#include "audio.h" #include "audio.h"
#include "audioreceiver.h"
#include "videoreceiver.h"
#include "controller.h" #include "controller.h"
#include "stoppipe.h" #include "stoppipe.h"
@ -199,8 +197,6 @@ typedef struct chiaki_session_t
ChiakiLog *log; ChiakiLog *log;
ChiakiStreamConnection stream_connection; ChiakiStreamConnection stream_connection;
ChiakiAudioReceiver *audio_receiver;
ChiakiVideoReceiver *video_receiver;
ChiakiControllerState controller_state; ChiakiControllerState controller_state;
} ChiakiSession; } ChiakiSession;

View file

@ -8,6 +8,9 @@
#include "log.h" #include "log.h"
#include "ecdh.h" #include "ecdh.h"
#include "gkcrypt.h" #include "gkcrypt.h"
#include "audioreceiver.h"
#include "videoreceiver.h"
#include "congestioncontrol.h"
#include <stdbool.h> #include <stdbool.h>
@ -26,6 +29,10 @@ typedef struct chiaki_stream_connection_t
ChiakiGKCrypt *gkcrypt_local; ChiakiGKCrypt *gkcrypt_local;
ChiakiGKCrypt *gkcrypt_remote; ChiakiGKCrypt *gkcrypt_remote;
ChiakiPacketStats packet_stats;
ChiakiAudioReceiver *audio_receiver;
ChiakiVideoReceiver *video_receiver;
ChiakiFeedbackSender feedback_sender; ChiakiFeedbackSender feedback_sender;
/** /**
* whether feedback_sender is initialized * whether feedback_sender is initialized

View file

@ -27,9 +27,10 @@ typedef struct chiaki_video_receiver_t
int32_t frame_index_prev; // last frame that has been at least partially decoded int32_t frame_index_prev; // last frame that has been at least partially decoded
int32_t frame_index_prev_complete; // last frame that has been completely decoded int32_t frame_index_prev_complete; // last frame that has been completely decoded
ChiakiFrameProcessor frame_processor; ChiakiFrameProcessor frame_processor;
ChiakiPacketStats *packet_stats;
} ChiakiVideoReceiver; } ChiakiVideoReceiver;
CHIAKI_EXPORT void chiaki_video_receiver_init(ChiakiVideoReceiver *video_receiver, struct chiaki_session_t *session); CHIAKI_EXPORT void chiaki_video_receiver_init(ChiakiVideoReceiver *video_receiver, struct chiaki_session_t *session, ChiakiPacketStats *packet_stats);
CHIAKI_EXPORT void chiaki_video_receiver_fini(ChiakiVideoReceiver *video_receiver); CHIAKI_EXPORT void chiaki_video_receiver_fini(ChiakiVideoReceiver *video_receiver);
/** /**
@ -43,12 +44,12 @@ CHIAKI_EXPORT void chiaki_video_receiver_stream_info(ChiakiVideoReceiver *video_
CHIAKI_EXPORT void chiaki_video_receiver_av_packet(ChiakiVideoReceiver *video_receiver, ChiakiTakionAVPacket *packet); CHIAKI_EXPORT void chiaki_video_receiver_av_packet(ChiakiVideoReceiver *video_receiver, ChiakiTakionAVPacket *packet);
static inline ChiakiVideoReceiver *chiaki_video_receiver_new(struct chiaki_session_t *session) static inline ChiakiVideoReceiver *chiaki_video_receiver_new(struct chiaki_session_t *session, ChiakiPacketStats *packet_stats)
{ {
ChiakiVideoReceiver *video_receiver = CHIAKI_NEW(ChiakiVideoReceiver); ChiakiVideoReceiver *video_receiver = CHIAKI_NEW(ChiakiVideoReceiver);
if(!video_receiver) if(!video_receiver)
return NULL; return NULL;
chiaki_video_receiver_init(video_receiver, session); chiaki_video_receiver_init(video_receiver, session, packet_stats);
return video_receiver; return video_receiver;
} }

View file

@ -7,10 +7,11 @@
static void chiaki_audio_receiver_frame(ChiakiAudioReceiver *audio_receiver, ChiakiSeqNum16 frame_index, uint8_t *buf, size_t buf_size); static void chiaki_audio_receiver_frame(ChiakiAudioReceiver *audio_receiver, ChiakiSeqNum16 frame_index, uint8_t *buf, size_t buf_size);
CHIAKI_EXPORT ChiakiErrorCode chiaki_audio_receiver_init(ChiakiAudioReceiver *audio_receiver, ChiakiSession *session) CHIAKI_EXPORT ChiakiErrorCode chiaki_audio_receiver_init(ChiakiAudioReceiver *audio_receiver, ChiakiSession *session, ChiakiPacketStats *packet_stats)
{ {
audio_receiver->session = session; audio_receiver->session = session;
audio_receiver->log = session->log; audio_receiver->log = session->log;
audio_receiver->packet_stats = packet_stats;
audio_receiver->frame_index_prev = 0; audio_receiver->frame_index_prev = 0;
audio_receiver->frame_index_startup = true; audio_receiver->frame_index_startup = true;
@ -101,8 +102,11 @@ CHIAKI_EXPORT void chiaki_audio_receiver_av_packet(ChiakiAudioReceiver *audio_re
frame_index = packet->frame_index - fec_units_count + fec_index; frame_index = packet->frame_index - fec_units_count + fec_index;
} }
chiaki_audio_receiver_frame(audio_receiver->session->audio_receiver, frame_index, packet->data + unit_size * i, unit_size); chiaki_audio_receiver_frame(audio_receiver, frame_index, packet->data + unit_size * i, unit_size);
} }
if(audio_receiver->packet_stats)
chiaki_packet_stats_push_seq(audio_receiver->packet_stats, packet->frame_index);
} }
static void chiaki_audio_receiver_frame(ChiakiAudioReceiver *audio_receiver, ChiakiSeqNum16 frame_index, uint8_t *buf, size_t buf_size) static void chiaki_audio_receiver_frame(ChiakiAudioReceiver *audio_receiver, ChiakiSeqNum16 frame_index, uint8_t *buf, size_t buf_size)

View file

@ -2,10 +2,8 @@
#include <chiaki/congestioncontrol.h> #include <chiaki/congestioncontrol.h>
#define CONGESTION_CONTROL_INTERVAL_MS 200 #define CONGESTION_CONTROL_INTERVAL_MS 200
static void *congestion_control_thread_func(void *user) static void *congestion_control_thread_func(void *user)
{ {
ChiakiCongestionControl *control = user; ChiakiCongestionControl *control = user;
@ -20,8 +18,14 @@ static void *congestion_control_thread_func(void *user)
if(err != CHIAKI_ERR_TIMEOUT) if(err != CHIAKI_ERR_TIMEOUT)
break; break;
//CHIAKI_LOGD(control->takion->log, "Sending Congestion Control Packet"); uint64_t received;
ChiakiTakionCongestionPacket packet = { 0 }; // TODO: fill with real values uint64_t lost;
chiaki_packet_stats_get(control->stats, true, &received, &lost);
ChiakiTakionCongestionPacket packet = { 0 };
packet.received = (uint16_t)received;
packet.lost = (uint16_t)lost;
CHIAKI_LOGV(control->takion->log, "Sending Congestion Control Packet, received: %u, lost: %u",
(unsigned int)packet.received, (unsigned int)packet.lost);
chiaki_takion_send_congestion(control->takion, &packet); chiaki_takion_send_congestion(control->takion, &packet);
} }
@ -29,9 +33,10 @@ static void *congestion_control_thread_func(void *user)
return NULL; return NULL;
} }
CHIAKI_EXPORT ChiakiErrorCode chiaki_congestion_control_start(ChiakiCongestionControl *control, ChiakiTakion *takion) CHIAKI_EXPORT ChiakiErrorCode chiaki_congestion_control_start(ChiakiCongestionControl *control, ChiakiTakion *takion, ChiakiPacketStats *stats)
{ {
control->takion = takion; control->takion = takion;
control->stats = stats;
ChiakiErrorCode err = chiaki_bool_pred_cond_init(&control->stop_cond); ChiakiErrorCode err = chiaki_bool_pred_cond_init(&control->stop_cond);
if(err != CHIAKI_ERR_SUCCESS) if(err != CHIAKI_ERR_SUCCESS)

View file

@ -13,15 +13,32 @@
#include <arpa/inet.h> #include <arpa/inet.h>
#endif #endif
#define UNIT_SLOTS_MAX 256 CHIAKI_EXPORT void chiaki_stream_stats_reset(ChiakiStreamStats *stats)
{
stats->frames = 0;
stats->bytes = 0;
}
CHIAKI_EXPORT void chiaki_stream_stats_frame(ChiakiStreamStats *stats, uint64_t size)
{
stats->frames++;
stats->bytes += size;
//float br = (float)chiaki_stream_stats_bitrate(stats, 60) / 1000000.0f;
//CHIAKI_LOGD(NULL, "bitrate: %f", br);
}
CHIAKI_EXPORT uint64_t chiaki_stream_stats_bitrate(ChiakiStreamStats *stats, uint64_t framerate)
{
return (stats->bytes * 8 * framerate) / stats->frames;
}
#define UNIT_SLOTS_MAX 256
struct chiaki_frame_unit_t struct chiaki_frame_unit_t
{ {
size_t data_size; size_t data_size;
}; };
CHIAKI_EXPORT void chiaki_frame_processor_init(ChiakiFrameProcessor *frame_processor, ChiakiLog *log) CHIAKI_EXPORT void chiaki_frame_processor_init(ChiakiFrameProcessor *frame_processor, ChiakiLog *log)
{ {
frame_processor->log = log; frame_processor->log = log;
@ -33,6 +50,8 @@ CHIAKI_EXPORT void chiaki_frame_processor_init(ChiakiFrameProcessor *frame_proce
frame_processor->units_fec_expected = 0; frame_processor->units_fec_expected = 0;
frame_processor->unit_slots = NULL; frame_processor->unit_slots = NULL;
frame_processor->unit_slots_size = 0; frame_processor->unit_slots_size = 0;
frame_processor->flushed = true;
chiaki_stream_stats_reset(&frame_processor->stream_stats);
} }
CHIAKI_EXPORT void chiaki_frame_processor_fini(ChiakiFrameProcessor *frame_processor) CHIAKI_EXPORT void chiaki_frame_processor_fini(ChiakiFrameProcessor *frame_processor)
@ -49,6 +68,7 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_frame_processor_alloc_frame(ChiakiFrameProc
return CHIAKI_ERR_INVALID_DATA; return CHIAKI_ERR_INVALID_DATA;
} }
frame_processor->flushed = false;
frame_processor->units_source_expected = packet->units_in_frame_total - packet->units_in_frame_fec; frame_processor->units_source_expected = packet->units_in_frame_total - packet->units_in_frame_fec;
frame_processor->units_fec_expected = packet->units_in_frame_fec; frame_processor->units_fec_expected = packet->units_in_frame_fec;
if(frame_processor->units_fec_expected < 1) if(frame_processor->units_fec_expected < 1)
@ -151,9 +171,12 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_frame_processor_put_unit(ChiakiFrameProcess
} }
unit->data_size = packet->data_size; unit->data_size = packet->data_size;
memcpy(frame_processor->frame_buf + packet->unit_index * frame_processor->buf_stride_per_unit, if(!frame_processor->flushed)
packet->data, {
packet->data_size); memcpy(frame_processor->frame_buf + packet->unit_index * frame_processor->buf_stride_per_unit,
packet->data,
packet->data_size);
}
if(packet->unit_index < frame_processor->units_source_expected) if(packet->unit_index < frame_processor->units_source_expected)
frame_processor->units_source_received++; frame_processor->units_source_received++;
@ -163,6 +186,13 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_frame_processor_put_unit(ChiakiFrameProcess
return CHIAKI_ERR_SUCCESS; return CHIAKI_ERR_SUCCESS;
} }
CHIAKI_EXPORT void chiaki_frame_processor_report_packet_stats(ChiakiFrameProcessor *frame_processor, ChiakiPacketStats *packet_stats)
{
uint64_t received = frame_processor->units_source_received + frame_processor->units_fec_received;
uint64_t expected = frame_processor->units_source_expected + frame_processor->units_fec_expected;
chiaki_packet_stats_push_generation(packet_stats, received, expected - received);
}
static ChiakiErrorCode chiaki_frame_processor_fec(ChiakiFrameProcessor *frame_processor) static ChiakiErrorCode chiaki_frame_processor_fec(ChiakiFrameProcessor *frame_processor)
{ {
CHIAKI_LOGI(frame_processor->log, "Frame Processor received %u+%u / %u+%u units, attempting FEC", CHIAKI_LOGI(frame_processor->log, "Frame Processor received %u+%u / %u+%u units, attempting FEC",
@ -232,9 +262,13 @@ static ChiakiErrorCode chiaki_frame_processor_fec(ChiakiFrameProcessor *frame_pr
CHIAKI_EXPORT ChiakiFrameProcessorFlushResult chiaki_frame_processor_flush(ChiakiFrameProcessor *frame_processor, uint8_t **frame, size_t *frame_size) CHIAKI_EXPORT ChiakiFrameProcessorFlushResult chiaki_frame_processor_flush(ChiakiFrameProcessor *frame_processor, uint8_t **frame, size_t *frame_size)
{ {
if(frame_processor->units_source_expected == 0) if(frame_processor->units_source_expected == 0 || frame_processor->flushed)
return CHIAKI_FRAME_PROCESSOR_FLUSH_RESULT_FAILED; return CHIAKI_FRAME_PROCESSOR_FLUSH_RESULT_FAILED;
//CHIAKI_LOGD(NULL, "source: %u, fec: %u",
// frame_processor->units_source_expected,
// frame_processor->units_fec_expected);
ChiakiFrameProcessorFlushResult result = CHIAKI_FRAME_PROCESSOR_FLUSH_RESULT_SUCCESS; ChiakiFrameProcessorFlushResult result = CHIAKI_FRAME_PROCESSOR_FLUSH_RESULT_SUCCESS;
if(frame_processor->units_source_received < frame_processor->units_source_expected) if(frame_processor->units_source_received < frame_processor->units_source_expected)
{ {
@ -266,6 +300,8 @@ CHIAKI_EXPORT ChiakiFrameProcessorFlushResult chiaki_frame_processor_flush(Chiak
cur += part_size; cur += part_size;
} }
chiaki_stream_stats_frame(&frame_processor->stream_stats, (uint64_t)cur);
*frame = frame_processor->frame_buf; *frame = frame_processor->frame_buf;
*frame_size = cur; *frame_size = cur;
return result; return result;

77
lib/src/packetstats.c Normal file
View file

@ -0,0 +1,77 @@
// SPDX-License-Identifier: LicenseRef-GPL-3.0-or-later-OpenSSL
#include <chiaki/packetstats.h>
#include <chiaki/log.h>
CHIAKI_EXPORT ChiakiErrorCode chiaki_packet_stats_init(ChiakiPacketStats *stats)
{
stats->gen_received = 0;
stats->gen_lost = 0;
stats->seq_min = 0;
stats->seq_max = 0;
stats->seq_received = 0;
return chiaki_mutex_init(&stats->mutex, false);
}
CHIAKI_EXPORT void chiaki_packet_stats_fini(ChiakiPacketStats *stats)
{
chiaki_mutex_fini(&stats->mutex);
}
static void reset_stats(ChiakiPacketStats *stats)
{
stats->gen_received = 0;
stats->gen_lost = 0;
stats->seq_min = stats->seq_max;
stats->seq_received = 0;
}
CHIAKI_EXPORT void chiaki_packet_stats_reset(ChiakiPacketStats *stats)
{
chiaki_mutex_lock(&stats->mutex);
reset_stats(stats);
chiaki_mutex_unlock(&stats->mutex);
}
CHIAKI_EXPORT void chiaki_packet_stats_push_generation(ChiakiPacketStats *stats, uint64_t received, uint64_t lost)
{
chiaki_mutex_lock(&stats->mutex);
stats->gen_received += received;
stats->gen_lost += lost;
chiaki_mutex_unlock(&stats->mutex);
}
CHIAKI_EXPORT void chiaki_packet_stats_push_seq(ChiakiPacketStats *stats, ChiakiSeqNum16 seq_num)
{
stats->seq_received++;
if(chiaki_seq_num_16_gt(seq_num, stats->seq_max))
stats->seq_max = seq_num;
}
CHIAKI_EXPORT void chiaki_packet_stats_get(ChiakiPacketStats *stats, bool reset, uint64_t *received, uint64_t *lost)
{
chiaki_mutex_lock(&stats->mutex);
// gen
*received = stats->gen_received;
*lost = stats->gen_lost;
//CHIAKI_LOGD(NULL, "gen received: %llu, lost: %llu",
// (unsigned long long)stats->gen_received,
// (unsigned long long)stats->gen_lost);
// seq
uint64_t seq_diff = stats->seq_max - stats->seq_min; // overflow on purpose if max < min
uint64_t seq_lost = stats->seq_received > seq_diff ? seq_diff : seq_diff - stats->seq_received;
*received += stats->seq_received;
*lost += seq_lost;
//CHIAKI_LOGD(NULL, "seq received: %llu, lost: %llu",
// (unsigned long long)stats->seq_received,
// (unsigned long long)seq_lost);
if(reset)
reset_stats(stats);
chiaki_mutex_unlock(&stats->mutex);
}

View file

@ -484,20 +484,6 @@ ctrl_failed:
QUIT(quit_ctrl); QUIT(quit_ctrl);
} }
session->audio_receiver = chiaki_audio_receiver_new(session);
if(!session->audio_receiver)
{
CHIAKI_LOGE(session->log, "Session failed to initialize Audio Receiver");
QUIT(quit_ecdh);
}
session->video_receiver = chiaki_video_receiver_new(session);
if(!session->video_receiver)
{
CHIAKI_LOGE(session->log, "Session failed to initialize Video Receiver");
QUIT(quit_audio_receiver);
}
chiaki_mutex_unlock(&session->state_mutex); chiaki_mutex_unlock(&session->state_mutex);
err = chiaki_stream_connection_run(&session->stream_connection); err = chiaki_stream_connection_run(&session->stream_connection);
chiaki_mutex_lock(&session->state_mutex); chiaki_mutex_lock(&session->state_mutex);
@ -518,16 +504,7 @@ ctrl_failed:
session->quit_reason = CHIAKI_QUIT_REASON_STOPPED; session->quit_reason = CHIAKI_QUIT_REASON_STOPPED;
} }
chiaki_video_receiver_free(session->video_receiver);
session->video_receiver = NULL;
chiaki_mutex_unlock(&session->state_mutex); chiaki_mutex_unlock(&session->state_mutex);
quit_audio_receiver:
chiaki_audio_receiver_free(session->audio_receiver);
session->audio_receiver = NULL;
quit_ecdh:
chiaki_ecdh_fini(&session->ecdh); chiaki_ecdh_fini(&session->ecdh);
quit_ctrl: quit_ctrl:

View file

@ -71,10 +71,17 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_stream_connection_init(ChiakiStreamConnecti
if(err != CHIAKI_ERR_SUCCESS) if(err != CHIAKI_ERR_SUCCESS)
goto error_state_mutex; goto error_state_mutex;
err = chiaki_mutex_init(&stream_connection->feedback_sender_mutex, false); err = chiaki_packet_stats_init(&stream_connection->packet_stats);
if(err != CHIAKI_ERR_SUCCESS) if(err != CHIAKI_ERR_SUCCESS)
goto error_state_cond; goto error_state_cond;
stream_connection->video_receiver = NULL;
stream_connection->audio_receiver = NULL;
err = chiaki_mutex_init(&stream_connection->feedback_sender_mutex, false);
if(err != CHIAKI_ERR_SUCCESS)
goto error_packet_stats;
stream_connection->state = STATE_IDLE; stream_connection->state = STATE_IDLE;
stream_connection->state_finished = false; stream_connection->state_finished = false;
stream_connection->state_failed = false; stream_connection->state_failed = false;
@ -84,6 +91,8 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_stream_connection_init(ChiakiStreamConnecti
return CHIAKI_ERR_SUCCESS; return CHIAKI_ERR_SUCCESS;
error_packet_stats:
chiaki_packet_stats_fini(&stream_connection->packet_stats);
error_state_cond: error_state_cond:
chiaki_cond_fini(&stream_connection->state_cond); chiaki_cond_fini(&stream_connection->state_cond);
error_state_mutex: error_state_mutex:
@ -101,6 +110,8 @@ CHIAKI_EXPORT void chiaki_stream_connection_fini(ChiakiStreamConnection *stream_
free(stream_connection->ecdh_secret); free(stream_connection->ecdh_secret);
chiaki_packet_stats_fini(&stream_connection->packet_stats);
chiaki_mutex_fini(&stream_connection->feedback_sender_mutex); chiaki_mutex_fini(&stream_connection->feedback_sender_mutex);
chiaki_cond_fini(&stream_connection->state_cond); chiaki_cond_fini(&stream_connection->state_cond);
@ -145,6 +156,21 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_stream_connection_run(ChiakiStreamConnectio
goto quit_label; \ goto quit_label; \
} } while(0) } } while(0)
stream_connection->audio_receiver = chiaki_audio_receiver_new(session, &stream_connection->packet_stats);
if(!stream_connection->audio_receiver)
{
CHIAKI_LOGE(session->log, "StreamConnection failed to initialize Audio Receiver");
return CHIAKI_ERR_UNKNOWN;
}
stream_connection->video_receiver = chiaki_video_receiver_new(session, &stream_connection->packet_stats);
if(!stream_connection->video_receiver)
{
CHIAKI_LOGE(session->log, "StreamConnection failed to initialize Video Receiver");
err = CHIAKI_ERR_UNKNOWN;
goto err_audio_receiver;
}
stream_connection->state = STATE_TAKION_CONNECT; stream_connection->state = STATE_TAKION_CONNECT;
stream_connection->state_finished = false; stream_connection->state_finished = false;
stream_connection->state_failed = false; stream_connection->state_failed = false;
@ -154,7 +180,15 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_stream_connection_run(ChiakiStreamConnectio
{ {
CHIAKI_LOGE(session->log, "StreamConnection connect failed"); CHIAKI_LOGE(session->log, "StreamConnection connect failed");
chiaki_mutex_unlock(&stream_connection->state_mutex); chiaki_mutex_unlock(&stream_connection->state_mutex);
return err; goto err_video_receiver;
}
ChiakiCongestionControl congestion_control;
err = chiaki_congestion_control_start(&congestion_control, &stream_connection->takion, &stream_connection->packet_stats);
if(err != CHIAKI_ERR_SUCCESS)
{
CHIAKI_LOGE(session->log, "StreamConnection failed to start Congestion Control");
goto close_takion;
} }
err = chiaki_cond_timedwait_pred(&stream_connection->state_cond, &stream_connection->state_mutex, EXPECT_TIMEOUT_MS, state_finished_cond_check, stream_connection); err = chiaki_cond_timedwait_pred(&stream_connection->state_cond, &stream_connection->state_mutex, EXPECT_TIMEOUT_MS, state_finished_cond_check, stream_connection);
@ -163,7 +197,7 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_stream_connection_run(ChiakiStreamConnectio
if(err != CHIAKI_ERR_SUCCESS) if(err != CHIAKI_ERR_SUCCESS)
{ {
CHIAKI_LOGE(session->log, "StreamConnection Takion connect failed"); CHIAKI_LOGE(session->log, "StreamConnection Takion connect failed");
goto close_takion; goto err_congestion_control;
} }
CHIAKI_LOGI(session->log, "StreamConnection sending big"); CHIAKI_LOGI(session->log, "StreamConnection sending big");
@ -273,12 +307,23 @@ disconnect:
err = CHIAKI_ERR_DISCONNECTED; err = CHIAKI_ERR_DISCONNECTED;
} }
err_congestion_control:
chiaki_congestion_control_stop(&congestion_control);
close_takion: close_takion:
chiaki_mutex_unlock(&stream_connection->state_mutex); chiaki_mutex_unlock(&stream_connection->state_mutex);
chiaki_takion_close(&stream_connection->takion); chiaki_takion_close(&stream_connection->takion);
CHIAKI_LOGI(session->log, "StreamConnection closed takion"); CHIAKI_LOGI(session->log, "StreamConnection closed takion");
err_video_receiver:
chiaki_video_receiver_free(stream_connection->video_receiver);
stream_connection->video_receiver = NULL;
err_audio_receiver:
chiaki_audio_receiver_free(stream_connection->audio_receiver);
stream_connection->audio_receiver = NULL;
return err; return err;
} }
@ -613,9 +658,9 @@ static void stream_connection_takion_data_expect_streaminfo(ChiakiStreamConnecti
ChiakiAudioHeader audio_header_s; ChiakiAudioHeader audio_header_s;
chiaki_audio_header_load(&audio_header_s, audio_header); chiaki_audio_header_load(&audio_header_s, audio_header);
chiaki_audio_receiver_stream_info(stream_connection->session->audio_receiver, &audio_header_s); chiaki_audio_receiver_stream_info(stream_connection->audio_receiver, &audio_header_s);
chiaki_video_receiver_stream_info(stream_connection->session->video_receiver, chiaki_video_receiver_stream_info(stream_connection->video_receiver,
decode_resolutions_context.video_profiles, decode_resolutions_context.video_profiles,
decode_resolutions_context.video_profiles_count); decode_resolutions_context.video_profiles_count);
@ -794,9 +839,9 @@ static void stream_connection_takion_av(ChiakiStreamConnection *stream_connectio
chiaki_gkcrypt_decrypt(stream_connection->gkcrypt_remote, packet->key_pos + CHIAKI_GKCRYPT_BLOCK_SIZE, packet->data, packet->data_size); chiaki_gkcrypt_decrypt(stream_connection->gkcrypt_remote, packet->key_pos + CHIAKI_GKCRYPT_BLOCK_SIZE, packet->data, packet->data_size);
if(packet->is_video) if(packet->is_video)
chiaki_video_receiver_av_packet(stream_connection->session->video_receiver, packet); chiaki_video_receiver_av_packet(stream_connection->video_receiver, packet);
else else
chiaki_audio_receiver_av_packet(stream_connection->session->audio_receiver, packet); chiaki_audio_receiver_av_packet(stream_connection->audio_receiver, packet);
} }

View file

@ -7,7 +7,7 @@
static ChiakiErrorCode chiaki_video_receiver_flush_frame(ChiakiVideoReceiver *video_receiver); static ChiakiErrorCode chiaki_video_receiver_flush_frame(ChiakiVideoReceiver *video_receiver);
CHIAKI_EXPORT void chiaki_video_receiver_init(ChiakiVideoReceiver *video_receiver, struct chiaki_session_t *session) CHIAKI_EXPORT void chiaki_video_receiver_init(ChiakiVideoReceiver *video_receiver, struct chiaki_session_t *session, ChiakiPacketStats *packet_stats)
{ {
video_receiver->session = session; video_receiver->session = session;
video_receiver->log = session->log; video_receiver->log = session->log;
@ -19,6 +19,7 @@ CHIAKI_EXPORT void chiaki_video_receiver_init(ChiakiVideoReceiver *video_receive
video_receiver->frame_index_prev = -1; video_receiver->frame_index_prev = -1;
chiaki_frame_processor_init(&video_receiver->frame_processor, video_receiver->log); chiaki_frame_processor_init(&video_receiver->frame_processor, video_receiver->log);
video_receiver->packet_stats = packet_stats;
} }
CHIAKI_EXPORT void chiaki_video_receiver_fini(ChiakiVideoReceiver *video_receiver) CHIAKI_EXPORT void chiaki_video_receiver_fini(ChiakiVideoReceiver *video_receiver)
@ -81,6 +82,9 @@ CHIAKI_EXPORT void chiaki_video_receiver_av_packet(ChiakiVideoReceiver *video_re
if(video_receiver->frame_index_cur < 0 || if(video_receiver->frame_index_cur < 0 ||
chiaki_seq_num_16_gt(frame_index, (ChiakiSeqNum16)video_receiver->frame_index_cur)) chiaki_seq_num_16_gt(frame_index, (ChiakiSeqNum16)video_receiver->frame_index_cur))
{ {
if(video_receiver->packet_stats)
chiaki_frame_processor_report_packet_stats(&video_receiver->frame_processor, video_receiver->packet_stats);
// last frame not flushed yet? // last frame not flushed yet?
if(video_receiver->frame_index_cur >= 0 && video_receiver->frame_index_prev != video_receiver->frame_index_cur) if(video_receiver->frame_index_cur >= 0 && video_receiver->frame_index_prev != video_receiver->frame_index_cur)
chiaki_video_receiver_flush_frame(video_receiver); chiaki_video_receiver_flush_frame(video_receiver);
@ -97,11 +101,11 @@ CHIAKI_EXPORT void chiaki_video_receiver_av_packet(ChiakiVideoReceiver *video_re
chiaki_frame_processor_alloc_frame(&video_receiver->frame_processor, packet); chiaki_frame_processor_alloc_frame(&video_receiver->frame_processor, packet);
} }
chiaki_frame_processor_put_unit(&video_receiver->frame_processor, packet);
// if we are currently building up a frame // if we are currently building up a frame
if(video_receiver->frame_index_cur != video_receiver->frame_index_prev) if(video_receiver->frame_index_cur != video_receiver->frame_index_prev)
{ {
chiaki_frame_processor_put_unit(&video_receiver->frame_processor, packet);
// if we already have enough for the whole frame, flush it already // if we already have enough for the whole frame, flush it already
if(chiaki_frame_processor_flush_possible(&video_receiver->frame_processor)) if(chiaki_frame_processor_flush_possible(&video_receiver->frame_processor))
chiaki_video_receiver_flush_frame(video_receiver); chiaki_video_receiver_flush_frame(video_receiver);