Пошаговое руководство. Создание агента потоков данных

В этом документе показано, как создавать приложения на основе агента, основанные на потоке данных, а не поток управления.

Поток управления ссылается на порядок выполнения операций в программе. Поток управления регулируется с помощью структур управления, таких как условные операторы, циклы и т. д. Кроме того, поток данных относится к модели программирования, в которой вычисления выполняются только в том случае, если все необходимые данные доступны. Модель программирования потока данных связана с концепцией передачи сообщений, в которой независимые компоненты программы взаимодействуют друг с другом, отправляя сообщения.

Асинхронные агенты поддерживают модели программирования потока управления и потока данных. Хотя модель потока управления подходит во многих случаях, модель потока данных подходит в других случаях, например, когда агент получает данные и выполняет действие, основанное на полезных данных этих данных.

Необходимые компоненты

Ознакомьтесь со следующими документами перед началом работы с этим пошаговом руководстве.

Разделы

Это пошаговое руководство содержит следующие разделы:

Создание агента "Базовый поток управления"

Рассмотрим следующий пример, определяющий control_flow_agent класс. Класс control_flow_agent действует на трех буферах сообщений: один входной буфер и два выходных буфера. Метод run считывает из исходного буфера сообщений в цикле и использует условный оператор для направления потока выполнения программы. Агент увеличивает один счетчик для ненулевых, отрицательных значений и увеличивает другой счетчик для ненулевых положительных значений. После получения отправляемого значения нуля агент отправляет значения счетчиков в буферы выходных сообщений. Методы negatives позволяют positives приложению считывать количество отрицательных и положительных значений от агента.

// A basic agent that uses control-flow to regulate the order of program 
// execution. This agent reads numbers from a message buffer and counts the 
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
   explicit control_flow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Read from the source buffer until we receive
      // the sentinel value of 0.
      int value = 0;      
      while ((value = receive(_source)) != 0)
      {
         // Send negative values to the first target and
         // non-negative values to the second target.
         if (value < 0)
            ++negative_count;
         else
            ++positive_count;
      }

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }
private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

Хотя в этом примере используется базовый поток управления в агенте, он демонстрирует последовательную природу программирования на основе потока управления. Каждое сообщение должно обрабатываться последовательно, даже если в буфере входных сообщений может быть доступно несколько сообщений. Модель потока данных позволяет одновременно оценивать обе ветви условной инструкции. Модель потока данных также позволяет создавать более сложные сети обмена сообщениями, которые действуют на данных по мере его доступности.

[В начало]

Создание базового агента потока данных

В этом разделе показано, как преобразовать control_flow_agent класс для использования модели потока данных для выполнения той же задачи.

Агент потока данных работает путем создания сети буферов сообщений, каждый из которых служит определенной целью. Некоторые блоки сообщений используют функцию фильтра для принятия или отклонения сообщения на основе полезных данных. Функция фильтра гарантирует, что блок сообщения получает только определенные значения.

Преобразование агента потока управления в агент потока данных

  1. Скопируйте текст control_flow_agent класса в другой класс, например dataflow_agent. Кроме того, можно переименовать control_flow_agent класс.

  2. Удалите текст цикла, вызывающего receive метод run .

void run()
{
   // Counts the number of negative and positive values that
   // the agent receives.
   size_t negative_count = 0;
   size_t positive_count = 0;

   // Write the counts to the message buffers.
   send(_negatives, negative_count);
   send(_positives, positive_count);

   // Set the agent to the completed state.
   done();
}
  1. В методе run после инициализации переменных negative_count и positive_countдобавьте countdown_event объект, отслеживающий количество активных операций.
// Tracks the count of active operations.
countdown_event active;
// An event that is set by the sentinel.
event received_sentinel;

Класс countdown_event показан далее в этом разделе.

  1. Создайте объекты буфера сообщений, которые будут участвовать в сети потока данных.
 //
 // Create the members of the dataflow network.
 //

 // Increments the active counter.
 transformer<int, int> increment_active(
    [&active](int value) -> int {
       active.add_count();
       return value;
    });

 // Increments the count of negative values.
 call<int> negatives(
    [&](int value) {
       ++negative_count;
       // Decrement the active counter.
       active.signal();
    },
    [](int value) -> bool {
       return value < 0;
    });

 // Increments the count of positive values.
 call<int> positives(
    [&](int value) {
       ++positive_count;
       // Decrement the active counter.
       active.signal();
    },
    [](int value) -> bool {
       return value > 0;
    });

 // Receives only the sentinel value of 0.
 call<int> sentinel(
    [&](int value) {            
       // Decrement the active counter.
       active.signal();
       // Set the sentinel event.
       received_sentinel.set();
    },
    [](int value) -> bool { 
       return value == 0; 
    });

 // Connects the _source message buffer to the rest of the network.
 unbounded_buffer<int> connector;
  1. Подключение буферы сообщений для формирования сети.
