One of my favorite things about F# is the ease and pleasure it brings to the otherwise tricky task of asynchronous programming. Using async workflows/computation expressions, mailbox processors (agents) and async combinators makes async, parallel and concurrent programming a total joy in F#.
Outside of programming and family life, another joy of mine is listening to and playing (albeit not so much these days) music. I played Double Bass on the Irish jazz scene up until about 7 years ago when I got back into software development.
That’s me on the Double Bass around 2004 looking a lot younger!
For a few years, music was my life and, when not practicing on my own, I would be rehearsing and playing gigs or just getting together with musician friends to have a jam session. A jam session would be when we would get together in someone’s house and just play music until the early hours or else it could be more official like an open session at a venue around town.
When I was thinking about what I would write about for my F# advent calendar post, I wanted to do something involving async but I also wanted a little app or toy project to use as a playground to show some async in practice. I got reminiscing on my music days and I was thinking wouldn’t it be cool to have an app that would help musicians get together for these jam sessions! Back then, I was living in Dublin and there were a lot of musicians moving to and from there all the time. So, what about an app that could track musician availability at different locations and suggest other musicians for them to meet up with for jam sessions. For this post, I built a prototype for this idea using two distributed processes, CQRS (command query responsibility segregation), and Azure Service Bus to asynchronously propagate events from one process to the other. Who knows, maybe I’ll eventually build out the idea into a full production app!
Introducing the Social Music platform!
In order to set the stage (pardon the pun!), lets imagine that this app which gets musicians together for jam sessions is part of a bigger platform called “Social Music”.
One part of the system – SocialMusicLocations – is a process (or multiple instances of this process) which is in charge of the being the source of truth on everything that happens with locations (or a subset of locations if we were to make this into multiple instances). It processes commands related to locations, transforms these commands into events related to these locations, stores them in an immutable event store and also propagates these events to other processes that use them in order to store and serve up a view/read model that their clients would be interested in. SocialMusicMatchMaker is one process which receives these event propagations. It generates its own current state based on events that it receives and this state is made available via an HTTP API for its consumers – e.g a consumer could be a web/mobile app, “LetsGetJammin”, suggesting possible other musicians for users to get together with to have a jam session!
The events propagation from SocialMusicLocations to SocialMusicMatchMaker is where async processing comes into play. Within SocialMusicLocations, there is an F# mailbox processor receiving events into its mailbox and asynchronously storing these events to an event store and also propagating them as messages to an Azure service bus queue.
To keep things simple, the events store in SocialMusicLocations is an in memory store using NEventStore. Also the locations read model within SocialMusicMatchMaker will just use in memory persistence with a .NET Dictionary.
Also, the code for each of these processes will live separate solutions each of which being a.NET Core 2.0 solution. So, when the Social Music platform takes off and there’s billions of users, we can just scale out these processes with docker containers or whatever the cool kid on the block is in container technology by then .
The code for these two solutions is available on my github here. For the rest of this blog post, I will go through some of the more interesting parts of the code and show some async, CQRS and domain modelling in action.
The domain for this is very simple consisting of definitions for a musician, instrument, location, events, commands and a state that is an aggregation of previous events. In order to track changes in musician presence within locations over time, I’ve kept it really simple and just have two commands and associated events for musicians registering to a location or deregistering from a location that they are currently registered to. So from the Musician type down through the command and event types, there is the simple idea of a musician being one who is registered (or being registered through a command) to a location or one who is deregistered (or being deregistered through a command).
I’m also representing errors with a DU which has just one case for the error that will arise if a command attempts to deregister a musician from an empty location.
The command handing is two stage operation consisting of the functions above. First of all the CommandHandler.handle function takes a current state and a new command to either register or deregister a musician from a location. Based on these inputs, it generates a list of events as, if you like, a recording of the operation of applying the command. It then folds across these events calling StateGeneration.apply to transform the state along the way until it ends up with the final state. If everything is ok, the CommandHandler.handle function returns the events and final state wrapped in an F# Result.Ok. If there is an error in processing the command – for example if an attempt is made to deregister a musician from an empty location – the CommandHandler.handle function returns this error wrapped in an F# Result.Error.
The code for the CommandHandler.handle function is shown below:
The Event Store
All events generated by the command handling are stored per location. To get the current state of a location, it is a matter of starting with an initial empty location and applying all the events from that location using the StateGeneration.appy function again.
The event store is defined as a type with two functions to save events and generate state as shown above. For the purpose of this prototype, I used NEventStore which provides an in-memory event store as a stream of events corresponding to an ID. I used the location as the id in a simplistic way shown below so that events can be stored per location.
The implementation details of calling out to NEventStore’s store and retrieve mechanism are hidden behind the Store type that I showed above (full source code is available on my github here).
Mailbox Processor to Propagate Events
A mailbox processor in F# is like a state machine. When a mailbox processor instance is created, a function is supplied with the type:
MailboxProcessor<‘Msg> -> Async<unit>.
Its within this function where the action happens usually in the form of a recursive function that the programmer defines usually called
which loops with each loop running in an async block/computation expression. So with each loop, a thread from a .NET thread pool is supplied.
is essentially like an inbox and provides an inbox.Receive() function which blocks until a new message of type ‘Msg arrives. This allows you to write code in an async block which will wait for the next message, handle it and take action to possibly update state, and recursively call the loop function with this new state. The mailbox processor used to propagate the event messages for SocialMusicLocations doesn’t have state which is passed between calls to the loop function – it uses CommandHandler.handle shown earlier to generate events and new state and performs a side effect operation of saving to the in memory event store and sending a message for each event to an Azure service bus queue.
The ‘Msg type that a mailbox processor handles is defined by the programmer and, for SocialMusicLocations, it is defined as:
The Stop message will be used to tell the mailbox processor to stop processing any more messages.
The PostCommand DU case above is a message consisting of the domain model Command along with an AsyncReplyChannel which can carry an AgentResponse.
AgentResponse is defined as:
The mailbox processor is encapsulated in a type which I called Agent. This is where the OO capabilities of F# work really well for encapsulating the fact that a mailbox processor is being used for async processing. The messages that I showed earlier, which can be sent to the mailbox processor, are encapsulated behind methods of the Agent class. This class is defined as follows:
let! message = self.Receive()
the async computation being executed can wait in a non blocking fashion – the thread currently being used can be given back to the thread pool until a message is received – in which case, another available thread from the thread pool (or possibly the same thread again) can be used to process the next part of the async computation.
The helper function, propagateEvent, is used to serialize an event to json, add a timestamp and send it to the Azure service bus queue. The timestamp being added can be used by message receivers to maintain ordering across a location as, without using Azure service bus sessions, order is not guaranteed. In the code above, the pattern discussed earlier of using the recursive loop function to process mailbox messages can be seen. Once a message is received, the Command it contains can be processed unless it is a Stop message – in which case, the recursive looping is stopped.
The Http Command API
An api to accept the commands to register or deregister musicians is provided by a simple Rest API implemented with Suave
Suave provides a really nice way of composing together a web application with the concept of Web Parts. There’s a lot of great documentation and tutorials on Suave out there – a really great one that I totally recommend is this course on FSharp TV.
The HTTP command API also acts as a boundary to make sure that any commands that get past it will be valid Commands according to the domain model shown earlier. For this I used the applicative pattern of wrapping a constructor function inside a Result type and applying this across its arguments which, themselves, are each pumped through their own validation function to decide if they are valid or not by outputting the same Result type.
This may sound a bit abstract and vague for anyone who hasn’t seen this before – I know it did for me. The best resources I found for learning these kind of patterns are the chapters on Functors, Applicatives and Monads from Haskell Programming from First Priciples and also Scott Wlaschin’s series on map, bind and apply.
The Result type, called ValidationResult is a wrapper type which has a case for wrapping a successfully validated command and also a case for wrapping a list of errors that have been collected along the way.
The constructor function to create a domain Command type is as follows:
In order for the applicative pattern to work, each of the parameters to this constructor function needs to be wrapped in the same ValidationResult type – in the case where no validation is required, wrapping the parameter in the Success DU case will suffice.
So for each parameter, I have an associated validation function, most of which, convert strings into appropriate DU types as follows:
The little engine room of the applicative approach is defined in ValidationResult.apply as follows:
The apply function takes two ValidationResult types :
- f is a wrapper for a function and, at runtime, this wrapper will either be the Success ValidationResult case or it may be the Error case.
- validationResult is a wrapper for a value – wrapping either the value or a list of errors associated with trying to produce this value. When Validation.appy is used with the toCommand constructor function, each time Validation.apply is called, this validatationResult will be the result of validating an individual argument to the toCommand function using one of the validation functions shown earlier, e.g validateInstrument.
In order to chain up successive calls to Validation.apply, I’ve add just a dash of ML soup to make things a bit cleaner with defining a local operator.
So, now the whole validateCommand function is as follows:
Pattern matching withing the ValidationResult.apply function means that any errors that are encountered with successive calls to ValidationResult.apply are appended together.
So, after calling the validateCommand function, you either end up with a ValidationResult which wraps a valid Command type or a list of errors collected along the way.
SocialMusicMatchMaker is an entirely separate .NET sln and separate process. It asynchronously consumes messages from the Azure service bus queue that SocialMusicLocations sends messages to. On consuming these messages, it updates a read model which is a representation of the current state of locations. It also uses Suave to expose an HTTP Rest Api to query this read model.
SocialMusicMatchMaker has it’s own domain which is effectively a read model snapshot of the source of truth that it is consuming from Azure service bus queue in the form of messages sent by SocialMusicLocations. Its domain model is similar to that of SocialMusicLocations except for a few notable differences. For example, the Musician type is represented as a straight record instead of a DU because there isn’t the concept of a registered or deregistered musician in the read model – it is simply musicians keyed by location.
Also the Event type is slightly different in SocialMusicMatchMaker because the timestamp is added to each Event DU case. I will show shortly how this is used when updating the read model.
In the last section, you can see that the Event type has an associated module with constructor functions for each Event DU case. This is because I follow the same pattern for validation in SocialMusicMatchMaker that I have already shown for SocialMusicLocations using an applicative pattern.
In order to turn a service bus message into one of these Event DU cases, the message is first validated. This time round, since the likelihood of receiving an invalid message is very slim in this prototype system, I used the F# option type instead of using a ValidationResult type. So if there are any validation errors, a None will be produced and the message is effectively ignored.
I enhanced the Option module with an apply function like I showed earlier for ValidationResult.apply:
Each part of the service bus message that needs to be validated has a validation function that returns an Option:
This time round, for musician name I went a step further and created a single case DU with a private data constructor so that only the type is available outside the module SocialMusicMatchMaker.Core.Domain, and not the value/data constructor.
The constructor function, create, is a smart constructor which only allows valid Name values to be created.
We still need to be able to pattern match on a Name type outside of this module. This can be enabled with an active pattern:
The other parts that make up a musician registered or deregistered event have similar validation functions which also return None for an invalid value:
A valid MusicianRegistered or MusicianDeregistered event can then be created by putting the applicative pattern into action again:
Again, I’ve added a dash of ML soup with (<*>) to make the chaining up of the Option.apply calls a bit easier.
So where are – MusicianRegistered propagationMessage – and – MusicianDeRegistered propagationMessage – coming from. They are types I added to enable deserializing of the service bus message. To enable the pattern matching above on – match message – I used active patterns which check for a messages that we are interested in by checking the message label and attempting to deserialize the message body:
The Azure Service Bus Consumer
To consume from the Azure service bus queue, I used a handy library, Microsoft.Azure.ServiceBus
This makes consuming messages very straight forward by creating a MessageReceiver and registering a message handler function:
The message handler above converts service bus messages that we are interested in into events and then projects these events to update the read model which is the last part I’ll talk about next.
Updating the Read Model
The read model is abstracted behind a simple interface representing a data store that allows for saving of a musician with a timestamp to a location, removing a musician from a location and also getting all musicians for a location – which facilitates the HTTP Rest api to get all musicians currently in a location.
For the purpose of the prototype, I implemented an in memory implementation of this interface with a simple Dictionary:
and a crude transaction implementation using locking:
The implementations of the functions from the DataStore interface use basic operations on a .NET Dictionary. The only extra bit is that the timestamps are checked so that only an event relating to a musician with a timestamp after the latest timestamp for this musician is considered.
The function that is used by the service bus consumer to call out to the data store is as follows:
There is not much to that function – it simply calls out to the relevant functions of the DataStore type that it is injected with.
I really enjoyed creating this little prototype system and its something I will expand upon – like adding a mobile UI to consume from SocialMusicMatchMaker in the form of a “Lets Get Jammin'” app! I would also like to explore using different technologies for distributed event propagation, for example Apache Kafka.
The full source code for this prototype is available on my github here.
I hope you have gotten something out of reading this. Thanks so Sergey Tihon for organizing the F# Advent Calendar and for all the great work he does with his regular updates on F# Weekly
Thanks for taking time to read this!