Panduan: Membuat Agen Aliran Data

Dokumen ini menunjukkan cara membuat aplikasi berbasis agen yang didasarkan pada aliran data, alih-alih alur kontrol.

Alur kontrol mengacu pada urutan eksekusi operasi dalam suatu program. Alur kontrol diatur dengan menggunakan struktur kontrol seperti pernyataan kondisional, perulangan, dan sebagainya. Atau, aliran data mengacu pada model pemrograman di mana komputasi dibuat hanya ketika semua data yang diperlukan tersedia. Model pemrograman aliran data terkait dengan konsep pengiriman pesan, di mana komponen independen program berkomunikasi satu sama lain dengan mengirim pesan.

Agen asinkron mendukung model pemrograman aliran kontrol dan aliran data. Meskipun model aliran kontrol sesuai dalam banyak kasus, model aliran data sesuai di orang lain, misalnya, ketika agen menerima data dan melakukan tindakan yang didasarkan pada payload data tersebut.

Prasyarat

Baca dokumen berikut sebelum Anda memulai panduan ini:

Bagian

Panduan ini berisi bagian berikut:

Membuat Agen Aliran Kontrol Dasar

Pertimbangkan contoh berikut yang menentukan control_flow_agent kelas . Kelas control_flow_agent bertindak pada tiga buffer pesan: satu buffer input dan dua buffer output. Metode ini run membaca dari buffer pesan sumber dalam perulangan dan menggunakan pernyataan kondisi untuk mengarahkan alur eksekusi program. Agen menaikkan satu penghitung untuk nilai non-nol, negatif, dan kenaikan penghitung lain untuk nilai positif non-nol. Setelah agen menerima nilai sentinel nol, agen mengirim nilai penghitung ke buffer pesan output. Metode negatives dan positives memungkinkan aplikasi membaca hitungan nilai negatif dan positif dari agen.

// A basic agent that uses control-flow to regulate the order of program 
// execution. This agent reads numbers from a message buffer and counts the 
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
   explicit control_flow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Read from the source buffer until we receive
      // the sentinel value of 0.
      int value = 0;      
      while ((value = receive(_source)) != 0)
      {
         // Send negative values to the first target and
         // non-negative values to the second target.
         if (value < 0)
            ++negative_count;
         else
            ++positive_count;
      }

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }
private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

Meskipun contoh ini membuat penggunaan dasar alur kontrol dalam agen, contoh ini menunjukkan sifat serial pemrograman berbasis aliran kontrol. Setiap pesan harus diproses secara berurutan, meskipun beberapa pesan mungkin tersedia di buffer pesan input. Model aliran data memungkinkan kedua cabang pernyataan kondisional untuk mengevaluasi secara bersamaan. Model aliran data juga memungkinkan Anda membuat jaringan olahpesan yang lebih kompleks yang bertindak berdasarkan data saat tersedia.

[Atas]

Membuat Agen Aliran Data Dasar

Bagian ini memperlihatkan cara mengonversi control_flow_agent kelas untuk menggunakan model aliran data untuk melakukan tugas yang sama.

Agen aliran data bekerja dengan membuat jaringan buffer pesan, yang masing-masing melayani tujuan tertentu. Blok pesan tertentu menggunakan fungsi filter untuk menerima atau menolak pesan berdasarkan payload-nya. Fungsi filter memastikan bahwa blok pesan hanya menerima nilai tertentu.

Untuk mengonversi agen aliran kontrol ke agen aliran data

  1. Salin isi kelas ke control_flow_agent kelas lain, misalnya, dataflow_agent. Atau, Anda dapat mengganti nama control_flow_agent kelas.

  2. Hapus isi perulangan yang memanggil receive dari run metode .

void run()
{
   // Counts the number of negative and positive values that
   // the agent receives.
   size_t negative_count = 0;
   size_t positive_count = 0;

   // Write the counts to the message buffers.
   send(_negatives, negative_count);
   send(_positives, positive_count);

   // Set the agent to the completed state.
   done();
}
  1. run Dalam metode , setelah inisialisasi negative_count variabel dan positive_count, tambahkan countdown_event objek yang melacak jumlah operasi aktif.
