Megosztás a következőn keresztül:


Útmutató: Adatfolyam-ügynök létrehozása

Ez a dokumentum bemutatja, hogyan hozhat létre adatfolyamon alapuló ügynökalapú alkalmazásokat a vezérlési folyamat helyett.

A vezérlési folyamat a programok műveleteinek végrehajtási sorrendjét jelenti. A vezérlési folyamatot vezérlő struktúrák, például feltételes utasítások, hurkok stb. használatával szabályozzák. Másik lehetőségként az adatfolyam egy olyan programozási modellre utal, amelyben a számítások csak akkor jönnek létre, ha az összes szükséges adat rendelkezésre áll. Az adatfolyam-programozási modell az üzenetátadás fogalmához kapcsolódik, amelyben a program független összetevői üzenetek küldésével kommunikálnak egymással.

Az aszinkron ügynökök támogatják a vezérlési és adatfolyam-programozási modelleket is. Bár a vezérlőfolyamat-modell sok esetben helyénvaló, máskor az adatfolyam-modell az előnyös, például amikor egy ügynök adatokat fogad, és az adatok tartalmán alapuló műveletet hajt végre.

Előfeltételek

Az útmutató megkezdése előtt olvassa el a következő dokumentumokat:

Szakaszok

Ez az útmutató a következő szakaszokat tartalmazza:

Egyszerű Control-Flow-ügynök létrehozása

Tekintse meg az osztályt control_flow_agent meghatározó alábbi példát. Az control_flow_agent osztály három üzenetpufferen működik: egy bemeneti és két kimeneti pufferen. A run metódus a forrásüzenet pufferéből olvas be egy ciklusban, és egy feltételes utasítással irányítja a program végrehajtásának folyamatát. Az ügynök egy számlálót növektet a nem nulla, negatív értékekhez, és egy másik számlálót a nem nulla pozitív értékekhez. Miután az ügynök megkapta a nulla értékű sentinelt, elküldi a számlálók értékeit a kimeneti üzenetpufferekbe. A negatives és positives metódusok lehetővé teszik az alkalmazás számára, hogy beolvassa az ügynöktől a negatív és pozitív értékek számát.

// A basic agent that uses control-flow to regulate the order of program 
// execution. This agent reads numbers from a message buffer and counts the 
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
   explicit control_flow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Read from the source buffer until we receive
      // the sentinel value of 0.
      int value = 0;      
      while ((value = receive(_source)) != 0)
      {
         // Send negative values to the first target and
         // non-negative values to the second target.
         if (value < 0)
            ++negative_count;
         else
            ++positive_count;
      }

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }
private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

Bár ez a példa alapszintűen használja a vezérlőfolyamatot egy ügynökben, bemutatja a vezérlőfolyamat-alapú programozás soros jellegét. Minden üzenetet egymás után kell feldolgozni, annak ellenére, hogy több üzenet is elérhető lehet a bemeneti üzenet pufferében. Az adatfolyam-modell lehetővé teszi a feltételes utasítás mindkét ágának egyidejű kiértékelését. Az adatfolyam-modell lehetővé teszi összetettebb üzenetkezelési hálózatok létrehozását is, amelyek az adatok rendelkezésre állása során működnek.

[Felső]

Egyszerű adatfolyam-ügynök létrehozása

Ez a szakasz bemutatja, hogyan alakíthatja át az control_flow_agent osztályt úgy, hogy az adatfolyam-modellt használja ugyanarra a feladatra.

Az adatfolyam-ügynök egy üzenetpufferek hálózatának létrehozásával működik, amelyek mindegyike egy adott célt szolgál. Bizonyos üzenetblokkok szűrőfüggvényt használnak arra, hogy az üzenetet a terhelés alapján elfogadják vagy elutasítsák. A szűrőfüggvények biztosítják, hogy az üzenetblokkok csak bizonyos értékeket kapnak.

A vezérlőfolyamat-ügynök átalakítása adatfolyam-ügynökké

  1. Másolja az osztály törzsét control_flow_agent egy másik osztályba, dataflow_agentpéldául. Másik lehetőségként átnevezheti az osztályt control_flow_agent .

  2. Távolítsa el a receive metódusból azt a huroktörzset, amely hívja run.

