Delen via


Foutuitvoer gebruiken in een gegevensstroomonderdeel

van toepassing op:SQL Server SSIS Integration Runtime in Azure Data Factory

Speciale IDTSOutput100 objecten die foutuitvoer worden genoemd, kunnen worden toegevoegd aan onderdelen, zodat het onderdeel rijen kan omleiden die tijdens de uitvoering niet kunnen worden verwerkt. De problemen die een onderdeel kan tegenkomen, worden over het algemeen gecategoriseerd als fouten of afkappingen en zijn specifiek voor elk onderdeel. Onderdelen die foutuitvoer bieden, bieden gebruikers van het onderdeel de flexibiliteit om foutvoorwaarden te verwerken door foutrijen uit de resultatenset te filteren, door het onderdeel te mislukken wanneer er een probleem optreedt of door fouten te negeren en door door te gaan.

Als u foutuitvoer in een onderdeel wilt implementeren en ondersteunen, moet u eerst de UsesDispositions eigenschap van het onderdeel instellen op waar. Vervolgens moet u een uitvoer toevoegen aan het onderdeel waarvoor de eigenschap is IsErrorOut ingesteld op true. Ten slotte moet het onderdeel code bevatten waarmee rijen worden omgeleid naar de foutuitvoer wanneer fouten of afkappingen optreden. In dit onderwerp worden deze drie stappen behandeld en worden de verschillen tussen synchrone en asynchrone foutuitvoer uitgelegd.

Een foutuitvoer maken

U maakt een foutuitvoer door de methode van de NewOutputCollectionaan te roepen en vervolgens de IsErrorOut eigenschap van de nieuwe uitvoer in te stellen op true. Als de uitvoer asynchroon is, moet er niets anders worden gedaan aan de uitvoer. Als de uitvoer synchroon is en er een andere uitvoer is die synchroon is met dezelfde invoer, moet u ook de ExclusionGroup en SynchronousInputID eigenschappen instellen. Beide eigenschappen moeten dezelfde waarden hebben als de andere uitvoer die synchroon is met dezelfde invoer. Als deze eigenschappen niet zijn ingesteld op een niet-nulwaarde, worden de rijen die door de invoer worden geleverd, verzonden naar beide uitvoerwaarden die synchroon zijn met de invoer.

Wanneer een onderdeel tijdens de uitvoering een fout of afkapping tegenkomt, wordt deze uitgevoerd op basis van de instellingen van de ErrorRowDisposition invoer TruncationRowDisposition of uitvoer, of invoer- of uitvoerkolom, waar de fout is opgetreden. De waarde van deze eigenschappen moet standaard worden ingesteld op RD_NotUsed. Wanneer de foutuitvoer van het onderdeel is verbonden met een downstreamonderdeel, wordt deze eigenschap ingesteld door de gebruiker van het onderdeel en kan de gebruiker bepalen hoe het onderdeel de fout of afkapping verwerkt.

Foutkolommen vullen

Wanneer er een foutuitvoer wordt gemaakt, voegt de gegevensstroomtaak automatisch twee kolommen toe aan de verzameling uitvoerkolommen. Deze kolommen worden door onderdelen gebruikt om de id van de kolom op te geven die de fout of afkapping heeft veroorzaakt en om de onderdeelspecifieke foutcode op te geven. Deze kolommen worden automatisch gegenereerd, maar de waarden in de kolommen moeten worden ingesteld door het onderdeel.

De methode die wordt gebruikt om de waarden van deze kolommen in te stellen, is afhankelijk van of de foutuitvoer synchroon of asynchroon is. Onderdelen met synchrone uitvoer roepen de DirectErrorRow methode aan, besproken in de volgende sectie en geven de foutcode en kolomwaarden voor fouten op als parameters. Onderdelen met asynchrone uitvoer hebben twee opties voor het instellen van de waarden van deze kolommen. Ze kunnen de methode van de SetErrorInfo uitvoerbuffer aanroepen en de waarden opgeven, of de foutkolommen in de buffer zoeken door FindColumnByLineageID de waarden voor de kolommen rechtstreeks in te stellen. Omdat de namen van de kolommen echter mogelijk zijn gewijzigd of de locatie in de uitvoerkolomverzameling is gewijzigd, is de laatste methode mogelijk niet betrouwbaar. De SetErrorInfo methode stelt automatisch de waarden in deze foutkolommen in zonder ze handmatig te hoeven vinden.

