Condividi tramite


Procedura dettagliata: Creazione di un agente del flusso di dati

Questo documento illustra come creare applicazioni basate su agenti basate sul flusso di dati, anziché sul flusso di controllo.

Il flusso di controllo si riferisce all'ordine di esecuzione delle operazioni in un programma. Il flusso di controllo è regolamentato usando strutture di controllo come istruzioni condizionali, cicli e così via. In alternativa, il flusso di dati fa riferimento a un modello di programmazione in cui i calcoli vengono eseguiti solo quando tutti i dati necessari sono disponibili. Il modello di programmazione del flusso di dati è correlato al concetto di passaggio dei messaggi, in cui i componenti indipendenti di un programma comunicano tra loro inviando messaggi.

Gli agenti asincroni supportano sia i modelli di programmazione del flusso di controllo che dei flussi di dati. Anche se il modello di flusso di controllo è appropriato in molti casi, il modello di flusso di dati è appropriato in altri casi, ad esempio quando un agente riceve i dati ed esegue un'azione basata sul payload di tali dati.

Prerequisiti

Leggere i documenti seguenti prima di iniziare questa procedura dettagliata:

Sezioni

Questa procedura dettagliata contiene le sezioni seguenti:

Creazione di un agente di flusso di controllo di base

Si consideri l'esempio seguente che definisce la control_flow_agent classe . La control_flow_agent classe agisce su tre buffer di messaggi: un buffer di input e due buffer di output. Il run metodo legge dal buffer del messaggio di origine in un ciclo e usa un'istruzione condizionale per indirizzare il flusso di esecuzione del programma. L'agente incrementa un contatore per valori negativi diversi da zero e incrementa un altro contatore per valori positivi diversi da zero. Dopo che l'agente riceve il valore sentinel pari a zero, invia i valori dei contatori ai buffer dei messaggi di output. I negatives metodi e positives consentono all'applicazione di leggere i conteggi dei valori negativi e positivi dall'agente.

// A basic agent that uses control-flow to regulate the order of program 
// execution. This agent reads numbers from a message buffer and counts the 
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
   explicit control_flow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Read from the source buffer until we receive
      // the sentinel value of 0.
      int value = 0;      
      while ((value = receive(_source)) != 0)
      {
         // Send negative values to the first target and
         // non-negative values to the second target.
         if (value < 0)
            ++negative_count;
         else
            ++positive_count;
      }

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }
private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

Anche se in questo esempio viene usato di base il flusso di controllo in un agente, viene illustrata la natura seriale della programmazione basata sul flusso di controllo. Ogni messaggio deve essere elaborato in sequenza, anche se nel buffer dei messaggi di input potrebbero essere disponibili più messaggi. Il modello di flusso di dati consente a entrambi i rami dell'istruzione condizionale di valutare contemporaneamente. Il modello di flusso di dati consente anche di creare reti di messaggistica più complesse che agiscono sui dati man mano che diventano disponibili.

