RUNLOCALAIv38
->Will it run?Best GPUCompareTroubleshootStartLearnPulseModelsHardwareToolsBench
Run check
RUNLOCALAI

Independently operated catalog for local-AI hardware and software. Hand-written verdicts. Source-cited claims. Reproducible commands when we have them.

OP·Fredoline Eruo
DIR
  • Models
  • Hardware
  • Tools
  • Benchmarks
TOOLS
  • Will it run?
  • Compare hardware
  • Cost vs cloud
  • Choose my GPU
  • Prompting kits
  • Quick answers
REF
  • All buyer guides
  • Learn local AI
  • Methodology
  • Glossary
  • Errors KB
  • Trust
EDITOR
  • About
  • Author
  • How we make money
  • Editorial policy
  • Contact
LEGAL
  • Privacy
  • Terms
  • Sitemap
MAIL · MONTHLY DIGEST
Get monthly local AI changes
Monthly recap. No spam.
DISCLOSURE

Some links on this site are affiliate links (Amazon Associates and other first-class retailers). When you buy through them, we earn a small commission at no extra cost to you. Affiliate links do not influence our verdicts — there are cards we rate highly that we don't have affiliate relationships with, and cards that sell well that we refuse to recommend. Read more →

© 2026 runlocalai.coIndependently operated
RUNLOCALAI · v38
  1. >
  2. Home
  3. /Learn
  4. /Courses
  5. /MCP Server Implementation
  6. /Ch. 10
MCP Server Implementation

10. API Integration Server

Chapter 10 of 22 · 20 min
KEY INSIGHT

API MCP servers translate between the MCP protocol and external HTTP APIs, handling authentication, rate limiting, and response transformation. Integrating external APIs into MCP requires converting between protocol formats and API-specific behaviors. This chapter covers the patterns needed for reliable API integration. **HTTP Client Setup:** ```python import httpx from httpx import AsyncClient class APIServer: def __init__(self, base_url: str, api_key: str): self.base_url = base_url.rstrip("/") self._client = AsyncClient( base_url=self.base_url, headers={"Authorization": f"Bearer {api_key}"}, timeout=30.0 ) async def close(self): await self._client.aclose() ``` **Pagination Handling:** APIs return paginated results. MCP tools should handle this transparently. ```python async def fetch_all_pages(self, endpoint: str, max_items: int = 1000) -> list[dict]: results = [] cursor = None while len(results) < max_items: params = {"limit": 100} if cursor: params["cursor"] = cursor response = await self._client.get(endpoint, params=params) response.raise_for_status() data = response.json() items = data.get("items", data.get("results", [])) results.extend(items) cursor = data.get("next_cursor") if not cursor: break return results[:max_items] ``` **Error Translation:** ```python async def safe_request(self, method: str, path: str, **kwargs): try: response = await self._client.request(method, path, **kwargs) if response.status_code == 200: return response.json() elif response.status_code == 401: raise ToolError("API authentication failed") elif response.status_code == 403: raise ToolError("API access forbidden") elif response.status_code == 429: raise ToolError("API rate limit exceeded, retry later") elif response.status_code >= 500: raise ToolError(f"API server error: {response.status_code}") else: raise ToolError(f"API error: {response.status_code}") except httpx.TimeoutException: raise ToolError("API request timed out") except httpx.ConnectError: raise ToolError("API connection failed") ``` **Tool Implementation:** ```python @server.call_tool() async def call_tool(name: str, args: dict) -> str: match name: case "list_repos": repos = await self.fetch_all_pages("/api/repos") return json.dumps(repos, indent=2) case "get_issues": issues = await self.safe_request( "GET", f"/api/repos/{args['repo']}/issues", params={"state": args.get("state", "open")} ) return json.dumps(issues, indent=2) case "create_issue": result = await self.safe_request( "POST", f"/api/repos/{args['repo']}/issues", json={ "title": args["title"], "body": args.get("body", ""), "labels": args.get("labels", []) } ) return f"Created issue #{result['number']}" ``` **Rate Limiting:** ```python import asyncio from datetime import datetime, timedelta class RateLimiter: def __init__(self, max_requests: int, per_seconds: int): self.max_requests = max_requests self.per_seconds = per_seconds self._requests: list[datetime] = [] async def acquire(self): now = datetime.now() window_start = now - timedelta(seconds=self.per_seconds) self._requests = [r for r in self._requests if r > window_start] if len(self._requests) >= self.max_requests: wait_time = (self._requests[0] - window_start).total_seconds() await asyncio.sleep(wait_time) self._requests.append(datetime.now()) ``` **Response Caching:** ```python from functools import lru_cache import time @lru_cache(maxsize=100) async def cached_request(path: str, max_age: int = 60): cache_key = f"{path}:{int(time.time() / max_age)}" # Implementation with timestamp in cache key for TTL ```

EXERCISE

Build an API integration server for a weather service. Implement tools for current conditions, forecast retrieval, and historical data. Handle missing data gracefully and transform API responses into consistent formats.

← Chapter 9
Database Query Server
Chapter 11 →
SSE Transport