Skip to content

katana_public_api_client.katana_client

katana_public_api_client.katana_client

KatanaClient - The pythonic Katana API client with automatic resilience.

This client uses httpx's native transport layer to provide automatic retries, rate limiting, error handling, and pagination for all API calls without any decorators or wrapper methods needed.

Classes

ErrorLoggingTransport(wrapped_transport=None, logger=None, **kwargs)

Bases: AsyncHTTPTransport

Transport layer that adds detailed error logging for 4xx client errors.

This transport wraps another AsyncHTTPTransport and intercepts responses to log detailed error information using the generated error models.

Parameters:

  • wrapped_transport (AsyncHTTPTransport | None, default: None ) –

    The transport to wrap. If None, creates a new AsyncHTTPTransport.

  • logger (Logger | None, default: None ) –

    Logger instance for capturing error details. If None, creates a default logger.

  • **kwargs (Any, default: {} ) –

    Additional arguments passed to AsyncHTTPTransport if wrapped_transport is None.

Source code in katana_public_api_client/katana_client.py
def __init__(
    self,
    wrapped_transport: AsyncHTTPTransport | None = None,
    logger: logging.Logger | None = None,
    **kwargs: Any,
):
    """
    Initialize the error logging transport.

    Args:
        wrapped_transport: The transport to wrap. If None, creates a new AsyncHTTPTransport.
        logger: Logger instance for capturing error details. If None, creates a default logger.
        **kwargs: Additional arguments passed to AsyncHTTPTransport if wrapped_transport is None.
    """
    super().__init__()
    if wrapped_transport is None:
        wrapped_transport = AsyncHTTPTransport(**kwargs)
    self._wrapped_transport = wrapped_transport
    self.logger = logger or logging.getLogger(__name__)
Functions
handle_async_request(request) async

Handle request and log detailed error information for 4xx responses.

Source code in katana_public_api_client/katana_client.py
async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
    """Handle request and log detailed error information for 4xx responses."""
    response = await self._wrapped_transport.handle_async_request(request)

    # Log detailed information for 400-level client errors
    if 400 <= response.status_code < 500:
        await self._log_client_error(response, request)

    return response

KatanaClient(api_key=None, base_url=None, timeout=30.0, max_retries=5, max_pages=100, logger=None, **httpx_kwargs)

Bases: AuthenticatedClient

The pythonic Katana API client with automatic resilience and pagination.

This client inherits from AuthenticatedClient and can be passed directly to generated API methods without needing the .client property.

Features: - Automatic retries on network errors and server errors (5xx) - Automatic rate limit handling with Retry-After header support - Smart auto-pagination that detects and handles paginated responses automatically - Rich logging and observability - Minimal configuration - just works out of the box

Usage
Auto-pagination happens automatically - just call the API

async with KatanaClient() as client: from katana_public_api_client.api.product import get_all_products

# This automatically collects all pages if pagination is detected
response = await get_all_products.asyncio_detailed(
    client=client,  # Pass client directly - no .client needed!
    limit=50  # All pages collected automatically
)

# Get specific page only (add page=X to disable auto-pagination)
response = await get_all_products.asyncio_detailed(
    client=client,
    page=1,      # Get specific page
    limit=100    # Set page size
)

# Control max pages globally
client_limited = KatanaClient(max_pages=5)  # Limit to 5 pages max

Parameters:

  • api_key (str | None, default: None ) –

    Katana API key. If None, will try to load from KATANA_API_KEY env var, .env file, or ~/.netrc file (in that order).

  • base_url (str | None, default: None ) –

    Base URL for the Katana API. Defaults to https://api.katanamrp.com/v1

  • timeout (float, default: 30.0 ) –

    Request timeout in seconds. Defaults to 30.0.

  • max_retries (int, default: 5 ) –

    Maximum number of retry attempts for failed requests. Defaults to 5.

  • max_pages (int, default: 100 ) –

    Maximum number of pages to collect during auto-pagination. Defaults to 100.

  • logger (Logger | None, default: None ) –

    Logger instance for capturing client operations. If None, creates a default logger.

  • **httpx_kwargs (Any, default: {} ) –

    Additional arguments passed to the base AsyncHTTPTransport. Common parameters include: - http2 (bool): Enable HTTP/2 support - limits (httpx.Limits): Connection pool limits - verify (bool | str | ssl.SSLContext): SSL certificate verification - cert (str | tuple): Client-side certificates - trust_env (bool): Trust environment variables for proxy configuration - event_hooks (dict): Custom event hooks (will be merged with built-in hooks)

