Sdílet prostřednictvím


Průvodce programovatelností U-SQL – UDT a UDAGG

Použití uživatelem definovaných typů: UDT

Uživatelem definované typy (UDT) jsou další funkcí programovatelnosti U-SQL. U-SQL UDT funguje jako běžný uživatelem definovaný typ jazyka C#. C# je jazyk silného typu, který umožňuje použití předdefinovaných a vlastních uživatelsky definovaných typů.

U-SQL nemůže implicitně serializovat nebo deserializovat libovolné UDT, když je UDT předán mezi vrcholy v sadách řádků. To znamená, že uživatel musí poskytnout explicitní formátovací modul pomocí rozhraní IFormatter. To poskytuje U-SQL s metodami serializace a deserializace pro UDT.

Poznámka

Integrované extraktory a výstupní moduly U-SQL v současné době nemohou serializovat nebo deserializovat data UDT do souborů nebo z nich, a to ani se sadou IFormatter. Když tedy zapisujete data UDT do souboru pomocí příkazu OUTPUT nebo je čtete pomocí extraktoru, musíte je předat jako řetězec nebo pole bajtů. Potom explicitně zavoláte serializaci a deserializační kód (to znamená metodu ToString() UDT. Uživatelem definované extraktory a výstupní moduly na druhé straně můžou číst a zapisovat UDT.

Pokud se pokusíme použít UDT v EXTRACTORu nebo OUTPUTTERu (z předchozí funkce SELECT), jak je znázorněno tady:

@rs1 =
    SELECT
        MyNameSpace.Myfunction_Returning_UDT(filed1) AS myfield
    FROM @rs0;

OUTPUT @rs1
    TO @output_file
    USING Outputters.Text();

Zobrazí se následující chyba:

Error	1	E_CSC_USER_INVALIDTYPEINOUTPUTTER: Outputters.Text was used to output column myfield of type
MyNameSpace.Myfunction_Returning_UDT.

Description:

Outputters.Text only supports built-in types.

Resolution:

Implement a custom outputter that knows how to serialize this type, or call a serialization method on the type in
the preceding SELECT.	C:\Users\sergeypu\Documents\Visual Studio 2013\Projects\USQL-Programmability\
USQL-Programmability\Types.usql	52	1	USQL-Programmability

Abychom mohli pracovat s UDT ve výstupním modulu, musíme buď serializovat řetězec pomocí metody ToString(), nebo vytvořit vlastní výstupní modul.

UDT se v současné době v group by nedají použít. Pokud se UDT používá v group BY, vyvolá se následující chyba:

Error	1	E_CSC_USER_INVALIDTYPEINCLAUSE: GROUP BY doesn't support type MyNameSpace.Myfunction_Returning_UDT
for column myfield

Description:

GROUP BY doesn't support UDT or Complex types.

Resolution:

Add a SELECT statement where you can project a scalar column that you want to use with GROUP BY.
C:\Users\sergeypu\Documents\Visual Studio 2013\Projects\USQL-Programmability\USQL-Programmability\Types.usql
62	5	USQL-Programmability

Pokud chceme definovat UDT, musíme:

  1. Přidejte následující obory názvů:
using Microsoft.Analytics.Interfaces
using System.IO;
  1. Přidejte Microsoft.Analytics.Interfaces, což se vyžaduje pro rozhraní UDT. Kromě toho System.IO může být potřeba definovat rozhraní IFormatter.

  2. Definujte použitý typ pomocí atributu SqlUserDefinedType.

SqlUserDefinedType se používá k označení definice typu v sestavení jako uživatelem definovaný typ (UDT) v U-SQL. Vlastnosti atributu odrážejí fyzické charakteristiky UDT. Tuto třídu nelze dědit.

SqlUserDefinedType je požadovaný atribut pro definici UDT.

Konstruktor třídy:

  • SqlUserDefinedTypeAttribute (typ formatter)

  • Typ formatter: Povinný parametr k definování formátovače UDT – konkrétně typ IFormatter rozhraní musí být předán sem.

[SqlUserDefinedType(typeof(MyTypeFormatter))]
public class MyType
{ … }
  • Typické UDT také vyžaduje definici rozhraní IFormatter, jak je znázorněno v následujícím příkladu:
public class MyTypeFormatter : IFormatter<MyType>
{
    public void Serialize(MyType instance, IColumnWriter writer, ISerializationContext context)
    { … }

    public MyType Deserialize(IColumnReader reader, ISerializationContext context)
    { … }
}

Rozhraní IFormatter serializuje a deserializuje graf objektů s kořenovým typem <typeparamref name="T">.

<typeparam name="T">Kořenový typ grafu objektu pro serializaci a deserializaci.

  • Deserializace: De-serializuje data na poskytnutém streamu a rekonstituuje graf objektů.

  • Serializace: Serializuje objekt nebo graf objektů s daným kořenem do zadaného datového proudu.

MyType instance: Instance typu. IColumnWriter writer / IColumnReader reader: Podkladový sloupcový stream. ISerializationContext context: Výčet, který definuje sadu příznaků, které určují zdrojový nebo cílový kontext datového proudu během serializace.

  • Zprostředkující: Určuje, že zdrojový nebo cílový kontext není trvalé úložiště.

  • Trvalost: Určuje, že zdrojový nebo cílový kontext je trvalé úložiště.

Jako běžný typ jazyka C# může definice U-SQL UDT obsahovat přepsání operátorů, jako je +/==/!=. Může také zahrnovat statické metody. Pokud například použijeme tento UDT jako parametr agregační funkce U-SQL MIN, musíme definovat < přepsání operátoru.

Dříve v této příručce jsme si ukázali příklad identifikace fiskálního období od konkrétního data ve formátu Qn:Pn (Q1:P10). Následující příklad ukazuje, jak definovat vlastní typ pro hodnoty fiskálního období.

Následuje příklad oddílu kódu na pozadí s vlastním rozhraním UDT a IFormatter:

[SqlUserDefinedType(typeof(FiscalPeriodFormatter))]
public struct FiscalPeriod
{
    public int Quarter { get; private set; }

    public int Month { get; private set; }

    public FiscalPeriod(int quarter, int month):this()
    {
        this.Quarter = quarter;
        this.Month = month;
    }

    public override bool Equals(object obj)
    {
        if (ReferenceEquals(null, obj))
        {
            return false;
        }

        return obj is FiscalPeriod && Equals((FiscalPeriod)obj);
    }

    public bool Equals(FiscalPeriod other)
    {
return this.Quarter.Equals(other.Quarter) && this.Month.Equals(other.Month);
    }

    public bool GreaterThan(FiscalPeriod other)
    {
return this.Quarter.CompareTo(other.Quarter) > 0 || this.Month.CompareTo(other.Month) > 0;
    }

    public bool LessThan(FiscalPeriod other)
    {
return this.Quarter.CompareTo(other.Quarter) < 0 || this.Month.CompareTo(other.Month) < 0;
    }

    public override int GetHashCode()
    {
        unchecked
        {
            return (this.Quarter.GetHashCode() * 397) ^ this.Month.GetHashCode();
        }
    }

    public static FiscalPeriod operator +(FiscalPeriod c1, FiscalPeriod c2)
    {
return new FiscalPeriod((c1.Quarter + c2.Quarter) > 4 ? (c1.Quarter + c2.Quarter)-4 : (c1.Quarter + c2.Quarter), (c1.Month + c2.Month) > 12 ? (c1.Month + c2.Month) - 12 : (c1.Month + c2.Month));
    }

    public static bool operator ==(FiscalPeriod c1, FiscalPeriod c2)
    {
        return c1.Equals(c2);
    }

    public static bool operator !=(FiscalPeriod c1, FiscalPeriod c2)
    {
        return !c1.Equals(c2);
    }
    public static bool operator >(FiscalPeriod c1, FiscalPeriod c2)
    {
        return c1.GreaterThan(c2);
    }
    public static bool operator <(FiscalPeriod c1, FiscalPeriod c2)
    {
        return c1.LessThan(c2);
    }
    public override string ToString()
    {
        return (String.Format("Q{0}:P{1}", this.Quarter, this.Month));
    }

}

public class FiscalPeriodFormatter : IFormatter<FiscalPeriod>
{
    public void Serialize(FiscalPeriod instance, IColumnWriter writer, ISerializationContext context)
    {
        using (var binaryWriter = new BinaryWriter(writer.BaseStream))
        {
            binaryWriter.Write(instance.Quarter);
            binaryWriter.Write(instance.Month);
            binaryWriter.Flush();
        }
    }

    public FiscalPeriod Deserialize(IColumnReader reader, ISerializationContext context)
    {
        using (var binaryReader = new BinaryReader(reader.BaseStream))
        {
var result = new FiscalPeriod(binaryReader.ReadInt16(), binaryReader.ReadInt16());
            return result;
        }
    }
}

Definovaný typ zahrnuje dvě čísla: čtvrtletí a měsíc. Zde jsou definovány operátory ==/!=/>/< a statická metoda ToString() .

Jak už bylo zmíněno dříve, UDT se dá použít ve výrazech SELECT, ale nedá se použít v OUTPUTTER/EXTRACTOR bez vlastní serializace. Buď musí být serializován jako řetězec s ToString() nebo použit s vlastním OUTPUTTER/EXTRACTOR.

Teď si probereme použití UDT. V části s kódem na pozadí jsme změnili funkci GetFiscalPeriod na následující:

public static FiscalPeriod GetFiscalPeriodWithCustomType(DateTime dt)
{
    int FiscalMonth = 0;
    if (dt.Month < 7)
    {
        FiscalMonth = dt.Month + 6;
    }
    else
    {
        FiscalMonth = dt.Month - 6;
    }

    int FiscalQuarter = 0;
    if (FiscalMonth >= 1 && FiscalMonth <= 3)
    {
        FiscalQuarter = 1;
    }
    if (FiscalMonth >= 4 && FiscalMonth <= 6)
    {
        FiscalQuarter = 2;
    }
    if (FiscalMonth >= 7 && FiscalMonth <= 9)
    {
        FiscalQuarter = 3;
    }
    if (FiscalMonth >= 10 && FiscalMonth <= 12)
    {
        FiscalQuarter = 4;
    }

    return new FiscalPeriod(FiscalQuarter, FiscalMonth);
}

Jak vidíte, vrátí hodnotu našeho typu FiscalPeriod.

Tady uvádíme příklad dalšího použití v základním skriptu U-SQL. Tento příklad ukazuje různé formy volání UDT ze skriptu U-SQL.

DECLARE @input_file string = @"c:\work\cosmos\usql-programmability\input_file.tsv";
DECLARE @output_file string = @"c:\work\cosmos\usql-programmability\output_file.tsv";

@rs0 =
    EXTRACT
        guid string,
        dt DateTime,
        user String,
        des String
    FROM @input_file USING Extractors.Tsv();

@rs1 =
    SELECT
        guid AS start_id,
        dt,
        DateTime.Now.ToString("M/d/yyyy") AS Nowdate,
        USQL_Programmability.CustomFunctions.GetFiscalPeriodWithCustomType(dt).Quarter AS fiscalquarter,
        USQL_Programmability.CustomFunctions.GetFiscalPeriodWithCustomType(dt).Month AS fiscalmonth,
        USQL_Programmability.CustomFunctions.GetFiscalPeriodWithCustomType(dt) + new USQL_Programmability.CustomFunctions.FiscalPeriod(1,7) AS fiscalperiod_adjusted,
        user,
        des
    FROM @rs0;

@rs2 =
    SELECT
        start_id,
        dt,
        DateTime.Now.ToString("M/d/yyyy") AS Nowdate,
        fiscalquarter,
        fiscalmonth,
        USQL_Programmability.CustomFunctions.GetFiscalPeriodWithCustomType(dt).ToString() AS fiscalperiod,

           // This user-defined type was created in the prior SELECT.  Passing the UDT to this subsequent SELECT would have failed if the UDT was not annotated with an IFormatter.
           fiscalperiod_adjusted.ToString() AS fiscalperiod_adjusted,
           user,
           des
    FROM @rs1;

OUTPUT @rs2
    TO @output_file
    USING Outputters.Text();

Tady je příklad úplného oddílu kódu na pozadí:

using Microsoft.Analytics.Interfaces;
using Microsoft.Analytics.Types.Sql;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.IO;

namespace USQL_Programmability
{
    public class CustomFunctions
    {
        static public DateTime? ToDateTime(string dt)
        {
            DateTime dtValue;

            if (!DateTime.TryParse(dt, out dtValue))
                return Convert.ToDateTime(dt);
            else
                return null;
        }

        public static FiscalPeriod GetFiscalPeriodWithCustomType(DateTime dt)
        {
            int FiscalMonth = 0;
            if (dt.Month < 7)
            {
                FiscalMonth = dt.Month + 6;
            }
            else
            {
                FiscalMonth = dt.Month - 6;
            }

            int FiscalQuarter = 0;
            if (FiscalMonth >= 1 && FiscalMonth <= 3)
            {
                FiscalQuarter = 1;
            }
            if (FiscalMonth >= 4 && FiscalMonth <= 6)
            {
                FiscalQuarter = 2;
            }
            if (FiscalMonth >= 7 && FiscalMonth <= 9)
            {
                FiscalQuarter = 3;
            }
            if (FiscalMonth >= 10 && FiscalMonth <= 12)
            {
                FiscalQuarter = 4;
            }

            return new FiscalPeriod(FiscalQuarter, FiscalMonth);
        }        [SqlUserDefinedType(typeof(FiscalPeriodFormatter))]
        public struct FiscalPeriod
        {
            public int Quarter { get; private set; }

            public int Month { get; private set; }

            public FiscalPeriod(int quarter, int month):this()
            {
                this.Quarter = quarter;
                this.Month = month;
            }

            public override bool Equals(object obj)
            {
                if (ReferenceEquals(null, obj))
                {
                    return false;
                }

                return obj is FiscalPeriod && Equals((FiscalPeriod)obj);
            }

            public bool Equals(FiscalPeriod other)
            {
return this.Quarter.Equals(other.Quarter) &&    this.Month.Equals(other.Month);
            }

            public bool GreaterThan(FiscalPeriod other)
            {
return this.Quarter.CompareTo(other.Quarter) > 0 || this.Month.CompareTo(other.Month) > 0;
            }

            public bool LessThan(FiscalPeriod other)
            {
return this.Quarter.CompareTo(other.Quarter) < 0 || this.Month.CompareTo(other.Month) < 0;
            }

            public override int GetHashCode()
            {
                unchecked
                {
                    return (this.Quarter.GetHashCode() * 397) ^ this.Month.GetHashCode();
                }
            }

            public static FiscalPeriod operator +(FiscalPeriod c1, FiscalPeriod c2)
            {
return new FiscalPeriod((c1.Quarter + c2.Quarter) > 4 ? (c1.Quarter + c2.Quarter)-4 : (c1.Quarter + c2.Quarter), (c1.Month + c2.Month) > 12 ? (c1.Month + c2.Month) - 12 : (c1.Month + c2.Month));
            }

            public static bool operator ==(FiscalPeriod c1, FiscalPeriod c2)
            {
                return c1.Equals(c2);
            }

            public static bool operator !=(FiscalPeriod c1, FiscalPeriod c2)
            {
                return !c1.Equals(c2);
            }
            public static bool operator >(FiscalPeriod c1, FiscalPeriod c2)
            {
                return c1.GreaterThan(c2);
            }
            public static bool operator <(FiscalPeriod c1, FiscalPeriod c2)
            {
                return c1.LessThan(c2);
            }
            public override string ToString()
            {
                return (String.Format("Q{0}:P{1}", this.Quarter, this.Month));
            }

        }

        public class FiscalPeriodFormatter : IFormatter<FiscalPeriod>
        {
public void Serialize(FiscalPeriod instance, IColumnWriter writer, ISerializationContext context)
            {
                using (var binaryWriter = new BinaryWriter(writer.BaseStream))
                {
                    binaryWriter.Write(instance.Quarter);
                    binaryWriter.Write(instance.Month);
                    binaryWriter.Flush();
                }
            }

public FiscalPeriod Deserialize(IColumnReader reader, ISerializationContext context)
            {
                using (var binaryReader = new BinaryReader(reader.BaseStream))
                {
var result = new FiscalPeriod(binaryReader.ReadInt16(), binaryReader.ReadInt16());
                    return result;
                }
            }
        }
    }
}

Použití uživatelem definovaných agregací: UDAGG

Uživatelem definované agregace jsou všechny funkce související s agregací, které nejsou dodávány předem s U-SQL. Příkladem může být agregace pro provádění vlastních matematických výpočtů, zřetězení řetězců, manipulace s řetězci atd.

Uživatelsky definovaná definice agregační základní třídy je následující:

    [SqlUserDefinedAggregate]
    public abstract class IAggregate<T1, T2, TResult> : IAggregate
    {
        protected IAggregate();

        public abstract void Accumulate(T1 t1, T2 t2);
        public abstract void Init();
        public abstract TResult Terminate();
    }

SqlUserDefinedAggregate označuje, že typ by měl být zaregistrován jako uživatelem definovaná agregace. Tuto třídu nelze dědit.

Atribut SqlUserDefinedType je pro definici UDAGG volitelný .

Základní třída umožňuje předat tři abstraktní parametry: dva jako vstupní parametry a jeden jako výsledek. Datové typy jsou proměnné a měly by být definovány během dědičnosti tříd.

public class GuidAggregate : IAggregate<string, string, string>
{
    string guid_agg;

    public override void Init()
    { … }

    public override void Accumulate(string guid, string user)
    { … }

    public override string Terminate()
    { … }
}
  • Init vyvolá během výpočtu jednou pro každou skupinu. Poskytuje inicializační rutinu pro každou skupinu agregace.
  • Funkce Kumulace se provede jednou pro každou hodnotu. Poskytuje hlavní funkce pro algoritmus agregace. Dá se použít k agregaci hodnot s různými datovými typy, které jsou definovány během dědičnosti tříd. Může přijímat dva parametry proměnných datových typů.
  • Funkce Terminate se provede jednou pro každou skupinu agregace na konci zpracování, aby se vystavil výsledek pro každou skupinu.

Pokud chcete deklarovat správné vstupní a výstupní datové typy, použijte definici třídy následujícím způsobem:

public abstract class IAggregate<T1, T2, TResult> : IAggregate
  • T1: První parametr, který se má nahromadit
  • T2: Druhý parametr, který se má nahromadit
  • TResult: Návratový typ ukončení

Příklad:

public class GuidAggregate : IAggregate<string, int, int>

nebo

public class GuidAggregate : IAggregate<string, string, string>

Použití UDAGG v U-SQL

Chcete-li použít UDAGG, nejprve jej definujte v kódu na pozadí nebo na něj odkazujte z existující programovatelné knihovny DLL, jak bylo popsáno výše.

Pak použijte následující syntaxi:

AGG<UDAGG_functionname>(param1,param2)

Tady je příklad UDAGG:

public class GuidAggregate : IAggregate<string, string, string>
{
    string guid_agg;

    public override void Init()
    {
        guid_agg = "";
    }

    public override void Accumulate(string guid, string user)
    {
        if (user.ToUpper()== "USER1")
        {
            guid_agg += "{" + guid + "}";
        }
    }

    public override string Terminate()
    {
        return guid_agg;
    }

}

A základní skript U-SQL:

DECLARE @input_file string = @"\usql-programmability\input_file.tsv";
DECLARE @output_file string = @" \usql-programmability\output_file.tsv";

@rs0 =
    EXTRACT
            guid string,
            dt DateTime,
            user String,
            des String
    FROM @input_file
    USING Extractors.Tsv();

@rs1 =
    SELECT
        user,
        AGG<USQL_Programmability.GuidAggregate>(guid,user) AS guid_list
    FROM @rs0
    GROUP BY user;

OUTPUT @rs1 TO @output_file USING Outputters.Text();

V tomto scénáři použití zřetězujeme identifikátory GUID tříd pro konkrétní uživatele.

Další kroky