Rozwijanie składnika przekształcenie niestandardowe wyjście asynchroniczne
Użyciu składnik asynchronicznego wyjść podczas transformacja nie można wyprowadzić wierszy, dopóki składnik otrzymał jego wprowadzania wierszy lub transformacja nie produkuje dokładnie jeden wiersz danych wyjściowych dla każdego wiersza odebranego jako danych wejściowych.transformacja agregacji, na przykład nie można obliczyć sumę wierszach dopóki odczyta wszystkie wiersze.Natomiast można użyć składnika z synchronicznej wyjść każdym czas po zmodyfikowaniu każdy wiersz danych, który przechodzi.Można zmodyfikować danych dla każdego wiersza w miejscu lub można utworzyć jeden lub więcej nowych kolumn, z których każda ma wartość dla każdego wprowadzania wierszy.Aby uzyskać więcej informacji o różnicach między składnikami synchroniczne i asynchroniczne, zobacz Opis przekształcenia synchroniczne i asynchroniczne.
Składniki transformacji z wyjść asynchroniczne są unikatowe, ponieważ działają jako zarówno docelowego i źródło składników.Tego rodzaju składnika odbiera wiersze z nadrzędny składników i dodaje wierszy, które są zużywane przez składniki podrzędne.Żaden inny składnik przepływ danych wykonuje obie te operacje.
Kolumny nadrzędny składników, które są dostępne dla składnika wyjść synchroniczne są automatycznie dostępne składniki rzeki od składnika.Składnik z wyjść synchroniczne nie więc do definiowania kolumn wyjociowych kolumn i wierszy do następnego składnika.Składniki z asynchronicznego wyjść z drugiej strony, musi zdefiniować kolumn wyjściowych i zapewniają wiersze składniki podrzędne.Dlatego składnik z wyjść asynchronicznego ma więcej zadań do wykonania podczas projektowania i wykonywania czas, a kod w celu zaimplementowania deweloper składnika.
SQL Server Integration Services contains several transformations with asynchronous outputs.Na przykład transformacja sortowania wymaga jego wierszy można sortować je i uzyskuje to przy użyciu asynchronicznej wyjść.Po odebraniu jego wierszy sortuje je i dodaje je do swoich danych wyjściowych.
W tej sekcji omówiono szczegółowo rozwijać przekształceń wyjść asynchronicznego.Dla składnika transformacja próbki z wyjść asynchronicznych, zobacz Integration Services próbki na witrynie Codeplex.Aby uzyskać więcej informacji na temat źródło składnika rozwoju Zobacz Rozwijanie niestandardowy składnik źródła.
Czas projektowania
Tworzenie składnika
SynchronousInputID Właściwość IDTSOutput100 identyfikuje obiekt, czy wyjście jest synchroniczna lub asynchroniczna.Tworzenie wyjście asynchroniczne, dodać dane wyjściowe składnika i zestaw SynchronousInputID do zera.Ustawienie tej właściwość również określa czy zadanie przepływ danych przydziela PipelineBuffer obiekty wejściowe i wyjściowe składnika, lub czy pojedynczego buforu jest przydzielane i udostępniane między dwoma obiektami.
Poniższy przykład kodu pokazuje składnika, który tworzy asynchronicznych danych wyjściowych w jego ProvideComponentProperties implementacji.
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime;
namespace Microsoft.Samples.SqlServer.Dts
{
[DtsPipelineComponent(DisplayName = "AsyncComponent",ComponentType = ComponentType.Transform)]
public class AsyncComponent : PipelineComponent
{
public override void ProvideComponentProperties()
{
// Call the base class, which adds a synchronous input
// and output.
base.ProvideComponentProperties();
// Make the output asynchronous.
IDTSOutput100 output = ComponentMetaData.OutputCollection[0];
output.SynchronousInputID = 0;
}
}
}
Imports Microsoft.SqlServer.Dts.Pipeline
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper
Imports Microsoft.SqlServer.Dts.Runtime
<DtsPipelineComponent(DisplayName:="AsyncComponent", ComponentType:=ComponentType.Transform)> _
Public Class AsyncComponent
Inherits PipelineComponent
Public Overrides Sub ProvideComponentProperties()
' Call the base class, which adds a synchronous input
' and output.
Me.ProvideComponentProperties()
' Make the output asynchronous.
Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)
output.SynchronousInputID = 0
End Sub
End Class
Tworzenie i konfigurowanie kolumn wyjściowych
Jak wspomniano wcześniej, asynchronicznego składnika dodaje kolumna do jej kolekcja kolumn wyjściowych zapewniające kolumn components niższego rzędu.Istnieje kilka projektów -czas metod do wyboru, w zależności od potrzeb składnika.Na przykład, jeśli chcesz przekazać wszystkie kolumny ze składników nadrzędny składników niższego rzędu, byłoby zastąpienie OnInputPathAttached metody dodać kolumny, ponieważ jest to pierwsza metoda, w której kolumny danych wejściowych są dostępne do składnika.
Jeśli składnik tworzy wyjściowy kolumny oparte na kolumny wybrane do jego wprowadzania, zastąpić SetUsageType Metoda wybierz kolumn wyjściowych i wskazać, jaki będzie używany.
Składnik z wyjść asynchronicznego tworzy kolumn wyjściowych na podstawie kolumn ze składników kopalnianych i zmienić dostępne kolumna nadrzędny, składnik należy zaktualizować jej kolekcja kolumn wyjściowych.Zmiany te powinny być wykryta przez części podczas Validatei stałe podczas ReinitializeMetaData.
Ostrzeżenie
Po usunięciu kolumna wyprowadzenia z kolekcja kolumn wyjściowych negatywnie wpływa na niższego rzędu składników przepływ danych, które odwołują się do kolumna.Kolumna wyprowadzenia musi zostać naprawiona bez usunięcia i ponownego tworzenia kolumny, aby uniknąć dzielenia składniki podrzędne.Na przykład typ danych kolumna została zmieniona, należy zaktualizować typu danych.
Poniższy przykład kodu pokazuje składnik, który dodaje do swojej kolekcja kolumn wyjściowych dla każdej kolumna Dostępne kolumna wyprowadzenia, od składnika nadrzędnego.
public override void OnInputPathAttached(int inputID)
{
IDTSInput100 input = ComponentMetaData.InputCollection.GetObjectByID(inputID);
IDTSOutput100 output = ComponentMetaData.OutputCollection[0];
IDTSVirtualInput100 vInput = input.GetVirtualInput();
foreach (IDTSVirtualInputColumn100 vCol in vInput.VirtualInputColumnCollection)
{
IDTSOutputColumn100 outCol = output.OutputColumnCollection.New();
outCol.Name = vCol.Name;
outCol.SetDataTypeProperties(vCol.DataType, vCol.Length, vCol.Precision, vCol.Scale, vCol.CodePage);
}
}
Public Overrides Sub OnInputPathAttached(ByVal inputID As Integer)
Dim input As IDTSInput100 = ComponentMetaData.InputCollection.GetObjectByID(inputID)
Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)
Dim vInput As IDTSVirtualInput100 = input.GetVirtualInput()
For Each vCol As IDTSVirtualInputColumn100 In vInput.VirtualInputColumnCollection
Dim outCol As IDTSOutputColumn100 = output.OutputColumnCollection.New()
outCol.Name = vCol.Name
outCol.SetDataTypeProperties(vCol.DataType, vCol.Length, vCol.Precision, vCol.Scale, vCol.CodePage)
Next
End Sub
Czas jednostkowy
Składniki z wyjść asynchronicznego również wykonywać sekwencji różnych metod wykonywania czas niż inne rodzaje części składowych.Po pierwsze, są one jedynymi składnikami odbierające wywołania zarówno PrimeOutput i ProcessInput metody.Składniki z asynchronicznego wyjść również wymagają dostępu do wszystkich przychodzących wierszy przed ich rozpoczęciem przetwarzania; Dlatego należy buforują wprowadzania wierszy wewnętrznie aż wszystkie wiersze zostały przeczytane.Ostatecznie w przeciwieństwie do innych składników składników z wyjść asynchronicznego otrzymywać bufor wejściowy i bufor wyjściowy.
Opis buforów
Bufor wejściowy jest odbierane przez składnik podczas ProcessInput.Ten bufor zawiera wiersze dodane do bufora przez nadrzędny składników.Bufor zawiera także kolumny wprowadzania składnika, oprócz kolumn, które zostały dostarczone dane wyjściowe składnika nadrzędnego, ale nie zostały dodane do kolekcja wejściowe składnika asynchronicznego.
Bufor wyjściowy, który jest dostarczany do składnika w PrimeOutput, początkowo nie zawiera wierszy.Składnik dodaje wierszy w tym buforze i zapewnia bufor składniki podrzędne, gdy jest zapełniony.Bufor wyjściowy zawiera kolumna zdefiniowane kolekcja kolumn wyjściowych składnika, oprócz żadnych kolumn, które dodane do ich wyjść inne składniki podrzędne.
Jest to zachowanie różne od tej części z wyjść synchroniczne, które pojedynczego udostępniony bufor odbioru.Udostępniony bufor składnika synchroniczne wyjść zawiera wejściowe i wyjściowe kolumny składnika, oprócz kolumn dodawany do wyjść składniki nadrzędne i podrzędne.
Przetwarzanie wierszy
Buforowanie wprowadzania wierszy
Podczas pisania składnika wyjść asynchronicznego ma trzy opcje dodawania wierszy do buforu wyjściowego.Można je dodać otrzymanym wprowadzania wierszy, można buforować je do momentu składnik odebrał wszystkie wiersze od składnika nadrzędnego lub można je dodać, gdy jest to właściwe dla składnika.Wybór metoda zależy od wymagań składnika.Na przykład składnik sortowania wymaga otrzymania kopalnianych wierszy przed mogą być sortowane.Dlatego też czeka, aż wszystkie wiersze zostały odczytane przed dodaniem do buforu wyjściowego wierszy.
Wiersze, które zostały odebrane w buforze wejściowym musi wewnętrznie buforowane przez składnik do jest gotowa do ich przetworzenia.Wiersze buforu przychodzące mogą być buforowane w tabela danych, Tablica wielowymiarowa lub wewnętrznej struktury.Na przykład składnika, który buforuje przychodzące wiersze buforu wewnętrznie, dopóki wszystkie wiersze zostały przeczytane, zobacz Readme_Remove Duplicates Component Sample.
Dodawanie wierszy danych wyjściowych
Czy można dodawać wierszy do buforu wyjściowego są odbierane lub po otrzymaniu wszystkie wiersze, możesz zrobić wywołując AddRow Metoda buforu wyjściowego.Po dodaniu wiersza, który zestaw wartości każdej kolumna w nowym wierszu.
Ponieważ są czasami więcej kolumn w bufor wyjściowy niż kolekcja kolumna wyjściowe składnika, należy zlokalizować indeksu odpowiedniej kolumna w buforze przed zestaw jej wartość.FindColumnByLineageID metoda BufferManager Właściwość zwraca indeks kolumna w wierszu bufor o identyfikatorze określonego rodowodu, który jest następnie używana do przypisywania wartości do kolumn buforu.
PreExecute Metodę, która będzie wywoływana przed PrimeOutput metody lub ProcessInput , jest pierwsza metoda gdzie BufferManager właściwość jest dostępna, a pierwszą okazję do zlokalizowania indeksów kolumn w buforach wejściowe i wyjściowe.
Przykład
Poniżej pokazano składnik prostych transformacja z wyjść asynchronicznego dodający wiersze buforu wyjściowego są odbierane.W tym przykładzie nie wykazują metod i funkcje omówione w tym temacie.W ten sposób pokazujesz ważnych metod musi zastępować każdy składnik transformacja niestandardowe z asynchronicznego wyjść, ale nie zawiera kodu dla projektu -czas sprawdzania poprawności.Ponadto kod w ProcessInput zakłada, że kolekcja kolumn wyjściowych ma jedną kolumna dla każdej kolumna kolekcja kolumna danych wejściowych.Dla składnika transformacja pełną próbki z wyjść asynchronicznych, zobacz Readme_Remove Duplicates Component Sample.
using System;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
namespace Microsoft.Samples.SqlServer.Dts
{
[DtsPipelineComponent(DisplayName = "AsynchronousOutput")]
public class AsynchronousOutput : PipelineComponent
{
PipelineBuffer outputBuffer;
int[] inputColumnBufferIndexes;
int[] outputColumnBufferIndexes;
public override void ProvideComponentProperties()
{
// Let the base class add the input and output objects.
base.ProvideComponentProperties();
// Name the input and output, and make the
// output asynchronous.
ComponentMetaData.InputCollection[0].Name = "Input";
ComponentMetaData.OutputCollection[0].Name = "AsyncOutput";
ComponentMetaData.OutputCollection[0].SynchronousInputID = 0;
}
public override void PreExecute()
{
IDTSInput100 input = ComponentMetaData.InputCollection[0];
IDTSOutput100 output = ComponentMetaData.OutputCollection[0];
inputColumnBufferIndexes = new int[input.InputColumnCollection.Count];
outputColumnBufferIndexes = new int[output.OutputColumnCollection.Count];
for (int col = 0; col < input.InputColumnCollection.Count; col++)
inputColumnBufferIndexes[col] = BufferManager.FindColumnByLineageID(input.Buffer, input.InputColumnCollection[col].LineageID);
for (int col = 0; col < output.OutputColumnCollection.Count; col++)
outputColumnBufferIndexes[col] = BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection[col].LineageID);
}
public override void PrimeOutput(int outputs, int[] outputIDs, PipelineBuffer[] buffers)
{
if (buffers.Length != 0)
outputBuffer = buffers[0];
}
public override void ProcessInput(int inputID, PipelineBuffer buffer)
{
// Advance the buffer to the next row.
while (buffer.NextRow())
{
// Add a row to the output buffer.
outputBuffer.AddRow();
for (int x = 0; x < inputColumnBufferIndexes.Length; x++)
{
// Copy the data from the input buffer column to the output buffer column.
outputBuffer[outputColumnBufferIndexes[x]] = buffer[inputColumnBufferIndexes[x]];
}
}
if (buffer.EndOfRowset)
{
// EndOfRowset on the input buffer is true.
// Set EndOfRowset on the output buffer.
outputBuffer.SetEndOfRowset();
}
}
}
}
Imports System
Imports Microsoft.SqlServer.Dts.Pipeline
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper
Namespace Microsoft.Samples.SqlServer.Dts
<DtsPipelineComponent(DisplayName:="AsynchronousOutput")> _
Public Class AsynchronousOutput
Inherits PipelineComponent
Private outputBuffer As PipelineBuffer
Private inputColumnBufferIndexes As Integer()
Private outputColumnBufferIndexes As Integer()
Public Overrides Sub ProvideComponentProperties()
' Let the base class add the input and output objects.
Me.ProvideComponentProperties()
' Name the input and output, and make the
' output asynchronous.
ComponentMetaData.InputCollection(0).Name = "Input"
ComponentMetaData.OutputCollection(0).Name = "AsyncOutput"
ComponentMetaData.OutputCollection(0).SynchronousInputID = 0
End Sub
Public Overrides Sub PreExecute()
Dim input As IDTSInput100 = ComponentMetaData.InputCollection(0)
Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)
ReDim inputColumnBufferIndexes(input.InputColumnCollection.Count)
ReDim outputColumnBufferIndexes(output.OutputColumnCollection.Count)
For col As Integer = 0 To input.InputColumnCollection.Count
inputColumnBufferIndexes(col) = BufferManager.FindColumnByLineageID(input.Buffer, input.InputColumnCollection(col).LineageID)
Next
For col As Integer = 0 To output.OutputColumnCollection.Count
outputColumnBufferIndexes(col) = BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection(col).LineageID)
Next
End Sub
Public Overrides Sub PrimeOutput(ByVal outputs As Integer, ByVal outputIDs As Integer(), ByVal buffers As PipelineBuffer())
If buffers.Length <> 0 Then
outputBuffer = buffers(0)
End If
End Sub
Public Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer)
' Advance the buffer to the next row.
While (buffer.NextRow())
' Add a row to the output buffer.
outputBuffer.AddRow()
For x As Integer = 0 To inputColumnBufferIndexes.Length
' Copy the data from the input buffer column to the output buffer column.
outputBuffer(outputColumnBufferIndexes(x)) = buffer(inputColumnBufferIndexes(x))
Next
End While
If buffer.EndOfRowset = True Then
' EndOfRowset on the input buffer is true.
' Set the end of row set on the output buffer.
outputBuffer.SetEndOfRowset()
End If
End Sub
End Class
End Namespace
|