void run()
{
   // Counts the number of negative and positive values that
   // the agent receives.
   size_t negative_count = 0;
   size_t positive_count = 0;

   // Write the counts to the message buffers.
   send(_negatives, negative_count);
   send(_positives, positive_count);

   // Set the agent to the completed state.
   done();
}
  1. A metódusban a run változók negative_countpositive_countinicializálása után adjon hozzá egy countdown_event objektumot, amely nyomon követi az aktív műveletek számát.
// Tracks the count of active operations.
countdown_event active;
// An event that is set by the sentinel.
event received_sentinel;

Az countdown_event osztály később jelenik meg ebben a témakörben.

  1. Hozza létre az adatfolyam-hálózatban részt vevő üzenetpuffer-objektumokat.
 //
 // Create the members of the dataflow network.
 //

 // Increments the active counter.
 transformer<int, int> increment_active(
    [&active](int value) -> int {
       active.add_count();
       return value;
    });

 // Increments the count of negative values.
 call<int> negatives(
    [&](int value) {
       ++negative_count;
       // Decrement the active counter.
       active.signal();
    },
    [](int value) -> bool {
       return value < 0;
    });

 // Increments the count of positive values.
 call<int> positives(
    [&](int value) {
       ++positive_count;
       // Decrement the active counter.
       active.signal();
    },
    [](int value) -> bool {
       return value > 0;
    });

 // Receives only the sentinel value of 0.
 call<int> sentinel(
    [&](int value) {            
       // Decrement the active counter.
       active.signal();
       // Set the sentinel event.
       received_sentinel.set();
    },
    [](int value) -> bool { 
       return value == 0; 
    });

 // Connects the _source message buffer to the rest of the network.
 unbounded_buffer<int> connector;
  1. Csatlakoztassa az üzenetpuffereket egy hálózat létrehozásához.
//
// Connect the network.
//

// Connect the internal nodes of the network.
connector.link_target(&negatives);
connector.link_target(&positives);
connector.link_target(&sentinel);
increment_active.link_target(&connector);

// Connect the _source buffer to the internal network to 
// begin data flow.
_source.link_target(&increment_active);
  1. Várja meg, amíg a event és countdown event objektumok be lesznek állítva. Ezek az események azt jelzik, hogy az ügynök megkapta a sentinel értéket, és hogy az összes művelet befejeződött.
// Wait for the sentinel event and for all operations to finish.
received_sentinel.wait();
active.wait();

Az alábbi ábrán az osztály teljes adatfolyam-hálózata dataflow_agent látható:

Az adatfolyam-hálózat.

Az alábbi táblázat a hálózat tagjait ismerteti.

Tag Leírás
increment_active Egyidejűség::transzformátorobjektum, amely növeli az aktív eseményszámlálót, és átadja a bemeneti értéket a hálózat többi részére.
negatives, positives concurrency::call objektumok, amelyek növelik a számok értékét, és csökkentik az aktív eseményszámlálót. Az objektumok mindegyike szűrővel fogadja el a negatív számokat vagy a pozitív számokat.
sentinel concurrency::call objektum, amely csak a nulla szentinel értéket fogadja el, és csökkenti az aktív eseményszámlálót.
connector concurrency::unbounded_buffer objektum, amely a forrásüzenet-puffert csatlakoztatja a belső hálózathoz.

Mivel a run metódus külön szálon van meghívva, más szálak üzeneteket küldhetnek a hálózatnak, mielőtt a hálózat teljes mértékben csatlakozna. Az _source adattag egy unbounded_buffer objektum, amely puffereli az alkalmazásból az ügynöknek küldött összes bemenetet. Annak érdekében, hogy a hálózat feldolgozza az összes bemeneti üzenetet, az ügynök először a hálózat belső csomópontjait kapcsolja össze, connectormajd az adattaghoz kapcsolja a _source hálózat kezdetét. Ez garantálja, hogy az üzenetek nem lesznek feldolgozva a hálózat kialakításakor.

