What is Kafka log compaction, and how does it work?

In this article, I explain the concept of log compaction in Apache Kafka. After that, I describe the configuration parameters related to log compaction. Finally, I look at the Kafka source code and explain how log compaction works under the hood.

What is Kafka log compaction?

When we send a message to a Kafka topic, we specify the key in addition to the content of the message. Kafka uses the key to select the partition that stores the message: it calculates a hash of the provided key and maps it to one of the topic's partitions.

The hash of a constant value always remains the same. Therefore, if we send multiple messages with the same key, all of them end up in the same partition. Because of that, log compaction happens within a single Kafka broker (plus the partition replicas) and does not require coordination with other nodes of the Kafka cluster.
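To make the idea concrete, here is a minimal sketch of key-based partition selection. Kafka's default partitioner actually uses a murmur2 hash of the serialized key; the hash function below is simplified for illustration only:

```java
import java.util.Arrays;

class KeyPartitioner {
    // A simplified stand-in for Kafka's default partitioner, which uses a
    // murmur2 hash of the serialized key (DefaultPartitioner in the client).
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        // Mask off the sign bit to keep the result non-negative. The same key
        // always yields the same hash, so it always maps to the same partition.
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }
}
```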

Periodically, the Kafka broker scans the log to remove old messages. In general, Kafka removes messages when they are too old or when the log contains too much data. In the Kafka configuration, we can specify two parameters: log.retention.ms, which configures the time-based retention policy, and log.retention.bytes, which controls the size-based retention.

Log compaction is a separate kind of retention policy that we can enable in addition to the feature that removes old messages. It limits the size of the Kafka log by discarding old versions of messages once newer versions with the same keys exist.

According to the Kafka documentation, it retains at least the last known value for each message key within the log for a single topic partition. Later, in the “How does log compaction work?” section, I explain why we get “at least the last known value” instead of “only the last value.”

How to identify message updates

While doing the log compaction, Kafka identifies the new versions of messages by comparing the message key and the offset. If we send multiple messages with the same key to a topic that is configured to use compaction, Kafka retains only the message with the highest offset.

Example

Assume that the Kafka log contains these five messages in this order:

key: 1234, value: version_1, offset: 1
key: 5678, value: version_2, offset: 2
key: 1234, value: version_3, offset: 3
key: 1234, value: version_4, offset: 4
key: 5678, value: version_5, offset: 5

When Kafka executes log compaction, we are guaranteed to still have these messages in the log:

key: 1234, value: version_4, offset: 4
key: 5678, value: version_5, offset: 5

In some cases, the log may still contain the old versions of messages in addition to the latest ones. It happens because we are promised to get “at least the last known value.”

Note that the order of messages and their offsets never changes! Also, we can still request the message with offset 3, but instead of it, Kafka returns the message at the next existing offset, so we get the message with offset 4.
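We can observe this behavior with a plain Kafka consumer. In this sketch, the topic name user-profiles and partition 0 are made up for the example:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class SeekIntoCompactedTopic {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition partition = new TopicPartition("user-profiles", 0);
            consumer.assign(List.of(partition));
            consumer.seek(partition, 3); // offset 3 was compacted away
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                // The first record printed has offset 4 - the next existing offset.
                System.out.println(record.offset() + ": " + record.value());
            }
        }
    }
}
```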

Null values

It is possible to remove a message from a Kafka topic by sending a new message with the key of the message we want to delete and null content. In this case, log compaction discards the older versions of the message and keeps only the one with the null content.
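For illustration, here is a minimal sketch of producing such a deletion marker; the topic name user-profiles and the key are made up for the example:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class SendTombstone {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // A message with a null value marks the key "1234" for deletion.
            producer.send(new ProducerRecord<>("user-profiles", "1234", (String) null));
            producer.flush();
        }
    }
}
```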

Such deletion markers are a special case in the Kafka code because a message with null content does not stay in the topic forever. When the time defined by the delete.retention.ms parameter passes, Kafka removes the deletion marker from the topic even if it is the most recent version of the event identified by the given key.

Use cases

We can use Kafka log compaction to maintain a full snapshot of the final value for every key while implementing an event sourcing application.

In addition to that, we can use it to keep a backup copy of data processed by downstream consumers. In case of a consumer failure, we can restore its state by replaying the snapshot from a Kafka topic.
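As a sketch of that recovery pattern (all names below are made up; the real setup depends on the application), a consumer can rebuild an in-memory key-value state by replaying a compacted topic from the beginning:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class RestoreState {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Map<String, String> state = new HashMap<>();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition partition = new TopicPartition("state-changelog", 0);
            consumer.assign(List.of(partition));
            consumer.seekToBeginning(List.of(partition));
            long end = consumer.endOffsets(List.of(partition)).get(partition);

            // Replay the compacted topic; the last value wins for every key.
            while (consumer.position(partition) < end) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
                    if (record.value() == null) {
                        state.remove(record.key()); // deletion marker
                    } else {
                        state.put(record.key(), record.value());
                    }
                }
            }
        }
        // state now holds the latest value for every key.
    }
}
```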

Such an implementation is used in the Apache Samza framework to keep a backup copy of its local state store. Apache Samza uses a LevelDB-based key-value store for state management when executing joins and aggregations. Because that local store can be lost when the machine running the job fails, Samza flushes data changes to a Kafka topic and uses it for data recovery after a failure.

Last but not least, the log compaction feature lets us fulfill GDPR requirements while using Kafka. GDPR was one of the reasons why the maximum log compaction lag parameter was added to Kafka (KIP-354). Now, we can overwrite personal data with a null value and be sure that, after some time, Kafka permanently removes the data from the disk.

How to configure it?

First, we have to configure the log retention policy by setting the log.cleanup.policy parameter. It is set to delete by default. We may replace the default policy with compact or use both by setting its value to compact,delete.

When we have log compaction enabled, we should also specify four more parameters (a configuration sketch follows the list):

  • min.compaction.lag.ms - the minimum time that must pass before a message can be compacted. That is one of the reasons why we cannot expect to see only the most recent version of a message in the topic.

  • min.cleanable.dirty.ratio - the minimum fraction of the partition log that must be “dirty” before Kafka attempts to compact messages.

  • max.compaction.lag.ms - the maximum delay between the time a message is written and the time the message becomes eligible for compaction. This configuration parameter overrides min.cleanable.dirty.ratio and forces a log segment to become compactable even if the “dirty ratio” is lower than the threshold.

  • delete.retention.ms - how long to retain deletion markers in the cleaned log if the empty content (null) is the most recent version of a given message key.
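Here is the promised configuration sketch: creating a compacted topic with the Java Admin client. The topic name and all parameter values are arbitrary examples, not recommendations:

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class CreateCompactedTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            // 3 partitions, replication factor 1, compaction enabled.
            NewTopic topic = new NewTopic("user-profiles", 3, (short) 1)
                .configs(Map.of(
                    "cleanup.policy", "compact",
                    "min.compaction.lag.ms", "60000",     // 1 minute
                    "min.cleanable.dirty.ratio", "0.5",
                    "max.compaction.lag.ms", "604800000", // 7 days
                    "delete.retention.ms", "86400000"     // 1 day
                ));
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}
```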

A few words of caution regarding delete.retention.ms

If we send a message with null content to remove a key from the topic, the deletion marker stays in the log only until the segment that holds it gets compacted and the delete retention time passes. After that, the marker gets removed from the log. Because of that, slow consumers may miss the deletion marker!

We may end up with an inconsistent state across many applications if the consumer uses the deletion marker to remove data from its database.

How does log compaction work?

Every topic partition consists of log segments. Log compaction ignores the segments that have been previously cleaned, the active segment of the log, and all segments that contain at least one message with a timestamp within the minimum compaction lag. Every other segment of the log can be compacted.

The ignored segments are another reason why the topic may contain more versions of the events assigned to one key. The newest version may be in a segment that is not eligible for compaction yet, and Kafka is just removing some of the older versions.

Log compaction is implemented in the LogCleaner class.

The cleanFilthiestLog method of a CleanerThread instance calls the grabFilthiestCompactedLog method of LogCleanerManager to get the dirtiest log. It is the log that has the highest ratio of the number of cleanable bytes to the total number of bytes.
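In a simplified form, the selection looks roughly like this (the LogToClean type below is a stand-in for Kafka's internal bookkeeping, not its actual Scala code):

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// A stand-in for Kafka's per-partition cleaning statistics.
record LogToClean(String topicPartition, long cleanableBytes, long totalBytes) {
    double cleanableRatio() {
        return (double) cleanableBytes / totalBytes;
    }
}

class FilthiestLogSelector {
    // Pick the log with the highest ratio of cleanable ("dirty") bytes
    // to total bytes - the most profitable one to compact next.
    static Optional<LogToClean> grabFilthiest(List<LogToClean> candidates) {
        return candidates.stream()
            .max(Comparator.comparingDouble(LogToClean::cleanableRatio));
    }
}
```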

After that, the thread builds an OffsetMap. An OffsetMap is a data structure that maps every message key in the dirty portion of the log to the highest offset at which that key occurs.

The segment and its OffsetMap are passed to the cleanSegments function. The function creates a new segment and iterates over the messages within the segment given as a parameter.

For every message, it checks whether the message offset matches the offset of the most recent occurrence of the message key. If the OffsetMap indicates that there is a message with the same key and a higher offset, the currently processed message is skipped. Otherwise, the message is copied to the new segment.
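Here is a simplified sketch of that two-phase pass. A plain HashMap plays the role of the OffsetMap, and Message is a made-up type; the real implementation works on serialized records inside log segments:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// A made-up record type standing in for a Kafka log entry.
record Message(String key, String value, long offset) {}

class CompactionPass {
    static List<Message> compact(List<Message> segment) {
        // Phase 1: build the offset map - the highest offset seen for each key.
        Map<String, Long> offsetMap = new HashMap<>();
        for (Message m : segment) {
            offsetMap.put(m.key(), m.offset());
        }

        // Phase 2: copy a message to the new segment only if it is the most
        // recent occurrence of its key; otherwise skip it.
        List<Message> cleaned = new ArrayList<>();
        for (Message m : segment) {
            if (offsetMap.get(m.key()) == m.offset()) {
                cleaned.add(m);
            }
        }
        return cleaned;
    }
}
```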

In the end, the cleanSegments method flushes the replacement segment to disk and replaces the dirty segment of the log with the newly created, cleaned one.

Sources

Log Compaction section of the Apache Kafka documentation

State Management documentation of Apache Samza

KIP-354 Max Log Compaction Lag

Kafka Topic Configuration

LogCleaner source code
