Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
De TPL-gegevensstroombibliotheek biedt de System.Threading.Tasks.Dataflow.BatchBlock<T>- en System.Threading.Tasks.Dataflow.BatchedJoinBlock<T1,T2> klassen, zodat u gegevens van een of meer bronnen kunt ontvangen en bufferen en deze vervolgens als één verzameling kunt doorgeven. Dit batchmechanisme is handig wanneer u gegevens verzamelt uit een of meer bronnen en vervolgens meerdere gegevenselementen als batch verwerkt. Denk bijvoorbeeld aan een toepassing die gebruikmaakt van een gegevensstroom om records in een database in te voegen. Deze bewerking kan efficiënter zijn als meerdere items tegelijk worden ingevoegd in plaats van één voor één opeenvolgend. In dit document wordt beschreven hoe u de BatchBlock<T>-klasse gebruikt om de efficiëntie van dergelijke databaseinvoegbewerkingen te verbeteren. Ook wordt beschreven hoe u de BatchedJoinBlock<T1,T2>-klasse gebruikt om zowel de resultaten als eventuele uitzonderingen vast te leggen die optreden wanneer het programma uit een database leest.
Notitie
De TPL-gegevensstroombibliotheek (de System.Threading.Tasks.Dataflow naamruimte) wordt niet gedistribueerd met .NET. Als u de System.Threading.Tasks.Dataflow-naamruimte in Visual Studio wilt installeren, opent u uw project, kiest u NuGet-pakketten beheren in het menu Project en zoekt u online naar het System.Threading.Tasks.Dataflow
-pakket. Om deze te installeren met behulp van de .NET Core CLI, voert u dotnet add package System.Threading.Tasks.Dataflow
uit.
Benodigdheden
Lees het onderdeel Join Blocks in het document Gegevensstroom voordat u aan deze instructie begint.
Zorg ervoor dat u een kopie hebt van de Northwind-database, Northwind.sdf, die beschikbaar is op uw computer.
Belangrijk
In sommige versies van Windows kunt u geen verbinding maken met Northwind.sdf als Visual Studio wordt uitgevoerd in een niet-beheerdersmodus. Als u verbinding wilt maken met Northwind.sdf, start u Visual Studio of een Ontwikkelaarsopdrachtprompt voor Visual Studio in de Als beheerder uitvoeren modus.
De consoletoepassing maken
Maak in Visual Studio een Visual C# of Visual Basic Console Application project. In dit document heet het project
DataflowBatchDatabase
.Zorg ervoor dat Form1.cs (Form1.vb voor Visual Basic) de volgende
using
(Imports
in Visual Basic) bevat.using System; using System.Collections.Generic; using System.Data.SqlClient; using System.Diagnostics; using System.IO; using System.Threading.Tasks.Dataflow;
Imports System.Collections.Generic Imports System.Data.SqlClient Imports System.Data.SqlServerCe Imports System.Diagnostics Imports System.IO Imports System.Threading.Tasks.Dataflow
Voeg de volgende gegevensleden toe aan de
Program
-klasse.// The number of employees to add to the database. // TODO: Change this value to experiment with different numbers of // employees to insert into the database. static readonly int insertCount = 256; // The size of a single batch of employees to add to the database. // TODO: Change this value to experiment with different batch sizes. static readonly int insertBatchSize = 96; // The source database file. // TODO: Change this value if Northwind.sdf is at a different location // on your computer. static readonly string sourceDatabase = @"C:\...\Northwind.sdf"; // TODO: Change this value if you require a different temporary location. static readonly string scratchDatabase = @"C:\Temp\Northwind.sdf";
' The number of employees to add to the database. ' TODO: Change this value to experiment with different numbers of ' employees to insert into the database. Private Shared ReadOnly insertCount As Integer = 256 ' The size of a single batch of employees to add to the database. ' TODO: Change this value to experiment with different batch sizes. Private Shared ReadOnly insertBatchSize As Integer = 96 ' The source database file. ' TODO: Change this value if Northwind.sdf is at a different location ' on your computer. Private Shared ReadOnly sourceDatabase As String = "C:\Program Files\Microsoft SQL Server Compact Edition\v3.5\Samples\Northwind.sdf" ' TODO: Change this value if you require a different temporary location. Private Shared ReadOnly scratchDatabase As String = "C:\Temp\Northwind.sdf"
De werknemersklasse definiëren
Voeg de Program
-klasse toe aan de Employee
klasse.
// Describes an employee. Each property maps to a
// column in the Employees table in the Northwind database.
// For brevity, the Employee class does not contain
// all columns from the Employees table.
class Employee
{
public int EmployeeID { get; set; }
public string LastName { get; set; }
public string FirstName { get; set; }
// A random number generator that helps tp generate
// Employee property values.
static Random rand = new Random(42);
// Possible random first names.
static readonly string[] firstNames = { "Tom", "Mike", "Ruth", "Bob", "John" };
// Possible random last names.
static readonly string[] lastNames = { "Jones", "Smith", "Johnson", "Walker" };
// Creates an Employee object that contains random
// property values.
public static Employee Random()
{
return new Employee
{
EmployeeID = -1,
LastName = lastNames[rand.Next() % lastNames.Length],
FirstName = firstNames[rand.Next() % firstNames.Length]
};
}
}
' Describes an employee. Each property maps to a
' column in the Employees table in the Northwind database.
' For brevity, the Employee class does not contain
' all columns from the Employees table.
Private Class Employee
Public Property EmployeeID() As Integer
Public Property LastName() As String
Public Property FirstName() As String
' A random number generator that helps tp generate
' Employee property values.
Private Shared rand As New Random(42)
' Possible random first names.
Private Shared ReadOnly firstNames() As String = {"Tom", "Mike", "Ruth", "Bob", "John"}
' Possible random last names.
Private Shared ReadOnly lastNames() As String = {"Jones", "Smith", "Johnson", "Walker"}
' Creates an Employee object that contains random
' property values.
Public Shared Function Random() As Employee
Return New Employee With {.EmployeeID = -1, .LastName = lastNames(rand.Next() Mod lastNames.Length), .FirstName = firstNames(rand.Next() Mod firstNames.Length)}
End Function
End Class
De klasse Employee
bevat drie eigenschappen, EmployeeID
, LastName
en FirstName
. Deze eigenschappen komen overeen met de kolommen Employee ID
, Last Name
en First Name
in de Employees
tabel in de Northwind-database. Voor deze demonstratie definieert de Employee
klasse ook de Random
methode, waarmee een Employee
-object wordt gemaakt met willekeurige waarden voor de eigenschappen.
Databasebewerkingen voor werknemers definiëren
Voeg de methoden Program
, InsertEmployees
en GetEmployeeCount
toe aan de GetEmployeeID
klasse.
// Adds new employee records to the database.
static void InsertEmployees(Employee[] employees, string connectionString)
{
using (SqlConnection connection =
new SqlConnection(connectionString))
{
try
{
// Create the SQL command.
SqlCommand command = new SqlCommand(
"INSERT INTO Employees ([Last Name], [First Name])" +
"VALUES (@lastName, @firstName)",
connection);
connection.Open();
for (int i = 0; i < employees.Length; i++)
{
// Set parameters.
command.Parameters.Clear();
command.Parameters.Add("@lastName", employees[i].LastName);
command.Parameters.Add("@firstName", employees[i].FirstName);
// Execute the command.
command.ExecuteNonQuery();
}
}
finally
{
connection.Close();
}
}
}
// Retrieves the number of entries in the Employees table in
// the Northwind database.
static int GetEmployeeCount(string connectionString)
{
int result = 0;
using (SqlConnection sqlConnection =
new SqlConnection(connectionString))
{
SqlCommand sqlCommand = new SqlCommand(
"SELECT COUNT(*) FROM Employees", sqlConnection);
sqlConnection.Open();
try
{
result = (int)sqlCommand.ExecuteScalar();
}
finally
{
sqlConnection.Close();
}
}
return result;
}
// Retrieves the ID of the first employee that has the provided name.
static int GetEmployeeID(string lastName, string firstName,
string connectionString)
{
using (SqlConnection connection =
new SqlConnection(connectionString))
{
SqlCommand command = new SqlCommand(
string.Format(
"SELECT [Employee ID] FROM Employees " +
"WHERE [Last Name] = '{0}' AND [First Name] = '{1}'",
lastName, firstName),
connection);
connection.Open();
try
{
return (int)command.ExecuteScalar();
}
finally
{
connection.Close();
}
}
}
' Adds new employee records to the database.
Private Shared Sub InsertEmployees(ByVal employees() As Employee, ByVal connectionString As String)
Using connection As New SqlConnection(connectionString)
Try
' Create the SQL command.
Dim command As New SqlCommand("INSERT INTO Employees ([Last Name], [First Name])" & "VALUES (@lastName, @firstName)", connection)
connection.Open()
For i As Integer = 0 To employees.Length - 1
' Set parameters.
command.Parameters.Clear()
command.Parameters.Add("@lastName", employees(i).LastName)
command.Parameters.Add("@firstName", employees(i).FirstName)
' Execute the command.
command.ExecuteNonQuery()
Next i
Finally
connection.Close()
End Try
End Using
End Sub
' Retrieves the number of entries in the Employees table in
' the Northwind database.
Private Shared Function GetEmployeeCount(ByVal connectionString As String) As Integer
Dim result As Integer = 0
Using sqlConnection As New SqlConnection(connectionString)
Dim sqlCommand As New SqlCommand("SELECT COUNT(*) FROM Employees", sqlConnection)
sqlConnection.Open()
Try
result = CInt(Fix(sqlCommand.ExecuteScalar()))
Finally
sqlConnection.Close()
End Try
End Using
Return result
End Function
' Retrieves the ID of the first employee that has the provided name.
Private Shared Function GetEmployeeID(ByVal lastName As String, ByVal firstName As String, ByVal connectionString As String) As Integer
Using connection As New SqlConnection(connectionString)
Dim command As New SqlCommand(String.Format("SELECT [Employee ID] FROM Employees " & "WHERE [Last Name] = '{0}' AND [First Name] = '{1}'", lastName, firstName), connection)
connection.Open()
Try
Return CInt(Fix(command.ExecuteScalar()))
Finally
connection.Close()
End Try
End Using
End Function
Met de methode InsertEmployees
worden nieuwe werknemersrecords aan de database toegevoegd. Met de methode GetEmployeeCount
wordt het aantal vermeldingen in de Employees
tabel opgehaald. Met de methode GetEmployeeID
wordt de id opgehaald van de eerste werknemer met de opgegeven naam. Elk van deze methoden gebruikt een verbindingsreeks naar de Northwind-database.
Werknemersgegevens toevoegen aan de database zonder buffering
Voeg de Program
- en AddEmployees
-methoden toe aan de PostRandomEmployees
klasse.
// Posts random Employee data to the provided target block.
static void PostRandomEmployees(ITargetBlock<Employee> target, int count)
{
Console.WriteLine($"Adding {count} entries to Employee table...");
for (int i = 0; i < count; i++)
{
target.Post(Employee.Random());
}
}
// Adds random employee data to the database by using dataflow.
static void AddEmployees(string connectionString, int count)
{
// Create an ActionBlock<Employee> object that adds a single
// employee entry to the database.
var insertEmployee = new ActionBlock<Employee>(e =>
InsertEmployees(new Employee[] { e }, connectionString));
// Post several random Employee objects to the dataflow block.
PostRandomEmployees(insertEmployee, count);
// Set the dataflow block to the completed state and wait for
// all insert operations to complete.
insertEmployee.Complete();
insertEmployee.Completion.Wait();
}
' Posts random Employee data to the provided target block.
Private Shared Sub PostRandomEmployees(ByVal target As ITargetBlock(Of Employee), ByVal count As Integer)
Console.WriteLine("Adding {0} entries to Employee table...", count)
For i As Integer = 0 To count - 1
target.Post(Employee.Random())
Next i
End Sub
' Adds random employee data to the database by using dataflow.
Private Shared Sub AddEmployees(ByVal connectionString As String, ByVal count As Integer)
' Create an ActionBlock<Employee> object that adds a single
' employee entry to the database.
Dim insertEmployee = New ActionBlock(Of Employee)(Sub(e) InsertEmployees(New Employee() {e}, connectionString))
' Post several random Employee objects to the dataflow block.
PostRandomEmployees(insertEmployee, count)
' Set the dataflow block to the completed state and wait for
' all insert operations to complete.
insertEmployee.Complete()
insertEmployee.Completion.Wait()
End Sub
Met de methode AddEmployees
worden willekeurige werknemersgegevens aan de database toegevoegd met behulp van de gegevensstroom. Er wordt een ActionBlock<TInput>-object gemaakt waarmee de InsertEmployees
methode wordt aangeroepen om een werknemervermelding toe te voegen aan de database. De methode AddEmployees
roept vervolgens de PostRandomEmployees
methode aan om meerdere Employee
objecten te posten naar het ActionBlock<TInput>-object. De methode AddEmployees
wacht vervolgens totdat alle invoegbewerkingen zijn voltooid.
Buffering gebruiken om werknemersgegevens toe te voegen aan de database
Voeg de Program
-methode toe aan de AddEmployeesBatched
klasse.
// Adds random employee data to the database by using dataflow.
// This method is similar to AddEmployees except that it uses batching
// to add multiple employees to the database at a time.
static void AddEmployeesBatched(string connectionString, int batchSize,
int count)
{
// Create a BatchBlock<Employee> that holds several Employee objects and
// then propagates them out as an array.
var batchEmployees = new BatchBlock<Employee>(batchSize);
// Create an ActionBlock<Employee[]> object that adds multiple
// employee entries to the database.
var insertEmployees = new ActionBlock<Employee[]>(a =>
InsertEmployees(a, connectionString));
// Link the batch block to the action block.
batchEmployees.LinkTo(insertEmployees);
// When the batch block completes, set the action block also to complete.
batchEmployees.Completion.ContinueWith(delegate { insertEmployees.Complete(); });
// Post several random Employee objects to the batch block.
PostRandomEmployees(batchEmployees, count);
// Set the batch block to the completed state and wait for
// all insert operations to complete.
batchEmployees.Complete();
insertEmployees.Completion.Wait();
}
' Adds random employee data to the database by using dataflow.
' This method is similar to AddEmployees except that it uses batching
' to add multiple employees to the database at a time.
Private Shared Sub AddEmployeesBatched(ByVal connectionString As String, ByVal batchSize As Integer, ByVal count As Integer)
' Create a BatchBlock<Employee> that holds several Employee objects and
' then propagates them out as an array.
Dim batchEmployees = New BatchBlock(Of Employee)(batchSize)
' Create an ActionBlock<Employee[]> object that adds multiple
' employee entries to the database.
Dim insertEmployees = New ActionBlock(Of Employee())(Sub(a) Program.InsertEmployees(a, connectionString))
' Link the batch block to the action block.
batchEmployees.LinkTo(insertEmployees)
' When the batch block completes, set the action block also to complete.
batchEmployees.Completion.ContinueWith(Sub() insertEmployees.Complete())
' Post several random Employee objects to the batch block.
PostRandomEmployees(batchEmployees, count)
' Set the batch block to the completed state and wait for
' all insert operations to complete.
batchEmployees.Complete()
insertEmployees.Completion.Wait()
End Sub
Deze methode lijkt op AddEmployees
, behalve dat de klasse BatchBlock<T> ook wordt gebruikt om meerdere Employee
objecten te bufferen voordat deze objecten naar het ActionBlock<TInput>-object worden verzonden. Omdat de BatchBlock<T> klasse meerdere elementen als verzameling doorgeeft, wordt het ActionBlock<TInput> object gewijzigd om te reageren op een matrix van Employee
objecten. Net als in de methode AddEmployees
roept AddEmployeesBatched
de methode PostRandomEmployees
aan om meerdere Employee
objecten te posten; AddEmployeesBatched
deze objecten echter op het BatchBlock<T>-object plaatst. De methode AddEmployeesBatched
wacht ook totdat alle invoegbewerkingen zijn voltooid.
Gebufferde join gebruiken om werknemersgegevens uit de database te lezen
Voeg de Program
-methode toe aan de GetRandomEmployees
klasse.
// Displays information about several random employees to the console.
static void GetRandomEmployees(string connectionString, int batchSize,
int count)
{
// Create a BatchedJoinBlock<Employee, Exception> object that holds
// both employee and exception data.
var selectEmployees = new BatchedJoinBlock<Employee, Exception>(batchSize);
// Holds the total number of exceptions that occurred.
int totalErrors = 0;
// Create an action block that prints employee and error information
// to the console.
var printEmployees =
new ActionBlock<Tuple<IList<Employee>, IList<Exception>>>(data =>
{
// Print information about the employees in this batch.
Console.WriteLine("Received a batch...");
foreach (Employee e in data.Item1)
{
Console.WriteLine($"Last={e.LastName} First={e.FirstName} ID={e.EmployeeID}");
}
// Print the error count for this batch.
Console.WriteLine($"There were {data.Item2.Count} errors in this batch...");
// Update total error count.
totalErrors += data.Item2.Count;
});
// Link the batched join block to the action block.
selectEmployees.LinkTo(printEmployees);
// When the batched join block completes, set the action block also to complete.
selectEmployees.Completion.ContinueWith(delegate { printEmployees.Complete(); });
// Try to retrieve the ID for several random employees.
Console.WriteLine("Selecting random entries from Employees table...");
for (int i = 0; i < count; i++)
{
try
{
// Create a random employee.
Employee e = Employee.Random();
// Try to retrieve the ID for the employee from the database.
e.EmployeeID = GetEmployeeID(e.LastName, e.FirstName, connectionString);
// Post the Employee object to the Employee target of
// the batched join block.
selectEmployees.Target1.Post(e);
}
catch (NullReferenceException e)
{
// GetEmployeeID throws NullReferenceException when there is
// no such employee with the given name. When this happens,
// post the Exception object to the Exception target of
// the batched join block.
selectEmployees.Target2.Post(e);
}
}
// Set the batched join block to the completed state and wait for
// all retrieval operations to complete.
selectEmployees.Complete();
printEmployees.Completion.Wait();
// Print the total error count.
Console.WriteLine($"Finished. There were {totalErrors} total errors.");
}
' Displays information about several random employees to the console.
Private Shared Sub GetRandomEmployees(ByVal connectionString As String, ByVal batchSize As Integer, ByVal count As Integer)
' Create a BatchedJoinBlock<Employee, Exception> object that holds
' both employee and exception data.
Dim selectEmployees = New BatchedJoinBlock(Of Employee, Exception)(batchSize)
' Holds the total number of exceptions that occurred.
Dim totalErrors As Integer = 0
' Create an action block that prints employee and error information
' to the console.
Dim printEmployees = New ActionBlock(Of Tuple(Of IList(Of Employee), IList(Of Exception)))(Sub(data)
' Print information about the employees in this batch.
' Print the error count for this batch.
' Update total error count.
Console.WriteLine("Received a batch...")
For Each e As Employee In data.Item1
Console.WriteLine("Last={0} First={1} ID={2}", e.LastName, e.FirstName, e.EmployeeID)
Next e
Console.WriteLine("There were {0} errors in this batch...", data.Item2.Count)
totalErrors += data.Item2.Count
End Sub)
' Link the batched join block to the action block.
selectEmployees.LinkTo(printEmployees)
' When the batched join block completes, set the action block also to complete.
selectEmployees.Completion.ContinueWith(Sub() printEmployees.Complete())
' Try to retrieve the ID for several random employees.
Console.WriteLine("Selecting random entries from Employees table...")
For i As Integer = 0 To count - 1
Try
' Create a random employee.
Dim e As Employee = Employee.Random()
' Try to retrieve the ID for the employee from the database.
e.EmployeeID = GetEmployeeID(e.LastName, e.FirstName, connectionString)
' Post the Employee object to the Employee target of
' the batched join block.
selectEmployees.Target1.Post(e)
Catch e As NullReferenceException
' GetEmployeeID throws NullReferenceException when there is
' no such employee with the given name. When this happens,
' post the Exception object to the Exception target of
' the batched join block.
selectEmployees.Target2.Post(e)
End Try
Next i
' Set the batched join block to the completed state and wait for
' all retrieval operations to complete.
selectEmployees.Complete()
printEmployees.Completion.Wait()
' Print the total error count.
Console.WriteLine("Finished. There were {0} total errors.", totalErrors)
End Sub
Met deze methode wordt informatie over willekeurige werknemers naar de console afgedrukt. Er worden verschillende willekeurige Employee
objecten gemaakt en de GetEmployeeID
methode aangeroepen om de unieke id voor elk object op te halen. Omdat de methode GetEmployeeID
een uitzondering genereert als er geen overeenkomende werknemer is met de opgegeven voor- en achternamen, gebruikt de methode GetRandomEmployees
de BatchedJoinBlock<T1,T2> klasse om Employee
objecten op te slaan voor geslaagde aanroepen naar GetEmployeeID
en System.Exception objecten voor aanroepen die mislukken. Het ActionBlock<TInput> object in dit voorbeeld werkt op een Tuple<T1,T2> object dat een lijst met Employee
objecten en een lijst met Exception objecten bevat. Het BatchedJoinBlock<T1,T2>-object geeft deze gegevens door wanneer de som van de ontvangen Employee
en Exception objectaantallen gelijk is aan de batchgrootte.
Het volledige voorbeeld
In het volgende voorbeeld ziet u de volledige code. De methode Main
vergelijkt de tijd die nodig is om batchgewijze databaseinvoegingen uit te voeren versus de tijd die nodig is om niet-batchgewijze databaseinvoegingen uit te voeren. Ook wordt het gebruik van gebufferde join gedemonstreerd om werknemersgegevens uit de database te lezen en ook fouten te rapporteren.
using System;
using System.Collections.Generic;
using System.Data.SqlClient;
using System.Diagnostics;
using System.IO;
using System.Threading.Tasks.Dataflow;
// Demonstrates how to use batched dataflow blocks to improve
// the performance of database operations.
namespace DataflowBatchDatabase
{
class Program
{
// The number of employees to add to the database.
// TODO: Change this value to experiment with different numbers of
// employees to insert into the database.
static readonly int insertCount = 256;
// The size of a single batch of employees to add to the database.
// TODO: Change this value to experiment with different batch sizes.
static readonly int insertBatchSize = 96;
// The source database file.
// TODO: Change this value if Northwind.sdf is at a different location
// on your computer.
static readonly string sourceDatabase =
@"C:\...\Northwind.sdf";
// TODO: Change this value if you require a different temporary location.
static readonly string scratchDatabase =
@"C:\Temp\Northwind.sdf";
// Describes an employee. Each property maps to a
// column in the Employees table in the Northwind database.
// For brevity, the Employee class does not contain
// all columns from the Employees table.
class Employee
{
public int EmployeeID { get; set; }
public string LastName { get; set; }
public string FirstName { get; set; }
// A random number generator that helps tp generate
// Employee property values.
static Random rand = new Random(42);
// Possible random first names.
static readonly string[] firstNames = { "Tom", "Mike", "Ruth", "Bob", "John" };
// Possible random last names.
static readonly string[] lastNames = { "Jones", "Smith", "Johnson", "Walker" };
// Creates an Employee object that contains random
// property values.
public static Employee Random()
{
return new Employee
{
EmployeeID = -1,
LastName = lastNames[rand.Next() % lastNames.Length],
FirstName = firstNames[rand.Next() % firstNames.Length]
};
}
}
// Adds new employee records to the database.
static void InsertEmployees(Employee[] employees, string connectionString)
{
using (SqlConnection connection =
new SqlConnection(connectionString))
{
try
{
// Create the SQL command.
SqlCommand command = new SqlCommand(
"INSERT INTO Employees ([Last Name], [First Name])" +
"VALUES (@lastName, @firstName)",
connection);
connection.Open();
for (int i = 0; i < employees.Length; i++)
{
// Set parameters.
command.Parameters.Clear();
command.Parameters.Add("@lastName", employees[i].LastName);
command.Parameters.Add("@firstName", employees[i].FirstName);
// Execute the command.
command.ExecuteNonQuery();
}
}
finally
{
connection.Close();
}
}
}
// Retrieves the number of entries in the Employees table in
// the Northwind database.
static int GetEmployeeCount(string connectionString)
{
int result = 0;
using (SqlConnection sqlConnection =
new SqlConnection(connectionString))
{
SqlCommand sqlCommand = new SqlCommand(
"SELECT COUNT(*) FROM Employees", sqlConnection);
sqlConnection.Open();
try
{
result = (int)sqlCommand.ExecuteScalar();
}
finally
{
sqlConnection.Close();
}
}
return result;
}
// Retrieves the ID of the first employee that has the provided name.
static int GetEmployeeID(string lastName, string firstName,
string connectionString)
{
using (SqlConnection connection =
new SqlConnection(connectionString))
{
SqlCommand command = new SqlCommand(
string.Format(
"SELECT [Employee ID] FROM Employees " +
"WHERE [Last Name] = '{0}' AND [First Name] = '{1}'",
lastName, firstName),
connection);
connection.Open();
try
{
return (int)command.ExecuteScalar();
}
finally
{
connection.Close();
}
}
}
// Posts random Employee data to the provided target block.
static void PostRandomEmployees(ITargetBlock<Employee> target, int count)
{
Console.WriteLine($"Adding {count} entries to Employee table...");
for (int i = 0; i < count; i++)
{
target.Post(Employee.Random());
}
}
// Adds random employee data to the database by using dataflow.
static void AddEmployees(string connectionString, int count)
{
// Create an ActionBlock<Employee> object that adds a single
// employee entry to the database.
var insertEmployee = new ActionBlock<Employee>(e =>
InsertEmployees(new Employee[] { e }, connectionString));
// Post several random Employee objects to the dataflow block.
PostRandomEmployees(insertEmployee, count);
// Set the dataflow block to the completed state and wait for
// all insert operations to complete.
insertEmployee.Complete();
insertEmployee.Completion.Wait();
}
// Adds random employee data to the database by using dataflow.
// This method is similar to AddEmployees except that it uses batching
// to add multiple employees to the database at a time.
static void AddEmployeesBatched(string connectionString, int batchSize,
int count)
{
// Create a BatchBlock<Employee> that holds several Employee objects and
// then propagates them out as an array.
var batchEmployees = new BatchBlock<Employee>(batchSize);
// Create an ActionBlock<Employee[]> object that adds multiple
// employee entries to the database.
var insertEmployees = new ActionBlock<Employee[]>(a =>
InsertEmployees(a, connectionString));
// Link the batch block to the action block.
batchEmployees.LinkTo(insertEmployees);
// When the batch block completes, set the action block also to complete.
batchEmployees.Completion.ContinueWith(delegate { insertEmployees.Complete(); });
// Post several random Employee objects to the batch block.
PostRandomEmployees(batchEmployees, count);
// Set the batch block to the completed state and wait for
// all insert operations to complete.
batchEmployees.Complete();
insertEmployees.Completion.Wait();
}
// Displays information about several random employees to the console.
static void GetRandomEmployees(string connectionString, int batchSize,
int count)
{
// Create a BatchedJoinBlock<Employee, Exception> object that holds
// both employee and exception data.
var selectEmployees = new BatchedJoinBlock<Employee, Exception>(batchSize);
// Holds the total number of exceptions that occurred.
int totalErrors = 0;
// Create an action block that prints employee and error information
// to the console.
var printEmployees =
new ActionBlock<Tuple<IList<Employee>, IList<Exception>>>(data =>
{
// Print information about the employees in this batch.
Console.WriteLine("Received a batch...");
foreach (Employee e in data.Item1)
{
Console.WriteLine($"Last={e.LastName} First={e.FirstName} ID={e.EmployeeID}");
}
// Print the error count for this batch.
Console.WriteLine($"There were {data.Item2.Count} errors in this batch...");
// Update total error count.
totalErrors += data.Item2.Count;
});
// Link the batched join block to the action block.
selectEmployees.LinkTo(printEmployees);
// When the batched join block completes, set the action block also to complete.
selectEmployees.Completion.ContinueWith(delegate { printEmployees.Complete(); });
// Try to retrieve the ID for several random employees.
Console.WriteLine("Selecting random entries from Employees table...");
for (int i = 0; i < count; i++)
{
try
{
// Create a random employee.
Employee e = Employee.Random();
// Try to retrieve the ID for the employee from the database.
e.EmployeeID = GetEmployeeID(e.LastName, e.FirstName, connectionString);
// Post the Employee object to the Employee target of
// the batched join block.
selectEmployees.Target1.Post(e);
}
catch (NullReferenceException e)
{
// GetEmployeeID throws NullReferenceException when there is
// no such employee with the given name. When this happens,
// post the Exception object to the Exception target of
// the batched join block.
selectEmployees.Target2.Post(e);
}
}
// Set the batched join block to the completed state and wait for
// all retrieval operations to complete.
selectEmployees.Complete();
printEmployees.Completion.Wait();
// Print the total error count.
Console.WriteLine($"Finished. There were {totalErrors} total errors.");
}
static void Main(string[] args)
{
// Create a connection string for accessing the database.
// The connection string refers to the temporary database location.
string connectionString = "...";
// Create a Stopwatch object to time database insert operations.
Stopwatch stopwatch = new Stopwatch();
// Start with a clean database file by copying the source database to
// the temporary location.
File.Copy(sourceDatabase, scratchDatabase, true);
// Demonstrate multiple insert operations without batching.
Console.WriteLine("Demonstrating non-batched database insert operations...");
Console.WriteLine($"Original size of Employee table: {GetEmployeeCount(connectionString)}.");
stopwatch.Start();
AddEmployees(connectionString, insertCount);
stopwatch.Stop();
Console.WriteLine("New size of Employee table: {0}; elapsed insert time: {1} ms.",
GetEmployeeCount(connectionString), stopwatch.ElapsedMilliseconds);
Console.WriteLine();
// Start again with a clean database file.
File.Copy(sourceDatabase, scratchDatabase, true);
// Demonstrate multiple insert operations, this time with batching.
Console.WriteLine("Demonstrating batched database insert operations...");
Console.WriteLine($"Original size of Employee table: {GetEmployeeCount(connectionString)}.");
stopwatch.Restart();
AddEmployeesBatched(connectionString, insertBatchSize, insertCount);
stopwatch.Stop();
Console.WriteLine("New size of Employee table: {0}; elapsed insert time: {1} ms.",
GetEmployeeCount(connectionString), stopwatch.ElapsedMilliseconds);
Console.WriteLine();
// Start again with a clean database file.
File.Copy(sourceDatabase, scratchDatabase, true);
// Demonstrate multiple retrieval operations with error reporting.
Console.WriteLine("Demonstrating batched join database select operations...");
// Add a small number of employees to the database.
AddEmployeesBatched(connectionString, insertBatchSize, 16);
// Query for random employees.
GetRandomEmployees(connectionString, insertBatchSize, 10);
}
}
}
/* Sample output:
Demonstrating non-batched database insert operations...
Original size of Employee table: 15.
Adding 256 entries to Employee table...
New size of Employee table: 271; elapsed insert time: 11035 ms.
Demonstrating batched database insert operations...
Original size of Employee table: 15.
Adding 256 entries to Employee table...
New size of Employee table: 271; elapsed insert time: 197 ms.
Demonstrating batched join database insert operations...
Adding 16 entries to Employee table...
Selecting items from Employee table...
Received a batch...
Last=Jones First=Tom ID=21
Last=Jones First=John ID=24
Last=Smith First=Tom ID=26
Last=Jones First=Tom ID=21
There were 4 errors in this batch...
Received a batch...
Last=Smith First=Tom ID=26
Last=Jones First=Mike ID=28
There were 0 errors in this batch...
Finished. There were 4 total errors.
*/
Imports System.Collections.Generic
Imports System.Data.SqlClient
Imports System.Data.SqlServerCe
Imports System.Diagnostics
Imports System.IO
Imports System.Threading.Tasks.Dataflow
' Demonstrates how to use batched dataflow blocks to improve
' the performance of database operations.
Namespace DataflowBatchDatabase
Friend Class Program
' The number of employees to add to the database.
' TODO: Change this value to experiment with different numbers of
' employees to insert into the database.
Private Shared ReadOnly insertCount As Integer = 256
' The size of a single batch of employees to add to the database.
' TODO: Change this value to experiment with different batch sizes.
Private Shared ReadOnly insertBatchSize As Integer = 96
' The source database file.
' TODO: Change this value if Northwind.sdf is at a different location
' on your computer.
Private Shared ReadOnly sourceDatabase As String = "C:\Program Files\Microsoft SQL Server Compact Edition\v3.5\Samples\Northwind.sdf"
' TODO: Change this value if you require a different temporary location.
Private Shared ReadOnly scratchDatabase As String = "C:\Temp\Northwind.sdf"
' Describes an employee. Each property maps to a
' column in the Employees table in the Northwind database.
' For brevity, the Employee class does not contain
' all columns from the Employees table.
Private Class Employee
Public Property EmployeeID() As Integer
Public Property LastName() As String
Public Property FirstName() As String
' A random number generator that helps tp generate
' Employee property values.
Private Shared rand As New Random(42)
' Possible random first names.
Private Shared ReadOnly firstNames() As String = {"Tom", "Mike", "Ruth", "Bob", "John"}
' Possible random last names.
Private Shared ReadOnly lastNames() As String = {"Jones", "Smith", "Johnson", "Walker"}
' Creates an Employee object that contains random
' property values.
Public Shared Function Random() As Employee
Return New Employee With {.EmployeeID = -1, .LastName = lastNames(rand.Next() Mod lastNames.Length), .FirstName = firstNames(rand.Next() Mod firstNames.Length)}
End Function
End Class
' Adds new employee records to the database.
Private Shared Sub InsertEmployees(ByVal employees() As Employee, ByVal connectionString As String)
Using connection As New SqlConnection(connectionString)
Try
' Create the SQL command.
Dim command As New SqlCommand("INSERT INTO Employees ([Last Name], [First Name])" & "VALUES (@lastName, @firstName)", connection)
connection.Open()
For i As Integer = 0 To employees.Length - 1
' Set parameters.
command.Parameters.Clear()
command.Parameters.Add("@lastName", employees(i).LastName)
command.Parameters.Add("@firstName", employees(i).FirstName)
' Execute the command.
command.ExecuteNonQuery()
Next i
Finally
connection.Close()
End Try
End Using
End Sub
' Retrieves the number of entries in the Employees table in
' the Northwind database.
Private Shared Function GetEmployeeCount(ByVal connectionString As String) As Integer
Dim result As Integer = 0
Using sqlConnection As New SqlConnection(connectionString)
Dim sqlCommand As New SqlCommand("SELECT COUNT(*) FROM Employees", sqlConnection)
sqlConnection.Open()
Try
result = CInt(Fix(sqlCommand.ExecuteScalar()))
Finally
sqlConnection.Close()
End Try
End Using
Return result
End Function
' Retrieves the ID of the first employee that has the provided name.
Private Shared Function GetEmployeeID(ByVal lastName As String, ByVal firstName As String, ByVal connectionString As String) As Integer
Using connection As New SqlConnection(connectionString)
Dim command As New SqlCommand(String.Format("SELECT [Employee ID] FROM Employees " & "WHERE [Last Name] = '{0}' AND [First Name] = '{1}'", lastName, firstName), connection)
connection.Open()
Try
Return CInt(Fix(command.ExecuteScalar()))
Finally
connection.Close()
End Try
End Using
End Function
' Posts random Employee data to the provided target block.
Private Shared Sub PostRandomEmployees(ByVal target As ITargetBlock(Of Employee), ByVal count As Integer)
Console.WriteLine("Adding {0} entries to Employee table...", count)
For i As Integer = 0 To count - 1
target.Post(Employee.Random())
Next i
End Sub
' Adds random employee data to the database by using dataflow.
Private Shared Sub AddEmployees(ByVal connectionString As String, ByVal count As Integer)
' Create an ActionBlock<Employee> object that adds a single
' employee entry to the database.
Dim insertEmployee = New ActionBlock(Of Employee)(Sub(e) InsertEmployees(New Employee() {e}, connectionString))
' Post several random Employee objects to the dataflow block.
PostRandomEmployees(insertEmployee, count)
' Set the dataflow block to the completed state and wait for
' all insert operations to complete.
insertEmployee.Complete()
insertEmployee.Completion.Wait()
End Sub
' Adds random employee data to the database by using dataflow.
' This method is similar to AddEmployees except that it uses batching
' to add multiple employees to the database at a time.
Private Shared Sub AddEmployeesBatched(ByVal connectionString As String, ByVal batchSize As Integer, ByVal count As Integer)
' Create a BatchBlock<Employee> that holds several Employee objects and
' then propagates them out as an array.
Dim batchEmployees = New BatchBlock(Of Employee)(batchSize)
' Create an ActionBlock<Employee[]> object that adds multiple
' employee entries to the database.
Dim insertEmployees = New ActionBlock(Of Employee())(Sub(a) Program.InsertEmployees(a, connectionString))
' Link the batch block to the action block.
batchEmployees.LinkTo(insertEmployees)
' When the batch block completes, set the action block also to complete.
batchEmployees.Completion.ContinueWith(Sub() insertEmployees.Complete())
' Post several random Employee objects to the batch block.
PostRandomEmployees(batchEmployees, count)
' Set the batch block to the completed state and wait for
' all insert operations to complete.
batchEmployees.Complete()
insertEmployees.Completion.Wait()
End Sub
' Displays information about several random employees to the console.
Private Shared Sub GetRandomEmployees(ByVal connectionString As String, ByVal batchSize As Integer, ByVal count As Integer)
' Create a BatchedJoinBlock<Employee, Exception> object that holds
' both employee and exception data.
Dim selectEmployees = New BatchedJoinBlock(Of Employee, Exception)(batchSize)
' Holds the total number of exceptions that occurred.
Dim totalErrors As Integer = 0
' Create an action block that prints employee and error information
' to the console.
Dim printEmployees = New ActionBlock(Of Tuple(Of IList(Of Employee), IList(Of Exception)))(Sub(data)
' Print information about the employees in this batch.
' Print the error count for this batch.
' Update total error count.
Console.WriteLine("Received a batch...")
For Each e As Employee In data.Item1
Console.WriteLine("Last={0} First={1} ID={2}", e.LastName, e.FirstName, e.EmployeeID)
Next e
Console.WriteLine("There were {0} errors in this batch...", data.Item2.Count)
totalErrors += data.Item2.Count
End Sub)
' Link the batched join block to the action block.
selectEmployees.LinkTo(printEmployees)
' When the batched join block completes, set the action block also to complete.
selectEmployees.Completion.ContinueWith(Sub() printEmployees.Complete())
' Try to retrieve the ID for several random employees.
Console.WriteLine("Selecting random entries from Employees table...")
For i As Integer = 0 To count - 1
Try
' Create a random employee.
Dim e As Employee = Employee.Random()
' Try to retrieve the ID for the employee from the database.
e.EmployeeID = GetEmployeeID(e.LastName, e.FirstName, connectionString)
' Post the Employee object to the Employee target of
' the batched join block.
selectEmployees.Target1.Post(e)
Catch e As NullReferenceException
' GetEmployeeID throws NullReferenceException when there is
' no such employee with the given name. When this happens,
' post the Exception object to the Exception target of
' the batched join block.
selectEmployees.Target2.Post(e)
End Try
Next i
' Set the batched join block to the completed state and wait for
' all retrieval operations to complete.
selectEmployees.Complete()
printEmployees.Completion.Wait()
' Print the total error count.
Console.WriteLine("Finished. There were {0} total errors.", totalErrors)
End Sub
Shared Sub Main(ByVal args() As String)
' Create a connection string for accessing the database.
' The connection string refers to the temporary database location.
Dim connectionString As String = String.Format("Data Source={0}", scratchDatabase)
' Create a Stopwatch object to time database insert operations.
Dim stopwatch As New Stopwatch()
' Start with a clean database file by copying the source database to
' the temporary location.
File.Copy(sourceDatabase, scratchDatabase, True)
' Demonstrate multiple insert operations without batching.
Console.WriteLine("Demonstrating non-batched database insert operations...")
Console.WriteLine("Original size of Employee table: {0}.", GetEmployeeCount(connectionString))
stopwatch.Start()
AddEmployees(connectionString, insertCount)
stopwatch.Stop()
Console.WriteLine("New size of Employee table: {0}; elapsed insert time: {1} ms.", GetEmployeeCount(connectionString), stopwatch.ElapsedMilliseconds)
Console.WriteLine()
' Start again with a clean database file.
File.Copy(sourceDatabase, scratchDatabase, True)
' Demonstrate multiple insert operations, this time with batching.
Console.WriteLine("Demonstrating batched database insert operations...")
Console.WriteLine("Original size of Employee table: {0}.", GetEmployeeCount(connectionString))
stopwatch.Restart()
AddEmployeesBatched(connectionString, insertBatchSize, insertCount)
stopwatch.Stop()
Console.WriteLine("New size of Employee table: {0}; elapsed insert time: {1} ms.", GetEmployeeCount(connectionString), stopwatch.ElapsedMilliseconds)
Console.WriteLine()
' Start again with a clean database file.
File.Copy(sourceDatabase, scratchDatabase, True)
' Demonstrate multiple retrieval operations with error reporting.
Console.WriteLine("Demonstrating batched join database select operations...")
' Add a small number of employees to the database.
AddEmployeesBatched(connectionString, insertBatchSize, 16)
' Query for random employees.
GetRandomEmployees(connectionString, insertBatchSize, 10)
End Sub
End Class
End Namespace
' Sample output:
'Demonstrating non-batched database insert operations...
'Original size of Employee table: 15.
'Adding 256 entries to Employee table...
'New size of Employee table: 271; elapsed insert time: 11035 ms.
'
'Demonstrating batched database insert operations...
'Original size of Employee table: 15.
'Adding 256 entries to Employee table...
'New size of Employee table: 271; elapsed insert time: 197 ms.
'
'Demonstrating batched join database insert operations...
'Adding 16 entries to Employee table...
'Selecting items from Employee table...
'Received a batch...
'Last=Jones First=Tom ID=21
'Last=Jones First=John ID=24
'Last=Smith First=Tom ID=26
'Last=Jones First=Tom ID=21
'There were 4 errors in this batch...
'Received a batch...
'Last=Smith First=Tom ID=26
'Last=Jones First=Mike ID=28
'There were 0 errors in this batch...
'Finished. There were 4 total errors.
'