Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18518: Add processor to handle rebalance events #18527

Merged
merged 13 commits into from
Jan 22, 2025

Conversation

cadonna
Copy link
Member

@cadonna cadonna commented Jan 14, 2025

This commit adds a processor named
StreamsRebalanceEventsProcessor that handles the rebalance events sent from the background thread of the async consumer to the stream thread when an task
assignment changes. It also adds the corresponding rebalance events.

Additionally, this commit adds StreamsRebalanceData that maintains the data that is exchanges for the Streams rebalance protocol.

All of these are used by the Streams heartbeat request manager and the Streams membership manager that will be added in a future commit.

More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.

Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@cadonna cadonna added streams consumer KIP-1071 PRs related to KIP-1071 labels Jan 14, 2025
@cadonna cadonna requested review from bbejeck, lucasbru and mjsax January 14, 2025 14:48
@github-actions github-actions bot added triage PRs from the community clients labels Jan 14, 2025
@cadonna
Copy link
Member Author

cadonna commented Jan 14, 2025

Call for review: @aliehsaeedii

@cadonna cadonna removed the triage PRs from the community label Jan 15, 2025
@cadonna cadonna force-pushed the streams_rebalance_interface branch from d34ad1a to fc91305 Compare January 15, 2025 21:24
Copy link
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @cadonna ! I think StreamsRebalanceData need some more work, rest is looking fine


public static final Assignment EMPTY = new Assignment();

public final Set<TaskId> activeTasks = new HashSet<>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The collections should be unmodifiable, and probably private with getters?

Right now, I can do Assignment.EMPTY.activeTasks.add(new TaskID("a",1)) and wreak havoc.

If you want to make the class mutable, you cannot define EMPTY as a static variable.


public final Set<TaskId> warmupTasks = new HashSet<>();

public Assignment() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you are calling this constructor, you should just be using EMPTY. It does not make sense to have both.

return subtopologyId;
}

public TaskId(final String subtopologyId, final int partitionId) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: first constructor, then getters


@Override
public int compareTo(TaskId taskId) {
if (subtopologyId.equals(taskId.subtopologyId)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Java 8, it woul be more idiomatic to use soemthing like:

Comparator.comparing(TaskId::subtopologyId)
          .thenComparingInt(TaskId::partitionId);


public static class Subtopology {

public final Set<String> sourceTopics;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may want to make these collections unmodifiable, non-null, private and have getters.

private final AtomicReference<Assignment> reconciledAssignment = new AtomicReference<>(Assignment.EMPTY);

public StreamsRebalanceData(Map<String, Subtopology> subtopologies) {
this.subtopologies = subtopologies;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

non-null, unmodifiable?

*/
public StreamsRebalanceEventsProcessor(StreamsRebalanceData streamsRebalanceData,
StreamsGroupRebalanceCallbacks rebalanceCallbacks) {
this.streamsRebalanceData = streamsRebalanceData;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

non-null?


private StreamsOnTasksRevokedCallbackCompletedEvent invokeOnTasksRevokedCallback(final Set<StreamsRebalanceData.TaskId> activeTasksToRevoke,
final CompletableFuture<Void> future) {
final Optional<Exception> exceptionFromCallback = rebalanceCallbacks.onTasksRevoked(activeTasksToRevoke);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

invokeOnTasksRevokedCallback has a different implementation than invokeOnTasksAssignedCallback and invokeOnAllTasksLostCallback. It's probably easier to read if all three use either the .map stile or the isPresent style. I don't mind which one, isPresent is probably a little easier to follow.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case I just applied IDEA's recommendation to transform it to functional style. However, I agree with you regarding readability.

@@ -27,7 +27,9 @@
public abstract class BackgroundEvent {

public enum Type {
ERROR, CONSUMER_REBALANCE_LISTENER_CALLBACK_NEEDED, SHARE_ACKNOWLEDGEMENT_COMMIT_CALLBACK
ERROR, CONSUMER_REBALANCE_LISTENER_CALLBACK_NEEDED, SHARE_ACKNOWLEDGEMENT_COMMIT_CALLBACK,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: one per line will simplify conflict resolution

/**
* This class holds the data that is needed to participate in the Streams rebalance protocol.
*/
public class StreamsRebalanceData {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class and its inner classes need unit tests.

This commit adds a processor named
StreamsRebalanceEventsProcessor that handles the rebalance
events sent from the background thread of the async
consumer to the stream thread when an task
assignment changes. It also adds the corresponding rebalance
events.

Additionally, this commit adds StreamsRebalanceData that
maintains the data that is exchanges for the Streams rebalance
protocol.

All of these are used by the Streams heartbeat request manager
and the Streams membership manager that will be added in a future
commit.
@cadonna cadonna force-pushed the streams_rebalance_interface branch from fc91305 to a319b91 Compare January 21, 2025 14:55
@cadonna
Copy link
Member Author

cadonna commented Jan 21, 2025

@lucasbru Thank you for your review!
Frankly, I do not know why I missed out on so many things in the Streams rebalance data. 🤷

Please re-review.

Copy link
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks for the updates!

@cadonna cadonna merged commit 239708f into apache:trunk Jan 22, 2025
9 checks passed
askew pushed a commit to askew/kafka that referenced this pull request Jan 23, 2025
This commit adds a processor named
StreamsRebalanceEventsProcessor that handles the rebalance
events sent from the background thread of the async
consumer to the stream thread when an task
assignment changes. It also adds the corresponding rebalance
events.

Additionally, this commit adds StreamsRebalanceData that
maintains the data that is exchanges for the Streams rebalance
protocol.

All of these are used by the Streams heartbeat request manager
and the Streams membership manager that will be added in a future
commit.

Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants