diff --git a/packages/kernel-browser-runtime/src/PlatformServicesClient.ts b/packages/kernel-browser-runtime/src/PlatformServicesClient.ts index 83d0382a2..165d93a62 100644 --- a/packages/kernel-browser-runtime/src/PlatformServicesClient.ts +++ b/packages/kernel-browser-runtime/src/PlatformServicesClient.ts @@ -194,6 +194,7 @@ export class PlatformServicesClient implements PlatformServices { * @param options.maxQueue - Maximum number of messages to queue per peer while reconnecting (default: 200). * @param remoteMessageHandler - A handler function to receive remote messages. * @param onRemoteGiveUp - Optional callback to be called when we give up on a remote. + * @param incarnationId - Unique identifier for this kernel instance. * @returns A promise that resolves once network access has been established * or rejects if there is some problem doing so. */ @@ -202,6 +203,7 @@ export class PlatformServicesClient implements PlatformServices { options: RemoteCommsOptions, remoteMessageHandler: (from: string, message: string) => Promise, onRemoteGiveUp?: (peerId: string) => void, + incarnationId?: string, ): Promise { this.#remoteMessageHandler = remoteMessageHandler; this.#remoteGiveUpHandler = onRemoteGiveUp; @@ -210,6 +212,7 @@ export class PlatformServicesClient implements PlatformServices { ...Object.fromEntries( Object.entries(options).filter(([, value]) => value !== undefined), ), + ...(incarnationId !== undefined && { incarnationId }), }); } diff --git a/packages/kernel-browser-runtime/src/PlatformServicesServer.test.ts b/packages/kernel-browser-runtime/src/PlatformServicesServer.test.ts index 2df5f9b83..8336102ce 100644 --- a/packages/kernel-browser-runtime/src/PlatformServicesServer.test.ts +++ b/packages/kernel-browser-runtime/src/PlatformServicesServer.test.ts @@ -406,6 +406,7 @@ describe('PlatformServicesServer', () => { { relays }, expect.any(Function), expect.any(Function), + undefined, ); }); @@ -428,6 +429,7 @@ describe('PlatformServicesServer', () => { 
options, expect.any(Function), expect.any(Function), + undefined, ); }); diff --git a/packages/kernel-browser-runtime/src/PlatformServicesServer.ts b/packages/kernel-browser-runtime/src/PlatformServicesServer.ts index 75bb13f93..cf0bdea7f 100644 --- a/packages/kernel-browser-runtime/src/PlatformServicesServer.ts +++ b/packages/kernel-browser-runtime/src/PlatformServicesServer.ts @@ -270,14 +270,13 @@ export class PlatformServicesServer { * connections from other kernels. * @param options.maxRetryAttempts - Maximum number of reconnection attempts. 0 = infinite (default). * @param options.maxQueue - Maximum number of messages to queue per peer while reconnecting (default: 200). - * @param _onRemoteGiveUp - Unused parameter (kept for interface compatibility). - * Remote give-up notifications are sent via RPC instead. + * @param incarnationId - This kernel's incarnation ID for handshake protocol. * @returns A promise that resolves when network access has been initialized. */ async #initializeRemoteComms( keySeed: string, options: RemoteCommsOptions, - _onRemoteGiveUp?: (peerId: string) => void, + incarnationId?: string, ): Promise { if (this.#sendRemoteMessageFunc) { throw Error('remote comms already initialized'); @@ -293,6 +292,7 @@ export class PlatformServicesServer { options, this.#handleRemoteMessage.bind(this), this.#handleRemoteGiveUp.bind(this), + incarnationId, ); this.#sendRemoteMessageFunc = sendRemoteMessage; this.#stopRemoteCommsFunc = stop; diff --git a/packages/nodejs/test/e2e/remote-comms.test.ts b/packages/nodejs/test/e2e/remote-comms.test.ts index 37da6c2df..4ff51fc81 100644 --- a/packages/nodejs/test/e2e/remote-comms.test.ts +++ b/packages/nodejs/test/e2e/remote-comms.test.ts @@ -4,6 +4,7 @@ import { Kernel, kunser, makeKernelStore } from '@metamask/ocap-kernel'; import type { KRef } from '@metamask/ocap-kernel'; import { startRelay } from '@ocap/cli/relay'; import { delay } from '@ocap/repo-tools/test-utils'; +import { unlink } from 
'node:fs/promises'; import { describe, it, expect, beforeEach, afterEach } from 'vitest'; import { makeTestKernel, runTestVats } from '../helpers/kernel.ts'; @@ -865,6 +866,72 @@ describe.sequential('Remote Communications E2E', () => { ); }); + describe('Incarnation Detection', () => { + it( + 'detects incarnation change when peer restarts with fresh state', + async () => { + // Initialize with low retry attempts to trigger give-up on incarnation change + await kernel1.initRemoteComms({ + relays: testRelays, + maxRetryAttempts: 2, + }); + await kernel2.initRemoteComms({ relays: testRelays }); + + const aliceConfig = makeRemoteVatConfig('Alice'); + const bobConfig = makeRemoteVatConfig('Bob'); + + await launchVatAndGetURL(kernel1, aliceConfig); + const bobURL = await launchVatAndGetURL(kernel2, bobConfig); + const aliceRef = getVatRootRef(kernel1, kernelStore1, 'Alice'); + + // Establish connection and exchange handshakes + await sendRemoteMessage(kernel1, aliceRef, bobURL, 'hello', ['Alice']); + + // Stop kernel2 + await kernel2.stop(); + + // Simulate state loss by deleting kernel2's database and creating fresh + kernelDatabase2.close(); + const freshDb = await makeSQLKernelDatabase({ + dbFilename: 'rc-e2e-test-kernel2-fresh.db', + }); + + try { + // Create a completely new kernel (new incarnation ID, no previous state) + // eslint-disable-next-line require-atomic-updates + kernel2 = await makeTestKernel(freshDb, true); + await kernel2.initRemoteComms({ relays: testRelays }); + + // Launch Bob again (fresh vat, no previous state) + await launchVatAndGetURL(kernel2, bobConfig); + + // Send a message - when the new kernel connects, it will have a different + // incarnation ID than before. The handshake will detect this change + // and trigger promise rejection for pending work. + // The await will naturally wait for the promise to settle - either + // succeeding (unexpected) or failing due to incarnation change detection. 
+ const result = await kernel1.queueMessage( + aliceRef, + 'sendRemoteMessage', + [bobURL, 'hello', ['Alice']], + ); + const response = kunser(result); + + // The message should fail because incarnation changed + // Either due to promise rejection or remote state being lost + expect(response).toBeInstanceOf(Error); + } finally { + freshDb.close(); + // Clean up the fresh database file + await unlink('rc-e2e-test-kernel2-fresh.db').catch(() => { + // Ignore errors if file doesn't exist + }); + } + }, + NETWORK_TIMEOUT * 3, + ); + }); + describe('Promise Rejection on Remote Give-Up', () => { it( 'rejects promises when remote connection is lost after max retries', diff --git a/packages/ocap-kernel/src/Kernel.ts b/packages/ocap-kernel/src/Kernel.ts index e759f5001..c9157c8c4 100644 --- a/packages/ocap-kernel/src/Kernel.ts +++ b/packages/ocap-kernel/src/Kernel.ts @@ -106,15 +106,11 @@ export class Kernel { } if (options.resetStorage) { - this.#resetKernelState(); // If mnemonic is provided with resetStorage, also clear identity // to allow recovery with the new mnemonic - if (options.mnemonic) { - this.#kernelStore.kv.delete('keySeed'); - this.#kernelStore.kv.delete('peerId'); - this.#kernelStore.kv.delete('ocapURLKey'); - } + this.#resetKernelState({ resetIdentity: Boolean(options.mnemonic) }); } + this.#kernelQueue = new KernelQueue( this.#kernelStore, async (vatId, reason) => this.#vatManager.terminateVat(vatId, reason), @@ -516,12 +512,22 @@ export class Kernel { /** * Reset the kernel state. - */ - #resetKernelState(): void { - this.#kernelStore.reset({ - // XXX special case hack so that network address survives restart when testing - except: ['keySeed', 'peerId', 'ocapURLKey'], - }); + * + * @param options - Options for the reset. + * @param options.resetIdentity - If true, also clears identity keys (keySeed, peerId, ocapURLKey, incarnationId). 
+ */ + #resetKernelState({ + resetIdentity = false, + }: { resetIdentity?: boolean } = {}): void { + if (resetIdentity) { + // Full reset including identity - used when recovering from mnemonic + this.#kernelStore.reset(); + } else { + // Preserve identity keys so network address survives restart + this.#kernelStore.reset({ + except: ['keySeed', 'peerId', 'ocapURLKey', 'incarnationId'], + }); + } } /** diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteManager.test.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteManager.test.ts index 32e1a91e8..3118c055d 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteManager.test.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteManager.test.ts @@ -80,6 +80,7 @@ describe('RemoteManager', () => { logger, undefined, expect.any(Function), + kernelStore.getOrCreateIncarnationId(), ); }); @@ -102,10 +103,12 @@ describe('RemoteManager', () => { relays: ['relay1', 'relay2'], maxRetryAttempts: 5, maxQueue: 100, + mnemonic: undefined, }, logger, undefined, expect.any(Function), + kernelStore.getOrCreateIncarnationId(), ); }); @@ -133,6 +136,7 @@ describe('RemoteManager', () => { logger, keySeed, expect.any(Function), + kernelStore.getOrCreateIncarnationId(), ); }); diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts index 4c1f10bad..f6331fe6d 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts @@ -50,6 +50,12 @@ export class RemoteManager { /** Optional mnemonic for seed derivation */ readonly #mnemonic: string | undefined; + /** + * Unique identifier for this kernel instance. + * Used to detect when a remote peer has lost its state and reconnected. 
+ */ + readonly #incarnationId: string; + /** Remote communications interface */ #remoteComms: RemoteComms | undefined; @@ -81,6 +87,8 @@ export class RemoteManager { this.#logger = logger; this.#keySeed = keySeed; this.#mnemonic = mnemonic; + // Get incarnation ID from store - it's persisted so it survives restarts + this.#incarnationId = kernelStore.getOrCreateIncarnationId(); } /** @@ -153,6 +161,7 @@ export class RemoteManager { this.#logger, this.#keySeed, this.#handleRemoteGiveUp.bind(this), + this.#incarnationId, ); // Restore all remotes that were previously established diff --git a/packages/ocap-kernel/src/remotes/kernel/remote-comms.test.ts b/packages/ocap-kernel/src/remotes/kernel/remote-comms.test.ts index 5207ff8b0..724e36748 100644 --- a/packages/ocap-kernel/src/remotes/kernel/remote-comms.test.ts +++ b/packages/ocap-kernel/src/remotes/kernel/remote-comms.test.ts @@ -143,6 +143,7 @@ describe('remote-comms', () => { }), mockRemoteMessageHandler, undefined, // onRemoteGiveUp + undefined, // incarnationId ); }); @@ -165,6 +166,7 @@ describe('remote-comms', () => { }), mockRemoteMessageHandler, undefined, + undefined, // incarnationId ); }); @@ -184,6 +186,28 @@ describe('remote-comms', () => { expect.any(Object), mockRemoteMessageHandler, onRemoteGiveUp, + undefined, // incarnationId + ); + }); + + it('passes incarnationId to platformServices', async () => { + const incarnationId = 'test-incarnation-id'; + await initRemoteComms( + mockKernelStore, + mockPlatformServices, + mockRemoteMessageHandler, + {}, + undefined, // logger + undefined, // keySeed + undefined, // onRemoteGiveUp + incarnationId, + ); + expect(mockPlatformServices.initializeRemoteComms).toHaveBeenCalledWith( + expect.any(String), + expect.any(Object), + mockRemoteMessageHandler, + undefined, + incarnationId, ); }); diff --git a/packages/ocap-kernel/src/remotes/kernel/remote-comms.ts b/packages/ocap-kernel/src/remotes/kernel/remote-comms.ts index 083823aa7..fe952c195 100644 --- 
a/packages/ocap-kernel/src/remotes/kernel/remote-comms.ts +++ b/packages/ocap-kernel/src/remotes/kernel/remote-comms.ts @@ -104,6 +104,7 @@ export function getKnownRelays(kv: KVStore): string[] { * @param logger - The logger to use. * @param keySeed - Optional seed for libp2p key generation. * @param onRemoteGiveUp - Optional callback to be called when we give up on a remote. + * @param incarnationId - Unique identifier for this kernel instance. * * @returns the initialized remote comms object. */ @@ -115,6 +116,7 @@ export async function initRemoteComms( logger?: Logger, keySeed?: string, onRemoteGiveUp?: OnRemoteGiveUp, + incarnationId?: string, ): Promise { let peerId: string; let ocapURLKey: Uint8Array; @@ -167,6 +169,7 @@ export async function initRemoteComms( { ...options, relays: knownRelays }, remoteMessageHandler, onRemoteGiveUp, + incarnationId, ); /** diff --git a/packages/ocap-kernel/src/remotes/platform/handshake.test.ts b/packages/ocap-kernel/src/remotes/platform/handshake.test.ts new file mode 100644 index 000000000..f69c52821 --- /dev/null +++ b/packages/ocap-kernel/src/remotes/platform/handshake.test.ts @@ -0,0 +1,306 @@ +import { Logger } from '@metamask/logger'; +import { describe, it, expect, beforeEach, vi } from 'vitest'; + +import { + isHandshakeMessage, + performInboundHandshake, + performOutboundHandshake, +} from './handshake.ts'; +import type { HandshakeDeps, HandshakeMessage } from './handshake.ts'; +import type { Channel } from '../types.ts'; + +describe('handshake', () => { + describe('isHandshakeMessage', () => { + it('returns true for handshake message', () => { + const message: HandshakeMessage = { + method: 'handshake', + params: { incarnationId: 'test-id' }, + }; + expect(isHandshakeMessage(message)).toBe(true); + }); + + it('returns true for handshakeAck message', () => { + const message: HandshakeMessage = { + method: 'handshakeAck', + params: { incarnationId: 'test-id' }, + }; + expect(isHandshakeMessage(message)).toBe(true); + 
}); + + it('returns false for null', () => { + expect(isHandshakeMessage(null)).toBe(false); + }); + + it('returns false for non-object', () => { + expect(isHandshakeMessage('string')).toBe(false); + expect(isHandshakeMessage(123)).toBe(false); + expect(isHandshakeMessage(undefined)).toBe(false); + }); + + it('returns false for object with different method', () => { + expect(isHandshakeMessage({ method: 'delivery' })).toBe(false); + expect(isHandshakeMessage({ method: 'other' })).toBe(false); + }); + + it('returns false for object without method', () => { + expect(isHandshakeMessage({ params: {} })).toBe(false); + }); + + it('returns false for handshake message without params', () => { + expect(isHandshakeMessage({ method: 'handshake' })).toBe(false); + expect(isHandshakeMessage({ method: 'handshakeAck' })).toBe(false); + }); + + it('returns false for handshake message with non-object params', () => { + expect(isHandshakeMessage({ method: 'handshake', params: null })).toBe( + false, + ); + expect( + isHandshakeMessage({ method: 'handshake', params: 'string' }), + ).toBe(false); + }); + + it('returns false for handshake message without incarnationId', () => { + expect(isHandshakeMessage({ method: 'handshake', params: {} })).toBe( + false, + ); + expect( + isHandshakeMessage({ + method: 'handshakeAck', + params: { other: 'data' }, + }), + ).toBe(false); + }); + + it('returns false for handshake message with non-string incarnationId', () => { + expect( + isHandshakeMessage({ + method: 'handshake', + params: { incarnationId: 123 }, + }), + ).toBe(false); + expect( + isHandshakeMessage({ + method: 'handshakeAck', + params: { incarnationId: null }, + }), + ).toBe(false); + }); + }); + + describe('performOutboundHandshake', () => { + let mockChannel: Channel; + let mockDeps: HandshakeDeps; + let logger: Logger; + + beforeEach(() => { + logger = new Logger(); + vi.spyOn(logger, 'log'); + + mockChannel = { + peerId: 'test-peer-id', + msgStream: { + write: 
vi.fn().mockResolvedValue(undefined), + read: vi.fn(), + unwrap: vi.fn(), + }, + } as unknown as Channel; + + mockDeps = { + localIncarnationId: 'local-incarnation-123', + logger, + setRemoteIncarnation: vi.fn().mockReturnValue(false), + }; + }); + + it('sends handshake and waits for handshakeAck', async () => { + const handshakeAck = JSON.stringify({ + method: 'handshakeAck', + params: { incarnationId: 'remote-incarnation-456' }, + }); + vi.spyOn(mockChannel.msgStream, 'read') + .mockImplementation() + .mockResolvedValueOnce({ + subarray: () => new TextEncoder().encode(handshakeAck), + }); + + const result = await performOutboundHandshake(mockChannel, mockDeps); + + // Verify handshake was sent + expect(mockChannel.msgStream.write).toHaveBeenCalledTimes(1); + const writeCall = mockChannel.msgStream.write as ReturnType; + const sentData = new TextDecoder().decode( + writeCall.mock.calls[0][0] as Uint8Array, + ); + const sentMessage = JSON.parse(sentData); + expect(sentMessage).toStrictEqual({ + method: 'handshake', + params: { incarnationId: 'local-incarnation-123' }, + }); + + // Verify result + expect(result.remoteIncarnationId).toBe('remote-incarnation-456'); + expect(result.incarnationChanged).toBe(false); + + // Verify incarnation was set + expect(mockDeps.setRemoteIncarnation).toHaveBeenCalledWith( + 'test-peer-id', + 'remote-incarnation-456', + ); + }); + + it('returns incarnationChanged=true when incarnation changes', async () => { + const handshakeAck = JSON.stringify({ + method: 'handshakeAck', + params: { incarnationId: 'new-incarnation' }, + }); + vi.spyOn(mockChannel.msgStream, 'read') + .mockImplementation() + .mockResolvedValueOnce({ + subarray: () => new TextEncoder().encode(handshakeAck), + }); + ( + mockDeps.setRemoteIncarnation as ReturnType + ).mockReturnValue(true); + + const result = await performOutboundHandshake(mockChannel, mockDeps); + + expect(result.incarnationChanged).toBe(true); + }); + + it('throws when response is not handshakeAck', 
async () => { + const wrongResponse = JSON.stringify({ + method: 'handshake', // Wrong! Should be handshakeAck + params: { incarnationId: 'remote-incarnation' }, + }); + vi.spyOn(mockChannel.msgStream, 'read') + .mockImplementation() + .mockResolvedValueOnce({ + subarray: () => new TextEncoder().encode(wrongResponse), + }); + + await expect( + performOutboundHandshake(mockChannel, mockDeps), + ).rejects.toThrow('Expected handshakeAck'); + }); + + it('throws when response is not valid JSON', async () => { + vi.spyOn(mockChannel.msgStream, 'read') + .mockImplementation() + .mockResolvedValueOnce({ + subarray: () => new TextEncoder().encode('not json'), + }); + + await expect( + performOutboundHandshake(mockChannel, mockDeps), + ).rejects.toThrow('JSON'); + }); + }); + + describe('performInboundHandshake', () => { + let mockChannel: Channel; + let mockDeps: HandshakeDeps; + let logger: Logger; + + beforeEach(() => { + logger = new Logger(); + vi.spyOn(logger, 'log'); + + mockChannel = { + peerId: 'test-peer-id', + msgStream: { + write: vi.fn().mockResolvedValue(undefined), + read: vi.fn(), + unwrap: vi.fn(), + }, + } as unknown as Channel; + + mockDeps = { + localIncarnationId: 'local-incarnation-123', + logger, + setRemoteIncarnation: vi.fn().mockReturnValue(false), + }; + }); + + it('waits for handshake and sends handshakeAck', async () => { + const handshake = JSON.stringify({ + method: 'handshake', + params: { incarnationId: 'remote-incarnation-456' }, + }); + vi.spyOn(mockChannel.msgStream, 'read') + .mockImplementation() + .mockResolvedValueOnce({ + subarray: () => new TextEncoder().encode(handshake), + }); + + const result = await performInboundHandshake(mockChannel, mockDeps); + + // Verify handshakeAck was sent + expect(mockChannel.msgStream.write).toHaveBeenCalledTimes(1); + const writeCall = mockChannel.msgStream.write as ReturnType; + const sentData = new TextDecoder().decode( + writeCall.mock.calls[0][0] as Uint8Array, + ); + const sentMessage = 
JSON.parse(sentData); + expect(sentMessage).toStrictEqual({ + method: 'handshakeAck', + params: { incarnationId: 'local-incarnation-123' }, + }); + + // Verify result + expect(result.remoteIncarnationId).toBe('remote-incarnation-456'); + expect(result.incarnationChanged).toBe(false); + + // Verify incarnation was set + expect(mockDeps.setRemoteIncarnation).toHaveBeenCalledWith( + 'test-peer-id', + 'remote-incarnation-456', + ); + }); + + it('returns incarnationChanged=true when incarnation changes', async () => { + const handshake = JSON.stringify({ + method: 'handshake', + params: { incarnationId: 'new-incarnation' }, + }); + vi.spyOn(mockChannel.msgStream, 'read') + .mockImplementation() + .mockResolvedValueOnce({ + subarray: () => new TextEncoder().encode(handshake), + }); + ( + mockDeps.setRemoteIncarnation as ReturnType + ).mockReturnValue(true); + + const result = await performInboundHandshake(mockChannel, mockDeps); + + expect(result.incarnationChanged).toBe(true); + }); + + it('throws when first message is not handshake', async () => { + const wrongMessage = JSON.stringify({ + method: 'handshakeAck', // Wrong! 
Should be handshake + params: { incarnationId: 'remote-incarnation' }, + }); + vi.spyOn(mockChannel.msgStream, 'read') + .mockImplementation() + .mockResolvedValueOnce({ + subarray: () => new TextEncoder().encode(wrongMessage), + }); + + await expect( + performInboundHandshake(mockChannel, mockDeps), + ).rejects.toThrow('Expected handshake'); + }); + + it('throws when channel closes during read', async () => { + vi.spyOn(mockChannel.msgStream, 'read') + .mockImplementation() + .mockResolvedValueOnce(undefined); + + await expect( + performInboundHandshake(mockChannel, mockDeps), + ).rejects.toThrow('Channel closed during handshake'); + }); + }); +}); diff --git a/packages/ocap-kernel/src/remotes/platform/handshake.ts b/packages/ocap-kernel/src/remotes/platform/handshake.ts new file mode 100644 index 000000000..6c7191d71 --- /dev/null +++ b/packages/ocap-kernel/src/remotes/platform/handshake.ts @@ -0,0 +1,206 @@ +import type { Logger } from '@metamask/logger'; +import { toString as bufToString, fromString } from 'uint8arrays'; + +import { writeWithTimeout } from './channel-utils.ts'; +import { DEFAULT_WRITE_TIMEOUT_MS } from './constants.ts'; +import type { Channel } from '../types.ts'; + +/** + * Handshake timeout in milliseconds. + */ +const HANDSHAKE_TIMEOUT_MS = 10_000; + +/** + * Type for handshake protocol messages exchanged during connection establishment. + */ +export type HandshakeMessage = + | { method: 'handshake'; params: { incarnationId: string } } + | { method: 'handshakeAck'; params: { incarnationId: string } }; + +/** + * Result of a handshake operation. + */ +export type HandshakeResult = { + /** The remote peer's incarnation ID. */ + remoteIncarnationId: string; + /** Whether the incarnation changed from a previously known value. */ + incarnationChanged: boolean; +}; + +/** + * Dependencies for the handshake handler. + */ +export type HandshakeDeps = { + /** This kernel's incarnation ID. 
*/ + localIncarnationId: string; + /** Logger for diagnostic output. */ + logger: Logger; + /** Set the incarnation ID for a peer. Returns true if it changed. */ + setRemoteIncarnation: (peerId: string, incarnationId: string) => boolean; +}; + +/** + * Check if a parsed message is a handshake protocol message. + * + * @param parsed - The value to check, e.g. the result of `JSON.parse`. + * @returns True if this is a handshake or handshakeAck message whose `params.incarnationId` is a string. + */ +export function isHandshakeMessage( + parsed: unknown, +): parsed is HandshakeMessage { + if (typeof parsed !== 'object' || parsed === null) { + return false; + } + const candidate = parsed as Record; + if (candidate.method !== 'handshake' && candidate.method !== 'handshakeAck') { + return false; + } + // Validate params.incarnationId exists and is a string + const params = candidate.params as Record | undefined; + return typeof params?.incarnationId === 'string'; +} + +/** + * Read a message from a channel with timeout. + * The pending read itself is not cancelled when the timeout fires; only the returned promise rejects. + * + * @param channel - The channel to read from. + * @param timeoutMs - Timeout in milliseconds. + * @returns The message decoded to a string. + * @throws If the channel closes before a message arrives, or if no message arrives within `timeoutMs`. 
+ */ +async function readWithTimeout( + channel: Channel, + timeoutMs: number, +): Promise { + const abortController = new AbortController(); + const timeoutId = setTimeout(() => { + abortController.abort(); + }, timeoutMs); + + // Create abort handler as named function so we can remove it in finally + let rejectTimeout: (error: Error) => void; + const abortHandler = (): void => { + rejectTimeout(new Error('Handshake read timed out')); + }; + + // Create a promise that rejects on timeout + const timeoutPromise = new Promise((_resolve, reject) => { + rejectTimeout = reject; + abortController.signal.addEventListener('abort', abortHandler); + }); + + const readPromise = (async () => { + const readBuf = await channel.msgStream.read(); + if (!readBuf) { + throw new Error('Channel closed during handshake'); + } + return bufToString(readBuf.subarray()); + })(); + + try { + return await Promise.race([readPromise, timeoutPromise]); + } finally { + clearTimeout(timeoutId); + abortController.signal.removeEventListener('abort', abortHandler); + } +} + +/** + * Perform handshake as the initiator (outbound connection). + * Sends handshake message and waits for handshakeAck. + * + * @param channel - The channel to perform handshake on. + * @param deps - Handshake dependencies. + * @returns The remote peer's incarnation ID and whether it changed from the previously recorded value. + * @throws If the handshake read fails or times out, the response is not valid JSON, or the response is not a valid handshakeAck message. 
+ */ +export async function performOutboundHandshake( + channel: Channel, + deps: HandshakeDeps, +): Promise { + const { localIncarnationId, logger, setRemoteIncarnation } = deps; + const { peerId } = channel; + const shortPeerId = peerId.slice(0, 8); + const shortIncarnation = localIncarnationId.slice(0, 8); + + // Send handshake + const handshakeMsg: HandshakeMessage = { + method: 'handshake', + params: { incarnationId: localIncarnationId }, + }; + logger.log( + `${shortPeerId}:: sending handshake with incarnation ${shortIncarnation}`, + ); + await writeWithTimeout( + channel, + fromString(JSON.stringify(handshakeMsg)), + DEFAULT_WRITE_TIMEOUT_MS, + ); + + // Wait for handshakeAck + logger.log(`${shortPeerId}:: waiting for handshakeAck`); + const response = await readWithTimeout(channel, HANDSHAKE_TIMEOUT_MS); + const parsed = JSON.parse(response); + + if (!isHandshakeMessage(parsed) || parsed.method !== 'handshakeAck') { + throw new Error( + `Expected handshakeAck, got: ${parsed?.method ?? 'unknown'}`, + ); + } + + const remoteIncarnationId = parsed.params.incarnationId; + logger.log( + `${shortPeerId}:: received handshakeAck with incarnation ${remoteIncarnationId.slice(0, 8)}`, + ); + + const incarnationChanged = setRemoteIncarnation(peerId, remoteIncarnationId); + + return { remoteIncarnationId, incarnationChanged }; +} + +/** + * Perform handshake as the responder (inbound connection). + * Waits for handshake message and sends handshakeAck. + * + * @param channel - The channel to perform handshake on. + * @param deps - Handshake dependencies. + * @returns The remote peer's incarnation ID and whether it changed from the previously recorded value. + * @throws If the handshake read fails or times out, the message is not valid JSON, or the message is not a valid handshake message. 
+ */ +export async function performInboundHandshake( + channel: Channel, + deps: HandshakeDeps, +): Promise { + const { localIncarnationId, logger, setRemoteIncarnation } = deps; + const { peerId } = channel; + const shortPeerId = peerId.slice(0, 8); + + // Wait for handshake + logger.log(`${shortPeerId}:: waiting for handshake`); + const message = await readWithTimeout(channel, HANDSHAKE_TIMEOUT_MS); + const parsed = JSON.parse(message); + + if (!isHandshakeMessage(parsed) || parsed.method !== 'handshake') { + throw new Error(`Expected handshake, got: ${parsed?.method ?? 'unknown'}`); + } + + const remoteIncarnationId = parsed.params.incarnationId; + logger.log( + `${shortPeerId}:: received handshake with incarnation ${remoteIncarnationId.slice(0, 8)}`, + ); + + // Send handshakeAck + const ackMsg: HandshakeMessage = { + method: 'handshakeAck', + params: { incarnationId: localIncarnationId }, + }; + logger.log( + `${shortPeerId}:: sending handshakeAck with incarnation ${localIncarnationId.slice(0, 8)}`, + ); + await writeWithTimeout( + channel, + fromString(JSON.stringify(ackMsg)), + DEFAULT_WRITE_TIMEOUT_MS, + ); + + const incarnationChanged = setRemoteIncarnation(peerId, remoteIncarnationId); + + return { remoteIncarnationId, incarnationChanged }; +} diff --git a/packages/ocap-kernel/src/remotes/platform/peer-state-manager.test.ts b/packages/ocap-kernel/src/remotes/platform/peer-state-manager.test.ts index 4f619ec98..a17e81a7b 100644 --- a/packages/ocap-kernel/src/remotes/platform/peer-state-manager.test.ts +++ b/packages/ocap-kernel/src/remotes/platform/peer-state-manager.test.ts @@ -19,6 +19,7 @@ describe('PeerStateManager', () => { expect(state).toStrictEqual({ channel: undefined, locationHints: [], + remoteIncarnationId: undefined, }); }); @@ -151,6 +152,69 @@ describe('PeerStateManager', () => { }); }); + describe('incarnation tracking', () => { + describe('setRemoteIncarnation', () => { + it('returns false for first incarnation (not a change)', () => { + 
const changed = manager.setRemoteIncarnation('peer1', 'incarnation-1'); + + expect(changed).toBe(false); + }); + + it('returns false when setting same incarnation', () => { + manager.setRemoteIncarnation('peer1', 'incarnation-1'); + const changed = manager.setRemoteIncarnation('peer1', 'incarnation-1'); + + expect(changed).toBe(false); + }); + + it('returns true when incarnation changes', () => { + manager.setRemoteIncarnation('peer1', 'incarnation-1'); + const changed = manager.setRemoteIncarnation('peer1', 'incarnation-2'); + + expect(changed).toBe(true); + }); + + it('stores the new incarnation when changed', () => { + manager.setRemoteIncarnation('peer1', 'incarnation-1'); + manager.setRemoteIncarnation('peer1', 'incarnation-2'); + + expect(manager.getState('peer1').remoteIncarnationId).toBe( + 'incarnation-2', + ); + }); + + it('logs when first incarnation is received', () => { + manager.setRemoteIncarnation('peer1', 'incarnation-id-12345'); + + expect(mockLogger.log).toHaveBeenCalledWith( + expect.stringContaining('first incarnation ID received'), + ); + }); + + it('logs when incarnation changes', () => { + manager.setRemoteIncarnation('peer1', 'incarnation-1'); + mockLogger.log.mockClear(); + manager.setRemoteIncarnation('peer1', 'incarnation-2'); + + expect(mockLogger.log).toHaveBeenCalledWith( + expect.stringContaining('incarnation changed'), + ); + }); + + it('does not affect other peers', () => { + manager.setRemoteIncarnation('peer1', 'incarnation-1'); + manager.setRemoteIncarnation('peer2', 'incarnation-2'); + + expect(manager.getState('peer1').remoteIncarnationId).toBe( + 'incarnation-1', + ); + expect(manager.getState('peer2').remoteIncarnationId).toBe( + 'incarnation-2', + ); + }); + }); + }); + describe('addLocationHints', () => { it('adds hints to empty list', () => { manager.addLocationHints('peer1', ['hint1', 'hint2']); @@ -295,6 +359,7 @@ describe('PeerStateManager', () => { expect(states[0]).toStrictEqual({ channel: { peerId: 'peer1' }, 
locationHints: ['hint1'], + remoteIncarnationId: undefined, }); }); }); diff --git a/packages/ocap-kernel/src/remotes/platform/peer-state-manager.ts b/packages/ocap-kernel/src/remotes/platform/peer-state-manager.ts index 8cf99520a..a1c0fd0fd 100644 --- a/packages/ocap-kernel/src/remotes/platform/peer-state-manager.ts +++ b/packages/ocap-kernel/src/remotes/platform/peer-state-manager.ts @@ -9,6 +9,8 @@ import type { Channel } from '../types.ts'; export type PeerState = { channel: Channel | undefined; locationHints: string[]; + /** The incarnation ID of the remote peer, if known from handshake. */ + remoteIncarnationId: string | undefined; }; /** @@ -49,7 +51,11 @@ export class PeerStateManager { getState(peerId: string): PeerState { let state = this.#peerStates.get(peerId); if (!state) { - state = { channel: undefined, locationHints: [] }; + state = { + channel: undefined, + locationHints: [], + remoteIncarnationId: undefined, + }; this.#peerStates.set(peerId, state); // Initialize lastConnectionTime to enable stale peer cleanup // even for peers that never successfully connect @@ -112,6 +118,40 @@ export class PeerStateManager { this.#intentionallyClosed.delete(peerId); } + /** + * Set the remote incarnation ID for a peer. + * + * @param peerId - The peer ID. + * @param incarnationId - The remote incarnation ID. + * @returns True if the incarnation changed (was different from the previous value), + * false if this is the first incarnation or the same as before. 
+ */ + setRemoteIncarnation(peerId: string, incarnationId: string): boolean { + const state = this.getState(peerId); + const previousIncarnation = state.remoteIncarnationId; + + // If no previous incarnation, this is the first connection - not a change + if (previousIncarnation === undefined) { + state.remoteIncarnationId = incarnationId; + this.#logger.log( + `${peerId.slice(0, 8)}:: first incarnation ID received: ${incarnationId.slice(0, 8)}`, + ); + return false; + } + + // If same incarnation, no change + if (previousIncarnation === incarnationId) { + return false; + } + + // Incarnation changed - update and return true + state.remoteIncarnationId = incarnationId; + this.#logger.log( + `${peerId.slice(0, 8)}:: incarnation changed from ${previousIncarnation.slice(0, 8)} to ${incarnationId.slice(0, 8)}`, + ); + return true; + } + /** * Register location hints for a peer. * diff --git a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts index 3653992ee..310411f50 100644 --- a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts +++ b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts @@ -85,8 +85,9 @@ describe('reconnection-lifecycle', () => { reuseOrReturnChannel: vi.fn().mockResolvedValue(mockChannel), checkConnectionLimit: vi.fn(), checkConnectionRateLimit: vi.fn(), - closeChannel: vi.fn().mockResolvedValue(undefined), registerChannel: vi.fn(), + doOutboundHandshake: vi.fn().mockResolvedValue(true), + closeChannel: vi.fn().mockResolvedValue(undefined), } as unknown as ReconnectionLifecycleDeps; }); @@ -210,6 +211,47 @@ describe('reconnection-lifecycle', () => { ); }); + it('performs handshake before registering channel', async () => { + (deps.reconnectionManager.isReconnecting as ReturnType) + .mockReturnValueOnce(true) + .mockReturnValueOnce(false); + + const lifecycle = makeReconnectionLifecycle(deps); + + await 
lifecycle.attemptReconnection('peer1'); + + expect(deps.doOutboundHandshake).toHaveBeenCalledWith(mockChannel); + // Verify handshake is called before registerChannel + const handshakeCallOrder = ( + deps.doOutboundHandshake as ReturnType + ).mock.invocationCallOrder[0]; + const registerCallOrder = ( + deps.registerChannel as ReturnType + ).mock.invocationCallOrder[0]; + expect(handshakeCallOrder).toBeLessThan(registerCallOrder as number); + }); + + it('closes channel and throws when handshake fails', async () => { + (deps.reconnectionManager.isReconnecting as ReturnType) + .mockReturnValueOnce(true) + .mockReturnValueOnce(false); + (deps.doOutboundHandshake as ReturnType).mockResolvedValue( + false, + ); + + const lifecycle = makeReconnectionLifecycle(deps); + + await lifecycle.attemptReconnection('peer1'); + + expect(deps.closeChannel).toHaveBeenCalledWith(mockChannel, 'peer1'); + expect(deps.registerChannel).not.toHaveBeenCalled(); + expect(deps.outputError).toHaveBeenCalledWith( + 'peer1', + expect.stringContaining('reconnection attempt'), + expect.any(Error), + ); + }); + it('resets backoff on successful reconnection', async () => { (deps.reconnectionManager.isReconnecting as ReturnType) .mockReturnValueOnce(true) diff --git a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts index a780c51f9..de2abf161 100644 --- a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts +++ b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts @@ -31,12 +31,15 @@ export type ReconnectionLifecycleDeps = { ) => Promise; checkConnectionLimit: () => void; checkConnectionRateLimit: (peerId: string) => void; - closeChannel: (channel: Channel, peerId: string) => Promise; registerChannel: ( peerId: string, channel: Channel, errorContext?: string, ) => void; + /** Perform outbound handshake. Returns true if successful. 
*/ + doOutboundHandshake: (channel: Channel) => Promise; + /** Close a channel. */ + closeChannel: (channel: Channel, peerId: string) => Promise; }; /** @@ -68,8 +71,9 @@ export function makeReconnectionLifecycle( reuseOrReturnChannel, checkConnectionLimit, checkConnectionRateLimit, - closeChannel, registerChannel, + doOutboundHandshake, + closeChannel, } = deps; /** @@ -201,6 +205,12 @@ export function makeReconnectionLifecycle( } throw error; } + // Perform handshake before registering the channel + const handshakeOk = await doOutboundHandshake(channel); + if (!handshakeOk) { + await closeChannel(channel, peerId); + throw new Error('Handshake failed during reconnection'); + } registerChannel(peerId, channel, 'reading channel to'); } diff --git a/packages/ocap-kernel/src/remotes/platform/transport.test.ts b/packages/ocap-kernel/src/remotes/platform/transport.test.ts index a96724427..2cd10cf35 100644 --- a/packages/ocap-kernel/src/remotes/platform/transport.test.ts +++ b/packages/ocap-kernel/src/remotes/platform/transport.test.ts @@ -1226,12 +1226,59 @@ describe('transport.initTransport', () => { }); }); - // TODO: This test needs to be rewritten to work with the ACK protocol - // The race condition being tested (inbound connection arriving during reconnection dial) - // interacts with the ACK protocol in complex ways that need careful analysis. 
- it.todo( - 'reuses existing channel when inbound connection arrives during reconnection dial', - ); + it('reuses existing channel when inbound connection arrives during reconnection dial', async () => { + // Capture inbound handler before init + let inboundHandler: ((channel: MockChannel) => void) | undefined; + mockConnectionFactory.onInboundConnection.mockImplementation( + (handler) => { + inboundHandler = handler; + }, + ); + + // Create channels + const outboundChannel = createMockChannel('peer-1'); + const inboundChannel = createMockChannel('peer-1'); + + // Control when the dial completes + let resolveDial: ((channel: MockChannel) => void) | undefined; + const dialPromise = new Promise((resolve) => { + resolveDial = resolve; + }); + mockConnectionFactory.dialIdempotent.mockReturnValue(dialPromise); + + const { sendRemoteMessage } = await initTransport('0x1234', {}, vi.fn()); + + // Start sending a message - this will trigger the dial + const sendPromise = sendRemoteMessage('peer-1', makeTestMessage('hello')); + + // Verify dial was initiated + await vi.waitFor(() => { + expect(mockConnectionFactory.dialIdempotent).toHaveBeenCalled(); + }); + + // While the dial is pending, an inbound connection arrives from the same peer + inboundHandler?.(inboundChannel); + + // Now complete the dial - the outbound channel should be closed since inbound is already registered + resolveDial?.(outboundChannel); + + // Wait for the message to be sent + await sendPromise; + + // The outbound channel should have been closed since inbound connection was already established + expect(mockConnectionFactory.closeChannel).toHaveBeenCalledWith( + outboundChannel, + 'peer-1', + ); + + // Verify that messages go through the inbound channel (which was registered first) + // or the outbound channel that was dialed - either is acceptable + // The important thing is that we don't have duplicate channels + const totalWrites = + inboundChannel.msgStream.write.mock.calls.length + + 
outboundChannel.msgStream.write.mock.calls.length; + expect(totalWrites).toBeGreaterThanOrEqual(1); + }); }); describe('error handling', () => { @@ -2047,23 +2094,21 @@ describe('transport.initTransport', () => { }); it('times out after 10 seconds when write hangs', async () => { - // Ensure isReconnecting returns false so we actually call writeWithTimeout - mockReconnectionManager.isReconnecting.mockReturnValue(false); - const mockChannel = createMockChannel('peer-1'); - // Make write hang indefinitely - return a new hanging promise each time - mockChannel.msgStream.write.mockReset(); + // Make write never resolve to simulate a hang mockChannel.msgStream.write.mockReturnValue( new Promise(() => { - // Never resolves - simulates hanging write + /* Never resolves */ }), ); mockConnectionFactory.dialIdempotent.mockResolvedValue(mockChannel); - let mockSignal: ReturnType | undefined; + // Track all created abort signals so we can trigger abort manually + const mockSignals: ReturnType[] = []; vi.spyOn(AbortSignal, 'timeout').mockImplementation((ms: number) => { - mockSignal = makeAbortSignalMock(ms); - return mockSignal; + const signal = makeAbortSignalMock(ms); + mockSignals.push(signal); + return signal; }); const { sendRemoteMessage } = await initTransport('0x1234', {}, vi.fn()); @@ -2073,23 +2118,18 @@ describe('transport.initTransport', () => { makeTestMessage('test message'), ); - // Wait for the promise to be set up and event listener registered - await new Promise((resolve) => queueMicrotask(() => resolve())); - - // Verify write was called (proves we're not returning early) - expect(mockChannel.msgStream.write).toHaveBeenCalled(); - - // Manually trigger the abort to simulate timeout - mockSignal?.abort(); + // Wait for the write to be initiated + await vi.waitFor(() => { + expect(mockChannel.msgStream.write).toHaveBeenCalled(); + }); - // Wait for the abort handler to execute - await new Promise((resolve) => queueMicrotask(() => resolve())); + // Manually 
trigger the abort on the signal to simulate timeout + for (const signal of mockSignals) { + signal.abort(); + } - // sendRemoteMessage throws on timeout + // sendRemoteMessage should reject with timeout error await expect(sendPromise).rejects.toThrow('Message send timed out'); - - // Verify that connection loss handling was triggered - expect(mockReconnectionManager.startReconnection).toHaveBeenCalled(); }); it('does not timeout if write completes before timeout', async () => { @@ -2113,23 +2153,21 @@ describe('transport.initTransport', () => { }); it('handles timeout errors and triggers connection loss handling', async () => { - // Ensure isReconnecting returns false so we actually call writeWithTimeout - mockReconnectionManager.isReconnecting.mockReturnValue(false); - const mockChannel = createMockChannel('peer-1'); - // Make write hang indefinitely - return a new hanging promise each time - mockChannel.msgStream.write.mockReset(); + // Make write never resolve to simulate a hang mockChannel.msgStream.write.mockReturnValue( new Promise(() => { - // Never resolves - simulates hanging write + /* Never resolves */ }), ); mockConnectionFactory.dialIdempotent.mockResolvedValue(mockChannel); - let mockSignal: ReturnType | undefined; + // Track all created abort signals so we can trigger abort manually + const mockSignals: ReturnType[] = []; vi.spyOn(AbortSignal, 'timeout').mockImplementation((ms: number) => { - mockSignal = makeAbortSignalMock(ms); - return mockSignal; + const signal = makeAbortSignalMock(ms); + mockSignals.push(signal); + return signal; }); const { sendRemoteMessage } = await initTransport('0x1234', {}, vi.fn()); @@ -2139,20 +2177,23 @@ describe('transport.initTransport', () => { makeTestMessage('test message'), ); - // Wait for the promise to be set up and event listener registered - await new Promise((resolve) => queueMicrotask(() => resolve())); - - // Manually trigger the abort to simulate timeout - mockSignal?.abort(); + // Wait for the write to 
be initiated + await vi.waitFor(() => { + expect(mockChannel.msgStream.write).toHaveBeenCalled(); + }); - // Wait for the abort handler to execute - await new Promise((resolve) => queueMicrotask(() => resolve())); + // Manually trigger the abort on the signal to simulate timeout + for (const signal of mockSignals) { + signal.abort(); + } - // sendRemoteMessage throws on timeout + // Wait for the promise to reject await expect(sendPromise).rejects.toThrow('Message send timed out'); // Verify that connection loss handling was triggered - expect(mockReconnectionManager.startReconnection).toHaveBeenCalled(); + expect(mockReconnectionManager.startReconnection).toHaveBeenCalledWith( + 'peer-1', + ); }); it('propagates write errors that occur before timeout', async () => { @@ -2197,23 +2238,21 @@ describe('transport.initTransport', () => { }); it('error message includes correct timeout duration', async () => { - // Ensure isReconnecting returns false so we actually call writeWithTimeout - mockReconnectionManager.isReconnecting.mockReturnValue(false); - const mockChannel = createMockChannel('peer-1'); - // Make write hang indefinitely - return a new hanging promise each time - mockChannel.msgStream.write.mockReset(); + // Make write never resolve to simulate a hang mockChannel.msgStream.write.mockReturnValue( new Promise(() => { - // Never resolves - simulates hanging write + /* Never resolves */ }), ); mockConnectionFactory.dialIdempotent.mockResolvedValue(mockChannel); - let mockSignal: ReturnType | undefined; + // Track all created abort signals so we can trigger abort manually + const mockSignals: ReturnType[] = []; vi.spyOn(AbortSignal, 'timeout').mockImplementation((ms: number) => { - mockSignal = makeAbortSignalMock(ms); - return mockSignal; + const signal = makeAbortSignalMock(ms); + mockSignals.push(signal); + return signal; }); const { sendRemoteMessage } = await initTransport('0x1234', {}, vi.fn()); @@ -2223,36 +2262,31 @@ describe('transport.initTransport', () 
=> { makeTestMessage('test message'), ); - // Wait for the promise to be set up and event listener registered - await new Promise((resolve) => queueMicrotask(() => resolve())); - - // Manually trigger the abort to simulate timeout - mockSignal?.abort(); + // Wait for the write to be initiated + await vi.waitFor(() => { + expect(mockChannel.msgStream.write).toHaveBeenCalled(); + }); - // Wait for the abort handler to execute - await new Promise((resolve) => queueMicrotask(() => resolve())); + // Manually trigger the abort on the signal to simulate timeout + for (const signal of mockSignals) { + signal.abort(); + } - // sendRemoteMessage throws on timeout with the duration in the message + // Verify error message includes the correct timeout duration (10000ms) await expect(sendPromise).rejects.toThrow('10000ms'); - - // Verify that writeWithTimeout was called - expect(mockChannel.msgStream.write).toHaveBeenCalled(); }); it('handles multiple concurrent writes with timeout', async () => { - // Ensure isReconnecting returns false so we actually call writeWithTimeout - mockReconnectionManager.isReconnecting.mockReturnValue(false); - const mockChannel = createMockChannel('peer-1'); - // Make write hang indefinitely - return a new hanging promise each time - mockChannel.msgStream.write.mockReset(); + // Make write never resolve to simulate a hang mockChannel.msgStream.write.mockReturnValue( new Promise(() => { - // Never resolves - simulates hanging write + /* Never resolves */ }), ); mockConnectionFactory.dialIdempotent.mockResolvedValue(mockChannel); + // Track all created abort signals so we can trigger abort manually const mockSignals: ReturnType[] = []; vi.spyOn(AbortSignal, 'timeout').mockImplementation((ms: number) => { const signal = makeAbortSignalMock(ms); @@ -2262,6 +2296,7 @@ describe('transport.initTransport', () => { const { sendRemoteMessage } = await initTransport('0x1234', {}, vi.fn()); + // Send multiple messages concurrently const sendPromise1 = 
sendRemoteMessage( 'peer-1', makeTestMessage('message 1'), @@ -2271,23 +2306,362 @@ describe('transport.initTransport', () => { makeTestMessage('message 2'), ); - // Wait for the promises to be set up and event listeners registered - await new Promise((resolve) => queueMicrotask(() => resolve())); + // Wait for writes to be initiated + await vi.waitFor(() => { + expect(mockChannel.msgStream.write).toHaveBeenCalledTimes(2); + }); // Manually trigger the abort on all signals to simulate timeout for (const signal of mockSignals) { signal.abort(); } - // Wait for the abort handlers to execute - await new Promise((resolve) => queueMicrotask(() => resolve())); - - // sendRemoteMessage throws on timeout + // Both promises should reject with timeout error await expect(sendPromise1).rejects.toThrow('Message send timed out'); await expect(sendPromise2).rejects.toThrow('Message send timed out'); - // Verify that writeWithTimeout was called for both messages - expect(mockChannel.msgStream.write).toHaveBeenCalledTimes(2); + // Verify that each write got its own timeout signal + expect(mockSignals.length).toBeGreaterThanOrEqual(2); + }); + }); + + describe('handshake protocol', () => { + it('sends handshake on outbound connection and waits for ack', async () => { + const mockChannel = createMockChannel('peer-1'); + mockConnectionFactory.dialIdempotent.mockResolvedValue(mockChannel); + + const localIncarnationId = 'test-incarnation-id'; + // Mock read to return handshakeAck for the outbound handshake + const handshakeAck = JSON.stringify({ + method: 'handshakeAck', + params: { incarnationId: 'remote-incarnation-id' }, + }); + mockChannel.msgStream.read + .mockResolvedValueOnce(new TextEncoder().encode(handshakeAck)) + .mockReturnValue( + new Promise(() => { + /* Never resolves - normal read loop */ + }), + ); + + const { sendRemoteMessage } = await initTransport( + '0x1234', + {}, + vi.fn(), + undefined, // onRemoteGiveUp + localIncarnationId, + ); + + // Send a message to 
establish outbound connection + await sendRemoteMessage('peer-1', makeTestMessage('hello')); + + // Verify handshake was sent (first write) + expect(mockChannel.msgStream.write).toHaveBeenCalled(); + const { calls } = mockChannel.msgStream.write.mock; + const firstWrite = new TextDecoder().decode(calls[0][0] as Uint8Array); + const handshakeMsg = JSON.parse(firstWrite); + expect(handshakeMsg).toStrictEqual({ + method: 'handshake', + params: { incarnationId: localIncarnationId }, + }); + + // Second write should be the actual message + const secondWrite = new TextDecoder().decode(calls[1][0] as Uint8Array); + expect(secondWrite).toContain('hello'); + }); + + it('does not send handshake when no incarnationId provided', async () => { + const mockChannel = createMockChannel('peer-1'); + mockConnectionFactory.dialIdempotent.mockResolvedValue(mockChannel); + // No handshake means no need to mock handshakeAck read + mockChannel.msgStream.read.mockReturnValue( + new Promise(() => { + /* Never resolves */ + }), + ); + + const { sendRemoteMessage } = await initTransport( + '0x1234', + {}, + vi.fn(), + undefined, // onRemoteGiveUp + undefined, // no incarnationId + ); + + await sendRemoteMessage('peer-1', makeTestMessage('hello')); + + // Should only have the actual message, no handshake + expect(mockChannel.msgStream.write).toHaveBeenCalledTimes(1); + const writeCall = new TextDecoder().decode( + mockChannel.msgStream.write.mock.calls[0][0] as Uint8Array, + ); + const parsedMsg = JSON.parse(writeCall); + expect(parsedMsg.method).not.toBe('handshake'); + }); + + it('handles incoming handshake and replies with handshakeAck', async () => { + let inboundHandler: ((channel: MockChannel) => void) | undefined; + mockConnectionFactory.onInboundConnection.mockImplementation( + (handler: (channel: MockChannel) => void) => { + inboundHandler = handler; + }, + ); + + const remoteMessageHandler = vi.fn().mockResolvedValue(''); + const localIncarnationId = 'local-incarnation'; + await 
initTransport( + '0x1234', + {}, + remoteMessageHandler, + undefined, + localIncarnationId, + ); + + // Create mock inbound channel - first read is handshake, then regular messages + const mockInboundChannel = createMockChannel('remote-peer'); + const handshakeMessage = JSON.stringify({ + method: 'handshake', + params: { incarnationId: 'remote-incarnation' }, + }); + const regularMessage = makeTestMessage('hello'); + mockInboundChannel.msgStream.read + .mockResolvedValueOnce(new TextEncoder().encode(handshakeMessage)) + .mockResolvedValueOnce(new TextEncoder().encode(regularMessage)) + .mockReturnValue( + new Promise(() => { + /* Never resolves */ + }), + ); + + // Trigger inbound connection + inboundHandler?.(mockInboundChannel); + + // Wait for handshake to be processed and ack to be sent + await vi.waitFor(() => { + expect(mockLogger.log).toHaveBeenCalledWith( + expect.stringContaining('received handshake'), + ); + }); + + // Verify handshakeAck was sent + await vi.waitFor(() => { + expect(mockInboundChannel.msgStream.write).toHaveBeenCalled(); + }); + const ackWrite = new TextDecoder().decode( + mockInboundChannel.msgStream.write.mock.calls[0][0] as Uint8Array, + ); + const ackMsg = JSON.parse(ackWrite); + expect(ackMsg).toStrictEqual({ + method: 'handshakeAck', + params: { incarnationId: localIncarnationId }, + }); + + // Regular message after handshake should be passed to handler + await vi.waitFor(() => { + expect(remoteMessageHandler).toHaveBeenCalledWith( + 'remote-peer', + regularMessage, + ); + }); + }); + + it('rejects inbound connection when handshake fails', async () => { + let inboundHandler: ((channel: MockChannel) => void) | undefined; + mockConnectionFactory.onInboundConnection.mockImplementation( + (handler: (channel: MockChannel) => void) => { + inboundHandler = handler; + }, + ); + + const localIncarnationId = 'local-incarnation'; + await initTransport('0x1234', {}, vi.fn(), undefined, localIncarnationId); + + // Create mock inbound channel 
that sends wrong message type + const mockInboundChannel = createMockChannel('remote-peer'); + const wrongMessage = JSON.stringify({ + method: 'handshakeAck', // Wrong! Should be handshake for inbound + params: { incarnationId: 'remote-incarnation' }, + }); + mockInboundChannel.msgStream.read.mockResolvedValueOnce( + new TextEncoder().encode(wrongMessage), + ); + + // Trigger inbound connection + inboundHandler?.(mockInboundChannel); + + // Wait for rejection to be logged + await vi.waitFor(() => { + expect(mockLogger.log).toHaveBeenCalledWith( + expect.stringContaining( + 'rejecting inbound connection due to handshake failure', + ), + ); + }); + + // Channel should be closed + expect(mockConnectionFactory.closeChannel).toHaveBeenCalledWith( + mockInboundChannel, + 'remote-peer', + ); + }); + + it('calls onRemoteGiveUp when incarnation changes', async () => { + let inboundHandler: ((channel: MockChannel) => void) | undefined; + mockConnectionFactory.onInboundConnection.mockImplementation( + (handler: (channel: MockChannel) => void) => { + inboundHandler = handler; + }, + ); + + const onRemoteGiveUp = vi.fn(); + const localIncarnationId = 'local-incarnation'; + await initTransport( + '0x1234', + {}, + vi.fn().mockResolvedValue(''), + onRemoteGiveUp, + localIncarnationId, + ); + + // First handshake from remote peer + const mockInboundChannel1 = createMockChannel('remote-peer'); + const handshakeMessage1 = JSON.stringify({ + method: 'handshake', + params: { incarnationId: 'incarnation-1' }, + }); + mockInboundChannel1.msgStream.read + .mockResolvedValueOnce(new TextEncoder().encode(handshakeMessage1)) + .mockReturnValue( + new Promise(() => { + /* Never resolves */ + }), + ); + + inboundHandler?.(mockInboundChannel1); + + // Wait for first handshake to be processed + await vi.waitFor(() => { + expect(mockLogger.log).toHaveBeenCalledWith( + expect.stringContaining('first incarnation ID received'), + ); + }); + + // First incarnation should not trigger onRemoteGiveUp + 
expect(onRemoteGiveUp).not.toHaveBeenCalled(); + + // Second handshake with different incarnation (simulating peer restart) + const mockInboundChannel2 = createMockChannel('remote-peer'); + const handshakeMessage2 = JSON.stringify({ + method: 'handshake', + params: { incarnationId: 'incarnation-2' }, + }); + mockInboundChannel2.msgStream.read + .mockResolvedValueOnce(new TextEncoder().encode(handshakeMessage2)) + .mockReturnValue( + new Promise(() => { + /* Never resolves */ + }), + ); + + inboundHandler?.(mockInboundChannel2); + + // Wait for second handshake to be processed + await vi.waitFor(() => { + expect(mockLogger.log).toHaveBeenCalledWith( + expect.stringContaining('incarnation changed'), + ); + }); + + // Changed incarnation should trigger onRemoteGiveUp + expect(onRemoteGiveUp).toHaveBeenCalledWith('remote-peer'); + }); + + it('passes regular messages to remoteMessageHandler after handshake', async () => { + let inboundHandler: ((channel: MockChannel) => void) | undefined; + mockConnectionFactory.onInboundConnection.mockImplementation( + (handler: (channel: MockChannel) => void) => { + inboundHandler = handler; + }, + ); + + const remoteMessageHandler = vi.fn().mockResolvedValue(''); + const localIncarnationId = 'local-incarnation'; + await initTransport( + '0x1234', + {}, + remoteMessageHandler, + undefined, + localIncarnationId, + ); + + // Create mock inbound channel - handshake first, then regular message + const mockInboundChannel = createMockChannel('remote-peer'); + const handshakeMessage = JSON.stringify({ + method: 'handshake', + params: { incarnationId: 'remote-incarnation' }, + }); + const regularMessage = makeTestMessage('hello'); + mockInboundChannel.msgStream.read + .mockResolvedValueOnce(new TextEncoder().encode(handshakeMessage)) + .mockResolvedValueOnce(new TextEncoder().encode(regularMessage)) + .mockReturnValue( + new Promise(() => { + /* Never resolves */ + }), + ); + + inboundHandler?.(mockInboundChannel); + + // Wait for message to 
be processed + await vi.waitFor(() => { + expect(remoteMessageHandler).toHaveBeenCalledWith( + 'remote-peer', + regularMessage, + ); + }); + }); + + it('skips handshake when no incarnationId and accepts inbound messages directly', async () => { + let inboundHandler: ((channel: MockChannel) => void) | undefined; + mockConnectionFactory.onInboundConnection.mockImplementation( + (handler: (channel: MockChannel) => void) => { + inboundHandler = handler; + }, + ); + + const remoteMessageHandler = vi.fn().mockResolvedValue(''); + // No incarnationId - handshake should be skipped + await initTransport( + '0x1234', + {}, + remoteMessageHandler, + undefined, + undefined, // no incarnationId + ); + + // Create mock inbound channel with regular message directly (no handshake) + const mockInboundChannel = createMockChannel('remote-peer'); + const regularMessage = makeTestMessage('hello'); + mockInboundChannel.msgStream.read + .mockResolvedValueOnce(new TextEncoder().encode(regularMessage)) + .mockReturnValue( + new Promise(() => { + /* Never resolves */ + }), + ); + + inboundHandler?.(mockInboundChannel); + + // Wait for message to be processed directly + await vi.waitFor(() => { + expect(remoteMessageHandler).toHaveBeenCalledWith( + 'remote-peer', + regularMessage, + ); + }); + + // No handshakeAck should be sent + expect(mockInboundChannel.msgStream.write).not.toHaveBeenCalled(); }); }); }); diff --git a/packages/ocap-kernel/src/remotes/platform/transport.ts b/packages/ocap-kernel/src/remotes/platform/transport.ts index e57a062ed..1ea9d4092 100644 --- a/packages/ocap-kernel/src/remotes/platform/transport.ts +++ b/packages/ocap-kernel/src/remotes/platform/transport.ts @@ -15,6 +15,11 @@ import { DEFAULT_WRITE_TIMEOUT_MS, SCTP_USER_INITIATED_ABORT, } from './constants.ts'; +import { + performOutboundHandshake, + performInboundHandshake, +} from './handshake.ts'; +import type { HandshakeDeps } from './handshake.ts'; import { PeerStateManager } from './peer-state-manager.ts'; 
import { makeConnectionRateLimiter, @@ -51,6 +56,7 @@ import type { * @param options.maxConnectionAttemptsPerMinute - Maximum connection attempts per minute per peer (default: 10). * @param remoteMessageHandler - Handler to be called when messages are received from elsewhere. * @param onRemoteGiveUp - Optional callback to be called when we give up on a remote (after max retries or non-retryable error). + * @param localIncarnationId - This kernel's incarnation ID for handshake protocol. * * @returns a function to send messages **and** a `stop()` to cancel/release everything. */ @@ -59,6 +65,7 @@ export async function initTransport( options: RemoteCommsOptions, remoteMessageHandler: RemoteMessageHandler, onRemoteGiveUp?: OnRemoteGiveUp, + localIncarnationId?: string, ): Promise<{ sendRemoteMessage: SendRemoteMessage; stop: StopRemoteComms; @@ -93,16 +100,16 @@ export async function initTransport( maxConnectionAttemptsPerMinute, ); let cleanupIntervalId: ReturnType | undefined; - // Holder for handleConnectionLoss - initialized later after all dependencies are defined - // This breaks the circular dependency between readChannel → handleConnectionLoss → registerChannel - const reconnectionHolder: { - handleConnectionLoss: ((peerId: string) => void) | undefined; - } = { handleConnectionLoss: undefined }; + // Holder for handleConnectionLoss - assigned after reconnectionLifecycle is created. + // The inner function is only called from callbacks after initialization completes. 
+ const connectionLossHolder: { + impl: ((peerId: string) => void) | undefined; + } = { impl: undefined }; const handleConnectionLoss = (peerId: string): void => { - if (!reconnectionHolder.handleConnectionLoss) { - throw new Error('handleConnectionLoss not initialized'); + if (!connectionLossHolder.impl) { + throw new Error('handleConnectionLoss called before initialization'); } - reconnectionHolder.handleConnectionLoss(peerId); + connectionLossHolder.impl(peerId); }; const connectionFactory = await ConnectionFactory.make( keySeed, @@ -112,6 +119,68 @@ export async function initTransport( maxRetryAttempts, ); + // Create handshake dependencies (only if incarnation ID is configured) + const handshakeDeps: HandshakeDeps | undefined = localIncarnationId + ? { + localIncarnationId, + logger, + setRemoteIncarnation: (peerId: string, incarnationId: string) => + peerStateManager.setRemoteIncarnation(peerId, incarnationId), + } + : undefined; + + /** + * Perform outbound handshake and handle incarnation changes. + * Returns true if handshake succeeded (or was skipped), false if it failed. + * + * @param channel - The channel to perform handshake on. + * @returns True if handshake succeeded or was skipped. + */ + async function doOutboundHandshake(channel: Channel): Promise { + if (!handshakeDeps) { + return true; // No handshake configured, skip + } + try { + const result = await performOutboundHandshake(channel, handshakeDeps); + if (result.incarnationChanged && onRemoteGiveUp) { + logger.log( + `${channel.peerId.slice(0, 8)}:: incarnation changed during outbound handshake, triggering promise rejection`, + ); + onRemoteGiveUp(channel.peerId); + } + return true; + } catch (problem) { + outputError(channel.peerId, 'outbound handshake', problem); + return false; + } + } + + /** + * Perform inbound handshake and handle incarnation changes. + * Returns true if handshake succeeded (or was skipped), false if it failed. 
+ * + * @param channel - The channel to perform handshake on. + * @returns True if handshake succeeded or was skipped. + */ + async function doInboundHandshake(channel: Channel): Promise { + if (!handshakeDeps) { + return true; // No handshake configured, skip + } + try { + const result = await performInboundHandshake(channel, handshakeDeps); + if (result.incarnationChanged && onRemoteGiveUp) { + logger.log( + `${channel.peerId.slice(0, 8)}:: incarnation changed during inbound handshake, triggering promise rejection`, + ); + onRemoteGiveUp(channel.peerId); + } + return true; + } catch (problem) { + outputError(channel.peerId, 'inbound handshake', problem); + return false; + } + } + /** * Clean up stale peer data for peers inactive for more than stalePeerTimeoutMs. * A peer is considered stale if: @@ -134,6 +203,7 @@ export async function initTransport( /** * Register a channel for a peer, closing any previous channel. * This ensures proper cleanup of old channels to prevent leaks. + * Handshake must be completed BEFORE calling this function. * * @param peerId - The peer ID. * @param channel - The channel to register. @@ -228,6 +298,7 @@ export async function initTransport( logger.log(`${from}:: recv ${message.substring(0, 200)}`); // Pass all messages to handler (including ACK-only messages - handler handles them) + // Handshake messages are handled during connection establishment, not here. 
try { const reply = await remoteMessageHandler(from, message); // Send reply if non-empty (reply is already a serialized string from RemoteHandle) @@ -318,12 +389,12 @@ export async function initTransport( checkConnectionLimit, checkConnectionRateLimit: (peerId: string) => connectionRateLimiter.checkAndRecord(peerId, 'connectionRate'), + registerChannel, + doOutboundHandshake, closeChannel: async (channel, peerId) => connectionFactory.closeChannel(channel, peerId), - registerChannel, }); - reconnectionHolder.handleConnectionLoss = - reconnectionLifecycle.handleConnectionLoss; + connectionLossHolder.impl = reconnectionLifecycle.handleConnectionLoss; /** * Send a message string to a peer. @@ -399,6 +470,12 @@ export async function initTransport( } throw error; } + // Perform handshake before registering the channel + const handshakeOk = await doOutboundHandshake(channel); + if (!handshakeOk) { + await connectionFactory.closeChannel(channel, targetPeerId); + throw Error('Handshake failed'); + } registerChannel(targetPeerId, channel, 'reading channel to'); } } catch (problem) { @@ -442,8 +519,12 @@ export async function initTransport( }); } - // Set up inbound connection handler - connectionFactory.onInboundConnection((channel) => { + /** + * Handle inbound connection setup including handshake. + * + * @param channel - The inbound channel. 
+ */ + async function handleInboundConnection(channel: Channel): Promise { // Reject inbound connections from intentionally closed peers if (peerStateManager.isIntentionallyClosed(channel.peerId)) { logger.log( @@ -467,7 +548,24 @@ export async function initTransport( throw error; } + // Perform handshake before registering the channel + const handshakeOk = await doInboundHandshake(channel); + if (!handshakeOk) { + logger.log( + `${channel.peerId}:: rejecting inbound connection due to handshake failure`, + ); + closeRejectedChannel(channel); + return; + } + registerChannel(channel.peerId, channel, 'error in inbound channel read'); + } + + // Set up inbound connection handler + connectionFactory.onInboundConnection((channel) => { + handleInboundConnection(channel).catch((problem) => { + outputError(channel.peerId, 'inbound connection setup', problem); + }); }); // Install wake detector to reset backoff on sleep/wake diff --git a/packages/ocap-kernel/src/rpc/platform-services/initializeRemoteComms.test.ts b/packages/ocap-kernel/src/rpc/platform-services/initializeRemoteComms.test.ts index 6d718c92f..1e839ce5c 100644 --- a/packages/ocap-kernel/src/rpc/platform-services/initializeRemoteComms.test.ts +++ b/packages/ocap-kernel/src/rpc/platform-services/initializeRemoteComms.test.ts @@ -70,6 +70,15 @@ describe('initializeRemoteComms', () => { expect(is(validParams, initializeRemoteCommsSpec.params)).toBe(true); }); + it('should accept params with incarnationId', () => { + const validParams = { + keySeed: '0x1234567890abcdef', + incarnationId: 'test-incarnation-id', + }; + + expect(is(validParams, initializeRemoteCommsSpec.params)).toBe(true); + }); + it('should reject params with missing keySeed', () => { const invalidParams = { relays: ['/dns4/relay.example/tcp/443/wss/p2p/relay'], @@ -245,6 +254,7 @@ describe('initializeRemoteComms', () => { '/dns4/relay2.example/tcp/443/wss/p2p/relay2', ], }, + undefined, ); expect(result).toBeNull(); }); @@ -308,9 +318,13 @@ 
describe('initializeRemoteComms', () => { await initializeRemoteCommsHandler.implementation(hooks, params); - expect(mockInitializeRemoteComms).toHaveBeenCalledWith('0xemptyrelays', { - relays: [], - }); + expect(mockInitializeRemoteComms).toHaveBeenCalledWith( + '0xemptyrelays', + { + relays: [], + }, + undefined, + ); }); it('should handle empty string parameters', async () => { @@ -329,9 +343,13 @@ describe('initializeRemoteComms', () => { await initializeRemoteCommsHandler.implementation(hooks, params); - expect(mockInitializeRemoteComms).toHaveBeenCalledWith('', { - relays: [''], - }); + expect(mockInitializeRemoteComms).toHaveBeenCalledWith( + '', + { + relays: [''], + }, + undefined, + ); }); it('should handle unicode characters in parameters', async () => { @@ -353,6 +371,7 @@ describe('initializeRemoteComms', () => { expect(mockInitializeRemoteComms).toHaveBeenCalledWith( '🔑unicode-seed🔑', { relays: ['🌐unicode-relay🌐'] }, + undefined, ); }); @@ -409,9 +428,13 @@ describe('initializeRemoteComms', () => { initializeRemoteCommsHandler.implementation(hooks, params), ).rejects.toThrow(error); - expect(mockInitializeRemoteComms).toHaveBeenCalledWith(keySeed, { - relays: ['test-relay'], - }); + expect(mockInitializeRemoteComms).toHaveBeenCalledWith( + keySeed, + { + relays: ['test-relay'], + }, + undefined, + ); }, ); @@ -436,9 +459,13 @@ describe('initializeRemoteComms', () => { await initializeRemoteCommsHandler.implementation(hooks, params); - expect(mockInitializeRemoteComms).toHaveBeenCalledWith('0xmanyrelays', { - relays: manyRelays, - }); + expect(mockInitializeRemoteComms).toHaveBeenCalledWith( + '0xmanyrelays', + { + relays: manyRelays, + }, + undefined, + ); }); it('should handle complex initialization scenarios', async () => { @@ -501,9 +528,13 @@ describe('initializeRemoteComms', () => { await initializeRemoteCommsHandler.implementation(hooks, params); - expect(mockInitializeRemoteComms).toHaveBeenCalledWith('0xtestseed', { - maxRetryAttempts: 5, - 
}); + expect(mockInitializeRemoteComms).toHaveBeenCalledWith( + '0xtestseed', + { + maxRetryAttempts: 5, + }, + undefined, + ); }); it('should pass maxQueue to hook when provided', async () => { @@ -522,9 +553,13 @@ describe('initializeRemoteComms', () => { await initializeRemoteCommsHandler.implementation(hooks, params); - expect(mockInitializeRemoteComms).toHaveBeenCalledWith('0xtestseed', { - maxQueue: 100, - }); + expect(mockInitializeRemoteComms).toHaveBeenCalledWith( + '0xtestseed', + { + maxQueue: 100, + }, + undefined, + ); }); it('should pass all options when all are provided', async () => { @@ -545,11 +580,15 @@ describe('initializeRemoteComms', () => { await initializeRemoteCommsHandler.implementation(hooks, params); - expect(mockInitializeRemoteComms).toHaveBeenCalledWith('0xtestseed', { - relays: ['/dns4/relay.example/tcp/443/wss/p2p/relay'], - maxRetryAttempts: 5, - maxQueue: 100, - }); + expect(mockInitializeRemoteComms).toHaveBeenCalledWith( + '0xtestseed', + { + relays: ['/dns4/relay.example/tcp/443/wss/p2p/relay'], + maxRetryAttempts: 5, + maxQueue: 100, + }, + undefined, + ); }); it('should pass empty options when only keySeed is provided', async () => { @@ -567,7 +606,11 @@ describe('initializeRemoteComms', () => { await initializeRemoteCommsHandler.implementation(hooks, params); - expect(mockInitializeRemoteComms).toHaveBeenCalledWith('0xtestseed', {}); + expect(mockInitializeRemoteComms).toHaveBeenCalledWith( + '0xtestseed', + {}, + undefined, + ); }); it('should not include undefined optional params in options', async () => { @@ -637,9 +680,13 @@ describe('initializeRemoteComms', () => { await initializeRemoteCommsHandler.implementation(hooks, params); - expect(mockInitializeRemoteComms).toHaveBeenCalledWith('0xtestseed', { - maxRetryAttempts: 0, - }); + expect(mockInitializeRemoteComms).toHaveBeenCalledWith( + '0xtestseed', + { + maxRetryAttempts: 0, + }, + undefined, + ); }); it('should pass maxQueue set to zero to hook', async () => { @@ 
-658,9 +705,36 @@ describe('initializeRemoteComms', () => { await initializeRemoteCommsHandler.implementation(hooks, params); - expect(mockInitializeRemoteComms).toHaveBeenCalledWith('0xtestseed', { - maxQueue: 0, - }); + expect(mockInitializeRemoteComms).toHaveBeenCalledWith( + '0xtestseed', + { + maxQueue: 0, + }, + undefined, + ); + }); + + it('should pass incarnationId when provided', async () => { + const mockInitializeRemoteComms: InitializeRemoteComms = vi.fn( + async () => null, + ); + + const hooks = { + initializeRemoteComms: mockInitializeRemoteComms, + }; + + const params = { + keySeed: '0xtestseed', + incarnationId: 'test-incarnation-id', + }; + + await initializeRemoteCommsHandler.implementation(hooks, params); + + expect(mockInitializeRemoteComms).toHaveBeenCalledWith( + '0xtestseed', + {}, + 'test-incarnation-id', + ); }); }); }); diff --git a/packages/ocap-kernel/src/rpc/platform-services/initializeRemoteComms.ts b/packages/ocap-kernel/src/rpc/platform-services/initializeRemoteComms.ts index fc882c414..faf6e6f61 100644 --- a/packages/ocap-kernel/src/rpc/platform-services/initializeRemoteComms.ts +++ b/packages/ocap-kernel/src/rpc/platform-services/initializeRemoteComms.ts @@ -15,6 +15,7 @@ const initializeRemoteCommsParamsStruct = object({ relays: optional(array(string())), maxRetryAttempts: optional(number()), maxQueue: optional(number()), + incarnationId: optional(string()), }); type InitializeRemoteCommsParams = { @@ -22,6 +23,7 @@ type InitializeRemoteCommsParams = { relays?: string[]; maxRetryAttempts?: number; maxQueue?: number; + incarnationId?: string; }; export type InitializeRemoteCommsSpec = MethodSpec< @@ -39,6 +41,7 @@ export const initializeRemoteCommsSpec: InitializeRemoteCommsSpec = { export type InitializeRemoteComms = ( keySeed: string, options: RemoteCommsOptions, + incarnationId?: string, ) => Promise; type InitializeRemoteCommsHooks = { @@ -66,6 +69,10 @@ export const initializeRemoteCommsHandler: InitializeRemoteCommsHandler = 
{ if (params.maxQueue !== undefined) { options.maxQueue = params.maxQueue; } - return await initializeRemoteComms(params.keySeed, options); + return await initializeRemoteComms( + params.keySeed, + options, + params.incarnationId, + ); }, }; diff --git a/packages/ocap-kernel/src/store/index.test.ts b/packages/ocap-kernel/src/store/index.test.ts index 5f47cb80b..788935d44 100644 --- a/packages/ocap-kernel/src/store/index.test.ts +++ b/packages/ocap-kernel/src/store/index.test.ts @@ -87,6 +87,7 @@ describe('kernel store', () => { 'getNextRemoteId', 'getNextVatId', 'getObjectRefCount', + 'getOrCreateIncarnationId', 'getOwner', 'getPendingMessage', 'getPinnedObjects', @@ -426,4 +427,63 @@ describe('kernel store', () => { expect(ks.kv.get('customKey')).toBeUndefined(); }); }); + + describe('incarnation ID', () => { + it('generates a new incarnation ID on first call', () => { + const ks = makeKernelStore(mockKernelDatabase); + + const incarnationId = ks.getOrCreateIncarnationId(); + + expect(typeof incarnationId).toBe('string'); + expect(incarnationId).toHaveLength(36); // UUID format + expect(ks.kv.get('incarnationId')).toBe(incarnationId); + }); + + it('returns the same incarnation ID on subsequent calls', () => { + const ks = makeKernelStore(mockKernelDatabase); + + const first = ks.getOrCreateIncarnationId(); + const second = ks.getOrCreateIncarnationId(); + const third = ks.getOrCreateIncarnationId(); + + expect(first).toBe(second); + expect(second).toBe(third); + }); + + it('preserves incarnation ID across store instances', () => { + const ks1 = makeKernelStore(mockKernelDatabase); + const incarnationId = ks1.getOrCreateIncarnationId(); + + // Create a new store instance pointing to the same database + const ks2 = makeKernelStore(mockKernelDatabase); + const loaded = ks2.getOrCreateIncarnationId(); + + expect(loaded).toBe(incarnationId); + }); + + it('regenerates incarnation ID after storage reset', () => { + const ks = makeKernelStore(mockKernelDatabase); + const 
original = ks.getOrCreateIncarnationId(); + + // Reset storage (clears incarnationId since it's not in except list) + ks.reset(); + + const regenerated = ks.getOrCreateIncarnationId(); + + expect(regenerated).not.toBe(original); + expect(regenerated).toHaveLength(36); + }); + + it('preserves incarnation ID when reset with except list', () => { + const ks = makeKernelStore(mockKernelDatabase); + const original = ks.getOrCreateIncarnationId(); + + // Reset with incarnationId in except list + ks.reset({ except: ['incarnationId'] }); + + const preserved = ks.getOrCreateIncarnationId(); + + expect(preserved).toBe(original); + }); + }); }); diff --git a/packages/ocap-kernel/src/store/index.ts b/packages/ocap-kernel/src/store/index.ts index f058e9d89..f3c99e53e 100644 --- a/packages/ocap-kernel/src/store/index.ts +++ b/packages/ocap-kernel/src/store/index.ts @@ -232,6 +232,24 @@ export function makeKernelStore(kdb: KernelDatabase, logger?: Logger) { kdb.clear(); } + /** + * Get or create the kernel's incarnation ID. + * The incarnation ID is persisted so it survives kernel restarts, + * but is regenerated when storage is cleared (when state is actually lost). + * + * @returns The incarnation ID (existing or newly generated). 
+ */ + function getOrCreateIncarnationId(): string { + const existing = context.kv.get('incarnationId'); + if (existing) { + return existing; + } + // eslint-disable-next-line n/no-unsupported-features/node-builtins + const newId = globalThis.crypto.randomUUID(); + context.kv.set('incarnationId', newId); + return newId; + } + return harden({ ...id, ...queue, @@ -252,6 +270,7 @@ export function makeKernelStore(kdb: KernelDatabase, logger?: Logger) { deleteVat, clear, reset, + getOrCreateIncarnationId, kv, }); } diff --git a/packages/ocap-kernel/src/types.ts b/packages/ocap-kernel/src/types.ts index e5ad35dbe..e23f14421 100644 --- a/packages/ocap-kernel/src/types.ts +++ b/packages/ocap-kernel/src/types.ts @@ -324,6 +324,7 @@ export type PlatformServices = { * @param options.maxQueue - Maximum number of messages to queue per peer while reconnecting (default: 200). * @param remoteMessageHandler - A handler function to receive remote messages. * @param onRemoteGiveUp - Optional callback to be called when we give up on a remote. + * @param incarnationId - Unique identifier for this kernel instance. * @returns A promise that resolves once network access has been established * or rejects if there is some problem doing so. */ @@ -332,6 +333,7 @@ export type PlatformServices = { options: RemoteCommsOptions, remoteMessageHandler: RemoteMessageHandler, onRemoteGiveUp?: OnRemoteGiveUp, + incarnationId?: string, ) => Promise; /** * Stop network communications.