From 1afbc73ff8855085560aa4dd187c1809b7ac3a6f Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Mon, 13 Mar 2023 14:46:01 -0400 Subject: [PATCH] The scatter/gather algorithm works. --- network-hypervisor/src/dr/mod.rs | 2 +- .../src/dr/{rcdcb.rs => scattergather.rs} | 98 +++++++++++++------ 2 files changed, 71 insertions(+), 29 deletions(-) rename network-hypervisor/src/dr/{rcdcb.rs => scattergather.rs} (65%) diff --git a/network-hypervisor/src/dr/mod.rs b/network-hypervisor/src/dr/mod.rs index 3ee667d4e..9bbde70ad 100644 --- a/network-hypervisor/src/dr/mod.rs +++ b/network-hypervisor/src/dr/mod.rs @@ -1 +1 @@ -pub mod rcdcb; +pub mod scattergather; diff --git a/network-hypervisor/src/dr/rcdcb.rs b/network-hypervisor/src/dr/scattergather.rs similarity index 65% rename from network-hypervisor/src/dr/rcdcb.rs rename to network-hypervisor/src/dr/scattergather.rs index bdcd26096..6fb11d9cc 100644 --- a/network-hypervisor/src/dr/rcdcb.rs +++ b/network-hypervisor/src/dr/scattergather.rs @@ -1,4 +1,4 @@ -use fastcdc::v2020::{FastCDC, MINIMUM_MIN}; +use fastcdc::v2020; use std::io::Write; use zerotier_crypto::hash::{SHA384, SHA384_HASH_SIZE}; use zerotier_utils::error::{InvalidFormatError, InvalidParameterError}; @@ -17,19 +17,20 @@ impl ObjectAssembler { Self { data_chunks: Vec::new(), need: hash_list } } - fn gather_recursive Option>>( + fn gather_recursive Option>>( hl: &[u8], new_hl: &mut Vec, get_chunk: &mut GetChunk, have_all_data_chunk_hashes: &mut bool, depth: usize, ) -> Result<(), InvalidFormatError> { - if (hl.len() % SHA384_HASH_SIZE) != 0 { + if (hl.len() % SHA384_HASH_SIZE) != 0 || hl.is_empty() { return Err(InvalidFormatError); } for h in hl.chunks_exact(SHA384_HASH_SIZE) { if (h[SHA384_HASH_SIZE - 1] & 0x01) != 0 { - if let Some(chunk) = get_chunk(h) { + debug_assert_eq!(h.len(), SHA384_HASH_SIZE); + if let Some(chunk) = get_chunk(unsafe { &*h.as_ptr().cast() }) { if depth < MAX_RECURSION_DEPTH { Self::gather_recursive(chunk.as_slice(), new_hl, get_chunk, have_all_data_chunk_hashes, depth + 1)?; continue; @@ -46,9 +47,14 @@ impl ObjectAssembler { /// Try to assemble this object, using the supplied function to request chunks we don't have. /// - /// Once all chunks are retrieved this will return Ok(Some(object)). An error return can occur if a chunk - /// is invalid or the maximum recursion depth is reached. - pub fn gather Option>>(&mut self, mut get_chunk: GetChunk) -> Result>, InvalidFormatError> { + /// Once all chunks are retrieved this will return Ok(Some(object)). A return of Ok(None) means there are + /// still missing chunks that couldn't be resolved with the supplied getter. In that case this should be + /// called again once more chunks are fetched. An error return indicates invalid chunk data or that the + /// maximum recursion depth has been exceeded. + pub fn gather Option>>( + &mut self, + mut get_chunk: GetChunk, + ) -> Result>, InvalidFormatError> { let mut new_need = Vec::with_capacity(self.need.len()); let mut have_all_data_chunk_hashes = true; Self::gather_recursive(self.need.as_slice(), &mut new_need, &mut get_chunk, &mut have_all_data_chunk_hashes, 0)?; @@ -57,19 +63,26 @@ impl ObjectAssembler { if have_all_data_chunk_hashes { self.data_chunks.resize(self.need.len() / SHA384_HASH_SIZE, Vec::new()); - new_need.clear(); let mut cn = 0; + let mut missing_chunks = false; for h in self.need.chunks_exact(SHA384_HASH_SIZE) { - if let Some(chunk) = get_chunk(h) { - *self.data_chunks.get_mut(cn).unwrap() = chunk; - } else { - let _ = new_need.write_all(h); + let dc = self.data_chunks.get_mut(cn).unwrap(); + if dc.is_empty() { + debug_assert_eq!(h.len(), SHA384_HASH_SIZE); + if let Some(chunk) = get_chunk(unsafe { &*h.as_ptr().cast() }) { + if !chunk.is_empty() { + *dc = chunk; + } else { + return Err(InvalidFormatError); + } + } else { + missing_chunks = true; + } } cn += 1; } - self.need = new_need; - if self.need.is_empty() { + if !missing_chunks { let mut obj_size = 0; for dc in self.data_chunks.iter() { obj_size += dc.len(); @@ -84,6 +97,12 @@ impl ObjectAssembler { return Ok(None); } + + /// Get an iterator of hashes currently known to be needed to reassemble this object. + #[inline] + pub fn need(&self) -> impl Iterator { + self.need.chunks_exact(SHA384_HASH_SIZE).map(|c| unsafe { &*c.as_ptr().cast() }) + } } /// Decompose an object into a series of chunks identified by SHA384 hashes. @@ -95,7 +114,8 @@ impl ObjectAssembler { /// set to 0 if the hash points to a chunk of data or 1 if it points to a chunk of hashes. /// /// The supplied function is called to output each chunk except for the root hash list, which is -/// returned. +/// returned. It's technically possible for the same chunk to be output more than once if there are +/// long runs of identical data in the supplied object. In this case it need only be stored once. /// /// * `obj` - Blob to decompose /// * `max_chunk_size` - Maximum size of any chunk including root hash list (minimum allowed: 256) @@ -110,7 +130,7 @@ pub fn scatter( } let mut root_hash_list = Vec::with_capacity(max_chunk_size as usize); - for chunk in FastCDC::new(obj, (max_chunk_size / 4).max(MINIMUM_MIN), max_chunk_size / 2, max_chunk_size) { + for chunk in v2020::FastCDC::new(obj, (max_chunk_size / 4).max(v2020::MINIMUM_MIN), max_chunk_size / 2, max_chunk_size) { let chunk = &obj[chunk.offset..chunk.offset + chunk.length]; let mut chunk_hash = SHA384::hash(chunk); chunk_hash[SHA384_HASH_SIZE - 1] &= 0xfe; // chunk of data @@ -125,17 +145,20 @@ pub fn scatter( loop { let mut r = root_hash_list.as_slice(); while !r.is_empty() { - let clen = r.len().min(max_hashes_per_chunk); - let chunk = &r[..clen]; - if clen > SHA384_HASH_SIZE && (new_root_hash_list.len() + clen) > (max_chunk_size as usize) { + debug_assert_eq!(new_root_hash_list.len() % SHA384_HASH_SIZE, 0); + debug_assert_eq!(r.len() % SHA384_HASH_SIZE, 0); + if (new_root_hash_list.len() + r.len()) <= (max_chunk_size as usize) { + let _ = new_root_hash_list.write_all(r); + break; + } else { + let clen = r.len().min(max_hashes_per_chunk); + let chunk = &r[..clen]; + r = &r[clen..]; + let mut chunk_hash = SHA384::hash(chunk); chunk_hash[SHA384_HASH_SIZE - 1] |= 0x01; // chunk of hashes let _ = new_root_hash_list.write_all(&chunk_hash); store_chunk(chunk_hash, chunk); - r = &r[clen..]; - } else { - let _ = new_root_hash_list.write_all(chunk); - break; } } std::mem::swap(&mut root_hash_list, &mut new_root_hash_list); @@ -159,16 +182,15 @@ pub fn scatter( mod tests { use super::*; use std::collections::HashMap; - use zerotier_utils::hex; #[test] fn rcdcb_random_blobs() { let mut random_data = Vec::new(); - random_data.resize(1024 * 1024 * 32, 0); + random_data.resize(1024 * 1024 * 8, 0); zerotier_crypto::random::fill_bytes_secure(random_data.as_mut()); let mut chunks = HashMap::new(); - for _ in 0..1024 { + for _ in 0..4 { chunks.clear(); let test_blob = ((zerotier_crypto::random::xorshift64_random() as usize) % (random_data.len() - 1)) + 1; let test_blob = &random_data.as_slice()[..test_blob]; @@ -176,8 +198,28 @@ mod tests { let root_hash_list = scatter(test_blob, 1024, |k, v| { //println!("{}", hex::to_string(&k)); chunks.insert(k, v.to_vec()); - }); - println!("{} chunks", chunks.len()); + }) + .unwrap(); + + let mut assembler = ObjectAssembler::init(root_hash_list); + let mut gathered_blob; + loop { + gathered_blob = assembler + .gather(|c| { + if zerotier_crypto::random::xorshift64_random() < (u64::MAX / 8) { + None + } else { + chunks.get(c).cloned() + } + }) + .unwrap(); + if gathered_blob.is_some() { + break; + } + } + let gathered_blob = gathered_blob.unwrap(); + + assert!(gathered_blob.eq(test_blob)); } } }