From da1bd74bdc2d4e9939d2a8452df4c812acb25f1d Mon Sep 17 00:00:00 2001
From: monica
Date: Wed, 22 Mar 2023 11:31:03 -0400
Subject: [PATCH] saved nonworking

---
 zssp/src/fragged.rs | 95 +++++++++++++++++++++++++--------------------
 1 file changed, 52 insertions(+), 43 deletions(-)

diff --git a/zssp/src/fragged.rs b/zssp/src/fragged.rs
index a1d1e8c9e..73eca9682 100644
--- a/zssp/src/fragged.rs
+++ b/zssp/src/fragged.rs
@@ -6,8 +6,8 @@ use std::sync::atomic::{AtomicU64, Ordering};
 
 /// Fast packet defragmenter
 pub struct Fragged<Fragment, const MAX_FRAGMENTS: usize> {
+    counter_want: RwLock<(u64, u64)>,
     have: AtomicU64,
-    counter: RwLock<u64>,
     frags: UnsafeCell<[MaybeUninit<Fragment>; MAX_FRAGMENTS]>,
 }
 
@@ -38,13 +38,17 @@ impl<Fragment, const MAX_FRAGMENTS: usize> Fragged<Fragment, MAX_FRAGMENTS> {
         // that the array of MaybeUninit<Fragment> can be freely cast into an array of
         // Fragment. They also check that the maximum number of fragments is not too large
         // for the fact that we use bits in a u64 to track which fragments are received.
-        assert!(MAX_FRAGMENTS <= 64);
-        assert_eq!(size_of::<MaybeUninit<Fragment>>(), size_of::<Fragment>());
-        assert_eq!(
+        debug_assert!(MAX_FRAGMENTS <= 64);
+        debug_assert_eq!(size_of::<MaybeUninit<Fragment>>(), size_of::<Fragment>());
+        debug_assert_eq!(
             size_of::<[MaybeUninit<Fragment>; MAX_FRAGMENTS]>(),
             size_of::<[Fragment; MAX_FRAGMENTS]>()
         );
-        unsafe { zeroed() }
+        Self {
+            have: AtomicU64::new(0),
+            counter_want: RwLock::new((0, 0)),
+            frags: unsafe { zeroed() },
+        }
     }
 
     /// Add a fragment and return an assembled packet container if all fragments have been received.
@@ -54,51 +58,52 @@ impl<Fragment, const MAX_FRAGMENTS: usize> Fragged<Fragment, MAX_FRAGMENTS> {
     #[inline(always)]
     pub fn assemble(&self, counter: u64, fragment: Fragment, fragment_no: u8, fragment_count: u8) -> Option<Assembled<Fragment, MAX_FRAGMENTS>> {
         if fragment_no < fragment_count && (fragment_count as usize) <= MAX_FRAGMENTS {
-            let r = self.counter.read().unwrap();
-            let cur_counter = *r;
+            let r = self.counter_want.read().unwrap();
+            let (cur_counter, mut want) = *r;
             let mut r_guard = Some(r);
-            let mut w_guard = None;
-
+            let mut _w_guard = None; // if taken, holds the write lock for the rest of this call
             // If the counter has changed, reset the structure to receive a new packet.
             if counter != cur_counter {
-                drop(r_guard.take());
-                let mut w = self.counter.write().unwrap();
-                if *w != counter {
-                    *w = counter;
+                r_guard.take();
+                let mut w = self.counter_want.write().unwrap();
+                let (cur_counter, _) = *w;
+                if counter != cur_counter {
                     if needs_drop::<Fragment>() {
                         let mut have = self.have.load(Ordering::Relaxed);
-                        let mut i = 0;
-                        while have != 0 {
-                            if (have & 1) != 0 {
-                                debug_assert!(i < MAX_FRAGMENTS);
-                                unsafe { (*self.frags.get()).get_unchecked_mut(i).assume_init_drop() };
+                        // If the last packet completed, its fragments were moved out and
+                        // must not be dropped again here.
+                        if have != w.1 {
+                            let mut i = 0;
+                            while have != 0 {
+                                if (have & 1) != 0 {
+                                    debug_assert!(i < MAX_FRAGMENTS);
+                                    unsafe { (*self.frags.get()).get_unchecked_mut(i).assume_init_drop() };
+                                }
+                                have = have.wrapping_shr(1);
+                                i += 1;
                             }
-                            have = have.wrapping_shr(1);
-                            i += 1;
                         }
                     }
+                    self.have.store(0, Ordering::Relaxed);
+                    *w = (counter, 0xffffffffffffffffu64.wrapping_shr((64 - fragment_count) as u32));
                 }
-                w_guard = Some(w);
+                // Adopt whatever mask is now stored for this counter (set by us or by another thread).
+                want = w.1;
+                _w_guard = Some(w);
             }
 
-            let want = 0xffffffffffffffffu64.wrapping_shr((64 - fragment_count) as u32);
+            // Validate the inputs, mainly that fragment_count matches the mask we expect
+            // and that fragment_no is actually a wanted fragment.
             let got = 1u64.wrapping_shl(fragment_no as u32);
-            let have = self.have.fetch_or(got, Ordering::Relaxed);
+            if want & got > 0 && want == 0xffffffffffffffffu64.wrapping_shr((64 - fragment_count) as u32) {
 
-            if have & got == 0 {
-                unsafe {
-                    (*self.frags.get()).get_unchecked_mut(fragment_no as usize).write(fragment);
-                }
-                if ((have | got) & want) == want {
-                    drop(r_guard.take());
-                    let mut w = w_guard.unwrap_or_else(|| self.counter.write().unwrap());
-                    if *w == counter {
-                        *w = 0;
-                        self.have.store(0, Ordering::Relaxed);
-                        // Setting 'have' to 0 resets the state of this object, and the fragments
-                        // are effectively moved into the Assembled<> container and returned. That
-                        // container will drop them when it is dropped.
+                // Now check whether fragment_no was still missing.
+                let have = self.have.fetch_or(got, Ordering::Relaxed);
+                if have & got == 0 {
+                    unsafe {
+                        (*self.frags.get()).get_unchecked_mut(fragment_no as usize).write(fragment);
+                    }
+                    if (have | got) == want {
+                        // The fragments are effectively moved into the Assembled<> container and
+                        // returned. That container will drop them when it is dropped.
                         return Some(Assembled(unsafe { std::mem::transmute_copy(&self.frags) }, fragment_count as usize));
                     }
                 }
@@ -112,15 +117,19 @@ impl<Fragment, const MAX_FRAGMENTS: usize> Drop for Fragged<Fragment, MAX_FRAGMENTS> {
     #[inline(always)]
     fn drop(&mut self) {
         if needs_drop::<Fragment>() {
+            let r = self.counter_want.read().unwrap();
+            let (_, want) = *r;
             let mut have = self.have.load(Ordering::Relaxed);
-            let mut i = 0;
-            while have != 0 {
-                if (have & 1) != 0 {
-                    debug_assert!(i < MAX_FRAGMENTS);
-                    unsafe { (*self.frags.get()).get_unchecked_mut(i).assume_init_drop() };
+            // If the last packet completed, its fragments were moved out and
+            // must not be dropped again here.
+            if have != want {
+                let mut i = 0;
+                while have != 0 {
+                    if (have & 1) != 0 {
+                        debug_assert!(i < MAX_FRAGMENTS);
+                        unsafe { (*self.frags.get()).get_unchecked_mut(i).assume_init_drop() };
+                    }
+                    have = have.wrapping_shr(1);
+                    i += 1;
                 }
-                have = have.wrapping_shr(1);
-                i += 1;
             }
         }
     }
 }
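
Note: a minimal sketch of how the reworked assemble() is meant to be driven,
assuming the Fragged type from this patch; the counter value, MAX_FRAGMENTS = 8,
and the u32 fragment payloads are made up for illustration:

    let defrag: Fragged<u32, 8> = Fragged::new();
    // Three fragments of the packet with counter 1 arrive out of order; the
    // first call resets the state and records want = 0b111 for this counter.
    assert!(defrag.assemble(1, 30, 2, 3).is_none());
    assert!(defrag.assemble(1, 10, 0, 3).is_none());
    // The last missing fragment completes the packet and moves all three
    // fragments into the returned Assembled<> container.
    assert!(defrag.assemble(1, 20, 1, 3).is_some());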
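Note: the mask arithmetic that the reset and validation paths rely on, with the
values for a hypothetical 3-fragment packet written out:

    // want: one bit per expected fragment, computed at reset time.
    let want = 0xffffffffffffffffu64.wrapping_shr(64 - 3); // = 0b111
    // got: the bit for the fragment that just arrived (fragment_no = 1 here).
    let got = 1u64.wrapping_shl(1); // = 0b010
    // The fragment is wanted when its bit falls inside the mask...
    assert!(want & got > 0);
    // ...and the packet is complete once the accumulated have bits equal want.
    let have = 0b101u64; // fragments 0 and 2 already received
    assert_eq!(have | got, want);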