
Speed up node boot times by parallelizing buffer acquisition#19025

Open
jtuglu1 wants to merge 4 commits into apache:master from jtuglu1:speed-up-broker-launch-time

Conversation

@jtuglu1
Contributor

@jtuglu1 jtuglu1 commented Feb 14, 2026

Description

Brokers/Historicals/Peons currently allocate buffers serially on boot. For large quantities of large buffers (100+ buffers at ~2GB per buffer), this can mean waiting several minutes just to acquire the needed memory (in our case, upwards of 6 minutes for Brokers and 5+ seconds for Peons), which isn't great. This is because startup effectively performs 100 sequential malloc/mmap calls, each needing 2GB of zeroed memory. This change parallelizes buffer acquisition proportional to the number of cores available on the machine. The thread pool backing the parallel IntStream is temporary and is released once allocation finishes (this happens before the Broker comes online and serves queries anyway).

This is very helpful for both deployments and auto-scaling, as newly added nodes can begin providing value to the cluster more quickly. It can also help with task launch times. For applications that run with -XX:+AlwaysPreTouch, serial allocation is even slower, as the JVM pre-allocates/touches all of the necessary memory pages up front.

Once the minimum JDK version is ≥ 21, we can consider the MemorySegment API: allocate a single "wide" buffer and slice into it. That would be one large malloc/mmap call, which should return much faster.
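The core idea can be sketched outside Druid as plain Java (the class and method names below are illustrative, not the PR's actual code): allocate the buffers from a parallel IntStream running on a short-lived ForkJoinPool sized to the machine's cores, then tear the pool down.

```java
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ParallelAllocSketch
{
  /**
   * Allocates {@code count} direct buffers of {@code sizeBytes} each in parallel,
   * using a temporary pool that is torn down once allocation completes.
   */
  public static List<ByteBuffer> allocate(final int count, final int sizeBytes)
  {
    final ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
    try {
      // Submitting the parallel stream from inside a dedicated pool keeps the
      // work off the JVM-wide common pool.
      return pool.submit(
          () -> IntStream.range(0, count)
                         .parallel()
                         .mapToObj(i -> ByteBuffer.allocateDirect(sizeBytes))
                         .collect(Collectors.toList())
      ).join();
    }
    finally {
      pool.shutdown();
    }
  }
}
```

Each `allocateDirect` call still pays the zeroing cost; the win is that up to `availableProcessors()` of them proceed concurrently instead of one after another.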

Benchmarks

Allocate 10 and 100 merge buffers at 2GB each, using optimal JVM memory flags, running on JDK 21 (compiled on JDK 17):

Before

Benchmark                                              (bufferCount)  (bufferSizeBytes)  Mode  Cnt       Score      Error  Units
DefaultBlockingPoolConstructorBenchmark.constructPool             10         2000000000    ss    5   11860.140 ±   84.047  ms/op
DefaultBlockingPoolConstructorBenchmark.constructPool            100         2000000000    ss    5  118666.210 ± 1413.217  ms/op

After

Benchmark                                              (bufferCount)  (bufferSizeBytes)  Mode  Cnt     Score     Error  Units
DefaultBlockingPoolConstructorBenchmark.constructPool             10         2000000000    ss    5  1407.045 ±  11.297  ms/op
DefaultBlockingPoolConstructorBenchmark.constructPool            100         2000000000    ss    5  5329.244 ± 723.182  ms/op

Overall, this results in a measured ~10x reduction in launch time in the worst case, up to 100x in the best case. Brokers now boot ~7 minutes faster and Peons ~5 seconds faster on our workloads.

Benchmark File
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.benchmark.collections;

import com.google.common.base.Supplier;
import org.apache.druid.collections.DefaultBlockingPool;
import org.apache.druid.java.util.common.ByteBufferUtils;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import org.openjdk.jmh.runner.options.TimeValue;

import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Measures how long {@link DefaultBlockingPool}'s constructor takes to pre-allocate N buffers.
 *
 * This is meant to reflect broker/historical startup behavior, which constructs a merge buffer pool backed by direct
 * {@link ByteBuffer}s.
 */
@BenchmarkMode(Mode.SingleShotTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Warmup(iterations = 3)
@Measurement(iterations = 5)
@Fork(1)
@State(Scope.Thread)
public class DefaultBlockingPoolConstructorBenchmark
{
  @Param({"10", "100", "500"})
  public int bufferCount;

  /**
   * Size of each direct buffer to allocate.
   *
   * Note that {@link ByteBuffer} capacities are limited to {@link Integer#MAX_VALUE}, so a "2GB" buffer cannot be
   * exactly 2GiB (2,147,483,648). Use a value <= 2,147,483,647.
   */
  @Param({"2000000000"})
  public int bufferSizeBytes;

  private Supplier<ByteBuffer> generator;
  private DefaultBlockingPool<ByteBuffer> pool;
  private ByteBuffer[] allocatedBuffers;

  @Setup(Level.Invocation)
  public void setupInvocation()
  {
    allocatedBuffers = new ByteBuffer[bufferCount];
    final AtomicInteger allocateIndex = new AtomicInteger(0);
    generator = () -> {
      final int i = allocateIndex.getAndIncrement();
      final ByteBuffer buffer = ByteBuffer.allocateDirect(bufferSizeBytes);
      // generator is invoked from a parallel stream in DefaultBlockingPool ctor
      if (i >= 0 && i < allocatedBuffers.length) {
        allocatedBuffers[i] = buffer;
      }
      return buffer;
    };
    pool = null;
  }

  @TearDown(Level.Invocation)
  public void tearDownInvocation()
  {
    pool = null;
    if (allocatedBuffers != null) {
      for (ByteBuffer buffer : allocatedBuffers) {
        if (buffer != null) {
          ByteBufferUtils.free(buffer);
        }
      }
      allocatedBuffers = null;
    }
  }

  @Benchmark
  public void constructPool(final Blackhole blackhole)
  {
    pool = new DefaultBlockingPool<>(generator, bufferCount);
    blackhole.consume(pool);
  }

  /**
   * Convenience entrypoint to run from an IDE without building the shaded benchmarks jar.
   *
   * Note: to run with large direct buffers, set your IDE run configuration VM options, e.g.
   * {@code -Xmx200g -XX:MaxDirectMemorySize=200g}.
   */
  public static void main(String[] args) throws RunnerException
  {
    final Options opt = new OptionsBuilder()
        .include(".*" + DefaultBlockingPoolConstructorBenchmark.class.getSimpleName() + ".*")
        .forks(1)
        .warmupIterations(3)
        .warmupTime(TimeValue.seconds(1))
        .measurementIterations(5)
        .measurementTime(TimeValue.seconds(1))
        .build();

    new Runner(opt).run();
  }
}

Release note

Speed up broker boot times by parallelizing merge buffer initialization


This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@jtuglu1 jtuglu1 marked this pull request as ready for review February 14, 2026 20:57
Brokers currently allocate buffers serially on boot. For large amounts of buffer (100+ buffers) this can mean waiting for several minutes to acquire the memory needed. This change parallelizes the acquisition of the buffers.
@jtuglu1 jtuglu1 force-pushed the speed-up-broker-launch-time branch from 1fed5c0 to 408e2ff on February 15, 2026 20:40
Contributor

@maytasm maytasm left a comment


LGTM!

@jtuglu1
Contributor Author

jtuglu1 commented Feb 16, 2026

I will need to adjust the approach since the test failures are due to deadlock/starvation from using the common pool for .parallel(). Given the gains, I think it's fine to spin up a temporary FJP to do the allocations for both temp/merge buffers.

@abhishekrb19
Contributor

Brokers currently allocate buffers serially on boot. Speed up broker boot times by parallelizing merger buffer initialization

Just a quick clarification - isn't this applicable to all servers, including Historicals and Peons? Or did you notice the bottleneck primarily on the Brokers?

@jtuglu1
Contributor Author

jtuglu1 commented Feb 17, 2026

Brokers currently allocate buffers serially on boot. Speed up broker boot times by parallelizing merger buffer initialization

Just a quick clarification - isn't this applicable to all servers, including Historicals and Peons? Or did you notice the bottleneck primarily on the Brokers?

Yeah this will speed up all servers that pre-alloc some # of buffers.

@jtuglu1 jtuglu1 changed the title Speed up broker boot times by parallelizing buffer acquisition Speed up node boot times by parallelizing buffer acquisition Feb 17, 2026
@kfaraz
Contributor

kfaraz commented Feb 19, 2026

@jtuglu1 , to address the CI failures, would it make sense to not fully parallelize the allocation and instead use batches of size equal to the number of cores? Claude seems to think that this would reduce the contention in the JVM direct mem allocator.

@jtuglu1
Contributor Author

jtuglu1 commented Feb 19, 2026

@jtuglu1 , to address the CI failures, would it make sense to not fully parallelize the allocation and instead use batches of size equal to the number of cores? Claude seems to think that this would reduce the contention in the JVM direct mem allocator.

I think this can still, in the worst case, run into issues since you're not guaranteeing a completion deadline on the buffer alloc tasks. This means you can still occupy the common thread pool threads, which might cause other deadlock issues. To resolve this, I've created a temporary FJP to perform the allocs. Normally, doing this sort of thing would be prohibitive; however, FJP threads are created lazily and there are at most 2 production usages of this pool per node, so we're spinning up at most 2 dedicated, short-lived allocation pools per node, only once (on boot), which I think is reasonable. LMK if you disagree.

An alternative would be to make a static, shared FJP in the class.
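The trick described here — keeping the work off the common pool by submitting the parallel stream from inside a dedicated ForkJoinPool — can be demonstrated in isolation. A minimal sketch (names are illustrative, not the PR's code) that records which threads actually execute the stream:

```java
import java.util.Set;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class DedicatedPoolDemo
{
  /**
   * Runs a parallel stream inside a private ForkJoinPool and reports which
   * threads executed it. None of them should belong to the common pool.
   */
  public static Set<String> workerThreadNames(final int parallelism, final int tasks)
  {
    final ForkJoinPool pool = new ForkJoinPool(parallelism);
    try {
      return pool.submit(
          () -> IntStream.range(0, tasks)
                         .parallel()
                         .mapToObj(i -> Thread.currentThread().getName())
                         .collect(Collectors.toSet())
      ).join();
    }
    finally {
      // The pool is short-lived: created for one burst of work, then released.
      pool.shutdown();
    }
  }
}
```

Common-pool workers carry "commonPool" in their thread names, while a dedicated pool's workers do not, so the returned set makes the isolation visible; tasks submitted this way cannot starve unrelated users of the common pool.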

@jtuglu1 jtuglu1 requested a review from kfaraz February 19, 2026 20:15
@jtuglu1
Contributor Author

jtuglu1 commented Feb 19, 2026

@gianm @clintropolis any thoughts here?

@jtuglu1 jtuglu1 force-pushed the speed-up-broker-launch-time branch 3 times, most recently from bf519eb to 012f9fc on February 20, 2026 18:39
@jtuglu1 jtuglu1 force-pushed the speed-up-broker-launch-time branch from 012f9fc to 647b517 on February 20, 2026 19:54
@jtuglu1 jtuglu1 force-pushed the speed-up-broker-launch-time branch 2 times, most recently from add5d63 to 37f6cca on February 20, 2026 20:01
@jtuglu1 jtuglu1 force-pushed the speed-up-broker-launch-time branch from 37f6cca to 63b2a18 on February 20, 2026 20:02
@jtuglu1 jtuglu1 requested a review from abhishekrb19 February 20, 2026 20:16
@jtuglu1 jtuglu1 requested a review from abhishekrb19 February 21, 2026 01:41
|`druid.processing.fifo`|Enables the processing queue to treat tasks of equal priority in a FIFO manner.|`true`|
|`druid.processing.tmpDir`|Path where temporary files created while processing a query should be stored. If specified, this configuration takes priority over the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`|
|`druid.processing.intermediaryData.storage.type`|Storage type for intermediary segments of data shuffle between native parallel index tasks. <br />Set to `local` to store segment files in the local storage of the Middle Manager or Indexer. <br />Set to `deepstore` to use configured deep storage for better fault tolerance during rolling updates. When the storage type is `deepstore`, Druid stores the data in the `shuffle-data` directory under the configured deep storage path. Druid does not support automated cleanup for the `shuffle-data` directory. You can set up cloud storage lifecycle rules for automated cleanup of data at the `shuffle-data` prefix location.|`local`|
|`druid.processing.parallelPoolInit`|(EXPERIMENTAL) Allows all merge/processing pools to be allocated in parallel on process launch. This significantly speeds up node launch times.|`false`|
Contributor


(Maybe also include an estimate of what qualifies as a "big" number of "large" buffers so that operators know when to turn this on)

Suggested change
|`druid.processing.parallelPoolInit`|(EXPERIMENTAL) Allows all merge/processing pools to be allocated in parallel on process launch. This significantly speeds up node launch times.|`false`|
|`druid.processing.parallelPoolInit`|(EXPERIMENTAL) Allows all merge/processing pools to be allocated in parallel on process launch. This may significantly speed up node launch times if allocating several large buffers.|`false`|
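Assuming the property lands as documented in the table above, an operator would opt in via a node's `runtime.properties` (a sketch; the flag is experimental and defaults to `false`):

```properties
# Opt in to parallel merge/processing buffer allocation at startup (experimental).
# Most useful when the node allocates many large direct buffers, e.g. 100+ buffers of ~2GB.
druid.processing.parallelPoolInit=true
```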

Contributor


Please include this update in the other places too, and also call out the potential caveat with the locking/starvation if the node doesn't have enough resources.

this.tmpDir = Configs.valueOrDefault(tmpDir, System.getProperty("java.io.tmpdir"));
this.buffer = Configs.valueOrDefault(buffer, new DruidProcessingBufferConfig());
this.indexes = Configs.valueOrDefault(indexes, new DruidProcessingIndexesConfig());
this.parallelPoolInit = parallelPoolInit != null && parallelPoolInit;
Contributor


Nit: easier to follow

Suggested change
this.parallelPoolInit = parallelPoolInit != null && parallelPoolInit;
this.parallelPoolInit = Configs.valueOrDefault(parallelPoolInit, false);

return numMergeBuffersConfigured;
}

public boolean isParallelPoolInit()
Contributor


1-line javadoc might be helpful here.

Member


or isParallelMemoryPoolInit to better distinguish it from thread pool stuff?

parallelMergeInitialYieldRows
parallelMergeParallelism
parallelMergeSmallBatchRows
parallelPoolInit
Contributor


Did the spellcheck fail? I was under the impression that back-quoted stuff was exempt from spelling checks.

Contributor Author


It did, yes

@clintropolis
Member

To resolve this, I've created a temporary FJP to perform the allocs. Normally, doing this sort of thing would be prohibitive, however FJP threads are created lazily and there are only at most 2 production usages of this pool per node, so we're spinning up at most 2 dedicated, short-lived allocation pools per node only once (on boot) which I think is reasonable, LMK if you disagree.

this sounds reasonable to me 🤷

return numMergeBuffersConfigured;
}

public boolean isParallelPoolInit()
Member


or isParallelMemoryPoolInit to better distinguish it from thread pool stuff?
