diff --git a/controller/PubSubListener.cpp b/controller/PubSubListener.cpp new file mode 100644 index 000000000..738bb0130 --- /dev/null +++ b/controller/PubSubListener.cpp @@ -0,0 +1,99 @@ +#ifdef ZT_CONTROLLER_USE_LIBPQ +#include "PubSubListener.hpp" + +#include "rustybits.h" + +namespace ZeroTier { + +void listener_callback(void* user_ptr, const uint8_t* payload, uintptr_t length) +{ + if (! user_ptr || ! payload || length == 0) { + fprintf(stderr, "Invalid parameters in listener_callback\n"); + return; + } + + auto* listener = static_cast(user_ptr); + std::string payload_str(reinterpret_cast(payload), length); + listener->onNotification(payload_str); +} + +NetworkListener::NetworkListener(const char* controller_id, uint64_t listen_timeout, rustybits::NetworkListenerCallback callback) : _listener(nullptr) +{ + _listener = rustybits::network_listener_new(controller_id, listen_timeout, callback, this); + _listenThread = std::thread(&NetworkListener::listenThread, this); + _changeHandlerThread = std::thread(&NetworkListener::changeHandlerThread, this); +} + +NetworkListener::~NetworkListener() +{ + if (_listener) { + rustybits::network_listener_delete(_listener); + _listener = nullptr; + } +} + +void NetworkListener::onNotification(const std::string& payload) +{ + fprintf(stderr, "Network notification received: %s\n", payload.c_str()); + // TODO: Implement the logic to handle network notifications +} + +void NetworkListener::listenThread() +{ + if (_listener) { + while (rustybits::network_listener_listen(_listener)) { + // just keep looping + } + } +} + +void NetworkListener::changeHandlerThread() +{ + if (_listener) { + rustybits::network_listener_change_handler(_listener); + } +} + +MemberListener::MemberListener(const char* controller_id, uint64_t listen_timeout, rustybits::NetworkListenerCallback callback) : _listener(nullptr) +{ + // Initialize the member listener with the provided controller ID and timeout + // The callback will be called when a member notification is received + { + _listener = rustybits::member_listener_new("controller_id", 60, listener_callback, this); + } +} + +MemberListener::~MemberListener() +{ + if (_listener) { + rustybits::member_listener_delete(_listener); + _listener = nullptr; + } +} + +void MemberListener::onNotification(const std::string& payload) +{ + fprintf(stderr, "Member notification received: %s\n", payload.c_str()); + + // TODO: Implement the logic to handle network notifications +} + +void MemberListener::listenThread() +{ + if (_listener) { + while (rustybits::member_listener_listen(_listener)) { + // just keep looping + } + } +} + +void MemberListener::changeHandlerThread() +{ + if (_listener) { + rustybits::member_listener_change_handler(_listener); + } +} + +} // namespace ZeroTier + +#endif // ZT_CONTROLLER_USE_LIBPQ \ No newline at end of file diff --git a/controller/PubSubListener.hpp b/controller/PubSubListener.hpp new file mode 100644 index 000000000..43513e7f4 --- /dev/null +++ b/controller/PubSubListener.hpp @@ -0,0 +1,57 @@ +#ifdef ZT_CONTROLLER_USE_LIBPQ + +#ifndef ZT_CONTROLLER_PUBSUBLISTENER_HPP +#define ZT_CONTROLLER_PUBSUBLISTENER_HPP + +#include "rustybits.h" + +#include +#include +#include + +namespace ZeroTier { +class PubSubListener { + public: + virtual ~PubSubListener() + { + } + + virtual void onNotification(const std::string& payload) = 0; +}; + +class NetworkListener : public PubSubListener { + public: + NetworkListener(const char* controller_id, uint64_t listen_timeout, rustybits::NetworkListenerCallback callback); + virtual ~NetworkListener(); + + virtual void onNotification(const std::string& payload) override; + + private: + void listenThread(); + void changeHandlerThread(); + + const rustybits::NetworkListener* _listener; + std::thread _listenThread; + std::thread _changeHandlerThread; +}; + +class MemberListener : public PubSubListener { + public: + MemberListener(const char* controller_id, uint64_t listen_timeout, rustybits::MemberListenerCallback callback); + virtual ~MemberListener(); + + virtual void onNotification(const std::string& payload) override; + + private: + void listenThread(); + void changeHandlerThread(); + + const rustybits::MemberListener* _listener; + std::thread _listenThread; + std::thread _changeHandlerThread; +}; + +} // namespace ZeroTier + +#endif // ZT_CONTROLLER_PUBSUBLISTENER_HPP +#endif // ZT_CONTROLLER_USE_LIBPQ \ No newline at end of file diff --git a/objects.mk b/objects.mk index 238481a42..79bd0ab48 100644 --- a/objects.mk +++ b/objects.mk @@ -42,6 +42,7 @@ ONE_OBJS=\ controller/CtlUtil.o \ controller/CV1.o \ controller/CV2.o \ + controller/PubSubListener.o \ osdep/EthernetTap.o \ osdep/ManagedRoute.o \ osdep/Http.o \