By Glide


2017-04-14 17:19:34 8 Comments

Is there a way to commit manually with Kafka Stream?

Usually, when using the KafkaConsumer, I do something like this:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        // process records
    }
    consumer.commitAsync();
}

Here I'm calling commit manually. I don't see a similar API for KStream.

1 Answer

@Matthias J. Sax 2017-04-14 19:53:11

Commits are handled internally by Streams and are fully automatic, so there is usually no reason to commit manually. Note that Streams handles this differently than consumer auto-commit -- in fact, auto-commit is disabled for the internally used consumer and Streams manages commits "manually". The reason is that commits can only happen at certain points during processing to ensure no data can get lost (there are many internal dependencies with regard to updating state and flushing results).

For more frequent commits, you can reduce the commit interval via the StreamsConfig parameter commit.interval.ms.
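For example, a shorter interval could be configured like this (a minimal sketch; StreamsConfig.COMMIT_INTERVAL_MS_CONFIG is the real config key, while the application id, broker address, and 1000 ms value are just illustrations):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // example app id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // example broker
// Commit every second instead of the default 30 seconds
// (the default drops to 100 ms when exactly-once is enabled).
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);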

Nevertheless, manual commits are possible indirectly, via the low-level Processor API. You can use the context object that is provided via the init() method to call context#commit(). Note that this is only a "request to Streams" to commit as soon as possible -- it does not issue a commit directly.
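As a sketch, using the newer Processor API (Kafka 2.7+; the answer predates it, but the older ProcessorContext exposes the same commit() method), where CommitRequestingProcessor is a made-up name:

import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

public class CommitRequestingProcessor implements Processor<String, String, String, String> {
    private ProcessorContext<String, String> context;

    @Override
    public void init(ProcessorContext<String, String> context) {
        // Keep the context around so process() can request commits
        this.context = context;
    }

    @Override
    public void process(Record<String, String> record) {
        // ... process the record ...
        context.forward(record);
        // Only a request: Streams commits at the next safe point,
        // not immediately.
        context.commit();
    }
}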

@Arpan Sharma 2019-10-12 13:35:12

What happens if there is an exception and the stream app crashes? Starting the stream app again will lead to consuming the same message, and the loop will go on until we delete the topic and re-create it.

@Matthias J. Sax 2019-10-12 19:23:35

Yes, if your application crashes and is restarted, it will resume processing from the latest committed offset (similar to KafkaConsumer, which is in fact used internally). -- Not sure what you mean by "until we delete the topic and re-create it"? How does committing offsets relate to deleting/re-creating topics?

@Arpan Sharma 2019-10-14 04:26:15

The issue was that I was reading a specific message that had special characters in it, and consuming it made my stream app crash. When the app started again, it consumed the same message and crashed, and the loop continued. My point is: can we not manually commit the message while catching the exception, and proceed with the next message?

@Matthias J. Sax 2019-10-14 07:12:04

Not within the application. Depending on when your application encounters the issue, you can maybe use a DeserializationExceptionHandler: docs.confluent.io/current/streams/developer-guide/… -- Or you might be able to catch the exception and "swallow" it.
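For instance, the built-in LogAndContinueExceptionHandler skips records that fail to deserialize instead of crashing the application (a minimal sketch; the handler class and config key are part of the public Streams API):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

Properties props = new Properties();
// Log and skip records that cannot be deserialized, instead of
// failing the application (the default handler fails).
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
          LogAndContinueExceptionHandler.class);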

@Arpan Sharma 2019-10-14 07:46:33

Thanks, got it now! Much appreciated 👍

@Balan 2020-07-07 02:14:31

Any application exception thrown within a Kafka Streams application may cause duplicate messages (offsets and message commits going out of sync) if the exception is not a deserialization or production exception. I have tested this with Kafka 2.5, even with the processing guarantee set to EOS. Issuing context.commit() did not help.

@Matthias J. Sax 2020-07-07 05:10:32

@Balan With EOS, you should never get any duplicates in your output. When you verified your output, did you configure the consumer with read_committed mode?
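For reference, read_committed is a plain consumer setting (a sketch of a verification consumer's properties; the broker address is illustrative):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // example broker
// Only read messages from committed transactions; the default
// "read_uncommitted" also returns aborted/pending transactional data.
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");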

@Balan 2020-07-08 21:24:37

@MatthiasJ.Sax Agreed, read_committed would filter out uncommitted transactions. That said, some of the delivered interceptors and even the replicator do not support setting the isolation level for the consumer, so this is still an issue when you look at the overall ecosystem.

@Matthias J. Sax 2020-07-08 21:31:01

Well, if some components do not support the correct isolation level, there is not much you can do about it. EOS only works as advertised if all components support it. What replicator are you using? I'm wondering why it does not support isolation level read_committed -- and note that interceptors that trigger side effects are not covered by the EOS guarantee by design.
