8 changes: 4 additions & 4 deletions mdbook/src/chapter_2/chapter_2_4.md
@@ -192,13 +192,13 @@ fn main() {
     stash.entry(time.time().clone())
         .or_insert(Vec::new())
         .extend(data.map(std::mem::take));
-    notificator.notify_at(time.retain());
+    notificator.notify_at(time.retain(output.output_index()));
 });
 input2.for_each_time(|time, data| {
     stash.entry(time.time().clone())
         .or_insert(Vec::new())
         .extend(data.map(std::mem::take));
-    notificator.notify_at(time.retain());
+    notificator.notify_at(time.retain(output.output_index()));
 });
 
 notificator.for_each(&[frontier1, frontier2], |time, notificator| {
@@ -240,12 +240,12 @@ fn main() {
 move |(input1, frontier1), (input2, frontier2), output| {
 
     input1.for_each_time(|time, data| {
-        stash.entry(time.retain())
+        stash.entry(time.retain(output.output_index()))
            .or_insert(Vec::new())
            .extend(data.map(std::mem::take));
     });
     input2.for_each_time(|time, data| {
-        stash.entry(time.retain())
+        stash.entry(time.retain(output.output_index()))
            .or_insert(Vec::new())
            .extend(data.map(std::mem::take));
     });
4 changes: 2 additions & 2 deletions mdbook/src/chapter_2/chapter_2_5.md
@@ -210,7 +210,7 @@ As before, I'm just going to show you the new code, which now lives just after `
 
 // for each input batch, stash it at `time`.
 input.for_each_time(|time, data| {
-    queues.entry(time.retain())
+    queues.entry(time.retain(output.output_index()))
        .or_insert(Vec::new())
        .extend(data.flat_map(|d| d.drain(..)));
 });
@@ -295,7 +295,7 @@ Inside the closure, we do two things: (i) read inputs and (ii) update counts and
 ```rust,ignore
 // for each input batch, stash it at `time`.
 while let Some((time, data)) = input.next() {
-    queues.entry(time.retain())
+    queues.entry(time.retain(output.output_index()))
        .or_insert(Vec::new())
        .extend(std::mem::take(data));
 }
2 changes: 1 addition & 1 deletion mdbook/src/chapter_4/chapter_4_3.md
@@ -82,7 +82,7 @@ fn main() {
 
 // Stash received data.
 input1.for_each_time(|time, data| {
-    stash.entry(time.retain())
+    stash.entry(time.retain(output.output_index()))
        .or_insert(Vec::new())
        .extend(data.flat_map(|d| d.drain(..)));
 });
4 changes: 2 additions & 2 deletions timely/examples/bfs.rs
@@ -54,15 +54,15 @@ fn main() {
 
 // receive edges, start to sort them
 input1.for_each_time(|time, data| {
-    notify.notify_at(time.retain());
+    notify.notify_at(time.retain(output.output_index()));
     edge_list.extend(data.map(std::mem::take));
 });
 
 // receive (node, worker) pairs, note any new ones.
 input2.for_each_time(|time, data| {
     node_lists.entry(*time.time())
        .or_insert_with(|| {
-            notify.notify_at(time.retain());
+            notify.notify_at(time.retain(output.output_index()));
             Vec::new()
        })
        .extend(data.map(std::mem::take));
2 changes: 1 addition & 1 deletion timely/examples/columnar.rs
@@ -68,7 +68,7 @@ fn main() {
 move |(input, frontier), output| {
     input.for_each_time(|time, data| {
         queues
-            .entry(time.retain())
+            .entry(time.retain(output.output_index()))
            .or_insert(Vec::new())
            .extend(data.map(std::mem::take));
 
4 changes: 2 additions & 2 deletions timely/examples/pagerank.rs
@@ -45,13 +45,13 @@ fn main() {
 
 // hold on to edge changes until it is time.
 input1.for_each_time(|time, data| {
-    let entry = edge_stash.entry(time.retain()).or_default();
+    let entry = edge_stash.entry(time.retain(output.output_index())).or_default();
     data.for_each(|data| entry.append(data));
 });
 
 // hold on to rank changes until it is time.
 input2.for_each_time(|time, data| {
-    let entry = rank_stash.entry(time.retain()).or_default();
+    let entry = rank_stash.entry(time.retain(output.output_index())).or_default();
     data.for_each(|data| entry.append(data));
 });
 
2 changes: 1 addition & 1 deletion timely/examples/wordcount.rs
@@ -31,7 +31,7 @@ fn main() {
 
 move |(input, frontier), output| {
     input.for_each_time(|time, data| {
-        queues.entry(time.retain())
+        queues.entry(time.retain(output.output_index()))
            .or_insert(Vec::new())
            .extend(data.map(std::mem::take));
     });
1 change: 1 addition & 0 deletions timely/src/dataflow/channels/pullers/counter.rs
@@ -23,6 +23,7 @@ pub struct ConsumedGuard<T: Ord + Clone + 'static> {
 }
 
 impl<T:Ord+Clone+'static> ConsumedGuard<T> {
+    #[inline]
     pub(crate) fn time(&self) -> &T {
         self.time.as_ref().unwrap()
     }
4 changes: 4 additions & 0 deletions timely/src/dataflow/channels/pushers/progress.rs
@@ -36,6 +36,10 @@ impl<T: Timestamp, P> Progress<T, P> {
     pub fn valid<CT: CapabilityTrait<T>>(&self, capability: &CT) -> bool {
         capability.valid_for_output(&self.internal, self.port)
     }
+
+    /// The index of the output port.
+    #[inline]
+    pub fn output_index(&self) -> usize { self.port }
 }
 
 impl<T, P> Progress<T, P> where T : Ord+Clone+'static {
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/aggregation/aggregate.rs
@@ -85,7 +85,7 @@ impl<S: Scope<Timestamp: ::std::hash::Hash>, K: ExchangeData+Hash+Eq, V: Exchang
     let agg = agg_time.entry(key.clone()).or_insert_with(Default::default);
     fold(&key, val, agg);
 }
-notificator.notify_at(time.retain());
+notificator.notify_at(time.retain(output.output_index()));
 });
 
 // pop completed aggregates, send along whatever
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/aggregation/state_machine.rs
@@ -89,7 +89,7 @@ impl<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> StateMachine<S, K, V> f
 // stash if not time yet
 if notificator.frontier(0).less_than(time.time()) {
     for data in data { pending.entry(time.time().clone()).or_insert_with(Vec::new).append(data); }
-    notificator.notify_at(time.retain());
+    notificator.notify_at(time.retain(output.output_index()));
 }
 else {
     // else we can process immediately
36 changes: 8 additions & 28 deletions timely/src/dataflow/operators/capability.rs
@@ -267,20 +267,18 @@ impl<T: Timestamp> InputCapability<T> {
     }
 
     /// The timestamp associated with this capability.
+    #[inline]
     pub fn time(&self) -> &T {
         self.consumed_guard.time()
     }
 
+    /// Delays capability for a specific output port.
+    ///
     /// Makes a new capability for a timestamp `new_time` greater or equal to the timestamp of
     /// the source capability (`self`).
     ///
     /// This method panics if `self.time` is not less or equal to `new_time`.
-    pub fn delayed(&self, new_time: &T) -> Capability<T> {
-        self.delayed_for_output(new_time, 0)
-    }
-
-    /// Delays capability for a specific output port.
-    pub fn delayed_for_output(&self, new_time: &T, output_port: usize) -> Capability<T> {
+    pub fn delayed(&self, new_time: &T, output_port: usize) -> Capability<T> {
         use crate::progress::timestamp::PathSummary;
         if let Some(path) = self.summaries.borrow().get(output_port) {
             if path.iter().flat_map(|summary| summary.results_in(self.time())).any(|time| time.less_equal(new_time)) {
@@ -294,34 +292,16 @@ impl<T: Timestamp> InputCapability<T> {
             }
         }
     }
 
-    /// Transform to an owned capability.
+    /// Transforms to an owned capability for a specific output port.
     ///
     /// This method produces an owned capability which must be dropped to release the
     /// capability. Users should take care that these capabilities are only stored for
     /// as long as they are required, as failing to drop them may result in livelock.
     ///
-    /// This method panics if the timestamp summary to output zero strictly advances the time.
-    pub fn retain(self) -> Capability<T> {
-        self.retain_for_output(0)
-    }
-
-    /// Transforms to an owned capability for a specific output port.
-    ///
     /// This method panics if the timestamp summary to `output_port` strictly advances the time.
-    pub fn retain_for_output(self, output_port: usize) -> Capability<T> {
-        use crate::progress::timestamp::PathSummary;
-        let self_time = self.time().clone();
-        if let Some(path) = self.summaries.borrow().get(output_port) {
-            if path.iter().flat_map(|summary| summary.results_in(&self_time)).any(|time| time.less_equal(&self_time)) {
-                Capability::new(self_time, Rc::clone(&self.internal.borrow()[output_port]))
-            }
-            else {
-                panic!("Attempted to retain a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", self_time, path, self_time);
-            }
-        }
-        else {
-            panic!("Attempted to retain a capability for a disconnected output");
-        }
-    }
+    #[inline]
+    pub fn retain(&self, output_port: usize) -> Capability<T> {
+        self.delayed(self.time(), output_port)
     }
 
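With `delayed` and `retain` now taking an explicit output port, the call-site changes in the rest of this PR are mechanical: wherever a capability was retained or delayed, the call names the port, which single-output operators read from their output handle. A minimal sketch of the updated pattern, assuming a `unary_notify`-style closure over `usize` timestamps as in the doc examples in this PR (`stash` and `notificator` are the usual illustrative names):

    input.for_each_time(|time, data| {
        // Retaining a capability now names the output port it is retained for.
        stash.entry(time.retain(output.output_index()))
            .or_insert(Vec::new())
            .extend(data.map(std::mem::take));
        // Delaying to a later timestamp likewise takes the output port.
        let new_time = time.time().clone() + 1;
        notificator.notify_at(time.delayed(&new_time, output.output_index()));
    });
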
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/core/feedback.rs
@@ -117,7 +117,7 @@ impl<G: Scope, C: Container> ConnectLoop<G, C> for StreamCore<G, C> {
 let mut output = output.activate();
 input.for_each(|cap, data| {
     if let Some(new_time) = summary.results_in(cap.time()) {
-        let new_cap = cap.delayed(&new_time);
+        let new_cap = cap.delayed(&new_time, output.output_index());
         output.give(&new_cap, data);
     }
 });
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/core/reclock.rs
@@ -64,7 +64,7 @@ impl<S: Scope, C: Container> Reclock<S> for StreamCore<S, C> {
 
 // request notification at time, to flush stash.
 input2.for_each_time(|time, _data| {
-    notificator.notify_at(time.retain());
+    notificator.notify_at(time.retain(output.output_index()));
 });
 
 // each time with complete stash can be flushed.
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/count.rs
@@ -57,7 +57,7 @@ impl<G: Scope<Timestamp: ::std::hash::Hash>, D: Data> Accumulate<G, D> for Strea
     for data in data {
         logic(accums.entry(time.time().clone()).or_insert_with(|| default.clone()), data);
     }
-    notificator.notify_at(time.retain());
+    notificator.notify_at(time.retain(output.output_index()));
 });
 
 notificator.for_each(|time,_,_| {
4 changes: 2 additions & 2 deletions timely/src/dataflow/operators/delay.rs
@@ -103,7 +103,7 @@ impl<G: Scope<Timestamp: ::std::hash::Hash>, D: Data> Delay<G, D> for Stream<G,
         let new_time = func(&datum, &time);
         assert!(time.time().less_equal(&new_time));
         elements.entry(new_time.clone())
-            .or_insert_with(|| { notificator.notify_at(time.delayed(&new_time)); Vec::new() })
+            .or_insert_with(|| { notificator.notify_at(time.delayed(&new_time, output.output_index())); Vec::new() })
            .push(datum);
     }
 });
@@ -130,7 +130,7 @@ impl<G: Scope<Timestamp: ::std::hash::Hash>, D: Data> Delay<G, D> for Stream<G,
     let new_time = func(&time);
     assert!(time.time().less_equal(&new_time));
     elements.entry(new_time.clone())
-        .or_insert_with(|| { notificator.notify_at(time.delayed(&new_time)); Vec::new() })
+        .or_insert_with(|| { notificator.notify_at(time.delayed(&new_time, output.output_index())); Vec::new() })
        .extend(data.map(std::mem::take));
 });
 
4 changes: 4 additions & 0 deletions timely/src/dataflow/operators/generic/handles.rs
@@ -114,6 +114,10 @@ pub struct OutputBuilderSession<'a, T: Timestamp, CB: ContainerBuilder> {
 }
 
 impl<'a, T: Timestamp, CB: ContainerBuilder> OutputBuilderSession<'a, T, CB> {
+    /// The index of the output port wrapped by this session.
+    #[inline]
+    pub fn output_index(&self) -> usize { self.session.output_index()}
+
     /// A container-building session associated with a capability.
     ///
     /// This method is the prefered way of sending records that must be accumulated into a container,
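The `output_index` accessor added here on `OutputBuilderSession`, like the one added to the progress pusher earlier in this diff, is what lets call sites write `output.output_index()` instead of hard-coding a port. For a single-output operator the index is typically 0, but reading it off the handle keeps the code correct if the operator later gains additional outputs. A small hedged illustration, under that single-output assumption:

    // For a single-output operator these are equivalent, but the first form
    // does not need updating if more output ports are added later.
    let cap = time.retain(output.output_index());
    // let cap = time.retain(0);
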
10 changes: 5 additions & 5 deletions timely/src/dataflow/operators/generic/notificator.rs
@@ -58,7 +58,7 @@ impl<'a, T: Timestamp> Notificator<'a, T> {
 ///     input.for_each_time(|cap, data| {
 ///         output.session(&cap).give_containers(data);
 ///         let time = cap.time().clone() + 1;
-///         notificator.notify_at(cap.delayed(&time));
+///         notificator.notify_at(cap.delayed(&time, output.output_index()));
 ///     });
 ///     notificator.for_each(|cap, count, _| {
 ///         println!("done with time: {:?}, requested {} times", cap.time(), count);
@@ -194,11 +194,11 @@ fn notificator_delivers_notifications_in_topo_order() {
 /// move |(input1, frontier1), (input2, frontier2), output| {
 ///     input1.for_each_time(|time, data| {
 ///         stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.flat_map(|d| d.drain(..)));
-///         notificator.notify_at(time.retain());
+///         notificator.notify_at(time.retain(output.output_index()));
 ///     });
 ///     input2.for_each_time(|time, data| {
 ///         stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.flat_map(|d| d.drain(..)));
-///         notificator.notify_at(time.retain());
+///         notificator.notify_at(time.retain(output.output_index()));
 ///     });
 ///     notificator.for_each(&[frontier1, frontier2], |time, _| {
 ///         if let Some(mut vec) = stash.remove(time.time()) {
@@ -267,7 +267,7 @@ impl<T: Timestamp> FrontierNotificator<T> {
 ///     input.for_each_time(|cap, data| {
 ///         output.session(&cap).give_containers(data);
 ///         let time = cap.time().clone() + 1;
-///         notificator.notify_at(cap.delayed(&time));
+///         notificator.notify_at(cap.delayed(&time, output.output_index()));
 ///     });
 ///     notificator.for_each(&[frontier], |cap, _| {
 ///         println!("done with time: {:?}", cap.time());
@@ -397,7 +397,7 @@ impl<T: Timestamp> FrontierNotificator<T> {
 ///     input.for_each_time(|cap, data| {
 ///         output.session(&cap).give_containers(data);
 ///         let time = cap.time().clone() + 1;
-///         notificator.notify_at(cap.delayed(&time));
+///         notificator.notify_at(cap.delayed(&time, output.output_index()));
 ///         assert_eq!(notificator.pending().filter(|t| t.0.time() == &time).count(), 1);
 ///     });
 ///     notificator.for_each(&[frontier], |cap, _| {
10 changes: 5 additions & 5 deletions timely/src/dataflow/operators/generic/operator.rs
@@ -78,7 +78,7 @@ pub trait Operator<G: Scope, C1> {
 ///     .unary_notify(Pipeline, "example", None, move |input, output, notificator| {
 ///         input.for_each_time(|time, data| {
 ///             output.session(&time).give_containers(data);
-///             notificator.notify_at(time.retain());
+///             notificator.notify_at(time.retain(output.output_index()));
 ///         });
 ///         notificator.for_each(|time, _cnt, _not| {
 ///             println!("notified at {:?}", time);
@@ -148,11 +148,11 @@ pub trait Operator<G: Scope, C1> {
 /// move |(input1, frontier1), (input2, frontier2), output| {
 ///     input1.for_each_time(|time, data| {
 ///         stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.flat_map(|d| d.drain(..)));
-///         notificator.notify_at(time.retain());
+///         notificator.notify_at(time.retain(output.output_index()));
 ///     });
 ///     input2.for_each_time(|time, data| {
 ///         stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.flat_map(|d| d.drain(..)));
-///         notificator.notify_at(time.retain());
+///         notificator.notify_at(time.retain(output.output_index()));
 ///     });
 ///     notificator.for_each(&[frontier1, frontier2], |time, _not| {
 ///         if let Some(mut vec) = stash.remove(time.time()) {
@@ -205,11 +205,11 @@ pub trait Operator<G: Scope, C1> {
 /// in1.binary_notify(&in2, Pipeline, Pipeline, "example", None, move |input1, input2, output, notificator| {
 ///     input1.for_each_time(|time, data| {
 ///         output.session(&time).give_containers(data);
-///         notificator.notify_at(time.retain());
+///         notificator.notify_at(time.retain(output.output_index()));
 ///     });
 ///     input2.for_each_time(|time, data| {
 ///         output.session(&time).give_containers(data);
-///         notificator.notify_at(time.retain());
+///         notificator.notify_at(time.retain(output.output_index()));
 ///     });
 ///     notificator.for_each(|time, _cnt, _not| {
 ///         println!("notified at {:?}", time);