Compartir por


Cómo: Implementar un patrón de flujo de datos productor-consumidor

En este artículo, aprenderá a usar la biblioteca de flujos de datos de TPL para implementar un patrón productor-consumidor. En este patrón, el productor envía mensajes a un bloque de mensajes y el consumidor lee los mensajes de ese bloque.

Nota:

La biblioteca de flujos de datos TPL (el System.Threading.Tasks.Dataflow namespace) se incluye en .NET 6 y posteriores. Para los proyectos de .NET Framework y .NET Standard, debe instalar el 📦 paquete NuGet System.Threading.Tasks.Dataflow.

Example

En el ejemplo siguiente se muestra un modelo básico de productor-consumidor que usa el flujo de datos. El Produce método escribe matrices que contienen bytes aleatorios de datos en un System.Threading.Tasks.Dataflow.ITargetBlock<TInput> objeto y el Consume método lee bytes de un System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> objeto . Al actuar en las ISourceBlock<TOutput> interfaces y ITargetBlock<TInput> , en lugar de sus tipos derivados, puede escribir código reutilizable que pueda actuar en una variedad de tipos de bloques de flujo de datos. En este ejemplo se usa la BufferBlock<T> clase . Dado que la BufferBlock<T> clase actúa como un bloque de origen y como bloque de destino, el productor y el consumidor pueden usar un objeto compartido para transferir datos.

El Produce método llama al Post método en un bucle para escribir datos de forma sincrónica en el bloque de destino. Una vez que el Produce método escribe todos los datos en el bloque de destino, llama al Complete método para indicar que el bloque nunca tendrá datos adicionales disponibles. El Consume método usa los operadores async y await (Async y Await en Visual Basic) para calcular de forma asincrónica el número total de bytes que se reciben del ISourceBlock<TOutput> objeto. Para actuar de forma asincrónica, el Consume método llama al OutputAvailableAsync método para recibir una notificación cuando el bloque de origen tiene datos disponibles y cuando el bloque de origen nunca tendrá datos adicionales disponibles.

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class DataflowProducerConsumer
{
    static void Produce(ITargetBlock<byte[]> target)
    {
        var rand = new Random();

        for (int i = 0; i < 100; ++ i)
        {
            var buffer = new byte[1024];
            rand.NextBytes(buffer);
            target.Post(buffer);
        }

        target.Complete();
    }

    static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
    {
        int bytesProcessed = 0;

        while (await source.OutputAvailableAsync())
        {
            byte[] data = await source.ReceiveAsync();
            bytesProcessed += data.Length;
        }

        return bytesProcessed;
    }

    static async Task Main()
    {
        var buffer = new BufferBlock<byte[]>();
        var consumerTask = ConsumeAsync(buffer);
        Produce(buffer);

        var bytesProcessed = await consumerTask;

        Console.WriteLine($"Processed {bytesProcessed:#,#} bytes.");
    }
}

// Sample  output:
//     Processed 102,400 bytes.
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

Friend Class DataflowProducerConsumer
    Private Shared Sub Produce(ByVal target As ITargetBlock(Of Byte()))
        Dim rand As New Random()

        For i As Integer = 0 To 99
            Dim buffer(1023) As Byte
            rand.NextBytes(buffer)
            target.Post(buffer)
        Next i

        target.Complete()
    End Sub

    Private Shared Async Function ConsumeAsync(
        ByVal source As ISourceBlock(Of Byte())) As Task(Of Integer)
        Dim bytesProcessed As Integer = 0

        Do While Await source.OutputAvailableAsync()
            Dim data() As Byte = Await source.ReceiveAsync()
            bytesProcessed += data.Length
        Loop

        Return bytesProcessed
    End Function

    Shared Sub Main()
        Dim buffer = New BufferBlock(Of Byte())()
        Dim consumer = ConsumeAsync(buffer)
        Produce(buffer)

        Dim result = consumer.GetAwaiter().GetResult()

        Console.WriteLine($"Processed {result:#,#} bytes.")
    End Sub
End Class

' Sample output:
'     Processed 102,400 bytes.

Programación sólida

En el ejemplo anterior se usa solo un consumidor para procesar los datos de origen. Si tiene varios consumidores en la aplicación, use el TryReceive método para leer datos del bloque de origen, como se muestra en el ejemplo siguiente.

static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
    int bytesProcessed = 0;
    while (await source.OutputAvailableAsync())
    {
        while (source.TryReceive(out byte[] data))
        {
            bytesProcessed += data.Length;
        }
    }
    return bytesProcessed;
}
Private Shared Async Function ConsumeAsync(
    ByVal source As IReceivableSourceBlock(Of Byte())) As Task(Of Integer)
    Dim bytesProcessed As Integer = 0
    
    Do While Await source.OutputAvailableAsync()
        Dim data() As Byte
        Do While source.TryReceive(data)
            bytesProcessed += data.Length
        Loop
    Loop

    Return bytesProcessed
End Function

El TryReceive método devuelve False cuando no hay datos disponibles. Cuando varios consumidores deben acceder al bloque de origen simultáneamente, este mecanismo garantiza que los datos siguen estando disponibles después de la llamada a OutputAvailableAsync.

Consulte también