다음을 통해 공유


비동기 출력을 사용하여 사용자 지정 변환 구성 요소 개발

적용 대상: Azure Data Factory의 SQL Server SSIS Integration Runtime

변환이 구성 요소에서 입력 행을 모두 받을 때까지 행을 출력할 수 없거나 변환이 입력으로 받은 각 행에 대해 출력 행을 정확히 하나만 생성하지 않는 경우에는 비동기 출력을 사용하는 구성 요소를 사용합니다. 예를 들어 집계 변환은 모든 행을 읽을 때까지 행의 합계를 계산할 수 없습니다. 반면, 각 데이터 행을 통과하면서 수정할 때 언제든지 동기 출력이 포함된 구성 요소를 사용할 수 있습니다. 각 행에 대한 데이터를 수정하거나 하나 이상의 새 열을 만들 수 있습니다. 각 열에는 각 입력 행에 대한 값이 있습니다. 동기 구성 요소와 비동기 구성 요소 간의 차이점에 대한 자세한 내용은 동기 및 비동기 변환 이해를 참조하세요.

비동기 출력이 있는 변환 구성 요소는 대상 및 원본 구성 요소 모두의 역할을 하므로 고유합니다. 이러한 종류의 구성 요소는 업스트림 구성 요소에서 행을 수신하고 다운스트림 구성 요소에서 사용되는 행을 추가합니다. 다른 데이터 흐름 구성 요소는 이러한 작업을 모두 수행하지 않습니다.

동기 출력이 있는 구성 요소에서 사용할 수 있는 업스트림 구성 요소의 열은 구성 요소의 다운스트림 구성 요소에서 자동으로 사용할 수 있습니다. 따라서 동기 출력이 있는 구성 요소는 다음 구성 요소에 열과 행을 제공하기 위해 출력 열을 정의할 필요가 없습니다. 반면 비동기 출력을 사용하는 구성 요소에서는 출력 열을 제공하고 다운스트림 구성 요소에 행을 제공해야 합니다. 따라서 비동기 출력이 있는 구성 요소에는 디자인 및 실행 시간 동안 수행할 작업이 더 많으며 구성 요소 개발자는 구현할 코드가 더 많습니다.

SQL Server Integration Services에는 비동기 출력을 사용하는 여러 변환이 포함되어 있습니다. 예를 들어 정렬 변환의 경우에는 행을 정렬하기 전에 모든 행이 있어야 하며 행을 정렬하는 데는 비동기 출력이 사용됩니다. 모든 행을 받은 후 정렬하고 출력에 추가합니다.

이 섹션에서는 비동기 출력을 사용하여 변환을 개발하는 방법을 자세히 설명합니다. 원본 구성 요소 개발에 대한 자세한 내용은 사용자 지정 원본 구성 요소 개발을 참조하세요.

디자인 타임

구성 요소 만들기

개체의 IDTSOutput100 속성은 SynchronousInputID 출력이 동기인지 비동기인지를 식별합니다. 비동기 출력을 만들려면 구성 요소에 출력을 추가하고 0으로 설정합니다 SynchronousInputID . 또한 이 속성을 설정하면 데이터 흐름 태스크가 구성 요소의 입력 및 출력 모두에 개체를 할당 PipelineBuffer 할지, 아니면 두 개체 간에 단일 버퍼가 할당되고 공유되는지 여부도 결정됩니다.

다음 샘플 코드는 구현에서 비동기 출력 ProvideComponentProperties 을 만드는 구성 요소를 보여줍니다.

using Microsoft.SqlServer.Dts.Pipeline;  
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;  
using Microsoft.SqlServer.Dts.Runtime;  
  
namespace Microsoft.Samples.SqlServer.Dts  
{  
    [DtsPipelineComponent(DisplayName = "AsyncComponent",ComponentType = ComponentType.Transform)]  
    public class AsyncComponent : PipelineComponent  
    {  
        public override void ProvideComponentProperties()  
        {  
            // Call the base class, which adds a synchronous input  
            // and output.  
            base.ProvideComponentProperties();  
  
            // Make the output asynchronous.  
            IDTSOutput100 output = ComponentMetaData.OutputCollection[0];  
            output.SynchronousInputID = 0;  
        }  
    }  
}  
Imports Microsoft.SqlServer.Dts.Pipeline  
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper  
Imports Microsoft.SqlServer.Dts.Runtime  
  
<DtsPipelineComponent(DisplayName:="AsyncComponent", ComponentType:=ComponentType.Transform)> _  
Public Class AsyncComponent  
    Inherits PipelineComponent  
  
    Public Overrides Sub ProvideComponentProperties()  
  
        ' Call the base class, which adds a synchronous input  
        ' and output.  
        Me.ProvideComponentProperties()  
  
        ' Make the output asynchronous.  
        Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)  
        output.SynchronousInputID = 0  
  
    End Sub  
  
