Aracılığıyla paylaş


İzlenecek yol: Dataflow Aracısı oluşturma

Bu belge, akış denetimi yerine dataflow esas aracı tabanlı uygulamalar oluşturmak gösterilmiştir.

Akış denetimi bir programda işlemleri yürütme sırasını gösterir.Koşul deyimleri, döngüler ve benzeri gibi denetim yapıları kullanarak denetimi akışı düzeltildi.Alternatif olarak, dataflow hesaplamaların yapıldığı yalnızca gerekli tüm verileri olduğunda kullanılabilir bir programlama modeline başvurur.Dataflow programlama modeli, ileti kavramı içinde bağımsız bir program bileşenlerinin iletileri göndererek birbirleriyle ilişkilidir.

Zaman uyumsuz aracıları akış denetimi ve dataflow programlama modellerini destekler.Akış denetimi modeli birçok durumda uygun olsa da, aracı veri alır ve veri yükü üzerinde temel alan bir eylem gerçekleştirir dataflow bazı durumlarda, örneğin, en uygun modeldir.

Önkoşullar

Bu izlenecek yolda, başlamadan önce aşağıdaki belgeleri okuyun:

Bölümler

Bu izlenecek yolda, aşağıdaki bölümleri içerir:

  • Temel akış denetimi aracı oluşturma

  • Temel Dataflow Aracısı oluşturma

  • İleti günlüğü Aracısı oluşturma

Temel akış denetimi aracı oluşturma

Tanımlar aþaðýdaki örneði ele alalým control_flow_agent sınıfı.control_flow_agent Üç ileti arabellek sınıf davranır: arabellek bir giriş ve iki çıkış arabelleği.run Yöntemi bir döngü kaynak ileti arabelleğe alınan okur ve program akışını yönlendirmek için bir koşul deyimi kullanır.Aracı sıfır, negatif değerler için bir sayacı artırır ve sıfır olmayan, pozitif değerler için başka bir sayacı artırır.Sentinel değerinin sıfır Aracısı aldıktan sonra sayaçların değerleri çıktı ileti arabelleği gönderir.negatives Ve positives Aracısı'ndan negatif ve pozitif değerlerin sayıları okumak uygulama yöntemleri etkinleştirin.

// A basic agent that uses control-flow to regulate the order of program 
// execution. This agent reads numbers from a message buffer and counts the 
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
   explicit control_flow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Read from the source buffer until we receive
      // the sentinel value of 0.
      int value = 0;      
      while ((value = receive(_source)) != 0)
      {
         // Send negative values to the first target and
         // non-negative values to the second target.
         if (value < 0)
            ++negative_count;
         else
            ++positive_count;
      }

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }
private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

Bu örnek aracı, temel akış denetimi kullanımını sağlar, ancak Denetim akış tabanlı programlama seri yapısını göstermektedir.Her iletinin birden çok ileti giriş iletisi arabellek kullanılabilir halde ardışık olarak işlenmelidir.Aynı anda değerlendirmek için koşullu ifadenin her iki dalları dataflow modeli sağlar.Dataflow model verileri kullanılabilir duruma gibi davranan ileti daha karmaşık ağlar oluşturmanıza olanak sağlar.

Top

Temel Dataflow Aracısı oluşturma

Bu bölüm nasıl dönüştürüleceğini gösterir control_flow_agent dataflow modeli aynı görevi gerçekleştirmek için kullanılacak sınıfı.

Dataflow aracı, her biri belirli bir amaca hizmet eder bir ağ iletisi arabelleklerinin oluşturarak çalışır.Belirli ileti blokları, kabul etmek veya reddetmek yük üzerinde bir ileti için bir filtre işlevini kullanın.Filter işlevi, ileti bloğu yalnızca belirli değerleri almasını sağlar.

