Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
525 changes: 525 additions & 0 deletions be/benchmark/benchmark_column_read_order.hpp

Large diffs are not rendered by default.

478 changes: 478 additions & 0 deletions be/benchmark/benchmark_lazy_dict_decode.hpp

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions be/benchmark/benchmark_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@
#include "benchmark_bit_pack.hpp"
#include "benchmark_bits.hpp"
#include "benchmark_block_bloom_filter.hpp"
#include "benchmark_column_read_order.hpp"
#include "benchmark_fastunion.hpp"
#include "benchmark_hll_merge.hpp"
#include "benchmark_lazy_dict_decode.hpp"
#include "benchmark_p1_decoder_opts.hpp"
#include "benchmark_parquet_dict_decoder.hpp"
#include "benchmark_string.hpp"
#include "binary_cast_benchmark.hpp"
#include "vec/columns/column_string.h"
Expand Down
531 changes: 531 additions & 0 deletions be/benchmark/benchmark_p1_decoder_opts.hpp

Large diffs are not rendered by default.

498 changes: 498 additions & 0 deletions be/benchmark/benchmark_parquet_dict_decoder.hpp

Large diffs are not rendered by default.

17 changes: 17 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1595,6 +1595,23 @@ DEFINE_mInt64(compaction_batch_size, "-1");
// filter wrong data.
DEFINE_mBool(enable_parquet_page_index, "true");

// Whether to push down filter bitmap to the parquet decoder layer for lazy index decoding.
DEFINE_mBool(enable_parquet_lazy_dict_decode, "true");

// Whether to enable predicate column read order optimization in parquet lazy read.
DEFINE_mBool(enable_parquet_predicate_column_reorder, "true");

// Whether to enable lazy dictionary decode for non-predicate (lazy) string columns in parquet.
DEFINE_mBool(enable_parquet_lazy_dict_decode_for_lazy_columns, "true");

// Whether to enable AVX2 SIMD dict gather in parquet dictionary decoding.
// Benchmark shows SIMD gather is slower than scalar for most dict sizes on Alder Lake.
DEFINE_mBool(enable_parquet_simd_dict_decode, "false");

// Whether to enable software prefetch hints for large dictionary decoding in parquet.
// Benchmark shows software prefetch competes with hardware prefetcher, causing regression.
DEFINE_mBool(enable_parquet_dict_prefetch, "false");

DEFINE_mBool(ignore_not_found_file_in_external_table, "true");

DEFINE_mBool(enable_hdfs_mem_limiter, "true");
Expand Down
24 changes: 24 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1666,6 +1666,30 @@ DECLARE_mInt64(compaction_batch_size);

DECLARE_mBool(enable_parquet_page_index);

// Whether to push down filter bitmap to the parquet decoder layer for lazy index decoding.
// When enabled and selectivity is low, FILTERED_CONTENT runs skip RLE index decoding
// instead of decoding all indexes upfront.
DECLARE_mBool(enable_parquet_lazy_dict_decode);

// Whether to enable predicate column read order optimization in parquet lazy read.
// When enabled, predicate columns are read one by one with intermediate filtering,
// so highly-selective columns filter rows early, reducing decode work for subsequent columns.
DECLARE_mBool(enable_parquet_predicate_column_reorder);

// Whether to enable lazy dictionary decode for non-predicate (lazy) string columns in parquet.
// When enabled, lazy string columns that are fully dictionary-encoded output int32 dict codes
// during Phase 2 read, then convert to strings only for rows surviving the filter.
DECLARE_mBool(enable_parquet_lazy_dict_decode_for_lazy_columns);

// Whether to enable AVX2 SIMD dict gather in parquet dictionary decoding.
// When enabled, INT32/FLOAT uses 8-wide AVX2 gather, INT64/DOUBLE uses 4-wide gather.
DECLARE_mBool(enable_parquet_simd_dict_decode);

