Skip to content

KatanaClient Guide

The KatanaClient is the modern, pythonic client for the Katana Manufacturing ERP API. It provides automatic resilience (retries, rate limiting, error handling) using httpx's native transport layer - no decorators or wrapper methods needed.

๐ŸŽฏ Key Features

  • ๐Ÿ›ก๏ธ Automatic Resilience: Transport-layer retries and rate limiting
  • ๐Ÿš€ Zero Configuration: Works out of the box with environment variables
  • ๐Ÿ“ฆ Complete Type Safety: Full type hints and IDE support
  • ๐Ÿ”„ Smart Pagination: Built-in pagination with safety limits
  • ๐Ÿ” Rich Observability: Structured logging and metrics
  • โšก Pythonic Design: Uses httpx's native extension points

๐Ÿš€ Quick Start

Installation & Setup

# Install the client
pip install -e .

# Create .env file with credentials
echo "KATANA_API_KEY=your-api-key-here" > .env
echo "KATANA_BASE_URL=https://api.katanamrp.com/v1" >> .env

Basic Usage

import asyncio

from katana_public_api_client import KatanaClient
from katana_public_api_client.api.product import get_all_products
from katana_public_api_client.utils import unwrap_data

async def main():
    # Automatic configuration from .env file
    async with KatanaClient() as client:
        # Direct API usage - automatic resilience built-in
        response = await get_all_products.asyncio_detailed(
            client=client,
            limit=50
        )

        products = unwrap_data(response, default=[])
        print(f"Retrieved {len(products)} products")

asyncio.run(main())

๐Ÿ“ฅ Response Handling

Use the helper utilities in katana_public_api_client.utils for consistent response handling. They replace manual status-code checks, give you typed errors, and handle the {"data": [...]} wrapping that Katana applies to every list endpoint.

Helpers at a glance

from katana_public_api_client.utils import unwrap, unwrap_as, unwrap_data, is_success
from katana_public_api_client.domain.converters import unwrap_unset

# Single-object responses (200 OK with parsed model โ€” includes POST creates)
order = unwrap_as(response, ManufacturingOrder)  # type-safe, raises on error

# List responses (200 OK with `data` array)
items = unwrap_data(response, default=[])  # extracts the .data field

# Success-only responses (204 No Content โ€” typically DELETE)
if is_success(response):
    ...

# attrs model fields that may be UNSET
status = unwrap_unset(order.status, None)  # returns None if UNSET

Katana POST creates return 200, not 201. Treat them like any other 200-with-parsed-body response โ€” use unwrap_as(response, Type). See spec-authoring.md for the full convention.

When to use each

Scenario Pattern Example
Single object (200) unwrap_as(response, Type) Get / update / create ops
List endpoint (200) unwrap_data(response, default=[]) List operations
Delete (204) is_success(response) DELETE
attrs UNSET field unwrap_unset(field, default) Optional API fields

Anti-patterns

# โŒ DON'T: manual status code checks
if response.status_code == 200:
    result = response.parsed
# โœ… DO: use helpers
result = unwrap_as(response, ExpectedType)

# โŒ DON'T: isinstance with UNSET
if not isinstance(value, type(UNSET)):
    use(value)
# โœ… DO: use unwrap_unset
use(unwrap_unset(value, default))

# โŒ DON'T: hasattr for attrs-defined fields
if hasattr(order, "status"):
    status = order.status
# โœ… DO: use unwrap_unset (attrs fields always exist, may be UNSET)
status = unwrap_unset(order.status, None)

# โŒ DON'T: wrap API methods to add retries / rate limiting
# โœ… DO: nothing โ€” resilience is at the transport layer; every endpoint inherits it.

# โŒ DON'T: build attrs request bodies with conditional UNSET
optional_field = value if value is not None else UNSET
# โœ… DO: use to_unset
from katana_public_api_client.domain.converters import to_unset
optional_field = to_unset(value)

Exception hierarchy

unwrap() and unwrap_as() raise typed exceptions on non-2xx responses:

  • AuthenticationError โ€” 401 Unauthorized
  • ValidationError โ€” 422 Unprocessable Entity
  • RateLimitError โ€” 429 Too Many Requests (retries exhausted)
  • ServerError โ€” 5xx server errors
  • APIError โ€” other errors (400, 403, 404, โ€ฆ)