Als u de foutbeschrijving wilt verkrijgen die overeenkomt met een specifieke foutcode, kunt u de GetErrorDescription methode van de IDTSComponentMetaData100 interface gebruiken die beschikbaar is via de eigenschap van ComponentMetaData het onderdeel.

In de volgende codevoorbeelden ziet u een onderdeel met invoer en twee uitvoer, inclusief een foutuitvoer. In het eerste voorbeeld ziet u hoe u een foutuitvoer maakt die synchroon is met de invoer. In het tweede voorbeeld ziet u hoe u een foutuitvoer maakt die asynchroon is.

public override void ProvideComponentProperties()  
{  
    // Specify that the component has an error output.  
    ComponentMetaData.UsesDispositions = true;  
    // Create the input.  
    IDTSInput100 input = ComponentMetaData.InputCollection.New();  
    input.Name = "Input";  
    input.ErrorRowDisposition = DTSRowDisposition.RD_NotUsed;  
    input.ErrorOrTruncationOperation = "A string describing the possible error or truncation that may occur during execution.";  
  
    // Create the default output.  
    IDTSOutput100 output = ComponentMetaData.OutputCollection.New();  
    output.Name = "Output";  
    output.SynchronousInputID = input.ID;  
    output.ExclusionGroup = 1;  
  
    // Create the error output.  
    IDTSOutput100 errorOutput = ComponentMetaData.OutputCollection.New();  
    errorOutput.IsErrorOut = true;  
    errorOutput.Name = "ErrorOutput";  
    errorOutput.SynchronousInputID = input.ID;  
    errorOutput.ExclusionGroup = 1;  
  
}  
Public  Overrides Sub ProvideComponentProperties()   
  
 ' Specify that the component has an error output.  
 ComponentMetaData.UsesDispositions = True   
  
 Dim input As IDTSInput100 = ComponentMetaData.InputCollection.New   
  
 ' Create the input.  
 input.Name = "Input"   
 input.ErrorRowDisposition = DTSRowDisposition.RD_NotUsed   
 input.ErrorOrTruncationOperation = "A string describing the possible error or truncation that may occur during execution."   
  
 ' Create the default output.  
 Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection.New   
 output.Name = "Output"   
 output.SynchronousInputID = input.ID   
 output.ExclusionGroup = 1   
  
 ' Create the error output.  
 Dim errorOutput As IDTSOutput100 = ComponentMetaData.OutputCollection.New   
 errorOutput.IsErrorOut = True   
 errorOutput.Name = "ErrorOutput"   
 errorOutput.SynchronousInputID = input.ID   
 errorOutput.ExclusionGroup = 1   
  
End Sub  

In het volgende codevoorbeeld wordt een foutuitvoer gemaakt die asynchroon is.

public override void ProvideComponentProperties()  
{  
    // Specify that the component has an error output.  
    ComponentMetaData.UsesDispositions = true;  
  
    // Create the input.  
    IDTSInput100 input = ComponentMetaData.InputCollection.New();  
    input.Name = "Input";  
    input.ErrorRowDisposition = DTSRowDisposition.RD_NotUsed;  
    input.ErrorOrTruncationOperation = "A string describing the possible error or truncation that may occur during execution.";  
  
    // Create the default output.  
    IDTSOutput100 output = ComponentMetaData.OutputCollection.New();  
    output.Name = "Output";  
  
    // Create the error output.  
    IDTSOutput100 errorOutput = ComponentMetaData.OutputCollection.New();  
    errorOutput.Name = "ErrorOutput";  
    errorOutput.IsErrorOut = true;  
}  
Public  Overrides Sub ProvideComponentProperties()   
  
 ' Specify that the component has an error output.  
 ComponentMetaData.UsesDispositions = True   
  
 ' Create the input.  
 Dim input As IDTSInput100 = ComponentMetaData.InputCollection.New   
  
 ' Create the default output.  
 input.Name = "Input"   
 input.ErrorRowDisposition = DTSRowDisposition.RD_NotUsed   
 input.ErrorOrTruncationOperation = "A string describing the possible error or truncation that may occur during execution."   
  
 ' Create the error output.  
 Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection.New   
 output.Name = "Output"   
 Dim errorOutput As IDTSOutput100 = ComponentMetaData.OutputCollection.New   
 errorOutput.Name = "ErrorOutput"   
 errorOutput.IsErrorOut = True   
  
End Sub  

Een rij omleiden naar een foutuitvoer

