diff --git a/dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp b/dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp index ba0c43c0edf..969d74417b8 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -32,7 +33,6 @@ #include #include #include - namespace DB { namespace FailPoints @@ -158,8 +158,55 @@ PhysicalPlanNodePtr PhysicalJoin::build( { build_key_names_map[original_build_key_names[i]] = build_key_names[i]; } - auto runtime_filter_list - = tiflash_join.genRuntimeFilterList(context, build_source_columns, build_key_names_map, log); + + // Conservative correctness guard: + // If join key *protobuf field types* across sides are not compatible, skip runtime filter as early as possible + // to avoid wrong filtering / wasted work. When skipped, an empty list is registered instead, and the INFO log below is emitted only if the request carried a non-empty runtime_filter_list. + // + // Why here: + // - We haven't created/registered any RuntimeFilter yet. + // - We still have access to `tipb::Join` and can cheaply check original field types. + // + // NOTE: This is intentionally conservative. Join itself will still work because join keys are cast to a common + // type for execution, but RF's Set header/value normalization may not be safe under mismatched signed/unsigned + // or integer/decimal scenarios. The check returns false if the key counts differ, if either key of a pair fails `exprHasValidFieldType`, or if a pair differs in `tp()` or in the unsigned flag; it returns true otherwise (including when there are no keys). + auto is_join_key_field_type_compatible = [&]() -> bool { + const int n = join.left_join_keys_size(); + if (n != join.right_join_keys_size()) + return false; + for (int i = 0; i < n; ++i) + { + if (unlikely( + !exprHasValidFieldType(join.left_join_keys(i)) || !exprHasValidFieldType(join.right_join_keys(i)))) + return false; + + const auto & lt = join.left_join_keys(i).field_type(); + const auto & rt = join.right_join_keys(i).field_type(); + + // If TiDB says the two sides are different basic tp, we don't try to be smart here: any `tp()` difference is treated as incompatible. + if (lt.tp() != rt.tp()) + return false; + + // Signed/unsigned mismatch: when the tp is integer-like, TiDB encodes unsigned via flag. 
+ // This is the known problematic case for RF(IN). The flag comparison is applied to every key pair, whatever its tp. + if (hasUnsignedFlag(lt) != hasUnsignedFlag(rt)) + return false; + } + return true; + }; + + const bool enable_runtime_filter = is_join_key_field_type_compatible(); + if (!enable_runtime_filter && !join.runtime_filter_list().empty()) + { + LOG_INFO( + log, + "Disable runtime filter for join {} due to join-side key type mismatch (left/right key field types differ)", + executor_id); + } + + auto runtime_filter_list = enable_runtime_filter + ? tiflash_join.genRuntimeFilterList(context, build_source_columns, build_key_names_map, log) + : std::vector{}; LOG_DEBUG(log, "before register runtime filter list, list size:{}", runtime_filter_list.size()); context.getDAGContext()->runtime_filter_mgr.registerRuntimeFilterList(runtime_filter_list); diff --git a/dbms/src/Flash/tests/gtest_runtime_filter_disable_on_type_mismatch.cpp b/dbms/src/Flash/tests/gtest_runtime_filter_disable_on_type_mismatch.cpp new file mode 100644 index 00000000000..073a00ca86f --- /dev/null +++ b/dbms/src/Flash/tests/gtest_runtime_filter_disable_on_type_mismatch.cpp @@ -0,0 +1,132 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +namespace DB +{ +namespace tests +{ + +/// Coverage for the conservative guard in `PhysicalJoin::build`: +/// - If join-side key protobuf field types are not compatible, we should skip creating/registering runtime filter. 
+/// +/// Behavioral contract in this test: +/// - We still attach a RuntimeFilter request in the mock DAG. +/// - Because key types mismatch, the optimization should be disabled (i.e. no filtering happens; note the scan row count itself is left unchecked by the expectations). +/// - The join result should remain correct (same row count as “without runtime filter”). Each case runs twice, with `enablePipeline(false)` and then `enablePipeline(true)`. +class RuntimeFilterDisableOnTypeMismatchTestRunner : public DB::tests::ExecutorTest +{ +public: + void initializeContext() override + { + ExecutorTest::initializeContext(); + context.mockStorage()->setUseDeltaMerge(true); + } + + static constexpr size_t concurrency = 10; +}; + +#define WrapForRuntimeFilterTestBegin \ + std::vector pipelineBools{false, true}; \ + for (auto enablePipelineFlag : pipelineBools) \ + { \ + enablePipeline(enablePipelineFlag); + +#define WrapForRuntimeFilterTestEnd } + +TEST_F(RuntimeFilterDisableOnTypeMismatchTestRunner, DisableRuntimeFilterWhenJoinKeyFieldTypeMismatch) +try +{ + context.context->getSettingsRef().dt_segment_stable_pack_rows = 1; + context.context->getSettingsRef().dt_segment_limit_rows = 1; + context.context->getSettingsRef().dt_segment_delta_cache_limit_rows = 1; + context.context->getSettingsRef().dt_segment_force_split_size = 70; + context.context->getSettingsRef().enable_hash_join_v2 = false; + + // Probe(left) join key: Int32 + // Note: When using DeltaMerge in tests, the primary key column is expected to be representable by integer. + // So we add an explicit integer handle column `pk` as the primary key and keep `k1` as a normal column. 
+ context.addMockDeltaMerge( + {"test_db", "left_table"}, + {{"pk", TiDB::TP::TypeLongLong, false}, {"k1", TiDB::TP::TypeLong}}, + {toVec("pk", {1, 2, 3}), toNullableVec("k1", {1, 2, 3})}, + concurrency); + + // Build(right) join key: Int64 (mismatch) + context.addExchangeReceiver( + "right_exchange_table_i64", + {{"k1", TiDB::TP::TypeLongLong}}, + {toNullableVec("k1", {2, 2, 3, 4})}); + + // Build(right) join key: UInt32 (mismatch due to unsigned flag). NOTE(review): the third initializer is `true` here, while `pk` above passes `false` and is filled via `toVec` rather than `toNullableVec`; confirm this field is the unsigned flag and not nullability, otherwise this table has the same key type as the probe side. + context.addExchangeReceiver( + "right_exchange_table_u32", + {{"k1", TiDB::TP::TypeLong, true}}, + {toNullableVec("k1", {2, 2, 3, 4})}); + + WrapForRuntimeFilterTestBegin + { + // Baseline: without runtime filter. Left k1 {1, 2, 3} inner-joined with right k1 {2, 2, 3, 4} matches 2 twice and 3 once, hence the 3 join rows; the receiver reports all 4 build rows. + auto request + = context.scan("test_db", "left_table") + .join(context.receive("right_exchange_table_i64"), tipb::JoinType::TypeInnerJoin, {col("k1")}) + .build(context); + Expect expect{ + {"table_scan_0", {not_check_rows, not_check_concurrency}}, + {"exchange_receiver_1", {4, concurrency}}, + {"Join_2", {3, concurrency}}}; + testForExecutionSummary(request, expect); + } + + { + // With runtime filter requested, but type mismatch => runtime filter should be disabled. + mock::MockRuntimeFilter rf(1, col("k1"), col("k1"), "exchange_receiver_1", "table_scan_0"); + auto request + = context.scan("test_db", "left_table", std::vector{1}) + .join(context.receive("right_exchange_table_i64"), tipb::JoinType::TypeInnerJoin, {col("k1")}, rf) + .build(context); + // Expect no RF pruning, same as baseline. Scan rows are `not_check_rows`, so only the receiver and join row counts are compared. + Expect expect{ + {"table_scan_0", {not_check_rows, not_check_concurrency}}, + {"exchange_receiver_1", {4, concurrency}}, + {"Join_2", {3, concurrency}}}; + testForExecutionSummary(request, expect); + } + + { + // With runtime filter requested, but signed/unsigned mismatch => runtime filter should be disabled. 
+ mock::MockRuntimeFilter rf(1, col("k1"), col("k1"), "exchange_receiver_1", "table_scan_0"); + auto request + = context.scan("test_db", "left_table", std::vector{1}) + .join(context.receive("right_exchange_table_u32"), tipb::JoinType::TypeInnerJoin, {col("k1")}, rf) + .build(context); + // Expect no RF pruning, same as baseline (the i64 baseline above; no separate u32 baseline is run). + Expect expect{ + {"table_scan_0", {not_check_rows, not_check_concurrency}}, + {"exchange_receiver_1", {4, concurrency}}, + {"Join_2", {3, concurrency}}}; + testForExecutionSummary(request, expect); + } + WrapForRuntimeFilterTestEnd +} +CATCH + +#undef WrapForRuntimeFilterTestBegin +#undef WrapForRuntimeFilterTestEnd + +} // namespace tests +} // namespace DB