Sdílet prostřednictvím


Postupy: Vytvoření agenta toku dat

Tento dokument ukazuje, jak místo toku řízení vytvářet aplikace založené na agentech, které jsou založené na toku dat.

Tok řízení odkazuje na pořadí provádění operací v programu. Tok řízení se reguluje pomocí řídicích struktur, jako jsou podmíněné příkazy, smyčky atd. Tok dat také odkazuje na programovací model, ve kterém jsou výpočty provedeny pouze v případě, že jsou k dispozici všechna požadovaná data. Programovací model toku dat souvisí s konceptem předávání zpráv, ve kterém nezávislé komponenty programu vzájemně komunikují odesláním zpráv.

Asynchronní agenti podporují programovací modely toku řízení i toku dat. I když je model toku řízení vhodný v mnoha případech, model toku dat je vhodný například v jiných případech, když agent přijímá data a provádí akci založenou na datové části těchto dat.

Požadavky

Před zahájením tohoto návodu si přečtěte následující dokumenty:

Oddíly

Tento názorný postup obsahuje následující části:

Vytvoření základního agenta toku řízení

Představte si následující příklad, který definuje control_flow_agent třídu. Třída control_flow_agent funguje na třech vyrovnávacích pamětích zpráv: jedna vstupní vyrovnávací paměť a dvě výstupní vyrovnávací paměti. Metoda run čte ze zdrojové vyrovnávací paměti zpráv ve smyčce a používá podmíněný příkaz pro směrování toku provádění programu. Agent zvýší jeden čítač pro nenulové, záporné hodnoty a zvýší další čítač pro nenulové kladné hodnoty. Jakmile agent obdrží hodnotu sentinelu nula, odešle hodnoty čítačů do vyrovnávací paměti výstupní zprávy. positives Metody negatives umožňují aplikaci číst počty záporných a kladných hodnot z agenta.

// A basic agent that uses control-flow to regulate the order of program 
// execution. This agent reads numbers from a message buffer and counts the 
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
   explicit control_flow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Read from the source buffer until we receive
      // the sentinel value of 0.
      int value = 0;      
      while ((value = receive(_source)) != 0)
      {
         // Send negative values to the first target and
         // non-negative values to the second target.
         if (value < 0)
            ++negative_count;
         else
            ++positive_count;
      }

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }
private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

I když tento příklad používá základní použití toku řízení v agentu, ukazuje sériovou povahu programování založeného na řízení-tok. Každá zpráva musí být zpracována postupně, i když může být ve vstupní vyrovnávací paměti zprávy k dispozici více zpráv. Model toku dat umožňuje souběžné vyhodnocení obou větví podmíněného příkazu. Model toku dat také umožňuje vytvářet složitější sítě pro zasílání zpráv, které fungují na datech, jakmile budou k dispozici.

[Nahoře]

Vytvoření základního agenta toku dat

Tato část ukazuje, jak převést control_flow_agent třídu na použití modelu toku dat k provedení stejné úlohy.

Agent toku dat funguje vytvořením sítě vyrovnávacích pamětí zpráv, z nichž každá slouží konkrétnímu účelu. Některé bloky zpráv používají funkci filtru k přijetí nebo odmítnutí zprávy na základě datové části. Funkce filtru zajišťuje, že blok zpráv obdrží pouze určité hodnoty.

Převedení agenta toku řízení na agenta toku dat

  1. Zkopírujte tělo control_flow_agent třídy do jiné třídy, dataflow_agentnapříklad . Případně můžete třídu přejmenovat control_flow_agent .

  2. Odeberte tělo smyčky, která volá receive z run metody.

void run()
{
   // Counts the number of negative and positive values that
   // the agent receives.
   size_t negative_count = 0;
   size_t positive_count = 0;

   // Write the counts to the message buffers.
   send(_negatives, negative_count);
   send(_positives, positive_count);

   // Set the agent to the completed state.
   done();
}
  1. run V metodě po inicializaci proměnných negative_count a positive_countpřidejte countdown_event objekt, který sleduje počet aktivních operací.
