データ フロー コンポーネントの実行時のメソッド
実行時に、データ フロー タスクは、コンポーネントの順序の確認、実行プランの準備、および作業プランを実行するワーカー スレッドのプールの管理を行います。タスクは、データの行を変換元から読み込み、変換を使用して処理し、変換先に保存します。
メソッドの実行順序
データ フロー コンポーネントの実行中に、PipelineComponent 基本クラス内のメソッドのサブセットが呼び出されます。呼び出されるメソッドとその順序は常に同じです。ただし、PrimeOutput および ProcessInput メソッドは例外です。これら 2 つのメソッドは、コンポーネントの IDTSInput90 および IDTSOutput90 オブジェクトの存在と構成に基づいて呼び出されます。
次の一覧では、コンポーネントの実行中に呼び出される順にメソッドを示します。PrimeOutput は、常に ProcessInput の前に呼び出されます。
- AcquireConnections
- Validate
- ReleaseConnections
- PrepareForExecute
- AcquireConnections
- PreExecute
- PrimeOutput
- ProcessInput
- PostExecute
- ReleaseConnections
- Cleanup
PrimeOutput メソッド
PrimeOutput メソッドは、コンポーネントが少なくとも 1 つの出力をとり、その出力が IDTSPath90 オブジェクトを介して下流コンポーネントにアタッチされ、出力の SynchronousInputID プロパティが 0 の場合に呼び出されます。PrimeOutput メソッドは、非同期出力型の変換元コンポーネントと変換の場合に呼び出されます。後述の ProcessInput メソッドとは異なり、PrimeOutput メソッドは必要とされる各コンポーネントに対して 1 回だけ呼び出されます。
ProcessInput メソッド
ProcessInput メソッドは、IDTSPath90 オブジェクトによって上流コンポーネントにアタッチされる入力を、少なくとも 1 つとるコンポーネントに対して呼び出されます。ProcessInput メソッドは、同期出力型の変換先コンポーネントと変換に対して呼び出されます。ProcessInput は、処理する行が上流コンポーネントから渡されなくなるまで繰り返し呼び出されます。
入力と出力の処理
データ フロー コンポーネントは、実行時に次のタスクを実行します。
- 変換コンポーネントは行を追加します。
- 同期出力型の変換コンポーネントは、変換元コンポーネントから渡される行を受け取ります。
- 非同期出力型の変換コンポーネントは、行を受け取って行を追加します。
- 変換先コンポーネントは、行を受け取って変換先に読み込みます。
実行中、データ フロー タスクは、コンポーネントのシーケンスの出力列コレクションで定義されたすべての列を含む PipelineBuffer オブジェクトを割り当てます。たとえば、データ フロー シーケンス内の 4 つの各コンポーネントが 1 つの出力列をその出力列コレクションに追加した場合、各コンポーネントに提供されているバッファには 4 つの列、つまりコンポーネントごとに 1 つの出力列が格納されます。この動作のため、使用しない列を含むバッファをコンポーネントが受け取る場合があります。
コンポーネントが受け取るバッファにはコンポーネントが使用しない列が含まれる場合もあるので、データ フロー タスクがコンポーネントに提供するバッファ内のコンポーネントの入力および出力列のコレクションから、使用する列を検索する必要があります。これを行うには、BufferManager プロパティの FindColumnByLineageID メソッドを使用します。パフォーマンスの理由により、このタスクは通常、PrimeOutput や ProcessInput ではなく、PreExecute メソッドで実行します。
PreExecute は PrimeOutput および ProcessInput メソッドの前に呼び出され、BufferManager がコンポーネントで使用できるようになった後に初めてコンポーネントで作業を実行できます。このメソッドの実行中、コンポーネントはバッファ内の列を検索してこの情報を内部的に格納し、列が PrimeOutput または ProcessInput メソッドのどちらかで使用できるようにします。
次のコード例では、PreExecute で、同期出力型の変換コンポーネントがバッファ内の入力列を検索する方法を示します。
private int []bufferColumnIndex;
public override void PreExecute()
{
IDTSInput90 input = ComponentMetaData.InputCollection[0];
bufferColumnIndex = new int[input.InputColumnCollection.Count];
for( int x=0; x < input.InputColumnCollection.Count; x++)
{
IDTSInputColumn90 column = input.InputColumnCollection[x];
bufferColumnIndex[x] = BufferManager.FindColumnByLineageID( input.Buffer, column.LineageID);
}
}
Dim bufferColumnIndex As Integer()
Public Overrides Sub PreExecute()
Dim input As IDTSInput90 = ComponentMetaData.InputCollection(0)
ReDim bufferColumnIndex(input.InputColumnCollection.Count)
For x As Integer = 0 To input.InputColumnCollection.Count
Dim column As IDTSInputColumn90 = input.InputColumnCollection(x)
bufferColumnIndex(x) = BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID)
Next
End Sub
行の追加
コンポーネントは、PipelineBuffer オブジェクトに行を追加することで、下流コンポーネントに行を提供します。データ フロー タスクは、下流コンポーネントに接続される IDTSOutput90 オブジェクトごとに 1 つずつ、出力バッファの配列を PrimeOutput メソッドに渡すパラメータとして提供します。非同期出力型の変換元コンポーネントと変換コンポーネントは、行をバッファに追加し、行の追加が完了したら SetEndOfRowset メソッドを呼び出します。データ フロー タスクは、コンポーネントに提供する出力バッファを管理し、バッファがいっぱいになると、バッファ内の行を自動的に次のコンポーネントに移動します。PrimeOutput メソッドは、繰り返し呼び出される ProcessInput メソッドと異なり、コンポーネントごとに 1 回ずつ呼び出されます。
次のコード例では、コンポーネントが PrimeOutput メソッドで出力バッファに行を追加し、SetEndOfRowset メソッドを呼び出す方法を示します。
public override void PrimeOutput( int outputs, int []outputIDs,PipelineBuffer []buffers)
{
for( int x=0; x < outputs; x++ )
{
IDTSOutput90 output = ComponentMetaData.OutputCollection.GetObjectByID( outputIDs[x]);
PipelineBuffer buffer = buffers[x];
// TODO: Add rows to the output buffer.
}
foreach( PipelineBuffer buffer in buffers )
{
/// Notify the data flow task that no more rows are coming.
buffer.SetEndOfRowset();
}
}
public overrides sub PrimeOutput( outputs as Integer , outputIDs() as Integer ,buffers() as PipelineBuffer buffers)
For x As Integer = 0 To outputs.MaxValue
Dim output As IDTSOutput90 = ComponentMetaData.OutputCollection.GetObjectByID(outputIDs(x))
Dim buffer As PipelineBuffer = buffers(x)
' TODO: Add rows to the output buffer.
Next
For Each buffer As PipelineBuffer In buffers
' Notify the data flow task that no more rows are coming.
buffer.SetEndOfRowset()
Next
End Sub
出力バッファに行を追加するコンポーネントの開発の詳細については、「カスタム変換元コンポーネントの開発」および「非同期出力型のカスタム変換コンポーネントの開発」を参照してください。
行の受信
コンポーネントは、PipelineBuffer オブジェクト内の上流コンポーネントから行を受け取ります。データ フロー タスクでは、上流コンポーネントがデータ フローに追加する行が含まれる PipelineBuffer オブジェクトが、ProcessInput メソッドに渡されるパラメータとして用意されています。この入力バッファを使用して、バッファ内の行および列を検証して変更することはできますが、行を追加したり削除することはできません。ProcessInput メソッドは、使用できるバッファがなくなるまで繰り返し呼び出されます。最後に呼び出されたとき、バッファの RowCount は 0 になり、EndOfRowset プロパティが true になります。バッファを次の行に進める NextRow メソッドを使用することにより、バッファ内の行のコレクションを反復処理できます。バッファがコレクション内の最後の行にある場合、このメソッドは false を返します。
次のコード例では、EndOfRowset プロパティが true を返すまで、コンポーネントが入力バッファ内の行を ProcessInput メソッドで処理する方法を示します。
public override void ProcessInput( int inputID, PipelineBuffer buffer )
{
if( ! buffer.EndOfRowset)
{
IDTSInput90 input = ComponentMetaData.InputCollection.GetObjectByID(inputID);
while( buffer.NextRow())
{
// TODO: Examine the columns in the current row.
}
}
}
Public Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer)
If buffer.EndOfRowset = False Then
Dim input As IDTSInput90 = ComponentMetaData.InputCollection.GetObjectByID(inputID)
While buffer.NextRow() = True
' TODO: Examine the columns in the current row.
End While
End If
End Sub
入力バッファ内の行を受け取るコンポーネントの開発の詳細については、「カスタム変換先コンポーネントの開発」および「同期出力型のカスタム変換コンポーネントの開発」を参照してください。