Udostępnij za pośrednictwem


Rozwijanie składnik transformacja niestandardowe z asynchronicznych wyjścia

Używasz składnika z asynchronicznego wyjścia podczas transformacja nie może wyprowadzić wierszy do momentu składnik otrzymała jej wprowadzania wierszy lub podczas transformacja nie daje dokładnie jednego wiersza danych wyjściowych dla każdego wiersza odebranego jako dane wejściowe.transformacja agregacja, na przykład, nie można obliczyć sumę w wierszach, dopóki nie ma on odczytu wszystkich wierszy.Z drugiej strony można użyć składnika z synchronicznej wyjść dowolnym czasie po zmodyfikowaniu każdy wiersz danych jako przechodząca.Można zmodyfikować danych dla każdego wiersza w miejscu lub można utworzyć jeden lub więcej nowych kolumn, z których każda ma wartość dla każdego wprowadzania wierszy.Aby uzyskać więcej informacji na temat różnicy między składnikami synchronicznych i asynchronicznych zobacz Opis przekształcenia synchroniczne i asynchroniczne.

Składniki transformacja z wyjść asynchroniczne są unikatowe, ponieważ działają one jako obiekt obiekt docelowy i źródłowe składników.Ten rodzaj składnika odbiera wierszy ze składników "pod prąd" i dodaje wiersze, które są zużywane według składników niższego rzędu.Nie innych składników przepływ danych wykonuje obie te operacje.

Kolumny z "pod prąd" składniki, które są dostępne dla składnika o wyjść synchroniczne są automatycznie dostępne dla składników w kierunku od składnika.Składnik z synchronicznej wyjść z tego powodu, nie ma do definiowania każdej kolumny wyjściowe kolumn i wierszy do następnego składnika.Składniki z asynchronicznego wyjść z drugiej strony, muszą zdefiniować kolumny wyjściowe i podaj wiersze, które mają zostać składniki niższego rzędu.Dlatego składnik z wyjść asynchronicznego ma więcej zadań do wykonania podczas projektowania i czas wykonania, a deweloper składnika ma więcej kodu do wykonania.

SQL Server Integration Services zawiera kilka przekształceń z asynchronicznego wyjść.Na przykład transformacja sortowanie wymaga jego wierszy można posortować je i uzyskuje to przy użyciu asynchronicznej wyjść.Po odebraniu wszystkich wierszy, sortuje je i dodaje je do swoich danych wyjściowych.

W tej sekcji omówiono szczegółowo opracowanie transformacji z asynchronicznego wyjść.Aby składnik transformacja próbki z wyjść asynchronicznych zobacz Remove Duplicates Component Sample. Aby uzyskać więcej informacji na temat rozwoju składnika urządzenie źródłowe zobacz Developing a Custom Source Component.

Czas projektowania

Tworzenie składnika

The SynchronousInputID() właściwość on the IDTSOutput100 object identifies whether an output is synchronous or asynchronous. Aby utworzyć asynchronicznej danych wyjściowych, dodać dane wyjściowe składnik i zestaw SynchronousInputID() wartość zero. Ustawienie tej właściwość również określa, czy zadanie przepływ danych przydziela PipelineBuffer obiekty danych wejściowych i wyjściowych części, lub czy pojedynczego buforu jest przydzielane i udostępniane między dwoma obiektami.

Poniższy przykład kodu pokazuje składnik, który tworzy asynchronicznego dane wyjściowe w jego ProvideComponentProperties() Implementacja.

using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime;


namespace Microsoft.Samples.SqlServer.Dts
{
    [DtsPipelineComponent(DisplayName = "AsyncComponent",ComponentType = ComponentType.Transform)]
    public class AsyncComponent : PipelineComponent
    {
        public override void ProvideComponentProperties()
        {
            // Call the base class, which adds a synchronous input
            // and output.
            base.ProvideComponentProperties();

            // Make the output asynchronous.
            IDTSOutput100 output = ComponentMetaData.OutputCollection[0];
            output.SynchronousInputID = 0;
        }
    }
}
Imports Microsoft.SqlServer.Dts.Pipeline
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper
Imports Microsoft.SqlServer.Dts.Runtime

<DtsPipelineComponent(DisplayName:="AsyncComponent", ComponentType:=ComponentType.Transform)> _
Public Class AsyncComponent
    Inherits PipelineComponent

    Public Overrides Sub ProvideComponentProperties()

        ' Call the base class, which adds a synchronous input
        ' and output.
        Me.ProvideComponentProperties()

        ' Make the output asynchronous.
        Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)
        output.SynchronousInputID = 0

    End Sub


End Class

Tworzenie i Konfigurowanie kolumny wyjściowe

