Développement d'un composant de transformation personnalisé à sorties synchrones

S’applique à :SQL Server SSIS Integration Runtime dans Azure Data Factory

Les composants de transformation à sorties synchrones reçoivent des lignes en provenance des composants en amont, puis lisent ou modifient les valeurs comprises dans les colonnes de ces lignes alors qu'ils transfèrent les lignes aux composants en aval. Ils peuvent également définir des colonnes de sortie supplémentaires dérivées des colonnes fournies par les composants en amont, mais ils n'ajoutent pas de lignes au flux de données. Pour plus d’informations sur la différence entre les composants synchrones et asynchrones, consultez Présentation des transformations synchrones et asynchrones.

Ce type de composant convient aux tâches dans lesquelles les données sont modifiées en ligne à mesure qu'elles sont fournies au composant et dans lesquelles le composant n'a pas à consulter toutes les lignes avant de les traiter. Il s'agit du composant le plus facile à développer parce que les transformations à sorties synchrones ne se connectent pas en général à des sources de données externes, gèrent des colonnes de métadonnées externes ou ajoutent des lignes aux tampons de sortie.

La création d'un composant de transformation à sorties synchrones implique d'ajouter un objet IDTSInput100 qui contiendra des colonnes en amont sélectionnées pour le composant, et un objet IDTSOutput100 qui peut contenir des colonnes dérivées créées par le composant. Il inclut également l'implémentation des méthodes de conception, ainsi que la fourniture du code qui lit ou modifie les colonnes dans les lignes du tampon entrantes pendant l'exécution.

Cette section fournit les informations requises pour implémenter un composant de transformation personnalisé et des exemples de code pour vous aider à mieux comprendre les concepts.

Moment de la conception

Le code du moment de la conception de ce composant implique la création des entrées et sorties, l'ajout de toutes colonnes de sortie supplémentaires que le composant génère et la validation de la configuration du composant.

Création du composant

Les entrées, sorties et propriétés personnalisées du composant sont en général créées pendant la méthode ProvideComponentProperties. Il existe deux manières d'ajouter l'entrée et la sortie d'un composant de transformation à sorties synchrones. Vous pouvez utiliser l'implémentation de la classe de base de la méthode, puis modifier l'entrée et la sortie par défaut qu'elle crée, ou vous pouvez ajouter explicitement l'entrée et la sortie vous-même.

L'exemple de code suivant présente une implémentation de la méthode ProvideComponentProperties qui ajoute explicitement les objets d'entrée et de sortie. L'appel de la classe de base qui accomplirait la même chose est inclus dans un commentaire.

using Microsoft.SqlServer.Dts.Pipeline;  
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;  
using Microsoft.SqlServer.Dts.Runtime;  
  
namespace Microsoft.Samples.SqlServer.Dts  
{  
    [DtsPipelineComponent(DisplayName = "SynchronousComponent", ComponentType = ComponentType.Transform)]  
    public class SyncComponent : PipelineComponent  
    {  
  
        public override void ProvideComponentProperties()  
        {  
            // Add the input.  
            IDTSInput100 input = ComponentMetaData.InputCollection.New();  
            input.Name = "Input";  
  
            // Add the output.  
            IDTSOutput100 output = ComponentMetaData.OutputCollection.New();  
            output.Name = "Output";  
            output.SynchronousInputID = input.ID;  
  
            // Alternatively, you can let the base class add the input and output  
            // and set the SynchronousInputID of the output to the ID of the input.  
            // base.ProvideComponentProperties();  
        }  
    }  
}  
Imports Microsoft.SqlServer.Dts.Pipeline  
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper  
Imports Microsoft.SqlServer.Dts.Runtime  
  
