diff --git a/examples/coordination/example.py b/examples/coordination/example.py index 0b817ff3..42f89fe6 100644 --- a/examples/coordination/example.py +++ b/examples/coordination/example.py @@ -43,7 +43,7 @@ def run(endpoint, database): threads = [] for i in range(4): - worker_name = f"worker {i+1}" + worker_name = f"worker {i + 1}" if i < 2: thread = threading.Thread(target=linear_workload, args=(driver.coordination_client, worker_name)) else: diff --git a/examples/topic/writer_example.py b/examples/topic/writer_example.py index b122e78c..99346a27 100644 --- a/examples/topic/writer_example.py +++ b/examples/topic/writer_example.py @@ -1,7 +1,7 @@ import concurrent.futures import datetime from typing import Dict, List -from concurrent.futures import Future, wait +from concurrent.futures import Future, wait # noqa: F401 import ydb from ydb import TopicWriterMessage diff --git a/test-requirements.txt b/test-requirements.txt index 2711066d..a5b65963 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -38,7 +38,7 @@ zipp==3.19.1 aiohttp>=3.9.0 pytest-pep8 pytest-flake8 -flake8==3.9.2 +flake8==6.1.0 sqlalchemy==1.4.26 pylint-protobuf cython diff --git a/tests/iam/test_auth.py b/tests/iam/test_auth.py index 54117b53..09a1c03a 100644 --- a/tests/iam/test_auth.py +++ b/tests/iam/test_auth.py @@ -14,7 +14,7 @@ def __init__(self, account_id, key_id, private_key, iam_endpoint=None, iam_chann self.iam_channel_credentials = iam_channel_credentials def __eq__(self, other): - return self.__dict__ == other.__dict__ if type(self) == type(other) else False + return self.__dict__ == other.__dict__ if type(self) is type(other) else False @patch("builtins.open", new_callable=mock_open, read_data=CONTENT1) diff --git a/tests/scheme/scheme_test.py b/tests/scheme/scheme_test.py index 4f93c8ea..1a105620 100644 --- a/tests/scheme/scheme_test.py +++ b/tests/scheme/scheme_test.py @@ -1,4 +1,4 @@ -import typing +import typing # noqa: F401 import pytest import ydb diff --git a/tests/test_errors.py b/tests/test_errors.py index 93f6f4a3..bdecebef 100644 --- a/tests/test_errors.py +++ b/tests/test_errors.py @@ -11,7 +11,7 @@ def test_scheme_error(driver_sync, database): server_code = ydb.issues.StatusCode.SCHEME_ERROR - assert type(exc.value) == ydb.issues.SchemeError + assert type(exc.value) is ydb.issues.SchemeError assert exc.value.status == server_code assert f"server_code: {server_code}" in str(exc.value) assert "Path does not exist" in str(exc.value) diff --git a/tests/topics/test_topic_writer.py b/tests/topics/test_topic_writer.py index aed552ab..df729e96 100644 --- a/tests/topics/test_topic_writer.py +++ b/tests/topics/test_topic_writer.py @@ -1,7 +1,7 @@ from __future__ import annotations import asyncio -from typing import List +from typing import List # noqa: F401 import pytest diff --git a/ydb/_topic_common/common.py b/ydb/_topic_common/common.py index 68ddcf8d..678f5ff6 100644 --- a/ydb/_topic_common/common.py +++ b/ydb/_topic_common/common.py @@ -36,8 +36,6 @@ def wrapper(rpc_state, response_pb, driver=None): def _get_shared_event_loop() -> asyncio.AbstractEventLoop: - global _shared_event_loop - if _shared_event_loop is not None: return _shared_event_loop @@ -45,12 +43,19 @@ def _get_shared_event_loop() -> asyncio.AbstractEventLoop: if _shared_event_loop is not None: return _shared_event_loop - event_loop_set_done: concurrent.futures.Future[asyncio.AbstractEventLoop] = concurrent.futures.Future() + loop_ready: threading.Event = threading.Event() def start_event_loop(): event_loop = asyncio.new_event_loop() - event_loop_set_done.set_result(event_loop) asyncio.set_event_loop(event_loop) + + def on_loop_started(): + # Set global only when loop is actually running + global _shared_event_loop + _shared_event_loop = event_loop + loop_ready.set() + + event_loop.call_soon(on_loop_started) event_loop.run_forever() t = threading.Thread( @@ -60,7 +65,11 @@ def start_event_loop(): ) t.start() - _shared_event_loop = event_loop_set_done.result() + loop_ready.wait() + + if _shared_event_loop is None: + raise RuntimeError("Event loop was not properly initialized") + return _shared_event_loop diff --git a/ydb/retries.py b/ydb/retries.py index 0b3bec52..c151e3d2 100644 --- a/ydb/retries.py +++ b/ydb/retries.py @@ -77,7 +77,7 @@ def __init__(self, timeout: float) -> None: def __eq__(self, other: object) -> bool: return ( - type(self) == type(other) and isinstance(other, YdbRetryOperationSleepOpt) and self.timeout == other.timeout + type(self) is type(other) and isinstance(other, YdbRetryOperationSleepOpt) and self.timeout == other.timeout ) def __repr__(self) -> str: @@ -91,7 +91,7 @@ def __init__(self, result: Any) -> None: def __eq__(self, other: object) -> bool: return ( - type(self) == type(other) + type(self) is type(other) and isinstance(other, YdbRetryOperationFinalResult) and self.result == other.result and self.exc == other.exc diff --git a/ydb/table_test.py b/ydb/table_test.py index d5d86e05..b365fda1 100644 --- a/ydb/table_test.py +++ b/ydb/table_test.py @@ -14,7 +14,7 @@ def test_retry_operation_impl(monkeypatch): monkeypatch.setattr( issues.Error, "__eq__", - lambda self, other: type(self) == type(other) and self.message == other.message, + lambda self, other: type(self) is type(other) and self.message == other.message, ) retry_once_settings = RetrySettings( @@ -43,7 +43,7 @@ def __init__(self, message): self.message = message def __eq__(self, other): - return type(self) == type(other) and self.message == other.message + return type(self) is type(other) and self.message == other.message def check_unretriable_error(err_type, call_ydb_handler): retry_once_settings.on_ydb_error_callback.reset_mock()