Micronaut Kafka Consumer and Producer Example.

Micronaut is a Java framework that has become popular for developing microservice-based applications because of its low memory footprint and fast startup time.

In this article, we will see how to write simple Kafka consumers and producers using the Micronaut framework.

You can read my articles on the Micronaut framework at https://www.techwasti.com/

Start by generating a project using https://micronaut.io/launch/

You can create a project either using the launch site or using the CLI tool.

$ mn create-app techwastikafkaexample --features kafka

The Micronaut version for this demo is 2.0.0, with Java 8.

For generating the project and adding the Kafka profile, the CLI provides a powerful option such as:

$ mn create-app techwasti-kafka-service --profile kafka

Prerequisites:-

  1. Java programming
  2. Kafka
  3. Micronaut

I am assuming you already know about these; if you don't, please learn the basics first.

Micronaut features dedicated support for defining both Kafka Producer and Consumer instances.
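Before writing the producer and consumer, make sure the application points at your Kafka broker. A minimal application.yml sketch, assuming a local broker on the default port (the launch site or CLI generates a similar entry when the kafka feature is selected):

kafka:
  bootstrap:
    servers: localhost:9092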

Kafka Producer:-

We will create one simple producer using annotations.

package com.techwasti.kafkaex;

import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.Topic;

@KafkaClient
public interface GreetMessageClient {

    // greet is a kafka topic
    @Topic("greet")
    void sendGreetMessage(@KafkaKey String day, String message);

    void sendGreetMessage(@Topic String topic, @KafkaKey String day, String message);
}

@KafkaClient annotation is used to mark this interface as a Kafka client.

@Topic annotation indicates the topic to which the message should be published.

@KafkaKey annotation marks the argument used as the message key.

In the above code snippet we have defined two methods:

  1. The first method accepts two arguments, the key and the value, with the topic name given via the @Topic annotation.
  2. The second method, instead of annotating the topic name, accepts the topic name as a method argument.

If you omit @KafkaKey, the message key will be null.

As we are aware of the beauty of the Micronaut framework, it will generate an implementation of the above client interface for us. We can retrieve this instance either by looking up the bean from the ApplicationContext or by injecting it using @Inject.

GreetMessageClient client = applicationContext.getBean(GreetMessageClient.class);
client.sendGreetMessage("Thursday", "Good morning");
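If you prefer injection over a manual lookup, the same generated client can be injected into any bean. A minimal sketch, assuming a hypothetical GreetMessageService class:

package com.techwasti.kafkaex;

import javax.inject.Inject;
import javax.inject.Singleton;

@Singleton
public class GreetMessageService {

    // Micronaut injects the generated implementation of the @KafkaClient interface
    @Inject
    GreetMessageClient client;

    public void sendMorningGreeting() {
        client.sendGreetMessage("Thursday", "Good morning");
    }
}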

Now our producer is ready and we have successfully sent a message as well.

Let us create Kafka Consumer.

Kafka Consumer:-

We have seen a couple of annotations to create a producer and publish a message over a topic. In the same way, we have the @KafkaListener annotation to create a Kafka consumer.

package com.techwasti.kafkaex;

import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.OffsetReset;
import io.micronaut.configuration.kafka.annotation.Topic;

@KafkaListener(offsetReset = OffsetReset.EARLIEST)
public class GreetMessageConsumer {

    @Topic("greet")
    public void receive(@KafkaKey String day, String message) {
        System.out.println("Got Message for the  - " + day + " and Message is  " + message);
    }
}

@KafkaListener is used to indicate that this class is a Kafka consumer; it reads messages from the topic "greet", and because offsetReset is EARLIEST it will start reading messages from the beginning of the topic.

The receive method has two arguments: one is the key and the other is the message.

This is a simple example of Kafka consumers and producers.

Advanced Options for producer and consumer:

@Header: To add a header to a Kafka message.

Sometimes we want to add a header when producing a message; let us say we want to attach an authentication token while publishing a message over Kafka. In this case we can declare the header on the client, e.g.

@Header(name = "JWT-Token", value = "${my.authentication.token}")

Also, you can pass the header as a method argument, the same way as a topic name, as shown in the sketch below.
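A minimal sketch showing both styles, assuming a hypothetical GreetMessageWithHeaderClient interface and a my.authentication.token configuration property:

package com.techwasti.kafkaex;

import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.messaging.annotation.Header;

@KafkaClient
@Header(name = "JWT-Token", value = "${my.authentication.token}")
public interface GreetMessageWithHeaderClient {

    // the header declared on the interface is added to every message sent by this client
    @Topic("greet")
    void sendGreetMessage(@KafkaKey String day, String message);

    // a header can also be supplied per message as an annotated method argument
    @Topic("greet")
    void sendGreetMessageWithRequestId(@KafkaKey String day, String message, @Header("Request-Id") String requestId);
}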

@Body: To explicitly indicate the message body.

Generally, the value sent by the producer is resolved using the @Body annotation, but if it is not present then the first non-annotated argument is resolved as the message body.

e.g 

@Topic("greet")
void sendGreetMessage(@KafkaKey String day, String message);

or

@Topic("greet")
void sendGreetMessage(@KafkaKey String day, @Body String message);

For more such options, please visit the Micronaut documentation.

Reference:

https://micronaut-projects.github.io/micronaut-kafka/latest/guide/

Kafka Connect QuickStart.

Understand Kafka Connect.

Apache Kafka is a distributed streaming platform. If you are not familiar with Apache Kafka, you can refer to the articles below:

Kafka Installation guide

Kafka Idempotent Consumer

Kafka Idempotent Producer

Kafka Connect is an open-source component of Apache Kafka and another project in the Apache Kafka family. As we know, some common use cases of Apache Kafka are consuming data from an RDBMS and sinking it into Hadoop. This framework is developed based on convention over configuration.

Kafka Connect is a framework for connecting Kafka with external systems such as databases, key-value stores, search indexes, and file systems.

https://kafka.apache.org/documentation/

Advantages of Using Kafka Connect: 

Below are some benefits of Kafka Connect.

  1. Data-Centric Pipeline
  2. Flexibility and Scalability
  3. Reusability and Extensibility
  4. Distributed and standalone modes
  5. Streaming/batch integration
  6. REST interface for better operations
  7. Automatic offset management

Kafka Connect is a tool suite for scalably and reliably streaming data between Kafka and other external systems such as databases, key-value stores, search indexes, and file systems, using so-called Connectors.

Kafka Connect generally contains three main components: a source connector, a sink connector, and a Kafka topic.

A source connector collects data from a source system. Source systems can be entire databases, tables, or any message brokers. A source connector could also collect metrics from application servers into Kafka topics, making the data available for stream processing with low latency.

A sink connector delivers data from Kafka topics into other systems, such as Elasticsearch (for indexing), Hadoop, or any other database.

Prerequisites:

  • Java 8, Maven, and basic knowledge of Kafka, with Kafka installed and running on your system.
  • Kafka version used for this guide: kafka_2.10–0.10.2.1.
  • Download Kafka from the Apache Kafka downloads page.

Kafka Connect Example:

In this example, we will take a very simple setup to demonstrate the power of Kafka Connect: the basic file source connector and file sink connector.

  1. Start Zookeeper 
  2. Start Kafka 

Create a Kafka topic for our testing:

$KAFKA_HOME/bin/kafka-topics.sh \
  --create \
  --zookeeper localhost:2181 \
  --replication-factor 1 \
  --partitions 1 \
  --topic my-connect-test
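You can verify that the topic was created by listing the topics (optional):

$KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper localhost:2181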

Kafka Connect currently supports two modes of execution: standalone and distributed.

In standalone mode, all work is performed in a single process. Standalone mode is intended for development.

$ bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]
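For reference, distributed mode is started with the connect-distributed.sh script; it takes only the worker configuration, and connectors are then created and managed through the REST interface instead of being listed on the command line:

$ bin/connect-distributed.sh config/connect-distributed.properties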

Source Connector Configuration

Create the source connector configuration file; in this guide we keep it under the Kafka config directory.

For the source connector, the reference configuration is below:

name=local-file-source-connector
connector.class=FileStreamSource
tasks.max=1
topic=my-connect-test
file=test-connector.txt
  • name – Unique name for the connector. Attempting to register again with the same name will fail.
  • connector.class – The Java class for the connector.
  • tasks.max – The maximum number of tasks that should be created for this connector. The connector may create fewer tasks if it cannot achieve this level of parallelism.
  • key.converter – (optional) Override the default key converter set by the worker.
  • value.converter – (optional) Override the default value converter set by the worker.
  • topics – (sink connectors only) A comma-separated list of topics to use as input for the connector.
  • topics.regex – (sink connectors only) A Java regular expression of topics to use as input for the connector.

Sink Connector Configuration

For the sink connector, the reference configuration is below:

name=local-file-sink-connector
connector.class=FileStreamSink
tasks.max=1
file=test-sink-connector.txt
topics=my-connect-test

Now that the source and sink are ready, let us write the worker configuration. Most of the properties are the same for both, because the source and sink are both file connectors.

Worker Configuration

For the worker, the reference configuration is below:

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=20000
plugin.path=/share/java
  • bootstrap.servers – List of Kafka servers used to bootstrap connections to Kafka.
  • offset.flush.interval.ms – Maximum number of milliseconds to wait for records to flush and partition offset data to be committed to offset storage before cancelling the process and restoring the offset data to be committed in a future attempt.
  • plugin.path – List of paths separated by commas (,) that contain plugins (connectors, converters, transformations).
  • key.converter – Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the keys in messages written to or read from Kafka, and since this is independent of connectors it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro.
  • value.converter – Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the values in messages written to or read from Kafka, and since this is independent of connectors it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro.

The config property below is specific to standalone mode:

  • offset.storage.file.filename – File to store offset data in.

For more configuration options, check the official documentation.

Now everything is ready:

  1. kafka/config/connect-standalone.properties 
  2. kafka/config/local-file-source-connector.properties
  3. kafka/config/local-file-sink-connector.properties

The content of test-connector.txt:

{
  color: "red",
  value: "#f00"
}
{
  color: "green",
  value: "#0f0"
}
{
  color: "blue",
  value: "#00f"
}
{
  color: "cyan",
  value: "#0ff"
}
{
  color: "magenta",
  value: "#f0f"
}
{
  color: "yellow",
  value: "#ff0"
}
{
  color: "black",
  value: "#000"
}

You can add more content to the same file later and check that the Kafka connector is able to read it and sink the data into test-sink-connector.txt.

The source connector will automatically detect the changes and publish content over kafka.

Make sure to insert a newline at the end; otherwise, the source connector won't pick up the last line.

Now let us run Kafka Connect and check it:

$KAFKA_HOME/bin/connect-standalone.sh config/connect-standalone.properties config/local-file-source-connector.properties config/local-file-sink-connector.properties
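To confirm that the source connector is publishing to the topic, you can also tail the topic with the console consumer (optional):

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-connect-test --from-beginning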

This is a simple implementation of Kafka Connect.

Discover more Kafka connectors here:- https://www.confluent.io/hub/

Reference documents:-

https://docs.confluent.io/current/connect/quickstart.html

https://kafka.apache.org/documentation/#connect

Installation of Apache Kafka on Mac (OS X).

Installation and environment setup for Kafka on macOS.

To install Apache Kafka on Mac or on any system, the only prerequisite is Java (Java 8). First, we will look at the installation steps for Java, and then we will proceed to set up Apache Kafka.

Install Java:-

Open the Java SE Downloads page on www.oracle.com in a browser.

  1. Click on JDK, check the "Accept License Agreement" and download the .dmg file for installation on Mac.
  2. Install the JDK on your system.
  3. Set up the Java path in your .bash_profile.

You may verify the Java installation by running the following command in a terminal.

java -version

This shows the installed Java version (the required version is Java 8).

Install Apache Kafka using binary:-

  1. Download the latest Apache Kafka from https://kafka.apache.org/downloads under Binary downloads.
  2. Click on any of the binary downloads, or choose a specific Scala version if you have a dependency on Scala in your development.
  3. Go with the recommended mirror site and download.
  4. Extract the downloaded file. Navigate to the root of the Apache Kafka folder and open a Terminal.

Now Kafka is ready to start.

Install Apache Kafka using Brew:-

We will use Homebrew Cask; I am assuming you don't have Cask set up yet. Let us execute the commands below.

https://gist.github.com/maheshwarLigade/f626eb8b280017e92c9f6d449da6e400

$ brew tap caskroom/cask

# install JDK 8 (if you already have it, skip this step)

$ brew cask install java8

# now install Kafka along with the ZooKeeper service

$ brew install kafka

There are other ways, but these are the two that most people follow.

Up & Running:-

Let us start ZooKeeper and Kafka locally. To start Kafka, you must start ZooKeeper first. If you look at the extracted Kafka content, the "bin" folder contains all the binaries and the "config" folder contains all the configuration files needed to start ZooKeeper and Kafka. To start with, we need server.properties and zookeeper.properties.

By default, both Kafka and ZooKeeper write their data under /tmp.

You can change these default paths to something different if you want in zookeeper.properties (dataDir) and server.properties (log.dirs).

Start ZooKeeper:

$ zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

Once ZooKeeper is up and running, go ahead and start the Kafka server:

$ kafka-server-start /usr/local/etc/kafka/server.properties

The configuration file paths above may change based on your installation.
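As a quick smoke test, you can create a topic and send a message through it. This is a sketch assuming the Homebrew install, which puts the Kafka scripts on your PATH, and a hypothetical topic named test:

# create a test topic
$ kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

# type a few messages in the producer console
$ kafka-console-producer --broker-list localhost:9092 --topic test

# read them back from the beginning in another terminal
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning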