Mivel a példában szereplő hálózat nem vezérlési folyamaton, hanem adatfolyamon alapul, a hálózatnak közölnie kell az ügynökkel, hogy befejezte az egyes bemeneti értékek feldolgozását, és hogy a sentinel csomópont megkapta az értékét. Ez a példa egy countdown_event objektumot használ annak jelzésére, hogy az összes bemeneti érték feldolgozásra került, és egy egyidejűség::event objektum jelzi, hogy a sentinel csomópont megkapta az értékét. Az countdown_event osztály objektummal event jelzi, ha egy számláló értéke eléri a nullát. Az adatfolyam-hálózat feje minden alkalommal növeli a számlálót, amikor értéket kap. A hálózat minden terminálcsomópontja csökkenti a számlálót a bemeneti érték feldolgozása után. Miután az ügynök létrehozta az adatfolyam-hálózatot, megvárja, amíg a sentinel csomópont beállítja az event objektumot, és az countdown_event objektum jelzi, hogy a számláló elérte a nullát.

Az alábbi példa az control_flow_agent, dataflow_agent és countdown_event osztályokat mutatja be. A wmain függvény létrehoz egy control_flow_agent és egy dataflow_agent objektumot, és a send_values függvény használatával véletlenszerű értékek sorozatát küldi el az ügynököknek.

// dataflow-agent.cpp
// compile with: /EHsc 
#include <windows.h>
#include <agents.h>
#include <iostream>
#include <random>

using namespace concurrency;
using namespace std;

// A basic agent that uses control-flow to regulate the order of program 
// execution. This agent reads numbers from a message buffer and counts the 
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
   explicit control_flow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Read from the source buffer until we receive
      // the sentinel value of 0.
      int value = 0;      
      while ((value = receive(_source)) != 0)
      {
         // Send negative values to the first target and
         // non-negative values to the second target.
         if (value < 0)
            ++negative_count;
         else
            ++positive_count;
      }

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }
private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(unsigned int count = 0L)
      : _current(static_cast<long>(count)) 
   {
      // Set the event if the initial count is zero.
      if (_current == 0L)
         _event.set();
   }
     
   // Decrements the event counter.
   void signal() {
      if(InterlockedDecrement(&_current) == 0L) {
         _event.set();
      }
   }

   // Increments the event counter.
   void add_count() {
      if(InterlockedIncrement(&_current) == 1L) {
         _event.reset();
      }
   }
   
   // Blocks the current context until the event is set.
   void wait() {
      _event.wait();
   }
 
private:
   // The current count.
   volatile long _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

// A basic agent that resembles control_flow_agent, but uses uses dataflow to 
// perform computations when data becomes available.
class dataflow_agent : public agent
{
public:
   dataflow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Tracks the count of active operations.
      countdown_event active;
      // An event that is set by the sentinel.
      event received_sentinel;
      
      //
      // Create the members of the dataflow network.
      //
     
      // Increments the active counter.
      transformer<int, int> increment_active(
         [&active](int value) -> int {
            active.add_count();
            return value;
         });

      // Increments the count of negative values.
      call<int> negatives(
         [&](int value) {
            ++negative_count;
            // Decrement the active counter.
            active.signal();
         },
         [](int value) -> bool {
            return value < 0;
         });

      // Increments the count of positive values.
      call<int> positives(
         [&](int value) {
            ++positive_count;
            // Decrement the active counter.
            active.signal();
         },
         [](int value) -> bool {
            return value > 0;
         });

      // Receives only the sentinel value of 0.
      call<int> sentinel(
         [&](int value) {            
            // Decrement the active counter.
            active.signal();
            // Set the sentinel event.
            received_sentinel.set();
         },
         [](int value) -> bool { 
            return value == 0; 
         });

      // Connects the _source message buffer to the rest of the network.
      unbounded_buffer<int> connector;
       
      //
      // Connect the network.
      //

      // Connect the internal nodes of the network.
      connector.link_target(&negatives);
      connector.link_target(&positives);
      connector.link_target(&sentinel);
      increment_active.link_target(&connector);

      // Connect the _source buffer to the internal network to 
      // begin data flow.
      _source.link_target(&increment_active);

      // Wait for the sentinel event and for all operations to finish.
      received_sentinel.wait();
      active.wait();
           
      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }

private:
   // Source message buffer to read from.
   ISource<int>& _source;
   
   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

// Sends a number of random values to the provided message buffer.
void send_values(ITarget<int>& source, int sentinel, size_t count)
{
   // Send a series of random numbers to the source buffer.
   mt19937 rnd(42);
   for (size_t i = 0; i < count; ++i)
   {
      // Generate a random number that is not equal to the sentinel value.
      int n;
      while ((n = rnd()) == sentinel);

      send(source, n);      
   }
   // Send the sentinel value.
   send(source, sentinel);   
}

int wmain()
{
   // Signals to the agent that there are no more values to process.
   const int sentinel = 0;
   // The number of samples to send to each agent.
   const size_t count = 1000000;

   // The source buffer that the application writes numbers to and 
   // the agents read numbers from.
   unbounded_buffer<int> source;

   //
   // Use a control-flow agent to process a series of random numbers.
   //
   wcout << L"Control-flow agent:" << endl;

   // Create and start the agent.
   control_flow_agent cf_agent(source);
   cf_agent.start();
   
   // Send values to the agent.
   send_values(source, sentinel, count);
   
   // Wait for the agent to finish.
   agent::wait(&cf_agent);
   
   // Print the count of negative and positive numbers.
   wcout << L"There are " << cf_agent.negatives() 
         << L" negative numbers."<< endl;
   wcout << L"There are " << cf_agent.positives() 
         << L" positive numbers."<< endl;  

   //
   // Perform the same task, but this time with a dataflow agent.
   //
   wcout << L"Dataflow agent:" << endl;

   // Create and start the agent.
   dataflow_agent df_agent(source);
   df_agent.start();
   
   // Send values to the agent.
   send_values(source, sentinel, count);
   
   // Wait for the agent to finish.
   agent::wait(&df_agent);
   
   // Print the count of negative and positive numbers.
   wcout << L"There are " << df_agent.negatives() 
         << L" negative numbers."<< endl;
   wcout << L"There are " << df_agent.positives() 
         << L" positive numbers."<< endl;
}

Ez a példa a következő mintakimenetet hozza létre:

Control-flow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.
Dataflow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.

A kód összeállítása

Másolja ki a példakódot, és illessze be egy Visual Studio-projektbe, vagy illessze be egy elnevezett dataflow-agent.cpp fájlba, majd futtassa a következő parancsot egy Visual Studio parancssori ablakban.

cl.exe /EHsc dataflow-agent.cpp

[Felső]

Message-Logging-ügynök létrehozása

Az alábbi példa az log_agent osztályt mutatja be, amely hasonlít a dataflow_agent osztályra. Az log_agent osztály implementál egy aszinkron naplózási ügynököt, amely naplóüzeneteket ír egy fájlba és a konzolra. Az log_agent osztály lehetővé teszi, hogy az alkalmazás információs, figyelmeztetés vagy hibaként kategorizálja az üzeneteket. Emellett lehetővé teszi az alkalmazás számára, hogy meghatározza, hogy minden naplókategória fájlba, konzolra vagy mindkettőbe van-e írva. Ez a példa az összes naplóüzenetet egy fájlba írja, és csak a konzolra küldött hibaüzeneteket.

// log-filter.cpp
// compile with: /EHsc 
#include <windows.h>
#include <agents.h>
#include <sstream>
#include <fstream>
#include <iostream>

using namespace concurrency;
using namespace std;

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
    countdown_event(unsigned int count = 0L)
        : _current(static_cast<long>(count)) 
    {
        // Set the event if the initial count is zero.
        if (_current == 0L)
        {
            _event.set();
        }
    }

    // Decrements the event counter.
    void signal()
    {
        if(InterlockedDecrement(&_current) == 0L)
        {
            _event.set();
        }
    }

    // Increments the event counter.
    void add_count()
    {
        if(InterlockedIncrement(&_current) == 1L)
        {
            _event.reset();
        }
    }

    // Blocks the current context until the event is set.
    void wait()
    {
        _event.wait();
    }

private:
    // The current count.
    volatile long _current;
    // The event that is set when the counter reaches zero.
    event _event;

    // Disable copy constructor.
    countdown_event(const countdown_event&);
    // Disable assignment.
    countdown_event const & operator=(countdown_event const&);
};

