Skip to content

http

Class for interacting with paginated APIs over HTTP.

HTTPClient(base_url='', per_page=100, timeout=5.0, verify=True, *, headers=None, client=None, verbose=True, batch_size=10, n_workers=2, retries=3, on_errors='warn')

HTTP client for interacting with paginated API.

Parameters:

Name Type Description Default
base_url str

The base URL of the API.

''
per_page int

Default number of entries per page, default 100

100
timeout float

Maximum timeout for requests in seconds, default 5.0

5.0
verify bool or str

Verify SSL credentials, default True

Also accepts a path string to an SSL certificate file.

True
headers dict[str, str]

Collection of headers to send with each request, default None

None
client AsyncClient

Authenticated httpx.AsyncClient, default None

None
verbose bool

Set to False to disable progress bar, default True

True
batch_size int

Size of batches to make requests with, default 10

10
n_workers int

Number of workers to make requests, default 2

2
retries int

Number of times to retry a request, default 3

3
on_errors Literal['warn', 'raise']

Warn about failed requests or raise immediately on failure, default "warn"

'warn'
Source code in bavapi/http.py
def __init__(
    self,
    base_url: str = "",
    per_page: int = 100,
    timeout: float = 5.0,
    verify: Union[bool, str] = True,
    *,
    headers: Optional[Dict[str, str]] = None,
    client: Optional[AsyncClientType] = None,
    verbose: bool = True,
    batch_size: int = 10,
    n_workers: int = 2,
    retries: int = 3,
    on_errors: Literal["warn", "raise"] = "warn",
) -> None:
    self.per_page = per_page
    self.verbose = verbose
    self.batch_size = batch_size
    self.n_workers = n_workers
    self.retries = retries
    self.on_errors: Literal["warn", "raise"] = on_errors

    self.client = client or httpx.AsyncClient(
        headers=headers,
        timeout=timeout,
        verify=verify,
        base_url=base_url,
    )

aclose() async

Asynchronously close all client connections.

Source code in bavapi/http.py
async def aclose(self) -> None:
    """Asynchronously close all client connections."""
    return await self.client.aclose()

get(endpoint, query) async

Perform GET request on the given endpoint.

Parameters:

Name Type Description Default
endpoint str

Path to endpoint.

required
query Query

Request parameters.

required

Returns:

Type Description
Response

Requested response object.

Raises:

Type Description
APIError

If request fails.

Source code in bavapi/http.py
async def get(self, endpoint: str, query: _Query) -> httpx.Response:
    """Perform GET request on the given endpoint.

    Parameters
    ----------
    endpoint : str
        Path to endpoint.
    query : Query
        Request parameters.

    Returns
    -------
    httpx.Response
        Requested response object.

    Raises
    ------
    APIError
        If request fails.
    """
    url = f"{endpoint}/{query.item_id}" if query.item_id is not None else endpoint

    resp = await self.client.get(url, params=query.to_params(endpoint))

    if resp.status_code != 200:
        try:
            message = resp.json()["message"]
        except (KeyError, JSONDecodeError):
            message = "An error occurred with the Fount."

        raise APIError(f"Error {resp.status_code}:\n{message}\nurl={resp.url}")

    return resp

get_pages(endpoint, query, n_pages) async

Perform GET requests for a given number of pages on an endpoint.

Parameters:

Name Type Description Default
endpoint str

Path to endpoint.

required
query Query

Request parameters.

required
n_pages int

Number of pages to request.

required

Returns:

Type Description
list[Response]

List of response objects.

Source code in bavapi/http.py
async def get_pages(
    self,
    endpoint: str,
    query: _Query,
    n_pages: int,
) -> List[httpx.Response]:
    """Perform GET requests for a given number of pages on an endpoint.

    Parameters
    ----------
    endpoint : str
        Path to endpoint.
    query : Query
        Request parameters.
    n_pages : int
        Number of pages to request.

    Returns
    -------
    list[httpx.Response]
        List of response objects.
    """
    get_func = aretry(self.get, self.retries, delay=0.25)
    queue: asyncio.Queue[Iterable[_Query]] = asyncio.Queue()

    for batch in batched(query.paginated(n_pages), self.batch_size):
        queue.put_nowait(batch)

    pbar = tqdm(desc=f"{endpoint} query", total=n_pages) if self.verbose else None
    try:
        fetcher: PageFetcher[httpx.Response] = PageFetcher(pbar, self.on_errors)

        workers = [
            asyncio.create_task(fetcher.worker(get_func, endpoint, queue))
            for _ in range(self.n_workers)
        ]

        await asyncio.gather(*workers)
    finally:
        if pbar:
            pbar.close()

    fetcher.warn_if_errors()

    return fetcher.results

query(endpoint, query) async

Perform a paginated GET request on the given endpoint.

Parameters:

Name Type Description Default
endpoint str

Path to endpoint.

required
query Query

Request parameters.

required

Returns:

Type Description
Iterator[JSONDict]

An iterator of JSONDict objects.

Raises:

Type Description
APIError

If any request fails.

DataNotFoundError

If response data is empty.

RateLimitExceededError

If response would exceed the rate limit.

Source code in bavapi/http.py
async def query(
    self,
    endpoint: str,
    query: _Query,
) -> Iterator[JSONDict]:
    """Perform a paginated GET request on the given endpoint.

    Parameters
    ----------
    endpoint : str
        Path to endpoint.
    query : Query
        Request parameters.

    Returns
    -------
    Iterator[JSONDict]
        An iterator of JSONDict objects.

    Raises
    ------
    APIError
        If any request fails.
    DataNotFoundError
        If response data is empty.
    RateLimitExceededError
        If response would exceed the rate limit.
    """
    per_page = query.per_page or self.per_page
    init_per_page = per_page if query.is_single_page() else 1
    handshake_func = aretry(self.get, self.retries, 0.25, (ssl.SSLError,))
    resp = await handshake_func(endpoint, query.with_page(per_page=init_per_page))

    payload: Dict[str, JSONData] = resp.json()
    data: JSONData = payload["data"]

    if not data:
        raise DataNotFoundError("Your query returned no results.")

    if isinstance(data, dict):
        return iter((data,))

    meta = cast(JSONDict, payload["meta"])
    total = cast(int, meta["total"])

    if query.is_single_page() or len(data) == total:
        return iter(data)

    n_pages = _calculate_pages(query.page, per_page, query.max_pages, total)

    if n_pages > (limit_remaining := int(resp.headers["x-ratelimit-remaining"])):
        raise RateLimitExceededError(
            f"Number of pages ({n_pages}) for this request "
            f"exceeds the rate limit ({limit_remaining}, "
            f"total={resp.headers['x-ratelimit-limit']})."
        )

    pages = await self.get_pages(
        endpoint, query.with_page(per_page=per_page), n_pages
    )

    return (i for page in pages for i in page.json()["data"])