次の方法で共有


データ フロー コンポーネントの実行時のメソッド

実行時に、データ フロー タスクは、コンポーネントの順序の確認、実行プランの準備、および作業プランを実行するワーカー スレッドのプールの管理を行います。タスクは、データの行を変換元から読み込み、変換を使用して処理し、変換先に保存します。

メソッドの実行順序

データ フロー コンポーネントの実行中に、PipelineComponent 基本クラス内のメソッドのサブセットが呼び出されます。呼び出されるメソッドとその順序は常に同じです。ただし、PrimeOutput および ProcessInput メソッドは例外です。これら 2 つのメソッドは、コンポーネントの IDTSInput90 および IDTSOutput90 オブジェクトの存在と構成に基づいて呼び出されます。

次の一覧では、コンポーネントの実行中に呼び出される順にメソッドを示します。PrimeOutput は、常に ProcessInput の前に呼び出されます。

PrimeOutput メソッド

PrimeOutput メソッドは、コンポーネントが少なくとも 1 つの出力をとり、その出力が IDTSPath90 オブジェクトを介して下流コンポーネントにアタッチされ、出力の SynchronousInputID プロパティが 0 の場合に呼び出されます。PrimeOutput メソッドは、非同期出力型の変換元コンポーネントと変換の場合に呼び出されます。後述の ProcessInput メソッドとは異なり、PrimeOutput メソッドは必要とされる各コンポーネントに対して 1 回だけ呼び出されます。

ProcessInput メソッド

ProcessInput メソッドは、IDTSPath90 オブジェクトによって上流コンポーネントにアタッチされる入力を、少なくとも 1 つとるコンポーネントに対して呼び出されます。ProcessInput メソッドは、同期出力型の変換先コンポーネントと変換に対して呼び出されます。ProcessInput は、処理する行が上流コンポーネントから渡されなくなるまで繰り返し呼び出されます。

入力と出力の処理

データ フロー コンポーネントは、実行時に次のタスクを実行します。

  • 変換コンポーネントは行を追加します。
  • 同期出力型の変換コンポーネントは、変換元コンポーネントから渡される行を受け取ります。
  • 非同期出力型の変換コンポーネントは、行を受け取って行を追加します。
  • 変換先コンポーネントは、行を受け取って変換先に読み込みます。

実行中、データ フロー タスクは、コンポーネントのシーケンスの出力列コレクションで定義されたすべての列を含む PipelineBuffer オブジェクトを割り当てます。たとえば、データ フロー シーケンス内の 4 つの各コンポーネントが 1 つの出力列をその出力列コレクションに追加した場合、各コンポーネントに提供されているバッファには 4 つの列、つまりコンポーネントごとに 1 つの出力列が格納されます。この動作のため、使用しない列を含むバッファをコンポーネントが受け取る場合があります。

コンポーネントが受け取るバッファにはコンポーネントが使用しない列が含まれる場合もあるので、データ フロー タスクがコンポーネントに提供するバッファ内のコンポーネントの入力および出力列のコレクションから、使用する列を検索する必要があります。これを行うには、BufferManager プロパティの FindColumnByLineageID メソッドを使用します。パフォーマンスの理由により、このタスクは通常、PrimeOutputProcessInput ではなく、PreExecute メソッドで実行します。

PreExecutePrimeOutput および ProcessInput メソッドの前に呼び出され、BufferManager がコンポーネントで使用できるようになった後に初めてコンポーネントで作業を実行できます。このメソッドの実行中、コンポーネントはバッファ内の列を検索してこの情報を内部的に格納し、列が PrimeOutput または ProcessInput メソッドのどちらかで使用できるようにします。

次のコード例では、PreExecute で、同期出力型の変換コンポーネントがバッファ内の入力列を検索する方法を示します。

private int []bufferColumnIndex;
public override void PreExecute()
{
    IDTSInput90 input = ComponentMetaData.InputCollection[0];
    bufferColumnIndex = new int[input.InputColumnCollection.Count];

    for( int x=0; x < input.InputColumnCollection.Count; x++)
    {
        IDTSInputColumn90 column = input.InputColumnCollection[x];
        bufferColumnIndex[x] = BufferManager.FindColumnByLineageID( input.Buffer, column.LineageID);
    }
}
Dim bufferColumnIndex As Integer()

    Public Overrides Sub PreExecute()

        Dim input As IDTSInput90 = ComponentMetaData.InputCollection(0)

        ReDim bufferColumnIndex(input.InputColumnCollection.Count)

        For x As Integer = 0 To input.InputColumnCollection.Count

            Dim column As IDTSInputColumn90 = input.InputColumnCollection(x)
            bufferColumnIndex(x) = BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID)

        Next

    End Sub

