Skip to content

Commit c44ace5

Browse files
committed
TPC Workflow: use helper lambdas to create allocators to avoid code duplication
1 parent 38ccdf5 commit c44ace5

File tree

2 files changed

+72
-98
lines changed

2 files changed

+72
-98
lines changed

Detectors/TPC/workflow/src/CATrackerSpec.cxx

Lines changed: 72 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,7 @@ DataProcessorSpec getCATrackerSpec(CompletionPolicyData* policyData, ca::Config
391391
}
392392
std::vector<InputSpec> filter = {{"check", ConcreteDataTypeMatcher{gDataOriginTPC, "RAWDATA"}, Lifetime::Timeframe}};
393393
for (auto const& ref : InputRecordWalker(pc.inputs(), filter)) {
394-
const o2::header::DataHeader* dh = DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
394+
const DataHeader* dh = DataRefUtils::getHeader<DataHeader*>(ref);
395395
const gsl::span<const char> raw = pc.inputs().get<gsl::span<char>>(ref);
396396
o2::framework::RawParser parser(raw.data(), raw.size());
397397

@@ -403,7 +403,7 @@ DataProcessorSpec getCATrackerSpec(CompletionPolicyData* policyData, ca::Config
403403
size_t totalSize = 0;
404404
for (auto it = parser.begin(); it != parser.end(); it++) {
405405
const unsigned char* current = it.raw();
406-
const o2::header::RAWDataHeader* rdh = (const o2::header::RAWDataHeader*)current;
406+
const RAWDataHeader* rdh = (const RAWDataHeader*)current;
407407
if (current == nullptr || it.size() == 0 || (current - ptr) % TPCZSHDR::TPC_ZS_PAGE_SIZE || o2::raw::RDHUtils::getFEEID(*rdh) != lastFEE) {
408408
if (count) {
409409
tpcZSmetaPointers[rawcru / 10][(rawcru % 10) * 2 + rawendpoint].emplace_back(ptr);
@@ -445,7 +445,7 @@ DataProcessorSpec getCATrackerSpec(CompletionPolicyData* policyData, ca::Config
445445
/*DPLRawParser parser(pc.inputs(), filter);
446446
for (auto it = parser.begin(), end = parser.end(); it != end; ++it) {
447447
// retrieving RDH v4
448-
auto const* rdh = it.get_if<o2::header::RAWDataHeaderV4>();
448+
auto const* rdh = it.get_if<RAWDataHeaderV4>();
449449
// retrieving the raw pointer of the page
450450
auto const* raw = it.raw();
451451
// retrieving payload pointer of the page
@@ -484,28 +484,26 @@ DataProcessorSpec getCATrackerSpec(CompletionPolicyData* policyData, ca::Config
484484

485485
const auto& inputsClustersDigits = getWorkflowTPCInput(pc, verbosity, getWorkflowTPCInput_mc, getWorkflowTPCInput_clusters, processAttributes->tpcSectorMask, getWorkflowTPCInput_digits);
486486

487-
if (specconfig.caClusterer) {
488-
if (specconfig.zsOnTheFly) {
489-
const unsigned long long int* buffer = reinterpret_cast<const unsigned long long int*>(&inputZS[0]);
490-
o2::gpu::GPUReconstructionConvert::RunZSEncoderCreateMeta(buffer, tpcZSonTheFlySizes.data(), *&ptrEp, &tpcZS);
491-
ptrs.tpcZS = &tpcZS;
492-
if (specconfig.processMC) {
493-
ptrs.o2Digits = &inputsClustersDigits->inputDigits;
494-
ptrs.o2DigitsMC = &inputsClustersDigits->inputDigitsMCPtrs;
495-
}
496-
} else if (specconfig.zsDecoder) {
497-
ptrs.tpcZS = &tpcZS;
498-
if (specconfig.processMC) {
499-
throw std::runtime_error("Cannot process MC information, none available");
500-
}
501-
} else {
487+
if (specconfig.decompressTPC) {
488+
ptrs.compressedClusters = pCompClustersFlat;
489+
} else if (specconfig.zsOnTheFly) {
490+
const unsigned long long int* buffer = reinterpret_cast<const unsigned long long int*>(&inputZS[0]);
491+
o2::gpu::GPUReconstructionConvert::RunZSEncoderCreateMeta(buffer, tpcZSonTheFlySizes.data(), *&ptrEp, &tpcZS);
492+
ptrs.tpcZS = &tpcZS;
493+
if (specconfig.processMC) {
502494
ptrs.o2Digits = &inputsClustersDigits->inputDigits;
503-
if (specconfig.processMC) {
504-
ptrs.o2DigitsMC = &inputsClustersDigits->inputDigitsMCPtrs;
505-
}
495+
ptrs.o2DigitsMC = &inputsClustersDigits->inputDigitsMCPtrs;
496+
}
497+
} else if (specconfig.zsDecoder) {
498+
ptrs.tpcZS = &tpcZS;
499+
if (specconfig.processMC) {
500+
throw std::runtime_error("Cannot process MC information, none available");
501+
}
502+
} else if (specconfig.caClusterer) {
503+
ptrs.o2Digits = &inputsClustersDigits->inputDigits;
504+
if (specconfig.processMC) {
505+
ptrs.o2DigitsMC = &inputsClustersDigits->inputDigitsMCPtrs;
506506
}
507-
} else if (specconfig.decompressTPC) {
508-
ptrs.compressedClusters = pCompClustersFlat;
509507
} else {
510508
ptrs.clusters = &inputsClustersDigits->clusterIndex;
511509
}
@@ -515,52 +513,35 @@ DataProcessorSpec getCATrackerSpec(CompletionPolicyData* policyData, ca::Config
515513
TPCSectorHeader clusterOutputSectorHeader{0};
516514
if (processAttributes->clusterOutputIds.size() > 0) {
517515
clusterOutputSectorHeader.sectorBits = processAttributes->tpcSectorMask;
518-
// subspecs [0, NSectors - 1] are used to identify sector data, we use NSectors
519-
// to indicate the full TPC
516+
// subspecs [0, NSectors - 1] are used to identify sector data, we use NSectors to indicate the full TPC
520517
clusterOutputSectorHeader.activeSectors = processAttributes->tpcSectorMask;
521518
}
522519

523520
GPUInterfaceOutputs outputRegions;
524-
std::optional<std::reference_wrapper<O2CharVectorOutputType>> clusterOutput = std::nullopt, bufferCompressedClusters = std::nullopt, bufferTPCTracks = std::nullopt, bufferSharedClusterMap = std::nullopt;
525-
char *clusterOutputChar = nullptr, *bufferCompressedClustersChar = nullptr, *bufferTPCTracksChar = nullptr, *bufferSharedClusterMapChar;
526-
if (specconfig.outputCompClustersFlat) {
527-
if (processAttributes->allocateOutputOnTheFly) {
528-
outputRegions.compressedClusters.allocator = [&bufferCompressedClustersChar, &pc](size_t size) -> void* {bufferCompressedClustersChar = pc.outputs().make<char>(Output{gDataOriginTPC, "COMPCLUSTERSFLAT", 0}, size).data(); return bufferCompressedClustersChar; };
529-
} else {
530-
bufferCompressedClusters.emplace(pc.outputs().make<std::vector<char>>(Output{gDataOriginTPC, "COMPCLUSTERSFLAT", 0}, processAttributes->outputBufferSize));
531-
outputRegions.compressedClusters.ptrBase = bufferCompressedClustersChar = bufferCompressedClusters->get().data();
532-
outputRegions.compressedClusters.size = bufferCompressedClusters->get().size();
533-
}
534-
}
535-
if (processAttributes->clusterOutputIds.size() > 0) {
536-
const o2::header::DataDescription outputLabel = specconfig.sendClustersPerSector ? (o2::header::DataDescription) "CLUSTERNATIVETMP" : (o2::header::DataDescription) "CLUSTERNATIVE";
537-
if (processAttributes->allocateOutputOnTheFly) {
538-
outputRegions.clustersNative.allocator = [&clusterOutputChar, &pc, clusterOutputSectorHeader, outputLabel](size_t size) -> void* {clusterOutputChar = pc.outputs().make<char>({gDataOriginTPC, outputLabel, NSectors, Lifetime::Timeframe, {clusterOutputSectorHeader}}, size + sizeof(ClusterCountIndex)).data(); return clusterOutputChar + sizeof(ClusterCountIndex); };
539-
} else {
540-
clusterOutput.emplace(pc.outputs().make<std::vector<char>>({gDataOriginTPC, outputLabel, NSectors, Lifetime::Timeframe, {clusterOutputSectorHeader}}, processAttributes->outputBufferSize));
541-
clusterOutputChar = clusterOutput->get().data();
542-
outputRegions.clustersNative.ptrBase = clusterOutputChar + sizeof(ClusterCountIndex);
543-
outputRegions.clustersNative.size = clusterOutput->get().size() - sizeof(ClusterCountIndex);
544-
}
545-
}
546-
if (specconfig.outputTracks) {
547-
if (processAttributes->allocateOutputOnTheFly) {
548-
outputRegions.tpcTracks.allocator = [&bufferTPCTracksChar, &pc](size_t size) -> void* {bufferTPCTracksChar = pc.outputs().make<char>(Output{gDataOriginTPC, "TRACKSGPU", 0}, size).data(); return bufferTPCTracksChar; };
549-
} else {
550-
bufferTPCTracks.emplace(pc.outputs().make<std::vector<char>>(Output{gDataOriginTPC, "TRACKSGPU", 0}, processAttributes->outputBufferSize));
551-
outputRegions.tpcTracks.ptrBase = bufferTPCTracksChar = bufferTPCTracks->get().data();
552-
outputRegions.tpcTracks.size = bufferTPCTracks->get().size();
553-
}
554-
}
555-
if (specconfig.outputSharedClusterMap) {
556-
if (processAttributes->allocateOutputOnTheFly) {
557-
outputRegions.sharedClusterMap.allocator = [&bufferSharedClusterMapChar, &pc](size_t size) -> void* {bufferSharedClusterMapChar = pc.outputs().make<char>(Output{gDataOriginTPC, "CLSHAREDMAP", 0}, size).data(); return bufferSharedClusterMapChar; };
558-
} else {
559-
bufferSharedClusterMap.emplace(pc.outputs().make<std::vector<char>>(Output{gDataOriginTPC, "CLSHAREDMAP", 0}, processAttributes->outputBufferSize));
560-
outputRegions.sharedClusterMap.ptrBase = bufferSharedClusterMapChar = bufferSharedClusterMap->get().data();
561-
outputRegions.sharedClusterMap.size = bufferSharedClusterMap->get().size();
521+
std::vector<std::pair<std::optional<std::reference_wrapper<O2CharVectorOutputType>>, char*>> outputBuffers(GPUInterfaceOutputs::count(), {std::nullopt, nullptr});
522+
523+
auto setOutputAllocator = [&specconfig, &outputBuffers, &outputRegions, &processAttributes, &pc, verbosity](bool condition, GPUOutputControl& region, auto&& outputSpec, size_t offset = 0) {
524+
if (condition) {
525+
auto& buffer = outputBuffers[outputRegions.getIndex(region)];
526+
if (processAttributes->allocateOutputOnTheFly) {
527+
region.allocator = [&buffer, &pc, outputSpec = std::move(outputSpec), verbosity, offset](size_t size) -> void* {
528+
size += offset;
529+
if (verbosity) {
530+
LOG(INFO) << "ALLOCATING " << size << " bytes for " << std::get<DataOrigin>(outputSpec).template as<std::string>() << "/" << std::get<DataDescription>(outputSpec).template as<std::string>() << "/" << std::get<2>(outputSpec);
531+
}
532+
return (buffer.second = pc.outputs().make<char>(std::make_from_tuple<Output>(outputSpec), size).data()) + offset;
533+
};
534+
} else {
535+
buffer.first.emplace(pc.outputs().make<std::vector<char>>(std::make_from_tuple<Output>(outputSpec), processAttributes->outputBufferSize));
536+
region.ptrBase = (buffer.second = buffer.first->get().data()) + offset;
537+
region.size = buffer.first->get().size() - offset;
538+
}
562539
}
563-
}
540+
};
541+
setOutputAllocator(specconfig.outputCompClustersFlat, outputRegions.compressedClusters, std::make_tuple(gDataOriginTPC, (DataDescription) "COMPCLUSTERSFLAT", 0));
542+
setOutputAllocator(processAttributes->clusterOutputIds.size() > 0, outputRegions.clustersNative, std::make_tuple(gDataOriginTPC, specconfig.sendClustersPerSector ? (DataDescription) "CLUSTERNATIVETMP" : (DataDescription) "CLUSTERNATIVE", NSectors, Lifetime::Timeframe, clusterOutputSectorHeader), sizeof(ClusterCountIndex));
543+
setOutputAllocator(specconfig.outputTracks, outputRegions.tpcTracks, std::make_tuple(gDataOriginTPC, (DataDescription) "TRACKSGPU", 0));
544+
setOutputAllocator(specconfig.outputSharedClusterMap, outputRegions.sharedClusterMap, std::make_tuple(gDataOriginTPC, (DataDescription) "CLSHAREDMAP", 0));
564545
if (specconfig.processMC && specconfig.caClusterer) {
565546
outputRegions.clusterLabels.allocator = [&clustersMCBuffer](size_t size) -> void* { return &clustersMCBuffer; };
566547
}
@@ -572,6 +553,23 @@ DataProcessorSpec getCATrackerSpec(CompletionPolicyData* policyData, ca::Config
572553
if (retVal != 0) {
573554
throw std::runtime_error("tracker returned error code " + std::to_string(retVal));
574555
}
556+
557+
if (!processAttributes->allocateOutputOnTheFly) {
558+
for (unsigned int i = 0; i < outputRegions.count(); i++) {
559+
if (outputRegions.asArray()[i].ptrBase) {
560+
if (outputRegions.asArray()[i].size == 1) {
561+
throw std::runtime_error("Preallocated buffer size exceeded");
562+
}
563+
outputRegions.asArray()[i].checkCurrent();
564+
outputBuffers[i].first->get().resize((char*)outputRegions.asArray()[i].ptrCurrent - (char*)outputBuffers[i].second);
565+
if ((void*)outputBuffers[i].first->get().data() != (void*)outputBuffers[i].second) {
566+
throw std::runtime_error("Output buffer ptr out of sync");
567+
}
568+
outputBuffers[i].second = outputBuffers[i].first->get().data();
569+
}
570+
}
571+
}
572+
575573
LOG(INFO) << "found " << tracks.size() << " track(s)";
576574
// tracks are published if the output channel is configured
577575
if (specconfig.outputTracks) {
@@ -583,42 +581,22 @@ DataProcessorSpec getCATrackerSpec(CompletionPolicyData* policyData, ca::Config
583581
}
584582
}
585583

586-
if (ptrs.compressedClusters != nullptr) {
587-
if (specconfig.outputCompClustersFlat) {
588-
if (!processAttributes->allocateOutputOnTheFly) {
589-
bufferCompressedClusters->get().resize(outputRegions.compressedClusters.size);
590-
}
591-
if ((void*)ptrs.compressedClusters != (void*)bufferCompressedClustersChar) {
592-
throw std::runtime_error("compressed cluster output ptrs out of sync"); // sanity check
593-
}
594-
}
595-
if (specconfig.outputCompClusters) {
596-
CompressedClustersROOT compressedClusters = *ptrs.compressedClusters;
597-
pc.outputs().snapshot(Output{gDataOriginTPC, "COMPCLUSTERS", 0}, ROOTSerialized<CompressedClustersROOT const>(compressedClusters));
598-
}
599-
} else {
600-
LOG(ERROR) << "unable to get compressed cluster info from track";
584+
if (specconfig.outputCompClusters) {
585+
CompressedClustersROOT compressedClusters = *ptrs.compressedClusters;
586+
pc.outputs().snapshot(Output{gDataOriginTPC, "COMPCLUSTERS", 0}, ROOTSerialized<CompressedClustersROOT const>(compressedClusters));
601587
}
602588

603-
// publish clusters produced by CA clusterer sector-wise if the outputs are configured
604-
if (processAttributes->clusterOutputIds.size() > 0 && ptrs.clusters == nullptr) {
605-
throw std::logic_error("No cluster index object provided by GPU processor");
606-
}
607-
// previously, clusters have been published individually for the enabled sectors
608-
// clusters are now published as one block, subspec is NSectors
609589
if (processAttributes->clusterOutputIds.size() > 0) {
610-
if (!processAttributes->allocateOutputOnTheFly) {
611-
clusterOutput->get().resize(sizeof(ClusterCountIndex) + outputRegions.clustersNative.size);
612-
}
613-
if ((void*)ptrs.clusters->clustersLinear != (void*)(clusterOutputChar + sizeof(ClusterCountIndex))) {
590+
if ((void*)ptrs.clusters->clustersLinear != (void*)(outputBuffers[outputRegions.getIndex(outputRegions.clustersNative)].second + sizeof(ClusterCountIndex))) {
614591
throw std::runtime_error("cluster native output ptrs out of sync"); // sanity check
615592
}
616593

617594
ClusterNativeAccess const& accessIndex = *ptrs.clusters;
618595
if (specconfig.sendClustersPerSector) {
596+
// Clusters are shipped by sector, we are copying into per-sector buffers (anyway only for ROOT output)
619597
for (int i = 0; i < NSectors; i++) {
620598
if (processAttributes->tpcSectorMask & (1ul << i)) {
621-
o2::header::DataHeader::SubSpecificationType subspec = i;
599+
DataHeader::SubSpecificationType subspec = i;
622600
clusterOutputSectorHeader.sectorBits = (1ul << i);
623601
char* buffer = pc.outputs().make<char>({gDataOriginTPC, "CLUSTERNATIVE", subspec, Lifetime::Timeframe, {clusterOutputSectorHeader}}, accessIndex.nClustersSector[i] * sizeof(*accessIndex.clustersLinear) + sizeof(ClusterCountIndex)).data();
624602
ClusterCountIndex* outIndex = reinterpret_cast<ClusterCountIndex*>(buffer);
@@ -642,8 +620,9 @@ DataProcessorSpec getCATrackerSpec(CompletionPolicyData* policyData, ca::Config
642620
}
643621
}
644622
} else {
645-
o2::header::DataHeader::SubSpecificationType subspec = NSectors;
646-
ClusterCountIndex* outIndex = reinterpret_cast<ClusterCountIndex*>(clusterOutputChar);
623+
// Clusters are shipped as single message, fill ClusterCountIndex
624+
DataHeader::SubSpecificationType subspec = NSectors;
625+
ClusterCountIndex* outIndex = reinterpret_cast<ClusterCountIndex*>(outputBuffers[outputRegions.getIndex(outputRegions.clustersNative)].second);
647626
static_assert(sizeof(ClusterCountIndex) == sizeof(accessIndex.nClusters));
648627
memcpy(outIndex, &accessIndex.nClusters[0][0], sizeof(ClusterCountIndex));
649628
if (specconfig.processMC && specconfig.caClusterer && accessIndex.clustersMCTruth) {
@@ -728,7 +707,7 @@ DataProcessorSpec getCATrackerSpec(CompletionPolicyData* policyData, ca::Config
728707
}
729708
if (specconfig.processMC && specconfig.outputTracks) {
730709
OutputLabel label{"mclblout"};
731-
constexpr o2::header::DataDescription datadesc("TRACKSMCLBL");
710+
constexpr DataDescription datadesc("TRACKSMCLBL");
732711
outputSpecs.emplace_back(label, gDataOriginTPC, datadesc, 0, Lifetime::Timeframe);
733712
}
734713
if (specconfig.outputCompClusters) {

GPU/GPUTracking/Interface/GPUO2Interface.cxx

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -134,11 +134,6 @@ int GPUTPCO2Interface::RunTracking(GPUTrackingInOutPointers* data, GPUInterfaceO
134134
mRec->ClearAllocatedMemory();
135135
return retVal;
136136
}
137-
if (mConfig->configInterface.outputToExternalBuffers) {
138-
outputs->compressedClusters.size = mOutputControls[outputs->getIndex(outputs->compressedClusters)].size == 1 ? 0 : mChain->mIOPtrs.tpcCompressedClusters->totalDataSize;
139-
outputs->clustersNative.size = mOutputControls[outputs->getIndex(outputs->clustersNative)].size == 1 ? 0 : (mChain->mIOPtrs.clustersNative->nClustersTotal * sizeof(*mChain->mIOPtrs.clustersNative->clustersLinear));
140-
outputs->tpcTracks.size = mOutputControls[outputs->getIndex(outputs->tpcTracks)].size == 1 ? 0 : (size_t)((char*)mOutputControls[outputs->getIndex(outputs->tpcTracks)].ptrCurrent - (char*)mOutputControls[outputs->getIndex(outputs->tpcTracks)].ptrBase);
141-
}
142137
if (mConfig->configQA.shipToQC) {
143138
outputs->qa.hist1 = &mChain->GetQA()->getHistograms1D();
144139
outputs->qa.hist2 = &mChain->GetQA()->getHistograms2D();

0 commit comments

Comments
 (0)