Encapsulating the ETL tasks in an Oozie workflow
From: Developing big data solutions on Microsoft Azure HDInsight
Having defined the individual tasks that make up the ETL process, the developers decided to encapsulate them in a workflow. This makes it easier to integrate the tasks into the existing telemetry application and to reuse them regularly. The workflow must execute the tasks in the correct order and, where appropriate, wait until each one completes before starting the next.
A workflow defined in Oozie can fulfill these requirements and enable automation of the entire process. Figure 1 shows an overview of the Oozie workflow that the developers implemented.
Figure 1 - The Oozie workflow for the ETL process
Note that, in addition to the tasks described earlier, a new task has been added that drops any existing Hive tables before the data is processed. Because the Hive tables are INTERNAL, dropping them also deletes the underlying data files, which cleans up any data left by previous uploads. This task uses the following HiveQL code.
DROP TABLE IF EXISTS gps;
DROP TABLE IF EXISTS engine;
DROP TABLE IF EXISTS brake;
DROP TABLE IF EXISTS lap;
The workflow includes a fork, enabling the three Pig tasks that filter the individual data files to be executed in parallel. A join is then used to ensure that the next phase of the workflow doesn’t start until all three Pig jobs have finished.
If any of the tasks should fail, the workflow executes the “kill” task. This generates a message containing details of the error, abandons any subsequent tasks, and halts the workflow. As long as there are no errors, the workflow ends after the Sqoop task that loads the data into Azure SQL Database has completed.
Note
When executed, the workflow currently exits with an error. This is due to a fault in Oozie and is not an error in the scripts. For more information, see "A CoordActionUpdateXCommand gets queued for all workflows even if they were not launched by a coordinator."
Defining the workflow
The Oozie workflow shown in Figure 1 is defined in a Hadoop Process Definition Language (hPDL) file named workflow.xml. The file contains a <start> and an <end> element that define where the workflow process starts and ends, and a <kill> element that defines the action to take if an error occurs and the workflow must be halted prematurely.
The following code shows an outline of the workflow definition file with the contents of each action removed for clarity. The <start> element specifies that the first action to execute is the one named DropTables. Each <action> includes an <ok> element with a to attribute specifying the next action (or fork) to be performed, and an <error> element with a to attribute directing the workflow to the <kill> action in the event of a failure. Notice the <fork> and <join> elements that delineate the actions that can be executed in parallel.
<workflow-app xmlns="uri:oozie:workflow:0.2" name="ETLWorkflow">
  <start to="DropTables"/>
  <action name="DropTables">
    ...
    <ok to="CleanseData"/>
    <error to="fail"/>
  </action>
  <fork name="CleanseData">
    <path start="FilterGps" />
    <path start="FilterEngine" />
    <path start="FilterBrake" />
  </fork>
  <action name="FilterGps">
    ...
    <ok to="CombineData"/>
    <error to="fail"/>
  </action>
  <action name="FilterEngine">
    ...
    <ok to="CombineData"/>
    <error to="fail"/>
  </action>
  <action name="FilterBrake">
    ...
    <ok to="CombineData"/>
    <error to="fail"/>
  </action>
  <join name="CombineData" to="CreateTables" />
  <action name="CreateTables">
    ...
    <ok to="LoadLapTable"/>
    <error to="fail"/>
  </action>
  <action name="LoadLapTable">
    ...
    <ok to="TransferData"/>
    <error to="fail"/>
  </action>
  <action name="TransferData">
    ...
    <ok to="end"/>
    <error to="fail"/>
  </action>
  <kill name="fail">
    <message>Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
  </kill>
  <end name="end"/>
</workflow-app>
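The completed workflow.xml file can be checked against the Oozie schema before it is submitted by using the validate command of the Oozie command line client. The following command is a sketch only; it assumes a local copy of the file in the same c:\ETL folder that is used later in this scenario to hold the job.properties file.
REM The c:\ETL path is an assumption; point the command at your local copy of workflow.xml.
oozie validate c:\ETL\workflow.xml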
Each action in the workflow is of a particular type, indicated by the first child element of the <action> element. For example, the following code shows the DropTables action, which uses Hive.
...
<action name="DropTables">
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>default</value>
</property>
<property>
<name>oozie.hive.defaults</name>
<value>hive-default.xml</value>
</property>
</configuration>
<script>droptables.hql</script>
</hive>
<ok to="CleanseData"/>
<error to="fail"/>
</action>
...
The DropTables action references the script droptables.hql, which contains the HiveQL code to drop any existing Hive tables. All the script files are stored in the same folder as the workflow.xml file. This folder also contains files used by the workflow to determine configuration settings for specific execution environments; for example, the hive-default.xml file referenced by all Hive actions contains the environment settings for Hive.
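As an illustration of how this folder might be populated, the following commands copy the files into the workflow application path in the cluster's default storage (the path matches the oozie.wf.application.path setting shown later). This is a sketch only; the engine.pig and brake.pig file names, and the presence of local copies of the files in the current directory, are assumptions rather than details from the scenario.
REM File names engine.pig and brake.pig are assumed for illustration; use the actual script names.
hadoop fs -mkdir /racecar/oozieworkflow
hadoop fs -put workflow.xml droptables.hql createtables.hql loadlaptable.hql hive-default.xml /racecar/oozieworkflow/
hadoop fs -put gps.pig engine.pig brake.pig /racecar/oozieworkflow/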
The FilterGps action, shown in the following code, is a Pig action that references the gps.pig script. This script contains the Pig Latin code to process the GPS data.
...
<action name="FilterGps">
<pig>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<script>gps.pig</script>
</pig>
<ok to="CombineData"/>
<error to="fail"/>
</action>
...
The FilterEngine and FilterBrake actions are similar to the FilterGps action, but specify the appropriate value for the <script> element.
After the three filter actions have completed, the workflow passes through the <join> element and the CreateTables action creates the new internal Hive tables over the filtered data. The LoadLapTable action then combines this data into the lap table. Both are Hive actions, defined as shown in the following code.
...
<action name="CreateTables">
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>default</value>
</property>
<property>
<name>oozie.hive.defaults</name>
<value>hive-default.xml</value>
</property>
</configuration>
<script>createtables.hql</script>
</hive>
<ok to="LoadLapTable"/>
<error to="fail"/>
</action>
<action name="LoadLapTable">
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>default</value>
</property>
<property>
<name>oozie.hive.defaults</name>
<value>hive-default.xml</value>
</property>
</configuration>
<script>loadlaptable.hql</script>
</hive>
<ok to="TransferData"/>
<error to="fail"/>
</action>
...
The final action is the TransferData action. This is a Sqoop action, defined as shown in the following code.
...
<action name="TransferData">
<sqoop xmlns="uri:oozie:sqoop-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>export</arg>
<arg>--connect</arg>
<arg>${connectionString}</arg>
<arg>--table</arg>
<arg>${targetSqlTable}</arg>
<arg>--export-dir</arg>
<arg>${outputDir}</arg>
<arg>--input-fields-terminated-by</arg>
<arg>\t</arg>
<arg>--input-null-non-string</arg>
<arg>\\N</arg>
</sqoop>
<ok to="end"/>
<error to="fail"/>
</action>
...
Several of the values used by the actions in this workflow are parameters that are set in the job configuration and populated when the workflow executes. The syntax ${...} denotes a parameter that is populated at runtime. For example, the TransferData action includes an argument for the connection string to be used when connecting to Azure SQL Database. The value for this argument is passed to the workflow as a parameter named connectionString. When running the Oozie workflow from a command line, the parameter values can be specified in a job.properties file, as shown in the following example.
nameNode=wasbs://container-name@mystore.blob.core.windows.net
jobTracker=jobtrackerhost:9010
queueName=default
oozie.use.system.libpath=true
oozie.wf.application.path=/racecar/oozieworkflow/
outputDir=/racecar/lap/
connectionString=jdbc:sqlserver://server-name.database.windows.net:1433;database=database-name;user=user-name@server-name;password=password;encrypt=true;trustServerCertificate=true;loginTimeout=30;
targetSqlTable=LapData
The ability to abstract settings in a separate file makes the ETL workflow more flexible. It can be easily adapted to handle future changes in the environment, such as a requirement to use alternative folder locations or a different Azure SQL Database instance.
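For example, an individual setting can be overridden at submission time by using the -D option of the Oozie command line client, without editing the job.properties file. The command below is a sketch only; the alternative table name LapDataTest is a hypothetical value used purely for illustration.
REM LapDataTest is a hypothetical target table name used only for illustration.
oozie job -oozie https://localhost:11000/oozie/ -config c:\ETL\job.properties -DtargetSqlTable=LapDataTest -run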
Note
The job.properties file may contain sensitive information such as database connection strings and credentials (as in the example above). This file is uploaded to the cluster and so cannot easily be encrypted. Ensure you properly protect this file when it is stored outside of the cluster, such as on client machines that will initiate the workflow, by applying appropriate file permissions and computer security practices.
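One possible approach on a Windows client is to restrict the file's access control list so that only the account that submits the workflow can read it. The following command is a sketch only; the account name is a placeholder.
REM The account name is a placeholder; grant read access only to the account that submits the workflow.
icacls c:\ETL\job.properties /inheritance:r /grant:r "CONTOSO\etl-operator:R"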
Executing the Oozie workflow interactively
To execute the Oozie workflow, the administrators upload the workflow files and the job.properties file to the HDInsight cluster, and then run the following command on the cluster.
oozie job -oozie https://localhost:11000/oozie/
-config c:\ETL\job.properties
-run
When the Oozie job starts, the command line interface displays the unique ID assigned to the job. The administrators can then view the progress of the job by using the browser on the HDInsight cluster to display the job log at https://localhost:11000/oozie/v0/job/the_unique_job_id?show=log.
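The same information can also be obtained from the Oozie command line client by querying the job ID returned when the job was submitted, as shown in the following sketch.
REM Replace the_unique_job_id with the ID displayed when the job was submitted.
oozie job -oozie https://localhost:11000/oozie/ -info the_unique_job_id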
With the Oozie workflow definition complete, the next stage is to automate its execution. This is described in Automating the ETL workflow.