Share via


Mengembangkan Komponen Tujuan Kustom

Berlaku untuk: Integration Runtime SSIS SQL Server di Azure Data Factory

Microsoft SQL Server Integration Services memberi pengembang kemampuan untuk menulis komponen tujuan kustom yang dapat terhubung ke dan menyimpan data di sumber data kustom apa pun. Komponen tujuan kustom berguna saat Anda perlu menyambungkan ke sumber data yang tidak dapat diakses dengan menggunakan salah satu komponen sumber yang ada yang disertakan dengan Layanan Integrasi.

Komponen tujuan memiliki satu atau beberapa input dan output nol. Pada waktu desain, mereka membuat dan mengonfigurasi koneksi dan membaca metadata kolom dari sumber data eksternal. Selama eksekusi, mereka terhubung ke sumber data eksternal mereka dan menambahkan baris yang diterima dari komponen upstream dalam aliran data ke sumber data eksternal. Jika sumber data eksternal ada sebelum eksekusi komponen, komponen tujuan juga harus memastikan bahwa jenis data kolom yang diterima komponen cocok dengan jenis data kolom di sumber data eksternal.

Bagian ini membahas detail cara mengembangkan komponen tujuan, dan menyediakan contoh kode untuk mengklarifikasi konsep penting. Untuk gambaran umum pengembangan komponen aliran data, lihat Mengembangkan Komponen Aliran Data Kustom.

Waktu Desain

Menerapkan fungsionalitas waktu desain komponen tujuan melibatkan penentuan koneksi ke sumber data eksternal dan memvalidasi bahwa komponen telah dikonfigurasi dengan benar. Menurut definisi, komponen tujuan memiliki satu input dan mungkin satu output kesalahan.

Membuat Komponen

Komponen tujuan tersambung ke sumber data eksternal dengan menggunakan ConnectionManager objek yang ditentukan dalam paket. Komponen tujuan menunjukkan persyaratannya untuk manajer koneksi ke SSIS Designer, dan kepada pengguna komponen, dengan menambahkan elemen ke RuntimeConnectionCollection koleksi ComponentMetaData. Koleksi ini melayani dua tujuan: pertama, ia mengiklankan kebutuhan manajer koneksi ke SSIS Designer; kemudian, setelah pengguna memilih atau membuat pengelola koneksi, ia memegang referensi ke manajer koneksi dalam paket yang sedang digunakan oleh komponen. IDTSRuntimeConnection100 Saat ditambahkan ke koleksi, Editor Lanjutan menampilkan tab Properti Koneksi, untuk meminta pengguna memilih atau membuat koneksi dalam paket untuk digunakan oleh komponen.

Sampel kode berikut menunjukkan implementasi ProvideComponentProperties yang menambahkan input, lalu menambahkan IDTSRuntimeConnection100 objek ke RuntimeConnectionCollection.

using System;  
using Microsoft.SqlServer.Dts.Pipeline;  
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;  
using Microsoft.SqlServer.Dts.Runtime;  
  
namespace Microsoft.Samples.SqlServer.Dts  
{  
    [DtsPipelineComponent(DisplayName = "Destination Component",ComponentType =ComponentType.DestinationAdapter)]  
    public class DestinationComponent : PipelineComponent   
    {  
        public override void ProvideComponentProperties()  
        {  
            // Reset the component.  
            base.RemoveAllInputsOutputsAndCustomProperties();  
            ComponentMetaData.RuntimeConnectionCollection.RemoveAll();  
  
            IDTSInput100 input = ComponentMetaData.InputCollection.New();  
            input.Name = "Input";  
  
            IDTSRuntimeConnection100 connection = ComponentMetaData.RuntimeConnectionCollection.New();  
            connection.Name = "ADO.net";  
        }  
    }  
}  
Imports System  
Imports System.Data  
Imports System.Data.SqlClient  
Imports Microsoft.SqlServer.Dts.Pipeline  
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper  
Imports Microsoft.SqlServer.Dts.Runtime  
  
Namespace Microsoft.Samples.SqlServer.Dts  
  
    <DtsPipelineComponent(DisplayName:="Destination Component", ComponentType:=ComponentType.DestinationAdapter)> _  
    Public Class DestinationComponent  
        Inherits PipelineComponent  
  
        Public Overrides Sub ProvideComponentProperties()  
  
            ' Reset the component.  
            Me.RemoveAllInputsOutputsAndCustomProperties()  
            ComponentMetaData.RuntimeConnectionCollection.RemoveAll()  
  
            Dim input As IDTSInput100 = ComponentMetaData.InputCollection.New()  
            input.Name = "Input"  
  
            Dim connection As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection.New()  
            connection.Name = "ADO.net"  
  
        End Sub  
    End Class  
