Panduan: Menggunakan BatchBlock dan BatchedJoinBlock untuk Meningkatkan Efisiensi

Pustaka Aliran Data TPL menyediakan System.Threading.Tasks.Dataflow.BatchBlock<T> kelas dan System.Threading.Tasks.Dataflow.BatchedJoinBlock<T1,T2> sehingga Anda dapat menerima dan menyangga data dari satu atau beberapa sumber lalu menyebarluaskan data buffer tersebut sebagai satu koleksi. Mekanisme batching ini berguna saat Anda mengumpulkan data dari satu atau beberapa sumber lalu memproses beberapa elemen data sebagai batch. Misalnya, pertimbangkan aplikasi yang menggunakan aliran data untuk menyisipkan rekaman ke dalam database. Operasi ini bisa lebih efisien jika beberapa item disisipkan pada saat yang sama alih-alih satu per satu secara berurutan. Dokumen ini menjelaskan cara menggunakan BatchBlock<T> kelas untuk meningkatkan efisiensi operasi penyisipan database tersebut. Ini juga menjelaskan cara menggunakan BatchedJoinBlock<T1,T2> kelas untuk mengambil hasil dan pengecualian apa pun yang terjadi ketika program membaca dari database.

Nota

Pustaka aliran data TPL (System.Threading.Tasks.Dataflow namespace) disertakan dalam .NET 6 dan versi yang lebih baru. Untuk proyek .NET Framework dan .NET Standard, Anda perlu menginstal 📦 paket System.Threading.Tasks.Dataflow NuGet.

Prasyarat

  1. Baca bagian Penggabungan Blok di dokumen Dataflow sebelum Anda memulai ulasan ini.

  2. Pastikan Anda memiliki salinan database Northwind, Northwind.sdf, yang tersedia di komputer Anda.

    Penting

    Di beberapa versi Windows, Anda tidak dapat tersambung ke Northwind.sdf jika Visual Studio berjalan dalam mode non-administrator. Untuk menyambungkan ke Northwind.sdf, mulai Visual Studio atau Perintah Pengembang untuk Visual Studio dalam mode Jalankan sebagai administrator .

Membuat Aplikasi Konsol

  1. Di Visual Studio, buat proyek Aplikasi Visual C# atau Visual Basic Console . Dalam dokumen ini, proyek diberi nama DataflowBatchDatabase.

  2. Pastikan Form1.cs (Form1.vb untuk Visual Basic) berisi pernyataan berikut ( usingImports di Visual Basic).

    using System;
    using System.Collections.Generic;
    using System.Data.SqlClient;
    using System.Diagnostics;
    using System.IO;
    using System.Threading.Tasks.Dataflow;
    
    Imports System.Data
    Imports System.Data.SqlClient
    Imports System.IO
    Imports System.Threading.Tasks.Dataflow
    
    
  3. Tambahkan anggota data berikut ke Program kelas .

    // The number of employees to add to the database.
    // TODO: Change this value to experiment with different numbers of
    // employees to insert into the database.
    static readonly int s_insertCount = 256;
    
    // The size of a single batch of employees to add to the database.
    // TODO: Change this value to experiment with different batch sizes.
    static readonly int s_insertBatchSize = 96;
    
    // The source database file.
    // TODO: Change this value if Northwind.sdf is at a different location
    // on your computer.
    static readonly string s_sourceDatabase = @"C:\...\Northwind.sdf";
    
    // TODO: Change this value if you require a different temporary location.
    static readonly string s_scratchDatabase = @"C:\Temp\Northwind.sdf";
    
    ' The number of employees to add to the database.
    ' TODO: Change this value to experiment with different numbers of 
    ' employees to insert into the database.
    Private Shared ReadOnly s_insertCount As Integer = 256
    
    ' The size of a single batch of employees to add to the database.
    ' TODO: Change this value to experiment with different batch sizes.
    Private Shared ReadOnly s_insertBatchSize As Integer = 96
    
    ' The source database file.
    ' TODO: Change this value if Northwind.sdf is at a different location
    ' on your computer.
    Private Shared ReadOnly s_sourceDatabase As String =
        "C:\Program Files\Microsoft SQL Server Compact Edition\v3.5\Samples\Northwind.sdf"
    
    ' TODO: Change this value if you require a different temporary location.
    Private Shared ReadOnly s_scratchDatabase As String =
        "C:\Temp\Northwind.sdf"
    

Mendefinisikan Kelas Karyawan

Tambahkan kelas Employee ke kelas Program.

// Describes an employee. Each property maps to a
// column in the Employees table in the Northwind database.
// For brevity, the Employee class does not contain
// all columns from the Employees table.
class Employee
{
   public int EmployeeID { get; set; }
   public string LastName { get; set; }
   public string FirstName { get; set; }

   // A random number generator that helps tp generate
   // Employee property values.
   static readonly Random s_rand = new Random(42);

   // Possible random first names.
   static readonly string[] s_firstNames =
          { "Tom", "Mike", "Ruth", "Bob", "John" };
   // Possible random last names.
   static readonly string[] s_lastNames =
          { "Jones", "Smith", "Johnson", "Walker" };

   // Creates an Employee object that contains random
   // property values.
   public static Employee Random()
   {
      return new Employee
      {
         EmployeeID = -1,
         LastName = s_lastNames[s_rand.Next() % s_lastNames.Length],
         FirstName = s_firstNames[s_rand.Next() % s_firstNames.Length]
      };
   }
}
' Describes an employee. Each property maps to a 
' column in the Employees table in the Northwind database.
' For brevity, the Employee class does not contain
' all columns from the Employees table.
Private Class Employee
    Public Property EmployeeID() As Integer
    Public Property LastName() As String
    Public Property FirstName() As String

    ' A random number generator that helps tp generate
    ' Employee property values.
    Private Shared ReadOnly s_rand As New Random(42)

    ' Possible random first names.
    Private Shared ReadOnly s_firstNames() As String =
        {"Tom", "Mike", "Ruth", "Bob", "John"}
    ' Possible random last names.
    Private Shared ReadOnly s_lastNames() As String =
        {"Jones", "Smith", "Johnson", "Walker"}

    ' Creates an Employee object that contains random 
    ' property values.
    Public Shared Function Random() As Employee
        Return New Employee With {
            .EmployeeID = -1,
            .LastName = s_lastNames(s_rand.Next() Mod s_lastNames.Length),
            .FirstName = s_firstNames(s_rand.Next() Mod s_firstNames.Length)
        }
    End Function
End Class

Kelas Employee berisi tiga properti, EmployeeID, LastName, dan FirstName. Properti ini sesuai dengan kolom Employee ID, Last Name, dan First Name pada tabel Employees di database Northwind. Untuk demonstrasi ini, Employee kelas juga mendefinisikan Random metode , yang membuat Employee objek yang memiliki nilai acak untuk propertinya.

Menentukan Operasi Database Karyawan

Tambahkan metode InsertEmployees, GetEmployeeCount, dan GetEmployeeID ke kelas Program.

// Adds new employee records to the database.
static void InsertEmployees(Employee[] employees, string connectionString)
{
   using (SqlConnection connection =
      new SqlConnection(connectionString))
   {
      try
      {
         // Create the SQL command.
         SqlCommand command = new SqlCommand(
            "INSERT INTO Employees ([Last Name], [First Name])" +
            "VALUES (@lastName, @firstName)",
            connection);

         connection.Open();
         for (int i = 0; i < employees.Length; i++)
         {
            // Set parameters.
            command.Parameters.Clear();
            command.Parameters.AddWithValue("@lastName", employees[i].LastName);
            command.Parameters.AddWithValue("@firstName", employees[i].FirstName);

            // Execute the command.
            command.ExecuteNonQuery();
         }
      }
      finally
      {
         connection.Close();
      }
   }
}

// Retrieves the number of entries in the Employees table in
// the Northwind database.
static int GetEmployeeCount(string connectionString)
{
   int result = 0;
   using (SqlConnection sqlConnection =
      new SqlConnection(connectionString))
   {
      SqlCommand sqlCommand = new SqlCommand(
         "SELECT COUNT(*) FROM Employees", sqlConnection);

      sqlConnection.Open();
      try
      {
         result = (int)sqlCommand.ExecuteScalar();
      }
      finally
      {
         sqlConnection.Close();
      }
   }
   return result;
}

// Retrieves the ID of the first employee that has the provided name.
static int GetEmployeeID(string lastName, string firstName,
   string connectionString)
{
   using (SqlConnection connection =
      new SqlConnection(connectionString))
   {
      SqlCommand command = new SqlCommand(
         "SELECT [Employee ID] FROM Employees " +
         "WHERE [Last Name] = @lastName AND [First Name] = @firstName",
         connection);

      command.Parameters.Add("@lastName", System.Data.SqlDbType.NVarChar).Value = lastName;
      command.Parameters.Add("@firstName", System.Data.SqlDbType.NVarChar).Value = firstName;

      connection.Open();
      try
      {
         return (int)command.ExecuteScalar();
      }
      finally
      {
         connection.Close();
      }
   }
}
' Adds new employee records to the database.
Private Shared Sub InsertEmployees(ByVal employees() As Employee, ByVal connectionString As String)
    Using connection As New SqlConnection(connectionString)
        Try
            ' Create the SQL command.
            Dim command As New SqlCommand("INSERT INTO Employees ([Last Name], [First Name])" & "VALUES (@lastName, @firstName)", connection)

            connection.Open()
            For i As Integer = 0 To employees.Length - 1
                ' Set parameters.
                command.Parameters.Clear()
                command.Parameters.Add("@lastName", SqlDbType.NVarChar).Value = employees(i).LastName
                command.Parameters.Add("@firstName", SqlDbType.NVarChar).Value = employees(i).FirstName

                ' Execute the command.
                command.ExecuteNonQuery()
            Next i
        Finally
            connection.Close()
        End Try
    End Using
End Sub

' Retrieves the number of entries in the Employees table in 
' the Northwind database.
Private Shared Function GetEmployeeCount(ByVal connectionString As String) As Integer
    Dim result As Integer = 0
    Using sqlConnection As New SqlConnection(connectionString)
        Dim sqlCommand As New SqlCommand("SELECT COUNT(*) FROM Employees", sqlConnection)

        sqlConnection.Open()
        Try
            result = CInt(Fix(sqlCommand.ExecuteScalar()))
        Finally
            sqlConnection.Close()
        End Try
    End Using
    Return result
End Function

' Retrieves the ID of the first employee that has the provided name.
Private Shared Function GetEmployeeID(ByVal lastName As String, ByVal firstName As String, ByVal connectionString As String) As Integer
    Using connection As New SqlConnection(connectionString)
        Dim query As String = "SELECT [Employee ID] FROM Employees WHERE [Last Name] = @lastName AND [First Name] = @firstName"
        Dim command As New SqlCommand(query, connection)
        command.Parameters.Add("@lastName", lastName)
        command.Parameters.Add("@firstName", firstName)

        connection.Open()
        Try
            Return CInt(Fix(command.ExecuteScalar()))
        Finally
            connection.Close()
        End Try
    End Using
End Function

Metode ini menambahkan catatan karyawan baru ke database. Metode GetEmployeeCount mengambil entri dalam tabel Employees. Metode GetEmployeeID mengambil ID karyawan pertama yang memiliki nama yang telah ditentukan. Masing-masing metode ini mengambil string koneksi ke database Northwind.

Menambahkan Data Karyawan ke Database Tanpa Menggunakan Buffering

Tambahkan metode AddEmployees dan PostRandomEmployees ke kelas Program.

// Posts random Employee data to the provided target block.
static void PostRandomEmployees(ITargetBlock<Employee> target, int count)
{
   Console.WriteLine($"Adding {count} entries to Employee table...");

   for (int i = 0; i < count; i++)
   {
      target.Post(Employee.Random());
   }
}

// Adds random employee data to the database by using dataflow.
static void AddEmployees(string connectionString, int count)
{
   // Create an ActionBlock<Employee> object that adds a single
   // employee entry to the database.
   var insertEmployee = new ActionBlock<Employee>(e =>
      InsertEmployees(new Employee[] { e }, connectionString));

   // Post several random Employee objects to the dataflow block.
   PostRandomEmployees(insertEmployee, count);

   // Set the dataflow block to the completed state and wait for
   // all insert operations to complete.
   insertEmployee.Complete();
   insertEmployee.Completion.Wait();
}
' Posts random Employee data to the provided target block.
Private Shared Sub PostRandomEmployees(ByVal target As ITargetBlock(Of Employee), ByVal count As Integer)
    Console.WriteLine("Adding {0} entries to Employee table...", count)

    For i As Integer = 0 To count - 1
        target.Post(Employee.Random())
    Next i
End Sub

' Adds random employee data to the database by using dataflow.
Private Shared Sub AddEmployees(ByVal connectionString As String, ByVal count As Integer)
    ' Create an ActionBlock<Employee> object that adds a single
    ' employee entry to the database.
    Dim insertEmployee = New ActionBlock(Of Employee)(Sub(e) InsertEmployees(New Employee() {e}, connectionString))

    ' Post several random Employee objects to the dataflow block.
    PostRandomEmployees(insertEmployee, count)

    ' Set the dataflow block to the completed state and wait for 
    ' all insert operations to complete.
    insertEmployee.Complete()
    insertEmployee.Completion.Wait()
End Sub

Metode ini AddEmployees menambahkan data karyawan acak ke database dengan menggunakan aliran data. Ini membuat ActionBlock<TInput> objek yang memanggil InsertEmployees metode untuk menambahkan entri karyawan ke database. Kemudian, metode AddEmployees memanggil metode PostRandomEmployees untuk memposting beberapa objek Employee ke objek ActionBlock<TInput>. Metode AddEmployees kemudian menunggu semua operasi sisipan selesai.

Menggunakan Buffering untuk Menambahkan Data Karyawan ke Database

Tambahkan metode AddEmployeesBatched ke kelas Program.

// Adds random employee data to the database by using dataflow.
// This method is similar to AddEmployees except that it uses batching
// to add multiple employees to the database at a time.
static void AddEmployeesBatched(string connectionString, int batchSize,
   int count)
{
   // Create a BatchBlock<Employee> that holds several Employee objects and
   // then propagates them out as an array.
   var batchEmployees = new BatchBlock<Employee>(batchSize);

   // Create an ActionBlock<Employee[]> object that adds multiple
   // employee entries to the database.
   var insertEmployees = new ActionBlock<Employee[]>(a =>
      InsertEmployees(a, connectionString));

   // Link the batch block to the action block.
   batchEmployees.LinkTo(insertEmployees);

   // When the batch block completes, set the action block also to complete.
   batchEmployees.Completion.ContinueWith(delegate { insertEmployees.Complete(); });

   // Post several random Employee objects to the batch block.
   PostRandomEmployees(batchEmployees, count);

   // Set the batch block to the completed state and wait for
   // all insert operations to complete.
   batchEmployees.Complete();
   insertEmployees.Completion.Wait();
}
' Adds random employee data to the database by using dataflow.
' This method is similar to AddEmployees except that it uses batching
' to add multiple employees to the database at a time.
Private Shared Sub AddEmployeesBatched(ByVal connectionString As String, ByVal batchSize As Integer, ByVal count As Integer)
    ' Create a BatchBlock<Employee> that holds several Employee objects and
    ' then propagates them out as an array.
    Dim batchEmployees = New BatchBlock(Of Employee)(batchSize)

    ' Create an ActionBlock<Employee[]> object that adds multiple
    ' employee entries to the database.
    Dim insertEmployees = New ActionBlock(Of Employee())(Sub(a) Program.InsertEmployees(a, connectionString))

    ' Link the batch block to the action block.
    batchEmployees.LinkTo(insertEmployees)

    ' When the batch block completes, set the action block also to complete.
    batchEmployees.Completion.ContinueWith(Sub() insertEmployees.Complete())

    ' Post several random Employee objects to the batch block.
    PostRandomEmployees(batchEmployees, count)

    ' Set the batch block to the completed state and wait for 
    ' all insert operations to complete.
    batchEmployees.Complete()
    insertEmployees.Completion.Wait()
End Sub

Metode ini mirip AddEmployees, kecuali bahwa metode ini juga menggunakan BatchBlock<T> kelas untuk menyangga beberapa Employee objek sebelum mengirim objek tersebut ke ActionBlock<TInput> objek. Karena kelas BatchBlock<T> mengpropagasi beberapa elemen sebagai koleksi, objek ActionBlock<TInput> dimodifikasi untuk bertindak pada array objek Employee. Seperti dalam metode AddEmployees, AddEmployeesBatched memanggil metode PostRandomEmployees untuk memposting beberapa objek Employee; namun, AddEmployeesBatched memposting objek ini ke objek BatchBlock<T>. Metode ini AddEmployeesBatched juga menunggu semua operasi sisipan selesai.

Menggunakan Join Berpenyangga untuk Membaca Data Karyawan dari Database

Tambahkan metode GetRandomEmployees ke kelas Program.

// Displays information about several random employees to the console.
static void GetRandomEmployees(string connectionString, int batchSize,
   int count)
{
   // Create a BatchedJoinBlock<Employee, Exception> object that holds
   // both employee and exception data.
   var selectEmployees = new BatchedJoinBlock<Employee, Exception>(batchSize);

   // Holds the total number of exceptions that occurred.
   int totalErrors = 0;

   // Create an action block that prints employee and error information
   // to the console.
   var printEmployees =
      new ActionBlock<Tuple<IList<Employee>, IList<Exception>>>(data =>
      {
         // Print information about the employees in this batch.
         Console.WriteLine("Received a batch...");
         foreach (Employee e in data.Item1)
         {
            Console.WriteLine($"Last={e.LastName} First={e.FirstName} ID={e.EmployeeID}");
         }

         // Print the error count for this batch.
         Console.WriteLine($"There were {data.Item2.Count} errors in this batch...");

         // Update total error count.
         totalErrors += data.Item2.Count;
      });

   // Link the batched join block to the action block.
   selectEmployees.LinkTo(printEmployees);

   // When the batched join block completes, set the action block also to complete.
   selectEmployees.Completion.ContinueWith(delegate { printEmployees.Complete(); });

   // Try to retrieve the ID for several random employees.
   Console.WriteLine("Selecting random entries from Employees table...");
   for (int i = 0; i < count; i++)
   {
      try
      {
         // Create a random employee.
         Employee e = Employee.Random();

         // Try to retrieve the ID for the employee from the database.
         e.EmployeeID = GetEmployeeID(e.LastName, e.FirstName, connectionString);

         // Post the Employee object to the Employee target of
         // the batched join block.
         selectEmployees.Target1.Post(e);
      }
      catch (NullReferenceException e)
      {
         // GetEmployeeID throws NullReferenceException when there is
         // no such employee with the given name. When this happens,
         // post the Exception object to the Exception target of
         // the batched join block.
         selectEmployees.Target2.Post(e);
      }
   }

   // Set the batched join block to the completed state and wait for
   // all retrieval operations to complete.
   selectEmployees.Complete();
   printEmployees.Completion.Wait();

   // Print the total error count.
   Console.WriteLine($"Finished. There were {totalErrors} total errors.");
}
' Displays information about several random employees to the console.
Private Shared Sub GetRandomEmployees(ByVal connectionString As String, ByVal batchSize As Integer, ByVal count As Integer)
    ' Create a BatchedJoinBlock<Employee, Exception> object that holds
    ' both employee and exception data.
    Dim selectEmployees = New BatchedJoinBlock(Of Employee, Exception)(batchSize)

    ' Holds the total number of exceptions that occurred.
    Dim totalErrors As Integer = 0

    ' Create an action block that prints employee and error information
    ' to the console.
    Dim printEmployees = New ActionBlock(Of Tuple(Of IList(Of Employee), IList(Of Exception)))(
        Sub(data)
            ' Print information about the employees in this batch.
            ' Print the error count for this batch.
            ' Update total error count.
            Console.WriteLine("Received a batch...")
            For Each e As Employee In data.Item1
                Console.WriteLine("Last={0} First={1} ID={2}", e.LastName, e.FirstName, e.EmployeeID)
            Next e
            Console.WriteLine("There were {0} errors in this batch...", data.Item2.Count)
            totalErrors += data.Item2.Count
        End Sub)

    ' Link the batched join block to the action block.
    selectEmployees.LinkTo(printEmployees)

    ' When the batched join block completes, set the action block also to complete.
    selectEmployees.Completion.ContinueWith(Sub() printEmployees.Complete())

    ' Try to retrieve the ID for several random employees.
    Console.WriteLine("Selecting random entries from Employees table...")
    For i As Integer = 0 To count - 1
        Try
            ' Create a random employee.
            Dim e As Employee = Employee.Random()

            ' Try to retrieve the ID for the employee from the database.
            e.EmployeeID = GetEmployeeID(e.LastName, e.FirstName, connectionString)

            ' Post the Employee object to the Employee target of 
            ' the batched join block.
            selectEmployees.Target1.Post(e)
        Catch e As NullReferenceException
            ' GetEmployeeID throws NullReferenceException when there is 
            ' no such employee with the given name. When this happens,
            ' post the Exception object to the Exception target of
            ' the batched join block.
            selectEmployees.Target2.Post(e)
        End Try
    Next i

    ' Set the batched join block to the completed state and wait for 
    ' all retrieval operations to complete.
    selectEmployees.Complete()
    printEmployees.Completion.Wait()

    ' Print the total error count.
    Console.WriteLine("Finished. There were {0} total errors.", totalErrors)
End Sub

Metode ini mencetak informasi tentang karyawan acak ke konsol. Ini membuat beberapa objek acak Employee dan memanggil metode GetEmployeeID untuk mendapatkan pengidentifikasi unik untuk setiap objek. Karena metode GetEmployeeID melemparkan pengecualian jika tidak ada karyawan yang cocok dengan nama depan dan belakang yang diberikan, metode GetRandomEmployees menggunakan kelas BatchedJoinBlock<T1,T2> untuk menyimpan objek Employee untuk panggilan yang berhasil ke GetEmployeeID dan objek System.Exception untuk panggilan yang gagal. Objek ActionBlock<TInput> dalam contoh ini bertindak pada Tuple<T1,T2> objek yang menyimpan daftar Employee objek dan daftar Exception objek. Objek BatchedJoinBlock<T1,T2> menyebarluaskan data ini ketika jumlah objek yang diterima dari Employee dan Exception sama dengan ukuran batch.

Contoh Lengkap

Contoh berikut menunjukkan kode lengkap. Metode Main membandingkan waktu yang diperlukan untuk melakukan penyisipan database dalam bentuk batch versus waktu untuk melakukan penyisipan database yang tidak dalam bentuk batch. Ini juga menunjukkan penggunaan gabungan buffer untuk membaca data karyawan dari database dan juga melaporkan kesalahan.

using System;
using System.Collections.Generic;
using System.Data.SqlClient;
using System.Diagnostics;
using System.IO;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to use batched dataflow blocks to improve
// the performance of database operations.
namespace DataflowBatchDatabase
{
   class Program
   {
      // The number of employees to add to the database.
      // TODO: Change this value to experiment with different numbers of
      // employees to insert into the database.
      static readonly int s_insertCount = 256;

      // The size of a single batch of employees to add to the database.
      // TODO: Change this value to experiment with different batch sizes.
      static readonly int s_insertBatchSize = 96;

      // The source database file.
      // TODO: Change this value if Northwind.sdf is at a different location
      // on your computer.
      static readonly string s_sourceDatabase = @"C:\...\Northwind.sdf";

      // TODO: Change this value if you require a different temporary location.
      static readonly string s_scratchDatabase = @"C:\Temp\Northwind.sdf";

      // Describes an employee. Each property maps to a
      // column in the Employees table in the Northwind database.
      // For brevity, the Employee class does not contain
      // all columns from the Employees table.
      class Employee
      {
         public int EmployeeID { get; set; }
         public string LastName { get; set; }
         public string FirstName { get; set; }

         // A random number generator that helps tp generate
         // Employee property values.
         static readonly Random s_rand = new Random(42);

         // Possible random first names.
         static readonly string[] s_firstNames =
                { "Tom", "Mike", "Ruth", "Bob", "John" };
         // Possible random last names.
         static readonly string[] s_lastNames =
                { "Jones", "Smith", "Johnson", "Walker" };

         // Creates an Employee object that contains random
         // property values.
         public static Employee Random()
         {
            return new Employee
            {
               EmployeeID = -1,
               LastName = s_lastNames[s_rand.Next() % s_lastNames.Length],
               FirstName = s_firstNames[s_rand.Next() % s_firstNames.Length]
            };
         }
      }

      // Adds new employee records to the database.
      static void InsertEmployees(Employee[] employees, string connectionString)
      {
         using (SqlConnection connection =
            new SqlConnection(connectionString))
         {
            try
            {
               // Create the SQL command.
               SqlCommand command = new SqlCommand(
                  "INSERT INTO Employees ([Last Name], [First Name])" +
                  "VALUES (@lastName, @firstName)",
                  connection);

               connection.Open();
               for (int i = 0; i < employees.Length; i++)
               {
                  // Set parameters.
                  command.Parameters.Clear();
                  command.Parameters.AddWithValue("@lastName", employees[i].LastName);
                  command.Parameters.AddWithValue("@firstName", employees[i].FirstName);

                  // Execute the command.
                  command.ExecuteNonQuery();
               }
            }
            finally
            {
               connection.Close();
            }
         }
      }

      // Retrieves the number of entries in the Employees table in
      // the Northwind database.
      static int GetEmployeeCount(string connectionString)
      {
         int result = 0;
         using (SqlConnection sqlConnection =
            new SqlConnection(connectionString))
         {
            SqlCommand sqlCommand = new SqlCommand(
               "SELECT COUNT(*) FROM Employees", sqlConnection);

            sqlConnection.Open();
            try
            {
               result = (int)sqlCommand.ExecuteScalar();
            }
            finally
            {
               sqlConnection.Close();
            }
         }
         return result;
      }

      // Retrieves the ID of the first employee that has the provided name.
      static int GetEmployeeID(string lastName, string firstName,
         string connectionString)
      {
         using (SqlConnection connection =
            new SqlConnection(connectionString))
         {
            SqlCommand command = new SqlCommand(
               "SELECT [Employee ID] FROM Employees " +
               "WHERE [Last Name] = @lastName AND [First Name] = @firstName",
               connection);

            command.Parameters.Add("@lastName", System.Data.SqlDbType.NVarChar).Value = lastName;
            command.Parameters.Add("@firstName", System.Data.SqlDbType.NVarChar).Value = firstName;

            connection.Open();
            try
            {
               return (int)command.ExecuteScalar();
            }
            finally
            {
               connection.Close();
            }
         }
      }

      // Posts random Employee data to the provided target block.
      static void PostRandomEmployees(ITargetBlock<Employee> target, int count)
      {
         Console.WriteLine($"Adding {count} entries to Employee table...");

         for (int i = 0; i < count; i++)
         {
            target.Post(Employee.Random());
         }
      }

      // Adds random employee data to the database by using dataflow.
      static void AddEmployees(string connectionString, int count)
      {
         // Create an ActionBlock<Employee> object that adds a single
         // employee entry to the database.
         var insertEmployee = new ActionBlock<Employee>(e =>
            InsertEmployees(new Employee[] { e }, connectionString));

         // Post several random Employee objects to the dataflow block.
         PostRandomEmployees(insertEmployee, count);

         // Set the dataflow block to the completed state and wait for
         // all insert operations to complete.
         insertEmployee.Complete();
         insertEmployee.Completion.Wait();
      }

      // Adds random employee data to the database by using dataflow.
      // This method is similar to AddEmployees except that it uses batching
      // to add multiple employees to the database at a time.
      static void AddEmployeesBatched(string connectionString, int batchSize,
         int count)
      {
         // Create a BatchBlock<Employee> that holds several Employee objects and
         // then propagates them out as an array.
         var batchEmployees = new BatchBlock<Employee>(batchSize);

         // Create an ActionBlock<Employee[]> object that adds multiple
         // employee entries to the database.
         var insertEmployees = new ActionBlock<Employee[]>(a =>
            InsertEmployees(a, connectionString));

         // Link the batch block to the action block.
         batchEmployees.LinkTo(insertEmployees);

         // When the batch block completes, set the action block also to complete.
         batchEmployees.Completion.ContinueWith(delegate { insertEmployees.Complete(); });

         // Post several random Employee objects to the batch block.
         PostRandomEmployees(batchEmployees, count);

         // Set the batch block to the completed state and wait for
         // all insert operations to complete.
         batchEmployees.Complete();
         insertEmployees.Completion.Wait();
      }

      // Displays information about several random employees to the console.
      static void GetRandomEmployees(string connectionString, int batchSize,
         int count)
      {
         // Create a BatchedJoinBlock<Employee, Exception> object that holds
         // both employee and exception data.
         var selectEmployees = new BatchedJoinBlock<Employee, Exception>(batchSize);

         // Holds the total number of exceptions that occurred.
         int totalErrors = 0;

         // Create an action block that prints employee and error information
         // to the console.
         var printEmployees =
            new ActionBlock<Tuple<IList<Employee>, IList<Exception>>>(data =>
            {
               // Print information about the employees in this batch.
               Console.WriteLine("Received a batch...");
               foreach (Employee e in data.Item1)
               {
                  Console.WriteLine($"Last={e.LastName} First={e.FirstName} ID={e.EmployeeID}");
               }

               // Print the error count for this batch.
               Console.WriteLine($"There were {data.Item2.Count} errors in this batch...");

               // Update total error count.
               totalErrors += data.Item2.Count;
            });

         // Link the batched join block to the action block.
         selectEmployees.LinkTo(printEmployees);

         // When the batched join block completes, set the action block also to complete.
         selectEmployees.Completion.ContinueWith(delegate { printEmployees.Complete(); });

         // Try to retrieve the ID for several random employees.
         Console.WriteLine("Selecting random entries from Employees table...");
         for (int i = 0; i < count; i++)
         {
            try
            {
               // Create a random employee.
               Employee e = Employee.Random();

               // Try to retrieve the ID for the employee from the database.
               e.EmployeeID = GetEmployeeID(e.LastName, e.FirstName, connectionString);

               // Post the Employee object to the Employee target of
               // the batched join block.
               selectEmployees.Target1.Post(e);
            }
            catch (NullReferenceException e)
            {
               // GetEmployeeID throws NullReferenceException when there is
               // no such employee with the given name. When this happens,
               // post the Exception object to the Exception target of
               // the batched join block.
               selectEmployees.Target2.Post(e);
            }
         }

         // Set the batched join block to the completed state and wait for
         // all retrieval operations to complete.
         selectEmployees.Complete();
         printEmployees.Completion.Wait();

         // Print the total error count.
         Console.WriteLine($"Finished. There were {totalErrors} total errors.");
      }

      static void Main()
      {
         // Create a connection string for accessing the database.
         // The connection string refers to the temporary database location.
         string connectionString = "...";

         // Create a Stopwatch object to time database insert operations.
         Stopwatch stopwatch = new Stopwatch();

         // Start with a clean database file by copying the source database to
         // the temporary location.
         File.Copy(s_sourceDatabase, s_scratchDatabase, true);

         // Demonstrate multiple insert operations without batching.
         Console.WriteLine("Demonstrating non-batched database insert operations...");
         Console.WriteLine($"Original size of Employee table: {GetEmployeeCount(connectionString)}.");
         stopwatch.Start();
         AddEmployees(connectionString, s_insertCount);
         stopwatch.Stop();
         Console.WriteLine("New size of Employee table: {0}; elapsed insert time: {1} ms.",
            GetEmployeeCount(connectionString), stopwatch.ElapsedMilliseconds);

         Console.WriteLine();

         // Start again with a clean database file.
         File.Copy(s_sourceDatabase, s_scratchDatabase, true);

         // Demonstrate multiple insert operations, this time with batching.
         Console.WriteLine("Demonstrating batched database insert operations...");
         Console.WriteLine($"Original size of Employee table: {GetEmployeeCount(connectionString)}.");
         stopwatch.Restart();
         AddEmployeesBatched(connectionString, s_insertBatchSize, s_insertCount);
         stopwatch.Stop();
         Console.WriteLine("New size of Employee table: {0}; elapsed insert time: {1} ms.",
            GetEmployeeCount(connectionString), stopwatch.ElapsedMilliseconds);

         Console.WriteLine();

         // Start again with a clean database file.
         File.Copy(s_sourceDatabase, s_scratchDatabase, true);

         // Demonstrate multiple retrieval operations with error reporting.
         Console.WriteLine("Demonstrating batched join database select operations...");
         // Add a small number of employees to the database.
         AddEmployeesBatched(connectionString, s_insertBatchSize, 16);
         // Query for random employees.
         GetRandomEmployees(connectionString, s_insertBatchSize, 10);
      }
   }
}

/* Sample output:
Demonstrating non-batched database insert operations...
Original size of Employee table: 15.
Adding 256 entries to Employee table...
New size of Employee table: 271; elapsed insert time: 11035 ms.

Demonstrating batched database insert operations...
Original size of Employee table: 15.
Adding 256 entries to Employee table...
New size of Employee table: 271; elapsed insert time: 197 ms.

Demonstrating batched join database insert operations...
Adding 16 entries to Employee table...
Selecting items from Employee table...
Received a batch...
Last=Jones First=Tom ID=21
Last=Jones First=John ID=24
Last=Smith First=Tom ID=26
Last=Jones First=Tom ID=21
There were 4 errors in this batch...
Received a batch...
Last=Smith First=Tom ID=26
Last=Jones First=Mike ID=28
There were 0 errors in this batch...
Finished. There were 4 total errors.
*/
Imports System.Data
Imports System.Data.SqlClient
Imports System.IO
Imports System.Threading.Tasks.Dataflow


' Demonstrates how to use batched dataflow blocks to improve
' the performance of database operations.
Namespace DataflowBatchDatabase
    Friend Class Program
        ' The number of employees to add to the database.
        ' TODO: Change this value to experiment with different numbers of 
        ' employees to insert into the database.
        Private Shared ReadOnly s_insertCount As Integer = 256

        ' The size of a single batch of employees to add to the database.
        ' TODO: Change this value to experiment with different batch sizes.
        Private Shared ReadOnly s_insertBatchSize As Integer = 96

        ' The source database file.
        ' TODO: Change this value if Northwind.sdf is at a different location
        ' on your computer.
        Private Shared ReadOnly s_sourceDatabase As String =
            "C:\Program Files\Microsoft SQL Server Compact Edition\v3.5\Samples\Northwind.sdf"

        ' TODO: Change this value if you require a different temporary location.
        Private Shared ReadOnly s_scratchDatabase As String =
            "C:\Temp\Northwind.sdf"

        ' Describes an employee. Each property maps to a 
        ' column in the Employees table in the Northwind database.
        ' For brevity, the Employee class does not contain
        ' all columns from the Employees table.
        Private Class Employee
            Public Property EmployeeID() As Integer
            Public Property LastName() As String
            Public Property FirstName() As String

            ' A random number generator that helps tp generate
            ' Employee property values.
            Private Shared ReadOnly s_rand As New Random(42)

            ' Possible random first names.
            Private Shared ReadOnly s_firstNames() As String =
                {"Tom", "Mike", "Ruth", "Bob", "John"}
            ' Possible random last names.
            Private Shared ReadOnly s_lastNames() As String =
                {"Jones", "Smith", "Johnson", "Walker"}

            ' Creates an Employee object that contains random 
            ' property values.
            Public Shared Function Random() As Employee
                Return New Employee With {
                    .EmployeeID = -1,
                    .LastName = s_lastNames(s_rand.Next() Mod s_lastNames.Length),
                    .FirstName = s_firstNames(s_rand.Next() Mod s_firstNames.Length)
                }
            End Function
        End Class

        ' Adds new employee records to the database.
        Private Shared Sub InsertEmployees(ByVal employees() As Employee, ByVal connectionString As String)
            Using connection As New SqlConnection(connectionString)
                Try
                    ' Create the SQL command.
                    Dim command As New SqlCommand("INSERT INTO Employees ([Last Name], [First Name])" & "VALUES (@lastName, @firstName)", connection)

                    connection.Open()
                    For i As Integer = 0 To employees.Length - 1
                        ' Set parameters.
                        command.Parameters.Clear()
                        command.Parameters.Add("@lastName", SqlDbType.NVarChar).Value = employees(i).LastName
                        command.Parameters.Add("@firstName", SqlDbType.NVarChar).Value = employees(i).FirstName

                        ' Execute the command.
                        command.ExecuteNonQuery()
                    Next i
                Finally
                    connection.Close()
                End Try
            End Using
        End Sub

        ' Retrieves the number of entries in the Employees table in 
        ' the Northwind database.
        Private Shared Function GetEmployeeCount(ByVal connectionString As String) As Integer
            Dim result As Integer = 0
            Using sqlConnection As New SqlConnection(connectionString)
                Dim sqlCommand As New SqlCommand("SELECT COUNT(*) FROM Employees", sqlConnection)

                sqlConnection.Open()
                Try
                    result = CInt(Fix(sqlCommand.ExecuteScalar()))
                Finally
                    sqlConnection.Close()
                End Try
            End Using
            Return result
        End Function

        ' Retrieves the ID of the first employee that has the provided name.
        Private Shared Function GetEmployeeID(ByVal lastName As String, ByVal firstName As String, ByVal connectionString As String) As Integer
            Using connection As New SqlConnection(connectionString)
                Dim query As String = "SELECT [Employee ID] FROM Employees WHERE [Last Name] = @lastName AND [First Name] = @firstName"
                Dim command As New SqlCommand(query, connection)
                command.Parameters.Add("@lastName", lastName)
                command.Parameters.Add("@firstName", firstName)

                connection.Open()
                Try
                    Return CInt(Fix(command.ExecuteScalar()))
                Finally
                    connection.Close()
                End Try
            End Using
        End Function

        ' Posts random Employee data to the provided target block.
        Private Shared Sub PostRandomEmployees(ByVal target As ITargetBlock(Of Employee), ByVal count As Integer)
            Console.WriteLine("Adding {0} entries to Employee table...", count)

            For i As Integer = 0 To count - 1
                target.Post(Employee.Random())
            Next i
        End Sub

        ' Adds random employee data to the database by using dataflow.
        Private Shared Sub AddEmployees(ByVal connectionString As String, ByVal count As Integer)
            ' Create an ActionBlock<Employee> object that adds a single
            ' employee entry to the database.
            Dim insertEmployee = New ActionBlock(Of Employee)(Sub(e) InsertEmployees(New Employee() {e}, connectionString))

            ' Post several random Employee objects to the dataflow block.
            PostRandomEmployees(insertEmployee, count)

            ' Set the dataflow block to the completed state and wait for 
            ' all insert operations to complete.
            insertEmployee.Complete()
            insertEmployee.Completion.Wait()
        End Sub

        ' Adds random employee data to the database by using dataflow.
        ' This method is similar to AddEmployees except that it uses batching
        ' to add multiple employees to the database at a time.
        Private Shared Sub AddEmployeesBatched(ByVal connectionString As String, ByVal batchSize As Integer, ByVal count As Integer)
            ' Create a BatchBlock<Employee> that holds several Employee objects and
            ' then propagates them out as an array.
            Dim batchEmployees = New BatchBlock(Of Employee)(batchSize)

            ' Create an ActionBlock<Employee[]> object that adds multiple
            ' employee entries to the database.
            Dim insertEmployees = New ActionBlock(Of Employee())(Sub(a) Program.InsertEmployees(a, connectionString))

            ' Link the batch block to the action block.
            batchEmployees.LinkTo(insertEmployees)

            ' When the batch block completes, set the action block also to complete.
            batchEmployees.Completion.ContinueWith(Sub() insertEmployees.Complete())

            ' Post several random Employee objects to the batch block.
            PostRandomEmployees(batchEmployees, count)

            ' Set the batch block to the completed state and wait for 
            ' all insert operations to complete.
            batchEmployees.Complete()
            insertEmployees.Completion.Wait()
        End Sub

        ' Displays information about several random employees to the console.
        Private Shared Sub GetRandomEmployees(ByVal connectionString As String, ByVal batchSize As Integer, ByVal count As Integer)
            ' Create a BatchedJoinBlock<Employee, Exception> object that holds
            ' both employee and exception data.
            Dim selectEmployees = New BatchedJoinBlock(Of Employee, Exception)(batchSize)

            ' Holds the total number of exceptions that occurred.
            Dim totalErrors As Integer = 0

            ' Create an action block that prints employee and error information
            ' to the console.
            Dim printEmployees = New ActionBlock(Of Tuple(Of IList(Of Employee), IList(Of Exception)))(
                Sub(data)
                    ' Print information about the employees in this batch.
                    ' Print the error count for this batch.
                    ' Update total error count.
                    Console.WriteLine("Received a batch...")
                    For Each e As Employee In data.Item1
                        Console.WriteLine("Last={0} First={1} ID={2}", e.LastName, e.FirstName, e.EmployeeID)
                    Next e
                    Console.WriteLine("There were {0} errors in this batch...", data.Item2.Count)
                    totalErrors += data.Item2.Count
                End Sub)

            ' Link the batched join block to the action block.
            selectEmployees.LinkTo(printEmployees)

            ' When the batched join block completes, set the action block also to complete.
            selectEmployees.Completion.ContinueWith(Sub() printEmployees.Complete())

            ' Try to retrieve the ID for several random employees.
            Console.WriteLine("Selecting random entries from Employees table...")
            For i As Integer = 0 To count - 1
                Try
                    ' Create a random employee.
                    Dim e As Employee = Employee.Random()

                    ' Try to retrieve the ID for the employee from the database.
                    e.EmployeeID = GetEmployeeID(e.LastName, e.FirstName, connectionString)

                    ' Post the Employee object to the Employee target of 
                    ' the batched join block.
                    selectEmployees.Target1.Post(e)
                Catch e As NullReferenceException
                    ' GetEmployeeID throws NullReferenceException when there is 
                    ' no such employee with the given name. When this happens,
                    ' post the Exception object to the Exception target of
                    ' the batched join block.
                    selectEmployees.Target2.Post(e)
                End Try
            Next i

            ' Set the batched join block to the completed state and wait for 
            ' all retrieval operations to complete.
            selectEmployees.Complete()
            printEmployees.Completion.Wait()

            ' Print the total error count.
            Console.WriteLine("Finished. There were {0} total errors.", totalErrors)
        End Sub

        Shared Sub Main()
            ' Create a connection string for accessing the database.
            ' The connection string refers to the temporary database location.
            Dim connectionString As String = String.Format(
                "Data Source={0}",
                s_scratchDatabase)

            ' Create a Stopwatch object to time database insert operations.
            Dim stopwatch As New Stopwatch()

            ' Start with a clean database file by copying the source database to 
            ' the temporary location.
            File.Copy(s_sourceDatabase, s_scratchDatabase, True)

            ' Demonstrate multiple insert operations without batching.
            Console.WriteLine("Demonstrating non-batched database insert operations...")
            Console.WriteLine("Original size of Employee table: {0}.", GetEmployeeCount(connectionString))
            stopwatch.Start()
            AddEmployees(connectionString, s_insertCount)
            stopwatch.Stop()
            Console.WriteLine("New size of Employee table: {0}; elapsed insert time: {1} ms.", GetEmployeeCount(connectionString), stopwatch.ElapsedMilliseconds)

            Console.WriteLine()

            ' Start again with a clean database file.
            File.Copy(s_sourceDatabase, s_scratchDatabase, True)

            ' Demonstrate multiple insert operations, this time with batching.
            Console.WriteLine("Demonstrating batched database insert operations...")
            Console.WriteLine("Original size of Employee table: {0}.", GetEmployeeCount(connectionString))
            stopwatch.Restart()
            AddEmployeesBatched(connectionString, s_insertBatchSize, s_insertCount)
            stopwatch.Stop()
            Console.WriteLine("New size of Employee table: {0}; elapsed insert time: {1} ms.", GetEmployeeCount(connectionString), stopwatch.ElapsedMilliseconds)

            Console.WriteLine()

            ' Start again with a clean database file.
            File.Copy(s_sourceDatabase, s_scratchDatabase, True)

            ' Demonstrate multiple retrieval operations with error reporting.
            Console.WriteLine("Demonstrating batched join database select operations...")
            ' Add a small number of employees to the database.
            AddEmployeesBatched(connectionString, s_insertBatchSize, 16)
            ' Query for random employees.
            GetRandomEmployees(connectionString, s_insertBatchSize, 10)
        End Sub
    End Class
End Namespace

' Sample output:
'Demonstrating non-batched database insert operations...
'Original size of Employee table: 15.
'Adding 256 entries to Employee table...
'New size of Employee table: 271; elapsed insert time: 11035 ms.
'
'Demonstrating batched database insert operations...
'Original size of Employee table: 15.
'Adding 256 entries to Employee table...
'New size of Employee table: 271; elapsed insert time: 197 ms.
'
'Demonstrating batched join database insert operations...
'Adding 16 entries to Employee table...
'Selecting items from Employee table...
'Received a batch...
'Last=Jones First=Tom ID=21
'Last=Jones First=John ID=24
'Last=Smith First=Tom ID=26
'Last=Jones First=Tom ID=21
'There were 4 errors in this batch...
'Received a batch...
'Last=Smith First=Tom ID=26
'Last=Jones First=Mike ID=28
'There were 0 errors in this batch...
'Finished. There were 4 total errors.
'

Lihat juga