Поделиться через


Разработка пользовательского компонента источника

Службы SQL Server Integration Services предоставляют разработчикам возможность создавать компоненты источника, которые могут подключаться к пользовательским источникам данных и предоставлять данные из этих источников другим компонентам в задаче потока данных. Возможность создавать пользовательские источники становится полезной, если возникает необходимость подключаться к источникам данных, доступ к которым нельзя получить с помощью одного из существующих источников служб Integration Services.

Компоненты источника имеют один или несколько выходов и не имеют входов. Во время разработки компоненты источника используются для создания и настройки соединений, чтения метаданных столбца из внешнего источника данных и настройки выходных столбцов источника, исходя из внешнего источника данных. Во время выполнения они подключаются к внешнему источнику данных и добавляют строки в выходной буфер. Затем задача потока данных предоставляет этот буфер со строками данных для нижестоящих компонентов.

Образец компонента источника см. в образцах служб Integration Services в разделе Codeplex. Общие сведения о разработке компонентов потока данных см. в разделе Разработка пользовательского компонента потока данных.

Время разработки

Реализация функциональных возможностей времени разработки для компонента источника включает указание соединения с внешним источником данных, добавление и настройку выходных столбцов, которые отражают источник данных, а также проверку готовности компонента к выполнению. По определению, компонент источника не имеет входов и один или несколько асинхронных выходов.

Создание компонента

Компоненты источника подключаются к внешним источникам данных с помощью объектов ConnectionManager, определенных в пакете. Они указывают свои требования для диспетчера соединений, добавляя элемент к коллекции RuntimeConnectionCollection свойства ComponentMetaData. Эта коллекция предназначена для двух целей: хранение ссылок на диспетчеры соединений в пакете, используемом компонентом, и объявление необходимости диспетчера соединений для конструктора. При добавлении объекта IDTSRuntimeConnection100 к коллекции в окне Расширенный редактор отображается вкладка Свойства соединения, которая позволяет пользователям выбрать или создать соединение в пакете.

В следующем примере кода показана реализация метода ProvideComponentProperties, который добавляет выход, а также добавляет объект IDTSRuntimeConnection100 к коллекции RuntimeConnectionCollection.

using System;
using System.Collections;
using System.Data;
using System.Data.SqlClient;
using System.Data.OleDb;
using Microsoft.SqlServer.Dts.Runtime;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;

namespace Microsoft.Samples.SqlServer.Dts
{
    [DtsPipelineComponent(DisplayName = "MySourceComponent",ComponentType = ComponentType.SourceAdapter)]
    public class MyComponent : PipelineComponent
    {
        public override void ProvideComponentProperties()
        {
            // Reset the component.
            base.RemoveAllInputsOutputsAndCustomProperties();
            ComponentMetaData.RuntimeConnectionCollection.RemoveAll();

            IDTSOutput100 output = ComponentMetaData.OutputCollection.New();
            output.Name = "Output";

            IDTSRuntimeConnection100 connection = ComponentMetaData.RuntimeConnectionCollection.New();
            connection.Name = "ADO.NET";
        }
Imports System.Data
Imports System.Data.SqlClient
Imports Microsoft.SqlServer.Dts.Runtime
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper
Imports Microsoft.SqlServer.Dts.Pipeline
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper

<DtsPipelineComponent(DisplayName:="MySourceComponent", ComponentType:=ComponentType.SourceAdapter)> _
Public Class MySourceComponent
    Inherits PipelineComponent

    Public Overrides Sub ProvideComponentProperties()

        ' Allow for resetting the component.
        RemoveAllInputsOutputsAndCustomProperties()
        ComponentMetaData.RuntimeConnectionCollection.RemoveAll()

        Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection.New()
        output.Name = "Output"

        Dim connection As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection.New()
        connection.Name = "ADO.NET"

