Micronaut Kafka Consumer Producer example.

Micronaut is a Java framework that has become popular for building microservice-based applications because of its low memory footprint and fast startup time.

In this article, we will see how to write a simple Kafka consumer and producer using the Micronaut framework.

You can read my other articles on the Micronaut framework at https://www.techwasti.com/

Start by generating a project at https://micronaut.io/launch/

You can create the project either on the launch site or with the CLI tool:

$ mn create-app techwastikafkaexample --features kafka

This demo uses Micronaut 2.0.0 and Java 8.

On Micronaut 1.x, the CLI selected Kafka support through a profile rather than a feature, so the equivalent command there is:

$ mn create-app techwasti-kafka-service --profile kafka

Prerequisites:

  1. Java programming
  2. Kafka
  3. Micronaut

I am assuming you are already familiar with these; if not, it is worth learning the basics first.

Micronaut features dedicated support for defining both Kafka Producer and Consumer instances.

Kafka Producer:-

We will create a simple producer using annotations.

package com.techwasti.kafkaex;

import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.Topic;

@KafkaClient
public interface GreetMessageClient {

    // greet is a kafka topic
    @Topic("greet")
    void sendGreetMessage(@KafkaKey String day, String message);

    void sendGreetMessage(@Topic String topic, @KafkaKey String day, String message);
}

The @KafkaClient annotation marks this interface as a Kafka client.

The @Topic annotation indicates the topic to which the message is published.

The @KafkaKey annotation marks the parameter that is used as the message key.

In the above code snippet we have defined two methods:

  1. The first method accepts two arguments, the key and the value; the topic name is fixed by the @Topic annotation on the method.
  2. The second method accepts the topic name as an argument instead of fixing it in the annotation.

If you omit @KafkaKey, the record is sent with a null key.

One of the strengths of the Micronaut framework is that it generates an implementation of the client interface above at compile time. We can obtain an instance either by looking up the bean in the ApplicationContext or by injecting it with @Inject.

GreetMessageClient client = applicationContext.getBean(GreetMessageClient.class);
client.sendGreetMessage("Thursday", "Good morning");
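
Micronaut generates the real client implementation at compile time, so no reflection is involved. Purely to illustrate what such an implementation has to work out from the annotations, here is a rough sketch that swaps in a JDK dynamic proxy. The Topic and KafkaKey annotations below are local stand-ins declared inside the sketch (not Micronaut's), the ClientProxySketch class and its SENT list are hypothetical, and nothing is sent to a broker.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Parameter;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class ClientProxySketch {

    // Local stand-ins for the annotations used in the article (NOT Micronaut's own).
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.METHOD, ElementType.PARAMETER})
    @interface Topic { String value() default ""; }

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.PARAMETER)
    @interface KafkaKey { }

    // Same shape as the article's client interface.
    interface GreetMessageClient {
        @Topic("greet")
        void sendGreetMessage(@KafkaKey String day, String message);

        void sendGreetMessage(@Topic String topic, @KafkaKey String day, String message);
    }

    // Records "topic|key|value" strings instead of talking to a broker.
    static final List<String> SENT = new ArrayList<>();

    static GreetMessageClient newClient() {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getDeclaringClass() == Object.class) {
                // toString/equals/hashCode are out of scope for this sketch.
                throw new UnsupportedOperationException(method.getName());
            }
            // Topic from the method-level annotation, if there is one.
            Topic onMethod = method.getAnnotation(Topic.class);
            String topic = onMethod != null ? onMethod.value() : null;
            String key = null;   // stays null when no parameter is marked as the key
            String value = null;
            Parameter[] params = method.getParameters();
            for (int i = 0; i < params.length; i++) {
                if (params[i].isAnnotationPresent(Topic.class)) {
                    topic = (String) args[i];      // topic passed as an argument
                } else if (params[i].isAnnotationPresent(KafkaKey.class)) {
                    key = (String) args[i];
                } else if (value == null) {
                    value = (String) args[i];      // first unannotated argument is the value
                }
            }
            SENT.add(topic + "|" + key + "|" + value);
            return null;
        };
        return (GreetMessageClient) Proxy.newProxyInstance(
                GreetMessageClient.class.getClassLoader(),
                new Class<?>[] {GreetMessageClient.class},
                handler);
    }
}
```

Calling newClient().sendGreetMessage("Thursday", "Good morning") on this sketch records the topic "greet", the key "Thursday" and the value "Good morning", which mirrors how the two methods of the interface differ only in where the topic name comes from.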

Now our producer is ready and we have sent our first message.

Next, let us create a Kafka consumer.

Kafka Consumer:-

Just as we used a couple of annotations to create a producer and publish a message to a topic, we use the @KafkaListener annotation to create a Kafka consumer.

package com.techwasti.kafkaex;

import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.OffsetReset;
import io.micronaut.configuration.kafka.annotation.Topic;

@KafkaListener(offsetReset = OffsetReset.EARLIEST)
public class GreetMessageConsumer {

    @Topic("greet")
    public void receive(@KafkaKey String day, String message) {
        System.out.println("Got Message for the  - " + day + " and Message is  " + message);
    }
}

@KafkaListener marks the class as a Kafka consumer. The @Topic("greet") annotation subscribes the receive method to the "greet" topic, and offsetReset = OffsetReset.EARLIEST makes the consumer start reading from the beginning of the topic when its group has no committed offset yet.

The receive method takes two arguments: the key and the message.
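
To make the effect of the offsetReset setting more concrete, here is a simplified model of how a consumer picks its starting offset. It is only a sketch: the OffsetResetSketch class, its startingOffset method and the local OffsetReset enum are hypothetical stand-ins, and the model ignores the case where a committed offset is out of range.

```java
import java.util.OptionalLong;

class OffsetResetSketch {

    // Local stand-in mirroring the two policies discussed in the article.
    enum OffsetReset { EARLIEST, LATEST }

    /**
     * Picks the offset a consumer would start from.
     *
     * @param committed offset already committed for the consumer group, if any
     * @param logStart  first offset still available in the partition
     * @param logEnd    offset the next produced record would get
     * @param reset     policy applied only when nothing has been committed
     */
    static long startingOffset(OptionalLong committed, long logStart, long logEnd, OffsetReset reset) {
        if (committed.isPresent()) {
            // A committed offset always wins; the reset policy is not consulted.
            return committed.getAsLong();
        }
        return reset == OffsetReset.EARLIEST ? logStart : logEnd;
    }
}
```

In this model a brand-new consumer group with EARLIEST starts at logStart and therefore sees every record still in the topic, while the same group with LATEST starts at logEnd and only sees records produced afterwards.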

This is a simple example of a Kafka consumer and producer.

Advanced Options for producer and consumer:

@Header: adds a header to a Kafka message.

Sometimes we want to attach a header when producing a message, for example an authentication token that should accompany every message published to Kafka.

e.g.

@Header(name = "JWT-Token", value = "${my.authentication.token}")

You can also pass the header as a method argument, just like the topic name.
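
The ${my.authentication.token} value in the header example above is a property placeholder that Micronaut resolves from the application configuration. The following is a minimal sketch of that idea only, not Micronaut's resolver: the PlaceholderSketch class and its resolve method are hypothetical, and the configuration is modelled as a plain Map.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class PlaceholderSketch {

    // Matches ${key} and ${key:default}.
    private static final Pattern PLACEHOLDER =
            Pattern.compile("\\$\\{([^}:]+)(?::([^}]*))?\\}");

    /** Replaces every placeholder in the template with a value from props. */
    static String resolve(String template, Map<String, String> props) {
        Matcher m = PLACEHOLDER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String key = m.group(1);
            String fallback = m.group(2); // null when no default was given
            String value = props.getOrDefault(key, fallback);
            if (value == null) {
                throw new IllegalArgumentException("No value for property: " + key);
            }
            // quoteReplacement keeps '$' and '\' in the value literal.
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```

With a map that contains my.authentication.token=abc, resolving "${my.authentication.token}" yields "abc"; a missing property without a default is treated as an error in this sketch.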

@Body: explicitly marks the message body.

The value sent by the producer is taken from the argument annotated with @Body. If no argument carries that annotation, the first argument without a binding annotation is used as the message body.

e.g.

@Topic("greet")
void sendGreetMessage(@KafkaKey String day, String message);

or

@Topic("greet")
void sendGreetMessage(@KafkaKey String day, @Body String message);

For more options, see the Micronaut Kafka documentation.

Reference:

https://micronaut-projects.github.io/micronaut-kafka/latest/guide/