Compartilhar via


Demonstra Passo a passo: Criando um agente de fluxo de dados

Este documento demonstra como criar aplicativos baseados em agente que são baseados em fluxo de dados, em vez de fluxo de controle.

Controlar o fluxo de refere-se a ordem de execução de operações em um programa. Controle de fluxo é regulamentado por meio de estruturas de controle como instruções condicionais, loops e assim por diante. Como alternativa, fluxo de dados se refere a um modelo de programação qual computações são feitas apenas quando está disponível a todos os dados necessários. O modelo de programação de fluxo de dados está relacionado ao conceito de mensagem passando, no qual os componentes independentes de um programa se comunicar entre si enviando mensagens.

Suportam a agentes assíncronos o fluxo de controle e os modelos de programação de fluxo de dados. Embora o modelo de fluxo de controle é apropriado em muitos casos, o modelo de fluxo de dados é apropriado em outros, por exemplo, quando um agente recebe os dados e executa uma ação com base na carga de dados.

Pré-requisitos

Antes de começar este passo a passo, leia os seguintes documentos:

Seções

Esta explicação passo a passo contém as seções a seguir:

  • Criando um agente de fluxo de controle básico

  • Criando um agente de fluxo de dados básicos

  • Criando um agente de log de mensagens

Criando um agente de fluxo de controle básico

Considere o exemplo a seguir define o control_flow_agent classe. O control_flow_agent classe atua em três buffers de mensagem: um buffer de entrada e dois buffers de saída. O run método lê o buffer de mensagem de origem em um loop e usa uma instrução condicional para direcionar o fluxo de execução do programa. O agente incrementa um contador para valores negativos de zero e incrementa o contador de outro para os valores de zero e positivos. Depois que o agente recebe o valor de Sentinela de zero, ele envia os valores dos contadores para os buffers de mensagem de saída. O negatives e positives métodos permitem que o aplicativo ler as contagens de valores negativos e positivos do agente.

// A basic agent that uses control-flow to regulate the order of program 
// execution. This agent reads numbers from a message buffer and counts the 
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
   explicit control_flow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Read from the source buffer until we receive
      // the sentinel value of 0.
      int value = 0;      
      while ((value = receive(_source)) != 0)
      {
         // Send negative values to the first target and
         // non-negative values to the second target.
         if (value < 0)
            ++negative_count;
         else
            ++positive_count;
      }

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }
private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

Embora este exemplo torna o uso básico do fluxo de controle em um agente, ele demonstra a natureza serial de programação baseado em fluxo de controle. Cada mensagem deve ser processada seqüencialmente, embora várias mensagens podem estar disponíveis no buffer de mensagem de entrada. O modelo de fluxo de dados permite que ambas as ramificações da instrução condicional para avaliar simultaneamente. O modelo de fluxo de dados permite criar redes de mensagens mais complexas que atuam sobre dados assim que estiver disponível.

go to top

Criando um agente de fluxo de dados básicos

Esta seção mostra como converter o control_flow_agent classe para usar o modelo de fluxo de dados para executar a mesma tarefa.

O agente de fluxo de dados funciona através da criação de uma rede de buffers de mensagem, cada um deles serve a um propósito específico. Determinados blocos de mensagem usam uma função de filtro para aceitar ou rejeitar uma mensagem na Base de sua carga. Uma função de filtro garante que um bloco de mensagens recebe apenas certos valores.

