Skip to content

Commit 0109a3f

Browse files
authored
KAFKA-15045: (KIP-924 pt. 17) State store computation fixed (#16194)
Fixed the calculation of the store name list based on the subtopology being accessed. Also added a new test to make sure this new functionality works as intended. Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
1 parent 52514a8 commit 0109a3f

File tree

5 files changed

+67
-7
lines changed

5 files changed

+67
-7
lines changed

checkstyle/suppressions.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@
200200
files="StreamThread.java"/>
201201

202202
<suppress checks="ClassDataAbstractionCoupling"
203-
files="(KafkaStreams|KStreamImpl|KTableImpl).java"/>
203+
files="(InternalTopologyBuilder|KafkaStreams|KStreamImpl|KTableImpl).java"/>
204204

205205
<suppress checks="CyclomaticComplexity"
206206
files="(KafkaStreams|StreamsPartitionAssignor|StreamThread|TaskManager|PartitionGroup|SubscriptionWrapperSerde|AssignorConfiguration).java"/>

streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java

+37-4
Original file line numberDiff line numberDiff line change
@@ -144,8 +144,12 @@ public InternalTopologyBuilder(final TopologyConfig topologyConfigs) {
144144

145145
private String applicationId = null;
146146

147+
// keyed by subtopology id
147148
private Map<Integer, Set<String>> nodeGroups = null;
148149

150+
// keyed by subtopology id
151+
private Map<Integer, Set<String>> subtopologyIdToStateStoreNames = null;
152+
149153
// The name of the topology this builder belongs to, or null if this is not a NamedTopology
150154
private final String topologyName;
151155
// TODO KAFKA-13336: we can remove this reference once we make the Topology/NamedTopology class into an interface and implement it
@@ -937,14 +941,15 @@ private int putNodeGroupName(final String nodeName,
937941
* @return the full topology minus any global state
938942
*/
939943
public synchronized ProcessorTopology buildTopology() {
940-
final Set<String> nodeGroup = new HashSet<>();
944+
final Set<String> allNodes = new HashSet<>();
941945
for (final Set<String> value : nodeGroups().values()) {
942-
nodeGroup.addAll(value);
946+
allNodes.addAll(value);
943947
}
944-
nodeGroup.removeAll(globalNodeGroups());
948+
allNodes.removeAll(globalNodeGroups());
945949

946950
initializeSubscription();
947-
return build(nodeGroup);
951+
initializeSubtopologyIdToStateStoreNamesMap();
952+
return build(allNodes);
948953
}
949954

950955
/**
@@ -1500,6 +1505,34 @@ private boolean isGlobalSource(final String nodeName) {
15001505
return false;
15011506
}
15021507

1508+
public Set<String> stateStoreNamesForSubtopology(final int subtopologyId) {
1509+
return subtopologyIdToStateStoreNames.get(subtopologyId);
1510+
}
1511+
1512+
private void initializeSubtopologyIdToStateStoreNamesMap() {
1513+
final Map<Integer, Set<String>> storeNames = new HashMap<>();
1514+
1515+
for (final Map.Entry<Integer, Set<String>> nodeGroup : makeNodeGroups().entrySet()) {
1516+
final Set<String> subtopologyNodes = nodeGroup.getValue();
1517+
final boolean isNodeGroupOfGlobalStores = nodeGroupContainsGlobalSourceNode(subtopologyNodes);
1518+
1519+
if (!isNodeGroupOfGlobalStores) {
1520+
final int subtopologyId = nodeGroup.getKey();
1521+
final Set<String> subtopologyStoreNames = new HashSet<>();
1522+
1523+
for (final String nodeName : subtopologyNodes) {
1524+
final AbstractNode node = nodeFactories.get(nodeName).describe();
1525+
if (node instanceof Processor) {
1526+
subtopologyStoreNames.addAll(((Processor) node).stores());
1527+
}
1528+
}
1529+
1530+
storeNames.put(subtopologyId, subtopologyStoreNames);
1531+
}
1532+
}
1533+
subtopologyIdToStateStoreNames = storeNames;
1534+
}
1535+
15031536
public TopologyDescription describe() {
15041537
final TopologyDescription description = new TopologyDescription(topologyName);
15051538

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -570,8 +570,7 @@ private ApplicationState buildApplicationState(final TopologyMetadata topologyMe
570570
Function.identity(),
571571
taskId -> {
572572
final Set<String> stateStoreNames = topologyMetadata
573-
.stateStoreNameToSourceTopicsForTopology(taskId.topologyName())
574-
.keySet();
573+
.stateStoreNamesForSubtopology(taskId.topologyName(), taskId.subtopology());
575574
final Set<TaskTopicPartition> topicPartitions = topicPartitionsForTask.get(taskId);
576575
return new DefaultTaskInfo(
577576
taskId,

streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java

+4
Original file line numberDiff line numberDiff line change
@@ -521,6 +521,10 @@ public Map<String, List<String>> stateStoreNameToSourceTopicsForTopology(final S
521521
return lookupBuilderForNamedTopology(topologyName).stateStoreNameToFullSourceTopicNames();
522522
}
523523

524+
public Set<String> stateStoreNamesForSubtopology(final String topologyName, final int subtopologyId) {
525+
return lookupBuilderForNamedTopology(topologyName).stateStoreNamesForSubtopology(subtopologyId);
526+
}
527+
524528
public Map<String, List<String>> stateStoreNameToSourceTopics() {
525529
final Map<String, List<String>> stateStoreNameToSourceTopics = new HashMap<>();
526530
applyToEachBuilder(b -> stateStoreNameToSourceTopics.putAll(b.stateStoreNameToFullSourceTopicNames()));

streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java

+24
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
import static org.hamcrest.CoreMatchers.is;
7272
import static org.hamcrest.MatcherAssert.assertThat;
7373
import static org.hamcrest.Matchers.not;
74+
import static org.hamcrest.Matchers.nullValue;
7475
import static org.hamcrest.core.IsInstanceOf.instanceOf;
7576
import static org.junit.Assert.assertEquals;
7677
import static org.junit.Assert.assertFalse;
@@ -568,6 +569,29 @@ public void testAddStateStore() {
568569
assertEquals(storeBuilder.name(), suppliers.get(0).name());
569570
}
570571

572+
@Test
573+
public void testStateStoreNamesForSubtopology() {
574+
builder.addStateStore(storeBuilder);
575+
builder.setApplicationId("X");
576+
577+
builder.addSource(null, "source-1", null, null, null, "topic-1");
578+
builder.addProcessor("processor-1", new MockApiProcessorSupplier<>(), "source-1");
579+
builder.connectProcessorAndStateStores("processor-1", storeBuilder.name());
580+
581+
builder.addSource(null, "source-2", null, null, null, "topic-2");
582+
builder.addProcessor("processor-2", new MockApiProcessorSupplier<>(), "source-2");
583+
584+
builder.buildTopology();
585+
final Set<String> stateStoreNames = builder.stateStoreNamesForSubtopology(0);
586+
assertThat(stateStoreNames, equalTo(mkSet(storeBuilder.name())));
587+
588+
final Set<String> emptyStoreNames = builder.stateStoreNamesForSubtopology(1);
589+
assertThat(emptyStoreNames, equalTo(mkSet()));
590+
591+
final Set<String> stateStoreNamesUnknownSubtopology = builder.stateStoreNamesForSubtopology(13);
592+
assertThat(stateStoreNamesUnknownSubtopology, nullValue());
593+
}
594+
571595
@Test
572596
public void shouldAllowAddingSameStoreBuilderMultipleTimes() {
573597
builder.setApplicationId("X");

0 commit comments

Comments
 (0)