Aracılığıyla paylaş


Veri Akışı (Görev Paralel Kütüphanesi)

Görev Paralel Kitaplığı (TPL), eşzamanlılık özellikli uygulamaların sağlamlığını artırmaya yardımcı olmak için veri akışı bileşenleri sağlar. Bu veri akışı bileşenleri topluca TPL Veri Akışı Kitaplığı olarak adlandırılır. Bu veri akışı modeli, kaba ayrıntılı veri akışı ve kanal oluşturma görevleri için işlem içi ileti geçişi sağlayarak aktör tabanlı programlamayı teşvik eder. Veri akışı bileşenleri, TPL'nin türleri ve zamanlama altyapısını temel alır ve zaman uyumsuz programlama için C#, Visual Basic ve F# dil desteğiyle tümleşir. Bu veri akışı bileşenleri, birbiriyle zaman uyumsuz olarak iletişim kurması gereken birden çok işleminiz olduğunda veya kullanılabilir olduğunda verileri işlemek istediğinizde kullanışlıdır. Örneğin, bir web kamerasından görüntü verilerini işleyen bir uygulama düşünün. Uygulama, veri akışı modelini kullanarak görüntü çerçevelerini kullanılabilir hale geldikçe işleyebilir. Uygulama, örneğin ışık düzeltmesi veya kırmızı göz azaltma gerçekleştirerek görüntü çerçevelerini geliştirirse, veri akışı bileşenlerinden oluşan bir işlem hattı oluşturabilirsiniz. İşlem hattının her aşaması, görüntüyü dönüştürmek için TPL tarafından sağlanan işlevsellik gibi daha ayrıntılı paralellik işlevselliği kullanabilir.

Bu belge, TPL Veri Akışı Kitaplığı'na genel bir bakış sağlar. Programlama modelini, önceden tanımlanmış veri akışı bloğu türlerini ve veri akışı bloklarını uygulamalarınızın belirli gereksinimlerini karşılayacak şekilde yapılandırmayı açıklar.

Uyarı

TPL Veri Akışı Kitaplığı (System.Threading.Tasks.Dataflow ad alanı) .NET ile dağıtılmaz. Visual Studio'da System.Threading.Tasks.Dataflow ad alanını yüklemek için projenizi açın, Project menüsünden NuGet Paketlerini Yönet seçin ve System.Threading.Tasks.Dataflow paketini çevrimiçi olarak arayın. Alternatif olarak, .NET Core CLI kullanarakyüklemek için dotnet add package System.Threading.Tasks.Dataflowçalıştırın.

Programlama Modeli

TPL Veri Akışı Kitaplığı, yüksek aktarım hızına ve düşük gecikme süresine sahip yoğun CPU kullanan ve G/Ç kullanan uygulamaları paralel hale getirme ve ileti geçirme için bir temel sağlar. Ayrıca verilerin nasıl arabelleğe alınıp sistemde nasıl hareket ettiği üzerinde açık bir denetim sağlar. Veri akışı programlama modelini daha iyi anlamak için zaman uyumsuz olarak diskten görüntü yükleyen ve bu görüntülerin bir bileşimini oluşturan bir uygulama düşünün. Geleneksel programlama modelleri genellikle görevleri koordine etmek ve paylaşılan verilere erişmek için geri çağırmaları ve kilitler gibi eşitleme nesnelerini kullanmanızı gerektirir. Veri akışı programlama modelini kullanarak, diskten okunan görüntüleri işleyen veri akışı nesneleri oluşturabilirsiniz. Veri akışı modeli altında, kullanılabilir olduğunda verilerin nasıl işleneceğini ve ayrıca veriler arasındaki bağımlılıkları bildirirsiniz. Çalışma zamanı veriler arasındaki bağımlılıkları yönettiğinden, paylaşılan verilere erişimi eşitleme gereksinimini genellikle önleyebilirsiniz. Buna ek olarak, çalışma zamanlayıcısı zaman uyumsuz veri gelişine göre çalışmayı planladığı için, veri akışı temel iş parçacıklarını verimli şekilde yöneterek yanıt hızını ve aktarım hızını artırabilir. Windows Forms uygulamasında görüntü işleme uygulamak için veri akışı programlama modelini kullanan bir örnek için bkz . İzlenecek Yol: Windows Forms Uygulamasında Veri Akışını Kullanma.

Kaynaklar ve Hedefler

TPL Veri Akışı Kitaplığı, verileri arabelleğe alan ve işleyen veri yapıları olan veri akışı bloklarından oluşur. TPL üç tür veri akışı bloğu tanımlar: kaynak bloklar, hedef bloklar ve yayıcı bloklar. Kaynak blok, veri kaynağı işlevi görür ve kaynağından okunabilir. Hedef blok, verilerin alıcısı olarak görev yapar ve hedef bloka yazılabilir. Bir yayıcı bloğu hem kaynak blok hem de hedef blok işlevi görür ve bu bloktan okunabilir ve bu bloka yazılabilir. TPL, kaynakları temsil eden, System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> hedefleri temsil eden ve System.Threading.Tasks.Dataflow.ITargetBlock<TInput> yayıcıları temsil eden arabirimi tanımlarSystem.Threading.Tasks.Dataflow.IPropagatorBlock<TInput,TOutput>. IPropagatorBlock<TInput,TOutput> hem ISourceBlock<TOutput> hem de ITargetBlock<TInput> öğesinden devralır.

TPL Veri Akışı Kitaplığı, , ISourceBlock<TOutput>ve ITargetBlock<TInput> arabirimlerini uygulayan IPropagatorBlock<TInput,TOutput>önceden tanımlanmış birkaç veri akışı bloğu türü sağlar. Bu veri akışı bloğu türleri, bu belgede Önceden Tanımlanmış Veri Akışı Blok Türleri bölümünde açıklanmıştır.

Bağlantı Blokları

Veri akışı bloklarını, veri akışı bloklarının doğrusal dizileri olan işlem hatları oluşturmak için veya veri akışı bloklarının grafikleri olan ağlara bağlayabilirsiniz. İşlem hattı bir ağ biçimidir. Bir işlem hattında veya ağda, veriler kullanılabilir hale geldikçe kaynaklar verileri eşzamansız olarak hedeflere yayar. yöntemi bir ISourceBlock<TOutput>.LinkTo kaynak veri akışı bloğunu hedef bloğuna bağlar. Bir kaynak sıfır veya daha fazla hedefe bağlanabilir; hedefler sıfırdan veya daha fazla kaynaktan bağlanabilir. Bir işlem hattına veya ağa eşzamanlı olarak veri akışı blokları ekleyebilir veya kaldırabilirsiniz. Önceden tanımlı veri akışı blok türleri, bağlantı kurmanın ve bağlantıyı kaldırmanın tüm iş parçacığı güvenliği yönlerini ele alır.

Temel bir işlem hattı oluşturmak için veri akışı bloklarını bağlayan bir örnek için bkz . İzlenecek Yol: Veri Akışı İşlem Hattı Oluşturma. Daha karmaşık bir ağ oluşturmak için veri akışı bloklarını bağlayan bir örnek için bkz . İzlenecek Yol: Windows Forms Uygulamasında Veri Akışını Kullanma. Bir kaynağın bir hedefe mesaj sunduktan sonra bağlantıyı kaldıran bir örnek için bkz Nasıl yapılır: Veri Akışı Bloklarının Bağlantısını Kaldırma.

Filtreleme

