Compartilhar via


Passo a passo: Criando um agente do fluxo de dados

Este documento demonstra como criar aplicativos baseados agente- que são baseados no fluxo de dados, em vez de fluxo de controle.

O fluxo de controle se refere a ordem de execução de operações em um programa.O fluxo de controle é regulado usando estruturas de controle como instruções condicionais, loop, e assim por diante.Como alternativa, o fluxo de dados se refere a um modelo de programação que computações são feitas somente quando todos os dados necessários estão disponíveis.O modelo de programação do fluxo de dados está relacionado ao conceito de mensagem que passa, em que os componentes independentes de um programa se comunicam com um outro enviar mensagens.

Agentes assíncronos suportam modelos de programação de fluxo de controle e de fluxo de dados.Embora o modelo de fluxo de controle é apropriado em muitos casos, o modelo do fluxo de dados é apropriado no outro, por exemplo, quando um agente receber dados e executa uma ação que é baseada na carga dos dados.

Pré-requisitos

Ler os seguintes documentos antes de iniciar esta explicação passo a passo:

Seções

Essa explicação passo a passo contém as seções a seguir:

  • Criando um agente básico de fluxo de controle

  • Criando um agente básico de fluxo de dados

  • Criando um agente de Mensagem- log

Criando um agente básico de fluxo de controle

Considere o seguinte exemplo que define a classe de control_flow_agent .A classe de control_flow_agent atua em três buffers de mensagem: um buffer de entrada e dois buffers de saída.O método de run ler de buffer de mensagem fonte em um loop e usa uma instrução condicional para direcionar o fluxo de execução do programa.O agente incrementa um contador para valores e incrementos diferente de zero, negativos outro contador para valores diferente de zero, positivos.Depois que o agente recebe o valor de sentinela de zero, envia os valores dos contadores para buffers de mensagem de saída.Os métodos de negatives e de positives permitem que o aplicativo ler os resultados de valores positivos e negativos de agente.

// A basic agent that uses control-flow to regulate the order of program 
// execution. This agent reads numbers from a message buffer and counts the 
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
   explicit control_flow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Read from the source buffer until we receive
      // the sentinel value of 0.
      int value = 0;      
      while ((value = receive(_source)) != 0)
      {
         // Send negative values to the first target and
         // non-negative values to the second target.
         if (value < 0)
            ++negative_count;
         else
            ++positive_count;
      }

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }
private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

Embora este exemplo faz uso básico de fluxo de controle em um agente, demonstra a natureza serial de programação controle-fluxo- base.Cada mensagem deve ser processada em sequência, mesmo que várias mensagens podem estar disponíveis em buffer de mensagens de entrada.O modelo de fluxo de dados permite ambas as ramificações da instrução condicional para avaliar simultaneamente.O modelo de fluxo de dados também permite que você crie as redes mais complexas de mensagem que atuem em dados como se tornam disponíveis.

Superior[]

Criando um agente básico de fluxo de dados

Esta seção mostra como converter a classe de control_flow_agent para usar o modelo de fluxo de dados para executar a mesma tarefa.

O agente do fluxo de dados funciona criando uma rede de buffers de mensagem, cada um de eles serve uma finalidade específica.Determinados blocos de mensagem usam uma função de filtragem para aceitar ou descartar uma mensagem com base na sua carga.Uma função de filtro garante que um bloco de mensagens recebe somente certos valores.