End Namespace  

Menyambungkan ke Sumber Data Eksternal

Setelah koneksi ditambahkan ke RuntimeConnectionCollection, Anda mengambil AcquireConnections alih metode untuk membuat koneksi ke sumber data eksternal. Metode ini dipanggil pada waktu desain dan pada waktu proses. Komponen harus membuat koneksi ke manajer koneksi yang ditentukan oleh koneksi run-time, dan kemudian, ke sumber data eksternal. Setelah dibuat, komponen harus menyimpan koneksi secara internal dan melepaskannya saat ReleaseConnections dipanggil. Pengembang mengambil alih metode ini, dan merilis koneksi yang dibuat oleh komponen selama AcquireConnections. Kedua metode ini, ReleaseConnections dan AcquireConnections, dipanggil pada waktu desain dan pada durasi.

Contoh kode berikut menunjukkan komponen yang tersambung ke koneksi ADO.NET dalam AcquireConnections metode , lalu menutup koneksi di ReleaseConnections.

using Microsoft.SqlServer.Dts.Runtime.Wrapper;  
  
private SqlConnection sqlConnection;  
  
public override void AcquireConnections(object transaction)  
{  
    if (ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager != null)  
    {  
        ConnectionManager cm = Microsoft.SqlServer.Dts.Runtime.DtsConvert.GetWrapper(ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager);  
        ConnectionManagerAdoNet cmado = cm.InnerObject as ConnectionManagerAdoNet;  
  
        if (cmado == null)  
            throw new Exception("The ConnectionManager " + cm.Name + " is not an ADO.NET connection.");  
  
        sqlConnection = cmado.AcquireConnection(transaction) as SqlConnection;  
        sqlConnection.Open();  
    }  
}  
  
public override void ReleaseConnections()  
{  
    if (sqlConnection != null && sqlConnection.State != ConnectionState.Closed)  
        sqlConnection.Close();  
}  
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper  
  
Private sqlConnection As SqlConnection  
  
Public Overrides Sub AcquireConnections(ByVal transaction As Object)  
  
    If IsNothing(ComponentMetaData.RuntimeConnectionCollection(0).ConnectionManager) = False Then  
  
        Dim cm As ConnectionManager = DtsConvert.GetWrapper(ComponentMetaData.RuntimeConnectionCollection(0).ConnectionManager)  
        Dim cmado As ConnectionManagerAdoNet = CType(cm.InnerObject,ConnectionManagerAdoNet)  
  
        If IsNothing(cmado) Then  
            Throw New Exception("The ConnectionManager " + cm.Name + " is not an ADO.NET connection.")  
        End If  
  
        sqlConnection = CType(cmado.AcquireConnection(transaction), SqlConnection)  
        sqlConnection.Open()  
  
    End If  
End Sub  
  
Public Overrides Sub ReleaseConnections()  
  
    If IsNothing(sqlConnection) = False And sqlConnection.State <> ConnectionState.Closed Then  
        sqlConnection.Close()  
    End If  
  
End Sub  

Memvalidasi Komponen

Pengembang komponen tujuan harus melakukan validasi seperti yang dijelaskan dalam Validasi Komponen. Selain itu, mereka harus memverifikasi bahwa properti jenis data kolom yang ditentukan dalam kumpulan kolom input komponen cocok dengan kolom di sumber data eksternal. Terkadang, memverifikasi kolom input terhadap sumber data eksternal bisa tidak mungkin atau tidak diinginkan, seperti ketika komponen atau SSIS Designer dalam keadaan terputus, atau ketika perjalanan pulang pergi ke server tidak dapat diterima. Dalam situasi ini, kolom dalam koleksi kolom input masih dapat divalidasi dengan menggunakan ExternalMetadataColumnCollection objek input.

Koleksi ini ada pada objek input dan output dan harus diisi oleh pengembang komponen dari kolom di sumber data eksternal. Koleksi ini dapat digunakan untuk memvalidasi kolom input saat Desainer SSIS offline, ketika komponen terputus, atau ketika ValidateExternalMetadata properti salah.

Kode sampel berikut menambahkan kolom metadata eksternal berdasarkan kolom input yang sudah ada.

