From e4e678df486080b0e03792d1a777111d064ed782 Mon Sep 17 00:00:00 2001
From: Shiyas Mohammed
Date: Thu, 5 Feb 2026 12:27:29 +0530
Subject: [PATCH 1/3] refactor(metadata-db): decouple physical tables from storage paths

---
 .gitignore                                    |   1 +
 crates/core/data-store/src/lib.rs             |  17 +-
 .../20260204061026_split_physical_tables.sql  | 141 ++++++++++
 crates/core/metadata-db/src/datasets/name.rs  |   2 +-
 .../metadata-db/src/datasets/namespace.rs     |   2 +-
 crates/core/metadata-db/src/jobs/sql.rs       |   5 +-
 crates/core/metadata-db/src/manifests/hash.rs |   2 +-
 crates/core/metadata-db/src/physical_table.rs |  46 ++-
 .../metadata-db/src/physical_table/sql.rs     | 208 ++++++++------
 .../src/physical_table/tests/it_crud.rs       | 261 ++++++++----------
 .../src/physical_table/tests/it_pagination.rs |  29 +-
 11 files changed, 465 insertions(+), 249 deletions(-)
 create mode 100644 crates/core/metadata-db/migrations/20260204061026_split_physical_tables.sql

diff --git a/.gitignore b/.gitignore
index bdfc809cf..1759c15d9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -19,3 +19,4 @@ build/
 .idea
 .vscode/
 .amp/
+.agents/settings.local.json
diff --git a/crates/core/data-store/src/lib.rs b/crates/core/data-store/src/lib.rs
index 765534fc1..d0fa19295 100644
--- a/crates/core/data-store/src/lib.rs
+++ b/crates/core/data-store/src/lib.rs
@@ -134,20 +134,27 @@ impl DataStore {
             dataset.hash(),
             table_name,
             path,
-            false,
         )
         .await
         .map_err(RegisterTableRevisionError::RegisterPhysicalTable)?;
 
-        metadata_db::physical_table::mark_inactive_by_table_id(&mut tx, dataset.hash(), table_name)
-            .await
-            .map_err(RegisterTableRevisionError::MarkInactive)?;
+        metadata_db::physical_table::mark_inactive_by_table_id(
+            &mut tx,
+            dataset.namespace(),
+            dataset.name(),
+            dataset.hash(),
+            table_name,
+        )
+        .await
+        .map_err(RegisterTableRevisionError::MarkInactive)?;
 
         metadata_db::physical_table::mark_active_by_id(
             &mut tx,
-            location_id,
+            dataset.namespace(),
+            dataset.name(),
             dataset.hash(),
             table_name,
+            location_id,
         )
         .await
         .map_err(RegisterTableRevisionError::MarkActive)?;
