Total Order Broadcast via Conditional Writes
Or how to derive distributed consensus from DynamoDB’s consistency model.
Disclaimer: This idea is likely not feasible if the traffic to handle exceeds roughly one hundred records per second. Still, it is worthy of a POC, and it is neat enough academically to walk through.
Problem Statement
So, for Event Sourcing to work correctly, it is essential for the commands the core system sees to come in a totally ordered way. Without loss of generality, it would be great to have each command come with its own, pre-stamped, Sequence ID, so that:
Once a Sequence ID is assigned to a certain command, it always points to this command,
Once a command is assigned a Sequence ID, this command always has this Sequence ID,
There are no gaps in the sequence of Sequence IDs, and
The exactly-once guarantee, via Idempotency tokens, is respected at the level of the component that assigns Sequence IDs, so that if the same command is sent to it again, instead of having a new Sequence ID assigned to it, the previously assigned one is returned.
(In fact, the above is a slightly more restrictive set of requirements. But it is easier to reason about the top-level problem if these restrictive requirements hold true.)
It also goes without saying that, in addition to the correctness requirements above:
The commands with the Sequence IDs are persisted, so that it is always possible to (quickly) retrieve a command by its Sequence ID, and
A real-time push-enabled streaming interface is exposed, so that the listener / subscriber / consumer can receive new commands as their Sequence IDs are stamped with low latency.
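To make the correctness requirements concrete, here is a minimal single-process reference model. It is only a sketch: the class name `InMemoryIndexStamper`, the choice to start Sequence IDs at zero, and the method names are assumptions made for illustration, and it deliberately ignores durability, fault tolerance, and streaming.

```python
from dataclasses import dataclass, field


@dataclass
class InMemoryIndexStamper:
    """Reference model only: one process, no persistence, no failover."""

    commands: list = field(default_factory=list)   # list position == Sequence ID
    by_token: dict = field(default_factory=dict)   # idempotency token -> Sequence ID

    def stamp(self, token: str, command: bytes) -> int:
        # Exactly-once: a replayed token gets its original Sequence ID back.
        if token in self.by_token:
            return self.by_token[token]
        seq_id = len(self.commands)     # next unassigned index, hence no gaps
        self.commands.append(command)   # append-only, so an ID never changes its command
        self.by_token[token] = seq_id
        return seq_id

    def get(self, seq_id: int) -> bytes:
        # Retrieval of a command by its Sequence ID.
        return self.commands[seq_id]
```

Everything that follows is about keeping these same invariants when the process running this logic is allowed to fail.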
Since correct Sequence ID stamping is the core of the system, the logic that stamps them has to be durable and fault-tolerant. This is where we enter the territory of leader elections and distributed consensus: ensuring that the system is self-healing, so that even when parts of the system fail, it continues to behave correctly as seen by the end users, with little or no performance degradation.
(Academically speaking, Total Order Broadcast as a problem is equivalent to the problem of distributed consensus, since a correct solution to either one yields a solution to the other. It is a major leap in reasoning about distributed systems that, instead of talking about “consensus” as applied to the contents of the database, one can talk about the total order of commands, from which consensus on the DB contents follows naturally in one step. But this is a subject for a different, and longer, post.)
Naturally, as we are not just academically thinking theoreticians, but also engineers, we need to build this very Sequence-ID-assigning component. For the record, I have not seen a canonical name for it in the industry just yet. Most commonly used are the terms Indexer or Sequencer. I prefer to use the term Index Stamper, since this component is effectively a pass-through bus that “stamps” the indexes onto each message (command) that goes through it.
A Non-Solution
If I were to POC such a system today, I would simply use a single-node single-threaded “critical path” loop, where one CPU thread is kept busy with one task and one task only: stamping sequence IDs.
With proper outside-main-loop binary format massaging and batch data prefetching, it is rather trivial to stamp indexes at a rate far exceeding one gigabyte of data per second, which is more than enough for the vast majority of modern problems.
After all, the payloads of commands rarely exceed a kilobyte in size on average (resulting in 1M+ indexes stamped per second), and, if they do, it is easy to first store their bodies in some key-value store, and then stamp indexes onto the references to those bodies, which would be mere dozens of bytes each.
If traffic requirements are not that tight, I would seriously consider building this POC based on a single instance of Redis, with Lua doing all the job of checking the idempotency token and stamping the next available index in a single “Redis transaction”.
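A rough sketch of what such a script could look like is below. The key names (`eventstore:tokens`, `eventstore:total`, `eventstore:commands`) and the function name `stamp` are made up for illustration, and `client` is assumed to be a redis-py `redis.Redis` instance. The point is only that the token check and the index assignment happen inside one atomically executed script.

```python
# Redis executes a Lua script atomically on a single node, so the token
# check and the stamp cannot interleave with another client's call.
STAMP_LUA = """
local seen = redis.call('HGET', KEYS[1], ARGV[1])
if seen then
  return tonumber(seen)           -- token already stamped: return the old index
end
local index = redis.call('INCR', KEYS[2]) - 1
redis.call('HSET', KEYS[1], ARGV[1], index)   -- token -> index
redis.call('HSET', KEYS[3], index, ARGV[2])   -- index -> command body
return index
"""


def stamp(client, token, body):
    """Return the index for `body`, reusing the old one if `token` was seen."""
    return client.eval(
        STAMP_LUA,
        3,                       # number of KEYS passed to the script
        "eventstore:tokens",     # KEYS[1], hypothetical key name
        "eventstore:total",      # KEYS[2], hypothetical key name
        "eventstore:commands",   # KEYS[3], hypothetical key name
        token,                   # ARGV[1]
        body,                    # ARGV[2]
    )
```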
Unfortunately, tempting as these solutions are, they are not safe enough for production use. For a single critical reason: they cannot uphold their consistency guarantees if this single node goes down.
Yes, the consistency requirements are extremely tight for Event-Sourcing-enabled systems. Even a single event lost can cause major trouble down the road; and having the same Sequence ID stamped onto two different commands is a disaster in the making.
Even though Redis Cluster is generally praised, it does not offer the guarantees required to run the system at scale. And custom solutions for leader re-election and reconciliation of stamped indexes are just too hard to fit into a POC.
The Canonical Solution: A Message Bus
The safest solution is to build the Index Stamper based on an existing primitive that has leader election guarantees built into itself.
The canonical solution might be to use a single-topic single-partition Kafka. Redpanda would do just fine, if you need more throughput.
It is not as trivial though, since we need to check the idempotency token before pushing the freshly arrived command into this topic. It may happen that two nodes have independently checked the same idempotency token, and then, since both of them have received the confirmation that this token was not seen before, both add the same command into this single partition of a single topic. Both Kafka and Redpanda are, by design, not intelligent enough to deduplicate messages based on their contents. For more details, read up on the “dumb broker smart consumer” paradigm.
In fact, the easiest way to build a strongly consistent distributed system that will respect idempotency tokens correctly is to use two single-topic single-partition streams. One of them is the “input stream”, where duplicate commands may emerge. But this “input stream” stamps the Kafka offset onto each message that enters it, and then another processor goes through this input stream and sanitizes it, eliminating the duplicates.
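The second processor can be sketched as a pure function over the input stream. This is an illustration under assumptions: each input record is modeled as an `(offset, token, command)` tuple, the function name `sanitize` is invented here, and the set of seen tokens is kept in memory rather than in durable storage.

```python
from typing import Iterable, Iterator, Tuple


def sanitize(
    input_stream: Iterable[Tuple[int, str, bytes]],
) -> Iterator[Tuple[int, str, bytes]]:
    """Yield (sequence_id, token, command), dropping repeated tokens."""
    seen = set()
    next_seq_id = 0
    # The input is already totally ordered by its offset in the partition.
    for _offset, token, command in input_stream:
        if token in seen:
            continue  # a duplicate that slipped in; the first occurrence wins
        seen.add(token)
        yield next_seq_id, token, command
        next_seq_id += 1
```

Since the output depends only on the order of the input, any replica that replays the input stream from the beginning arrives at the same Sequence IDs.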
Another Canonical Solution: A Strongly Consistent DB
Another canonical solution, albeit an expensive one, may be to use a database that guarantees strong consistency in a distributed setting.
For example, Google Spanner, with its external consistency approach, may well be the Holy Grail, that can solve many distributed consensus problems including this one.
You may, and likely will, be [un]pleasantly surprised by both the cost (high) and the throughput (low) of a Spanner-based Index Stamping engine. This is understandable though: Spanner is so powerful by itself that it is far better to use it as, well, a relational database, not just as a single component of an Event-Sourcing-powered system.
Nonetheless, if your desired throughput is low, if you are on Google Cloud, and if you already have Google Spanner spun up for some tasks, adding another single-digit TPS to it to solve the Total Order Broadcast problem may well be the cleanest solution.
Another Canonical Solution: Via a Replicated State Machine
Another path worth taking is to consider an existing off-the-shelf solution that is built around maintaining a distributed, consistent, replicated state machine.
I have seen several products aiming at solving this problem so far. Having not touched any of them myself, I’ll refrain from comparing the pros and cons.
I will say though, that if a replicated state machine framework that you are considering offers a powerful programming language, this is a big plus, as this very language can be leveraged to take on quite some work, which an Event Sourcing system may have to do down the road.
For instance, in addition to ensuring that the provided idempotency token was not seen before, this replicated state machine runner may confirm whether the actor that attempts to execute a certain command — by pushing this command into the queue where it gets its Sequence ID stamped — is allowed to perform this very command.
If the set of possible actors and commands is small (which is often the case), it is reasonable to expect that the data required to authorize this particular command to be kept and maintained locally, on each node that runs this replicated state machine. Even if this node might not fully endorse further execution of this command, it may at least pre-authorize and greenlight it, marking the command as having passed some preliminary checks, saving on the CPU cycles later down the road as this command reaches the critical path of processing under the Event Sourcing paradigm.
And Another Canonical Solution
Given my architectural curiosity, I can not skip mentioning another possible solution to the distributed total order broadcast problem: the blockchain!
To spare the details, as this is off the main flow, I wrote about this idea in detail in a LinkedIn post.
The POC Experiment
Now, the purpose of this post is to share the idea I've been thinking of recently: what if we could use AWS DynamoDB conditional writes as the linearization primitive?
Sure, strongly consistent reads on DynamoDB cost twice as much as eventually consistent ones. But it’s a reasonable price to pay to not have to deal with leader elections, with distributed consensus, and/or with replicated state machines.
In fact, it is possible to use DynamoDB to solve the Index Stamping problem even without using its Transactions functionality, which allows operations on multiple items.
The “happy path” flow would look like this:
1. There is a dedicated key that stores the next unassigned index. Say, “eventstore-key:total”. Read it, in a strongly consistent way. Say, the value is 42.
2. Attempt to write the command under the key “eventstore-command:42”. Condition: this key should not exist.
3. If the write succeeds, update “eventstore-key:total” to 43. Condition: the value is still 42.
Even this “trivial” implementation may contain bugs. To begin with, the node that executes this happy path logic can die. If it dies between steps (2) and (3), before (3) is done, no new data can be written to this event store, as it is now in the “logical deadlock” state. So, the logic has to be amended:
4. If the write fails, read the value from “eventstore-key:total” again. If this value has changed since the last read in step (1), go to step (2).
5. If the value has not changed, we are in a situation where some other node has not done its job yet. First of all, it may be worth waiting a few hundred milliseconds and trying step (4) again, maybe several times repeatedly. If this does not succeed, go to the next step.
6. Failover mode. See if there is anything under the key “eventstore-command:42”. If there is, execute the write of 43 into the key “eventstore-key:total”, conditional on the value there still being 42.
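Steps (1) through (6) can be sketched end to end against an in-memory stand-in. Everything here is an assumption made so the example runs without AWS: `FakeTable`, `put_if_absent`, `compare_and_set`, `command_key`, and `stamp` are hypothetical names, and indexes start at zero. With real DynamoDB, the two conditional methods would roughly correspond to a `PutItem` with an `attribute_not_exists(...)` condition expression and an `UpdateItem` conditioned on the current value, and `read` to a `GetItem` with `ConsistentRead` enabled.

```python
import threading
import time

TOTAL_KEY = "eventstore-key:total"


class FakeTable:
    """In-memory stand-in for a DynamoDB table (hypothetical helper)."""

    def __init__(self):
        self._items = {TOTAL_KEY: 0}
        self._lock = threading.Lock()  # makes every single-item call atomic

    def read(self, key):
        with self._lock:  # stands in for a strongly consistent read
            return self._items.get(key)

    def put_if_absent(self, key, value):
        with self._lock:  # stands in for "write only if the key does not exist"
            if key in self._items:
                return False
            self._items[key] = value
            return True

    def compare_and_set(self, key, expected, new):
        with self._lock:  # stands in for "update only if the value is unchanged"
            if self._items.get(key) != expected:
                return False
            self._items[key] = new
            return True


def command_key(index):
    return f"eventstore-command:{index}"


def stamp(table, command, retries=3, backoff_s=0.2):
    """Store `command` under the next free index and return that index."""
    while True:
        total = table.read(TOTAL_KEY)                           # step (1)
        if table.put_if_absent(command_key(total), command):    # step (2)
            # Step (3). If this fails, another writer's failover already
            # advanced the counter for us; the command is stored either way.
            table.compare_and_set(TOTAL_KEY, total, total + 1)
            return total
        for _ in range(retries):                                # steps (4), (5)
            if table.read(TOTAL_KEY) != total:
                break          # counter moved on; retry from the top
            time.sleep(backoff_s)
        else:
            # Step (6), failover: the slot is taken but the counter is stuck,
            # so finish the other writer's job on its behalf.
            if table.read(command_key(total)) is not None:
                table.compare_and_set(TOTAL_KEY, total, total + 1)
```

Going back to the top of the loop re-reads the counter, which is one read more than “go to step (2)” strictly needs, but it keeps the control flow short. Note that this sketch does not handle idempotency tokens at all.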
In a nutshell, the above offers the “optimistic locking” protocol for concurrent writes. If there are multiple writers, but little or no contention, they all operate independently, without being aware of each other’s presence. But if the traffic grows larger, they begin stepping on each other’s toes more and more frequently. This is the “failure mode” of the DynamoDB-based index stamping design. Hey, I’ve never said it’s a perfect design! Just a useful thought experiment to go through.
With a bit of creativity, it is easy to extend the above protocol so that multiple writers could “cooperate”, using the very DynamoDB as the means for synchronization.
For example, the writer may well add some metadata right next to the message, under the “eventstore-command:*” key, such as its own machine name, software version, some run ID, and the wall time when it was started.
Then, on step (6), instead of checking whether the value for the “eventstore-command:${eventstore-key:total}” key exists, the code may well read this key. If it discovers that this key was written by the instance of the code that is of a newer version and/or by the instance of the code that was started later, the presently running instance may well yield control, or even gracefully terminate itself. So that, through the load balancer, further commands go to the more up-to-date version of the code, and contention is minimized.
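One possible shape for that yield decision is sketched below. The names `WriterInfo` and `should_yield` are invented here, versions are assumed to be tuples that compare field by field, and the tie-breaking policy (version first, start time second) is just one reasonable reading of “newer version and/or started later”.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class WriterInfo:
    machine: str
    version: tuple       # e.g. (1, 4, 2); assumed to compare field by field
    run_id: str
    started_at: float    # wall time in seconds; clocks may be skewed across machines


def should_yield(me: WriterInfo, other: WriterInfo) -> bool:
    """Decide whether this writer should step aside for `other`."""
    if other.run_id == me.run_id:
        return False  # the metadata was written by this very instance
    if other.version != me.version:
        return other.version > me.version   # a newer build always wins
    return other.started_at > me.started_at  # same build: the later start wins
```

For instance, an instance running `(1, 4, 2)` that reads metadata written by `(1, 5, 0)` would yield, regardless of which of the two started first.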
Idempotency tokens, of course, can also be stored in the same DynamoDB. In fact, in some “eventstore-idempotencykey:*” key space, the value for each key may well be just that: the unique sequence ID index that was previously stamped for this very command body. The failover / recovery logic will become even less trivial, since it is now important to maintain “transactional guarantees” between not just the “contents” of the event store and its size kept under the “eventstore-key:total” key, but also between the key space that has to do with previously seen idempotency tokens and the very commands for which the indexes have already been stamped.
Of course, to ensure correctness, both on the happy path and on the failover path, the reads from DynamoDB would need to be strongly consistent. Eventually consistent reads would be encouraged when one needs to grab the sequence ID of a command that is well known to have been persisted into the event store a while back.
For streaming, DynamoDB can natively send the stream of its changes into Kinesis. Realistically though, that may well be overkill. The component that writes to DynamoDB may well keep a ring buffer of the past several hours of commands in memory. This way, it could natively send a stream of commands, in the “tail -f” fashion, to its clients, maybe via a streaming gRPC interface, avoiding any and all Kafkaesque consumer groups logic.
Last but not least: snapshots. Naturally, if a client that requests commands from a certain sequence ID is too much behind, it is infeasible to read a large number of DynamoDB keys to fetch all command bodies. Instead, cheap S3 snapshots can be used.
The streaming gRPC response to the “please stream me all the commands from sequence ID 42” may well then be:
Either: “The current sequence ID is 123, so here you go: data_42, data_43, data_44, …”,
Or: “You are too far behind, please parse commands 42 .. 9999 from s3://eventstore-commands/snapshot-0-9999.bin and come back”.
The client then gets the best of both worlds: compact and fast-to-crunch snapshots downloaded from off the critical path when they are behind, combined with a low-latency stream of the current events if they only skipped a few.
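The either/or decision can be sketched with a bounded in-memory buffer. The class name `CommandTail`, its methods, and the tuple-shaped return values are assumptions for illustration; a real implementation would stream over gRPC and would need to know which snapshot covers which range.

```python
from collections import deque


class CommandTail:
    """Ring buffer of recent commands with a snapshot fallback."""

    def __init__(self, capacity, snapshot_uri):
        # deque(maxlen=...) silently drops the oldest entry when full.
        self._buffer = deque(maxlen=capacity)   # holds (seq_id, command) pairs
        self._snapshot_uri = snapshot_uri

    def append(self, seq_id, command):
        self._buffer.append((seq_id, command))

    def read_from(self, seq_id):
        # An empty buffer (e.g. right after a restart) is treated the same
        # as "too far behind": the client is pointed at the snapshot.
        if not self._buffer or seq_id < self._buffer[0][0]:
            return ("snapshot", self._snapshot_uri)
        return ("commands", [c for s, c in self._buffer if s >= seq_id])
```

With a capacity of three and commands 40 through 44 appended, a request from sequence ID 43 gets the two buffered commands back, while a request from 41 is redirected to the snapshot.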
I would like to reiterate that this section outlines a POC. The DynamoDB-based solution will likely be no cheaper than the Kafka- or Redpanda-based one; and neither will it hold more commands per second.
It just is something that can be POC’d in a single evening. And I, for one, am finding such exercises both useful and rewarding.