Para converter o agente de fluxo de controle a um agente de fluxo de dados

  1. Copiar o corpo da control_flow_agent classe para outra classe, por exemplo, dataflow_agent. Como alternativa, você pode renomear o control_flow_agent classe.

  2. Remover o corpo do loop que chama receive partir do run método.

    void run()
    {
       // Counts the number of negative and positive values that
       // the agent receives.
       size_t negative_count = 0;
       size_t positive_count = 0;
    
    
       // Write the counts to the message buffers.
       send(_negatives, negative_count);
       send(_positives, positive_count);
    
       // Set the agent to the completed state.
       done();
    }
    
  3. No run método, após a inicialização das variáveis negative_count e positive_count, adicione um countdown_event objeto que controla a contagem de operações de ativo.

    // Tracks the count of active operations.
    countdown_event active;
    // An event that is set by the sentinel.
    event received_sentinel;
    

    O countdown_event classe é mostrada posteriormente neste tópico.

  4. Crie a mensagem buffer objetos que participarão da rede de fluxo de dados.

    //
    // Create the members of the dataflow network.
    //
    
    // Increments the active counter.
    transformer<int, int> increment_active(
       [&active](int value) -> int {
          active.add_count();
          return value;
       });
    
    // Increments the count of negative values.
    call<int> negatives(
       [&](int value) {
          ++negative_count;
          // Decrement the active counter.
          active.signal();
       },
       [](int value) -> bool {
          return value < 0;
       });
    
    // Increments the count of positive values.
    call<int> positives(
       [&](int value) {
          ++positive_count;
          // Decrement the active counter.
          active.signal();
       },
       [](int value) -> bool {
          return value > 0;
       });
    
    // Receives only the sentinel value of 0.
    call<int> sentinel(
       [&](int value) {            
          // Decrement the active counter.
          active.signal();
          // Set the sentinel event.
          received_sentinel.set();
       },
       [](int value) { 
          return value == 0; 
       });
    
    // Connects the _source message buffer to the rest of the network.
    unbounded_buffer<int> connector;
    
  5. Conecte os buffers de mensagem para formar uma rede.

    //
    // Connect the network.
    //
    
    // Connect the internal nodes of the network.
    connector.link_target(&negatives);
    connector.link_target(&positives);
    connector.link_target(&sentinel);
    increment_active.link_target(&connector);
    
    // Connect the _source buffer to the internal network to 
    // begin data flow.
    _source.link_target(&increment_active);
    
  6. Aguarde a event e countdown event objetos a ser definido. Esses eventos sinalizam que, se o agente recebeu o valor de Sentinela e que todas as operações tenham terminado.

    // Wait for the sentinel event and for all operations to finish.
    received_sentinel.wait();
    active.wait();
    

O diagrama a seguir mostra a rede completa de fluxo de dados para o dataflow_agent classe:

O fluxo de dados <>>rede

A tabela a seguir descreve os membros da rede.

Membro

Descrição

increment_active

A Concurrency::transformer o objeto que incrementa o contador de eventos ativos e passa o valor de entrada para o restante da rede.

negatives, positives

Concurrency::Call objetos que incrementam a contagem de números e diminui o contador de eventos ativos. Os objetos usam um filtro para aceitar números negativos ou em números positivos.

sentinel

A Concurrency::call o objeto que aceita somente o valor sentinel de zero e diminui o contador de eventos ativos.

connector

A Concurrency::unbounded_buffer o objeto que conecta o buffer de mensagem de origem para a rede interna.

Porque o run método for chamado em um thread separado, outros threads podem enviar mensagens à rede antes que a rede está totalmente conectada. O _source o membro de dados é um unbounded_buffer o objeto que todos os buffers de entrada que é enviado do aplicativo para o agente. Para certificar-se de que a rede processa mensagens de entrada de todos, o agente primeiro vincula os nós internos da rede e vincula o início da rede, connector, para o _source membro de dados. Isso garante que as mensagens não seja processadas como a rede está sendo formada.

Porque a rede neste exemplo é baseada em fluxo de dados, em vez de no fluxo de controle, a rede deve comunicar ao agente que concluiu o processamento de cada valor de entrada e que o nó sentinel recebeu seu valor. Este exemplo usa um countdown_event o objeto para sinalizar que todos os valores de entrada foram processados e um Concurrency::event o objeto para indicar que o nó sentinel recebeu seu valor. O countdown_event classe usa uma event o objeto para sinalizar quando um valor de contador atingir zero. A cabeça da rede de fluxo de dados o contador é incrementado toda vez que ele recebe um valor. Cada nó terminal de diminui a rede o contador depois de processar o valor de entrada. Depois que o agente da rede de fluxo de dados de formulários, ele aguarda o nó sentinel para definir o event objeto e para a countdown_event o objeto para sinalizar que atingiu seu contador zero.