    End Sub
End Class

Соединение с внешним источником данных

После добавления соединения к коллекции RuntimeConnectionCollection необходимо переопределить метод AcquireConnections, чтобы установить соединение с внешним источником данных. Этот метод вызывается и во время проектирования, и во время выполнения. Компонент должен установить соединение с диспетчером соединений, указанным с помощью соединения времени выполнения, а впоследствии — с внешним источником данных.

Установленное соединение необходимо кэшировать в компоненте и затем освободить после вызова метода ReleaseConnections. Метод ReleaseConnections вызывается и во время проектирования, и во время выполнения, как и метод AcquireConnections. Разработчики переопределяют этот метод и освобождают соединение, установленное компонентом во время вызова метода AcquireConnections.

В следующем примере кода показан компонент, который подключается к соединению ADO.NET в методе AcquireConnections и закрывает соединение в методе ReleaseConnections.

private SqlConnection sqlConnection;

public override void AcquireConnections(object transaction)
{
    if (ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager != null)
    {
        ConnectionManager cm = Microsoft.SqlServer.Dts.Runtime.DtsConvert.GetWrapper(ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager);
        ConnectionManagerAdoNet cmado = cm.InnerObject as ConnectionManagerAdoNet;

        if (cmado == null)
            throw new Exception("The ConnectionManager " + cm.Name + " is not an ADO.NET connection.");

        sqlConnection = cmado.AcquireConnection(transaction) as SqlConnection;
        sqlConnection.Open();
    }
}

public override void ReleaseConnections()
{
    if (sqlConnection != null && sqlConnection.State != ConnectionState.Closed)
        sqlConnection.Close();
}
Private sqlConnection As SqlConnection

Public Overrides Sub AcquireConnections(ByVal transaction As Object)

    If Not IsNothing(ComponentMetaData.RuntimeConnectionCollection(0).ConnectionManager) Then

        Dim cm As ConnectionManager = Microsoft.SqlServer.Dts.Runtime.DtsConvert.GetWrapper(ComponentMetaData.RuntimeConnectionCollection(0).ConnectionManager)
        Dim cmado As ConnectionManagerAdoNet = CType(cm.InnerObject, ConnectionManagerAdoNet)

        If IsNothing(cmado) Then
            Throw New Exception("The ConnectionManager " + cm.Name + " is not an ADO.NET connection.")
        End If

        sqlConnection = CType(cmado.AcquireConnection(transaction), SqlConnection)
        sqlConnection.Open()

    End If
End Sub

Public Overrides Sub ReleaseConnections()