// Defines message types for the logger.
enum log_message_type
{
    log_info    = 0x1,
    log_warning = 0x2,
    log_error   = 0x4,
};

// An asynchronous logging agent that writes log messages to 
// file and to the console.
class log_agent : public agent
{
    // Holds a message string and its logging type.
    struct log_message
    {
        wstring message;
        log_message_type type;
    };

public:
    log_agent(const wstring& file_path, log_message_type file_messages, log_message_type console_messages)
        : _file(file_path)
        , _file_messages(file_messages)
        , _console_messages(console_messages)
        , _active(0)
    {
        if (_file.bad())
        {
            throw invalid_argument("Unable to open log file.");
        }
    }

    // Writes the provided message to the log.
    void log(const wstring& message, log_message_type type)
    {  
        // Increment the active message count.
        _active.add_count();

        // Send the message to the network.
        log_message msg = { message, type };
        send(_log_buffer, msg);
    }

    void close()
    {
        // Signal that the agent is now closed.
        _closed.set();
    }

protected:

    void run()
    {
        //
        // Create the dataflow network.
        //

        // Writes a log message to file.
        call<log_message> writer([this](log_message msg)
        {
            if ((msg.type & _file_messages) != 0)
            {
                // Write the message to the file.
                write_to_stream(msg, _file);
            }
            if ((msg.type & _console_messages) != 0)
            {
                // Write the message to the console.
                write_to_stream(msg, wcout);
            }
            // Decrement the active counter.
            _active.signal();
        });

        // Connect _log_buffer to the internal network to begin data flow.
        _log_buffer.link_target(&writer);

        // Wait for the closed event to be signaled.
        _closed.wait();

        // Wait for all messages to be processed.
        _active.wait();

        // Close the log file and flush the console.
        _file.close();
        wcout.flush();

        // Set the agent to the completed state.
        done();
    }

private:
    // Writes a logging message to the specified output stream.
    void write_to_stream(const log_message& msg, wostream& stream)
    {
        // Write the message to the stream.
        wstringstream ss;

        switch (msg.type)
        {
        case log_info:
            ss << L"info: ";
            break;
        case log_warning:
            ss << L"warning: ";
            break;
        case log_error:
            ss << L"error: ";
        }

        ss << msg.message << endl;
        stream << ss.str();
    }

private:   
    // The file stream to write messages to.
    wofstream _file;

    // The log message types that are written to file.
    log_message_type _file_messages;

    // The log message types that are written to the console.
    log_message_type _console_messages;

    // The head of the network. Propagates logging messages
    // to the rest of the network.
    unbounded_buffer<log_message> _log_buffer;

    // Counts the number of active messages in the network.
    countdown_event _active;

    // Signals that the agent has been closed.
    event _closed;
};

int wmain()
{
    // Union of all log message types.
    log_message_type log_all = log_message_type(log_info | log_warning  | log_error);

    // Create a logging agent that writes all log messages to file and error 
    // messages to the console.
    log_agent logger(L"log.txt", log_all, log_error);

    // Start the agent.
    logger.start();

    // Log a few messages.

    logger.log(L"===Logging started.===", log_info);

    logger.log(L"This is a sample warning message.", log_warning);
    logger.log(L"This is a sample error message.", log_error);

    logger.log(L"===Logging finished.===", log_info);

    // Close the logger and wait for the agent to finish.
    logger.close();
    agent::wait(&logger);
}

Ez a példa a következő kimenetet írja a konzolra.

error: This is a sample error message.

Ez a példa a log.txt fájlt is létrehozza, amely a következő szöveget tartalmazza.

info: ===Logging started.===
warning: This is a sample warning message.
error: This is a sample error message.
info: ===Logging finished.===

A kód összeállítása

Másolja ki a példakódot, és illessze be egy Visual Studio-projektbe, vagy illessze be egy elnevezett log-filter.cpp fájlba, majd futtassa a következő parancsot egy Visual Studio parancssori ablakban.

cl.exe /EHsc log-filter.cpp

[Felső]

Lásd még

Egyidejűségi futtatókörnyezeti útmutatók