Opracowywanie niestandardowych docelowego składnika
Microsoft SQL Server Integration Services gives developers the ability to write custom destination components that can connect to and store data in any custom data source.Składniki niestandardowe obiekt obiekt docelowy są przydatne, gdy trzeba połączyć się ze źródłami danych, które nie jest dostępny przy użyciu jednego z istniejących składników źródło, z Integration Services.
Składniki docelowe mają jeden lub więcej wejść i wyjść zero.W projekcie czas, ich tworzenie i konfigurowanie połączeń i odczytać kolumna metadane z zewnętrznego źródło danych.W czasie wykonywania ich połączyć ich zewnętrznego źródło danych i dodać wiersze, które zostały odebrane z składniki wlocie przepływ danych do zewnętrznego źródło danych.Jeśli zewnętrzne źródło danych przed jej wykonaniem składnika, obiekt docelowy składnika muszą również zapewnić, że typów danych kolumn, które odbiera składnika zgodne typy danych kolumn w źródle danych zewnętrznych.
W tej sekcji omówiono szczegóły dotyczące opracowania obiekt docelowy składników oraz przykłady kodu wyjaśnienie koncepcji.Przykładowe obiekt docelowy składników, zobacz Integration Services próbki na witrynie Codeplex.Dla ogólny przegląd składnik przepływ danych rozwoju, zobacz Opracowywanie danych niestandardowy składnik przepływu.
Czas projektowania
Implementowanie projektu -czas funkcje składnika docelowego pociąga za sobą określenie połączenie z zewnętrznym źródłem danych i sprawdzania, czy składnik został poprawnie skonfigurowany.Z definicji obiekt docelowy składnik ma jeden wejściowe i wyjściowe prawdopodobnie jeden błąd.
Tworzenie składnika
Składniki docelowe połączyć się z zewnętrznych źródeł danych za pomocą ConnectionManager obiektów zdefiniowanych w pakiet.obiekt docelowy Składnika wskazuje jej wymóg dla menedżer połączeń do SSIS Projektant i użytkowników składnika, dodając element RuntimeConnectionCollection kolekcja ComponentMetaData.Ta kolekcja służy dwóm celom: Po pierwsze, anonsuje potrzebę menedżer połączeń do SSIS Projektant; następnie, po wybranym przez użytkownika lub utworzone połączenie, Menedżer, posiada odwołanie do menedżer połączeń w pakiet , jest używany przez składnik.Po IDTSRuntimeConnection100 jest dodawane do kolekcja, Zaawansowanego edytora Wyświetla Właściwości połączenia karcie monit, aby wybrać lub utworzyć połączenie w pakiet do użytku przez składnik.
Poniższy przykład kodu pokazuje implementację ProvideComponentProperties dodaje dane wejściowe, a następnie dodaje IDTSRuntimeConnection100 obiektu, aby RuntimeConnectionCollection.
using System;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime;
namespace Microsoft.Samples.SqlServer.Dts
{
[DtsPipelineComponent(DisplayName = "Destination Component",ComponentType =ComponentType.DestinationAdapter)]
public class DestinationComponent : PipelineComponent
{
public override void ProvideComponentProperties()
{
// Reset the component.
base.RemoveAllInputsOutputsAndCustomProperties();
ComponentMetaData.RuntimeConnectionCollection.RemoveAll();
IDTSInput100 input = ComponentMetaData.InputCollection.New();
input.Name = "Input";
IDTSRuntimeConnection100 connection = ComponentMetaData.RuntimeConnectionCollection.New();
connection.Name = "ADO.net";
}
}
}
Imports System
Imports System.Data
Imports System.Data.SqlClient
Imports Microsoft.SqlServer.Dts.Pipeline
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper
Imports Microsoft.SqlServer.Dts.Runtime
Namespace Microsoft.Samples.SqlServer.Dts
<DtsPipelineComponent(DisplayName:="Destination Component", ComponentType:=ComponentType.DestinationAdapter)> _
Public Class DestinationComponent
Inherits PipelineComponent
Public Overrides Sub ProvideComponentProperties()
' Reset the component.
Me.RemoveAllInputsOutputsAndCustomProperties()
ComponentMetaData.RuntimeConnectionCollection.RemoveAll()
Dim input As IDTSInput100 = ComponentMetaData.InputCollection.New()
input.Name = "Input"
Dim connection As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection.New()
connection.Name = "ADO.net"
End Sub
End Class
End Namespace
Podłączanie do zewnętrznego źródła danych
Po dodaniu połączenia do RuntimeConnectionCollection, można zastąpić AcquireConnections metoda, aby ustanowić połączenie źródło danych zewnętrznych.Ta metoda jest wywoływana w projekt czas i uruchom czas.Składnik musi ustanowić połączenia menedżer połączeń, określony przez uruchomienie -czas połączenia, a następnie do zewnętrznego źródło danych.Po ustanowieniu składnik powinien wewnętrznie buforować połączenia, a następnie zwolnij go po ReleaseConnections jest wywoływana.Deweloperzy zastąpić tę metoda i Zwolnij połączenie ustanowione przez składnik podczas AcquireConnections.Obie te metody ReleaseConnections i AcquireConnections, są nazywane w projekt czas i uruchom czas.
Poniższy przykład kodu pokazuje składnik, który łączy się z obiektów ADO.Połączenie SIECIOWE w AcquireConnections metoda, a następnie zamyka połączenie ReleaseConnections.
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
private SqlConnection sqlConnection;
public override void AcquireConnections(object transaction)
{
if (ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager != null)
{
ConnectionManager cm = Microsoft.SqlServer.Dts.Runtime.DtsConvert.GetWrapper(ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager);
ConnectionManagerAdoNet cmado = cm.InnerObject as ConnectionManagerAdoNet;
if (cmado == null)
throw new Exception("The ConnectionManager " + cm.Name + " is not an ADO.NET connection.");
sqlConnection = cmado.AcquireConnection(transaction) as SqlConnection;
sqlConnection.Open();
}
}
public override void ReleaseConnections()
{
if (sqlConnection != null && sqlConnection.State != ConnectionState.Closed)
sqlConnection.Close();
}
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper
Private sqlConnection As SqlConnection
Public Overrides Sub AcquireConnections(ByVal transaction As Object)
If IsNothing(ComponentMetaData.RuntimeConnectionCollection(0).ConnectionManager) = False Then
Dim cm As ConnectionManager = DtsConvert.GetWrapper(ComponentMetaData.RuntimeConnectionCollection(0).ConnectionManager)
Dim cmado As ConnectionManagerAdoNet = CType(cm.InnerObject,ConnectionManagerAdoNet)
If IsNothing(cmado) Then
Throw New Exception("The ConnectionManager " + cm.Name + " is not an ADO.NET connection.")
End If
sqlConnection = CType(cmado.AcquireConnection(transaction), SqlConnection)
sqlConnection.Open()
End If
End Sub
Public Overrides Sub ReleaseConnections()
If IsNothing(sqlConnection) = False And sqlConnection.State <> ConnectionState.Closed Then
sqlConnection.Close()
End If
End Sub
Sprawdzanie poprawności składnika
Deweloperzy składnika docelowego należy przeprowadzić sprawdzania poprawności, zgodnie z opisem w Poprawności składnika.Ponadto powinny one sprawdzić, ten typ danych właściwości kolumnawejściowych zdefiniowanych w składniku s kolumna dopasowanie kolekcji kolumnas u źródło danych zewnętrznych.W czasie sprawdzania wprowadzania kolumn wobec zewnętrznego źródło danych może być niemożliwe lub niepożądanych, gdy składnik lub SSIS Projektant jest rozłączony lub przesłania do serwera nie są akceptowane.W takich sytuacjach kolumna kolekcja kolumn wprowadzania nadal mogą być sprawdzone za pomocą ExternalMetadataColumnCollection obiektu wejściowego.
Ta kolekcja istnieje obiektów i wyjścia i musi zostać wypełnione przez dewelopera składnika z kolumn na danych zewnętrznych źródło.To kolekcja może być używana do sprawdzania poprawności kolumn danych wejściowych po SSIS Designer jest w trybie offline, gdy składnik jest odłączony lub ValidateExternalMetadata właściwość jest false.
Następujący kod dodaje kolumna metadane zewnętrznych na podstawie istniejącej kolumna danych wejściowych.
private void AddExternalMetaDataColumn(IDTSInput100 input,IDTSInputColumn100 inputColumn)
{
// Set the properties of the external metadata column.
IDTSExternalMetadataColumn100 externalColumn = input.ExternalMetadataColumnCollection.New();
externalColumn.Name = inputColumn.Name;
externalColumn.Precision = inputColumn.Precision;
externalColumn.Length = inputColumn.Length;
externalColumn.DataType = inputColumn.DataType;
externalColumn.Scale = inputColumn.Scale;
// Map the external column to the input column.
inputColumn.ExternalMetadataColumnID = externalColumn.ID;
}
Private Sub AddExternalMetaDataColumn(ByVal input As IDTSInput100, ByVal inputColumn As IDTSInputColumn100)
' Set the properties of the external metadata column.
Dim externalColumn As IDTSExternalMetadataColumn100 = input.ExternalMetadataColumnCollection.New()
externalColumn.Name = inputColumn.Name
externalColumn.Precision = inputColumn.Precision
externalColumn.Length = inputColumn.Length
externalColumn.DataType = inputColumn.DataType
externalColumn.Scale = inputColumn.Scale
' Map the external column to the input column.
inputColumn.ExternalMetadataColumnID = externalColumn.ID
End Sub
Czas jednostkowy
W czasie wykonywania składnika docelowego odbiera wywołanie ProcessInput metodę czas pełne PipelineBuffer jest dostępny od składnika nadrzędnego.Metoda ta jest wywoływana wielokrotnie, dopóki nie są dostępne żadne bufory więcej i EndOfRowset właściwość jest true.Podczas tej metoda obiekt docelowy składniki odczytać kolumn i wierszy w buforze i dodać je do zewnętrznego źródło danych.
Lokalizowanie kolumn w buforze
Bufor wejściowy składnika zawiera wszystkie kolumna zdefiniowane w kolekcji kolumna Wyjście składników wlocie ze składnika w przepływ danych.Na przykład, jeśli składnik źródło zawiera trzy kolumna w swojej produkcji i dodaje następny składnik kolumna wyprowadzenia dodatkowe, bufor dostarczane obiekt docelowy składnik zawiera cztery kolumna, nawet jeśli obiekt docelowy składnika będzie zapisywać tylko dwie kolumna.
Kolejność kolumn w buforze wejściowym nie jest zdefiniowany przez indeks kolumna kolekcja wprowadzania kolumna obiekt docelowy składnika.Kolumny można niezawodnie umieszczone w wierszu bufora tylko przy użyciu FindColumnByLineageID metoda BufferManager.Ta metoda lokalizuje kolumna, która ma identyfikator rodowodu określony w określony bufor i zwraca jego lokalizacji w wierszu.Indeksy wprowadzania kolumn zwykle znajdują się podczas PreExecute metoda i buforowane przez deweloperów do wykorzystania później podczas ProcessInput.
Poniższy przykład kodu znajdzie lokalizację wprowadzania kolumn buforu podczas PreExecute i przechowuje je w tablicy.Tablica jest następnie użyty podczas ProcessInput do odczytania wartości kolumny w buforze.
int[] cols;
public override void PreExecute()
{
IDTSInput100 input = ComponentMetaData.InputCollection[0];
cols = new int[input.InputColumnCollection.Count];
for (int x = 0; x < input.InputColumnCollection.Count; x++)
{
cols[x] = BufferManager.FindColumnByLineageID(input.Buffer, input.InputColumnCollection[x].LineageID);
}
}
Private cols As Integer()
Public Overrides Sub PreExecute()
Dim input As IDTSInput100 = ComponentMetaData.InputCollection(0)
ReDim cols(input.InputColumnCollection.Count)
For x As Integer = 0 To input.InputColumnCollection.Count
cols(x) = BufferManager.FindColumnByLineageID(input.Buffer, input.InputColumnCollection(x).LineageID)
Next x
End Sub
Przetwarzanie wierszy
Po wprowadzania kolumn znajdowała się w buforze, może być odczytywana i zapisywana do zewnętrznego źródło danych.
Podczas obiekt docelowy składnik zapisuje wierszy zewnętrznego źródło danych, należy zaktualizować "odczytu wierszy" "obiekt BLOB bajtów Odczyt" liczniki wydajności przez wywołanie IncrementPipelinePerfCounter metoda.Aby uzyskać więcej informacji, zobacz Monitorowanie wydajności aparatu przepływu danych.
W poniższym przykładzie przedstawiono składnik, który odczytuje wiersze z buforu w ProcessInput.Zlokalizowano indeksów kolumn w buforze podczas PreExecute w poprzednim przykładzie kodu.
public override void ProcessInput(int inputID, PipelineBuffer buffer)
{
while (buffer.NextRow())
{
foreach (int col in cols)
{
if (!buffer.IsNull(col))
{
// TODO: Read the column data.
}
}
}
}
Public Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer)
While (buffer.NextRow())
For Each col As Integer In cols
If buffer.IsNull(col) = False Then
' TODO: Read the column data.
End If
Next col
End While
End Sub
Przykład
Poniżej pokazano prostą obiekt docelowy składnika, który używa menedżer połączeń pliku do zapisywania danych binarnych z przepływ danych w plikach.W tym przykładzie nie wykazują metod i funkcje omówione w tym temacie.W ten sposób pokazujesz ważnych metod należy zastąpić każdego składnika niestandardowego docelowego, ale nie zawiera kodu dla projektu -czas sprawdzania poprawności.Dla bardziej kompletny próbki obiekt docelowy składników, zobacz Readme_DatasetDestination Component Sample.
using System;
using System.IO;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
namespace BlobDst
{
[DtsPipelineComponent(DisplayName = "BLOB Extractor Destination", Description = "Writes values of BLOB columns to files")]
public class BlobDst : PipelineComponent
{
string m_DestDir;
int m_FileNameColumnIndex = -1;
int m_BlobColumnIndex = -1;
public override void ProvideComponentProperties()
{
IDTSInput100 input = ComponentMetaData.InputCollection.New();
input.Name = "BLOB Extractor Destination Input";
input.HasSideEffects = true;
IDTSRuntimeConnection100 conn = ComponentMetaData.RuntimeConnectionCollection.New();
conn.Name = "FileConnection";
}
public override void AcquireConnections(object transaction)
{
IDTSRuntimeConnection100 conn = ComponentMetaData.RuntimeConnectionCollection[0];
m_DestDir = (string)conn.ConnectionManager.AcquireConnection(null);
if (m_DestDir.Length > 0 && m_DestDir[m_DestDir.Length - 1] != '\\')
m_DestDir += "\\";
}
public override IDTSInputColumn100 SetUsageType(int inputID, IDTSVirtualInput100 virtualInput, int lineageID, DTSUsageType usageType)
{
IDTSInputColumn100 inputColumn = base.SetUsageType(inputID, virtualInput, lineageID, usageType);
IDTSCustomProperty100 custProp;
custProp = inputColumn.CustomPropertyCollection.New();
custProp.Name = "IsFileName";
custProp.Value = (object)false;
custProp = inputColumn.CustomPropertyCollection.New();
custProp.Name = "IsBLOB";
custProp.Value = (object)false;
return inputColumn;
}
public override void PreExecute()
{
IDTSInput100 input = ComponentMetaData.InputCollection[0];
IDTSInputColumnCollection100 inputColumns = input.InputColumnCollection;
IDTSCustomProperty100 custProp;
foreach (IDTSInputColumn100 column in inputColumns)
{
custProp = column.CustomPropertyCollection["IsFileName"];
if ((bool)custProp.Value == true)
{
m_FileNameColumnIndex = (int)BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID);
}
custProp = column.CustomPropertyCollection["IsBLOB"];
if ((bool)custProp.Value == true)
{
m_BlobColumnIndex = (int)BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID);
}
}
}
public override void ProcessInput(int inputID, PipelineBuffer buffer)
{
while (buffer.NextRow())
{
string strFileName = buffer.GetString(m_FileNameColumnIndex);
int blobLength = (int)buffer.GetBlobLength(m_BlobColumnIndex);
byte[] blobData = buffer.GetBlobData(m_BlobColumnIndex, 0, blobLength);
strFileName = TranslateFileName(strFileName);
// Make sure directory exists before creating file.
FileInfo fi = new FileInfo(strFileName);
if (!fi.Directory.Exists)
fi.Directory.Create();
// Write the data to the file.
FileStream fs = new FileStream(strFileName, FileMode.Create, FileAccess.Write, FileShare.None);
fs.Write(blobData, 0, blobLength);
fs.Close();
}
}
private string TranslateFileName(string fileName)
{
if (fileName.Length > 2 && fileName[1] == ':')
return m_DestDir + fileName.Substring(3, fileName.Length - 3);
else
return m_DestDir + fileName;
}
}
}
Imports System
Imports System.IO
Imports Microsoft.SqlServer.Dts.Pipeline
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper
Namespace BlobDst
<DtsPipelineComponent(DisplayName="BLOB Extractor Destination", Description="Writes values of BLOB columns to files")> _
Public Class BlobDst
Inherits PipelineComponent
Private m_DestDir As String
Private m_FileNameColumnIndex As Integer = -1
Private m_BlobColumnIndex As Integer = -1
Public Overrides Sub ProvideComponentProperties()
Dim input As IDTSInput100 = ComponentMetaData.InputCollection.New
input.Name = "BLOB Extractor Destination Input"
input.HasSideEffects = True
Dim conn As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection.New
conn.Name = "FileConnection"
End Sub
Public Overrides Sub AcquireConnections(ByVal transaction As Object)
Dim conn As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection(0)
m_DestDir = CType(conn.ConnectionManager.AcquireConnection(Nothing), String)
If m_DestDir.Length > 0 AndAlso Not (m_DestDir(m_DestDir.Length - 1) = "\"C) Then
m_DestDir += "\"
End If
End Sub
Public Overrides Function SetUsageType(ByVal inputID As Integer, ByVal virtualInput As IDTSVirtualInput100, ByVal lineageID As Integer, ByVal usageType As DTSUsageType) As IDTSInputColumn100
Dim inputColumn As IDTSInputColumn100 = MyBase.SetUsageType(inputID, virtualInput, lineageID, usageType)
Dim custProp As IDTSCustomProperty100
custProp = inputColumn.CustomPropertyCollection.New
custProp.Name = "IsFileName"
custProp.Value = CType(False, Object)
custProp = inputColumn.CustomPropertyCollection.New
custProp.Name = "IsBLOB"
custProp.Value = CType(False, Object)
Return inputColumn
End Function
Public Overrides Sub PreExecute()
Dim input As IDTSInput100 = ComponentMetaData.InputCollection(0)
Dim inputColumns As IDTSInputColumnCollection100 = input.InputColumnCollection
Dim custProp As IDTSCustomProperty100
For Each column As IDTSInputColumn100 In inputColumns
custProp = column.CustomPropertyCollection("IsFileName")
If CType(custProp.Value, Boolean) = True Then
m_FileNameColumnIndex = CType(BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID), Integer)
End If
custProp = column.CustomPropertyCollection("IsBLOB")
If CType(custProp.Value, Boolean) = True Then
m_BlobColumnIndex = CType(BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID), Integer)
End If
Next
End Sub
Public Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer)
While buffer.NextRow
Dim strFileName As String = buffer.GetString(m_FileNameColumnIndex)
Dim blobLength As Integer = CType(buffer.GetBlobLength(m_BlobColumnIndex), Integer)
Dim blobData As Byte() = buffer.GetBlobData(m_BlobColumnIndex, 0, blobLength)
strFileName = TranslateFileName(strFileName)
Dim fi As FileInfo = New FileInfo(strFileName)
' Make sure directory exists before creating file.
If Not fi.Directory.Exists Then
fi.Directory.Create
End If
' Write the data to the file.
Dim fs As FileStream = New FileStream(strFileName, FileMode.Create, FileAccess.Write, FileShare.None)
fs.Write(blobData, 0, blobLength)
fs.Close
End While
End Sub
Private Function TranslateFileName(ByVal fileName As String) As String
If fileName.Length > 2 AndAlso fileName(1) = ":"C Then
Return m_DestDir + fileName.Substring(3, fileName.Length - 3)
Else
Return m_DestDir + fileName
End If
End Function
End Class
End Namespace
|