Nadat u een foutuitvoer aan een onderdeel hebt toegevoegd, moet u code opgeven die de fout- of afkappingsvoorwaarden verwerkt die specifiek zijn voor het onderdeel en de fout- of afkappingsrijen omleidt naar de foutuitvoer. U kunt dit op twee manieren doen, afhankelijk van of de foutuitvoer synchroon of asynchroon is.

Een rij omleiden met synchrone uitvoer

Rijen worden verzonden naar synchrone uitvoer door de DirectErrorRow methode van de PipelineBuffer klasse aan te roepen. De methode-aanroep bevat als parameters de id van de foutuitvoer, de door het onderdeel gedefinieerde foutcode en de index van de kolom die het onderdeel niet kon verwerken.

In het volgende codevoorbeeld ziet u hoe u een rij in een buffer kunt doorsturen naar een synchrone foutuitvoer met behulp van de DirectErrorRow methode.

public override void ProcessInput(int inputID, PipelineBuffer buffer)  
{  
        IDTSInput100 input = ComponentMetaData.InputCollection.GetObjectByID(inputID);  
  
        // This code sample assumes the component has two outputs, one the default,  
        // the other the error output. If the errorOutputIndex returned from GetErrorOutputInfo  
        // is 0, then the default output is the second output in the collection.  
        int defaultOutputID = -1;  
        int errorOutputID = -1;  
        int errorOutputIndex = -1;  
  
        GetErrorOutputInfo(ref errorOutputID,ref errorOutputIndex);  
  
        if (errorOutputIndex == 0)  
            defaultOutputID = ComponentMetaData.OutputCollection[1].ID;  
        else  
            defaultOutputID = ComponentMetaData.OutputCollection[0].ID;  
  
        while (buffer.NextRow())  
        {  
            try  
            {  
                // TODO: Implement code to process the columns in the buffer row.  
  
                // Ideally, your code should detect potential exceptions before they occur, rather  
                // than having a generic try/catch block such as this.   
                // However, because the error or truncation implementation is specific to each component,  
                // this sample focuses on actually directing the row, and not a single error or truncation.  
  
                // Unless an exception occurs, direct the row to the default   
                buffer.DirectRow(defaultOutputID);  
            }  
            catch  
            {  
                // Yes, has the user specified to redirect the row?  
                if (input.ErrorRowDisposition == DTSRowDisposition.RD_RedirectRow)  
                {  
                    // Yes, direct the row to the error output.  
                    // TODO: Add code to include the errorColumnIndex.  
                    buffer.DirectErrorRow(errorOutputID, 0, errorColumnIndex);  
                }  
                else if (input.ErrorRowDisposition == DTSRowDisposition.RD_FailComponent || input.ErrorRowDisposition == DTSRowDisposition.RD_NotUsed)  
                {  
                    // No, the user specified to fail the component, or the error row disposition was not set.  
                    throw new Exception("An error occurred, and the DTSRowDisposition is either not set, or is set to fail component.");  
                }  
                else  
                {  
                    // No, the user specified to ignore the failure so   
                    // direct the row to the default output.  
                    buffer.DirectRow(defaultOutputID);  
                }  
  
            }  
        }  
}  
Public  Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer)   
   Dim input As IDTSInput100 = ComponentMetaData.InputCollection.GetObjectByID(inputID)   
  
   ' This code sample assumes the component has two outputs, one the default,  
   ' the other the error output. If the errorOutputIndex returned from GetErrorOutputInfo  
   ' is 0, then the default output is the second output in the collection.  
   Dim defaultOutputID As Integer = -1   
   Dim errorOutputID As Integer = -1   
   Dim errorOutputIndex As Integer = -1   
  
   GetErrorOutputInfo(errorOutputID, errorOutputIndex)   
  
   If errorOutputIndex = 0 Then   
     defaultOutputID = ComponentMetaData.OutputCollection(1).ID   
   Else   
     defaultOutputID = ComponentMetaData.OutputCollection(0).ID   
   End If   
  
   While buffer.NextRow   
     Try   
       ' TODO: Implement code to process the columns in the buffer row.  
  
       ' Ideally, your code should detect potential exceptions before they occur, rather  
       ' than having a generic try/catch block such as this.   
       ' However, because the error or truncation implementation is specific to each component,  
       ' this sample focuses on actually directing the row, and not a single error or truncation.  
  
       ' Unless an exception occurs, direct the row to the default   
       buffer.DirectRow(defaultOutputID)   
     Catch   
       ' Yes, has the user specified to redirect the row?  
       If input.ErrorRowDisposition = DTSRowDisposition.RD_RedirectRow Then   
         ' Yes, direct the row to the error output.  
         ' TODO: Add code to include the errorColumnIndex.  
         buffer.DirectErrorRow(errorOutputID, 0, errorColumnIndex)   
       Else   
         If input.ErrorRowDisposition = DTSRowDisposition.RD_FailComponent OrElse input.ErrorRowDisposition = DTSRowDisposition.RD_NotUsed Then   
           ' No, the user specified to fail the component, or the error row disposition was not set.  
           Throw New Exception("An error occurred, and the DTSRowDisposition is either not set, or is set to fail component.")   
         Else   
           ' No, the user specified to ignore the failure so   
           ' direct the row to the default output.  
           buffer.DirectRow(defaultOutputID)   
         End If   
       End If   
     End Try   
   End While   
