-
Notifications
You must be signed in to change notification settings - Fork 14.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-15555: Ensure wakeups are handled correctly in poll() #14746
Conversation
@@ -335,10 +336,17 @@ public ConsumerRecords<K, V> poll(final Duration timeout) { | |||
throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions"); | |||
} | |||
|
|||
final CompletableFuture<Void> wakeupFuture = setupWakeupTrigger(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I needed to create a dedicated future since fetches are not passed to the background thread as events since fetches happen continuously on the background thread.
I will add tests after somebody confirms that this makes sense. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR, @cadonna!
I believe poll()
should throw a WakeupException
, right @philipnee? If so, the ExecutionException
's should be inspected for being a WakeupException
. If it's not a WakeupException
, I'm not sure what to do. I also don't remember how we handle interruptions, though I believe that's in Javadoc comments somewhere.
Hi @cadonna - Thank you for putting time into this PR. Based on my understanding this PR does 2 things: if wakeup() is invoked before calling poll(), the consumer will return immediately. If wakeup() is invoked during poll(), we should get a wakeupException and return. Overall I think it looks right. *while writing this I think @kirktrue has asked the questions I wanted to ask. |
Another point I want to make here is that the wakeup call also wakes-up the blocking client. I wonder if we also need to do that to the network thread - @kirktrue |
@kirktrue I am not sure, I understand your comment about |
Hi @cadonna - When the consumer is woken up. The WakeupTrigger should complete the future exceptionally with WakeupException. To rethrow that exception during future.get(), you will need to examine the ExecutionException kind of like this:
|
Basically future.get() API only return 3 types of exceptions: ExecutionException, InterruptedException, and Cancellation per documentation. |
@philipnee @kirktrue Thanks for your comments and explanation! I totally missed that if the future is completed exceptionally it throws an
It does not say anything about wrapping. I just saw now that the javadocs of
I misunderstood the javadocs on |
d306623
to
363fd16
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cadonna Thanks for the PR! Generally looks good to me, but I had an idea for a slightly different implementation, and a question about testing.
|
||
private CompletableFuture<Void> setupWakeupTrigger() { | ||
final CompletableFuture<Void> wakeupFuture = new CompletableFuture<>(); | ||
return wakeupTrigger.setActiveTask(wakeupFuture); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had to scratch my head a little bit over this.
Since we already have the Wakeupable
interface, I wonder if it wouldn't be cleaner to have a new static subclass:
/** Placeholder for a task that delays wake-up until it's manually triggered */
DelayedWakeupTask(final boolean wakeUpMarker)
Then replace DelayedWakeupTask(false)
by DelayedWakeupTask(true)
in WakeupTrigger.wakeup
and move maybeTriggerWakeup
to WakeupTrigger
, rename it to maybeTriggerDelayedWakeup
.
Then we'd have fewer code paths to worry about (no non-exceptional completion, no other possible exceptions where we ourselves don't know what to do and just throw and IllegalStateException
).
Its just an idea, could be that I overlooked something.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed it to not use a future, but a little bit different from you proposal. The code became indeed simpler.
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
Show resolved
Hide resolved
@@ -415,6 +439,27 @@ public ConsumerRecords<K, V> poll(final Duration timeout) { | |||
return ConsumerRecords.empty(); | |||
} finally { | |||
kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs()); | |||
wakeupTrigger.clearActiveTask(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, I'm not 100% sure we are providing the guarantee stated in the javadoc of the consumer here.
This would be:
* If no thread is blocking in a method which can throw {@link org.apache.kafka.common.errors.WakeupException}, the next call to such a method will raise it instead.
But if the current fetch returns records, we will never throw a WakeupException
, not from this poll
nor the next. Should we keep the "activeTask" around strictly until we are ready to throw a WakeupException
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you are right @lucasbru - We need to ensure WakeupException will also be thrown when wakeup() is invoked. I wonder if we need to wrap the whole poll() in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed it. We no longer need it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates, that's indeed a lot cleaner without the Future! LGTM
I am still not sure what the conclusion is regarding: #14746 (comment) |
hi @cadonna - I think we probably don't have to worry about it here. I have one comment: Is it possible to test poll would return normally after the consumer is woken up and invoke poll again? |
That is a good idea! |
@philipnee @kirktrue @lucasbru I am concerned that when one calls |
Yes. The application thread remains blocked, so this doesn't really wake the application thread up.
I'd say we signal
There are other ideas, but I think they are more problematic:
|
Thanks for the options, @lucasbru! In the meanwhile, I also had an idea to store the reference to the application thread in a wakeupable task similar to your |
Can we add the |
Hi @cadonna - I was about to reply with the same idea you proposed. I think that would work. I wonder if we could just use a |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Continue on my comment - Do you think it is worth adding a ticket about using the BackgroundEventQueue for the fetches? Let me know if you have a concrete idea about how to implement this.
@cadonna - There are some build failures. Can we rerun the tests? |
As far as I see from the javadocs of a |
Let me think about it. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks a lot for solving this tricky problem. I added two nits, use at your own discretion
@@ -52,6 +53,8 @@ public class FetchBuffer implements AutoCloseable { | |||
private final Condition notEmptyCondition; | |||
private final IdempotentCloser idempotentCloser = new IdempotentCloser(); | |||
|
|||
private final AtomicBoolean wakedUp = new AtomicBoolean(false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: wokenUp?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, boy! How embarrassing!
@@ -185,6 +188,16 @@ void awaitNotEmpty(Timer timer) { | |||
} | |||
} | |||
|
|||
void wakeup() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: wakeup
vs. wakeUp
capitalization
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I used the same format as on the Consumer
interface. Same for my recent change to wokenup
.
@@ -166,7 +169,7 @@ void awaitNotEmpty(Timer timer) { | |||
try { | |||
lock.lock(); | |||
|
|||
while (isEmpty()) { | |||
while (isEmpty() && !wakedUp.compareAndSet(true, false)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice! I was first convinced that we have an interleaving here that misses the wakeup, but wakedUp
takes care.
643662b
to
c5c19d6
Compare
…nsumer.poll() We need to be careful when aborting a long poll with wakeup() since the consumer might never return records if the poll is interrupted after the consumer position has been updated but the records have not been returned to the caller of poll(). This PR avoid wake-ups during this critical period.
c5c19d6
to
6defbb8
Compare
Build failures are unrelated. |
…4746) We need to be careful when aborting a long poll with wakeup() since the consumer might never return records if the poll is interrupted after the consumer position has been updated but the records have not been returned to the caller of poll(). This PR avoid wake-ups during this critical period. Reviewers: Philip Nee <pnee@confluent.io>, Kirk True <ktrue@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
…4746) We need to be careful when aborting a long poll with wakeup() since the consumer might never return records if the poll is interrupted after the consumer position has been updated but the records have not been returned to the caller of poll(). This PR avoid wake-ups during this critical period. Reviewers: Philip Nee <pnee@confluent.io>, Kirk True <ktrue@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
…4746) We need to be careful when aborting a long poll with wakeup() since the consumer might never return records if the poll is interrupted after the consumer position has been updated but the records have not been returned to the caller of poll(). This PR avoid wake-ups during this critical period. Reviewers: Philip Nee <pnee@confluent.io>, Kirk True <ktrue@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
We need to be careful when aborting a long poll with wakeup() since the consumer might never return records if the poll is interrupted after the consumer position has been updated but the records have not been returned to the caller of poll().
This PR avoids wake-ups during this critical period in the PrototypeAsyncConsumer, similarly as in the current consumer.
More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.
Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.
Committer Checklist (excluded from commit message)