Para converter o agente de fluxo de controle a um agente do fluxo de dados

  1. Copie o corpo da classe de control_flow_agent a outra classe, por exemplo, dataflow_agent.Como alternativa, você pode renomear a classe de control_flow_agent .

  2. Remova o corpo do loop que chama receive do método de run .

    void run()
    {
       // Counts the number of negative and positive values that
       // the agent receives.
       size_t negative_count = 0;
       size_t positive_count = 0;
    
    
       // Write the counts to the message buffers.
       send(_negatives, negative_count);
       send(_positives, positive_count);
    
       // Set the agent to the completed state.
       done();
    }
    
  3. No método de run , após a inicialização de variáveis negative_count e positive_count, adicione um objeto de countdown_event que controla a contagem de operações ativos.

    // Tracks the count of active operations.
    countdown_event active;
    // An event that is set by the sentinel.
    event received_sentinel;
    

    A classe de countdown_event é mostrada neste tópico posteriormente.

  4. Crie os objetos do buffer de mensagem que participarão na rede do fluxo de dados.

    //
    // Create the members of the dataflow network.
    //
    
    // Increments the active counter.
    transformer<int, int> increment_active(
       [&active](int value) -> int {
          active.add_count();
          return value;
       });
    
    // Increments the count of negative values.
    call<int> negatives(
       [&](int value) {
          ++negative_count;
          // Decrement the active counter.
          active.signal();
       },
       [](int value) -> bool {
          return value < 0;
       });
    
    // Increments the count of positive values.
    call<int> positives(
       [&](int value) {
          ++positive_count;
          // Decrement the active counter.
          active.signal();
       },
       [](int value) -> bool {
          return value > 0;
       });
    
    // Receives only the sentinel value of 0.
    call<int> sentinel(
       [&](int value) {            
          // Decrement the active counter.
          active.signal();
          // Set the sentinel event.
          received_sentinel.set();
       },
       [](int value) { 
          return value == 0; 
       });
    
    // Connects the _source message buffer to the rest of the network.
    unbounded_buffer<int> connector;
    
  5. Conectar os buffers de mensagem para formar uma rede.

    //
    // Connect the network.
    //
    
    // Connect the internal nodes of the network.
    connector.link_target(&negatives);
    connector.link_target(&positives);
    connector.link_target(&sentinel);
    increment_active.link_target(&connector);
    
    // Connect the _source buffer to the internal network to 
    // begin data flow.
    _source.link_target(&increment_active);
    
  6. Espere os objetos de event e de countdown event a ser definidas.Esses eventos sinalizam que o agente recebeu o valor de sentinela e que todas as operações tenham sido concluídas.

    // Wait for the sentinel event and for all operations to finish.
    received_sentinel.wait();
    active.wait();
    

O diagrama a seguir mostra a rede completo do fluxo de dados para a classe de dataflow_agent :

A rede de fluxo de dados

A tabela a seguir descreve os membros de rede.

Membro

Descrição

increment_active

Um objeto de concurrency::transformer que aumenta o contador de evento ativo e passe o valor de entrada para o resto de rede.

negatives, positives

concurrency::call objetos que incrementam a contagem de números e diminui o contador de evento ativo.Os objetos cada uso um filtro aceitar números negativos ou números positivos.

sentinel

Um objeto de concurrency::call que aceita apenas o valor de sentinela de zero e decresça o contador de evento ativo.

connector

Um objeto de concurrency::unbounded_buffer que conecta o buffer de mensagem fonte para a rede interna.

Porque o método de run é chamado um segmento separado, outros segmentos podem enviar mensagens à rede antes da rede está conectada inteiramente.O membro de dados de _source é um objeto de unbounded_buffer que irá armazenar em buffer todas as entradas que é enviada ao agente do aplicativo.Para certificar-se que a rede processa todas as mensagens de entrada, o agente destina-se principalmente os nós internos de rede e links no início da rede, connector, ao membro de dados de _source .Isso garante que as mensagens não são processadas como a rede está sendo formada.