// Tracks the count of active operations.
countdown_event active;
// An event that is set by the sentinel.
event received_sentinel;

Třída countdown_event se zobrazí dále v tomto tématu.

  1. Vytvořte objekty vyrovnávací paměti zpráv, které se budou účastnit sítě toku dat.
 //
 // Create the members of the dataflow network.
 //

 // Increments the active counter.
 transformer<int, int> increment_active(
    [&active](int value) -> int {
       active.add_count();
       return value;
    });

 // Increments the count of negative values.
 call<int> negatives(
    [&](int value) {
       ++negative_count;
       // Decrement the active counter.
       active.signal();
    },
    [](int value) -> bool {
       return value < 0;
    });

 // Increments the count of positive values.
 call<int> positives(
    [&](int value) {
       ++positive_count;
       // Decrement the active counter.
       active.signal();
    },
    [](int value) -> bool {
       return value > 0;
    });

 // Receives only the sentinel value of 0.
 call<int> sentinel(
    [&](int value) {            
       // Decrement the active counter.
       active.signal();
       // Set the sentinel event.
       received_sentinel.set();
    },
    [](int value) -> bool { 
       return value == 0; 
    });

 // Connects the _source message buffer to the rest of the network.
 unbounded_buffer<int> connector;
  1. Připojte vyrovnávací paměti zpráv pro vytvoření sítě.
//
// Connect the network.
//

// Connect the internal nodes of the network.
connector.link_target(&negatives);
connector.link_target(&positives);
connector.link_target(&sentinel);
increment_active.link_target(&connector);

// Connect the _source buffer to the internal network to 
// begin data flow.
_source.link_target(&increment_active);
  1. Počkejte, než event countdown event se objekty nastaví. Tyto události signalizují, že agent obdržel hodnotu sentinelu a že všechny operace byly dokončeny.
// Wait for the sentinel event and for all operations to finish.
received_sentinel.wait();
active.wait();

Následující diagram znázorňuje úplnou síť toku dat pro dataflow_agent třídu:

Síť toku dat.

Následující tabulka popisuje členy sítě.

Člen Popis
increment_active Objekt concurrency::transformer , který zvýší aktivní čítač událostí a předá vstupní hodnotu do zbytku sítě.
negatives, positives concurrency::call objekty, které zvýší počet čísel a dekrementují aktivní čítač událostí. Každý objekt používá filtr k přijetí záporných nebo kladných čísel.
sentinel Objekt concurrency::call , který přijímá pouze hodnotu sentinelu nula a dekrementuje aktivní čítač událostí.
connector Souběžnost ::unbounded_buffer objekt, který připojuje vyrovnávací paměť zdrojové zprávy k interní síti.

Vzhledem k tomu, že run metoda je volána v samostatném vlákně, ostatní vlákna mohou odesílat zprávy do sítě před tím, než je síť plně připojena. Datový _source člen je unbounded_buffer objekt, který do vyrovnávací paměti veškerý vstup odesílaný z aplikace do agenta. Aby se zajistilo, že síť zpracuje všechny vstupní zprávy, agent nejprve pro linkuje interní uzly sítě a pak pro linkuje začátek této sítě connectors datovým _source členem. To zaručuje, že se zprávy nezpracují při formování sítě.

Vzhledem k tomu, že síť v tomto příkladu je založená na toku dat, a ne na toku řízení, musí síť komunikovat s agentem, že dokončil zpracování každé vstupní hodnoty a že uzel sentinelu obdržel jeho hodnotu. Tento příklad používá countdown_event objekt k označení, že všechny vstupní hodnoty byly zpracovány a objekt concurrency::event označuje, že uzel sentinelu obdržel jeho hodnotu. Třída countdown_event používá event objekt k signalizaci, když hodnota čítače dosáhne nuly. Hlava sítě toku dat zvýší čítač pokaždé, když obdrží hodnotu. Každý terminálový uzel sítě po zpracování vstupní hodnoty dekrementuje čítač. Jakmile agent vytvoří síť toku dat, počká, až uzel sentinelu event nastaví objekt a countdown_event objekt bude signalizovat, že jeho čítač dosáhl nuly.

