Desarrollar un componente de transformación personalizado con salidas sincrónicas
Los componentes de transformación con salidas sincrónicas reciben filas de los componentes de nivel superior y leen o modifican los valores de las columnas de estas filas al pasarlas a los componentes de nivel inferior. También pueden definir columnas de salida adicionales que se derivan de las columnas que proporcionan los componentes de nivel superior, pero no agregan filas al flujo de datos. Para obtener más información acerca de la diferencia entre los componentes sincrónicos y asincrónicos, vea Descripción de las transformaciones sincrónicas y asincrónicas.
Este tipo de componente es el adecuado para las tareas en las que se modifican los datos insertados cuando se proporcionan al componente, y donde el componente no tiene que ver todas las filas antes de procesarlas. Se trata de desarrollar el componente más sencillo debido a que las transformaciones con salidas sincrónicas no conectan normalmente a orígenes de datos externos, administran columnas de metadatos externas o agregan filas a búferes de salida.
Crear un componente de transformación con salidas sincrónicas implica agregar un objeto IDTSInput100 que contendrá las columnas de nivel superior seleccionadas para el componente, así como un objeto IDTSOutput100 que puede contener las columnas derivadas creadas por el componente. También incluye implementar métodos en tiempo de diseño y proporcionar código que lee o modifica las columnas en las filas del búfer de entrada durante la ejecución.
En esta sección se facilita la información necesaria para implementar un componente de transformación personalizado y se proporcionan ejemplos de código que sirven de ayuda para comprender mejor los conceptos.
Tiempo de diseño
El código en tiempo de diseño para este componente implica crear entradas y salidas, agregar cualquier columna de salida adicional que genere el componente y validar la configuración del componente.
Crear el componente
Las entradas, salidas y las propiedades personalizadas del componente se crean normalmente durante el método ProvideComponentProperties. Hay dos maneras para agregar la entrada y salida de un componente de transformación con salidas sincrónicas. Puede usar la implementación de la clase base del método y, a continuación, modificar la entrada y salida predeterminada que se crea o puede agregar explícitamente usted mismo la entrada y salida.
En el ejemplo de código siguiente se muestra una implementación de ProvideComponentProperties que agrega explícitamente los objetos de entrada y salida. La llamada a la clase base que realizaría la misma acción se incluye en un comentario.
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime;
namespace Microsoft.Samples.SqlServer.Dts
{
[DtsPipelineComponent(DisplayName = "SynchronousComponent", ComponentType = ComponentType.Transform)]
public class SyncComponent : PipelineComponent
{
public override void ProvideComponentProperties()
{
// Add the input.
IDTSInput100 input = ComponentMetaData.InputCollection.New();
input.Name = "Input";
// Add the output.
IDTSOutput100 output = ComponentMetaData.OutputCollection.New();
output.Name = "Output";
output.SynchronousInputID = input.ID;
// Alternatively, you can let the base class add the input and output
// and set the SynchronousInputID of the output to the ID of the input.
// base.ProvideComponentProperties();
}
}
}
Imports Microsoft.SqlServer.Dts.Pipeline
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper
Imports Microsoft.SqlServer.Dts.Runtime
<DtsPipelineComponent(DisplayName:="SynchronousComponent", ComponentType:=ComponentType.Transform)> _
Public Class SyncComponent
Inherits PipelineComponent
Public Overrides Sub ProvideComponentProperties()
' Add the input.
Dim input As IDTSInput100 = ComponentMetaData.InputCollection.New()
input.Name = "Input"
' Add the output.
Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection.New()
output.Name = "Output"
output.SynchronousInputID = Input.ID
' Alternatively, you can let the base class add the input and output
' and set the SynchronousInputID of the output to the ID of the input.
' base.ProvideComponentProperties();
End Sub
End Class
Crear y configurar columnas de salida
Aunque los componentes de transformación con salidas sincrónicas no agregan filas a los búferes, pueden agregar columnas de salidas adicionales a su salida. Normalmente, cuando un componente agrega una columna de salida, los valores de la nueva columna de salida se derivan en tiempo de ejecución de los datos contenidos en una o varias de las columnas que el componente de nivel superior proporciona al componente.
Una vez creada una columna de salida, se deben establecer sus propiedades de tipo de datos. Para establecer las propiedades de tipo de datos de una columna de salida se requiere un tratamiento especial y se realiza llamando al método SetDataTypeProperties. Este método es obligatorio porque las propiedades DataType, Length, Precision y CodePage son de solo lectura individualmente, debido a que cada propiedad depende de los valores de la otra. Este método garantiza que los valores de las propiedades se establezcan de forma coherente y que la tarea de flujo de datos valide que se establezcan correctamente.
El DataType de la columna determina los valores que se establecen para otras propiedades. En la tabla siguiente se muestran los requisitos de las propiedades dependientes para cada DataType. En los tipos de datos no enumerados, sus propiedades dependientes se establecen en cero.
Tipo de datos |
Longitud |
Escala |
Precisión |
CodePage |
---|---|---|---|---|
DT_DECIMAL |
0 |
Mayor que 0 y menor o igual que 28. |
0 |
0 |
DT_CY |
0 |
0 |
0 |
0 |
DT_NUMERIC |
0 |
Mayor que 0 y menor o igual que 28 y menor que Precisión. |
Mayor o igual que 1 y menor o igual que 38. |
0 |
DT_BYTES |
Mayor que 0. |
0 |
0 |
0 |
DT_STR |
Mayor que 0 y menor que 8.000. |
0 |
0 |
Distinto de 0 y una página de códigos válida. |
DT_WSTR |
Mayor que 0 y menor que 4.000. |
0 |
0 |
0 |
Puesto que las restricciones en las propiedades de tipo de datos se basan en el tipo de datos de la columna de salida, debe elegir el tipo de datos de Integration Services correcto al trabajar con tipos administrados. La clase base proporciona tres métodos de aplicación auxiliar, ConvertBufferDataTypeToFitManaged, BufferTypeToDataRecordType y DataRecordTypeToBufferType, que ayudan a los programadores de componentes administrados a seleccionar un tipo de datos de SSIS dado un tipo administrado. Estos métodos convierten los tipos de datos administrados en tipos de datos de SSIS y viceversa.
Tiempo de ejecución
Por lo general, la implementación en tiempo de ejecución de parte del componente se clasifica en dos tareas: localización de las columnas de entrada y salida del componente en el búfer y lectura o escritura de los valores de estas columnas en las filas del búfer de entrada.
Localizar columnas en el búfer
El número de columnas en los búferes que se proporcionan a un componente durante la ejecución, superará probablemente el número de columnas en las colecciones de entrada o salida del componente. Esto se debe a que cada búfer contiene todas las columnas de salida definidas en los componentes de un flujo de datos. Para asegurarse de que las columnas de búfer coinciden correctamente con las columnas de entrada o salida, los programadores de componentes deben usar el método FindColumnByLineageID de BufferManager. Este método busca una columna en el búfer especificado por su identificador de linaje. Normalmente, las columnas se encuentran durante PreExecute porque se trata del primer método en tiempo de ejecución donde la propiedad BufferManager pasa a estar disponible.
En el siguiente ejemplo de código se muestra un componente que busca índices de columnas en su columna de colección de columnas de entrada y salida durante PreExecute. Los índices de columna se almacenan en una matriz entera y el componente puede tener acceso a ellos durante ProcessInput.
int []inputColumns;
int []outputColumns;
public override void PreExecute()
{
IDTSInput100 input = ComponentMetaData.InputCollection[0];
IDTSOutput100 output = ComponentMetaData.OutputCollection[0];
inputColumns = new int[input.InputColumnCollection.Count];
outputColumns = new int[output.OutputColumnCollection.Count];
for(int col=0; col < input.InputColumnCollection.Count; col++)
{
IDTSInputColumn100 inputColumn = input.InputColumnCollection[col];
inputColumns[col] = BufferManager.FindColumnByLineageID(input.Buffer, inputColumn.LineageID);
}
for(int col=0; col < output.OutputColumnCollection.Count; col++)
{
IDTSOutputColumn100 outputColumn = output.OutputColumnCollection[col];
outputColumns[col] = BufferManager.FindColumnByLineageID(input.Buffer, outputColumn.LineageID);
}
}
Public Overrides Sub PreExecute()
Dim input As IDTSInput100 = ComponentMetaData.InputCollection(0)
Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)
ReDim inputColumns(input.InputColumnCollection.Count)
ReDim outputColumns(output.OutputColumnCollection.Count)
For col As Integer = 0 To input.InputColumnCollection.Count
Dim inputColumn As IDTSInputColumn100 = input.InputColumnCollection(col)
inputColumns(col) = BufferManager.FindColumnByLineageID(input.Buffer, inputColumn.LineageID)
Next
For col As Integer = 0 To output.OutputColumnCollection.Count
Dim outputColumn As IDTSOutputColumn100 = output.OutputColumnCollection(col)
outputColumns(col) = BufferManager.FindColumnByLineageID(input.Buffer, outputColumn.LineageID)
Next
End Sub
Procesar las filas
Los componentes reciben objetos PipelineBuffer que contienen filas y columnas del método ProcessInput. Durante este método las filas en el búfer se iteran y las columnas identificadas durante PreExecute se pueden leer y modificar. La tarea de flujo de datos llama repetidamente al método hasta que el componente de nivel superior no proporciona más filas.
Se lee o se escribe una columna individual en el búfer mediante el método de acceso al indizador de matrices o mediante uno de los métodos Set o Get. Los métodos Set y Get son más eficaces y se deben usar cuando se conoce el tipo de datos de la columna en el búfer.
En el siguiente ejemplo de código se muestra una implementación del método ProcessInput que procesa las filas entrantes.
public override void ProcessInput( int InputID, PipelineBuffer buffer)
{
while( buffer.NextRow())
{
for(int x=0; x < inputColumns.Length;x++)
{
if(!buffer.IsNull(inputColumns[x]))
{
object columnData = buffer[inputColumns[x]];
// TODO: Modify the column data.
buffer[inputColumns[x]] = columnData;
}
}
}
}
Public Overrides Sub ProcessInput(ByVal InputID As Integer, ByVal buffer As PipelineBuffer)
While (buffer.NextRow())
For x As Integer = 0 To inputColumns.Length
if buffer.IsNull(inputColumns(x)) = false then
Dim columnData As Object = buffer(inputColumns(x))
' TODO: Modify the column data.
buffer(inputColumns(x)) = columnData
End If
Next
End While
End Sub
Ejemplo
En el ejemplo siguiente se muestra un componente de transformación simple con salidas sincrónicas que convierte en mayúsculas los valores de todas las columnas de cadena. En este ejemplo no se muestran todos los métodos ni funcionalidad tratados en este tema. Muestra los métodos importantes que cada componente de transformación personalizado con salidas sincrónicas debe invalidar, pero no contiene código para la validación en tiempo de diseño.
using System;
using System.Collections;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
namespace Uppercase
{
[DtsPipelineComponent(DisplayName = "Uppercase")]
public class Uppercase : PipelineComponent
{
ArrayList m_ColumnIndexList = new ArrayList();
public override void ProvideComponentProperties()
{
base.ProvideComponentProperties();
ComponentMetaData.InputCollection[0].Name = "Uppercase Input";
ComponentMetaData.OutputCollection[0].Name = "Uppercase Output";
}
public override void PreExecute()
{
IDTSInput100 input = ComponentMetaData.InputCollection[0];
IDTSInputColumnCollection100 inputColumns = input.InputColumnCollection;
foreach (IDTSInputColumn100 column in inputColumns)
{
if (column.DataType == DataType.DT_STR || column.DataType == DataType.DT_WSTR)
{
m_ColumnIndexList.Add((int)BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID));
}
}
}
public override void ProcessInput(int inputID, PipelineBuffer buffer)
{
while (buffer.NextRow())
{
foreach (int columnIndex in m_ColumnIndexList)
{
string str = buffer.GetString(columnIndex);
buffer.SetString(columnIndex, str.ToUpper());
}
}
}
}
}
Imports System
Imports System.Collections
Imports Microsoft.SqlServer.Dts.Pipeline
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper
Namespace Uppercase
<DtsPipelineComponent(DisplayName="Uppercase")> _
Public Class Uppercase
Inherits PipelineComponent
Private m_ColumnIndexList As ArrayList = New ArrayList
Public Overrides Sub ProvideComponentProperties()
MyBase.ProvideComponentProperties
ComponentMetaData.InputCollection(0).Name = "Uppercase Input"
ComponentMetaData.OutputCollection(0).Name = "Uppercase Output"
End Sub
Public Overrides Sub PreExecute()
Dim input As IDTSInput100 = ComponentMetaData.InputCollection(0)
Dim inputColumns As IDTSInputColumnCollection100 = input.InputColumnCollection
For Each column As IDTSInputColumn100 In inputColumns
If column.DataType = DataType.DT_STR OrElse column.DataType = DataType.DT_WSTR Then
m_ColumnIndexList.Add(CType(BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID), Integer))
End If
Next
End Sub
Public Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer)
While buffer.NextRow
For Each columnIndex As Integer In m_ColumnIndexList
Dim str As String = buffer.GetString(columnIndex)
buffer.SetString(columnIndex, str.ToUpper)
Next
End While
End Sub
End Class
End Namespace
|
Vea también
Conceptos
Desarrollar un componente de transformación personalizado con salidas asincrónicas
Descripción de las transformaciones sincrónicas y asincrónicas
Crear una transformación sincrónica con el componente de script