Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS index_routing_rules;
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
CREATE TABLE IF NOT EXISTS index_routing_rules (
routing_table_id VARCHAR(50) NOT NULL,
rank INTEGER NOT NULL,
filter TEXT NOT NULL,
index_id VARCHAR(50) NOT NULL,
create_timestamp TIMESTAMP NOT NULL DEFAULT (CURRENT_TIMESTAMP AT TIME ZONE 'UTC'),
update_timestamp TIMESTAMP NOT NULL DEFAULT (CURRENT_TIMESTAMP AT TIME ZONE 'UTC'),

PRIMARY KEY (routing_table_id, rank),
FOREIGN KEY(index_id) REFERENCES indexes(index_id)
);
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ use quickwit_common::uri::Uri;
use quickwit_proto::control_plane::{ControlPlaneService, ControlPlaneServiceClient};
use quickwit_proto::metastore::{
AcquireShardsRequest, AcquireShardsResponse, AddSourceRequest, CreateIndexRequest,
CreateIndexResponse, CreateIndexTemplateRequest, DeleteIndexRequest,
DeleteIndexTemplatesRequest, DeleteQuery, DeleteShardsRequest, DeleteShardsResponse,
DeleteSourceRequest, DeleteSplitsRequest, DeleteTask, EmptyResponse,
FindIndexTemplateMatchesRequest, FindIndexTemplateMatchesResponse, GetClusterIdentityRequest,
GetClusterIdentityResponse, GetIndexTemplateRequest, GetIndexTemplateResponse,
CreateIndexResponse, CreateIndexRoutingTableRequest, CreateIndexRoutingTableResponse,
CreateIndexTemplateRequest, DeleteIndexRequest, DeleteIndexTemplatesRequest, DeleteQuery,
DeleteShardsRequest, DeleteShardsResponse, DeleteSourceRequest, DeleteSplitsRequest,
DeleteTask, EmptyResponse, FindIndexTemplateMatchesRequest, FindIndexTemplateMatchesResponse,
GetClusterIdentityRequest, GetClusterIdentityResponse, GetIndexRoutingTableRequest,
GetIndexRoutingTableResponse, GetIndexTemplateRequest, GetIndexTemplateResponse,
IndexMetadataRequest, IndexMetadataResponse, IndexesMetadataRequest, IndexesMetadataResponse,
LastDeleteOpstampRequest, LastDeleteOpstampResponse, ListDeleteTasksRequest,
ListDeleteTasksResponse, ListIndexStatsRequest, ListIndexStatsResponse,
Expand Down Expand Up @@ -289,4 +290,18 @@ impl MetastoreService for ControlPlaneMetastore {
) -> MetastoreResult<GetClusterIdentityResponse> {
self.metastore.get_cluster_identity(request).await
}

async fn create_index_routing_table(
&self,
request: CreateIndexRoutingTableRequest,
) -> MetastoreResult<CreateIndexRoutingTableResponse> {
self.metastore.create_index_routing_table(request).await
}

async fn get_index_routing_table(
&self,
request: GetIndexRoutingTableRequest,
) -> MetastoreResult<GetIndexRoutingTableResponse> {
self.metastore.get_index_routing_table(request).await
}
}
54 changes: 38 additions & 16 deletions quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,24 @@ use quickwit_common::ServiceStream;
use quickwit_config::IndexTemplate;
use quickwit_proto::metastore::{
AcquireShardsRequest, AcquireShardsResponse, AddSourceRequest, CreateIndexRequest,
CreateIndexResponse, CreateIndexTemplateRequest, DeleteIndexRequest,
DeleteIndexTemplatesRequest, DeleteQuery, DeleteShardsRequest, DeleteShardsResponse,
DeleteSourceRequest, DeleteSplitsRequest, DeleteTask, EmptyResponse, EntityKind,
FindIndexTemplateMatchesRequest, FindIndexTemplateMatchesResponse, GetClusterIdentityRequest,
GetClusterIdentityResponse, GetIndexTemplateRequest, GetIndexTemplateResponse,
IndexMetadataFailure, IndexMetadataFailureReason, IndexMetadataRequest, IndexMetadataResponse,
IndexTemplateMatch, IndexesMetadataRequest, IndexesMetadataResponse, LastDeleteOpstampRequest,
LastDeleteOpstampResponse, ListDeleteTasksRequest, ListDeleteTasksResponse,
ListIndexStatsRequest, ListIndexStatsResponse, ListIndexTemplatesRequest,
ListIndexTemplatesResponse, ListIndexesMetadataRequest, ListIndexesMetadataResponse,
ListShardsRequest, ListShardsResponse, ListSplitsRequest, ListSplitsResponse,
ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError, MetastoreResult,
MetastoreService, MetastoreServiceStream, OpenShardSubrequest, OpenShardsRequest,
OpenShardsResponse, PruneShardsRequest, PublishSplitsRequest, ResetSourceCheckpointRequest,
StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest, UpdateSourceRequest,
UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse, serde_utils,
CreateIndexResponse, CreateIndexRoutingTableRequest, CreateIndexRoutingTableResponse,
CreateIndexTemplateRequest, DeleteIndexRequest, DeleteIndexTemplatesRequest, DeleteQuery,
DeleteShardsRequest, DeleteShardsResponse, DeleteSourceRequest, DeleteSplitsRequest,
DeleteTask, EmptyResponse, EntityKind, FindIndexTemplateMatchesRequest,
FindIndexTemplateMatchesResponse, GetClusterIdentityRequest, GetClusterIdentityResponse,
GetIndexRoutingTableRequest, GetIndexRoutingTableResponse, GetIndexTemplateRequest,
GetIndexTemplateResponse, IndexMetadataFailure, IndexMetadataFailureReason,
IndexMetadataRequest, IndexMetadataResponse, IndexTemplateMatch, IndexesMetadataRequest,
IndexesMetadataResponse, LastDeleteOpstampRequest, LastDeleteOpstampResponse,
ListDeleteTasksRequest, ListDeleteTasksResponse, ListIndexStatsRequest, ListIndexStatsResponse,
ListIndexTemplatesRequest, ListIndexTemplatesResponse, ListIndexesMetadataRequest,
ListIndexesMetadataResponse, ListShardsRequest, ListShardsResponse, ListSplitsRequest,
ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError,
MetastoreResult, MetastoreService, MetastoreServiceStream, OpenShardSubrequest,
OpenShardsRequest, OpenShardsResponse, PruneShardsRequest, PublishSplitsRequest,
ResetSourceCheckpointRequest, StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest,
UpdateSourceRequest, UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse,
serde_utils,
};
use quickwit_proto::types::{IndexId, IndexUid};
use quickwit_storage::Storage;
Expand Down Expand Up @@ -1275,6 +1277,26 @@ impl MetastoreService for FileBackedMetastore {
uuid: state_wlock_guard.identity.hyphenated().to_string(),
})
}

