Automating the ETL workflow

patterns & practices Developer Center

From: Developing big data solutions on Microsoft Azure HDInsight

With the workflow tasks encapsulated in an Oozie workflow, the developers can automate the process by using the classes in the .NET API for Hadoop to upload the workflow files and initiate the Oozie job.

Uploading the workflow files

To upload the workflow files, the developers used the same WebHDFSClient class that was previously used to upload the serialized data files. The workflow files are stored in a folder named OozieWorkflow in the execution folder of the console application, and the following code in the UploadWorkflowFiles method uploads them to Azure storage.

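// Local folder that holds the workflow files, relative to the executable.
var workflowLocalDir = new DirectoryInfo(@".\OozieWorkflow");

var hdfsClient = new WebHDFSClient(hdInsightUser,
    new BlobStorageAdapter(storageName, storageKey, containerName, false));

// Remove any previous copy of the workflow from Azure blob storage.
await hdfsClient.DeleteDirectory(workflowDir);

// Upload each file. The destination is built by simple concatenation,
// so workflowDir must end with "/".
foreach (var file in workflowLocalDir.GetFiles())
{
  await hdfsClient.CreateFile(file.FullName, workflowDir + file.Name);
}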

Notice that the code begins by deleting the workflow directory if it already exists in Azure blob storage, and then uploads each file from the local OozieWorkflow folder.
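The upload loop builds each destination path by concatenating workflowDir and the file name, so the result is only valid when workflowDir ends with a slash. The following is a minimal sketch of a helper that makes the join explicit. RemotePath and Combine are hypothetical names introduced here for illustration and are not part of the .NET API for Hadoop; the folder and file names are placeholders.

```csharp
using System;

// Hypothetical helper (not part of the .NET API for Hadoop): joins a remote
// folder and a file name with exactly one forward slash between them.
public static class RemotePath
{
    public static string Combine(string folder, string fileName)
    {
        if (string.IsNullOrEmpty(folder))
            throw new ArgumentException("Folder is required.", nameof(folder));
        if (string.IsNullOrEmpty(fileName))
            throw new ArgumentException("File name is required.", nameof(fileName));

        // Strip any slashes at the join point, then add a single one back.
        return folder.TrimEnd('/') + "/" + fileName.TrimStart('/');
    }
}

public static class Program
{
    public static void Main()
    {
        // Both calls produce the same destination path.
        Console.WriteLine(RemotePath.Combine("/etl/workflow", "workflow.xml"));
        Console.WriteLine(RemotePath.Combine("/etl/workflow/", "workflow.xml"));
    }
}
```

With a helper like this, the destination argument in the upload loop could be written as RemotePath.Combine(workflowDir, file.Name), which behaves the same whether or not the configured directory has a trailing slash.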

Initiating the Oozie job

To initiate the Oozie job, the developers added using statements to reference the Microsoft.Hadoop.WebClient.OozieClient, Microsoft.Hadoop.WebClient.OozieClient.Contracts, and Newtonsoft.Json namespaces, and then added the following code to the application.

var hdInsightUser = ConfigurationManager.AppSettings["HDInsightUser"];
var hdInsightPassword = ConfigurationManager.AppSettings["HDInsightPassword"];
var storageName = ConfigurationManager.AppSettings["StorageName"];
var containerName = ConfigurationManager.AppSettings["ContainerName"];
var nameNodeHost = "wasbs://" + containerName + "@" + storageName + ".blob.core.windows.net";
var workflowDir = ConfigurationManager.AppSettings["WorkflowDir"];
// inputDir is passed to OozieJobProperties below; the "InputDir" key name is an assumption.
var inputDir = ConfigurationManager.AppSettings["InputDir"];
var outputDir = ConfigurationManager.AppSettings["OutputDir"];
var sqlConnectionString = ConfigurationManager.AppSettings["SqlConnectionString"];
var targetSqlTable = ConfigurationManager.AppSettings["TargetSqlTable"];
var clusterName = ConfigurationManager.AppSettings["ClusterName"];
var clusterAddress = "https://" + clusterName + ".azurehdinsight.net";
var clusterUri = new Uri(clusterAddress);

// Create an Oozie job and execute it.
Console.WriteLine("Starting Oozie workflow...");
var client = new OozieHttpClient(clusterUri, hdInsightUser, hdInsightPassword);

var prop = new OozieJobProperties(hdInsightUser, nameNodeHost, "jobtrackerhost:9010",
                                  workflowDir, inputDir, outputDir);

var parameters = prop.ToDictionary();
parameters.Add("oozie.use.system.libpath", "true");
parameters.Add("exportDir", outputDir);
parameters.Add("targetSqlTable", targetSqlTable);
parameters.Add("connectionString", sqlConnectionString);

// Submit the job; the JSON response body contains the ID assigned to it.
var newJob = await client.SubmitJob(parameters);
var content = await newJob.Content.ReadAsStringAsync();
var serializer = new JsonSerializer();
dynamic json = serializer.Deserialize(new JsonTextReader(new StringReader(content)));
string id = json.id;

// Start the submitted job.
await client.StartJob(id);
Console.WriteLine("Oozie job started");
Console.WriteLine("View workflow progress at " + clusterAddress + "/oozie/v0/job/" + id + "?show=log");

This code retrieves the parameters for the Oozie job from the App.Config file for the application, submits the job to the HDInsight cluster, reads the job ID from the JSON response, and then starts the job. The application then displays a message such as:

View workflow progress at https://mycluster.azurehdinsight.net/oozie/v0/job/*job_id*?show=log

Users can then browse to the URL indicated by the application to view the progress of the Oozie job as it performs the ETL workflow tasks.
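For reference, the appSettings section that the code reads from might look something like the following sketch. The key names match those passed to ConfigurationManager.AppSettings in the code, except for InputDir, which is an assumed name for the setting behind the inputDir value that the code passes to OozieJobProperties. Every value shown is a placeholder, not a setting from the original application.

```xml
<?xml version="1.0" encoding="utf-8"?>
<configuration>
  <appSettings>
    <!-- All values below are placeholders. -->
    <add key="HDInsightUser" value="your-cluster-user" />
    <add key="HDInsightPassword" value="your-cluster-password" />
    <add key="StorageName" value="your-storage-account" />
    <add key="ContainerName" value="your-container" />
    <add key="ClusterName" value="mycluster" />
    <add key="WorkflowDir" value="/etl/workflow/" />
    <!-- Assumed key name; see the note above. -->
    <add key="InputDir" value="/etl/input" />
    <add key="OutputDir" value="/etl/output" />
    <add key="SqlConnectionString" value="your-sql-database-connection-string" />
    <add key="TargetSqlTable" value="your-target-table" />
  </appSettings>
</configuration>
```

Keeping these values in configuration rather than in code means the same executable can target a different cluster, storage container, or SQL table without being recompiled.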

The final stage is to explore how the data in SQL Database can be used. An example is shown in Analyzing the loaded data.
