From 839bb359013dd6ec5206d00cda5c84b4eca03d56 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Fri, 30 Jan 2026 18:36:43 +0300 Subject: [PATCH] fix: handle asyncio.CancelledError as retriable in topic writer and reader --- ydb/_grpc/grpcwrapper/common_utils.py | 3 + ydb/_topic_reader/topic_reader_asyncio.py | 17 +++++- .../topic_reader_asyncio_test.py | 57 +++++++++++++++++++ ydb/_topic_writer/topic_writer_asyncio.py | 15 ++++- .../topic_writer_asyncio_test.py | 47 ++++++++++++++- 5 files changed, 135 insertions(+), 4 deletions(-) diff --git a/ydb/_grpc/grpcwrapper/common_utils.py b/ydb/_grpc/grpcwrapper/common_utils.py index cf91b9c9..261a0464 100644 --- a/ydb/_grpc/grpcwrapper/common_utils.py +++ b/ydb/_grpc/grpcwrapper/common_utils.py @@ -234,6 +234,9 @@ async def get_response(): except (grpc.RpcError, grpc.aio.AioRpcError) as e: raise connection._rpc_error_handler(self._connection_state, e) + except asyncio.CancelledError: + # gRPC CancelledError - convert to YDB error for retry logic + raise issues.ConnectionLost("gRPC stream cancelled") if not is_coordination_calls: issues._process_response(grpc_message) diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index 818eb1a9..783e5636 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -208,6 +208,7 @@ class ReaderReconnector: _static_reader_reconnector_counter = AtomicCounter() _id: int + _closed: bool _settings: topic_reader.PublicReaderSettings _driver: Driver _background_tasks: Set[Task] @@ -224,6 +225,7 @@ def __init__( loop: Optional[asyncio.AbstractEventLoop] = None, ): self._id = ReaderReconnector._static_reader_reconnector_counter.inc_and_get() + self._closed = False self._settings = settings self._driver = driver self._loop = loop if loop is not None else asyncio.get_running_loop() @@ -239,6 +241,7 @@ def __init__( async def _connection_loop(self): attempt = 0 + retry_settings = self._settings._retry_settings() while True: try: logger.debug("reader %s connect attempt %s", self._id, attempt) @@ -247,9 +250,17 @@ async def _connection_loop(self): attempt = 0 self._state_changed.set() await self._stream_reader.wait_error() + except asyncio.CancelledError: + # CancelledError from close() - exit cleanly + if self._closed: + return + # gRPC wrapper converts gRPC CancelledError to ConnectionLost (retriable). + # In Python 3.11+, external task.cancel() is detected in wrapper and re-raised. + # Any CancelledError reaching here is external cancellation - propagate it. + raise except BaseException as err: logger.debug("reader %s, attempt %s connection loop error %s", self._id, attempt, err) - retry_info = check_retriable_error(err, self._settings._retry_settings(), attempt) + retry_info = check_retriable_error(err, retry_settings, attempt) if not retry_info.is_retriable: logger.debug("reader %s stop connection loop due to %s", self._id, err) self._set_first_error(err) @@ -374,6 +385,10 @@ def commit(self, batch: datatypes.ICommittable) -> datatypes.PartitionSession.Co return self._stream_reader.commit(batch) async def close(self, flush: bool): + if self._closed: + return + self._closed = True + logger.debug("reader reconnector %s close", self._id) if self._stream_reader: await self._stream_reader.close(flush) diff --git a/ydb/_topic_reader/topic_reader_asyncio_test.py b/ydb/_topic_reader/topic_reader_asyncio_test.py index 371aee50..72dd295b 100644 --- a/ydb/_topic_reader/topic_reader_asyncio_test.py +++ b/ydb/_topic_reader/topic_reader_asyncio_test.py @@ -1566,3 +1566,60 @@ async def stream_create( reader_stream_mock_with_error.wait_error.assert_any_await() reader_stream_mock_with_error.wait_messages.assert_any_await() + + async def test_reconnect_on_connection_lost(self, monkeypatch): + """Test that ConnectionLost (from gRPC CancelledError) is treated as retriable. + + This tests the fix for issue #735 - gRPC wrapper converts CancelledError to + ConnectionLost, which should cause reconnection, not permanent failure. + """ + + async def wait_error_with_connection_lost(): + raise issues.ConnectionLost("gRPC stream cancelled") + + reader_stream_mock_with_error = mock.Mock(ReaderStream) + reader_stream_mock_with_error._id = 0 + reader_stream_mock_with_error.wait_error = mock.AsyncMock(side_effect=wait_error_with_connection_lost) + reader_stream_mock_with_error.close = mock.AsyncMock() + + # First stream's wait_messages should also fail (simulating connection issue) + async def wait_messages_with_error(): + raise issues.ConnectionLost("connection lost") + + reader_stream_mock_with_error.wait_messages = mock.AsyncMock(side_effect=wait_messages_with_error) + + async def wait_forever(): + f = asyncio.Future() + await f + + reader_stream_with_messages = mock.Mock(ReaderStream) + reader_stream_with_messages._id = 1 + reader_stream_with_messages.wait_error = mock.AsyncMock(side_effect=wait_forever) + reader_stream_with_messages.wait_messages.return_value = None + reader_stream_with_messages.close = mock.AsyncMock() + + stream_index = 0 + + async def stream_create( + reader_reconnector_id: int, + driver: SupportedDriverType, + settings: PublicReaderSettings, + ): + nonlocal stream_index + stream_index += 1 + if stream_index == 1: + return reader_stream_mock_with_error + elif stream_index == 2: + return reader_stream_with_messages + else: + raise Exception("unexpected create stream") + + with mock.patch.object(ReaderStream, "create", stream_create): + reconnector = ReaderReconnector(mock.Mock(), PublicReaderSettings("", "")) + # This would hang/fail before the fix because gRPC errors weren't retriable + await asyncio.wait_for(reconnector.wait_message(), timeout=5) + await reconnector.close(flush=False) + + # Verify that reconnection happened (ConnectionLost from wait_error triggered reconnect) + reader_stream_mock_with_error.wait_error.assert_any_await() + assert stream_index == 2, "Should have created second stream after ConnectionLost" diff --git a/ydb/_topic_writer/topic_writer_asyncio.py b/ydb/_topic_writer/topic_writer_asyncio.py index 1c0e410b..61f56525 100644 --- a/ydb/_topic_writer/topic_writer_asyncio.py +++ b/ydb/_topic_writer/topic_writer_asyncio.py @@ -435,9 +435,9 @@ def _check_stop(self): async def _connection_loop(self): retry_settings = RetrySettings(retry_cancelled=True) # todo + attempt = 0 while True: - attempt = 0 # todo calc and reset tasks = [] # noinspection PyBroadException @@ -456,6 +456,7 @@ async def _connection_loop(self): self._id, stream_writer._id, ) + attempt = 0 # Reset after successful connect try: if self._init_info is None: self._last_known_seq_no = stream_writer.last_seqno @@ -495,8 +496,18 @@ async def _connection_loop(self): err_info.sleep_timeout_seconds, ) await asyncio.sleep(err_info.sleep_timeout_seconds) + attempt += 1 - except (asyncio.CancelledError, Exception) as err: + except asyncio.CancelledError: + # CancelledError from close() - exit cleanly + if self._closed: + return + # gRPC wrapper converts gRPC CancelledError to ConnectionLost (retriable). + # In Python 3.11+, external task.cancel() is detected in wrapper and re-raised. + # Any CancelledError reaching here is external cancellation - propagate it. + raise + + except Exception as err: self._stop(err) return finally: diff --git a/ydb/_topic_writer/topic_writer_asyncio_test.py b/ydb/_topic_writer/topic_writer_asyncio_test.py index a616b0b6..3969a615 100644 --- a/ydb/_topic_writer/topic_writer_asyncio_test.py +++ b/ydb/_topic_writer/topic_writer_asyncio_test.py @@ -320,7 +320,7 @@ async def receive(self) -> StreamWriteMessage.WriteResponse: raise Exception("read from closed StreamWriterMock") item = await self.from_server.get() - if isinstance(item, Exception): + if isinstance(item, BaseException): raise item return item @@ -457,6 +457,51 @@ async def test_reconnect_and_resent_non_acked_messages_on_retriable_error( second_writer.from_server.put_nowait(self.make_default_ack_message(seq_no=2)) await reconnector.close(flush=True) + async def test_reconnect_on_connection_lost( + self, + reconnector: WriterAsyncIOReconnector, + get_stream_writer, + ): + """Test that ConnectionLost (from gRPC CancelledError) is treated as retriable.""" + now = datetime.datetime.now(datetime.timezone.utc) + data = "123".encode() + + message1 = PublicMessage( + data=data, + seqno=1, + created_at=now, + ) + message2 = PublicMessage( + data=data, + seqno=2, + created_at=now, + ) + await reconnector.write_with_ack_future([message1, message2]) + + # sent to first stream + stream_writer = get_stream_writer() + + messages = await stream_writer.from_client.get() + assert [InternalMessage(message1)] == messages + messages = await stream_writer.from_client.get() + assert [InternalMessage(message2)] == messages + + # ack first message + stream_writer.from_server.put_nowait(self.make_default_ack_message(seq_no=1)) + + # simulate gRPC connection lost (gRPC wrapper converts CancelledError to ConnectionLost) + stream_writer.from_server.put_nowait(issues.ConnectionLost("gRPC stream cancelled")) + + # writer should reconnect and resend non-acked message + second_writer = get_stream_writer() + second_sent_msg = await asyncio.wait_for(second_writer.from_client.get(), timeout=5) + + expected_messages = [InternalMessage(message2)] + assert second_sent_msg == expected_messages + + second_writer.from_server.put_nowait(self.make_default_ack_message(seq_no=2)) + await reconnector.close(flush=True) + async def test_stop_on_unexpected_exception(self, reconnector: WriterAsyncIOReconnector, get_stream_writer): class TestException(Exception): pass