diff --git a/crates/core/metadata-db/migrations/20260204061026_split_physical_tables.sql b/crates/core/metadata-db/migrations/20260204061026_split_physical_tables.sql
new file mode 100644
index 000000000..4a0909d51
--- /dev/null
+++ b/crates/core/metadata-db/migrations/20260204061026_split_physical_tables.sql
@@ -0,0 +1,141 @@
+-- Migration: Split physical_tables into physical_tables (meta) and physical_table_revisions
+--
+-- This migration decouples table identity from storage paths, enabling:
+-- - Data sharing across datasets with same spec
+-- - External data import (link any directory to any table)
+--
+-- Changes:
+-- 1. Create physical_table_revisions table (preserves existing IDs for FK compatibility)
+-- 2. Update file_metadata and gc_manifest FKs to point to revisions
+-- 3. Transform physical_tables into a meta table with active_revision_id pointer
+-- 4. 
Populate relationships between tables + +-- ============================================================================= +-- STEP 1: Create physical_table_revisions table +-- ============================================================================= +-- Using GENERATED BY DEFAULT to allow explicit ID insertion during migration +-- This preserves location_id values that file_metadata and gc_manifest reference + +CREATE TABLE physical_table_revisions ( + id BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, + created_at TIMESTAMP NOT NULL DEFAULT (now() AT TIME ZONE 'utc'), + updated_at TIMESTAMP NOT NULL DEFAULT (now() AT TIME ZONE 'utc'), + path TEXT NOT NULL UNIQUE, + writer BIGINT REFERENCES jobs(id) ON DELETE SET NULL, + metadata JSONB NOT NULL DEFAULT ('{}'::jsonb) +); + +-- ============================================================================= +-- STEP 2: Copy revision data with existing IDs +-- ============================================================================= +-- Critical: This preserves the location_id values that file_metadata references + +INSERT INTO physical_table_revisions (id, created_at, updated_at, path, writer) +SELECT id, created_at, created_at AS updated_at, path, writer +FROM physical_tables; + +-- Reset the identity sequence to continue from max ID (only if table has rows) +DO $$ +BEGIN + IF EXISTS (SELECT 1 FROM physical_table_revisions LIMIT 1) THEN + PERFORM setval( + pg_get_serial_sequence('physical_table_revisions', 'id'), + (SELECT MAX(id) FROM physical_table_revisions) + ); + END IF; +END $$; + +-- ============================================================================= +-- STEP 3: Update file_metadata FK to point to physical_table_revisions +-- ============================================================================= + +ALTER TABLE file_metadata + DROP CONSTRAINT IF EXISTS file_metadata_location_id_fkey; + +ALTER TABLE file_metadata + ADD CONSTRAINT file_metadata_location_id_fkey + FOREIGN KEY (location_id) + REFERENCES physical_table_revisions(id) + ON DELETE CASCADE; + +-- ============================================================================= +-- STEP 4: Update gc_manifest FK to point to physical_table_revisions +-- ============================================================================= + +ALTER TABLE gc_manifest + DROP CONSTRAINT IF EXISTS gc_manifest_location_id_fkey; + +ALTER TABLE gc_manifest + ADD CONSTRAINT gc_manifest_location_id_fkey + FOREIGN KEY (location_id) + REFERENCES physical_table_revisions(id) + ON DELETE CASCADE; + +-- ============================================================================= +-- STEP 5: Rename old table and create new physical_tables (meta) +-- ============================================================================= + +ALTER TABLE physical_tables RENAME TO physical_tables_old; + +CREATE TABLE physical_tables ( + id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, + created_at TIMESTAMP NOT NULL DEFAULT (now() AT TIME ZONE 'utc'), + updated_at TIMESTAMP NOT NULL DEFAULT (now() AT TIME ZONE 'utc'), + dataset_namespace TEXT NOT NULL, + dataset_name TEXT NOT NULL, + table_name TEXT NOT NULL, + manifest_hash TEXT NOT NULL, + active_revision_id BIGINT REFERENCES physical_table_revisions(id) ON DELETE SET NULL +); + +-- ============================================================================= +-- STEP 6: Populate physical_tables (meta) from old data +-- ============================================================================= +-- Insert one row per unique 
(dataset_namespace, dataset_name, table_name, manifest_hash) combination +-- Set active_revision_id to the revision that was marked active (if any) + +INSERT INTO physical_tables ( + created_at, + updated_at, + dataset_namespace, + dataset_name, + table_name, + manifest_hash, + active_revision_id +) +SELECT + MIN(created_at) AS created_at, + MIN(created_at) AS updated_at, + dataset_namespace, + dataset_name, + table_name, + manifest_hash, + (SELECT id FROM physical_tables_old pt2 + WHERE pt2.manifest_hash = physical_tables_old.manifest_hash + AND pt2.table_name = physical_tables_old.table_name + AND pt2.active = true + LIMIT 1) AS active_revision_id +FROM physical_tables_old +GROUP BY dataset_namespace, dataset_name, table_name, manifest_hash; + +-- ============================================================================= +-- STEP 7: Create indexes +-- ============================================================================= + +-- Unique constraint: one physical_table per (dataset_namespace, dataset_name, manifest_hash, table_name) +CREATE UNIQUE INDEX unique_manifest_table +ON physical_tables (dataset_namespace, dataset_name, manifest_hash, table_name); + +-- Index for active_revision_id +CREATE INDEX idx_physical_tables_active_revision_id + ON physical_tables (active_revision_id) WHERE active_revision_id IS NOT NULL; + +-- ============================================================================= +-- STEP 8: Cleanup +-- ============================================================================= + +-- Drop the old partial unique index (no longer needed) +DROP INDEX IF EXISTS unique_active_per_manifest_table; + +-- Drop the old table +DROP TABLE physical_tables_old; diff --git a/crates/core/metadata-db/src/datasets/name.rs b/crates/core/metadata-db/src/datasets/name.rs index 0ce548631..2897efd67 100644 --- a/crates/core/metadata-db/src/datasets/name.rs +++ b/crates/core/metadata-db/src/datasets/name.rs @@ -35,7 +35,7 @@ pub type NameOwned = Name<'static>; /// /// The type trusts that values are already validated. Validation must occur at system /// boundaries before conversion into this type. -#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +#[derive(Clone, serde::Serialize, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct Name<'a>(Cow<'a, str>); impl<'a> Name<'a> { diff --git a/crates/core/metadata-db/src/datasets/namespace.rs b/crates/core/metadata-db/src/datasets/namespace.rs index cbd25b6e3..c993f725f 100644 --- a/crates/core/metadata-db/src/datasets/namespace.rs +++ b/crates/core/metadata-db/src/datasets/namespace.rs @@ -35,7 +35,7 @@ pub type NamespaceOwned = Namespace<'static>; /// /// The type trusts that values are already validated. Validation must occur at system /// boundaries before conversion into this type. 
-#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
+#[derive(Clone, serde::Serialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
 pub struct Namespace<'a>(Cow<'a, str>);
 
 impl<'a> Namespace<'a> {
diff --git a/crates/core/metadata-db/src/jobs/sql.rs b/crates/core/metadata-db/src/jobs/sql.rs
index 8110399ca..8074a4c89 100644
--- a/crates/core/metadata-db/src/jobs/sql.rs
+++ b/crates/core/metadata-db/src/jobs/sql.rs
@@ -191,8 +191,9 @@
             j.created_at,
             j.updated_at
         FROM jobs j
-        INNER JOIN physical_tables l ON j.id = l.writer
-        WHERE l.manifest_hash = $1
+        INNER JOIN physical_table_revisions ptr ON j.id = ptr.writer
+        INNER JOIN physical_tables pt ON pt.active_revision_id = ptr.id
+        WHERE pt.manifest_hash = $1
         ORDER BY j.id ASC
     "#};
     let res = sqlx::query_as(query)
diff --git a/crates/core/metadata-db/src/manifests/hash.rs b/crates/core/metadata-db/src/manifests/hash.rs
index 807316156..d0ba543d1 100644
--- a/crates/core/metadata-db/src/manifests/hash.rs
+++ b/crates/core/metadata-db/src/manifests/hash.rs
@@ -34,7 +34,7 @@ pub type HashOwned = Hash<'static>;
 ///
 /// The type trusts that values are already validated. Validation must occur at system
 /// boundaries before conversion into this type.
-#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
+#[derive(Clone, serde::Serialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
 pub struct Hash<'a>(Cow<'a, str>);
 
 impl<'a> Hash<'a> {
diff --git a/crates/core/metadata-db/src/physical_table.rs b/crates/core/metadata-db/src/physical_table.rs
index 472cb6335..1d71de1f5 100644
--- a/crates/core/metadata-db/src/physical_table.rs
+++ b/crates/core/metadata-db/src/physical_table.rs
@@ -30,16 +30,20 @@ use crate::{
 #[tracing::instrument(skip(exe), err)]
 pub async fn register<'c, E>(
     exe: E,
-    dataset_namespace: impl Into<DatasetNamespace<'_>> + std::fmt::Debug,
-    dataset_name: impl Into<DatasetName<'_>> + std::fmt::Debug,
-    manifest_hash: impl Into<ManifestHash<'_>> + std::fmt::Debug,
+    dataset_namespace: impl Into<DatasetNamespace<'_>> + std::fmt::Debug + serde::Serialize,
+    dataset_name: impl Into<DatasetName<'_>> + std::fmt::Debug + serde::Serialize,
+    manifest_hash: impl Into<ManifestHash<'_>> + std::fmt::Debug + serde::Serialize,
     table_name: impl Into<TableName<'_>> + std::fmt::Debug,
     path: impl Into<TablePath<'_>> + std::fmt::Debug,
-    active: bool,
 ) -> Result<LocationId, Error>
 where
     E: Executor<'c>,
 {
+    let metadata = serde_json::json!({
+        "dataset_namespace": dataset_namespace,
+        "dataset_name": dataset_name,
+        "manifest_hash": manifest_hash,
+    });
     sql::insert(
         exe,
         dataset_namespace.into(),
@@ -47,13 +51,13 @@
         manifest_hash.into(),
         table_name.into(),
         path.into(),
-        active,
+        metadata,
     )
     .await
     .map_err(Error::Database)
 }
 
-/// Get a physical table location by its ID
+/// Get a physical table location by its ID (Only returns active revision)
 #[tracing::instrument(skip(exe), err)]
 pub async fn get_by_id<'c, E>(
     exe: E,
@@ -128,15 +132,23 @@
 #[tracing::instrument(skip(exe), err)]
 pub async fn mark_inactive_by_table_id<'c, E>(
     exe: E,
+    dataset_namespace: impl Into<DatasetNamespace<'_>> + std::fmt::Debug,
+    dataset_name: impl Into<DatasetName<'_>> + std::fmt::Debug,
     manifest_hash: impl Into<ManifestHash<'_>> + std::fmt::Debug,
     table_name: impl Into<TableName<'_>> + std::fmt::Debug,
 ) -> Result<(), Error>
 where
     E: Executor<'c>,
 {
-    sql::mark_inactive_by_table_id(exe, manifest_hash.into(), table_name.into())
-        .await
-        .map_err(Error::Database)
+    sql::mark_inactive_by_table_id(
+        exe,
+        dataset_namespace.into(),
+        dataset_name.into(),
+        manifest_hash.into(),
+        table_name.into(),
+    )
+    .await
+    .map_err(Error::Database)
 }
 
 /// Mark a specific location as active
@@ -151,15 +163,19 @@
 #[tracing::instrument(skip(exe), err)]
 pub async fn mark_active_by_id<'c, E>(
     exe: E,
-    location_id: impl Into<LocationId> + std::fmt::Debug,
+    dataset_namespace: impl Into<DatasetNamespace<'_>> + std::fmt::Debug,
+    dataset_name: impl Into<DatasetName<'_>> + std::fmt::Debug,
     manifest_hash: impl Into<ManifestHash<'_>> + std::fmt::Debug,
     table_name: impl Into<TableName<'_>> + std::fmt::Debug,
