Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions examples/general/pagination_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
"""Example demonstrating the paginate helper for iterating through paginated API results.

This example shows how to use the paginate() helper function to automatically
handle continuation tokens when fetching all results from a paginated API.
"""

from nisystemlink.clients.core.helpers import paginate
from nisystemlink.clients.testmonitor import TestMonitorClient
from nisystemlink.clients.testmonitor.models import Result

# Server configuration is not required when used with SystemLink Client or run through Jupyter on SystemLink
server_configuration = None

# # Example of setting up the server configuration to point to your instance of SystemLink Enterprise
# from nisystemlink.clients.core import HttpConfiguration
# server_configuration = HttpConfiguration(
# server_uri="https://yourserver.yourcompany.com",
# api_key="YourAPIKeyGeneratedFromSystemLink",
# )

client = TestMonitorClient(configuration=server_configuration)

# Example 1: Basic usage - iterate through all results automatically
print("Example 1: Iterating through all results")
print("-" * 50)
result: Result
for result in paginate(client.get_results, items_field="results", take=100):
print(f"Result ID: {result.id}, Status: {result.status.status_type}") # type: ignore[union-attr]

# Example 2: Collect all results into a list
print("\nExample 2: Collecting all results into a list")
print("-" * 50)
all_results: list[Result] = list(
paginate(client.get_results, items_field="results", take=100)
)
print(f"Total results retrieved: {len(all_results)}")

# Example 3: Process in chunks while still using automatic pagination
print("\nExample 3: Processing results in batches")
print("-" * 50)
batch: list[Result] = []
batch_size = 50
for i, result in enumerate(
paginate(client.get_results, items_field="results", take=100), start=1
):
batch.append(result)
if len(batch) >= batch_size:
# Process batch
print(f"Processing batch of {len(batch)} results...")
# Do something with batch
batch = []

# Process remaining items
if batch:
print(f"Processing final batch of {len(batch)} results...")
1 change: 1 addition & 0 deletions nisystemlink/clients/core/helpers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from ._iterator_file_like import IteratorFileLike
from ._minion_id import read_minion_id
from ._pagination import paginate

# flake8: noqa
62 changes: 62 additions & 0 deletions nisystemlink/clients/core/helpers/_pagination.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# -*- coding: utf-8 -*-
from typing import Any, Callable, Generator

from nisystemlink.clients.core._uplink._with_paging import WithPaging


def paginate(
fetch_function: Callable[..., WithPaging],
items_field: str,
**fetch_kwargs: Any,
) -> Generator[Any, None, None]:
"""Generate items from paginated API responses using continuation tokens.

This helper function provides a convenient way to iterate over all items
from a paginated API endpoint that uses continuation tokens. It automatically
handles fetching subsequent pages until all items are retrieved.

Args:
fetch_function: The API function to call to fetch each page of items.
Must accept a ``continuation_token`` parameter and return a response
that derives from ``WithPaging``.
items_field: The name of the field in the response object that contains
the list of items to yield.
**fetch_kwargs: Additional keyword arguments to pass to the fetch function
on every call (e.g., filters, take limits, etc.).

Yields:
Individual items from each page of results.

Note:
The fetch function will be called with the `continuation_token` parameter
set to `None` on the first call, then with each subsequent token until
the response contains a `None` continuation token.
"""
continuation_token = None

while True:
# Set the continuation token parameter for this page
fetch_kwargs["continuation_token"] = continuation_token

# Fetch the current page
response = fetch_function(**fetch_kwargs)

# Get the items from the response using the specified field name
items = getattr(response, items_field, [])

# Yield each item individually
for item in items:
yield item

# Get the continuation token for the next page
next_continuation_token = response.continuation_token

# Stop if there are no more pages
if next_continuation_token is None:
break

# Guard against infinite loop if continuation token doesn't change
if next_continuation_token == continuation_token:
raise RuntimeError("Continuation token did not change between iterations.")

continuation_token = next_continuation_token
247 changes: 247 additions & 0 deletions tests/core/test_pagination.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
# -*- coding: utf-8 -*-
"""Tests for the pagination helper function."""

from typing import Any, List
from unittest.mock import MagicMock

import pytest
from nisystemlink.clients.core.helpers import paginate


class MockResponseWithItems:
"""Mock API response object with 'items' field for testing pagination."""

def __init__(self, items: List[Any], continuation_token: str | None = None):
self.items = items
self.continuation_token = continuation_token


class MockResponseWithResults:
"""Mock API response object with 'results' field for testing pagination."""

def __init__(self, results: List[Any], continuation_token: str | None = None):
self.results = results
self.continuation_token = continuation_token


class TestPaginate:
"""Tests for the paginate helper function."""

def test__paginate_single_page__yields_all_items(self):
"""Test pagination with a single page of results."""
# Arrange
items = [1, 2, 3, 4, 5]
mock_fetch = MagicMock(return_value=MockResponseWithItems(items, None))

# Act
result = list(paginate(mock_fetch, "items"))

# Assert
assert result == items
assert mock_fetch.call_count == 1
mock_fetch.assert_called_with(continuation_token=None)

def test__paginate_multiple_pages__yields_all_items_in_order(self):
"""Test pagination with multiple pages."""
# Arrange
page1_items = [1, 2, 3]
page2_items = [4, 5, 6]
page3_items = [7, 8, 9]

mock_fetch = MagicMock(
side_effect=[
MockResponseWithItems(page1_items, "token1"),
MockResponseWithItems(page2_items, "token2"),
MockResponseWithItems(page3_items, None),
]
)

