How to build a structured prompt/response logging pipeline with Fluentd
Prerequisites: Fluentd (td-agent) installed; an AI service with logging hooks.
What this does
This guide builds a Fluentd-based logging pipeline that captures every input and output pair from an AI agent, enriches each record with metadata (model name, latency, token count), and routes the structured logs to multiple destinations: local files for debugging, S3 for long-term archival, and Elasticsearch for real-time search. Writes are buffered and asynchronous, so the pipeline can keep up with high-throughput inference workloads.
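To make the enrichment step concrete, the following is a minimal Python sketch of what one record might look like before and after the pipeline adds its metadata. It only imitates the effect of the Fluentd filter and is not Fluentd code. The `enrich` helper and the sample values (including the model name `example-model`) are hypothetical and exist only for illustration.

```python
import json
import socket
from datetime import datetime, timezone


def enrich(record: dict) -> dict:
    """Return a copy of `record` with the two fields the pipeline adds at ingest.

    Hypothetical helper: it mirrors the hostname and ingested_at fields
    described in this guide, but it is not part of Fluentd.
    """
    enriched = dict(record)  # copy, so the caller's record is left unchanged
    enriched["hostname"] = socket.gethostname()
    enriched["ingested_at"] = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    return enriched


# Sample values are made up for illustration.
sample = {
    "type": "llm_interaction",
    "input": "ping",
    "output": "pong",
    "model": "example-model",
    "tokens": 2,
    "latency_ms": 12.5,
    "timestamp": 1700000000.0,
}

print(json.dumps(enrich(sample), indent=2))
```

The agent supplies the first seven fields; the last two are attached once, at ingest, so every destination receives the same enriched record.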
Steps
Install the required Fluentd plugins:
td-agent-gem install fluent-plugin-s3 fluent-plugin-elasticsearch
Expected output (version numbers will vary):
Successfully installed fluent-plugin-s3-1.8.0 fluent-plugin-elasticsearch-5.3.0
Configure the AI agent to emit JSON log lines. In Python:
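import json
import sys
import time  # required for time.time() below

def log_interaction(input_text, output_text, model, tokens, latency_ms):
    # One JSON object per line, so Fluentd's json parser can read each record.
    record = {
        "type": "llm_interaction",
        "input": input_text,
        "output": output_text,
        "model": model,
        "tokens": tokens,
        "latency_ms": latency_ms,
        "timestamp": time.time(),
    }
    # Fluentd tails /var/log/ai-agent/*.log, so the agent's stdout must be
    # redirected to a file in that directory for these lines to be picked up.
    sys.stdout.write(json.dumps(record) + "\n")
    sys.stdout.flush()

Create a Fluentd configuration at /etc/td-agent/td-agent.conf:

<source>
  @type tail
  path /var/log/ai-agent/*.log
  # Example location; the position file lets Fluentd resume after a restart.
  pos_file /var/log/td-agent/ai-agent.pos
  tag ai.interactions
  <parse>
    @type json
  </parse>
</source>

# hostname uses embedded Ruby in a double-quoted string, which is evaluated
# once when the configuration is loaded. ingested_at uses a ${...} placeholder
# with enable_ruby, which is evaluated for every record.
<filter ai.interactions>
  @type record_transformer
  enable_ruby true
  <record>
    hostname "#{Socket.gethostname}"
    ingested_at ${Time.now.utc.strftime('%Y-%m-%dT%H:%M:%SZ')}
  </record>
</filter>

<match ai.interactions>
  @type copy
  <store>
    @type elasticsearch
    host elasticsearch.local
    port 9200
    index_name ai-interactions
    include_tag_key true
  </store>
  <store>
    @type s3
    s3_bucket ai-log-archive
    s3_region us-east-1
    path interactions/%Y/%m/%d/
    store_as json
    # Write plain JSON lines instead of the default time/tag/record layout.
    <format>
      @type json
    </format>
    <buffer time>
      @type file
      path /var/log/td-agent/buffer/s3
      timekey 3600
    </buffer>
  </store>
  <store>
    # Echo each record into td-agent.log for local debugging.
    @type stdout
  </store>
</match>

Restart Fluentd and check the service status: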
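sudo systemctl restart td-agent && sudo systemctl status td-agent
Expected output:
Active: active (running)
Configure log rotation for the agent's log directory. With a rule file in place at /etc/logrotate.d/ai-agent, force one rotation to confirm that it works: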
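sudo logrotate -f /etc/logrotate.d/ai-agent
Send a test interaction through the agent and verify that the record appears. Fluentd writes event records to its own log only when the match block includes an @type stdout store, so this check assumes one is configured:
tail -f /var/log/td-agent/td-agent.log | grep "llm_interaction"
Expected output: a line showing the ingested record.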
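Because the tail source reads files rather than the agent's stdout, one option is to have the agent append its JSON lines directly to a log file. The sketch below shows that variant together with a small latency timer. The function names `log_interaction_to_file` and `timed_call`, the file name `agent.log`, and the model name are assumptions made for illustration; the demo writes to a temporary directory, whereas a real deployment would point at a file under /var/log/ai-agent/.

```python
import json
import os
import tempfile
import time


def log_interaction_to_file(path, input_text, output_text, model, tokens, latency_ms):
    """Append one interaction to `path` as a single JSON line and return the record."""
    record = {
        "type": "llm_interaction",
        "input": input_text,
        "output": output_text,
        "model": model,
        "tokens": tokens,
        "latency_ms": latency_ms,
        "timestamp": time.time(),
    }
    # Append mode keeps earlier records; one write per record keeps lines whole.
    with open(path, "a", encoding="utf-8") as fh:
        fh.write(json.dumps(record) + "\n")
    return record


def timed_call(fn, *args):
    """Run fn(*args) and return (result, elapsed time in milliseconds)."""
    start = time.perf_counter()
    result = fn(*args)
    return result, (time.perf_counter() - start) * 1000.0


# Demo in a temporary directory. str.upper stands in for a real model call.
demo_log_dir = tempfile.mkdtemp()
demo_log_path = os.path.join(demo_log_dir, "agent.log")
reply, latency = timed_call(str.upper, "hello")
log_interaction_to_file(demo_log_path, "hello", reply, "example-model", 1, latency)
```

Writing to a file removes the dependency on how the process's stdout is redirected, at the cost of the agent needing write access to the log directory.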
Verification
curl -s "http://elasticsearch.local:9200/ai-interactions/_count" | jq '.count'
Expected output: an integer >= 1, confirming records reached Elasticsearch.
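Since writes are buffered, the count may stay at 0 for a short time after the test interaction. A scripted version of this check might poll until a record shows up. The following is a sketch under that assumption: `wait_for_records`, `parse_count`, and the retry settings are hypothetical names and values, and only the Python standard library is used.

```python
import json
import time
import urllib.request


def parse_count(body: str) -> int:
    """Extract the "count" field from an Elasticsearch _count response body."""
    return int(json.loads(body)["count"])


def _http_get(url: str) -> str:
    """Fetch `url` and return the response body as text."""
    with urllib.request.urlopen(url, timeout=5) as resp:
        return resp.read().decode("utf-8")


def wait_for_records(url, minimum=1, attempts=10, delay_s=2.0, fetch=_http_get):
    """Poll `url` until the reported count reaches `minimum`.

    Returns True on success and False once all attempts are used up.
    `fetch` can be swapped for a stub when no cluster is available.
    """
    for _ in range(attempts):
        try:
            if parse_count(fetch(url)) >= minimum:
                return True
        except (OSError, ValueError, KeyError):
            pass  # endpoint unreachable or body not in the expected shape yet
        time.sleep(delay_s)
    return False


# Example call (requires a reachable cluster, so it is left commented out):
# wait_for_records("http://elasticsearch.local:9200/ai-interactions/_count")
```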
Common failures
- Fluentd fails to tail the log file: check file permissions with
  ls -la /var/log/ai-agent/
  The td-agent user must have read access to the files and search (execute) access to the directory. Fix the file permissions with
  sudo chmod 644 /var/log/ai-agent/*.log
- Elasticsearch index not found: verify that the index was created. Fluentd auto-creates indices, but creation may fail if the Elasticsearch cluster has strict index templates. Create an index template manually, with a pattern that matches the index name ai-interactions:
  curl -X PUT "http://elasticsearch.local:9200/_index_template/ai-interactions" -H 'Content-Type: application/json' -d '{"index_patterns":["ai-interactions*"]}'
- S3 uploads fail with authentication errors: set AWS credentials in Fluentd's environment. Add
  Environment=AWS_ACCESS_KEY_ID=...
  (and the matching AWS_SECRET_ACCESS_KEY entry) to the td-agent systemd unit file, then run sudo systemctl daemon-reload and restart td-agent.
- Buffer files accumulate on disk: the S3 or Elasticsearch endpoint is unreachable. Check td-agent.log for connection errors and restore connectivity. If chunks are being discarded before the endpoint recovers, increase retry_max_times (called retry_limit in Fluentd v0.12) in the buffer configuration.
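Buffer accumulation is easier to catch early with a small check that can run from cron or a monitoring agent. The sketch below sums the size of the files under a buffer directory. The helper names and the byte thresholds are assumptions chosen for the demo; in this guide's configuration the directory to watch would be /var/log/td-agent/buffer/s3.

```python
import os
import tempfile


def buffer_usage(buffer_dir):
    """Return (file_count, total_bytes) for all files under buffer_dir."""
    count = 0
    total = 0
    for root, _dirs, files in os.walk(buffer_dir):
        for name in files:
            try:
                total += os.path.getsize(os.path.join(root, name))
                count += 1
            except OSError:
                continue  # file was flushed and removed while scanning
    return count, total


def is_backing_up(buffer_dir, max_bytes):
    """True when the buffer directory holds more than max_bytes of data."""
    return buffer_usage(buffer_dir)[1] > max_bytes


# Demo with a temporary directory holding one 2048-byte file.
demo_buffer_dir = tempfile.mkdtemp()
with open(os.path.join(demo_buffer_dir, "chunk.log"), "wb") as fh:
    fh.write(b"x" * 2048)

print(buffer_usage(demo_buffer_dir))  # prints (1, 2048)
```

A sensible threshold depends on the timekey and traffic volume, so the value passed as `max_bytes` is best derived from what a healthy hour of buffering looks like on the host.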