diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
index 1ce074ecf1bb43..ab039e9e9b667d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
@@ -651,4 +651,13 @@ public TableScanParams getScanParams() {
         }
         return this.scanParams;
     }
+
+    protected long applyMaxFileSplitNumLimit(long targetSplitSize, long totalFileSize) {
+        int maxFileSplitNum = sessionVariable.getMaxFileSplitNum();
+        if (maxFileSplitNum <= 0 || totalFileSize <= 0) {
+            return targetSplitSize;
+        }
+        long minSplitSizeForMaxNum = (totalFileSize + maxFileSplitNum - 1L) / (long) maxFileSplitNum;
+        return Math.max(targetSplitSize, minSplitSizeForMaxNum);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
index 4c18d1ee5051b3..41dc478c38c03b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
@@ -358,18 +358,22 @@ private long determineTargetFileSplitSize(List<HiveMetaStoreCache.FileCacheValue> fileCaches,
         }
         long result = sessionVariable.getMaxInitialSplitSize();
         long totalFileSize = 0;
+        boolean exceedInitialThreshold = false;
         for (HiveMetaStoreCache.FileCacheValue fileCacheValue : fileCaches) {
             if (fileCacheValue.getFiles() == null) {
                 continue;
             }
             for (HiveMetaStoreCache.HiveFileStatus status : fileCacheValue.getFiles()) {
                 totalFileSize += status.getLength();
-                if (totalFileSize >= sessionVariable.getMaxSplitSize() * sessionVariable.getMaxInitialSplitNum()) {
-                    result = sessionVariable.getMaxSplitSize();
-                    break;
+                if (!exceedInitialThreshold
+                        && totalFileSize >= sessionVariable.getMaxSplitSize()
+                                * sessionVariable.getMaxInitialSplitNum()) {
+                    exceedInitialThreshold = true;
                 }
             }
         }
+        result = exceedInitialThreshold ? sessionVariable.getMaxSplitSize() : result;
+        result = applyMaxFileSplitNumLimit(result, totalFileSize);
         return result;
     }
 
@@ -635,4 +639,3 @@ protected TFileCompressType getFileCompressType(FileSplit fileSplit) throws Use
 
     }
 }
