Prevent unbounded growth of loadedSubsets during lazy JOIN loading.

* subscription.ts: a loadSubset call is only recorded in `loadedSubsets` when
  no already-recorded entry covers it (checked with `isPredicateSubset`).
* joins.ts: the lazy-loading tap remembers which join keys were successfully
  requested and only asks the lazy side for distinct keys it has not requested
  before; when nothing is new, no snapshot is requested at all.
* join.test.ts: regression tests that count `loadSubset` invocations across
  repeated graph runs.

NOTE(review): the line structure and indentation of this patch were
re-derived from a whitespace-collapsed copy (2-space Prettier layout assumed).
Hunk line counts match the `@@` headers, but context-line indentation should
be confirmed against the target tree, or the patch applied with
`git apply --ignore-whitespace`.
NOTE(review): loadSubset still runs for a covered predicate, but that call is
no longer recorded for later unloading - confirm the sync layer does not
expect one unload per load.
NOTE(review): `loadedJoinKeys` is never pruned - confirm lazy-side rows cannot
be unloaded while the join pipeline is alive, otherwise they will not be
re-requested.

diff --git a/packages/db/src/collection/subscription.ts b/packages/db/src/collection/subscription.ts
index 40060ca05..e73f8ffed 100644
--- a/packages/db/src/collection/subscription.ts
+++ b/packages/db/src/collection/subscription.ts
@@ -4,6 +4,7 @@ import { PropRef, Value } from '../query/ir.js'
 import { EventEmitter } from '../event-emitter.js'
 import { compileExpression } from '../query/compiler/evaluators.js'
 import { buildCursor } from '../utils/cursor.js'
