Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-15555: Ensure wakeups are handled correctly in poll() #14746

Merged
merged 12 commits into from
Nov 23, 2023
Prev Previous commit
Next Next commit
Enable FetchBufferTest to investigate OOM on Jenkins
  • Loading branch information
cadonna committed Nov 22, 2023
commit a51a794f659bb8fe937b0cd2286bd69843750530
2 changes: 1 addition & 1 deletion Jenkinsfile
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def isChangeRequest(env) {
env.CHANGE_ID != null && !env.CHANGE_ID.isEmpty()
}

def doTest(env, target = "clients:test --tests org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumerTest") {
def doTest(env, target = "clients:test --tests org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumerTest --tests org.apache.kafka.clients.consumer.internals.FetchBufferTest") {
sh """./gradlew -PscalaVersion=$SCALA_VERSION ${target} \
--profile --continue -PkeepAliveMode="session" -PtestLoggingEvents=started,passed,skipped,failed \
-PignoreFailures=true -PmaxParallelForks=2 -PmaxTestRetries=1 -PmaxTestRetryFailures=10"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;
Expand Down Expand Up @@ -171,19 +173,19 @@ public void testAddAllAndRetainAll() {
}
}

// @Test
// public void testWakeup() throws Exception {
// try (FetchBuffer fetchBuffer = new FetchBuffer(logContext)) {
// final Thread waitingThread = new Thread(() -> {
// final Timer timer = time.timer(Duration.ofMinutes(1));
// fetchBuffer.awaitNotEmpty(timer);
// });
// waitingThread.start();
// fetchBuffer.wakeup();
// waitingThread.join(Duration.ofSeconds(30).toMillis());
// assertFalse(waitingThread.isAlive());
// }
// }
@Test
public void testWakeup() throws Exception {
try (FetchBuffer fetchBuffer = new FetchBuffer(logContext)) {
final Thread waitingThread = new Thread(() -> {
final Timer timer = time.timer(Duration.ofMinutes(1));
fetchBuffer.awaitNotEmpty(timer);
});
waitingThread.start();
fetchBuffer.wakeup();
waitingThread.join(Duration.ofSeconds(30).toMillis());
assertFalse(waitingThread.isAlive());
}
}

private CompletedFetch completedFetch(TopicPartition tp) {
FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData();
Expand Down