Implement basic Takion Heartbeat

This commit is contained in:
Florian Märkl 2019-06-23 15:42:48 +02:00
parent d398680604
commit a4fd9a49e8
No known key found for this signature in database
GPG key ID: 125BC8A5A6A1E857
8 changed files with 93 additions and 32 deletions

View file

@ -25,22 +25,22 @@ int main(int argc, char *argv[])
QCommandLineParser parser; QCommandLineParser parser;
parser.addHelpOption(); parser.addHelpOption();
parser.addPositionalArgument("command", "Command"); parser.addPositionalArgument("command", "stream or discover");
parser.addPositionalArgument("host", "Host"); parser.addPositionalArgument("host", "Address to connect to");
QCommandLineOption regist_key_option("registkey"); QCommandLineOption regist_key_option("registkey", "", "registkey");
parser.addOption(regist_key_option); parser.addOption(regist_key_option);
QCommandLineOption ostype_option("ostype", "Win10.0.0"); QCommandLineOption ostype_option("ostype", "", "ostype", "Win10.0.0");
parser.addOption(ostype_option); parser.addOption(ostype_option);
QCommandLineOption auth_option("auth"); QCommandLineOption auth_option("auth", "", "auth");
parser.addOption(auth_option); parser.addOption(auth_option);
QCommandLineOption morning_option("morning"); QCommandLineOption morning_option("morning", "", "morning");
parser.addOption(morning_option); parser.addOption(morning_option);
QCommandLineOption did_option("did"); QCommandLineOption did_option("did", "", "did");
parser.addOption(did_option); parser.addOption(did_option);

View file

@ -39,6 +39,7 @@ typedef struct chiaki_stream_connection_t
uint8_t *ecdh_secret; uint8_t *ecdh_secret;
ChiakiGKCrypt *gkcrypt_local; ChiakiGKCrypt *gkcrypt_local;
ChiakiGKCrypt *gkcrypt_remote; ChiakiGKCrypt *gkcrypt_remote;
ChiakiPredCond stop_cond;
} ChiakiStreamConnection; } ChiakiStreamConnection;
CHIAKI_EXPORT ChiakiErrorCode chiaki_stream_connection_run(struct chiaki_session_t *session); CHIAKI_EXPORT ChiakiErrorCode chiaki_stream_connection_run(struct chiaki_session_t *session);

View file

