Initiating an Oozie workflow from a .NET application

From: Developing big data solutions on Microsoft Azure HDInsight

When an application needs to perform complex data processing as a sequence of dependent actions, you can define an Oozie workflow to coordinate the data processing tasks and initiate it from a .NET client application.

The Microsoft .NET API for Hadoop WebClient NuGet package is part of the .NET SDK for HDInsight, and includes the OozieHttpClient class. You can use this class to connect to the Oozie application on an HDInsight cluster and initiate an Oozie workflow job. The following example code shows a .NET client application that uploads the workflow files required by the Oozie workflow, and then initiates an Oozie workflow job that uses these files. The example is deliberately kept simple by including the credentials in the code so that you can copy and paste it while you are experimenting with HDInsight. In a production system you must protect credentials, as described in “Securing credentials in scripts and applications” in the Security section of this guide.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.IO;
using Microsoft.Hadoop.WebHDFS;
using Microsoft.Hadoop.WebHDFS.Adapters;
using Microsoft.Hadoop.WebClient;
using Microsoft.Hadoop.WebClient.OozieClient;
using Microsoft.Hadoop.WebClient.OozieClient.Contracts;
using Newtonsoft.Json;

namespace OozieClient
{
  class Program
  {
    const string hdInsightUser = "user-name";
    const string hdInsightPassword = "password";
    const string hdInsightCluster = "cluster-name";
    const string azureStore = "storage-account-name";
    const string azureStoreKey = "storage-account-key";
    const string azureStoreContainer = "container-name";
    const string workflowDir = "/data/oozieworkflow/";
    const string inputPath = "/data/source/";
    const string outputPath = "/data/output/";
    static void Main(string[] args)
    {
      try
      {
        UploadWorkflowFiles().Wait();
        CreateAndExecuteOozieJob().Wait();
      }
      catch (Exception ex)
      {
        Console.WriteLine(ex.Message);
      }
      finally
      {
        Console.WriteLine("Press a key to end");
        Console.Read();
      }
    }

    private static async Task UploadWorkflowFiles()
    {
      try
      {
        var workflowLocalDir = new DirectoryInfo(@".\oozieworkflow");
        var hdfsClient = new WebHDFSClient(hdInsightUser,
            new BlobStorageAdapter(azureStore, azureStoreKey, azureStoreContainer, false));
        Console.WriteLine("Uploading workflow files...");
        // Remove any previous copy of the workflow folder before uploading the files.
        await hdfsClient.DeleteDirectory(workflowDir);
        foreach (var file in workflowLocalDir.GetFiles())
        {
          await hdfsClient.CreateFile(file.FullName, workflowDir + file.Name);
        }
      }
      catch (Exception ex)
      {
        Console.WriteLine("An error occurred while uploading files");
        throw;
      }
    }

    private static async Task CreateAndExecuteOozieJob()
    {
      try
      {
        var nameNodeHost = "wasb://" + azureStoreContainer + "@" + azureStore 
                         + ".blob.core.windows.net";
        var tableName = "MyTable";
        var tableFolder = "/Data/MyTable";
        var clusterAddress = "https://" + hdInsightCluster + ".azurehdinsight.net";
        var clusterUri = new Uri(clusterAddress);

        // Create an Oozie job and execute it.
        Console.WriteLine("Starting oozie workflow...");
        var client = new OozieHttpClient(clusterUri, hdInsightUser, hdInsightPassword);

        var prop = new OozieJobProperties(
                            hdInsightUser,
                            nameNodeHost,
                            "jobtrackerhost:9010",
                            workflowDir,
                            inputPath,
                            outputPath);

        var parameters = prop.ToDictionary();
        parameters.Add("oozie.use.system.libpath", "true");
        parameters.Add("TableName", tableName);
        parameters.Add("TableFolder", tableFolder);

        // Submit the job definition, then read the generated job ID from the JSON response.
        var newJob = await client.SubmitJob(parameters);
        var content = await newJob.Content.ReadAsStringAsync();
        var serializer = new JsonSerializer();
        dynamic json = serializer.Deserialize(new JsonTextReader(new StringReader(content)));
        string id = json.id;
        await client.StartJob(id);
        Console.WriteLine("Oozie job started");
        Console.WriteLine("View workflow progress at " + clusterAddress 
                        + "/oozie/v0/job/" + id + "?show=log");
      }
      catch (Exception ex)
      {
        Console.WriteLine("An error occurred while initiating the Oozie workflow job");
        throw;
      }
    }
  }
}
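
As noted above, the credentials and storage key are hard-coded only so that the example is easy to copy and experiment with. One simple improvement, sketched below, is to read these values from environment variables at run time rather than compiling them into the application (replacing the const fields with static readonly fields). The environment variable names shown here are hypothetical; the "Securing credentials in scripts and applications" topic in the Security section of this guide describes more robust approaches.

// A minimal sketch, assuming the settings are supplied as environment variables.
// Replace the corresponding const fields in the Program class with fields like these;
// the environment variable names are hypothetical.
static readonly string hdInsightUser = Environment.GetEnvironmentVariable("HDINSIGHT_USER");
static readonly string hdInsightPassword = Environment.GetEnvironmentVariable("HDINSIGHT_PASSWORD");
static readonly string azureStoreKey = Environment.GetEnvironmentVariable("STORAGE_ACCOUNT_KEY");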

Notice that an OozieJobProperties object contains the properties to be used by the workflow. These properties include configuration settings for Oozie as well as any parameters that are required by the actions defined in the Oozie job. In this example the properties include parameters named TableName and TableFolder, which are used by the following action in the workflow.xml file that is uploaded from the local oozieworkflow folder.

<action name='CreateTable'>
  <hive xmlns="uri:oozie:hive-action:0.2">
    <job-tracker>${jobTracker}</job-tracker>
    <name-node>${nameNode}</name-node>
    <configuration>
      <property>
        <name>mapred.job.queue.name</name>
        <value>default</value>
      </property>
      <property>
        <name>oozie.hive.defaults</name>
        <value>hive-default.xml</value>
      </property>
    </configuration>
    <script>CreateTable.q</script>
    <param>TABLE_NAME=${TableName}</param>
    <param>LOCATION=${TableFolder}</param>
  </hive>
  <ok to="end"/>
  <error to="fail"/>
</action>

This action passes the parameter values to the CreateTable.q script, which is also in the oozieworkflow folder and is shown in the following code example.

DROP TABLE IF EXISTS ${TABLE_NAME};

CREATE EXTERNAL TABLE ${TABLE_NAME} (id INT, val STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '
STORED AS TEXTFILE LOCATION '${LOCATION}';
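
After the job has been started, a client application will often need to wait for the workflow to complete. One option is to poll the same Oozie web services endpoint that the example prints for viewing the log, requesting the job information instead of the log. The following sketch illustrates this approach. It assumes the clusterAddress and id values from the example above, that the method is added to the same Program class (so that it can use the cluster credentials), and that the JSON returned by the ?show=info request includes a status field (such as PREP, RUNNING, SUCCEEDED, or KILLED) as described in the Oozie web services API documentation. The method name WaitForWorkflowCompletion is hypothetical.

// A minimal sketch: poll the Oozie web services API until the job leaves the
// PREP and RUNNING states. Requires a reference to the System.Net.Http assembly.
private static async Task WaitForWorkflowCompletion(string clusterAddress, string id)
{
  using (var httpClient = new System.Net.Http.HttpClient())
  {
    // The HDInsight gateway uses basic authentication over HTTPS.
    var credentials = Convert.ToBase64String(
        Encoding.UTF8.GetBytes(hdInsightUser + ":" + hdInsightPassword));
    httpClient.DefaultRequestHeaders.Authorization =
        new System.Net.Http.Headers.AuthenticationHeaderValue("Basic", credentials);

    var status = "PREP";
    while (status == "PREP" || status == "RUNNING")
    {
      // Wait between polls to avoid flooding the cluster with requests.
      await Task.Delay(TimeSpan.FromSeconds(30));
      var json = await httpClient.GetStringAsync(
          clusterAddress + "/oozie/v0/job/" + id + "?show=info");
      dynamic info = JsonConvert.DeserializeObject(json);
      status = (string)info.status;
      Console.WriteLine("Workflow status: " + status);
    }
    Console.WriteLine("Workflow finished with status " + status);
  }
}

For example, a call such as await WaitForWorkflowCompletion(clusterAddress, id); could be added at the end of the CreateAndExecuteOozieJob method, after the existing call to StartJob.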
