Walkthrough: Using BatchBlock and BatchedJoinBlock to Improve Efficiency
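The TPL Dataflow Library provides the System.Threading.Tasks.Dataflow.BatchBlock<T> and System.Threading.Tasks.Dataflow.BatchedJoinBlock<T1,T2> classes. With them, you can receive and buffer data from one or more sources and then propagate that buffered data as one collection. This batching mechanism is useful when you collect data from one or more sources and then process multiple data elements as a batch. For example, consider an application that uses dataflow to insert records into a database. The operation can be more efficient if multiple items are inserted at the same time instead of one at a time. This document describes how to use the BatchBlock<T> class to improve the efficiency of such database insert operations. It also describes how to use the BatchedJoinBlock<T1,T2> class to capture both the results and any exceptions that occur when the program reads from the database.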
Note
The TPL Dataflow Library (the System.Threading.Tasks.Dataflow namespace) is not distributed with .NET. To install the System.Threading.Tasks.Dataflow namespace in Visual Studio, open your project, choose Manage NuGet Packages from the Project menu, and search online for the System.Threading.Tasks.Dataflow package. Alternatively, install it with the .NET Core CLI by running dotnet add package System.Threading.Tasks.Dataflow.
Prerequisites
Before you start this walkthrough, read the Join Blocks section in the Dataflow document.
Make sure that a copy of the Northwind database (Northwind.sdf) is available on your computer. This file is typically located in the folder %Program Files%\Microsoft SQL Server Compact Edition\v3.5\Samples\.
Important
On some versions of Windows, you cannot connect to Northwind.sdf if Visual Studio is running in non-administrator mode. To connect to Northwind.sdf, start Visual Studio or a Developer Command Prompt for Visual Studio in Run as administrator mode.
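This walkthrough contains the following sections: Creating the Console Application, Defining the Employee Class, Defining Employee Database Operations, Adding Employee Data to the Database Without Using Buffering, Adding Employee Data to the Database by Using Buffering, Using Buffered Join to Read Employee Data from the Database, and Complete Example.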
Creating the Console Application
In Visual Studio, create a Visual C# or Visual Basic Console Application project. In this document, the project is named DataflowBatchDatabase. In your project, add a reference to System.Data.SqlServerCe.dll and a reference to System.Threading.Tasks.Dataflow.dll.
Make sure that the source file that defines the Program class contains the following using statements (Imports statements in Visual Basic).
using System;
using System.Collections.Generic;
using System.Data.SqlServerCe;
using System.Diagnostics;
using System.IO;
using System.Threading.Tasks.Dataflow;
Imports System.Collections.Generic
Imports System.Data.SqlServerCe
Imports System.Diagnostics
Imports System.IO
Imports System.Threading.Tasks.Dataflow
Add the following data members to the Program class.
// The number of employees to add to the database.
// TODO: Change this value to experiment with different numbers of
// employees to insert into the database.
static readonly int insertCount = 256;
// The size of a single batch of employees to add to the database.
// TODO: Change this value to experiment with different batch sizes.
static readonly int insertBatchSize = 96;
// The source database file.
// TODO: Change this value if Northwind.sdf is at a different location
// on your computer.
static readonly string sourceDatabase =
@"C:\Program Files\Microsoft SQL Server Compact Edition\v3.5\Samples\Northwind.sdf";
// TODO: Change this value if you require a different temporary location.
static readonly string scratchDatabase =
@"C:\Temp\Northwind.sdf";
' The number of employees to add to the database.
' TODO: Change this value to experiment with different numbers of
' employees to insert into the database.
Private Shared ReadOnly insertCount As Integer = 256
' The size of a single batch of employees to add to the database.
' TODO: Change this value to experiment with different batch sizes.
Private Shared ReadOnly insertBatchSize As Integer = 96
' The source database file.
' TODO: Change this value if Northwind.sdf is at a different location
' on your computer.
Private Shared ReadOnly sourceDatabase As String = "C:\Program Files\Microsoft SQL Server Compact Edition\v3.5\Samples\Northwind.sdf"
' TODO: Change this value if you require a different temporary location.
Private Shared ReadOnly scratchDatabase As String = "C:\Temp\Northwind.sdf"
Defining the Employee Class
Add the Employee class to the Program class.
// Describes an employee. Each property maps to a
// column in the Employees table in the Northwind database.
// For brevity, the Employee class does not contain
// all columns from the Employees table.
class Employee
{
public int EmployeeID { get; set; }
public string LastName { get; set; }
public string FirstName { get; set; }
// A random number generator that helps to generate
// Employee property values.
static Random rand = new Random(42);
// Possible random first names.
static readonly string[] firstNames = { "Tom", "Mike", "Ruth", "Bob", "John" };
// Possible random last names.
static readonly string[] lastNames = { "Jones", "Smith", "Johnson", "Walker" };
// Creates an Employee object that contains random
// property values.
public static Employee Random()
{
return new Employee
{
EmployeeID = -1,
LastName = lastNames[rand.Next() % lastNames.Length],
FirstName = firstNames[rand.Next() % firstNames.Length]
};
}
}
' Describes an employee. Each property maps to a
' column in the Employees table in the Northwind database.
' For brevity, the Employee class does not contain
' all columns from the Employees table.
Private Class Employee
Public Property EmployeeID() As Integer
Public Property LastName() As String
Public Property FirstName() As String
' A random number generator that helps to generate
' Employee property values.
Private Shared rand As New Random(42)
' Possible random first names.
Private Shared ReadOnly firstNames() As String = {"Tom", "Mike", "Ruth", "Bob", "John"}
' Possible random last names.
Private Shared ReadOnly lastNames() As String = {"Jones", "Smith", "Johnson", "Walker"}
' Creates an Employee object that contains random
' property values.
Public Shared Function Random() As Employee
Return New Employee With {.EmployeeID = -1, .LastName = lastNames(rand.Next() Mod lastNames.Length), .FirstName = firstNames(rand.Next() Mod firstNames.Length)}
End Function
End Class
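The Employee class contains three properties: EmployeeID, LastName, and FirstName. These properties correspond to the Employee ID, Last Name, and First Name columns of the Employees table in the Northwind database. For this demonstration, the Employee class also defines the Random method, which creates an Employee object that has random values for its properties.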
Defining Employee Database Operations
Add the InsertEmployees, GetEmployeeCount, and GetEmployeeID methods to the Program class.
// Adds new employee records to the database.
static void InsertEmployees(Employee[] employees, string connectionString)
{
using (SqlCeConnection connection =
new SqlCeConnection(connectionString))
{
try
{
// Create the SQL command.
SqlCeCommand command = new SqlCeCommand(
"INSERT INTO Employees ([Last Name], [First Name]) " +
"VALUES (@lastName, @firstName)",
connection);
connection.Open();
for (int i = 0; i < employees.Length; i++)
{
// Set parameters.
command.Parameters.Clear();
command.Parameters.Add("@lastName", employees[i].LastName);
command.Parameters.Add("@firstName", employees[i].FirstName);
// Execute the command.
command.ExecuteNonQuery();
}
}
finally
{
connection.Close();
}
}
}
// Retrieves the number of entries in the Employees table in
// the Northwind database.
static int GetEmployeeCount(string connectionString)
{
int result = 0;
using (SqlCeConnection sqlConnection =
new SqlCeConnection(connectionString))
{
SqlCeCommand sqlCommand = new SqlCeCommand(
"SELECT COUNT(*) FROM Employees", sqlConnection);
sqlConnection.Open();
try
{
result = (int)sqlCommand.ExecuteScalar();
}
finally
{
sqlConnection.Close();
}
}
return result;
}
// Retrieves the ID of the first employee that has the provided name.
static int GetEmployeeID(string lastName, string firstName,
string connectionString)
{
using (SqlCeConnection connection =
new SqlCeConnection(connectionString))
{
SqlCeCommand command = new SqlCeCommand(
string.Format(
"SELECT [Employee ID] FROM Employees " +
"WHERE [Last Name] = '{0}' AND [First Name] = '{1}'",
lastName, firstName),
connection);
connection.Open();
try
{
return (int)command.ExecuteScalar();
}
finally
{
connection.Close();
}
}
}
' Adds new employee records to the database.
Private Shared Sub InsertEmployees(ByVal employees() As Employee, ByVal connectionString As String)
Using connection As New SqlCeConnection(connectionString)
Try
' Create the SQL command.
Dim command As New SqlCeCommand("INSERT INTO Employees ([Last Name], [First Name]) " & "VALUES (@lastName, @firstName)", connection)
connection.Open()
For i As Integer = 0 To employees.Length - 1
' Set parameters.
command.Parameters.Clear()
command.Parameters.Add("@lastName", employees(i).LastName)
command.Parameters.Add("@firstName", employees(i).FirstName)
' Execute the command.
command.ExecuteNonQuery()
Next i
Finally
connection.Close()
End Try
End Using
End Sub
' Retrieves the number of entries in the Employees table in
' the Northwind database.
Private Shared Function GetEmployeeCount(ByVal connectionString As String) As Integer
Dim result As Integer = 0
Using sqlConnection As New SqlCeConnection(connectionString)
Dim sqlCommand As New SqlCeCommand("SELECT COUNT(*) FROM Employees", sqlConnection)
sqlConnection.Open()
Try
result = CInt(Fix(sqlCommand.ExecuteScalar()))
Finally
sqlConnection.Close()
End Try
End Using
Return result
End Function
' Retrieves the ID of the first employee that has the provided name.
Private Shared Function GetEmployeeID(ByVal lastName As String, ByVal firstName As String, ByVal connectionString As String) As Integer
Using connection As New SqlCeConnection(connectionString)
Dim command As New SqlCeCommand(String.Format("SELECT [Employee ID] FROM Employees " & "WHERE [Last Name] = '{0}' AND [First Name] = '{1}'", lastName, firstName), connection)
connection.Open()
Try
Return CInt(Fix(command.ExecuteScalar()))
Finally
connection.Close()
End Try
End Using
End Function
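The InsertEmployees method adds new employee records to the database. The GetEmployeeCount method retrieves the number of entries in the Employees table. The GetEmployeeID method retrieves the identifier of the first employee that has the provided name. Each of these methods takes a connection string to the Northwind database and uses functionality in the System.Data.SqlServerCe namespace to communicate with the database.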
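GetEmployeeID casts the object returned by ExecuteScalar directly to int. When no row matches, that cast receives a null reference, and unboxing null to int throws NullReferenceException; GetRandomEmployees later relies on exactly that exception. The following minimal sketch reproduces the behavior without a database. ScalarCastDemo, CastLikeGetEmployeeID, and TryGetInt32 are hypothetical names used only for illustration, and TryGetInt32 is just one possible null-checking alternative, not part of the walkthrough.

```csharp
using System;

// Hypothetical demo class: reproduces the (int) cast that GetEmployeeID
// applies to the result of ExecuteScalar, without using a database.
static class ScalarCastDemo
{
    // Same cast as GetEmployeeID: unboxing a null reference to int
    // throws NullReferenceException.
    public static int CastLikeGetEmployeeID(object scalar)
    {
        return (int)scalar;
    }

    // Hypothetical alternative: returns null when the value is not a boxed
    // int (for example, a null reference or DBNull.Value) instead of throwing.
    public static int? TryGetInt32(object scalar)
    {
        return scalar is int id ? id : (int?)null;
    }

    static void Main()
    {
        object found = 21;     // stands in for the ID of a matching row
        object missing = null; // stands in for "no matching row"

        Console.WriteLine(CastLikeGetEmployeeID(found));  // 21
        Console.WriteLine(TryGetInt32(missing).HasValue); // False

        try
        {
            CastLikeGetEmployeeID(missing);
        }
        catch (NullReferenceException)
        {
            Console.WriteLine("NullReferenceException");
        }
    }
}
```

Checking for null before the cast avoids using an exception for an expected "not found" case; the walkthrough keeps the exception because it wants failures to flow into the BatchedJoinBlock<T1,T2> object.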
Adding Employee Data to the Database Without Using Buffering
Add the AddEmployees and PostRandomEmployees methods to the Program class.
// Posts random Employee data to the provided target block.
static void PostRandomEmployees(ITargetBlock<Employee> target, int count)
{
Console.WriteLine("Adding {0} entries to Employee table...", count);
for (int i = 0; i < count; i++)
{
target.Post(Employee.Random());
}
}
// Adds random employee data to the database by using dataflow.
static void AddEmployees(string connectionString, int count)
{
// Create an ActionBlock<Employee> object that adds a single
// employee entry to the database.
var insertEmployee = new ActionBlock<Employee>(e =>
InsertEmployees(new Employee[] { e }, connectionString));
// Post several random Employee objects to the dataflow block.
PostRandomEmployees(insertEmployee, count);
// Set the dataflow block to the completed state and wait for
// all insert operations to complete.
insertEmployee.Complete();
insertEmployee.Completion.Wait();
}
' Posts random Employee data to the provided target block.
Private Shared Sub PostRandomEmployees(ByVal target As ITargetBlock(Of Employee), ByVal count As Integer)
Console.WriteLine("Adding {0} entries to Employee table...", count)
For i As Integer = 0 To count - 1
target.Post(Employee.Random())
Next i
End Sub
' Adds random employee data to the database by using dataflow.
Private Shared Sub AddEmployees(ByVal connectionString As String, ByVal count As Integer)
' Create an ActionBlock<Employee> object that adds a single
' employee entry to the database.
Dim insertEmployee = New ActionBlock(Of Employee)(Sub(e) InsertEmployees(New Employee() {e}, connectionString))
' Post several random Employee objects to the dataflow block.
PostRandomEmployees(insertEmployee, count)
' Set the dataflow block to the completed state and wait for
' all insert operations to complete.
insertEmployee.Complete()
insertEmployee.Completion.Wait()
End Sub
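The AddEmployees method adds random employee data to the database by using dataflow. It creates an ActionBlock<TInput> object that calls the InsertEmployees method to add an employee entry to the database. The AddEmployees method then calls the PostRandomEmployees method to post multiple Employee objects to the ActionBlock<TInput> object, and then waits for all insert operations to finish. Because InsertEmployees opens a new database connection on every call, and the action block passes it a one-element array each time, this approach opens one connection per employee.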
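To see the shape of this pipeline without a database, the following sketch posts integers to an ActionBlock<int> that handles one item per call, just as AddEmployees handles one Employee per call. OpenConnectionAndInsert and UnbatchedDemo are hypothetical names; OpenConnectionAndInsert stands in for InsertEmployees and only counts how many times a connection would be opened.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo of the AddEmployees pipeline shape: one ActionBlock
// call, and therefore one simulated connection, per posted item.
static class UnbatchedDemo
{
    static int connectionsOpened;

    // Hypothetical stand-in for InsertEmployees: it only counts how many
    // times a database connection would have been opened.
    static void OpenConnectionAndInsert(int[] items)
    {
        Interlocked.Increment(ref connectionsOpened);
    }

    // Posts count items and returns the number of simulated connections.
    public static int Run(int count)
    {
        connectionsOpened = 0;

        // Each message is wrapped in a one-element array, as in AddEmployees.
        var insertOne = new ActionBlock<int>(item =>
            OpenConnectionAndInsert(new[] { item }));

        for (int i = 0; i < count; i++)
        {
            insertOne.Post(i);
        }

        // Stop accepting messages and wait until every item is processed.
        insertOne.Complete();
        insertOne.Completion.Wait();
        return connectionsOpened;
    }

    static void Main()
    {
        Console.WriteLine(Run(256)); // 256
    }
}
```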
Adding Employee Data to the Database by Using Buffering
Add the AddEmployeesBatched method to the Program class.
// Adds random employee data to the database by using dataflow.
// This method is similar to AddEmployees except that it uses batching
// to add multiple employees to the database at a time.
static void AddEmployeesBatched(string connectionString, int batchSize,
int count)
{
// Create a BatchBlock<Employee> that holds several Employee objects and
// then propagates them out as an array.
var batchEmployees = new BatchBlock<Employee>(batchSize);
// Create an ActionBlock<Employee[]> object that adds multiple
// employee entries to the database.
var insertEmployees = new ActionBlock<Employee[]>(a =>
InsertEmployees(a, connectionString));
// Link the batch block to the action block.
batchEmployees.LinkTo(insertEmployees);
// When the batch block completes, set the action block also to complete.
batchEmployees.Completion.ContinueWith(delegate { insertEmployees.Complete(); });
// Post several random Employee objects to the batch block.
PostRandomEmployees(batchEmployees, count);
// Set the batch block to the completed state and wait for
// all insert operations to complete.
batchEmployees.Complete();
insertEmployees.Completion.Wait();
}
' Adds random employee data to the database by using dataflow.
' This method is similar to AddEmployees except that it uses batching
' to add multiple employees to the database at a time.
Private Shared Sub AddEmployeesBatched(ByVal connectionString As String, ByVal batchSize As Integer, ByVal count As Integer)
' Create a BatchBlock<Employee> that holds several Employee objects and
' then propagates them out as an array.
Dim batchEmployees = New BatchBlock(Of Employee)(batchSize)
' Create an ActionBlock<Employee[]> object that adds multiple
' employee entries to the database.
Dim insertEmployees = New ActionBlock(Of Employee())(Sub(a) Program.InsertEmployees(a, connectionString))
' Link the batch block to the action block.
batchEmployees.LinkTo(insertEmployees)
' When the batch block completes, set the action block also to complete.
batchEmployees.Completion.ContinueWith(Sub() insertEmployees.Complete())
' Post several random Employee objects to the batch block.
PostRandomEmployees(batchEmployees, count)
' Set the batch block to the completed state and wait for
' all insert operations to complete.
batchEmployees.Complete()
insertEmployees.Completion.Wait()
End Sub
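This method resembles AddEmployees, except that it also uses the BatchBlock<T> class to buffer multiple Employee objects before it sends them to the ActionBlock<TInput> object. Because the BatchBlock<T> class propagates multiple elements as a collection, the ActionBlock<TInput> object is modified to act on an array of Employee objects. As in the AddEmployees method, AddEmployeesBatched calls the PostRandomEmployees method to post multiple Employee objects; however, AddEmployeesBatched posts these objects to the BatchBlock<T> object. The AddEmployeesBatched method also waits for all insert operations to finish. Because each call to InsertEmployees now receives a whole batch, a database connection is opened once per batch instead of once per employee.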
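The following sketch mirrors AddEmployeesBatched, with integers in place of Employee objects, and records the size of each batch that reaches the action block. BatchedDemo is a hypothetical name. Instead of the ContinueWith call used in the walkthrough, it links the blocks with DataflowLinkOptions.PropagateCompletion; both complete the action block after the batch block completes, but PropagateCompletion also forwards a fault from the batch block. With the walkthrough's values (256 items, batch size 96), the batch block emits two full batches and, when Complete is called, a final batch that holds the 64 remaining items.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo of the AddEmployeesBatched pipeline shape. The action
// block records each batch size at the point where InsertEmployees would
// open one connection for the whole batch.
static class BatchedDemo
{
    public static List<int> Run(int batchSize, int count)
    {
        var batchSizes = new List<int>();

        var batch = new BatchBlock<int>(batchSize);
        var insertMany = new ActionBlock<int[]>(items => batchSizes.Add(items.Length));

        // Completes insertMany automatically once batch has completed.
        batch.LinkTo(insertMany, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < count; i++)
        {
            batch.Post(i);
        }

        // Completing the batch block flushes leftover items as a final,
        // smaller batch; then wait for the action block to drain.
        batch.Complete();
        insertMany.Completion.Wait();
        return batchSizes;
    }

    static void Main()
    {
        // Same numbers as the walkthrough: 256 items in batches of 96.
        Console.WriteLine(string.Join(", ", Run(96, 256))); // 96, 96, 64
    }
}
```

Because the action block runs with its default degree of parallelism of 1, the List<int> is only touched by one delegate invocation at a time and is read only after Completion.Wait returns.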
Using Buffered Join to Read Employee Data from the Database
Add the GetRandomEmployees method to the Program class.
// Displays information about several random employees to the console.
static void GetRandomEmployees(string connectionString, int batchSize,
int count)
{
// Create a BatchedJoinBlock<Employee, Exception> object that holds
// both employee and exception data.
var selectEmployees = new BatchedJoinBlock<Employee, Exception>(batchSize);
// Holds the total number of exceptions that occurred.
int totalErrors = 0;
// Create an action block that prints employee and error information
// to the console.
var printEmployees =
new ActionBlock<Tuple<IList<Employee>, IList<Exception>>>(data =>
{
// Print information about the employees in this batch.
Console.WriteLine("Received a batch...");
foreach (Employee e in data.Item1)
{
Console.WriteLine("Last={0} First={1} ID={2}",
e.LastName, e.FirstName, e.EmployeeID);
}
// Print the error count for this batch.
Console.WriteLine("There were {0} errors in this batch...",
data.Item2.Count);
// Update total error count.
totalErrors += data.Item2.Count;
});
// Link the batched join block to the action block.
selectEmployees.LinkTo(printEmployees);
// When the batched join block completes, set the action block also to complete.
selectEmployees.Completion.ContinueWith(delegate { printEmployees.Complete(); });
// Try to retrieve the ID for several random employees.
Console.WriteLine("Selecting random entries from Employees table...");
for (int i = 0; i < count; i++)
{
try
{
// Create a random employee.
Employee e = Employee.Random();
// Try to retrieve the ID for the employee from the database.
e.EmployeeID = GetEmployeeID(e.LastName, e.FirstName, connectionString);
// Post the Employee object to the Employee target of
// the batched join block.
selectEmployees.Target1.Post(e);
}
catch (NullReferenceException e)
{
// GetEmployeeID throws NullReferenceException when there is
// no such employee with the given name. When this happens,
// post the Exception object to the Exception target of
// the batched join block.
selectEmployees.Target2.Post(e);
}
}
// Set the batched join block to the completed state and wait for
// all retrieval operations to complete.
selectEmployees.Complete();
printEmployees.Completion.Wait();
// Print the total error count.
Console.WriteLine("Finished. There were {0} total errors.", totalErrors);
}
' Displays information about several random employees to the console.
Private Shared Sub GetRandomEmployees(ByVal connectionString As String, ByVal batchSize As Integer, ByVal count As Integer)
' Create a BatchedJoinBlock<Employee, Exception> object that holds
' both employee and exception data.
Dim selectEmployees = New BatchedJoinBlock(Of Employee, Exception)(batchSize)
' Holds the total number of exceptions that occurred.
Dim totalErrors As Integer = 0
' Create an action block that prints employee and error information
' to the console.
Dim printEmployees = New ActionBlock(Of Tuple(Of IList(Of Employee), IList(Of Exception)))(Sub(data)
' Print information about the employees in this batch.
Console.WriteLine("Received a batch...")
For Each e As Employee In data.Item1
Console.WriteLine("Last={0} First={1} ID={2}", e.LastName, e.FirstName, e.EmployeeID)
Next e
' Print the error count for this batch.
Console.WriteLine("There were {0} errors in this batch...", data.Item2.Count)
' Update total error count.
totalErrors += data.Item2.Count
End Sub)
' Link the batched join block to the action block.
selectEmployees.LinkTo(printEmployees)
' When the batched join block completes, set the action block also to complete.
selectEmployees.Completion.ContinueWith(Sub() printEmployees.Complete())
' Try to retrieve the ID for several random employees.
Console.WriteLine("Selecting random entries from Employees table...")
For i As Integer = 0 To count - 1
Try
' Create a random employee.
Dim e As Employee = Employee.Random()
' Try to retrieve the ID for the employee from the database.
e.EmployeeID = GetEmployeeID(e.LastName, e.FirstName, connectionString)
' Post the Employee object to the Employee target of
' the batched join block.
selectEmployees.Target1.Post(e)
Catch e As NullReferenceException
' GetEmployeeID throws NullReferenceException when there is
' no such employee with the given name. When this happens,
' post the Exception object to the Exception target of
' the batched join block.
selectEmployees.Target2.Post(e)
End Try
Next i
' Set the batched join block to the completed state and wait for
' all retrieval operations to complete.
selectEmployees.Complete()
printEmployees.Completion.Wait()
' Print the total error count.
Console.WriteLine("Finished. There were {0} total errors.", totalErrors)
End Sub
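This method prints information about random employees to the console. It creates several random Employee objects and calls the GetEmployeeID method to retrieve the unique identifier for each one. Because the GetEmployeeID method throws an exception when no employee has the given first and last names, the GetRandomEmployees method uses the BatchedJoinBlock<T1,T2> class to store Employee objects for successful calls to GetEmployeeID and System.Exception objects for calls that fail. The ActionBlock<TInput> object in this example acts on a Tuple<T1,T2> object that holds a list of Employee objects and a list of Exception objects. The BatchedJoinBlock<T1,T2> object propagates this data when the total number of received Employee and Exception objects equals the batch size.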
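The following sketch reproduces the success/failure routing of GetRandomEmployees without a database. BatchedJoinDemo and Lookup are hypothetical names; Lookup stands in for GetEmployeeID and throws KeyNotFoundException for odd keys. Successful results go to Target1 and exceptions go to Target2, and a batch is emitted whenever the two targets together have received batchSize items. With a batch size of 4 and keys 0 through 9, each full batch is expected to hold two results and two exceptions, and the two leftover items a final, smaller batch when the block is completed.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo of the GetRandomEmployees pattern: results go to
// Target1, exceptions go to Target2, and each output tuple holds both lists.
static class BatchedJoinDemo
{
    // Hypothetical stand-in for GetEmployeeID: fails for odd keys.
    static int Lookup(int key)
    {
        if (key % 2 != 0)
        {
            throw new KeyNotFoundException($"No entry for key {key}.");
        }
        return key * 10;
    }

    // Returns the number of results and errors in each propagated batch.
    public static List<(int Results, int Errors)> Run(int batchSize, int count)
    {
        var batches = new List<(int Results, int Errors)>();
        var joinBlock = new BatchedJoinBlock<int, Exception>(batchSize);
        var report = new ActionBlock<Tuple<IList<int>, IList<Exception>>>(data =>
            batches.Add((data.Item1.Count, data.Item2.Count)));

        joinBlock.LinkTo(report, new DataflowLinkOptions { PropagateCompletion = true });

        for (int key = 0; key < count; key++)
        {
            try
            {
                joinBlock.Target1.Post(Lookup(key));
            }
            catch (KeyNotFoundException ex)
            {
                joinBlock.Target2.Post(ex);
            }
        }

        // Completing the join block flushes leftover items as a final batch.
        joinBlock.Complete();
        report.Completion.Wait();
        return batches;
    }

    static void Main()
    {
        foreach (var (results, errors) in Run(4, 10))
        {
            Console.WriteLine($"Batch: {results} results, {errors} errors");
        }
    }
}
```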
Complete Example
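The following example shows the complete code. The Main method compares the time required to perform batched database insert operations with the time required to perform non-batched database insert operations. It also demonstrates how to use a buffered join to read employee data from the database and report errors.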
using System;
using System.Collections.Generic;
using System.Data.SqlServerCe;
using System.Diagnostics;
using System.IO;
using System.Threading.Tasks.Dataflow;
// Demonstrates how to use batched dataflow blocks to improve
// the performance of database operations.
namespace DataflowBatchDatabase
{
class Program
{
// The number of employees to add to the database.
// TODO: Change this value to experiment with different numbers of
// employees to insert into the database.
static readonly int insertCount = 256;
// The size of a single batch of employees to add to the database.
// TODO: Change this value to experiment with different batch sizes.
static readonly int insertBatchSize = 96;
// The source database file.
// TODO: Change this value if Northwind.sdf is at a different location
// on your computer.
static readonly string sourceDatabase =
@"C:\Program Files\Microsoft SQL Server Compact Edition\v3.5\Samples\Northwind.sdf";
// TODO: Change this value if you require a different temporary location.
static readonly string scratchDatabase =
@"C:\Temp\Northwind.sdf";
// Describes an employee. Each property maps to a
// column in the Employees table in the Northwind database.
// For brevity, the Employee class does not contain
// all columns from the Employees table.
class Employee
{
public int EmployeeID { get; set; }
public string LastName { get; set; }
public string FirstName { get; set; }
// A random number generator that helps to generate
// Employee property values.
static Random rand = new Random(42);
// Possible random first names.
static readonly string[] firstNames = { "Tom", "Mike", "Ruth", "Bob", "John" };
// Possible random last names.
static readonly string[] lastNames = { "Jones", "Smith", "Johnson", "Walker" };
// Creates an Employee object that contains random
// property values.
public static Employee Random()
{
return new Employee
{
EmployeeID = -1,
LastName = lastNames[rand.Next() % lastNames.Length],
FirstName = firstNames[rand.Next() % firstNames.Length]
};
}
}
// Adds new employee records to the database.
static void InsertEmployees(Employee[] employees, string connectionString)
{
using (SqlCeConnection connection =
new SqlCeConnection(connectionString))
{
try
{
// Create the SQL command.
SqlCeCommand command = new SqlCeCommand(
"INSERT INTO Employees ([Last Name], [First Name]) " +
"VALUES (@lastName, @firstName)",
connection);
connection.Open();
for (int i = 0; i < employees.Length; i++)
{
// Set parameters.
command.Parameters.Clear();
command.Parameters.Add("@lastName", employees[i].LastName);
command.Parameters.Add("@firstName", employees[i].FirstName);
// Execute the command.
command.ExecuteNonQuery();
}
}
finally
{
connection.Close();
}
}
}
// Retrieves the number of entries in the Employees table in
// the Northwind database.
static int GetEmployeeCount(string connectionString)
{
int result = 0;
using (SqlCeConnection sqlConnection =
new SqlCeConnection(connectionString))
{
SqlCeCommand sqlCommand = new SqlCeCommand(
"SELECT COUNT(*) FROM Employees", sqlConnection);
sqlConnection.Open();
try
{
result = (int)sqlCommand.ExecuteScalar();
}
finally
{
sqlConnection.Close();
}
}
return result;
}
// Retrieves the ID of the first employee that has the provided name.
static int GetEmployeeID(string lastName, string firstName,
string connectionString)
{
using (SqlCeConnection connection =
new SqlCeConnection(connectionString))
{
SqlCeCommand command = new SqlCeCommand(
string.Format(
"SELECT [Employee ID] FROM Employees " +
"WHERE [Last Name] = '{0}' AND [First Name] = '{1}'",
lastName, firstName),
connection);
connection.Open();
try
{
return (int)command.ExecuteScalar();
}
finally
{
connection.Close();
}
}
}
// Posts random Employee data to the provided target block.
static void PostRandomEmployees(ITargetBlock<Employee> target, int count)
{
Console.WriteLine("Adding {0} entries to Employee table...", count);
for (int i = 0; i < count; i++)
{
target.Post(Employee.Random());
}
}
// Adds random employee data to the database by using dataflow.
static void AddEmployees(string connectionString, int count)
{
// Create an ActionBlock<Employee> object that adds a single
// employee entry to the database.
var insertEmployee = new ActionBlock<Employee>(e =>
InsertEmployees(new Employee[] { e }, connectionString));
// Post several random Employee objects to the dataflow block.
PostRandomEmployees(insertEmployee, count);
// Set the dataflow block to the completed state and wait for
// all insert operations to complete.
insertEmployee.Complete();
insertEmployee.Completion.Wait();
}
// Adds random employee data to the database by using dataflow.
// This method is similar to AddEmployees except that it uses batching
// to add multiple employees to the database at a time.
static void AddEmployeesBatched(string connectionString, int batchSize,
int count)
{
// Create a BatchBlock<Employee> that holds several Employee objects and
// then propagates them out as an array.
var batchEmployees = new BatchBlock<Employee>(batchSize);
// Create an ActionBlock<Employee[]> object that adds multiple
// employee entries to the database.
var insertEmployees = new ActionBlock<Employee[]>(a =>
InsertEmployees(a, connectionString));
// Link the batch block to the action block.
batchEmployees.LinkTo(insertEmployees);
// When the batch block completes, set the action block also to complete.
batchEmployees.Completion.ContinueWith(delegate { insertEmployees.Complete(); });
// Post several random Employee objects to the batch block.
PostRandomEmployees(batchEmployees, count);
// Set the batch block to the completed state and wait for
// all insert operations to complete.
batchEmployees.Complete();
insertEmployees.Completion.Wait();
}
// Displays information about several random employees to the console.
static void GetRandomEmployees(string connectionString, int batchSize,
int count)
{
// Create a BatchedJoinBlock<Employee, Exception> object that holds
// both employee and exception data.
var selectEmployees = new BatchedJoinBlock<Employee, Exception>(batchSize);
// Holds the total number of exceptions that occurred.
int totalErrors = 0;
// Create an action block that prints employee and error information
// to the console.
var printEmployees =
new ActionBlock<Tuple<IList<Employee>, IList<Exception>>>(data =>
{
// Print information about the employees in this batch.
Console.WriteLine("Received a batch...");
foreach (Employee e in data.Item1)
{
Console.WriteLine("Last={0} First={1} ID={2}",
e.LastName, e.FirstName, e.EmployeeID);
}
// Print the error count for this batch.
Console.WriteLine("There were {0} errors in this batch...",
data.Item2.Count);
// Update total error count.
totalErrors += data.Item2.Count;
});
// Link the batched join block to the action block.
selectEmployees.LinkTo(printEmployees);
// When the batched join block completes, set the action block also to complete.
selectEmployees.Completion.ContinueWith(delegate { printEmployees.Complete(); });
// Try to retrieve the ID for several random employees.
Console.WriteLine("Selecting random entries from Employees table...");
for (int i = 0; i < count; i++)
{
try
{
// Create a random employee.
Employee e = Employee.Random();
// Try to retrieve the ID for the employee from the database.
e.EmployeeID = GetEmployeeID(e.LastName, e.FirstName, connectionString);
// Post the Employee object to the Employee target of
// the batched join block.
selectEmployees.Target1.Post(e);
}
catch (NullReferenceException e)
{
// GetEmployeeID throws NullReferenceException when there is
// no such employee with the given name. When this happens,
// post the Exception object to the Exception target of
// the batched join block.
selectEmployees.Target2.Post(e);
}
}
// Set the batched join block to the completed state and wait for
// all retrieval operations to complete.
selectEmployees.Complete();
printEmployees.Completion.Wait();
// Print the total error count.
Console.WriteLine("Finished. There were {0} total errors.", totalErrors);
}
static void Main(string[] args)
{
// Create a connection string for accessing the database.
// The connection string refers to the temporary database location.
string connectionString = string.Format(@"Data Source={0}",
scratchDatabase);
// Create a Stopwatch object to time database insert operations.
Stopwatch stopwatch = new Stopwatch();
// Start with a clean database file by copying the source database to
// the temporary location.
File.Copy(sourceDatabase, scratchDatabase, true);
// Demonstrate multiple insert operations without batching.
Console.WriteLine("Demonstrating non-batched database insert operations...");
Console.WriteLine("Original size of Employee table: {0}.",
GetEmployeeCount(connectionString));
stopwatch.Start();
AddEmployees(connectionString, insertCount);
stopwatch.Stop();
Console.WriteLine("New size of Employee table: {0}; elapsed insert time: {1} ms.",
GetEmployeeCount(connectionString), stopwatch.ElapsedMilliseconds);
Console.WriteLine();
// Start again with a clean database file.
File.Copy(sourceDatabase, scratchDatabase, true);
// Demonstrate multiple insert operations, this time with batching.
Console.WriteLine("Demonstrating batched database insert operations...");
Console.WriteLine("Original size of Employee table: {0}.",
GetEmployeeCount(connectionString));
stopwatch.Restart();
AddEmployeesBatched(connectionString, insertBatchSize, insertCount);
stopwatch.Stop();
Console.WriteLine("New size of Employee table: {0}; elapsed insert time: {1} ms.",
GetEmployeeCount(connectionString), stopwatch.ElapsedMilliseconds);
Console.WriteLine();
// Start again with a clean database file.
File.Copy(sourceDatabase, scratchDatabase, true);
// Demonstrate multiple retrieval operations with error reporting.
Console.WriteLine("Demonstrating batched join database select operations...");
// Add a small number of employees to the database.
AddEmployeesBatched(connectionString, insertBatchSize, 16);
// Query for random employees.
GetRandomEmployees(connectionString, insertBatchSize, 10);
}
}
}
/* Sample output:
Demonstrating non-batched database insert operations...
Original size of Employee table: 15.
Adding 256 entries to Employee table...
New size of Employee table: 271; elapsed insert time: 11035 ms.
Demonstrating batched database insert operations...
Original size of Employee table: 15.
Adding 256 entries to Employee table...
New size of Employee table: 271; elapsed insert time: 197 ms.
Demonstrating batched join database select operations...
Adding 16 entries to Employee table...
Selecting random entries from Employees table...
Received a batch...
Last=Jones First=Tom ID=21
Last=Jones First=John ID=24
Last=Smith First=Tom ID=26
Last=Jones First=Tom ID=21
There were 4 errors in this batch...
Received a batch...
Last=Smith First=Tom ID=26
Last=Jones First=Mike ID=28
There were 0 errors in this batch...
Finished. There were 4 total errors.
*/
Imports System.Collections.Generic
Imports System.Data.SqlServerCe
Imports System.Diagnostics
Imports System.IO
Imports System.Threading.Tasks.Dataflow
' Demonstrates how to use batched dataflow blocks to improve
' the performance of database operations.
Namespace DataflowBatchDatabase
Friend Class Program
' The number of employees to add to the database.
' TODO: Change this value to experiment with different numbers of
' employees to insert into the database.
Private Shared ReadOnly insertCount As Integer = 256
' The size of a single batch of employees to add to the database.
' TODO: Change this value to experiment with different batch sizes.
Private Shared ReadOnly insertBatchSize As Integer = 96
' The source database file.
' TODO: Change this value if Northwind.sdf is at a different location
' on your computer.
Private Shared ReadOnly sourceDatabase As String = "C:\Program Files\Microsoft SQL Server Compact Edition\v3.5\Samples\Northwind.sdf"
' TODO: Change this value if you require a different temporary location.
Private Shared ReadOnly scratchDatabase As String = "C:\Temp\Northwind.sdf"
' Describes an employee. Each property maps to a
' column in the Employees table in the Northwind database.
' For brevity, the Employee class does not contain
' all columns from the Employees table.
Private Class Employee
Public Property EmployeeID() As Integer
Public Property LastName() As String
Public Property FirstName() As String
' A random number generator that helps to generate
' Employee property values.
Private Shared rand As New Random(42)
' Possible random first names.
Private Shared ReadOnly firstNames() As String = {"Tom", "Mike", "Ruth", "Bob", "John"}
' Possible random last names.
Private Shared ReadOnly lastNames() As String = {"Jones", "Smith", "Johnson", "Walker"}
' Creates an Employee object that contains random
' property values.
Public Shared Function Random() As Employee
Return New Employee With {.EmployeeID = -1, .LastName = lastNames(rand.Next() Mod lastNames.Length), .FirstName = firstNames(rand.Next() Mod firstNames.Length)}
End Function
End Class
' Adds new employee records to the database.
Private Shared Sub InsertEmployees(ByVal employees() As Employee, ByVal connectionString As String)
Using connection As New SqlCeConnection(connectionString)
Try
' Create the SQL command.
Dim command As New SqlCeCommand("INSERT INTO Employees ([Last Name], [First Name]) " & "VALUES (@lastName, @firstName)", connection)
connection.Open()
For i As Integer = 0 To employees.Length - 1
' Set parameters.
command.Parameters.Clear()
command.Parameters.Add("@lastName", employees(i).LastName)
command.Parameters.Add("@firstName", employees(i).FirstName)
' Execute the command.
command.ExecuteNonQuery()
Next i
Finally
connection.Close()
End Try
End Using
End Sub
' Retrieves the number of entries in the Employees table in
' the Northwind database.
Private Shared Function GetEmployeeCount(ByVal connectionString As String) As Integer
Dim result As Integer = 0
Using sqlConnection As New SqlCeConnection(connectionString)
Dim sqlCommand As New SqlCeCommand("SELECT COUNT(*) FROM Employees", sqlConnection)
sqlConnection.Open()
Try
result = CInt(sqlCommand.ExecuteScalar())
Finally
sqlConnection.Close()
End Try
End Using
Return result
End Function
' Retrieves the ID of the first employee that has the provided name.
Private Shared Function GetEmployeeID(ByVal lastName As String, ByVal firstName As String, ByVal connectionString As String) As Integer
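Using connection As New SqlCeConnection(connectionString)
' Pass the names as parameters, as InsertEmployees does, instead of
' formatting them directly into the SQL text.
Dim command As New SqlCeCommand("SELECT [Employee ID] FROM Employees " & "WHERE [Last Name] = @lastName AND [First Name] = @firstName", connection)
command.Parameters.Add("@lastName", lastName)
command.Parameters.Add("@firstName", firstName)
connection.Open()
Try
' ExecuteScalar returns Nothing when no row matches. Surface that case
' as the NullReferenceException that GetRandomEmployees expects.
Dim result As Object = command.ExecuteScalar()
If result Is Nothing Then
Throw New NullReferenceException(String.Format("No employee named {0} {1}.", firstName, lastName))
End If
Return CInt(result)
Finally
connection.Close()
End Try
End Using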
End Function
' Posts random Employee data to the provided target block.
Private Shared Sub PostRandomEmployees(ByVal target As ITargetBlock(Of Employee), ByVal count As Integer)
Console.WriteLine("Adding {0} entries to Employee table...", count)
For i As Integer = 0 To count - 1
target.Post(Employee.Random())
Next i
End Sub
' Adds random employee data to the database by using dataflow.
Private Shared Sub AddEmployees(ByVal connectionString As String, ByVal count As Integer)
' Create an ActionBlock<Employee> object that adds a single
' employee entry to the database.
Dim insertEmployee = New ActionBlock(Of Employee)(Sub(e) InsertEmployees(New Employee() {e}, connectionString))
' Post several random Employee objects to the dataflow block.
PostRandomEmployees(insertEmployee, count)
' Set the dataflow block to the completed state and wait for
' all insert operations to complete.
insertEmployee.Complete()
insertEmployee.Completion.Wait()
End Sub
' Adds random employee data to the database by using dataflow.
' This method is similar to AddEmployees except that it uses batching
' to add multiple employees to the database at a time.
Private Shared Sub AddEmployeesBatched(ByVal connectionString As String, ByVal batchSize As Integer, ByVal count As Integer)
' Create a BatchBlock<Employee> that holds several Employee objects and
' then propagates them out as an array.
Dim batchEmployees = New BatchBlock(Of Employee)(batchSize)
' Create an ActionBlock<Employee[]> object that adds multiple
' employee entries to the database.
Dim insertEmployees = New ActionBlock(Of Employee())(Sub(a) Program.InsertEmployees(a, connectionString))
' Link the batch block to the action block.
batchEmployees.LinkTo(insertEmployees)
' When the batch block completes, set the action block also to complete.
batchEmployees.Completion.ContinueWith(Sub() insertEmployees.Complete())
' Post several random Employee objects to the batch block.
PostRandomEmployees(batchEmployees, count)
' Set the batch block to the completed state and wait for
' all insert operations to complete.
batchEmployees.Complete()
insertEmployees.Completion.Wait()
End Sub
' Displays information about several random employees to the console.
Private Shared Sub GetRandomEmployees(ByVal connectionString As String, ByVal batchSize As Integer, ByVal count As Integer)
' Create a BatchedJoinBlock<Employee, Exception> object that holds
' both employee and exception data.
Dim selectEmployees = New BatchedJoinBlock(Of Employee, Exception)(batchSize)
' Holds the total number of exceptions that occurred.
Dim totalErrors As Integer = 0
' Create an action block that prints employee and error information
' to the console.
Dim printEmployees = New ActionBlock(Of Tuple(Of IList(Of Employee), IList(Of Exception)))(Sub(data)
Console.WriteLine("Received a batch...")
' Print information about the employees in this batch.
For Each e As Employee In data.Item1
Console.WriteLine("Last={0} First={1} ID={2}", e.LastName, e.FirstName, e.EmployeeID)
Next e
' Print the error count for this batch.
Console.WriteLine("There were {0} errors in this batch...", data.Item2.Count)
' Update total error count.
totalErrors += data.Item2.Count
End Sub)
' Link the batched join block to the action block.
selectEmployees.LinkTo(printEmployees)
' When the batched join block completes, set the action block also to complete.
selectEmployees.Completion.ContinueWith(Sub() printEmployees.Complete())
' Try to retrieve the ID for several random employees.
Console.WriteLine("Selecting random entries from Employees table...")
For i As Integer = 0 To count - 1
Try
' Create a random employee.
Dim e As Employee = Employee.Random()
' Try to retrieve the ID for the employee from the database.
e.EmployeeID = GetEmployeeID(e.LastName, e.FirstName, connectionString)
' Post the Employee object to the Employee target of
' the batched join block.
selectEmployees.Target1.Post(e)
Catch ex As NullReferenceException
' GetEmployeeID throws NullReferenceException when there is
' no such employee with the given name. When this happens,
' post the Exception object to the Exception target of
' the batched join block.
selectEmployees.Target2.Post(ex)
End Try
Next i
' Set the batched join block to the completed state and wait for
' all retrieval operations to complete.
selectEmployees.Complete()
printEmployees.Completion.Wait()
' Print the total error count.
Console.WriteLine("Finished. There were {0} total errors.", totalErrors)
End Sub
Shared Sub Main(ByVal args() As String)
' Create a connection string for accessing the database.
' The connection string refers to the temporary database location.
Dim connectionString As String = String.Format("Data Source={0}", scratchDatabase)
' Create a Stopwatch object to time database insert operations.
Dim stopwatch As New Stopwatch()
' Start with a clean database file by copying the source database to
' the temporary location.
File.Copy(sourceDatabase, scratchDatabase, True)
' Demonstrate multiple insert operations without batching.
Console.WriteLine("Demonstrating non-batched database insert operations...")
Console.WriteLine("Original size of Employee table: {0}.", GetEmployeeCount(connectionString))
stopwatch.Start()
AddEmployees(connectionString, insertCount)
stopwatch.Stop()
Console.WriteLine("New size of Employee table: {0}; elapsed insert time: {1} ms.", GetEmployeeCount(connectionString), stopwatch.ElapsedMilliseconds)
Console.WriteLine()
' Start again with a clean database file.
File.Copy(sourceDatabase, scratchDatabase, True)
' Demonstrate multiple insert operations, this time with batching.
Console.WriteLine("Demonstrating batched database insert operations...")
Console.WriteLine("Original size of Employee table: {0}.", GetEmployeeCount(connectionString))
stopwatch.Restart()
AddEmployeesBatched(connectionString, insertBatchSize, insertCount)
stopwatch.Stop()
Console.WriteLine("New size of Employee table: {0}; elapsed insert time: {1} ms.", GetEmployeeCount(connectionString), stopwatch.ElapsedMilliseconds)
Console.WriteLine()
' Start again with a clean database file.
File.Copy(sourceDatabase, scratchDatabase, True)
' Demonstrate multiple retrieval operations with error reporting.
Console.WriteLine("Demonstrating batched join database select operations...")
' Add a small number of employees to the database.
AddEmployeesBatched(connectionString, insertBatchSize, 16)
' Query for random employees.
GetRandomEmployees(connectionString, insertBatchSize, 10)
End Sub
End Class
End Namespace
' Sample output:
'Demonstrating non-batched database insert operations...
'Original size of Employee table: 15.
'Adding 256 entries to Employee table...
'New size of Employee table: 271; elapsed insert time: 11035 ms.
'
'Demonstrating batched database insert operations...
'Original size of Employee table: 15.
'Adding 256 entries to Employee table...
'New size of Employee table: 271; elapsed insert time: 197 ms.
'
'Demonstrating batched join database select operations...
'Adding 16 entries to Employee table...
'Selecting random entries from Employees table...
'Received a batch...
'Last=Jones First=Tom ID=21
'Last=Jones First=John ID=24
'Last=Smith First=Tom ID=26
'Last=Jones First=Tom ID=21
'There were 4 errors in this batch...
'Received a batch...
'Last=Smith First=Tom ID=26
'Last=Jones First=Mike ID=28
'There were 0 errors in this batch...
'Finished. There were 4 total errors.
'
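The listing's `AddEmployeesBatched` relies on two `BatchBlock(Of T)` behaviors that are easy to miss: a batch of `batchSize` items is emitted as soon as it fills, and calling `Complete` flushes whatever is left as a final, smaller batch. The following minimal sketch isolates that behavior without a database. It posts integers instead of `Employee` objects, and the names `BatchSizeDemo` and `CollectBatchSizes` are illustrative only. It also links the blocks with `DataflowLinkOptions.PropagateCompletion` instead of the listing's `Completion.ContinueWith` call; that is a design alternative, not what the original sample does.

```vb
Imports System
Imports System.Collections.Generic
Imports System.Threading.Tasks.Dataflow

' Illustrative module (not part of the original sample).
Public Module BatchSizeDemo
    ' Posts itemCount integers through a BatchBlock and returns the size of
    ' each batch that reaches the downstream ActionBlock, in arrival order.
    Public Function CollectBatchSizes(itemCount As Integer, batchSize As Integer) As List(Of Integer)
        Dim sizes As New List(Of Integer)()
        Dim batcher As New BatchBlock(Of Integer)(batchSize)
        ' A default ActionBlock processes one array at a time, so adding
        ' to the shared list needs no locking.
        Dim recorder As New ActionBlock(Of Integer())(Sub(batch) sizes.Add(batch.Length))
        ' PropagateCompletion completes (or faults) the ActionBlock when the
        ' BatchBlock finishes, replacing a manual ContinueWith.
        batcher.LinkTo(recorder, New DataflowLinkOptions With {.PropagateCompletion = True})
        For i As Integer = 0 To itemCount - 1
            batcher.Post(i)
        Next
        ' Completing the BatchBlock flushes any leftover items as a final,
        ' smaller batch.
        batcher.Complete()
        recorder.Completion.Wait()
        Return sizes
    End Function

    Sub Main()
        ' Mirrors insertCount = 256 and insertBatchSize = 96 from the listing.
        Console.WriteLine(String.Join(", ", CollectBatchSizes(256, 96)))
    End Sub
End Module
```

With the listing's values (256 items, batch size 96), the recorder sees batches of 96, 96, and 64 items, so the last 64 employees are written by the final flush. Unlike the `ContinueWith` approach, `PropagateCompletion` also forwards a fault from the batch block to the action block.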
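`GetRandomEmployees` uses `BatchedJoinBlock(Of Employee, Exception)` to deliver successful lookups and failures together. Its batch size counts items across both targets combined, not per target. The sketch below is an illustrative example that substitutes integers and strings for `Employee` and `Exception` (the names `BatchedJoinDemo` and `DescribeBatches` are hypothetical). It shows four items spread over `Target1` and `Target2` filling one batch of size 4, and the remaining item arriving only when the block completes.

```vb
Imports System
Imports System.Collections.Generic
Imports System.Threading.Tasks.Dataflow

' Illustrative module (not part of the original sample).
Public Module BatchedJoinDemo
    ' Returns one "successes/errors" string per batch produced by a
    ' BatchedJoinBlock(Of Integer, String) with the given batch size.
    Public Function DescribeBatches(batchSize As Integer) As List(Of String)
        Dim results As New List(Of String)()
        Dim joiner As New BatchedJoinBlock(Of Integer, String)(batchSize)
        ' Record how many items each batch holds from each target.
        Dim recorder As New ActionBlock(Of Tuple(Of IList(Of Integer), IList(Of String)))(
            Sub(batch) results.Add(String.Format("{0}/{1}", batch.Item1.Count, batch.Item2.Count)))
        joiner.LinkTo(recorder, New DataflowLinkOptions With {.PropagateCompletion = True})

        ' Three "successes" and one "error": the batch size counts items
        ' from both targets, so with batchSize = 4 these fill one batch.
        joiner.Target1.Post(1)
        joiner.Target1.Post(2)
        joiner.Target1.Post(3)
        joiner.Target2.Post("not found")
        ' A fifth item starts a new batch that is flushed on completion.
        joiner.Target1.Post(5)

        joiner.Complete()
        recorder.Completion.Wait()
        Return results
    End Function

    Sub Main()
        For Each line As String In DescribeBatches(4)
            Console.WriteLine(line)
        Next
    End Sub
End Module
```

`DescribeBatches(4)` produces `3/1` followed by `1/0`. Calling `DescribeBatches(96)` instead yields a single `4/1` batch, because fewer than 96 items arrive before `Complete` is called; the same applies to the listing's `Main`, which performs only 10 lookups with a batch size of 96.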