Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions quickwit/quickwit-proto/protos/quickwit/search.proto
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ message ListFieldsRequest {
optional int64 start_timestamp = 3;
optional int64 end_timestamp = 4;

// JSON-serialized QueryAst for index_filter support.
// When provided, only fields from documents matching this query are returned.
optional string query_ast = 5;

// Control if the request will fail if split_ids contains a split that does not exist.
// optional bool fail_on_missing_index = 6;
}
Expand All @@ -142,6 +146,13 @@ message LeafListFieldsRequest {
// Wildcard expressions are supported.
repeated string fields = 4;

// JSON-serialized QueryAst for index_filter support.
// When provided, only splits containing documents matching this query are included.
optional string query_ast = 5;

// JSON-serialized DocMapper for query execution.
// Required when query_ast is provided to build and execute the query.
optional string doc_mapper = 6;
}

message ListFieldsResponse {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

209 changes: 185 additions & 24 deletions quickwit/quickwit-search/src/list_fields.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,21 @@ use itertools::Itertools;
use quickwit_common::rate_limited_warn;
use quickwit_common::shared_consts::{FIELD_PRESENCE_FIELD_NAME, SPLIT_FIELDS_FILE_NAME};
use quickwit_common::uri::Uri;
use quickwit_config::build_doc_mapper;
use quickwit_doc_mapper::DocMapper;
use quickwit_doc_mapper::tag_pruning::extract_tags_from_query;
use quickwit_metastore::SplitMetadata;
use quickwit_proto::metastore::MetastoreServiceClient;
use quickwit_proto::search::{
LeafListFieldsRequest, ListFields, ListFieldsEntryResponse, ListFieldsRequest,
ListFieldsResponse, SplitIdAndFooterOffsets, deserialize_split_fields,
};
use quickwit_proto::types::{IndexId, IndexUid};
use quickwit_storage::Storage;
use quickwit_query::query_ast::QueryAst;
use quickwit_storage::{ByteRangeCache, Storage};
use tantivy::ReloadPolicy;

use crate::leaf::open_split_bundle;
use crate::leaf::{open_index_with_caches, open_split_bundle, warmup};
use crate::search_job_placer::group_jobs_by_index_id;
use crate::service::SearcherContext;
use crate::{
Expand Down Expand Up @@ -309,20 +314,127 @@ impl FieldPattern {
}
}

/// Checks if any documents in the split match the query.
/// Returns true if at least one document matches, false otherwise.
///
/// This runs the query against the split and only counts matches, without
/// materializing documents. It still opens the split and warms up the data the
/// query needs, so it is not free. Used for split-level filtering in field capabilities.
Comment on lines +320 to +321
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would not qualify "count" as lightweight here. It will actually involve downloading every chunk of the split (fast fields, index including postings) that is necessary to solve the query. I don't think this is the original intent of the index_filter parameter on the endpoint. Can you explain for which use case it is worth going so deep in the split inspection?

async fn split_matches_query(
    searcher_context: &SearcherContext,
    index_storage: Arc<dyn Storage>,
    split: &SplitIdAndFooterOffsets,
    doc_mapper: &DocMapper,
    query_ast: &QueryAst,
) -> crate::Result<bool> {
    // Returns `Ok(true)` when the query matches at least one document of `split`.
    //
    // Steps: open the split, build the query from the AST, warm up, then count
    // the matches on the search thread pool.

    // Byte range cache created for this call only (unbounded capacity, reported
    // under the short-lived cache metrics) and handed to the split opener.
    let shortlived_cache =
        ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache);
    let (index, _hot_directory) = open_index_with_caches(
        searcher_context,
        index_storage,
        split,
        Some(doc_mapper.tokenizer_manager()),
        Some(shortlived_cache),
    )
    .await?;

    // The reader is built with a manual reload policy.
    let reader_builder = index.reader_builder().reload_policy(ReloadPolicy::Manual);
    let index_reader = reader_builder
        .try_into()
        .map_err(|err| SearchError::Internal(format!("failed to create index reader: {err}")))?;
    let searcher = index_reader.searcher();

    // Turn the query AST into an executable query plus the warmup it requires.
    let schema = searcher.schema().clone();
    let (tantivy_query, mut warmup_info) = doc_mapper
        .query(schema, query_ast.clone(), false, None)
        .map_err(|err| SearchError::InvalidQuery(format!("failed to build query: {err}")))?;

    // Fetch everything the query needs asynchronously, before the synchronous count below.
    warmup_info.simplify();
    warmup(&searcher, &warmup_info)
        .await
        .map_err(|err| SearchError::Internal(format!("failed to warmup query: {err}")))?;

    // Count every match on the CPU-intensive pool; only "zero vs. non-zero" is used.
    let count_task = move || {
        tantivy_query
            .count(&searcher)
            .map_err(|err| SearchError::Internal(format!("failed to count matches: {err}")))
    };
    // Outer error: the task panicked. Inner error: the count itself failed.
    let count_result = search_thread_pool()
        .run_cpu_intensive(count_task)
        .await
        .map_err(|_| SearchError::Internal("split matches query panicked".to_string()))?;
    let num_matches = count_result?;

    Ok(num_matches != 0)
}

