Share via


Pipelines

The Pipeline pattern uses parallel tasks and concurrent queues to process a sequence of input values. Each task implements a stage of the pipeline, and the queues act as buffers that allow the stages of the pipeline to execute concurrently, even though the values are processed in order. You can think of software pipelines as analogous to assembly lines in a factory, where each item in the assembly line is constructed in stages. The partially assembled item is passed from one assembly stage to another. The outputs of the assembly line occur in the same order as that of the inputs.

A pipeline is composed of a series of producer/consumer stages, each one depending on the output of its predecessor. Pipelines allow you to use parallelism in cases where there are too many dependencies to use a parallel loop.

There are many ways to use pipelines. Pipelines are often useful when the data elements are received from a real-time event stream, such as values on stock ticker tapes, user-generated mouse click events, and packets that arrive over the network. Pipelines are also used to process elements from a data stream, as is done with compression and encryption, or processing streams of video frames. In all of these cases, it's important that the data elements are processed in sequential order.

Note

Don't confuse pipelines and parallel loops. Pipelines are used in cases where a parallel loop can't be used. With the Pipeline pattern, the data is processed in sequential order. The first input is transformed into the first output, the second input into the second output, and so on.

Pipelines are also useful for processor-intensive applications and for applications where the cost of I/O is significant.

The Basics

In the Microsoft® .NET Framework, the buffers that connect stages of a software pipeline are usually based on the BlockingCollection<T> class.

Figure 1 illustrates an example of a pipeline that has four stages. It reads words and sentence fragments from a data source, it corrects the punctuation and capitalization, it groups them into complete sentences, and it writes the sentences to a disk file.

Ff963548.fb6a92eb-ba74-4aeb-8760-9b7a7a5404a0-thumb(en-us,PandP.10).png

Figure 1

Sample pipeline

Each stage of the pipeline reads from a dedicated input and writes to a particular output. For example, the "Read Strings" task reads from a source and writes to buffer 1. All the stages of the pipeline can execute at the same time because concurrent queues buffer any shared inputs and outputs. If there are four available cores, the stages can run in parallel. As long as there is room in its output buffer, a stage of the pipeline can add the value it produces to its output queue. If the output buffer is full, the producer of the new value waits until space becomes available. Stages can also wait (that is, block) on inputs. An input wait is familiar from other programming contexts—if an enumeration or a stream does not have a value, the consumer of that enumeration or stream waits until a value is available or an "end of file" condition occurs. Blocking a collection works the same way. Using buffers that hold more than one value at a time compensates for variability in the time it takes to process each value.

The BlockingCollection<T> class lets you signal the "end of file" condition with the CompleteAdding method. This method tells the consumer that it can end its processing loop after all the data previously added to the collection is removed or processed.

The following code demonstrates how to implement a pipeline that uses the BlockingCollection class for the buffers and tasks for the stages of the pipeline.

int seed = ...
int BufferSize = ...
var buffer1 = new BlockingCollection<string>(BufferSize);
var buffer2 = new BlockingCollection<string>(BufferSize);
var buffer3 = new BlockingCollection<string>(BufferSize);

var f = new TaskFactory(TaskCreationOptions.LongRunning, 
                                                     TaskContinuationOptions.None);

var stage1 = f.StartNew(() => ReadStrings(buffer1, ...));
var stage2 = f.StartNew(() => CorrectCase(buffer1, buffer2));
var stage3 = f.StartNew(() => CreateSentences(buffer2, buffer3));
var stage4 = f.StartNew(() => WriteSentences(buffer3));

Task.WaitAll(stage1, stage2, stage3, stage4);

The first stage produces the input strings and places them in the first buffer. The second stage transforms the strings. The third stage combines the strings and produces sentences. The final stage writes the corrected sentences to a file.

The buffers are instances of the BlockingCollection<string> class. The argument to the constructor specifies the maximum number of values that can be buffered at any one time. In this case, the value is 32 for each buffer. For more information about how to choose the buffer size of a blocking collection, see the section, "Design Notes," later in this chapter.

As this example shows, tasks in a pipeline are usually created with the LongRunning option. For more information, see the section, "Anti-Patterns," later in this chapter.

The first stage of the pipeline includes a sequential loop that writes to an output buffer.

