Kafka Connect QuickStart.

Understanding Kafka Connect.

Apache Kafka is a distributed streaming platform. If you are not familiar with Apache Kafka, the articles below will help you get started:

Kafka Installation guide

Kafka Idempotent Consumer

Kafka Idempotent Producer

Kafka Connect is an open-source component of Apache Kafka and another project in the Apache Kafka family. Common use cases include consuming data from an RDBMS and sinking it into Hadoop. The framework follows the principle of convention over configuration.

Kafka Connect is a framework for connecting Kafka with external systems such as databases, key-value stores, search indexes, and file systems.

https://kafka.apache.org/documentation/

Advantages of Using Kafka Connect: 

Below are some benefits of Kafka Connect.

  1. Data-Centric Pipeline
  2. Flexibility and Scalability
  3. Reusability and Extensibility
  4. Distributed and standalone modes
  5. Streaming/batch integration
  6. REST interface for better operations
  7. Automatic offset management

Kafka Connect is a tool suite for scalably and reliably streaming data between Kafka and other external systems such as databases, key-value stores, search indexes, and file systems, using so-called Connectors.

As you can see in the diagram, Kafka Connect generally involves three main components: a source connector, a Kafka topic, and a sink connector.

A source connector collects data from a source system. Source systems can be entire databases, tables, or any message brokers. A source connector could also collect metrics from application servers into Kafka topics, making the data available for stream processing with low latency.

A sink connector delivers data from Kafka topics into other systems, such as Elasticsearch, Hadoop, or any other database.

Prerequisite: 

  • Java 8, Maven, and a basic knowledge of Kafka, with Kafka installed and running on your system.
  • Kafka version used for this guide: kafka_2.10-0.10.2.1.
  • Download Kafka from the Apache Kafka downloads page.

Kafka Connect Example:

In this example, we will use a very simple setup to demonstrate the power of Kafka Connect: the basic file source connector and file sink connector that ship with Kafka.

  1. Start ZooKeeper
  2. Start Kafka (commands for both are shown below)
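
If you are working from the extracted Apache Kafka distribution, ZooKeeper and the Kafka broker can be started with the bundled scripts and default configuration files, roughly as follows (run each in its own terminal):

$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties

$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties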

Create a Kafka topic for our testing:

$KAFKA_HOME/bin/kafka-topics.sh \
  --create \
  --zookeeper localhost:2181 \
  --replication-factor 1 \
  --partitions 1 \
  --topic my-connect-test

Kafka Connect currently supports two modes of execution: standalone and distributed.

In standalone mode, all work is performed in a single process; this mode is intended mainly for development and testing.

$ bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]
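
In distributed mode, you start one or more workers with connect-distributed.sh and then submit connector configurations over the Connect REST interface instead of passing properties files on the command line. A minimal sketch, assuming a worker is listening on the default REST port 8083 and using the same file source settings shown later in this guide:

curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors \
  -d '{
        "name": "local-file-source-connector",
        "config": {
          "connector.class": "FileStreamSource",
          "tasks.max": "1",
          "topic": "my-connect-test",
          "file": "test-connector.txt"
        }
      }'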

Source Connector Configuration

Provide a configuration file for the source connector; in this guide it is saved as local-file-source-connector.properties under Kafka's config directory. The reference configuration is below:

name=local-file-source-connector
connector.class=FileStreamSource
tasks.max=1
topic=my-connect-test
file=test-connector.txt
  • name – Unique name for the connector. Attempting to register again with the same name will fail.
  • connector.class – The Java class for the connector
  • tasks.max – The maximum number of tasks that should be created for this connector. The connector may create fewer tasks if it cannot achieve this level of parallelism.
  • key.converter – (optional) Override the default key converter set by the worker.
  • value.converter – (optional) Override the default value converter set by the worker.
  • topics – (sink connectors only) A comma-separated list of topics to use as input for the connector
  • topics.regex – (sink connectors only) A Java regular expression matching the topics to use as input for the connector

Sink Connector Configuration

For the sink connector, the reference configuration is below:

name=local-file-sink-connector
connector.class=FileStreamSink
tasks.max=1
file=test-sink-connector.txt
topics=my-connect-test

Now that the source and sink are ready, let us look at the worker configuration. Most of the connector properties are the same because both source and sink are file connectors.

Worker Configuration

For the worker, the reference configuration is below (saved as connect-standalone.properties in standalone mode):

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=20000
plugin.path=/share/java
  • bootstrap.servers – List of Kafka servers used to bootstrap connections to Kafka
  • offset.flush.interval.ms – Interval, in milliseconds, at which the worker tries to commit offsets for its tasks.
  • plugin.path – List of paths, separated by commas (,), that contain plugins (connectors, converters, transformations).
  • key.converter – Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the keys in messages written to or read from Kafka, and since this is independent of connectors it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro.
  • value.converter – Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the values in messages written to or read from Kafka, and since this is independent of connectors it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro.

The config property below is specific to standalone mode:

  • offset.storage.file.filename – File to store offset data in.

For more configuration details, check the official documentation.

Now everything is ready. The three configuration files used are:

  1. kafka/config/connect-standalone.properties 
  2. kafka/config/local-file-source-connector.properties
  3. kafka/config/local-file-sink-connector.properties

The content of test-connector.txt:

{
  color: "red",
  value: "#f00"
}
{
  color: "green",
  value: "#0f0"
}
{
  color: "blue",
  value: "#00f"
}
{
  color: "cyan",
  value: "#0ff"
}
{
  color: "magenta",
  value: "#f0f"
}
{
  color: "yellow",
  value: "#ff0"
}
{
  color: "black",
  value: "#000"
}

You can add content to the same file later and check that the connectors are able to read it and sink the data into test-sink-connector.txt.

The source connector will automatically detect the changes and publish the content to Kafka.

Make sure to insert a newline at the end; otherwise, the source connector won't pick up the last line.
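
For example, you can append one more record from the shell; echo conveniently adds the trailing newline for you:

echo '{ color: "white", value: "#fff" }' >> test-connector.txt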

Now let us run Kafka Connect and check the result:

$KAFKA_HOME/bin/connect-standalone.sh \
  config/connect-standalone.properties \
  config/local-file-source-connector.properties \
  config/local-file-sink-connector.properties
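
Once both connectors are running, you can verify the end-to-end flow by checking the sink file, or by reading the topic directly with the console consumer:

cat test-sink-connector.txt

$KAFKA_HOME/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic my-connect-test \
  --from-beginning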

This is a simple implementation of Kafka Connect.

Discover more Kafka connectors here: https://www.confluent.io/hub/

Reference documents:-

https://docs.confluent.io/current/connect/quickstart.html

https://kafka.apache.org/documentation/#connect

Idempotent Kafka Consumer

In the last couple of articles, we have seen how Kafka guarantees message processing; as part of this, we saw the idempotent Kafka producer. Idempotence is something I appreciate, maybe the most, in data engineering. Most of you already have a good understanding of the word idempotent.

Data is the new oil, and there are two questions people always ask in the data industry:

How do we guarantee all messages are processed?

How do we avoid or handle duplicate messages?

In the last article, we saw how producers ensure "at least once" delivery semantics, and that the producer shouldn't deliver duplicate messages.

simple message data flow.

Let us jump into the consumer part. Kafka provides a consumer API to pull data from Kafka. When we consume or pull data from Kafka, we need to specify a consumer group. A consumer group is a group of consumers that coordinate to read data from a set of topic partitions. To save its progress in reading data from Kafka, a consumer needs to save the offset of the next message it will read in each topic partition it is assigned to. Consumers are free to store their offsets wherever they want, but by default, and for all Kafka Streams applications, they are stored back in Kafka itself in an internal topic called __consumer_offsets. To use this mechanism, consumers either enable automatic periodic commits of offsets back to Kafka by setting the configuration flag enable.auto.commit to true, or make an explicit call to commit the offsets.
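
To make the offset mechanics concrete, here is a minimal sketch of a Java consumer that disables auto-commit and commits offsets explicitly only after processing. The broker address, group id, and topic name are placeholders, and it assumes a reasonably recent kafka-clients version:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.put("group.id", "my-consumer-group");         // consumer group that coordinates reads
        props.put("enable.auto.commit", "false");           // we commit offsets explicitly below
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-connect-test"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    process(record);          // your processing logic
                }
                consumer.commitSync();        // commit offsets only after processing succeeded
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        System.out.printf("partition=%d offset=%d value=%s%n",
                record.partition(), record.offset(), record.value());
    }
}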

Idempotency guarantees consistent data and no duplication.

Delivery Semantics of Kafka Consumer:

There are different delivery semantics; for more details, go through the list below.

  1. At least once: In these semantics, offsets are committed after the message is processed. If some exception occurs during processing, the offset is not committed, so the consumer will be able to read the message again from Kafka. This can lead to duplicate message processing, so make sure that processing a message twice won't impact your system.

2. At most once: In these semantics, offsets are committed as soon as the message batch is received. If the processing goes wrong, the message is lost (we are unable to read it again) because we have already committed the offset. For example:

  1. The producer publishes the message successfully.
  2. The consumer consumes the message batch successfully and commits the offsets.
  3. While processing the messages, an exception occurs or the machine shuts down, so we lose those messages; we cannot read them again because the offsets were already committed.

3. No guarantee: In these semantics, there is no guarantee for a message, which means a given message may be processed once, multiple times, or not at all. A simple scenario where you end up with these semantics is a consumer with enable.auto.commit set to true (this is the default) that asynchronously processes each batch of messages and saves the desired results into a database. The frequency of these automatic commits is determined by the configuration parameter auto.commit.interval.ms.

4. Exactly once: In 2017, Confluent introduced exactly-once semantics (Kafka 0.11). This can be achieved for Kafka-to-Kafka workflows using the Kafka Streams API. These semantics ensure your message processing is idempotent.

By default, a consumer is at least once: when we don't set anything regarding offset commits, the default is auto-commit of the offsets.
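
For contrast, an at-most-once poll loop commits the offsets before processing. A rough sketch, reusing the imports and consumer from the earlier sketch:

// At-most-once style: offsets are committed as soon as the batch is received,
// so any record that fails during processing is effectively lost.
static void pollAtMostOnce(KafkaConsumer<String, String> consumer) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    consumer.commitSync();                                   // commit first ...
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.value());                  // ... then process; a crash here loses the record
    }
}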

Idempotent Consumer:

The Kafka Streams API will help us achieve idempotent Kafka consumers. Kafka Streams is a library for performing stream transformations on data from Kafka. The exactly-once semantics feature was added to Kafka Streams in the 0.11.0 Kafka release. Enabling exactly once is a simple configuration change: set the streaming configuration parameter processing.guarantee to exactly_once (the default is at_least_once). A general workload for a Kafka Streams application is to read data from one or more partitions, perform some data transformations, update a state, and then write some results to an output topic. When exactly-once semantics are enabled, Kafka Streams atomically updates consumer offsets, local state stores, state store changelog topics, and production to output topics, all together. If any one of these steps fails, all of the changes are rolled back.
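
As a rough sketch, enabling this in a Kafka Streams application is a one-line configuration change; the application id, broker address, topic names, and serdes below are placeholders for this example:

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class ExactlyOnceStreamsApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "exactly-once-demo");   // placeholder app id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder broker
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // The one configuration change that enables exactly-once processing:
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

        StreamsBuilder builder = new StreamsBuilder();
        // Trivial topology: copy records from an input topic to an output topic.
        builder.stream("input-topic").to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}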

There is another way to make the consumer idempotent if you are not using the Kafka Streams API.

Suppose you are pulling data from Kafka, doing some processing, and then persisting the record into a database. You can guarantee data consistency by using a unique key, i.e. an "id" (a sketch of this approach follows the examples below).

There are two strategies to generate a unique id:

  1. Kafka-generic id: You can take the help of Kafka to generate a unique id by concatenating the record coordinates, as below.

String id = record.topic() + "-" + record.partition() + "-" + record.offset();

2. Application-specific unique id logic:

This depends on your application and could change based on the domain and functionality you are dealing with.

e.g.

  1. If you are feeding Twitter stream data, use the tweet-specific id from the Twitter feed.
  2. Use a unique transaction id for a financial domain application.
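
Putting the first strategy to work, a minimal sketch of the deduplication check might look like the class below. Here processedStore stands in for whatever durable idempotent store your application uses (for example, a database table with a unique constraint on the id); it is a hypothetical stand-in, not a Kafka API:

import java.util.HashSet;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerRecord;

public class DeduplicatingHandler {

    // Stand-in for a durable idempotent store (e.g. a DB table with a unique key on "id").
    // A plain in-memory Set is used here only to keep the sketch self-contained.
    private final Set<String> processedStore = new HashSet<>();

    public void handle(ConsumerRecord<String, String> record) {
        // Strategy 1: Kafka-generic id built from topic, partition, and offset.
        String id = record.topic() + "-" + record.partition() + "-" + record.offset();

        if (processedStore.contains(id)) {
            return;                       // already processed: skip the duplicate
        }
        persist(id, record.value());      // process and persist, keyed by the unique id
        processedStore.add(id);
    }

    private void persist(String id, String value) {
        // Placeholder for the real write, e.g. an upsert into your database.
        System.out.printf("persisting id=%s value=%s%n", id, value);
    }
}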

Kafka Idempotent Producer!!

"Kafka idempotent producer" is just a term, but what exactly is meant by an idempotent producer?

Let us first try to understand what is meant by idempotent.

“Denoting an element of a set which is unchanged in value when multiplied or otherwise operated on by itself”. — Google dictionary 

Idempotence is the property of certain operations in mathematics and computer science whereby they can be applied multiple times without changing the result beyond the initial application.

As we already know, among the HTTP verbs, GET is an idempotent operation.

As you can imagine from the picture above, a producer publishes data to the broker, and sometimes, due to a network error, you will see duplicate messages in Kafka. When the producer sends a message to a Kafka topic, a network error can introduce duplicate messages.

kafka producer

Good request flow: The producer publishes the message to Kafka; Kafka says "I got the message and committed it" and sends the ack; the producer receives the ack.

Failure case: The producer publishes the message to Kafka; Kafka says "I got the message and committed it" and sends the ack, but due to a network error the producer never receives the ack, and here is the problem. The producer then retries because it hasn't received the ack from Kafka, publishes the same message again, and this creates duplicate data.

  1. If the producer resends the message, it creates duplicate data.
  2. If the producer doesn't resend the message, the message is lost.

How to solve it?

Kafka provides "at least once" delivery semantics. This means that a message that is sent may be delivered one or more times. In Kafka ≥ 0.11, released in 2017, you can configure an "idempotent producer", which won't introduce duplicate data. To stop a message from being processed multiple times, it must be persisted to the Kafka topic only once. During initialization, a unique ID gets assigned to the producer, which is called the producer ID or PID.

In this flow, even after a network failure, Kafka does not write duplicate data, even though the producer publishes the message multiple times until it receives the ack, because of the producer ID (PID).

using PID

To achieve this, as a programmer, you hardly have to do anything:

# using the confluent-kafka Python client
from confluent_kafka import Producer

producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'message.send.max.retries': 10000000,
    'enable.idempotence': True
})

Set enable.idempotence to true, and the Kafka producer will take care of everything for you.

message.send.max.retries = Integer.MAX_VALUE  # which is a really huge number

Just consider how many times you want to retry. The early idempotent producer forced max.in.flight.requests.per.connection to 1, but in the latest releases it can be used with max.in.flight.requests.per.connection set to up to 5 and still keep its guarantees.
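
If you are using the Java producer instead, the equivalent configuration is sketched below; the broker address and topic are placeholders, and enable.idempotence implies acks=all with a bounded number of in-flight requests:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class IdempotentProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // The switch that makes the producer idempotent; it requires acks=all,
        // retries > 0, and max.in.flight.requests.per.connection <= 5.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-connect-test", "key", "hello, idempotent world"));
            producer.flush();
        }
    }
}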

Idempotent delivery ensures that messages are delivered exactly once to a particular topic partition during the lifetime of a single producer.

Reference document:- 

Apache Kafka documentation: https://kafka.apache.org/documentation/

Installation of Apache Kafka on Mac (OS X).

Installation and environment setup for Kafka on macOS.

To install Apache Kafka on a Mac or any other system, the only prerequisite is Java (Java 8). First, we will look into the installation steps for Java, and then we will proceed to set up Apache Kafka.

Install Java:-

Open the Java SE Downloads page (on www.oracle.com) in a browser.

  1. Click on JDK, check "Accept License Agreement", and download the .dmg file for installation on Mac.
  2. Install the JDK on your system.
  3. Set up the Java path in your bash_profile (a sample is shown below).
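
The exact path depends on the JDK you installed; on macOS, something along these lines in ~/.bash_profile usually does the job:

export JAVA_HOME=$(/usr/libexec/java_home -v 1.8)
export PATH=$JAVA_HOME/bin:$PATH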

You may verify the Java installation by running the following command in a terminal.

java -version

This will show you the installed Java version (the required version is Java 8).

Install Apache Kafka using binary:-

  1. Download the latest Apache Kafka from https://kafka.apache.org/downloads under Binary downloads.

2. Click on any of the binary downloads, or choose a specific Scala version if you have a Scala dependency in your development.

3. Go with the recommended mirror site and download.

4. Extract the downloaded file, navigate to the root of the Apache Kafka folder, and open a terminal.

Now Kafka is ready to start.

Install Apache Kafka using Brew:-

We will use Homebrew Cask; I am assuming you don't have Cask set up yet. Let us execute the commands below.

https://gist.github.com/maheshwarLigade/f626eb8b280017e92c9f6d449da6e400

$ brew tap caskroom/cask

# install JDK 8; if you already have it, please skip this step

$ brew cask install java8

# now install Kafka along with zookeeper service

$ brew install kafka

There are other ways, but these are the two ways most people follow.

Up & Running:-

Let us start ZooKeeper and Kafka locally. To start Kafka, you must first start ZooKeeper. If you look inside the extracted Kafka directory, the "bin" folder contains all the binaries and the "config" folder contains all the configuration files needed to start ZooKeeper and Kafka. To start with, we need server.properties and zookeeper.properties.

By default, both Kafka and ZooKeeper write all their data under /tmp (dataDir for ZooKeeper and log.dirs for Kafka).

You can change these default paths to something different if you want, in zookeeper.properties and server.properties, as in the example below.
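
For example, to move the data directories somewhere outside /tmp, edit the corresponding properties (the paths below are just placeholders):

# config/zookeeper.properties
dataDir=/usr/local/var/lib/zookeeper

# config/server.properties
log.dirs=/usr/local/var/lib/kafka-logs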

Start ZooKeeper:

$ zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

Once ZooKeeper is up and running, go ahead and start the Kafka server:

$ kafka-server-start /usr/local/etc/kafka/server.properties

The configuration file paths above may change based on your installation.