다음을 통해 공유


예제: BatchBlock 및 BatchedJoinBlock을 사용하여 효율성을 향상시키는 방법

TPL 데이터 흐름 라이브러리는 System.Threading.Tasks.Dataflow.BatchBlock<T>System.Threading.Tasks.Dataflow.BatchedJoinBlock<T1,T2> 클래스를 제공하여, 하나 이상의 원본에서 데이터를 수신하고 버퍼링한 다음, 그 버퍼링된 데이터를 하나의 컬렉션으로 전파할 수 있도록 합니다. 이 일괄 처리 메커니즘은 하나 이상의 원본에서 데이터를 수집한 다음 여러 데이터 요소를 일괄 처리로 처리할 때 유용합니다. 예를 들어 데이터 흐름을 사용하여 데이터베이스에 레코드를 삽입하는 애플리케이션을 고려해 보세요. 이 작업은 여러 항목을 한 번에 하나씩 삽입하는 대신 동시에 삽입하는 경우 더 효율적일 수 있습니다. 이 문서에서는 클래스를 BatchBlock<T> 사용하여 이러한 데이터베이스 삽입 작업의 효율성을 개선하는 방법을 설명합니다. 또한 클래스를 사용하여 BatchedJoinBlock<T1,T2> 프로그램이 데이터베이스에서 읽을 때 발생하는 결과와 예외를 모두 캡처하는 방법을 설명합니다.

비고

TPL 데이터 흐름 라이브러리(System.Threading.Tasks.Dataflow 네임스페이스)는 .NET과 함께 배포되지 않습니다. Visual Studio에서 System.Threading.Tasks.Dataflow 네임스페이스를 설치하려면, 프로젝트를 열고 프로젝트 메뉴에서 NuGet 패키지 관리를 선택한 다음, System.Threading.Tasks.Dataflow 패키지를 온라인으로 검색합니다. 대안으로, .NET Core CLI 을 사용하여을 설치하려면, dotnet add package System.Threading.Tasks.Dataflow을 실행하십시오.

필수 조건

  1. 이 연습을 시작하기 전에 데이터 흐름 문서에서 조인 블록 섹션을 읽어 보세요.

  2. 컴퓨터에서 사용할 수 있는 Northwind 데이터베이스 Northwind.sdf의 복사본이 있는지 확인합니다.

    중요합니다

    일부 Windows 버전에서는 Visual Studio가 관리자가 아닌 모드로 실행 중인 경우 Northwind.sdf에 연결할 수 없습니다. Northwind.sdf에 연결하려면 관리자 모드로 실행 모드에서 Visual Studio 또는 Visual Studio용 개발자 명령 프롬프트를 시작합니다.

콘솔 애플리케이션 만들기

  1. Visual Studio에서 Visual C# 또는 Visual Basic 콘솔 애플리케이션 프로젝트를 만듭니다. 이 문서에서 프로젝트의 이름은 DataflowBatchDatabase다음과 같습니다.

  2. Form1.cs(Visual Basic용 Form1.vb)에 다음 using (Imports Visual Basic) 문이 포함되어 있는지 확인합니다.

    using System;
    using System.Collections.Generic;
    using System.Data.SqlClient;
    using System.Diagnostics;
    using System.IO;
    using System.Threading.Tasks.Dataflow;
    
    Imports System.Collections.Generic
    Imports System.Data.SqlClient
    Imports System.Data.SqlServerCe
    Imports System.Diagnostics
    Imports System.IO
    Imports System.Threading.Tasks.Dataflow
    
    
  3. 클래스에 다음 데이터 멤버를 추가합니다 Program .

    // The number of employees to add to the database.
    // TODO: Change this value to experiment with different numbers of
    // employees to insert into the database.
    static readonly int insertCount = 256;
    
    // The size of a single batch of employees to add to the database.
    // TODO: Change this value to experiment with different batch sizes.
    static readonly int insertBatchSize = 96;
    
    // The source database file.
    // TODO: Change this value if Northwind.sdf is at a different location
    // on your computer.
    static readonly string sourceDatabase =
       @"C:\...\Northwind.sdf";
    
    // TODO: Change this value if you require a different temporary location.
    static readonly string scratchDatabase =
       @"C:\Temp\Northwind.sdf";
    
    ' The number of employees to add to the database.
    ' TODO: Change this value to experiment with different numbers of 
    ' employees to insert into the database.
    Private Shared ReadOnly insertCount As Integer = 256
    
    ' The size of a single batch of employees to add to the database.
    ' TODO: Change this value to experiment with different batch sizes.
    Private Shared ReadOnly insertBatchSize As Integer = 96
    
    ' The source database file.
    ' TODO: Change this value if Northwind.sdf is at a different location
    ' on your computer.
    Private Shared ReadOnly sourceDatabase As String = "C:\Program Files\Microsoft SQL Server Compact Edition\v3.5\Samples\Northwind.sdf"
    
    ' TODO: Change this value if you require a different temporary location.
    Private Shared ReadOnly scratchDatabase As String = "C:\Temp\Northwind.sdf"
    

Employee 클래스 정의

Program 클래스에 Employee 클래스를 추가합니다.

// Describes an employee. Each property maps to a
// column in the Employees table in the Northwind database.
// For brevity, the Employee class does not contain
// all columns from the Employees table.
class Employee
{
   public int EmployeeID { get; set; }
   public string LastName { get; set; }
   public string FirstName { get; set; }

   // A random number generator that helps tp generate
   // Employee property values.
   static Random rand = new Random(42);

   // Possible random first names.
   static readonly string[] firstNames = { "Tom", "Mike", "Ruth", "Bob", "John" };
   // Possible random last names.
   static readonly string[] lastNames = { "Jones", "Smith", "Johnson", "Walker" };