static void ReadStrings(BlockingCollection<string> output,
                        int seed)
{
  try
  {
    foreach (var phrase in PhraseSource(seed))
    {
      Stage1AdditionalWork();
      output.Add(phrase);
    }
  }
  finally
  {
    output.CompleteAdding();
  }
}

The sequential loop populates the output buffer with values. The values come from an external data source that's accessed by the PhraseSource method, which returns an ordinary single-threaded instance of IEnumerable<string>. The producer places a value in the blocking collection with the blocking collection's Add method. This method can potentially block if the queue is full. This is a way to limit stages of the pipeline that are executing faster than other stages.

The call to the CompleteAdding method is usually inside of a finally block so that it will execute even if an exception occurs.

Stages in the middle of the pipeline consume values from an input buffer and also produce values and place them into an output buffer. The following code shows how the stages are structured.

void DoStage(BlockingCollection<T> input,
             BlockingCollection<T> output)
  try
  {
     foreach (var item in input.GetConsumingEnumerable())
     {
        var result = ...
        output.Add(result);
     }
  }
  finally
  {
     output.CompleteAdding();
  }
}

You can look at the online source code to see the implementations of the CorrectCase and CreateSentences methods that make up stages 2 and 3 of this pipeline. They are structured in a very similar way to this example. The important point of this code is that the input blocking collection's GetConsumingEnumerable method returns an enumeration that a consumer can use to "take" the values. There may be many consumers of a single producer. When a consumer "takes" a value, no other consumer will see it.

Although this example doesn't show it, a blocking collection's GetConsumingEnumerable method can be called by more than one consumer. This allows values from the producer to be divided among multiple recipients. If a recipient gets a value from the blocking collection, no other consumer will also get that value.

The last stage of the pipeline consumes values from a blocking collection but doesn't produce values. Instead, it writes values to a stream. Here's the code.

static void WriteSentences(BlockingCollection<string> input)
{
  using (StreamWriter outfile = 
             new StreamWriter(PathForPipelineResults))
  {
    // ...
    foreach (var sentence in input.GetConsumingEnumerable())
    {
       var printSentence = ...
       outfile.WriteLine(printSentence);
    }
  }
}

One of the things that makes a pipeline easy to write in .NET is that you can rely on familiar sequential techniques such as iteration by using the IEnumerable<T> class. There is some synchronization, but it's hidden inside the implementation of the BlockingCollection<T> class.