@ -112,7 +112,7 @@ typedef struct chiaki_takion_t
CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_connect(ChiakiTakion *takion, ChiakiTakionConnectInfo *info); CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_connect(ChiakiTakion *takion, ChiakiTakionConnectInfo *info);
CHIAKI_EXPORT void chiaki_takion_close(ChiakiTakion *takion); CHIAKI_EXPORT void chiaki_takion_close(ChiakiTakion *takion);
CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_raw(ChiakiTakion *takion, uint8_t *buf, size_t buf_size); CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_raw(ChiakiTakion *takion, uint8_t *buf, size_t buf_size);
CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_message_data(ChiakiTakion *takion, uint32_t key_pos, uint8_t type_b, uint16_t channel, uint8_t *buf, size_t buf_size); CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_message_data(ChiakiTakion *takion, uint32_t key_pos, uint8_t *gmac, uint8_t type_b, uint16_t channel, uint8_t *buf, size_t buf_size);
CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_congestion(ChiakiTakion *takion, ChiakiTakionCongestionPacket *packet); CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_congestion(ChiakiTakion *takion, ChiakiTakionCongestionPacket *packet);
static inline void chiaki_takion_set_crypt(ChiakiTakion *takion, ChiakiGKCrypt *gkcrypt_local, ChiakiGKCrypt *gkcrypt_remote) { static inline void chiaki_takion_set_crypt(ChiakiTakion *takion, ChiakiGKCrypt *gkcrypt_local, ChiakiGKCrypt *gkcrypt_remote) {

View file

@ -18,7 +18,7 @@
#include <chiaki/congestioncontrol.h> #include <chiaki/congestioncontrol.h>
#define CONGESTION_CONTROL_INTERVAL 200 // ms #define CONGESTION_CONTROL_INTERVAL_MS 200
static void *congestion_control_thread_func(void *user) static void *congestion_control_thread_func(void *user)
@ -31,13 +31,11 @@ static void *congestion_control_thread_func(void *user)
while(true) while(true)
{ {
err = chiaki_pred_cond_timedwait(&control->stop_cond, CONGESTION_CONTROL_INTERVAL); err = chiaki_pred_cond_timedwait(&control->stop_cond, CONGESTION_CONTROL_INTERVAL_MS);
if(err != CHIAKI_ERR_SUCCESS && err != CHIAKI_ERR_TIMEOUT)
break;
if(err != CHIAKI_ERR_TIMEOUT) if(err != CHIAKI_ERR_TIMEOUT)
break; break;
CHIAKI_LOGD(control->takion->log, "Sending Congestion Control Packet\n"); //CHIAKI_LOGD(control->takion->log, "Sending Congestion Control Packet\n");
ChiakiTakionCongestionPacket packet = { 0 }; // TODO: fill with real values ChiakiTakionCongestionPacket packet = { 0 }; // TODO: fill with real values
chiaki_takion_send_congestion(control->takion, &packet); chiaki_takion_send_congestion(control->takion, &packet);
} }

View file

@ -229,7 +229,7 @@ static void ctrl_message_received_heartbeat_req(ChiakiCtrl *ctrl, uint8_t *paylo
if(payload_size != 0) if(payload_size != 0)
CHIAKI_LOGW(&ctrl->session->log, "Received Heartbeat request with non-empty payload\n"); CHIAKI_LOGW(&ctrl->session->log, "Received Heartbeat request with non-empty payload\n");
CHIAKI_LOGD(&ctrl->session->log, "Received Heartbeat\n"); CHIAKI_LOGD(&ctrl->session->log, "Received Ctrl Heartbeat\n");
ctrl_message_send(ctrl, CTRL_MESSAGE_TYPE_HEARTBEAT_REP, NULL, 0); ctrl_message_send(ctrl, CTRL_MESSAGE_TYPE_HEARTBEAT_REP, NULL, 0);
} }

View file

@ -177,7 +177,7 @@ static ChiakiErrorCode senkusha_send_big(Senkusha *senkusha)
} }
buf_size = stream.bytes_written; buf_size = stream.bytes_written;
ChiakiErrorCode err = chiaki_takion_send_message_data(&senkusha->takion, 0, 1, 1, buf, buf_size); ChiakiErrorCode err = chiaki_takion_send_message_data(&senkusha->takion, 0, NULL, 1, 1, buf, buf_size);
return err; return err;
} }
@ -204,7 +204,7 @@ static ChiakiErrorCode senkusha_send_disconnect(Senkusha *senkusha)
} }
buf_size = stream.bytes_written; buf_size = stream.bytes_written;
ChiakiErrorCode err = chiaki_takion_send_message_data(&senkusha->takion, 0, 1, 1, buf, buf_size); ChiakiErrorCode err = chiaki_takion_send_message_data(&senkusha->takion, 0, NULL, 1, 1, buf, buf_size);
return err; return err;
} }

View file

