From 18a158fa27c84b718e35b39a39a878adbd70a51f Mon Sep 17 00:00:00 2001 From: melvinhagberg Date: Fri, 30 Jan 2026 11:05:55 +0100 Subject: [PATCH] fix(db): prevent JOIN lazy loading from accumulating duplicate loadedSubsets When using JOIN queries with on-demand sync mode, the lazy loading optimization for JOINs was accumulating LoadSubsetOptions in subscription.loadedSubsets without deduplication. Over multiple graph runs, this caused query keys and SQL queries to grow unboundedly, eventually causing "too many SQL variables" errors. Changes: - Add loadedJoinKeys Set in joins.ts to track already-loaded join keys - Filter out loaded keys before calling requestSnapshot - Add isPredicateSubset check in subscription.ts before pushing to loadedSubsets - Add tests for JOIN lazy loading deduplication Co-Authored-By: Claude Opus 4.5 --- packages/db/src/collection/subscription.ts | 10 +- packages/db/src/query/compiler/joins.ts | 26 ++- packages/db/tests/query/join.test.ts | 259 +++++++++++++++++++++ 3 files changed, 291 insertions(+), 4 deletions(-) diff --git a/packages/db/src/collection/subscription.ts b/packages/db/src/collection/subscription.ts index 40060ca05..e73f8ffed 100644 --- a/packages/db/src/collection/subscription.ts +++ b/packages/db/src/collection/subscription.ts @@ -4,6 +4,7 @@ import { PropRef, Value } from '../query/ir.js' import { EventEmitter } from '../event-emitter.js' import { compileExpression } from '../query/compiler/evaluators.js' import { buildCursor } from '../utils/cursor.js' +import { isPredicateSubset } from '../query/predicate-utils.js' import { createFilterFunctionFromExpression, createFilteredCallback, @@ -375,7 +376,14 @@ export class CollectionSubscription opts?.onLoadSubsetResult?.(syncResult) // Track this loadSubset call so we can unload it later - this.loadedSubsets.push(loadOptions) + // Skip if an equivalent or superset predicate already exists to prevent + // unbounded accumulation of loadedSubsets + const isAlreadyCovered = this.loadedSubsets.some((existingOptions) => + isPredicateSubset(loadOptions, existingOptions), + ) + if (!isAlreadyCovered) { + this.loadedSubsets.push(loadOptions) + } const trackLoadSubsetPromise = opts?.trackLoadSubsetPromise ?? true if (trackLoadSubsetPromise) { diff --git a/packages/db/src/query/compiler/joins.ts b/packages/db/src/query/compiler/joins.ts index 5dcb7ed39..aa9320616 100644 --- a/packages/db/src/query/compiler/joins.ts +++ b/packages/db/src/query/compiler/joins.ts @@ -274,6 +274,10 @@ function processJoin( // Set up lazy loading: intercept active side's stream and dynamically load // matching rows from lazy side based on join keys. + // Track which join keys have already been loaded to avoid duplicate requests + // and unbounded growth of loadedSubsets array. + const loadedJoinKeys = new Set() + const activePipelineWithLoading: IStreamBuilder< [key: unknown, [originalKey: string, namespacedRow: NamespacedRow]] > = activePipeline.pipe( @@ -301,15 +305,31 @@ function processJoin( return } + // Extract join keys and filter out already-loaded ones to prevent + // unbounded accumulation of loadedSubsets + const allJoinKeys = data.getInner().map(([[joinKey]]) => joinKey) + const newJoinKeys = allJoinKeys.filter( + (key) => !loadedJoinKeys.has(key), + ) + + // Skip if all keys have already been loaded + if (newJoinKeys.length === 0) { + return + } + // Request filtered snapshot from lazy collection for matching join keys - const joinKeys = data.getInner().map(([[joinKey]]) => joinKey) const lazyJoinRef = new PropRef(followRefResult.path) const loaded = lazySourceSubscription.requestSnapshot({ - where: inArray(lazyJoinRef, joinKeys), + where: inArray(lazyJoinRef, newJoinKeys), optimizedOnly: true, }) - if (!loaded) { + if (loaded) { + // Mark keys as loaded only if requestSnapshot succeeded + for (const key of newJoinKeys) { + loadedJoinKeys.add(key) + } + } else { // Snapshot wasn't sent because it could not be loaded from the indexes lazySourceSubscription.requestSnapshot() } diff --git a/packages/db/tests/query/join.test.ts b/packages/db/tests/query/join.test.ts index edb632c30..32361a5a2 100644 --- a/packages/db/tests/query/join.test.ts +++ b/packages/db/tests/query/join.test.ts @@ -1730,3 +1730,262 @@ describe(`Query JOIN Operations`, () => { createJoinTests(`off`) createJoinTests(`eager`) }) + +describe(`JOIN lazy loading deduplication`, () => { + // Test that JOIN lazy loading doesn't accumulate duplicate loadSubset calls + // when the same join keys flow through the pipeline multiple times + + type User = { + id: number + name: string + } + + type Order = { + id: number + userId: number + amount: number + } + + test(`should not request the same join keys multiple times`, () => { + const loadSubsetCalls: Array<{ where: unknown }> = [] + + // Create users collection + const usersOpts = mockSyncCollectionOptions({ + id: `dedup-test-users`, + getKey: (user) => user.id, + initialData: [ + { id: 1, name: `Alice` }, + { id: 2, name: `Bob` }, + ], + autoIndex: `eager`, + }) + const usersCollection = createCollection(usersOpts) + + // Create orders collection with loadSubset tracking + const ordersOpts = mockSyncCollectionOptions({ + id: `dedup-test-orders`, + getKey: (order) => order.id, + initialData: [ + { id: 1, userId: 1, amount: 100 }, + { id: 2, userId: 1, amount: 200 }, + { id: 3, userId: 2, amount: 150 }, + ], + autoIndex: `eager`, + sync: { + sync: ({ begin, write, commit, markReady }) => { + begin() + write({ type: `insert`, value: { id: 1, userId: 1, amount: 100 } }) + write({ type: `insert`, value: { id: 2, userId: 1, amount: 200 } }) + write({ type: `insert`, value: { id: 3, userId: 2, amount: 150 } }) + commit() + markReady() + + return { + loadSubset: (options) => { + loadSubsetCalls.push({ where: options.where }) + return true + }, + } + }, + }, + }) + const ordersCollection = createCollection(ordersOpts) + + // Create join query - orders collection is the lazy side + const joinQuery = createLiveQueryCollection({ + startSync: true, + query: (q) => + q + .from({ user: usersCollection }) + .join({ order: ordersCollection }, ({ user, order }) => + eq(user.id, order.userId), + ) + .select(({ user, order }) => ({ + userName: user.name, + orderId: order!.id, + amount: order!.amount, + })), + }) + + // Initial load should trigger loadSubset calls for join keys + const initialResults = joinQuery.toArray + expect(initialResults).toHaveLength(3) + + // Record how many loadSubset calls we have so far + const initialCallCount = loadSubsetCalls.length + + // Trigger another graph run by inserting a new user + // This will cause data to flow through the pipeline again + usersOpts.utils.begin() + usersOpts.utils.write({ type: `insert`, value: { id: 3, name: `Charlie` } }) + usersOpts.utils.commit() + + // Since Charlie has no orders, it shouldn't trigger new loadSubset calls + // for existing join keys (1 and 2) + const afterInsertCallCount = loadSubsetCalls.length + + // The key assertion: we should NOT see duplicate loadSubset calls + // for the same join keys (1 and 2) that were already loaded + expect(afterInsertCallCount).toBeLessThanOrEqual(initialCallCount + 1) + }) + + test(`should deduplicate loadedSubsets in subscription`, () => { + const loadSubsetCalls: Array<{ where: unknown }> = [] + + const usersOpts = mockSyncCollectionOptions({ + id: `dedup-subsets-users`, + getKey: (user) => user.id, + initialData: [{ id: 1, name: `Alice` }], + autoIndex: `eager`, + }) + const usersCollection = createCollection(usersOpts) + + const ordersOpts = mockSyncCollectionOptions({ + id: `dedup-subsets-orders`, + getKey: (order) => order.id, + initialData: [{ id: 1, userId: 1, amount: 100 }], + autoIndex: `eager`, + sync: { + sync: ({ begin, write, commit, markReady }) => { + begin() + write({ type: `insert`, value: { id: 1, userId: 1, amount: 100 } }) + commit() + markReady() + + return { + loadSubset: (options) => { + loadSubsetCalls.push({ where: options.where }) + return true + }, + } + }, + }, + }) + const ordersCollection = createCollection(ordersOpts) + + const joinQuery = createLiveQueryCollection({ + startSync: true, + query: (q) => + q + .from({ user: usersCollection }) + .join({ order: ordersCollection }, ({ user, order }) => + eq(user.id, order.userId), + ), + }) + + expect(joinQuery.toArray).toHaveLength(1) + + // Trigger multiple graph runs by updating the user + usersOpts.utils.begin() + usersOpts.utils.write({ + type: `update`, + value: { id: 1, name: `Alice Updated` }, + }) + usersOpts.utils.commit() + + usersOpts.utils.begin() + usersOpts.utils.write({ + type: `update`, + value: { id: 1, name: `Alice Updated 2` }, + }) + usersOpts.utils.commit() + + usersOpts.utils.begin() + usersOpts.utils.write({ + type: `update`, + value: { id: 1, name: `Alice Updated 3` }, + }) + usersOpts.utils.commit() + + // The loadSubset calls should be bounded - not growing with each update + // because the same join key (1) is already tracked + expect(loadSubsetCalls.length).toBeLessThanOrEqual(2) + }) + + test(`should only track new join keys when they differ from loaded ones`, () => { + // This test verifies that the loadedJoinKeys Set correctly tracks + // which keys have been requested, preventing duplicate requests + + const loadSubsetCalls: Array<{ where: unknown }> = [] + + // Create users collection with multiple users + const usersOpts = mockSyncCollectionOptions({ + id: `track-keys-users`, + getKey: (user) => user.id, + initialData: [ + { id: 1, name: `Alice` }, + { id: 2, name: `Bob` }, + { id: 3, name: `Charlie` }, + ], + autoIndex: `eager`, + }) + const usersCollection = createCollection(usersOpts) + + // Create orders collection with loadSubset tracking + const ordersOpts = mockSyncCollectionOptions({ + id: `track-keys-orders`, + getKey: (order) => order.id, + initialData: [ + { id: 1, userId: 1, amount: 100 }, + { id: 2, userId: 2, amount: 200 }, + { id: 3, userId: 3, amount: 300 }, + ], + autoIndex: `eager`, + sync: { + sync: ({ begin, write, commit, markReady }) => { + begin() + write({ type: `insert`, value: { id: 1, userId: 1, amount: 100 } }) + write({ type: `insert`, value: { id: 2, userId: 2, amount: 200 } }) + write({ type: `insert`, value: { id: 3, userId: 3, amount: 300 } }) + commit() + markReady() + + return { + loadSubset: (options) => { + loadSubsetCalls.push({ where: options.where }) + return true + }, + } + }, + }, + }) + const ordersCollection = createCollection(ordersOpts) + + // Create join query + const joinQuery = createLiveQueryCollection({ + startSync: true, + query: (q) => + q + .from({ user: usersCollection }) + .join({ order: ordersCollection }, ({ user, order }) => + eq(user.id, order.userId), + ), + }) + + // Should have 3 results + expect(joinQuery.toArray).toHaveLength(3) + + // Record initial state + const initialCallCount = loadSubsetCalls.length + + // Now make multiple updates to trigger graph reruns + // Each update should NOT cause additional loadSubset calls + // for the same join keys that were already processed + for (let i = 0; i < 5; i++) { + usersOpts.utils.begin() + usersOpts.utils.write({ + type: `update`, + value: { id: 1, name: `Alice v${i}` }, + }) + usersOpts.utils.commit() + } + + // After 5 updates, we should NOT have 5 additional loadSubset calls + // because the join keys (1, 2, 3) are already tracked + const finalCallCount = loadSubsetCalls.length + + // The key assertion: loadSubset calls should be bounded, + // not growing proportionally with the number of graph runs + expect(finalCallCount).toBeLessThanOrEqual(initialCallCount + 1) + }) +})