Local AI APIs and Integration
Learn local AI APIs and integration through RunLocalAI's practical lens: API design, FastAPI, OpenAI-compatible endpoints and streaming, hardware fit, runtime settings, verification habits and local-vs-cloud tradeoffs.
Why this course matters
Local AI APIs and Integration is for builders turning local models into working tools, agents and retrieval systems. It connects API design, FastAPI, OpenAI-compatible endpoints, streaming and gateways to the questions RunLocalAI wants every reader to answer before they install, upgrade or scale a model: will it run, what will it cost in memory, what setting changes the result, and how do you verify the answer instead of trusting a demo?
What you will be able to do
By the end, you should be able to explain the main tradeoffs in plain language, choose a safe next experiment, and use the chapter exercises as a repeatable operator checklist. The course favors local evidence, hardware fit, context limits, latency and failure modes over generic AI vocabulary.
How to use this course
Start at chapter one if the topic is new. If you already have a working stack, scan for chapters such as API Design Principles, OpenAI API Format, FastAPI Basics and Chat Completions Endpoint and use those lessons as a quality-control pass before changing a workstation, team workflow or production-like local deployment.
- 01 API Design Principles

An API is a contract. The moment you publish an endpoint, you are promising stability to every consumer that depends on it. Designing for local AI serving means understanding the difference between the *capabilities* of your inference engine and the *interface* you expose. These two layers should remain independent. The interface should never leak implementation details about what model is running or how it is being served.

### What Makes a Good Local AI API

A well-designed local AI API prioritizes three properties: compatibility, predictability, and observability. Compatibility means existing clients work without modification. Predictability means the API behaves consistently under load. Observability means failures can be diagnosed without guessing.

Start by defining the request-response contract in a schema. For OpenAI-compatible endpoints, this means mapping your internal representation to the format clients expect. A request to `/v1/chat/completions` should receive a response that matches the OpenAI schema, not a custom format unique to your setup.

### Core Design Decisions

Request validation happens at the boundary. Reject malformed requests with 422 status codes and clear error messages before they reach your inference pipeline. Never let invalid input propagate downstream where it becomes harder to debug.

Response structure should be consistent even when the underlying operation fails. A 200 response and a 500 response should share the same top-level keys. Clients should never encounter different JSON shapes depending on which code path executed.

Streaming responses require special consideration. A streaming response is not just a series of HTTP chunks. It is a structured byte stream where the client parses content and reconstructs the complete response. Breaking changes in the stream format will silently break all consumers.

### The Portability Principle

Design interfaces that do not depend on the serving backend. If you switch from vLLM to Ollama or change your inference engine, the API layer should adapt without forcing clients to update their code. This means abstracting model loading, batching, and memory management behind a service layer that your HTTP handlers call.

15 min
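The portability principle can be sketched with a small service layer. This is a minimal illustration, not a prescribed design: `InferenceBackend`, `CompletionService`, `EchoBackend` and `UppercaseBackend` are hypothetical names, and the two toy backends stand in for real vLLM or Ollama clients.

```python
from dataclasses import dataclass
from typing import Protocol


@dataclass
class Completion:
    text: str
    completion_tokens: int


class InferenceBackend(Protocol):
    """Hypothetical interface: the only thing HTTP handlers depend on."""

    def generate(self, prompt: str, max_tokens: int) -> Completion: ...


class EchoBackend:
    """Toy engine that echoes the prompt; stands in for a real client."""

    def generate(self, prompt: str, max_tokens: int) -> Completion:
        words = prompt.split()[:max_tokens]
        return Completion(text=" ".join(words), completion_tokens=len(words))


class UppercaseBackend:
    """A second toy engine, used to show that swapping backends is invisible."""

    def generate(self, prompt: str, max_tokens: int) -> Completion:
        words = prompt.upper().split()[:max_tokens]
        return Completion(text=" ".join(words), completion_tokens=len(words))


class CompletionService:
    """Service layer that hides which engine produced the text."""

    def __init__(self, backend: InferenceBackend) -> None:
        self._backend = backend

    def complete(self, prompt: str, max_tokens: int = 256) -> dict:
        result = self._backend.generate(prompt, max_tokens)
        # The response shape is identical whichever backend ran.
        return {
            "content": result.text,
            "completion_tokens": result.completion_tokens,
        }
```

Swapping `EchoBackend` for `UppercaseBackend` changes the generated text but not the keys a client sees, which is the property the interface is meant to protect.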
- 02 OpenAI API Format

Understanding the exact JSON structure of OpenAI API requests and responses is essential for building compatible endpoints. The format is well-documented, but subtle details like null handling, default values, and field naming conventions cause most compatibility issues in practice.

### Request Structure

A chat completions request carries a messages array, a model identifier, and several optional parameters. The messages array contains objects with `role` and `content` fields. Roles include `system`, `user`, and `assistant`. Each role shapes the model's behavior differently.

```json
{
  "model": "llama3.2:latest",
  "messages": [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "Explain API design."}
  ],
  "temperature": 0.7,
  "max_tokens": 512,
  "stream": false
}
```

The `model` field identifies which model should process the request. In a local setup, this string might map to a local model file or a container image. The API layer is responsible for resolving this identifier.

### Response Structure

A non-streaming response follows this structure:

```json
{
  "id": "chatcmpl-abc123",
  "object": "chat.completion",
  "created": 1700000000,
  "model": "llama3.2:latest",
  "choices": [
    {
      "index": 0,
      "message": {
        "role": "assistant",
        "content": "API design involves..."
      },
      "finish_reason": "stop"
    }
  ],
  "usage": {
    "prompt_tokens": 20,
    "completion_tokens": 45,
    "total_tokens": 65
  }
}
```

The `finish_reason` field indicates why generation stopped. Common values are `stop` (natural completion), `length` (hit max_tokens), and `content_filter` (content flagged). Always include usage statistics even for local models. Clients rely on token counts for cost tracking and analytics.

### Common Compatibility Pitfalls

Omitting the `usage` field breaks clients that expect to track token consumption. Using inconsistent field casing (camelCase vs snake_case) breaks clients that parse based on schema expectations. Returning a `finish_reason` value with incorrect casing (for example `"Stop"` instead of `"stop"`) will cause validation failures in strict clients.

15 min
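The pitfalls above can be checked mechanically before a client ever sees a response. The following is a rough sketch of such a check; `find_compat_problems` is a hypothetical helper, and the allowed `finish_reason` set is limited to the three values this lesson names, so extend it if your endpoint returns others.

```python
# Values named in this lesson; an assumption, not the complete OpenAI list.
ALLOWED_FINISH_REASONS = {"stop", "length", "content_filter"}
REQUIRED_TOP_LEVEL = ("id", "object", "created", "model", "choices", "usage")
REQUIRED_USAGE = ("prompt_tokens", "completion_tokens", "total_tokens")


def find_compat_problems(response: dict) -> list:
    """Return human-readable problems found in a chat completion response."""
    problems = []

    for key in REQUIRED_TOP_LEVEL:
        if key not in response:
            problems.append(f"missing top-level field: {key}")

    usage = response.get("usage")
    if isinstance(usage, dict):
        missing = [key for key in REQUIRED_USAGE if key not in usage]
        problems.extend(f"missing usage field: {key}" for key in missing)
        if not missing:
            expected = usage["prompt_tokens"] + usage["completion_tokens"]
            if usage["total_tokens"] != expected:
                problems.append("usage.total_tokens is not prompt + completion")

    for index, choice in enumerate(response.get("choices", [])):
        reason = choice.get("finish_reason")
        # Comparison is case-sensitive on purpose: "Stop" is not "stop".
        if reason not in ALLOWED_FINISH_REASONS:
            problems.append(
                f"choices[{index}].finish_reason has unexpected value: {reason!r}"
            )

    return problems
```

Running this against recorded responses in a test suite is one way to catch a casing or `usage` regression before strict clients do.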
- 03 FastAPI Basics

FastAPI handles the plumbing that every web service needs: routing, request parsing, response serialization, and error handling. Learning to use its type system correctly eliminates an entire class of bugs where invalid data reaches your business logic.

### Project Structure

A minimal FastAPI project requires an application instance, defined routes, and a run command. Organize code so that the FastAPI app object imports from separate modules rather than containing all logic inline.

```
app/
├── main.py
├── routers/
│   ├── completions.py
│   └── embeddings.py
├── models/
│   ├── requests.py
│   └── responses.py
└── services/
    └── inference.py
```

The `main.py` file creates the FastAPI instance, includes routers, and configures middleware. Routers group related endpoints together. Models define request and response schemas using Pydantic. Services contain the business logic that routers call.

### Defining a Route

```python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()


class CompletionRequest(BaseModel):
    model: str
    messages: list[dict]
    temperature: float = 0.7
    max_tokens: int | None = None


@app.post("/v1/chat/completions")
async def create_completion(request: CompletionRequest):
    # Pydantic has already validated types; this checks a business rule
    if not request.messages:
        raise HTTPException(status_code=400, detail="messages cannot be empty")
    return {"choices": [{"message": {"content": "response"}}]}
```

The `async def` syntax lets the endpoint serve concurrent requests, provided the handler awaits non-blocking calls rather than running blocking work inline. The Pydantic model validates the request body automatically. Invalid requests receive a 422 response without any custom validation code.

### Running the Server

```bash
uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
```

The `--reload` flag enables auto-reload during development. For production, remove this flag and consider running behind a reverse proxy like nginx.

### Failure Modes

Importing modules with circular dependencies crashes at startup with a cryptic error. A missing dependency that is only imported inside a handler surfaces as a 500 error at request time instead of a clear failure at startup. Returning a raw Python dict from an endpoint that declares no `response_model` skips Pydantic validation of the response, causing inconsistent response formats.

20 min
- 04 Chat Completions Endpoint

The chat completions endpoint is the workhorse of AI APIs. Implementing it correctly means handling message formatting, inference calls, token counting, and error propagation in a single request-response cycle that must complete within tight time constraints.

### Implementation Walkthrough

Start with the Pydantic model that mirrors the OpenAI request format.

```python
from pydantic import BaseModel, Field
from typing import Literal


class Message(BaseModel):
    role: Literal["system", "user", "assistant"]
    content: str


class CompletionRequest(BaseModel):
    model: str
    messages: list[Message]
    temperature: float = Field(default=0.7, ge=0, le=2)
    max_tokens: int = Field(default=256, gt=0)
    stream: bool = False
    top_p: float | None = None
    frequency_penalty: float | None = None
    presence_penalty: float | None = None
```

Field constraints enforce rules such as the temperature range. The request model handles validation before any inference code runs.

### Building the Response

```python
import time


class CompletionResponse(BaseModel):
    id: str
    object: str = "chat.completion"
    created: int
    model: str
    choices: list
    usage: dict


# random_id() is a helper assumed to be defined elsewhere in the project.
def create_response(model: str, message: str, prompt_tokens: int, completion_tokens: int) -> dict:
    return {
        "id": f"chatcmpl-{random_id()}",
        "object": "chat.completion",
        "created": int(time.time()),
        "model": model,
        "choices": [{
            "index": 0,
            "message": {"role": "assistant", "content": message},
            "finish_reason": "stop"
        }],
        "usage": {
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            "total_tokens": prompt_tokens + completion_tokens
        }
    }
```

The response follows the OpenAI schema. `prompt_tokens` is counted from the formatted prompt and `completion_tokens` from the generated text. Token counting requires a tokenizer that matches the model being served. Using the wrong tokenizer produces incorrect usage statistics.

### Handling Inference

```python
from fastapi import HTTPException


# format_messages, count_tokens, inference_client and the two exception
# classes are assumed to come from the project's service layer.
async def generate(request: CompletionRequest):
    try:
        # Convert messages to the prompt format expected by the model
        prompt = format_messages(request.messages)

        # Call the inference service
        result = await inference_client.generate(
            prompt=prompt,
            max_tokens=request.max_tokens,
            temperature=request.temperature
        )

        # Prompt tokens are counted from the prompt, not the generated text
        return create_response(request.model, result.text, count_tokens(prompt), result.tokens)
    except ModelNotFoundError:
        raise HTTPException(status_code=404, detail="Model not found")
    except InferenceTimeoutError:
        raise HTTPException(status_code=504, detail="Inference timeout")
```

Error handling converts internal exceptions into appropriate HTTP status codes. Clients should never receive a 500 error with a raw traceback.

20 min
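The lesson calls `format_messages` without showing it. As a rough sketch of what such a helper might do, the version below joins messages into a single prompt using a made-up `<|role|>` template. That template is an assumption for illustration only: each real model family expects its own chat template, and the dataclass `Message` here is a stand-in for the Pydantic model above.

```python
from dataclasses import dataclass


@dataclass
class Message:
    role: str
    content: str


def format_messages(messages: list) -> str:
    """Flatten chat messages into one prompt string.

    The <|role|> markers are a hypothetical template, not any model's
    actual format; replace them with the template your model was trained on.
    """
    parts = [f"<|{m.role}|>\n{m.content.strip()}" for m in messages]
    # End with an open assistant turn so the model continues from there.
    parts.append("<|assistant|>\n")
    return "\n".join(parts)
```

Keeping this conversion in one function means a change of model only touches the template, not the endpoint code.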
- 05 Streaming with SSE

Server-Sent Events (SSE) deliver real-time responses without WebSocket complexity. Understanding the SSE format and chunk serialization is essential for implementing streaming endpoints that clients like the OpenAI SDK expect.

### SSE Protocol Basics

SSE uses a text-based format where each event is separated by double newlines. Events contain a `data:` prefix followed by the payload. OpenAI-compatible streams signal completion with a final `data: [DONE]` message.

```
data: {"id":"1","choices":[{"delta":{"content":"Hello"}}]}

data: {"id":"1","choices":[{"delta":{"content":" world"}}]}

data: [DONE]
```

Clients parse these lines and reconstruct the complete response. A missing newline breaks the entire stream. An incorrectly formatted chunk causes the client to ignore all subsequent data.

### FastAPI Streaming Response

```python
import json
import time

from fastapi.responses import StreamingResponse


async def stream_completion(request: CompletionRequest):
    async def event_generator():
        prompt = format_messages(request.messages)
        # Every chunk of one response shares the same id and timestamp
        completion_id = f"chatcmpl-{random_id()}"
        created = int(time.time())

        def make_event(delta: dict, finish_reason: str | None) -> str:
            event = {
                "id": completion_id,
                "object": "chat.completion.chunk",
                "created": created,
                "model": request.model,
                "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}]
            }
            return f"data: {json.dumps(event)}\n\n"

        async for chunk in inference_client.stream_generate(prompt):
            yield make_event({"content": chunk.text}, None)

        # The final chunk carries finish_reason, then the [DONE] sentinel
        yield make_event({}, "stop")
        yield "data: [DONE]\n\n"

    return StreamingResponse(event_generator(), media_type="text/event-stream")
```

The `StreamingResponse` class handles the HTTP chunked transfer encoding automatically. The generator yields strings that are encoded and sent to the client as they are produced.

### Chunk Structure

Each chunk follows the chat completion chunk schema with `delta` instead of `message`. The `finish_reason` is null on intermediate chunks and is set only on the final chunk, just before `[DONE]`.

```json
{
  "id": "chatcmpl-abc",
  "object": "chat.completion.chunk",
  "created": 1700000000,
  "model": "llama3.2",
  "choices": [{
    "index": 0,
    "delta": {"content": "Hello"},
    "finish_reason": null
  }]
}
```

### Common Failure Modes

Forgetting to flush the response buffer causes the client to receive all chunks at once instead of in real-time. Mixing text and binary data in the stream breaks clients expecting text-only. Sending chunks after `data: [DONE]` causes parsing errors on the client.

20 min
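To make the client side of the stream concrete, here is a minimal sketch of how a consumer might rebuild the full message from an SSE body. `collect_stream` is a hypothetical helper that works on an already-received string; a real client would parse incrementally as bytes arrive.

```python
import json
from typing import Optional, Tuple


def collect_stream(raw: str) -> Tuple[str, Optional[str]]:
    """Rebuild the message text and finish_reason from an SSE body."""
    parts = []
    finish_reason = None

    # Events are separated by a blank line (two newlines).
    for event in raw.split("\n\n"):
        for line in event.splitlines():
            if not line.startswith("data:"):
                continue  # skip comments and other SSE fields
            payload = line[len("data:"):].strip()
            if payload == "[DONE]":
                return "".join(parts), finish_reason
            choice = json.loads(payload)["choices"][0]
            parts.append(choice.get("delta", {}).get("content") or "")
            finish_reason = choice.get("finish_reason") or finish_reason

    # Reaching the end without the sentinel means the stream was cut short.
    raise ValueError("stream ended without [DONE]")
```

Treating a missing `[DONE]` as an error, rather than returning partial text silently, is a design choice made here so that truncated streams are visible to the caller.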
- 06 API Key Authentication

Authentication protects your API from unauthorized access. For local deployments, a simple API key header check prevents accidental exposure and blocks non-local clients from consuming resources meant for internal use.

### Header-Based Authentication

The standard pattern checks for a specific header and validates its value against a stored secret. FastAPI provides a dependency system that makes this clean and reusable.

```python
import os
import secrets

from fastapi import Header, HTTPException


async def verify_api_key(x_api_key: str | None = Header(default=None)):
    if not x_api_key:
        raise HTTPException(
            status_code=401,
            detail="Missing API key. Provide X-API-Key header."
        )
    expected = os.environ.get("API_KEY", "")
    # Constant-time comparison avoids leaking key contents through timing
    if not expected or not secrets.compare_digest(x_api_key.encode(), expected.encode()):
        raise HTTPException(
            status_code=403,
            detail="Invalid API key."
        )
    return True
```

The `Header()` parameter extracts the header value; the function itself rejects a missing or incorrect key. Returning an error with a `detail` message helps clients diagnose authentication failures.

### Applying Authentication to Routes

```python
from fastapi import Depends


@app.post("/v1/chat/completions")
async def completions(request: CompletionRequest, _: bool = Depends(verify_api_key)):
    return await generate(request)
```

The `Depends()` function injects the authentication check before the endpoint logic executes. If authentication fails, the endpoint never runs.

### API Key Generation

Generate keys using a cryptographically secure random function. Store the hash, not the plain key, to limit damage if your database is compromised.

```python
import hashlib
import secrets


def generate_api_key() -> tuple[str, str]:
    raw_key = secrets.token_urlsafe(32)
    hashed_key = hashlib.sha256(raw_key.encode()).hexdigest()
    return raw_key, hashed_key
```

When a client provides a key, hash it and compare against the stored hash.

### Security Considerations

Never log API keys. Never return the key in responses. Use HTTPS in production even for local deployment to prevent key interception by local network observers. Rotate keys regularly and provide an endpoint for key revocation.

20 min
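The "hash it and compare against the stored hash" step can be sketched as follows. `key_matches` is a hypothetical helper name; the hashing mirrors `generate_api_key` from this lesson, and the comparison uses `hmac.compare_digest` from the standard library so that timing does not reveal how much of the hash matched.

```python
import hashlib
import hmac
import secrets


def generate_api_key() -> tuple:
    """Return (raw_key, hashed_key); only the hash should be stored."""
    raw_key = secrets.token_urlsafe(32)
    hashed_key = hashlib.sha256(raw_key.encode()).hexdigest()
    return raw_key, hashed_key


def key_matches(presented_key: str, stored_hash: str) -> bool:
    """Hash the presented key and compare it to the stored hash."""
    presented_hash = hashlib.sha256(presented_key.encode()).hexdigest()
    # Both values are hex strings, so compare_digest accepts them as str.
    return hmac.compare_digest(presented_hash, stored_hash)
```

The raw key is shown to the client once at creation time and never stored; only `stored_hash` needs to live in the database.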
- 07 Rate Limiting

Rate limiting protects your inference infrastructure from overload. Without it, a single client can monopolize the GPU, causing degraded service for everyone. Token bucket and sliding window algorithms balance fairness against burst handling.

### Token Bucket Implementation

```python
import time
from collections import defaultdict


class RateLimiter:
    def __init__(self, rate: int, per_seconds: int):
        self.rate = rate
        self.per_seconds = per_seconds
        self.buckets = defaultdict(lambda: {"tokens": rate, "last_refill": time.time()})

    def allow_request(self, key: str) -> bool:
        bucket = self.buckets[key]
        now = time.time()

        # Refill tokens in proportion to the time elapsed, capped at the bucket size
        elapsed = now - bucket["last_refill"]
        refill = (elapsed / self.per_seconds) * self.rate
        bucket["tokens"] = min(self.rate, bucket["tokens"] + refill)
        bucket["last_refill"] = now

        if bucket["tokens"] >= 1:
            bucket["tokens"] -= 1
            return True
        return False
```

Each client gets its own bucket that refills at a fixed rate. Requests consume tokens. When the bucket is empty, requests are rejected.

### Integration with FastAPI

```python
from fastapi import Request
from fastapi.responses import JSONResponse

limiter = RateLimiter(rate=60, per_seconds=60)  # 60 requests per minute


@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    # Prefer the API key; fall back to the client address when it is known
    client_host = request.client.host if request.client else "unknown"
    client_id = request.headers.get("X-API-Key") or client_host

    if not limiter.allow_request(client_id):
        return JSONResponse(
            status_code=429,
            content={"error": {"message": "Rate limit exceeded", "type": "rate_limit_error"}}
        )

    response = await call_next(request)
    return response
```

The middleware intercepts every request before it reaches the endpoint. Rate limit exceeded responses include a body matching the OpenAI error format.

### Response Headers

Inform clients about their remaining quota using response headers.

```python
# remaining_tokens and reset_timestamp are derived from the client's bucket state
response.headers["X-RateLimit-Limit"] = "60"
response.headers["X-RateLimit-Remaining"] = str(remaining_tokens)
response.headers["X-RateLimit-Reset"] = str(reset_timestamp)
```

These headers allow clients to implement their own backoff strategies rather than blindly retrying.

### Failure Modes

Rate limit state stored in memory breaks when running multiple workers, because each worker keeps its own buckets. Use Redis for distributed rate limiting across worker processes. Setting limits too low rejects legitimate usage spikes.

20 min
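The header values can be derived from token bucket state. The sketch below is one possible interpretation, under stated assumptions: `rate_limit_headers` is a hypothetical helper, `X-RateLimit-Reset` is taken to mean the Unix time at which the next request would be allowed, and a standard `Retry-After` header is added only when the bucket is empty.

```python
import math


def rate_limit_headers(tokens: float, rate: int, per_seconds: int, now: float) -> dict:
    """Build quota headers from a token bucket's current state.

    tokens is the (possibly fractional) number of tokens left in the bucket,
    and now is the current Unix time in seconds.
    """
    refill_per_second = rate / per_seconds
    # Seconds until at least one whole token is available again.
    wait = 0.0 if tokens >= 1 else (1 - tokens) / refill_per_second

    headers = {
        "X-RateLimit-Limit": str(rate),
        "X-RateLimit-Remaining": str(max(0, math.floor(tokens))),
        "X-RateLimit-Reset": str(math.ceil(now + wait)),
    }
    if tokens < 1:
        # Only rejected requests need an explicit retry hint.
        headers["Retry-After"] = str(math.ceil(wait))
    return headers
```

For example, a bucket refilling at one token per second that holds 0.25 tokens reports zero remaining requests and asks the client to retry after one second.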
- 08 Multi-Model Gateway

A gateway that routes requests to different local models based on the model identifier allows a single API endpoint to serve multiple use cases. The routing logic should be abstracted so that adding new models requires only configuration changes, not code changes.

### Architecture Overview

```
Client Request (model: "llama3.2")
        │
        ▼
  Gateway Layer
        │
        ├──► Model Registry
        │        │
        │        "llama3.2" → /models/llama3.2
        │        "mistral"  → /models/mistral
        │
        ▼
  Inference Engines
        │
        ├──► vLLM Engine
        └──► Ollama Engine
```

The gateway receives all requests, looks up the model in a registry, and forwards the request to the appropriate engine. The client never knows which engine handles their request.

### Model Registry

```python
from pydantic import BaseModel


class ModelConfig(BaseModel):
    name: str
    engine: str  # "vllm" or "ollama"
    endpoint: str
    max_tokens: int
    supports_streaming: bool


MODEL_REGISTRY: dict[str, ModelConfig] = {}


def register_model(config: ModelConfig):
    MODEL_REGISTRY[config.name] = config


register_model(ModelConfig(
    name="llama3.2:latest",
    engine="ollama",
    endpoint="http://localhost:11434",
    max_tokens=4096,
    supports_streaming=True
))
```

The registry maps model identifiers to their serving configuration. Add new models by calling `register_model()` with their configuration.

### Request Routing

```python
from fastapi import HTTPException


# ollama_generate and vllm_generate are engine client functions defined elsewhere.
async def route_request(model: str, request_data: dict):
    if model not in MODEL_REGISTRY:
        raise HTTPException(status_code=404, detail=f"Model '{model}' not found")

    config = MODEL_REGISTRY[model]

    if config.engine == "ollama":
        return await ollama_generate(config.endpoint, request_data)
    elif config.engine == "vllm":
        return await vllm_generate(config.endpoint, request_data)
    # Fail loudly instead of returning None for a misconfigured engine name
    raise HTTPException(status_code=500, detail=f"Unsupported engine '{config.engine}'")
```

The routing function selects the correct inference engine based on model configuration. Adding support for new engines requires adding a new branch and a new client function.

### Failure Modes

Model registry lookups fail unexpectedly if case sensitivity is not handled consistently: a request for `Llama3.2` misses an entry registered as `llama3.2`. A model marked as supporting streaming will return 500 errors if the actual engine does not support it. Unreachable engine endpoints stall requests until they time out unless explicit timeouts and retry logic are configured.

20 min
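The case-sensitivity failure mode can be avoided by normalizing names on both registration and lookup. The following is a small sketch with hypothetical names (`normalize_model_name`, `ModelRegistry`); appending `:latest` when no tag is given is an assumption borrowed from Ollama-style naming and may not suit other engines.

```python
def normalize_model_name(name: str) -> str:
    """Trim, lower-case, and add a default tag so equivalent names match."""
    cleaned = name.strip().lower()
    # Assumption: an untagged name means the ':latest' tag.
    return cleaned if ":" in cleaned else f"{cleaned}:latest"


class ModelRegistry:
    """Registry that applies the same normalization on write and read."""

    def __init__(self) -> None:
        self._models = {}

    def register(self, name: str, config: dict) -> None:
        self._models[normalize_model_name(name)] = config

    def lookup(self, name: str) -> dict:
        key = normalize_model_name(name)
        if key not in self._models:
            # Report the name as the client sent it, not the normalized key.
            raise KeyError(f"Model '{name}' not found")
        return self._models[key]
```

Because both paths go through one function, a client asking for `Llama3.2` and a config file registering `llama3.2:latest` resolve to the same entry.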
- 09 Model Routing

Model routing extends the gateway concept by dynamically selecting the optimal model based on request characteristics, not just the model identifier. This enables cost optimization, latency reduction, and load distribution across heterogeneous hardware.

### Routing Criteria

Model selection can consider multiple factors: request complexity, latency requirements, cost constraints, and hardware availability. A simple request might route to a smaller, faster model while a complex analysis request routes to a larger, more capable model.

```python
# Assumes ModelConfig has been extended with avg_latency and capability_score fields.
class RoutingStrategy:
    def select_model(self, request: CompletionRequest) -> str:
        raise NotImplementedError


class LatencyRouting(RoutingStrategy):
    def select_model(self, request: CompletionRequest) -> str:
        # Route to fastest available model
        return min(
            MODEL_REGISTRY.items(),
            key=lambda item: item[1].avg_latency
        )[0]


class CostRouting(RoutingStrategy):
    def select_model(self, request: CompletionRequest) -> str:
        # Return the first model that meets requirements; this is the cheapest
        # only if the registry is ordered from cheapest to most expensive.
        for model_name, config in MODEL_REGISTRY.items():
            if config.capability_score >= self._estimate_required_score(request):
                return model_name
        return "default-model"
```

### Request Classification

```python
def classify_request(request: CompletionRequest) -> str:
    # Estimate complexity from message count and content length.
    # Character count is used as a rough proxy for token count.
    total_chars = sum(len(m.content) for m in request.messages)

    if len(request.messages) <= 2 and total_chars < 200:
        return "simple"
    elif len(request.messages) <= 5 and total_chars < 1000:
        return "medium"
    return "complex"
```

This simple heuristic estimates request complexity without making an inference call. More sophisticated approaches use a lightweight classifier or maintain historical performance data.

### Dynamic Routing Implementation

```python
class DynamicRouter:
    def __init__(self):
        self.strategies = {
            "simple": "llama3.2-tiny",
            "medium": "llama3.2",
            "complex": "llama3.2-large"
        }

    # forward_to_model (not shown) sends the request to the chosen model's engine.
    async def route(self, request: CompletionRequest) -> dict:
        classification = classify_request(request)
        target_model = self.strategies.get(classification, "llama3.2")

        # Override if client explicitly requests a model
        if request.model not in ("auto", "dynamic"):
            target_model = request.model

        return await self.forward_to_model(target_model, request)
```

The router classifies incoming requests and selects the target model. Explicit model requests override the automatic selection, preserving user intent.

### Fallback Chains

When the primary model is unavailable or times out, route to a fallback model.

```python
from fastapi import HTTPException


async def route_with_fallback(router: DynamicRouter, request: CompletionRequest) -> dict:
    models = ["llama3.2-large", "llama3.2", "llama3.2-tiny"]

    for model in models:
        try:
            return await router.forward_to_model(model, request, timeout=10)
        except TimeoutError:
            continue

    raise HTTPException(status_code=503, detail="All models unavailable")
```

Fallback chains improve availability when specific models are overloaded or offline.

20 min
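A fallback chain can be exercised without any real engine by passing the forwarding function in as a parameter. This is a minimal sketch using only `asyncio`; the `forward` callable and the returned `(model, text)` pair are assumptions made for illustration, and connection errors are treated the same way as timeouts.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple

# forward(model, prompt) is a hypothetical coroutine that calls one engine.
Forward = Callable[[str, str], Awaitable[str]]


async def route_with_fallback(
    prompt: str,
    models: List[str],
    forward: Forward,
    timeout: float = 10.0,
) -> Tuple[str, str]:
    """Try each model in order; return (model, text) from the first success."""
    errors = {}
    for model in models:
        try:
            # wait_for cancels the call if the engine is too slow.
            text = await asyncio.wait_for(forward(model, prompt), timeout=timeout)
            return model, text
        except (asyncio.TimeoutError, ConnectionError) as exc:
            # Remember why this model was skipped, then try the next one.
            errors[model] = type(exc).__name__
    raise RuntimeError(f"All models unavailable: {errors}")
```

Returning the model name alongside the text lets the caller record which model actually answered, which matters when the response's `model` field should reflect the fallback rather than the first choice.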
- 10 Request Logging

Request logging transforms debugging from guesswork into science: every HTTP method, path, status code, and timing metric tells a story about system behavior.

Production APIs without logging are black boxes. When a client reports unexpected behavior, engineers without logs spend hours reproducing issues. Logs make debugging a deterministic process: examine the request sequence, identify anomalies, and trace problems to root causes.

Structured logging in FastAPI requires the `logging` module configured with JSON output for machine parsing. A logger captures request metadata, response status codes, and timing information at minimum. Adding request body sampling (with size limits) enables deeper forensic analysis when failures occur.

The logging middleware pattern wraps each request, capturing start time before the handler executes and computing elapsed time after completion. Context variables propagate request identifiers through nested function calls, ensuring logs can be correlated across distributed components.

```python
import logging
import time
import uuid
from contextvars import ContextVar

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request

request_id_var: ContextVar[str] = ContextVar("request_id")
logger = logging.getLogger("api.access")


class AccessLoggingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
        request_id_var.set(request_id)

        start_time = time.perf_counter()

        logger.info(
            "request_start",
            extra={
                "request_id": request_id,
                "method": request.method,
                "path": request.url.path,
                "client": request.client.host if request.client else None,
            }
        )

        response = await call_next(request)

        elapsed = (time.perf_counter() - start_time) * 1000

        logger.info(
            "request_complete",
            extra={
                "request_id": request_id,
                "status_code": response.status_code,
                "duration_ms": round(elapsed, 2),
            }
        )

        response.headers["X-Request-ID"] = request_id
        return response
```

Failure modes include log explosion from verbose output, storage costs for retention policies, and sensitive data exposure in plaintext logs. Address these by establishing log levels (DEBUG for development, INFO for production), implementing PII filtering, and rotating logs with compression.

Rotate logs daily or when exceeding size thresholds. Ship logs to a centralized system for aggregation and search. Retention policies balance investigative value against storage costs: typically 30 days for hot storage, 12 months for cold archive.

15 min
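The middleware passes its fields through `extra=`, but they only reach the output if the formatter emits them. Below is a minimal sketch of a JSON formatter built on the standard `logging` module; `JsonFormatter` and the output key names (`timestamp`, `level`, `logger`, `event`) are choices made for this example, not a fixed convention.

```python
import json
import logging

# Attributes every LogRecord carries; anything else arrived via `extra=`.
_STANDARD_ATTRS = set(vars(logging.LogRecord("", 0, "", 0, "", (), None)))
_STANDARD_ATTRS.update({"message", "asctime"})


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "event": record.getMessage(),
        }
        # Copy caller-supplied fields such as request_id and duration_ms.
        for key, value in vars(record).items():
            if key not in _STANDARD_ATTRS:
                payload[key] = value
        # default=str keeps unusual values from breaking serialization.
        return json.dumps(payload, default=str)


def configure_json_logging(logger_name: str = "api.access") -> logging.Logger:
    """Attach a stream handler that uses JsonFormatter to the named logger."""
    handler = logging.StreamHandler()
    handler.setFormatter(JsonFormatter())
    logger = logging.getLogger(logger_name)
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger
```

Calling `configure_json_logging()` once at startup would make the `request_start` and `request_complete` entries above appear as single-line JSON objects that a log shipper can parse.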
- 11 Error Responses

Clients cannot handle errors they cannot parse: standardized error schemas turn vague HTTP status codes into actionable debugging information.

HTTP status codes communicate error categories: 400 for client mistakes, 500 for server failures, 429 for rate limits. These codes enable programmatic error handling, but they lack specificity. A client receiving a 400 status code cannot determine whether the request was malformed JSON, missing a required field, or violating a validation constraint without additional context.

RFC 7807 (since superseded by RFC 9457) defines a Problem Details format for HTTP APIs. This standard structure includes a type URI identifying the error category, a title summarizing the issue, detail describing what went wrong, and instance indicating which request triggered the failure. Adopting this format ensures consistent error parsing across all API consumers.

```python
import logging

from fastapi import FastAPI, Request
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from pydantic import BaseModel

logger = logging.getLogger("api.errors")


class ProblemDetail(BaseModel):
    type: str
    title: str
    status: int
    detail: str
    instance: str


class ErrorHandler:
    @staticmethod
    def handle_validation_error(request: Request, exc: RequestValidationError) -> JSONResponse:
        errors = []
        for error in exc.errors():
            errors.append({
                "field": ".".join(str(loc) for loc in error["loc"]),
                "message": error["msg"],
                "type": error["type"],
            })

        return JSONResponse(
            status_code=422,
            content={
                "type": "https://api.example.com/errors/validation",
                "title": "Unprocessable Entity",
                "status": 422,
                "detail": "Request validation failed",
                "instance": str(request.url),
                "errors": errors,
            }
        )

    @staticmethod
    def handle_generic_error(request: Request, exc: Exception) -> JSONResponse:
        logger.exception("Unhandled exception", exc_info=exc)
        return JSONResponse(
            status_code=500,
            content={
                "type": "https://api.example.com/errors/internal",
                "title": "Internal Server Error",
                "status": 500,
                "detail": "An unexpected error occurred",
                "instance": str(request.url),
            }
        )


app = FastAPI()
# FastAPI raises RequestValidationError (not pydantic's ValidationError)
# when a request body fails validation.
app.add_exception_handler(RequestValidationError, ErrorHandler.handle_validation_error)
app.add_exception_handler(Exception, ErrorHandler.handle_generic_error)
```

Validation errors return 422 with field-level detail. Rate limit errors return 429 with `Retry-After` headers. Authentication failures return 401 with `WWW-Authenticate` challenge headers. Each error type follows its own conventions while maintaining the Problem Details structure.

Never expose internal error details (stack traces, database errors) in API responses. Log them server-side for debugging while returning generic messages to clients. Internal details help attackers identify vulnerabilities.

15 min
- 12 Health Checks

Health endpoints let orchestration systems verify readiness: separate liveness probes from readiness checks to enable graceful degradation.

Kubernetes uses health checks to manage pod lifecycle. Liveness probes determine whether a container should be restarted. Readiness probes determine whether a container can receive traffic. These probes must return quickly and accurately reflect the service's ability to function.

A naive health endpoint simply returns 200. This passes when the server starts but provides no information about downstream dependencies. A realistic health check verifies database connectivity, cache availability, and external API reachability before reporting healthy status.

```python
import time

from fastapi import FastAPI
from fastapi.responses import JSONResponse
from pydantic import BaseModel


class HealthStatus(BaseModel):
    status: str
    checks: dict


app = FastAPI()

# app.state.db_pool (an asyncpg pool) and app.state.redis (an async Redis
# client) are assumed to be created at application startup.


async def check_database() -> dict:
    try:
        pool = app.state.db_pool
        start = time.perf_counter()
        async with pool.acquire() as conn:
            await conn.fetchval("SELECT 1")
        latency = (time.perf_counter() - start) * 1000
        return {"database": {"status": "healthy", "latency_ms": round(latency, 1)}}
    except Exception as exc:
        return {"database": {"status": "unhealthy", "error": str(exc)}}


async def check_cache() -> dict:
    try:
        redis = app.state.redis
        start = time.perf_counter()
        await redis.ping()
        latency = (time.perf_counter() - start) * 1000
        return {"cache": {"status": "healthy", "latency_ms": round(latency, 1)}}
    except Exception as exc:
        return {"cache": {"status": "unhealthy", "error": str(exc)}}


@app.get("/health/live")
async def liveness():
    return HealthStatus(status="alive", checks={})


@app.get("/health/ready")
async def readiness():
    checks = {}
    checks.update(await check_database())
    checks.update(await check_cache())

    unhealthy = [k for k, v in checks.items() if v.get("status") == "unhealthy"]

    if unhealthy:
        return JSONResponse(
            status_code=503,
            content=HealthStatus(
                status="unhealthy",
                checks=checks
            ).model_dump()
        )

    return HealthStatus(status="healthy", checks=checks)
```

Liveness endpoints return immediately with no dependency checks. A slow liveness probe causes Kubernetes to restart containers unnecessarily. Readiness endpoints perform thorough checks and return 503 when dependencies fail, signaling that traffic should be routed elsewhere.

Monitor health endpoint latency in production. A health check taking more than 100ms suggests resource contention or connection pool exhaustion. Alert on prolonged slowness before it impacts actual request handling.

15 min
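Because probes must return quickly, it can help to run dependency checks concurrently and give each one a time budget. The sketch below uses only `asyncio`; `run_check` and `run_readiness` are hypothetical helpers, the 0.5 second default timeout is an arbitrary example value, and each check is assumed to be a coroutine function that raises on failure.

```python
import asyncio
import time
from typing import Awaitable, Callable, Dict, Tuple

# A check is any coroutine function that raises when the dependency is down.
Check = Callable[[], Awaitable[None]]


async def run_check(name: str, check: Check, timeout: float) -> Tuple[str, dict]:
    """Run one check with a time budget and report status plus latency."""
    start = time.perf_counter()
    try:
        await asyncio.wait_for(check(), timeout=timeout)
    except asyncio.TimeoutError:
        return name, {"status": "unhealthy", "error": f"timed out after {timeout}s"}
    except Exception as exc:
        return name, {"status": "unhealthy", "error": str(exc)}
    latency_ms = (time.perf_counter() - start) * 1000
    return name, {"status": "healthy", "latency_ms": round(latency_ms, 1)}


async def run_readiness(checks: Dict[str, Check], timeout: float = 0.5) -> Tuple[int, dict]:
    """Run all checks concurrently; return (HTTP status code, response body)."""
    results = await asyncio.gather(
        *(run_check(name, check, timeout) for name, check in checks.items())
    )
    report = dict(results)
    healthy = all(item["status"] == "healthy" for item in report.values())
    body = {"status": "healthy" if healthy else "unhealthy", "checks": report}
    return (200 if healthy else 503), body
```

With concurrent checks the probe's worst-case duration is roughly one timeout rather than the sum of all of them, which keeps a single slow dependency from making the whole endpoint look hung.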
- 13 OpenAPI Documentation

OpenAPI specifications transform APIs from guesswork into self-documenting interfaces, and generated documentation reduces integration friction for every client.

FastAPI automatically generates an OpenAPI 3.1 specification from route decorators, Pydantic models, and type hints. This specification drives automatic documentation UIs, client code generation, and API discovery tools. Keeping the specification accurate means ensuring that types and examples reflect actual behavior.

Route documentation begins with docstrings describing endpoint purposes, parameter meanings, and response schemas. Pydantic models include field descriptions that appear in the specification. Example values let clients understand expected formats without reading implementation code.

```python
from typing import Optional

from fastapi import FastAPI
from pydantic import BaseModel, Field

app = FastAPI(title="Inference API", version="1.0.0")


class ChatCompletionRequest(BaseModel):
    # Pydantic v2 takes a list under `examples`; the singular `example` keyword is deprecated.
    model: str = Field(
        ...,
        description="Model identifier for completion generation",
        examples=["llama3.2:latest"],
    )
    messages: list[dict] = Field(
        ...,
        description="Conversation messages with role and content",
        examples=[[
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": "What is API design?"},
        ]],
    )
    temperature: Optional[float] = Field(
        0.7, description="Sampling temperature between 0 and 2", ge=0, le=2
    )
    max_tokens: Optional[int] = Field(
        512, description="Maximum tokens to generate", ge=1, le=4096
    )


class ChatCompletionResponse(BaseModel):
    model: str
    content: str
    tokens_used: int
    finish_reason: str


@app.post(
    "/v1/chat/completions",
    response_model=ChatCompletionResponse,
    summary="Generate chat completions",
    description="Creates model-generated content from conversation context",
)
async def create_chat_completion(request: ChatCompletionRequest) -> ChatCompletionResponse:
    # Placeholder body: replace with a call to the inference backend.
    return ChatCompletionResponse(
        model=request.model, content="", tokens_used=0, finish_reason="stop"
    )
```

Provide example values so the interactive documentation shows realistic requests. FastAPI serves the specification at `/openapi.json` by default; the `openapi_url` parameter changes that path, and the served document is what external tooling consumes. Authentication schemes must be defined in the OpenAPI spec to appear in the documentation UI.

Test documentation by generating a client with `openapi-generator`. If generated code fails to compile or produces nonsensical types, the specification needs refinement. Client-driven validation catches specification gaps before they frustrate API consumers.

15 min
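To make the point about authentication schemes concrete, the sketch below shows what a bearer-token scheme looks like inside an OpenAPI document, using plain dictionaries. The helper `add_bearer_auth` and the scheme name `BearerAuth` are assumptions for illustration. In a FastAPI app, a security dependency such as `fastapi.security.HTTPBearer` would normally add an equivalent entry for you.

```python
import copy


def add_bearer_auth(spec: dict, scheme_name: str = "BearerAuth") -> dict:
    """Return a copy of an OpenAPI document with an HTTP bearer scheme declared.

    Hypothetical helper: it only edits the dictionary, it does not enforce auth.
    """
    updated = copy.deepcopy(spec)
    schemes = updated.setdefault("components", {}).setdefault("securitySchemes", {})
    schemes[scheme_name] = {"type": "http", "scheme": "bearer"}

    # A top-level "security" entry applies the scheme to every operation.
    requirement = {scheme_name: []}
    security = updated.setdefault("security", [])
    if requirement not in security:
        security.append(requirement)
    return updated


spec = {
    "openapi": "3.1.0",
    "info": {"title": "Inference API", "version": "1.0.0"},
    "paths": {},
}
secured = add_bearer_auth(spec)
```

Once a scheme is declared this way, documentation UIs can offer an authorization prompt and generated clients know which header to send.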
- 14 Client Libraries

Client libraries abstract HTTP complexity into idiomatic interfaces. Reliable error handling and automatic retry logic reduce integration failures for every consumer.

Raw HTTP client code repeats authentication headers, base URL configuration, and error handling across endpoints. Client libraries encapsulate these patterns, providing clean methods that map to API operations. Consumers focus on business logic rather than connection management.

Python clients typically use `httpx` for async support. Type hints enable autocomplete for method signatures and parameters. Response parsing validates server payloads against expected schemas, catching mismatches before they propagate through business logic.

```python
import asyncio

from httpx import AsyncClient, TimeoutException
from pydantic import BaseModel


class InferenceError(Exception):
    def __init__(self, status_code: int, message: str):
        self.status_code = status_code
        self.message = message
        super().__init__(f"{status_code}: {message}")


class CompletionRequest(BaseModel):
    model: str
    messages: list[dict]
    temperature: float = 0.7


class CompletionResponse(BaseModel):
    id: str
    model: str
    content: str
    usage: dict


class AsyncInferenceClient:
    def __init__(self, base_url: str, api_key: str, timeout: float = 30.0):
        self.base_url = base_url.rstrip("/")
        self.api_key = api_key
        # One shared client reuses connections and sends the auth header on every call.
        self._client = AsyncClient(
            timeout=timeout,
            headers={"Authorization": f"Bearer {api_key}"},
        )

    async def completions(self, request: CompletionRequest) -> CompletionResponse:
        for attempt in range(3):
            try:
                response = await self._client.post(
                    f"{self.base_url}/v1/chat/completions",
                    json=request.model_dump(),
                )
                if response.status_code == 429:
                    # Rate limited: back off exponentially (1s, 2s, 4s) and retry.
                    await asyncio.sleep(2 ** attempt)
                    continue
                if response.status_code >= 400:
                    raise InferenceError(response.status_code, response.text)
                # Validate the payload against the expected schema.
                return CompletionResponse(**response.json())
            except TimeoutException:
                if attempt == 2:
                    raise InferenceError(408, "Request timeout")
                await asyncio.sleep(1)
        raise InferenceError(503, "Service unavailable after retries")

    async def close(self):
        await self._client.aclose()


# Usage
async def main():
    client = AsyncInferenceClient("http://localhost:8000", "test-key")
    try:
        result = await client.completions(
            CompletionRequest(
                model="llama3.2",
                messages=[{"role": "user", "content": "Hello"}],
            )
        )
        print(result.content)
    finally:
        await client.close()


if __name__ == "__main__":
    asyncio.run(main())
```

Retry logic handles transient failures. The client above backs off exponentially on 429 responses; adding random jitter to each delay prevents thundering herd problems when the API recovers. Timeout configuration prevents hanging indefinitely on unresponsive services.

Include connection pooling configuration for high-throughput applications. Set keepalive timeouts to prevent stale connections. Add request ID headers for tracing across service boundaries.

15 min
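The jitter mentioned in this chapter can be added with a few lines. The sketch below uses the "full jitter" variant, where each delay is drawn uniformly between zero and an exponentially growing ceiling. The function name `backoff_delay` and its parameters are assumptions for illustration, not part of `httpx` or any client library.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    base: float = 1.0,
    cap: float = 30.0,
    rng: Optional[random.Random] = None,
) -> float:
    """Return a randomized delay in seconds for a zero-based retry attempt.

    The ceiling doubles with each attempt and is clamped to `cap`.
    Passing a seeded `random.Random` makes the delays reproducible in tests.
    """
    source = rng if rng is not None else random
    ceiling = min(cap, base * (2 ** attempt))
    return source.uniform(0.0, ceiling)
```

In a retry loop this would replace a fixed `2 ** attempt` sleep, for example `await asyncio.sleep(backoff_delay(attempt))`. Because each client draws a different delay, retries spread out instead of arriving together.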
- 15 Load Testing

Load testing reveals bottlenecks before production traffic exposes them. Synthetic workloads simulate realistic traffic patterns and measure system behavior under stress.

Load tests serve multiple purposes: identifying performance regressions, establishing baseline metrics, validating capacity planning, and uncovering race conditions. Without testing, production incidents reveal performance characteristics the hard way.

`locust` provides Python-based load testing with distributed execution support. Test scripts define user behavior, wait times, and success criteria. In distributed mode, Locust spreads simulated users across worker processes.

```python
from locust import HttpUser, between, events, task


@events.init_command_line_parser.add_listener
def add_custom_arguments(parser):
    # Adds a --model flag so one script can target different models.
    parser.add_argument("--model", type=str, default="llama3.2:latest")


class InferenceUser(HttpUser):
    wait_time = between(1, 3)  # seconds each simulated user pauses between tasks

    def on_start(self):
        self.headers = {
            "Authorization": "Bearer test-key",
            "Content-Type": "application/json",
        }

    @task
    def completions(self):
        payload = {
            # Custom arguments are exposed on environment.parsed_options.
            "model": self.environment.parsed_options.model,
            "messages": [{"role": "user", "content": "What is load testing?"}],
            "temperature": 0.7,
        }
        with self.client.post(
            "/v1/chat/completions",
            json=payload,
            headers=self.headers,
            catch_response=True,
        ) as response:
            if response.status_code == 200:
                data = response.json()
                if "content" in data:
                    response.success()
                else:
                    response.failure("Missing content field")
            elif response.status_code == 503:
                # Load shedding is expected under saturation, so it is not counted as a failure.
                response.success()
            else:
                response.failure(f"Unexpected status: {response.status_code}")
```

Run tests with increasing user counts to identify the saturation point. Monitor response time percentiles (p50, p95, p99) rather than averages, because averages hide the slow tail. A p99 latency exceeding several seconds suggests queue buildup or resource contention.

Target SLOs determine passing criteria. If the API must respond within 500ms for 95% of requests, the load test passes only when the measured p95 stays under that threshold. Failed requests and timeout rates indicate capacity limits.

15 min
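The percentile check described in this chapter can be reproduced on raw latency samples. The sketch below uses the nearest-rank method with only the standard library. The names `percentile` and `summarize` are assumptions for illustration, and Locust's own report may compute percentiles slightly differently.

```python
import math


def percentile(samples: list, pct: float) -> float:
    """Nearest-rank percentile: the smallest sample with at least pct% of values at or below it."""
    if not samples:
        raise ValueError("samples must not be empty")
    if not 0 < pct <= 100:
        raise ValueError("pct must be in (0, 100]")
    ordered = sorted(samples)
    rank = math.ceil(pct * len(ordered) / 100)
    return ordered[rank - 1]


def summarize(latencies_ms: list, slo_ms: float = 500.0) -> dict:
    """Report p50, p95 and p99, and whether p95 meets the SLO (500ms, as in the example above)."""
    p95 = percentile(latencies_ms, 95)
    return {
        "p50": percentile(latencies_ms, 50),
        "p95": p95,
        "p99": percentile(latencies_ms, 99),
        "meets_slo": p95 <= slo_ms,
    }
```

Feeding the per-request latencies from a test run into `summarize` gives a single pass/fail signal that can gate a deployment.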
- 16 Caching Layer

Caching eliminates redundant inference requests. Cache keys based on request hashes let repeated identical inputs be answered from memory instead of the GPU.

Inference requests are computationally expensive. Identical inputs produce identical outputs only when sampling is deterministic (for example, temperature 0 or a fixed seed); at higher temperatures a cached response is one valid sample, not the only one. Where that is acceptable, caching saves GPU cycles on requests that have been answered before.

Redis is a common caching backend for API deployments. TTL policies balance cache freshness against storage costs. Cache invalidation strategies determine when stored responses become stale.

```python
import hashlib
import json
from typing import Optional

import redis.asyncio as redis
from fastapi import Depends, FastAPI
from pydantic import BaseModel

app = FastAPI()


class CachedRequest(BaseModel):
    model: str
    messages: list[dict]
    temperature: float


class InferenceCache:
    def __init__(self, redis_url: str, ttl_seconds: int = 3600):
        self.redis_url = redis_url
        self.ttl = ttl_seconds

    async def __aenter__(self):
        # from_url returns a client that connects lazily on first command.
        self.client = redis.from_url(self.redis_url)
        return self

    async def __aexit__(self, *args):
        await self.client.close()

    def _make_key(self, request: CachedRequest) -> str:
        # sort_keys makes the JSON canonical so equal requests hash equally.
        canonical = json.dumps(request.model_dump(), sort_keys=True)
        hash_value = hashlib.sha256(canonical.encode()).hexdigest()[:16]
        return f"inference:v1:{request.model}:{hash_value}"

    async def get(self, request: CachedRequest) -> Optional[dict]:
        cached = await self.client.get(self._make_key(request))
        return json.loads(cached) if cached else None

    async def set(self, request: CachedRequest, response: dict) -> None:
        await self.client.setex(self._make_key(request), self.ttl, json.dumps(response))


async def get_cache():
    # One connection per request keeps the sketch short; share a pool in production.
    async with InferenceCache("redis://localhost:6379") as cache:
        yield cache


async def generate_completion(request: CachedRequest) -> dict:
    # Placeholder for the real inference call.
    raise NotImplementedError


# Usage in endpoint
@app.post("/v1/chat/completions")
async def completions(request: CachedRequest, cache: InferenceCache = Depends(get_cache)):
    cached_response = await cache.get(request)
    if cached_response is not None:
        cached_response["cached"] = True
        return cached_response
    # Cache miss: generate, store, and return the response.
    response = await generate_completion(request)
    await cache.set(request, response)
    return response
```

Hit rates above roughly 80% usually indicate an effective caching strategy, though the achievable rate depends on how repetitive the workload is. Monitor cache effectiveness with metrics tracking hit ratio, memory usage, and eviction rates. High eviction rates suggest insufficient cache capacity or TTL values that are too short.

Consider semantic caching for near-identical requests. At a minimum, requests differing only in whitespace or formatting should hit the cache. Normalize inputs before computing cache keys to maximize hit rates.

15 min
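The normalization step suggested in this chapter might look like the sketch below, which collapses whitespace in message content before hashing. The function names and the key prefix are assumptions for illustration. Collapsing whitespace is not safe for every workload (code or poetry prompts, for instance, can depend on it), so treat this as one possible policy rather than a default.

```python
import hashlib
import json
import re

_WHITESPACE = re.compile(r"\s+")


def normalize_messages(messages: list) -> list:
    """Lowercase roles and collapse runs of whitespace in each message's content."""
    return [
        {
            "role": message["role"].strip().lower(),
            "content": _WHITESPACE.sub(" ", message["content"]).strip(),
        }
        for message in messages
    ]


def cache_key(model: str, messages: list, temperature: float) -> str:
    """Build a deterministic key from the normalized request."""
    canonical = json.dumps(
        {
            "model": model.strip(),
            "messages": normalize_messages(messages),
            "temperature": round(float(temperature), 3),
        },
        sort_keys=True,
        separators=(",", ":"),
    )
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return f"inference:v1:{digest}"
```

Two requests that differ only in spacing now map to the same key, while a change in model, wording, or temperature still produces a different one.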
- 17 Production Hardening

Production APIs require defense in depth. Rate limiting, authentication, and circuit breakers protect against both accidental overload and intentional abuse.

Production environments face traffic patterns that development never simulated. Burst traffic, concurrent requests, and adversarial users stress systems in ways unit tests cannot. Hardening applies defensive measures that preserve functionality while limiting damage from unexpected conditions.

Rate limiting protects against both abuse and accidental overload. Token bucket algorithms allow burst capacity while enforcing sustained rates. Different limits for different clients enable fair resource allocation.

```python
from fastapi import FastAPI, Request
from slowapi import Limiter
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address
from starlette.responses import JSONResponse

app = FastAPI()
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter  # slowapi looks the limiter up on app.state


def api_key_from_header(request: Request) -> str:
    # Key the limit on the bearer token, falling back to the client address.
    auth = request.headers.get("Authorization", "")
    return auth[7:] if auth.startswith("Bearer ") else get_remote_address(request)


@app.exception_handler(RateLimitExceeded)
async def rate_limit_handler(request: Request, exc: RateLimitExceeded):
    return JSONResponse(
        status_code=429,
        content={
            "type": "https://api.example.com/errors/rate-limit",
            "title": "Too Many Requests",
            "status": 429,
            "detail": str(exc.detail),
            "retry_after": 60,  # fixed, conservative hint in seconds
        },
        headers={"Retry-After": "60"},
    )


# The route decorator goes above the limit decorator.
@app.post("/v1/chat/completions")
@limiter.limit("60/minute")
async def completions(request: Request):
    # Endpoint implementation
    pass


# Per-client rate limiting
@app.post("/v1/embeddings")
@limiter.limit("120/minute", key_func=api_key_from_header)
async def embeddings(request: Request):
    # Endpoint implementation
    pass
```

Authentication prevents unauthorized access. Bearer tokens in Authorization headers identify clients before requests are processed. API key rotation lets you respond to security incidents without downtime. Token validation should happen before any business logic executes.

Circuit breakers prevent cascading failures. When a dependency fails repeatedly, the circuit opens and requests fail immediately rather than waiting for timeouts. This prevents resource exhaustion and enables partial functionality during outages.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


@dataclass
class CircuitState:
    failures: int = 0
    last_failure: Optional[datetime] = None
    is_open: bool = False
    opened_at: Optional[datetime] = None


class CircuitBreaker:
    def __init__(self, threshold: int = 5, timeout: int = 60):
        self.threshold = threshold  # consecutive failures before opening
        self.timeout = timeout      # seconds to wait before a trial call
        self.state = CircuitState()

    async def call(self, func, *args, **kwargs):
        if self.state.is_open:
            if datetime.now() - self.state.opened_at > timedelta(seconds=self.timeout):
                # Cool-down elapsed: let one trial call through.
                self.state.is_open = False
            else:
                raise CircuitOpenError("Circuit open - dependency unavailable")
        try:
            result = await func(*args, **kwargs)
        except Exception:
            self.state.failures += 1
            self.state.last_failure = datetime.now()
            if self.state.failures >= self.threshold:
                self.state.is_open = True
                self.state.opened_at = datetime.now()
            raise
        # Success resets the failure count.
        self.state.failures = 0
        return result
```

15 min
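The token bucket idea from this chapter's rate limiting discussion can be sketched in a few lines: a bucket holds up to `capacity` tokens, refills at `rate` tokens per second, and each request spends one token. The class below is a minimal single-process illustration with assumed names. It is not how any particular rate limiting library is implemented, and a multi-worker deployment would need shared state such as Redis.

```python
import time
from typing import Callable


class TokenBucket:
    def __init__(
        self,
        rate: float,
        capacity: float,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.rate = rate          # tokens added per second (sustained rate)
        self.capacity = capacity  # maximum stored tokens (burst size)
        self.clock = clock        # injectable clock, useful for tests
        self.tokens = capacity    # start full so an initial burst is allowed
        self.updated = clock()

    def allow(self, cost: float = 1.0) -> bool:
        """Refill based on elapsed time, then spend `cost` tokens if available."""
        now = self.clock()
        elapsed = now - self.updated
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.updated = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False
```

A bucket created with `TokenBucket(rate=1, capacity=60)` admits a burst of 60 requests and then settles to about one request per second. Keeping one bucket per API key gives each client its own allowance.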
- 18 API Gateway Project

An API gateway consolidates authentication, rate limiting, and routing. Building one demonstrates integration of every concept from this course.

This capstone project combines logging, error handling, health checks, documentation, client libraries, load testing, caching, and production hardening into a single deployable system.

The gateway accepts requests, validates authentication, applies rate limits, routes to backend services, caches responses, and returns formatted results. All interactions are logged for debugging. The custom error handlers follow the RFC 7807 problem-details format.

```python
import hashlib
import json
import logging
import time
import uuid
from contextvars import ContextVar
from typing import Optional

import httpx
import redis.asyncio as redis
from fastapi import Depends, FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
from slowapi import Limiter
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

request_id_var: ContextVar[str] = ContextVar("request_id")
logger = logging.getLogger("gateway")

app = FastAPI(title="AI Gateway v1.0")
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter


class GatewayConfig:
    redis_url: str = "redis://localhost:6379"
    backend_url: str = "http://localhost:11434"
    cache_ttl: int = 3600
    rate_limit: str = "100/minute"


config = GatewayConfig()


@app.on_event("startup")
async def startup():
    # from_url returns a client that connects lazily on first command.
    app.state.redis = redis.from_url(config.redis_url)


@app.on_event("shutdown")
async def shutdown():
    await app.state.redis.close()


@app.middleware("http")
async def logging_middleware(request: Request, call_next):
    # Reuse the caller's request ID when present so traces span services.
    request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
    request_id_var.set(request_id)
    start = time.perf_counter()
    response = await call_next(request)
    duration = (time.perf_counter() - start) * 1000
    logger.info(
        "request_complete",
        extra={
            "request_id": request_id,
            "method": request.method,
            "path": request.url.path,
            "status": response.status_code,
            "duration_ms": round(duration, 1),
        },
    )
    response.headers["X-Request-ID"] = request_id
    return response


@app.exception_handler(RateLimitExceeded)
async def rate_limit_exceeded(request: Request, exc: RateLimitExceeded):
    return JSONResponse(
        status_code=429,
        content={
            "type": "https://example.com/errors/rate-limit",
            "title": "Too Many Requests",
            "status": 429,
            "detail": str(exc.detail),
            "instance": str(request.url),
        },
    )


@app.exception_handler(Exception)
async def generic_exception(request: Request, exc: Exception):
    logger.exception("Unhandled exception")
    return JSONResponse(
        status_code=500,
        content={
            "type": "https://example.com/errors/internal",
            "title": "Internal Server Error",
            "status": 500,
            "detail": "An unexpected error occurred",
            "instance": str(request.url),
        },
    )


class CompletionRequest(BaseModel):
    model: str = Field(..., examples=["llama3.2:latest"])
    messages: list[dict] = Field(..., min_length=1)
    temperature: Optional[float] = Field(0.7, ge=0, le=2)


def verify_api_key(request: Request) -> str:
    # Extracts the bearer token; a real gateway would also check it against a key store.
    auth_header = request.headers.get("Authorization", "")
    if not auth_header.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Missing or invalid authorization")
    return auth_header[7:]


# The route decorator goes above the limit decorator.
@app.post("/v1/chat/completions", summary="Generate chat completions")
@limiter.limit(config.rate_limit)
async def chat_completions(
    request: Request,
    body: CompletionRequest,
    api_key: str = Depends(verify_api_key),
):
    canonical = json.dumps(body.model_dump(), sort_keys=True)
    cache_key = f"cache:completion:{hashlib.sha256(canonical.encode()).hexdigest()}"
    cached = await app.state.redis.get(cache_key)
    if cached:
        response = json.loads(cached)
        response["cached"] = True
        return response

    async with httpx.AsyncClient(timeout=30.0) as client:
        backend_response = await client.post(
            f"{config.backend_url}/api/chat",
            json={
                "model": body.model,
                "messages": body.messages,
                "stream": False,  # request a single JSON body instead of a stream
                "options": {"temperature": body.temperature},
            },
        )
        backend_response.raise_for_status()
        result = backend_response.json()

    response = {
        "model": body.model,
        "content": result.get("message", {}).get("content", ""),
        "tokens_used": result.get("eval_count", 0),
        "finish_reason": "stop",
    }
    await app.state.redis.setex(cache_key, config.cache_ttl, json.dumps(response))
    return response


@app.get("/health/live")
async def liveness():
    return {"status": "alive"}


@app.get("/health/ready")
async def readiness():
    try:
        await app.state.redis.ping()
        return {"status": "healthy"}
    except Exception:
        return JSONResponse(status_code=503, content={"status": "unhealthy"})
```

The gateway combines all hardening techniques. An authentication dependency checks every completion request before business logic runs. A rate-limit decorator enforces fair usage. Logging middleware captures all interactions for debugging. Error handlers format rate-limit and unexpected failures consistently.

Test the gateway end-to-end: authenticate successfully, hit rate limits, observe cached responses, verify health checks, and generate OpenAPI documentation. Load test with `locust` to validate performance under concurrent traffic.

15 min