TensorFlow 2.0 HelloWorld using Google Colab.

In this article, we use the popular deep learning framework TensorFlow and walk through a basic hello world example. To follow along, you do not need to set up a local environment on your machine.


We are using Google Colab. If you are not aware of what it is, check out my getting-started article on Colab:
Train deep neural network free using Google Colaboratory (medium.com)

Now visit https://colab.research.google.com/ and you will see the Colab welcome screen.

Brief About Colab:

Once you have opened Colab and you are already logged in to your Google account, you are ready to go.

Google Colab is available with zero configuration and free access to GPUs, and the best part is that notebooks are shareable. Google Colaboratory is a free service for developers to try TensorFlow on CPU and GPU over a cloud instance from Google. The service is completely free; developers can log in with their Gmail account and connect to it, and it is also a great way to improve Python programming skills. Here developers can try deep learning applications using popular machine learning libraries such as Keras, TensorFlow, PyTorch, OpenCV, and others.

Sign in to Google Colab and create a new notebook for our HelloWorld example.

Go to File → New Notebook (Google sign-in is required).

Now the new notebook is ready. We want to use TF 2.0.0 for our example, so let us first install it; TensorFlow 2.0.0 has already been released as a production version. To install TensorFlow 2.0.0, run the following command.

!pip install tensorflow==2.0.0

After a successful installation, we can verify the installed version.

import tensorflow as tf
print(tf.__version__)

HelloWorld example:

Now everything is ready and looking promising. We have installed TensorFlow and verified the version too. Now let us take a helicopter overview and create a hello world example.

To change the runtime: click Runtime → Change runtime type → a popup will open where you can choose the runtime and a hardware accelerator such as a GPU or TPU.

There are a lot of changes between TF 1.x and TF 2.0.0. TF 2.0.0 comes with ease of development and needs less coding; TensorFlow 2.0.0 was developed to remove the issues and complexity of previous versions.

In TF 2.0, eager execution is enabled by default.

Eager execution evaluates operations immediately, without building a graph. Operations return concrete values instead of constructing a computational graph to execute later.

We will first use the Hello World code from TensorFlow 1.x and observe the output.

#This code snippet is from tensorflow 1.X version
import tensorflow as tf

msg = tf.constant('Hello and welcome to Tensorflow world')

#session
sess = tf.Session()

#print the message
print(sess.run(msg))

In this example, we are using TensorFlow 1.x code to print the message, but Session has been removed in TF 2.0.0, so this will raise the following exception:

AttributeError: module 'tensorflow' has no attribute 'Session'

We will use the same code snippet as above, with the Session removed:

import tensorflow as tf

msg = tf.constant('Hello and welcome to Tensorflow world')

#print the message
print(msg)

#print using tf.print()
tf.print(msg)

Here we have two print statements; observe the output of each:

  1. tf.Tensor(b'Hello and welcome to Tensorflow world', shape=(), dtype=string)
  2. Hello and welcome to Tensorflow world

This is it for now; we will start exploring different TF APIs in the next article.

Code: 

The code is available on GitHub; you can import it directly into Colab and run it.

https://github.com/maheshwarLigade/GoogleColab/blob/master/HelloWorldTF2_0.ipynb

More articles on TensorFlow:

https://medium.com/analytics-vidhya/optimization-techniques-tflite-5f6d9ae676d5

https://medium.com/analytics-vidhya/tensorflow-lite-converter-dl-example-febe804b8673

https://medium.com/techwasti/tensorflow-lite-machine-learning-at-the-edge-26e8421ae661

https://medium.com/techwasti/dynamic-computation-graphs-dcg-with-tensorflow-fold-33638b2d5754

https://medium.com/techwasti/tensorflow-lite-deployment-523eec79c017

Monitor Spring Boot apps using Spring Boot Admin.

Administration of Spring Boot applications using Spring Boot Admin.

This includes health status, various metrics, log level management, JMX-bean interaction, thread dumps and traces, and much more. Spring Boot Admin is a community project initiated and maintained by codecentric.

Spring Boot Admin provides a UI to monitor and perform administrative tasks on your Spring Boot applications.

The project was started by codecentric and is open source, so you can customize it if you want to.

Git Repo:

https://github.com/codecentric/spring-boot-admin

The link above will give you a better idea of what this project is about, so we will start directly with an example.

Spring Boot provides actuator endpoints to monitor the metrics of individual microservices. These endpoints are very helpful for getting information about applications, such as whether they are up and whether their components, like the database, are healthy. A major drawback of using actuator endpoints, though, is that we have to hit each application's endpoints individually to know its status or health. Imagine a microservices landscape involving 150 applications; the admin would have to hit the actuator endpoints of all 150 applications. To help us deal with this situation, we use the Spring Boot Admin app.

Sample Code:

To implement this, we will create two projects: one is the server and the other is the client.

  1. Spring Boot Admin server.
  2. Spring Boot Admin client.

Spring Boot Admin Server:

The project structure should look like any Spring Boot application:

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.techwasti</groupId>
	<artifactId>spring-boot-admin</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>spring-boot-admin</name>
	<description>Demo project for Spring Boot</description>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>1.5.4.RELEASE</version>
		<relativePath /> <!-- lookup parent from repository -->
	</parent>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
<!-- admin dependency-->
		<dependency>
			<groupId>de.codecentric</groupId>
			<artifactId>spring-boot-admin-server-ui-login</artifactId>
			<version>1.5.1</version>
		</dependency>
		<dependency>
			<groupId>de.codecentric</groupId>
			<artifactId>spring-boot-admin-server</artifactId>
			<version>1.5.1</version>
		</dependency>
		<dependency>
			<groupId>de.codecentric</groupId>
			<artifactId>spring-boot-admin-server-ui</artifactId>
			<version>1.5.1</version>
		</dependency>
<!-- end admin dependency-->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-security</artifactId>
		</dependency>

	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>


</project>

We need to configure security as well since we are accessing sensitive information:

package com.techwasti;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.config.annotation.web.configuration.WebSecurityConfigurerAdapter;

import de.codecentric.boot.admin.config.EnableAdminServer;

@EnableAdminServer
@Configuration
@SpringBootApplication
public class SpringBootAdminApplication {

	public static void main(String[] args) {
		SpringApplication.run(SpringBootAdminApplication.class, args);
	}

	@Configuration
	public static class SecurityConfig extends WebSecurityConfigurerAdapter {
		@Override
		protected void configure(HttpSecurity http) throws Exception {
			http.formLogin().loginPage("/login.html").loginProcessingUrl("/login").permitAll();
			http.logout().logoutUrl("/logout");
			http.csrf().disable();

			http.authorizeRequests().antMatchers("/login.html", "/**/*.css", "/img/**", "/third-party/**").permitAll();
			http.authorizeRequests().antMatchers("/**").authenticated();

			http.httpBasic();
		}
	}

}

application.properties file content:

spring.application.name=SpringBootAdminEx
server.port=8081
security.user.name=admin
security.user.password=admin

Run the app and open localhost:8081 in the browser; you will see the login page.


Enter the username and password and click the login button.

As this is a sample example, we hardcoded the username and password, but you can use Spring Security to integrate LDAP or any other security mechanism.

Spring Boot Admin can be configured to display only the information that we consider useful.

spring.boot.admin.routes.endpoints=env, metrics, trace, info, configprops

Notifications and Alerts:

We can send notifications and alerts using any of the channels below:

  • Email
  • PagerDuty
  • OpsGenie
  • Hipchat
  • Slack
  • Let’s Chat

Spring Boot Admin Client:

Now that we are ready with the admin server application, let us create the client application. Create any HelloWorld Spring Boot application, or if you have an existing Spring Boot app, you can use it as the client application.
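The client itself needs no special code; any plain Spring Boot main class will do. A minimal sketch is shown below (the package and class names are illustrative, not from the original project); what actually registers the app with the admin server are the dependency and properties that follow.

package com.techwasti.client; // illustrative package name

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

// A plain Spring Boot application; the admin client starter and the
// properties below take care of registering it with the admin server.
@SpringBootApplication
public class SpringBootAdminClientApplication {

	public static void main(String[] args) {
		SpringApplication.run(SpringBootAdminClientApplication.class, args);
	}
}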

Add the below Maven dependencies:

<dependency>
	<groupId>de.codecentric</groupId>
	<artifactId>spring-boot-admin-starter-client</artifactId>
	<version>1.5.1</version>
</dependency>
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

Next, update application.properties and add the following properties:

spring.boot.admin.url=http://localhost:8081
spring.boot.admin.username=admin
spring.boot.admin.password=admin

That is all the client application needs; now run it. Once the client application is up and running, go and check your admin server application. It will show all your registered applications.

Beautiful Dashboards:

References:

codecentric/spring-boot-admin: This community project provides an admin interface for Spring Boot applications (github.com)

Spring Boot Admin Reference Guide (codecentric.github.io)

Spring Cloud Function HelloWorld on AWS and local!!

In this article, we'll learn how to use Spring Cloud Function to build a serverless function.

Serverless functions are a modern industry buzzword, and the reason behind that is the boom in cloud computing.

(Image credit: gitconnected)

As part of this article, we'll build and run a simple Spring Cloud Function locally and then deploy it to the AWS cloud platform.

Spring Cloud Function is a project with the following high-level goals, as per the official Spring Cloud website:

  • Promote the implementation of business logic via functions.
  • Decouple the development lifecycle of business logic from any specific runtime target so that the same code can run as a web endpoint, a stream processor, or a task.
  • Support a uniform programming model across serverless providers, as well as the ability to run standalone (locally or in a PaaS).
  • Enable Spring Boot features (auto-configuration, dependency injection, metrics) on serverless providers.

If you understand the above goals, you will be able to relate them to any serverless technology from any cloud provider. The idea behind serverless is to concentrate on business logic, not on infrastructure and everything around it.

Features:

Spring Cloud Function provides the features below.

  • Choice of programming styles — reactive, imperative or hybrid.
  • Function composition and adaptation (e.g., composing imperative functions with reactive).
  • Support for reactive function with multiple inputs and outputs allowing merging, joining and other complex streaming operations to be handled by functions.
  • Transparent type conversion of inputs and outputs.
  • Packaging functions for deployments, specific to the target platform (e.g., Project Riff, AWS Lambda and more)
  • Adapters to expose a function to the outside world as HTTP endpoints etc.
  • Deploying a JAR file containing such an application context with an isolated classloader, so that you can pack them together in a single JVM.
  • Compiling strings which are Java function bodies into bytecode, and then turning them into @Beans that can be wrapped as above.
  • Adapters for AWS Lambda, Microsoft Azure, Apache OpenWhisk and possibly other “serverless” service providers.
(Source: Pivotal)

Let us deep dive and do some coding.

We will take one simple example:

  • Convert a String to lower case, using a String method
  • And a HelloWorld greeting message using a dedicated class.

For this example, I am using Maven; you can use Gradle as a build tool as well.

<!-- add spring cloud function dependency -->
<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-starter-function-web</artifactId>
	<version>1.0.1.RELEASE</version>
</dependency>

First Spring Cloud Function:

We will expose a @Bean of type Function using Spring Cloud Function. As part of this code, we expose toLowerCase functionality as a Spring Cloud Function.

import java.util.function.Function;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class CloudFunctionApp {

	public static void main(String[] args) {
		SpringApplication.run(CloudFunctionApp.class, args);
	}

	// Exposed over HTTP under the bean name: /lowerCaseString
	@Bean
	public Function<String, String> lowerCaseString() {
		return value -> value.toLowerCase();
	}
}

Test this functionality locally using the curl command:

curl localhost:8087/lowerCaseString -H "Content-Type: text/plain" -d "Spring cloud FUNCTION"

The endpoint is the name of the bean; here it is lowerCaseString.

Spring Cloud Function in Packages:

Above, we exposed a @Bean as a method. Apart from this, we could also write our functions as classes that implement the functional interface Function<T, R>. We can specify the packages to scan for relevant beans in the application.properties file.

package com.techwasti.spring.cloudfunction.functions;

import java.util.function.Function;

public class Hello implements Function<String, String> {

	@Override
	public String apply(String s) {
		return "Hello " + s + ", and welcome to Server less world!!!";
	}
}

Also, as mentioned above, add the below line to the application.properties file:

spring.cloud.function.scan.packages=com.techwasti.spring.cloudfunction.functions

We can also test this one locally:

curl localhost:8080/hello -H "Content-Type: text/plain" -d "Maheshwar"

The endpoint is the name of the class that implements the functional interface.

Spring Cloud Function on AWS:

The best thing about the Spring ecosystem is seamless adoption and customization, and the same applies to Spring Cloud Function. What makes Spring Cloud Function so powerful is that we can build Spring-enabled functions that are cloud-agnostic. It provides an abstraction API and adapters so that we can build and test functions locally and then deploy them to any cloud provider, such as AWS, Azure or Google Cloud Platform, without changing any of the business logic.

AWS is a popular one, so for this exercise we choose AWS.

Remember the Spring Cloud Function Maven dependency we used above; we need the same one here, along with the AWS Lambda dependencies:

<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-starter-function-web</artifactId>
	<version>1.0.1.RELEASE</version>
</dependency>
<!-- to handle AWS Lambda we need the below dependencies -->
<dependency>
	<groupId>com.amazonaws</groupId>
	<artifactId>aws-lambda-java-events</artifactId>
	<version>2.0.2</version>
	<scope>provided</scope>
</dependency>
<dependency>
	<groupId>com.amazonaws</groupId>
	<artifactId>aws-lambda-java-core</artifactId>
	<version>1.1.0</version>
	<scope>provided</scope>
</dependency>

Since we are going to upload the artifact generated by the Maven build to AWS Lambda, we need to build a shaded artifact, which means it has all the dependencies burst out as individual class files instead of jars.

We also add the spring-boot-thin-layout dependency, which helps us reduce the size of the artifact by excluding dependencies that are not needed.

<build>
	<plugins>
		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-deploy-plugin</artifactId>
			<configuration>
				<skip>true</skip>
			</configuration>
		</plugin>
		<plugin>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-maven-plugin</artifactId>
			<dependencies>
				<dependency>
					<groupId>org.springframework.boot.experimental</groupId>
					<artifactId>spring-boot-thin-layout</artifactId>
					<version>1.0.10.RELEASE</version>
				</dependency>
			</dependencies>
		</plugin>
		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-shade-plugin</artifactId>
			<configuration>
				<createDependencyReducedPom>false</createDependencyReducedPom>
				<shadedArtifactAttached>true</shadedArtifactAttached>
				<shadedClassifierName>aws</shadedClassifierName>
			</configuration>
		</plugin>
	</plugins>
</build>

If we are shipping our Spring Cloud Function, such as the lower-case String converter, to AWS, we need a handler:

import org.springframework.cloud.function.adapter.aws.SpringBootRequestHandler;

// Entry point for AWS Lambda; the generic types define the input and output types.
public class MyHelloWorldHandlers extends SpringBootRequestHandler<String, String> {
}

The Spring Cloud Function AWS adapter also ships with SpringBootStreamHandler and FunctionInvokingS3EventHandler.

This is just an empty class, but it plays a very important role: it acts as the entry point and also defines the input and output types.

How Does AWS Lambda Know Which Spring Cloud Function to Invoke?

In our example, we may have more than one Spring Cloud Function, so how does AWS know which one to invoke?

Even if we have more than one Spring Cloud Function in our application, AWS can invoke only one of them.

We have to specify the cloud function name in an environment variable called FUNCTION_NAME on the AWS console.

Upload function over AWS and Test:

Now we are ready to upload the Spring Cloud Function to AWS.

On the AWS Lambda console page, in the Function code section, we can select a Java 8 runtime and simply click Upload. As mentioned, we also need to specify the handler, i.e. MyHelloWorldHandlers.

Then specify the FUNCTION_NAME environment variable as lowerCaseString.

It's time to test the Lambda Spring Cloud Function by creating a test event, supplying a sample string such as "Spring CLOUD function", saving it, and then clicking the Test button. In a similar way, we can test the "hello" Spring Cloud Function by changing the value of the FUNCTION_NAME parameter.

Spring Cloud Function is a powerful tool for decoupling the business logic from any specific runtime environment.

References:

Introducing Spring Cloud Function (spring.io)

Spring Cloud Function (spring.io)

Introduction to the Intel OpenVINO Toolkit!!!

I got the Udacity Intel AI Edge scholarship, and the introduction included an overview of Intel OpenVINO. In this article, we are going to explore the basics of OpenVINO.

The OpenVINO toolkit can boost your inference applications across multiple deep neural networks with high throughput and efficiency.

OpenVINO stands for Open Visual Inference and Neural Network Optimization.

Download link 

https://software.intel.com/en-us/openvino-toolkit/choose-download

What is OpenVINO?

OpenVINO is a toolkit provided by Intel to facilitate faster inference of deep learning computer vision models. It helps developers create cost-effective and robust computer vision applications.

Learn more about OpenVINO here.

It enables deep learning inference at the edge and supports heterogeneous execution across computer vision accelerators — CPU, GPU, Intel® Movidius™ Neural Compute Stick, and FPGA.

OpenVINO provides a number of pre-trained models here.

Download & Install OpenVINO:

There is very good official documentation of OpenVINO from Intel, which is descriptive and easy to understand. Please use the links below.

  1. To download

Choose & Download | Intel® Distribution of OpenVINO™ Toolkit: Download a version of the Intel® Distribution of OpenVINO™ toolkit for Linux, Windows, or macOS (software.intel.com)

2. Get started

Get Started | Intel® Distribution of OpenVINO™ Toolkit: Get up to speed fast using resources and training materials for this computer vision toolkit (software.intel.com)

Overview of OpenVINO:

The execution process is as follows — 

  • We have to feed a pre-trained model to the Model Optimizer. It optimizes the model and converts it into its intermediate representation (.xml and .bin file).
  • The Inference Engine helps in the proper execution of the model on a number of different devices. It manages the libraries required to run the code properly on different platforms.

The two main components of the OpenVINO toolkit are the Model Optimizer and the Inference Engine. We will dive deep into their details to get a better understanding of what happens under the hood.

Model Optimizer:

The model optimizer is a cross-platform CLI tool that facilitates the transition between the training and deployment environment. It adjusts the deep learning models for optimal execution on end-point target devices. If you want to understand the optimization technique for TensorFlow you can check out this article.

If you check the diagram carefully, the optimizer performs three steps:

  1. Converting
  2. Optimizing
  3. Preparing for inference.

OpenVINO is a toolkit, not a deep learning library for training models; it helps you optimize and serve a trained model on different devices.

There is detailed documentation of how this works under the hood; I don't want to go into detail here.

Inference Engine:

Now our model is ready for inference. The optimizer CLI has converted and optimized the model; the Model Optimizer produces the intermediate representation of a model, which is the input for the Inference Engine to run inference over the input data.

The Inference Engine is a C++ library with a set of C++ classes to infer input data (images) and get a result. The C++ library provides an API to read the Intermediate Representation, set the input and output formats, and execute the model on devices.

The best thing about OpenVINO is the heterogeneous execution of the model, and it is possible because of the Inference Engine, which uses different plug-ins for different devices.

Code sample Example:

We will take the store-aisle-monitor-python sample. This code sample has been provided by Intel.

We will look at some code snippets with a brief description.

# Initialize the class
infer_network = Network()

# Load the network to IE plugin to get shape of input layer

n, c, h, w = infer_network.load_model(args.model, args.device, 1, 1, 2, args.cpu_extension)[1]

The above code is self-explanatory: we initialize the Network class and load the model using the load_model function. The load_model function returns the plugin along with the input shape; we only need the input shape, which is why we have specified [1] after the function call.

# The exec_net function will start an asynchronous inference request.
infer_network.exec_net(next_request_id, in_frame)

We need to pass the request id and the input frame for inference.

res = infer_network.get_output(cur_request_id)
for obj in res[0][0]:
    # Compute bounding-box coordinates for detections above the probability threshold
    if obj[2] > args.prob_threshold:
        xmin = int(obj[3] * initial_w)
        ymin = int(obj[4] * initial_h)
        xmax = int(obj[5] * initial_w)
        ymax = int(obj[6] * initial_h)
        class_id = int(obj[1])

The get_output function gives us the model's result.

You can clone the git repo and start getting your hands dirty. Happy coding!

References:

intel-iot-devkit/store-aisle-monitor-python (github.com)

Some cool projects, just FYI: Intel® IoT Developer Kit (github.com)

Introduction to Intel® Distribution of OpenVINO™ toolkit for Computer Vision Applications (www.coursera.org)

Idempotent Kafka Consumer

In the last couple of articles, we have seen how Kafka guarantees message processing; as part of that, we saw idempotent Kafka producers. Idempotence is something I appreciate, maybe the most, in data engineering. I am sure most of you already have a good understanding of the word idempotent.

There are two questions people always ask in the data industry (data is the new oil):

How do we guarantee all messages are processed?

How do we avoid or handle duplicate messages?

In the last article, we saw how producers ensure the "at least once" delivery semantics; the producer shouldn't deliver duplicate messages.
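As a quick reminder, producer idempotence comes down to a single configuration flag. The sketch below is not from the article; the broker address and topic name are placeholders.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class IdempotentProducerExample {

	public static void main(String[] args) {
		Properties props = new Properties();
		props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("enable.idempotence", "true");             // broker de-duplicates retried sends

		KafkaProducer<String, String> producer = new KafkaProducer<>(props);
		producer.send(new ProducerRecord<>("demo-topic", "key", "value")); // placeholder topic
		producer.close();
	}
}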

(Figure: simple message data flow)

Let us jump into the consumer part. Kafka provides a consumer API to pull data from Kafka. When we consume or pull data from Kafka, we need to specify a consumer group: a group of consumers that coordinate to read data from a set of topic partitions. To save its progress in reading data from Kafka, a consumer needs to save the offset of the next message it will read in each topic partition it is assigned to. Consumers are free to store their offsets wherever they want, but by default, and for all Kafka Streams applications, they are stored back in Kafka itself in an internal topic called __consumer_offsets. To use this mechanism, consumers either enable automatic periodic committing of offsets back to Kafka by setting the configuration flag enable.auto.commit to true, or make explicit calls to commit the offsets.
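To make this concrete, below is a minimal sketch of a consumer with auto-commit disabled and explicit offset commits after processing. It is not taken from the article; the broker address, group id, topic name and processing logic are placeholders.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitConsumer {

	public static void main(String[] args) {
		Properties props = new Properties();
		props.put("bootstrap.servers", "localhost:9092");            // placeholder broker
		props.put("group.id", "demo-consumer-group");                 // placeholder consumer group
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("enable.auto.commit", "false");                      // commit offsets explicitly

		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
		consumer.subscribe(Collections.singletonList("demo-topic"));  // placeholder topic

		while (true) {
			ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
			for (ConsumerRecord<String, String> record : records) {
				// process the record here; committing only after the batch is
				// processed gives at-least-once semantics
				System.out.printf("partition=%d offset=%d value=%s%n",
						record.partition(), record.offset(), record.value());
			}
			consumer.commitSync(); // offsets end up in the internal __consumer_offsets topic
		}
	}
}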

Idempotency guarantees consistent data and no duplication.

Delivery Semantics of Kafka Consumer:

There are different semantics; let us go through them below.

  1. At least once: In these semantics, offsets are committed after the message is processed. If some exception occurs during processing, the offset is not committed, so the consumer will be able to read the message again from Kafka. This can lead to duplicate message processing, so make sure that processing a message twice won't impact your system.

2. At most once: In these semantics, offsets are committed as soon as the message batch is received. If the processing goes wrong, the message is lost (we are unable to read it again) because we have already committed the offset. For example:

  1. The producer publishes the message successfully.
  2. The consumer consumes the message successfully and commits the offset.
  3. But while processing the message an exception occurs, an error occurs, or the machine shuts down, so we lose that message; we are unable to read it again because the offset was already committed.

3. No guarantee: In these semantics, there is no guarantee for a message, which means a given message may be processed once, multiple times, or not at all. A simple scenario where you end up with these semantics is a consumer with enable.auto.commit set to true (the default) that, for each batch of messages, asynchronously processes and saves the results to a database. The frequency of these commits is determined by the configuration parameter auto.commit.interval.ms.

4. Exactly once: In 2017, Confluent introduced exactly-once semantics. This can be achieved for Kafka-to-Kafka workflows using the Kafka Streams API. These semantics ensure your message processing is idempotent.

By default, a consumer is at-least-once, because when we don't configure anything regarding offset commits, the default is auto-commit of offsets.

Idempotent Consumer:

The Kafka Streams API will help us achieve idempotent Kafka consumers. Kafka Streams is a library for performing stream transformations on data from Kafka. The exactly-once semantics feature was added to Kafka Streams in the 0.11.0 Kafka release. Enabling exactly-once is a simple configuration change: set the streaming configuration parameter processing.guarantee to exactly_once (the default is at_least_once). A general workload for a Kafka Streams application is to read data from one or more partitions, perform some data transformations, update a state, then write some results to an output topic. When exactly-once semantics are enabled, Kafka Streams atomically updates consumer offsets, local state stores, state store changelog topics, and production to output topics altogether. If any one of these steps fails, all of the changes are rolled back.
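As a rough sketch (the application id, broker address and topic names below are placeholders, not from the article), enabling exactly-once in a Kafka Streams application is just a configuration change:

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class ExactlyOnceStreamsApp {

	public static void main(String[] args) {
		Properties props = new Properties();
		props.put(StreamsConfig.APPLICATION_ID_CONFIG, "demo-streams-app");   // placeholder
		props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
		props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
		props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
		// switch from the default at_least_once to exactly_once processing
		props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

		// read from an input topic, transform, and write to an output topic
		StreamsBuilder builder = new StreamsBuilder();
		KStream<String, String> source = builder.stream("input-topic");       // placeholder topic
		source.mapValues(value -> value.toLowerCase()).to("output-topic");    // placeholder topic

		KafkaStreams streams = new KafkaStreams(builder.build(), props);
		streams.start();
	}
}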

There is another way to make the consumer idempotent if you are not using the Kafka Streams API.

Suppose you are pulling data from Kafka, doing some processing, and then persisting the record into a database. You should guarantee data consistency using a unique key, i.e. an "id".

There are two strategies to generate a unique id:

  1. Kafka generic id: you can construct a unique id with Kafka's help by concatenating the topic, partition and offset, as below.

String id = record.topic() + "-" + record.partition() + "-" + record.offset();

2. Application-specific unique id logic: this depends on your application and will change based on the domain and functionality you are dealing with.

For example:

  1. If you are feeding Twitter stream data, use the tweet's own feed-specific id.
  2. Use a unique transaction id for a financial domain application.
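Whichever strategy you pick, the consumer becomes idempotent by making the database write keyed on that unique id, so that reprocessing the same message has no additional effect. Below is a minimal sketch of that idea; the table name, columns, and the PostgreSQL-style upsert are my own placeholders, not from the article.

import java.sql.Connection;
import java.sql.PreparedStatement;

import org.apache.kafka.clients.consumer.ConsumerRecord;

public class IdempotentWriter {

	// Insert the message keyed by its unique id; replaying the same message is a
	// no-op because the id is the primary key of the (placeholder) messages table.
	public static void persist(Connection conn, ConsumerRecord<String, String> record) throws Exception {
		String id = record.topic() + "-" + record.partition() + "-" + record.offset();
		try (PreparedStatement stmt = conn.prepareStatement(
				"INSERT INTO messages (id, payload) VALUES (?, ?) ON CONFLICT (id) DO NOTHING")) {
			stmt.setString(1, id);
			stmt.setString(2, record.value());
			stmt.executeUpdate();
		}
	}
}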