Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16583: Handle PartitionChangeRecord without directory IDs #16118

Merged
merged 1 commit into from
Jun 4, 2024

Conversation

soarez
Copy link
Member

@soarez soarez commented May 29, 2024

When PartitionRegistration#merge() reads a PartitionChangeRecord from an older MetadataVersion, with a replica assignment change and without #directories() set, it produces a directory assignment of DirectoryId.UNASSIGNED. This is problematic because the MetadataVersion may not yet support directory assignments, leading to an UnwritableMetadataException in PartitionRegistration#toRecord.

Since the Controller always sets directories on PartitionChangeRecord if the MetadataVersion supports it, via PartitionChangeBuilder, there's no need for PartitionRegistration#merge() to populate directories upon a replica assignment change.

When PartitionRegistration#merge() reads a PartitionChangeRecord
from an older MetadataVersion, with a replica assignment change
and without #directories() set, it produces a direcotry assignment
of DirectoryId.UNASSIGNED. This is problematic because the MetadataVersion
may not yet support directory assignments, leading to a
UnwritableMetadataException in PartitionRegistration#toRecord.

Since the Controller always sets directories on PartitionChangeRecord
if the MetadataVersion supports it, via PartitionChangeBuilder,
there's no need for PartitionRegistration#merge() to populate
directories upon a replica assignment change.
@showuon
Copy link
Contributor

showuon commented May 31, 2024

@soarez , thanks for the fix. It works now if I followed the steps in the JIRA. But then, after MV upgraded, this partition cannot change replica successfully. Here's the steps I did:

  1. Launch a 3.6.0 controller and a 3.6.0 broker(BrokerA) in Kraft mode;
  2. Create a topic with 1 partition;
  3. Launch a 3.6.0 broker(Broker B) in Kraft mode and reassign the step 2 partition to Broker B;
  4. Upgrade Broker B to 3.7.0;
    === These steps in JIRA works now ===
  5. Upgrade Broker A, Controllers to 3.7.0
  6. Upgrade MV to 3.7: ./bin/kafka-features.sh --bootstrap-server localhost:9092 upgrade --metadata 3.7
  7. reassign the step 2 partition to Broker A

The logs in broker A:

[2024-05-31 15:33:25,763] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions Set(t1-0) (kafka.server.ReplicaFetcherManager)
[2024-05-31 15:33:25,837] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions Set(t1-0) (kafka.server.ReplicaFetcherManager)
[2024-05-31 15:33:25,837] INFO [ReplicaAlterLogDirsManager on broker 2] Removed fetcher for partitions Set(t1-0) (kafka.server.ReplicaAlterLogDirsManager)
[2024-05-31 15:33:25,853] INFO Log for partition t1-0 is renamed to /tmp/kraft-broker-logs/t1-0.3e6d8bebc1c04f3186ad6cf63145b6fd-delete and is scheduled for deletion (kafka.log.LogManager)
[2024-05-31 15:33:26,279] ERROR Controller returned error NOT_LEADER_OR_FOLLOWER for assignment of partition PartitionData(partitionIndex=0, errorCode=6) into directory oULBCf49aiRXaWJpO3I-GA (org.apache.kafka.server.AssignmentsManager)
[2024-05-31 15:33:26,280] WARN Re-queueing assignments: [Assignment{timestampNs=26022187148625, partition=t1:0, dir=/tmp/kraft-broker-logs, reason='Applying metadata delta'}] (org.apache.kafka.server.AssignmentsManager)
[2024-05-31 15:33:26,786] ERROR Controller returned error NOT_LEADER_OR_FOLLOWER for assignment of partition PartitionData(partitionIndex=0, errorCode=6) into directory oULBCf49aiRXaWJpO3I-GA (org.apache.kafka.server.AssignmentsManager)
[2024-05-31 15:33:27,296] WARN Re-queueing assignments: [Assignment{timestampNs=26022187148625, partition=t1:0, dir=/tmp/kraft-broker-logs, reason='Applying metadata delta'}] (org.apache.kafka.server.AssignmentsManager)
...

Logs in controller:

