How to implement async prompt logging to S3 without blocking inference latency
Prerequisites: AWS S3 access (or MinIO), async Python (asyncio/aiohttp)
What this does
This guide implements a non-blocking logging system that buffers AI inference inputs and outputs in memory and periodically flushes them as gzipped JSONL files to S3 (or MinIO). The logger uses a background asyncio task and a bounded queue, so a log call only enqueues a record and adds no S3 round trip to the critical inference path; it waits only if the buffer is full. This pattern is useful for production AI agents where audit trails must not degrade response times.
Steps
Install the async S3 client:
    pip install aioboto3

Expected output:

    Successfully installed aioboto3-12.4.0

Create an AsyncS3Logger class with an internal queue:

    import asyncio, json, gzip, uuid
    from datetime import datetime

    import aioboto3

    class AsyncS3Logger:
        def __init__(self, bucket, prefix, flush_interval=60, max_buffer=5000):
            self.bucket = bucket
            self.prefix = prefix
            self.flush_interval = flush_interval  # seconds between flushes
            # Bounded queue: caps memory use if uploads fall behind
            self.queue = asyncio.Queue(maxsize=max_buffer)
            self._task = None  # handle to the background flush task

Implement the enqueue method for non-blocking log submission:
        async def log(self, entry_type, data, model=None):
            record = {
                "id": str(uuid.uuid4()),
                "type": entry_type,
                "timestamp": datetime.utcnow().isoformat(),
                "model": model,
                "data": data,
            }
            # Waits only when the queue is already at max_buffer
            await self.queue.put(record)

Implement the flush loop that drains the queue and uploads to S3:
        async def _flush_loop(self):
            session = aioboto3.Session()
            while True:
                await asyncio.sleep(self.flush_interval)
                if self.queue.empty():
                    continue
                # Drain everything currently buffered into one batch
                batch = []
                while not self.queue.empty():
                    batch.append(self.queue.get_nowait())
                # One JSON object per line (JSONL), then gzip
                content = "\n".join(json.dumps(r) for r in batch)
                compressed = gzip.compress(content.encode())
                key = f"{self.prefix}/{datetime.utcnow().strftime('%Y/%m/%d/%H')}/{uuid.uuid4()}.jsonl.gz"
                async with session.client("s3") as s3:
                    await s3.put_object(Bucket=self.bucket, Key=key, Body=compressed,
                                        ContentEncoding="gzip", ContentType="application/json")

Start the logger as a background task during application startup:
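    logger = AsyncS3Logger(bucket="ai-logs", prefix="inference")
    # Call this from inside a running event loop (for example, an async startup hook).
    # Keeping the handle on logger._task prevents the task from being garbage-collected.
    logger._task = asyncio.create_task(logger._flush_loop())

Integrate the logger into the agent's request handler. After each inference: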
    await logger.log("completion", {"input": user_input, "output": result}, model=model_name)

This call returns immediately unless the buffer is full; in that case queue.put waits until the flush loop frees space.

Verify objects appear in S3 after the flush interval:
    aws s3 ls s3://ai-logs/inference/ --recursive | tail -5

Expected output: .jsonl.gz files appearing within the flush interval.
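
The steps above can be exercised without S3 access by swapping the upload for a stand-in. The following is a minimal sketch rather than the guide's implementation: BufferedLogger, drain_once, and the upload callback are hypothetical names introduced for illustration, and an in-memory dict takes the place of put_object. It keeps the same shape: enqueue on the request path, then drain, serialize, and gzip one batch.

```python
import asyncio
import gzip
import json
import uuid


class BufferedLogger:
    """Hypothetical stand-in for AsyncS3Logger with a pluggable upload callback."""

    def __init__(self, upload, max_buffer=5000):
        self.upload = upload  # async callable(key, body); stands in for s3.put_object
        self.queue = asyncio.Queue(maxsize=max_buffer)

    async def log(self, entry_type, data, model=None):
        # Same record shape as the guide, minus the timestamp for brevity.
        await self.queue.put({"id": str(uuid.uuid4()), "type": entry_type,
                              "model": model, "data": data})

    async def drain_once(self, key):
        # One iteration of the flush loop: drain, serialize as JSONL, gzip, upload.
        batch = []
        while not self.queue.empty():
            batch.append(self.queue.get_nowait())
        if not batch:
            return 0
        body = gzip.compress("\n".join(json.dumps(r) for r in batch).encode())
        await self.upload(key, body)
        return len(batch)


async def demo():
    uploads = {}  # in-memory replacement for the bucket

    async def fake_upload(key, body):
        uploads[key] = body

    logger = BufferedLogger(fake_upload, max_buffer=10)
    await logger.log("completion", {"input": "hi", "output": "hello"}, model="demo-model")
    await logger.log("completion", {"input": "bye", "output": "goodbye"}, model="demo-model")
    flushed = await logger.drain_once("inference/test.jsonl.gz")
    return flushed, uploads


flushed, uploads = asyncio.run(demo())
lines = gzip.decompress(uploads["inference/test.jsonl.gz"]).decode().splitlines()
print(flushed, [json.loads(line)["type"] for line in lines])
# prints: 2 ['completion', 'completion']
```

Injecting the upload as a callback is a design choice for testability only; in the real logger the callback would wrap the aioboto3 put_object call shown in the flush loop.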
Verification
aws s3 cp s3://ai-logs/inference/$(date -u +%Y/%m/%d)/$(date -u +%H)/$(aws s3 ls s3://ai-logs/inference/$(date -u +%Y/%m/%d)/$(date -u +%H)/ | tail -1 | awk '{print $4}') - | gunzip | jq '.type' | head -1
Expected output: "completion".
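
The same check can be done in Python once an object has been downloaded. This is a small sketch under stated assumptions: first_record_type is a hypothetical helper, and the sample bytes are built locally in the same way the flush loop builds them, not fetched from S3.

```python
import gzip
import json


def first_record_type(blob: bytes) -> str:
    """Return the "type" field of the first JSONL record in a gzipped blob."""
    text = gzip.decompress(blob).decode()
    first_line = text.splitlines()[0]
    return json.loads(first_line)["type"]


# Build a sample payload the same way the flush loop does: JSONL, then gzip.
records = [{"type": "completion", "data": {"input": "hi", "output": "hello"}}]
sample = gzip.compress("\n".join(json.dumps(r) for r in records).encode())
print(first_record_type(sample))  # prints: completion
```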
Common failures
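- Queue full: the flush interval is too long or S3 uploads are slow, so log waits on queue.put (put_nowait would raise QueueFull instead). Reduce flush_interval to 15 seconds or increase max_buffer to 10000.
- S3 credentials missing: ensure AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are set in the environment, or configure the IAM role if running on EC2.
- Memory growth over time: if S3 writes consistently fail, an unbounded queue grows without limit. Keep the maxsize on asyncio.Queue (max_buffer above), enqueue with put_nowait, and handle QueueFull exceptions by dropping the oldest entries. Note that an exception raised by put_object ends _flush_loop unless it is caught, after which nothing drains the queue.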
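
The drop-oldest handling mentioned in the last item could look roughly like this. It is a sketch only: put_drop_oldest is a hypothetical helper that is not part of the logger above, and it assumes every producer enqueues through it from the event loop thread, so nothing else touches the queue between the eviction and the retry.

```python
import asyncio


def put_drop_oldest(queue: asyncio.Queue, record) -> bool:
    """Enqueue without waiting; if the queue is full, evict the oldest entry first.

    Returns True if an older record was dropped to make room.
    """
    try:
        queue.put_nowait(record)
        return False
    except asyncio.QueueFull:
        queue.get_nowait()        # discard the oldest buffered record
        queue.put_nowait(record)  # room is now available
        return True


async def demo():
    queue = asyncio.Queue(maxsize=2)
    dropped = [put_drop_oldest(queue, n) for n in (1, 2, 3)]
    remaining = [queue.get_nowait() for _ in range(queue.qsize())]
    return dropped, remaining


dropped, remaining = asyncio.run(demo())
print(dropped, remaining)  # prints: [False, False, True] [2, 3]
```

Dropping the oldest record keeps the request path from ever waiting, at the cost of losing audit entries under sustained failure; counting the True results gives a simple measure of how much was lost.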