   // Creates an Employee object that contains random
   // property values.
   public static Employee Random()
   {
      return new Employee
      {
         EmployeeID = -1,
         LastName = lastNames[rand.Next() % lastNames.Length],
         FirstName = firstNames[rand.Next() % firstNames.Length]
      };
   }
}
' Describes an employee. Each property maps to a 
' column in the Employees table in the Northwind database.
' For brevity, the Employee class does not contain
' all columns from the Employees table.
Private Class Employee
    Public Property EmployeeID() As Integer
    Public Property LastName() As String
    Public Property FirstName() As String

    ' A random number generator that helps tp generate
    ' Employee property values.
    Private Shared rand As New Random(42)

    ' Possible random first names.
    Private Shared ReadOnly firstNames() As String = {"Tom", "Mike", "Ruth", "Bob", "John"}
    ' Possible random last names.
    Private Shared ReadOnly lastNames() As String = {"Jones", "Smith", "Johnson", "Walker"}

    ' Creates an Employee object that contains random 
    ' property values.
    Public Shared Function Random() As Employee
        Return New Employee With {.EmployeeID = -1, .LastName = lastNames(rand.Next() Mod lastNames.Length), .FirstName = firstNames(rand.Next() Mod firstNames.Length)}
    End Function
End Class

클래스에는 Employee 세 가지 속성, EmployeeIDLastNameFirstName. 이러한 속성은 Northwind 데이터베이스의 Employees 테이블에 있는 Employee ID, Last Name, 및 First Name 열에 해당합니다. 이 데모의 경우 클래스는 Employee 해당 속성에 대한 임의 Random 값이 있는 개체를 Employee 만드는 메서드도 정의합니다.

직원 데이터베이스 작업 정의

Program 클래스에 InsertEmployees, GetEmployeeCount, 및 GetEmployeeID 메서드를 추가합니다.

// Adds new employee records to the database.
static void InsertEmployees(Employee[] employees, string connectionString)
{
   using (SqlConnection connection =
      new SqlConnection(connectionString))
   {
      try
      {
         // Create the SQL command.
         SqlCommand command = new SqlCommand(
            "INSERT INTO Employees ([Last Name], [First Name])" +
            "VALUES (@lastName, @firstName)",
            connection);

         connection.Open();
         for (int i = 0; i < employees.Length; i++)
         {
            // Set parameters.
            command.Parameters.Clear();
            command.Parameters.Add("@lastName", employees[i].LastName);
            command.Parameters.Add("@firstName", employees[i].FirstName);

            // Execute the command.
            command.ExecuteNonQuery();
         }
      }
      finally
      {
         connection.Close();
      }
   }
}

// Retrieves the number of entries in the Employees table in
// the Northwind database.
static int GetEmployeeCount(string connectionString)
{
   int result = 0;
   using (SqlConnection sqlConnection =
      new SqlConnection(connectionString))
   {
      SqlCommand sqlCommand = new SqlCommand(
         "SELECT COUNT(*) FROM Employees", sqlConnection);

      sqlConnection.Open();
      try
      {
         result = (int)sqlCommand.ExecuteScalar();
      }
      finally
      {
         sqlConnection.Close();
      }
   }
   return result;
}

// Retrieves the ID of the first employee that has the provided name.
static int GetEmployeeID(string lastName, string firstName,
   string connectionString)
{
   using (SqlConnection connection =
      new SqlConnection(connectionString))
   {
      SqlCommand command = new SqlCommand(
         string.Format(
            "SELECT [Employee ID] FROM Employees " +
            "WHERE [Last Name] = '{0}' AND [First Name] = '{1}'",
            lastName, firstName),
         connection);

      connection.Open();
      try
      {
         return (int)command.ExecuteScalar();
      }
      finally
      {
         connection.Close();
      }
   }
}
' Adds new employee records to the database.
Private Shared Sub InsertEmployees(ByVal employees() As Employee, ByVal connectionString As String)
    Using connection As New SqlConnection(connectionString)
        Try
            ' Create the SQL command.
            Dim command As New SqlCommand("INSERT INTO Employees ([Last Name], [First Name])" & "VALUES (@lastName, @firstName)", connection)

            connection.Open()
            For i As Integer = 0 To employees.Length - 1
                ' Set parameters.
                command.Parameters.Clear()
                command.Parameters.Add("@lastName", employees(i).LastName)
                command.Parameters.Add("@firstName", employees(i).FirstName)

                ' Execute the command.
                command.ExecuteNonQuery()
            Next i
        Finally
            connection.Close()
        End Try
    End Using
End Sub

' Retrieves the number of entries in the Employees table in 
' the Northwind database.
Private Shared Function GetEmployeeCount(ByVal connectionString As String) As Integer
    Dim result As Integer = 0
    Using sqlConnection As New SqlConnection(connectionString)
        Dim sqlCommand As New SqlCommand("SELECT COUNT(*) FROM Employees", sqlConnection)

        sqlConnection.Open()
        Try
            result = CInt(Fix(sqlCommand.ExecuteScalar()))
        Finally
            sqlConnection.Close()
        End Try
    End Using
    Return result
End Function

' Retrieves the ID of the first employee that has the provided name.
Private Shared Function GetEmployeeID(ByVal lastName As String, ByVal firstName As String, ByVal connectionString As String) As Integer
    Using connection As New SqlConnection(connectionString)
        Dim command As New SqlCommand(String.Format("SELECT [Employee ID] FROM Employees " & "WHERE [Last Name] = '{0}' AND [First Name] = '{1}'", lastName, firstName), connection)

        connection.Open()
        Try
            Return CInt(Fix(command.ExecuteScalar()))
        Finally
            connection.Close()
        End Try
    End Using
End Function

이 메서드는 InsertEmployees 새 직원 레코드를 데이터베이스에 추가합니다. 이 메서드는 GetEmployeeCount 테이블의 항목 수를 검색합니다 Employees . 메서드는 GetEmployeeID 제공된 이름을 포함하는 첫 번째 직원의 식별자를 검색합니다. 이러한 각 메서드는 Northwind 데이터베이스에 연결 문자열을 사용합니다.