// Tracks the count of active operations.
countdown_event active;
// An event that is set by the sentinel.
event received_sentinel;

Kelas countdown_event ditampilkan nanti dalam topik ini.

  1. Buat objek buffer pesan yang akan berpartisipasi dalam jaringan aliran data.
 //
 // Create the members of the dataflow network.
 //

 // Increments the active counter.
 transformer<int, int> increment_active(
    [&active](int value) -> int {
       active.add_count();
       return value;
    });

 // Increments the count of negative values.
 call<int> negatives(
    [&](int value) {
       ++negative_count;
       // Decrement the active counter.
       active.signal();
    },
    [](int value) -> bool {
       return value < 0;
    });

 // Increments the count of positive values.
 call<int> positives(
    [&](int value) {
       ++positive_count;
       // Decrement the active counter.
       active.signal();
    },
    [](int value) -> bool {
       return value > 0;
    });

 // Receives only the sentinel value of 0.
 call<int> sentinel(
    [&](int value) {            
       // Decrement the active counter.
       active.signal();
       // Set the sentinel event.
       received_sentinel.set();
    },
    [](int value) -> bool { 
       return value == 0; 
    });

 // Connects the _source message buffer to the rest of the network.
 unbounded_buffer<int> connector;
  1. Koneksi buffer pesan untuk membentuk jaringan.
//
// Connect the network.
//

// Connect the internal nodes of the network.
connector.link_target(&negatives);
connector.link_target(&positives);
connector.link_target(&sentinel);
increment_active.link_target(&connector);

// Connect the _source buffer to the internal network to 
// begin data flow.
_source.link_target(&increment_active);
  1. Tunggu hingga event objek dan countdown event diatur. Peristiwa ini menandakan bahwa agen telah menerima nilai sentinel dan bahwa semua operasi telah selesai.
// Wait for the sentinel event and for all operations to finish.
received_sentinel.wait();
active.wait();

Diagram berikut menunjukkan jaringan aliran data lengkap untuk dataflow_agent kelas :

The dataflow network.

Tabel berikut ini menjelaskan anggota jaringan.

Anggota Deskripsi
increment_active Objek konkurensi::transformer yang menaikkan penghitung peristiwa aktif dan meneruskan nilai input ke jaringan lainnya.
negatives, positives concurrency::call objects yang menaikkan jumlah angka dan penurunan penghitung peristiwa aktif. Objek masing-masing menggunakan filter untuk menerima angka negatif atau angka positif.
sentinel Objek konkurensi::panggilan yang hanya menerima nilai sentinel nol dan mengurangi penghitung peristiwa aktif.
connector Objek konkurensi::unbounded_buffer yang menyambungkan buffer pesan sumber ke jaringan internal.

run Karena metode ini dipanggil pada utas terpisah, utas lain dapat mengirim pesan ke jaringan sebelum jaringan tersambung sepenuhnya. Anggota _source data adalah unbounded_buffer objek yang menyangga semua input yang dikirim dari aplikasi ke agen. Untuk memastikan bahwa jaringan memproses semua pesan input, agen terlebih dahulu menautkan simpul internal jaringan lalu menautkan awal jaringan tersebut, connector, ke _source anggota data. Ini menjamin bahwa pesan tidak diproses karena jaringan sedang dibentuk.

Karena jaringan dalam contoh ini didasarkan pada aliran data, bukan pada aliran kontrol, jaringan harus berkomunikasi dengan agen bahwa jaringan telah selesai memproses setiap nilai input dan bahwa simpul sentinel telah menerima nilainya. Contoh ini menggunakan objek untuk memberi sinyal bahwa semua nilai input telah diproses countdown_event dan objek konkurensi::peristiwa untuk menunjukkan bahwa simpul sentinel telah menerima nilainya. Kelas countdown_event menggunakan objek untuk memberi event sinyal ketika nilai penghitung mencapai nol. Kepala jaringan aliran data menaikkan penghitung setiap kali menerima nilai. Setiap simpul terminal jaringan mengurangi penghitung setelah memproses nilai input. Setelah agen membentuk jaringan aliran data, ia menunggu node sentinel untuk mengatur event objek dan agar countdown_event objek memberi sinyal bahwa penghitungnya telah mencapai nol.

