Skip to content

Commit

Permalink
KAFKA-15045: (KIP-924 pt. 13) AssignmentError calculation added (#16114)
Browse files Browse the repository at this point in the history
This PR adds the post-processing of the TaskAssignment to figure out if the new assignment is valid, and return an AssignmentError otherwise.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
  • Loading branch information
apourchet authored May 29, 2024
1 parent 8d243df commit d64e3fb
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public interface TaskAssignor extends Configurable {
* ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES: multiple KafkaStreams clients assigned with the same active task
* ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS: active task and standby task assigned to the same KafkaStreams client
* INVALID_STANDBY_TASK: stateless task assigned as a standby task
* MISSING_PROCESS_ID: ProcessId present in the input ApplicationState was not present in the output TaskAssignment
* UNKNOWN_PROCESS_ID: unrecognized ProcessId not matching any of the participating consumers
* UNKNOWN_TASK_ID: unrecognized TaskId not matching any of the tasks to be assigned
*/
Expand All @@ -45,6 +46,7 @@ enum AssignmentError {
ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES,
ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS,
INVALID_STANDBY_TASK,
MISSING_PROCESS_ID,
UNKNOWN_PROCESS_ID,
UNKNOWN_TASK_ID
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.ApplicationState;
import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment;
import org.apache.kafka.streams.processor.assignment.KafkaStreamsState;
import org.apache.kafka.streams.processor.assignment.TaskAssignor.AssignmentError;
import org.apache.kafka.streams.processor.assignment.TaskInfo;
import org.apache.kafka.streams.processor.assignment.ProcessId;
import org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment;
Expand Down Expand Up @@ -1540,6 +1543,72 @@ private void maybeScheduleFollowupRebalance(final long encodedNextScheduledRebal
}
}

private AssignmentError validateTaskAssignment(final ApplicationState applicationState,
final TaskAssignment taskAssignment) {
final Collection<KafkaStreamsAssignment> assignments = taskAssignment.assignment();
final Set<TaskId> activeTasksInOutput = new HashSet<>();
final Set<TaskId> standbyTasksInOutput = new HashSet<>();
for (final KafkaStreamsAssignment assignment : assignments) {
final Set<TaskId> tasksForAssignment = new HashSet<>();
for (final KafkaStreamsAssignment.AssignedTask task : assignment.assignment()) {
if (activeTasksInOutput.contains(task.id()) && task.type() == KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
log.error("Assignment is invalid: an active task was assigned multiple times: {}", task.id());
return AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES;
}

if (tasksForAssignment.contains(task.id())) {
log.error("Assignment is invalid: both an active and standby assignment of a task were assigned to the same client: {}", task.id());
return AssignmentError.ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS;
}

tasksForAssignment.add(task.id());
if (task.type() == KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
activeTasksInOutput.add(task.id());
} else {
standbyTasksInOutput.add(task.id());
}
}
}

for (final TaskInfo task : applicationState.allTasks()) {
if (!task.isStateful() && standbyTasksInOutput.contains(task.id())) {
log.error("Assignment is invalid: a standby task was found for a stateless task: {}", task.id());
return AssignmentError.INVALID_STANDBY_TASK;
}
}

final Map<ProcessId, KafkaStreamsState> clientStates = applicationState.kafkaStreamsStates(false);
final Set<ProcessId> clientsInOutput = assignments.stream().map(KafkaStreamsAssignment::processId)
.collect(Collectors.toSet());
for (final Map.Entry<ProcessId, KafkaStreamsState> entry : clientStates.entrySet()) {
final ProcessId processIdInInput = entry.getKey();
if (!clientsInOutput.contains(processIdInInput)) {
log.error("Assignment is invalid: one of the clients has no assignment: {}", processIdInInput.id());
return AssignmentError.MISSING_PROCESS_ID;
}
}

for (final ProcessId processIdInOutput : clientsInOutput) {
if (!clientStates.containsKey(processIdInOutput)) {
log.error("Assignment is invalid: one of the clients in the assignment is unknown: {}", processIdInOutput.id());
return AssignmentError.UNKNOWN_PROCESS_ID;
}
}

final Set<TaskId> taskIdsInInput = applicationState.allTasks().stream().map(TaskInfo::id)
.collect(Collectors.toSet());
for (final KafkaStreamsAssignment assignment : assignments) {
for (final KafkaStreamsAssignment.AssignedTask task : assignment.assignment()) {
if (!taskIdsInInput.contains(task.id())) {
log.error("Assignment is invalid: one of the tasks in the assignment is unknown: {}", task.id());
return AssignmentError.UNKNOWN_TASK_ID;
}
}
}

return AssignmentError.NONE;
}

/**
* Verify that this client's host info was included in the map returned in the assignment, and trigger a
* rebalance if not. This may be necessary when using static membership, as a rejoining client will be handed
Expand Down

0 comments on commit d64e3fb

Please sign in to comment.