diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java index ec854da306c48..555cf9efe5ad9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java @@ -204,7 +204,8 @@ public TsBlock remove() { localPlanNodeId, tsBlock.getSizeInBytes()); bufferRetainedSizeInBytes -= tsBlock.getSizeInBytes(); - // Every time LocalSourceHandle consumes a TsBlock, it needs to send the event to + // Every time LocalSourceHandle consumes a TsBlock, it needs to send the event + // to // corresponding LocalSinkChannel. if (sinkChannel != null) { sinkChannel.checkAndInvokeOnFinished(); @@ -253,6 +254,16 @@ public ListenableFuture add(TsBlock tsBlock) { blockedOnMemory.addListener( () -> { synchronized (this) { + // If the queue has been closed or aborted before this listener executes, + // we must not add the TsBlock. The memory reserved for this TsBlock has + // already been freed by abort()/close() via bufferRetainedSizeInBytes. + // Adding it would cause a downstream NPE in MemoryPool.free() when + // the consumer calls remove(), because the upstream FI's memory mapping + // has already been deregistered. + if (closed) { + channelBlocked.set(null); + return; + } queue.add(tsBlock); if (!blocked.isDone()) { blocked.set(null); @@ -262,8 +273,10 @@ public ListenableFuture add(TsBlock tsBlock) { }, // Use directExecutor() here could lead to deadlock. Thread A holds lock of // SharedTsBlockQueueA and tries to invoke the listener of - // SharedTsBlockQueueB(when freeing memory to complete MemoryReservationFuture) while - // Thread B holds lock of SharedTsBlockQueueB and tries to invoke the listener of + // SharedTsBlockQueueB(when freeing memory to complete MemoryReservationFuture) + // while + // Thread B holds lock of SharedTsBlockQueueB and tries to invoke the listener + // of // SharedTsBlockQueueA executorService); return channelBlocked; @@ -303,13 +316,18 @@ public void close() { bufferRetainedSizeInBytes = 0; } if (sinkChannel != null) { - // attention: LocalSinkChannel of this SharedTsBlockQueue could be null when we close - // LocalSourceHandle(with limit clause it's possible) before constructing the corresponding + // attention: LocalSinkChannel of this SharedTsBlockQueue could be null when we + // close + // LocalSourceHandle(with limit clause it's possible) before constructing the + // corresponding // LocalSinkChannel. - // If this close method is invoked by LocalSourceHandle, listener of LocalSourceHandle will - // remove the LocalSourceHandle from the map of MppDataExchangeManager and later when + // If this close method is invoked by LocalSourceHandle, listener of + // LocalSourceHandle will + // remove the LocalSourceHandle from the map of MppDataExchangeManager and later + // when // LocalSinkChannel is initialized, it will construct a new SharedTsBlockQueue. - // It is still safe that we let the LocalSourceHandle close successfully in this case. Because + // It is still safe that we let the LocalSourceHandle close successfully in this + // case. Because // the QueryTerminator will do the final cleaning logic. sinkChannel.close(); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueueTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueueTest.java index c39175f6c972d..46196d1c990d1 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueueTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueueTest.java @@ -24,7 +24,10 @@ import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.commons.lang3.Validate; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.utils.Pair; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; @@ -36,12 +39,89 @@ import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService; public class SharedTsBlockQueueTest { + + /** + * Test that when add() goes into the async listener path (memory blocked) and the queue is + * aborted before the listener fires, the listener does NOT add the TsBlock to the closed queue. + * This reproduces the race condition that caused NPE in MemoryPool.free(). + */ + @Test + public void testAsyncListenerAfterAbortDoesNotAddTsBlock() { + final String queryId = "q0"; + final long mockTsBlockSize = 1024L; + final TFragmentInstanceId fragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0"); + final String planNodeId = "test"; + + // Use a SettableFuture to manually control when the blocked-on-memory future + // completes. + SettableFuture manualFuture = SettableFuture.create(); + + // Create a mock MemoryPool that returns the manually-controlled future + // (simulating blocked). + LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class); + MemoryPool mockMemoryPool = Mockito.mock(MemoryPool.class); + Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(mockMemoryPool); + + // reserve() returns (manualFuture, false) — simulating memory blocked + Mockito.when( + mockMemoryPool.reserve( + Mockito.anyString(), + Mockito.anyString(), + Mockito.anyString(), + Mockito.anyLong(), + Mockito.anyLong())) + .thenReturn(new Pair<>(manualFuture, Boolean.FALSE)); + // tryCancel returns 0 — simulating future already completed (can't cancel) + Mockito.when(mockMemoryPool.tryCancel(Mockito.any())).thenReturn(0L); + + // Use a direct executor so that when we complete manualFuture, the listener + // runs immediately. + SharedTsBlockQueue queue = + new SharedTsBlockQueue( + fragmentInstanceId, planNodeId, mockLocalMemoryManager, newDirectExecutorService()); + queue.getCanAddTsBlock().set(null); + queue.setMaxBytesCanReserve(Long.MAX_VALUE); + + TsBlock mockTsBlock = Utils.createMockTsBlock(mockTsBlockSize); + + // Step 1: add() goes into async path — listener is registered on manualFuture. + // reserve() returns (manualFuture, false), so the TsBlock is NOT yet added to + // the queue. + ListenableFuture addFuture; + synchronized (queue) { + addFuture = queue.add(mockTsBlock); + } + // The addFuture (channelBlocked) should not be done yet + Assert.assertFalse(addFuture.isDone()); + // Queue should be empty — TsBlock is waiting for memory + Assert.assertTrue(queue.isEmpty()); + + // Step 2: Abort the queue (simulates upstream FI state change listener calling + // abort) + synchronized (queue) { + queue.abort(); + } + Assert.assertTrue(queue.isClosed()); + + // Step 3: Now complete the manualFuture — this triggers the async listener. + // Before the fix, this would add the TsBlock to the closed queue. + // After the fix, the listener detects closed==true and returns without adding. + manualFuture.set(null); + + // Verify: queue should still be empty (TsBlock was NOT added to the closed + // queue) + Assert.assertTrue(queue.isEmpty()); + // The channelBlocked future should be completed (no hang) + Assert.assertTrue(addFuture.isDone()); + } + @Test(timeout = 15000L) public void concurrencyTest() { final String queryId = "q0"; final long mockTsBlockSize = 1024L * 1024L; - // Construct a mock LocalMemoryManager with capacity 5 * mockTsBlockSize per query. + // Construct a mock LocalMemoryManager with capacity 5 * mockTsBlockSize per + // query. LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class); MemoryPool spyMemoryPool = Mockito.spy(new MemoryPool("test", 10 * mockTsBlockSize, 5 * mockTsBlockSize));