diff --git a/.changeset/fix-onerror-callbacks.md b/.changeset/fix-onerror-callbacks.md new file mode 100644 index 000000000..4ca4e72e4 --- /dev/null +++ b/.changeset/fix-onerror-callbacks.md @@ -0,0 +1,5 @@ +--- +'@modelcontextprotocol/server': patch +--- + +Fix transport errors being silently swallowed by adding missing `onerror` callback invocations before all `createJsonErrorResponse` calls in `WebStandardStreamableHTTPServerTransport`. This ensures errors like parse failures, invalid headers, and session validation errors are properly reported via the `onerror` callback. diff --git a/packages/server/src/server/streamableHttp.ts b/packages/server/src/server/streamableHttp.ts index ae8bad97e..52d1c76d5 100644 --- a/packages/server/src/server/streamableHttp.ts +++ b/packages/server/src/server/streamableHttp.ts @@ -376,6 +376,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { // The client MUST include an Accept header, listing text/event-stream as a supported content type. const acceptHeader = req.headers.get('accept'); if (!acceptHeader?.includes('text/event-stream')) { + this.onerror?.(new Error('Not Acceptable: Client must accept text/event-stream')); return this.createJsonErrorResponse(406, -32_000, 'Not Acceptable: Client must accept text/event-stream'); } @@ -402,6 +403,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { // Check if there's already an active standalone SSE stream for this session if (this._streamMapping.get(this._standaloneSseStreamId) !== undefined) { // Only one GET SSE stream is allowed per session + this.onerror?.(new Error('Conflict: Only one SSE stream is allowed per session')); return this.createJsonErrorResponse(409, -32_000, 'Conflict: Only one SSE stream is allowed per session'); } @@ -453,6 +455,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { */ private async replayEvents(lastEventId: string): Promise { if (!this._eventStore) { + this.onerror?.(new Error('Event store not configured')); return this.createJsonErrorResponse(400, -32_000, 'Event store not configured'); } @@ -463,11 +466,13 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { streamId = await this._eventStore.getStreamIdForEventId(lastEventId); if (!streamId) { + this.onerror?.(new Error('Invalid event ID format')); return this.createJsonErrorResponse(400, -32_000, 'Invalid event ID format'); } // Check conflict with the SAME streamId we'll use for mapping if (this._streamMapping.get(streamId) !== undefined) { + this.onerror?.(new Error('Conflict: Stream already has an active connection')); return this.createJsonErrorResponse(409, -32_000, 'Conflict: Stream already has an active connection'); } } @@ -586,6 +591,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { const acceptHeader = req.headers.get('accept'); // The client MUST include an Accept header, listing both application/json and text/event-stream as supported content types. if (!acceptHeader?.includes('application/json') || !acceptHeader.includes('text/event-stream')) { + this.onerror?.(new Error('Not Acceptable: Client must accept both application/json and text/event-stream')); return this.createJsonErrorResponse( 406, -32_000, @@ -595,6 +601,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { const ct = req.headers.get('content-type'); if (!ct || !ct.includes('application/json')) { + this.onerror?.(new Error('Unsupported Media Type: Content-Type must be application/json')); return this.createJsonErrorResponse(415, -32_000, 'Unsupported Media Type: Content-Type must be application/json'); } @@ -608,6 +615,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { try { rawMessage = await req.json(); } catch { + this.onerror?.(new Error('Parse error: Invalid JSON')); return this.createJsonErrorResponse(400, -32_700, 'Parse error: Invalid JSON'); } } else { @@ -622,6 +630,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { ? rawMessage.map(msg => JSONRPCMessageSchema.parse(msg)) : [JSONRPCMessageSchema.parse(rawMessage)]; } catch { + this.onerror?.(new Error('Parse error: Invalid JSON-RPC message')); return this.createJsonErrorResponse(400, -32_700, 'Parse error: Invalid JSON-RPC message'); } @@ -632,9 +641,11 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { // If it's a server with session management and the session ID is already set we should reject the request // to avoid re-initialization. if (this._initialized && this.sessionId !== undefined) { + this.onerror?.(new Error('Invalid Request: Server already initialized')); return this.createJsonErrorResponse(400, -32_600, 'Invalid Request: Server already initialized'); } if (messages.length > 1) { + this.onerror?.(new Error('Invalid Request: Only one initialization request is allowed')); return this.createJsonErrorResponse(400, -32_600, 'Invalid Request: Only one initialization request is allowed'); } this.sessionId = this.sessionIdGenerator?.(); @@ -814,6 +825,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { } if (!this._initialized) { // If the server has not been initialized yet, reject all requests + this.onerror?.(new Error('Bad Request: Server not initialized')); return this.createJsonErrorResponse(400, -32_000, 'Bad Request: Server not initialized'); } @@ -821,11 +833,13 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { if (!sessionId) { // Non-initialization requests without a session ID should return 400 Bad Request + this.onerror?.(new Error('Bad Request: Mcp-Session-Id header is required')); return this.createJsonErrorResponse(400, -32_000, 'Bad Request: Mcp-Session-Id header is required'); } if (sessionId !== this.sessionId) { // Reject requests with invalid session ID with 404 Not Found + this.onerror?.(new Error('Session not found')); return this.createJsonErrorResponse(404, -32_001, 'Session not found'); } @@ -849,6 +863,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { const protocolVersion = req.headers.get('mcp-protocol-version'); if (protocolVersion !== null && !SUPPORTED_PROTOCOL_VERSIONS.includes(protocolVersion)) { + this.onerror?.(new Error(`Bad Request: Unsupported protocol version: ${protocolVersion}`)); return this.createJsonErrorResponse( 400, -32_000, diff --git a/packages/server/test/server/streamableHttp.test.ts b/packages/server/test/server/streamableHttp.test.ts index 2a08d669a..872ca9c41 100644 --- a/packages/server/test/server/streamableHttp.test.ts +++ b/packages/server/test/server/streamableHttp.test.ts @@ -768,4 +768,195 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => { await expect(transport.start()).rejects.toThrow('Transport already started'); }); }); + + describe('HTTPServerTransport - onerror callback', () => { + let transport: WebStandardStreamableHTTPServerTransport; + let mcpServer: McpServer; + let errors: Error[]; + + beforeEach(async () => { + errors = []; + mcpServer = new McpServer({ name: 'test-server', version: '1.0.0' }, { capabilities: {} }); + + transport = new WebStandardStreamableHTTPServerTransport({ + sessionIdGenerator: () => randomUUID() + }); + + transport.onerror = err => errors.push(err); + + await mcpServer.connect(transport); + }); + + afterEach(async () => { + await transport.close(); + }); + + async function initializeServer(): Promise { + const request = createRequest('POST', TEST_MESSAGES.initialize); + const response = await transport.handleRequest(request); + return response.headers.get('mcp-session-id') as string; + } + + it('should call onerror for invalid JSON', async () => { + const request = new Request('http://localhost/mcp', { + method: 'POST', + headers: { + Accept: 'application/json, text/event-stream', + 'Content-Type': 'application/json' + }, + body: 'not valid json' + }); + + const response = await transport.handleRequest(request); + + expect(response.status).toBe(400); + expect(errors.length).toBeGreaterThan(0); + const error = errors[0]; + expect(error).toBeDefined(); + expect(error?.message).toContain('Parse error'); + }); + + it('should call onerror for invalid JSON-RPC message', async () => { + const request = new Request('http://localhost/mcp', { + method: 'POST', + headers: { + Accept: 'application/json, text/event-stream', + 'Content-Type': 'application/json' + }, + body: JSON.stringify({ not: 'valid jsonrpc' }) + }); + + const response = await transport.handleRequest(request); + + expect(response.status).toBe(400); + expect(errors.length).toBeGreaterThan(0); + const error = errors[0]; + expect(error).toBeDefined(); + expect(error?.message).toContain('Parse error'); + }); + + it('should call onerror for missing Accept header on POST', async () => { + const request = createRequest('POST', TEST_MESSAGES.initialize, { accept: 'application/json' }); + + const response = await transport.handleRequest(request); + + expect(response.status).toBe(406); + expect(errors.length).toBeGreaterThan(0); + const error = errors[0]; + expect(error).toBeDefined(); + expect(error?.message).toContain('Not Acceptable'); + }); + + it('should call onerror for unsupported Content-Type', async () => { + const request = new Request('http://localhost/mcp', { + method: 'POST', + headers: { + Accept: 'application/json, text/event-stream', + 'Content-Type': 'text/plain' + }, + body: JSON.stringify(TEST_MESSAGES.initialize) + }); + + const response = await transport.handleRequest(request); + + expect(response.status).toBe(415); + expect(errors.length).toBeGreaterThan(0); + const error = errors[0]; + expect(error).toBeDefined(); + expect(error?.message).toContain('Unsupported Media Type'); + }); + + it('should call onerror for server not initialized', async () => { + const request = createRequest('POST', TEST_MESSAGES.toolsList); + + const response = await transport.handleRequest(request); + + expect(response.status).toBe(400); + expect(errors.length).toBeGreaterThan(0); + const error = errors[0]; + expect(error).toBeDefined(); + expect(error?.message).toContain('Server not initialized'); + }); + + it('should call onerror for invalid session ID', async () => { + await initializeServer(); + + const request = createRequest('POST', TEST_MESSAGES.toolsList, { sessionId: 'invalid-session-id' }); + + const response = await transport.handleRequest(request); + + expect(response.status).toBe(404); + expect(errors.length).toBeGreaterThan(0); + const error = errors[0]; + expect(error).toBeDefined(); + expect(error?.message).toContain('Session not found'); + }); + + it('should call onerror for re-initialization attempt', async () => { + await initializeServer(); + + const request = createRequest('POST', TEST_MESSAGES.initialize); + + const response = await transport.handleRequest(request); + + expect(response.status).toBe(400); + expect(errors.length).toBeGreaterThan(0); + const error = errors[0]; + expect(error).toBeDefined(); + expect(error?.message).toContain('Server already initialized'); + }); + + it('should call onerror for GET without Accept header', async () => { + const sessionId = await initializeServer(); + + const request = createRequest('GET', undefined, { sessionId, accept: 'application/json' }); + + const response = await transport.handleRequest(request); + + expect(response.status).toBe(406); + expect(errors.length).toBeGreaterThan(0); + const error = errors[0]; + expect(error).toBeDefined(); + expect(error?.message).toContain('Not Acceptable'); + }); + + it('should call onerror for concurrent SSE streams', async () => { + const sessionId = await initializeServer(); + + const request1 = createRequest('GET', undefined, { sessionId }); + await transport.handleRequest(request1); + + const request2 = createRequest('GET', undefined, { sessionId }); + const response2 = await transport.handleRequest(request2); + + expect(response2.status).toBe(409); + expect(errors.length).toBeGreaterThan(0); + const error = errors[0]; + expect(error).toBeDefined(); + expect(error?.message).toContain('Conflict'); + }); + + it('should call onerror for unsupported protocol version', async () => { + const sessionId = await initializeServer(); + + const request = new Request('http://localhost/mcp', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Accept: 'application/json, text/event-stream', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': 'unsupported-version' + }, + body: JSON.stringify(TEST_MESSAGES.toolsList) + }); + + const response = await transport.handleRequest(request); + + expect(response.status).toBe(400); + expect(errors.length).toBeGreaterThan(0); + const error = errors[0]; + expect(error).toBeDefined(); + expect(error?.message).toContain('Unsupported protocol version'); + }); + }); });