Udostępnij za pośrednictwem


Wskazówki: tworzenie agenta przepływu danych

W tym dokumencie przedstawiono sposób tworzenia aplikacji opartych na agentach opartych na przepływie danych zamiast przepływu sterowania.

Przepływ sterowania odnosi się do kolejności wykonywania operacji w programie. Przepływ sterowania jest regulowany za pomocą struktur sterujących, takich jak instrukcje warunkowe, pętle itd. Alternatywnie przepływ danych odnosi się do modelu programowania, w którym obliczenia są wykonywane tylko wtedy, gdy wszystkie wymagane dane są dostępne. Model programowania przepływu danych jest związany z koncepcją przekazywania komunikatów, w której niezależne składniki programu komunikują się ze sobą przez wysyłanie komunikatów.

Agenci asynchroniczna obsługują zarówno modele programowania przepływu sterowania, jak i przepływu danych. Chociaż model przepływu sterowania jest odpowiedni w wielu przypadkach, model przepływu danych jest odpowiedni w innych, na przykład gdy agent odbiera dane i wykonuje akcję opartą na ładunku tych danych.

Wymagania wstępne

Przed rozpoczęciem tego przewodnika zapoznaj się z następującymi dokumentami:

Sekcje

Ten przewodnik zawiera następujące sekcje:

Tworzenie podstawowego agenta przepływu sterowania

Rozważmy następujący przykład, który definiuje klasę control_flow_agent . Klasa control_flow_agent działa na trzech buforach komunikatów: jeden bufor wejściowy i dwa bufory wyjściowe. Metoda run odczytuje z buforu komunikatów źródłowych w pętli i używa instrukcji warunkowej do kierowania przepływu wykonywania programu. Agent zwiększa jeden licznik dla wartości niezerowych, ujemnych i zwiększa inny licznik dla wartości niezerowych, dodatnich. Gdy agent otrzyma wartość sentinel o wartości zero, wysyła wartości liczników do buforów komunikatów wyjściowych. Metody negatives i positives umożliwiają aplikacji odczytywanie liczb ujemnych i dodatnich z agenta.

// A basic agent that uses control-flow to regulate the order of program 
// execution. This agent reads numbers from a message buffer and counts the 
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
   explicit control_flow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Read from the source buffer until we receive
      // the sentinel value of 0.
      int value = 0;      
      while ((value = receive(_source)) != 0)
      {
         // Send negative values to the first target and
         // non-negative values to the second target.
         if (value < 0)
            ++negative_count;
         else
            ++positive_count;
      }

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }
private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

Mimo że w tym przykładzie podstawowy sposób korzystania z przepływu sterowania w agencie pokazuje szeregowy charakter programowania opartego na przepływie sterowania. Każdy komunikat musi być przetwarzany sekwencyjnie, mimo że wiele komunikatów może być dostępnych w buforze komunikatów wejściowych. Model przepływu danych umożliwia jednoczesne ocenianie obu gałęzi instrukcji warunkowej. Model przepływu danych umożliwia również tworzenie bardziej złożonych sieci obsługi komunikatów, które działają na danych, gdy staną się dostępne.

[Top]

Tworzenie podstawowego agenta przepływu danych

W tej sekcji pokazano, jak przekonwertować klasę control_flow_agent na użycie modelu przepływu danych w celu wykonania tego samego zadania.

Agent przepływu danych działa przez utworzenie sieci buforów komunikatów, z których każdy służy do określonego celu. Niektóre bloki komunikatów używają funkcji filtru do akceptowania lub odrzucania komunikatu na podstawie ładunku. Funkcja filtru zapewnia, że blok komunikatów odbiera tylko określone wartości.

Aby przekonwertować agenta sterowania przepływem do agenta przepływu danych

  1. Skopiuj treść control_flow_agent klasy do innej klasy, na przykład dataflow_agent. Alternatywnie możesz zmienić nazwę control_flow_agent klasy.

  2. Usuń treść pętli, która wywołuje receive metodę run .

