Sviluppo di un componente di destinazione personalizzato

Si applica a:SQL Server Integration Runtime SSIS in Azure Data Factory

Microsoft SQL Server Integration Services offre agli sviluppatori la possibilità di scrivere componenti di destinazione personalizzati in grado di connettersi a un'origine dati personalizzata e archiviarvi i dati. I componenti di destinazione personalizzati risultano utili quando è necessario connettersi a origini dati che non sono accessibili tramite uno dei componenti di destinazione disponibili in Integration Services.

I componenti di destinazione includono uno o più input e nessun output. In fase di progettazione creano e configurano connessioni e leggono metadati di colonne dall'origine dati esterna. Durante l'esecuzione si connettono all'origine dati esterna e vi aggiungono le righe ricevute dai componenti a monte nel flusso di dati. Se l'origine dati esterna esiste prima dell'esecuzione del componente, il componente di destinazione deve anche assicurarsi che i tipi di dati delle colonne ricevute corrispondano ai tipi di dati delle colonne presenti nell'origine dati esterna.

In questa sezione viene descritto come sviluppare componenti di destinazione e vengono forniti esempi di codice per chiarire i concetti importanti. Per una panoramica generale sullo sviluppo di componenti flusso dati, vedere Sviluppo di un componente flusso di dati personalizzato.

Fase di progettazione

L'implementazione della funzionalità in fase di progettazione di un componente di destinazione implica la specifica di una connessione a un'origine dati esterna e la verifica che il componente sia correttamente configurato. Per definizione, un componente di destinazione include un input e possibilmente un output degli errori.

Creazione del componente

I componenti di destinazione si connettono a origini dati esterne tramite gli oggetti ConnectionManager definiti in un pacchetto. Il componente di destinazione indica il proprio requisito di una gestione connessione a Progettazione SSIS e agli utenti del componente aggiungendo un elemento alla raccolta RuntimeConnectionCollection di ComponentMetaData. Questa raccolta svolge due funzioni: indica innanzitutto a Progettazione SSIS la necessità di una gestione connessione, quindi, dopo che l'utente ha selezionato o creato una gestione connessione, mantiene un riferimento alla gestione connessione nel pacchetto usato dal componente. Quando un oggetto IDTSRuntimeConnection100 viene aggiunto alla raccolta, in Editor avanzato viene visualizzata la scheda Proprietà connessione, in cui si richiede di selezionare o creare nel pacchetto una connessione che verrà usata dal componente.

Nell'esempio di codice seguente è illustrata un'implementazione di ProvideComponentProperties che aggiunge un input, quindi aggiunge un oggetto IDTSRuntimeConnection100 a RuntimeConnectionCollection.

using System;  
using Microsoft.SqlServer.Dts.Pipeline;  
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;  
using Microsoft.SqlServer.Dts.Runtime;  
  
namespace Microsoft.Samples.SqlServer.Dts  
{  
    [DtsPipelineComponent(DisplayName = "Destination Component",ComponentType =ComponentType.DestinationAdapter)]  
    public class DestinationComponent : PipelineComponent   
    {  
        public override void ProvideComponentProperties()  
        {  
            // Reset the component.  
            base.RemoveAllInputsOutputsAndCustomProperties();  
            ComponentMetaData.RuntimeConnectionCollection.RemoveAll();  
  
            IDTSInput100 input = ComponentMetaData.InputCollection.New();  
            input.Name = "Input";  
  
            IDTSRuntimeConnection100 connection = ComponentMetaData.RuntimeConnectionCollection.New();  
            connection.Name = "ADO.net";  
        }  
    }  
}  
Imports System  
Imports System.Data  
Imports System.Data.SqlClient  
Imports Microsoft.SqlServer.Dts.Pipeline  
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper  
Imports Microsoft.SqlServer.Dts.Runtime  
  
Namespace Microsoft.Samples.SqlServer.Dts  
  
    <DtsPipelineComponent(DisplayName:="Destination Component", ComponentType:=ComponentType.DestinationAdapter)> _  
    Public Class DestinationComponent  
        Inherits PipelineComponent  
  
        Public Overrides Sub ProvideComponentProperties()  
  
            ' Reset the component.  
            Me.RemoveAllInputsOutputsAndCustomProperties()  
            ComponentMetaData.RuntimeConnectionCollection.RemoveAll()  
  
            Dim input As IDTSInput100 = ComponentMetaData.InputCollection.New()  
            input.Name = "Input"  
  
            Dim connection As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection.New()  
            connection.Name = "ADO.net"  
  
        End Sub  
    End Class  
