Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16583: Handle PartitionChangeRecord without directory IDs #16118

Merged
merged 1 commit into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
KAFKA-16583: Handle PartitionChangeRecord without directory IDs
When PartitionRegistration#merge() reads a PartitionChangeRecord
from an older MetadataVersion, with a replica assignment change
and without #directories() set, it produces a direcotry assignment
of DirectoryId.UNASSIGNED. This is problematic because the MetadataVersion
may not yet support directory assignments, leading to a
UnwritableMetadataException in PartitionRegistration#toRecord.

Since the Controller always sets directories on PartitionChangeRecord
if the MetadataVersion supports it, via PartitionChangeBuilder,
there's no need for PartitionRegistration#merge() to populate
directories upon a replica assignment change.
  • Loading branch information
soarez committed May 28, 2024
commit 3000de00a6c381e050626ee36f27784153f0b4b7
Original file line number Diff line number Diff line change
Expand Up @@ -5971,6 +5971,7 @@ class ReplicaManagerTest {
.setPartitionId(0)
.setTopicId(FOO_UUID)
.setReplicas(util.Arrays.asList(localId, localId + 1, localId + 2))
.setDirectories(util.Arrays.asList(Uuid.fromString("fKgQ2axkQiuzt4ANqKbPkQ"), DirectoryId.UNASSIGNED, DirectoryId.UNASSIGNED))
.setIsr(util.Arrays.asList(localId, localId + 1))
)
followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,20 @@ private static List<Uuid> checkDirectories(PartitionChangeRecord record) {
return record.directories();
}

private static boolean migratingDirectories(Uuid[] directories) {
if (directories == null) {
return true;
}
for (Uuid directory : directories) {
if (!DirectoryId.MIGRATING.equals(directory)) {
return false;
}
}
return true;
}

private static Uuid[] defaultToMigrating(Uuid[] directories, int numReplicas) {
if (directories == null || directories.length == 0) {
if (migratingDirectories(directories)) {
return DirectoryId.migratingArray(numReplicas);
}
return directories;
Expand Down Expand Up @@ -228,14 +240,11 @@ private PartitionRegistration(int[] replicas, Uuid[] directories, int[] isr, int
public PartitionRegistration merge(PartitionChangeRecord record) {
int[] newReplicas = (record.replicas() == null) ?
replicas : Replicas.toArray(record.replicas());
Uuid[] newDirectories;
if (record.directories() != null && !record.directories().isEmpty()) {
newDirectories = Uuid.toArray(checkDirectories(record));
} else if (record.replicas() != null) {
newDirectories = Uuid.toArray(DirectoryId.createDirectoriesFrom(replicas, directories, record.replicas()));
} else {
newDirectories = directories;
}
Uuid[] newDirectories = defaultToMigrating(
(record.directories() == null) ?
directories : Uuid.toArray(checkDirectories(record)),
newReplicas.length
);
int[] newIsr = (record.isr() == null) ? isr : Replicas.toArray(record.isr());
int[] newRemovingReplicas = (record.removingReplicas() == null) ?
removingReplicas : Replicas.toArray(record.removingReplicas());
Expand All @@ -257,7 +266,7 @@ public PartitionRegistration merge(PartitionChangeRecord record) {
int[] newElr = (record.eligibleLeaderReplicas() == null) ? elr : Replicas.toArray(record.eligibleLeaderReplicas());
int[] newLastKnownElr = (record.lastKnownElr() == null) ? lastKnownElr : Replicas.toArray(record.lastKnownElr());
return new PartitionRegistration(newReplicas,
defaultToMigrating(newDirectories, replicas.length),
newDirectories,
newIsr,
newRemovingReplicas,
newAddingReplicas,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,15 +153,17 @@ public void testMergePartitionChangeRecordWithReassignmentData() {
PartitionRegistration partition1 = partition0.merge(new PartitionChangeRecord().
setRemovingReplicas(Collections.singletonList(3)).
setAddingReplicas(Collections.singletonList(4)).
setReplicas(Arrays.asList(1, 2, 3, 4)));
setReplicas(Arrays.asList(1, 2, 3, 4)).
setDirectories(Arrays.asList(dir1, dir2, dir3, DirectoryId.UNASSIGNED)));
assertEquals(new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3, 4}).
setDirectories(new Uuid[]{dir1, dir2, dir3, DirectoryId.UNASSIGNED}).
setIsr(new int[] {1, 2, 3}).setRemovingReplicas(new int[] {3}).setAddingReplicas(new int[] {4}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(201).build(), partition1);
PartitionRegistration partition2 = partition1.merge(new PartitionChangeRecord().
setIsr(Arrays.asList(1, 2, 4)).
setRemovingReplicas(Collections.emptyList()).
setAddingReplicas(Collections.emptyList()).
setReplicas(Arrays.asList(1, 2, 4)));
setReplicas(Arrays.asList(1, 2, 4)).
setDirectories(Arrays.asList(dir1, dir2, DirectoryId.UNASSIGNED)));
assertEquals(new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 4}).
setDirectories(new Uuid[]{dir1, dir2, DirectoryId.UNASSIGNED}).
setIsr(new int[] {1, 2, 4}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(202).build(), partition2);
Expand Down