Description
The KafkaOffsetBackingStore consumer callback was recently augmented with a call to OffsetUtils.processPartitionKey: https://github.com/apache/kafka/blob/f1e58a35d7aebbe72844faf3e5019d9aa7a85e4a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java#L323
This function deserializes the offset key, which may be malformed in the topic: https://github.com/apache/kafka/blob/f1e58a35d7aebbe72844faf3e5019d9aa7a85e4a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java#L92
When this happens, a DataException is thrown and propagates to the try-catch in KafkaBasedLog that surrounds the batch processing of the records: https://github.com/apache/kafka/blob/f1e58a35d7aebbe72844faf3e5019d9aa7a85e4a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java#L445-L454
For example:
ERROR Error polling: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: (org.apache.kafka.connect.util.KafkaBasedLog:453)
This means that a single DataException from one malformed record may cause the remaining records in the batch to be dropped, corrupting the in-memory state of the KafkaOffsetBackingStore. Tasks that use the KafkaOffsetBackingStore are then prevented from seeing all of the offsets in the topic, which can cause duplicate records to be emitted.
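The difference between batch-level and per-record error handling can be illustrated with a minimal, self-contained sketch. It does not use the Kafka Connect classes: OffsetBatchSketch, parseKey, applyWithBatchCatch and applyWithRecordCatch are hypothetical names, the "connector:partition" key format is invented for the example, and IllegalArgumentException stands in for DataException. It is not the actual KafkaBasedLog code or a proposed patch, only a demonstration of why one bad record can hide the records that follow it.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class OffsetBatchSketch {
    // Hypothetical stand-in for key deserialization: accepts only "connector:partition".
    static String parseKey(String rawKey) {
        if (rawKey == null || !rawKey.contains(":")) {
            throw new IllegalArgumentException("malformed offset key: " + rawKey);
        }
        return rawKey;
    }

    // One try-catch around the whole batch: the first malformed key aborts the loop,
    // so every later record in the batch is never applied to the in-memory state.
    static Map<String, Long> applyWithBatchCatch(List<String[]> batch) {
        Map<String, Long> state = new HashMap<>();
        try {
            for (String[] record : batch) {
                state.put(parseKey(record[0]), Long.parseLong(record[1]));
            }
        } catch (IllegalArgumentException e) {
            System.err.println("Error polling: " + e.getMessage());
        }
        return state;
    }

    // One try-catch per record: a malformed key is logged and skipped,
    // and the remaining records in the batch are still applied.
    static Map<String, Long> applyWithRecordCatch(List<String[]> batch) {
        Map<String, Long> state = new HashMap<>();
        for (String[] record : batch) {
            try {
                state.put(parseKey(record[0]), Long.parseLong(record[1]));
            } catch (IllegalArgumentException e) {
                System.err.println("Skipping record: " + e.getMessage());
            }
        }
        return state;
    }

    public static void main(String[] args) {
        // Each element is {key, offset}; the second key is deliberately malformed.
        List<String[]> batch = Arrays.asList(
                new String[] {"connector-a:0", "10"},
                new String[] {"garbage", "11"},
                new String[] {"connector-b:0", "12"});

        System.out.println("batch-level catch:  " + applyWithBatchCatch(batch).keySet());
        System.out.println("per-record catch:   " + applyWithRecordCatch(batch).keySet());
    }
}
```

In this sketch the batch-level variant never records the offset for connector-b:0, which mirrors the missing-offset symptom described above; the per-record variant keeps it. Whether skipping the malformed record is the right behavior for the real offset store is a design decision this example does not settle.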