Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 50 additions & 3 deletions dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGExpressionAnalyzer.h>
#include <Flash/Coprocessor/DAGPipeline.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <Flash/Coprocessor/InterpreterUtils.h>
#include <Flash/Coprocessor/JoinInterpreterHelper.h>
#include <Flash/Pipeline/PipelineBuilder.h>
Expand All @@ -32,7 +33,6 @@
#include <Interpreters/Context.h>
#include <common/logger_useful.h>
#include <fmt/format.h>

namespace DB
{
namespace FailPoints
Expand Down Expand Up @@ -158,8 +158,55 @@ PhysicalPlanNodePtr PhysicalJoin::build(
{
build_key_names_map[original_build_key_names[i]] = build_key_names[i];
}
auto runtime_filter_list
= tiflash_join.genRuntimeFilterList(context, build_source_columns, build_key_names_map, log);

// Conservative correctness guard:
// If join key *protobuf field types* across sides are not compatible, skip runtime filter as early as possible
// to avoid wrong filtering / wasted work.
//
// Why here:
// - We haven't created/registered any RuntimeFilter yet.
// - We still have access to `tipb::Join` and can cheaply check original field types.
//
// NOTE: This is intentionally conservative. Join itself will still work because join keys are cast to a common
// type for execution, but RF's Set header/value normalization may not be safe under mismatched signed/unsigned
// or integer/decimal scenarios.
auto is_join_key_field_type_compatible = [&]() -> bool {
    // Returns true only if every left/right join key pair carries a valid protobuf field type,
    // the same `tp`, and the same unsigned flag. Any doubt yields false (runtime filter is skipped).
    const int left_key_count = join.left_join_keys_size();
    if (left_key_count != join.right_join_keys_size())
        return false;

    for (int key_idx = 0; key_idx < left_key_count; ++key_idx)
    {
        const auto & left_key = join.left_join_keys(key_idx);
        const auto & right_key = join.right_join_keys(key_idx);

        // Without a valid field type on both sides there is nothing reliable to compare.
        if (unlikely(!(exprHasValidFieldType(left_key) && exprHasValidFieldType(right_key))))
            return false;

        const auto & left_type = left_key.field_type();
        const auto & right_type = right_key.field_type();

        // Two independent reasons to give up, checked in this order:
        //  1. the basic `tp` differs between the sides;
        //  2. the unsigned flag differs (TiDB encodes unsigned integers via a flag, the known
        //     problematic case for RF(IN)). Note the flag is compared for every `tp`, not only
        //     integer-like ones, which keeps the guard conservative.
        if (left_type.tp() != right_type.tp() || hasUnsignedFlag(left_type) != hasUnsignedFlag(right_type))
            return false;
    }
    return true;
};

const bool enable_runtime_filter = is_join_key_field_type_compatible();
if (!enable_runtime_filter && !join.runtime_filter_list().empty())
{
LOG_INFO(
log,
"Disable runtime filter for join {} due to join-side key type mismatch (left/right key field types differ)",
executor_id);
}

auto runtime_filter_list = enable_runtime_filter
? tiflash_join.genRuntimeFilterList(context, build_source_columns, build_key_names_map, log)
: std::vector<RuntimeFilterPtr>{};
LOG_DEBUG(log, "before register runtime filter list, list size:{}", runtime_filter_list.size());
context.getDAGContext()->runtime_filter_mgr.registerRuntimeFilterList(runtime_filter_list);

Expand Down
132 changes: 132 additions & 0 deletions dbms/src/Flash/tests/gtest_runtime_filter_disable_on_type_mismatch.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Debug/MockRuntimeFilter.h>
#include <Interpreters/Context.h>
#include <TestUtils/ExecutorTestUtils.h>

namespace DB
{
namespace tests
{

/// Coverage for the conservative guard in `PhysicalJoin::build`:
/// - If join-side key protobuf field types are not compatible, we should skip creating/registering runtime filter.
///
/// Behavioral contract in this test:
/// - We still attach a RuntimeFilter request in the mock DAG.
/// - Because key types mismatch, the optimization should be disabled (i.e. no filtering happens).
/// - The join result should remain correct (same row count as “without runtime filter”).
class RuntimeFilterDisableOnTypeMismatchTestRunner : public DB::tests::ExecutorTest
{
public:
void initializeContext() override
{
ExecutorTest::initializeContext();
context.mockStorage()->setUseDeltaMerge(true);
}

static constexpr size_t concurrency = 10;
};

// Opens a loop that runs the enclosed statements twice: first with `enablePipeline(false)`,
// then with `enablePipeline(true)`. The expansion declares `pipelineBools` in the current scope
// and leaves a `{` open, so it MUST be paired with WrapForRuntimeFilterTestEnd.
#define WrapForRuntimeFilterTestBegin                      \
    const std::vector<bool> pipelineBools = {false, true}; \
    for (const bool enablePipelineFlag : pipelineBools)    \
    {                                                      \
        enablePipeline(enablePipelineFlag);

// Closes the loop body opened by WrapForRuntimeFilterTestBegin.
#define WrapForRuntimeFilterTestEnd }

/// Checks that a requested runtime filter is NOT applied when the join keys of the two sides
/// have mismatching field types (Int32 vs Int64, and signed vs unsigned Int32).
///
/// Data: left k1 = {1, 2, 3}, right k1 = {2, 2, 3, 4}. The inner join yields 3 rows
/// (2 matches twice, 3 matches once) regardless of whether the probe side is pruned, so the
/// join row count alone cannot tell whether a runtime filter ran. The observable signal is the
/// probe-side scan: an applied filter would drop left row `1`, so every scenario asserts that
/// `table_scan_0` still reports all 3 rows.
TEST_F(RuntimeFilterDisableOnTypeMismatchTestRunner, DisableRuntimeFilterWhenJoinKeyFieldTypeMismatch)
try
{
    // Tiny segment/pack limits, same style as other runtime filter executor tests.
    // NOTE(review): presumably this spreads the 3 rows over several packs/segments -- confirm.
    context.context->getSettingsRef().dt_segment_stable_pack_rows = 1;
    context.context->getSettingsRef().dt_segment_limit_rows = 1;
    context.context->getSettingsRef().dt_segment_delta_cache_limit_rows = 1;
    context.context->getSettingsRef().dt_segment_force_split_size = 70;
    context.context->getSettingsRef().enable_hash_join_v2 = false;

    // Probe(left) join key: Int32
    // Note: When using DeltaMerge in tests, the primary key column is expected to be representable by integer.
    // So we add an explicit integer handle column `pk` as the primary key and keep `k1` as a normal column.
    context.addMockDeltaMerge(
        {"test_db", "left_table"},
        {{"pk", TiDB::TP::TypeLongLong, false}, {"k1", TiDB::TP::TypeLong}},
        {toVec<Int64>("pk", {1, 2, 3}), toNullableVec<Int32>("k1", {1, 2, 3})},
        concurrency);

    // Build(right) join key: Int64 (mismatch on basic type)
    context.addExchangeReceiver(
        "right_exchange_table_i64",
        {{"k1", TiDB::TP::TypeLongLong}},
        {toNullableVec<Int64>("k1", {2, 2, 3, 4})});

    // Build(right) join key: UInt32 (mismatch due to unsigned flag)
    context.addExchangeReceiver(
        "right_exchange_table_u32",
        {{"k1", TiDB::TP::TypeLong, true}},
        {toNullableVec<UInt32>("k1", {2, 2, 3, 4})});

    // Every scenario below runs twice: once without and once with the pipeline model.
    WrapForRuntimeFilterTestBegin
    {
        // Baseline: no runtime filter requested, the scan returns all 3 left rows.
        auto request
            = context.scan("test_db", "left_table")
                  .join(context.receive("right_exchange_table_i64"), tipb::JoinType::TypeInnerJoin, {col("k1")})
                  .build(context);
        Expect expect{
            {"table_scan_0", {3, not_check_concurrency}},
            {"exchange_receiver_1", {4, concurrency}},
            {"Join_2", {3, concurrency}}};
        testForExecutionSummary(request, expect);
    }

    {
        // Runtime filter requested, but Int32 vs Int64 keys => runtime filter should be disabled.
        mock::MockRuntimeFilter rf(1, col("k1"), col("k1"), "exchange_receiver_1", "table_scan_0");
        auto request
            = context.scan("test_db", "left_table", std::vector<int>{1})
                  .join(context.receive("right_exchange_table_i64"), tipb::JoinType::TypeInnerJoin, {col("k1")}, rf)
                  .build(context);
        // No RF pruning: the scan must report the same 3 rows as the baseline
        // (an applied filter would have removed left row `1`).
        Expect expect{
            {"table_scan_0", {3, not_check_concurrency}},
            {"exchange_receiver_1", {4, concurrency}},
            {"Join_2", {3, concurrency}}};
        testForExecutionSummary(request, expect);
    }

    {
        // Runtime filter requested, but signed vs unsigned keys => runtime filter should be disabled.
        mock::MockRuntimeFilter rf(1, col("k1"), col("k1"), "exchange_receiver_1", "table_scan_0");
        auto request
            = context.scan("test_db", "left_table", std::vector<int>{1})
                  .join(context.receive("right_exchange_table_u32"), tipb::JoinType::TypeInnerJoin, {col("k1")}, rf)
                  .build(context);
        // No RF pruning: the scan must report the same 3 rows as the baseline.
        Expect expect{
            {"table_scan_0", {3, not_check_concurrency}},
            {"exchange_receiver_1", {4, concurrency}},
            {"Join_2", {3, concurrency}}};
        testForExecutionSummary(request, expect);
    }
    WrapForRuntimeFilterTestEnd
}
CATCH

#undef WrapForRuntimeFilterTestBegin
#undef WrapForRuntimeFilterTestEnd

} // namespace tests
} // namespace DB