//
// Connect the network.
//

// Connect the internal nodes of the network.
connector.link_target(&negatives);
connector.link_target(&positives);
connector.link_target(&sentinel);
increment_active.link_target(&connector);

// Connect the _source buffer to the internal network to 
// begin data flow.
_source.link_target(&increment_active);
  1. event Дождитесь установки объектов и countdown event объектов. Эти события сигнализируют о том, что агент получил значение sentinel и что все операции завершены.
// Wait for the sentinel event and for all operations to finish.
received_sentinel.wait();
active.wait();

На следующей dataflow_agent схеме показана полная сеть потока данных для класса:

The dataflow network.

Следующая таблица описывает члены сети.

Элемент Description
increment_active Объект параллелизма::преобразователя , который увеличивает активный счетчик событий и передает входное значение остальной части сети.
negatives, positives concurrency::call objects, которые увеличивают количество чисел и уменьшает счетчик активных событий. Каждый объект использует фильтр для принятия отрицательных чисел или положительных чисел.
sentinel Объект параллелизма::call , который принимает только значение sentinel нулевого значения и уменьшает активный счетчик событий.
connector Объект параллелизма::unbounded_buffer , который подключает буфер исходного сообщения к внутренней сети.

run Так как метод вызывается в отдельном потоке, другие потоки могут отправлять сообщения в сеть до полного подключения сети. Член _source данных — это unbounded_buffer объект, который буферизирует все входные данные, отправленные из приложения агенту. Чтобы убедиться, что сеть обрабатывает все входные сообщения, агент сначала связывает внутренние узлы сети, а затем связывает начало этой сети connectorс элементом _source данных. Это гарантирует, что сообщения не обрабатываются по мере формирования сети.

Так как сеть в этом примере основана на потоке данных, а не на потоке управления, сеть должна взаимодействовать с агентом, который он завершил обработку каждого входного значения и что узел sentinel получил его значение. В этом примере объект используется countdown_event для сигнала о том, что все входные значения обработаны и объект параллелизма::event , чтобы указать, что узел sentinel получил его значение. Класс countdown_event использует event объект для сигнала, когда значение счетчика достигает нуля. Голова сети потока данных увеличивает счетчик каждый раз, когда он получает значение. Каждый узел терминала сети уменьшает счетчик после обработки входного значения. После того как агент формирует сеть потока данных, он ожидает, пока узел sentinel установит event объект, а countdown_event объект будет сигнализировать о том, что его счетчик достиг нуля.

В следующем примере показаны классы control_flow_agent, dataflow_agentи countdown_event классы. Функция wmain создает control_flow_agent объект и dataflow_agent использует send_values функцию для отправки ряда случайных значений агентам.

// dataflow-agent.cpp
// compile with: /EHsc 
#include <windows.h>
#include <agents.h>
#include <iostream>
#include <random>

using namespace concurrency;
using namespace std;

// A basic agent that uses control-flow to regulate the order of program 
// execution. This agent reads numbers from a message buffer and counts the 
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
   explicit control_flow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Read from the source buffer until we receive
      // the sentinel value of 0.
      int value = 0;      
      while ((value = receive(_source)) != 0)
      {
         // Send negative values to the first target and
         // non-negative values to the second target.
         if (value < 0)
            ++negative_count;
         else
            ++positive_count;
      }

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }
private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(unsigned int count = 0L)
      : _current(static_cast<long>(count)) 
   {
      // Set the event if the initial count is zero.
      if (_current == 0L)
         _event.set();
   }
     
   // Decrements the event counter.
   void signal() {
      if(InterlockedDecrement(&_current) == 0L) {
         _event.set();
      }
   }

   // Increments the event counter.
   void add_count() {
      if(InterlockedIncrement(&_current) == 1L) {
         _event.reset();
      }
   }
   
   // Blocks the current context until the event is set.
   void wait() {
      _event.wait();
   }
 
private:
   // The current count.
   volatile long _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

// A basic agent that resembles control_flow_agent, but uses uses dataflow to 
// perform computations when data becomes available.
class dataflow_agent : public agent
{
public:
   dataflow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Tracks the count of active operations.
      countdown_event active;
      // An event that is set by the sentinel.
      event received_sentinel;
      
      //
      // Create the members of the dataflow network.
      //
     
      // Increments the active counter.
      transformer<int, int> increment_active(
         [&active](int value) -> int {
            active.add_count();
            return value;
         });

