Osvědčené postupy v knihovně asynchronních agentů
Tento dokument popisuje, jak využít asynchronního knihovny agentů.Knihovny agentů podporuje programovací model založený na herce a zprávy v procesu předávání pro hrubý tok dat a úkoly kanálů.
Další informace o knihovně agenti, viz Knihovna asynchronních agentů.
Oddíly
Tento dokument obsahuje následující oddíly:
Izolace stavů pomocí agentů
Omezení počtu zpráv v kanálu dat pomocí mechanismu omezování
Neprovádějte detailní operace v kanálu dat
Nepředávejte zprávy hodnotou, pokud mají velkou datovou část
Použijte ukazatele shared_ptr v datových sítích bez definovaného vlastnictví
Izolace stavů pomocí agentů
Knihovny agentů poskytuje alternativy k sdíleného stavu tím, že umožňuje připojení izolovaných součástí přes asynchronní mechanismus předávání zpráv.Asynchronní agenti jsou nejúčinnější při jejich zjištění jejich vnitřní stav z jiných komponent.Izolováním stát více součástí nevztahovala obvykle na sdílená data.Stav izolace můžete povolit aplikace měřítko, protože snižuje tvrzení na sdílené paměti.Stavu izolace také snižuje pravděpodobnost zablokování a rasy podmínky, protože součásti nemají synchronizovat přístup ke sdíleným datům.
Obvykle izolovat tak, že podržíte datové členy v stavu agenta private nebo protected body agent třídy a pomocí vyrovnávací paměti zprávy sdělovat informace o změně stavu.Následující příklad ukazuje basic_agent třídy, která je odvozena z concurrency::agent.basic_agent Třída používá ke komunikaci s externí součásti dvě vyrovnávací paměti zprávy.Jedna zpráva vyrovnávací paměť obsahuje příchozí zprávy; Další zpráva vyrovnávací paměti uchovává odchozí zprávy.
// basic-agent.cpp
// compile with: /c /EHsc
#include <agents.h>
// An agent that uses message buffers to isolate state and communicate
// with other components.
class basic_agent : public concurrency::agent
{
public:
basic_agent(concurrency::unbounded_buffer<int>& input)
: _input(input)
{
}
// Retrives the message buffer that holds output messages.
concurrency::unbounded_buffer<int>& output()
{
return _output;
}
protected:
void run()
{
while (true)
{
// Read from the input message buffer.
int value = concurrency::receive(_input);
// TODO: Do something with the value.
int result = value;
// Write the result to the output message buffer.
concurrency::send(_output, result);
}
done();
}
private:
// Holds incoming messages.
concurrency::unbounded_buffer<int>& _input;
// Holds outgoing messages.
concurrency::unbounded_buffer<int> _output;
};
Kompletní příklady o tom, jak definovat a používat agenti naleznete v tématu Návod: Vytvoření aplikace založené na agentovi a Postupy: Vytvoření agenta toku dat.
[Nahoře]
Omezení počtu zpráv v kanálu dat pomocí mechanismu omezování
Mnoho typů vyrovnávací paměť zpráv, jako je například concurrency::unbounded_buffer, může obsahovat neomezený počet zpráv.Pokud výrobce zprávu odesílá zprávy kanálu data rychleji, než spotřebitel může zpracovávat tyto zprávy, aplikace můžete zadat stav nedostatku paměti nebo z důvodu nedostatku paměti.Můžete omezit počet zpráv, které jsou současně aktivní v kanálu data omezení mechanismu, například semafor.
Základní příklad ukazuje, jak použít semafor omezit počet zpráv v kanálu data.Data pro seznam příležitostí používá concurrency::wait funkce pro simulaci operace, která trvá nejméně 100 milisekund.Vzhledem k tomu, že odesílatel vytváří rychleji, než příjemce tyto zprávy můžete zpracovávat zprávy, v tomto příkladu definuje semaphore třídy, které umožňují aplikacím pro omezení počtu aktivních zpráv.
// message-throttling.cpp
// compile with: /EHsc
#include <windows.h> // for GetTickCount()
#include <atomic>
#include <agents.h>
#include <concrt.h>
#include <concurrent_queue.h>
#include <sstream>
#include <iostream>
using namespace concurrency;
using namespace std;
// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
explicit semaphore(long long capacity)
: _semaphore_count(capacity)
{
}
// Acquires access to the semaphore.
void acquire()
{
// The capacity of the semaphore is exceeded when the semaphore count
// falls below zero. When this happens, add the current context to the
// back of the wait queue and block the current context.
if (--_semaphore_count < 0)
{
_waiting_contexts.push(Context::CurrentContext());
Context::Block();
}
}
// Releases access to the semaphore.
void release()
{
// If the semaphore count is negative, unblock the first waiting context.
if (++_semaphore_count <= 0)
{
// A call to acquire might have decremented the counter, but has not
// yet finished adding the context to the queue.
// Create a spin loop that waits for the context to become available.
Context* waiting = NULL;
while (!_waiting_contexts.try_pop(waiting))
{
Context::Yield();
}
// Unblock the context.
waiting->Unblock();
}
}
private:
// The semaphore count.
atomic<long long> _semaphore_count;
// A concurrency-safe queue of contexts that must wait to
// acquire the semaphore.
concurrent_queue<Context*> _waiting_contexts;
};
// A synchronization primitive that is signaled when its
// count reaches zero.
class countdown_event
{
public:
countdown_event(long long count)
: _current(count)
{
// Set the event if the initial count is zero.
if (_current == 0LL)
_event.set();
}
// Decrements the event counter.
void signal() {
if(--_current == 0LL) {
_event.set();
}
}
// Increments the event counter.
void add_count() {
if(++_current == 1LL) {
_event.reset();
}
}
// Blocks the current context until the event is set.
void wait() {
_event.wait();
}
private:
// The current count.
atomic<long long> _current;
// The event that is set when the counter reaches zero.
event _event;
// Disable copy constructor.
countdown_event(const countdown_event&);
// Disable assignment.
countdown_event const & operator=(countdown_event const&);
};
int wmain()
{
// The number of messages to send to the consumer.
const long long MessageCount = 5;
// The number of messages that can be active at the same time.
const long long ActiveMessages = 2;
// Used to compute the elapsed time.
DWORD start_time;
// Computes the elapsed time, rounded-down to the nearest
// 100 milliseconds.
auto elapsed = [&start_time] {
return (GetTickCount() - start_time)/100*100;
};
// Limits the number of active messages.
semaphore s(ActiveMessages);
// Enables the consumer message buffer to coordinate completion
// with the main application.
countdown_event e(MessageCount);
// Create a data pipeline that has three stages.
// The first stage of the pipeline prints a message.
transformer<int, int> print_message([&elapsed](int n) -> int {
wstringstream ss;
ss << elapsed() << L": received " << n << endl;
wcout << ss.str();
// Send the input to the next pipeline stage.
return n;
});
// The second stage of the pipeline simulates a
// time-consuming operation.
transformer<int, int> long_operation([](int n) -> int {
wait(100);
// Send the input to the next pipeline stage.
return n;
});
// The third stage of the pipeline releases the semaphore
// and signals to the main appliation that the message has
// been processed.
call<int> release_and_signal([&](int unused) {
// Enable the sender to send the next message.
s.release();
// Signal that the message has been processed.
e.signal();
});
// Connect the pipeline.
print_message.link_target(&long_operation);
long_operation.link_target(&release_and_signal);
// Send several messages to the pipeline.
start_time = GetTickCount();
for(auto i = 0; i < MessageCount; ++i)
{
// Acquire access to the semaphore.
s.acquire();
// Print the message to the console.
wstringstream ss;
ss << elapsed() << L": sending " << i << L"..." << endl;
wcout << ss.str();
// Send the message.
send(print_message, i);
}
// Wait for the consumer to process all messages.
e.wait();
}
/* Sample output:
0: sending 0...
0: received 0
0: sending 1...
0: received 1
100: sending 2...
100: received 2
200: sending 3...
200: received 3
300: sending 4...
300: received 4
*/
semaphore Objekt omezuje příležitosti zpracovat současně nejvýše dvě zprávy.
Výrobce v tomto příkladu odešle relativně malý počet zpráv pro spotřebitele.V tomto příkladu tedy neuvádí potenciální podmínku nedostatku paměti nebo z důvodu nedostatku paměti.Tento mechanismus je však užitečné při poměrně vysoký počet zpráv obsahuje datový kanál.
Další informace o tom, jak vytvořit semafor třídy, která je použita v tomto příkladu naleznete v tématu Postupy: Použití třídy kontextu pro implementaci semaforu pro spolupráci.
[Nahoře]
Neprovádějte detailní operace v kanálu dat
Knihovny agentů je nejužitečnější při práci, které je prováděno pomocí datového kanálu je velmi hrubý.Jednu součást aplikace může například číst data ze souboru nebo připojení k síti a občas odesíláním dat na jiné komponenty.Protokol, který používá knihovny agentů k šíření zpráv způsobuje, že mechanismus předávání zpráv mít další režii než úkol paralelní konstrukce, poskytované Parallel Library vzory (PPL).Proto ujistěte se, že práce, kterou provádí datového kanálu dostatečně dlouhý posun toto zatížení.
Ačkoli data kanálu je nejúčinnější, pokud jeho úkoly jsou hrubý, každé fázi datový kanál slouží k provádění více detailní práce konstrukce PPL skupiny úkolů a paralelní algoritmy.Příklad používající v každé fázi zpracování podrobného paralelismu hrubý datové sítě naleznete v tématu Návod: Vytvoření sítě pro zpracování obrázků.
[Nahoře]
Nepředávejte zprávy hodnotou, pokud mají velkou datovou část
V některých případech modul runtime vytvoří kopii každé zprávy předá z jedné vyrovnávací paměti zprávy do jiné zprávy vyrovnávací paměti.Například concurrency::overwrite_buffer třída nabízí kopie všech zpráv, které obdrží na každý jejich cíle.Modul runtime také vytvoří kopie dat zprávu při použití funkce předávání zpráv, jako concurrency::send a concurrency::receive psát zprávy a číst zprávy z vyrovnávací paměti zprávy.Přestože tento mechanismus pomáhá eliminovat riziko souběžně zápis ke sdíleným datům, by mohlo vést k nedostatečné paměti výkon při datová část zprávy je poměrně velký.
Můžete použít odkazy nebo odkazy pro zlepšení výkonu paměti při předání zprávy mají velké datové části.Následující příklad porovnává předají objemné zprávy podle hodnoty pro předání ukazatele zprávy stejného typu.V příkladu jsou definovány dva typy agent, producer a consumer, který působí na message_data objekty.V příkladu porovná čas, který je vyžadován pro výrobce odeslat několik message_data objekty pro spotřebitele na dobu, která je vyžadována pro výrobce agent odeslat několik ukazatelů na message_data objekty pro spotřebitele.
// message-payloads.cpp
// compile with: /EHsc
#include <Windows.h>
#include <agents.h>
#include <iostream>
using namespace concurrency;
using namespace std;
// Calls the provided work function and returns the number of milliseconds
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
__int64 begin = GetTickCount();
f();
return GetTickCount() - begin;
}
// A message structure that contains large payload data.
struct message_data
{
int id;
string source;
unsigned char binary_data[32768];
};
// A basic agent that produces values.
template <typename T>
class producer : public agent
{
public:
explicit producer(ITarget<T>& target, unsigned int message_count)
: _target(target)
, _message_count(message_count)
{
}
protected:
void run();
private:
// The target buffer to write to.
ITarget<T>& _target;
// The number of messages to send.
unsigned int _message_count;
};
// Template specialization for message_data.
template <>
void producer<message_data>::run()
{
// Send a number of messages to the target buffer.
while (_message_count > 0)
{
message_data message;
message.id = _message_count;
message.source = "Application";
send(_target, message);
--_message_count;
}
// Set the agent to the finished state.
done();
}
// Template specialization for message_data*.
template <>
void producer<message_data*>::run()
{
// Send a number of messages to the target buffer.
while (_message_count > 0)
{
message_data* message = new message_data;
message->id = _message_count;
message->source = "Application";
send(_target, message);
--_message_count;
}
// Set the agent to the finished state.
done();
}
// A basic agent that consumes values.
template <typename T>
class consumer : public agent
{
public:
explicit consumer(ISource<T>& source, unsigned int message_count)
: _source(source)
, _message_count(message_count)
{
}
protected:
void run();
private:
// The source buffer to read from.
ISource<T>& _source;
// The number of messages to receive.
unsigned int _message_count;
};
// Template specialization for message_data.
template <>
void consumer<message_data>::run()
{
// Receive a number of messages from the source buffer.
while (_message_count > 0)
{
message_data message = receive(_source);
--_message_count;
// TODO: Do something with the message.
// ...
}
// Set the agent to the finished state.
done();
}
template <>
void consumer<message_data*>::run()
{
// Receive a number of messages from the source buffer.
while (_message_count > 0)
{
message_data* message = receive(_source);
--_message_count;
// TODO: Do something with the message.
// ...
// Release the memory for the message.
delete message;
}
// Set the agent to the finished state.
done();
}
int wmain()
{
// The number of values for the producer agent to send.
const unsigned int count = 10000;
__int64 elapsed;
// Run the producer and consumer agents.
// This version uses message_data as the message payload type.
wcout << L"Using message_data..." << endl;
elapsed = time_call([count] {
// A message buffer that is shared by the agents.
unbounded_buffer<message_data> buffer;
// Create and start the producer and consumer agents.
producer<message_data> prod(buffer, count);
consumer<message_data> cons(buffer, count);
prod.start();
cons.start();
// Wait for the agents to finish.
agent::wait(&prod);
agent::wait(&cons);
});
wcout << L"took " << elapsed << L"ms." << endl;
// Run the producer and consumer agents a second time.
// This version uses message_data* as the message payload type.
wcout << L"Using message_data*..." << endl;
elapsed = time_call([count] {
// A message buffer that is shared by the agents.
unbounded_buffer<message_data*> buffer;
// Create and start the producer and consumer agents.
producer<message_data*> prod(buffer, count);
consumer<message_data*> cons(buffer, count);
prod.start();
cons.start();
// Wait for the agents to finish.
agent::wait(&prod);
agent::wait(&cons);
});
wcout << L"took " << elapsed << L"ms." << endl;
}
Tento příklad vytvoří následující výstup:
Verze, která používá ukazatele provádí lépe, protože odstraňuje požadavek na modul runtime k vytvoření úplné kopie všech message_data objekt, který předává od výrobce ke spotřebiteli.
[Nahoře]
Použijte ukazatele shared_ptr v datových sítích bez definovaného vlastnictví
Při odesílání zpráv pomocí ukazatele prostřednictvím předávání zpráv kanálu nebo sítě obvykle dostatek paměti pro každou zprávu na přední straně sítě a uvolnit paměť, že na konci sítě.Ačkoli tento mechanismus často funguje dobře, existují případy, ve kterých je obtížné nebo není možné jej používat.Zvažte například případ, kdy datová síť obsahuje více koncové uzly.V tomto případě neexistuje žádný jasný místo uvolnit paměť pro zprávy.
Chcete-li tento problém vyřešit, můžete použít mechanismu, například std::shared_ptr, umožňující ukazatel vlastněné více komponent.Při závěrečné shared_ptr je zničení objektu, který vlastní prostředek, prostředek je také uvolněno.
Následující příklad demonstruje použití shared_ptr sdílet hodnoty ukazatele mezi více vyrovnávací paměti zprávy.Příklad připojuje concurrency::overwrite_buffer objektu k tři concurrency::call objekty.overwrite_buffer Třída nabízí zprávy do všech svých cílů.Protože existuje více vlastníků dat na konci datové sítě, v tomto příkladu shared_ptr umožnit oběma call objekt, který chcete sdílet vlastnictví zprávy.
// message-sharing.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
#include <sstream>
using namespace concurrency;
using namespace std;
// A type that holds a resource.
class resource
{
public:
resource(int id) : _id(id)
{
wcout << L"Creating resource " << _id << L"..." << endl;
}
~resource()
{
wcout << L"Destroying resource " << _id << L"..." << endl;
}
// Retrieves the identifier for the resource.
int id() const { return _id; }
// TODO: Add additional members here.
private:
// An identifier for the resource.
int _id;
// TODO: Add additional members here.
};
int wmain()
{
// A message buffer that sends messages to each of its targets.
overwrite_buffer<shared_ptr<resource>> input;
// Create three call objects that each receive resource objects
// from the input message buffer.
call<shared_ptr<resource>> receiver1(
[](shared_ptr<resource> res) {
wstringstream ss;
ss << L"receiver1: received resource " << res->id() << endl;
wcout << ss.str();
},
[](shared_ptr<resource> res) {
return res != nullptr;
}
);
call<shared_ptr<resource>> receiver2(
[](shared_ptr<resource> res) {
wstringstream ss;
ss << L"receiver2: received resource " << res->id() << endl;
wcout << ss.str();
},
[](shared_ptr<resource> res) {
return res != nullptr;
}
);
event e;
call<shared_ptr<resource>> receiver3(
[&e](shared_ptr<resource> res) {
e.set();
},
[](shared_ptr<resource> res) {
return res == nullptr;
}
);
// Connect the call objects to the input message buffer.
input.link_target(&receiver1);
input.link_target(&receiver2);
input.link_target(&receiver3);
// Send a few messages through the network.
send(input, make_shared<resource>(42));
send(input, make_shared<resource>(64));
send(input, shared_ptr<resource>(nullptr));
// Wait for the receiver that accepts the nullptr value to
// receive its message.
e.wait();
}
Tento příklad vytvoří následující výstup:
Viz také
Úkoly
Návod: Vytvoření aplikace založené na agentovi
Postupy: Vytvoření agenta toku dat
Návod: Vytvoření sítě pro zpracování obrázků
Koncepty
Osvědčené postupy v knihovně PPL (Parallel Patterns Library)
Obecné osvědčené postupy v Concurrency Runtime