@@ -1050,36 +1050,51 @@ void DataProcessingDevice::fillContext(DataProcessorContext& context, DeviceCont
10501050 };
10511051 }
10521052
1053- auto decideEarlyForward = [&context, &deviceContext, &spec, this ]() -> bool {
1053+ auto decideEarlyForward = [&context, &deviceContext, &spec, this ]() -> ForwardPolicy {
1054+ ForwardPolicy defaultEarlyForwardPolicy = getenv (" DPL_OLD_EARLY_FORWARD" ) ? ForwardPolicy::AtCompletionPolicySatisified : ForwardPolicy::AtInjection;
1055+
10541056 // / We must make sure there is no optional
10551057 // / if we want to optimize the forwarding
1056- bool canForwardEarly = (spec.forwards .empty () == false ) && deviceContext.processingPolicies .earlyForward != EarlyForwardPolicy::NEVER;
1058+ ForwardPolicy forwardPolicy = defaultEarlyForwardPolicy;
1059+ if (spec.forwards .empty () == false ) {
1060+ switch (deviceContext.processingPolicies .earlyForward ) {
1061+ case o2::framework::EarlyForwardPolicy::NEVER:
1062+ forwardPolicy = ForwardPolicy::AfterProcessing;
1063+ break ;
1064+ case o2::framework::EarlyForwardPolicy::ALWAYS:
1065+ forwardPolicy = defaultEarlyForwardPolicy;
1066+ break ;
1067+ case o2::framework::EarlyForwardPolicy::NORAW:
1068+ forwardPolicy = defaultEarlyForwardPolicy;
1069+ break ;
1070+ }
1071+ }
10571072 bool onlyConditions = true ;
10581073 bool overriddenEarlyForward = false ;
10591074 for (auto & forwarded : spec.forwards ) {
10601075 if (forwarded.matcher .lifetime != Lifetime::Condition) {
10611076 onlyConditions = false ;
10621077 }
10631078 if (DataSpecUtils::partialMatch (forwarded.matcher , o2::header::DataDescription{" RAWDATA" }) && deviceContext.processingPolicies .earlyForward == EarlyForwardPolicy::NORAW) {
1064- context. canForwardEarly = false ;
1079+ forwardPolicy = ForwardPolicy::AfterProcessing ;
10651080 overriddenEarlyForward = true ;
10661081 LOG (detail) << " Cannot forward early because of RAWDATA input: " << DataSpecUtils::describe (forwarded.matcher );
10671082 break ;
10681083 }
10691084 if (forwarded.matcher .lifetime == Lifetime::Optional) {
1070- context. canForwardEarly = false ;
1085+ forwardPolicy = ForwardPolicy::AfterProcessing ;
10711086 overriddenEarlyForward = true ;
10721087 LOG (detail) << " Cannot forward early because of Optional input: " << DataSpecUtils::describe (forwarded.matcher );
10731088 break ;
10741089 }
10751090 }
10761091 if (!overriddenEarlyForward && onlyConditions) {
1077- context. canForwardEarly = true ;
1092+ forwardPolicy = defaultEarlyForwardPolicy ;
10781093 LOG (detail) << " Enabling early forwarding because only conditions to be forwarded" ;
10791094 }
1080- return canForwardEarly ;
1095+ return forwardPolicy ;
10811096 };
1082- context.canForwardEarly = decideEarlyForward ();
1097+ context.forwardPolicy = decideEarlyForward ();
10831098}
10841099
10851100void DataProcessingDevice::PreRun ()
@@ -1700,7 +1715,7 @@ auto forwardOnInsertion(ServiceRegistryRef& ref, std::span<fair::mq::MessagePtr>
17001715
17011716 auto & spec = ref.get <DeviceSpec const >();
17021717 auto & context = ref.get <DataProcessorContext>();
1703- if (! context.canForwardEarly || spec.forwards .empty ()) {
1718+ if (context.forwardPolicy == ForwardPolicy::AfterProcessing || spec.forwards .empty ()) {
17041719 O2_SIGNPOST_EVENT_EMIT (device, sid, " device" , " Early forwardinding not enabled / needed." );
17051720 return ;
17061721 }
@@ -1858,7 +1873,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo&
18581873 stats.updateStats ({(int )ProcessingStatsId::ERROR_COUNT, DataProcessingStats::Op::Add, 1 });
18591874 };
18601875
1861- auto handleValidMessages = [&info, ref, &reportError](std::vector<InputInfo> const & inputInfos) {
1876+ auto handleValidMessages = [&info, ref, &reportError, &context ](std::vector<InputInfo> const & inputInfos) {
18621877 auto & relayer = ref.get <DataRelayer>();
18631878 auto & state = ref.get <DeviceState>();
18641879 static WaitBackpressurePolicy policy;
@@ -1919,7 +1934,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo&
19191934 input,
19201935 nMessages,
19211936 nPayloadsPerHeader,
1922- nullptr ,
1937+ context. forwardPolicy == ForwardPolicy::AtInjection ? forwardOnInsertion : nullptr ,
19231938 onDrop);
19241939 switch (relayed.type ) {
19251940 case DataRelayer::RelayChoice::Type::Backpressured:
@@ -2333,11 +2348,23 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
23332348 bool hasForwards = spec.forwards .empty () == false ;
23342349 bool consumeSomething = action.op == CompletionPolicy::CompletionOp::Consume || action.op == CompletionPolicy::CompletionOp::ConsumeExisting;
23352350
2336- if (context.canForwardEarly && hasForwards && consumeSomething) {
2337- O2_SIGNPOST_EVENT_EMIT (device, aid, " device" , " Early forwainding : %{public}s." , fmt::format (" {}" , action.op ).c_str ());
2351+ if (context.forwardPolicy == ForwardPolicy::AtCompletionPolicySatisified && hasForwards && consumeSomething) {
2352+ O2_SIGNPOST_EVENT_EMIT (device, aid, " device" , " Early forwarding : %{public}s." , fmt::format (" {}" , action.op ).c_str ());
23382353 auto & timesliceIndex = ref.get <TimesliceIndex>();
23392354 forwardInputs (ref, action.slot , currentSetOfInputs, timesliceIndex.getOldestPossibleOutput (), true , action.op == CompletionPolicy::CompletionOp::Consume);
2355+ } else if (context.forwardPolicy == ForwardPolicy::AtInjection && hasForwards && consumeSomething) {
2356+ // We used to do fowarding here, however we now do it much earlier.
2357+ // We still need to clean the inputs which were already consumed
2358+ // via ConsumeExisting and which still have an header to hold the slot.
2359+ // FIXME: do we? This should really happen when we do the forwarding on
2360+ // insertion, because otherwise we lose the relevant information on how to
2361+ // navigate the set of headers. We could actually rely on the messageset index,
2362+ // is that the right thing to do though?
2363+ O2_SIGNPOST_EVENT_EMIT (device, aid, " device" , " cleaning early forwarding: %{public}s." , fmt::format (" {}" , action.op ).c_str ());
2364+ auto & timesliceIndex = ref.get <TimesliceIndex>();
2365+ cleanEarlyForward (ref, action.slot , currentSetOfInputs, timesliceIndex.getOldestPossibleOutput (), true , action.op == CompletionPolicy::CompletionOp::Consume);
23402366 }
2367+
23412368 markInputsAsDone (action.slot );
23422369
23432370 uint64_t tStart = uv_hrtime ();
@@ -2456,7 +2483,7 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
24562483 context.postDispatchingCallbacks (processContext);
24572484 ref.get <CallbackService>().call <CallbackService::Id::DataConsumed>(o2::framework::ServiceRegistryRef{ref});
24582485 }
2459- if ((context.canForwardEarly == false ) && hasForwards && consumeSomething) {
2486+ if ((context.forwardPolicy == ForwardPolicy::AfterProcessing ) && hasForwards && consumeSomething) {
24602487 O2_SIGNPOST_EVENT_EMIT (device, aid, " device" , " Late forwarding" );
24612488 auto & timesliceIndex = ref.get <TimesliceIndex>();
24622489 forwardInputs (ref, action.slot , currentSetOfInputs, timesliceIndex.getOldestPossibleOutput (), false , action.op == CompletionPolicy::CompletionOp::Consume);
0 commit comments