diff --git a/quickwit/quickwit-proto/protos/quickwit/search.proto b/quickwit/quickwit-proto/protos/quickwit/search.proto index 04fa0cedf2e..a368cfad804 100644 --- a/quickwit/quickwit-proto/protos/quickwit/search.proto +++ b/quickwit/quickwit-proto/protos/quickwit/search.proto @@ -125,6 +125,10 @@ message ListFieldsRequest { optional int64 start_timestamp = 3; optional int64 end_timestamp = 4; + // JSON-serialized QueryAst for index_filter support. + // When provided, only fields from documents matching this query are returned. + optional string query_ast = 5; + // Control if the request will fail if split_ids contains a split that does not exist. // optional bool fail_on_missing_index = 6; } @@ -142,6 +146,13 @@ message LeafListFieldsRequest { // Wildcard expressions are supported. repeated string fields = 4; + // JSON-serialized QueryAst for index_filter support. + // When provided, only splits containing documents matching this query are included. + optional string query_ast = 5; + + // JSON-serialized DocMapper for query execution. + // Required when query_ast is provided to build and execute the query. + optional string doc_mapper = 6; } message ListFieldsResponse { diff --git a/quickwit/quickwit-proto/src/codegen/jaeger/opentelemetry.proto.trace.v1.rs b/quickwit/quickwit-proto/src/codegen/jaeger/opentelemetry.proto.trace.v1.rs index 6736d97c7e2..afa08ca3c9d 100644 --- a/quickwit/quickwit-proto/src/codegen/jaeger/opentelemetry.proto.trace.v1.rs +++ b/quickwit/quickwit-proto/src/codegen/jaeger/opentelemetry.proto.trace.v1.rs @@ -120,10 +120,12 @@ pub struct Span { /// attributes is a collection of key/value pairs. Note, global attributes /// like server name can be set using the resource API. Examples of attributes: /// - /// "/http/user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36" - /// "/http/server_latency": 300 - /// "abc.com/myattribute": true - /// "abc.com/score": 10.239 + /// ```text + /// "/http/user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36" + /// "/http/server_latency": 300 + /// "abc.com/myattribute": true + /// "abc.com/score": 10.239 + /// ``` /// /// The OpenTelemetry API specification further restricts the allowed value types: /// @@ -276,7 +278,7 @@ pub mod span { } /// The Status type defines a logical error model that is suitable for different /// programming environments, including REST APIs and RPC APIs. -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] pub struct Status { /// A developer-facing human readable error message. #[prost(string, tag = "2")] diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs index e1201ce7a0e..64604e7eedc 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs @@ -70,6 +70,10 @@ pub struct ListFieldsRequest { pub start_timestamp: ::core::option::Option, #[prost(int64, optional, tag = "4")] pub end_timestamp: ::core::option::Option, + /// JSON-serialized QueryAst for index_filter support. + /// When provided, only fields from documents matching this query are returned. + #[prost(string, optional, tag = "5")] + pub query_ast: ::core::option::Option<::prost::alloc::string::String>, } #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -88,6 +92,14 @@ pub struct LeafListFieldsRequest { /// Wildcard expressions are supported. #[prost(string, repeated, tag = "4")] pub fields: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + /// JSON-serialized QueryAst for index_filter support. + /// When provided, only splits containing documents matching this query are included. + #[prost(string, optional, tag = "5")] + pub query_ast: ::core::option::Option<::prost::alloc::string::String>, + /// JSON-serialized DocMapper for query execution. + /// Required when query_ast is provided to build and execute the query. + #[prost(string, optional, tag = "6")] + pub doc_mapper: ::core::option::Option<::prost::alloc::string::String>, } #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/quickwit/quickwit-search/src/list_fields.rs b/quickwit/quickwit-search/src/list_fields.rs index f4cf173fe08..e225617d0a8 100644 --- a/quickwit/quickwit-search/src/list_fields.rs +++ b/quickwit/quickwit-search/src/list_fields.rs @@ -24,6 +24,9 @@ use itertools::Itertools; use quickwit_common::rate_limited_warn; use quickwit_common::shared_consts::{FIELD_PRESENCE_FIELD_NAME, SPLIT_FIELDS_FILE_NAME}; use quickwit_common::uri::Uri; +use quickwit_config::build_doc_mapper; +use quickwit_doc_mapper::DocMapper; +use quickwit_doc_mapper::tag_pruning::extract_tags_from_query; use quickwit_metastore::SplitMetadata; use quickwit_proto::metastore::MetastoreServiceClient; use quickwit_proto::search::{ @@ -31,9 +34,11 @@ use quickwit_proto::search::{ ListFieldsResponse, SplitIdAndFooterOffsets, deserialize_split_fields, }; use quickwit_proto::types::{IndexId, IndexUid}; -use quickwit_storage::Storage; +use quickwit_query::query_ast::QueryAst; +use quickwit_storage::{ByteRangeCache, Storage}; +use tantivy::ReloadPolicy; -use crate::leaf::open_split_bundle; +use crate::leaf::{open_index_with_caches, open_split_bundle, warmup}; use crate::search_job_placer::group_jobs_by_index_id; use crate::service::SearcherContext; use crate::{ @@ -309,20 +314,127 @@ impl FieldPattern { } } +/// Checks if any documents in the split match the query. +/// Returns true if at least one document matches, false otherwise. +/// +/// This is a lightweight query execution that only counts matches without +/// materializing documents, used for split-level filtering in field capabilities. +async fn split_matches_query( + searcher_context: &SearcherContext, + index_storage: Arc, + split: &SplitIdAndFooterOffsets, + doc_mapper: &DocMapper, + query_ast: &QueryAst, +) -> crate::Result { + let byte_range_cache = + ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache); + // Open split with caches + let (index, _hot_directory) = open_index_with_caches( + searcher_context, + index_storage, + split, + Some(doc_mapper.tokenizer_manager()), + Some(byte_range_cache), + ) + .await?; + + // Create searcher with manual reload policy + let reader = index + .reader_builder() + .reload_policy(ReloadPolicy::Manual) + .try_into() + .map_err(|err| SearchError::Internal(format!("failed to create index reader: {err}")))?; + let searcher = reader.searcher(); + + // Build query from QueryAst + let (query, mut warmup_info) = doc_mapper + .query(searcher.schema().clone(), query_ast.clone(), false, None) + .map_err(|err| SearchError::InvalidQuery(format!("failed to build query: {err}")))?; + + // Warmup to ensure all bytes are fetched asynchronously before sync search + warmup_info.simplify(); + warmup(&searcher, &warmup_info) + .await + .map_err(|err| SearchError::Internal(format!("failed to warmup query: {err}")))?; + + // Check if any docs match (lightweight count) + let count = search_thread_pool() + .run_cpu_intensive(move || { + query + .count(&searcher) + .map_err(|err| SearchError::Internal(format!("failed to count matches: {err}"))) + }) + .await + .map_err(|_| SearchError::Internal("split matches query panicked".to_string()))??; + + Ok(count > 0) +} + /// `leaf` step of list fields. +/// +/// Returns field metadata from the assigned splits. When `query_ast` and `doc_mapper_str` +/// are provided, splits are filtered to only include those containing at least one +/// matching document (lightweight query execution for split-level filtering). pub async fn leaf_list_fields( index_id: IndexId, index_storage: Arc, searcher_context: &SearcherContext, split_ids: &[SplitIdAndFooterOffsets], field_patterns_str: &[String], + query_ast_str: Option<&str>, + doc_mapper_str: Option<&str>, ) -> crate::Result { let field_patterns: Vec = field_patterns_str .iter() .map(|pattern_str| FieldPattern::from_str(pattern_str)) .collect::>()?; - let single_split_list_fields_futures: Vec<_> = split_ids + // If no splits, return empty response + if split_ids.is_empty() { + return Ok(ListFieldsResponse { fields: Vec::new() }); + } + + // Filter splits based on query if both query_ast and doc_mapper are provided + let matching_splits: Vec<&SplitIdAndFooterOffsets> = match (query_ast_str, doc_mapper_str) { + (Some(ast_json), Some(mapper_json)) => { + let query_ast: QueryAst = serde_json::from_str(ast_json) + .map_err(|err| SearchError::InvalidQuery(err.to_string()))?; + let doc_mapper = crate::service::deserialize_doc_mapper(mapper_json)?; + + let split_match_tasks: Vec<_> = split_ids + .iter() + .map(|split| { + let index_storage = index_storage.clone(); + async { + split_matches_query( + searcher_context, + index_storage, + split, + &doc_mapper, + &query_ast, + ) + .await + } + }) + .collect(); + + let matches_vec = try_join_all(split_match_tasks).await?; + split_ids + .iter() + .zip(matches_vec) + .filter_map(|(split, matches)| matches.then_some(split)) + .collect() + } + _ => split_ids.iter().collect(), + }; + + // If no splits match, return empty response + if matching_splits.is_empty() { + return Ok(ListFieldsResponse { fields: Vec::new() }); + } + + // Get fields from matching splits + let single_split_list_fields_futures: Vec<_> = matching_splits .iter() .map(|split_id| { get_fields_from_split( @@ -375,13 +487,15 @@ pub async fn leaf_list_fields( Ok(ListFieldsResponse { fields }) } -/// Index metas needed for executing a leaf search request. +/// Index metas needed for executing a leaf list fields request. #[derive(Clone, Debug)] pub struct IndexMetasForLeafSearch { /// Index id. pub index_id: IndexId, /// Index URI. pub index_uri: Uri, + /// Serialized DocMapper for query execution (only set when query_ast is provided). + pub doc_mapper_str: Option, } /// Performs a distributed list fields request. @@ -399,29 +513,74 @@ pub async fn root_list_fields( if indexes_metadata.is_empty() { return Ok(ListFieldsResponse { fields: Vec::new() }); } - let index_uid_to_index_meta: HashMap = indexes_metadata - .iter() - .map(|index_metadata| { - let index_metadata_for_leaf_search = IndexMetasForLeafSearch { - index_uri: index_metadata.index_uri().clone(), - index_id: index_metadata.index_config.index_id.to_string(), - }; - - ( - index_metadata.index_uid.clone(), - index_metadata_for_leaf_search, + + // Build index metadata map, including doc_mapper if query_ast is provided + let has_query_ast = list_fields_req.query_ast.is_some(); + let mut index_uid_to_index_meta: HashMap = HashMap::new(); + let mut index_uids: Vec = Vec::new(); + let mut timestamp_field_opt: Option = None; + + for index_metadata in indexes_metadata { + // Only build doc_mapper when query_ast is provided (needed for split-level filtering) + let doc_mapper_str = if has_query_ast { + let doc_mapper = build_doc_mapper( + &index_metadata.index_config.doc_mapping, + &index_metadata.index_config.search_settings, ) - }) - .collect(); - let index_uids: Vec = indexes_metadata - .into_iter() - .map(|index_metadata| index_metadata.index_uid) - .collect(); + .map_err(|err| SearchError::Internal(format!("failed to build doc mapper: {err}")))?; + + // Capture timestamp field for time range extraction (use first index's field) + if timestamp_field_opt.is_none() { + timestamp_field_opt = doc_mapper.timestamp_field_name().map(|s| s.to_string()); + } + + Some(serde_json::to_string(&doc_mapper).map_err(|err| { + SearchError::Internal(format!("failed to serialize doc mapper: {err}")) + })?) + } else { + None + }; + + let index_metadata_for_leaf_search = IndexMetasForLeafSearch { + index_uri: index_metadata.index_uri().clone(), + index_id: index_metadata.index_config.index_id.to_string(), + doc_mapper_str, + }; + + index_uids.push(index_metadata.index_uid.clone()); + index_uid_to_index_meta.insert( + index_metadata.index_uid.clone(), + index_metadata_for_leaf_search, + ); + } + + // Extract tags and refine time range from query_ast for split pruning + let mut start_timestamp = list_fields_req.start_timestamp; + let mut end_timestamp = list_fields_req.end_timestamp; + let tags_filter_opt = if let Some(ref query_ast_json) = list_fields_req.query_ast { + let query_ast: QueryAst = serde_json::from_str(query_ast_json) + .map_err(|err| SearchError::InvalidQuery(err.to_string()))?; + + // Refine time range from query AST if timestamp field is available + if let Some(ref timestamp_field) = timestamp_field_opt { + crate::root::refine_start_end_timestamp_from_ast( + &query_ast, + timestamp_field, + &mut start_timestamp, + &mut end_timestamp, + ); + } + + extract_tags_from_query(query_ast) + } else { + None + }; + let split_metadatas: Vec = list_relevant_splits( index_uids, - list_fields_req.start_timestamp, - list_fields_req.end_timestamp, - None, + start_timestamp, + end_timestamp, + tags_filter_opt, &mut metastore, ) .await?; @@ -478,6 +637,8 @@ pub fn jobs_to_leaf_requests( index_uri: index_meta.index_uri.to_string(), fields: search_request_for_leaf.fields.clone(), split_offsets: job_group.into_iter().map(|job| job.offsets).collect(), + query_ast: search_request_for_leaf.query_ast.clone(), + doc_mapper: index_meta.doc_mapper_str.clone(), }; leaf_search_requests.push(leaf_search_request); Ok(()) diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs index e33b67339ff..ab10e8c91ce 100644 --- a/quickwit/quickwit-search/src/service.rs +++ b/quickwit/quickwit-search/src/service.rs @@ -306,6 +306,8 @@ impl SearchService for SearchServiceImpl { &self.searcher_context, &split_ids[..], &list_fields_req.fields, + list_fields_req.query_ast.as_deref(), + list_fields_req.doc_mapper.as_deref(), ) .await } diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/model/field_capability.rs b/quickwit/quickwit-serve/src/elasticsearch_api/model/field_capability.rs index a382c541dc7..7d1f650274a 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/model/field_capability.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/model/field_capability.rs @@ -15,7 +15,10 @@ use std::collections::HashMap; use quickwit_proto::search::{ListFieldType, ListFieldsEntryResponse, ListFieldsResponse}; +use quickwit_query::ElasticQueryDsl; +use quickwit_query::query_ast::QueryAst; use serde::{Deserialize, Serialize}; +use warp::hyper::StatusCode; use super::ElasticsearchError; use super::search_query_params::*; @@ -173,16 +176,225 @@ pub fn convert_to_es_field_capabilities_response( FieldCapabilityResponse { indices, fields } } +/// Parses an Elasticsearch index_filter JSON value into a Quickwit QueryAst. +/// +/// Returns `Ok(None)` if the index_filter is null or empty. +/// Returns `Ok(Some(QueryAst))` if the index_filter is valid. +/// Returns `Err` if the index_filter is invalid or cannot be converted. +pub fn parse_index_filter_to_query_ast( + index_filter: serde_json::Value, +) -> Result, ElasticsearchError> { + if index_filter.is_null() || index_filter == serde_json::Value::Object(Default::default()) { + return Ok(None); + } + + // Parse ES Query DSL to internal QueryAst + let elastic_query_dsl: ElasticQueryDsl = + serde_json::from_value(index_filter).map_err(|err| { + ElasticsearchError::new( + StatusCode::BAD_REQUEST, + format!("Invalid index_filter: {err}"), + None, + ) + })?; + + let query_ast: QueryAst = elastic_query_dsl.try_into().map_err(|err: anyhow::Error| { + ElasticsearchError::new( + StatusCode::BAD_REQUEST, + format!("Failed to convert index_filter: {err}"), + None, + ) + })?; + + Ok(Some(query_ast)) +} + #[allow(clippy::result_large_err)] pub fn build_list_field_request_for_es_api( index_id_patterns: Vec, search_params: FieldCapabilityQueryParams, - _search_body: FieldCapabilityRequestBody, + search_body: FieldCapabilityRequestBody, ) -> Result { + let query_ast = parse_index_filter_to_query_ast(search_body.index_filter)?; + let query_ast_json = query_ast + .map(|ast| serde_json::to_string(&ast).expect("QueryAst should be JSON serializable")); + Ok(quickwit_proto::search::ListFieldsRequest { index_id_patterns, fields: search_params.fields.unwrap_or_default(), start_timestamp: search_params.start_timestamp, end_timestamp: search_params.end_timestamp, + query_ast: query_ast_json, }) } + +#[cfg(test)] +mod tests { + use serde_json::json; + + use super::*; + + #[test] + fn test_build_list_field_request_empty_index_filter() { + let result = build_list_field_request_for_es_api( + vec!["test_index".to_string()], + FieldCapabilityQueryParams::default(), + FieldCapabilityRequestBody::default(), + ) + .unwrap(); + + assert_eq!(result.index_id_patterns, vec!["test_index".to_string()]); + assert!(result.query_ast.is_none()); + } + + #[test] + fn test_build_list_field_request_with_term_index_filter() { + let search_body = FieldCapabilityRequestBody { + index_filter: json!({ + "term": { + "status": "active" + } + }), + runtime_mappings: serde_json::Value::Null, + }; + + let result = build_list_field_request_for_es_api( + vec!["test_index".to_string()], + FieldCapabilityQueryParams::default(), + search_body, + ) + .unwrap(); + + assert_eq!(result.index_id_patterns, vec!["test_index".to_string()]); + assert!(result.query_ast.is_some()); + + // Verify the query_ast is valid JSON + let query_ast: serde_json::Value = + serde_json::from_str(&result.query_ast.unwrap()).unwrap(); + assert!(query_ast.is_object()); + } + + #[test] + fn test_build_list_field_request_with_bool_index_filter() { + let search_body = FieldCapabilityRequestBody { + index_filter: json!({ + "bool": { + "must": [ + { "term": { "status": "active" } } + ], + "filter": [ + { "range": { "age": { "gte": 18 } } } + ] + } + }), + runtime_mappings: serde_json::Value::Null, + }; + + let result = build_list_field_request_for_es_api( + vec!["test_index".to_string()], + FieldCapabilityQueryParams::default(), + search_body, + ) + .unwrap(); + + assert!(result.query_ast.is_some()); + } + + #[test] + fn test_build_list_field_request_with_invalid_index_filter() { + let search_body = FieldCapabilityRequestBody { + index_filter: json!({ + "invalid_query_type": { + "field": "value" + } + }), + runtime_mappings: serde_json::Value::Null, + }; + + let result = build_list_field_request_for_es_api( + vec!["test_index".to_string()], + FieldCapabilityQueryParams::default(), + search_body, + ); + + assert!(result.is_err()); + let err = result.unwrap_err(); + assert_eq!(err.status, StatusCode::BAD_REQUEST); + } + + #[test] + fn test_build_list_field_request_with_null_index_filter() { + let search_body = FieldCapabilityRequestBody { + index_filter: serde_json::Value::Null, + runtime_mappings: serde_json::Value::Null, + }; + + let result = build_list_field_request_for_es_api( + vec!["test_index".to_string()], + FieldCapabilityQueryParams::default(), + search_body, + ) + .unwrap(); + + assert!(result.query_ast.is_none()); + } + + #[test] + fn test_build_list_field_request_preserves_other_params() { + let search_params = FieldCapabilityQueryParams { + fields: Some(vec!["field1".to_string(), "field2".to_string()]), + start_timestamp: Some(1000), + end_timestamp: Some(2000), + ..Default::default() + }; + + let search_body = FieldCapabilityRequestBody { + index_filter: json!({ "match_all": {} }), + runtime_mappings: serde_json::Value::Null, + }; + + let result = build_list_field_request_for_es_api( + vec!["test_index".to_string()], + search_params, + search_body, + ) + .unwrap(); + + assert_eq!( + result.fields, + vec!["field1".to_string(), "field2".to_string()] + ); + assert_eq!(result.start_timestamp, Some(1000)); + assert_eq!(result.end_timestamp, Some(2000)); + assert!(result.query_ast.is_some()); + } + + #[test] + fn test_parse_index_filter_to_query_ast_null() { + let result = parse_index_filter_to_query_ast(serde_json::Value::Null).unwrap(); + assert!(result.is_none()); + } + + #[test] + fn test_parse_index_filter_to_query_ast_empty_object() { + let result = parse_index_filter_to_query_ast(json!({})).unwrap(); + assert!(result.is_none()); + } + + #[test] + fn test_parse_index_filter_to_query_ast_valid_term() { + let result = parse_index_filter_to_query_ast(json!({ + "term": { "status": "active" } + })) + .unwrap(); + assert!(result.is_some()); + } + + #[test] + fn test_parse_index_filter_to_query_ast_invalid() { + let result = parse_index_filter_to_query_ast(json!({ + "invalid_query_type": { "field": "value" } + })); + assert!(result.is_err()); + } +} diff --git a/quickwit/rest-api-tests/scenarii/es_field_capabilities/0001-field-capabilities.yaml b/quickwit/rest-api-tests/scenarii/es_field_capabilities/0001-field-capabilities.yaml index bd3cd917acd..16513f2d008 100644 --- a/quickwit/rest-api-tests/scenarii/es_field_capabilities/0001-field-capabilities.yaml +++ b/quickwit/rest-api-tests/scenarii/es_field_capabilities/0001-field-capabilities.yaml @@ -334,3 +334,129 @@ engines: - elasticsearch endpoint: doesno*texist/_field_caps?fields=date status_code: 200 +--- +# Test _field_caps API with index_filter (term query) +# Note: term queries require exact token match; 'fritz' is lowercase due to default tokenizer +method: [POST] +engines: + - quickwit + - elasticsearch +endpoint: fieldcaps/_field_caps?fields=* +json: + index_filter: + term: + name: "fritz" +expected: + indices: + - fieldcaps + fields: + name: + keyword: + type: keyword + metadata_field: false + searchable: true + aggregatable: true + text: + type: text + metadata_field: false + searchable: true + aggregatable: true +--- +# Test _field_caps API with index_filter (match_all query) +method: [POST] +engines: + - quickwit + - elasticsearch +endpoint: fieldcaps/_field_caps?fields=name +json: + index_filter: + match_all: {} +expected: + indices: + - fieldcaps + fields: + name: + keyword: + type: keyword + metadata_field: false + searchable: true + aggregatable: true + text: + type: text + metadata_field: false + searchable: true + aggregatable: true +--- +# Test _field_caps API with index_filter (bool query) +method: [POST] +engines: + - quickwit + - elasticsearch +endpoint: fieldcaps/_field_caps?fields=response,name +json: + index_filter: + bool: + must: + - term: + name: "fritz" + filter: + - range: + response: + gte: 30 +expected: + indices: + - fieldcaps + fields: + response: + long: + type: long + metadata_field: false + searchable: true + aggregatable: true + name: + keyword: + type: keyword + metadata_field: false + searchable: true + aggregatable: true + text: + type: text + metadata_field: false + searchable: true + aggregatable: true +--- +# Test _field_caps API with invalid index_filter +method: [POST] +engines: + - quickwit + - elasticsearch +endpoint: fieldcaps/_field_caps?fields=* +json: + index_filter: + invalid_query_type: + field: "value" +status_code: 400 +--- +# Test _field_caps API with empty index_filter (should work like no filter) +method: [POST] +engines: + - quickwit + - elasticsearch +endpoint: fieldcaps/_field_caps?fields=name +json: + index_filter: {} +expected: + indices: + - fieldcaps + fields: + name: + keyword: + type: keyword + metadata_field: false + searchable: true + aggregatable: true + text: + type: text + metadata_field: false + searchable: true + aggregatable: true