WIP: pubsub wrapper in Rust
Some checks are pending
/ build_macos (push) Waiting to run
/ build_windows (push) Waiting to run
/ build_ubuntu (push) Waiting to run

This commit is contained in:
Grant Limberg 2025-08-12 10:42:07 -07:00
commit 06b2ce9f30
13 changed files with 884 additions and 45 deletions

386
rustybits/Cargo.lock generated
View file

@ -281,6 +281,56 @@ dependencies = [
"generic-array",
]
[[package]]
name = "bollard"
version = "0.18.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97ccca1260af6a459d75994ad5acc1651bcabcbdbc41467cc9786519ab854c30"
dependencies = [
"base64 0.22.1",
"bollard-stubs",
"bytes",
"futures-core",
"futures-util",
"hex",
"home",
"http 1.3.1",
"http-body-util",
"hyper 1.6.0",
"hyper-named-pipe",
"hyper-rustls",
"hyper-util",
"hyperlocal",
"log",
"pin-project-lite",
"rustls",
"rustls-native-certs",
"rustls-pemfile 2.2.0",
"rustls-pki-types",
"serde",
"serde_derive",
"serde_json",
"serde_repr",
"serde_urlencoded",
"thiserror 2.0.12",
"tokio",
"tokio-util",
"tower-service",
"url",
"winapi",
]
[[package]]
name = "bollard-stubs"
version = "1.47.1-rc.27.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f179cfbddb6e77a5472703d4b30436bff32929c0aa8a9008ecf23d1d3cdd0da"
dependencies = [
"serde",
"serde_repr",
"serde_with",
]
[[package]]
name = "bumpalo"
version = "3.19.0"
@ -658,6 +708,17 @@ dependencies = [
"syn",
]
[[package]]
name = "docker_credential"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d89dfcba45b4afad7450a99b39e751590463e45c04728cf555d36bb66940de8"
dependencies = [
"base64 0.21.7",
"serde",
"serde_json",
]
[[package]]
name = "downcast"
version = "0.11.0"
@ -802,6 +863,17 @@ dependencies = [
"windows-sys 0.60.2",
]
[[package]]
name = "etcetera"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26c7b13d0780cb82722fd59f6f57f925e143427e4a75313a6c77243bf5326ae6"
dependencies = [
"cfg-if",
"home",
"windows-sys 0.59.0",
]
[[package]]
name = "event-listener"
version = "2.5.3"
@ -830,6 +902,18 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "64cd1e32ddd350061ae6edb1b082d7c54915b5c672c389143b9a63403a109f24"
[[package]]
name = "filetime"
version = "0.2.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "35c0522e981e68cbfa8c3f978441a5f34b30b96e146b33cd3359176b50fe8586"
dependencies = [
"cfg-if",
"libc",
"libredox",
"windows-sys 0.59.0",
]
[[package]]
name = "fixedbitset"
version = "0.5.7"
@ -1038,8 +1122,8 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "501459a508e7887cfedc45a45ee41602ac1f66d2b61deb05f1c2256bf2faf46d"
dependencies = [
"prost",
"prost-types",
"prost 0.13.5",
"prost-types 0.13.5",
"tonic",
]
@ -1065,7 +1149,7 @@ dependencies = [
"gcloud-auth",
"gcloud-gax",
"gcloud-googleapis",
"prost-types",
"prost-types 0.13.5",
"thiserror 1.0.69",
"token-source",
"tokio",
@ -1364,6 +1448,37 @@ dependencies = [
"want",
]
[[package]]
name = "hyper-named-pipe"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73b7d8abf35697b81a825e386fc151e0d503e8cb5fcb93cc8669c376dfd6f278"
dependencies = [
"hex",
"hyper 1.6.0",
"hyper-util",
"pin-project-lite",
"tokio",
"tower-service",
"winapi",
]
[[package]]
name = "hyper-rustls"
version = "0.27.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3c93eb611681b207e1fe55d5a71ecf91572ec8a6705cdb6857f7d8d5242cf58"
dependencies = [
"http 1.3.1",
"hyper 1.6.0",
"hyper-util",
"rustls",
"rustls-pki-types",
"tokio",
"tokio-rustls",
"tower-service",
]
[[package]]
name = "hyper-timeout"
version = "0.5.2"
@ -1430,6 +1545,21 @@ dependencies = [
"tracing",
]
[[package]]
name = "hyperlocal"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "986c5ce3b994526b3cd75578e62554abd09f0899d6206de48b3e96ab34ccc8c7"
dependencies = [
"hex",
"http-body-util",
"hyper 1.6.0",
"hyper-util",
"pin-project-lite",
"tokio",
"tower-service",
]
[[package]]
name = "iana-time-zone"
version = "0.1.63"
@ -1724,6 +1854,17 @@ version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9fbbcab51052fe104eb5e5d351cf728d30a5be1fe14d9be8a3b097481fb97de"
[[package]]
name = "libredox"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4488594b9328dee448adb906d8b126d9b7deb7cf5c22161ee591610bb1be83c0"
dependencies = [
"bitflags 2.9.1",
"libc",
"redox_syscall 0.5.13",
]
[[package]]
name = "linux-raw-sys"
version = "0.9.4"
@ -2139,11 +2280,36 @@ checksum = "bc838d2a56b5b1a6c25f55575dfc605fabb63bb2365f6c2353ef9159aa69e4a5"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"redox_syscall 0.5.13",
"smallvec",
"windows-targets 0.52.6",
]
[[package]]
name = "parse-display"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "914a1c2265c98e2446911282c6ac86d8524f495792c38c5bd884f80499c7538a"
dependencies = [
"parse-display-derive",
"regex",
"regex-syntax 0.8.5",
]
[[package]]
name = "parse-display-derive"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ae7800a4c974efd12df917266338e79a7a74415173caf7e70aa0a0707345281"
dependencies = [
"proc-macro2",
"quote",
"regex",
"regex-syntax 0.8.5",
"structmeta",
"syn",
]
[[package]]
name = "pem"
version = "3.0.5"
@ -2347,7 +2513,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5"
dependencies = [
"bytes",
"prost-derive",
"prost-derive 0.13.5",
]
[[package]]
name = "prost"
version = "0.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7231bd9b3d3d33c86b58adbac74b5ec0ad9f496b19d22801d773636feaa95f3d"
dependencies = [
"bytes",
"prost-derive 0.14.1",
]
[[package]]
@ -2363,8 +2539,28 @@ dependencies = [
"once_cell",
"petgraph",
"prettyplease",
"prost",
"prost-types",
"prost 0.13.5",
"prost-types 0.13.5",
"regex",
"syn",
"tempfile",
]
[[package]]
name = "prost-build"
version = "0.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac6c3320f9abac597dcbc668774ef006702672474aad53c6d596b62e487b40b1"
dependencies = [
"heck",
"itertools 0.14.0",
"log",
"multimap",
"once_cell",
"petgraph",
"prettyplease",
"prost 0.14.1",
"prost-types 0.14.1",
"regex",
"syn",
"tempfile",
@ -2383,13 +2579,35 @@ dependencies = [
"syn",
]
[[package]]
name = "prost-derive"
version = "0.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9120690fafc389a67ba3803df527d0ec9cbbc9cc45e4cc20b332996dfb672425"
dependencies = [
"anyhow",
"itertools 0.14.0",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "prost-types"
version = "0.13.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16"
dependencies = [
"prost",
"prost 0.13.5",
]
[[package]]
name = "prost-types"
version = "0.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9b4db3d6da204ed77bb26ba83b6122a73aeb2e87e25fbf7ad2e84c4ccbf8f72"
dependencies = [
"prost 0.14.1",
]
[[package]]
@ -2400,7 +2618,7 @@ checksum = "497e1e938f0c09ef9cabe1d49437b4016e03e8f82fbbe5d1c62a9b61b9decae1"
dependencies = [
"chrono",
"inventory",
"prost",
"prost 0.13.5",
"serde",
"serde_derive",
"serde_json",
@ -2414,9 +2632,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07b8bf115b70a7aa5af1fd5d6e9418492e9ccb6e4785e858c938e28d132a884b"
dependencies = [
"heck",
"prost",
"prost-build",
"prost-types",
"prost 0.13.5",
"prost-build 0.13.5",
"prost-types 0.13.5",
"quote",
]
@ -2427,9 +2645,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c8cdde6df0a98311c839392ca2f2f0bcecd545f86a62b4e3c6a49c336e970fe5"
dependencies = [
"chrono",
"prost",
"prost-build",
"prost-types",
"prost 0.13.5",
"prost-build 0.13.5",
"prost-types 0.13.5",
"prost-wkt",
"prost-wkt-build",
"regex",
@ -2536,6 +2754,15 @@ dependencies = [
"bitflags 2.9.1",
]
[[package]]
name = "redox_syscall"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29"
dependencies = [
"bitflags 1.3.2",
]
[[package]]
name = "redox_syscall"
version = "0.5.13"
@ -2633,7 +2860,7 @@ dependencies = [
"once_cell",
"percent-encoding",
"pin-project-lite",
"rustls-pemfile",
"rustls-pemfile 1.0.4",
"serde",
"serde_json",
"serde_urlencoded",
@ -2832,6 +3059,15 @@ dependencies = [
"base64 0.21.7",
]
[[package]]
name = "rustls-pemfile"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50"
dependencies = [
"rustls-pki-types",
]
[[package]]
name = "rustls-pki-types"
version = "1.12.0"
@ -2868,14 +3104,21 @@ dependencies = [
"gcloud-pubsub",
"jwt",
"openidconnect",
"prost 0.14.1",
"prost-build 0.14.1",
"prost-types 0.14.1",
"reqwest 0.11.27",
"serde",
"serde_json",
"temporal-client",
"temporal-sdk",
"temporal-sdk-core-protos",
"testcontainers",
"testcontainers-modules",
"thiserror 1.0.69",
"time",
"tokio",
"tokio-util",
"url",
"uuid",
]
@ -3042,6 +3285,17 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_repr"
version = "0.1.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "175ee3e80ae9982737ca543e96133087cbd9a485eecc3bc4de9c1a37b47ea59c"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "serde_spanned"
version = "0.6.9"
@ -3226,6 +3480,29 @@ version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f"
[[package]]
name = "structmeta"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e1575d8d40908d70f6fd05537266b90ae71b15dbbe7a8b7dffa2b759306d329"
dependencies = [
"proc-macro2",
"quote",
"structmeta-derive",
"syn",
]
[[package]]
name = "structmeta-derive"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "152a0b65a590ff6c3da95cabe2353ee04e6167c896b28e3b14478c2636c922fc"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "subtle"
version = "2.6.1"
@ -3394,7 +3671,7 @@ dependencies = [
"parking_lot",
"pid",
"pin-project",
"prost",
"prost 0.13.5",
"prost-wkt-types",
"rand 0.9.1",
"ringbuf",
@ -3427,7 +3704,7 @@ dependencies = [
"derive_builder",
"derive_more",
"opentelemetry",
"prost",
"prost 0.13.5",
"serde_json",
"temporal-sdk-core-protos",
"thiserror 2.0.12",
@ -3445,8 +3722,8 @@ dependencies = [
"anyhow",
"base64 0.22.1",
"derive_more",
"prost",
"prost-build",
"prost 0.13.5",
"prost-build 0.13.5",
"prost-wkt",
"prost-wkt-build",
"prost-wkt-types",
@ -3465,6 +3742,44 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f50febec83f5ee1df3015341d8bd429f2d1cc62bcba7ea2076759d315084683"
[[package]]
name = "testcontainers"
version = "0.24.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23bb7577dca13ad86a78e8271ef5d322f37229ec83b8d98da6d996c588a1ddb1"
dependencies = [
"async-trait",
"bollard",
"bollard-stubs",
"bytes",
"docker_credential",
"either",
"etcetera",
"futures",
"log",
"memchr",
"parse-display",
"pin-project-lite",
"serde",
"serde_json",
"serde_with",
"thiserror 2.0.12",
"tokio",
"tokio-stream",
"tokio-tar",
"tokio-util",
"url",
]
[[package]]
name = "testcontainers-modules"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eac95cde96549fc19c6bf19ef34cc42bd56e264c1cb97e700e21555be0ecf9e2"
dependencies = [
"testcontainers",
]
[[package]]
name = "thiserror"
version = "1.0.69"
@ -3636,6 +3951,21 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-tar"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d5714c010ca3e5c27114c1cdeb9d14641ace49874aa5626d7149e47aedace75"
dependencies = [
"filetime",
"futures-core",
"libc",
"redox_syscall 0.3.5",
"tokio",
"tokio-stream",
"xattr",
]
[[package]]
name = "tokio-util"
version = "0.7.15"
@ -3710,7 +4040,7 @@ dependencies = [
"hyper-util",
"percent-encoding",
"pin-project",
"prost",
"prost 0.13.5",
"rustls-native-certs",
"socket2",
"tokio",
@ -3731,8 +4061,8 @@ checksum = "eac6f67be712d12f0b41328db3137e0d0757645d8904b4cb7d51cd9c2279e847"
dependencies = [
"prettyplease",
"proc-macro2",
"prost-build",
"prost-types",
"prost-build 0.13.5",
"prost-types 0.13.5",
"quote",
"syn",
]
@ -4492,6 +4822,16 @@ version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea2f10b9bb0928dfb1b42b65e1f9e36f7f54dbdf08457afefb38afcdec4fa2bb"
[[package]]
name = "xattr"
version = "1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af3a19837351dc82ba89f8a125e22a3c475f05aba604acc023d62b2739ae2909"
dependencies = [
"libc",
"rustix",
]
[[package]]
name = "yoke"
version = "0.8.0"

View file

@ -7,24 +7,29 @@ edition = "2021"
crate-type = ["staticlib", "rlib"]
[features]
default = ["zeroidc"]
default = ["zeroidc", "ztcontroller"]
zeroidc = []
ztcontroller = [
"dep:serde",
"dep:serde_json",
"dep:temporal-sdk",
"dep:temporal-client",
"dep:temporal-sdk-core-protos",
"dep:gcloud-pubsub",
"dep:prost",
"dep:prost-types",
]
[dependencies]
serde = { version = "1", features = ["derive"], optional = true }
serde_json = { version = "1", optional = true }
temporal-sdk = { git = "https://github.com/temporalio/sdk-core", branch = "master", optional = true }
temporal-client = { git = "https://github.com/temporalio/sdk-core", branch = "master", optional = true, features = [
"telemetry",
] }
temporal-sdk-core-protos = { git = "https://github.com/temporalio/sdk-core", branch = "master", optional = true }
tokio = { version = "1.43", features = ["full"] }
tokio = { version = "1.43", features = ["full", "rt", "macros"] }
tokio-util = { version = "0.7" }
uuid = { version = "1.4", features = ["v4"] }
openidconnect = { version = "3.4", default-features = false, features = [
"reqwest",
@ -39,9 +44,18 @@ time = { version = "~0.3", features = ["formatting"] }
bytes = "1.3"
thiserror = "1"
gcloud-pubsub = { version = "1.3.0", optional = true }
prost = { version = "0.14", optional = true, features = ["derive"] }
prost-types = { version = "0.14", optional = true }
[dev-dependencies]
testcontainers = { version = "0.24", features = ["blocking"] }
testcontainers-modules = { version = "0.12.1", features = [
"google_cloud_sdk_emulators",
] }
[build-dependencies]
cbindgen = "0.29"
prost-build = "0.14"
[profile.release]
strip = "debuginfo"

View file

@ -5,6 +5,21 @@ use std::env;
use std::path::PathBuf;
fn main() {
let mut prost_build = prost_build::Config::new();
prost_build
.type_attribute(".", "#[derive(serde::Serialize, serde::Deserialize)]")
.compile_protos(
&[
"src/pubsub/metadata.proto",
"src/pubsub/network.proto",
"src/pubsub/member.proto",
"src/pubsub/member_status.proto",
],
&["src/pubsub/"],
)
.expect("Failed to compile protobuf files");
let crate_dir = env::var("CARGO_MANIFEST_DIR").unwrap();
let package_name = env::var("CARGO_PKG_NAME").unwrap();

View file

@ -12,6 +12,9 @@
use std::ffi::{CStr, CString};
use std::os::raw::c_char;
use std::sync::Arc;
use std::time::Duration;
use tokio::runtime;
use url::Url;
static mut RT: Option<tokio::runtime::Runtime> = None;
@ -39,7 +42,7 @@ pub unsafe extern "C" fn shutdown_async_runtime() {
SHUTDOWN.call_once(|| {
// Shutdown the tokio runtime
unsafe {
if let Some(rt) = RT.take() {
if let Some(rt) = RT.take() {
rt.shutdown_timeout(std::time::Duration::from_secs(5));
}
}
@ -486,3 +489,131 @@ pub unsafe extern "C" fn smee_client_notify_network_joined(
}
}
}
#[cfg(feature = "ztcontroller")]
use crate::pubsub::member_listener::MemberListener;
#[cfg(feature = "ztcontroller")]
use crate::pubsub::network_listener::NetworkListener;
#[cfg(feature = "ztcontroller")]
#[no_mangle]
pub unsafe extern "C" fn init_network_listener(
controller_id: *const c_char,
listen_timeout: u64,
) -> *const Arc<NetworkListener> {
if listen_timeout == 0 {
println!("listen_timeout is zero");
return std::ptr::null_mut();
}
if controller_id.is_null() {
println!("controller_id is null");
return std::ptr::null_mut();
}
let id = unsafe { CStr::from_ptr(controller_id) }.to_str().unwrap();
let rt = runtime::Handle::current();
rt.block_on(async {
match NetworkListener::new(id, Duration::from_secs(listen_timeout)).await {
Ok(listener) => Arc::into_raw(Arc::new(listener)),
Err(e) => {
println!("error creating network listener: {}", e);
std::ptr::null_mut()
}
}
})
}
#[cfg(feature = "ztcontroller")]
#[no_mangle]
pub unsafe extern "C" fn network_listener_delete(ptr: *const Arc<NetworkListener>) {
if ptr.is_null() {
return;
}
drop(Arc::from_raw(ptr));
}
#[cfg(feature = "ztcontroller")]
#[no_mangle]
pub unsafe extern "C" fn network_listener_listen(ptr: *const Arc<NetworkListener>) -> bool {
use std::mem::ManuallyDrop;
if ptr.is_null() {
println!("ptr is null");
return false;
}
let listener = ManuallyDrop::new(unsafe { Arc::from_raw(ptr) });
let rt = runtime::Handle::current();
match rt.block_on(listener.listen()) {
Ok(_) => {
println!("Network listener started successfully");
true
}
Err(e) => {
println!("Error starting network listener: {}", e);
false
}
}
}
#[cfg(feature = "ztcontroller")]
#[no_mangle]
pub unsafe extern "C" fn init_member_listener(
controller_id: *const c_char,
listen_timeout: u64,
) -> *const Arc<MemberListener> {
if listen_timeout == 0 {
println!("listen_timeout is zero");
return std::ptr::null_mut();
}
if controller_id.is_null() {
println!("controller_id is null");
return std::ptr::null_mut();
}
let id = unsafe { CStr::from_ptr(controller_id) }.to_str().unwrap();
let rt = runtime::Handle::current();
rt.block_on(async {
match MemberListener::new(id, Duration::from_secs(listen_timeout)).await {
Ok(listener) => Arc::into_raw(Arc::new(listener)),
Err(e) => {
println!("error creating member listener: {}", e);
std::ptr::null_mut()
}
}
})
}
#[cfg(feature = "ztcontroller")]
#[no_mangle]
pub unsafe extern "C" fn member_listener_delete(ptr: *const Arc<MemberListener>) {
if ptr.is_null() {
return;
}
drop(Arc::from_raw(ptr));
}
#[cfg(feature = "ztcontroller")]
#[no_mangle]
pub unsafe extern "C" fn member_listener_listen(ptr: *const Arc<MemberListener>) -> bool {
use std::mem::ManuallyDrop;
if ptr.is_null() {
println!("ptr is null");
return false;
}
let listener = ManuallyDrop::new(unsafe { Arc::from_raw(ptr) });
let rt = runtime::Handle::current();
match rt.block_on(listener.listen()) {
Ok(_) => {
println!("Member listener started successfully");
true
}
Err(e) => {
println!("Error starting member listener: {}", e);
false
}
}
}

View file

@ -0,0 +1,120 @@
use gcloud_pubsub::client::{Client, ClientConfig};
use gcloud_pubsub::subscription::SubscriptionConfig;
use gcloud_pubsub::topic::Topic;
use std::time::Duration;
use tokio::sync::mpsc::Sender;
use tokio_util::sync::CancellationToken;
pub struct ChangeListener {
client: Client,
topic: Topic,
subscription_name: String,
controller_id: String,
listen_timeout: Duration,
sender: Sender<Vec<u8>>,
}
impl ChangeListener {
pub async fn new(
controller_id: &str,
topic_name: &str,
subscription_name: &str,
listen_timeout: Duration,
sender: Sender<Vec<u8>>,
) -> Result<Self, Box<dyn std::error::Error>> {
let config = ClientConfig::default().with_auth().await.unwrap();
let client = Client::new(config).await?;
let topic = client.topic(topic_name);
Ok(Self {
client,
topic,
subscription_name: subscription_name.to_string(),
controller_id: controller_id.to_string(),
listen_timeout,
sender,
})
}
pub async fn listen(&self) -> Result<(), Box<dyn std::error::Error>> {
let config = SubscriptionConfig {
enable_message_ordering: true,
filter: format!("attributes.controller_id = '{}'", self.controller_id),
..Default::default()
};
let subscription = self.client.subscription(self.subscription_name.as_str());
if !subscription.exists(None).await? {
subscription
.create(self.topic.fully_qualified_name(), config, None)
.await?;
}
let cancel = CancellationToken::new();
let cancel2 = cancel.clone();
let listen_timeout = self.listen_timeout.clone();
tokio::spawn(async move {
tokio::time::sleep(listen_timeout).await;
cancel2.cancel();
});
let tx = self.sender.clone();
let _ = subscription
.receive(
move |message, _cancel| {
let tx2 = tx.clone();
async move {
let data = message.message.data.clone();
match tx2.send(data.to_vec()).await {
Ok(_) => println!("Message sent successfully"),
Err(e) => eprintln!("Failed to send message: {}", e),
}
match message.ack().await {
Ok(_) => println!("Message acknowledged"),
Err(e) => eprintln!("Failed to acknowledge message: {}", e),
}
}
},
cancel.clone(),
None,
)
.await;
Ok(())
}
}
#[cfg(test)]
mod tests {
// use super::*;
// use testcontainers::runners::AsyncRunner;
// use testcontainers_modules::google_cloud_sdk_emulators;
// use tokio;
// #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
// async fn setup_pubsub_emulator() -> Result<(), Box<dyn std::error::Error + 'static>> {
// let container = google_cloud_sdk_emulators::CloudSdk::pubsub().start().await?;
// let port = container.get_host_port_ipv4(8085).await?;
// let host = format!("localhost:{}", port);
// unsafe {
// std::env::set_var("PUBSUB_EMULATOR_HOST", host);
// }
// let cl = ChangeListener::new(
// "test_controller",
// "test_topic",
// "test_subscription",
// Duration::from_secs(10),
// )
// .await;
// assert!(cl.is_ok(), "Failed to connect to pubsub emulator: {:?}", cl.err());
// Ok(())
// }
}

View file

@ -0,0 +1,37 @@
syntax = "proto3";
import "metadata.proto";
package pbmessages;
message Member {
string device_id = 1;
string network_id = 2;
string identity = 3; // Identity of the member
bool authorized = 4; // Whether the member is authorized
repeated string ip_assignments = 5; // List of IP assignments
bool active_bridge = 6; // Whether the member is an active bridge
string tags = 7; // JSON string of tags
string capabilities = 8; // JSON string of capabilities
uint64 creation_time = 9; // Unix timestamp in milliseconds
bool no_auto_assign_ips = 10; // Whether auto IP assignment is disabled
uint64 revision = 11; // Revision number
uint64 last_authorized_time = 12; // Last time the member was authorized
uint64 last_deauthorized_time = 13; // Last time the member was deauthorized
optional string last_authorized_credential_type = 14; // Type of credential used for last authorization
optional string last_authorized_credential = 15; // Credential used for last authorization
int32 version_major = 16; // Major version of the member
int32 version_minor = 17; // Minor version of the member
int32 version_rev = 18; // Patch version of the member
int32 version_protocol = 19; // Protocol version of the member
int32 remote_trace_level = 20; // Remote trace level
optional string remote_trace_target = 21; // Remote trace target
bool sso_exepmt = 22; // Whether SSO is exempt
uint64 auth_expiry_time = 23; // Authorization expiry time in milliseconds
}
message MemberChange {
optional Member old = 1;
optional Member new = 2;
optional Metadata metadata = 3;
}

View file

@ -0,0 +1,53 @@
use crate::pubsub::change_listener::ChangeListener;
use crate::pubsub::protobuf::pbmessages::MemberChange;
use prost::Message;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::Receiver;
use tokio::sync::Mutex;
/**
* Member Listener listens for member changes and passes them back to the controller
*
* This is a wrapper around ChangeListener that specifically handles member changes.
* It uses a Tokio channel to receive messages and decodes them into MemberChange messages.
*/
pub struct MemberListener {
change_listener: ChangeListener,
rx_channel: Mutex<Receiver<Vec<u8>>>,
}
impl MemberListener {
pub async fn new(controller_id: &str, listen_timeout: Duration) -> Result<Arc<Self>, Box<dyn std::error::Error>> {
let (tx, rx) = tokio::sync::mpsc::channel(64);
let change_listener = ChangeListener::new(
controller_id,
"controller-member-change-stream",
format!("{}-member-change-subscription", controller_id).as_str(),
listen_timeout,
tx,
)
.await?;
Ok(Arc::new(Self { change_listener, rx_channel: Mutex::new(rx) }))
}
pub async fn listen(&self) -> Result<(), Box<dyn std::error::Error>> {
self.change_listener.listen().await
}
pub fn change_handler(self: Arc<Self>) -> Result<(), Box<dyn std::error::Error>> {
let this = self.clone();
tokio::spawn(async move {
let mut rx = this.rx_channel.lock().await;
while let Some(change) = rx.recv().await {
if let Ok(m) = MemberChange::decode(change.as_slice()) {
print!("Received change: {:?}", m);
}
}
});
Ok(())
}
}

View file

@ -0,0 +1,16 @@
syntax = "proto3";
package pbmessages;
import "metadata.proto";
message MemberStatus {
Metadata metadata = 1;
string network_id = 2;
string member_id = 3;
uint64 timestamp = 4; // Unix timestamp in milliseconds
optional string ip_address = 5; // Optional IP address of the member
optional string os = 6;
optional string arch = 7;
optional string version = 8;
}

View file

@ -0,0 +1,8 @@
syntax = "proto3";
package pbmessages;
message Metadata {
string trace_id = 1;
string controller_id = 2;
}

View file

@ -1,5 +1,5 @@
/*
* Copyright (c)2023 ZeroTier, Inc.
* Copyright (c)2025 ZeroTier, Inc.
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file in the project's root directory.
@ -9,21 +9,7 @@
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2.0 of the Apache License.
*/
use gcloud_pubsub::client::{Client, ClientConfig};
pub struct PubSubClient {
client: Client,
}
impl PubSubClient {
pub async fn new() -> Result<Self, Box<dyn std::error::Error>> {
let config = ClientConfig::default().with_auth().await.unwrap();
let client = Client::new(config).await?;
// Assuming a topic name is required for the client
let topic_name = "default-topic".to_string();
Ok(Self { client })
}
}
mod change_listener;
pub mod member_listener;
pub mod network_listener;
mod protobuf;

View file

@ -0,0 +1,62 @@
syntax = "proto3";
import "metadata.proto";
package pbmessages;
message IPRange {
string start_ip = 1; // Start of the IP range
string end_ip = 2; // End of the IP range
}
message Route {
string target = 1; // Target IP or network
optional string via = 2; // Optional next hop IP
}
message DNS {
string domain = 1; // Search domain
repeated string nameservers = 2; // List of nameservers
}
message IPV4AssignMode {
bool zt = 1; // Whether ZeroTier is used for IPv4 assignment
}
message IPv6AssignMode {
bool six_plane = 1; // Whether 6plane is used for IPv6 assignment
bool rfc4193 = 2; // Whether RFC 4193 is used for IPv6 assignment
bool zt = 3; // Whether ZeroTier is used for IPv6 assignment
}
message Network {
string network_id = 1;
string capabilities = 2; // JSON string of capabilities
uint64 creation_time = 3; // Unix timestamp in milliseconds
bool enable_broadcast = 4; // Whether broadcast is enabled
repeated IPRange assignment_pools = 5; // List of IP ranges for assignment
uint32 mtu = 6; // Maximum Transmission Unit
uint32 multicast_limit = 7; // Limit for multicast messages
optional string name = 8; // Name of the network
bool is_private = 9; // Whether the network is private
uint32 remote_trace_level = 10; // Remote trace level
optional string remote_trace_target = 11; // Remote trace target
uint64 revision = 12; // Revision number
repeated Route routes = 13; // List of routes
string rules = 14; // JSON string of rules
optional string tags = 15; // JSON string of tags
IPV4AssignMode ipv4_assign_mode = 16; // IPv4 assignment mode
IPv6AssignMode ipv6_assign_mode = 17; // IPv6 assignment mode
optional DNS dns = 18; // DNS configuration
bool sso_enabled = 19; // Whether Single Sign-On is enabled
optional string sso_client_id = 20; // SSO client ID
optional string sso_authorization_endpoint = 21; // SSO authorization endpoint
optional string sso_issuer = 22; // SSO issuer
optional string sso_provider = 23; // SSO provider
}
message NetworkChange {
optional Network old = 1;
optional Network new = 2;
optional Metadata metadata = 3;
}

View file

@ -0,0 +1,54 @@
use crate::pubsub::change_listener::ChangeListener;
use crate::pubsub::protobuf::pbmessages::NetworkChange;
use prost::Message;
use serde_json;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::Receiver;
use tokio::sync::Mutex;
/**
* Network Listener listens for network changes and passes them back to the controller
*
* This is a wrapper around ChangeListener that specifically handles network changes.
* It uses a Tokio channel to receive messages and decodes them into NetworkChange messages.
*/
pub struct NetworkListener {
change_listener: ChangeListener,
rx_channel: Mutex<Receiver<Vec<u8>>>,
}
impl NetworkListener {
pub async fn new(controller_id: &str, listen_timeout: Duration) -> Result<Arc<Self>, Box<dyn std::error::Error>> {
let (tx, rx) = tokio::sync::mpsc::channel(64);
let change_listener = ChangeListener::new(
controller_id,
"controller-network-change-stream",
format!("{}-network-change-subscription", controller_id).as_str(),
listen_timeout,
tx,
)
.await?;
Ok(Arc::new(Self { change_listener, rx_channel: Mutex::new(rx) }))
}
pub async fn listen(&self) -> Result<(), Box<dyn std::error::Error>> {
self.change_listener.listen().await
}
pub fn change_handler(self: Arc<Self>) -> Result<(), Box<dyn std::error::Error>> {
let this = self.clone();
tokio::spawn(async move {
let mut rx = this.rx_channel.lock().await;
while let Some(change) = rx.recv().await {
if let Ok(m) = NetworkChange::decode(change.as_slice()) {
let j = serde_json::to_string(&m).unwrap();
print!("Received change: {:?}", j);
}
}
});
Ok(())
}
}

View file

@ -0,0 +1,3 @@
pub(crate) mod pbmessages {
include!(concat!(env!("OUT_DIR"), "/pbmessages.rs"));
}