private void AddExternalMetaDataColumn(IDTSInput100 input,IDTSInputColumn100 inputColumn)  
{  
    // Set the properties of the external metadata column.  
    IDTSExternalMetadataColumn100 externalColumn = input.ExternalMetadataColumnCollection.New();  
    externalColumn.Name = inputColumn.Name;  
    externalColumn.Precision = inputColumn.Precision;  
    externalColumn.Length = inputColumn.Length;  
    externalColumn.DataType = inputColumn.DataType;  
    externalColumn.Scale = inputColumn.Scale;  
  
    // Map the external column to the input column.  
    inputColumn.ExternalMetadataColumnID = externalColumn.ID;  
}  
Private Sub AddExternalMetaDataColumn(ByVal input As IDTSInput100, ByVal inputColumn As IDTSInputColumn100)  
  
    ' Set the properties of the external metadata column.  
    Dim externalColumn As IDTSExternalMetadataColumn100 = input.ExternalMetadataColumnCollection.New()  
    externalColumn.Name = inputColumn.Name  
    externalColumn.Precision = inputColumn.Precision  
    externalColumn.Length = inputColumn.Length  
    externalColumn.DataType = inputColumn.DataType  
    externalColumn.Scale = inputColumn.Scale  
  
    ' Map the external column to the input column.  
    inputColumn.ExternalMetadataColumnID = externalColumn.ID  
  
End Sub  

Durasi

Selama eksekusi, komponen tujuan menerima panggilan ke ProcessInput metode setiap kali penuh PipelineBuffer tersedia dari komponen upstream. Metode ini dipanggil berulang kali sampai tidak ada lagi buffer yang tersedia dan EndOfRowsetproperti benar. Selama metode ini, komponen tujuan membaca kolom dan baris di buffer, dan menambahkannya ke sumber data eksternal.

Menemukan Kolom di Buffer

Buffer input untuk komponen berisi semua kolom yang ditentukan dalam kumpulan kolom output komponen upstream dari komponen dalam aliran data. Misalnya, jika komponen sumber menyediakan tiga kolom dalam outputnya, dan komponen berikutnya menambahkan kolom output tambahan, buffer yang disediakan ke komponen tujuan berisi empat kolom, bahkan jika komponen tujuan hanya akan menulis dua kolom.

Urutan kolom dalam buffer input tidak ditentukan oleh indeks kolom dalam kumpulan kolom input komponen tujuan. Kolom dapat dengan andal terletak di baris buffer hanya dengan menggunakan FindColumnByLineageID metode .BufferManager Metode ini menemukan kolom yang memiliki ID silsilah yang ditentukan di buffer yang ditentukan, dan mengembalikan lokasinya dalam baris. Indeks kolom input biasanya terletak selama PreExecute metode , dan di-cache oleh pengembang untuk digunakan nanti selama ProcessInput.

Contoh kode berikut menemukan lokasi kolom input di buffer selama PreExecute dan menyimpannya dalam array. Array kemudian digunakan selama ProcessInput untuk membaca nilai kolom di buffer.

int[] cols;  
  
public override void PreExecute()  
{  
    IDTSInput100 input = ComponentMetaData.InputCollection[0];  
  
    cols = new int[input.InputColumnCollection.Count];  
  
    for (int x = 0; x < input.InputColumnCollection.Count; x++)  
    {  
        cols[x] = BufferManager.FindColumnByLineageID(input.Buffer, input.InputColumnCollection[x].LineageID);  
    }  
}  
Private cols As Integer()  
  
Public Overrides Sub PreExecute()  
  
    Dim input As IDTSInput100 = ComponentMetaData.InputCollection(0)  
  
    ReDim cols(input.InputColumnCollection.Count)  
  
    For x As Integer = 0 To input.InputColumnCollection.Count  
  
        cols(x) = BufferManager.FindColumnByLineageID(input.Buffer, input.InputColumnCollection(x).LineageID)  
    Next x  
  
End Sub  

Memproses Baris

Setelah kolom input berada di buffer, kolom tersebut dapat dibaca dan ditulis ke sumber data eksternal.

Saat komponen tujuan menulis baris ke sumber data eksternal, Anda mungkin ingin memperbarui penghitung kinerja "Baris dibaca" atau "byte BLOB dibaca" dengan memanggil IncrementPipelinePerfCounter metode . Untuk informasi selengkapnya, lihat Penghitung Performa.

Contoh berikut menunjukkan komponen yang membaca baris dari buffer yang disediakan di ProcessInput. Indeks kolom dalam buffer terletak selama PreExecute dalam contoh kode sebelumnya.

