Rozwijanie niestandardowy składnik źródła
SQL Server Integration Services gives developers the ability to write source components that can connect to custom data sources and supply data from those sources to other components in a data flow task.Możliwość tworzenia niestandardowych źródeł jest przydatne, gdy należy podłączyć do źródła danych, których nie można uzyskać dostęp za pomocą jednego z istniejących Integration Services źródeł.
Źródło składniki mają wyjść jeden lub więcej i nakładów zero.W projekcie czas, źródło składniki są używane do tworzenia i konfigurowania połączeń, odczytać kolumna metadane z zewnętrznego źródła danych i konfigurowanie kolumn wyjściowych źródła na podstawie źródła danych zewnętrznych.Podczas wykonywania połączenia z danymi zewnętrznymi źródło i dodawanie wierszy do buforu wyjściowego.zadanie przepływu danych Następnie udostępnia ten bufor wierszy danych do składników niższego rzędu.
Dla składnika źródło próbki, zobacz Integration Services próbki na witrynie Codeplex.Dla ogólny przegląd składnik przepływ danych rozwoju, zobacz Opracowywanie danych niestandardowy składnik przepływu.
Czas projektowania
Implementowanie projektu -czas funkcje składnika źródło obejmuje określenie połączenie z zewnętrznym źródłem danych, dodawanie i konfigurowanie kolumn wyjściowych, które odzwierciedlają źródło danych i sprawdzania, czy składnik jest gotowy do wykonać.Z definicji składnika źródło ma zerowe wejść i wyjść asynchronicznego jeden lub więcej.
Tworzenie składnika
Składniki źródła połączenia z zewnętrznymi źródłami danych przy użyciu ConnectionManager obiektów zdefiniowanych w pakiet.Wskazują one ich zapotrzebowania dla menedżer połączeń, dodając element RuntimeConnectionCollection kolekcja ComponentMetaData właściwość.Ta kolekcja służy dwóm celom — przechowują odwołania do menedżerów połączenia używane przez składnik pakiet i anonsować potrzebę menedżer połączeń do projektanta.Gdy IDTSRuntimeConnection100 został dodany do kolekcja, Zaawansowanego edytora Wyświetla Właściwości połączenia kartę, która pozwala użytkownikom wybrać lub utworzyć połączenie w pakiet.
Poniższy przykład kodu pokazuje implementację ProvideComponentProperties dodaje dane wyjściowe i dodaje IDTSRuntimeConnection100 obiektu, aby RuntimeConnectionCollection.
using System;
using System.Collections;
using System.Data;
using System.Data.SqlClient;
using System.Data.OleDb;
using Microsoft.SqlServer.Dts.Runtime;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
namespace Microsoft.Samples.SqlServer.Dts
{
[DtsPipelineComponent(DisplayName = "MySourceComponent",ComponentType = ComponentType.SourceAdapter)]
public class MyComponent : PipelineComponent
{
public override void ProvideComponentProperties()
{
// Reset the component.
base.RemoveAllInputsOutputsAndCustomProperties();
ComponentMetaData.RuntimeConnectionCollection.RemoveAll();
IDTSOutput100 output = ComponentMetaData.OutputCollection.New();
output.Name = "Output";
IDTSRuntimeConnection100 connection = ComponentMetaData.RuntimeConnectionCollection.New();
connection.Name = "ADO.NET";
}
Imports System.Data
Imports System.Data.SqlClient
Imports Microsoft.SqlServer.Dts.Runtime
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper
Imports Microsoft.SqlServer.Dts.Pipeline
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper
<DtsPipelineComponent(DisplayName:="MySourceComponent", ComponentType:=ComponentType.SourceAdapter)> _
Public Class MySourceComponent
Inherits PipelineComponent
Public Overrides Sub ProvideComponentProperties()
' Allow for resetting the component.
RemoveAllInputsOutputsAndCustomProperties()
ComponentMetaData.RuntimeConnectionCollection.RemoveAll()
Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection.New()
output.Name = "Output"
Dim connection As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection.New()
connection.Name = "ADO.NET"
End Sub
End Class
Podłączanie do zewnętrznego źródła danych
Po dodaniu połączenia do RuntimeConnectionCollection, można zastąpić AcquireConnections metoda, aby ustanowić połączenie źródło danych zewnętrznych.Ta metoda jest wywoływana podczas projektowania i wykonywania czas.Składnik należy ustanowić połączenie z menedżer połączeń, określony przez uruchomienie -czas połączenia, a następnie do zewnętrznego źródło danych.
Po nawiązaniu połączenia powinny być buforowane wewnętrznie przez składnik i zwolnione, gdy ReleaseConnections wywoływana jest metoda.ReleaseConnections Na zaprojektowanie oraz wykonanie wywoływana jest metoda czas, tak jak AcquireConnections metody.Deweloperzy zastąpić tę metoda i Zwolnij połączenie ustanowione przez składnik podczas AcquireConnections.
Poniższy przykład kodu pokazuje składnik, który łączy się z obiektów ADO.Połączenie SIECIOWE w AcquireConnections metoda i zamyka połączenie ReleaseConnections metoda.
private SqlConnection sqlConnection;
public override void AcquireConnections(object transaction)
{
if (ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager != null)
{
ConnectionManager cm = Microsoft.SqlServer.Dts.Runtime.DtsConvert.GetWrapper(ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager);
ConnectionManagerAdoNet cmado = cm.InnerObject as ConnectionManagerAdoNet;
if (cmado == null)
throw new Exception("The ConnectionManager " + cm.Name + " is not an ADO.NET connection.");
sqlConnection = cmado.AcquireConnection(transaction) as SqlConnection;
sqlConnection.Open();
}
}
public override void ReleaseConnections()
{
if (sqlConnection != null && sqlConnection.State != ConnectionState.Closed)
sqlConnection.Close();
}
Private sqlConnection As SqlConnection
Public Overrides Sub AcquireConnections(ByVal transaction As Object)
If Not IsNothing(ComponentMetaData.RuntimeConnectionCollection(0).ConnectionManager) Then
Dim cm As ConnectionManager = Microsoft.SqlServer.Dts.Runtime.DtsConvert.GetWrapper(ComponentMetaData.RuntimeConnectionCollection(0).ConnectionManager)
Dim cmado As ConnectionManagerAdoNet = CType(cm.InnerObject, ConnectionManagerAdoNet)
If IsNothing(cmado) Then
Throw New Exception("The ConnectionManager " + cm.Name + " is not an ADO.NET connection.")
End If
sqlConnection = CType(cmado.AcquireConnection(transaction), SqlConnection)
sqlConnection.Open()
End If
End Sub
Public Overrides Sub ReleaseConnections()
If Not IsNothing(sqlConnection) And sqlConnection.State <> ConnectionState.Closed Then
sqlConnection.Close()
End If
End Sub
Tworzenie i konfigurowanie kolumn wyjściowych
Kolumny wyjściowe składnika źródło odzwierciedla kolumny z zewnętrznego źródło danych, składnik dodawane do przepływ danych podczas wykonywania.W projekcie czas, dodawanie kolumn wyjściowych po składnik został skonfigurowany, aby połączyć się z zewnętrznym źródłem danych.Projekt —czas metodę, która używa składnika dodać kolumny do swojej kolekcja danych wyjściowych może się różnić w zależności od potrzeb składnika, chociaż nie powinny zostać dodane podczas Validate lub AcquireConnections.Na przykład, składnik, który zawiera instrukcja języka SQL we właściwość niestandardowej, który steruje zestaw danych dla składnika może dodać jej kolumn wyjściowych podczas SetComponentProperty metoda.Składnik sprawdza, czy go ma buforowanych połączenia i, jeśli tak, łączy danych źródło i generuje jej kolumn wyjściowych.
Po utworzeniu kolumna wyjociowej zestaw właściwości typ jej danych przez wywołanie SetDataTypeProperties metoda.Ta metoda jest konieczne ponieważ DataType, Length, Precision, i CodePage Właściwości tylko do odczytu, a każda właściwość jest zależna od innych ustawień.Ta metoda wymusza konieczność te wartości do zestaw , i zadania przepływ danych sprawdza się zestaw poprawnie.
DataType kolumna określa wartości, które są zestaw innych właściwości.W poniższej tabela przedstawiono wymagania zależne od właściwości dla każdego DataType.Typy danych niewymienionych mają ich właściwości zależne od zestaw do zera.
DataType |
Długość |
Skala |
Precision |
Strona kodowa |
---|---|---|---|---|
DT_DECIMAL |
0 |
Większa niż 0 i mniejsza niż 28. |
0 |
0 |
DT_CY |
0 |
0 |
0 |
0 |
DT_NUMERIC |
0 |
Większa niż 0 i mniejsza niż lub równa 28 i mniej niż precyzji. |
Większa lub równa 1 i mniejsza lub równa 38. |
0 |
DT_BYTES |
Większa niż 0. |
0 |
0 |
0 |
DT_STR |
Większa niż 0 i mniejsza niż 8000. |
0 |
0 |
Nie 0 i prawidłowego strona kodowa. |
DT_WSTR |
Większa niż 0 i mniejsza niż 4000. |
0 |
0 |
0 |
Ponieważ ograniczenia właściwości typu danych oparte są na typ danych kolumna danych wyjściowych, należy wybrać poprawny SSIS Typ danych podczas pracy z typów zarządzanych.Klasa podstawowa oferuje trzy metody pomocnika, ConvertBufferDataTypeToFitManaged, BufferTypeToDataRecordType, i DataRecordTypeToBufferType, wspomaganie w wyborze Deweloperzy składników zarządzanych SSIS Typ danych typu zarządzanego.Te metody konwersji typów danych zarządzanych do SSIS typów danych i vice versa.
Jak pokazano w przykładzie kodu następujące dane wyjściowe kolumna kolekcja składnik jest wypełniane na podstawie schematu tabela.Pomocnik metod klasy podstawowej są używane do zestaw typ danych kolumna i właściwości zależne są zestaw na podstawie typu danych.
SqlCommand sqlCommand;
private void CreateColumnsFromDataTable()
{
// Get the output.
IDTSOutput100 output = ComponentMetaData.OutputCollection[0];
// Start clean, and remove the columns from both collections.
output.OutputColumnCollection.RemoveAll();
output.ExternalMetadataColumnCollection.RemoveAll();
this.sqlCommand = sqlConnection.CreateCommand();
this.sqlCommand.CommandType = CommandType.Text;
this.sqlCommand.CommandText = (string)ComponentMetaData.CustomPropertyCollection["SqlStatement"].Value;
SqlDataReader schemaReader = this.sqlCommand.ExecuteReader(CommandBehavior.SchemaOnly);
DataTable dataTable = schemaReader.GetSchemaTable();
// Walk the columns in the schema,
// and for each data column create an output column and an external metadata column.
foreach (DataRow row in dataTable.Rows)
{
IDTSOutputColumn100 outColumn = output.OutputColumnCollection.New();
IDTSExternalMetadataColumn100 exColumn = output.ExternalMetadataColumnCollection.New();
// Set column data type properties.
bool isLong = false;
DataType dt = DataRecordTypeToBufferType((Type)row["DataType"]);
dt = ConvertBufferDataTypeToFitManaged(dt, ref isLong);
int length = 0;
int precision = (short)row["NumericPrecision"];
int scale = (short)row["NumericScale"];
int codepage = dataTable.Locale.TextInfo.ANSICodePage;
switch (dt)
{
// The length cannot be zero, and the code page property must contain a valid code page.
case DataType.DT_STR:
case DataType.DT_TEXT:
length = precision;
precision = 0;
scale = 0;
break;
case DataType.DT_WSTR:
length = precision;
codepage = 0;
scale = 0;
precision = 0;
break;
case DataType.DT_BYTES:
precision = 0;
scale = 0;
codepage = 0;
break;
case DataType.DT_NUMERIC:
length = 0;
codepage = 0;
if (precision > 38)
precision = 38;
if (scale > 6)
scale = 6;
break;
case DataType.DT_DECIMAL:
length = 0;
precision = 0;
codepage = 0;
break;
default:
length = 0;
precision = 0;
codepage = 0;
scale = 0;
break;
}
// Set the properties of the output column.
outColumn.Name = (string)row["ColumnName"];
outColumn.SetDataTypeProperties(dt, length, precision, scale, codepage);
}
}
Private sqlCommand As SqlCommand
Private Sub CreateColumnsFromDataTable()
' Get the output.
Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)
' Start clean, and remove the columns from both collections.
output.OutputColumnCollection.RemoveAll()
output.ExternalMetadataColumnCollection.RemoveAll()
Me.sqlCommand = sqlConnection.CreateCommand()
Me.sqlCommand.CommandType = CommandType.Text
Me.sqlCommand.CommandText = CStr(ComponentMetaData.CustomPropertyCollection("SqlStatement").Value)
Dim schemaReader As SqlDataReader = Me.sqlCommand.ExecuteReader(CommandBehavior.SchemaOnly)
Dim dataTable As DataTable = schemaReader.GetSchemaTable()
' Walk the columns in the schema,
' and for each data column create an output column and an external metadata column.
For Each row As DataRow In dataTable.Rows
Dim outColumn As IDTSOutputColumn100 = output.OutputColumnCollection.New()
Dim exColumn As IDTSExternalMetadataColumn100 = output.ExternalMetadataColumnCollection.New()
' Set column data type properties.
Dim isLong As Boolean = False
Dim dt As DataType = DataRecordTypeToBufferType(CType(row("DataType"), Type))
dt = ConvertBufferDataTypeToFitManaged(dt, isLong)
Dim length As Integer = 0
Dim precision As Integer = CType(row("NumericPrecision"), Short)
Dim scale As Integer = CType(row("NumericScale"), Short)
Dim codepage As Integer = dataTable.Locale.TextInfo.ANSICodePage
Select Case dt
' The length cannot be zero, and the code page property must contain a valid code page.
Case DataType.DT_STR
Case DataType.DT_TEXT
length = precision
precision = 0
scale = 0
Case DataType.DT_WSTR
length = precision
codepage = 0
scale = 0
precision = 0
Case DataType.DT_BYTES
precision = 0
scale = 0
codepage = 0
Case DataType.DT_NUMERIC
length = 0
codepage = 0
If precision > 38 Then
precision = 38
End If
If scale > 6 Then
scale = 6
End If
Case DataType.DT_DECIMAL
length = 0
precision = 0
codepage = 0
Case Else
length = 0
precision = 0
codepage = 0
scale = 0
End Select
' Set the properties of the output column.
outColumn.Name = CStr(row("ColumnName"))
outColumn.SetDataTypeProperties(dt, length, precision, scale, codepage)
Next
End Sub
Sprawdzanie poprawności składnika
Należy sprawdzić poprawność składnika źródło i sprawdź, czy kolumnas zdefiniowane w swojej produkcji kolumna kolekcji odpowiada kolumnas u źródło danych zewnętrznych.Czasami sprawdzenie kolumn wyjściowych przeciwko danych zewnętrznych źródło może być niemożliwe, takich jak rozłączony lub gdy zaleca się uniknąć długiego niepotrzebnej do serwera.W takich sytuacjach kolumny w danych wyjściowych nadal mogą być sprawdzone za pomocą ExternalMetadataColumnCollection obiektu wyjściowego.Aby uzyskać więcej informacji, zobacz Sprawdzanie poprawności składnika przepływu danych.
Tej kolekcja znajduje się na zarówno dane wejściowe i wyjściowe obiektów i można go wypełnić kolumny z zewnętrznego źródło danych.Tej kolekcja można używać do sprawdzania poprawności kolumn wyjściowych podczas SSIS Designer jest w trybie offline, gdy składnik jest odłączony lub ValidateExternalMetadata właściwość jest false.Kolekcja należy najpierw wypełniona w tym samym czas utworzony kolumn wyjściowych.Dodawanie kolumn zewnętrznych metadane do kolekcja jest stosunkowo łatwe, ponieważ kolumna metadane zewnętrznych początkowo powinna odpowiadać kolumna wyjociowej.Właściwości typu danych kolumna powinna już zestaw poprawnie, i właściwości mogą być kopiowane bezpośrednio do IDTSExternalMetadataColumn100 obiektu.
Następujący kod dodaje kolumna metadane zewnętrznych, oparty na kolumna wyjociowej nowo utworzonej.Zakłada się, że kolumna wyprowadzenia została już utworzona.
private void CreateExternalMetaDataColumn(IDTSOutput100 output, IDTSOutputColumn100 outputColumn)
{
// Set the properties of the external metadata column.
IDTSExternalMetadataColumn100 externalColumn = output.ExternalMetadataColumnCollection.New();
externalColumn.Name = outputColumn.Name;
externalColumn.Precision = outputColumn.Precision;
externalColumn.Length = outputColumn.Length;
externalColumn.DataType = outputColumn.DataType;
externalColumn.Scale = outputColumn.Scale;
// Map the external column to the output column.
outputColumn.ExternalMetadataColumnID = externalColumn.ID;
}
Private Sub CreateExternalMetaDataColumn(ByVal output As IDTSOutput100, ByVal outputColumn As IDTSOutputColumn100)
' Set the properties of the external metadata column.
Dim externalColumn As IDTSExternalMetadataColumn100 = output.ExternalMetadataColumnCollection.New()
externalColumn.Name = outputColumn.Name
externalColumn.Precision = outputColumn.Precision
externalColumn.Length = outputColumn.Length
externalColumn.DataType = outputColumn.DataType
externalColumn.Scale = outputColumn.Scale
' Map the external column to the output column.
outputColumn.ExternalMetadataColumnID = externalColumn.ID
End Sub
Czas jednostkowy
W czasie wykonywania składników Dodawanie wierszy do wyprowadzenia buforów, które są tworzone przez zadanie przepływu danych i dostarczane do składnika w PrimeOutput.Wywoływana po składników źródło metoda odbiera buforu wyjściowego dla każdego IDTSOutput100 składnika, który jest podłączony do składnika niższego rzędu.
Lokalizowanie kolumn w buforze
Bufor wyjściowy dla składnika zawiera kolumny zdefiniowane przez składnik i dodany do wyjścia podrzędne składnika kolumn.Na przykład jeśli składnik źródło zawiera trzy kolumna w swojej produkcji i następny składnik dodaje czwartego wyjściowe kolumna bufor wyjściowy, pod warunkiem stosowania przez źródło składnik zawiera cztery kolumna te.
Kolejność kolumn w wierszu bufor nie jest zdefiniowany przez indeks kolumna danych wyjściowych kolekcja kolumn wyjściowych.Kolumna wyprowadzenia można dokładnie znajdować wyłącznie w wierszu bufora za pomocą FindColumnByLineageID metoda BufferManager.Ta metoda lokalizuje kolumna z Identyfikatorem rodowodu określony w określony bufor i zwraca jego lokalizacji w wierszu.Indeksy kolumn wyjściowych zazwyczaj znajdują się w PreExecute metoda i przechowywane w ramach PrimeOutput.
Poniższy przykład kodu znajdzie lokalizację kolumn wyjściowych buforu wyjściowego podczas wywoływania PreExecutei przechowuje je w wewnętrznej struktury.Nazwa kolumna jest również przechowywany w strukturze i jest używany przez przykładowy kod dla PrimeOutput metoda w następnej sekcji tego tematu.
ArrayList columnInformation;
private struct ColumnInfo
{
public int BufferColumnIndex;
public string ColumnName;
}
public override void PreExecute()
{
this.columnInformation = new ArrayList();
IDTSOutput100 output = ComponentMetaData.OutputCollection[0];
foreach (IDTSOutputColumn100 col in output.OutputColumnCollection)
{
ColumnInfo ci = new ColumnInfo();
ci.BufferColumnIndex = BufferManager.FindColumnByLineageID(output.Buffer, col.LineageID);
ci.ColumnName = col.Name;
columnInformation.Add(ci);
}
}
Public Overrides Sub PreExecute()
Me.columnInformation = New ArrayList()
Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)
For Each col As IDTSOutputColumn100 In output.OutputColumnCollection
Dim ci As ColumnInfo = New ColumnInfo()
ci.BufferColumnIndex = BufferManager.FindColumnByLineageID(output.Buffer, col.LineageID)
ci.ColumnName = col.Name
columnInformation.Add(ci)
Next
End Sub
Przetwarzanie wierszy
Wiersze są dodawane do buforu wyjściowego, wywołując AddRow metoda, która tworzy nowy wiersz buforu z pustymi wartościami w kolumnach.Składnik przypisuje następnie wartości poszczególnych kolumn.Bufory wyjścia do składnika są tworzone i monitorowane przez zadanie przepływ danych.Staną się one pełne, wiersze w buforze są przenoszone do następnego składnika.Nie ma możliwości ustalenia, kiedy partia wierszy została wysłana do następnego składnika ponieważ przemieszczanie wierszy przez zadanie przepływ danych jest przezroczyste dla dewelopera składnika i RowCount właściwość jest zawsze zero na buforów danych wyjściowych.Po zakończeniu składnika źródło Dodawanie wierszy do buforu wyjściowego, powiadamia zadanie przepływ danych przez wywołanie SetEndOfRowset metoda PipelineBuffer, i pozostałych wierszy w buforze są przekazywane do następnego składnika.
Gdy składnik źródło odczytuje wiersze z zewnętrznego źródło danych, może chcesz zaktualizować "Wierszy odczytu" "obiekt BLOB bajtów Odczyt" liczniki wydajności przez wywołanie IncrementPipelinePerfCounter metoda.Aby uzyskać więcej informacji, zobacz Monitorowanie wydajności aparatu przepływu danych.
Poniższy przykład kodu pokazuje składnik, który dodaje wierszy do bufora wyjściowego PrimeOutput.Indeksy kolumn wyjściowych w buforze były za pomocą PreExecute w poprzednim przykładzie kodu.
public override void PrimeOutput(int outputs, int[] outputIDs, PipelineBuffer[] buffers)
{
IDTSOutput100 output = ComponentMetaData.OutputCollection[0];
PipelineBuffer buffer = buffers[0];
SqlDataReader dataReader = sqlCommand.ExecuteReader();
// Loop over the rows in the DataReader,
// and add them to the output buffer.
while (dataReader.Read())
{
// Add a row to the output buffer.
buffer.AddRow();
for (int x = 0; x < columnInformation.Count; x++)
{
ColumnInfo ci = (ColumnInfo)columnInformation[x];
int ordinal = dataReader.GetOrdinal(ci.ColumnName);
if (dataReader.IsDBNull(ordinal))
buffer.SetNull(ci.BufferColumnIndex);
else
{
buffer[ci.BufferColumnIndex] = dataReader[ci.ColumnName];
}
}
}
buffer.SetEndOfRowset();
}
Public Overrides Sub PrimeOutput(ByVal outputs As Integer, ByVal outputIDs As Integer(), ByVal buffers As PipelineBuffer())
Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)
Dim buffer As PipelineBuffer = buffers(0)
Dim dataReader As SqlDataReader = sqlCommand.ExecuteReader()
' Loop over the rows in the DataReader,
' and add them to the output buffer.
While (dataReader.Read())
' Add a row to the output buffer.
buffer.AddRow()
For x As Integer = 0 To columnInformation.Count
Dim ci As ColumnInfo = CType(columnInformation(x), ColumnInfo)
Dim ordinal As Integer = dataReader.GetOrdinal(ci.ColumnName)
If (dataReader.IsDBNull(ordinal)) Then
buffer.SetNull(ci.BufferColumnIndex)
Else
buffer(ci.BufferColumnIndex) = dataReader(ci.ColumnName)
End If
Next
End While
buffer.SetEndOfRowset()
End Sub
Przykład
Poniżej pokazano składnik źródło prostego, który używa pliku menedżer połączeń, aby załadować pliki binarne treści do przepływ danych.W tym przykładzie nie wykazują metod i funkcje omówione w tym temacie.W ten sposób pokazujesz ważnych metod należy zastąpić każdego składnika niestandardowego źródło, ale nie zawiera kodu dla projektu -czas sprawdzania poprawności.Pełniejsze składnika źródło próbki, zobacz Readme_ADO Source Component Sample.
using System;
using System.IO;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
namespace BlobSrc
{
[DtsPipelineComponent(DisplayName = "BLOB Inserter Source", Description = "Inserts files into the data flow as BLOBs")]
public class BlobSrc : PipelineComponent
{
IDTSConnectionManager100 m_ConnMgr;
int m_FileNameColumnIndex = -1;
int m_FileBlobColumnIndex = -1;
public override void ProvideComponentProperties()
{
IDTSOutput100 output = ComponentMetaData.OutputCollection.New();
output.Name = "BLOB File Inserter Output";
IDTSOutputColumn100 column = output.OutputColumnCollection.New();
column.Name = "FileName";
column.SetDataTypeProperties(DataType.DT_WSTR, 256, 0, 0, 0);
column = output.OutputColumnCollection.New();
column.Name = "FileBLOB";
column.SetDataTypeProperties(DataType.DT_IMAGE, 0, 0, 0, 0);
IDTSRuntimeConnection100 conn = ComponentMetaData.RuntimeConnectionCollection.New();
conn.Name = "FileConnection";
}
public override void AcquireConnections(object transaction)
{
IDTSRuntimeConnection100 conn = ComponentMetaData.RuntimeConnectionCollection[0];
m_ConnMgr = conn.ConnectionManager;
}
public override void ReleaseConnections()
{
m_ConnMgr = null;
}
public override void PreExecute()
{
IDTSOutput100 output = ComponentMetaData.OutputCollection[0];
m_FileNameColumnIndex = (int)BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection[0].LineageID);
m_FileBlobColumnIndex = (int)BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection[1].LineageID);
}
public override void PrimeOutput(int outputs, int[] outputIDs, PipelineBuffer[] buffers)
{
string strFileName = (string)m_ConnMgr.AcquireConnection(null);
while (strFileName != null)
{
buffers[0].AddRow();
buffers[0].SetString(m_FileNameColumnIndex, strFileName);
FileInfo fileInfo = new FileInfo(strFileName);
byte[] fileData = new byte[fileInfo.Length];
FileStream fs = new FileStream(strFileName, FileMode.Open, FileAccess.Read, FileShare.Read);
fs.Read(fileData, 0, fileData.Length);
buffers[0].AddBlobData(m_FileBlobColumnIndex, fileData);
strFileName = (string)m_ConnMgr.AcquireConnection(null);
}
buffers[0].SetEndOfRowset();
}
}
}
Imports System
Imports System.IO
Imports Microsoft.SqlServer.Dts.Pipeline
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper
Namespace BlobSrc
<DtsPipelineComponent(DisplayName="BLOB Inserter Source", Description="Inserts files into the data flow as BLOBs")> _
Public Class BlobSrc
Inherits PipelineComponent
Private m_ConnMgr As IDTSConnectionManager100
Private m_FileNameColumnIndex As Integer = -1
Private m_FileBlobColumnIndex As Integer = -1
Public Overrides Sub ProvideComponentProperties()
Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection.New
output.Name = "BLOB File Inserter Output"
Dim column As IDTSOutputColumn100 = output.OutputColumnCollection.New
column.Name = "FileName"
column.SetDataTypeProperties(DataType.DT_WSTR, 256, 0, 0, 0)
column = output.OutputColumnCollection.New
column.Name = "FileBLOB"
column.SetDataTypeProperties(DataType.DT_IMAGE, 0, 0, 0, 0)
Dim conn As IDTSRuntimeConnection90 = ComponentMetaData.RuntimeConnectionCollection.New
conn.Name = "FileConnection"
End Sub
Public Overrides Sub AcquireConnections(ByVal transaction As Object)
Dim conn As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection(0)
m_ConnMgr = conn.ConnectionManager
End Sub
Public Overrides Sub ReleaseConnections()
m_ConnMgr = Nothing
End Sub
Public Overrides Sub PreExecute()
Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)
m_FileNameColumnIndex = CType(BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection(0).LineageID), Integer)
m_FileBlobColumnIndex = CType(BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection(1).LineageID), Integer)
End Sub
Public Overrides Sub PrimeOutput(ByVal outputs As Integer, ByVal outputIDs As Integer(), ByVal buffers As PipelineBuffer())
Dim strFileName As String = CType(m_ConnMgr.AcquireConnection(Nothing), String)
While Not (strFileName Is Nothing)
buffers(0).AddRow
buffers(0).SetString(m_FileNameColumnIndex, strFileName)
Dim fileInfo As FileInfo = New FileInfo(strFileName)
Dim fileData(fileInfo.Length) As Byte
Dim fs As FileStream = New FileStream(strFileName, FileMode.Open, FileAccess.Read, FileShare.Read)
fs.Read(fileData, 0, fileData.Length)
buffers(0).AddBlobData(m_FileBlobColumnIndex, fileData)
strFileName = CType(m_ConnMgr.AcquireConnection(Nothing), String)
End While
buffers(0).SetEndOfRowset
End Sub
End Class
End Namespace
|
Zobacz także