plumb through callbacks
Some checks are pending
/ build_macos (push) Waiting to run
/ build_windows (push) Waiting to run
/ build_ubuntu (push) Waiting to run

This commit is contained in:
Grant Limberg 2025-08-13 12:17:03 -07:00
commit 0e94891e5b
4 changed files with 114 additions and 29 deletions

View file

@ -12,8 +12,11 @@
use std::ffi::{CStr, CString};
use std::os::raw::c_char;
#[cfg(feature = "ztcontroller")]
use std::os::raw::c_void;
use std::sync::Arc;
use std::time::Duration;
#[cfg(feature = "ztcontroller")]
use tokio::runtime;
use url::Url;
@ -495,11 +498,18 @@ use crate::pubsub::member_listener::MemberListener;
#[cfg(feature = "ztcontroller")]
use crate::pubsub::network_listener::NetworkListener;
#[cfg(feature = "ztcontroller")]
use crate::pubsub::member_listener::MemberListenerCallback;
#[cfg(feature = "ztcontroller")]
use crate::pubsub::network_listener::NetworkListenerCallback;
#[cfg(feature = "ztcontroller")]
#[no_mangle]
pub unsafe extern "C" fn init_network_listener(
controller_id: *const c_char,
listen_timeout: u64,
callback: NetworkListenerCallback,
user_ptr: *mut c_void,
) -> *const Arc<NetworkListener> {
if listen_timeout == 0 {
println!("listen_timeout is zero");
@ -514,7 +524,7 @@ pub unsafe extern "C" fn init_network_listener(
let rt = runtime::Handle::current();
rt.block_on(async {
match NetworkListener::new(id, Duration::from_secs(listen_timeout)).await {
match NetworkListener::new(id, Duration::from_secs(listen_timeout), callback, user_ptr).await {
Ok(listener) => Arc::into_raw(Arc::new(listener)),
Err(e) => {
println!("error creating network listener: {}", e);
@ -562,6 +572,8 @@ pub unsafe extern "C" fn network_listener_listen(ptr: *const Arc<NetworkListener
pub unsafe extern "C" fn init_member_listener(
controller_id: *const c_char,
listen_timeout: u64,
callback: MemberListenerCallback,
user_ptr: *mut c_void,
) -> *const Arc<MemberListener> {
if listen_timeout == 0 {
println!("listen_timeout is zero");
@ -576,7 +588,7 @@ pub unsafe extern "C" fn init_member_listener(
let rt = runtime::Handle::current();
rt.block_on(async {
match MemberListener::new(id, Duration::from_secs(listen_timeout)).await {
match MemberListener::new(id, Duration::from_secs(listen_timeout), callback, user_ptr).await {
Ok(listener) => Arc::into_raw(Arc::new(listener)),
Err(e) => {
println!("error creating member listener: {}", e);

View file

@ -88,33 +88,42 @@ impl ChangeListener {
#[cfg(test)]
mod tests {
// use super::*;
use super::*;
// use testcontainers::runners::AsyncRunner;
// use testcontainers_modules::google_cloud_sdk_emulators;
// use tokio;
use testcontainers::runners::AsyncRunner;
use testcontainers::ContainerAsync;
use testcontainers_modules::google_cloud_sdk_emulators;
use testcontainers_modules::google_cloud_sdk_emulators::CloudSdk;
use tokio;
// #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
// async fn setup_pubsub_emulator() -> Result<(), Box<dyn std::error::Error + 'static>> {
// let container = google_cloud_sdk_emulators::CloudSdk::pubsub().start().await?;
// let port = container.get_host_port_ipv4(8085).await?;
// let host = format!("localhost:{}", port);
// unsafe {
// std::env::set_var("PUBSUB_EMULATOR_HOST", host);
// }
// let cl = ChangeListener::new(
// "test_controller",
// "test_topic",
// "test_subscription",
// Duration::from_secs(10),
// )
// .await;
// assert!(cl.is_ok(), "Failed to connect to pubsub emulator: {:?}", cl.err());
// Ok(())
// }
async fn setup_pubsub_emulator() -> Result<(ContainerAsync<CloudSdk>, String), Box<dyn std::error::Error>> {
let container = google_cloud_sdk_emulators::CloudSdk::pubsub().start().await?;
let port = container.get_host_port_ipv4(8085).await?;
let host = format!("localhost:{}", port);
Ok((container, host))
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_can_connect_to_pubsub() -> Result<(), Box<dyn std::error::Error + 'static>> {
let (_container, host) = setup_pubsub_emulator().await?;
unsafe {
std::env::set_var("PUBSUB_EMULATOR_HOST", host);
}
let (tx, _rx) = tokio::sync::mpsc::channel(64);
let cl = ChangeListener::new(
"test_controller",
"test_topic",
"test_subscription",
Duration::from_secs(10),
tx,
)
.await;
assert!(cl.is_ok(), "Failed to connect to pubsub emulator: {:?}", cl.err());
Ok(())
}
}

View file

@ -1,11 +1,16 @@
use crate::pubsub::change_listener::ChangeListener;
use crate::pubsub::protobuf::pbmessages::MemberChange;
use prost::Message;
use std::io::Write;
use std::os::raw::c_void;
use std::sync::atomic::AtomicPtr;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::Receiver;
use tokio::sync::Mutex;
pub type MemberListenerCallback = extern "C" fn(*const c_void, *const u8, usize);
/**
* Member Listener listens for member changes and passes them back to the controller
*
@ -15,10 +20,17 @@ use tokio::sync::Mutex;
pub struct MemberListener {
change_listener: ChangeListener,
rx_channel: Mutex<Receiver<Vec<u8>>>,
callback: Mutex<MemberListenerCallback>,
user_ptr: AtomicPtr<c_void>,
}
impl MemberListener {
pub async fn new(controller_id: &str, listen_timeout: Duration) -> Result<Arc<Self>, Box<dyn std::error::Error>> {
pub async fn new(
controller_id: &str,
listen_timeout: Duration,
callback: MemberListenerCallback,
user_ptr: *mut c_void,
) -> Result<Arc<Self>, Box<dyn std::error::Error>> {
let (tx, rx) = tokio::sync::mpsc::channel(64);
let change_listener = ChangeListener::new(
@ -30,7 +42,12 @@ impl MemberListener {
)
.await?;
Ok(Arc::new(Self { change_listener, rx_channel: Mutex::new(rx) }))
Ok(Arc::new(Self {
change_listener,
rx_channel: Mutex::new(rx),
callback: Mutex::new(callback),
user_ptr: AtomicPtr::new(user_ptr as *mut c_void),
}))
}
pub async fn listen(&self) -> Result<(), Box<dyn std::error::Error>> {
@ -44,6 +61,21 @@ impl MemberListener {
while let Some(change) = rx.recv().await {
if let Ok(m) = MemberChange::decode(change.as_slice()) {
print!("Received change: {:?}", m);
let j = serde_json::to_string(&m).unwrap();
let mut buffer = [0; 16384];
let mut test: &mut [u8] = &mut buffer;
let mut size: usize = 0;
while let Ok(bytes) = test.write(j.as_bytes()) {
if bytes == 0 {
break;
}
size += bytes;
}
let callback = this.callback.lock().await;
let user_ptr = this.user_ptr.load(std::sync::atomic::Ordering::Relaxed);
(callback)(user_ptr, test.as_ptr(), size);
}
}
});

View file

@ -2,11 +2,16 @@ use crate::pubsub::change_listener::ChangeListener;
use crate::pubsub::protobuf::pbmessages::NetworkChange;
use prost::Message;
use serde_json;
use std::io::Write;
use std::os::raw::c_void;
use std::sync::atomic::{AtomicPtr, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::Receiver;
use tokio::sync::Mutex;
pub type NetworkListenerCallback = extern "C" fn(*mut c_void, *const u8, usize);
/**
* Network Listener listens for network changes and passes them back to the controller
*
@ -16,10 +21,17 @@ use tokio::sync::Mutex;
pub struct NetworkListener {
change_listener: ChangeListener,
rx_channel: Mutex<Receiver<Vec<u8>>>,
callback: Mutex<NetworkListenerCallback>,
user_ptr: AtomicPtr<c_void>,
}
impl NetworkListener {
pub async fn new(controller_id: &str, listen_timeout: Duration) -> Result<Arc<Self>, Box<dyn std::error::Error>> {
pub async fn new(
controller_id: &str,
listen_timeout: Duration,
callback: NetworkListenerCallback,
user_ptr: *mut c_void,
) -> Result<Arc<Self>, Box<dyn std::error::Error>> {
let (tx, rx) = tokio::sync::mpsc::channel(64);
let change_listener = ChangeListener::new(
@ -30,7 +42,13 @@ impl NetworkListener {
tx,
)
.await?;
Ok(Arc::new(Self { change_listener, rx_channel: Mutex::new(rx) }))
Ok(Arc::new(Self {
change_listener,
rx_channel: Mutex::new(rx),
callback: Mutex::new(callback),
user_ptr: AtomicPtr::new(user_ptr as *mut c_void),
}))
}
pub async fn listen(&self) -> Result<(), Box<dyn std::error::Error>> {
@ -43,8 +61,22 @@ impl NetworkListener {
let mut rx = this.rx_channel.lock().await;
while let Some(change) = rx.recv().await {
if let Ok(m) = NetworkChange::decode(change.as_slice()) {
print!("Received change: {:?}", m);
let j = serde_json::to_string(&m).unwrap();
print!("Received change: {:?}", j);
let mut buffer = [0; 16384];
let mut test: &mut [u8] = &mut buffer;
let mut size: usize = 0;
while let Ok(bytes) = test.write(j.as_bytes()) {
if bytes == 0 {
break; // No more space to write
}
size += bytes;
}
let callback = this.callback.lock().await;
let user_ptr = this.user_ptr.load(Ordering::Relaxed);
(callback)(user_ptr, test.as_ptr(), size);
}
}
});