is_success(response) returns True for any 2xx status without raising.

๐Ÿ›ก๏ธ Automatic Resilience

Every API call through KatanaClient automatically includes:

Smart Retries

  • Network Errors: Automatic retry with exponential backoff (1s, 2s, 4s, 8s, 16s)
  • Rate Limits (429): ALL methods (including POST/PATCH) retry with Retry-After header support
  • Server Errors (502/503/504): Only idempotent methods (GET, PUT, DELETE, etc.) are retried
  • Client Errors (4xx except 429): No retries
async with KatanaClient(max_retries=5) as client:
    # This call will automatically retry on failures
    # POST/PATCH requests will retry on 429 but not on 5xx errors
    # GET/PUT/DELETE requests will retry on both 429 and 5xx errors
    response = await get_all_products.asyncio_detailed(
        client=client,
        limit=100
    )
    # No decorators or wrapper methods needed!

Rate Limit Handling

The client throttles requests in two layers, both automatic:

  1. Proactive throttling via RateLimitTransport โ€” every outgoing request passes through a pyrate-limiter token bucket sized to Katana's documented 60 req/min budget. Concurrent and paginated workloads pace cleanly without thrashing 429s.
  2. Reactive Retry-After handling in RetryTransport โ€” if a 429 still slips through (e.g. another client is sharing the API key), the retry layer honors Retry-After (numeric seconds or HTTP-date) before the next attempt.

The proactive layer also reads X-Ratelimit-Remaining and X-Ratelimit-Reset on every response and adapts:

  • If the server reports fewer remaining tokens than the local estimate (e.g., another client consumed budget), the local bucket is drained to match.
  • When X-Ratelimit-Remaining hits 0, future requests are gated by an asyncio.Event until X-Ratelimit-Reset elapses.
# Default behavior โ€” 60/min budget, fully automatic.
async with KatanaClient() as client:
    # 100 calls fire as fast as the rate limit allows; no manual pacing
    # needed. We don't pass an explicit ``page`` parameter โ€” that would
    # disable auto-pagination โ€” so each call here is a full-result fetch
    # demonstrating throttling.
    for _ in range(100):
        response = await get_all_products.asyncio_detailed(
            client=client,
            limit=50,
        )

Tuning the budget

If your account has a different rate limit (some Katana plans differ), pass requests_per_minute=:

async with KatanaClient(requests_per_minute=120) as client:
    ...  # paces at 120/min instead of 60/min

Opting out

If you want to manage throttling yourself (or in a test that needs raw throughput), pass requests_per_minute=None to skip the rate-limit layer entirely:

# Reactive 429-retry only; no proactive throttling.
async with KatanaClient(requests_per_minute=None) as client:
    ...

Error Recovery

import logging

# Configure logging to see resilience in action
logging.basicConfig(level=logging.INFO)

async with KatanaClient() as client:
    # Automatic error recovery with detailed logging
    response = await get_all_products.asyncio_detailed(
        client=client,
        limit=100
    )
    # Logs will show retry attempts and recovery

๐Ÿ”„ Smart Pagination

Auto-pagination is ON by default for all GET requests. All pages are automatically collected into a single response.

Automatic Pagination (Default)

async with KatanaClient() as client:
    # Get ALL products across all pages automatically
    all_products = await get_all_products.asyncio_detailed(
        client=client,
        is_sellable=True,  # API filter parameters
        limit=250  # Page size (all pages still collected)
    )
    print(f"Total products: {len(all_products.parsed.data)}")

Single Page Request (Explicit Page)

To get a specific page instead of all pages, add an explicit page parameter. Note: ANY explicit page value (including page=1) disables auto-pagination:

async with KatanaClient() as client:
    # Get ONLY page 2 (auto-pagination disabled when page is explicit)
    page2_products = await get_all_products.asyncio_detailed(
        client=client,
        page=2,       # Explicit page disables auto-pagination
        limit=50
    )
    # Returns just the 50 items on page 2

    # page=1 ALSO disables auto-pagination (returns only first page)
    first_page = await get_all_products.asyncio_detailed(
        client=client,
        page=1,       # Get ONLY page 1, not all pages
        limit=50
    )

