Métodos en tiempo de ejecución de un componente de flujo de datos
En tiempo de ejecución, la tarea de flujo de datos examina la secuencia de componentes, prepara un plan de ejecución y administra un grupo de subprocesos de trabajo que ejecutan el plan de trabajo. La tarea carga filas de datos de los orígenes, las procesa a través de las transformaciones y, a continuación, las guarda en los destinos.
Secuencia de ejecución del método
Durante la ejecución de un componente de flujo de datos, se llama a un subconjunto de métodos en la clase base PipelineComponent. Los métodos, y la secuencia en la que se llaman, siempre son los mismos, con la excepción de los métodos ProcessInput y PrimeOutput. Se llama a estos dos métodos dependiendo de la existencia y configuración de los objetos IDTSInput100 y IDTSOutput100 de un componente.
La lista siguiente muestra los métodos en el orden en el que se llaman durante la ejecución del componente. Tenga en cuenta que cuando se llama a PrimeOutput, siempre se llama antes de ProcessInput.
Método PrimeOutput
Se llama al método PrimeOutput cuando un componente tiene por lo menos una salida, adjuntada a un componente de nivel inferior a través de un objeto IDTSPath100, y la propiedad SynchronousInputID de la salida es cero. Se llama al método PrimeOutput para los componentes de origen y para las transformaciones con salidas asincrónicas. A diferencia del método ProcessInput que se describe a continuación, cada componente que lo requiere llama al método PrimeOutput una sola vez.
Método ProcessInput
Un objeto IDTSPath100 llama al método ProcessInput para los componentes que tienen al menos una salida adjuntada a un componente de nivel superior. Se llama al método ProcessInput para los componentes de destino y para las transformaciones con salidas sincrónicas. Se llama a ProcessInput varias veces hasta que no hay más filas de componentes de nivel superior que procesar.
Trabajar con entradas y salidas
En tiempo de ejecución, los componentes de flujo de datos realizan las tareas siguientes:
Los componentes de origen agregan filas.
Los componentes de transformación con salidas sincrónicas reciben las filas que proporcionan los componentes de origen.
Los componentes de transformación con salidas asincrónicas reciben filas y agregan filas.
Los componentes de destino reciben filas y, a continuación, las cargan en un destino.
Durante la ejecución, la tarea de flujo de datos asigna los objetos PipelineBuffer que contienen todas las columnas definidas en las colecciones de columna de salida de una secuencia de componentes. Por ejemplo, si uno de cada cuatro componentes en una secuencia del flujo de datos agrega una columna de salida a su colección de columna de salida, el búfer que se proporciona a cada componente contiene cuatro columnas, una por cada columna de salida por componente. Debido a este comportamiento, un componente recibe a veces búferes que contienen columnas que no usará el componente.
Puesto que los búferes que recibe su componente pueden contener columnas que el componente no usará, busque las columnas que desea usar en las colecciones de columna de entrada y salida de su componente en el búfer que la tarea de flujo de datos proporciona al componente. Para ello, use el método FindColumnByLineageID de la propiedad BufferManager. Por razones de rendimiento, esta tarea normalmente se realiza durante el método PreExecute, en lugar de en PrimeOutput o ProcessInput.
Se llama a PreExecute antes que a los métodos ProcessInput y PrimeOutput, y es la primera oportunidad para que un componente realice esta tarea después de que BufferManager pase a estar disponible para el componente. Durante este método, el componente debería buscar sus columnas en los búferes y almacenar internamente esta información, de manera que las columnas se puedan usar en cualquiera de los métodos ProcessInput o PrimeOutput.
En el siguiente ejemplo de código se muestra cómo un componente de transformación con una salida sincrónica busca sus columnas de entrada en el búfer durante PreExecute.
private int []bufferColumnIndex;
public override void PreExecute()
{
IDTSInput100 input = ComponentMetaData.InputCollection[0];
bufferColumnIndex = new int[input.InputColumnCollection.Count];
for( int x=0; x < input.InputColumnCollection.Count; x++)
{
IDTSInputColumn100 column = input.InputColumnCollection[x];
bufferColumnIndex[x] = BufferManager.FindColumnByLineageID( input.Buffer, column.LineageID);
}
}
Dim bufferColumnIndex As Integer()
Public Overrides Sub PreExecute()
Dim input As IDTSInput100 = ComponentMetaData.InputCollection(0)
ReDim bufferColumnIndex(input.InputColumnCollection.Count)
For x As Integer = 0 To input.InputColumnCollection.Count
Dim column As IDTSInputColumn100 = input.InputColumnCollection(x)
bufferColumnIndex(x) = BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID)
Next
End Sub
Agregar filas
Los componentes proporcionan filas a los componentes de nivel inferior agregándolas a los objetos PipelineBuffer. La tarea de flujo de datos proporciona una matriz de búferes de salida, uno para cada objeto IDTSOutput100 que está conectado a un componente de nivel inferior, como un parámetro al método PrimeOutput. Los componentes de origen y componentes de transformación con salidas asincrónicas agregan filas a los búferes y llaman al método SetEndOfRowset cuando terminan de agregar filas. La tarea de flujo de datos administra los búferes de salida que proporciona a los componentes y, cuando un búfer se llena, automáticamente mueve las filas del búfer al componente siguiente. Se llama al método PrimeOutput una vez por componente, a diferencia del método ProcessInput, al que se llama varias veces.
En el siguiente ejemplo de código se muestra cómo un componente agrega filas a sus búferes de salida durante el método PrimeOutput y, a continuación, llama al método SetEndOfRowset.
public override void PrimeOutput( int outputs, int []outputIDs,PipelineBuffer []buffers)
{
for( int x=0; x < outputs; x++ )
{
IDTSOutput100 output = ComponentMetaData.OutputCollection.GetObjectByID( outputIDs[x]);
PipelineBuffer buffer = buffers[x];
// TODO: Add rows to the output buffer.
}
foreach( PipelineBuffer buffer in buffers )
{
/// Notify the data flow task that no more rows are coming.
buffer.SetEndOfRowset();
}
}
public overrides sub PrimeOutput( outputs as Integer , outputIDs() as Integer ,buffers() as PipelineBuffer buffers)
For x As Integer = 0 To outputs.MaxValue
Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection.GetObjectByID(outputIDs(x))
Dim buffer As PipelineBuffer = buffers(x)
' TODO: Add rows to the output buffer.
Next
For Each buffer As PipelineBuffer In buffers
' Notify the data flow task that no more rows are coming.
buffer.SetEndOfRowset()
Next
End Sub
Para obtener más información acerca de cómo desarrollar componentes que agregan filas a los búferes de salida, vea Desarrollar un componente de origen personalizado y Desarrollar un componente de transformación personalizado con salidas asincrónicas.
Recibir filas
Los componentes reciben filas de los componentes de nivel superior en objetos PipelineBuffer. La tarea de flujo de datos proporciona un objeto PipelineBuffer que contiene las filas agregadas al flujo de datos por componentes de nivel superior como un parámetro al método ProcessInput. Este búfer de entrada se puede usar para examinar y modificar filas y columnas en el búfer, pero no se puede usar para agregar o quitar filas. Se llama al método ProcessInput varias veces hasta que no hay más búferes disponibles. En la última llamada, la propiedad EndOfRowset es true. Puede iterar sobre la colección de filas en el búfer mediante el método NextRow, que hace avanzar el búfer a la siguiente fila. Este método devuelve false cuando el búfer está activado en la última fila de la colección. No tiene que comprobar la propiedad EndOfRowset, a menos que tenga que realizar una acción adicional después de que las últimas filas de datos se hayan procesado.
El texto siguiente muestra el patrón correcto para usar el método NextRow y la propiedad EndOfRowset:
while (buffer.NextRow())
{
// Hacer algo con cada fila.
}
if (buffer.EndOfRowset)
{
// Opcionalmente, hacer algo después de procesar todas las filas.
}
En el siguiente ejemplo de código se muestra cómo un componente procesa filas en búferes de entrada durante el método ProcessInput.
public override void ProcessInput( int inputID, PipelineBuffer buffer )
{
{
IDTSInput100 input = ComponentMetaData.InputCollection.GetObjectByID(inputID);
while( buffer.NextRow())
{
// TODO: Examine the columns in the current row.
}
}
Public Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer)
Dim input As IDTSInput100 = ComponentMetaData.InputCollection.GetObjectByID(inputID)
While buffer.NextRow() = True
' TODO: Examine the columns in the current row.
End While
End Sub
Para obtener más información acerca de cómo desarrollar componentes que reciben filas en búferes de entrada, vea Desarrollar un componente de destino personalizado y Desarrollar un componente de transformación personalizado con salidas sincrónicas.
|