public override void ProcessInput(int inputID, PipelineBuffer buffer)  
{  
        while (buffer.NextRow())  
        {  
            foreach (int col in cols)  
            {  
                if (!buffer.IsNull(col))  
                {  
                    //  TODO: Read the column data.  
                }  
            }  
        }  
}  
Public Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer)  
  
        While (buffer.NextRow())  
  
            For Each col As Integer In cols  
  
                If buffer.IsNull(col) = False Then  
  
                    '  TODO: Read the column data.  
                End If  
  
            Next col  
        End While  
End Sub  

Sampel

Contoh berikut menunjukkan komponen tujuan sederhana yang menggunakan manajer koneksi File untuk menyimpan data biner dari aliran data ke dalam file. Sampel ini tidak menunjukkan semua metode dan fungsionalitas yang dibahas dalam topik ini. Ini menunjukkan metode penting yang harus diambil alih oleh setiap komponen tujuan kustom, tetapi tidak berisi kode untuk validasi waktu desain.

using System;  
using System.IO;  
using Microsoft.SqlServer.Dts.Pipeline;  
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;  
  
namespace BlobDst  
{  
  [DtsPipelineComponent(DisplayName = "BLOB Extractor Destination", Description = "Writes values of BLOB columns to files")]  
  public class BlobDst : PipelineComponent  
  {  
    string m_DestDir;  
    int m_FileNameColumnIndex = -1;  
    int m_BlobColumnIndex = -1;  
  
    public override void ProvideComponentProperties()  
    {  
      IDTSInput100 input = ComponentMetaData.InputCollection.New();  
      input.Name = "BLOB Extractor Destination Input";  
      input.HasSideEffects = true;  
  
      IDTSRuntimeConnection100 conn = ComponentMetaData.RuntimeConnectionCollection.New();  
      conn.Name = "FileConnection";  
    }  
  
    public override void AcquireConnections(object transaction)  
    {  
      IDTSRuntimeConnection100 conn = ComponentMetaData.RuntimeConnectionCollection[0];  
      m_DestDir = (string)conn.ConnectionManager.AcquireConnection(null);  
  
      if (m_DestDir.Length > 0 && m_DestDir[m_DestDir.Length - 1] != '\\')  
        m_DestDir += "\\";  
    }  
  
    public override IDTSInputColumn100 SetUsageType(int inputID, IDTSVirtualInput100 virtualInput, int lineageID, DTSUsageType usageType)  
    {  
      IDTSInputColumn100 inputColumn = base.SetUsageType(inputID, virtualInput, lineageID, usageType);  
      IDTSCustomProperty100 custProp;  
  
      custProp = inputColumn.CustomPropertyCollection.New();  
      custProp.Name = "IsFileName";  
      custProp.Value = (object)false;  
  
      custProp = inputColumn.CustomPropertyCollection.New();  
      custProp.Name = "IsBLOB";  
      custProp.Value = (object)false;  
  
      return inputColumn;  
    }  
  