Následující příklad ukazuje control_flow_agent, dataflow_agenta countdown_event třídy. Funkce wmain vytvoří objekt control_flow_agent a dataflow_agent použije send_values funkci k odeslání řady náhodných hodnot agentům.

// dataflow-agent.cpp
// compile with: /EHsc 
#include <windows.h>
#include <agents.h>
#include <iostream>
#include <random>

using namespace concurrency;
using namespace std;

// A basic agent that uses control-flow to regulate the order of program 
// execution. This agent reads numbers from a message buffer and counts the 
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
   explicit control_flow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Read from the source buffer until we receive
      // the sentinel value of 0.
      int value = 0;      
      while ((value = receive(_source)) != 0)
      {
         // Send negative values to the first target and
         // non-negative values to the second target.
         if (value < 0)
            ++negative_count;
         else
            ++positive_count;
      }

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }
private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(unsigned int count = 0L)
      : _current(static_cast<long>(count)) 
   {
      // Set the event if the initial count is zero.
      if (_current == 0L)
         _event.set();
   }
     
   // Decrements the event counter.
   void signal() {
      if(InterlockedDecrement(&_current) == 0L) {
         _event.set();
      }
   }

   // Increments the event counter.
   void add_count() {
      if(InterlockedIncrement(&_current) == 1L) {
         _event.reset();
      }
   }
   
   // Blocks the current context until the event is set.
   void wait() {
      _event.wait();
   }
 
private:
   // The current count.
   volatile long _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

// A basic agent that resembles control_flow_agent, but uses uses dataflow to 
// perform computations when data becomes available.
class dataflow_agent : public agent
{
public:
   dataflow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Tracks the count of active operations.
      countdown_event active;
      // An event that is set by the sentinel.
      event received_sentinel;
      
      //
      // Create the members of the dataflow network.
      //
     
      // Increments the active counter.
      transformer<int, int> increment_active(
         [&active](int value) -> int {
            active.add_count();
            return value;
         });

      // Increments the count of negative values.
      call<int> negatives(
         [&](int value) {
            ++negative_count;
            // Decrement the active counter.
            active.signal();
         },
         [](int value) -> bool {
            return value < 0;
         });

      // Increments the count of positive values.
      call<int> positives(
         [&](int value) {
            ++positive_count;
            // Decrement the active counter.
            active.signal();
         },
         [](int value) -> bool {
            return value > 0;
         });

      // Receives only the sentinel value of 0.
      call<int> sentinel(
         [&](int value) {            
            // Decrement the active counter.
            active.signal();
            // Set the sentinel event.
            received_sentinel.set();
         },
         [](int value) -> bool { 
            return value == 0; 
         });

      // Connects the _source message buffer to the rest of the network.
      unbounded_buffer<int> connector;
       
      //
      // Connect the network.
      //

      // Connect the internal nodes of the network.
      connector.link_target(&negatives);
      connector.link_target(&positives);
      connector.link_target(&sentinel);
      increment_active.link_target(&connector);

      // Connect the _source buffer to the internal network to 
      // begin data flow.
      _source.link_target(&increment_active);

      // Wait for the sentinel event and for all operations to finish.
      received_sentinel.wait();
      active.wait();
           
      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }

private:
   // Source message buffer to read from.
   ISource<int>& _source;
   
   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

// Sends a number of random values to the provided message buffer.
void send_values(ITarget<int>& source, int sentinel, size_t count)
{
   // Send a series of random numbers to the source buffer.
   mt19937 rnd(42);
   for (size_t i = 0; i < count; ++i)
   {
      // Generate a random number that is not equal to the sentinel value.
      int n;
      while ((n = rnd()) == sentinel);

      send(source, n);      
   }
   // Send the sentinel value.
   send(source, sentinel);   
}