Akış denetimi aracı dataflow öğesine dönüştürmek için

  1. Gövde metni kopyalamak control_flow_agent için başka bir sınıf, sınıf dataflow_agent.Alternatif olarak, adını control_flow_agent sınıfı.

  2. Çağıran Döngünün gövdesi kaldırma receive dan run yöntemi.

    void run()
    {
       // Counts the number of negative and positive values that
       // the agent receives.
       size_t negative_count = 0;
       size_t positive_count = 0;
    
    
       // Write the counts to the message buffers.
       send(_negatives, negative_count);
       send(_positives, positive_count);
    
       // Set the agent to the completed state.
       done();
    }
    
  3. De run sonra değişkenlerini başlatma yöntemi negative_count ve positive_count, ekleme bir countdown_event etkin işlem sayısı izler nesnesi.

    // Tracks the count of active operations.
    countdown_event active;
    // An event that is set by the sentinel.
    event received_sentinel;
    

    countdown_event Sınıfı, bu konudaki gösterilir.

  4. İleti dataflow ağa katılacak arabellek nesneleri oluşturun.

    //
    // Create the members of the dataflow network.
    //
    
    // Increments the active counter.
    transformer<int, int> increment_active(
       [&active](int value) -> int {
          active.add_count();
          return value;
       });
    
    // Increments the count of negative values.
    call<int> negatives(
       [&](int value) {
          ++negative_count;
          // Decrement the active counter.
          active.signal();
       },
       [](int value) -> bool {
          return value < 0;
       });
    
    // Increments the count of positive values.
    call<int> positives(
       [&](int value) {
          ++positive_count;
          // Decrement the active counter.
          active.signal();
       },
       [](int value) -> bool {
          return value > 0;
       });
    
    // Receives only the sentinel value of 0.
    call<int> sentinel(
       [&](int value) {            
          // Decrement the active counter.
          active.signal();
          // Set the sentinel event.
          received_sentinel.set();
       },
       [](int value) { 
          return value == 0; 
       });
    
    // Connects the _source message buffer to the rest of the network.
    unbounded_buffer<int> connector;
    
  5. Bir ağ oluşturmak için ileti arabelleği bağlayın.

    //
    // Connect the network.
    //
    
    // Connect the internal nodes of the network.
    connector.link_target(&negatives);
    connector.link_target(&positives);
    connector.link_target(&sentinel);
    increment_active.link_target(&connector);
    
    // Connect the _source buffer to the internal network to 
    // begin data flow.
    _source.link_target(&increment_active);
    
  6. Bekle event ve countdown event nesneleri ayarlanacak.Bu olaylar, Aracısı sentinel değerini aldı ve tüm işlemleri bitirdikten sinyal.

    // Wait for the sentinel event and for all operations to finish.
    received_sentinel.wait();
    active.wait();
    

Aşağıdaki diyagram için tam dataflow ağ gösterir dataflow_agent sınıfı:

Ağ veri akışı

Aşağıdaki tabloda ağ üyeleri açıklanmaktadır.

Üye

Description

increment_active

A concurrency::transformer nesne etkin olay sayacı artırır ve ağın geri kalanı için girdi değeri geçirir.

negatives, positives

CONCURRENCY::call sayı sayısını azaltır etkin olay sayacı artırılmasına nesneler.Her nesne negatif sayılar ya da pozitif sayılar kabul etmek için bir filtre kullanın.

sentinel

A concurrency::call etkin olay sayacı yalnızca sentinel değeri sıfır azaltır ve kabul eden bir nesne.

connector

A concurrency::unbounded_buffer kaynak ileti arabelleği iç ağa bağlayan bir nesne.

Çünkü run yöntemi olarak adlandırılan ayrı bir iş parçacığı, ağ tamamen bağlı önce diğer iş ağ iletileri gönderebilir._source Veri üyesi olan bir unbounded_buffer Aracısı uygulamasından gönderilen tüm giriş arabelleği nesnesi.Tüm iletileri ağ işlemleri, aracı ilk iç düğümler ağ bağlantıları ve sonra bu ağ başlatma bağlar emin olmak için connector, çok _source veri üyesi.Bu gibi ağ oluşturulmuş iletileri formlarýnýn değil garanti eder.

