Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .changeset/connect-already-connected.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
"@modelcontextprotocol/core": patch
---

fix: throw error when connecting to already-connected Protocol

Protocol.connect() now throws a descriptive error if called when already
connected to a transport. This prevents silent overwrites that break
concurrent HTTP sessions.

Fixes #1405
8 changes: 8 additions & 0 deletions packages/core/src/shared/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,14 @@ export abstract class Protocol<SendRequestT extends Request, SendNotificationT e
* The Protocol object assumes ownership of the Transport, replacing any callbacks that have already been set, and expects that it is the only user of the Transport instance going forward.
*/
async connect(transport: Transport): Promise<void> {
if (this._transport) {
throw new Error(
'Protocol is already connected to a transport. ' +
'Call close() before connecting to a new transport, ' +
'or create a new Protocol instance for concurrent connections.'
);
}

this._transport = transport;
const _onclose = this.transport?.onclose;
this._transport.onclose = () => {
Expand Down
140 changes: 90 additions & 50 deletions packages/core/test/shared/protocolTransportHandling.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,29 +28,60 @@ class MockTransport implements Transport {
}
}

describe('Protocol transport handling bug', () => {
function createProtocol(): Protocol<Request, Notification, Result> {
return new (class extends Protocol<Request, Notification, Result> {
protected assertCapabilityForMethod(): void {}
protected assertNotificationCapability(): void {}
protected assertRequestHandlerCapability(): void {}
protected assertTaskCapability(): void {}
protected assertTaskHandlerCapability(): void {}
})();
}

describe('Protocol transport handling', () => {
let protocol: Protocol<Request, Notification, Result>;
let transportA: MockTransport;
let transportB: MockTransport;

beforeEach(() => {
protocol = new (class extends Protocol<Request, Notification, Result> {
protected assertCapabilityForMethod(): void {}
protected assertNotificationCapability(): void {}
protected assertRequestHandlerCapability(): void {}
protected assertTaskCapability(): void {}
protected assertTaskHandlerCapability(): void {}
})();

protocol = createProtocol();
transportA = new MockTransport('A');
transportB = new MockTransport('B');
});

test('should send response to the correct transport when multiple clients are connected', async () => {
test('should throw error when connecting to an already connected protocol', async () => {
// Connect first transport
await protocol.connect(transportA);

// Attempting to connect second transport should throw
await expect(protocol.connect(transportB)).rejects.toThrow('Protocol is already connected to a transport');
});

test('should allow reconnection after close()', async () => {
// Connect first transport
await protocol.connect(transportA);

// Close the connection
await protocol.close();

// Now connecting second transport should work
await expect(protocol.connect(transportB)).resolves.not.toThrow();
});

test('should send response to the correct transport with separate protocol instances', async () => {
// Create separate protocol instances for concurrent connections
const protocolA = createProtocol();
const protocolB = createProtocol();

// Set up a request handler that simulates processing time
let resolveHandler: (value: Result) => void;
const handlerPromise = new Promise<Result>(resolve => {
resolveHandler = resolve;
let resolveHandlerA: (value: Result) => void;
const handlerPromiseA = new Promise<Result>(resolve => {
resolveHandlerA = resolve;
});

let resolveHandlerB: (value: Result) => void;
const handlerPromiseB = new Promise<Result>(resolve => {
resolveHandlerB = resolve;
});

const TestRequestSchema = z.object({
Expand All @@ -62,13 +93,18 @@ describe('Protocol transport handling bug', () => {
.optional()
});

protocol.setRequestHandler(TestRequestSchema, async request => {
protocolA.setRequestHandler(TestRequestSchema, async request => {
console.log(`Processing request from ${request.params?.from}`);
return handlerPromise;
return handlerPromiseA;
});

protocolB.setRequestHandler(TestRequestSchema, async request => {
console.log(`Processing request from ${request.params?.from}`);
return handlerPromiseB;
});

// Client A connects and sends a request
await protocol.connect(transportA);
await protocolA.connect(transportA);

const requestFromA = {
jsonrpc: '2.0' as const,
Expand All @@ -80,9 +116,8 @@ describe('Protocol transport handling bug', () => {
// Simulate client A sending a request
transportA.onmessage?.(requestFromA);

// While A's request is being processed, client B connects
// This overwrites the transport reference in the protocol
await protocol.connect(transportB);
// Client B connects to its own protocol instance
await protocolB.connect(transportB);

const requestFromB = {
jsonrpc: '2.0' as const,
Expand All @@ -94,8 +129,9 @@ describe('Protocol transport handling bug', () => {
// Client B sends its own request
transportB.onmessage?.(requestFromB);

// Now complete A's request
resolveHandler!({ data: 'responseForA' } as Result);
// Complete both requests
resolveHandlerA!({ data: 'responseForA' } as Result);
resolveHandlerB!({ data: 'responseForB' } as Result);

// Wait for async operations to complete
await new Promise(resolve => setTimeout(resolve, 10));
Expand All @@ -104,26 +140,27 @@ describe('Protocol transport handling bug', () => {
console.log('Transport A received:', transportA.sentMessages);
console.log('Transport B received:', transportB.sentMessages);

// FIXED: Each transport now receives its own response

// Transport A should receive response for request ID 1
// Each transport receives its own response
expect(transportA.sentMessages.length).toBe(1);
expect(transportA.sentMessages[0]).toMatchObject({
jsonrpc: '2.0',
id: 1,
result: { data: 'responseForA' }
});

// Transport B should only receive its own response (when implemented)
expect(transportB.sentMessages.length).toBe(1);
expect(transportB.sentMessages[0]).toMatchObject({
jsonrpc: '2.0',
id: 2,
result: { data: 'responseForA' } // Same handler result in this test
result: { data: 'responseForB' }
});
});

test('demonstrates the timing issue with multiple rapid connections', async () => {
test('demonstrates proper handling with separate protocol instances', async () => {
// Create separate protocol instances for concurrent connections
const protocolA = createProtocol();
const protocolB = createProtocol();

const delays: number[] = [];
const results: { transport: string; response: JSONRPCMessage[] }[] = [];

Expand All @@ -137,38 +174,41 @@ describe('Protocol transport handling bug', () => {
.optional()
});

// Set up handler with variable delay
protocol.setRequestHandler(DelayedRequestSchema, async (request, extra) => {
const delay = request.params?.delay || 0;
delays.push(delay);
// Set up handlers with variable delay on both protocols
const setupHandler = (proto: Protocol<Request, Notification, Result>) => {
proto.setRequestHandler(DelayedRequestSchema, async (request, extra) => {
const delay = request.params?.delay || 0;
delays.push(delay);

await new Promise(resolve => setTimeout(resolve, delay));
await new Promise(resolve => setTimeout(resolve, delay));

return {
processedBy: `handler-${extra.requestId}`,
delay: delay
} as Result;
});
return {
processedBy: `handler-${extra.requestId}`,
delay: delay
} as Result;
});
};

// Rapid succession of connections and requests
await protocol.connect(transportA);
setupHandler(protocolA);
setupHandler(protocolB);

// Connect A and send request
await protocolA.connect(transportA);
transportA.onmessage?.({
jsonrpc: '2.0' as const,
method: 'test/delayed',
params: { delay: 50, client: 'A' },
id: 1
});

// Connect B while A is processing
setTimeout(async () => {
await protocol.connect(transportB);
transportB.onmessage?.({
jsonrpc: '2.0' as const,
method: 'test/delayed',
params: { delay: 10, client: 'B' },
id: 2
});
}, 10);
// Connect B (separate instance) while A is processing
await protocolB.connect(transportB);
transportB.onmessage?.({
jsonrpc: '2.0' as const,
method: 'test/delayed',
params: { delay: 10, client: 'B' },
id: 2
});

// Wait for all processing
await new Promise(resolve => setTimeout(resolve, 100));
Expand All @@ -183,7 +223,7 @@ describe('Protocol transport handling bug', () => {

console.log('Timing test results:', results);

// FIXED: Each transport receives its own responses
// Each transport receives its own responses
expect(transportA.sentMessages.length).toBe(1);
expect(transportB.sentMessages.length).toBe(1);
});
Expand Down
Loading