Limiting Total Items

To limit the total number of items collected (not just page size), use the max_items extension via the httpx client:

async with KatanaClient() as client:
    httpx_client = client.get_async_httpx_client()
    response = await httpx_client.get(
        "/products",
        params={"limit": 50},           # 50 items per page
        extensions={"max_items": 200}   # Stop after 200 items total
    )

The transport intelligently adjusts the limit on the final request to fetch only what's needed, avoiding over-fetching.

Pagination Behavior Summary

Parameter Scope Effect
limit=50 URL param Page size (50 items per request)
page=2 URL param Get specific page only (disables auto-pagination)
max_pages=5 Client Max pages to fetch
max_items=200 Extension Max total items to collect

โš™๏ธ Configuration

Authentication Methods

The client supports multiple authentication methods (in priority order):

  1. Direct parameter: Pass api_key to KatanaClient()
  2. Environment variable: Set KATANA_API_KEY
  3. .env file: Create a .env file with your credentials
  4. ~/.netrc file: Use standard Unix credential file
# Create .env file
KATANA_API_KEY=your-api-key-here
KATANA_BASE_URL=https://api.katanamrp.com/v1  # Optional
# Automatically loads from .env
async with KatanaClient() as client:
    # Uses credentials from .env
    pass

Using ~/.netrc file

For centralized credential management across multiple tools:

# Add to ~/.netrc
machine api.katanamrp.com
password your-api-key-here

# Set proper permissions (required)
chmod 600 ~/.netrc
# Automatically loads from ~/.netrc
async with KatanaClient() as client:
    # Uses credentials from ~/.netrc
    pass

Note: The password field in netrc stores your API key (bearer token), not an actual password. The login field is optional and ignored.

Using environment variable

export KATANA_API_KEY=your-api-key-here

Using direct parameter

async with KatanaClient(api_key="your-api-key-here") as client:
    # Explicit API key
    pass

Custom Configuration

import logging

# Custom configuration with stdlib logger
async with KatanaClient(
    api_key="custom-key",
    base_url="https://custom.katana.com/v1",
    timeout=60.0,           # Request timeout
    max_retries=5,          # Maximum retry attempts
    logger=logging.getLogger("custom")  # stdlib logger
) as client:
    # Your API calls here
    pass

The logger parameter accepts any object whose debug, info, warning, and error methods accept (msg, *args, **kwargs) โ€” the standard logging.Logger call convention. Both logging.Logger and structlog's BoundLogger satisfy this without adapters:

import structlog

# Use structlog directly โ€” no adapter needed
logger = structlog.get_logger("katana")
async with KatanaClient(logger=logger) as client:
    pass

Advanced httpx Configuration

import httpx

# Pass through httpx configuration
async with KatanaClient(
    # Standard KatanaClient options
    api_key="your-key",
    max_retries=3,

    # httpx client options
    verify=False,           # SSL verification
    proxies="http://proxy:8080",
    headers={"Custom": "Header"},
    event_hooks={
        "request": [custom_request_hook],
        "response": [custom_response_hook]
    }
) as client:
    # Client has both resilience AND custom httpx config
    pass

๐Ÿ” Observability

Logging

import logging

# Configure logging to see what's happening
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

async with KatanaClient() as client:
    # All resilience actions are logged
    response = await get_all_products.asyncio_detailed(
        client=client,
        limit=100
    )

Example log output:

2025-01-15 10:30:15 - katana_client - WARNING - Rate limited on attempt 1, waiting 60.0s
2025-01-15 10:31:16 - katana_client - INFO - Request succeeded after 2 attempts
2025-01-15 10:31:16 - katana_client - DEBUG - Response: 200 GET https://api.katanamrp.com/v1/products (1.24s)

Custom Event Hooks

async def custom_response_hook(response):
    """Custom hook to track API usage."""
    print(f"API call: {response.request.method} {response.request.url}")
    print(f"Status: {response.status_code}")

async with KatanaClient(
    event_hooks={
        "response": [custom_response_hook]
    }
) as client:
    # Your custom hooks are called alongside built-in ones
    response = await get_all_products.asyncio_detailed(
        client=client,
        limit=10
    )