    If Not IsNothing(sqlConnection) And sqlConnection.State <> ConnectionState.Closed Then
        sqlConnection.Close()
    End If

End Sub

Создание и настройка выходных столбцов

Выходные столбцы компонента источника отражают столбцы из внешнего источника данных, который добавляется компонентом к потоку данных во время выполнения. Во время разработки выходные столбцы добавляются после настройки компонента для подключения к внешнему источнику данных. Метод времени разработки, используемый компонентом для добавления столбцов к своей выходной коллекции, может изменяться в зависимости от потребностей компонента, хотя эти столбцы не должны добавляться во время выполнения методов Validate или AcquireConnections. Например, компонент, содержащий инструкцию SQL в пользовательском свойстве, которое управляет набором данных для компонента, может добавлять свои выходные столбцы во время выполнения метода SetComponentProperty. Компонент проверяет, хранится ли в нем кэшированное соединение, и если оно имеется, подключается к источнику данных и создает выходные столбцы.

После создания выходного столбца компонент задает для него свойства типа данных, вызывая метод SetDataTypeProperties. Этот метод необходим, поскольку свойства DataType, Length, Precision и CodePage допускают только чтение, причем параметры каждого свойства зависят от параметров другого. Применение данного метода подчеркивает необходимость задавать эти значения единообразно, а задача потока данных проверяет, заданы ли значения правильно.

Свойство DataType столбца определяет значения, задаваемые для других свойств. В следующей таблице показаны требования к зависимым свойствам для каждого значения DataType. Для не указанных здесь типов данных зависимые свойства имеют нулевые значения.

Тип данных

Длина

Масштаб

Точность

Кодовая страница

DT_DECIMAL

0

Больше 0 и меньше или равно 28.

0

0

DT_CY

0

0

0

0

DT_NUMERIC

0

Больше 0, меньше или равно 28 и меньше, чем точность.

Больше или равно 1 и меньше или равно 38.

0

DT_BYTES

Больше 0.

0

0

0

DT_STR

Больше 0 и меньше 8 000.

0

0

Не равно 0 и представляет допустимую кодовую страницу.

DT_WSTR

Больше 0 и меньше 4 000.

0

0

0

Поскольку ограничения для свойств типа данных основаны на типе данных выходного столбца, во время работы с управляемыми типами необходимо выбрать правильный тип данных служб SSIS. В базовом классе предусмотрены три вспомогательных метода: ConvertBufferDataTypeToFitManaged, BufferTypeToDataRecordType и DataRecordTypeToBufferType, позволяющие разработчикам управляемых компонентов выбрать тип данных служб SSIS с учетом управляемого типа. Эти методы преобразуют управляемые типы данных в типы данных служб SSIS и обратно.

В следующем примере кода показано, как происходит заполнение коллекции выходных столбцов компонента на основе схемы таблицы. Вспомогательные методы базового класса используются для задания типа данных столбца, а зависимые свойства задаются с учетом типа данных.

SqlCommand sqlCommand;

private void CreateColumnsFromDataTable()
{
    // Get the output.
    IDTSOutput100 output = ComponentMetaData.OutputCollection[0];

    // Start clean, and remove the columns from both collections.
    output.OutputColumnCollection.RemoveAll();
    output.ExternalMetadataColumnCollection.RemoveAll();

    this.sqlCommand = sqlConnection.CreateCommand();
    this.sqlCommand.CommandType = CommandType.Text;
    this.sqlCommand.CommandText = (string)ComponentMetaData.CustomPropertyCollection["SqlStatement"].Value;
    SqlDataReader schemaReader = this.sqlCommand.ExecuteReader(CommandBehavior.SchemaOnly);
    DataTable dataTable = schemaReader.GetSchemaTable();

    // Walk the columns in the schema, 
    // and for each data column create an output column and an external metadata column.
    foreach (DataRow row in dataTable.Rows)
    {
        IDTSOutputColumn100 outColumn = output.OutputColumnCollection.New();
        IDTSExternalMetadataColumn100 exColumn = output.ExternalMetadataColumnCollection.New();

        // Set column data type properties.
        bool isLong = false;
        DataType dt = DataRecordTypeToBufferType((Type)row["DataType"]);
        dt = ConvertBufferDataTypeToFitManaged(dt, ref isLong);
        int length = 0;
        int precision = (short)row["NumericPrecision"];
        int scale = (short)row["NumericScale"];
        int codepage = dataTable.Locale.TextInfo.ANSICodePage;

        switch (dt)
        {
            // The length cannot be zero, and the code page property must contain a valid code page.
            case DataType.DT_STR:
            case DataType.DT_TEXT:
                length = precision;
                precision = 0;
                scale = 0;
                break;

            case DataType.DT_WSTR:
                length = precision;
                codepage = 0;
                scale = 0;
                precision = 0;
                break;

            case DataType.DT_BYTES:
                precision = 0;
                scale = 0;
                codepage = 0;
                break;

            case DataType.DT_NUMERIC:
                length = 0;
                codepage = 0;

                if (precision > 38)
                    precision = 38;

                if (scale > 6)
                    scale = 6;
                break;

            case DataType.DT_DECIMAL:
                length = 0;
                precision = 0;
                codepage = 0;
                break;

            default:
                length = 0;
                precision = 0;
                codepage = 0;
                scale = 0;
                break;

        }

        // Set the properties of the output column.
        outColumn.Name = (string)row["ColumnName"];
        outColumn.SetDataTypeProperties(dt, length, precision, scale, codepage);
    }
}
Private sqlCommand As SqlCommand

Private Sub CreateColumnsFromDataTable()

    ' Get the output.
    Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)

    ' Start clean, and remove the columns from both collections.
    output.OutputColumnCollection.RemoveAll()
    output.ExternalMetadataColumnCollection.RemoveAll()

    Me.sqlCommand = sqlConnection.CreateCommand()
    Me.sqlCommand.CommandType = CommandType.Text
    Me.sqlCommand.CommandText = CStr(ComponentMetaData.CustomPropertyCollection("SqlStatement").Value)

    Dim schemaReader As SqlDataReader = Me.sqlCommand.ExecuteReader(CommandBehavior.SchemaOnly)
    Dim dataTable As DataTable = schemaReader.GetSchemaTable()

