Skip to content

Commit 078eb5d

Browse files
committed
Revert "DPL Analysis: Use dangling edges context in more places (#14953)"
This reverts commit c990996.
1 parent 397e019 commit 078eb5d

File tree

5 files changed

+45
-45
lines changed

5 files changed

+45
-45
lines changed

Framework/AnalysisSupport/src/AODWriterHelpers.cxx

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -62,13 +62,13 @@ const static std::unordered_map<OutputObjHandlingPolicy, std::string> ROOTfileNa
6262

6363
AlgorithmSpec AODWriterHelpers::getOutputTTreeWriter(ConfigContext const& ctx)
6464
{
65+
auto& ac = ctx.services().get<DanglingEdgesContext>();
6566
auto dod = AnalysisSupportHelpers::getDataOutputDirector(ctx);
6667
int compressionLevel = 505;
6768
if (ctx.options().hasOption("aod-writer-compression")) {
6869
compressionLevel = ctx.options().get<int>("aod-writer-compression");
6970
}
70-
return AlgorithmSpec{[dod, compressionLevel](InitContext& ic) -> std::function<void(ProcessingContext&)> {
71-
auto outputInputs = ic.services().get<DanglingEdgesContext>().outputsInputsAOD;
71+
return AlgorithmSpec{[dod, outputInputs = ac.outputsInputsAOD, compressionLevel](InitContext& ic) -> std::function<void(ProcessingContext&)> {
7272
LOGP(debug, "======== getGlobalAODSink::Init ==========");
7373

7474
// find out if any table needs to be saved
@@ -241,13 +241,14 @@ AlgorithmSpec AODWriterHelpers::getOutputTTreeWriter(ConfigContext const& ctx)
241241
};
242242
}
243243

244-
AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& /*ctx*/)
244+
AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx)
245245
{
246-
return AlgorithmSpec{[](InitContext& ic) -> std::function<void(ProcessingContext&)> {
247-
using namespace monitoring;
248-
auto& dec = ic.services().get<DanglingEdgesContext>();
249-
auto tskmap = dec.outTskMap;
250-
auto objmap = dec.outObjHistMap;
246+
using namespace monitoring;
247+
auto& ac = ctx.services().get<DanglingEdgesContext>();
248+
auto tskmap = ac.outTskMap;
249+
auto objmap = ac.outObjHistMap;
250+
251+
return AlgorithmSpec{[objmap, tskmap](InitContext& ic) -> std::function<void(ProcessingContext&)> {
251252
auto& callbacks = ic.services().get<CallbackService>();
252253
auto inputObjects = std::make_shared<std::vector<std::pair<InputObjectRoute, InputObject>>>();
253254

@@ -277,7 +278,7 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& /*ct
277278

278279
callbacks.set<CallbackService::Id::EndOfStream>(endofdatacb);
279280
return [inputObjects, objmap, tskmap](ProcessingContext& pc) mutable -> void {
280-
auto mergePart = [&inputObjects, &objmap, &tskmap](DataRef const& ref) {
281+
auto mergePart = [&inputObjects, &objmap, &tskmap, &pc](DataRef const& ref) {
281282
O2_SIGNPOST_ID_GENERATE(hid, histogram_registry);
282283
O2_SIGNPOST_START(histogram_registry, hid, "mergePart", "Merging histogram");
283284
if (!ref.header) {
@@ -473,7 +474,7 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& /*ct
473474
};
474475
O2_SIGNPOST_ID_GENERATE(rid, histogram_registry);
475476
O2_SIGNPOST_START(histogram_registry, rid, "processParts", "Start merging %zu parts received together.", pc.inputs().getNofParts(0));
476-
for (auto pi = 0U; pi < pc.inputs().getNofParts(0); ++pi) {
477+
for (int pi = 0; pi < pc.inputs().getNofParts(0); ++pi) {
477478
mergePart(pc.inputs().get("x", pi));
478479
}
479480
O2_SIGNPOST_END(histogram_registry, rid, "processParts", "Done histograms in multipart message.");

Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx

Lines changed: 29 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -67,39 +67,38 @@ void fillValidRoutes(CCDBFetcherHelper& helper, std::vector<o2::framework::Outpu
6767
}
6868
} // namespace
6969

70-
AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/)
70+
AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& ctx)
7171
{
72-
return adaptStateful([](ConfigParamRegistry const& options, DeviceSpec const& spec, InitContext& ic) {
73-
auto& dec = ic.services().get<DanglingEdgesContext>();
74-
std::vector<std::shared_ptr<arrow::Schema>> schemas;
75-
auto schemaMetadata = std::make_shared<arrow::KeyValueMetadata>();
72+
auto& ac = ctx.services().get<DanglingEdgesContext>();
73+
std::vector<std::shared_ptr<arrow::Schema>> schemas;
74+
auto schemaMetadata = std::make_shared<arrow::KeyValueMetadata>();
7675

77-
for (auto& input : dec.analysisCCDBInputs) {
78-
std::vector<std::shared_ptr<arrow::Field>> fields;
79-
schemaMetadata->Append("outputRoute", DataSpecUtils::describe(input));
80-
schemaMetadata->Append("outputBinding", input.binding);
76+
for (auto& input : ac.analysisCCDBInputs) {
77+
std::vector<std::shared_ptr<arrow::Field>> fields;
78+
schemaMetadata->Append("outputRoute", DataSpecUtils::describe(input));
79+
schemaMetadata->Append("outputBinding", input.binding);
8180

82-
for (auto& m : input.metadata) {
83-
// Save the list of input tables
84-
if (m.name.starts_with("input:")) {
85-
auto name = m.name.substr(6);
86-
schemaMetadata->Append("sourceTable", name);
87-
schemaMetadata->Append("sourceMatcher", DataSpecUtils::describe(std::get<ConcreteDataMatcher>(DataSpecUtils::fromMetadataString(m.defaultValue.get<std::string>()).matcher)));
88-
continue;
89-
}
90-
// Ignore the non ccdb: entries
91-
if (!m.name.starts_with("ccdb:")) {
92-
continue;
93-
}
94-
// Create the schema of the output
95-
auto metadata = std::make_shared<arrow::KeyValueMetadata>();
96-
metadata->Append("url", m.defaultValue.asString());
97-
auto columnName = m.name.substr(strlen("ccdb:"));
98-
fields.emplace_back(std::make_shared<arrow::Field>(columnName, arrow::binary_view(), false, metadata));
81+
for (auto& m : input.metadata) {
82+
// Save the list of input tables
83+
if (m.name.starts_with("input:")) {
84+
auto name = m.name.substr(6);
85+
schemaMetadata->Append("sourceTable", name);
86+
schemaMetadata->Append("sourceMatcher", DataSpecUtils::describe(std::get<ConcreteDataMatcher>(DataSpecUtils::fromMetadataString(m.defaultValue.get<std::string>()).matcher)));
87+
continue;
88+
}
89+
// Ignore the non ccdb: entries
90+
if (!m.name.starts_with("ccdb:")) {
91+
continue;
9992
}
100-
schemas.emplace_back(std::make_shared<arrow::Schema>(fields, schemaMetadata));
93+
// Create the schema of the output
94+
auto metadata = std::make_shared<arrow::KeyValueMetadata>();
95+
metadata->Append("url", m.defaultValue.asString());
96+
auto columnName = m.name.substr(strlen("ccdb:"));
97+
fields.emplace_back(std::make_shared<arrow::Field>(columnName, arrow::binary_view(), false, metadata));
10198
}
102-
99+
schemas.emplace_back(std::make_shared<arrow::Schema>(fields, schemaMetadata));
100+
}
101+
return adaptStateful([schemas](CallbackService& callbacks, ConfigParamRegistry const& options, DeviceSpec const& spec) {
103102
std::shared_ptr<CCDBFetcherHelper> helper = std::make_shared<CCDBFetcherHelper>();
104103
CCDBFetcherHelper::initialiseHelper(*helper, options);
105104
std::unordered_map<std::string, int> bindings;
@@ -130,11 +129,11 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/)
130129
int outputRouteIndex = bindings.at(outRouteDesc);
131130
auto& spec = helper->routes[outputRouteIndex].matcher;
132131
std::vector<std::shared_ptr<arrow::BinaryViewBuilder>> builders;
133-
for (auto const& _ : schema->fields()) {
132+
for (auto& _ : schema->fields()) {
134133
builders.emplace_back(std::make_shared<arrow::BinaryViewBuilder>());
135134
}
136135

137-
for (auto ci = 0; ci < timestampColumn->num_chunks(); ++ci) {
136+
for (size_t ci = 0; ci < timestampColumn->num_chunks(); ++ci) {
138137
std::shared_ptr<arrow::Array> chunk = timestampColumn->chunk(ci);
139138
auto const* timestamps = chunk->data()->GetValuesSafe<size_t>(1);
140139

Framework/CCDBSupport/src/AnalysisCCDBHelpers.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ namespace o2::framework
1717
{
1818

1919
struct AnalysisCCDBHelpers {
20-
static AlgorithmSpec fetchFromCCDB(ConfigContext const&);
20+
static AlgorithmSpec fetchFromCCDB(ConfigContext const& ctx);
2121
};
2222

2323
} // namespace o2::framework

Framework/Core/include/Framework/AnalysisTask.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -521,7 +521,7 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args)
521521
std::vector<ExpressionInfo> expressionInfos;
522522

523523
/// make sure options and configurables are set before expression infos are created
524-
homogeneous_apply_refs([&options](auto& element) { return analysis_task_parsers::appendOption(options, element); }, *task.get());
524+
homogeneous_apply_refs([&options, &hash](auto& element) { return analysis_task_parsers::appendOption(options, element); }, *task.get());
525525
/// extract conditions and append them as inputs
526526
homogeneous_apply_refs([&inputs](auto& element) { return analysis_task_parsers::appendCondition(inputs, element); }, *task.get());
527527

@@ -620,7 +620,7 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args)
620620
}
621621
// reset pre-slice for the next dataframe
622622
auto slices = pc.services().get<ArrowTableSlicingCache>();
623-
homogeneous_apply_refs([&slices](auto& element) {
623+
homogeneous_apply_refs([&pc, &slices](auto& element) {
624624
return analysis_task_parsers::updateSliceInfo(element, slices);
625625
},
626626
*(task.get()));

Framework/Core/src/AnalysisSupportHelpers.cxx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ std::shared_ptr<DataOutputDirector> AnalysisSupportHelpers::getDataOutputDirecto
9898
if (!keepString.empty()) {
9999
dod->reset();
100100
std::string d("dangling");
101-
if (d.starts_with(keepString) == 0) {
101+
if (d.find(keepString) == 0) {
102102
// use the dangling outputs
103103
std::vector<InputSpec> danglingOutputs;
104104
for (auto ii = 0u; ii < OutputsInputs.size(); ii++) {
@@ -144,7 +144,7 @@ void AnalysisSupportHelpers::addMissingOutputsToSpawner(std::vector<OutputSpec>
144144
sinks::append_to{publisher.outputs}; // append them to the publisher outputs
145145

146146
std::vector<InputSpec> additionalInputs;
147-
for (auto const& input : requestedSpecials | views::filter_not_matching(providedSpecials)) {
147+
for (auto& input : requestedSpecials | views::filter_not_matching(providedSpecials)) {
148148
input.metadata |
149149
views::filter_string_params_with("input:") |
150150
views::params_to_input_specs() |

0 commit comments

Comments
 (0)