Bir kaynağı hedefe bağlamak için yöntemini çağırdığınızda ISourceBlock<TOutput>.LinkTo , hedef bloğun bu iletinin değerine göre bir iletiyi kabul edip etmediğini veya reddedeceğini belirleyen bir temsilci sağlayabilirsiniz. Bu filtreleme mekanizması, veri akışı bloğunun yalnızca belirli değerleri almasını garanti etmenin kullanışlı bir yoludur. Önceden tanımlanmış veri akışı bloğu türlerinin çoğu için, bir kaynak blok birden çok hedef bloğuna bağlıysa, hedef blok bir iletiyi reddettiğinde, kaynak bu iletiyi bir sonraki hedefe sunar. Bir kaynağın hedeflere ileti gönderme sırası kaynak tarafından tanımlanır ve kaynağın türüne göre farklılık gösterebilir. Kaynak bloğu türlerinin çoğu, bir hedef bu iletiyi kabul ettikten sonra ileti sunmayı durdurur. Bu kuralın BroadcastBlock<T> bir istisnası, bazı hedefler iletiyi reddetse bile her iletiyi tüm hedeflere sunan sınıfıdır. Yalnızca belirli iletileri işlemek için filtreleme kullanan bir örnek için bkz . İzlenecek Yol: Windows Forms Uygulamasında Veri Akışını Kullanma.

Önemli

Önceden tanımlanmış her kaynak veri akışı blok türü, iletilerin alındıkları sırayla yayılmasını garanti ettiğinden, kaynak bloğun sonraki iletiyi işleyebilmesi için her iletinin kaynak bloktan okunması gerekir. Bu nedenle, bir kaynağa birden çok hedefi bağlamak için filtreleme kullandığınızda, her iletiyi en az bir hedef bloğun aldığından emin olun. Aksi takdirde uygulamanız kilitlenmeye neden olabilir.

İleti Geçirme

Veri akışı programlama modeli, bir programın bağımsız bileşenlerinin ileti göndererek birbirleriyle iletişim kurduğu ileti geçirme kavramıyla ilgilidir. İletileri uygulama bileşenleri arasında yayma yollarından biri, hedef veri akışı bloklarına ileti göndermek için (zaman uyumlu) ve Post (zaman uyumsuz) yöntemlerini, SendAsynckaynak bloklardan ileti almak için ise , Receiveve ReceiveAsync yöntemlerini çağırmaktır TryReceive . Giriş verilerini baş düğüme (hedef blok) göndererek ve işlem hattının terminal düğümünden veya ağın terminal düğümlerinden (bir veya daha fazla kaynak blok) çıkış verileri alarak bu yöntemleri veri akışı işlem hatları veya ağlarla birleştirebilirsiniz. Ayrıca, Choose yöntemini, sağlanan kaynakların ilki olan veri içeren bir kaynaktan okumak ve bu veri üzerinde işlem yapmak için de kullanabilirsiniz.

Kaynak bloklar yöntemini çağırarak ITargetBlock<TInput>.OfferMessage hedef bloklara veri sunar. Hedef blok, sunulan bir iletiye şu üç yoldan biriyle yanıt verir: iletiyi kabul edebilir, iletiyi reddedebilir veya iletiyi erteleyebilir. Hedef, iletiyi kabul ettiğinde OfferMessage yöntemi Accepted döndürür. Hedef iletiyi reddettiyse yöntemi OfferMessage döndürür Declined. Hedef artık kaynaktan herhangi bir ileti almamasını gerektirdiğinde, OfferMessageDecliningPermanently döner. Önceden tanımlanmış kaynak bloğu türleri, böyle bir dönüş değeri alındıktan sonra bağlı hedeflere ileti sunmaz ve bu tür hedeflerle bağlantıyı otomatik olarak kaldırır.

Hedef blok iletiyi daha sonra kullanmak üzere ertelediğinde OfferMessage yöntemi döndürür Postponed. Bir iletiyi erteleyen bir hedef blok, daha sonra sunulan iletiyi ayırmaya çalışmak için ISourceBlock<TOutput>.ReserveMessage yöntemini çağırabilir. Bu noktada ileti hala kullanılabilir durumdadır ve hedef blok tarafından kullanılabilir veya ileti başka bir hedef tarafından alınmıştır. Hedef blok daha sonra iletiyi gerektirdiğinde ISourceBlock<TOutput>.ConsumeMessage yöntemini veya artık iletiye ihtiyaç duymadığında ReleaseReservation yöntemini sırasıyla çağırır. İleti ayırtma genellikle açgözlü olmayan modda çalışan veri akışı bloğu türleri tarafından kullanılır. Temkinli mod, bu belgenin ilerleyen bölümlerinde açıklanmıştır. Hedef blok, ertelenen bir mesajı ayırmak yerine, ertelenen mesajı doğrudan tüketmeyi denemek için ISourceBlock<TOutput>.ConsumeMessage yöntemini kullanabilir.

Veri Akışı Bloğu Tamamlama

Veri akışı blokları tamamlama kavramını da destekler. Tamamlanmış durumdaki bir veri akışı bloğu başka bir çalışma gerçekleştirmez. Her veri akışı bloğunun, bloğun tamamlanma durumunu temsil eden, System.Threading.Tasks.Task olarak bilinen ilişkili bir nesnesi vardır. Bir Task nesnenin tamamlanmasını bekleyebildiğiniz için, tamamlama görevlerini kullanarak veri akışı ağının bir veya daha fazla terminal düğümünün bitmesini bekleyebilirsiniz. IDataflowBlock Arabirim, veri akışı bloğunun tamamlanması isteğini bildiren Complete yöntemini ve veri akışı bloğunun tamamlanma görevini döndüren Completion özelliğini tanımlar. ISourceBlock<TOutput> ve ITargetBlock<TInput>, IDataflowBlock arabirimini devralır.

Veri akışı bloğunun hatasız tamamlanıp tamamlanmadığını, bir veya daha fazla hatayla karşılaşıldığını veya iptal edilip edilmediğini belirlemenin iki yolu vardır. İlk yol, bir Task.Waittry- bloğundaki tamamlama görevinde catch yöntemini çağırmaktır (Try-Catch Visual Basic'te). Aşağıdaki örnek, giriş değeri sıfırdan küçükse ActionBlock<TInput> fırlatan bir ArgumentOutOfRangeException nesnesi oluşturur. AggregateException, bu örnekte tamamlama görevinde Wait çağrıldığında oluşturulur. ArgumentOutOfRangeException öğesine nesnesinin InnerExceptionsAggregateException özelliği aracılığıyla erişilir.

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine($"n = {n}");
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine($"Encountered {e.GetType().Name}: {e.Message}");
      return true;
   });
}