End Namespace  

Connessione a un'origine dati esterna

Dopo che una connessione viene aggiunta a RuntimeConnectionCollection, eseguire l'override del metodo AcquireConnections per stabilire una connessione all'origine dati esterna. Questo metodo viene chiamato in fase di progettazione e in fase di esecuzione. Il componente deve stabilire una connessione alla gestione connessione specificata dalla connessione di runtime e, successivamente, all'origine dati esterna. Una volta stabilita, la connessione deve essere memorizzata nella cache interna dal componente e rilasciata quando viene chiamato il metodo ReleaseConnections. Gli sviluppatori eseguono l'override di questo metodo e rilasciano la connessione stabilita dal componente durante AcquireConnections. Entrambi questi metodi, ReleaseConnections e AcquireConnections, vengono chiamati in fase di progettazione e in fase di esecuzione.

Nell'esempio di codice seguente è illustrato un componente che si connette a una connessione ADO.NET nel metodo AcquireConnections, quindi chiude la connessione in ReleaseConnections.

using Microsoft.SqlServer.Dts.Runtime.Wrapper;  
  
private SqlConnection sqlConnection;  
  
public override void AcquireConnections(object transaction)  
{  
    if (ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager != null)  
    {  
        ConnectionManager cm = Microsoft.SqlServer.Dts.Runtime.DtsConvert.GetWrapper(ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager);  
        ConnectionManagerAdoNet cmado = cm.InnerObject as ConnectionManagerAdoNet;  
  
        if (cmado == null)  
            throw new Exception("The ConnectionManager " + cm.Name + " is not an ADO.NET connection.");  
  
        sqlConnection = cmado.AcquireConnection(transaction) as SqlConnection;  
        sqlConnection.Open();  
    }  
}  
  
public override void ReleaseConnections()  
{  
    if (sqlConnection != null && sqlConnection.State != ConnectionState.Closed)  
        sqlConnection.Close();  
}  
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper  
  
Private sqlConnection As SqlConnection  
  
Public Overrides Sub AcquireConnections(ByVal transaction As Object)  
  
    If IsNothing(ComponentMetaData.RuntimeConnectionCollection(0).ConnectionManager) = False Then  
  
        Dim cm As ConnectionManager = DtsConvert.GetWrapper(ComponentMetaData.RuntimeConnectionCollection(0).ConnectionManager)  
        Dim cmado As ConnectionManagerAdoNet = CType(cm.InnerObject,ConnectionManagerAdoNet)  
  
        If IsNothing(cmado) Then  
            Throw New Exception("The ConnectionManager " + cm.Name + " is not an ADO.NET connection.")  
        End If  
  
        sqlConnection = CType(cmado.AcquireConnection(transaction), SqlConnection)  
        sqlConnection.Open()  
  
    End If  
End Sub  
  
Public Overrides Sub ReleaseConnections()  
  
    If IsNothing(sqlConnection) = False And sqlConnection.State <> ConnectionState.Closed Then  
        sqlConnection.Close()  
    End If  
  
End Sub  

Convalida del componente

Gli sviluppatori di componenti di destinazione devono eseguire la convalida come descritto in Convalida del componente. Devono inoltre verificare che le proprietà del tipo di dati delle colonne definite nella raccolta di colonne di input del componente corrispondano alle colonne nell'origine dati esterna. A volte, la verifica delle colonne di input rispetto all'origine dati esterna può essere impossibile o indesiderata, ad esempio quando il componente o Progettazione SSIS è in uno stato disconnesso o quando i round trip al server non sono accettabili. In queste situazioni, le colonne nella raccolta di colonne di input possono comunque essere convalidate tramite ExternalMetadataColumnCollection dell'oggetto di output.

Questa raccolta esiste sia negli oggetti di input che di output e deve essere popolata dallo sviluppatore di componenti dalle colonne dell'origine dati esterna. È possibile usare questa raccolta per convalidare le colonne di input quando Progettazione SSIS è offline, quando il componente è disconnesso o quando la proprietà ValidateExternalMetadata è false.

Nel codice di esempio seguente viene aggiunta una colonna di metadati esterna basata su una colonna di input esistente.

