Conversation

@ibrarahmad
Contributor

This commit introduces logic to preserve replication origin metadata and commit timestamps during table repair operations. When enabled, repairs now maintain the original source node and timestamp information instead of marking repaired rows with current repair-time metadata.

The implementation extracts origin information from source row metadata and applies it during repair using PostgreSQL's replication origin API. Timestamp parsing supports multiple formats including RFC3339, PostgreSQL text format, and numeric Unix timestamps with automatic scale detection (seconds, milliseconds, microseconds, nanoseconds). Error handling ensures proper session cleanup and graceful degradation when origin metadata is unavailable.
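For illustration, here is a minimal sketch of the magnitude-based scale detection mentioned above. The thresholds and the name parseNumericTimestampSketch are illustrative only, not necessarily the repository's exact implementation; they simply mirror the behaviour exercised by the new tests.

```go
package main

import (
	"fmt"
	"time"
)

// parseNumericTimestampSketch guesses the scale of a numeric Unix timestamp from its
// magnitude and converts it to time.Time. Thresholds here are illustrative only.
func parseNumericTimestampSketch(v int64) time.Time {
	switch {
	case v < 1_000_000_000_000: // plausibly seconds
		return time.Unix(v, 0)
	case v < 1_000_000_000_000_000: // plausibly milliseconds
		return time.Unix(0, v*int64(time.Millisecond))
	case v < 1_000_000_000_000_000_000: // plausibly microseconds
		return time.Unix(0, v*int64(time.Microsecond))
	default: // nanoseconds
		return time.Unix(0, v)
	}
}

func main() {
	fmt.Println(parseNumericTimestampSketch(1704067200).UTC())          // seconds      -> 2024-01-01 00:00:00 UTC
	fmt.Println(parseNumericTimestampSketch(1704067200000).UTC())       // milliseconds -> same instant
	fmt.Println(parseNumericTimestampSketch(1704067200000000000).UTC()) // nanoseconds  -> same instant
}
```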

Origin preservation works across repair modes including standard repair, fix-nulls updates, and bidirectional synchronization. New integration tests verify correct preservation of timestamps, origin node IDs, and row content integrity throughout the repair process.

@coderabbitai

coderabbitai bot commented Feb 11, 2026

📝 Walkthrough

This pull request introduces replication origin preservation during table repairs. It adds database query functions for managing replication origins, extends the CLI and HTTP API with a preserve-origin option, implements origin-aware repair logic with per-row timestamp tracking, and includes comprehensive test coverage and documentation updates.

Changes

• Database Query Layer — db/queries/queries.go, db/queries/templates.go
  Added six new public functions and corresponding SQL templates for replication origin management: GetReplicationOriginByName, CreateReplicationOrigin, SetupReplicationOriginSession, ResetReplicationOriginSession, SetupReplicationOriginXact, and ResetReplicationOriginXact. Enables querying and managing PostgreSQL replication origins.
• CLI & HTTP API Integration — internal/cli/cli.go, internal/api/http/handler.go
  Added preserve-origin boolean flag to table-repair command and wired it through the repair task; added corresponding field to HTTP request body for the table-repair endpoint.
• Core Repair Logic — internal/consistency/repair/table_repair.go
  Implemented origin-aware repair execution with grouping utilities, LSN resolution, per-origin transaction setup, session management, and cleanup. Added PreserveOrigin feature flag, setupTransactionMode, and numerous helper methods for extracting and applying origin metadata during updates and upserts.
• Test Infrastructure — internal/consistency/repair/table_repair_batch_test.go, tests/integration/helpers_test.go
  Added batch test file for origin key generation, grouping, and timestamp handling; added test helpers: getCommitTimestamp, getReplicationOrigin, compareTimestamps, and compareTimestampsExact.
• Integration Tests — tests/integration/table_repair_test.go, tests/integration/timestamp_comparison_test.go, tests/integration/crash_recovery_test.go
  Added four new table-repair test scenarios covering bidirectional updates and origin preservation; added timestamp comparison validation tests; modified crash recovery test to disable preserve-origin when origin node is unavailable.
• Documentation — docs/api.md, docs/commands/repair/table-repair.md, docs/http-api.md
  Added preserve-origin flag documentation to API reference, expanded table-repair command documentation with feature section, and documented preserve_origin field in HTTP API with microsecond-precision timestamp details.
• Miscellaneous — go.mod, pkg/logger/logger.go
  Updated go-sqlite3 dependency from v1.14.22 to v1.14.33; added SetOutput function to logger package for redirecting log output.
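To make the Database Query Layer entry concrete, the SQL below illustrates the PostgreSQL calls such templates typically wrap (the actual templates in db/queries/templates.go are parameterized and may differ; the origin name is hypothetical). As noted in the review comments further down, the pg_replication_origin_* functions require PostgreSQL 9.5 or newer and normally superuser or equivalent privileges.

```sql
-- Look up an existing replication origin by name.
SELECT roident FROM pg_replication_origin WHERE roname = 'ace_repair_n1';

-- Create the origin if it does not exist yet.
SELECT pg_replication_origin_create('ace_repair_n1');

-- Select the origin for this session, and later clear the selection.
SELECT pg_replication_origin_session_setup('ace_repair_n1');
SELECT pg_replication_origin_session_reset();

-- Mark the current transaction as replaying a remote transaction with a given
-- LSN and commit timestamp, and cancel that mark if needed.
SELECT pg_replication_origin_xact_setup('0/16B3748'::pg_lsn, '2024-01-01 00:00:00+00'::timestamptz);
SELECT pg_replication_origin_xact_reset();
```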

Poem

🐰 With whiskers twitching, we now preserve,
The origin's LSN we serve,
Each row's timestamp, precise to the task,
Repair with ancestry—that's all we ask! ✨

🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
• Docstring Coverage — ⚠️ Warning: Docstring coverage is 58.33%, which is insufficient; the required threshold is 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
• Title check — ✅ Passed: The title accurately and concisely summarizes the main feature added: preservation of origin and timestamp data during table repair operations.
• Description check — ✅ Passed: The description is directly related to the changeset, explaining the feature implementation, timestamp parsing support, error handling, and integration test coverage.


@ibrarahmad
Contributor Author

For detailed reference, see the PR.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

🤖 Fix all issues with AI agents
In `@db/queries/templates.go`:
- Around line 1552-1569: Add a brief comment above the replication origin SQL
templates (near GetReplicationOriginByName, CreateReplicationOrigin,
SetupReplicationOriginSession, ResetReplicationOriginSession,
SetupReplicationOriginXact, ResetReplicationOriginXact) stating that the
pg_replication_origin_* functions require PostgreSQL 9.5 or newer and that
calling them typically requires superuser or equivalent replication/owner
privileges; also add a short note to the README or database setup docs
documenting the minimum PostgreSQL version (>=9.5) and the privilege
requirements for these functions.

In `@internal/consistency/repair/table_repair_batch_test.go`:
- Around line 217-224: The comment for TestParseNumericTimestamp incorrectly
states that Unix seconds value 1704067200 corresponds to 2024-01-01 12:00:00
UTC; update the comment to the correct timestamp (2024-01-01 00:00:00 UTC) so it
accurately documents the expected value used in the sec variable and
parseNumericTimestamp test; locate the comment in TestParseNumericTimestamp near
the sec := time.Unix(1704067200, 0) line and change the annotation only.

In `@internal/consistency/repair/table_repair.go`:
- Around line 2736-2750: The code in resolveOriginLSN (the block that picks
survivorNode using t.Pools and t.SourceOfTruth and currently returns fmt.Errorf
when none found) should not hard-fail; instead log or warn and return (nil,
false, nil) to allow repairs to continue in degraded preserve-origin mode.
Change the branch that checks survivorNode == "" so it emits a warning
mentioning batchKey.nodeOrigin (use the existing logger or
fmt.Printf/processLogger) and then returns nil, false, nil rather than an error;
keep the survivor selection logic using t.Pools and t.SourceOfTruth unchanged.
- Around line 2301-2343: The current loop in executeUpserts processes multiple
originGroups in a single transaction so pg_replication_origin_xact_setup() is
overwritten and all rows get the last batch's origin; fix by ensuring each
origin group runs in its own transaction or by resetting the origin xact before
re-setting it: update executeUpserts (referencing executeUpsertBatch,
task.setupOriginForBatchKey, task.resetReplicationOriginXact,
task.resetReplicationOriginSession) to either (A) begin a fresh transaction for
each batchKey, call task.setupOriginForBatchKey, run executeUpsertBatch, commit
that transaction and then reset session as needed, or (B) if keeping one outer
tx, call task.resetReplicationOriginXact() (and handle errors) before calling
task.setupOriginForBatchKey for the next batch so per-batch origin
timestamps/LSNs are preserved.
🧹 Nitpick comments (1)
pkg/logger/logger.go (1)

31-33: Accept io.Writer instead of *os.File in SetOutput.

The underlying Log.SetOutput accepts io.Writer. Restricting the wrapper to *os.File is unnecessarily limiting and goes against Go best practices of accepting interfaces, not concrete types. Since *os.File implements io.Writer, this change is fully backward compatible while enabling greater flexibility for testing and custom output destinations.

♻️ Proposed refactor
 import (
 	"fmt"
+	"io"
 	"os"

 	"github.com/charmbracelet/log"
 )
@@
-func SetOutput(w *os.File) {
+func SetOutput(w io.Writer) {
 	Log.SetOutput(w)
 }
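A usage note on why the wider signature matters: with io.Writer, tests can capture log output in memory. A rough sketch follows — the import path and message are assumptions, and the printf-style Warn simply mirrors how the logger is called elsewhere in this PR.

```go
package logger_test

import (
	"bytes"
	"strings"
	"testing"

	"github.com/pgedge/ace/pkg/logger" // assumed import path
)

func TestCaptureLogOutput(t *testing.T) {
	var buf bytes.Buffer
	logger.SetOutput(&buf) // only possible once SetOutput accepts io.Writer; *bytes.Buffer is not an *os.File

	logger.Warn("simulated failure: %v", "timeout")

	if !strings.Contains(buf.String(), "simulated failure") {
		t.Fatalf("expected warning in captured output, got %q", buf.String())
	}
}
```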

Comment on lines +217 to +224
// TestParseNumericTimestamp tests that Unix timestamps in seconds, ms, µs, and ns are converted correctly.
func TestParseNumericTimestamp(t *testing.T) {
	// Unix seconds: 1704067200 = 2024-01-01 12:00:00 UTC
	sec := time.Unix(1704067200, 0)
	assert.Equal(t, sec, parseNumericTimestamp(1704067200))
	// Unix milliseconds: 1704067200000 ms
	ms := time.Unix(0, 1704067200000*1e6)
	assert.Equal(t, ms, parseNumericTimestamp(1704067200000))

⚠️ Potential issue | 🟡 Minor

Correct the Unix timestamp annotation.
1704067200 is 2024-01-01 00:00:00 UTC, not 12:00:00 UTC, so the comment is misleading.

Proposed correction
-	// Unix seconds: 1704067200 = 2024-01-01 12:00:00 UTC
+	// Unix seconds: 1704067200 = 2024-01-01 00:00:00 UTC
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// TestParseNumericTimestamp tests that Unix timestamps in seconds, ms, µs, and ns are converted correctly.
func TestParseNumericTimestamp(t *testing.T) {
	// Unix seconds: 1704067200 = 2024-01-01 12:00:00 UTC
	sec := time.Unix(1704067200, 0)
	assert.Equal(t, sec, parseNumericTimestamp(1704067200))
	// Unix milliseconds: 1704067200000 ms
	ms := time.Unix(0, 1704067200000*1e6)
	assert.Equal(t, ms, parseNumericTimestamp(1704067200000))

// TestParseNumericTimestamp tests that Unix timestamps in seconds, ms, µs, and ns are converted correctly.
func TestParseNumericTimestamp(t *testing.T) {
	// Unix seconds: 1704067200 = 2024-01-01 00:00:00 UTC
	sec := time.Unix(1704067200, 0)
	assert.Equal(t, sec, parseNumericTimestamp(1704067200))
	// Unix milliseconds: 1704067200000 ms
	ms := time.Unix(0, 1704067200000*1e6)
	assert.Equal(t, ms, parseNumericTimestamp(1704067200000))
🤖 Prompt for AI Agents
In `@internal/consistency/repair/table_repair_batch_test.go` around lines 217 -
224, The comment for TestParseNumericTimestamp incorrectly states that Unix
seconds value 1704067200 corresponds to 2024-01-01 12:00:00 UTC; update the
comment to the correct timestamp (2024-01-01 00:00:00 UTC) so it accurately
documents the expected value used in the sec variable and parseNumericTimestamp
test; locate the comment in TestParseNumericTimestamp near the sec :=
time.Unix(1704067200, 0) line and change the annotation only.

Comment on lines +2301 to +2343
// executeUpserts handles upserting rows in batches.
// originInfoMap maps primary key strings to their origin information.
// If originInfoMap is nil or empty, origin tracking is skipped.
func executeUpserts(tx pgx.Tx, task *TableRepairTask, nodeName string, upserts map[string]map[string]any, colTypes map[string]string, originInfoMap map[string]*rowOriginInfo) (int, error) {
	if err := task.filterStaleRepairs(task.Ctx, tx, nodeName, upserts, colTypes, "upsert"); err != nil {
		return 0, err
	}

	originGroups := groupUpsertsByOrigin(upserts, originInfoMap, task.PreserveOrigin)

	totalUpsertedCount := 0
	setupSessions := make(map[string]bool)

	for batchKey, originUpserts := range originGroups {
		if len(originUpserts) == 0 {
			continue
		}

		preserveThisGroup, err := task.setupOriginForBatchKey(tx, batchKey, nodeName, setupSessions)
		if err != nil {
			return totalUpsertedCount, err
		}

		count, err := executeUpsertBatch(tx, task, originUpserts, colTypes)
		if err != nil {
			if preserveThisGroup {
				if resetErr := task.resetReplicationOriginXact(tx); resetErr != nil {
					logger.Warn("failed to reset replication origin xact during cleanup: %v", resetErr)
				}
				if resetErr := task.resetReplicationOriginSession(tx); resetErr != nil {
					logger.Warn("failed to reset replication origin session during cleanup: %v", resetErr)
				}
			}
			return totalUpsertedCount, err
		}
		totalUpsertedCount += count
	}

	if len(setupSessions) > 0 {
		if err := task.resetReplicationOriginSession(tx); err != nil {
			return totalUpsertedCount, fmt.Errorf("failed to reset replication origin session after upserts: %w", err)
		}
	}

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🌐 Web query:

PostgreSQL pg_replication_origin_xact_setup transaction scope multiple calls

💡 Result:

pg_replication_origin_xact_setup(origin_lsn, origin_timestamp) is transaction-scoped: it “marks the current transaction” as replaying a remote transaction (and requires that a replication origin has already been selected with pg_replication_origin_session_setup()). [1][3]

If you call pg_replication_origin_xact_setup() multiple times within the same transaction, PostgreSQL simply overwrites the per-transaction origin fields (origin_lsn, origin_timestamp) each time—there’s no stacking/accumulation; the last call in that transaction is what remains in effect. [2]

If you need to undo it within the same transaction, use pg_replication_origin_xact_reset(), which “cancels the effects” of pg_replication_origin_xact_setup() (without clearing the session’s selected origin). [1][2]

Sources:
[1] PostgreSQL 18 docs – System Administration Functions (pg_replication_origin_xact_setup/reset)
[2] PostgreSQL source (origin.c) – implementation shows it just assigns/overwrites replorigin_xact_state.origin_lsn and origin_timestamp
[3] PostgreSQL docs – Replication Progress Tracking / replication origins overview
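A small psql illustration of the transaction-scoped behaviour described above (the origin name and LSNs are hypothetical):

```sql
BEGIN;
SELECT pg_replication_origin_session_setup('ace_repair_n1');

SELECT pg_replication_origin_xact_setup('0/1000'::pg_lsn, '2024-01-01 00:00:00+00');
-- ... writes for the first origin batch ...

SELECT pg_replication_origin_xact_setup('0/2000'::pg_lsn, '2024-06-01 00:00:00+00');
-- ... writes for the second origin batch ...

COMMIT;
-- The commit record carries only the second LSN/timestamp; the first call was overwritten.
-- pg_replication_origin_xact_reset() cancels the per-transaction values without clearing
-- the session's selected origin; pg_replication_origin_session_reset() clears the session.
```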


🌐 Web query:

PostgreSQL commit timestamp replication origin per-transaction or per-statement

💡 Result:

PostgreSQL’s commit timestamp is tracked per transaction (per XID), not per statement.

  • pg_xact_commit_timestamp(xid) returns the commit timestamp of a transaction (one timestamp for the whole XID). If multiple statements run inside the same transaction, they all share that same commit timestamp. [1]
  • pg_xact_commit_timestamp_origin(xid) likewise reports the transaction’s commit timestamp plus its replication origin. [1]
  • Replication-origin commit timestamps are also set per transaction: pg_replication_origin_xact_setup() lets you configure the LSN and commit timestamp of every source transaction “on a per transaction basis”. [2]
  • PostgreSQL core developers explicitly describe it as “not per row but per xact”. [3]

Implication: you only effectively get “per-statement” timestamps if each statement runs in its own transaction (autocommit). [1][2][3]

Sources: [1] https://access.crunchydata.com/documentation/postgresql15/15.13/functions-info.html, [2] https://www.postgresql.org/docs/current/replication-origins.html, [3] https://www.postgresql.org/message-id/1b2e2697-76fc-7d8f-1ca6-588093531d7d%40anayrat.info
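For verification, something like the following shows the per-transaction semantics in practice (requires track_commit_timestamp = on before the writes of interest; the table and key values are hypothetical):

```sql
-- Every row written by the same transaction reports the same commit timestamp,
-- because xmin is the writing transaction's XID.
SELECT id, pg_xact_commit_timestamp(xmin) AS committed_at
FROM   customers            -- hypothetical table
WHERE  id IN (1, 2, 3);

-- PostgreSQL 14+ also exposes the transaction's replication origin:
SELECT * FROM pg_xact_commit_timestamp_origin(
    (SELECT xmin FROM customers WHERE id = 1)
);
```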


🏁 Script executed:

find . -type f -name "table_repair.go" | head -5

Repository: pgEdge/ace

Length of output: 99


🏁 Script executed:

cat -n internal/consistency/repair/table_repair.go | sed -n '2301,2343p'

Repository: pgEdge/ace

Length of output: 2004


🏁 Script executed:

# Find setupOriginForBatchKey definition
rg -A 15 "func.*setupOriginForBatchKey" internal/consistency/repair/table_repair.go

Repository: pgEdge/ace

Length of output: 641


🏁 Script executed:

# Find groupUpsertsByOrigin definition to understand batching
rg -A 20 "func.*groupUpsertsByOrigin" internal/consistency/repair/table_repair.go

Repository: pgEdge/ace

Length of output: 845


🏁 Script executed:

# Get the full setupOriginForBatchKey function
rg -A 40 "func \(t \*TableRepairTask\) setupOriginForBatchKey" internal/consistency/repair/table_repair.go

Repository: pgEdge/ace

Length of output: 1483


🏁 Script executed:

# Look for pg_replication_origin_xact_setup calls to understand the implementation
rg "pg_replication_origin_xact_setup|replication_origin_xact_setup" internal/consistency/repair/table_repair.go -B 2 -A 2

Repository: pgEdge/ace

Length of output: 36


🏁 Script executed:

# Look for xact setup functions
rg "setupReplicationOrigin|pg_replication" internal/consistency/repair/table_repair.go -B 1 -A 2

Repository: pgEdge/ace

Length of output: 1589


🏁 Script executed:

# Get more of the setupOriginForBatchKey function
rg -A 50 "func \(t \*TableRepairTask\) setupOriginForBatchKey" internal/consistency/repair/table_repair.go | head -60

Repository: pgEdge/ace

Length of output: 2172


🏁 Script executed:

# Check if there's a reset between batches
rg -A 20 "for batchKey, originUpserts := range originGroups" internal/consistency/repair/table_repair.go

Repository: pgEdge/ace

Length of output: 803


🏁 Script executed:

# Look at resetReplicationOriginXact implementation
rg -A 8 "func \(t \*TableRepairTask\) resetReplicationOriginXact" internal/consistency/repair/table_repair.go

Repository: pgEdge/ace

Length of output: 417


🏁 Script executed:

# Look at the full setupOriginForBatchKey to see the complete xact setup logic
sed -n '2250,2300p' internal/consistency/repair/table_repair.go

Repository: pgEdge/ace

Length of output: 1383


Per-origin batches in a single transaction cannot preserve per-batch timestamps/origins.
PostgreSQL's pg_replication_origin_xact_setup() is transaction-scoped; each call overwrites the previous one. The code iterates through multiple origin batches in a single transaction (lines 2314–2337) without resetting the xact setup between batches, so the commit timestamp and LSN recorded at transaction commit will be from the last batch only, not per-batch. All rows inherit the final batch's origin metadata, defeating the preserve-origin purpose.

Fix: Run each origin batch in its own transaction, or explicitly call pg_replication_origin_xact_reset() before each subsequent setupReplicationOriginXact() to ensure per-batch metadata is preserved.
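A rough sketch of option (A), assuming the helpers named in this review keep their current signatures; beginTx is a hypothetical helper that opens a transaction against nodeName's pool, and error handling is abbreviated. This is one possible shape of the fix, not the change actually adopted:

```go
for batchKey, originUpserts := range originGroups {
	if len(originUpserts) == 0 {
		continue
	}

	// One transaction per origin group, so the LSN/timestamp recorded at commit
	// time is the one set up for this batch.
	batchTx, err := beginTx(task.Ctx, nodeName) // hypothetical helper
	if err != nil {
		return totalUpsertedCount, err
	}

	if _, err := task.setupOriginForBatchKey(batchTx, batchKey, nodeName, setupSessions); err != nil {
		_ = batchTx.Rollback(task.Ctx)
		return totalUpsertedCount, err
	}

	count, err := executeUpsertBatch(batchTx, task, originUpserts, colTypes)
	if err != nil {
		_ = batchTx.Rollback(task.Ctx)
		return totalUpsertedCount, err
	}

	if err := batchTx.Commit(task.Ctx); err != nil {
		return totalUpsertedCount, err
	}
	totalUpsertedCount += count
}
// The session-level origin selection still needs to be reset once all batches
// on this node are done, as the existing code already does.
```

Option (B) — calling pg_replication_origin_xact_reset() between batches inside one transaction — avoids the extra transactions, but since a transaction commits with a single origin LSN/timestamp, (A) is the more faithful choice when per-batch timestamps matter.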

🤖 Prompt for AI Agents
In `@internal/consistency/repair/table_repair.go` around lines 2301 - 2343, The
current loop in executeUpserts processes multiple originGroups in a single
transaction so pg_replication_origin_xact_setup() is overwritten and all rows
get the last batch's origin; fix by ensuring each origin group runs in its own
transaction or by resetting the origin xact before re-setting it: update
executeUpserts (referencing executeUpsertBatch, task.setupOriginForBatchKey,
task.resetReplicationOriginXact, task.resetReplicationOriginSession) to either
(A) begin a fresh transaction for each batchKey, call
task.setupOriginForBatchKey, run executeUpsertBatch, commit that transaction and
then reset session as needed, or (B) if keeping one outer tx, call
task.resetReplicationOriginXact() (and handle errors) before calling
task.setupOriginForBatchKey for the next batch so per-batch origin
timestamps/LSNs are preserved.

Comment on lines +2736 to +2750
	// Find a survivor node to fetch LSN from
	var survivorNode string
	for poolNode := range t.Pools {
		if poolNode != batchKey.nodeOrigin && poolNode != nodeName {
			survivorNode = poolNode
			break
		}
	}
	if survivorNode == "" && t.SourceOfTruth != "" && t.SourceOfTruth != batchKey.nodeOrigin {
		survivorNode = t.SourceOfTruth
	}

	if survivorNode == "" {
		return nil, false, fmt.Errorf("no survivor node available to fetch LSN for origin node %s", batchKey.nodeOrigin)
	}

⚠️ Potential issue | 🟠 Major

Graceful fallback should not hard‑fail when no survivor is available.
resolveOriginLSN returns an error when it can’t pick a survivor node, which aborts repairs even though preserve‑origin is supposed to degrade. Consider warning and returning (nil, false, nil) instead.

Suggested change
-	if survivorNode == "" {
-		return nil, false, fmt.Errorf("no survivor node available to fetch LSN for origin node %s", batchKey.nodeOrigin)
-	}
+	if survivorNode == "" {
+		logger.Warn("preserve-origin: no survivor node available to fetch LSN for origin node %s; falling back to regular repair", batchKey.nodeOrigin)
+		return nil, false, nil
+	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
	// Find a survivor node to fetch LSN from
	var survivorNode string
	for poolNode := range t.Pools {
		if poolNode != batchKey.nodeOrigin && poolNode != nodeName {
			survivorNode = poolNode
			break
		}
	}
	if survivorNode == "" && t.SourceOfTruth != "" && t.SourceOfTruth != batchKey.nodeOrigin {
		survivorNode = t.SourceOfTruth
	}
	if survivorNode == "" {
		return nil, false, fmt.Errorf("no survivor node available to fetch LSN for origin node %s", batchKey.nodeOrigin)
	}

	// Find a survivor node to fetch LSN from
	var survivorNode string
	for poolNode := range t.Pools {
		if poolNode != batchKey.nodeOrigin && poolNode != nodeName {
			survivorNode = poolNode
			break
		}
	}
	if survivorNode == "" && t.SourceOfTruth != "" && t.SourceOfTruth != batchKey.nodeOrigin {
		survivorNode = t.SourceOfTruth
	}
	if survivorNode == "" {
		logger.Warn("preserve-origin: no survivor node available to fetch LSN for origin node %s; falling back to regular repair", batchKey.nodeOrigin)
		return nil, false, nil
	}
🤖 Prompt for AI Agents
In `@internal/consistency/repair/table_repair.go` around lines 2736 - 2750, The
code in resolveOriginLSN (the block that picks survivorNode using t.Pools and
t.SourceOfTruth and currently returns fmt.Errorf when none found) should not
hard-fail; instead log or warn and return (nil, false, nil) to allow repairs to
continue in degraded preserve-origin mode. Change the branch that checks
survivorNode == "" so it emits a warning mentioning batchKey.nodeOrigin (use the
existing logger or fmt.Printf/processLogger) and then returns nil, false, nil
rather than an error; keep the survivor selection logic using t.Pools and
t.SourceOfTruth unchanged.

@mason-sharp mason-sharp marked this pull request as draft February 12, 2026 00:28