diff --git a/rustybits/src/ext.rs b/rustybits/src/ext.rs index 7f933de34..8378d9921 100644 --- a/rustybits/src/ext.rs +++ b/rustybits/src/ext.rs @@ -12,8 +12,11 @@ use std::ffi::{CStr, CString}; use std::os::raw::c_char; +#[cfg(feature = "ztcontroller")] +use std::os::raw::c_void; use std::sync::Arc; use std::time::Duration; +#[cfg(feature = "ztcontroller")] use tokio::runtime; use url::Url; @@ -495,11 +498,18 @@ use crate::pubsub::member_listener::MemberListener; #[cfg(feature = "ztcontroller")] use crate::pubsub::network_listener::NetworkListener; +#[cfg(feature = "ztcontroller")] +use crate::pubsub::member_listener::MemberListenerCallback; +#[cfg(feature = "ztcontroller")] +use crate::pubsub::network_listener::NetworkListenerCallback; + #[cfg(feature = "ztcontroller")] #[no_mangle] pub unsafe extern "C" fn init_network_listener( controller_id: *const c_char, listen_timeout: u64, + callback: NetworkListenerCallback, + user_ptr: *mut c_void, ) -> *const Arc { if listen_timeout == 0 { println!("listen_timeout is zero"); @@ -514,7 +524,7 @@ pub unsafe extern "C" fn init_network_listener( let rt = runtime::Handle::current(); rt.block_on(async { - match NetworkListener::new(id, Duration::from_secs(listen_timeout)).await { + match NetworkListener::new(id, Duration::from_secs(listen_timeout), callback, user_ptr).await { Ok(listener) => Arc::into_raw(Arc::new(listener)), Err(e) => { println!("error creating network listener: {}", e); @@ -562,6 +572,8 @@ pub unsafe extern "C" fn network_listener_listen(ptr: *const Arc *const Arc { if listen_timeout == 0 { println!("listen_timeout is zero"); @@ -576,7 +588,7 @@ pub unsafe extern "C" fn init_member_listener( let rt = runtime::Handle::current(); rt.block_on(async { - match MemberListener::new(id, Duration::from_secs(listen_timeout)).await { + match MemberListener::new(id, Duration::from_secs(listen_timeout), callback, user_ptr).await { Ok(listener) => Arc::into_raw(Arc::new(listener)), Err(e) => { println!("error creating member listener: {}", e); diff --git a/rustybits/src/pubsub/change_listener.rs b/rustybits/src/pubsub/change_listener.rs index f02c0bca5..eab3f8e31 100644 --- a/rustybits/src/pubsub/change_listener.rs +++ b/rustybits/src/pubsub/change_listener.rs @@ -88,33 +88,42 @@ impl ChangeListener { #[cfg(test)] mod tests { - // use super::*; + use super::*; - // use testcontainers::runners::AsyncRunner; - // use testcontainers_modules::google_cloud_sdk_emulators; - // use tokio; + use testcontainers::runners::AsyncRunner; + use testcontainers::ContainerAsync; + use testcontainers_modules::google_cloud_sdk_emulators; + use testcontainers_modules::google_cloud_sdk_emulators::CloudSdk; + use tokio; - // #[tokio::test(flavor = "multi_thread", worker_threads = 1)] - // async fn setup_pubsub_emulator() -> Result<(), Box> { - // let container = google_cloud_sdk_emulators::CloudSdk::pubsub().start().await?; + async fn setup_pubsub_emulator() -> Result<(ContainerAsync, String), Box> { + let container = google_cloud_sdk_emulators::CloudSdk::pubsub().start().await?; + let port = container.get_host_port_ipv4(8085).await?; + let host = format!("localhost:{}", port); + Ok((container, host)) + } - // let port = container.get_host_port_ipv4(8085).await?; - // let host = format!("localhost:{}", port); + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn test_can_connect_to_pubsub() -> Result<(), Box> { + let (_container, host) = setup_pubsub_emulator().await?; - // unsafe { - // std::env::set_var("PUBSUB_EMULATOR_HOST", host); - // } + unsafe { + std::env::set_var("PUBSUB_EMULATOR_HOST", host); + } - // let cl = ChangeListener::new( - // "test_controller", - // "test_topic", - // "test_subscription", - // Duration::from_secs(10), - // ) - // .await; + let (tx, _rx) = tokio::sync::mpsc::channel(64); - // assert!(cl.is_ok(), "Failed to connect to pubsub emulator: {:?}", cl.err()); + let cl = ChangeListener::new( + "test_controller", + "test_topic", + "test_subscription", + Duration::from_secs(10), + tx, + ) + .await; - // Ok(()) - // } + assert!(cl.is_ok(), "Failed to connect to pubsub emulator: {:?}", cl.err()); + + Ok(()) + } } diff --git a/rustybits/src/pubsub/member_listener.rs b/rustybits/src/pubsub/member_listener.rs index f379e17c7..7b647dd01 100644 --- a/rustybits/src/pubsub/member_listener.rs +++ b/rustybits/src/pubsub/member_listener.rs @@ -1,11 +1,16 @@ use crate::pubsub::change_listener::ChangeListener; use crate::pubsub::protobuf::pbmessages::MemberChange; use prost::Message; +use std::io::Write; +use std::os::raw::c_void; +use std::sync::atomic::AtomicPtr; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc::Receiver; use tokio::sync::Mutex; +pub type MemberListenerCallback = extern "C" fn(*const c_void, *const u8, usize); + /** * Member Listener listens for member changes and passes them back to the controller * @@ -15,10 +20,17 @@ use tokio::sync::Mutex; pub struct MemberListener { change_listener: ChangeListener, rx_channel: Mutex>>, + callback: Mutex, + user_ptr: AtomicPtr, } impl MemberListener { - pub async fn new(controller_id: &str, listen_timeout: Duration) -> Result, Box> { + pub async fn new( + controller_id: &str, + listen_timeout: Duration, + callback: MemberListenerCallback, + user_ptr: *mut c_void, + ) -> Result, Box> { let (tx, rx) = tokio::sync::mpsc::channel(64); let change_listener = ChangeListener::new( @@ -30,7 +42,12 @@ impl MemberListener { ) .await?; - Ok(Arc::new(Self { change_listener, rx_channel: Mutex::new(rx) })) + Ok(Arc::new(Self { + change_listener, + rx_channel: Mutex::new(rx), + callback: Mutex::new(callback), + user_ptr: AtomicPtr::new(user_ptr as *mut c_void), + })) } pub async fn listen(&self) -> Result<(), Box> { @@ -44,6 +61,21 @@ impl MemberListener { while let Some(change) = rx.recv().await { if let Ok(m) = MemberChange::decode(change.as_slice()) { print!("Received change: {:?}", m); + + let j = serde_json::to_string(&m).unwrap(); + let mut buffer = [0; 16384]; + let mut test: &mut [u8] = &mut buffer; + let mut size: usize = 0; + while let Ok(bytes) = test.write(j.as_bytes()) { + if bytes == 0 { + break; + } + size += bytes; + } + let callback = this.callback.lock().await; + let user_ptr = this.user_ptr.load(std::sync::atomic::Ordering::Relaxed); + + (callback)(user_ptr, test.as_ptr(), size); } } }); diff --git a/rustybits/src/pubsub/network_listener.rs b/rustybits/src/pubsub/network_listener.rs index e97bc74d7..111bab893 100644 --- a/rustybits/src/pubsub/network_listener.rs +++ b/rustybits/src/pubsub/network_listener.rs @@ -2,11 +2,16 @@ use crate::pubsub::change_listener::ChangeListener; use crate::pubsub::protobuf::pbmessages::NetworkChange; use prost::Message; use serde_json; +use std::io::Write; +use std::os::raw::c_void; +use std::sync::atomic::{AtomicPtr, Ordering}; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc::Receiver; use tokio::sync::Mutex; +pub type NetworkListenerCallback = extern "C" fn(*mut c_void, *const u8, usize); + /** * Network Listener listens for network changes and passes them back to the controller * @@ -16,10 +21,17 @@ use tokio::sync::Mutex; pub struct NetworkListener { change_listener: ChangeListener, rx_channel: Mutex>>, + callback: Mutex, + user_ptr: AtomicPtr, } impl NetworkListener { - pub async fn new(controller_id: &str, listen_timeout: Duration) -> Result, Box> { + pub async fn new( + controller_id: &str, + listen_timeout: Duration, + callback: NetworkListenerCallback, + user_ptr: *mut c_void, + ) -> Result, Box> { let (tx, rx) = tokio::sync::mpsc::channel(64); let change_listener = ChangeListener::new( @@ -30,7 +42,13 @@ impl NetworkListener { tx, ) .await?; - Ok(Arc::new(Self { change_listener, rx_channel: Mutex::new(rx) })) + + Ok(Arc::new(Self { + change_listener, + rx_channel: Mutex::new(rx), + callback: Mutex::new(callback), + user_ptr: AtomicPtr::new(user_ptr as *mut c_void), + })) } pub async fn listen(&self) -> Result<(), Box> { @@ -43,8 +61,22 @@ impl NetworkListener { let mut rx = this.rx_channel.lock().await; while let Some(change) = rx.recv().await { if let Ok(m) = NetworkChange::decode(change.as_slice()) { + print!("Received change: {:?}", m); + let j = serde_json::to_string(&m).unwrap(); - print!("Received change: {:?}", j); + let mut buffer = [0; 16384]; + let mut test: &mut [u8] = &mut buffer; + let mut size: usize = 0; + while let Ok(bytes) = test.write(j.as_bytes()) { + if bytes == 0 { + break; // No more space to write + } + size += bytes; + } + let callback = this.callback.lock().await; + let user_ptr = this.user_ptr.load(Ordering::Relaxed); + + (callback)(user_ptr, test.as_ptr(), size); } } });