From c41c1d143988636bf27629160731ae29a8f7decf Mon Sep 17 00:00:00 2001 From: Mark Larah Date: Fri, 19 Dec 2025 16:36:23 -0600 Subject: [PATCH 1/2] Add HTTP multipart transport for GraphQL subscriptions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit adds support for HTTP multipart transport, which allows GraphQL subscriptions to work over HTTP using the multipart response format. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- docs/code_examples/http_multipart_async.py | 37 ++ docs/modules/gql.rst | 1 + docs/modules/transport_http_multipart.rst | 7 + docs/transports/async_transports.rst | 1 + docs/transports/http_multipart.rst | 159 +++++ gql/transport/http_multipart_transport.py | 323 ++++++++++ tests/conftest.py | 1 + tests/test_http_multipart_transport.py | 691 +++++++++++++++++++++ 8 files changed, 1220 insertions(+) create mode 100644 docs/code_examples/http_multipart_async.py create mode 100644 docs/modules/transport_http_multipart.rst create mode 100644 docs/transports/http_multipart.rst create mode 100644 gql/transport/http_multipart_transport.py create mode 100644 tests/test_http_multipart_transport.py diff --git a/docs/code_examples/http_multipart_async.py b/docs/code_examples/http_multipart_async.py new file mode 100644 index 00000000..d6d6e372 --- /dev/null +++ b/docs/code_examples/http_multipart_async.py @@ -0,0 +1,37 @@ +import asyncio +import logging + +from gql import Client, gql +from gql.transport.http_multipart_transport import HTTPMultipartTransport + +logging.basicConfig(level=logging.INFO) + + +async def main(): + + transport = HTTPMultipartTransport(url="https://gql-book-server.fly.dev/graphql") + + # Using `async with` on the client will start a connection on the transport + # and provide a `session` variable to execute queries on this connection + async with Client( + transport=transport, + ) as session: + + # Request subscription + subscription = gql( + """ + subscription { + book { + title + author + } + } + """ + ) + + # Subscribe and receive streaming updates + async for result in session.subscribe(subscription): + print(f"Received: {result}") + + +asyncio.run(main()) diff --git a/docs/modules/gql.rst b/docs/modules/gql.rst index 035f196f..6937286e 100644 --- a/docs/modules/gql.rst +++ b/docs/modules/gql.rst @@ -29,6 +29,7 @@ Sub-Packages transport_common_adapters_aiohttp transport_common_adapters_websockets transport_exceptions + transport_http_multipart transport_phoenix_channel_websockets transport_requests transport_httpx diff --git a/docs/modules/transport_http_multipart.rst b/docs/modules/transport_http_multipart.rst new file mode 100644 index 00000000..0e91e0af --- /dev/null +++ b/docs/modules/transport_http_multipart.rst @@ -0,0 +1,7 @@ +gql.transport.http\_multipart\_transport module +=============================================== + +.. automodule:: gql.transport.http_multipart_transport + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/transports/async_transports.rst b/docs/transports/async_transports.rst index ba5ca136..7e81fd35 100644 --- a/docs/transports/async_transports.rst +++ b/docs/transports/async_transports.rst @@ -11,6 +11,7 @@ Async transports are transports which are using an underlying async library. The aiohttp httpx_async + http_multipart websockets aiohttp_websockets phoenix diff --git a/docs/transports/http_multipart.rst b/docs/transports/http_multipart.rst new file mode 100644 index 00000000..416f82c9 --- /dev/null +++ b/docs/transports/http_multipart.rst @@ -0,0 +1,159 @@ +.. _http_multipart_transport: + +HTTPMultipartTransport +====================== + +This transport implements GraphQL subscriptions over HTTP using the `multipart subscription protocol`_ +as implemented by Apollo GraphOS Router and other compatible servers. + +This provides an HTTP-based alternative to WebSocket transports for receiving streaming +subscription updates. It's particularly useful when: + +- WebSocket connections are not available or blocked by infrastructure +- You want to use standard HTTP with existing load balancers and proxies +- The backend implements the multipart subscription protocol + +Reference: :class:`gql.transport.http_multipart_transport.HTTPMultipartTransport` + +.. note:: + + This transport is specifically designed for GraphQL subscriptions. While it can handle + queries and mutations via the ``execute()`` method, standard HTTP transports like + :ref:`AIOHTTPTransport ` are more efficient for those operations. + +.. literalinclude:: ../code_examples/http_multipart_async.py + +How It Works +------------ + +The transport sends a standard HTTP POST request with an ``Accept`` header indicating +support for multipart responses: + +.. code-block:: text + + Accept: multipart/mixed;subscriptionSpec="1.0", application/json + +The server responds with a ``multipart/mixed`` content type and streams subscription +updates as separate parts in the response body. Each part contains a JSON payload +with GraphQL execution results. + +Protocol Details +---------------- + +**Message Format** + +Each message part follows this structure: + +.. code-block:: text + + --graphql + Content-Type: application/json + + {"payload": {"data": {...}, "errors": [...]}} + +**Heartbeats** + +Servers may send empty JSON objects (``{}``) as heartbeat messages to keep the +connection alive. These are automatically filtered out by the transport. + +**Error Handling** + +The protocol distinguishes between two types of errors: + +- **GraphQL errors**: Returned within the ``payload`` property alongside data +- **Transport errors**: Returned with a top-level ``errors`` field and ``null`` payload + +**End of Stream** + +The subscription ends when the server sends the final boundary marker: + +.. code-block:: text + + --graphql-- + +Authentication +-------------- + +Authentication works the same as with :ref:`AIOHTTPTransport `. + +Using HTTP Headers +^^^^^^^^^^^^^^^^^^ + +.. code-block:: python + + transport = HTTPMultipartTransport( + url='https://SERVER_URL:SERVER_PORT/graphql', + headers={'Authorization': 'Bearer YOUR_TOKEN'} + ) + +Using HTTP Cookies +^^^^^^^^^^^^^^^^^^ + +.. code-block:: python + + transport = HTTPMultipartTransport( + url=url, + cookies={"session_id": "your_session_cookie"} + ) + +Or use a cookie jar to save and reuse cookies: + +.. code-block:: python + + import aiohttp + + jar = aiohttp.CookieJar() + transport = HTTPMultipartTransport( + url=url, + client_session_args={'cookie_jar': jar} + ) + +Configuration +------------- + +Timeout Settings +^^^^^^^^^^^^^^^^ + +Set a timeout for the HTTP request: + +.. code-block:: python + + transport = HTTPMultipartTransport( + url='https://SERVER_URL/graphql', + timeout=30 # 30 second timeout + ) + +SSL Configuration +^^^^^^^^^^^^^^^^^ + +Control SSL certificate verification: + +.. code-block:: python + + transport = HTTPMultipartTransport( + url='https://SERVER_URL/graphql', + ssl=False # Disable SSL verification (not recommended for production) + ) + +Or provide a custom SSL context: + +.. code-block:: python + + import ssl + + ssl_context = ssl.create_default_context() + ssl_context.load_cert_chain('client.crt', 'client.key') + + transport = HTTPMultipartTransport( + url='https://SERVER_URL/graphql', + ssl=ssl_context + ) + +Limitations +----------- + +- This transport requires the server to implement the multipart subscription protocol +- Long-lived connections may be terminated by intermediate proxies or load balancers +- Some server configurations may not support HTTP/1.1 chunked transfer encoding required for streaming + +.. _multipart subscription protocol: https://www.apollographql.com/docs/graphos/routing/operations/subscriptions/multipart-protocol diff --git a/gql/transport/http_multipart_transport.py b/gql/transport/http_multipart_transport.py new file mode 100644 index 00000000..2b55dc2d --- /dev/null +++ b/gql/transport/http_multipart_transport.py @@ -0,0 +1,323 @@ +""" +HTTP Multipart Transport for GraphQL Subscriptions + +This transport implements support for GraphQL subscriptions over HTTP using +the multipart subscription protocol as implemented by Apollo GraphOS Router +and other compatible servers. + +Reference: +https://www.apollographql.com/docs/graphos/routing/operations/subscriptions/multipart-protocol +""" + +import asyncio +import json +import logging +from ssl import SSLContext +from typing import Any, AsyncGenerator, Callable, Dict, Optional, Union + +import aiohttp +from aiohttp.client_reqrep import Fingerprint +from aiohttp.helpers import BasicAuth +from aiohttp.typedefs import LooseCookies, LooseHeaders +from graphql import ExecutionResult +from multidict import CIMultiDictProxy + +from gql.graphql_request import GraphQLRequest +from gql.transport.async_transport import AsyncTransport +from gql.transport.common.aiohttp_closed_event import create_aiohttp_closed_event +from gql.transport.exceptions import ( + TransportAlreadyConnected, + TransportClosed, + TransportConnectionFailed, + TransportProtocolError, + TransportServerError, +) + +log = logging.getLogger(__name__) + + +class HTTPMultipartTransport(AsyncTransport): + """ + Async Transport for GraphQL subscriptions using the multipart subscription protocol. + + This transport sends GraphQL subscription queries via HTTP POST and receives + streaming multipart/mixed responses, where each part contains a JSON payload + with GraphQL execution results. This protocol is implemented by Apollo GraphOS + Router and other compatible servers. + """ + + def __init__( + self, + url: str, + headers: Optional[LooseHeaders] = None, + cookies: Optional[LooseCookies] = None, + auth: Optional[BasicAuth] = None, + ssl: Union[SSLContext, bool, Fingerprint] = True, + timeout: Optional[int] = None, + ssl_close_timeout: Optional[Union[int, float]] = 10, + json_serialize: Callable = json.dumps, + json_deserialize: Callable = json.loads, + client_session_args: Optional[Dict[str, Any]] = None, + ) -> None: + """ + Initialize the HTTP Multipart transport. + + :param url: The GraphQL server URL (http or https) + :param headers: Dict of HTTP Headers + :param cookies: Dict of HTTP cookies + :param auth: BasicAuth object for HTTP authentication + :param ssl: SSL context or validation mode + :param timeout: Request timeout in seconds + :param ssl_close_timeout: Timeout for SSL connection close + :param json_serialize: JSON serializer function + :param json_deserialize: JSON deserializer function + :param client_session_args: Extra args for aiohttp.ClientSession + """ + self.url = url + self.headers = headers or {} + self.cookies = cookies + self.auth = auth + self.ssl = ssl + self.timeout = timeout + self.ssl_close_timeout = ssl_close_timeout + self.json_serialize = json_serialize + self.json_deserialize = json_deserialize + self.client_session_args = client_session_args or {} + + self.session: Optional[aiohttp.ClientSession] = None + self.response_headers: Optional[CIMultiDictProxy[str]] = None + + async def connect(self) -> None: + """Create an aiohttp ClientSession.""" + if self.session is not None: + raise TransportAlreadyConnected("Transport is already connected") + + client_session_args: Dict[str, Any] = { + "cookies": self.cookies, + "headers": self.headers, + "auth": self.auth, + "json_serialize": self.json_serialize, + } + + if self.timeout is not None: + client_session_args["timeout"] = aiohttp.ClientTimeout(total=self.timeout) + + client_session_args.update(self.client_session_args) + + log.debug("Connecting HTTP Multipart transport") + self.session = aiohttp.ClientSession(**client_session_args) + + async def close(self) -> None: + """Close the aiohttp session.""" + if self.session is not None: + log.debug("Closing HTTP Multipart transport") + + if ( + self.client_session_args + and self.client_session_args.get("connector_owner") is False + ): + log.debug("connector_owner is False -> not closing connector") + else: + closed_event = create_aiohttp_closed_event(self.session) + await self.session.close() + try: + await asyncio.wait_for(closed_event.wait(), self.ssl_close_timeout) + except asyncio.TimeoutError: + pass + + self.session = None + + async def subscribe( + self, + request: GraphQLRequest, + ) -> AsyncGenerator[ExecutionResult, None]: + """ + Execute a GraphQL subscription and yield results from multipart response. + + :param request: GraphQL request to execute + :yields: ExecutionResult objects as they arrive in the multipart stream + """ + if self.session is None: + raise TransportClosed("Transport is not connected") + + payload = request.payload + if log.isEnabledFor(logging.DEBUG): + log.debug(">>> %s", self.json_serialize(payload)) + + headers = { + "Content-Type": "application/json", + "Accept": ( + "multipart/mixed;boundary=graphql;" + "subscriptionSpec=1.0,application/json" + ), + } + + try: + # Make the POST request + async with self.session.post( + self.url, + json=payload, + headers=headers, + ssl=self.ssl, + ) as response: + # Save response headers + self.response_headers = response.headers + + # Check for errors + if response.status >= 400: + error_text = await response.text() + raise TransportServerError( + f"Server returned {response.status}: {error_text}", + response.status, + ) + + initial_content_type = response.headers.get("Content-Type", "") + if ( + ("multipart/mixed" not in initial_content_type) + or ("boundary=graphql" not in initial_content_type) + or ("subscriptionSpec=1.0" not in initial_content_type) + or ("application/json" not in initial_content_type) + ): + raise TransportProtocolError( + f"Unexpected content-type: {initial_content_type}. " + "Server may not support the multipart subscription protocol." + ) + + # Parse multipart response + async for result in self._parse_multipart_response(response): + yield result + + except (TransportServerError, TransportProtocolError): + # Let these exceptions propagate without wrapping + raise + except Exception as e: + raise TransportConnectionFailed(str(e)) from e + + async def _parse_multipart_response( + self, + response: aiohttp.ClientResponse, + ) -> AsyncGenerator[ExecutionResult, None]: + """ + Parse a multipart response stream and yield execution results. + + Uses aiohttp's built-in MultipartReader to handle the multipart protocol. + + :param response: The aiohttp response object + :yields: ExecutionResult objects + """ + # Use aiohttp's built-in multipart reader + reader = aiohttp.MultipartReader.from_response(response) + + # Iterate through each part in the multipart response + while True: + try: + part = await reader.next() + except Exception: + # reader.next() throws on empty parts at the end of the stream. + # (some servers may send this.) + # see: https://github.com/aio-libs/aiohttp/pull/11857 + # As an ugly workaround for now, we can check if we've reached + # EOF and assume this was the case. + if reader.at_eof(): + break + + # Otherwise, re-raise unexpected errors + raise + + if part is None: + # No more parts + break + + assert not isinstance( + part, aiohttp.MultipartReader + ), "Nested multipart parts are not supported in GraphQL subscriptions" + + result = await self._parse_multipart_part(part) + if result: + yield result + + async def _parse_multipart_part( + self, part: aiohttp.BodyPartReader + ) -> Optional[ExecutionResult]: + """ + Parse a single part from a multipart response. + + :param part: aiohttp BodyPartReader for the part + :return: ExecutionResult or None if part is empty/heartbeat + """ + # Verify the part has the correct content type + content_type = part.headers.get(aiohttp.hdrs.CONTENT_TYPE, "") + if not content_type.startswith("application/json"): + raise TransportProtocolError( + f"Unexpected part content-type: {content_type}. " + "Expected 'application/json'." + ) + + try: + # Read the part content as text + body = await part.text() + body = body.strip() + + if log.isEnabledFor(logging.DEBUG): + log.debug("<<< %s", body or "(empty body, skipping)") + + if not body: + return None + + # Parse JSON body using custom deserializer + data = self.json_deserialize(body) + + # Handle heartbeats - empty JSON objects + if not data: + log.debug("Received heartbeat, ignoring") + return None + + # The multipart subscription protocol wraps data in a "payload" property + if "payload" not in data: + log.warning("Invalid response: missing 'payload' field") + return None + + payload = data["payload"] + + # Check for transport-level errors (payload is null) + if payload is None: + # If there are errors, this is a transport-level error + errors = data.get("errors") + if errors: + error_messages = [ + error.get("message", "Unknown transport error") + for error in errors + ] + + for message in error_messages: + log.error(f"Transport error: {message}") + + raise TransportServerError("\n\n".join(error_messages)) + else: + # Null payload without errors - just skip this part + return None + + # Extract GraphQL data from payload + return ExecutionResult( + data=payload.get("data"), + errors=payload.get("errors"), + extensions=payload.get("extensions"), + ) + except json.JSONDecodeError as e: + log.warning( + f"Failed to parse JSON: {e}, body: {body[:100] if body else ''}" + ) + return None + + async def execute( + self, + request: GraphQLRequest, + ) -> ExecutionResult: + """ + :raises: NotImplementedError - This transport only supports subscriptions + """ + raise NotImplementedError( + "The HTTP multipart transport does not support queries or " + "mutations. Use HTTPTransport for queries and mutations, or use " + "subscribe() for subscriptions." + ) diff --git a/tests/conftest.py b/tests/conftest.py index cef561f7..9de910ed 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -128,6 +128,7 @@ async def ssl_aiohttp_server(): "gql.transport.appsync", "gql.transport.common.base", "gql.transport.httpx", + "gql.transport.http_multipart_transport", "gql.transport.phoenix_channel_websockets", "gql.transport.requests", "gql.transport.websockets", diff --git a/tests/test_http_multipart_transport.py b/tests/test_http_multipart_transport.py new file mode 100644 index 00000000..0784f08f --- /dev/null +++ b/tests/test_http_multipart_transport.py @@ -0,0 +1,691 @@ +import asyncio +import json +from unittest.mock import AsyncMock, patch + +import pytest +from aiohttp import web + +from gql import Client, gql +from gql.graphql_request import GraphQLRequest +from gql.transport.exceptions import ( + TransportAlreadyConnected, + TransportClosed, + TransportConnectionFailed, + TransportProtocolError, + TransportServerError, +) + +subscription_str = """ + subscription { + book { + title + author + } + } +""" + +book1 = {"title": "Book 1", "author": "Author 1"} +book2 = {"title": "Book 2", "author": "Author 2"} +book3 = {"title": "Book 3", "author": "Author 3"} + + +def create_multipart_response(books, *, separator="\r\n", include_heartbeat=False): + """Helper to create parts for a streamed response body.""" + parts = [] + + for idx, book in enumerate(books): + data = {"data": {"book": book}} + payload = {"payload": data} + + parts.append(( + f"--graphql{separator}" + f"Content-Type: application/json{separator}" + f"{separator}" + f"{json.dumps(payload)}{separator}" + )) # fmt: skip + + # Add heartbeat after first item if requested + if include_heartbeat and idx == 0: + parts.append(( + f"--graphql{separator}" + f"Content-Type: application/json{separator}" + f"{separator}" + f"{{}}{separator}" + )) # fmt: skip + + # Add end boundary + parts.append(f"--graphql--{separator}") + + return parts + + +@pytest.fixture +@pytest.mark.aiohttp +def multipart_server(aiohttp_server): + async def create_server( + parts, + *, + content_type=( + "multipart/mixed;boundary=graphql;subscriptionSpec=1.0,application/json" + ), + request_handler=lambda *args: None, + ): + async def handler(request): + request_handler(request) + response = web.StreamResponse() + response.headers["Content-Type"] = content_type + response.enable_chunked_encoding() + await response.prepare(request) + for part in parts: + await response.write(part.encode()) + await asyncio.sleep(0) # force the chunk to be written + await response.write_eof() + return response + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + return server + + return create_server + + +@pytest.mark.asyncio +async def test_http_multipart_subscription(multipart_server): + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + def assert_response_headers(request): + # Verify the Accept header follows the spec + accept_header = request.headers["accept"] + assert "multipart/mixed" in accept_header + assert "boundary=graphql" in accept_header + assert "subscriptionSpec=1.0" in accept_header + assert "application/json" in accept_header + + parts = create_multipart_response([book1, book2]) + server = await multipart_server(parts, request_handler=assert_response_headers) + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url) + + query = gql(subscription_str) + + async with Client(transport=transport) as session: + results = [] + async for result in session.subscribe(query): + results.append(result) + + # Heartbeats should be filtered out + assert len(results) == 2 + assert results[0]["book"]["title"] == "Book 1" + assert results[1]["book"]["title"] == "Book 2" + + +@pytest.mark.asyncio +async def test_http_multipart_subscription_with_heartbeat(multipart_server): + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + parts = create_multipart_response([book1, book2], include_heartbeat=True) + server = await multipart_server(parts) + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url) + + query = gql(subscription_str) + + async with Client(transport=transport) as session: + results = [] + async for result in session.subscribe(query): + results.append(result) + + # Heartbeats should be filtered out + assert len(results) == 2 + assert results[0]["book"]["title"] == "Book 1" + assert results[1]["book"]["title"] == "Book 2" + + +@pytest.mark.aiohttp +@pytest.mark.asyncio +async def test_http_multipart_unsupported_content_type(aiohttp_server): + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + # Return text/html instead of application/json + return web.Response(text="