End Sub  

Een rij omleiden met asynchrone uitvoer

In plaats van rijen naar een uitvoer te sturen, zoals wordt gedaan met synchrone foutuitvoer, verzenden onderdelen met asynchrone uitvoer een rij naar een foutuitvoer door expliciet een rij toe te voegen aan de uitvoer PipelineBuffer. Voor het implementeren van een onderdeel dat gebruikmaakt van asynchrone foutuitvoer, moeten kolommen worden toegevoegd aan de foutuitvoer die wordt geleverd aan downstreamonderdelen en moet de uitvoerbuffer in de cache worden opgeslagen voor de foutuitvoer die tijdens de PrimeOutput methode aan het onderdeel wordt geleverd. De details van het implementeren van een onderdeel met asynchrone uitvoer worden gedetailleerd behandeld in het onderwerp Een aangepast transformatieonderdeel ontwikkelen met asynchrone uitvoer. Als kolommen niet expliciet worden toegevoegd aan de foutuitvoer, bevat de bufferrij die wordt toegevoegd aan de uitvoerbuffer alleen de twee foutkolommen.

Als u een rij wilt verzenden naar een asynchrone foutuitvoer, moet u een rij toevoegen aan de foutuitvoerbuffer. Soms is er al een rij toegevoegd aan de niet-foutuitvoerbuffer en moet u deze rij verwijderen met behulp van de RemoveRow methode. Vervolgens stelt u de waarden voor de uitvoerbufferkolommen in en ten slotte roept u de methode aan om de SetErrorInfo componentspecifieke foutcode en de waarde van de foutkolom op te geven.

In het volgende voorbeeld ziet u hoe u een foutuitvoer gebruikt voor een onderdeel met asynchrone uitvoer. Wanneer de gesimuleerde fout optreedt, voegt het onderdeel een rij toe aan de foutuitvoerbuffer, kopieert u de waarden die eerder aan de uitvoerbuffer zonder fouten zijn toegevoegd aan de uitvoerbuffer van de fout, verwijdert u de rij die is toegevoegd aan de uitvoerbuffer zonder fouten. Ten slotte stelt u de foutcode en de kolomwaarden voor fouten in door de SetErrorInfo methode aan te roepen.

int []columnIndex;  
int errorOutputID = -1;  
int errorOutputIndex = -1;  
  
public override void PreExecute()  
{  
    IDTSOutput100 defaultOutput = null;  
  
    this.GetErrorOutputInfo(ref errorOutputID, ref errorOutputIndex);  
    foreach (IDTSOutput100 output in ComponentMetaData.OutputCollection)  
    {  
        if (output.ID != errorOutputID)  
            defaultOutput = output;  
    }  
  
    columnIndex = new int[defaultOutput.OutputColumnCollection.Count];  
  
    for(int col =0 ; col < defaultOutput.OutputColumnCollection.Count; col++)  
    {  
        IDTSOutputColumn100 column = defaultOutput.OutputColumnCollection[col];  
        columnIndex[col] = BufferManager.FindColumnByLineageID(defaultOutput.Buffer, column.LineageID);  
    }  
}  
  
