RUNLOCALAIv38
->Will it run?Best GPUCompareTroubleshootStartLearnPulseModelsHardwareToolsBench
Run check
RUNLOCALAI

Independently operated catalog for local-AI hardware and software. Hand-written verdicts. Source-cited claims. Reproducible commands when we have them.

OP·Fredoline Eruo
DIR
  • Models
  • Hardware
  • Tools
  • Benchmarks
TOOLS
  • Will it run?
  • Compare hardware
  • Cost vs cloud
  • Choose my GPU
  • Prompting kits
  • Quick answers
REF
  • All buyer guides
  • Learn local AI
  • Methodology
  • Glossary
  • Errors KB
  • Trust
EDITOR
  • About
  • Author
  • How we make money
  • Editorial policy
  • Contact
LEGAL
  • Privacy
  • Terms
  • Sitemap
MAIL · MONTHLY DIGEST
Get monthly local AI changes
Monthly recap. No spam.
DISCLOSURE

Some links on this site are affiliate links (Amazon Associates and other first-class retailers). When you buy through them, we earn a small commission at no extra cost to you. Affiliate links do not influence our verdicts — there are cards we rate highly that we don't have affiliate relationships with, and cards that sell well that we refuse to recommend. Read more →

© 2026 runlocalai.coIndependently operated
RUNLOCALAI · v38
  1. >
  2. Home
  3. /Learn
  4. /Courses
  5. /Advanced RAG — Chunking, Retrieval, Re-ranking
  6. /Ch. 19
Advanced RAG — Chunking, Retrieval, Re-ranking

19. Pipeline Optimization

Chapter 19 of 24 · 25 min
KEY INSIGHT

Slow pipeline stages create bottlenecks; profiling and parallelizing independent stages is the fastest path to throughput gains. ### Where Pipelines Slow Down RAG pipelines have these typical stages: query parsing, rewriting/decomposition, embedding generation, vector search, reranking, context processing, and answer generation. The time cost is not distributed evenly—"embedding + search" dominates at high scale, while "answer generation" dominates for long contexts. ### Profiling the Pipeline ```python import time from functools import wraps def profileStage(func): @wraps(func) def wrapper(*args, **kwargs): start = time.perf_counter() result = func(*args, **kwargs) elapsed = time.perf_counter() - start print(f"[PROFILE] {func.__name__}: {elapsed*1000:.1f}ms") return result, elapsed return wrapper # Instrumented pipeline def ragPipeline(query: str) -> tuple[str, dict]: timings = {} # Stage 1: Rewrite rewritten_queries, t1 = profileStage(rewriteQueryBatch)([query]) timings["rewrite"] = t1 # Stage 2: Embed embeddings, t2 = profileStage(generateEmbeddings)(rewritten_queries) timings["embedding"] = t2 # Stage 3: Search results, t3 = profileStage(vectorSearch)(embeddings, top_k=10) timings["search"] = t3 # Stage 4: Re-rank reranked, t4 = profileStage(rerankResults)(results, query) timings["rerank"] = t4 # Stage 5: Generate answer, t5 = profileStage(generateAnswer)(reranked, query) timings["generate"] = t5 total = sum(timings.values()) print(f"[PROFILE] Total: {total*1000:.1f}ms") print(f"[PROFILE] Breakdown: { {k: f'{v*1000:.1f}ms' for k,v in timings.items()} }") return answer, {"timings": timings, "total": total} ``` ### Parallelizing Independent Stages Rewrite and embedding are independent per-query but are often called serially. If decomposing into multiple sub-queries, embed them in parallel: ```python from concurrent.futures import ThreadPoolExecutor, as_completed def parallelEmbed(queryBatch: list[str], max_workers: int = 10) -> list[list[float]]: embeddings = [] with ThreadPoolExecutor(max_workers=max_workers) as executor: futures = {executor.submit(embed_texts, [q]): i for i, q in enumerate(queryBatch)} results = [None] * len(queryBatch) for future in as_completed(futures): idx = futures[future] results[idx] = future.result()[0] return results # Before: serial embedding # for q in queries: # emb = embed_texts([q]) # blocking per query # After: parallel embedding # embeddings = parallelEmbed(queries, max_workers=10) # ~10x throughput for 10 queries ``` ### Async Wait Times For production HTTP-based calls (embedding API, generation API), use `asyncio` with `aiohttp` to overlap network I/O: ```python import asyncio import aiohttp async def asyncEmbedBatch( queries: list[str], api_url: str, api_key: str, batch_size: int = 100 ) -> list[list[float]]: semaphore = asyncio.Semaphore(batch_size) async def embedSingle(query: str) -> list[float]: async with semaphore: async with aiohttp.ClientSession() as session: payload = {"input": query} headers = {"Authorization": f"Bearer {api_key}"} async with session.post(api_url, json=payload, headers=headers) as resp: data = await resp.json() return data["embedding"] tasks = [embedSingle(q) for q in queries] return await asyncio.gather(*tasks) ``` ### Async Answer Generation ```python async def asyncPipeline(query: str) -> str: # Rewrite stage rewrite_task = asyncio.to_thread(rewriteQueryBatch, [query]) # Start rewrite while we await rewritten = await rewrite_task # Parallel: embed + generate query synopsis embed_task = asyncio.to_thread(generateEmbeddings, rewritten) search_task = asyncio.to_thread(vectorSearch, await embed_task, top_k=10) # Await search, then rerank results = await search_task reranked, rerank_time = await asyncio.to_thread(rerankWithCrossEncoder, results, query) # Generate answer, gen_time = await asyncio.to_thread(generateAnswer, reranked, query) return answer ``` ### Failure Modes Thread pool sizes that exceed the API rate limit cause 429 errors. Cache embeddings server-side when possible to avoid redundant computation. Async code introduces subtle race conditions—ensure thread safety on shared vector store connections. For local models, GIL contention limits parallelization of CPU-bound embedding runs; use batch inference instead.

EXERCISE

Profile the full pipeline with timing instrumentation. Identify the top 2 slowest stages. Parallelize at least one of them and measure the end-to-end latency reduction. (15 min)

← Chapter 18
Multi-Modal RAG
Chapter 20 →
Caching Strategies