151 changes: 94 additions & 57 deletions docs/ken-protocol-assessment.md
@@ -46,28 +46,51 @@ Key aspects:
| Retransmission | ✓ | Timeout-based retransmit until ACK or max retries |
| Crash-safe persistence | ✓ | Write message first, then update nextSendSeq |
| Local recovery | ✓ | Restore seq state, restart ACK timeout |
| **Transactional turns** | ✓ | Crank buffering defers outputs until crank commit |
| **Deferred transmission** | **Partial** | Buffered within kernel, but RemoteHandle transmits immediately on flush |

### What We're Missing or Differs
### Recent Improvements: Crank Buffering

#### 1. Transactional Turns (Major Gap)
The crank buffering feature (issue #786) significantly improves our alignment with Ken:

**Our new crank model:**
```
crank_start(deliver one item from run queue)
→ create database savepoint
→ vat processes message
→ vat syscalls buffer outputs (sends, notifications) in CrankBuffer
crank_end:
→ if success: atomically flush buffer to run queue + commit state
→ if failure: rollback to savepoint, discard buffer
```

This achieves Ken's core property that **outputs are only externalized after successful turn completion**. Within the kernel:
- `enqueueSend(target, message, immediate=false)` buffers sends
- `enqueueNotify(endpoint, kpid, immediate=false)` buffers notifications
- `resolvePromises(endpoint, resolutions, immediate=false)` buffers all resolution effects
- On successful crank: `#flushCrankBuffer()` moves items to run queue
- On rollback: buffer is discarded along with database changes
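The buffer-then-flush behavior described above can be sketched as follows. This is a minimal in-memory illustration, not the kernel's implementation: `CrankBufferSketch`, `runCrank`, and the `RunQueueItem` shape are hypothetical names chosen for the example, and the database savepoint is deliberately left out.

```typescript
// Hypothetical sketch: none of these names are taken from the kernel source.
type RunQueueItem =
  | { type: 'send'; target: string; message: string }
  | { type: 'notify'; endpointId: string; kpid: string };

class CrankBufferSketch {
  // Stand-in for the kernel run queue.
  readonly runQueue: RunQueueItem[] = [];

  // Outputs produced by the crank currently in progress.
  private buffer: RunQueueItem[] = [];

  // Called by syscalls during a crank: the output is held, not enqueued.
  bufferOutput(item: RunQueueItem): void {
    this.buffer.push(item);
  }

  // Successful crank: move every buffered item to the run queue, in order.
  commit(): number {
    const flushed = this.buffer.length;
    this.runQueue.push(...this.buffer);
    this.buffer = [];
    return flushed;
  }

  // Failed crank: drop the buffered outputs so nothing escapes.
  rollback(): void {
    this.buffer = [];
  }
}

// Runs one crank: commit on success, roll back if the work throws.
function runCrank(
  queue: CrankBufferSketch,
  work: (buffer: CrankBufferSketch) => void,
): boolean {
  try {
    work(queue);
    queue.commit();
    return true;
  } catch {
    queue.rollback();
    return false;
  }
}
```

In this sketch a crank that throws leaves the run queue untouched, which is the property the list above attributes to the rollback path.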

### What We're Still Missing or Where We Differ

#### 1. Deferred Network Transmission (Gap)

**Ken's model:**
```
turn_start(deliver one message)
→ processing_function executes
→ outputs buffered in Q_out
turn_end:
→ atomically persist(turn, app_state, Q_out, Done)
→ THEN transmit buffered messages
persist checkpoint → THEN transmit to network
```

**Our model:**
```
message received → kernel processes → sends outputs immediately
each output: persist message → update seq → transmit
crank completes → flush to run queue → eventually delivered to RemoteHandle
RemoteHandle: persist message → transmit immediately
```

The kernel's "crank" mechanism may provide turn-like boundaries, but `RemoteHandle` doesn't coordinate with it. Messages are transmitted immediately after being persisted, not deferred until end of turn.
Crank outputs are now buffered until crank commit, but once a message reaches `RemoteHandle` for remote transmission it is persisted and then transmitted immediately. A crash between persist and transmit only causes the message to be retransmitted on recovery, which is fine due to idempotency. The more critical problem is that nothing ensures the kernel's crank commit happens before network transmission.

**Impact**: If RemoteHandle transmits a message and then the kernel crashes before its crank fully commits, the remote has received a message that the local kernel will "forget" on recovery. This violates output validity.

**Mitigation needed**: RemoteHandle should only transmit messages after the originating crank has been fully committed. This requires coordination between the kernel's crank lifecycle and RemoteHandle's transmission timing.

#### 2. Done Table / Duplicate Detection (Gap)

@@ -77,58 +100,63 @@ Ken maintains a `Done` table ensuring:

We track `highestReceivedSeq` but only for ACK purposes. We don't have explicit duplicate detection for incoming messages. If the remote retransmits a message we already processed (but before we ACKed), we could deliver it twice.

#### 3. Output Validity (Partial)
#### 3. Output Validity (Improved, but Partial)

Ken guarantees outputs could have resulted from failure-free execution because:
- Outputs are buffered during a turn
- A crash during processing loses all outputs from that turn
- Only committed outputs escape to the outside world

Our system transmits immediately after persisting, so a crash mid-crank could result in:
- Some messages transmitted to remote
- But kernel state not yet committed
- On recovery, kernel re-executes and sends different/duplicate messages
**Improvement**: With crank buffering, kernel-internal outputs (sends to local vats, notifications) are now properly buffered and discarded on rollback. A crash mid-crank no longer results in partial kernel state.

#### 4. Atomic Checkpoint (Gap)

Ken atomically checkpoints `(turn, app_state, Q_out, Done)` together at end of turn.
**Remaining gap**: For remote messages, the gap described in #1 above means network transmissions could still escape before the crank is fully committed.

Our system persists messages individually as sent. There's no atomic boundary coordinating kernel state with outgoing message state.
#### 4. Atomic Checkpoint (Improved)

#### 5. Deferred Transmission (Gap)

**Ken:** `persist checkpoint → THEN transmit`
Ken atomically checkpoints `(turn, app_state, Q_out, Done)` together at end of turn.

**Ours:** `persist message → transmit immediately`
**Improvement**: The kernel now uses database savepoints to make crank state changes atomic. The `CrankBuffer` contents are flushed atomically with the crank commit.

Ken's approach ensures the "send" is recorded in checkpoint before any transmission. This is crucial for the consistent frontier property.
**Remaining gap**: RemoteHandle's message persistence is separate from the kernel's crank commit. These two persistence operations are not atomic with respect to each other.

#### 6. Input Queue Handling (Gap)
#### 5. FIFO Enforcement on Receive (Gap)

Ken can opportunistically persist incoming messages before delivery. On crash, the input queue is reconstructed from sender retransmissions.
Hold out-of-order messages until predecessors processed:
- Track expected next seq per sender
- Buffer messages that arrive out of order
- Deliver in sequence order only

We don't persist incoming messages. On crash, we rely entirely on senders to retransmit.
We don't currently enforce FIFO delivery order on the receive side.
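One way the receive-side ordering described above could look is sketched below. It is an illustration under stated assumptions, not existing code: `FifoReceiverSketch` is a hypothetical name, sequence numbers are assumed to start at 1 per sender, and held messages live only in memory.

```typescript
// Hypothetical sketch: assumes per-sender sequence numbers starting at 1.
type Deliver = (sender: string, seq: number, payload: string) => void;

class FifoReceiverSketch {
  // Next sequence number expected from each sender.
  private readonly expected = new Map<string, number>();

  // Messages that arrived ahead of their predecessors, keyed by sender then seq.
  private readonly held = new Map<string, Map<number, string>>();

  private readonly deliver: Deliver;

  constructor(deliver: Deliver) {
    this.deliver = deliver;
  }

  receive(sender: string, seq: number, payload: string): void {
    let next = this.expected.get(sender) ?? 1;
    if (seq < next) {
      return; // Already delivered earlier; ignore the repeat.
    }
    const pending = this.held.get(sender) ?? new Map<number, string>();
    this.held.set(sender, pending);
    pending.set(seq, payload);

    // Deliver as long as the next expected message is available.
    while (pending.has(next)) {
      const nextPayload = pending.get(next) as string;
      pending.delete(next);
      this.deliver(sender, next, nextPayload);
      next += 1;
    }
    this.expected.set(sender, next);
  }
}
```

A message with seq 2 that arrives before seq 1 is held; once seq 1 arrives, both are delivered in order.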

### Summary Table

| Ken Property | Our System | Notes |
|--------------|------------|-------|
| Exactly-once delivery | **Partial** | At-least-once with no duplicate detection |
| Output validity | **No** | Immediate transmission, no turn boundaries |
| Transactional turns | **No** | No coordination with kernel cranks |
| Consistent frontier | **Partial** | No atomic checkpoint across kernel+remote state |
| Output validity | **Partial** | ✓ for kernel-internal, gap for remote transmission |
| Transactional turns | **Yes** | Crank buffering provides turn boundaries |
| Consistent frontier | **Partial** | Kernel state atomic, but not coordinated with RemoteHandle |
| Local recovery | **Yes** | Crashes don't affect other processes |
| Sender-based logging | **Yes** | Messages persisted until ACKed |
| Deferred transmission | **Partial** | ✓ within kernel, gap at network boundary |
| FIFO ordering | **Partial** | Per-sender seq, but no enforcement on receive side |

## What Would Be Needed to Achieve Ken Properties
## What Would Be Needed to Achieve Full Ken Properties

### 1. Coordinate RemoteHandle with Crank Commit (Critical)

### 1. Coordinate with Kernel Crank Boundaries
The most important remaining gap. Options:

Buffer outgoing messages during crank execution, persist and transmit only at crank commit. This would require:
- `RemoteHandle` to be aware of crank boundaries
- Outgoing messages buffered in memory during crank
- Batch persist + transmit at crank commit
**Option A: Two-phase approach**
- During crank: RemoteHandle persists message but does NOT transmit
- After crank commit: Signal RemoteHandle to transmit persisted messages
- Requires: Crank commit notification mechanism to RemoteHandle

**Option B: Defer to run queue delivery**
- RemoteHandle only transmits when it receives a "transmit" item from run queue
- Crank buffers "transmit" items along with other outputs
- Flush adds transmit items to run queue
- RemoteHandle processes transmit items after crank commit
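A rough sketch of Option A follows, to make the timing concrete. Everything in it is an assumption made for illustration: `RemoteHandleSketch`, `onCrankCommitted`, and `onCrankRolledBack` are hypothetical names, an array stands in for durable storage, and rewinding the sequence number on rollback is one possible choice rather than a decided design.

```typescript
// Hypothetical sketch of Option A: persist during the crank, transmit after commit.
type Outgoing = { seq: number; payload: string };

class RemoteHandleSketch {
  // Stand-in for durable storage of unacknowledged messages.
  private readonly persisted: Outgoing[] = [];

  // Messages persisted by the current crank but not yet transmitted.
  private pending: Outgoing[] = [];

  private nextSendSeq = 1;

  private readonly transmit: (msg: Outgoing) => void;

  constructor(transmit: (msg: Outgoing) => void) {
    this.transmit = transmit;
  }

  // During the crank: record the message but do NOT transmit it.
  send(payload: string): void {
    const msg: Outgoing = { seq: this.nextSendSeq, payload };
    this.persisted.push(msg); // Write the message first...
    this.nextSendSeq += 1; // ...then update nextSendSeq.
    this.pending.push(msg);
  }

  // Signal from the kernel after the crank commit: release held messages.
  onCrankCommitted(): void {
    const ready = this.pending;
    this.pending = [];
    for (const msg of ready) {
      this.transmit(msg);
    }
  }

  // Signal from the kernel on rollback: forget the crank's messages.
  onCrankRolledBack(): void {
    const count = this.pending.length;
    this.persisted.splice(this.persisted.length - count, count);
    this.nextSendSeq -= count;
    this.pending = [];
  }
}
```

In this sketch nothing reaches the network until `onCrankCommitted` runs, so a crank that rolls back leaves no trace at the remote end.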

### 2. Add Done Table

@@ -137,21 +165,7 @@ Track processed message IDs, deduplicate on receive:
- On receive, check if message already in `Done` before delivering
- ACK messages in `Done` without re-delivering
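The receive path with a `Done` table might look roughly like this. It is a sketch under assumptions: `DoneTableSketch` and its callbacks are hypothetical names, and the table is kept in memory here, whereas Ken persists `Done` as part of the end-of-turn checkpoint.

```typescript
// Hypothetical sketch: an in-memory Done table keyed by sender and sequence number.
type ReceiveResult = 'delivered' | 'duplicate';

class DoneTableSketch {
  // Sequence numbers already processed, per sender.
  private readonly done = new Map<string, Set<number>>();

  private readonly deliver: (sender: string, payload: string) => void;

  private readonly ack: (sender: string, seq: number) => void;

  constructor(
    deliver: (sender: string, payload: string) => void,
    ack: (sender: string, seq: number) => void,
  ) {
    this.deliver = deliver;
    this.ack = ack;
  }

  receive(sender: string, seq: number, payload: string): ReceiveResult {
    const seen = this.done.get(sender) ?? new Set<number>();
    this.done.set(sender, seen);

    if (seen.has(seq)) {
      // Already processed: acknowledge again, but do not re-deliver.
      this.ack(sender, seq);
      return 'duplicate';
    }

    this.deliver(sender, payload);
    seen.add(seq); // Record the message as processed.
    this.ack(sender, seq);
    return 'delivered';
  }
}
```

A retransmission of an already-processed message is acknowledged a second time and otherwise ignored, which covers the case where the first ACK was lost.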

### 3. Atomic Checkpoint

Persist kernel state and output queue together:
- Single atomic write at end of crank
- Include: kernel state, outgoing messages, Done table updates
- Requires coordination between kernel store and remote message persistence

### 4. Defer Transmission

Transmit only after checkpoint completes:
- Buffer messages during turn
- After atomic checkpoint succeeds, release messages for transmission
- This ensures "send" is recorded before any transmission occurs

### 5. FIFO Enforcement on Receive
### 3. FIFO Enforcement on Receive

Hold out-of-order messages until predecessors processed:
- Track expected next seq per sender
@@ -160,18 +174,30 @@ Hold out-of-order messages until predecessors processed:

## Architectural Implications

The most significant change would be integrating `RemoteHandle` with the kernel's crank/commit cycle. Currently:
The crank buffering work has brought us significantly closer to Ken's model:

**Before crank buffering:**
```
Kernel Crank:
process message → syscalls may send to remote
process message → syscalls immediately enqueue to run queue

RemoteHandle (independent):
persist each outgoing message → transmit immediately
```

Ken-style architecture:
**After crank buffering:**
```
Kernel Crank:
process message → syscalls buffer outputs

Crank Commit (atomic):
persist(kernel_state) + flush(buffered_outputs to run queue)

RemoteHandle (still independent):
receive from run queue → persist → transmit immediately
```

**Ken-style architecture (goal):**
```
Kernel Crank:
process message → syscalls buffer outputs
@@ -180,10 +206,21 @@ Crank Commit (atomic):
persist(kernel_state, buffered_outputs, done_table)

Post-Commit:
transmit buffered outputs
signal RemoteHandle to transmit persisted messages
```

This would require the kernel to control when `RemoteHandle` actually transmits, rather than `RemoteHandle` transmitting independently.
The key remaining work is ensuring that network transmission only happens after the crank that produced the message has been fully committed.

## Progress Summary

| Area | Before | After Crank Buffering |
|------|--------|----------------------|
| Kernel-internal output buffering | No | **Yes** |
| Rollback discards uncommitted outputs | No | **Yes** |
| Atomic kernel state + output queue | No | **Yes** |
| Network transmission deferred to commit | No | No (still needed) |
| Done table for deduplication | No | No (still needed) |
| FIFO enforcement on receive | No | No (still needed) |

## References

63 changes: 38 additions & 25 deletions packages/ocap-kernel/src/KernelQueue.test.ts
@@ -43,14 +43,17 @@ describe('KernelQueue', () => {
initKernelPromise: vi.fn().mockReturnValue(['kp1']),
incrementRefCount: vi.fn(),
getKernelPromise: vi.fn(),
resolveKernelPromise: vi.fn(),
resolveKernelPromise: vi.fn().mockReturnValue([]),
nextReapAction: vi.fn().mockReturnValue(null),
getGCActions: vi.fn().mockReturnValue([]),
startCrank: vi.fn(),
endCrank: vi.fn(),
createCrankSavepoint: vi.fn(),
rollbackCrank: vi.fn(),
waitForCrank: vi.fn(),
// Crank buffer methods
bufferCrankOutput: vi.fn(),
flushCrankBuffer: vi.fn().mockReturnValue([]),
} as unknown as KernelStore;

kernelQueue = new KernelQueue(kernelStore, terminateVat);
@@ -244,24 +247,24 @@ describe('KernelQueue', () => {
});

describe('enqueueNotify', () => {
it('creates a notify item and adds it to the run queue', () => {
it('enqueues a notify and increments refcount', () => {
const endpointId = 'v1';
const kpid = 'kp123';
kernelQueue.enqueueNotify(endpointId, kpid);
expect(kernelStore.incrementRefCount).toHaveBeenCalledWith(
kpid,
'notify',
);
expect(kernelStore.enqueueRun).toHaveBeenCalledWith({
type: 'notify',
endpointId,
kpid,
});
expect(kernelStore.incrementRefCount).toHaveBeenCalledWith(
kpid,
'notify',
);
});
});

describe('resolvePromises', () => {
it('resolves kernel promises and notifies subscribers', () => {
it('resolves kernel promises and buffers notifications for subscribers', () => {
const endpointId = 'v1';
const kpid = 'kp123';
const resolution: VatOneResolution = [
@@ -278,7 +281,7 @@ describe('KernelQueue', () => {
);
const resolveHandler = vi.fn();
kernelQueue.subscriptions.set(kpid, resolveHandler);
kernelQueue.resolvePromises(endpointId, [resolution]);
kernelQueue.resolvePromises(endpointId, [resolution], false);
expect(kernelStore.incrementRefCount).toHaveBeenCalledWith(
kpid,
'resolve|kpid',
@@ -287,12 +290,17 @@ describe('KernelQueue', () => {
'slot1',
'resolve|slot',
);
expect(kernelStore.enqueueRun).toHaveBeenCalledWith({
// Notifications are buffered with refcount increments
expect(kernelStore.incrementRefCount).toHaveBeenCalledWith(
kpid,
'notify',
);
expect(kernelStore.bufferCrankOutput).toHaveBeenCalledWith({
type: 'notify',
endpointId: 'v2',
kpid,
});
expect(kernelStore.enqueueRun).toHaveBeenCalledWith({
expect(kernelStore.bufferCrankOutput).toHaveBeenCalledWith({
type: 'notify',
endpointId: 'v3',
kpid,
@@ -302,11 +310,10 @@ describe('KernelQueue', () => {
false,
{ body: 'resolved value', slots: ['slot1'] },
);
expect(resolveHandler).toHaveBeenCalledWith({
body: 'resolved value',
slots: ['slot1'],
});
expect(kernelQueue.subscriptions.has(kpid)).toBe(false);
// Kernel subscription callback is NOT called immediately - deferred to flush
expect(resolveHandler).not.toHaveBeenCalled();
// Subscription is still registered, will be invoked during flush
expect(kernelQueue.subscriptions.has(kpid)).toBe(true);
});

it('handles resolutions with undefined vatId (kernel decider)', () => {
@@ -326,7 +333,7 @@ describe('KernelQueue', () => {
const resolveHandler = vi.fn();
kernelQueue.subscriptions.set(kpid, resolveHandler);
const insistEndpointIdSpy = vi.spyOn(types, 'insistEndpointId');
kernelQueue.resolvePromises(undefined, [resolution]);
kernelQueue.resolvePromises(undefined, [resolution], false);
expect(insistEndpointIdSpy).not.toHaveBeenCalled();
expect(kernelStore.incrementRefCount).toHaveBeenCalledWith(
kpid,
@@ -336,7 +343,12 @@ describe('KernelQueue', () => {
'slot1',
'resolve|slot',
);
expect(kernelStore.enqueueRun).toHaveBeenCalledWith({
// Notification is buffered with refcount increment
expect(kernelStore.incrementRefCount).toHaveBeenCalledWith(
kpid,
'notify',
);
expect(kernelStore.bufferCrankOutput).toHaveBeenCalledWith({
type: 'notify',
endpointId: 'v2',
kpid,
@@ -346,8 +358,9 @@ describe('KernelQueue', () => {
false,
resolution[2],
);
expect(resolveHandler).toHaveBeenCalledWith(resolution[2]);
expect(kernelQueue.subscriptions.has(kpid)).toBe(false);
// Kernel subscription callback is NOT called immediately - deferred to flush
expect(resolveHandler).not.toHaveBeenCalled();
expect(kernelQueue.subscriptions.has(kpid)).toBe(true);
insistEndpointIdSpy.mockRestore();
});

@@ -368,17 +381,17 @@ describe('KernelQueue', () => {
);
const resolveHandler = vi.fn();
kernelQueue.subscriptions.set(kpid, resolveHandler);
kernelQueue.resolvePromises(endpointId, [resolution]);
expect(kernelStore.enqueueRun).not.toHaveBeenCalledWith(
expect.objectContaining({ type: 'notify' }),
);
kernelQueue.resolvePromises(endpointId, [resolution], false);
// No notifications buffered because no subscribers
expect(kernelStore.bufferCrankOutput).not.toHaveBeenCalled();
expect(kernelStore.resolveKernelPromise).toHaveBeenCalledWith(
kpid,
false,
resolution[2],
);
expect(resolveHandler).toHaveBeenCalledWith(resolution[2]);
expect(kernelQueue.subscriptions.has(kpid)).toBe(false);
// Kernel subscription callback is NOT called immediately - deferred to flush
expect(resolveHandler).not.toHaveBeenCalled();
expect(kernelQueue.subscriptions.has(kpid)).toBe(true);
});

it('throws error if a promise is already resolved', () => {