# Act
result = list(paginate(mock_fetch, "items"))

# Assert
assert result == [1, 2, 3, 4, 5, 6, 7, 8, 9]
assert mock_fetch.call_count == 3
assert mock_fetch.call_args_list[0][1]["continuation_token"] is None
assert mock_fetch.call_args_list[1][1]["continuation_token"] == "token1"
assert mock_fetch.call_args_list[2][1]["continuation_token"] == "token2"

def test__paginate_with_results_field__yields_all_items(self):
"""Test pagination with 'results' as the items field name."""
# Arrange
page1_results = ["a", "b"]
page2_results = ["c", "d"]

mock_fetch = MagicMock(
side_effect=[
MockResponseWithResults(page1_results, "next1"),
MockResponseWithResults(page2_results, None),
]
)

# Act
result = list(paginate(mock_fetch, items_field="results"))

# Assert
assert result == ["a", "b", "c", "d"]
assert mock_fetch.call_count == 2

def test__paginate_with_additional_kwargs__passes_kwargs_to_fetch(self):
"""Test that additional keyword arguments are passed to the fetch function."""
# Arrange
items = [1, 2, 3]
mock_fetch = MagicMock(return_value=MockResponseWithItems(items, None))

# Act
result = list(
paginate(
mock_fetch, "items", take=10, filter="status==PASSED", return_count=True
)
)

# Assert
assert result == items
mock_fetch.assert_called_once_with(
continuation_token=None, take=10, filter="status==PASSED", return_count=True
)

def test__paginate_empty_results__returns_empty(self):
"""Test pagination with no results."""
# Arrange
mock_fetch = MagicMock(return_value=MockResponseWithItems([], None))

# Act
result = list(paginate(mock_fetch, "items"))

# Assert
assert result == []
assert mock_fetch.call_count == 1

def test__paginate_empty_page_in_middle__yields_only_non_empty_pages(self):
"""Test pagination when a middle page is empty."""
# Arrange
page1_items = [1, 2, 3]
page2_items = []
page3_items = [4, 5]

mock_fetch = MagicMock(
side_effect=[
MockResponseWithItems(page1_items, "token1"),
MockResponseWithItems(page2_items, "token2"),
MockResponseWithItems(page3_items, None),
]
)

# Act
result = list(paginate(mock_fetch, "items"))

# Assert
assert result == [1, 2, 3, 4, 5]
assert mock_fetch.call_count == 3

def test__paginate_generator__can_be_used_in_for_loop(self):
"""Test that paginate works as expected in a for loop."""
# Arrange
page1_items = [1, 2]
page2_items = [3, 4]
mock_fetch = MagicMock(
side_effect=[
MockResponseWithItems(page1_items, "token1"),
MockResponseWithItems(page2_items, None),
]
)

# Act
collected = []
for item in paginate(mock_fetch, "items"):
collected.append(item * 2)

# Assert
assert collected == [2, 4, 6, 8]

def test__paginate_lazy_evaluation__only_fetches_as_needed(self):
"""Test that pagination is lazy and doesn't fetch all pages immediately."""
# Arrange
page1_items = [1, 2, 3]
page2_items = [4, 5, 6]
mock_fetch = MagicMock(
side_effect=[
MockResponseWithItems(page1_items, "token1"),
MockResponseWithItems(page2_items, None),
]
)

# Act
gen = paginate(mock_fetch, "items")
assert mock_fetch.call_count == 0 # Nothing called yet

first_item = next(gen)
assert first_item == 1
assert mock_fetch.call_count == 1 # First page fetched

# Consume rest of first page
next(gen) # 2
next(gen) # 3

# First page exhausted, but second page not yet fetched
assert mock_fetch.call_count == 1

# Fetch first item of second page
fourth_item = next(gen)
assert fourth_item == 4
assert mock_fetch.call_count == 2 # Second page fetched

def test__paginate_with_kwargs_persisted_across_pages__kwargs_used_on_all_calls(
self,
):
"""Test that kwargs are passed to all fetch function calls."""
# Arrange
mock_fetch = MagicMock(
side_effect=[
MockResponseWithItems([1, 2], "token1"),
MockResponseWithItems([3, 4], None),
]
)

# Act
list(paginate(mock_fetch, "items", take=100, filter="active==true"))

# Assert
for call in mock_fetch.call_args_list:
assert call[1]["take"] == 100
assert call[1]["filter"] == "active==true"

def test__paginate_missing_items_field__yields_nothing(self):
"""Test pagination when the items field doesn't exist on the response."""
# Arrange
mock_response = MockResponseWithItems([], None)
# Remove the items field
delattr(mock_response, "items")
mock_fetch = MagicMock(return_value=mock_response)

# Act
result = list(paginate(mock_fetch, "items"))

# Assert
assert result == []
assert mock_fetch.call_count == 1

def test__paginate_continuation_token_unchanged__raises_runtime_error(self):
"""Test that an error is raised if the continuation token doesn't change."""
# Arrange
# First call returns token1, second call returns token1 again (infinite loop)
mock_fetch = MagicMock(
side_effect=[
MockResponseWithItems([1, 2], "token1"),
MockResponseWithItems(
[3, 4], "token1"
), # Same token - should raise error
]
)

# Act & Assert
with pytest.raises(RuntimeError, match="Continuation token did not change"):
list(paginate(mock_fetch, "items"))

# Should have made 2 calls before detecting the issue
assert mock_fetch.call_count == 2