From c50a062e5cc83eff804884be49887df5add1613d Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 6 Feb 2026 22:28:08 -0500 Subject: [PATCH] Delay/retain input capabilities for specific outputs This PR implements various ergonomic improvements around capabilities and integration with outputs. Specifically, it makes the output port explicit, where previously one had to remember the order in which outputs were constructed. Changes the API on input capabilities such that retain/delay always takes an output port. Adds a function to outputs to determine their port. Removes functions that would always use port 0 as it is confusing in multi-output operators. Retain does not move the input capability anymore. Signed-off-by: Moritz Hoffmann --- mdbook/src/chapter_2/chapter_2_4.md | 8 ++--- mdbook/src/chapter_2/chapter_2_5.md | 4 +-- mdbook/src/chapter_4/chapter_4_3.md | 2 +- timely/examples/bfs.rs | 4 +-- timely/examples/columnar.rs | 2 +- timely/examples/pagerank.rs | 4 +-- timely/examples/wordcount.rs | 2 +- .../src/dataflow/channels/pullers/counter.rs | 1 + .../src/dataflow/channels/pushers/progress.rs | 4 +++ .../operators/aggregation/aggregate.rs | 2 +- .../operators/aggregation/state_machine.rs | 2 +- timely/src/dataflow/operators/capability.rs | 36 +++++-------------- .../src/dataflow/operators/core/feedback.rs | 2 +- timely/src/dataflow/operators/core/reclock.rs | 2 +- timely/src/dataflow/operators/count.rs | 2 +- timely/src/dataflow/operators/delay.rs | 4 +-- .../src/dataflow/operators/generic/handles.rs | 4 +++ .../dataflow/operators/generic/notificator.rs | 10 +++--- .../dataflow/operators/generic/operator.rs | 10 +++--- 19 files changed, 47 insertions(+), 58 deletions(-) diff --git a/mdbook/src/chapter_2/chapter_2_4.md b/mdbook/src/chapter_2/chapter_2_4.md index 6c5bf6eb9..a7bf1a32b 100644 --- a/mdbook/src/chapter_2/chapter_2_4.md +++ b/mdbook/src/chapter_2/chapter_2_4.md @@ -192,13 +192,13 @@ fn main() { stash.entry(time.time().clone()) .or_insert(Vec::new()) .extend(data.map(std::mem::take)); - notificator.notify_at(time.retain()); + notificator.notify_at(time.retain(output.output_index())); }); input2.for_each_time(|time, data| { stash.entry(time.time().clone()) .or_insert(Vec::new()) .extend(data.map(std::mem::take)); - notificator.notify_at(time.retain()); + notificator.notify_at(time.retain(output.output_index())); }); notificator.for_each(&[frontier1, frontier2], |time, notificator| { @@ -240,12 +240,12 @@ fn main() { move |(input1, frontier1), (input2, frontier2), output| { input1.for_each_time(|time, data| { - stash.entry(time.retain()) + stash.entry(time.retain(output.output_index())) .or_insert(Vec::new()) .extend(data.map(std::mem::take)); }); input2.for_each_time(|time, data| { - stash.entry(time.retain()) + stash.entry(time.retain(output.output_index())) .or_insert(Vec::new()) .extend(data.map(std::mem::take)); }); diff --git a/mdbook/src/chapter_2/chapter_2_5.md b/mdbook/src/chapter_2/chapter_2_5.md index 529788951..a0100a2e7 100644 --- a/mdbook/src/chapter_2/chapter_2_5.md +++ b/mdbook/src/chapter_2/chapter_2_5.md @@ -210,7 +210,7 @@ As before, I'm just going to show you the new code, which now lives just after ` // for each input batch, stash it at `time`. input.for_each_time(|time, data| { - queues.entry(time.retain()) + queues.entry(time.retain(output.output_index())) .or_insert(Vec::new()) .extend(data.flat_map(|d| d.drain(..))); }); @@ -295,7 +295,7 @@ Inside the closure, we do two things: (i) read inputs and (ii) update counts and ```rust,ignore // for each input batch, stash it at `time`. while let Some((time, data)) = input.next() { - queues.entry(time.retain()) + queues.entry(time.retain(output.output_index())) .or_insert(Vec::new()) .extend(std::mem::take(data)); } diff --git a/mdbook/src/chapter_4/chapter_4_3.md b/mdbook/src/chapter_4/chapter_4_3.md index f50f5e527..f543385df 100644 --- a/mdbook/src/chapter_4/chapter_4_3.md +++ b/mdbook/src/chapter_4/chapter_4_3.md @@ -82,7 +82,7 @@ fn main() { // Stash received data. input1.for_each_time(|time, data| { - stash.entry(time.retain()) + stash.entry(time.retain(output.output_index())) .or_insert(Vec::new()) .extend(data.flat_map(|d| d.drain(..))); }); diff --git a/timely/examples/bfs.rs b/timely/examples/bfs.rs index 33376b285..7238e235f 100644 --- a/timely/examples/bfs.rs +++ b/timely/examples/bfs.rs @@ -54,7 +54,7 @@ fn main() { // receive edges, start to sort them input1.for_each_time(|time, data| { - notify.notify_at(time.retain()); + notify.notify_at(time.retain(output.output_index())); edge_list.extend(data.map(std::mem::take)); }); @@ -62,7 +62,7 @@ fn main() { input2.for_each_time(|time, data| { node_lists.entry(*time.time()) .or_insert_with(|| { - notify.notify_at(time.retain()); + notify.notify_at(time.retain(output.output_index())); Vec::new() }) .extend(data.map(std::mem::take)); diff --git a/timely/examples/columnar.rs b/timely/examples/columnar.rs index c677c28b9..122a3d95f 100644 --- a/timely/examples/columnar.rs +++ b/timely/examples/columnar.rs @@ -68,7 +68,7 @@ fn main() { move |(input, frontier), output| { input.for_each_time(|time, data| { queues - .entry(time.retain()) + .entry(time.retain(output.output_index())) .or_insert(Vec::new()) .extend(data.map(std::mem::take)); diff --git a/timely/examples/pagerank.rs b/timely/examples/pagerank.rs index 17ff79fec..6d07e335d 100644 --- a/timely/examples/pagerank.rs +++ b/timely/examples/pagerank.rs @@ -45,13 +45,13 @@ fn main() { // hold on to edge changes until it is time. input1.for_each_time(|time, data| { - let entry = edge_stash.entry(time.retain()).or_default(); + let entry = edge_stash.entry(time.retain(output.output_index())).or_default(); data.for_each(|data| entry.append(data)); }); // hold on to rank changes until it is time. input2.for_each_time(|time, data| { - let entry = rank_stash.entry(time.retain()).or_default(); + let entry = rank_stash.entry(time.retain(output.output_index())).or_default(); data.for_each(|data| entry.append(data)); }); diff --git a/timely/examples/wordcount.rs b/timely/examples/wordcount.rs index af70744a4..9d6ad6018 100644 --- a/timely/examples/wordcount.rs +++ b/timely/examples/wordcount.rs @@ -31,7 +31,7 @@ fn main() { move |(input, frontier), output| { input.for_each_time(|time, data| { - queues.entry(time.retain()) + queues.entry(time.retain(output.output_index())) .or_insert(Vec::new()) .extend(data.map(std::mem::take)); }); diff --git a/timely/src/dataflow/channels/pullers/counter.rs b/timely/src/dataflow/channels/pullers/counter.rs index 5903b2e4c..ab5e30e1a 100644 --- a/timely/src/dataflow/channels/pullers/counter.rs +++ b/timely/src/dataflow/channels/pullers/counter.rs @@ -23,6 +23,7 @@ pub struct ConsumedGuard { } impl ConsumedGuard { + #[inline] pub(crate) fn time(&self) -> &T { self.time.as_ref().unwrap() } diff --git a/timely/src/dataflow/channels/pushers/progress.rs b/timely/src/dataflow/channels/pushers/progress.rs index 71c7993bf..c4ff4d9cd 100644 --- a/timely/src/dataflow/channels/pushers/progress.rs +++ b/timely/src/dataflow/channels/pushers/progress.rs @@ -36,6 +36,10 @@ impl Progress { pub fn valid>(&self, capability: &CT) -> bool { capability.valid_for_output(&self.internal, self.port) } + + /// The index of the output port. + #[inline] + pub fn output_index(&self) -> usize { self.port } } impl Progress where T : Ord+Clone+'static { diff --git a/timely/src/dataflow/operators/aggregation/aggregate.rs b/timely/src/dataflow/operators/aggregation/aggregate.rs index f22a73110..aabe3d62b 100644 --- a/timely/src/dataflow/operators/aggregation/aggregate.rs +++ b/timely/src/dataflow/operators/aggregation/aggregate.rs @@ -85,7 +85,7 @@ impl, K: ExchangeData+Hash+Eq, V: Exchang let agg = agg_time.entry(key.clone()).or_insert_with(Default::default); fold(&key, val, agg); } - notificator.notify_at(time.retain()); + notificator.notify_at(time.retain(output.output_index())); }); // pop completed aggregates, send along whatever diff --git a/timely/src/dataflow/operators/aggregation/state_machine.rs b/timely/src/dataflow/operators/aggregation/state_machine.rs index bad423827..0cd2d62d1 100644 --- a/timely/src/dataflow/operators/aggregation/state_machine.rs +++ b/timely/src/dataflow/operators/aggregation/state_machine.rs @@ -89,7 +89,7 @@ impl StateMachine f // stash if not time yet if notificator.frontier(0).less_than(time.time()) { for data in data { pending.entry(time.time().clone()).or_insert_with(Vec::new).append(data); } - notificator.notify_at(time.retain()); + notificator.notify_at(time.retain(output.output_index())); } else { // else we can process immediately diff --git a/timely/src/dataflow/operators/capability.rs b/timely/src/dataflow/operators/capability.rs index f984a5339..251f84d50 100644 --- a/timely/src/dataflow/operators/capability.rs +++ b/timely/src/dataflow/operators/capability.rs @@ -267,20 +267,18 @@ impl InputCapability { } /// The timestamp associated with this capability. + #[inline] pub fn time(&self) -> &T { self.consumed_guard.time() } + /// Delays capability for a specific output port. + /// /// Makes a new capability for a timestamp `new_time` greater or equal to the timestamp of /// the source capability (`self`). /// /// This method panics if `self.time` is not less or equal to `new_time`. - pub fn delayed(&self, new_time: &T) -> Capability { - self.delayed_for_output(new_time, 0) - } - - /// Delays capability for a specific output port. - pub fn delayed_for_output(&self, new_time: &T, output_port: usize) -> Capability { + pub fn delayed(&self, new_time: &T, output_port: usize) -> Capability { use crate::progress::timestamp::PathSummary; if let Some(path) = self.summaries.borrow().get(output_port) { if path.iter().flat_map(|summary| summary.results_in(self.time())).any(|time| time.less_equal(new_time)) { @@ -294,34 +292,16 @@ impl InputCapability { } } - /// Transform to an owned capability. + /// Transforms to an owned capability for a specific output port. /// /// This method produces an owned capability which must be dropped to release the /// capability. Users should take care that these capabilities are only stored for /// as long as they are required, as failing to drop them may result in livelock. /// - /// This method panics if the timestamp summary to output zero strictly advances the time. - pub fn retain(self) -> Capability { - self.retain_for_output(0) - } - - /// Transforms to an owned capability for a specific output port. - /// /// This method panics if the timestamp summary to `output_port` strictly advances the time. - pub fn retain_for_output(self, output_port: usize) -> Capability { - use crate::progress::timestamp::PathSummary; - let self_time = self.time().clone(); - if let Some(path) = self.summaries.borrow().get(output_port) { - if path.iter().flat_map(|summary| summary.results_in(&self_time)).any(|time| time.less_equal(&self_time)) { - Capability::new(self_time, Rc::clone(&self.internal.borrow()[output_port])) - } - else { - panic!("Attempted to retain a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", self_time, path, self_time); - } - } - else { - panic!("Attempted to retain a capability for a disconnected output"); - } + #[inline] + pub fn retain(&self, output_port: usize) -> Capability { + self.delayed(self.time(), output_port) } } diff --git a/timely/src/dataflow/operators/core/feedback.rs b/timely/src/dataflow/operators/core/feedback.rs index f5a03cedd..c05f5912e 100644 --- a/timely/src/dataflow/operators/core/feedback.rs +++ b/timely/src/dataflow/operators/core/feedback.rs @@ -117,7 +117,7 @@ impl ConnectLoop for StreamCore { let mut output = output.activate(); input.for_each(|cap, data| { if let Some(new_time) = summary.results_in(cap.time()) { - let new_cap = cap.delayed(&new_time); + let new_cap = cap.delayed(&new_time, output.output_index()); output.give(&new_cap, data); } }); diff --git a/timely/src/dataflow/operators/core/reclock.rs b/timely/src/dataflow/operators/core/reclock.rs index 3e17a09e0..1ed563d1b 100644 --- a/timely/src/dataflow/operators/core/reclock.rs +++ b/timely/src/dataflow/operators/core/reclock.rs @@ -64,7 +64,7 @@ impl Reclock for StreamCore { // request notification at time, to flush stash. input2.for_each_time(|time, _data| { - notificator.notify_at(time.retain()); + notificator.notify_at(time.retain(output.output_index())); }); // each time with complete stash can be flushed. diff --git a/timely/src/dataflow/operators/count.rs b/timely/src/dataflow/operators/count.rs index e99f1000c..1fed3c1e9 100644 --- a/timely/src/dataflow/operators/count.rs +++ b/timely/src/dataflow/operators/count.rs @@ -57,7 +57,7 @@ impl, D: Data> Accumulate for Strea for data in data { logic(accums.entry(time.time().clone()).or_insert_with(|| default.clone()), data); } - notificator.notify_at(time.retain()); + notificator.notify_at(time.retain(output.output_index())); }); notificator.for_each(|time,_,_| { diff --git a/timely/src/dataflow/operators/delay.rs b/timely/src/dataflow/operators/delay.rs index df41d9680..aeeb26a20 100644 --- a/timely/src/dataflow/operators/delay.rs +++ b/timely/src/dataflow/operators/delay.rs @@ -103,7 +103,7 @@ impl, D: Data> Delay for Stream, D: Data> Delay for Stream { } impl<'a, T: Timestamp, CB: ContainerBuilder> OutputBuilderSession<'a, T, CB> { + /// The index of the output port wrapped by this session. + #[inline] + pub fn output_index(&self) -> usize { self.session.output_index()} + /// A container-building session associated with a capability. /// /// This method is the prefered way of sending records that must be accumulated into a container, diff --git a/timely/src/dataflow/operators/generic/notificator.rs b/timely/src/dataflow/operators/generic/notificator.rs index de9b0e3ef..4f64ebc8f 100644 --- a/timely/src/dataflow/operators/generic/notificator.rs +++ b/timely/src/dataflow/operators/generic/notificator.rs @@ -58,7 +58,7 @@ impl<'a, T: Timestamp> Notificator<'a, T> { /// input.for_each_time(|cap, data| { /// output.session(&cap).give_containers(data); /// let time = cap.time().clone() + 1; - /// notificator.notify_at(cap.delayed(&time)); + /// notificator.notify_at(cap.delayed(&time, output.output_index())); /// }); /// notificator.for_each(|cap, count, _| { /// println!("done with time: {:?}, requested {} times", cap.time(), count); @@ -194,11 +194,11 @@ fn notificator_delivers_notifications_in_topo_order() { /// move |(input1, frontier1), (input2, frontier2), output| { /// input1.for_each_time(|time, data| { /// stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.flat_map(|d| d.drain(..))); -/// notificator.notify_at(time.retain()); +/// notificator.notify_at(time.retain(output.output_index())); /// }); /// input2.for_each_time(|time, data| { /// stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.flat_map(|d| d.drain(..))); -/// notificator.notify_at(time.retain()); +/// notificator.notify_at(time.retain(output.output_index())); /// }); /// notificator.for_each(&[frontier1, frontier2], |time, _| { /// if let Some(mut vec) = stash.remove(time.time()) { @@ -267,7 +267,7 @@ impl FrontierNotificator { /// input.for_each_time(|cap, data| { /// output.session(&cap).give_containers(data); /// let time = cap.time().clone() + 1; - /// notificator.notify_at(cap.delayed(&time)); + /// notificator.notify_at(cap.delayed(&time, output.output_index())); /// }); /// notificator.for_each(&[frontier], |cap, _| { /// println!("done with time: {:?}", cap.time()); @@ -397,7 +397,7 @@ impl FrontierNotificator { /// input.for_each_time(|cap, data| { /// output.session(&cap).give_containers(data); /// let time = cap.time().clone() + 1; - /// notificator.notify_at(cap.delayed(&time)); + /// notificator.notify_at(cap.delayed(&time, output.output_index())); /// assert_eq!(notificator.pending().filter(|t| t.0.time() == &time).count(), 1); /// }); /// notificator.for_each(&[frontier], |cap, _| { diff --git a/timely/src/dataflow/operators/generic/operator.rs b/timely/src/dataflow/operators/generic/operator.rs index 97b075f35..027f17942 100644 --- a/timely/src/dataflow/operators/generic/operator.rs +++ b/timely/src/dataflow/operators/generic/operator.rs @@ -78,7 +78,7 @@ pub trait Operator { /// .unary_notify(Pipeline, "example", None, move |input, output, notificator| { /// input.for_each_time(|time, data| { /// output.session(&time).give_containers(data); - /// notificator.notify_at(time.retain()); + /// notificator.notify_at(time.retain(output.output_index())); /// }); /// notificator.for_each(|time, _cnt, _not| { /// println!("notified at {:?}", time); @@ -148,11 +148,11 @@ pub trait Operator { /// move |(input1, frontier1), (input2, frontier2), output| { /// input1.for_each_time(|time, data| { /// stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.flat_map(|d| d.drain(..))); - /// notificator.notify_at(time.retain()); + /// notificator.notify_at(time.retain(output.output_index())); /// }); /// input2.for_each_time(|time, data| { /// stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.flat_map(|d| d.drain(..))); - /// notificator.notify_at(time.retain()); + /// notificator.notify_at(time.retain(output.output_index())); /// }); /// notificator.for_each(&[frontier1, frontier2], |time, _not| { /// if let Some(mut vec) = stash.remove(time.time()) { @@ -205,11 +205,11 @@ pub trait Operator { /// in1.binary_notify(&in2, Pipeline, Pipeline, "example", None, move |input1, input2, output, notificator| { /// input1.for_each_time(|time, data| { /// output.session(&time).give_containers(data); - /// notificator.notify_at(time.retain()); + /// notificator.notify_at(time.retain(output.output_index())); /// }); /// input2.for_each_time(|time, data| { /// output.session(&time).give_containers(data); - /// notificator.notify_at(time.retain()); + /// notificator.notify_at(time.retain(output.output_index())); /// }); /// notificator.for_each(|time, _cnt, _not| { /// println!("notified at {:?}", time);