Bu örnekte ağ üzerinde dataflow dayandığından, yerine Denetim akış üzerinde Ağ aracı, her giriş değeri işlemeyi bitirdiğinden ve sentinel düğüm değerini aldı iletmelisiniz.Bu örnek bir countdown_event tüm giriş değerleri işlenen sinyal nesne ve bir concurrency::event sentinel düğüm değerini aldığını belirtmek için nesne.countdown_event Sınıfını kullanan bir event nesnesine sayaç değeri sıfır olduğunda sinyal.O BT değerini alır her head dataflow ağ sayacı artırır.Her terminal düğüm ağ azaltır, girdi değeri işler sayaç.Dataflow ağ Aracısı oluşturur sonra sentinel düğümünü ayarlamak bekler event nesnesi ve countdown_event nesne, sayaç sıfıra ulaştı sinyal.

Aşağıdaki örnekte gösterildiği control_flow_agent, dataflow_agent, ve countdown_event sınıfları.wmain İşlev oluşturur bir control_flow_agent ve dataflow_agent nesnesi ve kullanımları send_values aracıları rasgele değerler dizisini göndermek için işlevi.

// dataflow-agent.cpp
// compile with: /EHsc 
#include <windows.h>
#include <agents.h>
#include <iostream>
#include <random>

using namespace concurrency;
using namespace std;

// A basic agent that uses control-flow to regulate the order of program 
// execution. This agent reads numbers from a message buffer and counts the 
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
   explicit control_flow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Read from the source buffer until we receive
      // the sentinel value of 0.
      int value = 0;      
      while ((value = receive(_source)) != 0)
      {
         // Send negative values to the first target and
         // non-negative values to the second target.
         if (value < 0)
            ++negative_count;
         else
            ++positive_count;
      }

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }
private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(unsigned int count = 0L)
      : _current(static_cast<long>(count)) 
   {
      // Set the event if the initial count is zero.
      if (_current == 0L)
         _event.set();
   }

   // Decrements the event counter.
   void signal() {
      if(InterlockedDecrement(&_current) == 0L) {
         _event.set();
      }
   }

   // Increments the event counter.
   void add_count() {
      if(InterlockedIncrement(&_current) == 1L) {
         _event.reset();
      }
   }

   // Blocks the current context until the event is set.
   void wait() {
      _event.wait();
   }

private:
   // The current count.
   volatile long _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

// A basic agent that resembles control_flow_agent, but uses uses dataflow to 
// perform computations when data becomes available.
class dataflow_agent : public agent
{
public:
   dataflow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Tracks the count of active operations.
      countdown_event active;
      // An event that is set by the sentinel.
      event received_sentinel;

      //
      // Create the members of the dataflow network.
      //

      // Increments the active counter.
      transformer<int, int> increment_active(
         [&active](int value) -> int {
            active.add_count();
            return value;
         });

      // Increments the count of negative values.
      call<int> negatives(
         [&](int value) {
            ++negative_count;
            // Decrement the active counter.
            active.signal();
         },
         [](int value) -> bool {
            return value < 0;
         });

      // Increments the count of positive values.
      call<int> positives(
         [&](int value) {
            ++positive_count;
            // Decrement the active counter.
            active.signal();
         },
         [](int value) -> bool {
            return value > 0;
         });

      // Receives only the sentinel value of 0.
      call<int> sentinel(
         [&](int value) {            
            // Decrement the active counter.
            active.signal();
            // Set the sentinel event.
            received_sentinel.set();
         },
         [](int value) { 
            return value == 0; 
         });

      // Connects the _source message buffer to the rest of the network.
      unbounded_buffer<int> connector;

      //
      // Connect the network.
      //

      // Connect the internal nodes of the network.
      connector.link_target(&negatives);
      connector.link_target(&positives);
      connector.link_target(&sentinel);
      increment_active.link_target(&connector);

      // Connect the _source buffer to the internal network to 
      // begin data flow.
      _source.link_target(&increment_active);

      // Wait for the sentinel event and for all operations to finish.
      received_sentinel.wait();
      active.wait();

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }

private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