void run()
{
   // Counts the number of negative and positive values that
   // the agent receives.
   size_t negative_count = 0;
   size_t positive_count = 0;

   // Write the counts to the message buffers.
   send(_negatives, negative_count);
   send(_positives, positive_count);

   // Set the agent to the completed state.
   done();
}
  1. W metodzie run po zainicjowaniu zmiennych negative_count i positive_countdodaj countdown_event obiekt, który śledzi liczbę aktywnych operacji.
// Tracks the count of active operations.
countdown_event active;
// An event that is set by the sentinel.
event received_sentinel;

Klasa zostanie wyświetlona countdown_event w dalszej części tego tematu.

  1. Utwórz obiekty buforu komunikatów, które będą uczestniczyć w sieci przepływu danych.
 //
 // Create the members of the dataflow network.
 //

 // Increments the active counter.
 transformer<int, int> increment_active(
    [&active](int value) -> int {
       active.add_count();
       return value;
    });

 // Increments the count of negative values.
 call<int> negatives(
    [&](int value) {
       ++negative_count;
       // Decrement the active counter.
       active.signal();
    },
    [](int value) -> bool {
       return value < 0;
    });

 // Increments the count of positive values.
 call<int> positives(
    [&](int value) {
       ++positive_count;
       // Decrement the active counter.
       active.signal();
    },
    [](int value) -> bool {
       return value > 0;
    });

 // Receives only the sentinel value of 0.
 call<int> sentinel(
    [&](int value) {            
       // Decrement the active counter.
       active.signal();
       // Set the sentinel event.
       received_sentinel.set();
    },
    [](int value) -> bool { 
       return value == 0; 
    });

 // Connects the _source message buffer to the rest of the network.
 unbounded_buffer<int> connector;
  1. Połączenie bufory komunikatów w celu utworzenia sieci.
//
// Connect the network.
//

// Connect the internal nodes of the network.
connector.link_target(&negatives);
connector.link_target(&positives);
connector.link_target(&sentinel);
increment_active.link_target(&connector);

// Connect the _source buffer to the internal network to 
// begin data flow.
_source.link_target(&increment_active);
  1. Poczekaj event na ustawienie obiektów i .countdown event Te zdarzenia sygnalizowały, że agent otrzymał wartość sentinel i że wszystkie operacje zostały zakończone.
// Wait for the sentinel event and for all operations to finish.
received_sentinel.wait();
active.wait();

Na poniższym diagramie przedstawiono pełną sieć przepływu danych dla dataflow_agent klasy:

The dataflow network.

W poniższej tabeli opisano elementy członkowskie sieci.

Element członkowski opis
increment_active Obiekt concurrency::transformer , który zwiększa aktywny licznik zdarzeń i przekazuje wartość wejściową do pozostałej części sieci.
negatives, positives concurrency::wywołaj obiekty, które zwiększają liczbę liczb i dekrementują aktywny licznik zdarzeń. Każdy obiekt używa filtru do akceptowania liczb ujemnych lub liczb dodatnich.
sentinel Obiekt współbieżności::call , który akceptuje tylko wartość sentinel o wartości zero i dekrementuje aktywny licznik zdarzeń.
connector Obiekt współbieżności::unbounded_buffer , który łączy bufor komunikatu źródłowego z siecią wewnętrzną.

run Ponieważ metoda jest wywoływana w osobnym wątku, inne wątki mogą wysyłać komunikaty do sieci, zanim sieć zostanie w pełni połączona. Element _source członkowski danych jest obiektem unbounded_buffer , który buforuje wszystkie dane wejściowe wysyłane z aplikacji do agenta. Aby upewnić się, że sieć przetwarza wszystkie komunikaty wejściowe, agent najpierw łączy wewnętrzne węzły sieci, a następnie łączy początek tej sieci, connector, z elementem _source członkowskim danych. Gwarantuje to, że komunikaty nie są przetwarzane w miarę tworzenia sieci.

