How to build a real-time data pipeline using Apache Kafka or Redis Streams with AI-enriched events
Prerequisites: Kafka or Redis installed, and an AI model endpoint available.
What this does
A real-time data pipeline consumes an event stream and emits enriched events, typically with sub-second end-to-end latency. This guide explains how to build a streaming pipeline in which each raw event is enriched by an AI model before it is forwarded to downstream consumers. The enrichment step can perform sentiment analysis, entity extraction, translation, spam classification, or any other stateless per-event transformation. The pipeline uses Kafka or Redis Streams as the message broker, applies backpressure to stay within the AI API's rate limits, and writes enriched events to an output topic (a stream, with Redis).
Steps
1. Define the enrichment function. It receives a raw event (a JSON payload), extracts the relevant text fields, sends them to the AI model with a prompt describing the enrichment task, and returns the original payload with the model's output added under an ai_enrichment key.
2. Implement a consumer-producer loop. The consumer reads from the input topic with enable.auto.commit=False and commits offsets manually, only after the enriched event (or, on failure, its dead-letter copy) has been written. This gives at-least-once processing: a crash before the commit causes the event to be redelivered rather than silently dropped.
3. Add a concurrency limiter: an asyncio.Semaphore sized to the AI API's concurrent request limit. Use asyncio.gather over each polled batch so that multiple events are enriched in parallel while no more than that many requests are in flight.
4. Implement backpressure. When the AI API returns HTTP 429, pause message consumption for the duration given in the Retry-After header plus random jitter, then resume. With Kafka, pause the assigned partitions and keep polling rather than stopping the poll loop, so the consumer is not evicted from the group for exceeding max.poll.interval.ms. Track consecutive rate-limit responses and stop consuming if the rate limit persists for more than 60 seconds.
5. Write enriched events to an output topic with an asynchronous producer configured with linger.ms=5 and compression.type=lz4, which batches small writes efficiently while adding at most about 5 ms of delay per batch.
6. Add a dead-letter channel. If enrichment still fails after three retries, write the original event to a dead_letter topic with a failure_reason header, commit its offset, and continue processing.
7. Deploy the pipeline as a long-running service under systemd or another process supervisor. Add a health check endpoint that reports consumer lag and the enrichment success rate.
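The enrichment function from step 1 might look like the following minimal sketch. It assumes a sentiment task, events with title and body text fields, and a hypothetical call_model coroutine that sends a prompt to your AI endpoint and returns the reply text; none of these names come from a specific library.

```python
import json
from typing import Any, Awaitable, Callable

# Hypothetical model client: takes a prompt, returns the model's reply text.
ModelCall = Callable[[str], Awaitable[str]]

# Assumed task and field names; adjust to your event schema.
PROMPT_TEMPLATE = (
    "Classify the sentiment of the following text as positive, negative, or neutral. "
    'Reply with JSON of the form {{"sentiment": "..."}}.\n\nText:\n{text}'
)
TEXT_FIELDS = ("title", "body")


async def enrich_event(raw: bytes, call_model: ModelCall) -> dict[str, Any]:
    """Return the decoded event with the model output added under 'ai_enrichment'."""
    event = json.loads(raw)
    # Concatenate only the text fields that are present and non-empty.
    text = "\n".join(str(event[f]) for f in TEXT_FIELDS if event.get(f))
    reply = await call_model(PROMPT_TEMPLATE.format(text=text))
    # json.loads raises on a malformed reply, so the caller can retry or dead-letter.
    enrichment = json.loads(reply)
    # Build a new dict instead of mutating the decoded event.
    return {**event, "ai_enrichment": enrichment}
```

Because the raw bytes are never modified, the untouched original stays available if the event has to go to the dead-letter topic.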
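For steps 2 and 5, the consumer and producer settings could be expressed as below, assuming the confluent-kafka Python client, which takes librdkafka property names. The broker address, group ID, and topic name are placeholders, and auto.offset.reset is an added assumption.

```python
# Consumer and producer settings in librdkafka property names (confluent-kafka client assumed).
consumer_config = {
    "bootstrap.servers": "localhost:9092",  # placeholder broker address
    "group.id": "ai-enrichment",            # hypothetical consumer group name
    # Offsets are committed manually, only after the output write is confirmed.
    "enable.auto.commit": False,
    # Assumption: start from the oldest retained event when the group has no committed offset.
    "auto.offset.reset": "earliest",
}

producer_config = {
    "bootstrap.servers": "localhost:9092",
    # Wait up to 5 ms for more records so small writes share a batch.
    "linger.ms": 5,
    # Compress each batch with LZ4 before sending.
    "compression.type": "lz4",
}

# With confluent-kafka installed (topic name is hypothetical):
#   from confluent_kafka import Consumer, Producer
#   consumer = Consumer(consumer_config)
#   consumer.subscribe(["raw_events"])
#   producer = Producer(producer_config)
```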
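The commit ordering in step 2 is what delivers at-least-once processing. The sketch below duck-types the consumer and producer against confluent-kafka's Consumer and Producer methods (an assumption about the client); process is a hypothetical callable that returns the destination topic, value, and headers for each raw event.

```python
from typing import Any, Callable


def run_at_least_once(
    consumer: Any,
    producer: Any,
    process: Callable[[bytes], tuple[str, bytes, list]],
    should_run: Callable[[], bool] = lambda: True,
    flush_timeout: float = 30.0,
) -> None:
    """Poll, process, produce, confirm delivery, and only then commit each offset.

    consumer/producer are assumed to expose confluent-kafka's poll, error, value,
    produce, flush, and commit methods.
    """
    while should_run():
        msg = consumer.poll(1.0)
        if msg is None:
            producer.poll(0)  # serve pending delivery callbacks while idle
            continue
        if msg.error():
            continue  # a real service would log, and raise on non-transient errors

        topic, value, headers = process(msg.value())
        delivery_errors: list = []

        def on_delivery(err: Any, _delivered: Any) -> None:
            if err is not None:
                delivery_errors.append(err)

        producer.produce(topic, value=value, headers=headers, on_delivery=on_delivery)
        # Wait for the broker's acknowledgement before committing; committing first
        # could lose this event if the process crashed before the write landed.
        remaining = producer.flush(flush_timeout)
        if remaining or delivery_errors:
            # Leave the offset uncommitted so the event is redelivered after a restart.
            raise RuntimeError(f"delivery failed: {delivery_errors or remaining}")
        consumer.commit(message=msg, asynchronous=False)
```

Flushing and committing per message keeps the ordering easy to see but is slow; a production version would more likely produce a whole polled batch, flush once, and then commit.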
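Step 3's bounded parallelism can be sketched with asyncio alone. enrich_all and its parameters are illustrative names: the semaphore caps how many enrichment calls are in flight, and gather returns results in input order.

```python
from __future__ import annotations

import asyncio
from typing import Awaitable, Callable, Iterable, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def enrich_all(
    items: Iterable[T],
    enrich: Callable[[T], Awaitable[R]],
    max_concurrency: int,
) -> list[R | BaseException]:
    """Enrich every item in parallel with at most max_concurrency calls in flight."""
    # Size this to the AI API's concurrent-request limit.
    semaphore = asyncio.Semaphore(max_concurrency)

    async def bounded(item: T) -> R:
        async with semaphore:
            return await enrich(item)

    # return_exceptions=True keeps one failed event from failing the whole batch;
    # exceptions come back in place so each event can be retried or dead-lettered.
    return await asyncio.gather(*(bounded(i) for i in items), return_exceptions=True)
```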
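For step 4, two small helpers cover the 429 handling under stated assumptions: retry_delay reads Retry-After as a number of seconds (an HTTP-date or missing header falls back to a default), and RateLimitGuard reports when rate limiting has lasted longer than 60 seconds. Both names are hypothetical.

```python
import random
import time
from typing import Callable, Optional


def retry_delay(retry_after: Optional[str], default: float = 1.0, max_jitter: float = 0.5) -> float:
    """Seconds to pause after a 429: the Retry-After value plus random jitter.

    Assumes Retry-After carries delta-seconds; other values fall back to default.
    """
    try:
        base = max(0.0, float(retry_after))
    except (TypeError, ValueError):
        base = default
    return base + random.uniform(0.0, max_jitter)


class RateLimitGuard:
    """Tracks how long the API has been rate limiting without a successful call."""

    def __init__(self, give_up_after: float = 60.0, clock: Callable[[], float] = time.monotonic) -> None:
        self.give_up_after = give_up_after
        self.clock = clock
        self.limited_since: Optional[float] = None
        self.consecutive_429s = 0

    def record_429(self) -> None:
        # Remember when the current run of 429s started.
        if self.limited_since is None:
            self.limited_since = self.clock()
        self.consecutive_429s += 1

    def record_success(self) -> None:
        self.limited_since = None
        self.consecutive_429s = 0

    def should_stop(self) -> bool:
        """True once rate limiting has persisted longer than give_up_after seconds."""
        return (
            self.limited_since is not None
            and self.clock() - self.limited_since > self.give_up_after
        )
```

In a confluent-kafka loop, the pause could be applied with consumer.pause(consumer.assignment()), continuing to call poll() while paused, and undone with consumer.resume(consumer.assignment()) once the delay has elapsed; when should_stop() returns True, the loop would stop consuming.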
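Step 6's retry-then-dead-letter rule could be sketched as follows. It assumes one initial attempt plus three retries with exponential backoff, a hypothetical enrich coroutine, and a placeholder output topic name, enriched_events; it returns a (topic, value, headers) triple ready to hand to a producer.

```python
import asyncio
from typing import Awaitable, Callable, Optional

MAX_RETRIES = 3  # dead-letter after three failed retries


async def enrich_or_dead_letter(
    raw: bytes,
    enrich: Callable[[bytes], Awaitable[bytes]],
    base_delay: float = 0.5,
) -> tuple[str, bytes, list[tuple[str, bytes]]]:
    """Return (topic, value, headers) for the record that should be produced."""
    last_error: Optional[BaseException] = None
    for attempt in range(MAX_RETRIES + 1):
        try:
            return "enriched_events", await enrich(raw), []
        except Exception as exc:  # narrow this to your model client's error types
            last_error = exc
            if attempt < MAX_RETRIES:
                # Exponential backoff: base_delay, 2x, 4x between attempts.
                await asyncio.sleep(base_delay * 2 ** attempt)
    # All attempts failed: route the original, unmodified event to the dead-letter topic.
    reason = f"{type(last_error).__name__}: {last_error}".encode()
    return "dead_letter", raw, [("failure_reason", reason)]
```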
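A health check for step 7 can be served with the standard library only. This sketch assumes a JSON response at /health on localhost; get_lag is a hypothetical hook, since how lag is computed depends on the broker client.

```python
import json
import threading
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Callable


class PipelineStats:
    """Thread-safe counters shared by the pipeline loop and the health endpoint."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.enriched = 0
        self.dead_lettered = 0

    def record(self, success: bool) -> None:
        with self._lock:
            if success:
                self.enriched += 1
            else:
                self.dead_lettered += 1

    def success_rate(self) -> float:
        with self._lock:
            total = self.enriched + self.dead_lettered
            # Report 1.0 before any events have been processed.
            return self.enriched / total if total else 1.0


def make_health_server(stats: PipelineStats, get_lag: Callable[[], int], port: int = 8080) -> ThreadingHTTPServer:
    """Build a server answering GET /health with lag and success rate as JSON."""

    class Handler(BaseHTTPRequestHandler):
        def do_GET(self) -> None:
            if self.path != "/health":
                self.send_error(404)
                return
            body = json.dumps({
                "consumer_lag": get_lag(),
                "enrichment_success_rate": stats.success_rate(),
            }).encode()
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)

        def log_message(self, format: str, *args: object) -> None:
            pass  # silence per-request access logs

    return ThreadingHTTPServer(("127.0.0.1", port), Handler)
```

Running server.serve_forever() in a background thread keeps the endpoint alongside the pipeline loop. With confluent-kafka, per-partition lag could be estimated as the high watermark from consumer.get_watermark_offsets(tp) minus the offset returned by consumer.position([tp]), summed over the assignment.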
Verification
# Verify enriched events appear in the output topic.
# scripts/verify_kafka_enrichment.py is expected to print a JSON object
# with "enriched_count" and "enrichment_rate" fields.
python3 -c "
import json, subprocess
result = subprocess.run(
    ['python3', 'scripts/verify_kafka_enrichment.py'],
    capture_output=True, text=True, timeout=45, check=True,
)
output = json.loads(result.stdout)
assert output['enriched_count'] > 0, 'No enriched events found in output topic'
assert output['enrichment_rate'] >= 0.95, f'Enrichment rate too low: {output[\"enrichment_rate\"]}'
print(f'Verification passed: {output[\"enriched_count\"]} enriched events, rate: {output[\"enrichment_rate\"]:.2%}')
"
# Expected: Verification passed: <N> enriched events, rate: <R>% (R >= 95)
Common failures
- Consumer lag grows because enrichment throughput is lower than the event ingestion rate. Raise the asyncio.Semaphore limit, up to the AI API's concurrency limit, or batch several events into a single AI call to amortize per-request overhead.
- AI enrichment produces inconsistent results when the same event is processed twice after a consumer rebalance. Because the pipeline commits offsets only after processing, it is at-least-once: events can be delivered more than once. Design enrichment prompts to be idempotent, and store a hash of the enrichment output keyed by event ID so that downstream consumers can detect duplicates.
- The Kafka consumer group rebalances whenever an instance joins, restarts, or takes too long between polls, stalling processing. Set session.timeout.ms to 30000 and max.poll.interval.ms to 600000 on the consumer; the latter gives slow enrichment batches up to 10 minutes between polls before the consumer is evicted. In production, use static group membership (group.instance.id) so that a restarted instance rejoins without a rebalance. Adding a genuinely new instance still triggers one.
- A Redis Streams consumer that crashes before acknowledging leaves its messages in the consumer group's Pending Entries List (PEL), and Redis does not redeliver them automatically. Implement a startup reconciliation step that reads the consumer's pending entries (XREADGROUP with ID 0), retries them, and acknowledges them with XACK before beginning normal consumption.
- Version mismatch: the installed client library or runtime differs from the versions the commands assume; check the versions first and rerun the smallest verification command.
- Local environment drift: a different service, virtual environment, model, or path is in use than expected; print the active binary path and configuration before changing the guide steps.
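For the consumer-lag failure, batching several events into one model call might look like this sketch. The numbered-list prompt and the JSON-array reply format are assumptions about how your model is prompted; the length check guards against replies that cannot be mapped back to individual events.

```python
import json
from typing import Awaitable, Callable, Iterator, Sequence, TypeVar

T = TypeVar("T")


def chunks(items: Sequence[T], size: int) -> Iterator[Sequence[T]]:
    """Yield consecutive slices of at most size items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


async def enrich_batch(
    texts: Sequence[str], call_model: Callable[[str], Awaitable[str]]
) -> list:
    """Enrich several events with one model call; results come back in input order."""
    # Flatten newlines so each text occupies exactly one numbered line.
    flat = [text.replace("\n", " ") for text in texts]
    numbered = "\n".join(f"{i}. {text}" for i, text in enumerate(flat, 1))
    prompt = (
        "For each numbered text, classify its sentiment. Reply with only a JSON array "
        'of objects like {"sentiment": "..."}, one per text, in the same order.\n\n'
        + numbered
    )
    results = json.loads(await call_model(prompt))
    if not isinstance(results, list) or len(results) != len(texts):
        # A short or merged reply cannot be mapped back safely; the caller can fall
        # back to per-event calls or the dead-letter path.
        raise ValueError(f"expected {len(texts)} results, got {results!r}")
    return results
```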
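For duplicates after a rebalance, a downstream consumer could key its check on an event ID and compare hashes of the enrichment output. The event_id field and the DuplicateDetector class are hypothetical, and the in-memory dict stands in for a shared store.

```python
import hashlib
import json
from typing import Any, Optional


def stable_hash(obj: Any) -> str:
    """SHA-256 of a canonical JSON encoding (sorted keys), so equal dicts hash equally."""
    canonical = json.dumps(obj, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode()).hexdigest()


class DuplicateDetector:
    """Downstream check for redelivered events, keyed by a hypothetical event_id field."""

    def __init__(self) -> None:
        # event_id -> hash of the first enrichment seen for that event.
        self._seen: dict[str, str] = {}

    def check(self, event: dict) -> Optional[str]:
        """Return None for a new event, 'duplicate' for an identical replay,
        or 'inconsistent' when a replay carries a different enrichment."""
        key = str(event["event_id"])
        digest = stable_hash(event.get("ai_enrichment"))
        previous = self._seen.get(key)
        if previous is None:
            self._seen[key] = digest
            return None
        return "duplicate" if previous == digest else "inconsistent"
```

An "inconsistent" result is a signal that the enrichment prompt is not idempotent for that event.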
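The consumer settings from the rebalancing bullet could be written as follows, assuming the confluent-kafka client and librdkafka property names. Using the hostname as group.instance.id is an assumption that only holds with one consumer per host; the broker address and group ID are placeholders.

```python
import socket

consumer_config = {
    "bootstrap.servers": "localhost:9092",  # placeholder broker address
    "group.id": "ai-enrichment",            # hypothetical consumer group name
    "enable.auto.commit": False,
    # Static membership: a stable, unique ID per instance lets a restarted instance
    # reclaim its partitions without a rebalance if it rejoins within session.timeout.ms.
    "group.instance.id": socket.gethostname(),
    "session.timeout.ms": 30000,
    # Maximum time between poll() calls before the consumer is evicted from the group.
    "max.poll.interval.ms": 600000,
}
```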
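The Redis Streams reconciliation step could be sketched as below. It assumes a redis-py client with the default RESP2 reply shape and a hypothetical handle callable that performs the enrichment, retries, and any dead-lettering, raising only if it cannot dispose of the entry. XREADGROUP with an explicit ID returns only entries already delivered to this consumer and not yet acknowledged, with IDs greater than the one given.

```python
from typing import Any, Callable


def reconcile_pending(
    r: Any,
    stream: str,
    group: str,
    consumer: str,
    handle: Callable[[Any, dict], None],
    batch: int = 100,
) -> int:
    """Re-process this consumer's Pending Entries List; return how many were handled."""
    last_id: Any = "0"
    recovered = 0
    while True:
        # Assumed RESP2 shape: [[stream_name, [(entry_id, fields), ...]]]
        reply = r.xreadgroup(group, consumer, {stream: last_id}, count=batch)
        entries = reply[0][1] if reply else []
        if not entries:
            return recovered
        for entry_id, fields in entries:
            # Advance past this ID so an entry that stays pending is not re-read forever.
            last_id = entry_id
            if not fields:
                # Assumption: entries trimmed from the stream come back without fields.
                r.xack(stream, group, entry_id)
                continue
            handle(entry_id, fields)  # an exception here leaves the entry pending
            r.xack(stream, group, entry_id)
            recovered += 1
```

This recovers only entries owned by the restarting consumer name. Entries left behind by a consumer that never returns could first be taken over with XAUTOCLAIM (xautoclaim in redis-py, Redis 6.2 or later).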