<DtsPipelineComponent(DisplayName:="SynchronousComponent", ComponentType:=ComponentType.Transform)> _  
Public Class SyncComponent  
    Inherits PipelineComponent  
  
    Public Overrides Sub ProvideComponentProperties()  
  
        ' Add the input.  
        Dim input As IDTSInput100 = ComponentMetaData.InputCollection.New()  
        input.Name = "Input"  
  
        ' Add the output.  
        Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection.New()  
        output.Name = "Output"  
        output.SynchronousInputID = Input.ID  
  
        ' Alternatively, you can let the base class add the input and output  
        ' and set the SynchronousInputID of the output to the ID of the input.  
        ' base.ProvideComponentProperties();  
  
    End Sub  
  
End Class  

Création et configuration de colonnes de sortie

Bien que les composants de transformation à sorties synchrones n'ajoutent pas de lignes aux tampons, ils peuvent ajouter des colonnes de sortie supplémentaires à leur sortie. En général, lorsqu'un composant ajoute une colonne de sortie, les valeurs de la nouvelle colonne de sortie sont dérivées au moment de l'exécution des données contenues dans une ou plusieurs colonnes parmi celles fournies au composant par un composant en amont.

Après avoir créé une colonne de sortie, ses propriétés de type de données doivent être définies. La définition des propriétés de type de données d'une colonne de sortie requiert une gestion spéciale et s'effectue en appelant la méthode SetDataTypeProperties. Cette méthode est requise parce que les propriétés DataType, Length, Precision et CodePage sont individuellement en lecture seule, parce que chacune dépend des paramètres de l'autre. Cette méthode garantit que les valeurs des propriétés sont définies de manière cohérente et que la tâche de flux de données valide leur définition correcte.

La propriété DataType de la colonne détermine les valeurs définies pour les autres propriétés. Le tableau suivant indique les conditions requises sur les propriétés dépendantes de chaque propriété DataType. Les types de données non répertoriés ont leurs propriétés dépendantes définies sur zéro.

DataType Longueur Scale Précision CodePage
DT_DECIMAL 0 Supérieur à 0 et inférieur ou égal à 28. 0 0
DT_CY 0 0 0 0
DT_NUMERIC 0 Supérieur à 0 et inférieur ou égale à 28 et inférieur à Précision. Supérieur ou égal à 1 et inférieur ou égal à 38. 0
DT_BYTES Supérieur à 0. 0 0 0
DT_STR Supérieur à 0 et inférieur à 8 000. 0 0 Différent de 0 et une page de codes valide.
DT_WSTR Supérieur à 0 et inférieur à 4000. 0 0 0

Étant donné que les restrictions sur les propriétés de type de données sont basées sur le type de données de la colonne de sortie, vous devez choisir le type de données Integration Services correct lorsque vous utilisez des types managés. La classe de base fournit trois méthodes d'assistance, ConvertBufferDataTypeToFitManaged, BufferTypeToDataRecordType et DataRecordTypeToBufferType qui aident les développeurs de composants managés à sélectionner un type de données SSIS en fonction d'un type managé. Ces méthodes convertissent des types de données managées en types de données SSIS, et vice versa.

Moment de l'exécution

En général, l’implémentation de la partie exécution du composant se divise en deux tâches : localisation des colonnes d’entrée et de sortie du composant dans le tampon et lecture ou écriture des valeurs de ces colonnes dans les lignes du tampon entrantes.

Localisation des colonnes dans le tampon

Le nombre de colonnes dans les tampons fournis à un composant pendant l'exécution dépassera probablement le nombre de colonnes dans les collections d'entrée ou de sortie du composant. Ce dépassement est dû au fait que chaque tampon contient toutes les colonnes de sortie définies dans les composants d'un flux de données. Pour garantir que les colonnes de tampon sont correctement mises en correspondance avec les colonnes d'entrée ou de sortie, les développeurs de composants doivent utiliser la méthode FindColumnByLineageID de la propriété BufferManager. Cette méthode localise une colonne dans le tampon spécifié par son ID de lignage. En général, les colonnes sont localisées pendant la méthode PreExecute parce qu'il s'agit de la première méthode d'exécution où la propriété BufferManager devient disponible.