/* Output:
n = 0
n = -1
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
                                                      Console.WriteLine("n = {0}", n)
                                                      If n < 0 Then
                                                          Throw New ArgumentOutOfRangeException()
                                                      End If
                                                  End Sub)

' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()

' Wait for completion in a try/catch block.
Try
    throwIfNegative.Completion.Wait()
Catch ae As AggregateException
    ' If an unhandled exception occurs during dataflow processing, all
    ' exceptions are propagated through an AggregateException object.
    ae.Handle(Function(e)
                  Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                  Return True
              End Function)
End Try

'          Output:
'         n = 0
'         n = -1
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

Bu örnekte, bir yürütme veri akışı bloğu temsilcisinde özel durumun işlenmemiş olması durumu gösterilmektedir. Bu tür blokların gövdelerindeki istisnai durumları işlemenizi öneririz. Ancak, bunu yapamıyorsanız, blok iptal edilmiş gibi davranır ve gelen iletileri işlemez.

Bir veri akışı bloğu açıkça iptal edildiğinde, AggregateException nesnesi OperationCanceledException özelliğinde InnerExceptions içerir. Veri akışı iptali hakkında daha fazla bilgi için İptali Etkinleştirme bölümüne bakın.

Veri akışı bloğunun tamamlanma durumunu belirlemenin ikinci yolu, tamamlanma görevinin devamını kullanmak veya zaman uyumsuz olarak tamamlanma görevini beklemek için C# ve Visual Basic'in zaman uyumsuz dil özelliklerini kullanmaktır. yöntemine Task.ContinueWith sağladığınız temsilci, öncül görevi temsil eden bir Task nesne alır. Completion özelliği için, devam ettirme temsilcisi tamamlama görevini kendisi alır. Aşağıdaki örnek öncekine benzer, ancak ContinueWith yöntemini kullanarak genel veri akışı işleminin durumunu yazdıran bir devamlılık görevi oluşturur.

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine($"n = {n}");
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Create a continuation task that prints the overall
// task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(task =>
{
   Console.WriteLine($"The status of the completion task is '{task.Status}'.");
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine($"Encountered {e.GetType().Name}: {e.Message}");
      return true;
   });
}

/* Output:
n = 0
n = -1
The status of the completion task is 'Faulted'.
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
                                                      Console.WriteLine("n = {0}", n)
                                                      If n < 0 Then
                                                          Throw New ArgumentOutOfRangeException()
                                                      End If
                                                  End Sub)

' Create a continuation task that prints the overall 
' task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(Sub(task) Console.WriteLine("The status of the completion task is '{0}'.", task.Status))

' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()

' Wait for completion in a try/catch block.
Try
    throwIfNegative.Completion.Wait()
Catch ae As AggregateException
    ' If an unhandled exception occurs during dataflow processing, all
    ' exceptions are propagated through an AggregateException object.
    ae.Handle(Function(e)
                  Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                  Return True
              End Function)
End Try

'          Output:
'         n = 0
'         n = -1
'         The status of the completion task is 'Faulted'.
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

Veri akışı bloğunun tamamlanma durumu hakkında ek bilgi belirlemek için devamlılık görevinin gövdesindeki gibi IsCanceled özellikleri de kullanabilirsiniz. Devamlılık görevleri ve bunların iptal ve hata işlemeyle ilişkisi hakkında daha fazla bilgi için bkz. Devamlılık Görevlerini Kullanarak Görevleri Zincirleme, Görev İptali ve Özel Durum İşleme.

Önceden Tanımlanmış Veri Akışı Blok Türleri

TPL Veri Akışı Kitaplığı önceden tanımlanmış birkaç veri akışı bloğu türü sağlar. Bu türler üç kategoriye ayrılır: arabelleğe alma blokları, yürütme blokları ve gruplandırma blokları. Aşağıdaki bölümlerde bu kategorileri oluşturan blok türleri açıklanmaktadır.

Tamponlama Blokları

Önbellek blokları, verileri veri tüketicileri tarafından kullanılmak üzere tutar. TPL Veri Akışı Kitaplığı üç arabelleğe alma bloğu türü sağlar: System.Threading.Tasks.Dataflow.BufferBlock<T>, System.Threading.Tasks.Dataflow.BroadcastBlock<T>ve System.Threading.Tasks.Dataflow.WriteOnceBlock<T>.

BufferBlock<T>

Genel amaçlı zaman uyumsuz mesajlaşma yapısını temsil eden BufferBlock<T> sınıfı. Bu sınıf, birden çok kaynağın yazabileceği veya birden çok hedefin okuyabileceği iletilerin ilk giren, ilk çıkan (FIFO) kuyruğunu depolar. Hedef bir nesneden BufferBlock<T> ileti aldığında, bu ileti ileti kuyruğundan kaldırılır. Bu nedenle, bir BufferBlock<T> nesnenin birden çok hedefi olsa da, her iletiyi yalnızca bir hedef alır. Sınıfı BufferBlock<T> , birden çok iletiyi başka bir bileşene geçirmek istediğinizde ve bu bileşenin her iletiyi alması gerektiğinde kullanışlıdır.

Aşağıdaki temel örnek, bir Int32 nesneye birkaç BufferBlock<T> değer gönderir ve sonra bu değerleri bu nesneden geri okur.

// Create a BufferBlock<int> object.
var bufferBlock = new BufferBlock<int>();

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   bufferBlock.Post(i);
}

// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(bufferBlock.Receive());
}

/* Output:
   0
   1
   2
 */
' Create a BufferBlock<int> object.
Dim bufferBlock = New BufferBlock(Of Integer)()

' Post several messages to the block.
For i As Integer = 0 To 2
    bufferBlock.Post(i)
Next i

' Receive the messages back from the block.
For i As Integer = 0 To 2
    Console.WriteLine(bufferBlock.Receive())
Next i

'          Output:
'            0
'            1
'            2
'          

Bir nesneye ileti yazmayı ve nesneden iletileri okumayı gösteren eksiksiz bir BufferBlock<T> örnek için bkz . Nasıl yapılır: Veri Akışı Bloğuna İleti Yazma ve İletileri Okuma.

BroadcastBlok<T>

Sınıfı BroadcastBlock<T> , birden çok iletiyi başka bir bileşene geçirmeniz gerektiğinde yararlıdır, ancak bu bileşen yalnızca en son değere ihtiyaç duyar. Bu sınıf, bir iletiyi birden çok bileşene yayınlamak istediğinizde de yararlıdır.

Aşağıdaki temel örnek bir Double nesneye bir BroadcastBlock<T> değer gönderir ve sonra bu değeri bu nesneden birkaç kez okur. Değerler okunduktan sonra BroadcastBlock<T> nesnelerden kaldırılmadığından, aynı değer her seferinde mevcuttur.

// Create a BroadcastBlock<double> object.
var broadcastBlock = new BroadcastBlock<double>(null);

// Post a message to the block.
broadcastBlock.Post(Math.PI);

// Receive the messages back from the block several times.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(broadcastBlock.Receive());
}

/* Output:
   3.14159265358979
   3.14159265358979
   3.14159265358979
 */
' Create a BroadcastBlock<double> object.
Dim broadcastBlock = New BroadcastBlock(Of Double)(Nothing)

' Post a message to the block.
broadcastBlock.Post(Math.PI)

' Receive the messages back from the block several times.
For i As Integer = 0 To 2
    Console.WriteLine(broadcastBlock.Receive())
Next i

'          Output:
'            3.14159265358979
'            3.14159265358979
'            3.14159265358979
'          

İletiyi birden çok hedef bloğuna yayınlamak için nasıl kullanılacağını BroadcastBlock<T> gösteren eksiksiz bir örnek için bkz . Nasıl yapılır: Veri Akışı Bloğunda Görev Zamanlayıcı Belirtme.

WriteOnceBlock<T>

WriteOnceBlock<T> sınıfı, yalnızca bir kez yazılabilen bir BroadcastBlock<T> nesnesi dışında, WriteOnceBlock<T> sınıfına benzer. C# WriteOnceBlock<T> (Visual Basic'te ReadOnly) anahtar sözcüğüne benzer olduğunu düşünebilirsiniz, ancak nesne WriteOnceBlock<T> oluşturma sırasında değil değer aldıktan sonra sabit hale gelir. BroadcastBlock<T> Sınıfı gibi, bir hedef bir nesneden WriteOnceBlock<T> ileti aldığında, bu ileti bu nesneden kaldırılmaz. Bu nedenle, birden çok hedef iletinin bir kopyasını alır. WriteOnceBlock<T> sınıfı, birden çok iletinin yalnızca ilkini yaymak istediğinizde kullanışlıdır.

Aşağıdaki temel örnek, bir String nesneye birden çok WriteOnceBlock<T> değer gönderir ve ardından değeri bu nesneden geri okur. Bir nesne yalnızca bir WriteOnceBlock<T> kez yazılabilir çünkü bir WriteOnceBlock<T> nesne bir ileti aldıktan sonra sonraki iletileri atar.

// Create a WriteOnceBlock<string> object.
var writeOnceBlock = new WriteOnceBlock<string>(null);

// Post several messages to the block in parallel. The first
// message to be received is written to the block.
// Subsequent messages are discarded.
Parallel.Invoke(
   () => writeOnceBlock.Post("Message 1"),
   () => writeOnceBlock.Post("Message 2"),
   () => writeOnceBlock.Post("Message 3"));

// Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive());

/* Sample output:
   Message 2
 */
' Create a WriteOnceBlock<string> object.
Dim writeOnceBlock = New WriteOnceBlock(Of String)(Nothing)

' Post several messages to the block in parallel. The first 
' message to be received is written to the block. 
' Subsequent messages are discarded.
Parallel.Invoke(Function() writeOnceBlock.Post("Message 1"), Function() writeOnceBlock.Post("Message 2"), Function() writeOnceBlock.Post("Message 3"))

' Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive())

'          Sample output:
'            Message 2
'          

İlk tamamlanan işlemin değerini almak için WriteOnceBlock<T> kullanmayı gösteren tam bir örnek için bkz Nasıl Yapılır: Veri Akışı Bloklarının Bağlantısını Kaldırma.

Yürütme Blokları

Yürütme blokları, alınan her veri parçası için kullanıcı tarafından sağlanan bir temsilci çağırır. TPL Veri Akışı Kitaplığı üç yürütme bloğu türü sağlar: ActionBlock<TInput>, System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput>ve System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput>.

ActionBlok<T>

ActionBlock<TInput> sınıfı, veri aldığında temsilci çağıran bir hedef bloğudur. Bir ActionBlock<TInput> nesneyi, veriler kullanılabilir olduğunda zaman uyumsuz olarak çalışan bir temsilci olarak düşünün. Bir ActionBlock<TInput> nesneye sağladığınız temsilci, Action<T> türünde veya System.Func<TInput, Task> türünde olabilir. ActionBlock<TInput> nesnesini Action<T> ile kullandığınızda, temsilci döndüğünde her giriş öğesinin işlenmesi tamamlanmış olarak kabul edilir. ile ActionBlock<TInput>bir System.Func<TInput, Task> nesne kullandığınızda, her giriş öğesinin işlenmesi ancak döndürülen Task nesne tamamlandığında tamamlanmış olarak kabul edilir. Bu iki mekanizmayı kullanarak, her giriş öğesinin hem zaman uyumlu hem de zaman uyumsuz işlenmesi için kullanabilirsiniz ActionBlock<TInput> .

Aşağıdaki temel örnek bir Int32 nesneye birden çok ActionBlock<TInput> değer göndermektedir. nesnesi bu ActionBlock<TInput> değerleri konsola yazdırır. Bu örnek daha sonra bloğu tamamlanmış duruma ayarlar ve tüm veri akışı görevlerinin tamamlanmasını bekler.

// Create an ActionBlock<int> object that prints values
// to the console.
var actionBlock = new ActionBlock<int>(n => Console.WriteLine(n));

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   actionBlock.Post(i * 10);
}

// Set the block to the completed state and wait for all
// tasks to finish.
actionBlock.Complete();
actionBlock.Completion.Wait();

/* Output:
   0
   10
   20
 */
' Create an ActionBlock<int> object that prints values
' to the console.
Dim actionBlock = New ActionBlock(Of Integer)(Function(n) WriteLine(n))

' Post several messages to the block.
For i As Integer = 0 To 2
    actionBlock.Post(i * 10)
Next i

' Set the block to the completed state and wait for all 
' tasks to finish.
actionBlock.Complete()
actionBlock.Completion.Wait()

'          Output:
'            0
'            10
'            20
'          

Sınıfıyla ActionBlock<TInput> delegelerin nasıl kullanılacağını gösteren eksiksiz örnekler için bkz Nasıl yapılır: Veri Akışı Bloğu Veri Aldığında Eylem Gerçekleştirme.

TransformBlock<TInput, TOutput>

TransformBlock<TInput,TOutput> sınıfı, ActionBlock<TInput> sınıfına benzer; ancak hem kaynak hem de hedef olarak işlev görür. TransformBlock<TInput,TOutput> nesnesine aktardığınız temsilci, TOutput türünde bir değer döndürür. Bir TransformBlock<TInput,TOutput> nesneye sağladığınız temsilci türünde System.Func<TInput, TOutput> veya türünde System.Func<TInput, Task<TOutput>>olabilir. Bir TransformBlock<TInput,TOutput> nesnesini System.Func<TInput, TOutput> ile kullandığınızda, temsilci döndüğünde her bir giriş öğesinin işlenmesi tamamlanmış kabul edilir. ile TransformBlock<TInput,TOutput>kullanılan bir System.Func<TInput, Task<TOutput>> nesne kullandığınızda, her giriş öğesinin işlenmesi ancak döndürülen Task<TResult> nesne tamamlandığında tamamlanmış olarak kabul edilir. gibi ActionBlock<TInput>, bu iki mekanizmayı kullanarak her giriş öğesinin hem zaman uyumlu hem de zaman uyumsuz işlenmesi için kullanabilirsiniz TransformBlock<TInput,TOutput> .

Aşağıdaki temel örnek, girişinin karekökünü hesaplayan bir TransformBlock<TInput,TOutput> nesne oluşturur. TransformBlock<TInput,TOutput> nesnesi değerleri giriş olarak alır Int32 ve çıkış olarak değerler üretirDouble.

// Create a TransformBlock<int, double> object that
// computes the square root of its input.
var transformBlock = new TransformBlock<int, double>(n => Math.Sqrt(n));

// Post several messages to the block.
transformBlock.Post(10);
transformBlock.Post(20);
transformBlock.Post(30);

// Read the output messages from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(transformBlock.Receive());
}

/* Output:
   3.16227766016838
   4.47213595499958
   5.47722557505166
 */
' Create a TransformBlock<int, double> object that 
' computes the square root of its input.
Dim transformBlock = New TransformBlock(Of Integer, Double)(Function(n) Math.Sqrt(n))

' Post several messages to the block.
transformBlock.Post(10)
transformBlock.Post(20)
transformBlock.Post(30)

' Read the output messages from the block.
For i As Integer = 0 To 2
    Console.WriteLine(transformBlock.Receive())
Next i

'          Output:
'            3.16227766016838
'            4.47213595499958
'            5.47722557505166
'          

Windows Forms uygulamasında görüntü işleme gerçekleştiren bir veri akışı blokları ağında kullanılan TransformBlock<TInput,TOutput> tam örnekler için bkz . İzlenecek Yol: Windows Forms Uygulamasında Veri Akışını Kullanma.

TransformManyBlock<TInput, TOutput>

sınıfı TransformManyBlock<TInput,TOutput> sınıfına TransformBlock<TInput,TOutput> benzer, ancak TransformManyBlock<TInput,TOutput> her giriş değeri için yalnızca bir çıkış değeri yerine her giriş değeri için sıfır veya daha fazla çıkış değeri üretir. Bir TransformManyBlock<TInput,TOutput> nesneye sağladığınız temsilci türünde System.Func<TInput, IEnumerable<TOutput>> veya türünde System.Func<TInput, Task<IEnumerable<TOutput>>>olabilir. Bir TransformManyBlock<TInput,TOutput> nesnesini System.Func<TInput, IEnumerable<TOutput>> ile kullandığınızda, temsilci döndüğünde her bir giriş öğesinin işlenmesi tamamlanmış kabul edilir. ile TransformManyBlock<TInput,TOutput>bir System.Func<TInput, Task<IEnumerable<TOutput>>> nesne kullandığınızda, her giriş öğesinin işlenmesi yalnızca döndürülen System.Threading.Tasks.Task<IEnumerable<TOutput>> nesne tamamlandığında tamamlanmış olarak kabul edilir.

Aşağıdaki temel örnek, dizeleri kendi karakter dizilerine bölen bir TransformManyBlock<TInput,TOutput> nesne oluşturur. TransformManyBlock<TInput,TOutput> nesnesi değerleri giriş olarak alır String ve çıkış olarak değerler üretirChar.

// Create a TransformManyBlock<string, char> object that splits
// a string into its individual characters.
var transformManyBlock = new TransformManyBlock<string, char>(
   s => s.ToCharArray());

