In this article, I am going to describe the internals of Kafka Connect, explain how it uses the Sink and Source Connectors, and how it tracks the offsets of the messages it has processed.
The text should be useful for those of you who want to know how that project works but don’t want to spend a few hours reading its source code.
What is Kafka Connect
Kafka Connect is a tool that makes it easier to use Kafka as a centralized data hub. It copies data from external systems into Kafka and propagates messages from Kafka to external systems.
Note that Kafka Connect only copies the data. It should never be used to do stream processing on its own. To perform any operations on the content of a Kafka topic, we should use KSQL or custom applications that read from one Kafka stream, transform the values, and write the output into another stream.
We should use the data transformation feature provided by Kafka Connect only to convert the original data format into Kafka-compatible messages.
I must mention that Kafka Connect guarantees at-least-once delivery, which means that in some cases, we may read the same information from the source system or write the same message to the destination more than once.
The Architecture of Kafka Connect
First, we must take a look at the essential building blocks of the Kafka Connect API. When we implement a new connector, we must provide the code of either a
SinkConnector or a
SourceConnector and an implementation of a
SinkTask or a
SourceTask.
A Connector defines the task configuration (the name of the task implementation class and its parameters). Connectors return a collection of task configuration parameters and can notify Kafka Connect when those tasks need reconfiguration.
When we want to reconfigure the tasks, we must use the
requestTaskReconfiguration method of
ConnectorContext, which is passed as the parameter of the
Connector initialize method.
Kafka Connect manages Tasks, and we don’t need to worry about creating
Task instances. The developers must specify only the methods that read/write messages and keep track of the message offset. When Kafka Connect runs in distributed mode, the Tasks run on different machines, so the instances should be independent and not share any state.
The general advice is to use Connectors to perform a broad copy. Hence, it is better to have one Connector responsible for copying data from the whole database than to configure separate connectors for every table. Of course, the Connector implementation may divide the work between multiple Task instances.
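To make the idea of dividing the work more concrete, here is a minimal sketch of how a connector could split a list of tables between tasks. It uses only plain Java collections. The TableAssignment class, the taskConfigs helper, and the "tables" configuration key are my assumptions made up for this illustration, not part of Kafka Connect.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not a Kafka Connect class: it only illustrates how a
// connector could split one broad copy job into per-task configurations.
class TableAssignment {

    // Returns one configuration map per task. The "tables" key is an assumed name.
    static List<Map<String, String>> taskConfigs(List<String> tables, int maxTasks) {
        if (maxTasks < 1) {
            throw new IllegalArgumentException("maxTasks must be at least 1");
        }
        // Never create more tasks than there are tables to copy.
        int groups = Math.min(tables.size(), maxTasks);
        List<List<String>> buckets = new ArrayList<>();
        for (int i = 0; i < groups; i++) {
            buckets.add(new ArrayList<>());
        }
        // Round-robin assignment keeps the groups balanced.
        for (int i = 0; i < tables.size(); i++) {
            buckets.get(i % groups).add(tables.get(i));
        }
        List<Map<String, String>> configs = new ArrayList<>();
        for (List<String> bucket : buckets) {
            Map<String, String> config = new HashMap<>();
            config.put("tables", String.join(",", bucket));
            configs.add(config);
        }
        return configs;
    }
}
```

For three tables (users, orders, payments) and at most two tasks, the sketch produces two configurations: one with "users,payments" and one with "orders".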
The Herder Interface
Herder is the main interface for making changes to the Kafka Connect cluster. Both the REST API and the CLI use it to configure Kafka Connect. In the source code, there are two implementations of the Herder: one for the single-node (standalone) cluster and one for the distributed mode.
In addition to start/stop methods that start or terminate the whole cluster, the Herder implementation contains methods that update or remove the connector configuration (for example,
deleteConnectorConfig). Surprisingly, when we look at their source code in the
StandaloneHerder class, we see that those methods not only change the configuration but also start/stop the connectors.
For me, the most interesting part of the
StandaloneHerder implementation is the
ConfigUpdateListener inner class, which reacts to connector status changes in the
ConfigBackingStore. The inner class contains the
onConnectorTargetStateChange method, which updates the internal status of the
Worker, the class that does all of the heavy lifting in Kafka Connect.
The Worker Class
The Javadoc describes this class as the “container of threads that run tasks.” That sentence accurately illustrates the content of the class. When we start a new connector, the
Worker takes the canonical name of the
Connector class and uses reflection to instantiate it. After that, it passes the configuration to the class to initialize it and changes its state to started.
As we already know, a change of state triggers the listener, and, as a consequence, starts new tasks.
The Worker Tasks
WorkerTask consists of converters that turn message keys, values, and headers into bytes, which is a required step to send them to Kafka. In addition to those three objects, the
WorkerTask gets an instance of the
TransformationChain, which encapsulates all of the message transformers defined as Source/Sink configuration parameters. Moreover, the code passes the classes that provide access to metrics, configuration, current time, retry policy, etc.
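The idea of a chain of transformations is easy to show in a few lines. The sketch below is a simplified stand-in that works on plain strings, not the real TransformationChain class. The TransformationChainSketch name is hypothetical, and treating a null result as "drop the record" reflects my understanding of the real behavior, so take it as an assumption.

```java
import java.util.List;
import java.util.function.UnaryOperator;

// Simplified stand-in for the idea behind TransformationChain.
// Records are plain strings here; the real class works on Connect records.
class TransformationChainSketch {

    // Applies every transformation in the configured order.
    // A null result means the record was filtered out, so the chain stops early.
    static String apply(List<UnaryOperator<String>> transformations, String record) {
        String current = record;
        for (UnaryOperator<String> transformation : transformations) {
            current = transformation.apply(current);
            if (current == null) {
                return null;
            }
        }
        return current;
    }
}
```

The order matters: every transformation receives the output of the previous one, which is why the configuration lists them as an ordered sequence.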
If we create a
WorkerSourceTask, the Worker class also passes classes used to track data offset (
OffsetStorageWriter) and a
KafkaProducer. Last but not least, the instance of
WorkerSourceTask gets the
SourceTask that implements access to the data source we want to copy into Kafka.
In the case of a
WorkerSinkTask, the implementation passes the converters,
KafkaConsumer, and the implementation of
SinkTask that writes the Kafka messages into the destination.
It is worth mentioning that a
WorkerTask is also an implementation of the
Runnable interface, and its run method calls the overridable execute method to perform its job in a thread.
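The relation between run and execute is a classic template method. Here is a minimal sketch of that pattern. TaskSketch and CountingTask are hypothetical names I use only for illustration, and the real WorkerTask does much more around the call (status reporting, metrics, and so on).

```java
import java.util.concurrent.atomic.AtomicInteger;

// Simplified stand-in for the pattern, not the real WorkerTask class.
abstract class TaskSketch implements Runnable {
    private volatile Throwable failure;

    // Subclasses put the actual copying logic here.
    protected abstract void execute() throws Exception;

    // run() is what the thread calls; it wraps execute() with failure handling.
    @Override
    public final void run() {
        try {
            execute();
        } catch (Exception e) {
            failure = e;
        }
    }

    // Returns the exception thrown by execute(), or null if it finished normally.
    Throwable failure() {
        return failure;
    }
}

// Hypothetical subclass used only to show the pattern in action.
class CountingTask extends TaskSketch {
    final AtomicInteger executions = new AtomicInteger();

    @Override
    protected void execute() {
        executions.incrementAndGet();
    }
}
```

Because the task is a Runnable, we can hand it to a thread with new Thread(new CountingTask()).start(), and the thread never needs to know which kind of task it runs.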
Now, let’s assume that we want to copy an existing database into Kafka. For that, we need an implementation of the
SourceTask, which is internally run by a
WorkerSourceTask.
The aforementioned execute method does three things:
checks the task status to determine whether it should pause/resume/stop execution,
calls the poll method of the
SourceTask instance we provided to read the next messages from the source, and
calls the internal
sendRecords method to push our data into Kafka.
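The three steps above can be sketched as a simple loop. This is my simplified model, not the actual WorkerSourceTask code: the SourceLoopSketch class, the TargetState enum, and the functional parameters are assumptions introduced for the example, and records are plain strings.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Simplified model of the loop described above; all names are hypothetical.
class SourceLoopSketch {

    enum TargetState { RUNNING, PAUSED, STOPPED }

    // Runs until the state supplier reports STOPPED.
    // Returns the number of batches handed over to sendRecords.
    static int execute(Supplier<TargetState> state,
                       Supplier<List<String>> poll,
                       Consumer<List<String>> sendRecords) {
        int batches = 0;
        while (true) {
            // Step 1: check the task status.
            TargetState current = state.get();
            if (current == TargetState.STOPPED) {
                return batches;
            }
            if (current == TargetState.PAUSED) {
                // A real implementation would block until resumed instead of spinning.
                Thread.onSpinWait();
                continue;
            }
            // Step 2: read the next messages from the source.
            List<String> batch = poll.get();
            if (batch == null || batch.isEmpty()) {
                continue;
            }
            // Step 3: push the data towards Kafka.
            sendRecords.accept(batch);
            batches++;
        }
    }
}
```

Note that the loop skips polling while the task is paused, so a paused task reads nothing from the source.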
The
sendRecords method loops over the
SourceRecords and applies the
TransformationChain to transform the data into the structure we want to send to Kafka. After converting the data, it creates a new
ProducerRecord and writes the source partition and source offset to an
OffsetStorageWriter.
We call those things source partition and source offset, but in fact, both of them are maps of values, so we can store anything we want: a file name, database URL and table, position in a file, or anything else.
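To illustrate how flexible those two maps are, here is a minimal sketch of records created from the lines of a file. The Record class is a simplified stand-in for the idea behind SourceRecord, and the "filename" and "line" keys are names I picked for this example, not anything required by Kafka Connect.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Simplified stand-in for the idea behind SourceRecord; not the Kafka Connect class.
class FileLineRecords {

    static final class Record {
        final Map<String, Object> sourcePartition; // where the data comes from
        final Map<String, Object> sourceOffset;    // how far we have read
        final String value;

        Record(Map<String, Object> sourcePartition, Map<String, Object> sourceOffset, String value) {
            this.sourcePartition = sourcePartition;
            this.sourceOffset = sourceOffset;
            this.value = value;
        }
    }

    // Wraps lines read from a file; "filename" and "line" are assumed key names.
    static List<Record> toRecords(String fileName, List<String> lines, long firstLine) {
        Map<String, Object> partition = Map.of("filename", fileName);
        List<Record> records = new ArrayList<>();
        for (int i = 0; i < lines.size(); i++) {
            // The offset stores the number of the next line to read after this record.
            Map<String, Object> offset = Map.of("line", firstLine + i + 1);
            records.add(new Record(partition, offset, lines.get(i)));
        }
        return records;
    }
}
```

A database source could use the same structure with a table name as the partition and a timestamp or primary key as the offset.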
Afterward, it sends the
ProducerRecords to Kafka and flushes the offsets. Flushing the offsets both persists the data written by the
OffsetStorageWriter and calls the commit method of the
SourceTask, which is supposed to acknowledge the processing of messages in the source if necessary.
As I mentioned in the previous paragraph, the
SourceTask not only retrieves the messages from the data origin, but it can also propagate the commit back to the source.
This feature is useful when we are dealing with message queues, and we must explicitly inform the service that we have processed the message. Doing it in the poll method is not advised because, in the case of a failure, the acknowledged messages would get lost.
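Here is a minimal sketch of that advice for a message queue source. The QueueClient interface is entirely hypothetical and stands for whatever client library the queue provides. The sketch also assumes that every message handed over before commit is called has already been written to Kafka, which a real task would have to verify.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of a queue-backed source task; all names are hypothetical.
class AckingSourceSketch {

    // Hypothetical queue client, not a real library API.
    interface QueueClient {
        String receive();                 // next message, or null when the queue is empty
        void acknowledge(String message); // tells the queue the message may be removed
    }

    private final QueueClient queue;
    private final List<String> awaitingAck = new ArrayList<>();

    AckingSourceSketch(QueueClient queue) {
        this.queue = queue;
    }

    // Mirrors the role of poll: read messages and remember them, but do not acknowledge yet.
    synchronized List<String> poll() {
        List<String> batch = new ArrayList<>();
        String message;
        while ((message = queue.receive()) != null) {
            batch.add(message);
            awaitingAck.add(message);
        }
        return batch;
    }

    // Mirrors the role of commit: acknowledge everything handed over so far.
    synchronized void commit() {
        for (String message : awaitingAck) {
            queue.acknowledge(message);
        }
        awaitingAck.clear();
    }
}
```

If the process crashes between poll and commit, the queue still holds the unacknowledged messages, so they are delivered again instead of being lost. That is the at-least-once behavior in practice.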
Because neither the poll nor the commit method takes any parameters, the
SourceTask implementation must keep track of the processed messages on its own. The only progress tracking feature provided by Kafka Connect is the
SourceTaskContext passed into the
SourceTask. The context provides access to
OffsetStorageReader and allows us to retrieve the offsets of already processed messages.
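A typical use of the stored offsets is resuming after a restart. The sketch below models the offset storage as a plain in-memory map, so ResumeSketch is only a stand-in: in a real task, the lookup would go through the OffsetStorageReader. The "filename" and "line" keys are assumed names.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory stand-in for the stored offsets; all names are hypothetical.
class ResumeSketch {

    // Maps a source partition to the last stored source offset.
    private final Map<Map<String, Object>, Map<String, Object>> storedOffsets = new HashMap<>();

    void store(Map<String, Object> partition, Map<String, Object> offset) {
        storedOffsets.put(partition, offset);
    }

    // Returns the line at which reading should resume, or 0 for a file we have never read.
    long startLine(String fileName) {
        Map<String, Object> offset = storedOffsets.get(Map.of("filename", fileName));
        if (offset == null) {
            return 0L;
        }
        // Going through Number avoids assuming a specific numeric type in the stored map.
        return ((Number) offset.get("line")).longValue();
    }
}
```

The null check matters: on the very first run there is no stored offset for the partition, and the task has to start from the beginning.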
Similarly, copying the content of a Kafka topic into an external service is implemented by the
WorkerSinkTask class in its iteration method.
First, it retrieves the messages from Kafka using the
pollConsumer method. After that, it converts each message into a
SinkRecord and passes it through the
TransformationChain to get the data in the format compatible with the output.
Later, it calls the
deliverMessages method to record metrics, store the offsets of messages in a collection and, of course, use the put method of the
SinkTask class to write the data into the external system.
Interestingly, the call to the
preCommit function of
SinkTask (which flushes the data) and committing the offsets of processed messages both happen at the beginning of the next call to the iteration method.
In the
SinkTask class, we have three methods to care about:
put, which writes the messages to the output,
flush, which forces a flush of the data if we need it, and
preCommit, which informs Kafka Connect which offsets are safe to commit. By default, it calls flush and returns the offsets it was given.
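The interplay of those three methods can be sketched with a buffering sink. This is a simplified model with plain strings as records and a list standing in for the external system. BufferingSinkSketch is a hypothetical class, and the offsets are keyed by an assumed "topic-partition" string instead of the types used by the real API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Simplified model of a buffering sink; not an implementation of SinkTask.
class BufferingSinkSketch {
    private final List<String> buffer = new ArrayList<>();
    final List<String> destination = new ArrayList<>(); // stands in for the external system

    // put only buffers the records; nothing reaches the destination yet.
    void put(List<String> records) {
        buffer.addAll(records);
    }

    // flush writes everything buffered so far to the destination.
    void flush() {
        destination.addAll(buffer);
        buffer.clear();
    }

    // Mirrors the default behavior: flush first, then report the given offsets as safe to commit.
    Map<String, Long> preCommit(Map<String, Long> currentOffsets) {
        flush();
        return currentOffsets;
    }
}
```

A sink that writes asynchronously could return a smaller set of offsets from preCommit, covering only the data it knows has reached the destination.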
OffsetBackingStore and other state tracking classes
In the end, I have to mention that the developers of Apache Kafka used an ingenious solution to track the message offsets, worker status, and configuration parameters.
They have implemented Kafka-based storage that writes the data into a compacted Kafka topic. This solution not only ensures that they always have access to the latest version of each parameter, but it also automatically propagates the data across all Workers when Kafka Connect is running in distributed mode.
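To see why a compacted topic works as a key-value store, we can replay a log of key/value entries and keep only the newest value for every key. The sketch below does exactly that with plain Java collections. CompactedTopicSketch and its Entry class are hypothetical, and I assume that a null value marks a deleted key, as tombstones do in compacted topics.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Models reading a compacted topic from the beginning; all names are hypothetical.
class CompactedTopicSketch {

    static final class Entry {
        final String key;
        final String value; // null marks a deleted key (a tombstone)

        Entry(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Replays the log in order, so later entries overwrite earlier ones.
    static Map<String, String> latestState(List<Entry> log) {
        Map<String, String> state = new LinkedHashMap<>();
        for (Entry entry : log) {
            if (entry.value == null) {
                state.remove(entry.key);
            } else {
                state.put(entry.key, entry.value);
            }
        }
        return state;
    }
}
```

Every Worker that replays the same log ends up with the same state, which is how the data reaches all of them without any additional synchronization mechanism.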