From 2fa7d8a800358d230a325ac9d9b24dcaaf0f19d8 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Thu, 31 Mar 2022 14:54:24 +0200 Subject: [PATCH 1/2] Specialize the container type for antichains Antichains are currently unconditionally heap-allocated. For totally ordered elements, an antichain will container zero or one elements, but not more. This would allow us to avoid the heap allocation and instead store the element inline. To enable this, we add an associated type to `Timestamp` that defines a container storing an antichain of the type. The container implements the functionality to let the current `Antichain` type function without change. The obvious downside of this change is that we now include a type bound `T: Timestamp` on the `Antichain` type, because the elements are based on `T`'s associated type, which infects a bunch of other places. Signed-off-by: Moritz Hoffmann --- timely/src/order.rs | 2 + timely/src/progress/frontier.rs | 125 ++++++++++++------------- timely/src/progress/mod.rs | 2 +- timely/src/progress/timestamp.rs | 151 ++++++++++++++++++++++++++++++- 4 files changed, 208 insertions(+), 72 deletions(-) diff --git a/timely/src/order.rs b/timely/src/order.rs index 6132c5e89..51d903e9c 100644 --- a/timely/src/order.rs +++ b/timely/src/order.rs @@ -100,6 +100,7 @@ mod product { use crate::progress::Timestamp; impl Timestamp for Product { type Summary = Product; + type Container = Vec; fn minimum() -> Self { Self { outer: TOuter::minimum(), inner: TInner::minimum() }} } @@ -159,6 +160,7 @@ mod tuple { use crate::progress::Timestamp; impl Timestamp for (TOuter, TInner) { type Summary = (TOuter::Summary, TInner::Summary); + type Container = Vec; fn minimum() -> Self { (TOuter::minimum(), TInner::minimum()) } } diff --git a/timely/src/progress/frontier.rs b/timely/src/progress/frontier.rs index 53699dfb8..2717e9b59 100644 --- a/timely/src/progress/frontier.rs +++ b/timely/src/progress/frontier.rs @@ -1,6 +1,6 @@ //! Tracks minimal sets of mutually incomparable elements of a partial order. -use crate::progress::ChangeBatch; +use crate::progress::{ChangeBatch, Timestamp, TimestampContainer}; use crate::order::{PartialOrder, TotalOrder}; /// A set of mutually incomparable elements. @@ -14,11 +14,11 @@ use crate::order::{PartialOrder, TotalOrder}; /// This can make equality testing quadratic, though linear in the common case that the sequences /// are identical. #[derive(Debug, Default, Abomonation, Serialize, Deserialize)] -pub struct Antichain { - elements: Vec +pub struct Antichain { + elements: T::Container, } -impl Antichain { +impl Antichain { /// Updates the `Antichain` if the element is not greater than or equal to some present element. /// /// Returns true if element is added to the set @@ -33,14 +33,7 @@ impl Antichain { /// assert!(!frontier.insert(3)); ///``` pub fn insert(&mut self, element: T) -> bool { - if !self.elements.iter().any(|x| x.less_equal(&element)) { - self.elements.retain(|x| !element.less_equal(x)); - self.elements.push(element); - true - } - else { - false - } + self.elements.insert(element) } /// Reserves capacity for at least additional more elements to be inserted in the given `Antichain` @@ -61,11 +54,7 @@ impl Antichain { /// assert!(!frontier.extend(vec![3, 4])); ///``` pub fn extend>(&mut self, iterator: I) -> bool { - let mut added = false; - for element in iterator { - added = self.insert(element) || added; - } - added + self.elements.extend(iterator) } /// Returns true if any item in the antichain is strictly less than the argument. @@ -85,7 +74,7 @@ impl Antichain { ///``` #[inline] pub fn less_than(&self, time: &T) -> bool { - self.elements.iter().any(|x| x.less_than(time)) + self.elements.less_than(time) } /// Returns true if any item in the antichain is less than or equal to the argument. @@ -105,7 +94,7 @@ impl Antichain { ///``` #[inline] pub fn less_equal(&self, time: &T) -> bool { - self.elements.iter().any(|x| x.less_equal(time)) + self.elements.less_equal(time) } /// Returns true if every element of `other` is greater or equal to some element of `self`. @@ -116,7 +105,7 @@ impl Antichain { } } -impl std::iter::FromIterator for Antichain { +impl std::iter::FromIterator for Antichain { fn from_iter(iterator: I) -> Self where I: IntoIterator @@ -127,7 +116,7 @@ impl std::iter::FromIterator for Antichain { } } -impl Antichain { +impl Antichain { /// Creates a new empty `Antichain`. /// @@ -138,7 +127,7 @@ impl Antichain { /// /// let mut frontier = Antichain::::new(); ///``` - pub fn new() -> Antichain { Antichain { elements: Vec::new() } } + pub fn new() -> Antichain { Antichain { elements: Default::default() } } /// Creates a new empty `Antichain` with space for `capacity` elements. /// @@ -151,7 +140,7 @@ impl Antichain { ///``` pub fn with_capacity(capacity: usize) -> Self { Self { - elements: Vec::with_capacity(capacity), + elements: T::Container::with_capacity(capacity), } } @@ -164,7 +153,7 @@ impl Antichain { /// /// let mut frontier = Antichain::from_elem(2); ///``` - pub fn from_elem(element: T) -> Antichain { Antichain { elements: vec![element] } } + pub fn from_elem(element: T) -> Antichain { Antichain { elements: T::Container::from_element(element) } } /// Clears the contents of the antichain. /// @@ -196,7 +185,7 @@ impl Antichain { /// let mut frontier = Antichain::from_elem(2); /// assert_eq!(frontier.elements(), &[2]); ///``` - #[inline] pub fn elements(&self) -> &[T] { &self[..] } + #[inline] pub fn elements(&self) -> &[T] { self.elements.as_ref() } /// Reveals the elements in the antichain. /// @@ -208,9 +197,9 @@ impl Antichain { /// let mut frontier = Antichain::from_elem(2); /// assert_eq!(&*frontier.borrow(), &[2]); ///``` - #[inline] pub fn borrow(&self) -> AntichainRef { AntichainRef::new(&self.elements) }} + #[inline] pub fn borrow(&self) -> AntichainRef { AntichainRef::new(self.elements.as_ref()) }} -impl PartialEq for Antichain { +impl PartialEq for Antichain { fn eq(&self, other: &Self) -> bool { // Lengths should be the same, with the option for fast acceptance if identical. self.elements().len() == other.elements().len() && @@ -221,15 +210,15 @@ impl PartialEq for Antichain { } } -impl Eq for Antichain { } +impl Eq for Antichain { } -impl PartialOrder for Antichain { +impl PartialOrder for Antichain { fn less_equal(&self, other: &Self) -> bool { other.elements().iter().all(|t2| self.elements().iter().any(|t1| t1.less_equal(t2))) } } -impl Clone for Antichain { +impl Clone for Antichain { fn clone(&self) -> Self { Antichain { elements: self.elements.clone() } } @@ -238,24 +227,22 @@ impl Clone for Antichain { } } -impl TotalOrder for Antichain { } +impl TotalOrder for Antichain { } -impl Antichain { +impl Antichain { /// Convert to the at most one element the antichain contains. - pub fn into_option(mut self) -> Option { - debug_assert!(self.len() <= 1); - self.elements.pop() + pub fn into_option(self) -> Option { + self.elements.into_option() } /// Return a reference to the at most one element the antichain contains. pub fn as_option(&self) -> Option<&T> { - debug_assert!(self.len() <= 1); - self.elements.last() + self.elements.as_option() } } -impl std::hash::Hash for Antichain { +impl std::hash::Hash for Antichain { fn hash(&self, state: &mut H) { - let mut temp = self.elements.iter().collect::>(); + let mut temp = self.elements.as_ref().iter().collect::>(); temp.sort(); for element in temp { element.hash(state); @@ -263,7 +250,7 @@ impl std::hash::Hash for Antichain { } } -impl From> for Antichain { +impl From> for Antichain { fn from(vec: Vec) -> Self { // TODO: We could reuse `vec` with some care. let mut temp = Antichain::new(); @@ -272,22 +259,24 @@ impl From> for Antichain { } } -impl Into> for Antichain { +impl Into> for Antichain { fn into(self) -> Vec { - self.elements + self.elements.into_vec() } } -impl ::std::ops::Deref for Antichain { +impl ::std::ops::Deref for Antichain { type Target = [T]; fn deref(&self) -> &Self::Target { - &self.elements + self.elements.as_ref() } } -impl ::std::iter::IntoIterator for Antichain { +impl std::iter::IntoIterator for Antichain +where T::Container: std::iter::IntoIterator +{ type Item = T; - type IntoIter = ::std::vec::IntoIter; + type IntoIter = ::IntoIter; fn into_iter(self) -> Self::IntoIter { self.elements.into_iter() } @@ -315,7 +304,7 @@ pub struct MutableAntichain { changes: ChangeBatch, } -impl MutableAntichain { +impl MutableAntichain { /// Creates a new empty `MutableAntichain`. /// /// # Examples @@ -530,7 +519,7 @@ impl MutableAntichain { } } -impl Default for MutableAntichain { +impl Default for MutableAntichain { fn default() -> Self { Self::new() } @@ -556,20 +545,22 @@ pub trait MutableAntichainFilter { fn filter_through(self, antichain: &mut MutableAntichain) -> ::std::vec::Drain<(T,i64)>; } -impl> MutableAntichainFilter for I { +impl> MutableAntichainFilter for I { fn filter_through(self, antichain: &mut MutableAntichain) -> ::std::vec::Drain<(T,i64)> { antichain.update_iter(self.into_iter()) } } -impl From> for MutableAntichain { +impl From> for MutableAntichain +where T::Container: std::iter::IntoIterator +{ fn from(antichain: Antichain) -> Self { let mut result = MutableAntichain::new(); result.update_iter(antichain.into_iter().map(|time| (time, 1))); result } } -impl<'a, T: PartialOrder+Ord+Clone> From> for MutableAntichain { +impl<'a, T: Timestamp> From> for MutableAntichain { fn from(antichain: AntichainRef<'a, T>) -> Self { let mut result = MutableAntichain::new(); result.update_iter(antichain.into_iter().map(|time| (time.clone(), 1))); @@ -577,7 +568,7 @@ impl<'a, T: PartialOrder+Ord+Clone> From> for MutableAnticha } } -impl std::iter::FromIterator<(T, i64)> for MutableAntichain +impl std::iter::FromIterator<(T, i64)> for MutableAntichain where T: Clone + PartialOrder + Ord, { @@ -608,7 +599,7 @@ impl<'a, T: 'a> Clone for AntichainRef<'a, T> { impl<'a, T: 'a> Copy for AntichainRef<'a, T> { } -impl<'a, T: 'a> AntichainRef<'a, T> { +impl<'a, T: Timestamp + 'a> AntichainRef<'a, T> { /// Create a new `AntichainRef` from a reference to a slice of elements forming the frontier. /// /// This method does not check that this antichain has any particular properties, for example @@ -631,7 +622,7 @@ impl<'a, T: 'a> AntichainRef<'a, T> { ///``` pub fn to_owned(&self) -> Antichain where T: Clone { Antichain { - elements: self.frontier.to_vec() + elements: T::Container::from_iter(self.frontier.into_iter().cloned()) } } } @@ -720,31 +711,27 @@ impl<'a, T: 'a> ::std::iter::IntoIterator for &'a AntichainRef<'a, T> { #[cfg(test)] mod tests { use std::collections::HashSet; + use crate::order::Product; use super::*; - #[derive(PartialEq, Eq, PartialOrd, Ord, Hash)] - struct Elem(char, usize); - - impl PartialOrder for Elem { - fn less_equal(&self, other: &Self) -> bool { - self.0 <= other.0 && self.1 <= other.1 - } + fn elem(a: char, b: u8) -> Product { + Product::new(a as u8 - 'a' as u8, b) } #[test] fn antichain_hash() { let mut hashed = HashSet::new(); - hashed.insert(Antichain::from(vec![Elem('a', 2), Elem('b', 1)])); + hashed.insert(Antichain::from(vec![elem('a', 2), elem('b', 1)])); - assert!(hashed.contains(&Antichain::from(vec![Elem('a', 2), Elem('b', 1)]))); - assert!(hashed.contains(&Antichain::from(vec![Elem('b', 1), Elem('a', 2)]))); + assert!(hashed.contains(&Antichain::from(vec![elem('a', 2), elem('b', 1)]))); + assert!(hashed.contains(&Antichain::from(vec![elem('b', 1), elem('a', 2)]))); - assert!(!hashed.contains(&Antichain::from(vec![Elem('a', 2)]))); - assert!(!hashed.contains(&Antichain::from(vec![Elem('a', 1)]))); - assert!(!hashed.contains(&Antichain::from(vec![Elem('b', 2)]))); - assert!(!hashed.contains(&Antichain::from(vec![Elem('a', 1), Elem('b', 2)]))); - assert!(!hashed.contains(&Antichain::from(vec![Elem('c', 3)]))); + assert!(!hashed.contains(&Antichain::from(vec![elem('a', 2)]))); + assert!(!hashed.contains(&Antichain::from(vec![elem('a', 1)]))); + assert!(!hashed.contains(&Antichain::from(vec![elem('b', 2)]))); + assert!(!hashed.contains(&Antichain::from(vec![elem('a', 1), elem('b', 2)]))); + assert!(!hashed.contains(&Antichain::from(vec![elem('c', 3)]))); assert!(!hashed.contains(&Antichain::from(vec![]))); } diff --git a/timely/src/progress/mod.rs b/timely/src/progress/mod.rs index 5d169c75d..013b88c7c 100644 --- a/timely/src/progress/mod.rs +++ b/timely/src/progress/mod.rs @@ -2,7 +2,7 @@ pub use self::operate::Operate; pub use self::subgraph::{Subgraph, SubgraphBuilder}; -pub use self::timestamp::{Timestamp, PathSummary}; +pub use self::timestamp::{Timestamp, PathSummary, TimestampContainer}; pub use self::change_batch::ChangeBatch; pub use self::frontier::Antichain; diff --git a/timely/src/progress/timestamp.rs b/timely/src/progress/timestamp.rs index 9471d3237..88bfa0941 100644 --- a/timely/src/progress/timestamp.rs +++ b/timely/src/progress/timestamp.rs @@ -10,12 +10,157 @@ use crate::order::PartialOrder; /// A composite trait for types that serve as timestamps in timely dataflow. pub trait Timestamp: Clone+Eq+PartialOrder+Debug+Send+Any+Data+Hash+Ord { + /// A type storing timestamps. + type Container: TimestampContainer + Debug; /// A type summarizing action on a timestamp along a dataflow path. - type Summary : PathSummary + 'static; + type Summary : Timestamp + PathSummary + 'static; /// A minimum value suitable as a default. fn minimum() -> Self; } +/// A type to store timestamps. This serves as a means to avoid heap-allocating space for +/// totally ordered timestamps, where we know that a frontier is either empty or has exactly +/// one element. +pub trait TimestampContainer: Default + Clone { + /// Create this container with the specified capacity hint. + fn with_capacity(_capacity: usize) -> Self { Default::default()} + /// Create this container from the supplied element. + fn from_element(element: T) -> Self; + /// Create this container from an iterator of elements. + fn from_iter>(iterator: I) -> Self { + let iter = iterator.into_iter(); + let mut this = Self::with_capacity(iter.size_hint().0); + this.extend(iter); + this + } + /// Remove all elements from this container, potentially leaving allocations in place. + fn clear(&mut self); + /// Sort the elements of this container. + fn sort(&mut self) where T: Ord; + + /// Insert a single element into this container while maintaining the frontier invariants. + /// Returns whether the container changed because of the insert. + fn insert(&mut self, element: T) -> bool; + + /// Extend the container by individually inserting the elments. Returns whether the container + /// changed. + fn extend>(&mut self, iterator: I) -> bool { + let mut added = false; + for element in iterator { + added = self.insert(element) || added; + } + added + } + /// Reserve space for additional elements. + fn reserve(&mut self, _additional: usize) {} + /// Test if this container is less than the provided `time`. + fn less_than(&self, time: &T) -> bool; + /// Test if this container is less or equal the provided `time`. + fn less_equal(&self, time: &T) -> bool; + /// Represent this container as a reference to a slice. + fn as_ref(&self) -> &[T]; + /// Convert to the at most one element the antichain contains. + fn into_option(self) -> Option; + /// Return a reference to the at most one element the antichain contains. + fn as_option(&self) -> Option<&T>; + /// Convert this container into a vector. + fn into_vec(self) -> Vec; +} + +impl TimestampContainer for Option { + fn from_element(element: T) -> Self { Some(element) } + fn insert(&mut self, element: T) -> bool { + match self { + Some(e) if !e.less_equal(&element) => { + *self = Some(element); + true + } + None => { + *self = Some(element); + true + } + _ => false, + } + } + fn clear(&mut self) { + *self = None + } + fn sort(&mut self) where T: Ord { + } + + fn less_than(&self, time: &T) -> bool { + self.as_ref().map(|x| x.less_than(time)).unwrap_or(false) + } + + fn less_equal(&self, time: &T) -> bool { + self.as_ref().map(|x| x.less_equal(time)).unwrap_or(false) + } + + fn as_ref(&self) -> &[T] { + match self { + None => &[], + Some(element) => std::slice::from_ref(element), + } + } + fn into_option(self) -> Option { + self + } + fn as_option(&self) -> Option<&T> { + self.as_ref() + } + fn into_vec(self) -> Vec { + match self { + None => vec![], + Some(element) => vec![element], + } + } +} + +impl TimestampContainer for Vec { + fn from_element(element: T) -> Self { vec![element] } + fn with_capacity(capacity: usize) -> Self { + Vec::with_capacity(capacity) + } + fn insert(&mut self, element: T) -> bool { + if !self.iter().any(|x| x.less_equal(&element)) { + self.retain(|x| !element.less_equal(x)); + self.push(element); + true + } else { + false + } + } + fn clear(&mut self) { + self.clear() + } + fn sort(&mut self) where T: Ord { + <[T]>::sort(self) + } + + fn less_than(&self, time: &T) -> bool { + self.iter().any(|x| x.less_than(time)) + } + + fn less_equal(&self, time: &T) -> bool { + self.iter().any(|x| x.less_equal(time)) + } + fn as_ref(&self) -> &[T] { + &self + } + fn into_option(mut self) -> Option { + debug_assert!(self.len() <= 1); + self.pop() + } + fn as_option(&self) -> Option<&T> { + debug_assert!(self.len() <= 1); + self.last() + } + fn into_vec(self) -> Vec { + self + } +} + + /// A summary of how a timestamp advances along a timely dataflow path. pub trait PathSummary : Clone+'static+Eq+PartialOrder+Debug+Default { /// Advances a timestamp according to the timestamp actions on the path. @@ -61,7 +206,7 @@ pub trait PathSummary : Clone+'static+Eq+PartialOrder+Debug+Default { fn followed_by(&self, other: &Self) -> Option; } -impl Timestamp for () { type Summary = (); fn minimum() -> Self { () }} +impl Timestamp for () { type Summary = (); type Container = Option<()>; fn minimum() -> Self { () }} impl PathSummary<()> for () { #[inline] fn results_in(&self, _src: &()) -> Option<()> { Some(()) } #[inline] fn followed_by(&self, _other: &()) -> Option<()> { Some(()) } @@ -73,6 +218,7 @@ macro_rules! implement_timestamp_add { $( impl Timestamp for $index_type { type Summary = $index_type; + type Container = Option<$index_type>; fn minimum() -> Self { Self::min_value() } } impl PathSummary<$index_type> for $index_type { @@ -89,6 +235,7 @@ implement_timestamp_add!(usize, u128, u64, u32, u16, u8, isize, i128, i64, i32, impl Timestamp for ::std::time::Duration { type Summary = ::std::time::Duration; + type Container = Option<::std::time::Duration>; fn minimum() -> Self { ::std::time::Duration::new(0, 0) } } impl PathSummary<::std::time::Duration> for ::std::time::Duration { From 4ab758dbf3011832291214442c159577af3c32d1 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Wed, 8 Jun 2022 16:16:01 +0200 Subject: [PATCH 2/2] timestamp: Option container for totally ordered elements Signed-off-by: Moritz Hoffmann --- timely/src/progress/timestamp.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/timely/src/progress/timestamp.rs b/timely/src/progress/timestamp.rs index 88bfa0941..a98cf1742 100644 --- a/timely/src/progress/timestamp.rs +++ b/timely/src/progress/timestamp.rs @@ -6,7 +6,7 @@ use std::default::Default; use std::hash::Hash; use crate::communication::Data; -use crate::order::PartialOrder; +use crate::order::{PartialOrder, TotalOrder}; /// A composite trait for types that serve as timestamps in timely dataflow. pub trait Timestamp: Clone+Eq+PartialOrder+Debug+Send+Any+Data+Hash+Ord { @@ -67,7 +67,7 @@ pub trait TimestampContainer: Default + Clone { fn into_vec(self) -> Vec; } -impl TimestampContainer for Option { +impl TimestampContainer for Option { fn from_element(element: T) -> Self { Some(element) } fn insert(&mut self, element: T) -> bool { match self {