L'exemple de code suivant présente un composant qui localise des index des colonnes dans sa collection de colonnes d'entrée et de sortie pendant la méthode PreExecute. Les index de colonne sont stockés dans un tableau d'entiers et sont accessibles via le composant pendant la méthode ProcessInput.

int []inputColumns;  
int []outputColumns;  
  
public override void PreExecute()  
{  
    IDTSInput100 input = ComponentMetaData.InputCollection[0];  
    IDTSOutput100 output = ComponentMetaData.OutputCollection[0];  
  
    inputColumns = new int[input.InputColumnCollection.Count];  
    outputColumns = new int[output.OutputColumnCollection.Count];  
  
    for(int col=0; col < input.InputColumnCollection.Count; col++)  
    {  
        IDTSInputColumn100 inputColumn = input.InputColumnCollection[col];  
        inputColumns[col] = BufferManager.FindColumnByLineageID(input.Buffer, inputColumn.LineageID);  
    }  
  
    for(int col=0; col < output.OutputColumnCollection.Count; col++)  
    {  
        IDTSOutputColumn100 outputColumn = output.OutputColumnCollection[col];  
        outputColumns[col] = BufferManager.FindColumnByLineageID(input.Buffer, outputColumn.LineageID);  
    }  
  
}  
Public Overrides Sub PreExecute()  
  
    Dim input As IDTSInput100 = ComponentMetaData.InputCollection(0)  
    Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)  
  
    ReDim inputColumns(input.InputColumnCollection.Count)  
    ReDim outputColumns(output.OutputColumnCollection.Count)  
  
    For col As Integer = 0 To input.InputColumnCollection.Count  
  
        Dim inputColumn As IDTSInputColumn100 = input.InputColumnCollection(col)  
        inputColumns(col) = BufferManager.FindColumnByLineageID(input.Buffer, inputColumn.LineageID)  
    Next  
  
    For col As Integer = 0 To output.OutputColumnCollection.Count  
  
        Dim outputColumn As IDTSOutputColumn100 = output.OutputColumnCollection(col)  
        outputColumns(col) = BufferManager.FindColumnByLineageID(input.Buffer, outputColumn.LineageID)  
    Next  
  
End Sub  

Traitement de lignes

Les composants reçoivent des objets PipelineBuffer qui contiennent des lignes et des colonnes dans la méthode ProcessInput. Pendant cette méthode, les lignes comprises dans le tampon sont parcourues, puis les colonnes identifiées pendant la méthode PreExecute sont lues et modifiées. La méthode est appelée à plusieurs reprises par la tâche de flux de données jusqu'à ce qu'aucune ligne ne soit fournie à partir du composant en amont.

Une colonne individuelle du tampon est lue ou écrite en utilisant la méthode d’accès indexeur de tableau ou l’une des méthodes Get ou Set. Les méthodes Get et Set sont plus efficaces et doivent être utilisées lorsque le type de données de la colonne dans le tampon est connu.

L'exemple de code suivant présente une implémentation de la méthode ProcessInput qui traite des lignes entrantes.

public override void ProcessInput( int InputID, PipelineBuffer buffer)  
{  
       while( buffer.NextRow())  
       {  
            for(int x=0; x < inputColumns.Length;x++)  
            {  
                if(!buffer.IsNull(inputColumns[x]))  
                {  
                    object columnData = buffer[inputColumns[x]];  
                    // TODO: Modify the column data.  
                    buffer[inputColumns[x]] = columnData;  
                }  
            }  
  
      }  
}  
Public Overrides Sub ProcessInput(ByVal InputID As Integer, ByVal buffer As PipelineBuffer)  
  
        While (buffer.NextRow())  
  
            For x As Integer = 0 To inputColumns.Length  
  
                if buffer.IsNull(inputColumns(x)) = false then  
  
                    Dim columnData As Object = buffer(inputColumns(x))  
                    ' TODO: Modify the column data.  
                    buffer(inputColumns(x)) = columnData  
  
                End If  
            Next  
  
        End While  
