Apache Kafka Streams and Tables, the stream-table duality
In the previous post, we tried to understand the basics of Apache Kafka Streams. In this post, we'll build on that knowledge and see how Kafka Streams lets us work with data both as streams and as tables.
Stream processing has become common in modern applications. You'll usually have at least one stream coming into your system to be processed, and depending on your application, that processing may well be stateless. But that's not the case for every application. Often there's some sort of data enrichment going on between streams.
Suppose you have one stream of user activity coming in. Ideally, each fact in that stream has a user ID attached. But further down the pipeline, the user ID alone may not be enough for processing; you might need more information about the user in the fact. For this, you'll query your database, fetch the user record, and add the required data to the fact. This enriched fact is then sent to another stream for further processing.
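The enrichment step can be sketched in plain Java. This is only an illustration under stated assumptions: `ActivityEvent`, `EnrichedEvent`, and `Enricher` are hypothetical names, and an in-memory `Map` stands in for the user database. In Kafka Streams itself, this pattern is usually expressed as a KStream-KTable join instead of a per-record database query.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical raw activity event: it only carries a user ID.
final class ActivityEvent {
    final String userId;
    final String action;

    ActivityEvent(String userId, String action) {
        this.userId = userId;
        this.action = action;
    }
}

// Hypothetical enriched event: the raw event plus the user's city.
final class EnrichedEvent {
    final String userId;
    final String action;
    final String city;

    EnrichedEvent(String userId, String action, String city) {
        this.userId = userId;
        this.action = action;
        this.city = city;
    }

    @Override
    public String toString() {
        return userId + " did " + action + " from " + city;
    }
}

final class Enricher {
    // In-memory stand-in for the user database: user ID -> city.
    private final Map<String, String> userCities;

    Enricher(Map<String, String> userCities) {
        this.userCities = userCities;
    }

    // Look up the user record and attach the extra field to the event.
    // Users missing from the lookup get "unknown" rather than failing.
    EnrichedEvent enrich(ActivityEvent event) {
        String city = userCities.getOrDefault(event.userId, "unknown");
        return new EnrichedEvent(event.userId, event.action, city);
    }
}

class EnrichmentDemo {
    public static void main(String[] args) {
        Map<String, String> users = new HashMap<>();
        users.put("user-42", "Mysore");
        Enricher enricher = new Enricher(users);
        // Prints: user-42 did add_to_cart from Mysore
        System.out.println(enricher.enrich(new ActivityEvent("user-42", "add_to_cart")));
    }
}
```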
As you can imagine, in most practical applications streams work closely with databases. That is partly why Kafka Streams introduced the concept of the KTable, which enables the stream-table duality.
The Stream Table Duality
In Kafka Streams, streams and tables work hand in hand: a stream can be viewed as a table, and a table can be viewed as a stream. This is the stream-table duality, and Kafka Streams builds on it to give us this versatility. Let's see what I mean.
- Table – A table can be seen as the result of replaying a stream's changelog. In other words, a table holds the latest value for each key at a given point in time. For example, if we're maintaining a stream of every event on the cart in an e-commerce application, a table would hold the latest status of each cart. If we play back the changelog from the beginning, we can rebuild the actual table.
- Stream – Conversely, a table can be viewed as a stream. Every change to the table (an insert, an update, or a delete) can be emitted as an event, and the sequence of those events forms a stream: the table's changelog. The table itself is just a snapshot of the latest value per key in that stream at a given point in time.
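Both directions of the duality can be sketched with ordinary Java collections. This is a toy model, not the Kafka Streams API: `Change`, `Duality.replay`, and `ChangelogTable` are hypothetical names. Replaying the changelog keeps only the latest value per key (stream to table). The table records every write as a change event (table to stream), and replaying those events rebuilds the same table.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// One changelog entry: a key and its new value.
final class Change {
    final String key;
    final String value;

    Change(String key, String value) {
        this.key = key;
        this.value = value;
    }
}

final class Duality {
    // Stream -> table: replay the changelog, keeping only the latest value per key.
    static Map<String, String> replay(List<Change> changelog) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Change change : changelog) {
            table.put(change.key, change.value);
        }
        return table;
    }
}

// Table -> stream: a table that emits every write as a change event.
final class ChangelogTable {
    private final Map<String, String> state = new LinkedHashMap<>();
    private final List<Change> changelog = new ArrayList<>();

    void put(String key, String value) {
        state.put(key, value);
        changelog.add(new Change(key, value));
    }

    Map<String, String> snapshot() {
        return new LinkedHashMap<>(state);
    }

    List<Change> changelog() {
        return new ArrayList<>(changelog);
    }
}

class DualityDemo {
    public static void main(String[] args) {
        ChangelogTable carts = new ChangelogTable();
        carts.put("cart-1", "created");
        carts.put("cart-1", "item_added");
        carts.put("cart-2", "created");
        carts.put("cart-1", "checked_out");

        // The table keeps only the latest status of each cart.
        System.out.println(carts.snapshot()); // {cart-1=checked_out, cart-2=created}
        // The stream keeps all four events; replaying it rebuilds the same table.
        System.out.println(carts.changelog().size()); // 4
        System.out.println(Duality.replay(carts.changelog()).equals(carts.snapshot())); // true
    }
}
```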
Let’s now look at KStream and KTable.
KStream
A KStream is exactly what the name suggests: a stream of records in Kafka Streams, a never-ending flow of data. Each piece of data, a record or a fact, is a key-value pair, and the value itself can be structured. Also note that each fact is immutable once it's in the stream. The only way to change a value after sending a fact into the stream is to send another fact with the updated value.
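The append-only, immutable nature of a stream can be sketched in plain Java. This is an illustration only. `Fact` and `AppendOnlyStream` are hypothetical classes, not part of Kafka Streams. A fact has final fields and no setters, and the stream only exposes a read-only view. "Changing" a value therefore means appending a new fact, while the old one stays in the history.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// An immutable fact: fields are final and there are no setters.
final class Fact {
    private final String key;
    private final String value;

    Fact(String key, String value) {
        this.key = key;
        this.value = value;
    }

    String key() {
        return key;
    }

    String value() {
        return value;
    }

    @Override
    public String toString() {
        return key + " -> " + value;
    }
}

// An append-only log: facts can be added, never modified or removed.
final class AppendOnlyStream {
    private final List<Fact> facts = new ArrayList<>();

    void append(Fact fact) {
        facts.add(fact);
    }

    // Read-only view, so callers cannot rewrite history.
    List<Fact> facts() {
        return Collections.unmodifiableList(facts);
    }
}

class StreamDemo {
    public static void main(String[] args) {
        AppendOnlyStream stream = new AppendOnlyStream();
        stream.append(new Fact("user-42", "city=Mysore"));
        // To "change" the city, send a new fact; the old one stays in the stream.
        stream.append(new Fact("user-42", "city=Bangalore"));
        // Prints both facts, oldest first.
        stream.facts().forEach(System.out::println);
    }
}
```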
KTable
A KTable is an abstraction over a stream in which only the latest value for each key is kept. For example, suppose we push the following fact into the table, keyed by something that identifies me (say, my user ID):
{
"name": "Sunny Srinidhi",
"city": "Mysore",
"country": "India"
}
A day after this fact is in the stream, I move to a new city, and this change has to be captured in the system. So I send another fact into the stream, with the same key and the following data:
{
"name": "Sunny Srinidhi",
"city": "Bangalore",
"country": "India"
}
Because both facts share the same key, the KTable doesn't treat the second one as a new, separate piece of information. Instead, the previous value for that key is updated to reflect the new values.
As you can see, a KTable works much like a traditional database table. The main difference is that every write to a KTable is treated as an UPSERT (insert or update). This means that if an older value exists for the key, it is UPDATED with the latest value; if there is no older value, the fact is INSERTED into the KTable.
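The upsert behavior can be modeled with a `Map`, whose `put` returns the previous value for the key. This is a toy model, not the KTable implementation. `UpsertTable` is a hypothetical class. The key `user-42` is assumed, and the values are trimmed to the city field of the two JSON facts above. Nulls are rejected here so that a `null` return from `put` reliably means "new key".

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Toy model of a KTable's upsert behavior: key -> latest value.
final class UpsertTable {
    private final Map<String, String> rows = new HashMap<>();

    // Writes the value and reports whether it was an INSERT (new key)
    // or an UPDATE (an older value for the key was replaced).
    String upsert(String key, String value) {
        Objects.requireNonNull(value, "this sketch does not model deletes");
        String previous = rows.put(key, value);
        return previous == null ? "INSERT" : "UPDATE";
    }

    String get(String key) {
        return rows.get(key);
    }

    int size() {
        return rows.size();
    }
}

class UpsertDemo {
    public static void main(String[] args) {
        UpsertTable users = new UpsertTable();
        // Hypothetical key "user-42"; the value is trimmed to the city field.
        System.out.println(users.upsert("user-42", "Mysore"));    // INSERT
        System.out.println(users.upsert("user-42", "Bangalore")); // UPDATE
        System.out.println(users.get("user-42"));                 // Bangalore
        System.out.println(users.size());                         // 1
    }
}
```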
One thing to note here is that KTables give the value null a special meaning. If you send a key-value pair to a KTable where the value is null, it's treated as a DELETE instruction (often called a tombstone), and the entry for that key is removed from the KTable. You have to make sure that you don't accidentally send null values from your program into a KTable, or you might lose the data you have stored.
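The tombstone rule, and one way to guard against accidental nulls, can be sketched in plain Java. This is an assumption-laden illustration. `TombstoneTable` models the delete-on-null semantics with a `Map`. `GuardedSender` is a hypothetical producer-side wrapper, not a Kafka class: it rejects nulls and only writes a tombstone when the caller explicitly asks for a delete.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Toy model of KTable delete semantics: a null value is a tombstone.
final class TombstoneTable {
    private final Map<String, String> rows = new HashMap<>();

    void apply(String key, String value) {
        if (value == null) {
            rows.remove(key);       // tombstone: drop the key entirely
        } else {
            rows.put(key, value);   // normal upsert
        }
    }

    Optional<String> get(String key) {
        return Optional.ofNullable(rows.get(key));
    }
}

// Hypothetical producer-side guard: nulls are rejected unless the
// caller explicitly asks for a delete.
final class GuardedSender {
    private final TombstoneTable table;

    GuardedSender(TombstoneTable table) {
        this.table = table;
    }

    void send(String key, String value) {
        if (value == null) {
            throw new IllegalArgumentException(
                "null value for key " + key + " would delete it; use delete()");
        }
        table.apply(key, value);
    }

    void delete(String key) {
        table.apply(key, null);
    }
}

class TombstoneDemo {
    public static void main(String[] args) {
        TombstoneTable users = new TombstoneTable();
        GuardedSender sender = new GuardedSender(users);
        sender.send("user-42", "Bangalore");
        try {
            sender.send("user-42", null); // a bug upstream produced a null
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        System.out.println(users.get("user-42")); // Optional[Bangalore]
        sender.delete("user-42");                 // an intentional delete
        System.out.println(users.get("user-42")); // Optional.empty
    }
}
```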
GlobalKTable
A GlobalKTable is a variant of the KTable abstraction. It comes in handy when we're dealing with distributed applications. Let's take an example to understand it better. Suppose we have an application that populates a KTable. Because of a surge in traffic, we deploy more instances of the application in a cluster, say 10 instances, and each instance reads data from a separate partition of the underlying Kafka topic.
Now, each instance has its own copy of the KTable. This local KTable is populated only with data from the partition assigned to that instance of the application, so no single local KTable holds all the data. If an instance needs a key that lives in another partition, for example while running joins or aggregations on the table, it won't find it locally, and the results will be skewed because the data just isn't complete.
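In this case, if you use a GlobalKTable instead of a KTable, each instance's table is populated with data from all partitions of the topic. So every local instance of your GlobalKTable has the complete set of data, at the cost of each instance storing and keeping up to date a full copy. GlobalKTables also help with joins: a KStream can be joined with a GlobalKTable without the two topics being co-partitioned, and the lookup key can be derived from the stream record's value instead of having to match the record's key.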
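The difference between per-partition KTables and a GlobalKTable can be simulated in plain Java. This is a sketch under explicit assumptions. `PartitionSim` is a hypothetical class, and the partitioner is a simple `floorMod` of `String.hashCode()`. Kafka's default producer partitioner actually uses a murmur2 hash of the serialized key. Each of the 10 simulated instances owns one partition, matching the example above. The join benefits are not modeled here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class PartitionSim {
    // Stand-in partitioner: hash the key into one of n partitions.
    // (Kafka's default partitioner uses murmur2 on the key bytes instead.)
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Split the latest value per key into per-partition maps.
    static List<Map<String, String>> partition(Map<String, String> records, int numPartitions) {
        List<Map<String, String>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new HashMap<>());
        }
        for (Map.Entry<String, String> r : records.entrySet()) {
            partitions.get(partitionFor(r.getKey(), numPartitions)).put(r.getKey(), r.getValue());
        }
        return partitions;
    }

    // KTable-style: instance i materializes only the partition assigned to it.
    static Map<String, String> localKTable(List<Map<String, String>> partitions, int instance) {
        return new HashMap<>(partitions.get(instance));
    }

    // GlobalKTable-style: every instance materializes all partitions.
    static Map<String, String> globalKTable(List<Map<String, String>> partitions) {
        Map<String, String> all = new HashMap<>();
        for (Map<String, String> p : partitions) {
            all.putAll(p);
        }
        return all;
    }
}

class PartitionDemo {
    public static void main(String[] args) {
        Map<String, String> users = new HashMap<>();
        for (int i = 0; i < 100; i++) {
            users.put("user-" + i, "city-" + i);
        }
        List<Map<String, String>> partitions = PartitionSim.partition(users, 10);
        // A KTable on instance 0 sees only the keys in partition 0.
        System.out.println("KTable on instance 0: "
            + PartitionSim.localKTable(partitions, 0).size() + " of 100 keys");
        // A GlobalKTable on any instance sees every key.
        System.out.println("GlobalKTable on any instance: "
            + PartitionSim.globalKTable(partitions).size() + " of 100 keys");
    }
}
```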
In future posts, we'll look at code examples that use the Kafka Streams APIs to put the concepts covered here into practice.