This post presents a practical and pragmatic solution to bring the benefits of Event Sourcing into an organization. Incrementally, in a non-intrusive way, and with as little cost and disruption as possible.
Architecture
Separate the read path from the write path
In the true Event Sourcing paradigm, the stream of mutation requests to the data is the source of truth. This generally is too much to ask for the first step. A gentle way is to speak about separating the read path from the write path first.
The idea boils down to two simple steps.
One: Route the reads through logical read replicas.
Reads are eventually consistent by default. Strongly consistent reads are possible too, but are more expensive, as they increase contention. The end goal is to be infinitely scalable on the eventually consistent read path, while handling a sufficient number of mutations per second on the write path.
My ballpark estimate for the throughput of the write path is ~1K TPS trivial, ~50K TPS if "just" done right, ~1M+ TPS if throughput truly is the goal.
Two: Make sure mutations are applied sequentially, from a single source.
Unless high-frequency-trading levels of latency are essential, to handle loads under ~50K TPS most systems already have, at any given point in time, a single leader that is responsible for mutating the data. The key part here is to take this further and commit to processing these mutations sequentially.
At this point, the only "leap of faith" needed is to internalize the idea that committing to having one CPU core to perform all the critical-path mutations one after another is a liberating architectural decision, not a self-imposed constraint.
As I am writing this in 2023, V8 and Redis have proven beyond reasonable doubt that a single-threaded process can easily crunch through tens of thousands of mutations per second. This gets into millions of mutations per second when optimized, as soon as request preparation and cache warmup are handled in advance by a separate thread pool.
Besides, leader elections are a commodity these days, either with Raft (Consul, etcd) or with message-passing technologies that guarantee total order (Kafka, Redpanda). Therefore, the single CPU of a single machine is not a single point of failure. Had this been so, most RDBMS solutions, as well as Redis and Kafka clusters, would be vulnerable by design, which is the opposite of what we observe in the real world.
The read replicas above are referred to as logical read replicas, since they do not have to be dedicated nodes. For performance reasons, it may be beneficial to keep several logical copies of data on the same node, so that in addition to handling the critical-path mutations this node alone can serve a high load of read requests.
It also goes without saying that eventually consistent reads can hit any of the replicas, and thus are infinitely scalable horizontally. Strongly consistent reads are possible too, although, logically speaking, they are about as expensive as mutations.
Produce the stream of all data mutation events
This is the second "leap of faith" step: one that is easy to see in hindsight, but requires quite some effort to justify up front.
Ultimately, if the data in question belongs to the domain that many users need to access, those users will begin asking to have a materialized view of your data for themselves.
If your data fits one database, and if this database offers native means to add a read replica, the problem is non-existent: you add this replica and you’re done.
However:
If your data does not reside in a database that has native means to add a read replica, you’re screwed.
If your data is split over more than one database, while the materialized view that your customer needs must be strongly consistent, you’re screwed. (Although, in this case, you are already screwed, since if you have more than one database, then you either have cross-DB transactions — bad — or you have potential inconsistency issues — ugly.)
If the materialized view the client team needs should be in a different format, and your database does not expose an easy-to-parse output stream, you’re screwed (although Debezium might help), and finally
If your downstream user needs not just the “data changed” events, but why a particular bit of data has changed, you’re screwed. (For example, if some record was erased, was it really deleted, or just renamed, or migrated from one parent to another, with its primary ID changing in the meantime?)
One standard solution to this problem is to use the Transactional Outbox pattern, often referred to as just the “Outbox”. The idea is that prior to committing any changes to your database you also journal the very fact that these changes have been made, as a separate row in a separate table, as part of the same transaction in which the data itself is changed. This separate table then contains what effectively is the journal of all the updates you have made to your data. Another, likely async, worker is tasked with scanning the rows from this journal table one by one, submitting them to a message broker or bus, and removing them from the source table (the “outbox”) as soon as the confirmation from the broker is received.
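To make the write path concrete, here is a minimal sketch of the Outbox pattern in Python, with sqlite3 standing in for the real database; the table and field names are made up for illustration.

```python
import json
import sqlite3

db = sqlite3.connect(":memory:")
with db:
    db.executescript("""
        CREATE TABLE accounts (id TEXT PRIMARY KEY, balance INTEGER NOT NULL);
        CREATE TABLE outbox (id INTEGER PRIMARY KEY AUTOINCREMENT, payload TEXT NOT NULL);
    """)
    db.execute("INSERT INTO accounts VALUES ('alice', 1000), ('bob', 1000)")

def transfer(sender, recipient, amount):
    # The data mutation and its journal entry commit in ONE transaction:
    # either both are persisted, or neither is.
    with db:
        db.execute("UPDATE accounts SET balance = balance - ? WHERE id = ?", (amount, sender))
        db.execute("UPDATE accounts SET balance = balance + ? WHERE id = ?", (amount, recipient))
        db.execute(
            "INSERT INTO outbox (payload) VALUES (?)",
            (json.dumps({"type": "transfer", "from": sender, "to": recipient, "amount": amount}),),
        )

def drain_outbox(publish):
    # The async worker: publish each journaled row in order, and delete it
    # only after the broker has confirmed receipt.
    for row_id, payload in db.execute("SELECT id, payload FROM outbox ORDER BY id").fetchall():
        publish(payload)  # e.g. send to Kafka and wait for the ack
        with db:
            db.execute("DELETE FROM outbox WHERE id = ?", (row_id,))

transfer("alice", "bob", 3)
drain_outbox(print)
```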
While the Outbox pattern solves the problem, it has three major shortcomings:
The developers must make sure to instrument every single code path of every single mutation, and ensure this remains the case in the future.
The Outbox pattern works if and only if all the data from your business domain is contained in the very database in which the Outbox table is created.
If the data contains hierarchical structures, who is responsible for the fanout when it comes to detecting which related entities may have been affected by a single mutation?
The last bullet point is important. Say, the “source” DB contains the data on folders, in a normalized way. The “DB” behind the materialized view, on the other hand, stores the data in a denormalized way, for faster querying. The data on folders is hierarchical, and some properties, say “is publicly accessible”, are inherited from the folders’ parent chain.
Therefore, moving a subfolder from one parent to another may well impact the “is publicly accessible” bit of data for a large number of folders; for almost the entire set of folders in the database, in an extreme case.
And seamlessly supporting data denormalization on the side of the downstream builder of the materialized view is a fundamental problem. I call it The Fanout Problem, and it never ceases to surprise me how many people who work with data daily underestimate its magnitude.
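To make the magnitude concrete, here is a toy model of the folder hierarchy above (all names are made up): a single re-parenting event touches one row in the normalized store, yet invalidates the denormalized bit for an entire subtree.

```python
# Normalized "source of truth": each folder knows only its parent and its own flag.
parent = {"root": None, "private": "root", "team": "private", "docs": "team", "img": "docs"}
public = {"root": True, "private": False, "team": True, "docs": True, "img": True}

def effectively_public(folder):
    # A folder is visible only if it and every one of its ancestors are visible.
    while folder is not None:
        if not public[folder]:
            return False
        folder = parent[folder]
    return True

# Denormalized materialized view: the inherited flag is precomputed per folder.
view = {f: effectively_public(f) for f in parent}

# ONE mutation in the source: move "team" out from under "private".
parent["team"] = "root"

# A row-level Outbox/CDC event would say: "row 'team' changed". But the rows
# that must be recomputed downstream are "team" AND all of its descendants:
stale = [f for f in parent if view[f] != effectively_public(f)]
print(stale)  # ['team', 'docs', 'img'] -- the fanout the single event never mentioned
```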
After much personal experience, I have come to regard the Outbox as simply a bad pattern. It is too easy to implement the wrong way. It requires a lot of work to implement correctly. And, most importantly, implementing the Gentle Event Sourcing © approach is something that can be done in about the same amount of time, while the rewards will be much greater.
Another standard solution to the problem of exposing a materialized view is CDC, the Change Data Capture.
Unlike in the Outbox pattern, with CDC, the design relies on the data update events exposed natively by the underlying database. Since most databases these days support leader-follower replication, it is safe to assume one can obtain access to this “raw” log and leverage it.
Therefore, the major pain point of the Outbox pattern — the risk of not capturing all the updates to the data — is non-existent with the CDC pattern.
However, the CDC pattern is not without its own problems. Specifically:
Much as with the Outbox pattern, CDC works well only if all the data is contained in the same database.
Much as with the Outbox pattern, CDC does not deal with the Fanout problem.
With CDC, it is often difficult or outright impossible to figure out the why behind a particular change.
Say, a record was erased from the database. Was it deleted by the user explicitly? Or wiped out by a cron job because it was not accessed for a long time? Or did it change ownership, in a way that required changing its ID, so that another, identical record was just created? What if the downstream user needs to display notifications, or even send emails, based on the reason behind this particular deletion? The downstream service cannot deduce this vital information from the CDC log entry alone, and it also cannot query the original service to figure out what has happened — the record was just deleted, after all!
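A hypothetical side-by-side makes the gap visible. Both payloads below describe the same deletion; the field names are invented for illustration.

```python
# What a CDC stream typically gives you: the "what", reconstructed from the WAL.
cdc_event = {
    "op": "DELETE",
    "table": "records",
    "key": {"id": 42},
    "before": {"id": 42, "owner": "alice", "title": "Q3 report"},
}

# What the downstream consumer actually needs: the "why", which only the
# application layer knows at the moment the mutation is requested.
domain_event = {
    "type": "RecordMigrated",          # not "deleted"! it moved and got a new ID
    "old_id": 42,
    "new_id": 1042,
    "new_owner": "bob",
    "reason": "ownership transfer requested by alice",
}
```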
So, we are getting to the point where we can postulate the problem statement:
The data to work with should be stored in one database (this is something the developers will need to refactor toward, if and when necessary), and
Ideally, for each atomic mutation of the data, we need to know what has prompted this mutation.
Personally, I prefer a more condensed phrasing for the above:
Spinning up a new materialized view of your domain’s data should be a one-click operation.
Here, I admit to cheating a little. Technically, employing CDC does solve the problem of constructing the materialized view downstream. Neither the fanout problem nor explaining “the why” behind each particular mutation is a strict requirement for building that materialized view and keeping it up to date.
Nonetheless, in my head, the problems are very tightly coupled together. If you own the data in some domain, and if you are approached with the ask of emitting mutation events to construct the materialized view downstream, chances are you will eventually encounter either or both of these problems: data fanout and explaining “the why”.
In the previous part, on separating the read and the write path, we talked about how easy it is to apply data mutation requests one after another, as long as they come in already ordered. We are now getting to the gist of this section: a good design introduces this pre-ordered stream of mutation events up front, before processing them. This is not a difficult task by itself (I will talk about the “listen to yourself” pattern later in the text), and it greatly simplifies the task of building fully enriched, fanout-friendly materialized views of your data by any number of downstream clients later on.
Theory
This is a bit of an academic detour. I found out the hard way that quite a few conversations on this topic, when they go deep, tend to converge to first principles of computer science. Hence this chapter.
Produce the stream of all data mutation events
Let me justify the "stream of all data mutation events" part first, and then get to separating the read path from the write path.
Total Order Broadcast
The fundamental observation to justify producing the stream of mutation events as the first-class citizen concept is that the total order broadcast problem is equivalent to the distributed consensus problem.
The world of distributed systems engineering internalized this long ago; for more insight I recommend Martin Kleppmann's "Designing Data-Intensive Applications", Leslie Lamport's early papers on logical clocks and causality, and various blog posts explaining how Raft is easier to understand than Paxos. The blog posts sometimes have the added benefit of nice interactive visuals.
Since the systems we design are not only consistent, but also highly available and durable, the data will need to be replicated. In the vast majority of modern systems (in all of them, really, if we exclude truly decentralized blockchains), replication is based on the idea of the replication log.
The replication log is the stream of mutations that the leader has applied, broadcast from this leader to the replicas, so that the replicas can apply the same set of mutations in the same order and arrive at the same state of data.
To ensure data integrity and transactionality, the data in the replication log is generally broken down into blocks, where the block itself is not meant to be split further. For instance, if Alice sends Bob three units of something, it is desirable that the readers can see the combination of { Alice, Bob } balances either pre-transaction or post-transaction, but not in between. This eliminates the failure modes known in the SQL world as dirty reads or phantom reads.
For the sake of completeness, it is important to note that "the reader is guaranteed to see a consistent state of data from some past moment in time" is not part of the definition of "eventual consistency", even though most systems these days interpret "eventual consistency" in a way that implies the above statement to be true.
Replayability, Reproducibility, and Determinism
The next question to ask is: why would we linearize the log of mutations as applied by the leader, when we could instead stamp total order indexes onto the incoming stream of mutation requests, which the leader then consumes in that order?
To keep this chapter academia-grade, an important comment has to be made here. For this paradigm shift not to introduce bugs, the mutations the leader sees have to be replayable, so that their results are reproducible. This has many practical implications.
The trivial ones are that referring to "local time" or using random number generators is prohibited in the "transaction running logic". Random numbers should be replaced by pseudo-random ones, and instead of the "local time", the "sequence ID" of the mutation request can be used.
If and when the exact time is needed, there is no harm in introducing "heartbeat" events of a desired frequency, so that, say, every second a "metronome tick" comes in, allowing the user to map the current message sequence ID to Unix epoch time in seconds in a 100% reproducible way. Also, the use of floating point arithmetic is discouraged, since CPUs of different architectures may produce different results, which can and will result in nasty discrepancies down the road.
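Here is a minimal sketch of what such replayable transaction logic could look like, with hypothetical event shapes: the "current time" advances only on metronome ticks from the stream, and randomness comes from a PRNG seeded with the sequence ID.

```python
import random

class ReplayableRunner:
    def __init__(self):
        self.now = 0          # Unix seconds, advanced ONLY by metronome ticks
        self.state = {}

    def apply(self, seq_id, event):
        if event["type"] == "tick":
            # A once-per-second heartbeat injected into the totally ordered
            # stream maps sequence IDs to wall time reproducibly.
            self.now = event["unix_seconds"]
            return
        # No time.time(), no os.urandom(): any replica replaying the same
        # (seq_id, event) pairs computes the exact same result.
        rng = random.Random(seq_id)
        self.state[event["key"]] = {
            "value": event["value"],
            "updated_at": self.now,
            "shuffle_salt": rng.randrange(1 << 32),
        }

runner = ReplayableRunner()
runner.apply(1, {"type": "tick", "unix_seconds": 1700000000})
runner.apply(2, {"type": "set", "key": "a", "value": 7})
```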
Non-trivial issues with respect to reproducibility have to do with incidental randomness, which is often introduced by various data structures without the developer's consent, and which developers tend to overlook later on. Of course, if the CPU architectures are identical on both the sender side and the receiver side, one has to work hard to write the code in a way that can introduce discrepancies; for instance, the floating-point arithmetic can be used "safely" in this setup.
However, if, say, a hashmap is used (and hashmaps are used a lot!), the hash value for a string can be different on an x64 machine vs. an arm machine. Big- vs. little-endianness can easily creep in too. Thus, something as benign as the order of iterating over a hashmap may, to the developer's surprise, differ from one invocation to another, affecting real-life tasks such as pagination in various weird ways. The same logic applies to xor-lists (which depend on the layout of data in memory), skip lists (which need to be non-random), disjoint sets (which are harder to implement deterministically, though replayability requires it), etc.
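Python happens to offer a handy illustration of this incidental randomness: by default, CPython randomizes string hashes per interpreter run, so the iteration order over a set of strings is not reproducible across invocations unless PYTHONHASHSEED is pinned.

```python
# Run this file twice: the printed order will likely differ between runs,
# because CPython randomizes string hashing per process (see PYTHONHASHSEED).
names = {"alice", "bob", "charlie", "dave"}
print(list(names))

# The replay-safe version: never depend on hash order; impose an explicit one.
print(sorted(names))
```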
Beyond Transactional Outbox
Back to the main plot. When all the computation is deterministic to the degree that replayability is ensured, having the total order stamped on the incoming requests is sufficient.
In practice, the ultimate system design may or may not use the incoming requests flow as the source of the replication log.
For example, if a certain transaction type contains an expensive precondition/invariant check, while the resulting mutation is either a trivial bit flip or does not happen at all, it would be wise not to have the recipients of the replication log redo all the computation that the leader has already performed. Thus, a better solution is to make the replication log that originates from the leader not the raw incoming request stream, but the stream of incoming requests enriched with metadata.
In an extreme case, if we can afford to make assumptions on the lowest-level data storage, this enriched metadata may well contain the possibly empty list of lowest-level data mutations. In a design doc that I wrote in early 2021, this lowest-level data storage layer was literally a single 64-bit indexed memory blob. In practice, higher-order abstractions can and likely will be used, up to and including some "big-data" SQL-like table, where the "trivial" mutations are CRUD operations on the values of particular columns of particular keys.
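In code, such an enriched log entry might look like the following sketch; the field names are illustrative, not prescriptive.

```python
# The leader consumes the totally ordered request, performs the expensive
# precondition check once, and broadcasts the request PLUS the verdict, so
# that the replicas only replay the cheap part.
enriched_log_entry = {
    "seq_id": 184467,
    "request": {"type": "transfer", "from": "alice", "to": "bob", "amount": 3},
    "metadata": {
        "preconditions_ok": True,          # the expensive check, done once
        # The possibly empty list of lowest-level mutations to apply verbatim:
        "mutations": [
            {"op": "set", "key": "balance/alice", "value": 2},
            {"op": "set", "key": "balance/bob",   "value": 1003},
        ],
    },
}

def replay(state, entry):
    # A replica runs no business logic at all; it just applies the mutations.
    if entry["metadata"]["preconditions_ok"]:
        for m in entry["metadata"]["mutations"]:
            state[m["key"]] = m["value"]
```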
Taking a small detour back from the world of theory into the world of practice, it is safe to say that the paradigm outlined above is the natural extension of the concept of the Transactional Outbox. One could refer to it as the Transactional Inbox, although the term is confusing, and I personally do not recommend adopting it.
The idea holds true though: instead of having the DB execution layer introduce total order to the transactions completed, stamp the respective indexes up front, possibly using a different architectural component, and then rely on replayability to propagate changes. I wrote about this in a LinkedIn blog post more than four years ago.
Last but not least: if you are looking for a practical implementation of a distributed CRDT-first database, Riak should very likely be on top of your list.
Separating the read path from the write path
If I were to go into full academia mode, I could go as far as postulating some modern-day specialized adjustment to the CAP and PACELC theorems.
The “Throughput-and-Latency-vs-Contention” Problem
Given we must ensure:
Strong consistency,
it is very difficult to simultaneously provide both:
High throughput with low latency, and
Support for highly contended transactions.
If the throughput-and-latency requirement is dropped, any distributed RDBMS does the job. Take Google Spanner with its external consistency guarantees if you can afford it. Look into CockroachDB if you’re on a budget.
When CRDTs Rock
If we drop the high contention requirement, we are squarely in the CRDT territory.
When mutation requests are essentially unordered, the solution is to “just” use the data structures that are designed to be self-correcting regardless of the order in which the mutations are applied. The "computation" then immediately becomes horizontally scalable, and can thus take place on a large number of CPU cores, threads, machines, or even datacenters.
An example could be global counters, where frequent increment-by-one requests originate from anywhere in the world. Clearly, an atomic variable on a single CPU can only handle on the order of a billion increments per second; this number only goes down if higher-level synchronization primitives are used, or if there’s more to the “application logic” than atomically incrementing a counter by one.
But it is also self-evident that these atomic increment-by-ones can be batched and grouped, yielding the same result over time. The system will become eventually, not strongly, consistent, but in practice a small lag is almost always acceptable, and, for atomic counter increments, the lag can easily be brought down to the amount of time that is negligible compared to what the end users can possibly notice.
(Read: batch so that each node sends at most 200 updates per second, and each datacenter sends only 50 updates per second, and, with worst-case added latency of (1000/200) + (1000/50) = 25ms, the problem is solved.)
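For reference, here is a minimal sketch of the classic grow-only counter (G-Counter) CRDT that makes such batching safe; the node names are hypothetical.

```python
class GCounter:
    # Each node increments only its own slot; merging takes the per-slot
    # maximum, so updates can be batched, duplicated, or reordered freely.
    def __init__(self):
        self.slots = {}

    def increment(self, node, n=1):
        self.slots[node] = self.slots.get(node, 0) + n

    def merge(self, other):
        for node, count in other.slots.items():
            self.slots[node] = max(self.slots.get(node, 0), count)

    def value(self):
        return sum(self.slots.values())

# Two datacenters batch increments locally and exchange state at their leisure.
us, eu = GCounter(), GCounter()
us.increment("us-east", 500)      # 500 local increments, sent as ONE update
eu.increment("eu-west", 200)
us.merge(eu); eu.merge(us)
assert us.value() == eu.value() == 700
```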
With persistent data structures, also known as immutable data structures, a lot of operations on data can be scaled horizontally in a similar manner. For instance, concurrent document editing, for products such as Google Docs, has been steadily moving from Operational Transformations (OTs) to CRDTs.
When CRDTs Don’t Nail It
The kryptonite for the CRDT approach is any problem where the order of mutations matters, and thus contention is impossible to avoid.
The textbook example I always use is to design a ledger-like system where each new user starts with a balance of one thousand units, and the mutation is always "send this (integer) number of units from Alice to Bob, as long as Alice's balance is non-negative after the transaction".
Clearly, conflicts are inevitable: if Alice has five units, and the system receives two concurrent requests where Alice sends Bob and Charlie three units each, at most one of these mutations can take place without breaking the invariant.
(Designing such a system to be high-throughput (1M+ transactions per second) and low-latency (~milliseconds per transaction for the end user) at the same time will require a far more sophisticated approach. To give you a glimpse, one could shard users by their IDs, reconcile everything within the shard in-place, and make some ~5000 batch cross-shard transactions per second, so that the “added” end-user latency is just a few milliseconds.)
At the same time, if the cap on the TPS of such a system is in the tens of thousands per second, the "trivial" single-CPU sequential approach does the job just fine.
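A minimal sketch of that "trivial" sequential approach for the ledger above: one loop, one core, a totally ordered request stream in, mutation events out.

```python
def run_ledger(ordered_requests, emit):
    # The single-threaded leader: consumes the totally ordered request
    # stream, checks the invariant, applies the mutation, emits the event.
    balances = {}
    for seq_id, req in enumerate(ordered_requests):
        sender, recipient, amount = req
        balances.setdefault(sender, 1000)       # every user starts with 1000 units
        balances.setdefault(recipient, 1000)
        if balances[sender] - amount >= 0:
            balances[sender] -= amount
            balances[recipient] += amount
            emit({"seq_id": seq_id, "accepted": True, "request": req})
        else:
            # Contention resolves trivially: the later of two conflicting
            # requests sees the already-updated balance and is rejected.
            emit({"seq_id": seq_id, "accepted": False, "request": req})
    return balances

run_ledger([("alice", "bob", 3), ("alice", "charlie", 998)], print)
```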
In fact, it is quite easy to take this single-CPU-execution solution to a million transactions per second. Have the threads outside the "critical path" prepare batches of requests and warm up the in-memory LRU cache, in a manner similar to CPU instructions prefetching and branch prediction. Then have that "critical path CPU-thread" take all the contention upon itself, with "batches" of ~thousands of transactions, executed with full knowledge that all the data that needs to be mutated is already laid out in memory in a specific way. As a matter of fact, if these "batches" are code-generated in raw assembly, it is easy to get this single-machine multi-CPU-core machine setup to handle tens of millions of transactions per second, as long as the throughput of warming up the cache allows for this.
The above, of course, assumes that the transactions are as trivial as "if Alice has at least three units left, decrement Alice's balance by three, and increment Bob's balance by three". Greatly simplifying the logic, this operation consists of two read-write "random" memory accesses (Alice's and Bob's balances), one branch (the if-statement), and two arithmetic operations (one addition and one subtraction). Each of these operations is something a modern CPU can do billions of times per second; thus, altogether, the above can indeed run at ~50M TPS within a single CPU core, as long as all the data is warmed up in memory. Pragmatically speaking, if your "atomic transaction" requires ten "branches" and a hundred "nontrivial mutations", the expected TPS number has to be adjusted proportionally.
Needless to say, the operations executed within the "critical path" processing logic should be kept as trivial as possible. If there is a JSON to be parsed, a string to be split, or an enum value to be resolved from a LONG_STRING_WITH_UNDERSCORES, these tasks can and should be performed outside the critical path. It also goes without saying that these "slow" operations are all independent, and thus can and should be parallelized. If your required end-to-end throughput numbers are low (say, tens of thousands of TPS), and if tasks such as JSON parsing have to be handled on the same node, it is not unusual to have one thread doing the "critical path" work, plus twenty separate threads "crunching the JSONs" alongside it, as sketched below. It is also immediately clear from this design that the path to optimize further is to move the "slow" logic outside this box, likely into some "facade" frontend cluster, which would, at the cost of a couple of milliseconds of end-to-end latency, feed the innermost "critical-path" logic with a ready-to-be-executed stream of high-contention, totally ordered, and well-serialized mutation requests.
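A sketch of that shape, with queue.Queue standing in for the facade: twenty parser threads do the slow work, while a single applier thread takes all the contention upon itself.

```python
import json
import queue
import threading
import time

raw, ready = queue.Queue(), queue.Queue()

def parser():
    # Slow, embarrassingly parallel work: JSON parsing, string splitting,
    # resolving enums from LONG_STRING_WITH_UNDERSCORES, and so on.
    while True:
        ready.put(json.loads(raw.get()))

def applier():
    # The single critical-path thread: only trivial, pre-digested mutations.
    # (Re-sequencing parsed requests back into total order is elided here.)
    balances = {}
    while True:
        req = ready.get()
        balances[req["to"]] = balances.get(req["to"], 0) + req["amount"]
        print(balances)

for _ in range(20):
    threading.Thread(target=parser, daemon=True).start()
threading.Thread(target=applier, daemon=True).start()

raw.put('{"to": "bob", "amount": 3}')
time.sleep(0.1)  # let the toy pipeline drain before the process exits
```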
To recap: for most practical throughput & latency requirements, it does no damage to the overall characteristics of the system to confine the mutation requests to a single CPU of a single machine.
It is true that when ~billions of trivial checks & mutations per second are required, sophisticated solutions have to be employed. It is then often best to persuade the product owners to drop the strong consistency requirement, as it is orders of magnitude easier to build a system that is allowed to drop or overwrite a bit of data once in a blue moon, compared to building a bulletproof strongly consistent system. The designs then quickly get into the CRDT- and eventual-consistency territory, where a wealth of knowledge exists on how to do things right.
The message of this blog post is quite the opposite though. Even when strong consistency is needed, tens of thousands of transactions per second are rather trivial to process, and the high-level design can handle ~millions of them, if the implementation is carefully optimized in the right places.
I may well have called this post “Befriending Event Sourcing with High Performance”. But the goal right now is different. The important part is that Event Sourcing, when done right, does scale to impressive performance figures. But, at the same time, there exists a gentle way to introduce Event Sourcing into the organization, and this gentle way only requires a small buy-in to get the ball rolling.
To finish this part: thanks to a friend of mine, I came across this work by Martin Kleppmann, also dating back to 2019, which outlines a very similar pattern end to end. Figure 3 in it, which I take the liberty to paste here in full, describes almost exactly the design I am advocating in this post, even though it is a bit more specific to financial transactions.
The Full Diagram
To end the “theory” section on a bright note, here is my Event Store Design Doc Miro board from early 2022, which presents a superset of the ideas outlined above.
Design
The approach presented in this document closely resembles the "Listen to Yourself" pattern. The pattern itself is not new; it was documented several years ago, and it is only now slowly gaining popularity. Personally, I believe it’s a mistake that the pattern is not yet widely adopted. But better late than never.
Listen to Yourself
Here’s a diagram from this post on Redhat.com. Note that “Service A” and “Service B” can be the same service (as presented by Oded Shopen in the blog post linked above).
The Small Pieces
The high-level design for the Gentle Introduction consists of just a few components:
The Total Order Broadcast Engine (TOB), which essentially is a black box that accepts incoming requests, stamps them with incrementing-by-one indexes to ensure total order, and forwards them further.
The Transactions Runner, which sits behind the TOB, accepts the ordered stream of transactions, applies them to the data, and performs two functions: it produces the stream of mutations to the data, and it maintains the materialized view of the data. Depending on the final design, either of these two can be the primary objective. For the Gentle Introduction, it is safe to assume that this component does both, albeit in a very limited fashion. Write requests, as well as strongly consistent Read requests, have to go through the Transactions Runner.
The Replicas, which essentially sit behind the same TOB engine, but also listen to the Transactions Runner that is currently the leader. The Replicas may well run the very same code as the Transactions Runner, although, for performance optimization reasons, they would benefit from getting a hint from the Transactions Runner in the form of the “execution metadata” for each transaction request. Eventually consistent Read requests go through the Replicas. A Replica may become the Leader, i.e. the Transactions Runner, if instructed by the Orchestrator, for example when the current Transactions Runner is shutting down.
Storage, which effectively is three-in-one: the storage for the totally ordered transaction request logs, the storage for the (also totally ordered) transaction execution metadata logs, and the storage for data snapshots, for faster provisioning of new replicas as needed.
The Frontend Cluster, or the Facade, so that the end user can send simple one-off RESTful read and write requests (queries and commands, respectively), and receive responses to those one-off requests. Effectively, the frontend cluster routes eventually consistent reads to the replicas, and, for writes and for strongly consistent reads, it acts as an async-to-sync gateway, since in the Listen-to-Yourself pattern the channel via which the request is sent into the system and the channel in which the result appears are two logically different channels. The frontend cluster terminates incoming requests and keeps them in memory while each respective request goes full circle through the system.
The Orchestrator. It may well be unneeded if each of the components is implemented in a fashion that ensures high availability, for example if a single-topic single-partition Kafka stream is used to stamp total order indexes onto the incoming requests, and if a distributed ACID RDBMS is used to evaluate those requests as transactions in an idempotent setting. For a full-blown low-latency high-throughput system, of course, the orchestrator is responsible for keeping the leader up and running, both during planned version upgrades and in case the current leader is misbehaving or not responding.
In fact, many of these pieces can be greatly simplified, or even omitted!
The Big Picture
The trivial, “most gentle”, design may well be based on a single PostgreSQL instance.
Keep all the application logic intact, and execute the transactions not right as they are received, but after running them through an append-only autoincrement-keyed journal table, in the listen-to-yourself fashion.
Such an approach, in fact, is the recommended way, if the company is resisting major changes in its data model, and small steps is the only viable strategy.
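A minimal sketch of that “most gentle” design, with sqlite3 standing in for PostgreSQL and hypothetical table names: every write is first journaled, and a single applier consumes the journal in key order.

```python
import json
import sqlite3

db = sqlite3.connect(":memory:")
with db:
    db.executescript("""
        CREATE TABLE journal (id INTEGER PRIMARY KEY AUTOINCREMENT, request TEXT NOT NULL);
        CREATE TABLE kv (key TEXT PRIMARY KEY, value TEXT NOT NULL);
    """)

def submit(request):
    # Step 1 ("listen to yourself"): the write path only journals the intent.
    with db:
        cur = db.execute("INSERT INTO journal (request) VALUES (?)", (json.dumps(request),))
    return cur.lastrowid          # the total-order index, stamped by AUTOINCREMENT

def apply_journal(after_id=0):
    # Step 2: the single applier replays the journal in total order.
    rows = db.execute(
        "SELECT id, request FROM journal WHERE id > ? ORDER BY id", (after_id,)).fetchall()
    for row_id, request in rows:
        req = json.loads(request)
        with db:
            db.execute("INSERT OR REPLACE INTO kv VALUES (?, ?)", (req["key"], req["value"]))
        after_id = row_id
    return after_id               # persist this offset to resume after restarts

submit({"key": "greeting", "value": "hello"})
apply_journal()
```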
Shameless plug: here’s the diagram, back from 2015, which we internally and jokingly refer to as The Pyramid, of what Max Zhurovich and I built over the course of 2015 .. 2017.
We even implemented zero-downtime schema evolution back then. The (C++) code was up and running with no downtime, and likely still is, although I have not personally touched its backend since circa 2021.
Only a tiny part of this is truly required for the Gentle Introduction though, so worry not. I will touch on schema evolution towards the very end of this post, in the FAQ section.
The Bright Future
If and when I have a chance to implement such a system end-to-end, at a larger scale, the most important thing I would add is a DSL to define the schema for the stored entities, and to implement the code of the innermost transactions.
This DSL will effectively play the role of stored procedures in SQL databases, or of Lua scripts in Redis. The very code of those procedures will then be put into the same storage, and journaled in the same append-only immutable event log, similar to how smart contracts are stored on modern blockchains these days.
(I’ve invented this before the blockchains did, and believe a hybrid approach of running an infinitely scalable proprietary secure blockchain-ish “DB-plus-biz-logic-aaS” is the future, but that’s a subject for a totally different blog post.)
Done this way, the DSL can be locked down dramatically, to be all-powerful and Turing-complete on the one hand, while provably fast per transaction on the other. This will ensure no deadlocks and no spikes in tail latency. Also, a self-running tasks scheduler can be implemented, for migration tasks on live data to be performed in a throttled way, so that the user traffic is not affected, while petabytes of data are changing shape behind the scenes over the course of several hours.
A higher-order language can then be added, to include not only the very innermost mutations code, but also the logic that can and should live on the Facade level. This upgrades the design from The Fast and Consistent Database to the Backend-for-Frontend for Everything, since any and every data fetching task, from minor “joins” to user-facing paginations, can and will be served in one network round-trip, in a consistent and predictable fashion.
FAQ
Since my goal is to help people gently introduce Event Sourcing into their organizations, I will end this post with top-level answers to the most common pieces of criticism.
Is the source of truth in the events, or do you keep all the data in the easy-to-query format?
Well, both.
Eventually consistent reads happen from read replicas. The read replicas are the materialized views of the data. If SQL-based patterns are what your team prefers, by all means, put an SQL database as the vehicle behind some (or most) of these materialized views, and query these SQL databases as you please.
For lower-latency eventually consistent reads, consider ElasticSearch, or Lucene, or ClickHouse, or even Redis. The further down this list you go, the more you will have to worry about cooking the data one way or the other. In an extreme case, use Lua stored procedures in Redis both for updates (replaying the log) and for queries; you will then need to maintain your own data structures that are optimized specifically for your query patterns.
Strongly consistent reads follow the same pattern as the writes, since writes are always strongly consistent. Neither the writes nor strongly consistent reads should perform computationally expensive operations regardless. If your business logic needs them, it is then on you to de-normalize your data accordingly, so that those expensive operations are not too expensive, but rather affordable, for they will affect both the throughput and the latency for other requests.
Canonical Event Sourcing as I view it does not go into sharding. This is a separate topic by itself, as sharding immediately moves the system from the monotonically increasing time paradigm to the vector clock paradigm. This will then require cross-shard transactions that will bring with them two-phase commits and distributed locks, which are inherently evil, and should be avoided at all costs.
With this being said, the mutation events are all there, ready to be replayed, and ready to be exposed via any pub-sub bus. If your logic needs to process these mutation events (or, rather, strongly typed mutation commands), this can by all means be arranged under this paradigm.
Are there standard solutions? We are not ready to go all in with a custom-built “database” powering our system.
The Gentle Event Sourcing paradigm does not introduce any new technologies by itself.
The data itself can be stored inside a PostgreSQL, or in some other RDBMS.
In fact, as long as the Total Order Broadcast engine is behaving correctly, and as long as the “critical path” inner loop logic is evaluated in a reproducible way, even MongoDB can be used within this paradigm.
For the Total Order Broadcast engine, a single-topic single-partition Kafka stream would do just fine. For more on-prem options consider Redpanda, or a PostgreSQL with an autoincrement-keyed table. For cloud solutions, Kinesis, with or without DynamoDB’s CDC, is a viable option; and so is Google PubSub.
For leader elections, first, it is possible to do without them, as long as both critical components — the total order broadcast engine and the transactions runner — are highly available, durable, and resilient. If this is not the case, though, etcd or Consul would do just fine.
None of the above sound like non-standard components to me.
How do you do data migration?
The simplest answer is that you add a replica and wait until it is fully populated with data.
Having that replica replay all the events from the beginning of time is viable, although it might be slow. Thus, most likely, a snapshot will be used.
What about snapshots?
Snapshots of data are straightforward, as both the transaction runners and the replicas maintain their own complete views of the data. The native means of their storage layers (PostgreSQL, Redis, etc.) can be leveraged to create those snapshots.
For the snapshots of mutation logs, I personally recommend storing them as files on S3. Compress them as needed, and compact them until the files are of reasonable size, both megabytes-wise and wall-time-wise. In practice, some “daily .gz-s” would probably do just fine.
The tricky part is to tie data snapshots to mutation log snapshots. And this is doubly true if the schema of the data changes midway. But, simply put, make sure there is a one-to-one correspondence between mutation logs and data snapshots. For an extremely trivial case, separate data mutations from schema mutations over time, and have one data snapshot end right before the schema changes and the next one begin right after it does.
This ensures mutual compatibility between them.
For bonus points, it is now easy to spin up the storage layer, a.k.a. The Database, from the state in which it was at any given time in the past. For unit/integration testing and/or for data investigation purposes. Ain’t this great?
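A sketch of that recovery path, under the assumption that a snapshot is named by the sequence ID of the last event baked into it:

```python
import copy

def restore(snapshots, log, up_to_seq_id, apply):
    # snapshots: {seq_id: state_as_of_that_seq_id}; log: list of (seq_id, event).
    # Pick the newest snapshot at or before the target point in time...
    base = max(s for s in snapshots if s <= up_to_seq_id)
    state = copy.deepcopy(snapshots[base])
    # ...and replay only the tail of the mutation log on top of it.
    for seq_id, event in log:
        if base < seq_id <= up_to_seq_id:
            apply(state, event)
    return state

# Toy usage: rebuild the store as it was right after event #2.
def apply(state, event):
    state[event["key"]] = event["value"]

snapshots = {0: {}}
log = [(1, {"key": "a", "value": 1}), (2, {"key": "a", "value": 2}), (3, {"key": "a", "value": 3})]
print(restore(snapshots, log, 2, apply))   # {'a': 2}
```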
Alright, so what happens when the schema does change?
The concept of data versioning, as well as the concept of backwards- and forward-compatibility, have a different meaning in the world of Event Sourcing.
Strictly speaking, in the world of Event Sourcing, the data should always be consistent. If its format needs to change, first and foremost, the code should be implemented to handle both the old and the new format. Only then can the underlying data schema be augmented.
All in all though, the migration process is not very different from the process of migrating an API to the next, backwards-incompatible version. Let’s walk this path using the example of retiring a field from an API.
Since backwards-compatible changes are far easier than incompatible ones, you first do as much homework as you need to prepare to retire this field. This may include backfilling the data, so that there is no need to run through a large number of if-statements in the application logic code.
Then you implement a change that assumes the field is gone for good. If a strongly typed language is used (and you should be using a strongly typed language for something as important as the underlying data schema!), you will receive an error for each and every attempt to use the field that is being removed. For bonus points, you can first mark this field as deprecated, so that it triggers warnings, not errors, and dedicate some period of time, say, a month, to chase all the teams responsible for the code that may be using this field, so that they can make the changes in their code without your personal intervention.
Optionally, you may want to deploy the new stack alongside the old one, and “tee” the traffic to it as well. You may even send it the read requests, and confirm that they return the same results as the “original” code, off which the mutations stream is being forked.
At the final step, you upgrade the code in prod, retire the now-obsolete field, and observe the system to operate normally.
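One way to implement the “handle both the old and the new format” rule from above is to upcast old events to the current schema during replay, before they reach the application logic; the sketch below uses made-up version numbers and field names.

```python
def upcast(event):
    # v1 events carried a single "name" field; v2 splits it in two and
    # retires "name". Replaying old events stays possible forever.
    if event["schema_version"] == 1:
        first, _, last = event.pop("name").partition(" ")
        event.update({"schema_version": 2, "first_name": first, "last_name": last})
    return event

# The application logic only ever sees the current (v2) shape.
old = {"schema_version": 1, "type": "user_created", "name": "Ada Lovelace"}
assert upcast(old) == {"schema_version": 2, "type": "user_created",
                       "first_name": "Ada", "last_name": "Lovelace"}
```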
We have done this before, and we even implemented the zero-downtime schema evolution process on our stack, Current. I am not saying this is easy, but it can certainly be done. And, with a bit more work, it can certainly be done in a way that enables self-service, fool-proof operation.
What about tooling? How do we look into individual events, find bugs in our code, roll back faulty releases, write playbooks for the SREs, and overall troubleshoot and operate the system?
Well, I have good news and bad news for you.
The bad news is that there is no tooling mature to the degree one can expect for established products such as PostgreSQL, DynamoDB, or Kafka.
The good news is that I wrote this blog post specifically to outline the gentle approach to Event Sourcing. Yes, it is on you and your team to implement this tooling. Yes, this will take time.
But it will very likely take less time than maintaining an enormous data system with disparate storage layers. Especially if the business needs you to make this system emit events about its state changes, so that these events can be replayed on the receiver end to build a materialized view of this data.
Seen in this light, gently introducing Event Sourcing through the listen-to-yourself paradigm first may well be the fastest and the cheapest way to make it there.
May the force be with you.