End Sub  

Exemple

L'exemple suivant présente un composant de transformation simple à sorties synchrones qui convertit les valeurs de toutes les colonnes de chaîne en majuscule. Cet exemple ne contient pas toutes les méthodes et fonctionnalités présentées dans cette rubrique. Il illustre les méthodes importantes que chaque composant de transformation personnalisé à sorties synchrones doit substituer, mais ne contient pas de code pour la validation au moment de la conception.

using System;  
using System.Collections;  
using Microsoft.SqlServer.Dts.Pipeline;  
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;  
using Microsoft.SqlServer.Dts.Runtime.Wrapper;  
  
namespace Uppercase  
{  
  [DtsPipelineComponent(DisplayName = "Uppercase")]  
  public class Uppercase : PipelineComponent  
  {  
    ArrayList m_ColumnIndexList = new ArrayList();  
  
    public override void ProvideComponentProperties()  
    {  
      base.ProvideComponentProperties();  
      ComponentMetaData.InputCollection[0].Name = "Uppercase Input";  
      ComponentMetaData.OutputCollection[0].Name = "Uppercase Output";  
    }  
  
    public override void PreExecute()  
    {  
      IDTSInput100 input = ComponentMetaData.InputCollection[0];  
      IDTSInputColumnCollection100 inputColumns = input.InputColumnCollection;  
  
      foreach (IDTSInputColumn100 column in inputColumns)  
      {  
        if (column.DataType == DataType.DT_STR || column.DataType == DataType.DT_WSTR)  
        {  
          m_ColumnIndexList.Add((int)BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID));  
        }  
      }  
    }  
  
    public override void ProcessInput(int inputID, PipelineBuffer buffer)  
    {  
      while (buffer.NextRow())  
      {  
        foreach (int columnIndex in m_ColumnIndexList)  
        {  
          string str = buffer.GetString(columnIndex);  
          buffer.SetString(columnIndex, str.ToUpper());  
        }  
      }  
    }  
  }  
}  
Imports System   
Imports System.Collections   
Imports Microsoft.SqlServer.Dts.Pipeline   
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper   
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper   
Namespace Uppercase   
  
 <DtsPipelineComponent(DisplayName="Uppercase")> _   
 Public Class Uppercase   
 Inherits PipelineComponent   
   Private m_ColumnIndexList As ArrayList = New ArrayList   
  
   Public  Overrides Sub ProvideComponentProperties()   
     MyBase.ProvideComponentProperties   
     ComponentMetaData.InputCollection(0).Name = "Uppercase Input"   
     ComponentMetaData.OutputCollection(0).Name = "Uppercase Output"   
   End Sub   
  
   Public  Overrides Sub PreExecute()   
     Dim input As IDTSInput100 = ComponentMetaData.InputCollection(0)   
     Dim inputColumns As IDTSInputColumnCollection100 = input.InputColumnCollection   
     For Each column As IDTSInputColumn100 In inputColumns   
       If column.DataType = DataType.DT_STR OrElse column.DataType = DataType.DT_WSTR Then   
         m_ColumnIndexList.Add(CType(BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID), Integer))   
       End If   
     Next   
   End Sub   
  
   Public  Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer)   
     While buffer.NextRow   
       For Each columnIndex As Integer In m_ColumnIndexList   
         Dim str As String = buffer.GetString(columnIndex)   
         buffer.SetString(columnIndex, str.ToUpper)   
       Next   
     End While   
   End Sub   
 End Class   
End Namespace  

Voir aussi

Développement d'un composant de transformation personnalisé à sorties asynchrones
Présentation des transformations synchrones et asynchrones
Création d'une transformation synchrone à l'aide du composant Script