Add C++ wrapper around pubsub listeners
Some checks failed
/ build_macos (push) Has been cancelled
/ build_windows (push) Has been cancelled
/ build_ubuntu (push) Has been cancelled

This commit is contained in:
Grant Limberg 2025-08-15 16:53:58 -07:00
commit 650fc0c74b
3 changed files with 157 additions and 0 deletions

View file

@ -0,0 +1,99 @@
#ifdef ZT_CONTROLLER_USE_LIBPQ
#include "PubSubListener.hpp"
#include "rustybits.h"
namespace ZeroTier {
void listener_callback(void* user_ptr, const uint8_t* payload, uintptr_t length)
{
if (! user_ptr || ! payload || length == 0) {
fprintf(stderr, "Invalid parameters in listener_callback\n");
return;
}
auto* listener = static_cast<PubSubListener*>(user_ptr);
std::string payload_str(reinterpret_cast<const char*>(payload), length);
listener->onNotification(payload_str);
}
NetworkListener::NetworkListener(const char* controller_id, uint64_t listen_timeout, rustybits::NetworkListenerCallback callback) : _listener(nullptr)
{
_listener = rustybits::network_listener_new(controller_id, listen_timeout, callback, this);
_listenThread = std::thread(&NetworkListener::listenThread, this);
_changeHandlerThread = std::thread(&NetworkListener::changeHandlerThread, this);
}
NetworkListener::~NetworkListener()
{
if (_listener) {
rustybits::network_listener_delete(_listener);
_listener = nullptr;
}
}
void NetworkListener::onNotification(const std::string& payload)
{
fprintf(stderr, "Network notification received: %s\n", payload.c_str());
// TODO: Implement the logic to handle network notifications
}
void NetworkListener::listenThread()
{
if (_listener) {
while (rustybits::network_listener_listen(_listener)) {
// just keep looping
}
}
}
void NetworkListener::changeHandlerThread()
{
if (_listener) {
rustybits::network_listener_change_handler(_listener);
}
}
MemberListener::MemberListener(const char* controller_id, uint64_t listen_timeout, rustybits::NetworkListenerCallback callback) : _listener(nullptr)
{
// Initialize the member listener with the provided controller ID and timeout
// The callback will be called when a member notification is received
{
_listener = rustybits::member_listener_new("controller_id", 60, listener_callback, this);
}
}
MemberListener::~MemberListener()
{
if (_listener) {
rustybits::member_listener_delete(_listener);
_listener = nullptr;
}
}
void MemberListener::onNotification(const std::string& payload)
{
fprintf(stderr, "Member notification received: %s\n", payload.c_str());
// TODO: Implement the logic to handle network notifications
}
void MemberListener::listenThread()
{
if (_listener) {
while (rustybits::member_listener_listen(_listener)) {
// just keep looping
}
}
}
void MemberListener::changeHandlerThread()
{
if (_listener) {
rustybits::member_listener_change_handler(_listener);
}
}
} // namespace ZeroTier
#endif // ZT_CONTROLLER_USE_LIBPQ

View file

@ -0,0 +1,57 @@
#ifdef ZT_CONTROLLER_USE_LIBPQ
#ifndef ZT_CONTROLLER_PUBSUBLISTENER_HPP
#define ZT_CONTROLLER_PUBSUBLISTENER_HPP
#include "rustybits.h"
#include <memory>
#include <string>
#include <thread>
namespace ZeroTier {
class PubSubListener {
public:
virtual ~PubSubListener()
{
}
virtual void onNotification(const std::string& payload) = 0;
};
class NetworkListener : public PubSubListener {
public:
NetworkListener(const char* controller_id, uint64_t listen_timeout, rustybits::NetworkListenerCallback callback);
virtual ~NetworkListener();
virtual void onNotification(const std::string& payload) override;
private:
void listenThread();
void changeHandlerThread();
const rustybits::NetworkListener* _listener;
std::thread _listenThread;
std::thread _changeHandlerThread;
};
class MemberListener : public PubSubListener {
public:
MemberListener(const char* controller_id, uint64_t listen_timeout, rustybits::MemberListenerCallback callback);
virtual ~MemberListener();
virtual void onNotification(const std::string& payload) override;
private:
void listenThread();
void changeHandlerThread();
const rustybits::MemberListener* _listener;
std::thread _listenThread;
std::thread _changeHandlerThread;
};
} // namespace ZeroTier
#endif // ZT_CONTROLLER_PUBSUBLISTENER_HPP
#endif // ZT_CONTROLLER_USE_LIBPQ

View file

@ -42,6 +42,7 @@ ONE_OBJS=\
controller/CtlUtil.o \
controller/CV1.o \
controller/CV2.o \
controller/PubSubListener.o \
osdep/EthernetTap.o \
osdep/ManagedRoute.o \
osdep/Http.o \