How to: Create a Blocking Queue Using Agents

Applies to: Functional Programming

Authors: Tomas Petricek and Jon Skeet

Referenced Image

Get this book in Print, PDF, ePub and Kindle at manning.com. Use code “MSDN37b” to save 37%.

Summary: This article shows how to solve the consumer-producer problem using agents. It implements a queue that blocks the producer when the queue is full and blocks the consumer when the queue is empty.

This topic contains the following sections.

  • Introducing the Consumer-Producer Problem
  • Implementing a Blocking Queue
  • Programming with a Blocking Queue
  • Summary
  • Additional Resources
  • See Also

This article is associated with Real World Functional Programming: With Examples in F# and C# by Tomas Petricek with Jon Skeet from Manning Publications (ISBN 9781933988924, copyright Manning Publications 2009, all rights reserved). No part of these chapters may be reproduced, stored in a retrieval system, or transmitted in any form or by any means—electronic, electrostatic, mechanical, photocopying, recording, or otherwise—without the prior written permission of the publisher, except in the case of brief quotations embodied in critical articles or reviews.

Introducing the Consumer-Producer Problem

This article discusses how to solve the so-called consumer-producer problem in F#. This is a well-known concurrency pattern. The idea is that an application consists of two components—a consumer and a producer. The producer generates some values and stores them into a store. The consumer picks items from the store and processes them. When the store is empty, the consumer needs to block until more items become available. Similarly, when the store is full, the producer needs to stop until the consumer removes some items.

This may sound like an abstract problem but it's actually a very common task in concurrent programming. It can be viewed as a design pattern for synchronization of two components. For example, this pattern can be used to connect multiple components into a pipeline. Figure 1 demonstrates such a pipeline.

Figure 1. A processing pipeline with four steps and three stores

Referenced Image

Each step in a pipeline consumes values from the previous one and produces a new value for the next step. In this programming model, the size of the store controls how much work is done in parallel. For example, in Figure 1, the buffer can only contain four temporary items so, when a buffer is filled, the producer is blocked until some values are consumed. The rest of the article looks at how to implement the store, which is usually called a blocking queue.

Implementing a Blocking Queue

The interface of the blocking queue is quite simple. It has a member Add that adds a new value and a member Get that reads a value. Both of the members can block the caller. The blocking is implemented using asynchronous workflows to avoid blocking an actual physical thread. The next section starts with the internal message type used in the agent.

Defining Message

When adding or removing elements, the caller needs to wait for a confirmation from the queue. This means that the messages sent to the agent need to carry AsyncReplyChannel<'T>, which is used for sending notification back:

type internal BlockingAgentMessage<'T> = 
  | Get of AsyncReplyChannel<'T>
  | Add of 'T * AsyncReplyChannel<unit> 

When getting an element from the queue, the reply channel carries a value of 'T, which is the element removed from the queue. When adding an element to the queue, the message carries the element to be added (value of type 'T) and also a reply channel that sends back a unit value to confirm that the element was added. As shown later, the Add member can be called using the do! syntax of asynchronous workflows, which doesn't declare a new value (because unit doesn't carry any information), but it blocks until the asynchronous workflow completes.

Creating Blocking Queue Type

The message type is a useful guide for implementing the agent. The next important aspect to consider is the agent states. The blocking queue has three different states. In each state, it can handle a different set of messages:

  • When the agent is empty, it can only handle an Add message.

  • When it is full, it can only handle a Get message.

  • When it is not empty but also not full, it can handle both messages.

This behavior can be implemented using a state machine with three states implemented as three mutually recursive, asynchronous functions. The idea is quite simple, but there is more information about this technique in the references at the end of the article.

The article first shows the declaration of the BlockingQueueAgent type and then adds the body. The following listing omits the body of the agent. It just contains three functions (emptyQueue, fullQueue, and runningQueue) that will represent the three states of the agent. As usual, in this introduction, the snippet uses a convenient type alias Agent<'T>, which stands for MailboxProcessor<'T>.

