diff --git a/src/mcp/client/stdio.py b/src/mcp/client/stdio.py index 5b8209eeb..5f3f07565 100644 --- a/src/mcp/client/stdio.py +++ b/src/mcp/client/stdio.py @@ -158,7 +158,7 @@ async def stdout_reader(): session_message = SessionMessage(message) await read_stream_writer.send(session_message) - except anyio.ClosedResourceError: # pragma: lax no cover + except (anyio.ClosedResourceError, anyio.BrokenResourceError): # pragma: lax no cover await anyio.lowlevel.checkpoint() async def stdin_writer(): @@ -174,7 +174,7 @@ async def stdin_writer(): errors=server.encoding_error_handler, ) ) - except anyio.ClosedResourceError: # pragma: no cover + except (anyio.ClosedResourceError, anyio.BrokenResourceError): # pragma: no cover await anyio.lowlevel.checkpoint() async with anyio.create_task_group() as tg, process: @@ -205,6 +205,7 @@ async def stdin_writer(): except ProcessLookupError: # pragma: no cover # Process already exited, which is fine pass + await read_stream.aclose() await write_stream.aclose() await read_stream_writer.aclose() diff --git a/src/mcp/server/stdio.py b/src/mcp/server/stdio.py index 864d387bd..7b978c5fc 100644 --- a/src/mcp/server/stdio.py +++ b/src/mcp/server/stdio.py @@ -64,7 +64,7 @@ async def stdin_reader(): session_message = SessionMessage(message) await read_stream_writer.send(session_message) - except anyio.ClosedResourceError: # pragma: no cover + except (anyio.ClosedResourceError, anyio.BrokenResourceError): # pragma: no cover await anyio.lowlevel.checkpoint() async def stdout_writer(): @@ -74,7 +74,7 @@ async def stdout_writer(): json = session_message.message.model_dump_json(by_alias=True, exclude_unset=True) await stdout.write(json + "\n") await stdout.flush() - except anyio.ClosedResourceError: # pragma: no cover + except (anyio.ClosedResourceError, anyio.BrokenResourceError): # pragma: no cover await anyio.lowlevel.checkpoint() async with anyio.create_task_group() as tg: diff --git a/tests/client/test_stdio.py b/tests/client/test_stdio.py index f70c24eee..bf190f2ea 100644 --- a/tests/client/test_stdio.py +++ b/tests/client/test_stdio.py @@ -620,3 +620,50 @@ def sigterm_handler(signum, frame): f"stdio_client cleanup took {elapsed:.1f} seconds for stdin-ignoring process. " f"Expected between 2-4 seconds (2s stdin timeout + termination time)." ) + + +@pytest.mark.anyio +async def test_stdio_client_quick_exit_race_condition(): + """Test that stdio_client handles quick context exits without crashing. + + This reproduces the race condition where: + 1. Subprocess is spawned and starts outputting data + 2. User code exits the context quickly (e.g., timeout, error, disconnect) + 3. Cleanup code closes streams while background tasks are still using them + 4. Background tasks should handle closed streams gracefully (no BrokenResourceError) + + The fix ensures: + - Tasks are cancelled before streams are closed + - Tasks handle BrokenResourceError gracefully as a fallback + """ + + # Create a Python script that continuously outputs data + # This simulates a subprocess that's slow to shut down + continuous_output_script = textwrap.dedent( + """ + import sys + import time + + # Continuously output to keep stdout_reader busy + for i in range(100): + print(f'{{"jsonrpc":"2.0","id":{i},"result":{{}}}}') + sys.stdout.flush() + time.sleep(0.01) + """ + ) + + server_params = StdioServerParameters( + command=sys.executable, + args=["-c", continuous_output_script], + ) + + # This should not raise an ExceptionGroup or BrokenResourceError + # The background tasks should handle stream closure gracefully + async with stdio_client(server_params) as (_, _): + # Immediately exit - triggers cleanup while subprocess is still outputting + pass + + # If we get here without exception, the race condition is handled correctly + # The tasks either: + # 1. Were cancelled before stream closure (proper fix) + # 2. Handled BrokenResourceError gracefully (defense in depth)