(Some details of error handling, cancellation and the collection of performance data have been omitted from this example for clarity. To see error handling and cancellation code, review the full ImagePipeline sample that's mentioned later in this chapter.)

An Example

The online samples include an application named ImagePipeline. This application takes a directory of JPEG images and generates thumbnail versions, which are also post-processed with several image-enhancing filters. The resulting processed images are displayed as a slideshow. Each image in the slideshow is displayed in alphabetical file name order.

Note

You can't use a parallel loop for this example because the application has a requirement that images must be processed in sequence. Parallel loops do not guarantee any particular processing order.

Sequential Image Processing

Each image is processed in four stages: the large color image is loaded from a file, a small thumbnail with a picture frame is generated from it, noise is added to the image to create a speckling effect, and then the processed image displays as the next picture in the slideshow. Figure 2 illustrates this sequence.

Ff963548.e1315c8e-0e67-4f37-8762-5b572540f3b5(en-us,PandP.10).png

Figure 2

Sequential image processing

Here's the code for the sequential version.

string sourceDir = ...
IEnumerable<string> fileNames = ...
int count = 0;
ImageInfo info = null;

foreach (var fileName in fileNames)
{
  /// ...

  info = LoadImage(fileName, sourceDir, count, ...);
  ScaleImage(info);
  FilterImage(info);           
  // ...         
  DisplayImage(info, count + 1, displayFn, ...);
  // ...
                    
  count += 1;
  info = null;
}

The four steps are performed by the LoadImage, ScaleImage, FilterImage, and DisplayImage methods. This example is slightly abridged for clarity. The code that deals with cancellation, error handing (the disposal of handles to unmanaged objects), and the capture of performance measurements is omitted. You can refer to the online samples to see these details. For more information about cancellation and error handling, see the section, "Variations," later in this chapter.

The Image Pipeline

The sequential loop can process only one image at a time; each image must complete all four stages before work can begin on the next image, and the stages themselves are sequentially linked. In fact, this example seems intractably sequential—the top-level loop has the restriction that images must be displayed in order, and within each step are substeps that require inputs from previous substeps. You can't display an image until after the filter is applied to it. You can't apply the filter until after the image is scaled to thumbnail size. You can't do the scaling until after the original image loads.

However, the pipeline pattern can introduce parallelism into this example. Each image still passes through all four stages in sequence, but the stages can execute in parallel on different images. Figure 3 illustrates the image pipeline.

Ff963548.81d86924-d72f-4071-af91-e4eb23567b70-thumb(en-us,PandP.10).png

Figure 3

Image pipeline

The following code shows the parallel version.

IEnumerable<string> fileNames = ...
string sourceDir = ...
Action<ImageInfo> displayFn = ...
int limit = ...

var originalImages = new BlockingCollection<ImageInfo>(limit);
var thumbnailImages = new BlockingCollection<ImageInfo>(limit);
var filteredImages =  new BlockingCollection<ImageInfo>(limit);
try
{
  var f = new TaskFactory(TaskCreationOptions.LongRunning,       
                          TaskContinuationOptions.None);
  // ...
  var loadTask = f.StartNew(() =>
                    LoadPipelinedImages(fileNames, sourceDir, 
                                        originalImages, ...));
  var scaleTask = f.StartNew(() =>
                    ScalePipelinedImages(originalImages, 
                                         thumbnailImages, ...));
  var filterTask = f.StartNew(() =>
                    FilterPipelinedImages(thumbnailImages, 
                                          filteredImages, ...));
  var displayTask = f.StartNew(() =>
                    DisplayPipelinedImages(
                        filteredImages.GetConsumingEnumerable(), 
                        displayFn, ...));

  Task.WaitAll(loadTask, scaleTask, filterTask, displayTask);
}
finally
{
  // ... release handles to unmanaged resources ...
}

(Some details of error handling, cancellation, and the collection of performance data have been omitted here for clarity. Refer to the online sample for the complete implementation.)

There are three blocking collections that act as buffers between the stages of the pipeline. The four stages are the same as in the sequential version. A call to the task factory StartNew method launches each processing stage as a long-running task.

The code calls Task.WaitAll to defer cleanup until after all stages complete processing all images.

Performance Characteristics

To understand the performance characteristics of the sequential and pipelined versions, it's useful to look at a scheduling diagram such as the one that Figure 4 illustrates.

Ff963548.b156b6fe-9177-45d0-b94f-8357e1e8611a-thumb(en-us,PandP.10).png

Figure 4

Image pipeline with stages of equal speed

Figure 4 shows how the tasks in the image pipeline example execute over time. For example, the top row shows that stage 1 processes image 1 starting at time t0 and image 2 starting at time t1. Stage 2 begins processing image 1 at time t1. Assume for a moment that the pipeline is perfectly balanced; that is, each stage of the pipeline takes exactly the same amount of time to do its work. Call that duration T. Therefore, in the figure, t1 occurs after T units of time elapse, t2 after 2 x T units of time elapse, and so on.

If there are enough available cores to allow the pipeline's tasks to run in parallel, the figure shows that the expected execution time for six images in a pipeline with four stages is approximately 9 x T. In contrast, the sequential version takes approximately 24 x T because each of the 24 steps must be processed one after another.

The average performance improves as more images are processed. The reason for this, as Figure 4 illustrates, is that some cores are idle as the pipeline fills during startup and drains during shutdown. With a large number of images, the startup and shutdown time becomes relatively insignificant. The average time per image would approach T.

However, there's one catch: the assumption that all the pipeline steps take exactly the same amount of time isn't always true. Figure 5 shows the scheduling diagram that occurs when the filter stage takes twice as long as the other stages.

Ff963548.cfdd41b9-37d6-4ab2-b1cb-5693bb3fbf6c-thumb(en-us,PandP.10).png

Figure 5

Image pipeline with unequal stages

When one of the stages takes 2 x T units of time while the other stages take T units of time, you can see that it's not possible to keep all of the cores completely busy. On average (with a large number of images), the time to process an images is 2 x T. In other words, when there are enough cores for each pipeline stage, the speed of a pipeline is approximately equal to the speed of its slowest stage.

If you run the ImagePipeline application in the Microsoft® Visual Studio® development system debugger, you can see this effect for yourself. The ImagePipeline sample has a user interface (UI) feature that reports the average length of time in milliseconds for each of the stages of the pipeline. It also reports the overall average length of time that's needed to process each image. When you run the sample in sequential mode (by selecting the Sequential radio button), you'll notice that the steady-state elapsed time per image equals the sum of all the stages. When you run in pipeline mode, the average elapsed time per image converges to approximately the same amount of time as slowest stage. The most efficient pipelines have stages of equal speed. You won't always achieve this, but it's a worthy goal.

Variations

There are several variations to the pipeline pattern.

Canceling a Pipeline

Pipeline tasks work together to perform their work; they must also work together when they respond to a cancellation.

In the standard cancellation scenario, which was explained in Chapter 3, your task is passed a CancellationToken value from a higher layer of the application, such as the UI. In the case of a pipeline stage, you need to observe this cancellation token in two places. This is shown in the following code.

void DoStage(BlockingCollection<T> input,
             BlockingCollection<T> output,
             CancellationToken token)
  try
  {
     foreach (var item in input.GetConsumingEnumerable())
     {
        if (token.IsCancellationRequested) break;
        var result = ...
        output.Add(result, token);
     }
  }
  catch (OperationCanceledException) { }
  finally
  {
     output.CompleteAdding();
  }
}

