Merged
1 change: 1 addition & 0 deletions .gitignore
@@ -19,3 +19,4 @@ build/
.idea
.vscode/
.amp/
.agents/settings.local.json
15 changes: 11 additions & 4 deletions crates/core/data-store/src/lib.rs
@@ -134,18 +134,25 @@ impl DataStore {
dataset.hash(),
table_name,
path,
false,
)
.await
.map_err(RegisterTableRevisionError::RegisterPhysicalTable)?;

metadata_db::physical_table::mark_inactive_by_table_id(&mut tx, dataset.hash(), table_name)
.await
.map_err(RegisterTableRevisionError::MarkInactive)?;
metadata_db::physical_table::mark_inactive_by_table_name(
&mut tx,
dataset.namespace(),
dataset.name(),
dataset.hash(),
table_name,
)
.await
.map_err(RegisterTableRevisionError::MarkInactive)?;

metadata_db::physical_table::mark_active_by_id(
&mut tx,
location_id,
dataset.namespace(),
dataset.name(),
dataset.hash(),
table_name,
)
@@ -0,0 +1,141 @@
-- Migration: Split physical_tables into physical_tables (meta) and physical_table_revisions
--
-- This migration decouples table identity from storage paths, enabling:
-- - Data sharing across datasets with same spec
-- - External data import (link any directory to any table)
--
-- Changes:
-- 1. Create physical_table_revisions table (preserves existing IDs for FK compatibility)
-- 2. Update file_metadata and gc_manifest FKs to point to revisions
-- 3. Transform physical_tables into a meta table with active_revision_id pointer
-- 4. Populate relationships between tables
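--
-- Resulting model (a sketch for orientation; the exact definitions follow in
-- the steps below):
--
--   physical_tables (dataset_namespace, dataset_name, table_name, manifest_hash)
--       active_revision_id --> physical_table_revisions (id, path, writer, metadata)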

-- =============================================================================
-- STEP 1: Create physical_table_revisions table
-- =============================================================================
-- Using GENERATED BY DEFAULT to allow explicit ID insertion during migration
-- This preserves location_id values that file_metadata and gc_manifest reference

CREATE TABLE physical_table_revisions (
id BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
created_at TIMESTAMP NOT NULL DEFAULT (now() AT TIME ZONE 'utc'),
updated_at TIMESTAMP NOT NULL DEFAULT (now() AT TIME ZONE 'utc'),
path TEXT NOT NULL UNIQUE,
writer BIGINT REFERENCES jobs(id) ON DELETE SET NULL,
metadata JSONB NOT NULL DEFAULT ('{}'::jsonb)
);

-- =============================================================================
-- STEP 2: Copy revision data with existing IDs
-- =============================================================================
-- Critical: This preserves the location_id values that file_metadata references

INSERT INTO physical_table_revisions (id, created_at, updated_at, path, writer)
SELECT id, created_at, created_at AS updated_at, path, writer
FROM physical_tables;

-- Reset the identity sequence to continue from max ID (only if table has rows)
DO $$
BEGIN
IF EXISTS (SELECT 1 FROM physical_table_revisions LIMIT 1) THEN
PERFORM setval(
pg_get_serial_sequence('physical_table_revisions', 'id'),
(SELECT MAX(id) FROM physical_table_revisions)
);
END IF;
END $$;
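-- Illustrative check, not part of the migration (nextval() consumes a value,
-- so only run it against a scratch copy): after the setval() above, newly
-- generated IDs continue past the copied rows.
--
--   SELECT nextval(pg_get_serial_sequence('physical_table_revisions', 'id'));
--   -- expected: 1 + the MAX(id) copied from physical_tables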

-- =============================================================================
-- STEP 3: Update file_metadata FK to point to physical_table_revisions
-- =============================================================================

ALTER TABLE file_metadata
DROP CONSTRAINT IF EXISTS file_metadata_location_id_fkey;

ALTER TABLE file_metadata
ADD CONSTRAINT file_metadata_location_id_fkey
FOREIGN KEY (location_id)
REFERENCES physical_table_revisions(id)
ON DELETE CASCADE;

-- =============================================================================
-- STEP 4: Update gc_manifest FK to point to physical_table_revisions
-- =============================================================================

ALTER TABLE gc_manifest
DROP CONSTRAINT IF EXISTS gc_manifest_location_id_fkey;

ALTER TABLE gc_manifest
ADD CONSTRAINT gc_manifest_location_id_fkey
FOREIGN KEY (location_id)
REFERENCES physical_table_revisions(id)
ON DELETE CASCADE;

-- =============================================================================
-- STEP 5: Rename old table and create new physical_tables (meta)
-- =============================================================================

ALTER TABLE physical_tables RENAME TO physical_tables_old;

CREATE TABLE physical_tables (
id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
created_at TIMESTAMP NOT NULL DEFAULT (now() AT TIME ZONE 'utc'),
updated_at TIMESTAMP NOT NULL DEFAULT (now() AT TIME ZONE 'utc'),
dataset_namespace TEXT NOT NULL,
dataset_name TEXT NOT NULL,
table_name TEXT NOT NULL,
manifest_hash TEXT NOT NULL,
active_revision_id BIGINT REFERENCES physical_table_revisions(id) ON DELETE SET NULL
);

-- =============================================================================
-- STEP 6: Populate physical_tables (meta) from old data
-- =============================================================================
-- Insert one row per unique (dataset_namespace, dataset_name, table_name, manifest_hash) combination
-- Set active_revision_id to the revision that was marked active (if any)

INSERT INTO physical_tables (
created_at,
updated_at,
dataset_namespace,
dataset_name,
table_name,
manifest_hash,
active_revision_id
)
SELECT
MIN(created_at) AS created_at,
MIN(created_at) AS updated_at,
dataset_namespace,
dataset_name,
table_name,
manifest_hash,
(SELECT id FROM physical_tables_old pt2
WHERE pt2.manifest_hash = physical_tables_old.manifest_hash
AND pt2.table_name = physical_tables_old.table_name
AND pt2.active = true
LIMIT 1) AS active_revision_id
FROM physical_tables_old
GROUP BY dataset_namespace, dataset_name, table_name, manifest_hash;

-- =============================================================================
-- STEP 7: Create indexes
-- =============================================================================

-- Unique constraint: one physical_table per (dataset_namespace, dataset_name, manifest_hash, table_name)
CREATE UNIQUE INDEX unique_manifest_table
ON physical_tables (dataset_namespace, dataset_name, manifest_hash, table_name);

-- Index for active_revision_id
CREATE INDEX idx_physical_tables_active_revision_id
ON physical_tables (active_revision_id) WHERE active_revision_id IS NOT NULL;

-- =============================================================================
-- STEP 8: Cleanup
-- =============================================================================

-- Drop the old partial unique index (no longer needed)
DROP INDEX IF EXISTS unique_active_per_manifest_table;

-- Drop the old table
DROP TABLE physical_tables_old;
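
-- Illustrative post-migration lookup, not part of the migration itself:
-- resolving the active storage path for a table now goes through the
-- active_revision_id pointer (the same join the jobs query below adopts).
--
--   SELECT pt.dataset_namespace, pt.dataset_name, pt.table_name, ptr.path
--   FROM physical_tables pt
--   JOIN physical_table_revisions ptr ON ptr.id = pt.active_revision_id
--   WHERE pt.manifest_hash = $1 AND pt.table_name = $2;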
2 changes: 1 addition & 1 deletion crates/core/metadata-db/src/datasets/name.rs
@@ -35,7 +35,7 @@ pub type NameOwned = Name<'static>;
///
/// The type trusts that values are already validated. Validation must occur at system
/// boundaries before conversion into this type.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize)]
pub struct Name<'a>(Cow<'a, str>);

impl<'a> Name<'a> {
2 changes: 1 addition & 1 deletion crates/core/metadata-db/src/datasets/namespace.rs
@@ -35,7 +35,7 @@ pub type NamespaceOwned = Namespace<'static>;
///
/// The type trusts that values are already validated. Validation must occur at system
/// boundaries before conversion into this type.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize)]
pub struct Namespace<'a>(Cow<'a, str>);

impl<'a> Namespace<'a> {
5 changes: 3 additions & 2 deletions crates/core/metadata-db/src/jobs/sql.rs
@@ -191,8 +191,9 @@ where
j.created_at,
j.updated_at
FROM jobs j
INNER JOIN physical_tables l ON j.id = l.writer
WHERE l.manifest_hash = $1
INNER JOIN physical_table_revisions ptr ON j.id = ptr.writer
INNER JOIN physical_tables pt ON pt.active_revision_id = ptr.id
WHERE pt.manifest_hash = $1
ORDER BY j.id ASC
"#};
let res = sqlx::query_as(query)
2 changes: 1 addition & 1 deletion crates/core/metadata-db/src/manifests/hash.rs
@@ -34,7 +34,7 @@ pub type HashOwned = Hash<'static>;
///
/// The type trusts that values are already validated. Validation must occur at system
/// boundaries before conversion into this type.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize)]
pub struct Hash<'a>(Cow<'a, str>);

impl<'a> Hash<'a> {
Expand Down
46 changes: 33 additions & 13 deletions crates/core/metadata-db/src/physical_table.rs
@@ -30,30 +30,34 @@ use crate::{
#[tracing::instrument(skip(exe), err)]
pub async fn register<'c, E>(
exe: E,
dataset_namespace: impl Into<DatasetNamespace<'_>> + std::fmt::Debug,
dataset_name: impl Into<DatasetName<'_>> + std::fmt::Debug,
manifest_hash: impl Into<ManifestHash<'_>> + std::fmt::Debug,
dataset_namespace: impl Into<DatasetNamespace<'_>> + std::fmt::Debug + serde::Serialize,
dataset_name: impl Into<DatasetName<'_>> + std::fmt::Debug + serde::Serialize,
manifest_hash: impl Into<ManifestHash<'_>> + std::fmt::Debug + serde::Serialize,
table_name: impl Into<TableName<'_>> + std::fmt::Debug,
path: impl Into<TablePath<'_>> + std::fmt::Debug,
active: bool,
) -> Result<LocationId, Error>
where
E: Executor<'c>,
{
let metadata = serde_json::json!({
"dataset_namespace": dataset_namespace,
"dataset_name": dataset_name,
"manifest_hash": manifest_hash,
});
sql::insert(
exe,
dataset_namespace.into(),
dataset_name.into(),
manifest_hash.into(),
table_name.into(),
path.into(),
active,
metadata,
)
.await
.map_err(Error::Database)
}

/// Get a physical table location by its ID
/// Get a physical table location by its ID (only returns the active revision)
#[tracing::instrument(skip(exe), err)]
pub async fn get_by_id<'c, E>(
exe: E,
@@ -126,17 +130,25 @@ where
/// This operation should typically be performed within a transaction along with
/// `mark_active_by_id()` to ensure atomicity when switching active locations.
#[tracing::instrument(skip(exe), err)]
pub async fn mark_inactive_by_table_id<'c, E>(
pub async fn mark_inactive_by_table_name<'c, E>(
exe: E,
dataset_namespace: impl Into<DatasetNamespace<'_>> + std::fmt::Debug,
dataset_name: impl Into<DatasetName<'_>> + std::fmt::Debug,
manifest_hash: impl Into<ManifestHash<'_>> + std::fmt::Debug,
table_name: impl Into<TableName<'_>> + std::fmt::Debug,
) -> Result<(), Error>
where
E: Executor<'c>,
{
sql::mark_inactive_by_table_id(exe, manifest_hash.into(), table_name.into())
.await
.map_err(Error::Database)
sql::mark_inactive_by_table_name(
exe,
dataset_namespace.into(),
dataset_name.into(),
manifest_hash.into(),
table_name.into(),
)
.await
.map_err(Error::Database)
}
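
// Illustrative usage, a sketch rather than part of this change (assumes a
// sqlx transaction `tx` and in-scope values, mirroring data-store/src/lib.rs):
//
//     mark_inactive_by_table_name(&mut tx, namespace, name, hash, table_name).await?;
//     mark_active_by_id(&mut tx, location_id, namespace, name, hash, table_name).await?;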

/// Mark a specific location as active
@@ -152,6 +164,8 @@ where
pub async fn mark_active_by_id<'c, E>(
exe: E,
location_id: impl Into<LocationId> + std::fmt::Debug,
dataset_namespace: impl Into<DatasetNamespace<'_>> + std::fmt::Debug,
dataset_name: impl Into<DatasetName<'_>> + std::fmt::Debug,
manifest_hash: impl Into<ManifestHash<'_>> + std::fmt::Debug,
table_name: impl Into<TableName<'_>> + std::fmt::Debug,
) -> Result<(), Error>
@@ -160,6 +174,8 @@ where
{
sql::mark_active_by_id(
exe,
dataset_namespace.into(),
dataset_name.into(),
manifest_hash.into(),
table_name.into(),
location_id.into(),
@@ -258,10 +274,14 @@ where
.map_err(|err| Error::Database(err.0))
}

/// Basic location information from the database
/// Physical table revision information from the database
///
/// Represents a specific revision (storage location) of a physical table.
/// The `active` field indicates whether this revision is the currently active
/// one for its parent physical table.
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct PhysicalTable {
/// Unique identifier for the location
/// Unique identifier for the revision (location_id)
pub id: LocationId,
/// Manifest hash identifying the dataset version
pub manifest_hash: ManifestHashOwned,
Expand All @@ -274,7 +294,7 @@ pub struct PhysicalTable {
pub table_name: TableNameOwned,
/// Relative path to the storage location
pub path: TablePathOwned,
/// Whether this location is currently active for queries
/// Whether this revision is currently active for queries
pub active: bool,
/// Writer job ID (if one exists)
pub writer: Option<JobId>,