Поделиться через


PROCESS Expression (U-SQL)

Summary

U-SQL provides the ability to write custom rowset processors in C# using the user-defined operator extensibility framework by implementing an IProcessor. For more information, see U-SQL Programmability Guide: User-Defined Processor.

A processor is being invoked with the PROCESS expression that provides the necessary information about the input rowset, the expected result schema as well as additional information that is useful for optimization.

A processor operates on a row at a time and produces zero or one row. If the processing should produce multiple rows per input, an Applier should be used instead.

Some common applications of processors are to normalize nested data into relational form, modify values, or generate new columns using existing columns.

Processors' strengths are in the context of performing complex C# processing where the processing depends on either the input or output schema, or when additional resources need to be accessed. Basically any tasks that are not easily accomplished with user-defined functions in the SELECT clause.

A processor provides limited optimization support, because an optimizer cannot reason about the procedural C# code defining the processor. For example, it cannot push predicates through to earlier statements unless the column used in the predicate is marked as read only. Therefore, it is recommended to instead use user-defined functions and put the logic into the SELECT clause and WHERE clause of a SELECT expression if possible.

Syntax

Process_Expression :=                                                                                    
    'PROCESS' Input_Rowset  
    Produce_Clause  
    [Readonly_Clause]  
    [Required_Clause]                                    
    Using_Clause.  

Remarks

  • Input_Rowset
    Specifies the input rowset that the processor will operate on as either the reference to a rowset name or by a nested rowset expression:

Syntax

  Input_Rowset :=                                                                                     
      Rowset | Rowset_Expression.

with the following semantics:

  • Rowset
    The two simplest rowset sources are a rowset variable such as @rowset that has been defined in a previous statement of the script, or a table that has been created in the account’s catalog:

Syntax

    Rowset :=                                                                                      
        Rowset_Variable | Identifier.

A table can be referenced either with its fully qualified 3-part name, within the current database context with a 2-part name, or within the current database and schema context with a single-part name.

Syntax

    Rowset_Expression :=                                                                           
        '(' Query_Expression ')'
    |   Function_Call
    |   External_Rowset_Expression.
    

The UDO programming model makes both the values and the schema of the input rowset available in the context of the processor's implementation.

  • Produce_Clause
    Specifies the rowset schema returned by the PROCESS expression.

Syntax

  Produce_Clause :=                                                                                   
      'PRODUCE' Column_Definition_List.
  • Column_Definition_List
    This list defines the schema of the processor. The returned columns are defined as a pair of column names and column types:

Syntax

    Column_Definition_List :=                                                                      
        Column_Definition { ',' Column_Definition}.
Column_Definition := Quoted_or_Unquoted_Identifier Built_in_Type.

Each column has an identifier that can be either a quoted or unquoted identifier. A column is typed with one of the U-SQL types that the processor supports.

The UDO programming model makes the specified rowset schema available to the implementation of the processor. An error is raised if the processor is producing a schema that is incompatible with the specified return schema.

  • Readonly_Clause
    The optional READONLY clause can help the UDO programmer to write more efficient code. For more information on how the UDO programmer can take advantage of this hint, see the U-SQL C# Developer’s Guide.

    The optional READONLY clause specifies that either all columns (if specified with *) or the specified columns are read only for the processor and will be passed through to the output using either the same name or the specified column name in parenthesis.

Syntax

  Readonly_Clause :=                                                                                  
      'READONLY' Star_Or_Readonly_Column_List.
Star_Or_Readonly_Column_List := '*' | Readonly_Column_List.
Readonly_Column_List := Readonly_Column { ',' Readonly_Column }.
Readonly_Column := Column_Identifier [Output_Column_Dependency_Alias].
Output_Column_Dependency_Alias := '(' Quoted_or_Unquoted_Identifier ')'.
  • Required_Clause
    The optional REQUIRED clause can help the UDO programmer to write more efficient code. For more information on how the UDO programmer can take advantage of this hint, see the U-SQL C# Developer’s Guide.

    The optional REQUIRED clause specifies that either all columns are required on input for the processor (if specified with *) or the specified columns are required. If a specified column is followed by a list of columns in parenthesis, then the input column is only required if the columns in that list are referenced from the output.