// Sends a number of random values to the provided message buffer.
void send_values(ITarget<int>& source, int sentinel, size_t count)
{
   // Send a series of random numbers to the source buffer.
   mt19937 rnd(42);
   for (size_t i = 0; i < count; ++i)
   {
      // Generate a random number that is not equal to the sentinel value.
      int n;
      while ((n = rnd()) == sentinel);

      send(source, n);      
   }
   // Send the sentinel value.
   send(source, sentinel);   
}

int wmain()
{
   // Signals to the agent that there are no more values to process.
   const int sentinel = 0;
   // The number of samples to send to each agent.
   const size_t count = 1000000;

   // The source buffer that the application writes numbers to and 
   // the agents read numbers from.
   unbounded_buffer<int> source;

   //
   // Use a control-flow agent to process a series of random numbers.
   //
   wcout << L"Control-flow agent:" << endl;

   // Create and start the agent.
   control_flow_agent cf_agent(source);
   cf_agent.start();

   // Send values to the agent.
   send_values(source, sentinel, count);

   // Wait for the agent to finish.
   agent::wait(&cf_agent);

   // Print the count of negative and positive numbers.
   wcout << L"There are " << cf_agent.negatives() 
         << L" negative numbers."<< endl;
   wcout << L"There are " << cf_agent.positives() 
         << L" positive numbers."<< endl;  

   //
   // Perform the same task, but this time with a dataflow agent.
   //
   wcout << L"Dataflow agent:" << endl;

   // Create and start the agent.
   dataflow_agent df_agent(source);
   df_agent.start();

   // Send values to the agent.
   send_values(source, sentinel, count);

   // Wait for the agent to finish.
   agent::wait(&df_agent);

   // Print the count of negative and positive numbers.
   wcout << L"There are " << df_agent.negatives() 
         << L" negative numbers."<< endl;
   wcout << L"There are " << df_agent.positives() 
         << L" positive numbers."<< endl;
}

Bu örnek, aşağıdaki örnek çıktı üretir:

Control-flow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.
Dataflow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.

Ff398051.collapse_all(tr-tr,VS.110).gifKod Derleniyor

Örnek kodu kopyalayın ve Visual Studio Project'te yapıştırın veya adlı bir dosyaya yapıştırın dataflow agent.cpp ve Visual Studio komut istemi penceresinde aşağıdaki komutu çalıştırın.

cl.exe /EHsc dataflow-agent.cpp

Top

İleti günlüğü Aracısı oluşturma

Aşağıdaki örnekte gösterildiği log_agent benzer sınıfı dataflow_agent sınıfı.log_agent Sınıfı bir zaman uyumsuz günlük Aracısı yazar bir dosyaya ve konsol iletileri günlüğe uygular.log_agent Olarak bilgilendirme iletisi, uyarı veya hata kategorilere ayırmak uygulama sınıfı sağlar.Ayrıca, her bir günlük kategorisi dosya, konsol veya her ikisini de yazılan belirtmek uygulama sağlar.Bu örnek, tüm bir dosyaya günlük iletileri ve hata iletileri konsola yazar.

// log-filter.cpp
// compile with: /EHsc 
#include <windows.h>
#include <agents.h>
#include <sstream>
#include <fstream>
#include <iostream>

using namespace concurrency;
using namespace std;

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
    countdown_event(unsigned int count = 0L)
        : _current(static_cast<long>(count)) 
    {
        // Set the event if the initial count is zero.
        if (_current == 0L)
        {
            _event.set();
        }
    }

    // Decrements the event counter.
    void signal()
    {
        if(InterlockedDecrement(&_current) == 0L)
        {
            _event.set();
        }
    }

    // Increments the event counter.
    void add_count()
    {
        if(InterlockedIncrement(&_current) == 1L)
        {
            _event.reset();
        }
    }

    // Blocks the current context until the event is set.
    void wait()
    {
        _event.wait();
    }

private:
    // The current count.
    volatile long _current;
    // The event that is set when the counter reaches zero.
    event _event;

    // Disable copy constructor.
    countdown_event(const countdown_event&);
    // Disable assignment.
    countdown_event const & operator=(countdown_event const&);
};

// Defines message types for the logger.
enum log_message_type
{
    log_info    = 0x1,
    log_warning = 0x2,
    log_error   = 0x4,
};

