Skip to content

How to Handle Pagination

Handle paginated API responses in data sources to fetch large result sets efficiently.

๐Ÿค– AI-Generated Content

This documentation was generated with AI assistance and is still being audited. Some, or potentially a lot, of this information may be inaccurate. Learn more.


Basic Pagination Pattern

import attrs
import httpx
from pyvider.data_sources import register_data_source, BaseDataSource
from pyvider.schema import s_data_source, a_str, a_num, a_list, PvsSchema

@attrs.define
class PaginatedQueryConfig:
    endpoint: str
    limit: int = 100  # Max results to fetch

@attrs.define
class PaginatedQueryData:
    id: str
    items: list[dict]
    total_fetched: int
    has_more: bool

@register_data_source("paginated_query")
class PaginatedQuery(BaseDataSource):
    config_class = PaginatedQueryConfig
    state_class = PaginatedQueryData

    @classmethod
    def get_schema(cls) -> PvsSchema:
        return s_data_source({
            "endpoint": a_str(required=True),
            "limit": a_num(default=100),
            "id": a_str(computed=True),
            "items": a_list(a_map(a_str()), computed=True),
            "total_fetched": a_num(computed=True),
            "has_more": a_bool(computed=True),
        })

    async def read(self, config: PaginatedQueryConfig) -> PaginatedQueryData:
        all_items = []
        page = 1
        has_more = True

        async with httpx.AsyncClient() as client:
            while has_more and len(all_items) < config.limit:
                response = await client.get(
                    f"https://api.example.com{config.endpoint}",
                    params={"page": page, "per_page": 100}
                )
                data = response.json()

                all_items.extend(data["items"])
                has_more = data.get("has_more", False)
                page += 1

                # Stop if we have enough
                if len(all_items) >= config.limit:
                    break

        return PaginatedQueryData(
            id=f"{config.endpoint}:{config.limit}",
            items=all_items[:config.limit],
            total_fetched=len(all_items[:config.limit]),
            has_more=has_more,
        )

Token-Based Pagination

Many APIs use cursor/token-based pagination:

async def read(self, config: Config) -> Data:
    all_items = []
    next_token = None

    async with httpx.AsyncClient() as client:
        while len(all_items) < config.limit:
            params = {"limit": min(100, config.limit - len(all_items))}
            if next_token:
                params["next_token"] = next_token

            response = await client.get(
                f"https://api.example.com{config.endpoint}",
                params=params
            )
            data = response.json()

            all_items.extend(data["items"])
            next_token = data.get("next_token")

            # No more pages
            if not next_token:
                break

    return Data(
        id=f"{config.endpoint}:{config.limit}",
        items=all_items,
        total_fetched=len(all_items),
    )

Offset-Based Pagination

Traditional offset/limit pagination:

async def read(self, config: Config) -> Data:
    all_items = []
    offset = 0
    page_size = 100

    async with httpx.AsyncClient() as client:
        while len(all_items) < config.limit:
            fetch_size = min(page_size, config.limit - len(all_items))

            response = await client.get(
                f"https://api.example.com{config.endpoint}",
                params={"offset": offset, "limit": fetch_size}
            )
            data = response.json()

            items = data.get("items", [])
            if not items:
                break  # No more results

            all_items.extend(items)
            offset += len(items)

            # Got fewer than requested = last page
            if len(items) < fetch_size:
                break

    return Data(
        id=f"{config.endpoint}:{config.limit}",
        items=all_items,
        total_fetched=len(all_items),
    )

Some APIs use HTTP Link headers (like GitHub):

import httpx
from urllib.parse import parse_qs, urlparse

async def read(self, config: Config) -> Data:
    all_items = []
    url = f"https://api.example.com{config.endpoint}"

    async with httpx.AsyncClient() as client:
        while url and len(all_items) < config.limit:
            response = await client.get(url, params={"per_page": 100})
            data = response.json()

            all_items.extend(data)

            # Parse Link header for next page
            link_header = response.headers.get("Link", "")
            url = None
            for link in link_header.split(","):
                if 'rel="next"' in link:
                    url = link.split(";")[0].strip("<> ")
                    break

            if len(all_items) >= config.limit:
                break

    return Data(
        id=f"{config.endpoint}:{config.limit}",
        items=all_items[:config.limit],
        total_fetched=len(all_items[:config.limit]),
    )

Parallel Pagination

Fetch multiple pages concurrently (be careful with rate limits):

import asyncio