    public override void PreExecute()  
    {  
      IDTSInput100 input = ComponentMetaData.InputCollection[0];  
      IDTSInputColumnCollection100 inputColumns = input.InputColumnCollection;  
      IDTSCustomProperty100 custProp;  
  
      foreach (IDTSInputColumn100 column in inputColumns)  
      {  
        custProp = column.CustomPropertyCollection["IsFileName"];  
        if ((bool)custProp.Value == true)  
        {  
          m_FileNameColumnIndex = (int)BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID);  
        }  
  
        custProp = column.CustomPropertyCollection["IsBLOB"];  
        if ((bool)custProp.Value == true)  
        {  
          m_BlobColumnIndex = (int)BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID);  
        }  
      }  
    }  
  
    public override void ProcessInput(int inputID, PipelineBuffer buffer)  
    {  
      while (buffer.NextRow())  
      {  
        string strFileName = buffer.GetString(m_FileNameColumnIndex);  
        int blobLength = (int)buffer.GetBlobLength(m_BlobColumnIndex);  
        byte[] blobData = buffer.GetBlobData(m_BlobColumnIndex, 0, blobLength);  
  
        strFileName = TranslateFileName(strFileName);  
  
        // Make sure directory exists before creating file.  
        FileInfo fi = new FileInfo(strFileName);  
        if (!fi.Directory.Exists)  
          fi.Directory.Create();  
  
        // Write the data to the file.  
        FileStream fs = new FileStream(strFileName, FileMode.Create, FileAccess.Write, FileShare.None);  
        fs.Write(blobData, 0, blobLength);  
        fs.Close();  
      }  
    }  
  
    private string TranslateFileName(string fileName)  
    {  
      if (fileName.Length > 2 && fileName[1] == ':')  
        return m_DestDir + fileName.Substring(3, fileName.Length - 3);  
      else  
        return m_DestDir + fileName;  
    }  
  }  
}  
Imports System   
Imports System.IO   
Imports Microsoft.SqlServer.Dts.Pipeline   
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper   
Namespace BlobDst   
  
 <DtsPipelineComponent(DisplayName="BLOB Extractor Destination", Description="Writes values of BLOB columns to files")> _   
 Public Class BlobDst   
 Inherits PipelineComponent   
   Private m_DestDir As String   
   Private m_FileNameColumnIndex As Integer = -1   
   Private m_BlobColumnIndex As Integer = -1   
  
   Public  Overrides Sub ProvideComponentProperties()   
     Dim input As IDTSInput100 = ComponentMetaData.InputCollection.New   
     input.Name = "BLOB Extractor Destination Input"   
     input.HasSideEffects = True   
     Dim conn As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection.New   
     conn.Name = "FileConnection"   
   End Sub   
  
   Public  Overrides Sub AcquireConnections(ByVal transaction As Object)   
     Dim conn As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection(0)   
     m_DestDir = CType(conn.ConnectionManager.AcquireConnection(Nothing), String)   
     If m_DestDir.Length > 0 AndAlso Not (m_DestDir(m_DestDir.Length - 1) = "\"C) Then   
       m_DestDir += "\"   
     End If   
   End Sub   
  
   Public  Overrides Function SetUsageType(ByVal inputID As Integer, ByVal virtualInput As IDTSVirtualInput100, ByVal lineageID As Integer, ByVal usageType As DTSUsageType) As IDTSInputColumn100   
     Dim inputColumn As IDTSInputColumn100 = MyBase.SetUsageType(inputID, virtualInput, lineageID, usageType)   
     Dim custProp As IDTSCustomProperty100   
     custProp = inputColumn.CustomPropertyCollection.New   
     custProp.Name = "IsFileName"   
     custProp.Value = CType(False, Object)   
     custProp = inputColumn.CustomPropertyCollection.New   
     custProp.Name = "IsBLOB"   
     custProp.Value = CType(False, Object)   
     Return inputColumn   
   End Function   
  
   Public  Overrides Sub PreExecute()   
     Dim input As IDTSInput100 = ComponentMetaData.InputCollection(0)   
     Dim inputColumns As IDTSInputColumnCollection100 = input.InputColumnCollection   
     Dim custProp As IDTSCustomProperty100   
     For Each column As IDTSInputColumn100 In inputColumns   
       custProp = column.CustomPropertyCollection("IsFileName")   
       If CType(custProp.Value, Boolean) = True Then   
         m_FileNameColumnIndex = CType(BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID), Integer)   
       End If   
       custProp = column.CustomPropertyCollection("IsBLOB")   
       If CType(custProp.Value, Boolean) = True Then   
         m_BlobColumnIndex = CType(BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID), Integer)   
       End If   
     Next   
   End Sub   
  
   Public  Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer)   
     While buffer.NextRow   
       Dim strFileName As String = buffer.GetString(m_FileNameColumnIndex)   
       Dim blobLength As Integer = CType(buffer.GetBlobLength(m_BlobColumnIndex), Integer)   
       Dim blobData As Byte() = buffer.GetBlobData(m_BlobColumnIndex, 0, blobLength)   
       strFileName = TranslateFileName(strFileName)   
       Dim fi As FileInfo = New FileInfo(strFileName)   
       ' Make sure directory exists before creating file.  
       If Not fi.Directory.Exists Then   
         fi.Directory.Create   
       End If   
       ' Write the data to the file.  
       Dim fs As FileStream = New FileStream(strFileName, FileMode.Create, FileAccess.Write, FileShare.None)   
       fs.Write(blobData, 0, blobLength)   
       fs.Close   
     End While   
   End Sub   
  
   Private Function TranslateFileName(ByVal fileName As String) As String   
     If fileName.Length > 2 AndAlso fileName(1) = ":"C Then   
       Return m_DestDir + fileName.Substring(3, fileName.Length - 3)   
     Else   
       Return m_DestDir + fileName   
     End If   
   End Function   
 End Class   
End Namespace  

Lihat juga

Mengembangkan Komponen Sumber Kustom
Membuat Tujuan dengan Komponen Skrip