/// Agent-based implementation of producer/consumer problem 
type BlockingQueueAgent<'T>(maxLength) =
    let agent = Agent.Start(fun agent ->
        let queue = new Queue<_>()
        // State machine running inside the agent
        let rec emptyQueue() = (...)
        and fullQueue() = (...)
        and runningQueue() = (...)

        // Start with an empty queue
        emptyQueue() )

    /// Asynchronously adds item to the queue. If the queue
    /// is full, it blocks until some items are removed.
    member x.AsyncAdd(v:'T) = 
        agent.PostAndAsyncReply(fun ch -> Add(v, ch))

    /// Asynchronously gets item from the queue. If the queue
    /// is empty, it blocks until some items are added.
    member x.AsyncGet() = 
        agent.PostAndAsyncReply(Get)

The type takes a single constructor argument maxLength, which represents the maximum size of the queue. The argument must be larger than 0, but it can also be 1 (in that case, the queue is always either empty or full). When the queue is created, it starts an agent. The implementation of the agent is discussed in the next section, but the above snippet shows that the agent is initially in the empty state.

The two members of the type are both implemented using the PostAndAsyncReply method of the agent. The method creates a channel and passes it to the function specified as an argument to the method. In case of AsyncAdd, the function creates the Add message by combining the value with the created channel. The Get message only expects a channel as an argument, so the lambda function doesn't have to be written explicitly. Both of the members are asynchronous. The return type of AsyncGet is Async<'T>, and the return type of AsyncAdd is Async<unit>. It would also be possible to add synchronous versions of the methods. This article doesn't do that to indicate that only the operations from asynchronous computations should be used.

The only piece that needs to be added before testing the agent is the body that implements a state machine running inside the agent.

Implementing a State Machine

As already explained, the agent can handle both of the messages only when it is not empty and also not full. This state is represented by the runningQueue function. In this state, the agent can receive any message, so it calls the Receive operation. When the agent is empty or full, it cannot handle Get and Add, respectively. In that case, the processing of messages in the state can be encoded using the Scan operation:

let rec emptyQueue() = 
    agent.Scan(fun msg ->
        match msg with 
        | Add(value, reply) -> Some(enqueueAndContinue(value, reply))
        | _ -> None )
and fullQueue() = 
    agent.Scan(fun msg ->
        match msg with 
        | Get(reply) -> Some(dequeueAndContinue(reply))
        | _ -> None )
and runningQueue() = async {
    let! msg = agent.Receive() 
    match msg with 
    | Add(value, reply) -> return! enqueueAndContinue(value, reply)
    | Get(reply) -> return! dequeueAndContinue(reply) }

The Scan operation provides a way to specify that the agent is only interested in some messages. For example, in the emptyQueue function, the Scan operation calls the given lambda function to check if the agent can handle a received message. When the message is Add, the lambda function returns Some, containing an asynchronous computation that should be executed. For any other message, the function returns None and so the message is kept in the queue for later. The fullQueue function is very similar.

In the runningQueue state, the agent receives any message that arrives and then handles it. The code to handle the adding or removing of elements and sending a notification back to the caller needs to be repeated a couple of times, so it is refactored into two functions, dequeueAndContinue and enqueueAndContinue. They update the queue, notify the caller and then choose the new state depending on the number of elements in the queue:

and enqueueAndContinue (value, reply) = async {
    queue.Enqueue(value)
    reply.Reply() 
    return! chooseState() }
and dequeueAndContinue (reply) = async { 
    reply.Reply(queue.Dequeue())
    return! chooseState() }
and chooseState() = 
    if queue.Count = 0 then emptyQueue()
    elif queue.Count = maxLength then fullQueue()
    else runningQueue()

The enqueueAndContinue function adds the element to the queue, notifies the caller to unblock it, and then calls chooseState to resume the agent in one of the three states. The dequeueAndContinue function takes the element, sends it to the caller, and then resumes the agent. When resuming the agent, the chooseState function first checks for the two corner cases (emptyQueue or fullQueue); otherwise, it continues in the runningQueue state.

The tree parts of the source code can be copied into a single F# source file to get a fully working blocking queue. Alternatively, the complete source code is also available at the end of the document. The next section shows a simple producer consumer demo that uses the BlockingQueueAgent<'T> type.

Programming with a Blocking Queue

At the beginning of the article, Figure 1 demonstrated a pipeline consisting of blocking queues and processes that take elements from one queue, perform some calculation, and put results into the next queue. In F#, the processes of a pipeline can be implemented as asynchronous workflows.

The following example implements two workflows that communicate via a single BlockingQueueAgent. The first workflow generates numbers from 1 to 10 and adds them to the queue. The second workflow gets elements from the queue and then does some work (for simplicity, it just blocks the workflow for 1 second and then prints the number):

let ag = new BlockingQueueAgent<int>(3)

let writer() = async { 
    for i in 0 .. 10 do 
        do! ag.AsyncAdd(i)
        printfn "Added: %d" i }

let reader () = async { 
    while true do
        let! v = ag.AsyncGet()
        do! Async.Sleep(1000)
        printfn "Got: %d" v }

reader () |> Async.Start
writer () |> Async.Start

Although the sample is very basic, it demonstrates a typical structure of pipeline-based processing. The first step is to initialize the blocking queue; the second step is to define workflows that get or add elements from or to the queue, and the last step is to start the workflows on a background thread.

When the example is executed, the writer workflow starts adding elements to the queue until the queue becomes full. This is done by calling the AsyncAdd method using the do! syntax, which means that the call will block the workflow when the queue cannot add another element and the workflow will wait until AsyncGet completes. If the generation of elements takes longer than the consumption (or if there were multiple consumers), the reader workflow could block when calling AsyncGet.

Summary

This article presented an implementation of a blocking queue using agents. A blocking queue is a thread-safe store that provides operations for the adding and taking of elements. If an operation cannot be completed, the caller is blocked until the operation is enabled. When implementing the queue using F# agents, it is possible to avoid blocking the actual system threads by exposing operations as values of the Async<'T> type.

As the last example demonstrated, the blocking operations can be called using let! or do! syntax from asynchronous workflows. The fact that the operations do not block an actual system thread (just a logical execution of the workflow) means that code written using the BlockingQueueAgent type scales extremely well. Using a large number of such agents with a large number of concurrently executing asynchronous workflows (that may be blocked for arbitrarily long time) doesn't hurt the performance of the system.

Additional Resources

This article discussed how to implement an agent that is useful for encoding a very common pattern called the consumer-producer problem. For more information about agents and server-side programming in F#, see the following articles:

The following How-to article gives another example of a reusable agent:

To download the code snippets shown in this article, go to https://code.msdn.microsoft.com/Chapter-2-Concurrent-645370c3

See Also

This article is based on Real World Functional Programming: With Examples in F# and C#. Book chapters related to the content of this article are:

  • Book Chapter 13: “Asynchronous and data-driven programming” explains how asynchronous workflows work and uses them to write an interactive script that downloads a large dataset from the Internet.

  • Book Chapter 14: “Writing parallel functional programs” explains how to use the Task Parallel Library to write data-parallel and task-based parallel programs. This approach complements agent-based parallelism in F#.

  • Book Chapter 16: “Developing reactive functional programs” discusses how to write reactive user interfaces using asynchronous workflows and events. This approach is related to agents but more suitable for creating user interfaces.

The following MSDN documents are related to the topic of this article:

  • Asynchronous Workflows (F#) explains how to use F# asynchronous workflows for writing efficient computations that do not block the execution of other work.

  • BlockingCollection<T> Class discusses a concurrent collection type that implements the producer consumer pattern in .NET 4.0. The type is highly optimized but doesn't expose operations as F# asynchronous workflows.

  • Image pipeline using agents (Tomas Petricek's blog) gives an example of using the BlockingQueueAgent type for implementing an image-processing pipeline that consists of multiple steps.

Previous article: How to: Create Reusable Agents

Next article: How to: Create an Agent for Batch Processing