Desarrollar componentes de flujo de datos con varias entradas
Un componente de flujo de datos con varias entradas puede utilizar demasiada memoria si sus diversas entradas producen datos a velocidades desiguales. Al desarrollar un componente de flujo de datos personalizado que admite dos o más entradas, puede administrar esta presión de memoria utilizando los siguientes miembros en el espacio de nombres Microsoft.SqlServer.Dts.Pipeline:
La propiedad DtsPipelineComponentAttribute.SupportsBackPressure de la clase DtsPipelineComponentAttribute. Establezca el valor de esta propiedad en true si desea implementar el código que es necesario que el componente de flujo de datos personalizado administre datos que fluyen a velocidades desiguales.
El método IsInputReady de la clase PipelineComponent. Debe proporcionar una implementación de este método si establece la propiedad SupportsBackPressure en true. Si no proporciona una implementación, el motor de flujo de datos produce una excepción en tiempo de ejecución.
El método GetDependentInputs de la clase PipelineComponent. También debe proporcionar una implementación de este método si establece la propiedad SupportsBackPressure en true y su componente personalizado admite más de dos entradas. Si no proporciona una implementación, el motor de flujo de datos produce una excepción en tiempo de ejecución si el usuario adjunta más de dos entradas.
Juntos, estos miembros le permiten desarrollar una solución para la presión de memoria que es similar a la solución que Microsoft desarrolló para las transformaciones Combinación y Combinación de mezcla.
Establecer la propiedad SupportsBackPressure
El primer paso para implementar la mejor administración de memoria para un componente de flujo de datos personalizado que admite varias entradas es establecer el valor de la propiedad SupportsBackPressure en true en la clase DtsPipelineComponentAttribute. Cuando el valor de SupportsBackPressure es true, el motor de flujo de datos llama al método IsInputReady y, cuando hay más de dos entradas, también llama en tiempo de ejecución al método GetDependentInputs.
Ejemplo
En el siguiente ejemplo, la implementación de DtsPipelineComponentAttribute establece el valor de la propiedad SupportsBackPressure en true.
[DtsPipelineComponent(ComponentType = ComponentType.Transform,
DisplayName = "Shuffler",
Description = "Shuffle the rows from input.",
SupportsBackPressure = true,
LocalizationType = typeof(Localized),
IconResource = "Microsoft.Samples.SqlServer.Dts.MIBPComponent.ico")
]
public class Shuffler : Microsoft.SqlServer.Dts.Pipeline.PipelineComponent
{
...
}
Implementar el método IsInputReady
Al establecer el valor de la propiedad SupportsBackPressure en true en el objeto DtsPipelineComponentAttribute, también debe proporcionar una implementación para el método IsInputReady de la clase PipelineComponent.
[!NOTA]
Su implementación del método IsInputReady no debería llamar a las implementaciones en la clase base. La implementación predeterminada de este método en la clase base simplemente genera una NotImplementedException.
Al implementar este método, establece el estado de un elemento en la matriz canProcess Boolean para cada una de las entradas del componente. (Sus entradas se identifican mediante los valores del identificador en la matriz inputIDs). Al establecer el valor de un elemento en la matriz canProcess en true para una entrada, el motor de flujo de datos llama al método ProcessInput del componente y proporciona más datos para la entrada especificada.
Aunque los datos del nivel superior estén disponibles, el valor del elemento de la matriz canProcess de una entrada al menos siempre debe ser true o procesar las detenciones.
El motor de flujo de datos llama al método IsInputReady antes de enviar cada búfer de datos para determinar qué entradas esperan recibir más datos. Cuando el valor devuelto indica que se bloquea una entrada, el motor de flujo de datos almacena temporalmente en memoria caché los búferes adicionales de datos para esa entrada en lugar de enviarlos al componente.
[!NOTA]
No llame a los métodos GetDependentInputs o IsInputReady en su propio código. El motor de flujo de datos llama a estos métodos y a los otros métodos de la clase PipelineComponent que invalide, cuando el motor de flujo de datos ejecuta su componente.
Ejemplo
En el siguiente ejemplo, la implementación del método IsInputReady indica que una entrada está esperando recibir más datos cuando se cumplen las condiciones siguientes:
Los datos de nivel superior están disponibles para la entrada (!inputEOR).
El componente no tiene los datos disponibles actualmente para procesar la entrada en los búferes que el componente ya haya recibido (inputBuffers[inputIndex].CurrentRow() == null).
Si una entrada espera recibir más datos, el componente de flujo de datos lo indica estableciendo en true el valor del elemento en la matriz canProcess que corresponde a esa entrada.
Por el contrario, cuando el componente todavía tiene los datos disponibles para procesar la entrada, el ejemplo suspende su procesamiento. Para ello, en el ejemplo se establece en false el valor del elemento en la matriz canProcess que corresponde a esa entrada.
public override void IsInputReady(int[] inputIDs, ref bool[] canProcess)
{
for (int i = 0; i < inputIDs.Length; i++)
{
int inputIndex = ComponentMetaData.InputCollection.GetObjectIndexByID(inputIDs[i]);
canProcess[i] = (inputBuffers[inputIndex].CurrentRow() == null)
&& !inputEOR[inputIndex];
}
}
El ejemplo anterior utiliza la matriz inputEOR de tipo Boolean para indicar si los datos de nivel superior están disponibles para cada entrada. EOR en el nombre de la matriz representa "fin del conjunto de filas" y hace referencia a la propiedad EndOfRowset de los búferes del flujo de datos. En una parte del ejemplo que no está incluido aquí, el método ProcessInput comprueba el valor de la propiedad EndOfRowset cada búfer de datos que recibe. Cuando el valor true indica que no hay más datos de nivel superior disponibles para una entrada, en el ejemplo se establece el valor del elemento de la matriz inputEOR para esa entrada en true. Este ejemplo del método IsInputReady establece el valor del elemento correspondiente en la matriz canProcess en false para una entrada cuando el valor del elemento de la matriz inputEOR indica que no hay más datos del nivel superior disponibles para la misma.
Implementar el método GetDependentInputs
Cuando su componente de flujo de datos personalizado admite más de dos entradas, también debe proporcionar una implementación para el método GetDependentInputs de la clase PipelineComponent.
[!NOTA]
Su implementación del método GetDependentInputs no debería llamar a las implementaciones en la clase base. La implementación predeterminada de este método en la clase base simplemente genera una NotImplementedException.
El motor de flujo de datos solo llama al método GetDependentInputs cuando el usuario adjunta más de dos entradas al componente. Cuando un componente tiene solo dos entradas y el método IsInputReady indica que una está bloqueada (canProcess = false), el motor de flujo de datos sabe que la otra entrada está esperando recibir más datos. Sin embargo, cuando hay más de dos entradas y el método IsInputReady indica que una está bloqueada, el código adicional del método GetDependentInputs identifica qué entradas esperan recibir más datos.
[!NOTA]
No llame a los métodos GetDependentInputs o IsInputReady en su propio código. El motor de flujo de datos llama a estos métodos y a los otros métodos de la clase PipelineComponent que invalide, cuando ejecuta su componente.
Ejemplo
Para una entrada concreta que se bloquea, la siguiente implementación del método GetDependentInputs devuelve una colección de las entradas que están esperando recibir más datos y, por consiguiente, bloquean la entrada especificada. El componente identifica las entradas del bloqueo comprobando las entradas distintas de la entrada bloqueada que no tienen actualmente datos disponibles para procesar en los búferes que el componente ya ha recibido (inputBuffers[i].CurrentRow() == null). A continuación, el método GetDependentInputs devuelve la colección de entradas del bloqueo como una recopilación de identificadores de entrada.
public override Collection<int> GetDependentInputs(int blockedInputID)
{
Collection<int> currentDependencies = new Collection<int>();
for (int i = 0; i < ComponentMetaData.InputCollection.Count; i++)
{
if (ComponentMetaData.InputCollection[i].ID != blockedInputID
&& inputBuffers[i].CurrentRow() == null)
{
currentDependencies.Add(ComponentMetaData.InputCollection[i].ID);
}
}
return currentDependencies;
}