14. Cache Invalidation
Cache invalidation in semantic caches presents unique challenges because exact matches don't exist—only semantic proximity. A cached answer about "databaseconnection timeouts" might need invalidation when infrastructure changes make it obsolete.
Time-based expiration is the simplest strategy:
def invalidate_by_ttl(self, key: str):
"""Explicit TTL-based invalidation"""
self.redis.expire(key, 0) # Immediate expiration
return True
def invalidate_by_source(self, source_document_id: str):
"""Invalidate all cache entries derived from a specific document"""
affected_keys = self.redis.smembers(f"doc_sources:{source_document_id}")
pipe = self.redis.pipeline()
for key in affected_keys:
pipe.delete(key)
pipe.delete(f"doc_sources:{source_document_id}")
pipe.execute()
Tag-based invalidation links cache entries to source documents:
def tag_cache_entry(self, cache_key: str, source_doc_ids: list[str]):
for doc_id in source_doc_ids:
self.redis.sadd(f"doc_sources:{doc_id}", cache_key)
self.redis.sadd(f"cache:{cache_key}:tags", *source_doc_ids)
# On document update
def on_document_updated(self, doc_id: str, new_content: str):
# Invalidate related cache entries
related_keys = self.redis.smembers(f"doc_sources:{doc_id}")
if not related_keys:
return {"invalidated": 0}
pipe = self.redis.pipeline()
for key in related_keys:
pipe.delete(key)
pipe.delete(f"doc_sources:{doc_id}")
pipe.execute()
return {"invalidated": len(related_keys)}
Failure Modes:
- Cascading invalidation storms: Document updates touching thousands of derived cache entries cause sudden cache miss spikes. Solution: batch invalidation with jitter to spread load.
- Stale cross-references: Cache entries tagged with multiple documents become invalid when only one source updates. The entry must be fully purged.
- Orphaned keys: Tags deleted before cache keys, or vice versa. Transaction-based operations prevent partial cleanup.
Monitoring invalidation rates helps detect problems:
def get_invalidation_stats(self) -> dict:
info = self.redis.info("stats")
keyspace_hits = info.get("keyspace_hits", 0)
keyspace_misses = info.get("keyspace_misses", 0)
return {
"hit_rate": keyspace_hits / (keyspace_hits + keyspace_misses),
"total_invalidations": self.redis.get("metrics:invalidations") or 0,
"avg_ttl_remaining": self._avg_ttl()
}
Local verification checkpoint
Run the smallest example from this chapter in a local workspace and record the package version, runtime, data path, and observed output. If the result depends on model size, vector count, CPU/GPU backend, or available memory, note that constraint beside the exercise so the lesson remains reproducible.
Extend the cache store to support tag-based invalidation. Simulate a document update and verify that all related cache entries are removed.