From 59b424d605d1bc542101d362e41e1606a1ab4119 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Mon, 7 Jul 2025 13:49:26 +0200 Subject: [PATCH 1/6] Revert back to map implementation of index --- packages/d2mini/src/indexes.ts | 122 +++++++----------- packages/d2mini/src/operators/join.ts | 7 - packages/d2mini/src/operators/reduce.ts | 14 +- .../src/operators/topKWithFractionalIndex.ts | 5 - 4 files changed, 57 insertions(+), 91 deletions(-) diff --git a/packages/d2mini/src/indexes.ts b/packages/d2mini/src/indexes.ts index 5108b00..c02893e 100644 --- a/packages/d2mini/src/indexes.ts +++ b/packages/d2mini/src/indexes.ts @@ -7,24 +7,32 @@ import { DefaultMap, hash } from './utils.js' * exploit the key-value structure of the data to run efficiently. */ export class Index { - #inner: DefaultMap - #changedKeys: Set + #inner: DefaultMap> constructor() { - this.#inner = new DefaultMap(() => []) - this.#changedKeys = new Set() + this.#inner = new DefaultMap>( + () => + new DefaultMap(() => [undefined as any as V, 0]), + ) + // #inner is as map of: + // { + // [key]: { + // [hash(value)]: [value, multiplicity] + // } + // } } toString(indent = false): string { return `Index(${JSON.stringify( - [...this.#inner], + [...this.#inner].map(([k, valueMap]) => [k, [...valueMap]]), undefined, indent ? ' ' : undefined, )})` } get(key: K): [V, number][] { - return this.#inner.get(key) + const valueMap = this.#inner.get(key) + return [...valueMap.values()] } entries() { @@ -36,78 +44,44 @@ export class Index { } has(key: K): boolean { - return this.#inner.has(key) && this.#inner.get(key).length > 0 + return this.#inner.has(key) } get size(): number { - let count = 0 - for (const [, values] of this.#inner.entries()) { - if (values.length > 0) { - count++ - } - } - return count + return this.#inner.size } addValue(key: K, value: [V, number]): void { - const values = this.#inner.get(key) - values.push(value) - this.#changedKeys.add(key) - } - - append(other: Index): void { - for (const [key, otherValues] of other.entries()) { - const thisValues = this.#inner.get(key) - for (const value of otherValues) { - thisValues.push(value) - } - this.#changedKeys.add(key) - } - } - - compact(keys: K[] = []): void { - // If no keys specified, use the changed keys - const keysToProcess = keys.length === 0 ? [...this.#changedKeys] : keys - - for (const key of keysToProcess) { - if (!this.#inner.has(key)) continue - - const values = this.#inner.get(key) - const consolidated = this.consolidateValues(values) - - // Remove the key entirely and re-add only if there are non-zero values - this.#inner.delete(key) - if (consolidated.length > 0) { - this.#inner.get(key).push(...consolidated) - } - } - - // Clear the changed keys after compaction - if (keys.length === 0) { - this.#changedKeys.clear() - } else { - // Only remove the keys that were explicitly compacted - for (const key of keys) { - this.#changedKeys.delete(key) + const [val, multiplicity] = value + const valueMap = this.#inner.get(key) + const valueHash = hash(val) + const [, existingMultiplicity] = valueMap.get(valueHash) + const newMultiplicity = existingMultiplicity + multiplicity + if (multiplicity !== 0) { + if (newMultiplicity === 0) { + valueMap.delete(valueHash) + } else { + valueMap.set(valueHash, [val, newMultiplicity]) } } } - private consolidateValues(values: [V, number][]): [V, number][] { - const consolidated = new Map() - - for (const [value, multiplicity] of values) { - const valueHash = hash(value) - if (consolidated.has(valueHash)) { - consolidated.get(valueHash)!.multiplicity += multiplicity - } else { - consolidated.set(valueHash, { value, multiplicity }) + append(other: Index): void { + for (const [key, otherValueMap] of other.entries()) { + const thisValueMap = this.#inner.get(key) + for (const [ + valueHash, + [value, multiplicity], + ] of otherValueMap.entries()) { + const [, existingMultiplicity] = thisValueMap.get(valueHash) + const newMultiplicity = existingMultiplicity + multiplicity + if (newMultiplicity === 0) { + thisValueMap.delete(valueHash) + } else { + thisValueMap.set(valueHash, [value, newMultiplicity]) + } } } - - return [...consolidated.values()] - .filter(({ multiplicity }) => multiplicity !== 0) - .map(({ value, multiplicity }) => [value, multiplicity]) } join(other: Index): MultiSet<[K, [V, V2]]> { @@ -116,11 +90,11 @@ export class Index { // We want to iterate over the smaller of the two indexes to reduce the // number of operations we need to do. if (this.size <= other.size) { - for (const [key, values1] of this.entries()) { + for (const [key, valueMap] of this.entries()) { if (!other.has(key)) continue - const values2 = other.get(key) - for (const [val1, mul1] of values1) { - for (const [val2, mul2] of values2) { + const otherValues = other.get(key) + for (const [val1, mul1] of valueMap.values()) { + for (const [val2, mul2] of otherValues) { if (mul1 !== 0 && mul2 !== 0) { result.push([[key, [val1, val2]], mul1 * mul2]) } @@ -128,11 +102,11 @@ export class Index { } } } else { - for (const [key, values2] of other.entries()) { + for (const [key, otherValueMap] of other.entries()) { if (!this.has(key)) continue - const values1 = this.get(key) - for (const [val2, mul2] of values2) { - for (const [val1, mul1] of values1) { + const values = this.get(key) + for (const [val2, mul2] of otherValueMap.values()) { + for (const [val1, mul1] of values) { if (mul1 !== 0 && mul2 !== 0) { result.push([[key, [val1, val2]], mul1 * mul2]) } diff --git a/packages/d2mini/src/operators/join.ts b/packages/d2mini/src/operators/join.ts index 7c35e65..2b13049 100644 --- a/packages/d2mini/src/operators/join.ts +++ b/packages/d2mini/src/operators/join.ts @@ -77,13 +77,6 @@ export class JoinOperator extends BinaryOperator< // Append deltaB to indexB this.#indexB.append(deltaB) - - // Compact both indexes to consolidate values and remove zero-multiplicity entries - // Only compact changed keys for efficiency - deltaA.compact() - deltaB.compact() - this.#indexA.compact() - this.#indexB.compact() } } diff --git a/packages/d2mini/src/operators/reduce.ts b/packages/d2mini/src/operators/reduce.ts index 1188ec9..b7c6649 100644 --- a/packages/d2mini/src/operators/reduce.ts +++ b/packages/d2mini/src/operators/reduce.ts @@ -75,6 +75,15 @@ export class ReduceOperator extends UnaryOperator<[K, V1], [K, V2]> { } } + // NOTE: in the below logic if an element is still present + // but its multiplicity changed then we are going + // to emit 2 diffs: a -oldMultiplicity and a +newMultiplicity + // Why not detect this case and emit a single diff newMultiplicity - oldMultiplicity ? + // + // We could when we map of both maps we could detect that it is in both (i.e. add an else branch) + // and keep track of a set of keys that are in both + // then a third for loop that loops over those common keys and emits the diff in multiplicity + // First, emit removals for old values that are no longer present or have changed for (const [valueKey, { value, multiplicity }] of oldOutputMap) { const newEntry = newOutputMap.get(valueKey) @@ -101,11 +110,6 @@ export class ReduceOperator extends UnaryOperator<[K, V1], [K, V2]> { if (result.length > 0) { this.output.sendData(new MultiSet(result)) } - - // Compact both indexes to consolidate values and remove zero-multiplicity entries - // Only compact changed keys for efficiency - this.#index.compact() - this.#indexOut.compact() } } diff --git a/packages/d2mini/src/operators/topKWithFractionalIndex.ts b/packages/d2mini/src/operators/topKWithFractionalIndex.ts index a6d173d..6d7ed74 100644 --- a/packages/d2mini/src/operators/topKWithFractionalIndex.ts +++ b/packages/d2mini/src/operators/topKWithFractionalIndex.ts @@ -293,11 +293,6 @@ export class TopKWithFractionalIndexOperator extends UnaryOperator< if (result.length > 0) { this.output.sendData(new MultiSet(result)) } - - // Compact both indexes to consolidate values and remove zero-multiplicity entries - // Only compact changed keys for efficiency - this.#index.compact() - this.#indexOut.compact() } } From c8da837f19697bbee76986f3cd8c810d0b33e413 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Mon, 7 Jul 2025 14:42:33 +0200 Subject: [PATCH 2/6] Modify index tests to no longer assert uncompacted form --- packages/d2mini/tests/indexes.test.ts | 50 +++------------------------ 1 file changed, 4 insertions(+), 46 deletions(-) diff --git a/packages/d2mini/tests/indexes.test.ts b/packages/d2mini/tests/indexes.test.ts index 58c39cc..223840d 100644 --- a/packages/d2mini/tests/indexes.test.ts +++ b/packages/d2mini/tests/indexes.test.ts @@ -31,17 +31,9 @@ describe('Index', () => { index.addValue('key1', [10, 1]) index.addValue('key1', [10, -1]) - // Before compaction, values are stored as-is + // Value is compacted on the fly const result = index.get('key1') - expect(result).toEqual([ - [10, 1], - [10, -1], - ]) - - // After compaction, zero-multiplicity values are removed - index.compact(['key1']) - const compactedResult = index.get('key1') - expect(compactedResult).toEqual([]) + expect(result).toEqual([]) }) }) @@ -70,14 +62,6 @@ describe('Index', () => { index.append(other) - // Before compaction, values are stored separately - expect(index.get('key1')).toEqual([ - [10, 2], - [10, 3], - ]) - - // After compaction, multiplicities are combined - index.compact(['key1']) expect(index.get('key1')).toEqual([[10, 5]]) }) }) @@ -164,43 +148,17 @@ describe('Index', () => { // Append should also track changed keys index.append(other) - // Before compaction, all values should be stored as-is - expect(index.get('key1')).toEqual([ - [10, 1], - [10, -1], - ]) - expect(index.get('key2')).toEqual([[20, 1]]) - expect(index.get('key3')).toEqual([[30, 1]]) - expect(index.get('key4')).toEqual([ - [40, 1], - [40, -1], - ]) - expect(index.get('key5')).toEqual([[50, 1]]) - - // Compact without arguments should only compact changed keys - index.compact() - - // After compaction, values should be consolidated and zero-multiplicity entries removed + // Values should be consolidated and zero-multiplicity entries removed expect(index.get('key1')).toEqual([]) // Cancelled out expect(index.get('key2')).toEqual([[20, 1]]) expect(index.get('key3')).toEqual([[30, 1]]) expect(index.get('key4')).toEqual([]) // Cancelled out expect(index.get('key5')).toEqual([[50, 1]]) - // Add more values after compaction + // Add more values index.addValue('key2', [25, 1]) index.addValue('key6', [60, 1]) - // Only key2 and key6 should have new uncompacted values - expect(index.get('key2')).toEqual([ - [20, 1], - [25, 1], - ]) - expect(index.get('key6')).toEqual([[60, 1]]) - - // Compact again - should only affect key2 and key6 - index.compact() - expect(index.get('key2')).toEqual([ [20, 1], [25, 1], From c9b8ea8f27b7efbceff1b3d648a4483e12126abe Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Mon, 7 Jul 2025 15:14:04 +0200 Subject: [PATCH 3/6] Remove obsolete comments --- packages/d2mini/src/operators/reduce copy.ts | 103 +++++++++++++++++++ packages/d2mini/src/operators/reduce.ts | 9 -- 2 files changed, 103 insertions(+), 9 deletions(-) create mode 100644 packages/d2mini/src/operators/reduce copy.ts diff --git a/packages/d2mini/src/operators/reduce copy.ts b/packages/d2mini/src/operators/reduce copy.ts new file mode 100644 index 0000000..6704a38 --- /dev/null +++ b/packages/d2mini/src/operators/reduce copy.ts @@ -0,0 +1,103 @@ +import { IStreamBuilder, KeyValue } from '../types.js' +import { + DifferenceStreamReader, + DifferenceStreamWriter, + UnaryOperator, +} from '../graph.js' +import { StreamBuilder } from '../d2.js' +import { MultiSet } from '../multiset.js' +import { Index } from '../indexes.js' +import { hash } from '../utils.js' + +/** + * Base operator for reduction operations (version-free) + */ +export class ReduceOperator extends UnaryOperator<[K, V1], [K, V2]> { + #index = new Index() + #indexOut = new Index() + #f: (values: [V1, number][]) => [V2, number][] + + constructor( + id: number, + inputA: DifferenceStreamReader<[K, V1]>, + output: DifferenceStreamWriter<[K, V2]>, + f: (values: [V1, number][]) => [V2, number][], + ) { + super(id, inputA, output) + this.#f = f + } + + run(): void { + const keysTodo = new Set() + + // Collect all input messages and update the index + for (const message of this.inputMessages()) { + for (const [item, multiplicity] of message.getInner()) { + const [key, value] = item + this.#index.addValue(key, [value, multiplicity]) + keysTodo.add(key) + } + } + + // For each key, compute the reduction and delta + const result: [[K, V2], number][] = [] + for (const key of keysTodo) { + const curr = this.#index.get(key) + const currOut = this.#indexOut.get(key) + const out = this.#f(curr) + + // Calculate delta between current and previous output + const delta = new Map() + const values = new Map() + for (const [value, multiplicity] of out) { + const valueKey = hash(value) + values.set(valueKey, value) + delta.set(valueKey, (delta.get(valueKey) || 0) + multiplicity) + } + for (const [value, multiplicity] of currOut) { + const valueKey = hash(value) + values.set(valueKey, value) + delta.set(valueKey, (delta.get(valueKey) || 0) - multiplicity) + } + + // Add non-zero deltas to result + for (const [valueKey, multiplicity] of delta) { + const value = values.get(valueKey)! + if (multiplicity !== 0) { + result.push([[key, value], multiplicity]) + this.#indexOut.addValue(key, [value, multiplicity]) + } + } + } + + if (result.length > 0) { + this.output.sendData(new MultiSet(result)) + } + } +} + +/** + * Reduces the elements in the stream by key (version-free) + */ +export function reduce< + K extends T extends KeyValue ? K : never, + V1 extends T extends KeyValue ? V : never, + R, + T, +>(f: (values: [V1, number][]) => [R, number][]) { + return (stream: IStreamBuilder): IStreamBuilder> => { + const output = new StreamBuilder>( + stream.graph, + new DifferenceStreamWriter>(), + ) + const operator = new ReduceOperator( + stream.graph.getNextOperatorId(), + stream.connectReader() as DifferenceStreamReader>, + output.writer, + f, + ) + stream.graph.addOperator(operator) + stream.graph.addStream(output.connectReader()) + return output + } +} diff --git a/packages/d2mini/src/operators/reduce.ts b/packages/d2mini/src/operators/reduce.ts index b7c6649..30b3e68 100644 --- a/packages/d2mini/src/operators/reduce.ts +++ b/packages/d2mini/src/operators/reduce.ts @@ -75,15 +75,6 @@ export class ReduceOperator extends UnaryOperator<[K, V1], [K, V2]> { } } - // NOTE: in the below logic if an element is still present - // but its multiplicity changed then we are going - // to emit 2 diffs: a -oldMultiplicity and a +newMultiplicity - // Why not detect this case and emit a single diff newMultiplicity - oldMultiplicity ? - // - // We could when we map of both maps we could detect that it is in both (i.e. add an else branch) - // and keep track of a set of keys that are in both - // then a third for loop that loops over those common keys and emits the diff in multiplicity - // First, emit removals for old values that are no longer present or have changed for (const [valueKey, { value, multiplicity }] of oldOutputMap) { const newEntry = newOutputMap.get(valueKey) From d9375db9d178eb32a05b7139f744190249e11538 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Mon, 7 Jul 2025 16:00:55 +0200 Subject: [PATCH 4/6] changeset --- .changeset/evil-pianos-hear.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/evil-pianos-hear.md diff --git a/.changeset/evil-pianos-hear.md b/.changeset/evil-pianos-hear.md new file mode 100644 index 0000000..b6af55c --- /dev/null +++ b/.changeset/evil-pianos-hear.md @@ -0,0 +1,5 @@ +--- +'@electric-sql/d2mini': patch +--- + +Modify index implementation to keep a map of consolidated values and their multiplicities. This improves efficiency to get a value's multiplicity since it's already precomputed. From a5f268d1ad3f17e292656fdd6f8f7a20663fc195 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Mon, 7 Jul 2025 16:05:54 +0200 Subject: [PATCH 5/6] Refactor reduce --- packages/d2mini/src/operators/reduce.ts | 26 +++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/packages/d2mini/src/operators/reduce.ts b/packages/d2mini/src/operators/reduce.ts index 30b3e68..ae0bd2f 100644 --- a/packages/d2mini/src/operators/reduce.ts +++ b/packages/d2mini/src/operators/reduce.ts @@ -75,25 +75,43 @@ export class ReduceOperator extends UnaryOperator<[K, V1], [K, V2]> { } } - // First, emit removals for old values that are no longer present or have changed + const commonKeys = new Set() + + // First, emit removals for old values that are no longer present for (const [valueKey, { value, multiplicity }] of oldOutputMap) { const newEntry = newOutputMap.get(valueKey) - if (!newEntry || newEntry.multiplicity !== multiplicity) { + if (!newEntry) { // Remove the old value entirely result.push([[key, value], -multiplicity]) this.#indexOut.addValue(key, [value, -multiplicity]) + } else { + commonKeys.add(valueKey) } } - // Then, emit additions for new values that are not present in old or have changed + // Then, emit additions for new values that are not present in old for (const [valueKey, { value, multiplicity }] of newOutputMap) { const oldEntry = oldOutputMap.get(valueKey) - if (!oldEntry || oldEntry.multiplicity !== multiplicity) { + if (!oldEntry) { // Add the new value only if it has non-zero multiplicity if (multiplicity !== 0) { result.push([[key, value], multiplicity]) this.#indexOut.addValue(key, [value, multiplicity]) } + } else { + commonKeys.add(valueKey) + } + } + + // Then, emit multiplicity changes for values that were present and are still present + for (const valueKey of commonKeys) { + const newEntry = newOutputMap.get(valueKey) + const oldEntry = oldOutputMap.get(valueKey) + const delta = newEntry!.multiplicity - oldEntry!.multiplicity + // Only emit actual changes, i.e. non-zero deltas + if (delta !== 0) { + result.push([[key, newEntry!.value], delta]) + this.#indexOut.addValue(key, [newEntry!.value, delta]) } } } From 1469779fec7a85b82de745cec6e4933b849b5e9c Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Mon, 7 Jul 2025 16:07:27 +0200 Subject: [PATCH 6/6] Update changeset --- .changeset/evil-pianos-hear.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.changeset/evil-pianos-hear.md b/.changeset/evil-pianos-hear.md index b6af55c..8a647f1 100644 --- a/.changeset/evil-pianos-hear.md +++ b/.changeset/evil-pianos-hear.md @@ -2,4 +2,4 @@ '@electric-sql/d2mini': patch --- -Modify index implementation to keep a map of consolidated values and their multiplicities. This improves efficiency to get a value's multiplicity since it's already precomputed. +Modify index implementation to keep a map of consolidated values and their multiplicities. This improves efficiency to get a value's multiplicity since it's already precomputed. Also modify reduce operator to emit a single diff instead of 2 diffs (1 that is -oldMultiplicity and 1 that is +newMultiplicity).