private void AddExternalMetaDataColumn(IDTSInput100 input,IDTSInputColumn100 inputColumn)  
{  
    // Set the properties of the external metadata column.  
    IDTSExternalMetadataColumn100 externalColumn = input.ExternalMetadataColumnCollection.New();  
    externalColumn.Name = inputColumn.Name;  
    externalColumn.Precision = inputColumn.Precision;  
    externalColumn.Length = inputColumn.Length;  
    externalColumn.DataType = inputColumn.DataType;  
    externalColumn.Scale = inputColumn.Scale;  
  
    // Map the external column to the input column.  
    inputColumn.ExternalMetadataColumnID = externalColumn.ID;  
}  
Private Sub AddExternalMetaDataColumn(ByVal input As IDTSInput100, ByVal inputColumn As IDTSInputColumn100)  
  
    ' Set the properties of the external metadata column.  
    Dim externalColumn As IDTSExternalMetadataColumn100 = input.ExternalMetadataColumnCollection.New()  
    externalColumn.Name = inputColumn.Name  
    externalColumn.Precision = inputColumn.Precision  
    externalColumn.Length = inputColumn.Length  
    externalColumn.DataType = inputColumn.DataType  
    externalColumn.Scale = inputColumn.Scale  
  
    ' Map the external column to the input column.  
    inputColumn.ExternalMetadataColumnID = externalColumn.ID  
  
End Sub  

Fase di esecuzione

Durante l'esecuzione il componente di destinazione riceve una chiamata al metodo ProcessInput ogni volta che il componente a monte rende disponibile un oggetto PipelineBuffer pieno. Questo metodo viene chiamato più volte finché non sono più disponibili buffer e la proprietà EndOfRowset è true. Durante questo metodo, i componenti di destinazione leggono le colonne e le righe nel buffer e le aggiungono all'origine dati esterna.

Individuazione di colonne nel buffer

Il buffer di input per un componente contiene tutte le colonne definite nelle raccolte di colonne di output dei componenti a monte del componente nel flusso di dati. Se ad esempio un componente di origine prevede tre colonne nel relativo output e il componente successivo aggiunge un'altra colonna di output, il buffer fornito al componente di destinazione contiene quattro colonne, anche se quest'ultimo ne scriverà solo due.

L'ordine delle colonne nel buffer di input non è definito dall'indice della colonna di output nella raccolta di colonne di input del componente di destinazione. Le colonne possono essere individuate in modo affidabile in una riga del buffer solo tramite il metodo FindColumnByLineageID di BufferManager. Questo metodo individua la colonna con l'ID di derivazione specificato nel buffer specificato, quindi restituisce la relativa posizione nella riga. Gli indici delle colonne di input vengono in genere individuati durante il metodo PreExecute e vengono memorizzati nella cache dallo sviluppatore per un utilizzo successivo durante ProcessInput.

Nell'esempio di codice seguente viene individuata la posizione delle colonne di input nel buffer di output durante PreExecute e tali colonne vengono archiviate in una matrice. La matrice viene in seguito utilizzata durante ProcessInput per leggere i valori delle colonne nel buffer.

int[] cols;  
  
public override void PreExecute()  
{  
    IDTSInput100 input = ComponentMetaData.InputCollection[0];  
  
    cols = new int[input.InputColumnCollection.Count];  
  
    for (int x = 0; x < input.InputColumnCollection.Count; x++)  
    {  
        cols[x] = BufferManager.FindColumnByLineageID(input.Buffer, input.InputColumnCollection[x].LineageID);  
    }  
}  
Private cols As Integer()  
  
Public Overrides Sub PreExecute()  
  
    Dim input As IDTSInput100 = ComponentMetaData.InputCollection(0)  
  
    ReDim cols(input.InputColumnCollection.Count)  
  
    For x As Integer = 0 To input.InputColumnCollection.Count  
  
        cols(x) = BufferManager.FindColumnByLineageID(input.Buffer, input.InputColumnCollection(x).LineageID)  
    Next x  
  
End Sub  

Elaborazione di righe

Una volta individuate nel buffer, le colonne di input possono essere lette e scritte nell'origine dati esterna.

Mentre il componente di destinazione scrive le righe nell'origine dati esterna, è possibile aggiornare i contatori delle prestazioni "Righe lette" o "Byte BLOB letti" chiamando il metodo IncrementPipelinePerfCounter. Per altre informazioni, vedere i contatori delle prestazioni.

Nell'esempio di codice seguente è illustrato un componente che legge righe dal buffer fornito in ProcessInput. Gli indici delle colonne nel buffer sono stati individuati durante PreExecute nell'esempio di codice precedente.