[Torna all'inizio]

Creazione di un agente di flusso di dati di base

Questa sezione illustra come convertire la control_flow_agent classe per usare il modello di flusso di dati per eseguire la stessa attività.

L'agente del flusso di dati funziona creando una rete di buffer di messaggi, ognuno dei quali serve uno scopo specifico. Alcuni blocchi di messaggi usano una funzione di filtro per accettare o rifiutare un messaggio in base al relativo payload. Una funzione di filtro garantisce che un blocco di messaggi riceva solo determinati valori.

Per convertire l'agente del flusso di controllo in un agente del flusso di dati

  1. Copiare il corpo della control_flow_agent classe in un'altra classe, dataflow_agentad esempio . In alternativa, è possibile rinominare la control_flow_agent classe .

  2. Rimuovere il corpo del ciclo che chiama receive dal run metodo .

void run()
{
   // Counts the number of negative and positive values that
   // the agent receives.
   size_t negative_count = 0;
   size_t positive_count = 0;

   // Write the counts to the message buffers.
   send(_negatives, negative_count);
   send(_positives, positive_count);

   // Set the agent to the completed state.
   done();
}
  1. run Nel metodo , dopo l'inizializzazione delle variabili negative_count e positive_count, aggiungere un countdown_event oggetto che tiene traccia del numero di operazioni attive.
// Tracks the count of active operations.
countdown_event active;
// An event that is set by the sentinel.
event received_sentinel;

La countdown_event classe viene illustrata più avanti in questo argomento.

  1. Creare gli oggetti buffer dei messaggi che faranno parte della rete del flusso di dati.
 //
 // Create the members of the dataflow network.
 //

 // Increments the active counter.
 transformer<int, int> increment_active(
    [&active](int value) -> int {
       active.add_count();
       return value;
    });

 // Increments the count of negative values.
 call<int> negatives(
    [&](int value) {
       ++negative_count;
       // Decrement the active counter.
       active.signal();
    },
    [](int value) -> bool {
       return value < 0;
    });

 // Increments the count of positive values.
 call<int> positives(
    [&](int value) {
       ++positive_count;
       // Decrement the active counter.
       active.signal();
    },
    [](int value) -> bool {
       return value > 0;
    });

 // Receives only the sentinel value of 0.
 call<int> sentinel(
    [&](int value) {            
       // Decrement the active counter.
       active.signal();
       // Set the sentinel event.
       received_sentinel.set();
    },
    [](int value) -> bool { 
       return value == 0; 
    });

 // Connects the _source message buffer to the rest of the network.
 unbounded_buffer<int> connector;
  1. Connettere i buffer dei messaggi per formare una rete.
//
// Connect the network.
//

// Connect the internal nodes of the network.
connector.link_target(&negatives);
connector.link_target(&positives);
connector.link_target(&sentinel);
increment_active.link_target(&connector);

// Connect the _source buffer to the internal network to 
// begin data flow.
_source.link_target(&increment_active);
  1. Attendere che gli event oggetti e countdown event siano impostati. Questi eventi segnalano che l'agente ha ricevuto il valore sentinel e che tutte le operazioni sono state completate.
// Wait for the sentinel event and for all operations to finish.
received_sentinel.wait();
active.wait();

Il diagramma seguente mostra la rete completa del flusso di dati per la dataflow_agent classe :

Rete del flusso di dati.

Nella tabella seguente vengono descritti i membri della rete.

Membro Descrizione
increment_active Oggetto concurrency::transformer che incrementa il contatore eventi attivo e passa il valore di input al resto della rete.
negatives, positives oggetti concurrency::call che incrementino il numero di numeri e decrementa il contatore eventi attivo. Gli oggetti usano un filtro per accettare numeri negativi o numeri positivi.
sentinel Oggetto concurrency::call che accetta solo il valore sentinel pari a zero e decrementa il contatore eventi attivo.
connector Oggetto concurrency::unbounded_buffer che connette il buffer dei messaggi di origine alla rete interna.

Poiché il run metodo viene chiamato su un thread separato, altri thread possono inviare messaggi alla rete prima che la rete sia completamente connessa. Il _source membro dati è un unbounded_buffer oggetto che memorizza nel buffer tutti gli input inviati dall'applicazione all'agente. Per assicurarsi che la rete elabori tutti i messaggi di input, l'agente collega innanzitutto i nodi interni della rete e quindi collega l'inizio della rete, connector, al _source membro dati. Ciò garantisce che i messaggi non vengano elaborati durante il formato della rete.

Poiché la rete in questo esempio si basa sul flusso di dati, anziché sul flusso di controllo, la rete deve comunicare con l'agente che ha completato l'elaborazione di ogni valore di input e che il nodo sentinel ha ricevuto il relativo valore. Questo esempio usa un countdown_event oggetto per segnalare che tutti i valori di input sono stati elaborati e un oggetto concurrency::event per indicare che il nodo sentinel ha ricevuto il relativo valore. La countdown_event classe usa un event oggetto per segnalare quando un valore del contatore raggiunge zero. L'head della rete del flusso di dati incrementa il contatore ogni volta che riceve un valore. Ogni nodo terminale della rete decrementa il contatore dopo l'elaborazione del valore di input. Dopo che l'agente forma la rete del flusso di dati, attende che il nodo sentinel imposti l'oggetto event e che l'oggetto segnali che il countdown_event contatore ha raggiunto zero.

Nell'esempio seguente vengono illustrate le control_flow_agentclassi , dataflow_agente countdown_event . La wmain funzione crea un control_flow_agent oggetto e dataflow_agent e usa la send_values funzione per inviare una serie di valori casuali agli agenti.

// dataflow-agent.cpp
// compile with: /EHsc 
#include <windows.h>
#include <agents.h>
#include <iostream>
#include <random>

using namespace concurrency;
using namespace std;

// A basic agent that uses control-flow to regulate the order of program 
// execution. This agent reads numbers from a message buffer and counts the 
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
   explicit control_flow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Read from the source buffer until we receive
      // the sentinel value of 0.
      int value = 0;      
      while ((value = receive(_source)) != 0)
      {
         // Send negative values to the first target and
         // non-negative values to the second target.
         if (value < 0)
            ++negative_count;
         else
            ++positive_count;
      }

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }
private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(unsigned int count = 0L)
      : _current(static_cast<long>(count)) 
   {
      // Set the event if the initial count is zero.
      if (_current == 0L)
         _event.set();
   }
     
   // Decrements the event counter.
   void signal() {
      if(InterlockedDecrement(&_current) == 0L) {
         _event.set();
      }
   }

   // Increments the event counter.
   void add_count() {
      if(InterlockedIncrement(&_current) == 1L) {
         _event.reset();
      }
   }
   
   // Blocks the current context until the event is set.
   void wait() {
      _event.wait();
   }
 
