Idempotent Kafka Consumer
In the last couple of articles, we have seen how Kafka guarantees message processing, as part of this we saw idempotent Kafka producers. Idempotence is something I appreciate, maybe the most, in data engineering. I know most of you have a better understanding of the word idempotent.
There are two questions always people do ask in the data industry. Data is the new oil.
How do we guarantee all messages are processed?
How do we avoid or handle duplicate messages?
In the last article, we have seen how producers should ensure that the “at least once” delivery semantics. The producer shouldn’t deliver duplicate messages.
Let us jump into the consumer part. Kafka provides consumer API to pull the data from kafka. When we consume or pull the data from kafka we need to specify the consumer group. Consumer group helps us to a group of consumers that coordinate to read data from a set of topic partitions. To save progress in reading data from Kafka, a consumer needs to save the offset of the next message it will read in each topic partition it is assigned to. Consumers are free to store their offsets wherever they want but by default and for all Kafka Streams applications, these are stored back in Kafka itself in an internal topic called
_consumer_offsets. To use this mechanism consumers either enable automatic periodic commitment of offsets back to Kafka by setting the configuration flag
true or by making an explicit call to commit the offsets.
idempotency guarantees of consistent data and no duplicacy.
Delivery Semantics of Kafka Consumer:
There are different semantics, for more details go through the below image.
- At least Once: In this semantics, offset are committed after the message processed. If some exception occurs while processing so the message is not committed, so the consumer will be able to read the message again from kafka. This can leads to duplicate message processing. Make sure message processing twice won’t impact your system.
2. At most Once: In this semantics offsets are committed as soon as the message batch is received. If the processing goes wrong the message will be lost(we are unable to read again) because we have committed the offset.
- The producer published the message successfully.
- The consumer is able to consume messages successfully and committed offset.
- But while processing messages some exception occurs or an error occurs, machine shutdown so we lost that message because we are unable to read the same message again due to offset were already committed.
3. No guarantee: In this semantics, there is no guarantee of a message, which means the given message processed once, multiple times or no at all. This is a simple scenario where you will end up with these semantics is if you have a consumer with
enable.auto.commit set to
true (this is the default) and for each batch of messages, you async process and save the desired results into a database. The frequency of these commits is determined by the configuration parameter
4. Exactly Once: In 2007 confluent introduced exactly-once semantics. This can be achieved for kafka to kafka workflows using kafka stream API. This semantic ensures your message processing idempotent.
By default, a consumer is at least once because of when we don’t set anything regarding offset commit then the default is auto-commit of the offset.
Kafka stream API will help us to achieve idempotent kafka consumers. Kafka Streams is a library for performing stream transformation on data from kafka. The exactly-once semantic feature was added to Kafka Streams in the 0.11.0 Kafka release. Enabling exactly once is a simple configuration change setting the streaming configuration parameter
exactly_once(the default is
at_least_once). A general workload for a Kafka streams application is to read data from one or more partitions, perform some data transformations, update a state, then write some results to an output topic. When exactly-once semantics is enabled, Kafka Streams atomically updates consumer offsets, local state stores, state store changelog topics, and production to output topics altogether. If any one of these steps fail, all of the changes are rolled back.
There is another way to make it idempotent if you are not using kafka streams API.
Suppose you are pulling data from kafka and doing some processing and then persisting packet into the database. How you should guarantee data consistency using unique key i.e “id”.
There are two strategies to generate a unique id
- Kafka generic id:- You can take the help of kafka to generate unique id by appending simple strings like below.
String id = record.topic()+”-“+record.partition()+”-“+record.offset();
2. Application-specific unique id logic:-
This could be based on your application this could be change based on domain and functionality with which you are dealing.
- Suppose you are feeding twitter stream data then twitters feed specific id.
- Any unique transaction-id for any financial domain application.