From fdcab18f3b2fb2b3b720c7b4676da2738102bac5 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 7 Feb 2026 10:12:23 -0500 Subject: [PATCH 1/6] Centralize VecContainer implementations --- differential-dataflow/src/collection.rs | 821 ++++++++++++------------ 1 file changed, 423 insertions(+), 398 deletions(-) diff --git a/differential-dataflow/src/collection.rs b/differential-dataflow/src/collection.rs index dba96a62b..6b8c33567 100644 --- a/differential-dataflow/src/collection.rs +++ b/differential-dataflow/src/collection.rs @@ -8,38 +8,14 @@ //! manually. The higher-level of programming allows differential dataflow to provide efficient //! implementations, and to support efficient incremental updates to the collections. -use std::hash::Hash; - use timely::{Container, Data}; use timely::progress::Timestamp; -use timely::order::Product; -use timely::dataflow::scopes::{Child, child::Iterative}; +use timely::dataflow::scopes::Child; use timely::dataflow::Scope; use timely::dataflow::operators::*; use timely::dataflow::StreamCore; -use crate::difference::{Semigroup, Abelian, Multiply}; -use crate::lattice::Lattice; -use crate::hashable::Hashable; - -/// An evolving collection of values of type `D`, backed by Rust `Vec` types as containers. -/// -/// The `Collection` type is the core abstraction in differential dataflow programs. As you write your -/// differential dataflow computation, you write as if the collection is a static dataset to which you -/// apply functional transformations, creating new collections. Once your computation is written, you -/// are able to mutate the collection (by inserting and removing elements); differential dataflow will -/// propagate changes through your functional computation and report the corresponding changes to the -/// output collections. -/// -/// Each vec collection has three generic parameters. The parameter `G` is for the scope in which the -/// collection exists; as you write more complicated programs you may wish to introduce nested scopes -/// (e.g. for iteration) and this parameter tracks the scope (for timely dataflow's benefit). The `D` -/// parameter is the type of data in your collection, for example `String`, or `(u32, Vec>)`. -/// The `R` parameter represents the types of changes that the data undergo, and is most commonly (and -/// defaults to) `isize`, representing changes to the occurrence count of each record. -/// -/// This type definition instantiates the [`Collection`] type with a `Vec<(D, G::Timestamp, R)>`. -pub type VecCollection = Collection::Timestamp, R)>>; +use crate::difference::Abelian; /// An evolving collection represented by a stream of abstract containers. /// @@ -285,317 +261,6 @@ impl Collection { } } -impl VecCollection { - /// Creates a new collection by applying the supplied function to each input element. - /// - /// # Examples - /// - /// ``` - /// use differential_dataflow::input::Input; - /// - /// ::timely::example(|scope| { - /// scope.new_collection_from(1 .. 10).1 - /// .map(|x| x * 2) - /// .filter(|x| x % 2 == 1) - /// .assert_empty(); - /// }); - /// ``` - pub fn map(&self, mut logic: L) -> VecCollection - where - D2: Data, - L: FnMut(D) -> D2 + 'static, - { - self.inner - .map(move |(data, time, delta)| (logic(data), time, delta)) - .as_collection() - } - /// Creates a new collection by applying the supplied function to each input element. - /// - /// Although the name suggests in-place mutation, this function does not change the source collection, - /// but rather re-uses the underlying allocations in its implementation. The method is semantically - /// equivalent to `map`, but can be more efficient. - /// - /// # Examples - /// - /// ``` - /// use differential_dataflow::input::Input; - /// - /// ::timely::example(|scope| { - /// scope.new_collection_from(1 .. 10).1 - /// .map_in_place(|x| *x *= 2) - /// .filter(|x| x % 2 == 1) - /// .assert_empty(); - /// }); - /// ``` - pub fn map_in_place(&self, mut logic: L) -> VecCollection - where - L: FnMut(&mut D) + 'static, - { - self.inner - .map_in_place(move |&mut (ref mut data, _, _)| logic(data)) - .as_collection() - } - /// Creates a new collection by applying the supplied function to each input element and accumulating the results. - /// - /// This method extracts an iterator from each input element, and extracts the full contents of the iterator. Be - /// warned that if the iterators produce substantial amounts of data, they are currently fully drained before - /// attempting to consolidate the results. - /// - /// # Examples - /// - /// ``` - /// use differential_dataflow::input::Input; - /// - /// ::timely::example(|scope| { - /// scope.new_collection_from(1 .. 10).1 - /// .flat_map(|x| 0 .. x); - /// }); - /// ``` - pub fn flat_map(&self, mut logic: L) -> VecCollection - where - G::Timestamp: Clone, - I: IntoIterator, - L: FnMut(D) -> I + 'static, - { - self.inner - .flat_map(move |(data, time, delta)| logic(data).into_iter().map(move |x| (x, time.clone(), delta.clone()))) - .as_collection() - } - /// Creates a new collection containing those input records satisfying the supplied predicate. - /// - /// # Examples - /// - /// ``` - /// use differential_dataflow::input::Input; - /// - /// ::timely::example(|scope| { - /// scope.new_collection_from(1 .. 10).1 - /// .map(|x| x * 2) - /// .filter(|x| x % 2 == 1) - /// .assert_empty(); - /// }); - /// ``` - pub fn filter(&self, mut logic: L) -> VecCollection - where - L: FnMut(&D) -> bool + 'static, - { - self.inner - .filter(move |(data, _, _)| logic(data)) - .as_collection() - } - /// Replaces each record with another, with a new difference type. - /// - /// This method is most commonly used to take records containing aggregatable data (e.g. numbers to be summed) - /// and move the data into the difference component. This will allow differential dataflow to update in-place. - /// - /// # Examples - /// - /// ``` - /// use differential_dataflow::input::Input; - /// - /// ::timely::example(|scope| { - /// - /// let nums = scope.new_collection_from(0 .. 10).1; - /// let x1 = nums.flat_map(|x| 0 .. x); - /// let x2 = nums.map(|x| (x, 9 - x)) - /// .explode(|(x,y)| Some((x,y))); - /// - /// x1.assert_eq(&x2); - /// }); - /// ``` - pub fn explode(&self, mut logic: L) -> VecCollection>::Output> - where - D2: Data, - R2: Semigroup+Multiply, - I: IntoIterator, - L: FnMut(D)->I+'static, - { - self.inner - .flat_map(move |(x, t, d)| logic(x).into_iter().map(move |(x,d2)| (x, t.clone(), d2.multiply(&d)))) - .as_collection() - } - - /// Joins each record against a collection defined by the function `logic`. - /// - /// This method performs what is essentially a join with the collection of records `(x, logic(x))`. - /// Rather than materialize this second relation, `logic` is applied to each record and the appropriate - /// modifications made to the results, namely joining timestamps and multiplying differences. - /// - /// #Examples - /// - /// ``` - /// use differential_dataflow::input::Input; - /// - /// ::timely::example(|scope| { - /// // creates `x` copies of `2*x` from time `3*x` until `4*x`, - /// // for x from 0 through 9. - /// scope.new_collection_from(0 .. 10isize).1 - /// .join_function(|x| - /// // data time diff - /// vec![(2*x, (3*x) as u64, x), - /// (2*x, (4*x) as u64, -x)] - /// ); - /// }); - /// ``` - pub fn join_function(&self, mut logic: L) -> VecCollection>::Output> - where - G::Timestamp: Lattice, - D2: Data, - R2: Semigroup+Multiply, - I: IntoIterator, - L: FnMut(D)->I+'static, - { - self.inner - .flat_map(move |(x, t, d)| logic(x).into_iter().map(move |(x,t2,d2)| (x, t.join(&t2), d2.multiply(&d)))) - .as_collection() - } - - /// Brings a Collection into a nested scope, at varying times. - /// - /// The `initial` function indicates the time at which each element of the Collection should appear. - /// - /// # Examples - /// - /// ``` - /// use timely::dataflow::Scope; - /// use differential_dataflow::input::Input; - /// - /// ::timely::example(|scope| { - /// - /// let data = scope.new_collection_from(1 .. 10).1; - /// - /// let result = scope.iterative::(|child| { - /// data.enter_at(child, |x| *x) - /// .leave() - /// }); - /// - /// data.assert_eq(&result); - /// }); - /// ``` - pub fn enter_at<'a, T, F>(&self, child: &Iterative<'a, G, T>, mut initial: F) -> VecCollection, D, R> - where - T: Timestamp+Hash, - F: FnMut(&D) -> T + Clone + 'static, - G::Timestamp: Hash, - { - self.inner - .enter(child) - .map(move |(data, time, diff)| { - let new_time = Product::new(time, initial(&data)); - (data, new_time, diff) - }) - .as_collection() - } - - /// Delays each difference by a supplied function. - /// - /// It is assumed that `func` only advances timestamps; this is not verified, and things may go horribly - /// wrong if that assumption is incorrect. It is also critical that `func` be monotonic: if two times are - /// ordered, they should have the same order or compare equal once `func` is applied to them (this - /// is because we advance the timely capability with the same logic, and it must remain `less_equal` - /// to all of the data timestamps). - pub fn delay(&self, func: F) -> VecCollection - where - G::Timestamp: Hash, - F: FnMut(&G::Timestamp) -> G::Timestamp + Clone + 'static, - { - let mut func1 = func.clone(); - let mut func2 = func.clone(); - - self.inner - .delay_batch(move |x| func1(x)) - .map_in_place(move |x| x.1 = func2(&x.1)) - .as_collection() - } - - /// Applies a supplied function to each update. - /// - /// This method is most commonly used to report information back to the user, often for debugging purposes. - /// Any function can be used here, but be warned that the incremental nature of differential dataflow does - /// not guarantee that it will be called as many times as you might expect. - /// - /// The `(data, time, diff)` triples indicate a change `diff` to the frequency of `data` which takes effect - /// at the logical time `time`. When times are totally ordered (for example, `usize`), these updates reflect - /// the changes along the sequence of collections. For partially ordered times, the mathematics are more - /// interesting and less intuitive, unfortunately. - /// - /// # Examples - /// - /// ``` - /// use differential_dataflow::input::Input; - /// - /// ::timely::example(|scope| { - /// scope.new_collection_from(1 .. 10).1 - /// .map_in_place(|x| *x *= 2) - /// .filter(|x| x % 2 == 1) - /// .inspect(|x| println!("error: {:?}", x)); - /// }); - /// ``` - pub fn inspect(&self, func: F) -> VecCollection - where - F: FnMut(&(D, G::Timestamp, R))+'static, - { - self.inner - .inspect(func) - .as_collection() - } - /// Applies a supplied function to each batch of updates. - /// - /// This method is analogous to `inspect`, but operates on batches and reveals the timestamp of the - /// timely dataflow capability associated with the batch of updates. The observed batching depends - /// on how the system executes, and may vary run to run. - /// - /// # Examples - /// - /// ``` - /// use differential_dataflow::input::Input; - /// - /// ::timely::example(|scope| { - /// scope.new_collection_from(1 .. 10).1 - /// .map_in_place(|x| *x *= 2) - /// .filter(|x| x % 2 == 1) - /// .inspect_batch(|t,xs| println!("errors @ {:?}: {:?}", t, xs)); - /// }); - /// ``` - pub fn inspect_batch(&self, mut func: F) -> VecCollection - where - F: FnMut(&G::Timestamp, &[(D, G::Timestamp, R)])+'static, - { - self.inner - .inspect_batch(move |time, data| func(time, data)) - .as_collection() - } - - /// Assert if the collection is ever non-empty. - /// - /// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation - /// is not run, or not run to completion, there may be un-exercised times at which the collection could be - /// non-empty. Typically, a timely dataflow computation runs to completion on drop, and so clean exit from a - /// program should indicate that this assertion never found cause to complain. - /// - /// # Examples - /// - /// ``` - /// use differential_dataflow::input::Input; - /// - /// ::timely::example(|scope| { - /// scope.new_collection_from(1 .. 10).1 - /// .map(|x| x * 2) - /// .filter(|x| x % 2 == 1) - /// .assert_empty(); - /// }); - /// ``` - pub fn assert_empty(&self) - where - D: crate::ExchangeData+Hashable, - R: crate::ExchangeData+Hashable + Semigroup, - G::Timestamp: Lattice+Ord, - { - self.consolidate() - .inspect(|x| panic!("Assertion failed: non-empty collection: {:?}", x)); - } -} - use timely::dataflow::scopes::ScopeParent; use timely::progress::timestamp::Refines; @@ -650,41 +315,392 @@ impl Collection, C> } } -/// Methods requiring an Abelian difference, to support negation. -impl, D: Clone+'static, R: Abelian+'static> VecCollection { - /// Assert if the collections are ever different. - /// - /// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation - /// is not run, or not run to completion, there may be un-exercised times at which the collections could vary. - /// Typically, a timely dataflow computation runs to completion on drop, and so clean exit from a program should - /// indicate that this assertion never found cause to complain. - /// - /// # Examples - /// - /// ``` - /// use differential_dataflow::input::Input; - /// - /// ::timely::example(|scope| { - /// - /// let data = scope.new_collection_from(1 .. 10).1; - /// - /// let odds = data.filter(|x| x % 2 == 1); - /// let evens = data.filter(|x| x % 2 == 0); - /// - /// odds.concat(&evens) - /// .assert_eq(&data); - /// }); - /// ``` - pub fn assert_eq(&self, other: &Self) - where - D: crate::ExchangeData+Hashable, - R: crate::ExchangeData+Hashable, - G::Timestamp: Lattice+Ord, - { - self.negate() - .concat(other) - .assert_empty(); +pub use vec::Collection as VecCollection; +/// Specializations of `Collection` that use `Vec` as the container. +pub mod vec { + + use std::hash::Hash; + + use timely::Data; + use timely::progress::Timestamp; + use timely::order::Product; + use timely::dataflow::scopes::child::Iterative; + use timely::dataflow::{Scope, ScopeParent}; + use timely::dataflow::operators::*; + + use crate::collection::AsCollection; + use crate::difference::{Semigroup, Abelian, Multiply}; + use crate::lattice::Lattice; + use crate::hashable::Hashable; + + /// An evolving collection of values of type `D`, backed by Rust `Vec` types as containers. + /// + /// The `Collection` type is the core abstraction in differential dataflow programs. As you write your + /// differential dataflow computation, you write as if the collection is a static dataset to which you + /// apply functional transformations, creating new collections. Once your computation is written, you + /// are able to mutate the collection (by inserting and removing elements); differential dataflow will + /// propagate changes through your functional computation and report the corresponding changes to the + /// output collections. + /// + /// Each vec collection has three generic parameters. The parameter `G` is for the scope in which the + /// collection exists; as you write more complicated programs you may wish to introduce nested scopes + /// (e.g. for iteration) and this parameter tracks the scope (for timely dataflow's benefit). The `D` + /// parameter is the type of data in your collection, for example `String`, or `(u32, Vec>)`. + /// The `R` parameter represents the types of changes that the data undergo, and is most commonly (and + /// defaults to) `isize`, representing changes to the occurrence count of each record. + /// + /// This type definition instantiates the [`Collection`] type with a `Vec<(D, G::Timestamp, R)>`. + pub type Collection = super::Collection::Timestamp, R)>>; + + + impl Collection { + /// Creates a new collection by applying the supplied function to each input element. + /// + /// # Examples + /// + /// ``` + /// use differential_dataflow::input::Input; + /// + /// ::timely::example(|scope| { + /// scope.new_collection_from(1 .. 10).1 + /// .map(|x| x * 2) + /// .filter(|x| x % 2 == 1) + /// .assert_empty(); + /// }); + /// ``` + pub fn map(&self, mut logic: L) -> Collection + where + D2: Data, + L: FnMut(D) -> D2 + 'static, + { + self.inner + .map(move |(data, time, delta)| (logic(data), time, delta)) + .as_collection() + } + /// Creates a new collection by applying the supplied function to each input element. + /// + /// Although the name suggests in-place mutation, this function does not change the source collection, + /// but rather re-uses the underlying allocations in its implementation. The method is semantically + /// equivalent to `map`, but can be more efficient. + /// + /// # Examples + /// + /// ``` + /// use differential_dataflow::input::Input; + /// + /// ::timely::example(|scope| { + /// scope.new_collection_from(1 .. 10).1 + /// .map_in_place(|x| *x *= 2) + /// .filter(|x| x % 2 == 1) + /// .assert_empty(); + /// }); + /// ``` + pub fn map_in_place(&self, mut logic: L) -> Collection + where + L: FnMut(&mut D) + 'static, + { + self.inner + .map_in_place(move |&mut (ref mut data, _, _)| logic(data)) + .as_collection() + } + /// Creates a new collection by applying the supplied function to each input element and accumulating the results. + /// + /// This method extracts an iterator from each input element, and extracts the full contents of the iterator. Be + /// warned that if the iterators produce substantial amounts of data, they are currently fully drained before + /// attempting to consolidate the results. + /// + /// # Examples + /// + /// ``` + /// use differential_dataflow::input::Input; + /// + /// ::timely::example(|scope| { + /// scope.new_collection_from(1 .. 10).1 + /// .flat_map(|x| 0 .. x); + /// }); + /// ``` + pub fn flat_map(&self, mut logic: L) -> Collection + where + G::Timestamp: Clone, + I: IntoIterator, + L: FnMut(D) -> I + 'static, + { + self.inner + .flat_map(move |(data, time, delta)| logic(data).into_iter().map(move |x| (x, time.clone(), delta.clone()))) + .as_collection() + } + /// Creates a new collection containing those input records satisfying the supplied predicate. + /// + /// # Examples + /// + /// ``` + /// use differential_dataflow::input::Input; + /// + /// ::timely::example(|scope| { + /// scope.new_collection_from(1 .. 10).1 + /// .map(|x| x * 2) + /// .filter(|x| x % 2 == 1) + /// .assert_empty(); + /// }); + /// ``` + pub fn filter(&self, mut logic: L) -> Collection + where + L: FnMut(&D) -> bool + 'static, + { + self.inner + .filter(move |(data, _, _)| logic(data)) + .as_collection() + } + /// Replaces each record with another, with a new difference type. + /// + /// This method is most commonly used to take records containing aggregatable data (e.g. numbers to be summed) + /// and move the data into the difference component. This will allow differential dataflow to update in-place. + /// + /// # Examples + /// + /// ``` + /// use differential_dataflow::input::Input; + /// + /// ::timely::example(|scope| { + /// + /// let nums = scope.new_collection_from(0 .. 10).1; + /// let x1 = nums.flat_map(|x| 0 .. x); + /// let x2 = nums.map(|x| (x, 9 - x)) + /// .explode(|(x,y)| Some((x,y))); + /// + /// x1.assert_eq(&x2); + /// }); + /// ``` + pub fn explode(&self, mut logic: L) -> Collection>::Output> + where + D2: Data, + R2: Semigroup+Multiply, + I: IntoIterator, + L: FnMut(D)->I+'static, + { + self.inner + .flat_map(move |(x, t, d)| logic(x).into_iter().map(move |(x,d2)| (x, t.clone(), d2.multiply(&d)))) + .as_collection() + } + + /// Joins each record against a collection defined by the function `logic`. + /// + /// This method performs what is essentially a join with the collection of records `(x, logic(x))`. + /// Rather than materialize this second relation, `logic` is applied to each record and the appropriate + /// modifications made to the results, namely joining timestamps and multiplying differences. + /// + /// #Examples + /// + /// ``` + /// use differential_dataflow::input::Input; + /// + /// ::timely::example(|scope| { + /// // creates `x` copies of `2*x` from time `3*x` until `4*x`, + /// // for x from 0 through 9. + /// scope.new_collection_from(0 .. 10isize).1 + /// .join_function(|x| + /// // data time diff + /// vec![(2*x, (3*x) as u64, x), + /// (2*x, (4*x) as u64, -x)] + /// ); + /// }); + /// ``` + pub fn join_function(&self, mut logic: L) -> Collection>::Output> + where + G::Timestamp: Lattice, + D2: Data, + R2: Semigroup+Multiply, + I: IntoIterator, + L: FnMut(D)->I+'static, + { + self.inner + .flat_map(move |(x, t, d)| logic(x).into_iter().map(move |(x,t2,d2)| (x, t.join(&t2), d2.multiply(&d)))) + .as_collection() + } + + /// Brings a Collection into a nested scope, at varying times. + /// + /// The `initial` function indicates the time at which each element of the Collection should appear. + /// + /// # Examples + /// + /// ``` + /// use timely::dataflow::Scope; + /// use differential_dataflow::input::Input; + /// + /// ::timely::example(|scope| { + /// + /// let data = scope.new_collection_from(1 .. 10).1; + /// + /// let result = scope.iterative::(|child| { + /// data.enter_at(child, |x| *x) + /// .leave() + /// }); + /// + /// data.assert_eq(&result); + /// }); + /// ``` + pub fn enter_at<'a, T, F>(&self, child: &Iterative<'a, G, T>, mut initial: F) -> Collection, D, R> + where + T: Timestamp+Hash, + F: FnMut(&D) -> T + Clone + 'static, + G::Timestamp: Hash, + { + self.inner + .enter(child) + .map(move |(data, time, diff)| { + let new_time = Product::new(time, initial(&data)); + (data, new_time, diff) + }) + .as_collection() + } + + /// Delays each difference by a supplied function. + /// + /// It is assumed that `func` only advances timestamps; this is not verified, and things may go horribly + /// wrong if that assumption is incorrect. It is also critical that `func` be monotonic: if two times are + /// ordered, they should have the same order or compare equal once `func` is applied to them (this + /// is because we advance the timely capability with the same logic, and it must remain `less_equal` + /// to all of the data timestamps). + pub fn delay(&self, func: F) -> Collection + where + G::Timestamp: Hash, + F: FnMut(&G::Timestamp) -> G::Timestamp + Clone + 'static, + { + let mut func1 = func.clone(); + let mut func2 = func.clone(); + + self.inner + .delay_batch(move |x| func1(x)) + .map_in_place(move |x| x.1 = func2(&x.1)) + .as_collection() + } + + /// Applies a supplied function to each update. + /// + /// This method is most commonly used to report information back to the user, often for debugging purposes. + /// Any function can be used here, but be warned that the incremental nature of differential dataflow does + /// not guarantee that it will be called as many times as you might expect. + /// + /// The `(data, time, diff)` triples indicate a change `diff` to the frequency of `data` which takes effect + /// at the logical time `time`. When times are totally ordered (for example, `usize`), these updates reflect + /// the changes along the sequence of collections. For partially ordered times, the mathematics are more + /// interesting and less intuitive, unfortunately. + /// + /// # Examples + /// + /// ``` + /// use differential_dataflow::input::Input; + /// + /// ::timely::example(|scope| { + /// scope.new_collection_from(1 .. 10).1 + /// .map_in_place(|x| *x *= 2) + /// .filter(|x| x % 2 == 1) + /// .inspect(|x| println!("error: {:?}", x)); + /// }); + /// ``` + pub fn inspect(&self, func: F) -> Collection + where + F: FnMut(&(D, G::Timestamp, R))+'static, + { + self.inner + .inspect(func) + .as_collection() + } + /// Applies a supplied function to each batch of updates. + /// + /// This method is analogous to `inspect`, but operates on batches and reveals the timestamp of the + /// timely dataflow capability associated with the batch of updates. The observed batching depends + /// on how the system executes, and may vary run to run. + /// + /// # Examples + /// + /// ``` + /// use differential_dataflow::input::Input; + /// + /// ::timely::example(|scope| { + /// scope.new_collection_from(1 .. 10).1 + /// .map_in_place(|x| *x *= 2) + /// .filter(|x| x % 2 == 1) + /// .inspect_batch(|t,xs| println!("errors @ {:?}: {:?}", t, xs)); + /// }); + /// ``` + pub fn inspect_batch(&self, mut func: F) -> Collection + where + F: FnMut(&G::Timestamp, &[(D, G::Timestamp, R)])+'static, + { + self.inner + .inspect_batch(move |time, data| func(time, data)) + .as_collection() + } + + /// Assert if the collection is ever non-empty. + /// + /// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation + /// is not run, or not run to completion, there may be un-exercised times at which the collection could be + /// non-empty. Typically, a timely dataflow computation runs to completion on drop, and so clean exit from a + /// program should indicate that this assertion never found cause to complain. + /// + /// # Examples + /// + /// ``` + /// use differential_dataflow::input::Input; + /// + /// ::timely::example(|scope| { + /// scope.new_collection_from(1 .. 10).1 + /// .map(|x| x * 2) + /// .filter(|x| x % 2 == 1) + /// .assert_empty(); + /// }); + /// ``` + pub fn assert_empty(&self) + where + D: crate::ExchangeData+Hashable, + R: crate::ExchangeData+Hashable + Semigroup, + G::Timestamp: Lattice+Ord, + { + self.consolidate() + .inspect(|x| panic!("Assertion failed: non-empty collection: {:?}", x)); + } + } + + /// Methods requiring an Abelian difference, to support negation. + impl, D: Clone+'static, R: Abelian+'static> Collection { + /// Assert if the collections are ever different. + /// + /// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation + /// is not run, or not run to completion, there may be un-exercised times at which the collections could vary. + /// Typically, a timely dataflow computation runs to completion on drop, and so clean exit from a program should + /// indicate that this assertion never found cause to complain. + /// + /// # Examples + /// + /// ``` + /// use differential_dataflow::input::Input; + /// + /// ::timely::example(|scope| { + /// + /// let data = scope.new_collection_from(1 .. 10).1; + /// + /// let odds = data.filter(|x| x % 2 == 1); + /// let evens = data.filter(|x| x % 2 == 0); + /// + /// odds.concat(&evens) + /// .assert_eq(&data); + /// }); + /// ``` + pub fn assert_eq(&self, other: &Self) + where + D: crate::ExchangeData+Hashable, + R: crate::ExchangeData+Hashable, + G::Timestamp: Lattice+Ord, + { + self.negate() + .concat(other) + .assert_empty(); + } } + } /// Conversion to a differential dataflow Collection. @@ -738,22 +754,11 @@ where /// Traits that can be implemented by containers to provide functionality to collections based on them. pub mod containers { - use timely::progress::{Timestamp, timestamp::Refines}; - use crate::collection::Abelian; - /// A container that can negate its updates. pub trait Negate { /// Negates Abelian differences of each update. fn negate(self) -> Self; } - impl Negate for Vec<(D, T, R)> { - fn negate(mut self) -> Self { - for (_data, _time, diff) in self.iter_mut() { - diff.negate(); - } - self - } - } /// A container that can enter from timestamp `T1` into timestamp `T2`. pub trait Enter { @@ -762,12 +767,6 @@ pub mod containers { /// Update timestamps from `T1` to `T2`. fn enter(self) -> Self::InnerContainer; } - impl, R> Enter for Vec<(D, T1, R)> { - type InnerContainer = Vec<(D, T2, R)>; - fn enter(self) -> Self::InnerContainer { - self.into_iter().map(|(d,t1,r)| (d,T2::to_inner(t1),r)).collect() - } - } /// A container that can leave from timestamp `T1` into timestamp `T2`. pub trait Leave { @@ -776,22 +775,48 @@ pub mod containers { /// Update timestamps from `T1` to `T2`. fn leave(self) -> Self::OuterContainer; } - impl, T2: Timestamp, R> Leave for Vec<(D, T1, R)> { - type OuterContainer = Vec<(D, T2, R)>; - fn leave(self) -> Self::OuterContainer { - self.into_iter().map(|(d,t1,r)| (d,t1.to_outer(),r)).collect() - } - } /// A container that can advance timestamps by a summary `TS`. pub trait ResultsIn { /// Advance times in the container by `step`. fn results_in(self, step: &TS) -> Self; } - impl ResultsIn for Vec<(D, T, R)> { - fn results_in(self, step: &T::Summary) -> Self { - use timely::progress::PathSummary; - self.into_iter().filter_map(move |(d,t,r)| step.results_in(&t).map(|t| (d,t,r))).collect() + + + /// Implementations of container traits for the `Vec` container. + mod vec { + + use timely::progress::{Timestamp, timestamp::Refines}; + use crate::collection::Abelian; + + use super::{Negate, Enter, Leave, ResultsIn}; + + impl Negate for Vec<(D, T, R)> { + fn negate(mut self) -> Self { + for (_data, _time, diff) in self.iter_mut() { diff.negate(); } + self + } + } + + impl, R> Enter for Vec<(D, T1, R)> { + type InnerContainer = Vec<(D, T2, R)>; + fn enter(self) -> Self::InnerContainer { + self.into_iter().map(|(d,t1,r)| (d,T2::to_inner(t1),r)).collect() + } + } + + impl, T2: Timestamp, R> Leave for Vec<(D, T1, R)> { + type OuterContainer = Vec<(D, T2, R)>; + fn leave(self) -> Self::OuterContainer { + self.into_iter().map(|(d,t1,r)| (d,t1.to_outer(),r)).collect() + } + } + + impl ResultsIn for Vec<(D, T, R)> { + fn results_in(self, step: &T::Summary) -> Self { + use timely::progress::PathSummary; + self.into_iter().filter_map(move |(d,t,r)| step.results_in(&t).map(|t| (d,t,r))).collect() + } } } } From c48aa2cd6b4c8783ad5ab9f9c2e332e6afc5438a Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 7 Feb 2026 11:38:31 -0500 Subject: [PATCH 2/6] Move Vec::Reduce methods to inherent methods --- differential-dataflow/examples/arrange.rs | 1 - .../src/algorithms/identifiers.rs | 2 +- differential-dataflow/src/collection.rs | 53 +++++++++++++++ .../src/operators/arrange/agent.rs | 7 +- differential-dataflow/src/operators/mod.rs | 2 +- differential-dataflow/src/operators/reduce.rs | 66 ------------------- differential-dataflow/tests/import.rs | 13 +++- differential-dataflow/tests/reduce.rs | 2 +- 8 files changed, 69 insertions(+), 77 deletions(-) diff --git a/differential-dataflow/examples/arrange.rs b/differential-dataflow/examples/arrange.rs index afe412440..a49136e2d 100644 --- a/differential-dataflow/examples/arrange.rs +++ b/differential-dataflow/examples/arrange.rs @@ -7,7 +7,6 @@ use timely::scheduling::Scheduler; use differential_dataflow::input::Input; use differential_dataflow::AsCollection; use differential_dataflow::operators::arrange::ArrangeByKey; -use differential_dataflow::operators::reduce::Reduce; use differential_dataflow::operators::join::JoinCore; use differential_dataflow::operators::Iterate; diff --git a/differential-dataflow/src/algorithms/identifiers.rs b/differential-dataflow/src/algorithms/identifiers.rs index 88ab1fcf7..736a9a170 100644 --- a/differential-dataflow/src/algorithms/identifiers.rs +++ b/differential-dataflow/src/algorithms/identifiers.rs @@ -98,7 +98,7 @@ mod tests { // there are collisions, everyone gets a unique identifier. use crate::input::Input; - use crate::operators::{Threshold, Reduce}; + use crate::operators::Threshold; use crate::operators::iterate::Iterate; ::timely::example(|scope| { diff --git a/differential-dataflow/src/collection.rs b/differential-dataflow/src/collection.rs index 6b8c33567..493f47e3e 100644 --- a/differential-dataflow/src/collection.rs +++ b/differential-dataflow/src/collection.rs @@ -701,6 +701,59 @@ pub mod vec { } } + + + impl Collection + where + G: Scope, + K: crate::ExchangeData+Hashable, + V: crate::ExchangeData, + R: crate::ExchangeData+Semigroup, + { + /// Applies a reduction function on records grouped by key. + /// + /// Input data must be structured as `(key, val)` pairs. + /// The user-supplied reduction function takes as arguments + /// + /// 1. a reference to the key, + /// 2. a reference to the slice of values and their accumulated updates, + /// 3. a mutuable reference to a vector to populate with output values and accumulated updates. + /// + /// The user logic is only invoked for non-empty input collections, and it is safe to assume that the + /// slice of input values is non-empty. The values are presented in sorted order, as defined by their + /// `Ord` implementations. + /// + /// # Examples + /// + /// ``` + /// use differential_dataflow::input::Input; + /// + /// ::timely::example(|scope| { + /// // report the smallest value for each group + /// scope.new_collection_from(1 .. 10).1 + /// .map(|x| (x / 3, x)) + /// .reduce(|_key, input, output| { + /// output.push((*input[0].0, 1)) + /// }); + /// }); + /// ``` + pub fn reduce(&self, logic: L) -> Collection + where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static { + self.reduce_named("Reduce", logic) + } + + /// As `reduce` with the ability to name the operator. + pub fn reduce_named(&self, name: &str, logic: L) -> Collection + where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static { + use crate::operators::arrange::arrangement::ArrangeByKey; + use crate::trace::implementations::{ValBuilder, ValSpine}; + + self.arrange_by_key_named(&format!("Arrange: {}", name)) + .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine>(name, logic) + .as_collection(|k,v| (k.clone(), v.clone())) + } + } + } /// Conversion to a differential dataflow Collection. diff --git a/differential-dataflow/src/operators/arrange/agent.rs b/differential-dataflow/src/operators/arrange/agent.rs index b4d82952e..d40ccd4bf 100644 --- a/differential-dataflow/src/operators/arrange/agent.rs +++ b/differential-dataflow/src/operators/arrange/agent.rs @@ -186,8 +186,8 @@ impl TraceAgent { /// use timely::Config; /// use differential_dataflow::input::Input; /// use differential_dataflow::operators::arrange::ArrangeBySelf; - /// use differential_dataflow::operators::reduce::Reduce; /// use differential_dataflow::trace::Trace; + /// use differential_dataflow::trace::implementations::{ValBuilder, ValSpine}; /// /// ::timely::execute(Config::thread(), |worker| { /// @@ -206,7 +206,8 @@ impl TraceAgent { /// // create a second dataflow /// worker.dataflow(move |scope| { /// trace.import(scope) - /// .reduce(move |_key, src, dst| dst.push((*src[0].0, 1))); + /// .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>("Reduce", |_key, src, dst| dst.push((*src[0].0, 1))) + /// .as_collection(|k,v| (k.clone(), v.clone())); /// }); /// /// }).unwrap(); @@ -237,7 +238,6 @@ impl TraceAgent { /// use timely::dataflow::operators::Probe; /// use differential_dataflow::input::InputSession; /// use differential_dataflow::operators::arrange::ArrangeBySelf; - /// use differential_dataflow::operators::reduce::Reduce; /// use differential_dataflow::trace::Trace; /// /// ::timely::execute(Config::thread(), |worker| { @@ -342,7 +342,6 @@ impl TraceAgent { /// use timely::dataflow::operators::Inspect; /// use differential_dataflow::input::InputSession; /// use differential_dataflow::operators::arrange::ArrangeBySelf; - /// use differential_dataflow::operators::reduce::Reduce; /// use differential_dataflow::trace::Trace; /// use differential_dataflow::trace::TraceReader; /// use differential_dataflow::input::Input; diff --git a/differential-dataflow/src/operators/mod.rs b/differential-dataflow/src/operators/mod.rs index bba1ca605..5f4929d3d 100644 --- a/differential-dataflow/src/operators/mod.rs +++ b/differential-dataflow/src/operators/mod.rs @@ -4,7 +4,7 @@ //! operators have specialized implementations to make them work efficiently, and are in addition //! to several operations defined directly on the `Collection` type (e.g. `map` and `filter`). -pub use self::reduce::{Reduce, Threshold, Count}; +pub use self::reduce::{Threshold, Count}; pub use self::iterate::Iterate; pub use self::join::{Join, JoinCore}; pub use self::count::CountTotal; diff --git a/differential-dataflow/src/operators/reduce.rs b/differential-dataflow/src/operators/reduce.rs index 59e49c08a..6f8f14cf6 100644 --- a/differential-dataflow/src/operators/reduce.rs +++ b/differential-dataflow/src/operators/reduce.rs @@ -27,72 +27,6 @@ use crate::trace::implementations::containers::BatchContainer; use crate::trace::implementations::merge_batcher::container::MergerChunk; use crate::trace::TraceReader; -/// Extension trait for the `reduce` differential dataflow method. -pub trait Reduce, K: Data, V: Data, R: Semigroup> { - /// Applies a reduction function on records grouped by key. - /// - /// Input data must be structured as `(key, val)` pairs. - /// The user-supplied reduction function takes as arguments - /// - /// 1. a reference to the key, - /// 2. a reference to the slice of values and their accumulated updates, - /// 3. a mutuable reference to a vector to populate with output values and accumulated updates. - /// - /// The user logic is only invoked for non-empty input collections, and it is safe to assume that the - /// slice of input values is non-empty. The values are presented in sorted order, as defined by their - /// `Ord` implementations. - /// - /// # Examples - /// - /// ``` - /// use differential_dataflow::input::Input; - /// use differential_dataflow::operators::Reduce; - /// - /// ::timely::example(|scope| { - /// // report the smallest value for each group - /// scope.new_collection_from(1 .. 10).1 - /// .map(|x| (x / 3, x)) - /// .reduce(|_key, input, output| { - /// output.push((*input[0].0, 1)) - /// }); - /// }); - /// ``` - fn reduce(&self, logic: L) -> VecCollection - where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static { - self.reduce_named("Reduce", logic) - } - - /// As `reduce` with the ability to name the operator. - fn reduce_named(&self, name: &str, logic: L) -> VecCollection - where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static; -} - -impl Reduce for VecCollection - where - G: Scope, - K: ExchangeData+Hashable, - V: ExchangeData, - R: ExchangeData+Semigroup, - { - fn reduce_named(&self, name: &str, logic: L) -> VecCollection - where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static { - self.arrange_by_key_named(&format!("Arrange: {}", name)) - .reduce_named(name, logic) - } -} - -impl Reduce for Arranged -where - G: Scope, - T1: for<'a> TraceReader=&'a K, KeyOwn = K, Val<'a>=&'a V, Diff=R>+Clone+'static, -{ - fn reduce_named(&self, name: &str, logic: L) -> VecCollection - where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static { - self.reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine>(name, logic) - .as_collection(|k,v| (k.clone(), v.clone())) - } -} - /// Extension trait for the `threshold` and `distinct` differential dataflow methods. pub trait Threshold, K: Data, R1: Semigroup> { /// Transforms the multiplicity of records. diff --git a/differential-dataflow/tests/import.rs b/differential-dataflow/tests/import.rs index 4d0582c95..f6d1306cd 100644 --- a/differential-dataflow/tests/import.rs +++ b/differential-dataflow/tests/import.rs @@ -5,7 +5,6 @@ use timely::progress::frontier::AntichainRef; use differential_dataflow::input::InputSession; use differential_dataflow::collection::AsCollection; use differential_dataflow::operators::arrange::{ArrangeByKey, ArrangeBySelf}; -use differential_dataflow::operators::reduce::Reduce; use differential_dataflow::trace::TraceReader; use itertools::Itertools; @@ -42,6 +41,9 @@ where #[test] fn test_import_vanilla() { + + use differential_dataflow::trace::implementations::{ValBuilder, ValSpine}; + run_test(|input_epochs| { timely::execute(timely::Config::process(4), move |worker| { let ref input_epochs = input_epochs; @@ -59,7 +61,8 @@ fn test_import_vanilla() { ::std::mem::drop(trace); let captured = imported - .reduce(|_k, s, t| t.push((s.iter().map(|&(_, w)| w).sum(), 1i64))) + .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine>("Reduce", |_k, s, t| t.push((s.iter().map(|&(_, w)| w).sum(), 1i64))) + .as_collection(|k,v| (k.clone(), v.clone())) .inner .exchange(|_| 0) .capture(); @@ -97,6 +100,9 @@ fn test_import_vanilla() { #[test] fn test_import_completed_dataflow() { + + use differential_dataflow::trace::implementations::{ValBuilder, ValSpine}; + // Runs the first dataflow to completion before constructing the subscriber. run_test(|input_epochs| { timely::execute(timely::Config::process(4), move |worker| { @@ -129,7 +135,8 @@ fn test_import_completed_dataflow() { ::std::mem::drop(trace); let stream = imported - .reduce(|_k, s, t| t.push((s.iter().map(|&(_, w)| w).sum(), 1i64))) + .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine>("Reduce", |_k, s, t| t.push((s.iter().map(|&(_, w)| w).sum(), 1i64))) + .as_collection(|k,v| (k.clone(), v.clone())) .inner .exchange(|_| 0); let probe = stream.probe(); diff --git a/differential-dataflow/tests/reduce.rs b/differential-dataflow/tests/reduce.rs index 88ac5477a..c5da11bdb 100644 --- a/differential-dataflow/tests/reduce.rs +++ b/differential-dataflow/tests/reduce.rs @@ -1,7 +1,7 @@ use timely::dataflow::operators::{ToStream, Capture, Map}; use timely::dataflow::operators::capture::Extract; use differential_dataflow::AsCollection; -use differential_dataflow::operators::{Reduce, Count}; +use differential_dataflow::operators::Count; #[test] fn reduce() { From 19317ce6381e40eb99af78a3eed02e736a9e65e8 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 7 Feb 2026 12:34:39 -0500 Subject: [PATCH 3/6] Move Vec::Threshold methods to inherent methods --- differential-dataflow/examples/compact.rs | 1 - differential-dataflow/examples/graspan.rs | 1 - .../src/algorithms/identifiers.rs | 2 - differential-dataflow/src/collection.rs | 69 +++++++++++++++++ differential-dataflow/src/operators/mod.rs | 2 +- differential-dataflow/src/operators/reduce.rs | 77 +------------------ dogsdogsdogs/src/lib.rs | 3 +- doop/src/main.rs | 2 +- server/dataflows/reachability/src/lib.rs | 4 +- 9 files changed, 76 insertions(+), 85 deletions(-) diff --git a/differential-dataflow/examples/compact.rs b/differential-dataflow/examples/compact.rs index dd0d33964..df8f58d4d 100644 --- a/differential-dataflow/examples/compact.rs +++ b/differential-dataflow/examples/compact.rs @@ -1,5 +1,4 @@ use differential_dataflow::input::Input; -use differential_dataflow::operators::Threshold; fn main() { diff --git a/differential-dataflow/examples/graspan.rs b/differential-dataflow/examples/graspan.rs index eb58e0115..668836600 100644 --- a/differential-dataflow/examples/graspan.rs +++ b/differential-dataflow/examples/graspan.rs @@ -12,7 +12,6 @@ use differential_dataflow::lattice::Lattice; use differential_dataflow::input::{Input, InputSession}; use differential_dataflow::operators::arrange::{ArrangeByKey, ArrangeBySelf}; use differential_dataflow::operators::iterate::VecVariable; -use differential_dataflow::operators::Threshold; type Node = usize; type Edge = (Node, Node); diff --git a/differential-dataflow/src/algorithms/identifiers.rs b/differential-dataflow/src/algorithms/identifiers.rs index 736a9a170..e44dbe92c 100644 --- a/differential-dataflow/src/algorithms/identifiers.rs +++ b/differential-dataflow/src/algorithms/identifiers.rs @@ -15,7 +15,6 @@ pub trait Identifiers { /// ``` /// use differential_dataflow::input::Input; /// use differential_dataflow::algorithms::identifiers::Identifiers; - /// use differential_dataflow::operators::Threshold; /// /// ::timely::example(|scope| { /// @@ -98,7 +97,6 @@ mod tests { // there are collisions, everyone gets a unique identifier. use crate::input::Input; - use crate::operators::Threshold; use crate::operators::iterate::Iterate; ::timely::example(|scope| { diff --git a/differential-dataflow/src/collection.rs b/differential-dataflow/src/collection.rs index 493f47e3e..b8c5f5d42 100644 --- a/differential-dataflow/src/collection.rs +++ b/differential-dataflow/src/collection.rs @@ -754,6 +754,75 @@ pub mod vec { } } + + impl Collection + where + G: Scope, + K: crate::ExchangeData+Hashable, + R1: crate::ExchangeData+Semigroup + { + + /// Reduces the collection to one occurrence of each distinct element. + /// + /// # Examples + /// + /// ``` + /// use differential_dataflow::input::Input; + /// + /// ::timely::example(|scope| { + /// // report at most one of each key. + /// scope.new_collection_from(1 .. 10).1 + /// .map(|x| x / 3) + /// .distinct(); + /// }); + /// ``` + pub fn distinct(&self) -> Collection { + self.distinct_core() + } + + /// Distinct for general integer differences. + /// + /// This method allows `distinct` to produce collections whose difference + /// type is something other than an `isize` integer, for example perhaps an + /// `i32`. + pub fn distinct_core>(&self) -> Collection { + self.threshold_named("Distinct", |_,_| R2::from(1i8)) + } + + /// Transforms the multiplicity of records. + /// + /// The `threshold` function is obliged to map `R1::zero` to `R2::zero`, or at + /// least the computation may behave as if it does. Otherwise, the transformation + /// can be nearly arbitrary: the code does not assume any properties of `threshold`. + /// + /// # Examples + /// + /// ``` + /// use differential_dataflow::input::Input; + /// + /// ::timely::example(|scope| { + /// // report at most one of each key. + /// scope.new_collection_from(1 .. 10).1 + /// .map(|x| x / 3) + /// .threshold(|_,c| c % 2); + /// }); + /// ``` + pub fn thresholdR2+'static>(&self, thresh: F) -> Collection { + self.threshold_named("Threshold", thresh) + } + + /// A `threshold` with the ability to name the operator. + pub fn threshold_namedR2+'static>(&self, name: &str, mut thresh: F) -> Collection { + use crate::operators::arrange::arrangement::ArrangeBySelf; + use crate::trace::implementations::{KeyBuilder, KeySpine}; + + self.arrange_by_self_named(&format!("Arrange: {}", name)) + .reduce_abelian::<_,KeyBuilder,KeySpine>(name, move |k,s,t| t.push(((), thresh(k, &s[0].1)))) + .as_collection(|k,_| k.clone()) + } + + } + } /// Conversion to a differential dataflow Collection. diff --git a/differential-dataflow/src/operators/mod.rs b/differential-dataflow/src/operators/mod.rs index 5f4929d3d..6edae1090 100644 --- a/differential-dataflow/src/operators/mod.rs +++ b/differential-dataflow/src/operators/mod.rs @@ -4,7 +4,7 @@ //! operators have specialized implementations to make them work efficiently, and are in addition //! to several operations defined directly on the `Collection` type (e.g. `map` and `filter`). -pub use self::reduce::{Threshold, Count}; +pub use self::reduce::Count; pub use self::iterate::Iterate; pub use self::join::{Join, JoinCore}; pub use self::count::CountTotal; diff --git a/differential-dataflow/src/operators/reduce.rs b/differential-dataflow/src/operators/reduce.rs index 6f8f14cf6..c9b8e1d06 100644 --- a/differential-dataflow/src/operators/reduce.rs +++ b/differential-dataflow/src/operators/reduce.rs @@ -22,86 +22,11 @@ use crate::operators::arrange::{Arranged, ArrangeByKey, ArrangeBySelf, TraceAgen use crate::lattice::Lattice; use crate::trace::{BatchReader, Cursor, Trace, Builder, ExertionLogic, Description}; use crate::trace::cursor::CursorList; -use crate::trace::implementations::{KeySpine, KeyBuilder, ValSpine, ValBuilder}; +use crate::trace::implementations::{ValSpine, ValBuilder}; use crate::trace::implementations::containers::BatchContainer; use crate::trace::implementations::merge_batcher::container::MergerChunk; use crate::trace::TraceReader; -/// Extension trait for the `threshold` and `distinct` differential dataflow methods. -pub trait Threshold, K: Data, R1: Semigroup> { - /// Transforms the multiplicity of records. - /// - /// The `threshold` function is obliged to map `R1::zero` to `R2::zero`, or at - /// least the computation may behave as if it does. Otherwise, the transformation - /// can be nearly arbitrary: the code does not assume any properties of `threshold`. - /// - /// # Examples - /// - /// ``` - /// use differential_dataflow::input::Input; - /// use differential_dataflow::operators::Threshold; - /// - /// ::timely::example(|scope| { - /// // report at most one of each key. - /// scope.new_collection_from(1 .. 10).1 - /// .map(|x| x / 3) - /// .threshold(|_,c| c % 2); - /// }); - /// ``` - fn thresholdR2+'static>(&self, thresh: F) -> VecCollection { - self.threshold_named("Threshold", thresh) - } - - /// A `threshold` with the ability to name the operator. - fn threshold_namedR2+'static>(&self, name: &str, thresh: F) -> VecCollection; - - /// Reduces the collection to one occurrence of each distinct element. - /// - /// # Examples - /// - /// ``` - /// use differential_dataflow::input::Input; - /// use differential_dataflow::operators::Threshold; - /// - /// ::timely::example(|scope| { - /// // report at most one of each key. - /// scope.new_collection_from(1 .. 10).1 - /// .map(|x| x / 3) - /// .distinct(); - /// }); - /// ``` - fn distinct(&self) -> VecCollection { - self.distinct_core() - } - - /// Distinct for general integer differences. - /// - /// This method allows `distinct` to produce collections whose difference - /// type is something other than an `isize` integer, for example perhaps an - /// `i32`. - fn distinct_core>(&self) -> VecCollection { - self.threshold_named("Distinct", |_,_| R2::from(1i8)) - } -} - -impl, K: ExchangeData+Hashable, R1: ExchangeData+Semigroup> Threshold for VecCollection { - fn threshold_namedR2+'static>(&self, name: &str, thresh: F) -> VecCollection { - self.arrange_by_self_named(&format!("Arrange: {}", name)) - .threshold_named(name, thresh) - } -} - -impl Threshold for Arranged -where - G: Scope, - T1: for<'a> TraceReader=&'a K, KeyOwn = K, Val<'a>=&'a (), Diff=R1>+Clone+'static, -{ - fn threshold_namedR2+'static>(&self, name: &str, mut thresh: F) -> VecCollection { - self.reduce_abelian::<_,KeyBuilder,KeySpine>(name, move |k,s,t| t.push(((), thresh(k, &s[0].1)))) - .as_collection(|k,_| k.clone()) - } -} - /// Extension trait for the `count` differential dataflow method. pub trait Count, K: Data, R: Semigroup> { /// Counts the number of occurrences of each element. diff --git a/dogsdogsdogs/src/lib.rs b/dogsdogsdogs/src/lib.rs index e202294f4..7d9f577ca 100644 --- a/dogsdogsdogs/src/lib.rs +++ b/dogsdogsdogs/src/lib.rs @@ -6,7 +6,6 @@ use timely::dataflow::operators::Partition; use timely::dataflow::operators::Concatenate; use differential_dataflow::{ExchangeData, VecCollection, AsCollection}; -use differential_dataflow::operators::Threshold; use differential_dataflow::difference::{Monoid, Multiply}; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::TraceAgent; @@ -143,7 +142,9 @@ where // We need to count the number of (k, v) pairs and not rely on the given Monoid R and its binary addition operation. // counts and validate can share the base arrangement let arranged = collection.arrange_by_self(); + // TODO: This could/should be arrangement to arrangement, via `reduce_abelian`, but the types are a mouthful at the moment. let counts = arranged + .as_collection(|k,_v| k.clone()) .distinct() .map(|(k, _v)| k) .arrange_by_self() diff --git a/doop/src/main.rs b/doop/src/main.rs index a64a71e7c..1f7e16983 100644 --- a/doop/src/main.rs +++ b/doop/src/main.rs @@ -12,7 +12,7 @@ use differential_dataflow::ExchangeData as Data; use differential_dataflow::lattice::Lattice; use differential_dataflow::input::Input; use differential_dataflow::operators::iterate::VecVariable; -use differential_dataflow::operators::{Threshold, Join, JoinCore}; +use differential_dataflow::operators::{Join, JoinCore}; use differential_dataflow::operators::arrange::ArrangeByKey; // Type aliases for differential execution. diff --git a/server/dataflows/reachability/src/lib.rs b/server/dataflows/reachability/src/lib.rs index 30427eb54..223363bcd 100644 --- a/server/dataflows/reachability/src/lib.rs +++ b/server/dataflows/reachability/src/lib.rs @@ -2,7 +2,7 @@ use std::rc::Rc; use std::cell::RefCell; use differential_dataflow::input::Input; -use differential_dataflow::operators::{Iterate, Threshold}; +use differential_dataflow::operators::Iterate; use differential_dataflow::operators::arrange::ArrangeBySelf; use dd_server::{Environment, TraceHandle}; @@ -16,7 +16,7 @@ pub fn build((dataflow, handles, probe, _timer, args): Environment) -> Result<() .get_mut::>>>(&args[0])? .borrow_mut().as_mut().unwrap().import(dataflow); - let source = args[1].parse::().map_err(|_| format!("parse error, source: {:?}", args[1]))?; + let source = args[1].parse::().map_err(|_| format!("parse error, source: {:?}", args[1]))?; let (_input, roots) = dataflow.new_collection_from(Some(source)); // repeatedly update minimal distances each node can be reached from each root From 10e67f203bf1bc4bf0ff7e8eadd4b58e222ff41c Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 7 Feb 2026 12:50:53 -0500 Subject: [PATCH 4/6] Move Vec::Count methods to inherent methods --- .../examples/itembased_cf.rs | 2 +- .../examples/multitemporal.rs | 2 +- differential-dataflow/src/collection.rs | 37 ++++++++++++++ differential-dataflow/src/operators/mod.rs | 1 - differential-dataflow/src/operators/reduce.rs | 50 +------------------ differential-dataflow/tests/join.rs | 2 +- differential-dataflow/tests/reduce.rs | 1 - 7 files changed, 41 insertions(+), 54 deletions(-) diff --git a/differential-dataflow/examples/itembased_cf.rs b/differential-dataflow/examples/itembased_cf.rs index 90384fa56..45859f23f 100644 --- a/differential-dataflow/examples/itembased_cf.rs +++ b/differential-dataflow/examples/itembased_cf.rs @@ -1,5 +1,5 @@ use differential_dataflow::input::InputSession; -use differential_dataflow::operators::{Join,CountTotal,Count}; +use differential_dataflow::operators::{Join,CountTotal}; use differential_dataflow::operators::arrange::ArrangeByKey; use differential_dataflow::operators::join::JoinCore; diff --git a/differential-dataflow/examples/multitemporal.rs b/differential-dataflow/examples/multitemporal.rs index 36acbd5e7..84f48ed22 100644 --- a/differential-dataflow/examples/multitemporal.rs +++ b/differential-dataflow/examples/multitemporal.rs @@ -7,7 +7,7 @@ use timely::progress::frontier::AntichainRef; use timely::PartialOrder; use differential_dataflow::AsCollection; -use differential_dataflow::operators::{Count, arrange::ArrangeBySelf}; +use differential_dataflow::operators::arrange::ArrangeBySelf; use differential_dataflow::trace::{Cursor, TraceReader}; use pair::Pair; diff --git a/differential-dataflow/src/collection.rs b/differential-dataflow/src/collection.rs index b8c5f5d42..95c3b5b40 100644 --- a/differential-dataflow/src/collection.rs +++ b/differential-dataflow/src/collection.rs @@ -823,6 +823,43 @@ pub mod vec { } + impl Collection + where + G: Scope, + K: crate::ExchangeData+Hashable, + R: crate::ExchangeData+Semigroup + { + + /// Counts the number of occurrences of each element. + /// + /// # Examples + /// + /// ``` + /// use differential_dataflow::input::Input; + /// + /// ::timely::example(|scope| { + /// // report the number of occurrences of each key + /// scope.new_collection_from(1 .. 10).1 + /// .map(|x| x / 3) + /// .count(); + /// }); + /// ``` + pub fn count(&self) -> Collection { self.count_core() } + + /// Count for general integer differences. + /// + /// This method allows `count` to produce collections whose difference + /// type is something other than an `isize` integer, for example perhaps an + /// `i32`. + pub fn count_core + 'static>(&self) -> Collection { + use crate::operators::arrange::arrangement::ArrangeBySelf; + use crate::trace::implementations::{ValBuilder, ValSpine}; + self.arrange_by_self_named("Arrange: Count") + .reduce_abelian::<_,ValBuilder,ValSpine>("Count", |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8)))) + .as_collection(|k,c| (k.clone(), c.clone())) + } + } + } /// Conversion to a differential dataflow Collection. diff --git a/differential-dataflow/src/operators/mod.rs b/differential-dataflow/src/operators/mod.rs index 6edae1090..4ceba6754 100644 --- a/differential-dataflow/src/operators/mod.rs +++ b/differential-dataflow/src/operators/mod.rs @@ -4,7 +4,6 @@ //! operators have specialized implementations to make them work efficiently, and are in addition //! to several operations defined directly on the `Collection` type (e.g. `map` and `filter`). -pub use self::reduce::Count; pub use self::iterate::Iterate; pub use self::join::{Join, JoinCore}; pub use self::count::CountTotal; diff --git a/differential-dataflow/src/operators/reduce.rs b/differential-dataflow/src/operators/reduce.rs index c9b8e1d06..f8973f599 100644 --- a/differential-dataflow/src/operators/reduce.rs +++ b/differential-dataflow/src/operators/reduce.rs @@ -18,62 +18,14 @@ use timely::dataflow::operators::Operator; use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::operators::Capability; -use crate::operators::arrange::{Arranged, ArrangeByKey, ArrangeBySelf, TraceAgent}; +use crate::operators::arrange::{Arranged, ArrangeByKey, TraceAgent}; use crate::lattice::Lattice; use crate::trace::{BatchReader, Cursor, Trace, Builder, ExertionLogic, Description}; use crate::trace::cursor::CursorList; -use crate::trace::implementations::{ValSpine, ValBuilder}; use crate::trace::implementations::containers::BatchContainer; use crate::trace::implementations::merge_batcher::container::MergerChunk; use crate::trace::TraceReader; -/// Extension trait for the `count` differential dataflow method. -pub trait Count, K: Data, R: Semigroup> { - /// Counts the number of occurrences of each element. - /// - /// # Examples - /// - /// ``` - /// use differential_dataflow::input::Input; - /// use differential_dataflow::operators::Count; - /// - /// ::timely::example(|scope| { - /// // report the number of occurrences of each key - /// scope.new_collection_from(1 .. 10).1 - /// .map(|x| x / 3) - /// .count(); - /// }); - /// ``` - fn count(&self) -> VecCollection { - self.count_core() - } - - /// Count for general integer differences. - /// - /// This method allows `count` to produce collections whose difference - /// type is something other than an `isize` integer, for example perhaps an - /// `i32`. - fn count_core + 'static>(&self) -> VecCollection; -} - -impl, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> Count for VecCollection { - fn count_core + 'static>(&self) -> VecCollection { - self.arrange_by_self_named("Arrange: Count") - .count_core() - } -} - -impl Count for Arranged -where - G: Scope, - T1: for<'a> TraceReader=&'a K, KeyOwn = K, Val<'a>=&'a (), Diff=R>+Clone+'static, -{ - fn count_core + 'static>(&self) -> VecCollection { - self.reduce_abelian::<_,ValBuilder,ValSpine>("Count", |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8)))) - .as_collection(|k,c| (k.clone(), c.clone())) - } -} - /// Extension trait for the `reduce_core` differential dataflow method. pub trait ReduceCore, K: ToOwned + ?Sized, V: Data, R: Semigroup> { /// Applies `reduce` to arranged data, and returns an arrangement of output data. diff --git a/differential-dataflow/tests/join.rs b/differential-dataflow/tests/join.rs index 94893b730..5e6b1f46c 100644 --- a/differential-dataflow/tests/join.rs +++ b/differential-dataflow/tests/join.rs @@ -1,7 +1,7 @@ use timely::dataflow::operators::{ToStream, Capture, Map}; use timely::dataflow::operators::capture::Extract; use differential_dataflow::AsCollection; -use differential_dataflow::operators::{Join, Count}; +use differential_dataflow::operators::Join; #[test] fn join() { diff --git a/differential-dataflow/tests/reduce.rs b/differential-dataflow/tests/reduce.rs index c5da11bdb..6841ff5e8 100644 --- a/differential-dataflow/tests/reduce.rs +++ b/differential-dataflow/tests/reduce.rs @@ -1,7 +1,6 @@ use timely::dataflow::operators::{ToStream, Capture, Map}; use timely::dataflow::operators::capture::Extract; use differential_dataflow::AsCollection; -use differential_dataflow::operators::Count; #[test] fn reduce() { From 07eb7fd0d32ac58bcfa635abc7be6841bee94b00 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 7 Feb 2026 13:16:44 -0500 Subject: [PATCH 5/6] Move Vec::ReduceCore methods to inherent methods --- differential-dataflow/examples/monoid-bfs.rs | 2 - .../src/algorithms/graphs/propagate.rs | 1 - differential-dataflow/src/collection.rs | 58 ++++++++++- differential-dataflow/src/operators/reduce.rs | 98 +------------------ 4 files changed, 58 insertions(+), 101 deletions(-) diff --git a/differential-dataflow/examples/monoid-bfs.rs b/differential-dataflow/examples/monoid-bfs.rs index 87127c2ed..786b9c4b9 100644 --- a/differential-dataflow/examples/monoid-bfs.rs +++ b/differential-dataflow/examples/monoid-bfs.rs @@ -131,10 +131,8 @@ where roots.scope().iterative::(|scope| { use differential_dataflow::operators::iterate::SemigroupVariable; - use differential_dataflow::operators::reduce::ReduceCore; use differential_dataflow::trace::implementations::{KeySpine, KeyBuilder}; - use timely::order::Product; let variable = SemigroupVariable::new(scope, Product::new(Default::default(), 1)); diff --git a/differential-dataflow/src/algorithms/graphs/propagate.rs b/differential-dataflow/src/algorithms/graphs/propagate.rs index 2c0671b62..24d4b06cf 100644 --- a/differential-dataflow/src/algorithms/graphs/propagate.rs +++ b/differential-dataflow/src/algorithms/graphs/propagate.rs @@ -80,7 +80,6 @@ where nodes.scope().iterative::(|scope| { - use crate::operators::reduce::ReduceCore; use crate::operators::iterate::SemigroupVariable; use crate::trace::implementations::{ValBuilder, ValSpine}; diff --git a/differential-dataflow/src/collection.rs b/differential-dataflow/src/collection.rs index 95c3b5b40..7164176e7 100644 --- a/differential-dataflow/src/collection.rs +++ b/differential-dataflow/src/collection.rs @@ -701,7 +701,8 @@ pub mod vec { } } - + use crate::trace::{Trace, Builder}; + use crate::operators::arrange::{Arranged, TraceAgent}; impl Collection where @@ -752,8 +753,61 @@ pub mod vec { .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine>(name, logic) .as_collection(|k,v| (k.clone(), v.clone())) } - } + /// Applies `reduce` to arranged data, and returns an arrangement of output data. + /// + /// This method is used by the more ergonomic `reduce`, `distinct`, and `count` methods, although + /// it can be very useful if one needs to manually attach and re-use existing arranged collections. + /// + /// # Examples + /// + /// ``` + /// use differential_dataflow::input::Input; + /// use differential_dataflow::trace::Trace; + /// use differential_dataflow::trace::implementations::{ValBuilder, ValSpine}; + /// + /// ::timely::example(|scope| { + /// + /// let trace = + /// scope.new_collection_from(1 .. 10u32).1 + /// .map(|x| (x, x)) + /// .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>( + /// "Example", + /// move |_key, src, dst| dst.push((*src[0].0, 1)) + /// ) + /// .trace; + /// }); + /// ``` + pub fn reduce_abelian(&self, name: &str, mut logic: L) -> Arranged> + where + T2: for<'a> Trace= &'a K, KeyOwn = K, ValOwn = V, Time=G::Timestamp, Diff: Abelian>+'static, + Bu: Builder, Output = T2::Batch>, + L: FnMut(&K, &[(&V, R)], &mut Vec<(V, T2::Diff)>)+'static, + { + self.reduce_core::<_,Bu,T2>(name, move |key, input, output, change| { + if !input.is_empty() { logic(key, input, change); } + change.extend(output.drain(..).map(|(x,mut d)| { d.negate(); (x, d) })); + crate::consolidation::consolidate(change); + }) + } + + /// Solves for output updates when presented with inputs and would-be outputs. + /// + /// Unlike `reduce_arranged`, this method may be called with an empty `input`, + /// and it may not be safe to index into the first element. + /// At least one of the two collections will be non-empty. + pub fn reduce_core(&self, name: &str, logic: L) -> Arranged> + where + V: Data, + T2: for<'a> Trace=&'a K, KeyOwn = K, ValOwn = V, Time=G::Timestamp>+'static, + Bu: Builder, Output = T2::Batch>, + L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, + { + use crate::operators::arrange::arrangement::ArrangeByKey; + self.arrange_by_key_named(&format!("Arrange: {}", name)) + .reduce_core::<_,Bu,_>(name, logic) + } + } impl Collection where diff --git a/differential-dataflow/src/operators/reduce.rs b/differential-dataflow/src/operators/reduce.rs index f8973f599..7423d6e02 100644 --- a/differential-dataflow/src/operators/reduce.rs +++ b/differential-dataflow/src/operators/reduce.rs @@ -6,9 +6,7 @@ //! The function is expected to populate a list of output values. use timely::container::PushInto; -use crate::hashable::Hashable; -use crate::{Data, ExchangeData, VecCollection}; -use crate::difference::{Semigroup, Abelian}; +use crate::Data; use timely::order::PartialOrder; use timely::progress::frontier::Antichain; @@ -18,105 +16,13 @@ use timely::dataflow::operators::Operator; use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::operators::Capability; -use crate::operators::arrange::{Arranged, ArrangeByKey, TraceAgent}; -use crate::lattice::Lattice; +use crate::operators::arrange::{Arranged, TraceAgent}; use crate::trace::{BatchReader, Cursor, Trace, Builder, ExertionLogic, Description}; use crate::trace::cursor::CursorList; use crate::trace::implementations::containers::BatchContainer; use crate::trace::implementations::merge_batcher::container::MergerChunk; use crate::trace::TraceReader; -/// Extension trait for the `reduce_core` differential dataflow method. -pub trait ReduceCore, K: ToOwned + ?Sized, V: Data, R: Semigroup> { - /// Applies `reduce` to arranged data, and returns an arrangement of output data. - /// - /// This method is used by the more ergonomic `reduce`, `distinct`, and `count` methods, although - /// it can be very useful if one needs to manually attach and re-use existing arranged collections. - /// - /// # Examples - /// - /// ``` - /// use differential_dataflow::input::Input; - /// use differential_dataflow::operators::reduce::ReduceCore; - /// use differential_dataflow::trace::Trace; - /// use differential_dataflow::trace::implementations::{ValBuilder, ValSpine}; - /// - /// ::timely::example(|scope| { - /// - /// let trace = - /// scope.new_collection_from(1 .. 10u32).1 - /// .map(|x| (x, x)) - /// .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>( - /// "Example", - /// move |_key, src, dst| dst.push((*src[0].0, 1)) - /// ) - /// .trace; - /// }); - /// ``` - fn reduce_abelian(&self, name: &str, mut logic: L) -> Arranged> - where - T2: for<'a> Trace< - Key<'a>= &'a K, - KeyOwn = K, - ValOwn = V, - Time=G::Timestamp, - Diff: Abelian, - >+'static, - Bu: Builder, Output = T2::Batch>, - L: FnMut(&K, &[(&V, R)], &mut Vec<(V, T2::Diff)>)+'static, - { - self.reduce_core::<_,Bu,T2>(name, move |key, input, output, change| { - if !input.is_empty() { - logic(key, input, change); - } - change.extend(output.drain(..).map(|(x,mut d)| { d.negate(); (x, d) })); - crate::consolidation::consolidate(change); - }) - } - - /// Solves for output updates when presented with inputs and would-be outputs. - /// - /// Unlike `reduce_arranged`, this method may be called with an empty `input`, - /// and it may not be safe to index into the first element. - /// At least one of the two collections will be non-empty. - fn reduce_core(&self, name: &str, logic: L) -> Arranged> - where - T2: for<'a> Trace< - Key<'a>=&'a K, - KeyOwn = K, - ValOwn = V, - Time=G::Timestamp, - >+'static, - Bu: Builder, Output = T2::Batch>, - L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, - ; -} - -impl ReduceCore for VecCollection -where - G: Scope, - G::Timestamp: Lattice+Ord, - K: ExchangeData+Hashable, - V: ExchangeData, - R: ExchangeData+Semigroup, -{ - fn reduce_core(&self, name: &str, logic: L) -> Arranged> - where - V: Data, - T2: for<'a> Trace< - Key<'a>=&'a K, - KeyOwn = K, - ValOwn = V, - Time=G::Timestamp, - >+'static, - Bu: Builder, Output = T2::Batch>, - L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, - { - self.arrange_by_key_named(&format!("Arrange: {}", name)) - .reduce_core::<_,Bu,_>(name, logic) - } -} - /// A key-wise reduction of values in an input trace. /// /// This method exists to provide reduce functionality without opinions about qualifying trace types. From 2b517dd0d43fb2aef87fb7c95d9b00fcd21d7bfd Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 7 Feb 2026 13:24:39 -0500 Subject: [PATCH 6/6] Correct mdbook --- mdbook/src/chapter_2/chapter_2_4.md | 1 - mdbook/src/chapter_2/chapter_2_6.md | 1 - mdbook/src/chapter_2/chapter_2_7.md | 4 ++-- mdbook/src/chapter_5/chapter_5_4.md | 1 - 4 files changed, 2 insertions(+), 5 deletions(-) diff --git a/mdbook/src/chapter_2/chapter_2_4.md b/mdbook/src/chapter_2/chapter_2_4.md index 5f002bb28..5e3f6a2ca 100644 --- a/mdbook/src/chapter_2/chapter_2_4.md +++ b/mdbook/src/chapter_2/chapter_2_4.md @@ -12,7 +12,6 @@ As an example, if we were to inspect # use timely::dataflow::Scope; # use differential_dataflow::VecCollection; # use differential_dataflow::lattice::Lattice; -# use differential_dataflow::operators::Reduce; # fn example(manages: &VecCollection) # where G::Timestamp: Lattice # { diff --git a/mdbook/src/chapter_2/chapter_2_6.md b/mdbook/src/chapter_2/chapter_2_6.md index f966f5002..54f103eed 100644 --- a/mdbook/src/chapter_2/chapter_2_6.md +++ b/mdbook/src/chapter_2/chapter_2_6.md @@ -10,7 +10,6 @@ For example, to produce for each manager their managee with the lowest identifie # use timely::dataflow::Scope; # use differential_dataflow::VecCollection; # use differential_dataflow::lattice::Lattice; -# use differential_dataflow::operators::Reduce; # fn example(manages: &VecCollection) # where G::Timestamp: Lattice # { diff --git a/mdbook/src/chapter_2/chapter_2_7.md b/mdbook/src/chapter_2/chapter_2_7.md index 0099cf820..4c1b2902a 100644 --- a/mdbook/src/chapter_2/chapter_2_7.md +++ b/mdbook/src/chapter_2/chapter_2_7.md @@ -9,7 +9,7 @@ As an example, we can take our `manages` relation and determine for all employee # extern crate differential_dataflow; # use timely::dataflow::Scope; # use differential_dataflow::VecCollection; -# use differential_dataflow::operators::{Join, Iterate, Threshold}; +# use differential_dataflow::operators::{Join, Iterate}; # use differential_dataflow::lattice::Lattice; # fn example(manages: &VecCollection) # where G::Timestamp: Lattice @@ -45,7 +45,7 @@ In the example above, we could rewrite # extern crate differential_dataflow; # use timely::dataflow::Scope; # use differential_dataflow::VecCollection; -# use differential_dataflow::operators::{Join, Threshold}; +# use differential_dataflow::operators::Join; # use differential_dataflow::operators::{Iterate, iterate::VecVariable}; # use differential_dataflow::lattice::Lattice; # fn example(manages: &VecCollection) diff --git a/mdbook/src/chapter_5/chapter_5_4.md b/mdbook/src/chapter_5/chapter_5_4.md index 2c3155a6a..15ca64228 100644 --- a/mdbook/src/chapter_5/chapter_5_4.md +++ b/mdbook/src/chapter_5/chapter_5_4.md @@ -23,7 +23,6 @@ extern crate timely; extern crate differential_dataflow; use differential_dataflow::operators::Join; -use differential_dataflow::operators::Threshold; use differential_dataflow::operators::Iterate; use differential_dataflow::operators::arrange::ArrangeByKey;