A natural place to check for cancellation is at the beginning of the loop that processes items from the blocking collection. At this point, you only need to break from the loop, as shown in the following code.

  if (token.IsCancellationRequested) break;

The second place is less obvious. You use an overloaded version of the BlockingCollection<T> class's Add method that accepts a cancellation token as an argument. The following code shows how to do this.

  output.Add(result, token);

If you give the Add method a cancellation token, the blocking collection will be able to detect the cancellation request while it's waiting to add a new value.

To understand why it's necessary to add such detailed cancellation logic, recall that it's possible that a pipeline stage that's creating values can be blocked when it calls the Add method of its output queue. (If the output queue is full, the call to Add waits until space is available before returning.) If a cancellation request happened in this situation and you didn't check for it, it would be possible for your program to experience deadlock. Deadlock could happen because stages of the pipeline that consume values produced by this stage can also be canceled; therefore, it's very possible that later stages in the pipeline would terminate without draining the queue. The producer, which is blocked while waiting for space to become available, would have no way to proceed.

Note

Improperly canceling a pipeline can cause your program to experience deadlock. Follow the guidelines in this section carefully.

The solution, as shown in the example, is to use the overloaded version of the blocking collection's Add method that accepts a cancellation token as an argument. If a cancellation is requested while the producer waits for space to become available, the blocking collection creates and throws an OperationCanceledException instance.

If the loop exits normally, from the break keyword, or from a cancellation exception, the finally clause guarantees that the output buffer will be marked as completed. This will unblock any consumers that might be waiting for input, and they can then process the cancellation request.

Although it's also possible to check for a cancellation token while waiting for input from a blocking collection (there's an overloaded version of the GetConsumingEnumerable method that accepts a cancellation token), you don't need to do this if you use the techniques described in this section.

Be aware that if the type T implements the IDispose interface, under .NET coding conventions, you also must call the Dispose method on cancellation. You need to dispose of the current iteration's object as well as instances of T stored in the blocking queues. The online source code of the ImagePipeline example shows how to do this.

Handling Pipeline Exceptions

Exceptions are similar to cancellations. The difference between the two is that when an unhandled exception occurs within one of the pipeline stages, the tasks that execute the other stages don't by default receive notification that an exception has occurred elsewhere. Without such notification, there are several ways for the application to deadlock.

Note

When there is an unhandled exception in one pipeline stage, you should cancel the other stages. If you don't do this, deadlock can occur. Follow the guidelines in this section carefully.

Use a special instantiation of the CancellationTokenSource class to allow your application to coordinate the shutdown of all the pipeline stages when an exception occurs in one of them. Here's an example.

