Skip to content

Commit ef2c5e4

Browse files
authored
KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState (#15972)
This rack information is required to compute rack-aware assignments, which many of the current assigners do. The internal ClientMetadata class was also edited to pass around this rack information. Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
1 parent a753172 commit ef2c5e4

File tree

12 files changed

+489
-76
lines changed

12 files changed

+489
-76
lines changed

streams/src/main/java/org/apache/kafka/streams/processor/assignment/ApplicationState.java

+1-14
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import java.util.Map;
2020
import java.util.Set;
21-
import org.apache.kafka.streams.processor.TaskId;
2221
import org.apache.kafka.streams.errors.TaskAssignmentException;
2322

2423
/**
@@ -49,17 +48,5 @@ public interface ApplicationState {
4948
/**
5049
* @return the set of all tasks in this topology which must be assigned
5150
*/
52-
Set<TaskId> allTasks();
53-
54-
/**
55-
*
56-
* @return the set of stateful and changelogged tasks in this topology
57-
*/
58-
Set<TaskId> statefulTasks();
59-
60-
/**
61-
*
62-
* @return the set of stateless or changelog-less tasks in this topology
63-
*/
64-
Set<TaskId> statelessTasks();
51+
Set<TaskInfo> allTasks();
6552
}

streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsAssignment.java