// Whether to enable software prefetch hints for large dictionary decoding in parquet.
// When enabled and dictionary exceeds L2 cache threshold, prefetch hints are emitted
// to hide cache miss latency during dict gather (both SIMD and scalar paths).
DECLARE_mBool(enable_parquet_dict_prefetch);

// Whether to ignore files that are not found in external tables (e.g. hive).
// Default is true; if set to false, a not-found file will result in query failure.
DECLARE_mBool(ignore_not_found_file_in_external_table);
Expand Down
73 changes: 73 additions & 0 deletions be/src/util/rle_encoding.h
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,10 @@ class RleBatchDecoder {
// Returns the number of consumed values or 0 if an error occurred.
uint32_t GetBatch(T* values, uint32_t batch_num);

// Skip 'num_values' values without writing to any buffer.
// Returns the number of values actually skipped.
uint32_t SkipBatch(uint32_t num_values);

private:
// Called when both 'literal_count_' and 'repeat_count_' have been exhausted.
// Sets either 'literal_count_' or 'repeat_count_' to the size of the next literal
Expand Down Expand Up @@ -885,5 +889,74 @@ uint32_t RleBatchDecoder<T>::GetBatch(T* values, uint32_t batch_num) {
}
return num_consumed;
}

// Skip 'num_values' encoded values without materializing them into any buffer.
//
// Mirrors the structure of GetBatch(): alternately drains repeated runs and
// literal runs until 'num_values' values have been consumed or the stream is
// exhausted. Returns the number of values actually skipped; this is less than
// 'num_values' if the encoded stream ends early or the underlying bit reader
// reports an error mid-skip.
template <typename T>
uint32_t RleBatchDecoder<T>::SkipBatch(uint32_t num_values) {
    DCHECK_GT(num_values, 0u);
    uint32_t num_skipped = 0;
    while (num_skipped < num_values) {
        // Try to skip from repeated run first.
        // NOTE(review): NextNumRepeats()/NextNumLiterals() are assumed to refresh
        // 'repeat_count_'/'literal_count_' via NextCounts() when both are
        // exhausted (per the NextCounts declaration comment) — confirm against
        // the decoder internals above.
        uint32_t num_repeats = NextNumRepeats();
        if (num_repeats > 0) {
            uint32_t to_skip = std::min(num_repeats, num_values - num_skipped);
            // Consume repeats without writing any values.
            // The repeated value returned by GetRepeatedValue is discarded; the
            // call is only used to advance the repeat run by 'to_skip'.
            GetRepeatedValue(to_skip);
            num_skipped += to_skip;
            continue;
        }

        // Try to skip from literal run.
        uint32_t num_literals = NextNumLiterals();
        if (num_literals == 0) {
            // No more data.
            break;
        }
        uint32_t to_skip = std::min(num_literals, num_values - num_skipped);
        // Skip literals from the bit reader.
        // First, consume any already-buffered literals.
        if (HaveBufferedLiterals()) {
            // Only 'num_buffered_literals_ - literal_buffer_pos_' values remain
            // unread in the literal buffer; advance the cursor past them.
            uint32_t buffered_skip = std::min(
                    to_skip, static_cast<uint32_t>(num_buffered_literals_ - literal_buffer_pos_));
            literal_buffer_pos_ += buffered_skip;
            literal_count_ -= buffered_skip;
            to_skip -= buffered_skip;
            num_skipped += buffered_skip;
        }
        // For remaining literals, skip using the same approach as GetLiteralValues:
        // 1. Skip in multiples of 32 via bit_reader_.SkipBatch (always byte-aligned).
        // 2. Buffer the remainder via FillLiteralBuffer, then advance buffer position.
        // This is necessary because BatchedBitReader::SkipBatch requires
        // (bit_width * num_values) to be divisible by 8, which is guaranteed for
        // multiples of 32 but not for arbitrary counts.
        if (to_skip > 0 && literal_count_ > 0) {
            uint32_t direct_skip = std::min(to_skip, static_cast<uint32_t>(literal_count_));
            // Skip in multiples of 32 (byte-aligned) directly in the bit reader.
            int32_t num_to_bypass = std::min<int32_t>(
                    literal_count_, BitUtil::RoundDownToPowerOf2(static_cast<int32_t>(direct_skip),
                                                                 static_cast<int32_t>(32)));
            if (num_to_bypass > 0) {
                if (UNLIKELY(!bit_reader_.SkipBatch(bit_width_, num_to_bypass))) {
                    // Bit reader error: report how many values were skipped so far.
                    return num_skipped;
                }
                literal_count_ -= num_to_bypass;
                direct_skip -= num_to_bypass;
                num_skipped += num_to_bypass;
            }
            // For any remainder (< 32 values), buffer them and advance past.
            // FillLiteralBuffer decodes up to a full buffer; over-buffered values
            // stay available for a subsequent GetBatch/SkipBatch call.
            if (direct_skip > 0 && literal_count_ > 0) {
                if (UNLIKELY(!FillLiteralBuffer())) {
                    return num_skipped;
                }
                uint32_t buffered_skip = std::min(
                        direct_skip,
                        static_cast<uint32_t>(num_buffered_literals_ - literal_buffer_pos_));
                literal_buffer_pos_ += buffered_skip;
                literal_count_ -= buffered_skip;
                num_skipped += buffered_skip;
            }
        }
    }
    return num_skipped;
}
#include "common/compile_check_end.h"
} // namespace doris
3 changes: 2 additions & 1 deletion be/src/vec/exec/format/parquet/bool_plain_decoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ Status BoolPlainDecoder::skip_values(size_t num_values) {
}