    ' Walk the columns in the schema, 
    ' and for each data column create an output column and an external metadata column.
    For Each row As DataRow In dataTable.Rows

        Dim outColumn As IDTSOutputColumn100 = output.OutputColumnCollection.New()
        Dim exColumn As IDTSExternalMetadataColumn100 = output.ExternalMetadataColumnCollection.New()

        ' Set column data type properties.
        Dim isLong As Boolean = False
        Dim dt As DataType = DataRecordTypeToBufferType(CType(row("DataType"), Type))
        dt = ConvertBufferDataTypeToFitManaged(dt, isLong)
        Dim length As Integer = 0
        Dim precision As Integer = CType(row("NumericPrecision"), Short)
        Dim scale As Integer = CType(row("NumericScale"), Short)
        Dim codepage As Integer = dataTable.Locale.TextInfo.ANSICodePage

        Select Case dt

            ' The length cannot be zero, and the code page property must contain a valid code page.
            Case DataType.DT_STR
            Case DataType.DT_TEXT
                length = precision
                precision = 0
                scale = 0

            Case DataType.DT_WSTR
                length = precision
                codepage = 0
                scale = 0
                precision = 0

            Case DataType.DT_BYTES
                precision = 0
                scale = 0
                codepage = 0

            Case DataType.DT_NUMERIC
                length = 0
                codepage = 0

                If precision > 38 Then
                    precision = 38
                End If

                If scale > 6 Then
                    scale = 6
                End If

            Case DataType.DT_DECIMAL
                length = 0
                precision = 0
                codepage = 0

            Case Else
                length = 0
                precision = 0
                codepage = 0
                scale = 0
        End Select

        ' Set the properties of the output column.
        outColumn.Name = CStr(row("ColumnName"))
        outColumn.SetDataTypeProperties(dt, length, precision, scale, codepage)
    Next
End Sub

Проверка компонента

Необходимо проверить компонент источника и убедиться, что столбцы, определенные в его коллекциях выходных столбцов, совпадают со столбцами во внешнем источнике данных. Иногда сверка выходных столбцов с внешним источником данных может оказаться невозможной, например, если имеет место отсоединенное состояние или желательно обойтись без применения продолжительных операций обмена данными с сервером. В этих ситуациях все равно может быть выполнена проверка столбцов в выходе с помощью коллекции ExternalMetadataColumnCollection выходного объекта. Дополнительные сведения см. в разделе Проверка компонента потока данных.

Эта коллекция существует и во входных, и в выходных объектах, и ее можно заполнить столбцами из внешнего источника данных. Эту коллекцию можно использовать для проверки выходных столбцов, если конструктор служб SSIS находится в режиме вне сети, компонент отсоединен или свойство ValidateExternalMetadata имеет значение false. Эту коллекцию необходимо вначале заполнить одновременно с созданием выходных столбцов. Добавление столбцов внешних метаданных к коллекции происходит относительно просто, поскольку столбец внешних метаданных должен с самого начала совпадать с выходным столбцом. Свойства типа данных столбца уже должны быть заданы правильно, поэтому эти свойства можно скопировать непосредственно в объект IDTSExternalMetadataColumn100.

В следующем образце кода добавляется столбец внешних метаданных, который основан на вновь созданном выходном столбце. При этом подразумевается, что выходной столбец уже был создан.

private void CreateExternalMetaDataColumn(IDTSOutput100 output, IDTSOutputColumn100 outputColumn)
{
    
    // Set the properties of the external metadata column.
    IDTSExternalMetadataColumn100 externalColumn = output.ExternalMetadataColumnCollection.New();
    externalColumn.Name = outputColumn.Name;
    externalColumn.Precision = outputColumn.Precision;
    externalColumn.Length = outputColumn.Length;
    externalColumn.DataType = outputColumn.DataType;
    externalColumn.Scale = outputColumn.Scale;

    // Map the external column to the output column.
    outputColumn.ExternalMetadataColumnID = externalColumn.ID;

}
Private Sub CreateExternalMetaDataColumn(ByVal output As IDTSOutput100, ByVal outputColumn As IDTSOutputColumn100)


