@@ -1854,11 +1854,60 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo&
18541854 VariableContextHelpers::getTimeslice (variables);
18551855 forwardInputs (ref, slot, dropped, oldestOutputInfo, false , true );
18561856 };
1857+
1858+ auto onInsertion = [](ServiceRegistryRef& ref, std::span<fair::mq::MessagePtr>& messages) {
1859+ O2_LOG_ENABLE (forwarding);
1860+ O2_SIGNPOST_ID_GENERATE (sid, forwarding);
1861+
1862+ auto & spec = ref.get <DeviceSpec const >();
1863+ auto & context = ref.get <DataProcessorContext>();
1864+ if (!context.canForwardEarly || spec.forwards .empty ()) {
1865+ O2_SIGNPOST_EVENT_EMIT (device, sid, " device" , " Early forwardinding not enabled / needed." );
1866+ return ;
1867+ }
1868+
1869+ O2_SIGNPOST_EVENT_EMIT (device, sid, " device" , " Early forwardinding before injecting data into relayer." );
1870+ auto & timesliceIndex = ref.get <TimesliceIndex>();
1871+ auto oldestTimeslice = timesliceIndex.getOldestPossibleOutput ();
1872+
1873+ auto & proxy = ref.get <FairMQDeviceProxy>();
1874+
1875+ O2_SIGNPOST_START (forwarding, sid, " forwardInputs" ,
1876+ " Starting forwarding for incoming messages with oldestTimeslice %zu with copy" ,
1877+ oldestTimeslice.timeslice .value );
1878+ std::vector<fair::mq::Parts> forwardedParts;
1879+ forwardedParts.resize (proxy.getNumForwards ());
1880+ DataProcessingHelpers::routeForwardedMessages (proxy, messages, forwardedParts, true , false );
1881+
1882+ for (int fi = 0 ; fi < proxy.getNumForwardChannels (); fi++) {
1883+ if (forwardedParts[fi].Size () == 0 ) {
1884+ continue ;
1885+ }
1886+ ForwardChannelInfo info = proxy.getForwardChannelInfo (ChannelIndex{fi});
1887+ auto & parts = forwardedParts[fi];
1888+ if (info.policy == nullptr ) {
1889+ O2_SIGNPOST_EVENT_EMIT_ERROR (forwarding, sid, " forwardInputs" , " Forwarding to %{public}s %d has no policy." , info.name .c_str (), fi);
1890+ continue ;
1891+ }
1892+ O2_SIGNPOST_EVENT_EMIT (forwarding, sid, " forwardInputs" , " Forwarding to %{public}s %d" , info.name .c_str (), fi);
1893+ info.policy ->forward (parts, ChannelIndex{fi}, ref);
1894+ }
1895+ auto & asyncQueue = ref.get <AsyncQueue>();
1896+ auto & decongestion = ref.get <DecongestionService>();
1897+ O2_SIGNPOST_ID_GENERATE (aid, async_queue);
1898+ O2_SIGNPOST_EVENT_EMIT (async_queue, aid, " forwardInputs" , " Queuing forwarding oldestPossible %zu" , oldestTimeslice.timeslice .value );
1899+ AsyncQueueHelpers::post (asyncQueue, AsyncTask{.timeslice = oldestTimeslice.timeslice , .id = decongestion.oldestPossibleTimesliceTask , .debounce = -1 , .callback = decongestionCallbackLate}
1900+ .user <DecongestionContext>({.ref = ref, .oldestTimeslice = oldestTimeslice}));
1901+ O2_SIGNPOST_END (forwarding, sid, " forwardInputs" , " Forwarding done" );
1902+ O2_LOG_DISABLE (forwarding);
1903+ };
1904+
18571905 auto relayed = relayer.relay (parts.At (headerIndex)->GetData (),
18581906 &parts.At (headerIndex),
18591907 input,
18601908 nMessages,
18611909 nPayloadsPerHeader,
1910+ onInsertion,
18621911 onDrop);
18631912 switch (relayed.type ) {
18641913 case DataRelayer::RelayChoice::Type::Backpressured:
@@ -2273,9 +2322,13 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
22732322 bool consumeSomething = action.op == CompletionPolicy::CompletionOp::Consume || action.op == CompletionPolicy::CompletionOp::ConsumeExisting;
22742323
22752324 if (context.canForwardEarly && hasForwards && consumeSomething) {
2276- O2_SIGNPOST_EVENT_EMIT (device, aid, " device" , " Early forwainding: %{public}s." , fmt::format (" {}" , action.op ).c_str ());
2277- auto & timesliceIndex = ref.get <TimesliceIndex>();
2278- forwardInputs (ref, action.slot , currentSetOfInputs, timesliceIndex.getOldestPossibleOutput (), true , action.op == CompletionPolicy::CompletionOp::Consume);
2325+ // We used to do fowarding here, however we now do it much earlier.
2326+ // We still need to clean the inputs which were already consumed
2327+ // via ConsumeExisting and which still have an header to hold the slot.
2328+ // FIXME: do we? This should really happen when we do the forwarding on
2329+ // insertion, because otherwise we lose the relevant information on how to
2330+ // navigate the set of headers. We could actually rely on the messageset index,
2331+ // is that the right thing to do though?
22792332 }
22802333 markInputsAsDone (action.slot );
22812334
0 commit comments