End Class  

출력 열 만들기 및 구성

앞에서 설명한 것처럼 비동기 구성 요소는 출력 열 컬렉션에 열을 추가하여 다운스트림 구성 요소에 열을 제공합니다. 구성 요소의 요구 사항에 따라 선택할 수 있는 몇 가지 디자인 타임 메서드가 있습니다. 예를 들어 업스트림 구성 요소의 모든 열을 다운스트림 구성 요소로 전달하려는 경우 입력 열을 구성 요소에서 사용할 수 있는 첫 번째 메서드이므로 열을 추가하는 메서드를 재정 OnInputPathAttached 의합니다.

구성 요소가 입력에 대해 선택한 열을 기반으로 출력 열을 만드는 경우 메서드를 재정의 SetUsageType 하여 출력 열을 선택하고 사용 방법을 나타냅니다.

비동기 출력이 있는 구성 요소가 업스트림 구성 요소의 열을 기반으로 출력 열을 만들고 사용 가능한 업스트림 열이 변경되면 구성 요소는 해당 출력 열 컬렉션을 업데이트해야 합니다. 이러한 변경 내용은 구성 요소에서 검색하고 그 동안 ValidateReinitializeMetaData수정해야 합니다.

참고 항목

출력 열이 출력 열 컬렉션에서 제거되면 데이터 흐름에서 해당 열을 참조하는 다운스트림 구성 요소가 부정적인 영향을 받습니다. 다운스트림 구성 요소를 중단하지 않도록 열을 제거하고 다시 생성하지 않고 출력 열을 복구해야 합니다. 예를 들어 열의 데이터 형식이 변경된 경우에는 데이터 형식을 업데이트해야 합니다.

다음 코드 예에서는 업스트림 구성 요소의 사용 가능한 각 열에 대해 해당 출력 열 컬렉션에 출력 열을 추가하는 구성 요소를 보여 줍니다.

public override void OnInputPathAttached(int inputID)  
{  
   IDTSInput100 input = ComponentMetaData.InputCollection.GetObjectByID(inputID);  
   IDTSOutput100 output = ComponentMetaData.OutputCollection[0];  
   IDTSVirtualInput100 vInput = input.GetVirtualInput();  
  
   foreach (IDTSVirtualInputColumn100 vCol in vInput.VirtualInputColumnCollection)  
   {  
      IDTSOutputColumn100 outCol = output.OutputColumnCollection.New();  
      outCol.Name = vCol.Name;  
      outCol.SetDataTypeProperties(vCol.DataType, vCol.Length, vCol.Precision, vCol.Scale, vCol.CodePage);  
   }  
}  
Public Overrides Sub OnInputPathAttached(ByVal inputID As Integer)  
  
    Dim input As IDTSInput100 = ComponentMetaData.InputCollection.GetObjectByID(inputID)  
    Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)  
    Dim vInput As IDTSVirtualInput100 = input.GetVirtualInput()  
  
    For Each vCol As IDTSVirtualInputColumn100 In vInput.VirtualInputColumnCollection  
  
        Dim outCol As IDTSOutputColumn100 = output.OutputColumnCollection.New()  
        outCol.Name = vCol.Name  
        outCol.SetDataTypeProperties(vCol.DataType, vCol.Length, vCol.Precision, vCol.Scale, vCol.CodePage)  
  
    Next  
End Sub  

실행 시간

또한 비동기 출력을 사용하는 구성 요소에서는 런타임에 다른 유형의 구성 요소와는 다른 일련의 메서드를 실행합니다. 먼저 이러한 구성 요소는 PrimeOutputProcessInput 메서드에 대한 호출을 모두 받는 유일한 구성 요소입니다. 또한 비동기 출력이 있는 구성 요소는 처리를 시작하기 전에 들어오는 모든 행에 대한 액세스 권한이 필요합니다. 따라서 모든 행을 읽을 때까지 내부적으로 입력 행을 캐시해야 합니다. 마지막으로, 다른 구성 요소와 달리 비동기 출력이 있는 구성 요소는 입력 버퍼와 출력 버퍼를 모두 받습니다.

버퍼 이해

입력 버퍼는 .ProcessInput 이 버퍼는 업스트림 구성 요소에서 버퍼에 추가한 행을 포함합니다. 또한 업스트림 구성 요소의 출력에서 제공되었지만 비동기 구성 요소의 입력 컬렉션에 추가되지는 않은 열과 구성 요소의 입력 열도 포함합니다.