๐Ÿงช Testing

Mocking for Tests

import pytest
from unittest.mock import AsyncMock, patch
from katana_public_api_client import KatanaClient

@pytest.mark.asyncio
async def test_api_integration():
    """Test API integration with mocked responses."""
    with patch.dict('os.environ', {'KATANA_API_KEY': 'test-key'}):
        async with KatanaClient() as client:
            # Mock the underlying httpx client
            with patch.object(client, 'get_async_httpx_client') as mock_httpx:
                mock_response = AsyncMock()
                mock_response.status_code = 200
                mock_response.json.return_value = {"data": [{"id": 1}]}

                mock_httpx.return_value.request = AsyncMock(return_value=mock_response)

                # Test your API logic here
                from katana_public_api_client.api.product import get_all_products
                response = await get_all_products.asyncio_detailed(
                    client=client,
                    limit=10
                )

                assert response.status_code == 200

Integration Tests

import os

import pytest

from katana_public_api_client import KatanaClient
from katana_public_api_client.api.product import get_all_products

@pytest.mark.integration
@pytest.mark.asyncio
async def test_real_api():
    """Test against real Katana API (requires KATANA_API_KEY)."""
    api_key = os.getenv('KATANA_API_KEY')
    if not api_key:
        pytest.skip("KATANA_API_KEY not set")

    async with KatanaClient() as client:
        response = await get_all_products.asyncio_detailed(
            client=client,
            limit=1
        )

        assert response.status_code == 200
        assert hasattr(response.parsed, 'data')

๐Ÿ”ง Advanced Patterns

Custom Transport

from katana_public_api_client.katana_client import ResilientAsyncTransport

# Create custom transport with different settings
custom_transport = ResilientAsyncTransport(
    max_retries=10,
    max_pages=50,  # Limit automatic pagination
    logger=custom_logger
)

async with KatanaClient(
    transport=custom_transport
) as client:
    # Uses your custom retry logic
    pass

Batch Operations

import asyncio
from katana_public_api_client.api.product import get_product

async def process_products_in_batches(product_ids, batch_size=10):
    """Process products in batches with automatic resilience."""
    async with KatanaClient() as client:
        results = []
        for i in range(0, len(product_ids), batch_size):
            batch = product_ids[i:i + batch_size]

            # Each call automatically has resilience
            batch_results = await asyncio.gather(*[
                get_product.asyncio_detailed(client=client, id=product_id)
                for product_id in batch
            ])

            results.extend(batch_results)

            # Be nice to the API
            await asyncio.sleep(0.1)

        return results

Error Handling

import httpx
from katana_public_api_client.utils import unwrap_data, APIError

async with KatanaClient() as client:
    try:
        response = await get_all_products.asyncio_detailed(
            client=client,
            limit=50
        )
        products = unwrap_data(response, default=[])
        print(f"Success: {len(products)} products")

    except httpx.TimeoutException:
        print("Request timed out after retries")
    except httpx.ConnectError:
        print("Connection failed after retries")
    except APIError as e:
        print(f"Unexpected API response: {e.status_code}")

๐Ÿ“š Best Practices

1. Use Context Managers

# โœ… Good: Properly manages connections
async with KatanaClient() as client:
    response = await get_all_products.asyncio_detailed(
        client=client,
        limit=50
    )

# โŒ Bad: Doesn't close connections
client = KatanaClient()
response = await get_all_products.asyncio_detailed(
    client=client,
    limit=50
)

2. Configure Appropriate Timeouts

# โœ… Good: Reasonable timeout for your use case
async with KatanaClient(timeout=30.0) as client:
    # Timeout is appropriate for expected response time
    pass

# โŒ Bad: Too short timeout causes unnecessary retries
async with KatanaClient(timeout=1.0) as client:
    # Will likely timeout and retry frequently
    pass

3. Let Auto-Pagination Handle Large Datasets

# โœ… Good: Auto-pagination is ON by default with safety limits
all_products = await get_all_products.asyncio_detailed(
    client=client,
    limit=250  # Sets page size; all pages collected automatically
)

# โœ… Good: Use explicit page when you need just one page
page2 = await get_all_products.asyncio_detailed(
    client=client,
    page=2,    # Explicit page = single page only
    limit=100
)