버퍼링을 사용하지 않고 데이터베이스에 직원 데이터 추가

Program 클래스에 AddEmployeesPostRandomEmployees 메서드를 추가합니다.

// Posts random Employee data to the provided target block.
static void PostRandomEmployees(ITargetBlock<Employee> target, int count)
{
   Console.WriteLine($"Adding {count} entries to Employee table...");

   for (int i = 0; i < count; i++)
   {
      target.Post(Employee.Random());
   }
}

// Adds random employee data to the database by using dataflow.
static void AddEmployees(string connectionString, int count)
{
   // Create an ActionBlock<Employee> object that adds a single
   // employee entry to the database.
   var insertEmployee = new ActionBlock<Employee>(e =>
      InsertEmployees(new Employee[] { e }, connectionString));

   // Post several random Employee objects to the dataflow block.
   PostRandomEmployees(insertEmployee, count);

   // Set the dataflow block to the completed state and wait for
   // all insert operations to complete.
   insertEmployee.Complete();
   insertEmployee.Completion.Wait();
}
' Posts random Employee data to the provided target block.
Private Shared Sub PostRandomEmployees(ByVal target As ITargetBlock(Of Employee), ByVal count As Integer)
    Console.WriteLine("Adding {0} entries to Employee table...", count)

    For i As Integer = 0 To count - 1
        target.Post(Employee.Random())
    Next i
End Sub

' Adds random employee data to the database by using dataflow.
Private Shared Sub AddEmployees(ByVal connectionString As String, ByVal count As Integer)
    ' Create an ActionBlock<Employee> object that adds a single
    ' employee entry to the database.
    Dim insertEmployee = New ActionBlock(Of Employee)(Sub(e) InsertEmployees(New Employee() {e}, connectionString))

    ' Post several random Employee objects to the dataflow block.
    PostRandomEmployees(insertEmployee, count)

    ' Set the dataflow block to the completed state and wait for 
    ' all insert operations to complete.
    insertEmployee.Complete()
    insertEmployee.Completion.Wait()
End Sub

이 메서드는 AddEmployees 데이터 흐름을 사용하여 임의 직원 데이터를 데이터베이스에 추가합니다. ActionBlock<TInput> 개체를 만들어 InsertEmployees 메서드를 호출하여 데이터베이스에 직원 항목을 추가합니다. AddEmployees 메서드는 그런 다음 PostRandomEmployees 메서드를 호출하여 여러 Employee 객체를 ActionBlock<TInput> 객체에 게시합니다. 그런 다음 메서드는 AddEmployees 모든 삽입 작업이 완료되기를 기다립니다.

버퍼링을 사용하여 데이터베이스에 직원 데이터 추가

클래스에 Program 메서드를 추가합니다 AddEmployeesBatched .

// Adds random employee data to the database by using dataflow.
// This method is similar to AddEmployees except that it uses batching
// to add multiple employees to the database at a time.
static void AddEmployeesBatched(string connectionString, int batchSize,
   int count)
{
   // Create a BatchBlock<Employee> that holds several Employee objects and
   // then propagates them out as an array.
   var batchEmployees = new BatchBlock<Employee>(batchSize);

   // Create an ActionBlock<Employee[]> object that adds multiple
   // employee entries to the database.
   var insertEmployees = new ActionBlock<Employee[]>(a =>
      InsertEmployees(a, connectionString));

   // Link the batch block to the action block.
   batchEmployees.LinkTo(insertEmployees);

   // When the batch block completes, set the action block also to complete.
   batchEmployees.Completion.ContinueWith(delegate { insertEmployees.Complete(); });

   // Post several random Employee objects to the batch block.
   PostRandomEmployees(batchEmployees, count);

   // Set the batch block to the completed state and wait for
   // all insert operations to complete.
   batchEmployees.Complete();
   insertEmployees.Completion.Wait();
}
' Adds random employee data to the database by using dataflow.
' This method is similar to AddEmployees except that it uses batching
' to add multiple employees to the database at a time.
Private Shared Sub AddEmployeesBatched(ByVal connectionString As String, ByVal batchSize As Integer, ByVal count As Integer)
    ' Create a BatchBlock<Employee> that holds several Employee objects and
    ' then propagates them out as an array.
    Dim batchEmployees = New BatchBlock(Of Employee)(batchSize)

    ' Create an ActionBlock<Employee[]> object that adds multiple
    ' employee entries to the database.
    Dim insertEmployees = New ActionBlock(Of Employee())(Sub(a) Program.InsertEmployees(a, connectionString))

    ' Link the batch block to the action block.
    batchEmployees.LinkTo(insertEmployees)

    ' When the batch block completes, set the action block also to complete.
    batchEmployees.Completion.ContinueWith(Sub() insertEmployees.Complete())

    ' Post several random Employee objects to the batch block.
    PostRandomEmployees(batchEmployees, count)

    ' Set the batch block to the completed state and wait for 
    ' all insert operations to complete.
    batchEmployees.Complete()
    insertEmployees.Completion.Wait()
End Sub

이 메서드는 AddEmployees과 비슷하지만, 여러 Employee 개체를 ActionBlock<TInput> 개체로 보내기 전에 BatchBlock<T> 클래스를 사용하여 버퍼링합니다. 클래스가 BatchBlock<T> 여러 요소를 컬렉션으로 전파하기 때문에, ActionBlock<TInput> 객체를 Employee 객체 배열에서 작동하도록 수정합니다. AddEmployees 메서드와 마찬가지로, AddEmployeesBatchedPostRandomEmployees 메서드를 호출하여 여러 개의 Employee 객체를 게시합니다. 그러나 AddEmployeesBatched는 이러한 객체를 BatchBlock<T> 객체에 게시합니다. 또한 이 메서드는 AddEmployeesBatched 모든 삽입 작업이 완료되기를 기다립니다.