-
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index f639a46c092e46..982fe4a7e2c36f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -476,14 +476,18 @@ private CloseableIterable<FileScanTask> splitFiles(TableScan scan) {
     private long determineTargetFileSplitSize(Iterable<FileScanTask> tasks) {
         long result = sessionVariable.getMaxInitialSplitSize();
         long accumulatedTotalFileSize = 0;
+        boolean exceedInitialThreshold = false;
         for (FileScanTask task : tasks) {
             accumulatedTotalFileSize += ScanTaskUtil.contentSizeInBytes(task.file());
-            if (accumulatedTotalFileSize
+            if (!exceedInitialThreshold && accumulatedTotalFileSize
                     >= sessionVariable.getMaxSplitSize() * sessionVariable.getMaxInitialSplitNum()) {
-                result = sessionVariable.getMaxSplitSize();
-                break;
+                exceedInitialThreshold = true;
             }
         }
+        result = exceedInitialThreshold ? sessionVariable.getMaxSplitSize() : result;
+        if (!isBatchMode()) {
+            result = applyMaxFileSplitNumLimit(result, accumulatedTotalFileSize);
+        }
         return result;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index d8db43c21fda76..3004e9dc027a95 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -436,6 +436,7 @@ private long determineTargetFileSplitSize(List<DataSplit> dataSplits,
         }
         long result = sessionVariable.getMaxInitialSplitSize();
         long totalFileSize = 0;
+        boolean exceedInitialThreshold = false;
        for (DataSplit dataSplit : dataSplits) {
             Optional<List<RawFile>> rawFiles = dataSplit.convertToRawFiles();
             if (!supportNativeReader(rawFiles)) {
@@ -443,13 +444,14 @@ private long determineTargetFileSplitSize(List<DataSplit> dataSplits,
             }
             for (RawFile rawFile : rawFiles.get()) {
                 totalFileSize += rawFile.fileSize();
-                if (totalFileSize
+                if (!exceedInitialThreshold && totalFileSize
                         >= sessionVariable.getMaxSplitSize() * sessionVariable.getMaxInitialSplitNum()) {
-                    result = sessionVariable.getMaxSplitSize();
-                    break;
+                    exceedInitialThreshold = true;
                 }
             }
         }
+        result = exceedInitialThreshold ? sessionVariable.getMaxSplitSize() : result;
+        result = applyMaxFileSplitNumLimit(result, totalFileSize);
         return result;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
index 543575689daec4..12db0b7c1bf652 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
@@ -174,14 +174,16 @@ private long determineTargetFileSplitSize(List<TBrokerFileStatus> fileStatuses)
         }
         long result = sessionVariable.getMaxInitialSplitSize();
         long totalFileSize = 0;
+        boolean exceedInitialThreshold = false;
         for (TBrokerFileStatus fileStatus : fileStatuses) {
             totalFileSize += fileStatus.getSize();
-            if (totalFileSize
-                    >= sessionVariable.getMaxSplitSize() * sessionVariable.getMaxInitialSplitNum()) {
-                result = sessionVariable.getMaxSplitSize();
-                break;
+            if (!exceedInitialThreshold
+                    && totalFileSize >= sessionVariable.getMaxSplitSize() * sessionVariable.getMaxInitialSplitNum()) {
+                exceedInitialThreshold = true;
             }
         }
+        result = exceedInitialThreshold ? sessionVariable.getMaxSplitSize() : result;
+        result = applyMaxFileSplitNumLimit(result, totalFileSize);
         return result;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index de1cf3b3ce37b6..f35551fac7262e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -522,6 +522,8 @@ public class SessionVariable implements Serializable, Writable {
 
     public static final String MAX_INITIAL_FILE_SPLIT_NUM = "max_initial_file_split_num";
 
+    public static final String MAX_FILE_SPLIT_NUM = "max_file_split_num";
+
     // Target file size in bytes for Iceberg write operations
     public static final String ICEBERG_WRITE_TARGET_FILE_SIZE_BYTES = "iceberg_write_target_file_size_bytes";
 
@@ -2225,6 +2227,13 @@ public boolean isEnableHboNonStrictMatchingMode() {
             needForward = true)
     public int maxInitialSplitNum = 200;
 
+    @VariableMgr.VarAttr(
+            name = MAX_FILE_SPLIT_NUM,
+            description = {"在非 batch 模式下,每个 table scan 最大允许的 split 数量,防止产生过多 split 导致 OOM。",
+                    "In non-batch mode, the maximum number of splits allowed per table scan to avoid OOM."},
+            needForward = true)
+    public int maxFileSplitNum = 100000;
+
     // Target file size for Iceberg write operations
     // Default 0 means use config::iceberg_sink_max_file_size
     @VariableMgr.VarAttr(name = ICEBERG_WRITE_TARGET_FILE_SIZE_BYTES, needForward = true)
@@ -4308,6 +4317,14 @@ public void setMaxInitialSplitNum(int maxInitialSplitNum) {
         this.maxInitialSplitNum = maxInitialSplitNum;
     }
 
+    public int getMaxFileSplitNum() {
+        return maxFileSplitNum;
+    }
+
+    public void setMaxFileSplitNum(int maxFileSplitNum) {
+        this.maxFileSplitNum = maxFileSplitNum;
+    }
+
     public long getIcebergWriteTargetFileSizeBytes() {
         return icebergWriteTargetFileSizeBytes;
     }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/FileQueryScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/FileQueryScanNodeTest.java
new file mode 100644
index 00000000000000..8b1d98e509a904
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/FileQueryScanNodeTest.java
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.UserException;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.statistics.StatisticalType;
+import org.apache.doris.thrift.TFileFormatType;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class FileQueryScanNodeTest {
+    private static final long MB = 1024L * 1024L;
+
+    private static class TestFileQueryScanNode extends FileQueryScanNode {
+        TestFileQueryScanNode(SessionVariable sv) {
+            super(new PlanNodeId(0), new TupleDescriptor(new TupleId(0)), "test",
+                    StatisticalType.TEST_EXTERNAL_TABLE, false, sv);
+        }
+
+        @Override
+        protected TFileFormatType getFileFormatType() throws UserException {
+            return TFileFormatType.FORMAT_ORC;
+        }
+
+        @Override
+        protected List<String> getPathPartitionKeys() throws UserException {
+            return Collections.emptyList();
+        }
+
+        @Override
+        protected TableIf getTargetTable() throws UserException {
+            return null;
+        }
+
+        @Override
+        protected Map<String, String> getLocationProperties() throws UserException {
+            return Collections.emptyMap();
+        }
+    }
+
+    @Test
+    public void testApplyMaxFileSplitNumLimitRaisesTargetSize() {
+        SessionVariable sv = new SessionVariable();
+        sv.setMaxFileSplitNum(100);
+        TestFileQueryScanNode node = new TestFileQueryScanNode(sv);
+        long target = node.applyMaxFileSplitNumLimit(32 * MB, 10_000L * MB);
+        Assert.assertEquals(100 * MB, target);
+    }
+
+    @Test
+    public void testApplyMaxFileSplitNumLimitKeepsTargetSizeWhenSmall() {
+        SessionVariable sv = new SessionVariable();
+        sv.setMaxFileSplitNum(100);
+        TestFileQueryScanNode node = new TestFileQueryScanNode(sv);
+        long target = node.applyMaxFileSplitNumLimit(32 * MB, 500L * MB);
+        Assert.assertEquals(32 * MB, target);
+    }
+
+    @Test
+    public void testApplyMaxFileSplitNumLimitDisabled() {
+        SessionVariable sv = new SessionVariable();
+        sv.setMaxFileSplitNum(0);
+        TestFileQueryScanNode node = new TestFileQueryScanNode(sv);
+        long target = node.applyMaxFileSplitNumLimit(32 * MB, 10_000L * MB);
+        Assert.assertEquals(32 * MB, target);
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/source/HiveScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/source/HiveScanNodeTest.java
new file mode 100644
index 00000000000000..727ff9390032ab
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/source/HiveScanNodeTest.java
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hive.source;
+
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.datasource.hive.HMSExternalCatalog;
+import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.datasource.hive.HiveMetaStoreCache;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.qe.SessionVariable;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.List;
+
+public class HiveScanNodeTest {
+    private static final long MB = 1024L * 1024L;
+
+    @Test
+    public void testDetermineTargetFileSplitSizeHonorsMaxFileSplitNum() throws Exception {
+        SessionVariable sv = new SessionVariable();
+        sv.setMaxFileSplitNum(100);
+        TupleDescriptor desc = new TupleDescriptor(new TupleId(0));
+        HMSExternalTable table = Mockito.mock(HMSExternalTable.class);
+        HMSExternalCatalog catalog = Mockito.mock(HMSExternalCatalog.class);
+        Mockito.when(table.getCatalog()).thenReturn(catalog);
+        Mockito.when(catalog.bindBrokerName()).thenReturn("");
+        desc.setTable(table);
+        HiveScanNode node = new HiveScanNode(new PlanNodeId(0), desc, false, sv, null);
+
+        HiveMetaStoreCache.FileCacheValue fileCacheValue = new HiveMetaStoreCache.FileCacheValue();
+        HiveMetaStoreCache.HiveFileStatus status = new HiveMetaStoreCache.HiveFileStatus();
+        status.setLength(10_000L * MB);
+        fileCacheValue.getFiles().add(status);
+        List<HiveMetaStoreCache.FileCacheValue> caches = Collections.singletonList(fileCacheValue);
+
+        Method method = HiveScanNode.class.getDeclaredMethod(
+                "determineTargetFileSplitSize", List.class, boolean.class);
+        method.setAccessible(true);
+        long target = (long) method.invoke(node, caches, false);
+        Assert.assertEquals(100 * MB, target);
+    }
+
+    @Test
+    public void testDetermineTargetFileSplitSizeKeepsInitialSize() throws Exception {
+        SessionVariable sv = new SessionVariable();
+        sv.setMaxFileSplitNum(100);
+        TupleDescriptor desc = new TupleDescriptor(new TupleId(0));
+        HMSExternalTable table = Mockito.mock(HMSExternalTable.class);
+        HMSExternalCatalog catalog = Mockito.mock(HMSExternalCatalog.class);
+        Mockito.when(table.getCatalog()).thenReturn(catalog);
+        Mockito.when(catalog.bindBrokerName()).thenReturn("");
+        desc.setTable(table);
+        HiveScanNode node = new HiveScanNode(new PlanNodeId(0), desc, false, sv, null);
+
+        HiveMetaStoreCache.FileCacheValue fileCacheValue = new HiveMetaStoreCache.FileCacheValue();
+        HiveMetaStoreCache.HiveFileStatus status = new HiveMetaStoreCache.HiveFileStatus();
+        status.setLength(500L * MB);
+        fileCacheValue.getFiles().add(status);
+        List<HiveMetaStoreCache.FileCacheValue> caches = Collections.singletonList(fileCacheValue);
+
+        Method method = HiveScanNode.class.getDeclaredMethod(
+                "determineTargetFileSplitSize", List.class, boolean.class);
+        method.setAccessible(true);
+        long target = (long) method.invoke(node, caches, false);
+        Assert.assertEquals(32 * MB, target);
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergScanNodeTest.java
new file mode 100644
index 00000000000000..48031a2303e6b2
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergScanNodeTest.java
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.source;
+
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.qe.SessionVariable;
+
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.util.ScanTaskUtil;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+
+public class IcebergScanNodeTest {
+    private static final long MB = 1024L * 1024L;
+
+    private static class TestIcebergScanNode extends IcebergScanNode {
+        TestIcebergScanNode(SessionVariable sv) {
+            super(new PlanNodeId(0), new TupleDescriptor(new TupleId(0)), sv);
+        }
+
+        @Override
+        public boolean isBatchMode() {
+            return false;
+        }
+    }
+
+    @Test
+    public void testDetermineTargetFileSplitSizeHonorsMaxFileSplitNum() throws Exception {
+        SessionVariable sv = new SessionVariable();
+        sv.setMaxFileSplitNum(100);
+        TestIcebergScanNode node = new TestIcebergScanNode(sv);
+
+        DataFile dataFile = Mockito.mock(DataFile.class);
+        Mockito.when(dataFile.fileSizeInBytes()).thenReturn(10_000L * MB);
+        FileScanTask task = Mockito.mock(FileScanTask.class);
+        Mockito.when(task.file()).thenReturn(dataFile);
+        Mockito.when(task.length()).thenReturn(10_000L * MB);
+
+        try (org.mockito.MockedStatic<ScanTaskUtil> mockedScanTaskUtil =
+                Mockito.mockStatic(ScanTaskUtil.class)) {
+            mockedScanTaskUtil.when(() -> ScanTaskUtil.contentSizeInBytes(dataFile))
+                    .thenReturn(10_000L * MB);
+
+            Method method = IcebergScanNode.class.getDeclaredMethod("determineTargetFileSplitSize", Iterable.class);
+            method.setAccessible(true);
+            long target = (long) method.invoke(node, Collections.singletonList(task));
+            Assert.assertEquals(100 * MB, target);
+        }
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
index 692a0db12caa63..09795c53910436 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
@@ -31,6 +31,7 @@
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.stats.SimpleStats;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.RawFile;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -39,6 +40,7 @@
 import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
 
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -381,6 +383,29 @@ public void testValidateIncrementalReadParams() throws UserException {
         }
     }
 
+    @Test
+    public void testDetermineTargetFileSplitSizeHonorsMaxFileSplitNum() throws Exception {
+        SessionVariable sv = new SessionVariable();
+        sv.setMaxFileSplitNum(100);
+        PaimonScanNode node = new PaimonScanNode(new PlanNodeId(0), new TupleDescriptor(new TupleId(0)), false, sv);
+
+        PaimonSource source = Mockito.mock(PaimonSource.class);
+        Mockito.when(source.getFileFormatFromTableProperties()).thenReturn("parquet");
+        node.setSource(source);
+
+        RawFile rawFile = Mockito.mock(RawFile.class);
+        Mockito.when(rawFile.path()).thenReturn("file.parquet");
+        Mockito.when(rawFile.fileSize()).thenReturn(10_000L * 1024L * 1024L);
+
+        DataSplit dataSplit = Mockito.mock(DataSplit.class);
+        Mockito.when(dataSplit.convertToRawFiles()).thenReturn(Optional.of(Collections.singletonList(rawFile)));
+
+        Method method = PaimonScanNode.class.getDeclaredMethod("determineTargetFileSplitSize", List.class, boolean.class);
+        method.setAccessible(true);
+        long target = (long) method.invoke(node, Collections.singletonList(dataSplit), false);
+        Assert.assertEquals(100L * 1024L * 1024L, target);
+    }
+
     private void mockJniReader(PaimonScanNode spyNode) {
         Mockito.doReturn(false).when(spyNode).supportNativeReader(ArgumentMatchers.any(Optional.class));
     }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/tvf/source/TVFScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/tvf/source/TVFScanNodeTest.java
new file mode 100644
index 00000000000000..8d591362376f3f
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/tvf/source/TVFScanNodeTest.java
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.tvf.source;
+
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.catalog.FunctionGenTable;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
+import org.apache.doris.thrift.TBrokerFileStatus;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.List;
+
+public class TVFScanNodeTest {
+    private static final long MB = 1024L * 1024L;
+
+    @Test
+    public void testDetermineTargetFileSplitSizeHonorsMaxFileSplitNum() throws Exception {
+        SessionVariable sv = new SessionVariable();
+        sv.setMaxFileSplitNum(100);
+        TupleDescriptor desc = new TupleDescriptor(new TupleId(0));
+        FunctionGenTable table = Mockito.mock(FunctionGenTable.class);
+        ExternalFileTableValuedFunction tvf = Mockito.mock(ExternalFileTableValuedFunction.class);
+        Mockito.when(table.getTvf()).thenReturn(tvf);
+        desc.setTable(table);
+        TVFScanNode node = new TVFScanNode(new PlanNodeId(0), desc, false, sv);
+
+        TBrokerFileStatus status = new TBrokerFileStatus();
+        status.setSize(10_000L * MB);
+        List<TBrokerFileStatus> statuses = Collections.singletonList(status);
+
+        Method method = TVFScanNode.class.getDeclaredMethod("determineTargetFileSplitSize", List.class);
+        method.setAccessible(true);
+        long target = (long) method.invoke(node, statuses);
+        Assert.assertEquals(100 * MB, target);
+    }
+}