        ' Set the properties of the external metadata column.
        Dim externalColumn As IDTSExternalMetadataColumn100 = output.ExternalMetadataColumnCollection.New()
        externalColumn.Name = outputColumn.Name
        externalColumn.Precision = outputColumn.Precision
        externalColumn.Length = outputColumn.Length
        externalColumn.DataType = outputColumn.DataType
        externalColumn.Scale = outputColumn.Scale

        ' Map the external column to the output column.
        outputColumn.ExternalMetadataColumnID = externalColumn.ID

    End Sub

Время выполнения

Во время выполнения компоненты добавляют строки в выходные буферы, созданные задачей потока данных и предоставленные компоненту в методе PrimeOutput. Этот метод, вызываемый однократно для компонентов источника, получает выходной буфер для каждого объекта IDTSOutput100 компонента, который подключен к нижестоящему компоненту.

Поиск столбцов в буфере

Выходной буфер для компонента содержит столбцы, определенные компонентом, и все столбцы, добавленные к выходу нижестоящего компонента. Например, если компонент источника предоставляет три столбца на выходе, а в следующем компоненте добавляется четвертый выходной столбец, то выходной буфер, предназначенный для использования компонентом источника, содержит эти четыре столбца.

Порядок столбцов в строке буфера не определяется индексом выходного столбца в коллекции выходных столбцов. Выходной столбец можно точно найти в строке буфера только с помощью метода FindColumnByLineageID объекта BufferManager. Этот метод находит столбец с указанным идентификатором журнала обращений и преобразований в указанном буфере, после чего возвращает его расположение в строке. Индексы выходных столбцов обычно определяются в методе PreExecute и сохраняются для использования во время выполнения метода PrimeOutput.

В следующем примере кода находится расположение выходных столбцов в выходном буфере во время вызова метода PreExecute и полученные данные сохраняются во внутренней структуре. В структуре сохраняется также имя столбца, которое затем используется в примере кода, относящемся к методу PrimeOutput, в следующем подразделе этого раздела.

ArrayList columnInformation;

private struct ColumnInfo
{
    public int BufferColumnIndex;
    public string ColumnName;
}

public override void PreExecute()
{
    this.columnInformation = new ArrayList();
    IDTSOutput100 output = ComponentMetaData.OutputCollection[0];

    foreach (IDTSOutputColumn100 col in output.OutputColumnCollection)
    {
        ColumnInfo ci = new ColumnInfo();
        ci.BufferColumnIndex = BufferManager.FindColumnByLineageID(output.Buffer, col.LineageID);
        ci.ColumnName = col.Name;
        columnInformation.Add(ci);
    }
}
Public Overrides Sub PreExecute()

    Me.columnInformation = New ArrayList()
    Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)

    For Each col As IDTSOutputColumn100 In output.OutputColumnCollection

        Dim ci As ColumnInfo = New ColumnInfo()
        ci.BufferColumnIndex = BufferManager.FindColumnByLineageID(output.Buffer, col.LineageID)
        ci.ColumnName = col.Name
        columnInformation.Add(ci)
    Next
End Sub

Обработка строк

Строки добавляются к выходному буферу путем вызова метода AddRow, который создает новую строку буфера с пустыми значениями в ее столбцах. Затем компонент присваивает значения отдельным столбцам. Выходные буферы, предоставленные компоненту, создаются и наблюдаются задачей потока данных. По мере их заполнения строки буферы перемещаются в следующий компонент. Невозможно определить время отправки пакета строк в следующий компонент, поскольку перемещение строк задачей потока данных прозрачно для разработчика компонента, а свойство RowCount для выходных буферов всегда равно нулю. Компонент источника после завершения добавления строк в выходной буфер уведомляет задачу потока данных путем вызова метода SetEndOfRowset буфера PipelineBuffer, а оставшиеся в буфере строки передаются следующему компоненту.

При считывании строк компонентом источника из внешнего источника данных может потребоваться обновить счетчики производительности «Rows read» (Количество считанных строк) или «BLOB bytes read» (Количество считанных байтов больших двоичных объектов) путем вызова метода IncrementPipelinePerfCounter. Дополнительные сведения см. в разделе Наблюдение за производительностью подсистемы обработки потока данных.