버퍼링된 조인을 사용하여 데이터베이스에서 직원 데이터 읽기

클래스에 Program 메서드를 추가합니다 GetRandomEmployees .

// Displays information about several random employees to the console.
static void GetRandomEmployees(string connectionString, int batchSize,
   int count)
{
   // Create a BatchedJoinBlock<Employee, Exception> object that holds
   // both employee and exception data.
   var selectEmployees = new BatchedJoinBlock<Employee, Exception>(batchSize);

   // Holds the total number of exceptions that occurred.
   int totalErrors = 0;

   // Create an action block that prints employee and error information
   // to the console.
   var printEmployees =
      new ActionBlock<Tuple<IList<Employee>, IList<Exception>>>(data =>
      {
         // Print information about the employees in this batch.
         Console.WriteLine("Received a batch...");
         foreach (Employee e in data.Item1)
         {
            Console.WriteLine($"Last={e.LastName} First={e.FirstName} ID={e.EmployeeID}");
         }

         // Print the error count for this batch.
         Console.WriteLine($"There were {data.Item2.Count} errors in this batch...");

         // Update total error count.
         totalErrors += data.Item2.Count;
      });

   // Link the batched join block to the action block.
   selectEmployees.LinkTo(printEmployees);

   // When the batched join block completes, set the action block also to complete.
   selectEmployees.Completion.ContinueWith(delegate { printEmployees.Complete(); });

   // Try to retrieve the ID for several random employees.
   Console.WriteLine("Selecting random entries from Employees table...");
   for (int i = 0; i < count; i++)
   {
      try
      {
         // Create a random employee.
         Employee e = Employee.Random();

         // Try to retrieve the ID for the employee from the database.
         e.EmployeeID = GetEmployeeID(e.LastName, e.FirstName, connectionString);

         // Post the Employee object to the Employee target of
         // the batched join block.
         selectEmployees.Target1.Post(e);
      }
      catch (NullReferenceException e)
      {
         // GetEmployeeID throws NullReferenceException when there is
         // no such employee with the given name. When this happens,
         // post the Exception object to the Exception target of
         // the batched join block.
         selectEmployees.Target2.Post(e);
      }
   }

   // Set the batched join block to the completed state and wait for
   // all retrieval operations to complete.
   selectEmployees.Complete();
   printEmployees.Completion.Wait();

   // Print the total error count.
   Console.WriteLine($"Finished. There were {totalErrors} total errors.");
}
' Displays information about several random employees to the console.
Private Shared Sub GetRandomEmployees(ByVal connectionString As String, ByVal batchSize As Integer, ByVal count As Integer)
    ' Create a BatchedJoinBlock<Employee, Exception> object that holds
    ' both employee and exception data.
    Dim selectEmployees = New BatchedJoinBlock(Of Employee, Exception)(batchSize)

    ' Holds the total number of exceptions that occurred.
    Dim totalErrors As Integer = 0

    ' Create an action block that prints employee and error information
    ' to the console.
    Dim printEmployees = New ActionBlock(Of Tuple(Of IList(Of Employee), IList(Of Exception)))(Sub(data)
                                                                                                   ' Print information about the employees in this batch.
                                                                                                   ' Print the error count for this batch.
                                                                                                   ' Update total error count.
                                                                                                   Console.WriteLine("Received a batch...")
                                                                                                   For Each e As Employee In data.Item1
                                                                                                       Console.WriteLine("Last={0} First={1} ID={2}", e.LastName, e.FirstName, e.EmployeeID)
                                                                                                   Next e
                                                                                                   Console.WriteLine("There were {0} errors in this batch...", data.Item2.Count)
                                                                                                   totalErrors += data.Item2.Count
                                                                                               End Sub)

    ' Link the batched join block to the action block.
    selectEmployees.LinkTo(printEmployees)

    ' When the batched join block completes, set the action block also to complete.
    selectEmployees.Completion.ContinueWith(Sub() printEmployees.Complete())

    ' Try to retrieve the ID for several random employees.
    Console.WriteLine("Selecting random entries from Employees table...")
    For i As Integer = 0 To count - 1
        Try
            ' Create a random employee.
            Dim e As Employee = Employee.Random()

            ' Try to retrieve the ID for the employee from the database.
            e.EmployeeID = GetEmployeeID(e.LastName, e.FirstName, connectionString)

            ' Post the Employee object to the Employee target of 
            ' the batched join block.
            selectEmployees.Target1.Post(e)
        Catch e As NullReferenceException
            ' GetEmployeeID throws NullReferenceException when there is 
            ' no such employee with the given name. When this happens,
            ' post the Exception object to the Exception target of
            ' the batched join block.
            selectEmployees.Target2.Post(e)
        End Try
    Next i

    ' Set the batched join block to the completed state and wait for 
    ' all retrieval operations to complete.
    selectEmployees.Complete()
    printEmployees.Completion.Wait()

    ' Print the total error count.
    Console.WriteLine("Finished. There were {0} total errors.", totalErrors)
End Sub

이 메서드는 임의 직원에 대한 정보를 콘솔에 출력합니다. 여러 임의의 Employee 개체를 만들고 GetEmployeeID 메서드를 호출하여 각 개체에 대한 고유 식별자를 검색합니다. GetEmployeeID 메서드는 지정된 이름과 성을 가진 일치하는 직원이 없는 경우 예외를 발생시키므로, GetRandomEmployees 메서드는 BatchedJoinBlock<T1,T2> 클래스를 사용하여 GetEmployeeID 호출에 성공한 경우 Employee 개체를 저장하고, 실패한 호출에는 System.Exception 개체를 저장합니다. 이 예제에서 ActionBlock<TInput> 개체는 Employee 개체 목록과 Exception 개체 목록을 보유한 Tuple<T1,T2> 개체에 작용합니다. BatchedJoinBlock<T1,T2> 수신된 Employee 개체 수와 Exception 개체 개수의 합계가 일괄 처리 크기와 같으면 개체가 이 데이터를 전파합니다.