private:
   // The current count.
   volatile long _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

// A basic agent that resembles control_flow_agent, but uses uses dataflow to 
// perform computations when data becomes available.
class dataflow_agent : public agent
{
public:
   dataflow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Tracks the count of active operations.
      countdown_event active;
      // An event that is set by the sentinel.
      event received_sentinel;
      
      //
      // Create the members of the dataflow network.
      //
     
      // Increments the active counter.
      transformer<int, int> increment_active(
         [&active](int value) -> int {
            active.add_count();
            return value;
         });

      // Increments the count of negative values.
      call<int> negatives(
         [&](int value) {
            ++negative_count;
            // Decrement the active counter.
            active.signal();
         },
         [](int value) -> bool {
            return value < 0;
         });

      // Increments the count of positive values.
      call<int> positives(
         [&](int value) {
            ++positive_count;
            // Decrement the active counter.
            active.signal();
         },
         [](int value) -> bool {
            return value > 0;
         });

      // Receives only the sentinel value of 0.
      call<int> sentinel(
         [&](int value) {            
            // Decrement the active counter.
            active.signal();
            // Set the sentinel event.
            received_sentinel.set();
         },
         [](int value) -> bool { 
            return value == 0; 
         });

      // Connects the _source message buffer to the rest of the network.
      unbounded_buffer<int> connector;
       
      //
      // Connect the network.
      //

      // Connect the internal nodes of the network.
      connector.link_target(&negatives);
      connector.link_target(&positives);
      connector.link_target(&sentinel);
      increment_active.link_target(&connector);

      // Connect the _source buffer to the internal network to 
      // begin data flow.
      _source.link_target(&increment_active);

      // Wait for the sentinel event and for all operations to finish.
      received_sentinel.wait();
      active.wait();
           
      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }

private:
   // Source message buffer to read from.
   ISource<int>& _source;
   
   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