Ponieważ sieć w tym przykładzie jest oparta na przepływie danych, a nie na przepływie sterowania, sieć musi komunikować się z agentem, że zakończył przetwarzanie każdej wartości wejściowej i że węzeł sentinel otrzymał jego wartość. W tym przykładzie countdown_event użyto obiektu, aby zasygnalizować, że wszystkie wartości wejściowe zostały przetworzone, oraz obiekt concurrency::event wskazujący, że węzeł sentinel otrzymał jego wartość. Klasa countdown_event używa event obiektu do sygnalizatora, gdy wartość licznika osiągnie zero. Szef sieci przepływu danych zwiększa licznik za każdym razem, gdy otrzymuje wartość. Każdy węzeł terminalu sieci dekrementuje licznik po przetwarzaniu wartości wejściowej. Po utworzeniu przez agenta sieci przepływu danych oczekuje, aż węzeł sentinel ustawi event obiekt i zasygnalizuje countdown_event , że jego licznik osiągnął zero.

W poniższym przykładzie przedstawiono control_flow_agentklasy , dataflow_agenti countdown_event . Funkcja wmain tworzy control_flow_agent obiekt i dataflow_agent używa send_values funkcji do wysyłania serii losowych wartości do agentów.

// dataflow-agent.cpp
// compile with: /EHsc 
#include <windows.h>
#include <agents.h>
#include <iostream>
#include <random>

using namespace concurrency;
using namespace std;

// A basic agent that uses control-flow to regulate the order of program 
// execution. This agent reads numbers from a message buffer and counts the 
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
   explicit control_flow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Read from the source buffer until we receive
      // the sentinel value of 0.
      int value = 0;      
      while ((value = receive(_source)) != 0)
      {
         // Send negative values to the first target and
         // non-negative values to the second target.
         if (value < 0)
            ++negative_count;
         else
            ++positive_count;
      }

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }
private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(unsigned int count = 0L)
      : _current(static_cast<long>(count)) 
   {
      // Set the event if the initial count is zero.
      if (_current == 0L)
         _event.set();
   }
     
   // Decrements the event counter.
   void signal() {
      if(InterlockedDecrement(&_current) == 0L) {
         _event.set();
      }
   }

   // Increments the event counter.
   void add_count() {
      if(InterlockedIncrement(&_current) == 1L) {
         _event.reset();
      }
   }
   
   // Blocks the current context until the event is set.
   void wait() {
      _event.wait();
   }
 
private:
   // The current count.
   volatile long _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

// A basic agent that resembles control_flow_agent, but uses uses dataflow to 
// perform computations when data becomes available.
class dataflow_agent : public agent
{
public:
   dataflow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Tracks the count of active operations.
      countdown_event active;
      // An event that is set by the sentinel.
      event received_sentinel;
      
      //
      // Create the members of the dataflow network.
      //
     
      // Increments the active counter.
      transformer<int, int> increment_active(
         [&active](int value) -> int {
            active.add_count();
            return value;
         });

      // Increments the count of negative values.
      call<int> negatives(
         [&](int value) {
            ++negative_count;
            // Decrement the active counter.
            active.signal();
         },
         [](int value) -> bool {
            return value < 0;
         });

      // Increments the count of positive values.
      call<int> positives(
         [&](int value) {
            ++positive_count;
            // Decrement the active counter.
            active.signal();
         },
         [](int value) -> bool {
            return value > 0;
         });

      // Receives only the sentinel value of 0.
      call<int> sentinel(
         [&](int value) {            
            // Decrement the active counter.
            active.signal();
            // Set the sentinel event.
            received_sentinel.set();
         },
         [](int value) -> bool { 
            return value == 0; 
         });

      // Connects the _source message buffer to the rest of the network.
      unbounded_buffer<int> connector;
       
      //
      // Connect the network.
      //

      // Connect the internal nodes of the network.
      connector.link_target(&negatives);
      connector.link_target(&positives);
      connector.link_target(&sentinel);
      increment_active.link_target(&connector);

      // Connect the _source buffer to the internal network to 
      // begin data flow.
      _source.link_target(&increment_active);

      // Wait for the sentinel event and for all operations to finish.
      received_sentinel.wait();
      active.wait();
           
      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }

private:
   // Source message buffer to read from.
   ISource<int>& _source;
   
   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