A exemplo a seguir mostra a control_flow_agent, dataflow_agent, e countdown_event classes. O wmain função cria um control_flow_agent e um dataflow_agent objeto e usa o send_values a função de enviar uma série de valores aleatórios para agentes.

// dataflow-agent.cpp
// compile with: /EHsc 
#include <windows.h>
#include <agents.h>
#include <iostream>
#include <random>

using namespace Concurrency;
using namespace std;

// A basic agent that uses control-flow to regulate the order of program 
// execution. This agent reads numbers from a message buffer and counts the 
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
   explicit control_flow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Read from the source buffer until we receive
      // the sentinel value of 0.
      int value = 0;      
      while ((value = receive(_source)) != 0)
      {
         // Send negative values to the first target and
         // non-negative values to the second target.
         if (value < 0)
            ++negative_count;
         else
            ++positive_count;
      }

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }
private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(unsigned int count = 0L)
      : _current(static_cast<long>(count)) 
   {
      // Set the event if the initial count is zero.
      if (_current == 0L)
         _event.set();
   }

   // Decrements the event counter.
   void signal() {
      if(InterlockedDecrement(&_current) == 0L) {
         _event.set();
      }
   }

   // Increments the event counter.
   void add_count() {
      if(InterlockedIncrement(&_current) == 1L) {
         _event.reset();
      }
   }

   // Blocks the current context until the event is set.
   void wait() {
      _event.wait();
   }

private:
   // The current count.
   volatile long _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

// A basic agent that resembles control_flow_agent, but uses uses dataflow to 
// perform computations when data becomes available.
class dataflow_agent : public agent
{
public:
   dataflow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Tracks the count of active operations.
      countdown_event active;
      // An event that is set by the sentinel.
      event received_sentinel;

      //
      // Create the members of the dataflow network.
      //

      // Increments the active counter.
      transformer<int, int> increment_active(
         [&active](int value) -> int {
            active.add_count();
            return value;
         });

      // Increments the count of negative values.
      call<int> negatives(
         [&](int value) {
            ++negative_count;
            // Decrement the active counter.
            active.signal();
         },
         [](int value) -> bool {
            return value < 0;
         });

      // Increments the count of positive values.
      call<int> positives(
         [&](int value) {
            ++positive_count;
            // Decrement the active counter.
            active.signal();
         },
         [](int value) -> bool {
            return value > 0;
         });

      // Receives only the sentinel value of 0.
      call<int> sentinel(
         [&](int value) {            
            // Decrement the active counter.
            active.signal();
            // Set the sentinel event.
            received_sentinel.set();
         },
         [](int value) { 
            return value == 0; 
         });

      // Connects the _source message buffer to the rest of the network.
      unbounded_buffer<int> connector;

      //
      // Connect the network.
      //

      // Connect the internal nodes of the network.
      connector.link_target(&negatives);
      connector.link_target(&positives);
      connector.link_target(&sentinel);
      increment_active.link_target(&connector);

      // Connect the _source buffer to the internal network to 
      // begin data flow.
      _source.link_target(&increment_active);

      // Wait for the sentinel event and for all operations to finish.
      received_sentinel.wait();
      active.wait();

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }

private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

// Sends a number of random values to the provided message buffer.
void send_values(ITarget<int>& source, int sentinel, size_t count)
{
   // Send a series of random numbers to the source buffer.
   mt19937 rnd(42);
   for (size_t i = 0; i < count; ++i)
   {
      // Generate a random number that is not equal to the sentinel value.
      int n;
      while ((n = rnd()) == sentinel);

      send(source, n);      
   }
   // Send the sentinel value.
   send(source, sentinel);   
}