@ -40,6 +40,8 @@
#define EXPECT_TIMEOUT_MS 5000 #define EXPECT_TIMEOUT_MS 5000
#define HEARTBEAT_INTERVAL_MS 1000
typedef enum { typedef enum {
STREAM_CONNECTION_MIRAI_REQUEST_BANG = 0, STREAM_CONNECTION_MIRAI_REQUEST_BANG = 0,
@ -61,6 +63,7 @@ static void stream_connection_takion_data_expect_streaminfo(ChiakiStreamConnecti
static ChiakiErrorCode stream_connection_send_streaminfo_ack(ChiakiStreamConnection *stream_connection); static ChiakiErrorCode stream_connection_send_streaminfo_ack(ChiakiStreamConnection *stream_connection);
static void stream_connection_takion_av(ChiakiTakionAVPacket *packet, void *user); static void stream_connection_takion_av(ChiakiTakionAVPacket *packet, void *user);
static ChiakiErrorCode stream_connection_takion_mac(uint8_t *buf, size_t buf_size, size_t key_pos, uint8_t *mac_out, void *user); static ChiakiErrorCode stream_connection_takion_mac(uint8_t *buf, size_t buf_size, size_t key_pos, uint8_t *mac_out, void *user);
static ChiakiErrorCode stream_connection_send_heartbeat(ChiakiStreamConnection *stream_connection);
CHIAKI_EXPORT ChiakiErrorCode chiaki_stream_connection_run(ChiakiSession *session) CHIAKI_EXPORT ChiakiErrorCode chiaki_stream_connection_run(ChiakiSession *session)
@ -71,9 +74,13 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_stream_connection_run(ChiakiSession *sessio
stream_connection->ecdh_secret = NULL; stream_connection->ecdh_secret = NULL;
ChiakiErrorCode err = chiaki_mirai_init(&stream_connection->mirai); ChiakiErrorCode err = chiaki_pred_cond_init(&stream_connection->stop_cond);
if(err != CHIAKI_ERR_SUCCESS) if(err != CHIAKI_ERR_SUCCESS)
goto error_mirai; goto error;
err = chiaki_mirai_init(&stream_connection->mirai);
if(err != CHIAKI_ERR_SUCCESS)
goto error_stop_cond;
ChiakiTakionConnectInfo takion_info; ChiakiTakionConnectInfo takion_info;
takion_info.log = stream_connection->log; takion_info.log = stream_connection->log;
@ -170,8 +177,24 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_stream_connection_run(ChiakiSession *sessio
CHIAKI_LOGI(&session->log, "StreamConnection successfully received streaminfo\n"); CHIAKI_LOGI(&session->log, "StreamConnection successfully received streaminfo\n");
while(1) err = chiaki_pred_cond_lock(&stream_connection->stop_cond);
sleep(1); assert(err == CHIAKI_ERR_SUCCESS);
while(true)
{
err = chiaki_pred_cond_timedwait(&stream_connection->stop_cond, HEARTBEAT_INTERVAL_MS);
if(err != CHIAKI_ERR_TIMEOUT)
break;
err = stream_connection_send_heartbeat(stream_connection);
if(err != CHIAKI_ERR_SUCCESS)
CHIAKI_LOGE(stream_connection->log, "StreamConnection failed to send heartbeat\n");
else
CHIAKI_LOGI(stream_connection->log, "StreamConnection sent heartbeat\n");
}
err = chiaki_pred_cond_unlock(&stream_connection->stop_cond);
assert(err == CHIAKI_ERR_SUCCESS);
CHIAKI_LOGI(&session->log, "StreamConnection is disconnecting\n"); CHIAKI_LOGI(&session->log, "StreamConnection is disconnecting\n");
@ -188,6 +211,9 @@ error_takion:
CHIAKI_LOGI(&session->log, "StreamConnection closed takion\n"); CHIAKI_LOGI(&session->log, "StreamConnection closed takion\n");
error_mirai: error_mirai:
chiaki_mirai_fini(&stream_connection->mirai); chiaki_mirai_fini(&stream_connection->mirai);
error_stop_cond:
chiaki_pred_cond_fini(&stream_connection->stop_cond);
error:
free(stream_connection->ecdh_secret); free(stream_connection->ecdh_secret);
return err; return err;
@ -499,7 +525,7 @@ static ChiakiErrorCode stream_connection_send_big(ChiakiStreamConnection *stream
} }
buf_size = stream.bytes_written; buf_size = stream.bytes_written;
err = chiaki_takion_send_message_data(&stream_connection->takion, 0, 1, 1, buf, buf_size); err = chiaki_takion_send_message_data(&stream_connection->takion, 0, 0, 1, 1, buf, buf_size);
return err; return err;
} }
@ -522,7 +548,7 @@ static ChiakiErrorCode stream_connection_send_streaminfo_ack(ChiakiStreamConnect
} }
buf_size = stream.bytes_written; buf_size = stream.bytes_written;
return chiaki_takion_send_message_data(&stream_connection->takion, 0, 1, 9, buf, buf_size); return chiaki_takion_send_message_data(&stream_connection->takion, 0, NULL, 1, 9, buf, buf_size);
} }
static ChiakiErrorCode stream_connection_send_disconnect(ChiakiStreamConnection *stream_connection) static ChiakiErrorCode stream_connection_send_disconnect(ChiakiStreamConnection *stream_connection)
@ -547,7 +573,7 @@ static ChiakiErrorCode stream_connection_send_disconnect(ChiakiStreamConnection
} }
buf_size = stream.bytes_written; buf_size = stream.bytes_written;
ChiakiErrorCode err = chiaki_takion_send_message_data(&stream_connection->takion, 0, 1, 1, buf, buf_size); ChiakiErrorCode err = chiaki_takion_send_message_data(&stream_connection->takion, 0, 0 /* TODO: gmac? */,1, 1, buf, buf_size);
return err; return err;
} }
@ -589,3 +615,36 @@ static void stream_connection_takion_av(ChiakiTakionAVPacket *packet, void *user
//CHIAKI_LOGD(stream_connection->log, "StreamConnection AV %lu\n", buf_size); //CHIAKI_LOGD(stream_connection->log, "StreamConnection AV %lu\n", buf_size);
//chiaki_log_hexdump(stream_connection->log, CHIAKI_LOG_DEBUG, buf, buf_size); //chiaki_log_hexdump(stream_connection->log, CHIAKI_LOG_DEBUG, buf, buf_size);
} }
static ChiakiErrorCode stream_connection_send_heartbeat(ChiakiStreamConnection *stream_connection)
{
tkproto_TakionMessage msg = { 0 };
msg.type = tkproto_TakionMessage_PayloadType_HEARTBEAT;
uint8_t buf[8];
pb_ostream_t stream = pb_ostream_from_buffer(buf, sizeof(buf));
bool pbr = pb_encode(&stream, tkproto_TakionMessage_fields, &msg);
if(!pbr)
{
CHIAKI_LOGE(stream_connection->log, "StreamConnection heartbeat protobuf encoding failed\n");
return CHIAKI_ERR_UNKNOWN;
}
size_t buf_size = stream.bytes_written;
uint8_t gmac[CHIAKI_GKCRYPT_GMAC_SIZE];
uint32_t key_pos;
ChiakiErrorCode err = chiaki_mutex_lock(&stream_connection->takion.gkcrypt_local_mutex);
if(err != CHIAKI_ERR_SUCCESS)
return err;
key_pos = stream_connection->takion.key_pos_local;
err = chiaki_gkcrypt_gmac(stream_connection->takion.gkcrypt_local, key_pos, buf, buf_size, gmac);
stream_connection->takion.key_pos_local += buf_size;
chiaki_mutex_unlock(&stream_connection->takion.gkcrypt_local_mutex);
if(err != CHIAKI_ERR_SUCCESS)
return err;
return chiaki_takion_send_message_data(&stream_connection->takion, key_pos, gmac, 1, 1, buf, buf_size);
}