// Post two messages to the first block.
transformManyBlock.Post("Hello");
transformManyBlock.Post("World");

// Receive all output values from the block.
for (int i = 0; i < ("Hello" + "World").Length; i++)
{
   Console.WriteLine(transformManyBlock.Receive());
}

/* Output:
   H
   e
   l
   l
   o
   W
   o
   r
   l
   d
 */
' Create a TransformManyBlock<string, char> object that splits
' a string into its individual characters.
Dim transformManyBlock = New TransformManyBlock(Of String, Char)(Function(s) s.ToCharArray())

' Post two messages to the first block.
transformManyBlock.Post("Hello")
transformManyBlock.Post("World")

' Receive all output values from the block.
For i As Integer = 0 To ("Hello" & "World").Length - 1
    Console.WriteLine(transformManyBlock.Receive())
Next i

'          Output:
'            H
'            e
'            l
'            l
'            o
'            W
'            o
'            r
'            l
'            d
'          

Bir veri akışı işlem hattındaki her giriş için birden çok bağımsız çıkış üretmek için kullanılan TransformManyBlock<TInput,TOutput> tam örnekler için bkz . İzlenecek Yol: Veri Akışı İşlem Hattı Oluşturma.

Paralellik Derecesi

Her ActionBlock<TInput>, TransformBlock<TInput,TOutput> ve TransformManyBlock<TInput,TOutput> nesnesi, blok bunları işlemeye hazır olana dek giriş iletilerini arabelleğe alır. Varsayılan olarak, bu sınıflar iletileri alındıkları sırayla, teker teker işler. Ayrıca , ActionBlock<TInput> ve TransformBlock<TInput,TOutput> nesnelerinin birden çok iletiyi eşzamanlı olarak işlemesini sağlamak TransformManyBlock<TInput,TOutput>için paralellik derecesini belirtebilirsiniz. Eşzamanlı yürütme hakkında daha fazla bilgi için bu belgenin devamında Yer alan Paralellik Derecesini Belirtme bölümüne bakın. Yürütme veri akışı bloğunun aynı anda birden fazla iletiyi işlemesini sağlamak için paralellik derecesini ayarlayan bir örnek için bkz . Nasıl yapılır: Veri Akışı Bloğunda Paralellik Derecesini Belirtme.

Temsilci Türlerinin Özeti

Aşağıdaki tabloda ActionBlock<TInput>, TransformBlock<TInput,TOutput>, ve TransformManyBlock<TInput,TOutput> nesnelerine sağlayabileceğiniz temsilci türleri özetlenmektedir. Bu tablo ayrıca temsilci türünün zaman uyumlu mu yoksa zaman uyumsuz mu çalıştığını belirtir.

Türü Senkron Temsilci Türü Asenkron Delege Türü
ActionBlock<TInput> System.Action System.Func<TInput, Task>
TransformBlock<TInput,TOutput> System.Func<TInput, TOutput> System.Func<TInput, Task<TOutput>>
TransformManyBlock<TInput,TOutput> System.Func<TInput, IEnumerable<TOutput>> System.Func<TInput, Task<IEnumerable<TOutput>>>

Yürütme bloğu türleriyle çalışırken lambda ifadelerini de kullanabilirsiniz. Yürütme bloğuyla lambda ifadesinin nasıl kullanılacağını gösteren bir örnek için bkz . Nasıl yapılır: Veri Akışı Bloğu Veri Aldığında Eylem Gerçekleştirme.

Blokların Gruplanması

Gruplandırma blokları, bir veya daha fazla kaynaktan ve çeşitli kısıtlamalar altında verileri birleştirir. TPL Veri Akışı Kitaplığı üç birleştirme bloğu türü sağlar: BatchBlock<T>, JoinBlock<T1,T2>ve BatchedJoinBlock<T1,T2>.

BatchBlock<T>

BatchBlock<T> sınıfı, toplu veri olarak bilinen giriş veri kümelerini çıkış verileri dizileri halinde birleştirir. BatchBlock<T> nesnesi oluşturduğunuzda her bir partinin boyutunu belirlersiniz. BatchBlock<T> Nesne belirtilen giriş öğelerinin sayısını aldığında, bu öğeleri içeren bir diziyi zaman uyumsuz olarak yayar. Bir BatchBlock<T> nesne tamamlanmış duruma ayarlanmışsa fakat toplu iş oluşturmak için yeterli öğe içermiyorsa, kalan giriş öğelerini içeren son diziyi yayar.

BatchBlock<T> sınıfı doyumsuz veya doyumsuz olmayan modda çalışır. Varsayılan olan doyumsuz modda, bir BatchBlock<T> nesne sunulan her iletiyi kabul eder ve belirtilen sayıda öğeyi aldıktan sonra bir diziyi yayar. Doyumsuz olmayan modda, bir BatchBlock<T> nesne gelen tüm iletileri, yeterli kaynak toplu iş oluşturmak için bloğa ileti sunana kadar erteler. Doyumsuz mod genellikle daha az işlem yükü gerektirdiğinden doyumsuz olmayan moddan daha iyi performans gösterir. Ancak, birden çok kaynaktan tüketimi atomik bir şekilde koordine etmeniz gerektiğinde açgözlü olmayan modu kullanabilirsiniz. Doyumsuz modu belirtmek için Greedy oluşturucusundaki False parametresini dataflowBlockOptions'dan BatchBlock<T>'e ayarlayın.

Aşağıdaki temel örnek, bir toplu iş içinde on öğeyi barındıran bir Int32 nesneye birkaç BatchBlock<T> değer gönderir. Tüm değerlerin BatchBlock<T> öğesinden dışarı yayıldığını garanti etmek için, bu örnek Complete yöntemini çağırır. Complete yöntemi BatchBlock<T> nesneyi tamamlanmış duruma ayarlar ve bu nedenle BatchBlock<T> nesnesi kalan öğeleri son bir toplu işlem olarak dışarıda yayar.

// Create a BatchBlock<int> object that holds ten
// elements per batch.
var batchBlock = new BatchBlock<int>(10);

// Post several values to the block.
for (int i = 0; i < 13; i++)
{
   batchBlock.Post(i);
}
// Set the block to the completed state. This causes
// the block to propagate out any remaining
// values as a final batch.
batchBlock.Complete();

// Print the sum of both batches.

Console.WriteLine($"The sum of the elements in batch 1 is {batchBlock.Receive().Sum()}.");

Console.WriteLine($"The sum of the elements in batch 2 is {batchBlock.Receive().Sum()}.");

/* Output:
   The sum of the elements in batch 1 is 45.
   The sum of the elements in batch 2 is 33.
 */
' Create a BatchBlock<int> object that holds ten
' elements per batch.
Dim batchBlock = New BatchBlock(Of Integer)(10)

' Post several values to the block.
For i As Integer = 0 To 12
    batchBlock.Post(i)
Next i
' Set the block to the completed state. This causes
' the block to propagate out any remaining
' values as a final batch.
batchBlock.Complete()

' Print the sum of both batches.

Console.WriteLine("The sum of the elements in batch 1 is {0}.", batchBlock.Receive().Sum())

Console.WriteLine("The sum of the elements in batch 2 is {0}.", batchBlock.Receive().Sum())

'          Output:
'            The sum of the elements in batch 1 is 45.
'            The sum of the elements in batch 2 is 33.
'          

Veritabanı ekleme işlemlerinin verimliliğini artırmak için kullanan BatchBlock<T> eksiksiz bir örnek için bkz . İzlenecek Yol: Verimliliği Artırmak için BatchBlock ve BatchedJoinBlock Kullanma.

JoinBlock<T1, T2, ...>

