diff --git a/docs/ken-protocol-assessment.md b/docs/ken-protocol-assessment.md index 371721c61..763fc5de2 100644 --- a/docs/ken-protocol-assessment.md +++ b/docs/ken-protocol-assessment.md @@ -46,28 +46,51 @@ Key aspects: | Retransmission | ✓ | Timeout-based retransmit until ACK or max retries | | Crash-safe persistence | ✓ | Write message first, then update nextSendSeq | | Local recovery | ✓ | Restore seq state, restart ACK timeout | +| **Transactional turns** | ✓ | Crank buffering defers outputs until crank commit | +| **Deferred transmission** | **Partial** | Buffered within kernel, but RemoteHandle transmits immediately on flush | -### What We're Missing or Differs +### Recent Improvements: Crank Buffering -#### 1. Transactional Turns (Major Gap) +The crank buffering feature (issue #786) significantly improves our alignment with Ken: + +**Our new crank model:** +``` +crank_start(deliver one item from run queue) + → create database savepoint + → vat processes message + → vat syscalls buffer outputs (sends, notifications) in CrankBuffer +crank_end: + → if success: atomically flush buffer to run queue + commit state + → if failure: rollback to savepoint, discard buffer +``` + +This achieves Ken's core property that **outputs are only externalized after successful turn completion**. Within the kernel: +- `enqueueSend(target, message, immediate=false)` buffers sends +- `enqueueNotify(endpoint, kpid, immediate=false)` buffers notifications +- `resolvePromises(endpoint, resolutions, immediate=false)` buffers all resolution effects +- On successful crank: `#flushCrankBuffer()` moves items to run queue +- On rollback: buffer is discarded along with database changes + +### What We're Still Missing or Differs + +#### 1. 
Deferred Network Transmission (Gap) **Ken's model:** ``` -turn_start(deliver one message) - → processing_function executes - → outputs buffered in Q_out -turn_end: - → atomically persist(turn, app_state, Q_out, Done) - → THEN transmit buffered messages +persist checkpoint → THEN transmit to network ``` **Our model:** ``` -message received → kernel processes → sends outputs immediately -each output: persist message → update seq → transmit +crank completes → flush to run queue → eventually delivered to RemoteHandle +RemoteHandle: persist message → transmit immediately ``` -The kernel's "crank" mechanism may provide turn-like boundaries, but `RemoteHandle` doesn't coordinate with it. Messages are transmitted immediately after being persisted, not deferred until end of turn. +While crank outputs are now buffered until crank commit, when a message reaches `RemoteHandle` for remote transmission, it is persisted and transmitted in quick succession. A crash between persist and transmit could result in the message being retransmitted on recovery (which is fine due to idempotency), but more critically, there's no coordination ensuring the kernel's crank commit happens before network transmission. + +**Impact**: If RemoteHandle transmits a message and then the kernel crashes before its crank fully commits, the remote has received a message that the local kernel will "forget" on recovery. This violates output validity. + +**Mitigation needed**: RemoteHandle should only transmit messages after the originating crank has been fully committed. This requires coordination between the kernel's crank lifecycle and RemoteHandle's transmission timing. #### 2. Done Table / Duplicate Detection (Gap) @@ -77,58 +100,63 @@ Ken maintains a `Done` table ensuring: We track `highestReceivedSeq` but only for ACK purposes. We don't have explicit duplicate detection for incoming messages. If the remote retransmits a message we already processed (but before we ACKed), we could deliver it twice. 
-#### 3. Output Validity (Partial) +#### 3. Output Validity (Improved, but Partial) Ken guarantees outputs could have resulted from failure-free execution because: - Outputs are buffered during a turn - A crash during processing loses all outputs from that turn - Only committed outputs escape to the outside world -Our system transmits immediately after persisting, so a crash mid-crank could result in: -- Some messages transmitted to remote -- But kernel state not yet committed -- On recovery, kernel re-executes and sends different/duplicate messages +**Improvement**: With crank buffering, kernel-internal outputs (sends to local vats, notifications) are now properly buffered and discarded on rollback. A crash mid-crank no longer results in partial kernel state. -#### 4. Atomic Checkpoint (Gap) - -Ken atomically checkpoints `(turn, app_state, Q_out, Done)` together at end of turn. +**Remaining gap**: For remote messages, the gap described in #1 above means network transmissions could still escape before the crank is fully committed. -Our system persists messages individually as sent. There's no atomic boundary coordinating kernel state with outgoing message state. +#### 4. Atomic Checkpoint (Improved) -#### 5. Deferred Transmission (Gap) - -**Ken:** `persist checkpoint → THEN transmit` +Ken atomically checkpoints `(turn, app_state, Q_out, Done)` together at end of turn. -**Ours:** `persist message → transmit immediately` +**Improvement**: The kernel now uses database savepoints to make crank state changes atomic. The `CrankBuffer` contents are flushed atomically with the crank commit. -Ken's approach ensures the "send" is recorded in checkpoint before any transmission. This is crucial for the consistent frontier property. +**Remaining gap**: RemoteHandle's message persistence is separate from the kernel's crank commit. These two persistence operations are not atomic with respect to each other. -#### 6. Input Queue Handling (Gap) +#### 5. 
FIFO Enforcement on Receive (Gap) -Ken can opportunistically persist incoming messages before delivery. On crash, the input queue is reconstructed from sender retransmissions. +Ken's FIFO guarantee requires the receiver to hold out-of-order messages until their predecessors have been processed: +- Track expected next seq per sender +- Buffer messages that arrive out of order +- Deliver in sequence order only -We don't persist incoming messages. On crash, we rely entirely on senders to retransmit. +We don't currently enforce FIFO delivery order on the receive side. ### Summary Table | Ken Property | Our System | Notes | |--------------|------------|-------| | Exactly-once delivery | **Partial** | At-least-once with no duplicate detection | -| Output validity | **No** | Immediate transmission, no turn boundaries | -| Transactional turns | **No** | No coordination with kernel cranks | -| Consistent frontier | **Partial** | No atomic checkpoint across kernel+remote state | +| Output validity | **Partial** | ✓ for kernel-internal, gap for remote transmission | +| Transactional turns | **Yes** | Crank buffering provides turn boundaries | +| Consistent frontier | **Partial** | Kernel state atomic, but not coordinated with RemoteHandle | | Local recovery | **Yes** | Crashes don't affect other processes | | Sender-based logging | **Yes** | Messages persisted until ACKed | +| Deferred transmission | **Partial** | ✓ within kernel, gap at network boundary | | FIFO ordering | **Partial** | Per-sender seq, but no enforcement on receive side | -## What Would Be Needed to Achieve Ken Properties +## What Would Be Needed to Achieve Full Ken Properties + +### 1. Coordinate RemoteHandle with Crank Commit (Critical) -### 1. Coordinate with Kernel Crank Boundaries +The most important remaining gap. Options: -Buffer outgoing messages during crank execution, persist and transmit only at crank commit. 
This would require: -- `RemoteHandle` to be aware of crank boundaries -- Outgoing messages buffered in memory during crank -- Batch persist + transmit at crank commit +**Option A: Two-phase approach** +- During crank: RemoteHandle persists message but does NOT transmit +- After crank commit: Signal RemoteHandle to transmit persisted messages +- Requires: Crank commit notification mechanism to RemoteHandle + +**Option B: Defer to run queue delivery** +- RemoteHandle only transmits when it receives a "transmit" item from run queue +- Crank buffers "transmit" items along with other outputs +- Flush adds transmit items to run queue +- RemoteHandle processes transmit items after crank commit ### 2. Add Done Table @@ -137,21 +165,7 @@ Track processed message IDs, deduplicate on receive: - On receive, check if message already in `Done` before delivering - ACK messages in `Done` without re-delivering -### 3. Atomic Checkpoint - -Persist kernel state and output queue together: -- Single atomic write at end of crank -- Include: kernel state, outgoing messages, Done table updates -- Requires coordination between kernel store and remote message persistence - -### 4. Defer Transmission - -Transmit only after checkpoint completes: -- Buffer messages during turn -- After atomic checkpoint succeeds, release messages for transmission -- This ensures "send" is recorded before any transmission occurs - -### 5. FIFO Enforcement on Receive +### 3. FIFO Enforcement on Receive Hold out-of-order messages until predecessors processed: - Track expected next seq per sender @@ -160,18 +174,30 @@ Hold out-of-order messages until predecessors processed: ## Architectural Implications -The most significant change would be integrating `RemoteHandle` with the kernel's crank/commit cycle. 
Currently: +The crank buffering work has brought us significantly closer to Ken's model: +**Before crank buffering:** ``` Kernel Crank: - process message → syscalls may send to remote + process message → syscalls immediately enqueue to run queue RemoteHandle (independent): persist each outgoing message → transmit immediately ``` -Ken-style architecture: +**After crank buffering:** +``` +Kernel Crank: + process message → syscalls buffer outputs + +Crank Commit (atomic): + persist(kernel_state) + flush(buffered_outputs to run queue) +RemoteHandle (still independent): + receive from run queue → persist → transmit immediately +``` + +**Ken-style architecture (goal):** ``` Kernel Crank: process message → syscalls buffer outputs @@ -180,10 +206,21 @@ Crank Commit (atomic): persist(kernel_state, buffered_outputs, done_table) Post-Commit: - transmit buffered outputs + signal RemoteHandle to transmit persisted messages ``` -This would require the kernel to control when `RemoteHandle` actually transmits, rather than `RemoteHandle` transmitting independently. +The key remaining work is ensuring that network transmission only happens after the crank that produced the message has been fully committed. 
+ +## Progress Summary + +| Area | Before | After Crank Buffering | +|------|--------|----------------------| +| Kernel-internal output buffering | No | **Yes** | +| Rollback discards uncommitted outputs | No | **Yes** | +| Atomic kernel state + output queue | No | **Yes** | +| Network transmission deferred to commit | No | No (still needed) | +| Done table for deduplication | No | No (still needed) | +| FIFO enforcement on receive | No | No (still needed) | ## References diff --git a/packages/ocap-kernel/src/KernelQueue.test.ts b/packages/ocap-kernel/src/KernelQueue.test.ts index e79752e5a..1cc8146ee 100644 --- a/packages/ocap-kernel/src/KernelQueue.test.ts +++ b/packages/ocap-kernel/src/KernelQueue.test.ts @@ -43,7 +43,7 @@ describe('KernelQueue', () => { initKernelPromise: vi.fn().mockReturnValue(['kp1']), incrementRefCount: vi.fn(), getKernelPromise: vi.fn(), - resolveKernelPromise: vi.fn(), + resolveKernelPromise: vi.fn().mockReturnValue([]), nextReapAction: vi.fn().mockReturnValue(null), getGCActions: vi.fn().mockReturnValue([]), startCrank: vi.fn(), @@ -51,6 +51,9 @@ describe('KernelQueue', () => { createCrankSavepoint: vi.fn(), rollbackCrank: vi.fn(), waitForCrank: vi.fn(), + // Crank buffer methods + bufferCrankOutput: vi.fn(), + flushCrankBuffer: vi.fn().mockReturnValue([]), } as unknown as KernelStore; kernelQueue = new KernelQueue(kernelStore, terminateVat); @@ -244,24 +247,24 @@ describe('KernelQueue', () => { }); describe('enqueueNotify', () => { - it('creates a notify item and adds it to the run queue', () => { + it('enqueues a notify and increments refcount', () => { const endpointId = 'v1'; const kpid = 'kp123'; kernelQueue.enqueueNotify(endpointId, kpid); + expect(kernelStore.incrementRefCount).toHaveBeenCalledWith( + kpid, + 'notify', + ); expect(kernelStore.enqueueRun).toHaveBeenCalledWith({ type: 'notify', endpointId, kpid, }); - expect(kernelStore.incrementRefCount).toHaveBeenCalledWith( - kpid, - 'notify', - ); }); }); 
describe('resolvePromises', () => { - it('resolves kernel promises and notifies subscribers', () => { + it('resolves kernel promises and buffers notifications for subscribers', () => { const endpointId = 'v1'; const kpid = 'kp123'; const resolution: VatOneResolution = [ @@ -278,7 +281,7 @@ describe('KernelQueue', () => { ); const resolveHandler = vi.fn(); kernelQueue.subscriptions.set(kpid, resolveHandler); - kernelQueue.resolvePromises(endpointId, [resolution]); + kernelQueue.resolvePromises(endpointId, [resolution], false); expect(kernelStore.incrementRefCount).toHaveBeenCalledWith( kpid, 'resolve|kpid', @@ -287,12 +290,17 @@ describe('KernelQueue', () => { 'slot1', 'resolve|slot', ); - expect(kernelStore.enqueueRun).toHaveBeenCalledWith({ + // Notifications are buffered with refcount increments + expect(kernelStore.incrementRefCount).toHaveBeenCalledWith( + kpid, + 'notify', + ); + expect(kernelStore.bufferCrankOutput).toHaveBeenCalledWith({ type: 'notify', endpointId: 'v2', kpid, }); - expect(kernelStore.enqueueRun).toHaveBeenCalledWith({ + expect(kernelStore.bufferCrankOutput).toHaveBeenCalledWith({ type: 'notify', endpointId: 'v3', kpid, @@ -302,11 +310,10 @@ describe('KernelQueue', () => { false, { body: 'resolved value', slots: ['slot1'] }, ); - expect(resolveHandler).toHaveBeenCalledWith({ - body: 'resolved value', - slots: ['slot1'], - }); - expect(kernelQueue.subscriptions.has(kpid)).toBe(false); + // Kernel subscription callback is NOT called immediately - deferred to flush + expect(resolveHandler).not.toHaveBeenCalled(); + // Subscription is still registered, will be invoked during flush + expect(kernelQueue.subscriptions.has(kpid)).toBe(true); }); it('handles resolutions with undefined vatId (kernel decider)', () => { @@ -326,7 +333,7 @@ describe('KernelQueue', () => { const resolveHandler = vi.fn(); kernelQueue.subscriptions.set(kpid, resolveHandler); const insistEndpointIdSpy = vi.spyOn(types, 'insistEndpointId'); - 
kernelQueue.resolvePromises(undefined, [resolution]); + kernelQueue.resolvePromises(undefined, [resolution], false); expect(insistEndpointIdSpy).not.toHaveBeenCalled(); expect(kernelStore.incrementRefCount).toHaveBeenCalledWith( kpid, @@ -336,7 +343,12 @@ describe('KernelQueue', () => { 'slot1', 'resolve|slot', ); - expect(kernelStore.enqueueRun).toHaveBeenCalledWith({ + // Notification is buffered with refcount increment + expect(kernelStore.incrementRefCount).toHaveBeenCalledWith( + kpid, + 'notify', + ); + expect(kernelStore.bufferCrankOutput).toHaveBeenCalledWith({ type: 'notify', endpointId: 'v2', kpid, @@ -346,8 +358,9 @@ describe('KernelQueue', () => { false, resolution[2], ); - expect(resolveHandler).toHaveBeenCalledWith(resolution[2]); - expect(kernelQueue.subscriptions.has(kpid)).toBe(false); + // Kernel subscription callback is NOT called immediately - deferred to flush + expect(resolveHandler).not.toHaveBeenCalled(); + expect(kernelQueue.subscriptions.has(kpid)).toBe(true); insistEndpointIdSpy.mockRestore(); }); @@ -368,17 +381,17 @@ describe('KernelQueue', () => { ); const resolveHandler = vi.fn(); kernelQueue.subscriptions.set(kpid, resolveHandler); - kernelQueue.resolvePromises(endpointId, [resolution]); - expect(kernelStore.enqueueRun).not.toHaveBeenCalledWith( - expect.objectContaining({ type: 'notify' }), - ); + kernelQueue.resolvePromises(endpointId, [resolution], false); + // No notifications buffered because no subscribers + expect(kernelStore.bufferCrankOutput).not.toHaveBeenCalled(); expect(kernelStore.resolveKernelPromise).toHaveBeenCalledWith( kpid, false, resolution[2], ); - expect(resolveHandler).toHaveBeenCalledWith(resolution[2]); - expect(kernelQueue.subscriptions.has(kpid)).toBe(false); + // Kernel subscription callback is NOT called immediately - deferred to flush + expect(resolveHandler).not.toHaveBeenCalled(); + expect(kernelQueue.subscriptions.has(kpid)).toBe(true); }); it('throws error if a promise is already resolved', () => { 
diff --git a/packages/ocap-kernel/src/KernelQueue.ts b/packages/ocap-kernel/src/KernelQueue.ts index f537ec648..1cc6a5edf 100644 --- a/packages/ocap-kernel/src/KernelQueue.ts +++ b/packages/ocap-kernel/src/KernelQueue.ts @@ -37,6 +37,9 @@ export class KernelQueue { /** Message results that the kernel itself has subscribed to */ readonly subscriptions: Map) => void> = new Map(); + /** Promises resolved during this crank that have kernel subscriptions */ + #resolvedWithKernelSubscription: KRef[] = []; + /** Thunk to signal run queue transition from empty to non-empty */ #wakeUpTheRunQueue: (() => void) | null; @@ -72,9 +75,14 @@ export class KernelQueue { // For active vats, this allows the message to be retried in a future crank. // For terminated vats, the message will just go splat. this.#kernelStore.rollbackCrank('start'); + // Discard the queue of resolved promises awaiting kernel subscription invocation + this.#resolvedWithKernelSubscription = []; // TODO: Currently all errors terminate the vat, but instead we could // restart it and terminate the vat only after a certain number of failed // retries. This is probably where we should implement the vat restart logic. + } else { + // Upon successful crank completion, enqueue buffered vat outputs for delivery. + this.#flushCrankBuffer(); } // Vat termination during delivery is triggered by an illegal syscall // or by syscall.exit(). @@ -149,6 +157,43 @@ export class KernelQueue { } } + /** + * Flush the crank buffer, moving buffered vat output items to the run queue + * and invoking kernel subscription callbacks for resolved promises. 
+ */ + #flushCrankBuffer(): void { + const items = this.#kernelStore.flushCrankBuffer(); + for (const item of items) { + this.#enqueueRun(item); + if (item.type === 'notify') { + // Invoke kernel subscription callback if any, reading resolution + // data from the (now committed) promise state + this.#invokeKernelSubscription(item.kpid); + } + } + + // Invoke kernel subscriptions for promises resolved during this crank that were + // not already handled via a notify item above, i.e. promises with no endpoint subscribers (e.g., promises from enqueueMessage) + for (const kpid of this.#resolvedWithKernelSubscription) { + this.#invokeKernelSubscription(kpid); + } + this.#resolvedWithKernelSubscription = []; + } + + /** + * Invoke the kernel subscription callback for a resolved promise, if any. + * + * @param kpid - The promise ID to check for subscriptions. + */ + #invokeKernelSubscription(kpid: KRef): void { + const subscription = this.subscriptions.get(kpid); + if (subscription) { + this.subscriptions.delete(kpid); + const promise = this.#kernelStore.getKernelPromise(kpid); + subscription(promise.value as CapData); + } + } + /** * Queue a message to be delivered from the kernel to an object in an endpoint. * @@ -177,12 +222,13 @@ export class KernelQueue { } /** - * Enqueue a send message to be delivered to an endpoint. + * Enqueue a message send to be delivered to an endpoint. * * @param target - The object to which the message is directed. * @param message - The message to be delivered. + * @param immediate - If true (the default), enqueue immediately; if false, buffer for crank completion. 
*/ - enqueueSend(target: KRef, message: Message): void { + enqueueSend(target: KRef, message: Message, immediate = true): void { this.#kernelStore.incrementRefCount(target, 'queue|target'); if (message.result) { this.#kernelStore.incrementRefCount(message.result, 'queue|result'); @@ -190,26 +236,29 @@ export class KernelQueue { for (const slot of message.methargs.slots || []) { this.#kernelStore.incrementRefCount(slot, 'queue|slot'); } - const queueItem: RunQueueItemSend = { - type: 'send', - target, - message, - }; - this.#enqueueRun(queueItem); + const item: RunQueueItemSend = { type: 'send', target, message }; + if (immediate) { + this.#enqueueRun(item); + } else { + this.#kernelStore.bufferCrankOutput(item); + } } /** - * Enqueue for delivery a notification to an endpoint about the resolution of a - * promise. + * Enqueue a notification of promise resolution to an endpoint. * * @param endpointId - The endpoint that will be notified. * @param kpid - The promise of interest. + * @param immediate - If true (the default), enqueue immediately; if false, buffer for crank completion. */ - enqueueNotify(endpointId: EndpointId, kpid: KRef): void { - const notifyItem: RunQueueItemNotify = { type: 'notify', endpointId, kpid }; - this.#enqueueRun(notifyItem); - // Increment reference count for the promise being notified about + enqueueNotify(endpointId: EndpointId, kpid: KRef, immediate = true): void { this.#kernelStore.incrementRefCount(kpid, 'notify'); + const item: RunQueueItemNotify = { type: 'notify', endpointId, kpid }; + if (immediate) { + this.#enqueueRun(item); + } else { + this.#kernelStore.bufferCrankOutput(item); + } } /** @@ -225,13 +274,19 @@ export class KernelQueue { /** * Process a set of promise resolutions coming from an endpoint. + * When immediate is false (for vat syscalls), notifications and kernel + * subscription callbacks are deferred until the crank buffer is flushed on + * successful crank completion. 
When immediate is true (for remote message + * handling), effects are immediate. * * @param endpointId - The endpoint doing the resolving, if there is one. * @param resolutions - One or more resolutions, to be processed as a group. + * @param immediate - If true (the default), enqueue immediately; if false, buffer for crank completion. */ resolvePromises( endpointId: EndpointId | undefined, resolutions: VatOneResolution[], + immediate = true, ): void { if (endpointId && endpointId !== 'kernel') { insistEndpointId(endpointId); @@ -258,15 +313,30 @@ export class KernelQueue { throw Fail`${kpid} subscribers not set`; } + // Enqueue notifications for each subscriber (immediate or buffered based on flag). for (const subscriber of subscribers) { - this.enqueueNotify(subscriber, kpid); + this.enqueueNotify(subscriber, kpid, immediate); + } + + // Update promise state and get any queued messages to it. + const queuedMessages = this.#kernelStore.resolveKernelPromise( + kpid, + rejected, + data, + ); + + // Enqueue the queued messages (immediate or buffered based on flag). + for (const [target, message] of queuedMessages) { + this.enqueueSend(target, message, immediate); } - this.#kernelStore.resolveKernelPromise(kpid, rejected, data); - const kernelResolve = this.subscriptions.get(kpid); - if (kernelResolve) { - this.subscriptions.delete(kpid); - kernelResolve(data); + // Handle kernel subscriptions based on immediate flag. 
+ if (immediate) { + // Invoke kernel subscription immediately + this.#invokeKernelSubscription(kpid); + } else if (this.subscriptions.has(kpid)) { + // Track resolved promises that have kernel subscriptions for invocation at flush time + this.#resolvedWithKernelSubscription.push(kpid); } } } diff --git a/packages/ocap-kernel/src/store/index.test.ts b/packages/ocap-kernel/src/store/index.test.ts index 5f47cb80b..9210db9ea 100644 --- a/packages/ocap-kernel/src/store/index.test.ts +++ b/packages/ocap-kernel/src/store/index.test.ts @@ -46,6 +46,7 @@ describe('kernel store', () => { 'addSubcluster', 'addSubclusterVat', 'allocateErefForKref', + 'bufferCrankOutput', 'cleanupOrphanMessages', 'cleanupTerminatedVat', 'clear', @@ -72,6 +73,7 @@ describe('kernel store', () => { 'enqueueRun', 'erefToKref', 'exportFromEndpoint', + 'flushCrankBuffer', 'forgetEref', 'forgetKref', 'forgetTerminatedVat', diff --git a/packages/ocap-kernel/src/store/index.ts b/packages/ocap-kernel/src/store/index.ts index f058e9d89..b21dd36a6 100644 --- a/packages/ocap-kernel/src/store/index.ts +++ b/packages/ocap-kernel/src/store/index.ts @@ -141,6 +141,7 @@ export function makeKernelStore(kdb: KernelDatabase, logger?: Logger) { terminatedVats: provideCachedStoredValue('vats.terminated', '[]'), inCrank: false, savepoints: [], + crankBuffer: [], // Subclusters subclusters: provideCachedStoredValue('subclusters', '[]'), nextSubclusterId: provideCachedStoredValue('nextSubclusterId', '1'), @@ -218,6 +219,7 @@ export function makeKernelStore(kdb: KernelDatabase, logger?: Logger) { '{}', ); crank.releaseAllSavepoints(); + context.crankBuffer.length = 0; preservedState?.forEach(({ key, value }) => { if (value) { context.kv.set(key, value); diff --git a/packages/ocap-kernel/src/store/methods/crank.test.ts b/packages/ocap-kernel/src/store/methods/crank.test.ts index a98d209c7..eba5bca01 100644 --- a/packages/ocap-kernel/src/store/methods/crank.test.ts +++ 
b/packages/ocap-kernel/src/store/methods/crank.test.ts @@ -8,11 +8,14 @@ describe('crank methods', () => { let context: StoreContext; let kdb: KernelDatabase; let crankMethods: ReturnType; + let mockCrankBuffer: unknown[]; beforeEach(() => { + mockCrankBuffer = []; context = { inCrank: false, savepoints: [], + crankBuffer: mockCrankBuffer, } as unknown as StoreContext; kdb = { @@ -104,6 +107,16 @@ describe('crank methods', () => { expect(kdb.createSavepoint).toHaveBeenLastCalledWith('t1'); expect(context.savepoints).toStrictEqual(['a', 'b2']); }); + + it('clears the crank buffer', () => { + context.inCrank = true; + context.savepoints = ['start']; + mockCrankBuffer.push({ type: 'send' }, { type: 'notify' }); + + crankMethods.rollbackCrank('start'); + + expect(mockCrankBuffer).toHaveLength(0); + }); }); describe('endCrank', () => { diff --git a/packages/ocap-kernel/src/store/methods/crank.ts b/packages/ocap-kernel/src/store/methods/crank.ts index 040373068..8fbb688c3 100644 --- a/packages/ocap-kernel/src/store/methods/crank.ts +++ b/packages/ocap-kernel/src/store/methods/crank.ts @@ -2,7 +2,7 @@ import { Fail, q } from '@endo/errors'; import { makePromiseKit } from '@endo/promise-kit'; import type { KernelDatabase } from '@metamask/kernel-store'; -import type { StoreContext } from '../types.ts'; +import type { CrankBufferItem, StoreContext } from '../types.ts'; /** * Get the crank methods. @@ -43,6 +43,7 @@ export function getCrankMethods(ctx: StoreContext, kdb: KernelDatabase) { */ function rollbackCrank(savepoint: string): void { ctx.inCrank || Fail`rollbackCrank outside of crank`; + ctx.crankBuffer.length = 0; // Discard buffered outputs for (const ordinal of ctx.savepoints.keys()) { if (ctx.savepoints[ordinal] === savepoint) { kdb.rollbackSavepoint(`t${ordinal}`); @@ -85,6 +86,26 @@ export function getCrankMethods(ctx: StoreContext, kdb: KernelDatabase) { : Promise.resolve(); } + /** + * Buffer a vat output for delivery upon crank completion. 
+ * + * @param item - The item to buffer. + */ + function bufferCrankOutput(item: CrankBufferItem): void { + ctx.crankBuffer.push(item); + } + + /** + * Flush the crank buffer, returning all buffered items. + * + * @returns The buffered items. + */ + function flushCrankBuffer(): CrankBufferItem[] { + const items = ctx.crankBuffer; + ctx.crankBuffer = []; + return items; + } + return { startCrank, createCrankSavepoint, @@ -92,5 +113,7 @@ export function getCrankMethods(ctx: StoreContext, kdb: KernelDatabase) { endCrank, releaseAllSavepoints, waitForCrank, + bufferCrankOutput, + flushCrankBuffer, }; } diff --git a/packages/ocap-kernel/src/store/methods/promise.test.ts b/packages/ocap-kernel/src/store/methods/promise.test.ts index f738797e8..21d9d26e1 100644 --- a/packages/ocap-kernel/src/store/methods/promise.test.ts +++ b/packages/ocap-kernel/src/store/methods/promise.test.ts @@ -298,7 +298,7 @@ describe('promise store methods', () => { }); describe('resolveKernelPromise', () => { - it('fulfills a promise and enqueues pending messages', () => { + it('fulfills a promise and returns queued messages', () => { const kpid = 'kp123'; const value: CapData = { body: 'someValue', @@ -316,20 +316,19 @@ describe('promise store methods', () => { .mockReturnValueOnce(message2) .mockReturnValueOnce(undefined); - promiseMethods.resolveKernelPromise(kpid, false, value); + const queuedMessages = promiseMethods.resolveKernelPromise( + kpid, + false, + value, + ); expect(mockQueue.dequeue).toHaveBeenCalledTimes(3); - expect(mockEnqueueRun).toHaveBeenCalledTimes(2); - expect(mockEnqueueRun).toHaveBeenNthCalledWith(1, { - type: 'send', - target: kpid, - message: message1, - }); - expect(mockEnqueueRun).toHaveBeenNthCalledWith(2, { - type: 'send', - target: kpid, - message: message2, - }); + // Now returns messages instead of calling enqueueRun + expect(queuedMessages).toStrictEqual([ + [kpid, message1], + [kpid, message2], + ]); + expect(mockEnqueueRun).not.toHaveBeenCalled(); 
expect(mockKV.get(`${kpid}.state`)).toBe('fulfilled'); expect(mockKV.get(`${kpid}.value`)).toBe(JSON.stringify(value)); diff --git a/packages/ocap-kernel/src/store/methods/promise.ts b/packages/ocap-kernel/src/store/methods/promise.ts index 48044a887..92261c375 100644 --- a/packages/ocap-kernel/src/store/methods/promise.ts +++ b/packages/ocap-kernel/src/store/methods/promise.ts @@ -2,13 +2,11 @@ import { Fail } from '@endo/errors'; import type { CapData } from '@endo/marshal'; import { getBaseMethods } from './base.ts'; -import { getQueueMethods } from './queue.ts'; import type { KRef, KernelPromise, Message, PromiseState, - RunQueueItemSend, EndpointId, } from '../../types.ts'; import { insistEndpointId } from '../../types.ts'; @@ -29,7 +27,6 @@ import { isPromiseRef } from '../utils/promise-ref.ts'; export function getPromiseMethods(ctx: StoreContext) { const { incCounter, provideStoredQueue, getPrefixedKeys, refCountKey } = getBaseMethods(ctx.kv); - const { enqueueRun } = getQueueMethods(ctx); const { decrementRefCount } = getRefCountMethods(ctx); /** @@ -145,25 +142,25 @@ export function getPromiseMethods(ctx: StoreContext) { } /** - * Record the resolution of a kernel promise. + * Resolve a kernel promise, updating its state and returning any queued messages. + * The queued messages should be buffered by the caller for eventual delivery. * * @param kpid - The ref of the promise being resolved. * @param rejected - True if the promise is being rejected, false if fulfilled. * @param value - The value the promise is being fulfilled to or rejected with. + * @returns An array of queued messages that were waiting on this promise, + * each as a tuple of [target, message]. 
*/ function resolveKernelPromise( kpid: KRef, rejected: boolean, value: CapData, - ): void { + ): [KRef, Message][] { const queue = provideStoredQueue(kpid, false); + // Collect messages that were queued on this promise + const queuedMessages: [KRef, Message][] = []; for (const message of getKernelPromiseMessageQueue(kpid)) { - const messageItem: RunQueueItemSend = { - type: 'send', - target: kpid, - message, - }; - enqueueRun(messageItem); + queuedMessages.push([kpid, message]); } ctx.kv.set(`${kpid}.state`, rejected ? 'rejected' : 'fulfilled'); ctx.kv.set(`${kpid}.value`, JSON.stringify(value)); @@ -172,6 +169,7 @@ export function getPromiseMethods(ctx: StoreContext) { // Drop the baseline "decider" refcount now that the promise is settled. decrementRefCount(kpid, 'resolve|decider'); queue.delete(); + return queuedMessages; } /** diff --git a/packages/ocap-kernel/src/store/types.ts b/packages/ocap-kernel/src/store/types.ts index 89c274858..7c35f8f10 100644 --- a/packages/ocap-kernel/src/store/types.ts +++ b/packages/ocap-kernel/src/store/types.ts @@ -1,7 +1,9 @@ import type { KVStore } from '@metamask/kernel-store'; import type { Logger } from '@metamask/logger'; -import type { KRef } from '../types.ts'; +import type { KRef, RunQueueItemNotify, RunQueueItemSend } from '../types.ts'; + +export type CrankBufferItem = RunQueueItemSend | RunQueueItemNotify; export type StoreContext = { kv: KVStore; @@ -19,6 +21,7 @@ export type StoreContext = { crankSettled?: Promise; resolveCrank?: (() => void) | undefined; savepoints: string[]; + crankBuffer: CrankBufferItem[]; // Buffer for sends and notifications during crank subclusters: StoredValue; // Holds Subcluster[] nextSubclusterId: StoredValue; // Holds string vatToSubclusterMap: StoredValue; // Holds Record diff --git a/packages/ocap-kernel/src/vats/VatSyscall.test.ts b/packages/ocap-kernel/src/vats/VatSyscall.test.ts index 06f0fb02c..a93a361de 100644 --- a/packages/ocap-kernel/src/vats/VatSyscall.test.ts +++ 
b/packages/ocap-kernel/src/vats/VatSyscall.test.ts @@ -43,21 +43,27 @@ describe('VatSyscall', () => { vatSys = new VatSyscall({ vatId: 'v1', kernelQueue, kernelStore, logger }); }); - it('enqueues run for send syscall', async () => { + it('buffers send syscall for crank completion', async () => { const target = 'o+1'; - const message = {} as unknown as Message; + const message = { methargs: { body: '', slots: [] } } as unknown as Message; const vso = ['send', target, message] as unknown as VatSyscallObject; vatSys.handleSyscall(vso); - expect(kernelQueue.enqueueSend).toHaveBeenCalledWith(target, message); + expect(kernelQueue.enqueueSend).toHaveBeenCalledWith( + target, + message, + false, + ); }); it('calls resolvePromises for resolve syscall', async () => { const resolution = ['kp1', false, {}] as unknown as VatOneResolution; const vso = ['resolve', [resolution]] as unknown as VatSyscallObject; vatSys.handleSyscall(vso); - expect(kernelQueue.resolvePromises).toHaveBeenCalledWith('v1', [ - resolution, - ]); + expect(kernelQueue.resolvePromises).toHaveBeenCalledWith( + 'v1', + [resolution], + false, + ); }); describe('subscribe syscall', () => { @@ -83,7 +89,11 @@ describe('VatSyscall', () => { }); const vso = ['subscribe', 'kp1'] as unknown as VatSyscallObject; vatSys.handleSyscall(vso); - expect(kernelQueue.enqueueNotify).toHaveBeenCalledWith('v1', 'kp1'); + expect(kernelQueue.enqueueNotify).toHaveBeenCalledWith( + 'v1', + 'kp1', + false, + ); }); }); diff --git a/packages/ocap-kernel/src/vats/VatSyscall.ts b/packages/ocap-kernel/src/vats/VatSyscall.ts index e77cbc87d..014018c85 100644 --- a/packages/ocap-kernel/src/vats/VatSyscall.ts +++ b/packages/ocap-kernel/src/vats/VatSyscall.ts @@ -71,13 +71,14 @@ export class VatSyscall { } /** - * Handle a 'send' syscall from the vat. + * Handle a 'send' syscall from the vat. The send is buffered and will be + * flushed to the run queue on successful crank completion. * * @param target - The target of the message send. 
* @param message - The message that was sent. */ #handleSyscallSend(target: KRef, message: Message): void { - this.#kernelQueue.enqueueSend(target, message); + this.#kernelQueue.enqueueSend(target, message, false); } /** @@ -86,7 +87,7 @@ export class VatSyscall { * @param resolutions - One or more promise resolutions. */ #handleSyscallResolve(resolutions: VatOneResolution[]): void { - this.#kernelQueue.resolvePromises(this.vatId, resolutions); + this.#kernelQueue.resolvePromises(this.vatId, resolutions, false); } /** @@ -99,7 +100,7 @@ export class VatSyscall { if (kp.state === 'unresolved') { this.#kernelStore.addPromiseSubscriber(this.vatId, kpid); } else { - this.#kernelQueue.enqueueNotify(this.vatId, kpid); + this.#kernelQueue.enqueueNotify(this.vatId, kpid, false); } }