KEY INSIGHT
Slow pipeline stages create bottlenecks; profiling to find them, then parallelizing the independent stages, is the fastest path to throughput gains.
### Where Pipelines Slow Down
A typical RAG pipeline runs these stages: query parsing, rewriting/decomposition, embedding generation, vector search, reranking, context processing, and answer generation. Time is not spread evenly across them: embedding plus search dominates at high scale, while answer generation dominates for long contexts.
### Profiling the Pipeline
```python
import time
from functools import wraps

def profileStage(func):
    """Wrap a stage so each call logs its latency and returns (result, elapsed_seconds)."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        print(f"[PROFILE] {func.__name__}: {elapsed * 1000:.1f}ms")
        return result, elapsed
    return wrapper

# Instrumented pipeline. rewriteQueryBatch, generateEmbeddings, vectorSearch,
# rerankResults and generateAnswer are the pipeline's own stage functions,
# assumed to be defined elsewhere.
def ragPipeline(query: str) -> tuple[str, dict]:
    timings = {}
    # Stage 1: Rewrite
    rewritten_queries, t1 = profileStage(rewriteQueryBatch)([query])
    timings["rewrite"] = t1
    # Stage 2: Embed
    embeddings, t2 = profileStage(generateEmbeddings)(rewritten_queries)
    timings["embedding"] = t2
    # Stage 3: Search
    results, t3 = profileStage(vectorSearch)(embeddings, top_k=10)
    timings["search"] = t3
    # Stage 4: Re-rank against the original query
    reranked, t4 = profileStage(rerankResults)(results, query)
    timings["rerank"] = t4
    # Stage 5: Generate
    answer, t5 = profileStage(generateAnswer)(reranked, query)
    timings["generate"] = t5
    total = sum(timings.values())
    breakdown = ", ".join(f"{k}={v * 1000:.1f}ms" for k, v in timings.items())
    print(f"[PROFILE] Total: {total * 1000:.1f}ms")
    print(f"[PROFILE] Breakdown: {breakdown}")
    return answer, {"timings": timings, "total": total}
```
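The decorator above turns each stage's return value into a `(result, elapsed)` tuple, so every call site has to unpack it. A minimal alternative sketch, assuming you only need per-stage wall-clock totals, records time through a context manager and then ranks stages by their share of the total, which points straight at the bottleneck. `StageTimer`, `stage` and `shares` are hypothetical names, and `time.sleep` stands in for real stage work.

```python
import time
from contextlib import contextmanager


class StageTimer:
    """Accumulates wall-clock time per named stage without altering stage outputs."""

    def __init__(self) -> None:
        self.timings: dict[str, float] = {}

    @contextmanager
    def stage(self, name: str):
        start = time.perf_counter()
        try:
            yield
        finally:
            # Record the time even if the stage raised, so failures still show up.
            elapsed = time.perf_counter() - start
            self.timings[name] = self.timings.get(name, 0.0) + elapsed

    def shares(self) -> list[tuple[str, float]]:
        """Return (stage, fraction of total time), largest first."""
        total = sum(self.timings.values())
        if total == 0:
            return [(name, 0.0) for name in self.timings]
        ranked = sorted(self.timings.items(), key=lambda kv: kv[1], reverse=True)
        return [(name, t / total) for name, t in ranked]


# Usage with stand-in stages (time.sleep simulates work).
timer = StageTimer()
with timer.stage("embedding"):
    time.sleep(0.02)
with timer.stage("generate"):
    time.sleep(0.10)
for name, share in timer.shares():
    print(f"{name}: {share:.0%}")
```

Because stage functions keep their normal return values, this style drops into existing code by wrapping call sites in `with timer.stage(...)` blocks.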
### Parallelizing Independent Stages
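Rewriting and embedding are independent from one query to the next, yet the calls are often made serially. When a query is decomposed into multiple sub-queries, each sub-query's embedding call is independent of the others, so embed them in parallel: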
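```python
from concurrent.futures import ThreadPoolExecutor, as_completed

# embed_texts(texts: list[str]) -> list[list[float]] is assumed to be the
# pipeline's existing blocking embedding call, defined elsewhere.
def parallelEmbed(queryBatch: list[str], max_workers: int = 10) -> list[list[float]]:
    results = [None] * len(queryBatch)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to the position of its query.
        futures = {executor.submit(embed_texts, [q]): i for i, q in enumerate(queryBatch)}
        for future in as_completed(futures):
            idx = futures[future]
            results[idx] = future.result()[0]  # re-raises if that call failed
    return results

# Before: serial embedding, one blocking call after another
# embeddings = [embed_texts([q])[0] for q in queries]

# After: parallel embedding. For I/O-bound API calls, wall-clock time for
# 10 queries can approach that of a single call, provided the API accepts
# 10 concurrent requests.
# embeddings = parallelEmbed(queries, max_workers=10)
```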
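`ThreadPoolExecutor.map` already yields results in input order, so the index bookkeeping in `parallelEmbed` can be dropped. The self-contained sketch below compares serial and pooled calls; `parallelMap` and `fakeEmbed` are hypothetical, and `fakeEmbed` uses `time.sleep` to mimic a blocking network call (sleeping releases the GIL, as socket I/O does), so the comparison reflects I/O-bound behavior only.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable


def parallelMap(fn: Callable[[str], list[float]], items: list[str],
                max_workers: int = 10) -> list[list[float]]:
    """Run fn over items on a thread pool; executor.map returns results in input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(fn, items))


def fakeEmbed(text: str) -> list[float]:
    """Hypothetical stand-in for a network embedding call: about 0.1 s of blocking wait."""
    time.sleep(0.1)
    return [float(len(text)), 1.0]


queries = ["alpha", "bb", "ccc", "dddd", "eeeee", "f", "gg", "hhh", "iiii", "jjjjj"]

start = time.perf_counter()
serial = [fakeEmbed(q) for q in queries]
serial_s = time.perf_counter() - start

start = time.perf_counter()
parallel = parallelMap(fakeEmbed, queries, max_workers=10)
parallel_s = time.perf_counter() - start

print(f"serial {serial_s:.2f}s, parallel {parallel_s:.2f}s")
```

With ten 0.1 s calls, the serial loop takes roughly 1 s and the pool roughly 0.1 s; against a real API, the gain is bounded by its concurrency and rate limits.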
### Overlapping Network Waits with Async I/O
For production HTTP-based calls (embedding API, generation API), use `asyncio` with `aiohttp` to overlap network I/O:
```python
import asyncio
import aiohttp

async def asyncEmbedBatch(
    queries: list[str],
    api_url: str,
    api_key: str,
    max_concurrency: int = 100,
) -> list[list[float]]:
    # Caps how many requests are in flight at once (this is not a batch size).
    semaphore = asyncio.Semaphore(max_concurrency)
    headers = {"Authorization": f"Bearer {api_key}"}
    # One shared session reuses connections across all requests.
    async with aiohttp.ClientSession(headers=headers) as session:

        async def embedSingle(query: str) -> list[float]:
            async with semaphore:
                # Payload and response shape are provider-specific; adjust as needed.
                async with session.post(api_url, json={"input": query}) as resp:
                    resp.raise_for_status()  # surface 429s and other HTTP errors
                    data = await resp.json()
                    return data["embedding"]

        # gather returns results in the same order as queries.
        return await asyncio.gather(*(embedSingle(q) for q in queries))
```
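The semaphore is what keeps `asyncEmbedBatch` from firing every request at once. The network-free sketch below (hypothetical `boundedGather`, with `asyncio.sleep` standing in for awaiting an HTTP response) records how many coroutines hold the semaphore at the same time, confirming the cap holds while `gather` still returns results in input order. Because the event loop runs on a single thread, the shared counters need no lock.

```python
import asyncio


async def boundedGather(items: list[str], limit: int) -> tuple[list[str], int]:
    """Process items concurrently with at most `limit` in flight; report the peak."""
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def worker(item: str) -> str:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.05)  # stands in for awaiting an HTTP response
            in_flight -= 1
            return item.upper()

    results = await asyncio.gather(*(worker(x) for x in items))
    return results, peak


results, peak = asyncio.run(boundedGather([f"q{i}" for i in range(20)], limit=5))
print(results[:3], "peak in flight:", peak)
```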
### Async End-to-End Pipeline
```python
import asyncio

async def asyncPipeline(query: str) -> str:
    # Stage helpers are assumed to be plain blocking functions that return a
    # single value (not profileStage-wrapped tuples). asyncio.to_thread
    # (Python 3.9+) runs each one in a worker thread so the event loop stays free.
    # Each stage needs the previous stage's output, so within one query these
    # awaits run in sequence.
    rewritten = await asyncio.to_thread(rewriteQueryBatch, [query])
    embeddings = await asyncio.to_thread(generateEmbeddings, rewritten)
    results = await asyncio.to_thread(vectorSearch, embeddings, top_k=10)
    reranked = await asyncio.to_thread(rerankWithCrossEncoder, results, query)
    return await asyncio.to_thread(generateAnswer, reranked, query)

# The throughput gain comes from running many queries on the event loop at once:
# answers = await asyncio.gather(*(asyncPipeline(q) for q in queries))
```
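Within one query the stages run in order, but decomposition can produce several sub-queries whose searches do not depend on each other. A minimal sketch of that fan-out, assuming a blocking per-sub-query search (`searchOne` and `fanOutSearch` are hypothetical; `searchOne` sleeps 0.2 s to mimic I/O), runs each search through `asyncio.to_thread` and collects the results with `asyncio.gather`:

```python
import asyncio
import time


def searchOne(sub_query: str, top_k: int = 3) -> list[str]:
    """Hypothetical blocking vector search for one sub-query (about 0.2 s of I/O)."""
    time.sleep(0.2)
    return [f"{sub_query}-doc{i}" for i in range(top_k)]


async def fanOutSearch(sub_queries: list[str], top_k: int = 3) -> list[list[str]]:
    # Searches for different sub-queries are independent, so run them concurrently.
    # asyncio.to_thread (Python 3.9+) uses the event loop's default thread pool.
    tasks = [asyncio.to_thread(searchOne, q, top_k) for q in sub_queries]
    return await asyncio.gather(*tasks)


sub_queries = ["pricing tiers", "refund policy", "api limits", "sso setup", "data retention"]
start = time.perf_counter()
hits = asyncio.run(fanOutSearch(sub_queries))
elapsed = time.perf_counter() - start
print(f"{len(hits)} result lists in {elapsed:.2f}s")
```

Five 0.2 s searches finish in roughly 0.2 s instead of 1 s, since the default pool has at least five workers. Merging and deduplicating the per-sub-query hits before reranking is left out.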
### Failure Modes
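Concurrency that exceeds the provider's rate limit triggers HTTP 429 (Too Many Requests) errors, so size thread pools and semaphores to that limit and retry with backoff. Cache embeddings server-side when possible to avoid recomputing vectors for repeated inputs. Concurrent code can race on shared state: whenever blocking calls run in threads (including via `asyncio.to_thread`), make sure shared vector store connections are thread-safe or give each worker its own. For local models, GIL contention can limit how well threads parallelize CPU-bound embedding runs; use batch inference instead.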
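One common way to absorb 429 responses is exponential backoff with jitter. A minimal sketch, assuming your client raises a dedicated exception on 429 (`RateLimitError`, `withBackoff` and `flakyEmbed` are hypothetical names, not a library API):

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class RateLimitError(Exception):
    """Hypothetical exception a client raises when the API answers HTTP 429."""


def withBackoff(call: Callable[[], T], max_retries: int = 5,
                base_delay: float = 0.5, max_delay: float = 30.0) -> T:
    """Retry `call` on RateLimitError with exponential backoff and full jitter."""
    for attempt in range(max_retries + 1):
        try:
            return call()
        except RateLimitError:
            if attempt == max_retries:
                raise  # give up after the final attempt
            # The delay ceiling doubles each attempt (base, 2*base, 4*base, ...), capped.
            ceiling = min(max_delay, base_delay * (2 ** attempt))
            time.sleep(random.uniform(0, ceiling))  # jitter spreads out retries
    raise AssertionError("unreachable")


# Usage: a flaky stand-in that is rate-limited twice, then succeeds.
attempts = {"n": 0}


def flakyEmbed() -> list[float]:
    attempts["n"] += 1
    if attempts["n"] <= 2:
        raise RateLimitError("429 Too Many Requests")
    return [0.1, 0.2]


vector = withBackoff(flakyEmbed, base_delay=0.01)
print(vector, "after", attempts["n"], "attempts")
```

If the provider sends a `Retry-After` header with the 429, waiting at least that long is generally preferable to the computed delay.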