Практическое руководство. Использование фильтра блоков сообщений

В этом документе показано, как использовать функцию фильтра, чтобы включить асинхронный блок сообщений для принятия или отклонения сообщения на основе полезных данных этого сообщения.

При создании объекта блока сообщений, например параллелизма::unbounded_buffer, параллелизма::call или параллелизма::преобразователя, можно указать функцию фильтра, которая определяет, принимает ли блок сообщения или отклоняет сообщение. Функция фильтра — это удобный способ гарантировать, что блок сообщений получает только определенные значения.

Функции фильтрации важны, так как они позволяют подключать блоки сообщений к сетям потоков данных. В сети потока данных блоки сообщений управляют потоком данных путем обработки только тех сообщений, которые соответствуют определенным критериям. Сравните это с моделью потока управления, где поток данных регулируется с помощью структур управления, таких как условные операторы, циклы и т. д.

В этом документе представлен базовый пример использования фильтра сообщений. Дополнительные примеры, использующие фильтры сообщений и модель потока данных для подключения блоков сообщений, см. в пошаговом руководстве. Создание агента потока данных и пошагового руководства. Создание сети обработки изображений.

Пример: функция count_primes

Рассмотрим следующую функцию, которая иллюстрирует базовое использование блока сообщений, count_primesкоторый не фильтрует входящие сообщения. Блок сообщения добавляет простые числа к объекту std::vector . Функция count_primes отправляет несколько чисел в блок сообщений, получает выходные значения из блока сообщений и выводит эти цифры, которые являются основными в консоли.

// Illustrates usage of a message buffer that does not use filtering.
void count_primes(unsigned long random_seed)
{
    // Holds prime numbers.
    vector<unsigned long> primes;

    // Adds numbers that are prime to the vector object.
    transformer<unsigned long, unsigned long> t([&primes](unsigned long n) -> unsigned long
    {
        if (is_prime(n))
        {
            primes.push_back(n);
        }
        return n;
    });

    // Send random values to the message buffer.
    mt19937 generator(random_seed);
    for (int i = 0; i < 20; ++i)
    {
        send(t, static_cast<unsigned long>(generator()%10000));
    }

    // Receive from the message buffer the same number of times
    // to ensure that the message buffer has processed each message.
    for (int i = 0; i < 20; ++i)
    {
        receive(t);
    }

    // Print the prime numbers to the console.
    wcout << L"The following numbers are prime: " << endl;
    for(unsigned long prime : primes)
    {
        wcout << prime << endl;
    }
}

Объект transformer обрабатывает все входные значения, однако он требует только тех значений, которые являются простыми. Хотя приложение может быть записано таким образом, чтобы отправитель сообщения отправлял только простые номера, требования получателя сообщений всегда не могут быть известны.

Пример: функция count_primes_filter

Следующая функция выполняет ту же задачу, count_primes_filterчто count_primes и функция. transformer Однако объект в этой версии использует функцию фильтра, чтобы принимать только те значения, которые являются простыми. Функция, выполняющая действие, получает только простые числа; таким образом, не требуется вызывать функцию is_prime .

transformer Так как объект получает только простые числа, transformer сам объект может содержать простые числа. Другими словами, transformer объект в этом примере не требуется для добавления простых чисел в vector объект.

// Illustrates usage of a message buffer that uses filtering.
void count_primes_filter(unsigned long random_seed)
{
    // Accepts numbers that are prime.
    transformer<unsigned long, unsigned long> t([](unsigned long n) -> unsigned long
    {
        // The filter function guarantees that the input value is prime.
        // Return the input value.
        return n;
    },
    nullptr,
    [](unsigned long n) -> bool
    {
        // Filter only values that are prime.
        return is_prime(n);
    });

    // Send random values to the message buffer.
    mt19937 generator(random_seed);
    size_t prime_count = 0;
    for (int i = 0; i < 20; ++i)
    {
        if (send(t, static_cast<unsigned long>(generator()%10000)))
        {
            ++prime_count;
        }
    }

    // Print the prime numbers to the console. 
    wcout << L"The following numbers are prime: " << endl;
    while (prime_count-- > 0)
    {
        wcout << receive(t) << endl;
    }
}

