How to: Use a Message Block Filter

This document demonstrates how to use a filter function to enable an asynchronous message block to accept or reject a message on the basis of the payload of that message.

When you create a message block object such as a concurrency::unbounded_buffer, a concurrency::call, or a concurrency::transformer, you can supply a filter function that determines whether the message block accepts or rejects a message. A filter function is a useful way to guarantee that a message block receives only certain values.

Filter functions are important because they enable you to connect message blocks to form dataflow networks. In a dataflow network, message blocks control the flow of data by processing only those messages that meet specific criteria. Compare this to the control-flow model, where the flow of data is regulated by using control structures such as conditional statements, loops, and so on.

This document provides a basic example of how to use a message filter. For additional examples that use message filters and the dataflow model to connect message blocks, see Walkthrough: Creating a Dataflow Agent and Walkthrough: Creating an Image-Processing Network.

Example: count_primes function

Consider the following function, count_primes, which illustrates the basic usage of a message block that does not filter incoming messages. The message block appends prime numbers to a std::vector object. The count_primes function sends several numbers to the message block, receives the output values from the message block, and prints those numbers that are prime to the console.

// Illustrates usage of a message buffer that does not use filtering.
void count_primes(unsigned long random_seed)
{
    // Holds prime numbers.
    vector<unsigned long> primes;

    // Adds numbers that are prime to the vector object.
    transformer<unsigned long, unsigned long> t([&primes](unsigned long n) -> unsigned long
    {
        if (is_prime(n))
        {
            primes.push_back(n);
        }
        return n;
    });

    // Send random values to the message buffer.
    mt19937 generator(random_seed);
    for (int i = 0; i < 20; ++i)
    {
        send(t, static_cast<unsigned long>(generator()%10000));
    }

    // Receive from the message buffer the same number of times
    // to ensure that the message buffer has processed each message.
    for (int i = 0; i < 20; ++i)
    {
        receive(t);
    }

    // Print the prime numbers to the console.
    wcout << L"The following numbers are prime: " << endl;
    for(unsigned long prime : primes)
    {
        wcout << prime << endl;
    }
}

The transformer object processes all input values; however, it requires only those values that are prime. Although the application could be written so that the message sender sends only prime numbers, the requirements of the message receiver cannot always be known.

Example: count_primes_filter function

The following function, count_primes_filter, performs the same task as the count_primes function. However, the transformer object in this version uses a filter function to accept only those values that are prime. The function that performs the action receives only prime numbers; therefore, it does not have to call the is_prime function.

Because the transformer object receives only prime numbers, the transformer object itself can hold the prime numbers. In other words, the transformer object in this example is not required to add the prime numbers to the vector object.

// Illustrates usage of a message buffer that uses filtering.
void count_primes_filter(unsigned long random_seed)
{
    // Accepts numbers that are prime.
    transformer<unsigned long, unsigned long> t([](unsigned long n) -> unsigned long
    {
        // The filter function guarantees that the input value is prime.
        // Return the input value.
        return n;
    },
    nullptr,
    [](unsigned long n) -> bool
    {
        // Accept only values that are prime; reject all others.
        return is_prime(n);
    });

    // Send random values to the message buffer.
    mt19937 generator(random_seed);
    size_t prime_count = 0;
    for (int i = 0; i < 20; ++i)
    {
        if (send(t, static_cast<unsigned long>(generator()%10000)))
        {
            ++prime_count;
        }
    }

    // Print the prime numbers to the console. 
    wcout << L"The following numbers are prime: " << endl;
    while (prime_count-- > 0)
    {
        wcout << receive(t) << endl;
    }
}

The transformer object now processes only those values that are prime. In the previous example, the transformer object processes all messages, so that example must receive the same number of messages that it sends. This example instead uses the result of the concurrency::send function to determine how many messages to receive from the transformer object. The send function returns true when the message buffer accepts the message and false when the message buffer rejects it. Therefore, the number of times that send returns true matches the count of prime numbers.
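The accept/reject contract described above can be sketched in portable C++ without the Concurrency Runtime. The filtered_block type and the offer, take, and offer_all names below are hypothetical stand-ins invented for illustration; they are not part of agents.h. Unlike a real message block, this sketch is synchronous and not thread-safe.

```cpp
#include <cstddef>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical stand-in for a filtered message block.
// This is NOT a Concurrency Runtime type; it only models the contract.
template <typename T>
class filtered_block
{
public:
    using filter_type = std::function<bool(T const&)>;

    explicit filtered_block(filter_type filter) : filter_(std::move(filter)) {}

    // Plays the role of send: returns true if the filter accepts the
    // value (which is then stored), and false if the filter rejects it.
    bool offer(T const& value)
    {
        if (filter_ && !filter_(value))
        {
            return false;
        }
        accepted_.push(value);
        return true;
    }

    // Plays the role of receive: removes and returns the oldest accepted
    // value. Precondition: pending() > 0.
    T take()
    {
        T value = accepted_.front();
        accepted_.pop();
        return value;
    }

    // Number of accepted values that have not been taken yet.
    std::size_t pending() const { return accepted_.size(); }

private:
    filter_type filter_;
    std::queue<T> accepted_;
};

// Offers every value and counts the acceptances, in the same way that
// count_primes_filter counts the calls to send that return true.
template <typename T>
std::size_t offer_all(filtered_block<T>& block, std::vector<T> const& values)
{
    std::size_t accepted = 0;
    for (T const& value : values)
    {
        if (block.offer(value))
        {
            ++accepted;
        }
    }
    return accepted;
}
```

Because the caller counts acceptances as it sends, it knows exactly how many values to take afterward and never waits for a message that was rejected.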

Example: Finished message block filter code sample

The following code shows the complete example. The example calls both the count_primes function and the count_primes_filter function.

// primes-filter.cpp
// compile with: /EHsc
#include <agents.h>
#include <algorithm>
#include <iostream>
#include <random>
#include <vector>

using namespace concurrency;
using namespace std;

// Determines whether the input value is prime.
bool is_prime(unsigned long n)
{
    if (n < 2)
        return false;
    for (unsigned long i = 2; i < n; ++i)
    {
        if ((n % i) == 0)
            return false;
    }
    return true;
}

// Illustrates usage of a message buffer that does not use filtering.
void count_primes(unsigned long random_seed)
{
    // Holds prime numbers.
    vector<unsigned long> primes;

    // Adds numbers that are prime to the vector object.
    transformer<unsigned long, unsigned long> t([&primes](unsigned long n) -> unsigned long
    {
        if (is_prime(n))
        {
            primes.push_back(n);
        }
        return n;
    });

    // Send random values to the message buffer.
    mt19937 generator(random_seed);
    for (int i = 0; i < 20; ++i)
    {
        send(t, static_cast<unsigned long>(generator()%10000));
    }

    // Receive from the message buffer the same number of times
    // to ensure that the message buffer has processed each message.
    for (int i = 0; i < 20; ++i)
    {
        receive(t);
    }

    // Print the prime numbers to the console.
    wcout << L"The following numbers are prime: " << endl;
    for(unsigned long prime : primes)
    {
        wcout << prime << endl;
    }
}

// Illustrates usage of a message buffer that uses filtering.
void count_primes_filter(unsigned long random_seed)
{
    // Accepts numbers that are prime.
    transformer<unsigned long, unsigned long> t([](unsigned long n) -> unsigned long
    {
        // The filter function guarantees that the input value is prime.
        // Return the input value.
        return n;
    },
    nullptr,
    [](unsigned long n) -> bool
    {
        // Accept only values that are prime; reject all others.
        return is_prime(n);
    });

    // Send random values to the message buffer.
    mt19937 generator(random_seed);
    size_t prime_count = 0;
    for (int i = 0; i < 20; ++i)
    {
        if (send(t, static_cast<unsigned long>(generator()%10000)))
        {
            ++prime_count;
        }
    }

    // Print the prime numbers to the console. 
    wcout << L"The following numbers are prime: " << endl;
    while (prime_count-- > 0)
    {
        wcout << receive(t) << endl;
    }
}

int wmain()
{
    const unsigned long random_seed = 99714;

    wcout << L"Without filtering:" << endl;
    count_primes(random_seed);

    wcout << L"With filtering:" << endl;
    count_primes_filter(random_seed);

    /* Output:
    Without filtering:
    The following numbers are prime:
    9973
    9349
    9241
    8893
    1297
    7127
    8647
    3229
    With filtering:
    The following numbers are prime:
    9973
    9349
    9241
    8893
    1297
    7127
    8647
    3229
    */
}
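The is_prime helper in the listing tries every divisor below n, which is adequate for values under 10000. As an optional refinement, the following sketch stops at the square root of n and skips even divisors. The name is_prime_sqrt is hypothetical and is not part of the original sample; it is intended to return the same results as is_prime.

```cpp
// Hypothetical faster variant of is_prime (not part of the original sample).
bool is_prime_sqrt(unsigned long n)
{
    if (n < 2)
        return false;   // 0 and 1 are not prime
    if (n < 4)
        return true;    // 2 and 3 are prime
    if (n % 2 == 0)
        return false;   // even numbers greater than 2 are not prime

    // Test odd divisors up to sqrt(n). Writing the bound as i <= n / i
    // avoids the overflow that i * i <= n could cause for large n.
    for (unsigned long i = 3; i <= n / i; i += 2)
    {
        if (n % i == 0)
            return false;
    }
    return true;
}
```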

Compiling the Code

Copy the example code and paste it in a Visual Studio project, or paste it in a file that is named primes-filter.cpp and then run the following command in a Visual Studio Command Prompt window.

cl.exe /EHsc primes-filter.cpp

Robust Programming

A filter function can be a lambda function, a function pointer, or a function object. Every filter function takes one of the following forms:

bool (T)
bool (T const &)

To avoid unnecessary copying of data, use the second form when the message payload is an aggregate type that would otherwise be passed to the filter by value.
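The three kinds of callable can be illustrated in portable C++ as follows. The reading type, the reading_filter alias, and the is_positive, above_threshold, and from_sensor names are all hypothetical; the alias uses std::function<bool(reading const&)> as an assumed stand-in for the second filter form and is not taken from agents.h.

```cpp
#include <functional>
#include <string>
#include <utility>

// Hypothetical aggregate payload, used only for illustration.
struct reading
{
    std::string sensor;
    double value;
};

// Assumed stand-in for the second filter form, bool (T const &).
// Taking the payload by const reference avoids copying the string member.
using reading_filter = std::function<bool(reading const&)>;

// 1. A free function, usable as a function pointer.
bool is_positive(reading const& r)
{
    return r.value > 0.0;
}

// 2. A function object that carries its own state.
struct above_threshold
{
    double threshold;

    bool operator()(reading const& r) const
    {
        return r.value > threshold;
    }
};

// 3. A lambda function that captures the sensor name to match.
reading_filter from_sensor(std::string name)
{
    return [name = std::move(name)](reading const& r)
    {
        return r.sensor == name;
    };
}
```

Each of the three converts to reading_filter, so code that stores or invokes the filter does not need to know which kind of callable it was given.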

See also

Asynchronous Agents Library
Walkthrough: Creating a Dataflow Agent
Walkthrough: Creating an Image-Processing Network
transformer Class