10 changes: 9 additions & 1 deletion packages/db/src/collection/subscription.ts
@@ -4,6 +4,7 @@ import { PropRef, Value } from '../query/ir.js'
import { EventEmitter } from '../event-emitter.js'
import { compileExpression } from '../query/compiler/evaluators.js'
import { buildCursor } from '../utils/cursor.js'
import { isPredicateSubset } from '../query/predicate-utils.js'
import {
createFilterFunctionFromExpression,
createFilteredCallback,
@@ -375,7 +376,14 @@ export class CollectionSubscription
opts?.onLoadSubsetResult?.(syncResult)

// Track this loadSubset call so we can unload it later
this.loadedSubsets.push(loadOptions)
// Skip if an equivalent or superset predicate already exists to prevent
// unbounded accumulation of loadedSubsets
const isAlreadyCovered = this.loadedSubsets.some((existingOptions) =>
isPredicateSubset(loadOptions, existingOptions),
)
if (!isAlreadyCovered) {
this.loadedSubsets.push(loadOptions)
}

const trackLoadSubsetPromise = opts?.trackLoadSubsetPromise ?? true
if (trackLoadSubsetPromise) {
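For context, the guard added above boils down to the following pattern (a minimal sketch: the shape of `LoadSubsetOptions` and the exact signature of `isPredicateSubset` are assumptions, modelled here as "the existing entry already covers every row the candidate would load"):

```ts
// Minimal sketch; LoadSubsetOptions is simplified to just a `where` clause.
type LoadSubsetOptions = { where?: unknown }

// Hypothetical stand-in for isPredicateSubset from query/predicate-utils:
// assumed to return true when `existing` already covers every row that
// `candidate` would load.
declare function isPredicateSubset(
  candidate: LoadSubsetOptions,
  existing: LoadSubsetOptions,
): boolean

function trackLoadedSubset(
  loadedSubsets: Array<LoadSubsetOptions>,
  loadOptions: LoadSubsetOptions,
): void {
  // Only remember this subset if no existing entry already covers it,
  // so repeated equivalent loads cannot grow the array without bound.
  const isAlreadyCovered = loadedSubsets.some((existing) =>
    isPredicateSubset(loadOptions, existing),
  )
  if (!isAlreadyCovered) {
    loadedSubsets.push(loadOptions)
  }
}
```

Checking coverage before pushing keeps `loadedSubsets` bounded when the same or a narrower predicate is requested repeatedly, at the cost of a linear scan per `loadSubset` call.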
26 changes: 23 additions & 3 deletions packages/db/src/query/compiler/joins.ts
@@ -274,6 +274,10 @@ function processJoin(

// Set up lazy loading: intercept active side's stream and dynamically load
// matching rows from lazy side based on join keys.
// Track which join keys have already been loaded to avoid duplicate requests
// and unbounded growth of loadedSubsets array.
const loadedJoinKeys = new Set<unknown>()

const activePipelineWithLoading: IStreamBuilder<
[key: unknown, [originalKey: string, namespacedRow: NamespacedRow]]
> = activePipeline.pipe(
@@ -301,15 +305,31 @@
return
}

// Extract join keys and filter out already-loaded ones to prevent
// unbounded accumulation of loadedSubsets
const allJoinKeys = data.getInner().map(([[joinKey]]) => joinKey)
const newJoinKeys = allJoinKeys.filter(
(key) => !loadedJoinKeys.has(key),
)

// Skip if all keys have already been loaded
if (newJoinKeys.length === 0) {
return
}

// Request filtered snapshot from lazy collection for matching join keys
const joinKeys = data.getInner().map(([[joinKey]]) => joinKey)
const lazyJoinRef = new PropRef(followRefResult.path)
const loaded = lazySourceSubscription.requestSnapshot({
where: inArray(lazyJoinRef, joinKeys),
where: inArray(lazyJoinRef, newJoinKeys),
optimizedOnly: true,
})

if (!loaded) {
if (loaded) {
// Mark keys as loaded only if requestSnapshot succeeded
for (const key of newJoinKeys) {
loadedJoinKeys.add(key)
}
} else {
// Snapshot wasn't sent because it could not be loaded from the indexes
lazySourceSubscription.requestSnapshot()
}
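The per-key deduplication in this hunk follows the same idea, but keyed on individual join keys rather than whole predicates. A minimal sketch, isolated from the D2 pipeline; `requestSnapshot` here is a hypothetical stand-in that returns false when the snapshot cannot be served from the collection's indexes:

```ts
// Minimal sketch of the Set-based join-key deduplication, outside the pipeline.
// `requestSnapshot` is a hypothetical stand-in returning false when the
// snapshot could not be served from the collection's indexes.
function loadNewJoinKeys(
  loadedJoinKeys: Set<unknown>,
  incomingJoinKeys: Array<unknown>,
  requestSnapshot: (keys: Array<unknown>) => boolean,
): void {
  // Only request keys that have not been asked for before.
  const newJoinKeys = incomingJoinKeys.filter((key) => !loadedJoinKeys.has(key))
  if (newJoinKeys.length === 0) return

  // Mark keys as loaded only when the request succeeded, so a failed
  // snapshot can be retried on the next graph run.
  if (requestSnapshot(newJoinKeys)) {
    for (const key of newJoinKeys) loadedJoinKeys.add(key)
  }
}
```

Marking keys only after a successful request means a failed snapshot can be retried on the next graph run instead of being silently dropped.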
259 changes: 259 additions & 0 deletions packages/db/tests/query/join.test.ts
@@ -1730,3 +1730,262 @@ describe(`Query JOIN Operations`, () => {
createJoinTests(`off`)
createJoinTests(`eager`)
})

describe(`JOIN lazy loading deduplication`, () => {
// Test that JOIN lazy loading doesn't accumulate duplicate loadSubset calls
// when the same join keys flow through the pipeline multiple times

type User = {
id: number
name: string
}

type Order = {
id: number
userId: number
amount: number
}

test(`should not request the same join keys multiple times`, () => {
const loadSubsetCalls: Array<{ where: unknown }> = []

// Create users collection
const usersOpts = mockSyncCollectionOptions<User>({
id: `dedup-test-users`,
getKey: (user) => user.id,
initialData: [
{ id: 1, name: `Alice` },
{ id: 2, name: `Bob` },
],
autoIndex: `eager`,
})
const usersCollection = createCollection(usersOpts)

// Create orders collection with loadSubset tracking
const ordersOpts = mockSyncCollectionOptions<Order>({
id: `dedup-test-orders`,
getKey: (order) => order.id,
initialData: [
{ id: 1, userId: 1, amount: 100 },
{ id: 2, userId: 1, amount: 200 },
{ id: 3, userId: 2, amount: 150 },
],
autoIndex: `eager`,
sync: {
sync: ({ begin, write, commit, markReady }) => {
begin()
write({ type: `insert`, value: { id: 1, userId: 1, amount: 100 } })
write({ type: `insert`, value: { id: 2, userId: 1, amount: 200 } })
write({ type: `insert`, value: { id: 3, userId: 2, amount: 150 } })
commit()
markReady()

return {
loadSubset: (options) => {
loadSubsetCalls.push({ where: options.where })
return true
},
}
},
},
})
const ordersCollection = createCollection(ordersOpts)

// Create join query - orders collection is the lazy side
const joinQuery = createLiveQueryCollection({
startSync: true,
query: (q) =>
q
.from({ user: usersCollection })
.join({ order: ordersCollection }, ({ user, order }) =>
eq(user.id, order.userId),
)
.select(({ user, order }) => ({
userName: user.name,
orderId: order!.id,
amount: order!.amount,
})),
})

// Initial load should trigger loadSubset calls for join keys
const initialResults = joinQuery.toArray
expect(initialResults).toHaveLength(3)

// Record how many loadSubset calls we have so far
const initialCallCount = loadSubsetCalls.length

// Trigger another graph run by inserting a new user
// This will cause data to flow through the pipeline again
usersOpts.utils.begin()
usersOpts.utils.write({ type: `insert`, value: { id: 3, name: `Charlie` } })
usersOpts.utils.commit()

// Inserting Charlie introduces a new join key (3), which may trigger at most
// one additional loadSubset call, but must not re-request keys 1 and 2
const afterInsertCallCount = loadSubsetCalls.length

// The key assertion: we should NOT see duplicate loadSubset calls
// for the same join keys (1 and 2) that were already loaded
expect(afterInsertCallCount).toBeLessThanOrEqual(initialCallCount + 1)
})

test(`should deduplicate loadedSubsets in subscription`, () => {
const loadSubsetCalls: Array<{ where: unknown }> = []

const usersOpts = mockSyncCollectionOptions<User>({
id: `dedup-subsets-users`,
getKey: (user) => user.id,
initialData: [{ id: 1, name: `Alice` }],
autoIndex: `eager`,
})
const usersCollection = createCollection(usersOpts)

const ordersOpts = mockSyncCollectionOptions<Order>({
id: `dedup-subsets-orders`,
getKey: (order) => order.id,
initialData: [{ id: 1, userId: 1, amount: 100 }],
autoIndex: `eager`,
sync: {
sync: ({ begin, write, commit, markReady }) => {
begin()
write({ type: `insert`, value: { id: 1, userId: 1, amount: 100 } })
commit()
markReady()

return {
loadSubset: (options) => {
loadSubsetCalls.push({ where: options.where })
return true
},
}
},
},
})
const ordersCollection = createCollection(ordersOpts)

const joinQuery = createLiveQueryCollection({
startSync: true,
query: (q) =>
q
.from({ user: usersCollection })
.join({ order: ordersCollection }, ({ user, order }) =>
eq(user.id, order.userId),
),
})

expect(joinQuery.toArray).toHaveLength(1)

// Trigger multiple graph runs by updating the user
usersOpts.utils.begin()
usersOpts.utils.write({
type: `update`,
value: { id: 1, name: `Alice Updated` },
})
usersOpts.utils.commit()

usersOpts.utils.begin()
usersOpts.utils.write({
type: `update`,
value: { id: 1, name: `Alice Updated 2` },
})
usersOpts.utils.commit()

usersOpts.utils.begin()
usersOpts.utils.write({
type: `update`,
value: { id: 1, name: `Alice Updated 3` },
})
usersOpts.utils.commit()

// The loadSubset calls should be bounded - not growing with each update
// because the same join key (1) is already tracked
expect(loadSubsetCalls.length).toBeLessThanOrEqual(2)
})

test(`should only track new join keys when they differ from loaded ones`, () => {
// This test verifies that the loadedJoinKeys Set correctly tracks
// which keys have been requested, preventing duplicate requests

const loadSubsetCalls: Array<{ where: unknown }> = []

// Create users collection with multiple users
const usersOpts = mockSyncCollectionOptions<User>({
id: `track-keys-users`,
getKey: (user) => user.id,
initialData: [
{ id: 1, name: `Alice` },
{ id: 2, name: `Bob` },
{ id: 3, name: `Charlie` },
],
autoIndex: `eager`,
})
const usersCollection = createCollection(usersOpts)

// Create orders collection with loadSubset tracking
const ordersOpts = mockSyncCollectionOptions<Order>({
id: `track-keys-orders`,
getKey: (order) => order.id,
initialData: [
{ id: 1, userId: 1, amount: 100 },
{ id: 2, userId: 2, amount: 200 },
{ id: 3, userId: 3, amount: 300 },
],
autoIndex: `eager`,
sync: {
sync: ({ begin, write, commit, markReady }) => {
begin()
write({ type: `insert`, value: { id: 1, userId: 1, amount: 100 } })
write({ type: `insert`, value: { id: 2, userId: 2, amount: 200 } })
write({ type: `insert`, value: { id: 3, userId: 3, amount: 300 } })
commit()
markReady()

return {
loadSubset: (options) => {
loadSubsetCalls.push({ where: options.where })
return true
},
}
},
},
})
const ordersCollection = createCollection(ordersOpts)

// Create join query
const joinQuery = createLiveQueryCollection({
startSync: true,
query: (q) =>
q
.from({ user: usersCollection })
.join({ order: ordersCollection }, ({ user, order }) =>
eq(user.id, order.userId),
),
})

// Should have 3 results
expect(joinQuery.toArray).toHaveLength(3)

// Record initial state
const initialCallCount = loadSubsetCalls.length

// Now make multiple updates to trigger graph reruns
// Each update should NOT cause additional loadSubset calls
// for the same join keys that were already processed
for (let i = 0; i < 5; i++) {
usersOpts.utils.begin()
usersOpts.utils.write({
type: `update`,
value: { id: 1, name: `Alice v${i}` },
})
usersOpts.utils.commit()
}

// After 5 updates, we should NOT have 5 additional loadSubset calls
// because the join keys (1, 2, 3) are already tracked
const finalCallCount = loadSubsetCalls.length

// The key assertion: loadSubset calls should be bounded,
// not growing proportionally with the number of graph runs
expect(finalCallCount).toBeLessThanOrEqual(initialCallCount + 1)
})
})