diff --git a/examples/general/pagination_helper.py b/examples/general/pagination_helper.py new file mode 100644 index 00000000..1dfe9866 --- /dev/null +++ b/examples/general/pagination_helper.py @@ -0,0 +1,55 @@ +"""Example demonstrating the paginate helper for iterating through paginated API results. + +This example shows how to use the paginate() helper function to automatically +handle continuation tokens when fetching all results from a paginated API. +""" + +from nisystemlink.clients.core.helpers import paginate +from nisystemlink.clients.testmonitor import TestMonitorClient +from nisystemlink.clients.testmonitor.models import Result + +# Server configuration is not required when used with SystemLink Client or run through Jupyter on SystemLink +server_configuration = None + +# # Example of setting up the server configuration to point to your instance of SystemLink Enterprise +# from nisystemlink.clients.core import HttpConfiguration +# server_configuration = HttpConfiguration( +# server_uri="https://yourserver.yourcompany.com", +# api_key="YourAPIKeyGeneratedFromSystemLink", +# ) + +client = TestMonitorClient(configuration=server_configuration) + +# Example 1: Basic usage - iterate through all results automatically +print("Example 1: Iterating through all results") +print("-" * 50) +result: Result +for result in paginate(client.get_results, items_field="results", take=100): + print(f"Result ID: {result.id}, Status: {result.status.status_type}") # type: ignore[union-attr] + +# Example 2: Collect all results into a list +print("\nExample 2: Collecting all results into a list") +print("-" * 50) +all_results: list[Result] = list( + paginate(client.get_results, items_field="results", take=100) +) +print(f"Total results retrieved: {len(all_results)}") + +# Example 3: Process in chunks while still using automatic pagination +print("\nExample 3: Processing results in batches") +print("-" * 50) +batch: list[Result] = [] +batch_size = 50 +for i, result in enumerate( + paginate(client.get_results, items_field="results", take=100), start=1 +): + batch.append(result) + if len(batch) >= batch_size: + # Process batch + print(f"Processing batch of {len(batch)} results...") + # Do something with batch + batch = [] + +# Process remaining items +if batch: + print(f"Processing final batch of {len(batch)} results...") diff --git a/nisystemlink/clients/core/helpers/__init__.py b/nisystemlink/clients/core/helpers/__init__.py index 402880f6..f3533f2a 100644 --- a/nisystemlink/clients/core/helpers/__init__.py +++ b/nisystemlink/clients/core/helpers/__init__.py @@ -1,4 +1,5 @@ from ._iterator_file_like import IteratorFileLike from ._minion_id import read_minion_id +from ._pagination import paginate # flake8: noqa diff --git a/nisystemlink/clients/core/helpers/_pagination.py b/nisystemlink/clients/core/helpers/_pagination.py new file mode 100644 index 00000000..07cb3167 --- /dev/null +++ b/nisystemlink/clients/core/helpers/_pagination.py @@ -0,0 +1,62 @@ +# -*- coding: utf-8 -*- +from typing import Any, Callable, Generator + +from nisystemlink.clients.core._uplink._with_paging import WithPaging + + +def paginate( + fetch_function: Callable[..., WithPaging], + items_field: str, + **fetch_kwargs: Any, +) -> Generator[Any, None, None]: + """Generate items from paginated API responses using continuation tokens. + + This helper function provides a convenient way to iterate over all items + from a paginated API endpoint that uses continuation tokens. It automatically + handles fetching subsequent pages until all items are retrieved. + + Args: + fetch_function: The API function to call to fetch each page of items. + Must accept a ``continuation_token`` parameter and return a response + that derives from ``WithPaging``. + items_field: The name of the field in the response object that contains + the list of items to yield. + **fetch_kwargs: Additional keyword arguments to pass to the fetch function + on every call (e.g., filters, take limits, etc.). + + Yields: + Individual items from each page of results. + + Note: + The fetch function will be called with the `continuation_token` parameter + set to `None` on the first call, then with each subsequent token until + the response contains a `None` continuation token. + """ + continuation_token = None + + while True: + # Set the continuation token parameter for this page + fetch_kwargs["continuation_token"] = continuation_token + + # Fetch the current page + response = fetch_function(**fetch_kwargs) + + # Get the items from the response using the specified field name + items = getattr(response, items_field, []) + + # Yield each item individually + for item in items: + yield item + + # Get the continuation token for the next page + next_continuation_token = response.continuation_token + + # Stop if there are no more pages + if next_continuation_token is None: + break + + # Guard against infinite loop if continuation token doesn't change + if next_continuation_token == continuation_token: + raise RuntimeError("Continuation token did not change between iterations.") + + continuation_token = next_continuation_token diff --git a/tests/core/test_pagination.py b/tests/core/test_pagination.py new file mode 100644 index 00000000..4518c2c9 --- /dev/null +++ b/tests/core/test_pagination.py @@ -0,0 +1,247 @@ +# -*- coding: utf-8 -*- +"""Tests for the pagination helper function.""" + +from typing import Any, List +from unittest.mock import MagicMock + +import pytest +from nisystemlink.clients.core.helpers import paginate + + +class MockResponseWithItems: + """Mock API response object with 'items' field for testing pagination.""" + + def __init__(self, items: List[Any], continuation_token: str | None = None): + self.items = items + self.continuation_token = continuation_token + + +class MockResponseWithResults: + """Mock API response object with 'results' field for testing pagination.""" + + def __init__(self, results: List[Any], continuation_token: str | None = None): + self.results = results + self.continuation_token = continuation_token + + +class TestPaginate: + """Tests for the paginate helper function.""" + + def test__paginate_single_page__yields_all_items(self): + """Test pagination with a single page of results.""" + # Arrange + items = [1, 2, 3, 4, 5] + mock_fetch = MagicMock(return_value=MockResponseWithItems(items, None)) + + # Act + result = list(paginate(mock_fetch, "items")) + + # Assert + assert result == items + assert mock_fetch.call_count == 1 + mock_fetch.assert_called_with(continuation_token=None) + + def test__paginate_multiple_pages__yields_all_items_in_order(self): + """Test pagination with multiple pages.""" + # Arrange + page1_items = [1, 2, 3] + page2_items = [4, 5, 6] + page3_items = [7, 8, 9] + + mock_fetch = MagicMock( + side_effect=[ + MockResponseWithItems(page1_items, "token1"), + MockResponseWithItems(page2_items, "token2"), + MockResponseWithItems(page3_items, None), + ] + ) + + # Act + result = list(paginate(mock_fetch, "items")) + + # Assert + assert result == [1, 2, 3, 4, 5, 6, 7, 8, 9] + assert mock_fetch.call_count == 3 + assert mock_fetch.call_args_list[0][1]["continuation_token"] is None + assert mock_fetch.call_args_list[1][1]["continuation_token"] == "token1" + assert mock_fetch.call_args_list[2][1]["continuation_token"] == "token2" + + def test__paginate_with_results_field__yields_all_items(self): + """Test pagination with 'results' as the items field name.""" + # Arrange + page1_results = ["a", "b"] + page2_results = ["c", "d"] + + mock_fetch = MagicMock( + side_effect=[ + MockResponseWithResults(page1_results, "next1"), + MockResponseWithResults(page2_results, None), + ] + ) + + # Act + result = list(paginate(mock_fetch, items_field="results")) + + # Assert + assert result == ["a", "b", "c", "d"] + assert mock_fetch.call_count == 2 + + def test__paginate_with_additional_kwargs__passes_kwargs_to_fetch(self): + """Test that additional keyword arguments are passed to the fetch function.""" + # Arrange + items = [1, 2, 3] + mock_fetch = MagicMock(return_value=MockResponseWithItems(items, None)) + + # Act + result = list( + paginate( + mock_fetch, "items", take=10, filter="status==PASSED", return_count=True + ) + ) + + # Assert + assert result == items + mock_fetch.assert_called_once_with( + continuation_token=None, take=10, filter="status==PASSED", return_count=True + ) + + def test__paginate_empty_results__returns_empty(self): + """Test pagination with no results.""" + # Arrange + mock_fetch = MagicMock(return_value=MockResponseWithItems([], None)) + + # Act + result = list(paginate(mock_fetch, "items")) + + # Assert + assert result == [] + assert mock_fetch.call_count == 1 + + def test__paginate_empty_page_in_middle__yields_only_non_empty_pages(self): + """Test pagination when a middle page is empty.""" + # Arrange + page1_items = [1, 2, 3] + page2_items = [] + page3_items = [4, 5] + + mock_fetch = MagicMock( + side_effect=[ + MockResponseWithItems(page1_items, "token1"), + MockResponseWithItems(page2_items, "token2"), + MockResponseWithItems(page3_items, None), + ] + ) + + # Act + result = list(paginate(mock_fetch, "items")) + + # Assert + assert result == [1, 2, 3, 4, 5] + assert mock_fetch.call_count == 3 + + def test__paginate_generator__can_be_used_in_for_loop(self): + """Test that paginate works as expected in a for loop.""" + # Arrange + page1_items = [1, 2] + page2_items = [3, 4] + mock_fetch = MagicMock( + side_effect=[ + MockResponseWithItems(page1_items, "token1"), + MockResponseWithItems(page2_items, None), + ] + ) + + # Act + collected = [] + for item in paginate(mock_fetch, "items"): + collected.append(item * 2) + + # Assert + assert collected == [2, 4, 6, 8] + + def test__paginate_lazy_evaluation__only_fetches_as_needed(self): + """Test that pagination is lazy and doesn't fetch all pages immediately.""" + # Arrange + page1_items = [1, 2, 3] + page2_items = [4, 5, 6] + mock_fetch = MagicMock( + side_effect=[ + MockResponseWithItems(page1_items, "token1"), + MockResponseWithItems(page2_items, None), + ] + ) + + # Act + gen = paginate(mock_fetch, "items") + assert mock_fetch.call_count == 0 # Nothing called yet + + first_item = next(gen) + assert first_item == 1 + assert mock_fetch.call_count == 1 # First page fetched + + # Consume rest of first page + next(gen) # 2 + next(gen) # 3 + + # First page exhausted, but second page not yet fetched + assert mock_fetch.call_count == 1 + + # Fetch first item of second page + fourth_item = next(gen) + assert fourth_item == 4 + assert mock_fetch.call_count == 2 # Second page fetched + + def test__paginate_with_kwargs_persisted_across_pages__kwargs_used_on_all_calls( + self, + ): + """Test that kwargs are passed to all fetch function calls.""" + # Arrange + mock_fetch = MagicMock( + side_effect=[ + MockResponseWithItems([1, 2], "token1"), + MockResponseWithItems([3, 4], None), + ] + ) + + # Act + list(paginate(mock_fetch, "items", take=100, filter="active==true")) + + # Assert + for call in mock_fetch.call_args_list: + assert call[1]["take"] == 100 + assert call[1]["filter"] == "active==true" + + def test__paginate_missing_items_field__yields_nothing(self): + """Test pagination when the items field doesn't exist on the response.""" + # Arrange + mock_response = MockResponseWithItems([], None) + # Remove the items field + delattr(mock_response, "items") + mock_fetch = MagicMock(return_value=mock_response) + + # Act + result = list(paginate(mock_fetch, "items")) + + # Assert + assert result == [] + assert mock_fetch.call_count == 1 + + def test__paginate_continuation_token_unchanged__raises_runtime_error(self): + """Test that an error is raised if the continuation token doesn't change.""" + # Arrange + # First call returns token1, second call returns token1 again (infinite loop) + mock_fetch = MagicMock( + side_effect=[ + MockResponseWithItems([1, 2], "token1"), + MockResponseWithItems( + [3, 4], "token1" + ), # Same token - should raise error + ] + ) + + # Act & Assert + with pytest.raises(RuntimeError, match="Continuation token did not change"): + list(paginate(mock_fetch, "items")) + + # Should have made 2 calls before detecting the issue + assert mock_fetch.call_count == 2