Bloki komunikatów asynchronicznych
Biblioteka agentów udostępnia kilka typów bloków komunikatów, które umożliwiają propagowanie komunikatów między składnikami aplikacji w sposób bezpieczny wątkowo. Te typy bloków komunikatów są często używane z różnymi procedurami przekazywania komunikatów, takimi jak współbieżność::send, współbieżność::asend, współbieżność::receive i współbieżność::try_receive. Aby uzyskać więcej informacji na temat procedur przekazywania komunikatów zdefiniowanych przez bibliotekę agentów, zobacz Funkcje przekazywania komunikatów.
Sekcje
Ten temat zawiera następujące sekcje:
Źródła i obiekty docelowe
Źródła i cele są dwoma ważnymi uczestnikami przekazywania komunikatów. Źródło odwołuje się do punktu końcowego komunikacji, który wysyła komunikaty. Element docelowy odnosi się do punktu końcowego komunikacji, który odbiera komunikaty. Źródło można traktować jako punkt końcowy odczytany z elementu docelowego i jako punkt końcowy, do którego zapisujesz. Aplikacje łączą źródła i obiekty docelowe ze sobą w celu utworzenia sieci obsługi komunikatów.
Biblioteka agentów używa dwóch klas abstrakcyjnych do reprezentowania źródeł i obiektów docelowych: concurrency::ISource i concurrency::ITarget. Typy bloków komunikatów, które działają jako źródła pochodzące z ISource
; typy bloków komunikatów, które działają jako obiekty docelowe pochodzące z ITarget
. Typy bloków komunikatów, które działają jako źródła i obiekty docelowe, pochodzą zarówno z ISource
, jak i ITarget
.
[Top]
Propagacja komunikatów
Propagacja komunikatów to czynność wysyłania komunikatu z jednego składnika do innego. Gdy blok komunikatu jest oferowany, może zaakceptować, odrzucić lub odłożyć tę wiadomość. Każdy typ bloku komunikatów przechowuje i przesyła komunikaty na różne sposoby. Na przykład unbounded_buffer
klasa przechowuje nieograniczoną liczbę komunikatów, overwrite_buffer
klasa przechowuje jeden komunikat jednocześnie, a klasa transformer przechowuje zmienioną wersję każdego komunikatu. Te typy bloków komunikatów zostały szczegółowo opisane w dalszej części tego dokumentu.
Gdy blok komunikatów akceptuje komunikat, może opcjonalnie wykonać pracę, a jeśli blok komunikatów jest źródłem, przekaż wynikowy komunikat do innego elementu członkowskiego sieci. Blok komunikatów może użyć funkcji filtru, aby odrzucić komunikaty, których nie chce odbierać. Filtry zostały szczegółowo opisane w dalszej części tego tematu w sekcji Filtrowanie komunikatów. Blok komunikatu, który odłoży komunikat, może zarezerwować ten komunikat i użyć go później. Rezerwacja komunikatów została szczegółowo opisana w dalszej części tego tematu w sekcji Rezerwacja komunikatów.
Biblioteka agentów umożliwia blokom komunikatów asynchroniczne lub synchroniczne przekazywanie komunikatów. Po przekazaniu komunikatu do bloku komunikatów synchronicznie, na przykład przy użyciu send
funkcji, środowisko uruchomieniowe blokuje bieżący kontekst do momentu zaakceptowania lub odrzucenia komunikatu przez blok docelowy. Po przekazaniu komunikatu do bloku komunikatów asynchronicznie, na przykład przy użyciu asend
funkcji, środowisko uruchomieniowe wyświetla komunikat do obiektu docelowego, a jeśli obiekt docelowy akceptuje komunikat, środowisko uruchomieniowe planuje zadanie asynchroniczne, które propaguje komunikat do odbiorcy. Środowisko uruchomieniowe używa lekkich zadań do propagowania komunikatów we współpracy. Aby uzyskać więcej informacji na temat lekkich zadań, zobacz Harmonogram zadań.
Aplikacje łączą źródła i obiekty docelowe ze sobą w celu utworzenia sieci obsługi komunikatów. Zazwyczaj połączenie sieci i wywołanie send
lub asend
przekazanie danych do sieci. Aby połączyć blok komunikatów źródłowych z obiektem docelowym, wywołaj metodę concurrency::ISource::link_target . Aby odłączyć blok źródłowy od obiektu docelowego, wywołaj metodę concurrency::ISource::unlink_target . Aby odłączyć blok źródłowy od wszystkich swoich obiektów docelowych, wywołaj metodę concurrency::ISource::unlink_targets . Gdy jeden ze wstępnie zdefiniowanych typów bloków komunikatów opuszcza zakres lub jest niszczony, automatycznie rozłącza się z dowolnymi blokami docelowymi. Niektóre typy bloków komunikatów ograniczają maksymalną liczbę obiektów docelowych, do których mogą zapisywać dane. W poniższej sekcji opisano ograniczenia dotyczące wstępnie zdefiniowanych typów bloków komunikatów.
[Top]
Omówienie typów bloków komunikatów
W poniższej tabeli krótko opisano rolę ważnych typów bloków komunikatów.
unbounded_buffer
Przechowuje kolejkę komunikatów.
overwrite_buffer
Przechowuje jeden komunikat, który można zapisywać i odczytywać z wielu razy.
single_assignment
Przechowuje jeden komunikat, który można zapisać jeden raz i odczytać z wielu razy.
nazwać
Wykonuje pracę po odebraniu komunikatu.
transformator
Wykonuje pracę, gdy odbiera dane i wysyła wynik tej pracy do innego bloku docelowego. Klasa transformer
może działać na różnych typach danych wejściowych i wyjściowych.
wybór
Wybiera pierwszy dostępny komunikat z zestawu źródeł.
sprzężenia i sprzężenia wielotypowego
Poczekaj na odebranie wszystkich komunikatów z zestawu źródeł, a następnie połącz komunikaty w jeden komunikat dla innego bloku komunikatów.
minutnik
Wysyła komunikat do bloku docelowego w regularnym interwale.
Te typy bloków komunikatów mają różne cechy, które sprawiają, że są przydatne w różnych sytuacjach. Oto niektóre cechy:
Typ propagacji: określa, czy blok komunikatów działa jako źródło danych, odbiornik danych, czy oba te elementy.
Kolejność komunikatów: czy blok komunikatów zachowuje oryginalną kolejność wysyłania lub odbierania komunikatów. Każdy wstępnie zdefiniowany typ bloku komunikatów zachowuje oryginalną kolejność, w której wysyła lub odbiera komunikaty.
Liczba źródeł: maksymalna liczba źródeł, z których blok komunikatów może odczytywać dane.
Liczba obiektów docelowych: maksymalna liczba obiektów docelowych, do których może zapisywać blok komunikatów.
W poniższej tabeli pokazano, jak te cechy odnoszą się do różnych typów bloków komunikatów.
Typ bloku komunikatów | Typ propagacji (źródło, cel lub oba) | Porządkowanie komunikatów (uporządkowane lub nieurządzone) | Liczba źródeł | Liczba elementów docelowych |
---|---|---|---|---|
unbounded_buffer |
Oba | Zamówiona | Bezgraniczny | Bezgraniczny |
overwrite_buffer |
Oba | Zamówiona | Bezgraniczny | Bezgraniczny |
single_assignment |
Oba | Zamówiona | Bezgraniczny | Bezgraniczny |
call |
Obiekt docelowy | Zamówiona | Bezgraniczny | Nie dotyczy |
transformer |
Oba | Zamówiona | Bezgraniczny | 1 |
choice |
Oba | Zamówiona | 10 | 1 |
join |
Oba | Zamówiona | Bezgraniczny | 1 |
multitype_join |
Oba | Zamówiona | 10 | 1 |
timer |
Źródło | Nie dotyczy | Nie dotyczy | 1 |
W poniższych sekcjach opisano bardziej szczegółowo typy bloków komunikatów.
[Top]
Klasa unbounded_buffer
Klasa concurrency::unbounded_buffer reprezentuje asynchroniczną strukturę obsługi komunikatów ogólnego przeznaczenia. Ta klasa przechowuje najpierw kolejkę komunikatów (FIFO), które mogą być zapisywane przez wiele źródeł lub odczytywane przez wiele obiektów docelowych. Gdy obiekt docelowy odbiera komunikat z unbounded_buffer
obiektu, ten komunikat zostanie usunięty z kolejki komunikatów. W związku z tym, mimo że unbounded_buffer
obiekt może mieć wiele obiektów docelowych, każdy komunikat otrzyma tylko jeden obiekt docelowy. Klasa jest przydatna unbounded_buffer
, gdy chcesz przekazać wiele komunikatów do innego składnika, a ten składnik musi odbierać każdy komunikat.
Przykład
W poniższym przykładzie przedstawiono podstawową strukturę pracy z klasą unbounded_buffer
. Ten przykład wysyła trzy wartości do unbounded_buffer
obiektu, a następnie odczytuje te wartości z powrotem z tego samego obiektu.
// unbounded_buffer-structure.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
using namespace concurrency;
using namespace std;
int wmain()
{
// Create an unbounded_buffer object that works with
// int data.
unbounded_buffer<int> items;
// Send a few items to the unbounded_buffer object.
send(items, 33);
send(items, 44);
send(items, 55);
// Read the items from the unbounded_buffer object and print
// them to the console.
wcout << receive(items) << endl;
wcout << receive(items) << endl;
wcout << receive(items) << endl;
}
Ten przykład generuje następujące wyniki:
334455
Kompletny przykład pokazujący sposób używania unbounded_buffer
klasy, zobacz Instrukcje: Implementowanie różnych wzorców odbiorców producenta.
[Top]
Klasa overwrite_buffer
Klasa concurrency::overwrite_buffer przypomina klasę unbounded_buffer
, z tą różnicą, że overwrite_buffer
obiekt przechowuje tylko jeden komunikat. Ponadto, gdy obiekt docelowy odbiera komunikat z overwrite_buffer
obiektu, ten komunikat nie jest usuwany z buforu. W związku z tym wiele obiektów docelowych otrzymuje kopię komunikatu.
Klasa jest przydatna overwrite_buffer
, gdy chcesz przekazać wiele komunikatów do innego składnika, ale ten składnik wymaga tylko najnowszej wartości. Ta klasa jest również przydatna, gdy chcesz rozgłaszać komunikat do wielu składników.
Przykład
W poniższym przykładzie przedstawiono podstawową strukturę pracy z klasą overwrite_buffer
. Ten przykład wysyła trzy wartości do obiektu, a następnie odczytuje bieżącą overwrite _buffer
wartość z tego samego obiektu trzy razy. Ten przykład jest podobny do przykładu unbounded_buffer
dla klasy . overwrite_buffer
Jednak klasa przechowuje tylko jeden komunikat. Ponadto środowisko uruchomieniowe nie usuwa komunikatu z obiektu po jego odczytaniu overwrite_buffer
.
// overwrite_buffer-structure.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
using namespace concurrency;
using namespace std;
int wmain()
{
// Create an overwrite_buffer object that works with
// int data.
overwrite_buffer<int> item;
// Send a few items to the overwrite_buffer object.
send(item, 33);
send(item, 44);
send(item, 55);
// Read the current item from the overwrite_buffer object and print
// it to the console three times.
wcout << receive(item) << endl;
wcout << receive(item) << endl;
wcout << receive(item) << endl;
}
Ten przykład generuje następujące wyniki:
555555
Kompletny przykład pokazujący sposób używania overwrite_buffer
klasy, zobacz Instrukcje: Implementowanie różnych wzorców odbiorców producenta.
[Top]
Klasa single_assignment
Klasa concurrency::single_assignment przypomina klasę overwrite_buffer
, z tą różnicą single_assignment
, że obiekt można zapisać tylko raz. overwrite_buffer
Podobnie jak klasa, gdy obiekt docelowy odbiera komunikat z single_assignment
obiektu, ten komunikat nie jest usuwany z tego obiektu. W związku z tym wiele obiektów docelowych otrzymuje kopię komunikatu. Klasa jest przydatna single_assignment
, gdy chcesz rozgłaszać jeden komunikat do wielu składników.
Przykład
W poniższym przykładzie przedstawiono podstawową strukturę pracy z klasą single_assignment
. Ten przykład wysyła trzy wartości do obiektu, a następnie odczytuje bieżącą single_assignment
wartość z tego samego obiektu trzy razy. Ten przykład jest podobny do przykładu overwrite_buffer
dla klasy . Mimo że zarówno klasy, jak overwrite_buffer
i single_assignment
przechowują pojedynczy komunikat, single_assignment
klasa może być zapisywana tylko raz.
// single_assignment-structure.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
using namespace concurrency;
using namespace std;
int wmain()
{
// Create an single_assignment object that works with
// int data.
single_assignment<int> item;
// Send a few items to the single_assignment object.
send(item, 33);
send(item, 44);
send(item, 55);
// Read the current item from the single_assignment object and print
// it to the console three times.
wcout << receive(item) << endl;
wcout << receive(item) << endl;
wcout << receive(item) << endl;
}
Ten przykład generuje następujące wyniki:
333333
Pełny przykład pokazujący, jak używać single_assignment
klasy, zobacz Przewodnik: implementowanie przyszłości.
[Top]
Klasa wywołania
Klasa concurrency::call działa jako odbiornik komunikatów, który wykonuje funkcję pracy po odebraniu danych. Ta funkcja pracy może być wyrażeniem lambda, obiektem funkcji lub wskaźnikiem funkcji. call
Obiekt zachowuje się inaczej niż zwykłe wywołanie funkcji, ponieważ działa równolegle z innymi składnikami, które wysyłają do niego komunikaty. call
Jeśli obiekt wykonuje pracę po odebraniu komunikatu, dodaje ten komunikat do kolejki. Każdy call
obiekt przetwarza komunikaty w kolejce w kolejności, w której są odbierane.
Przykład
W poniższym przykładzie przedstawiono podstawową strukturę pracy z klasą call
. W tym przykładzie tworzony jest call
obiekt, który drukuje każdą wartość odbieraną w konsoli programu . Następnie przykład wysyła trzy wartości do call
obiektu. call
Ponieważ obiekt przetwarza komunikaty w osobnym wątku, w tym przykładzie użyto również zmiennej licznika i obiektu zdarzenia, aby upewnić się, że call
obiekt przetwarza wszystkie komunikaty przed wmain
zwróceniem funkcji.
// call-structure.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
using namespace concurrency;
using namespace std;
int wmain()
{
// An event that is set when the call object receives all values.
event received_all;
// Counts the
long receive_count = 0L;
long max_receive_count = 3L;
// Create an call object that works with int data.
call<int> target([&received_all,&receive_count,max_receive_count](int n) {
// Print the value that the call object receives to the console.
wcout << n << endl;
// Set the event when all messages have been processed.
if (++receive_count == max_receive_count)
received_all.set();
});
// Send a few items to the call object.
send(target, 33);
send(target, 44);
send(target, 55);
// Wait for the call object to process all items.
received_all.wait();
}
Ten przykład generuje następujące wyniki:
334455
Pełny przykład pokazujący, jak używać call
klasy, zobacz How to: Provide Work Functions to the call and transformer Classes (Instrukcje: zapewnianie funkcji roboczych dla wywołań i klas przekształcania).
[Top]
Klasa transformatora
Klasa concurrency::transformer działa zarówno jako odbiornik komunikatu, jak i jako nadawca komunikatów. Klasa transformer
przypomina klasę call
, ponieważ wykonuje funkcję pracy zdefiniowaną przez użytkownika, gdy odbiera dane. transformer
Jednak klasa wysyła również wynik funkcji pracy do obiektów odbiornika. call
Podobnie jak obiekt, transformer
obiekt działa równolegle do innych składników, które wysyłają do niego komunikaty. transformer
Jeśli obiekt wykonuje pracę po odebraniu komunikatu, dodaje ten komunikat do kolejki. Każdy transformer
obiekt przetwarza komunikaty w kolejce w kolejności, w jakiej są odbierane.
Klasa transformer
wysyła komunikat do jednego miejsca docelowego. Jeśli ustawisz _PTarget
parametr w konstruktorze na NULL
wartość , możesz później określić element docelowy, wywołując metodę concurrency::link_target .
W przeciwieństwie do wszystkich innych asynchronicznych typów bloków komunikatów, które są udostępniane przez bibliotekę agentów, transformer
klasa może działać na różnych typach danych wejściowych i wyjściowych. Ta możliwość przekształcania danych z jednego typu na inny sprawia, że klasa jest transformer
kluczowym składnikiem w wielu współbieżnych sieciach. Ponadto można dodać bardziej szczegółowe funkcje równoległe w funkcji transformer
pracy obiektu.
Przykład
W poniższym przykładzie przedstawiono podstawową strukturę pracy z klasą transformer
. W tym przykładzie tworzony jest transformer
obiekt, który tworzy wielokrotność każdej wartości wejściowej int
o 0,33 w celu wygenerowania double
wartości jako danych wyjściowych. Następnie przykład odbiera przekształcone wartości z tego samego transformer
obiektu i drukuje je w konsoli.
// transformer-structure.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
using namespace concurrency;
using namespace std;
int wmain()
{
// Create an transformer object that receives int data and
// sends double data.
transformer<int, double> third([](int n) {
// Return one-third of the input value.
return n * 0.33;
});
// Send a few items to the transformer object.
send(third, 33);
send(third, 44);
send(third, 55);
// Read the processed items from the transformer object and print
// them to the console.
wcout << receive(third) << endl;
wcout << receive(third) << endl;
wcout << receive(third) << endl;
}
Ten przykład generuje następujące wyniki:
10.8914.5218.15
Pełny przykład pokazujący, jak używać transformer
klasy, zobacz How to: Use transformer in a Data Pipeline (Jak używać funkcji przekształcania w potoku danych).
[Top]
Klasa wyboru
Klasa concurrency::choice wybiera pierwszy dostępny komunikat z zestawu źródeł. Klasa choice
reprezentuje mechanizm przepływu sterowania zamiast mechanizmu przepływu danych (w temacie Asynchronous Agents Library opisano różnice między przepływem danych i przepływem sterowania).
Odczyt z wybranego obiektu przypomina wywoływanie funkcji WaitForMultipleObjects
interfejsu API systemu Windows, gdy ma bWaitAll
parametr ustawiony na FALSE
. choice
Jednak klasa wiąże dane z samym zdarzeniem zamiast z obiektem synchronizacji zewnętrznej.
Zazwyczaj używasz choice
klasy razem z funkcją concurrency::receive , aby sterować przepływem sterowania w aplikacji. choice
Użyj klasy , jeśli musisz wybrać jeden z komunikatów, które mają różne typy. single_assignment
Użyj klasy , jeśli musisz wybrać jeden z komunikatów, które mają ten sam typ.
Kolejność łączenia źródeł z obiektem choice
jest ważna, ponieważ może określić, który komunikat jest zaznaczony. Rozważmy na przykład przypadek łączenia wielu komunikatów, które już zawierają komunikat do choice
obiektu. Obiekt choice
wybiera komunikat z pierwszego źródła, z którego jest połączony. Po łączeniu wszystkich źródeł choice
obiekt zachowuje kolejność, w której każde źródło odbiera komunikat.
Przykład
W poniższym przykładzie przedstawiono podstawową strukturę pracy z klasą choice
. W tym przykładzie użyto funkcji concurrency::make_choice , aby utworzyć choice
obiekt, który wybiera jeden z trzech bloków komunikatów. Następnie przykład oblicza różne liczby Fibonacciego i przechowuje każdy wynik w innym bloku komunikatów. Następnie przykład wyświetla w konsoli komunikat oparty na operacji, która zakończyła się jako pierwsza.
// choice-structure.cpp
// compile with: /EHsc
#include <agents.h>
#include <ppl.h>
#include <iostream>
using namespace concurrency;
using namespace std;
// Computes the nth Fibonacci number.
// This function illustrates a lengthy operation and is therefore
// not optimized for performance.
int fibonacci(int n)
{
if (n < 2)
return n;
return fibonacci(n-1) + fibonacci(n-2);
}
int wmain()
{
// Although the following thee message blocks are written to one time only,
// this example illustrates the fact that the choice class works with
// different message block types.
// Holds the 35th Fibonacci number.
single_assignment<int> fib35;
// Holds the 37th Fibonacci number.
overwrite_buffer<int> fib37;
// Holds half of the 42nd Fibonacci number.
unbounded_buffer<double> half_of_fib42;
// Create a choice object that selects the first single_assignment
// object that receives a value.
auto select_one = make_choice(&fib35, &fib37, &half_of_fib42);
// Execute a few lengthy operations in parallel. Each operation sends its
// result to one of the single_assignment objects.
parallel_invoke(
[&fib35] { send(fib35, fibonacci(35)); },
[&fib37] { send(fib37, fibonacci(37)); },
[&half_of_fib42] { send(half_of_fib42, fibonacci(42) * 0.5); }
);
// Print a message that is based on the operation that finished first.
switch (receive(select_one))
{
case 0:
wcout << L"fib35 received its value first. Result = "
<< receive(fib35) << endl;
break;
case 1:
wcout << L"fib37 received its value first. Result = "
<< receive(fib37) << endl;
break;
case 2:
wcout << L"half_of_fib42 received its value first. Result = "
<< receive(half_of_fib42) << endl;
break;
default:
wcout << L"Unexpected." << endl;
break;
}
}
W tym przykładzie są generowane następujące przykładowe dane wyjściowe:
fib35 received its value first. Result = 9227465
Ponieważ zadanie, które oblicza liczbę 35Fibonacciego , nie ma gwarancji, że zostanie ukończone jako pierwsze, dane wyjściowe tego przykładu mogą się różnić.
W tym przykładzie użyto algorytmu concurrency::p arallel_invoke w celu równoległego obliczenia liczb Fibonacciego. Aby uzyskać więcej informacji na temat parallel_invoke
programu , zobacz Parallel Algorithms (Algorytmy równoległe).
Pełny przykład pokazujący, jak używać choice
klasy, zobacz Instrukcje: Wybieranie między ukończonymi zadaniami.
[Top]
sprzężenia i multitype_join Klasy
Klasy concurrency::join i concurrency::multitype_join umożliwiają oczekiwanie na każdy element członkowski zestawu źródeł w celu odebrania komunikatu. Klasa join
działa na obiektach źródłowych, które mają wspólny typ komunikatu. Klasa multitype_join
działa na obiektach źródłowych, które mogą mieć różne typy komunikatów.
Odczyt z join
obiektu lub multitype_join
przypomina wywoływanie funkcji WaitForMultipleObjects
interfejsu API systemu Windows, gdy ma bWaitAll
parametr ustawiony na TRUE
. Jednak podobnie jak choice
obiekt join
i multitype_join
obiekty używają mechanizmu zdarzeń, który wiąże dane z samym zdarzeniem zamiast z obiektem synchronizacji zewnętrznej.
Odczyt z join
obiektu tworzy obiekt std::vector . Odczyt z multitype_join
obiektu tworzy obiekt std::tuple . Elementy są wyświetlane w tych obiektach w takiej samej kolejności, jak odpowiadające im źródłowe są połączone z obiektem join
lub multitype_join
. Ponieważ kolejność łączenia źródłowych z obiektem join
lub multitype_join
jest skojarzona z kolejnością elementów w wynikowym vector
lub tuple
obiekcie, zalecamy, aby nie odłączyć istniejącego buforu źródłowego ze sprzężenia. Może to spowodować nieokreślone zachowanie.
Złączenia zachłanne vs. niezachłanne
Klasy join
i multitype_join
wspierają koncepcję chciwości i niechłannych sprzężeń. Chciwy sprzężenia akceptuje komunikat z każdego ze swoich źródeł, ponieważ komunikaty staną się dostępne do momentu udostępnienia wszystkich wiadomości. Niechętne sprzężenia odbiera komunikaty w dwóch fazach. Po pierwsze, niechętne sprzężenia czeka, aż zostanie zaproponowany komunikat z każdego ze swoich źródeł. Po drugie, po udostępnieniu wszystkich komunikatów źródłowych niechłanne sprzężenie próbuje zarezerwować każdy z tych komunikatów. Jeśli może zarezerwować każdy komunikat, zużywa wszystkie komunikaty i propaguje je do celu. W przeciwnym razie zwalnia lub anuluje rezerwacje komunikatów i ponownie czeka na odebranie komunikatu przez każde źródło.
Chciwe sprzężenia działają lepiej niż niechłanne sprzężenia, ponieważ akceptują wiadomości natychmiast. Jednak w rzadkich przypadkach chciwość sprzężeń może prowadzić do zakleszczeń. Użyj sprzężenia niechłannego, gdy masz wiele sprzężeń, które zawierają co najmniej jeden udostępniony obiekt źródłowy.
Przykład
W poniższym przykładzie przedstawiono podstawową strukturę pracy z klasą join
. W tym przykładzie użyto funkcji concurrency::make_join w celu utworzenia obiektu odbieranego join
z trzech single_assignment
obiektów. Ten przykład oblicza różne liczby Fibonacciego, przechowuje każdy wynik w innym single_assignment
obiekcie, a następnie drukuje do konsoli każdy wynik przechowywany przez join
obiekt. Ten przykład jest podobny do przykładu choice
dla klasy, z tą różnicą, że join
klasa czeka, aż wszystkie bloki komunikatów źródłowych otrzymają komunikat.
// join-structure.cpp
// compile with: /EHsc
#include <agents.h>
#include <ppl.h>
#include <iostream>
using namespace concurrency;
using namespace std;
// Computes the nth Fibonacci number.
// This function illustrates a lengthy operation and is therefore
// not optimized for performance.
int fibonacci(int n)
{
if (n < 2)
return n;
return fibonacci(n-1) + fibonacci(n-2);
}
int wmain()
{
// Holds the 35th Fibonacci number.
single_assignment<int> fib35;
// Holds the 37th Fibonacci number.
single_assignment<int> fib37;
// Holds half of the 42nd Fibonacci number.
single_assignment<double> half_of_fib42;
// Create a join object that selects the values from each of the
// single_assignment objects.
auto join_all = make_join(&fib35, &fib37, &half_of_fib42);
// Execute a few lengthy operations in parallel. Each operation sends its
// result to one of the single_assignment objects.
parallel_invoke(
[&fib35] { send(fib35, fibonacci(35)); },
[&fib37] { send(fib37, fibonacci(37)); },
[&half_of_fib42] { send(half_of_fib42, fibonacci(42) * 0.5); }
);
auto result = receive(join_all);
wcout << L"fib35 = " << get<0>(result) << endl;
wcout << L"fib37 = " << get<1>(result) << endl;
wcout << L"half_of_fib42 = " << get<2>(result) << endl;
}
Ten przykład generuje następujące wyniki:
fib35 = 9227465fib37 = 24157817half_of_fib42 = 1.33957e+008
W tym przykładzie użyto algorytmu concurrency::p arallel_invoke w celu równoległego obliczenia liczb Fibonacciego. Aby uzyskać więcej informacji na temat parallel_invoke
programu , zobacz Parallel Algorithms (Algorytmy równoległe).
Aby zapoznać się z kompletnymi przykładami pokazującymi, jak używać join
klasy, zobacz Instrukcje: Wybieranie między ukończonymi zadaniami i Przewodnik: Używanie sprzężenia w celu zapobiegania zakleszczeniom.
[Top]
Klasa czasomierza
Klasa concurrency::timer działa jako źródło komunikatów. Obiekt timer
wysyła komunikat do obiektu docelowego po upływie określonego czasu. Klasa jest przydatna timer
, gdy musisz opóźnić wysyłanie komunikatu lub wysłać wiadomość w regularnych odstępach czasu.
Klasa timer
wysyła komunikat tylko do jednego elementu docelowego. Jeśli ustawisz _PTarget
parametr w konstruktorze na NULL
wartość , możesz później określić obiekt docelowy, wywołując metodę concurrency::ISource::link_target .
Obiekt timer
może być powtarzany lub nie powtarzany. Aby utworzyć powtarzający się czasomierz, przekaż true
parametr _Repeating
podczas wywoływania konstruktora. W przeciwnym razie przekaż false
parametr , _Repeating
aby utworzyć czasomierz bez powtarzania. Jeśli czasomierz powtarza się, wysyła ten sam komunikat do obiektu docelowego po każdym interwale.
Biblioteka agentów tworzy timer
obiekty w stanie niestartym. Aby uruchomić obiekt czasomierza, wywołaj metodę concurrency::timer::start . Aby zatrzymać obiekt, należy zniszczyć obiekt lub wywołać metodę timer
concurrency::timer::stop . Aby wstrzymać powtarzający się czasomierz, wywołaj metodę concurrency::timer::p ause .
Przykład
W poniższym przykładzie przedstawiono podstawową strukturę pracy z klasą timer
. W przykładzie użyto obiektów timer
i do call
raportowania postępu długotrwałej operacji.
// timer-structure.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
using namespace concurrency;
using namespace std;
// Computes the nth Fibonacci number.
// This function illustrates a lengthy operation and is therefore
// not optimized for performance.
int fibonacci(int n)
{
if (n < 2)
return n;
return fibonacci(n-1) + fibonacci(n-2);
}
int wmain()
{
// Create a call object that prints characters that it receives
// to the console.
call<wchar_t> print_character([](wchar_t c) {
wcout << c;
});
// Create a timer object that sends the period (.) character to
// the call object every 100 milliseconds.
timer<wchar_t> progress_timer(100u, L'.', &print_character, true);
// Start the timer.
wcout << L"Computing fib(42)";
progress_timer.start();
// Compute the 42nd Fibonacci number.
int fib42 = fibonacci(42);
// Stop the timer and print the result.
progress_timer.stop();
wcout << endl << L"result is " << fib42 << endl;
}
W tym przykładzie są generowane następujące przykładowe dane wyjściowe:
Computing fib(42)..................................................result is 267914296
Pełny przykład pokazujący, jak używać timer
klasy, zobacz Instrukcje: Wysyłanie komunikatu w regularnym interwale.
[Top]
Filtrowanie komunikatów
Podczas tworzenia obiektu bloku komunikatów można podać funkcję filtru, która określa, czy blok komunikatów akceptuje lub odrzuca komunikat. Funkcja filtru to przydatny sposób zagwarantowania, że blok komunikatów odbiera tylko określone wartości.
W poniższym przykładzie pokazano, jak utworzyć unbounded_buffer
obiekt, który używa funkcji filtru do akceptowania tylko liczb parzysowych. Obiekt unbounded_buffer
odrzuca nieparzyste liczby i dlatego nie propaguje nieparzysty liczb do bloków docelowych.
// filter-function.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
using namespace concurrency;
using namespace std;
int wmain()
{
// Create an unbounded_buffer object that uses a filter
// function to accept only even numbers.
unbounded_buffer<int> accept_evens(
[](int n) {
return (n%2) == 0;
});
// Send a few values to the unbounded_buffer object.
unsigned int accept_count = 0;
for (int i = 0; i < 10; ++i)
{
// The asend function returns true only if the target
// accepts the message. This enables us to determine
// how many elements are stored in the unbounded_buffer
// object.
if (asend(accept_evens, i))
{
++accept_count;
}
}
// Print to the console each value that is stored in the
// unbounded_buffer object. The unbounded_buffer object should
// contain only even numbers.
while (accept_count > 0)
{
wcout << receive(accept_evens) << L' ';
--accept_count;
}
}
Ten przykład generuje następujące wyniki:
0 2 4 6 8
Funkcja filtru może być funkcją lambda, wskaźnikiem funkcji lub obiektem funkcji. Każda funkcja filter przyjmuje jedną z następujących formularzy.
bool (T)
bool (T const &)
Aby wyeliminować niepotrzebne kopiowanie danych, użyj drugiego formularza, gdy masz typ agregacji, który jest propagowany przez wartość.
Filtrowanie komunikatów obsługuje model programowania przepływu danych, w którym składniki wykonują obliczenia podczas odbierania danych. Aby zapoznać się z przykładami korzystającymi z funkcji filtrowania w celu kontrolowania przepływu danych w sieci przekazującej komunikaty, zobacz Instrukcje: używanie filtru bloku komunikatów, Przewodnik: tworzenie agenta przepływu danych i przewodnik: tworzenie sieci przetwarzania obrazów.
[Top]
Rezerwacja komunikatów
Rezerwacja komunikatów umożliwia zablokowanie komunikatu w celu zarezerwowania komunikatu do późniejszego użycia. Zazwyczaj rezerwacja komunikatów nie jest używana bezpośrednio. Jednak zrozumienie rezerwacji komunikatów może pomóc lepiej zrozumieć zachowanie niektórych wstępnie zdefiniowanych typów bloków komunikatów.
Rozważ niechętne i chciwe sprzężenia. Obie te elementy używają rezerwacji komunikatów do zarezerwowania komunikatów do późniejszego użycia. Opisane wcześniej sprzężenia niechłanne odbiera komunikaty w dwóch fazach. W pierwszej fazie obiekt niechłanny join
czeka, aż każdy z jego źródeł otrzyma komunikat. Niechętne sprzężenie próbuje zarezerwować każdą z tych wiadomości. Jeśli może zarezerwować każdy komunikat, zużywa wszystkie komunikaty i propaguje je do celu. W przeciwnym razie zwalnia lub anuluje rezerwacje komunikatów i ponownie czeka na odebranie komunikatu przez każde źródło.
Chciwe sprzężenie, które odczytuje również komunikaty wejściowe z wielu źródeł, używa rezerwacji komunikatów do odczytywania dodatkowych komunikatów podczas oczekiwania na odbieranie komunikatu z każdego źródła. Rozważmy na przykład chciwy sprzężenie, które odbiera komunikaty z bloków komunikatów A
i B
. Jeśli chciwy sprzężenie odbiera dwa komunikaty z B, ale nie otrzymał jeszcze komunikatu z A
, chciwy sprzężenie zapisuje unikatowy identyfikator komunikatu dla drugiego komunikatu z B
. Gdy sprzężenie chciwości odbierze komunikat i A
rozpropaguje te komunikaty, użyje zapisanego identyfikatora komunikatu, aby sprawdzić, czy drugi komunikat z B
jest nadal dostępny.
Rezerwację komunikatów można używać podczas implementowania własnych niestandardowych typów bloków komunikatów. Aby zapoznać się z przykładem tworzenia niestandardowego typu bloku komunikatów, zobacz Przewodnik: tworzenie niestandardowego bloku komunikatów.
[Top]