RUNLOCALAI · v38
HOW-TO · DEV

How to build a real-time data pipeline using Apache Kafka or Redis Streams with AI-enriched events

advanced · 40 min · By Fredoline Eruo
Target environment
Ubuntu 24.04 · Ollama 0.4.x
PREREQUISITES

Kafka or Redis installed, AI model endpoint available

What this does

Real-time data pipelines consume event streams and emit enriched events with low latency, ideally under a second per event when the model responds quickly enough. This guide explains how to build a streaming pipeline in which each raw event is enriched by an AI model before being forwarded to downstream consumers. The enrichment step can perform sentiment analysis, entity extraction, translation, spam classification, or any other stateless transformation. The pipeline uses Kafka or Redis Streams as the message broker, applies backpressure to respect AI API rate limits, and writes enriched events to an output topic (an output stream in Redis).

Steps

  1. Define the enrichment function. The function receives a raw event (JSON payload), extracts the relevant text fields, sends them to the AI model with a prompt describing the enrichment task, and returns the enriched payload with additional fields appended under an ai_enrichment key.

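  2. Implement a consumer-producer loop. The consumer reads from the input topic with enable.auto.commit set to false and commits offsets manually, only after the enriched event has been written to the output topic. This gives at-least-once processing: an event may be enriched twice after a crash, but none is skipped. When events are processed in parallel, commit only up to the highest offset below which every event has finished, because a Kafka commit marks a position in the partition rather than an individual message.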

  3. Add a concurrency limiter using asyncio.Semaphore set to the AI API's concurrent request limit. Wrap each enrichment call in the semaphore and run the batch with asyncio.gather. gather itself does not bound concurrency, so the semaphore is what keeps the number of in-flight requests under the limit.

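  4. Implement backpressure: when the AI API returns a 429 status, pause message consumption for the duration given in the Retry-After header plus random jitter, then resume. With Kafka, keep calling poll() while paused (for example after pausing the assigned partitions) so the consumer does not exceed max.poll.interval.ms and get removed from the group. Track consecutive failures and stop consuming if the rate limit persists for more than 60 seconds.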

  5. Write enriched events to the output topic. Use an asynchronous producer configured with linger.ms=5 and compression.type=lz4 so that small writes are batched and compressed, at the cost of up to about 5 ms of added delay per message.

  6. Add a dead-letter channel. If enrichment fails after three retries, write the original event to a dead_letter topic with a failure_reason header and continue processing.

  7. Deploy the pipeline as a long-running service. Use systemd or a process supervisor. Add a health check endpoint that reports consumer lag and enrichment success rate.
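
Steps 1 and 3 above can be sketched together as follows. This is a minimal illustration under stated assumptions, not a reference implementation: `call_model` is a hypothetical stand-in for whatever client talks to your AI endpoint, the `text` field name and the prompt wording are invented for the example, and all Kafka or Redis I/O is left out so the concurrency logic stays visible.

```python
import asyncio
import json
from typing import Awaitable, Callable

# Hypothetical type: any async function that takes a prompt and returns model text.
ModelCall = Callable[[str], Awaitable[str]]

# Assumed prompt wording, for illustration only.
PROMPT = "Classify the sentiment of this text as positive, negative, or neutral:\n{text}"


async def enrich_event(raw: bytes, call_model: ModelCall) -> dict:
    """Step 1: parse the raw JSON event and append results under `ai_enrichment`."""
    event = json.loads(raw)
    text = event.get("text", "")  # assumed field name; adjust to your schema
    answer = await call_model(PROMPT.format(text=text))
    return {**event, "ai_enrichment": {"sentiment": answer.strip().lower()}}


async def enrich_batch(raws: list[bytes], call_model: ModelCall, max_concurrent: int) -> list:
    """Step 3: enrich a batch in parallel with at most `max_concurrent` calls in flight."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def bounded(raw: bytes):
        async with semaphore:  # waits here once the limit is reached
            return await enrich_event(raw, call_model)

    # return_exceptions=True keeps one bad event from failing the whole batch;
    # results come back in input order, as dicts or Exception instances.
    return await asyncio.gather(*(bounded(r) for r in raws), return_exceptions=True)
```

Because results are returned in input order, the caller can pair each result with the offset of the event it came from and decide which offsets are safe to commit.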

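The retry and dead-letter behaviour from steps 4 and 6 might look like the sketch below. Several pieces are assumptions made for illustration: `RateLimited` is a hypothetical exception your model client would raise on a 429, `DeadLetter` is an invented container for the original payload and its `failure_reason`, and the one-second jitter ceiling is arbitrary. The sketch delays only the failing task; pausing the consumer itself and the 60-second cutoff are not shown.

```python
import asyncio
import random
from dataclasses import dataclass


class RateLimited(Exception):
    """Hypothetical error raised by the model client on HTTP 429."""

    def __init__(self, retry_after: float):
        super().__init__(f"rate limited, retry after {retry_after}s")
        self.retry_after = retry_after


@dataclass
class DeadLetter:
    """Original event plus the failure_reason metadata described in step 6."""
    payload: bytes
    failure_reason: str


def pause_seconds(retry_after: float, max_jitter: float = 1.0) -> float:
    """Step 4: Retry-After plus random jitter, so workers do not resume in lockstep."""
    return retry_after + random.uniform(0.0, max_jitter)


async def enrich_or_dead_letter(raw, enrich, max_retries=3, sleep=asyncio.sleep):
    """Try once, retry up to `max_retries` times, then give up with a DeadLetter."""
    reason = "unknown"
    for attempt in range(max_retries + 1):
        try:
            return await enrich(raw)
        except RateLimited as exc:
            reason = "rate_limited"
            delay = pause_seconds(exc.retry_after)
        except Exception as exc:  # any other enrichment failure
            reason = f"{type(exc).__name__}: {exc}"
            delay = pause_seconds(0.0)
        if attempt < max_retries:
            await sleep(delay)
    return DeadLetter(payload=raw, failure_reason=reason)
```

The caller would publish a returned `DeadLetter` to the dead_letter topic and treat the event as handled, so its offset can still be committed and the pipeline keeps moving.
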
Verification

# Verify enriched events appear in the output topic.
# scripts/verify_kafka_enrichment.py is not shown in this guide; it must print
# a JSON object with enriched_count and enrichment_rate to stdout.
python3 -c "
import json, subprocess, sys
result = subprocess.run(
    ['python3', 'scripts/verify_kafka_enrichment.py'],
    capture_output=True, text=True, timeout=45
)
# Report the script error instead of failing on a JSON parse of empty output
if result.returncode:
    sys.exit(f'Verifier failed: {result.stderr.strip()}')
output = json.loads(result.stdout)
assert output['enriched_count'] > 0, 'No enriched events found in output topic'
assert output['enrichment_rate'] >= 0.95, f'Enrichment rate too low: {output[\"enrichment_rate\"]}'
print(f'Verification passed: {output[\"enriched_count\"]} enriched events, rate: {output[\"enrichment_rate\"]:.2%}')
"
# Expected: Verification passed: <N> enriched events, rate: >=95%
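
The verification script itself is not included in this guide. As a rough idea of the counting it would need to do, here is a hypothetical sketch. The function name `summarize` is invented, reading messages from the output topic is left out, and the definition of `enrichment_rate` as the share of output messages carrying a non-empty `ai_enrichment` key is an assumption.

```python
import json


def summarize(messages):
    """Return the two fields the verification command asserts on."""
    total = 0
    enriched = 0
    for raw in messages:
        total += 1
        try:
            event = json.loads(raw)
        except (ValueError, TypeError):
            continue  # unparseable messages count against the rate
        if isinstance(event, dict) and event.get("ai_enrichment"):
            enriched += 1
    rate = enriched / total if total else 0.0
    return {"enriched_count": enriched, "enrichment_rate": rate}
```

A real script would feed `summarize` the raw message values consumed from the output topic and print the result with `json.dumps`.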

Common failures

  • Consumer lag grows because enrichment throughput is lower than the event ingestion rate. Increase the asyncio.Semaphore value up to the AI API's concurrency limit, or batch multiple events into a single AI call to reduce per-event overhead.
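  • AI enrichment produces inconsistent results when the same event is processed twice after a consumer rebalance. Kafka's at-least-once semantics mean an event can be delivered more than once, and a model may answer differently each time. Make enrichment as deterministic as the model allows, and detect duplicates on the consumer side by storing a stable event ID or a hash of the input event. A hash of the enrichment output is less reliable for this, because the output itself may differ between runs.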
  • The Kafka consumer group rebalances whenever an instance joins, leaves, or restarts, stalling processing. Set session.timeout.ms to 30000 and max.poll.interval.ms to 600000 on the consumer so that slow enrichment batches are not mistaken for a dead consumer. Use static group membership (group.instance.id) in production so that a restart within the session timeout does not trigger a rebalance.
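  • Redis Streams does not redeliver pending messages on its own after a consumer crashes, so they stay unacknowledged in the PEL (Pending Entries List). Implement a startup reconciliation step that reads the consumer's PEL entries (for example XREADGROUP with ID 0, or XAUTOCLAIM for entries owned by dead consumers), retries them, and acknowledges them with XACK before beginning normal consumption.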
  • Version mismatch - The installed package or runtime differs from the command shown; check the version first and rerun the smallest verification command.
  • Local environment drift - Another service, virtual environment, model, or path is being used; print the active binary path and configuration before changing the guide steps.
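
For the duplicate-processing failure listed above, one option is to fingerprint the input event rather than the model output, since the output may vary between runs. The sketch below is illustrative only: `event_fingerprint` and `SeenEvents` are invented names, and the in-memory set is an assumption that would need to become a shared store with expiry once more than one instance is running.

```python
import hashlib
import json


def event_fingerprint(event: dict) -> str:
    """Stable SHA-256 of the event content, independent of key order."""
    canonical = json.dumps(event, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


class SeenEvents:
    """In-memory duplicate detector (illustrative; not shared across instances)."""

    def __init__(self):
        self._seen = set()

    def first_time(self, event: dict) -> bool:
        """Return True and record the event if it has not been seen before."""
        fingerprint = event_fingerprint(event)
        if fingerprint in self._seen:
            return False
        self._seen.add(fingerprint)
        return True
```

Calling `first_time(event)` before enrichment lets the pipeline skip the AI call for a redelivered event.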

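The Redis Streams reconciliation step listed above can be sketched against a redis-py style client. The assumptions: `client` is any object exposing `xreadgroup` and `xack` with redis-py's signatures, `handle` is a hypothetical callback that re-runs enrichment for one entry, and the stream, group, and consumer names are supplied by the caller. Reading with ID `0` returns entries already delivered to this consumer but never acknowledged. Entries pending for other crashed consumers would need XAUTOCLAIM, which is not shown.

```python
def reconcile_pending(client, stream, group, consumer, handle, page_size=100):
    """Replay this consumer's pending entries, acknowledging each one that succeeds.

    Returns (acked, failed). Failed entries stay in the PEL for a later retry.
    """
    acked = failed = 0
    last_id = "0"  # "0" means: start of this consumer's pending history
    while True:
        reply = client.xreadgroup(group, consumer, {stream: last_id}, count=page_size)
        entries = reply[0][1] if reply else []
        if not entries:
            return acked, failed
        for entry_id, fields in entries:
            last_id = entry_id  # page forward even when an entry fails
            try:
                handle(fields)
            except Exception:
                failed += 1  # leave unacknowledged
                continue
            client.xack(stream, group, entry_id)
            acked += 1
```

Advancing `last_id` past failed entries keeps the loop from re-reading the same poisoned entry forever. Those entries remain pending and can be routed to the dead-letter channel by a separate pass.
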
Related guides

  • How to design a batch data pipeline that extracts from APIs, transforms with AI, and loads into a data warehouse
  • How to set up an AI-assisted web scraping pipeline that extracts structured data from HTML