Sahara’s Elastic Data Processing facility, or EDP, allows the execution of jobs on clusters created from sahara. EDP supports:

- Hive, Pig, MapReduce, MapReduce.Streaming, Java, and Shell job types on Hadoop clusters, and Spark jobs on Spark clusters
- storage of job binaries in swift, in manila, or in the sahara internal database
- access to input and output data sources in swift, HDFS, and manila
- configuration of jobs at submission time
- execution of jobs on existing clusters or on transient clusters
The EDP features can be used from the sahara web UI, which is described in the Sahara (Data Processing) UI User Guide. They can also be used directly by a client through the REST API.
Sahara EDP uses a collection of simple objects to define and execute jobs. These objects are stored in the sahara database when they are created, allowing them to be reused. This modular approach with database persistence allows code and data to be reused across multiple jobs.
The essential components of a job are:

- executable code to run
- input and output data paths, as needed for the job
- any additional files needed for execution
These components are supplied through the objects described below.
A Job Binary object stores a URL to a single script or Jar file and any credentials needed to retrieve the file. The file itself may be stored in the sahara internal database, in swift, or in manila.
Files in the sahara database are stored as raw bytes in a Job Binary Internal object. This object’s sole purpose is to store a file for later retrieval. No extra credentials need to be supplied for files stored internally.
Sahara requires credentials (username and password) to access files stored in swift unless swift proxy users are configured as described in Sahara Advanced Configuration Guide. The swift service must be running in the same OpenStack installation referenced by sahara.
To reference a binary file stored in manila, create the job binary with the URL manila://{share_id}/{path}. This assumes that you have already stored that file in the appropriate path on the share. The share will be automatically mounted to any cluster nodes which require access to the file, if it is not mounted already.
There is a configurable limit on the size of a single job binary that may be retrieved by sahara. This limit defaults to 5MB and may be set with the job_binary_max_KB setting in the sahara.conf configuration file.
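For example, job binaries referencing swift and manila files can be created with the python client. The sketch below assumes python-saharaclient authenticated through a keystone session; the endpoint, credentials, container, and share id are placeholders, and the exact client arguments and credential keys may differ between releases:

from keystoneauth1.identity import v3
from keystoneauth1 import session
from saharaclient import client as sahara_client

# Placeholder keystone credentials and endpoint.
auth = v3.Password(auth_url="http://controller:5000/v3",
                   username="demo", password="secret", project_name="demo",
                   user_domain_id="default", project_domain_id="default")
sahara = sahara_client.Client("1.1", session=session.Session(auth=auth))

# A script or jar stored in swift: credentials are supplied in the "extra" field.
sahara.job_binaries.create(name="wordcount.jar",
                           url="swift://my-container/wordcount.jar",
                           extra={"user": "demo", "password": "secret"})

# A script stored on a manila NFS share: no credentials are needed; the share is
# mounted automatically on any cluster node that needs the file.
sahara.job_binaries.create(name="cleanup.sh",
                           url="manila://SHARE_ID/scripts/cleanup.sh")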
A Job object specifies the type of the job and lists all of the individual Job Binary objects that are required for execution. An individual Job Binary may be referenced by multiple Jobs. A Job object specifies a main binary and/or supporting libraries depending on its type:
Job type              Main binary   Libraries
Hive                  required      optional
Pig                   required      optional
MapReduce             not used      required
MapReduce.Streaming   not used      optional
Java                  not used      required
Shell                 required      optional
Spark                 required      optional
A Data Source object stores a URL which designates the location of input or output data and any credentials needed to access the location.
Sahara supports data sources in swift. The swift service must be running in the same OpenStack installation referenced by sahara.
Sahara also supports data sources in HDFS. Any HDFS instance running on a sahara cluster in the same OpenStack installation is accessible without manual configuration. Other instances of HDFS may be used as well provided that the URL is resolvable from the node executing the job.
Sahara supports data sources in manila as well. To reference a path on an NFS share as a data source, create the data source with the URL manila://{share_id}/{path}. As in the case of job binaries, the specified share will be automatically mounted to your cluster’s nodes as needed to access the data source.
Some job types require the use of data source objects to specify input and output when a job is launched. For example, when running a Pig job the UI will prompt the user for input and output data source objects.
Other job types like Java or Spark do not require the user to specify data sources. For these job types, data paths are passed as arguments. For convenience, sahara allows data source objects to be referenced by name or id. The section Using Data Source References as Arguments gives further details.
Job objects must be launched or executed in order for them to run on the cluster. During job launch, a user specifies execution details including data sources, configuration values, and program arguments. The relevant details will vary by job type. The launch will create a Job Execution object in sahara which is used to monitor and manage the job.
To execute Hadoop jobs, sahara generates an Oozie workflow and submits it to the Oozie server running on the cluster. Familiarity with Oozie is not necessary for using sahara but it may be beneficial to the user. A link to the Oozie web console can be found in the sahara web UI in the cluster details.
For Spark jobs, sahara uses the spark-submit shell script and executes the Spark job from the master node. Logs of Spark jobs run by sahara can be found on the master node under the /tmp/spark-edp directory.
The general workflow for defining and executing a job in sahara is essentially the same whether using the web UI or the REST API:

1. Launch a cluster from sahara if there is not one already available.
2. Create all of the Job Binaries needed to run the job, stored in the sahara database, in swift, or in manila.
3. Create a Job object which references the Job Binaries created in step 2.
4. Create an input Data Source which points to the data you wish to process.
5. Create an output Data Source which points to the location for output data.
6. Launch the job on the chosen cluster, specifying any relevant data sources, configuration values, and program arguments (when using the REST API this is done via the /jobs/<job_id>/execute method).
The workflow is simpler when using existing objects. For example, to construct a new job which uses existing binaries and input data a user may only need to perform steps 3, 5, and 6 above. Of course, to repeat the same job multiple times a user would need only step 6.
Jobs can be configured at launch. The job type determines the kinds of values that may be set:
Job type              Configuration Values   Parameters   Arguments
Hive                  Yes                    Yes          No
Pig                   Yes                    Yes          Yes
MapReduce             Yes                    No           No
MapReduce.Streaming   Yes                    No           No
Java                  Yes                    No           Yes
Shell                 Yes                    Yes          Yes
Spark                 Yes                    No           Yes
These values can be set on the Configure tab during job launch through the web UI or through the job_configs parameter when using the /jobs/<job_id>/execute REST method.
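For illustration, a REST launch might look like the following sketch. The endpoint form, port, token handling, and the cluster/data source id fields are assumptions about a typical installation and should be checked against the API reference for your release; the job_configs structure matches the configs/params/args layout used throughout this section:

import json
import requests

SAHARA_URL = "http://controller:8386/v1.1/PROJECT_ID"   # assumed endpoint form
TOKEN = "KEYSTONE_TOKEN"                                 # placeholder auth token
JOB_ID = "JOB_TEMPLATE_UUID"                             # placeholder job id

payload = {
    # The cluster and data source id fields are assumptions about the request body.
    "cluster_id": "CLUSTER_UUID",
    "input_id": "INPUT_DATA_SOURCE_UUID",
    "output_id": "OUTPUT_DATA_SOURCE_UUID",
    # job_configs carries the configuration values, parameters, and arguments.
    "job_configs": {
        "configs": {"mapred.map.tasks": "3"},
        "params": {},
        "args": []
    }
}

response = requests.post("{}/jobs/{}/execute".format(SAHARA_URL, JOB_ID),
                         headers={"X-Auth-Token": TOKEN,
                                  "Content-Type": "application/json"},
                         data=json.dumps(payload))
print(response.status_code, response.json())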
In some cases sahara generates configuration values or parameters automatically. Values set explicitly by the user during launch will override those generated by sahara.
Sometimes it’s necessary or desirable to pass a data path as an argument to a job. In these cases, a user may simply type out the path as an argument when launching a job. If the path requires credentials, the user can manually add the credentials as configuration values. However, if a data source object has been created that contains the desired path and credentials there is no need to specify this information manually.
As a convenience, sahara allows data source objects to be referenced by name or id in arguments, configuration values, or parameters. When the job is executed, sahara will replace the reference with the path stored in the data source object and will add any necessary credentials to the job configuration. Referencing an existing data source object is much faster than adding this information by hand. This is particularly useful for job types like Java or Spark that do not use data source objects directly.
There are two job configuration parameters that enable data source references. They may be used with any job type and are set on the Configuration tab when the job is launched:
edp.substitute_data_source_for_name (default False) If set to True, causes sahara to look for data source object name references in configuration values, arguments, and parameters when a job is launched. Name references have the form datasource://name_of_the_object.
For example, assume a user has a WordCount application that takes an input path as an argument. If there is a data source object named my_input, a user may simply set the edp.substitute_data_source_for_name configuration parameter to True and add datasource://my_input as an argument when launching the job.
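A sketch of the resulting launch configuration, with placeholder output path, might be:

job_configs = {
    "configs": {
        # Ask sahara to resolve datasource://... name references in the arguments.
        "edp.substitute_data_source_for_name": True
    },
    "args": [
        "datasource://my_input",                      # replaced by the stored path
        "hdfs://namenode:8020/user/demo/wc-output"    # literal path, passed as-is
    ]
}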
edp.substitute_data_source_for_uuid (default False) If set to True, causes sahara to look for data source object ids in configuration values, arguments, and parameters when a job is launched. A data source object id is a uuid, so it is unique. The id of a data source object is available through the UI or the sahara command line client. A user may simply use the id as a value.
In order to better document your job for cluster operators (or for yourself in the future), sahara allows the addition of an interface (or method signature) to your job template. A sample interface for the Teragen Hadoop example might be:
Name            Mapping Type   Location           Value Type    Required   Default
Example Class   args           0                  string        false      teragen
Rows            args           1                  number        true       unset
Output Path     args           2                  data_source   false      hdfs://ip:port/path
Mapper Count    configs        mapred.map.tasks   number        false      unset
A “Description” field may also be added to each interface argument.
To create such an interface via the REST API, provide an “interface” argument, the value of which consists of a list of JSON objects, as below:
[
{
"name": "Example Class",
"description": "Indicates which example job class should be used."
"mapping_type": "args",
"location": "0",
"value_type": "string",
"required": false,
"default": "teragen"
},
# Other interface arguments, as JSON objects, may follow here
]
Creating this interface would allow you to specify a configuration for any execution of the job template by passing an “interface” map similar to:
{
"Rows": "1000000",
"Mapper Count": "3",
"Output Path": "hdfs://mycluster:8020/user/myuser/teragen-output"
}
The specified arguments would be automatically placed into the args, configs, and params for the job, according to the mapping type and location fields of each interface argument. The final job_configs map would be:
{
"job_configs": {
"configs": {"mapred.map.tasks": "3"},
"args" ["teragen", "1000000", "hdfs://mycluster:8020/user/myuser/teragen-output"]
}
}
Rules for specifying an interface are as follows:

- Mapping Type must be one of configs, params, or args, and only the mapping types supported by the job type may be used (see the table above).
- Location must be a string naming the configuration value or parameter for configs and params, and an integer position (starting from 0) for args.
- Value Type must be one of string, number, or data_source; data sources may be passed by uuid or as valid paths.
- Each argument name must be unique within the interface, and any Default value must be consistent with the declared value type.
The additional one-time complexity of specifying an interface on your template allows a simpler repeated execution path, and also allows us to generate a customized form for your job in the Horizon UI. This may be particularly useful in cases in which an operator who is not a data processing job developer will be running and administering the jobs.
If swift proxy users are not configured (see Sahara Advanced Configuration Guide) and a job is run with data source objects containing swift paths, sahara will automatically generate swift username and password configuration values based on the credentials in the data sources. If the input and output data sources are both in swift, it is expected that they specify the same credentials.
The swift credentials may be set explicitly with the following configuration values:
- fs.swift.service.sahara.username
- fs.swift.service.sahara.password
Setting the swift credentials explicitly is required when passing literal swift paths as arguments instead of using data source references. When possible, use data source references as described in Using Data Source References as Arguments.
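For illustration, a launch configuration that passes literal swift paths might set the credentials like this; the username, password, container, and paths are placeholders:

job_configs = {
    "configs": {
        "fs.swift.service.sahara.username": "demo",
        "fs.swift.service.sahara.password": "secret"
    },
    "args": [
        "swift://my-container/input",
        "swift://my-container/output"
    ]
}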
Sahara will automatically generate values for the INPUT and OUTPUT parameters required by Hive based on the specified data sources.
Sahara will automatically generate values for the INPUT and OUTPUT parameters required by Pig based on the specified data sources.
For Pig jobs, arguments should be thought of as command line arguments separated by spaces and passed to the pig shell.
Parameters are a shorthand; each parameter is actually translated to the arguments -param name=value.
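For example, the following illustrative launch values (the parameter name and value are invented for the example) would add -param FILTER_YEAR=2016 and a literal -verbose argument to the pig command line:

job_configs = {
    "params": {"FILTER_YEAR": "2016"},   # becomes the arguments: -param FILTER_YEAR=2016
    "args": ["-verbose"]                 # passed to the pig shell verbatim
}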
Important!
If the job type is MapReduce, the mapper and reducer classes must be specified as configuration values.
Note that the UI will not prompt the user for these required values; they must be added manually with the Configure tab.
Make sure to add these values with the correct names:
Name                  Example Value
mapred.mapper.class   org.apache.oozie.example.SampleMapper
mapred.reducer.class  org.apache.oozie.example.SampleReducer
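When launching with the python client or through the REST API, these values go into the configs section of job_configs, for example (class names taken from the table above):

job_configs = {
    "configs": {
        "mapred.mapper.class": "org.apache.oozie.example.SampleMapper",
        "mapred.reducer.class": "org.apache.oozie.example.SampleReducer"
    }
}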
Important!
If the job type is MapReduce.Streaming, the streaming mapper and reducer classes must be specified.
In this case, the UI will prompt the user to enter mapper and reducer values on the form and will take care of adding them to the job configuration with the appropriate names. If using the python client, however, be certain to add these values to the job configuration manually with the correct names:
Name                    Example Value
edp.streaming.mapper    /bin/cat
edp.streaming.reducer   /usr/bin/wc
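With the python client, the corresponding job_configs entry would look like the following sketch, using the example values above:

job_configs = {
    "configs": {
        "edp.streaming.mapper": "/bin/cat",
        "edp.streaming.reducer": "/usr/bin/wc"
    }
}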
Data Source objects are not used directly with Java job types. Instead, any input or output paths must be specified as arguments at job launch either explicitly or by reference as described in Using Data Source References as Arguments. Using data source references is the recommended way to pass paths to Java jobs.
If configuration values are specified, they must be added to the job’s Hadoop configuration at runtime. There are two methods of doing this. The simplest way is to use the edp.java.adapt_for_oozie option described below. The other method is to load the values explicitly in the job’s own code, as the edp-wordcount example described below does.
The following special configuration values are read by sahara and affect how Java jobs are run:
edp.java.main_class (required) Specifies the full name of the class containing main(String[] args)
A Java job will execute the main method of the specified main class. Any arguments set during job launch will be passed to the program through the args array.
oozie.libpath (optional) Specifies the path to the Oozie share libs; these libs can be shared by different workflows
edp.java.java_opts (optional) Specifies configuration values for the JVM
edp.java.adapt_for_oozie (optional) Specifies that sahara should perform special handling of configuration values and exit conditions. The default is False.
If this configuration value is set to True, sahara will modify the job’s Hadoop configuration before invoking the specified main method. Any configuration values specified during job launch (excluding those beginning with edp.) will be automatically set in the job’s Hadoop configuration and will be available through standard methods.
Secondly, setting this option to True ensures that Oozie will handle program exit conditions correctly.
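Putting these together, an illustrative launch configuration for a Java job might look like the sketch below; the class name, data source name, and output path are placeholders:

job_configs = {
    "configs": {
        "edp.java.main_class": "org.example.WordCount",     # hypothetical main class
        "edp.java.adapt_for_oozie": True,                    # let Oozie handle configs and exit conditions
        "edp.java.java_opts": "-Xmx512m",
        "edp.substitute_data_source_for_name": True          # enable datasource:// references
    },
    "args": [
        "datasource://wordcount_input",                      # resolved by sahara at launch
        "hdfs://namenode:8020/user/demo/wordcount-output"    # literal output path
    ]
}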
At this time, the following special configuration value only applies when running jobs on a cluster generated by the Cloudera plugin with the Enable Hbase Common Lib cluster config set to True (the default value):

edp.hbase_common_lib (optional) Specifies that a common HBase lib generated by sahara in HDFS be added to oozie.libpath. This is used for running HBase applications on the cluster. The default is False.
The edp-wordcount example bundled with sahara shows how to use configuration values, arguments, and swift data paths in a Java job type. Note that the example does not use the edp.java.adapt_for_oozie option but includes the code to load the configuration values explicitly.
A shell job will execute the script specified as main, and will place any files specified as libs in the same working directory (on both the filesystem and in HDFS). Command line arguments may be passed to the script through the args array, and any params values will be passed as environment variables.
Data Source objects are not used directly with Shell job types but data source references may be used as described in Using Data Source References as Arguments.
The edp-shell example bundled with sahara contains a script which will output the executing user to a file specified by the first command line argument.
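An illustrative launch configuration for such a shell job follows; the output file path and parameter name are placeholders. The script sees the argument in its command line and the parameter as an environment variable:

job_configs = {
    "args": ["/tmp/edp-shell-output.txt"],    # first command line argument to the script
    "params": {"EXTRA_MESSAGE": "hello"}      # exported into the script's environment
}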
Data Source objects are not used directly with Spark job types. Instead, any input or output paths must be specified as arguments at job launch either explicitly or by reference as described in Using Data Source References as Arguments. Using data source references is the recommended way to pass paths to Spark jobs.
Spark jobs use some special configuration values:
edp.java.main_class (required) Specifies the full name of the class containing the Java or Scala main method, that is main(String[] args) for Java or main(args: Array[String]) for Scala
A Spark job will execute the main method of the specified main class. Any arguments set during job launch will be passed to the program through the args array.
edp.spark.adapt_for_swift (optional) If set to True, instructs sahara to modify the job’s Hadoop configuration so that swift paths may be accessed. Without this configuration value, swift paths will not be accessible to Spark jobs. The default is False.
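An illustrative launch configuration for a Spark job follows; the main class shown is the Pi estimator from the standard Spark examples jar, used here only as an example:

job_configs = {
    "configs": {
        "edp.java.main_class": "org.apache.spark.examples.SparkPi",
        "edp.spark.adapt_for_swift": False    # set to True only if swift paths are used
    },
    "args": ["10"]    # passed to main(); here, the number of sample partitions
}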
The edp-spark example bundled with sahara contains a Spark program for estimating Pi.
Sahara uses custom URLs to refer to objects stored in swift, in manila, or in the sahara internal database. These URLs are not meant to be used outside of sahara.
Sahara swift URLs passed to running jobs as input or output sources include a ".sahara" suffix on the container, for example:
swift://container.sahara/object
You may notice these swift URLs in job logs; however, you do not need to add the suffix to the containers yourself. Sahara will add the suffix if necessary, so when using the UI or the python client you may write the above URL simply as:
swift://container/object
Sahara internal database URLs have the form:
internal-db://sahara-generated-uuid
This indicates a file object in the sahara database which has the given uuid as a key.
Manila NFS filesystem reference URLs take the form:
manila://share-uuid/path
This format should be used when referring to a job binary or a data source stored in a manila NFS share.
The OpenStack installation and the cluster launched from sahara must meet the following minimum requirements in order for EDP to function:
When a Hadoop job is executed, binaries are first uploaded to a cluster node and then moved from the node local filesystem to HDFS. Therefore, there must be an instance of HDFS available to the nodes in the sahara cluster.
If the swift service is not running in the OpenStack installation:
- Job binaries may only be stored in the sahara internal database
- Data sources require a long-running HDFS
If the swift service is running in the OpenStack installation:
- Job binaries may be stored in swift or the sahara internal database
- Data sources may be in swift or a long-running HDFS
Requirements for EDP support depend on the EDP job type and the plugin used for the cluster. For example, a Vanilla sahara cluster must run at least one instance of each process needed by the job type, such as the HDFS processes and the Oozie server for Hadoop job types.
There are several things in EDP which require attention in order to work properly. They are described below.
EDP allows running jobs on transient clusters. In this case the cluster is created specifically for the job and is shut down automatically once the job is finished.
Two configuration parameters control how sahara handles transient clusters once their jobs complete:
- periodic_enable - if set to ‘False’, sahara will do nothing to a transient cluster once the job it was created for is completed. If it is set to ‘True’, then the behaviour depends on the value of the next parameter.
- use_identity_api_v3 - set it to ‘False’ if your OpenStack installation does not provide keystone API v3. In that case sahara will not terminate unneeded clusters. Instead it will set their state to ‘AwaitingTermination’, meaning that they can be deleted manually by a user. If the parameter is set to ‘True’, sahara will terminate the cluster itself. The limitation is caused by the lack of the ‘trusts’ feature in Keystone API versions older than v3.
If both parameters are set to ‘True’, sahara works with transient clusters in the following manner:
- When a user requests that a job be executed on a transient cluster, sahara creates such a cluster.
- Sahara drops the user’s credentials once the cluster is created, but before doing so it creates a trust that allows it to operate with the cluster instances in the future without the user’s credentials.
- Once the cluster is no longer needed, sahara terminates its instances using the stored trust. Sahara drops the trust after that.