+31-8
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,37 @@ public class KafkaStreamsAssignment {
3333
private final Set<AssignedTask> assignment;
3434
private final Optional<Instant> followupRebalanceDeadline;
3535

36+
/**
37+
* Construct an instance of KafkaStreamsAssignment with this processId and the given set of
38+
* assigned tasks. If you want this KafkaStreams client to request a followup rebalance, you
39+
* can set the followupRebalanceDeadline via the {@link #withFollowupRebalance(Instant)} API.
40+
*
41+
* @param processId the processId for the KafkaStreams client that should receive this assignment
42+
* @param assignment the set of tasks to be assigned to this KafkaStreams client
43+
*
44+
* @return a new KafkaStreamsAssignment object with the given processId and assignment
45+
*/
46+
public static KafkaStreamsAssignment of(final ProcessId processId, final Set<AssignedTask> assignment) {
47+
return new KafkaStreamsAssignment(processId, assignment, Optional.empty());
48+
}
49+
50+
/**
51+
* This API can be used to request that a followup rebalance be triggered by the KafkaStreams client
52+
* receiving this assignment. The followup rebalance will be initiated after the provided deadline
53+
* has passed, although it will always wait until it has finished the current rebalance before
54+
* triggering a new one. This request will last until the new rebalance, and will be erased if a
55+
* new rebalance begins before the scheduled followup rebalance deadline has elapsed. The next
56+
* assignment must request the followup rebalance again if it still wants to schedule one for
57+
* the given instant, otherwise no additional rebalance will be triggered after that.
58+
*
59+
* @param rebalanceDeadline the instant after which this KafkaStreams client will trigger a followup rebalance
60+
*
61+
* @return a new KafkaStreamsAssignment object with the same processId and assignment but with the given rebalanceDeadline
62+
*/
63+
public KafkaStreamsAssignment withFollowupRebalance(final Instant rebalanceDeadline) {
64+
return new KafkaStreamsAssignment(this.processId(), this.assignment(), Optional.of(rebalanceDeadline));
65+
}
66+
3667
private KafkaStreamsAssignment(final ProcessId processId,
3768
final Set<AssignedTask> assignment,
3869
final Optional<Instant> followupRebalanceDeadline) {
@@ -65,14 +96,6 @@ public Optional<Instant> followupRebalanceDeadline() {
6596
return followupRebalanceDeadline;
6697
}
6798

68-
public static KafkaStreamsAssignment of(final ProcessId processId, final Set<AssignedTask> assignment) {
69-
return new KafkaStreamsAssignment(processId, assignment, Optional.empty());
70-
}
71-
72-
public KafkaStreamsAssignment withFollowupRebalance(final Instant rebalanceDeadline) {
73-
return new KafkaStreamsAssignment(this.processId(), this.assignment(), Optional.of(rebalanceDeadline));
74-
}
75-
7699
public static class AssignedTask {
77100
private final TaskId id;
78101
private final Type taskType;

streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsState.java

+5
Original file line numberDiff line numberDiff line change
@@ -100,4 +100,9 @@ public interface KafkaStreamsState {
100100
* @return all the client tags found in this KafkaStreams client's {@link org.apache.kafka.streams.StreamsConfig}
101101
*/
102102
Map<String, String> clientTags();
103+
104+
/**
105+
* @return the rackId for this KafkaStreams client, or {@link Optional#empty()} if none was configured
106+
*/
107+
Optional<String> rackId();
103108
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.streams.processor.assignment;
18+
19+
20+
import java.util.Map;
21+
import java.util.Set;
22+
import org.apache.kafka.common.TopicPartition;
23+
import org.apache.kafka.streams.processor.TaskId;
24+
25+
/**
26+
* A simple container class corresponding to a given {@link TaskId}.
27+
* Includes metadata such as whether it's stateful and the names of all state stores
28+
* belonging to this task, the set of input topic partitions and changelog topic partitions
29+
* for all logged state stores, and the rack ids of all replicas of each topic partition
30+
* in the task.
31+
*/
32+
public interface TaskInfo {
33+
34+
/**
35+
*
36+
* @return The {@code TaskId} of the underlying task.
37+
*/
38+
TaskId id();
39+
40+
/**
41+
*
42+
* @return true if the underlying task is stateful, and false otherwise.
43+
*/
44+
boolean isStateful();
45+
46+
/**
47+
*
48+
* @return the set of state store names that this task makes use of. In the case of stateless tasks,
49+
* this set will be empty as no state stores are used.
50+
*/
51+
Set<String> stateStoreNames();
52+
53+
/**
54+
*
55+
* @return the set of source topic partitions. This set will include both changelog and non-changelog
56+
* topic partitions.
57+
*/
58+
Set<TopicPartition> sourceTopicPartitions();
59+
60+
/**
61+
*
62+
* @return the set of changelog topic partitions. This set will include both source and non-source
63+
* topic partitions.
64+
*/
65+
Set<TopicPartition> changelogTopicPartitions();
66+
67+
/**
68+
*
69+
* @return the mapping of {@code TopicPartition} to set of rack ids that this partition resides on.
70+
*/
71+
Map<TopicPartition, Set<String>> partitionToRackIds();
72+
}

streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java

+9
Original file line numberDiff line numberDiff line change
@@ -1974,6 +1974,15 @@ public Set<InternalTopicConfig> nonSourceChangelogTopics() {
19741974
return topicConfigs;
19751975
}
19761976

1977+
/**
1978+
*
1979+
* @return the set of changelog topics, which includes both source changelog topics and non
1980+
* source changelog topics.
1981+
*/
1982+
public Set<String> changelogTopics() {
1983+
return Collections.unmodifiableSet(stateChangelogTopics.keySet());
1984+
}
1985+
19771986
/**
19781987
* Returns the topic names for any optimized source changelogs
19791988
*/

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java

+85-13
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.kafka.streams.errors.TaskAssignmentException;
3939
import org.apache.kafka.streams.processor.TaskId;
4040
import org.apache.kafka.streams.processor.assignment.ApplicationState;
41+
import org.apache.kafka.streams.processor.assignment.TaskInfo;
4142
import org.apache.kafka.streams.processor.assignment.ProcessId;
4243
import org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment;
4344
import org.apache.kafka.streams.processor.internals.assignment.ApplicationStateImpl;
@@ -52,10 +53,12 @@
5253
import org.apache.kafka.streams.processor.internals.assignment.CopartitionedTopicsEnforcer;
5354
import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
5455
import org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor;
56+
import org.apache.kafka.streams.processor.internals.assignment.RackUtils;
5557
import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
5658
import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
5759
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
5860
import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
61+
import org.apache.kafka.streams.processor.internals.assignment.DefaultTaskInfo;
5962
import org.apache.kafka.streams.state.HostInfo;
6063
import org.slf4j.Logger;
6164

@@ -82,6 +85,7 @@
8285
import java.util.function.Supplier;
8386
import java.util.stream.Collectors;
8487

88+
import static java.util.Collections.unmodifiableSet;
8589
import static java.util.Map.Entry.comparingByKey;
8690
import static java.util.UUID.randomUUID;
8791
import static org.apache.kafka.common.utils.Utils.filterMap;
@@ -133,8 +137,9 @@ public static class ClientMetadata {
133137
private final HostInfo hostInfo;
134138
private final ClientState state;
135139
private final SortedSet<String> consumers;
140+
private final Optional<String> rackId;
136141

137-
ClientMetadata(final UUID processId, final String endPoint, final Map<String, String> clientTags) {
142+
ClientMetadata(final UUID processId, final String endPoint, final Map<String, String> clientTags, final Optional<String> rackId) {
138143

139144
// get the host info, or null if no endpoint is configured (ie endPoint == null)
140145
hostInfo = HostInfo.buildFromEndpoint(endPoint);
@@ -144,6 +149,8 @@ public static class ClientMetadata {
144149

145150
// initialize the client state with client tags
146151
state = new ClientState(processId, clientTags);
152+
153+
this.rackId = rackId;
147154
}
148155

149156
void addConsumer(final String consumerMemberId, final List<TopicPartition> ownedPartitions) {
@@ -164,6 +171,10 @@ public HostInfo hostInfo() {
164171
return hostInfo;
165172
}
166173

174+
public Optional<String> rackId() {
175+
return rackId;
176+
}
177+
167178
@Override
168179
public String toString() {
169180
return "ClientMetadata{" +
@@ -355,7 +366,7 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
355366
futureMetadataVersion = usedVersion;
356367
processId = FUTURE_ID;
357368
if (!clientMetadataMap.containsKey(FUTURE_ID)) {
358-
clientMetadataMap.put(FUTURE_ID, new ClientMetadata(FUTURE_ID, null, Collections.emptyMap()));
369+
clientMetadataMap.put(FUTURE_ID, new ClientMetadata(FUTURE_ID, null, Collections.emptyMap(), subscription.rackId()));
359370
}
360371
} else {
361372
processId = info.processId();
@@ -367,7 +378,7 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
367378

368379
// create the new client metadata if necessary
369380
if (clientMetadata == null) {
370-
clientMetadata = new ClientMetadata(info.processId(), info.userEndPoint(), info.clientTags());
381+
clientMetadata = new ClientMetadata(info.processId(), info.userEndPoint(), info.clientTags(), subscription.rackId());
371382
clientMetadataMap.put(info.processId(), clientMetadata);
372383
}
373384

@@ -474,23 +485,84 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
474485
*
475486
* @param clientMetadataMap the map of process id to client metadata used to build an immutable
476487
* {@code ApplicationState}
477-
* @param statefulTasks the set of {@code TaskId} that correspond to all the stateful
478-
* tasks that need to be reassigned.
479488
* @return The {@code ApplicationState} needed by the TaskAssigner to compute new task
480489
* assignments.
481490
*/
482-
private ApplicationState buildApplicationState(final Map<UUID, ClientMetadata> clientMetadataMap,
483-
final Set<TaskId> statefulTasks) {
484-
final Set<TaskId> statelessTasks = new HashSet<>();
485-
for (final Map.Entry<UUID, ClientMetadata> clientEntry : clientMetadataMap.entrySet()) {
486-
final ClientState clientState = clientEntry.getValue().state;
487-
statelessTasks.addAll(clientState.statelessActiveTasks());
491+
private ApplicationState buildApplicationState(final TopologyMetadata topologyMetadata,
492+
final Map<UUID, ClientMetadata> clientMetadataMap,
493+
final Map<Subtopology, TopicsInfo> topicGroups,
494+
final Cluster cluster) {
495+
final Map<Subtopology, Set<String>> sourceTopicsByGroup = new HashMap<>();
496+
final Map<Subtopology, Set<String>> changelogTopicsByGroup = new HashMap<>();
497+
for (final Map.Entry<Subtopology, TopicsInfo> entry : topicGroups.entrySet()) {
498+
final Set<String> sourceTopics = entry.getValue().sourceTopics;
499+
final Set<String> changelogTopics = entry.getValue().changelogTopics();
500+
sourceTopicsByGroup.put(entry.getKey(), sourceTopics);
501+
changelogTopicsByGroup.put(entry.getKey(), changelogTopics);
502+
}
503+
504+
final Map<TaskId, Set<TopicPartition>> sourcePartitionsForTask =
505+
partitionGrouper.partitionGroups(sourceTopicsByGroup, cluster);
506+
final Map<TaskId, Set<TopicPartition>> changelogPartitionsForTask =
507+
partitionGrouper.partitionGroups(changelogTopicsByGroup, cluster);
508+
509+
if (!sourcePartitionsForTask.keySet().equals(changelogPartitionsForTask.keySet())) {
510+
log.error("Partition grouper returned {} tasks for source topics but {} tasks for changelog topics",
511+
sourcePartitionsForTask.size(), changelogPartitionsForTask.size());
512+
throw new TaskAssignmentException("Partition grouper returned conflicting information about the "
513+
+ "tasks for source topics vs changelog topics.");
488514
}
489515

516+
final Set<TopicPartition> sourceTopicPartitions = new HashSet<>();
517+
final Set<TopicPartition> nonSourceChangelogTopicPartitions = new HashSet<>();
518+
for (final Map.Entry<TaskId, Set<TopicPartition>> entry : sourcePartitionsForTask.entrySet()) {
519+
final TaskId taskId = entry.getKey();
520+
final Set<TopicPartition> taskSourcePartitions = entry.getValue();
521+
final Set<TopicPartition> taskChangelogPartitions = changelogPartitionsForTask.get(taskId);
522+
final Set<TopicPartition> taskNonSourceChangelogPartitions = new HashSet<>(taskChangelogPartitions);
523+
taskNonSourceChangelogPartitions.removeAll(taskSourcePartitions);
524+
525+
sourceTopicPartitions.addAll(taskSourcePartitions);
526+
nonSourceChangelogTopicPartitions.addAll(taskNonSourceChangelogPartitions);
527+
}
528+
529+
final Map<TopicPartition, Set<String>> racksForSourcePartitions = RackUtils.getRacksForTopicPartition(
530+
cluster, internalTopicManager, sourceTopicPartitions, false);
531+
final Map<TopicPartition, Set<String>> racksForChangelogPartitions = RackUtils.getRacksForTopicPartition(
532+
cluster, internalTopicManager, nonSourceChangelogTopicPartitions, true);
533+
534+
final Set<TaskId> logicalTaskIds = unmodifiableSet(sourcePartitionsForTask.keySet());
535+
final Set<TaskInfo> logicalTasks = logicalTaskIds.stream().map(taskId -> {
536+
final Set<String> stateStoreNames = topologyMetadata
537+
.stateStoreNameToSourceTopicsForTopology(taskId.topologyName())
538+
.keySet();
539+
final Set<TopicPartition> sourcePartitions = sourcePartitionsForTask.get(taskId);
540+
final Set<TopicPartition> changelogPartitions = changelogPartitionsForTask.get(taskId);
541+
final Map<TopicPartition, Set<String>> racksForTaskPartition = new HashMap<>();
542+
sourcePartitions.forEach(topicPartition -> {
543+
racksForTaskPartition.put(topicPartition, racksForSourcePartitions.get(topicPartition));
544+
});
545+
changelogPartitions.forEach(topicPartition -> {
546+
if (racksForSourcePartitions.containsKey(topicPartition)) {
547+
racksForTaskPartition.put(topicPartition, racksForSourcePartitions.get(topicPartition));
548+
} else {
549+
racksForTaskPartition.put(topicPartition, racksForChangelogPartitions.get(topicPartition));
550+
}
551+
});
552+
553+
return new DefaultTaskInfo(
554+
taskId,
555+
!stateStoreNames.isEmpty(),
556+
racksForTaskPartition,
557+
stateStoreNames,
558+
sourcePartitions,
559+
changelogPartitions
560+
);
561+
}).collect(Collectors.toSet());
562+
490563
return new ApplicationStateImpl(
491564
assignmentConfigs.toPublicAssignmentConfigs(),
492-
statefulTasks,
493-
statelessTasks,
565+
logicalTasks,
494566
clientMetadataMap
495567
);
496568
}

0 commit comments

Comments
 (0)