Jak wspomniano wcześniej, składnik asynchronicznego dodaje kolumna do jej kolekcja kolumn wyjściowych zapewniające kolumn do niższego rzędu składników.Istnieje kilka metod czas projektowania, które można wybierać w zależności od potrzeb składnika.Na przykład, jeśli chcesz przekazać do niższego rzędu składników wszystkie kolumny z "pod prąd" składniki, można będzie zastąpić OnInputPathAttached(Int32) Metoda Dodaj kolumny, ponieważ jest to pierwsza metoda, w którym są dostępne dla składnika kolumn danych wejściowych.

Jeśli składnik tworzy dane wyjściowe kolumny oparte na kolumny wybrane do swojego wejścia, Zastąp SetUsageType(Int32, IDTSVirtualInput100, Int32, DTSUsageType) Metoda, aby zaznaczyć kolumny wyjściowe i aby wskazać, w jaki sposób będą używane.

Jeśli składnik z asynchronicznego wyjść tworzy na podstawie kolumn ze składników "pod prąd" kolumna wyjściowe i zmienić dostępnych kolumn "pod prąd", składnik powinni zaktualizować swojej kolekcja kolumn wyjściowych.Te zmiany powinna zostać wykryta przez składnik podczas Validate()i stałe w okresie ReinitializeMetaData().

Uwaga

Jeśli kolumna wyprowadzenia zostanie usunięty z kolekcja kolumn wyjściowych, niższego rzędu składników w przepływie danych, które się odwołać do kolumny są pogorszeniu.Kolumna wyprowadzenia musi zostać naprawiona bez usunięcie i ponowne utworzenie kolumnę, aby uniknąć dzielenia niższego rzędu składników.Na przykład jeśli typ danych kolumna został zmieniony, należy zaktualizować typu danych.

Poniższy przykład kodu pokazuje składnik, który dodaje kolumna danych wyjściowych do swojej kolekcja kolumn wyjściowych dla każdej kolumna, które są dostępne od składnika nadrzędnego.

public override void OnInputPathAttached(int inputID)
{
   IDTSInput100 input = ComponentMetaData.InputCollection.GetObjectByID(inputID);
   IDTSOutput100 output = ComponentMetaData.OutputCollection[0];
   IDTSVirtualInput100 vInput = input.GetVirtualInput();

   foreach (IDTSVirtualInputColumn100 vCol in vInput.VirtualInputColumnCollection)
   {
      IDTSOutputColumn100 outCol = output.OutputColumnCollection.New();
      outCol.Name = vCol.Name;
      outCol.SetDataTypeProperties(vCol.DataType, vCol.Length, vCol.Precision, vCol.Scale, vCol.CodePage);
   }
}
Public Overrides Sub OnInputPathAttached(ByVal inputID As Integer)

    Dim input As IDTSInput100 = ComponentMetaData.InputCollection.GetObjectByID(inputID)
    Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)
    Dim vInput As IDTSVirtualInput100 = input.GetVirtualInput()

    For Each vCol As IDTSVirtualInputColumn100 In vInput.VirtualInputColumnCollection

        Dim outCol As IDTSOutputColumn100 = output.OutputColumnCollection.New()
        outCol.Name = vCol.Name
        outCol.SetDataTypeProperties(vCol.DataType, vCol.Length, vCol.Precision, vCol.Scale, vCol.CodePage)

    Next
End Sub

Czas uruchomienia

Składniki z asynchronicznego wyjść również wykonać inną kombinację metod przy uruchomieniu czas niż inne rodzaje składników.Po pierwsze, są one jedynymi składnikami, które odebrać połączenia obu PrimeOutput(Int32, array<Int32[], array<PipelineBuffer[]) oraz ProcessInput(Int32, PipelineBuffer) metody. Składniki z asynchronicznego wyjść również wymagają dostępu do wszystkich przychodzących wierszy przed ich rozpoczęciem przetwarzania, dlatego muszą buforują wprowadzania wierszy wewnętrznie aż wszystkie wiersze zostały przeczytane.Na koniec w przeciwieństwie do innych składników składniki z asynchronicznego wyjść otrzymywać bufor wejściowy przez i bufor wyjściowy przez.

Opis buforów

Bufor wejściowy jest odbierane przez składnik podczas ProcessInput(Int32, PipelineBuffer). Ten bufor zawiera wiersze, dodaje do bufora przez składniki "pod prąd".Bufor zawiera także kolumny składnika danych wejściowych, oprócz kolumn, które zostały dostarczone w danych wyjściowych składnika "pod prąd", ale nie zostały dodane do kolekcja wejściowe składnika asynchronicznego.

