Condividi tramite


Sviluppo di componenti flusso di dati con più input

Un componente flusso di dati con più input può utilizzare una quantità di memoria eccessiva se i relativi input generano dati a frequenze irregolari. Quando si sviluppa un componente flusso di dati personalizzato in grado di supportare due o più input, è possibile gestire la richiesta di memoria tramite i membri dello spazio dei nomi Microsoft.SqlServer.Dts.Pipeline seguenti:

  • Proprietà DtsPipelineComponentAttribute.SupportsBackPressure della classe DtsPipelineComponentAttribute. Impostare il valore di questa proprietà su true per implementare il codice necessario affinché il componente flusso di dati personalizzato gestisca i dati generati a frequenze irregolari.

  • Metodo IsInputReady della classe PipelineComponent. È necessario fornire un'implementazione di questo metodo se si imposta la proprietà SupportsBackPressure su true. Se non si fornisce un'implementazione, nel motore del flusso di dati viene generata un'eccezione durante la fase di esecuzione.

  • Metodo GetDependentInputs della classe PipelineComponent. È inoltre necessario fornire un'implementazione di questo metodo se si imposta la proprietà SupportsBackPressure su true e il componente personalizzato supporta più di due input. Se non si fornisce un'implementazione, nel motore del flusso di dati viene generata un'eccezione durante la fase di esecuzione se l'utente collega più di due input.

Insieme, questi membri consentono di sviluppare una soluzione per richieste elevate di memoria simili alla soluzione sviluppata da Microsoft per le trasformazioni Merge e Merge join.

Impostazione della proprietà SupportsBackPressure

Il primo passaggio necessario per implementare una migliore gestione della memoria per un componente flusso di dati personalizzato che supporta più input consiste nell'impostare il valore della proprietà SupportsBackPressure su true in DtsPipelineComponentAttribute. Quando il valore di SupportsBackPressure è true, il motore del flusso di dati chiama il metodo IsInputReady e se sono presenti più di due input chiama anche il metodo GetDependentInputs durante la fase di esecuzione.

Esempio

Nell'esempio seguente l'implementazione di DtsPipelineComponentAttribute comporta l'impostazione del valore di SupportsBackPressure su true.

[DtsPipelineComponent(ComponentType = ComponentType.Transform,
        DisplayName = "Shuffler",
        Description = "Shuffle the rows from input.",
        SupportsBackPressure = true,
        LocalizationType = typeof(Localized),
        IconResource = "Microsoft.Samples.SqlServer.Dts.MIBPComponent.ico")
]
public class Shuffler : Microsoft.SqlServer.Dts.Pipeline.PipelineComponent
        {
          ...
        }

Implementazione del metodo IsInputReady

Quando si imposta il valore della proprietà SupportsBackPressure su true nell'oggetto DtsPipelineComponentAttribute, è necessario fornire un'implementazione per il metodo IsInputReady della classe PipelineComponent.

[!NOTA]

L'implementazione del metodo IsInputReady non deve chiamare le implementazioni nella classe di base. L'implementazione predefinita di questo metodo nella classe di base causa semplicemente la generazione di un'eccezione NotImplementedException.

Quando si implementa questo metodo, si imposta lo stato di un elemento nella matrice canProcess booleana per ogni input del componente. Gli input sono identificabili tramite i relativi valori ID nella matrice inputIDs. Quando si imposta il valore di un elemento nella matrice canProcess su true per un input, il motore del flusso di dati chiama il metodo ProcessInput del componente e fornisce una maggiore quantità di dati per l'input specificato.

Mentre sono disponibili più dati upstream, il valore dell'elemento di matrice canProcess per almeno un input deve sempre essere true. In caso contrario, l'elaborazione viene arrestata.

Il motore del flusso di dati chiama il metodo IsInputReady prima di inviare ogni buffer di dati per determinare quali input sono in attesa di ricevere altri dati. Quando il valore restituito indica che un input è bloccato, il motore del flusso di dati memorizza temporaneamente nella cache buffer aggiuntivi di dati per l'input anziché inviarli al componente.

[!NOTA]

Il metodo IsInputReady o GetDependentInputs non viene chiamato nel codice personalizzato. Il motore del flusso di dati chiama questi metodi e gli altri metodi della classe PipelineComponent di cui si esegue l'override, quando nel motore del flusso di dati viene eseguito il componente.