Status BoolPlainDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
ColumnSelectVector& select_vector, bool is_dict_filter) {
ColumnSelectVector& select_vector, bool is_dict_filter,
const uint8_t* filter_data) {
if (select_vector.has_filter()) {
return _decode_values<true>(doris_column, data_type, select_vector, is_dict_filter);
} else {
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/exec/format/parquet/bool_plain_decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ class BoolPlainDecoder final : public Decoder {
}

Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
ColumnSelectVector& select_vector, bool is_dict_filter) override;
ColumnSelectVector& select_vector, bool is_dict_filter,
const uint8_t* filter_data = nullptr) override;

template <bool has_filter>
Status _decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/exec/format/parquet/bool_rle_decoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ Status BoolRLEDecoder::skip_values(size_t num_values) {
}

Status BoolRLEDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
ColumnSelectVector& select_vector, bool is_dict_filter) {
ColumnSelectVector& select_vector, bool is_dict_filter,
const uint8_t* filter_data) {
if (select_vector.has_filter()) {
return _decode_values<true>(doris_column, data_type, select_vector, is_dict_filter);
} else {
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/exec/format/parquet/bool_rle_decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ class BoolRLEDecoder final : public Decoder {
Status set_data(Slice* slice) override;

Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
ColumnSelectVector& select_vector, bool is_dict_filter) override;
ColumnSelectVector& select_vector, bool is_dict_filter,
const uint8_t* filter_data = nullptr) override;

template <bool has_filter>
Status _decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
Expand Down
141 changes: 130 additions & 11 deletions be/src/vec/exec/format/parquet/byte_array_dict_decoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

#include <utility>

#include "common/compiler_util.h"
#include "common/config.h"
#include "util/coding.h"
#include "util/rle_encoding.h"
#include "vec/columns/column.h"
Expand Down Expand Up @@ -69,6 +71,12 @@ Status ByteArrayDictDecoder::set_dict(DorisUniqueBufferPtr<uint8_t>& dict, int32
if (offset_cursor != length) {
return Status::Corruption("Wrong dictionary data for byte array type");
}
// P1-5: Check if dictionary data exceeds L2 cache threshold.
// For string dicts, the relevant size is _dict_items (StringRef array) + _dict_data (string bodies).
// Typical L2 cache: 256KB-1MB per core. Use 256KB as conservative threshold.
constexpr size_t L2_CACHE_THRESHOLD = 256 * 1024;
size_t dict_memory = _dict_items.size() * sizeof(StringRef) + _dict_data.size();
_dict_exceeds_l2_cache = dict_memory > L2_CACHE_THRESHOLD;
return Status::OK();
}

Expand All @@ -91,18 +99,21 @@ MutableColumnPtr ByteArrayDictDecoder::convert_dict_column_to_string_column(
}

Status ByteArrayDictDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
ColumnSelectVector& select_vector, bool is_dict_filter) {
ColumnSelectVector& select_vector, bool is_dict_filter,
const uint8_t* filter_data) {
if (select_vector.has_filter()) {
return _decode_values<true>(doris_column, data_type, select_vector, is_dict_filter);
return _decode_values<true>(doris_column, data_type, select_vector, is_dict_filter,
filter_data);
} else {
return _decode_values<false>(doris_column, data_type, select_vector, is_dict_filter);
return _decode_values<false>(doris_column, data_type, select_vector, is_dict_filter,
nullptr);
}
}

