Condividi tramite


Metodi di runtime di un componente del flusso di dati

Si applica a: SQL Server SSIS Integration Runtime in Azure Data Factory

In fase di esecuzione l'attività Flusso di dati esamina la sequenza di componenti, prepara un piano di esecuzione e gestisce un pool di thread di lavoro che eseguono il piano di lavoro. L'attività carica righe di dati dalle origini, li elabora tramite trasformazioni, quindi li salva nelle destinazioni.

Sequenza di esecuzione dei metodi

Durante l'esecuzione di un componente del flusso di dati, viene chiamato un subset dei metodi nella classe di base PipelineComponent. I metodi, così come la sequenza con cui vengono chiamati, sono sempre gli stessi, ad eccezione dei metodi PrimeOutput e ProcessInput. Questi due metodi vengono chiamati in base all'esistenza e alla configurazione degli oggetti IDTSInput100 e IDTSOutput100 di un componente.

Nell'elenco seguente sono illustrati i metodi nell'ordine in cui vengono chiamati durante l'esecuzione del componente. Si noti che PrimeOutput, se chiamato, viene sempre chiamato prima di ProcessInput.

Metodo PrimeOutput

Il metodo PrimeOutput viene chiamato quando un componente include almeno un output, connesso a un componente a valle tramite un oggetto IDTSPath100, la cui proprietà SynchronousInputID è zero. Il metodo PrimeOutput viene chiamato per i componenti di origine e per le trasformazioni con output asincroni. A differenza del metodo ProcessInput descritto di seguito, il metodo PrimeOutput viene chiamato una sola volta per ogni componente che lo richiede.

Metodo ProcessInput

Il metodo ProcessInput viene chiamato per i componenti che includono almeno un input connesso a un componente a monte tramite un oggetto IDTSPath100. Il metodo ProcessInput viene chiamato per i componenti di destinazione e per le trasformazioni con output sincroni. ProcessInput viene chiamato ripetutamente finché non sono più disponibili righe da elaborare dei componenti a monte.

Utilizzo di input e output

In fase di esecuzione i componenti del flusso di dati eseguono le attività seguenti:

  • I componenti di origine aggiungono righe.

  • I componenti di trasformazione con output sincroni ricevono le righe fornite dai componenti di origine.

  • I componenti di trasformazione con output asincroni ricevono e aggiungono righe.

  • I componenti di destinazione ricevono righe e quindi le caricano in una destinazione.

Durante l'esecuzione, l'attività Flusso di dati alloca oggetti PipelineBuffer che contengono tutte le colonne definite nelle raccolte di colonne di output di una sequenza di componenti. Ad esempio, se ognuno dei quattro componenti in una sequenza del flusso di dati aggiunge una colonna di output alla raccolta di colonne di output, il buffer fornito a ogni componente contiene quattro colonne, una per ogni colonna di output per componente. A causa di questo comportamento, un componente a volte riceve buffer contenenti colonne che non utilizza.

Poiché i buffer ricevuti dal componente possono contenere colonne che il componente non utilizzerà, è necessario individuare le colonne che si desidera utilizzare nelle raccolte di colonne di input e output del componente nel buffer fornito al componente dall'attività Flusso di dati. A tale scopo, utilizzare il metodo FindColumnByLineageID della proprietà BufferManager. Per motivi di prestazioni, questa attività viene in genere eseguita durante il metodo PreExecute, anziché in PrimeOutput o ProcessInput.

PreExecute viene chiamato prima dei metodi PrimeOutput e ProcessInput e rappresenta la prima opportunità per un componente di eseguire questa operazione dopo che BufferManager diventa disponibile per il componente. Durante questo metodo, il componente deve individuare le proprie colonne nei buffer e archiviare questa informazione internamente affinché le colonne possano essere utilizzate nei metodi PrimeOutput o ProcessInput.

Nell'esempio di codice seguente viene illustrata la modalità con cui un componente di trasformazione con output sincroni individua le proprie colonne di input nel buffer durante PreExecute.

private int []bufferColumnIndex;  
public override void PreExecute()  
{  
    IDTSInput100 input = ComponentMetaData.InputCollection[0];  
    bufferColumnIndex = new int[input.InputColumnCollection.Count];  
  
    for( int x=0; x < input.InputColumnCollection.Count; x++)  
    {  
        IDTSInputColumn100 column = input.InputColumnCollection[x];  
        bufferColumnIndex[x] = BufferManager.FindColumnByLineageID( input.Buffer, column.LineageID);  
    }  
}  
Dim bufferColumnIndex As Integer()  
  
    Public Overrides Sub PreExecute()  
  
        Dim input As IDTSInput100 = ComponentMetaData.InputCollection(0)  
  
        ReDim bufferColumnIndex(input.InputColumnCollection.Count)  
  
        For x As Integer = 0 To input.InputColumnCollection.Count  
  
            Dim column As IDTSInputColumn100 = input.InputColumnCollection(x)  
            bufferColumnIndex(x) = BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID)  
  
        Next  
  
    End Sub  

