如何:使用消息块筛选器
本文档演示如何使用筛选功能来使异步消息块能够根据消息的负载接受或拒绝此消息。
在创建消息块对象(例如 concurrency::unbounded_buffer、 concurrency::call 或 concurrency::transformer)时,您可以提供可确定消息块是接受还是拒绝消息的“筛选功能”。 筛选功能很有用,它可以保证消息块只接收特定值。
筛选功能很重要,因为它们使您能够连接消息块以形成“数据流网络”。 在数据流网络中,消息块通过只处理那些满足特定条件的消息来控制数据流。 这与控制流模型不同,在控制流模型中数据流通过控制结构(例如条件语句、循环等)进行调节。
本文档提供如何使用消息筛选器的基本示例。 对于使用消息筛选器和数据流模型连接消息块的其他示例,请参见演练:创建数据流代理和演练:创建图像处理网络。
示例
请考虑下面的 count_primes 函数,它说明了不筛选传入消息的消息块的基本用法。 消息块将质数追加到 std::vector 对象中。 count_primes 函数会向消息块发送几个数字、从消息块接收输出值并将质数输出到控制台。
// Illustrates usage of a message buffer that does not use filtering.
void count_primes(unsigned long random_seed)
{
// Holds prime numbers.
vector<unsigned long> primes;
// Adds numbers that are prime to the vector object.
transformer<unsigned long, unsigned long> t([&primes](unsigned long n) -> unsigned long
{
if (is_prime(n))
{
primes.push_back(n);
}
return n;
});
// Send random values to the message buffer.
mt19937 generator(random_seed);
for (int i = 0; i < 20; ++i)
{
send(t, static_cast<unsigned long>(generator()%10000));
}
// Receive from the message buffer the same number of times
// to ensure that the message buffer has processed each message.
for (int i = 0; i < 20; ++i)
{
receive(t);
}
// Print the prime numbers to the console.
wcout << L"The following numbers are prime: " << endl;
for(unsigned long prime : primes)
{
wcout << prime << endl;
}
}
transformer 对象可处理所有输入值;但它只需要质数。 虽然可以编写应用程序以使消息发送方只发送质数,但无法始终了解消息接收方的要求。
count_primes_filter 函数会执行与 count_primes 函数相同的任务。 但是,此版本中的 transformer 对象使用筛选功能来只接受质数。 执行此操作的函数只接收质数;因此,它不必调用 is_prime 函数。
因为 transformer 对象只接收质数,所以 transformer 对象本身便可以保留这些质数。 即,不需要本示例中的 transformer 对象即可将质数添加到 vector 对象中。
// Illustrates usage of a message buffer that uses filtering.
void count_primes_filter(unsigned long random_seed)
{
// Accepts numbers that are prime.
transformer<unsigned long, unsigned long> t([](unsigned long n) -> unsigned long
{
// The filter function guarantees that the input value is prime.
// Return the input value.
return n;
},
nullptr,
[](unsigned long n) -> bool
{
// Filter only values that are prime.
return is_prime(n);
});
// Send random values to the message buffer.
mt19937 generator(random_seed);
size_t prime_count = 0;
for (int i = 0; i < 20; ++i)
{
if (send(t, static_cast<unsigned long>(generator()%10000)))
{
++prime_count;
}
}
// Print the prime numbers to the console.
wcout << L"The following numbers are prime: " << endl;
while (prime_count-- > 0)
{
wcout << receive(t) << endl;
}
}
transformer 对象现在只处理质数。 在前面的示例中,transformer 对象处理所有消息。 因此,前面的示例必须接收它所发送的相同数量的消息。 此示例使用 concurrency::send 函数的结果来确定要从 transformer 对象接收多少消息。 send 函数在消息缓冲区接受消息时返回 true;在消息缓冲区拒绝消息时返回 false。 因此,消息缓冲区接受消息的次数与质数的计数相匹配。
下面的代码显示完整的示例。 该示例同时调用 count_primes 和 count_primes_filter 函数。
// primes-filter.cpp
// compile with: /EHsc
#include <agents.h>
#include <algorithm>
#include <iostream>
#include <random>
using namespace concurrency;
using namespace std;
// Determines whether the input value is prime.
bool is_prime(unsigned long n)
{
if (n < 2)
return false;
for (unsigned long i = 2; i < n; ++i)
{
if ((n % i) == 0)
return false;
}
return true;
}
// Illustrates usage of a message buffer that does not use filtering.
void count_primes(unsigned long random_seed)
{
// Holds prime numbers.
vector<unsigned long> primes;
// Adds numbers that are prime to the vector object.
transformer<unsigned long, unsigned long> t([&primes](unsigned long n) -> unsigned long
{
if (is_prime(n))
{
primes.push_back(n);
}
return n;
});
// Send random values to the message buffer.
mt19937 generator(random_seed);
for (int i = 0; i < 20; ++i)
{
send(t, static_cast<unsigned long>(generator()%10000));
}
// Receive from the message buffer the same number of times
// to ensure that the message buffer has processed each message.
for (int i = 0; i < 20; ++i)
{
receive(t);
}
// Print the prime numbers to the console.
wcout << L"The following numbers are prime: " << endl;
for(unsigned long prime : primes)
{
wcout << prime << endl;
}
}
// Illustrates usage of a message buffer that uses filtering.
void count_primes_filter(unsigned long random_seed)
{
// Accepts numbers that are prime.
transformer<unsigned long, unsigned long> t([](unsigned long n) -> unsigned long
{
// The filter function guarantees that the input value is prime.
// Return the input value.
return n;
},
nullptr,
[](unsigned long n) -> bool
{
// Filter only values that are prime.
return is_prime(n);
});
// Send random values to the message buffer.
mt19937 generator(random_seed);
size_t prime_count = 0;
for (int i = 0; i < 20; ++i)
{
if (send(t, static_cast<unsigned long>(generator()%10000)))
{
++prime_count;
}
}
// Print the prime numbers to the console.
wcout << L"The following numbers are prime: " << endl;
while (prime_count-- > 0)
{
wcout << receive(t) << endl;
}
}
int wmain()
{
const unsigned long random_seed = 99714;
wcout << L"Without filtering:" << endl;
count_primes(random_seed);
wcout << L"With filtering:" << endl;
count_primes_filter(random_seed);
/* Output:
9973
9349
9241
8893
1297
7127
8647
3229
With filtering:
The following numbers are prime:
9973
9349
9241
8893
1297
7127
8647
3229
*/
}
编译代码
复制代码示例,并将此代码粘贴到Visual Studio项目中或一个名为 primes-filter.cpp的文件中,然后在 Visual Studio命令提示符窗口中运行以下命令。
cl.exe /EHsc primes-filter.cpp
可靠编程
筛选功能可以是 lambda 函数、函数指针或函数对象。 每个筛选功能都会采用下面的某一种形式:
若要取消不必要的数据复制操作,可在您具有通过值传输的聚合类型时使用第二种形式。