hello

", content_type="text/html") + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + transport = HTTPMultipartTransport(url=server.make_url("/")) + + query = gql(subscription_str) + + async with Client(transport=transport) as session: + with pytest.raises(TransportProtocolError) as exc_info: + async for result in session.subscribe(query): + pass + + assert "Unexpected content-type" in str(exc_info.value) + + +@pytest.mark.aiohttp +@pytest.mark.asyncio +async def test_http_multipart_server_error(aiohttp_server): + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + return web.Response(text="Internal Server Error", status=500) + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + transport = HTTPMultipartTransport(url=server.make_url("/")) + + query = gql(subscription_str) + + async with Client(transport=transport) as session: + with pytest.raises(TransportServerError) as exc_info: + async for result in session.subscribe(query): + pass + + assert "500: Internal Server Error" in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_http_multipart_transport_level_error(multipart_server): + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + # Transport error has null payload with errors at top level + error_response = { + "payload": None, + "errors": [{"message": "Transport connection failed"}], + } + parts = [ + ( + "--graphql\r\n" + "Content-Type: application/json\r\n" + "\r\n" + f"{json.dumps(error_response)}\r\n" + ), + "--graphql--\r\n", + ] + + server = await multipart_server(parts) + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + + with pytest.raises(TransportServerError) as exc_info: + async for result in session.subscribe(query): + pass + + assert "Transport connection failed" in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_http_multipart_graphql_errors(multipart_server): + from gql.transport.exceptions import TransportQueryError + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + # GraphQL errors come inside the payload + response = { + "payload": { + "data": {"book": {**book1, "author": None}}, + "errors": [ + {"message": "could not fetch author", "path": ["book", "author"]} + ], + } + } + parts = [ + ( + f"--graphql\r\n" + f"Content-Type: application/json\r\n" + f"\r\n" + f"{json.dumps(response)}\r\n" + ), + "--graphql--\r\n", + ] + + server = await multipart_server(parts) + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + + # Client raises TransportQueryError when there are errors in the result + with pytest.raises(TransportQueryError) as exc_info: + async for result in session.subscribe(query): + pass + + # Verify error details + assert "could not fetch author" in str(exc_info.value).lower() + assert exc_info.value.data is not None + assert exc_info.value.data["book"]["author"] is None + # Verify we can still get data for the non-error fields + assert exc_info.value.data["book"]["title"] == "Book 1" + + +@pytest.mark.asyncio +async def test_http_multipart_execute_method(multipart_server): + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + parts = create_multipart_response([book1, book2]) + server = await multipart_server(parts) + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url) + + query = gql(subscription_str) + + async with Client(transport=transport) as session: + # execute() should raise NotImplementedError + with pytest.raises(NotImplementedError) as exc_info: + await session.execute(query) + + assert "does not support queries or mutations" in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_http_multipart_transport_already_connected(multipart_server): + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + parts = create_multipart_response([]) + server = await multipart_server(parts) + transport = HTTPMultipartTransport(url=server.make_url("/")) + + await transport.connect() + + with pytest.raises(TransportAlreadyConnected): + await transport.connect() + + await transport.close() + + +@pytest.mark.asyncio +async def test_http_multipart_transport_not_connected(multipart_server): + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + parts = create_multipart_response([book1]) + server = await multipart_server(parts) + transport = HTTPMultipartTransport(url=server.make_url("/")) + + query = gql(subscription_str) + request = GraphQLRequest(query) + + with pytest.raises(TransportClosed): + async for result in transport.subscribe(request): + pass + + +@pytest.mark.asyncio +async def test_http_multipart_execute_empty_response(multipart_server): + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + # Return empty multipart response (no data parts) + parts = ["--graphql--\r\n"] + server = await multipart_server(parts) + transport = HTTPMultipartTransport(url=server.make_url("/")) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + + # execute() should raise NotImplementedError + with pytest.raises(NotImplementedError) as exc_info: + await session.execute(query) + + assert "does not support queries or mutations" in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_http_multipart_newline_separator(multipart_server): + """Test that LF-only separators are rejected (spec requires CRLF).""" + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + # The GraphQL over HTTP spec requires CRLF line endings in multipart responses + # https://github.com/graphql/graphql-over-http/blob/main/rfcs/IncrementalDelivery.md + parts = create_multipart_response([book1], separator="\n") + server = await multipart_server(parts) + transport = HTTPMultipartTransport(url=server.make_url("/")) + + query = gql(subscription_str) + + async with Client(transport=transport) as session: + # Non-compliant multipart format (LF instead of CRLF) should fail + with pytest.raises(TransportConnectionFailed): + async for result in session.subscribe(query): + pass + + +@pytest.mark.asyncio +async def test_http_multipart_transport_connection_failed_error(): + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + # Use an invalid URL that will fail to connect + transport = HTTPMultipartTransport(url="http://invalid.local:-1/graphql", timeout=1) + + query = gql(subscription_str) + + async with Client(transport=transport) as session: + with pytest.raises(TransportConnectionFailed): + async for result in session.subscribe(query): + pass + + +@pytest.mark.asyncio +async def test_http_multipart_connector_owner_false(multipart_server): + """Test closing transport with connector_owner=False.""" + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + parts = create_multipart_response([book1]) + server = await multipart_server(parts) + url = server.make_url("/") + + transport = HTTPMultipartTransport( + url=url, client_session_args={"connector_owner": False} + ) + + query = gql(subscription_str) + + async with Client(transport=transport) as session: + results = [] + async for result in session.subscribe(query): + results.append(result) + + assert len(results) == 1 + + +@pytest.mark.asyncio +async def test_http_multipart_ssl_close_timeout(multipart_server): + """Test SSL close timeout during transport close.""" + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + parts = create_multipart_response([book1], separator="\n") + server = await multipart_server(parts) + url = server.make_url("/") + + transport = HTTPMultipartTransport(url=url, ssl_close_timeout=0.001) + + await transport.connect() + + # Mock the closed event to timeout + with patch( + "gql.transport.http_multipart_transport.create_aiohttp_closed_event" + ) as mock_event: + mock_wait = AsyncMock() + mock_wait.side_effect = asyncio.TimeoutError() + mock_event.return_value.wait = mock_wait + + # Should handle timeout gracefully + await transport.close() + + +@pytest.mark.asyncio +async def test_http_multipart_malformed_json(multipart_server): + """Test handling of malformed JSON in multipart response.""" + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + parts = [ + ( + "--graphql\r\n" + "Content-Type: application/json\r\n" + "\r\n" + "{invalid json }\r\n" + ), + "--graphql--\r\n", + ] + + server = await multipart_server(parts) + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + + results = [] + async for result in session.subscribe(query): + results.append(result) + + # Should skip malformed parts + assert len(results) == 0 + + +@pytest.mark.asyncio +async def test_http_multipart_payload_null_no_errors(multipart_server): + """Test handling of null payload without errors.""" + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + # Null payload but no errors + response = {"payload": None} + parts = [ + ( + "--graphql\r\n" + "Content-Type: application/json\r\n" + "\r\n" + f"{json.dumps(response)}\r\n" + ), + "--graphql--\r\n", + ] + + server = await multipart_server(parts) + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + results = [] + async for result in session.subscribe(query): + results.append(result) + + # Null payload without errors should return nothing + assert len(results) == 0 + + +@pytest.mark.asyncio +async def test_http_multipart_invalid_utf8(multipart_server): + """Test handling of invalid UTF-8 in multipart response.""" + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + parts = [ + ( + "--graphql\r\n" + "Content-Type: application/json\r\n" + "\r\n" + "\xff\xfe\r\n" # Contains invalid UTF-8 + ), + "--graphql--\r\n", + ] + + server = await multipart_server(parts) + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + results = [] + async for result in session.subscribe(query): + results.append(result) + + # Should skip invalid part + assert len(results) == 0 + + +@pytest.mark.asyncio +async def test_http_multipart_chunked_boundary_split(multipart_server): + """Test parsing when boundary is split across chunks.""" + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + parts = [ + "--gra", + ( + "phql\r\nContent-Type: application/json\r\n\r\n" + '{"payload": {"data": {"book": {"title": "Bo' + ), + 'ok 1"}}}}\r\n--graphql--\r\n', + ] + + server = await multipart_server(parts) + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + results = [] + async for result in session.subscribe(query): + results.append(result) + + assert len(results) == 1 + assert results[0]["book"]["title"] == "Book 1" + + +@pytest.mark.asyncio +async def test_http_multipart_wrong_part_content_type(multipart_server): + """Test that parts with wrong content-type raise an error.""" + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + # Part with text/html instead of application/json + parts = [ + ("--graphql\r\n" "Content-Type: text/html\r\n" "\r\n" "

hello

\r\n"), + "--graphql--\r\n", + ] + + server = await multipart_server(parts) + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + + with pytest.raises(TransportProtocolError) as exc_info: + async for result in session.subscribe(query): + pass + + assert "Unexpected part content-type" in str(exc_info.value) + assert "text/html" in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_http_multipart_response_headers(multipart_server): + """Test that response headers are captured in the transport.""" + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + parts = create_multipart_response([book1]) + server = await multipart_server(parts) + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url) + + query = gql(subscription_str) + + async with Client(transport=transport) as session: + results = [] + async for result in session.subscribe(query): + results.append(result) + + # Verify response headers are captured + assert transport.response_headers is not None + assert "Content-Type" in transport.response_headers + assert "multipart/mixed" in transport.response_headers["Content-Type"] + + +@pytest.mark.asyncio +async def test_http_multipart_empty_body(multipart_server): + """Test part with empty body after stripping.""" + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + # Part with only whitespace body + parts = [ + "--graphql\r\nContent-Type: application/json\r\n\r\n \r\n", + "--graphql--\r\n", + ] + + server = await multipart_server(parts) + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + results = [] + async for result in session.subscribe(query): + results.append(result) + assert len(results) == 0 + + +@pytest.mark.asyncio +async def test_http_multipart_missing_payload_field(multipart_server): + """Test handling of response missing required 'payload' field.""" + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + response = {"foo": "bar"} # No payload field! + parts = [ + ( + "--graphql\r\n" + "Content-Type: application/json\r\n" + "\r\n" + f"{json.dumps(response)}\r\n" + ), + "--graphql--\r\n", + ] + + server = await multipart_server(parts) + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + results = [] + async for result in session.subscribe(query): + results.append(result) + + # Should skip invalid response and return no results + assert len(results) == 0 + + +@pytest.mark.asyncio +async def test_http_multipart_with_content_length_headers(multipart_server): + """Test multipart response with Content-Length headers (like real servers send).""" + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + # Simulate real server behavior: each part has Content-Length header + book1_payload = json.dumps({"payload": {"data": {"book": book1}}}) + book2_payload = json.dumps({"payload": {"data": {"book": book2}}}) + heartbeat_payload = "{}" + + parts = [ + ( + "--graphql\r\n" + "Content-Type: application/json; charset=utf-8\r\n" + f"Content-Length: {len(heartbeat_payload)}\r\n" + "\r\n" + f"{heartbeat_payload}\r\n" + ), + ( + "--graphql\r\n" + "Content-Type: application/json; charset=utf-8\r\n" + f"Content-Length: {len(book1_payload)}\r\n" + "\r\n" + f"{book1_payload}\r\n" + ), + ( + "--graphql\r\n" + "Content-Type: application/json; charset=utf-8\r\n" + f"Content-Length: {len(book2_payload)}\r\n" + "\r\n" + f"{book2_payload}\r\n" + ), + "--graphql\r\n", # Extra empty part like real servers + "--graphql--\r\n", # Final boundary + ] + + server = await multipart_server(parts) + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + results = [] + async for result in session.subscribe(query): + results.append(result) + + # Should get 2 books (heartbeat and empty part filtered) + assert len(results) == 2 + assert results[0]["book"]["title"] == "Book 1" + assert results[1]["book"]["title"] == "Book 2" From 364f3401282644b61dd7381bb6fbd0d23c9657bf Mon Sep 17 00:00:00 2001 From: Mark Larah Date: Fri, 19 Dec 2025 18:27:12 -0600 Subject: [PATCH 2/2] Update http_multipart.rst --- docs/transports/http_multipart.rst | 29 +++-------------------------- 1 file changed, 3 insertions(+), 26 deletions(-) diff --git a/docs/transports/http_multipart.rst b/docs/transports/http_multipart.rst index 416f82c9..b55bb1b5 100644 --- a/docs/transports/http_multipart.rst +++ b/docs/transports/http_multipart.rst @@ -17,9 +17,8 @@ Reference: :class:`gql.transport.http_multipart_transport.HTTPMultipartTransport .. note:: - This transport is specifically designed for GraphQL subscriptions. While it can handle - queries and mutations via the ``execute()`` method, standard HTTP transports like - :ref:`AIOHTTPTransport ` are more efficient for those operations. + This transport is specifically designed for GraphQL subscriptions. It does not support + mutations or queries. .. literalinclude:: ../code_examples/http_multipart_async.py @@ -40,21 +39,7 @@ with GraphQL execution results. Protocol Details ---------------- -**Message Format** - -Each message part follows this structure: - -.. code-block:: text - - --graphql - Content-Type: application/json - - {"payload": {"data": {...}, "errors": [...]}} - -**Heartbeats** - -Servers may send empty JSON objects (``{}``) as heartbeat messages to keep the -connection alive. These are automatically filtered out by the transport. +See https://www.apollographql.com/docs/graphos/routing/operations/subscriptions/multipart-protocol **Error Handling** @@ -63,14 +48,6 @@ The protocol distinguishes between two types of errors: - **GraphQL errors**: Returned within the ``payload`` property alongside data - **Transport errors**: Returned with a top-level ``errors`` field and ``null`` payload -**End of Stream** - -The subscription ends when the server sends the final boundary marker: - -.. code-block:: text - - --graphql-- - Authentication --------------