Speed up node boot times by parallelizing buffer acquisition#19025
jtuglu1 wants to merge 4 commits into apache:master
Conversation
Brokers currently allocate buffers serially on boot. For large amounts of buffer (100+ buffers) this can mean waiting for several minutes to acquire the memory needed. This change parallelizes the acquisition of the buffers.
I will need to adjust my approach, since the test failures are due to deadlock/starvation on usage of the common pool.
Just a quick clarification - isn't this applicable to all servers, including Historicals and Peons? Or did you notice the bottleneck primarily on the Brokers?
Yeah, this will speed up all servers that pre-allocate some number of buffers.
processing/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java
@jtuglu1, to address the CI failures, would it make sense to not fully parallelize the allocation and instead use batches of size equal to the number of cores? Claude seems to think that this would reduce the contention in the JVM direct memory allocator.
I think this can still, in the worst case, run into issues, since you're not guaranteeing a completion deadline on the buffer allocation tasks. This means you can still occupy the common thread pool's threads, which might cause other deadlock issues. To resolve this, I've created a temporary FJP to perform the allocations. Normally, doing this sort of thing would be prohibitive; however, FJP threads are created lazily, and there are at most 2 production usages of this pool per node, so we're spinning up at most 2 dedicated, short-lived allocation pools per node, only once (on boot), which I think is reasonable. LMK if you disagree. An alternative would be to make a static, shared FJP in the class.
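To illustrate the approach discussed here, a minimal sketch (not the PR's actual code; class and method names are illustrative) of running the direct-buffer allocations on a dedicated, short-lived `ForkJoinPool`, so that a parallel `IntStream` does not occupy the JVM-wide common pool during boot:

```java
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ParallelBufferInit
{
  public static List<ByteBuffer> allocate(final int count, final int sizeBytes)
  {
    // Dedicated pool: threads are created lazily and the pool is torn down
    // after boot, so parallel allocation cannot starve tasks that rely on
    // ForkJoinPool.commonPool().
    final ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
    try {
      // A parallel stream submitted from inside a ForkJoinPool task runs on
      // that pool rather than on the common pool.
      return pool.submit(
          () -> IntStream.range(0, count)
                         .parallel()
                         .mapToObj(i -> ByteBuffer.allocateDirect(sizeBytes))
                         .collect(Collectors.toList())
      ).join();
    }
    finally {
      pool.shutdown();
    }
  }
}
```

The pool exists only for the duration of the boot-time allocation, matching the "dedicated, short-lived allocation pool" reasoning above.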
@gianm @clintropolis any thoughts here? |
processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java
|`druid.processing.fifo`|Enables the processing queue to treat tasks of equal priority in a FIFO manner.|`true`|
|`druid.processing.tmpDir`|Path where temporary files created while processing a query should be stored. If specified, this configuration takes priority over the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`|
|`druid.processing.intermediaryData.storage.type`|Storage type for intermediary segments of data shuffle between native parallel index tasks. <br />Set to `local` to store segment files in the local storage of the Middle Manager or Indexer. <br />Set to `deepstore` to use configured deep storage for better fault tolerance during rolling updates. When the storage type is `deepstore`, Druid stores the data in the `shuffle-data` directory under the configured deep storage path. Druid does not support automated cleanup for the `shuffle-data` directory. You can set up cloud storage lifecycle rules for automated cleanup of data at the `shuffle-data` prefix location.|`local`|
|`druid.processing.parallelPoolInit`|(EXPERIMENTAL) Allows all merge/processing pools to be allocated in parallel on process launch. This significantly speeds up node launch times.|`false`|
(Maybe also include an estimate of what qualifies as a "big" number of "large" buffers, so that operators know when to turn this on.)
|`druid.processing.parallelPoolInit`|(EXPERIMENTAL) Allows all merge/processing pools to be allocated in parallel on process launch. This significantly speeds up node launch times.|`false`|
|`druid.processing.parallelPoolInit`|(EXPERIMENTAL) Allows all merge/processing pools to be allocated in parallel on process launch. This may significantly speed up node launch times if allocating several large buffers.|`false`|
Please include this update in the other places too, and also call out the potential caveat with the locking/starvation if the node doesn't have enough resources.
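For operators, a hypothetical `runtime.properties` fragment showing how this might be enabled (`druid.processing.parallelPoolInit` is the property added by this PR; the other property names are standard Druid processing settings, and the values are illustrative, matching the 100 x ~2GB scenario from the description):

```properties
# Pre-allocate ~100 merge buffers of ~2GB each in parallel on boot (EXPERIMENTAL)
druid.processing.parallelPoolInit=true
druid.processing.numMergeBuffers=100
druid.processing.buffer.sizeBytes=2000000000
```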
this.tmpDir = Configs.valueOrDefault(tmpDir, System.getProperty("java.io.tmpdir"));
this.buffer = Configs.valueOrDefault(buffer, new DruidProcessingBufferConfig());
this.indexes = Configs.valueOrDefault(indexes, new DruidProcessingIndexesConfig());
this.parallelPoolInit = parallelPoolInit != null && parallelPoolInit;
Nit: easier to follow
this.parallelPoolInit = parallelPoolInit != null && parallelPoolInit;
this.parallelPoolInit = Configs.valueOrDefault(parallelPoolInit, false);
return numMergeBuffersConfigured;
}

public boolean isParallelPoolInit()
1-line javadoc might be helpful here.
Or `isParallelMemoryPoolInit`, to better distinguish it from thread pool stuff?
parallelMergeInitialYieldRows
parallelMergeParallelism
parallelMergeSmallBatchRows
parallelPoolInit
Did the spellcheck fail? I was under the impression that back-quoted stuff was exempt from spelling checks.
this sounds reasonable to me 🤷
Description
Brokers/Historicals/Peons currently allocate buffers serially on boot. For large quantities of large buffers (100+ buffers at ~2GB per buffer), this can mean waiting several minutes (in our case, upwards of 6 minutes for Brokers, and 5+ seconds for Peons) just to acquire the memory needed, which isn't great. This is because it is effectively doing 100 sequential malloc/mmap calls, each needing 2GB of zeroed-out memory. This change parallelizes the acquisition of the buffers, proportional to the number of cores available on the machine.
The `IntStream` threadpool is temporary and released once finished (this happens before the broker comes online and is serving queries anyway). This is very helpful for both deployments and auto-scaling, as it means newly-added nodes can more quickly begin providing value to the cluster. This, for example, can help with task launch times. For applications that run with `-XX:+AlwaysPreTouch`, serial boot is even slower, as the JVM pre-allocates/touches the necessary memory pages.

Once the compiler version is ≥ 21, we can consider the MemorySegment API: allocate a single "wide" buffer and slice into it. That should be one large malloc/mmap call, which should return much faster.
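The single-wide-allocation idea could look roughly like the following sketch (requires the Java 21+ Foreign Function & Memory API; class and method names are illustrative, not part of this PR):

```java
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.nio.ByteBuffer;

public class WideBufferInit
{
  /**
   * Allocates one large native segment (a single malloc/mmap instead of
   * `count` separate ones) and slices it into per-buffer ByteBuffer views.
   * The arena controls the lifetime of the backing memory.
   */
  public static ByteBuffer[] slice(final Arena arena, final int count, final long sizeBytes)
  {
    final MemorySegment wide = arena.allocate(sizeBytes * count, Long.BYTES);
    final ByteBuffer[] buffers = new ByteBuffer[count];
    for (int i = 0; i < count; i++) {
      // Each slice is a zero-copy view into the wide allocation.
      buffers[i] = wide.asSlice((long) i * sizeBytes, sizeBytes).asByteBuffer();
    }
    return buffers;
  }
}
```

Note that `asByteBuffer()` requires each slice to fit in an `int`-sized capacity, which matches the existing per-buffer 2GB ceiling.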
Benchmarks
Allocate 10 and 100 merge buffers at ~2GB each, using optimal JVM memory flags, on JDK 21 (compiled on JDK 17):
Before
After
Overall, this results in a measured ~10x reduction in launch time in the worst case, and up to ~100x in the best case. Brokers now boot ~7 minutes faster and Peons ~5 seconds faster on our workloads.
Benchmark File
Release note
Speed up broker boot times by parallelizing merge buffer initialization.