-
Notifications
You must be signed in to change notification settings - Fork 14.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-14499: [7/7] Add integration tests for OffsetCommit API and OffsetFetch API #14353
Conversation
); | ||
} | ||
|
||
public Builder(String groupId, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wire the member id/epoch in the builder.
@@ -56,7 +56,7 @@ | |||
"about": "Each group we would like to fetch offsets for", "fields": [ | |||
{ "name": "GroupId", "type": "string", "versions": "8+", "entityType": "groupId", | |||
"about": "The group ID."}, | |||
{ "name": "MemberId", "type": "string", "versions": "9+", "nullableVersions": "9+", "default": null, "ignorable": true, | |||
{ "name": "MemberId", "type": "string", "versions": "9+", "nullableVersions": "9+", "default": "null", "ignorable": true, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Found this bug with the tests. Yeah!
.setMemberId(offsetFetchRequest.memberId) | ||
.setMemberEpoch(offsetFetchRequest.memberEpoch) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also found this one. Yeah!
@@ -14,233 +14,532 @@ | |||
* See the License for the specific language governing permissions and | |||
* limitations under the License. | |||
*/ | |||
|
|||
package kafka.server |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I rewrote all this test suite. The new one covers all the old cases as well, I think.
for (TopicPartition topicPartition : partitions) { | ||
String topicName = topicPartition.topic(); | ||
OffsetFetchRequestTopic topic = offsetFetchRequestTopicMap.getOrDefault( | ||
topicName, new OffsetFetchRequestTopic().setName(topicName)); | ||
OffsetFetchRequestTopics topic = offsetFetchRequestTopicMap.getOrDefault( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this object represents one topic, does it make sense for the name to be plural?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, this does not make sense but this is how the schema is defined. I can try to come up with a better name but I would do it separately.
import scala.jdk.CollectionConverters._ | ||
import scala.reflect.ClassTag | ||
|
||
class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the goal here to create a new baseRequestTest class? I noticed we didn't extend the existing BaseRequestTest class
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can't reuse BaseRequestTest
when using @ClusterTest
. Here, I created GroupCoordinatorBaseRequestTest
to group the common functionalities required to test Group Coordinator Apis.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. The naming reminds me of BaseRequest test and we have the same/similar connectAndReceive
method. I think this is fine, but these test files do start to get confusing when we have so many similarly named but very different classes. Not something we need to address in this PR though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I agree. I was considering using a test context class instead of using a base class. I am not sure if it would really be better though... What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is ok as it is. I wonder if naming helps? Either way we can do as a followup
@ClusterTest(serverProperties = Array( | ||
new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), | ||
new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), | ||
new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this the current way we enable setting new consumer group protocol? Setting these two configs?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, at the moment, you have to use unstable.api.versions.enable
and group.coordinator.new.enable
. This will change when we release it though. It will be based on the metadata version.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry I didn't ask this clearly. For testOffsetCommitWithNewConsumerGroupProtocolAndNewGroupCoordinator
and testOffsetCommitWithOldConsumerGroupProtocolAndNewGroupCoordinator
the only difference was the timeout configs:
new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000")
new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000")
Is this how we enable the new group protocol? Both methods set unstable.api.versions.enable
and group.coordinator.new.enable
to be true. Should the second one have the api versions be false?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The two settings that you mentioned are regular configs for the new protocol. They don't enable the new protocol. As I said in my previous comment, the new protocol is automatically enabled when unstable.api.versions.enable
and group.coordinator.new.enable
are set at the moment. Later, the metadata version will be used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel like maybe I'm missing something obvious but the test is called testOffsetCommitWithOldConsumerGroupProtocolAndNewGroupCoordinator()
so it says we use the old protocol.
But
the new protocol is automatically enabled when unstable.api.versions.enable and group.coordinator.new.enable are set at the moment.
So should one of them be false so we have the old consumer group protocol?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def testOffsetCommitWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = {
testOffsetCommit(false)
}
Whether the new or the old protocol is used is defined by the parameter passed to testOffsetCommit
. It is not used automatically because it is available. I need to set unstable.api.versions.enable
here to test the unreleased versions of the offset commit/fetch apis. I know.. This is confusing...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok -- got it. Thanks for clarifying. That's what I was looking for.
} | ||
|
||
// Start from version 1 because version 0 goes to ZK. | ||
for (version <- 1 to ApiKeys.OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should do we want to consider the empty group ID case here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can add it. It is similar for the unknown
case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah sorry -- maybe I was a bit unclear here too. Can we have the "null" group ID in this case? Or do we never have this case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it. GroupId
is not nullable so that would throw a NPE on the client side. This is the reason why I did not test it. This does not bring any value from an integration test point of view.
numPartitions = 3 | ||
) | ||
|
||
// Create groups and commits offsets. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: 'commit' or 'creates'
) | ||
) | ||
|
||
assertEquals( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: comment here this is the unknown group test case.
testMultipleGroupsOffsetFetch(useNewProtocol = false, requireStable = true) | ||
} | ||
|
||
private def testSingleGroupOffsetFetch(useNewProtocol: Boolean, requireStable: Boolean): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is a little large and contains quite a few smaller test cases within. Is there a way we can break it up a bit more?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. I can separate the fetch offsets vs fetch all offsets cases.
groups.foreach { groupId => | ||
consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId) | ||
commitOffsets(partitionMap(groupId)) | ||
private def testMultipleGroupsOffsetFetch(useNewProtocol: Boolean, requireStable: Boolean): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another "my understanding" question. 😅 Do only admin clients request multiple group offsets at once? I probably should have realized this earlier, but I see we don't pass member id or epoch in this version of the request.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, that's correct.
@jolshan @rreddy-22 Thanks for your comments. I have addressed all of them if not mistaken. I have also fixed a few tests and extracted common code into the base class. Please have a second look a it. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
…setFetch API (apache#14353) This patch adds integration tests for the OffsetCommit API and the OffsetFetch API. The tests runs against the old and the new group coordinator and with the new and the old consumer rebalance protocol. Reviewers: Ritika Reddy <rreddy@confluent.io>, Justine Olshan <jolshan@confluent.io>
…setFetch API (apache#14353) This patch adds integration tests for the OffsetCommit API and the OffsetFetch API. The tests runs against the old and the new group coordinator and with the new and the old consumer rebalance protocol. Reviewers: Ritika Reddy <rreddy@confluent.io>, Justine Olshan <jolshan@confluent.io>
This patch adds integration tests for the OffsetCommit API and the OffsetFetch API. The tests runs against the old and the new group coordinator and with the new and the old consumer rebalance protocol.
Committer Checklist (excluded from commit message)