Contoh berikut menunjukkan control_flow_agentkelas , dataflow_agent, dan countdown_event . Fungsi ini wmain membuat control_flow_agent objek dan dataflow_agent dan menggunakan send_values fungsi untuk mengirim serangkaian nilai acak ke agen.

// dataflow-agent.cpp
// compile with: /EHsc 
#include <windows.h>
#include <agents.h>
#include <iostream>
#include <random>

using namespace concurrency;
using namespace std;

// A basic agent that uses control-flow to regulate the order of program 
// execution. This agent reads numbers from a message buffer and counts the 
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
   explicit control_flow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Read from the source buffer until we receive
      // the sentinel value of 0.
      int value = 0;      
      while ((value = receive(_source)) != 0)
      {
         // Send negative values to the first target and
         // non-negative values to the second target.
         if (value < 0)
            ++negative_count;
         else
            ++positive_count;
      }

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }
private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(unsigned int count = 0L)
      : _current(static_cast<long>(count)) 
   {
      // Set the event if the initial count is zero.
      if (_current == 0L)
         _event.set();
   }
     
   // Decrements the event counter.
   void signal() {
      if(InterlockedDecrement(&_current) == 0L) {
         _event.set();
      }
   }

   // Increments the event counter.
   void add_count() {
      if(InterlockedIncrement(&_current) == 1L) {
         _event.reset();
      }
   }
   
   // Blocks the current context until the event is set.
   void wait() {
      _event.wait();
   }
 
private:
   // The current count.
   volatile long _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

// A basic agent that resembles control_flow_agent, but uses uses dataflow to 
// perform computations when data becomes available.
class dataflow_agent : public agent
{
public:
   dataflow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Tracks the count of active operations.
      countdown_event active;
      // An event that is set by the sentinel.
      event received_sentinel;
      
      //
      // Create the members of the dataflow network.
      //
     
      // Increments the active counter.
      transformer<int, int> increment_active(
         [&active](int value) -> int {
            active.add_count();
            return value;
         });

      // Increments the count of negative values.
      call<int> negatives(
         [&](int value) {
            ++negative_count;
            // Decrement the active counter.
            active.signal();
         },
         [](int value) -> bool {
            return value < 0;
         });

      // Increments the count of positive values.
      call<int> positives(
         [&](int value) {
            ++positive_count;
            // Decrement the active counter.
            active.signal();
         },
         [](int value) -> bool {
            return value > 0;
         });

      // Receives only the sentinel value of 0.
      call<int> sentinel(
         [&](int value) {            
            // Decrement the active counter.
            active.signal();
            // Set the sentinel event.
            received_sentinel.set();
         },
         [](int value) -> bool { 
            return value == 0; 
         });

      // Connects the _source message buffer to the rest of the network.
      unbounded_buffer<int> connector;
       
      //
      // Connect the network.
      //

      // Connect the internal nodes of the network.
      connector.link_target(&negatives);
      connector.link_target(&positives);
      connector.link_target(&sentinel);
      increment_active.link_target(&connector);

      // Connect the _source buffer to the internal network to 
      // begin data flow.
      _source.link_target(&increment_active);

      // Wait for the sentinel event and for all operations to finish.
      received_sentinel.wait();
      active.wait();
           
      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }

private:
   // Source message buffer to read from.
   ISource<int>& _source;
   
   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

