Skip to content

Commit

Permalink
KAFKA-15800: Prevent DataExceptions from corrupting KafkaOffsetBackin…
Browse files Browse the repository at this point in the history
…gStore (#14718)

Signed-off-by: Greg Harris <greg.harris@aiven.io>

Reviewers: Yash Mayya <yash.mayya@gmail.com>
  • Loading branch information
gharris1727 authored Nov 10, 2023
1 parent fcfd378 commit 989618a
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,13 @@ public static void processPartitionKey(byte[] partitionKey, byte[] offsetValue,
}
// The topic parameter is irrelevant for the JsonConverter which is the internal converter used by
// Connect workers.
Object deserializedKey = keyConverter.toConnectData("", partitionKey).value();
Object deserializedKey;
try {
deserializedKey = keyConverter.toConnectData("", partitionKey).value();
} catch (DataException e) {
log.warn("Ignoring offset partition key with unknown serialization. Expected json.", e);
return;
}
if (!(deserializedKey instanceof List)) {
log.warn("Ignoring offset partition key with an unexpected format. Expected type: {}, actual type: {}",
List.class.getName(), className(deserializedKey));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.junit.Test;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -75,49 +76,51 @@ public void testValidateFormatWithValidFormat() {
OffsetUtils.validateFormat(offsetData);
}

@Test
public void testProcessPartitionKeyWithUnknownSerialization() {
assertInvalidPartitionKey(
new byte[]{0},
"Ignoring offset partition key with unknown serialization");
assertInvalidPartitionKey(
"i-am-not-json".getBytes(StandardCharsets.UTF_8),
"Ignoring offset partition key with unknown serialization");
}

@Test
public void testProcessPartitionKeyNotList() {
try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(OffsetUtils.class)) {
Map<String, Set<Map<String, Object>>> connectorPartitions = new HashMap<>();
OffsetUtils.processPartitionKey(serializePartitionKey(new HashMap<>()), new byte[0], CONVERTER, connectorPartitions);
// Expect no partition to be added to the map since the partition key is of an invalid format
assertEquals(0, connectorPartitions.size());
assertEquals(1, logCaptureAppender.getMessages().size());
assertThat(logCaptureAppender.getMessages().get(0),
containsString("Ignoring offset partition key with an unexpected format"));
}
assertInvalidPartitionKey(
new byte[]{},
"Ignoring offset partition key with an unexpected format");
assertInvalidPartitionKey(
serializePartitionKey(new HashMap<>()),
"Ignoring offset partition key with an unexpected format");
}

@Test
public void testProcessPartitionKeyListWithOneElement() {
try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(OffsetUtils.class)) {
Map<String, Set<Map<String, Object>>> connectorPartitions = new HashMap<>();
OffsetUtils.processPartitionKey(serializePartitionKey(Collections.singletonList("")), new byte[0], CONVERTER, connectorPartitions);
// Expect no partition to be added to the map since the partition key is of an invalid format
assertEquals(0, connectorPartitions.size());
assertEquals(1, logCaptureAppender.getMessages().size());
assertThat(logCaptureAppender.getMessages().get(0),
containsString("Ignoring offset partition key with an unexpected number of elements"));
}
assertInvalidPartitionKey(
serializePartitionKey(Collections.singletonList("")),
"Ignoring offset partition key with an unexpected number of elements");
}

@Test
public void testProcessPartitionKeyListWithElementsOfWrongType() {
assertInvalidPartitionKey(
serializePartitionKey(Arrays.asList(1, new HashMap<>())),
"Ignoring offset partition key with an unexpected format for the first element in the partition key list");
assertInvalidPartitionKey(
serializePartitionKey(Arrays.asList("connector-name", new ArrayList<>())),
"Ignoring offset partition key with an unexpected format for the second element in the partition key list");
}

public void assertInvalidPartitionKey(byte[] key, String message) {
try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(OffsetUtils.class)) {
Map<String, Set<Map<String, Object>>> connectorPartitions = new HashMap<>();
OffsetUtils.processPartitionKey(serializePartitionKey(Arrays.asList(1, new HashMap<>())), new byte[0], CONVERTER, connectorPartitions);
OffsetUtils.processPartitionKey(key, new byte[0], CONVERTER, connectorPartitions);
// Expect no partition to be added to the map since the partition key is of an invalid format
assertEquals(0, connectorPartitions.size());
assertEquals(1, logCaptureAppender.getMessages().size());
assertThat(logCaptureAppender.getMessages().get(0),
containsString("Ignoring offset partition key with an unexpected format for the first element in the partition key list"));

OffsetUtils.processPartitionKey(serializePartitionKey(Arrays.asList("connector-name", new ArrayList<>())), new byte[0], CONVERTER, connectorPartitions);
// Expect no partition to be added to the map since the partition key is of an invalid format
assertEquals(0, connectorPartitions.size());
assertEquals(2, logCaptureAppender.getMessages().size());
assertThat(logCaptureAppender.getMessages().get(1),
containsString("Ignoring offset partition key with an unexpected format for the second element in the partition key list"));
assertThat(logCaptureAppender.getMessages().get(0), containsString(message));
}
}

Expand Down

0 comments on commit 989618a

Please sign in to comment.