Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-14499: [7/7] Add integration tests for OffsetCommit API and OffsetFetch API #14353

Merged
merged 10 commits into from
Sep 11, 2023

Conversation

dajac
Copy link
Member

@dajac dajac commented Sep 7, 2023

This patch adds integration tests for the OffsetCommit API and the OffsetFetch API. The tests runs against the old and the new group coordinator and with the new and the old consumer rebalance protocol.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@dajac dajac added the KIP-848 The Next Generation of the Consumer Rebalance Protocol label Sep 7, 2023
);
}

public Builder(String groupId,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wire the member id/epoch in the builder.

@@ -56,7 +56,7 @@
"about": "Each group we would like to fetch offsets for", "fields": [
{ "name": "GroupId", "type": "string", "versions": "8+", "entityType": "groupId",
"about": "The group ID."},
{ "name": "MemberId", "type": "string", "versions": "9+", "nullableVersions": "9+", "default": null, "ignorable": true,
{ "name": "MemberId", "type": "string", "versions": "9+", "nullableVersions": "9+", "default": "null", "ignorable": true,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Found this bug with the tests. Yeah!

Comment on lines +1497 to +1498
.setMemberId(offsetFetchRequest.memberId)
.setMemberEpoch(offsetFetchRequest.memberEpoch)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also found this one. Yeah!

@@ -14,233 +14,532 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.server
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I rewrote all this test suite. The new one covers all the old cases as well, I think.

for (TopicPartition topicPartition : partitions) {
String topicName = topicPartition.topic();
OffsetFetchRequestTopic topic = offsetFetchRequestTopicMap.getOrDefault(
topicName, new OffsetFetchRequestTopic().setName(topicName));
OffsetFetchRequestTopics topic = offsetFetchRequestTopicMap.getOrDefault(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this object represents one topic, does it make sense for the name to be plural?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this does not make sense but this is how the schema is defined. I can try to come up with a better name but I would do it separately.

import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag

class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the goal here to create a new baseRequestTest class? I noticed we didn't extend the existing BaseRequestTest class

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't reuse BaseRequestTest when using @ClusterTest. Here, I created GroupCoordinatorBaseRequestTest to group the common functionalities required to test Group Coordinator Apis.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. The naming reminds me of BaseRequest test and we have the same/similar connectAndReceive method. I think this is fine, but these test files do start to get confusing when we have so many similarly named but very different classes. Not something we need to address in this PR though.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I agree. I was considering using a test context class instead of using a base class. I am not sure if it would really be better though... What do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is ok as it is. I wonder if naming helps? Either way we can do as a followup

@ClusterTest(serverProperties = Array(
new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"),
new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"),
new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the current way we enable setting new consumer group protocol? Setting these two configs?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, at the moment, you have to use unstable.api.versions.enable and group.coordinator.new.enable. This will change when we release it though. It will be based on the metadata version.

Copy link
Member

@jolshan jolshan Sep 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I didn't ask this clearly. For testOffsetCommitWithNewConsumerGroupProtocolAndNewGroupCoordinator and testOffsetCommitWithOldConsumerGroupProtocolAndNewGroupCoordinator the only difference was the timeout configs:

new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000")
new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000")

Is this how we enable the new group protocol? Both methods set unstable.api.versions.enable and group.coordinator.new.enable to be true. Should the second one have the api versions be false?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The two settings that you mentioned are regular configs for the new protocol. They don't enable the new protocol. As I said in my previous comment, the new protocol is automatically enabled when unstable.api.versions.enable and group.coordinator.new.enable are set at the moment. Later, the metadata version will be used.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like maybe I'm missing something obvious but the test is called testOffsetCommitWithOldConsumerGroupProtocolAndNewGroupCoordinator() so it says we use the old protocol.

But

the new protocol is automatically enabled when unstable.api.versions.enable and group.coordinator.new.enable are set at the moment.

So should one of them be false so we have the old consumer group protocol?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  def testOffsetCommitWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = {
    testOffsetCommit(false)
  }

Whether the new or the old protocol is used is defined by the parameter passed to testOffsetCommit. It is not used automatically because it is available. I need to set unstable.api.versions.enable here to test the unreleased versions of the offset commit/fetch apis. I know.. This is confusing...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok -- got it. Thanks for clarifying. That's what I was looking for.

}

// Start from version 1 because version 0 goes to ZK.
for (version <- 1 to ApiKeys.OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should do we want to consider the empty group ID case here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can add it. It is similar for the unknown case.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah sorry -- maybe I was a bit unclear here too. Can we have the "null" group ID in this case? Or do we never have this case?

Copy link
Member Author

@dajac dajac Sep 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. GroupId is not nullable so that would throw a NPE on the client side. This is the reason why I did not test it. This does not bring any value from an integration test point of view.

numPartitions = 3
)

// Create groups and commits offsets.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: 'commit' or 'creates'

)
)

assertEquals(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: comment here this is the unknown group test case.

testMultipleGroupsOffsetFetch(useNewProtocol = false, requireStable = true)
}

private def testSingleGroupOffsetFetch(useNewProtocol: Boolean, requireStable: Boolean): Unit = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is a little large and contains quite a few smaller test cases within. Is there a way we can break it up a bit more?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. I can separate the fetch offsets vs fetch all offsets cases.

groups.foreach { groupId =>
consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
commitOffsets(partitionMap(groupId))
private def testMultipleGroupsOffsetFetch(useNewProtocol: Boolean, requireStable: Boolean): Unit = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another "my understanding" question. 😅 Do only admin clients request multiple group offsets at once? I probably should have realized this earlier, but I see we don't pass member id or epoch in this version of the request.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's correct.

@dajac
Copy link
Member Author

dajac commented Sep 8, 2023

@jolshan @rreddy-22 Thanks for your comments. I have addressed all of them if not mistaken. I have also fixed a few tests and extracted common code into the base class. Please have a second look a it.

@dajac dajac mentioned this pull request Sep 11, 2023
3 tasks
Copy link
Member

@jolshan jolshan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

@dajac dajac merged commit 6123071 into apache:trunk Sep 11, 2023
@dajac dajac deleted the KAFKA-14499-7 branch September 11, 2023 17:48
AnatolyPopov pushed a commit to aiven/kafka that referenced this pull request Feb 16, 2024
…setFetch API (apache#14353)

This patch adds integration tests for the OffsetCommit API and the OffsetFetch API. The tests runs against the old and the new group coordinator and with the new and the old consumer rebalance protocol.

Reviewers: Ritika Reddy <rreddy@confluent.io>, Justine Olshan <jolshan@confluent.io>
Cerchie pushed a commit to Cerchie/kafka that referenced this pull request Feb 22, 2024
…setFetch API (apache#14353)

This patch adds integration tests for the OffsetCommit API and the OffsetFetch API. The tests runs against the old and the new group coordinator and with the new and the old consumer rebalance protocol.

Reviewers: Ritika Reddy <rreddy@confluent.io>, Justine Olshan <jolshan@confluent.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
KIP-848 The Next Generation of the Consumer Rebalance Protocol
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants