diff --git a/mdbook/src/chapter_2/chapter_2_4.md b/mdbook/src/chapter_2/chapter_2_4.md index 6c5bf6eb9..a7bf1a32b 100644 --- a/mdbook/src/chapter_2/chapter_2_4.md +++ b/mdbook/src/chapter_2/chapter_2_4.md @@ -192,13 +192,13 @@ fn main() { stash.entry(time.time().clone()) .or_insert(Vec::new()) .extend(data.map(std::mem::take)); - notificator.notify_at(time.retain()); + notificator.notify_at(time.retain(output.output_index())); }); input2.for_each_time(|time, data| { stash.entry(time.time().clone()) .or_insert(Vec::new()) .extend(data.map(std::mem::take)); - notificator.notify_at(time.retain()); + notificator.notify_at(time.retain(output.output_index())); }); notificator.for_each(&[frontier1, frontier2], |time, notificator| { @@ -240,12 +240,12 @@ fn main() { move |(input1, frontier1), (input2, frontier2), output| { input1.for_each_time(|time, data| { - stash.entry(time.retain()) + stash.entry(time.retain(output.output_index())) .or_insert(Vec::new()) .extend(data.map(std::mem::take)); }); input2.for_each_time(|time, data| { - stash.entry(time.retain()) + stash.entry(time.retain(output.output_index())) .or_insert(Vec::new()) .extend(data.map(std::mem::take)); }); diff --git a/mdbook/src/chapter_2/chapter_2_5.md b/mdbook/src/chapter_2/chapter_2_5.md index 529788951..a0100a2e7 100644 --- a/mdbook/src/chapter_2/chapter_2_5.md +++ b/mdbook/src/chapter_2/chapter_2_5.md @@ -210,7 +210,7 @@ As before, I'm just going to show you the new code, which now lives just after ` // for each input batch, stash it at `time`. input.for_each_time(|time, data| { - queues.entry(time.retain()) + queues.entry(time.retain(output.output_index())) .or_insert(Vec::new()) .extend(data.flat_map(|d| d.drain(..))); }); @@ -295,7 +295,7 @@ Inside the closure, we do two things: (i) read inputs and (ii) update counts and ```rust,ignore // for each input batch, stash it at `time`. while let Some((time, data)) = input.next() { - queues.entry(time.retain()) + queues.entry(time.retain(output.output_index())) .or_insert(Vec::new()) .extend(std::mem::take(data)); } diff --git a/mdbook/src/chapter_4/chapter_4_3.md b/mdbook/src/chapter_4/chapter_4_3.md index f50f5e527..f543385df 100644 --- a/mdbook/src/chapter_4/chapter_4_3.md +++ b/mdbook/src/chapter_4/chapter_4_3.md @@ -82,7 +82,7 @@ fn main() { // Stash received data. input1.for_each_time(|time, data| { - stash.entry(time.retain()) + stash.entry(time.retain(output.output_index())) .or_insert(Vec::new()) .extend(data.flat_map(|d| d.drain(..))); }); diff --git a/timely/examples/bfs.rs b/timely/examples/bfs.rs index 33376b285..7238e235f 100644 --- a/timely/examples/bfs.rs +++ b/timely/examples/bfs.rs @@ -54,7 +54,7 @@ fn main() { // receive edges, start to sort them input1.for_each_time(|time, data| { - notify.notify_at(time.retain()); + notify.notify_at(time.retain(output.output_index())); edge_list.extend(data.map(std::mem::take)); }); @@ -62,7 +62,7 @@ fn main() { input2.for_each_time(|time, data| { node_lists.entry(*time.time()) .or_insert_with(|| { - notify.notify_at(time.retain()); + notify.notify_at(time.retain(output.output_index())); Vec::new() }) .extend(data.map(std::mem::take)); diff --git a/timely/examples/columnar.rs b/timely/examples/columnar.rs index c677c28b9..122a3d95f 100644 --- a/timely/examples/columnar.rs +++ b/timely/examples/columnar.rs @@ -68,7 +68,7 @@ fn main() { move |(input, frontier), output| { input.for_each_time(|time, data| { queues - .entry(time.retain()) + .entry(time.retain(output.output_index())) .or_insert(Vec::new()) .extend(data.map(std::mem::take)); diff --git a/timely/examples/pagerank.rs b/timely/examples/pagerank.rs index 17ff79fec..6d07e335d 100644 --- a/timely/examples/pagerank.rs +++ b/timely/examples/pagerank.rs @@ -45,13 +45,13 @@ fn main() { // hold on to edge changes until it is time. input1.for_each_time(|time, data| { - let entry = edge_stash.entry(time.retain()).or_default(); + let entry = edge_stash.entry(time.retain(output.output_index())).or_default(); data.for_each(|data| entry.append(data)); }); // hold on to rank changes until it is time. input2.for_each_time(|time, data| { - let entry = rank_stash.entry(time.retain()).or_default(); + let entry = rank_stash.entry(time.retain(output.output_index())).or_default(); data.for_each(|data| entry.append(data)); }); diff --git a/timely/examples/wordcount.rs b/timely/examples/wordcount.rs index af70744a4..9d6ad6018 100644 --- a/timely/examples/wordcount.rs +++ b/timely/examples/wordcount.rs @@ -31,7 +31,7 @@ fn main() { move |(input, frontier), output| { input.for_each_time(|time, data| { - queues.entry(time.retain()) + queues.entry(time.retain(output.output_index())) .or_insert(Vec::new()) .extend(data.map(std::mem::take)); }); diff --git a/timely/src/dataflow/channels/pullers/counter.rs b/timely/src/dataflow/channels/pullers/counter.rs index 5903b2e4c..ab5e30e1a 100644 --- a/timely/src/dataflow/channels/pullers/counter.rs +++ b/timely/src/dataflow/channels/pullers/counter.rs @@ -23,6 +23,7 @@ pub struct ConsumedGuard { } impl ConsumedGuard { + #[inline] pub(crate) fn time(&self) -> &T { self.time.as_ref().unwrap() } diff --git a/timely/src/dataflow/channels/pushers/progress.rs b/timely/src/dataflow/channels/pushers/progress.rs index 71c7993bf..c4ff4d9cd 100644 --- a/timely/src/dataflow/channels/pushers/progress.rs +++ b/timely/src/dataflow/channels/pushers/progress.rs @@ -36,6 +36,10 @@ impl Progress { pub fn valid>(&self, capability: &CT) -> bool { capability.valid_for_output(&self.internal, self.port) } + + /// The index of the output port. + #[inline] + pub fn output_index(&self) -> usize { self.port } } impl Progress where T : Ord+Clone+'static { diff --git a/timely/src/dataflow/operators/aggregation/aggregate.rs b/timely/src/dataflow/operators/aggregation/aggregate.rs index f22a73110..aabe3d62b 100644 --- a/timely/src/dataflow/operators/aggregation/aggregate.rs +++ b/timely/src/dataflow/operators/aggregation/aggregate.rs @@ -85,7 +85,7 @@ impl, K: ExchangeData+Hash+Eq, V: Exchang let agg = agg_time.entry(key.clone()).or_insert_with(Default::default); fold(&key, val, agg); } - notificator.notify_at(time.retain()); + notificator.notify_at(time.retain(output.output_index())); }); // pop completed aggregates, send along whatever diff --git a/timely/src/dataflow/operators/aggregation/state_machine.rs b/timely/src/dataflow/operators/aggregation/state_machine.rs index bad423827..0cd2d62d1 100644 --- a/timely/src/dataflow/operators/aggregation/state_machine.rs +++ b/timely/src/dataflow/operators/aggregation/state_machine.rs @@ -89,7 +89,7 @@ impl StateMachine f // stash if not time yet if notificator.frontier(0).less_than(time.time()) { for data in data { pending.entry(time.time().clone()).or_insert_with(Vec::new).append(data); } - notificator.notify_at(time.retain()); + notificator.notify_at(time.retain(output.output_index())); } else { // else we can process immediately diff --git a/timely/src/dataflow/operators/capability.rs b/timely/src/dataflow/operators/capability.rs index f984a5339..251f84d50 100644 --- a/timely/src/dataflow/operators/capability.rs +++ b/timely/src/dataflow/operators/capability.rs @@ -267,20 +267,18 @@ impl InputCapability { } /// The timestamp associated with this capability. + #[inline] pub fn time(&self) -> &T { self.consumed_guard.time() } + /// Delays capability for a specific output port. + /// /// Makes a new capability for a timestamp `new_time` greater or equal to the timestamp of /// the source capability (`self`). /// /// This method panics if `self.time` is not less or equal to `new_time`. - pub fn delayed(&self, new_time: &T) -> Capability { - self.delayed_for_output(new_time, 0) - } - - /// Delays capability for a specific output port. - pub fn delayed_for_output(&self, new_time: &T, output_port: usize) -> Capability { + pub fn delayed(&self, new_time: &T, output_port: usize) -> Capability { use crate::progress::timestamp::PathSummary; if let Some(path) = self.summaries.borrow().get(output_port) { if path.iter().flat_map(|summary| summary.results_in(self.time())).any(|time| time.less_equal(new_time)) { @@ -294,34 +292,16 @@ impl InputCapability { } } - /// Transform to an owned capability. + /// Transforms to an owned capability for a specific output port. /// /// This method produces an owned capability which must be dropped to release the /// capability. Users should take care that these capabilities are only stored for /// as long as they are required, as failing to drop them may result in livelock. /// - /// This method panics if the timestamp summary to output zero strictly advances the time. - pub fn retain(self) -> Capability { - self.retain_for_output(0) - } - - /// Transforms to an owned capability for a specific output port. - /// /// This method panics if the timestamp summary to `output_port` strictly advances the time. - pub fn retain_for_output(self, output_port: usize) -> Capability { - use crate::progress::timestamp::PathSummary; - let self_time = self.time().clone(); - if let Some(path) = self.summaries.borrow().get(output_port) { - if path.iter().flat_map(|summary| summary.results_in(&self_time)).any(|time| time.less_equal(&self_time)) { - Capability::new(self_time, Rc::clone(&self.internal.borrow()[output_port])) - } - else { - panic!("Attempted to retain a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", self_time, path, self_time); - } - } - else { - panic!("Attempted to retain a capability for a disconnected output"); - } + #[inline] + pub fn retain(&self, output_port: usize) -> Capability { + self.delayed(self.time(), output_port) } } diff --git a/timely/src/dataflow/operators/core/feedback.rs b/timely/src/dataflow/operators/core/feedback.rs index f5a03cedd..c05f5912e 100644 --- a/timely/src/dataflow/operators/core/feedback.rs +++ b/timely/src/dataflow/operators/core/feedback.rs @@ -117,7 +117,7 @@ impl ConnectLoop for StreamCore { let mut output = output.activate(); input.for_each(|cap, data| { if let Some(new_time) = summary.results_in(cap.time()) { - let new_cap = cap.delayed(&new_time); + let new_cap = cap.delayed(&new_time, output.output_index()); output.give(&new_cap, data); } }); diff --git a/timely/src/dataflow/operators/core/reclock.rs b/timely/src/dataflow/operators/core/reclock.rs index 3e17a09e0..1ed563d1b 100644 --- a/timely/src/dataflow/operators/core/reclock.rs +++ b/timely/src/dataflow/operators/core/reclock.rs @@ -64,7 +64,7 @@ impl Reclock for StreamCore { // request notification at time, to flush stash. input2.for_each_time(|time, _data| { - notificator.notify_at(time.retain()); + notificator.notify_at(time.retain(output.output_index())); }); // each time with complete stash can be flushed. diff --git a/timely/src/dataflow/operators/count.rs b/timely/src/dataflow/operators/count.rs index e99f1000c..1fed3c1e9 100644 --- a/timely/src/dataflow/operators/count.rs +++ b/timely/src/dataflow/operators/count.rs @@ -57,7 +57,7 @@ impl, D: Data> Accumulate for Strea for data in data { logic(accums.entry(time.time().clone()).or_insert_with(|| default.clone()), data); } - notificator.notify_at(time.retain()); + notificator.notify_at(time.retain(output.output_index())); }); notificator.for_each(|time,_,_| { diff --git a/timely/src/dataflow/operators/delay.rs b/timely/src/dataflow/operators/delay.rs index df41d9680..aeeb26a20 100644 --- a/timely/src/dataflow/operators/delay.rs +++ b/timely/src/dataflow/operators/delay.rs @@ -103,7 +103,7 @@ impl, D: Data> Delay for Stream, D: Data> Delay for Stream { } impl<'a, T: Timestamp, CB: ContainerBuilder> OutputBuilderSession<'a, T, CB> { + /// The index of the output port wrapped by this session. + #[inline] + pub fn output_index(&self) -> usize { self.session.output_index()} + /// A container-building session associated with a capability. /// /// This method is the prefered way of sending records that must be accumulated into a container, diff --git a/timely/src/dataflow/operators/generic/notificator.rs b/timely/src/dataflow/operators/generic/notificator.rs index de9b0e3ef..4f64ebc8f 100644 --- a/timely/src/dataflow/operators/generic/notificator.rs +++ b/timely/src/dataflow/operators/generic/notificator.rs @@ -58,7 +58,7 @@ impl<'a, T: Timestamp> Notificator<'a, T> { /// input.for_each_time(|cap, data| { /// output.session(&cap).give_containers(data); /// let time = cap.time().clone() + 1; - /// notificator.notify_at(cap.delayed(&time)); + /// notificator.notify_at(cap.delayed(&time, output.output_index())); /// }); /// notificator.for_each(|cap, count, _| { /// println!("done with time: {:?}, requested {} times", cap.time(), count); @@ -194,11 +194,11 @@ fn notificator_delivers_notifications_in_topo_order() { /// move |(input1, frontier1), (input2, frontier2), output| { /// input1.for_each_time(|time, data| { /// stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.flat_map(|d| d.drain(..))); -/// notificator.notify_at(time.retain()); +/// notificator.notify_at(time.retain(output.output_index())); /// }); /// input2.for_each_time(|time, data| { /// stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.flat_map(|d| d.drain(..))); -/// notificator.notify_at(time.retain()); +/// notificator.notify_at(time.retain(output.output_index())); /// }); /// notificator.for_each(&[frontier1, frontier2], |time, _| { /// if let Some(mut vec) = stash.remove(time.time()) { @@ -267,7 +267,7 @@ impl FrontierNotificator { /// input.for_each_time(|cap, data| { /// output.session(&cap).give_containers(data); /// let time = cap.time().clone() + 1; - /// notificator.notify_at(cap.delayed(&time)); + /// notificator.notify_at(cap.delayed(&time, output.output_index())); /// }); /// notificator.for_each(&[frontier], |cap, _| { /// println!("done with time: {:?}", cap.time()); @@ -397,7 +397,7 @@ impl FrontierNotificator { /// input.for_each_time(|cap, data| { /// output.session(&cap).give_containers(data); /// let time = cap.time().clone() + 1; - /// notificator.notify_at(cap.delayed(&time)); + /// notificator.notify_at(cap.delayed(&time, output.output_index())); /// assert_eq!(notificator.pending().filter(|t| t.0.time() == &time).count(), 1); /// }); /// notificator.for_each(&[frontier], |cap, _| { diff --git a/timely/src/dataflow/operators/generic/operator.rs b/timely/src/dataflow/operators/generic/operator.rs index 97b075f35..027f17942 100644 --- a/timely/src/dataflow/operators/generic/operator.rs +++ b/timely/src/dataflow/operators/generic/operator.rs @@ -78,7 +78,7 @@ pub trait Operator { /// .unary_notify(Pipeline, "example", None, move |input, output, notificator| { /// input.for_each_time(|time, data| { /// output.session(&time).give_containers(data); - /// notificator.notify_at(time.retain()); + /// notificator.notify_at(time.retain(output.output_index())); /// }); /// notificator.for_each(|time, _cnt, _not| { /// println!("notified at {:?}", time); @@ -148,11 +148,11 @@ pub trait Operator { /// move |(input1, frontier1), (input2, frontier2), output| { /// input1.for_each_time(|time, data| { /// stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.flat_map(|d| d.drain(..))); - /// notificator.notify_at(time.retain()); + /// notificator.notify_at(time.retain(output.output_index())); /// }); /// input2.for_each_time(|time, data| { /// stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.flat_map(|d| d.drain(..))); - /// notificator.notify_at(time.retain()); + /// notificator.notify_at(time.retain(output.output_index())); /// }); /// notificator.for_each(&[frontier1, frontier2], |time, _not| { /// if let Some(mut vec) = stash.remove(time.time()) { @@ -205,11 +205,11 @@ pub trait Operator { /// in1.binary_notify(&in2, Pipeline, Pipeline, "example", None, move |input1, input2, output, notificator| { /// input1.for_each_time(|time, data| { /// output.session(&time).give_containers(data); - /// notificator.notify_at(time.retain()); + /// notificator.notify_at(time.retain(output.output_index())); /// }); /// input2.for_each_time(|time, data| { /// output.session(&time).give_containers(data); - /// notificator.notify_at(time.retain()); + /// notificator.notify_at(time.retain(output.output_index())); /// }); /// notificator.for_each(|time, _cnt, _not| { /// println!("notified at {:?}", time);