int wmain()
{
   // Signals to the agent that there are no more values to process.
   const int sentinel = 0;
   // The number of samples to send to each agent.
   const size_t count = 1000000;

   // The source buffer that the application writes numbers to and 
   // the agents read numbers from.
   unbounded_buffer<int> source;

   //
   // Use a control-flow agent to process a series of random numbers.
   //
   wcout << L"Control-flow agent:" << endl;

   // Create and start the agent.
   control_flow_agent cf_agent(source);
   cf_agent.start();
   
   // Send values to the agent.
   send_values(source, sentinel, count);
   
   // Wait for the agent to finish.
   agent::wait(&cf_agent);
   
   // Print the count of negative and positive numbers.
   wcout << L"There are " << cf_agent.negatives() 
         << L" negative numbers."<< endl;
   wcout << L"There are " << cf_agent.positives() 
         << L" positive numbers."<< endl;  

   //
   // Perform the same task, but this time with a dataflow agent.
   //
   wcout << L"Dataflow agent:" << endl;

   // Create and start the agent.
   dataflow_agent df_agent(source);
   df_agent.start();
   
   // Send values to the agent.
   send_values(source, sentinel, count);
   
   // Wait for the agent to finish.
   agent::wait(&df_agent);
   
   // Print the count of negative and positive numbers.
   wcout << L"There are " << df_agent.negatives() 
         << L" negative numbers."<< endl;
   wcout << L"There are " << df_agent.positives() 
         << L" positive numbers."<< endl;
}

Tento příklad vytvoří následující ukázkový výstup:

Control-flow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.
Dataflow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.

Probíhá kompilace kódu

Zkopírujte ukázkový kód a vložte ho do projektu sady Visual Studio nebo ho vložte do pojmenovaného dataflow-agent.cpp souboru a potom v okně příkazového řádku sady Visual Studio spusťte následující příkaz.

cl.exe /EHsc dataflow-agent.cpp

[Nahoře]

Vytvoření agenta protokolování zpráv

Následující příklad ukazuje log_agent třídu, která se podobá dataflow_agent třídě. Třída log_agent implementuje asynchronního agenta protokolování, který zapisuje zprávy protokolu do souboru a do konzoly. Třída log_agent umožňuje aplikaci kategorizovat zprávy jako informační, upozornění nebo chyba. Umožňuje také aplikaci určit, jestli se každá kategorie protokolu zapisuje do souboru, konzoly nebo obojího. Tento příklad zapíše všechny zprávy protokolu do souboru a pouze chybové zprávy do konzoly.

// log-filter.cpp
// compile with: /EHsc 
#include <windows.h>
#include <agents.h>
#include <sstream>
#include <fstream>
#include <iostream>

using namespace concurrency;
using namespace std;

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
    countdown_event(unsigned int count = 0L)
        : _current(static_cast<long>(count)) 
    {
        // Set the event if the initial count is zero.
        if (_current == 0L)
        {
            _event.set();
        }
    }

    // Decrements the event counter.
    void signal()
    {
        if(InterlockedDecrement(&_current) == 0L)
        {
            _event.set();
        }
    }

    // Increments the event counter.
    void add_count()
    {
        if(InterlockedIncrement(&_current) == 1L)
        {
            _event.reset();
        }
    }

    // Blocks the current context until the event is set.
    void wait()
    {
        _event.wait();
    }

private:
    // The current count.
    volatile long _current;
    // The event that is set when the counter reaches zero.
    event _event;

    // Disable copy constructor.
    countdown_event(const countdown_event&);
    // Disable assignment.
    countdown_event const & operator=(countdown_event const&);
};

// Defines message types for the logger.
enum log_message_type
{
    log_info    = 0x1,
    log_warning = 0x2,
    log_error   = 0x4,
};