行の追加

コンポーネントは、PipelineBuffer オブジェクトに行を追加することで、下流コンポーネントに行を提供します。データ フロー タスクは、下流コンポーネントに接続される IDTSOutput90 オブジェクトごとに 1 つずつ、出力バッファの配列を PrimeOutput メソッドに渡すパラメータとして提供します。非同期出力型の変換元コンポーネントと変換コンポーネントは、行をバッファに追加し、行の追加が完了したら SetEndOfRowset メソッドを呼び出します。データ フロー タスクは、コンポーネントに提供する出力バッファを管理し、バッファがいっぱいになると、バッファ内の行を自動的に次のコンポーネントに移動します。PrimeOutput メソッドは、繰り返し呼び出される ProcessInput メソッドと異なり、コンポーネントごとに 1 回ずつ呼び出されます。

次のコード例では、コンポーネントが PrimeOutput メソッドで出力バッファに行を追加し、SetEndOfRowset メソッドを呼び出す方法を示します。

public override void PrimeOutput( int outputs, int []outputIDs,PipelineBuffer []buffers)
{
    for( int x=0; x < outputs; x++ )
    {
        IDTSOutput90 output = ComponentMetaData.OutputCollection.GetObjectByID( outputIDs[x]);
        PipelineBuffer buffer = buffers[x];

        // TODO: Add rows to the output buffer.
    }
    foreach( PipelineBuffer buffer in buffers )
    {
        /// Notify the data flow task that no more rows are coming.
        buffer.SetEndOfRowset();
    }
}
public overrides sub PrimeOutput( outputs as Integer , outputIDs() as Integer ,buffers() as PipelineBuffer buffers)

    For x As Integer = 0 To outputs.MaxValue

        Dim output As IDTSOutput90 = ComponentMetaData.OutputCollection.GetObjectByID(outputIDs(x))
        Dim buffer As PipelineBuffer = buffers(x)

        ' TODO: Add rows to the output buffer.

    Next

    For Each buffer As PipelineBuffer In buffers

        ' Notify the data flow task that no more rows are coming.
        buffer.SetEndOfRowset()

    Next

End Sub

出力バッファに行を追加するコンポーネントの開発の詳細については、「カスタム変換元コンポーネントの開発」および「非同期出力型のカスタム変換コンポーネントの開発」を参照してください。

行の受信

コンポーネントは、PipelineBuffer オブジェクト内の上流コンポーネントから行を受け取ります。データ フロー タスクでは、上流コンポーネントがデータ フローに追加する行が含まれる PipelineBuffer オブジェクトが、ProcessInput メソッドに渡されるパラメータとして用意されています。この入力バッファを使用して、バッファ内の行および列を検証して変更することはできますが、行を追加したり削除することはできません。ProcessInput メソッドは、使用できるバッファがなくなるまで繰り返し呼び出されます。最後に呼び出されたとき、バッファの RowCount は 0 になり、EndOfRowset プロパティが true になります。バッファを次の行に進める NextRow メソッドを使用することにより、バッファ内の行のコレクションを反復処理できます。バッファがコレクション内の最後の行にある場合、このメソッドは false を返します。

次のコード例では、EndOfRowset プロパティが true を返すまで、コンポーネントが入力バッファ内の行を ProcessInput メソッドで処理する方法を示します。

public override void ProcessInput( int inputID, PipelineBuffer buffer )
{
    if( ! buffer.EndOfRowset)
    {
        IDTSInput90 input = ComponentMetaData.InputCollection.GetObjectByID(inputID);
        while( buffer.NextRow())
        {
            // TODO: Examine the columns in the current row.
        }
    }
}
Public Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer)

    If buffer.EndOfRowset = False Then
        Dim input As IDTSInput90 = ComponentMetaData.InputCollection.GetObjectByID(inputID)
        While buffer.NextRow() = True

            ' TODO: Examine the columns in the current row.
        End While
    End If

End Sub

入力バッファ内の行を受け取るコンポーネントの開発の詳細については、「カスタム変換先コンポーネントの開発」および「同期出力型のカスタム変換コンポーネントの開発」を参照してください。

参照

概念

データ フロー コンポーネントのデザイン時のメソッド

ヘルプおよび情報

SQL Server 2005 の参考資料の入手