Add origin and timestamp preservation to table repair. #73
Conversation
This commit introduces logic to preserve replication origin metadata and commit timestamps during table repair operations. When enabled, repairs now maintain the original source node and timestamp information instead of marking repaired rows with current repair-time metadata.

The implementation extracts origin information from source row metadata and applies it during repair using PostgreSQL's replication origin API. Timestamp parsing supports multiple formats including RFC3339, PostgreSQL text format, and numeric Unix timestamps with automatic scale detection (seconds, milliseconds, microseconds, nanoseconds). Error handling ensures proper session cleanup and graceful degradation when origin metadata is unavailable.

Origin preservation works across repair modes including standard repair, fix-nulls updates, and bidirectional synchronization. New integration tests verify correct preservation of timestamps, origin node IDs, and row content integrity throughout the repair process.
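The "automatic scale detection" for numeric timestamps can be sketched as follows. This is a hypothetical reconstruction of what a function like `parseNumericTimestamp` might do, based only on the description above; the thresholds and name are illustrative, not the PR's actual code:

```go
package main

import (
	"fmt"
	"time"
)

// parseNumericTimestamp guesses the scale of a numeric Unix timestamp
// from its magnitude: current-era timestamps are ~1.7e9 in seconds,
// ~1.7e12 in ms, ~1.7e15 in µs, and ~1.7e18 in ns.
func parseNumericTimestamp(v int64) time.Time {
	switch {
	case v >= 1e18: // nanoseconds
		return time.Unix(0, v)
	case v >= 1e15: // microseconds
		return time.Unix(0, v*1e3)
	case v >= 1e12: // milliseconds
		return time.Unix(0, v*1e6)
	default: // seconds
		return time.Unix(v, 0)
	}
}

func main() {
	fmt.Println(parseNumericTimestamp(1704067200).UTC())    // interpreted as seconds
	fmt.Println(parseNumericTimestamp(1704067200000).UTC()) // interpreted as milliseconds
}
```

Both calls above resolve to the same instant, which is the point of scale detection: the caller does not need to know which unit the source emitted.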
📝 Walkthrough
This pull request introduces replication origin preservation during table repairs. It adds database query functions for managing replication origins, extends the CLI and HTTP API with a preserve-origin option, implements origin-aware repair logic with per-row timestamp tracking, and includes comprehensive test coverage and documentation updates.
For detailed reference see PR.
Actionable comments posted: 4
🤖 Fix all issues with AI agents
In `@db/queries/templates.go`:
- Around line 1552-1569: Add a brief comment above the replication origin SQL
templates (near GetReplicationOriginByName, CreateReplicationOrigin,
SetupReplicationOriginSession, ResetReplicationOriginSession,
SetupReplicationOriginXact, ResetReplicationOriginXact) stating that the
pg_replication_origin_* functions require PostgreSQL 9.5 or newer and that
calling them typically requires superuser or equivalent replication/owner
privileges; also add a short note to the README or database setup docs
documenting the minimum PostgreSQL version (>=9.5) and the privilege
requirements for these functions.
In `@internal/consistency/repair/table_repair_batch_test.go`:
- Around line 217-224: The comment for TestParseNumericTimestamp incorrectly
states that Unix seconds value 1704067200 corresponds to 2024-01-01 12:00:00
UTC; update the comment to the correct timestamp (2024-01-01 00:00:00 UTC) so it
accurately documents the expected value used in the sec variable and
parseNumericTimestamp test; locate the comment in TestParseNumericTimestamp near
the sec := time.Unix(1704067200, 0) line and change the annotation only.
In `@internal/consistency/repair/table_repair.go`:
- Around line 2736-2750: The code in resolveOriginLSN (the block that picks
survivorNode using t.Pools and t.SourceOfTruth and currently returns fmt.Errorf
when none found) should not hard-fail; instead log or warn and return (nil,
false, nil) to allow repairs to continue in degraded preserve-origin mode.
Change the branch that checks survivorNode == "" so it emits a warning
mentioning batchKey.nodeOrigin (use the existing logger or
fmt.Printf/processLogger) and then returns nil, false, nil rather than an error;
keep the survivor selection logic using t.Pools and t.SourceOfTruth unchanged.
- Around line 2301-2343: The current loop in executeUpserts processes multiple
originGroups in a single transaction so pg_replication_origin_xact_setup() is
overwritten and all rows get the last batch's origin; fix by ensuring each
origin group runs in its own transaction or by resetting the origin xact before
re-setting it: update executeUpserts (referencing executeUpsertBatch,
task.setupOriginForBatchKey, task.resetReplicationOriginXact,
task.resetReplicationOriginSession) to either (A) begin a fresh transaction for
each batchKey, call task.setupOriginForBatchKey, run executeUpsertBatch, commit
that transaction and then reset session as needed, or (B) if keeping one outer
tx, call task.resetReplicationOriginXact() (and handle errors) before calling
task.setupOriginForBatchKey for the next batch so per-batch origin
timestamps/LSNs are preserved.
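For the first item above (documenting the replication-origin templates), the suggested comment might look like the sketch below. The template strings and constant names are illustrative guesses at what `db/queries/templates.go` contains; only the `pg_replication_origin_*` function names come from PostgreSQL itself:

```go
package main

import "fmt"

// Replication origin management templates.
//
// NOTE: The pg_replication_origin_* functions require PostgreSQL 9.5 or
// newer, and calling them typically requires superuser (or equivalent
// replication/owner) privileges.
const (
	GetReplicationOriginByName    = `SELECT roident FROM pg_replication_origin WHERE roname = $1`
	CreateReplicationOrigin       = `SELECT pg_replication_origin_create($1)`
	SetupReplicationOriginSession = `SELECT pg_replication_origin_session_setup($1)`
	ResetReplicationOriginSession = `SELECT pg_replication_origin_session_reset()`
	SetupReplicationOriginXact    = `SELECT pg_replication_origin_xact_setup($1, $2)`
	ResetReplicationOriginXact    = `SELECT pg_replication_origin_xact_reset()`
)

func main() {
	fmt.Println(SetupReplicationOriginXact)
}
```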
🧹 Nitpick comments (1)
pkg/logger/logger.go (1)
31-33: Accept io.Writer instead of *os.File in SetOutput.

The underlying Log.SetOutput accepts io.Writer. Restricting the wrapper to *os.File is unnecessarily limiting and goes against the Go best practice of accepting interfaces, not concrete types. Since *os.File implements io.Writer, this change is fully backward compatible while enabling greater flexibility for testing and custom output destinations.

♻️ Proposed refactor

```diff
 import (
 	"fmt"
+	"io"
 	"os"

 	"github.com/charmbracelet/log"
 )
@@
-func SetOutput(w *os.File) {
+func SetOutput(w io.Writer) {
 	Log.SetOutput(w)
 }
```
```go
// TestParseNumericTimestamp tests that Unix timestamps in seconds, ms, µs, and ns are converted correctly.
func TestParseNumericTimestamp(t *testing.T) {
	// Unix seconds: 1704067200 = 2024-01-01 12:00:00 UTC
	sec := time.Unix(1704067200, 0)
	assert.Equal(t, sec, parseNumericTimestamp(1704067200))
	// Unix milliseconds: 1704067200000 ms
	ms := time.Unix(0, 1704067200000*1e6)
	assert.Equal(t, ms, parseNumericTimestamp(1704067200000))
```
Correct the Unix timestamp annotation.
1704067200 is 2024-01-01 00:00:00 UTC, not 12:00:00 UTC, so the comment is misleading.
Proposed correction

```diff
- // Unix seconds: 1704067200 = 2024-01-01 12:00:00 UTC
+ // Unix seconds: 1704067200 = 2024-01-01 00:00:00 UTC
```
```go
// executeUpserts handles upserting rows in batches.
// originInfoMap maps primary key strings to their origin information.
// If originInfoMap is nil or empty, origin tracking is skipped.
func executeUpserts(tx pgx.Tx, task *TableRepairTask, nodeName string, upserts map[string]map[string]any, colTypes map[string]string, originInfoMap map[string]*rowOriginInfo) (int, error) {
	if err := task.filterStaleRepairs(task.Ctx, tx, nodeName, upserts, colTypes, "upsert"); err != nil {
		return 0, err
	}

	originGroups := groupUpsertsByOrigin(upserts, originInfoMap, task.PreserveOrigin)

	totalUpsertedCount := 0
	setupSessions := make(map[string]bool)

	for batchKey, originUpserts := range originGroups {
		if len(originUpserts) == 0 {
			continue
		}

		preserveThisGroup, err := task.setupOriginForBatchKey(tx, batchKey, nodeName, setupSessions)
		if err != nil {
			return totalUpsertedCount, err
		}

		count, err := executeUpsertBatch(tx, task, originUpserts, colTypes)
		if err != nil {
			if preserveThisGroup {
				if resetErr := task.resetReplicationOriginXact(tx); resetErr != nil {
					logger.Warn("failed to reset replication origin xact during cleanup: %v", resetErr)
				}
				if resetErr := task.resetReplicationOriginSession(tx); resetErr != nil {
					logger.Warn("failed to reset replication origin session during cleanup: %v", resetErr)
				}
			}
			return totalUpsertedCount, err
		}
		totalUpsertedCount += count
	}

	if len(setupSessions) > 0 {
		if err := task.resetReplicationOriginSession(tx); err != nil {
			return totalUpsertedCount, fmt.Errorf("failed to reset replication origin session after upserts: %w", err)
		}
	}
```
🧩 Analysis chain
🌐 Web query:
PostgreSQL pg_replication_origin_xact_setup transaction scope multiple calls
💡 Result:
pg_replication_origin_xact_setup(origin_lsn, origin_timestamp) is transaction-scoped: it “marks the current transaction” as replaying a remote transaction (and requires that a replication origin has already been selected with pg_replication_origin_session_setup()). [1][3]
If you call pg_replication_origin_xact_setup() multiple times within the same transaction, PostgreSQL simply overwrites the per-transaction origin fields (origin_lsn, origin_timestamp) each time—there’s no stacking/accumulation; the last call in that transaction is what remains in effect. [2]
If you need to undo it within the same transaction, use pg_replication_origin_xact_reset(), which “cancels the effects” of pg_replication_origin_xact_setup() (without clearing the session’s selected origin). [1][2]
Sources:
[1] PostgreSQL 18 docs – System Administration Functions (pg_replication_origin_xact_setup/reset)
[2] PostgreSQL source (origin.c) – implementation shows it just assigns/overwrites replorigin_xact_state.origin_lsn and origin_timestamp
[3] PostgreSQL docs – Replication Progress Tracking / replication origins overview
🌐 Web query:
PostgreSQL commit timestamp replication origin per-transaction or per-statement
💡 Result:
PostgreSQL’s commit timestamp is tracked per transaction (per XID), not per statement.
- pg_xact_commit_timestamp(xid) returns the commit timestamp of a transaction (one timestamp for the whole XID). If multiple statements run inside the same transaction, they all share that same commit timestamp. [1]
- pg_xact_commit_timestamp_origin(xid) likewise reports the transaction’s commit timestamp plus its replication origin. [1]
- Replication-origin commit timestamps are also set per transaction: pg_replication_origin_xact_setup() lets you configure the LSN and commit timestamp of every source transaction “on a per transaction basis”. [2]
- PostgreSQL core developers explicitly describe it as “not per row but per xact”. [3]
Implication: you only effectively get “per-statement” timestamps if each statement runs in its own transaction (autocommit). [1][2][3]
Sources: [1] https://access.crunchydata.com/documentation/postgresql15/15.13/functions-info.html, [2] https://www.postgresql.org/docs/current/replication-origins.html, [3] https://www.postgresql.org/message-id/1b2e2697-76fc-7d8f-1ca6-588093531d7d%40anayrat.info
🏁 Scripts executed against the pgEdge/ace repository (outputs elided):

```shell
find . -type f -name "table_repair.go" | head -5
cat -n internal/consistency/repair/table_repair.go | sed -n '2301,2343p'
# Find setupOriginForBatchKey definition
rg -A 15 "func.*setupOriginForBatchKey" internal/consistency/repair/table_repair.go
# Find groupUpsertsByOrigin definition to understand batching
rg -A 20 "func.*groupUpsertsByOrigin" internal/consistency/repair/table_repair.go
# Get the full setupOriginForBatchKey function
rg -A 40 "func \(t \*TableRepairTask\) setupOriginForBatchKey" internal/consistency/repair/table_repair.go
# Look for pg_replication_origin_xact_setup calls to understand the implementation
rg "pg_replication_origin_xact_setup|replication_origin_xact_setup" internal/consistency/repair/table_repair.go -B 2 -A 2
# Look for xact setup functions
rg "setupReplicationOrigin|pg_replication" internal/consistency/repair/table_repair.go -B 1 -A 2
# Get more of the setupOriginForBatchKey function
rg -A 50 "func \(t \*TableRepairTask\) setupOriginForBatchKey" internal/consistency/repair/table_repair.go | head -60
# Check if there's a reset between batches
rg -A 20 "for batchKey, originUpserts := range originGroups" internal/consistency/repair/table_repair.go
# Look at resetReplicationOriginXact implementation
rg -A 8 "func \(t \*TableRepairTask\) resetReplicationOriginXact" internal/consistency/repair/table_repair.go
# Look at the full setupOriginForBatchKey to see the complete xact setup logic
sed -n '2250,2300p' internal/consistency/repair/table_repair.go
```
Per-origin batches in a single transaction cannot preserve per-batch timestamps/origins.
PostgreSQL's pg_replication_origin_xact_setup() is transaction-scoped; each call overwrites the previous one. The code iterates through multiple origin batches in a single transaction (lines 2314–2337) without resetting the xact setup between batches, so the commit timestamp and LSN recorded at transaction commit will be from the last batch only, not per-batch. All rows inherit the final batch's origin metadata, defeating the preserve-origin purpose.
Fix: Run each origin batch in its own transaction, or explicitly call pg_replication_origin_xact_reset() before each subsequent setupReplicationOriginXact() to ensure per-batch metadata is preserved.
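The failure mode can be illustrated with a toy model of transaction-scoped origin state. This is purely illustrative; the real state for pg_replication_origin_xact_setup() lives inside the PostgreSQL server, but the last-write-wins semantics are the same:

```go
package main

import "fmt"

// txState mimics per-transaction replication-origin fields: like
// pg_replication_origin_xact_setup, each call simply overwrites them,
// so only the last call before commit takes effect.
type txState struct {
	originLSN string
	originTS  string
}

func (tx *txState) xactSetup(lsn, ts string) {
	tx.originLSN = lsn // overwrite; no stacking per batch
	tx.originTS = ts
}

func main() {
	tx := &txState{}
	batches := [][2]string{
		{"0/AAA111", "2024-01-01 00:00:00"},
		{"0/BBB222", "2024-02-01 00:00:00"},
		{"0/CCC333", "2024-03-01 00:00:00"},
	}
	for _, b := range batches {
		tx.xactSetup(b[0], b[1]) // no reset or commit between batches...
	}
	// ...so commit would record only the last batch's metadata:
	fmt.Println(tx.originLSN, tx.originTS)
}
```

This is why the fix requires either one transaction per origin batch, or an explicit pg_replication_origin_xact_reset() before each subsequent setup.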
```go
// Find a survivor node to fetch LSN from
var survivorNode string
for poolNode := range t.Pools {
	if poolNode != batchKey.nodeOrigin && poolNode != nodeName {
		survivorNode = poolNode
		break
	}
}
if survivorNode == "" && t.SourceOfTruth != "" && t.SourceOfTruth != batchKey.nodeOrigin {
	survivorNode = t.SourceOfTruth
}

if survivorNode == "" {
	return nil, false, fmt.Errorf("no survivor node available to fetch LSN for origin node %s", batchKey.nodeOrigin)
}
```
Graceful fallback should not hard‑fail when no survivor is available.
resolveOriginLSN returns an error when it can’t pick a survivor node, which aborts repairs even though preserve-origin is supposed to degrade gracefully. Consider logging a warning and returning (nil, false, nil) instead.
Suggested change

```diff
-	if survivorNode == "" {
-		return nil, false, fmt.Errorf("no survivor node available to fetch LSN for origin node %s", batchKey.nodeOrigin)
-	}
+	if survivorNode == "" {
+		logger.Warn("preserve-origin: no survivor node available to fetch LSN for origin node %s; falling back to regular repair", batchKey.nodeOrigin)
+		return nil, false, nil
+	}
```