Megjegyzés
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhat bejelentkezni vagy módosítani a címtárat.
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhatja módosítani a címtárat.
Ez a dokumentum bemutatja, hogyan hozhat létre adatfolyamon alapuló ügynökalapú alkalmazásokat a vezérlési folyamat helyett.
A vezérlési folyamat a programok műveleteinek végrehajtási sorrendjét jelenti. A vezérlési folyamatot vezérlő struktúrák, például feltételes utasítások, hurkok stb. használatával szabályozzák. Másik lehetőségként az adatfolyam egy olyan programozási modellre utal, amelyben a számítások csak akkor jönnek létre, ha az összes szükséges adat rendelkezésre áll. Az adatfolyam-programozási modell az üzenetátadás fogalmához kapcsolódik, amelyben a program független összetevői üzenetek küldésével kommunikálnak egymással.
Az aszinkron ügynökök támogatják a vezérlési és adatfolyam-programozási modelleket is. Bár a vezérlőfolyamat-modell sok esetben helyénvaló, máskor az adatfolyam-modell az előnyös, például amikor egy ügynök adatokat fogad, és az adatok tartalmán alapuló műveletet hajt végre.
Előfeltételek
Az útmutató megkezdése előtt olvassa el a következő dokumentumokat:
Szakaszok
Ez az útmutató a következő szakaszokat tartalmazza:
Egyszerű Control-Flow-ügynök létrehozása
Tekintse meg az osztályt control_flow_agent meghatározó alábbi példát. Az control_flow_agent osztály három üzenetpufferen működik: egy bemeneti és két kimeneti pufferen. A run metódus a forrásüzenet pufferéből olvas be egy ciklusban, és egy feltételes utasítással irányítja a program végrehajtásának folyamatát. Az ügynök egy számlálót növektet a nem nulla, negatív értékekhez, és egy másik számlálót a nem nulla pozitív értékekhez. Miután az ügynök megkapta a nulla értékű sentinelt, elküldi a számlálók értékeit a kimeneti üzenetpufferekbe. A negatives és positives metódusok lehetővé teszik az alkalmazás számára, hogy beolvassa az ügynöktől a negatív és pozitív értékek számát.
// A basic agent that uses control-flow to regulate the order of program
// execution. This agent reads numbers from a message buffer and counts the
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
explicit control_flow_agent(ISource<int>& source)
: _source(source)
{
}
// Retrieves the count of negative numbers that the agent received.
size_t negatives()
{
return receive(_negatives);
}
// Retrieves the count of positive numbers that the agent received.
size_t positives()
{
return receive(_positives);
}
protected:
void run()
{
// Counts the number of negative and positive values that
// the agent receives.
size_t negative_count = 0;
size_t positive_count = 0;
// Read from the source buffer until we receive
// the sentinel value of 0.
int value = 0;
while ((value = receive(_source)) != 0)
{
// Send negative values to the first target and
// non-negative values to the second target.
if (value < 0)
++negative_count;
else
++positive_count;
}
// Write the counts to the message buffers.
send(_negatives, negative_count);
send(_positives, positive_count);
// Set the agent to the completed state.
done();
}
private:
// Source message buffer to read from.
ISource<int>& _source;
// Holds the number of negative and positive numbers that the agent receives.
single_assignment<size_t> _negatives;
single_assignment<size_t> _positives;
};
Bár ez a példa alapszintűen használja a vezérlőfolyamatot egy ügynökben, bemutatja a vezérlőfolyamat-alapú programozás soros jellegét. Minden üzenetet egymás után kell feldolgozni, annak ellenére, hogy több üzenet is elérhető lehet a bemeneti üzenet pufferében. Az adatfolyam-modell lehetővé teszi a feltételes utasítás mindkét ágának egyidejű kiértékelését. Az adatfolyam-modell lehetővé teszi összetettebb üzenetkezelési hálózatok létrehozását is, amelyek az adatok rendelkezésre állása során működnek.
[Felső]
Egyszerű adatfolyam-ügynök létrehozása
Ez a szakasz bemutatja, hogyan alakíthatja át az control_flow_agent osztályt úgy, hogy az adatfolyam-modellt használja ugyanarra a feladatra.
Az adatfolyam-ügynök egy üzenetpufferek hálózatának létrehozásával működik, amelyek mindegyike egy adott célt szolgál. Bizonyos üzenetblokkok szűrőfüggvényt használnak arra, hogy az üzenetet a terhelés alapján elfogadják vagy elutasítsák. A szűrőfüggvények biztosítják, hogy az üzenetblokkok csak bizonyos értékeket kapnak.
A vezérlőfolyamat-ügynök átalakítása adatfolyam-ügynökké
Másolja az osztály törzsét
control_flow_agentegy másik osztályba,dataflow_agentpéldául. Másik lehetőségként átnevezheti az osztálytcontrol_flow_agent.Távolítsa el a
receivemetódusból azt a huroktörzset, amely hívjarun.
void run()
{
// Counts the number of negative and positive values that
// the agent receives.
size_t negative_count = 0;
size_t positive_count = 0;
// Write the counts to the message buffers.
send(_negatives, negative_count);
send(_positives, positive_count);
// Set the agent to the completed state.
done();
}
- A metódusban a
runváltozóknegative_countpositive_countinicializálása után adjon hozzá egycountdown_eventobjektumot, amely nyomon követi az aktív műveletek számát.
// Tracks the count of active operations.
countdown_event active;
// An event that is set by the sentinel.
event received_sentinel;
Az countdown_event osztály később jelenik meg ebben a témakörben.
- Hozza létre az adatfolyam-hálózatban részt vevő üzenetpuffer-objektumokat.
//
// Create the members of the dataflow network.
//
// Increments the active counter.
transformer<int, int> increment_active(
[&active](int value) -> int {
active.add_count();
return value;
});
// Increments the count of negative values.
call<int> negatives(
[&](int value) {
++negative_count;
// Decrement the active counter.
active.signal();
},
[](int value) -> bool {
return value < 0;
});
// Increments the count of positive values.
call<int> positives(
[&](int value) {
++positive_count;
// Decrement the active counter.
active.signal();
},
[](int value) -> bool {
return value > 0;
});
// Receives only the sentinel value of 0.
call<int> sentinel(
[&](int value) {
// Decrement the active counter.
active.signal();
// Set the sentinel event.
received_sentinel.set();
},
[](int value) -> bool {
return value == 0;
});
// Connects the _source message buffer to the rest of the network.
unbounded_buffer<int> connector;
- Csatlakoztassa az üzenetpuffereket egy hálózat létrehozásához.
//
// Connect the network.
//
// Connect the internal nodes of the network.
connector.link_target(&negatives);
connector.link_target(&positives);
connector.link_target(&sentinel);
increment_active.link_target(&connector);
// Connect the _source buffer to the internal network to
// begin data flow.
_source.link_target(&increment_active);
- Várja meg, amíg a
eventéscountdown eventobjektumok be lesznek állítva. Ezek az események azt jelzik, hogy az ügynök megkapta a sentinel értéket, és hogy az összes művelet befejeződött.
// Wait for the sentinel event and for all operations to finish.
received_sentinel.wait();
active.wait();
Az alábbi ábrán az osztály teljes adatfolyam-hálózata dataflow_agent látható:
Az alábbi táblázat a hálózat tagjait ismerteti.
| Tag | Leírás |
|---|---|
increment_active |
Egyidejűség::transzformátorobjektum, amely növeli az aktív eseményszámlálót, és átadja a bemeneti értéket a hálózat többi részére. |
negatives, positives |
concurrency::call objektumok, amelyek növelik a számok értékét, és csökkentik az aktív eseményszámlálót. Az objektumok mindegyike szűrővel fogadja el a negatív számokat vagy a pozitív számokat. |
sentinel |
concurrency::call objektum, amely csak a nulla szentinel értéket fogadja el, és csökkenti az aktív eseményszámlálót. |
connector |
concurrency::unbounded_buffer objektum, amely a forrásüzenet-puffert csatlakoztatja a belső hálózathoz. |
Mivel a run metódus külön szálon van meghívva, más szálak üzeneteket küldhetnek a hálózatnak, mielőtt a hálózat teljes mértékben csatlakozna. Az _source adattag egy unbounded_buffer objektum, amely puffereli az alkalmazásból az ügynöknek küldött összes bemenetet. Annak érdekében, hogy a hálózat feldolgozza az összes bemeneti üzenetet, az ügynök először a hálózat belső csomópontjait kapcsolja össze, connectormajd az adattaghoz kapcsolja a _source hálózat kezdetét. Ez garantálja, hogy az üzenetek nem lesznek feldolgozva a hálózat kialakításakor.
Mivel a példában szereplő hálózat nem vezérlési folyamaton, hanem adatfolyamon alapul, a hálózatnak közölnie kell az ügynökkel, hogy befejezte az egyes bemeneti értékek feldolgozását, és hogy a sentinel csomópont megkapta az értékét. Ez a példa egy countdown_event objektumot használ annak jelzésére, hogy az összes bemeneti érték feldolgozásra került, és egy egyidejűség::event objektum jelzi, hogy a sentinel csomópont megkapta az értékét. Az countdown_event osztály objektummal event jelzi, ha egy számláló értéke eléri a nullát. Az adatfolyam-hálózat feje minden alkalommal növeli a számlálót, amikor értéket kap. A hálózat minden terminálcsomópontja csökkenti a számlálót a bemeneti érték feldolgozása után. Miután az ügynök létrehozta az adatfolyam-hálózatot, megvárja, amíg a sentinel csomópont beállítja az event objektumot, és az countdown_event objektum jelzi, hogy a számláló elérte a nullát.
Az alábbi példa az control_flow_agent, dataflow_agent és countdown_event osztályokat mutatja be. A wmain függvény létrehoz egy control_flow_agent és egy dataflow_agent objektumot, és a send_values függvény használatával véletlenszerű értékek sorozatát küldi el az ügynököknek.
// dataflow-agent.cpp
// compile with: /EHsc
#include <windows.h>
#include <agents.h>
#include <iostream>
#include <random>
using namespace concurrency;
using namespace std;
// A basic agent that uses control-flow to regulate the order of program
// execution. This agent reads numbers from a message buffer and counts the
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
explicit control_flow_agent(ISource<int>& source)
: _source(source)
{
}
// Retrieves the count of negative numbers that the agent received.
size_t negatives()
{
return receive(_negatives);
}
// Retrieves the count of positive numbers that the agent received.
size_t positives()
{
return receive(_positives);
}
protected:
void run()
{
// Counts the number of negative and positive values that
// the agent receives.
size_t negative_count = 0;
size_t positive_count = 0;
// Read from the source buffer until we receive
// the sentinel value of 0.
int value = 0;
while ((value = receive(_source)) != 0)
{
// Send negative values to the first target and
// non-negative values to the second target.
if (value < 0)
++negative_count;
else
++positive_count;
}
// Write the counts to the message buffers.
send(_negatives, negative_count);
send(_positives, positive_count);
// Set the agent to the completed state.
done();
}
private:
// Source message buffer to read from.
ISource<int>& _source;
// Holds the number of negative and positive numbers that the agent receives.
single_assignment<size_t> _negatives;
single_assignment<size_t> _positives;
};
// A synchronization primitive that is signaled when its
// count reaches zero.
class countdown_event
{
public:
countdown_event(unsigned int count = 0L)
: _current(static_cast<long>(count))
{
// Set the event if the initial count is zero.
if (_current == 0L)
_event.set();
}
// Decrements the event counter.
void signal() {
if(InterlockedDecrement(&_current) == 0L) {
_event.set();
}
}
// Increments the event counter.
void add_count() {
if(InterlockedIncrement(&_current) == 1L) {
_event.reset();
}
}
// Blocks the current context until the event is set.
void wait() {
_event.wait();
}
private:
// The current count.
volatile long _current;
// The event that is set when the counter reaches zero.
event _event;
// Disable copy constructor.
countdown_event(const countdown_event&);
// Disable assignment.
countdown_event const & operator=(countdown_event const&);
};
// A basic agent that resembles control_flow_agent, but uses uses dataflow to
// perform computations when data becomes available.
class dataflow_agent : public agent
{
public:
dataflow_agent(ISource<int>& source)
: _source(source)
{
}
// Retrieves the count of negative numbers that the agent received.
size_t negatives()
{
return receive(_negatives);
}
// Retrieves the count of positive numbers that the agent received.
size_t positives()
{
return receive(_positives);
}
protected:
void run()
{
// Counts the number of negative and positive values that
// the agent receives.
size_t negative_count = 0;
size_t positive_count = 0;
// Tracks the count of active operations.
countdown_event active;
// An event that is set by the sentinel.
event received_sentinel;
//
// Create the members of the dataflow network.
//
// Increments the active counter.
transformer<int, int> increment_active(
[&active](int value) -> int {
active.add_count();
return value;
});
// Increments the count of negative values.
call<int> negatives(
[&](int value) {
++negative_count;
// Decrement the active counter.
active.signal();
},
[](int value) -> bool {
return value < 0;
});
// Increments the count of positive values.
call<int> positives(
[&](int value) {
++positive_count;
// Decrement the active counter.
active.signal();
},
[](int value) -> bool {
return value > 0;
});
// Receives only the sentinel value of 0.
call<int> sentinel(
[&](int value) {
// Decrement the active counter.
active.signal();
// Set the sentinel event.
received_sentinel.set();
},
[](int value) -> bool {
return value == 0;
});
// Connects the _source message buffer to the rest of the network.
unbounded_buffer<int> connector;
//
// Connect the network.
//
// Connect the internal nodes of the network.
connector.link_target(&negatives);
connector.link_target(&positives);
connector.link_target(&sentinel);
increment_active.link_target(&connector);
// Connect the _source buffer to the internal network to
// begin data flow.
_source.link_target(&increment_active);
// Wait for the sentinel event and for all operations to finish.
received_sentinel.wait();
active.wait();
// Write the counts to the message buffers.
send(_negatives, negative_count);
send(_positives, positive_count);
// Set the agent to the completed state.
done();
}
private:
// Source message buffer to read from.
ISource<int>& _source;
// Holds the number of negative and positive numbers that the agent receives.
single_assignment<size_t> _negatives;
single_assignment<size_t> _positives;
};
// Sends a number of random values to the provided message buffer.
void send_values(ITarget<int>& source, int sentinel, size_t count)
{
// Send a series of random numbers to the source buffer.
mt19937 rnd(42);
for (size_t i = 0; i < count; ++i)
{
// Generate a random number that is not equal to the sentinel value.
int n;
while ((n = rnd()) == sentinel);
send(source, n);
}
// Send the sentinel value.
send(source, sentinel);
}
int wmain()
{
// Signals to the agent that there are no more values to process.
const int sentinel = 0;
// The number of samples to send to each agent.
const size_t count = 1000000;
// The source buffer that the application writes numbers to and
// the agents read numbers from.
unbounded_buffer<int> source;
//
// Use a control-flow agent to process a series of random numbers.
//
wcout << L"Control-flow agent:" << endl;
// Create and start the agent.
control_flow_agent cf_agent(source);
cf_agent.start();
// Send values to the agent.
send_values(source, sentinel, count);
// Wait for the agent to finish.
agent::wait(&cf_agent);
// Print the count of negative and positive numbers.
wcout << L"There are " << cf_agent.negatives()
<< L" negative numbers."<< endl;
wcout << L"There are " << cf_agent.positives()
<< L" positive numbers."<< endl;
//
// Perform the same task, but this time with a dataflow agent.
//
wcout << L"Dataflow agent:" << endl;
// Create and start the agent.
dataflow_agent df_agent(source);
df_agent.start();
// Send values to the agent.
send_values(source, sentinel, count);
// Wait for the agent to finish.
agent::wait(&df_agent);
// Print the count of negative and positive numbers.
wcout << L"There are " << df_agent.negatives()
<< L" negative numbers."<< endl;
wcout << L"There are " << df_agent.positives()
<< L" positive numbers."<< endl;
}
Ez a példa a következő mintakimenetet hozza létre:
Control-flow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.
Dataflow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.
A kód összeállítása
Másolja ki a példakódot, és illessze be egy Visual Studio-projektbe, vagy illessze be egy elnevezett dataflow-agent.cpp fájlba, majd futtassa a következő parancsot egy Visual Studio parancssori ablakban.
cl.exe /EHsc dataflow-agent.cpp
[Felső]
Message-Logging-ügynök létrehozása
Az alábbi példa az log_agent osztályt mutatja be, amely hasonlít a dataflow_agent osztályra. Az log_agent osztály implementál egy aszinkron naplózási ügynököt, amely naplóüzeneteket ír egy fájlba és a konzolra. Az log_agent osztály lehetővé teszi, hogy az alkalmazás információs, figyelmeztetés vagy hibaként kategorizálja az üzeneteket. Emellett lehetővé teszi az alkalmazás számára, hogy meghatározza, hogy minden naplókategória fájlba, konzolra vagy mindkettőbe van-e írva. Ez a példa az összes naplóüzenetet egy fájlba írja, és csak a konzolra küldött hibaüzeneteket.
// log-filter.cpp
// compile with: /EHsc
#include <windows.h>
#include <agents.h>
#include <sstream>
#include <fstream>
#include <iostream>
using namespace concurrency;
using namespace std;
// A synchronization primitive that is signaled when its
// count reaches zero.
class countdown_event
{
public:
countdown_event(unsigned int count = 0L)
: _current(static_cast<long>(count))
{
// Set the event if the initial count is zero.
if (_current == 0L)
{
_event.set();
}
}
// Decrements the event counter.
void signal()
{
if(InterlockedDecrement(&_current) == 0L)
{
_event.set();
}
}
// Increments the event counter.
void add_count()
{
if(InterlockedIncrement(&_current) == 1L)
{
_event.reset();
}
}
// Blocks the current context until the event is set.
void wait()
{
_event.wait();
}
private:
// The current count.
volatile long _current;
// The event that is set when the counter reaches zero.
event _event;
// Disable copy constructor.
countdown_event(const countdown_event&);
// Disable assignment.
countdown_event const & operator=(countdown_event const&);
};
// Defines message types for the logger.
enum log_message_type
{
log_info = 0x1,
log_warning = 0x2,
log_error = 0x4,
};
// An asynchronous logging agent that writes log messages to
// file and to the console.
class log_agent : public agent
{
// Holds a message string and its logging type.
struct log_message
{
wstring message;
log_message_type type;
};
public:
log_agent(const wstring& file_path, log_message_type file_messages, log_message_type console_messages)
: _file(file_path)
, _file_messages(file_messages)
, _console_messages(console_messages)
, _active(0)
{
if (_file.bad())
{
throw invalid_argument("Unable to open log file.");
}
}
// Writes the provided message to the log.
void log(const wstring& message, log_message_type type)
{
// Increment the active message count.
_active.add_count();
// Send the message to the network.
log_message msg = { message, type };
send(_log_buffer, msg);
}
void close()
{
// Signal that the agent is now closed.
_closed.set();
}
protected:
void run()
{
//
// Create the dataflow network.
//
// Writes a log message to file.
call<log_message> writer([this](log_message msg)
{
if ((msg.type & _file_messages) != 0)
{
// Write the message to the file.
write_to_stream(msg, _file);
}
if ((msg.type & _console_messages) != 0)
{
// Write the message to the console.
write_to_stream(msg, wcout);
}
// Decrement the active counter.
_active.signal();
});
// Connect _log_buffer to the internal network to begin data flow.
_log_buffer.link_target(&writer);
// Wait for the closed event to be signaled.
_closed.wait();
// Wait for all messages to be processed.
_active.wait();
// Close the log file and flush the console.
_file.close();
wcout.flush();
// Set the agent to the completed state.
done();
}
private:
// Writes a logging message to the specified output stream.
void write_to_stream(const log_message& msg, wostream& stream)
{
// Write the message to the stream.
wstringstream ss;
switch (msg.type)
{
case log_info:
ss << L"info: ";
break;
case log_warning:
ss << L"warning: ";
break;
case log_error:
ss << L"error: ";
}
ss << msg.message << endl;
stream << ss.str();
}
private:
// The file stream to write messages to.
wofstream _file;
// The log message types that are written to file.
log_message_type _file_messages;
// The log message types that are written to the console.
log_message_type _console_messages;
// The head of the network. Propagates logging messages
// to the rest of the network.
unbounded_buffer<log_message> _log_buffer;
// Counts the number of active messages in the network.
countdown_event _active;
// Signals that the agent has been closed.
event _closed;
};
int wmain()
{
// Union of all log message types.
log_message_type log_all = log_message_type(log_info | log_warning | log_error);
// Create a logging agent that writes all log messages to file and error
// messages to the console.
log_agent logger(L"log.txt", log_all, log_error);
// Start the agent.
logger.start();
// Log a few messages.
logger.log(L"===Logging started.===", log_info);
logger.log(L"This is a sample warning message.", log_warning);
logger.log(L"This is a sample error message.", log_error);
logger.log(L"===Logging finished.===", log_info);
// Close the logger and wait for the agent to finish.
logger.close();
agent::wait(&logger);
}
Ez a példa a következő kimenetet írja a konzolra.
error: This is a sample error message.
Ez a példa a log.txt fájlt is létrehozza, amely a következő szöveget tartalmazza.
info: ===Logging started.===
warning: This is a sample warning message.
error: This is a sample error message.
info: ===Logging finished.===
A kód összeállítása
Másolja ki a példakódot, és illessze be egy Visual Studio-projektbe, vagy illessze be egy elnevezett log-filter.cpp fájlba, majd futtassa a következő parancsot egy Visual Studio parancssori ablakban.
cl.exe /EHsc log-filter.cpp
[Felső]