Bufor wyjściowy, który jest dostarczany do składnika w PrimeOutput(Int32, array<Int32[], array<PipelineBuffer[]), początkowo nie zawiera wierszy. Składnik dodaje wierszy do tego buforu i zapewnia buforu do składników niższego rzędu, po jego zapełnieniu.Bufor wyjściowy zawiera kolumna zdefiniowane w kolekcja kolumn wyjściowych składnika, oprócz żadnych kolumn, innymi składnikami za zostały dodane do ich wydajność.

Jest to inne zachowanie niż składniki z wyjść synchroniczne, które otrzymują pojedynczego buforu udostępnionych.Udostępniony bufor składnika z synchronicznej wyjście zawiera wejściowe i wyjściowe kolumny składnika, oprócz kolumn dodany do wyjścia składników "pod prąd" i niższego rzędu.

Przetwarzanie wierszy

Buforowanie wprowadzania wierszy

Podczas pisania składnik z wyjść asynchronicznych, masz trzy opcje dodawania wierszy do buforu wyjściowego.Możesz dodać je jako wprowadzania wierszy są odbierane, mogą buforować je aż do chwili, kiedy składnik odebrał wszystkie wiersze od składnika nadrzędnego lub można je dodać, gdy jest to zrobić dla składnika.Metoda, która zostanie wybrana opcja zależy od wymagań składnika.Na przykład składnik sortowanie wymaga otrzymania wszystkie poprzednie wiersze przed mogły być sortowane.Dlatego też czeka, aż wszystkie wiersze zostały przeczytane przed dodaniem do buforu wyjściowego wierszy.

Wiersze, które zostały odebrane w buforze wejściowym musi być buforowane wewnętrznie przez składnik dopóki jest gotowa do ich przetworzenia.Wiersze buforu przychodzące mogą być buforowane w tabela danych, Tablica wielowymiarowa lub inne struktury wewnętrznej.Na przykład składnik, który buforuje przychodzące wiersze buforu wewnętrznie, dopóki wszystkie wiersze zostały przeczytane, zobacz temat Remove Duplicates Component Sample.

Dodawanie wierszy danych wyjściowych

Czy można dodawać wierszy do buforu wyjściowego otrzymywanych lub po otrzymaniu wszystkie wiersze, tak zrobisz, wywołując AddRow() Metoda na bufor wyjściowy. Po dodaniu w wierszu, który zestaw wartości dla każdej z kolumn w nowym wierszu.

Ze względu na to, że dostępne są czasami więcej kolumn w bufor wyjściowy niż w kolekcja kolumn wyjściowych składnika, musisz zlokalizować indeks odpowiedniej kolumna w buforze, zanim zestaw jej wartość.The FindColumnByLineageID(Int32, Int32) metoda of the BufferManager() właściwość returns the index of the kolumna in the buffer row with the specified lineage ID, which is then used to assign the value to the buffer kolumna.

The PreExecute() metoda, which is called before the PrimeOutput(Int32, array<Int32[], array<PipelineBuffer[]) metoda or the ProcessInput(Int32, PipelineBuffer) metoda, is the first metoda where the BufferManager() właściwość is available, and the first opportunity to locate the indexes of the columns in the input and output buffers.

Przykład

Następujący przykład przedstawia składnik proste transformacja z asynchronicznego wyjść dodający wiersze buforu wyjściowego odbieranych.W tym przykładzie nie wykazują, metody i funkcje omówione w tym temacie.Pokazano, ważne metody każdy składnik transformacja niestandardowe z asynchronicznego wyjść musi zastąpić, ale nie zawiera kodu do projektu-czas sprawdzania poprawności.Ponadto kod w ProcessInput(Int32, PipelineBuffer) zakłada się, że kolekcja kolumn wyjściowych ma jedną kolumna, dla każdej kolumna w kolekcja kolumn danych wejściowych. Aby składnik transformacja całą próbkę z wyjść asynchronicznego Zobacz Remove Duplicates Component Sample.

using System;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;

namespace Microsoft.Samples.SqlServer.Dts
{
   [DtsPipelineComponent(DisplayName = "AsynchronousOutput")]
   public class AsynchronousOutput : PipelineComponent
   {
      PipelineBuffer outputBuffer;
      int[] inputColumnBufferIndexes;
      int[] outputColumnBufferIndexes;