// An asynchronous logging agent that writes log messages to 
// file and to the console.
class log_agent : public agent
{
    // Holds a message string and its logging type.
    struct log_message
    {
        wstring message;
        log_message_type type;
    };

public:
    log_agent(const wstring& file_path, log_message_type file_messages, log_message_type console_messages)
        : _file(file_path)
        , _file_messages(file_messages)
        , _console_messages(console_messages)
        , _active(0)
    {
        if (_file.bad())
        {
            throw invalid_argument("Unable to open log file.");
        }
    }

    // Writes the provided message to the log.
    void log(const wstring& message, log_message_type type)
    {  
        // Increment the active message count.
        _active.add_count();

        // Send the message to the network.
        log_message msg = { message, type };
        send(_log_buffer, msg);
    }

    void close()
    {
        // Signal that the agent is now closed.
        _closed.set();
    }

protected:

    void run()
    {
        //
        // Create the dataflow network.
        //

        // Writes a log message to file.
        call<log_message> writer([this](log_message msg)
        {
            if ((msg.type & _file_messages) != 0)
            {
                // Write the message to the file.
                write_to_stream(msg, _file);
            }
            if ((msg.type & _console_messages) != 0)
            {
                // Write the message to the console.
                write_to_stream(msg, wcout);
            }
            // Decrement the active counter.
            _active.signal();
        });

        // Connect _log_buffer to the internal network to begin data flow.
        _log_buffer.link_target(&writer);

        // Wait for the closed event to be signaled.
        _closed.wait();

        // Wait for all messages to be processed.
        _active.wait();

        // Close the log file and flush the console.
        _file.close();
        wcout.flush();

        // Set the agent to the completed state.
        done();
    }

private:
    // Writes a logging message to the specified output stream.
    void write_to_stream(const log_message& msg, wostream& stream)
    {
        // Write the message to the stream.
        wstringstream ss;

        switch (msg.type)
        {
        case log_info:
            ss << L"info: ";
            break;
        case log_warning:
            ss << L"warning: ";
            break;
        case log_error:
            ss << L"error: ";
        }

        ss << msg.message << endl;
        stream << ss.str();
    }

private:   
    // The file stream to write messages to.
    wofstream _file;

    // The log message types that are written to file.
    log_message_type _file_messages;

    // The log message types that are written to the console.
    log_message_type _console_messages;

    // The head of the network. Propagates logging messages
    // to the rest of the network.
    unbounded_buffer<log_message> _log_buffer;

    // Counts the number of active messages in the network.
    countdown_event _active;

    // Signals that the agent has been closed.
    event _closed;
};

int wmain()
{
    // Union of all log message types.
    log_message_type log_all = log_message_type(log_info | log_warning  | log_error);

    // Create a logging agent that writes all log messages to file and error 
    // messages to the console.
    log_agent logger(L"log.txt", log_all, log_error);

    // Start the agent.
    logger.start();

    // Log a few messages.

    logger.log(L"===Logging started.===", log_info);

    logger.log(L"This is a sample warning message.", log_warning);
    logger.log(L"This is a sample error message.", log_error);

    logger.log(L"===Logging finished.===", log_info);

    // Close the logger and wait for the agent to finish.
    logger.close();
    agent::wait(&logger);
}

Tento příklad zapíše následující výstup do konzoly.

error: This is a sample error message.

Tento příklad také vytvoří log.txt soubor, který obsahuje následující text.

info: ===Logging started.===
warning: This is a sample warning message.
error: This is a sample error message.
info: ===Logging finished.===

Probíhá kompilace kódu

Zkopírujte ukázkový kód a vložte ho do projektu sady Visual Studio nebo ho vložte do pojmenovaného log-filter.cpp souboru a potom v okně příkazového řádku sady Visual Studio spusťte následující příkaz.

cl.exe /EHsc log-filter.cpp

[Nahoře]

Viz také

Návody pro Concurrency Runtime