public override void ProcessInput(int inputID, PipelineBuffer buffer)  
{  
        while (buffer.NextRow())  
        {  
            foreach (int col in cols)  
            {  
                if (!buffer.IsNull(col))  
                {  
                    //  TODO: Read the column data.  
                }  
            }  
        }  
}  
Public Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer)  
  
        While (buffer.NextRow())  
  
            For Each col As Integer In cols  
  
                If buffer.IsNull(col) = False Then  
  
                    '  TODO: Read the column data.  
                End If  
  
            Next col  
        End While  
End Sub  

Esempio

Nell'esempio seguente è illustrato un componente di destinazione semplice che utilizza una gestione connessione file per salvare dati binari dal flusso di dati in file. Nell'esempio non vengono dimostrati tutti i metodi e le funzionalità descritti nell'argomento. Vengono descritti i metodi importanti di cui ogni componente di destinazione personalizzato deve eseguire l'override, ma non è incluso codice per la convalida in fase di progettazione.

using System;  
using System.IO;  
using Microsoft.SqlServer.Dts.Pipeline;  
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;  
  
namespace BlobDst  
{  
  [DtsPipelineComponent(DisplayName = "BLOB Extractor Destination", Description = "Writes values of BLOB columns to files")]  
  public class BlobDst : PipelineComponent  
  {  
    string m_DestDir;  
    int m_FileNameColumnIndex = -1;  
    int m_BlobColumnIndex = -1;  
  
    public override void ProvideComponentProperties()  
    {  
      IDTSInput100 input = ComponentMetaData.InputCollection.New();  
      input.Name = "BLOB Extractor Destination Input";  
      input.HasSideEffects = true;  
  
      IDTSRuntimeConnection100 conn = ComponentMetaData.RuntimeConnectionCollection.New();  
      conn.Name = "FileConnection";  
    }  
  
    public override void AcquireConnections(object transaction)  
    {  
      IDTSRuntimeConnection100 conn = ComponentMetaData.RuntimeConnectionCollection[0];  
      m_DestDir = (string)conn.ConnectionManager.AcquireConnection(null);  
  
      if (m_DestDir.Length > 0 && m_DestDir[m_DestDir.Length - 1] != '\\')  
        m_DestDir += "\\";  
    }  
  
    public override IDTSInputColumn100 SetUsageType(int inputID, IDTSVirtualInput100 virtualInput, int lineageID, DTSUsageType usageType)  
    {  
      IDTSInputColumn100 inputColumn = base.SetUsageType(inputID, virtualInput, lineageID, usageType);  
      IDTSCustomProperty100 custProp;  
  
      custProp = inputColumn.CustomPropertyCollection.New();  
      custProp.Name = "IsFileName";  
      custProp.Value = (object)false;  
  
      custProp = inputColumn.CustomPropertyCollection.New();  
      custProp.Name = "IsBLOB";  
      custProp.Value = (object)false;  
  
      return inputColumn;  
    }  
  
