Publishing Enriched Data Update Events
When caching and "poor man's replication" play well together.
Over the past few years I keep coming back to the same conversation from various angles, both at work and outside.
When it happened again, I figured there is no harm in blogging about it, so that people smarter than me can participate in the conversation.
The scenarios and strategies proposed are hypothetical. Any and all similarities to the projects I may be involved with now or in the recent past are purely coincidental.
Broadly speaking, this post will be about the picture below (image courtesy of www.striim.com):
From Pulling Data to Pushing Data
Say, a certain component, or a certain business domain if you wish, owns some data.
By owning here I mean end-to-end ownership. It includes, but is not limited to, contracts and invariants, storage, data access APIs, the schema of the data, its evolution, maintaining the respective services in production — well, everything.
Then the time comes when other components depend on this data changing in real time. In other words, some modifications to this data should result in more changes in different business domains downstream, with this very data being the upstream in some logical DAG of cross-team dataflows.
At first, it is tempting to place part of the real-time dependency logic on the side of the very data owner itself. For example, I am quite positive that early versions of Facebook had some hard-wired dependency of the "notifications service" on the "posts service".
In other words, I am quite positive that a nontrivial amount of logic responsible for the now-online user seeing a red dot emerge on their notifications button was implemented as an add-on to the very service that is responsible for persisting their friend's post to the database. Leaky abstraction? For sure. But it works, ship it!
Once the pattern recurs often enough, it becomes less feasible, and overall undesirable, to keep adding the logic of reacting to data changes to the very domain that owns this data.
By this point we are solidly in the realm of pushing update events as opposed to polling for changes. This is the canonical use case for a pub-sub bus. Yes, by this point the word Kafka should have earned a place in your and your team’s vocabulary.
Once the engineering of a certain product reaches the critical mass in size and maturity, a phase transition takes place. The notion of data update events is introduced, the schema for these events is agreed upon, the data owner becomes the producer of these update events, and other teams, any number of them, really, act as consumers of those events.
Also, it is at this point when a dedicated team that maintains the pub-sub bus is formed, generally, in the infrastructure domain. The mission of this team is to embrace decoupling producers from consumers, so that the former do not have to worry about the latter and the latter do not have to worry about the former.
Also, microservices. This is fertile ground for breeding microservices, but this gentle topic is best kept for some other day.
Enriched Events, Schema, and Evolution
If you are a small high-octane tech-savvy team looking into prototyping the above, I bet it would not take you more than a week to push a PoC to staging, and even to production.
In doing so, you would most likely argue that, for the first version, you do not need to concern yourselves with the very contents of the data that is being changed. Just include the ID key of the just-modified data entity, and the job is as good as done.
The consumer, or the consumers, can then decide if an update to a particular piece of data is of interest to them, and, if it is, make use of the good old RESTful getter to retrieve that data by its ID.
If you truly care about consistency, you may even decide to keep revision numbers attached to each data entity, and have the update events that travel through the pub-sub bus contain this revision number as well. This way the very RESTful getter may also accept this revision number as an input parameter, making this GET operation idempotent.
This GET operation would also very likely happen over a gRPC channel these days as opposed to the HTTP+JSON combo, but this does not change the overall architecture.
Update events that contain the ID key of the changed data entity but not its contents are commonly referred to as shallow events. The opposite of shallow events are the enriched events, where the very value, not just the key, is also contained in the event body that travels through the pub-sub bus.
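To make the two shapes concrete, here is a minimal sketch in Python, with entirely made-up type and field names, of a shallow event, an enriched event, and the revision-aware GET follow-up that a shallow-event consumer would issue:

```python
# A minimal sketch of the two event shapes; all names are hypothetical.
from dataclasses import dataclass
from typing import Optional

@dataclass
class ShallowUserUpdated:
    user_id: str      # the ID key of the just-modified entity
    revision: int     # monotonically increasing per-entity revision

@dataclass
class EnrichedUserUpdated:
    user_id: str
    revision: int
    payload: dict     # the full value of the entity as of this revision

def is_of_interest(user_id: str) -> bool:
    return True       # placeholder for real consumer-side filtering logic

# With shallow events, an interested consumer follows up with a GET by ID;
# passing the revision along makes the read repeatable, i.e. idempotent.
def on_shallow_event(event: ShallowUserUpdated, api_client) -> Optional[dict]:
    if not is_of_interest(event.user_id):
        return None
    # Hypothetical getter, e.g. GET /users/{id}?revision={revision}.
    return api_client.get_user(event.user_id, revision=event.revision)
```

The `api_client.get_user` call here stands in for whatever RESTful or gRPC getter the data owner exposes; the only requirement is that it can serve a specific revision.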
The terminology is less settled when it comes to the contents of what is being published and subscribed to. There is no clear differentiation when it comes to whether they are messages or events. Beyond that, there is no clear definition of what "business events" stand for.
A minor bit of sanity in this chaos is that people appear to agree that events should describe what has just happened, i.e. what should be treated as a change that is now fully committed.
I personally argue that business events are the events the domain of which has to do with business terminology, for example, "a certain user was added to a certain organization with a certain set of permissions by a certain admin". Reacting to business events has effects in the domain of business, such as further reporting, billing, or regulatory actions to be taken. Non-business events are those of lower granularity, and they are generally coming from the domain of technology and implementation. For example, "a user-organization relationship was created" (to indicate role change), plus "the organization metadata was updated" (to reflect the changed number of users), plus a few more events representing the respective lower-level changes in the relevant bits of data.

On particularly quiet and starry nights, I dream of architectures where the rules of converting business events into the respective lower-level changes are formal enough to be replayable, identically, both by the originator of the event and by whatever entities might be interested in maintaining an eventually consistent materialized view of the business domain of interest.
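To illustrate what "formal enough to be replayable" might mean in code, here is a sketch, with hypothetical names, where the expansion of a business event into its lower-level changes is a pure function, so the originator and any downstream consumer can apply it identically:

```python
# A sketch of a deterministic "business event -> low-level changes" expansion.
# All type and field names are made up for illustration purposes.
from dataclasses import dataclass

@dataclass(frozen=True)
class UserAddedToOrganization:           # the business event
    user_id: str
    org_id: str
    permissions: tuple
    admin_id: str

def expand(event: UserAddedToOrganization) -> list:
    """Derive the lower-level data changes; pure, hence replayable by anyone."""
    return [
        {"type": "relationship_created",
         "user_id": event.user_id, "org_id": event.org_id,
         "permissions": list(event.permissions)},
        {"type": "org_metadata_updated",
         "org_id": event.org_id, "member_count_delta": +1},
        {"type": "audit_record_appended",
         "actor": event.admin_id, "subject": event.user_id},
    ]
```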
This comes close to the idea of smart contracts, where each data entity lives on the ledger, and the same ledger is used to store both the constantly-evolving schema of this very data entity, and the "program code" of the logic by which this data entity — which, clearly, is a monad, speaking in functional programming terms — can be and routinely is mutated. A ledger, outside the Web3 world, in a non-adversarial setting, is actually quite a trivial storage model. A man can dream though; while I believe this is the proper vision for the future of our industry, I did not quite succeed all the way with evangelizing and implementing it in three out of three companies where I tried. Do hit me up if you share this vision and have a decent practical problem for which implementing it is the most viable pragmatic solution. But I digress.
For the rest of this post, I will be speaking of enriched events, as it is the world of enriched events that opens up far more possibilities and presents far more challenges.
I would note, however, that "just" reverting to shallow events does not magically rid one of the challenges that I outline below. Avoiding them, in my view, is merely kicking the can down the road, which, while a noble approach in the spirit of getting things done at times, is generally just a fantastically effective way to stockpile technical debt for future generations of your engineering team to choke on.
There is a huge can of worms that this post is too narrow to contain, but for the sake of completeness I should touch on it: schema version upgrades.
TL;DR: the traditionally accepted and practically safe approach is to:
Discriminate between minor and major version changes; likely use semver.
Minor changes are backwards-compatible. They can be thought of as adding (always optional) fields to `.proto` messages.
Major changes are backwards-incompatible. They require cross-team collaboration to roll out.
Generally, it's OK to tie major changes in data schema to major changes in API versions.
In practice, this means a new major protocol version results in a new API server and in a new Kafka topic. The previous and the current versions of the API, as well as the two Kafka topics, can co-exist for a while; the older one is then deprecated and retired. This simple scheme ensures that no client ever has to support both the old version and the new one in the same code path.
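A tiny sketch of how this convention might look in practice, with made-up topic names:

```python
# Hypothetical topic-naming convention: one Kafka topic per major schema version.
def topic_for(entity: str, major_version: int) -> str:
    return f"{entity}-updates.v{major_version}"   # e.g. "user-updates.v2"

# During the migration window the producer double-writes to both topics,
# while each consumer reads exactly one of them and cuts over once.
PRODUCER_TOPICS = [topic_for("user", 1), topic_for("user", 2)]
CONSUMER_TOPIC = topic_for("user", 2)

# Minor, backwards-compatible changes (new optional fields) stay within the
# same topic and major version; consumers that do not know a field ignore it.
```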
Last but not least: Event Sourcing, as well as the Change Data Capture (CDC) paradigm, can, and, in my opinion, should be viewed as particular instances of the overall approach outlined above.
They can be quite useful in practice, as CDC-friendly contracts of various products, such as subscribing to the logical replication log of PostgreSQL, are generally well thought out, and there is no shortage of consultants who would gladly configure and maintain such setups for a reasonable fee. Architecturally though, I would argue that understanding the big picture from first principles is still instrumental to implementing a proper architecture, even if and when the ultimate solution ends up being exactly this: getting hold of several months' worth of external people who have done it N times in the past and are glad to make it happen for the (N+1)-st time.
The Cache or a Replica?
The next important conversation to be had is whether the consumer of these enriched data update events should store the state that is being transmitted to it.
The short answer is yes. The protocol allows for exactly this: setting up a logical materialized view of the data, the updates to which are being pub-sub-pushed by its producer as a stream of enriched events.
While we are on this page, let me address two major concepts that often come up at this point:
The "seed state". The idea is that in order to maintain a complete, 100% accurate, copy of the data, behind by just the replication lag of the pub-sub bus that is almost always sub-second under normal operation mode, some initial state should be provided, to initialize this newly emerging materialized view.
The "replayability" of the log. In the "weak" meaning of this term, the consumer may wish to "go back in time", for a short while, and replay some most recent events in case there was an outage of some sort. The "strong" meaning of replayability implies that the entire log of enriched mutations is maintained and can be accessed, so that no seed state is ever needed, as the materialized view can always be created from scratch, by replaying this log of mutations from the very beginning and applying them one after the other.
Realistically, the seed state is hardly used, as not having some records from the past is generally not a problem. The origin of the enriched events remains the source of truth for this data — this source of truth data is stored in some database, of course! — and the contract with this data origin component most certainly includes the means to get the data by its ID key: the GET API.
When it comes to replayability, Kafka veterans would often open up by saying that while it is possible to configure Kafka to store the contents of its topics for a long time, this really is not a standard operating procedure. "Weak" replayability is often kept in mind, mostly for disaster recovery purposes, so that local outages are easy to mitigate without having to manually dig into DB backups and then manually update the affected materialized views downstream.
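For the "weak" flavor, rewinding a consumer by a timestamp is usually enough. Here is a minimal sketch using the kafka-python client, assuming a hypothetical topic named user-updates.v2 whose retention covers the window being replayed:

```python
# Rewind a Kafka consumer by one hour and re-apply recent events; a sketch only.
import time
from kafka import KafkaConsumer, TopicPartition

def reapply(message) -> None:
    ...  # re-apply the enriched event to the local materialized view (placeholder)

consumer = KafkaConsumer(bootstrap_servers="localhost:9092", enable_auto_commit=False)
tp = TopicPartition("user-updates.v2", 0)
consumer.assign([tp])

one_hour_ago_ms = int((time.time() - 3600) * 1000)
offsets = consumer.offsets_for_times({tp: one_hour_ago_ms})
if offsets.get(tp) is not None:
    consumer.seek(tp, offsets[tp].offset)   # jump back to the offset at that timestamp

for message in consumer:
    reapply(message)
```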
"Strong" replayability is a very special beast though, and, while I once have had the privilege to be part of the team that implements such a system, keeping the mutation log of all the data since the beginning of time is certainly not the architecture I recommend for most practical applications.
So, we have a materialized view of the data. Is it a cache or a replica?
The very fact that we may not need the "seed state", and the very fact that some data may well be absent in this materialized view, suggest that we are dealing with nothing more than a non-durable cache.
In fact, it is perfectly feasible to imagine a design where the cache has a standard expiration policy, say, LRU, and whatever enriched events come in are first checked against the set of currently cached keys. This way the value of a cached copy is updated only if its key is known to have been of interest in the recent past. Also, from the storage selection and cost optimization standpoint, there is no reason to invest in the durability of the storage used for this cache; whatever happens to be lost can be recovered easily.
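A sketch of such an update-if-present, non-durable LRU cache might look as follows; `fetch_from_origin` stands in for the data owner's GET API and is hypothetical:

```python
# A non-durable cache fed by enriched events: values are refreshed only for keys
# that were recently read; everything else is fetched on demand via the GET API.
from collections import OrderedDict

class UpdateIfPresentCache:
    def __init__(self, capacity: int = 10_000):
        self._data: OrderedDict = OrderedDict()
        self._capacity = capacity

    def on_enriched_event(self, key: str, value: dict) -> None:
        # Only refresh entries somebody has asked for recently; ignore the rest.
        if key in self._data:
            self._data[key] = value
            self._data.move_to_end(key)

    def get(self, key: str, fetch_from_origin) -> dict:
        if key in self._data:
            self._data.move_to_end(key)
            return self._data[key]
        value = fetch_from_origin(key)       # fall back to the owner's GET API
        self._data[key] = value
        if len(self._data) > self._capacity:
            self._data.popitem(last=False)   # evict the least recently used entry
        return value

    def invalidate_all(self) -> None:
        self._data.clear()                   # safe: everything can be re-fetched
```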
Furthermore, in case of an outage or an otherwise misbehaving service, parts of this cache, if not outright all of it, may be declared invalid, with the data dropped, logically or even physically. Presumably, after a short spike in the load on the GET API of the respective data provider, things should quickly get back to normal. Moreover, the load on this very GET API can be throttled to ensure smooth operation of the whole fleet. Not a particularly trivial design pattern, but not rocket science either.

On the other hand, the term "poor man's replication" describes the process of maintaining this very materialized view quite accurately. Such a definition is far better than referring to this component as some "read-through cache, the contents of which are also updated in real time in accordance with update events that are being pushed".
First, if we were to trust this cache — which we should, otherwise there is no real point in using it! — we have to require the guarantee that any and all updates to the data are broadcast as enriched events through the pub-sub bus. Otherwise how can a client that reads from the cache know that the version of the data it is looking at is current, accurate, and behind by at most a split second? What if this very record was put into the cache last week, then immediately deleted, but the delete event somehow fell through the cracks?
If we allow for such a possibility, we may well use a far simpler cache, without any pub-sub bus, and all but the most frequently repeated read requests would have to invoke the very GET API of the original data owner. Not cool.
Second, and more importantly: since the materialized view of the data is complete, it does not have to mirror the original data; it can be a heavily transformed representation of it. For an example, consider a search index.
Imagine that the data, streamed via the enriched events, contains a free-form text field, which needs to be indexed so that search queries can be issued against it. Clearly, the very stream of these enriched events should be good enough to keep the search index up to date. However, it would be a disaster if this search index were only partially, rather than completely, up to date. Imagine further that the caller service, responsible for end-user search, now needs to first query the search index, and then query the very origin of the data, iteratively, for each potential search hit, just to confirm that the identified part of the text is still there. That looks like a very poor design to me.
Thus, if consistency is required, the producer of the stream of enriched events should absolutely view it as the logical replication log of the very data it owns, to an external destination of arbitrary nature. There really is no other way.
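As a sketch of what "treating the stream as a replication log" buys the consumer, here is a toy inverted index kept in sync purely from enriched events; the `text` and `deleted` fields are hypothetical:

```python
# A toy search index maintained solely from the stream of enriched events.
from collections import defaultdict

index = defaultdict(set)   # token -> set of entity IDs containing it
docs = {}                  # entity ID -> the set of tokens it currently contains

def apply_event(entity_id: str, event: dict) -> None:
    # Remove the previous version of this document from the index, if any.
    for token in docs.pop(entity_id, set()):
        index[token].discard(entity_id)
    if event.get("deleted"):
        return
    tokens = set(event["text"].lower().split())
    docs[entity_id] = tokens
    for token in tokens:
        index[token].add(entity_id)

def search(query: str):
    tokens = query.lower().split()
    if not tokens:
        return set()
    return set.intersection(*(index[t] for t in tokens))
```

Because every update, including deletions, is guaranteed to travel through the bus, the search service never has to double-check its hits against the origin of the data.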
So the TL;DR of this section is that storage-wise the component on the receiving end of the stream of enriched events can be viewed as a non-durable cache, but the functional requirements for the very replication protocol are as strict as if this protocol is used for database replication.
The Key Tradeoff
The above was not short, but I am confident that the picture would be quite familiar to many people working with real-time dataflows that require strong consistency guarantees.
In fact, I made it longer than it could have been for a reason: I, for one, believe in my ability to pack relatively complex subjects into relatively short, yet concise and complete, summaries.
So apologies if you found it too verbose, and may I suggest you view the extra several minutes you spent reading through it as an investment into a broader group of people now having a better grip on the subject domain.
Now here is the conversation that emerges some two or three times every year in my professional life. And it happens to be a conversation where, I admit, I am not confident or bold enough to know the "correct" answer, or even the "correct" approach.
This conversation is: Who owns the schema and the contract of the very enriched events?
There may be multiple possible answers:
1. One can argue that the team that is responsible for the very data should own the schema and the contract. The consumers should then work with this team to get the respective fields exposed, and the pub-sub team acts as the support team on the infrastructure side.
2. One can argue that the consumers of these events should run the show. The team that owns the data should be able to meet their demands, adding new fields and/or topics and/or enriched streams as necessary, and working with current and future consumers to ensure there is no duplication or waste.
3. One can argue that the pub-sub team should be the gatekeeper of what goes through the pub-sub bus, as each topic creation and/or schema change request should be owned by them.
4. Of course, a data-savvy organization may argue that the data team, or a specifically created real-time data team, should act as an overseer of all the data that needs to be accessible both via synchronous GET APIs and by subscribing to the stream of enriched events.
5. Last but not least: the architects team can claim ownership of the set of best practices and guidelines of what should and what should not make it into the streams of enriched events. I have personally spent many an hour arguing that the idea of "the event bus, but only for business events" makes little sense, as quite a few consumers will actually need to build their own materialized views, thus rendering the pub-sub bus pointless unless it literally captures 100% of updates to the data, however nontrivial the business reasons behind those updates might be.
When it comes to architecture and to the inverse Conway maneuver, there are two fundamental ways to approach the architecture of the above system, top down and bottom up.
Top down: Establish domain boundaries, agree on the principles by which further cross-domain contracts will be defined, then derive these contracts from these principles, on a case-by-case basis.
Bottom up: Outline what exact problems this unification will solve, come up with a solution that is easy to implement and that addresses these problems to a sufficient degree, rinse and repeat until a pattern emerges, DRY or WET depending on whether you feel more or less pedantic.
Top down, the questions to ask and answer, roughly in the order presented, would be:
Which business domain, and which business unit, should be responsible for the content of the very events?
Who are the consumers of these events, and what are their business requirements?
How can we serve the needs of various current and future consumers of these events in a generic fashion?
What are the boundaries between the events producing team, events consuming teams, and the infrastructure / pub-sub maintenance team?
The job of the company-wide architecture function is then, through answering the above questions, to derive the strategy and the blueprints for how the enriched events should make their way into production.
The decision on who owns what, with respect to these enriched events, is then made based on principles similar to those behind other important decisions, such as: what tech stack to use, which programming languages to recommend, what should make it to the tech radar, and what features are universal enough to become part of the microservice template.
As you can see, I am quite skeptical that the top-down approach can help decide which of the (1) .. (5) options above, if any, is the “correct” one.
Bottom up though, the picture looks crystal clear to my eye. We begin by postulating the problems, from most important to least important, that need to be averted. To me, they are:
Reduce, or, better yet, eliminate any room for human error.
Do not require manual changes when it is unambiguously and programmatically clear what exactly those changes should be.
Ensure the above end-to-end.
Including the producer side, the GET APIs, the contents of pub-sub messages, and everything downstream, all the way to the very consumer level.
From these problems, the solutions follow organically:
Code is king; extend configuration-as-code all the way down to dataflow-as-code.
Agree on what the source of truth for the schema is.
Derive everything from it.
As automatically as possible.
To take an example, consider the task of making a certain free-form text field of a certain data object full-text-searchable. In the bottom-up fashion (a code sketch follows the list):
There is a single place where the very schema of this data object is defined.
From this single place both the schema of a [RESTful] GET response and the schema of the published enriched event are derived.
In this very single place the respective field is annotated as "needs a full-text search index".
This single place also serves as the source of truth for the code that is on the receiving end of these enriched events.
Whatever code can be auto-generated is auto-generated, or outright templated.
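Here is a minimal sketch of what that single place might look like; the annotation names (`searchable`, `expose_in_get`) and the `Document` type are made up for illustration:

```python
# One place defines the schema; the GET response schema, the enriched event
# schema, and the search-index configuration are all derived from it.
from dataclasses import dataclass, field, fields

@dataclass
class Document:
    id: str = field(metadata={"expose_in_get": True})
    title: str = field(metadata={"expose_in_get": True, "searchable": True})
    body: str = field(metadata={"expose_in_get": True, "searchable": True})
    internal_tag: str = field(default="", metadata={"expose_in_get": False})

def searchable_fields(cls):
    return [f.name for f in fields(cls) if f.metadata.get("searchable")]

def get_response_fields(cls):
    return [f.name for f in fields(cls) if f.metadata.get("expose_in_get")]

print(searchable_fields(Document))    # ['title', 'body']
print(get_response_fields(Document))  # ['id', 'title', 'body']
```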
The ultimate litmus test: What if another free-form text field needs to be search-indexed in real time? With a bottom-up solution in place, it's a piece of cake:
The field in question needs to be annotated as requiring a search index.
Done.
Yes. That's it. No further work is required.
Granted, in real life the "Done." step may require manually pressing a few buttons, as the rollout process of the code is iterative, and certain steps are better to start after the previous steps have been successfully completed.
For example, if the newly added field was not present before, ideally, it is:
First added to the database,
Only then to the [gRPC] API,
Only then to the contents of enriched events,
Only then to the search index,
Only then to the very API that performs search queries,
Then tested, manually and automatically,
And only then exposed to the real-life users from other teams.
In fact, the future users of the full-text search API call for this new field can receive the full spec of this call right as the new field is marked as full-text-searchable in the very source of truth of the schema, even though it may take a few more hours or days until this new API endpoint is fully operational.
Also, from this design it is crystal clear that the schema of the data that travels through the stream of enriched events is logically identical to the schema of the data that is returned via the GET API. Of course, the GET API may choose to not return certain fields; or there can be multiple GET APIs for various use cases, but this behavior can and should be controlled by annotations to the metadata of the schema, in that very single place where it is defined.
As you can see, in my worldview, the mindset of "whatever travels via the enriched events bus has nothing to do with what the GET API returns" is just wrong. From the standpoint of minimizing future tech debt, either we agree early on that whatever travels via the enriched events bus has everything to do with the payload of the GET API, or we are sentencing our future developers to years and years of refactoring, as the "auxiliary code" that converts the data between various formats and versions will only grow over time, together with the bugs it will inevitably introduce.
In practice, I would argue that, if gRPC and proto-messages are used, it is best for the enriched event to physically contain a field of the very same type that the GET API returns. In particular, this makes it very natural to store the blobs of this very type as the values in the "cache" that is on the receiving end of the "poor man's replication" pipeline.
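A sketch of what this might look like, with hypothetical type names; with gRPC and protobuf, `User` would be the very same generated message class in both places:

```python
# The enriched event physically carries the same payload type the GET API returns.
from dataclasses import dataclass

@dataclass
class User:                 # the type returned by GET /users/{id}
    id: str
    name: str
    email: str

@dataclass
class UserUpdatedEvent:     # the enriched event traveling through the pub-sub bus
    revision: int
    updated_at_ms: int
    user: User              # the payload is literally the GET response type

def cache_value(event: UserUpdatedEvent) -> User:
    # The "cache" on the receiving end stores the exact blobs the GET API would
    # return, so no format-conversion code needs to exist or be maintained.
    return event.user
```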
The counter-argument to such a purist approach is that it may not be feasible in practice, as the requirements for the clients of the GET API may well differ from the requirements of the clients of the stream of enriched events. To this I then reply: fine, but do make sure that the code that will be converting the data from the GET API format to the format of the enriched events, and/or vice versa, and/or to the ultimate "common denominator" format that is to be stored in the cache, can be 100% auto-generated from the very single place where the schema of the very data object is defined.
Personally, I am finding myself more and more in the camp of the bottom-up-ers. But this does not mean I am fully convinced this is the best strategy from the standpoint of building a large engineering organization. Hence this blog post, and I would appreciate feedback and comments.
Afterword
I will stop here, but, for the sake of completeness and for my own sanity, it has to be noted that, in a proper real-life design for the above, a few more major topics should be covered. In particular, these topics are:
Topics and partitions, as well as the very "to Kafka or not to Kafka" question.
Key schemes, and who owns them, as some consumers would appreciate the means to run a quick rejection test on an incoming event without having to parse its contents.
Causality, data interdependencies, linearizability, and vector clocks where needed.
Dealing with consumer lag in particular, and with backpressure in general.
Troubleshooting and disaster recovery procedures and playbooks.
Testing.
Wink twice if the above hits close to home, and/or if you liked how I outlined the problem and the solution. If there is enough interest, this very topic may be worth a dedicated SysDesign Meetup episode.
Happy Independence Day!