      // Increments the count of negative values.
      call<int> negatives(
         [&](int value) {
            ++negative_count;
            // Decrement the active counter.
            active.signal();
         },
         [](int value) -> bool {
            return value < 0;
         });

      // Increments the count of positive values.
      call<int> positives(
         [&](int value) {
            ++positive_count;
            // Decrement the active counter.
            active.signal();
         },
         [](int value) -> bool {
            return value > 0;
         });

      // Receives only the sentinel value of 0.
      call<int> sentinel(
         [&](int value) {            
            // Decrement the active counter.
            active.signal();
            // Set the sentinel event.
            received_sentinel.set();
         },
         [](int value) -> bool { 
            return value == 0; 
         });

      // Connects the _source message buffer to the rest of the network.
      unbounded_buffer<int> connector;
       
      //
      // Connect the network.
      //

      // Connect the internal nodes of the network.
      connector.link_target(&negatives);
      connector.link_target(&positives);
      connector.link_target(&sentinel);
      increment_active.link_target(&connector);

      // Connect the _source buffer to the internal network to 
      // begin data flow.
      _source.link_target(&increment_active);

      // Wait for the sentinel event and for all operations to finish.
      received_sentinel.wait();
      active.wait();
           
      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }

private:
   // Source message buffer to read from.
   ISource<int>& _source;
   
   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

// Sends a number of random values to the provided message buffer.
void send_values(ITarget<int>& source, int sentinel, size_t count)
{
   // Send a series of random numbers to the source buffer.
   mt19937 rnd(42);
   for (size_t i = 0; i < count; ++i)
   {
      // Generate a random number that is not equal to the sentinel value.
      int n;
      while ((n = rnd()) == sentinel);

      send(source, n);      
   }
   // Send the sentinel value.
   send(source, sentinel);   
}

int wmain()
{
   // Signals to the agent that there are no more values to process.
   const int sentinel = 0;
   // The number of samples to send to each agent.
   const size_t count = 1000000;

   // The source buffer that the application writes numbers to and 
   // the agents read numbers from.
   unbounded_buffer<int> source;

   //
   // Use a control-flow agent to process a series of random numbers.
   //
   wcout << L"Control-flow agent:" << endl;

   // Create and start the agent.
   control_flow_agent cf_agent(source);
   cf_agent.start();
   
   // Send values to the agent.
   send_values(source, sentinel, count);
   
   // Wait for the agent to finish.
   agent::wait(&cf_agent);
   
   // Print the count of negative and positive numbers.
   wcout << L"There are " << cf_agent.negatives() 
         << L" negative numbers."<< endl;
   wcout << L"There are " << cf_agent.positives() 
         << L" positive numbers."<< endl;  

   //
   // Perform the same task, but this time with a dataflow agent.
   //
   wcout << L"Dataflow agent:" << endl;

   // Create and start the agent.
   dataflow_agent df_agent(source);
   df_agent.start();
   
   // Send values to the agent.
   send_values(source, sentinel, count);
   
   // Wait for the agent to finish.
   agent::wait(&df_agent);
   
   // Print the count of negative and positive numbers.
   wcout << L"There are " << df_agent.negatives() 
         << L" negative numbers."<< endl;
   wcout << L"There are " << df_agent.positives() 
         << L" positive numbers."<< endl;
}

В этом примере создаются следующие примеры выходных данных:

Control-flow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.
Dataflow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.

Компиляция кода

Скопируйте пример кода и вставьте его в проект Visual Studio или вставьте его в файл с именем dataflow-agent.cpp , а затем выполните следующую команду в окне командной строки Visual Studio.

cl.exe /EHsc dataflow-agent.cpp

[В начало]

Создание агента ведения журнала сообщений

В следующем примере показан log_agent класс, который похож на dataflow_agent класс. Класс log_agent реализует асинхронный агент ведения журнала, который записывает сообщения журнала в файл и в консоль. Класс log_agent позволяет приложению класс классифицировать сообщения как информационные, предупреждения или ошибки. Приложение также позволяет указать, записывается ли каждая категория журнала в файл, консоль или оба. В этом примере все сообщения журнала записываются в файл и только сообщения об ошибках в консоли.

// log-filter.cpp
// compile with: /EHsc 
#include <windows.h>
#include <agents.h>
#include <sstream>
#include <fstream>
#include <iostream>

