Data streaming with Apache Kafka - guide for data engineers
In this article, I am going to describe the fundamental concepts of Apache Kafka. The text should provide answers for data engineers who are considering using Apache Kafka in their infrastructure and to people who are preparing for a data engineering job interview.
First, let’s focus on message queues. Why do we use them?
Communication between applications
In the case of backend services, we use message queues to decouple applications. When application A receives a request from the user and performs an action, it may send a notification using a message queue. The application B may receive the notification and update its state. Application C may receive the same message and trigger another action as a result.
In all of those cases, the applications may not be even running when the application A sends the event. There may be more applications that read the same message queue and ignore the message because it is not useful for them.
Most importantly, the programmers who implemented the application A don’t need to know anything about systems that consume their messages.
They don’t need to provide mocks of their APIs in the tests. They don’t need to implement circuit breakers to protect their application from failures of the dependencies.
In fact, they don’t need to even care about the subsequent actions that are the result of the notification they sent.
What if we need to send emails to half a million users? Do we send half a million requests to the mailing service? It does not look like a good idea.
Even if we replicated the service and have enough instances to handle half a million simultaneous requests, something is going to fail. Perhaps an overloaded mailing server is going to be the cause of the failure. Maybe the network is too slow, and some connections are going to timeout.
We should somehow buffer the requests. Of course, buffering in RAM is always an option. In 2020, half a million requests are not even a lot, so it should not be a problem. Of course, if the service fails for whatever reason, we are going to lose all of the buffered messages.
In both scenarios, our inability to recover from a failure causes the biggest problem. We can’t just retry sending half a million emails. A failed service may have sent some of the messages successfully, but it did not have an opportunity to report the success.
To solve the problem, we use message queues as the request buffer. In our example, the application which wants to send the emails should send them first to the message queue. Message queues are designed to handle a huge load, so it should not be a problem, especially if we are using Apache Kafka.
In this setup, we have another service (or a group of them) that read messages from the message queue and send the email.
If something fails, we have a few options. We can stop processing at the first failure and investigate the problem. The messages in the queue can wait until we fix it. We can also store the error somewhere (perhaps in another queue) and continue sending messages. In the end, we may try to resend the failed messages.
Of course, we may resend an email that was successfully sent, but we failed to report the success. In that case, some people may receive duplicates. It is not a significant risk if we send duplicates to a few people. If the user complains, we can always apologize. Try apologizing to half a million users ;)
It is not a usual use-case of message queues, but Apache Kafka offers Connect API, which we can configure to propagate changes from one database to another automatically. We are not limited to databases! It is possible to write a Connect API implementation that reads from/writes to a REST API endpoint, Amazon S3, or other queues.
If the implementation we need is not already available in Maven or a GitHub repository, we can quickly implement a new Source, Sink, or Transformation (yes, it is possible to apply data transformations in the queue).
This feature is particularly useful for data engineers because we routinely synchronize data between multiple databases or applications.
Real-time data aggregation
Finally, the most interesting use-case. Why do we bother with data streaming? Of course, because we want real-time data! Not just all of the data. That would be useless. We want a data aggregation that gets updated in real-time.
With Apache Kafka, we have two options. We can implement an application that reads from a Kafka topic and aggregates the data, for example, a Spark Streaming application. Alternatively, we can use KSQL - a specialized SQL-like language that allows us to query the data stream without writing code.
What about the results? We have Kafka Connect API, so we can aggregate a data stream using KSQL, send the aggregation to a separate Kafka topic, and, finally, write the results into a database or send it to a REST API.
Apache Kafka architecture
Now, when we know why we may want to use Apache Kafka, let’s have a look at its building blocks.
Kafka queues are implemented as an indexed append-only log. Because of that, multiple readers can retrieve messages from the queue at the same time. Every reader may start reading at a different index, so it is not required to start at the beginning of the queue or the most recent message.
The queue is append-only, so messages don’t disappear when a reader retrieves them (but we can specify the retention settings to remove old messages). Also, Kafka tracks what messages have been read by the clients, so the consumer may choose to continue reading the queue from the last received message or reset the offset and start from the beginning or any arbitrary position in the data stream.
The most fundamental part of Apache Kafka is the concept of topics. Everybody who has ever used a messaging service should intuitively understand what the topic is. People not familiar with that may think of a topic as the identifier of a data stream.
At any given time, there may be multiple topics on Kafka servers. The messages written to a topic get replicated between multiple nodes of a Kafka cluster to ensure that we don’t lose data in case of a failure.
The consumers subscribe to a topic to get messages. In KSQL, topics may be used as the output of a query and in the FROM and JOIN clauses.
Obviously, producers of messages have to specify the topic. In the Connect API, it is possible to change the destination topic by using a Transformer that extracts the new topic name from the message. For example, the
io.confluent.connect.transforms.ExtractTopic Transformer provided by Confluent implements such a behavior.
In almost every data engineering project, topics are not enough. We need to split the topic into partitions because that allows us to parallelize message consumption by connecting multiple consumers to the same topic. In this case, every consumer reads messages that belong to a single partition.
The producers of messages should specify the partition key. Kafka calculates a hash function using the partition key as the input. The output of the hash function (a number) is used to assign the message to a partition.
It is possible to publish messages to a partitioned topic without specifying the partition key, but it is not recommended. The Kafka’s default behavior in case of a missing partition key is suboptimal.
Optimal parallelization requires more than specifying a partitioning key. If the output of the Kafka’s hash function is not uniformly distributed, we are going to end up with skewed partitions. It is a situation in which one partition receives more messages than the others. Skewed partitions lead to the unbalanced workload of the message consumers.
We can solve that problem by changing the partitioning key to a value that gives us a uniformly distributed output after hashing. If there is no better option, a random number should give us optimal partitioning.
In addition to subpar hashing, we may face problems caused by slow consumers. In this case, the messages may be uniformly distributed across all partitions, but one of the consumers is slower than the others.
It is best to fix such issues by optimizing the consumer’s code or adding more partitions. Alternatively, it is possible to tweak the partitioning key until we get the desired distribution, but that solution feels like monkey-patching.
Note: In some cases, we may want skewed partitioning because it is the only way to get the correct results! It may be the case if Kafka is used for event sourcing, and every consumer deals with only a subset of messages.
For example, all users whose names start with A-C are handled by the same consumer and other consumers don’t store their data. In such a situation, we need stable hashing that permanently assigns a user to a partition.
It seems strange, but a data stream and a table are the same. We may interpret a stream of updates as a table - we call it event-sourcing. At the same time, we may interpret a table as a stream - every update of the table’s content is a new event.
In Kafka, we have a concept of KTables, which are a materialized view of the updates sent to a Kafka topic. We may use them to look up the “current” state of data records by their key.
The concept of Stream-Table Duality is the basis of using Kafka Connect API to synchronize data between databases. We turn one database into a stream of data updates, transport it with Kafka, and apply the updates to another database.
The offset is a sequential id number that uniquely identifies each record within the partition. As I wrote before, Kafka keeps the offset metadata for every consumer, but the consumer may change it to an arbitrary value.
This ability allows us to reprocess parts of the data stream in case of errors. We can even use stream reprocessing as a debugging tool and replay the stream of events to see what happens.
Consumers and consumer groups
We must label all Kafka topic consumers with a consumer group. Kafka uses the consumer group to determine whether the topic partitions should be load-balanced over the consumer instances or whether all consumers get all of the data.
If multiple consumers use the same consumer group name, Kafka load-balances the topic, and every consumer within the group received messages from only a subset of partitions.
Of course, a consumer cannot get a fraction of a partition, so it is not possible to have more consumers than partitions. If we have more consumers than partitions, some of them not do anything and just wait as a backup for the working consumers.
Kafka provides a total order over messages within one partition. Because of that, messages that ended up in separate partitions may be consumed in a different order than their publication order.
If a total order of all messages within one topic is required, we must use only one partition per topic. Of course, in this case, we can have only one consumer within a consumer group.
KSQL is a SQL-like language that allows us to implement stream transformations within Kafka. It can read events from a stream and join them with events from another stream. Of course, in the case of streams, we join over a window, not the entire dataset. As a result, KSQL produces events that are published on a topic.
KSQL is NOT a part of Apache Kafka. We can install KSQL servers on a separate cluster, so failures of KSQL nodes do not affect Kafka nodes.
KSQL server consists of a KSQL engine that runs the stream processing and a REST API server that allows communication with the KSQL client.
Note that KSQL works as a continuous query system on a data stream. It is built on top of the Kafka Streams library, so it is a good fit when we have to transform, aggregate, join stream data. It is NOT a business intelligence application or a debugging tool for Kafka topics.
Apache Kafka performance tuning for big data
I have already mentioned skewed partitions, which are one of the most common causes of performance problems in Apache Kafka.
In addition to that, we should make sure that we have enough consumers to process all partitions. It makes no sense to split data into 10 partitions just to consume it with 8 consumers. Some of them will get more than one partition and lag behind the others.
In the case of producers, we must know that producers buffer events before sending them to Kafka.
It takes more time to send multiple small messages over the network than to send one large message that consists of multiple events. Because of that, we should ensure that the buffer is big enough.
Of course, buffering messages adds latency between message publication and actual transmission over the network, so it is a tradeoff between sending small messages more often or waiting to send a large one.
Remember to share on social media! If you like this text, please share it on Facebook/Twitter/LinkedIn/Reddit or other social media.