구성 요소에 PrimeOutput제공되는 출력 버퍼는 처음에 행을 포함하지 않습니다. 구성 요소는 이 버퍼에 행을 추가하고, 버퍼가 가득 차면 다운스트림 구성 요소에 버퍼를 제공합니다. 출력 버퍼에는 다른 다운스트림 구성 요소가 해당 출력에 추가한 열 외에도 구성 요소의 출력 열 컬렉션에 정의된 열이 포함됩니다.

이는 단일 공유 버퍼를 수신하는 동기 출력이 있는 구성 요소의 동작과 다릅니다. 동기 출력이 있는 구성 요소의 공유 버퍼에는 업스트림 및 다운스트림 구성 요소의 출력에 추가된 열 외에도 구성 요소의 입력 및 출력 열이 모두 포함됩니다.

행 처리

입력 행 캐싱

비동기 출력을 사용하여 구성 요소를 작성하는 경우 출력 버퍼에 행을 추가하는 세 가지 옵션이 있습니다. 입력 행을 수신할 때 추가하거나, 구성 요소가 업스트림 구성 요소에서 모든 행을 받을 때까지 캐시하거나, 구성 요소에 적합한 경우 추가할 수 있습니다. 선택하는 방법은 구성 요소의 요구 사항에 따라 달라집니다. 예를 들어 정렬 구성 요소의 경우에는 업스트림 행을 모두 받은 후에야 행을 정렬할 수 있습니다. 따라서 출력 버퍼에 행을 추가하기 전에 모든 행을 읽을 때까지 기다립니다.

입력 버퍼에서 수신되는 행은 처리 준비가 될 때까지 구성 요소에서 내부적으로 캐시해야 합니다. 들어오는 버퍼 행은 데이터 테이블, 다차원 배열 또는 기타 내부 구조에 캐시될 수 있습니다.

출력 행 추가

행이 수신될 때 또는 모든 행을 받은 후 출력 버퍼에 행을 추가하든 관계없이 출력 버퍼에서 메서드를 AddRow 호출하여 행을 추가합니다. 행을 추가한 후 새 행에 있는 각 열의 값을 설정합니다.

경우에 따라 출력 버퍼에 구성 요소의 출력 열 컬렉션보다 더 많은 열이 있으므로 해당 값을 설정하려면 먼저 버퍼에서 적절한 열의 인덱스를 찾아야 합니다. FindColumnByLineageID 속성의 BufferManager 메서드는 버퍼 행에서 지정된 계보 ID를 갖는 열 인덱스를 반환하며, 이 인덱스는 버퍼 열에 값을 할당하는 데 사용됩니다.

PreExecute 메서드 또는 ProcessInput 메서드 앞에 PrimeOutput 호출되는 메서드는 속성을 사용할 수 있는 BufferManager 첫 번째 메서드이며 입력 및 출력 버퍼에서 열의 인덱스를 찾을 수 있는 첫 번째 기회입니다.

예제

다음 예제에서는 비동기 출력을 사용하며 행을 받을 때 출력 버퍼에 행을 추가하는 간단한 변환 구성 요소를 보여 줍니다. 이 샘플에서는 이 항목에서 설명하는 모든 메서드와 기능을 보여 주지 않습니다. 여기에서는 비동기 출력을 사용하는 모든 사용자 지정 변환 구성 요소에서 재정의해야 하는 중요한 메서드를 보여 주지만 디자인 타임 유효성 검사를 위한 코드는 포함하지 않습니다. 또한 코드에서는 ProcessInput 출력 열 컬렉션에 입력 열 컬렉션의 각 열에 대해 하나의 열이 있다고 가정합니다.

using System;  
using Microsoft.SqlServer.Dts.Pipeline;  
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;  
using Microsoft.SqlServer.Dts.Runtime.Wrapper;  
  
namespace Microsoft.Samples.SqlServer.Dts  
{  
   [DtsPipelineComponent(DisplayName = "AsynchronousOutput")]  
   public class AsynchronousOutput : PipelineComponent  
   {  
      PipelineBuffer outputBuffer;  
      int[] inputColumnBufferIndexes;  
      int[] outputColumnBufferIndexes;  
  
