Kafka commitAsync Retries with Commit Order
I'm reading through Kafka the Definitive Guide and in the chapter on Consumers there is a blurb on "Retrying Async Commits":
A simple pattern to get commit order right for asynchronous retries is to use a monotonically increasing sequence number. Increase the sequence number every time you commit and add the sequence number at the time of the commit to the commitAsync callback. When you're getting ready to send a retry, check if the commit sequence number the callback got is equal to the instance variable; if it is, there was no newer commit and it is safe to retry. If the instance sequence number is higher, don't retry because a newer commit was already sent.
A quick example by the author would have been great here for dense folks like me. I am particularly unclear about the portion I bolded above.
Can anyone shed light on what this means or even better provide a toy example demonstrating this?
apache-kafka commit offset kafka-consumer-api
asked Nov 10 at 15:46
mathfish
1 Answer
accepted
Here is what I think it means, though to be humble, I could be wrong:
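import java.time.Duration;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

// Assumes `consumer` is an already-subscribed KafkaConsumer<String, String>.
try {
    // Shared sequence number, created once before the poll loop.
    AtomicInteger atomicInteger = new AtomicInteger(0);
    while (true) {
        // poll(long) is deprecated since Kafka 2.0; poll(Duration) replaces it.
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(5));
        for (ConsumerRecord<String, String> record : records) {
            System.out.format("offset: %d%n", record.offset());
            System.out.format("partition: %d%n", record.partition());
            System.out.format("timestamp: %d%n", record.timestamp());
            System.out.format("timeStampType: %s%n", record.timestampType());
            System.out.format("topic: %s%n", record.topic());
            System.out.format("key: %s%n", record.key());
            System.out.format("value: %s%n", record.value());
        }
        consumer.commitAsync(new OffsetCommitCallback() {
            // Sequence number captured at the moment this commit is sent.
            private final int marker = atomicInteger.incrementAndGet();

            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
                                   Exception exception) {
                if (exception != null) {
                    if (marker == atomicInteger.get()) {
                        // No newer commit was sent, so retrying these offsets is safe.
                        consumer.commitAsync(offsets, this);
                    }
                    // Otherwise a newer commit was already sent: can't retry anymore.
                }
            }
        });
    }
} catch (WakeupException e) {
    // ignore for shutdown
} finally {
    consumer.commitSync(); // Block until the final commit completes
    consumer.close();
    System.out.println("Closed consumer and we are done");
}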
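To see the retry decision in isolation, here is a minimal Kafka-free sketch of the check the book describes. Everything in it (the `CommitSequenceDemo` class, `sendCommit`, `onFailure`, the offsets 2000 and 3000) is hypothetical and only models the bookkeeping; it does not talk to a broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the sequence-number check; not part of the Kafka API.
class CommitSequenceDemo {
    // The "instance variable" from the book: bumped on every commit sent.
    private final AtomicInteger latestSeq = new AtomicInteger(0);
    // Records each decision so the outcome can be inspected.
    final List<String> log = new ArrayList<>();

    // Models sending a commit: increment the counter and hand the value
    // to the (imaginary) callback for that commit.
    int sendCommit(long offset) {
        int seq = latestSeq.incrementAndGet();
        log.add("send seq=" + seq + " offset=" + offset);
        return seq;
    }

    // Models a failed callback: retry only if no newer commit was sent since.
    boolean onFailure(int seq, long offset) {
        boolean retry = (seq == latestSeq.get());
        log.add((retry ? "retry" : "skip") + " seq=" + seq + " offset=" + offset);
        return retry;
    }

    public static void main(String[] args) {
        CommitSequenceDemo demo = new CommitSequenceDemo();
        int first = demo.sendCommit(2000);
        int second = demo.sendCommit(3000);
        demo.onFailure(first, 2000);  // a newer commit exists, so this is skipped
        demo.onFailure(second, 3000); // still the newest commit, so this is retried
        demo.log.forEach(System.out::println);
    }
}
```

The failure for offset 2000 arrives after the commit for offset 3000 was sent. Retrying it could overwrite 3000 with 2000 if the retry landed last, which is the ordering problem the sequence number guards against.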
Thanks for the example! It makes sense, but should the AtomicInteger be instantiated before the loop that processes the records, incremented inside the loop, and its value then assigned to the marker in the callback? Without that, I don't think you could check whether a later offset was committed, since the counter would start at 0 for each commit in this example. Does that seem correct?
– mathfish
Nov 11 at 11:23
Yes, that is a good point. I agree.
– Daniel Hinojosa
Nov 11 at 14:09
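The point raised in the comments can be checked with a small sketch. It assumes the comment describes a counter created fresh for every commit; the `CounterScopeDemo` class and both method names are hypothetical. With a shared counter, older markers fall behind the latest value and are detected as stale. With a per-iteration counter, every marker equals its own counter, so every failed commit would look safe to retry.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical comparison of counter placement; not part of the Kafka API.
class CounterScopeDemo {
    // Counter created once and shared by all commits.
    static int staleWithSharedCounter(int commits) {
        AtomicInteger shared = new AtomicInteger(0);
        int[] markers = new int[commits];
        for (int i = 0; i < commits; i++) {
            markers[i] = shared.incrementAndGet(); // marker captured at send time
        }
        int stale = 0;
        for (int marker : markers) {
            if (marker != shared.get()) stale++; // a newer commit exists
        }
        return stale;
    }

    // Counter recreated for every commit: the check can never fail.
    static int staleWithPerIterationCounter(int commits) {
        int stale = 0;
        for (int i = 0; i < commits; i++) {
            AtomicInteger fresh = new AtomicInteger(0);
            int marker = fresh.incrementAndGet();
            if (marker != fresh.get()) stale++; // never true: marker is always 1
        }
        return stale;
    }

    public static void main(String[] args) {
        System.out.println("shared counter, stale commits: " + staleWithSharedCounter(3));
        System.out.println("per-iteration counter, stale commits: " + staleWithPerIterationCounter(3));
    }
}
```

For three commits, the shared counter flags the first two as stale, while the per-iteration counter flags none.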
edited Nov 12 at 14:26
mathfish
answered Nov 11 at 0:09
Daniel Hinojosa