async fn create_index_routing_table(
&self,
_request: CreateIndexRoutingTableRequest,
) -> MetastoreResult<CreateIndexRoutingTableResponse> {
Err(MetastoreError::Internal {
message: "routing tables are not supported in file-backed metastore".to_string(),
cause: "unsupported operation".to_string(),
})
}

async fn get_index_routing_table(
&self,
_request: GetIndexRoutingTableRequest,
) -> MetastoreResult<GetIndexRoutingTableResponse> {
Err(MetastoreError::Internal {
message: "routing tables are not supported in file-backed metastore".to_string(),
cause: "unsupported operation".to_string(),
})
}
}

impl MetastoreServiceExt for FileBackedMetastore {}
Expand Down
7 changes: 7 additions & 0 deletions quickwit/quickwit-metastore/src/metastore/postgres/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ pub(super) fn convert_sqlx_err(index_id: &str, sqlx_error: sqlx::Error) -> Metas
index_id: index_id.to_string(),
})
}
(pg_error_codes::UNIQUE_VIOLATION, Some("index_routing_rules")) => {
error!(error=?boxed_db_error, "postgresql-error");
MetastoreError::Internal {
message: "duplicate rank in routing table".to_string(),
cause: format!("DB error {boxed_db_error:?}"),
}
}
(pg_error_codes::UNIQUE_VIOLATION, _) => {
error!(error=?boxed_db_error, "postgresql-error");
MetastoreError::Internal {
Expand Down
94 changes: 80 additions & 14 deletions quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,21 @@ use quickwit_config::{
use quickwit_proto::ingest::{Shard, ShardState};
use quickwit_proto::metastore::{
AcquireShardsRequest, AcquireShardsResponse, AddSourceRequest, CreateIndexRequest,
CreateIndexResponse, CreateIndexTemplateRequest, DeleteIndexRequest,
DeleteIndexTemplatesRequest, DeleteQuery, DeleteShardsRequest, DeleteShardsResponse,
DeleteSourceRequest, DeleteSplitsRequest, DeleteTask, EmptyResponse, EntityKind,
FindIndexTemplateMatchesRequest, FindIndexTemplateMatchesResponse, GetClusterIdentityRequest,
GetClusterIdentityResponse, GetIndexTemplateRequest, GetIndexTemplateResponse,
IndexMetadataFailure, IndexMetadataFailureReason, IndexMetadataRequest, IndexMetadataResponse,
IndexStats, IndexTemplateMatch, IndexesMetadataRequest, IndexesMetadataResponse,
LastDeleteOpstampRequest, LastDeleteOpstampResponse, ListDeleteTasksRequest,
ListDeleteTasksResponse, ListIndexStatsRequest, ListIndexStatsResponse,
ListIndexTemplatesRequest, ListIndexTemplatesResponse, ListIndexesMetadataRequest,
ListIndexesMetadataResponse, ListShardsRequest, ListShardsResponse, ListShardsSubresponse,
ListSplitsRequest, ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest,
MetastoreError, MetastoreResult, MetastoreService, MetastoreServiceStream, OpenShardSubrequest,
CreateIndexResponse, CreateIndexRoutingTableRequest, CreateIndexRoutingTableResponse,
CreateIndexTemplateRequest, DeleteIndexRequest, DeleteIndexTemplatesRequest, DeleteQuery,
DeleteShardsRequest, DeleteShardsResponse, DeleteSourceRequest, DeleteSplitsRequest,
DeleteTask, EmptyResponse, EntityKind, FindIndexTemplateMatchesRequest,
FindIndexTemplateMatchesResponse, GetClusterIdentityRequest, GetClusterIdentityResponse,
GetIndexRoutingTableRequest, GetIndexRoutingTableResponse, GetIndexTemplateRequest,
GetIndexTemplateResponse, IndexMetadataFailure, IndexMetadataFailureReason,
IndexMetadataRequest, IndexMetadataResponse, IndexRoutingRule, IndexStats, IndexTemplateMatch,
IndexesMetadataRequest, IndexesMetadataResponse, LastDeleteOpstampRequest,
LastDeleteOpstampResponse, ListDeleteTasksRequest, ListDeleteTasksResponse,
ListIndexStatsRequest, ListIndexStatsResponse, ListIndexTemplatesRequest,
ListIndexTemplatesResponse, ListIndexesMetadataRequest, ListIndexesMetadataResponse,
ListShardsRequest, ListShardsResponse, ListShardsSubresponse, ListSplitsRequest,
ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError,
MetastoreResult, MetastoreService, MetastoreServiceStream, OpenShardSubrequest,
OpenShardSubresponse, OpenShardsRequest, OpenShardsResponse, PruneShardsRequest,
PublishSplitsRequest, ResetSourceCheckpointRequest, SplitStats, StageSplitsRequest,
ToggleSourceRequest, UpdateIndexRequest, UpdateSourceRequest, UpdateSplitsDeleteOpstampRequest,
Expand All @@ -57,7 +59,9 @@ use uuid::Uuid;

use super::error::convert_sqlx_err;
use super::migrator::run_migrations;
use super::model::{PgDeleteTask, PgIndex, PgIndexTemplate, PgShard, PgSplit, Splits};
use super::model::{
PgDeleteTask, PgIndex, PgIndexRoutingRule, PgIndexTemplate, PgShard, PgSplit, Splits,
};
use super::pool::TrackedPool;
use super::split_stream::SplitStream;
use super::utils::{append_query_filters_and_order_by, establish_connection};
Expand Down Expand Up @@ -1763,6 +1767,68 @@ impl MetastoreService for PostgresqlMetastore {
.await?;
Ok(GetClusterIdentityResponse { uuid })
}

async fn create_index_routing_table(
&self,
request: CreateIndexRoutingTableRequest,
) -> MetastoreResult<CreateIndexRoutingTableResponse> {
const INSERT_ROUTING_RULE_QUERY: &str = include_str!("queries/routing_rules/insert.sql");

// Validate request
if request.rules.is_empty() {
return Err(MetastoreError::InvalidArgument {
message: "routing table must contain at least one rule".to_string(),
});
}

// Generate routing_table_id
let routing_table_id = Uuid::new_v4().hyphenated().to_string();

// Use transaction for atomicity
run_with_tx!(self.connection_pool, tx, "create_routing_table", {
// Insert each rule with its array index as the rank
for (rank, rule) in request.rules.iter().enumerate() {
sqlx::query(INSERT_ROUTING_RULE_QUERY)
.bind(&routing_table_id)
.bind(rank as i32)
.bind(&rule.filter)
.bind(&rule.index_id)
.execute(tx.as_mut())
.await
.map_err(|e| convert_sqlx_err("routing_rules", e))?;
}

Ok(CreateIndexRoutingTableResponse { routing_table_id })
})
}

async fn get_index_routing_table(
&self,
request: GetIndexRoutingTableRequest,
) -> MetastoreResult<GetIndexRoutingTableResponse> {
const SELECT_ROUTING_RULES_QUERY: &str = include_str!("queries/routing_rules/select.sql");

let pg_rules: Vec<PgIndexRoutingRule> = sqlx::query_as(SELECT_ROUTING_RULES_QUERY)
.bind(&request.routing_table_id)
.fetch_all(&self.connection_pool)
.await?;

if pg_rules.is_empty() {
return Err(MetastoreError::NotFound(EntityKind::RoutingTable {
routing_table_id: request.routing_table_id.clone(),
}));
}

let rules = pg_rules
.into_iter()
.map(|pg_rule| IndexRoutingRule {
index_id: pg_rule.index_id,
filter: pg_rule.filter,
})
.collect();

Ok(GetIndexRoutingTableResponse { rules })
}
}

async fn open_or_fetch_shard<'e>(
Expand Down
8 changes: 8 additions & 0 deletions quickwit/quickwit-metastore/src/metastore/postgres/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,3 +292,11 @@ impl From<PgShard> for Shard {
pub(super) struct PgIndexTemplate {
pub index_template_json: String,
}

#[derive(sqlx::FromRow, Debug)]
pub(super) struct PgIndexRoutingRule {
pub routing_table_id: String,
pub rank: i32,
pub filter: String,
pub index_id: String,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
INSERT INTO index_routing_rules (routing_table_id, rank, filter, index_id)
VALUES ($1, $2, $3, $4)
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
SELECT routing_table_id, rank, filter, index_id
FROM index_routing_rules
WHERE routing_table_id = $1
ORDER BY rank ASC
38 changes: 31 additions & 7 deletions quickwit/quickwit-proto/protos/quickwit/metastore.proto
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,16 @@ service MetastoreService {

// Get cluster identity
rpc GetClusterIdentity(GetClusterIdentityRequest) returns (GetClusterIdentityResponse);
}

message EmptyResponse {
// Creates index routing table
rpc CreateIndexRoutingTable(CreateIndexRoutingTableRequest) returns (CreateIndexRoutingTableResponse);

// Get index routing table
rpc GetIndexRoutingTable(GetIndexRoutingTableRequest) returns (GetIndexRoutingTableResponse);
}

message EmptyResponse {}

message CreateIndexRequest {
string index_config_json = 2;
repeated string source_configs_json = 3;
Expand All @@ -229,7 +234,7 @@ message UpdateIndexRequest {
}

message ListIndexesMetadataRequest {
reserved 1;
reserved 1;
// List of patterns an index should match or not match to get considered
// An index must match at least one positive pattern (a pattern not starting
// with a '-'), and no negative pattern (a pattern starting with a '-').
Expand Down Expand Up @@ -550,8 +555,7 @@ message IndexTemplateMatch {
string index_template_json = 3;
}

message ListIndexTemplatesRequest {
}
message ListIndexTemplatesRequest {}

message ListIndexTemplatesResponse {
repeated string index_templates_json = 1;
Expand All @@ -561,9 +565,29 @@ message DeleteIndexTemplatesRequest {
repeated string template_ids = 1;
}

message GetClusterIdentityRequest {
}
message GetClusterIdentityRequest {}

message GetClusterIdentityResponse {
string uuid = 1;
}

message CreateIndexRoutingTableRequest {
repeated IndexRoutingRule rules = 1;
}

message CreateIndexRoutingTableResponse {
string routing_table_id = 1;
}

message GetIndexRoutingTableRequest {
string routing_table_id = 1;
}

message GetIndexRoutingTableResponse {
repeated IndexRoutingRule rules = 1;
}

message IndexRoutingRule {
string index_id = 1;
string filter = 2;
}
Loading