+import { isPredicateSubset } from '../query/predicate-utils.js'
 import {
   createFilterFunctionFromExpression,
   createFilteredCallback,
@@ -375,7 +376,14 @@ export class CollectionSubscription
     opts?.onLoadSubsetResult?.(syncResult)
 
     // Track this loadSubset call so we can unload it later
-    this.loadedSubsets.push(loadOptions)
+    // Skip if an equivalent or superset predicate already exists to prevent
+    // unbounded accumulation of loadedSubsets
+    const isAlreadyCovered = this.loadedSubsets.some((existingOptions) =>
+      isPredicateSubset(loadOptions, existingOptions),
+    )
+    if (!isAlreadyCovered) {
+      this.loadedSubsets.push(loadOptions)
+    }
 
     const trackLoadSubsetPromise = opts?.trackLoadSubsetPromise ?? true
     if (trackLoadSubsetPromise) {
diff --git a/packages/db/src/query/compiler/joins.ts b/packages/db/src/query/compiler/joins.ts
index 5dcb7ed39..aa9320616 100644
--- a/packages/db/src/query/compiler/joins.ts
+++ b/packages/db/src/query/compiler/joins.ts
@@ -274,6 +274,10 @@ function processJoin(
 
       // Set up lazy loading: intercept active side's stream and dynamically load
       // matching rows from lazy side based on join keys.
+      // Track which join keys have already been loaded to avoid duplicate requests
+      // and unbounded growth of loadedSubsets array.
+      const loadedJoinKeys = new Set<unknown>()
+
       const activePipelineWithLoading: IStreamBuilder<
         [key: unknown, [originalKey: string, namespacedRow: NamespacedRow]]
       > = activePipeline.pipe(
@@ -301,15 +305,31 @@ function processJoin(
             return
           }
 
+          // Extract the distinct join keys of this batch and drop already-loaded
+          // ones to prevent unbounded accumulation of loadedSubsets
+          const allJoinKeys = data.getInner().map(([[joinKey]]) => joinKey)
+          const newJoinKeys = Array.from(new Set(allJoinKeys)).filter(
+            (key) => !loadedJoinKeys.has(key),
+          )
+
+          // Skip if all keys have already been loaded
+          if (newJoinKeys.length === 0) {
+            return
+          }
+
           // Request filtered snapshot from lazy collection for matching join keys
-          const joinKeys = data.getInner().map(([[joinKey]]) => joinKey)
           const lazyJoinRef = new PropRef(followRefResult.path)
           const loaded = lazySourceSubscription.requestSnapshot({
-            where: inArray(lazyJoinRef, joinKeys),
+            where: inArray(lazyJoinRef, newJoinKeys),
             optimizedOnly: true,
           })
 
-          if (!loaded) {
+          if (loaded) {
+            // Mark keys as loaded only if requestSnapshot succeeded
+            for (const key of newJoinKeys) {
+              loadedJoinKeys.add(key)
+            }
+          } else {
             // Snapshot wasn't sent because it could not be loaded from the indexes
             lazySourceSubscription.requestSnapshot()
           }
diff --git a/packages/db/tests/query/join.test.ts b/packages/db/tests/query/join.test.ts
index edb632c30..32361a5a2 100644
--- a/packages/db/tests/query/join.test.ts
+++ b/packages/db/tests/query/join.test.ts
@@ -1730,3 +1730,262 @@ describe(`Query JOIN Operations`, () => {
   createJoinTests(`off`)
   createJoinTests(`eager`)
 })
+
+describe(`JOIN lazy loading deduplication`, () => {
+  // Test that JOIN lazy loading doesn't accumulate duplicate loadSubset calls
+  // when the same join keys flow through the pipeline multiple times
+
+  type User = {
+    id: number
+    name: string
+  }
+
+  type Order = {
+    id: number
+    userId: number
+    amount: number
+  }
+
+  test(`should not request the same join keys multiple times`, () => {
+    const loadSubsetCalls: Array<{ where: unknown }> = []
+
+    // Create users collection
+    const usersOpts = mockSyncCollectionOptions<User>({
+      id: `dedup-test-users`,
+      getKey: (user) => user.id,
+      initialData: [
+        { id: 1, name: `Alice` },
+        { id: 2, name: `Bob` },
+      ],
+      autoIndex: `eager`,
+    })
+    const usersCollection = createCollection(usersOpts)
+
+    // Create orders collection with loadSubset tracking
+    const ordersOpts = mockSyncCollectionOptions<Order>({
+      id: `dedup-test-orders`,
+      getKey: (order) => order.id,
+      initialData: [
+        { id: 1, userId: 1, amount: 100 },
+        { id: 2, userId: 1, amount: 200 },
+        { id: 3, userId: 2, amount: 150 },
+      ],
+      autoIndex: `eager`,
+      sync: {
+        sync: ({ begin, write, commit, markReady }) => {
+          begin()
+          write({ type: `insert`, value: { id: 1, userId: 1, amount: 100 } })
+          write({ type: `insert`, value: { id: 2, userId: 1, amount: 200 } })
+          write({ type: `insert`, value: { id: 3, userId: 2, amount: 150 } })
+          commit()
+          markReady()
+
+          return {
+            loadSubset: (options) => {
+              loadSubsetCalls.push({ where: options.where })
+              return true
+            },
+          }
+        },
+      },
+    })
+    const ordersCollection = createCollection(ordersOpts)
+
+    // Create join query - orders collection is the lazy side
+    const joinQuery = createLiveQueryCollection({
+      startSync: true,
+      query: (q) =>
+        q
+          .from({ user: usersCollection })
+          .join({ order: ordersCollection }, ({ user, order }) =>
+            eq(user.id, order.userId),
+          )
+          .select(({ user, order }) => ({
+            userName: user.name,
+            orderId: order!.id,
+            amount: order!.amount,
+          })),
+    })
+
+    // Initial load should trigger loadSubset calls for join keys
+    const initialResults = joinQuery.toArray
+    expect(initialResults).toHaveLength(3)
+
+    // Record how many loadSubset calls we have so far
+    const initialCallCount = loadSubsetCalls.length
+
+    // Trigger another graph run by inserting a new user
+    // This will cause data to flow through the pipeline again
+    usersOpts.utils.begin()
+    usersOpts.utils.write({ type: `insert`, value: { id: 3, name: `Charlie` } })
+    usersOpts.utils.commit()
+
+    // Since Charlie has no orders, it shouldn't trigger new loadSubset calls
+    // for existing join keys (1 and 2)
+    const afterInsertCallCount = loadSubsetCalls.length
+
+    // The key assertion: we should NOT see duplicate loadSubset calls
+    // for the same join keys (1 and 2) that were already loaded
+    expect(afterInsertCallCount).toBeLessThanOrEqual(initialCallCount + 1)
+  })
+
+  test(`should deduplicate loadedSubsets in subscription`, () => {
+    const loadSubsetCalls: Array<{ where: unknown }> = []
+
+    const usersOpts = mockSyncCollectionOptions<User>({
+      id: `dedup-subsets-users`,
+      getKey: (user) => user.id,
+      initialData: [{ id: 1, name: `Alice` }],
+      autoIndex: `eager`,
+    })
+    const usersCollection = createCollection(usersOpts)
+
+    const ordersOpts = mockSyncCollectionOptions<Order>({
+      id: `dedup-subsets-orders`,
+      getKey: (order) => order.id,
+      initialData: [{ id: 1, userId: 1, amount: 100 }],
+      autoIndex: `eager`,
+      sync: {
+        sync: ({ begin, write, commit, markReady }) => {
+          begin()
+          write({ type: `insert`, value: { id: 1, userId: 1, amount: 100 } })
+          commit()
+          markReady()
+
+          return {
+            loadSubset: (options) => {
+              loadSubsetCalls.push({ where: options.where })
+              return true
+            },
+          }
+        },
+      },
+    })
+    const ordersCollection = createCollection(ordersOpts)
+
+    const joinQuery = createLiveQueryCollection({
+      startSync: true,
+      query: (q) =>
+        q
+          .from({ user: usersCollection })
+          .join({ order: ordersCollection }, ({ user, order }) =>
+            eq(user.id, order.userId),
+          ),
+    })
+
+    expect(joinQuery.toArray).toHaveLength(1)
+
+    // Trigger multiple graph runs by updating the user
+    usersOpts.utils.begin()
+    usersOpts.utils.write({
+      type: `update`,
+      value: { id: 1, name: `Alice Updated` },
+    })
+    usersOpts.utils.commit()
+
+    usersOpts.utils.begin()
+    usersOpts.utils.write({
+      type: `update`,
+      value: { id: 1, name: `Alice Updated 2` },
+    })
+    usersOpts.utils.commit()
+
+    usersOpts.utils.begin()
+    usersOpts.utils.write({
+      type: `update`,
+      value: { id: 1, name: `Alice Updated 3` },
+    })
+    usersOpts.utils.commit()
+
+    // The loadSubset calls should be bounded - not growing with each update
+    // because the same join key (1) is already tracked
+    expect(loadSubsetCalls.length).toBeLessThanOrEqual(2)
+  })
+
+  test(`should only track new join keys when they differ from loaded ones`, () => {
+    // This test verifies that the loadedJoinKeys Set correctly tracks
+    // which keys have been requested, preventing duplicate requests
+
+    const loadSubsetCalls: Array<{ where: unknown }> = []
+
+    // Create users collection with multiple users
+    const usersOpts = mockSyncCollectionOptions<User>({
+      id: `track-keys-users`,
+      getKey: (user) => user.id,
+      initialData: [
+        { id: 1, name: `Alice` },
+        { id: 2, name: `Bob` },
+        { id: 3, name: `Charlie` },
+      ],
+      autoIndex: `eager`,
+    })
+    const usersCollection = createCollection(usersOpts)
+
+    // Create orders collection with loadSubset tracking
+    const ordersOpts = mockSyncCollectionOptions<Order>({
+      id: `track-keys-orders`,
+      getKey: (order) => order.id,
+      initialData: [
+        { id: 1, userId: 1, amount: 100 },
+        { id: 2, userId: 2, amount: 200 },
+        { id: 3, userId: 3, amount: 300 },
+      ],
+      autoIndex: `eager`,
+      sync: {
+        sync: ({ begin, write, commit, markReady }) => {
+          begin()
+          write({ type: `insert`, value: { id: 1, userId: 1, amount: 100 } })
+          write({ type: `insert`, value: { id: 2, userId: 2, amount: 200 } })
+          write({ type: `insert`, value: { id: 3, userId: 3, amount: 300 } })
+          commit()
+          markReady()
+
+          return {
+            loadSubset: (options) => {
+              loadSubsetCalls.push({ where: options.where })
+              return true
+            },
+          }
+        },
+      },
+    })
+    const ordersCollection = createCollection(ordersOpts)
+
+    // Create join query
+    const joinQuery = createLiveQueryCollection({
+      startSync: true,
+      query: (q) =>
+        q
+          .from({ user: usersCollection })
+          .join({ order: ordersCollection }, ({ user, order }) =>
+            eq(user.id, order.userId),
+          ),
+    })
+
+    // Should have 3 results
+    expect(joinQuery.toArray).toHaveLength(3)
+
+    // Record initial state
+    const initialCallCount = loadSubsetCalls.length
+
+    // Now make multiple updates to trigger graph reruns
+    // Each update should NOT cause additional loadSubset calls
+    // for the same join keys that were already processed
+    for (let i = 0; i < 5; i++) {
+      usersOpts.utils.begin()
+      usersOpts.utils.write({
+        type: `update`,
+        value: { id: 1, name: `Alice v${i}` },
+      })
+      usersOpts.utils.commit()
+    }
+
+    // After 5 updates, we should NOT have 5 additional loadSubset calls
+    // because the join keys (1, 2, 3) are already tracked
+    const finalCallCount = loadSubsetCalls.length
+
+    // The key assertion: loadSubset calls should be bounded,
+    // not growing proportionally with the number of graph runs
+    expect(finalCallCount).toBeLessThanOrEqual(initialCallCount + 1)
+  })
+})