template <bool has_filter>
Status ByteArrayDictDecoder::_decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
ColumnSelectVector& select_vector,
bool is_dict_filter) {
ColumnSelectVector& select_vector, bool is_dict_filter,
const uint8_t* filter_data) {
size_t non_null_size = select_vector.num_values() - select_vector.num_nulls();
if (doris_column->is_column_dictionary()) {
ColumnDictI32& dict_column = assert_cast<ColumnDictI32&>(*doris_column);
Expand All @@ -113,6 +124,21 @@ Status ByteArrayDictDecoder::_decode_values(MutableColumnPtr& doris_column, Data
cast_set<uint32_t>(_dict_items.size()));
}
}

// When filter_data is provided and has_filter is true, use lazy index decoding:
// decode indexes per-run and skip FILTERED_CONTENT via SkipBatch.
// This avoids decoding RLE indexes for rows that will be discarded.
if constexpr (has_filter) {
if (filter_data != nullptr) {
if (doris_column->is_column_dictionary() || is_dict_filter) {
// For dict-filter path, we still need all indexes.
// Fall through to bulk decode below.
} else {
return _lazy_decode_string_values(doris_column, select_vector);
}
}
}

_indexes.resize(non_null_size);
_index_batch_decoder->GetBatch(_indexes.data(), cast_set<uint32_t>(non_null_size));