      public override void ProvideComponentProperties()
      {
         // Let the base class add the input and output objects.
         base.ProvideComponentProperties();

         // Name the input and output, and make the
         // output asynchronous.
         ComponentMetaData.InputCollection[0].Name = "Input";
         ComponentMetaData.OutputCollection[0].Name = "AsyncOutput";
         ComponentMetaData.OutputCollection[0].SynchronousInputID = 0;
      }
      public override void PreExecute()
      {
         IDTSInput100 input = ComponentMetaData.InputCollection[0];
         IDTSOutput100 output = ComponentMetaData.OutputCollection[0];

         inputColumnBufferIndexes = new int[input.InputColumnCollection.Count];
         outputColumnBufferIndexes = new int[output.OutputColumnCollection.Count];

         for (int col = 0; col < input.InputColumnCollection.Count; col++)
            inputColumnBufferIndexes[col] = BufferManager.FindColumnByLineageID(input.Buffer, input.InputColumnCollection[col].LineageID);

         for (int col = 0; col < output.OutputColumnCollection.Count; col++)
            outputColumnBufferIndexes[col] = BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection[col].LineageID);

      }

      public override void PrimeOutput(int outputs, int[] outputIDs, PipelineBuffer[] buffers)
      {
         if (buffers.Length != 0)
            outputBuffer = buffers[0];
      }
      public override void ProcessInput(int inputID, PipelineBuffer buffer)
      {
            // Advance the buffer to the next row.
            while (buffer.NextRow())
            {
               // Add a row to the output buffer.
               outputBuffer.AddRow();
               for (int x = 0; x < inputColumnBufferIndexes.Length; x++)
               {
                  // Copy the data from the input buffer column to the output buffer column.
                  outputBuffer[outputColumnBufferIndexes[x]] = buffer[inputColumnBufferIndexes[x]];
               }
            }
         if (buffer.EndOfRowset)
         {
            // EndOfRowset on the input buffer is true.
            // Set EndOfRowset on the output buffer.
            outputBuffer.SetEndOfRowset();
         }
      }
   }
}
Imports System
Imports Microsoft.SqlServer.Dts.Pipeline
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper

Namespace Microsoft.Samples.SqlServer.Dts

    <DtsPipelineComponent(DisplayName:="AsynchronousOutput")> _
    Public Class AsynchronousOutput


        Inherits PipelineComponent

        Private outputBuffer As PipelineBuffer
        Private inputColumnBufferIndexes As Integer()
        Private outputColumnBufferIndexes As Integer()

        Public Overrides Sub ProvideComponentProperties()

            ' Let the base class add the input and output objects.
            Me.ProvideComponentProperties()

            ' Name the input and output, and make the
            ' output asynchronous.
            ComponentMetaData.InputCollection(0).Name = "Input"
            ComponentMetaData.OutputCollection(0).Name = "AsyncOutput"
            ComponentMetaData.OutputCollection(0).SynchronousInputID = 0
        End Sub

        Public Overrides Sub PreExecute()

            Dim input As IDTSInput100 = ComponentMetaData.InputCollection(0)
            Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)

            ReDim inputColumnBufferIndexes(input.InputColumnCollection.Count)
            ReDim outputColumnBufferIndexes(output.OutputColumnCollection.Count)

            For col As Integer = 0 To input.InputColumnCollection.Count
                inputColumnBufferIndexes(col) = BufferManager.FindColumnByLineageID(input.Buffer, input.InputColumnCollection(col).LineageID)
            Next

            For col As Integer = 0 To output.OutputColumnCollection.Count
                outputColumnBufferIndexes(col) = BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection(col).LineageID)
            Next

        End Sub
        Public Overrides Sub PrimeOutput(ByVal outputs As Integer, ByVal outputIDs As Integer(), ByVal buffers As PipelineBuffer())

            If buffers.Length <> 0 Then
                outputBuffer = buffers(0)
            End If

        End Sub

        Public Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer)

                ' Advance the buffer to the next row.
                While (buffer.NextRow())

                    ' Add a row to the output buffer.
                    outputBuffer.AddRow()
                    For x As Integer = 0 To inputColumnBufferIndexes.Length

                        ' Copy the data from the input buffer column to the output buffer column.
                        outputBuffer(outputColumnBufferIndexes(x)) = buffer(inputColumnBufferIndexes(x))

                    Next
                End While

            If buffer.EndOfRowset = True Then
                ' EndOfRowset on the input buffer is true.
                ' Set the end of row set on the output buffer.
                outputBuffer.SetEndOfRowset()
            End If
        End Sub
    End Class
End Namespace
Integration Services icon (small) Konfiguracja w aktualizacji z usług integracja Services pobytu

Najnowsze pliki do pobrania, artykuły, próbki, i pliki wideo firmy Microsoft, jak również wybranego rozwiązania od Wspólnoty, odwiedź witrynę Integration Services strona w witrynie MSDN lub TechNet:

Automatycznego powiadomienie tych aktualizacji należy subskrybować źródła danych RSS, które jest dostępne strona.