      public override void ProvideComponentProperties()  
      {  
         // Let the base class add the input and output objects.  
         base.ProvideComponentProperties();  
  
         // Name the input and output, and make the  
         // output asynchronous.  
         ComponentMetaData.InputCollection[0].Name = "Input";  
         ComponentMetaData.OutputCollection[0].Name = "AsyncOutput";  
         ComponentMetaData.OutputCollection[0].SynchronousInputID = 0;  
      }  
      public override void PreExecute()  
      {  
         IDTSInput100 input = ComponentMetaData.InputCollection[0];  
         IDTSOutput100 output = ComponentMetaData.OutputCollection[0];  
  
         inputColumnBufferIndexes = new int[input.InputColumnCollection.Count];  
         outputColumnBufferIndexes = new int[output.OutputColumnCollection.Count];  
  
         for (int col = 0; col < input.InputColumnCollection.Count; col++)  
            inputColumnBufferIndexes[col] = BufferManager.FindColumnByLineageID(input.Buffer, input.InputColumnCollection[col].LineageID);  
  
         for (int col = 0; col < output.OutputColumnCollection.Count; col++)  
            outputColumnBufferIndexes[col] = BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection[col].LineageID);  
  
      }  
  
      public override void PrimeOutput(int outputs, int[] outputIDs, PipelineBuffer[] buffers)  
      {  
         if (buffers.Length != 0)  
            outputBuffer = buffers[0];  
      }  
      public override void ProcessInput(int inputID, PipelineBuffer buffer)  
      {  
            // Advance the buffer to the next row.  
            while (buffer.NextRow())  
            {  
               // Add a row to the output buffer.  
               outputBuffer.AddRow();  
               for (int x = 0; x < inputColumnBufferIndexes.Length; x++)  
               {  
                  // Copy the data from the input buffer column to the output buffer column.  
                  outputBuffer[outputColumnBufferIndexes[x]] = buffer[inputColumnBufferIndexes[x]];  
               }  
            }  
         if (buffer.EndOfRowset)  
         {  
            // EndOfRowset on the input buffer is true.  
            // Set EndOfRowset on the output buffer.  
            outputBuffer.SetEndOfRowset();  
         }  
      }  
   }  
}  
Imports System  
Imports Microsoft.SqlServer.Dts.Pipeline  
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper  
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper  
  
Namespace Microsoft.Samples.SqlServer.Dts  
  
    <DtsPipelineComponent(DisplayName:="AsynchronousOutput")> _  
    Public Class AsynchronousOutput  
  
        Inherits PipelineComponent  
  
        Private outputBuffer As PipelineBuffer  
        Private inputColumnBufferIndexes As Integer()  
        Private outputColumnBufferIndexes As Integer()  
  
        Public Overrides Sub ProvideComponentProperties()  
  
            ' Let the base class add the input and output objects.  
            Me.ProvideComponentProperties()  
  
            ' Name the input and output, and make the  
            ' output asynchronous.  
            ComponentMetaData.InputCollection(0).Name = "Input"  
            ComponentMetaData.OutputCollection(0).Name = "AsyncOutput"  
            ComponentMetaData.OutputCollection(0).SynchronousInputID = 0  
        End Sub  
  
        Public Overrides Sub PreExecute()  
  
            Dim input As IDTSInput100 = ComponentMetaData.InputCollection(0)  
            Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)  
  
            ReDim inputColumnBufferIndexes(input.InputColumnCollection.Count)  
            ReDim outputColumnBufferIndexes(output.OutputColumnCollection.Count)  
  
            For col As Integer = 0 To input.InputColumnCollection.Count  
                inputColumnBufferIndexes(col) = BufferManager.FindColumnByLineageID(input.Buffer, input.InputColumnCollection(col).LineageID)  
            Next  
  
            For col As Integer = 0 To output.OutputColumnCollection.Count  
                outputColumnBufferIndexes(col) = BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection(col).LineageID)  
            Next  
  
        End Sub  
        Public Overrides Sub PrimeOutput(ByVal outputs As Integer, ByVal outputIDs As Integer(), ByVal buffers As PipelineBuffer())  
  
            If buffers.Length <> 0 Then  
                outputBuffer = buffers(0)  
            End If  
  
        End Sub  
  
        Public Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer)  
  
                ' Advance the buffer to the next row.  
                While (buffer.NextRow())  
  
                    ' Add a row to the output buffer.  
                    outputBuffer.AddRow()  
                    For x As Integer = 0 To inputColumnBufferIndexes.Length  
  
                        ' Copy the data from the input buffer column to the output buffer column.  
                        outputBuffer(outputColumnBufferIndexes(x)) = buffer(inputColumnBufferIndexes(x))  
  
                    Next  
                End While  
  
            If buffer.EndOfRowset = True Then  
                ' EndOfRowset on the input buffer is true.  
                ' Set the end of row set on the output buffer.  
                outputBuffer.SetEndOfRowset()  
            End If  
        End Sub  
    End Class  
End Namespace  

참고 항목

동기 출력을 사용하여 사용자 지정 변환 구성 요소 개발
동기 및 비동기 변환 이해
스크립트 구성 요소를 사용하여 비동기 변환 만들기