How does a Kafka Cluster work?

In this article, I am going to explain the building blocks of a Kafka cluster. I am NOT going to write about consumers, producers, connectors, or streams. Instead, I will focus on the servers and the internal processes.

However, before we begin all of that, we have to start with the concept of topics.

Partitioning

Every topic has one or many partitions. It is possible to parallelize reading from a topic by using a consumer group that consists of multiple consumer applications. Each of those applications reads from one (or more) of the topic’s partitions.

On the other hand, when we send a message, we may add a partitioning key to the record. The Kafka producer calculates the hash of the partitioning key and uses that value to assign the record to a partition.
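
Roughly speaking, the built-in partitioner hashes the serialized key with the murmur2 function and takes the result modulo the number of partitions. The sketch below shows that idea in isolation; the real DefaultPartitioner adds more logic (null keys, partition availability), so treat it only as an illustration:

```java
import org.apache.kafka.common.utils.Utils;

public class KeyPartitioningSketch {
    // Simplified sketch of how a keyed record is mapped to a partition.
    // The real partitioner also handles null keys and sticky batching.
    static int partitionFor(byte[] serializedKey, int numPartitions) {
        // murmur2 hash of the key, made non-negative, reduced modulo the partition count
        return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
    }
}
```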

Interestingly, Kafka no longer uses round-robin partitioning as the default method for records without a partitioning key. Since Kafka 2.4, producers use the “sticky” partitioning strategy (org.apache.kafka.clients.producer.internals.DefaultPartitioner). Now, the producer picks a partition randomly and sends a whole batch to that one partition. After that, it randomly picks another partition that will receive the next batch.

Kafka Brokers

A Kafka cluster consists of a set of broker servers (typically three or more), each identified by a unique numeric id. Every broker can have one of two roles in the cluster: it can be either a leader broker or a follower broker (there is also a special “controller” role, which we will discuss later).

Leader brokers add new records to partitions. A Kafka producer uses the topic name and the partition identifier to determine to which broker it should send the record. We can specify the replication factor of every topic, which tells Kafka how many brokers should keep a replica of each partition that belongs to that topic (the leader’s copy counts as one of the replicas). Of course, it makes no sense to set the number of replicas to a value higher than the number of brokers.
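
The number of partitions and the replication factor are set when the topic is created. A minimal sketch using the Java AdminClient (the topic name and the broker address are placeholders):

```java
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateReplicatedTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address

        try (AdminClient admin = AdminClient.create(props)) {
            // 6 partitions, every partition replicated on 3 brokers (leader + 2 followers)
            NewTopic topic = new NewTopic("example-topic", 6, (short) 3);
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}
```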

When a producer sends a record to the leader, the producer can specify the acknowledgment level, which describes what must happen before the server can send back a confirmation to notify the producer that the message has been successfully added to the topic.

If we set the acknowledgment level to zero, the producer sends the message and does not wait for any confirmation from the Kafka broker. With the acknowledgment level set to one, the producer waits until the leader confirms that it has appended the message to its log.

We can also use the acknowledgment level “all,” which means that the min.insync.replicas parameter specifies the minimum number of in-sync replicas that must confirm receiving the record before the producer considers it to be successfully delivered.
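
Here is a minimal sketch of setting the acknowledgment level on the producer side (the topic name, key, value, and broker address are placeholders):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class AcksExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // "0" = fire-and-forget, "1" = wait for the leader, "all" = wait for the in-sync replicas
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("example-topic", "some-key", "some-value"));
        }
    }
}
```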

Note that the acknowledgment level does not influence replication in any way! Even if we set it to zero, the leader adds the message to its log, and the followers replicate it as usual. The only difference is the producer’s behavior.

In-sync and out-of-sync replicas

Because multiple nodes must communicate with each other, we may get synchronization problems. Partition leaders track the synchronization between themselves and their followers by measuring the replica lag time.

By default, a replica is out of sync if it has not requested new messages in 30 seconds, or if it has not caught up to the latest message during the last 30 seconds. We may control the time by setting the replica.lag.time.max.ms parameter.

The partition leader tracks the number of in-sync replicas because replicating all of the messages across many brokers is crucial to achieving high reliability.
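
We can inspect the current in-sync replica set of every partition from a client application. A sketch using the AdminClient (the topic name and the broker address are placeholders):

```java
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

public class ShowInSyncReplicas {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address

        try (AdminClient admin = AdminClient.create(props)) {
            TopicDescription description = admin
                    .describeTopics(List.of("example-topic"))
                    .all().get()
                    .get("example-topic");

            // For every partition, print the leader, all replicas, and the in-sync subset
            description.partitions().forEach(p ->
                    System.out.printf("partition %d leader=%s replicas=%s isr=%s%n",
                            p.partition(), p.leader(), p.replicas(), p.isr()));
        }
    }
}
```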

Because of that, if the number of in-sync replicas is lower than the min.insync.replicas parameter, the partition leader rejects writes that require acknowledgment from all in-sync replicas, because it cannot guarantee the desired durability.

In the case of data replication, the leader is one of the replicas, so setting the value to 1 means that only the leader must be available. If the parameter equals two, the leader plus at least one in-sync follower are required.
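
min.insync.replicas is a topic-level setting (with a broker-level default). A sketch that sets it to two on an existing topic with the AdminClient (the topic name and the broker address are placeholders):

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class SetMinInsyncReplicas {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address

        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "example-topic");
            // Require the leader plus at least one follower to be in sync before a write succeeds
            AlterConfigOp setMinIsr = new AlterConfigOp(
                    new ConfigEntry("min.insync.replicas", "2"), AlterConfigOp.OpType.SET);
            Map<ConfigResource, Collection<AlterConfigOp>> configs =
                    Map.of(topic, List.of(setMinIsr));
            admin.incrementalAlterConfigs(configs).all().get();
        }
    }
}
```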

Cluster controller

In addition to being a partition leader or a replica, one of the brokers in the cluster is the cluster controller. The controller broker manages the state of partitions and replicas and performs administrative tasks.

The process of selecting the cluster controller is surprisingly straightforward. The idea is based on the ZooKeeper ephemeral node feature: a node exists only as long as the session that created it is active.

When the Kafka cluster starts, every broker attempts to become the controller node. The broker registers as the controller in ZooKeeper, but only the first such operation can succeed. All other brokers receive a notification that a controller already exists.

When the current controller shuts down or gets disconnected from Zookeeper, the controller entry disappears, and all remaining brokers attempt to register as the new controller. Once again, only the first one is going to succeed.
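
Here is a heavily simplified sketch of the ephemeral node mechanism, written with the ZooKeeper Java client: whichever process creates the /controller node first becomes the controller, and the node disappears when that session ends. The real Kafka controller election also tracks controller epochs and stores JSON metadata in the node, so this only illustrates the idea:

```java
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ControllerElectionSketch {
    public static void main(String[] args) throws Exception {
        // placeholder connection string; the watcher callback is omitted for brevity
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30_000, event -> { });
        try {
            // Ephemeral node: it exists only as long as this session is alive.
            // Only the first process to create it "wins" the election.
            zk.create("/controller", "broker-1".getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            System.out.println("This process is now the controller");
        } catch (KeeperException.NodeExistsException e) {
            System.out.println("Another process is already the controller");
        }
    }
}
```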

Partition leader election

In a well-balanced cluster, there is no single node that acts as the leader of all partitions. Instead, all brokers in the cluster are leaders of some of the partitions and act as replicas of other partitions at the same time.

Such a distribution of the leader’s responsibilities helps to avoid performance issues caused by a heavy load on a single leader node. In addition to that, it keeps leader elections cheap: when a broker fails, only the partitions it was leading need a new leader.

When the administrator creates a new partition, the cluster controller assigns the leadership of this partition to one of the brokers in a way that ensures uniform distribution of partitions across the cluster.

We must remember that if the leader broker disconnects from the cluster and then reconnects, it gets only one chance to become the partition leader again: if there are no out-of-sync replicas when the ex-leader reconnects, it can take back the leadership; if there are such replicas, the current leader remains the partition leader.

By setting the auto.leader.rebalance.enable parameter, we may enable automatic rebalancing of the cluster, which allows the ex-leader to become the partition leader again later, when all of the replicas are in sync. However, since 2017, using this parameter has not been recommended due to a bug: in some configurations, the cluster tried to rebalance all partitions at once and ended up in a permanently out-of-sync state.

Even in 2020, there is an ongoing discussion in the Jira ticket about this feature, so perhaps some future version of Kafka will offer a working leader rebalancing feature. Until that happens, we must monitor the state of the cluster and run the kafka-reassign-partitions.sh script manually (or automate running that script if you feel lucky).
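
If we prefer code over shell scripts, recent client versions also let us trigger a preferred leader election for selected partitions from the AdminClient. A sketch (the topic name, partition number, and broker address are placeholders):

```java
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;

public class PreferredLeaderElection {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address

        try (AdminClient admin = AdminClient.create(props)) {
            // Ask the controller to move the leadership of partition 0
            // back to its preferred (first-assigned) replica
            admin.electLeaders(ElectionType.PREFERRED,
                    Set.of(new TopicPartition("example-topic", 0))).all().get();
        }
    }
}
```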