// Sends a number of random values to the provided message buffer.
void send_values(ITarget<int>& source, int sentinel, size_t count)
{
   // Send a series of random numbers to the source buffer.
   mt19937 rnd(42);
   for (size_t i = 0; i < count; ++i)
   {
      // Generate a random number that is not equal to the sentinel value.
      int n;
      while ((n = rnd()) == sentinel);

      send(source, n);      
   }
   // Send the sentinel value.
   send(source, sentinel);   
}

int wmain()
{
   // Signals to the agent that there are no more values to process.
   const int sentinel = 0;
   // The number of samples to send to each agent.
   const size_t count = 1000000;

   // The source buffer that the application writes numbers to and 
   // the agents read numbers from.
   unbounded_buffer<int> source;

   //
   // Use a control-flow agent to process a series of random numbers.
   //
   wcout << L"Control-flow agent:" << endl;

   // Create and start the agent.
   control_flow_agent cf_agent(source);
   cf_agent.start();
   
   // Send values to the agent.
   send_values(source, sentinel, count);
   
   // Wait for the agent to finish.
   agent::wait(&cf_agent);
   
   // Print the count of negative and positive numbers.
   wcout << L"There are " << cf_agent.negatives() 
         << L" negative numbers."<< endl;
   wcout << L"There are " << cf_agent.positives() 
         << L" positive numbers."<< endl;  

   //
   // Perform the same task, but this time with a dataflow agent.
   //
   wcout << L"Dataflow agent:" << endl;

   // Create and start the agent.
   dataflow_agent df_agent(source);
   df_agent.start();
   
   // Send values to the agent.
   send_values(source, sentinel, count);
   
   // Wait for the agent to finish.
   agent::wait(&df_agent);
   
   // Print the count of negative and positive numbers.
   wcout << L"There are " << df_agent.negatives() 
         << L" negative numbers."<< endl;
   wcout << L"There are " << df_agent.positives() 
         << L" positive numbers."<< endl;
}

Contoh ini menghasilkan contoh output berikut:

Control-flow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.
Dataflow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.

Mengompilasi Kode

Salin kode contoh dan tempelkan dalam proyek Visual Studio, atau tempelkan dalam file yang diberi nama dataflow-agent.cpp lalu jalankan perintah berikut di jendela Prompt Perintah Visual Studio.

cl.exe /EHsc dataflow-agent.cpp

[Atas]

Membuat Agen Pengelogan Pesan

Contoh berikut menunjukkan log_agent kelas , yang menyerupai dataflow_agent kelas . Kelas mengimplementasikan log_agent agen pengelogan asinkron yang menulis pesan log ke file dan ke konsol. Kelas log_agent memungkinkan aplikasi untuk mengategorikan pesan sebagai informasi, peringatan, atau kesalahan. Ini juga memungkinkan aplikasi untuk menentukan apakah setiap kategori log ditulis ke file, konsol, atau keduanya. Contoh ini menulis semua pesan log ke file dan hanya pesan kesalahan ke konsol.

// log-filter.cpp
// compile with: /EHsc 
#include <windows.h>
#include <agents.h>
#include <sstream>
#include <fstream>
#include <iostream>

using namespace concurrency;
using namespace std;

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
    countdown_event(unsigned int count = 0L)
        : _current(static_cast<long>(count)) 
    {
        // Set the event if the initial count is zero.
        if (_current == 0L)
        {
            _event.set();
        }
    }

    // Decrements the event counter.
    void signal()
    {
        if(InterlockedDecrement(&_current) == 0L)
        {
            _event.set();
        }
    }

    // Increments the event counter.
    void add_count()
    {
        if(InterlockedIncrement(&_current) == 1L)
        {
            _event.reset();
        }
    }

    // Blocks the current context until the event is set.
    void wait()
    {
        _event.wait();
    }

private:
    // The current count.
    volatile long _current;
    // The event that is set when the counter reaches zero.
    event _event;

    // Disable copy constructor.
    countdown_event(const countdown_event&);
    // Disable assignment.
    countdown_event const & operator=(countdown_event const&);
};

