From 37768b07818f8e5e698c351e63a7696f7f417e0b Mon Sep 17 00:00:00 2001 From: Michael Welborn Date: Mon, 13 Jan 2025 16:30:04 -0600 Subject: [PATCH 01/13] Add polling classes for common auto review and downstream patterns --- indico_toolkit/polling/__init__.py | 8 ++ indico_toolkit/polling/autoreview.py | 202 +++++++++++++++++++++++++++ indico_toolkit/polling/downstream.py | 143 +++++++++++++++++++ indico_toolkit/polling/queries.py | 65 +++++++++ 4 files changed, 418 insertions(+) create mode 100644 indico_toolkit/polling/__init__.py create mode 100644 indico_toolkit/polling/autoreview.py create mode 100644 indico_toolkit/polling/downstream.py create mode 100644 indico_toolkit/polling/queries.py diff --git a/indico_toolkit/polling/__init__.py b/indico_toolkit/polling/__init__.py new file mode 100644 index 0000000..10201a4 --- /dev/null +++ b/indico_toolkit/polling/__init__.py @@ -0,0 +1,8 @@ +from .autoreview import AutoReviewed, AutoReviewPoller +from .downstream import DownstreamPoller + +__all__ = ( + "AutoReviewed", + "AutoReviewPoller", + "DownstreamPoller", +) diff --git a/indico_toolkit/polling/autoreview.py b/indico_toolkit/polling/autoreview.py new file mode 100644 index 0000000..b81ac61 --- /dev/null +++ b/indico_toolkit/polling/autoreview.py @@ -0,0 +1,202 @@ +import asyncio +import logging +from dataclasses import dataclass +from typing import TYPE_CHECKING + +from indico import AsyncIndicoClient, IndicoConfig # type: ignore[import-untyped] +from indico.errors import IndicoError # type: ignore[import-untyped] +from indico.queries import ( # type: ignore[import-untyped] + GetSubmission, + JobStatus, + RetrieveStorageObject, + SubmitReview, +) + +from .. import etloutput, results +from ..etloutput import EtlOutput +from ..results import Document, Result +from ..retry import retry +from .queries import SubmissionIdsPendingAutoReview + +if TYPE_CHECKING: + from collections.abc import Awaitable, Callable, Mapping + from typing import Any, TypeAlias + + SubmissionId: TypeAlias = int + Worker: TypeAlias = asyncio.Task[None] + WorkerQueue: TypeAlias = asyncio.Queue[tuple[SubmissionId, Worker]] + +logger = logging.getLogger(__name__) + + +@dataclass +class AutoReviewed: + changes: "dict[str, Any] | list[dict[str, Any]]" + stp: bool = False + + +class AutoReviewPoller: + """ + Polls for submissions requiring auto review, processes them concurrently, + and submits the review results. + """ + + def __init__( + self, + config: IndicoConfig, + workflow_id: int, + auto_review: "Callable[[Result, Mapping[Document, EtlOutput]], Awaitable[AutoReviewed]]", # noqa: E501 + *, + concurrency: int = 8, + enqueue_delay: float = 1, + poll_delay: float = 30, + load_etl_output: bool = True, + load_text: bool = True, + load_tokens: bool = True, + load_tables: bool = False, + retry_count: int = 4, + retry_wait: float = 1, + retry_backoff: float = 4, + retry_jitter: float = 0.5, + ): + self._config = config + self._workflow_id = workflow_id + self._concurrency = concurrency + self._auto_review = auto_review + self._enqueue_delay = enqueue_delay + self._poll_delay = poll_delay + self._load_etl_output = load_etl_output + self._load_text = load_text + self._load_tokens = load_tokens + self._load_tables = load_tables + + self._retry = retry( + IndicoError, + retry_count, + retry_wait, + retry_backoff, + retry_jitter, + ) + self._worker_slots = asyncio.Semaphore(concurrency) + self._worker_queue: WorkerQueue = asyncio.Queue(1) + self._processing_submission_ids: set[SubmissionId] = set() + + async def run(self) -> None: + logger.info( + "Starting auto review poller for: " + f"host={self._config.host} " + f"workflow_id={self._workflow_id}" + f"concurrency={self._concurrency}" + ) + + async with AsyncIndicoClient(self._config) as client: + self._client_call = self._retry(client.call) + await asyncio.gather( + self._spawn_workers(), + *(self._reap_workers() for _ in range(self._concurrency)), + ) + + async def _retrieve_storage_object(self, url: str) -> object: + return await self._client_call(RetrieveStorageObject(url)) + + async def _spawn_workers(self) -> None: + """ + Poll for submissions pending auto review and spawn workers to process them. + `self._worker_slots` limits the number of workers that can run concurrently. + Submission IDs in progress are tracked with `self._processing_submission_ids`. + """ + logger.info( + f"Polling submissions pending auto review every {self._poll_delay} seconds" + ) + + while True: + try: + submission_ids: set[int] = await self._client_call( + SubmissionIdsPendingAutoReview(self._workflow_id) + ) + except Exception: + logger.exception("Error occurred while polling submissions") + await asyncio.sleep(self._poll_delay) + continue + + submission_ids -= self._processing_submission_ids + + if not submission_ids: + await asyncio.sleep(self._poll_delay) + continue + + for submission_id in submission_ids: + await self._worker_slots.acquire() + logger.info(f"Spawning worker for {submission_id=}") + self._processing_submission_ids.add(submission_id) + worker = asyncio.create_task(self._worker(submission_id)) + await self._worker_queue.put((submission_id, worker)) + await asyncio.sleep(self._enqueue_delay) + + async def _worker(self, submission_id: SubmissionId) -> None: + """ + Process a single submission by retrieving submission metadata, the result file, + etl output, calling `self._auto_review`, and submitting changes. + """ + logger.info(f"Retrieving metadata for {submission_id=}") + submission = await self._client_call(GetSubmission(submission_id)) + + logger.info(f"Retrieving results for {submission_id=}") + result = await results.load_async( + submission.result_file, + reader=self._retrieve_storage_object, + ) + + if self._load_etl_output: + logger.info(f"Retrieving etl output for {submission_id=}") + etl_outputs = { + document: await etloutput.load_async( + document.etl_output_url, + reader=self._retrieve_storage_object, + text=self._load_text, + tokens=self._load_tokens, + tables=self._load_tables, + ) + for document in result.documents + } + else: + logger.info(f"Skipping etl output for {submission_id=}") + etl_outputs = {} + + logger.info(f"Applying auto review for {submission_id=}") + auto_reviewed = await self._auto_review(result, etl_outputs) + + logger.info(f"Submitting auto review for {submission_id=}") + job = await self._client_call( + SubmitReview( + submission_id, + changes=auto_reviewed.changes, + force_complete=auto_reviewed.stp, + ) + ) + job = await self._client_call(JobStatus(job.id)) + + if job.status == "SUCCESS": + logger.info(f"Completed auto review of {submission_id=}") + else: + logger.error( + f"Submit failed for {submission_id=}: " + f"{job.status=!r} {job.result=!r}" + ) + + async def _reap_workers(self) -> None: + """ + Reap completed workers, releasing their slots for new tasks. Log errors for + submissions that failed to process. Remove their submission IDs from + `self._processing_submission_ids` to be retried. + """ + while True: + submission_id, worker = await self._worker_queue.get() + + try: + await worker + except Exception: + logger.exception(f"Error occurred while processing {submission_id=}") + + self._processing_submission_ids.remove(submission_id) + self._worker_slots.release() diff --git a/indico_toolkit/polling/downstream.py b/indico_toolkit/polling/downstream.py new file mode 100644 index 0000000..9358dcd --- /dev/null +++ b/indico_toolkit/polling/downstream.py @@ -0,0 +1,143 @@ +import asyncio +import logging +from typing import TYPE_CHECKING + +from indico import AsyncIndicoClient, IndicoConfig # type: ignore[import-untyped] +from indico.errors import IndicoError # type: ignore[import-untyped] +from indico.queries import ( # type: ignore[import-untyped] + GetSubmission, + UpdateSubmission, +) +from indico.types import Submission # type: ignore[import-untyped] + +from ..retry import retry +from .queries import SubmissionIdsPendingDownstream + +if TYPE_CHECKING: + from collections.abc import Awaitable, Callable + from typing import TypeAlias + + SubmissionId: TypeAlias = int + Worker: TypeAlias = asyncio.Task[None] + WorkerQueue: TypeAlias = asyncio.Queue[tuple[SubmissionId, Worker]] + +logger = logging.getLogger(__name__) + + +class DownstreamPoller: + """ + Polls for submissions pending downstream egestion and processes them concurrently. + """ + + def __init__( + self, + config: IndicoConfig, + workflow_id: int, + downstream: "Callable[[Submission], Awaitable[None]]", + *, + concurrency: int = 8, + enqueue_delay: float = 1, + poll_delay: float = 30, + retry_count: int = 4, + retry_wait: float = 1, + retry_backoff: float = 4, + retry_jitter: float = 0.5, + ): + self._config = config + self._workflow_id = workflow_id + self._concurrency = concurrency + self._downstream = downstream + self._enqueue_delay = enqueue_delay + self._poll_delay = poll_delay + + self._retry = retry( + IndicoError, + retry_count, + retry_wait, + retry_backoff, + retry_jitter, + ) + self._worker_slots = asyncio.Semaphore(concurrency) + self._worker_queue: WorkerQueue = asyncio.Queue(1) + self._processing_submission_ids: set[SubmissionId] = set() + + async def run(self) -> None: + logger.info( + "Starting downstream poller for: " + f"host={self._config.host} " + f"workflow_id={self._workflow_id}" + f"concurrency={self._concurrency}" + ) + + async with AsyncIndicoClient(self._config) as client: + self._client_call = self._retry(client.call) + await asyncio.gather( + self._spawn_workers(), + *(self._reap_workers() for _ in range(self._concurrency)), + ) + + async def _spawn_workers(self) -> None: + """ + Poll for completed and failed submissions and spawn workers to send them + downstream. `self._worker_slots` limits the number of workers that can run + concurrently. Submission IDs in progress are tracked with + `self._processing_submission_ids`. + """ + logger.info( + f"Polling submissions pending downstream every {self._poll_delay} seconds" + ) + + while True: + try: + submission_ids: set[int] = await self._client_call( + SubmissionIdsPendingDownstream(self._workflow_id) + ) + except Exception: + logger.exception("Error occurred while polling submissions") + await asyncio.sleep(self._poll_delay) + continue + + submission_ids -= self._processing_submission_ids + + if not submission_ids: + await asyncio.sleep(self._poll_delay) + continue + + for submission_id in submission_ids: + await self._worker_slots.acquire() + logger.info(f"Spawning worker for {submission_id=}") + self._processing_submission_ids.add(submission_id) + worker = asyncio.create_task(self._worker(submission_id)) + await self._worker_queue.put((submission_id, worker)) + await asyncio.sleep(self._enqueue_delay) + + async def _worker(self, submission_id: SubmissionId) -> None: + """ + Process a single submission by retrieving submission metadata and calling + `self._downstream`. Once completed, mark the submission retrieved. + """ + logger.info(f"Retrieving metadata for {submission_id=}") + submission = await self._client_call(GetSubmission(submission_id)) + + logger.info(f"Sending {submission_id=} downstream") + await self._downstream(submission) + + logger.info(f"Marking {submission_id=} retrieved") + await self._client_call(UpdateSubmission(submission_id, retrieved=True)) + + async def _reap_workers(self) -> None: + """ + Reap completed workers, releasing their slots for new tasks. Log errors for + submissions that failed to process. Remove their submission IDs from + `self._processing_submission_ids` to be retried. + """ + while True: + submission_id, worker = await self._worker_queue.get() + + try: + await worker + except Exception: + logger.exception(f"Error occurred while processing {submission_id=}") + + self._processing_submission_ids.remove(submission_id) + self._worker_slots.release() diff --git a/indico_toolkit/polling/queries.py b/indico_toolkit/polling/queries.py new file mode 100644 index 0000000..fe0b4fc --- /dev/null +++ b/indico_toolkit/polling/queries.py @@ -0,0 +1,65 @@ +from typing import Any + +from indico.queries import GraphQLRequest # type: ignore[import-untyped] + + +class SubmissionIdsPendingAutoReview(GraphQLRequest): # type: ignore[misc] + QUERY = """ + query SubmissionIdsPendingAutoReview($workflowIds: [Int]) { + submissions( + desc: false + filters: { status: PENDING_AUTO_REVIEW } + limit: 1000 + orderBy: ID + workflowIds: $workflowIds + ) { + submissions { + id + } + } + } + """ + + def __init__(self, workflow_id: int): + super().__init__(self.QUERY, {"workflowIds": [workflow_id]}) + + def process_response(self, response: Any) -> set[int]: + response = super().process_response(response) + return { + submission["id"] for submission in response["submissions"]["submissions"] + } + + +class SubmissionIdsPendingDownstream(GraphQLRequest): # type: ignore[misc] + QUERY = """ + query SubmissionIdsPendingDownstream($workflowIds: [Int]) { + submissions( + desc: false + filters: { + AND: { + retrieved: false + OR: [ + { status: COMPLETE } + { status: FAILED } + ] + } + } + limit: 1000 + orderBy: ID + workflowIds: $workflowIds + ) { + submissions { + id + } + } + } + """ + + def __init__(self, workflow_id: int): + super().__init__(self.QUERY, {"workflowIds": [workflow_id]}) + + def process_response(self, response: Any) -> set[int]: + response = super().process_response(response) + return { + submission["id"] for submission in response["submissions"]["submissions"] + } From 30c1990a843f6a777713c112dfd3127b668be22d Mon Sep 17 00:00:00 2001 From: Michael Welborn Date: Mon, 13 Jan 2025 16:41:25 -0600 Subject: [PATCH 02/13] Bump required `indico-client` version to include `AsyncIndicoClient` --- pyproject.toml | 2 +- requirements.txt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 2919a7a..cbb7c57 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,7 +13,7 @@ home-page = "https://github.com/IndicoDataSolutions/Indico-Solutions-Toolkit" classifiers = [ "License :: OSI Approved :: MIT License",] description-file = "README.md" requires = [ - "indico-client>=5.1.4", + "indico-client>=6.1.0", "plotly==5.2.1", "tqdm==4.50.0", "faker==13.3.3", diff --git a/requirements.txt b/requirements.txt index 0e8b39a..f2cb2f0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -indico-client>=5.1.4 +indico-client>=6.1.0 python-dateutil==2.8.1 pytz==2021.1 pytest==8.3.4 From ff889e75bfdd4c6f0aab1eaf2a07e4fca3507d90 Mon Sep 17 00:00:00 2001 From: Michael Welborn Date: Tue, 14 Jan 2025 08:25:52 -0600 Subject: [PATCH 03/13] Change `auto_review` callable type signature to be more flexible --- indico_toolkit/polling/autoreview.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/indico_toolkit/polling/autoreview.py b/indico_toolkit/polling/autoreview.py index b81ac61..6e08f1f 100644 --- a/indico_toolkit/polling/autoreview.py +++ b/indico_toolkit/polling/autoreview.py @@ -19,7 +19,7 @@ from .queries import SubmissionIdsPendingAutoReview if TYPE_CHECKING: - from collections.abc import Awaitable, Callable, Mapping + from collections.abc import Awaitable, Callable from typing import Any, TypeAlias SubmissionId: TypeAlias = int @@ -45,7 +45,7 @@ def __init__( self, config: IndicoConfig, workflow_id: int, - auto_review: "Callable[[Result, Mapping[Document, EtlOutput]], Awaitable[AutoReviewed]]", # noqa: E501 + auto_review: "Callable[[Result, dict[Document, EtlOutput]], Awaitable[AutoReviewed]]", # noqa: E501 *, concurrency: int = 8, enqueue_delay: float = 1, From 03376075780863f282cdae452ef38cf8745a51fa Mon Sep 17 00:00:00 2001 From: Michael Welborn Date: Tue, 14 Jan 2025 08:26:28 -0600 Subject: [PATCH 04/13] Rename `run()` to `poll_forever()` --- indico_toolkit/polling/autoreview.py | 4 ++-- indico_toolkit/polling/downstream.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/indico_toolkit/polling/autoreview.py b/indico_toolkit/polling/autoreview.py index 6e08f1f..64464f8 100644 --- a/indico_toolkit/polling/autoreview.py +++ b/indico_toolkit/polling/autoreview.py @@ -20,7 +20,7 @@ if TYPE_CHECKING: from collections.abc import Awaitable, Callable - from typing import Any, TypeAlias + from typing import Any, NoReturn, TypeAlias SubmissionId: TypeAlias = int Worker: TypeAlias = asyncio.Task[None] @@ -81,7 +81,7 @@ def __init__( self._worker_queue: WorkerQueue = asyncio.Queue(1) self._processing_submission_ids: set[SubmissionId] = set() - async def run(self) -> None: + async def poll_forever(self) -> "NoReturn": # type: ignore[misc] logger.info( "Starting auto review poller for: " f"host={self._config.host} " diff --git a/indico_toolkit/polling/downstream.py b/indico_toolkit/polling/downstream.py index 9358dcd..4a1fbc8 100644 --- a/indico_toolkit/polling/downstream.py +++ b/indico_toolkit/polling/downstream.py @@ -15,7 +15,7 @@ if TYPE_CHECKING: from collections.abc import Awaitable, Callable - from typing import TypeAlias + from typing import NoReturn, TypeAlias SubmissionId: TypeAlias = int Worker: TypeAlias = asyncio.Task[None] @@ -61,7 +61,7 @@ def __init__( self._worker_queue: WorkerQueue = asyncio.Queue(1) self._processing_submission_ids: set[SubmissionId] = set() - async def run(self) -> None: + async def poll_forever(self) -> "NoReturn": # type: ignore[misc] logger.info( "Starting downstream poller for: " f"host={self._config.host} " From 5c8a6705a1ea2934ec410fe22a84601a0cf224a4 Mon Sep 17 00:00:00 2001 From: Michael Welborn Date: Tue, 14 Jan 2025 08:26:44 -0600 Subject: [PATCH 05/13] Fix argument names for `retry()` --- indico_toolkit/polling/autoreview.py | 8 ++++---- indico_toolkit/polling/downstream.py | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/indico_toolkit/polling/autoreview.py b/indico_toolkit/polling/autoreview.py index 64464f8..0a4be2f 100644 --- a/indico_toolkit/polling/autoreview.py +++ b/indico_toolkit/polling/autoreview.py @@ -72,10 +72,10 @@ def __init__( self._retry = retry( IndicoError, - retry_count, - retry_wait, - retry_backoff, - retry_jitter, + count=retry_count, + wait=retry_wait, + backoff=retry_backoff, + jitter=retry_jitter, ) self._worker_slots = asyncio.Semaphore(concurrency) self._worker_queue: WorkerQueue = asyncio.Queue(1) diff --git a/indico_toolkit/polling/downstream.py b/indico_toolkit/polling/downstream.py index 4a1fbc8..96ea961 100644 --- a/indico_toolkit/polling/downstream.py +++ b/indico_toolkit/polling/downstream.py @@ -52,10 +52,10 @@ def __init__( self._retry = retry( IndicoError, - retry_count, - retry_wait, - retry_backoff, - retry_jitter, + count=retry_count, + wait=retry_wait, + backoff=retry_backoff, + jitter=retry_jitter, ) self._worker_slots = asyncio.Semaphore(concurrency) self._worker_queue: WorkerQueue = asyncio.Queue(1) From f7662e1c61a0c5bae1029460ac5ea8cd48112048 Mon Sep 17 00:00:00 2001 From: Michael Welborn Date: Tue, 14 Jan 2025 09:23:41 -0600 Subject: [PATCH 06/13] Use better argument names for worker count and spawn rate --- indico_toolkit/polling/autoreview.py | 22 +++++++++++----------- indico_toolkit/polling/downstream.py | 18 +++++++++--------- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/indico_toolkit/polling/autoreview.py b/indico_toolkit/polling/autoreview.py index 0a4be2f..8cbae5f 100644 --- a/indico_toolkit/polling/autoreview.py +++ b/indico_toolkit/polling/autoreview.py @@ -37,8 +37,8 @@ class AutoReviewed: class AutoReviewPoller: """ - Polls for submissions requiring auto review, processes them concurrently, - and submits the review results. + Polls for submissions requiring auto review, processes them, + and submits the review results concurrently. """ def __init__( @@ -47,8 +47,8 @@ def __init__( workflow_id: int, auto_review: "Callable[[Result, dict[Document, EtlOutput]], Awaitable[AutoReviewed]]", # noqa: E501 *, - concurrency: int = 8, - enqueue_delay: float = 1, + worker_count: int = 8, + spawn_rate: float = 1, poll_delay: float = 30, load_etl_output: bool = True, load_text: bool = True, @@ -61,9 +61,9 @@ def __init__( ): self._config = config self._workflow_id = workflow_id - self._concurrency = concurrency self._auto_review = auto_review - self._enqueue_delay = enqueue_delay + self._worker_count = worker_count + self._spawn_rate = spawn_rate self._poll_delay = poll_delay self._load_etl_output = load_etl_output self._load_text = load_text @@ -77,7 +77,7 @@ def __init__( backoff=retry_backoff, jitter=retry_jitter, ) - self._worker_slots = asyncio.Semaphore(concurrency) + self._worker_slots = asyncio.Semaphore(worker_count) self._worker_queue: WorkerQueue = asyncio.Queue(1) self._processing_submission_ids: set[SubmissionId] = set() @@ -85,15 +85,15 @@ async def poll_forever(self) -> "NoReturn": # type: ignore[misc] logger.info( "Starting auto review poller for: " f"host={self._config.host} " - f"workflow_id={self._workflow_id}" - f"concurrency={self._concurrency}" + f"workflow_id={self._workflow_id} " + f"worker_count={self._worker_count}" ) async with AsyncIndicoClient(self._config) as client: self._client_call = self._retry(client.call) await asyncio.gather( self._spawn_workers(), - *(self._reap_workers() for _ in range(self._concurrency)), + *(self._reap_workers() for _ in range(self._worker_count)), ) async def _retrieve_storage_object(self, url: str) -> object: @@ -131,7 +131,7 @@ async def _spawn_workers(self) -> None: self._processing_submission_ids.add(submission_id) worker = asyncio.create_task(self._worker(submission_id)) await self._worker_queue.put((submission_id, worker)) - await asyncio.sleep(self._enqueue_delay) + await asyncio.sleep(1 / self._spawn_rate) async def _worker(self, submission_id: SubmissionId) -> None: """ diff --git a/indico_toolkit/polling/downstream.py b/indico_toolkit/polling/downstream.py index 96ea961..2260e11 100644 --- a/indico_toolkit/polling/downstream.py +++ b/indico_toolkit/polling/downstream.py @@ -35,8 +35,8 @@ def __init__( workflow_id: int, downstream: "Callable[[Submission], Awaitable[None]]", *, - concurrency: int = 8, - enqueue_delay: float = 1, + worker_count: int = 8, + spawn_rate: float = 1, poll_delay: float = 30, retry_count: int = 4, retry_wait: float = 1, @@ -45,9 +45,9 @@ def __init__( ): self._config = config self._workflow_id = workflow_id - self._concurrency = concurrency self._downstream = downstream - self._enqueue_delay = enqueue_delay + self._worker_count = worker_count + self._spawn_rate = spawn_rate self._poll_delay = poll_delay self._retry = retry( @@ -57,7 +57,7 @@ def __init__( backoff=retry_backoff, jitter=retry_jitter, ) - self._worker_slots = asyncio.Semaphore(concurrency) + self._worker_slots = asyncio.Semaphore(worker_count) self._worker_queue: WorkerQueue = asyncio.Queue(1) self._processing_submission_ids: set[SubmissionId] = set() @@ -65,15 +65,15 @@ async def poll_forever(self) -> "NoReturn": # type: ignore[misc] logger.info( "Starting downstream poller for: " f"host={self._config.host} " - f"workflow_id={self._workflow_id}" - f"concurrency={self._concurrency}" + f"workflow_id={self._workflow_id} " + f"worker_count={self._worker_count}" ) async with AsyncIndicoClient(self._config) as client: self._client_call = self._retry(client.call) await asyncio.gather( self._spawn_workers(), - *(self._reap_workers() for _ in range(self._concurrency)), + *(self._reap_workers() for _ in range(self._worker_count)), ) async def _spawn_workers(self) -> None: @@ -109,7 +109,7 @@ async def _spawn_workers(self) -> None: self._processing_submission_ids.add(submission_id) worker = asyncio.create_task(self._worker(submission_id)) await self._worker_queue.put((submission_id, worker)) - await asyncio.sleep(self._enqueue_delay) + await asyncio.sleep(1 / self._spawn_rate) async def _worker(self, submission_id: SubmissionId) -> None: """ From 1c1026f525039e5f978fd4be76c8c40c9f95e5fe Mon Sep 17 00:00:00 2001 From: Michael Welborn Date: Tue, 14 Jan 2025 09:25:42 -0600 Subject: [PATCH 07/13] Add example script for `AutoReviewPoller` --- examples/poll_auto_review.py | 56 ++++++++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) create mode 100644 examples/poll_auto_review.py diff --git a/examples/poll_auto_review.py b/examples/poll_auto_review.py new file mode 100644 index 0000000..8b90c09 --- /dev/null +++ b/examples/poll_auto_review.py @@ -0,0 +1,56 @@ +""" +A feature-complete auto review polling script that incorporates asyncio, +automatic retry, result file dataclasses, and etl output dataclasses. + +Max workers, spawn rate, etl output loading, and retry behavior are all configurable. +See the `AutoReviewPoller` class definition. +""" + +import asyncio +import sys + +from indico import IndicoConfig + +from indico_toolkit.polling import AutoReviewPoller, AutoReviewed +from indico_toolkit.etloutput import EtlOutput +from indico_toolkit.results import Result, Document + + +async def auto_review( + result: Result, etl_outputs: dict[Document, EtlOutput] +) -> AutoReviewed: + """ + Apply auto review rules to predictions and determine straight through processing. + Any IO performed (network requests, file reads/writes, etc) must be awaited to + avoid blocking the asyncio loop that schedules this coroutine. + """ + predictions = result.pre_review + + # Apply auto review rules. + + return AutoReviewed( + changes=predictions.to_changes(result), + stp=False, # Defaults to `False` and can be omitted. + ) + + +if __name__ == "__main__": + import logging + + logging.basicConfig( + format=( + r"[%(asctime)s] " + r"%(name)s:%(funcName)s():%(lineno)s: " + r"%(levelname)s %(message)s" + ), + level=logging.INFO, + force=True, + ) + workflow_id = int(sys.argv[1]) + asyncio.run( + AutoReviewPoller( + IndicoConfig(), # Host and token read from environment variables. + workflow_id, + auto_review, + ).poll_forever() + ) From f26e5fae6a718ce8330b001ca81723834fa21b13 Mon Sep 17 00:00:00 2001 From: Michael Welborn Date: Tue, 14 Jan 2025 10:51:20 -0600 Subject: [PATCH 08/13] Improve `DownstreamPoller` docstring --- indico_toolkit/polling/downstream.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/indico_toolkit/polling/downstream.py b/indico_toolkit/polling/downstream.py index 2260e11..ab062ec 100644 --- a/indico_toolkit/polling/downstream.py +++ b/indico_toolkit/polling/downstream.py @@ -26,7 +26,8 @@ class DownstreamPoller: """ - Polls for submissions pending downstream egestion and processes them concurrently. + Polls for completed and failed submissions pending downstream egestion, processes + them concurrently, and marks them as retrieved. """ def __init__( From 2b2872a5193437d360e17f3d5b2607e16a3bf871 Mon Sep 17 00:00:00 2001 From: Michael Welborn Date: Wed, 15 Jan 2025 14:13:03 -0600 Subject: [PATCH 09/13] Update auto review polling example --- examples/poll_auto_review.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/examples/poll_auto_review.py b/examples/poll_auto_review.py index 8b90c09..acd754c 100644 --- a/examples/poll_auto_review.py +++ b/examples/poll_auto_review.py @@ -30,7 +30,7 @@ async def auto_review( return AutoReviewed( changes=predictions.to_changes(result), - stp=False, # Defaults to `False` and can be omitted. + stp=True, # Defaults to `False` and may be omitted. ) @@ -49,7 +49,10 @@ async def auto_review( workflow_id = int(sys.argv[1]) asyncio.run( AutoReviewPoller( - IndicoConfig(), # Host and token read from environment variables. + IndicoConfig( + host="try.indico.io", + api_token_path="indico_api_token.txt", + ), workflow_id, auto_review, ).poll_forever() From 19d1a6c2bc58377bd57852149f8d3745b9d7aca5 Mon Sep 17 00:00:00 2001 From: Michael Welborn Date: Wed, 15 Jan 2025 14:18:22 -0600 Subject: [PATCH 10/13] Make types hints Python 3.9 compatible --- indico_toolkit/polling/autoreview.py | 8 ++++---- indico_toolkit/polling/downstream.py | 8 ++++---- indico_toolkit/polling/queries.py | 9 ++++++--- 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/indico_toolkit/polling/autoreview.py b/indico_toolkit/polling/autoreview.py index 8cbae5f..0ff6d7d 100644 --- a/indico_toolkit/polling/autoreview.py +++ b/indico_toolkit/polling/autoreview.py @@ -78,8 +78,8 @@ def __init__( jitter=retry_jitter, ) self._worker_slots = asyncio.Semaphore(worker_count) - self._worker_queue: WorkerQueue = asyncio.Queue(1) - self._processing_submission_ids: set[SubmissionId] = set() + self._worker_queue: "WorkerQueue" = asyncio.Queue(1) + self._processing_submission_ids: "set[SubmissionId]" = set() async def poll_forever(self) -> "NoReturn": # type: ignore[misc] logger.info( @@ -111,7 +111,7 @@ async def _spawn_workers(self) -> None: while True: try: - submission_ids: set[int] = await self._client_call( + submission_ids: "set[SubmissionId]" = await self._client_call( SubmissionIdsPendingAutoReview(self._workflow_id) ) except Exception: @@ -133,7 +133,7 @@ async def _spawn_workers(self) -> None: await self._worker_queue.put((submission_id, worker)) await asyncio.sleep(1 / self._spawn_rate) - async def _worker(self, submission_id: SubmissionId) -> None: + async def _worker(self, submission_id: "SubmissionId") -> None: """ Process a single submission by retrieving submission metadata, the result file, etl output, calling `self._auto_review`, and submitting changes. diff --git a/indico_toolkit/polling/downstream.py b/indico_toolkit/polling/downstream.py index ab062ec..b70ac10 100644 --- a/indico_toolkit/polling/downstream.py +++ b/indico_toolkit/polling/downstream.py @@ -59,8 +59,8 @@ def __init__( jitter=retry_jitter, ) self._worker_slots = asyncio.Semaphore(worker_count) - self._worker_queue: WorkerQueue = asyncio.Queue(1) - self._processing_submission_ids: set[SubmissionId] = set() + self._worker_queue: "WorkerQueue" = asyncio.Queue(1) + self._processing_submission_ids: "set[SubmissionId]" = set() async def poll_forever(self) -> "NoReturn": # type: ignore[misc] logger.info( @@ -90,7 +90,7 @@ async def _spawn_workers(self) -> None: while True: try: - submission_ids: set[int] = await self._client_call( + submission_ids: "set[SubmissionId]" = await self._client_call( SubmissionIdsPendingDownstream(self._workflow_id) ) except Exception: @@ -112,7 +112,7 @@ async def _spawn_workers(self) -> None: await self._worker_queue.put((submission_id, worker)) await asyncio.sleep(1 / self._spawn_rate) - async def _worker(self, submission_id: SubmissionId) -> None: + async def _worker(self, submission_id: "SubmissionId") -> None: """ Process a single submission by retrieving submission metadata and calling `self._downstream`. Once completed, mark the submission retrieved. diff --git a/indico_toolkit/polling/queries.py b/indico_toolkit/polling/queries.py index fe0b4fc..bd629f5 100644 --- a/indico_toolkit/polling/queries.py +++ b/indico_toolkit/polling/queries.py @@ -1,7 +1,10 @@ -from typing import Any +from typing import TYPE_CHECKING from indico.queries import GraphQLRequest # type: ignore[import-untyped] +if TYPE_CHECKING: + from typing import Any + class SubmissionIdsPendingAutoReview(GraphQLRequest): # type: ignore[misc] QUERY = """ @@ -23,7 +26,7 @@ class SubmissionIdsPendingAutoReview(GraphQLRequest): # type: ignore[misc] def __init__(self, workflow_id: int): super().__init__(self.QUERY, {"workflowIds": [workflow_id]}) - def process_response(self, response: Any) -> set[int]: + def process_response(self, response: "Any") -> set[int]: response = super().process_response(response) return { submission["id"] for submission in response["submissions"]["submissions"] @@ -58,7 +61,7 @@ class SubmissionIdsPendingDownstream(GraphQLRequest): # type: ignore[misc] def __init__(self, workflow_id: int): super().__init__(self.QUERY, {"workflowIds": [workflow_id]}) - def process_response(self, response: Any) -> set[int]: + def process_response(self, response: "Any") -> set[int]: response = super().process_response(response) return { submission["id"] for submission in response["submissions"]["submissions"] From 4cf48f3f05a4975837ee2c52ae8e5b2fd8c9f647 Mon Sep 17 00:00:00 2001 From: Michael Welborn Date: Wed, 15 Jan 2025 14:55:01 -0600 Subject: [PATCH 11/13] Catch all exceptions raised by a client call --- indico_toolkit/polling/autoreview.py | 3 +-- indico_toolkit/polling/downstream.py | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/indico_toolkit/polling/autoreview.py b/indico_toolkit/polling/autoreview.py index 0ff6d7d..856c4eb 100644 --- a/indico_toolkit/polling/autoreview.py +++ b/indico_toolkit/polling/autoreview.py @@ -4,7 +4,6 @@ from typing import TYPE_CHECKING from indico import AsyncIndicoClient, IndicoConfig # type: ignore[import-untyped] -from indico.errors import IndicoError # type: ignore[import-untyped] from indico.queries import ( # type: ignore[import-untyped] GetSubmission, JobStatus, @@ -71,7 +70,7 @@ def __init__( self._load_tables = load_tables self._retry = retry( - IndicoError, + Exception, count=retry_count, wait=retry_wait, backoff=retry_backoff, diff --git a/indico_toolkit/polling/downstream.py b/indico_toolkit/polling/downstream.py index b70ac10..537824e 100644 --- a/indico_toolkit/polling/downstream.py +++ b/indico_toolkit/polling/downstream.py @@ -3,7 +3,6 @@ from typing import TYPE_CHECKING from indico import AsyncIndicoClient, IndicoConfig # type: ignore[import-untyped] -from indico.errors import IndicoError # type: ignore[import-untyped] from indico.queries import ( # type: ignore[import-untyped] GetSubmission, UpdateSubmission, @@ -52,7 +51,7 @@ def __init__( self._poll_delay = poll_delay self._retry = retry( - IndicoError, + Exception, count=retry_count, wait=retry_wait, backoff=retry_backoff, From db6081f2daf31276121e388e064e0996c0575083 Mon Sep 17 00:00:00 2001 From: Michael Welborn Date: Wed, 15 Jan 2025 14:55:44 -0600 Subject: [PATCH 12/13] Log when downstream completes --- indico_toolkit/polling/downstream.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/indico_toolkit/polling/downstream.py b/indico_toolkit/polling/downstream.py index 537824e..b22b7f1 100644 --- a/indico_toolkit/polling/downstream.py +++ b/indico_toolkit/polling/downstream.py @@ -125,6 +125,8 @@ async def _worker(self, submission_id: "SubmissionId") -> None: logger.info(f"Marking {submission_id=} retrieved") await self._client_call(UpdateSubmission(submission_id, retrieved=True)) + logger.info(f"Completed dowstream of {submission_id=}") + async def _reap_workers(self) -> None: """ Reap completed workers, releasing their slots for new tasks. Log errors for From 6243fe59a872d03304f07e2d0dc518825688915d Mon Sep 17 00:00:00 2001 From: Michael Welborn Date: Tue, 21 Jan 2025 13:01:19 -0600 Subject: [PATCH 13/13] Add the ability to reject submissions with `AutoReviewPoller` --- examples/poll_auto_review.py | 3 ++- indico_toolkit/polling/autoreview.py | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/examples/poll_auto_review.py b/examples/poll_auto_review.py index acd754c..8a22599 100644 --- a/examples/poll_auto_review.py +++ b/examples/poll_auto_review.py @@ -30,7 +30,8 @@ async def auto_review( return AutoReviewed( changes=predictions.to_changes(result), - stp=True, # Defaults to `False` and may be omitted. + reject=False, # Defaults to `False` and may be omitted. + stp=False, # Defaults to `False` and may be omitted. ) diff --git a/indico_toolkit/polling/autoreview.py b/indico_toolkit/polling/autoreview.py index 856c4eb..c93fa9a 100644 --- a/indico_toolkit/polling/autoreview.py +++ b/indico_toolkit/polling/autoreview.py @@ -31,6 +31,7 @@ @dataclass class AutoReviewed: changes: "dict[str, Any] | list[dict[str, Any]]" + reject: bool = False stp: bool = False @@ -170,6 +171,7 @@ async def _worker(self, submission_id: "SubmissionId") -> None: SubmitReview( submission_id, changes=auto_reviewed.changes, + rejected=auto_reviewed.reject, force_complete=auto_reviewed.stp, ) )