int wmain()
{
   // Signals to the agent that there are no more values to process.
   const int sentinel = 0;
   // The number of samples to send to each agent.
   const size_t count = 1000000;

   // The source buffer that the application writes numbers to and 
   // the agents read numbers from.
   unbounded_buffer<int> source;

   //
   // Use a control-flow agent to process a series of random numbers.
   //
   wcout << L"Control-flow agent:" << endl;

   // Create and start the agent.
   control_flow_agent cf_agent(source);
   cf_agent.start();

   // Send values to the agent.
   send_values(source, sentinel, count);

   // Wait for the agent to finish.
   agent::wait(&cf_agent);

   // Print the count of negative and positive numbers.
   wcout << L"There are " << cf_agent.negatives() 
         << L" negative numbers."<< endl;
   wcout << L"There are " << cf_agent.positives() 
         << L" positive numbers."<< endl;  

   //
   // Perform the same task, but this time with a dataflow agent.
   //
   wcout << L"Dataflow agent:" << endl;

   // Create and start the agent.
   dataflow_agent df_agent(source);
   df_agent.start();

   // Send values to the agent.
   send_values(source, sentinel, count);

   // Wait for the agent to finish.
   agent::wait(&df_agent);

   // Print the count of negative and positive numbers.
   wcout << L"There are " << df_agent.negatives() 
         << L" negative numbers."<< endl;
   wcout << L"There are " << df_agent.positives() 
         << L" positive numbers."<< endl;
}

Este exemplo produz a saída de exemplo a seguir:

Control-flow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.
Dataflow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.

Compilando o código

Copie o código de exemplo e colá-lo em um Visual Studio do projeto, ou colá-lo em um arquivo que é chamado agent.cpp de fluxo de dados e, em seguida, execute o seguinte comando um Visual Studio 2010 janela do Prompt de comando.

cl.exe /EHsc dataflow-agent.cpp

go to top

Criando um agente de log de mensagens

A exemplo a seguir mostra a log_agent classe, que é semelhante a dataflow_agent classe. O log_agent classe implementa um agente de log assíncrono que gravações log de mensagens para um arquivo e o console. O log_agent classe permite que o aplicativo categorizar as mensagens como informativo, aviso ou erro. Ele também permite que o aplicativo especificar se cada categoria de log é gravada em um arquivo, o console ou ambos. Este exemplo grava todas as mensagens de log para um arquivo e somente as mensagens de erro no console.

// log-filter.cpp
// compile with: /EHsc 
#include <windows.h>
#include <agents.h>
#include <sstream>
#include <fstream>
#include <iostream>

using namespace Concurrency;
using namespace std;

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(unsigned int count = 0L)
      : _current(static_cast<long>(count)) 
   {
      // Set the event if the initial count is zero.
      if (_current == 0L)
         _event.set();
   }

   // Decrements the event counter.
   void signal() {
      if(InterlockedDecrement(&_current) == 0L) {
         _event.set();
      }
   }

   // Increments the event counter.
   void add_count() {
      if(InterlockedIncrement(&_current) == 1L) {
         _event.reset();
      }
   }

   // Blocks the current context until the event is set.
   void wait() {
      _event.wait();
   }

private:
   // The current count.
   volatile long _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

// Defines message types for the logger.
enum log_message_type
{
   log_info    = 0x1,
   log_warning = 0x2,
   log_error   = 0x4,
};

// An asynchronous logging agent that writes log messages to 
// file and to the console.
class log_agent : public agent
{
   // Holds a message string and its logging type.
   struct log_message
   {
      wstring message;
      log_message_type type;
   };

public:
   log_agent(const wstring& file_path, log_message_type file_messages, 
      log_message_type console_messages)
      : _file(file_path)
      , _file_messages(file_messages)
      , _console_messages(console_messages)    
      , _active(0)
   {
      if (_file.bad())
         throw invalid_argument("Unable to open log file.");
   }

