使用資料流程元件中的錯誤輸出
適用於:Azure Data Factory 中的 SQL Server SSIS Integration Runtime
您可以將呼叫錯誤輸出的特殊 IDTSOutput100 物件加入元件,讓元件將無法在執行期間處理的資料列重新導向。 元件可能遇到的問題通常會歸類為錯誤或是截斷,而且是每個元件特有的。 提供錯誤輸出的元件透過從結果集篩選出錯誤資料列、當問題發生時讓元件失敗,以及忽略錯誤並繼續,讓元件的使用者有處理錯誤狀況的彈性。
若要在元件中實作和支援錯誤輸出,您必須先將元件的 UsesDispositions 屬性設定為 true。 接著您必須將輸出新增至其 IsErrorOut 屬性設定為 true 的元件。 最後,元件必須包含錯誤或截斷發生時會將資料列導向錯誤輸出的程式碼。 本主題涵蓋這三個步驟並說明同步與非同步錯誤輸出之間的差異。
建立錯誤輸出
您可以呼叫 OutputCollection 的 New 方法,然後將新輸出的 IsErrorOut 屬性設定為 true,以建立錯誤輸出。 如果輸出是非同步,則不需要對輸出再執行任何動作。 如果輸出是同步的,而且對同一個輸入而言有另一個同步的輸出,則也必須設定 ExclusionGroup 與 SynchronousInputID 屬性。 對同一個輸入而言屬同步的兩個輸出間,這兩個屬性值必須相同。 如果這些屬性不是設定成非零的值,輸入所提供的資料列會傳送至對輸入同步的兩個輸出。
當元件在執行期間遇到錯誤或是截斷,它會根據發生錯誤之輸入或輸出,或是輸入或輸出資料行的 ErrorRowDisposition 與 TruncationRowDisposition 屬性的設定繼續。 這些屬性值預設應該設定為 RD_NotUsed。 當元件的錯誤輸出連接到下游元件時,會由元件的使用者設定這個屬性,並讓使用者控制元件處理錯誤或截斷的方式。
擴展錯誤資料行
在建立錯誤輸出時,資料流程工作會自動將兩個資料行加入輸出資料行集合。 元件使用這些資料行來指定造成錯誤或是截斷的資料行識別碼,並提供元件特定的錯誤碼。 這些資料行是自動產生,但是資料行中所含的值必須由元件設定。
用以設定這些資料行值的方法,須視錯誤輸出是同步或非同步而定。 有同步輸出的元件會呼叫 DirectErrorRow 方法 (將在下一節做更詳細的討論),並以參數方式提供錯誤碼與錯誤資料行值。 具有非同步輸出的元件有兩個設定這些資料行值的選擇。 它們可以呼叫輸出緩衝區的 SetErrorInfo 方法並提供值,或是透過使用 FindColumnByLineageID 來找出緩衝區中的錯誤資料行,並直接為資料行設定值。 不過,因為資料行名稱可能已經變更,或是其在輸出資料行集合中的位置可能已經修改,第二個方法可能不可靠。 SetErrorInfo 方法會自動在這些錯誤資料行中設定值,而不必手動尋找它們。
如果您需要取得對應至特定錯誤碼的錯誤描述,可以使用 GetErrorDescription 介面 (透過元件的IDTSComponentMetaData100 屬性取得) 的 ComponentMetaData 方法。
下列程式碼範例顯示具有一個輸入與兩個輸出的元件,包括錯誤輸出。 第一個範例顯示如何建立與輸入同步的錯誤輸出。 第二個範例顯示如何建立非同步的錯誤輸出。
public override void ProvideComponentProperties()
{
// Specify that the component has an error output.
ComponentMetaData.UsesDispositions = true;
// Create the input.
IDTSInput100 input = ComponentMetaData.InputCollection.New();
input.Name = "Input";
input.ErrorRowDisposition = DTSRowDisposition.RD_NotUsed;
input.ErrorOrTruncationOperation = "A string describing the possible error or truncation that may occur during execution.";
// Create the default output.
IDTSOutput100 output = ComponentMetaData.OutputCollection.New();
output.Name = "Output";
output.SynchronousInputID = input.ID;
output.ExclusionGroup = 1;
// Create the error output.
IDTSOutput100 errorOutput = ComponentMetaData.OutputCollection.New();
errorOutput.IsErrorOut = true;
errorOutput.Name = "ErrorOutput";
errorOutput.SynchronousInputID = input.ID;
errorOutput.ExclusionGroup = 1;
}
Public Overrides Sub ProvideComponentProperties()
' Specify that the component has an error output.
ComponentMetaData.UsesDispositions = True
Dim input As IDTSInput100 = ComponentMetaData.InputCollection.New
' Create the input.
input.Name = "Input"
input.ErrorRowDisposition = DTSRowDisposition.RD_NotUsed
input.ErrorOrTruncationOperation = "A string describing the possible error or truncation that may occur during execution."
' Create the default output.
Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection.New
output.Name = "Output"
output.SynchronousInputID = input.ID
output.ExclusionGroup = 1
' Create the error output.
Dim errorOutput As IDTSOutput100 = ComponentMetaData.OutputCollection.New
errorOutput.IsErrorOut = True
errorOutput.Name = "ErrorOutput"
errorOutput.SynchronousInputID = input.ID
errorOutput.ExclusionGroup = 1
End Sub
下列程式碼範例會建立非同步的錯誤輸出。
public override void ProvideComponentProperties()
{
// Specify that the component has an error output.
ComponentMetaData.UsesDispositions = true;
// Create the input.
IDTSInput100 input = ComponentMetaData.InputCollection.New();
input.Name = "Input";
input.ErrorRowDisposition = DTSRowDisposition.RD_NotUsed;
input.ErrorOrTruncationOperation = "A string describing the possible error or truncation that may occur during execution.";
// Create the default output.
IDTSOutput100 output = ComponentMetaData.OutputCollection.New();
output.Name = "Output";
// Create the error output.
IDTSOutput100 errorOutput = ComponentMetaData.OutputCollection.New();
errorOutput.Name = "ErrorOutput";
errorOutput.IsErrorOut = true;
}
Public Overrides Sub ProvideComponentProperties()
' Specify that the component has an error output.
ComponentMetaData.UsesDispositions = True
' Create the input.
Dim input As IDTSInput100 = ComponentMetaData.InputCollection.New
' Create the default output.
input.Name = "Input"
input.ErrorRowDisposition = DTSRowDisposition.RD_NotUsed
input.ErrorOrTruncationOperation = "A string describing the possible error or truncation that may occur during execution."
' Create the error output.
Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection.New
output.Name = "Output"
Dim errorOutput As IDTSOutput100 = ComponentMetaData.OutputCollection.New
errorOutput.Name = "ErrorOutput"
errorOutput.IsErrorOut = True
End Sub
將資料列重新導向至錯誤輸出
在將錯誤輸出加入元件之後,您必須提供程式碼以處理元件特定的錯誤或是截斷狀況,並將錯誤或是截斷資料列重新導向至錯誤輸出。 您可以用這兩種方式來執行這項動作,端視錯誤輸出是同步或非同步而定。
重新導向具有同步輸出的資料列
透過呼叫 DirectErrorRow 類別的 PipelineBuffer 方法,將資料列傳送到同步輸出。 這個方法呼叫包括以參數傳遞錯誤輸出的識別碼、元件定義的錯誤碼以及元件無法處理之資料行的索引。
下列程式碼範例示範如何使用 DirectErrorRow 方法,將緩衝區中的資料列導向至同步錯誤輸出。
public override void ProcessInput(int inputID, PipelineBuffer buffer)
{
IDTSInput100 input = ComponentMetaData.InputCollection.GetObjectByID(inputID);
// This code sample assumes the component has two outputs, one the default,
// the other the error output. If the errorOutputIndex returned from GetErrorOutputInfo
// is 0, then the default output is the second output in the collection.
int defaultOutputID = -1;
int errorOutputID = -1;
int errorOutputIndex = -1;
GetErrorOutputInfo(ref errorOutputID,ref errorOutputIndex);
if (errorOutputIndex == 0)
defaultOutputID = ComponentMetaData.OutputCollection[1].ID;
else
defaultOutputID = ComponentMetaData.OutputCollection[0].ID;
while (buffer.NextRow())
{
try
{
// TODO: Implement code to process the columns in the buffer row.
// Ideally, your code should detect potential exceptions before they occur, rather
// than having a generic try/catch block such as this.
// However, because the error or truncation implementation is specific to each component,
// this sample focuses on actually directing the row, and not a single error or truncation.
// Unless an exception occurs, direct the row to the default
buffer.DirectRow(defaultOutputID);
}
catch
{
// Yes, has the user specified to redirect the row?
if (input.ErrorRowDisposition == DTSRowDisposition.RD_RedirectRow)
{
// Yes, direct the row to the error output.
// TODO: Add code to include the errorColumnIndex.
buffer.DirectErrorRow(errorOutputID, 0, errorColumnIndex);
}
else if (input.ErrorRowDisposition == DTSRowDisposition.RD_FailComponent || input.ErrorRowDisposition == DTSRowDisposition.RD_NotUsed)
{
// No, the user specified to fail the component, or the error row disposition was not set.
throw new Exception("An error occurred, and the DTSRowDisposition is either not set, or is set to fail component.");
}
else
{
// No, the user specified to ignore the failure so
// direct the row to the default output.
buffer.DirectRow(defaultOutputID);
}
}
}
}
Public Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer)
Dim input As IDTSInput100 = ComponentMetaData.InputCollection.GetObjectByID(inputID)
' This code sample assumes the component has two outputs, one the default,
' the other the error output. If the errorOutputIndex returned from GetErrorOutputInfo
' is 0, then the default output is the second output in the collection.
Dim defaultOutputID As Integer = -1
Dim errorOutputID As Integer = -1
Dim errorOutputIndex As Integer = -1
GetErrorOutputInfo(errorOutputID, errorOutputIndex)
If errorOutputIndex = 0 Then
defaultOutputID = ComponentMetaData.OutputCollection(1).ID
Else
defaultOutputID = ComponentMetaData.OutputCollection(0).ID
End If
While buffer.NextRow
Try
' TODO: Implement code to process the columns in the buffer row.
' Ideally, your code should detect potential exceptions before they occur, rather
' than having a generic try/catch block such as this.
' However, because the error or truncation implementation is specific to each component,
' this sample focuses on actually directing the row, and not a single error or truncation.
' Unless an exception occurs, direct the row to the default
buffer.DirectRow(defaultOutputID)
Catch
' Yes, has the user specified to redirect the row?
If input.ErrorRowDisposition = DTSRowDisposition.RD_RedirectRow Then
' Yes, direct the row to the error output.
' TODO: Add code to include the errorColumnIndex.
buffer.DirectErrorRow(errorOutputID, 0, errorColumnIndex)
Else
If input.ErrorRowDisposition = DTSRowDisposition.RD_FailComponent OrElse input.ErrorRowDisposition = DTSRowDisposition.RD_NotUsed Then
' No, the user specified to fail the component, or the error row disposition was not set.
Throw New Exception("An error occurred, and the DTSRowDisposition is either not set, or is set to fail component.")
Else
' No, the user specified to ignore the failure so
' direct the row to the default output.
buffer.DirectRow(defaultOutputID)
End If
End If
End Try
End While
End Sub
重新導向具有非同步輸出的資料列
具有同步錯誤輸出的元件是將資料列導向輸出,而具有非同步輸出的元件則是明確地將資料列加入輸出 PipelineBuffer,以便將資料列傳送到錯誤輸出。 實作使用非同步錯誤輸出的元件,需要將資料行加入提供給下游元件的錯誤輸出,並且為 PrimeOutput 方法期間提供給元件的錯誤輸出快取輸出緩衝區。 實作具有非同步輸出之元件的詳細資料,將於開發具有非同步輸出的自訂轉換元件主題中詳細說明。 如果資料行未明確地加入錯誤輸出,則加入輸出緩衝區的緩衝區資料列只會包含兩個錯誤資料行。
若要將資料列傳送到非同步錯誤輸出,則必須將資料列加入錯誤輸出緩衝區。 有時,資料列可能已經加入非錯誤輸出緩衝區,而您必須使用 RemoveRow 方法來移除此資料列。 接下來您要設定輸出緩衝區資料行值,最後則是呼叫 SetErrorInfo 方法,以提供元件特定的錯誤碼與錯誤資料行值。
下列範例示範如何使用具有非同步輸出之元件的錯誤輸出。 當模擬的錯誤發生時,元件會將資料列加入錯誤輸出緩衝區、將之前加入非錯誤輸出緩衝區的值複製到錯誤輸出緩衝區、移除加入非錯誤輸出緩衝區的資料列,最後再透過呼叫 SetErrorInfo 方法,設定錯誤碼與錯誤資料行值。
int []columnIndex;
int errorOutputID = -1;
int errorOutputIndex = -1;
public override void PreExecute()
{
IDTSOutput100 defaultOutput = null;
this.GetErrorOutputInfo(ref errorOutputID, ref errorOutputIndex);
foreach (IDTSOutput100 output in ComponentMetaData.OutputCollection)
{
if (output.ID != errorOutputID)
defaultOutput = output;
}
columnIndex = new int[defaultOutput.OutputColumnCollection.Count];
for(int col =0 ; col < defaultOutput.OutputColumnCollection.Count; col++)
{
IDTSOutputColumn100 column = defaultOutput.OutputColumnCollection[col];
columnIndex[col] = BufferManager.FindColumnByLineageID(defaultOutput.Buffer, column.LineageID);
}
}
public override void PrimeOutput(int outputs, int[] outputIDs, PipelineBuffer[] buffers)
{
for( int x=0; x < outputs; x++ )
{
if (outputIDs[x] == errorOutputID)
this.errorBuffer = buffers[x];
else
this.defaultBuffer = buffers[x];
}
int rows = 100;
Random random = new Random(System.DateTime.Now.Millisecond);
for (int row = 0; row < rows; row++)
{
try
{
defaultBuffer.AddRow();
for (int x = 0; x < columnIndex.Length; x++)
defaultBuffer[columnIndex[x]] = random.Next();
// Simulate an error.
if ((row % 2) == 0)
throw new Exception("A simulated error.");
}
catch
{
// Add a row to the error buffer.
errorBuffer.AddRow();
// Get the values from the default buffer
// and copy them to the error buffer.
for (int x = 0; x < columnIndex.Length; x++)
errorBuffer[columnIndex[x]] = defaultBuffer[columnIndex[x]];
// Set the error information.
errorBuffer.SetErrorInfo(errorOutputID, 1, 0);
// Remove the row that was added to the default buffer.
defaultBuffer.RemoveRow();
}
}
if (defaultBuffer != null)
defaultBuffer.SetEndOfRowset();
if (errorBuffer != null)
errorBuffer.SetEndOfRowset();
}
Private columnIndex As Integer()
Private errorOutputID As Integer = -1
Private errorOutputIndex As Integer = -1
Public Overrides Sub PreExecute()
Dim defaultOutput As IDTSOutput100 = Nothing
Me.GetErrorOutputInfo(errorOutputID, errorOutputIndex)
For Each output As IDTSOutput100 In ComponentMetaData.OutputCollection
If Not (output.ID = errorOutputID) Then
defaultOutput = output
End If
Next
columnIndex = New Integer(defaultOutput.OutputColumnCollection.Count) {}
Dim col As Integer = 0
While col < defaultOutput.OutputColumnCollection.Count
Dim column As IDTSOutputColumn100 = defaultOutput.OutputColumnCollection(col)
columnIndex(col) = BufferManager.FindColumnByLineageID(defaultOutput.Buffer, column.LineageID)
System.Math.Min(System.Threading.Interlocked.Increment(col),col-1)
End While
End Sub
Public Overrides Sub PrimeOutput(ByVal outputs As Integer, ByVal outputIDs As Integer(), ByVal buffers As PipelineBuffer())
Dim x As Integer = 0
While x < outputs
If outputIDs(x) = errorOutputID Then
Me.errorBuffer = buffers(x)
Else
Me.defaultBuffer = buffers(x)
End If
System.Math.Min(System.Threading.Interlocked.Increment(x),x-1)
End While
Dim rows As Integer = 100
Dim random As Random = New Random(System.DateTime.Now.Millisecond)
Dim row As Integer = 0
While row < rows
Try
defaultBuffer.AddRow
Dim x As Integer = 0
While x < columnIndex.Length
defaultBuffer(columnIndex(x)) = random.Next
System.Math.Min(System.Threading.Interlocked.Increment(x),x-1)
End While
' Simulate an error.
If (row Mod 2) = 0 Then
Throw New Exception("A simulated error.")
End If
Catch
' Add a row to the error buffer.
errorBuffer.AddRow
' Get the values from the default buffer
' and copy them to the error buffer.
Dim x As Integer = 0
While x < columnIndex.Length
errorBuffer(columnIndex(x)) = defaultBuffer(columnIndex(x))
System.Math.Min(System.Threading.Interlocked.Increment(x),x-1)
End While
' Set the error information.
errorBuffer.SetErrorInfo(errorOutputID, 1, 0)
' Remove the row that was added to the default buffer.
defaultBuffer.RemoveRow
End Try
System.Math.Min(System.Threading.Interlocked.Increment(row),row-1)
End While
If Not (defaultBuffer Is Nothing) Then
defaultBuffer.SetEndOfRowset
End If
If Not (errorBuffer Is Nothing) Then
errorBuffer.SetEndOfRowset
End If
End Sub