전체 예제

다음 예제에서는 전체 코드를 보여줍니다. 이 메서드는 Main 일괄 처리된 데이터베이스 삽입을 수행하는 데 필요한 시간과 일괄 처리되지 않은 데이터베이스 삽입을 수행하는 시간을 비교합니다. 또한 버퍼링된 조인을 사용하여 데이터베이스에서 직원 데이터를 읽고 오류를 보고하는 방법을 보여 줍니다.

using System;
using System.Collections.Generic;
using System.Data.SqlClient;
using System.Diagnostics;
using System.IO;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to use batched dataflow blocks to improve
// the performance of database operations.
namespace DataflowBatchDatabase
{
   class Program
   {
      // The number of employees to add to the database.
      // TODO: Change this value to experiment with different numbers of
      // employees to insert into the database.
      static readonly int insertCount = 256;

      // The size of a single batch of employees to add to the database.
      // TODO: Change this value to experiment with different batch sizes.
      static readonly int insertBatchSize = 96;

      // The source database file.
      // TODO: Change this value if Northwind.sdf is at a different location
      // on your computer.
      static readonly string sourceDatabase =
         @"C:\...\Northwind.sdf";

      // TODO: Change this value if you require a different temporary location.
      static readonly string scratchDatabase =
         @"C:\Temp\Northwind.sdf";

      // Describes an employee. Each property maps to a
      // column in the Employees table in the Northwind database.
      // For brevity, the Employee class does not contain
      // all columns from the Employees table.
      class Employee
      {
         public int EmployeeID { get; set; }
         public string LastName { get; set; }
         public string FirstName { get; set; }

         // A random number generator that helps tp generate
         // Employee property values.
         static Random rand = new Random(42);

         // Possible random first names.
         static readonly string[] firstNames = { "Tom", "Mike", "Ruth", "Bob", "John" };
         // Possible random last names.
         static readonly string[] lastNames = { "Jones", "Smith", "Johnson", "Walker" };

         // Creates an Employee object that contains random
         // property values.
         public static Employee Random()
         {
            return new Employee
            {
               EmployeeID = -1,
               LastName = lastNames[rand.Next() % lastNames.Length],
               FirstName = firstNames[rand.Next() % firstNames.Length]
            };
         }
      }

      // Adds new employee records to the database.
      static void InsertEmployees(Employee[] employees, string connectionString)
      {
         using (SqlConnection connection =
            new SqlConnection(connectionString))
         {
            try
            {
               // Create the SQL command.
               SqlCommand command = new SqlCommand(
                  "INSERT INTO Employees ([Last Name], [First Name])" +
                  "VALUES (@lastName, @firstName)",
                  connection);

               connection.Open();
               for (int i = 0; i < employees.Length; i++)
               {
                  // Set parameters.
                  command.Parameters.Clear();
                  command.Parameters.Add("@lastName", employees[i].LastName);
                  command.Parameters.Add("@firstName", employees[i].FirstName);

                  // Execute the command.
                  command.ExecuteNonQuery();
               }
            }
            finally
            {
               connection.Close();
            }
         }
      }

      // Retrieves the number of entries in the Employees table in
      // the Northwind database.
      static int GetEmployeeCount(string connectionString)
      {
         int result = 0;
         using (SqlConnection sqlConnection =
            new SqlConnection(connectionString))
         {
            SqlCommand sqlCommand = new SqlCommand(
               "SELECT COUNT(*) FROM Employees", sqlConnection);

            sqlConnection.Open();
            try
            {
               result = (int)sqlCommand.ExecuteScalar();
            }
            finally
            {
               sqlConnection.Close();
            }
         }
         return result;
      }

      // Retrieves the ID of the first employee that has the provided name.
      static int GetEmployeeID(string lastName, string firstName,
         string connectionString)
      {
         using (SqlConnection connection =
            new SqlConnection(connectionString))
         {
            SqlCommand command = new SqlCommand(
               string.Format(
                  "SELECT [Employee ID] FROM Employees " +
                  "WHERE [Last Name] = '{0}' AND [First Name] = '{1}'",
                  lastName, firstName),
               connection);

            connection.Open();
            try
            {
               return (int)command.ExecuteScalar();
            }
            finally
            {
               connection.Close();
            }
         }
      }

      // Posts random Employee data to the provided target block.
      static void PostRandomEmployees(ITargetBlock<Employee> target, int count)
      {
         Console.WriteLine($"Adding {count} entries to Employee table...");

         for (int i = 0; i < count; i++)
         {
            target.Post(Employee.Random());
         }
      }

      // Adds random employee data to the database by using dataflow.
      static void AddEmployees(string connectionString, int count)
      {
         // Create an ActionBlock<Employee> object that adds a single
         // employee entry to the database.
         var insertEmployee = new ActionBlock<Employee>(e =>
            InsertEmployees(new Employee[] { e }, connectionString));

         // Post several random Employee objects to the dataflow block.
         PostRandomEmployees(insertEmployee, count);

         // Set the dataflow block to the completed state and wait for
         // all insert operations to complete.
         insertEmployee.Complete();
         insertEmployee.Completion.Wait();
      }

      // Adds random employee data to the database by using dataflow.
      // This method is similar to AddEmployees except that it uses batching
      // to add multiple employees to the database at a time.
      static void AddEmployeesBatched(string connectionString, int batchSize,
         int count)
      {
         // Create a BatchBlock<Employee> that holds several Employee objects and
         // then propagates them out as an array.
         var batchEmployees = new BatchBlock<Employee>(batchSize);

         // Create an ActionBlock<Employee[]> object that adds multiple
         // employee entries to the database.
         var insertEmployees = new ActionBlock<Employee[]>(a =>
            InsertEmployees(a, connectionString));

         // Link the batch block to the action block.
         batchEmployees.LinkTo(insertEmployees);

         // When the batch block completes, set the action block also to complete.
         batchEmployees.Completion.ContinueWith(delegate { insertEmployees.Complete(); });

         // Post several random Employee objects to the batch block.
         PostRandomEmployees(batchEmployees, count);

         // Set the batch block to the completed state and wait for
         // all insert operations to complete.
         batchEmployees.Complete();
         insertEmployees.Completion.Wait();
      }

