The ETL workflow


From: Developing big data solutions on Microsoft Azure HDInsight

The key tasks that the ETL workflow for the racecar telemetry data must perform are:

  • Serializing the sensor reading objects as files, and uploading them to Azure storage.
  • Filtering the data to remove readings that contain null values, and restructuring it into tabular format.
  • Combining the readings from each sensor into a single table.
  • Loading the combined sensor readings data into the table in Azure SQL Database.

Figure 1 shows this workflow.

Figure 1 - The ETL workflow required to load racecar telemetry data into Azure SQL Database

The team wants to integrate these tasks into the existing console application so that, after a test lap, the telemetry data is loaded into the database for later analysis.

Serializing and uploading the sensor readings

The first challenge in implementing the ETL workflow is to serialize each list of captured sensor reading objects into a file, and upload the files to Azure blob storage. There are numerous serialization formats that can be used to achieve this objective, but the team decided to use the Avro serialization format in order to include both the schema and the data in a single file. This enables a downstream data processing task in HDInsight to successfully read and parse the data, regardless of the programming language used to implement the data processing task.

Serializing the data using Avro

To use Avro serialization, the application developer imported the Microsoft .NET Library for Avro package from NuGet into the solution and added a using statement that references the Microsoft.Hadoop.Avro.Container namespace. With this library in place, the developer can use a FileStream object from the System.IO namespace to write sensor readings to a file in Avro format, applying a compression codec to minimize file size and optimize data load performance. The following code shows how a SequentialWriter instance for the type GpsReading serializes the objects in the GpsReadings list to a file.

string gpsFile = new DirectoryInfo(".") + @"\gps.avro";

using (var buffer = new FileStream(gpsFile, FileMode.Create))
{
  // Serialize a sequence of GpsReading objects to stream.
  // Data will be compressed using Deflate codec.
  using (var w = AvroContainer.CreateWriter<GpsReading>(buffer, Codec.Deflate))
  {
    using (var writer = new SequentialWriter<GpsReading>(w, 24))
    {
      // Serialize the data to stream using the sequential writer.
      GpsReadings.ForEach(writer.Write);
    }
  }
}

Similar code is used to serialize the engine and brake sensor data into files in the bin/debug folder of the solution.
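
For example, a minimal sketch of the engine serialization, assuming an EngineReading class and an EngineReadings list analogous to the GPS types in the console application, would follow the same pattern.

string engineFile = new DirectoryInfo(".") + @"\engine.avro";

using (var buffer = new FileStream(engineFile, FileMode.Create))
{
  // Serialize a sequence of EngineReading objects to the stream,
  // compressed with the Deflate codec as for the GPS data.
  using (var w = AvroContainer.CreateWriter<EngineReading>(buffer, Codec.Deflate))
  {
    using (var writer = new SequentialWriter<EngineReading>(w, 24))
    {
      // EngineReadings is assumed to be a List<EngineReading> captured during the lap.
      EngineReadings.ForEach(writer.Write);
    }
  }
}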

Uploading the files to Azure storage

After the data for each sensor has been serialized to a file, the program must upload the files to the Azure blob storage container used by the HDInsight cluster. To accomplish this, the developer imported the Microsoft .NET API for Hadoop WebClient package and added using statements that reference the Microsoft.Hadoop.WebHDFS and Microsoft.Hadoop.WebHDFS.Adapters namespaces. The developer can then use the WebHDFSClient class to connect to Azure storage and upload the files. The following code shows how this technique is used to upload the file containing the GPS sensor readings.

// Get Azure storage settings from App.Config.
var hdInsightUser = ConfigurationManager.AppSettings["HDInsightUser"];
var storageKey = ConfigurationManager.AppSettings["StorageKey"];
var storageName = ConfigurationManager.AppSettings["StorageName"];
var containerName = ConfigurationManager.AppSettings["ContainerName"];
var destFolder = ConfigurationManager.AppSettings["InputDir"];

// Upload GPS data.
var hdfsClient = new WebHDFSClient(
    hdInsightUser,
    new BlobStorageAdapter(storageName, storageKey, containerName, false));

Console.WriteLine("Uploading GPS data...");
await hdfsClient.CreateFile(gpsFile, destFolder + "gps.avro");

Notice that the settings used by the WebHDFSClient object are retrieved from the App.Config file. These settings include the credentials required to connect to the Azure storage account used by HDInsight, and the path of the folder to which the files should be uploaded. In this scenario the InputDir configuration setting has the value /racecar/source/, so the GPS data file will be saved as /racecar/source/gps.avro.
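
For reference, the corresponding appSettings section of the App.Config file might look something like the following. The key names match those used in the code above; the values shown here, apart from InputDir, are placeholders for the team's own cluster and storage account details.

<appSettings>
  <!-- Placeholder values; replace with the HDInsight cluster and storage account details. -->
  <add key="HDInsightUser" value="cluster-admin-user" />
  <add key="StorageName" value="storage-account-name" />
  <add key="StorageKey" value="storage-account-key" />
  <add key="ContainerName" value="cluster-container-name" />
  <add key="InputDir" value="/racecar/source/" />
</appSettings>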

Filtering and restructuring the data

After the data has been uploaded to Azure storage, HDInsight can be used to process it and load it into Azure SQL Database. In this scenario the first task is to filter the sensor data in the files to remove any readings that contain null values, and then restructure the data into tabular format. In the case of the engine and brake readings the data is already structured as a series of objects, each containing simple properties, so converting these objects to rows with regular columns is relatively straightforward.

However, the GPS data is a list of more complex GpsReading objects in which the Position property is a Location object that has Lat and Lon properties representing latitude and longitude coordinates. The presence of data values that are not easily translated into simple rows and columns led the developers to choose Pig as the appropriate tool to perform the initial processing. The following Pig Latin script was created to process the GPS data.

gps = LOAD '/racecar/source/gps.avro' USING AvroStorage();
gpsfilt = FILTER gps BY (Position IS NOT NULL) AND (Time IS NOT NULL);
gpstable = FOREACH gpsfilt GENERATE Time, FLATTEN(Position), Speed;
STORE gpstable INTO '/racecar/gps';

Note that the Pig Latin script uses the AvroStorage load function to load the data file. This load function enables Pig to read the schema and data from the Avro file, with the result that the script can use the properties of the serialized objects to refer to the data structures in the file. For example, the script filters the data based on the Position and Time properties of the objects that were serialized. The script then uses the FLATTEN function to extract the Lat and Lon values from the Position property, and stores the resulting data (which now consists of regular rows and columns) in the /racecar/gps folder using the default tab-delimited text file format.

Similar Pig Latin scripts named engine.pig and brake.pig were created to process the engine and brake data files.
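
As an illustration, engine.pig might look something like the following. The property names (Time, Revs, and OilTemp) and the /racecar/engine output folder are assumptions based on the columns used later in the Hive scripts; the actual names depend on the engine sensor class in the console application.

engine = LOAD '/racecar/source/engine.avro' USING AvroStorage();
enginefilt = FILTER engine BY (Time IS NOT NULL) AND (Revs IS NOT NULL) AND (OilTemp IS NOT NULL);
enginetable = FOREACH enginefilt GENERATE Time, Revs, OilTemp;
STORE enginetable INTO '/racecar/engine';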

Combining the readings into a single table

The Pig scripts that process the three Avro-format source files restructure the data for each sensor and store it in tab-delimited files. To combine the data in these files the developers decided to use Hive because of the simplicity it provides when querying tabular data structures. The first stage in this process was to create a script that builds Hive tables over the output files generated by Pig. For example, the following HiveQL code defines a table over the filtered GPS data.

CREATE TABLE gps (laptime STRING, lat DOUBLE, lon DOUBLE, speed FLOAT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE LOCATION '/racecar/gps';

Similar code was used for the tables that will hold the engine and brake data.
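
For example, a table over the filtered engine data might be defined as shown below, assuming the corresponding Pig script stored its output in the /racecar/engine folder; the brake table follows the same pattern with a braketemp column.

CREATE TABLE engine (laptime STRING, revs FLOAT, oiltemp FLOAT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE LOCATION '/racecar/engine';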

The script also defines the schema for a table named lap that will store the combined data. The following HiveQL code creates this table over a currently empty folder.

CREATE TABLE lap
(laptime STRING, lat DOUBLE, lon DOUBLE, speed FLOAT, revs FLOAT, oiltemp FLOAT, braketemp FLOAT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE LOCATION '/racecar/lap';

To combine the data from the three sensors and load it into the lap table, the developers used the following HiveQL statement.

FROM gps LEFT OUTER JOIN engine 
  ON (gps.laptime = engine.laptime) LEFT OUTER JOIN brake 
    ON (gps.laptime = brake.laptime)
INSERT INTO TABLE lap
SELECT gps.*, engine.revs, engine.oiltemp, brake.braketemp;

This code joins the data in the three tables based on a common time value (so that each row contains all of the readings for a specific time), and inserts all fields from the gps table, the revs and oiltemp fields from the engine table, and the braketemp field from the brake table, into the lap table.

Loading the combined data to SQL Database

After the ETL process has filtered and combined the data, the final task is to load it into the LapData table in Azure SQL Database. To accomplish this, the developers used Sqoop to export the data from the folder on which the lap Hive table is based to the database. The following command shows an example of how Sqoop can be used to perform this task.

sqoop export --connect "jdbc:sqlserver://abcd1234.database.windows.net:1433;
                       database=MyDatabase;username=MyLogin@abcd1234;
                       password=Pa$$w0rd;logintimeout=30;"
             --table LapData
             --export-dir /racecar/lap
             --input-fields-terminated-by \t
             --input-null-non-string \\N

Now that all of the tasks for the workflow have been defined, they can be combined into a workflow definition. This process is described in Encapsulating the ETL tasks in an Oozie workflow.