Porque a rede nesse exemplo é baseada no fluxo de dados, em vez de no fluxo de controle, rede deve se comunicar ao agente que terminou cada valor de entrada e que o nó de sentinela recebeu seu valor.Este exemplo usa um objeto de countdown_event para sinalizar que todos os valores de entrada foram processados e um objeto de concurrency::event para indicar que o nó de sentinela recebeu seu valor.A classe de countdown_event usa um objeto de event para sinalizar quando uma atingir do contador zero de um valor.O início de rede do fluxo de dados incrementa o contador sempre que recebe um valor.Cada nó terminal de rede diminui o contador depois que processa o valor de entrada.Após os formulários do agente a rede do fluxo de dados, aguarda o nó de sentinela para definir o objeto de event e o objeto de countdown_event para indicar que o contador alcançar zero.

O exemplo a seguir mostra control_flow_agent, dataflow_agent, e classes de countdown_event .A função de wmain cria control_flow_agent e um objeto de e usa a função dataflow_agent de send_values para enviar uma série de valores aleatórios para agentes.

// dataflow-agent.cpp
// compile with: /EHsc 
#include <windows.h>
#include <agents.h>
#include <iostream>
#include <random>

using namespace concurrency;
using namespace std;

// A basic agent that uses control-flow to regulate the order of program 
// execution. This agent reads numbers from a message buffer and counts the 
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
   explicit control_flow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Read from the source buffer until we receive
      // the sentinel value of 0.
      int value = 0;      
      while ((value = receive(_source)) != 0)
      {
         // Send negative values to the first target and
         // non-negative values to the second target.
         if (value < 0)
            ++negative_count;
         else
            ++positive_count;
      }

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }
private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(unsigned int count = 0L)
      : _current(static_cast<long>(count)) 
   {
      // Set the event if the initial count is zero.
      if (_current == 0L)
         _event.set();
   }

   // Decrements the event counter.
   void signal() {
      if(InterlockedDecrement(&_current) == 0L) {
         _event.set();
      }
   }

   // Increments the event counter.
   void add_count() {
      if(InterlockedIncrement(&_current) == 1L) {
         _event.reset();
      }
   }

   // Blocks the current context until the event is set.
   void wait() {
      _event.wait();
   }

private:
   // The current count.
   volatile long _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

// A basic agent that resembles control_flow_agent, but uses uses dataflow to 
// perform computations when data becomes available.
class dataflow_agent : public agent
{
public:
   dataflow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Tracks the count of active operations.
      countdown_event active;
      // An event that is set by the sentinel.
      event received_sentinel;

      //
      // Create the members of the dataflow network.
      //

      // Increments the active counter.
      transformer<int, int> increment_active(
         [&active](int value) -> int {
            active.add_count();
            return value;
         });

      // Increments the count of negative values.
      call<int> negatives(
         [&](int value) {
            ++negative_count;
            // Decrement the active counter.
            active.signal();
         },
         [](int value) -> bool {
            return value < 0;
         });

      // Increments the count of positive values.
      call<int> positives(
         [&](int value) {
            ++positive_count;
            // Decrement the active counter.
            active.signal();
         },
         [](int value) -> bool {
            return value > 0;
         });

      // Receives only the sentinel value of 0.
      call<int> sentinel(
         [&](int value) {            
            // Decrement the active counter.
            active.signal();
            // Set the sentinel event.
            received_sentinel.set();
         },
         [](int value) { 
            return value == 0; 
         });

      // Connects the _source message buffer to the rest of the network.
      unbounded_buffer<int> connector;

      //
      // Connect the network.
      //

      // Connect the internal nodes of the network.
      connector.link_target(&negatives);
      connector.link_target(&positives);
      connector.link_target(&sentinel);
      increment_active.link_target(&connector);

      // Connect the _source buffer to the internal network to 
      // begin data flow.
      _source.link_target(&increment_active);

      // Wait for the sentinel event and for all operations to finish.
      received_sentinel.wait();
      active.wait();

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }

private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