[2024-05-31 15:33:25,727] INFO [QuorumController id=1] Successfully altered 1 out of 1 partition reassignment(s). (org.apache.kafka.controller.ReplicationControlManager)
[2024-05-31 15:33:25,727] INFO [QuorumController id=1] Replayed partition assignment change PartitionChangeRecord(partitionId=0, topicId=tMiJOQznTLKtOZ8rLqdgqw, isr=null, leader=-2, replicas=[6, 2], removingReplicas=[2], addingReplicas=[6], leaderRecoveryState=-1, directories=[RuDIAGGJrTG2NU6tEOkbHw, AAAAAAAAAAAAAAAAAAAAAA], eligibleLeaderReplicas=null, lastKnownElr=null) for topic t1 (org.apache.kafka.controller.ReplicationControlManager)
[2024-05-31 15:33:25,802] INFO [QuorumController id=1] AlterPartition request from node 2 for t1-0 completed the ongoing partition reassignment and triggered a leadership change. Returning NEW_LEADER_ELECTED. (org.apache.kafka.controller.ReplicationControlManager)
[2024-05-31 15:33:25,802] INFO [QuorumController id=1] UNCLEAN partition change for t1-0 with topic ID tMiJOQznTLKtOZ8rLqdgqw: replicas: [6, 2] -> [6], directories: [RuDIAGGJrTG2NU6tEOkbHw, AAAAAAAAAAAAAAAAAAAAAA] -> [RuDIAGGJrTG2NU6tEOkbHw], isr: [2] -> [6], removingReplicas: [2] -> [], addingReplicas: [6] -> [], leader: 2 -> 6, leaderEpoch: 3 -> 4, partitionEpoch: 5 -> 6 (org.apache.kafka.controller.ReplicationControlManager)
[2024-05-31 15:33:25,802] INFO [QuorumController id=1] Replayed partition assignment change PartitionChangeRecord(partitionId=0, topicId=tMiJOQznTLKtOZ8rLqdgqw, isr=[6], leader=6, replicas=[6], removingReplicas=[], addingReplicas=[], leaderRecoveryState=-1, directories=[RuDIAGGJrTG2NU6tEOkbHw], eligibleLeaderReplicas=null, lastKnownElr=null) for topic t1 (org.apache.kafka.controller.ReplicationControlManager)
[2024-05-31 15:33:26,277] WARN [QuorumController id=1] AssignReplicasToDirsRequest from broker 2 references non assigned partition t1-0 (org.apache.kafka.controller.ReplicationControlManager)
[2024-05-31 15:33:26,785] WARN [QuorumController id=1] AssignReplicasToDirsRequest from broker 2 references non assigned partition t1-0 (org.apache.kafka.controller.ReplicationControlManager)
[2024-05-31 15:33:27,293] WARN [QuorumController id=1] AssignReplicasToDirsRequest from broker 2 references non assigned partition t1-0 (org.apache.kafka.controller.ReplicationControlManager)

So looks like in handleAssignReplicasToDirs, we will fail validation before entering PartitionChangeBuilder. Any thoughts about this issue?

@soarez
Copy link
Member Author

soarez commented Jun 3, 2024

@showuon thanks for testing and sharing this.

In those logs the controller is rejecting the assignment with a NOT_LEADER_OR_FOLLOWER because the partition has been moved away from the broker. Here the controller is comparing broker IDs, not directory IDs. A failed assignment is re-qeueued, so this error will persist until the broker dies or the replica is assigned back to it.

I'm thinking of two options:

  1. Cancel any pending assignment for a replica when a metadata update shows the broker is no longer a replica for that partition.
  2. Accept NOT_LEADER_OR_FOLLOWER as indication that a reassignment has taken place, and do not retry.

I'm leaning towards option 2. since it's much simpler and there's no other case when handleAssignReplicasToDirs returns NOT_LEADER_OR_FOLLOWER

@showuon
Copy link
Contributor

showuon commented Jun 4, 2024

