İzlenecek Yol: Veri Akışı Aracısı Oluşturma
Bu belgede, denetim akışı yerine veri akışını temel alan aracı tabanlı uygulamaların nasıl oluşturulacağı gösterilmektedir.
Denetim akışı , bir programdaki işlemlerin yürütme sırasını ifade eder. Denetim akışı, koşullu deyimler, döngüler vb. denetim yapıları kullanılarak düzenlenir. Alternatif olarak, veri akışı yalnızca gerekli tüm veriler kullanılabilir olduğunda hesaplamaların yapıldığı bir programlama modeline başvurur. Veri akışı programlama modeli, bir programın bağımsız bileşenlerinin ileti göndererek birbirleriyle iletişim kurarak ileti geçirme kavramıyla ilgilidir.
Zaman uyumsuz aracılar hem denetim akışı hem de veri akışı programlama modellerini destekler. Denetim akışı modeli çoğu durumda uygun olsa da veri akışı modeli, örneğin bir aracı verileri aldığında ve bu verilerin yükünü temel alan bir eylem gerçekleştirdiğinde diğerlerinde uygundur.
Önkoşullar
Bu izlenecek yolu başlatmadan önce aşağıdaki belgeleri okuyun:
Bölümler
Bu izlenecek yol aşağıdaki bölümleri içerir:
Temel Denetim Akışı Aracısı Oluşturma
sınıfını tanımlayan control_flow_agent
aşağıdaki örneği göz önünde bulundurun. control_flow_agent
sınıfı üç ileti arabelleği üzerinde hareket eder: bir giriş arabelleği ve iki çıkış arabelleği. yöntemi bir run
döngüdeki kaynak ileti arabelleğinden okur ve program yürütme akışını yönlendirmek için koşullu bir deyim kullanır. Aracı sıfır olmayan, negatif değerler için bir sayacı artırır ve sıfır olmayan pozitif değerler için başka bir sayacı artırır. Aracı sıfırın sentinel değerini aldıktan sonra, sayaçların değerlerini çıkış iletisi arabelleklerine gönderir. negatives
ve positives
yöntemleri, uygulamanın aracıdan negatif ve pozitif değerlerin sayısını okumasını sağlar.
// A basic agent that uses control-flow to regulate the order of program
// execution. This agent reads numbers from a message buffer and counts the
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
explicit control_flow_agent(ISource<int>& source)
: _source(source)
{
}
// Retrieves the count of negative numbers that the agent received.
size_t negatives()
{
return receive(_negatives);
}
// Retrieves the count of positive numbers that the agent received.
size_t positives()
{
return receive(_positives);
}
protected:
void run()
{
// Counts the number of negative and positive values that
// the agent receives.
size_t negative_count = 0;
size_t positive_count = 0;
// Read from the source buffer until we receive
// the sentinel value of 0.
int value = 0;
while ((value = receive(_source)) != 0)
{
// Send negative values to the first target and
// non-negative values to the second target.
if (value < 0)
++negative_count;
else
++positive_count;
}
// Write the counts to the message buffers.
send(_negatives, negative_count);
send(_positives, positive_count);
// Set the agent to the completed state.
done();
}
private:
// Source message buffer to read from.
ISource<int>& _source;
// Holds the number of negative and positive numbers that the agent receives.
single_assignment<size_t> _negatives;
single_assignment<size_t> _positives;
};
Bu örnek bir aracıda denetim akışını temel olarak kullansa da, denetim akışı tabanlı programlamanın seri niteliğini gösterir. Giriş iletisi arabelleğinde birden çok ileti kullanılabilse bile her ileti sırayla işlenmelidir. Veri akışı modeli, koşullu deyimin her iki dalının da eşzamanlı olarak değerlendirilmesini sağlar. Veri akışı modeli, kullanılabilir hale geldikçe veriler üzerinde hareket eden daha karmaşık mesajlaşma ağları oluşturmanıza da olanak tanır.
[Üst]
Temel Veri Akışı Aracısı Oluşturma
Bu bölümde, aynı görevi gerçekleştirmek için veri akışı modelini kullanmak üzere sınıfın nasıl dönüştürüldüğü control_flow_agent
gösterilmektedir.
Veri akışı aracısı, her biri belirli bir amaca hizmet eden bir ileti arabellekleri ağı oluşturarak çalışır. Bazı ileti blokları, yükü temelinde bir iletiyi kabul etmek veya reddetmek için bir filtre işlevi kullanır. Filtre işlevi, bir ileti bloğunun yalnızca belirli değerleri almasını sağlar.
Denetim akışı aracısını bir veri akışı aracısına dönüştürmek için
sınıfının gövdesini
control_flow_agent
başka bir sınıfa kopyalayın, örneğin,dataflow_agent
. Alternatif olarak sınıfı yeniden adlandırabilirsinizcontrol_flow_agent
.yönteminden çağıran
receive
döngünün gövdesinirun
kaldırın.
void run()
{
// Counts the number of negative and positive values that
// the agent receives.
size_t negative_count = 0;
size_t positive_count = 0;
// Write the counts to the message buffers.
send(_negatives, negative_count);
send(_positives, positive_count);
// Set the agent to the completed state.
done();
}
- yönteminde
run
, vepositive_count
değişkenlerininnegative_count
başlatılmasından sonra etkin işlemlerin sayısını izleyen bircountdown_event
nesne ekleyin.
// Tracks the count of active operations.
countdown_event active;
// An event that is set by the sentinel.
event received_sentinel;
sınıfı countdown_event
bu konunun ilerleyen bölümlerinde gösterilir.
- Veri akışı ağına katılacak ileti arabelleği nesnelerini oluşturun.
//
// Create the members of the dataflow network.
//
// Increments the active counter.
transformer<int, int> increment_active(
[&active](int value) -> int {
active.add_count();
return value;
});
// Increments the count of negative values.
call<int> negatives(
[&](int value) {
++negative_count;
// Decrement the active counter.
active.signal();
},
[](int value) -> bool {
return value < 0;
});
// Increments the count of positive values.
call<int> positives(
[&](int value) {
++positive_count;
// Decrement the active counter.
active.signal();
},
[](int value) -> bool {
return value > 0;
});
// Receives only the sentinel value of 0.
call<int> sentinel(
[&](int value) {
// Decrement the active counter.
active.signal();
// Set the sentinel event.
received_sentinel.set();
},
[](int value) -> bool {
return value == 0;
});
// Connects the _source message buffer to the rest of the network.
unbounded_buffer<int> connector;
- Ağ oluşturmak için ileti arabelleklerini bağlayın.
//
// Connect the network.
//
// Connect the internal nodes of the network.
connector.link_target(&negatives);
connector.link_target(&positives);
connector.link_target(&sentinel);
increment_active.link_target(&connector);
// Connect the _source buffer to the internal network to
// begin data flow.
_source.link_target(&increment_active);
- ve
countdown event
nesnelerininevent
ayarlanmasını bekleyin. Bu olaylar, aracının sentinel değerini aldığını ve tüm işlemlerin tamamlandığını gösterir.
// Wait for the sentinel event and for all operations to finish.
received_sentinel.wait();
active.wait();
Aşağıdaki diyagramda sınıfı için tam veri akışı ağı gösterilmektedir dataflow_agent
:
Aşağıdaki tabloda ağ üyeleri açıklanmaktadır.
Üye | Açıklama |
---|---|
increment_active |
Etkin olay sayacını artıran ve giriş değerini ağın geri kalanına geçiren concurrency::transformer nesnesi. |
negatives , positives |
eşzamanlılık::sayı sayısını artıran ve etkin olay sayacını azalan nesneleri çağırın . Nesnelerin her biri negatif sayıları veya pozitif sayıları kabul etmek için bir filtre kullanır. |
sentinel |
Yalnızca sıfırın sentinel değerini kabul eden ve etkin olay sayacını azalan concurrency::call nesnesi. |
connector |
Kaynak ileti arabelleği iç ağa bağlayan eşzamanlılık::unbounded_buffer nesnesi. |
run
Yöntemi ayrı bir iş parçacığında çağrıldığından, ağ tam olarak bağlanmadan önce diğer iş parçacıkları ağa ileti gönderebilir. Veri _source
üyesi, uygulamadan aracıya gönderilen tüm girişleri arabelleğe alan bir unbounded_buffer
nesnedir. Ağın tüm giriş iletilerini işlediğinden emin olmak için, aracı önce ağın iç düğümlerini ve ardından bu ağın connector
başlangıcını veri üyesine _source
bağlar. Bu, ağ oluşturulurken iletilerin işlenmemesini garanti eder.
Bu örnekteki ağ denetim akışı yerine veri akışını temel aldığından, ağ her giriş değerini işlemeyi bitirdiğini ve sentinel düğümünü değerini aldığını aracıyla iletişim kurmalıdır. Bu örnekte, tüm giriş değerlerinin işlendiğini belirten bir countdown_event
nesne ve sentinel düğümünü değerini aldığını belirtmek için concurrency ::event nesnesi kullanılır. sınıfı, countdown_event
bir event
sayaç değeri sıfıra ulaştığında sinyal vermek için bir nesnesi kullanır. Veri akışı ağının başı, her değer aldığında sayacı artırır. Ağın her terminal düğümü, giriş değerini işledikten sonra sayacı ayırır. Aracı veri akışı ağını oluşturduktan sonra sentinel düğümünü nesneyi ayarlamasını event
ve nesnenin sayacının countdown_event
sıfıra ulaştığını sinyallemesini bekler.
Aşağıdaki örnekte , dataflow_agent
ve countdown_event
sınıfları gösterilmektedircontrol_flow_agent
. wmain
işlevi bir control_flow_agent
ve dataflow_agent
nesnesi oluşturur ve aracılara bir dizi rastgele değer göndermek için işlevini kullanırsend_values
.
// dataflow-agent.cpp
// compile with: /EHsc
#include <windows.h>
#include <agents.h>
#include <iostream>
#include <random>
using namespace concurrency;
using namespace std;
// A basic agent that uses control-flow to regulate the order of program
// execution. This agent reads numbers from a message buffer and counts the
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
explicit control_flow_agent(ISource<int>& source)
: _source(source)
{
}
// Retrieves the count of negative numbers that the agent received.
size_t negatives()
{
return receive(_negatives);
}
// Retrieves the count of positive numbers that the agent received.
size_t positives()
{
return receive(_positives);
}
protected:
void run()
{
// Counts the number of negative and positive values that
// the agent receives.
size_t negative_count = 0;
size_t positive_count = 0;
// Read from the source buffer until we receive
// the sentinel value of 0.
int value = 0;
while ((value = receive(_source)) != 0)
{
// Send negative values to the first target and
// non-negative values to the second target.
if (value < 0)
++negative_count;
else
++positive_count;
}
// Write the counts to the message buffers.
send(_negatives, negative_count);
send(_positives, positive_count);
// Set the agent to the completed state.
done();
}
private:
// Source message buffer to read from.
ISource<int>& _source;
// Holds the number of negative and positive numbers that the agent receives.
single_assignment<size_t> _negatives;
single_assignment<size_t> _positives;
};
// A synchronization primitive that is signaled when its
// count reaches zero.
class countdown_event
{
public:
countdown_event(unsigned int count = 0L)
: _current(static_cast<long>(count))
{
// Set the event if the initial count is zero.
if (_current == 0L)
_event.set();
}
// Decrements the event counter.
void signal() {
if(InterlockedDecrement(&_current) == 0L) {
_event.set();
}
}
// Increments the event counter.
void add_count() {
if(InterlockedIncrement(&_current) == 1L) {
_event.reset();
}
}
// Blocks the current context until the event is set.
void wait() {
_event.wait();
}
private:
// The current count.
volatile long _current;
// The event that is set when the counter reaches zero.
event _event;
// Disable copy constructor.
countdown_event(const countdown_event&);
// Disable assignment.
countdown_event const & operator=(countdown_event const&);
};
// A basic agent that resembles control_flow_agent, but uses uses dataflow to
// perform computations when data becomes available.
class dataflow_agent : public agent
{
public:
dataflow_agent(ISource<int>& source)
: _source(source)
{
}
// Retrieves the count of negative numbers that the agent received.
size_t negatives()
{
return receive(_negatives);
}
// Retrieves the count of positive numbers that the agent received.
size_t positives()
{
return receive(_positives);
}
protected:
void run()
{
// Counts the number of negative and positive values that
// the agent receives.
size_t negative_count = 0;
size_t positive_count = 0;
// Tracks the count of active operations.
countdown_event active;
// An event that is set by the sentinel.
event received_sentinel;
//
// Create the members of the dataflow network.
//
// Increments the active counter.
transformer<int, int> increment_active(
[&active](int value) -> int {
active.add_count();
return value;
});
// Increments the count of negative values.
call<int> negatives(
[&](int value) {
++negative_count;
// Decrement the active counter.
active.signal();
},
[](int value) -> bool {
return value < 0;
});
// Increments the count of positive values.
call<int> positives(
[&](int value) {
++positive_count;
// Decrement the active counter.
active.signal();
},
[](int value) -> bool {
return value > 0;
});
// Receives only the sentinel value of 0.
call<int> sentinel(
[&](int value) {
// Decrement the active counter.
active.signal();
// Set the sentinel event.
received_sentinel.set();
},
[](int value) -> bool {
return value == 0;
});
// Connects the _source message buffer to the rest of the network.
unbounded_buffer<int> connector;
//
// Connect the network.
//
// Connect the internal nodes of the network.
connector.link_target(&negatives);
connector.link_target(&positives);
connector.link_target(&sentinel);
increment_active.link_target(&connector);
// Connect the _source buffer to the internal network to
// begin data flow.
_source.link_target(&increment_active);
// Wait for the sentinel event and for all operations to finish.
received_sentinel.wait();
active.wait();
// Write the counts to the message buffers.
send(_negatives, negative_count);
send(_positives, positive_count);
// Set the agent to the completed state.
done();
}
private:
// Source message buffer to read from.
ISource<int>& _source;
// Holds the number of negative and positive numbers that the agent receives.
single_assignment<size_t> _negatives;
single_assignment<size_t> _positives;
};
// Sends a number of random values to the provided message buffer.
void send_values(ITarget<int>& source, int sentinel, size_t count)
{
// Send a series of random numbers to the source buffer.
mt19937 rnd(42);
for (size_t i = 0; i < count; ++i)
{
// Generate a random number that is not equal to the sentinel value.
int n;
while ((n = rnd()) == sentinel);
send(source, n);
}
// Send the sentinel value.
send(source, sentinel);
}
int wmain()
{
// Signals to the agent that there are no more values to process.
const int sentinel = 0;
// The number of samples to send to each agent.
const size_t count = 1000000;
// The source buffer that the application writes numbers to and
// the agents read numbers from.
unbounded_buffer<int> source;
//
// Use a control-flow agent to process a series of random numbers.
//
wcout << L"Control-flow agent:" << endl;
// Create and start the agent.
control_flow_agent cf_agent(source);
cf_agent.start();
// Send values to the agent.
send_values(source, sentinel, count);
// Wait for the agent to finish.
agent::wait(&cf_agent);
// Print the count of negative and positive numbers.
wcout << L"There are " << cf_agent.negatives()
<< L" negative numbers."<< endl;
wcout << L"There are " << cf_agent.positives()
<< L" positive numbers."<< endl;
//
// Perform the same task, but this time with a dataflow agent.
//
wcout << L"Dataflow agent:" << endl;
// Create and start the agent.
dataflow_agent df_agent(source);
df_agent.start();
// Send values to the agent.
send_values(source, sentinel, count);
// Wait for the agent to finish.
agent::wait(&df_agent);
// Print the count of negative and positive numbers.
wcout << L"There are " << df_agent.negatives()
<< L" negative numbers."<< endl;
wcout << L"There are " << df_agent.positives()
<< L" positive numbers."<< endl;
}
Bu örnek aşağıdaki örnek çıkışı oluşturur:
Control-flow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.
Dataflow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.
Kod Derleniyor
Örnek kodu kopyalayıp bir Visual Studio projesine yapıştırın veya adlı dataflow-agent.cpp
bir dosyaya yapıştırın ve ardından bir Visual Studio Komut İstemi penceresinde aşağıdaki komutu çalıştırın.
cl.exe /EHsc dataflow-agent.cpp
[Üst]
İleti Günlüğü Aracısı Oluşturma
Aşağıdaki örnekte sınıfına log_agent
benzeyen sınıfı gösterilmektedir dataflow_agent
. sınıfı, log_agent
bir dosyaya ve konsola günlük iletileri yazan zaman uyumsuz bir günlük aracısı uygular. sınıfı, log_agent
uygulamanın iletileri bilgilendirme, uyarı veya hata olarak kategorilere ayırmasını sağlar. Ayrıca uygulamanın her günlük kategorisinin bir dosyaya mı, konsola mı yoksa her ikisine mi yazılacağını belirtmesini sağlar. Bu örnek, tüm günlük iletilerini bir dosyaya ve yalnızca hata iletilerini konsola yazar.
// log-filter.cpp
// compile with: /EHsc
#include <windows.h>
#include <agents.h>
#include <sstream>
#include <fstream>
#include <iostream>
using namespace concurrency;
using namespace std;
// A synchronization primitive that is signaled when its
// count reaches zero.
class countdown_event
{
public:
countdown_event(unsigned int count = 0L)
: _current(static_cast<long>(count))
{
// Set the event if the initial count is zero.
if (_current == 0L)
{
_event.set();
}
}
// Decrements the event counter.
void signal()
{
if(InterlockedDecrement(&_current) == 0L)
{
_event.set();
}
}
// Increments the event counter.
void add_count()
{
if(InterlockedIncrement(&_current) == 1L)
{
_event.reset();
}
}
// Blocks the current context until the event is set.
void wait()
{
_event.wait();
}
private:
// The current count.
volatile long _current;
// The event that is set when the counter reaches zero.
event _event;
// Disable copy constructor.
countdown_event(const countdown_event&);
// Disable assignment.
countdown_event const & operator=(countdown_event const&);
};
// Defines message types for the logger.
enum log_message_type
{
log_info = 0x1,
log_warning = 0x2,
log_error = 0x4,
};
// An asynchronous logging agent that writes log messages to
// file and to the console.
class log_agent : public agent
{
// Holds a message string and its logging type.
struct log_message
{
wstring message;
log_message_type type;
};
public:
log_agent(const wstring& file_path, log_message_type file_messages, log_message_type console_messages)
: _file(file_path)
, _file_messages(file_messages)
, _console_messages(console_messages)
, _active(0)
{
if (_file.bad())
{
throw invalid_argument("Unable to open log file.");
}
}
// Writes the provided message to the log.
void log(const wstring& message, log_message_type type)
{
// Increment the active message count.
_active.add_count();
// Send the message to the network.
log_message msg = { message, type };
send(_log_buffer, msg);
}
void close()
{
// Signal that the agent is now closed.
_closed.set();
}
protected:
void run()
{
//
// Create the dataflow network.
//
// Writes a log message to file.
call<log_message> writer([this](log_message msg)
{
if ((msg.type & _file_messages) != 0)
{
// Write the message to the file.
write_to_stream(msg, _file);
}
if ((msg.type & _console_messages) != 0)
{
// Write the message to the console.
write_to_stream(msg, wcout);
}
// Decrement the active counter.
_active.signal();
});
// Connect _log_buffer to the internal network to begin data flow.
_log_buffer.link_target(&writer);
// Wait for the closed event to be signaled.
_closed.wait();
// Wait for all messages to be processed.
_active.wait();
// Close the log file and flush the console.
_file.close();
wcout.flush();
// Set the agent to the completed state.
done();
}
private:
// Writes a logging message to the specified output stream.
void write_to_stream(const log_message& msg, wostream& stream)
{
// Write the message to the stream.
wstringstream ss;
switch (msg.type)
{
case log_info:
ss << L"info: ";
break;
case log_warning:
ss << L"warning: ";
break;
case log_error:
ss << L"error: ";
}
ss << msg.message << endl;
stream << ss.str();
}
private:
// The file stream to write messages to.
wofstream _file;
// The log message types that are written to file.
log_message_type _file_messages;
// The log message types that are written to the console.
log_message_type _console_messages;
// The head of the network. Propagates logging messages
// to the rest of the network.
unbounded_buffer<log_message> _log_buffer;
// Counts the number of active messages in the network.
countdown_event _active;
// Signals that the agent has been closed.
event _closed;
};
int wmain()
{
// Union of all log message types.
log_message_type log_all = log_message_type(log_info | log_warning | log_error);
// Create a logging agent that writes all log messages to file and error
// messages to the console.
log_agent logger(L"log.txt", log_all, log_error);
// Start the agent.
logger.start();
// Log a few messages.
logger.log(L"===Logging started.===", log_info);
logger.log(L"This is a sample warning message.", log_warning);
logger.log(L"This is a sample error message.", log_error);
logger.log(L"===Logging finished.===", log_info);
// Close the logger and wait for the agent to finish.
logger.close();
agent::wait(&logger);
}
Bu örnek konsola aşağıdaki çıkışı yazar.
error: This is a sample error message.
Bu örnek, aşağıdaki metni içeren log.txt dosyasını da oluşturur.
info: ===Logging started.===
warning: This is a sample warning message.
error: This is a sample error message.
info: ===Logging finished.===
Kod Derleniyor
Örnek kodu kopyalayıp bir Visual Studio projesine yapıştırın veya adlı log-filter.cpp
bir dosyaya yapıştırın ve ardından bir Visual Studio Komut İstemi penceresinde aşağıdaki komutu çalıştırın.
cl.exe /EHsc log-filter.cpp
[Üst]