katana_public_api_client.katana_client

KatanaClient - The pythonic Katana API client with automatic resilience.

This client uses httpx's native transport layer to provide automatic retries, rate limiting, error handling, and pagination for all API calls without any decorators or wrapper methods needed.

Classes

ErrorLoggingTransport(wrapped_transport=None, logger=None, **kwargs)

Bases: AsyncBaseTransport

Transport layer that adds detailed error logging for 4xx client errors.

This transport wraps another transport and intercepts responses to log detailed error information using the generated error models. Inherits from AsyncBaseTransport (not AsyncHTTPTransport) so we don't spin up an unused connection pool inside this layer; all I/O goes through the wrapped transport.

Parameters:

  • wrapped_transport (AsyncBaseTransport | None, default: None ) –

    The transport to wrap. If None, creates a new AsyncHTTPTransport.

  • logger (Logger | None, default: None ) –

    Logger instance for capturing error details. If None, creates a default logger.

  • **kwargs (Any, default: {} ) –

    Additional arguments passed to AsyncHTTPTransport if wrapped_transport is None.

Source code in katana_public_api_client/katana_client.py
def __init__(
    self,
    wrapped_transport: AsyncBaseTransport | None = None,
    logger: Logger | None = None,
    **kwargs: Any,
):
    """
    Initialize the error logging transport.

    Args:
        wrapped_transport: The transport to wrap. If None, creates a new AsyncHTTPTransport.
        logger: Logger instance for capturing error details. If None, creates a default logger.
        **kwargs: Additional arguments passed to AsyncHTTPTransport if wrapped_transport is None.
    """
    if wrapped_transport is None:
        wrapped_transport = AsyncHTTPTransport(**kwargs)
    self._wrapped_transport = wrapped_transport
    self.logger: Logger = logger or logging.getLogger(__name__)
Functions
aclose() async

Propagate close down the wrapped chain so inner transports release resources.

Source code in katana_public_api_client/katana_client.py
async def aclose(self) -> None:
    """Propagate close down the wrapped chain so inner transports release resources."""
    await self._wrapped_transport.aclose()
handle_async_request(request) async

Handle request and log detailed error information for 4xx responses.

Source code in katana_public_api_client/katana_client.py
async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
    """Handle request and log detailed error information for 4xx responses."""
    response = await self._wrapped_transport.handle_async_request(request)

    # Log detailed information for 400-level client errors
    if 400 <= response.status_code < 500:
        await self._log_client_error(response, request)

    return response

KatanaClient(api_key=None, base_url=None, timeout=30.0, max_retries=5, max_pages=100, logger=None, *, requests_per_minute=60, **httpx_kwargs)

Bases: AuthenticatedClient

The pythonic Katana API client with automatic resilience and pagination.

This client inherits from AuthenticatedClient and can be passed directly to generated API methods without needing the .client property.

Features:

  • Automatic retries on network errors and server errors (5xx)
  • Automatic rate limit handling with Retry-After header support
  • Auto-pagination ON by default for GET requests (collects all pages automatically)
  • Uses 250 items per page (Katana's max) for efficient pagination
  • Rich logging and observability
  • Minimal configuration - just works out of the box

Auto-pagination behavior:

  • ON by default for GET requests with NO page parameter
  • Uses 250 items per page when no limit specified by caller
  • If caller specifies a limit, that limit is used per page
  • ANY explicit page parameter disables auto-pagination (e.g., page=1)
  • Disabled per-request via extensions: extensions={"auto_pagination": False}
  • Control max pages via max_pages constructor parameter
  • Limit total items via extensions: extensions={"max_items": 200}

Usage

from katana_public_api_client.api.product import get_all_products
from katana_public_api_client.katana_client import KatanaClient

async with KatanaClient() as client:
    # Auto-pagination is ON - all pages collected automatically
    # Uses 250 items per page for efficiency
    response = await get_all_products.asyncio_detailed(
        client=client,  # Pass client directly - no .client needed!
    )

    # Use a custom limit per page (100 instead of 250)
    response = await get_all_products.asyncio_detailed(
        client=client,
        limit=100,  # Use 100 per page
    )

    # Get a specific page only (ANY page param disables auto-pagination)
    response = await get_all_products.asyncio_detailed(
        client=client,
        page=2,  # Get page 2 only
        limit=50,
    )

    # Limit total items collected (via httpx client)
    httpx_client = client.get_async_httpx_client()
    response = await httpx_client.get(
        "/products",
        extensions={"max_items": 200},  # Stop after 200 items
    )

# Control max pages globally
client_limited = KatanaClient(max_pages=5)  # Limit to 5 pages max

Parameters:

  • api_key (str | None, default: None ) –

    Katana API key. If None, will try to load from KATANA_API_KEY env var, .env file, or ~/.netrc file (in that order).

  • base_url (str | None, default: None ) –

    Base URL for the Katana API. Defaults to https://api.katanamrp.com/v1

  • timeout (float, default: 30.0 ) –

    Request timeout in seconds. Defaults to 30.0.

  • max_retries (int, default: 5 ) –

    Maximum number of retry attempts for failed requests. Defaults to 5.

  • max_pages (int, default: 100 ) –

    Maximum number of pages to collect during auto-pagination. Defaults to 100.

  • requests_per_minute (int | None, default: 60 ) –

    Steady-state request budget for the proactive rate limiter. Defaults to 60 (Katana's documented limit). Set to None to disable the rate limiter entirely (e.g. when callers want to manage throttling themselves, or for tests that need raw throughput). When the limiter is active, every actual HTTP request — including retries and per-page paginated fetches — consumes one token, and the transport adapts to the server's X-Ratelimit-Remaining / X-Ratelimit-Reset headers.

  • logger (Logger | None, default: None ) –

    Any object whose debug/info/warning/error methods accept (msg, *args, **kwargs) — the standard logging.Logger call convention (e.g. logging.Logger, structlog.BoundLogger). If None, creates a default stdlib logger.

  • **httpx_kwargs (Any, default: {} ) –

    Additional arguments passed to the base AsyncHTTPTransport. Common parameters include:

      • http2 (bool): Enable HTTP/2 support
      • limits (httpx.Limits): Connection pool limits
      • verify (bool | str | ssl.SSLContext): SSL certificate verification
      • cert (str | tuple): Client-side certificates
      • trust_env (bool): Trust environment variables for proxy configuration
      • event_hooks (dict): Custom event hooks (will be merged with built-in hooks)

Raises:

  • ValueError

    If no API key is provided via api_key param, KATANA_API_KEY env var, .env file, or ~/.netrc file.
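
The lookup order described above (explicit argument, then environment, then netrc) can be sketched with the standard library alone. This is a minimal illustration, not the client's own code: `resolve_api_key` and its `env` and `netrc_path` parameters are hypothetical names, and storing the key in the netrc `password` field is an assumption, since the page does not show `_read_from_netrc`. Values from a `.env` file are covered by the environment step because `load_dotenv()` copies them into `os.environ`.

```python
from __future__ import annotations

import netrc
import os
from collections.abc import Mapping
from urllib.parse import urlsplit


def resolve_api_key(
    api_key: str | None,
    base_url: str,
    env: Mapping[str, str] = os.environ,
    netrc_path: str | None = None,
) -> str:
    """Return the first key found: explicit argument, environment, then netrc."""
    # 1. An explicit argument always wins.
    if api_key:
        return api_key

    # 2. Environment variable (also populated from .env by load_dotenv()).
    env_key = env.get("KATANA_API_KEY")
    if env_key:
        return env_key

    # 3. netrc entry for the API host. netrc_path=None means ~/.netrc.
    host = urlsplit(base_url).hostname
    try:
        entry = netrc.netrc(netrc_path).authenticators(host) if host else None
    except (OSError, netrc.NetrcParseError):
        entry = None
    # Assumption: the key lives in the "password" field (index 2).
    if entry and entry[2]:
        return entry[2]

    raise ValueError(
        "API key required via: api_key param, KATANA_API_KEY env var, "
        ".env file, or ~/.netrc"
    )
```

Passing `env` and `netrc_path` explicitly keeps the function easy to test without touching the real environment or home directory.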

Note

Transport-related parameters (http2, limits, verify, etc.) are correctly passed to the innermost AsyncHTTPTransport layer, ensuring they take effect even with the layered transport architecture.

Example

>>> async with KatanaClient() as client:
...     # All API calls through client get automatic resilience
...     response = await some_api_method.asyncio_detailed(client=client)

Source code in katana_public_api_client/katana_client.py
def __init__(
    self,
    api_key: str | None = None,
    base_url: str | None = None,
    timeout: float = 30.0,
    max_retries: int = 5,
    max_pages: int = 100,
    logger: Logger | None = None,
    *,
    requests_per_minute: int | None = 60,
    **httpx_kwargs: Any,
):
    """
    Initialize the Katana API client with automatic resilience features.

    Args:
        api_key: Katana API key. If None, will try to load from KATANA_API_KEY env var,
            .env file, or ~/.netrc file (in that order).
        base_url: Base URL for the Katana API. Defaults to https://api.katanamrp.com/v1
        timeout: Request timeout in seconds. Defaults to 30.0.
        max_retries: Maximum number of retry attempts for failed requests. Defaults to 5.
        max_pages: Maximum number of pages to collect during auto-pagination. Defaults to 100.
        requests_per_minute: Steady-state request budget for the proactive
            rate limiter. Defaults to 60 (Katana's documented limit). Set to
            ``None`` to disable the rate limiter entirely (e.g. when callers
            want to manage throttling themselves, or for tests that need raw
            throughput). When the limiter is active, every actual HTTP
            request — including retries and per-page paginated fetches —
            consumes one token, and the transport adapts to the server's
            ``X-Ratelimit-Remaining`` / ``X-Ratelimit-Reset`` headers.
        logger: Any object whose debug/info/warning/error methods accept
            (msg, *args, **kwargs) — the standard logging.Logger call convention
            (e.g. logging.Logger, structlog.BoundLogger). If None, creates a
            default stdlib logger.
        **httpx_kwargs: Additional arguments passed to the base AsyncHTTPTransport.
            Common parameters include:
            - http2 (bool): Enable HTTP/2 support
            - limits (httpx.Limits): Connection pool limits
            - verify (bool | str | ssl.SSLContext): SSL certificate verification
            - cert (str | tuple): Client-side certificates
            - trust_env (bool): Trust environment variables for proxy configuration
            - event_hooks (dict): Custom event hooks (will be merged with built-in hooks)

    Raises:
        ValueError: If no API key is provided via api_key param, KATANA_API_KEY env var,
            .env file, or ~/.netrc file.

    Note:
        Transport-related parameters (http2, limits, verify, etc.) are correctly
        passed to the innermost AsyncHTTPTransport layer, ensuring they take effect
        even with the layered transport architecture.

    Example:
        >>> async with KatanaClient() as client:
        ...     # All API calls through client get automatic resilience
        ...     response = await some_api_method.asyncio_detailed(client=client)
    """
    load_dotenv()

    # Handle backwards compatibility: accept 'token' kwarg as alias for 'api_key'
    if "token" in httpx_kwargs:
        if api_key is not None:
            raise ValueError("Cannot specify both 'api_key' and 'token' parameters")
        api_key = httpx_kwargs.pop("token")

    # Determine base_url early so we can use it for netrc lookup
    base_url = (
        base_url or os.getenv("KATANA_BASE_URL") or "https://api.katanamrp.com/v1"
    )

    # Setup credentials with priority: param > env (including .env) > netrc
    api_key = (
        api_key or os.getenv("KATANA_API_KEY") or self._read_from_netrc(base_url)
    )

    if not api_key:
        raise ValueError(
            "API key required via: api_key param, KATANA_API_KEY env var, "
            ".env file, or ~/.netrc"
        )

    self.logger: Logger = logger or logging.getLogger(__name__)
    self.max_pages = max_pages

    # Warn if SSL verification is disabled — risk of MITM attacks
    if httpx_kwargs.get("verify") is False:
        self.logger.warning(
            "SSL certificate verification is disabled (verify=False). "
            "This exposes the connection to MITM attacks. "
            "Only use this for local development."
        )

    # Domain class instances (lazy-loaded)
    self._products: Products | None = None
    self._materials: Materials | None = None
    self._variants: Variants | None = None
    self._services: Services | None = None
    self._api_namespace: ApiNamespace | None = None

    # Extract client-level parameters that shouldn't go to the transport
    # Event hooks for observability - start with our defaults
    event_hooks: dict[str, list[Callable[[httpx.Response], Awaitable[None]]]] = {
        "response": [
            self._capture_pagination_metadata,
            self._log_response_metrics,
        ]
    }

    # Extract and merge user hooks
    user_hooks = httpx_kwargs.pop("event_hooks", {})
    for event, hooks in user_hooks.items():
        # Normalize to list and add to existing or create new event
        hook_list = cast(
            list[Callable[[httpx.Response], Awaitable[None]]],
            hooks if isinstance(hooks, list) else [hooks],
        )
        if event in event_hooks:
            event_hooks[event].extend(hook_list)
        else:
            event_hooks[event] = hook_list

    # Check if user wants to override the transport entirely
    custom_transport = httpx_kwargs.pop("transport", None) or httpx_kwargs.pop(
        "async_transport", None
    )

    if custom_transport:
        # User provided a custom transport, use it as-is
        transport = custom_transport
    else:
        # Separate transport-specific kwargs from client-specific kwargs
        # Client-specific params that should NOT go to the transport
        client_only_params = ["headers", "cookies", "params", "auth"]
        client_kwargs = {
            k: httpx_kwargs.pop(k)
            for k in list(httpx_kwargs.keys())
            if k in client_only_params
        }

        # Create resilient transport with remaining transport-specific httpx_kwargs
        # These will be passed to the base AsyncHTTPTransport (http2, limits, verify, etc.)
        transport = ResilientAsyncTransport(
            max_retries=max_retries,
            max_pages=max_pages,
            requests_per_minute=requests_per_minute,
            logger=self.logger,
            **httpx_kwargs,  # Pass through http2, limits, verify, cert, trust_env, etc.
        )

        # Put client-specific params back into httpx_kwargs for the parent class
        httpx_kwargs.update(client_kwargs)

    # Initialize the parent AuthenticatedClient
    super().__init__(
        base_url=base_url,
        token=api_key,
        timeout=httpx.Timeout(timeout),
        httpx_args={
            "transport": transport,
            "event_hooks": event_hooks,
            **httpx_kwargs,  # Include any remaining client-level kwargs
        },
    )
Attributes
api property

Thin CRUD wrappers for all API resources. Returns raw attrs models.

Example

>>> async with KatanaClient() as client:
...     products = await client.api.products.list(is_sellable=True)
...     product = await client.api.products.get(123)
...     await client.api.products.delete(123)

materials property

Access material catalog operations.

Returns:

  • Materials

    Materials instance for material CRUD operations.

Example

>>> async with KatanaClient() as client:
...     materials = await client.materials.list()
...     material = await client.materials.get(123)

products property

Access product catalog operations.

Returns:

  • Products

    Products instance for product CRUD and search operations.

Example

>>> async with KatanaClient() as client:
...     # Product CRUD
...     products = await client.products.list(is_sellable=True)
...     product = await client.products.get(123)
...     results = await client.products.search("widget")

services property

Access service catalog operations.

Returns:

  • Services

    Services instance for service CRUD operations.

Example

>>> async with KatanaClient() as client:
...     services = await client.services.list()
...     service = await client.services.get(123)

variants property

Access variant catalog operations.

Returns:

  • Variants

    Variants instance for variant CRUD operations.

Example

>>> async with KatanaClient() as client:
...     variants = await client.variants.list()
...     variant = await client.variants.get(123)

PaginationTransport(wrapped_transport=None, max_pages=100, logger=None, **kwargs)

Bases: AsyncBaseTransport

Transport layer that adds automatic pagination for GET requests.

This transport wraps another transport and automatically collects all pages for GET requests by default. Inherits from AsyncBaseTransport (not AsyncHTTPTransport) so we don't spin up an unused connection pool inside this layer; all I/O goes through the wrapped transport.

Auto-pagination behavior:

  • ON by default for GET requests with NO page parameter in URL
  • Uses 250 items per page (Katana's max) when no limit specified by caller
  • If caller specifies a limit, that limit is used (caller's choice)
  • ANY explicit page parameter in URL disables auto-pagination (e.g., ?page=1)
  • Disabled when request has extensions={"auto_pagination": False}
  • Only applies to GET requests (POST, PUT, etc. are never paginated)

Controlling pagination limits:

  • max_pages (constructor): Maximum number of pages to fetch
  • max_items (extension): Maximum total items to collect, e.g., extensions={"max_items": 200} stops after 200 items
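
How max_pages and max_items interact can be sketched as a plain collection loop. This is a simplified illustration rather than the transport's actual `_handle_paginated_request`, which is not shown on this page: `collect_pages` and the `fetch_page` callable are hypothetical, and treating a short page as the last page is an assumption (the real transport may rely on pagination metadata instead).

```python
from __future__ import annotations

from collections.abc import Callable


def collect_pages(
    fetch_page: Callable[[int, int], list],
    max_pages: int = 100,
    max_items: int | None = None,
    page_size: int = 250,
) -> list:
    """Collect items page by page until a limit or the end of data is reached."""
    items: list = []
    for page in range(1, max_pages + 1):  # never fetch more than max_pages pages
        batch = fetch_page(page, page_size)
        items.extend(batch)
        # max_items caps the total; trim any overshoot from the last page.
        if max_items is not None and len(items) >= max_items:
            return items[:max_items]
        # Assumption: a page shorter than page_size means no more data.
        if len(batch) < page_size:
            break
    return items
```

With 600 items available and the default page size of 250, this loop fetches three pages; with max_items=200 it stops after the first page and returns 200 items.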

Parameters:

  • wrapped_transport (AsyncBaseTransport | None, default: None ) –

    The transport to wrap. If None, creates a new AsyncHTTPTransport.

  • max_pages (int, default: 100 ) –

    Maximum number of pages to collect during auto-pagination. Defaults to 100.

  • logger (Logger | None, default: None ) –

    Logger instance for capturing pagination operations. If None, creates a default logger.

  • **kwargs (Any, default: {} ) –

    Additional arguments passed to AsyncHTTPTransport if wrapped_transport is None.

Source code in katana_public_api_client/katana_client.py
def __init__(
    self,
    wrapped_transport: AsyncBaseTransport | None = None,
    max_pages: int = 100,
    logger: Logger | None = None,
    **kwargs: Any,
):
    """
    Initialize the pagination transport.

    Args:
        wrapped_transport: The transport to wrap. If None, creates a new AsyncHTTPTransport.
        max_pages: Maximum number of pages to collect during auto-pagination. Defaults to 100.
        logger: Logger instance for capturing pagination operations. If None, creates a default logger.
        **kwargs: Additional arguments passed to AsyncHTTPTransport if wrapped_transport is None.
    """
    if wrapped_transport is None:
        wrapped_transport = AsyncHTTPTransport(**kwargs)

    self._wrapped_transport = wrapped_transport
    self.max_pages = max_pages
    self.logger: Logger = logger or logging.getLogger(__name__)
Functions
aclose() async

Propagate close down the wrapped chain so inner transports release resources.

Source code in katana_public_api_client/katana_client.py
async def aclose(self) -> None:
    """Propagate close down the wrapped chain so inner transports release resources."""
    await self._wrapped_transport.aclose()
handle_async_request(request) async

Handle request with automatic pagination for GET requests.

Auto-pagination is ON by default for GET requests. It is disabled when:

  • extensions={"auto_pagination": False} is set, OR
  • ANY explicit page parameter is in the URL (e.g., ?page=1 or ?page=2)

To get auto-pagination, simply don't pass a page parameter. The transport will automatically use 250 items per page (Katana's max) unless you specify a limit, in which case your limit will be respected.

Source code in katana_public_api_client/katana_client.py
async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
    """Handle request with automatic pagination for GET requests.

    Auto-pagination is ON by default for GET requests. It is disabled when:
    - `extensions={"auto_pagination": False}` is set, OR
    - ANY explicit `page` parameter is in the URL (e.g., `?page=1` or `?page=2`)

    To get auto-pagination, simply don't pass a page parameter. The transport
    will automatically use 250 items per page (Katana's max) unless you specify
    a limit, in which case your limit will be respected.
    """
    # Check if auto-pagination is explicitly disabled via request extensions
    auto_pagination = request.extensions.get("auto_pagination", True)

    # ANY page param in URL disables auto-pagination - caller wants specific page
    has_explicit_page = "page" in request.url.params

    # Only paginate GET requests when auto_pagination is enabled and no explicit page
    should_paginate = (
        request.method == "GET" and auto_pagination and not has_explicit_page
    )

    if should_paginate:
        return await self._handle_paginated_request(request)
    else:
        # For non-paginated requests, just pass through to wrapped transport
        return await self._wrapped_transport.handle_async_request(request)
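
The decision made at the top of this method can be tried out without httpx. The sketch below mirrors the three conditions using only the standard library; `should_paginate` is a hypothetical helper that takes the method, URL, and extensions as plain values instead of an httpx.Request.

```python
from __future__ import annotations

from urllib.parse import parse_qs, urlsplit


def should_paginate(method: str, url: str, extensions: dict | None = None) -> bool:
    """Return True when a request qualifies for auto-pagination."""
    extensions = extensions or {}
    # Opt-out switch carried on the request; defaults to enabled.
    auto_pagination = bool(extensions.get("auto_pagination", True))
    # Any "page" query parameter, even an empty one, means a specific page was requested.
    query = parse_qs(urlsplit(url).query, keep_blank_values=True)
    has_explicit_page = "page" in query
    return method.upper() == "GET" and auto_pagination and not has_explicit_page


# A few representative requests:
print(should_paginate("GET", "https://api.katanamrp.com/v1/products"))
print(should_paginate("GET", "https://api.katanamrp.com/v1/products?page=2&limit=50"))
print(should_paginate("POST", "https://api.katanamrp.com/v1/products"))
```

Only the first call qualifies: the second carries an explicit page parameter and the third is not a GET.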

RateLimitAwareRetry(*args, **kwargs)

Bases: Retry

Custom Retry class that allows non-idempotent methods (POST, PATCH) to be retried ONLY when receiving a 429 (Too Many Requests) status code.

For all other retryable status codes (502, 503, 504), only idempotent methods (HEAD, GET, PUT, DELETE, OPTIONS, TRACE) will be retried.

This ensures we don't accidentally retry non-idempotent operations after server errors, but we DO retry them when we're being rate-limited.
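
The rule above reduces to a small decision table. The following sketch expresses it as a standalone function; `should_retry` and the two constants are illustrative names, and the status list (429, 502, 503, 504) is taken from the description here rather than from the client's actual Retry configuration.

```python
from http import HTTPStatus

# Methods named as idempotent in the description above.
IDEMPOTENT_METHODS = frozenset({"HEAD", "GET", "PUT", "DELETE", "OPTIONS", "TRACE"})
# Assumed retryable statuses: rate limiting plus the listed gateway errors.
STATUS_FORCELIST = frozenset({429, 502, 503, 504})


def should_retry(method: str, status_code: int) -> bool:
    """Return True when a response with this status may be retried for this method."""
    if status_code not in STATUS_FORCELIST:
        return False
    # Being rate-limited means the request was not processed, so any method is safe to retry.
    if status_code == HTTPStatus.TOO_MANY_REQUESTS:
        return True
    # After a server error the request may have been applied; retry only idempotent methods.
    return method.upper() in IDEMPOTENT_METHODS
```

Under these assumptions a POST is retried on 429 but not on 503, while a GET is retried on both.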

Source code in katana_public_api_client/katana_client.py
def __init__(self, *args: Any, **kwargs: Any):
    """Initialize and track the current request method."""
    super().__init__(*args, **kwargs)
    self._current_method: str | None = None
Functions
increment()

Return a new retry instance with the attempt count incremented.

Source code in katana_public_api_client/katana_client.py
def increment(self) -> "RateLimitAwareRetry":
    """Return a new retry instance with the attempt count incremented."""
    # Call parent's increment which creates a new instance of our class
    new_retry = cast(RateLimitAwareRetry, super().increment())
    # Preserve the current method across retry attempts
    new_retry._current_method = self._current_method
    return new_retry
is_retryable_method(method)

Record the method and apply the allowed_methods check.

Stores the method for later use in is_retryable_status_code, which applies the per-status filtering.

Source code in katana_public_api_client/katana_client.py
def is_retryable_method(self, method: str) -> bool:
    """
    Record the method and apply the allowed_methods check.

    Stores the method for later use in is_retryable_status_code, which
    applies the per-status filtering.
    """
    self._current_method = method.upper()
    # Accept any method listed in allowed_methods; the status-specific
    # filtering happens in is_retryable_status_code
    return self._current_method in self.allowed_methods
is_retryable_status_code(status_code)

Check if a status code is retryable for the current method.

For 429 (rate limiting), allow all methods. For other errors (502, 503, 504), only allow idempotent methods.

Source code in katana_public_api_client/katana_client.py
def is_retryable_status_code(self, status_code: int) -> bool:
    """
    Check if a status code is retryable for the current method.

    For 429 (rate limiting), allow all methods.
    For other errors (502, 503, 504), only allow idempotent methods.
    """
    # First check if the status code is in the allowed list at all
    if status_code not in self.status_forcelist:
        return False

    # If we don't know the method, fall back to default behavior
    if self._current_method is None:
        return True

    # Rate limiting (429) - retry all methods
    if status_code == HTTPStatus.TOO_MANY_REQUESTS:
        return True

    # Other retryable errors - only retry idempotent methods
    return self._current_method in self.IDEMPOTENT_METHODS

RateLimitTransport(wrapped_transport=None, *, requests_per_minute=60, logger=None, **kwargs)

Bases: AsyncBaseTransport

Proactive rate-limiter that respects Katana's X-Ratelimit-* headers.

Wraps another transport and gates each request through a pyrate-limiter token bucket sized for Katana's documented rate limit (60 req/min by default, X-Ratelimit-Limit per the spec). After every response the transport reads X-Ratelimit-Remaining / X-Ratelimit-Reset and adapts:

  • Sync down: when the server reports fewer remaining tokens than our local estimate (e.g., another client is sharing the API key), drain the local bucket to match. We never sync up — the server is authoritative on the lower bound only.
  • Reset gate: when remaining hits 0, an asyncio.Event blocks all future requests until X-Ratelimit-Reset elapses. This prevents pyrate's bucket from racing ahead of Katana's window.
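
The reset gate can be pictured as an asyncio.Event that starts open, is cleared when the budget is exhausted, and is set again by a timer. The sketch below is a simplified stand-in: `ResetGate` and its methods are hypothetical names, and it omits the latest-deadline tracking and locking that the real transport performs.

```python
import asyncio


class ResetGate:
    """Blocks callers while a rate-limit window is exhausted."""

    def __init__(self) -> None:
        self._event = asyncio.Event()
        self._event.set()  # open: no active reset window

    @property
    def is_open(self) -> bool:
        return self._event.is_set()

    def engage(self, delay_seconds: float) -> None:
        """Close the gate now and reopen it after delay_seconds."""
        self._event.clear()
        # Must be called from inside a running event loop.
        asyncio.get_running_loop().call_later(delay_seconds, self._event.set)

    async def wait(self) -> None:
        """Return immediately when open, otherwise wait for the reopen timer."""
        await self._event.wait()


async def demo() -> None:
    gate = ResetGate()
    gate.engage(0.05)   # server reported remaining == 0
    await gate.wait()   # every request would pause here until the window resets
    print("gate open:", gate.is_open)


if __name__ == "__main__":
    asyncio.run(demo())
```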

Stack placement is innermost (above the base AsyncHTTPTransport): every actual HTTP request — including retries from RetryTransport above and per-page paginated fetches from PaginationTransport — consumes one token, matching how Katana counts requests server-side.

Retry-After waiting on 429 responses stays in RetryTransport (urllib3.Retry honors the header via respect_retry_after_header=True). Sleeping in this transport on 429 would double-delay; we only update the reset gate from headers and let retry handle the actual wait.

Out-of-order responses are handled correctly: the sync-down logic only fires when the response's remaining is below the current estimate, so a delayed earlier response with a higher remaining value won't overwrite a fresher (lower) estimate.
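
A minimal sketch of the sync-down rule, including the optimistic debit applied before each request, is shown below. `RemainingEstimate` is a hypothetical class that keeps only the counter; the real transport also drains the pyrate-limiter bucket and guards updates with a lock.

```python
class RemainingEstimate:
    """Local estimate of the server-side remaining budget."""

    def __init__(self, budget: int) -> None:
        self.value = budget

    def debit(self) -> None:
        """Account for a request that is about to be sent."""
        self.value = max(0, self.value - 1)

    def observe(self, server_remaining: int) -> int:
        """Sync down to the server's value; return how many tokens to drain.

        A higher (or equal) server value is ignored, so a delayed response
        carrying a stale, larger number cannot raise the estimate.
        """
        if server_remaining >= self.value:
            return 0
        drained = self.value - server_remaining
        self.value = server_remaining
        return drained


est = RemainingEstimate(60)
est.debit()                # local estimate: 59
print(est.observe(59))     # server agrees, nothing to drain
print(est.observe(40))     # another client used budget, drain the difference
print(est.observe(55))     # stale higher value is ignored
```

Without the debit step, the first observation would always report one token fewer than the local estimate and trigger a redundant drain on every request.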

Inherits from AsyncBaseTransport (not AsyncHTTPTransport) because we delegate every request to _wrapped_transport — there's no need to spin up an unused connection pool inside this layer.

Parameters:

  • wrapped_transport (AsyncBaseTransport | None, default: None ) –

    The transport to wrap. If None, creates a new AsyncHTTPTransport.

  • requests_per_minute (int, default: 60 ) –

    Steady-state request budget. Must be > 0; callers wanting to disable the limiter entirely should omit this transport from the chain rather than passing 0.

  • logger (Logger | None, default: None ) –

    Logger instance for capturing state changes. If None, creates a default logger.

  • **kwargs (Any, default: {} ) –

    Additional arguments passed to AsyncHTTPTransport if wrapped_transport is None.

Source code in katana_public_api_client/katana_client.py
def __init__(
    self,
    wrapped_transport: AsyncBaseTransport | None = None,
    *,
    requests_per_minute: int = 60,
    logger: Logger | None = None,
    **kwargs: Any,
) -> None:
    """Initialize the rate-limit transport.

    Args:
        wrapped_transport: The transport to wrap. If None, creates a new
            AsyncHTTPTransport.
        requests_per_minute: Steady-state request budget. Must be ``> 0``;
            callers wanting to disable the limiter entirely should omit
            this transport from the chain rather than passing 0.
        logger: Logger instance for capturing state changes. If None,
            creates a default logger.
        **kwargs: Additional arguments passed to AsyncHTTPTransport if
            wrapped_transport is None.
    """
    if requests_per_minute <= 0:
        msg = (
            f"requests_per_minute must be positive, got {requests_per_minute}; "
            "to disable rate limiting, omit this transport from the chain"
        )
        raise ValueError(msg)
    if wrapped_transport is None:
        wrapped_transport = AsyncHTTPTransport(**kwargs)
    self._wrapped_transport = wrapped_transport
    self._rpm = requests_per_minute
    self._limiter = Limiter(
        Rate(requests_per_minute, Duration.MINUTE), buffer_ms=50
    )
    self._reset_gate = asyncio.Event()
    self._reset_gate.set()  # initially open — no active reset window
    self._reset_handle: asyncio.TimerHandle | None = None
    # Epoch-ms deadline of the active gate (or ``None`` when the gate is
    # open). Tracks the *latest* observed reset so out-of-order responses
    # with an earlier deadline can't shorten the gate, and so a timer
    # callback whose deadline has been superseded can no-op.
    self._reset_until_epoch_ms: int | None = None
    self._release_task: asyncio.Task[None] | None = None
    self._lock = asyncio.Lock()
    self._estimated_remaining = requests_per_minute
    self.logger: Logger = logger or logging.getLogger(__name__)
Functions
aclose() async

Cancel the pending reset timer and any in-flight release, then close the wrapped transport.

Two cancellation paths must run before delegating aclose down the chain:

  1. _reset_handle — the loop.call_later callback. If it fires after the loop is cleaned up, we get scheduling errors.
  2. _release_task — the async _release_reset_gate task spawned when the timer fired. If the timer fired just before shutdown, the task may still be running (waiting on self._lock); without explicit cancel + await, asyncio emits "Task was destroyed but it is pending" and the task can race with the wrapped transport's own shutdown.
Source code in katana_public_api_client/katana_client.py
async def aclose(self) -> None:
    """Cancel the pending reset timer and any in-flight release, then close the wrapped transport.

    Two cancellation paths must run before delegating ``aclose`` down
    the chain:

    1. ``_reset_handle`` — the ``loop.call_later`` callback. If it
       fires after the loop is cleaned up, we get scheduling errors.
    2. ``_release_task`` — the async ``_release_reset_gate`` task
       spawned when the timer fired. If the timer fired *just before*
       shutdown, the task may still be running (waiting on
       ``self._lock``); without explicit cancel + await, asyncio
       emits "Task was destroyed but it is pending" and the task can
       race with the wrapped transport's own shutdown.
    """
    if self._reset_handle is not None and not self._reset_handle.cancelled():
        self._reset_handle.cancel()
    self._reset_handle = None

    if self._release_task is not None and not self._release_task.done():
        self._release_task.cancel()
        # Suppress the cancellation so it doesn't propagate; we just
        # want the task off the loop before we close the wrapped
        # transport.
        with contextlib.suppress(asyncio.CancelledError):
            await self._release_task
    self._release_task = None

    await self._wrapped_transport.aclose()
handle_async_request(request) async

Acquire a token, forward the request, and observe rate-limit headers.

Source code in katana_public_api_client/katana_client.py
async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
    """Acquire a token, forward the request, and observe rate-limit headers."""
    # Block on any active reset window (set when remaining hit 0 on a
    # prior response). Acts as the override on top of pyrate's bucket
    # so we don't fire the burst-budget into a window the server has
    # already declared exhausted.
    await self._reset_gate.wait()

    await self._limiter.try_acquire_async(name=_RATE_LIMIT_BUCKET_NAME)

    # Re-check the gate after acquiring. While this request was queued
    # on pyrate's bucket, a concurrent response observer may have seen
    # ``X-Ratelimit-Remaining: 0`` and engaged the gate; without the
    # second wait, we'd slip past it into the now-exhausted window.
    # Loop until the gate is stably open through both checks: an even
    # later engage during this wait should re-block us. The acquired
    # token is held across the wait — pyrate refills its bucket over
    # the window naturally, so this is not a wasted budget.
    while not self._reset_gate.is_set():
        await self._reset_gate.wait()

    # Optimistically debit our local estimate to match what the server
    # is about to see. Without this, the server's ``X-Ratelimit-Remaining``
    # response would always be one lower than our untouched estimate,
    # causing ``_observe_response`` to drain a redundant token on every
    # request and effectively halve our usable budget. The lock keeps
    # the debit and any concurrent sync-down consistent.
    async with self._lock:
        self._estimated_remaining = max(0, self._estimated_remaining - 1)

    response = await self._wrapped_transport.handle_async_request(request)

    await self._observe_response(response)
    return response

Functions

ResilientAsyncTransport(max_retries=5, max_pages=100, logger=None, *, requests_per_minute=60, **kwargs)

Factory function that creates a chained transport with error logging, pagination, rate limiting, and retry capabilities.

This function chains multiple transport layers (innermost → outermost):

  1. AsyncHTTPTransport (base HTTP transport)
  2. RateLimitTransport (proactive, header-aware throttle; 60 requests per minute by default, omitted when requests_per_minute is None)
  3. ErrorLoggingTransport (logs detailed 4xx errors)
  4. PaginationTransport (auto-collects paginated responses)
  5. RetryTransport (handles retries with Retry-After header support)

The rate limiter is the innermost wrapper, directly above the base transport, because Katana counts every HTTP request: retries from the outer RetryTransport and individual pages fetched by PaginationTransport all consume server-side budget. A limiter placed higher in the chain would see one logical call where the server sees several requests, and would under-count.
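
To make the ordering argument concrete, here is a toy model of the chain in plain Python. `Counter`, `paginate`, `retry`, and `flaky_server` are hypothetical stand-ins written for illustration, not the library's transports; they only count how many calls reach each level.

```python
import asyncio
from typing import Awaitable, Callable

Handler = Callable[[int], Awaitable[int]]  # page number -> HTTP status code


class Counter:
    """Counts every call that passes through, as a rate limiter would."""

    def __init__(self, inner: Handler) -> None:
        self.inner = inner
        self.seen = 0

    async def __call__(self, page: int) -> int:
        self.seen += 1
        return await self.inner(page)


def paginate(inner: Handler, pages: int) -> Handler:
    """Fan one logical call out into `pages` fetches; stop on the first error."""

    async def handler(_page: int) -> int:
        for page in range(1, pages + 1):
            status = await inner(page)
            if status != 200:
                return status
        return 200

    return handler


def retry(inner: Handler, attempts: int) -> Handler:
    """Repeat the whole call while the status is 429, up to `attempts` times."""

    async def handler(page: int) -> int:
        status = 429
        for _ in range(attempts):
            status = await inner(page)
            if status != 429:
                break
        return status

    return handler


def flaky_server() -> Handler:
    """Toy server: the first request is rejected with 429, the rest succeed."""
    calls = 0

    async def handler(_page: int) -> int:
        nonlocal calls
        calls += 1
        return 429 if calls == 1 else 200

    return handler


async def demo() -> tuple[int, int, int]:
    inner = Counter(flaky_server())  # position the factory uses for the limiter
    outer = Counter(retry(paginate(inner, pages=3), attempts=2))  # hypothetical outer position
    status = await outer(1)
    return status, inner.seen, outer.seen
```

Running `asyncio.run(demo())` gives `(200, 4, 1)` in this toy model: four requests reached the server (one rejected attempt plus three pages), while a counter placed above the retry layer saw only one call.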

Parameters:

  • max_retries (int, default: 5 ) –

    Maximum number of retry attempts for failed requests. Defaults to 5.

  • max_pages (int, default: 100 ) –

    Maximum number of pages to collect during auto-pagination. Defaults to 100.

  • requests_per_minute (int | None, default: 60 ) –

    Steady-state request budget for the rate-limit transport. Defaults to 60 (Katana's documented default). Pass None to omit the rate-limit layer entirely (e.g. when the caller is responsible for throttling, or for tests that need raw throughput).

  • logger (Logger | None, default: None ) –

    Logger instance for capturing operations. If None, creates a default logger.

  • **kwargs (Any, default: {} ) –

    Additional arguments passed to the base AsyncHTTPTransport. Common parameters include:

      - http2 (bool): Enable HTTP/2 support
      - limits (httpx.Limits): Connection pool limits
      - verify (bool | str | ssl.SSLContext): SSL certificate verification
      - cert (str | tuple): Client-side certificates
      - trust_env (bool): Trust environment variables for proxy configuration

Returns:

  • RetryTransport

    A RetryTransport instance wrapping all the layered transports.

Note

When using a custom transport, parameters like http2, limits, and verify must be passed to this factory function (which passes them to the base AsyncHTTPTransport), not to the httpx.Client/AsyncClient constructor.

Example
transport = ResilientAsyncTransport(max_retries=3, max_pages=50)
async with httpx.AsyncClient(transport=transport) as client:
    response = await client.get("https://api.example.com/items")
Source code in katana_public_api_client/katana_client.py
def ResilientAsyncTransport(
    max_retries: int = 5,
    max_pages: int = 100,
    logger: Logger | None = None,
    *,
    requests_per_minute: int | None = 60,
    **kwargs: Any,
) -> RetryTransport:
    """
    Factory function that creates a chained transport with error logging,
    pagination, rate limiting, and retry capabilities.

    This function chains multiple transport layers (innermost → outermost):
    1. AsyncHTTPTransport (base HTTP transport)
    2. RateLimitTransport (proactive throttle, 60 req/min by default, header-aware)
    3. ErrorLoggingTransport (logs detailed 4xx errors)
    4. PaginationTransport (auto-collects paginated responses)
    5. RetryTransport (handles retries with Retry-After header support)

    The rate limiter is innermost (above the base) because Katana counts
    *every* HTTP request — retries from the outer ``RetryTransport`` and
    individual paginated pages from ``PaginationTransport`` all consume
    server-side budget. Placing the limiter higher up would under-count.

    Args:
        max_retries: Maximum number of retry attempts for failed requests. Defaults to 5.
        max_pages: Maximum number of pages to collect during auto-pagination. Defaults to 100.
        requests_per_minute: Steady-state request budget for the rate-limit
            transport. Defaults to 60 (Katana's documented default). Pass ``None``
            to omit the rate-limit layer entirely (e.g. when the caller is
            responsible for throttling, or for tests that need raw throughput).
        logger: Logger instance for capturing operations. If None, creates a default logger.
        **kwargs: Additional arguments passed to the base AsyncHTTPTransport.
            Common parameters include:
            - http2 (bool): Enable HTTP/2 support
            - limits (httpx.Limits): Connection pool limits
            - verify (bool | str | ssl.SSLContext): SSL certificate verification
            - cert (str | tuple): Client-side certificates
            - trust_env (bool): Trust environment variables for proxy configuration

    Returns:
        A RetryTransport instance wrapping all the layered transports.

    Note:
        When using a custom transport, parameters like http2, limits, and verify
        must be passed to this factory function (which passes them to the base
        AsyncHTTPTransport), not to the httpx.Client/AsyncClient constructor.

    Example:
        ```python
        transport = ResilientAsyncTransport(max_retries=3, max_pages=50)
        async with httpx.AsyncClient(transport=transport) as client:
            response = await client.get("https://api.example.com/items")
        ```
    """
    resolved_logger: Logger = (
        logger if logger is not None else logging.getLogger(__name__)
    )

    # Build the transport chain from inside out:
    # 1. Base AsyncHTTPTransport
    inner_transport: AsyncBaseTransport = AsyncHTTPTransport(**kwargs)

    # 2. Wrap with rate limiting (innermost wrapping layer — every actual
    #    HTTP request, including retries and per-page paginated fetches,
    #    consumes one token. ``None`` skips this layer entirely.)
    if requests_per_minute is not None:
        inner_transport = RateLimitTransport(
            wrapped_transport=inner_transport,
            requests_per_minute=requests_per_minute,
            logger=resolved_logger,
        )

    # 3. Wrap with error logging
    error_logging_transport = ErrorLoggingTransport(
        wrapped_transport=inner_transport,
        logger=resolved_logger,
    )

    # 4. Wrap with pagination
    pagination_transport = PaginationTransport(
        wrapped_transport=error_logging_transport,
        max_pages=max_pages,
        logger=resolved_logger,
    )

    # Finally wrap with retry logic (outermost layer)
    # Use RateLimitAwareRetry which:
    # - Retries ALL methods (including POST/PATCH) for 429 rate limiting
    # - Retries ONLY idempotent methods for server errors (502, 503, 504)
    retry = RateLimitAwareRetry(
        total=max_retries,
        backoff_factor=1.0,  # Exponential backoff: 1, 2, 4, 8, 16 seconds
        respect_retry_after_header=True,  # Honor server's Retry-After header
        status_forcelist=[
            429,
            502,
            503,
            504,
        ],  # Status codes that should trigger retries
        allowed_methods=[
            "HEAD",
            "GET",
            "PUT",
            "DELETE",
            "OPTIONS",
            "TRACE",
            "POST",
            "PATCH",
        ],
    )
    retry_transport = RetryTransport(
        transport=pagination_transport,
        retry=retry,
    )

    return retry_transport
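
The retry configuration above can be summarized in a small stand-alone sketch. `should_retry` and `backoff_delays` are hypothetical helpers written for illustration, not part of `RateLimitAwareRetry`. The method split and the doubling schedule are taken from the comments in the source; actual delays may differ, for example when the server sends a Retry-After header.

```python
# Assumption: the six allowed methods that HTTP defines as idempotent.
IDEMPOTENT_METHODS = frozenset({"HEAD", "GET", "PUT", "DELETE", "OPTIONS", "TRACE"})
SERVER_ERRORS = frozenset({502, 503, 504})


def should_retry(method: str, status: int) -> bool:
    """Retry 429 for any method; retry 502/503/504 only for idempotent methods."""
    if status == 429:
        return True
    return status in SERVER_ERRORS and method.upper() in IDEMPOTENT_METHODS


def backoff_delays(backoff_factor: float, total: int) -> list[float]:
    """Delays in seconds before each retry, doubling from `backoff_factor`."""
    return [backoff_factor * 2**attempt for attempt in range(total)]
```

Under these assumptions, a rate-limited POST would be retried, a POST that fails with 503 would not, and `backoff_delays(1.0, 5)` reproduces the 1, 2, 4, 8, 16 second schedule mentioned in the source comment.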