Syntax

  Required_Clause :=                                                                                  
      'REQUIRED' Star_Or_Required_Column_List.
Star_Or_Required_Column_List := '*' | Required_Column_List.
Required_Column_List := Required_Column { ',' Required_Column}.
Required_Column := Column_Identifier [Required_Output_Column_Dependency_List].
Required_Output_Column_Dependency_List := '(' Identifier_List ')'.
  • Using_Clause
    The USING clause specifies which processor should be used to transform the input rowset.

Syntax

  USING_Clause :=                                                                                     
      'USING' udo_expression.

The USING clause takes a C# expression that returns an instance of IProcessor. Users can write their own by implementing an IProcessor (see U-SQL Programmability Guide: User-Defined Processor for more details on how to write your own processor). Most commonly, the UDO expression is either the instantiation of a processor class of the form

USING new MyNameSpace.MyProcessor(parameter:"value")                          

or the invocation of a factory method

USING MyNameSpace.MyProcessorFactory(parameter:"value")                       

where parameter is a parameter of the processor.

Examples

User-Defined Processor - FullAddressProcessor
c# code is placed in the associated Code-Behind .cs file. See usage in next section, below.

using Microsoft.Analytics.Interfaces;
using Microsoft.Analytics.Types.Sql;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;

namespace ReferenceGuide_Examples
{
    [SqlUserDefinedProcessor]
    public class FullAddressProcessor : IProcessor
    {
        public override IRow Process(IRow input, IUpdatableRow output)
        {
            string streetAddress = input.Get<string>("streetAddress");
            string city = input.Get<string>("city");
            string zipCode = input.Get<string>("zipCode");
            string country = input.Get<string>("country");
            string full_address = streetAddress + ", " + city + ", " + zipCode + " " + country;
            output.Set<string>("full_address", full_address);
            output.Set<Guid>("new_guid", Guid.NewGuid());
            return output.AsReadOnly();
        }
    }
}

Using User-Defined Processor - FullAddressProcessor
The processor generates a new column "full_address" by combining streetAddress, city, zipCode, and country. It also generates a new guid. Using Code-Behind from previous section, above.

// Dataset
@employees = 
    SELECT * FROM 
        ( VALUES
        (new Guid("c8fc966a-6144-4054-9170-6f05ff240812"), "Maria Anders", "Obere Str. 57", "Berlin", "12209", "Germany", "cell:030-0074321,office:030-0076545"),
        (new Guid("9499718f-1c21-4b78-84e7-3868a7fab280"), "Thomas Hardy", "120 Hanover Sq.", "London", "WA1 1DP", "UK","cell:(171) 555-7788,office:(171) 555-6750"),
        (new Guid("e1f04df2-b391-4a6c-a66a-9360626f3cdb"), "Elizabeth Lincoln", "23 Tsawassen Blvd.", "Tsawassen BC", "T2F 8M4", "Canada", "cell:(604) 555-4729,office:(604) 555-3745"),
        (new Guid("5609618a-a77a-4230-bae1-aee126b0f0ac"), "Patricio Simpson", "Cerrito 333", "Buenos Aires", "1010", "Argentina", "cell:(1) 135-5555,office:(1) 135-4892"),
        (new Guid("7ee97a9d-b00b-4b47-8846-020c53ec6f24"), "Yang Wang", "Hauptstr. 29", "Bern", "3012", "Switzerland", "cell:0452-076545")
        ) AS T(guid, Driver, streetAddress, city, zipCode, country, phoneNumbers);

// Data as is
OUTPUT @employees
TO "/ReferenceGuide/QSE/PrimaryRowsetExpressions/Process/FullAddressProcessorA.csv"
USING Outputters.Csv();

// Using processor
@results =
     PROCESS @employees
     PRODUCE new_guid Guid,
             Driver,
             full_address string,
             phoneNumbers
     READONLY Driver, phoneNumbers
     REQUIRED streetAddress, city, zipCode, country
     USING new ReferenceGuide_Examples.FullAddressProcessor ();