// Sends a number of random values to the provided message buffer.
void send_values(ITarget<int>& source, int sentinel, size_t count)
{
   // Send a series of random numbers to the source buffer.
   mt19937 rnd(42);
   for (size_t i = 0; i < count; ++i)
   {
      // Generate a random number that is not equal to the sentinel value.
      int n;
      while ((n = rnd()) == sentinel);

      send(source, n);      
   }
   // Send the sentinel value.
   send(source, sentinel);   
}

int wmain()
{
   // Signals to the agent that there are no more values to process.
   const int sentinel = 0;
   // The number of samples to send to each agent.
   const size_t count = 1000000;

   // The source buffer that the application writes numbers to and 
   // the agents read numbers from.
   unbounded_buffer<int> source;

   //
   // Use a control-flow agent to process a series of random numbers.
   //
   wcout << L"Control-flow agent:" << endl;

   // Create and start the agent.
   control_flow_agent cf_agent(source);
   cf_agent.start();

   // Send values to the agent.
   send_values(source, sentinel, count);

   // Wait for the agent to finish.
   agent::wait(&cf_agent);

   // Print the count of negative and positive numbers.
   wcout << L"There are " << cf_agent.negatives() 
         << L" negative numbers."<< endl;
   wcout << L"There are " << cf_agent.positives() 
         << L" positive numbers."<< endl;  

   //
   // Perform the same task, but this time with a dataflow agent.
   //
   wcout << L"Dataflow agent:" << endl;

   // Create and start the agent.
   dataflow_agent df_agent(source);
   df_agent.start();

   // Send values to the agent.
   send_values(source, sentinel, count);

   // Wait for the agent to finish.
   agent::wait(&df_agent);

   // Print the count of negative and positive numbers.
   wcout << L"There are " << df_agent.negatives() 
         << L" negative numbers."<< endl;
   wcout << L"There are " << df_agent.positives() 
         << L" positive numbers."<< endl;
}

Este exemplo produz a seguinte saída de exemplo:

  
  
  
  

Ff398051.collapse_all(pt-br,VS.110).gifCompilando o código

Copie o código de exemplo e cole-o em um projeto do Visual Studio, ou cole em um arquivo denominado dataflow-agent.cpp e execute o seguinte comando em uma janela de prompt de comando do Visual Studio.

cl.exe /EHsc dataflow-agent.cpp

Superior[]

Criando um agente de Mensagem- log

O exemplo a seguir mostra a classe de log_agent , que lembra a classe de dataflow_agent .A classe implementa um agente de log_agent assíncrona de log que mensagens de log de grava em um arquivo e no console.A classe de log_agent permite que o aplicativo classificar mensagens aviso como informativas, ou erro.Também permite o aplicativo para especificar se cada categoria de log estiver escrevendo em um arquivo, o console, ou em ambos.Este exemplo grava as mensagens de log em um arquivo e apenas as mensagens de erro no console.

// log-filter.cpp
// compile with: /EHsc 
#include <windows.h>
#include <agents.h>
#include <sstream>
#include <fstream>
#include <iostream>

using namespace concurrency;
using namespace std;

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
    countdown_event(unsigned int count = 0L)
        : _current(static_cast<long>(count)) 
    {
        // Set the event if the initial count is zero.
        if (_current == 0L)
        {
            _event.set();
        }
    }

    // Decrements the event counter.
    void signal()
    {
        if(InterlockedDecrement(&_current) == 0L)
        {
            _event.set();
        }
    }

    // Increments the event counter.
    void add_count()
    {
        if(InterlockedIncrement(&_current) == 1L)
        {
            _event.reset();
        }
    }

    // Blocks the current context until the event is set.
    void wait()
    {
        _event.wait();
    }

private:
    // The current count.
    volatile long _current;
    // The event that is set when the counter reaches zero.
    event _event;

    // Disable copy constructor.
    countdown_event(const countdown_event&);
    // Disable assignment.
    countdown_event const & operator=(countdown_event const&);
};

// Defines message types for the logger.
enum log_message_type
{
    log_info    = 0x1,
    log_warning = 0x2,
    log_error   = 0x4,
};

