Description
When the DistributedHerder::stop() method called, it triggers asynchronous shutdown of the background herder thread, and continues with synchronous shutdown of some other resources, including the stopAndStartExecutor.
This executor is responsible for cleanly stopping connectors and tasks, which it the DistributedHerder::halt() method. There is a race condition between the halt() method asynchronously submitting these connector/task stop jobs and the stop() method terminating the executor. If the executor is terminated first, this exception appears:
[2022-10-17 16:29:23,396] ERROR [Worker clientId=connect-2, groupId=connect-integration-test-connect-cluster-1] Uncaught exception in herder work thread, exiting: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:366) java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@62878e25[Not completed, task = org.apache.kafka.connect.runtime.distributed.DistributedHerder$$Lambda$2285/0x00000008015046a8@58deade3] rejected from java.util.concurrent.ThreadPoolExecutor@10351ac3[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1] at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2065) at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833) at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1365) at java.base/java.util.concurrent.AbstractExecutorService.invokeAll(AbstractExecutorService.java:247) at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startAndStop(DistributedHerder.java:1667) at org.apache.kafka.connect.runtime.distributed.DistributedHerder.halt(DistributedHerder.java:765) at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:361) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) at java.base/java.lang.Thread.run(Thread.java:833)
Attachments
Issue Links
- links to