View file

@ -94,7 +94,7 @@ static void takion_handle_packet_message(ChiakiTakion *takion, uint8_t *buf, siz
static void takion_handle_packet_message_data(ChiakiTakion *takion, uint8_t type_b, uint8_t *buf, size_t buf_size); static void takion_handle_packet_message_data(ChiakiTakion *takion, uint8_t type_b, uint8_t *buf, size_t buf_size);
static void takion_handle_packet_message_data_ack(ChiakiTakion *takion, uint8_t type_b, uint8_t *buf, size_t buf_size); static void takion_handle_packet_message_data_ack(ChiakiTakion *takion, uint8_t type_b, uint8_t *buf, size_t buf_size);
static ChiakiErrorCode takion_parse_message(ChiakiTakion *takion, uint8_t *buf, size_t buf_size, TakionMessage *msg); static ChiakiErrorCode takion_parse_message(ChiakiTakion *takion, uint8_t *buf, size_t buf_size, TakionMessage *msg);
static void takion_write_message_header(uint8_t *buf, uint32_t tag, uint32_t key_pos, uint8_t type_a, uint8_t type_b, size_t payload_data_size); static void takion_write_message_header(uint8_t *buf, uint32_t tag, uint8_t *mac, uint32_t key_pos, uint8_t type_a, uint8_t type_b, size_t payload_data_size);
static ChiakiErrorCode takion_send_message_init(ChiakiTakion *takion, TakionMessagePayloadInit *payload); static ChiakiErrorCode takion_send_message_init(ChiakiTakion *takion, TakionMessagePayloadInit *payload);
static ChiakiErrorCode takion_send_message_cookie(ChiakiTakion *takion, uint8_t *cookie); static ChiakiErrorCode takion_send_message_cookie(ChiakiTakion *takion, uint8_t *cookie);
static ChiakiErrorCode takion_recv(ChiakiTakion *takion, uint8_t *buf, size_t *buf_size, struct timeval *timeout); static ChiakiErrorCode takion_recv(ChiakiTakion *takion, uint8_t *buf, size_t *buf_size, struct timeval *timeout);
@ -269,7 +269,7 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_raw(ChiakiTakion *takion, uint8
return CHIAKI_ERR_SUCCESS; return CHIAKI_ERR_SUCCESS;
} }
CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_message_data(ChiakiTakion *takion, uint32_t key_pos, uint8_t type_b, uint16_t channel, uint8_t *buf, size_t buf_size) CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_message_data(ChiakiTakion *takion, uint32_t key_pos, uint8_t *gmac, uint8_t type_b, uint16_t channel, uint8_t *buf, size_t buf_size)
{ {
// TODO: can we make this more memory-efficient? // TODO: can we make this more memory-efficient?
// TODO: split packet if necessary? // TODO: split packet if necessary?
@ -279,7 +279,7 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_message_data(ChiakiTakion *taki
if(!packet_buf) if(!packet_buf)
return CHIAKI_ERR_MEMORY; return CHIAKI_ERR_MEMORY;
packet_buf[0] = TAKION_PACKET_TYPE_MESSAGE; packet_buf[0] = TAKION_PACKET_TYPE_MESSAGE;
takion_write_message_header(packet_buf + 1, takion->tag_remote, key_pos, TAKION_MESSAGE_TYPE_A_DATA, type_b, 9 + buf_size); takion_write_message_header(packet_buf + 1, takion->tag_remote, gmac, key_pos, TAKION_MESSAGE_TYPE_A_DATA, type_b, 9 + buf_size);
uint8_t *msg_payload = packet_buf + 1 + MESSAGE_HEADER_SIZE; uint8_t *msg_payload = packet_buf + 1 + MESSAGE_HEADER_SIZE;
*((uint32_t *)(msg_payload + 0)) = htonl(takion->seq_num_local++); *((uint32_t *)(msg_payload + 0)) = htonl(takion->seq_num_local++);
@ -298,7 +298,7 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_message_data_ack(ChiakiTakion *
{ {
uint8_t buf[1 + MESSAGE_HEADER_SIZE + 0xc]; uint8_t buf[1 + MESSAGE_HEADER_SIZE + 0xc];
buf[0] = TAKION_PACKET_TYPE_MESSAGE; buf[0] = TAKION_PACKET_TYPE_MESSAGE;
takion_write_message_header(buf + 1, takion->tag_remote, 0, TAKION_MESSAGE_TYPE_A_DATA_ACK, 0, 0xc); takion_write_message_header(buf + 1, takion->tag_remote, NULL, 0, TAKION_MESSAGE_TYPE_A_DATA_ACK, 0, 0xc);
uint8_t *data_ack = buf + 1 + MESSAGE_HEADER_SIZE; uint8_t *data_ack = buf + 1 + MESSAGE_HEADER_SIZE;
*((uint32_t *)(data_ack + 0)) = htonl(seq_num); *((uint32_t *)(data_ack + 0)) = htonl(seq_num);
@ -328,7 +328,7 @@ CHIAKI_EXPORT ChiakiErrorCode chiaki_takion_send_congestion(ChiakiTakion *takion
if(err != CHIAKI_ERR_SUCCESS) if(err != CHIAKI_ERR_SUCCESS)
return err; return err;
chiaki_log_hexdump(takion->log, CHIAKI_LOG_DEBUG, buf, sizeof(buf)); //chiaki_log_hexdump(takion->log, CHIAKI_LOG_DEBUG, buf, sizeof(buf));
return chiaki_takion_send_raw(takion, buf, sizeof(buf)); return chiaki_takion_send_raw(takion, buf, sizeof(buf));
} }
@ -563,10 +563,13 @@ static void takion_handle_packet_message_data_ack(ChiakiTakion *takion, uint8_t
* *
* @param raw_payload_size size of the actual data of the payload excluding type_a, type_b and payload_size * @param raw_payload_size size of the actual data of the payload excluding type_a, type_b and payload_size
*/ */
static void takion_write_message_header(uint8_t *buf, uint32_t tag, uint32_t key_pos, uint8_t type_a, uint8_t type_b, size_t payload_data_size) static void takion_write_message_header(uint8_t *buf, uint32_t tag, uint8_t *mac, uint32_t key_pos, uint8_t type_a, uint8_t type_b, size_t payload_data_size)
{ {
*((uint32_t *)(buf + 0)) = htonl(tag); *((uint32_t *)(buf + 0)) = htonl(tag);
*((uint32_t *)(buf + 4)) = 0; if(mac)
memcpy(buf + 4, mac, CHIAKI_GKCRYPT_GMAC_SIZE);
else
memset(buf + 4, 0, CHIAKI_GKCRYPT_GMAC_SIZE);
*((uint32_t *)(buf + 8)) = htonl(key_pos); *((uint32_t *)(buf + 8)) = htonl(key_pos);
*(buf + 0xc) = type_a; *(buf + 0xc) = type_a;
*(buf + 0xd) = type_b; *(buf + 0xd) = type_b;
@ -614,7 +617,7 @@ static ChiakiErrorCode takion_send_message_init(ChiakiTakion *takion, TakionMess
{ {
uint8_t message[1 + MESSAGE_HEADER_SIZE + 0x10]; uint8_t message[1 + MESSAGE_HEADER_SIZE + 0x10];
message[0] = TAKION_PACKET_TYPE_MESSAGE; message[0] = TAKION_PACKET_TYPE_MESSAGE;
takion_write_message_header(message + 1, takion->tag_remote, 0, TAKION_MESSAGE_TYPE_A_INIT, 0, 0x10); takion_write_message_header(message + 1, takion->tag_remote, NULL, 0, TAKION_MESSAGE_TYPE_A_INIT, 0, 0x10);
uint8_t *pl = message + 1 + MESSAGE_HEADER_SIZE; uint8_t *pl = message + 1 + MESSAGE_HEADER_SIZE;
*((uint32_t *)(pl + 0)) = htonl(payload->tag0); *((uint32_t *)(pl + 0)) = htonl(payload->tag0);
@ -632,7 +635,7 @@ static ChiakiErrorCode takion_send_message_cookie(ChiakiTakion *takion, uint8_t
{ {
uint8_t message[1 + MESSAGE_HEADER_SIZE + TAKION_COOKIE_SIZE]; uint8_t message[1 + MESSAGE_HEADER_SIZE + TAKION_COOKIE_SIZE];
message[0] = TAKION_PACKET_TYPE_MESSAGE; message[0] = TAKION_PACKET_TYPE_MESSAGE;
takion_write_message_header(message + 1, takion->tag_remote, 0, TAKION_MESSAGE_TYPE_A_COOKIE, 0, TAKION_COOKIE_SIZE); takion_write_message_header(message + 1, takion->tag_remote, NULL, 0, TAKION_MESSAGE_TYPE_A_COOKIE, 0, TAKION_COOKIE_SIZE);
memcpy(message + 1 + MESSAGE_HEADER_SIZE, cookie, TAKION_COOKIE_SIZE); memcpy(message + 1 + MESSAGE_HEADER_SIZE, cookie, TAKION_COOKIE_SIZE);
return chiaki_takion_send_raw(takion, message, sizeof(message)); return chiaki_takion_send_raw(takion, message, sizeof(message));
} }