Теперь transformer объект обрабатывает только те значения, которые являются простыми. В предыдущем примере transformer объект обрабатывает все сообщения. Таким образом, предыдущий пример должен получать то же количество сообщений, которые он отправляет. В этом примере используется результат функции параллелизма::send , чтобы определить количество сообщений, получаемых от transformer объекта. Функция send возвращается true , когда буфер сообщения принимает сообщение и false когда буфер сообщения отклоняет сообщение. Таким образом, количество раз, когда буфер сообщения принимает сообщение, соответствует количеству простых чисел.

Пример: пример кода фильтра блока сообщения завершен

Ниже приведен полный пример кода. В примере вызывается как функция, так count_primes и count_primes_filter функция.

// primes-filter.cpp
// compile with: /EHsc
#include <agents.h>
#include <algorithm>
#include <iostream>
#include <random>

using namespace concurrency;
using namespace std;

// Determines whether the input value is prime.
bool is_prime(unsigned long n)
{
    if (n < 2)
        return false;
    for (unsigned long i = 2; i < n; ++i)
    {
        if ((n % i) == 0)
            return false;
    }
    return true;
}

// Illustrates usage of a message buffer that does not use filtering.
void count_primes(unsigned long random_seed)
{
    // Holds prime numbers.
    vector<unsigned long> primes;

    // Adds numbers that are prime to the vector object.
    transformer<unsigned long, unsigned long> t([&primes](unsigned long n) -> unsigned long
    {
        if (is_prime(n))
        {
            primes.push_back(n);
        }
        return n;
    });

    // Send random values to the message buffer.
    mt19937 generator(random_seed);
    for (int i = 0; i < 20; ++i)
    {
        send(t, static_cast<unsigned long>(generator()%10000));
    }

    // Receive from the message buffer the same number of times
    // to ensure that the message buffer has processed each message.
    for (int i = 0; i < 20; ++i)
    {
        receive(t);
    }

    // Print the prime numbers to the console.
    wcout << L"The following numbers are prime: " << endl;
    for(unsigned long prime : primes)
    {
        wcout << prime << endl;
    }
}

// Illustrates usage of a message buffer that uses filtering.
void count_primes_filter(unsigned long random_seed)
{
    // Accepts numbers that are prime.
    transformer<unsigned long, unsigned long> t([](unsigned long n) -> unsigned long
    {
        // The filter function guarantees that the input value is prime.
        // Return the input value.
        return n;
    },
    nullptr,
    [](unsigned long n) -> bool
    {
        // Filter only values that are prime.
        return is_prime(n);
    });

    // Send random values to the message buffer.
    mt19937 generator(random_seed);
    size_t prime_count = 0;
    for (int i = 0; i < 20; ++i)
    {
        if (send(t, static_cast<unsigned long>(generator()%10000)))
        {
            ++prime_count;
        }
    }

    // Print the prime numbers to the console. 
    wcout << L"The following numbers are prime: " << endl;
    while (prime_count-- > 0)
    {
        wcout << receive(t) << endl;
    }
}

int wmain()
{
    const unsigned long random_seed = 99714;

    wcout << L"Without filtering:" << endl;
    count_primes(random_seed);

    wcout << L"With filtering:" << endl;
    count_primes_filter(random_seed);

    /* Output:
    9973
    9349
    9241
    8893
    1297
    7127
    8647
    3229
    With filtering:
    The following numbers are prime:
    9973
    9349
    9241
    8893
    1297
    7127
    8647
    3229
    */
}

Компиляция кода

Скопируйте пример кода и вставьте его в проект Visual Studio или вставьте его в файл с именем primes-filter.cpp , а затем выполните следующую команду в окне командной строки Visual Studio.

cl.exe /EHsc primes-filter.cpp

Отказоустойчивость

Функция фильтра может быть лямбда-функцией, указателем функции или объектом функции. Каждая функция фильтра принимает одну из следующих форм:

bool (T)
bool (T const &)

Чтобы исключить ненужные копии данных, используйте вторую форму при наличии агрегатного типа, передаваемого по значению.

См. также

Библиотека асинхронных агентов
Пошаговое руководство. Создание агента потоков данных
Пошаговое руководство. Создание сети обработки изображений
Класс transformer