
How to implement async prompt logging to S3 without blocking inference latency

Advanced · 25 min · By Fredoline Eruo
Target environment
Ubuntu 24.04 · Ollama 0.4.x
PREREQUISITES

AWS S3 access (or MinIO), async Python (asyncio/aiohttp)

What this does

This guide implements a non-blocking logging system. It buffers AI inference inputs and outputs in memory and periodically flushes them to S3 (or MinIO) as gzipped JSONL files. The request handler only puts a record on a bounded asyncio queue. A background asyncio task drains that queue, compresses each batch and uploads it. Because of this split, the inference path never waits on S3 network I/O, and the bound on the queue caps memory use if uploads fall behind. This matters for production AI agents, where audit trails must not degrade response times.
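The sketch below shows the decoupling without S3, so you can check it locally. It assumes a hypothetical `slow_upload` coroutine that stands in for `put_object` and takes half a second per batch. The request path only calls `put_nowait`, so enqueueing costs microseconds no matter how slow the upload is.

```python
import asyncio
import time

async def slow_upload(batch):
    # Hypothetical stand-in for an S3 put_object call that takes 0.5 s.
    await asyncio.sleep(0.5)
    return len(batch)

async def flusher(queue, uploaded, interval=0.1):
    # Background task: every `interval` seconds, drain the queue and "upload" it.
    while True:
        await asyncio.sleep(interval)
        batch = [queue.get_nowait() for _ in range(queue.qsize())]
        if batch:
            uploaded.append(await slow_upload(batch))

async def main():
    queue = asyncio.Queue(maxsize=1000)
    uploaded = []
    task = asyncio.create_task(flusher(queue, uploaded))

    start = time.perf_counter()
    for i in range(100):
        # What the request handler does: a synchronous enqueue, no network I/O.
        queue.put_nowait({"id": i})
    enqueue_seconds = time.perf_counter() - start

    await asyncio.sleep(1.0)  # give the flusher time to finish one slow upload
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return enqueue_seconds, uploaded

enqueue_seconds, uploaded = asyncio.run(main())
print(f"enqueued 100 records in {enqueue_seconds * 1000:.3f} ms; batch sizes uploaded: {uploaded}")
```

The 0.5 second upload delay lands entirely on the background task. The loop that plays the role of the request handler never awaits anything.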

Steps

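  1. Install the async S3 client. Ubuntu 24.04 marks the system Python as externally managed (PEP 668) and refuses a plain pip install outside a virtual environment, so install into a venv:

    sudo apt install python3-venv
    python3 -m venv .venv && . .venv/bin/activate
    pip install aioboto3
    

    Expected output: a final line of the form Successfully installed ... aioboto3-<version> .... The original run showed aioboto3-12.4.0; your version and dependency list may differ.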

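  2. Create an AsyncS3Logger class with an internal bounded queue:

    import asyncio, gzip, json, logging, uuid
    from datetime import datetime, timezone
    
    import aioboto3  # installed in step 1; used by the flush loop
    
    class AsyncS3Logger:
        def __init__(self, bucket, prefix, flush_interval=60, max_buffer=5000):
            self.bucket = bucket
            self.prefix = prefix
            self.flush_interval = flush_interval
            # Bounded queue: at most max_buffer records are held in memory.
            self.queue = asyncio.Queue(maxsize=max_buffer)
            self.dropped = 0   # records discarded because the buffer was full
            self._task = None  # handle to the background flush task (step 5)
    
  3. Add a log method to the class. It enqueues with put_nowait, so it never waits, even when the buffer is full. In that case it drops the oldest record to make room:

    async def log(self, entry_type, data, model=None):
        record = {
            "id": str(uuid.uuid4()), "type": entry_type,
            "timestamp": datetime.now(timezone.utc).isoformat(), "model": model,
            "data": data,
        }
        try:
            self.queue.put_nowait(record)  # no await: the caller never yields here
        except asyncio.QueueFull:
            # Buffer full (slow or failing uploads): evict the oldest record.
            # Safe without a lock: nothing else runs between these two calls.
            self.queue.get_nowait()
            self.queue.put_nowait(record)
            self.dropped += 1
    
  4. Add the flush loop. Every flush_interval seconds it drains the queue, compresses the batch and uploads it to S3:

    async def _flush_loop(self):
        session = aioboto3.Session()
        while True:
            await asyncio.sleep(self.flush_interval)
            if self.queue.empty():
                continue
            # Take only what is queued right now; get_nowait never suspends.
            batch = [self.queue.get_nowait() for _ in range(self.queue.qsize())]
            content = "\n".join(json.dumps(r, default=str) for r in batch)
            # gzip is CPU-bound: compress in a worker thread so large batches
            # do not stall other coroutines (i.e. inference) on the event loop.
            compressed = await asyncio.to_thread(gzip.compress, content.encode())
            key = f"{self.prefix}/{datetime.now(timezone.utc):%Y/%m/%d/%H}/{uuid.uuid4()}.jsonl.gz"
            try:
                async with session.client("s3") as s3:
                    await s3.put_object(Bucket=self.bucket, Key=key,
                                        Body=compressed, ContentEncoding="gzip",
                                        ContentType="application/json")
            except Exception:
                # Without this, one failed upload would end the task and stop
                # all flushing. This batch is dropped; later batches still go out.
                logging.getLogger(__name__).exception("S3 log flush failed: %s", key)
    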
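  5. Start the flush loop as a background task during application startup. asyncio.create_task requires a running event loop, so call it from code that already runs inside the loop, such as your server's startup hook. The name on_startup below is only illustrative:

    logger = AsyncS3Logger(bucket="ai-logs", prefix="inference")
    
    async def on_startup():
        # Keep the reference: the event loop holds only weak references to tasks.
        logger._task = asyncio.create_task(logger._flush_loop())
    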
  6. Integrate the logger into the agent's request handler. After each inference:

    await logger.log("completion", {"input": user_input, "output": result}, model=model_name)
    

    This call only places the record on the in-memory queue and never waits for an S3 upload, so it returns immediately as long as the buffer has room.

  7. Verify objects appear in S3 after the flush interval:

    aws s3 ls s3://ai-logs/inference/ --recursive | tail -5
    

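    Expected output: .jsonl.gz objects under inference/YYYY/MM/DD/HH/, appearing within one flush interval (60 seconds by default). Objects appear only if at least one record was logged during that interval.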
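The periodic loop uploads only every flush_interval seconds. Anything still queued when the process stops is therefore lost, which is up to 60 seconds of records with the defaults.

Below is a minimal shutdown-drain sketch. The helper names `encode_batch` and `shutdown_flush` are hypothetical. `upload(key, body)` is an injected coroutine, assumed to wrap `s3.put_object`. The demo swaps in an in-memory fake so it runs without AWS.

```python
import asyncio
import gzip
import json
import uuid
from datetime import datetime, timezone

def encode_batch(records):
    # Serialize records as gzipped JSONL: one JSON object per line.
    content = "\n".join(json.dumps(r, default=str) for r in records)
    return gzip.compress(content.encode())

async def shutdown_flush(queue, flush_task, upload, prefix="inference"):
    # 1. Stop the periodic loop so it cannot race with the final drain.
    #    Caveat: a batch the loop was uploading at that moment is abandoned;
    #    a stop flag checked by the loop avoids this, at the cost of more code.
    flush_task.cancel()
    try:
        await flush_task
    except asyncio.CancelledError:
        pass
    # 2. Upload whatever is still buffered as one final object.
    batch = [queue.get_nowait() for _ in range(queue.qsize())]
    if not batch:
        return None
    key = f"{prefix}/{datetime.now(timezone.utc):%Y/%m/%d/%H}/{uuid.uuid4()}.jsonl.gz"
    await upload(key, encode_batch(batch))
    return key, len(batch)

async def demo():
    uploads = {}

    async def fake_upload(key, body):  # hypothetical stand-in for s3.put_object
        uploads[key] = body

    async def idle_flush_loop():  # stands in for a sleeping _flush_loop
        await asyncio.sleep(3600)

    queue = asyncio.Queue(maxsize=5000)
    task = asyncio.create_task(idle_flush_loop())
    for i in range(3):
        queue.put_nowait({"id": i, "type": "completion"})
    return await shutdown_flush(queue, task, fake_upload), uploads

(key, count), uploads = asyncio.run(demo())
```

In a service, call `shutdown_flush` from the shutdown hook. Pass it the logger's queue, the task returned by `create_task`, and an upload coroutine that opens an aioboto3 S3 client.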

Verification

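Run this during the same UTC hour as a flush, because keys are partitioned by hour. The hour path is computed once, so both commands use the same prefix even if the hour rolls over mid-command:

PREFIX="s3://ai-logs/inference/$(date -u +%Y/%m/%d/%H)"
KEY=$(aws s3 ls "$PREFIX/" | tail -1 | awk '{print $4}')
aws s3 cp "$PREFIX/$KEY" - | gunzip | jq '.type' | head -1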

Expected output: "completion".
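The same check can be done in Python, for example on a machine without jq. This is a sketch: `read_jsonl_gz` and `summarize` are hypothetical helper names. It assumes the object was first downloaded to a local file, for example with aws s3 cp. The self-check below builds a small file shaped like the logger's output instead of reading from S3.

```python
import collections
import gzip
import json
import os
import tempfile

def read_jsonl_gz(path):
    # Decompress a gzipped JSONL file and parse one JSON object per non-empty line.
    with gzip.open(path, "rt", encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]

def summarize(records):
    # Count records per "type" field, e.g. {"completion": 42}.
    return dict(collections.Counter(r.get("type") for r in records))

# Self-check with a locally generated file shaped like the logger's output.
sample = [{"id": "a", "type": "completion"}, {"id": "b", "type": "completion"}]
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "batch.jsonl.gz")
    with open(path, "wb") as f:
        f.write(gzip.compress("\n".join(json.dumps(r) for r in sample).encode()))
    records = read_jsonl_gz(path)

print(summarize(records))  # prints {'completion': 2}
```

Against a real object, replace the self-check with `print(summarize(read_jsonl_gz("batch.jsonl.gz")))`.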

Common failures

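  • Queue full: records arrive faster than they are flushed. put_nowait then raises asyncio.QueueFull; if log() uses await queue.put instead, it waits and delays the request. The usual cause is a long flush interval or slow S3 uploads. Reduce flush_interval to 15 seconds or increase max_buffer to 10000.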
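  • S3 credentials missing: set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY in the environment, or attach an IAM role (instance profile) when running on EC2. For MinIO, also pass your server's URL as endpoint_url, i.e. session.client("s3", endpoint_url=...).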
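  • Memory growth or silently stopped logging: if S3 writes keep failing and the exception is not caught, the flush task exits and nothing drains the queue anymore. To handle this:
    - keep maxsize on the asyncio.Queue, so memory stays capped at max_buffer records;
    - catch upload errors inside the flush loop;
    - handle QueueFull by dropping the oldest entries.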
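When writes fail only intermittently, retrying a batch a few times before dropping it avoids losing data to a single bad request. Below is a minimal sketch of exponential backoff with full jitter. It assumes a hypothetical `upload_with_retry` helper that wraps any upload coroutine; inside `_flush_loop` it would wrap the `put_object` call. The attempt count and delays are illustrative, not tuned. botocore also retries some errors on its own, configurable through `botocore.config.Config(retries=...)`, so check whether that already covers your case.

```python
import asyncio
import random

async def upload_with_retry(upload, *args, attempts=4, base_delay=0.5, max_delay=8.0):
    # Retry an async upload with exponential backoff plus full jitter.
    # Re-raises the last error once all attempts are used up.
    # CancelledError is not an Exception subclass (Python 3.8+), so
    # cancelling the flush task still interrupts the retries.
    for attempt in range(1, attempts + 1):
        try:
            return await upload(*args)
        except Exception:
            if attempt == attempts:
                raise
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            # Random delay in [0, delay] keeps many workers from retrying in lockstep.
            await asyncio.sleep(random.uniform(0, delay))

async def demo():
    calls = []

    async def flaky_upload(key, body):  # hypothetical: fails twice, then succeeds
        calls.append(key)
        if len(calls) < 3:
            raise ConnectionError("simulated S3 timeout")
        return "ok"

    result = await upload_with_retry(flaky_upload, "inference/x.jsonl.gz", b"", base_delay=0.01)
    return result, len(calls)

async def always_fails(key, body):  # hypothetical: a persistent outage
    raise ConnectionError("simulated outage")

result, n_calls = asyncio.run(demo())

try:
    asyncio.run(upload_with_retry(always_fails, "k", b"", attempts=2, base_delay=0.01))
    exhausted = False
except ConnectionError:
    exhausted = True
```

Retries run in the background task, so they delay only the flush, never a request. While they run, the queue keeps filling, which is where the bounded queue and the drop-oldest policy take over.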
