-
Notifications
You must be signed in to change notification settings - Fork 14.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-18518: Add processor to handle rebalance events #18527
Conversation
Call for review: @aliehsaeedii |
d34ad1a
to
fc91305
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR @cadonna ! I think StreamsRebalanceData
need some more work, rest is looking fine
|
||
public static final Assignment EMPTY = new Assignment(); | ||
|
||
public final Set<TaskId> activeTasks = new HashSet<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The collections should be unmodifiable, and probably private with getters?
Right now, I can do Assignment.EMPTY.activeTasks.add(new TaskID("a",1))
and wreak havoc.
If you want to make the class mutable, you cannot define EMPTY
as a static variable.
|
||
public final Set<TaskId> warmupTasks = new HashSet<>(); | ||
|
||
public Assignment() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you are calling this constructor, you should just be using EMPTY
. It does not make sense to have both.
return subtopologyId; | ||
} | ||
|
||
public TaskId(final String subtopologyId, final int partitionId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: first constructor, then getters
|
||
@Override | ||
public int compareTo(TaskId taskId) { | ||
if (subtopologyId.equals(taskId.subtopologyId)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In Java 8, it woul be more idiomatic to use soemthing like:
Comparator.comparing(TaskId::subtopologyId)
.thenComparingInt(TaskId::partitionId);
|
||
public static class Subtopology { | ||
|
||
public final Set<String> sourceTopics; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We may want to make these collections unmodifiable, non-null, private and have getters.
private final AtomicReference<Assignment> reconciledAssignment = new AtomicReference<>(Assignment.EMPTY); | ||
|
||
public StreamsRebalanceData(Map<String, Subtopology> subtopologies) { | ||
this.subtopologies = subtopologies; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
non-null, unmodifiable?
*/ | ||
public StreamsRebalanceEventsProcessor(StreamsRebalanceData streamsRebalanceData, | ||
StreamsGroupRebalanceCallbacks rebalanceCallbacks) { | ||
this.streamsRebalanceData = streamsRebalanceData; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
non-null?
|
||
private StreamsOnTasksRevokedCallbackCompletedEvent invokeOnTasksRevokedCallback(final Set<StreamsRebalanceData.TaskId> activeTasksToRevoke, | ||
final CompletableFuture<Void> future) { | ||
final Optional<Exception> exceptionFromCallback = rebalanceCallbacks.onTasksRevoked(activeTasksToRevoke); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
invokeOnTasksRevokedCallback
has a different implementation than invokeOnTasksAssignedCallback
and invokeOnAllTasksLostCallback
. It's probably easier to read if all three use either the .map
stile or the isPresent
style. I don't mind which one, isPresent
is probably a little easier to follow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this case I just applied IDEA's recommendation to transform it to functional style. However, I agree with you regarding readability.
@@ -27,7 +27,9 @@ | |||
public abstract class BackgroundEvent { | |||
|
|||
public enum Type { | |||
ERROR, CONSUMER_REBALANCE_LISTENER_CALLBACK_NEEDED, SHARE_ACKNOWLEDGEMENT_COMMIT_CALLBACK | |||
ERROR, CONSUMER_REBALANCE_LISTENER_CALLBACK_NEEDED, SHARE_ACKNOWLEDGEMENT_COMMIT_CALLBACK, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: one per line will simplify conflict resolution
/** | ||
* This class holds the data that is needed to participate in the Streams rebalance protocol. | ||
*/ | ||
public class StreamsRebalanceData { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This class and its inner classes need unit tests.
This commit adds a processor named StreamsRebalanceEventsProcessor that handles the rebalance events sent from the background thread of the async consumer to the stream thread when an task assignment changes. It also adds the corresponding rebalance events. Additionally, this commit adds StreamsRebalanceData that maintains the data that is exchanges for the Streams rebalance protocol. All of these are used by the Streams heartbeat request manager and the Streams membership manager that will be added in a future commit.
fc91305
to
a319b91
Compare
@lucasbru Thank you for your review! Please re-review. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks for the updates!
This commit adds a processor named StreamsRebalanceEventsProcessor that handles the rebalance events sent from the background thread of the async consumer to the stream thread when an task assignment changes. It also adds the corresponding rebalance events. Additionally, this commit adds StreamsRebalanceData that maintains the data that is exchanges for the Streams rebalance protocol. All of these are used by the Streams heartbeat request manager and the Streams membership manager that will be added in a future commit. Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
This commit adds a processor named
StreamsRebalanceEventsProcessor that handles the rebalance events sent from the background thread of the async consumer to the stream thread when an task
assignment changes. It also adds the corresponding rebalance events.
Additionally, this commit adds StreamsRebalanceData that maintains the data that is exchanges for the Streams rebalance protocol.
All of these are used by the Streams heartbeat request manager and the Streams membership manager that will be added in a future commit.
More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.
Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.
Committer Checklist (excluded from commit message)