CQRS command and query pipelines are not merely about separating reads from writes; they’re about fundamentally changing how you reason about data flow and state changes.
Let’s watch a command pipeline in action. Imagine an e-commerce system. A customer adds an item to their cart. This isn’t just a database update; it’s a command: AddItemToCartCommand.
```json
{
  "CommandType": "AddItemToCartCommand",
  "AggregateId": "user-123",
  "Payload": {
    "ItemId": "product-abc",
    "Quantity": 1,
    "Price": 19.99
  },
  "Timestamp": "2023-10-27T10:30:00Z"
}
```
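Before a handler sees it, the envelope is usually deserialized into a typed object. The following is a minimal sketch assuming Python 3.9+. The AddItemToCartCommand and AddItemPayload dataclasses and the parse_command helper are hypothetical names. Decimal is a deliberate choice that keeps 19.99 exact instead of turning it into a binary float.

```python
import json
from dataclasses import dataclass
from datetime import datetime
from decimal import Decimal


@dataclass(frozen=True)
class AddItemPayload:
    item_id: str
    quantity: int
    price: Decimal


@dataclass(frozen=True)
class AddItemToCartCommand:
    aggregate_id: str
    payload: AddItemPayload
    timestamp: datetime


def parse_command(raw: str) -> AddItemToCartCommand:
    """Parse a JSON command envelope into a typed command (hypothetical helper)."""
    # parse_float=Decimal hands every JSON float to Decimal as a string, so no rounding occurs.
    data = json.loads(raw, parse_float=Decimal)
    if data["CommandType"] != "AddItemToCartCommand":
        raise ValueError(f"unexpected command type: {data['CommandType']}")
    p = data["Payload"]
    return AddItemToCartCommand(
        aggregate_id=data["AggregateId"],
        payload=AddItemPayload(p["ItemId"], int(p["Quantity"]), p["Price"]),
        # fromisoformat accepts a trailing "Z" only from Python 3.11, so normalize it first.
        timestamp=datetime.fromisoformat(data["Timestamp"].replace("Z", "+00:00")),
    )


raw = """{"CommandType": "AddItemToCartCommand", "AggregateId": "user-123",
          "Payload": {"ItemId": "product-abc", "Quantity": 1, "Price": 19.99},
          "Timestamp": "2023-10-27T10:30:00Z"}"""
cmd = parse_command(raw)
print(cmd.payload.price)  # 19.99
```

Freezing the dataclasses reflects the fact that a command is a request as issued. Handlers read it but never mutate it.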
This command is sent to a command handler. The handler’s job is to load the Cart aggregate (identified by user-123), apply the AddItemToCart logic to it (which might involve checking inventory, validating the item, etc.), and then persist the resulting changes. The output of this process isn’t just a success/failure; it’s a sequence of domain events.
```json
[
  {
    "EventType": "ItemAddedToCartEvent",
    "AggregateId": "user-123",
    "Payload": {
      "ItemId": "product-abc",
      "Quantity": 1,
      "Price": 19.99
    },
    "Timestamp": "2023-10-27T10:30:05Z"
  }
]
```
These events are then published. They represent facts about what happened. The command pipeline’s primary job is to ensure that these events are generated correctly and reliably based on the incoming command and the current state of the aggregate.
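The aggregate's side of this can be sketched as follows. It checks its invariants against current state, emits events that describe what happened, and changes its own state only by applying those events. The Cart class, the event dataclass, the specific invariants (positive quantity, non-negative price), and the injected is_in_stock callable standing in for an inventory check are all assumptions made for illustration.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from decimal import Decimal
from typing import Callable


@dataclass(frozen=True)
class ItemAddedToCartEvent:
    aggregate_id: str
    item_id: str
    quantity: int
    price: Decimal
    timestamp: datetime
    event_type: str = "ItemAddedToCartEvent"


@dataclass
class Cart:
    cart_id: str
    quantities: dict[str, int] = field(default_factory=dict)

    def add_item(
        self,
        item_id: str,
        quantity: int,
        price: Decimal,
        is_in_stock: Callable[[str, int], bool] = lambda item_id, qty: True,
    ) -> list[ItemAddedToCartEvent]:
        """Validate the command against current state and return the resulting events."""
        # Invariant checks: reject the command before any event is produced.
        if quantity <= 0:
            raise ValueError("quantity must be positive")
        if price < 0:
            raise ValueError("price must not be negative")
        # Hypothetical inventory check, injected so the aggregate stays easy to test.
        if not is_in_stock(item_id, self.quantities.get(item_id, 0) + quantity):
            raise ValueError(f"{item_id} is out of stock")
        event = ItemAddedToCartEvent(
            self.cart_id, item_id, quantity, price, datetime.now(timezone.utc)
        )
        self.apply(event)
        return [event]

    def apply(self, event: ItemAddedToCartEvent) -> None:
        # State changes happen only here, so replaying stored events rebuilds the cart.
        self.quantities[event.item_id] = self.quantities.get(event.item_id, 0) + event.quantity


cart = Cart("user-123")
events = cart.add_item("product-abc", 1, Decimal("19.99"))
print(events[0].event_type, cart.quantities)  # ItemAddedToCartEvent {'product-abc': 1}
```

Routing every state change through apply is what lets the same events serve two purposes: rebuilding the aggregate on the next load and feeding the read-side projectors.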
Now, the query pipeline. A user wants to view their cart. This triggers a query: GetCartQuery.
```json
{
  "QueryType": "GetCartQuery",
  "QueryParameters": {
    "UserId": "user-123"
  }
}
```
This query bypasses the command handlers and aggregates entirely. It goes directly to a read model, often a denormalized data store optimized for quick retrieval. This read model is built by subscribing to the domain events published by the command pipeline, so for user-123 it might return:
```json
{
  "CartId": "user-123",
  "Items": [
    {
      "ItemId": "product-abc",
      "Quantity": 1,
      "Price": 19.99
    }
  ],
  "TotalItems": 1,
  "TotalPrice": 19.99
}
```
The system doesn’t query the Cart aggregate directly for this view; it queries a specialized CartReadModel. This separation is key. The command pipeline focuses on consistency and state transitions (ensuring invariants are met), while the query pipeline focuses on performance and serving specific read needs.
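Here is a minimal in-memory sketch of the read side. A projector folds ItemAddedToCartEvent records into a denormalized cart view, and the GetCartQuery handler answers from that view without ever loading the aggregate. Several pieces are assumptions: the dictionary store, the event dictionaries shaped like the JSON above, the function names, and reading TotalItems as the sum of quantities. A real read model would typically live in a database.

```python
from decimal import Decimal
from typing import Optional

# Hypothetical read-model store: cart id -> denormalized view shaped like the JSON above.
cart_views: dict[str, dict] = {}


def project(event: dict) -> None:
    """Fold one domain event into the denormalized cart view."""
    if event["EventType"] != "ItemAddedToCartEvent":
        return  # this projector only models item additions
    cart_id = event["AggregateId"]
    view = cart_views.setdefault(
        cart_id, {"CartId": cart_id, "Items": [], "TotalItems": 0, "TotalPrice": Decimal("0")}
    )
    p = event["Payload"]
    for item in view["Items"]:
        if item["ItemId"] == p["ItemId"]:
            item["Quantity"] += p["Quantity"]  # same product again: merge the line
            break
    else:
        view["Items"].append({"ItemId": p["ItemId"], "Quantity": p["Quantity"], "Price": p["Price"]})
    # Totals are precomputed at projection time, so queries do no aggregation.
    view["TotalItems"] += p["Quantity"]
    view["TotalPrice"] += p["Price"] * p["Quantity"]


def handle_get_cart_query(query: dict) -> Optional[dict]:
    """Answer GetCartQuery from the read model alone; no aggregate is loaded."""
    return cart_views.get(query["QueryParameters"]["UserId"])


project({"EventType": "ItemAddedToCartEvent", "AggregateId": "user-123",
         "Payload": {"ItemId": "product-abc", "Quantity": 1, "Price": Decimal("19.99")}})
view = handle_get_cart_query({"QueryType": "GetCartQuery", "QueryParameters": {"UserId": "user-123"}})
print(view["TotalItems"], view["TotalPrice"])  # 1 19.99
```

The query handler is a single lookup. All of the work of shaping the data is paid once per event on the projection side, and never again per read.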
Observing these pipelines means instrumenting both sides. For commands, you want to log the incoming command, the aggregate it targets, the events generated, and the final state of the aggregate after applying those events. For queries, you log the incoming query, the read model it hits, and the data returned. This gives you visibility into the entire flow.
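One lightweight way to get that instrumentation is a decorator that wraps each handler and emits one structured log record per call. The sketch below uses only the standard logging module. Everything else is illustrative: the instrumented decorator, the logger name, the extra fields, and the assumption that command handlers return a list of events while query handlers return the data found (or None).

```python
import functools
import logging
import time

logger = logging.getLogger("cqrs.pipeline")


def instrumented(kind: str):
    """Wrap a command or query handler and log what went in and what came out."""
    def decorator(handler):
        @functools.wraps(handler)
        def wrapper(message: dict):
            start = time.perf_counter()
            result = handler(message)
            elapsed_ms = (time.perf_counter() - start) * 1000
            # Assumption: commands return a list of events, queries return data or None.
            outcome = {"events": len(result)} if kind == "command" else {"found": result is not None}
            logger.info(
                "%s %s handled in %.1f ms",
                kind,
                message.get("CommandType") or message.get("QueryType"),
                elapsed_ms,
                extra={"aggregate_id": message.get("AggregateId"), **outcome},
            )
            return result
        return wrapper
    return decorator


@instrumented("command")
def handle_add_item(command: dict) -> list:
    # Placeholder: a real handler would load the aggregate and apply the command.
    return [{"EventType": "ItemAddedToCartEvent", "AggregateId": command["AggregateId"]}]


@instrumented("query")
def handle_get_cart(query: dict):
    # Placeholder read-model lookup.
    return {"CartId": query["QueryParameters"]["UserId"], "Items": []}
```

With logging.basicConfig(level=logging.INFO) configured, each call produces one line such as "command AddItemToCartCommand handled in 0.0 ms". The fields passed via extra become attributes on the LogRecord, where a structured (for example, JSON) formatter can pick them up.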
A common way to distribute these events, and to log them, is a dedicated event bus or message broker. When the command handler successfully processes a command and generates events, it publishes them to the bus. Any interested subscriber, including your logging service and your read model projectors, can then consume them.
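```python
# Example using a hypothetical message bus. cart_repository, read_model_repository,
# and message_bus are assumed to be provided elsewhere.
import logging

logger = logging.getLogger(__name__)

def handle_add_item_command(command):
    # Load the aggregate, run the domain logic, and collect the resulting events.
    cart_aggregate = cart_repository.get(command.aggregate_id)
    events = cart_aggregate.add_item(
        command.payload.item_id, command.payload.quantity, command.payload.price)
    # Persist first, then publish, so subscribers only ever see stored events.
    cart_repository.save(cart_aggregate, events)
    for event in events:
        message_bus.publish(event, topic="domain_events")
    logger.info("Command %s processed. Generated events: %d", command.command_type, len(events))

def project_cart_read_model(event):
    # Projector: update the denormalized cart view for the event types it handles.
    if event.event_type == "ItemAddedToCartEvent":
        read_model_repository.update_cart(event.aggregate_id, event.payload)
        logger.info("Read model updated for cart %s due to %s", event.aggregate_id, event.event_type)

message_bus.subscribe("domain_events", project_cart_read_model)
```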
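To try the example locally, the hypothetical message_bus can be stubbed in memory with the same publish(event, topic=...) and subscribe(topic, handler) shape. InMemoryMessageBus is an assumption, not any real broker's API. Delivery is synchronous and nothing is persisted, so the stub shows fan-out to subscribers such as a logger and a projector, but none of a broker's durability guarantees.

```python
from collections import defaultdict
from typing import Any, Callable


class InMemoryMessageBus:
    """Synchronous, non-durable publish/subscribe bus for local experiments."""

    def __init__(self) -> None:
        self._subscribers: dict[str, list[Callable[[Any], None]]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Callable[[Any], None]) -> None:
        self._subscribers[topic].append(handler)

    def publish(self, event: Any, topic: str) -> None:
        # Every subscriber on the topic receives the event, in subscription order.
        for handler in self._subscribers[topic]:
            handler(event)


message_bus = InMemoryMessageBus()
audit_log = []   # stands in for a logging subscriber
projected = []   # stands in for a read-model projector
message_bus.subscribe("domain_events", audit_log.append)
message_bus.subscribe("domain_events", projected.append)
message_bus.publish({"EventType": "ItemAddedToCartEvent"}, topic="domain_events")
```

Because each subscriber is independent, adding another consumer, such as a second read model, means adding one subscribe call. The command handler does not change.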
This design accepts eventual consistency. The command pipeline persists the new events atomically, and those events are the source of truth. The read models, however, are updated asynchronously by subscribers to those events, so there’s a small window in which a query can return stale data until the projector has processed the latest event. That staleness is the trade-off for read models that can be optimized and scaled independently for high read throughput.
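The stale-read window can be reproduced by putting a queue between publication and projection. In this sketch, which uses hypothetical names throughout, publishing only enqueues the event. The read model changes when the projector drains the queue. A real system would do that on a background worker rather than by an explicit call.

```python
from collections import deque

pending: deque = deque()                # events published but not yet projected
cart_item_counts: dict[str, int] = {}   # the read model


def publish(event: dict) -> None:
    # The write side returns as soon as the event is enqueued.
    pending.append(event)


def drain_projector() -> None:
    # The projector catches up asynchronously; here it is run explicitly.
    while pending:
        event = pending.popleft()
        cart = event["AggregateId"]
        cart_item_counts[cart] = cart_item_counts.get(cart, 0) + event["Quantity"]


def query_item_count(cart_id: str) -> int:
    return cart_item_counts.get(cart_id, 0)


publish({"AggregateId": "user-123", "Quantity": 1})
print(query_item_count("user-123"))  # 0: stale, the event is still queued
drain_projector()
print(query_item_count("user-123"))  # 1: the read model has caught up
```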
What surprises many people is how resilient this event-driven approach makes the system. If the read model database goes down, new commands are still processed, because the write side does not depend on it. Once the database is back online, the read model projector can re-process the missed events from a persistent event store (or a broker that retains messages) and catch up to the current state without losing data. This works provided the projector records how far it has read (a checkpoint) and its updates are idempotent, since after a failure some events may be delivered more than once.
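Catch-up relies on two things: events kept in durable, ordered storage, and a projector that remembers its position. The following is a minimal sketch. A Python list stands in for the event store and an integer checkpoint marks the position; both are assumptions, as is the CheckpointedProjector name. An available flag simulates the read-model database going down.

```python
class CheckpointedProjector:
    """Projects events from an ordered store, resuming after the last processed position."""

    def __init__(self) -> None:
        self.checkpoint = 0                # number of store events already applied
        self.totals: dict[str, int] = {}   # the read model
        self.available = True              # simulates the read-model database being up

    def catch_up(self, store: list) -> None:
        for position in range(self.checkpoint, len(store)):
            if not self.available:
                return  # stop without advancing; the next run resumes from here
            event = store[position]
            cart = event["AggregateId"]
            self.totals[cart] = self.totals.get(cart, 0) + event["Quantity"]
            # Advance only after the update succeeds. In a real system, persist the checkpoint
            # in the same transaction as the read-model write, or make the update idempotent.
            self.checkpoint = position + 1


event_store = [{"AggregateId": "user-123", "Quantity": 1}]
projector = CheckpointedProjector()
projector.catch_up(event_store)

projector.available = False                                      # read-model database goes down
event_store.append({"AggregateId": "user-123", "Quantity": 2})   # commands keep succeeding
projector.catch_up(event_store)                                  # nothing is applied
projector.available = True                                       # database is back
projector.catch_up(event_store)
print(projector.totals, projector.checkpoint)                    # {'user-123': 3} 2
```

The count-based projection here is not idempotent on its own. The checkpoint is what prevents an event from being applied twice, which is why it has to be stored as reliably as the read model itself.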
Once you’ve got robust logging and observation for your command and query pipelines, the next natural step is to think about how to manage and monitor the health of your read models themselves, ensuring they stay synchronized and performant.