Caching is one of the most common fixes for a performance bottleneck, but its Achilles’ heel is stale data.
Let’s look at how a typical e-commerce scenario might handle product updates. Imagine a ProductService that manages product data and a ProductCache that stores frequently accessed product details for faster retrieval.
```python
# product_service.py
class ProductService:
    def get_product(self, product_id):
        # Imagine this hits a database
        print(f"Fetching product {product_id} from DB...")
        return {"id": product_id, "name": "Gadget X", "price": 99.99}
```
```python
# product_cache.py
class ProductCache:
    def __init__(self):
        self._cache = {}

    def get(self, product_id):
        print(f"Checking cache for product {product_id}...")
        return self._cache.get(product_id)

    def set(self, product_id, data):
        print(f"Setting cache for product {product_id}...")
        self._cache[product_id] = data

    def invalidate(self, product_id):
        print(f"Invalidating cache for product {product_id}...")
        # pop() with a default is a no-op when the key is missing
        self._cache.pop(product_id, None)
```
```python
# main_app.py
from product_service import ProductService
from product_cache import ProductCache

product_service = ProductService()
product_cache = ProductCache()

# Initial fetch and cache
product_id = "abc-123"
product_data = product_service.get_product(product_id)
product_cache.set(product_id, product_data)

# Subsequent fetches hit the cache
print("\n--- First few fetches ---")
for _ in range(3):
    cached_data = product_cache.get(product_id)
    if cached_data is None:
        cached_data = product_service.get_product(product_id)
        product_cache.set(product_id, cached_data)
    print(f"Retrieved: {cached_data}")

# Now, imagine a product update happens
print("\n--- Product Update ---")
# This update would typically come from an admin interface or another service
updated_product_data = {"id": product_id, "name": "Gadget X Pro", "price": 109.99}
# The problem: how does the cache know to update?
# Without a mechanism, subsequent fetches will still return stale data.
# product_cache.set(product_id, updated_product_data)  # This would be manual, not event-driven

print("\n--- Fetch after update (without invalidation) ---")
# This will still hit the old data in the cache
cached_data = product_cache.get(product_id)
if cached_data is None:
    cached_data = product_service.get_product(product_id)
    product_cache.set(product_id, cached_data)
print(f"Retrieved: {cached_data}")
```
The core problem is that the ProductCache is unaware of changes happening in the ProductService. The simplest, but least efficient, solution is polling: the cache periodically asks the service whether data has changed. This is wasteful, since most checks find nothing new, and it introduces latency, since an update stays invisible until the next poll.
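To make the cost of polling concrete, here is a minimal sketch. It assumes a hypothetical `get_version` method on the service that returns a counter bumped on every update; neither that method nor the `VersionedProductService` and `PollingProductCache` classes come from the example above. In a real deployment something like a timer thread would call `poll()`; here it is called by hand.

```python
class VersionedProductService:
    """Hypothetical service that bumps a version number on every update."""

    def __init__(self):
        self._db = {"abc-123": {"id": "abc-123", "name": "Gadget X", "price": 99.99}}
        self._versions = {"abc-123": 1}
        self.version_checks = 0  # how many times anyone asked "has it changed?"

    def get_product(self, product_id):
        return dict(self._db[product_id])

    def get_version(self, product_id):
        self.version_checks += 1
        return self._versions[product_id]

    def update_product(self, product_id, new_data):
        self._db[product_id] = {"id": product_id, **new_data}
        self._versions[product_id] += 1


class PollingProductCache:
    """Hypothetical cache that only learns about changes by asking the service."""

    def __init__(self, service):
        self._service = service
        self._entries = {}  # product_id -> (data, version seen at load time)

    def get(self, product_id):
        entry = self._entries.get(product_id)
        if entry is not None:
            return entry[0]  # may be stale until the next poll()
        version = self._service.get_version(product_id)
        data = self._service.get_product(product_id)
        self._entries[product_id] = (data, version)
        return data

    def poll(self):
        """Meant to run on a timer: one version check per cached entry."""
        for product_id, (_, version) in list(self._entries.items()):
            if self._service.get_version(product_id) != version:
                del self._entries[product_id]


service = VersionedProductService()
cache = PollingProductCache(service)
cache.get("abc-123")  # miss: loads version 1
service.update_product("abc-123", {"name": "Gadget X Pro", "price": 109.99})
stale = cache.get("abc-123")["name"]  # no poll has run yet, so this is stale
cache.poll()  # version changed, so the entry is dropped
fresh = cache.get("abc-123")["name"]  # miss: reloads version 2
cache.poll()  # nothing changed, but each poll still costs a check
cache.poll()
print(stale, fresh, service.version_checks)  # Gadget X Gadget X Pro 5
```

The stale read between the update and the first poll is the latency; the two trailing polls are the waste, each costing a round-trip even though nothing changed.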
A better approach is event-driven invalidation. When the ProductService updates a product, it publishes an event. Another component, often a message queue or an event bus, picks up this event and signals the ProductCache to remove the stale entry.
Here’s how that might look with a simplified eventing mechanism:
```python
# event_bus.py
class EventBus:
    def __init__(self):
        self._listeners = {}

    def subscribe(self, event_type, callback):
        if event_type not in self._listeners:
            self._listeners[event_type] = []
        self._listeners[event_type].append(callback)
        print(f"Subscribed to {event_type}")

    def publish(self, event_type, data):
        print(f"Publishing event: {event_type} with data: {data}")
        # Callbacks run synchronously, in subscription order
        for callback in self._listeners.get(event_type, []):
            callback(data)
```
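```python
# product_service.py (modified)
class ProductService:
    def __init__(self, event_bus):
        self.event_bus = event_bus
        # In-memory stand-in for the DB so updates are visible to get_product()
        self._db = {"abc-123": {"id": "abc-123", "name": "Gadget X", "price": 99.99}}

    def get_product(self, product_id):
        # Imagine this hits a database
        print(f"Fetching product {product_id} from DB...")
        return dict(self._db[product_id])

    def update_product(self, product_id, new_data):
        # Imagine this updates the database
        print(f"Updating product {product_id} in DB...")
        self._db[product_id] = {"id": product_id, **new_data}
        # Publish only after the DB update has succeeded
        self.event_bus.publish("product_updated", {"id": product_id, **new_data})
        return True
```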
```python
# product_cache.py (modified)
class ProductCache:
    def __init__(self, event_bus):
        self._cache = {}
        self.event_bus = event_bus
        # Invalidate entries whenever the service reports an update
        self.event_bus.subscribe("product_updated", self.handle_product_update)

    def handle_product_update(self, data):
        product_id = data.get("id")
        if product_id:
            self.invalidate(product_id)

    # ... (get, set, invalidate methods from before)
```
```python
# main_app.py (modified)
from product_service import ProductService
from product_cache import ProductCache
from event_bus import EventBus

event_bus = EventBus()
product_service = ProductService(event_bus)
product_cache = ProductCache(event_bus)  # Cache now knows about the event bus

# Initial setup and fetch
product_id = "abc-123"
product_data = product_service.get_product(product_id)
product_cache.set(product_id, product_data)

print("\n--- Fetching from cache ---")
for _ in range(2):
    cached_data = product_cache.get(product_id)
    if cached_data is None:
        cached_data = product_service.get_product(product_id)
        product_cache.set(product_id, cached_data)
    print(f"Retrieved: {cached_data}")

# Product update now triggers an event
print("\n--- Product Update with Event ---")
updated_product_data = {"name": "Gadget X Pro", "price": 109.99}
product_service.update_product(product_id, updated_product_data)

print("\n--- Fetch after update (with event invalidation) ---")
# The event handler has already removed this product from the cache
cached_data = product_cache.get(product_id)
if cached_data is None:
    # This time, it will fetch from the DB and re-cache
    cached_data = product_service.get_product(product_id)
    product_cache.set(product_id, cached_data)
print(f"Retrieved: {cached_data}")
```
The ProductService now takes an EventBus instance. When update_product is called, it first writes to the database and then publishes a product_updated event carrying the product’s ID (along with the new field values). The ProductCache subscribes to this event type and, upon receiving it, calls its invalidate method for that product. The next request for that product therefore misses the cache and fetches fresh data from the source.
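The fetch-on-miss logic in main_app.py is repeated for every read. One common way to factor it out is a cache-aside helper. The sketch below uses a hypothetical `get_or_load` function and a stripped-down `DictCache` (both illustrative, not part of the code above) to show that once an entry is invalidated, the very next read goes to the loader.

```python
class DictCache:
    """Minimal stand-in for ProductCache: get/set/invalidate over a dict."""

    def __init__(self):
        self._cache = {}

    def get(self, key):
        return self._cache.get(key)

    def set(self, key, value):
        self._cache[key] = value

    def invalidate(self, key):
        self._cache.pop(key, None)


def get_or_load(cache, key, load):
    """Cache-aside read: serve from the cache, else load from the source and fill the cache."""
    value = cache.get(key)
    if value is None:
        value = load(key)
        cache.set(key, value)
    return value


# A fake "database" that records how often it is queried
db = {"abc-123": {"id": "abc-123", "name": "Gadget X", "price": 99.99}}
db_reads = []


def load_product(product_id):
    db_reads.append(product_id)
    return dict(db[product_id])


cache = DictCache()
get_or_load(cache, "abc-123", load_product)  # miss: 1 DB read
get_or_load(cache, "abc-123", load_product)  # hit: still 1 DB read

db["abc-123"] = {"id": "abc-123", "name": "Gadget X Pro", "price": 109.99}
cache.invalidate("abc-123")  # what handle_product_update does on an event
product = get_or_load(cache, "abc-123", load_product)  # miss again: 2 DB reads
print(product["name"], len(db_reads))  # Gadget X Pro 2
```

Using `None` as the miss signal assumes no product is ever legitimately cached as `None`.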
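This pattern decouples the data source from the cache. The cache doesn’t need to know how the data changed, only that it changed. This is crucial for scalability, as the event bus can fan the same event out to many cache instances or to other services that depend on product updates. The in-process EventBus shown here only reaches subscribers in the same process; fanning out across servers is the job of a message broker.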
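To illustrate the fan-out point, the sketch below reuses the shape of the `EventBus` above and subscribes three independent cache instances, standing in for, say, three application servers. This is an in-process simulation only: the `NodeCache` class and node names are hypothetical, and across real machines a broker would handle delivery.

```python
class EventBus:
    """Same shape as the article's EventBus, without the logging."""

    def __init__(self):
        self._listeners = {}

    def subscribe(self, event_type, callback):
        self._listeners.setdefault(event_type, []).append(callback)

    def publish(self, event_type, data):
        for callback in self._listeners.get(event_type, []):
            callback(data)


class NodeCache:
    """Hypothetical per-node cache that invalidates itself on product_updated."""

    def __init__(self, name, event_bus):
        self.name = name
        self.entries = {}
        event_bus.subscribe("product_updated", self.on_product_updated)

    def on_product_updated(self, data):
        self.entries.pop(data["id"], None)


bus = EventBus()
nodes = [NodeCache(f"node-{i}", bus) for i in range(3)]
for node in nodes:
    node.entries["abc-123"] = {"name": "Gadget X"}
    node.entries["xyz-789"] = {"name": "Widget Y"}

# One publish reaches every subscriber; the publisher never sees the list of nodes
bus.publish("product_updated", {"id": "abc-123", "name": "Gadget X Pro"})
print([sorted(node.entries) for node in nodes])
# [['xyz-789'], ['xyz-789'], ['xyz-789']]
```

Only the updated product is evicted on each node; unrelated entries stay warm.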
The real magic is in how the cache reacts to external signals rather than initiating checks. This shifts the burden of consistency from the consumer (the cache) to the producer (the service). The event bus acts as the impartial messenger, ensuring that all interested parties are notified of state changes without any single party needing to know about all the others.
The next logical step is handling cache warming after invalidation, or understanding how to manage cache coherence across distributed systems using more robust messaging systems such as Kafka or RabbitMQ.
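As a starting point for the cache-warming question, one option is to have the event handler refresh the entry instead of deleting it, so the next reader never sees a miss. The sketch below is self-contained; `WarmingProductCache` and its `load` callback are hypothetical names. The trade-off is an extra source read on every update to a cached product, whether or not anyone reads it again.

```python
class EventBus:
    """Minimal synchronous event bus, same shape as the one above."""

    def __init__(self):
        self._listeners = {}

    def subscribe(self, event_type, callback):
        self._listeners.setdefault(event_type, []).append(callback)

    def publish(self, event_type, data):
        for callback in self._listeners.get(event_type, []):
            callback(data)


class WarmingProductCache:
    """Hypothetical cache that re-loads an entry when it is updated."""

    def __init__(self, event_bus, load):
        self._cache = {}
        self._load = load  # callable: product_id -> fresh product data
        event_bus.subscribe("product_updated", self.handle_product_update)

    def get(self, product_id):
        return self._cache.get(product_id)

    def set(self, product_id, data):
        self._cache[product_id] = data

    def handle_product_update(self, data):
        product_id = data.get("id")
        # Only warm entries that were already cached, so rarely read products stay out
        if product_id in self._cache:
            self._cache[product_id] = self._load(product_id)


db = {"abc-123": {"id": "abc-123", "name": "Gadget X", "price": 99.99}}
bus = EventBus()
cache = WarmingProductCache(bus, load=lambda pid: dict(db[pid]))
cache.set("abc-123", dict(db["abc-123"]))

# Simulate the service: write to the "DB" first, then publish
db["abc-123"] = {"id": "abc-123", "name": "Gadget X Pro", "price": 109.99}
bus.publish("product_updated", {"id": "abc-123"})
print(cache.get("abc-123")["name"])  # Gadget X Pro, served without a miss
```

Re-reading from the source, rather than trusting the event payload, keeps the cache aligned with whatever the database actually committed.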