В следующем примере кода показан компонент, который добавляет строки в выходной буфер в методе PrimeOutput. Индексы выходных столбцов в буфере были найдены с помощью метода PreExecute в предыдущем примере кода.

public override void PrimeOutput(int outputs, int[] outputIDs, PipelineBuffer[] buffers)
{
    IDTSOutput100 output = ComponentMetaData.OutputCollection[0];
    PipelineBuffer buffer = buffers[0];

    SqlDataReader dataReader = sqlCommand.ExecuteReader();

    // Loop over the rows in the DataReader, 
    // and add them to the output buffer.
    while (dataReader.Read())
    {
        // Add a row to the output buffer.
        buffer.AddRow();

        for (int x = 0; x < columnInformation.Count; x++)
        {
            ColumnInfo ci = (ColumnInfo)columnInformation[x];
            int ordinal = dataReader.GetOrdinal(ci.ColumnName);

            if (dataReader.IsDBNull(ordinal))
                buffer.SetNull(ci.BufferColumnIndex);
            else
            {
                buffer[ci.BufferColumnIndex] = dataReader[ci.ColumnName];
            }
        }
    }
    buffer.SetEndOfRowset();
}
Public Overrides Sub PrimeOutput(ByVal outputs As Integer, ByVal outputIDs As Integer(), ByVal buffers As PipelineBuffer())

    Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)
    Dim buffer As PipelineBuffer = buffers(0)

    Dim dataReader As SqlDataReader = sqlCommand.ExecuteReader()

    ' Loop over the rows in the DataReader, 
    ' and add them to the output buffer.
    While (dataReader.Read())

        ' Add a row to the output buffer.
        buffer.AddRow()

        For x As Integer = 0 To columnInformation.Count

            Dim ci As ColumnInfo = CType(columnInformation(x), ColumnInfo)

            Dim ordinal As Integer = dataReader.GetOrdinal(ci.ColumnName)

            If (dataReader.IsDBNull(ordinal)) Then
                buffer.SetNull(ci.BufferColumnIndex)
            Else
                buffer(ci.BufferColumnIndex) = dataReader(ci.ColumnName)

            End If
        Next

    End While

    buffer.SetEndOfRowset()
End Sub

Образец

В следующем образце показан простой компонент источника, в котором используется диспетчер соединения файлов для загрузки двоичного содержимого из файлов в поток данных. В этом образце показаны не все методы и возможности, описанные в этом разделе. Он демонстрирует важные методы, которые должны быть переопределены в каждом пользовательском компоненте источника, но не содержат код для проверки во время разработки. Более полный образец компонента источника см. в разделе Readme_ADO Source Component Sample.

using System;
using System.IO;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;

namespace BlobSrc
{
  [DtsPipelineComponent(DisplayName = "BLOB Inserter Source", Description = "Inserts files into the data flow as BLOBs")]
  public class BlobSrc : PipelineComponent
  {
    IDTSConnectionManager100 m_ConnMgr;
    int m_FileNameColumnIndex = -1;
    int m_FileBlobColumnIndex = -1;

    public override void ProvideComponentProperties()
    {
      IDTSOutput100 output = ComponentMetaData.OutputCollection.New();
      output.Name = "BLOB File Inserter Output";

      IDTSOutputColumn100 column = output.OutputColumnCollection.New();
      column.Name = "FileName";
      column.SetDataTypeProperties(DataType.DT_WSTR, 256, 0, 0, 0);

      column = output.OutputColumnCollection.New();
      column.Name = "FileBLOB";
      column.SetDataTypeProperties(DataType.DT_IMAGE, 0, 0, 0, 0);

      IDTSRuntimeConnection100 conn = ComponentMetaData.RuntimeConnectionCollection.New();
      conn.Name = "FileConnection";
    }

    public override void AcquireConnections(object transaction)
    {
      IDTSRuntimeConnection100 conn = ComponentMetaData.RuntimeConnectionCollection[0];
      m_ConnMgr = conn.ConnectionManager;
    }

    public override void ReleaseConnections()
    {
      m_ConnMgr = null;
    }