Esempio

Nell'esempio seguente l'implementazione del metodo IsInputReady indica che un input è in attesa di ricevere altri dati quando sussistono le condizioni seguenti:

  • È disponibile una maggiore quantità di dati upstream per l'input (!inputEOR).

  • Per il componente non sono attualmente disponibili dati da elaborare per l'input nei buffer già ricevuti dal componente (inputBuffers[inputIndex].CurrentRow() == null).

Se un input è in attesa di ricevere una maggiore quantità di dati, il componente flusso di dati indica questa situazione impostando su true il valore dell'elemento nella matrice canProcess che corrisponde a quell'input.

Viceversa, se per il componente sono ancora disponibili dati da elaborare per l'input, viene sospesa l'elaborazione dell'input. A tale scopo, viene impostato su false il valore dell'elemento nella matrice canProcess che corrisponde a quell'input.

public override void IsInputReady(int[] inputIDs, ref bool[] canProcess)
{
    for (int i = 0; i < inputIDs.Length; i++)
    {
        int inputIndex = ComponentMetaData.InputCollection.GetObjectIndexByID(inputIDs[i]);

        canProcess[i] = (inputBuffers[inputIndex].CurrentRow() == null)
            && !inputEOR[inputIndex];
    }
}

Nell'esempio precedente viene utilizzata la matrice inputEOR booleana per indicare se sono disponibili più dati upstream per ogni input. EOR nel nome della matrice rappresenta "la fine del set di righe" e fa riferimento alla proprietà EndOfRowset dei buffer del flusso di dati. In una parte dell'esempio non inclusa in questo argomento, il metodo ProcessInput controlla il valore della proprietà EndOfRowset per ogni buffer di dati ricevuto. Se un valore true indica che per un input non sono disponibili altri dati upstream, il valore dell'elemento di matrice inputEOR per quell'input viene impostato su true. In questo esempio del metodo IsInputReady il valore dell'elemento corrispondente nella matrice canProcess viene impostato su false per un input se il valore dell'elemento di matrice inputEOR indica che per l'input non sono disponibili altri dati upstream.

Implementazione del metodo GetDependentInputs

Se il componente flusso di dati personalizzato supporta più di due input, è necessario fornire un'implementazione per il metodo GetDependentInputs della classe PipelineComponent.

[!NOTA]

L'implementazione del metodo GetDependentInputs non deve chiamare le implementazioni nella classe di base. L'implementazione predefinita di questo metodo nella classe di base causa semplicemente la generazione di un'eccezione NotImplementedException.

Il motore del flusso di dati chiama solo il metodo GetDependentInputs quando l'utente collega più di due input al componente. Se un componente dispone solo di due input e il metodo IsInputReady indica che un input è bloccato (canProcess = false), il motore del flusso di dati riconosce che l'altro input è in attesa di ricevere altri dati. Se tuttavia sono presenti più di due input e il metodo IsInputReady indica che un input è bloccato, il codice aggiuntivo nel metodo GetDependentInputs identifica quali input sono in attesa di ricevere altri dati.

[!NOTA]

Il metodo IsInputReady o GetDependentInputs non viene chiamato nel codice personalizzato. Il motore del flusso di dati chiama questi metodi e gli altri metodi della classe PipelineComponent di cui si esegue l'override, quando nel motore del flusso di dati viene eseguito il componente.

Esempio

Per un input specifico bloccato, l'implementazione seguente del metodo GetDependentInputs restituisce una raccolta degli input in attesa di ricevere altri dati che quindi bloccano l'input specificato. Il componente identifica gli input di blocco cercando gli input diversi da quello bloccato per i quali non sono attualmente disponibili dati da elaborare nei buffer già ricevuti dal componente (inputBuffers[i].CurrentRow() == null). Il metodo GetDependentInputs restituisce quindi la raccolta di input di blocco come una raccolta di ID di input.

        public override Collection<int> GetDependentInputs(int blockedInputID)
        {
            Collection<int> currentDependencies = new Collection<int>();
            for (int i = 0; i < ComponentMetaData.InputCollection.Count; i++)
            {
                if (ComponentMetaData.InputCollection[i].ID != blockedInputID
                    && inputBuffers[i].CurrentRow() == null)
                {
                    currentDependencies.Add(ComponentMetaData.InputCollection[i].ID);
                }
            }
            
            return currentDependencies;
        }