Raises:

  • ValueError

    If no API key is provided via api_key param, KATANA_API_KEY env var, .env file, or ~/.netrc file.

Note

Transport-related parameters (http2, limits, verify, etc.) are correctly passed to the innermost AsyncHTTPTransport layer, ensuring they take effect even with the layered transport architecture.

Example

async with KatanaClient() as client: ... # All API calls through client get automatic resilience ... response = await some_api_method.asyncio_detailed(client=client)

Source code in katana_public_api_client/katana_client.py
def __init__(
    self,
    api_key: str | None = None,
    base_url: str | None = None,
    timeout: float = 30.0,
    max_retries: int = 5,
    max_pages: int = 100,
    logger: logging.Logger | None = None,
    **httpx_kwargs: Any,
):
    """
    Initialize the Katana API client with automatic resilience features.

    Args:
        api_key: Katana API key. If None, will try to load from KATANA_API_KEY env var,
            .env file, or ~/.netrc file (in that order).
        base_url: Base URL for the Katana API. Defaults to https://api.katanamrp.com/v1
        timeout: Request timeout in seconds. Defaults to 30.0.
        max_retries: Maximum number of retry attempts for failed requests. Defaults to 5.
        max_pages: Maximum number of pages to collect during auto-pagination. Defaults to 100.
        logger: Logger instance for capturing client operations. If None, creates a default logger.
        **httpx_kwargs: Additional arguments passed to the base AsyncHTTPTransport.
            Common parameters include:
            - http2 (bool): Enable HTTP/2 support
            - limits (httpx.Limits): Connection pool limits
            - verify (bool | str | ssl.SSLContext): SSL certificate verification
            - cert (str | tuple): Client-side certificates
            - trust_env (bool): Trust environment variables for proxy configuration
            - event_hooks (dict): Custom event hooks (will be merged with built-in hooks)

    Raises:
        ValueError: If no API key is provided via api_key param, KATANA_API_KEY env var,
            .env file, or ~/.netrc file.

    Note:
        Transport-related parameters (http2, limits, verify, etc.) are correctly
        passed to the innermost AsyncHTTPTransport layer, ensuring they take effect
        even with the layered transport architecture.

    Example:
        >>> async with KatanaClient() as client:
        ...     # All API calls through client get automatic resilience
        ...     response = await some_api_method.asyncio_detailed(client=client)
    """
    load_dotenv()

    # Handle backwards compatibility: accept 'token' kwarg as alias for 'api_key'
    if "token" in httpx_kwargs:
        if api_key is not None:
            raise ValueError("Cannot specify both 'api_key' and 'token' parameters")
        api_key = httpx_kwargs.pop("token")

    # Determine base_url early so we can use it for netrc lookup
    base_url = (
        base_url or os.getenv("KATANA_BASE_URL") or "https://api.katanamrp.com/v1"
    )

    # Setup credentials with priority: param > env (including .env) > netrc
    api_key = (
        api_key or os.getenv("KATANA_API_KEY") or self._read_from_netrc(base_url)
    )

    if not api_key:
        raise ValueError(
            "API key required via: api_key param, KATANA_API_KEY env var, "
            ".env file, or ~/.netrc"
        )

    self.logger = logger or logging.getLogger(__name__)
    self.max_pages = max_pages

    # Domain class instances (lazy-loaded)
    self._products: Products | None = None
    self._materials: Materials | None = None
    self._variants: Variants | None = None
    self._services: Services | None = None
    self._inventory: Inventory | None = None

    # Extract client-level parameters that shouldn't go to the transport
    # Event hooks for observability - start with our defaults
    event_hooks: dict[str, list[Callable[[httpx.Response], Awaitable[None]]]] = {
        "response": [
            self._capture_pagination_metadata,
            self._log_response_metrics,
        ]
    }

    # Extract and merge user hooks
    user_hooks = httpx_kwargs.pop("event_hooks", {})
    for event, hooks in user_hooks.items():
        # Normalize to list and add to existing or create new event
        hook_list = cast(
            list[Callable[[httpx.Response], Awaitable[None]]],
            hooks if isinstance(hooks, list) else [hooks],
        )
        if event in event_hooks:
            event_hooks[event].extend(hook_list)
        else:
            event_hooks[event] = hook_list

    # Check if user wants to override the transport entirely
    custom_transport = httpx_kwargs.pop("transport", None) or httpx_kwargs.pop(
        "async_transport", None
    )

    if custom_transport:
        # User provided a custom transport, use it as-is
        transport = custom_transport
    else:
        # Separate transport-specific kwargs from client-specific kwargs
        # Client-specific params that should NOT go to the transport
        client_only_params = ["headers", "cookies", "params", "auth"]
        client_kwargs = {
            k: httpx_kwargs.pop(k)
            for k in list(httpx_kwargs.keys())
            if k in client_only_params
        }

        # Create resilient transport with remaining transport-specific httpx_kwargs
        # These will be passed to the base AsyncHTTPTransport (http2, limits, verify, etc.)
        transport = ResilientAsyncTransport(
            max_retries=max_retries,
            max_pages=max_pages,
            logger=self.logger,
            **httpx_kwargs,  # Pass through http2, limits, verify, cert, trust_env, etc.
        )

        # Put client-specific params back into httpx_kwargs for the parent class
        httpx_kwargs.update(client_kwargs)

    # Initialize the parent AuthenticatedClient
    super().__init__(
        base_url=base_url,
        token=api_key,
        timeout=httpx.Timeout(timeout),
        httpx_args={
            "transport": transport,
            "event_hooks": event_hooks,
            **httpx_kwargs,  # Include any remaining client-level kwargs
        },
    )