static void DoPipeline(CancellationToken token)
{
  using (CancellationTokenSource cts = 
          CancellationTokenSource.CreateLinkedTokenSource(token))
  {
     var f = new TaskFactory(TaskCreationOptions.LongRunning,                                                      
                             TaskContinuationOptions.None);

     var stage1 = f.StartNew(() => DoStage1(..., cts));
     var stage2 = f.StartNew(() => DoStage2(..., cts));
     var stage3 = f.StartNew(() => DoStage3(..., cts));
     var stage4 = f.StartNew(() => DoStage4(..., cts));
  }
  Task.WaitAll(stage1, stage2, stage3, stage4);
}

The CreateLinkedTokenSource method of the CancellationTokenSource class creates a handle that allows you to respond to an external cancellation request and also to initiate (and recognize) an internal cancellation request of your own. You pass the linked cancellation token source as an argument to the methods that execute your pipeline stages, so each stage can use the token source to initiate cancellation.

Here's an example.

void DoStage(BlockingCollection<T> input,
             BlockingCollection<T> output,
             CancellationTokenSource cts)
  try
  {
     var token = cts.Token;
     foreach (var item in input.GetConsumingEnumerable())
     {
        if (token.IsCancellationRequested) break;
        var result = ...
        output.Add(result, token);
     }
  }
  catch (Exception e)
  {
    // If an exception occurs, notify all other pipeline stages.
    cts.Cancel(); 
    if (!(e is OperationCanceledException))
      throw;
  }
  finally
  {
     output.CompleteAdding();
  }
}

This code is similar to the cancellation variation described earlier, except that when an unhandled exception occurs, the exception is intercepted by the catch block, which also signals cancellation for all of the other pipeline stages. Consequently, each pipeline stage will begin an orderly shutdown.

After all pipeline tasks stop, the original exception, wrapped as an inner exception of an AggregateException instance, is thrown by the Task.WaitAll method. You should be sure to include a catch or finally block to do any cleanup, such as releasing handles to unmanaged resources.

You might ask why you can't just pass a CancellationTokenSource object to the DoPipeline method instead of a CancellationToken value. By convention, only the cancellation token is passed as an argument. Passing a CancellationToken value allows lower-level library code to respond to an external cancellation but prevents lower-level libraries from initiating the cancellation of higher-level components. In other words, responding to a cancellation request requires less privilege than initiating such a request.

Load Balancing Using Multiple Producers

The BlockingCollection<T> class allows you to read values from more than one producer. This feature is provided by the TakeFromAny static method and its variants. You can use TakeFromAny to implement load balancing strategies for some pipeline scenarios (but not all). This variation is sometimes known as a nonlinear pipeline.

The image pipeline example described earlier in this chapter requires that the slideshow of thumbnail images be performed in the same order as the input files. This is a constraint that's common to many pipeline scenarios, such as processing a series of video frames. However, in the case of the image pipeline example, the filter operations on successive images are independent of each other. In this case, you can insert an additional pipeline task. This is shown in Figure 6.

Ff963548.943a5147-bce7-4c85-9294-019f1177eec3(en-us,PandP.10).png

Figure 6

Consuming values from multiple producers

Figure 6 shows what happens when you add an additional filter task. Both of the filter tasks take images produced by the previous stage of the pipeline. The order in which they consume these images is not fully determined, although from a filter's local point of view, no input image ever arrives out of order.

Each of the filter stages has its own blocking collection for placing in queues the elements that it produces. The consumer of these queues is a component known as a multiplexer, which combines the inputs from all of the producers. The multiplexer allows its consumer, which in this case is the display stage of the pipeline, to receive the images in the correct sequential order. The images don't need to be sorted or reordered. Instead, the fact that each producer queue is locally ordered allows the multiplexer to look for the next value in the sequence by simultaneously monitoring the heads of all of the producer queues. This is where the blocking collection's TakeFromAny method comes into play. The method allows the multiplexer to block until any of the producer queues has a value to read.