JoinBlock<T1,T2> ve JoinBlock<T1,T2,T3> sınıfları giriş öğelerini toplar ve bu öğeleri içeren System.Tuple<T1,T2> veya System.Tuple<T1,T2,T3> nesnelerini yayar. JoinBlock<T1,T2> ve JoinBlock<T1,T2,T3> sınıfları ITargetBlock<TInput>'den devralınmaz. Bunun yerine, Target1'ü uygulayan Target2, Target3 ve ITargetBlock<TInput> özelliklerini sağlarlar.

BatchBlock<T> gibi, JoinBlock<T1,T2> ve JoinBlock<T1,T2,T3>, istekli veya isteksiz modda çalışır. Varsayılan olan açgözlü modda, bir JoinBlock<T1,T2> veya JoinBlock<T1,T2,T3> nesnesi sunulduğu her iletiyi kabul eder ve hedeflerinin her biri en az bir ileti aldıktan sonra bir ikiliyi iletir. Tutumsuz olmayan modda, bir JoinBlock<T1,T2> veya JoinBlock<T1,T2,T3> nesnesi, demet oluşturmak için gereken verilerin tüm hedeflere sunulmasına kadar tüm gelen iletileri erteler. Bu noktada blok, tüm gerekli öğeleri kaynaklardan atomik olarak almak için iki aşamalı bir işleme protokolü uygular. Bu erteleme, başka bir varlığın bu sırada verileri tüketmesini ve sistemin ilerlemesini sağlamasını mümkün kılar.

Aşağıdaki temel örnekte, bir nesnenin bir JoinBlock<T1,T2,T3> değeri hesaplamak için birden çok veriye ihtiyaç duyması durumu gösterilmektedir. Bu örnek, aritmetik bir JoinBlock<T1,T2,T3> işlem gerçekleştirmek için iki Int32 değer ve bir Char değer gerektiren bir nesne oluşturur.

// Create a JoinBlock<int, int, char> object that requires
// two numbers and an operator.
var joinBlock = new JoinBlock<int, int, char>();

// Post two values to each target of the join.

joinBlock.Target1.Post(3);
joinBlock.Target1.Post(6);

joinBlock.Target2.Post(5);
joinBlock.Target2.Post(4);

joinBlock.Target3.Post('+');
joinBlock.Target3.Post('-');

// Receive each group of values and apply the operator part
// to the number parts.

for (int i = 0; i < 2; i++)
{
   var data = joinBlock.Receive();
   switch (data.Item3)
   {
      case '+':
         Console.WriteLine($"{data.Item1} + {data.Item2} = {data.Item1 + data.Item2}");
         break;
      case '-':
         Console.WriteLine($"{data.Item1} - {data.Item2} = {data.Item1 - data.Item2}");
         break;
      default:
         Console.WriteLine($"Unknown operator '{data.Item3}'.");
         break;
   }
}

/* Output:
   3 + 5 = 8
   6 - 4 = 2
 */
' Create a JoinBlock<int, int, char> object that requires
' two numbers and an operator.
Dim joinBlock = New JoinBlock(Of Integer, Integer, Char)()

' Post two values to each target of the join.

joinBlock.Target1.Post(3)
joinBlock.Target1.Post(6)

joinBlock.Target2.Post(5)
joinBlock.Target2.Post(4)

joinBlock.Target3.Post("+"c)
joinBlock.Target3.Post("-"c)

' Receive each group of values and apply the operator part
' to the number parts.

For i As Integer = 0 To 1
    Dim data = joinBlock.Receive()
    Select Case data.Item3
        Case "+"c
            Console.WriteLine("{0} + {1} = {2}", data.Item1, data.Item2, data.Item1 + data.Item2)
        Case "-"c
            Console.WriteLine("{0} - {1} = {2}", data.Item1, data.Item2, data.Item1 - data.Item2)
        Case Else
            Console.WriteLine("Unknown operator '{0}'.", data.Item3)
    End Select
Next i

'          Output:
'            3 + 5 = 8
'            6 - 4 = 2
'          

İsteksiz modda bir kaynağı iş birliği içinde paylaşmak için JoinBlock<T1,T2> nesnelerini kullanan eksiksiz bir örnek için makalesine bakın: Birden Çok Kaynaktan Veri Okumak için JoinBlock Kullanma.

BatchedJoinBlock<T1, T2, ...>

BatchedJoinBlock<T1,T2> ve BatchedJoinBlock<T1,T2,T3> sınıfları, giriş öğelerinin gruplarını toplar ve bu öğeleri içeren System.Tuple(IList(T1), IList(T2)) veya System.Tuple(IList(T1), IList(T2), IList(T3)) nesneleri yayar. BatchedJoinBlock<T1,T2>'yı, BatchBlock<T> ve JoinBlock<T1,T2>'nin bir birleşimi olarak düşünün. Bir BatchedJoinBlock<T1,T2> nesne oluşturduğunuzda her toplu işlemin boyutunu belirtin. BatchedJoinBlock<T1,T2> ayrıca, Target1 ve Target2 özelliklerini sağlayan ITargetBlock<TInput>'yi uygular. Belirtilen giriş öğelerinin sayısı tüm hedeflerden alındığında, BatchedJoinBlock<T1,T2> nesne, bu öğeleri içeren bir System.Tuple(IList(T1), IList(T2)) nesneyi asenkron olarak yayar.

Aşağıdaki temel örnek, sonuçlar, BatchedJoinBlock<T1,T2> değerler ve Int32 nesneleri olan hataları içeren bir Exception nesnesi oluşturur. Bu örnek, birden çok işlem gerçekleştirir ve sonuçları Target1 özelliğine, hataları ise Target2 özelliğine, BatchedJoinBlock<T1,T2> nesnesi üzerinde yazar. Başarılı ve başarısız işlemlerin sayısı önceden bilinmediğinden, nesneler her hedefin IList<T> sıfır veya daha fazla değer almasını sağlar.

// For demonstration, create a Func<int, int> that
// returns its argument, or throws ArgumentOutOfRangeException
// if the argument is less than zero.
Func<int, int> DoWork = n =>
{
   if (n < 0)
      throw new ArgumentOutOfRangeException();
   return n;
};

// Create a BatchedJoinBlock<int, Exception> object that holds
// seven elements per batch.
var batchedJoinBlock = new BatchedJoinBlock<int, Exception>(7);

// Post several items to the block.
foreach (int i in new int[] { 5, 6, -7, -22, 13, 55, 0 })
{
   try
   {
      // Post the result of the worker to the
      // first target of the block.
      batchedJoinBlock.Target1.Post(DoWork(i));
   }
   catch (ArgumentOutOfRangeException e)
   {
      // If an error occurred, post the Exception to the
      // second target of the block.
      batchedJoinBlock.Target2.Post(e);
   }
}

// Read the results from the block.
var results = batchedJoinBlock.Receive();

// Print the results to the console.

// Print the results.
foreach (int n in results.Item1)
{
   Console.WriteLine(n);
}
// Print failures.
foreach (Exception e in results.Item2)
{
   Console.WriteLine(e.Message);
}

/* Output:
   5
   6
   13
   55
   0
   Specified argument was out of the range of valid values.
   Specified argument was out of the range of valid values.
 */
' For demonstration, create a Func<int, int> that 
' returns its argument, or throws ArgumentOutOfRangeException
' if the argument is less than zero.
Dim DoWork As Func(Of Integer, Integer) = Function(n)
                                              If n < 0 Then
                                                  Throw New ArgumentOutOfRangeException()
                                              End If
                                              Return n
                                          End Function

' Create a BatchedJoinBlock<int, Exception> object that holds 
' seven elements per batch.
Dim batchedJoinBlock = New BatchedJoinBlock(Of Integer, Exception)(7)

' Post several items to the block.
For Each i As Integer In New Integer() {5, 6, -7, -22, 13, 55, 0}
    Try
        ' Post the result of the worker to the 
        ' first target of the block.
        batchedJoinBlock.Target1.Post(DoWork(i))
    Catch e As ArgumentOutOfRangeException
        ' If an error occurred, post the Exception to the 
        ' second target of the block.
        batchedJoinBlock.Target2.Post(e)
    End Try