// Sends a number of random values to the provided message buffer.
void send_values(ITarget<int>& source, int sentinel, size_t count)
{
   // Send a series of random numbers to the source buffer.
   mt19937 rnd(42);
   for (size_t i = 0; i < count; ++i)
   {
      // Generate a random number that is not equal to the sentinel value.
      int n;
      while ((n = rnd()) == sentinel);

      send(source, n);      
   }
   // Send the sentinel value.
   send(source, sentinel);   
}

int wmain()
{
   // Signals to the agent that there are no more values to process.
   const int sentinel = 0;
   // The number of samples to send to each agent.
   const size_t count = 1000000;

   // The source buffer that the application writes numbers to and 
   // the agents read numbers from.
   unbounded_buffer<int> source;

   //
   // Use a control-flow agent to process a series of random numbers.
   //
   wcout << L"Control-flow agent:" << endl;

   // Create and start the agent.
   control_flow_agent cf_agent(source);
   cf_agent.start();
   
   // Send values to the agent.
   send_values(source, sentinel, count);
   
   // Wait for the agent to finish.
   agent::wait(&cf_agent);
   
   // Print the count of negative and positive numbers.
   wcout << L"There are " << cf_agent.negatives() 
         << L" negative numbers."<< endl;
   wcout << L"There are " << cf_agent.positives() 
         << L" positive numbers."<< endl;  

   //
   // Perform the same task, but this time with a dataflow agent.
   //
   wcout << L"Dataflow agent:" << endl;

   // Create and start the agent.
   dataflow_agent df_agent(source);
   df_agent.start();
   
   // Send values to the agent.
   send_values(source, sentinel, count);
   
   // Wait for the agent to finish.
   agent::wait(&df_agent);
   
   // Print the count of negative and positive numbers.
   wcout << L"There are " << df_agent.negatives() 
         << L" negative numbers."<< endl;
   wcout << L"There are " << df_agent.positives() 
         << L" positive numbers."<< endl;
}

Questo esempio produce l'output di esempio seguente:

Control-flow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.
Dataflow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.

Compilazione del codice

Copiare il codice di esempio e incollarlo in un progetto di Visual Studio oppure incollarlo in un file denominato dataflow-agent.cpp e quindi eseguire il comando seguente in una finestra del prompt dei comandi di Visual Studio.

cl.exe /EHsc dataflow-agent.cpp

[Torna all'inizio]

Creazione di un agente di registrazione messaggi

Nell'esempio seguente viene illustrata la log_agent classe , simile alla dataflow_agent classe . La log_agent classe implementa un agente di registrazione asincrono che scrive i messaggi di log in un file e nella console. La log_agent classe consente all'applicazione di classificare i messaggi come informativo, avviso o errore. Consente inoltre all'applicazione di specificare se ogni categoria di log viene scritta in un file, nella console o in entrambi. Questo esempio scrive tutti i messaggi di log in un file e solo i messaggi di errore nella console.

// log-filter.cpp
// compile with: /EHsc 
#include <windows.h>
#include <agents.h>
#include <sstream>
#include <fstream>
#include <iostream>

using namespace concurrency;
using namespace std;

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
    countdown_event(unsigned int count = 0L)
        : _current(static_cast<long>(count)) 
    {
        // Set the event if the initial count is zero.
        if (_current == 0L)
        {
            _event.set();
        }
    }

    // Decrements the event counter.
    void signal()
    {
        if(InterlockedDecrement(&_current) == 0L)
        {
            _event.set();
        }
    }

    // Increments the event counter.
    void add_count()
    {
        if(InterlockedIncrement(&_current) == 1L)
        {
            _event.reset();
        }
    }

    // Blocks the current context until the event is set.
    void wait()
    {
        _event.wait();
    }

private:
    // The current count.
    volatile long _current;
    // The event that is set when the counter reaches zero.
    event _event;

    // Disable copy constructor.
    countdown_event(const countdown_event&);
    // Disable assignment.
    countdown_event const & operator=(countdown_event const&);
};

// Defines message types for the logger.
enum log_message_type
{
    log_info    = 0x1,
    log_warning = 0x2,
    log_error   = 0x4,
};