Here's an example to make this more concrete. Suppose that each image has a unique sequence number that's available as a property. The image numbers start with 1 and increase sequentially. As Figure 6 shows, the first filter might process images that are numbered 1, 4, and 5, while the second filter processes images with sequence numbers 2, 3, 6, and 7. Each load-balanced filter stage collects its output images into two queues. The two output queues are correctly ordered (that is, no higher numbered image comes before a lower numbered image), but there are gaps in the sequence of numbers. For example, if you take values from the first filter's output queue, you get image 1, followed by image 4, followed by image 5. Images 2 and 3 are missing because they're found in the second filter's output queue.

The gaps are a problem. The next stage of the pipeline, the Display Image stage, needs to show images in order and without gaps in the sequence. This is where the multiplexer comes in. Using the TakeFromAny method, the multiplexer waits for input from both of the filter stage producer queues. When an image arrives, the multiplexer looks to see if the image's sequence number is the next in the expected sequence. If it is, the multiplexer passes it to the Display Image stage. If the image is not the next in the sequence, the multiplexer holds the value in an internal look-ahead buffer and repeats the take operation for the input queue that does not have a look-ahead value. This algorithm allows the multiplexer to put together the inputs from the incoming producer queues in a way that ensures sequential order without sorting the values.

Figure 7 shows the performance benefit of doubling the number of filter stages when the filter operation is twice as expensive as the other pipeline stages.

Ff963548.9e36c2aa-b0df-4c3f-b4d9-cacfd0bc3628-thumb(en-us,PandP.10).png

Figure 7

Image pipeline with load balancing

If all pipeline stages, except the filter stage, take T units of time to process an image, and the filter stage takes 2 x T units of time, using two filter stages and two producer queues to load balance the pipeline results in an overall speed of approximately T units of time per image as the number of images grows. If you run the ImagePipeline sample and select the Load Balanced radio button, you'll see this effect. The speed of the pipeline (after a suitable number of images are processed) will converge on the average time of the slowest single-instance stage or on one-half of the average filter time, whichever is greater.

The queue wait time of Queue 3, which is displayed on the ImagePipeline sample's UI, indicates the overhead that's introduced by waiting on multiple producer queues. This is an example of how adding overhead to a parallel computation can actually increase the overall speed if the change also allows more efficient use of the available cores.

Pipelines and Streams

You may have noticed that blocking collections and streams have some similarities. It's sometimes useful to treat a blocking collection as a stream, and vice versa. For example, you may want to use a Pipeline pattern with library methods that read and write to streams. Suppose that you want to compress a file and then encrypt it. Both compression and encryption are supported by the .NET Framework, but the methods expect streams, not blocking collections, as input.

It's possible to create a stream whose underlying implementation relies on tasks and a blocking collection. For more information, see the section, "Further Reading," at the end of this chapter.

Asynchronous Pipelines

The pipelines that have been described so far are synchronous. Producers and consumers are long-running tasks that block on inputs and outputs. You could also have an asynchronous pipeline, where tasks aren't created until data becomes available. For that, you could use the AsyncCall class that's found in the ParallelExtensionsExtras sample project. (For information about where to get this sample, see the section, "Further Reading," at the end of this chapter.) The AsyncCall class is a queue that a producer puts data into; if there's currently no task processing the queue when data arrives, a new task is spun up to process the queue, and it is active as long as there's incoming data to process. If it ever finds that there is no more data, the task goes away. If more data arrives, a new task starts.

Anti-Patterns

There are a few things to watch out for when implementing a pipeline.

Thread Starvation

A pipeline requires all of its tasks to be executing concurrently. If there are not enough threads to run all pipeline tasks, the blocking collections can fill and block indefinitely. Task inlining, which was described in Chapter 3, doesn't help. To guarantee that a thread will be available to run each pipeline task, you can use the default task scheduler's LongRunning task creation option. If you forget this option and are using the default task scheduler, you're still safe. The thread pool will eventually notice that early stages of the pipeline are not making progress and inject the additional threads that are needed for the later pipeline stages.

Infinite Blocking Collection Waits

If a pipeline task throws an exception, it will no longer take values from its input blocking collection. If that blocking collection happens to be full, the task that writes values to that collection will be blocked indefinitely. You can avoid this situation by using the technique that was described in the section, "Canceling a Pipeline," earlier in this chapter.

Forgetting GetConsumingEnumerable()

