Procedura dettagliata: Creazione di un agente del flusso di dati
Questo documento illustra come creare applicazioni basate su agenti basate sul flusso di dati, anziché sul flusso di controllo.
Il flusso di controllo si riferisce all'ordine di esecuzione delle operazioni in un programma. Il flusso di controllo è regolamentato usando strutture di controllo come istruzioni condizionali, cicli e così via. In alternativa, il flusso di dati fa riferimento a un modello di programmazione in cui i calcoli vengono eseguiti solo quando tutti i dati necessari sono disponibili. Il modello di programmazione del flusso di dati è correlato al concetto di passaggio dei messaggi, in cui i componenti indipendenti di un programma comunicano tra loro inviando messaggi.
Gli agenti asincroni supportano sia i modelli di programmazione del flusso di controllo che dei flussi di dati. Anche se il modello di flusso di controllo è appropriato in molti casi, il modello di flusso di dati è appropriato in altri casi, ad esempio quando un agente riceve i dati ed esegue un'azione basata sul payload di tali dati.
Prerequisiti
Leggere i documenti seguenti prima di iniziare questa procedura dettagliata:
Sezioni
Questa procedura dettagliata contiene le sezioni seguenti:
Creazione di un agente di flusso di controllo di base
Si consideri l'esempio seguente che definisce la control_flow_agent
classe . La control_flow_agent
classe agisce su tre buffer di messaggi: un buffer di input e due buffer di output. Il run
metodo legge dal buffer del messaggio di origine in un ciclo e usa un'istruzione condizionale per indirizzare il flusso di esecuzione del programma. L'agente incrementa un contatore per valori negativi diversi da zero e incrementa un altro contatore per valori positivi diversi da zero. Dopo che l'agente riceve il valore sentinel pari a zero, invia i valori dei contatori ai buffer dei messaggi di output. I negatives
metodi e positives
consentono all'applicazione di leggere i conteggi dei valori negativi e positivi dall'agente.
// A basic agent that uses control-flow to regulate the order of program
// execution. This agent reads numbers from a message buffer and counts the
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
explicit control_flow_agent(ISource<int>& source)
: _source(source)
{
}
// Retrieves the count of negative numbers that the agent received.
size_t negatives()
{
return receive(_negatives);
}
// Retrieves the count of positive numbers that the agent received.
size_t positives()
{
return receive(_positives);
}
protected:
void run()
{
// Counts the number of negative and positive values that
// the agent receives.
size_t negative_count = 0;
size_t positive_count = 0;
// Read from the source buffer until we receive
// the sentinel value of 0.
int value = 0;
while ((value = receive(_source)) != 0)
{
// Send negative values to the first target and
// non-negative values to the second target.
if (value < 0)
++negative_count;
else
++positive_count;
}
// Write the counts to the message buffers.
send(_negatives, negative_count);
send(_positives, positive_count);
// Set the agent to the completed state.
done();
}
private:
// Source message buffer to read from.
ISource<int>& _source;
// Holds the number of negative and positive numbers that the agent receives.
single_assignment<size_t> _negatives;
single_assignment<size_t> _positives;
};
Anche se in questo esempio viene usato di base il flusso di controllo in un agente, viene illustrata la natura seriale della programmazione basata sul flusso di controllo. Ogni messaggio deve essere elaborato in sequenza, anche se nel buffer dei messaggi di input potrebbero essere disponibili più messaggi. Il modello di flusso di dati consente a entrambi i rami dell'istruzione condizionale di valutare contemporaneamente. Il modello di flusso di dati consente anche di creare reti di messaggistica più complesse che agiscono sui dati man mano che diventano disponibili.
Creazione di un agente di flusso di dati di base
Questa sezione illustra come convertire la control_flow_agent
classe per usare il modello di flusso di dati per eseguire la stessa attività.
L'agente del flusso di dati funziona creando una rete di buffer di messaggi, ognuno dei quali serve uno scopo specifico. Alcuni blocchi di messaggi usano una funzione di filtro per accettare o rifiutare un messaggio in base al relativo payload. Una funzione di filtro garantisce che un blocco di messaggi riceva solo determinati valori.
Per convertire l'agente del flusso di controllo in un agente del flusso di dati
Copiare il corpo della
control_flow_agent
classe in un'altra classe,dataflow_agent
ad esempio . In alternativa, è possibile rinominare lacontrol_flow_agent
classe .Rimuovere il corpo del ciclo che chiama
receive
dalrun
metodo .
void run()
{
// Counts the number of negative and positive values that
// the agent receives.
size_t negative_count = 0;
size_t positive_count = 0;
// Write the counts to the message buffers.
send(_negatives, negative_count);
send(_positives, positive_count);
// Set the agent to the completed state.
done();
}
run
Nel metodo , dopo l'inizializzazione delle variabilinegative_count
epositive_count
, aggiungere uncountdown_event
oggetto che tiene traccia del numero di operazioni attive.
// Tracks the count of active operations.
countdown_event active;
// An event that is set by the sentinel.
event received_sentinel;
La countdown_event
classe viene illustrata più avanti in questo argomento.
- Creare gli oggetti buffer dei messaggi che faranno parte della rete del flusso di dati.
//
// Create the members of the dataflow network.
//
// Increments the active counter.
transformer<int, int> increment_active(
[&active](int value) -> int {
active.add_count();
return value;
});
// Increments the count of negative values.
call<int> negatives(
[&](int value) {
++negative_count;
// Decrement the active counter.
active.signal();
},
[](int value) -> bool {
return value < 0;
});
// Increments the count of positive values.
call<int> positives(
[&](int value) {
++positive_count;
// Decrement the active counter.
active.signal();
},
[](int value) -> bool {
return value > 0;
});
// Receives only the sentinel value of 0.
call<int> sentinel(
[&](int value) {
// Decrement the active counter.
active.signal();
// Set the sentinel event.
received_sentinel.set();
},
[](int value) -> bool {
return value == 0;
});
// Connects the _source message buffer to the rest of the network.
unbounded_buffer<int> connector;
- Connettere i buffer dei messaggi per formare una rete.
//
// Connect the network.
//
// Connect the internal nodes of the network.
connector.link_target(&negatives);
connector.link_target(&positives);
connector.link_target(&sentinel);
increment_active.link_target(&connector);
// Connect the _source buffer to the internal network to
// begin data flow.
_source.link_target(&increment_active);
- Attendere che gli
event
oggetti ecountdown event
siano impostati. Questi eventi segnalano che l'agente ha ricevuto il valore sentinel e che tutte le operazioni sono state completate.
// Wait for the sentinel event and for all operations to finish.
received_sentinel.wait();
active.wait();
Il diagramma seguente mostra la rete completa del flusso di dati per la dataflow_agent
classe :
Nella tabella seguente vengono descritti i membri della rete.
Membro | Descrizione |
---|---|
increment_active |
Oggetto concurrency::transformer che incrementa il contatore eventi attivo e passa il valore di input al resto della rete. |
negatives , positives |
oggetti concurrency::call che incrementino il numero di numeri e decrementa il contatore eventi attivo. Gli oggetti usano un filtro per accettare numeri negativi o numeri positivi. |
sentinel |
Oggetto concurrency::call che accetta solo il valore sentinel pari a zero e decrementa il contatore eventi attivo. |
connector |
Oggetto concurrency::unbounded_buffer che connette il buffer dei messaggi di origine alla rete interna. |
Poiché il run
metodo viene chiamato su un thread separato, altri thread possono inviare messaggi alla rete prima che la rete sia completamente connessa. Il _source
membro dati è un unbounded_buffer
oggetto che memorizza nel buffer tutti gli input inviati dall'applicazione all'agente. Per assicurarsi che la rete elabori tutti i messaggi di input, l'agente collega innanzitutto i nodi interni della rete e quindi collega l'inizio della rete, connector
, al _source
membro dati. Ciò garantisce che i messaggi non vengano elaborati durante il formato della rete.
Poiché la rete in questo esempio si basa sul flusso di dati, anziché sul flusso di controllo, la rete deve comunicare con l'agente che ha completato l'elaborazione di ogni valore di input e che il nodo sentinel ha ricevuto il relativo valore. Questo esempio usa un countdown_event
oggetto per segnalare che tutti i valori di input sono stati elaborati e un oggetto concurrency::event per indicare che il nodo sentinel ha ricevuto il relativo valore. La countdown_event
classe usa un event
oggetto per segnalare quando un valore del contatore raggiunge zero. L'head della rete del flusso di dati incrementa il contatore ogni volta che riceve un valore. Ogni nodo terminale della rete decrementa il contatore dopo l'elaborazione del valore di input. Dopo che l'agente forma la rete del flusso di dati, attende che il nodo sentinel imposti l'oggetto event
e che l'oggetto segnali che il countdown_event
contatore ha raggiunto zero.
Nell'esempio seguente vengono illustrate le control_flow_agent
classi , dataflow_agent
e countdown_event
. La wmain
funzione crea un control_flow_agent
oggetto e dataflow_agent
e usa la send_values
funzione per inviare una serie di valori casuali agli agenti.
// dataflow-agent.cpp
// compile with: /EHsc
#include <windows.h>
#include <agents.h>
#include <iostream>
#include <random>
using namespace concurrency;
using namespace std;
// A basic agent that uses control-flow to regulate the order of program
// execution. This agent reads numbers from a message buffer and counts the
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
explicit control_flow_agent(ISource<int>& source)
: _source(source)
{
}
// Retrieves the count of negative numbers that the agent received.
size_t negatives()
{
return receive(_negatives);
}
// Retrieves the count of positive numbers that the agent received.
size_t positives()
{
return receive(_positives);
}
protected:
void run()
{
// Counts the number of negative and positive values that
// the agent receives.
size_t negative_count = 0;
size_t positive_count = 0;
// Read from the source buffer until we receive
// the sentinel value of 0.
int value = 0;
while ((value = receive(_source)) != 0)
{
// Send negative values to the first target and
// non-negative values to the second target.
if (value < 0)
++negative_count;
else
++positive_count;
}
// Write the counts to the message buffers.
send(_negatives, negative_count);
send(_positives, positive_count);
// Set the agent to the completed state.
done();
}
private:
// Source message buffer to read from.
ISource<int>& _source;
// Holds the number of negative and positive numbers that the agent receives.
single_assignment<size_t> _negatives;
single_assignment<size_t> _positives;
};
// A synchronization primitive that is signaled when its
// count reaches zero.
class countdown_event
{
public:
countdown_event(unsigned int count = 0L)
: _current(static_cast<long>(count))
{
// Set the event if the initial count is zero.
if (_current == 0L)
_event.set();
}
// Decrements the event counter.
void signal() {
if(InterlockedDecrement(&_current) == 0L) {
_event.set();
}
}
// Increments the event counter.
void add_count() {
if(InterlockedIncrement(&_current) == 1L) {
_event.reset();
}
}
// Blocks the current context until the event is set.
void wait() {
_event.wait();
}
private:
// The current count.
volatile long _current;
// The event that is set when the counter reaches zero.
event _event;
// Disable copy constructor.
countdown_event(const countdown_event&);
// Disable assignment.
countdown_event const & operator=(countdown_event const&);
};
// A basic agent that resembles control_flow_agent, but uses uses dataflow to
// perform computations when data becomes available.
class dataflow_agent : public agent
{
public:
dataflow_agent(ISource<int>& source)
: _source(source)
{
}
// Retrieves the count of negative numbers that the agent received.
size_t negatives()
{
return receive(_negatives);
}
// Retrieves the count of positive numbers that the agent received.
size_t positives()
{
return receive(_positives);
}
protected:
void run()
{
// Counts the number of negative and positive values that
// the agent receives.
size_t negative_count = 0;
size_t positive_count = 0;
// Tracks the count of active operations.
countdown_event active;
// An event that is set by the sentinel.
event received_sentinel;
//
// Create the members of the dataflow network.
//
// Increments the active counter.
transformer<int, int> increment_active(
[&active](int value) -> int {
active.add_count();
return value;
});
// Increments the count of negative values.
call<int> negatives(
[&](int value) {
++negative_count;
// Decrement the active counter.
active.signal();
},
[](int value) -> bool {
return value < 0;
});
// Increments the count of positive values.
call<int> positives(
[&](int value) {
++positive_count;
// Decrement the active counter.
active.signal();
},
[](int value) -> bool {
return value > 0;
});
// Receives only the sentinel value of 0.
call<int> sentinel(
[&](int value) {
// Decrement the active counter.
active.signal();
// Set the sentinel event.
received_sentinel.set();
},
[](int value) -> bool {
return value == 0;
});
// Connects the _source message buffer to the rest of the network.
unbounded_buffer<int> connector;
//
// Connect the network.
//
// Connect the internal nodes of the network.
connector.link_target(&negatives);
connector.link_target(&positives);
connector.link_target(&sentinel);
increment_active.link_target(&connector);
// Connect the _source buffer to the internal network to
// begin data flow.
_source.link_target(&increment_active);
// Wait for the sentinel event and for all operations to finish.
received_sentinel.wait();
active.wait();
// Write the counts to the message buffers.
send(_negatives, negative_count);
send(_positives, positive_count);
// Set the agent to the completed state.
done();
}
private:
// Source message buffer to read from.
ISource<int>& _source;
// Holds the number of negative and positive numbers that the agent receives.
single_assignment<size_t> _negatives;
single_assignment<size_t> _positives;
};
// Sends a number of random values to the provided message buffer.
void send_values(ITarget<int>& source, int sentinel, size_t count)
{
// Send a series of random numbers to the source buffer.
mt19937 rnd(42);
for (size_t i = 0; i < count; ++i)
{
// Generate a random number that is not equal to the sentinel value.
int n;
while ((n = rnd()) == sentinel);
send(source, n);
}
// Send the sentinel value.
send(source, sentinel);
}
int wmain()
{
// Signals to the agent that there are no more values to process.
const int sentinel = 0;
// The number of samples to send to each agent.
const size_t count = 1000000;
// The source buffer that the application writes numbers to and
// the agents read numbers from.
unbounded_buffer<int> source;
//
// Use a control-flow agent to process a series of random numbers.
//
wcout << L"Control-flow agent:" << endl;
// Create and start the agent.
control_flow_agent cf_agent(source);
cf_agent.start();
// Send values to the agent.
send_values(source, sentinel, count);
// Wait for the agent to finish.
agent::wait(&cf_agent);
// Print the count of negative and positive numbers.
wcout << L"There are " << cf_agent.negatives()
<< L" negative numbers."<< endl;
wcout << L"There are " << cf_agent.positives()
<< L" positive numbers."<< endl;
//
// Perform the same task, but this time with a dataflow agent.
//
wcout << L"Dataflow agent:" << endl;
// Create and start the agent.
dataflow_agent df_agent(source);
df_agent.start();
// Send values to the agent.
send_values(source, sentinel, count);
// Wait for the agent to finish.
agent::wait(&df_agent);
// Print the count of negative and positive numbers.
wcout << L"There are " << df_agent.negatives()
<< L" negative numbers."<< endl;
wcout << L"There are " << df_agent.positives()
<< L" positive numbers."<< endl;
}
Questo esempio produce l'output di esempio seguente:
Control-flow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.
Dataflow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.
Compilazione del codice
Copiare il codice di esempio e incollarlo in un progetto di Visual Studio oppure incollarlo in un file denominato dataflow-agent.cpp
e quindi eseguire il comando seguente in una finestra del prompt dei comandi di Visual Studio.
cl.exe /EHsc dataflow-agent.cpp
Creazione di un agente di registrazione messaggi
Nell'esempio seguente viene illustrata la log_agent
classe , simile alla dataflow_agent
classe . La log_agent
classe implementa un agente di registrazione asincrono che scrive i messaggi di log in un file e nella console. La log_agent
classe consente all'applicazione di classificare i messaggi come informativo, avviso o errore. Consente inoltre all'applicazione di specificare se ogni categoria di log viene scritta in un file, nella console o in entrambi. Questo esempio scrive tutti i messaggi di log in un file e solo i messaggi di errore nella console.
// log-filter.cpp
// compile with: /EHsc
#include <windows.h>
#include <agents.h>
#include <sstream>
#include <fstream>
#include <iostream>
using namespace concurrency;
using namespace std;
// A synchronization primitive that is signaled when its
// count reaches zero.
class countdown_event
{
public:
countdown_event(unsigned int count = 0L)
: _current(static_cast<long>(count))
{
// Set the event if the initial count is zero.
if (_current == 0L)
{
_event.set();
}
}
// Decrements the event counter.
void signal()
{
if(InterlockedDecrement(&_current) == 0L)
{
_event.set();
}
}
// Increments the event counter.
void add_count()
{
if(InterlockedIncrement(&_current) == 1L)
{
_event.reset();
}
}
// Blocks the current context until the event is set.
void wait()
{
_event.wait();
}
private:
// The current count.
volatile long _current;
// The event that is set when the counter reaches zero.
event _event;
// Disable copy constructor.
countdown_event(const countdown_event&);
// Disable assignment.
countdown_event const & operator=(countdown_event const&);
};
// Defines message types for the logger.
enum log_message_type
{
log_info = 0x1,
log_warning = 0x2,
log_error = 0x4,
};
// An asynchronous logging agent that writes log messages to
// file and to the console.
class log_agent : public agent
{
// Holds a message string and its logging type.
struct log_message
{
wstring message;
log_message_type type;
};
public:
log_agent(const wstring& file_path, log_message_type file_messages, log_message_type console_messages)
: _file(file_path)
, _file_messages(file_messages)
, _console_messages(console_messages)
, _active(0)
{
if (_file.bad())
{
throw invalid_argument("Unable to open log file.");
}
}
// Writes the provided message to the log.
void log(const wstring& message, log_message_type type)
{
// Increment the active message count.
_active.add_count();
// Send the message to the network.
log_message msg = { message, type };
send(_log_buffer, msg);
}
void close()
{
// Signal that the agent is now closed.
_closed.set();
}
protected:
void run()
{
//
// Create the dataflow network.
//
// Writes a log message to file.
call<log_message> writer([this](log_message msg)
{
if ((msg.type & _file_messages) != 0)
{
// Write the message to the file.
write_to_stream(msg, _file);
}
if ((msg.type & _console_messages) != 0)
{
// Write the message to the console.
write_to_stream(msg, wcout);
}
// Decrement the active counter.
_active.signal();
});
// Connect _log_buffer to the internal network to begin data flow.
_log_buffer.link_target(&writer);
// Wait for the closed event to be signaled.
_closed.wait();
// Wait for all messages to be processed.
_active.wait();
// Close the log file and flush the console.
_file.close();
wcout.flush();
// Set the agent to the completed state.
done();
}
private:
// Writes a logging message to the specified output stream.
void write_to_stream(const log_message& msg, wostream& stream)
{
// Write the message to the stream.
wstringstream ss;
switch (msg.type)
{
case log_info:
ss << L"info: ";
break;
case log_warning:
ss << L"warning: ";
break;
case log_error:
ss << L"error: ";
}
ss << msg.message << endl;
stream << ss.str();
}
private:
// The file stream to write messages to.
wofstream _file;
// The log message types that are written to file.
log_message_type _file_messages;
// The log message types that are written to the console.
log_message_type _console_messages;
// The head of the network. Propagates logging messages
// to the rest of the network.
unbounded_buffer<log_message> _log_buffer;
// Counts the number of active messages in the network.
countdown_event _active;
// Signals that the agent has been closed.
event _closed;
};
int wmain()
{
// Union of all log message types.
log_message_type log_all = log_message_type(log_info | log_warning | log_error);
// Create a logging agent that writes all log messages to file and error
// messages to the console.
log_agent logger(L"log.txt", log_all, log_error);
// Start the agent.
logger.start();
// Log a few messages.
logger.log(L"===Logging started.===", log_info);
logger.log(L"This is a sample warning message.", log_warning);
logger.log(L"This is a sample error message.", log_error);
logger.log(L"===Logging finished.===", log_info);
// Close the logger and wait for the agent to finish.
logger.close();
agent::wait(&logger);
}
In questo esempio viene scritto l'output seguente nella console.
error: This is a sample error message.
In questo esempio viene generato anche il file log.txt contenente il testo seguente.
info: ===Logging started.===
warning: This is a sample warning message.
error: This is a sample error message.
info: ===Logging finished.===
Compilazione del codice
Copiare il codice di esempio e incollarlo in un progetto di Visual Studio oppure incollarlo in un file denominato log-filter.cpp
e quindi eseguire il comando seguente in una finestra del prompt dei comandi di Visual Studio.
cl.exe /EHsc log-filter.cpp