Conversation

@ibrarahmad (Contributor)

Implement preserve-origin feature that maintains the original replication origin node ID and timestamp when repairing rows during recovery scenarios. This prevents replication conflicts when the origin node returns to the cluster.

Test Scenario

  • Source Node: n3 (has 70 rows with origin_id=49708, timestamp=2026-01-15 21:17:31)
  • Target Node: n2 (initially empty, needs recovery)
  • Test Rows: IDs 21-25 (a sample of the recovered rows)
  • Original Origin: node_n1 (origin_id=49708 on n3)
| Test Case | Row ID | Source Timestamp | Source Origin | Recovered Timestamp | Recovered Origin | Timestamp Preserved | Origin Preserved | Both Preserved |
|---|---|---|---|---|---|---|---|---|
| WITHOUT patch (no `--preserve-origin`) | 21 | 2026-01-15 21:17:31+05 | node_n1 | 2026-01-15 21:29:XX+05 | NULL/different | NO | NO | NO |
| | 22 | 2026-01-15 21:17:31+05 | node_n1 | 2026-01-15 21:29:XX+05 | NULL/different | NO | NO | NO |
| | 23 | 2026-01-15 21:17:31+05 | node_n1 | 2026-01-15 21:29:XX+05 | NULL/different | NO | NO | NO |
| | 24 | 2026-01-15 21:17:31+05 | node_n1 | 2026-01-15 21:29:XX+05 | NULL/different | NO | NO | NO |
| | 25 | 2026-01-15 21:17:31+05 | node_n1 | 2026-01-15 21:29:XX+05 | NULL/different | NO | NO | NO |
| WITH patch (`--preserve-origin`) | 21 | 2026-01-15 21:17:31+05 | node_n1 | 2026-01-15 21:17:31+05 | node_n1 | YES | YES | YES |
| | 22 | 2026-01-15 21:17:31+05 | node_n1 | 2026-01-15 21:17:31+05 | node_n1 | YES | YES | YES |
| | 23 | 2026-01-15 21:17:31+05 | node_n1 | 2026-01-15 21:17:31+05 | node_n1 | YES | YES | YES |
| | 24 | 2026-01-15 21:17:31+05 | node_n1 | 2026-01-15 21:17:31+05 | node_n1 | YES | YES | YES |
| | 25 | 2026-01-15 21:17:31+05 | node_n1 | 2026-01-15 21:17:31+05 | node_n1 | YES | YES | YES |
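
One way to spot-check rows like these is to read the recorded commit timestamp straight from PostgreSQL. A minimal sketch (not taken from the PR's test suite), assuming track_commit_timestamp is enabled on the node and using an illustrative table name and DSN:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/jackc/pgx/v5"
)

func main() {
	ctx := context.Background()
	// Hypothetical DSN for the recovered node (n2).
	conn, err := pgx.Connect(ctx, "postgres://user:pass@n2:5432/db")
	if err != nil {
		panic(err)
	}
	defer conn.Close(ctx)

	// pg_xact_commit_timestamp(xmin) returns the commit time of the transaction
	// that last wrote the row; NULL if the timestamp was not tracked.
	var ts *time.Time
	err = conn.QueryRow(ctx,
		`SELECT pg_xact_commit_timestamp(xmin) FROM public.test_table WHERE id = $1`,
		21,
	).Scan(&ts)
	if err != nil {
		panic(err)
	}
	fmt.Println("commit timestamp for id 21:", ts)
}
```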

Implement preserve-origin feature that maintains original replication origin
node ID and LSN when repairing rows during recovery scenarios. This prevents
replication conflicts when the origin node returns to the cluster.

@mason-sharp (Member) left a comment

A couple of comments.

Also, please add tests.

Ibrar Ahmed added 2 commits February 2, 2026 18:15
Enhances the preserve-origin feature to maintain per-row timestamp accuracy
during table repairs. Each unique timestamp now gets its own transaction,
ensuring rows maintain their original commit timestamps with microsecond
precision. This is critical for temporal ordering and conflict resolution in
distributed database recovery scenarios.

Key changes: refactored grouping to use (origin, LSN, timestamp) tuples,
implemented per-timestamp transaction management, changed timestamp format
from RFC3339 to RFC3339Nano for microsecond precision, added unit tests for
batch key functions, and moved replication origin resets after commit.

Enhances preserve-origin documentation to describe how each unique timestamp
gets its own transaction for microsecond-precision preservation. Critical for
temporal ordering and conflict resolution in recovery scenarios.

Key changes: added preserve-origin flag to API docs, expanded table-repair
command documentation with per-row timestamp details, and updated HTTP API
documentation with behavior notes.
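
To make the grouping described above concrete, here is a minimal sketch of a (origin, LSN, timestamp) batch key. Only the originBatchKey name appears in the changeset; the repairRow type, the field names, and the helper are assumptions:

```go
package repair

import "time"

// repairRow is a stand-in for a row plus its origin metadata (illustrative).
type repairRow struct {
	OriginName string
	OriginLSN  uint64
	CommitTS   time.Time
}

// originBatchKey groups rows that must share one transaction so that the
// origin LSN and commit timestamp can be stamped once per group. Formatting
// the timestamp with RFC3339Nano keeps microsecond precision in the key.
type originBatchKey struct {
	originName string
	originLSN  uint64
	commitTS   string
}

// groupByOrigin buckets rows by (origin, LSN, timestamp); each bucket is
// later applied in its own transaction.
func groupByOrigin(rows []repairRow) map[originBatchKey][]repairRow {
	groups := make(map[originBatchKey][]repairRow)
	for _, r := range rows {
		key := originBatchKey{
			originName: r.OriginName,
			originLSN:  r.OriginLSN,
			commitTS:   r.CommitTS.Format(time.RFC3339Nano),
		}
		groups[key] = append(groups[key], r)
	}
	return groups
}
```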
@ibrarahmad (Contributor, Author) replied:

> A couple of comments.
>
> Also, please add tests.

Test case added.

@ibrarahmad reopened this Feb 2, 2026
Ibrar Ahmed added 2 commits February 2, 2026 19:39
…mode

After insert-only repair with source-of-truth n1:
- All rows from n1 are copied to n2 (upserted)
- n2's unique rows are preserved (not deleted)
- Result: n1 has 0 unique rows, n2 has 2 unique rows

Updated test assertions from incorrect expectations (2, 4, 4) to
correct values (0, 2, 2) that match actual insert-only behavior.
…vation

Tests that preserve-origin flag correctly preserves commit timestamps and
replication origin metadata during table repair recovery operations.
@coderabbitai (bot) commented Feb 4, 2026

Warning

Rate limit exceeded

@ibrarahmad has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 19 minutes and 55 seconds before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

📝 Walkthrough

Adds preserve-origin support across repair logic, CLI and HTTP surfaces, DB templates/queries, docs, and tests; also sets default_cluster in ace.yaml to "test-cluster".

Changes

| Cohort / File(s) | Summary |
|---|---|
| **Configuration**<br>`ace.yaml` | Set default_cluster from an empty string to "test-cluster". |
| **DB templates & queries**<br>`db/queries/templates.go`, `db/queries/queries.go` | Added six SQL templates and matching query functions to manage replication origins, sessions, and transactions: GetReplicationOriginByName, CreateReplicationOrigin, SetupReplicationOriginSession, ResetReplicationOriginSession, SetupReplicationOriginXact, ResetReplicationOriginXact (see the sketch after this table). |
| **Repair core logic**<br>`internal/consistency/repair/table_repair.go` | Added PreserveOrigin to TableRepairTask; introduced origin metadata types and helpers, propagated sourceRow through null/upsert flows, grouped upserts by origin/timestamp, resolved/fetched origin LSNs, and added per-origin session/xact setup/reset with many updated signatures and call sites. |
| **CLI & HTTP surface**<br>`internal/cli/cli.go`, `internal/api/http/handler.go` | Added --preserve-origin CLI flag and optional preserve_origin HTTP field; propagated the value into TableRepairTask.PreserveOrigin. |
| **Documentation**<br>`docs/api.md`, `docs/http-api.md`, `docs/commands/repair/table-repair.md` | Documented the --preserve-origin flag and preserve_origin request field; added a detailed "Preserving replication origin" section for table-repair. |
| **Tests & helpers**<br>`internal/consistency/repair/table_repair_batch_test.go`, `tests/integration/...`, `tests/integration/helpers_test.go` | Added unit tests for originBatchKey and timestamp precision, new integration tests (preserve-origin, NULL-fix flows), helper functions for commit timestamps and replication origin, and adjusted an integration test to disable preserve-origin when the origin LSN is unavailable. |
| **Logger**<br>`pkg/logger/logger.go` | Added exported SetOutput(*os.File) to allow redirecting logger output. |
| **Dependencies**<br>`go.mod` | Bumped github.com/mattn/go-sqlite3 from v1.14.22 to v1.14.33. |
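
The six query helpers wrap PostgreSQL's built-in replication origin machinery. A sketch of the underlying sequence using pgx directly: the pg_replication_origin_* functions are standard PostgreSQL, while the function shape and variable names here are illustrative, not the repo's actual code.

```go
package repair

import (
	"context"
	"time"

	"github.com/jackc/pgx/v5"
)

// applyWithOrigin applies a batch inside one transaction stamped with the
// original origin, LSN, and commit timestamp. Calling the
// pg_replication_origin_* functions requires sufficient privileges.
func applyWithOrigin(ctx context.Context, conn *pgx.Conn,
	origin, lsn string, ts time.Time, apply func(pgx.Tx) error) error {

	// Create the origin if it does not exist yet (maps to
	// GetReplicationOriginByName / CreateReplicationOrigin).
	var exists bool
	if err := conn.QueryRow(ctx,
		`SELECT EXISTS (SELECT 1 FROM pg_replication_origin WHERE roname = $1)`,
		origin).Scan(&exists); err != nil {
		return err
	}
	if !exists {
		if _, err := conn.Exec(ctx,
			`SELECT pg_replication_origin_create($1)`, origin); err != nil {
			return err
		}
	}

	// Bind the session to the origin (SetupReplicationOriginSession) and make
	// sure it is unbound again before the connection returns to a pool.
	if _, err := conn.Exec(ctx,
		`SELECT pg_replication_origin_session_setup($1)`, origin); err != nil {
		return err
	}
	defer conn.Exec(ctx, `SELECT pg_replication_origin_session_reset()`)

	tx, err := conn.Begin(ctx)
	if err != nil {
		return err
	}
	defer tx.Rollback(ctx) // no-op after a successful commit

	// Stamp this transaction with the original LSN and commit timestamp
	// (SetupReplicationOriginXact).
	if _, err := tx.Exec(ctx,
		`SELECT pg_replication_origin_xact_setup($1::pg_lsn, $2)`, lsn, ts); err != nil {
		return err
	}
	if err := apply(tx); err != nil {
		return err
	}
	return tx.Commit(ctx)
}
```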

Poem

🐇 I nibble LSNs beneath the moon,
I tuck each timestamp safe and soon,
I hop through origins, stitch by stitch,
I keep each row's true time in niche,
A rabbit hums — repair is tuned.

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 48.72%, below the required threshold of 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
|---|---|---|
| Title check | ✅ Passed | The title directly reflects the main change: implementing a preserve-origin feature with timestamp handling for table repair operations, matching the primary objective of the PR. |
| Description check | ✅ Passed | The description is clearly related to the changeset, providing a detailed test scenario and explaining the preserve-origin feature's purpose in preventing replication conflicts during recovery. |


@coderabbitai (bot) left a comment

Actionable comments posted: 8

🤖 Fix all issues with AI agents
In `@docs/commands/repair/table-repair.md`:
- Line 33: The docs refer to a negated flag that doesn't exist: update the
documentation for the CLI flag --preserve-origin (and all occurrences in the
section lines 73-121) to reflect the actual behavior by either adding an
explicit negated flag to the CLI (--no-preserve-origin) in the command
implementation or changing the docs to show the flag as --preserve-origin=false
and state the correct default; specifically, locate the CLI flag definition
named --preserve-origin and either implement a corresponding
--no-preserve-origin boolean switch in the command parser (mirroring how other
negated flags are handled) or edit the markdown entries (including the table row
and any “enabled by default” text) to indicate the real default value and usage
form (--preserve-origin or --preserve-origin=false).

In `@internal/cli/cli.go`:
- Around line 226-230: Decide which default you want and make the code/docs
consistent: if the intended default is true, set the cli BoolFlag Value for Name
"preserve-origin" to true and update the struct default in table_repair.go to
true, and in the flag handling use ctx.IsSet("preserve-origin") to only override
the struct default when the user explicitly supplies the flag; if the intended
default is false, update docs (docs/api.md and
docs/commands/repair/table-repair.md) to state the default is false and correct
references to "--no-preserve-origin" to instead show "--preserve-origin=false"
(or document the exact syntax supported by urfave/cli v2). Ensure the flag
Value and the table-repair struct default stay aligned as the single source of
truth.
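
For reference, the IsSet-gated override this comment describes could be wired roughly as follows in urfave/cli v2. The flag name and the PreserveOrigin field come from the PR; the app skeleton and task stub are illustrative:

```go
package main

import (
	"log"
	"os"

	"github.com/urfave/cli/v2"
)

// tableRepairTask stands in for the real TableRepairTask; only the
// PreserveOrigin field is taken from the PR.
type tableRepairTask struct {
	PreserveOrigin bool
}

func main() {
	app := &cli.App{
		Name: "repair-sketch",
		Flags: []cli.Flag{
			&cli.BoolFlag{
				Name:  "preserve-origin",
				Value: true, // intended default; keep in sync with the struct default
				Usage: "preserve replication origin and commit timestamp during repair",
			},
		},
		Action: func(ctx *cli.Context) error {
			task := tableRepairTask{PreserveOrigin: true} // struct default kept aligned
			// Only override when the user explicitly supplied the flag,
			// e.g. --preserve-origin=false.
			if ctx.IsSet("preserve-origin") {
				task.PreserveOrigin = ctx.Bool("preserve-origin")
			}
			log.Printf("preserve-origin = %v", task.PreserveOrigin)
			return nil
		},
	}
	if err := app.Run(os.Args); err != nil {
		log.Fatal(err)
	}
}
```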

In `@internal/consistency/repair/table_repair.go`:
- Around line 1087-1091: The current update logic sets nu.sourceRow whenever
sourceRow is non-empty even if nu.sourceRow is already set, which can overwrite
the original source; change the condition to only set nu.sourceRow if it is
currently nil and the incoming sourceRow is non-empty (i.e., require
nu.sourceRow == nil && len(sourceRow) > 0) so the first non-nil source is
retained; locate this change around the nu.sourceRow assignment in the table
repair code (the block referencing nu.sourceRow and sourceRow) and adjust the
conditional accordingly.
- Around line 2700-2751: Replace the direct use of task.QualifiedTableName with
a sanitized identifier built via pgx.Identifier{task.Schema,
task.Table}.Sanitize() when writing the INSERT target (replace occurrences
around upsertSQL.WriteString(task.QualifiedTableName)), and after building
setClauses handle the empty case by emitting "ON CONFLICT (...) DO NOTHING"
instead of "DO UPDATE SET " when setClauses is empty; otherwise emit "DO UPDATE
SET " + strings.Join(setClauses, ", "). Use the existing variables upsertSQL,
task.Schema, task.Table, task.Key and setClauses so the SQL remains consistent
and valid even for tables that only have primary key columns. (A sketch of this
appears after these fix instructions.)
- Around line 3191-3230: The pool is getting connections with replication origin
state because resetReplicationOriginXactOnConnection and
resetReplicationOriginSessionOnConnection acquire new connections instead of
resetting the original transaction connection that executed the replicated SQL;
update the logic so the reset is performed on the same connection/tx (e.g., add
overloads or change callers to call resetReplicationOriginXact(tx pgx.Tx) /
resetReplicationOriginSession(tx pgx.Tx) using the original pgx.Conn or pgx.Tx
before it is released, or alternatively enable pool-level
BeforeAcquire/AfterRelease hooks to run
queries.ResetReplicationOriginSession/ResetReplicationOriginXact on connection
handles) and ensure the reset is invoked prior to returning the connection to
the pool (references: resetReplicationOriginXactOnConnection,
resetReplicationOriginSessionOnConnection, resetReplicationOriginXact).

In `@tests/integration/helpers_test.go`:
- Around line 348-379: getReplicationOrigin currently ignores the captured xmin
and returns an arbitrary origin; change it to use
pg_xact_commit_timestamp_origin(xmin) to resolve the transaction's origin OID
and join that OID to pg_replication_origin to get the actual origin name (ensure
track_commit_timestamp is enabled or handle nil); alternatively, if
commit-origin tracking isn't available, modify getReplicationOrigin(t, ctx,
pool, qualifiedTableName, id) to accept an expected origin parameter and
return/compare that instead. Use the function name getReplicationOrigin, the
local variable xmin, and replace the originQuery lookup logic with the
pg_xact_commit_timestamp_origin(xmin) -> pg_replication_origin join or the added
parameter approach.
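
The suggested lookup leans on pg_xact_commit_timestamp_origin(), available in PostgreSQL 16+ with track_commit_timestamp enabled. A sketch of the join, independent of the repo's actual helper (the function shape and column names are assumptions; the table name is assumed trusted, as in a test helper):

```go
package helpers

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5/pgxpool"
)

// getRowOrigin resolves the replication origin name of the transaction that
// last wrote the given row, or "" for local/untracked transactions.
func getRowOrigin(ctx context.Context, pool *pgxpool.Pool,
	qualifiedTableName string, id int) (string, error) {

	// pg_xact_commit_timestamp_origin(xid) returns (timestamp, roident);
	// the LEFT JOIN keeps rows whose writes were local (no origin name).
	query := fmt.Sprintf(`
		SELECT o.roname
		FROM %s t
		CROSS JOIN LATERAL pg_xact_commit_timestamp_origin(t.xmin) AS x
		LEFT JOIN pg_replication_origin o ON o.roident = x.roident
		WHERE t.id = $1`, qualifiedTableName)

	var origin *string
	if err := pool.QueryRow(ctx, query, id).Scan(&origin); err != nil {
		return "", err
	}
	if origin == nil {
		return "", nil
	}
	return *origin, nil
}
```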

In `@tests/integration/table_repair_test.go`:
- Around line 1028-1044: The test currently enables/truncates on each node
without registering a safety cleanup, so failures can leave the cluster
diverged; after calling tc.setup() add a t.Cleanup that restores a consistent
state for qualifiedTableName across pgCluster (use pgCluster.Node1Pool and
Node2Pool) by enabling spock.repair_mode(true), truncating the table, then
disabling spock.repair_mode(false) for each node (use the same pool.Exec pattern
and node names as in the loop), so any mid-test failure will re-align the nodes
and avoid leaving the cluster diverged; keep tc.teardown but ensure this new
cleanup runs to guarantee cluster consistency.
- Around line 1189-1193: The cleanup currently only drops the table
(qualifiedTableName) from the DB pools (pgCluster.Node1Pool, Node2Pool,
Node3Pool) but does not remove it from the repset; update the T.Cleanup closure
to call spock.repset_remove_table(...) for the default repset (matching how
other tests do it) against each node before executing the DROP TABLE CASCADE;
ensure you use the same qualifiedTableName and the same pgxpool pools so repset
metadata is cleared prior to table removal.
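
On the identifier-sanitizing item above (see the note there), here is a sketch of the sanitized INSERT target with the DO NOTHING fallback. pgx.Identifier.Sanitize() and the ON CONFLICT forms are standard; the function shape and parameter names are assumptions mirroring the comment:

```go
package repair

import (
	"fmt"
	"strings"

	"github.com/jackc/pgx/v5"
)

// buildUpsertSQL builds an upsert whose target identifier is quoted and whose
// conflict action degrades to DO NOTHING for key-only tables.
func buildUpsertSQL(schema, table string, cols, key, setClauses []string) string {
	placeholders := make([]string, len(cols))
	for i := range cols {
		placeholders[i] = fmt.Sprintf("$%d", i+1)
	}

	var upsertSQL strings.Builder
	upsertSQL.WriteString("INSERT INTO ")
	// Sanitize() quotes each part, producing e.g. "schema"."table".
	upsertSQL.WriteString(pgx.Identifier{schema, table}.Sanitize())
	upsertSQL.WriteString(" (" + strings.Join(cols, ", ") + ")")
	upsertSQL.WriteString(" VALUES (" + strings.Join(placeholders, ", ") + ")")
	upsertSQL.WriteString(" ON CONFLICT (" + strings.Join(key, ", ") + ")")
	if len(setClauses) == 0 {
		// Nothing to update when the table is all key columns.
		upsertSQL.WriteString(" DO NOTHING")
	} else {
		upsertSQL.WriteString(" DO UPDATE SET " + strings.Join(setClauses, ", "))
	}
	return upsertSQL.String()
}
```
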
🧹 Nitpick comments (4)
ace.yaml (1)

12-12: Consider a neutral/empty default_cluster in the template.

Setting "test-cluster" as the default can steer users who omit the cluster arg toward a non-existent cluster in real deployments. If this is just for tests, consider keeping the template empty (or a neutral placeholder) to reduce surprise.

internal/consistency/repair/table_repair.go (3)

1270-1281: LSN offset calculation may cause collisions and is duplicated.

The offset calculation commitTS.UnixMicro() % 1000000 wraps every second, so two timestamps exactly 1 second apart would produce the same offset, potentially causing LSN collisions for different timestamp groups.

Additionally, this same logic is duplicated in executeUpserts (lines 2326-2332) and executeUpsertsWithTimestamps (lines 2596-2601).

Suggested refactor: extract a helper and improve uniqueness

```go
// calculateUniqueLSN generates a unique LSN by combining the base LSN with a
// timestamp-based offset. Uses the full microsecond value (larger modulo) to
// reduce collision probability.
func calculateUniqueLSN(baseLSN uint64, commitTS *time.Time) uint64 {
    if commitTS == nil {
        return baseLSN
    }
    // Larger modulo reduces collision probability while still maintaining a
    // reasonable offset range.
    offset := uint64(commitTS.UnixMicro() % 10000000000) // 10B µs allows ~2.7 hours of uniqueness
    return baseLSN + offset
}
```

Then replace all three occurrences with a call to this helper.


2190-2206: Timestamp parsing has limited format support.

The timestamp parsing supports only two formats: RFC3339 and a specific PostgreSQL-like format. Other common timestamp formats (e.g., with timezone names, different separators) will fail silently, causing those rows to lose origin tracking.

Consider adding more fallback formats or using a more flexible parsing library, especially since this data comes from database metadata which may vary by configuration.

Example of extended format support

```go
var supportedFormats = []string{
    time.RFC3339,
    time.RFC3339Nano,
    "2006-01-02 15:04:05.999999-07",
    "2006-01-02 15:04:05.999999-07:00",
    "2006-01-02T15:04:05.999999Z",
    "2006-01-02 15:04:05",
}

for _, format := range supportedFormats {
    ts, err = time.Parse(format, v)
    if err == nil {
        break
    }
}
```

1002-1023: O(n²) lookup for source rows in buildNullUpdates.

The nested loops search through all source rows to find matching sourceRow for each null value difference. For large diff files, this could be slow.

Consider pre-building an index similar to how node1Index and node2Index are built, but preserving the original OrderedMap for origin extraction:

Performance optimization suggestion

```go
// Build an index that preserves the OrderedMap for origin tracking
// (build node2OrderedIndex the same way from node2Rows).
node1OrderedIndex := make(map[string]types.OrderedMap, len(node1Rows))
for _, r := range node1Rows {
    if pkeyStr, err := utils.StringifyOrderedMapKey(r, t.Key); err == nil {
        node1OrderedIndex[pkeyStr] = r
    }
}
// Then use a direct lookup instead of the inner loop:
sourceRow := node2OrderedIndex[pkKey]
```

Ibrar Ahmed added 5 commits February 4, 2026 20:02

Use n1 as source of truth and compare n1-n2 instead of n2-n3 to avoid
cluster node name resolution issues in the CI environment.

Insert data on n3 (instead of n1) so replicated rows on n1 have
node_origin='node_n3'. Repair code ignores 'local' origins, requiring
non-local origin metadata for the preserve-origin feature to work correctly.

Improve code formatting, consolidate logic, and update documentation
for the preserve-origin feature.

@coderabbitai (bot) left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@tests/integration/table_repair_test.go`:
- Around line 1342-1388: Add per-row replication-origin checks alongside the
timestamp checks: capture original origins from n1 into a map (e.g.,
originalOrigins using getReplicationOrigin() similar to originalTimestamps),
collect repaired origins from Node2 via getReplicationOrigin() inside the same
sampleIDs loop (e.g., originsWith), then assert that when PreserveOrigin=true
the repaired origin equals the original for each id and when PreserveOrigin is
false the repaired origin differs; update the result reporting to include
preservedOriginCount and list rows with origin mismatches (use function names
getReplicationOrigin, originalTimestamps, and the existing
sampleIDs/timestampsWith variables to locate where to add these checks).

@coderabbitai (bot) left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@tests/integration/table_repair_test.go`:
- Around line 1283-1284: repairTime is recorded after the time.Sleep, so it's
~1s later than the actual repair; move the repairTime capture to immediately
after the repair completes (i.e., before the time.Sleep) or simply remove the
time.Sleep entirely and keep the existing tolerance assertion that compares
repairTime to the repair timestamp to avoid the off-by-sleep drift; update the
variable capture for repairTime (and remove the redundant sleep) so the
subsequent tolerance check uses the true repair completion time.

@coderabbitai (bot) left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@tests/integration/table_repair_test.go`:
- Around line 1352-1364: The final assertion counting rows after the failed
PreserveOrigin repair can fail because rows were deleted before the second
repair; update the logic in TestTableRepair_PreserveOrigin to detect the
specific "origin LSN not available" error from the PreserveOrigin=true repair
attempt and, if present, skip the pgCluster.Node2Pool.QueryRow / require.Equal
row-count assertion (or alternatively re-run the repair with
PreserveOrigin=false to restore rows before asserting). Locate the
PreserveOrigin repair call (the function/method that performs the repair with
PreserveOrigin=true) and branch on the error value/type returned; only perform
the finalQuery using pgCluster.Node2Pool.QueryRow and the finalCount/assert when
the PreserveOrigin repair succeeded or after you re-run a non-preserve repair to
restore data.

@coderabbitai (bot) left a comment

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
internal/consistency/repair/table_repair.go (1)

1617-1665: ⚠️ Potential issue | 🟠 Major

Committing deletes before upserts can leave partial repairs on failure.

Line 1629 commits deletes before running upserts; if the upserts fail, the node can be left missing rows. Consider keeping the single-transaction path when PreserveOrigin is false or performing upserts first and deleting only after a successful upsert phase.

🛠️ One possible safeguard

```diff
-			// Close the current transaction before executing upserts with per-timestamp transactions
-			// Reset spock.repair_mode temporarily
-			if spockRepairModeActive {
-				_, err = tx.Exec(t.Ctx, "SELECT spock.repair_mode(false)")
-				if err != nil {
-					tx.Rollback(t.Ctx)
-					logger.Error("disabling spock.repair_mode(false) on %s before upserts: %v", nodeName, err)
-					repairErrors = append(repairErrors, fmt.Sprintf("spock.repair_mode(false) failed for %s: %v", nodeName, err))
-					continue
-				}
-			}
-
-			// Commit the current transaction (which handled deletes if any)
-			logger.Debug("Committing transaction on %s before calling executeUpsertsWithTimestamps", nodeName)
-			err = tx.Commit(t.Ctx)
-			if err != nil {
-				logger.Error("committing transaction on %s before upserts: %v", nodeName, err)
-				repairErrors = append(repairErrors, fmt.Sprintf("commit failed for %s: %v", nodeName, err))
-				continue
-			}
-
-			// Execute upserts with per-timestamp transactions
-			upsertedCount, err := executeUpsertsWithTimestamps(divergentPool, t, nodeName, nodeUpserts, targetNodeColTypes, originInfoMap)
+			if t.PreserveOrigin {
+				// Commit deletes before per-timestamp upserts (required for origin grouping)
+				if spockRepairModeActive {
+					_, err = tx.Exec(t.Ctx, "SELECT spock.repair_mode(false)")
+					if err != nil {
+						tx.Rollback(t.Ctx)
+						logger.Error("disabling spock.repair_mode(false) on %s before upserts: %v", nodeName, err)
+						repairErrors = append(repairErrors, fmt.Sprintf("spock.repair_mode(false) failed for %s: %v", nodeName, err))
+						continue
+					}
+				}
+				logger.Debug("Committing transaction on %s before calling executeUpsertsWithTimestamps", nodeName)
+				err = tx.Commit(t.Ctx)
+				if err != nil {
+					logger.Error("committing transaction on %s before upserts: %v", nodeName, err)
+					repairErrors = append(repairErrors, fmt.Sprintf("commit failed for %s: %v", nodeName, err))
+					continue
+				}
+				upsertedCount, err := executeUpsertsWithTimestamps(divergentPool, t, nodeName, nodeUpserts, targetNodeColTypes, originInfoMap)
+				if err != nil {
+					logger.Error("executing upserts on node %s: %v", nodeName, err)
+					repairErrors = append(repairErrors, fmt.Sprintf("upsert ops failed for %s: %v", nodeName, err))
+					continue
+				}
+				totalOps[nodeName]["upserted"] = upsertedCount
+				logger.Info("Executed %d upsert operations on %s", upsertedCount, nodeName)
+				if t.report != nil {
+					// existing report population...
+				}
+				continue
+			}
+
+			// Preserve atomicity when not using preserve-origin
+			upsertedCount, err := executeUpserts(tx, t, nodeName, nodeUpserts, targetNodeColTypes, nil)
 			if err != nil {
 				logger.Error("executing upserts on node %s: %v", nodeName, err)
 				repairErrors = append(repairErrors, fmt.Sprintf("upsert ops failed for %s: %v", nodeName, err))
 				continue
 			}
```
🤖 Fix all issues with AI agents
In `@internal/consistency/repair/table_repair.go`:
- Around line 2657-2660: The call to disable spock.repair_mode uses pool.Exec
after commit and can run on a different connection, leaking the session state;
change it to execute "SELECT spock.repair_mode(false)" on the same transaction
object that previously set repair mode (the transaction variable used around
where spock.repair_mode(true) was set, e.g., tx or task.Tx) and run that Exec
before committing the transaction, preserving the existing error
handling/logging but referencing the transaction Exec instead of pool.Exec.
- Around line 2716-2723: In executeUpsertsInTransaction, when building upsert
values for each orderedCols element you must convert raw JSON-decoded values to
pgx-compatible types using utils.ConvertToPgxType before appending to args and
formatting the placeholder; use the corresponding colTypes entry for the column
to call utils.ConvertToPgxType, handle and return any conversion error (wrap
with context like "convert upsert value for column X"), and only append the
converted value to args and use its placeholder in upsertSQL (replace the
current direct val usage). Ensure the change mirrors the
conversion/error-handling pattern used earlier in the file (lines around the
other conversion block).
- Around line 1177-1331: applyFixNullsUpdates currently calls
setupReplicationOriginXact() multiple times in one transaction, which overwrites
origin LSN/TS so only the last group persists, and it never calls
resetReplicationOriginSession() after commit in runFixNulls(), leaving origin
state on the connection; fix by processing each origin group in its own
transaction (mirror executeUpsertsWithTimestamps) or else reject PreserveOrigin
for FixNulls, and ensure after committing each group's tx you call
resetReplicationOriginXact(tx) and resetReplicationOriginSession(tx) before
returning the connection; update applyFixNullsUpdates, runFixNulls, and the
setup/reset calls (setupReplicationOriginSession, setupReplicationOriginXact,
resetReplicationOriginXact, resetReplicationOriginSession) accordingly so each
origin group gets its own tx and session reset.
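
The per-group transaction shape that last item describes might look roughly like this. The setup/reset helper names come from the review comment; the types, signatures, and loop body are assumptions, and supporting definitions are elided:

```go
// applyOriginGroups runs one transaction per (origin, LSN, timestamp) group
// and clears origin state after each commit, as the comment asks of
// applyFixNullsUpdates. Types and helper signatures here are illustrative.
func applyOriginGroups(ctx context.Context, conn *pgx.Conn,
	groups map[originBatchKey][]rowUpdate) error {

	for key, updates := range groups {
		tx, err := conn.Begin(ctx)
		if err != nil {
			return err
		}
		// Stamp only this transaction with the group's LSN and timestamp.
		if err := setupReplicationOriginXact(ctx, tx, key.originLSN, key.commitTS); err != nil {
			tx.Rollback(ctx)
			return err
		}
		if err := applyUpdates(ctx, tx, updates); err != nil {
			tx.Rollback(ctx)
			return err
		}
		if err := tx.Commit(ctx); err != nil {
			return err
		}
		// Reset xact-level origin state after commit, per the review.
		if err := resetReplicationOriginXact(ctx, conn); err != nil {
			return err
		}
	}
	// Finally unbind the session origin so the pooled connection is clean.
	return resetReplicationOriginSession(ctx, conn)
}
```
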
🧹 Nitpick comments (1)
internal/consistency/repair/table_repair.go (1)

1002-1023: Pre-index source rows to avoid O(n²) scans.

Line 1005+ scans the full row list for every primary key. Build a lookup map once and use constant-time access.

♻️ Suggested refactor

```diff
+		node1RowsByKey := make(map[string]types.OrderedMap, len(node1Rows))
+		for _, r := range node1Rows {
+			if pk, err := utils.StringifyOrderedMapKey(r, t.Key); err == nil {
+				node1RowsByKey[pk] = r
+			}
+		}
+		node2RowsByKey := make(map[string]types.OrderedMap, len(node2Rows))
+		for _, r := range node2Rows {
+			if pk, err := utils.StringifyOrderedMapKey(r, t.Key); err == nil {
+				node2RowsByKey[pk] = r
+			}
+		}
@@
-					var sourceRow types.OrderedMap
-					for _, r := range node2Rows {
-						pkeyStr, err := utils.StringifyOrderedMapKey(r, t.Key)
-						if err == nil && pkeyStr == pkKey {
-							sourceRow = r
-							break
-						}
-					}
+					sourceRow := node2RowsByKey[pkKey]
 					addNullUpdate(updatesByNode, node1Name, row1, col, val2, sourceRow)
 				} else if val2 == nil && val1 != nil {
-					var sourceRow types.OrderedMap
-					for _, r := range node1Rows {
-						pkeyStr, err := utils.StringifyOrderedMapKey(r, t.Key)
-						if err == nil && pkeyStr == pkKey {
-							sourceRow = r
-							break
-						}
-					}
+					sourceRow := node1RowsByKey[pkKey]
 					addNullUpdate(updatesByNode, node2Name, row2, col, val1, sourceRow)
 				}
```

@coderabbitai (bot) left a comment

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@tests/integration/table_repair_test.go`:
- Around line 1414-1442: The test currently only logs a warning when 0 <
preservedCount < len(sampleIDs) (the partial-preservation branch) so it can
silently pass; update the partial branch (where preservedCount, failedRows,
sampleIDs are checked) to assert expected behavior — either require
preservedCount == len(sampleIDs) or enforce a configurable minimum threshold
(e.g., require preservedCount >= threshold) and call t.Errorf/t.Fatalf when the
threshold isn’t met; also include t.Logf with failedRows and
repairOutput/repairVerifyTime/timestampsWith context to aid debugging, and
optionally verify origins via getReplicationOrigin for any non-preserved ids to
surface failures immediately.

In `@tests/integration/timestamp_comparison_test.go`:
- Around line 79-85: The "negative difference (within tolerance)" test case is
incorrect because t1 and t2 are identical to the positive case; update the test
case (named "negative difference (within tolerance)") so that t1 is later than
t2 by swapping their time.Date values (make t1 = ...123457000 and t2 =
...123456000) to produce a negative difference and preserve the same tolerance
and expected=true assertion.
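
Applied, the corrected table entry might read as follows (a sketch: the field names and the date are illustrative, while the nanosecond values come from the comment above):

```go
{
	name:      "negative difference (within tolerance)",
	t1:        time.Date(2026, 1, 15, 21, 17, 31, 123457000, time.UTC), // later
	t2:        time.Date(2026, 1, 15, 21, 17, 31, 123456000, time.UTC), // earlier
	tolerance: time.Second,
	expected:  true, // difference is -1µs, well within tolerance
},
```
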
🧹 Nitpick comments (3)
tests/integration/timestamp_comparison_test.go (1)

105-106: Misleading assertion message: the old function is behaving correctly per its design.

With a 1-second tolerance, treating a 500ms difference as "equal" is the expected behavior of the old function, not incorrect. The message reads as if the old function has a bug. Consider:

```diff
-	assert.True(t, oldResult, "Old function incorrectly considers 500ms difference as equal")
+	assert.True(t, oldResult, "Old function considers 500ms difference as equal (within 1s tolerance)")
```
tests/integration/table_repair_test.go (2)

1116-1166: Redundant assertions: lines 1132–1140 and 1158–1166 re-check values already verified above.

The "Both nodes should now have complete row" blocks repeat the exact same field checks that were already asserted in the per-node NULL-fill blocks (lines 1120–1130 and 1146–1156). Consider removing the duplicate block or consolidating into a single cross-node equality check.


1346-1351: err is set inside a closure captured by reference — works but is subtle.

The err variable on line 1347 refers to the outer-scope err (declared at line 1253). This is valid Go but can be confusing. Consider using a dedicated variable for clarity:

```diff
+	var repairErr error
 	repairOutput := captureOutput(t, func() {
-		err = repairTaskWith.Run(false)
+		repairErr = repairTaskWith.Run(false)
 	})
-	require.NoError(t, err, "Table repair Run() returned unexpected error")
+	require.NoError(t, repairErr, "Table repair Run() returned unexpected error")
```
