共用方式為


HOW TO:在管線中使用封鎖集合的陣列

下列範例將示範如何使用 System.Collections.Concurrent.BlockingCollection<T> 物件的陣列搭配靜態方法 (例如 TryAddToAny 和 TryTakeFromAny),在元件之間實作快速且彈性的資料傳輸。

範例

下列範例將示範基本管線實作,其中每個物件都會以並行方式從輸入集合中取得資料、轉換資料,然後將資料傳遞給輸出集合。

Imports System
Imports System.Collections
Imports System.Collections.Concurrent
Imports System.Collections.Generic
Imports System.Linq
Imports System.Text
Imports System.Threading
Imports System.Threading.Tasks
Namespace BlockingCollectionPipeline

    ''' <summary>
    ''' Console demo: fills an array of bounded blocking collections with integers
    ''' and pushes them through three PipelineFilter stages that run in parallel.
    ''' </summary>
    Class PipeLineDemo

        ''' <summary>
        ''' Builds the source data, wires the three stages together, runs them with
        ''' Parallel.Invoke and prints any exceptions the stages raised.
        ''' </summary>
        Public Shared Sub Main()

            Dim cancellation As New CancellationTokenSource()

            ' Background task that waits for one key press; 'c' cancels the pipeline.
            Task.Factory.StartNew(
                Sub()
                    Dim pressed As Char = Console.ReadKey().KeyChar
                    If pressed = "c"c Then
                        cancellation.Cancel()
                    End If
                End Sub)

            ' Source data: an upper bound of 5 yields six collections (indexes 0..5),
            ' each bounded to 500 items.
            Dim sourceArrays(5) As BlockingCollection(Of Integer)
            For slot As Integer = 0 To sourceArrays.GetUpperBound(0)
                sourceArrays(slot) = New BlockingCollection(Of Integer)(500)
            Next

            ' Offer every value to whichever collection accepts it; report the ones stored.
            Dim itemCount As Integer = sourceArrays.Length * 500
            Parallel.For(0, itemCount,
                         Sub(value)
                             Dim target As Integer =
                                 BlockingCollection(Of Integer).TryAddToAny(sourceArrays, value)
                             If target >= 0 Then
                                 Console.WriteLine("added {0} to source data", value)
                             End If
                         End Sub)

            ' No more producers: mark every source collection as complete.
            For Each source In sourceArrays
                source.CompleteAdding()
            Next

            Dim token As CancellationToken = cancellation.Token

            ' Stage 1: Integer -> Decimal, keeping back a small percentage as a processing fee.
            Dim toDecimalStage = New PipelineFilter(Of Integer, Decimal)(
                sourceArrays,
                Function(n) Convert.ToDecimal(n * 0.97),
                token,
                "filter1")

            ' Stage 2: Decimal -> String.
            Dim toStringStage = New PipelineFilter(Of Decimal, String)(
                toDecimalStage.m_output,
                Function(s) String.Format("{0}", s),
                token,
                "filter2")

            ' Stage 3: endpoint built with the Action-based constructor; it writes
            ' each item to the console instead of to another blocking collection.
            Dim printStage = New PipelineFilter(Of String, String)(
                toStringStage.m_output,
                Sub(s) Console.WriteLine("The final result is {0}", s),
                token,
                "filter3")

            ' Run all three stages side by side and report whatever they threw.
            Try
                Parallel.Invoke(
                    Sub() toDecimalStage.Run(),
                    Sub() toStringStage.Run(),
                    Sub() printStage.Run())
            Catch failures As AggregateException
                For Each failure In failures.InnerExceptions
                    Console.WriteLine(failure.Message & failure.StackTrace)
                Next
            End Try

            ' If the pipeline ran to the end, two key presses are needed: one for the
            ' background cancellation task above and one for the ReadKey below.
            Console.WriteLine("Press any key.")
            Console.ReadKey()
        End Sub
    End Class
        ''' <summary>
        ''' One stage of the pipeline. It takes items from an array of input
        ''' blocking collections and either transforms them and adds the result to
        ''' its own array of output collections, or, when built with the
        ''' Action-based constructor, hands each item to a rendering callback.
        ''' </summary>
        ''' <typeparam name="TInput">Type of the items taken from the input collections.</typeparam>
        ''' <typeparam name="TOutput">Type of the items added to the output collections.</typeparam>
        Class PipelineFilter(Of TInput, TOutput)

            Private m_processor As Func(Of TInput, TOutput) = Nothing
            Public m_input() As BlockingCollection(Of TInput) = Nothing
            ' Stays Nothing for an endpoint stage; Run uses that to choose its mode.
            Public m_output() As BlockingCollection(Of TOutput) = Nothing
            Private m_outputProcessor As Action(Of TInput) = Nothing
            Private m_token As CancellationToken
            Public Name As String

            ''' <summary>
            ''' Creates a transforming stage that owns a fresh array of bounded output collections.
            ''' </summary>
            ''' <param name="input">Collections this stage takes items from.</param>
            ''' <param name="processor">Converts one input item into one output item.</param>
            ''' <param name="token">Token observed while taking and adding items.</param>
            ''' <param name="name">Label used in the console messages written by Run.</param>
            Public Sub New(ByVal input() As BlockingCollection(Of TInput),
                    ByVal processor As Func(Of TInput, TOutput),
                    ByVal token As CancellationToken,
                    ByVal name As String)

                m_input = input

                ' An upper bound of 5 yields six output collections (indexes 0..5),
                ' each bounded to 500 items.
                ReDim m_output(5)
                For i As Integer = 0 To m_output.Length - 1
                    m_output(i) = New BlockingCollection(Of TOutput)(500)
                Next

                m_processor = processor
                m_token = token
                ' Qualify with Me: the parameter shadows the field (VB is case-insensitive),
                ' so an unqualified "name = name" would only assign the parameter to itself.
                Me.Name = name
            End Sub

            ''' <summary>
            ''' Creates an endpoint stage that passes each item to a callback (for
            ''' example to write to the screen) instead of to another stage.
            ''' </summary>
            ''' <param name="input">Collections this stage takes items from.</param>
            ''' <param name="renderer">Callback invoked once per received item.</param>
            ''' <param name="token">Token observed while taking items.</param>
            ''' <param name="name">Label used in the console messages written by Run.</param>
            Public Sub New(ByVal input() As BlockingCollection(Of TInput),
                 ByVal renderer As Action(Of TInput),
                 ByVal token As CancellationToken,
                 ByVal name As String)

                m_input = input
                m_outputProcessor = renderer
                m_token = token
                Me.Name = name
            End Sub

            ''' <summary>
            ''' Processes items until every input collection is completed or
            ''' cancellation is requested. The output collections, if any, are
            ''' marked complete when the method exits, including when it exits
            ''' because of an exception, so that a downstream stage can finish.
            ''' </summary>
            Public Sub Run()

                Console.WriteLine("{0} is running", Me.Name)
                Try
                    While Not m_input.All(Function(bc) bc.IsCompleted) AndAlso Not m_token.IsCancellationRequested

                        Dim receivedItem As TInput = Nothing
                        ' Wait up to 50 ms for an item from any input collection.
                        Dim i As Integer = BlockingCollection(Of TInput).TryTakeFromAny(
                                m_input, receivedItem, 50, m_token)

                        If i < 0 Then
                            Console.WriteLine("Unable to retrieve data from previous filter")
                            Continue While
                        End If

                        If m_output IsNot Nothing Then
                            ' Transforming stage: pass the result to the next stage.
                            Dim outputItem As TOutput = m_processor(receivedItem)
                            ' The token lets a cancel request interrupt a wait on full outputs.
                            BlockingCollection(Of TOutput).AddToAny(m_output, outputItem, m_token)
                            Console.WriteLine("{0} sent {1} to next", Me.Name, outputItem)
                        Else
                            ' Endpoint stage.
                            m_outputProcessor(receivedItem)
                        End If
                    End While
                Finally
                    If m_output IsNot Nothing Then
                        For Each bc In m_output
                            bc.CompleteAdding()
                        Next
                    End If
                End Try
            End Sub
        End Class
End Namespace

namespace BlockingCollectionPipeline
{
    using System;
    using System.Collections;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;


    class PipeLineDemo
    {

        /// <summary>
        /// Fills an array of bounded blocking collections with integers, wires three
        /// PipelineFilter stages together, runs them in parallel and prints any
        /// exceptions the stages raised.
        /// </summary>
        public static void Main()
        {
            var cancellation = new CancellationTokenSource();

            // Background task that waits for one key press; 'c' cancels the pipeline.
            Task.Factory.StartNew(() =>
            {
                char pressed = Console.ReadKey().KeyChar;
                if (pressed == 'c')
                {
                    cancellation.Cancel();
                }
            });

            // Source data: five collections, each bounded to 500 items.
            var sourceArrays = new BlockingCollection<int>[5];
            for (int slot = 0; slot < sourceArrays.Length; slot++)
            {
                sourceArrays[slot] = new BlockingCollection<int>(500);
            }

            // Offer every value to whichever collection accepts it; report the ones stored.
            int itemCount = sourceArrays.Length * 500;
            Parallel.For(0, itemCount, value =>
            {
                int target = BlockingCollection<int>.TryAddToAny(sourceArrays, value);
                if (target >= 0)
                {
                    Console.WriteLine("added {0} to source data", value);
                }
            });

            // No more producers: mark every source collection as complete.
            foreach (var source in sourceArrays)
            {
                source.CompleteAdding();
            }

            CancellationToken token = cancellation.Token;

            // Stage 1: int -> decimal, keeping back a small percentage as a processing fee.
            var toDecimalStage = new PipelineFilter<int, decimal>(
                sourceArrays,
                n => Convert.ToDecimal(n * 0.97),
                token,
                "filter1");

            // Stage 2: decimal -> string.
            var toStringStage = new PipelineFilter<decimal, string>(
                toDecimalStage.m_output,
                s => String.Format("{0}", s),
                token,
                "filter2");

            // Stage 3: endpoint built with the Action<T> constructor; it writes each
            // item to the console instead of to another blocking collection.
            var printStage = new PipelineFilter<string, string>(
                toStringStage.m_output,
                s => Console.WriteLine("The final result is {0}", s),
                token,
                "filter3");

            // Run all three stages side by side and report whatever they threw.
            try
            {
                Parallel.Invoke(
                    () => toDecimalStage.Run(),
                    () => toStringStage.Run(),
                    () => printStage.Run());
            }
            catch (AggregateException failures)
            {
                foreach (var failure in failures.InnerExceptions)
                {
                    Console.WriteLine(failure.Message + failure.StackTrace);
                }
            }

            // If the pipeline ran to the end, two key presses are needed: one for the
            // background cancellation task above and one for the ReadKey below.
            Console.WriteLine("Press any key.");
            Console.ReadKey();
        }

        /// <summary>
        /// One stage of the pipeline. It takes items from an array of input
        /// blocking collections and either transforms them and adds the result to
        /// its own array of output collections, or, when built with the
        /// Action-based constructor, hands each item to a rendering callback.
        /// </summary>
        /// <typeparam name="TInput">Type of the items taken from the input collections.</typeparam>
        /// <typeparam name="TOutput">Type of the items added to the output collections.</typeparam>
        class PipelineFilter<TInput, TOutput>
        {
            Func<TInput, TOutput> m_processor = null;
            public BlockingCollection<TInput>[] m_input;
            // Stays null for an endpoint stage; Run uses that to choose its mode.
            public BlockingCollection<TOutput>[] m_output = null;
            Action<TInput> m_outputProcessor = null;
            CancellationToken m_token;
            public string Name { get; private set; }

            /// <summary>
            /// Creates a transforming stage that owns five output collections,
            /// each bounded to 500 items.
            /// </summary>
            /// <param name="input">Collections this stage takes items from.</param>
            /// <param name="processor">Converts one input item into one output item.</param>
            /// <param name="token">Token observed while taking and adding items.</param>
            /// <param name="name">Label used in the console messages written by Run.</param>
            /// <exception cref="ArgumentNullException">input or processor is null.</exception>
            public PipelineFilter(
                BlockingCollection<TInput>[] input,
                Func<TInput, TOutput> processor,
                CancellationToken token,
                string name)
            {
                if (input == null) throw new ArgumentNullException("input");
                if (processor == null) throw new ArgumentNullException("processor");

                m_input = input;
                m_output = new BlockingCollection<TOutput>[5];
                for (int i = 0; i < m_output.Length; i++)
                    m_output[i] = new BlockingCollection<TOutput>(500);

                m_processor = processor;
                m_token = token;
                Name = name;
            }

            /// <summary>
            /// Creates an endpoint stage that passes each item to a callback (for
            /// example to write to file or screen) instead of to another stage.
            /// </summary>
            /// <param name="input">Collections this stage takes items from.</param>
            /// <param name="renderer">Callback invoked once per received item.</param>
            /// <param name="token">Token observed while taking items.</param>
            /// <param name="name">Label used in the console messages written by Run.</param>
            /// <exception cref="ArgumentNullException">input or renderer is null.</exception>
            public PipelineFilter(
                BlockingCollection<TInput>[] input,
                Action<TInput> renderer,
                CancellationToken token,
                string name)
            {
                if (input == null) throw new ArgumentNullException("input");
                if (renderer == null) throw new ArgumentNullException("renderer");

                m_input = input;
                m_outputProcessor = renderer;
                m_token = token;
                Name = name;
            }

            /// <summary>
            /// Processes items until every input collection is completed or
            /// cancellation is requested. The output collections, if any, are
            /// marked complete when the method exits, including when it exits
            /// because of an exception, so that a downstream stage can finish.
            /// </summary>
            public void Run()
            {
                Console.WriteLine("{0} is running", this.Name);
                try
                {
                    while (!m_input.All(bc => bc.IsCompleted) && !m_token.IsCancellationRequested)
                    {
                        TInput receivedItem;
                        // Wait up to 50 ms for an item from any input collection.
                        int i = BlockingCollection<TInput>.TryTakeFromAny(
                            m_input, out receivedItem, 50, m_token);
                        if (i < 0)
                        {
                            Console.WriteLine("Unable to retrieve data from previous filter");
                            continue;
                        }

                        if (m_output != null) // we pass data to another blocking collection
                        {
                            TOutput outputItem = m_processor(receivedItem);
                            // The token lets a cancel request interrupt a wait on full outputs.
                            BlockingCollection<TOutput>.AddToAny(m_output, outputItem, m_token);
                            Console.WriteLine("{0} sent {1} to next", this.Name, outputItem);
                        }
                        else // we're an endpoint
                        {
                            m_outputProcessor(receivedItem);
                        }
                    }
                }
                finally
                {
                    if (m_output != null)
                    {
                        foreach (var bc in m_output) bc.CompleteAdding();
                    }
                }
            }
        }
    }    
}

請參閱

參考

System.Collections.Concurrent

概念

安全執行緒集合