Walkthrough: Implementing Futures
This topic shows how to implement futures in your application. The topic demonstrates how to combine existing functionality in the Concurrency Runtime into something that does more.
A task is a computation that can be decomposed into additional, more fine-grained, computations. A future is an asynchronous task that computes a value for later use.
To implement futures, this topic defines the async_future class. The async_future class uses these components of the Concurrency Runtime: the Concurrency::task_group class and the Concurrency::single_assignment class. The async_future class uses the task_group class to compute a value asynchronously and the single_assignment class to store the result of the computation. The constructor of the async_future class takes a work function that computes the result, and the get method retrieves the result.
To implement the future class
Declare a template class named async_future that is parameterized on the type of the resulting computation. Add public and private sections to this class.
template <typename T> class async_future { public: private: };
In the private section of the async_future class, declare a task_group and a single_assignment data member.
// Executes the asynchronous work function. task_group _tasks; // Stores the result of the asynchronous work function. single_assignment<T> _value;
In the public section of the async_future class, implement the constructor. The constructor is a template that is parameterized on the work function that computes the result. The constructor asynchronously executes the work function in the task_group data member and uses the Concurrency::send function to write the result to the single_assignment data member.
template <class Functor> explicit async_future(Functor&& fn) { // Execute the work function in a task group and send the result // to the single_assignment object. _tasks.run([fn, this]() { send(_value, fn()); }); }
In the public section of the async_future class, implement the destructor. The destructor waits for the task to finish.
~async_future() { // Wait for the task to finish. _tasks.wait(); }
In the public section of the async_future class, implement the get method. This method uses the Concurrency::receive function to retrieve the result of the work function.
// Retrieves the result of the work function. // This method blocks if the async_future object is still // computing the value. T get() { return receive(_value); }
Example
Description
The following example shows the complete async_future class and an example of its usage. The wmain function creates a std::vector object that contains 10,000 random integer values. It then uses async_future objects to find the smallest and largest values that are contained in the vector object.
Code
// futures.cpp
// compile with: /EHsc
#include <ppl.h>
#include <agents.h>
#include <vector>
#include <algorithm>
#include <iostream>
#include <numeric>
#include <random>
using namespace Concurrency;
using namespace std;
template <typename T>
class async_future
{
public:
template <class Functor>
explicit async_future(Functor&& fn)
{
// Execute the work function in a task group and send the result
// to the single_assignment object.
_tasks.run([fn, this]() {
send(_value, fn());
});
}
~async_future()
{
// Wait for the task to finish.
_tasks.wait();
}
// Retrieves the result of the work function.
// This method blocks if the async_future object is still
// computing the value.
T get()
{
return receive(_value);
}
private:
// Executes the asynchronous work function.
task_group _tasks;
// Stores the result of the asynchronous work function.
single_assignment<T> _value;
};
int wmain()
{
// Create a vector of 10000 integers, where each element
// is between 0 and 9999.
mt19937 gen(2);
vector<int> values(10000);
generate(values.begin(), values.end(), [&gen]{ return gen()%10000; });
// Create a async_future object that finds the smallest value in the
// vector.
async_future<int> min_value([&]() -> int {
int smallest = INT_MAX;
for_each(values.begin(), values.end(), [&](int value) {
if (value < smallest)
{
smallest = value;
}
});
return smallest;
});
// Create a async_future object that finds the largest value in the
// vector.
async_future<int> max_value([&]() -> int {
int largest = INT_MIN;
for_each(values.begin(), values.end(), [&](int value) {
if (value > largest)
{
largest = value;
}
});
return largest;
});
// Calculate the average value of the vector while the async_future objects
// work in the background.
int sum = accumulate(values.begin(), values.end(), 0);
int average = sum / values.size();
// Print the smallest, largest, and average values.
wcout << L"smallest: " << min_value.get() << endl
<< L"largest: " << max_value.get() << endl
<< L"average: " << average << endl;
}
Comments
This example produces the following output:
smallest: 0
largest: 9999
average: 4981
The example uses the async_future::get method to retrieve the results of the computation. The async_future::get method waits for the computation to finish if the computation is still active.
Robust Programming
To extend the async_future class to handle exceptions that are thrown by the work function, modify the async_future::get method to call the Concurrency::task_group::wait method. The task_group::wait method throws any exceptions that were generated by the work function.
The following example shows the modified version of the async_future class. The wmain function uses a try-catch block to print the result of the async_future object or to print the value of the exception that is generated by the work function.
// futures-with-eh.cpp
// compile with: /EHsc
#include <ppl.h>
#include <agents.h>
#include <vector>
#include <algorithm>
#include <iostream>
using namespace Concurrency;
using namespace std;
template <typename T>
class async_future
{
public:
template <class Functor>
explicit async_future(Functor&& fn)
{
// Execute the work function in a task group and send the result
// to the single_assignment object.
_tasks.run([fn, this]() {
send(_value, fn());
});
}
~async_future()
{
// Wait for the task to finish.
_tasks.wait();
}
// Retrieves the result of the work function.
// This method blocks if the async_future object is still
// computing the value.
T get()
{
// Wait for the task to finish.
// The wait method throws any exceptions that were generated
// by the work function.
_tasks.wait();
// Return the result of the computation.
return receive(_value);
}
private:
// Executes the asynchronous work function.
task_group _tasks;
// Stores the result of the asynchronous work function.
single_assignment<T> _value;
};
int wmain()
{
// For illustration, create a async_future with a work
// function that throws an exception.
async_future<int> f([]() -> int {
throw exception("error");
});
// Try to read from the async_future object.
try
{
int value = f.get();
wcout << L"f contains value: " << value << endl;
}
catch (const exception& e)
{
wcout << L"caught exception: " << e.what() << endl;
}
}
This example produces the following output:
caught exception: error
For more information about the exception handling model in the Concurrency Runtime, see Exception Handling in the Concurrency Runtime.
Compiling the Code
Copy the example code and paste it in a Visual Studio project, or paste it in a file that is named futures.cpp and then run the following command in a Visual Studio 2010 Command Prompt window.
cl.exe /EHsc futures.cpp
See Also
Reference
Concepts
Exception Handling in the Concurrency Runtime