15. Usage Tracking
Usage tracking captures the complete lifecycle of inference requests, enabling audit trails, performance analysis, and operational debugging. Without thorough tracking, troubleshooting requires reproducing issues—a tedious process that delays resolution.
Request tracking requires unique identifiers propagated through the entire request lifecycle. Client-generated request IDs flow through the gateway to providers and into response metadata. Correlating logs across components becomes straightforward when identifiers remain consistent.
from datetime import datetime
from typing import Optional
import json
class RequestTracker:
    """Record lifecycle events for in-flight gateway requests.

    A ``RequestContext`` is kept in ``active_requests`` from
    ``begin_request`` until ``complete_request`` or ``fail_request`` removes
    it. Every state change is appended to ``storage`` as an event dict keyed
    by the request ID.
    """

    def __init__(self, storage: "TrackingStorage"):
        """Create a tracker that appends its events to *storage*.

        Args:
            storage: Object exposing an awaitable
                ``append(request_id, event)`` method.
        """
        self.storage = storage
        # Contexts for requests that have started but not yet finished.
        self.active_requests: dict[str, RequestContext] = {}

    def _generate_id(self) -> str:
        """Return a fresh request ID for requests that did not supply one."""
        # Imported locally so this class does not depend on a file-level
        # import that may not exist.
        import uuid

        return str(uuid.uuid4())

    async def begin_request(self, request: "GatewayRequest") -> str:
        """Start tracking *request* and emit a ``"started"`` event.

        The client-supplied ID is used when present (and truthy); otherwise
        a new one is generated.

        Args:
            request: Incoming request; ``client_request_id``, ``prompt``,
                ``model`` and ``tracking_metadata`` are read from it.

        Returns:
            The request ID under which all later events must be reported.
        """
        request_id = request.client_request_id or self._generate_id()
        context = RequestContext(
            request_id=request_id,
            # NOTE(review): utcnow() returns a naive datetime and is
            # deprecated since Python 3.12. It is kept here because moving
            # to an aware datetime would change the serialized timestamp
            # format ("+00:00" suffix) that stored events already use.
            timestamp=datetime.utcnow(),
            prompt=request.prompt,
            model=request.model,
            provider=None,  # not known until begin_provider_call()
            status=Status.PENDING,
            metadata=request.tracking_metadata,
        )
        self.active_requests[request_id] = context
        await self.storage.append(request_id, context.to_event("started"))
        return request_id

    async def begin_provider_call(self, request_id: str,
                                  provider: str,
                                  model: str) -> None:
        """Record that the request is being sent to *provider*.

        Overwrites the context's provider and model with the values actually
        used for the call, then emits ``"provider_call_started"``.

        Raises:
            KeyError: If *request_id* is not an active request.
        """
        context = self.active_requests[request_id]
        context.provider = provider
        context.model = model
        context.status = Status.PROVIDER_CALLED
        await self.storage.append(
            request_id,
            context.to_event("provider_call_started",
                             {"provider": provider, "model": model}),
        )

    async def complete_request(self, request_id: str,
                               response: "ProviderResponse",
                               latency_ms: int) -> None:
        """Mark the request completed and emit a ``"completed"`` event.

        The context is removed from ``active_requests`` before the event is
        appended.

        Args:
            request_id: ID returned by ``begin_request``.
            response: Provider response; ``usage.total_tokens`` and
                ``estimated_cost`` are read from it.
            latency_ms: Latency to record for the request, in milliseconds.

        Raises:
            KeyError: If *request_id* is not an active request.
        """
        context = self.active_requests.pop(request_id)
        context.status = Status.COMPLETED
        context.latency_ms = latency_ms
        context.tokens_used = response.usage.total_tokens
        await self.storage.append(
            request_id,
            context.to_event("completed", {
                "latency_ms": latency_ms,
                "tokens": response.usage.total_tokens,
                "cost_cents": response.estimated_cost,
            }),
        )

    async def fail_request(self, request_id: str,
                           error: Exception,
                           provider: Optional[str] = None) -> None:
        """Mark the request failed and emit a ``"failed"`` event.

        Unlike ``complete_request``, an unknown *request_id* is ignored
        rather than raising, so this is safe to call from error handlers
        even when the request was never registered or was already closed.

        Args:
            request_id: ID returned by ``begin_request``.
            error: The exception that caused the failure; its class name and
                string form are recorded.
            provider: Provider to record on the event. This value replaces
                the context's provider in the emitted event, even when it
                is ``None``.
        """
        context = self.active_requests.pop(request_id, None)
        if context:
            context.status = Status.FAILED
            await self.storage.append(
                request_id,
                context.to_event("failed", {
                    "error_type": type(error).__name__,
                    "error_message": str(error),
                    "provider": provider,
                }),
            )
class RequestContext:
    """Mutable per-request state that can be rendered as tracking events."""

    def __init__(self, request_id: str, timestamp: datetime,
                 prompt: str, model: str, provider: Optional[str],
                 status: "Status", metadata: dict):
        """Store the request's identifying fields and initial status.

        Args:
            request_id: Identifier shared by every event of this request.
            timestamp: When the request was created.
            prompt: Prompt text of the request.
            model: Model name associated with the request.
            provider: Provider name, or ``None`` if not yet chosen.
            status: Current status; its ``.value`` is written into events.
            metadata: Caller-supplied tracking metadata.
        """
        self.request_id = request_id
        self.timestamp = timestamp
        self.prompt = prompt
        self.model = model
        self.provider = provider
        self.status = status
        self.metadata = metadata
        # Filled in later, once the request has finished.
        self.latency_ms: Optional[int] = None
        self.tokens_used: Optional[int] = None

    def to_event(self, event_type: str, data: Optional[dict] = None) -> dict:
        """Build the event dict for *event_type* from the current state.

        Note that ``"timestamp"`` is the request's creation time, not the
        time the event is emitted, so every event of a request carries the
        same timestamp.

        Args:
            event_type: Value stored under the ``"event"`` key.
            data: Optional extra fields merged into the event. Keys that
                collide with the base fields (e.g. ``"provider"``) override
                them. ``None`` means no extra fields.

        Returns:
            A new dict containing the base fields plus *data*.
        """
        event = {
            "request_id": self.request_id,
            "timestamp": self.timestamp.isoformat(),
            "event": event_type,
            "model": self.model,
            "provider": self.provider,
            "status": self.status.value,
        }
        # `data` defaults to None; unpacking it unconditionally raised
        # TypeError for calls such as to_event("started").
        if data:
            event.update(data)
        return event
Retention policies balance analytical needs against storage costs. Detailed tracking for the past 7 days enables rapid debugging. Aggregated metrics for the past 90 days support trend analysis. Long-term storage preserves only high-level statistics for compliance and capacity planning.
Implement request tracking in your gateway. Create a query interface that retrieves request history by ID, user, or time range. Add a retention policy that archives detailed data after 30 days.