/// `leaf` step of list fields.
///
/// Returns field metadata from the assigned splits. When both `query_ast_str` and
/// `doc_mapper_str` are provided, splits are filtered to only include those containing
/// at least one matching document (the query is executed on each split to decide).
pub async fn leaf_list_fields(
index_id: IndexId,
index_storage: Arc<dyn Storage>,
searcher_context: &SearcherContext,
split_ids: &[SplitIdAndFooterOffsets],
field_patterns_str: &[String],
query_ast_str: Option<&str>,
doc_mapper_str: Option<&str>,
) -> crate::Result<ListFieldsResponse> {
let field_patterns: Vec<FieldPattern> = field_patterns_str
.iter()
.map(|pattern_str| FieldPattern::from_str(pattern_str))
.collect::<crate::Result<_>>()?;

let single_split_list_fields_futures: Vec<_> = split_ids
// If no splits, return empty response
if split_ids.is_empty() {
return Ok(ListFieldsResponse { fields: Vec::new() });
}

// Filter splits based on query if both query_ast and doc_mapper are provided
let matching_splits: Vec<&SplitIdAndFooterOffsets> = match (query_ast_str, doc_mapper_str) {
(Some(ast_json), Some(mapper_json)) => {
let query_ast: QueryAst = serde_json::from_str(ast_json)
.map_err(|err| SearchError::InvalidQuery(err.to_string()))?;
let doc_mapper = crate::service::deserialize_doc_mapper(mapper_json)?;

let split_match_tasks: Vec<_> = split_ids
.iter()
.map(|split| {
let index_storage = index_storage.clone();
async {
split_matches_query(
searcher_context,
index_storage,
split,
&doc_mapper,
&query_ast,
)
.await
}
})
.collect();

let matches_vec = try_join_all(split_match_tasks).await?;
split_ids
.iter()
.zip(matches_vec)
.filter_map(|(split, matches)| matches.then_some(split))
.collect()
}
_ => split_ids.iter().collect(),
};

// If no splits match, return empty response
if matching_splits.is_empty() {
return Ok(ListFieldsResponse { fields: Vec::new() });
}

// Get fields from matching splits
let single_split_list_fields_futures: Vec<_> = matching_splits
.iter()
.map(|split_id| {
get_fields_from_split(
Expand Down Expand Up @@ -375,13 +487,15 @@ pub async fn leaf_list_fields(
Ok(ListFieldsResponse { fields })
}

/// Index metas needed for executing a leaf search request.
/// Index metas needed for executing a leaf list fields request.
#[derive(Clone, Debug)]
pub struct IndexMetasForLeafSearch {
/// Identifier of the index, populated from the index config's `index_id`.
pub index_id: IndexId,
/// URI of the index; sent to leaves in string form as the request's `index_uri`.
pub index_uri: Uri,
/// JSON-serialized DocMapper, forwarded to leaves as the request's `doc_mapper`.
/// `None` unless the list fields request carries a `query_ast`.
pub doc_mapper_str: Option<String>,
}

/// Performs a distributed list fields request.
Expand All @@ -399,29 +513,74 @@ pub async fn root_list_fields(
if indexes_metadata.is_empty() {
return Ok(ListFieldsResponse { fields: Vec::new() });
}
let index_uid_to_index_meta: HashMap<IndexUid, IndexMetasForLeafSearch> = indexes_metadata
.iter()
.map(|index_metadata| {
let index_metadata_for_leaf_search = IndexMetasForLeafSearch {
index_uri: index_metadata.index_uri().clone(),
index_id: index_metadata.index_config.index_id.to_string(),
};

(
index_metadata.index_uid.clone(),
index_metadata_for_leaf_search,

// Build index metadata map, including doc_mapper if query_ast is provided
let has_query_ast = list_fields_req.query_ast.is_some();
let mut index_uid_to_index_meta: HashMap<IndexUid, IndexMetasForLeafSearch> = HashMap::new();
let mut index_uids: Vec<IndexUid> = Vec::new();
let mut timestamp_field_opt: Option<String> = None;

for index_metadata in indexes_metadata {
// Only build doc_mapper when query_ast is provided (needed for split-level filtering)
let doc_mapper_str = if has_query_ast {
let doc_mapper = build_doc_mapper(
&index_metadata.index_config.doc_mapping,
&index_metadata.index_config.search_settings,
)
})
.collect();
let index_uids: Vec<IndexUid> = indexes_metadata
.into_iter()
.map(|index_metadata| index_metadata.index_uid)
.collect();
.map_err(|err| SearchError::Internal(format!("failed to build doc mapper: {err}")))?;

// Capture timestamp field for time range extraction (use the first index that defines one)
if timestamp_field_opt.is_none() {
timestamp_field_opt = doc_mapper.timestamp_field_name().map(|s| s.to_string());
}

Some(serde_json::to_string(&doc_mapper).map_err(|err| {
SearchError::Internal(format!("failed to serialize doc mapper: {err}"))
})?)
} else {
None
};

let index_metadata_for_leaf_search = IndexMetasForLeafSearch {
index_uri: index_metadata.index_uri().clone(),
index_id: index_metadata.index_config.index_id.to_string(),
doc_mapper_str,
};

index_uids.push(index_metadata.index_uid.clone());
index_uid_to_index_meta.insert(
index_metadata.index_uid.clone(),
index_metadata_for_leaf_search,
);
}

// Extract tags and refine time range from query_ast for split pruning
let mut start_timestamp = list_fields_req.start_timestamp;
let mut end_timestamp = list_fields_req.end_timestamp;
let tags_filter_opt = if let Some(ref query_ast_json) = list_fields_req.query_ast {
let query_ast: QueryAst = serde_json::from_str(query_ast_json)
.map_err(|err| SearchError::InvalidQuery(err.to_string()))?;

// Refine time range from query AST if timestamp field is available
if let Some(ref timestamp_field) = timestamp_field_opt {
crate::root::refine_start_end_timestamp_from_ast(
&query_ast,
timestamp_field,
&mut start_timestamp,
&mut end_timestamp,
);
}

extract_tags_from_query(query_ast)
} else {
None
};

let split_metadatas: Vec<SplitMetadata> = list_relevant_splits(
index_uids,
list_fields_req.start_timestamp,
list_fields_req.end_timestamp,
None,
start_timestamp,
end_timestamp,
tags_filter_opt,
&mut metastore,
)
.await?;
Expand Down Expand Up @@ -478,6 +637,8 @@ pub fn jobs_to_leaf_requests(
index_uri: index_meta.index_uri.to_string(),
fields: search_request_for_leaf.fields.clone(),
split_offsets: job_group.into_iter().map(|job| job.offsets).collect(),
query_ast: search_request_for_leaf.query_ast.clone(),
doc_mapper: index_meta.doc_mapper_str.clone(),
};
leaf_search_requests.push(leaf_search_request);
Ok(())
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-search/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,8 @@ impl SearchService for SearchServiceImpl {
&self.searcher_context,
&split_ids[..],
&list_fields_req.fields,
list_fields_req.query_ast.as_deref(),
list_fields_req.doc_mapper.as_deref(),
)
.await
}
Expand Down
Loading
Loading