This article requires a basic understanding of Domain Driven Design.
When an Application Service receives control, it loads an Aggregate and retrieves any supporting Domain Services needed by the Aggregate’s business operation. When the Application Service delegates to the Aggregate business operation, the Aggregate’s method produces Events as the outcome. Those Events mutate the state of the Aggregate and are also published as notifications to all subscribers. The Aggregate’s business method may require passing one or more Domain Services as parameters. The use of any such Domain Services could compute values used to cause side effects to the Aggregate’s state. Some such Domain Service operations could include calling a payment gateway, requesting a unique identity, or querying data from a remote system.
The following Application Service implemented in C# shows how the steps might be supported.
public class CustomerApplicationService
{
// event store for accessing event streams
IEventStore _eventStore;
// domain service that is needed by the aggregate
IPricingService _pricingService;
// pass dependencies for this application service via constructor
public CustomerApplicationService(
IEventStore eventStore,
IPricingService pricing)
{
_eventStore = eventStore;
_pricingService = pricing;
}
// Step 1: LockForAccountOverdraft method of
// Customer Application Service is called
public void LockForAccountOverdraft(
CustomerId customerId, string comment)
{
// Step 2.1: Load event stream for Customer, given its id
var stream = _eventStore.LoadEventStream(customerId);
// Step 2.2: Build aggregate from event stream
var customer = new Customer(stream.Events);
// Step 3: call aggregate method, passing it arguments and
// pricing domain service
customer.LockForAccountOverdraft(comment, _pricingService);
// Step 4: commit changes to the event stream by id
_eventStore.AppendToStream(
customerId, stream.Version, customer.Changes);
}
public void LockCustomer(CustomerId customerId, string reason)
{
var stream = _eventStore.LoadEventStream(customerId);
var customer = new Customer(stream.Events);
customer.LockCustomer(reason);
_eventStore.AppendToStream(
customerId, stream.Version, customer.Changes);
}
// other methods on this application service
}
The CustomerApplicationService is initialized with two dependencies through its constructor: IEventStore and IPricingService. Constructor-based initialization is a sound way to supply the dependencies, but they could also have been retrieved through a Service Locator or wired up by a dependency injection container. Your team standards and practices reign.
Our IEventStore can have a simple interface definition, and our EventStream follows suit:
public interface IEventStore
{
EventStream LoadEventStream(IIdentity id);
EventStream LoadEventStream(
IIdentity id, int skipEvents, int maxCount);
void AppendToStream(
IIdentity id, int expectedVersion, ICollection<IEvent> events);
}
public class EventStream
{
// version of the event stream returned
public int Version;
// all events in the stream
public List<IEvent> Events;
}
This Event Store can be implemented quite easily with a relational database (e.g. Microsoft SQL Server, Oracle, or MySQL) or with a NoSQL store that has strong consistency guarantees (e.g. file system, MongoDB, RavenDB, or Azure Blob Storage).
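Before committing to a real database, it can help to see the contract in its smallest form. The following is only a sketch of an in-memory store, assuming that the stream Version is simply the number of Events appended so far and that skipEvents is never negative. InMemoryEventStore, OptimisticConcurrencyException, SampleId and SampleEvent are hypothetical names introduced for illustration; they are not part of the original code.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public interface IEvent { }
public interface IIdentity { }

public class EventStream
{
    public int Version;
    public List<IEvent> Events;
}

public interface IEventStore
{
    EventStream LoadEventStream(IIdentity id);
    EventStream LoadEventStream(IIdentity id, int skipEvents, int maxCount);
    void AppendToStream(IIdentity id, int expectedVersion, ICollection<IEvent> events);
}

// Hypothetical exception type signalling a stale expectedVersion.
public class OptimisticConcurrencyException : Exception
{
    public OptimisticConcurrencyException(int expected, int actual)
        : base("Expected version " + expected + " but stream is at " + actual) { }
}

// Illustration-only types so the sketch can be exercised.
public class SampleId : IIdentity { }
public class SampleEvent : IEvent { }

// Assumption: keys are compared with the identity's Equals/GetHashCode,
// so a real identity type should implement value equality.
public class InMemoryEventStore : IEventStore
{
    readonly object _sync = new object();
    readonly Dictionary<IIdentity, List<IEvent>> _streams =
        new Dictionary<IIdentity, List<IEvent>>();

    public EventStream LoadEventStream(IIdentity id)
    {
        return LoadEventStream(id, 0, int.MaxValue);
    }

    public EventStream LoadEventStream(IIdentity id, int skipEvents, int maxCount)
    {
        lock (_sync)
        {
            List<IEvent> all;
            if (!_streams.TryGetValue(id, out all)) all = new List<IEvent>();
            var slice = all.Skip(skipEvents).Take(maxCount).ToList();
            // Version is the position of the last Event returned.
            var start = Math.Min(skipEvents, all.Count);
            return new EventStream { Version = start + slice.Count, Events = slice };
        }
    }

    public void AppendToStream(IIdentity id, int expectedVersion, ICollection<IEvent> events)
    {
        lock (_sync)
        {
            List<IEvent> all;
            if (!_streams.TryGetValue(id, out all)) all = new List<IEvent>();
            // Reject the append if another writer got there first.
            if (all.Count != expectedVersion)
                throw new OptimisticConcurrencyException(expectedVersion, all.Count);
            all.AddRange(events);
            _streams[id] = all;
        }
    }
}
```

A store like this is only useful for tests and experiments, since nothing survives a process restart, but it shows where the version check belongs.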
We load Events from the Event Store using the unique identity of the Aggregate instance to be reconstituted. Let’s see how this can be done for an Aggregate named Customer. Although the unique identity could have any type, for expressiveness let’s use an IIdentity interface implemented by CustomerId.
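The article does not show the identity types themselves, so here is one possible shape, offered as a sketch. It assumes IIdentity is a plain marker interface and that a Customer is identified by a numeric value; both are assumptions, and the real interface may well carry members. Value equality is implemented so that two CustomerId instances with the same value address the same Stream.

```csharp
using System;

// Assumption: a marker interface; the original definition is not shown.
public interface IIdentity { }

public sealed class CustomerId : IIdentity, IEquatable<CustomerId>
{
    // Hypothetical numeric key for the Customer.
    public long Id { get; private set; }

    public CustomerId(long id)
    {
        Id = id;
    }

    public bool Equals(CustomerId other)
    {
        return !ReferenceEquals(other, null) && other.Id == Id;
    }

    public override bool Equals(object obj)
    {
        return Equals(obj as CustomerId);
    }

    public override int GetHashCode()
    {
        return Id.GetHashCode();
    }

    public override string ToString()
    {
        return "customer-" + Id;
    }
}
```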
We need to load the Events belonging to the specific Customer, and the Events are passed to its constructor to instantiate the Aggregate:
var eventStream = _eventStore.LoadEventStream(customerId);
var customer = new Customer(eventStream.Events);
The Aggregate’s constructor replays each Event, in order, through the Mutate() method. Here’s how it works:
public partial class Customer
{
public Customer(IEnumerable<IEvent> events)
{
// reinstate this aggregate to the latest version
foreach (var @event in events)
{
Mutate(@event);
}
}
public bool ConsumptionLocked { get; private set; }
public void Mutate(IEvent e)
{
// .NET magic to call one of 'When' handlers with
// matching signature
((dynamic) this).When((dynamic)e);
}
public void When(CustomerLocked e)
{
ConsumptionLocked = true;
}
public void When(CustomerUnlocked e)
{
ConsumptionLocked = false;
}
// etc.
}
Mutate() just locates (via .NET dynamics) the appropriate overloaded When() method by the specific Event parameter type, and then executes the method by passing in the Event. After Mutate() has completed, the Customer instance has a completely reconstituted state.
We can make a reusable query operation for reconstituting an Aggregate instance from the Event Store:
public Customer LoadCustomerById(CustomerId id)
{
var eventStream = _eventStore.LoadEventStream(id);
var customer = new Customer(eventStream.Events);
return customer;
}
After considering how an Aggregate instance can be reconstituted from a Stream of historic Events, it’s easy to imagine other uses for the historic record. We can use them to look back in time just to view what happened, and when. The view capability becomes even more powerful when considering the need to debug production deployments.
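Looking back in time can be as simple as replaying only a prefix of the Stream. The sketch below assumes that "version N" means "after the first N Events", and it swaps the dynamic dispatch used above for explicit type checks to keep the example free of runtime binding. CustomerView and History are hypothetical names, and the Event classes are stripped of their payload.

```csharp
using System.Collections.Generic;
using System.Linq;

public interface IEvent { }
public class CustomerLocked : IEvent { }    // payload omitted in this sketch
public class CustomerUnlocked : IEvent { }  // payload omitted in this sketch

// Read-only projection of Customer state, rebuilt from Events.
public class CustomerView
{
    public bool ConsumptionLocked { get; private set; }

    public CustomerView(IEnumerable<IEvent> events)
    {
        foreach (var e in events) Mutate(e);
    }

    void Mutate(IEvent e)
    {
        // Explicit type checks stand in for the dynamic When() dispatch.
        if (e is CustomerLocked) ConsumptionLocked = true;
        else if (e is CustomerUnlocked) ConsumptionLocked = false;
    }
}

public static class History
{
    // State as it was after the first `version` Events of the Stream.
    public static CustomerView AsOfVersion(IEnumerable<IEvent> events, int version)
    {
        return new CustomerView(events.Take(version));
    }
}
```

Calling History.AsOfVersion(stream.Events, 2) would answer "was this Customer locked after the second Event?" without touching the current state.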
How are business operations performed? Once the Aggregate is reconstituted from the Event Store, the Application Service delegates to a command operation on the Aggregate instance. The Aggregate uses its current state and any Domain Services required by the contract to carry out the operation. As a behavior is executed, changes to the state are expressed as new Events. Each new Event is passed to the Aggregate’s Apply() method.
As seen in the following code, new Events are accumulated in the Changes collection and then used to mutate the current state of the Aggregate:
public partial class Customer
{
...
void Apply(IEvent @event)
{
// append event to change list for further persistence
Changes.Add(@event);
// pass each event to modify current in-memory state
Mutate(@event);
}
...
}
All Events added to the Changes collection will be persisted as newly appended. Since each Event is also used to immediately mutate the Aggregate’s state, if a behavior has multiple steps, each subsequent step has up-to-date state to operate on.
Next take a look at some of the business behavior of the Customer Aggregate:
public partial class Customer
{
// Second part of aggregate class
public List<IEvent> Changes = new List<IEvent>();
public void LockForAccountOverdraft(
string comment, IPricingService pricing)
{
if (!ManualBilling)
{
var threshold = pricing.GetOverdraftThreshold(Currency);
if (Balance < threshold)
{
LockCustomer("Overdraft. " + comment);
}
}
}
public void LockCustomer(string reason)
{
if (!ConsumptionLocked)
{
// _state is not declared in this listing
Apply(new CustomerLocked(_state.Id, reason));
}
}
// Other business methods are not shown...
void Apply(IEvent e)
{
Changes.Add(e);
Mutate(e);
}
}
To make your code clearer, you can split the implementation into two distinct classes, one for state and one for behavior, with the state object held by the behavior object. The two objects would collaborate exclusively through the Apply() method, which ensures that state is mutated only by means of Events.
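One way the split might look is sketched here. CustomerState is a hypothetical class name, the CustomerLocked constructor is simplified to take only a reason, and explicit type checks replace the dynamic When() dispatch; treat all three as assumptions made to keep the example short.

```csharp
using System.Collections.Generic;

public interface IEvent { }

public class CustomerLocked : IEvent
{
    public string Reason { get; private set; }
    public CustomerLocked(string reason) { Reason = reason; }
}

public class CustomerUnlocked : IEvent { }

// State: changes only in response to Events.
public class CustomerState
{
    public bool ConsumptionLocked { get; private set; }

    public CustomerState(IEnumerable<IEvent> events)
    {
        foreach (var e in events) Mutate(e);
    }

    public void Mutate(IEvent e)
    {
        if (e is CustomerLocked) ConsumptionLocked = true;
        else if (e is CustomerUnlocked) ConsumptionLocked = false;
    }
}

// Behavior: reads state to make decisions, never writes it directly.
public class Customer
{
    readonly CustomerState _state;
    public readonly List<IEvent> Changes = new List<IEvent>();

    public Customer(IEnumerable<IEvent> events)
    {
        _state = new CustomerState(events);
    }

    public void LockCustomer(string reason)
    {
        if (!_state.ConsumptionLocked)
        {
            Apply(new CustomerLocked(reason));
        }
    }

    void Apply(IEvent e)
    {
        Changes.Add(e);    // remember for persistence
        _state.Mutate(e);  // the only path by which state changes
    }
}
```

Because ConsumptionLocked has a private setter, the behavior class has no way to change state except by producing an Event.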
Once the mutating behaviors have completed, we must commit the Changes collection to the Event Store. We append all new changes, ensuring that there are no concurrency conflicts with other writing threads. This check is possible because we carry the Stream version from LoadEventStream() through to AppendToStream().
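The article does not say what should happen when the version check fails. One common response, shown here purely as a sketch, is to repeat the whole load, execute, append cycle a limited number of times. OptimisticConcurrencyException and ConcurrencyRetry are hypothetical names, and the assumption is that the Event Store throws such an exception when the expected version is stale.

```csharp
using System;

// Hypothetical exception an Event Store might throw on a version mismatch.
public class OptimisticConcurrencyException : Exception { }

public static class ConcurrencyRetry
{
    // Runs the full load / execute / append cycle, repeating it when the
    // append reports a conflict. Returns the number of attempts used.
    public static int Execute(Action loadExecuteAppend, int maxAttempts)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                loadExecuteAppend();
                return attempt;
            }
            catch (OptimisticConcurrencyException)
            {
                // Give up once the attempt budget is spent.
                if (attempt >= maxAttempts) throw;
            }
        }
    }
}
```

The delegate must reload the Stream on every attempt, so that the Aggregate decides again against the latest state rather than re-sending stale Events.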
In the simplest implementation, there will be a background processor that catches up with newly appended Events and publishes them to a messaging infrastructure (e.g. RabbitMQ, JMS, MSMQ, or cloud queues), delivering them to all interested parties.
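A catch-up processor of this kind can be sketched with two delegates: one that reads everything appended after a given position, and one that hands an Event to the messaging infrastructure. Both delegates, the CatchUpPublisher name and the idea of a single global position are assumptions; the IEventStore shown earlier only loads one Stream at a time.

```csharp
using System;
using System.Collections.Generic;

public interface IEvent { }
public class CustomerLocked : IEvent { }  // payload omitted in this sketch

public class CatchUpPublisher
{
    readonly Func<int, IList<IEvent>> _readAllFrom;  // Events after a position
    readonly Action<IEvent> _publish;                // e.g. send to a queue
    int _position;

    public CatchUpPublisher(Func<int, IList<IEvent>> readAllFrom, Action<IEvent> publish)
    {
        _readAllFrom = readAllFrom;
        _publish = publish;
    }

    public int Position { get { return _position; } }

    // Publishes everything appended since the last call and returns the count.
    public int PublishPending()
    {
        var pending = _readAllFrom(_position);
        foreach (var e in pending)
        {
            _publish(e);
            // Advance only after a successful publish, so a failure
            // leaves the Event to be retried on the next call.
            _position++;
        }
        return pending.Count;
    }
}
```

In practice PublishPending() would be called in a loop or on a timer, and the position would be stored durably so that the processor can resume after a restart.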
This simple implementation can be replaced by more elaborate ones. One such implementation replicates Events, immediately or eventually, to one or more clones, increasing fault tolerance. The figure below shows immediate Event replication to one clone. In this case, the Master Event Store considers its own Events to be saved only after it has successfully replicated them to the Clone Event Store, which is a write-through strategy.
Figure A.6. Write Through: A Master Event Store immediately replicates all newly appended Events to a Clone Event Store.
An alternative is to replicate Events to the Clone on a separate thread after the Master has saved the changes, which is a write-behind strategy. This approach is illustrated in the figure shown below. In this case the Clone could be inconsistent with the Master, especially if a server crashes or if network latency delays replication.
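The difference between the two strategies comes down to ordering, which a toy model can show. In this sketch Events are plain strings and all class names are hypothetical. A real implementation would involve network calls, durable storage and a genuine background thread.

```csharp
using System.Collections.Generic;

public class CloneEventStore
{
    public readonly List<string> Events = new List<string>();
    public void Replicate(string e) { Events.Add(e); }
}

// Write-through: the Event counts as saved only after the Clone has it.
public class WriteThroughMaster
{
    public readonly List<string> Events = new List<string>();
    readonly CloneEventStore _clone;

    public WriteThroughMaster(CloneEventStore clone) { _clone = clone; }

    public void Append(string e)
    {
        _clone.Replicate(e);  // if this throws, the Master records nothing
        Events.Add(e);
    }
}

// Write-behind: the Master saves first and replicates later.
public class WriteBehindMaster
{
    public readonly List<string> Events = new List<string>();
    readonly Queue<string> _pending = new Queue<string>();
    readonly CloneEventStore _clone;

    public WriteBehindMaster(CloneEventStore clone) { _clone = clone; }

    public void Append(string e)
    {
        Events.Add(e);
        _pending.Enqueue(e);  // the Clone lags until the next flush
    }

    // Would run on a separate thread in a real system.
    public void FlushToClone()
    {
        while (_pending.Count > 0) _clone.Replicate(_pending.Dequeue());
    }
}
```

Between Append() and FlushToClone() the write-behind Clone is missing Events, which is exactly the window of inconsistency described above.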
To summarize what has been discussed so far, let’s walk through the execution sequence that begins with the invocation of an operation on an Application Service:
1. A client invokes a method on an Application Service
2. Obtain any Domain Services needed to carry out the business operation
3. With the client-supplied Aggregate instance identity, retrieve its Event Stream
4. Reconstitute the Aggregate instance by applying to it all Events from the Stream
5. Execute a business operation provided by the Aggregate, passing in all parameters required by the interface’s contract
6. The Aggregate may double-dispatch to any provided Domain Services, instances of other Aggregates, etc., and will generate new Events as the outcome of the operation
7. Assuming no failed business operations, append all newly generated Events to the Stream using the Stream version to guard against concurrency conflicts
8. Publish newly appended Events from the Event Store to subscribers using your choice of messaging infrastructure
We could enhance our A+ES implementation using various options. For example, we could use a Repository to encapsulate access to the Event Store and the details of reconstituting Aggregate instances. Given the above code snippets, it would be easy for you to create a reusable Repository base class.
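As a starting point, such a base class might look like the following sketch. EventSourcedRepository, IEventSourced and SingleStreamStore are hypothetical names, the IEventStore here is a trimmed copy without the paged overload, and the Customer is reduced to a single behavior. The Update() method wraps the load, execute, append cycle so that the Stream version never leaks to callers.

```csharp
using System;
using System.Collections.Generic;

public interface IEvent { }
public interface IIdentity { }
public class EventStream { public int Version; public List<IEvent> Events; }

// Trimmed copy of the article's IEventStore (paged overload omitted).
public interface IEventStore
{
    EventStream LoadEventStream(IIdentity id);
    void AppendToStream(IIdentity id, int expectedVersion, ICollection<IEvent> events);
}

// Hypothetical contract exposing an Aggregate's uncommitted Events.
public interface IEventSourced { List<IEvent> Changes { get; } }

public abstract class EventSourcedRepository<TAggregate> where TAggregate : IEventSourced
{
    readonly IEventStore _eventStore;

    protected EventSourcedRepository(IEventStore eventStore) { _eventStore = eventStore; }

    // Subclasses decide how to build the Aggregate from its history.
    protected abstract TAggregate Reconstitute(IEnumerable<IEvent> events);

    public TAggregate LoadById(IIdentity id)
    {
        return Reconstitute(_eventStore.LoadEventStream(id).Events);
    }

    // Load, run the business operation, then append using the loaded version.
    public void Update(IIdentity id, Action<TAggregate> execute)
    {
        var stream = _eventStore.LoadEventStream(id);
        var aggregate = Reconstitute(stream.Events);
        execute(aggregate);
        _eventStore.AppendToStream(id, stream.Version, aggregate.Changes);
    }
}

// Demo types, simplified for illustration.
public class CustomerId : IIdentity { }
public class CustomerLocked : IEvent { }

public class Customer : IEventSourced
{
    public List<IEvent> Changes { get; private set; }
    public bool ConsumptionLocked { get; private set; }

    public Customer(IEnumerable<IEvent> events)
    {
        Changes = new List<IEvent>();
        foreach (var e in events) Mutate(e);
    }

    public void LockCustomer() { if (!ConsumptionLocked) Apply(new CustomerLocked()); }
    void Apply(IEvent e) { Changes.Add(e); Mutate(e); }
    void Mutate(IEvent e) { if (e is CustomerLocked) ConsumptionLocked = true; }
}

public class CustomerRepository : EventSourcedRepository<Customer>
{
    public CustomerRepository(IEventStore store) : base(store) { }
    protected override Customer Reconstitute(IEnumerable<IEvent> events) { return new Customer(events); }
}

// Toy store holding one Stream, just enough to exercise the repository.
public class SingleStreamStore : IEventStore
{
    readonly List<IEvent> _events = new List<IEvent>();

    public EventStream LoadEventStream(IIdentity id)
    {
        return new EventStream { Version = _events.Count, Events = new List<IEvent>(_events) };
    }

    public void AppendToStream(IIdentity id, int expectedVersion, ICollection<IEvent> events)
    {
        if (expectedVersion != _events.Count)
            throw new InvalidOperationException("Concurrency conflict");
        _events.AddRange(events);
    }
}
```

With this in place, an Application Service method shrinks to a single call such as repository.Update(customerId, c => c.LockCustomer()).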
Could you please add syntax highlighting?
Posted by: Mablae | 12/20/2012 at 05:06 PM
Very nice write up. However I think the "public partial class Customer", instead of using partial class, perhaps DCI is better suited.
http://en.wikipedia.org/wiki/Data,_context_and_interaction
Posted by: Jeff | 12/20/2012 at 09:34 PM