diff --git a/.changeset/connect-already-connected.md b/.changeset/connect-already-connected.md new file mode 100644 index 000000000..71b5b4d90 --- /dev/null +++ b/.changeset/connect-already-connected.md @@ -0,0 +1,11 @@ +--- +"@modelcontextprotocol/core": patch +--- + +fix: throw error when connecting to already-connected Protocol + +Protocol.connect() now throws a descriptive error if called when already +connected to a transport. This prevents silent overwrites that break +concurrent HTTP sessions. + +Fixes #1405 diff --git a/packages/core/src/shared/protocol.ts b/packages/core/src/shared/protocol.ts index def841832..9329503d2 100644 --- a/packages/core/src/shared/protocol.ts +++ b/packages/core/src/shared/protocol.ts @@ -609,6 +609,14 @@ export abstract class Protocol { + if (this._transport) { + throw new Error( + 'Protocol is already connected to a transport. ' + + 'Call close() before connecting to a new transport, ' + + 'or create a new Protocol instance for concurrent connections.' + ); + } + this._transport = transport; const _onclose = this.transport?.onclose; this._transport.onclose = () => { diff --git a/packages/core/test/shared/protocolTransportHandling.test.ts b/packages/core/test/shared/protocolTransportHandling.test.ts index 0e1b9b5c9..499c49ad1 100644 --- a/packages/core/test/shared/protocolTransportHandling.test.ts +++ b/packages/core/test/shared/protocolTransportHandling.test.ts @@ -28,29 +28,60 @@ class MockTransport implements Transport { } } -describe('Protocol transport handling bug', () => { +function createProtocol(): Protocol { + return new (class extends Protocol { + protected assertCapabilityForMethod(): void {} + protected assertNotificationCapability(): void {} + protected assertRequestHandlerCapability(): void {} + protected assertTaskCapability(): void {} + protected assertTaskHandlerCapability(): void {} + })(); +} + +describe('Protocol transport handling', () => { let protocol: Protocol; let transportA: MockTransport; let transportB: MockTransport; beforeEach(() => { - protocol = new (class extends Protocol { - protected assertCapabilityForMethod(): void {} - protected assertNotificationCapability(): void {} - protected assertRequestHandlerCapability(): void {} - protected assertTaskCapability(): void {} - protected assertTaskHandlerCapability(): void {} - })(); - + protocol = createProtocol(); transportA = new MockTransport('A'); transportB = new MockTransport('B'); }); - test('should send response to the correct transport when multiple clients are connected', async () => { + test('should throw error when connecting to an already connected protocol', async () => { + // Connect first transport + await protocol.connect(transportA); + + // Attempting to connect second transport should throw + await expect(protocol.connect(transportB)).rejects.toThrow('Protocol is already connected to a transport'); + }); + + test('should allow reconnection after close()', async () => { + // Connect first transport + await protocol.connect(transportA); + + // Close the connection + await protocol.close(); + + // Now connecting second transport should work + await expect(protocol.connect(transportB)).resolves.not.toThrow(); + }); + + test('should send response to the correct transport with separate protocol instances', async () => { + // Create separate protocol instances for concurrent connections + const protocolA = createProtocol(); + const protocolB = createProtocol(); + // Set up a request handler that simulates processing time - let resolveHandler: (value: Result) => void; - const handlerPromise = new Promise(resolve => { - resolveHandler = resolve; + let resolveHandlerA: (value: Result) => void; + const handlerPromiseA = new Promise(resolve => { + resolveHandlerA = resolve; + }); + + let resolveHandlerB: (value: Result) => void; + const handlerPromiseB = new Promise(resolve => { + resolveHandlerB = resolve; }); const TestRequestSchema = z.object({ @@ -62,13 +93,18 @@ describe('Protocol transport handling bug', () => { .optional() }); - protocol.setRequestHandler(TestRequestSchema, async request => { + protocolA.setRequestHandler(TestRequestSchema, async request => { console.log(`Processing request from ${request.params?.from}`); - return handlerPromise; + return handlerPromiseA; + }); + + protocolB.setRequestHandler(TestRequestSchema, async request => { + console.log(`Processing request from ${request.params?.from}`); + return handlerPromiseB; }); // Client A connects and sends a request - await protocol.connect(transportA); + await protocolA.connect(transportA); const requestFromA = { jsonrpc: '2.0' as const, @@ -80,9 +116,8 @@ describe('Protocol transport handling bug', () => { // Simulate client A sending a request transportA.onmessage?.(requestFromA); - // While A's request is being processed, client B connects - // This overwrites the transport reference in the protocol - await protocol.connect(transportB); + // Client B connects to its own protocol instance + await protocolB.connect(transportB); const requestFromB = { jsonrpc: '2.0' as const, @@ -94,8 +129,9 @@ describe('Protocol transport handling bug', () => { // Client B sends its own request transportB.onmessage?.(requestFromB); - // Now complete A's request - resolveHandler!({ data: 'responseForA' } as Result); + // Complete both requests + resolveHandlerA!({ data: 'responseForA' } as Result); + resolveHandlerB!({ data: 'responseForB' } as Result); // Wait for async operations to complete await new Promise(resolve => setTimeout(resolve, 10)); @@ -104,9 +140,7 @@ describe('Protocol transport handling bug', () => { console.log('Transport A received:', transportA.sentMessages); console.log('Transport B received:', transportB.sentMessages); - // FIXED: Each transport now receives its own response - - // Transport A should receive response for request ID 1 + // Each transport receives its own response expect(transportA.sentMessages.length).toBe(1); expect(transportA.sentMessages[0]).toMatchObject({ jsonrpc: '2.0', @@ -114,16 +148,19 @@ describe('Protocol transport handling bug', () => { result: { data: 'responseForA' } }); - // Transport B should only receive its own response (when implemented) expect(transportB.sentMessages.length).toBe(1); expect(transportB.sentMessages[0]).toMatchObject({ jsonrpc: '2.0', id: 2, - result: { data: 'responseForA' } // Same handler result in this test + result: { data: 'responseForB' } }); }); - test('demonstrates the timing issue with multiple rapid connections', async () => { + test('demonstrates proper handling with separate protocol instances', async () => { + // Create separate protocol instances for concurrent connections + const protocolA = createProtocol(); + const protocolB = createProtocol(); + const delays: number[] = []; const results: { transport: string; response: JSONRPCMessage[] }[] = []; @@ -137,21 +174,26 @@ describe('Protocol transport handling bug', () => { .optional() }); - // Set up handler with variable delay - protocol.setRequestHandler(DelayedRequestSchema, async (request, extra) => { - const delay = request.params?.delay || 0; - delays.push(delay); + // Set up handlers with variable delay on both protocols + const setupHandler = (proto: Protocol) => { + proto.setRequestHandler(DelayedRequestSchema, async (request, extra) => { + const delay = request.params?.delay || 0; + delays.push(delay); - await new Promise(resolve => setTimeout(resolve, delay)); + await new Promise(resolve => setTimeout(resolve, delay)); - return { - processedBy: `handler-${extra.requestId}`, - delay: delay - } as Result; - }); + return { + processedBy: `handler-${extra.requestId}`, + delay: delay + } as Result; + }); + }; - // Rapid succession of connections and requests - await protocol.connect(transportA); + setupHandler(protocolA); + setupHandler(protocolB); + + // Connect A and send request + await protocolA.connect(transportA); transportA.onmessage?.({ jsonrpc: '2.0' as const, method: 'test/delayed', @@ -159,16 +201,14 @@ describe('Protocol transport handling bug', () => { id: 1 }); - // Connect B while A is processing - setTimeout(async () => { - await protocol.connect(transportB); - transportB.onmessage?.({ - jsonrpc: '2.0' as const, - method: 'test/delayed', - params: { delay: 10, client: 'B' }, - id: 2 - }); - }, 10); + // Connect B (separate instance) while A is processing + await protocolB.connect(transportB); + transportB.onmessage?.({ + jsonrpc: '2.0' as const, + method: 'test/delayed', + params: { delay: 10, client: 'B' }, + id: 2 + }); // Wait for all processing await new Promise(resolve => setTimeout(resolve, 100)); @@ -183,7 +223,7 @@ describe('Protocol transport handling bug', () => { console.log('Timing test results:', results); - // FIXED: Each transport receives its own responses + // Each transport receives its own responses expect(transportA.sentMessages.length).toBe(1); expect(transportB.sentMessages.length).toBe(1); });