Rozwijanie niestandardowe obiekt obiekt docelowy składnika
Microsoft SQL Server Integration Services Programiści zapis składniki niestandardowe obiekt obiekt docelowy, które można połączyć się i dane są przechowywane w dowolnego urządzenie źródłowe danych niestandardowych.Składniki niestandardowe obiekt obiekt docelowy są przydatne w przypadku, gdy trzeba połączyć się ze źródłami danych, które są niedostępne przy użyciu jednej z istniejących składników urządzenie źródłowe dołączone Integration Services.
obiekt docelowy składniki mają jednego lub kilku wejść i wyjść zero.W projekcie czas, ich tworzenia i konfigurowania połączeń i odczytać metadane kolumna z zewnętrznego urządzenie źródłowe danych.Podczas wykonywania ich łączenia się z ich zewnętrznego urządzenie źródłowe danych i Dodaj wiersze, które zostały odebrane z składniki od przepływ danych do zewnętrznego urządzenie źródłowe danych.Jeśli zewnętrzne dane urządzenie źródłowe istnieje przed wykonaniem składnika, składnika docelowego musi również zapewnić, że typy danych kolumn, który odbiera składnika zgodne typy danych kolumn na danych zewnętrznych urządzenie źródłowe.
W tej części opisano szczegóły dotyczące opracowania obiekt docelowy składniki oraz przykłady kodu wyjaśnienie ważne pojęcia.Przykładowy obiekt docelowy składnika, zobacz temat DatasetDestination Component Sample. Aby zapoznać się z ogólnym omówieniem rozwoju składnika przepływ danych zobacz Rozwijanie niestandardowy składnik przepływ danych.
Czas projektowania
Implementowania funkcji czas projektowania składnika docelowego pociąga za sobą Określanie połączenia danych zewnętrznych urządzenie źródłowe i sprawdzenie, czy składnik został poprawnie skonfigurowany.Z definicji składnika obiekt docelowy ma jedno wejście i ewentualnie jeden błąd w danych wyjściowych.
Tworzenie składnika
obiekt docelowy składników nawiązywania połączenia z zewnętrznymi źródłami danych przy użyciu ConnectionManager obiekty zdefiniowane w pakiecie. obiekt docelowy Składnika oznacza jej zapotrzebowania dla menedżer połączeń, który ma być SSIS Projektant, użytkownikom składnika przez dodanie elementu RuntimeConnectionCollection() Kolekcja ComponentMetaData(). Ta kolekcja obsługuje dwóch celach: Po pierwsze to anonsuje potrzebę menedżer połączeń, który ma być SSIS Projektant; następnie, po wybranym przez użytkownika lub utworzone połączenie, Menedżer, posiada odwołanie do menedżer połączeń do pakiet, który jest używany przez składnik. Gdy IDTSRuntimeConnection100 zostanie dodany do kolekcja, Zaawansowany edytor Wyświetla Właściwości połączenia karcie monit, aby wybrać lub utworzyć połączenie w pakiecie do użytku przez składnik.
Poniższy przykład kodu pokazuje implementację ProvideComponentProperties() dodaje dane wejściowe, a następnie dodaje IDTSRuntimeConnection100 obiekt do RuntimeConnectionCollection().
using System;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime;
namespace Microsoft.Samples.SqlServer.Dts
{
[DtsPipelineComponent(DisplayName = "Destination Component",ComponentType =ComponentType.DestinationAdapter)]
public class DestinationComponent : PipelineComponent
{
public override void ProvideComponentProperties()
{
// Reset the component.
base.RemoveAllInputsOutputsAndCustomProperties();
ComponentMetaData.RuntimeConnectionCollection.RemoveAll();
IDTSInput100 input = ComponentMetaData.InputCollection.New();
input.Name = "Input";
IDTSRuntimeConnection100 connection = ComponentMetaData.RuntimeConnectionCollection.New();
connection.Name = "ADO.net";
}
}
}
Imports System
Imports System.Data
Imports System.Data.SqlClient
Imports Microsoft.SqlServer.Dts.Pipeline
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper
Imports Microsoft.SqlServer.Dts.Runtime
Namespace Microsoft.Samples.SqlServer.Dts
<DtsPipelineComponent(DisplayName:="Destination Component", ComponentType:=ComponentType.DestinationAdapter)> _
Public Class DestinationComponent
Inherits PipelineComponent
Public Overrides Sub ProvideComponentProperties()
' Reset the component.
Me.RemoveAllInputsOutputsAndCustomProperties()
ComponentMetaData.RuntimeConnectionCollection.RemoveAll()
Dim input As IDTSInput100 = ComponentMetaData.InputCollection.New()
input.Name = "Input"
Dim connection As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection.New()
connection.Name = "ADO.net"
End Sub
End Class
End Namespace
Nawiązywanie połączenia danych zewnętrznych urządzenie źródłowe
Po dodaniu do połączenia RuntimeConnectionCollection(), można zastąpić AcquireConnections(Object) Metoda nawiązanie połączenia z danymi zewnętrznymi urządzenie źródłowe. Ta metoda jest wywoływana w projekcie czas i przy uruchomieniu czas.Składnik musi ustanowić połączenia do Menedżera połączeń, określony przez uruchomienie-czas połączenia, a następnie do zewnętrznego źródła danych.Po ustanowieniu składnik powinien wewnętrznie buforować połączenia, a następnie zwolnij go, gdy ReleaseConnections() nazywa się. Deweloperzy mogą zastąpić tę metoda, a następnie zwolnij połączenie ustanowione przez składnik podczas AcquireConnections(Object). Obie te metody ReleaseConnections() i AcquireConnections(Object), są nazywane w czasie projektowania i w czasie wykonywania.
Poniższy przykład kodu pokazuje składnik, który łączy się z połączeniem ADO.NET w AcquireConnections(Object) Metoda, a następnie zamyka połączenie ReleaseConnections().
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
private SqlConnection sqlConnection;
public override void AcquireConnections(object transaction)
{
if (ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager != null)
{
ConnectionManager cm = Microsoft.SqlServer.Dts.Runtime.DtsConvert.GetWrapper(ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager);
ConnectionManagerAdoNet cmado = cm.InnerObject as ConnectionManagerAdoNet;
if (cmado == null)
throw new Exception("The ConnectionManager " + cm.Name + " is not an ADO.NET connection.");
sqlConnection = cmado.AcquireConnection(transaction) as SqlConnection;
sqlConnection.Open();
}
}
public override void ReleaseConnections()
{
if (sqlConnection != null && sqlConnection.State != ConnectionState.Closed)
sqlConnection.Close();
}
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper
Private sqlConnection As SqlConnection
Public Overrides Sub AcquireConnections(ByVal transaction As Object)
If IsNothing(ComponentMetaData.RuntimeConnectionCollection(0).ConnectionManager) = False Then
Dim cm As ConnectionManager = DtsConvert.GetWrapper(ComponentMetaData.RuntimeConnectionCollection(0).ConnectionManager)
Dim cmado As ConnectionManagerAdoNet = CType(cm.InnerObject,ConnectionManagerAdoNet)
If IsNothing(cmado) Then
Throw New Exception("The ConnectionManager " + cm.Name + " is not an ADO.NET connection.")
End If
sqlConnection = CType(cmado.AcquireConnection(transaction), SqlConnection)
sqlConnection.Open()
End If
End Sub
Public Overrides Sub ReleaseConnections()
If IsNothing(sqlConnection) = False And sqlConnection.State <> ConnectionState.Closed Then
sqlConnection.Close()
End If
End Sub
Sprawdzanie poprawności składnika
obiekt docelowy składnik deweloperzy powinni sprawdzenia poprawności zgodnie z opisem w Sprawdzanie poprawności składnika.Ponadto, powinny one Sprawdź, czy właściwości typów danych kolumn zdefiniowane w kolekcja kolumn wejściowe składnika dopasowanie kolumn na danych zewnętrznych urządzenie źródłowe.W czasie sprawdzania danych wejściowych kolumn dla zewnętrznych danych urządzenie źródłowe może być niemożliwe lub niepożądane, na przykład gdy składnik lub SSIS Projektant jest w stanie rozłączenia lub podczas podróży round na serwerze nie są dopuszczalne. W takich sytuacjach kolumna w kolekcja kolumn danych wejściowych nadal mogą być sprawdzone za pomocą ExternalMetadataColumnCollection() wejściowy obiektu.
Ta kolekcja istnieje zarówno wejściowe i wyjściowe obiektów i musi zostać wypełniony przez dewelopera składnika z kolumn w źródle danych zewnętrznych.Ta kolekcja może być używana do sprawdzania poprawności kolumn danych wejściowych podczas SSIS Projektant jest w trybie offline, gdy składnik jest odłączony lub ValidateExternalMetadata() Właściwość jest false.
Następujący kod dodaje kolumna zewnętrzną metadane oparte na istniejącej kolumna danych wejściowych.
private void AddExternalMetaDataColumn(IDTSInput100 input,IDTSInputColumn100 inputColumn)
{
// Set the properties of the external metadata column.
IDTSExternalMetadataColumn100 externalColumn = input.ExternalMetadataColumnCollection.New();
externalColumn.Name = inputColumn.Name;
externalColumn.Precision = inputColumn.Precision;
externalColumn.Length = inputColumn.Length;
externalColumn.DataType = inputColumn.DataType;
externalColumn.Scale = inputColumn.Scale;
// Map the external column to the input column.
inputColumn.ExternalMetadataColumnID = externalColumn.ID;
}
Private Sub AddExternalMetaDataColumn(ByVal input As IDTSInput100, ByVal inputColumn As IDTSInputColumn100)
' Set the properties of the external metadata column.
Dim externalColumn As IDTSExternalMetadataColumn100 = input.ExternalMetadataColumnCollection.New()
externalColumn.Name = inputColumn.Name
externalColumn.Precision = inputColumn.Precision
externalColumn.Length = inputColumn.Length
externalColumn.DataType = inputColumn.DataType
externalColumn.Scale = inputColumn.Scale
' Map the external column to the input column.
inputColumn.ExternalMetadataColumnID = externalColumn.ID
End Sub
Czas uruchomienia
Podczas wykonywania, składnika docelowego odbiera wywołanie ProcessInput(Int32, PipelineBuffer) Metoda każdego czas pełnego PipelineBuffer jest dostępny od składnika nadrzędnego. Ta metoda jest nazywana wielokrotnie aż Brak buforów więcej oraz EndOfRowset() Właściwość jest true. W tej metodzie obiekt docelowy składników odczytać kolumn i wierszy w buforze i dodać je do zewnętrznego urządzenie źródłowe danych.
Lokalizowanie kolumny w buforze
Bufor wejściowy dla składnika zawiera wszystkie kolumna zdefiniowane w kolekcji kolumn wyjściowych składników instalacji od składnika w przepływ danych.Na przykład, jeśli składnik urządzenie źródłowe zawiera trzy kolumny w jej wynikach, a kolumna wyprowadzenia dodatkowe dodaje kolejny składnik, dostarczony bufor do obiekt docelowy składnik zawiera cztery kolumny, nawet jeśli obiekt docelowy składnika będzie zapisywać tylko dwie kolumny.
Kolejność kolumn w buforze wejściowym nie jest zdefiniowany przez indeks kolumna w kolekcja kolumn danych wejściowych obiekt docelowy składnika.Kolumny mogą znajdować niezawodnie się w wierszu bufora tylko przy użyciu FindColumnByLineageID(Int32, Int32) Metoda BufferManager(). Ta metoda lokalizuje kolumna, która ma identyfikator rodowodu określony w określonym buforze i zwraca jego lokalizacji w wierszu.Indeksy kolumn danych wejściowych są zwykle umieszczone podczas PreExecute() metoda i buforowane przez deweloperów do wykorzystania później podczas ProcessInput(Int32, PipelineBuffer).
W poniższym przykładzie kodu znajdzie lokalizację kolumn danych wejściowych w buforze ciągu PreExecute() i przechowuje je w tablicy. Tablica jest następnie używana w czasie ProcessInput(Int32, PipelineBuffer) Aby odczytać wartości z kolumn w buforze.
int[] cols;
public override void PreExecute()
{
IDTSInput100 input = ComponentMetaData.InputCollection[0];
cols = new int[input.InputColumnCollection.Count];
for (int x = 0; x < input.InputColumnCollection.Count; x++)
{
cols[x] = BufferManager.FindColumnByLineageID(input.Buffer, input.InputColumnCollection[x].LineageID);
}
}
Private cols As Integer()
Public Overrides Sub PreExecute()
Dim input As IDTSInput100 = ComponentMetaData.InputCollection(0)
ReDim cols(input.InputColumnCollection.Count)
For x As Integer = 0 To input.InputColumnCollection.Count
cols(x) = BufferManager.FindColumnByLineageID(input.Buffer, input.InputColumnCollection(x).LineageID)
Next x
End Sub
Przetwarzanie wierszy
Po wejściowy kolumn mają już znajduje się w buforze, mogą być odczytać i zapisywane do zewnętrznego urządzenie źródłowe danych.
Podczas gdy obiekt docelowy składnik zapisuje wierszy do zewnętrznego urządzenie źródłowe danych, może zajść potrzeba zaktualizowania "Wiersze odczytu" lub "BLOB Bajty odczytane" liczników wydajności poprzez wywołanie IncrementPipelinePerfCounter(UInt32, UInt32) Metoda. Aby uzyskać więcej informacji zobaczMonitoring the Performance of the Data Flow Engine.
W poniższym przykładzie przedstawiono składnik, który odczytuje wierszy w buforze ProcessInput(Int32, PipelineBuffer). Zlokalizowano indeksów kolumn w buforze ciągu PreExecute() w powyższym przykładzie kodu.
public override void ProcessInput(int inputID, PipelineBuffer buffer)
{
while (buffer.NextRow())
{
foreach (int col in cols)
{
if (!buffer.IsNull(col))
{
// TODO: Read the column data.
}
}
}
}
Public Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer)
While (buffer.NextRow())
For Each col As Integer In cols
If buffer.IsNull(col) = False Then
' TODO: Read the column data.
End If
Next col
End While
End Sub
Przykład
Następujący przykład przedstawia prosty obiekt docelowy składnika, który będzie używał Menedżera połączeń do pliku, aby zapisać dane binarne ze strumienia danych w plikach.W tym przykładzie nie wykazują, metody i funkcje omówione w tym temacie.Pokazano, ważne metody każdy składnik niestandardowe obiekt docelowy musi zastąpić, ale nie zawiera kodu do projektu-czas sprawdzania poprawności.Dokładniejszy składnika docelowego przykładowe Zobacz DatasetDestination Component Sample.
using System;
using System.IO;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
namespace BlobDst
{
[DtsPipelineComponent(DisplayName = "BLOB Extractor Destination", Description = "Writes values of BLOB columns to files")]
public class BlobDst : PipelineComponent
{
string m_DestDir;
int m_FileNameColumnIndex = -1;
int m_BlobColumnIndex = -1;
public override void ProvideComponentProperties()
{
IDTSInput100 input = ComponentMetaData.InputCollection.New();
input.Name = "BLOB Extractor Destination Input";
input.HasSideEffects = true;
IDTSRuntimeConnection100 conn = ComponentMetaData.RuntimeConnectionCollection.New();
conn.Name = "FileConnection";
}
public override void AcquireConnections(object transaction)
{
IDTSRuntimeConnection100 conn = ComponentMetaData.RuntimeConnectionCollection[0];
m_DestDir = (string)conn.ConnectionManager.AcquireConnection(null);
if (m_DestDir.Length > 0 && m_DestDir[m_DestDir.Length - 1] != '\\')
m_DestDir += "\\";
}
public override IDTSInputColumn100 SetUsageType(int inputID, IDTSVirtualInput100 virtualInput, int lineageID, DTSUsageType usageType)
{
IDTSInputColumn100 inputColumn = base.SetUsageType(inputID, virtualInput, lineageID, usageType);
IDTSCustomProperty100 custProp;
custProp = inputColumn.CustomPropertyCollection.New();
custProp.Name = "IsFileName";
custProp.Value = (object)false;
custProp = inputColumn.CustomPropertyCollection.New();
custProp.Name = "IsBLOB";
custProp.Value = (object)false;
return inputColumn;
}
public override void PreExecute()
{
IDTSInput100 input = ComponentMetaData.InputCollection[0];
IDTSInputColumnCollection100 inputColumns = input.InputColumnCollection;
IDTSCustomProperty100 custProp;
foreach (IDTSInputColumn100 column in inputColumns)
{
custProp = column.CustomPropertyCollection["IsFileName"];
if ((bool)custProp.Value == true)
{
m_FileNameColumnIndex = (int)BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID);
}
custProp = column.CustomPropertyCollection["IsBLOB"];
if ((bool)custProp.Value == true)
{
m_BlobColumnIndex = (int)BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID);
}
}
}
public override void ProcessInput(int inputID, PipelineBuffer buffer)
{
while (buffer.NextRow())
{
string strFileName = buffer.GetString(m_FileNameColumnIndex);
int blobLength = (int)buffer.GetBlobLength(m_BlobColumnIndex);
byte[] blobData = buffer.GetBlobData(m_BlobColumnIndex, 0, blobLength);
strFileName = TranslateFileName(strFileName);
// Make sure directory exists before creating file.
FileInfo fi = new FileInfo(strFileName);
if (!fi.Directory.Exists)
fi.Directory.Create();
// Write the data to the file.
FileStream fs = new FileStream(strFileName, FileMode.Create, FileAccess.Write, FileShare.None);
fs.Write(blobData, 0, blobLength);
fs.Close();
}
}
private string TranslateFileName(string fileName)
{
if (fileName.Length > 2 && fileName[1] == ':')
return m_DestDir + fileName.Substring(3, fileName.Length - 3);
else
return m_DestDir + fileName;
}
}
}
Imports System
Imports System.IO
Imports Microsoft.SqlServer.Dts.Pipeline
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper
Namespace BlobDst
<DtsPipelineComponent(DisplayName="BLOB Extractor Destination", Description="Writes values of BLOB columns to files")> _
Public Class BlobDst
Inherits PipelineComponent
Private m_DestDir As String
Private m_FileNameColumnIndex As Integer = -1
Private m_BlobColumnIndex As Integer = -1
Public Overrides Sub ProvideComponentProperties()
Dim input As IDTSInput100 = ComponentMetaData.InputCollection.New
input.Name = "BLOB Extractor Destination Input"
input.HasSideEffects = True
Dim conn As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection.New
conn.Name = "FileConnection"
End Sub
Public Overrides Sub AcquireConnections(ByVal transaction As Object)
Dim conn As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection(0)
m_DestDir = CType(conn.ConnectionManager.AcquireConnection(Nothing), String)
If m_DestDir.Length > 0 AndAlso Not (m_DestDir(m_DestDir.Length - 1) = "\"C) Then
m_DestDir += "\"
End If
End Sub
Public Overrides Function SetUsageType(ByVal inputID As Integer, ByVal virtualInput As IDTSVirtualInput100, ByVal lineageID As Integer, ByVal usageType As DTSUsageType) As IDTSInputColumn100
Dim inputColumn As IDTSInputColumn100 = MyBase.SetUsageType(inputID, virtualInput, lineageID, usageType)
Dim custProp As IDTSCustomProperty100
custProp = inputColumn.CustomPropertyCollection.New
custProp.Name = "IsFileName"
custProp.Value = CType(False, Object)
custProp = inputColumn.CustomPropertyCollection.New
custProp.Name = "IsBLOB"
custProp.Value = CType(False, Object)
Return inputColumn
End Function
Public Overrides Sub PreExecute()
Dim input As IDTSInput100 = ComponentMetaData.InputCollection(0)
Dim inputColumns As IDTSInputColumnCollection100 = input.InputColumnCollection
Dim custProp As IDTSCustomProperty100
For Each column As IDTSInputColumn100 In inputColumns
custProp = column.CustomPropertyCollection("IsFileName")
If CType(custProp.Value, Boolean) = True Then
m_FileNameColumnIndex = CType(BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID), Integer)
End If
custProp = column.CustomPropertyCollection("IsBLOB")
If CType(custProp.Value, Boolean) = True Then
m_BlobColumnIndex = CType(BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID), Integer)
End If
Next
End Sub
Public Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer)
While buffer.NextRow
Dim strFileName As String = buffer.GetString(m_FileNameColumnIndex)
Dim blobLength As Integer = CType(buffer.GetBlobLength(m_BlobColumnIndex), Integer)
Dim blobData As Byte() = buffer.GetBlobData(m_BlobColumnIndex, 0, blobLength)
strFileName = TranslateFileName(strFileName)
Dim fi As FileInfo = New FileInfo(strFileName)
' Make sure directory exists before creating file.
If Not fi.Directory.Exists Then
fi.Directory.Create
End If
' Write the data to the file.
Dim fs As FileStream = New FileStream(strFileName, FileMode.Create, FileAccess.Write, FileShare.None)
fs.Write(blobData, 0, blobLength)
fs.Close
End While
End Sub
Private Function TranslateFileName(ByVal fileName As String) As String
If fileName.Length > 2 AndAlso fileName(1) = ":"C Then
Return m_DestDir + fileName.Substring(3, fileName.Length - 3)
Else
Return m_DestDir + fileName
End If
End Function
End Class
End Namespace
|