Commit 5165f3e

Merge pull request apache#32 from rhauch/streaming-topology-builder-improvement

TopologyBuilder methods return the builder to allow methods to be chained together.

2 parents 572d5a2 + 1a1bf8f
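
For illustration, a minimal sketch of the fluent style this change enables (the node names, topic names, and MyProcessorDef below are hypothetical stand-ins for any ProcessorDef implementation). Previously each add* method returned void, so every node had to be added in a separate statement:

    TopologyBuilder builder = new TopologyBuilder();
    builder.addSource("source", "input-topic")                        // each add* call now returns the builder...
           .addProcessor("process", new MyProcessorDef(), "source")   // ...so nodes can be added in one chained expression
           .addSink("sink", "output-topic", "process");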

File tree

1 file changed: +96 -9 lines


stream/src/main/java/org/apache/kafka/streaming/processor/TopologyBuilder.java (+96 -9)
@@ -20,6 +20,8 @@
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streaming.KafkaStreaming;
+import org.apache.kafka.streaming.StreamingConfig;
 import org.apache.kafka.streaming.processor.internals.ProcessorNode;
 import org.apache.kafka.streaming.processor.internals.ProcessorTopology;
 import org.apache.kafka.streaming.processor.internals.SinkNode;
@@ -33,6 +35,15 @@
 import java.util.Map;
 import java.util.Set;
 
+/**
+ * A component that is used to build a {@link ProcessorTopology}. A topology contains an acyclic graph of sources, processors,
+ * and sinks. A {@link SourceNode source} is a node in the graph that consumes one or more Kafka topics and forwards them to
+ * its child nodes. A {@link Processor processor} is a node in the graph that receives input messages from upstream nodes,
+ * processes each message, and optionally forwards new messages to one or all of its children. Finally, a {@link SinkNode sink}
+ * is a node in the graph that receives messages from upstream nodes and writes them to a Kafka topic. This builder allows you
+ * to construct an acyclic graph of these nodes, and the builder is then passed into a new {@link KafkaStreaming} instance
+ * that will then {@link KafkaStreaming#start() begin consuming, processing, and producing messages}.
+ */
 public class TopologyBuilder {
 
     // list of node factories in a topological order
@@ -56,6 +67,7 @@ public ProcessorNodeFactory(String name, String[] parents, ProcessorDef definition)
             this.definition = definition;
         }
 
+        @Override
         public ProcessorNode build() {
             Processor processor = definition.instance();
             return new ProcessorNode(name, processor);
@@ -75,6 +87,7 @@ private SourceNodeFactory(String name, String[] topics, Deserializer keyDeserializer, Deserializer valDeserializer)
             this.valDeserializer = valDeserializer;
         }
 
+        @Override
         public ProcessorNode build() {
             return new SourceNode(name, keyDeserializer, valDeserializer);
         }
@@ -94,18 +107,48 @@ private SinkNodeFactory(String name, String[] parents, String topic, Serializer keySerializer, Serializer valSerializer)
             this.keySerializer = keySerializer;
             this.valSerializer = valSerializer;
         }
+        @Override
         public ProcessorNode build() {
             return new SinkNode(name, topic, keySerializer, valSerializer);
         }
     }
 
+    /**
+     * Create a new builder.
+     */
     public TopologyBuilder() {}
 
-    public final void addSource(String name, String... topics) {
-        addSource(name, (Deserializer) null, (Deserializer) null, topics);
+    /**
+     * Add a new source that consumes the named topics and forwards the messages to child processor and/or sink nodes.
+     * The source will use the {@link StreamingConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} and
+     * {@link StreamingConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the
+     * {@link StreamingConfig streaming configuration}.
+     *
+     * @param name the unique name of the source used to reference this node when
+     * {@link #addProcessor(String, ProcessorDef, String...) adding processor children}.
+     * @param topics the name of one or more Kafka topics that this source is to consume
+     * @return this builder instance so methods can be chained together; never null
+     */
+    public final TopologyBuilder addSource(String name, String... topics) {
+        return addSource(name, (Deserializer) null, (Deserializer) null, topics);
     }
 
-    public final void addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics) {
+    /**
+     * Add a new source that consumes the named topics and forwards the messages to child processor and/or sink nodes.
+     * The source will use the specified key and value deserializers.
+     *
+     * @param name the unique name of the source used to reference this node when
+     * {@link #addProcessor(String, ProcessorDef, String...) adding processor children}.
+     * @param keyDeserializer the {@link Deserializer key deserializer} used when consuming messages; may be null if the source
+     * should use the {@link StreamingConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} specified in the
+     * {@link StreamingConfig streaming configuration}
+     * @param valDeserializer the {@link Deserializer value deserializer} used when consuming messages; may be null if the source
+     * should use the {@link StreamingConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the
+     * {@link StreamingConfig streaming configuration}
+     * @param topics the name of one or more Kafka topics that this source is to consume
+     * @return this builder instance so methods can be chained together; never null
+     */
+    public final TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics) {
         if (nodeNames.contains(name))
             throw new IllegalArgumentException("Processor " + name + " is already added.");
 
@@ -118,14 +161,40 @@ public final void addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics)
 
         nodeNames.add(name);
         nodeFactories.add(new SourceNodeFactory(name, topics, keyDeserializer, valDeserializer));
+        return this;
     }
 
-    public final void addSink(String name, String topic, String... parentNames) {
-        addSink(name, topic, (Serializer) null, (Serializer) null, parentNames);
+    /**
+     * Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic.
+     * The sink will use the {@link StreamingConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} and
+     * {@link StreamingConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
+     * {@link StreamingConfig streaming configuration}.
+     *
+     * @param name the unique name of the sink
+     * @param topic the name of the Kafka topic to which this sink should write its messages
+     * @return this builder instance so methods can be chained together; never null
+     */
+    public final TopologyBuilder addSink(String name, String topic, String... parentNames) {
+        return addSink(name, topic, (Serializer) null, (Serializer) null, parentNames);
     }
 
-    public final void addSink(String name, String topic, Serializer keySerializer, Serializer valSerializer, String... parentNames) {
-
+    /**
+     * Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic.
+     * The sink will use the specified key and value serializers.
+     *
+     * @param name the unique name of the sink
+     * @param topic the name of the Kafka topic to which this sink should write its messages
+     * @param keySerializer the {@link Serializer key serializer} used when writing messages to the topic; may be null if the sink
+     * should use the {@link StreamingConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} specified in the
+     * {@link StreamingConfig streaming configuration}
+     * @param valSerializer the {@link Serializer value serializer} used when writing messages to the topic; may be null if the sink
+     * should use the {@link StreamingConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
+     * {@link StreamingConfig streaming configuration}
+     * @param parentNames the name of one or more source or processor nodes whose output messages this sink should consume
+     * and write to its topic
+     * @return this builder instance so methods can be chained together; never null
+     */
+    public final TopologyBuilder addSink(String name, String topic, Serializer keySerializer, Serializer valSerializer, String... parentNames) {
         if (nodeNames.contains(name))
             throw new IllegalArgumentException("Processor " + name + " is already added.");
 
@@ -142,9 +211,19 @@ public final void addSink(String name, String topic, Serializer keySerializer, Serializer valSerializer, String... parentNames)
 
         nodeNames.add(name);
         nodeFactories.add(new SinkNodeFactory(name, parentNames, topic, keySerializer, valSerializer));
+        return this;
     }
 
-    public final void addProcessor(String name, ProcessorDef definition, String... parentNames) {
+    /**
+     * Add a new processor node that receives and processes messages output by one or more parent source or processor nodes.
+     * Any new messages output by this processor will be forwarded to its child processor or sink nodes.
+     * @param name the unique name of the processor node
+     * @param definition the supplier used to obtain this node's {@link Processor} instance
+     * @param parentNames the name of one or more source or processor nodes whose output messages this processor should receive
+     * and process
+     * @return this builder instance so methods can be chained together; never null
+     */
+    public final TopologyBuilder addProcessor(String name, ProcessorDef definition, String... parentNames) {
         if (nodeNames.contains(name))
             throw new IllegalArgumentException("Processor " + name + " is already added.");
 
@@ -161,10 +240,14 @@ public final void addProcessor(String name, ProcessorDef definition, String... parentNames)
 
         nodeNames.add(name);
         nodeFactories.add(new ProcessorNodeFactory(name, parentNames, definition));
+        return this;
     }
 
     /**
-     * Build the topology by creating the processors
+     * Build the topology. This is typically called automatically when passing this builder into the
+     * {@link KafkaStreaming#KafkaStreaming(TopologyBuilder, StreamingConfig)} constructor.
+     *
+     * @see KafkaStreaming#KafkaStreaming(TopologyBuilder, StreamingConfig)
      */
     @SuppressWarnings("unchecked")
     public ProcessorTopology build() {
@@ -206,6 +289,10 @@ public ProcessorTopology build() {
         return new ProcessorTopology(processorNodes, topicSourceMap, topicSinkMap);
     }
 
+    /**
+     * Get the names of topics that are to be consumed by the source nodes created by this builder.
+     * @return the unmodifiable set of topic names used by source nodes, which changes as new sources are added; never null
+     */
     public Set<String> sourceTopics() {
         return Collections.unmodifiableSet(sourceTopicNames);
     }
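
As a usage sketch of the API documented above: the chained add* calls, the StreamingConfig constant names, the KafkaStreaming(TopologyBuilder, StreamingConfig) constructor, and start() all come from the diff, while the Properties-based StreamingConfig constructor, the String (de)serializers, the topic names, and the PageViewCounterDef processor definition are assumptions for illustration.

    import java.util.Properties;
    import org.apache.kafka.common.serialization.StringDeserializer;   // assumed (de)serializers for this sketch
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streaming.KafkaStreaming;
    import org.apache.kafka.streaming.StreamingConfig;
    import org.apache.kafka.streaming.processor.TopologyBuilder;

    Properties props = new Properties();
    props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    // Build the topology fluently: source -> processor -> sink.
    TopologyBuilder builder = new TopologyBuilder()
        .addSource("pageViews", "page-views")                             // consume the "page-views" topic with the default deserializers
        .addProcessor("counter", new PageViewCounterDef(), "pageViews")   // PageViewCounterDef is a hypothetical ProcessorDef
        .addSink("output", "page-view-counts", "counter");                // write the processor's output to "page-view-counts"

    // The builder is handed to KafkaStreaming, which builds the topology and runs it.
    KafkaStreaming streaming = new KafkaStreaming(builder, new StreamingConfig(props));  // StreamingConfig(Properties) assumed
    streaming.start();  // begin consuming, processing, and producing messages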
