Description
Hi, we are using Kafka Streams to get aggregated counts per vendor (the record key) within a specified window.
Here's how we configured the suppress operator to emit one final record per key/window.
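KTable<Windowed<Integer>, Long> windowedCount = groupedStream
    // 1-minute tumbling windows with a 5 ms grace period
    .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofMillis(5L)))
    .count(Materialized.with(Serdes.Integer(), Serdes.Long()))
    // hold updates until the window closes, then emit one final result
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));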
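The behaviour we expect from untilWindowCloses can be illustrated with a small, stdlib-only model. This is a simplified sketch, not the Kafka Streams implementation: counts are buffered per key and window, and a window's count is emitted once, when stream time (the highest timestamp seen so far) reaches window end plus grace. The class SuppressModel and its process method are hypothetical; the window size and grace mirror the configuration above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of "emit final result when the window closes".
public class SuppressModel {
    static final long WINDOW_SIZE_MS = 60_000L; // TimeWindows.of(Duration.ofMinutes(1))
    static final long GRACE_MS = 5L;            // .grace(Duration.ofMillis(5L))

    // Buffered, not-yet-emitted counts: windowStart -> (key -> count).
    private final TreeMap<Long, TreeMap<Integer, Long>> buffer = new TreeMap<>();
    private long streamTime = Long.MIN_VALUE;
    final List<String> emitted = new ArrayList<>();

    void process(int key, long timestamp) {
        streamTime = Math.max(streamTime, timestamp); // stream time only moves forward
        long windowStart = timestamp - Math.floorMod(timestamp, WINDOW_SIZE_MS);
        if (windowStart + WINDOW_SIZE_MS + GRACE_MS > streamTime) {
            buffer.computeIfAbsent(windowStart, w -> new TreeMap<>()).merge(key, 1L, Long::sum);
        } // otherwise the record is late for a closed window and is dropped

        // Emit each window once stream time reaches its end + grace, then forget it.
        Iterator<Map.Entry<Long, TreeMap<Integer, Long>>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, TreeMap<Integer, Long>> window = it.next();
            long windowEnd = window.getKey() + WINDOW_SIZE_MS;
            if (windowEnd + GRACE_MS > streamTime) {
                break; // windows are sorted by start, so all later ones are still open
            }
            for (Map.Entry<Integer, Long> e : window.getValue().entrySet()) {
                emitted.add("[" + e.getKey() + "@" + window.getKey() + "/" + windowEnd + "], " + e.getValue());
            }
            it.remove();
        }
    }

    public static void main(String[] args) {
        SuppressModel m = new SuppressModel();
        long start = 1549067040000L; // window start taken from the reported output
        m.process(131, start + 1_000);
        m.process(131, start + 2_000);
        m.process(154, start + 3_000);
        m.process(131, start + 60_004); // next window; stream time is 1 ms short of end + grace
        System.out.println(m.emitted.isEmpty()); // prints true: the first window is still open
        m.process(9, start + 60_005);   // stream time reaches end + grace, so the first window closes
        m.emitted.forEach(System.out::println);
        // prints [131@1549067040000/1549067100000], 2
        // then   [154@1549067040000/1549067100000], 1
    }
}
```

In this model each key/window produces exactly one record, and a late record for an already-closed window is dropped rather than triggering a second emission.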
But we are getting more than one record for the same key/window as shown below.
[KTABLE-TOSTREAM-0000000010]: [131@1549067040000/1549067100000], 1039
[KTABLE-TOSTREAM-0000000010]: [131@1549067040000/1549067100000], 1162
[KTABLE-TOSTREAM-0000000010]: [9@1549067040000/1549067100000], 6584
[KTABLE-TOSTREAM-0000000010]: [88@1549067040000/1549067100000], 107
[KTABLE-TOSTREAM-0000000010]: [108@1549067040000/1549067100000], 315
[KTABLE-TOSTREAM-0000000010]: [119@1549067040000/1549067100000], 119
[KTABLE-TOSTREAM-0000000010]: [154@1549067040000/1549067100000], 746
[KTABLE-TOSTREAM-0000000010]: [154@1549067040000/1549067100000], 809
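Each record in this output shows its windowed key as [key@windowStart/windowEnd]; every window here spans 1549067100000 - 1549067040000 = 60,000 ms, i.e. the configured one minute. To spot duplicates in a larger capture, a small stdlib-only checker can group records by key/window. This is a sketch: the class DuplicateWindowCheck and its duplicates method are hypothetical, and it assumes integer keys printed in exactly this format.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical checker: finds key/windows that were emitted more than once.
public class DuplicateWindowCheck {
    // Matches "[key@windowStart/windowEnd], count" in the printed output.
    private static final Pattern RECORD = Pattern.compile("\\[(\\d+)@(\\d+)/(\\d+)\\], (\\d+)");

    /** Returns the number of records per key/window, keeping only key/windows seen more than once. */
    static Map<String, Integer> duplicates(String output) {
        Map<String, Integer> seen = new LinkedHashMap<>(); // preserves first-seen order
        Matcher m = RECORD.matcher(output);
        while (m.find()) {
            seen.merge(m.group(1) + "@" + m.group(2) + "/" + m.group(3), 1, Integer::sum);
        }
        seen.values().removeIf(n -> n < 2); // drop key/windows that appeared once
        return seen;
    }

    public static void main(String[] args) {
        String output = String.join("\n",
            "[KTABLE-TOSTREAM-0000000010]: [131@1549067040000/1549067100000], 1039",
            "[KTABLE-TOSTREAM-0000000010]: [131@1549067040000/1549067100000], 1162",
            "[KTABLE-TOSTREAM-0000000010]: [9@1549067040000/1549067100000], 6584",
            "[KTABLE-TOSTREAM-0000000010]: [88@1549067040000/1549067100000], 107",
            "[KTABLE-TOSTREAM-0000000010]: [108@1549067040000/1549067100000], 315",
            "[KTABLE-TOSTREAM-0000000010]: [119@1549067040000/1549067100000], 119",
            "[KTABLE-TOSTREAM-0000000010]: [154@1549067040000/1549067100000], 746",
            "[KTABLE-TOSTREAM-0000000010]: [154@1549067040000/1549067100000], 809");
        System.out.println(duplicates(output));
        // prints {131@1549067040000/1549067100000=2, 154@1549067040000/1549067100000=2}
    }
}
```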
Could you please take a look?
Thanks
Added by John:
Acceptance Criteria:
- add suppress to system tests, such that it's exercised with crash/shutdown recovery, rebalance, etc.
- make sure that there's some system test coverage with caching disabled.
- Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7943
- test with tighter time bounds (windows of, say, 30 seconds) and use system time without adding any extra time for verification
- Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7944
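For the system-test criteria, a verifier could compare the emitted final results against counts computed from the input events. A minimal sketch, assuming hypothetical 30-second tumbling windows aligned to the epoch, no late (dropped) records, and results already collected as {key, windowStart, count} triples; the class FinalResultVerifier and its methods are hypothetical and not part of the Kafka system-test framework (List.of needs Java 9+).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical verifier: exactly one final result per key/window, with the expected count.
public class FinalResultVerifier {
    static final long WINDOW_SIZE_MS = 30_000L; // assumed 30-second windows

    /** Expected final count per "key@windowStart" for events given as {key, timestampMs}. */
    static Map<String, Long> expectedCounts(List<long[]> events) {
        Map<String, Long> expected = new HashMap<>();
        for (long[] e : events) {
            long windowStart = e[1] - Math.floorMod(e[1], WINDOW_SIZE_MS);
            expected.merge(e[0] + "@" + windowStart, 1L, Long::sum);
        }
        return expected;
    }

    /** True if every key/window was emitted exactly once and all counts match. */
    static boolean verify(Map<String, Long> expected, List<long[]> emitted) {
        Map<String, Long> actual = new HashMap<>();
        for (long[] r : emitted) {
            if (actual.put(r[0] + "@" + r[1], r[2]) != null) {
                return false; // more than one record for the same key/window
            }
        }
        return actual.equals(expected);
    }

    public static void main(String[] args) {
        List<long[]> events = List.of(
            new long[] {131, 1_000L}, new long[] {131, 29_999L}, new long[] {131, 30_000L});
        Map<String, Long> expected = expectedCounts(events); // 131@0 -> 2, 131@30000 -> 1

        List<long[]> good = List.of(new long[] {131, 0L, 2L}, new long[] {131, 30_000L, 1L});
        List<long[]> dup = List.of(
            new long[] {131, 0L, 1L}, new long[] {131, 0L, 2L}, new long[] {131, 30_000L, 1L});
        System.out.println(verify(expected, good)); // prints true
        System.out.println(verify(expected, dup));  // prints false
    }
}
```

Checking both uniqueness and the count catches the duplicate emissions reported here as well as final results that were emitted too early with a partial count.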
Issue Links
- is related to
  - KAFKA-8204 Streams may flush state stores in the incorrect order (Resolved)
  - KAFKA-7943 Add Suppress system test with caching disabled (Open)
  - KAFKA-7944 Add more natural Suppress test (Resolved)
- links to