diff --git a/.changeset/evil-pianos-hear.md b/.changeset/evil-pianos-hear.md new file mode 100644 index 0000000..8a647f1 --- /dev/null +++ b/.changeset/evil-pianos-hear.md @@ -0,0 +1,5 @@ +--- +'@electric-sql/d2mini': patch +--- + +Modify the index implementation to keep a map of consolidated values and their multiplicities. This makes looking up a value's multiplicity more efficient because it is already precomputed. Also modify the reduce operator to emit a single diff instead of 2 diffs (1 that is -oldMultiplicity and 1 that is +newMultiplicity). diff --git a/packages/d2mini/src/indexes.ts b/packages/d2mini/src/indexes.ts index 5108b00..c02893e 100644 --- a/packages/d2mini/src/indexes.ts +++ b/packages/d2mini/src/indexes.ts @@ -7,24 +7,32 @@ import { DefaultMap, hash } from './utils.js' * exploit the key-value structure of the data to run efficiently. */ export class Index { - #inner: DefaultMap - #changedKeys: Set + #inner: DefaultMap> constructor() { - this.#inner = new DefaultMap(() => []) - this.#changedKeys = new Set() + this.#inner = new DefaultMap>( + () => + new DefaultMap(() => [undefined as any as V, 0]), + ) + // #inner is a map of: + // { + // [key]: { + // [hash(value)]: [value, multiplicity] + // } + // } } toString(indent = false): string { return `Index(${JSON.stringify( - [...this.#inner], + [...this.#inner].map(([k, valueMap]) => [k, [...valueMap]]), undefined, indent ? 
' ' : undefined, )})` } get(key: K): [V, number][] { - return this.#inner.get(key) + const valueMap = this.#inner.get(key) + return [...valueMap.values()] } entries() { @@ -36,78 +44,44 @@ export class Index { } has(key: K): boolean { - return this.#inner.has(key) && this.#inner.get(key).length > 0 + return this.#inner.has(key) } get size(): number { - let count = 0 - for (const [, values] of this.#inner.entries()) { - if (values.length > 0) { - count++ - } - } - return count + return this.#inner.size } addValue(key: K, value: [V, number]): void { - const values = this.#inner.get(key) - values.push(value) - this.#changedKeys.add(key) - } - - append(other: Index): void { - for (const [key, otherValues] of other.entries()) { - const thisValues = this.#inner.get(key) - for (const value of otherValues) { - thisValues.push(value) - } - this.#changedKeys.add(key) - } - } - - compact(keys: K[] = []): void { - // If no keys specified, use the changed keys - const keysToProcess = keys.length === 0 ? 
[...this.#changedKeys] : keys - - for (const key of keysToProcess) { - if (!this.#inner.has(key)) continue - - const values = this.#inner.get(key) - const consolidated = this.consolidateValues(values) - - // Remove the key entirely and re-add only if there are non-zero values - this.#inner.delete(key) - if (consolidated.length > 0) { - this.#inner.get(key).push(...consolidated) - } - } - - // Clear the changed keys after compaction - if (keys.length === 0) { - this.#changedKeys.clear() - } else { - // Only remove the keys that were explicitly compacted - for (const key of keys) { - this.#changedKeys.delete(key) + const [val, multiplicity] = value + const valueMap = this.#inner.get(key) + const valueHash = hash(val) + const [, existingMultiplicity] = valueMap.get(valueHash) + const newMultiplicity = existingMultiplicity + multiplicity + if (multiplicity !== 0) { + if (newMultiplicity === 0) { + valueMap.delete(valueHash) + } else { + valueMap.set(valueHash, [val, newMultiplicity]) } } } - private consolidateValues(values: [V, number][]): [V, number][] { - const consolidated = new Map() - - for (const [value, multiplicity] of values) { - const valueHash = hash(value) - if (consolidated.has(valueHash)) { - consolidated.get(valueHash)!.multiplicity += multiplicity - } else { - consolidated.set(valueHash, { value, multiplicity }) + append(other: Index): void { + for (const [key, otherValueMap] of other.entries()) { + const thisValueMap = this.#inner.get(key) + for (const [ + valueHash, + [value, multiplicity], + ] of otherValueMap.entries()) { + const [, existingMultiplicity] = thisValueMap.get(valueHash) + const newMultiplicity = existingMultiplicity + multiplicity + if (newMultiplicity === 0) { + thisValueMap.delete(valueHash) + } else { + thisValueMap.set(valueHash, [value, newMultiplicity]) + } } } - - return [...consolidated.values()] - .filter(({ multiplicity }) => multiplicity !== 0) - .map(({ value, multiplicity }) => [value, multiplicity]) } join(other: Index): 
MultiSet<[K, [V, V2]]> { @@ -116,11 +90,11 @@ export class Index { // We want to iterate over the smaller of the two indexes to reduce the // number of operations we need to do. if (this.size <= other.size) { - for (const [key, values1] of this.entries()) { + for (const [key, valueMap] of this.entries()) { if (!other.has(key)) continue - const values2 = other.get(key) - for (const [val1, mul1] of values1) { - for (const [val2, mul2] of values2) { + const otherValues = other.get(key) + for (const [val1, mul1] of valueMap.values()) { + for (const [val2, mul2] of otherValues) { if (mul1 !== 0 && mul2 !== 0) { result.push([[key, [val1, val2]], mul1 * mul2]) } @@ -128,11 +102,11 @@ export class Index { } } } else { - for (const [key, values2] of other.entries()) { + for (const [key, otherValueMap] of other.entries()) { if (!this.has(key)) continue - const values1 = this.get(key) - for (const [val2, mul2] of values2) { - for (const [val1, mul1] of values1) { + const values = this.get(key) + for (const [val2, mul2] of otherValueMap.values()) { + for (const [val1, mul1] of values) { if (mul1 !== 0 && mul2 !== 0) { result.push([[key, [val1, val2]], mul1 * mul2]) } diff --git a/packages/d2mini/src/operators/join.ts b/packages/d2mini/src/operators/join.ts index 7c35e65..2b13049 100644 --- a/packages/d2mini/src/operators/join.ts +++ b/packages/d2mini/src/operators/join.ts @@ -77,13 +77,6 @@ export class JoinOperator extends BinaryOperator< // Append deltaB to indexB this.#indexB.append(deltaB) - - // Compact both indexes to consolidate values and remove zero-multiplicity entries - // Only compact changed keys for efficiency - deltaA.compact() - deltaB.compact() - this.#indexA.compact() - this.#indexB.compact() } } diff --git a/packages/d2mini/src/operators/reduce copy.ts b/packages/d2mini/src/operators/reduce copy.ts new file mode 100644 index 0000000..6704a38 --- /dev/null +++ b/packages/d2mini/src/operators/reduce copy.ts @@ -0,0 +1,103 @@ +import { IStreamBuilder, KeyValue 
} from '../types.js' +import { + DifferenceStreamReader, + DifferenceStreamWriter, + UnaryOperator, +} from '../graph.js' +import { StreamBuilder } from '../d2.js' +import { MultiSet } from '../multiset.js' +import { Index } from '../indexes.js' +import { hash } from '../utils.js' + +/** + * Base operator for reduction operations (version-free) + */ +export class ReduceOperator extends UnaryOperator<[K, V1], [K, V2]> { + #index = new Index() + #indexOut = new Index() + #f: (values: [V1, number][]) => [V2, number][] + + constructor( + id: number, + inputA: DifferenceStreamReader<[K, V1]>, + output: DifferenceStreamWriter<[K, V2]>, + f: (values: [V1, number][]) => [V2, number][], + ) { + super(id, inputA, output) + this.#f = f + } + + run(): void { + const keysTodo = new Set() + + // Collect all input messages and update the index + for (const message of this.inputMessages()) { + for (const [item, multiplicity] of message.getInner()) { + const [key, value] = item + this.#index.addValue(key, [value, multiplicity]) + keysTodo.add(key) + } + } + + // For each key, compute the reduction and delta + const result: [[K, V2], number][] = [] + for (const key of keysTodo) { + const curr = this.#index.get(key) + const currOut = this.#indexOut.get(key) + const out = this.#f(curr) + + // Calculate delta between current and previous output + const delta = new Map() + const values = new Map() + for (const [value, multiplicity] of out) { + const valueKey = hash(value) + values.set(valueKey, value) + delta.set(valueKey, (delta.get(valueKey) || 0) + multiplicity) + } + for (const [value, multiplicity] of currOut) { + const valueKey = hash(value) + values.set(valueKey, value) + delta.set(valueKey, (delta.get(valueKey) || 0) - multiplicity) + } + + // Add non-zero deltas to result + for (const [valueKey, multiplicity] of delta) { + const value = values.get(valueKey)! 
+ if (multiplicity !== 0) { + result.push([[key, value], multiplicity]) + this.#indexOut.addValue(key, [value, multiplicity]) + } + } + } + + if (result.length > 0) { + this.output.sendData(new MultiSet(result)) + } + } +} + +/** + * Reduces the elements in the stream by key (version-free) + */ +export function reduce< + K extends T extends KeyValue ? K : never, + V1 extends T extends KeyValue ? V : never, + R, + T, +>(f: (values: [V1, number][]) => [R, number][]) { + return (stream: IStreamBuilder): IStreamBuilder> => { + const output = new StreamBuilder>( + stream.graph, + new DifferenceStreamWriter>(), + ) + const operator = new ReduceOperator( + stream.graph.getNextOperatorId(), + stream.connectReader() as DifferenceStreamReader>, + output.writer, + f, + ) + stream.graph.addOperator(operator) + stream.graph.addStream(output.connectReader()) + return output + } +} diff --git a/packages/d2mini/src/operators/reduce.ts b/packages/d2mini/src/operators/reduce.ts index 1188ec9..ae0bd2f 100644 --- a/packages/d2mini/src/operators/reduce.ts +++ b/packages/d2mini/src/operators/reduce.ts @@ -75,25 +75,43 @@ export class ReduceOperator extends UnaryOperator<[K, V1], [K, V2]> { } } - // First, emit removals for old values that are no longer present or have changed + const commonKeys = new Set() + + // First, emit removals for old values that are no longer present for (const [valueKey, { value, multiplicity }] of oldOutputMap) { const newEntry = newOutputMap.get(valueKey) - if (!newEntry || newEntry.multiplicity !== multiplicity) { + if (!newEntry) { // Remove the old value entirely result.push([[key, value], -multiplicity]) this.#indexOut.addValue(key, [value, -multiplicity]) + } else { + commonKeys.add(valueKey) } } - // Then, emit additions for new values that are not present in old or have changed + // Then, emit additions for new values that are not present in old for (const [valueKey, { value, multiplicity }] of newOutputMap) { const oldEntry = 
oldOutputMap.get(valueKey) - if (!oldEntry || oldEntry.multiplicity !== multiplicity) { + if (!oldEntry) { // Add the new value only if it has non-zero multiplicity if (multiplicity !== 0) { result.push([[key, value], multiplicity]) this.#indexOut.addValue(key, [value, multiplicity]) } + } else { + commonKeys.add(valueKey) + } + } + + // Then, emit multiplicity changes for values that were present and are still present + for (const valueKey of commonKeys) { + const newEntry = newOutputMap.get(valueKey) + const oldEntry = oldOutputMap.get(valueKey) + const delta = newEntry!.multiplicity - oldEntry!.multiplicity + // Only emit actual changes, i.e. non-zero deltas + if (delta !== 0) { + result.push([[key, newEntry!.value], delta]) + this.#indexOut.addValue(key, [newEntry!.value, delta]) } } } @@ -101,11 +119,6 @@ export class ReduceOperator extends UnaryOperator<[K, V1], [K, V2]> { if (result.length > 0) { this.output.sendData(new MultiSet(result)) } - - // Compact both indexes to consolidate values and remove zero-multiplicity entries - // Only compact changed keys for efficiency - this.#index.compact() - this.#indexOut.compact() } } diff --git a/packages/d2mini/src/operators/topKWithFractionalIndex.ts b/packages/d2mini/src/operators/topKWithFractionalIndex.ts index a6d173d..6d7ed74 100644 --- a/packages/d2mini/src/operators/topKWithFractionalIndex.ts +++ b/packages/d2mini/src/operators/topKWithFractionalIndex.ts @@ -293,11 +293,6 @@ export class TopKWithFractionalIndexOperator extends UnaryOperator< if (result.length > 0) { this.output.sendData(new MultiSet(result)) } - - // Compact both indexes to consolidate values and remove zero-multiplicity entries - // Only compact changed keys for efficiency - this.#index.compact() - this.#indexOut.compact() } } diff --git a/packages/d2mini/tests/indexes.test.ts b/packages/d2mini/tests/indexes.test.ts index 58c39cc..223840d 100644 --- a/packages/d2mini/tests/indexes.test.ts +++ b/packages/d2mini/tests/indexes.test.ts @@ 
-31,17 +31,9 @@ describe('Index', () => { index.addValue('key1', [10, 1]) index.addValue('key1', [10, -1]) - // Before compaction, values are stored as-is + // Value is compacted on the fly const result = index.get('key1') - expect(result).toEqual([ - [10, 1], - [10, -1], - ]) - - // After compaction, zero-multiplicity values are removed - index.compact(['key1']) - const compactedResult = index.get('key1') - expect(compactedResult).toEqual([]) + expect(result).toEqual([]) }) }) @@ -70,14 +62,6 @@ describe('Index', () => { index.append(other) - // Before compaction, values are stored separately - expect(index.get('key1')).toEqual([ - [10, 2], - [10, 3], - ]) - - // After compaction, multiplicities are combined - index.compact(['key1']) expect(index.get('key1')).toEqual([[10, 5]]) }) }) @@ -164,43 +148,17 @@ describe('Index', () => { // Append should also track changed keys index.append(other) - // Before compaction, all values should be stored as-is - expect(index.get('key1')).toEqual([ - [10, 1], - [10, -1], - ]) - expect(index.get('key2')).toEqual([[20, 1]]) - expect(index.get('key3')).toEqual([[30, 1]]) - expect(index.get('key4')).toEqual([ - [40, 1], - [40, -1], - ]) - expect(index.get('key5')).toEqual([[50, 1]]) - - // Compact without arguments should only compact changed keys - index.compact() - - // After compaction, values should be consolidated and zero-multiplicity entries removed + // Values should be consolidated and zero-multiplicity entries removed expect(index.get('key1')).toEqual([]) // Cancelled out expect(index.get('key2')).toEqual([[20, 1]]) expect(index.get('key3')).toEqual([[30, 1]]) expect(index.get('key4')).toEqual([]) // Cancelled out expect(index.get('key5')).toEqual([[50, 1]]) - // Add more values after compaction + // Add more values index.addValue('key2', [25, 1]) index.addValue('key6', [60, 1]) - // Only key2 and key6 should have new uncompacted values - expect(index.get('key2')).toEqual([ - [20, 1], - [25, 1], - ]) - 
expect(index.get('key6')).toEqual([[60, 1]]) - - // Compact again - should only affect key2 and key6 - index.compact() - expect(index.get('key2')).toEqual([ [20, 1], [25, 1],