24. Enterprise RAG Platform Project
Chapter 24 of 24 · 20 min
This chapter integrates the previous chapters into a unified platform architecture. A production-ready RAG system requires cohesive design across all subsystems, not isolated components.
The integrated platform:
import os
import time
import uuid
from dataclasses import dataclass
from typing import Optional
@dataclass
class PlatformConfig:
    """Configuration for the enterprise RAG platform.

    Required fields are declared before any defaulted field: ``@dataclass``
    raises ``TypeError`` at class-definition time when a field without a
    default follows one with a default. Construct instances with keyword
    arguments so call sites do not depend on field order.
    """

    # Required settings (no defaults)
    vector_db_url: str
    vector_db_api_key: str
    primary_llm_endpoint: str
    audit_log_bucket: str

    # Vector database
    collection_name: str = "enterprise_chunks"

    # LLM endpoints
    fallback_llm_endpoint: Optional[str] = None

    # Semantic cache
    cache_enabled: bool = True
    cache_similarity_threshold: float = 0.85
    cache_ttl_seconds: int = 3600

    # Multi-region
    # None means "no region list supplied"; a list default would have to go
    # through field(default_factory=...) to avoid sharing one list object.
    regions: Optional[list[str]] = None
    primary_region: str = "us-east-1"

    # Compliance
    pii_redaction_enabled: bool = True
    data_retention_days: int = 90
class EnterpriseRAGPlatform:
    """Production RAG platform integrating all subsystems.

    Wires together the vector database client, embedding model, optional
    semantic cache, primary/fallback LLM clients, compliance auditor, and
    metrics, and exposes ``query`` and ``ingest`` on top of them.

    NOTE(review): ``query`` calls ``self._build_prompt`` and ``ingest`` calls
    ``self._chunk_document``; neither is defined in this listing, so they
    must be supplied elsewhere (e.g. by a subclass or mixin).
    """

    def __init__(self, config: "PlatformConfig"):
        """Create every subsystem client from *config*.

        Raises:
            KeyError: if the ``AUDIT_KEY`` environment variable is not set.
        """
        # Keep the config: query() and ingest() read collection_name from it.
        self.config = config

        # Initialize vector database client
        self.vector_db = QdrantClient(
            url=config.vector_db_url,
            api_key=config.vector_db_api_key,
        )

        # Initialize embedding model
        self.embedding_model = SentenceTransformer(
            "sentence-transformers/all-MiniLM-L6-v2"
        )

        # Initialize semantic cache (None disables caching entirely).
        # NOTE(review): config.cache_ttl_seconds is never passed on; confirm
        # whether SemanticCache accepts a TTL argument.
        self.cache = (
            SemanticCache(similarity_threshold=config.cache_similarity_threshold)
            if config.cache_enabled
            else None
        )

        # Initialize LLM clients. The fallback attribute always exists so the
        # rate-limit handler in query() can test it instead of raising
        # AttributeError when no fallback endpoint is configured.
        self.llm_primary = OpenAIClient(config.primary_llm_endpoint)
        self.llm_fallback = (
            OpenAIClient(config.fallback_llm_endpoint)
            if config.fallback_llm_endpoint
            else None
        )

        # Initialize compliance auditor
        # NOTE(review): config.pii_redaction_enabled is not consulted anywhere
        # in this class; queries are audited verbatim.
        self.auditor = ComplianceAuditor(
            encryption_key=os.environ["AUDIT_KEY"],
            audit_sink=config.audit_log_bucket,
        )

        # Initialize metrics
        self.metrics = PrometheusMetrics()

    def _audit_query(self, request_id: str, user_query: str, user_id: str,
                     result_count: int, cache_hit: bool) -> None:
        """Write one audit record for a served query."""
        self.auditor.log_query({
            "request_id": request_id,
            "query": user_query,
            "user_id": user_id,
            "result_count": result_count,
            "cache_hit": cache_hit,
        })

    def query(self, user_query: str, user_id: str,
              context: Optional[dict] = None) -> dict:
        """Answer *user_query* through the full retrieval pipeline.

        Steps: semantic-cache lookup, query embedding, vector search, prompt
        construction, LLM generation (falling back on ``RateLimitError``),
        cache store, and audit logging.

        Args:
            user_query: The question text.
            user_id: Identifier recorded in the audit log.
            context: Accepted for interface compatibility; currently unused.

        Returns:
            A dict with ``response``, ``cache_hit`` and ``request_id``; on a
            cache miss it also carries ``sources`` (id and score per hit).

        Raises:
            RateLimitError: if the primary LLM is rate limited and no
                fallback endpoint was configured.
        """
        request_id = str(uuid.uuid4())

        # Step 1: Check semantic cache
        if self.cache:
            cached = self.cache.get_cached_response(user_query)
            if cached:
                self.metrics.increment("cache_hits_total")
                # Cache hits are audited too, so every query leaves a record.
                self._audit_query(request_id, user_query, user_id, 0, True)
                return {"response": cached, "cache_hit": True, "request_id": request_id}

        # Step 2: Generate embedding (record elapsed time, not a timestamp)
        started = time.perf_counter()
        query_embedding = self.embedding_model.encode(user_query)
        self.metrics.observe(
            "embedding_latency_ms", (time.perf_counter() - started) * 1000
        )

        # Step 3: Vector search
        started = time.perf_counter()
        search_results = self.vector_db.search(
            collection_name=self.config.collection_name,
            query_vector=query_embedding.tolist(),
            limit=5,
        )
        self.metrics.observe(
            "vector_search_latency_ms", (time.perf_counter() - started) * 1000
        )

        # Step 4: Augment and generate. ingest() stores the chunk under the
        # "text" payload key; joining the payload dicts themselves would
        # raise TypeError.
        context_texts = [
            hit.payload["text"]
            for hit in search_results
            if hit.payload and "text" in hit.payload
        ]
        context_combined = "\n\n".join(context_texts)
        prompt = self._build_prompt(user_query, context_combined)
        try:
            response = self.llm_primary.generate(prompt)
        except RateLimitError:
            if self.llm_fallback is None:
                # No fallback configured: surface the original error.
                raise
            response = self.llm_fallback.generate(prompt)

        # Step 5: Store in cache
        if self.cache:
            self.cache.store_response(user_query, response)

        # Step 6: Audit logging
        self._audit_query(
            request_id, user_query, user_id, len(search_results), False
        )

        return {
            "response": response,
            "sources": [{"id": hit.id, "score": hit.score} for hit in search_results],
            "cache_hit": False,
            "request_id": request_id,
        }

    def ingest(self, documents: list[dict]) -> dict:
        """Chunk, embed, and upsert *documents* into the vector database.

        Args:
            documents: Dicts with ``content`` and ``id`` keys and an optional
                ``metadata`` dict that is merged into each chunk's payload.

        Returns:
            A dict with ``ingestion_id`` and ``documents_processed``.
        """
        ingestion_id = str(uuid.uuid4())
        for doc in documents:
            # Chunk document
            chunks = self._chunk_document(doc["content"])
            if not chunks:
                # Nothing to embed or upsert for an empty document.
                continue

            # Generate embeddings
            embeddings = self.embedding_model.encode(chunks)

            # Metadata is spread first so it cannot overwrite the reserved
            # "text" and "doc_id" keys that retrieval depends on.
            metadata = doc.get("metadata") or {}
            points = [
                PointStruct(
                    id=str(uuid.uuid4()),
                    vector=embedding.tolist(),
                    payload={**metadata, "text": chunk, "doc_id": doc["id"]},
                )
                for chunk, embedding in zip(chunks, embeddings)
            ]

            # Upsert to vector database
            self.vector_db.upsert(
                collection_name=self.config.collection_name,
                points=points,
            )
        return {"ingestion_id": ingestion_id, "documents_processed": len(documents)}
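This architecture is designed to meet typical enterprise targets: sub-200ms retrieval latency, 99.9% availability via multi-region replication, SOC 2-compliant audit logging, and semantic caching that can reduce LLM API costs by 50% or more when many queries repeat. Treat these figures as goals to validate in your own deployment rather than guarantees of the code above.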
Deployment topology:
# kubernetes/deployment.yaml
# Deployment for the RAG API: three replicas, rolled out one extra pod at a
# time with no reduction in available capacity (maxSurge 1, maxUnavailable 0).
apiVersion: apps/v1
kind: Deployment
metadata:
  name: rag-platform
  namespace: production
spec:
  replicas: 3
  # apps/v1 requires a selector, and it must match the pod template labels.
  selector:
    matchLabels:
      app: rag-platform
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 0
  template:
    metadata:
      labels:
        app: rag-platform
    spec:
      containers:
        - name: rag-api
          image: company/rag-platform:v2.4.1
          resources:
            requests:
              memory: "4Gi"
              cpu: "2"
            limits:
              memory: "8Gi"
              cpu: "4"
          env:
            # The vector database URL is read from a Secret, not inlined.
            - name: VECTOR_DB_URL
              valueFrom:
                secretKeyRef:
                  name: rag-secrets
                  key: vector-db-url
EXERCISE
Deploy the complete RAG platform to a Kubernetes cluster with the following validation criteria: p99 search latency < 100ms, cache hit rate > 40% on repeated queries, and all queries logged to the compliance audit bucket.