diff --git a/rustybits/Cargo.lock b/rustybits/Cargo.lock index cf3c6acea..1f0d77521 100644 --- a/rustybits/Cargo.lock +++ b/rustybits/Cargo.lock @@ -281,6 +281,56 @@ dependencies = [ "generic-array", ] +[[package]] +name = "bollard" +version = "0.18.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97ccca1260af6a459d75994ad5acc1651bcabcbdbc41467cc9786519ab854c30" +dependencies = [ + "base64 0.22.1", + "bollard-stubs", + "bytes", + "futures-core", + "futures-util", + "hex", + "home", + "http 1.3.1", + "http-body-util", + "hyper 1.6.0", + "hyper-named-pipe", + "hyper-rustls", + "hyper-util", + "hyperlocal", + "log", + "pin-project-lite", + "rustls", + "rustls-native-certs", + "rustls-pemfile 2.2.0", + "rustls-pki-types", + "serde", + "serde_derive", + "serde_json", + "serde_repr", + "serde_urlencoded", + "thiserror 2.0.12", + "tokio", + "tokio-util", + "tower-service", + "url", + "winapi", +] + +[[package]] +name = "bollard-stubs" +version = "1.47.1-rc.27.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f179cfbddb6e77a5472703d4b30436bff32929c0aa8a9008ecf23d1d3cdd0da" +dependencies = [ + "serde", + "serde_repr", + "serde_with", +] + [[package]] name = "bumpalo" version = "3.19.0" @@ -658,6 +708,17 @@ dependencies = [ "syn", ] +[[package]] +name = "docker_credential" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d89dfcba45b4afad7450a99b39e751590463e45c04728cf555d36bb66940de8" +dependencies = [ + "base64 0.21.7", + "serde", + "serde_json", +] + [[package]] name = "downcast" version = "0.11.0" @@ -802,6 +863,17 @@ dependencies = [ "windows-sys 0.60.2", ] +[[package]] +name = "etcetera" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26c7b13d0780cb82722fd59f6f57f925e143427e4a75313a6c77243bf5326ae6" +dependencies = [ + "cfg-if", + "home", + "windows-sys 0.59.0", +] + [[package]] name = "event-listener" version = "2.5.3" @@ -830,6 +902,18 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "64cd1e32ddd350061ae6edb1b082d7c54915b5c672c389143b9a63403a109f24" +[[package]] +name = "filetime" +version = "0.2.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35c0522e981e68cbfa8c3f978441a5f34b30b96e146b33cd3359176b50fe8586" +dependencies = [ + "cfg-if", + "libc", + "libredox", + "windows-sys 0.59.0", +] + [[package]] name = "fixedbitset" version = "0.5.7" @@ -1038,8 +1122,8 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "501459a508e7887cfedc45a45ee41602ac1f66d2b61deb05f1c2256bf2faf46d" dependencies = [ - "prost", - "prost-types", + "prost 0.13.5", + "prost-types 0.13.5", "tonic", ] @@ -1065,7 +1149,7 @@ dependencies = [ "gcloud-auth", "gcloud-gax", "gcloud-googleapis", - "prost-types", + "prost-types 0.13.5", "thiserror 1.0.69", "token-source", "tokio", @@ -1364,6 +1448,37 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-named-pipe" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73b7d8abf35697b81a825e386fc151e0d503e8cb5fcb93cc8669c376dfd6f278" +dependencies = [ + "hex", + "hyper 1.6.0", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", + "winapi", +] + +[[package]] +name = "hyper-rustls" +version = "0.27.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3c93eb611681b207e1fe55d5a71ecf91572ec8a6705cdb6857f7d8d5242cf58" +dependencies = [ + "http 1.3.1", + "hyper 1.6.0", + "hyper-util", + "rustls", + "rustls-pki-types", + "tokio", + "tokio-rustls", + "tower-service", +] + [[package]] name = "hyper-timeout" version = "0.5.2" @@ -1430,6 +1545,21 @@ dependencies = [ "tracing", ] +[[package]] +name = "hyperlocal" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "986c5ce3b994526b3cd75578e62554abd09f0899d6206de48b3e96ab34ccc8c7" +dependencies = [ + "hex", + "http-body-util", + "hyper 1.6.0", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "iana-time-zone" version = "0.1.63" @@ -1724,6 +1854,17 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f9fbbcab51052fe104eb5e5d351cf728d30a5be1fe14d9be8a3b097481fb97de" +[[package]] +name = "libredox" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4488594b9328dee448adb906d8b126d9b7deb7cf5c22161ee591610bb1be83c0" +dependencies = [ + "bitflags 2.9.1", + "libc", + "redox_syscall 0.5.13", +] + [[package]] name = "linux-raw-sys" version = "0.9.4" @@ -2139,11 +2280,36 @@ checksum = "bc838d2a56b5b1a6c25f55575dfc605fabb63bb2365f6c2353ef9159aa69e4a5" dependencies = [ "cfg-if", "libc", - "redox_syscall", + "redox_syscall 0.5.13", "smallvec", "windows-targets 0.52.6", ] +[[package]] +name = "parse-display" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "914a1c2265c98e2446911282c6ac86d8524f495792c38c5bd884f80499c7538a" +dependencies = [ + "parse-display-derive", + "regex", + "regex-syntax 0.8.5", +] + +[[package]] +name = "parse-display-derive" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2ae7800a4c974efd12df917266338e79a7a74415173caf7e70aa0a0707345281" +dependencies = [ + "proc-macro2", + "quote", + "regex", + "regex-syntax 0.8.5", + "structmeta", + "syn", +] + [[package]] name = "pem" version = "3.0.5" @@ -2347,7 +2513,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" dependencies = [ "bytes", - "prost-derive", + "prost-derive 0.13.5", +] + +[[package]] +name = "prost" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7231bd9b3d3d33c86b58adbac74b5ec0ad9f496b19d22801d773636feaa95f3d" +dependencies = [ + "bytes", + "prost-derive 0.14.1", ] [[package]] @@ -2363,8 +2539,28 @@ dependencies = [ "once_cell", "petgraph", "prettyplease", - "prost", - "prost-types", + "prost 0.13.5", + "prost-types 0.13.5", + "regex", + "syn", + "tempfile", +] + +[[package]] +name = "prost-build" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac6c3320f9abac597dcbc668774ef006702672474aad53c6d596b62e487b40b1" +dependencies = [ + "heck", + "itertools 0.14.0", + "log", + "multimap", + "once_cell", + "petgraph", + "prettyplease", + "prost 0.14.1", + "prost-types 0.14.1", "regex", "syn", "tempfile", @@ -2383,13 +2579,35 @@ dependencies = [ "syn", ] +[[package]] +name = "prost-derive" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9120690fafc389a67ba3803df527d0ec9cbbc9cc45e4cc20b332996dfb672425" +dependencies = [ + "anyhow", + "itertools 0.14.0", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "prost-types" version = "0.13.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16" dependencies = [ - "prost", + "prost 0.13.5", +] + +[[package]] +name = "prost-types" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9b4db3d6da204ed77bb26ba83b6122a73aeb2e87e25fbf7ad2e84c4ccbf8f72" +dependencies = [ + "prost 0.14.1", ] [[package]] @@ -2400,7 +2618,7 @@ checksum = "497e1e938f0c09ef9cabe1d49437b4016e03e8f82fbbe5d1c62a9b61b9decae1" dependencies = [ "chrono", "inventory", - "prost", + "prost 0.13.5", "serde", "serde_derive", "serde_json", @@ -2414,9 +2632,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07b8bf115b70a7aa5af1fd5d6e9418492e9ccb6e4785e858c938e28d132a884b" dependencies = [ "heck", - "prost", - "prost-build", - "prost-types", + "prost 0.13.5", + "prost-build 0.13.5", + "prost-types 0.13.5", "quote", ] @@ -2427,9 +2645,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c8cdde6df0a98311c839392ca2f2f0bcecd545f86a62b4e3c6a49c336e970fe5" dependencies = [ "chrono", - "prost", - "prost-build", - "prost-types", + "prost 0.13.5", + "prost-build 0.13.5", + "prost-types 0.13.5", "prost-wkt", "prost-wkt-build", "regex", @@ -2536,6 +2754,15 @@ dependencies = [ "bitflags 2.9.1", ] +[[package]] +name = "redox_syscall" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "redox_syscall" version = "0.5.13" @@ -2633,7 +2860,7 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls-pemfile", + "rustls-pemfile 1.0.4", "serde", "serde_json", "serde_urlencoded", @@ -2832,6 +3059,15 @@ dependencies = [ "base64 0.21.7", ] +[[package]] +name = "rustls-pemfile" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "rustls-pki-types" version = "1.12.0" @@ -2868,14 +3104,21 @@ dependencies = [ "gcloud-pubsub", "jwt", "openidconnect", + "prost 0.14.1", + "prost-build 0.14.1", + "prost-types 0.14.1", "reqwest 0.11.27", "serde", + "serde_json", "temporal-client", "temporal-sdk", "temporal-sdk-core-protos", + "testcontainers", + "testcontainers-modules", "thiserror 1.0.69", "time", "tokio", + "tokio-util", "url", "uuid", ] @@ -3042,6 +3285,17 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_repr" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "175ee3e80ae9982737ca543e96133087cbd9a485eecc3bc4de9c1a37b47ea59c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "serde_spanned" version = "0.6.9" @@ -3226,6 +3480,29 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" +[[package]] +name = "structmeta" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e1575d8d40908d70f6fd05537266b90ae71b15dbbe7a8b7dffa2b759306d329" +dependencies = [ + "proc-macro2", + "quote", + "structmeta-derive", + "syn", +] + +[[package]] +name = "structmeta-derive" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "152a0b65a590ff6c3da95cabe2353ee04e6167c896b28e3b14478c2636c922fc" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "subtle" version = "2.6.1" @@ -3394,7 +3671,7 @@ dependencies = [ "parking_lot", "pid", "pin-project", - "prost", + "prost 0.13.5", "prost-wkt-types", "rand 0.9.1", "ringbuf", @@ -3427,7 +3704,7 @@ dependencies = [ "derive_builder", "derive_more", "opentelemetry", - "prost", + "prost 0.13.5", "serde_json", "temporal-sdk-core-protos", "thiserror 2.0.12", @@ -3445,8 +3722,8 @@ dependencies = [ "anyhow", "base64 0.22.1", "derive_more", - "prost", - "prost-build", + "prost 0.13.5", + "prost-build 0.13.5", "prost-wkt", "prost-wkt-build", "prost-wkt-types", @@ -3465,6 +3742,44 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f50febec83f5ee1df3015341d8bd429f2d1cc62bcba7ea2076759d315084683" +[[package]] +name = "testcontainers" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23bb7577dca13ad86a78e8271ef5d322f37229ec83b8d98da6d996c588a1ddb1" +dependencies = [ + "async-trait", + "bollard", + "bollard-stubs", + "bytes", + "docker_credential", + "either", + "etcetera", + "futures", + "log", + "memchr", + "parse-display", + "pin-project-lite", + "serde", + "serde_json", + "serde_with", + "thiserror 2.0.12", + "tokio", + "tokio-stream", + "tokio-tar", + "tokio-util", + "url", +] + +[[package]] +name = "testcontainers-modules" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eac95cde96549fc19c6bf19ef34cc42bd56e264c1cb97e700e21555be0ecf9e2" +dependencies = [ + "testcontainers", +] + [[package]] name = "thiserror" version = "1.0.69" @@ -3636,6 +3951,21 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tar" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d5714c010ca3e5c27114c1cdeb9d14641ace49874aa5626d7149e47aedace75" +dependencies = [ + "filetime", + "futures-core", + "libc", + "redox_syscall 0.3.5", + "tokio", + "tokio-stream", + "xattr", +] + [[package]] name = "tokio-util" version = "0.7.15" @@ -3710,7 +4040,7 @@ dependencies = [ "hyper-util", "percent-encoding", "pin-project", - "prost", + "prost 0.13.5", "rustls-native-certs", "socket2", "tokio", @@ -3731,8 +4061,8 @@ checksum = "eac6f67be712d12f0b41328db3137e0d0757645d8904b4cb7d51cd9c2279e847" dependencies = [ "prettyplease", "proc-macro2", - "prost-build", - "prost-types", + "prost-build 0.13.5", + "prost-types 0.13.5", "quote", "syn", ] @@ -4492,6 +4822,16 @@ version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ea2f10b9bb0928dfb1b42b65e1f9e36f7f54dbdf08457afefb38afcdec4fa2bb" +[[package]] +name = "xattr" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af3a19837351dc82ba89f8a125e22a3c475f05aba604acc023d62b2739ae2909" +dependencies = [ + "libc", + "rustix", +] + [[package]] name = "yoke" version = "0.8.0" diff --git a/rustybits/Cargo.toml b/rustybits/Cargo.toml index 797ff8bf7..3f0f82b5c 100644 --- a/rustybits/Cargo.toml +++ b/rustybits/Cargo.toml @@ -7,24 +7,29 @@ edition = "2021" crate-type = ["staticlib", "rlib"] [features] -default = ["zeroidc"] +default = ["zeroidc", "ztcontroller"] zeroidc = [] ztcontroller = [ "dep:serde", + "dep:serde_json", "dep:temporal-sdk", "dep:temporal-client", "dep:temporal-sdk-core-protos", "dep:gcloud-pubsub", + "dep:prost", + "dep:prost-types", ] [dependencies] serde = { version = "1", features = ["derive"], optional = true } +serde_json = { version = "1", optional = true } temporal-sdk = { git = "https://github.com/temporalio/sdk-core", branch = "master", optional = true } temporal-client = { git = "https://github.com/temporalio/sdk-core", branch = "master", optional = true, features = [ "telemetry", ] } temporal-sdk-core-protos = { git = "https://github.com/temporalio/sdk-core", branch = "master", optional = true } -tokio = { version = "1.43", features = ["full"] } +tokio = { version = "1.43", features = ["full", "rt", "macros"] } +tokio-util = { version = "0.7" } uuid = { version = "1.4", features = ["v4"] } openidconnect = { version = "3.4", default-features = false, features = [ "reqwest", @@ -39,9 +44,18 @@ time = { version = "~0.3", features = ["formatting"] } bytes = "1.3" thiserror = "1" gcloud-pubsub = { version = "1.3.0", optional = true } +prost = { version = "0.14", optional = true, features = ["derive"] } +prost-types = { version = "0.14", optional = true } + +[dev-dependencies] +testcontainers = { version = "0.24", features = ["blocking"] } +testcontainers-modules = { version = "0.12.1", features = [ + "google_cloud_sdk_emulators", +] } [build-dependencies] cbindgen = "0.29" +prost-build = "0.14" [profile.release] strip = "debuginfo" diff --git a/rustybits/build.rs b/rustybits/build.rs index be7973db1..1d0d01e36 100644 --- a/rustybits/build.rs +++ b/rustybits/build.rs @@ -5,6 +5,21 @@ use std::env; use std::path::PathBuf; fn main() { + let mut prost_build = prost_build::Config::new(); + + prost_build + .type_attribute(".", "#[derive(serde::Serialize, serde::Deserialize)]") + .compile_protos( + &[ + "src/pubsub/metadata.proto", + "src/pubsub/network.proto", + "src/pubsub/member.proto", + "src/pubsub/member_status.proto", + ], + &["src/pubsub/"], + ) + .expect("Failed to compile protobuf files"); + let crate_dir = env::var("CARGO_MANIFEST_DIR").unwrap(); let package_name = env::var("CARGO_PKG_NAME").unwrap(); diff --git a/rustybits/src/ext.rs b/rustybits/src/ext.rs index 1b87b9dc1..7f933de34 100644 --- a/rustybits/src/ext.rs +++ b/rustybits/src/ext.rs @@ -12,6 +12,9 @@ use std::ffi::{CStr, CString}; use std::os::raw::c_char; +use std::sync::Arc; +use std::time::Duration; +use tokio::runtime; use url::Url; static mut RT: Option = None; @@ -39,7 +42,7 @@ pub unsafe extern "C" fn shutdown_async_runtime() { SHUTDOWN.call_once(|| { // Shutdown the tokio runtime unsafe { - if let Some(rt) = RT.take() { + if let Some(rt) = RT.take() { rt.shutdown_timeout(std::time::Duration::from_secs(5)); } } @@ -486,3 +489,131 @@ pub unsafe extern "C" fn smee_client_notify_network_joined( } } } + +#[cfg(feature = "ztcontroller")] +use crate::pubsub::member_listener::MemberListener; +#[cfg(feature = "ztcontroller")] +use crate::pubsub::network_listener::NetworkListener; + +#[cfg(feature = "ztcontroller")] +#[no_mangle] +pub unsafe extern "C" fn init_network_listener( + controller_id: *const c_char, + listen_timeout: u64, +) -> *const Arc { + if listen_timeout == 0 { + println!("listen_timeout is zero"); + return std::ptr::null_mut(); + } + if controller_id.is_null() { + println!("controller_id is null"); + return std::ptr::null_mut(); + } + + let id = unsafe { CStr::from_ptr(controller_id) }.to_str().unwrap(); + + let rt = runtime::Handle::current(); + rt.block_on(async { + match NetworkListener::new(id, Duration::from_secs(listen_timeout)).await { + Ok(listener) => Arc::into_raw(Arc::new(listener)), + Err(e) => { + println!("error creating network listener: {}", e); + std::ptr::null_mut() + } + } + }) +} + +#[cfg(feature = "ztcontroller")] +#[no_mangle] +pub unsafe extern "C" fn network_listener_delete(ptr: *const Arc) { + if ptr.is_null() { + return; + } + drop(Arc::from_raw(ptr)); +} + +#[cfg(feature = "ztcontroller")] +#[no_mangle] +pub unsafe extern "C" fn network_listener_listen(ptr: *const Arc) -> bool { + use std::mem::ManuallyDrop; + if ptr.is_null() { + println!("ptr is null"); + return false; + } + + let listener = ManuallyDrop::new(unsafe { Arc::from_raw(ptr) }); + + let rt = runtime::Handle::current(); + match rt.block_on(listener.listen()) { + Ok(_) => { + println!("Network listener started successfully"); + true + } + Err(e) => { + println!("Error starting network listener: {}", e); + false + } + } +} + +#[cfg(feature = "ztcontroller")] +#[no_mangle] +pub unsafe extern "C" fn init_member_listener( + controller_id: *const c_char, + listen_timeout: u64, +) -> *const Arc { + if listen_timeout == 0 { + println!("listen_timeout is zero"); + return std::ptr::null_mut(); + } + if controller_id.is_null() { + println!("controller_id is null"); + return std::ptr::null_mut(); + } + + let id = unsafe { CStr::from_ptr(controller_id) }.to_str().unwrap(); + + let rt = runtime::Handle::current(); + rt.block_on(async { + match MemberListener::new(id, Duration::from_secs(listen_timeout)).await { + Ok(listener) => Arc::into_raw(Arc::new(listener)), + Err(e) => { + println!("error creating member listener: {}", e); + std::ptr::null_mut() + } + } + }) +} + +#[cfg(feature = "ztcontroller")] +#[no_mangle] +pub unsafe extern "C" fn member_listener_delete(ptr: *const Arc) { + if ptr.is_null() { + return; + } + drop(Arc::from_raw(ptr)); +} + +#[cfg(feature = "ztcontroller")] +#[no_mangle] +pub unsafe extern "C" fn member_listener_listen(ptr: *const Arc) -> bool { + use std::mem::ManuallyDrop; + if ptr.is_null() { + println!("ptr is null"); + return false; + } + + let listener = ManuallyDrop::new(unsafe { Arc::from_raw(ptr) }); + let rt = runtime::Handle::current(); + match rt.block_on(listener.listen()) { + Ok(_) => { + println!("Member listener started successfully"); + true + } + Err(e) => { + println!("Error starting member listener: {}", e); + false + } + } +} diff --git a/rustybits/src/pubsub/change_listener.rs b/rustybits/src/pubsub/change_listener.rs new file mode 100644 index 000000000..f02c0bca5 --- /dev/null +++ b/rustybits/src/pubsub/change_listener.rs @@ -0,0 +1,120 @@ +use gcloud_pubsub::client::{Client, ClientConfig}; +use gcloud_pubsub::subscription::SubscriptionConfig; +use gcloud_pubsub::topic::Topic; +use std::time::Duration; +use tokio::sync::mpsc::Sender; +use tokio_util::sync::CancellationToken; + +pub struct ChangeListener { + client: Client, + topic: Topic, + subscription_name: String, + controller_id: String, + listen_timeout: Duration, + sender: Sender>, +} + +impl ChangeListener { + pub async fn new( + controller_id: &str, + topic_name: &str, + subscription_name: &str, + listen_timeout: Duration, + sender: Sender>, + ) -> Result> { + let config = ClientConfig::default().with_auth().await.unwrap(); + let client = Client::new(config).await?; + let topic = client.topic(topic_name); + Ok(Self { + client, + topic, + subscription_name: subscription_name.to_string(), + controller_id: controller_id.to_string(), + listen_timeout, + sender, + }) + } + + pub async fn listen(&self) -> Result<(), Box> { + let config = SubscriptionConfig { + enable_message_ordering: true, + filter: format!("attributes.controller_id = '{}'", self.controller_id), + ..Default::default() + }; + + let subscription = self.client.subscription(self.subscription_name.as_str()); + if !subscription.exists(None).await? { + subscription + .create(self.topic.fully_qualified_name(), config, None) + .await?; + } + + let cancel = CancellationToken::new(); + let cancel2 = cancel.clone(); + + let listen_timeout = self.listen_timeout.clone(); + tokio::spawn(async move { + tokio::time::sleep(listen_timeout).await; + cancel2.cancel(); + }); + + let tx = self.sender.clone(); + let _ = subscription + .receive( + move |message, _cancel| { + let tx2 = tx.clone(); + async move { + let data = message.message.data.clone(); + + match tx2.send(data.to_vec()).await { + Ok(_) => println!("Message sent successfully"), + Err(e) => eprintln!("Failed to send message: {}", e), + } + + match message.ack().await { + Ok(_) => println!("Message acknowledged"), + Err(e) => eprintln!("Failed to acknowledge message: {}", e), + } + } + }, + cancel.clone(), + None, + ) + .await; + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + // use super::*; + + // use testcontainers::runners::AsyncRunner; + // use testcontainers_modules::google_cloud_sdk_emulators; + // use tokio; + + // #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + // async fn setup_pubsub_emulator() -> Result<(), Box> { + // let container = google_cloud_sdk_emulators::CloudSdk::pubsub().start().await?; + + // let port = container.get_host_port_ipv4(8085).await?; + // let host = format!("localhost:{}", port); + + // unsafe { + // std::env::set_var("PUBSUB_EMULATOR_HOST", host); + // } + + // let cl = ChangeListener::new( + // "test_controller", + // "test_topic", + // "test_subscription", + // Duration::from_secs(10), + // ) + // .await; + + // assert!(cl.is_ok(), "Failed to connect to pubsub emulator: {:?}", cl.err()); + + // Ok(()) + // } +} diff --git a/rustybits/src/pubsub/member.proto b/rustybits/src/pubsub/member.proto new file mode 100644 index 000000000..cdf8cc169 --- /dev/null +++ b/rustybits/src/pubsub/member.proto @@ -0,0 +1,37 @@ +syntax = "proto3"; + +import "metadata.proto"; + +package pbmessages; + +message Member { + string device_id = 1; + string network_id = 2; + string identity = 3; // Identity of the member + bool authorized = 4; // Whether the member is authorized + repeated string ip_assignments = 5; // List of IP assignments + bool active_bridge = 6; // Whether the member is an active bridge + string tags = 7; // JSON string of tags + string capabilities = 8; // JSON string of capabilities + uint64 creation_time = 9; // Unix timestamp in milliseconds + bool no_auto_assign_ips = 10; // Whether auto IP assignment is disabled + uint64 revision = 11; // Revision number + uint64 last_authorized_time = 12; // Last time the member was authorized + uint64 last_deauthorized_time = 13; // Last time the member was deauthorized + optional string last_authorized_credential_type = 14; // Type of credential used for last authorization + optional string last_authorized_credential = 15; // Credential used for last authorization + int32 version_major = 16; // Major version of the member + int32 version_minor = 17; // Minor version of the member + int32 version_rev = 18; // Patch version of the member + int32 version_protocol = 19; // Protocol version of the member + int32 remote_trace_level = 20; // Remote trace level + optional string remote_trace_target = 21; // Remote trace target + bool sso_exepmt = 22; // Whether SSO is exempt + uint64 auth_expiry_time = 23; // Authorization expiry time in milliseconds +} + +message MemberChange { + optional Member old = 1; + optional Member new = 2; + optional Metadata metadata = 3; +} diff --git a/rustybits/src/pubsub/member_listener.rs b/rustybits/src/pubsub/member_listener.rs new file mode 100644 index 000000000..f379e17c7 --- /dev/null +++ b/rustybits/src/pubsub/member_listener.rs @@ -0,0 +1,53 @@ +use crate::pubsub::change_listener::ChangeListener; +use crate::pubsub::protobuf::pbmessages::MemberChange; +use prost::Message; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::mpsc::Receiver; +use tokio::sync::Mutex; + +/** + * Member Listener listens for member changes and passes them back to the controller + * + * This is a wrapper around ChangeListener that specifically handles member changes. + * It uses a Tokio channel to receive messages and decodes them into MemberChange messages. + */ +pub struct MemberListener { + change_listener: ChangeListener, + rx_channel: Mutex>>, +} + +impl MemberListener { + pub async fn new(controller_id: &str, listen_timeout: Duration) -> Result, Box> { + let (tx, rx) = tokio::sync::mpsc::channel(64); + + let change_listener = ChangeListener::new( + controller_id, + "controller-member-change-stream", + format!("{}-member-change-subscription", controller_id).as_str(), + listen_timeout, + tx, + ) + .await?; + + Ok(Arc::new(Self { change_listener, rx_channel: Mutex::new(rx) })) + } + + pub async fn listen(&self) -> Result<(), Box> { + self.change_listener.listen().await + } + + pub fn change_handler(self: Arc) -> Result<(), Box> { + let this = self.clone(); + tokio::spawn(async move { + let mut rx = this.rx_channel.lock().await; + while let Some(change) = rx.recv().await { + if let Ok(m) = MemberChange::decode(change.as_slice()) { + print!("Received change: {:?}", m); + } + } + }); + + Ok(()) + } +} diff --git a/rustybits/src/pubsub/member_status.proto b/rustybits/src/pubsub/member_status.proto new file mode 100644 index 000000000..74e590089 --- /dev/null +++ b/rustybits/src/pubsub/member_status.proto @@ -0,0 +1,16 @@ +syntax = "proto3"; + +package pbmessages; + +import "metadata.proto"; + +message MemberStatus { + Metadata metadata = 1; + string network_id = 2; + string member_id = 3; + uint64 timestamp = 4; // Unix timestamp in milliseconds + optional string ip_address = 5; // Optional IP address of the member + optional string os = 6; + optional string arch = 7; + optional string version = 8; +} \ No newline at end of file diff --git a/rustybits/src/pubsub/metadata.proto b/rustybits/src/pubsub/metadata.proto new file mode 100644 index 000000000..a5e75dac4 --- /dev/null +++ b/rustybits/src/pubsub/metadata.proto @@ -0,0 +1,8 @@ +syntax = "proto3"; + +package pbmessages; + +message Metadata { + string trace_id = 1; + string controller_id = 2; +} \ No newline at end of file diff --git a/rustybits/src/pubsub/mod.rs b/rustybits/src/pubsub/mod.rs index 719f52e59..8d5f2a5b0 100644 --- a/rustybits/src/pubsub/mod.rs +++ b/rustybits/src/pubsub/mod.rs @@ -1,5 +1,5 @@ /* - * Copyright (c)2023 ZeroTier, Inc. + * Copyright (c)2025 ZeroTier, Inc. * * Use of this software is governed by the Business Source License included * in the LICENSE.TXT file in the project's root directory. @@ -9,21 +9,7 @@ * On the date above, in accordance with the Business Source License, use * of this software will be governed by version 2.0 of the Apache License. */ - -use gcloud_pubsub::client::{Client, ClientConfig}; - -pub struct PubSubClient { - client: Client, -} - -impl PubSubClient { - pub async fn new() -> Result> { - let config = ClientConfig::default().with_auth().await.unwrap(); - let client = Client::new(config).await?; - - // Assuming a topic name is required for the client - let topic_name = "default-topic".to_string(); - - Ok(Self { client }) - } -} +mod change_listener; +pub mod member_listener; +pub mod network_listener; +mod protobuf; diff --git a/rustybits/src/pubsub/network.proto b/rustybits/src/pubsub/network.proto new file mode 100644 index 000000000..cf2586fe2 --- /dev/null +++ b/rustybits/src/pubsub/network.proto @@ -0,0 +1,62 @@ +syntax = "proto3"; + +import "metadata.proto"; + +package pbmessages; + +message IPRange { + string start_ip = 1; // Start of the IP range + string end_ip = 2; // End of the IP range +} + +message Route { + string target = 1; // Target IP or network + optional string via = 2; // Optional next hop IP +} + +message DNS { + string domain = 1; // Search domain + repeated string nameservers = 2; // List of nameservers +} + +message IPV4AssignMode { + bool zt = 1; // Whether ZeroTier is used for IPv4 assignment +} + +message IPv6AssignMode { + bool six_plane = 1; // Whether 6plane is used for IPv6 assignment + bool rfc4193 = 2; // Whether RFC 4193 is used for IPv6 assignment + bool zt = 3; // Whether ZeroTier is used for IPv6 assignment +} + +message Network { + string network_id = 1; + string capabilities = 2; // JSON string of capabilities + uint64 creation_time = 3; // Unix timestamp in milliseconds + bool enable_broadcast = 4; // Whether broadcast is enabled + repeated IPRange assignment_pools = 5; // List of IP ranges for assignment + uint32 mtu = 6; // Maximum Transmission Unit + uint32 multicast_limit = 7; // Limit for multicast messages + optional string name = 8; // Name of the network + bool is_private = 9; // Whether the network is private + uint32 remote_trace_level = 10; // Remote trace level + optional string remote_trace_target = 11; // Remote trace target + uint64 revision = 12; // Revision number + repeated Route routes = 13; // List of routes + string rules = 14; // JSON string of rules + optional string tags = 15; // JSON string of tags + IPV4AssignMode ipv4_assign_mode = 16; // IPv4 assignment mode + IPv6AssignMode ipv6_assign_mode = 17; // IPv6 assignment mode + optional DNS dns = 18; // DNS configuration + bool sso_enabled = 19; // Whether Single Sign-On is enabled + optional string sso_client_id = 20; // SSO client ID + optional string sso_authorization_endpoint = 21; // SSO authorization endpoint + optional string sso_issuer = 22; // SSO issuer + optional string sso_provider = 23; // SSO provider +} + +message NetworkChange { + optional Network old = 1; + optional Network new = 2; + optional Metadata metadata = 3; +} diff --git a/rustybits/src/pubsub/network_listener.rs b/rustybits/src/pubsub/network_listener.rs new file mode 100644 index 000000000..e97bc74d7 --- /dev/null +++ b/rustybits/src/pubsub/network_listener.rs @@ -0,0 +1,54 @@ +use crate::pubsub::change_listener::ChangeListener; +use crate::pubsub::protobuf::pbmessages::NetworkChange; +use prost::Message; +use serde_json; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::mpsc::Receiver; +use tokio::sync::Mutex; + +/** + * Network Listener listens for network changes and passes them back to the controller + * + * This is a wrapper around ChangeListener that specifically handles network changes. + * It uses a Tokio channel to receive messages and decodes them into NetworkChange messages. + */ +pub struct NetworkListener { + change_listener: ChangeListener, + rx_channel: Mutex>>, +} + +impl NetworkListener { + pub async fn new(controller_id: &str, listen_timeout: Duration) -> Result, Box> { + let (tx, rx) = tokio::sync::mpsc::channel(64); + + let change_listener = ChangeListener::new( + controller_id, + "controller-network-change-stream", + format!("{}-network-change-subscription", controller_id).as_str(), + listen_timeout, + tx, + ) + .await?; + Ok(Arc::new(Self { change_listener, rx_channel: Mutex::new(rx) })) + } + + pub async fn listen(&self) -> Result<(), Box> { + self.change_listener.listen().await + } + + pub fn change_handler(self: Arc) -> Result<(), Box> { + let this = self.clone(); + tokio::spawn(async move { + let mut rx = this.rx_channel.lock().await; + while let Some(change) = rx.recv().await { + if let Ok(m) = NetworkChange::decode(change.as_slice()) { + let j = serde_json::to_string(&m).unwrap(); + print!("Received change: {:?}", j); + } + } + }); + + Ok(()) + } +} diff --git a/rustybits/src/pubsub/protobuf.rs b/rustybits/src/pubsub/protobuf.rs new file mode 100644 index 000000000..aa2614865 --- /dev/null +++ b/rustybits/src/pubsub/protobuf.rs @@ -0,0 +1,3 @@ +pub(crate) mod pbmessages { + include!(concat!(env!("OUT_DIR"), "/pbmessages.rs")); +}