async def read(self, config: Config) -> Data:
    # Determine total pages needed
    pages_needed = (config.limit + 99) // 100  # Round up

    async with httpx.AsyncClient() as client:
        # Create tasks for each page
        tasks = [
            self._fetch_page(client, config.endpoint, page)
            for page in range(1, pages_needed + 1)
        ]

        # Fetch all pages concurrently
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Combine results
        all_items = []
        for result in results:
            if isinstance(result, Exception):
                continue  # Skip failed pages
            all_items.extend(result)

    return Data(
        id=f"{config.endpoint}:{config.limit}",
        items=all_items[:config.limit],
        total_fetched=len(all_items[:config.limit]),
    )

async def _fetch_page(
    self,
    client: httpx.AsyncClient,
    endpoint: str,
    page: int
) -> list[dict]:
    """Fetch a single page."""
    response = await client.get(
        f"https://api.example.com{endpoint}",
        params={"page": page, "per_page": 100}
    )
    data = response.json()
    return data.get("items", [])

Rate Limiting

Handle API rate limits during pagination:

import asyncio
from datetime import datetime, timedelta

class RateLimitedQuery(BaseDataSource):
    def __init__(self):
        super().__init__()
        self._last_request = None
        self._min_interval = timedelta(milliseconds=100)  # 10 req/sec

    async def _wait_for_rate_limit(self):
        """Ensure we don't exceed rate limit."""
        if self._last_request:
            elapsed = datetime.now() - self._last_request
            if elapsed < self._min_interval:
                wait_time = (self._min_interval - elapsed).total_seconds()
                await asyncio.sleep(wait_time)
        self._last_request = datetime.now()

    async def read(self, config: Config) -> Data:
        all_items = []
        page = 1

        async with httpx.AsyncClient() as client:
            while len(all_items) < config.limit:
                # Respect rate limit
                await self._wait_for_rate_limit()

                response = await client.get(
                    f"https://api.example.com{config.endpoint}",
                    params={"page": page, "per_page": 100}
                )

                # Handle 429 Too Many Requests
                if response.status_code == 429:
                    retry_after = int(response.headers.get("Retry-After", "60"))
                    await asyncio.sleep(retry_after)
                    continue  # Retry same page

                data = response.json()
                all_items.extend(data["items"])

                if not data.get("has_more"):
                    break

                page += 1

        return Data(
            id=f"{config.endpoint}:{config.limit}",
            items=all_items[:config.limit],
        )

Error Handling

Handle pagination errors gracefully:

async def read(self, config: Config) -> Data:
    all_items = []
    page = 1
    errors = []

    async with httpx.AsyncClient() as client:
        while page <= 10 and len(all_items) < config.limit:  # Max 10 pages
            try:
                response = await client.get(
                    f"https://api.example.com{config.endpoint}",
                    params={"page": page, "per_page": 100},
                    timeout=30.0  # Add timeout
                )
                response.raise_for_status()

                data = response.json()
                all_items.extend(data.get("items", []))

                if not data.get("has_more"):
                    break

                page += 1

            except httpx.HTTPError as e:
                errors.append(f"Page {page} failed: {e}")
                # Continue to next page instead of failing completely
                page += 1
                continue

            except Exception as e:
                errors.append(f"Unexpected error on page {page}: {e}")
                break  # Stop on unexpected errors

    return Data(
        id=f"{config.endpoint}:{config.limit}",
        items=all_items[:config.limit],
        total_fetched=len(all_items),
        errors=errors if errors else None,
    )

Best Practices

1. Set Maximum Pages

Prevent infinite loops:

async def read(self, config: Config) -> Data:
    max_pages = 100  # Safety limit
    page = 0

    while page < max_pages and len(all_items) < config.limit:
        # Fetch page
        page += 1

2. Respect User Limits

Don't fetch more than requested:

while len(all_items) < config.limit:
    # Fetch only what's needed
    fetch_size = min(100, config.limit - len(all_items))
    ...

3. Return Partial Results

Don't fail if some pages error:

async def read(self, config: Config) -> Data:
    all_items = []
    try:
        # Fetch pages
        ...
    except Exception as e:
        # Return what we got
        return Data(
            items=all_items,
            error=f"Partial results due to: {e}"
        )

4. Cache Expensive Queries

from functools import lru_cache

@lru_cache(maxsize=32)
async def read(self, config: Config) -> Data:
    # Pagination results cached by config
    ...

5. Add Progress Logging

import logging

async def read(self, config: Config) -> Data:
    logger = logging.getLogger(__name__)
    page = 1

    while ...:
        logger.info(f"Fetching page {page}, got {len(all_items)} items so far")
        page += 1

See Also