   // Writes the provided message to the log.
   void log(const wstring& message, log_message_type type)
   {  
      // Increment the active message count.
      _active.add_count();

      // Send the message to the network.
      log_message msg = { message, type };      
      send(_log_buffer, msg);
   }

   void close()
   {
      // Signal that the agent is now closed.
      _closed.set();
   }

protected:

   void run()
   {
      //
      // Create the members of the dataflow network.
      //

      // Offers messages to the file writer and the console writer.
      overwrite_buffer<log_message> connector;

      // Writes a log message to file.
      call<log_message> file_writer(
         [this](log_message msg) {
            // Write the message to the file.
            write_to_stream(msg, _file);
            // Decrement the active counter.
            _active.signal();
         },
         [this](const log_message& msg) -> bool {
            // Accept only message types that are to be written to file.
            return (msg.type & _file_messages) != 0;
         });

       // Writes a log message to the console.
      call<log_message> console_writer(
         [this](log_message msg) {
            // Write the message to the console.
            write_to_stream(msg, wcout);
            // Decrement the active counter.
            _active.signal();
         },
         [this](const log_message& msg) -> bool  {
            // Accept only message types that are to be written to file.
            return (msg.type & _console_messages) != 0;
         });

      //
      // Connect the network.
      //

      // Connect the internal nodes of the network.
      connector.link_target(&file_writer);
      connector.link_target(&console_writer);

      // Connect _log_buffer to the internal network to begin data flow.
      _log_buffer.link_target(&connector);

      // Wait for the closed event to be signaled.
      _closed.wait();

      // Wait for all messages to be processed.
      _active.wait();

      // Close the log file and flush the console.
      _file.close();
      wcout.flush();

      // Set the agent to the completed state.
      done();
   }

private:
   // Writes a logging message to the specified output stream.
   void write_to_stream(const log_message& msg, wostream& stream)
   {
      // Write the message to the stream.
      wstringstream ss;

      switch (msg.type)
      {
      case log_info:
         ss << L"info: ";
         break;
      case log_warning:
         ss << L"warning: ";
         break;
      case log_error:
         ss << L"error: ";
      }

      ss << msg.message << endl;
      stream << ss.str();
   }

private:   
   // The file stream to write messages to.
   wofstream _file;   

   // The log message types that are written to file.
   log_message_type _file_messages;

   // The log message types that are written to the console.
   log_message_type _console_messages;

   // The head of the network. Propagates logging messages
   // to the rest of the network.
   unbounded_buffer<log_message> _log_buffer;   

   // Counts the number of active messages in the network.
   countdown_event _active;

   // Signals that the agent has been closed.
   event _closed;
};

int wmain()
{
   // Union of all log message types.
   log_message_type log_all = 
      log_message_type(log_info | log_warning  | log_error);

   // Create a logging agent that writes all log messages to file and error 
   // messages to the console.
   log_agent logger(L"log.txt", log_all, log_error);

   // Start the agent.
   logger.start();

   // Log a few messages.

   logger.log(L"===Logging started.===", log_info);

   logger.log(L"This is a sample warning message.", log_warning);
   logger.log(L"This is a sample error message.", log_error);

   logger.log(L"===Logging finished.===", log_info);

   // Close the logger and wait for the agent to finish.
   logger.close();
   agent::wait(&logger);
}

Este exemplo grava a saída a seguir ao console.

error: This is a sample error message.

Este exemplo também produz o arquivo de log. txt, que contém o texto a seguir.

info: ===Logging started.===
warning: This is a sample warning message.
error: This is a sample error message.
info: ===Logging finished.===

Compilando o código

Copie o código de exemplo e colá-lo em um Visual Studio do projeto, ou colá-lo em um arquivo que é chamado log-filter.cpp e, em seguida, execute o seguinte comando um Visual Studio 2010 janela do Prompt de comando.

cl.exe /EHsc log-filter.cpp

go to top

Consulte também

Conceitos

Orientações de Runtime de simultaneidade