Aggiunta di righe

I componenti forniscono righe ai componenti a valle tramite l'aggiunta di righe agli oggetti PipelineBuffer. L'attività Flusso di dati fornisce una matrice di buffer di output, uno per ogni oggetto IDTSOutput100 connesso a un componente a valle, come parametro per il metodo PrimeOutput. I componenti di origine e i componenti di trasformazione con output asincroni aggiungono righe ai buffer e quando hanno terminato chiamano il metodo SetEndOfRowset. L'attività Flusso di dati gestisce i buffer di output che fornisce ai componenti e, quando un buffer diventa pieno, sposta automaticamente le righe presenti al suo interno nel componente successivo. Il metodo PrimeOutput viene chiamato una sola volta per ogni componente, a differenza del metodo ProcessInput che viene chiamato ripetutamente.

Nell'esempio di codice seguente viene illustrata la modalità con cui un componente aggiunge righe ai propri buffer di output durante il metodo PrimeOutput, quindi chiama il metodo SetEndOfRowset.

public override void PrimeOutput( int outputs, int []outputIDs,PipelineBuffer []buffers)  
{  
    for( int x=0; x < outputs; x++ )  
    {  
        IDTSOutput100 output = ComponentMetaData.OutputCollection.GetObjectByID( outputIDs[x]);  
        PipelineBuffer buffer = buffers[x];  
  
        // TODO: Add rows to the output buffer.  
    }  
    foreach( PipelineBuffer buffer in buffers )  
    {  
        /// Notify the data flow task that no more rows are coming.  
        buffer.SetEndOfRowset();  
    }  
}  
public overrides sub PrimeOutput( outputs as Integer , outputIDs() as Integer ,buffers() as PipelineBuffer buffers)  
  
    For x As Integer = 0 To outputs.MaxValue  
  
        Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection.GetObjectByID(outputIDs(x))  
        Dim buffer As PipelineBuffer = buffers(x)  
  
        ' TODO: Add rows to the output buffer.  
  
    Next  
  
    For Each buffer As PipelineBuffer In buffers  
  
        ' Notify the data flow task that no more rows are coming.  
        buffer.SetEndOfRowset()  
  
    Next  
  
End Sub  

Per altre informazioni sullo sviluppo di componenti che aggiungono righe ai buffer di output, vedere Sviluppo di un componente di origine personalizzato e Sviluppo di un componente di trasformazione personalizzato con output asincroni.

Ricezione di righe

I componenti ricevono righe dai componenti a monte in oggetti PipelineBuffer. L'attività Flusso di dati fornisce un oggetto PipelineBuffer che contiene le righe aggiunte al flusso di dati dai componenti a monte come parametro per il metodo ProcessInput. Questo buffer di input può essere utilizzato per esaminare e modificare le righe e le colonne nel buffer, ma non può essere utilizzato per aggiungere o rimuovere righe. Il metodo ProcessInput viene chiamato ripetutamente finché non sono più disponibili buffer. L'ultima volta che viene chiamato, la proprietà EndOfRowset è true. È possibile scorrere la raccolta di righe nel buffer tramite il metodo NextRow, che fa avanzare il buffer alla riga successiva. Questo metodo restituisce false quando il buffer si trova nell'ultima riga della raccolta. Non è necessario controllare la proprietà EndOfRowset a meno che non occorra eseguire un'azione aggiuntiva dopo l'elaborazione delle ultime righe di dati.

Nel testo riportato di seguito viene illustrato l'utilizzo corretto del metodo NextRow e della proprietà EndOfRowset:

while (buffer.NextRow())

{

// Do something with each row.

}

if (buffer.EndOfRowset)

{

// Optionally, do something after all rows have been processed.

}

Nell'esempio di codice seguente viene illustrata la modalità con cui un componente elabora le righe nei buffer di input durante il metodo ProcessInput.

public override void ProcessInput( int inputID, PipelineBuffer buffer )  
{  
    {  
        IDTSInput100 input = ComponentMetaData.InputCollection.GetObjectByID(inputID);  
        while( buffer.NextRow())  
        {  
            // TODO: Examine the columns in the current row.  
        }  
}  
Public Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer)  
  
        Dim input As IDTSInput100 = ComponentMetaData.InputCollection.GetObjectByID(inputID)  
        While buffer.NextRow() = True  
  
            ' TODO: Examine the columns in the current row.  
        End While  
  
End Sub  

Per altre informazioni sullo sviluppo di componenti che ricevono righe nei buffer di input, vedere Sviluppo di un componente di destinazione personalizzato e Sviluppo di un componente di trasformazione personalizzato con output sincroni.

Vedi anche

Metodi della fase di progettazione di un componente flusso di dati