using namespace concurrency;
using namespace std;

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
    countdown_event(unsigned int count = 0L)
        : _current(static_cast<long>(count)) 
    {
        // Set the event if the initial count is zero.
        if (_current == 0L)
        {
            _event.set();
        }
    }

    // Decrements the event counter.
    void signal()
    {
        if(InterlockedDecrement(&_current) == 0L)
        {
            _event.set();
        }
    }

    // Increments the event counter.
    void add_count()
    {
        if(InterlockedIncrement(&_current) == 1L)
        {
            _event.reset();
        }
    }

    // Blocks the current context until the event is set.
    void wait()
    {
        _event.wait();
    }

private:
    // The current count.
    volatile long _current;
    // The event that is set when the counter reaches zero.
    event _event;

    // Disable copy constructor.
    countdown_event(const countdown_event&);
    // Disable assignment.
    countdown_event const & operator=(countdown_event const&);
};

// Defines message types for the logger.
enum log_message_type
{
    log_info    = 0x1,
    log_warning = 0x2,
    log_error   = 0x4,
};

// An asynchronous logging agent that writes log messages to 
// file and to the console.
class log_agent : public agent
{
    // Holds a message string and its logging type.
    struct log_message
    {
        wstring message;
        log_message_type type;
    };

public:
    log_agent(const wstring& file_path, log_message_type file_messages, log_message_type console_messages)
        : _file(file_path)
        , _file_messages(file_messages)
        , _console_messages(console_messages)
        , _active(0)
    {
        if (_file.bad())
        {
            throw invalid_argument("Unable to open log file.");
        }
    }

    // Writes the provided message to the log.
    void log(const wstring& message, log_message_type type)
    {  
        // Increment the active message count.
        _active.add_count();

        // Send the message to the network.
        log_message msg = { message, type };
        send(_log_buffer, msg);
    }

    void close()
    {
        // Signal that the agent is now closed.
        _closed.set();
    }

protected:

    void run()
    {
        //
        // Create the dataflow network.
        //

        // Writes a log message to file.
        call<log_message> writer([this](log_message msg)
        {
            if ((msg.type & _file_messages) != 0)
            {
                // Write the message to the file.
                write_to_stream(msg, _file);
            }
            if ((msg.type & _console_messages) != 0)
            {
                // Write the message to the console.
                write_to_stream(msg, wcout);
            }
            // Decrement the active counter.
            _active.signal();
        });

        // Connect _log_buffer to the internal network to begin data flow.
        _log_buffer.link_target(&writer);

        // Wait for the closed event to be signaled.
        _closed.wait();

        // Wait for all messages to be processed.
        _active.wait();

        // Close the log file and flush the console.
        _file.close();
        wcout.flush();

        // Set the agent to the completed state.
        done();
    }

private:
    // Writes a logging message to the specified output stream.
    void write_to_stream(const log_message& msg, wostream& stream)
    {
        // Write the message to the stream.
        wstringstream ss;

        switch (msg.type)
        {
        case log_info:
            ss << L"info: ";
            break;
        case log_warning:
            ss << L"warning: ";
            break;
        case log_error:
            ss << L"error: ";
        }

        ss << msg.message << endl;
        stream << ss.str();
    }

private:   
    // The file stream to write messages to.
    wofstream _file;

    // The log message types that are written to file.
    log_message_type _file_messages;

    // The log message types that are written to the console.
    log_message_type _console_messages;

    // The head of the network. Propagates logging messages
    // to the rest of the network.
    unbounded_buffer<log_message> _log_buffer;

    // Counts the number of active messages in the network.
    countdown_event _active;

    // Signals that the agent has been closed.
    event _closed;
};

int wmain()
{
    // Union of all log message types.
    log_message_type log_all = log_message_type(log_info | log_warning  | log_error);

    // Create a logging agent that writes all log messages to file and error 
    // messages to the console.
    log_agent logger(L"log.txt", log_all, log_error);

    // Start the agent.
    logger.start();

    // Log a few messages.

    logger.log(L"===Logging started.===", log_info);

    logger.log(L"This is a sample warning message.", log_warning);
    logger.log(L"This is a sample error message.", log_error);

    logger.log(L"===Logging finished.===", log_info);

    // Close the logger and wait for the agent to finish.
    logger.close();
    agent::wait(&logger);
}

Этот пример записывает в консоль следующие выходные данные.

error: This is a sample error message.

В этом примере также создается файл log.txt, содержащий следующий текст.

info: ===Logging started.===
warning: This is a sample warning message.
error: This is a sample error message.
info: ===Logging finished.===

Компиляция кода

Скопируйте пример кода и вставьте его в проект Visual Studio или вставьте его в файл с именем log-filter.cpp , а затем выполните следующую команду в окне командной строки Visual Studio.

cl.exe /EHsc log-filter.cpp

[В начало]

См. также

Пошаговые руководства по среде выполнения с параллелизмом