Next i

' Read the results from the block.
Dim results = batchedJoinBlock.Receive()

' Print the results to the console.

' Print the results.
For Each n As Integer In results.Item1
    Console.WriteLine(n)
Next n
' Print failures.
For Each e As Exception In results.Item2
    Console.WriteLine(e.Message)
Next e

'          Output:
'            5
'            6
'            13
'            55
'            0
'            Specified argument was out of the range of valid values.
'            Specified argument was out of the range of valid values.
'          

Programın bir veritabanından okuma yaptığı sırada hem sonuçları hem de oluşan özel durumları yakalamak için BatchedJoinBlock<T1,T2> kullanan tam bir örnek için bkz Verimliliği Artırmak için İzlenecek Yol: BatchBlock ve BatchedJoinBlock Kullanma.

Veri Akışı Bloğu Davranışını Yapılandırma

Veri akışı blok türlerinin oluşturucusuna bir System.Threading.Tasks.Dataflow.DataflowBlockOptions nesnesi sağlayarak ek seçenekleri etkinleştirebilirsiniz. Bu seçenekler, temel alınan görevi ve paralellik derecesini yöneten zamanlayıcı gibi davranışları denetler. ayrıca DataflowBlockOptions , belirli veri akışı bloğu türlerine özgü davranışı belirten türetilmiş türler de vardır. Aşağıdaki tabloda, her veri akışı blok türüyle ilişkili seçenek türü özetlenmiştir.

Veri Akışı Blok Türü DataflowBlockOptions tür
BufferBlock<T> DataflowBlockOptions
BroadcastBlock<T> DataflowBlockOptions
WriteOnceBlock<T> DataflowBlockOptions
ActionBlock<TInput> ExecutionDataflowBlockOptions
TransformBlock<TInput,TOutput> ExecutionDataflowBlockOptions
TransformManyBlock<TInput,TOutput> ExecutionDataflowBlockOptions
BatchBlock<T> GroupingDataflowBlockOptions
JoinBlock<T1,T2> GroupingDataflowBlockOptions
BatchedJoinBlock<T1,T2> GroupingDataflowBlockOptions

Aşağıdaki bölümlerde, , System.Threading.Tasks.Dataflow.DataflowBlockOptionsve System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptions sınıfları aracılığıyla System.Threading.Tasks.Dataflow.GroupingDataflowBlockOptionskullanılabilen önemli veri akışı bloğu seçenekleri hakkında ek bilgi sağlanır.

Görev Zamanlayıcısı Belirtme

Önceden tanımlanmış her veri akışı bloğu, verileri hedefe yayma, bir kaynaktan veri alma ve veriler kullanılabilir olduğunda kullanıcı tanımlı temsilciler çalıştırma gibi etkinlikleri gerçekleştirmek için TPL görev zamanlama mekanizmasını kullanır. TaskScheduler , görevleri iş parçacıklarına sıraya ekleyen bir görev zamanlayıcıyı temsil eden soyut bir sınıftır. Varsayılan görev zamanlayıcı olan Default, sınıfını ThreadPool kullanarak işi kuyruğa alır ve yürütür. TaskScheduler özelliğini ayarlayarak varsayılan görev zamanlayıcıyı, bir veri akışı bloğu nesnesi oluşturduğunuzda geçersiz kılabilirsiniz.

Aynı görev zamanlayıcı birden çok veri akışı bloğunu yönettiğinde, bunlar arasında ilkeleri zorunlu kılabilir. Örneğin, her biri aynı ConcurrentExclusiveSchedulerPair nesnenin özel durum zamanlayıcısını hedeflemek üzere yapılandırılmış birden çok veri akışı bloğu varsa, bu bloklar arasında çalışan tüm çalışmalar seri hale getirilir. Benzer şekilde, bu bloklar aynı ConcurrentExclusiveSchedulerPair nesnenin eşzamanlı zamanlayıcısına hedef olacak şekilde yapılandırılmışsa ve bu zamanlayıcı en yüksek eşzamanlılık düzeyine sahip olacak şekilde yapılandırılmışsa, bu bloklardan gelen tüm çalışmalar bu eşzamanlı işlem sayısıyla sınırlıdır. Okuma işlemlerinin ConcurrentExclusiveSchedulerPair paralel olarak gerçekleşmesini, ancak yazma işlemlerinin yalnızca diğer tüm işlemlerden oluşmasını sağlamak için sınıfını kullanan bir örnek için bkz . Nasıl yapılır: Veri Akışı Bloğunda Görev Zamanlayıcı Belirtme. TPL'deki görev zamanlayıcıları hakkında daha fazla bilgi için sınıf konusuna TaskScheduler bakın.

Paralellik Derecesini Belirtme

Varsayılan olarak, TPL Veri Akışı Kitaplığı'nın sağladığı ActionBlock<TInput>üç yürütme bloğu türü, , TransformBlock<TInput,TOutput>ve TransformManyBlock<TInput,TOutput>her seferinde bir iletiyi işler. Bu veri akışı bloğu türleri, iletileri alındıkları sırayla da işler. Bu veri akışı bloklarının iletileri eşzamanlı olarak işlemesini sağlamak için, veri akışı bloğu nesnesini oluştururken özelliğini ayarlayın ExecutionDataflowBlockOptions.MaxDegreeOfParallelism .

varsayılan değeri MaxDegreeOfParallelism 1'dir ve bu değer veri akışı bloğunun bir kerede bir iletiyi işlemesini garanti eder. Bu özelliğin 1'den büyük bir değere ayarlanması, veri akışı bloğunun birden çok iletiyi eşzamanlı olarak işlemesini sağlar. Bu özelliğin olarak DataflowBlockOptions.Unbounded ayarlanması, temel alınan görev zamanlayıcının en yüksek eşzamanlılık derecesini yönetmesini sağlar.

Önemli

En fazla 1'den büyük bir paralellik derecesi belirttiğinizde, birden çok ileti aynı anda işlenir ve bu nedenle iletiler alındıkları sırayla işlenmeyebilir. Ancak, iletilerin bloktan çıkış sırası, alındıkları sıradır.

MaxDegreeOfParallelism özelliği en yüksek paralellik derecesini temsil ettiğinden, veri akışı bloğu belirttiğinizden daha az paralellik derecesiyle yürütülebilir. Veri akışı bloğu, işlevsel gereksinimlerini karşılamak için veya kullanılabilir sistem kaynaklarının olmaması nedeniyle daha az paralellik kullanabilir. Veri akışı bloğu hiçbir zaman belirttiğinizden daha fazla paralellik seçmez.

özelliğinin MaxDegreeOfParallelism değeri her veri akışı bloğu nesnesine özeldir. Örneğin, dört veri akışı bloğu nesnesinin her biri en yüksek paralellik derecesi için 1 belirtirse, dört veri akışı bloğu nesnesinin tümü de paralel olarak çalışabilir.

Uzun işlemlerin paralel gerçekleşmesini sağlamak için en yüksek paralellik derecesini ayarlayan bir örnek için bkz . Nasıl yapılır: Veri Akışı Bloğunda Paralellik Derecesini Belirtme.

Görev Başına İleti Sayısını Belirtme

