On Transactions DSL for Event Sourcing
What is that state that has to be mutated, and how do we mutate it?
This should be about the last post I publish on the topic of Event Sourcing this year.
Recap
So far, in the previous post, we established that the two pillars to Event Sourcing are:
Use Total Order Broadcast (TOB), so that the commands come from an immutable append-only stream, with their indexes stamped, and
The Operations / Commands / Transactions on the underlying data structures are reproducible.
One insight to note right away is that as long as the commands enter the system after the TOB engine has stamped sequence ID indexes onto them, the “critical path”, innermost layer of the system does not need to solve the leader elections problem. More on this later.
The fundamental idea is that the single source of truth for the data is no longer in the database. It is in the queue! Exactly as we outlined back in 2015:
In-Memory Processing
The “working set” of the data is kept entirely in memory.
This may sound controversial, but keep in mind that the primary purpose of the component that executes the commands (or “performs the transactions”) is not to maintain the data in some database.
In fact, maintaining the data in some databases is the job of various materialized views. It is the cornerstone of the Event Sourcing design that materialized views are first class citizens. Any and all [eventually consistent] reads go through one of them.
Materialized views are cheap to spin up and cheap to maintain. Moreover, they come in all shapes and forms, from RDBMSes and NoSQL storages, through ElasticSearch-style custom indexes, all the way to a piece of code that fits on screen and maintains “just a hashmap” of some slice of data, with ultra-high-QPS read throughput.
Also, while materialized views serve “eventually consistent” reads, the guarantees on their consistency are pretty tight. First of all, they are far below one second in practice.
Besides, if the caller knows the sequence ID (or the “offset”, or the “epoch”) of the version of data they would like to read, the eventually consistent read request may indicate that they want the data that is not older than this epoch index, via some Epoch-At-Least
header.
This immediately solves the Causality Problem, as well as the more narrow problem that each client is guaranteed to Read Their Own Writes. So that if Alice sent money to Bob and then immediately wants to see the balance of her account, the device (or a web browser) that she is using knows to request the data at the state no older than the epoch with the confirmation of her sending money out.
Of course, if a truly consistent read is needed, it has to be a proper command, also known as a “transaction”. It will get its own sequence ID stamped by the Total Order Broadcast engine, and it will be executed as its turn comes.
Consistent reads are generally about as expensive as transactions, and they do not automagically scale horizontally.
Source of Truth
So what is the primary purpose of executing commands if not to execute the commands?
It is to generate the stream of command execution results.
Each command that had entered the system and had its sequence ID stamped will eventually be executed. Once executed, it will produce two pieces of derived data, into two different output streams, under the same sequence ID as the ID of the command itself.
The response to be sent to the caller. For instance, if the semantics of the command was
POST /user ${body}
, then the response, logically speaking, could be some200 OK {“user_id”:100042}
, or some409 CONFLICT {“error”:”EMAIL_ALREADY_REGISTERED”}
.The set of mutations to the underlying data store. Continuing with the example from (1) above, the set of mutations could be empty if the user was not added, or it would contain the “fact” that a certain blob of data for the newly created user has to be persisted.
The set of mutations may well contain a lot more than merely a single insertion into a dictionary. For instance, maybe the system is keeping track of the number of users registered per different top-level email domains. It may also maintain the list of top ten of these domains, for a dashboard.
In this case the “fact” that a certain counter is incremented will also be present in the set of mutations. For the “top-ten” data structure, which may well be a heap, the set of mutations would reflect the updates to the structure of the binary tree that represents this heap, should this particular user registration alter “the leaderboard”.
The most important insight from this part is that producing the next record to be appended to the stream of mutations to the data is more important than maintaining the data itself.
The node that has just executed this command may well disappear from the face of Earth right after publishing this “set of mutations” record to the output stream. In this case, a new leader node will take over running the commands, and it will start from the very next command.
Or the original node may have died mid-way of executing the command. In this case a new node would have to take over, and it will start from executing this very command on which the original node has died.
The truth is that:
The data that was stored in memory of the node that executed the command is only of interest to this very node, because
This data is only valuable for this very node to correctly produce the response and the set of mutations for the next command to come.
So, if a node dies, its data disappears with it. It was ephemeral in the first place. The streams, of command and of responses and mutations, are where the true state lives.
Of course, the above is a lot easier to implement if the transaction execution logic is reproducible. Simply put, if the node that was running the transaction has died after writing to the responses stream, but not to the mutations stream, the node that takes over would write to both streams too.
If the execution logic is reproducible, this accidental “double-processing” is totally acceptable, since the end user would not notice it. If the logic is not reproducible, then, to ensure correctness, extra logic would have to be introduced.
In practice, it would be sufficient to join the responses stream and the mutations stream into one, making it impossible to overwrite whatever was written into it before, by the same sequence ID. This is doable. Dealing with reproducible commands is just easier. It’s also more performant, as materialized views maintainers do not need to concern themselves with what were the responses sent to the callers; they only care about the modifications to the state to serve their read requests from.
Distributed Consensus
In the Recap section at the very top, I mentioned that the innermost layer of the system does not need to solve the leader elections problem. This may also sound controversial, and this also requires a clarification.
On the one hand, of course, some form of leader elections on the side that executes the commands would be necessary. After all, what do we do if the node that is currently assigned to execute the commands has died? Or what if it’s just too slow to respond?
Even worse: What if that node went blank, the system has moved on with another node as the primary transactions executor, and then the original node comes back into play, as good and new, albeit way behind the current state?
Well, two things. Two and a half, actually.
First, once this original node begins processing commands, it will immediately observe that the command that it has just processed was already taken care of. It would see this upon attempting to persist the result of execution of this command.
Because the response from the stream of mutations (and/or from the stream of responses) would be “persisted, thank you, and, FYI, this blob of data was already added”.
This is where the first-and-a-half thing comes into play. Not only this node would know it was “logically evicted”, as someone else, a new “leader” in processing the commands, is already doing its job. This node would also know who this new leader is, as of the time that command was executed. As long as the record persisted into the “mutations” or the “results” contains the information about it, which it should.
Moreover, if the system has some “logical orchestrator”, then this extra information would, in addition to the ID of the “leader” as of that particular sequence ID, contain the “system topology epoch” index.
Every time the leader changes, this “system topology epoch index” is incremented. There is more to be said about this concept. For example, when a new type of command is added, or when the schema of the commands or responses or of data is altered, this also triggers an increment of this “system topology epoch index”. For now though, it would suffice to note that not only our original node knows it is no longer responsible for executing the commands; it can also confirm that the “leader transition” operation has completed successfully, and that the new leader has successfully taken up on their task, as evident by the increased index of the “current system topology epoch”.
This original node can then demote itself into a “hot standby”, if its currently running version of the code & data schema are compatible with the “current” epoch of the system topology. Or it can shut down gracefully, to be restarted automatically on an updated version of the code, as a new “hot standby”.
We are coming to the second thing: who and when and where performs the updates to this “system topology” journal”?
These questions are also straighforward.
Where? In the very commands log. The meta-command looks like this:
Node ABC.DEF.XYZ, please take over commands execution starting from sequence ID 123’456’789
. The now-hot-standby nodeABC.DEF.XYZ
is presently looking at the command123’456’789
and waiting for the respective record from the mutations stream. All it needs to do is execute this command itself, publish the result and the mutation into the respective streams, and continue operating as the leader.Who and when? An external observer, when it notices that the current leader has choked on the next command for an unacceptably long amount of time, say, 0.5 seconds. Yes, this can and should happen this quickly, because why wait?
Aha, you say! So there is an external observer! How do we know it’s alive and well, and don’t we need to have a leader-elected forum of multiple nodes, to ensure this external observer is always up and running?
Good questions. But the answer is: not really.
The magic is in the fact that we already have the Total Order Broadcast (TOB) engine, which stamps an always-increasing index on each command that enters the system. And the correctly implemented TOB Engine solves the problem of distributed consensus and of leader elections by itself.
So, yes, we may well have two or three observers, to be safe. Something would need to regularly watch the replication lag and other internal metrics on our nodes, if only to keep the dashboard and the alerts up to date. And yes, it would be one of these observers that will initiate state transition, as soon as it sees that the current commands processor has choked.
But these observers, even if there are many of them, do not need to agree on a quorum of any kind. Their commands can not and will not conflict, because these commands go through the very same totally ordered stream.
In the “worst case” scenario, multiple “observers” would “order” different hot standby nodes to assume ownership of executing commands from a certain sequence ID. And, alas, we have a pur last write wins scenario: in the stable state every node knows exactly what is the “current” (the “latest”) state of the fleet, since one and only of those multiple commands would have the highest sequence ID.
And even if a few user-initiated commands would end up being processed by more than one “temporary leader”, thanks to replayability, the end user would remain blissfully unaware of the fact that quite some reshuffling may have happened behind the scenes.
Of course, the above assumes we have the Total Order Broadcast engine up and running correctly. Yes, this is a hard problem by itself. But it has standard solutions. A single-topic single-partition Kafka cluster, with each write acknowledged by the majority of partitions, is good enough.
Data Types and Transactions Logic
We are now coming to:
What format do the entries of the mutations stream have, and
In what language are the very commands / transactions implemented.
Vision
TL;DR: The “good enough” solution here, is to use a custom-built DSL.
Blast to the Past
My earlier ideas on this date back to 2013. What we’ve built back in 2015 .. 2017 is:
A bunch of macros for custom storage schema definition, ref. code
Wrappers to use this storage locally and via the REST API, ref. code.
No leader elections, but the orchestrator, that is assumed to not die simultaneously with the leader node.
Effectively, this was a combination of the Stream — an immutable append-only sequence of records — and a Storage — an abstraction on top of it that adds persisted data structures and transactional guarantees to how they are mutated.
What went well:
Strong schema for the stored data, and for the CQRS commands mutating it. This absolutely did fly. Static typing, both for the contents of the in-memory views, and for the request/response blobs, does miracles. I can not stress enough how important type invariants are when dealing with the core data of a business.
The idea that a single transaction results in a single “atomic” mutation to the data, which is stored as a single entry in the mutations log. In other words, no matter how many changes have been made during the execution of a single command, these changes are batched together and considered an atomic “transaction” mutations-wise. On the side of the hot standby, or of a read replica, the mutations that belong to a single logical transaction can only be applied together, as applying them partially and/or performing a concurrent read query would result in a “dirty read”, or in reading otherwise corrupted data.
Separate the critical path from the rest of the logic. Data massaging, for example, happens before the “critical path”. Once all the mutations are performed, and the command/transaction is logically committed, further operations (such as sorting or formatting the results before returning them) also take place outside the “critical path”.
A meta-language to define transactions. On the low level we pretty much mimicked the interface of a
map<>
, which is a C++ dictionary container. Whatever changes made to each individualmap<>
were automatically journaled. Moreover, the “transaction” was trivial to “abort” (roll back) by throwing an exception. Once the “user code” is done executing, the accumulated list of mutations was either persisted to the “mutations” stream, or “applied back”, so that the in-memory data remains intact.PubSub-first. A major design consideration was that it is straightforward to implement a piece of code that subscribes to the feed of data updates. When operating outside our stack (say, in F#, which we did), the user is responsible for parsing the mutations log. Unlike today, back then maintaining custom materialized views was not something we considered important. We did export the entire schema in multiple programming languages though, which was a nice touch.
Back to 2023
Let me now outline how I would build the above today in the proof-of-concept / “gentle introduction” setting.
First of all, we need to identify the problems with the approach from 2015. I’ll present them loosely in the order in which they are worth taking care of.
Short-Term
The short-term objectives are what should be taken care of in the POC.
High Availability. Back then we relied on the orchestrator not dying at the same time as our leader does. Statistically speaking, this was — and is! — a very safe bet; this assumption offers many many nines after the decimal point. Nonetheless, for the system to truly be highly available and self-healing, this is not the shortcut I would be comfortable taking today. A proper Total Order Broadcast engine it should be.
Snapshots. Our implementation from eight years ago did not have data snapshots. Nor did it have binary-encoded blobs, for what it’s worth. The source of truth was effectively a single gigabytes-large one-JSON-per-line text file. Today we can do a lot better.
Richer data structures. Back then we only had Dictionaries and Relationships. Dictionaries are key-value stores (“just a hashmap!”), and relationships are effectively join tables between these dictionaries. Today, I would plan ahead and design the means to introduce new data structures incrementally, so that their logic can be managed “in the user space”, outside the “Event Sourcing framework”.
Materialized views as first-class citizens. This is a major extension to the “PubSub-first” approach. Back in 2015 we knew we needed to let users subscribe to strongly-typed streams of mutations. But that was it. Today, as an architect, I fully embrace the idea that the domain that owns certain data should also own the method and apparatus that builds and maintains the materialized views of this data. For the POC though an approach based on scaffolding would do.
Faster interfaces. We got away with “RESTful” HTTP APIs back in the days. Today, at least, gRPC is a de-facto standard. Streaming, of course, as one-off gRPC requests are just too slow. Also, I’m a big fan of sidecars and client libraries, as it benefits the design of the system greatly when whatever is best done on the caller side is done on the caller side.
Examples of data structures to add may include singly- and doubly-linked lists and priority queues right away. These are the familiar ones from Redis, priority queues are just the sorted sets.
Also, keep in mind that to be Event-Sourcing-friendly, the mutations to these data structures should be reproducible. In particular, it means that the order of iteration over the keys of a dictionary should be fixed; although I’m repeating myself.
Long-Term
My long term plans for Event Sourcing span decades into the future 🙂 and include, among other things, custom hardware and interplanetary storage & compute.
I will outline a few of them here. Just the ones that I am both comfortable to claim they would be essential for success, and the ones that I have a clear picture of how to implement.
These long term plans do not have to make it into any “gentle” POC whatsoever. But a well-done POC should be designed with these future requirements in mind.
Schema evolution support. Granted, we had it back in 2015, in quite a powerful way. But implementing deeply templated C++ code, albeit from an auto-generated boilerplate, was not a trivial task. Today, I would make it easy to evolve and obsolete some data, so that making use of this process is safe and effective.
Optimizations. There are tons of optimizations we did not do back in 2015. There are quite a few on my mind now, and today I would of course design the Event Sourcing framework keeping them in mind. Admittedly, no optimizations are needed to support “just” thousands of transactions per second. But to save the planet instead of warming it up by wasting CPU cycles, at least zero-copy binary-encoded commands are must-have in my book.
The DSL. The crown jewel of this design is the DSL to implement transactions.
The DSL
A domain-specific language used specifically to define the current and the future state of an Event-Sourcing-based system. This DSL covers many areas, specifically:
The schema.
Of the data stored in the system.
Including, ideally, extra invariants that can be statically checked.
Of the commands and queries and the responses to them.
The schema should keep in mind backwards- and forward compatibility.
This includes the evolution of the underlying data.
And the versioning of commands/queries.
The code to be executed on the “critical path”.
The very invariant checks and the very mutations of the underlying data.
A.k.a. the “transactions” business logic.
Generating the output to be sent back to the user.
The “materialized views” code and meta-code.
Materialized views code: Effectively, the “read path” for eventually consistent queries, as well as the “hot backup” state maintainer.
Meta-code: the template for the scaffolding component that other teams can use to spin up their own materialized view to later slice & dice the data as they please.
The slow-running schema harmonization code.
For the slow-running “schema migrations”, where in order to properly retire a certain field every single key of a respective data structure must be
touch
-ed via a logically no-op command that would “copy-on-write” the record from an old format into a new one.
The perfect DSL would have the following properties:
Ensure O(1) transaction execution. For instance, no for-loop over collections should be allowed. Overall, I’m thinking it should be a language of primitive recursive functions, which is not Turing-complete by design, but much easier to reason about. Also, it would be nice to statically prohibit commands/transactions that may take over some fixed number of CPU cycles. For bonus points, if high throughput is required, we can also prohibit commands from making too many “random reads” from various parts of the core data, to force the developers to maintain data locality wherever needed.
Easy to enforce invariants. Most invariants should be enforced by the storage type system. But there are always exceptions. Imagine the underlying data model contains the hierarchy of folders, but a folder can only be deleted if it is empty. It would be awesome if a language of formal proofs, such as Coq, could be used (and required during the presubmit checks / code review level!) to ensure no inconsistencies down the road.
JIT-friendly. Strictly speaking, JIT is unnecessary for the above system to function. Materialized views are quick to spin up, and local build-run-test iterations would naturally be quite fast. On the other hand, being able to run “custom” code against the presently-in-memory state of data on a local (or otherwise a non-prod) box would be a huge productivity boost. It’s okay if the JIT-compiled code is not optimized; what’s important is that one-line code changes are quick to integrate and run against the already-in-memory data.
At this point it’s worth noting that the “front-looking” shape and form of the DSL are not important. To a developer, The DSL itself look like Lisp or like Python or even like YAML. The idea would not change.
What I am convinced about is that it should be ESCaC, “Event Sourcing Configuration as Code”.
It’s a repository, where a single commit changing the schema and the code can go live upon approval as long as it passes all the checks. And force-commits should not be allowed. Although, realistically speaking, they would not be a problem: the very event log would contain the contents of that repository at the moment the new configuration was pushed.
Afterword
I meant to mention this in the text above, but somehow managed to keep it “simple”, albeit long. So, here it goes in the afterword.
The Event Sourcing way of working with the data is the functional programming way of designing a database.
And the state of the data, together with all the means to mutate and access it, is a Monad. Here. I’ve said it.
For almost a decade I keep dreaming of a world in which SQL is merely one of the ways to query the data, and mostly in the OLAP setting. OLTP transactions, on the other hand, should:
Be strongly consistent across all the domains of the business,
Always mutate the single source of truth of the data, to avoid data silos,
Run at high throughput and low latency, and
Allow for simple maintainability and extensibility.
In such a world distributed locks, two-phase commits, and compensating transactions are non-existent. Engineers focus on building their [micro]services, and occasionally they take a few hours of their time to extend the code that has to do with the code data of the system.
And this latter activity is a joy, in stark contrast to modern-day spaghetti code where relational and non-relational databases and caches and [thread-]local variables are all mixed up for the sake of faster delivery a few quarters ago.
In such a world the auxiliary code and the code that has to do with managing the distributed state machine of the whole business are strictly separated and follow different review / approval / deployment paths. Want to update the JSON format of your API in v2.1? Go ahead, you don’t need to touch the critical-path data mutations code a tiny bit. Need to extract another field from the source of truth? It’s a dedicated, albeit small, change, and make sure it’s reviewed carefully before making it to production … in a few minutes.
And in such a world all conversations about domain boundaries are resolved before they are started. They are resolved by asking one simple question: what, if any, new view of the data, with what throughput / latency / scalability characteristics, do we need to launch this new user-facing feature?
Because everybody knows that if we are talking about the data of any non-zero degree of importance, the single source of truth for this data is in the Event Log, and everything else is and should be derived from this source of truth via materialized views.
That’s what engineering enablement looks like to me. And I hope to live long enough to see this vision executed upon.