@@ -156,6 +156,7 @@ int defaultConditionQueryRateMultiplier()
156156
157157void WorkflowHelpers::injectServiceDevices (WorkflowSpec& workflow, ConfigContext& ctx)
158158{
159+ int rateLimitingIPCID = std::stoi (ctx.options ().get <std::string>(" timeframes-rate-limit-ipcid" ));
159160 DataProcessorSpec ccdbBackend{
160161 .name = " internal-dpl-ccdb-backend" ,
161162 .outputs = {},
@@ -230,23 +231,6 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
230231 ConfigParamSpec{" step-value-enumeration" , VariantType::Int64, 1ll , {" step between one value and the other" }}},
231232 .requiredServices = CommonServices::defaultServices (" O2FrameworkAnalysisSupport:RunSummary" )};
232233
233- // AOD reader can be rate limited
234- int rateLimitingIPCID = std::stoi (ctx.options ().get <std::string>(" timeframes-rate-limit-ipcid" ));
235- std::string rateLimitingChannelConfigInput;
236- std::string rateLimitingChannelConfigOutput;
237- bool internalRateLimiting = false ;
238-
239- // In case we have rate-limiting requested, any device without an input will get one on the special
240- // "DPL/RATE" message.
241- if (rateLimitingIPCID >= 0 ) {
242- rateLimitingChannelConfigInput = fmt::format (" name=metric-feedback,type=pull,method=connect,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0" ,
243- ChannelSpecHelpers::defaultIPCFolder (), rateLimitingIPCID);
244- rateLimitingChannelConfigOutput = fmt::format (" name=metric-feedback,type=push,method=bind,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0" ,
245- ChannelSpecHelpers::defaultIPCFolder (), rateLimitingIPCID);
246- internalRateLimiting = true ;
247- aodReader.options .emplace_back (ConfigParamSpec{" channel-config" , VariantType::String, rateLimitingChannelConfigInput, {" how many timeframes can be in flight at the same time" }});
248- }
249-
250234 ctx.services ().registerService (ServiceRegistryHelpers::handleForService<DanglingEdgesContext>(new DanglingEdgesContext));
251235 auto & dec = ctx.services ().get <DanglingEdgesContext>();
252236
@@ -274,7 +258,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
274258 // A timeframeSink consumes timeframes without creating new
275259 // timeframe data.
276260 bool timeframeSink = hasTimeframeInputs && !hasTimeframeOutputs;
277- if (std::stoi (ctx. options (). get <std::string>( " timeframes-rate-limit-ipcid " )) != -1 ) {
261+ if (rateLimitingIPCID != -1 ) {
278262 if (timeframeSink && processor.name .find (" internal-dpl-injected-dummy-sink" ) == std::string::npos) {
279263 O2_SIGNPOST_ID_GENERATE (sid, workflow_helpers);
280264 uint32_t hash = runtime_hash (processor.name .c_str ());
@@ -384,7 +368,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
384368 " internal-dpl-aod-index-builder" ,
385369 {},
386370 {},
387- PluginManager::loadAlgorithmFromPlugin ( " O2FrameworkOnDemandTablesSupport " , " IndexTableBuilder " , ctx ), // readers::AODReaderHelpers::indexBuilderCallback(ctx),
371+ AlgorithmSpec::dummyAlgorithm ( ), // real algorithm will be set in adjustTopology
388372 {}};
389373 AnalysisSupportHelpers::addMissingOutputsToBuilder (dec.requestedIDXs , dec.requestedAODs , dec.requestedDYNs , indexBuilder);
390374
@@ -400,7 +384,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
400384 " internal-dpl-aod-spawner" ,
401385 {},
402386 {},
403- PluginManager::loadAlgorithmFromPlugin ( " O2FrameworkOnDemandTablesSupport " , " ExtendedTableSpawner " , ctx ), // readers::AODReaderHelpers::aodSpawnerCallback(ctx),
387+ AlgorithmSpec::dummyAlgorithm ( ), // real algorithm will be set in adjustTopology
404388 {}};
405389 AnalysisSupportHelpers::addMissingOutputsToSpawner ({}, dec.spawnerInputs , dec.requestedAODs , aodSpawner);
406390 AnalysisSupportHelpers::addMissingOutputsToReader (dec.providedAODs , dec.requestedAODs , aodReader);
@@ -431,13 +415,11 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
431415 auto mctracks2aod = std::find_if (workflow.begin (), workflow.end (), [](auto const & x) { return x.name == " mctracks-to-aod" ; });
432416 if (mctracks2aod == workflow.end ()) {
433417 // add normal reader
434- auto && algo = PluginManager::loadAlgorithmFromPlugin (" O2FrameworkAnalysisSupport" , " ROOTFileReader" , ctx);
435- aodReader.algorithm = CommonDataProcessors::wrapWithTimesliceConsumption (algo);
436418 aodReader.outputs .emplace_back (OutputSpec{" TFN" , " TFNumber" });
437419 aodReader.outputs .emplace_back (OutputSpec{" TFF" , " TFFilename" });
438420 } else {
439- // AODs are being injected on-the-fly, add dummy reader
440- auto algo = AlgorithmSpec{
421+ // AODs are being injected on-the-fly, add error-handler reader
422+ aodReader. algorithm = AlgorithmSpec{
441423 adaptStateful (
442424 [outputs = aodReader.outputs ](DeviceSpec const &) {
443425 LOGP (warn, " Workflow with injected AODs has unsatisfied inputs:" );
@@ -448,7 +430,6 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
448430 // to ensure the output type for adaptStateful
449431 return adaptStateless ([](DataAllocator&) {});
450432 })};
451- aodReader.algorithm = CommonDataProcessors::wrapWithTimesliceConsumption (algo);
452433 }
453434 auto concrete = DataSpecUtils::asConcreteDataMatcher (aodReader.inputs [0 ]);
454435 timer.outputs .emplace_back (concrete.origin , concrete.description , concrete.subSpec , Lifetime::Enumeration);
@@ -533,9 +514,6 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
533514
534515 // add the Analysys CCDB backend which reads CCDB objects using a provided table
535516 if (analysisCCDBBackend.outputs .empty () == false ) {
536- // add normal reader
537- auto && algo = PluginManager::loadAlgorithmFromPlugin (" O2FrameworkCCDBSupport" , " AnalysisCCDBFetcherPlugin" , ctx);
538- analysisCCDBBackend.algorithm = algo;
539517 extraSpecs.push_back (analysisCCDBBackend);
540518 }
541519
@@ -637,6 +615,10 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
637615 extraSpecs.push_back (CommonDataProcessors::getScheduledDummySink (ignored));
638616 } else {
639617 O2_SIGNPOST_EVENT_EMIT (workflow_helpers, sid, " injectServiceDevices" , " Injecting rate limited dummy sink" );
618+ std::string rateLimitingChannelConfigOutput;
619+ if (rateLimitingIPCID != -1 ) {
620+ rateLimitingChannelConfigOutput = fmt::format (" name=metric-feedback,type=push,method=bind,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0" , ChannelSpecHelpers::defaultIPCFolder (), rateLimitingIPCID);
621+ }
640622 extraSpecs.push_back (CommonDataProcessors::getDummySink (ignored, rateLimitingChannelConfigOutput));
641623 }
642624 }
0 commit comments