Attributes
inventory property

Access inventory and stock operations.

Returns:

  • Inventory

    Inventory instance for stock levels, movements, and adjustments.

Example

async with KatanaClient() as client: ... # Check stock levels ... stock = await client.inventory.check_stock("WIDGET-001") ... low_stock = await client.inventory.list_low_stock(threshold=10)

materials property

Access material catalog operations.

Returns:

  • Materials

    Materials instance for material CRUD operations.

Example

async with KatanaClient() as client: ... materials = await client.materials.list() ... material = await client.materials.get(123)

products property

Access product catalog operations.

Returns:

  • Products

    Products instance for product CRUD and search operations.

Example

async with KatanaClient() as client: ... # Product CRUD ... products = await client.products.list(is_sellable=True) ... product = await client.products.get(123) ... results = await client.products.search("widget")

services property

Access service catalog operations.

Returns:

  • Services

    Services instance for service CRUD operations.

Example

async with KatanaClient() as client: ... services = await client.services.list() ... service = await client.services.get(123)

variants property

Access variant catalog operations.

Returns:

  • Variants

    Variants instance for variant CRUD operations.

Example

async with KatanaClient() as client: ... variants = await client.variants.list() ... variant = await client.variants.get(123)

Functions

PaginationTransport(wrapped_transport=None, max_pages=100, logger=None, **kwargs)

Bases: AsyncHTTPTransport

Transport layer that adds automatic pagination for GET requests.

This transport wraps another transport and automatically collects all pages for requests that use 'page' or 'limit' parameters.

Parameters:

  • wrapped_transport (AsyncHTTPTransport | None, default: None ) –

    The transport to wrap. If None, creates a new AsyncHTTPTransport.

  • max_pages (int, default: 100 ) –

    Maximum number of pages to collect during auto-pagination. Defaults to 100.

  • logger (Logger | None, default: None ) –

    Logger instance for capturing pagination operations. If None, creates a default logger.

  • **kwargs (Any, default: {} ) –

    Additional arguments passed to AsyncHTTPTransport if wrapped_transport is None.