// Sends a number of random values to the provided message buffer.
void send_values(ITarget<int>& source, int sentinel, size_t count)
{
   // Send a series of random numbers to the source buffer.
   mt19937 rnd(42);
   for (size_t i = 0; i < count; ++i)
   {
      // Generate a random number that is not equal to the sentinel value.
      int n;
      while ((n = rnd()) == sentinel);

      send(source, n);      
   }
   // Send the sentinel value.
   send(source, sentinel);   
}

int wmain()
{
   // Signals to the agent that there are no more values to process.
   const int sentinel = 0;
   // The number of samples to send to each agent.
   const size_t count = 1000000;

   // The source buffer that the application writes numbers to and 
   // the agents read numbers from.
   unbounded_buffer<int> source;

   //
   // Use a control-flow agent to process a series of random numbers.
   //
   wcout << L"Control-flow agent:" << endl;

   // Create and start the agent.
   control_flow_agent cf_agent(source);
   cf_agent.start();
   
   // Send values to the agent.
   send_values(source, sentinel, count);
   
   // Wait for the agent to finish.
   agent::wait(&cf_agent);
   
   // Print the count of negative and positive numbers.
   wcout << L"There are " << cf_agent.negatives() 
         << L" negative numbers."<< endl;
   wcout << L"There are " << cf_agent.positives() 
         << L" positive numbers."<< endl;  

   //
   // Perform the same task, but this time with a dataflow agent.
   //
   wcout << L"Dataflow agent:" << endl;

   // Create and start the agent.
   dataflow_agent df_agent(source);
   df_agent.start();
   
   // Send values to the agent.
   send_values(source, sentinel, count);
   
   // Wait for the agent to finish.
   agent::wait(&df_agent);
   
   // Print the count of negative and positive numbers.
   wcout << L"There are " << df_agent.negatives() 
         << L" negative numbers."<< endl;
   wcout << L"There are " << df_agent.positives() 
         << L" positive numbers."<< endl;
}

W tym przykładzie są generowane następujące przykładowe dane wyjściowe:

Control-flow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.
Dataflow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.

Kompilowanie kodu

Skopiuj przykładowy kod i wklej go w projekcie programu Visual Studio lub wklej go w pliku o nazwie dataflow-agent.cpp , a następnie uruchom następujące polecenie w oknie wiersza polecenia programu Visual Studio.

cl.exe /EHsc dataflow-agent.cpp

[Top]

Tworzenie agenta rejestrowania komunikatów

W poniższym przykładzie przedstawiono klasę log_agent podobną do dataflow_agent klasy. Klasa log_agent implementuje asynchronicznego agenta rejestrowania, który zapisuje komunikaty dziennika w pliku i w konsoli programu . Klasa log_agent umożliwia aplikacji kategoryzowanie komunikatów jako informacji, ostrzeżeń lub błędów. Umożliwia również aplikacji określenie, czy każda kategoria dziennika jest zapisywana w pliku, konsoli, czy w obu tych przypadkach. W tym przykładzie wszystkie komunikaty dziennika są zapisywane w pliku i tylko komunikaty o błędach w konsoli programu .

// log-filter.cpp
// compile with: /EHsc 
#include <windows.h>
#include <agents.h>
#include <sstream>
#include <fstream>
#include <iostream>

using namespace concurrency;
using namespace std;

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
    countdown_event(unsigned int count = 0L)
        : _current(static_cast<long>(count)) 
    {
        // Set the event if the initial count is zero.
        if (_current == 0L)
        {
            _event.set();
        }
    }

    // Decrements the event counter.
    void signal()
    {
        if(InterlockedDecrement(&_current) == 0L)
        {
            _event.set();
        }
    }

    // Increments the event counter.
    void add_count()
    {
        if(InterlockedIncrement(&_current) == 1L)
        {
            _event.reset();
        }
    }

    // Blocks the current context until the event is set.
    void wait()
    {
        _event.wait();
    }

private:
    // The current count.
    volatile long _current;
    // The event that is set when the counter reaches zero.
    event _event;

    // Disable copy constructor.
    countdown_event(const countdown_event&);
    // Disable assignment.
    countdown_event const & operator=(countdown_event const&);
};

