Skip to content

Commit 8f9029b

Browse files
committed
DPL: avoid needless copy of messages when cleaning up early forwarding
1 parent ed8276c commit 8f9029b

File tree

3 files changed

+61
-2
lines changed

3 files changed

+61
-2
lines changed

Framework/Core/include/Framework/DataProcessingHelpers.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ struct DataProcessingHelpers {
5959
/// Helper to route messages for forwarding
6060
static void routeForwardedMessages(FairMQDeviceProxy& proxy, std::span<fair::mq::MessagePtr>& currentSetOfInputs, std::vector<fair::mq::Parts>& forwardedParts,
6161
bool copy, bool consume);
62+
63+
static void cleanForwardedMessages(std::span<fair::mq::MessagePtr>& currentSetOfInputs, bool consume);
6264
};
6365
} // namespace o2::framework
6466
#endif // O2_FRAMEWORK_DATAPROCESSINGHELPERS_H_

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -626,9 +626,12 @@ static auto cleanEarlyForward = [](ServiceRegistryRef registry, TimesliceSlot sl
626626
slot.index, oldestTimeslice.timeslice.value, copy ? "with copy" : "", copy && consume ? " and " : "", consume ? "with consume" : "");
627627
// Always copy them, because we do not want to actually send them.
628628
// We merely need the side effect of the consume, if applicable.
629-
auto forwardedParts = DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, true, consume);
629+
for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
630+
auto span = std::span<fair::mq::MessagePtr>(currentSetOfInputs[ii].messages);
631+
DataProcessingHelpers::cleanForwardedMessages(span, consume);
632+
}
630633

631-
O2_SIGNPOST_END(forwarding, sid, "forwardInputs", "Forwarding done");
634+
O2_SIGNPOST_END(forwarding, sid, "forwardInputs", "Cleaning done");
632635
};
633636

634637
extern volatile int region_read_global_dummy_variable;

Framework/Core/src/DataProcessingHelpers.cxx

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,60 @@ void DataProcessingHelpers::routeForwardedMessages(FairMQDeviceProxy& proxy, std
338338
}
339339
}
340340

341+
void DataProcessingHelpers::cleanForwardedMessages(std::span<fair::mq::MessagePtr>& messages, bool consume)
342+
{
343+
size_t pi = 0;
344+
while (pi < messages.size()) {
345+
auto& header = messages[pi];
346+
347+
// If is now possible that the record is not complete when
348+
// we forward it, because of a custom completion policy.
349+
// this means that we need to skip the empty entries in the
350+
// record for being forwarded.
351+
if (header->GetData() == nullptr ||
352+
o2::header::get<DomainInfoHeader*>(header->GetData()) ||
353+
o2::header::get<SourceInfoHeader*>(header->GetData())) {
354+
pi += 2;
355+
continue;
356+
}
357+
358+
auto dph = o2::header::get<DataProcessingHeader*>(header->GetData());
359+
auto dh = o2::header::get<o2::header::DataHeader*>(header->GetData());
360+
361+
if (dph == nullptr || dh == nullptr) {
362+
// Complain only if this is not an out-of-band message
363+
LOGP(error, "Data is missing {}{}{}",
364+
dph ? "DataProcessingHeader" : "", dph || dh ? "and" : "", dh ? "DataHeader" : "");
365+
pi += 2;
366+
continue;
367+
}
368+
369+
// At least one payload.
370+
auto& payload = messages[pi + 1];
371+
// Calculate the number of messages which should be handled together
372+
// all in one go.
373+
size_t numberOfMessages = 0;
374+
if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) {
375+
// Sequence of (header, payload[0], ... , payload[splitPayloadParts - 1]) pairs belonging together.
376+
numberOfMessages = dh->splitPayloadParts + 1; // one is for the header
377+
} else {
378+
// Sequence of splitPayloadParts (header, payload) pairs belonging together.
379+
// In case splitPayloadParts = 0, we consider this as a single message pair
380+
numberOfMessages = (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2;
381+
}
382+
383+
if (payload.get() == nullptr && consume == true) {
384+
// If the payload is not there, it means we already
385+
// processed it with ConsumeExisiting. Therefore we
386+
// need to do something only if this is the last consume.
387+
header.reset(nullptr);
388+
}
389+
390+
// Nothing to forward go to the next messageset
391+
pi += numberOfMessages;
392+
}
393+
}
394+
341395
auto DataProcessingHelpers::routeForwardedMessageSet(FairMQDeviceProxy& proxy,
342396
std::vector<MessageSet>& currentSetOfInputs,
343397
const bool copyByDefault, bool consume) -> std::vector<fair::mq::Parts>

0 commit comments

Comments
 (0)