# โŒ Bad: Manual pagination loop without safety limits
page = 1
while True:  # Could run forever!
    response = await get_all_products.asyncio_detailed(
        client=client,
        page=page,
        limit=100
    )
    # ... handle response
    page += 1

4. Handle Different Response Types

from katana_public_api_client.utils import unwrap_data, AuthenticationError, APIError

async with KatanaClient() as client:
    response = await get_all_products.asyncio_detailed(
        client=client,
        limit=50
    )

    # โœ… Good: use unwrap helpers + typed exceptions
    try:
        products = unwrap_data(response, default=[])
        print(f"Retrieved {len(products)} products")
    except AuthenticationError:
        print("Authentication failed")
    except APIError as e:
        print(f"Unexpected status: {e.status_code}")

5. Use Direct API Calls

from katana_public_api_client.api.product import get_all_products

# โœ… Good: Direct API calls with automatic resilience
async with KatanaClient() as client:
    response = await get_all_products.asyncio_detailed(
        client=client,
        is_sellable=True
    )
    products = unwrap_data(response, default=[])

๐Ÿš€ Performance Tips

1. Reuse Client Instances

from katana_public_api_client.api.product import get_all_products
from katana_public_api_client.api.sales_order import get_all_sales_orders
from katana_public_api_client.api.inventory import get_all_inventory_points

# โœ… Good: One client for multiple operations
async with KatanaClient() as client:
    products = await get_all_products.asyncio_detailed(client=client)
    orders = await get_all_sales_orders.asyncio_detailed(client=client)
    inventory = await get_all_inventory_points.asyncio_detailed(client=client)

# โŒ Bad: New client for each operation
for operation in [get_all_products, get_all_sales_orders]:
    async with KatanaClient() as client:
        await operation.asyncio_detailed(client=client)

2. Optimize Page Sizes

# โœ… Good: Reasonable page size
products = await get_all_products.asyncio_detailed(
    client=client,
    limit=250  # Good balance of efficiency and memory - automatically paginated
)

# โŒ Bad: Too small (many requests)
products = await get_all_products.asyncio_detailed(
    client=client,
    limit=10   # Will make many small requests
)

# โŒ Bad: Too large (may hit API limits)
products = await get_all_products.asyncio_detailed(
    client=client,
    limit=10000  # May exceed API limits
)

3. Use Concurrent Requests Wisely

import asyncio
from katana_public_api_client.api.product import get_product

# โœ… Good: Limited concurrency respects rate limits
async def get_multiple_products(product_ids):
    async with KatanaClient() as client:
        # Process in small batches
        results = []
        for i in range(0, len(product_ids), 5):  # 5 concurrent requests
            batch = product_ids[i:i+5]
            batch_results = await asyncio.gather(*[
                get_product.asyncio_detailed(client=client, id=pid)
                for pid in batch
            ])
            results.extend(batch_results)

            # Small delay between batches
            if i + 5 < len(product_ids):
                await asyncio.sleep(0.2)

        return results

๐Ÿ“– API Reference

KatanaClient

class KatanaClient(AuthenticatedClient):
    """The pythonic Katana API client with automatic resilience and pagination."""

    def __init__(
        self,
        api_key: Optional[str] = None,
        base_url: Optional[str] = None,
        timeout: float = 30.0,
        max_retries: int = 5,
        max_pages: int = 100,
        logger: Optional[Logger] = None,
        **httpx_kwargs: Any,
    ): ...

    async def __aenter__(self) -> "KatanaClient": ...
    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: ...

ResilientAsyncTransport

Factory function that creates a layered transport with automatic resilience:

def ResilientAsyncTransport(
    max_retries: int = 5,
    max_pages: int = 100,
    logger: Optional[Logger] = None,
    **kwargs: Any
) -> RetryTransport:
    """
    Creates a chained transport with:
    1. AsyncHTTPTransport (base HTTP transport)
    2. ErrorLoggingTransport (logs detailed 4xx errors)
    3. PaginationTransport (auto-collects paginated responses)
    4. RetryTransport (handles retries with Retry-After header support)
    """
    ...

Next Steps: Check out the API Reference for detailed endpoint documentation, or see Testing Guide for testing patterns.