HOW-TO · RAG

How to Build Error Handling in Agent Pipelines

intermediate20 minBy Fredoline Eruo
Target environment
Ubuntu 24.04 · Ollama 0.4.x
PREREQUISITES

Agent pipeline, error monitoring tools, Python 3.10+

What this does

Error handling in agent pipelines catches failures at each stage (LLM call, tool execution, memory access) and applies recovery strategies — retries, fallbacks, graceful degradation — instead of crashing.

Steps

  • Wrap each pipeline stage with typed exceptions. Differentiate between retriable and fatal errors.
class ToolExecutionError(Exception):
    """Tool call failed but can be retried."""
    pass

class LLMNotAvailableError(Exception):
    """LLM is unreachable, use fallback."""
    pass

class ContextOverflowError(Exception):
    """Context window exceeded, need to truncate."""
    pass
  • Implement a pipeline stage executor with retries.
import time
from functools import wraps

def retry(max_attempts=3, base_delay=1.0, exceptions=(ToolExecutionError,)):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exc = None
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    last_exc = e
                    if attempt < max_attempts - 1:
                        time.sleep(base_delay * (2 ** attempt))
            raise last_exc
        return wrapper
    return decorator

@retry(max_attempts=3)
def call_llm(prompt: str):
    # May raise ToolExecutionError if API fails
    return llm.invoke(prompt)
  • Build fallback chains. Try primary path, then degraded path.
def execute_with_fallback(query: str) -> str:
    fallbacks = [
        try_primary_search,    # Full web search
        try_local_search,      # Local vector search only
        try_llm_knowledge,     # No search, rely on LLM knowledge
        lambda q: "Unable to process your request at this time."
    ]

    for fallback in fallbacks:
        try:
            return fallback(query)
        except (ToolExecutionError, TimeoutError):
            continue
  • Handle partial failures gracefully. Some tools succeed, some fail.
def execute_tool_batch(tool_calls: list, registry: ToolRegistry) -> list[dict]:
    results = []
    for tc in tool_calls:
        try:
            result = registry.call(tc.function.name, **json.loads(tc.function.arguments))
            results.append({"role": "tool", "tool_call_id": tc.id, "content": str(result)})
        except Exception as e:
            # Return error as tool result so LLM can decide what to do
            results.append({
                "role": "tool",
                "tool_call_id": tc.id,
                "content": json.dumps({"error": str(e)})
            })
    return results
  • Add a circuit breaker for external services. Stop calling a failing service repeatedly.
import datetime

class CircuitBreaker:
    def __init__(self, failure_threshold=5, reset_timeout=60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.last_failure_time = None
        self.is_open = False

    def call(self, func, *args, **kwargs):
        if self.is_open:
            if (datetime.now() - self.last_failure_time).seconds > self.reset_timeout:
                self.is_open = False  # half-open
            else:
                raise ToolExecutionError("Circuit breaker is open")

        try:
            result = func(*args, **kwargs)
            self.failure_count = 0
            return result
        except Exception as e:
            self.failure_count += 1
            self.last_failure_time = datetime.now()
            if self.failure_count >= self.failure_threshold:
                self.is_open = True
            raise
  • Log all errors with context for debugging.
def safe_pipeline_stage(stage_name: str, func, *args, **kwargs):
    try:
        return func(*args, **kwargs)
    except Exception as e:
        logger.error(f"Pipeline stage '{stage_name}' failed",
            error=str(e), args=args)
        raise

Verification

python -c "
import time
attempts = 0

@retry(max_attempts=3, base_delay=0.1)
def flaky():
    global attempts
    attempts += 1
    if attempts < 2:
        raise ToolExecutionError('fail')
    return 'ok'

print(flaky())
# Expected: ok
"

Common failures

  • Retry exhaustion without fallback. After all retries fail, the error propagates and crashes the agent. Always have a final fallback.
  • Silent failures. Errors caught but not logged make debugging impossible. Log every error with full context.
  • Circuit breaker never resets. Once open, the breaker stays open forever. Implement a half-open state that allows test requests after a timeout.
  • Version mismatch - The installed package or runtime differs from the command shown; check the version first and rerun the smallest verification command.
  • Local environment drift - Another service, virtual environment, model, or path is being used; print the active binary path and configuration before changing the guide steps.

Related guides

  • How to Handle Function Call Errors and Retries
  • How to Implement Logging for Agent Debugging