      // Displays information about several random employees to the console.
      static void GetRandomEmployees(string connectionString, int batchSize,
         int count)
      {
         // Create a BatchedJoinBlock<Employee, Exception> object that holds
         // both employee and exception data.
         var selectEmployees = new BatchedJoinBlock<Employee, Exception>(batchSize);

         // Holds the total number of exceptions that occurred.
         int totalErrors = 0;

         // Create an action block that prints employee and error information
         // to the console.
         var printEmployees =
            new ActionBlock<Tuple<IList<Employee>, IList<Exception>>>(data =>
            {
               // Print information about the employees in this batch.
               Console.WriteLine("Received a batch...");
               foreach (Employee e in data.Item1)
               {
                  Console.WriteLine($"Last={e.LastName} First={e.FirstName} ID={e.EmployeeID}");
               }

               // Print the error count for this batch.
               Console.WriteLine($"There were {data.Item2.Count} errors in this batch...");

               // Update total error count.
               totalErrors += data.Item2.Count;
            });

         // Link the batched join block to the action block.
         selectEmployees.LinkTo(printEmployees);

         // When the batched join block completes, set the action block also to complete.
         selectEmployees.Completion.ContinueWith(delegate { printEmployees.Complete(); });

         // Try to retrieve the ID for several random employees.
         Console.WriteLine("Selecting random entries from Employees table...");
         for (int i = 0; i < count; i++)
         {
            try
            {
               // Create a random employee.
               Employee e = Employee.Random();

               // Try to retrieve the ID for the employee from the database.
               e.EmployeeID = GetEmployeeID(e.LastName, e.FirstName, connectionString);

               // Post the Employee object to the Employee target of
               // the batched join block.
               selectEmployees.Target1.Post(e);
            }
            catch (NullReferenceException e)
            {
               // GetEmployeeID throws NullReferenceException when there is
               // no such employee with the given name. When this happens,
               // post the Exception object to the Exception target of
               // the batched join block.
               selectEmployees.Target2.Post(e);
            }
         }

         // Set the batched join block to the completed state and wait for
         // all retrieval operations to complete.
         selectEmployees.Complete();
         printEmployees.Completion.Wait();

         // Print the total error count.
         Console.WriteLine($"Finished. There were {totalErrors} total errors.");
      }

      static void Main(string[] args)
      {
         // Create a connection string for accessing the database.
         // The connection string refers to the temporary database location.
         string connectionString = "...";

         // Create a Stopwatch object to time database insert operations.
         Stopwatch stopwatch = new Stopwatch();

         // Start with a clean database file by copying the source database to
         // the temporary location.
         File.Copy(sourceDatabase, scratchDatabase, true);

         // Demonstrate multiple insert operations without batching.
         Console.WriteLine("Demonstrating non-batched database insert operations...");
         Console.WriteLine($"Original size of Employee table: {GetEmployeeCount(connectionString)}.");
         stopwatch.Start();
         AddEmployees(connectionString, insertCount);
         stopwatch.Stop();
         Console.WriteLine("New size of Employee table: {0}; elapsed insert time: {1} ms.",
            GetEmployeeCount(connectionString), stopwatch.ElapsedMilliseconds);

         Console.WriteLine();

         // Start again with a clean database file.
         File.Copy(sourceDatabase, scratchDatabase, true);

         // Demonstrate multiple insert operations, this time with batching.
         Console.WriteLine("Demonstrating batched database insert operations...");
         Console.WriteLine($"Original size of Employee table: {GetEmployeeCount(connectionString)}.");
         stopwatch.Restart();
         AddEmployeesBatched(connectionString, insertBatchSize, insertCount);
         stopwatch.Stop();
         Console.WriteLine("New size of Employee table: {0}; elapsed insert time: {1} ms.",
            GetEmployeeCount(connectionString), stopwatch.ElapsedMilliseconds);

         Console.WriteLine();

         // Start again with a clean database file.
         File.Copy(sourceDatabase, scratchDatabase, true);

         // Demonstrate multiple retrieval operations with error reporting.
         Console.WriteLine("Demonstrating batched join database select operations...");
         // Add a small number of employees to the database.
         AddEmployeesBatched(connectionString, insertBatchSize, 16);
         // Query for random employees.
         GetRandomEmployees(connectionString, insertBatchSize, 10);
      }
   }
}
/* Sample output:
Demonstrating non-batched database insert operations...
Original size of Employee table: 15.
Adding 256 entries to Employee table...
New size of Employee table: 271; elapsed insert time: 11035 ms.

Demonstrating batched database insert operations...
Original size of Employee table: 15.
Adding 256 entries to Employee table...
New size of Employee table: 271; elapsed insert time: 197 ms.

Demonstrating batched join database insert operations...
Adding 16 entries to Employee table...
Selecting items from Employee table...
Received a batch...
Last=Jones First=Tom ID=21
Last=Jones First=John ID=24
Last=Smith First=Tom ID=26
Last=Jones First=Tom ID=21
There were 4 errors in this batch...
Received a batch...
Last=Smith First=Tom ID=26
Last=Jones First=Mike ID=28
There were 0 errors in this batch...
Finished. There were 4 total errors.
*/
Imports System.Collections.Generic
Imports System.Data.SqlClient
Imports System.Data.SqlServerCe
Imports System.Diagnostics
Imports System.IO
Imports System.Threading.Tasks.Dataflow


