-
Notifications
You must be signed in to change notification settings - Fork 14.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-14581: Moving GetOffsetShell to tools #13562
Conversation
Do you forgot to remove
If you are interested, you can change GetOffsetShellTest to use |
Yes, since it's WIP and I have questions, I left original files. |
@dengziming, could you help me in this situation? |
@ruslankrivoshein Are you encountering the issue in kraft or zk mode, you can push you code firstly, then we can take a look. � |
try(Admin admin = Admin.create(cluster.config().adminClientProperties())) { | ||
List<NewTopic> topics = new ArrayList<>(); | ||
|
||
IntStream.range(0, topicCount + 1).forEach(i -> topics.add(new NewTopic(getTopicName(i), i, (short)1))); | ||
|
||
admin.createTopics(topics); | ||
} | ||
|
||
Properties props = new Properties(); | ||
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.config().producerProperties().get("bootstrap.servers")); | ||
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); | ||
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); | ||
|
||
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) { | ||
IntStream.range(0, topicCount + 1) | ||
.forEach(i -> IntStream.range(0, i * i) | ||
.forEach(msgCount -> producer.send( | ||
new ProducerRecord<>(getTopicName(i), msgCount % i, null, "val" + msgCount))) | ||
); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I decided to fit the tests for that type of test extensions, but now I struggle with environment preparation. I don't know how to get access to cluser
in @BeforeEach
. @ClusterTest
annotation doesn't help, so I'd like to know, how can I create topics before tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are allowed to access cluster
in setUp method, for example, in ApiVersionsRequestTest
we use cluster.isKraft
in setUp(), I will take more time to inspect this PR later.
0827a35
to
ae41e8f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dengziming, please, take a look. I've finished and I'm ready to make it better.
|
||
@ClusterTest | ||
public void testInternalExcluded() { | ||
setUp(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This thing confuses me, but I couldn't find any workaround
public void testGetLatestOffsets() { | ||
setUp(); | ||
|
||
for (String time : new String[] {"-1", "latest"}) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also didn't find a way how to use ParametrizedTest
and ValueSource
in the same place with ClusterTest
. It throws error about ParameterResolver
, so I did this approach.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @ruslankrivoshein, thanks for working on this. The code looks almost good, but there are some issues to fix and some little improvements we can do.
First of all, after importing commits from #13158, I had a couple of checkstyle errors, which I fixed by removing import org.apache.kafka.server.util.TopicFilter.IncludeList
and using TopicFilter.IncludeList
in TopicPartitionFilter
and TopicFilterTest
classes. I also removed <allow class="org.apache.kafka.server.util.TopicFilter.IncludeList" />
from import-control-server-common.xml
. You may have to rebase in order to see this.
You still have to update bin/kafka-get-offsets.sh
and bin/windows/kafka-get-offsets.bat
. They are now referencing a class that do no exists.
As a little improvement, it would be good to raise a warning when a deprecated option is used (e.g. --broker-list). We can add a note in KAFKA-14705 as a reminder to remove deprecated options in the next major release.
When an error occurs, I get a stack trace, in addition to the error message, but that was not the case in the previous implementation. You can use TerseException
. I'm adding some examples here.
$ bin/kafka-get-offsets.sh --topic my-topic --time -3
joptsimple.MissingRequiredOptionsException: Missing required option(s) [bootstrap-server]
at joptsimple.OptionParser.ensureRequiredOptions(OptionParser.java:426)
at joptsimple.OptionParser.parse(OptionParser.java:400)
at org.apache.kafka.tools.GetOffsetShell.parseArgs(GetOffsetShell.java:143)
at org.apache.kafka.tools.GetOffsetShell.execute(GetOffsetShell.java:82)
at org.apache.kafka.tools.GetOffsetShell.mainNoExit(GetOffsetShell.java:70)
at org.apache.kafka.tools.GetOffsetShell.main(GetOffsetShell.java:65)
$ bin/kafka-get-offsets.sh --broker-list :9092 --topic __consumer_offsets --time -1 --exclude-internal-topics
Could not match any topic-partitions with the specified filters
java.lang.IllegalArgumentException: Could not match any topic-partitions with the specified filters
at org.apache.kafka.tools.GetOffsetShell.fetchOffsets(GetOffsetShell.java:195)
at org.apache.kafka.tools.GetOffsetShell.execute(GetOffsetShell.java:84)
at org.apache.kafka.tools.GetOffsetShell.mainNoExit(GetOffsetShell.java:70)
at org.apache.kafka.tools.GetOffsetShell.main(GetOffsetShell.java:65)
$ bin/kafka-get-offsets.sh --broker-list :9092 --topic my-topic --time -1 --partitions 1
Could not match any topic-partitions with the specified filters
java.lang.IllegalArgumentException: Could not match any topic-partitions with the specified filters
at org.apache.kafka.tools.GetOffsetShell.fetchOffsets(GetOffsetShell.java:195)
at org.apache.kafka.tools.GetOffsetShell.execute(GetOffsetShell.java:84)
at org.apache.kafka.tools.GetOffsetShell.mainNoExit(GetOffsetShell.java:70)
at org.apache.kafka.tools.GetOffsetShell.main(GetOffsetShell.java:65
The GetOffsetShellTest
system test is failing because you are still referencing the old class. Please, fix that and run all 10 tests included in that module.
import java.util.stream.Collectors; | ||
|
||
public class GetOffsetShell { | ||
Pattern topicPartitionPattern = Pattern.compile("([^:,]*)(?::(?:([0-9]*)|(?:([0-9]*)-([0-9]*))))?"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would make this private as in the original code.
|
||
String brokerList = options.valueOf(effectiveBrokerListOpt); | ||
|
||
ToolsUtils.validatePortOrDie(parser, brokerList); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In order to be consistent with other tools and utility clasees, I would call it validatePortOrExit
.
} | ||
|
||
private OptionSet options; | ||
private OptionSpec<String> topicPartitionsOpt; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not embed all the options in a private static class that can also hosts various utility methods on them? You can look at the new JmxTool
for an example.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, that's not exactly a duplicate, as mine also creates the JmxTool runnable for convenience. For the last part, I guess I didn't want to break ToolsTestUtils visibility.
@fvaleri, I worked a bit after your comments and still have a few questions. |
From Kafka project home dir, build with
For this tool, search for deprecated in the class and you'll find |
b780b01
to
b3c96e0
Compare
@fvaleri, will be this way good?
Also I've finished with exceptions and stack trace |
Yes, but just use |
Well, that's it:
|
Hi @ruslankrivoshein, thanks for adding the warning message. Not sure if this is ready for another review, but I still see some issues. The checkstyle phase is still failing, I suggested a way to fix it in one of my previous comments. I don't see the stacktrace anymore in case of wrong options, but the error messages are not exactly the same as before. In general, we should match the old behavior when reporting errors. # before
$ bin/kafka-get-offsets.sh --topic my-topic --time -3
Error occurred: Missing required option(s) [bootstrap-server]
# now
$ bin/kafka-get-offsets.sh --topic my-topic --time -3
Missing required option(s) [bootstrap-server]
Option Description
------ -----------
--bootstrap-server <String: HOST1: REQUIRED. The server(s) to connect to
PORT1,...,HOST3:PORT3> in the form HOST1:PORT1,HOST2:PORT2.
--broker-list <String: HOST1:PORT1,..., DEPRECATED, use --bootstrap-server
HOST3:PORT3> instead; ignored if --bootstrap-
server is specified. The server(s)
to connect to in the form HOST1:
PORT1,HOST2:PORT2.
...
# before
$ bin/kafka-get-offsets.sh --broker-list :9092 --topic __consumer_offsets --time -1 --exclude-internal-topics
Error occurred: Could not match any topic-partitions with the specified filters
# now
$ bin/kafka-get-offsets.sh --broker-list :9092 --topic __consumer_offsets --time -1 --exclude-internal-topics
Could not match any topic-partitions with the specified filters Finally, the ./gradlew clean systemTestLibs
TC_PATHS="tests/kafkatest/tests/core/get_offset_shell_test.py" bash tests/docker/run_tests.sh Hope it helps. |
This is to let you know that TopicFilter and related classes have been merged, so you can rebase this PR from trunk. |
Thanks @fvaleri and sorry for the really long delay from my side on the other PR |
87cda86
to
29a245d
Compare
Hey @ruslankrivoshein, about the deprecation warning message, it looks like it is considered a public interface change, and a committer may ask to remove it or create a KIP, so please remove it for now. Sorry, my fault. |
I hope it's done. Then I restored error messages and removed usage print. Actually, here I get also messages from SLF4J. Did you see anything like this before? Could you recommend any way to fix it locally?
@fvaleri, please, take a look. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good. Just a couple of comments.
The system test runs fine with the new implementation. One thing we could improve is to avoid using the fully qualified class name (FQCN) and instead use the wrapper script name. This is something I'm also doing in another tool migration:
Actually, here I get also messages from SLF4J. Did you see anything like this before?
It happens only when running scripts from the source (see KAFKA-2875). Do not worry about it.
import java.util.Arrays; | ||
|
||
public class ToolsUtils { | ||
public static void validatePortOrExit(OptionParser parser, String hostPort) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know you just translated this to Java, but I think we can add null/empty checks for the input parameters.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And what should it do when null?
Could you also tell me more about improvement? I can't find FQCN in my implementation. And how it must be corrected?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And what should it do when null?
The parser can be null right? We don't want to throw a NPE to the user. You can print an error in that case and exit.
The hostPort can be null or empty. Same story.
Could you also tell me more about improvement? I can't find FQCN in my implementation. And how it must be corrected?
It's in tests/kafkatest/services/kafka/kafka.py, where you already changed the package name. My suggestion is to use the wrapper script, instead of the FQCN. That way, if the package name needs to be changed again in the future, we won't need to touch the system test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you have ideas about how it should look like? I see there 3 FQCN. What's the way to put this names in that script?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are only a couple of FQCN used in tests/kafkatest/services/kafka/kafka.py, but we are only concerned about kafka-get-offsets, the other will be addressed in its own PR. The change is very simple in the end:
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index f116f92c3b..ab48d1091d 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -1777,8 +1777,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
node = self.nodes[0]
cmd = fix_opts_for_new_jvm(node)
- cmd += self.path.script("kafka-run-class.sh", node)
- cmd += " org.apache.kafka.tools.GetOffsetShell"
+ cmd += self.path.script("kafka-get-offsets.sh", node)
cmd += " --bootstrap-server %s" % self.bootstrap_servers(self.security_protocol)
if time:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see the code, so I can't comment further, but this reveals a programming error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@fvaleri ah, I'm sorry, I investigated it finally and worked around.
What do you think about this approach?
public static void validatePortOrExit(OptionParser parser, String hostPort) {
if (parser == null || hostPort == null || hostPort.isEmpty()) {
CommandLineUtils.printVersionAndExit();
return;
}
}
Should I use printUsageAndExit if parser will be null or what will be better to indicate an error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe validateBootstrapServerOrExit would be a better method name. If you print the version, the user will have no clue about what went wrong. I would rather print an error message that makes sense, something like "Error while validating the bootstrap address".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@fvaleri what do you think, do we really need here "parser"? What about removing usage of CommandLineUtils and just to print message into stderr? Because it also looks weird that it validates a param, but requires some parser.
I'd suggest to throw there an exception and to call "printUsageAndExit" from GetOffsetShell directly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I think that's a reasonable approach.
@ruslankrivoshein There are still conflicts in this PR, can you rebase it again and update the description? |
c574fe7
to
b22ec40
Compare
@fvaleri I summon thee. Please, take one more look |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @ruslankrivoshein, just a minor change, then lgtm.
System tests are ok.
SESSION REPORT (ALL TESTS)
ducktape version: 0.11.3
session_id: 2023-08-05--001
run time: 1 minute 42.487 seconds
tests run: 10
passed: 10
flaky: 0
failed: 0
ignored: 0
@@ -99,4 +101,26 @@ public static void prettyPrintTable( | |||
printRow(columnLengths, headers, out); | |||
rows.forEach(row -> printRow(columnLengths, row, out)); | |||
} | |||
|
|||
public static void validateBootstrapServer(String hostPort) throws IllegalArgumentException { | |||
if (hostPort == null || hostPort.isEmpty()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since Java 11 we can use isBlank()
that also ignores white spaces. Unfortunately, we still need to provide support for Java 8, where we can achieve the same by doing hostPort.trim().isEmpty()
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should still keep this file can forward all args to new GetOffsetShell, see FeatureCommand.scala
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @dengziming, this is tool was reported in the "missing wrapper script" category in KIP-906, but that was only because some system tests are depending on the FQCN. In this PR we are also changing that, and they are now using the wrapper script instead, so I think there is no need for the redirection.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dengziming are you good with this?
@mimaison please, take a look |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks for this pr, please update the description to list your changes, and rebase the trunk to trigger the CI.
24addc9
to
5b84cc3
Compare
@dengziming or @fvaleri, could you clarify it to me, please? |
@ruslankrivoshein I mean that you should update the description of this PR, and clarify what you have changed in this PR, then we can merge this if CI pass. |
@dengziming is there any template for this or an example? |
@ruslankrivoshein Maybe you didn't get my idea, I mean the do you get it? 👀 |
@dengziming, done. |
Test failures are unrelated
|
@fvaleri I think it's time to take a note about this changes in KAFKA-14705, isn't it? |
This should fall under the phrase: "4. We should also get rid of many deprecated options across all tools, including not migrated tools." If you want to provide a list of all deprecated options across all tools, that would be great, and I will be happy to review. |
This PR moves GetOffsetShell from core module to tools module with rewriting from Scala to Java. Reviewers: Federico Valeri fedevaleri@gmail.com, Ziming Deng dengziming1993@gmail.com, Mickael Maison mimaison@apache.org.
This PR moves GetOffsetShell from core module to tools module with rewriting from Scala to Java. Reviewers: Federico Valeri fedevaleri@gmail.com, Ziming Deng dengziming1993@gmail.com, Mickael Maison mimaison@apache.org.
This PR moves GetOffsetShell from core module to tools module with rewriting from Scala to Java.