Connect(); 2016

Volume 31 Number 12

[Connect(); Intelligent Apps]

Extensibility in U-SQL Big Data Applications

By Michael Rys; 2016

The traditional focus on addressing the big Vs of Big Data (volume, velocity and variety) has mainly concentrated on providing a scalable platform to process the volume of data, on adding near-real-time processing capabilities and on offering the ability to process a variety of input data formats, from CSV and JSON to custom binary formats. One variety that’s often been somewhat of an afterthought is the variety associated with custom data processing: not only the variety of formats, but also the ability to easily extend your analysis with custom algorithms while preserving the declarative nature of the query language experience.

Some modern Big Data processing and query languages are starting to address this. In particular, U-SQL was designed from the ground up to combine the declarative power of a SQL-based language with the flexibility of using your existing code libraries and developing new custom algorithms.

In a previous article (bit.ly/1OtXM2K), I introduced U-SQL and showed how using the Microsoft .NET Framework type system together with the C#-based expression language in U-SQL makes it seamless to extend your analysis with custom code expressions. I explained how to use C# assemblies to define user-defined functions (UDFs) and use them in your U-SQL query scripts.

U-SQL not only allows you to add your own custom C# functions, it also provides a framework in which you can add your own user-defined operators (UDOs), such as extractors, outputters and rowset operators (processors, appliers, reducers and custom combiners). The framework consists of two parts:

  1. .NET interfaces that provide the contract for you to build these operators in such a way that you can concentrate on your code, leaving the scaled-out execution to U-SQL; a minimal skeleton of one such operator follows this list. Note that the actual business-logic code doesn’t have to be implemented in .NET, as I’ll show later.
  2. U-SQL expressions such as EXTRACT and REDUCE that invoke the custom operators and execute them at scale on your data.
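
To make the shape of that contract concrete, here’s a minimal sketch (not one of the samples used later) of a row-level processor UDO. The class name and column names are hypothetical placeholders; such an operator would be invoked from a U-SQL PROCESS expression with PRODUCE and USING clauses.

using Microsoft.Analytics.Interfaces;

namespace UdoSketches
{
  // Hypothetical processor that normalizes one column per row.
  [SqlUserDefinedProcessor]
  public class NormalizeProcessor : IProcessor
  {
    public override IRow Process(IRow input, IUpdatableRow output)
    {
      // Column names are illustrative; they must match the input schema
      // and the PROCESS expression's PRODUCE clause.
      string author = input.Get<string>("author");
      output.Set<string>("author", (author ?? "").Trim().ToLowerInvariant());
      output.Set<string>("tweet", input.Get<string>("tweet"));
      return output.AsReadOnly();
    }
  }
}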

In this article, I’ll build on the previous article and show how you can use the U-SQL extensibility mechanisms to process a variety of different data ranging from JSON to image data. I’ll also show how to add your own operators.

Managing Your Custom Code in U-SQL

Before I start with some of the examples, let’s better understand how U-SQL can use your custom code.

As mentioned, U-SQL follows C# with its scalar expression language, which is used in places such as U-SQL predicates and the expressions in a SELECT clause. For your custom code to become visible to the U-SQL compiler, it must be packaged into a .NET assembly that is referenced by the U-SQL script. To be able to reference the assembly, it must previously have been registered in the U-SQL metadata service using a CREATE ASSEMBLY statement.

Registering and Referencing U-SQL Assemblies I suggest using the Azure Data Lake Tools for Visual Studio (aka.ms/adltoolsvs), which make it easy to build and register assemblies that work with U-SQL. If you write your custom code in a “Class Library (For U-SQL Application)” project (see Figure 1), you can then write your code and build the project and directly register the generated assembly DLL file with a right-click (see Figure 2).

Figure 1 Class Library (For U-SQL Application) Project

Figure 2 Registering a U-SQL Assembly

Then all you need in your U-SQL script is the REFERENCE ASSEMBLY statement to make the public classes and methods usable in the script, as shown in Figure 3.

Figure 3 Referring to a User-Defined Function from a Custom Assembly

REFERENCE ASSEMBLY master.TweetAnalysis;
USING tweet_fns = TweetAnalysis.Udfs;
@t =
  EXTRACT date string,
          time string,
          author string,
          tweet string
  FROM "/Samples/Data/Tweets/Tweets.csv"
  USING Extractors.Csv();
// Get the mentions from the tweet string
@m =
  SELECT date
       , tweet_fns.get_mentions(tweet) AS mentions
       , author AS mentioned_by
  FROM @t;
...
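
A UDF such as TweetAnalysis.Udfs.get_mentions is simply a public static method in the referenced assembly. Here’s a hedged sketch of what such a function could look like; the actual implementation in the referenced assembly may differ:

using System.Linq;
using Microsoft.Analytics.Types.Sql;

namespace TweetAnalysis
{
  public static class Udfs
  {
    // Sketch only: returns the @-mentions found in a tweet as a SqlArray.
    public static SqlArray<string> get_mentions(string tweet)
    {
      var words = (tweet ?? "").Split(' ');
      return new SqlArray<string>(
        words.Where(w => w.StartsWith("@") && w.Length > 1)
             .Select(w => w.TrimEnd('.', ',', ':', ';', '!', '?')));
    }
  }
}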

Using Existing Code with U-SQL Assemblies Often you’ll want to use existing code libraries or even non-.NET code. To use non-.NET code (for example, a native library or a completely different language runtime such as Python or JavaScript), you must wrap it with a C# interoperability layer that is called from U-SQL and that in turn calls the non-.NET code, marshaling the data between the components and implementing a UDO interface contract. In this case, the non-.NET code artifacts, such as native .dlls or the files of the other runtime, need to be added as additional files. This can be done with the Additional Files option of the assembly registration. These files are automatically deployed to every node when the .NET assembly gets referenced in a script, and they are made available in the .NET assembly’s working directory on that node.
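
As a hedged illustration of such an interoperability layer, the following sketch P/Invokes a hypothetical native library (mynative.dll) that was registered as an additional file and therefore deployed into the assembly’s working directory; the entry-point name and signature are assumptions made for the sake of the example:

using System.Runtime.InteropServices;

namespace NativeInterop
{
  public static class Wrapper
  {
    // Hypothetical native entry point; the native DLL is found because it
    // was deployed next to the managed assembly as an additional file.
    [DllImport("mynative.dll", CallingConvention = CallingConvention.Cdecl)]
    private static extern int score_text(string text);

    // Public UDF that U-SQL expressions can call.
    public static int ScoreText(string text)
    {
      return score_text(text ?? "");
    }
  }
}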

To use existing .NET libraries, you register the existing code libraries as Managed Dependencies of your own assembly or, if it’s a library you can use directly in U-SQL, you register it directly in your U-SQL database. In either case, the script must reference all the .NET assemblies it needs.

I’ll show some examples of these registration options in the remainder of the article, as I discuss some custom code scenarios in which it makes sense to use the extensibility model. These scenarios include: merging overlapping ranges with a custom reducer, processing JSON documents, processing image data and processing spatial data. I’ll discuss each in turn.

Merging Overlapping Ranges with a Custom Reducer

Let’s assume you have a log file that tracks when a user interacts with your service. Furthermore, let’s assume a user can interact with your service in multiple ways (for example, by conducting Bing searches from multiple devices or browser windows). As part of your U-SQL job that prepares the log file for later analysis, you want to merge overlapping ranges.

For example, if the input log file looks like Figure 4, then you want to merge the overlapping ranges for each user into Figure 5.

Figure 4 Log File with Overlapping Time Ranges

Start Time  End Time  User Name
5:00 AM  6:00 AM  ABC
5:00 AM  6:00 AM  XYZ
8:00 AM  9:00 AM  ABC
8:00 AM  10:00 AM  ABC
10:00 AM  2:00 PM  ABC
7:00 AM  11:00 AM  ABC
9:00 AM  11:00 AM  ABC
11:00 AM  11:30 AM  ABC
11:40 PM  11:59 PM  FOO
11:50 PM  0:40 AM  FOO

Figure 5 Log File After Merging Overlapping Time Ranges

Start Time  End Time  User Name
5:00 AM  6:00 AM  ABC
5:00 AM  6:00 AM  XYZ
7:00 AM  2:00 PM  ABC
11:40 PM  0:40 AM  FOO

If you look at the problem, you’ll first notice that you want to define something like a user-defined aggregation to combine the overlapping time intervals. However, because the input data isn’t ordered, you’ll either have to maintain the state for all possible intervals and then merge disjoint intervals as bridging intervals appear, or you’ll have to preorder the intervals for each user name to make merging them easier.

The ordered aggregation is simpler to scale out, but U-SQL doesn’t provide ordered user-defined aggregators (UDAGGs). In addition, UDAGGs normally produce one row per group, while in this case I can have multiple rows per group if the ranges are disjoint.

Luckily, U-SQL provides a scalable UDO called a reducer (bit.ly/2evGsDA) that can aggregate a set of rows based on a grouping key set using custom code.

Let’s first write the U-SQL logic, where ReduceSample.RangeReducer is our user-defined reducer (Reducer UDO) from the RangeReducer assembly, and the log data is located in the file /Samples/Blogs/MRys/Ranges/ranges.txt (bit.ly/2eseZyw), which uses “-” as the column delimiter. Here’s the code:

REFERENCE ASSEMBLY RangeReducer;
@in = EXTRACT start DateTime, end DateTime, user string
FROM "/Samples/Blogs/MRys/Ranges/ranges.txt"
USING Extractors.Text(delimiter:'-');
@r =  REDUCE @in PRESORT start ON user
      PRODUCE start DateTime, end DateTime, user string
      READONLY user
      USING new ReduceSample.RangeReducer();
OUTPUT @r
TO "/temp/result.csv"
USING Outputters.Csv();

The REDUCE expression takes the rowset @in as input, partitions it based on the user column, presorts each partition based on the values in the start column and applies the RangeReducer, producing the same rowset schema on the output. Because the reducer only adjusts the range from start to end, it doesn’t touch the user column, so you mark it as READONLY. This gives the reducer framework permission to pass the data through automatically for that column and, in return, allows the U-SQL query processor to aggressively apply optimizations around read-only columns, such as pushing predicates on a read-only column ahead of the reducer.

To write a reducer, you implement Microsoft.Analytics.Interfaces.IReducer. In this case, because you don’t need to provide any parameters, you only need to override the abstract Reduce method. You can copy the code into a Class Library (For U-SQL Application) project and register it as the assembly RangeReducer, as explained earlier. Figure 6 shows the implementation of the RangeReducer. (Note that normal code indentation practices have been altered in some code samples because of space constraints.)

Figure 6 C# Implementation of the RangeReducer

using Microsoft.Analytics.Interfaces;
using Microsoft.Analytics.Types.Sql;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace ReduceSample
{
  public class RangeReducer : IReducer
  {
    public override IEnumerable<IRow> Reduce(
      IRowset input, IUpdatableRow output)
    {
      // Init aggregation values
      bool first_row_processed = false;
      var begin = DateTime.MaxValue;
      var end = DateTime.MinValue;
      // Requires that the reducer is PRESORTED on begin and
      // READONLY on the reduce key.
      foreach (var row in input.Rows)
      {
        // Initialize the first interval with the first row
        if (!first_row_processed)
        {
          first_row_processed = true; // Mark that the first row was handled
          begin = row.Get<DateTime>("start");
          end = row.Get<DateTime>("end");
          // If the end is just a time and not a date, it can be earlier
          // than the begin, indicating it is on the next day;
          // this lets you fix up the end to the next day in that case
          if (end < begin) { end = end.AddDays(1); }
        }
        else // Handle the remaining rows
        {
          var b = row.Get<DateTime>("start");
          var e = row.Get<DateTime>("end");
          // Fix up the date if end is earlier than begin
          if (e < b) { e = e.AddDays(1); }
          // If begin time is still inside the interval,
          // increase the interval if it is longer
          if (b <= end)
          {
            // If the new end time is later than the current,
            // extend the interval
            if (e > end) { end = e; }
          }
          else // Output the previous interval and start a new one
          {
            output.Set<DateTime>("start", begin);
            output.Set<DateTime>("end", end);
            yield return output.AsReadOnly();
            begin = b; end = e;
          } // if
        } // if
      } // foreach
      // Now output the last interval
      output.Set<DateTime>("start", begin);
      output.Set<DateTime>("end", end);
      yield return output.AsReadOnly();
    } // Reduce
  } // RangeReducer
} // ReduceSample

The U-SQL REDUCE expression will apply the Reduce method once for each distinct partition key, in parallel. The input parameter will thus contain only the rows for a given group, and the implementation can return zero to many rows as output.

Because the PRESORT clause guarantees that the rows arrive ordered, the inner logic can rely on that order, and because the user column is marked as READONLY, that column is passed through automatically and you can write your UDO code more generically, focusing only on the columns you want to transform.

If you now apply the reducer to a large set of data, and some of your users use your system much more frequently than others, you’ll encounter something called data skew, where some users have large partitions and others only small ones. Because the reducer’s contract guarantees that it sees all the data for its partition, all that data must be shuffled to one node and read in one call. Because of this requirement, such data skew in the best case can lead to some partitions taking much longer to process than others, and in the worst case can lead to some reducers running out of the available memory and time resources (a U-SQL vertex will time out after running for about five hours).

If the reducer semantics are associative and commutative and its output schema is the same as its input schema, then a reducer can be marked as recursive, which allows the query engine to split large groups into smaller sub-groups and recursively apply the reducer on these sub-groups to calculate the final result. This recursive application allows the reducer to better balance and parallelize in the presence of data skew. A reducer is marked as recursive by using the property annotation SqlUserDefinedReducer(IsRecursive = true):

namespace ReduceSample
{
  [SqlUserDefinedReducer(IsRecursive = true)]
  public class RangeReducer : IReducer
  {
    public override IEnumerable<IRow> Reduce(
      IRowset input, IUpdatableRow output)
    {
      // Insert the code from Figure 6 here
    } // Reduce
  } // RangeReducer
} // ReduceSample

In our case, the reducer can be marked as recursive to improve scalability and performance, assuming the processing will preserve the sort among the rows in each recursive invocation.

You can find a Visual Studio project for the example on our GitHub repository at bit.ly/2ecLe5B.

Processing JSON Documents

One of the most frequent data formats after comma-separated text files is JSON. Unlike for CSV-type formats, however, U-SQL doesn’t provide a built-in JSON extractor. But the U-SQL community has provided a sample assembly at bit.ly/2d9O4va that offers support for extracting and processing both JSON and XML documents.

This solution uses Newtonsoft’s Json.NET library (bit.ly/2evWJbz) for the heavy JSON lifting and System.XML for the XML processing. The assembly can extract data from a JSON document using the JsonExtractor (bit.ly/2dPARsM), take a JSON document and split it into a SqlMap to allow navigation and decomposition of JSON documents with the JsonTuple function (bit.ly/2e8tSuX) and finally transform a rowset into a JSON-formatted file with the JSONOutputter (bit.ly/2e4uv3W).

Notice the assembly is designed to be a generic JSON processor, which means it doesn’t make any assumption about the JSON document structure and needs to be resilient to the semi-structured nature of JSON, including heterogeneously typed elements (scalar versus structured, different datatypes for the same element, missing elements and so on). If you know your JSON documents adhere to a specific schema, you can possibly create a more efficient JSON extractor.

Unlike in the reducer example earlier, where you write your own assembly that you then deploy, in this case the solution is ready to be used. You can either load the solution from our GitHub repository into Visual Studio and build and deploy it yourself, or you can find the DLLs in the solution’s bin\Debug directory.

As mentioned earlier, because of the non-system dependency, both the Samples.Formats and the Json.NET assemblies must be registered in the U-SQL metadata store (you can select the Newtonsoft assembly as a Managed Dependency when registering the Formats assembly with the Visual Studio tool), and both need to be referenced if you want to process JSON documents. Assuming you’ve installed the JSON assemblies in your U-SQL catalog under the names [Microsoft.Analytics.Samples.Formats] and [Newtonsoft.Json] in the U-SQL database JSONBlog (see Figure 7), you can use them by referencing them at the beginning of your scripts with:

REFERENCE ASSEMBLY JSONBlog.[Newtonsoft.Json];
REFERENCE ASSEMBLY JSONBlog.[Microsoft.Analytics.Samples.Formats];

Figure 7 Registering the Formats Assembly in Visual Studio

The JSON extractor implements the U-SQL IExtractor interface. Because JSON documents need to be fully parsed to make sure they’re well-formed, a file containing a single JSON document will need to be processed in a single extractor vertex. Thus, you indicate that the extractor needs to see the full file content by setting the AtomicFileProcessing property to true (see Figure 8). The extractor can be called with an optional parameter called rowpath that lets you identify the JSON objects that will each be mapped to a row, using a JSONPath expression (bit.ly/1EmvgKO).

Figure 8 The JSON Extractor

[SqlUserDefinedExtractor(AtomicFileProcessing = true)]
public class JsonExtractor : IExtractor
{
  private string rowpath;            
  public JsonExtractor(string rowpath = null)
  {
    this.rowpath = rowpath;
  }
  public override IEnumerable<IRow> Extract(
    IUnstructuredReader input, IUpdatableRow output)
  {
    // Json.NET
    using (var reader = new JsonTextReader(
      new StreamReader(input.BaseStream)))
    {
      // Parse Json
      var root = JToken.ReadFrom(reader);
      // Rows
      // All objects are represented as rows
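      // SelectChildren and JObjectToRow are helper methods defined elsewhere
      // in the sample's JsonExtractor class (not shown in this listing).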
      foreach (JObject o in SelectChildren(root, this.rowpath))
      {
        // All fields are represented as columns
        this.JObjectToRow(o, output);
        yield return output.AsReadOnly();
      }
    }
  }
}

The extractor implementation passes the input stream that the U-SQL extractor framework feeds into it to the Json.NET JsonTextReader. It then uses the rowpath to get the subtrees that are mapped to rows using SelectChildren. Because JSON objects can be heterogeneous, the code returns the generic JObject instead of positional JArray or scalar values.

Note that this extractor loads the whole JSON document into memory. If your document is too big, it could cause an out-of-memory condition. In that case, you’d have to write your own extractor that streams through the document without loading it fully into memory.
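
A hedged sketch of such a streaming variant follows. It assumes the input is a single top-level JSON array of objects with string-typed columns in the EXTRACT schema, uses Json.NET’s JsonTextReader to materialize only one array element at a time, and maps only top-level scalar properties to like-named columns (a simplified stand-in for the sample’s JObjectToRow); the class name is hypothetical:

using System.Collections.Generic;
using System.IO;
using Microsoft.Analytics.Interfaces;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

namespace JsonSketches
{
  [SqlUserDefinedExtractor(AtomicFileProcessing = true)]
  public class StreamingJsonArrayExtractor : IExtractor
  {
    public override IEnumerable<IRow> Extract(
      IUnstructuredReader input, IUpdatableRow output)
    {
      using (var reader = new JsonTextReader(new StreamReader(input.BaseStream)))
      {
        while (reader.Read())
        {
          if (reader.TokenType != JsonToken.StartObject) { continue; }
          // Materialize only the current array element.
          var o = JObject.Load(reader);
          foreach (var p in o.Properties())
          {
            var idx = output.Schema.IndexOf(p.Name);
            // Map only scalar top-level properties to requested columns.
            if (idx != -1 && p.Value is JValue)
            {
              output.Set<string>(idx, (string)p.Value);
            }
          }
          yield return output.AsReadOnly();
        }
      }
    }
  }
}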

Now let’s use the JSON Extractor and the JSON tuple function to parse the complex JSON document from /Samples/Blogs/MRys/JSON/complex.json (bit.ly/2ekwOEQ) provided in Figure 9.

Figure 9 A JSON Example Document

[{
  "person": {
    "personid": 123456,
    "name": "Person 1",
    "addresses": {
      "address": [{
        "addressid": "2",
        "street": "Street 2",
        "postcode": "1234 AB",
        "city": "City 1"
      }, {
        "addressid": "2",
        "street": "Street 2",
        "postcode": "5678 CD",
        "city": "City 2"
      }]
    }
  }
}, {
  "person": {
    "personid": 798,
    "name": "Person 2",
    "addresses": {
      "address": [{
        "addressid": "1",
        "street": "Street 1",
        "postcode": "1234 AB",
        "city": "City 1"
      }, {
        "addressid": "4",
        "street": "Street 7",
        "postcode": "98799",
        "city": "City 3"
      }]
    }
  }
}]

The format is an array of person “objects” (technically objects with a single person key each) that in turn contain some person properties and address objects. The U-SQL script in Figure 10 extracts a row per person/address combination.

Figure 10 U-SQL Script Processing the Example JSON Document from Figure 9

DECLARE @input string = "/Samples/Blogs/MRys/JSON/complex.json";
REFERENCE ASSEMBLY JSONBlog.[Newtonsoft.Json];
REFERENCE ASSEMBLY JSONBlog.[Microsoft.Analytics.Samples.Formats];
USING Microsoft.Analytics.Samples.Formats.Json;
@json =
  EXTRACT personid int,
          name string,
          addresses string
  FROM @input
  USING new JsonExtractor("[*].person");
@person =
  SELECT personid,
         name,
         JsonFunctions.JsonTuple(
           addresses, "address")["address"] AS address_array
  FROM @json;
@addresses =
  SELECT personid,
         name,
         JsonFunctions.JsonTuple(address) AS address
  FROM @person
       CROSS APPLY
         EXPLODE (JsonFunctions.JsonTuple(address_array).Values)
           AS A(address);
@result =
  SELECT personid,
         name,
         address["addressid"]AS addressid,
         address["street"]AS street,
         address["postcode"]AS postcode,
         address["city"]AS city
  FROM @addresses;
OUTPUT @result
TO "/output/json/persons.csv"
USING Outputters.Csv();

Notice the script passes the JSONPath expression [*].person to the extractor, generating a row for each person field in the top-level array. The extractor uses the EXTRACT schema to map the resulting object’s properties into columns. Because the addresses field is itself a nested JSON document, the first invocation of the JsonTuple function creates a map containing the address objects, which are then mapped to one row per address with the CROSS APPLY EXPLODE expression. Finally, all the address properties are projected out from the map data type to give you the rowset shown in Figure 11.

Figure 11 The Rowset Generated by Processing the JSON Document from Figure 9

123456 Person 1 2 Street 2 1234 AB City 1
123456 Person 1 2 Street 2 5678 CD City 2
798 Person 2 1 Street 1 1234 AB City 1
798 Person 2 4 Street 7 98799 City 3

You can find a Visual Studio project of the example and other JSON processing scenarios, including multiple JSON documents inside a file, on our GitHub repository at bit.ly/2dzceLv.

Processing Image Data

In this example, I’m processing some larger unstructured data: images. In particular, I want to process JPEG pictures and extract some of the JPEG EXIF properties, as well as create a thumbnail of the image. Fortunately, .NET provides a variety of image-processing capabilities in the classes of the System.Drawing namespace, so all I need to do is build the U-SQL extension functions and operators and delegate the JPEG processing to those classes.

There are several ways to do this. An initial attempt might load all images as byte arrays into a rowset and then apply individual user-defined functions to extract each of the properties and create the thumbnail, as shown in Figure 12.

Figure 12 Processing Images in U-SQL by Loading Images into Rows

REFERENCE ASSEMBLY Images;
USING Images;
@image_data =
  EXTRACT image_data byte[]  // Max size of row is 4MB!
        , name string
        , format string
  FROM @"/Samples/Data/Images/{name}.{format}"
  USING new ImageExtractor();
// Use UDFs
@image_properties =
  SELECT ImageOps.getImageProperty(image_data, ImageProperties.copyright)
         AS image_copyright,
         ImageOps.getImageProperty(image_data, ImageProperties.equipment_make)
         AS image_equipment_make,
         ImageOps.getImageProperty(image_data, ImageProperties.equipment_model)
         AS image_equipment_model,
         ImageOps.getImageProperty(image_data, ImageProperties.description)
         AS image_description
  FROM @image_data
  WHERE format IN ("JPEG", "jpeg", "jpg", "JPG");
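
For illustration, a UDF such as getImageProperty could be built on System.Drawing roughly as follows. This is a hedged sketch using the standard EXIF/GDI+ property tag IDs; the Images sample assembly’s actual ImageOps and ImageProperties types may be implemented differently:

using System.Drawing;
using System.IO;
using System.Linq;
using System.Text;

namespace ImageSketches
{
  public static class ImageOpsSketch
  {
    // Standard EXIF/GDI+ property tag IDs.
    public const int Description    = 0x010E;
    public const int EquipmentMake  = 0x010F;
    public const int EquipmentModel = 0x0110;
    public const int Copyright      = 0x8298;

    public static string GetImageProperty(byte[] imageData, int propertyId)
    {
      using (var ms = new MemoryStream(imageData))
      using (var image = Image.FromStream(ms))
      {
        if (!image.PropertyIdList.Contains(propertyId)) { return null; }
        var item = image.GetPropertyItem(propertyId);
        // ASCII-typed EXIF values are null-terminated strings.
        return Encoding.ASCII.GetString(item.Value).TrimEnd('\0');
      }
    }
  }
}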

However, this approach has some drawbacks:

  • U-SQL rows can be at most 4MB in size, thus limiting the solution to 4MB-sized images (minus the size of the other columns).
  • Each of the function invocations can add to the memory pressure and requires flowing the byte array through the U-SQL processing.

Therefore, a better approach is to do the property extraction and thumbnail creation directly inside the custom extractor. Figure 13 shows a revised U-SQL script.

Figure 13 Processing Images in U-SQL by Extracting the Features with an Extractor

REFERENCE ASSEMBLY Images;
@image_features =
  EXTRACT copyright string,
          equipment_make string,
          equipment_model string,
          description string,
          thumbnail byte[],
          name string,
          format string
  FROM @"/Samples/Data/Images/{name}.{format}"
  USING new Images.ImageFeatureExtractor(scaleWidth:500, scaleHeight:300);
@image_features =
  SELECT *
  FROM @image_features
  WHERE format IN ("JPEG", "jpeg", "jpg", "JPG");
OUTPUT @image_features
TO @"/output/images/image_features.csv"
USING Outputters.Csv();
@scaled_image =
  SELECT thumbnail
  FROM @image_features
  WHERE name == "GT4";
OUTPUT @scaled_image
TO "/output/images/GT4_thumbnail_2.jpg"
USING new Images.ImageOutputter();

This script extracts the properties and the thumbnail from the images specified by the file-set pattern (bit.ly/2ektTY6) /Samples/Data/Images/{name}.{format}. The SELECT statement then restricts the processing to JPEG files with a predicate on the format column; because the predicate eliminates all non-JPEG files, the optimizer applies the extractor only to the files that satisfy it. The extractor provides the option to specify the thumbnail’s dimensions. The script then outputs the features into a CSV file and uses a simple byte-stream-level outputter to create a thumbnail file for one of the scaled-down images.
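
The byte-stream-level outputter could, for instance, look roughly like the following hedged sketch, which writes a single byte[] column of each row straight to the output stream (the actual Images.ImageOutputter may be implemented differently):

using Microsoft.Analytics.Interfaces;

namespace ImageSketches
{
  // Sketch: writes the bytes of the row's first column straight to the file.
  [SqlUserDefinedOutputter(AtomicFileProcessing = true)]
  public class SimpleImageOutputter : IOutputter
  {
    public override void Output(IRow input, IUnstructuredWriter output)
    {
      var bytes = input.Get<byte[]>(0);
      if (bytes != null)
      {
        output.BaseStream.Write(bytes, 0, bytes.Length);
      }
    }
  }
}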

Figure 14 shows the implementation of the extractor.

Figure 14 The Image Feature Extractor

using Microsoft.Analytics.Interfaces;
using Microsoft.Analytics.Types.Sql;
using System;
using System.Collections.Generic;
using System.Linq;
using System.IO;
using System.Drawing;
using System.Drawing.Imaging;
using System.Drawing.Drawing2D;
namespace Images
{
  public static class UpdatableRowExtensions
  {
    public static void SetColumnIfExists<T>(this IUpdatableRow source
                                           , string colName, T value)
    {
      var colIdx = source.Schema.IndexOf(colName);
      if (colIdx != -1)
      { source.Set<T>(colIdx, value); }
    }
  }
  [SqlUserDefinedExtractor(AtomicFileProcessing = true)]
  public class ImageFeatureExtractor : IExtractor
  {
    private int _scaleWidth, _scaleHeight;
    public ImageFeatureExtractor(int scaleWidth = 150, int scaleHeight = 150)
    { _scaleWidth = scaleWidth; _scaleHeight = scaleHeight; }
    public override IEnumerable<IRow> Extract(IUnstructuredReader input
                                             , IUpdatableRow output)
    {
      byte[] img = ImageOps.GetByteArrayforImage(input.BaseStream);
      using (StreamImage inImage = new StreamImage(img))
      {
        output.SetColumnIfExists("image", img);
        output.SetColumnIfExists("equipment_make",
          inImage.getStreamImageProperty(ImageProperties.equipment_make));
        output.SetColumnIfExists("equipment_model",
          inImage.getStreamImageProperty(ImageProperties.equipment_model));
        output.SetColumnIfExists("description",
          inImage.getStreamImageProperty(ImageProperties.description));
        output.SetColumnIfExists("copyright",
          inImage.getStreamImageProperty(ImageProperties.copyright));
        output.SetColumnIfExists("thumbnail",
          inImage.scaleStreamImageTo(this._scaleWidth, this._scaleHeight));
      }
      yield return output.AsReadOnly();
    }
  }
}

The extractor again needs to see the entire file and operates on input.BaseStream, but it now creates only one image in memory, unlike the script in Figure 12. The extractor also checks which columns are actually requested and, using the extension method SetColumnIfExists, computes the data only for those column names.

For more details, see the Visual Studio project on our GitHub site at bit.ly/2dngXCE.

Processing Spatial Data

In this example, I’m going to show how to use the SQL Server Spatial type assembly Microsoft.SqlServer.Types.dll in U-SQL. In particular, I want to use the spatial library functions in the U-SQL scripts as user-defined functions. As in the case of the JSON extractor discussed earlier, this means you want to register an already existing assembly in U-SQL without having to write your own assembly.

First, you need to download and install the assembly from the SQL Server 2016 feature pack (bit.ly/2dZTw1k). Select the 64-bit version of the installer (ENU\x64\SQLSysClrTypes.msi) to ensure you have the 64-bit version of the libraries.

The installer places the managed assembly Microsoft.SqlServer.Types.dll into C:\Program Files (x86)\Microsoft SQL Server\130\SDK\Assemblies and the native assembly SqlServerSpatial130.dll into \Windows\System32\. Next, upload the assemblies into your Azure Data Lake Store (for example, into a folder called /upload/asm/spatial). Because the installer puts the native library into the system folder c:\Windows\System32, you have to make sure that you either copy SqlServerSpatial130.dll out of that folder before uploading it, or that the tool you use doesn’t perform File System Redirection (bit.ly/1TYm9YZ) on system folders. For example, if you want to upload it with the current Visual Studio ADL File Explorer, you’ll have to copy the file into another directory first; otherwise, as of the time of this writing, you’ll get the 32-bit version uploaded (because Visual Studio is a 32-bit application that performs File System Redirection in its ADL upload file-selection window), and when you run a U-SQL script that calls into the native assembly, you’ll get the following (inner) error at runtime: “Inner exception from user expression: An attempt was made to load a program with an incorrect format. (Exception from HRESULT: 0x8007000B).”

After uploading the two assembly files, register them in a database named SQLSpatial with this script:

DECLARE @ASSEMBLY_PATH string = "/upload/asm/spatial/";
DECLARE @SPATIAL_ASM string = @ASSEMBLY_PATH+"Microsoft.SqlServer.Types.dll";
DECLARE @SPATIAL_NATIVEDLL string = @ASSEMBLY_PATH+"SqlServerSpatial130.dll";
CREATE DATABASE IF NOT EXISTS SQLSpatial;
USE DATABASE SQLSpatial;
DROP ASSEMBLY IF EXISTS SqlSpatial;
CREATE ASSEMBLY SqlSpatial
FROM @SPATIAL_ASM
WITH ADDITIONAL_FILES =
  (
    @SPATIAL_NATIVEDLL
  );

Note that in this case you register only one U-SQL assembly and include the native assembly as an additional file that the U-SQL assembly depends on. To use the spatial functionality, you need only reference the U-SQL assembly; the additional file is automatically made available to it. Figure 15 shows a simple sample script using the spatial assembly.

Figure 15 Using the Spatial Capabilities in U-SQL

REFERENCE SYSTEM ASSEMBLY [System.Xml];
REFERENCE ASSEMBLY SQLSpatial.SqlSpatial;
USING Geometry = Microsoft.SqlServer.Types.SqlGeometry;
USING Geography = Microsoft.SqlServer.Types.SqlGeography;
USING SqlChars = System.Data.SqlTypes.SqlChars;
@spatial =
    SELECT * FROM (VALUES
                   // The following expression is not using the native DLL
                   ( Geometry.Point(1.0,1.0,0).ToString()),
                   // The following expression is using the native DLL
                   ( Geometry.STGeomFromText(
                     new SqlChars("LINESTRING (100 100, 20 180, 180 180)"),
                     0).ToString())
                  ) AS T(geom);
OUTPUT @spatial
TO "/output/spatial.csv"
USING Outputters.Csv();

The SQL Types library has a dependency on the System.Xml assembly, so you need to reference it. Also, some of the methods are using the System.Data.SqlTypes types instead of the built-in C# types. Because System.Data is already included by default, you can simply reference the needed SQL type. The code in Figure 15 is available on our GitHub site at bit.ly/2dMSBm9.

Wrapping Up: Some Tips and Best Practices for UDOs

This article, while only scratching the surface of the powerful extensibility capabilities of U-SQL, has shown how the extensibility mechanism lets you reuse existing domain-specific code while using the U-SQL extension framework to scale the processing out over typical Big Data volumes.

But such a powerful tool can also easily be misused, so here are some tips and best practices.

While custom data formats often need a custom extractor and potentially an outputter, you should consider very carefully whether the data format can be extracted in parallel (as CSV-type formats can) or whether the processing needs to see all the data in a single operator instance. Additionally, making the operators generic enough that work is done only if a specific column is actually requested can also improve performance.

When considering UDOs such as processors, reducers, combiners and appliers, it’s highly recommended to first consider a pure U-SQL solution that leverages built-in operators. For example, the range reducer script discussed earlier could actually be written with some clever use of windowing and ranking functions. Here are some reasons why you still might want to consider UDOs:

  • The logic needs to dynamically access the input or output schema of the rowset that’s being processed; for example, to create a JSON document for the data in the row when the columns aren’t known ahead of time (see the sketch following this list).
  • A solution using several user-defined functions in the SELECT expression creates too much memory pressure and you can write your code to be more memory-efficient in a processor UDO.
  • You need an ordered aggregator or an aggregator that produces more than one row per group, and you can’t write either with windowing functions.
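
As a hedged sketch of the first point, the following processor walks the input schema at runtime and serializes every column into a single JSON string column named json, without knowing the columns ahead of time. The class name is hypothetical, the type handling is simplified, and it assumes the Newtonsoft.Json assembly is registered and referenced:

using Microsoft.Analytics.Interfaces;
using Newtonsoft.Json.Linq;

namespace UdoSketches
{
  [SqlUserDefinedProcessor]
  public class RowToJsonProcessor : IProcessor
  {
    public override IRow Process(IRow input, IUpdatableRow output)
    {
      var o = new JObject();
      // Discover the columns from the input schema instead of hardcoding them.
      for (int i = 0; i < input.Schema.Count; i++)
      {
        var value = input.Get<object>(i);
        o[input.Schema[i].Name] =
          value == null ? JValue.CreateNull() : JToken.FromObject(value);
      }
      output.Set<string>("json", o.ToString(Newtonsoft.Json.Formatting.None));
      return output.AsReadOnly();
    }
  }
}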

When you’re using UDOs, you should always keep the following tips in mind:

  • Use the READONLY clause to allow pushing predicates through UDOs.
  • Use the REQUIRED clause to allow column pruning to be pushed through UDOs.
  • Provide a cardinality hint on the query expression that uses a UDO if the query optimizer chooses the wrong plan.

Michael Rys is a principal program manager at Microsoft. He has been doing data processing and query languages since the 1980s. He has represented Microsoft on the XQuery and SQL design committees and has taken SQL Server beyond relational with XML, Geospatial and Semantic Search. Currently he’s working on Big Data query languages such as SCOPE and U-SQL when he’s not enjoying time with his family underwater or at autocross. Follow him on Twitter: @MikeDoesBigData.

Thanks to the following Microsoft technical experts for reviewing this article: Clemens Szyperski, Ed Triou, Saveen Reddy and Michael Kadaner

Clemens Szyperski is a principal group engineering manager at Microsoft. For decades, his passion has been specialized languages, tools and approaches that ease the construction of complex software systems. He presently leads the Azure Data Lake U-SQL and Scope teams, unless he's away sailing with his family. Follow him on Twitter: @ClemensSzy.

Ed Triou is a principal development lead at Microsoft. For the past 20 years he has been focused on data programmability (ODBC, OLEDB, ADO.NET, JDBC, PHP and EDM), with a specialization in compilers and query languages (IDL, T-SQL, LINQ to SQL/Entities, eSQL, SCOPE and U-SQL). He presently leads the U-SQL compiler and language teams, trying to stay one step ahead of the external and internal businesses that depend daily on ADL and Cosmos at exabyte scale.

Saveen Reddy is a principal program manager at Microsoft focused on designing and building the Azure Data Lake Platform, the components and experiences supporting all of Microsoft’s Big Data cloud services. Saveen holds a 100 percent completion rating for Metal Gear Solid V: The Phantom Pain. Follow him on Twitter: @saveenr.

Michael Kadaner is a principal software engineer at Microsoft. Despite decades of experience in various areas of computer science and software development, he claims that writing programs is a precise art and that software can be bug-free. His true passion is solving complex algorithmic and engineering problems and implementing the solutions in concise and elegant code that is correct by design. He divides his spare time between reading and do-it-yourself projects.