' Demonstrates how to use batched dataflow blocks to improve
' the performance of database operations.
Namespace DataflowBatchDatabase
    Friend Class Program
        ' The number of employees to add to the database.
        ' TODO: Change this value to experiment with different numbers of 
        ' employees to insert into the database.
        Private Shared ReadOnly insertCount As Integer = 256

        ' The size of a single batch of employees to add to the database.
        ' TODO: Change this value to experiment with different batch sizes.
        Private Shared ReadOnly insertBatchSize As Integer = 96

        ' The source database file.
        ' TODO: Change this value if Northwind.sdf is at a different location
        ' on your computer.
        Private Shared ReadOnly sourceDatabase As String = "C:\Program Files\Microsoft SQL Server Compact Edition\v3.5\Samples\Northwind.sdf"

        ' TODO: Change this value if you require a different temporary location.
        Private Shared ReadOnly scratchDatabase As String = "C:\Temp\Northwind.sdf"

        ' Describes an employee. Each property maps to a 
        ' column in the Employees table in the Northwind database.
        ' For brevity, the Employee class does not contain
        ' all columns from the Employees table.
        Private Class Employee
            Public Property EmployeeID() As Integer
            Public Property LastName() As String
            Public Property FirstName() As String

            ' A random number generator that helps tp generate
            ' Employee property values.
            Private Shared rand As New Random(42)

            ' Possible random first names.
            Private Shared ReadOnly firstNames() As String = {"Tom", "Mike", "Ruth", "Bob", "John"}
            ' Possible random last names.
            Private Shared ReadOnly lastNames() As String = {"Jones", "Smith", "Johnson", "Walker"}

            ' Creates an Employee object that contains random 
            ' property values.
            Public Shared Function Random() As Employee
                Return New Employee With {.EmployeeID = -1, .LastName = lastNames(rand.Next() Mod lastNames.Length), .FirstName = firstNames(rand.Next() Mod firstNames.Length)}
            End Function
        End Class

        ' Adds new employee records to the database.
        Private Shared Sub InsertEmployees(ByVal employees() As Employee, ByVal connectionString As String)
            Using connection As New SqlConnection(connectionString)
                Try
                    ' Create the SQL command.
                    Dim command As New SqlCommand("INSERT INTO Employees ([Last Name], [First Name])" & "VALUES (@lastName, @firstName)", connection)

                    connection.Open()
                    For i As Integer = 0 To employees.Length - 1
                        ' Set parameters.
                        command.Parameters.Clear()
                        command.Parameters.Add("@lastName", employees(i).LastName)
                        command.Parameters.Add("@firstName", employees(i).FirstName)

                        ' Execute the command.
                        command.ExecuteNonQuery()
                    Next i
                Finally
                    connection.Close()
                End Try
            End Using
        End Sub

        ' Retrieves the number of entries in the Employees table in 
        ' the Northwind database.
        Private Shared Function GetEmployeeCount(ByVal connectionString As String) As Integer
            Dim result As Integer = 0
            Using sqlConnection As New SqlConnection(connectionString)
                Dim sqlCommand As New SqlCommand("SELECT COUNT(*) FROM Employees", sqlConnection)

                sqlConnection.Open()
                Try
                    result = CInt(Fix(sqlCommand.ExecuteScalar()))
                Finally
                    sqlConnection.Close()
                End Try
            End Using
            Return result
        End Function

        ' Retrieves the ID of the first employee that has the provided name.
        Private Shared Function GetEmployeeID(ByVal lastName As String, ByVal firstName As String, ByVal connectionString As String) As Integer
            Using connection As New SqlConnection(connectionString)
                Dim command As New SqlCommand(String.Format("SELECT [Employee ID] FROM Employees " & "WHERE [Last Name] = '{0}' AND [First Name] = '{1}'", lastName, firstName), connection)

                connection.Open()
                Try
                    Return CInt(Fix(command.ExecuteScalar()))
                Finally
                    connection.Close()
                End Try
            End Using
        End Function

        ' Posts random Employee data to the provided target block.
        Private Shared Sub PostRandomEmployees(ByVal target As ITargetBlock(Of Employee), ByVal count As Integer)
            Console.WriteLine("Adding {0} entries to Employee table...", count)

            For i As Integer = 0 To count - 1
                target.Post(Employee.Random())
            Next i
        End Sub

        ' Adds random employee data to the database by using dataflow.
        Private Shared Sub AddEmployees(ByVal connectionString As String, ByVal count As Integer)
            ' Create an ActionBlock<Employee> object that adds a single
            ' employee entry to the database.
            Dim insertEmployee = New ActionBlock(Of Employee)(Sub(e) InsertEmployees(New Employee() {e}, connectionString))

            ' Post several random Employee objects to the dataflow block.
            PostRandomEmployees(insertEmployee, count)

            ' Set the dataflow block to the completed state and wait for 
            ' all insert operations to complete.
            insertEmployee.Complete()
            insertEmployee.Completion.Wait()
        End Sub

        ' Adds random employee data to the database by using dataflow.
        ' This method is similar to AddEmployees except that it uses batching
        ' to add multiple employees to the database at a time.
        Private Shared Sub AddEmployeesBatched(ByVal connectionString As String, ByVal batchSize As Integer, ByVal count As Integer)
            ' Create a BatchBlock<Employee> that holds several Employee objects and
            ' then propagates them out as an array.
            Dim batchEmployees = New BatchBlock(Of Employee)(batchSize)

            ' Create an ActionBlock<Employee[]> object that adds multiple
            ' employee entries to the database.
            Dim insertEmployees = New ActionBlock(Of Employee())(Sub(a) Program.InsertEmployees(a, connectionString))

            ' Link the batch block to the action block.
            batchEmployees.LinkTo(insertEmployees)

            ' When the batch block completes, set the action block also to complete.
            batchEmployees.Completion.ContinueWith(Sub() insertEmployees.Complete())

            ' Post several random Employee objects to the batch block.
            PostRandomEmployees(batchEmployees, count)

            ' Set the batch block to the completed state and wait for 
            ' all insert operations to complete.
            batchEmployees.Complete()
            insertEmployees.Completion.Wait()
        End Sub

        ' Displays information about several random employees to the console.
        Private Shared Sub GetRandomEmployees(ByVal connectionString As String, ByVal batchSize As Integer, ByVal count As Integer)
            ' Create a BatchedJoinBlock<Employee, Exception> object that holds
            ' both employee and exception data.
            Dim selectEmployees = New BatchedJoinBlock(Of Employee, Exception)(batchSize)

            ' Holds the total number of exceptions that occurred.
            Dim totalErrors As Integer = 0

            ' Create an action block that prints employee and error information
            ' to the console.
            Dim printEmployees = New ActionBlock(Of Tuple(Of IList(Of Employee), IList(Of Exception)))(Sub(data)
                                                                                                           ' Print information about the employees in this batch.
                                                                                                           ' Print the error count for this batch.
                                                                                                           ' Update total error count.
                                                                                                           Console.WriteLine("Received a batch...")
                                                                                                           For Each e As Employee In data.Item1
                                                                                                               Console.WriteLine("Last={0} First={1} ID={2}", e.LastName, e.FirstName, e.EmployeeID)
                                                                                                           Next e
                                                                                                           Console.WriteLine("There were {0} errors in this batch...", data.Item2.Count)
                                                                                                           totalErrors += data.Item2.Count
                                                                                                       End Sub)

            ' Link the batched join block to the action block.
            selectEmployees.LinkTo(printEmployees)

            ' When the batched join block completes, set the action block also to complete.
            selectEmployees.Completion.ContinueWith(Sub() printEmployees.Complete())

            ' Try to retrieve the ID for several random employees.
            Console.WriteLine("Selecting random entries from Employees table...")
            For i As Integer = 0 To count - 1
                Try
                    ' Create a random employee.
                    Dim e As Employee = Employee.Random()

                    ' Try to retrieve the ID for the employee from the database.
                    e.EmployeeID = GetEmployeeID(e.LastName, e.FirstName, connectionString)

                    ' Post the Employee object to the Employee target of 
                    ' the batched join block.
                    selectEmployees.Target1.Post(e)
                Catch e As NullReferenceException
                    ' GetEmployeeID throws NullReferenceException when there is 
                    ' no such employee with the given name. When this happens,
                    ' post the Exception object to the Exception target of
                    ' the batched join block.
                    selectEmployees.Target2.Post(e)
                End Try
            Next i

            ' Set the batched join block to the completed state and wait for 
            ' all retrieval operations to complete.
            selectEmployees.Complete()
            printEmployees.Completion.Wait()

            ' Print the total error count.
            Console.WriteLine("Finished. There were {0} total errors.", totalErrors)
        End Sub

        Shared Sub Main(ByVal args() As String)
            ' Create a connection string for accessing the database.
            ' The connection string refers to the temporary database location.
            Dim connectionString As String = String.Format("Data Source={0}", scratchDatabase)

            ' Create a Stopwatch object to time database insert operations.
            Dim stopwatch As New Stopwatch()

            ' Start with a clean database file by copying the source database to 
            ' the temporary location.
            File.Copy(sourceDatabase, scratchDatabase, True)

            ' Demonstrate multiple insert operations without batching.
            Console.WriteLine("Demonstrating non-batched database insert operations...")
            Console.WriteLine("Original size of Employee table: {0}.", GetEmployeeCount(connectionString))
            stopwatch.Start()
            AddEmployees(connectionString, insertCount)
            stopwatch.Stop()
            Console.WriteLine("New size of Employee table: {0}; elapsed insert time: {1} ms.", GetEmployeeCount(connectionString), stopwatch.ElapsedMilliseconds)

            Console.WriteLine()

            ' Start again with a clean database file.
            File.Copy(sourceDatabase, scratchDatabase, True)

            ' Demonstrate multiple insert operations, this time with batching.
            Console.WriteLine("Demonstrating batched database insert operations...")
            Console.WriteLine("Original size of Employee table: {0}.", GetEmployeeCount(connectionString))
            stopwatch.Restart()
            AddEmployeesBatched(connectionString, insertBatchSize, insertCount)
            stopwatch.Stop()
            Console.WriteLine("New size of Employee table: {0}; elapsed insert time: {1} ms.", GetEmployeeCount(connectionString), stopwatch.ElapsedMilliseconds)

            Console.WriteLine()

            ' Start again with a clean database file.
            File.Copy(sourceDatabase, scratchDatabase, True)

            ' Demonstrate multiple retrieval operations with error reporting.
            Console.WriteLine("Demonstrating batched join database select operations...")
            ' Add a small number of employees to the database.
            AddEmployeesBatched(connectionString, insertBatchSize, 16)
            ' Query for random employees.
            GetRandomEmployees(connectionString, insertBatchSize, 10)
        End Sub
    End Class
End Namespace
' Sample output:
'Demonstrating non-batched database insert operations...
'Original size of Employee table: 15.
'Adding 256 entries to Employee table...
'New size of Employee table: 271; elapsed insert time: 11035 ms.
'
'Demonstrating batched database insert operations...
'Original size of Employee table: 15.
'Adding 256 entries to Employee table...
'New size of Employee table: 271; elapsed insert time: 197 ms.
'
'Demonstrating batched join database insert operations...
'Adding 16 entries to Employee table...
'Selecting items from Employee table...
'Received a batch...
'Last=Jones First=Tom ID=21
'Last=Jones First=John ID=24
'Last=Smith First=Tom ID=26
'Last=Jones First=Tom ID=21
'There were 4 errors in this batch...
'Received a batch...
'Last=Smith First=Tom ID=26
'Last=Jones First=Mike ID=28
'There were 0 errors in this batch...
'Finished. There were 4 total errors.
'

참고하십시오

  • 데이터 흐름