// Defines message types for the logger.
enum log_message_type
{
    log_info    = 0x1,
    log_warning = 0x2,
    log_error   = 0x4,
};

// An asynchronous logging agent that writes log messages to 
// file and to the console.
class log_agent : public agent
{
    // Holds a message string and its logging type.
    struct log_message
    {
        wstring message;
        log_message_type type;
    };

public:
    log_agent(const wstring& file_path, log_message_type file_messages, log_message_type console_messages)
        : _file(file_path)
        , _file_messages(file_messages)
        , _console_messages(console_messages)
        , _active(0)
    {
        if (_file.bad())
        {
            throw invalid_argument("Unable to open log file.");
        }
    }

    // Writes the provided message to the log.
    void log(const wstring& message, log_message_type type)
    {  
        // Increment the active message count.
        _active.add_count();

        // Send the message to the network.
        log_message msg = { message, type };
        send(_log_buffer, msg);
    }

    void close()
    {
        // Signal that the agent is now closed.
        _closed.set();
    }

protected:

    void run()
    {
        //
        // Create the dataflow network.
        //

        // Writes a log message to file.
        call<log_message> writer([this](log_message msg)
        {
            if ((msg.type & _file_messages) != 0)
            {
                // Write the message to the file.
                write_to_stream(msg, _file);
            }
            if ((msg.type & _console_messages) != 0)
            {
                // Write the message to the console.
                write_to_stream(msg, wcout);
            }
            // Decrement the active counter.
            _active.signal();
        });

        // Connect _log_buffer to the internal network to begin data flow.
        _log_buffer.link_target(&writer);

        // Wait for the closed event to be signaled.
        _closed.wait();

        // Wait for all messages to be processed.
        _active.wait();

        // Close the log file and flush the console.
        _file.close();
        wcout.flush();

        // Set the agent to the completed state.
        done();
    }

private:
    // Writes a logging message to the specified output stream.
    void write_to_stream(const log_message& msg, wostream& stream)
    {
        // Write the message to the stream.
        wstringstream ss;

        switch (msg.type)
        {
        case log_info:
            ss << L"info: ";
            break;
        case log_warning:
            ss << L"warning: ";
            break;
        case log_error:
            ss << L"error: ";
        }

        ss << msg.message << endl;
        stream << ss.str();
    }

private:   
    // The file stream to write messages to.
    wofstream _file;

    // The log message types that are written to file.
    log_message_type _file_messages;

    // The log message types that are written to the console.
    log_message_type _console_messages;

    // The head of the network. Propagates logging messages
    // to the rest of the network.
    unbounded_buffer<log_message> _log_buffer;

    // Counts the number of active messages in the network.
    countdown_event _active;

    // Signals that the agent has been closed.
    event _closed;
};

int wmain()
{
    // Union of all log message types.
    log_message_type log_all = log_message_type(log_info | log_warning  | log_error);

    // Create a logging agent that writes all log messages to file and error 
    // messages to the console.
    log_agent logger(L"log.txt", log_all, log_error);

    // Start the agent.
    logger.start();

    // Log a few messages.

    logger.log(L"===Logging started.===", log_info);

    logger.log(L"This is a sample warning message.", log_warning);
    logger.log(L"This is a sample error message.", log_error);

    logger.log(L"===Logging finished.===", log_info);

    // Close the logger and wait for the agent to finish.
    logger.close();
    agent::wait(&logger);
}

Contoh ini menulis output berikut ke konsol.

error: This is a sample error message.

Contoh ini juga menghasilkan file log.txt, yang berisi teks berikut.

info: ===Logging started.===
warning: This is a sample warning message.
error: This is a sample error message.
info: ===Logging finished.===

Mengompilasi Kode

Salin kode contoh dan tempelkan dalam proyek Visual Studio, atau tempelkan dalam file yang diberi nama log-filter.cpp lalu jalankan perintah berikut di jendela Prompt Perintah Visual Studio.

cl.exe /EHsc log-filter.cpp

[Atas]

Baca juga

Panduan Runtime Konkurensi