OUTPUT @results
TO "/ReferenceGuide/QSE/PrimaryRowsetExpressions/Process/FullAddressProcessorB.csv"
USING Outputters.Csv();

User-Defined Processor - CountryName
This is a modified example from Develop U-SQL user-defined operators for Azure Data Lake Analytics jobs. Please review the article for details.
c# code is placed in the associated Code-Behind .cs file. See usage in next section, below.

using Microsoft.Analytics.Interfaces;
using Microsoft.Analytics.Types.Sql;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;

namespace ReferenceGuide_Examples
{
    public class CountryName : IProcessor
    {
        private static IDictionary<string, string> CountryTranslation = new Dictionary<string, string>
        {
            {
                "Deutschland", "Germany"
            },
            {
                "Schwiiz", "Switzerland"
            },
            {
                "UK", "United Kingdom"
            },
            {
                "USA", "United States of America"
            },
            {
                "中国", "PR China"
            }
        };

        public override IRow Process(IRow input, IUpdatableRow output)
        {
            string Country = input.Get<string>("Country");

            if (CountryTranslation.Keys.Contains(Country))
            {
                Country = CountryTranslation[Country];
            }

            output.Set<string>("Country", Country);
            return output.AsReadOnly();
        }
    }
}

Using User-Defined Processor - CountryName
Using Code-Behind from previous section, above.

@drivers = 
    SELECT * FROM 
        ( VALUES
        ("1", "Maria Anders", "Obere Str. 57", "Berlin", "12209", "Germany"),
        ("3", "Antonio Moreno", "Mataderos  2312", "México D.F.", "5023", "Mexico"),
        ("4", "Thomas Hardy", "120 Hanover Sq.", "London", "WA1 1DP", "UK"),
        ("5", "Christina Berglund", "Berguvsvägen  8", "Luleå", "S-958 22", "Sweden"),
        ("8", "Martín Sommer", "C/ Araquil, 67", "Madrid", "28023", "Spain"),
        ("9", "Laurence Lebihan", "12, rue des Bouchers", "Marseille", "13008", "France"),
        ("10", "Elizabeth Lincoln", "23 Tsawassen Blvd.", "Tsawassen", "T2F 8M4", "Canada"),
        ("14", "Yang Wang", "Hauptstr. 29", "Bern", "3012", "Switzerland"),
        ("32", "Howard Snyder", "2732 Baker Blvd.", "Eugene", "97403", "USA"),
        ("92", "邓小平", "", "牌坊村", "", "中国")
        ) AS T(UserID, Name, Address, City, PostalCode, Country);

 @drivers_CountryName =
     PROCESS @drivers
     PRODUCE UserID,
             Name,
             Address,
             City,
             PostalCode,
             Country
     READONLY UserID, Name, Address, City, PostalCode
     REQUIRED Country
     USING new ReferenceGuide_Examples.CountryName();    

OUTPUT @drivers_CountryName
TO "/ReferenceGuide/QSE/PrimaryRowsetExpressions/Process/CountryName.txt"
USING Outputters.Text(Encoding.Unicode);

Processor with ORDER BY and FETCH
The ORDER BY clause with FETCH allows the selection of a limited number of rows based on the specified order. This examples continues to use CountryName defined earlier.

// Same as previous example but only returns top 3 records ordered by Country
 @drivers_CountryName =
     PROCESS @drivers
     PRODUCE UserID,
             Name,
             Address,
             City,
             PostalCode,
             Country
     READONLY UserID, Name, Address, City, PostalCode
     REQUIRED Country
     USING new ReferenceGuide_Examples.CountryName()
     ORDER BY Country ASC FETCH 3 ROWS;    

OUTPUT @drivers_CountryName
TO "/ReferenceGuide/QSE/PrimaryRowsetExpressions/Process/CountryName_fetch3.txt"
USING Outputters.Text(Encoding.Unicode);

See Also