Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Framework/Core/include/Framework/DataProcessingHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ struct DataProcessingHelpers {
/// Helper to route messages for forwarding
static void routeForwardedMessages(FairMQDeviceProxy& proxy, std::span<fair::mq::MessagePtr>& currentSetOfInputs, std::vector<fair::mq::Parts>& forwardedParts,
bool copy, bool consume);

static void cleanForwardedMessages(std::span<fair::mq::MessagePtr>& currentSetOfInputs, bool consume);
};
} // namespace o2::framework
#endif // O2_FRAMEWORK_DATAPROCESSINGHELPERS_H_
7 changes: 5 additions & 2 deletions Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -626,9 +626,12 @@ static auto cleanEarlyForward = [](ServiceRegistryRef registry, TimesliceSlot sl
slot.index, oldestTimeslice.timeslice.value, copy ? "with copy" : "", copy && consume ? " and " : "", consume ? "with consume" : "");
// Always copy them, because we do not want to actually send them.
// We merely need the side effect of the consume, if applicable.
auto forwardedParts = DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, true, consume);
for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
auto span = std::span<fair::mq::MessagePtr>(currentSetOfInputs[ii].messages);
DataProcessingHelpers::cleanForwardedMessages(span, consume);
}

O2_SIGNPOST_END(forwarding, sid, "forwardInputs", "Forwarding done");
O2_SIGNPOST_END(forwarding, sid, "forwardInputs", "Cleaning done");
};

extern volatile int region_read_global_dummy_variable;
Expand Down
54 changes: 54 additions & 0 deletions Framework/Core/src/DataProcessingHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,60 @@ void DataProcessingHelpers::routeForwardedMessages(FairMQDeviceProxy& proxy, std
}
}

void DataProcessingHelpers::cleanForwardedMessages(std::span<fair::mq::MessagePtr>& messages, bool consume)
{
size_t pi = 0;
while (pi < messages.size()) {
auto& header = messages[pi];

// If is now possible that the record is not complete when
// we forward it, because of a custom completion policy.
// this means that we need to skip the empty entries in the
// record for being forwarded.
if (header->GetData() == nullptr ||
o2::header::get<DomainInfoHeader*>(header->GetData()) ||
o2::header::get<SourceInfoHeader*>(header->GetData())) {
pi += 2;
continue;
}

auto dph = o2::header::get<DataProcessingHeader*>(header->GetData());
auto dh = o2::header::get<o2::header::DataHeader*>(header->GetData());

if (dph == nullptr || dh == nullptr) {
// Complain only if this is not an out-of-band message
LOGP(error, "Data is missing {}{}{}",
dph ? "DataProcessingHeader" : "", dph || dh ? "and" : "", dh ? "DataHeader" : "");
pi += 2;
continue;
}

// At least one payload.
auto& payload = messages[pi + 1];
// Calculate the number of messages which should be handled together
// all in one go.
size_t numberOfMessages = 0;
if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) {
// Sequence of (header, payload[0], ... , payload[splitPayloadParts - 1]) pairs belonging together.
numberOfMessages = dh->splitPayloadParts + 1; // one is for the header
} else {
// Sequence of splitPayloadParts (header, payload) pairs belonging together.
// In case splitPayloadParts = 0, we consider this as a single message pair
numberOfMessages = (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2;
}

if (payload.get() == nullptr && consume == true) {
// If the payload is not there, it means we already
// processed it with ConsumeExisiting. Therefore we
// need to do something only if this is the last consume.
header.reset(nullptr);
}

// Nothing to forward go to the next messageset
pi += numberOfMessages;
}
}

auto DataProcessingHelpers::routeForwardedMessageSet(FairMQDeviceProxy& proxy,
std::vector<MessageSet>& currentSetOfInputs,
const bool copyByDefault, bool consume) -> std::vector<fair::mq::Parts>
Expand Down