Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-15834

Subscribing to non-existent topic blocks StreamThread from stopping

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 3.6.0
    • 3.8.0
    • streams
    • None

    Description

      In NamedTopologyIntegrationTest#shouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics a topology is created which references an input topic which does not exist. The test as-written passes, but the KafkaStreams#close(Duration) at the end times out, and leaves StreamsThreads running.

      From some cursory investigation it appears that this is happening:
      1. The consumer calls the StreamsPartitionAssignor, which calls TaskManager#handleRebalanceStart as a side-effect
      2. handleRebalanceStart sets the rebalanceInProgress flag
      3. This flag is checked by StreamThread.runLoop, and causes the loop to remain running.
      4. The consumer never calls StreamsRebalanceListener#onPartitionsAssigned, because the topic does not exist
      5. Because no partitions are ever assigned, the TaskManager#handleRebalanceComplete never clears the rebalanceInProgress flag
       
      This log message is printed in a tight loop while the close is ongoing and the consumer is being polled with zero duration:

      [2023-11-15 11:42:43,661] WARN [Consumer clientId=NamedTopologyIntegrationTestshouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics-942756f8-5213-4c44-bb6b-5f805884e026-StreamThread-1-consumer, groupId=NamedTopologyIntegrationTestshouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics] Received unknown topic or partition error in fetch for partition unique_topic_prefix-topology-1-store-repartition-0 (org.apache.kafka.clients.consumer.internals.FetchCollector:321)
      

      Practically, this means that this test leaks two StreamsThreads and the associated clients and sockets, and delays the completion of the test until the KafkaStreams#close(Duration) call times out.

      Either we should change the rebalanceInProgress flag to avoid getting stuck in this rebalance state, or figure out a way to shut down a StreamsThread that is in an extended rebalance state during shutdown.

      Attachments

        Issue Links

          Activity

            People

              Unassigned Unassigned
              gharris1727 Greg Harris
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: