Walkthrough: Using BatchBlock and BatchedJoinBlock to Improve Efficiency

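The TPL Dataflow Library provides the System.Threading.Tasks.Dataflow.BatchBlock<T> and System.Threading.Tasks.Dataflow.BatchedJoinBlock<T1,T2> classes so that you can receive and buffer data from one or more sources and then propagate that buffered data out as one collection. This batching mechanism is useful when you collect data from one or more sources and then process multiple data elements as a batch. For example, consider an application that uses dataflow to insert records into a database. This operation can be more efficient when multiple items are inserted at the same time instead of one at a time sequentially. This document describes how to use the BatchBlock<T> class to improve the efficiency of such database insert operations. It also describes how to use the BatchedJoinBlock<T1,T2> class to capture both the results and any exceptions that occur when the program reads from a database.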
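The batching behavior described above can be seen without a database. The following is a minimal sketch, not part of the walkthrough's project: the BatchBlockSketch class and its BatchSizes method are hypothetical names chosen for illustration. It posts integers to a BatchBlock<int> and records the size of each array that the block propagates.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper class, used only for illustration.
static class BatchBlockSketch
{
    // Posts the integers 0..count-1 to a BatchBlock<int> and returns the
    // size of each batch that the block produces, in order.
    public static List<int> BatchSizes(int count, int batchSize)
    {
        var sizes = new List<int>();

        // Groups incoming integers into arrays of up to batchSize elements.
        var batcher = new BatchBlock<int>(batchSize);

        // Records the length of each batch. An ActionBlock processes one
        // message at a time by default, so the list needs no locking.
        var recorder = new ActionBlock<int[]>(batch => sizes.Add(batch.Length));

        batcher.LinkTo(recorder);

        // When the batch block finishes, mark the recorder as complete too.
        batcher.Completion.ContinueWith(_ => recorder.Complete());

        for (int i = 0; i < count; i++)
        {
            batcher.Post(i);
        }

        // Completing the batch block flushes any leftover items as a
        // final, smaller batch.
        batcher.Complete();
        recorder.Completion.Wait();
        return sizes;
    }
}
```

Calling BatchBlockSketch.BatchSizes(256, 96), the item count and batch size used later in this walkthrough, yields three batches of 96, 96, and 64 elements. The last batch is smaller because completing the block releases whatever is still buffered.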

Note

The TPL Dataflow Library (the System.Threading.Tasks.Dataflow namespace) is not distributed with .NET. To install the System.Threading.Tasks.Dataflow namespace in Visual Studio, open your project, choose Manage NuGet Packages from the Project menu, and search online for the System.Threading.Tasks.Dataflow package. Alternatively, to install it by using the .NET Core CLI, run dotnet add package System.Threading.Tasks.Dataflow.

Prerequisites

  1. Read the Join Blocks section of the Dataflow document before you start this walkthrough.

  2. Ensure that you have a copy of the Northwind database (Northwind.sdf) on your computer. This file is typically located in the folder %Program Files%\Microsoft SQL Server Compact Edition\v3.5\Samples\.

    Important

    On some versions of Windows, you cannot connect to Northwind.sdf if Visual Studio is running in a non-administrator mode. To connect to Northwind.sdf, start Visual Studio or a Developer Command Prompt for Visual Studio in the Run as administrator mode.

This walkthrough consists of the sections that follow.

Creating the Console Application

  1. In Visual Studio, create a Visual C# or Visual Basic Console Application project. In this document, the project is named DataflowBatchDatabase.

  2. In your project, add a reference to System.Data.SqlServerCe.dll and a reference to System.Threading.Tasks.Dataflow.dll.

  3. Ensure that the file that defines the Program class (Program.cs for C#, or its Visual Basic counterpart) contains the following using (Imports in Visual Basic) statements.

    using System;
    using System.Collections.Generic;
    using System.Data.SqlServerCe;
    using System.Diagnostics;
    using System.IO;
    using System.Threading.Tasks.Dataflow;
    
    Imports System.Collections.Generic
    Imports System.Data.SqlServerCe
    Imports System.Diagnostics
    Imports System.IO
    Imports System.Threading.Tasks.Dataflow
    
    
  4. Add the following data members to the Program class.

    // The number of employees to add to the database.
    // TODO: Change this value to experiment with different numbers of
    // employees to insert into the database.
    static readonly int insertCount = 256;
    
    // The size of a single batch of employees to add to the database.
    // TODO: Change this value to experiment with different batch sizes.
    static readonly int insertBatchSize = 96;
    
    // The source database file.
    // TODO: Change this value if Northwind.sdf is at a different location
    // on your computer.
    static readonly string sourceDatabase =
       @"C:\Program Files\Microsoft SQL Server Compact Edition\v3.5\Samples\Northwind.sdf";
    
    // TODO: Change this value if you require a different temporary location.
    static readonly string scratchDatabase =
       @"C:\Temp\Northwind.sdf";
    
    ' The number of employees to add to the database.
    ' TODO: Change this value to experiment with different numbers of 
    ' employees to insert into the database.
    Private Shared ReadOnly insertCount As Integer = 256
    
    ' The size of a single batch of employees to add to the database.
    ' TODO: Change this value to experiment with different batch sizes.
    Private Shared ReadOnly insertBatchSize As Integer = 96
    
    ' The source database file.
    ' TODO: Change this value if Northwind.sdf is at a different location
    ' on your computer.
    Private Shared ReadOnly sourceDatabase As String = "C:\Program Files\Microsoft SQL Server Compact Edition\v3.5\Samples\Northwind.sdf"
    
    ' TODO: Change this value if you require a different temporary location.
    Private Shared ReadOnly scratchDatabase As String = "C:\Temp\Northwind.sdf"
    

Defining the Employee Class

Add the Employee class to the Program class.

// Describes an employee. Each property maps to a
// column in the Employees table in the Northwind database.
// For brevity, the Employee class does not contain
// all columns from the Employees table.
class Employee
{
   public int EmployeeID { get; set; }
   public string LastName { get; set; }
   public string FirstName { get; set; }

   // A random number generator that helps to generate
   // Employee property values.
   static Random rand = new Random(42);

   // Possible random first names.
   static readonly string[] firstNames = { "Tom", "Mike", "Ruth", "Bob", "John" };
   // Possible random last names.
   static readonly string[] lastNames = { "Jones", "Smith", "Johnson", "Walker" };

   // Creates an Employee object that contains random
   // property values.
   public static Employee Random()
   {
      return new Employee
      {
         EmployeeID = -1,
         LastName = lastNames[rand.Next() % lastNames.Length],
         FirstName = firstNames[rand.Next() % firstNames.Length]
      };
   }
}

' Describes an employee. Each property maps to a
' column in the Employees table in the Northwind database.
' For brevity, the Employee class does not contain
' all columns from the Employees table.
Private Class Employee
    Public Property EmployeeID() As Integer
    Public Property LastName() As String
    Public Property FirstName() As String

    ' A random number generator that helps to generate
    ' Employee property values.
    Private Shared rand As New Random(42)

    ' Possible random first names.
    Private Shared ReadOnly firstNames() As String = {"Tom", "Mike", "Ruth", "Bob", "John"}
    ' Possible random last names.
    Private Shared ReadOnly lastNames() As String = {"Jones", "Smith", "Johnson", "Walker"}

    ' Creates an Employee object that contains random 
    ' property values.
    Public Shared Function Random() As Employee
        Return New Employee With {.EmployeeID = -1, .LastName = lastNames(rand.Next() Mod lastNames.Length), .FirstName = firstNames(rand.Next() Mod firstNames.Length)}
    End Function
End Class

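The Employee class contains three properties: EmployeeID, LastName, and FirstName. These properties correspond to the Employee ID, Last Name, and First Name columns in the Employees table in the Northwind database. For this demonstration, the Employee class also defines the Random method, which creates an Employee object that has random values for its properties.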

Defining Employee Database Operations

Add the InsertEmployees, GetEmployeeCount, and GetEmployeeID methods to the Program class.

// Adds new employee records to the database.
static void InsertEmployees(Employee[] employees, string connectionString)
{
   using (SqlCeConnection connection =
      new SqlCeConnection(connectionString))
   {
      try
      {
         // Create the SQL command.
         SqlCeCommand command = new SqlCeCommand(
            "INSERT INTO Employees ([Last Name], [First Name])" +
            "VALUES (@lastName, @firstName)",
            connection);

         connection.Open();
         for (int i = 0; i < employees.Length; i++)
         {
            // Set parameters.
            command.Parameters.Clear();
            command.Parameters.Add("@lastName", employees[i].LastName);
            command.Parameters.Add("@firstName", employees[i].FirstName);

            // Execute the command.
            command.ExecuteNonQuery();
         }
      }
      finally
      {
         connection.Close();
      }
   }
}

// Retrieves the number of entries in the Employees table in
// the Northwind database.
static int GetEmployeeCount(string connectionString)
{
   int result = 0;
   using (SqlCeConnection sqlConnection =
      new SqlCeConnection(connectionString))
   {
      SqlCeCommand sqlCommand = new SqlCeCommand(
         "SELECT COUNT(*) FROM Employees", sqlConnection);

      sqlConnection.Open();
      try
      {
         result = (int)sqlCommand.ExecuteScalar();
      }
      finally
      {
         sqlConnection.Close();
      }
   }
   return result;
}

// Retrieves the ID of the first employee that has the provided name.
static int GetEmployeeID(string lastName, string firstName,
   string connectionString)
{
   using (SqlCeConnection connection =
      new SqlCeConnection(connectionString))
   {
      SqlCeCommand command = new SqlCeCommand(
         string.Format(
            "SELECT [Employee ID] FROM Employees " +
            "WHERE [Last Name] = '{0}' AND [First Name] = '{1}'",
            lastName, firstName),
         connection);

      connection.Open();
      try
      {
         return (int)command.ExecuteScalar();
      }
      finally
      {
         connection.Close();
      }
   }
}

' Adds new employee records to the database.
Private Shared Sub InsertEmployees(ByVal employees() As Employee, ByVal connectionString As String)
    Using connection As New SqlCeConnection(connectionString)
        Try
            ' Create the SQL command.
            Dim command As New SqlCeCommand("INSERT INTO Employees ([Last Name], [First Name])" & "VALUES (@lastName, @firstName)", connection)

            connection.Open()
            For i As Integer = 0 To employees.Length - 1
                ' Set parameters.
                command.Parameters.Clear()
                command.Parameters.Add("@lastName", employees(i).LastName)
                command.Parameters.Add("@firstName", employees(i).FirstName)

                ' Execute the command.
                command.ExecuteNonQuery()
            Next i
        Finally
            connection.Close()
        End Try
    End Using
End Sub

' Retrieves the number of entries in the Employees table in 
' the Northwind database.
Private Shared Function GetEmployeeCount(ByVal connectionString As String) As Integer
    Dim result As Integer = 0
    Using sqlConnection As New SqlCeConnection(connectionString)
        Dim sqlCommand As New SqlCeCommand("SELECT COUNT(*) FROM Employees", sqlConnection)

        sqlConnection.Open()
        Try
            result = CInt(Fix(sqlCommand.ExecuteScalar()))
        Finally
            sqlConnection.Close()
        End Try
    End Using
    Return result
End Function

' Retrieves the ID of the first employee that has the provided name.
Private Shared Function GetEmployeeID(ByVal lastName As String, ByVal firstName As String, ByVal connectionString As String) As Integer
    Using connection As New SqlCeConnection(connectionString)
        Dim command As New SqlCeCommand(String.Format("SELECT [Employee ID] FROM Employees " & "WHERE [Last Name] = '{0}' AND [First Name] = '{1}'", lastName, firstName), connection)

        connection.Open()
        Try
            Return CInt(Fix(command.ExecuteScalar()))
        Finally
            connection.Close()
        End Try
    End Using
End Function

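The InsertEmployees method adds new employee records to the database. The GetEmployeeCount method retrieves the number of entries in the Employees table. The GetEmployeeID method retrieves the identifier of the first employee that has the provided name. Each of these methods takes a connection string to the Northwind database and uses functionality in the System.Data.SqlServerCe namespace to communicate with the database.

Adding Employee Data to the Database Without Using Buffering

Add the AddEmployees and PostRandomEmployees methods to the Program class.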

// Posts random Employee data to the provided target block.
static void PostRandomEmployees(ITargetBlock<Employee> target, int count)
{
   Console.WriteLine("Adding {0} entries to Employee table...", count);

   for (int i = 0; i < count; i++)
   {
      target.Post(Employee.Random());
   }
}

// Adds random employee data to the database by using dataflow.
static void AddEmployees(string connectionString, int count)
{
   // Create an ActionBlock<Employee> object that adds a single
   // employee entry to the database.
   var insertEmployee = new ActionBlock<Employee>(e =>
      InsertEmployees(new Employee[] { e }, connectionString));

   // Post several random Employee objects to the dataflow block.
   PostRandomEmployees(insertEmployee, count);

   // Set the dataflow block to the completed state and wait for
   // all insert operations to complete.
   insertEmployee.Complete();
   insertEmployee.Completion.Wait();
}

' Posts random Employee data to the provided target block.
Private Shared Sub PostRandomEmployees(ByVal target As ITargetBlock(Of Employee), ByVal count As Integer)
    Console.WriteLine("Adding {0} entries to Employee table...", count)

    For i As Integer = 0 To count - 1
        target.Post(Employee.Random())
    Next i
End Sub

' Adds random employee data to the database by using dataflow.
Private Shared Sub AddEmployees(ByVal connectionString As String, ByVal count As Integer)
    ' Create an ActionBlock<Employee> object that adds a single
    ' employee entry to the database.
    Dim insertEmployee = New ActionBlock(Of Employee)(Sub(e) InsertEmployees(New Employee() {e}, connectionString))

    ' Post several random Employee objects to the dataflow block.
    PostRandomEmployees(insertEmployee, count)

    ' Set the dataflow block to the completed state and wait for 
    ' all insert operations to complete.
    insertEmployee.Complete()
    insertEmployee.Completion.Wait()
End Sub

The AddEmployees method adds random employee data to the database by using dataflow. It creates an ActionBlock<TInput> object that calls the InsertEmployees method to add an employee entry to the database. The AddEmployees method then calls the PostRandomEmployees method to post multiple Employee objects to the ActionBlock<TInput> object. Finally, the AddEmployees method waits for all insert operations to finish.

Using Buffering to Add Employee Data to the Database

Add the AddEmployeesBatched method to the Program class.

// Adds random employee data to the database by using dataflow.
// This method is similar to AddEmployees except that it uses batching
// to add multiple employees to the database at a time.
static void AddEmployeesBatched(string connectionString, int batchSize,
   int count)
{
   // Create a BatchBlock<Employee> that holds several Employee objects and
   // then propagates them out as an array.
   var batchEmployees = new BatchBlock<Employee>(batchSize);

   // Create an ActionBlock<Employee[]> object that adds multiple
   // employee entries to the database.
   var insertEmployees = new ActionBlock<Employee[]>(a =>
      InsertEmployees(a, connectionString));

   // Link the batch block to the action block.
   batchEmployees.LinkTo(insertEmployees);

   // When the batch block completes, set the action block also to complete.
   batchEmployees.Completion.ContinueWith(delegate { insertEmployees.Complete(); });

   // Post several random Employee objects to the batch block.
   PostRandomEmployees(batchEmployees, count);

   // Set the batch block to the completed state and wait for
   // all insert operations to complete.
   batchEmployees.Complete();
   insertEmployees.Completion.Wait();
}

' Adds random employee data to the database by using dataflow.
' This method is similar to AddEmployees except that it uses batching
' to add multiple employees to the database at a time.
Private Shared Sub AddEmployeesBatched(ByVal connectionString As String, ByVal batchSize As Integer, ByVal count As Integer)
    ' Create a BatchBlock<Employee> that holds several Employee objects and
    ' then propagates them out as an array.
    Dim batchEmployees = New BatchBlock(Of Employee)(batchSize)

    ' Create an ActionBlock<Employee[]> object that adds multiple
    ' employee entries to the database.
    Dim insertEmployees = New ActionBlock(Of Employee())(Sub(a) Program.InsertEmployees(a, connectionString))

    ' Link the batch block to the action block.
    batchEmployees.LinkTo(insertEmployees)

    ' When the batch block completes, set the action block also to complete.
    batchEmployees.Completion.ContinueWith(Sub() insertEmployees.Complete())

    ' Post several random Employee objects to the batch block.
    PostRandomEmployees(batchEmployees, count)

    ' Set the batch block to the completed state and wait for 
    ' all insert operations to complete.
    batchEmployees.Complete()
    insertEmployees.Completion.Wait()
End Sub

This method resembles AddEmployees, except that it uses the BatchBlock<T> class to buffer multiple Employee objects before it sends those objects to the ActionBlock<TInput> object. Because the BatchBlock<T> class propagates out multiple elements as one collection, the ActionBlock<TInput> object is modified to act on an array of Employee objects. As in the AddEmployees method, AddEmployeesBatched calls the PostRandomEmployees method to post multiple Employee objects. However, AddEmployeesBatched posts these objects to the BatchBlock<T> object. The AddEmployeesBatched method also waits for all insert operations to finish.
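The walkthrough code forwards completion from the batch block to the action block with a ContinueWith continuation. As an alternative, and not what the walkthrough itself does, the link can carry completion through the PropagateCompletion property of DataflowLinkOptions, which also forwards a fault. The sketch below assumes a database-free pipeline of integers. PropagateCompletionSketch, CountBatches, and FaultReachesTarget are hypothetical names.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper class, used only for illustration.
static class PropagateCompletionSketch
{
    // Sends count integers through a BatchBlock<int> into an
    // ActionBlock<int[]> and returns the number of batches processed.
    public static int CountBatches(int count, int batchSize)
    {
        int batches = 0;
        var batcher = new BatchBlock<int>(batchSize);
        var counter = new ActionBlock<int[]>(batch => batches++);

        // PropagateCompletion forwards both normal completion and faults
        // from the batch block to the action block.
        batcher.LinkTo(counter,
            new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < count; i++)
        {
            batcher.Post(i);
        }

        batcher.Complete();
        counter.Completion.Wait();
        return batches;
    }

    // Faults the batch block and reports whether the linked action block
    // also ends in the faulted state.
    public static bool FaultReachesTarget()
    {
        var batcher = new BatchBlock<int>(4);
        var sink = new ActionBlock<int[]>(batch => { });
        batcher.LinkTo(sink,
            new DataflowLinkOptions { PropagateCompletion = true });

        // Fault is an explicit interface member, so a cast is required.
        ((IDataflowBlock)batcher).Fault(
            new InvalidOperationException("simulated failure"));

        try
        {
            sink.Completion.Wait();
        }
        catch (AggregateException)
        {
            // Expected here: the fault was forwarded to the action block.
        }
        return sink.Completion.IsFaulted;
    }
}
```

With this option the caller that waits on the last block's Completion task observes an upstream failure as an exception. A continuation that only calls Complete, by contrast, lets the downstream block finish normally.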

Using Buffered Join to Read Employee Data from the Database

Add the GetRandomEmployees method to the Program class.

// Displays information about several random employees to the console.
static void GetRandomEmployees(string connectionString, int batchSize,
   int count)
{
   // Create a BatchedJoinBlock<Employee, Exception> object that holds
   // both employee and exception data.
   var selectEmployees = new BatchedJoinBlock<Employee, Exception>(batchSize);

   // Holds the total number of exceptions that occurred.
   int totalErrors = 0;

   // Create an action block that prints employee and error information
   // to the console.
   var printEmployees =
      new ActionBlock<Tuple<IList<Employee>, IList<Exception>>>(data =>
      {
         // Print information about the employees in this batch.
         Console.WriteLine("Received a batch...");
         foreach (Employee e in data.Item1)
         {
            Console.WriteLine("Last={0} First={1} ID={2}",
               e.LastName, e.FirstName, e.EmployeeID);
         }

         // Print the error count for this batch.
         Console.WriteLine("There were {0} errors in this batch...",
            data.Item2.Count);

         // Update total error count.
         totalErrors += data.Item2.Count;
      });

   // Link the batched join block to the action block.
   selectEmployees.LinkTo(printEmployees);

   // When the batched join block completes, set the action block also to complete.
   selectEmployees.Completion.ContinueWith(delegate { printEmployees.Complete(); });

   // Try to retrieve the ID for several random employees.
   Console.WriteLine("Selecting random entries from Employees table...");
   for (int i = 0; i < count; i++)
   {
      try
      {
         // Create a random employee.
         Employee e = Employee.Random();

         // Try to retrieve the ID for the employee from the database.
         e.EmployeeID = GetEmployeeID(e.LastName, e.FirstName, connectionString);

         // Post the Employee object to the Employee target of
         // the batched join block.
         selectEmployees.Target1.Post(e);
      }
      catch (NullReferenceException e)
      {
         // GetEmployeeID throws NullReferenceException when there is
         // no such employee with the given name. When this happens,
         // post the Exception object to the Exception target of
         // the batched join block.
         selectEmployees.Target2.Post(e);
      }
   }

   // Set the batched join block to the completed state and wait for
   // all retrieval operations to complete.
   selectEmployees.Complete();
   printEmployees.Completion.Wait();

   // Print the total error count.
   Console.WriteLine("Finished. There were {0} total errors.", totalErrors);
}

' Displays information about several random employees to the console.
Private Shared Sub GetRandomEmployees(ByVal connectionString As String, ByVal batchSize As Integer, ByVal count As Integer)
    ' Create a BatchedJoinBlock<Employee, Exception> object that holds
    ' both employee and exception data.
    Dim selectEmployees = New BatchedJoinBlock(Of Employee, Exception)(batchSize)

    ' Holds the total number of exceptions that occurred.
    Dim totalErrors As Integer = 0

    ' Create an action block that prints employee and error information
    ' to the console.
    Dim printEmployees = New ActionBlock(Of Tuple(Of IList(Of Employee), IList(Of Exception)))(
        Sub(data)
            ' Print information about the employees in this batch.
            Console.WriteLine("Received a batch...")
            For Each e As Employee In data.Item1
                Console.WriteLine("Last={0} First={1} ID={2}", e.LastName, e.FirstName, e.EmployeeID)
            Next e

            ' Print the error count for this batch.
            Console.WriteLine("There were {0} errors in this batch...", data.Item2.Count)

            ' Update total error count.
            totalErrors += data.Item2.Count
        End Sub)

    ' Link the batched join block to the action block.
    selectEmployees.LinkTo(printEmployees)

    ' When the batched join block completes, set the action block also to complete.
    selectEmployees.Completion.ContinueWith(Sub() printEmployees.Complete())

    ' Try to retrieve the ID for several random employees.
    Console.WriteLine("Selecting random entries from Employees table...")
    For i As Integer = 0 To count - 1
        Try
            ' Create a random employee.
            Dim e As Employee = Employee.Random()

            ' Try to retrieve the ID for the employee from the database.
            e.EmployeeID = GetEmployeeID(e.LastName, e.FirstName, connectionString)

            ' Post the Employee object to the Employee target of 
            ' the batched join block.
            selectEmployees.Target1.Post(e)
        Catch e As NullReferenceException
            ' GetEmployeeID throws NullReferenceException when there is 
            ' no such employee with the given name. When this happens,
            ' post the Exception object to the Exception target of
            ' the batched join block.
            selectEmployees.Target2.Post(e)
        End Try
    Next i

    ' Set the batched join block to the completed state and wait for 
    ' all retrieval operations to complete.
    selectEmployees.Complete()
    printEmployees.Completion.Wait()

    ' Print the total error count.
    Console.WriteLine("Finished. There were {0} total errors.", totalErrors)
End Sub

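This method prints information about random employees to the console. It creates several random Employee objects and calls the GetEmployeeID method to retrieve the unique identifier for each object. Because the GetEmployeeID method throws an exception if no employee matches the given first and last names, the GetRandomEmployees method uses the BatchedJoinBlock<T1,T2> class to store Employee objects for successful calls to GetEmployeeID and System.Exception objects for calls that fail. The ActionBlock<TInput> object in this example acts on a Tuple<T1,T2> object that holds a list of Employee objects and a list of Exception objects. The BatchedJoinBlock<T1,T2> object propagates out this data when the sum of the received Employee and Exception object counts equals the batch size.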
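The way a batched join counts items across both of its targets can be sketched without a database. The following example is an illustration under stated assumptions rather than part of the walkthrough: it parses strings instead of querying employees, and the BatchedJoinSketch class and ParseInBatches method are hypothetical names. Parsed values go to Target1, FormatException objects go to Target2, and each output tuple is summarized as a pair of counts.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper class, used only for illustration.
static class BatchedJoinSketch
{
    // Parses each input string. Successful results are posted to Target1
    // and FormatException objects to Target2. Returns one
    // (Results, Errors) count pair per batch, in order.
    public static List<(int Results, int Errors)> ParseInBatches(
        string[] inputs, int batchSize)
    {
        var summary = new List<(int Results, int Errors)>();
        var join = new BatchedJoinBlock<int, Exception>(batchSize);

        // Each message holds the results and the errors of one batch.
        var summarize = new ActionBlock<Tuple<IList<int>, IList<Exception>>>(
            batch => summary.Add((batch.Item1.Count, batch.Item2.Count)));

        join.LinkTo(summarize);
        join.Completion.ContinueWith(_ => summarize.Complete());

        foreach (string input in inputs)
        {
            try
            {
                join.Target1.Post(int.Parse(input));
            }
            catch (FormatException e)
            {
                // The input is not a number, so record the failure instead.
                join.Target2.Post(e);
            }
        }

        // Completing the block releases any partially filled batch.
        join.Complete();
        summarize.Completion.Wait();
        return summary;
    }
}
```

For the inputs "1", "x", "3", "y", "5", "6" and a batch size of 4, the first batch is released after four items in total (two results and two errors). The remaining two results are released as a smaller batch when the block completes.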

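The Complete Example

The following example shows the complete code. The Main method compares the time that is required to perform batched database insertions with the time that is required to perform non-batched database insertions. It also demonstrates the use of buffered join to read employee data from the database and to report errors.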

using System;
using System.Collections.Generic;
using System.Data.SqlServerCe;
using System.Diagnostics;
using System.IO;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to use batched dataflow blocks to improve
// the performance of database operations.
namespace DataflowBatchDatabase
{
   class Program
   {
      // The number of employees to add to the database.
      // TODO: Change this value to experiment with different numbers of
      // employees to insert into the database.
      static readonly int insertCount = 256;

      // The size of a single batch of employees to add to the database.
      // TODO: Change this value to experiment with different batch sizes.
      static readonly int insertBatchSize = 96;

      // The source database file.
      // TODO: Change this value if Northwind.sdf is at a different location
      // on your computer.
      static readonly string sourceDatabase =
         @"C:\Program Files\Microsoft SQL Server Compact Edition\v3.5\Samples\Northwind.sdf";

      // TODO: Change this value if you require a different temporary location.
      static readonly string scratchDatabase =
         @"C:\Temp\Northwind.sdf";

      // Describes an employee. Each property maps to a
      // column in the Employees table in the Northwind database.
      // For brevity, the Employee class does not contain
      // all columns from the Employees table.
      class Employee
      {
         public int EmployeeID { get; set; }
         public string LastName { get; set; }
         public string FirstName { get; set; }

         // A random number generator that helps to generate
         // Employee property values.
         static Random rand = new Random(42);

         // Possible random first names.
         static readonly string[] firstNames = { "Tom", "Mike", "Ruth", "Bob", "John" };
         // Possible random last names.
         static readonly string[] lastNames = { "Jones", "Smith", "Johnson", "Walker" };

         // Creates an Employee object that contains random
         // property values.
         public static Employee Random()
         {
            return new Employee
            {
               EmployeeID = -1,
               LastName = lastNames[rand.Next() % lastNames.Length],
               FirstName = firstNames[rand.Next() % firstNames.Length]
            };
         }
      }

      // Adds new employee records to the database.
      static void InsertEmployees(Employee[] employees, string connectionString)
      {
         using (SqlCeConnection connection =
            new SqlCeConnection(connectionString))
         {
            try
            {
               // Create the SQL command.
               SqlCeCommand command = new SqlCeCommand(
                  "INSERT INTO Employees ([Last Name], [First Name])" +
                  "VALUES (@lastName, @firstName)",
                  connection);

               connection.Open();
               for (int i = 0; i < employees.Length; i++)
               {
                  // Set parameters.
                  command.Parameters.Clear();
                  command.Parameters.Add("@lastName", employees[i].LastName);
                  command.Parameters.Add("@firstName", employees[i].FirstName);

                  // Execute the command.
                  command.ExecuteNonQuery();
               }
            }
            finally
            {
               connection.Close();
            }
         }
      }

      // Retrieves the number of entries in the Employees table in
      // the Northwind database.
      static int GetEmployeeCount(string connectionString)
      {
         int result = 0;
         using (SqlCeConnection sqlConnection =
            new SqlCeConnection(connectionString))
         {
            SqlCeCommand sqlCommand = new SqlCeCommand(
               "SELECT COUNT(*) FROM Employees", sqlConnection);

            sqlConnection.Open();
            try
            {
               result = (int)sqlCommand.ExecuteScalar();
            }
            finally
            {
               sqlConnection.Close();
            }
         }
         return result;
      }

      // Retrieves the ID of the first employee that has the provided name.
      static int GetEmployeeID(string lastName, string firstName,
         string connectionString)
      {
         using (SqlCeConnection connection =
            new SqlCeConnection(connectionString))
         {
            SqlCeCommand command = new SqlCeCommand(
               string.Format(
                  "SELECT [Employee ID] FROM Employees " +
                  "WHERE [Last Name] = '{0}' AND [First Name] = '{1}'",
                  lastName, firstName),
               connection);

            connection.Open();
            try
            {
               return (int)command.ExecuteScalar();
            }
            finally
            {
               connection.Close();
            }
         }
      }

      // Posts random Employee data to the provided target block.
      static void PostRandomEmployees(ITargetBlock<Employee> target, int count)
      {
         Console.WriteLine("Adding {0} entries to Employee table...", count);

         for (int i = 0; i < count; i++)
         {
            target.Post(Employee.Random());
         }
      }

      // Adds random employee data to the database by using dataflow.
      static void AddEmployees(string connectionString, int count)
      {
         // Create an ActionBlock<Employee> object that adds a single
         // employee entry to the database.
         var insertEmployee = new ActionBlock<Employee>(e =>
            InsertEmployees(new Employee[] { e }, connectionString));

         // Post several random Employee objects to the dataflow block.
         PostRandomEmployees(insertEmployee, count);

         // Set the dataflow block to the completed state and wait for
         // all insert operations to complete.
         insertEmployee.Complete();
         insertEmployee.Completion.Wait();
      }

      // Adds random employee data to the database by using dataflow.
      // This method is similar to AddEmployees except that it uses batching
      // to add multiple employees to the database at a time.
      static void AddEmployeesBatched(string connectionString, int batchSize,
         int count)
      {
         // Create a BatchBlock<Employee> that holds several Employee objects and
         // then propagates them out as an array.
         var batchEmployees = new BatchBlock<Employee>(batchSize);

         // Create an ActionBlock<Employee[]> object that adds multiple
         // employee entries to the database.
         var insertEmployees = new ActionBlock<Employee[]>(a =>
            InsertEmployees(a, connectionString));

         // Link the batch block to the action block.
         batchEmployees.LinkTo(insertEmployees);

         // When the batch block completes, set the action block also to complete.
         batchEmployees.Completion.ContinueWith(delegate { insertEmployees.Complete(); });

         // Post several random Employee objects to the batch block.
         PostRandomEmployees(batchEmployees, count);

         // Set the batch block to the completed state and wait for
         // all insert operations to complete.
         batchEmployees.Complete();
         insertEmployees.Completion.Wait();
      }

      // Displays information about several random employees to the console.
      static void GetRandomEmployees(string connectionString, int batchSize,
         int count)
      {
         // Create a BatchedJoinBlock<Employee, Exception> object that holds
         // both employee and exception data.
         var selectEmployees = new BatchedJoinBlock<Employee, Exception>(batchSize);

         // Holds the total number of exceptions that occurred.
         int totalErrors = 0;

         // Create an action block that prints employee and error information
         // to the console.
         var printEmployees =
            new ActionBlock<Tuple<IList<Employee>, IList<Exception>>>(data =>
            {
               // Print information about the employees in this batch.
               Console.WriteLine("Received a batch...");
               foreach (Employee e in data.Item1)
               {
                  Console.WriteLine("Last={0} First={1} ID={2}",
                     e.LastName, e.FirstName, e.EmployeeID);
               }

               // Print the error count for this batch.
               Console.WriteLine("There were {0} errors in this batch...",
                  data.Item2.Count);

               // Update total error count.
               totalErrors += data.Item2.Count;
            });

         // Link the batched join block to the action block.
         selectEmployees.LinkTo(printEmployees);

         // When the batched join block completes, set the action block also to complete.
         selectEmployees.Completion.ContinueWith(delegate { printEmployees.Complete(); });

         // Try to retrieve the ID for several random employees.
         Console.WriteLine("Selecting random entries from Employees table...");
         for (int i = 0; i < count; i++)
         {
            try
            {
               // Create a random employee.
               Employee e = Employee.Random();

               // Try to retrieve the ID for the employee from the database.
               e.EmployeeID = GetEmployeeID(e.LastName, e.FirstName, connectionString);

               // Post the Employee object to the Employee target of
               // the batched join block.
               selectEmployees.Target1.Post(e);
            }
            catch (NullReferenceException e)
            {
               // GetEmployeeID throws NullReferenceException when there is
               // no such employee with the given name. When this happens,
               // post the Exception object to the Exception target of
               // the batched join block.
               selectEmployees.Target2.Post(e);
            }
         }

         // Set the batched join block to the completed state and wait for
         // all retrieval operations to complete.
         selectEmployees.Complete();
         printEmployees.Completion.Wait();

         // Print the total error count.
         Console.WriteLine("Finished. There were {0} total errors.", totalErrors);
      }

      static void Main(string[] args)
      {
         // Create a connection string for accessing the database.
         // The connection string refers to the temporary database location.
         string connectionString = string.Format(@"Data Source={0}",
            scratchDatabase);

         // Create a Stopwatch object to time database insert operations.
         Stopwatch stopwatch = new Stopwatch();

         // Start with a clean database file by copying the source database to
         // the temporary location.
         File.Copy(sourceDatabase, scratchDatabase, true);

         // Demonstrate multiple insert operations without batching.
         Console.WriteLine("Demonstrating non-batched database insert operations...");
         Console.WriteLine("Original size of Employee table: {0}.",
            GetEmployeeCount(connectionString));
         stopwatch.Start();
         AddEmployees(connectionString, insertCount);
         stopwatch.Stop();
         Console.WriteLine("New size of Employee table: {0}; elapsed insert time: {1} ms.",
            GetEmployeeCount(connectionString), stopwatch.ElapsedMilliseconds);

         Console.WriteLine();

         // Start again with a clean database file.
         File.Copy(sourceDatabase, scratchDatabase, true);

         // Demonstrate multiple insert operations, this time with batching.
         Console.WriteLine("Demonstrating batched database insert operations...");
         Console.WriteLine("Original size of Employee table: {0}.",
            GetEmployeeCount(connectionString));
         stopwatch.Restart();
         AddEmployeesBatched(connectionString, insertBatchSize, insertCount);
         stopwatch.Stop();
         Console.WriteLine("New size of Employee table: {0}; elapsed insert time: {1} ms.",
            GetEmployeeCount(connectionString), stopwatch.ElapsedMilliseconds);

         Console.WriteLine();

         // Start again with a clean database file.
         File.Copy(sourceDatabase, scratchDatabase, true);

         // Demonstrate multiple retrieval operations with error reporting.
         Console.WriteLine("Demonstrating batched join database select operations...");
         // Add a small number of employees to the database.
         AddEmployeesBatched(connectionString, insertBatchSize, 16);
         // Query for random employees.
         GetRandomEmployees(connectionString, insertBatchSize, 10);
      }
   }
}
/* Sample output:
Demonstrating non-batched database insert operations...
Original size of Employee table: 15.
Adding 256 entries to Employee table...
New size of Employee table: 271; elapsed insert time: 11035 ms.

Demonstrating batched database insert operations...
Original size of Employee table: 15.
Adding 256 entries to Employee table...
New size of Employee table: 271; elapsed insert time: 197 ms.

Demonstrating batched join database select operations...
Adding 16 entries to Employee table...
Selecting random entries from Employees table...
Received a batch...
Last=Jones First=Tom ID=21
Last=Jones First=John ID=24
Last=Smith First=Tom ID=26
Last=Jones First=Tom ID=21
There were 4 errors in this batch...
Received a batch...
Last=Smith First=Tom ID=26
Last=Jones First=Mike ID=28
There were 0 errors in this batch...
Finished. There were 4 total errors.
*/
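
The effect of the batch size can be seen without a database. The following is a minimal sketch, not part of the original sample: it posts 256 integers to a BatchBlock<int> with a batch size of 96 (the sample's default values) and records the size of each batch it produces. The names BatchSizeSketch and GetBatchSizes are hypothetical. The sketch links the blocks with DataflowLinkOptions.PropagateCompletion instead of the ContinueWith call used in the sample; that option also forwards faults, so treat it as an alternative rather than an exact equivalent.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper class, for illustration only.
static class BatchSizeSketch
{
    // Posts the integers 0..count-1 to a BatchBlock<int> and returns the
    // size of each batch that the block produces, in order.
    public static List<int> GetBatchSizes(int batchSize, int count)
    {
        var sizes = new List<int>();
        var batchBlock = new BatchBlock<int>(batchSize);

        // An ActionBlock processes one message at a time by default,
        // so the list is never modified concurrently.
        var recordSizes = new ActionBlock<int[]>(batch => sizes.Add(batch.Length));

        // Assumption: PropagateCompletion stands in for the ContinueWith
        // call used in the sample. It completes the action block when the
        // batch block completes.
        batchBlock.LinkTo(recordSizes,
            new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < count; i++)
        {
            batchBlock.Post(i);
        }

        // Completing the batch block flushes any remaining partial batch.
        batchBlock.Complete();
        recordSizes.Completion.Wait();
        return sizes;
    }

    static void Main()
    {
        // 256 items in batches of 96, as in the sample's default values.
        Console.WriteLine(string.Join(", ", GetBatchSizes(96, 256)));
    }
}
```

Running this sketch should print 96, 96, 64: completing the batch block releases the 64 remaining items as a final, smaller batch. With these values, the batched sample therefore calls InsertEmployees three times, whereas the non-batched version calls it once per employee.
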
Imports System.Collections.Generic
Imports System.Data.SqlServerCe
Imports System.Diagnostics
Imports System.IO
Imports System.Threading.Tasks.Dataflow


' Demonstrates how to use batched dataflow blocks to improve
' the performance of database operations.
Namespace DataflowBatchDatabase
    Friend Class Program
        ' The number of employees to add to the database.
        ' TODO: Change this value to experiment with different numbers of 
        ' employees to insert into the database.
        Private Shared ReadOnly insertCount As Integer = 256

        ' The size of a single batch of employees to add to the database.
        ' TODO: Change this value to experiment with different batch sizes.
        Private Shared ReadOnly insertBatchSize As Integer = 96

        ' The source database file.
        ' TODO: Change this value if Northwind.sdf is at a different location
        ' on your computer.
        Private Shared ReadOnly sourceDatabase As String = "C:\Program Files\Microsoft SQL Server Compact Edition\v3.5\Samples\Northwind.sdf"

        ' TODO: Change this value if you require a different temporary location.
        Private Shared ReadOnly scratchDatabase As String = "C:\Temp\Northwind.sdf"

        ' Describes an employee. Each property maps to a 
        ' column in the Employees table in the Northwind database.
        ' For brevity, the Employee class does not contain
        ' all columns from the Employees table.
        Private Class Employee
            Public Property EmployeeID() As Integer
            Public Property LastName() As String
            Public Property FirstName() As String

            ' A random number generator that helps to generate
            ' Employee property values.
            Private Shared rand As New Random(42)

            ' Possible random first names.
            Private Shared ReadOnly firstNames() As String = {"Tom", "Mike", "Ruth", "Bob", "John"}
            ' Possible random last names.
            Private Shared ReadOnly lastNames() As String = {"Jones", "Smith", "Johnson", "Walker"}

            ' Creates an Employee object that contains random 
            ' property values.
            Public Shared Function Random() As Employee
                Return New Employee With {
                    .EmployeeID = -1,
                    .LastName = lastNames(rand.Next() Mod lastNames.Length),
                    .FirstName = firstNames(rand.Next() Mod firstNames.Length)
                }
            End Function
        End Class

        ' Adds new employee records to the database.
        Private Shared Sub InsertEmployees(ByVal employees() As Employee, ByVal connectionString As String)
            Using connection As New SqlCeConnection(connectionString)
                Try
                    ' Create the SQL command.
                    Dim command As New SqlCeCommand(
                        "INSERT INTO Employees ([Last Name], [First Name]) " &
                        "VALUES (@lastName, @firstName)", connection)

                    connection.Open()
                    For i As Integer = 0 To employees.Length - 1
                        ' Set parameters.
                        command.Parameters.Clear()
                        command.Parameters.Add("@lastName", employees(i).LastName)
                        command.Parameters.Add("@firstName", employees(i).FirstName)

                        ' Execute the command.
                        command.ExecuteNonQuery()
                    Next i
                Finally
                    connection.Close()
                End Try
            End Using
        End Sub

        ' Retrieves the number of entries in the Employees table in 
        ' the Northwind database.
        Private Shared Function GetEmployeeCount(ByVal connectionString As String) As Integer
            Dim result As Integer = 0
            Using sqlConnection As New SqlCeConnection(connectionString)
                Dim sqlCommand As New SqlCeCommand("SELECT COUNT(*) FROM Employees", sqlConnection)

                sqlConnection.Open()
                Try
                    result = CInt(sqlCommand.ExecuteScalar())
                Finally
                    sqlConnection.Close()
                End Try
            End Using
            Return result
        End Function

        ' Retrieves the ID of the first employee that has the provided name.
        Private Shared Function GetEmployeeID(ByVal lastName As String, ByVal firstName As String, ByVal connectionString As String) As Integer
            Using connection As New SqlCeConnection(connectionString)
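                Dim command As New SqlCeCommand(
                    String.Format("SELECT [Employee ID] FROM Employees " &
                                  "WHERE [Last Name] = '{0}' AND [First Name] = '{1}'",
                                  lastName, firstName), connection)

                connection.Open()
                Try
                    ' ExecuteScalar returns Nothing when no row matches. DirectCast
                    ' then throws NullReferenceException, which the caller handles.
                    Return DirectCast(command.ExecuteScalar(), Integer)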
                Finally
                    connection.Close()
                End Try
            End Using
        End Function

        ' Posts random Employee data to the provided target block.
        Private Shared Sub PostRandomEmployees(ByVal target As ITargetBlock(Of Employee), ByVal count As Integer)
            Console.WriteLine("Adding {0} entries to Employee table...", count)

            For i As Integer = 0 To count - 1
                target.Post(Employee.Random())
            Next i
        End Sub

        ' Adds random employee data to the database by using dataflow.
        Private Shared Sub AddEmployees(ByVal connectionString As String, ByVal count As Integer)
            ' Create an ActionBlock(Of Employee) object that adds a single
            ' employee entry to the database.
            Dim insertEmployee = New ActionBlock(Of Employee)(Sub(e) InsertEmployees(New Employee() {e}, connectionString))

            ' Post several random Employee objects to the dataflow block.
            PostRandomEmployees(insertEmployee, count)

            ' Set the dataflow block to the completed state and wait for 
            ' all insert operations to complete.
            insertEmployee.Complete()
            insertEmployee.Completion.Wait()
        End Sub

        ' Adds random employee data to the database by using dataflow.
        ' This method is similar to AddEmployees except that it uses batching
        ' to add multiple employees to the database at a time.
        Private Shared Sub AddEmployeesBatched(ByVal connectionString As String, ByVal batchSize As Integer, ByVal count As Integer)
            ' Create a BatchBlock(Of Employee) that holds several Employee objects and
            ' then propagates them out as an array.
            Dim batchEmployees = New BatchBlock(Of Employee)(batchSize)

            ' Create an ActionBlock(Of Employee()) object that adds multiple
            ' employee entries to the database.
            Dim insertEmployees = New ActionBlock(Of Employee())(Sub(a) Program.InsertEmployees(a, connectionString))

            ' Link the batch block to the action block.
            batchEmployees.LinkTo(insertEmployees)

            ' When the batch block completes, set the action block also to complete.
            batchEmployees.Completion.ContinueWith(Sub() insertEmployees.Complete())

            ' Post several random Employee objects to the batch block.
            PostRandomEmployees(batchEmployees, count)

            ' Set the batch block to the completed state and wait for 
            ' all insert operations to complete.
            batchEmployees.Complete()
            insertEmployees.Completion.Wait()
        End Sub

        ' Displays information about several random employees to the console.
        Private Shared Sub GetRandomEmployees(ByVal connectionString As String, ByVal batchSize As Integer, ByVal count As Integer)
            ' Create a BatchedJoinBlock(Of Employee, Exception) object that holds
            ' both employee and exception data.
            Dim selectEmployees = New BatchedJoinBlock(Of Employee, Exception)(batchSize)

            ' Holds the total number of exceptions that occurred.
            Dim totalErrors As Integer = 0

            ' Create an action block that prints employee and error information
            ' to the console.
            Dim printEmployees = New ActionBlock(Of Tuple(Of IList(Of Employee), IList(Of Exception)))(
                Sub(data)
                    ' Print information about the employees in this batch.
                    Console.WriteLine("Received a batch...")
                    For Each e As Employee In data.Item1
                        Console.WriteLine("Last={0} First={1} ID={2}", e.LastName, e.FirstName, e.EmployeeID)
                    Next e

                    ' Print the error count for this batch.
                    Console.WriteLine("There were {0} errors in this batch...", data.Item2.Count)

                    ' Update total error count.
                    totalErrors += data.Item2.Count
                End Sub)

            ' Link the batched join block to the action block.
            selectEmployees.LinkTo(printEmployees)

            ' When the batched join block completes, set the action block also to complete.
            selectEmployees.Completion.ContinueWith(Sub() printEmployees.Complete())

            ' Try to retrieve the ID for several random employees.
            Console.WriteLine("Selecting random entries from Employees table...")
            For i As Integer = 0 To count - 1
                Try
                    ' Create a random employee.
                    Dim e As Employee = Employee.Random()

                    ' Try to retrieve the ID for the employee from the database.
                    e.EmployeeID = GetEmployeeID(e.LastName, e.FirstName, connectionString)

                    ' Post the Employee object to the Employee target of 
                    ' the batched join block.
                    selectEmployees.Target1.Post(e)
                Catch e As NullReferenceException
                    ' GetEmployeeID throws NullReferenceException when there is 
                    ' no such employee with the given name. When this happens,
                    ' post the Exception object to the Exception target of
                    ' the batched join block.
                    selectEmployees.Target2.Post(e)
                End Try
            Next i

            ' Set the batched join block to the completed state and wait for 
            ' all retrieval operations to complete.
            selectEmployees.Complete()
            printEmployees.Completion.Wait()

            ' Print the total error count.
            Console.WriteLine("Finished. There were {0} total errors.", totalErrors)
        End Sub

        Shared Sub Main(ByVal args() As String)
            ' Create a connection string for accessing the database.
            ' The connection string refers to the temporary database location.
            Dim connectionString As String = String.Format("Data Source={0}", scratchDatabase)

            ' Create a Stopwatch object to time database insert operations.
            Dim stopwatch As New Stopwatch()

            ' Start with a clean database file by copying the source database to 
            ' the temporary location.
            File.Copy(sourceDatabase, scratchDatabase, True)

            ' Demonstrate multiple insert operations without batching.
            Console.WriteLine("Demonstrating non-batched database insert operations...")
            Console.WriteLine("Original size of Employee table: {0}.", GetEmployeeCount(connectionString))
            stopwatch.Start()
            AddEmployees(connectionString, insertCount)
            stopwatch.Stop()
            Console.WriteLine("New size of Employee table: {0}; elapsed insert time: {1} ms.", GetEmployeeCount(connectionString), stopwatch.ElapsedMilliseconds)

            Console.WriteLine()

            ' Start again with a clean database file.
            File.Copy(sourceDatabase, scratchDatabase, True)

            ' Demonstrate multiple insert operations, this time with batching.
            Console.WriteLine("Demonstrating batched database insert operations...")
            Console.WriteLine("Original size of Employee table: {0}.", GetEmployeeCount(connectionString))
            stopwatch.Restart()
            AddEmployeesBatched(connectionString, insertBatchSize, insertCount)
            stopwatch.Stop()
            Console.WriteLine("New size of Employee table: {0}; elapsed insert time: {1} ms.", GetEmployeeCount(connectionString), stopwatch.ElapsedMilliseconds)

            Console.WriteLine()

            ' Start again with a clean database file.
            File.Copy(sourceDatabase, scratchDatabase, True)

            ' Demonstrate multiple retrieval operations with error reporting.
            Console.WriteLine("Demonstrating batched join database select operations...")
            ' Add a small number of employees to the database.
            AddEmployeesBatched(connectionString, insertBatchSize, 16)
            ' Query for random employees.
            GetRandomEmployees(connectionString, insertBatchSize, 10)
        End Sub
    End Class
End Namespace
' Sample output:
'Demonstrating non-batched database insert operations...
'Original size of Employee table: 15.
'Adding 256 entries to Employee table...
'New size of Employee table: 271; elapsed insert time: 11035 ms.
'
'Demonstrating batched database insert operations...
'Original size of Employee table: 15.
'Adding 256 entries to Employee table...
'New size of Employee table: 271; elapsed insert time: 197 ms.
'
'Demonstrating batched join database select operations...
'Adding 16 entries to Employee table...
'Selecting random entries from Employees table...
'Received a batch...
'Last=Jones First=Tom ID=21
'Last=Jones First=John ID=24
'Last=Smith First=Tom ID=26
'Last=Jones First=Tom ID=21
'There were 4 errors in this batch...
'Received a batch...
'Last=Smith First=Tom ID=26
'Last=Jones First=Mike ID=28
'There were 0 errors in this batch...
'Finished. There were 4 total errors.
'
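
The batch size of a BatchedJoinBlock<T1,T2> counts items across both targets together, which is why a single batch in the listings can hold a mix of employees and exceptions. The following is a minimal sketch of that behavior, not part of the original sample. The names BatchedJoinSketch and GetBatchShapes are hypothetical, and the rule that every third item fails is an assumption that stands in for the database lookup.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper class, for illustration only.
static class BatchedJoinSketch
{
    // Posts values and error messages to a BatchedJoinBlock<int, string>
    // and returns (value count, error count) for each batch produced.
    public static List<Tuple<int, int>> GetBatchShapes(int batchSize, int count)
    {
        var shapes = new List<Tuple<int, int>>();
        var join = new BatchedJoinBlock<int, string>(batchSize);

        // Record how many items each batch received from each target.
        var record = new ActionBlock<Tuple<IList<int>, IList<string>>>(data =>
            shapes.Add(Tuple.Create(data.Item1.Count, data.Item2.Count)));

        join.LinkTo(record, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < count; i++)
        {
            if (i % 3 == 2)
            {
                // Assumed failure rule: every third item is reported as an error.
                join.Target2.Post("item " + i + " failed");
            }
            else
            {
                join.Target1.Post(i);
            }
        }

        // Completing the block flushes the remaining items as a final batch.
        join.Complete();
        record.Completion.Wait();
        return shapes;
    }

    static void Main()
    {
        foreach (Tuple<int, int> shape in GetBatchShapes(4, 10))
        {
            Console.WriteLine("{0} values, {1} errors", shape.Item1, shape.Item2);
        }
    }
}
```

With a batch size of 4 and 10 posted items, this sketch should produce three batches: two with 3 values and 1 error each, and a final partial batch with 1 value and 1 error. Each full batch contains four items in total, regardless of how they are split between the two targets.
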

See also