// An asynchronous logging agent that writes log messages to 
// file and to the console.
class log_agent : public agent
{
    // Holds a message string and its logging type.
    struct log_message
    {
        wstring message;
        log_message_type type;
    };

public:
    log_agent(const wstring& file_path, log_message_type file_messages, log_message_type console_messages)
        : _file(file_path)
        , _file_messages(file_messages)
        , _console_messages(console_messages)
        , _active(0)
    {
        if (_file.bad())
        {
            throw invalid_argument("Unable to open log file.");
        }
    }

    // Writes the provided message to the log.
    void log(const wstring& message, log_message_type type)
    {  
        // Increment the active message count.
        _active.add_count();

        // Send the message to the network.
        log_message msg = { message, type };
        send(_log_buffer, msg);
    }

    void close()
    {
        // Signal that the agent is now closed.
        _closed.set();
    }

protected:

    void run()
    {
        //
        // Create the dataflow network.
        //

        // Writes a log message to file.
        call<log_message> writer([this](log_message msg)
        {
            if ((msg.type & _file_messages) != 0)
            {
                // Write the message to the file.
                write_to_stream(msg, _file);
            }
            if ((msg.type & _console_messages) != 0)
            {
                // Write the message to the console.
                write_to_stream(msg, wcout);
            }
            // Decrement the active counter.
            _active.signal();
        });

        // Connect _log_buffer to the internal network to begin data flow.
        _log_buffer.link_target(&writer);

        // Wait for the closed event to be signaled.
        _closed.wait();

        // Wait for all messages to be processed.
        _active.wait();

        // Close the log file and flush the console.
        _file.close();
        wcout.flush();

        // Set the agent to the completed state.
        done();
    }

private:
    // Writes a logging message to the specified output stream.
    void write_to_stream(const log_message& msg, wostream& stream)
    {
        // Write the message to the stream.
        wstringstream ss;

        switch (msg.type)
        {
        case log_info:
            ss << L"info: ";
            break;
        case log_warning:
            ss << L"warning: ";
            break;
        case log_error:
            ss << L"error: ";
        }

        ss << msg.message << endl;
        stream << ss.str();
    }

private:   
    // The file stream to write messages to.
    wofstream _file;

    // The log message types that are written to file.
    log_message_type _file_messages;

    // The log message types that are written to the console.
    log_message_type _console_messages;

    // The head of the network. Propagates logging messages
    // to the rest of the network.
    unbounded_buffer<log_message> _log_buffer;

    // Counts the number of active messages in the network.
    countdown_event _active;

    // Signals that the agent has been closed.
    event _closed;
};

int wmain()
{
    // Union of all log message types.
    log_message_type log_all = log_message_type(log_info | log_warning  | log_error);

    // Create a logging agent that writes all log messages to file and error 
    // messages to the console.
    log_agent logger(L"log.txt", log_all, log_error);

    // Start the agent.
    logger.start();

    // Log a few messages.

    logger.log(L"===Logging started.===", log_info);

    logger.log(L"This is a sample warning message.", log_warning);
    logger.log(L"This is a sample error message.", log_error);

    logger.log(L"===Logging finished.===", log_info);

    // Close the logger and wait for the agent to finish.
    logger.close();
    agent::wait(&logger);
}

In questo esempio viene scritto l'output seguente nella console.

error: This is a sample error message.

In questo esempio viene generato anche il file log.txt contenente il testo seguente.

info: ===Logging started.===
warning: This is a sample warning message.
error: This is a sample error message.
info: ===Logging finished.===

Compilazione del codice

Copiare il codice di esempio e incollarlo in un progetto di Visual Studio oppure incollarlo in un file denominato log-filter.cpp e quindi eseguire il comando seguente in una finestra del prompt dei comandi di Visual Studio.

cl.exe /EHsc log-filter.cpp

[Torna all'inizio]

Vedi anche

Procedure dettagliate del runtime di concorrenza