I was trying to know the root cause of this problem, that why does it fail after upgrade, but not fail without upgrade. My understanding is that because before upgrade, the topic image doesn't have dirID for the assignment. After upgrade, the assignment has the dirID. So in the ReplicaManager#applyDelta, we'll have have directoryId changes in localChanges, which will invoke AssignmentEvent here. With that, we'll get the unexpected NOT_LEADER_OR_FOLLOWER error. And I also confirmed, without your change in this PR, this issue also exists. That is:

  1. Launch a 3.6.0 controller and a 3.6.0 broker(BrokerA) in Kraft mode;
  2. Create a topic with 1 partition;
    3. Launch a 3.6.0 broker(Broker B) in Kraft mode and reassign the step 2 partition to Broker B;
  3. Upgrade Broker B to 3.7.0;
  4. Upgrade Broker A, Controllers to 3.7.0
  5. Upgrade MV to 3.7: ./bin/kafka-features.sh --bootstrap-server localhost:9092 upgrade --metadata 3.7
  6. reassign the step 2 partition to Broker A (or B)

So I think we might need to think about a good solution to fix from the root. I will create another ticket to track this issue. That said, I think this PR already fixed the issue in JIRA. Let's complete it! :)

Copy link
Contributor

@showuon showuon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@showuon
Copy link
Contributor

showuon commented Jun 4, 2024

KAFKA-16886 is created for above issue. I've set target to v3.7.1/v3.8.0. Thanks.

@soarez
Copy link
Member Author

soarez commented Jun 4, 2024

@showuon Thanks for looking into this, and thanks for filing KAFKA-16886, I'll take that one.

@soarez
Copy link
Member Author

soarez commented Jun 4, 2024

Failing tests are unrelated. Merging.

@soarez soarez merged commit 16359e7 into apache:trunk Jun 4, 2024
1 check failed
soarez added a commit that referenced this pull request Jun 4, 2024
When PartitionRegistration#merge() reads a PartitionChangeRecord
from an older MetadataVersion, with a replica assignment change
and without #directories() set, it produces a direcotry assignment
of DirectoryId.UNASSIGNED. This is problematic because the MetadataVersion
may not yet support directory assignments, leading to a
UnwritableMetadataException in PartitionRegistration#toRecord.

Since the Controller always sets directories on PartitionChangeRecord
if the MetadataVersion supports it, via PartitionChangeBuilder,
there's no need for PartitionRegistration#merge() to populate
directories upon a replica assignment change.

Reviewers: Luke Chen <showuon@gmail.com>
soarez added a commit that referenced this pull request Jun 4, 2024
When PartitionRegistration#merge() reads a PartitionChangeRecord
from an older MetadataVersion, with a replica assignment change
and without #directories() set, it produces a direcotry assignment
of DirectoryId.UNASSIGNED. This is problematic because the MetadataVersion
may not yet support directory assignments, leading to a
UnwritableMetadataException in PartitionRegistration#toRecord.

Since the Controller always sets directories on PartitionChangeRecord
if the MetadataVersion supports it, via PartitionChangeBuilder,
there's no need for PartitionRegistration#merge() to populate
directories upon a replica assignment change.

Reviewers: Luke Chen <showuon@gmail.com>
TaiJuWu pushed a commit to TaiJuWu/kafka that referenced this pull request Jun 8, 2024
…he#16118)

When PartitionRegistration#merge() reads a PartitionChangeRecord
from an older MetadataVersion, with a replica assignment change
and without #directories() set, it produces a direcotry assignment
of DirectoryId.UNASSIGNED. This is problematic because the MetadataVersion
may not yet support directory assignments, leading to a
UnwritableMetadataException in PartitionRegistration#toRecord.

Since the Controller always sets directories on PartitionChangeRecord
if the MetadataVersion supports it, via PartitionChangeBuilder,
there's no need for PartitionRegistration#merge() to populate
directories upon a replica assignment change.

Reviewers: Luke Chen <showuon@gmail.com>
gongxuanzhang pushed a commit to gongxuanzhang/kafka that referenced this pull request Jun 12, 2024
…he#16118)

When PartitionRegistration#merge() reads a PartitionChangeRecord
from an older MetadataVersion, with a replica assignment change
and without #directories() set, it produces a direcotry assignment
of DirectoryId.UNASSIGNED. This is problematic because the MetadataVersion
may not yet support directory assignments, leading to a
UnwritableMetadataException in PartitionRegistration#toRecord.

Since the Controller always sets directories on PartitionChangeRecord
if the MetadataVersion supports it, via PartitionChangeBuilder,
there's no need for PartitionRegistration#merge() to populate
directories upon a replica assignment change.

Reviewers: Luke Chen <showuon@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants