KatanaClient Guide¶
The KatanaClient is the modern, Pythonic client for the Katana Manufacturing ERP API. It provides automatic resilience (retries, rate limiting, error handling) through httpx's native transport layer, so no decorators or wrapper methods are needed.
Key Features¶
- Automatic Resilience: Transport-layer retries and rate limiting
- Zero Configuration: Works out of the box with environment variables
- Complete Type Safety: Full type hints and IDE support
- Smart Pagination: Built-in pagination with safety limits
- Rich Observability: Structured logging and metrics
- Pythonic Design: Uses httpx's native extension points
Quick Start¶
Installation & Setup¶
# Install the client
pip install -e .
# Create .env file with credentials
echo "KATANA_API_KEY=your-api-key-here" > .env
echo "KATANA_BASE_URL=https://api.katanamrp.com/v1" >> .env
Basic Usage¶
import asyncio
from katana_public_api_client import KatanaClient
from katana_public_api_client.api.product import get_all_products
from katana_public_api_client.utils import unwrap_data

async def main():
    # Automatic configuration from .env file
    async with KatanaClient() as client:
        # Direct API usage - automatic resilience built-in
        response = await get_all_products.asyncio_detailed(
            client=client,
            limit=50
        )
        products = unwrap_data(response, default=[])
        print(f"Retrieved {len(products)} products")

asyncio.run(main())
Response Handling¶
Use the helper utilities in katana_public_api_client.utils for consistent response
handling. They replace manual status-code checks, give you typed errors, and handle the
{"data": [...]} wrapping that Katana applies to every list endpoint.
Helpers at a glance¶
from katana_public_api_client.utils import unwrap, unwrap_as, unwrap_data, is_success
from katana_public_api_client.domain.converters import unwrap_unset
# Single-object responses (200 OK with parsed model, including POST creates)
order = unwrap_as(response, ManufacturingOrder)  # type-safe, raises on error
# List responses (200 OK with `data` array)
items = unwrap_data(response, default=[])  # extracts the .data field
# Success-only responses (204 No Content, typically DELETE)
if is_success(response):
    ...
# attrs model fields that may be UNSET
status = unwrap_unset(order.status, None)  # returns None if UNSET
Katana POST creates return 200, not 201. Treat them like any other 200-with-parsed-body response: use
unwrap_as(response, Type). See spec-authoring.md for the full convention.
When to use each¶
| Scenario | Pattern | Example |
|---|---|---|
| Single object (200) | unwrap_as(response, Type) | Get / update / create ops |
| List endpoint (200) | unwrap_data(response, default=[]) | List operations |
| Delete (204) | is_success(response) | DELETE |
| attrs UNSET field | unwrap_unset(field, default) | Optional API fields |
Anti-patterns¶
# ❌ DON'T: manual status code checks
if response.status_code == 200:
    result = response.parsed
# ✅ DO: use helpers
result = unwrap_as(response, ExpectedType)

# ❌ DON'T: isinstance with UNSET
if not isinstance(value, type(UNSET)):
    use(value)
# ✅ DO: use unwrap_unset
use(unwrap_unset(value, default))

# ❌ DON'T: hasattr for attrs-defined fields
if hasattr(order, "status"):
    status = order.status
# ✅ DO: use unwrap_unset (attrs fields always exist, may be UNSET)
status = unwrap_unset(order.status, None)

# ❌ DON'T: wrap API methods to add retries / rate limiting
# ✅ DO: nothing. Resilience is at the transport layer; every endpoint inherits it.

# ❌ DON'T: build attrs request bodies with conditional UNSET
optional_field = value if value is not None else UNSET
# ✅ DO: use to_unset
from katana_public_api_client.domain.converters import to_unset
optional_field = to_unset(value)
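Why `None` and `UNSET` need separate handling can be illustrated with a stand-in sentinel. The sketch below is not the library's implementation: `_Unset`, `unwrap_unset_sketch`, and `to_unset_sketch` are hypothetical names that only mimic the behavior described above.

```python
from typing import Any


class _Unset:
    """Hypothetical stand-in for the library's UNSET sentinel type."""

    def __repr__(self) -> str:
        return "UNSET"


UNSET = _Unset()


def unwrap_unset_sketch(value: Any, default: Any) -> Any:
    """Return `default` when the field was never set, otherwise the value itself."""
    return default if value is UNSET else value


def to_unset_sketch(value: Any) -> Any:
    """Map None (caller has no value) to UNSET (leave the field out of the body)."""
    return UNSET if value is None else value


print(unwrap_unset_sketch(UNSET, "n/a"))   # field absent from the response
print(unwrap_unset_sketch("DONE", "n/a"))  # field present
print(to_unset_sketch(None))               # optional input that was not supplied
```

Note that an explicit `None` passes through `unwrap_unset_sketch` unchanged; only the sentinel triggers the default, which is the distinction the helpers are meant to preserve.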
Exception hierarchy¶
unwrap() and unwrap_as() raise typed exceptions on non-2xx responses:
- AuthenticationError: 401 Unauthorized
- ValidationError: 422 Unprocessable Entity
- RateLimitError: 429 Too Many Requests (retries exhausted)
- ServerError: 5xx server errors
- APIError: other errors (400, 403, 404, …)
is_success(response) returns True for any 2xx status without raising.
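The status-to-exception mapping above can be summarized as a small lookup. This is a sketch of the documented mapping only; `exception_name_for` is a hypothetical helper, and treating every remaining non-2xx status (including 3xx) as `APIError` is an assumption.

```python
from typing import Optional


def exception_name_for(status_code: int) -> Optional[str]:
    """Return the documented exception name for a status code, or None for 2xx."""
    if 200 <= status_code < 300:
        return None  # success: nothing is raised
    if status_code == 401:
        return "AuthenticationError"
    if status_code == 422:
        return "ValidationError"
    if status_code == 429:
        return "RateLimitError"
    if 500 <= status_code < 600:
        return "ServerError"
    return "APIError"  # 400, 403, 404 and any other non-2xx status (assumed)


for code in (200, 401, 404, 422, 429, 503):
    print(code, exception_name_for(code))
```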
Automatic Resilience¶
Every API call through KatanaClient automatically includes:
Smart Retries¶
- Network Errors: Automatic retry with exponential backoff (1s, 2s, 4s, 8s, 16s)
- Rate Limits (429): ALL methods (including POST/PATCH) retry with Retry-After header support
- Server Errors (502/503/504): Only idempotent methods (GET, PUT, DELETE, etc.) are retried
- Client Errors (4xx except 429): No retries
async with KatanaClient(max_retries=5) as client:
    # This call will automatically retry on failures
    # POST/PATCH requests will retry on 429 but not on 5xx errors
    # GET/PUT/DELETE requests will retry on both 429 and 5xx errors
    response = await get_all_products.asyncio_detailed(
        client=client,
        limit=100
    )
    # No decorators or wrapper methods needed!
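The retry rules listed above can be expressed as a small decision function plus a backoff schedule. This is a sketch under stated assumptions, not the client's code: `should_retry` and `backoff_delays` are hypothetical names, and the idempotent set beyond GET, PUT, and DELETE follows the usual HTTP definition rather than anything confirmed by this guide.

```python
from typing import List

# GET, PUT, DELETE come from the guide; HEAD, OPTIONS, TRACE are assumed ("etc.").
IDEMPOTENT_METHODS = {"GET", "HEAD", "PUT", "DELETE", "OPTIONS", "TRACE"}
RETRYABLE_SERVER_ERRORS = {502, 503, 504}


def should_retry(method: str, status_code: int) -> bool:
    """Decide whether a response with this status is retried for this method."""
    if status_code == 429:
        return True  # rate limits are retried for every method
    if status_code in RETRYABLE_SERVER_ERRORS:
        return method.upper() in IDEMPOTENT_METHODS
    return False  # other 4xx/5xx statuses are not retried


def backoff_delays(max_retries: int, base: float = 1.0) -> List[float]:
    """Exponential backoff: the delay doubles on each attempt."""
    return [base * 2**attempt for attempt in range(max_retries)]


print(backoff_delays(5))
print(should_retry("POST", 429), should_retry("POST", 503), should_retry("GET", 503))
```

With `max_retries=5` the schedule matches the 1s, 2s, 4s, 8s, 16s sequence quoted for network errors.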
Rate Limit Handling¶
The client throttles requests in two layers, both automatic:
- Proactive throttling via RateLimitTransport: every outgoing request passes through a pyrate-limiter token bucket sized to Katana's documented 60 req/min budget. Concurrent and paginated workloads pace cleanly without thrashing 429s.
- Reactive Retry-After handling in RetryTransport: if a 429 still slips through (e.g. another client is sharing the API key), the retry layer honors Retry-After (numeric seconds or HTTP-date) before the next attempt.
The proactive layer also reads X-Ratelimit-Remaining and X-Ratelimit-Reset on every
response and adapts:
- If the server reports fewer remaining tokens than the local estimate (e.g., another client consumed budget), the local bucket is drained to match.
- When X-Ratelimit-Remaining hits 0, future requests are gated by an asyncio.Event until X-Ratelimit-Reset elapses.
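Retry-After can arrive either as a number of seconds or as an HTTP-date, so the reactive layer has to handle both forms. The following is a minimal standard-library sketch of that parsing; `retry_after_seconds` is a hypothetical helper and not the function RetryTransport uses.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def retry_after_seconds(header_value: str, now: Optional[datetime] = None) -> Optional[float]:
    """Convert a Retry-After header into seconds to wait, or None if unparseable."""
    value = header_value.strip()
    if value.isdigit():
        return float(value)  # delta-seconds form, e.g. "120"
    try:
        target = parsedate_to_datetime(value)  # HTTP-date form
    except (TypeError, ValueError):
        return None
    if target.tzinfo is None:
        target = target.replace(tzinfo=timezone.utc)
    current = now or datetime.now(timezone.utc)
    # A date in the past means no wait is needed.
    return max(0.0, (target - current).total_seconds())


fixed_now = datetime(2015, 10, 21, 7, 27, 0, tzinfo=timezone.utc)
print(retry_after_seconds("120"))
print(retry_after_seconds("Wed, 21 Oct 2015 07:28:00 GMT", now=fixed_now))
```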
# Default behavior: 60/min budget, fully automatic.
async with KatanaClient() as client:
    # 100 calls fire as fast as the rate limit allows; no manual pacing
    # needed. We don't pass an explicit ``page`` parameter (that would
    # disable auto-pagination), so each call here is a full-result fetch
    # demonstrating throttling.
    for _ in range(100):
        response = await get_all_products.asyncio_detailed(
            client=client,
            limit=50,
        )
Tuning the budget¶
If your account has a different rate limit (some Katana plans differ), pass
requests_per_minute=:
async with KatanaClient(requests_per_minute=120) as client:
    ...  # paces at 120/min instead of 60/min
Opting out¶
If you want to manage throttling yourself (or in a test that needs raw throughput), pass
requests_per_minute=None to skip the rate-limit layer entirely:
# Reactive 429-retry only; no proactive throttling.
async with KatanaClient(requests_per_minute=None) as client:
    ...
Error Recovery¶
import logging
# Configure logging to see resilience in action
logging.basicConfig(level=logging.INFO)
async with KatanaClient() as client:
    # Automatic error recovery with detailed logging
    response = await get_all_products.asyncio_detailed(
        client=client,
        limit=100
    )
    # Logs will show retry attempts and recovery
Smart Pagination¶
Auto-pagination is ON by default for all GET requests. All pages are automatically collected into a single response.
Automatic Pagination (Default)¶
async with KatanaClient() as client:
    # Get ALL products across all pages automatically
    all_products = await get_all_products.asyncio_detailed(
        client=client,
        is_sellable=True,  # API filter parameters
        limit=250  # Page size (all pages still collected)
    )
    print(f"Total products: {len(all_products.parsed.data)}")
Single Page Request (Explicit Page)¶
To get a specific page instead of all pages, add an explicit page parameter. Note:
ANY explicit page value (including page=1) disables auto-pagination:
async with KatanaClient() as client:
    # Get ONLY page 2 (auto-pagination disabled when page is explicit)
    page2_products = await get_all_products.asyncio_detailed(
        client=client,
        page=2,  # Explicit page disables auto-pagination
        limit=50
    )
    # Returns just the 50 items on page 2
    # page=1 ALSO disables auto-pagination (returns only first page)
    first_page = await get_all_products.asyncio_detailed(
        client=client,
        page=1,  # Get ONLY page 1, not all pages
        limit=50
    )
Limiting Total Items¶
To limit the total number of items collected (not just page size), use the max_items
extension via the httpx client:
async with KatanaClient() as client:
    httpx_client = client.get_async_httpx_client()
    response = await httpx_client.get(
        "/products",
        params={"limit": 50},  # 50 items per page
        extensions={"max_items": 200}  # Stop after 200 items total
    )
The transport intelligently adjusts the limit on the final request to fetch only
what's needed, avoiding over-fetching.
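The arithmetic behind that final-request adjustment is simple to show. The sketch below is illustrative only: `request_limits` is a hypothetical helper, and it assumes every page comes back full.

```python
from typing import Iterator


def request_limits(page_size: int, max_items: int) -> Iterator[int]:
    """Yield the `limit` sent on each request until max_items would be reached."""
    remaining = max_items
    while remaining > 0:
        limit = min(page_size, remaining)  # shrink the last request if needed
        yield limit
        remaining -= limit


print(list(request_limits(50, 200)))  # budget divides evenly into full pages
print(list(request_limits(50, 120)))  # last request asks only for the remainder
```

For a 120-item budget with 50-item pages, the last request asks for 20 items instead of 50, so nothing beyond the budget is fetched.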
Pagination Behavior Summary¶
| Parameter | Scope | Effect |
|---|---|---|
| limit=50 | URL param | Page size (50 items per request) |
| page=2 | URL param | Get specific page only (disables auto-pagination) |
| max_pages=5 | Client | Max pages to fetch |
| max_items=200 | Extension | Max total items to collect |
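How an explicit page, the max_pages cap, and a short final page interact can be simulated without any network access. This is a sketch against a fake in-memory dataset; `collect` and `fake_fetch` are hypothetical, and stopping on a short page is an assumption about how the end of the data is detected.

```python
from typing import Callable, List, Optional

DATASET = list(range(1, 126))  # 125 fake item ids


def fake_fetch(page: int, limit: int) -> List[int]:
    """Stand-in for one API request returning a single page."""
    start = (page - 1) * limit
    return DATASET[start:start + limit]


def collect(
    fetch_page: Callable[[int, int], List[int]],
    limit: int,
    page: Optional[int] = None,
    max_pages: int = 100,
) -> List[int]:
    """Return one page if `page` is explicit, otherwise gather pages up to max_pages."""
    if page is not None:
        return fetch_page(page, limit)  # explicit page: exactly one request
    items: List[int] = []
    for current in range(1, max_pages + 1):
        batch = fetch_page(current, limit)
        items.extend(batch)
        if len(batch) < limit:  # short page: assumed to mean no more data
            break
    return items


print(len(collect(fake_fetch, limit=50)))               # all pages
print(len(collect(fake_fetch, limit=50, page=2)))       # one page only
print(len(collect(fake_fetch, limit=50, max_pages=2)))  # capped by max_pages
```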
Configuration¶
Authentication Methods¶
The client supports multiple authentication methods (in priority order):
- Direct parameter: Pass api_key to KatanaClient()
- Environment variable: Set KATANA_API_KEY
- .env file: Create a .env file with your credentials
- ~/.netrc file: Use standard Unix credential file
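The priority order above amounts to taking the first non-empty source. The sketch below only illustrates that ordering: `resolve_api_key` is a hypothetical function, and the already-loaded `.env` values and netrc password are passed in as plain arguments rather than read from disk.

```python
from typing import Mapping, Optional


def resolve_api_key(
    explicit: Optional[str],
    environ: Mapping[str, str],
    dotenv_values: Mapping[str, str],
    netrc_password: Optional[str],
) -> Optional[str]:
    """Return the first available key: parameter, env var, .env file, then netrc."""
    candidates = (
        explicit,
        environ.get("KATANA_API_KEY"),
        dotenv_values.get("KATANA_API_KEY"),
        netrc_password,
    )
    for candidate in candidates:
        if candidate:
            return candidate
    return None


# The direct parameter wins even when every other source is populated.
print(resolve_api_key("from-param", {"KATANA_API_KEY": "from-env"}, {}, "from-netrc"))
# With nothing else set, the netrc password is the fallback.
print(resolve_api_key(None, {}, {}, "from-netrc"))
```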
Using .env file (Recommended)¶
# Create .env file
KATANA_API_KEY=your-api-key-here
KATANA_BASE_URL=https://api.katanamrp.com/v1 # Optional
# Automatically loads from .env
async with KatanaClient() as client:
    # Uses credentials from .env
    pass
Using ~/.netrc file¶
For centralized credential management across multiple tools:
# Add to ~/.netrc
machine api.katanamrp.com
password your-api-key-here
# Set proper permissions (required)
chmod 600 ~/.netrc
# Automatically loads from ~/.netrc
async with KatanaClient() as client:
    # Uses credentials from ~/.netrc
    pass
Note: The password field in netrc stores your API key (bearer token), not an
actual password. The login field is optional and ignored.
Using environment variable¶
Using direct parameter¶
Custom Configuration¶
import logging
# Custom configuration with stdlib logger
async with KatanaClient(
    api_key="custom-key",
    base_url="https://custom.katana.com/v1",
    timeout=60.0,  # Request timeout
    max_retries=5,  # Maximum retry attempts
    logger=logging.getLogger("custom")  # stdlib logger
) as client:
    # Your API calls here
    pass
The logger parameter accepts any object whose debug, info, warning, and error
methods accept (msg, *args, **kwargs), which is the standard logging.Logger call convention.
Both logging.Logger and structlog's BoundLogger satisfy this without adapters:
import structlog
# Use structlog directly; no adapter needed
logger = structlog.get_logger("katana")
async with KatanaClient(logger=logger) as client:
    pass
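The logger contract described above is structural, which `typing.Protocol` can express directly. This is a sketch, assuming nothing about how the client itself types the parameter: `LoggerLike` and `ListLogger` are hypothetical names introduced for illustration.

```python
import logging
from typing import Any, List, Protocol, Tuple, runtime_checkable


@runtime_checkable
class LoggerLike(Protocol):
    """Anything with the four stdlib-style logging methods."""

    def debug(self, msg: str, *args: Any, **kwargs: Any) -> None: ...
    def info(self, msg: str, *args: Any, **kwargs: Any) -> None: ...
    def warning(self, msg: str, *args: Any, **kwargs: Any) -> None: ...
    def error(self, msg: str, *args: Any, **kwargs: Any) -> None: ...


class ListLogger:
    """Tiny custom logger that stores formatted records in memory."""

    def __init__(self) -> None:
        self.records: List[Tuple[str, str]] = []

    def _log(self, level: str, msg: str, *args: Any) -> None:
        self.records.append((level, msg % args if args else msg))

    def debug(self, msg: str, *args: Any, **kwargs: Any) -> None:
        self._log("DEBUG", msg, *args)

    def info(self, msg: str, *args: Any, **kwargs: Any) -> None:
        self._log("INFO", msg, *args)

    def warning(self, msg: str, *args: Any, **kwargs: Any) -> None:
        self._log("WARNING", msg, *args)

    def error(self, msg: str, *args: Any, **kwargs: Any) -> None:
        self._log("ERROR", msg, *args)


custom = ListLogger()
custom.warning("Rate limited on attempt %d", 1)
print(custom.records)
print(isinstance(logging.getLogger("katana"), LoggerLike), isinstance(custom, LoggerLike))
```

The runtime `isinstance` check only verifies that the four methods exist, not their signatures, so it is a convenience rather than a guarantee.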
Advanced httpx Configuration¶
import httpx
# Pass through httpx configuration.
# custom_request_hook and custom_response_hook are your own async callables
# (see Custom Event Hooks).
async with KatanaClient(
    # Standard KatanaClient options
    api_key="your-key",
    max_retries=3,
    # httpx client options
    verify=False,  # Disables SSL verification; avoid outside local testing
    proxies="http://proxy:8080",  # httpx 0.28+ removed proxies=; use proxy= there
    headers={"Custom": "Header"},
    event_hooks={
        "request": [custom_request_hook],
        "response": [custom_response_hook]
    }
) as client:
    # Client has both resilience AND custom httpx config
    pass
Observability¶
Logging¶
import logging
# Configure logging to see what's happening
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
async with KatanaClient() as client:
    # All resilience actions are logged
    response = await get_all_products.asyncio_detailed(
        client=client,
        limit=100
    )
Example log output:
2025-01-15 10:30:15 - katana_client - WARNING - Rate limited on attempt 1, waiting 60.0s
2025-01-15 10:31:16 - katana_client - INFO - Request succeeded after 2 attempts
2025-01-15 10:31:16 - katana_client - DEBUG - Response: 200 GET https://api.katanamrp.com/v1/products (1.24s)
Custom Event Hooks¶
async def custom_response_hook(response):
    """Custom hook to track API usage."""
    print(f"API call: {response.request.method} {response.request.url}")
    print(f"Status: {response.status_code}")

async with KatanaClient(
    event_hooks={
        "response": [custom_response_hook]
    }
) as client:
    # Your custom hooks are called alongside built-in ones
    response = await get_all_products.asyncio_detailed(
        client=client,
        limit=10
    )
Testing¶
Mocking for Tests¶
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from katana_public_api_client import KatanaClient

@pytest.mark.asyncio
async def test_api_integration():
    """Test API integration with mocked responses."""
    with patch.dict('os.environ', {'KATANA_API_KEY': 'test-key'}):
        async with KatanaClient() as client:
            # Mock the underlying httpx client
            with patch.object(client, 'get_async_httpx_client') as mock_httpx:
                # httpx.Response.json() is synchronous, so use MagicMock for the response
                mock_response = MagicMock()
                mock_response.status_code = 200
                mock_response.json.return_value = {"data": [{"id": 1}]}
                mock_httpx.return_value.request = AsyncMock(return_value=mock_response)
                # Test your API logic here
                from katana_public_api_client.api.product import get_all_products
                response = await get_all_products.asyncio_detailed(
                    client=client,
                    limit=10
                )
                assert response.status_code == 200
Integration Tests¶
import os
import pytest
from katana_public_api_client import KatanaClient
from katana_public_api_client.api.product import get_all_products

@pytest.mark.integration
@pytest.mark.asyncio
async def test_real_api():
    """Test against real Katana API (requires KATANA_API_KEY)."""
    api_key = os.getenv('KATANA_API_KEY')
    if not api_key:
        pytest.skip("KATANA_API_KEY not set")
    async with KatanaClient() as client:
        response = await get_all_products.asyncio_detailed(
            client=client,
            limit=1
        )
        assert response.status_code == 200
        assert hasattr(response.parsed, 'data')
Advanced Patterns¶
Custom Transport¶
import logging
from katana_public_api_client import KatanaClient
from katana_public_api_client.katana_client import ResilientAsyncTransport
# Any stdlib-compatible logger works here
custom_logger = logging.getLogger("katana.custom")
# Create custom transport with different settings
custom_transport = ResilientAsyncTransport(
    max_retries=10,
    max_pages=50,  # Limit automatic pagination
    logger=custom_logger
)
async with KatanaClient(
    transport=custom_transport
) as client:
    # Uses your custom retry logic
    pass
Batch Operations¶
import asyncio
from katana_public_api_client import KatanaClient
from katana_public_api_client.api.product import get_product

async def process_products_in_batches(product_ids, batch_size=10):
    """Process products in batches with automatic resilience."""
    async with KatanaClient() as client:
        results = []
        for i in range(0, len(product_ids), batch_size):
            batch = product_ids[i:i + batch_size]
            # Each call automatically has resilience
            batch_results = await asyncio.gather(*[
                get_product.asyncio_detailed(client=client, id=product_id)
                for product_id in batch
            ])
            results.extend(batch_results)
            # Be nice to the API
            await asyncio.sleep(0.1)
        return results
Error Handling¶
import httpx
from katana_public_api_client.utils import unwrap_data, APIError

async with KatanaClient() as client:
    try:
        response = await get_all_products.asyncio_detailed(
            client=client,
            limit=50
        )
        products = unwrap_data(response, default=[])
        print(f"Success: {len(products)} products")
    except httpx.TimeoutException:
        print("Request timed out after retries")
    except httpx.ConnectError:
        print("Connection failed after retries")
    except APIError as e:
        print(f"Unexpected API response: {e.status_code}")
Best Practices¶
1. Use Context Managers¶
# ✅ Good: Properly manages connections
async with KatanaClient() as client:
    response = await get_all_products.asyncio_detailed(
        client=client,
        limit=50
    )

# ❌ Bad: Doesn't close connections
client = KatanaClient()
response = await get_all_products.asyncio_detailed(
    client=client,
    limit=50
)
2. Configure Appropriate Timeouts¶
# ✅ Good: Reasonable timeout for your use case
async with KatanaClient(timeout=30.0) as client:
    # Timeout is appropriate for expected response time
    pass

# ❌ Bad: Too short timeout causes unnecessary retries
async with KatanaClient(timeout=1.0) as client:
    # Will likely timeout and retry frequently
    pass
3. Let Auto-Pagination Handle Large Datasets¶
# ✅ Good: Auto-pagination is ON by default with safety limits
all_products = await get_all_products.asyncio_detailed(
    client=client,
    limit=250  # Sets page size; all pages collected automatically
)

# ✅ Good: Use explicit page when you need just one page
page2 = await get_all_products.asyncio_detailed(
    client=client,
    page=2,  # Explicit page = single page only
    limit=100
)

# ❌ Bad: Manual pagination loop without safety limits
page = 1
while True:  # Could run forever!
    response = await get_all_products.asyncio_detailed(
        client=client,
        page=page,
        limit=100
    )
    # ... handle response
    page += 1
4. Handle Different Response Types¶
from katana_public_api_client.utils import unwrap_data, AuthenticationError, APIError

async with KatanaClient() as client:
    response = await get_all_products.asyncio_detailed(
        client=client,
        limit=50
    )
    # ✅ Good: use unwrap helpers + typed exceptions
    try:
        products = unwrap_data(response, default=[])
        print(f"Retrieved {len(products)} products")
    except AuthenticationError:
        print("Authentication failed")
    except APIError as e:
        print(f"Unexpected status: {e.status_code}")
5. Use Direct API Calls¶
from katana_public_api_client import KatanaClient
from katana_public_api_client.api.product import get_all_products
from katana_public_api_client.utils import unwrap_data

# ✅ Good: Direct API calls with automatic resilience
async with KatanaClient() as client:
    response = await get_all_products.asyncio_detailed(
        client=client,
        is_sellable=True
    )
    products = unwrap_data(response, default=[])
Performance Tips¶
1. Reuse Client Instances¶
from katana_public_api_client.api.product import get_all_products
from katana_public_api_client.api.sales_order import get_all_sales_orders
from katana_public_api_client.api.inventory import get_all_inventory_points

# ✅ Good: One client for multiple operations
async with KatanaClient() as client:
    products = await get_all_products.asyncio_detailed(client=client)
    orders = await get_all_sales_orders.asyncio_detailed(client=client)
    inventory = await get_all_inventory_points.asyncio_detailed(client=client)

# ❌ Bad: New client for each operation
for operation in [get_all_products, get_all_sales_orders]:
    async with KatanaClient() as client:
        await operation.asyncio_detailed(client=client)
2. Optimize Page Sizes¶
# ✅ Good: Reasonable page size
products = await get_all_products.asyncio_detailed(
    client=client,
    limit=250  # Good balance of efficiency and memory - automatically paginated
)

# ❌ Bad: Too small (many requests)
products = await get_all_products.asyncio_detailed(
    client=client,
    limit=10  # Will make many small requests
)

# ❌ Bad: Too large (may hit API limits)
products = await get_all_products.asyncio_detailed(
    client=client,
    limit=10000  # May exceed API limits
)
3. Use Concurrent Requests Wisely¶
import asyncio
from katana_public_api_client.api.product import get_product

# ✅ Good: Limited concurrency respects rate limits
async def get_multiple_products(product_ids):
    async with KatanaClient() as client:
        # Process in small batches
        results = []
        for i in range(0, len(product_ids), 5):  # 5 concurrent requests
            batch = product_ids[i:i+5]
            batch_results = await asyncio.gather(*[
                get_product.asyncio_detailed(client=client, id=pid)
                for pid in batch
            ])
            results.extend(batch_results)
            # Small delay between batches
            if i + 5 < len(product_ids):
                await asyncio.sleep(0.2)
        return results
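An alternative to fixed batches is to bound concurrency with `asyncio.Semaphore`, which keeps up to N requests in flight instead of waiting for each batch to finish. This swaps in a different technique from the batching shown above; `gather_bounded` and `fake_get_product` are hypothetical, and the fake coroutine stands in for a real API call.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


async def gather_bounded(
    factories: List[Callable[[], Awaitable[Any]]],
    max_concurrency: int = 5,
) -> List[Any]:
    """Run all coroutines, with at most max_concurrency in flight at once."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run(factory: Callable[[], Awaitable[Any]]) -> Any:
        async with semaphore:
            return await factory()

    # gather preserves input order in its result list
    return await asyncio.gather(*(run(factory) for factory in factories))


async def demo():
    active = 0
    peak = 0

    async def fake_get_product(pid: int) -> dict:
        nonlocal active, peak
        active += 1
        peak = max(peak, active)  # track the highest concurrency observed
        await asyncio.sleep(0.01)  # simulated network latency
        active -= 1
        return {"id": pid}

    factories = [lambda pid=pid: fake_get_product(pid) for pid in range(12)]
    results = await gather_bounded(factories, max_concurrency=5)
    return results, peak


results, peak = asyncio.run(demo())
print(len(results), "results; peak concurrency:", peak)
```

With the real client, each factory would wrap a call such as `get_product.asyncio_detailed(client=client, id=pid)`; the transport-level rate limiter still applies on top of this bound.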
API Reference¶
KatanaClient¶
class KatanaClient(AuthenticatedClient):
    """The pythonic Katana API client with automatic resilience and pagination."""

    def __init__(
        self,
        api_key: Optional[str] = None,
        base_url: Optional[str] = None,
        timeout: float = 30.0,
        max_retries: int = 5,
        max_pages: int = 100,
        logger: Optional[Logger] = None,
        **httpx_kwargs: Any,
    ): ...

    async def __aenter__(self) -> "KatanaClient": ...
    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: ...
ResilientAsyncTransport¶
Factory function that creates a layered transport with automatic resilience:
def ResilientAsyncTransport(
    max_retries: int = 5,
    max_pages: int = 100,
    logger: Optional[Logger] = None,
    **kwargs: Any
) -> RetryTransport:
    """
    Creates a chained transport with:
    1. AsyncHTTPTransport (base HTTP transport)
    2. ErrorLoggingTransport (logs detailed 4xx errors)
    3. PaginationTransport (auto-collects paginated responses)
    4. RetryTransport (handles retries with Retry-After header support)
    """
    ...
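The layering in that docstring can be pictured as nested wrappers, each delegating to the one below it. The sketch uses plain functions rather than httpx transports, and it assumes the numbered list runs from innermost to outermost (consistent with the factory returning a RetryTransport); `make_layer` and `base_transport` are hypothetical.

```python
from typing import Callable, List

Handler = Callable[[str], str]


def base_transport(request: str) -> str:
    """Innermost layer: stands in for the real HTTP call."""
    return f"response to {request}"


def make_layer(name: str, inner: Handler, trace: List[str]) -> Handler:
    """Wrap `inner`, recording when this layer is entered and exited."""

    def handle(request: str) -> str:
        trace.append(f"{name}:in")
        response = inner(request)
        trace.append(f"{name}:out")
        return response

    return handle


trace: List[str] = []
chain: Handler = base_transport
# Wrap from the inside out; the last wrapper applied is the outermost layer.
for layer_name in ("ErrorLogging", "Pagination", "Retry"):
    chain = make_layer(layer_name, chain, trace)

chain("GET /products")
print(trace)
```

Under this ordering a request enters the retry layer first, so anything the inner layers do for that request sits inside the retry layer's handling.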
Next Steps: Check out the API Reference for detailed endpoint documentation, or see the Testing Guide for testing patterns.