// An asynchronous logging agent that writes log messages to 
// file and to the console.
class log_agent : public agent
{
    // Holds a message string and its logging type.
    struct log_message
    {
        wstring message;
        log_message_type type;
    };

public:
    log_agent(const wstring& file_path, log_message_type file_messages, log_message_type console_messages)
        : _file(file_path)
        , _file_messages(file_messages)
        , _console_messages(console_messages)
        , _active(0)
    {
        if (_file.bad())
        {
            throw invalid_argument("Unable to open log file.");
        }
    }

    // Writes the provided message to the log.
    void log(const wstring& message, log_message_type type)
    {  
        // Increment the active message count.
        _active.add_count();

        // Send the message to the network.
        log_message msg = { message, type };
        send(_log_buffer, msg);
    }

    void close()
    {
        // Signal that the agent is now closed.
        _closed.set();
    }

protected:

    void run()
    {
        //
        // Create the dataflow network.
        //

        // Writes a log message to file.
        call<log_message> writer([this](log_message msg)
        {
            if ((msg.type & _file_messages) != 0)
            {
                // Write the message to the file.
                write_to_stream(msg, _file);
            }
            if ((msg.type & _console_messages) != 0)
            {
                // Write the message to the console.
                write_to_stream(msg, wcout);
            }
            // Decrement the active counter.
            _active.signal();
        });

        // Connect _log_buffer to the internal network to begin data flow.
        _log_buffer.link_target(&writer);

        // Wait for the closed event to be signaled.
        _closed.wait();

        // Wait for all messages to be processed.
        _active.wait();

        // Close the log file and flush the console.
        _file.close();
        wcout.flush();

        // Set the agent to the completed state.
        done();
    }

private:
    // Writes a logging message to the specified output stream.
    void write_to_stream(const log_message& msg, wostream& stream)
    {
        // Write the message to the stream.
        wstringstream ss;

        switch (msg.type)
        {
        case log_info:
            ss << L"info: ";
            break;
        case log_warning:
            ss << L"warning: ";
            break;
        case log_error:
            ss << L"error: ";
        }

        ss << msg.message << endl;
        stream << ss.str();
    }

private:   
    // The file stream to write messages to.
    wofstream _file;

    // The log message types that are written to file.
    log_message_type _file_messages;

    // The log message types that are written to the console.
    log_message_type _console_messages;

    // The head of the network. Propagates logging messages
    // to the rest of the network.
    unbounded_buffer<log_message> _log_buffer;

    // Counts the number of active messages in the network.
    countdown_event _active;

    // Signals that the agent has been closed.
    event _closed;
};

int wmain()
{
    // Union of all log message types.
    log_message_type log_all = log_message_type(log_info | log_warning  | log_error);

    // Create a logging agent that writes all log messages to file and error 
    // messages to the console.
    log_agent logger(L"log.txt", log_all, log_error);

    // Start the agent.
    logger.start();

    // Log a few messages.

    logger.log(L"===Logging started.===", log_info);

    logger.log(L"This is a sample warning message.", log_warning);
    logger.log(L"This is a sample error message.", log_error);

    logger.log(L"===Logging finished.===", log_info);

    // Close the logger and wait for the agent to finish.
    logger.close();
    agent::wait(&logger);
}

Este exemplo grava a saída a seguir no console.

  

Este exemplo também gera o arquivo de log.txt, que contém o texto a seguir.

  
  
  

Ff398051.collapse_all(pt-br,VS.110).gifCompilando o código

Copie o código de exemplo e cole-o em um projeto do Visual Studio, ou cole em um arquivo denominado log-filter.cpp e execute o seguinte comando em uma janela de prompt de comando do Visual Studio.

cl.exe /EHsc log-filter.cpp

Superior[]

Consulte também

Outros recursos

Passo a passo de tempo de execução de concorrência