Expand All @@ -126,13 +152,42 @@ Status ByteArrayDictDecoder::_decode_values(MutableColumnPtr& doris_column, Data
while (size_t run_length = select_vector.get_next_run<has_filter>(&read_type)) {
switch (read_type) {
case ColumnSelectVector::CONTENT: {
std::vector<StringRef> string_values;
string_values.reserve(run_length);
for (size_t i = 0; i < run_length; ++i) {
string_values.emplace_back(_dict_items[_indexes[dict_index++]]);
if (config::enable_parquet_simd_dict_decode) {
// P1-4: Use reusable buffer to avoid per-run heap allocation.
_string_values_buf.resize(run_length);
constexpr size_t PREFETCH_DISTANCE = 8;
for (size_t i = 0; i < run_length; ++i) {
// P1-5: Software prefetch for large dictionaries (separate config)
if (_dict_exceeds_l2_cache && config::enable_parquet_dict_prefetch &&
i + PREFETCH_DISTANCE < run_length) {
PREFETCH(&_dict_items[_indexes[dict_index + PREFETCH_DISTANCE]]);
}
_string_values_buf[i] = _dict_items[_indexes[dict_index++]];
}
doris_column->insert_many_strings_overflow(_string_values_buf.data(), run_length,
_max_value_length);
} else if (_dict_exceeds_l2_cache && config::enable_parquet_dict_prefetch) {
// P1-5 only: scalar path with software prefetch for large dicts
std::vector<StringRef> string_values;
string_values.reserve(run_length);
constexpr size_t PREFETCH_DISTANCE = 8;
for (size_t i = 0; i < run_length; ++i) {
if (i + PREFETCH_DISTANCE < run_length) {
PREFETCH(&_dict_items[_indexes[dict_index + PREFETCH_DISTANCE]]);
}
string_values.emplace_back(_dict_items[_indexes[dict_index++]]);
}
doris_column->insert_many_strings_overflow(string_values.data(), run_length,
_max_value_length);
} else {
std::vector<StringRef> string_values;
string_values.reserve(run_length);
for (size_t i = 0; i < run_length; ++i) {
string_values.emplace_back(_dict_items[_indexes[dict_index++]]);
}
doris_column->insert_many_strings_overflow(string_values.data(), run_length,
_max_value_length);
}
doris_column->insert_many_strings_overflow(string_values.data(), run_length,
_max_value_length);
break;
}
case ColumnSelectVector::NULL_DATA: {
Expand All @@ -151,6 +206,70 @@ Status ByteArrayDictDecoder::_decode_values(MutableColumnPtr& doris_column, Data
}
return Status::OK();
}
Status ByteArrayDictDecoder::_lazy_decode_string_values(MutableColumnPtr& doris_column,
                                                        ColumnSelectVector& select_vector) {
    // Lazy index decoding for dict-encoded string columns: dict indexes are decoded
    // from the RLE stream only for CONTENT runs, while FILTERED_CONTENT runs are
    // skipped inside the RLE stream via SkipBatch. Rows discarded by the filter
    // therefore never pay for index decoding or the dict gather.
    //
    // Prefetching only helps once the dictionary no longer fits in L2 (computed in
    // set_dict) and when explicitly enabled; both conditions are loop-invariant,
    // so hoist them out of the per-run gather.
    const bool use_prefetch = _dict_exceeds_l2_cache && config::enable_parquet_dict_prefetch;

    // Gather the dict entries addressed by _indexes[0, run_length) into 'out' and
    // append them to the column. Shared by the reusable-buffer (P1-4) and scalar
    // paths, which previously triplicated this loop.
    auto gather_and_insert = [&](StringRef* out, size_t run_length) {
        // P1-5: software prefetch distance to hide dict cache misses on large dicts.
        constexpr size_t PREFETCH_DISTANCE = 8;
        for (size_t i = 0; i < run_length; ++i) {
            if (use_prefetch && i + PREFETCH_DISTANCE < run_length) {
                PREFETCH(&_dict_items[_indexes[i + PREFETCH_DISTANCE]]);
            }
            out[i] = _dict_items[_indexes[i]];
        }
        doris_column->insert_many_strings_overflow(out, run_length, _max_value_length);
    };

    ColumnSelectVector::DataReadType read_type;
    while (size_t run_length = select_vector.get_next_run<true>(&read_type)) {
        switch (read_type) {
        case ColumnSelectVector::CONTENT: {
            // Decode only the indexes needed for this CONTENT run.
            _indexes.resize(run_length);
            _index_batch_decoder->GetBatch(_indexes.data(), cast_set<uint32_t>(run_length));
            if (config::enable_parquet_simd_dict_decode) {
                // P1-4: reuse the member buffer to avoid a per-run heap allocation.
                _string_values_buf.resize(run_length);
                gather_and_insert(_string_values_buf.data(), run_length);
            } else {
                std::vector<StringRef> string_values(run_length);
                gather_and_insert(string_values.data(), run_length);
            }
            break;
        }
        case ColumnSelectVector::NULL_DATA: {
            doris_column->insert_many_defaults(run_length);
            break;
        }
        case ColumnSelectVector::FILTERED_CONTENT: {
            // Skip indexes in the RLE stream without decoding them.
            _index_batch_decoder->SkipBatch(cast_set<uint32_t>(run_length));
            break;
        }
        case ColumnSelectVector::FILTERED_NULL: {
            // Null values carry no dict indexes, so there is nothing to skip.
            break;
        }
        }
    }
    return Status::OK();
}

#include "common/compile_check_end.h"

} // namespace doris::vectorized
Loading
Loading