Önceden tanımlanmış veri akışı bloğu türleri, birden çok giriş öğesini işlemek için görevleri kullanır. Bu, uygulamaların daha verimli çalışmasını sağlayan verileri işlemek için gereken görev nesnelerinin sayısını en aza indirmeye yardımcı olur. Ancak, bir veri akışı blokları kümesindeki görevler verileri işliyorsa, diğer veri akışı bloklarından gelen görevlerin iletileri kuyruğa alarak işlem süresini beklemesi gerekebilir. Veri akışı görevleri arasında daha iyi eşitlik sağlamak için özelliğini ayarlayın MaxMessagesPerTask . MaxMessagesPerTask varsayılan olan olarak ayarlandığındaDataflowBlockOptions.Unbounded, veri akışı bloğu tarafından kullanılan görev kullanılabilir sayıda iletiyi işler. MaxMessagesPerTask dışında bir değere Unboundedayarlandığında, veri akışı bloğu nesne başına Task en fazla bu sayıda iletiyi işler. Özelliğin MaxMessagesPerTask ayarlanması görevler arasında eşitliği artırabilir, ancak sistemin gerekenden daha fazla görev oluşturmasına neden olabilir ve bu da performansı düşürebilir.

İptal Etkinleştirme

TPL, görevlerin işbirliğine dayalı bir şekilde iptali koordine etmelerini sağlayan bir mekanizma sağlar. Veri akışı bloklarının bu iptal mekanizmasına katılmasını sağlamak için özelliğini ayarlayın CancellationToken . Bu CancellationToken nesne iptal edildi durumuna ayarlandığında, bu belirteci izleyen tüm veri akışı blokları geçerli öğelerinin yürütülmesini tamamlar ancak sonraki öğeleri işlemeye başlamaz. Bu veri akışı blokları ayrıca arabelleğe alınmış iletileri temizler, herhangi bir kaynak ve hedef blokla bağlantıları keser ve iptal edilen duruma geçiş yapar. İptal edilen duruma geçildiğinde, bir özel durum oluşmadığı sürece Completion özelliği Status özelliği Canceled olarak ayarlanmıştır. Bu durumda, Status olarak Faultedayarlanır.

Windows Forms uygulamasında iptalin nasıl kullanılacağını gösteren bir örnek için bkz . Nasıl yapılır: Veri Akışı Bloğunu İptal Etme. TPL'de iptal hakkında daha fazla bilgi için bkz . Görev İptali.

Doyumsuz ve Doyumsuz Olmayan Davranışı Belirtme

Çeşitli gruplama veri akışı bloğu türleri doyumsuz veya doyumsuz olmayan modda çalışabilir. Varsayılan olarak, önceden tanımlanmış veri akışı blok türleri açgözlü modda çalışır.

Birleştirme bloğu türleri, örneğin JoinBlock<T1,T2>, gibi durumlarda doyumsuz mod, karşılık gelen veriler henüz mevcut olmasa bile bloğun verileri hemen kabul ettiği anlamına gelir. Açgözlü olmayan mod, bloğun, birleşimi tamamlamak için her hedefte bir tane mevcut olana kadar tüm gelen mesajları ertelediğini belirtir. Ertelenen iletilerden herhangi biri artık kullanılamıyorsa, birleştirme bloğu tüm ertelenen iletileri serbest bırakır ve işlemi yeniden başlatır. BatchBlock<T> sınıfı için, açgözlü ve açgözlü olmayan davranışlar benzerdir, ancak açgözlü olmayan modda, bir BatchBlock<T> nesne, farklı kaynaklardan yeterli sayıda ileti birikip bir toplu işi tamamlayana kadar tüm gelen iletileri bekletir.

Veri akışı bloğu için tutumlu olmayan modu belirtmek amacıyla Greedy'yi False olarak ayarlayın. Bir veri kaynağının daha verimli bir şekilde paylaşılması için birden çok birleştirme bloğunun etkinleştirilmesinde hırslı olmayan modun nasıl kullanılacağını gösteren bir örnek için bkz. Nasıl yapılır: Birden Çok Kaynaktan Veri Okumak için JoinBlock Kullanma.

Özel Veri Akışı Blokları

TPL Veri Akışı Kitaplığı önceden tanımlanmış birçok blok türü sağlasa da, özel davranış gerçekleştiren ek blok türleri oluşturabilirsiniz. ISourceBlock<TOutput> veya ITargetBlock<TInput> arabirimlerini doğrudan uygulayın veya yöntemini kullanarak Encapsulate mevcut blok türlerinin davranışını kapsülleyen karmaşık bir blok oluşturun. Özel veri akışı bloğu işlevselliğinin nasıl uygulandığını gösteren örnekler için bkz . İzlenecek Yol: Özel Veri Akışı Blok Türü Oluşturma.

Başlık Açıklama
Nasıl yapılır: Veri Akışı Bloğuna İleti Yazma ve Veri Akışı Bloğundan İletileri Okuma Bir nesneye ileti yazmayı ve nesneden BufferBlock<T> iletileri okumayı gösterir.
Nasıl yapılır: Producer-Consumer Veri Akışı Deseni Uygulama Üreticinin bir veri akışı bloğuna ileti gönderdiği ve tüketicinin bu bloktan iletileri okuduğu bir üretici-tüketici deseni uygulamak için veri akışı modelinin nasıl kullanılacağını açıklar.
Nasıl yapılır: Veri Akışı Bloğu Veri Aldığında Eylem Gerçekleştirme Yürütme veri akışı bloğu türlerine, ActionBlock<TInput>, TransformBlock<TInput,TOutput> ve TransformManyBlock<TInput,TOutput> temsilciler nasıl sağlanır açıklar.
Adım Adım Kılavuz: Veri Akışı Boru Hattı Oluşturma Web'den metin indiren ve bu metin üzerinde işlem gerçekleştiren bir veri akışı işlem hattının nasıl oluşturulacağını açıklar.
Nasıl yapılır: Veri Akışı Bloklarının Bağlantısını Kaldırma LinkTo yönteminin, kaynak hedefe bir ileti sunduktan sonra, hedef bloğun kaynağıyla bağlantısını nasıl keseceğini gösterir.
İzlenecek yol: Windows Forms Uygulamasında Veri Akışı Kullanma Windows Forms uygulamasında görüntü işleme gerçekleştiren veri akışı bloklarından oluşan bir ağ oluşturmayı gösterir.
Nasıl yapılır: Veri Akışı Bloğunu İptal Etme Windows Forms uygulamasında iptalin nasıl kullanılacağını gösterir.
Nasıl yapılır: Birden Çok Kaynaktan Veri Okumak için JoinBlock Kullanma Birden çok kaynaktan veri kullanılabilir olduğunda bir işlem gerçekleştirmek için JoinBlock<T1,T2> sınıfını nasıl kullanacağınızı ve birden çok join bloğunun veri kaynağını daha verimli bir şekilde paylaşılmasına olanak tanımak için doyumsuz olmayan modun nasıl kullanılacağını açıklar.
Nasıl yapılır: Veri Akışı Bloğunda Paralellik Derecesini Belirtme Bir yürütme veri akışı bloğunun MaxDegreeOfParallelism aynı anda birden fazla iletiyi işleyecek şekilde etkinleştirilmesi için özelliğinin nasıl ayarlanacağı açıklanır.
Nasıl yapılır: Veri Akışı Bloğunda Görev Zamanlayıcı Belirtme Uygulamanızda veri akışı kullanırken belirli bir görev zamanlayıcısını nasıl ilişkilendirebilirsiniz gösterir.
İzlenecek yol: Verimliliği Artırmak için BatchBlock ve BatchedJoinBlock Kullanma Veritabanı ekleme işlemlerinin BatchBlock<T> verimliliğini artırmak için sınıfının nasıl kullanılacağını ve hem sonuçları hem de program veritabanından BatchedJoinBlock<T1,T2> okurken oluşan özel durumları yakalamak için sınıfın nasıl kullanılacağını açıklar.
İzlenecek yol: Özel Veri Akışı Blok Türü Oluşturma Özel davranış uygulayan bir veri akışı blok türü oluşturmanın iki yolunu gösterir.
Görev Paralel Kitaplığı (TPL) .NET Framework uygulamalarında paralel ve eşzamanlı programlamayı basitleştiren bir kitaplık olan TPL'yi tanıtır.