Source code in katana_public_api_client/katana_client.py
def __init__(
    self,
    wrapped_transport: AsyncHTTPTransport | None = None,
    max_pages: int = 100,
    logger: logging.Logger | None = None,
    **kwargs: Any,
):
    """
    Initialize the pagination transport.

    Args:
        wrapped_transport: The transport to wrap. If None, creates a new AsyncHTTPTransport.
        max_pages: Maximum number of pages to collect during auto-pagination. Defaults to 100.
        logger: Logger instance for capturing pagination operations. If None, creates a default logger.
        **kwargs: Additional arguments passed to AsyncHTTPTransport if wrapped_transport is None.
    """
    # If no wrapped transport provided, create a base one
    if wrapped_transport is None:
        wrapped_transport = AsyncHTTPTransport(**kwargs)
        super().__init__()
    else:
        super().__init__()

    self._wrapped_transport = wrapped_transport
    self.max_pages = max_pages
    self.logger = logger or logging.getLogger(__name__)
Functions
handle_async_request(request) async

Handle request with automatic pagination for GET requests with page/limit params.

Source code in katana_public_api_client/katana_client.py
async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
    """Handle request with automatic pagination for GET requests with page/limit params."""
    # Check if this is a paginated request
    should_paginate = (
        request.method == "GET"
        and hasattr(request, "url")
        and request.url
        and request.url.params
        and ("page" in request.url.params or "limit" in request.url.params)
    )

    if should_paginate:
        return await self._handle_paginated_request(request)
    else:
        # For non-paginated requests, just pass through to wrapped transport
        return await self._wrapped_transport.handle_async_request(request)

RateLimitAwareRetry(*args, **kwargs)

Bases: Retry

Custom Retry class that allows non-idempotent methods (POST, PATCH) to be retried ONLY when receiving a 429 (Too Many Requests) status code.

For all other retryable status codes (502, 503, 504), only idempotent methods (HEAD, GET, PUT, DELETE, OPTIONS, TRACE) will be retried.

This ensures we don't accidentally retry non-idempotent operations after server errors, but we DO retry them when we're being rate-limited.

Source code in katana_public_api_client/katana_client.py
def __init__(self, *args: Any, **kwargs: Any):
    """Initialize and track the current request method."""
    super().__init__(*args, **kwargs)
    self._current_method: str | None = None
Functions
increment()

Return a new retry instance with the attempt count incremented.

Source code in katana_public_api_client/katana_client.py
def increment(self) -> "RateLimitAwareRetry":
    """Return a new retry instance with the attempt count incremented."""
    # Call parent's increment which creates a new instance of our class
    new_retry = cast(RateLimitAwareRetry, super().increment())
    # Preserve the current method across retry attempts
    new_retry._current_method = self._current_method
    return new_retry
is_retryable_method(method)

Allow all methods to pass through the initial check.

Store the method for later use in is_retryable_status_code.

Source code in katana_public_api_client/katana_client.py
def is_retryable_method(self, method: str) -> bool:
    """
    Allow all methods to pass through the initial check.

    Store the method for later use in is_retryable_status_code.
    """
    self._current_method = method.upper()
    # Accept all methods - we'll filter in is_retryable_status_code
    return self._current_method in self.allowed_methods
is_retryable_status_code(status_code)

Check if a status code is retryable for the current method.

For 429 (rate limiting), allow all methods. For other errors (502, 503, 504), only allow idempotent methods.

Source code in katana_public_api_client/katana_client.py
def is_retryable_status_code(self, status_code: int) -> bool:
    """
    Check if a status code is retryable for the current method.

    For 429 (rate limiting), allow all methods.
    For other errors (502, 503, 504), only allow idempotent methods.
    """
    # First check if the status code is in the allowed list at all
    if status_code not in self.status_forcelist:
        return False

    # If we don't know the method, fall back to default behavior
    if self._current_method is None:
        return True

    # Rate limiting (429) - retry all methods
    if status_code == HTTPStatus.TOO_MANY_REQUESTS:
        return True

    # Other retryable errors - only retry idempotent methods
    return self._current_method in self.IDEMPOTENT_METHODS

Functions

ResilientAsyncTransport(max_retries=5, max_pages=100, logger=None, **kwargs)

Factory function that creates a chained transport with error logging, pagination, and retry capabilities.

This function chains multiple transport layers: 1. AsyncHTTPTransport (base HTTP transport) 2. ErrorLoggingTransport (logs detailed 4xx errors) 3. PaginationTransport (auto-collects paginated responses) 4. RetryTransport (handles retries with Retry-After header support)

