Implementing an event sourced domain model in F#

event sourcing, F#, DDD | March 11, 2018

This post builds on a talk by Greg Young. If you haven't seen it, go and watch it now. I was very much enlightened by it, and in this first post I'd like to take you through my take on implementing an event sourced domain model in F#. I'll stick to the no-frills functionality for now and elaborate on it in later posts, but let's get the core implemented first.

Types

There are two function signatures that we will need. The first one is the command handler function. Based on the current state of the aggregate and the command that we need to handle, it gives us the events that occurred. Handling the command might also fail, in case it would violate some domain rule. So we'll need the following types:

type CommandResult<'event, 'failure> =
| Ok of 'event seq
| Fail of 'failure

type CommandHandler<'command, 'event, 'state, 'failure> = 'command -> 'state -> CommandResult<'event, 'failure>
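To make these types a little more concrete, here is a minimal sketch of a command handler for a hypothetical bank account aggregate whose state is simply its balance. The account types and handleCommand are illustrative assumptions, not something from the talk; the point is only that a command violating a domain rule yields Fail, while a valid one yields Ok with the events that occurred.

```fsharp
type CommandResult<'event, 'failure> =
  | Ok of 'event seq
  | Fail of 'failure

type CommandHandler<'command, 'event, 'state, 'failure> =
  'command -> 'state -> CommandResult<'event, 'failure>

// Hypothetical example domain: a bank account whose state is just its balance
type AccountCommand =
  | Deposit of decimal
  | Withdraw of decimal

type AccountEvent =
  | Deposited of decimal
  | Withdrawn of decimal

type AccountFailure =
  | NonPositiveAmount
  | InsufficientFunds of balance: decimal

// Decides which events occur (or why the command fails); it never changes the state itself
let handleCommand : CommandHandler<AccountCommand, AccountEvent, decimal, AccountFailure> =
  fun command balance ->
    match command with
    | Deposit amount
    | Withdraw amount when amount <= 0m -> Fail NonPositiveAmount
    | Deposit amount -> Ok (Seq.singleton (Deposited amount))
    | Withdraw amount when amount > balance -> Fail (InsufficientFunds balance)
    | Withdraw amount -> Ok (Seq.singleton (Withdrawn amount))
```

Because the handler is a plain function of command and state, the domain rules live in one match expression and nothing else is needed to run it.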

Now that we can produce lists of events, we can use these to get the new state of the aggregate. Our event handler function can do this for us. It takes the current state of the aggregate and an event, performs its logic and returns the new state of the aggregate. Its signature:

type EventHandler<'state, 'event> = 'state -> 'event -> 'state

If we do this for each event in the stream of the aggregate, we'll end up with its current state. As Greg Young puts it, current state is a left fold of previous behaviours, "behaviours" being events. F# makes this easy for us.

let state' = Seq.fold eventHandler state events
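Continuing with a hypothetical bank account (an illustrative assumption, with the balance as its state), an event handler and the left fold over a stored stream could look like the sketch below. The List.scan line is only there to show the intermediate states the fold passes through.

```fsharp
type EventHandler<'state, 'event> = 'state -> 'event -> 'state

// Hypothetical example events for a bank account whose state is its balance
type AccountEvent =
  | Deposited of decimal
  | Withdrawn of decimal

// Applies one event to the balance; events are facts that already happened,
// so there is no validation here
let applyEvent : EventHandler<decimal, AccountEvent> =
  fun balance event ->
    match event with
    | Deposited amount -> balance + amount
    | Withdrawn amount -> balance - amount

// Hypothetical stored stream for one account
let history = [ Deposited 100m; Withdrawn 30m; Deposited 5m ]

// Current state is a left fold of the previous events, starting from 0
let currentBalance = Seq.fold applyEvent 0m history
// currentBalance = 75m

// The same fold, keeping every intermediate state
let intermediateBalances = List.scan applyEvent 0m history
// [0m; 100m; 70m; 75m]
```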

He then went on to replace the state argument in the command handler function with the fold we're doing above.

// load up the events from the stored stream
let result = Seq.fold eventHandler initialState events |> commandHandler command
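Put together, this gives a stateless way of handling a command: rebuild the state from the stored events, then hand it to the command handler. The sketch below assumes the same kind of hypothetical bank account domain, with an in-memory list standing in for the stored stream; execute is an illustrative name.

```fsharp
type CommandResult<'event, 'failure> =
  | Ok of 'event seq
  | Fail of 'failure

// Hypothetical bank account domain; the state is the balance
type AccountCommand = Deposit of decimal | Withdraw of decimal
type AccountEvent = Deposited of decimal | Withdrawn of decimal
type AccountFailure = InsufficientFunds of balance: decimal

let applyEvent balance event =
  match event with
  | Deposited amount -> balance + amount
  | Withdrawn amount -> balance - amount

let handleCommand command balance =
  match command with
  | Deposit amount -> Ok (Seq.singleton (Deposited amount))
  | Withdraw amount when amount > balance -> Fail (InsufficientFunds balance)
  | Withdraw amount -> Ok (Seq.singleton (Withdrawn amount))

let initialState = 0m

// Rebuilds the state from the full stream on every call, then handles the command
let execute (storedEvents: AccountEvent seq) command =
  Seq.fold applyEvent initialState storedEvents |> handleCommand command

let result = execute [ Deposited 100m; Withdrawn 30m ] (Withdraw 80m)
// result = Fail (InsufficientFunds 70m)
```

Nothing is kept between calls here, so every call pays for folding the whole stream.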

Looping the fold

I think we now have the simplest possible implementation, and it works perfectly well on its own. If we put up a REST API and run this code for each API call, we'd get by fine. It does mean that we need to query and fold the entire event stream for each call, which is often fine too. But there is one sentence in Greg Young's talk that caught my attention. He gives the example of taking a command, handling it and, in his words, "continue my fold" each time. To me this sounded like running in a loop: keeping the state between calls and just folding the events produced by the previous call on top of it. If you have long streams and would otherwise think of snapshotting to help performance, this would be enormously helpful. But how could this work? If you try to write a recursive function, you can pass the produced events forward, but where would the command come from? And more importantly, how do you get the produced events out of there?
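As a thought experiment, suppose all commands were known up front. Then "continuing the fold" would be nothing more than a fold over the commands that carries the state from one command to the next, folding only the newly produced events onto it. The sketch below does that with List.mapFold over a hypothetical bank account domain; step is an illustrative name. It only works because the list of commands is fixed in advance, which is exactly what a running system does not have.

```fsharp
type CommandResult<'event, 'failure> =
  | Ok of 'event seq
  | Fail of 'failure

// Hypothetical bank account domain; the state is the balance
type AccountCommand = Deposit of decimal | Withdraw of decimal
type AccountEvent = Deposited of decimal | Withdrawn of decimal
type AccountFailure = InsufficientFunds of balance: decimal

let applyEvent balance event =
  match event with
  | Deposited amount -> balance + amount
  | Withdrawn amount -> balance - amount

let handleCommand command balance =
  match command with
  | Deposit amount -> Ok (Seq.singleton (Deposited amount))
  | Withdraw amount when amount > balance -> Fail (InsufficientFunds balance)
  | Withdraw amount -> Ok (Seq.singleton (Withdrawn amount))

let okOrEmpty = function
  | Ok events -> events
  | Fail _ -> Seq.empty

// One turn of the loop: handle the command against the current state, then
// fold only the newly produced events on top of it (a failure adds nothing)
let step state command =
  let result = handleCommand command state
  let state' = Seq.fold applyEvent state (okOrEmpty result)
  result, state'

// Runs a fixed batch of commands, carrying the state from one turn to the next
let results, finalBalance =
  List.mapFold step 0m [ Deposit 100m; Withdraw 150m; Withdraw 40m ]
// finalBalance = 60m; the second command fails with InsufficientFunds 100m
```

Each call to step is one trip around the loop, and the list of results plays the role of the answers a caller would receive.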

I found that F# helps us out here with agents (MailboxProcessor). We can use the typical recursive loop function to pass the produced events forward, and the agent's mailbox to receive commands. We can also get the produced events back to the caller by means of a reply channel. Let's implement it.

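let pass f x =
  f x |> ignore
  x

let okOrEmpty = function
  | Ok events -> events
  | Fail _ -> Seq.empty

type Message<'command, 'event, 'failure> = 'command * AsyncReplyChannel<CommandResult<'event, 'failure>>

// eventHandler, commandHandler, initialState and initialEvents are the
// aggregate's own functions and starting values, defined beforehand
let agent =
  MailboxProcessor<Message<_, _, _>>.Start(fun commands ->
    let rec handler state events =
      async {
        // fold the pending events (initially the stored stream) onto the state
        let state' = Seq.fold eventHandler state events
        // wait for the next command, together with the channel to reply on
        let! (command, replyChannel) = commands.Receive()
        // send the full result back, then keep only the events (empty on failure)
        let reply = pass replyChannel.Reply >> okOrEmpty
        return!
          state'
          |> commandHandler command
          |> reply
          |> handler state'
      }
    handler initialState initialEvents
  )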

The first two functions are small helpers: pass calls a function for its side effect and then returns its input unchanged, and okOrEmpty extracts the events from a result, returning an empty sequence for a failure. The Message type is what our mailbox will receive: a command, and an async reply channel that we'll use to communicate the result back to the caller. The recursive handler function takes the current state and a sequence of events. These are the events that need to be folded to get the new state, and that is in fact the first thing the function does. The first time we start our agent, we pass it some initial state and an empty list of events, or, if we're starting from an existing stream, the list of existing events. Next we wait for a command to arrive. When one does, we also get the reply channel to use with it. The reply helper composes this channel with okOrEmpty, so that after replying the result back to the caller, we keep only the resulting events. Our agent is only interested in folding events to get to the new state, so failure results are simply ignored. In essence, our agent does the following:

  1. Take the current state and an event list, and fold them to get the new state
  2. Take a command and handle it, producing either a list of occurred events or a failure result
  3. Reply with the result to the caller
  4. Take the new state and the list of occurred events, and use them in step 1
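To see this loop run end to end, here is a self-contained sketch that plugs the agent code into a hypothetical bank account aggregate. The domain types, the starting stream in initialEvents and the commands sent are illustrative assumptions; pass, okOrEmpty, Message and the handler loop follow the post's code, and a small handle function wraps PostAndReply so each call gets its result back.

```fsharp
type CommandResult<'event, 'failure> =
  | Ok of 'event seq
  | Fail of 'failure

type Message<'command, 'event, 'failure> =
  'command * AsyncReplyChannel<CommandResult<'event, 'failure>>

// Hypothetical bank account domain; the state is the balance
type AccountCommand = Deposit of decimal | Withdraw of decimal
type AccountEvent = Deposited of decimal | Withdrawn of decimal
type AccountFailure = InsufficientFunds of balance: decimal

let eventHandler balance event =
  match event with
  | Deposited amount -> balance + amount
  | Withdrawn amount -> balance - amount

let commandHandler command balance =
  match command with
  | Deposit amount -> Ok (Seq.singleton (Deposited amount))
  | Withdraw amount when amount > balance -> Fail (InsufficientFunds balance)
  | Withdraw amount -> Ok (Seq.singleton (Withdrawn amount))

let pass f x =
  f x |> ignore
  x

let okOrEmpty = function
  | Ok events -> events
  | Fail _ -> Seq.empty

let initialState = 0m
// Assumption: this stream was loaded from storage before starting the agent
let initialEvents = Seq.ofList [ Deposited 100m ]

let agent =
  MailboxProcessor<Message<AccountCommand, AccountEvent, AccountFailure>>.Start(fun commands ->
    let rec handler state events =
      async {
        // Fold only the events produced since the previous turn
        let state' = Seq.fold eventHandler state events
        let! (command, replyChannel) = commands.Receive()
        let reply = pass replyChannel.Reply >> okOrEmpty
        return! state' |> commandHandler command |> reply |> handler state'
      }
    handler initialState initialEvents)

let handle command = agent.PostAndReply(fun replyChannel -> (command, replyChannel))

// The balance lives inside the agent; callers only see command results
let first = handle (Withdraw 30m)  // Ok, with a single Withdrawn 30m event; balance is now 70m
let second = handle (Withdraw 80m) // Fail (InsufficientFunds 70m)
```

The failed withdrawal leaves the balance at 70m: okOrEmpty turns the failure into an empty sequence, so the next turn of the loop has nothing to fold.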

And we're now effectively done. We can wrap the agent's PostAndReply method in a handy function:

// signature: 'command -> CommandResult<'event, 'failure>
let handle command = agent.PostAndReply (fun replyChannel -> command, replyChannel)

and we can go ahead and send commands to it. The state of the aggregate is encapsulated by the agent and completely invisible to us. On the other hand, the command handler and event handler functions are just pure functions, which means testing them becomes really straightforward. The command handler takes some state and a command, and we just need to test that the correct events are produced. As for the event handler, we give it a state and an event, and we check that the correct state gets produced. There is very little setup to be done for these tests.
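As an illustration of how little setup such tests need, here is a sketch of a few checks against the pure functions of a hypothetical bank account aggregate. producedEvents and failureOf are hypothetical helpers, there because two seq values are generally not compared structurally; in practice you would likely use a test framework's assertions rather than plain booleans.

```fsharp
type CommandResult<'event, 'failure> =
  | Ok of 'event seq
  | Fail of 'failure

// Hypothetical bank account domain; the state is the balance
type AccountCommand = Deposit of decimal | Withdraw of decimal
type AccountEvent = Deposited of decimal | Withdrawn of decimal
type AccountFailure = InsufficientFunds of balance: decimal

let eventHandler balance event =
  match event with
  | Deposited amount -> balance + amount
  | Withdrawn amount -> balance - amount

let commandHandler command balance =
  match command with
  | Deposit amount -> Ok (Seq.singleton (Deposited amount))
  | Withdraw amount when amount > balance -> Fail (InsufficientFunds balance)
  | Withdraw amount -> Ok (Seq.singleton (Withdrawn amount))

// Hypothetical helpers that turn a result into values with structural equality
let producedEvents result =
  match result with
  | Ok events -> Some (List.ofSeq events)
  | Fail _ -> None

let failureOf result =
  match result with
  | Ok _ -> None
  | Fail failure -> Some failure

// Command handler: given a state and a command, check the events or the failure
let depositProducesEvent =
  producedEvents (commandHandler (Deposit 10m) 0m) = Some [ Deposited 10m ]

let overdraftIsRejected =
  failureOf (commandHandler (Withdraw 50m) 20m) = Some (InsufficientFunds 20m)

// Event handler: given a state and an event, check the new state
let withdrawalReducesBalance =
  eventHandler 100m (Withdrawn 30m) = 70m
```

No agent, storage or mocks are involved: each check is a single function call on plain values.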

In the following posts, I'd like to talk about storing and retrieving events, and how communication between aggregates can work.