How does Kafka Connect work?
In this article, I am going to describe the internals of Kafka Connect, explain how it uses the Sink and Source Connectors, and how it tracks the offsets of the messages it has processed.
The text should be useful for those of you who want to know how that project works but don’t want to spend a few hours reading its source code.
What is Kafka Connect
Kafka Connect is a tool that facilitates using Kafka as a centralized data hub by copying data from external systems into Kafka and propagating messages from Kafka to external systems.
Note that Kafka Connect only copies the data. It should never be used to do stream processing on its own. To perform any operations on the content of a Kafka topic, we should use KSQL or custom applications that read from one Kafka topic, transform the values, and write the output into another topic.
We should use the data transformation feature provided by Kafka Connect only to convert the original data format into Kafka-compatible messages.
I must mention that Kafka Connect guarantees at-least-once delivery, which means that in some cases we may read the same information multiple times from the source system or write the same message to the destination location.
The Architecture of Kafka Connect
First, we must take a look at the essential building blocks of the Kafka Connect API. When we implement a new connector, we must provide the code of either a
SinkConnector or a
SourceConnector and an implementation of either a
SinkTask or a SourceTask.
A Connector defines the task configuration (the name of the task implementation class and its parameters). Connectors return a collection of task configuration parameters and can notify Kafka Connect when those tasks need reconfiguration.
When we want to reconfigure the tasks, we must use the
requestTaskReconfiguration method of
ConnectorContext, which is passed as the parameter of the
Connector initialize method.
Kafka Connect manages Tasks, and we don’t need to worry about creating
Task instances. The developers must specify only the methods that read/write messages and keep track of the message offset. When Kafka Connect runs in distributed mode, the Tasks run on different machines, so the instances should be independent and not share any state.
The general advice is to use Connectors to perform a broad copy. Hence, it is better to have one Connector responsible for copying data from the whole database than to configure separate connectors for every table. Of course, the Connector implementation may divide the work between multiple Task instances.
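To make the "one Connector, many Tasks" idea concrete, here is a minimal sketch of how a connector might split a list of database tables between its Task instances. The class and method names below are illustrative only; in a real connector, this logic would live in the `taskConfigs(int maxTasks)` method of the Connector implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper showing how a Connector could divide the work of
// copying a whole database between at most maxTasks Task instances.
public class TaskConfigPlanner {

    // Distribute table names round-robin into at most maxTasks configurations.
    public static List<Map<String, String>> taskConfigs(List<String> tables, int maxTasks) {
        int groups = Math.min(maxTasks, tables.size());
        List<List<String>> buckets = new ArrayList<>();
        for (int i = 0; i < groups; i++) {
            buckets.add(new ArrayList<>());
        }
        for (int i = 0; i < tables.size(); i++) {
            buckets.get(i % groups).add(tables.get(i));
        }
        List<Map<String, String>> configs = new ArrayList<>();
        for (List<String> bucket : buckets) {
            Map<String, String> config = new HashMap<>();
            // Each Task receives a comma-separated list of the tables it owns.
            config.put("tables", String.join(",", bucket));
            configs.add(config);
        }
        return configs;
    }
}
```

Kafka Connect would then instantiate one Task per returned configuration map, so each Task knows which slice of the database it is responsible for.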
The Herder Interface
Herder is the main interface for making changes to the Kafka Connect cluster. Both the REST API and the CLI use it to configure Kafka Connect. In the source code, there are two implementations of the Herder: one for a single-node cluster and one for distributed mode.
In addition to start/stop methods that start or terminate the whole cluster, the Herder implementation contains methods that update or remove the connector configuration (such as
deleteConnectorConfig). Surprisingly, when we look at their source code in the
StandaloneHerder class, we see that those methods not only change the configuration but also start/stop the connectors.
For me, the most interesting part of the
StandaloneHerder implementation is the
ConfigUpdateListener inner class, which reacts to connector status changes in the
ConfigBackingStore. The inner class contains the
onConnectorTargetStateChange method, which updates the internal status of the
Worker that runs the code that does all of the heavy lifting in Kafka Connect.
The Worker Class
The Javadoc describes this class as the “container of threads that run tasks.” That sentence accurately illustrates the content of the class. When we start a new connector, the
Worker takes the canonical name of the
Connector class and uses reflection to instantiate it. After that, it passes the configuration to the class to initialize it and changes its state to STARTED.
As we already know, a change of state triggers the listener and, as a consequence, starts new Tasks.
The Worker Tasks
A WorkerTask consists of converters that turn message keys, values, and headers into bytes, which is a required step to send them to Kafka. In addition to those three objects, the
WorkerTask gets an instance of the
TransformationChain, which encapsulates all of the message transformers defined as Source/Sink configuration parameters. Moreover, the code passes the classes that provide access to metrics, configuration, current time, retry policy, etc.
If we create a
WorkerSourceTask, the Worker class also passes classes used to track data offset (
OffsetStorageWriter) and a
KafkaProducer. Last but not least, the instance of
WorkerSourceTask gets the
SourceTask that implements access to the data source we want to copy into Kafka.
In the case of a
WorkerSinkTask, the implementation passes the converters,
KafkaConsumer, and the implementation of
SinkTask that writes the Kafka messages into the destination.
It is worth mentioning that a
WorkerTask is also an implementation of the
Runnable interface, and its run method uses the overridable execute method to perform its job in a thread.
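The run/execute split is a classic template-method pattern, and we can sketch it in a few lines. The classes below are simplified stand-ins, not the real Kafka Connect types; the real WorkerTask also handles shutdown signals, metrics, and error reporting.

```java
// Simplified sketch of the pattern: the task is a Runnable whose final run()
// method wraps an overridable execute() method that does the actual copying.
public abstract class SketchWorkerTask implements Runnable {

    private volatile boolean stopped = false;

    @Override
    public final void run() {
        try {
            execute();          // the subclass-specific copy loop runs here
        } finally {
            stopped = true;     // mark the task as finished, even on failure
        }
    }

    public boolean isStopped() {
        return stopped;
    }

    // Source and sink variants override this with their own copy loop.
    protected abstract void execute();
}
```

The design lets the framework own the thread lifecycle while each source or sink variant only supplies the body of the loop.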
Now, let’s assume that we want to copy an existing database into Kafka. For that, we need an implementation of the
SourceTask, which is internally run by a WorkerSourceTask.
The aforementioned execute method does three things:
checks the task status to determine whether it should pause/resume/stop execution,
calls the poll method of the
SourceTask instance we provided to read the next messages from the source, and
calls the internal
sendRecords method to push our data into Kafka.
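The three steps above can be sketched as a loop. Everything in this snippet is a hypothetical simplification (the state supplier, the poll supplier, and the stand-in sendRecords), not the real WorkerSourceTask code, but the control flow mirrors what the article describes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Illustrative sketch of the source-side copy loop: check the task state,
// poll the SourceTask for records, and hand them over to sendRecords.
public class SourceLoopSketch {

    public enum State { RUNNING, PAUSED, STOPPED }

    // Run the loop until the state supplier reports STOPPED; return everything
    // that was "sent" so the behaviour is easy to inspect.
    public static List<String> execute(Supplier<State> state, Supplier<List<String>> poll) {
        List<String> sent = new ArrayList<>();
        while (true) {
            State s = state.get();
            if (s == State.STOPPED) {
                break;                        // stop execution
            }
            if (s == State.PAUSED) {
                continue;                     // skip polling while paused
            }
            List<String> records = poll.get();    // stands in for SourceTask.poll()
            sendRecords(records, sent);           // push the data "into Kafka"
        }
        return sent;
    }

    private static void sendRecords(List<String> records, List<String> sent) {
        sent.addAll(records);                 // stands in for the Kafka producer
    }
}
```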
Inside the sendRecords method, the task loops over the
SourceRecords to apply the
TransformationChain to transform the data into the structure we want to send to Kafka. After converting the data, it creates a new
ProducerRecord and writes the source partition and source offset to an OffsetStorageWriter.
We call those things source partition and source offset, but in fact, we are storing a map of values so we can store anything we want: a file name, database URL and table, position in a file, or anything else.
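For example, a file-based source could identify its partition by filename and its offset by position in that file. The key names below are a choice made for this illustration, not anything mandated by Kafka Connect.

```java
import java.util.Map;

// Source partitions and offsets are just maps, so we can store whatever
// identifies our source and our position in it. Here: a file and a byte offset.
public class OffsetMapsExample {

    // Identifies *which* source we are reading (the "source partition").
    public static Map<String, String> sourcePartition(String filename) {
        return Map.of("filename", filename);
    }

    // Identifies *how far* we have read it (the "source offset").
    public static Map<String, Long> sourceOffset(long position) {
        return Map.of("position", position);
    }
}
```

A database source might instead store the connection URL and table name in the partition map and a primary-key value or timestamp in the offset map.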
Afterward, it sends the
ProducerRecords to Kafka and flushes offsets. Flushing the offsets causes both flushing the data written by
OffsetStorageWriter and calling the commit method of
SourceTask, which is supposed to acknowledge the processing of messages in the source if it is necessary.
As I mentioned in the previous paragraph, the
SourceTask not only retrieves the messages from the data origin, but it can also propagate the commit back to the source.
This feature is useful when we are dealing with message queues, and we must explicitly inform the service that we have processed the message. Doing it in the poll method is not advised because, in the case of a failure, the acknowledged messages would get lost.
Because both the poll and commit methods don’t get any parameters, the
SourceTask implementation must keep track of the processed messages on its own. The only progress tracking feature provided by Kafka Connect is passing a
SourceTaskContext into the
SourceTask, which provides access to an
OffsetStorageReader and allows us to retrieve the offsets of already processed messages.
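Here is a sketch of how a task can use those stored offsets to resume where it left off. The FakeOffsetReader below is an invented stand-in for Kafka Connect's OffsetStorageReader, which a SourceTask reaches through its SourceTaskContext.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of resuming a file-based source from a previously stored offset.
public class ResumeSketch {

    // Simplified stand-in for OffsetStorageReader: maps a source partition
    // to the last offset that was stored for it.
    public static class FakeOffsetReader {
        private final Map<Map<String, String>, Map<String, Long>> store = new HashMap<>();

        public void save(Map<String, String> partition, Map<String, Long> offset) {
            store.put(partition, offset);
        }

        public Map<String, Long> offset(Map<String, String> partition) {
            return store.get(partition);
        }
    }

    // Decide where to start reading a file: from the stored position, or from 0
    // if the task has never processed this file before.
    public static long startingPosition(FakeOffsetReader reader, String filename) {
        Map<String, Long> offset = reader.offset(Map.of("filename", filename));
        return offset == null ? 0L : offset.get("position");
    }
}
```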
Similarly, copying the content of a Kafka topic into an external service is implemented by the
WorkerSinkTask class in its iteration method.
First, it retrieves the messages from Kafka using the
pollConsumer method. After that, it converts the message into
SinkRecord and passes it through the
TransformationChain to get the data in the format compatible with the output.
Later, it calls the
deliverMessages method to record metrics, store the offsets of messages in a collection and, of course, use the put method of the
SinkTask class to write the data into the external system.
Interestingly, the call to the
preCommit function of
SinkTask (which flushes the data) and committing the offsets of processed messages happens at the beginning of the next call to the iteration method.
In the SinkTask class, we have three methods to care about:
put, which writes the messages to the output,
flush, to force flush the data if we need it, and
preCommit, to inform Kafka Connect which offsets are safe to commit; by default, it flushes and returns all of the offsets processed during the last run of the put method.
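The three methods can be sketched with a toy sink. This class is a stand-in for a real SinkTask implementation, with records reduced to strings and offsets reduced to a map from topic-partition names to offset numbers.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified sink sketch: put buffers records, flush writes them out, and
// preCommit flushes and reports which offsets are now safe to commit.
public class SinkSketch {

    private final List<String> buffer = new ArrayList<>();
    private final List<String> written = new ArrayList<>();

    // put: accept a batch of records; here we only buffer them.
    public void put(List<String> records) {
        buffer.addAll(records);
    }

    // flush: force the buffered data into the "external system".
    public void flush() {
        written.addAll(buffer);
        buffer.clear();
    }

    // preCommit: flush, then return the offsets that are now safe to commit
    // (mirroring the default behaviour of returning what was passed in).
    public Map<String, Long> preCommit(Map<String, Long> currentOffsets) {
        flush();
        return new HashMap<>(currentOffsets);
    }

    public List<String> written() {
        return written;
    }
}
```

Overriding preCommit is the hook for sinks that need finer control, such as committing only the offsets that the destination has durably acknowledged.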
OffsetBackingStore and other state tracking classes
In the end, I have to mention that the developers of Apache Kafka used an ingenious solution to track the message offsets, worker status, and configuration parameters.
They have implemented Kafka-based storage that writes the data into a compacted Kafka topic. This solution not only ensures that they always have access to the latest version of each parameter, but it also automatically deals with propagating the data across all Workers when Kafka Connect is running in distributed mode.
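The reason a compacted topic works as a key-value store is that, after compaction, only the latest record for each key survives, so replaying the topic reconstructs the current state. The toy replay below illustrates that semantics; the record format (key/value string pairs) is invented for the example.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy illustration of compacted-topic semantics: replaying the stream of
// (key, value) records yields the latest value per key, which is exactly the
// view Kafka Connect needs for configs, offsets, and statuses.
public class CompactionSketch {

    public static Map<String, String> compact(List<String[]> records) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (String[] record : records) {
            latest.put(record[0], record[1]);   // the last write per key wins
        }
        return latest;
    }
}
```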