    public override void PreExecute()  
    {  
      IDTSInput100 input = ComponentMetaData.InputCollection[0];  
      IDTSInputColumnCollection100 inputColumns = input.InputColumnCollection;  
      IDTSCustomProperty100 custProp;  
  
      foreach (IDTSInputColumn100 column in inputColumns)  
      {  
        custProp = column.CustomPropertyCollection["IsFileName"];  
        if ((bool)custProp.Value == true)  
        {  
          m_FileNameColumnIndex = (int)BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID);  
        }  
  
        custProp = column.CustomPropertyCollection["IsBLOB"];  
        if ((bool)custProp.Value == true)  
        {  
          m_BlobColumnIndex = (int)BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID);  
        }  
      }  
    }  
  
    public override void ProcessInput(int inputID, PipelineBuffer buffer)  
    {  
      while (buffer.NextRow())  
      {  
        string strFileName = buffer.GetString(m_FileNameColumnIndex);  
        int blobLength = (int)buffer.GetBlobLength(m_BlobColumnIndex);  
        byte[] blobData = buffer.GetBlobData(m_BlobColumnIndex, 0, blobLength);  
  
        strFileName = TranslateFileName(strFileName);  
  
        // Make sure directory exists before creating file.  
        FileInfo fi = new FileInfo(strFileName);  
        if (!fi.Directory.Exists)  
          fi.Directory.Create();  
  
        // Write the data to the file.  
        FileStream fs = new FileStream(strFileName, FileMode.Create, FileAccess.Write, FileShare.None);  
        fs.Write(blobData, 0, blobLength);  
        fs.Close();  
      }  
    }  
  
    private string TranslateFileName(string fileName)  
    {  
      if (fileName.Length > 2 && fileName[1] == ':')  
        return m_DestDir + fileName.Substring(3, fileName.Length - 3);  
      else  
        return m_DestDir + fileName;  
    }  
  }  
}  
Imports System   
Imports System.IO   
Imports Microsoft.SqlServer.Dts.Pipeline   
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper   
Namespace BlobDst   
  
 <DtsPipelineComponent(DisplayName="BLOB Extractor Destination", Description="Writes values of BLOB columns to files")> _   
 Public Class BlobDst   
 Inherits PipelineComponent   
   Private m_DestDir As String   
   Private m_FileNameColumnIndex As Integer = -1   
   Private m_BlobColumnIndex As Integer = -1   
  
   Public  Overrides Sub ProvideComponentProperties()   
     Dim input As IDTSInput100 = ComponentMetaData.InputCollection.New   
     input.Name = "BLOB Extractor Destination Input"   
     input.HasSideEffects = True   
     Dim conn As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection.New   
     conn.Name = "FileConnection"   
   End Sub   
  
   Public  Overrides Sub AcquireConnections(ByVal transaction As Object)   
     Dim conn As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection(0)   
     m_DestDir = CType(conn.ConnectionManager.AcquireConnection(Nothing), String)   
     If m_DestDir.Length > 0 AndAlso Not (m_DestDir(m_DestDir.Length - 1) = "\"C) Then   
       m_DestDir += "\"   
     End If   
   End Sub   
  
   Public  Overrides Function SetUsageType(ByVal inputID As Integer, ByVal virtualInput As IDTSVirtualInput100, ByVal lineageID As Integer, ByVal usageType As DTSUsageType) As IDTSInputColumn100   
     Dim inputColumn As IDTSInputColumn100 = MyBase.SetUsageType(inputID, virtualInput, lineageID, usageType)   
     Dim custProp As IDTSCustomProperty100   
     custProp = inputColumn.CustomPropertyCollection.New   
     custProp.Name = "IsFileName"   
     custProp.Value = CType(False, Object)   
     custProp = inputColumn.CustomPropertyCollection.New   
     custProp.Name = "IsBLOB"   
     custProp.Value = CType(False, Object)   
     Return inputColumn   
   End Function   
  
   Public  Overrides Sub PreExecute()   
     Dim input As IDTSInput100 = ComponentMetaData.InputCollection(0)   
     Dim inputColumns As IDTSInputColumnCollection100 = input.InputColumnCollection   
     Dim custProp As IDTSCustomProperty100   
     For Each column As IDTSInputColumn100 In inputColumns   
       custProp = column.CustomPropertyCollection("IsFileName")   
       If CType(custProp.Value, Boolean) = True Then   
         m_FileNameColumnIndex = CType(BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID), Integer)   
       End If   
       custProp = column.CustomPropertyCollection("IsBLOB")   
       If CType(custProp.Value, Boolean) = True Then   
         m_BlobColumnIndex = CType(BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID), Integer)   
       End If   
     Next   
   End Sub   
  
   Public  Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer)   
     While buffer.NextRow   
       Dim strFileName As String = buffer.GetString(m_FileNameColumnIndex)   
       Dim blobLength As Integer = CType(buffer.GetBlobLength(m_BlobColumnIndex), Integer)   
       Dim blobData As Byte() = buffer.GetBlobData(m_BlobColumnIndex, 0, blobLength)   
       strFileName = TranslateFileName(strFileName)   
       Dim fi As FileInfo = New FileInfo(strFileName)   
       ' Make sure directory exists before creating file.  
       If Not fi.Directory.Exists Then   
         fi.Directory.Create   
       End If   
       ' Write the data to the file.  
       Dim fs As FileStream = New FileStream(strFileName, FileMode.Create, FileAccess.Write, FileShare.None)   
       fs.Write(blobData, 0, blobLength)   
       fs.Close   
     End While   
   End Sub   
  
   Private Function TranslateFileName(ByVal fileName As String) As String   
     If fileName.Length > 2 AndAlso fileName(1) = ":"C Then   
       Return m_DestDir + fileName.Substring(3, fileName.Length - 3)   
     Else   
       Return m_DestDir + fileName   
     End If   
   End Function   
 End Class   
End Namespace  

Vedere anche

Sviluppo di un componente di origine personalizzato
Creazione di una destinazione con il componente script