CQRS doesn’t inherently solve write conflicts; it just separates the read and write concerns, often making those conflicts more apparent.

Imagine a simple e-commerce scenario: while one user is viewing a product’s details (read side), two other users buy that same product at almost the same moment, and each purchase updates its inventory (write side). If we don’t handle this carefully, one update could silently overwrite the other, or worse, leave the product in an inconsistent state. CQRS, by design, separates these operations, which is powerful, but it means we need a robust strategy for when two commands targeting the same aggregate arrive at roughly the same time.

Let’s look at how we can manage this using optimistic concurrency.

The Scenario: Two Commands, One Aggregate

Consider a Product aggregate. It has an Id, a Name, and an InventoryCount.

public class Product : AggregateRoot
{
    public Guid Id { get; private set; }
    public string Name { get; private set; }
    public int InventoryCount { get; private set; }

    // Event handlers for state reconstruction
    private void Apply(ProductCreatedEvent ev)
    {
        Id = ev.ProductId;
        Name = ev.Name;
        InventoryCount = ev.InitialInventory;
    }

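    // Command method, invoked by the command handler. ApplyChange (from the
    // AggregateRoot base class) applies the event and records it as a new change.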
    public void UpdateInventory(int quantityChange)
    {
        if (InventoryCount + quantityChange < 0)
        {
            throw new InvalidOperationException("Inventory cannot be negative.");
        }
        ApplyChange(new InventoryUpdatedEvent(Id, InventoryCount, InventoryCount + quantityChange));
    }

    private void Apply(InventoryUpdatedEvent ev)
    {
        InventoryCount = ev.NewInventoryCount;
    }
}
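The Product class relies on an AggregateRoot base class, an ApplyChange method, and (later) GetChanges, none of which the article shows. The following is a minimal sketch of one common way to provide them, under these assumptions: DomainEvent, AggregateRoot, LoadFromHistory, the two event records, and DemoProduct are all hypothetical names, and events are routed to the private Apply overloads via reflection. Version here counts only events loaded from the store, so it is the value a handler would pass as expectedVersion.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;

// Hypothetical base type for all events (the article never defines one).
public abstract record DomainEvent;

// Hypothetical AggregateRoot: the article uses it but does not show it.
public abstract class AggregateRoot
{
    private readonly List<DomainEvent> _changes = new();

    // Number of events loaded from the store: the value a handler would pass
    // as expectedVersion. New, unsaved changes do not increase it.
    public int Version { get; private set; }

    // Rebuild state from stored events without recording them as changes.
    public void LoadFromHistory(IEnumerable<DomainEvent> history)
    {
        foreach (var ev in history)
        {
            Dispatch(ev);
            Version++;
        }
    }

    // Called from command methods: update state and keep the event for saving.
    protected void ApplyChange(DomainEvent ev)
    {
        Dispatch(ev);
        _changes.Add(ev);
    }

    // The uncommitted events the command handler should append.
    public IReadOnlyList<DomainEvent> GetChanges() => _changes.AsReadOnly();

    // Route the event to the private Apply(TEvent) overload on the concrete type.
    private void Dispatch(DomainEvent ev)
    {
        MethodInfo apply = GetType().GetMethod(
                "Apply",
                BindingFlags.Instance | BindingFlags.NonPublic,
                null,                      // default binder
                new[] { ev.GetType() },    // exact parameter type
                null)                      // no parameter modifiers
            ?? throw new InvalidOperationException(
                $"{GetType().Name} has no Apply({ev.GetType().Name}) method.");
        apply.Invoke(this, new object[] { ev });
    }
}

// Hypothetical events and a trimmed-down Product, for demonstration only.
public record ProductCreated(Guid ProductId, int InitialInventory) : DomainEvent;
public record InventoryUpdated(Guid ProductId, int OldCount, int NewCount) : DomainEvent;

public class DemoProduct : AggregateRoot
{
    public Guid Id { get; private set; }
    public int InventoryCount { get; private set; }

    public void UpdateInventory(int quantityChange)
    {
        if (InventoryCount + quantityChange < 0)
            throw new InvalidOperationException("Inventory cannot be negative.");
        ApplyChange(new InventoryUpdated(Id, InventoryCount, InventoryCount + quantityChange));
    }

    private void Apply(ProductCreated ev)
    {
        Id = ev.ProductId;
        InventoryCount = ev.InitialInventory;
    }

    private void Apply(InventoryUpdated ev) => InventoryCount = ev.NewCount;
}
```

Loading one ProductCreated event and then calling UpdateInventory(-5) leaves InventoryCount at 95, Version at 1 (the persisted version), and one pending change. Reflection keeps the Apply methods private, at the cost of a runtime lookup; an explicit switch on the event type is a common alternative.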

Now, let’s say we have two commands:

  1. Command A: UpdateInventoryCommand(productId: "some-product-id", quantityChange: -5) (User 1 buys 5 items)
  2. Command B: UpdateInventoryCommand(productId: "some-product-id", quantityChange: -3) (User 2 buys 3 items)

If both commands are handled concurrently and nothing checks versions when events are appended, both handlers load the Product aggregate in the same state (say, InventoryCount = 100) and make their decisions against that snapshot. Command A records an InventoryUpdatedEvent from 100 to 95; Command B, which also saw 100, records one from 100 to 97. Because each event carries an absolute NewInventoryCount, the final state depends on which event is appended last.

If Command B’s event is appended last, replaying the stream leaves the inventory at 97. That is wrong: the final inventory should reflect both reductions, resulting in 92 (100 - 5 - 3). The reduction from Command A has been silently lost.
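To make the lost update concrete, here is a small, self-contained simulation of the race. It is only a sketch: InventoryUpdated and LostUpdateDemo are hypothetical names, the "event store" is a plain list with no version check, and both handlers read the stream before either one writes.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical event: like InventoryUpdatedEvent, it stores absolute counts.
public record InventoryUpdated(int OldCount, int NewCount);

public static class LostUpdateDemo
{
    // Replaying the stream: the last event's NewCount is the current inventory.
    public static int Replay(int initial, IEnumerable<InventoryUpdated> stream) =>
        stream.Aggregate(initial, (_, ev) => ev.NewCount);

    public static int Run()
    {
        const int initial = 100;
        var stream = new List<InventoryUpdated>();

        // Both handlers load the aggregate before either one has written.
        int seenByA = Replay(initial, stream); // 100
        int seenByB = Replay(initial, stream); // 100

        // Each handler decides against its own (soon stale) snapshot.
        var eventA = new InventoryUpdated(seenByA, seenByA - 5); // 100 to 95
        var eventB = new InventoryUpdated(seenByB, seenByB - 3); // 100 to 97

        // No concurrency check: both appends succeed, and B's lands last.
        stream.Add(eventA);
        stream.Add(eventB);

        return Replay(initial, stream); // 97, not the correct 92
    }
}
```

LostUpdateDemo.Run() returns 97: nothing in the write path noticed that Command B decided against a state Command A had already changed.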

Optimistic Concurrency: The Versioning Strategy

Optimistic concurrency works by assigning a version number to each event stream (or aggregate instance). When you load an aggregate, you get its current state and its current version. When you save new events for that aggregate, you must specify the version you expected to be the current one.

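If the actual current version in the event store does not match the version you provided (in practice, it will be higher), someone else has written events since you loaded the aggregate. The event store then rejects the whole append, signalling a conflict. For this to be safe, the store must perform the version check and the write as one atomic operation.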

Implementation in an Event Store

Most event stores provide a mechanism for this. Let’s assume an event store with an AppendEventsAsync method that takes the aggregate’s expected version.

public interface IEventStore
{
    Task<IEnumerable<DomainEvent>> GetEventsAsync(Guid aggregateId);
    Task AppendEventsAsync(Guid aggregateId, IEnumerable<DomainEvent> events, int expectedVersion);
}
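To show what "takes the aggregate's expected version" means in practice, here is a minimal in-memory implementation of this interface. It is a sketch for illustration, not a production store: DomainEvent, StockAdjusted, ConcurrencyException, and InMemoryEventStore are hypothetical, and a single lock stands in for the atomic compare-and-append a real event store provides.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical event base type and sample event (not defined in the article).
public abstract record DomainEvent;
public record StockAdjusted(int Delta) : DomainEvent;

// The interface from the article, unchanged.
public interface IEventStore
{
    Task<IEnumerable<DomainEvent>> GetEventsAsync(Guid aggregateId);
    Task AppendEventsAsync(Guid aggregateId, IEnumerable<DomainEvent> events, int expectedVersion);
}

// Hypothetical exception type, named to match the handler's catch block.
public class ConcurrencyException : Exception
{
    public ConcurrencyException(string message) : base(message) { }
}

// In-memory store where a stream's version is the number of events it holds.
public class InMemoryEventStore : IEventStore
{
    private readonly Dictionary<Guid, List<DomainEvent>> _streams = new();
    private readonly object _gate = new();

    public Task<IEnumerable<DomainEvent>> GetEventsAsync(Guid aggregateId)
    {
        lock (_gate)
        {
            // Hand out a copy so later appends don't change what the caller loaded.
            IEnumerable<DomainEvent> snapshot = _streams.TryGetValue(aggregateId, out var stream)
                ? stream.ToList()
                : new List<DomainEvent>();
            return Task.FromResult(snapshot);
        }
    }

    public Task AppendEventsAsync(Guid aggregateId, IEnumerable<DomainEvent> events, int expectedVersion)
    {
        lock (_gate)
        {
            _streams.TryGetValue(aggregateId, out var stream);
            int currentVersion = stream?.Count ?? 0;

            // Compare and write under the same lock, so the check is atomic.
            if (currentVersion != expectedVersion)
            {
                throw new ConcurrencyException(
                    $"Stream {aggregateId} is at version {currentVersion}, expected {expectedVersion}.");
            }

            if (stream is null)
            {
                stream = new List<DomainEvent>();
                _streams[aggregateId] = stream;
            }
            stream.AddRange(events);
            return Task.CompletedTask;
        }
    }
}
```

If two writers both load a stream at version 1, the first append with expectedVersion = 1 succeeds and moves the stream to version 2; the second append, still claiming version 1, throws ConcurrencyException and writes nothing.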

In our CQRS setup, the command handler would:

  1. Load the aggregate’s events from the event store.
  2. Rehydrate the aggregate to its current state and get its current version (which is simply the number of events loaded).
  3. Apply the new business logic, generating new domain events.
  4. Attempt to append these new events to the event store, providing the current version of the aggregate as the expectedVersion.

The Command Handler Logic

public class ProductCommandHandler : ICommandHandler<UpdateInventoryCommand>
{
    private readonly IEventStore _eventStore;
    private readonly IAggregateFactory _aggregateFactory; // To create aggregate instances

    public ProductCommandHandler(IEventStore eventStore, IAggregateFactory aggregateFactory)
    {
        _eventStore = eventStore;
        _aggregateFactory = aggregateFactory;
    }

    public async Task HandleAsync(UpdateInventoryCommand command)
    {
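        // 1. Load events (materialized once; needs System.Linq) and get current version
        var events = (await _eventStore.GetEventsAsync(command.ProductId)).ToList();
        var currentVersion = events.Count; // version = number of events loaded
        if (currentVersion == 0)
            throw new InvalidOperationException($"Product {command.ProductId} does not exist.");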

        // 2. Rehydrate aggregate
        var product = _aggregateFactory.Create<Product>(events);

        // 3. Apply business logic (generates new events internally)
        product.UpdateInventory(command.QuantityChange);
        var newEvents = product.GetChanges(); // Get the newly generated events

        // 4. Attempt to append events with optimistic concurrency check
        try
        {
            await _eventStore.AppendEventsAsync(command.ProductId, newEvents, currentVersion);
        }
        catch (ConcurrencyException ex) // Assume the event store throws this
        {
            // CONFLICT DETECTED! Now what?
            throw new WriteConflictException($"Write conflict for product {command.ProductId}.", ex);
        }
    }
}

Handling the Conflict (The "What Now?" Part)

When a ConcurrencyException is caught, the command handler needs to decide how to proceed. The most common strategies are:

  • Retry: This is often the simplest approach, and it is safe because a rejected append wrote nothing. The command handler re-attempts the entire process: load the latest state (which will now include the events that caused the conflict), re-apply the business logic, including validation such as the non-negative inventory check, and try to save again. Limit the number of retries and wait between attempts using exponential backoff.

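    • Diagnosis: Look for WriteConflictException (which wraps the event store’s ConcurrencyException) in your command handler logs.
    • Fix: Wrap the whole command handling in a retry mechanism, for example using Polly in .NET.
      // Using Polly (v7-style Policy API) for retries; requires `using Polly;`
      var retryPolicy = Policy
          .Handle<WriteConflictException>()
          .WaitAndRetryAsync(
              retryCount: 3,
              // Waits 200 ms, 400 ms, 800 ms: write conflicts usually clear quickly
              sleepDurationProvider: retryAttempt => TimeSpan.FromMilliseconds(100 * Math.Pow(2, retryAttempt)),
              onRetry: (exception, timeSpan, retryCount, context) =>
              {
                  // Structured log of the retry attempt
                  _logger.LogWarning(exception,
                      "Retry {RetryCount} after write conflict. Waiting {DelayMs} ms.",
                      retryCount, timeSpan.TotalMilliseconds);
              });

      await retryPolicy.ExecuteAsync(async () =>
      {
          // Re-run the entire load, decide, append sequence on every attempt;
          // retrying only the append would reuse the stale expectedVersion.
          // ... original command handler logic ...
      });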
      
    • Why it works: The retry reloads the aggregate, incorporating the changes from the conflicting write. The business logic is then applied to the new state, and the append is attempted again with the new expected version.

  • Merge (If Possible): If the operations are commutative or can be intelligently merged, you might try to merge the changes. In our inventory example, if User 1 wanted to add 5 items and User 2 wanted to add 3 items, their order does not matter: the losing "add 5" can be applied on top of the winning "add 3", which has the same effect as a single "add 8 items" operation. Merging in general is complex and highly domain-specific.

    • Diagnosis: ConcurrencyException where the conflicting events represent operations that could be merged (e.g., multiple independent additions/removals that don’t invalidate each other).
    • Fix: In the catch block, load the latest aggregate state and inspect the events appended after the version you expected. If they represent operations that combine cleanly with yours (e.g., two ItemAddedToCart events for the same item by different users), re-apply the original command’s logic to the latest state, or create a new command or event that represents the merged operation, and append it at the new version. If any of them invalidates your command, fall back to rejecting it.
    • Why it works: It attempts to resolve the conflict by creating a single, consolidated operation that reflects the intent of both conflicting operations.
  • Reject and Notify User: For operations where merging is impossible or undesirable (e.g., updating a product name, or a critical inventory deduction where the user must be informed), the best approach is to inform the user that their action could not be completed due to a conflict and ask them to try again, or to review the current state.

    • Diagnosis: ConcurrencyException for operations that are not easily mergeable or where state consistency is paramount and requires user intervention.
    • Fix: In the catch block, instead of retrying, throw a specific UserFacingConflictException or return a command result indicating failure and providing a message like "The product inventory was updated by another user. Please review the current stock and try again."
    • Why it works: It prevents the system from making potentially incorrect automatic changes and ensures the user is aware of the conflict and can make an informed decision.
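The retry and reject strategies combine naturally: retry a bounded number of times, then report the conflict to the user. The sketch below is deliberately simplified and hypothetical: VersionedStream stores integer deltas instead of the article's InventoryUpdatedEvent, CommandResult and RetryingHandler are illustrative names, the beforeAppend hook exists only to simulate a concurrent writer, and there is no backoff between attempts.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical result type for the "reject and notify" path.
public record CommandResult(bool Succeeded, string Message);

// Hypothetical, minimal versioned stream of inventory deltas.
public class VersionedStream
{
    private readonly List<int> _deltas = new();
    public int Version => _deltas.Count;
    public int Inventory(int initial) => initial + _deltas.Sum();

    // Conditional append: succeeds only if nobody wrote since expectedVersion.
    public bool TryAppend(int delta, int expectedVersion)
    {
        if (Version != expectedVersion) return false; // conflict
        _deltas.Add(delta);
        return true;
    }
}

public static class RetryingHandler
{
    public static CommandResult Handle(
        VersionedStream stream, int initial, int quantityChange, int maxAttempts,
        Action? beforeAppend = null) // test hook that simulates a concurrent writer
    {
        for (int attempt = 1; attempt <= maxAttempts; attempt++)
        {
            // 1. Load the latest state and its version.
            int version = stream.Version;
            int current = stream.Inventory(initial);

            // 2. Re-run the business rule against the fresh state.
            if (current + quantityChange < 0)
                return new CommandResult(false, "Insufficient stock.");

            beforeAppend?.Invoke(); // another writer may sneak in here

            // 3. Append with the version we loaded; on conflict, loop and reload.
            if (stream.TryAppend(quantityChange, version))
                return new CommandResult(true, $"Inventory is now {current + quantityChange}.");
        }

        // Retries exhausted: reject and let the user decide.
        return new CommandResult(false,
            "The product inventory was updated by another user. Please review the current stock and try again.");
    }
}
```

If User 2's purchase of 3 slips in between User 1's load and append, the first attempt fails, the second reloads 97 and appends 92. Because each attempt re-applies the delta to the fresh count, commutative changes like these are effectively merged; an operation such as renaming a product would instead need the reject path.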

The Version Number in the Event Store

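When you use an event store like EventStoreDB, the version is the stream’s current revision (the number of its latest event). When you call AppendToStreamAsync(streamId, expectedVersion, events), EventStoreDB checks whether the stream’s current revision matches expectedVersion. If not, it throws a WrongExpectedVersionException (its form of concurrency error). Note that EventStoreDB numbers events from zero, so a stream holding 10 events is at revision 9; an adapter behind the count-based IEventStore interface above would have to translate between the two conventions.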

For example, using the count-based convention of IEventStore, if a Product aggregate has 10 events, its version is 10. If you load it, rehydrate, and generate 2 new events, you would attempt to append them with expectedVersion = 10. If another command appended 3 events in the meantime, the stream version would now be 13, so your append with expectedVersion = 10 would fail. A retry would reload all 13 events and append at expectedVersion = 13.

The Next Hurdle: Eventual Consistency

After successfully handling a write conflict and appending events, your write side has moved forward. However, your read side, which might be populated from a separate database or cache, is likely still reflecting the old state. This is the essence of eventual consistency. Your next challenge will be ensuring your read models are updated promptly and correctly, and that users querying the read side see a consistent (though potentially slightly stale) view of the data.