public override void PrimeOutput(int outputs, int[] outputIDs, PipelineBuffer[] buffers)  
{  
    for( int x=0; x < outputs; x++ )  
    {  
        if (outputIDs[x] == errorOutputID)  
            this.errorBuffer = buffers[x];  
        else  
            this.defaultBuffer = buffers[x];  
    }  
  
    int rows = 100;  
  
    Random random = new Random(System.DateTime.Now.Millisecond);  
  
    for (int row = 0; row < rows; row++)  
    {  
        try  
        {  
            defaultBuffer.AddRow();  
  
            for (int x = 0; x < columnIndex.Length; x++)  
                defaultBuffer[columnIndex[x]] = random.Next();  
  
            // Simulate an error.  
            if ((row % 2) == 0)  
                throw new Exception("A simulated error.");  
        }  
        catch  
        {  
            // Add a row to the error buffer.  
            errorBuffer.AddRow();  
  
            // Get the values from the default buffer  
            // and copy them to the error buffer.  
            for (int x = 0; x < columnIndex.Length; x++)  
                errorBuffer[columnIndex[x]] = defaultBuffer[columnIndex[x]];  
  
            // Set the error information.  
            errorBuffer.SetErrorInfo(errorOutputID, 1, 0);  
  
            // Remove the row that was added to the default buffer.  
            defaultBuffer.RemoveRow();  
        }  
    }  
  
    if (defaultBuffer != null)  
        defaultBuffer.SetEndOfRowset();  
  
    if (errorBuffer != null)  
        errorBuffer.SetEndOfRowset();  
}  
Private columnIndex As Integer()   
Private errorOutputID As Integer = -1   
Private errorOutputIndex As Integer = -1   
  
Public  Overrides Sub PreExecute()   
 Dim defaultOutput As IDTSOutput100 = Nothing   
 Me.GetErrorOutputInfo(errorOutputID, errorOutputIndex)   
 For Each output As IDTSOutput100 In ComponentMetaData.OutputCollection   
   If Not (output.ID = errorOutputID) Then   
     defaultOutput = output   
   End If   
 Next   
 columnIndex = New Integer(defaultOutput.OutputColumnCollection.Count) {}   
 Dim col As Integer = 0   
 While col < defaultOutput.OutputColumnCollection.Count   
   Dim column As IDTSOutputColumn100 = defaultOutput.OutputColumnCollection(col)   
   columnIndex(col) = BufferManager.FindColumnByLineageID(defaultOutput.Buffer, column.LineageID)   
   System.Math.Min(System.Threading.Interlocked.Increment(col),col-1)   
 End While   
End Sub   
  
Public  Overrides Sub PrimeOutput(ByVal outputs As Integer, ByVal outputIDs As Integer(), ByVal buffers As PipelineBuffer())   
 Dim x As Integer = 0   
 While x < outputs   
   If outputIDs(x) = errorOutputID Then   
     Me.errorBuffer = buffers(x)   
   Else   
     Me.defaultBuffer = buffers(x)   
   End If   
   System.Math.Min(System.Threading.Interlocked.Increment(x),x-1)   
 End While   
 Dim rows As Integer = 100   
 Dim random As Random = New Random(System.DateTime.Now.Millisecond)   
 Dim row As Integer = 0   
 While row < rows   
   Try   
     defaultBuffer.AddRow   
     Dim x As Integer = 0   
     While x < columnIndex.Length   
       defaultBuffer(columnIndex(x)) = random.Next   
       System.Math.Min(System.Threading.Interlocked.Increment(x),x-1)   
     End While   
     ' Simulate an error.  
     If (row Mod 2) = 0 Then   
       Throw New Exception("A simulated error.")   
     End If   
   Catch   
     ' Add a row to the error buffer.  
     errorBuffer.AddRow   
     ' Get the values from the default buffer  
     ' and copy them to the error buffer.  
     Dim x As Integer = 0   
     While x < columnIndex.Length   
       errorBuffer(columnIndex(x)) = defaultBuffer(columnIndex(x))   
       System.Math.Min(System.Threading.Interlocked.Increment(x),x-1)   
     End While   
     ' Set the error information.  
     errorBuffer.SetErrorInfo(errorOutputID, 1, 0)   
     ' Remove the row that was added to the default buffer.  
     defaultBuffer.RemoveRow   
   End Try   
   System.Math.Min(System.Threading.Interlocked.Increment(row),row-1)   
 End While   
 If Not (defaultBuffer Is Nothing) Then   
   defaultBuffer.SetEndOfRowset   
 End If   
 If Not (errorBuffer Is Nothing) Then   
   errorBuffer.SetEndOfRowset   
 End If   
End Sub  

Zie ook

Foutafhandeling in gegevens
Foutuitvoer gebruiken