    public override void PreExecute()
    {
      IDTSOutput100 output = ComponentMetaData.OutputCollection[0];

      m_FileNameColumnIndex = (int)BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection[0].LineageID);
      m_FileBlobColumnIndex = (int)BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection[1].LineageID);
    }

    public override void PrimeOutput(int outputs, int[] outputIDs, PipelineBuffer[] buffers)
    {
      string strFileName = (string)m_ConnMgr.AcquireConnection(null);

      while (strFileName != null)
      {
        buffers[0].AddRow();

        buffers[0].SetString(m_FileNameColumnIndex, strFileName);

        FileInfo fileInfo = new FileInfo(strFileName);
        byte[] fileData = new byte[fileInfo.Length];
        FileStream fs = new FileStream(strFileName, FileMode.Open, FileAccess.Read, FileShare.Read);
        fs.Read(fileData, 0, fileData.Length);

        buffers[0].AddBlobData(m_FileBlobColumnIndex, fileData);

        strFileName = (string)m_ConnMgr.AcquireConnection(null);
      }

      buffers[0].SetEndOfRowset();
    }
  }
}
Imports System 
Imports System.IO 
Imports Microsoft.SqlServer.Dts.Pipeline 
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper 
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper 
Namespace BlobSrc 

 <DtsPipelineComponent(DisplayName="BLOB Inserter Source", Description="Inserts files into the data flow as BLOBs")> _ 
 Public Class BlobSrc 
 Inherits PipelineComponent 
   Private m_ConnMgr As IDTSConnectionManager100 
   Private m_FileNameColumnIndex As Integer = -1 
   Private m_FileBlobColumnIndex As Integer = -1 

   Public  Overrides Sub ProvideComponentProperties() 
     Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection.New 
     output.Name = "BLOB File Inserter Output" 
     Dim column As IDTSOutputColumn100 = output.OutputColumnCollection.New 
     column.Name = "FileName" 
     column.SetDataTypeProperties(DataType.DT_WSTR, 256, 0, 0, 0) 
     column = output.OutputColumnCollection.New 
     column.Name = "FileBLOB" 
     column.SetDataTypeProperties(DataType.DT_IMAGE, 0, 0, 0, 0) 
     Dim conn As IDTSRuntimeConnection90 = ComponentMetaData.RuntimeConnectionCollection.New 
     conn.Name = "FileConnection" 
   End Sub 

   Public  Overrides Sub AcquireConnections(ByVal transaction As Object) 
     Dim conn As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection(0) 
     m_ConnMgr = conn.ConnectionManager 
   End Sub 

   Public  Overrides Sub ReleaseConnections() 
     m_ConnMgr = Nothing 
   End Sub 

   Public  Overrides Sub PreExecute() 
     Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0) 
     m_FileNameColumnIndex = CType(BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection(0).LineageID), Integer) 
     m_FileBlobColumnIndex = CType(BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection(1).LineageID), Integer) 
   End Sub 

   Public  Overrides Sub PrimeOutput(ByVal outputs As Integer, ByVal outputIDs As Integer(), ByVal buffers As PipelineBuffer()) 
     Dim strFileName As String = CType(m_ConnMgr.AcquireConnection(Nothing), String) 
     While Not (strFileName Is Nothing) 
       buffers(0).AddRow 
       buffers(0).SetString(m_FileNameColumnIndex, strFileName) 
       Dim fileInfo As FileInfo = New FileInfo(strFileName) 
       Dim fileData(fileInfo.Length) As Byte 
       Dim fs As FileStream = New FileStream(strFileName, FileMode.Open, FileAccess.Read, FileShare.Read) 
       fs.Read(fileData, 0, fileData.Length) 
       buffers(0).AddBlobData(m_FileBlobColumnIndex, fileData) 
       strFileName = CType(m_ConnMgr.AcquireConnection(Nothing), String) 
     End While 
     buffers(0).SetEndOfRowset 
   End Sub 
 End Class 
End Namespace
Значок служб Integration Services (маленький) Будьте в курсе новых возможностей служб Integration Services

Чтобы загружать новейшую документацию, статьи, образцы и видеоматериалы от корпорации Майкрософт, а также лучшие решения от участников сообщества, посетите страницу служб Integration Services на сайтах MSDN или TechNet:

Чтобы получать автоматические уведомления об этих обновлениях, подпишитесь на RSS-каналы, предлагаемые на этой странице.