// Defines message types for the logger.
enum log_message_type
{
    log_info    = 0x1,
    log_warning = 0x2,
    log_error   = 0x4,
};

// An asynchronous logging agent that writes log messages to 
// file and to the console.
class log_agent : public agent
{
    // Holds a message string and its logging type.
    struct log_message
    {
        wstring message;
        log_message_type type;
    };

public:
    log_agent(const wstring& file_path, log_message_type file_messages, log_message_type console_messages)
        : _file(file_path)
        , _file_messages(file_messages)
        , _console_messages(console_messages)
        , _active(0)
    {
        if (_file.bad())
        {
            throw invalid_argument("Unable to open log file.");
        }
    }

    // Writes the provided message to the log.
    void log(const wstring& message, log_message_type type)
    {  
        // Increment the active message count.
        _active.add_count();

        // Send the message to the network.
        log_message msg = { message, type };
        send(_log_buffer, msg);
    }

    void close()
    {
        // Signal that the agent is now closed.
        _closed.set();
    }

protected:

    void run()
    {
        //
        // Create the dataflow network.
        //

        // Writes a log message to file.
        call<log_message> writer([this](log_message msg)
        {
            if ((msg.type & _file_messages) != 0)
            {
                // Write the message to the file.
                write_to_stream(msg, _file);
            }
            if ((msg.type & _console_messages) != 0)
            {
                // Write the message to the console.
                write_to_stream(msg, wcout);
            }
            // Decrement the active counter.
            _active.signal();
        });

        // Connect _log_buffer to the internal network to begin data flow.
        _log_buffer.link_target(&writer);

        // Wait for the closed event to be signaled.
        _closed.wait();

        // Wait for all messages to be processed.
        _active.wait();

        // Close the log file and flush the console.
        _file.close();
        wcout.flush();

        // Set the agent to the completed state.
        done();
    }

private:
    // Writes a logging message to the specified output stream.
    void write_to_stream(const log_message& msg, wostream& stream)
    {
        // Write the message to the stream.
        wstringstream ss;

        switch (msg.type)
        {
        case log_info:
            ss << L"info: ";
            break;
        case log_warning:
            ss << L"warning: ";
            break;
        case log_error:
            ss << L"error: ";
        }

        ss << msg.message << endl;
        stream << ss.str();
    }

private:   
    // The file stream to write messages to.
    wofstream _file;

    // The log message types that are written to file.
    log_message_type _file_messages;

    // The log message types that are written to the console.
    log_message_type _console_messages;

    // The head of the network. Propagates logging messages
    // to the rest of the network.
    unbounded_buffer<log_message> _log_buffer;

    // Counts the number of active messages in the network.
    countdown_event _active;

    // Signals that the agent has been closed.
    event _closed;
};

int wmain()
{
    // Union of all log message types.
    log_message_type log_all = log_message_type(log_info | log_warning  | log_error);

    // Create a logging agent that writes all log messages to file and error 
    // messages to the console.
    log_agent logger(L"log.txt", log_all, log_error);

    // Start the agent.
    logger.start();

    // Log a few messages.

    logger.log(L"===Logging started.===", log_info);

    logger.log(L"This is a sample warning message.", log_warning);
    logger.log(L"This is a sample error message.", log_error);

    logger.log(L"===Logging finished.===", log_info);

    // Close the logger and wait for the agent to finish.
    logger.close();
    agent::wait(&logger);
}

W tym przykładzie dane wyjściowe są zapisywane w konsoli programu .

error: This is a sample error message.

W tym przykładzie jest również generowany plik log.txt, który zawiera następujący tekst.

info: ===Logging started.===
warning: This is a sample warning message.
error: This is a sample error message.
info: ===Logging finished.===

Kompilowanie kodu

Skopiuj przykładowy kod i wklej go w projekcie programu Visual Studio lub wklej go w pliku o nazwie log-filter.cpp , a następnie uruchom następujące polecenie w oknie wiersza polecenia programu Visual Studio.

cl.exe /EHsc log-filter.cpp

[Top]

Zobacz też

Środowisko uruchomieniowe współbieżności — wskazówki