Blocking collections implement IEnumerable<T>, so it's easy to forget to call the GetConsumingEnumerable method. If you make this mistake, the enumeration will be a snapshot of the blocking collection's state, and enumerating the results won't consume from the collection or modify it in any way, which means that multiple consumers could get the same items.

Note

It's easy to forget to call the GetConsumingEnumerable because the BlockingCollection class implements IEnumerable<T>. Enumerating over the blocking collection instance won't consume values. Watch out for this!

Using Other Producer/Consumer Collections

The BlockingCollection<T> class uses a concurrent queue as its default storage mechanism. However, you can also specify your own storage mechanism. The only requirement is that the underlying storage must implement the IProducerConsumerCollection interface. In general, using the blocking collections provided by the .NET Framework is easier and safer than writing your own implementation.

The .NET Framework provides several implementations of the IProducerConsumerCollection interface. These include the ConcurrentBag and the ConcurrentStack classes. Therefore, in principle, you could use bag (unordered) or stack (last-in, first-out [LIFO]) semantics for the buffers between your pipeline's stages.

Generally, only the default first-in, first-out (FIFO) ordering is recommended. If you use a concurrent bag, the outputs of your pipeline stages don't depend on order. In this case, a parallel loop could be used instead of a pipeline. Parallel loops are faster and easier to code. The fact that you are using unordered buffers is evidence that you're using the wrong pattern.

Design Notes

When you use the Pipeline pattern to decompose a problem, you need to consider how many pipeline stages to use. This depends on the number of cores you expect to have available at run time. Unlike most of the other patterns in this book, the Pipeline pattern doesn't automatically scale with the number of cores. This is one of its limitations, unless additional parallelism is introduced within a pipeline stage itself.

More stages work well unless the overhead of adding and removing elements from the buffers becomes significant. This is usually only a problem for stages that perform very small amounts of work.

To achieve a high degree of parallelism, you need to be careful that all the stages in the pipeline take approximately the same amount of time to perform their work. If they don't, the pipeline will be gated by the slowest component and may experience processor undersubscription.

Buffer size is also important for overall performance. Very small buffers will block the pipeline, especially if their buffer elements take a variable amount of processing time. Allocating larger buffers accommodates the variability in processing time. The size of the buffer can also depend on the size of the objects it contains. You would probably want to use a buffer with fewer entries if each element contained an object such as a large bitmapped image that required a great deal of memory.

In general, buffers should be large enough to absorb variability in the pipeline flow, but no larger. Use the profiler to understand the throughput characteristics of the pipeline and modify the buffer sizes to minimize the amount of time each stage is blocked by I/O waits.

Related Patterns

Pipelines have a lot in common with operating system concepts of pipes and filters. Pipelines are also related to streaming concepts.

Pipelines are expressions of a general technique known as producer/consumer. The pipeline is composed of a series of producer/consumers, each one depending on the output of its predecessor.

Exercises

  1. Write your own pipeline by modifying the example shown in the first section of this chapter.
  2. Execute the code with the Concurrency Visualizer. View and interpret the results.

Further Reading

The use of streams that are implemented on top of tasks and blocking collections is discussed by Toub. This reference uses the term "transfer stream" for this concept. Multiplexing inputs from multiple producer queues is covered by Campbell. A description of the pipes and filters pattern used by command shells for operating systems is described by Buschmann. The Microsoft Parallel Samples package is ParExtSamples on MSDN.

F. Buschmann, R. Meunier, H. Rohnert, P. Sommerlad, and M. Stal. Pattern-Oriented Software Architecture Volume 1: A System of Patterns. Wiley, 1996.

C. Campbell, M. Veanes, J. Huo, and A. Petrenko. "Multiplexing of Partially Ordered Events." TestCom 2005, Springer Verlag, June 2005.
https://research.microsoft.com/apps/pubs/default.aspx?id=77808

S. Toub. "Patterns of Parallel Programming: Understanding and Applying Parallel Patterns with the .NET Framework 4." 2009.
https://www.microsoft.com/downloads/details.aspx?FamilyID=86b3d32b-ad26-4bb8-a3ae-c1637026c3ee&displaylang=en

ParExtSamples software. "Samples for Parallel Programming with the .NET Framework 4."
https://code.msdn.microsoft.com/ParExtSamples

Next | Previous | Home | Community