// An asynchronous logging agent that writes log messages to 
// file and to the console.
class log_agent : public agent
{
    // Holds a message string and its logging type.
    struct log_message
    {
        wstring message;
        log_message_type type;
    };

public:
    log_agent(const wstring& file_path, log_message_type file_messages, log_message_type console_messages)
        : _file(file_path)
        , _file_messages(file_messages)
        , _console_messages(console_messages)
        , _active(0)
    {
        if (_file.bad())
        {
            throw invalid_argument("Unable to open log file.");
        }
    }

    // Writes the provided message to the log.
    void log(const wstring& message, log_message_type type)
    {  
        // Increment the active message count.
        _active.add_count();

        // Send the message to the network.
        log_message msg = { message, type };
        send(_log_buffer, msg);
    }

    void close()
    {
        // Signal that the agent is now closed.
        _closed.set();
    }

protected:

    void run()
    {
        //
        // Create the dataflow network.
        //

        // Writes a log message to file.
        call<log_message> writer([this](log_message msg)
        {
            if ((msg.type & _file_messages) != 0)
            {
                // Write the message to the file.
                write_to_stream(msg, _file);
            }
            if ((msg.type & _console_messages) != 0)
            {
                // Write the message to the console.
                write_to_stream(msg, wcout);
            }
            // Decrement the active counter.
            _active.signal();
        });

        // Connect _log_buffer to the internal network to begin data flow.
        _log_buffer.link_target(&writer);

        // Wait for the closed event to be signaled.
        _closed.wait();

        // Wait for all messages to be processed.
        _active.wait();

        // Close the log file and flush the console.
        _file.close();
        wcout.flush();

        // Set the agent to the completed state.
        done();
    }

private:
    // Writes a logging message to the specified output stream.
    void write_to_stream(const log_message& msg, wostream& stream)
    {
        // Write the message to the stream.
        wstringstream ss;

        switch (msg.type)
        {
        case log_info:
            ss << L"info: ";
            break;
        case log_warning:
            ss << L"warning: ";
            break;
        case log_error:
            ss << L"error: ";
        }

        ss << msg.message << endl;
        stream << ss.str();
    }

private:   
    // The file stream to write messages to.
    wofstream _file;

    // The log message types that are written to file.
    log_message_type _file_messages;

    // The log message types that are written to the console.
    log_message_type _console_messages;

    // The head of the network. Propagates logging messages
    // to the rest of the network.
    unbounded_buffer<log_message> _log_buffer;

    // Counts the number of active messages in the network.
    countdown_event _active;

    // Signals that the agent has been closed.
    event _closed;
};

int wmain()
{
    // Union of all log message types.
    log_message_type log_all = log_message_type(log_info | log_warning  | log_error);

    // Create a logging agent that writes all log messages to file and error 
    // messages to the console.
    log_agent logger(L"log.txt", log_all, log_error);

    // Start the agent.
    logger.start();

    // Log a few messages.

    logger.log(L"===Logging started.===", log_info);

    logger.log(L"This is a sample warning message.", log_warning);
    logger.log(L"This is a sample error message.", log_error);

    logger.log(L"===Logging finished.===", log_info);

    // Close the logger and wait for the agent to finish.
    logger.close();
    agent::wait(&logger);
}

Bu örnek, aşağıdaki çıkışı konsola yazacak.

error: This is a sample error message.

Bu örnek, aşağıdaki metni içeren dosya log.txt de üretir.

info: ===Logging started.===
warning: This is a sample warning message.
error: This is a sample error message.
info: ===Logging finished.===

Ff398051.collapse_all(tr-tr,VS.110).gifKod Derleniyor

Örnek kodu kopyalayın ve Visual Studio Project'te yapıştırın veya adlı bir dosyaya yapıştırın Günlük filter.cpp ve Visual Studio komut istemi penceresinde aşağıdaki komutu çalıştırın.

cl.exe /EHsc log-filter.cpp

Top

Ayrıca bkz.

Diğer Kaynaklar

Eşzamanlılık çalışma zamanı izlenecek yollar