From 370df9871b00b70b62c3a61eb9b29f15d35b45e1 Mon Sep 17 00:00:00 2001
From: Jackie Tien
Date: Wed, 11 Feb 2026 14:13:27 +0800
Subject: [PATCH] Fix race condition in SharedTsBlockQueue async listener causing NPE in MemoryPool.free() (#17196)

When SharedTsBlockQueue.add() encounters memory pressure, it registers an
async listener on a MemoryReservationFuture to add the TsBlock later. If
the upstream FragmentInstance finishes and calls abort()/close() before the
listener executes, the following race occurs:

1. abort() sets closed=true, clears the queue, frees bufferRetainedSizeInBytes
2. deRegisterFragmentInstanceFromMemoryPool removes the upstream FI's memory mapping
3. The async listener fires and adds the TsBlock to the closed queue
4. The downstream consumer calls remove() -> MemoryPool.free() with the
   upstream FI's IDs, but the mapping no longer exists -> NPE

Fix: Check the `closed` flag inside the async listener before adding the
TsBlock. When closed, skip the add (memory was already freed by abort/close)
and complete channelBlocked to prevent hangs.

Also add a unit test that reproduces this race condition by using a
manually-controlled SettableFuture to simulate the blocked-on-memory path.

(cherry picked from commit 7ee7a83b86fc4d266c9c7aa45cccb0044b56daa6) --- .../exchange/SharedTsBlockQueue.java | 34 ++++++-- .../exchange/SharedTsBlockQueueTest.java | 82 ++++++++++++++++++- 2 files changed, 107 insertions(+), 9 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java index ec854da306c48..555cf9efe5ad9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java @@ -204,7 +204,8 @@ public TsBlock remove() { localPlanNodeId, tsBlock.getSizeInBytes()); bufferRetainedSizeInBytes -= tsBlock.getSizeInBytes(); - // Every time LocalSourceHandle consumes a TsBlock, it needs to send the event to + // Every time LocalSourceHandle consumes a TsBlock, it needs to send the event + // to // corresponding LocalSinkChannel. if (sinkChannel != null) { sinkChannel.checkAndInvokeOnFinished(); @@ -253,6 +254,16 @@ public ListenableFuture add(TsBlock tsBlock) { blockedOnMemory.addListener( () -> { synchronized (this) { + // If the queue has been closed or aborted before this listener executes, + // we must not add the TsBlock. The memory reserved for this TsBlock has + // already been freed by abort()/close() via bufferRetainedSizeInBytes. + // Adding it would cause a downstream NPE in MemoryPool.free() when + // the consumer calls remove(), because the upstream FI's memory mapping + // has already been deregistered. + if (closed) { + channelBlocked.set(null); + return; + } queue.add(tsBlock); if (!blocked.isDone()) { blocked.set(null); @@ -262,8 +273,10 @@ public ListenableFuture add(TsBlock tsBlock) { }, // Use directExecutor() here could lead to deadlock. 
Thread A holds lock of // SharedTsBlockQueueA and tries to invoke the listener of - // SharedTsBlockQueueB(when freeing memory to complete MemoryReservationFuture) while - // Thread B holds lock of SharedTsBlockQueueB and tries to invoke the listener of + // SharedTsBlockQueueB(when freeing memory to complete MemoryReservationFuture) + // while + // Thread B holds lock of SharedTsBlockQueueB and tries to invoke the listener + // of // SharedTsBlockQueueA executorService); return channelBlocked; @@ -303,13 +316,18 @@ public void close() { bufferRetainedSizeInBytes = 0; } if (sinkChannel != null) { - // attention: LocalSinkChannel of this SharedTsBlockQueue could be null when we close - // LocalSourceHandle(with limit clause it's possible) before constructing the corresponding + // attention: LocalSinkChannel of this SharedTsBlockQueue could be null when we + // close + // LocalSourceHandle(with limit clause it's possible) before constructing the + // corresponding // LocalSinkChannel. - // If this close method is invoked by LocalSourceHandle, listener of LocalSourceHandle will - // remove the LocalSourceHandle from the map of MppDataExchangeManager and later when + // If this close method is invoked by LocalSourceHandle, listener of + // LocalSourceHandle will + // remove the LocalSourceHandle from the map of MppDataExchangeManager and later + // when // LocalSinkChannel is initialized, it will construct a new SharedTsBlockQueue. - // It is still safe that we let the LocalSourceHandle close successfully in this case. Because + // It is still safe that we let the LocalSourceHandle close successfully in this + // case. Because // the QueryTerminator will do the final cleaning logic. 
sinkChannel.close(); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueueTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueueTest.java index c39175f6c972d..46196d1c990d1 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueueTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueueTest.java @@ -24,7 +24,10 @@ import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.commons.lang3.Validate; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.utils.Pair; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; @@ -36,12 +39,89 @@ import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService; public class SharedTsBlockQueueTest { + + /** + * Test that when add() goes into the async listener path (memory blocked) and the queue is + * aborted before the listener fires, the listener does NOT add the TsBlock to the closed queue. + * This reproduces the race condition that caused NPE in MemoryPool.free(). + */ + @Test + public void testAsyncListenerAfterAbortDoesNotAddTsBlock() { + final String queryId = "q0"; + final long mockTsBlockSize = 1024L; + final TFragmentInstanceId fragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0"); + final String planNodeId = "test"; + + // Use a SettableFuture to manually control when the blocked-on-memory future + // completes. + SettableFuture manualFuture = SettableFuture.create(); + + // Create a mock MemoryPool that returns the manually-controlled future + // (simulating blocked). 
+ LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class); + MemoryPool mockMemoryPool = Mockito.mock(MemoryPool.class); + Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(mockMemoryPool); + + // reserve() returns (manualFuture, false) — simulating memory blocked + Mockito.when( + mockMemoryPool.reserve( + Mockito.anyString(), + Mockito.anyString(), + Mockito.anyString(), + Mockito.anyLong(), + Mockito.anyLong())) + .thenReturn(new Pair<>(manualFuture, Boolean.FALSE)); + // tryCancel returns 0 — simulating future already completed (can't cancel) + Mockito.when(mockMemoryPool.tryCancel(Mockito.any())).thenReturn(0L); + + // Use a direct executor so that when we complete manualFuture, the listener + // runs immediately. + SharedTsBlockQueue queue = + new SharedTsBlockQueue( + fragmentInstanceId, planNodeId, mockLocalMemoryManager, newDirectExecutorService()); + queue.getCanAddTsBlock().set(null); + queue.setMaxBytesCanReserve(Long.MAX_VALUE); + + TsBlock mockTsBlock = Utils.createMockTsBlock(mockTsBlockSize); + + // Step 1: add() goes into async path — listener is registered on manualFuture. + // reserve() returns (manualFuture, false), so the TsBlock is NOT yet added to + // the queue. + ListenableFuture addFuture; + synchronized (queue) { + addFuture = queue.add(mockTsBlock); + } + // The addFuture (channelBlocked) should not be done yet + Assert.assertFalse(addFuture.isDone()); + // Queue should be empty — TsBlock is waiting for memory + Assert.assertTrue(queue.isEmpty()); + + // Step 2: Abort the queue (simulates upstream FI state change listener calling + // abort) + synchronized (queue) { + queue.abort(); + } + Assert.assertTrue(queue.isClosed()); + + // Step 3: Now complete the manualFuture — this triggers the async listener. + // Before the fix, this would add the TsBlock to the closed queue. + // After the fix, the listener detects closed==true and returns without adding. 
+ manualFuture.set(null); + + // Verify: queue should still be empty (TsBlock was NOT added to the closed + // queue) + Assert.assertTrue(queue.isEmpty()); + // The channelBlocked future should be completed (no hang) + Assert.assertTrue(addFuture.isDone()); + } + @Test(timeout = 15000L) public void concurrencyTest() { final String queryId = "q0"; final long mockTsBlockSize = 1024L * 1024L; - // Construct a mock LocalMemoryManager with capacity 5 * mockTsBlockSize per query. + // Construct a mock LocalMemoryManager with capacity 5 * mockTsBlockSize per + // query. LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class); MemoryPool spyMemoryPool = Mockito.spy(new MemoryPool("test", 10 * mockTsBlockSize, 5 * mockTsBlockSize));