Parameters:

  • max_retries (int, default: 5 ) –

    Maximum number of retry attempts for failed requests. Defaults to 5.

  • max_pages (int, default: 100 ) –

    Maximum number of pages to collect during auto-pagination. Defaults to 100.

  • logger (Logger | None, default: None ) –

    Logger instance for capturing operations. If None, creates a default logger.

  • **kwargs (Any, default: {} ) –

    Additional arguments passed to the base AsyncHTTPTransport. Common parameters include: - http2 (bool): Enable HTTP/2 support - limits (httpx.Limits): Connection pool limits - verify (bool | str | ssl.SSLContext): SSL certificate verification - cert (str | tuple): Client-side certificates - trust_env (bool): Trust environment variables for proxy configuration

Returns:

  • RetryTransport

    A RetryTransport instance wrapping all the layered transports.

Note

When using a custom transport, parameters like http2, limits, and verify must be passed to this factory function (which passes them to the base AsyncHTTPTransport), not to the httpx.Client/AsyncClient constructor.

Example
transport = ResilientAsyncTransport(max_retries=3, max_pages=50)
async with httpx.AsyncClient(transport=transport) as client:
    response = await client.get("https://api.example.com/items")
Source code in katana_public_api_client/katana_client.py
def ResilientAsyncTransport(
    max_retries: int = 5,
    max_pages: int = 100,
    logger: logging.Logger | None = None,
    **kwargs: Any,
) -> RetryTransport:
    """
    Factory function that creates a chained transport with error logging,
    pagination, and retry capabilities.

    This function chains multiple transport layers:
    1. AsyncHTTPTransport (base HTTP transport)
    2. ErrorLoggingTransport (logs detailed 4xx errors)
    3. PaginationTransport (auto-collects paginated responses)
    4. RetryTransport (handles retries with Retry-After header support)

    Args:
        max_retries: Maximum number of retry attempts for failed requests. Defaults to 5.
        max_pages: Maximum number of pages to collect during auto-pagination. Defaults to 100.
        logger: Logger instance for capturing operations. If None, creates a default logger.
        **kwargs: Additional arguments passed to the base AsyncHTTPTransport.
            Common parameters include:
            - http2 (bool): Enable HTTP/2 support
            - limits (httpx.Limits): Connection pool limits
            - verify (bool | str | ssl.SSLContext): SSL certificate verification
            - cert (str | tuple): Client-side certificates
            - trust_env (bool): Trust environment variables for proxy configuration

    Returns:
        A RetryTransport instance wrapping all the layered transports.

    Note:
        When using a custom transport, parameters like http2, limits, and verify
        must be passed to this factory function (which passes them to the base
        AsyncHTTPTransport), not to the httpx.Client/AsyncClient constructor.

    Example:
        ```python
        transport = ResilientAsyncTransport(max_retries=3, max_pages=50)
        async with httpx.AsyncClient(transport=transport) as client:
            response = await client.get("https://api.example.com/items")
        ```
    """
    if logger is None:
        logger = logging.getLogger(__name__)

    # Build the transport chain from inside out:
    # 1. Base AsyncHTTPTransport
    base_transport = AsyncHTTPTransport(**kwargs)

    # 2. Wrap with error logging
    error_logging_transport = ErrorLoggingTransport(
        wrapped_transport=base_transport,
        logger=logger,
    )

    # 3. Wrap with pagination
    pagination_transport = PaginationTransport(
        wrapped_transport=error_logging_transport,
        max_pages=max_pages,
        logger=logger,
    )

    # Finally wrap with retry logic (outermost layer)
    # Use RateLimitAwareRetry which:
    # - Retries ALL methods (including POST/PATCH) for 429 rate limiting
    # - Retries ONLY idempotent methods for server errors (502, 503, 504)
    retry = RateLimitAwareRetry(
        total=max_retries,
        backoff_factor=1.0,  # Exponential backoff: 1, 2, 4, 8, 16 seconds
        respect_retry_after_header=True,  # Honor server's Retry-After header
        status_forcelist=[
            429,
            502,
            503,
            504,
        ],  # Status codes that should trigger retries
        allowed_methods=[
            "HEAD",
            "GET",
            "PUT",
            "DELETE",
            "OPTIONS",
            "TRACE",
            "POST",
            "PATCH",
        ],
    )
    retry_transport = RetryTransport(
        transport=pagination_transport,
        retry=retry,
    )

    return retry_transport