From 2ff972670c842df1c0780bd6c12c8694a62c3f37 Mon Sep 17 00:00:00 2001 From: Arthur Brongniart Date: Fri, 2 Jan 2026 16:25:08 +0100 Subject: [PATCH] add index_routing_rules table and create / get methods --- .../26_create-index-routing-rules.down.sql | 1 + .../26_create-index-routing-rules.up.sql | 11 + .../src/metastore/control_plane_metastore.rs | 25 +- .../src/metastore/file_backed/mod.rs | 54 +- .../src/metastore/postgres/error.rs | 7 + .../src/metastore/postgres/metastore.rs | 94 +++- .../src/metastore/postgres/model.rs | 8 + .../postgres/queries/routing_rules/insert.sql | 2 + .../postgres/queries/routing_rules/select.sql | 4 + .../protos/quickwit/metastore.proto | 38 +- .../codegen/quickwit/quickwit.metastore.rs | 530 ++++++++++++++++++ quickwit/quickwit-proto/src/metastore/mod.rs | 8 + 12 files changed, 740 insertions(+), 42 deletions(-) create mode 100644 quickwit/quickwit-metastore/migrations/postgresql/26_create-index-routing-rules.down.sql create mode 100644 quickwit/quickwit-metastore/migrations/postgresql/26_create-index-routing-rules.up.sql create mode 100644 quickwit/quickwit-metastore/src/metastore/postgres/queries/routing_rules/insert.sql create mode 100644 quickwit/quickwit-metastore/src/metastore/postgres/queries/routing_rules/select.sql diff --git a/quickwit/quickwit-metastore/migrations/postgresql/26_create-index-routing-rules.down.sql b/quickwit/quickwit-metastore/migrations/postgresql/26_create-index-routing-rules.down.sql new file mode 100644 index 00000000000..4bde0b6d35b --- /dev/null +++ b/quickwit/quickwit-metastore/migrations/postgresql/26_create-index-routing-rules.down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS index_routing_rules; diff --git a/quickwit/quickwit-metastore/migrations/postgresql/26_create-index-routing-rules.up.sql b/quickwit/quickwit-metastore/migrations/postgresql/26_create-index-routing-rules.up.sql new file mode 100644 index 00000000000..75288a1526f --- /dev/null +++ b/quickwit/quickwit-metastore/migrations/postgresql/26_create-index-routing-rules.up.sql @@ -0,0 +1,11 @@ +CREATE TABLE IF NOT EXISTS index_routing_rules ( + routing_table_id VARCHAR(50) NOT NULL, + rank INTEGER NOT NULL, + filter TEXT NOT NULL, + index_id VARCHAR(50) NOT NULL, + create_timestamp TIMESTAMP NOT NULL DEFAULT (CURRENT_TIMESTAMP AT TIME ZONE 'UTC'), + update_timestamp TIMESTAMP NOT NULL DEFAULT (CURRENT_TIMESTAMP AT TIME ZONE 'UTC'), + + PRIMARY KEY (routing_table_id, rank), + FOREIGN KEY(index_id) REFERENCES indexes(index_id) +); diff --git a/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs b/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs index bcb07d79020..68125ed9bd4 100644 --- a/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs @@ -19,11 +19,12 @@ use quickwit_common::uri::Uri; use quickwit_proto::control_plane::{ControlPlaneService, ControlPlaneServiceClient}; use quickwit_proto::metastore::{ AcquireShardsRequest, AcquireShardsResponse, AddSourceRequest, CreateIndexRequest, - CreateIndexResponse, CreateIndexTemplateRequest, DeleteIndexRequest, - DeleteIndexTemplatesRequest, DeleteQuery, DeleteShardsRequest, DeleteShardsResponse, - DeleteSourceRequest, DeleteSplitsRequest, DeleteTask, EmptyResponse, - FindIndexTemplateMatchesRequest, FindIndexTemplateMatchesResponse, GetClusterIdentityRequest, - GetClusterIdentityResponse, GetIndexTemplateRequest, GetIndexTemplateResponse, + CreateIndexResponse, CreateIndexRoutingTableRequest, CreateIndexRoutingTableResponse, + CreateIndexTemplateRequest, DeleteIndexRequest, DeleteIndexTemplatesRequest, DeleteQuery, + DeleteShardsRequest, DeleteShardsResponse, DeleteSourceRequest, DeleteSplitsRequest, + DeleteTask, EmptyResponse, FindIndexTemplateMatchesRequest, FindIndexTemplateMatchesResponse, + GetClusterIdentityRequest, GetClusterIdentityResponse, GetIndexRoutingTableRequest, + GetIndexRoutingTableResponse, GetIndexTemplateRequest, GetIndexTemplateResponse, IndexMetadataRequest, IndexMetadataResponse, IndexesMetadataRequest, IndexesMetadataResponse, LastDeleteOpstampRequest, LastDeleteOpstampResponse, ListDeleteTasksRequest, ListDeleteTasksResponse, ListIndexStatsRequest, ListIndexStatsResponse, @@ -289,4 +290,18 @@ impl MetastoreService for ControlPlaneMetastore { ) -> MetastoreResult { self.metastore.get_cluster_identity(request).await } + + async fn create_index_routing_table( + &self, + request: CreateIndexRoutingTableRequest, + ) -> MetastoreResult { + self.metastore.create_index_routing_table(request).await + } + + async fn get_index_routing_table( + &self, + request: GetIndexRoutingTableRequest, + ) -> MetastoreResult { + self.metastore.get_index_routing_table(request).await + } } diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs index 2542f1db36f..37e0d4f0973 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs @@ -41,22 +41,24 @@ use quickwit_common::ServiceStream; use quickwit_config::IndexTemplate; use quickwit_proto::metastore::{ AcquireShardsRequest, AcquireShardsResponse, AddSourceRequest, CreateIndexRequest, - CreateIndexResponse, CreateIndexTemplateRequest, DeleteIndexRequest, - DeleteIndexTemplatesRequest, DeleteQuery, DeleteShardsRequest, DeleteShardsResponse, - DeleteSourceRequest, DeleteSplitsRequest, DeleteTask, EmptyResponse, EntityKind, - FindIndexTemplateMatchesRequest, FindIndexTemplateMatchesResponse, GetClusterIdentityRequest, - GetClusterIdentityResponse, GetIndexTemplateRequest, GetIndexTemplateResponse, - IndexMetadataFailure, IndexMetadataFailureReason, IndexMetadataRequest, IndexMetadataResponse, - IndexTemplateMatch, IndexesMetadataRequest, IndexesMetadataResponse, LastDeleteOpstampRequest, - LastDeleteOpstampResponse, ListDeleteTasksRequest, ListDeleteTasksResponse, - ListIndexStatsRequest, ListIndexStatsResponse, ListIndexTemplatesRequest, - ListIndexTemplatesResponse, ListIndexesMetadataRequest, ListIndexesMetadataResponse, - ListShardsRequest, ListShardsResponse, ListSplitsRequest, ListSplitsResponse, - ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError, MetastoreResult, - MetastoreService, MetastoreServiceStream, OpenShardSubrequest, OpenShardsRequest, - OpenShardsResponse, PruneShardsRequest, PublishSplitsRequest, ResetSourceCheckpointRequest, - StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest, UpdateSourceRequest, - UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse, serde_utils, + CreateIndexResponse, CreateIndexRoutingTableRequest, CreateIndexRoutingTableResponse, + CreateIndexTemplateRequest, DeleteIndexRequest, DeleteIndexTemplatesRequest, DeleteQuery, + DeleteShardsRequest, DeleteShardsResponse, DeleteSourceRequest, DeleteSplitsRequest, + DeleteTask, EmptyResponse, EntityKind, FindIndexTemplateMatchesRequest, + FindIndexTemplateMatchesResponse, GetClusterIdentityRequest, GetClusterIdentityResponse, + GetIndexRoutingTableRequest, GetIndexRoutingTableResponse, GetIndexTemplateRequest, + GetIndexTemplateResponse, IndexMetadataFailure, IndexMetadataFailureReason, + IndexMetadataRequest, IndexMetadataResponse, IndexTemplateMatch, IndexesMetadataRequest, + IndexesMetadataResponse, LastDeleteOpstampRequest, LastDeleteOpstampResponse, + ListDeleteTasksRequest, ListDeleteTasksResponse, ListIndexStatsRequest, ListIndexStatsResponse, + ListIndexTemplatesRequest, ListIndexTemplatesResponse, ListIndexesMetadataRequest, + ListIndexesMetadataResponse, ListShardsRequest, ListShardsResponse, ListSplitsRequest, + ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError, + MetastoreResult, MetastoreService, MetastoreServiceStream, OpenShardSubrequest, + OpenShardsRequest, OpenShardsResponse, PruneShardsRequest, PublishSplitsRequest, + ResetSourceCheckpointRequest, StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest, + UpdateSourceRequest, UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse, + serde_utils, }; use quickwit_proto::types::{IndexId, IndexUid}; use quickwit_storage::Storage; @@ -1275,6 +1277,26 @@ impl MetastoreService for FileBackedMetastore { uuid: state_wlock_guard.identity.hyphenated().to_string(), }) } + + async fn create_index_routing_table( + &self, + _request: CreateIndexRoutingTableRequest, + ) -> MetastoreResult { + Err(MetastoreError::Internal { + message: "routing tables are not supported in file-backed metastore".to_string(), + cause: "unsupported operation".to_string(), + }) + } + + async fn get_index_routing_table( + &self, + _request: GetIndexRoutingTableRequest, + ) -> MetastoreResult { + Err(MetastoreError::Internal { + message: "routing tables are not supported in file-backed metastore".to_string(), + cause: "unsupported operation".to_string(), + }) + } } impl MetastoreServiceExt for FileBackedMetastore {} diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/error.rs b/quickwit/quickwit-metastore/src/metastore/postgres/error.rs index 4f4fe7a2205..e6d171f20a1 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/error.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/error.rs @@ -40,6 +40,13 @@ pub(super) fn convert_sqlx_err(index_id: &str, sqlx_error: sqlx::Error) -> Metas index_id: index_id.to_string(), }) } + (pg_error_codes::UNIQUE_VIOLATION, Some("index_routing_rules")) => { + error!(error=?boxed_db_error, "postgresql-error"); + MetastoreError::Internal { + message: "duplicate rank in routing table".to_string(), + cause: format!("DB error {boxed_db_error:?}"), + } + } (pg_error_codes::UNIQUE_VIOLATION, _) => { error!(error=?boxed_db_error, "postgresql-error"); MetastoreError::Internal { diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index c7640bfb9ab..828153e9227 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -29,19 +29,21 @@ use quickwit_config::{ use quickwit_proto::ingest::{Shard, ShardState}; use quickwit_proto::metastore::{ AcquireShardsRequest, AcquireShardsResponse, AddSourceRequest, CreateIndexRequest, - CreateIndexResponse, CreateIndexTemplateRequest, DeleteIndexRequest, - DeleteIndexTemplatesRequest, DeleteQuery, DeleteShardsRequest, DeleteShardsResponse, - DeleteSourceRequest, DeleteSplitsRequest, DeleteTask, EmptyResponse, EntityKind, - FindIndexTemplateMatchesRequest, FindIndexTemplateMatchesResponse, GetClusterIdentityRequest, - GetClusterIdentityResponse, GetIndexTemplateRequest, GetIndexTemplateResponse, - IndexMetadataFailure, IndexMetadataFailureReason, IndexMetadataRequest, IndexMetadataResponse, - IndexStats, IndexTemplateMatch, IndexesMetadataRequest, IndexesMetadataResponse, - LastDeleteOpstampRequest, LastDeleteOpstampResponse, ListDeleteTasksRequest, - ListDeleteTasksResponse, ListIndexStatsRequest, ListIndexStatsResponse, - ListIndexTemplatesRequest, ListIndexTemplatesResponse, ListIndexesMetadataRequest, - ListIndexesMetadataResponse, ListShardsRequest, ListShardsResponse, ListShardsSubresponse, - ListSplitsRequest, ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, - MetastoreError, MetastoreResult, MetastoreService, MetastoreServiceStream, OpenShardSubrequest, + CreateIndexResponse, CreateIndexRoutingTableRequest, CreateIndexRoutingTableResponse, + CreateIndexTemplateRequest, DeleteIndexRequest, DeleteIndexTemplatesRequest, DeleteQuery, + DeleteShardsRequest, DeleteShardsResponse, DeleteSourceRequest, DeleteSplitsRequest, + DeleteTask, EmptyResponse, EntityKind, FindIndexTemplateMatchesRequest, + FindIndexTemplateMatchesResponse, GetClusterIdentityRequest, GetClusterIdentityResponse, + GetIndexRoutingTableRequest, GetIndexRoutingTableResponse, GetIndexTemplateRequest, + GetIndexTemplateResponse, IndexMetadataFailure, IndexMetadataFailureReason, + IndexMetadataRequest, IndexMetadataResponse, IndexRoutingRule, IndexStats, IndexTemplateMatch, + IndexesMetadataRequest, IndexesMetadataResponse, LastDeleteOpstampRequest, + LastDeleteOpstampResponse, ListDeleteTasksRequest, ListDeleteTasksResponse, + ListIndexStatsRequest, ListIndexStatsResponse, ListIndexTemplatesRequest, + ListIndexTemplatesResponse, ListIndexesMetadataRequest, ListIndexesMetadataResponse, + ListShardsRequest, ListShardsResponse, ListShardsSubresponse, ListSplitsRequest, + ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError, + MetastoreResult, MetastoreService, MetastoreServiceStream, OpenShardSubrequest, OpenShardSubresponse, OpenShardsRequest, OpenShardsResponse, PruneShardsRequest, PublishSplitsRequest, ResetSourceCheckpointRequest, SplitStats, StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest, UpdateSourceRequest, UpdateSplitsDeleteOpstampRequest, @@ -57,7 +59,9 @@ use uuid::Uuid; use super::error::convert_sqlx_err; use super::migrator::run_migrations; -use super::model::{PgDeleteTask, PgIndex, PgIndexTemplate, PgShard, PgSplit, Splits}; +use super::model::{ + PgDeleteTask, PgIndex, PgIndexRoutingRule, PgIndexTemplate, PgShard, PgSplit, Splits, +}; use super::pool::TrackedPool; use super::split_stream::SplitStream; use super::utils::{append_query_filters_and_order_by, establish_connection}; @@ -1763,6 +1767,68 @@ impl MetastoreService for PostgresqlMetastore { .await?; Ok(GetClusterIdentityResponse { uuid }) } + + async fn create_index_routing_table( + &self, + request: CreateIndexRoutingTableRequest, + ) -> MetastoreResult { + const INSERT_ROUTING_RULE_QUERY: &str = include_str!("queries/routing_rules/insert.sql"); + + // Validate request + if request.rules.is_empty() { + return Err(MetastoreError::InvalidArgument { + message: "routing table must contain at least one rule".to_string(), + }); + } + + // Generate routing_table_id + let routing_table_id = Uuid::new_v4().hyphenated().to_string(); + + // Use transaction for atomicity + run_with_tx!(self.connection_pool, tx, "create_routing_table", { + // Insert each rule with its array index as the rank + for (rank, rule) in request.rules.iter().enumerate() { + sqlx::query(INSERT_ROUTING_RULE_QUERY) + .bind(&routing_table_id) + .bind(rank as i32) + .bind(&rule.filter) + .bind(&rule.index_id) + .execute(tx.as_mut()) + .await + .map_err(|e| convert_sqlx_err("routing_rules", e))?; + } + + Ok(CreateIndexRoutingTableResponse { routing_table_id }) + }) + } + + async fn get_index_routing_table( + &self, + request: GetIndexRoutingTableRequest, + ) -> MetastoreResult { + const SELECT_ROUTING_RULES_QUERY: &str = include_str!("queries/routing_rules/select.sql"); + + let pg_rules: Vec = sqlx::query_as(SELECT_ROUTING_RULES_QUERY) + .bind(&request.routing_table_id) + .fetch_all(&self.connection_pool) + .await?; + + if pg_rules.is_empty() { + return Err(MetastoreError::NotFound(EntityKind::RoutingTable { + routing_table_id: request.routing_table_id.clone(), + })); + } + + let rules = pg_rules + .into_iter() + .map(|pg_rule| IndexRoutingRule { + index_id: pg_rule.index_id, + filter: pg_rule.filter, + }) + .collect(); + + Ok(GetIndexRoutingTableResponse { rules }) + } } async fn open_or_fetch_shard<'e>( diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/model.rs b/quickwit/quickwit-metastore/src/metastore/postgres/model.rs index 117f37047d9..bdda8787e4c 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/model.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/model.rs @@ -292,3 +292,11 @@ impl From for Shard { pub(super) struct PgIndexTemplate { pub index_template_json: String, } + +#[derive(sqlx::FromRow, Debug)] +pub(super) struct PgIndexRoutingRule { + pub routing_table_id: String, + pub rank: i32, + pub filter: String, + pub index_id: String, +} diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/queries/routing_rules/insert.sql b/quickwit/quickwit-metastore/src/metastore/postgres/queries/routing_rules/insert.sql new file mode 100644 index 00000000000..5c06e3d10e9 --- /dev/null +++ b/quickwit/quickwit-metastore/src/metastore/postgres/queries/routing_rules/insert.sql @@ -0,0 +1,2 @@ +INSERT INTO index_routing_rules (routing_table_id, rank, filter, index_id) +VALUES ($1, $2, $3, $4) diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/queries/routing_rules/select.sql b/quickwit/quickwit-metastore/src/metastore/postgres/queries/routing_rules/select.sql new file mode 100644 index 00000000000..1673cf93baf --- /dev/null +++ b/quickwit/quickwit-metastore/src/metastore/postgres/queries/routing_rules/select.sql @@ -0,0 +1,4 @@ +SELECT routing_table_id, rank, filter, index_id +FROM index_routing_rules +WHERE routing_table_id = $1 +ORDER BY rank ASC diff --git a/quickwit/quickwit-proto/protos/quickwit/metastore.proto b/quickwit/quickwit-proto/protos/quickwit/metastore.proto index 00680da02d0..025706f81b3 100644 --- a/quickwit/quickwit-proto/protos/quickwit/metastore.proto +++ b/quickwit/quickwit-proto/protos/quickwit/metastore.proto @@ -204,11 +204,16 @@ service MetastoreService { // Get cluster identity rpc GetClusterIdentity(GetClusterIdentityRequest) returns (GetClusterIdentityResponse); -} -message EmptyResponse { + // Creates index routing table + rpc CreateIndexRoutingTable(CreateIndexRoutingTableRequest) returns (CreateIndexRoutingTableResponse); + + // Get index routing table + rpc GetIndexRoutingTable(GetIndexRoutingTableRequest) returns (GetIndexRoutingTableResponse); } +message EmptyResponse {} + message CreateIndexRequest { string index_config_json = 2; repeated string source_configs_json = 3; @@ -229,7 +234,7 @@ message UpdateIndexRequest { } message ListIndexesMetadataRequest { - reserved 1; + reserved 1; // List of patterns an index should match or not match to get considered // An index must match at least one positive pattern (a pattern not starting // with a '-'), and no negative pattern (a pattern starting with a '-'). @@ -550,8 +555,7 @@ message IndexTemplateMatch { string index_template_json = 3; } -message ListIndexTemplatesRequest { -} +message ListIndexTemplatesRequest {} message ListIndexTemplatesResponse { repeated string index_templates_json = 1; @@ -561,9 +565,29 @@ message DeleteIndexTemplatesRequest { repeated string template_ids = 1; } -message GetClusterIdentityRequest { -} +message GetClusterIdentityRequest {} message GetClusterIdentityResponse { string uuid = 1; } + +message CreateIndexRoutingTableRequest { + repeated IndexRoutingRule rules = 1; +} + +message CreateIndexRoutingTableResponse { + string routing_table_id = 1; +} + +message GetIndexRoutingTableRequest { + string routing_table_id = 1; +} + +message GetIndexRoutingTableResponse { + repeated IndexRoutingRule rules = 1; +} + +message IndexRoutingRule { + string index_id = 1; + string filter = 2; +} diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs index ab6d1ddc236..bdf7c5ad918 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs @@ -533,6 +533,38 @@ pub struct GetClusterIdentityResponse { pub uuid: ::prost::alloc::string::String, } #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct CreateIndexRoutingTableRequest { + #[prost(message, repeated, tag = "1")] + pub rules: ::prost::alloc::vec::Vec, +} +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct CreateIndexRoutingTableResponse { + #[prost(string, tag = "1")] + pub routing_table_id: ::prost::alloc::string::String, +} +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct GetIndexRoutingTableRequest { + #[prost(string, tag = "1")] + pub routing_table_id: ::prost::alloc::string::String, +} +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetIndexRoutingTableResponse { + #[prost(message, repeated, tag = "1")] + pub rules: ::prost::alloc::vec::Vec, +} +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct IndexRoutingRule { + #[prost(string, tag = "1")] + pub index_id: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub filter: ::prost::alloc::string::String, +} +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[serde(rename_all = "snake_case")] #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] @@ -798,6 +830,16 @@ impl RpcName for GetClusterIdentityRequest { "get_cluster_identity" } } +impl RpcName for CreateIndexRoutingTableRequest { + fn rpc_name() -> &'static str { + "create_index_routing_table" + } +} +impl RpcName for GetIndexRoutingTableRequest { + fn rpc_name() -> &'static str { + "get_index_routing_table" + } +} pub type MetastoreServiceStream = quickwit_common::ServiceStream< crate::metastore::MetastoreResult, >; @@ -985,6 +1027,16 @@ pub trait MetastoreService: std::fmt::Debug + Send + Sync + 'static { &self, request: GetClusterIdentityRequest, ) -> crate::metastore::MetastoreResult; + ///Creates index routing table + async fn create_index_routing_table( + &self, + request: CreateIndexRoutingTableRequest, + ) -> crate::metastore::MetastoreResult; + ///Get index routing table + async fn get_index_routing_table( + &self, + request: GetIndexRoutingTableRequest, + ) -> crate::metastore::MetastoreResult; async fn check_connectivity(&self) -> anyhow::Result<()>; fn endpoints(&self) -> Vec; } @@ -1293,6 +1345,18 @@ impl MetastoreService for MetastoreServiceClient { ) -> crate::metastore::MetastoreResult { self.inner.0.get_cluster_identity(request).await } + async fn create_index_routing_table( + &self, + request: CreateIndexRoutingTableRequest, + ) -> crate::metastore::MetastoreResult { + self.inner.0.create_index_routing_table(request).await + } + async fn get_index_routing_table( + &self, + request: GetIndexRoutingTableRequest, + ) -> crate::metastore::MetastoreResult { + self.inner.0.get_index_routing_table(request).await + } async fn check_connectivity(&self) -> anyhow::Result<()> { self.inner.0.check_connectivity().await } @@ -1511,6 +1575,18 @@ pub mod mock_metastore_service { ) -> crate::metastore::MetastoreResult { self.inner.lock().await.get_cluster_identity(request).await } + async fn create_index_routing_table( + &self, + request: super::CreateIndexRoutingTableRequest, + ) -> crate::metastore::MetastoreResult { + self.inner.lock().await.create_index_routing_table(request).await + } + async fn get_index_routing_table( + &self, + request: super::GetIndexRoutingTableRequest, + ) -> crate::metastore::MetastoreResult { + self.inner.lock().await.get_index_routing_table(request).await + } async fn check_connectivity(&self) -> anyhow::Result<()> { self.inner.lock().await.check_connectivity().await } @@ -2050,6 +2126,38 @@ impl tower::Service for InnerMetastoreServiceClient { Box::pin(fut) } } +impl tower::Service for InnerMetastoreServiceClient { + type Response = CreateIndexRoutingTableResponse; + type Error = crate::metastore::MetastoreError; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } + fn call(&mut self, request: CreateIndexRoutingTableRequest) -> Self::Future { + let svc = self.clone(); + let fut = async move { svc.0.create_index_routing_table(request).await }; + Box::pin(fut) + } +} +impl tower::Service for InnerMetastoreServiceClient { + type Response = GetIndexRoutingTableResponse; + type Error = crate::metastore::MetastoreError; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } + fn call(&mut self, request: GetIndexRoutingTableRequest) -> Self::Future { + let svc = self.clone(); + let fut = async move { svc.0.get_index_routing_table(request).await }; + Box::pin(fut) + } +} /// A tower service stack is a set of tower services. #[derive(Debug)] struct MetastoreServiceTowerServiceStack { @@ -2220,6 +2328,16 @@ struct MetastoreServiceTowerServiceStack { GetClusterIdentityResponse, crate::metastore::MetastoreError, >, + create_index_routing_table_svc: quickwit_common::tower::BoxService< + CreateIndexRoutingTableRequest, + CreateIndexRoutingTableResponse, + crate::metastore::MetastoreError, + >, + get_index_routing_table_svc: quickwit_common::tower::BoxService< + GetIndexRoutingTableRequest, + GetIndexRoutingTableResponse, + crate::metastore::MetastoreError, + >, } #[async_trait::async_trait] impl MetastoreService for MetastoreServiceTowerServiceStack { @@ -2421,6 +2539,18 @@ impl MetastoreService for MetastoreServiceTowerServiceStack { ) -> crate::metastore::MetastoreResult { self.get_cluster_identity_svc.clone().ready().await?.call(request).await } + async fn create_index_routing_table( + &self, + request: CreateIndexRoutingTableRequest, + ) -> crate::metastore::MetastoreResult { + self.create_index_routing_table_svc.clone().ready().await?.call(request).await + } + async fn get_index_routing_table( + &self, + request: GetIndexRoutingTableRequest, + ) -> crate::metastore::MetastoreResult { + self.get_index_routing_table_svc.clone().ready().await?.call(request).await + } async fn check_connectivity(&self) -> anyhow::Result<()> { self.inner.0.check_connectivity().await } @@ -2758,6 +2888,26 @@ type GetClusterIdentityLayer = quickwit_common::tower::BoxLayer< GetClusterIdentityResponse, crate::metastore::MetastoreError, >; +type CreateIndexRoutingTableLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + CreateIndexRoutingTableRequest, + CreateIndexRoutingTableResponse, + crate::metastore::MetastoreError, + >, + CreateIndexRoutingTableRequest, + CreateIndexRoutingTableResponse, + crate::metastore::MetastoreError, +>; +type GetIndexRoutingTableLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + GetIndexRoutingTableRequest, + GetIndexRoutingTableResponse, + crate::metastore::MetastoreError, + >, + GetIndexRoutingTableRequest, + GetIndexRoutingTableResponse, + crate::metastore::MetastoreError, +>; #[derive(Debug, Default)] pub struct MetastoreServiceTowerLayerStack { create_index_layers: Vec, @@ -2793,6 +2943,8 @@ pub struct MetastoreServiceTowerLayerStack { list_index_templates_layers: Vec, delete_index_templates_layers: Vec, get_cluster_identity_layers: Vec, + create_index_routing_table_layers: Vec, + get_index_routing_table_layers: Vec, } impl MetastoreServiceTowerLayerStack { pub fn stack_layer(mut self, layer: L) -> Self @@ -3640,6 +3792,60 @@ impl MetastoreServiceTowerLayerStack { >>::Service as tower::Service< GetClusterIdentityRequest, >>::Future: Send + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + CreateIndexRoutingTableRequest, + CreateIndexRoutingTableResponse, + crate::metastore::MetastoreError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< + CreateIndexRoutingTableRequest, + Response = CreateIndexRoutingTableResponse, + Error = crate::metastore::MetastoreError, + > + Clone + Send + Sync + 'static, + <, + >>::Service as tower::Service< + CreateIndexRoutingTableRequest, + >>::Future: Send + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + GetIndexRoutingTableRequest, + GetIndexRoutingTableResponse, + crate::metastore::MetastoreError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< + GetIndexRoutingTableRequest, + Response = GetIndexRoutingTableResponse, + Error = crate::metastore::MetastoreError, + > + Clone + Send + Sync + 'static, + <, + >>::Service as tower::Service< + GetIndexRoutingTableRequest, + >>::Future: Send + 'static, { self.create_index_layers .push(quickwit_common::tower::BoxLayer::new(layer.clone())); @@ -3707,6 +3913,10 @@ impl MetastoreServiceTowerLayerStack { .push(quickwit_common::tower::BoxLayer::new(layer.clone())); self.get_cluster_identity_layers .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.create_index_routing_table_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.get_index_routing_table_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); self } pub fn stack_create_index_layer(mut self, layer: L) -> Self @@ -4366,6 +4576,50 @@ impl MetastoreServiceTowerLayerStack { .push(quickwit_common::tower::BoxLayer::new(layer)); self } + pub fn stack_create_index_routing_table_layer(mut self, layer: L) -> Self + where + L: tower::Layer< + quickwit_common::tower::BoxService< + CreateIndexRoutingTableRequest, + CreateIndexRoutingTableResponse, + crate::metastore::MetastoreError, + >, + > + Send + Sync + 'static, + L::Service: tower::Service< + CreateIndexRoutingTableRequest, + Response = CreateIndexRoutingTableResponse, + Error = crate::metastore::MetastoreError, + > + Clone + Send + Sync + 'static, + >::Future: Send + 'static, + { + self.create_index_routing_table_layers + .push(quickwit_common::tower::BoxLayer::new(layer)); + self + } + pub fn stack_get_index_routing_table_layer(mut self, layer: L) -> Self + where + L: tower::Layer< + quickwit_common::tower::BoxService< + GetIndexRoutingTableRequest, + GetIndexRoutingTableResponse, + crate::metastore::MetastoreError, + >, + > + Send + Sync + 'static, + L::Service: tower::Service< + GetIndexRoutingTableRequest, + Response = GetIndexRoutingTableResponse, + Error = crate::metastore::MetastoreError, + > + Clone + Send + Sync + 'static, + >::Future: Send + 'static, + { + self.get_index_routing_table_layers + .push(quickwit_common::tower::BoxLayer::new(layer)); + self + } pub fn build(self, instance: T) -> MetastoreServiceClient where T: MetastoreService, @@ -4690,6 +4944,22 @@ impl MetastoreServiceTowerLayerStack { quickwit_common::tower::BoxService::new(inner_client.clone()), |svc, layer| layer.layer(svc), ); + let create_index_routing_table_svc = self + .create_index_routing_table_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(inner_client.clone()), + |svc, layer| layer.layer(svc), + ); + let get_index_routing_table_svc = self + .get_index_routing_table_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(inner_client.clone()), + |svc, layer| layer.layer(svc), + ); let tower_svc_stack = MetastoreServiceTowerServiceStack { inner: inner_client, create_index_svc, @@ -4725,6 +4995,8 @@ impl MetastoreServiceTowerLayerStack { list_index_templates_svc, delete_index_templates_svc, get_cluster_identity_svc, + create_index_routing_table_svc, + get_index_routing_table_svc, }; MetastoreServiceClient::new(tower_svc_stack) } @@ -5022,6 +5294,24 @@ where GetClusterIdentityResponse, crate::metastore::MetastoreError, >, + > + + tower::Service< + CreateIndexRoutingTableRequest, + Response = CreateIndexRoutingTableResponse, + Error = crate::metastore::MetastoreError, + Future = BoxFuture< + CreateIndexRoutingTableResponse, + crate::metastore::MetastoreError, + >, + > + + tower::Service< + GetIndexRoutingTableRequest, + Response = GetIndexRoutingTableResponse, + Error = crate::metastore::MetastoreError, + Future = BoxFuture< + GetIndexRoutingTableResponse, + crate::metastore::MetastoreError, + >, >, { async fn create_index( @@ -5222,6 +5512,18 @@ where ) -> crate::metastore::MetastoreResult { self.clone().call(request).await } + async fn create_index_routing_table( + &self, + request: CreateIndexRoutingTableRequest, + ) -> crate::metastore::MetastoreResult { + self.clone().call(request).await + } + async fn get_index_routing_table( + &self, + request: GetIndexRoutingTableRequest, + ) -> crate::metastore::MetastoreResult { + self.clone().call(request).await + } async fn check_connectivity(&self) -> anyhow::Result<()> { if self.inner.is_disconnected() { anyhow::bail!("actor `{}` is disconnected", self.inner.actor_instance_id()) @@ -5739,6 +6041,34 @@ where GetClusterIdentityRequest::rpc_name(), )) } + async fn create_index_routing_table( + &self, + request: CreateIndexRoutingTableRequest, + ) -> crate::metastore::MetastoreResult { + self.inner + .clone() + .create_index_routing_table(request) + .await + .map(|response| response.into_inner()) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + CreateIndexRoutingTableRequest::rpc_name(), + )) + } + async fn get_index_routing_table( + &self, + request: GetIndexRoutingTableRequest, + ) -> crate::metastore::MetastoreResult { + self.inner + .clone() + .get_index_routing_table(request) + .await + .map(|response| response.into_inner()) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + GetIndexRoutingTableRequest::rpc_name(), + )) + } async fn check_connectivity(&self) -> anyhow::Result<()> { if self.connection_addrs_rx.borrow().is_empty() { anyhow::bail!("no server currently available") @@ -6140,6 +6470,28 @@ for MetastoreServiceGrpcServerAdapter { .map(tonic::Response::new) .map_err(crate::error::grpc_error_to_grpc_status) } + async fn create_index_routing_table( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + self.inner + .0 + .create_index_routing_table(request.into_inner()) + .await + .map(tonic::Response::new) + .map_err(crate::error::grpc_error_to_grpc_status) + } + async fn get_index_routing_table( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + self.inner + .0 + .get_index_routing_table(request.into_inner()) + .await + .map(tonic::Response::new) + .map_err(crate::error::grpc_error_to_grpc_status) + } } /// Generated client implementations. pub mod metastore_service_grpc_client { @@ -7223,6 +7575,66 @@ pub mod metastore_service_grpc_client { ); self.inner.unary(req, path, codec).await } + /// Creates index routing table + pub async fn create_index_routing_table( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/quickwit.metastore.MetastoreService/CreateIndexRoutingTable", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "quickwit.metastore.MetastoreService", + "CreateIndexRoutingTable", + ), + ); + self.inner.unary(req, path, codec).await + } + /// Get index routing table + pub async fn get_index_routing_table( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/quickwit.metastore.MetastoreService/GetIndexRoutingTable", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "quickwit.metastore.MetastoreService", + "GetIndexRoutingTable", + ), + ); + self.inner.unary(req, path, codec).await + } } } /// Generated server implementations. @@ -7479,6 +7891,22 @@ pub mod metastore_service_grpc_server { tonic::Response, tonic::Status, >; + /// Creates index routing table + async fn create_index_routing_table( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + /// Get index routing table + async fn get_index_routing_table( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; } /// Metastore meant to manage Quickwit's indexes, their splits and delete tasks. /// @@ -9187,6 +9615,108 @@ pub mod metastore_service_grpc_server { }; Box::pin(fut) } + "/quickwit.metastore.MetastoreService/CreateIndexRoutingTable" => { + #[allow(non_camel_case_types)] + struct CreateIndexRoutingTableSvc( + pub Arc, + ); + impl< + T: MetastoreServiceGrpc, + > tonic::server::UnaryService + for CreateIndexRoutingTableSvc { + type Response = super::CreateIndexRoutingTableResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request< + super::CreateIndexRoutingTableRequest, + >, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::create_index_routing_table( + &inner, + request, + ) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = CreateIndexRoutingTableSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/quickwit.metastore.MetastoreService/GetIndexRoutingTable" => { + #[allow(non_camel_case_types)] + struct GetIndexRoutingTableSvc(pub Arc); + impl< + T: MetastoreServiceGrpc, + > tonic::server::UnaryService + for GetIndexRoutingTableSvc { + type Response = super::GetIndexRoutingTableResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::get_index_routing_table( + &inner, + request, + ) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = GetIndexRoutingTableSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } _ => { Box::pin(async move { let mut response = http::Response::new( diff --git a/quickwit/quickwit-proto/src/metastore/mod.rs b/quickwit/quickwit-proto/src/metastore/mod.rs index ba371c13d4a..1d9ccf96d10 100644 --- a/quickwit/quickwit-proto/src/metastore/mod.rs +++ b/quickwit/quickwit-proto/src/metastore/mod.rs @@ -79,6 +79,11 @@ pub enum EntityKind { /// Index template ID. template_id: String, }, + /// A routing table. + RoutingTable { + /// Routing table ID. + routing_table_id: String, + }, } impl fmt::Display for EntityKind { @@ -100,6 +105,9 @@ impl fmt::Display for EntityKind { EntityKind::IndexTemplate { template_id } => { write!(f, "index template `{template_id}`") } + EntityKind::RoutingTable { routing_table_id } => { + write!(f, "routing table `{routing_table_id}`") + } } } }