+    location_id: impl Into<LocationId> + std::fmt::Debug,
 ) -> Result<(), Error>
 where
     E: Executor<'c>,
 {
     sql::mark_active_by_id(
         exe,
+        dataset_namespace.into(),
+        dataset_name.into(),
         manifest_hash.into(),
         table_name.into(),
         location_id.into(),
@@ -258,10 +274,14 @@
     .map_err(|err| Error::Database(err.0))
 }
 
-/// Basic location information from the database
+/// Physical table revision information from the database
+///
+/// Represents a specific revision (storage location) of a physical table.
+/// The `active` field indicates whether this revision is the currently active
+/// one for its parent physical table.
 #[derive(Debug, Clone, sqlx::FromRow)]
 pub struct PhysicalTable {
-    /// Unique identifier for the location
+    /// Unique identifier for the revision (location_id)
    pub id: LocationId,
     /// Manifest hash identifying the dataset version
     pub manifest_hash: ManifestHashOwned,
@@ -274,7 +294,7 @@
     pub table_name: TableNameOwned,
     /// Relative path to the storage location
     pub path: TablePathOwned,
-    /// Whether this location is currently active for queries
+    /// Whether this revision is currently active for queries
     pub active: bool,
     /// Writer job ID (if one exists)
     pub writer: Option<JobId>,
diff --git a/crates/core/metadata-db/src/physical_table/sql.rs b/crates/core/metadata-db/src/physical_table/sql.rs
index d556b8c64..de09b5c0e 100644
--- a/crates/core/metadata-db/src/physical_table/sql.rs
+++ b/crates/core/metadata-db/src/physical_table/sql.rs
@@ -20,7 +20,11 @@ use crate::{
     workers::WorkerNodeIdOwned,
 };
 
-/// Insert a physical table location into the database and return its ID (idempotent operation)
+/// Insert a physical table revision into the database and return its ID (idempotent operation)
+///
+/// This operation:
+/// 1. Upserts the physical_table (meta) record
+/// 2. Inserts the revision record
 pub async fn insert<'c, E>(
     exe: E,
     dataset_namespace: DatasetNamespace<'_>,
@@ -28,16 +32,23 @@
     manifest_hash: ManifestHash<'_>,
     table_name: Name<'_>,
     path: Path<'_>,
-    active: bool,
+    metadata: JsonValue,
 ) -> Result<LocationId, sqlx::Error>
 where
     E: Executor<'c, Database = Postgres>,
 {
     // Upsert with RETURNING id - the no-op update ensures RETURNING works for both insert and conflict cases
     let query = indoc::indoc! {"
-        INSERT INTO physical_tables(manifest_hash, table_name, dataset_namespace, dataset_name, path, active)
-        VALUES ($1, $2, $3, $4, $5, $6)
-        ON CONFLICT (path) DO UPDATE SET manifest_hash = EXCLUDED.manifest_hash
+        WITH pt AS (
+            INSERT INTO physical_tables (manifest_hash, table_name, dataset_namespace, dataset_name)
+            VALUES ($1, $2, $3, $4)
+            ON CONFLICT (dataset_namespace, dataset_name, manifest_hash, table_name) DO UPDATE SET updated_at = now()
+            RETURNING id
+        )
+        INSERT INTO physical_table_revisions (path, metadata)
+        SELECT $5, $6
+        FROM pt
+        ON CONFLICT (path) DO UPDATE SET updated_at = now()
         RETURNING id
     "};
 
@@ -47,28 +58,41 @@
     .bind(dataset_namespace)
     .bind(dataset_name)
     .bind(path)
-    .bind(active)
+    .bind(metadata)
     .fetch_one(exe)
     .await?;
 
     Ok(id)
 }
 
-/// Get a location by its ID
+/// Get a revision by its ID (Only returns active revision)
 pub async fn get_by_id<'c, E>(exe: E, id: LocationId) -> Result<Option<PhysicalTable>, sqlx::Error>
 where
     E: Executor<'c, Database = Postgres>,
 {
-    let query = "SELECT * FROM physical_tables WHERE id = $1";
+    let query = indoc::indoc! {"
+        SELECT
+            ptr.id,
+            pt.manifest_hash,
+            pt.dataset_namespace,
+            pt.dataset_name,
+            pt.table_name,
+            ptr.path,
+            (pt.active_revision_id = ptr.id) AS active,
+            ptr.writer
+        FROM physical_table_revisions ptr
+        JOIN physical_tables pt ON pt.active_revision_id = ptr.id
+        WHERE ptr.id = $1
+    "};
 
     sqlx::query_as(query).bind(id).fetch_optional(exe).await
 }
 
-/// Get location ID by path only, returns first match if multiple exist
+/// Get revision ID by path only, returns first match if multiple exist
 pub async fn path_to_id<'c, E>(exe: E, path: Path<'_>) -> Result<Option<LocationId>, sqlx::Error>
 where
     E: Executor<'c, Database = Postgres>,
 {
-    let query = "SELECT id FROM physical_tables WHERE path = $1 LIMIT 1";
+    let query = "SELECT id FROM physical_table_revisions WHERE path = $1 LIMIT 1";
 
     let id: Option<LocationId> = sqlx::query_scalar(query)
         .bind(path)
@@ -77,7 +101,7 @@
     Ok(id)
 }
 
-/// Get a location by its ID with full writer job details
+/// Get a revision by its ID with full writer job details (Only returns active revision)
 pub async fn get_by_id_with_details<'c, E>(
     exe: E,
     id: LocationId,
@@ -87,14 +111,14 @@
 {
     let query = indoc::indoc! {"
         SELECT
-            -- Location fields
-            l.id,
-            l.manifest_hash,
-            l.table_name,
-            l.path,
-            l.active,
-            l.dataset_namespace,
-            l.dataset_name,
+            -- Revision fields
+            ptr.id,
+            pt.manifest_hash,
+            pt.table_name,
+            ptr.path,
+            (pt.active_revision_id = ptr.id) AS active,
+            pt.dataset_namespace,
+            pt.dataset_name,
 
             -- Writer job fields (optional)
             j.id AS writer_job_id,
@@ -103,9 +127,10 @@
             j.descriptor AS writer_job_descriptor,
             j.created_at AS writer_job_created_at,
             j.updated_at AS writer_job_updated_at
-        FROM physical_tables l
-        LEFT JOIN jobs j ON l.writer = j.id
-        WHERE l.id = $1
+        FROM physical_table_revisions ptr
+        JOIN physical_tables pt ON pt.active_revision_id = ptr.id
+        LEFT JOIN jobs j ON ptr.writer = j.id
+        WHERE ptr.id = $1
     "};
 
     // Internal row structure to match the query result
@@ -170,7 +195,7 @@
     Ok(Some(LocationWithDetails { table, writer }))
 }
 
-/// Get the active physical table for a table
+/// Get the active revision for a physical table
 pub async fn get_active_physical_table<'c, E>(
     exe: E,
     manifest_hash: ManifestHash<'_>,
@@ -180,23 +205,32 @@
 where
     E: Executor<'c, Database = Postgres>,
 {
     let query = indoc::indoc! {"
-        SELECT *
-        FROM physical_tables
-        WHERE manifest_hash = $1 AND table_name = $2 AND active
+        SELECT
+            ptr.id,
+            pt.manifest_hash,
+            pt.dataset_namespace,
+            pt.dataset_name,
+            pt.table_name,
+            ptr.path,
+            true AS active,
+            ptr.writer
+        FROM physical_tables pt
+        JOIN physical_table_revisions ptr ON ptr.id = pt.active_revision_id
+        WHERE pt.manifest_hash = $1 AND pt.table_name = $2
     "};
 
-    let table = sqlx::query_as(query)
+    sqlx::query_as(query)
         .bind(manifest_hash)
         .bind(table_name)
         .fetch_optional(exe)
-        .await?;
-
-    Ok(table)
+        .await
 }
 
-/// Deactivate all active locations for a specific table
+/// Deactivate all revisions for a specific table (set active_revision_id to NULL)
 pub async fn mark_inactive_by_table_id<'c, E>(
     exe: E,
+    dataset_namespace: DatasetNamespace<'_>,
+    dataset_name: DatasetName<'_>,
     manifest_hash: ManifestHash<'_>,
     table_name: Name<'_>,
 ) -> Result<(), sqlx::Error>
@@ -205,11 +239,13 @@
 {
     let query = indoc::indoc! {"
         UPDATE physical_tables
-        SET active = false
-        WHERE manifest_hash = $1 AND table_name = $2 AND active
+        SET active_revision_id = NULL, updated_at = now()
+        WHERE dataset_namespace = $1 AND dataset_name = $2 AND manifest_hash = $3 AND table_name = $4
     "};
 
     sqlx::query(query)
+        .bind(dataset_namespace)
+        .bind(dataset_name)
         .bind(manifest_hash)
         .bind(table_name)
         .execute(exe)
@@ -220,6 +256,8 @@
 /// Activate a specific location by ID (does not deactivate others)
 pub async fn mark_active_by_id<'c, E>(
     exe: E,
+    dataset_namespace: DatasetNamespace<'_>,
+    dataset_name: DatasetName<'_>,
     manifest_hash: ManifestHash<'_>,
     table_name: Name<'_>,
     location_id: LocationId,
@@ -229,12 +267,14 @@
 {
     let query = indoc::indoc! {"
         UPDATE physical_tables
-        SET active = true
-        WHERE id = $1 AND manifest_hash = $2 AND table_name = $3
+        SET active_revision_id = $1, updated_at = now()
+        WHERE dataset_namespace = $2 AND dataset_name = $3 AND manifest_hash = $4 AND table_name = $5
     "};
 
     sqlx::query(query)
         .bind(location_id)
+        .bind(dataset_namespace)
+        .bind(dataset_name)
         .bind(manifest_hash)
         .bind(table_name)
         .execute(exe)
@@ -242,7 +282,7 @@
     Ok(())
 }
 
-/// Assign a job as the writer for multiple locations
+/// Assign a job as the writer for multiple revisions
 pub async fn assign_job_writer<'c, E>(
     exe: E,
     locations: &[LocationId],
@@ -252,8 +292,8 @@
 where
     E: Executor<'c, Database = Postgres>,
 {
     let query = indoc::indoc! {"
-        UPDATE physical_tables
-        SET writer = $1
+        UPDATE physical_table_revisions
+        SET writer = $1, updated_at = now()
         WHERE id = ANY($2)
     "};
 
@@ -265,16 +305,16 @@
     Ok(())
 }
 
-/// Delete a location by its ID
+/// Delete a revision by its ID
 ///
 /// This will also delete all associated file_metadata entries due to CASCADE.
-/// Returns true if the location was deleted, false if it didn't exist.
+/// Returns true if the revision was deleted, false if it didn't exist.
 pub async fn delete_by_id<'c, E>(exe: E, id: LocationId) -> Result<bool, sqlx::Error>
 where
     E: Executor<'c, Database = Postgres>,
 {
     let query = indoc::indoc! {"
-        DELETE FROM physical_tables
+        DELETE FROM physical_table_revisions
         WHERE id = $1
     "};
 
     let result = sqlx::query(query).bind(id).execute(exe).await?;
 
     Ok(result.rows_affected() > 0)
 }
 
-/// List the first page of physical tables
+/// List the first page of active physical table revisions
 ///
-/// Returns a paginated list of physical tables ordered by ID in descending order (newest first).
+/// Returns a paginated list of revisions ordered by ID in descending order (newest first).
 /// This function is used to fetch the initial page when no cursor is available.
 pub async fn list_first_page<'c, E>(exe: E, limit: i64) -> Result<Vec<PhysicalTable>, sqlx::Error>
 where
@@ -293,28 +333,28 @@
 {
     let query = indoc::indoc! {r#"
         SELECT
-            id,
-            manifest_hash,
-            dataset_namespace,
-            dataset_name,
-            table_name,
-            path,
-            active,
-            writer
-        FROM physical_tables
-        ORDER BY id DESC
+            ptr.id,
+            pt.manifest_hash,
+            pt.dataset_namespace,
+            pt.dataset_name,
+            pt.table_name,
+            ptr.path,
+            (pt.active_revision_id = ptr.id) AS active,
+            ptr.writer
+        FROM physical_table_revisions ptr
+        JOIN physical_tables pt ON pt.active_revision_id = ptr.id
+        ORDER BY ptr.id DESC
         LIMIT $1
     "#};
 
-    let res = sqlx::query_as(query).bind(limit).fetch_all(exe).await?;
-    Ok(res)
+    sqlx::query_as(query).bind(limit).fetch_all(exe).await
 }
 
-/// List subsequent pages of physical tables using cursor-based pagination
+/// List subsequent pages of active physical table revisions using cursor-based pagination
 ///
-/// Returns a paginated list of physical tables with IDs less than the provided cursor,
+/// Returns a paginated list of revisions with IDs less than the provided cursor,
 /// ordered by ID in descending order (newest first). This implements cursor-based
-/// pagination for efficient traversal of large physical table lists.
+/// pagination for efficient traversal of large revision lists.
 pub async fn list_next_page<'c, E>(
     exe: E,
     limit: i64,
@@ -325,26 +365,26 @@
 {
     let query = indoc::indoc! {r#"
         SELECT
-            id,
-            manifest_hash,
-            dataset_namespace,
-            dataset_name,
-            table_name,
-            path,
-            active,
-            writer
-        FROM physical_tables
-        WHERE id < $2
-        ORDER BY id DESC
+            ptr.id,
+            pt.manifest_hash,
+            pt.dataset_namespace,
+            pt.dataset_name,
+            pt.table_name,
+            ptr.path,
+            (pt.active_revision_id = ptr.id) AS active,
+            ptr.writer
+        FROM physical_table_revisions ptr
+        JOIN physical_tables pt ON pt.active_revision_id = ptr.id
+        WHERE ptr.id < $2
+        ORDER BY ptr.id DESC
         LIMIT $1
     "#};
 
-    let res = sqlx::query_as(query)
+    sqlx::query_as(query)
         .bind(limit)
         .bind(last_id)
         .fetch_all(exe)
-        .await?;
-    Ok(res)
+        .await
 }
 
 /// Query active tables and their writer info for a dataset
@@ -358,11 +398,12 @@
 {
     let query = indoc::indoc! {r#"
         SELECT
             pt.table_name,
-            pt.writer AS job_id,
+            ptr.writer AS job_id,
             j.status AS job_status
         FROM physical_tables pt
-        LEFT JOIN jobs j ON pt.writer = j.id
-        WHERE pt.manifest_hash = $1 AND pt.active = true
+        JOIN physical_table_revisions ptr ON ptr.id = pt.active_revision_id
+        LEFT JOIN jobs j ON ptr.writer = j.id
+        WHERE pt.manifest_hash = $1
         ORDER BY pt.table_name
     "#};
 
     sqlx::query_as(query)
         .bind(manifest_hash)
         .fetch_all(exe)
         .await
 }
 
-/// Query tables associated with a specific writer
+/// Query tables associated with a specific writer (Only returns active revision)
 pub async fn get_tables_by_writer<'c, E>(
     exe: E,
     writer_id: JobId,
@@ -382,13 +423,14 @@
 {
     let query = indoc::indoc! 
{r#" SELECT - table_name, - manifest_hash, - dataset_namespace, - dataset_name - FROM physical_tables - WHERE writer = $1 AND active = true - ORDER BY table_name + pt.table_name, + pt.manifest_hash, + pt.dataset_namespace, + pt.dataset_name + FROM physical_table_revisions ptr + JOIN physical_tables pt ON pt.active_revision_id = ptr.id + WHERE ptr.writer = $1 + ORDER BY pt.table_name "#}; sqlx::query_as(query).bind(writer_id).fetch_all(exe).await diff --git a/crates/core/metadata-db/src/physical_table/tests/it_crud.rs b/crates/core/metadata-db/src/physical_table/tests/it_crud.rs index fe2264c1f..dcef31f11 100644 --- a/crates/core/metadata-db/src/physical_table/tests/it_crud.rs +++ b/crates/core/metadata-db/src/physical_table/tests/it_crud.rs @@ -31,13 +31,15 @@ async fn insert_creates_location_and_returns_id() { let path = TablePath::from_ref_unchecked( "test-dataset/test_table/01234567-89ab-cdef-0123-456789abcdef", ); - let active = true; //* When let location_id = - physical_table::register(&mut conn, namespace, name, hash, table_name, path, active) + physical_table::register(&mut conn, &namespace, &name, &hash, &table_name, &path) .await .expect("Failed to insert location"); + physical_table::mark_active_by_id(&mut conn, namespace, name, hash, table_name, location_id) + .await + .expect("Failed to mark location active"); //* Then assert!( @@ -90,23 +92,15 @@ async fn insert_on_conflict_returns_existing_id() { let path = TablePath::from_ref_unchecked("test-dataset/test_table/unique-revision-id"); // Insert first location - let first_id = physical_table::register( - &mut conn, - &namespace, - &name, - &hash, - &table_name, - &path, - true, - ) - .await - .expect("Failed to insert first location"); + let first_id = + physical_table::register(&mut conn, &namespace, &name, &hash, &table_name, &path) + .await + .expect("Failed to insert first location"); //* When - Try to insert with same path but different data - let second_id = - physical_table::register(&mut conn, namespace, name, hash, table_name, path, false) - .await - .expect("Failed to insert second location"); + let second_id = physical_table::register(&mut conn, namespace, name, hash, table_name, path) + .await + .expect("Failed to insert second location"); //* Then - Should return the same ID due to conflict resolution assert_eq!( @@ -135,10 +129,9 @@ async fn path_to_location_id_finds_existing_location() { let table_name = TableName::from_ref_unchecked("test_table"); let path = TablePath::from_ref_unchecked("test-dataset/test_table/find-me-revision"); - let expected_id = - physical_table::register(&mut conn, namespace, name, hash, table_name, &path, false) - .await - .expect("Failed to insert location"); + let expected_id = physical_table::register(&mut conn, namespace, name, hash, table_name, &path) + .await + .expect("Failed to insert location"); //* When let found_id = physical_table::path_to_id(&mut conn, path) @@ -201,59 +194,61 @@ async fn get_active_by_table_id_filters_by_table_and_active_status() { // Create active location for target table let path1 = TablePath::from_ref_unchecked("test-dataset/test_table/active1-revision"); - let active_id1 = physical_table::register( - &mut conn, - &namespace, - &name, - &hash, - &table_name, - &path1, - true, - ) - .await - .expect("Failed to insert active location 1"); + let active_id1 = + physical_table::register(&mut conn, &namespace, &name, &hash, &table_name, &path1) + .await + .expect("Failed to insert active location 1"); + + physical_table::mark_active_by_id(&mut conn, 
&namespace, &name, &hash, &table_name, active_id1) + .await + .expect("Failed to mark location active"); // Create another active location for different table (still should be returned) let path2 = TablePath::from_ref_unchecked("test-dataset/test_table2/active2-revision"); - let active_id2 = physical_table::register( + let active_id2 = + physical_table::register(&mut conn, &namespace, &name, &hash, &table2_name, &path2) + .await + .expect("Failed to insert active location 2"); + + physical_table::mark_active_by_id( &mut conn, &namespace, &name, &hash, &table2_name, - &path2, - true, + active_id2, ) .await - .expect("Failed to insert active location 2"); + .expect("Failed to mark location active"); // Create inactive location for target table (should be filtered out) let path3 = TablePath::from_ref_unchecked("test-dataset/test_table/inactive-revision"); - physical_table::register( + physical_table::register(&mut conn, &namespace, &name, &hash, &table_name, path3) + .await + .expect("Failed to insert inactive location"); + + // Create active location for different table (should be filtered out) + let path4 = TablePath::from_ref_unchecked("test-dataset/other_table/other-revision"); + let active_id3 = physical_table::register( &mut conn, &namespace, &name, &hash, - &table_name, - path3, - false, + &other_table_name, + path4, ) .await - .expect("Failed to insert inactive location"); - - // Create active location for different table (should be filtered out) - let path4 = TablePath::from_ref_unchecked("test-dataset/other_table/other-revision"); - physical_table::register( + .expect("Failed to insert location for other table"); + physical_table::mark_active_by_id( &mut conn, &namespace, &name, &hash, &other_table_name, - path4, - true, + active_id3, ) .await - .expect("Failed to insert location for other table"); + .expect("Failed to mark location active"); //* When - Get locations for first table let active_location1 = physical_table::get_active(&mut conn, &hash, &table_name) @@ -320,45 +315,37 @@ async fn mark_inactive_by_table_id_deactivates_only_matching_active_locations() // Create active location for first target table let path1 = TablePath::from_ref_unchecked("test-dataset/test_table/target1-revision"); - let target_id1 = physical_table::register( - &mut conn, - &namespace, - &name, - &hash, - &table_name, - path1, - true, - ) - .await - .expect("Failed to insert target location 1"); + let target_id1 = + physical_table::register(&mut conn, &namespace, &name, &hash, &table_name, path1) + .await + .expect("Failed to insert target location 1"); + physical_table::mark_active_by_id(&mut conn, &namespace, &name, &hash, &table_name, target_id1) + .await + .expect("Failed to mark location active"); // Create active location for second target table let path2 = TablePath::from_ref_unchecked("test-dataset/test_table2/target2-revision"); - let target_id2 = physical_table::register( + let target_id2 = + physical_table::register(&mut conn, &namespace, &name, &hash, &table2_name, path2) + .await + .expect("Failed to insert target location 2"); + physical_table::mark_active_by_id( &mut conn, &namespace, &name, &hash, &table2_name, - path2, - true, + target_id2, ) .await - .expect("Failed to insert target location 2"); + .expect("Failed to mark location active"); // Create already inactive location for target table (should remain unchanged) let path3 = TablePath::from_ref_unchecked("test-dataset/test_table/already-inactive-revision"); - let inactive_id = physical_table::register( - &mut conn, - &namespace, 
- &name, - &hash, - &table_name, - path3, - false, - ) - .await - .expect("Failed to insert inactive location"); + let inactive_id = + physical_table::register(&mut conn, &namespace, &name, &hash, &table_name, path3) + .await + .expect("Failed to insert inactive location"); // Create active location for different table (should remain unchanged) let path4 = TablePath::from_ref_unchecked("test-dataset/other_table/other-revision"); @@ -369,13 +356,22 @@ async fn mark_inactive_by_table_id_deactivates_only_matching_active_locations() &hash, &other_table_name, path4, - true, ) .await .expect("Failed to insert other table location"); + physical_table::mark_active_by_id( + &mut conn, + &namespace, + &name, + &hash, + &other_table_name, + other_id, + ) + .await + .expect("Failed to mark other location active"); //* When - Mark only the first table inactive - physical_table::mark_inactive_by_table_id(&mut conn, &hash, &table_name) + physical_table::mark_inactive_by_table_id(&mut conn, &namespace, &name, &hash, &table_name) .await .expect("Failed to mark locations inactive"); @@ -419,6 +415,7 @@ async fn mark_active_by_id_activates_specific_location() { let mut conn = Connection::connect_with_retry(&temp_db.connection_uri()) .await .expect("Failed to connect to metadata db"); + conn.run_migrations() .await .expect("Failed to run migrations"); @@ -432,33 +429,19 @@ async fn mark_active_by_id_activates_specific_location() { let table_name = TableName::from_ref_unchecked("test_table"); let path1 = TablePath::from_ref_unchecked("test-dataset/test_table/to-activate-revision"); - let target_id = physical_table::register( - &mut conn, - &namespace, - &name, - &hash, - &table_name, - path1, - false, - ) - .await - .expect("Failed to insert location to activate"); + let target_id = + physical_table::register(&mut conn, &namespace, &name, &hash, &table_name, path1) + .await + .expect("Failed to insert location to activate"); let path2 = TablePath::from_ref_unchecked("test-dataset/test_table/stay-inactive-revision"); - let other_id = physical_table::register( - &mut conn, - &namespace, - &name, - &hash, - &table_name, - path2, - false, - ) - .await - .expect("Failed to insert other location"); + let other_id = + physical_table::register(&mut conn, &namespace, &name, &hash, &table_name, path2) + .await + .expect("Failed to insert other location"); //* When - physical_table::mark_active_by_id(&mut conn, target_id, &hash, &table_name) + physical_table::mark_active_by_id(&mut conn, &namespace, &name, &hash, &table_name, target_id) .await .expect("Failed to mark location active"); @@ -466,6 +449,7 @@ async fn mark_active_by_id_activates_specific_location() { let target_active = is_location_active(&mut conn, target_id) .await .expect("Failed to check target location active status"); + let other_still_inactive = is_location_active(&mut conn, other_id) .await .expect("Failed to check other location active status"); @@ -515,48 +499,27 @@ async fn assign_job_writer_assigns_job_to_multiple_locations() { // Create locations to assign let path1 = TablePath::from_ref_unchecked("test-dataset/output_table/assign1-revision"); - let location_id1 = physical_table::register( - &mut conn, - &namespace, - &name, - &hash, - &table_name, - path1, - false, - ) - .await - .expect("Failed to insert location 1"); + let location_id1 = + physical_table::register(&mut conn, &namespace, &name, &hash, &table_name, path1) + .await + .expect("Failed to insert location 1"); let path2 = 
TablePath::from_ref_unchecked("test-dataset/output_table/assign2-revision"); - let location_id2 = physical_table::register( - &mut conn, - &namespace, - &name, - &hash, - &table_name, - path2, - false, - ) - .await - .expect("Failed to insert location 2"); + let location_id2 = + physical_table::register(&mut conn, &namespace, &name, &hash, &table_name, path2) + .await + .expect("Failed to insert location 2"); let path3 = TablePath::from_ref_unchecked("test-dataset/output_table/assign3-revision"); - let location_id3 = physical_table::register( - &mut conn, - &namespace, - &name, - &hash, - &table_name, - path3, - false, - ) - .await - .expect("Failed to insert location 3"); + let location_id3 = + physical_table::register(&mut conn, &namespace, &name, &hash, &table_name, path3) + .await + .expect("Failed to insert location 3"); // Create a location that should not be assigned let path4 = TablePath::from_ref_unchecked("test-dataset/output_table/not-assigned-revision"); let unassigned_id = - physical_table::register(&mut conn, namespace, name, hash, &table_name, path4, false) + physical_table::register(&mut conn, namespace, name, hash, &table_name, path4) .await .expect("Failed to insert unassigned location"); @@ -625,9 +588,19 @@ async fn get_by_id_returns_existing_location() { let path = TablePath::from_ref_unchecked("test-dataset/test_table/get-by-id-revision"); let inserted_id = - physical_table::register(&mut conn, namespace, name, hash, &table_name, &path, true) + physical_table::register(&mut conn, &namespace, &name, &hash, &table_name, &path) .await .expect("Failed to insert location"); + physical_table::mark_active_by_id( + &mut conn, + &namespace, + &name, + &hash, + &table_name, + inserted_id, + ) + .await + .expect("Failed to mark location active"); //* When let location = physical_table::get_by_id_with_details(&mut conn, inserted_id) @@ -691,7 +664,7 @@ async fn get_by_id_returns_none_for_nonexistent_location() { // Helper functions for tests -/// Helper function to fetch location details by ID +/// Helper function to fetch location details by ID (Only checks active revision) async fn get_location_by_id<'c, E>( exe: E, location_id: LocationId, @@ -699,8 +672,13 @@ async fn get_location_by_id<'c, E>( where E: sqlx::Executor<'c, Database = sqlx::Postgres>, { - let query = - "SELECT id, manifest_hash, table_name, path, active FROM physical_tables WHERE id = $1"; + let query = indoc::indoc! {" + SELECT ptr.id, pt.manifest_hash, pt.table_name, ptr.path, + (pt.active_revision_id = ptr.id) AS active + FROM physical_table_revisions ptr + JOIN physical_tables pt ON pt.active_revision_id = ptr.id + WHERE ptr.id = $1 + "}; sqlx::query_as(query).bind(location_id).fetch_one(exe).await } @@ -709,7 +687,12 @@ async fn is_location_active<'c, E>(exe: E, location_id: LocationId) -> Result, { - let query = "SELECT active FROM physical_tables WHERE id = $1"; + let query = indoc::indoc! 
{" + SELECT COALESCE(pt.active_revision_id = ptr.id, false) AS activ + FROM physical_table_revisions ptr + LEFT JOIN physical_tables pt ON pt.active_revision_id = ptr.id + WHERE ptr.id = $1 + "}; sqlx::query_scalar(query) .bind(location_id) .fetch_one(exe) @@ -724,7 +707,7 @@ async fn get_writer_by_location_id<'c, E>( where E: sqlx::Executor<'c, Database = sqlx::Postgres>, { - let query = "SELECT writer FROM physical_tables WHERE id = $1"; + let query = "SELECT writer FROM physical_table_revisions WHERE id = $1"; sqlx::query_scalar(query) .bind(location_id) .fetch_one(exe) diff --git a/crates/core/metadata-db/src/physical_table/tests/it_pagination.rs b/crates/core/metadata-db/src/physical_table/tests/it_pagination.rs index 0970beb3f..7b53c1bbf 100644 --- a/crates/core/metadata-db/src/physical_table/tests/it_pagination.rs +++ b/crates/core/metadata-db/src/physical_table/tests/it_pagination.rs @@ -57,9 +57,20 @@ async fn list_locations_first_page_respects_limit() { "test-dataset/test_table_{}/revision-{}", i, i )); - physical_table::register(&mut conn, &namespace, &name, &hash, &table_name, path, true) - .await - .expect("Failed to insert location"); + let location_id = + physical_table::register(&mut conn, &namespace, &name, &hash, &table_name, path) + .await + .expect("Failed to insert location"); + physical_table::mark_active_by_id( + &mut conn, + &namespace, + &name, + &hash, + &table_name, + location_id, + ) + .await + .expect("Failed to mark location active"); // Small delay to ensure different timestamps tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; @@ -122,10 +133,20 @@ async fn list_locations_next_page_uses_cursor() { i, i )); let location_id = - physical_table::register(&mut conn, &namespace, &name, &hash, &table_name, path, true) + physical_table::register(&mut conn, &namespace, &name, &hash, &table_name, path) .await .expect("Failed to insert location"); all_location_ids.push(location_id); + physical_table::mark_active_by_id( + &mut conn, + &namespace, + &name, + &hash, + &table_name, + location_id, + ) + .await + .expect("Failed to mark location active"); // Small delay to ensure different timestamps tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; From 8a16ba1e438ae6c19166bac0155d42d73dc83bf0 Mon Sep 17 00:00:00 2001 From: Shiyas Mohammed Date: Thu, 5 Feb 2026 16:05:54 +0530 Subject: [PATCH 2/3] docs(data-store): update docs with physical table revisions architecture --- docs/features/data-store.md | 45 +++++++++++++++++++++++++------------ 1 file changed, 31 insertions(+), 14 deletions(-) diff --git a/docs/features/data-store.md b/docs/features/data-store.md index 1ffafa5ce..c63e50d73 100644 --- a/docs/features/data-store.md +++ b/docs/features/data-store.md @@ -31,6 +31,7 @@ Data Store-specific concepts: - **File Registration**: The process of recording parquet file metadata (size, e_tag, version, parquet stats, raw footer bytes) in the metadata database after writing to object storage, enabling queries without scanning object storage. - **Restoration**: Recovering table revision state from object storage when the metadata database is empty (e.g., after database reset or migration). The system scans object storage, identifies revisions, and registers the latest as active. - **Metadata Caching**: In-memory caching of parsed parquet metadata with memory-weighted eviction. For caching architecture details, see [data-metadata-caching](data-metadata-caching.md). 
+- **Physical Table Revision**: A storage location (object store path) that physical tables link to. Multiple physical tables can share the same revision (many-to-one), enabling data sharing across datasets without copying files. Each physical table has one `active_revision_id` pointing to its queryable revision.
 
 ## Usage
@@ -131,6 +132,11 @@ When a writer job claims a revision, it updates the `writer` column to reference
 
 This prevents concurrent writes and enables the system to track which job is responsible for a revision's contents. If a job fails, the lock can be released by clearing the writer reference.
 
+**Linking** enables associating external data directories with logical tables.
+Because physical tables are decoupled from storage paths, any compatible directory can be registered as a new revision.
+The revision's `metadata` JSONB field can store informative data for debugging (e.g., the dataset associated at dump time).
+To make a linked revision queryable, update the physical table's `active_revision_id` to point to it.
+
 ### Parquet File Management
 
 DataStore manages the lifecycle of parquet files within revisions:
@@ -184,17 +190,22 @@
 
 ### Database Schema
 
-The metadata database uses two primary tables to track revisions and files:
+The metadata database uses a three-table architecture to track physical tables, their revisions, and files:
+
+**`physical_tables`** - Stores table identity (meta table).
+Each row represents a logical physical table within a dataset.
+Key columns: `id` (table primary key), `manifest_hash`, `dataset_namespace`, `dataset_name`, `table_name`, `active_revision_id` (foreign key to revisions).
+The `active_revision_id` points to the currently active revision for queries (NULL if no revision is active).
 
-**`physical_tables`** - Stores table revision records.
-Each row represents a revision with its storage path and active status.
-Key columns: `id` (LocationId), `manifest_hash`, `dataset_namespace`, `dataset_name`, `table_name`, `path`, `active`, `writer` (job reference).
-The `active` boolean ensures only one revision per table is queryable.
+**`physical_table_revisions`** - Stores storage locations for table revisions.
+Each row represents an immutable snapshot of table data at a specific path.
+Key columns: `id` (RevisionId), `path` (unique storage path), `writer` (job reference for write locking), `metadata` (JSONB for informative data), `created_at`, `updated_at`. The `path` column stores the relative revision path (`<dataset>/<table>/<revision>`).
+The `metadata` JSONB stores informative data for debugging and diagnosis (e.g., the dataset associated at dump time, schema versions). This is extensible for management purposes but not used in DB operations.
 
 **`file_metadata`** - Stores parquet file records within revisions.
 Each row represents a single parquet file with its object store metadata.
-Key columns: `id` (FileId), `location_id` (foreign key to physical_tables), `file_name`, `url` (full file URL), `object_size`, `object_e_tag`, `object_version`, `metadata` (parquet stats as JSON), `footer` (raw footer bytes).
+Key columns: `id` (FileId), `location_id` (foreign key to physical_table_revisions), `file_name`, `url` (full file URL), `object_size`, `object_e_tag`, `object_version`, `metadata` (parquet stats as JSON), `footer` (raw footer bytes).
 Files are linked to revisions via `location_id`, enabling cascade deletes when revisions are removed. 
**`footer_cache`** - Stores raw parquet footer bytes for cache population. @@ -204,20 +215,26 @@ The cache table enables efficient footer retrieval without loading full file rec **Relationships:** -- **Revision → Files**: A revision (`physical_tables`) has many files (`file_metadata`) via `location_id`. - Deleting a revision cascades to delete all its files. -- **Revision → Writer**: A revision can be locked by a writer job via the `writer` column (foreign key to `jobs`). - This prevents concurrent writes by associating the revision with a specific job. - Only the owning job should write files to that revision. -- **File → Footer Cache**: Each file has a corresponding entry in `footer_cache` via `file_id`. - The footer is stored in both `file_metadata` and `footer_cache` to allow efficient streaming of file metadata without loading large footer blobs during bulk operations. +- **Table → Revisions**: A physical table (`physical_tables`) has `active_revision_id` pointing to one revision. A single revision (one storage path in `physical_table_revisions`) can be linked as the active revision by multiple physical tables—enabling data sharing across datasets that use the same underlying data. +- **Revision → Files**: A revision has many files (`file_metadata`) via `location_id`. Deleting a revision cascades to delete all its files. +- **Revision → Writer**: A revision can be locked by a writer job via the `writer` column (foreign key to `jobs`). Only the owning job should write files to that revision. +- **File → Footer Cache**: Each file has a corresponding entry in `footer_cache` via `file_id`. The footer is stored in both tables to allow efficient streaming without loading large footer blobs during bulk operations. + +**Revision Linking:** + +The decoupled architecture enables data sharing across datasets: + +- Multiple physical tables can point to the same revision (many-to-one), enabling data sharing across datasets without duplicating storage +- External data directories can be linked by registering new revisions with existing paths +- The `metadata` JSONB provides extensible storage for informative data (e.g., associated dataset at dump time) useful for debugging and diagnosis +- Switching active revision is atomic (update `active_revision_id` pointer) ### Source Files - `crates/core/data-store/src/lib.rs` - `DataStore` struct providing revision lifecycle (create, restore, lock, get active), file registration, streaming file metadata, and cached parquet metadata access. Also defines `PhyTableRevision` (the core revision handle) and `PhyTableRevisionFileMetadata`. - `crates/core/data-store/src/physical_table.rs` - Path and URL types: `PhyTableRevisionPath` (relative path like `dataset/table/uuid`), `PhyTablePath` (table directory path without revision), `PhyTableUrl` (full object store URL). - `crates/core/data-store/src/file_name.rs` - `FileName` type for validated parquet filenames with format `{block_num:09}-{suffix:016x}.parquet` (9-digit block number, 16-char hex suffix). -- `crates/core/metadata-db/src/physical_table.rs` - Database operations for `physical_tables`: register, get active, mark active/inactive, assign writer. +- `crates/core/metadata-db/src/physical_table.rs` - Database operations for `physical_tables` and `physical_table_revisions`: register tables and revisions, get active revision, mark active/inactive by updating `active_revision_id`, assign writer to revisions. 
- `crates/core/metadata-db/src/files.rs` - Database operations for `file_metadata` and `footer_cache`: register files with footers, stream by location, get footer bytes.
 
 ## References

From b0de304bcfd995d9d5126fddd7bb7d19bf220d69 Mon Sep 17 00:00:00 2001
From: Shiyas Mohammed
Date: Mon, 9 Feb 2026 15:29:53 +0530
Subject: [PATCH 3/3] refactor(metadata-db): rename mark_inactive_by_table_id to mark_inactive_by_table_name

---
 crates/core/data-store/src/lib.rs             |  4 ++--
 crates/core/metadata-db/src/datasets/name.rs  |  2 +-
 .../metadata-db/src/datasets/namespace.rs     |  2 +-
 crates/core/metadata-db/src/manifests/hash.rs |  2 +-
 crates/core/metadata-db/src/physical_table.rs |  6 +++---
 .../metadata-db/src/physical_table/sql.rs     |  2 +-
 .../src/physical_table/tests/it_crud.rs       | 20 +++++++++----------
 .../src/physical_table/tests/it_pagination.rs |  4 ++--
 8 files changed, 21 insertions(+), 21 deletions(-)

diff --git a/crates/core/data-store/src/lib.rs b/crates/core/data-store/src/lib.rs
index d0fa19295..70378bf7e 100644
--- a/crates/core/data-store/src/lib.rs
+++ b/crates/core/data-store/src/lib.rs
@@ -138,7 +138,7 @@ impl DataStore {
         .await
         .map_err(RegisterTableRevisionError::RegisterPhysicalTable)?;
 
-        metadata_db::physical_table::mark_inactive_by_table_id(
+        metadata_db::physical_table::mark_inactive_by_table_name(
             &mut tx,
             dataset.namespace(),
             dataset.name(),
@@ -150,11 +150,11 @@
 
         metadata_db::physical_table::mark_active_by_id(
             &mut tx,
+            location_id,
             dataset.namespace(),
             dataset.name(),
             dataset.hash(),
             table_name,
-            location_id,
         )
         .await
         .map_err(RegisterTableRevisionError::MarkActive)?;
diff --git a/crates/core/metadata-db/src/datasets/name.rs b/crates/core/metadata-db/src/datasets/name.rs
index 2897efd67..99ac6458f 100644
--- a/crates/core/metadata-db/src/datasets/name.rs
+++ b/crates/core/metadata-db/src/datasets/name.rs
@@ -35,7 +35,7 @@ pub type NameOwned = Name<'static>;
 ///
 /// The type trusts that values are already validated. Validation must occur at system
 /// boundaries before conversion into this type.
-#[derive(Clone, serde::Serialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
+#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize)]
 pub struct Name<'a>(Cow<'a, str>);
 
 impl<'a> Name<'a> {
diff --git a/crates/core/metadata-db/src/datasets/namespace.rs b/crates/core/metadata-db/src/datasets/namespace.rs
index c993f725f..f118b97d5 100644
--- a/crates/core/metadata-db/src/datasets/namespace.rs
+++ b/crates/core/metadata-db/src/datasets/namespace.rs
@@ -35,7 +35,7 @@ pub type NamespaceOwned = Namespace<'static>;
 ///
 /// The type trusts that values are already validated. Validation must occur at system
 /// boundaries before conversion into this type.
-#[derive(Clone, serde::Serialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
+#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize)]
 pub struct Namespace<'a>(Cow<'a, str>);
 
 impl<'a> Namespace<'a> {
diff --git a/crates/core/metadata-db/src/manifests/hash.rs b/crates/core/metadata-db/src/manifests/hash.rs
index d0ba543d1..e37dc2ae1 100644
--- a/crates/core/metadata-db/src/manifests/hash.rs
+++ b/crates/core/metadata-db/src/manifests/hash.rs
@@ -34,7 +34,7 @@ pub type HashOwned = Hash<'static>;
 ///
 /// The type trusts that values are already validated. Validation must occur at system
 /// boundaries before conversion into this type. 
-#[derive(Clone, serde::Serialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
+#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize)]
 pub struct Hash<'a>(Cow<'a, str>);
 
 impl<'a> Hash<'a> {
diff --git a/crates/core/metadata-db/src/physical_table.rs b/crates/core/metadata-db/src/physical_table.rs
index 1d71de1f5..42b659232 100644
--- a/crates/core/metadata-db/src/physical_table.rs
+++ b/crates/core/metadata-db/src/physical_table.rs
@@ -130,7 +130,7 @@
 /// This operation should typically be performed within a transaction along with
 /// `mark_active_by_id()` to ensure atomicity when switching active locations.
 #[tracing::instrument(skip(exe), err)]
-pub async fn mark_inactive_by_table_id<'c, E>(
+pub async fn mark_inactive_by_table_name<'c, E>(
     exe: E,
     dataset_namespace: impl Into<DatasetNamespace<'_>> + std::fmt::Debug,
     dataset_name: impl Into<DatasetName<'_>> + std::fmt::Debug,
@@ -140,7 +140,7 @@
 where
     E: Executor<'c>,
 {
-    sql::mark_inactive_by_table_id(
+    sql::mark_inactive_by_table_name(
         exe,
         dataset_namespace.into(),
         dataset_name.into(),
@@ -163,11 +163,11 @@
 #[tracing::instrument(skip(exe), err)]
 pub async fn mark_active_by_id<'c, E>(
     exe: E,
+    location_id: impl Into<LocationId> + std::fmt::Debug,
     dataset_namespace: impl Into<DatasetNamespace<'_>> + std::fmt::Debug,
     dataset_name: impl Into<DatasetName<'_>> + std::fmt::Debug,
     manifest_hash: impl Into<ManifestHash<'_>> + std::fmt::Debug,
     table_name: impl Into<TableName<'_>> + std::fmt::Debug,
-    location_id: impl Into<LocationId> + std::fmt::Debug,
 ) -> Result<(), Error>
 where
     E: Executor<'c>,
diff --git a/crates/core/metadata-db/src/physical_table/sql.rs b/crates/core/metadata-db/src/physical_table/sql.rs
index de09b5c0e..5b8e55e6a 100644
--- a/crates/core/metadata-db/src/physical_table/sql.rs
+++ b/crates/core/metadata-db/src/physical_table/sql.rs
@@ -227,7 +227,7 @@
 }
 
 /// Deactivate all revisions for a specific table (set active_revision_id to NULL)
-pub async fn mark_inactive_by_table_id<'c, E>(
+pub async fn mark_inactive_by_table_name<'c, E>(
     exe: E,
     dataset_namespace: DatasetNamespace<'_>,
     dataset_name: DatasetName<'_>,
diff --git a/crates/core/metadata-db/src/physical_table/tests/it_crud.rs b/crates/core/metadata-db/src/physical_table/tests/it_crud.rs
index dcef31f11..d4e63f324 100644
--- a/crates/core/metadata-db/src/physical_table/tests/it_crud.rs
+++ b/crates/core/metadata-db/src/physical_table/tests/it_crud.rs
@@ -37,7 +37,7 @@
         physical_table::register(&mut conn, &namespace, &name, &hash, &table_name, &path)
             .await
             .expect("Failed to insert location");
-    physical_table::mark_active_by_id(&mut conn, namespace, name, hash, table_name, location_id)
+    physical_table::mark_active_by_id(&mut conn, location_id, namespace, name, hash, table_name)
         .await
         .expect("Failed to mark location active");
 
@@ -199,7 +199,7 @@
         .await
         .expect("Failed to insert active location 1");
 
-    physical_table::mark_active_by_id(&mut conn, &namespace, &name, &hash, &table_name, active_id1)
+    physical_table::mark_active_by_id(&mut conn, active_id1, &namespace, &name, &hash, &table_name)
         .await
         .expect("Failed to mark location active");
 
@@ -212,11 +212,11 @@
 
     physical_table::mark_active_by_id(
         &mut conn,
+        active_id2,
         &namespace,
         &name,
         &hash,
         &table2_name,
-        active_id2,
     )
     .await
     .expect("Failed to mark location active");
@@ -241,11 +241,11 @@
 
.expect("Failed to insert location for other table"); physical_table::mark_active_by_id( &mut conn, + active_id3, &namespace, &name, &hash, &other_table_name, - active_id3, ) .await .expect("Failed to mark location active"); @@ -319,7 +319,7 @@ async fn mark_inactive_by_table_id_deactivates_only_matching_active_locations() physical_table::register(&mut conn, &namespace, &name, &hash, &table_name, path1) .await .expect("Failed to insert target location 1"); - physical_table::mark_active_by_id(&mut conn, &namespace, &name, &hash, &table_name, target_id1) + physical_table::mark_active_by_id(&mut conn, target_id1, &namespace, &name, &hash, &table_name) .await .expect("Failed to mark location active"); @@ -331,11 +331,11 @@ async fn mark_inactive_by_table_id_deactivates_only_matching_active_locations() .expect("Failed to insert target location 2"); physical_table::mark_active_by_id( &mut conn, + target_id2, &namespace, &name, &hash, &table2_name, - target_id2, ) .await .expect("Failed to mark location active"); @@ -361,17 +361,17 @@ async fn mark_inactive_by_table_id_deactivates_only_matching_active_locations() .expect("Failed to insert other table location"); physical_table::mark_active_by_id( &mut conn, + other_id, &namespace, &name, &hash, &other_table_name, - other_id, ) .await .expect("Failed to mark other location active"); //* When - Mark only the first table inactive - physical_table::mark_inactive_by_table_id(&mut conn, &namespace, &name, &hash, &table_name) + physical_table::mark_inactive_by_table_name(&mut conn, &namespace, &name, &hash, &table_name) .await .expect("Failed to mark locations inactive"); @@ -441,7 +441,7 @@ async fn mark_active_by_id_activates_specific_location() { .expect("Failed to insert other location"); //* When - physical_table::mark_active_by_id(&mut conn, &namespace, &name, &hash, &table_name, target_id) + physical_table::mark_active_by_id(&mut conn, target_id, &namespace, &name, &hash, &table_name) .await .expect("Failed to mark location active"); @@ -593,11 +593,11 @@ async fn get_by_id_returns_existing_location() { .expect("Failed to insert location"); physical_table::mark_active_by_id( &mut conn, + inserted_id, &namespace, &name, &hash, &table_name, - inserted_id, ) .await .expect("Failed to mark location active"); diff --git a/crates/core/metadata-db/src/physical_table/tests/it_pagination.rs b/crates/core/metadata-db/src/physical_table/tests/it_pagination.rs index 7b53c1bbf..6a4ed97b5 100644 --- a/crates/core/metadata-db/src/physical_table/tests/it_pagination.rs +++ b/crates/core/metadata-db/src/physical_table/tests/it_pagination.rs @@ -63,11 +63,11 @@ async fn list_locations_first_page_respects_limit() { .expect("Failed to insert location"); physical_table::mark_active_by_id( &mut conn, + location_id, &namespace, &name, &hash, &table_name, - location_id, ) .await .expect("Failed to mark location active"); @@ -139,11 +139,11 @@ async fn list_locations_next_page_uses_cursor() { all_location_ids.push(location_id); physical_table::mark_active_by_id( &mut conn, + location_id, &namespace, &name, &hash, &table_name, - location_id, ) .await .expect("Failed to mark location active");