From fd223ae1668fa44ab26f7449f5a73f53e430d8f6 Mon Sep 17 00:00:00 2001
From: Shanthoosh Venkataraman
Date: Thu, 16 Mar 2017 15:42:32 -0700
Subject: [PATCH] Adding job hierarchy endpoint.
---
.../versioned/rest/resource-directory.md | 3 +-
.../versioned/rest/resources/tasks.md | 125 +++++++++
.../samza/standalone/StandaloneJobCoordinator.java | 12 +-
.../java/org/apache/samza/zk/ZkJobCoordinator.java | 8 +-
.../apache/samza/coordinator/JobModelManager.scala | 279 +++++++++++----------
.../scala/org/apache/samza/job/JobRunner.scala | 2 +-
.../main/scala/org/apache/samza/util/Util.scala | 12 +-
.../samza/execution/TestExecutionPlanner.java | 5 +-
.../samza/coordinator/TestJobCoordinator.scala | 32 ++-
samza-rest/src/main/config/samza-rest.properties | 3 +
.../org/apache/samza/rest/model/Partition.java | 103 ++++++++
.../java/org/apache/samza/rest/model/Task.java | 137 ++++++++++
.../samza/rest/proxy/job/AbstractJobProxy.java | 21 +-
.../samza/rest/proxy/job/JobProxyFactory.java | 2 +-
.../samza/rest/proxy/job/SimpleYarnJobProxy.java | 8 +-
.../rest/proxy/job/SimpleYarnJobProxyFactory.java | 14 +-
.../samza/rest/proxy/task/SamzaTaskProxy.java | 169 +++++++++++++
.../rest/proxy/task/SamzaTaskProxyFactory.java | 54 ++++
.../apache/samza/rest/proxy/task/TaskProxy.java | 44 ++++
.../samza/rest/proxy/task/TaskProxyFactory.java | 41 +++
.../samza/rest/proxy/task/TaskResourceConfig.java | 50 ++++
.../samza/rest/resources/BaseResourceConfig.java | 87 +++++++
.../rest/resources/DefaultResourceFactory.java | 7 +-
.../apache/samza/rest/resources/JobsResource.java | 20 +-
.../samza/rest/resources/JobsResourceConfig.java | 40 +--
.../org/apache/samza/rest/resources/Responses.java | 52 ++++
.../apache/samza/rest/resources/TasksResource.java | 97 +++++++
.../samza/rest/resources/TestJobsResource.java | 13 +-
.../samza/rest/resources/TestTasksResource.java | 105 ++++++++
.../resources/mock/MockInstallationFinder.java | 39 +++
.../rest/resources/mock/MockResourceFactory.java | 44 ++++
.../samza/rest/resources/mock/MockTaskProxy.java | 72 ++++++
.../rest/resources/mock/MockTaskProxyFactory.java | 32 +++
33 files changed, 1476 insertions(+), 256 deletions(-)
create mode 100644 docs/learn/documentation/versioned/rest/resources/tasks.md
create mode 100644 samza-rest/src/main/java/org/apache/samza/rest/model/Partition.java
create mode 100644 samza-rest/src/main/java/org/apache/samza/rest/model/Task.java
create mode 100644 samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
create mode 100644 samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxyFactory.java
create mode 100644 samza-rest/src/main/java/org/apache/samza/rest/proxy/task/TaskProxy.java
create mode 100644 samza-rest/src/main/java/org/apache/samza/rest/proxy/task/TaskProxyFactory.java
create mode 100644 samza-rest/src/main/java/org/apache/samza/rest/proxy/task/TaskResourceConfig.java
create mode 100644 samza-rest/src/main/java/org/apache/samza/rest/resources/BaseResourceConfig.java
create mode 100644 samza-rest/src/main/java/org/apache/samza/rest/resources/Responses.java
create mode 100644 samza-rest/src/main/java/org/apache/samza/rest/resources/TasksResource.java
create mode 100644 samza-rest/src/test/java/org/apache/samza/rest/resources/TestTasksResource.java
create mode 100644 samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockInstallationFinder.java
create mode 100644 samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockResourceFactory.java
create mode 100644 samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockTaskProxy.java
create mode 100644 samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockTaskProxyFactory.java
diff --git a/docs/learn/documentation/versioned/rest/resource-directory.md b/docs/learn/documentation/versioned/rest/resource-directory.md
index 79746d1..3d7bfe4 100644
--- a/docs/learn/documentation/versioned/rest/resource-directory.md
+++ b/docs/learn/documentation/versioned/rest/resource-directory.md
@@ -22,6 +22,7 @@ title: Resource Directory
The Samza REST Service ships with the JAX-RS Resources listed below.
- [JobsResource](resources/jobs.html)
-- (Second resource coming soon)
+- [TasksResource](resources/tasks.html)
## [Jobs Resource »](resources/jobs.html)
+## [Tasks Resource »](resources/tasks.html)
diff --git a/docs/learn/documentation/versioned/rest/resources/tasks.md b/docs/learn/documentation/versioned/rest/resources/tasks.md
new file mode 100644
index 0000000..385848e
--- /dev/null
+++ b/docs/learn/documentation/versioned/rest/resources/tasks.md
@@ -0,0 +1,125 @@
+---
+layout: page
+title: Tasks Resource
+---
+
+
+This resource exposes endpoints to support operations at the tasks scope. The initial implementation includes the ability to list all the tasks for a particular job.
+This is a sub-resource of the [Jobs Resource »](jobs.html) and is not intended to be used independently.
+
+Responses of individual endpoints will vary in accordance with their functionality and scope. However, the error
+messages of all of the tasks resource end points will be of the following form.
+
+**Error Message**
+
+Every error response will have the following structure:
+
+{% highlight json %}
+{
+ "message": "Unrecognized status parameter: null"
+}
+{% endhighlight %}
+`message` is the only field in the response and contains a description of the problem.
+
+
+## Get All Tasks
+Lists the complete details about all the tasks for a particular job
+
+######Request
+ GET /v1/jobs/{jobName}/{jobId}/tasks
+
+######Response
+Status: 200 OK
+
+{% highlight json %}
+ [
+{
+ "preferredHost" : "samza-preferredHost",
+ "taskName" : "Samza task",
+ "containerId" : "0",
+ "partitions" : [{
+ "system" : "kafka",
+ "stream" : "topic-name",
+ "partitionId" : "0"
+ }]
+ }
+ ]
+{% endhighlight %}
+
+######Response codes
+
+
+
+
Status
+
Description
+
+
+
+
+
200 OK
The operation completed successfully and all the tasks that for
+ the job are returned.
+
+
+
404 Not Found
Invalid job instance was provided as an argument.{% highlight json %}
+{
+ "message": "Invalid arguments for getTasks. jobName: SamzaJobName jobId: SamzaJobId."
+}
+{% endhighlight %}
+
+
+
500 Server Error
There was an error executing the command on the server. e.g. The command timed out.{% highlight json %}
+{
+ "message": "Timeout waiting for get all tasks."
+}
+{% endhighlight %}
+
+
+
+
+
+
+
+###Design
+###Abstractions
+There are two primary abstractions that are required by the TasksResource that users can implement to handle any details specific to their environment.
+
+1. **TaskProxy**: This interface is the central point of interacting with Samza tasks. It exposes a method to get all the tasks of a Samza job.
+2. **InstallationFinder**: The InstallationFinder provides a generic interface to discover all the installed jobs, hiding any customizations in the job package structure and its location. The InstallationFinder also resolves the job configuration, which is used to validate and identify the job.
+
+## Configuration
+The TasksResource properties should be specified in the same file as the Samza REST configuration.
+
+
+
+
+
Name
+
Description
+
+
+
+
+
task.proxy.factory.class
Required: The TaskProxyFactory that will be used to create the TaskProxy instances. The value is a fully-qualified class name which must implement TaskProxyFactory. Samza ships with one implementation:
org.apache.samza.rest.proxy.task.SamzaTaskProxy
gets the details of all the tasks of a job. It uses the
SimpleInstallationRecord
to interact with Samza jobs installed on disk.
+
+
+
job.installations.path
Required: The file system path which contains the Samza job installations. The path must be on the same host as the Samza REST Service. Each installation must be a directory with structure conforming to the expectations of the InstallationRecord implementation used by the JobProxy.
+
+
+
job.config.factory.class
The config factory to use for reading Samza job configs. This is used to fetch the job.name and job.id properties for each job instance in the InstallationRecord. It's also used to validate that a particular directory within the installation path actually contains Samza jobs. If not specified
diff --git a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
index 46dbf30..b2927f4 100644
--- a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
@@ -19,12 +19,12 @@
package org.apache.samza.standalone;
import com.google.common.annotations.VisibleForTesting;
+import java.util.Collections;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.JavaSystemConfig;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobModelManager;
-import org.apache.samza.coordinator.JobModelManager$;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.processor.SamzaContainerController;
import org.apache.samza.system.StreamMetadataCache;
@@ -65,7 +65,7 @@ public class StandaloneJobCoordinator implements JobCoordinator {
private static final Logger log = LoggerFactory.getLogger(StandaloneJobCoordinator.class);
private final int processorId;
private final Config config;
- private final JobModelManager jobModelManager;
+ private final JobModel jobModel;
private final SamzaContainerController containerController;
@VisibleForTesting
@@ -73,11 +73,11 @@ public class StandaloneJobCoordinator implements JobCoordinator {
int processorId,
Config config,
SamzaContainerController containerController,
- JobModelManager jobModelManager) {
+ JobModel jobModel) {
this.processorId = processorId;
this.config = config;
this.containerController = containerController;
- this.jobModelManager = jobModelManager;
+ this.jobModel = jobModel;
}
public StandaloneJobCoordinator(int processorId, Config config, SamzaContainerController containerController) {
@@ -105,7 +105,7 @@ public class StandaloneJobCoordinator implements JobCoordinator {
* TaskNameGrouper with the LocalityManager! Hence, groupers should be a property of the jobcoordinator
* (job.coordinator.task.grouper, instead of task.systemstreampartition.grouper)
*/
- this.jobModelManager = JobModelManager$.MODULE$.getJobCoordinator(this.config, null, null, streamMetadataCache, null, null);
+ this.jobModel = JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache, null);
}
@Override
@@ -143,6 +143,6 @@ public class StandaloneJobCoordinator implements JobCoordinator {
@Override
public JobModel getJobModel() {
- return jobModelManager.jobModel();
+ return jobModel;
}
}
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 1d16d4a..c425d65 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -20,6 +20,7 @@ package org.apache.samza.zk;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -28,7 +29,6 @@ import org.apache.samza.config.Config;
import org.apache.samza.config.JavaSystemConfig;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobModelManager;
-import org.apache.samza.coordinator.JobModelManager$;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.processor.SamzaContainerController;
import org.apache.samza.system.StreamMetadataCache;
@@ -57,7 +57,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
private JobModel newJobModel;
private String newJobModelVersion; // version published in ZK (by the leader)
- private JobModelManager jobModelManager;
+ private JobModel jobModel;
public ZkJobCoordinator(int processorId, Config config, ScheduleAfterDebounceTime debounceTimer, ZkUtils zkUtils, SamzaContainerController containerController) {
this.zkUtils = zkUtils;
@@ -201,9 +201,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
}
log.info("generate new job model: processorsIds: " + sb.toString());
- jobModelManager = JobModelManager$.MODULE$.getJobCoordinator(this.config, null, null, streamMetadataCache, null,
- containerIds);
- JobModel jobModel = jobModelManager.jobModel();
+ jobModel = JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache, containerIds);
log.info("pid=" + processorId + "Generated jobModel: " + jobModel);
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index 14d5dff..4122c87 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -26,104 +26,145 @@ import java.util.concurrent.atomic.AtomicReference
import org.apache.samza.config.JobConfig.Config2Job
import org.apache.samza.config.SystemConfig.Config2System
import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.samza.config.{Config, StorageConfig}
+import org.apache.samza.config.Config
+import org.apache.samza.config.StorageConfig
import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory
-import org.apache.samza.container.grouper.task.{BalancingTaskNameGrouper, TaskNameGrouperFactory}
-import org.apache.samza.container.{LocalityManager, TaskName}
-import org.apache.samza.coordinator.server.{HttpServer, JobServlet}
+import org.apache.samza.container.grouper.task.BalancingTaskNameGrouper
+import org.apache.samza.container.grouper.task.TaskNameGrouperFactory
+import org.apache.samza.container.LocalityManager
+import org.apache.samza.container.TaskName
+import org.apache.samza.coordinator.server.HttpServer
+import org.apache.samza.coordinator.server.JobServlet
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer
import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
-import org.apache.samza.job.model.{JobModel, TaskModel}
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer
+import org.apache.samza.job.model.JobModel
+import org.apache.samza.job.model.TaskModel
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.storage.ChangelogPartitionManager
-import org.apache.samza.system.{ExtendedSystemAdmin, StreamMetadataCache, SystemFactory, SystemStreamPartition, SystemStreamPartitionMatcher}
-import org.apache.samza.util.{Logging, Util}
-import org.apache.samza.{Partition, SamzaException}
+import org.apache.samza.system.ExtendedSystemAdmin
+import org.apache.samza.system.StreamMetadataCache
+import org.apache.samza.system.SystemFactory
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.system.SystemStreamPartitionMatcher
+import org.apache.samza.system.SystemAdmin
+import org.apache.samza.util.Logging
+import org.apache.samza.util.Util
+import org.apache.samza.Partition
+import org.apache.samza.SamzaException
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
-
/**
- * Helper companion object that is responsible for wiring up a JobCoordinator
+ * Helper companion object that is responsible for wiring up a JobModelManager
* given a Config object.
*/
object JobModelManager extends Logging {
+ val SOURCE = "JobModelManager"
/**
- * a volatile value to store the current instantiated JobCoordinator
+ * a volatile value to store the current instantiated JobModelManager
*/
@volatile var currentJobModelManager: JobModelManager = null
val jobModelRef: AtomicReference[JobModel] = new AtomicReference[JobModel]()
/**
- * @param coordinatorSystemConfig A config object that contains job.name,
- * job.id, and all system.<job-coordinator-system-name>.*
- * configuration. The method will use this config to read all configuration
- * from the coordinator stream, and instantiate a JobCoordinator.
+ * Does the following actions for a job.
+ * a) Reads the jobModel from coordinator stream using the job's configuration.
+ * b) Creates changeLogStream for task stores if it does not exists.
+ * c) Recomputes changelog partition mapping based on jobModel and job's configuration
+ * and writes it to the coordinator stream.
+ * d) Builds JobModelManager using the jobModel read from coordinator stream.
+ * @param coordinatorSystemConfig A config object that contains job.name
+ * job.id, and all system.<job-coordinator-system-name>.*
+ * configuration. The method will use this config to read all configuration
+ * from the coordinator stream, and instantiate a JobModelManager.
*/
def apply(coordinatorSystemConfig: Config, metricsRegistryMap: MetricsRegistryMap): JobModelManager = {
- val coordinatorStreamSystemFactory: CoordinatorStreamSystemFactory = new CoordinatorStreamSystemFactory()
- val coordinatorSystemConsumer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(coordinatorSystemConfig, metricsRegistryMap)
- val coordinatorSystemProducer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(coordinatorSystemConfig, metricsRegistryMap)
- info("Registering coordinator system stream.")
- coordinatorSystemConsumer.register
- debug("Starting coordinator system stream.")
- coordinatorSystemConsumer.start
- debug("Bootstrapping coordinator system stream.")
- coordinatorSystemConsumer.bootstrap
- val source = "Job-coordinator"
- coordinatorSystemProducer.register(source)
- info("Registering coordinator system stream producer.")
- val config = coordinatorSystemConsumer.getConfig
- info("Got config: %s" format config)
- val changelogManager = new ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, source)
- val localityManager = new LocalityManager(coordinatorSystemProducer, coordinatorSystemConsumer)
+ var coordinatorSystemConsumer: CoordinatorStreamSystemConsumer = null
+ var coordinatorSystemProducer: CoordinatorStreamSystemProducer = null
+ try {
+ val coordinatorStreamSystemFactory: CoordinatorStreamSystemFactory = new CoordinatorStreamSystemFactory()
+ coordinatorSystemConsumer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(coordinatorSystemConfig, metricsRegistryMap)
+ coordinatorSystemProducer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(coordinatorSystemConfig, metricsRegistryMap)
+ info("Registering coordinator system stream consumer.")
+ coordinatorSystemConsumer.register
+ debug("Starting coordinator system stream consumer.")
+ coordinatorSystemConsumer.start
+ debug("Bootstrapping coordinator system stream consumer.")
+ coordinatorSystemConsumer.bootstrap
+ info("Registering coordinator system stream producer.")
+ coordinatorSystemProducer.register(SOURCE)
+
+ val config = coordinatorSystemConsumer.getConfig
+ info("Got config: %s" format config)
+ val changelogManager = new ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, SOURCE)
+ changelogManager.start()
+ val localityManager = new LocalityManager(coordinatorSystemProducer, coordinatorSystemConsumer)
+ // We don't need to start() localityManager as they share the same instances with checkpoint and changelog managers.
+ // TODO: This code will go away with refactoring - SAMZA-678
- val systemNames = getSystemNames(config)
+ localityManager.start()
- // Map the name of each system to the corresponding SystemAdmin
- val systemAdmins = systemNames.map(systemName => {
- val systemFactoryClassName = config
- .getSystemFactory(systemName)
- .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemName))
- val systemFactory = Util.getObj[SystemFactory](systemFactoryClassName)
- systemName -> systemFactory.getAdmin(systemName, config)
- }).toMap
+ // Map the name of each system to the corresponding SystemAdmin
+ val systemAdmins = getSystemAdmins(config)
- val streamMetadataCache = new StreamMetadataCache(systemAdmins = systemAdmins, cacheTTLms = 0)
- var streamPartitionCountMonitor: StreamPartitionCountMonitor = null
- if (config.getMonitorPartitionChange) {
- val extendedSystemAdmins = systemAdmins.filter{
- case (systemName, systemAdmin) => systemAdmin.isInstanceOf[ExtendedSystemAdmin]
- }
- val inputStreamsToMonitor = config.getInputStreams.filter(systemStream => extendedSystemAdmins.containsKey(systemStream.getSystem))
- if (inputStreamsToMonitor.nonEmpty) {
- streamPartitionCountMonitor = new StreamPartitionCountMonitor(
- setAsJavaSet(inputStreamsToMonitor),
- streamMetadataCache,
- metricsRegistryMap,
- config.getMonitorPartitionChangeFrequency)
+ val streamMetadataCache = new StreamMetadataCache(systemAdmins = systemAdmins, cacheTTLms = 0)
+ var streamPartitionCountMonitor: StreamPartitionCountMonitor = null
+ if (config.getMonitorPartitionChange) {
+ val extendedSystemAdmins = systemAdmins.filter{
+ case (systemName, systemAdmin) => systemAdmin.isInstanceOf[ExtendedSystemAdmin]
+ }
+ val inputStreamsToMonitor = config.getInputStreams.filter(systemStream => extendedSystemAdmins.containsKey(systemStream.getSystem))
+ if (inputStreamsToMonitor.nonEmpty) {
+ streamPartitionCountMonitor = new StreamPartitionCountMonitor(
+ setAsJavaSet(inputStreamsToMonitor),
+ streamMetadataCache,
+ metricsRegistryMap,
+ config.getMonitorPartitionChangeFrequency)
+ }
+ }
+ val previousChangelogPartitionMapping = changelogManager.readChangeLogPartitionMapping()
+ val jobModelManager = getJobModelManager(config, previousChangelogPartitionMapping, localityManager, streamMetadataCache, streamPartitionCountMonitor, null)
+ val jobModel = jobModelManager.jobModel
+ // Save the changelog mapping back to the ChangelogPartitionmanager
+ // newChangelogPartitionMapping is the merging of all current task:changelog
+ // assignments with whatever we had before (previousChangelogPartitionMapping).
+ // We must persist legacy changelog assignments so that
+ // maxChangelogPartitionId always has the absolute max, not the current
+ // max (in case the task with the highest changelog partition mapping
+ // disappears.
+ val newChangelogPartitionMapping = jobModel.getContainers.flatMap(_._2.getTasks).map{case (taskName,taskModel) => {
+ taskName -> Integer.valueOf(taskModel.getChangelogPartition.getPartitionId)
+ }}.toMap ++ previousChangelogPartitionMapping
+ info("Saving task-to-changelog partition mapping: %s" format newChangelogPartitionMapping)
+ changelogManager.writeChangeLogPartitionMapping(newChangelogPartitionMapping)
+
+ createChangeLogStreams(config, jobModel.maxChangeLogStreamPartitions)
+
+ jobModelManager
+ } finally {
+ if (coordinatorSystemConsumer != null) {
+ coordinatorSystemConsumer.stop()
+ }
+ if (coordinatorSystemProducer != null) {
+ coordinatorSystemProducer.stop()
}
}
-
- val jobCoordinator = getJobCoordinator(config, changelogManager, localityManager, streamMetadataCache, streamPartitionCountMonitor, null)
- createChangeLogStreams(config, jobCoordinator.jobModel.maxChangeLogStreamPartitions)
-
- jobCoordinator
}
-
def apply(coordinatorSystemConfig: Config): JobModelManager = apply(coordinatorSystemConfig, new MetricsRegistryMap())
/**
- * Build a JobCoordinator using a Samza job's configuration.
+ * Build a JobModelManager using a Samza job's configuration.
*/
- def getJobCoordinator(config: Config,
- changelogManager: ChangelogPartitionManager,
- localityManager: LocalityManager,
- streamMetadataCache: StreamMetadataCache,
- streamPartitionCountMonitor: StreamPartitionCountMonitor,
- containerIds: java.util.List[Integer]) = {
- val jobModel: JobModel = initializeJobModel(config, changelogManager, localityManager, streamMetadataCache, containerIds)
+ private def getJobModelManager(config: Config,
+ changeLogMapping: util.Map[TaskName, Integer],
+ localityManager: LocalityManager,
+ streamMetadataCache: StreamMetadataCache,
+ streamPartitionCountMonitor: StreamPartitionCountMonitor,
+ containerIds: java.util.List[Integer]) = {
+ val jobModel: JobModel = readJobModel(config, changeLogMapping, localityManager, streamMetadataCache, containerIds)
jobModelRef.set(jobModel)
val server = new HttpServer
@@ -136,7 +177,7 @@ object JobModelManager extends Logging {
* For each input stream specified in config, exactly determine its
* partitions, returning a set of SystemStreamPartitions containing them all.
*/
- def getInputStreamPartitions(config: Config, streamMetadataCache: StreamMetadataCache) = {
+ private def getInputStreamPartitions(config: Config, streamMetadataCache: StreamMetadataCache) = {
val inputSystemStreams = config.getInputStreams
// Get the set of partitions for each SystemStream from the stream metadata
@@ -151,7 +192,7 @@ object JobModelManager extends Logging {
}.toSet
}
- def getMatchedInputStreamPartitions(config: Config, streamMetadataCache: StreamMetadataCache) : Set[SystemStreamPartition] = {
+ private def getMatchedInputStreamPartitions(config: Config, streamMetadataCache: StreamMetadataCache): Set[SystemStreamPartition] = {
val allSystemStreamPartitions = getInputStreamPartitions(config, streamMetadataCache)
config.getSSPMatcherClass match {
case Some(s) => {
@@ -172,98 +213,40 @@ object JobModelManager extends Logging {
}
}
-
/**
* Gets a SystemStreamPartitionGrouper object from the configuration.
*/
- def getSystemStreamPartitionGrouper(config: Config) = {
+ private def getSystemStreamPartitionGrouper(config: Config) = {
val factoryString = config.getSystemStreamPartitionGrouperFactory
val factory = Util.getObj[SystemStreamPartitionGrouperFactory](factoryString)
factory.getSystemStreamPartitionGrouper(config)
}
/**
- * The method intializes the jobModel and returns it to the caller.
- * Note: refreshJobModel can be used as a lambda for JobModel generation in the future.
+ * The function reads the latest checkpoint from the underlying coordinator stream and
+ * builds a new JobModel.
*/
- private def initializeJobModel(config: Config,
- changelogManager: ChangelogPartitionManager,
- localityManager: LocalityManager,
- streamMetadataCache: StreamMetadataCache,
- containerIds: java.util.List[Integer]): JobModel = {
+ def readJobModel(config: Config,
+ changeLogPartitionMapping: util.Map[TaskName, Integer],
+ localityManager: LocalityManager,
+ streamMetadataCache: StreamMetadataCache,
+ containerIds: java.util.List[Integer]): JobModel = {
// Do grouping to fetch TaskName to SSP mapping
val allSystemStreamPartitions = getMatchedInputStreamPartitions(config, streamMetadataCache)
val grouper = getSystemStreamPartitionGrouper(config)
val groups = grouper.group(allSystemStreamPartitions)
info("SystemStreamPartitionGrouper %s has grouped the SystemStreamPartitions into %d tasks with the following taskNames: %s" format(grouper, groups.size(), groups.keySet()))
- // Initialize the ChangelogPartitionManager and the CheckpointManager
- val previousChangelogMapping = if (changelogManager != null)
- {
- changelogManager.start()
- changelogManager.readChangeLogPartitionMapping()
- }
- else
- {
- new util.HashMap[TaskName, Integer]()
- }
- // We don't need to start() localityManager as they share the same instances with checkpoint and changelog managers.
- // TODO: This code will go away with refactoring - SAMZA-678
-
- if (localityManager != null) {
- localityManager.start()
- }
-
- // Generate the jobModel
- def jobModelGenerator(): JobModel = refreshJobModel(config,
- groups,
- previousChangelogMapping,
- localityManager,
- containerIds)
-
- val jobModel = jobModelGenerator()
-
- // Save the changelog mapping back to the ChangelogPartitionmanager
- if (changelogManager != null)
- {
- // newChangelogMapping is the merging of all current task:changelog
- // assignments with whatever we had before (previousChangelogMapping).
- // We must persist legacy changelog assignments so that
- // maxChangelogPartitionId always has the absolute max, not the current
- // max (in case the task with the highest changelog partition mapping
- // disappears.
- val newChangelogMapping = jobModel.getContainers.flatMap(_._2.getTasks).map{case (taskName,taskModel) => {
- taskName -> Integer.valueOf(taskModel.getChangelogPartition.getPartitionId)
- }}.toMap ++ previousChangelogMapping
- info("Saving task-to-changelog partition mapping: %s" format newChangelogMapping)
- changelogManager.writeChangeLogPartitionMapping(newChangelogMapping)
- }
-
- jobModel
- }
-
- /**
- * Build a full Samza job model. The function reads the latest checkpoint from the underlying coordinator stream and
- * builds a new JobModel.
- * Note: This method no longer needs to be thread safe because HTTP request from a container no longer triggers a jobmodel
- * refresh. Hence, there is no need for synchronization as before.
- */
- private def refreshJobModel(config: Config,
- groups: util.Map[TaskName, util.Set[SystemStreamPartition]],
- previousChangelogMapping: util.Map[TaskName, Integer],
- localityManager: LocalityManager,
- containerIds: java.util.List[Integer]): JobModel = {
-
// If no mappings are present(first time the job is running) we return -1, this will allow 0 to be the first change
// mapping.
- var maxChangelogPartitionId = previousChangelogMapping.values.map(_.toInt).toList.sorted.lastOption.getOrElse(-1)
+ var maxChangelogPartitionId = changeLogPartitionMapping.values.map(_.toInt).toList.sorted.lastOption.getOrElse(-1)
// Sort the groups prior to assigning the changelog mapping so that the mapping is reproducible and intuitive
val sortedGroups = new util.TreeMap[TaskName, util.Set[SystemStreamPartition]](groups)
// Assign all SystemStreamPartitions to TaskNames.
val taskModels = {
sortedGroups.map { case (taskName, systemStreamPartitions) =>
- val changelogPartition = Option(previousChangelogMapping.get(taskName)) match {
+ val changelogPartition = Option(changeLogPartitionMapping.get(taskName)) match {
case Some(changelogPartitionId) => new Partition(changelogPartitionId)
case _ =>
// If we've never seen this TaskName before, then assign it a
@@ -291,6 +274,24 @@ object JobModelManager extends Logging {
new JobModel(config, containerMap, localityManager)
}
+ /**
+ * Instantiates the system admins based upon the system factory class available in {@param config}.
+ * @param config contains adequate information to instantiate the SystemAdmin.
+ * @return a map of SystemName(String) to the instantiated SystemAdmin.
+ */
+ def getSystemAdmins(config: Config) : Map[String, SystemAdmin] = {
+ val systemNames = getSystemNames(config)
+ // Map the name of each system to the corresponding SystemAdmin
+ val systemAdmins = systemNames.map(systemName => {
+ val systemFactoryClassName = config
+ .getSystemFactory(systemName)
+ .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemName))
+ val systemFactory = Util.getObj[SystemFactory](systemFactoryClassName)
+ systemName -> systemFactory.getAdmin(systemName, config)
+ }).toMap
+ systemAdmins
+ }
+
private def createChangeLogStreams(config: StorageConfig, changeLogPartitions: Int) {
val changeLogSystemStreams = config
.getStoreNames
@@ -313,7 +314,7 @@ object JobModelManager extends Logging {
}
/**
- *
JobCoordinator is responsible for managing the lifecycle of a Samza job
+ *
JobModelManager is responsible for managing the lifecycle of a Samza job
* once it's been started. This includes starting and stopping containers,
* managing configuration, etc.
*
@@ -321,7 +322,7 @@ object JobModelManager extends Logging {
* must integrate with the job coordinator.
*
*
This class' API is currently unstable, and likely to change. The
- * coordinator's responsibility is simply to propagate the job model, and HTTP
+ * responsibility is simply to propagate the job model, and HTTP
* server right now.
*/
class JobModelManager(
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index 6d8b24d..70e5a51 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -20,7 +20,7 @@
package org.apache.samza.job
import org.apache.samza.SamzaException
-import org.apache.samza.config.{ConfigRewriter, Config}
+import org.apache.samza.config.Config
import org.apache.samza.config.JobConfig.Config2Job
import org.apache.samza.coordinator.stream.messages.{Delete, SetConfig}
import org.apache.samza.job.ApplicationStatus.Running
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 97bd22a..4a945d2 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -23,19 +23,23 @@ import java.net._
import java.io._
import java.lang.management.ManagementFactory
import java.util.zip.CRC32
-import org.apache.samza.config.ConfigRewriter
import org.apache.samza.{SamzaException, Partition}
import org.apache.samza.system.{SystemFactory, SystemStreamPartition, SystemStream}
import java.util.Random
+
import org.apache.samza.config.Config
+import org.apache.samza.config.ConfigException
+import org.apache.samza.config.ConfigRewriter
+import org.apache.samza.config.JobConfig
+import org.apache.samza.config.MapConfig
import org.apache.samza.config.SystemConfig
import org.apache.samza.config.JobConfig.Config2Job
import org.apache.samza.config.SystemConfig.Config2System
-import org.apache.samza.config.ConfigException
-import org.apache.samza.config.MapConfig
+
import scala.collection.JavaConversions._
-import org.apache.samza.config.JobConfig
import java.io.InputStreamReader
+
+
import scala.collection.immutable.Map
import org.apache.samza.serializers._
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index ee73195..46250d7 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -19,6 +19,7 @@
package org.apache.samza.execution;
+import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -162,8 +163,8 @@ public class TestExecutionPlanner {
MessageStream m2 = streamGraph.createInStream(input2, null, null).partitionBy(m -> "haha").filter(m -> true);
MessageStream m3 = streamGraph.createInStream(input3, null, null).filter(m -> true).partitionBy(m -> "hehe").map(m -> m);
- m1.join(m2, createJoin()).sendTo(streamGraph.createOutStream(output1, null, null));
- m3.join(m2, createJoin()).sendTo(streamGraph.createOutStream(output2, null, null));
+ m1.join(m2, createJoin(), Duration.ofHours(1)).sendTo(streamGraph.createOutStream(output1, null, null));
+ m3.join(m2, createJoin(), Duration.ofHours(1)).sendTo(streamGraph.createOutStream(output2, null, null));
return streamGraph;
}
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
index fcabc69..a779027 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
@@ -22,29 +22,39 @@ package org.apache.samza.coordinator
import java.util
import org.apache.samza.checkpoint.TestCheckpointTool.MockCheckpointManagerFactory
-import org.apache.samza.job.MockJobFactory
-import org.apache.samza.job.local.{ProcessJobFactory, ThreadJobFactory}
+import org.apache.samza.job.local.ProcessJobFactory
+import org.apache.samza.job.local.ThreadJobFactory
import org.apache.samza.serializers.model.SamzaObjectMapper
import org.apache.samza.util.Util
-import org.junit.{After, Test}
+import org.junit.After
+import org.junit.Test
import org.junit.Assert._
+
import scala.collection.JavaConversions._
import org.apache.samza.config.MapConfig
import org.apache.samza.config.TaskConfig
import org.apache.samza.config.SystemConfig
-import org.apache.samza.container.{SamzaContainer, TaskName}
+import org.apache.samza.container.SamzaContainer
+import org.apache.samza.container.TaskName
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.config.Config
import org.apache.samza.system._
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
-import org.apache.samza.{SamzaException, Partition}
+import org.apache.samza.Partition
+import org.apache.samza.SamzaException
import org.apache.samza.job.model.JobModel
import org.apache.samza.job.model.ContainerModel
import org.apache.samza.job.model.TaskModel
import org.apache.samza.config.JobConfig
-import org.apache.samza.coordinator.stream.{MockCoordinatorStreamWrappedConsumer, MockCoordinatorStreamSystemFactory}
+import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
+import org.apache.samza.coordinator.stream.MockCoordinatorStreamWrappedConsumer
+import org.apache.samza.job.MockJobFactory
+import org.scalatest.{FlatSpec, PrivateMethodTester}
-class TestJobCoordinator {
+import scala.collection.immutable
+
+
+class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
/**
* Builds a coordinator from config, and then compares it with what was
* expected. We simulate having a checkpoint manager that has 2 task
@@ -251,9 +261,11 @@ class TestJobCoordinator {
}).toMap
val streamMetadataCache = new StreamMetadataCache(systemAdmins)
+ val getInputStreamPartitions = PrivateMethod[immutable.Set[Any]]('getInputStreamPartitions)
+ val getMatchedInputStreamPartitions = PrivateMethod[immutable.Set[Any]]('getMatchedInputStreamPartitions)
- val allSSP = JobModelManager.getInputStreamPartitions(config, streamMetadataCache)
- val matchedSSP = JobModelManager.getMatchedInputStreamPartitions(config, streamMetadataCache)
+ val allSSP = JobModelManager invokePrivate getInputStreamPartitions(config, streamMetadataCache)
+ val matchedSSP = JobModelManager invokePrivate getMatchedInputStreamPartitions(config, streamMetadataCache)
assertEquals(matchedSSP, allSSP)
}
@@ -320,7 +332,7 @@ class MockSystemAdmin extends ExtendedSystemAdmin {
override def createCoordinatorStream(streamName: String) {
new UnsupportedOperationException("Method not implemented.")
}
-
+
override def offsetComparator(offset1: String, offset2: String) = null
override def getSystemStreamPartitionCounts(streamNames: util.Set[String],
diff --git a/samza-rest/src/main/config/samza-rest.properties b/samza-rest/src/main/config/samza-rest.properties
index 7be0b47..0b9dcc3 100644
--- a/samza-rest/src/main/config/samza-rest.properties
+++ b/samza-rest/src/main/config/samza-rest.properties
@@ -21,3 +21,6 @@ services.rest.port=9139
# JobsResource
job.proxy.factory.class=org.apache.samza.rest.proxy.job.SimpleYarnJobProxyFactory
job.installations.path=/export/content/samza/deploy/
+
+# TasksResource
+task.proxy.factory.class=org.apache.samza.rest.proxy.task.SamzaTaskProxyFactory
\ No newline at end of file
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/model/Partition.java b/samza-rest/src/main/java/org/apache/samza/rest/model/Partition.java
new file mode 100644
index 0000000..8911cb8
--- /dev/null
+++ b/samza-rest/src/main/java/org/apache/samza/rest/model/Partition.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.model;
+
+import org.apache.samza.system.SystemStreamPartition;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+
+/**
+ * Provides a client view of the samza system stream partition.
+ * Includes the system name, stream name and partition id
+ */
+public class Partition {
+
+ private String system;
+ private String stream;
+ private int partitionId;
+
+ public Partition() {
+ }
+
+ public Partition(@JsonProperty("system") String system,
+ @JsonProperty("stream") String stream,
+ @JsonProperty("partitionId") int partitionId) {
+ this.system = system;
+ this.stream = stream;
+ this.partitionId = partitionId;
+ }
+
+ public Partition(SystemStreamPartition systemStreamPartition) {
+ this(systemStreamPartition.getSystem(),
+ systemStreamPartition.getStream(),
+ systemStreamPartition.getPartition().getPartitionId());
+ }
+
+ public String getSystem() {
+ return system;
+ }
+
+ public void setSystem(String system) {
+ this.system = system;
+ }
+
+ public String getStream() {
+ return stream;
+ }
+
+ public void setStream(String stream) {
+ this.stream = stream;
+ }
+
+ public int getPartitionId() {
+ return partitionId;
+ }
+
+ public void setPartitionId(int partitionId) {
+ this.partitionId = partitionId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof Partition)) {
+ return false;
+ }
+
+ Partition partition = (Partition) o;
+
+ if (partitionId != partition.partitionId) {
+ return false;
+ }
+ if (!system.equals(partition.system)) {
+ return false;
+ }
+ return stream.equals(partition.stream);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = system.hashCode();
+ result = 31 * result + stream.hashCode();
+ result = 31 * result + partitionId;
+ return result;
+ }
+}
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/model/Task.java b/samza-rest/src/main/java/org/apache/samza/rest/model/Task.java
new file mode 100644
index 0000000..f1225ec
--- /dev/null
+++ b/samza-rest/src/main/java/org/apache/samza/rest/model/Task.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.model;
+
+import java.util.List;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+
+/**
+ * Provides a client view of a samza task.
+ * Includes the preferred preferredHost, taskName, containerId, containerId and list of partitions.
+ *
+ */
+public class Task {
+
+ // preferred host of the task
+ private String preferredHost;
+
+ // name of the task
+ private String taskName;
+
+ // containerId of the samza container in which the task is running
+ private int containerId;
+
+ // list of partitions that belong to the task.
+ private List partitions;
+
+ // list of stores that are associated with the task.
+ private List storeNames;
+
+ public Task() {
+ }
+
+ public Task(@JsonProperty("preferredHost") String preferredHost,
+ @JsonProperty("taskName") String taskName,
+ @JsonProperty("containerId") int containerId,
+ @JsonProperty("partitions") List partitions,
+ @JsonProperty("storeNames") List storeNames) {
+ this.preferredHost = preferredHost;
+ this.taskName = taskName;
+ this.containerId = containerId;
+ this.partitions = partitions;
+ this.storeNames = storeNames;
+ }
+
+ public String getPreferredHost() {
+ return preferredHost;
+ }
+
+ public void setPreferredHost(String preferredHost) {
+ this.preferredHost = preferredHost;
+ }
+
+ public int getContainerId() {
+ return containerId;
+ }
+
+ public void setContainerId(int containerId) {
+ this.containerId = containerId;
+ }
+
+ public List getPartitions() {
+ return partitions;
+ }
+
+ public void setPartitions(List partitions) {
+ this.partitions = partitions;
+ }
+
+ public String getTaskName() {
+ return taskName;
+ }
+
+ public void setTaskName(String taskName) {
+ this.taskName = taskName;
+ }
+
+ public List getStoreNames() {
+ return storeNames;
+ }
+
+ public void setStoreNames(List storeNames) {
+ this.storeNames = storeNames;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof Task)) {
+ return false;
+ }
+
+ Task task = (Task) o;
+
+ if (containerId != task.containerId) {
+ return false;
+ }
+ if (!preferredHost.equals(task.preferredHost)) {
+ return false;
+ }
+ if (!taskName.equals(task.taskName)) {
+ return false;
+ }
+ if (!partitions.equals(task.partitions)) {
+ return false;
+ }
+ return storeNames.equals(task.storeNames);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = preferredHost.hashCode();
+ result = 31 * result + taskName.hashCode();
+ result = 31 * result + containerId;
+ result = 31 * result + partitions.hashCode();
+ result = 31 * result + storeNames.hashCode();
+ return result;
+ }
+}
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java
index 4d8647f..492385f 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java
@@ -24,8 +24,6 @@ import java.util.Collection;
import java.util.List;
import java.util.Set;
import org.apache.samza.SamzaException;
-import org.apache.samza.config.ConfigFactory;
-import org.apache.samza.config.factories.PropertiesConfigFactory;
import org.apache.samza.rest.model.Job;
import org.apache.samza.rest.model.JobStatus;
import org.apache.samza.rest.resources.JobsResourceConfig;
@@ -33,7 +31,6 @@ import org.apache.samza.util.ClassLoaderHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
* Implements a subset of the {@link JobProxy} interface with the default, cluster-agnostic,
* implementations. Subclasses are expected to override these default methods where necessary.
@@ -53,7 +50,7 @@ public abstract class AbstractJobProxy implements JobProxy {
String jobProxyFactory = config.getJobProxyFactory();
if (jobProxyFactory != null && !jobProxyFactory.isEmpty()) {
try {
- JobProxyFactory factory = ClassLoaderHelper.fromClassName(jobProxyFactory);
+ JobProxyFactory factory = ClassLoaderHelper.fromClassName(jobProxyFactory);
return factory.getJobProxy(config);
} catch (Exception e) {
throw new SamzaException(e);
@@ -110,22 +107,6 @@ public abstract class AbstractJobProxy implements JobProxy {
}
/**
- * @return the {@link ConfigFactory} to use to read job configuration files.
- */
- protected ConfigFactory getJobConfigFactory() {
- String configFactoryClassName = config.get(JobsResourceConfig.CONFIG_JOB_CONFIG_FACTORY);
- if (configFactoryClassName == null) {
- configFactoryClassName = PropertiesConfigFactory.class.getCanonicalName();
- log.warn("{} not specified. Defaulting to {}", JobsResourceConfig.CONFIG_JOB_CONFIG_FACTORY, configFactoryClassName);
- }
-
- try {
- return ClassLoaderHelper.fromClassName(configFactoryClassName);
- } catch (Exception e) {
- throw new SamzaException(e);
- }
- }
- /**
* @return the {@link JobStatusProvider} to use in retrieving the job status.
*/
protected abstract JobStatusProvider getJobStatusProvider();
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobProxyFactory.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobProxyFactory.java
index 067711a..d54a6f6 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobProxyFactory.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobProxyFactory.java
@@ -34,7 +34,7 @@ public interface JobProxyFactory {
/**
* Creates a new {@link JobProxy} and initializes it with the specified config.
*
- * @param config the {@link org.apache.samza.rest.SamzaRestConfig} to pass to the proxy.
+ * @param config the {@link JobsResourceConfig} to pass to the proxy.
* @return the created proxy.
*/
JobProxy getJobProxy(JobsResourceConfig config);
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxy.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxy.java
index a935c98..677be1a 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxy.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxy.java
@@ -25,10 +25,10 @@ import org.apache.samza.rest.proxy.installation.InstallationFinder;
import org.apache.samza.rest.proxy.installation.InstallationRecord;
import org.apache.samza.rest.proxy.installation.SimpleInstallationFinder;
import org.apache.samza.rest.resources.JobsResourceConfig;
+import org.apache.samza.util.ClassLoaderHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
* Extends the {@link ScriptJobProxy} with methods specific to simple Samza deployments.
*/
@@ -45,10 +45,10 @@ public class SimpleYarnJobProxy extends ScriptJobProxy {
private final InstallationFinder installFinder;
- public SimpleYarnJobProxy(JobsResourceConfig config) {
+ public SimpleYarnJobProxy(JobsResourceConfig config) throws Exception {
super(config);
-
- installFinder = new SimpleInstallationFinder(config.getInstallationsPath(), getJobConfigFactory());
+ this.installFinder = new SimpleInstallationFinder(config.getInstallationsPath(),
+ ClassLoaderHelper.fromClassName(config.getJobConfigFactory()));
}
@Override
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxyFactory.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxyFactory.java
index 11d93d4..1e8556b 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxyFactory.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxyFactory.java
@@ -18,18 +18,28 @@
*/
package org.apache.samza.rest.proxy.job;
+import org.apache.samza.SamzaException;
import org.apache.samza.rest.resources.JobsResourceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Factory to produce SimpleJobProxy instances.
*
- * See {@link AbstractJobProxy#fromFactory(org.apache.samza.rest.resources.JobsResourceConfig)}
+ * See {@link AbstractJobProxy#fromFactory(JobsResourceConfig)}
*/
public class SimpleYarnJobProxyFactory implements JobProxyFactory {
+ private static final Logger LOG = LoggerFactory.getLogger(SimpleYarnJobProxyFactory.class);
+
@Override
public JobProxy getJobProxy(JobsResourceConfig config) {
- return new SimpleYarnJobProxy(config);
+ try {
+ return new SimpleYarnJobProxy(config);
+ } catch (Exception e) {
+ LOG.error("Exception during instantiation of SimpleYarnJobProxy: ", e);
+ throw new SamzaException(e);
+ }
}
}
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
new file mode 100644
index 0000000..27c88e5
--- /dev/null
+++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.proxy.task;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigFactory;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.coordinator.JobModelManager;
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory;
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.rest.model.Partition;
+import org.apache.samza.rest.model.Task;
+import org.apache.samza.rest.proxy.installation.InstallationFinder;
+import org.apache.samza.rest.proxy.installation.InstallationRecord;
+import org.apache.samza.rest.proxy.job.JobInstance;
+import org.apache.samza.storage.ChangelogPartitionManager;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.util.ClassLoaderHelper;
+import org.apache.samza.util.SystemClock;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+/**
+ * {@link TaskProxy} interface implementation for samza jobs running in yarn execution environment.
+ * getTasks implementation reads the jobModel of the job specified by {@link JobInstance} from coordinator stream.
+ */
+public class SamzaTaskProxy implements TaskProxy {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SamzaTaskProxy.class);
+
+ private static final MetricsRegistryMap METRICS_REGISTRY = new MetricsRegistryMap();
+
+ private static final String SOURCE = "SamzaTaskProxy";
+
+ private final TaskResourceConfig taskResourceConfig;
+
+ private final InstallationFinder installFinder;
+
+ public SamzaTaskProxy(TaskResourceConfig taskResourceConfig, InstallationFinder installFinder) {
+ this.taskResourceConfig = taskResourceConfig;
+ this.installFinder = installFinder;
+ }
+
+ /**
+ * Fetches the complete job model from the coordinator stream based upon the provided {@link JobInstance}
+ * param, transforms it to a list of {@link Task} and returns it.
+ * {@inheritDoc}
+ */
+ @Override
+ public List getTasks(JobInstance jobInstance)
+ throws IOException, InterruptedException {
+ Preconditions.checkArgument(installFinder.isInstalled(jobInstance),
+ String.format("Invalid job instance : %s", jobInstance));
+ JobModel jobModel = getJobModel(jobInstance);
+ StorageConfig storageConfig = new StorageConfig(jobModel.getConfig());
+
+ List storeNames = JavaConverters.seqAsJavaListConverter(storageConfig.getStoreNames()).asJava();
+ Map containerLocality = jobModel.getAllContainerLocality();
+ List tasks = new ArrayList<>();
+ for (ContainerModel containerModel : jobModel.getContainers().values()) {
+ int containerId = containerModel.getContainerId();
+ String host = containerLocality.get(containerId);
+ for (TaskModel taskModel : containerModel.getTasks().values()) {
+ String taskName = taskModel.getTaskName().getTaskName();
+ List partitions = taskModel.getSystemStreamPartitions()
+ .stream()
+ .map(Partition::new).collect(Collectors.toList());
+ tasks.add(new Task(host, taskName, containerId, partitions, storeNames));
+ }
+ }
+ return tasks;
+ }
+
+ /**
+ * Builds coordinator system config for the {@param jobInstance}.
+ * @param jobInstance the job instance to get the jobModel for.
+ * @return the constructed coordinator system config.
+ */
+ private Config getCoordinatorSystemConfig(JobInstance jobInstance) {
+ try {
+ InstallationRecord record = installFinder.getAllInstalledJobs().get(jobInstance);
+ ConfigFactory configFactory = ClassLoaderHelper.fromClassName(taskResourceConfig.getJobConfigFactory());
+ Config config = configFactory.getConfig(new URI(String.format("file://%s", record.getConfigFilePath())));
+ Map configMap = ImmutableMap.of(JobConfig.JOB_ID(), jobInstance.getJobId(),
+ JobConfig.JOB_NAME(), jobInstance.getJobName());
+ return Util.buildCoordinatorStreamConfig(new MapConfig(ImmutableList.of(config, configMap)));
+ } catch (Exception e) {
+ LOG.error(String.format("Failed to get coordinator stream config for job : %s", jobInstance), e);
+ throw new SamzaException(e);
+ }
+ }
+
+ /**
+ * Retrieves the jobModel from the jobCoordinator.
+ * @param jobInstance the job instance (jobId, jobName).
+ * @return the JobModel fetched from the coordinator stream.
+ */
+ protected JobModel getJobModel(JobInstance jobInstance) {
+ CoordinatorStreamSystemConsumer coordinatorSystemConsumer = null;
+ CoordinatorStreamSystemProducer coordinatorSystemProducer = null;
+ try {
+ CoordinatorStreamSystemFactory coordinatorStreamSystemFactory = new CoordinatorStreamSystemFactory();
+ Config coordinatorSystemConfig = getCoordinatorSystemConfig(jobInstance);
+ LOG.info("Using config: {} to create coordinatorStream producer and consumer.", coordinatorSystemConfig);
+ coordinatorSystemConsumer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(coordinatorSystemConfig, METRICS_REGISTRY);
+ coordinatorSystemProducer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(coordinatorSystemConfig, METRICS_REGISTRY);
+ LOG.info("Registering coordinator system stream consumer.");
+ coordinatorSystemConsumer.register();
+ LOG.debug("Starting coordinator system stream consumer.");
+ coordinatorSystemConsumer.start();
+ LOG.debug("Bootstrapping coordinator system stream consumer.");
+ coordinatorSystemConsumer.bootstrap();
+ LOG.info("Registering coordinator system stream producer.");
+ coordinatorSystemProducer.register(SOURCE);
+
+ Config config = coordinatorSystemConsumer.getConfig();
+ LOG.info("Got config from coordinatorSystemConsumer: {}.", config);
+ ChangelogPartitionManager changelogManager = new ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, SOURCE);
+ changelogManager.start();
+ LocalityManager localityManager = new LocalityManager(coordinatorSystemProducer, coordinatorSystemConsumer);
+ localityManager.start();
+ return JobModelManager.readJobModel(config, changelogManager.readChangeLogPartitionMapping(), localityManager,
+ new StreamMetadataCache(JobModelManager.getSystemAdmins(config), 0, SystemClock.instance()), null);
+ } finally {
+ if (coordinatorSystemConsumer != null) {
+ coordinatorSystemConsumer.stop();
+ }
+ if (coordinatorSystemProducer != null) {
+ coordinatorSystemProducer.stop();
+ }
+ }
+ }
+}
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxyFactory.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxyFactory.java
new file mode 100644
index 0000000..7bcac7d
--- /dev/null
+++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxyFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.proxy.task;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang.StringUtils;
+import org.apache.samza.SamzaException;
+import org.apache.samza.rest.proxy.installation.InstallationFinder;
+import org.apache.samza.rest.proxy.installation.SimpleInstallationFinder;
+import org.apache.samza.rest.resources.BaseResourceConfig;
+import org.apache.samza.util.ClassLoaderHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Creates the {@link TaskProxy} instances.
+ */
+public class SamzaTaskProxyFactory implements TaskProxyFactory {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SamzaTaskProxyFactory.class);
+
+ @Override
+ public TaskProxy getTaskProxy(TaskResourceConfig config) {
+ String installationsPath = config.getInstallationsPath();
+ Preconditions.checkArgument(StringUtils.isNotEmpty(installationsPath),
+ String.format("Config param %s is not defined.", BaseResourceConfig.CONFIG_JOB_INSTALLATIONS_PATH));
+ String configFactoryClass = config.getJobConfigFactory();
+ try {
+ InstallationFinder installFinder = new SimpleInstallationFinder(installationsPath,
+ ClassLoaderHelper.fromClassName(configFactoryClass));
+ return new SamzaTaskProxy(config, installFinder);
+ } catch (Exception e) {
+ LOG.error(String.format("Exception during instantiation through configFactory class: %s.", configFactoryClass), e);
+ throw new SamzaException(e);
+ }
+ }
+}
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/TaskProxy.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/TaskProxy.java
new file mode 100644
index 0000000..54da8c9
--- /dev/null
+++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/TaskProxy.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.proxy.task;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.samza.rest.model.Task;
+import org.apache.samza.rest.proxy.job.JobInstance;
+import org.apache.samza.rest.proxy.job.JobProxy;
+import org.apache.samza.rest.resources.JobsResourceConfig;
+
+
+/**
+ * TaskProxy is the primary abstraction that will be used by Rest API's to interact with tasks.
+ */
+public interface TaskProxy {
+
+ /**
+ * @param jobInstance the job instance to get the tasks for.
+ * @return a list of all the {@link Task} tasks that belongs to the {@link JobInstance}.
+ * Each task will have a preferred host and stream partitions assigned to it by
+ * the samza job coordinator.
+ * @throws IOException if there was a problem executing the command to get the tasks.
+ * @throws InterruptedException if the thread was interrupted while waiting for the result.
+ */
+ List getTasks(JobInstance jobInstance)
+ throws IOException, InterruptedException;
+}
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/TaskProxyFactory.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/TaskProxyFactory.java
new file mode 100644
index 0000000..5e9957f
--- /dev/null
+++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/TaskProxyFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.proxy.task;
+
+import org.apache.samza.config.Config;
+
+
+/**
+ * Factory interface that will be used to create {@link TaskProxy}
+ * instances.
+ *
+ * To use a custom {@link TaskProxy}, create an implementation of this interface
+ * and instantiate the custom proxy in the getTaskProxy method. Set
+ * the config {@link TaskResourceConfig#CONFIG_TASK_PROXY_FACTORY}
+ * value to the appropriate factory implementation class.
+ */
+public interface TaskProxyFactory {
+
+ /**
+ *
+ * @param config the {@link Config} to pass to the proxy.
+ * @return the created proxy.
+ */
+ TaskProxy getTaskProxy(TaskResourceConfig config);
+}
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/TaskResourceConfig.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/TaskResourceConfig.java
new file mode 100644
index 0000000..40cf706
--- /dev/null
+++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/TaskResourceConfig.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.proxy.task;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.rest.resources.BaseResourceConfig;
+import org.apache.samza.rest.resources.TasksResource;
+
+
+/**
+ * Configurations for the {@link TasksResource} endpoint.
+ */
+public class TaskResourceConfig extends BaseResourceConfig {
+
+ /**
+ * Specifies the canonical name of the {@link TaskProxyFactory} class to produce
+ * {@link TaskProxy} instances.
+ *
+ * To use your own proxy, implement the factory and specify the class for this config.
+ */
+ public static final String CONFIG_TASK_PROXY_FACTORY = "task.proxy.factory.class";
+
+ public TaskResourceConfig(Config config) {
+ super(config);
+ }
+
+ /**
+ * @see TaskResourceConfig#CONFIG_TASK_PROXY_FACTORY
+ * @return the canonical name of the {@link TaskProxyFactory} class to produce {@link TaskProxy} instances.
+ */
+ public String getTaskProxyFactory() {
+ return get(CONFIG_TASK_PROXY_FACTORY);
+ }
+}
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/resources/BaseResourceConfig.java b/samza-rest/src/main/java/org/apache/samza/rest/resources/BaseResourceConfig.java
new file mode 100644
index 0000000..eca8fdc
--- /dev/null
+++ b/samza-rest/src/main/java/org/apache/samza/rest/resources/BaseResourceConfig.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.resources;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.factories.PropertiesConfigFactory;
+import org.apache.samza.rest.proxy.installation.InstallationRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class contains the common configurations that are
+ * shared between different samza-rest resources.
+ */
+public class BaseResourceConfig extends MapConfig {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BaseResourceConfig.class);
+
+ /**
+ * Specifies the canonical name of the {@link org.apache.samza.config.ConfigFactory} to read the job configs.
+ */
+ public static final String CONFIG_JOB_CONFIG_FACTORY = "job.config.factory.class";
+
+ /**
+ * The path where all the Samza jobs are installed (unzipped). Each subdirectory of this path
+ * is expected to be a Samza job installation and corresponds to one {@link InstallationRecord}.
+ */
+ public static final String CONFIG_JOB_INSTALLATIONS_PATH = "job.installations.path";
+
+
+ public BaseResourceConfig(Config config) {
+ super(config);
+ }
+
+ /**
+ * @see BaseResourceConfig#CONFIG_JOB_INSTALLATIONS_PATH
+ * @return the path where all the Samza jobs are installed (unzipped).
+ */
+ public String getInstallationsPath() {
+ return sanitizePath(get(CONFIG_JOB_INSTALLATIONS_PATH));
+ }
+
+ /**
+ * @see BaseResourceConfig#CONFIG_JOB_INSTALLATIONS_PATH
+ * @return the config factory class that has to be used to parse job configurations. If the config key
+ * {@link BaseResourceConfig#CONFIG_JOB_INSTALLATIONS_PATH} is not defined, then returns the {@link PropertiesConfigFactory} class name.
+ */
+ public String getJobConfigFactory() {
+ String configFactoryClassName = get(CONFIG_JOB_CONFIG_FACTORY);
+ if (configFactoryClassName == null) {
+ configFactoryClassName = PropertiesConfigFactory.class.getCanonicalName();
+ LOG.warn("{} not specified. Defaulting to {}", CONFIG_JOB_CONFIG_FACTORY, configFactoryClassName);
+ }
+ return configFactoryClassName;
+ }
+
+ /**
+ * Ensures a usable file path when the user specifies a tilde for the home path.
+ *
+ * @param rawPath the original path.
+ * @return the updated path with the tilde resolved to home.
+ */
+ private static String sanitizePath(String rawPath) {
+ if (rawPath == null) {
+ return null;
+ }
+ return rawPath.replaceFirst("^~", System.getProperty("user.home"));
+ }
+}
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/resources/DefaultResourceFactory.java b/samza-rest/src/main/java/org/apache/samza/rest/resources/DefaultResourceFactory.java
index e0224c6..0c41f55 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/resources/DefaultResourceFactory.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/resources/DefaultResourceFactory.java
@@ -18,10 +18,10 @@
*/
package org.apache.samza.rest.resources;
-import java.util.Collections;
+import com.google.common.collect.ImmutableList;
import java.util.List;
import org.apache.samza.config.Config;
-import org.apache.samza.rest.SamzaRestConfig;
+import org.apache.samza.rest.proxy.task.TaskResourceConfig;
/**
@@ -30,6 +30,7 @@ import org.apache.samza.rest.SamzaRestConfig;
public class DefaultResourceFactory implements ResourceFactory {
@Override
public List extends Object> getResourceInstances(Config config) {
- return Collections.singletonList(new JobsResource(new JobsResourceConfig(config)));
+ return ImmutableList.of(new JobsResource(new JobsResourceConfig(config)),
+ new TasksResource(new TaskResourceConfig(config)));
}
}
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/resources/JobsResource.java b/samza-rest/src/main/java/org/apache/samza/rest/resources/JobsResource.java
index a566db5..caad56c 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/resources/JobsResource.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/resources/JobsResource.java
@@ -37,7 +37,6 @@ import org.apache.samza.rest.proxy.job.JobProxyFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
* The REST resource for jobs. Handles all requests for the jobs collection
* or individual job instances.
@@ -73,7 +72,7 @@ public class JobsResource {
return Response.ok(jobProxy.getAllJobStatuses()).build();
} catch (Exception e) {
log.error("Error in getInstalledJobs.", e);
- return errorResponse(e.getMessage());
+ return Responses.errorResponse(e.getMessage());
}
}
@@ -105,7 +104,7 @@ public class JobsResource {
return Response.ok(job).build();
} catch (Exception e) {
log.error("Error in getJob.", e);
- return errorResponse(e.getMessage());
+ return Responses.errorResponse(e.getMessage());
}
}
@@ -155,21 +154,10 @@ public class JobsResource {
}
} catch (IllegalArgumentException e) {
log.info(String.format("Illegal arguments updateJobStatus. JobName:%s JobId:%s Status=%s", jobName, jobId, status), e);
- return Response.status(Response.Status.BAD_REQUEST).entity(
- Collections.singletonMap("message", e.getMessage())).build();
+ return Responses.badRequestResponse(e.getMessage());
} catch (Exception e) {
log.error("Error in updateJobStatus.", e);
- return errorResponse(String.format("Error type: %s message: %s", e.toString(), e.getMessage()));
+ return Responses.errorResponse(String.format("Error type: %s message: %s", e.toString(), e.getMessage()));
}
}
-
- /**
- * Constructs a consistent format for error responses. This method should be used for every error case.
- *
- * @param message the error message to report.
- * @return the {@link Response} containing the error message.
- */
- private Response errorResponse(String message) {
- return Response.serverError().entity(Collections.singletonMap("message", message)).build();
- }
}
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/resources/JobsResourceConfig.java b/samza-rest/src/main/java/org/apache/samza/rest/resources/JobsResourceConfig.java
index 527482d..bd52e65 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/resources/JobsResourceConfig.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/resources/JobsResourceConfig.java
@@ -19,8 +19,6 @@
package org.apache.samza.rest.resources;
import org.apache.samza.config.Config;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.rest.proxy.installation.InstallationRecord;
import org.apache.samza.rest.proxy.job.JobProxy;
import org.apache.samza.rest.proxy.job.JobProxyFactory;
@@ -28,7 +26,8 @@ import org.apache.samza.rest.proxy.job.JobProxyFactory;
/**
* Configurations for the {@link JobsResource} endpoint.
*/
-public class JobsResourceConfig extends MapConfig {
+public class JobsResourceConfig extends BaseResourceConfig {
+
/**
* Specifies the canonical name of the {@link JobProxyFactory} class to produce
* {@link JobProxy} instances.
@@ -37,48 +36,15 @@ public class JobsResourceConfig extends MapConfig {
*/
public static final String CONFIG_JOB_PROXY_FACTORY = "job.proxy.factory.class";
- /**
- * The path where all the Samza jobs are installed (unzipped). Each subdirectory of this path
- * is expected to be a Samza job installation and corresponds to one {@link InstallationRecord}.
- */
- public static final String CONFIG_JOB_INSTALLATIONS_PATH = "job.installations.path";
-
- /**
- * Specifies the canonical name of the {@link org.apache.samza.config.ConfigFactory} to read the job configs.
- */
- public static final String CONFIG_JOB_CONFIG_FACTORY = "job.config.factory.class";
-
public JobsResourceConfig(Config config) {
super(config);
}
/**
- * @see JobsResourceConfig#CONFIG_JOB_CONFIG_FACTORY
+ * @see JobsResourceConfig#CONFIG_JOB_PROXY_FACTORY
* @return the canonical name of the {@link JobProxyFactory} class to produce {@link JobProxy} instances.
*/
public String getJobProxyFactory() {
return get(CONFIG_JOB_PROXY_FACTORY);
}
-
- /**
- * @see JobsResourceConfig#CONFIG_JOB_INSTALLATIONS_PATH
- * @return the path where all the Samza jobs are installed (unzipped).
- */
- public String getInstallationsPath() {
- return sanitizePath(get(CONFIG_JOB_INSTALLATIONS_PATH));
- }
-
- /**
- * Ensures a usable file path when the user specifies a tilde for the home path.
- *
- * @param rawPath the original path.
- * @return the updated path with the tilde resolved to home.
- */
- private static String sanitizePath(String rawPath) {
- if (rawPath == null) {
- return null;
- }
- return rawPath.replaceFirst("^~", System.getProperty("user.home"));
- }
-
}
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/resources/Responses.java b/samza-rest/src/main/java/org/apache/samza/rest/resources/Responses.java
new file mode 100644
index 0000000..9194d5a
--- /dev/null
+++ b/samza-rest/src/main/java/org/apache/samza/rest/resources/Responses.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.resources;
+
+import java.util.Collections;
+import javax.ws.rs.core.Response;
+
+/**
+ * This is a helper class that holds the methods that are reusable
+ * across the different samza-rest resource endpoints.
+ */
+public class Responses {
+
+ private Responses() {
+ }
+
+ /**
+ * Constructs a consistent format for error responses.
+ *
+ * @param message the error message to report.
+ * @return the {@link Response} containing the error message.
+ */
+ public static Response errorResponse(String message) {
+ return Response.serverError().entity(Collections.singletonMap("message", message)).build();
+ }
+
+ /**
+ * Constructs a bad request (HTTP 400) response.
+ *
+ * @param message the bad request message to report.
+ * @return the {@link Response} containing the message.
+ */
+ public static Response badRequestResponse(String message) {
+ return Response.status(Response.Status.BAD_REQUEST).entity(Collections.singletonMap("message", message)).build();
+ }
+}
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/resources/TasksResource.java b/samza-rest/src/main/java/org/apache/samza/rest/resources/TasksResource.java
new file mode 100644
index 0000000..301c202
--- /dev/null
+++ b/samza-rest/src/main/java/org/apache/samza/rest/resources/TasksResource.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.resources;
+
+import com.google.common.base.Preconditions;
+import javax.inject.Singleton;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.commons.lang.StringUtils;
+import org.apache.samza.SamzaException;
+import org.apache.samza.rest.model.Task;
+import org.apache.samza.rest.proxy.job.JobInstance;
+import org.apache.samza.rest.proxy.task.TaskProxyFactory;
+import org.apache.samza.rest.proxy.task.SamzaTaskProxy;
+import org.apache.samza.rest.proxy.task.TaskProxy;
+import org.apache.samza.rest.proxy.task.TaskResourceConfig;
+import org.apache.samza.util.ClassLoaderHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The REST resource for tasks. Handles the requests that are at the tasks scope.
+ */
+@Singleton
+@Path("/v1/jobs")
+public class TasksResource {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TasksResource.class);
+
+ private final TaskProxy taskProxy;
+
+ /**
+ * Initializes a TaskResource with {@link TaskProxy} from the
+ * {@link TaskProxyFactory} class specified in the configuration.
+ *
+ * @param config the configuration containing the {@link TaskProxyFactory} class.
+ */
+ public TasksResource(TaskResourceConfig config) {
+ String taskProxyFactory = config.getTaskProxyFactory();
+ Preconditions.checkArgument(StringUtils.isNotEmpty(taskProxyFactory),
+ String.format("Missing config: %s", TaskResourceConfig.CONFIG_TASK_PROXY_FACTORY));
+ try {
+ TaskProxyFactory factory = ClassLoaderHelper.fromClassName(taskProxyFactory);
+ taskProxy = factory.getTaskProxy(config);
+ } catch (Exception e) {
+ LOG.error(String.format("Exception in building TasksResource with config: %s.", config), e);
+ throw new SamzaException(e);
+ }
+ }
+
+ /**
+ * Gets the list of {@link Task} for the job instance specified by jobName and jobId.
+ * @param jobName the name of the job as configured in {@link org.apache.samza.config.JobConfig#JOB_NAME}
+ * @param jobId the id of the job as configured in {@link org.apache.samza.config.JobConfig#JOB_ID}.
+ * @return a {@link javax.ws.rs.core.Response.Status#OK} {@link javax.ws.rs.core.Response}
+ * contains a list of {@link Task}, where each task belongs to
+ * the samza job. {@link javax.ws.rs.core.Response.Status#BAD_REQUEST} is returned for invalid
+ * job instances.
+ */
+ @GET
+ @Path("/{jobName}/{jobId}/tasks")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getTasks(
+ @PathParam("jobName") final String jobName,
+ @PathParam("jobId") final String jobId) {
+ try {
+ return Response.ok(taskProxy.getTasks(new JobInstance(jobName, jobId))).build();
+ } catch (IllegalArgumentException e) {
+ String message = String.format("Invalid arguments for getTasks. jobName: %s, jobId: %s.", jobName, jobId);
+ LOG.error(message, e);
+ return Responses.badRequestResponse(message);
+ } catch (Exception e) {
+ LOG.error(String.format("Error in getTasks with arguments jobName: %s, jobId: %s.", jobName, jobId), e);
+ return Responses.errorResponse(e.getMessage());
+ }
+ }
+}
diff --git a/samza-rest/src/test/java/org/apache/samza/rest/resources/TestJobsResource.java b/samza-rest/src/test/java/org/apache/samza/rest/resources/TestJobsResource.java
index 7db437b..2a051c4 100644
--- a/samza-rest/src/test/java/org/apache/samza/rest/resources/TestJobsResource.java
+++ b/samza-rest/src/test/java/org/apache/samza/rest/resources/TestJobsResource.java
@@ -18,8 +18,8 @@
*/
package org.apache.samza.rest.resources;
+import com.google.common.collect.ImmutableMap;
import java.io.IOException;
-import java.util.HashMap;
import java.util.Map;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.Application;
@@ -31,6 +31,8 @@ import org.apache.samza.rest.SamzaRestConfig;
import org.apache.samza.rest.model.Job;
import org.apache.samza.rest.model.JobStatus;
import org.apache.samza.rest.resources.mock.MockJobProxy;
+import org.apache.samza.rest.resources.mock.MockJobProxyFactory;
+import org.apache.samza.rest.resources.mock.MockResourceFactory;
import org.apache.samza.serializers.model.SamzaObjectMapper;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
@@ -48,10 +50,11 @@ public class TestJobsResource extends JerseyTest {
@Override
protected Application configure() {
- Map map = new HashMap<>();
- map.put(JobsResourceConfig.CONFIG_JOB_PROXY_FACTORY, "org.apache.samza.rest.resources.mock.MockJobProxyFactory");
- map.put(JobsResourceConfig.CONFIG_JOB_INSTALLATIONS_PATH, ".");
- SamzaRestConfig config = new SamzaRestConfig(new MapConfig(map));
+ Map configMap = ImmutableMap.of(JobsResourceConfig.CONFIG_JOB_PROXY_FACTORY,
+ MockJobProxyFactory.class.getName(),
+ SamzaRestConfig.CONFIG_REST_RESOURCE_FACTORIES,
+ MockResourceFactory.class.getName());
+ SamzaRestConfig config = new SamzaRestConfig(new MapConfig(configMap));
return new SamzaRestApplication(config);
}
diff --git a/samza-rest/src/test/java/org/apache/samza/rest/resources/TestTasksResource.java b/samza-rest/src/test/java/org/apache/samza/rest/resources/TestTasksResource.java
new file mode 100644
index 0000000..63a9958
--- /dev/null
+++ b/samza-rest/src/test/java/org/apache/samza/rest/resources/TestTasksResource.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.resources;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.Response;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.rest.SamzaRestApplication;
+import org.apache.samza.rest.SamzaRestConfig;
+import org.apache.samza.rest.model.Partition;
+import org.apache.samza.rest.model.Task;
+import org.apache.samza.rest.proxy.task.TaskResourceConfig;
+import org.apache.samza.rest.resources.mock.MockJobProxy;
+import org.apache.samza.rest.resources.mock.MockResourceFactory;
+import org.apache.samza.rest.resources.mock.MockTaskProxy;
+import org.apache.samza.rest.resources.mock.MockTaskProxyFactory;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+import org.glassfish.jersey.test.JerseyTest;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class TestTasksResource extends JerseyTest {
+ ObjectMapper objectMapper = SamzaObjectMapper.getObjectMapper();
+
+ @Override
+ protected Application configure() {
+ Map configMap = ImmutableMap.of(TaskResourceConfig.CONFIG_TASK_PROXY_FACTORY,
+ MockTaskProxyFactory.class.getName(),
+ SamzaRestConfig.CONFIG_REST_RESOURCE_FACTORIES,
+ MockResourceFactory.class.getName());
+ SamzaRestConfig config = new SamzaRestConfig(new MapConfig(configMap));
+ return new SamzaRestApplication(config);
+ }
+
+ @Test
+ public void testGetTasks()
+ throws IOException {
+ String requestUrl = String.format("v1/jobs/%s/%s/tasks", "testJobName", "testJobId");
+ Response response = target(requestUrl).request().get();
+ assertEquals(200, response.getStatus());
+ Task[] tasks = objectMapper.readValue(response.readEntity(String.class), Task[].class);
+ assertEquals(2, tasks.length);
+ List partitionList = ImmutableList.of(new Partition(MockTaskProxy.SYSTEM_NAME,
+ MockTaskProxy.STREAM_NAME,
+ MockTaskProxy.PARTITION_ID));
+
+ assertEquals(null, tasks[0].getPreferredHost());
+ assertEquals(MockTaskProxy.TASK_1_CONTAINER_ID, tasks[0].getContainerId());
+ assertEquals(MockTaskProxy.TASK_1_NAME, tasks[0].getTaskName());
+ assertEquals(partitionList, tasks[0].getPartitions());
+
+ assertEquals(null, tasks[1].getPreferredHost());
+ assertEquals(MockTaskProxy.TASK_2_CONTAINER_ID, tasks[1].getContainerId());
+ assertEquals(MockTaskProxy.TASK_2_NAME, tasks[1].getTaskName());
+ assertEquals(partitionList, tasks[1].getPartitions());
+ }
+
+ @Test
+ public void testGetTasksWithInvalidJobName()
+ throws IOException {
+ String requestUrl = String.format("v1/jobs/%s/%s/tasks", "BadJobName", MockJobProxy.JOB_INSTANCE_4_ID);
+ Response resp = target(requestUrl).request().get();
+ assertEquals(400, resp.getStatus());
+ final Map errorMessage = objectMapper.readValue(resp.readEntity(String.class), new TypeReference