非同步程式設計
適用於:.NET Framework .NET .NET Standard
本文討論 Microsoft SqlClient Data Provider for SQL Server (SqlClient) 中非同步程式設計的支援。
舊版非同步程式設計
Microsoft SqlClient Data Provider for SQL Server 包括來自 System.Data.SqlClient 的方法,以針對移轉至 Microsoft.Data.SqlClient 的應用程式維持回溯相容性。 不建議針對新的開發使用下列舊版非同步程式設計方法:
提示
在 Microsoft SqlClient Data Provider for SQL Server 中,這些舊版方法已不需要連接字串中的 Asynchronous Processing=true。
非同步程式設計功能
這些非同步程式設計功能提供讓程式碼成為非同步的簡單技巧。
如需 .NET 中非同步程式設計的詳細資訊,請參閱:
當您的使用者介面沒有回應或不能擴充伺服器時,您可能就需要使程式碼更加非同步。 傳統的非同步程式碼編寫涉及安裝回呼 (也稱為接續),以表示非同步作業完成後發生的邏輯。 此樣式會使非同步程式碼的結構比同步程式碼更複雜。
您可以在不使用回呼的情況下呼叫非同步方法,而不需要跨多個方法或 lambda 運算式分割您的程式碼。 async 修飾詞會指定方法為非同步方法。 呼叫 async 方法時會傳回工作。 當 await 運算子套用至工作時,目前的方法會立即結束。 當工作完成時,會以相同的方法繼續執行。
提示
在 Microsoft SqlClient Data Provider for SQL Server 中,不需要進行非同步呼叫來設定 Context Connection 連接字串關鍵字。
呼叫 async 方法並不會建立額外的執行緒。 它可能會在結尾處簡短地使用現有的 I/O 完成執行緒。
Microsoft SqlClient Data Provider for SQL Server 中的下列方法支援非同步程式設計:
其他非同步成員支援 SqlClient 串流支援。
提示
非同步方法不需要連接字串中的 Asynchronous Processing=true。 而且此屬性在 Microsoft SqlClient Data Provider for SQL Server 中已淘汰。
同步至非同步連接開啟
您可以升級現有的應用程式,以使用非同步功能。 例如,假設應用程式具有同步連接演算法,並且會在每次連線至資料庫時封鎖 UI 執行緒。 連線之後,應用程式會呼叫預存程序,提示其他使用者剛剛有使用者登入。
using System;
using System.Data;
using Microsoft.Data.SqlClient;
namespace SqlCommandCS
{
    /// <summary>
    /// Synchronous baseline sample: opens a connection and executes a command
    /// while blocking the calling thread.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Builds the connection string and query text, then runs the query.
        /// </summary>
        static void Main()
        {
            const string connectionText =
                "Data Source=(local);Initial Catalog=Northwind;Integrated Security=SSPI";
            const string ordersQuery = "SELECT OrderID, CustomerID FROM dbo.Orders;";

            CreateCommand(ordersQuery, connectionText);
        }

        /// <summary>
        /// Opens a connection synchronously and executes the given statement on it.
        /// </summary>
        /// <param name="queryString">The SQL text to execute.</param>
        /// <param name="connectionString">The connection string used to reach the server.</param>
        private static void CreateCommand(string queryString,
            string connectionString)
        {
            var connection = new SqlConnection(connectionString);
            try
            {
                var command = new SqlCommand(queryString, connection);

                // The command was created on this connection, so opening the
                // connection directly is the same as going through command.Connection.
                connection.Open();
                command.ExecuteNonQuery();
            }
            finally
            {
                // Always release the connection, even when the command fails.
                connection.Dispose();
            }
        }
    }
}
轉換成使用非同步功能時,程式看起來會像這樣:
using Microsoft.Data.SqlClient;
using System.Threading.Tasks;
/// <summary>
/// Sample showing the synchronous program converted to use the asynchronous
/// SqlClient methods (<c>OpenAsync</c> and <c>ExecuteNonQueryAsync</c>).
/// </summary>
class A {
    /// <summary>
    /// Opens the connection and runs the command through <see cref="Method"/>,
    /// then prints the first column of every row returned by the query.
    /// </summary>
    public static void Main()
    {
        using (SqlConnection conn = new SqlConnection("Data Source=(local); Initial Catalog=NorthWind; Integrated Security=SSPI"))
        using (SqlCommand command = new SqlCommand("SELECT TOP 2 * FROM dbo.Orders", conn))
        {
            // Main is synchronous, so block here until the asynchronous open and
            // execute have finished. The returned value is not needed.
            A.Method(conn, command).Wait();

            // Dispose the reader deterministically so the connection is not left
            // with an open result set if reading throws.
            using (SqlDataReader reader = command.ExecuteReader())
            {
                while (reader.Read())
                {
                    Console.WriteLine(reader[0]);
                }
            }
        }
    }

    /// <summary>
    /// Asynchronously opens <paramref name="conn"/> and executes <paramref name="cmd"/>.
    /// </summary>
    /// <param name="conn">The closed connection to open.</param>
    /// <param name="cmd">The command to execute once the connection is open.</param>
    /// <returns>A task that yields 1 when both operations have completed.</returns>
    static async Task<int> Method(SqlConnection conn, SqlCommand cmd) {
        await conn.OpenAsync();
        await cmd.ExecuteNonQueryAsync();
        return 1;
    }
}
在現有的應用程式中新增非同步功能 (混用新舊模式)
您也可以新增非同步功能 (SqlConnection::OpenAsync),而不需要變更現有的非同步邏輯。 例如,如果應用程式目前使用:
// Existing callback-based (Begin/End) pattern: the connection is opened
// synchronously, then the reader is started with a completion callback.
AsyncCallback productList = ProductList;
var conn = new SqlConnection("Data Source=(local); Initial Catalog=NorthWind; Integrated Security=SSPI");
conn.Open();
var cmd = new SqlCommand("select top 2 * from orders", conn);
IAsyncResult ia = cmd.BeginExecuteReader(productList, cmd);
您可以開始使用非同步模式,而不需要大幅變更現有的演算法。
using Microsoft.Data.SqlClient;
using System.Threading.Tasks;
/// <summary>
/// Sample that mixes the task-based <c>OpenAsync</c> with the existing
/// callback-based <c>BeginExecuteReader</c> logic.
/// </summary>
class A
{
    /// <summary>Completion callback passed to <c>BeginExecuteReader</c>; intentionally empty here.</summary>
    static void ProductList(IAsyncResult result) { }

    /// <summary>
    /// Replaces the blocking <c>conn.Open()</c> of the previous version with
    /// <c>OpenAsync</c>, and starts the unchanged Begin/End reader logic from a
    /// continuation that only runs when the open succeeded.
    /// </summary>
    public static void Main()
    {
        AsyncCallback productList = ProductList;
        var conn = new SqlConnection("Data Source=(local); Initial Catalog=NorthWind; Integrated Security=SSPI");

        Task opening = conn.OpenAsync();
        opening.ContinueWith(
            opened =>
            {
                // Same legacy code as before; it now runs after the asynchronous open.
                var cmd = new SqlCommand("select top 2 * from orders", conn);
                IAsyncResult ia = cmd.BeginExecuteReader(productList, cmd);
            },
            TaskContinuationOptions.OnlyOnRanToCompletion);
    }
}
使用基底提供者模型與非同步功能
您可能需要建立可連線至不同資料庫並執行查詢的工具。 您可以使用基底提供者模型與非同步功能。
您必須啟用伺服器上的「Microsoft 分散式異動控制器」(MSDTC),才能使用分散式異動。 如需如何啟用 MSDTC 的詳細資訊,請參閱如何在網頁伺服器上啟用 MSDTC。
using System.Data.Common;
using Microsoft.Data.SqlClient;
using System.Threading.Tasks;
/// <summary>
/// Sample that uses the base provider model (<c>DbConnection</c>, <c>DbCommand</c>,
/// <c>DbDataReader</c>) together with the asynchronous methods.
/// </summary>
class program
{
    /// <summary>
    /// Opens a provider-agnostic connection, runs a query, and prints every
    /// column value of every row.
    /// </summary>
    /// <param name="connectionString">The connection string assigned to the created connection.</param>
    static async Task PerformDBOperationsUsingProviderModel(string connectionString)
    {
        using (var connection = SqlClientFactory.Instance.CreateConnection())
        {
            connection.ConnectionString = connectionString;
            await connection.OpenAsync();

            var command = connection.CreateCommand();
            command.CommandText = "SELECT * FROM AUTHORS";

            using (var reader = await command.ExecuteReaderAsync())
            {
                while (await reader.ReadAsync())
                {
                    for (int ordinal = 0; ordinal < reader.FieldCount; ordinal++)
                    {
                        // Process each column as appropriate
                        object value = await reader.GetFieldValueAsync<object>(ordinal);
                        Console.WriteLine(value);
                    }
                }
            }
        }
    }

    /// <summary>
    /// Builds a connection string and blocks until the asynchronous work completes.
    /// </summary>
    public static void Main()
    {
        // replace these with your own values
        var builder = new SqlConnectionStringBuilder
        {
            DataSource = "localhost",
            InitialCatalog = "pubs",
            IntegratedSecurity = true
        };

        PerformDBOperationsUsingProviderModel(builder.ConnectionString).Wait();
    }
}
使用 SQL 交易與新的非同步功能
using Microsoft.Data.SqlClient;
using System.Threading.Tasks;
/// <summary>
/// Sample that runs two inserts inside a local SQL transaction using the
/// asynchronous command methods.
/// </summary>
class Program
{
    /// <summary>Starts the transactional work and blocks until it finishes.</summary>
    static void Main()
    {
        const string connectionString =
            "Persist Security Info=False;Integrated Security=SSPI;database=Northwind;server=(local)";

        ExecuteSqlTransaction(connectionString).Wait();
    }

    /// <summary>
    /// Opens a connection, begins a named local transaction, executes two inserts,
    /// and commits. Any failure is reported and followed by a rollback attempt.
    /// </summary>
    /// <param name="connectionString">The connection string of the target database.</param>
    static async Task ExecuteSqlTransaction(string connectionString)
    {
        using (var connection = new SqlConnection(connectionString))
        {
            await connection.OpenAsync();
            var command = connection.CreateCommand();

            // Start a local transaction.
            var transaction = await Task.Run(
                () => connection.BeginTransaction("SampleTransaction"));

            // Must assign both transaction object and connection
            // to Command object for a pending local transaction
            command.Connection = connection;
            command.Transaction = transaction;

            string[] inserts =
            {
                "Insert into Region (RegionID, RegionDescription) VALUES (555, 'Description')",
                "Insert into Region (RegionID, RegionDescription) VALUES (556, 'Description')"
            };

            try
            {
                foreach (string insert in inserts)
                {
                    command.CommandText = insert;
                    await command.ExecuteNonQueryAsync();
                }

                // Attempt to commit the transaction.
                await Task.Run(() => transaction.Commit());
                Console.WriteLine("Both records are written to database.");
            }
            catch (Exception ex)
            {
                Console.WriteLine("Commit Exception Type: {0}", ex.GetType());
                Console.WriteLine(" Message: {0}", ex.Message);
                TryRollback(transaction);
            }
        }
    }

    /// <summary>
    /// Attempts to roll back the transaction and reports, rather than propagates,
    /// any error raised by the rollback itself (for example a closed connection).
    /// </summary>
    private static void TryRollback(SqlTransaction transaction)
    {
        try
        {
            transaction.Rollback();
        }
        catch (Exception rollbackError)
        {
            Console.WriteLine("Rollback Exception Type: {0}", rollbackError.GetType());
            Console.WriteLine(" Message: {0}", rollbackError.Message);
        }
    }
}
使用分散式交易與新的非同步功能
在企業應用程式中,您可能需要在某些案例中加入分散式交易,以支援多部資料庫伺服器之間的交易。 您可以使用 System.Transactions 命名空間並登記分散式交易,如下所示:
using Microsoft.Data.SqlClient;
using System.Threading.Tasks;
using System.Transactions;
/// <summary>
/// Sample that enlists two connections in one <c>CommittableTransaction</c>
/// and executes an insert on each using the asynchronous command methods.
/// </summary>
class Program
{
    /// <summary>
    /// Builds the connection string and blocks until the distributed work finishes.
    /// </summary>
    public static void Main()
    {
        // replace these with your own values
        // create two tables RegionTable1 and RegionTable2
        // and add a constraint in one of these tables
        // to avoid duplicate RegionID
        var builder = new SqlConnectionStringBuilder
        {
            DataSource = "localhost",
            InitialCatalog = "Northwind",
            IntegratedSecurity = true
        };

        string connectionString = builder.ConnectionString;
        ExecuteDistributedTransaction(connectionString, connectionString).Wait();
    }

    /// <summary>
    /// Opens both connections, enlists each in the same transaction, runs one
    /// insert per connection, and commits. Failures are reported and followed
    /// by a rollback attempt.
    /// </summary>
    /// <param name="connectionString1">Connection string for the first connection.</param>
    /// <param name="connectionString2">Connection string for the second connection.</param>
    static async Task ExecuteDistributedTransaction(string connectionString1, string connectionString2)
    {
        using (var connection1 = new SqlConnection(connectionString1))
        using (var connection2 = new SqlConnection(connectionString2))
        using (var transaction = new CommittableTransaction())
        {
            await connection1.OpenAsync();
            connection1.EnlistTransaction(transaction);

            await connection2.OpenAsync();
            connection2.EnlistTransaction(transaction);

            try
            {
                await RunInsertAsync(
                    connection1,
                    "Insert into RegionTable1 (RegionID, RegionDescription) VALUES (100, 'Description')");
                await RunInsertAsync(
                    connection2,
                    "Insert into RegionTable2 (RegionID, RegionDescription) VALUES (100, 'Description')");

                transaction.Commit();
            }
            catch (Exception ex)
            {
                Console.WriteLine("Exception Type: {0}", ex.GetType());
                Console.WriteLine(" Message: {0}", ex.Message);

                try
                {
                    transaction.Rollback();
                }
                catch (Exception rollbackError)
                {
                    Console.WriteLine("Rollback Exception Type: {0}", rollbackError.GetType());
                    Console.WriteLine(" Message: {0}", rollbackError.Message);
                }
            }
        }
    }

    /// <summary>Creates a command on the connection and starts executing the statement.</summary>
    private static Task<int> RunInsertAsync(SqlConnection connection, string sql)
    {
        SqlCommand command = connection.CreateCommand();
        command.CommandText = sql;
        return command.ExecuteNonQueryAsync();
    }
}
取消非同步作業
您可以使用 CancellationToken 取消非同步要求。
using Microsoft.Data.SqlClient;
using System.Threading;
using System.Threading.Tasks;
namespace Samples
{
    /// <summary>
    /// Sample that cancels a long-running asynchronous command with a
    /// <see cref="CancellationToken"/>.
    /// </summary>
    class CancellationSample
    {
        /// <summary>
        /// Starts the long-running operation with a token that is canceled after
        /// two seconds and reports the cancellation.
        /// </summary>
        public static void Main(string[] args)
        {
            // CancellationTokenSource owns a timer once CancelAfter is used, so dispose it.
            using (CancellationTokenSource source = new CancellationTokenSource())
            {
                source.CancelAfter(2000); // give up after 2 seconds
                try
                {
                    Task result = CancellingAsynchronousOperations(source.Token);
                    result.Wait();
                }
                catch (AggregateException exception)
                {
                    // Cancellation of an executing command surfaces as SqlException.
                    // A task canceled before or between operations (for example while
                    // the connection is still opening) surfaces as
                    // OperationCanceledException. Both mean the operation was canceled.
                    if (exception.InnerException is SqlException
                        || exception.InnerException is OperationCanceledException)
                    {
                        Console.WriteLine("Operation canceled");
                    }
                    else
                    {
                        throw;
                    }
                }
            }
        }

        /// <summary>
        /// Opens a connection and executes a ten-minute <c>WAITFOR DELAY</c>,
        /// passing the token to both asynchronous calls.
        /// </summary>
        /// <param name="cancellationToken">Token used to abandon the open and the command.</param>
        static async Task CancellingAsynchronousOperations(CancellationToken cancellationToken)
        {
            using (SqlConnection connection = new SqlConnection("Server=(local);Integrated Security=true"))
            {
                await connection.OpenAsync(cancellationToken);
                SqlCommand command = new SqlCommand("WAITFOR DELAY '00:10:00'", connection);
                await command.ExecuteNonQueryAsync(cancellationToken);
            }
        }
    }
}
包含 SqlBulkCopy 的非同步作業
Microsoft.Data.SqlClient.SqlBulkCopy 也透過 SqlBulkCopy.WriteToServerAsync 提供非同步功能。
using System.Data;
using Microsoft.Data.SqlClient;
using System.Threading;
using System.Threading.Tasks;
namespace SqlBulkCopyAsyncCodeSample
{
    /// <summary>
    /// Collection of <c>SqlBulkCopy</c> samples: a synchronous baseline followed by
    /// several variations that use <c>WriteToServerAsync</c>.
    /// </summary>
    class Program
    {
        static string selectStatement = "SELECT * FROM [pubs].[dbo].[titles]";
        // {0} is replaced with the name of the destination (temporary) table.
        static string createDestTableStatement =
            @"CREATE TABLE {0} (
[title_id] [varchar](6) NOT NULL,
[title] [varchar](80) NOT NULL,
[type] [char](12) NOT NULL,
[pub_id] [char](4) NULL,
[price] [money] NULL,
[advance] [money] NULL,
[royalty] [int] NULL,
[ytd_sales] [int] NULL,
[notes] [varchar](200) NULL,
[pubdate] [datetime] NOT NULL)";
        // Replace the connection string if needed, for instance to connect to SQL Express: @"Server=(local)\SQLEXPRESS;Database=Demo;Integrated Security=true"
        // static string connectionString = @"Server=(localdb)\V11.0;Database=Demo";
        static string connectionString = @"Server=(local);Database=Demo;Integrated Security=true";
        // static string marsConnectionString = @"Server=(localdb)\V11.0;Database=Demo;MultipleActiveResultSets=true;";
        static string marsConnectionString = @"Server=(local);Database=Demo;MultipleActiveResultSets=true;Integrated Security=true";
        // Replace the Server name with your actual sql azure server name and User ID/Password
        static string azureConnectionString = @"Server=SqlAzure;User ID=<myUserID>;Password=<myPassword>;Database=Demo";

        /// <summary>Runs every sample in sequence, blocking on each asynchronous one.</summary>
        static void Main(string[] args)
        {
            SynchronousSqlBulkCopy();
            AsyncSqlBulkCopy().Wait();
            MixSyncAsyncSqlBulkCopy().Wait();
            AsyncSqlBulkCopyNotifyAfter().Wait();
            AsyncSqlBulkCopyDataRows().Wait();
            AsyncSqlBulkCopySqlServerToSqlAzure().Wait();
            AsyncSqlBulkCopyCancel().Wait();
            AsyncSqlBulkCopyMARS().Wait();
        }

        /// <summary>
        /// 3.1.1 Synchronous bulk copy: fills a DataTable, creates a temporary
        /// destination table, and copies the rows with <c>WriteToServer</c>.
        /// </summary>
        private static void SynchronousSqlBulkCopy()
        {
            using (SqlConnection conn = new SqlConnection(connectionString))
            {
                conn.Open();
                DataTable dt = new DataTable();
                using (SqlCommand cmd = new SqlCommand(selectStatement, conn))
                {
                    SqlDataAdapter adapter = new SqlDataAdapter(cmd);
                    adapter.Fill(dt);

                    // A GUID-based name keeps the temporary table unique per run.
                    string temptable = "[#" + Guid.NewGuid().ToString("N") + "]";
                    cmd.CommandText = string.Format(createDestTableStatement, temptable);
                    cmd.ExecuteNonQuery();

                    using (SqlBulkCopy bcp = new SqlBulkCopy(conn))
                    {
                        bcp.DestinationTableName = temptable;
                        bcp.WriteToServer(dt);
                    }
                }
            }
        }

        /// <summary>
        /// 3.1.2 Asynchronous bulk copy: same flow as the synchronous sample, but the
        /// open, the table creation, and the copy are awaited.
        /// </summary>
        private static async Task AsyncSqlBulkCopy()
        {
            using (SqlConnection conn = new SqlConnection(connectionString))
            {
                await conn.OpenAsync();
                DataTable dt = new DataTable();
                using (SqlCommand cmd = new SqlCommand(selectStatement, conn))
                {
                    SqlDataAdapter adapter = new SqlDataAdapter(cmd);
                    adapter.Fill(dt);

                    string temptable = "[#" + Guid.NewGuid().ToString("N") + "]";
                    cmd.CommandText = string.Format(createDestTableStatement, temptable);
                    await cmd.ExecuteNonQueryAsync();

                    using (SqlBulkCopy bcp = new SqlBulkCopy(conn))
                    {
                        bcp.DestinationTableName = temptable;
                        await bcp.WriteToServerAsync(dt);
                    }
                }
            }
        }

        /// <summary>
        /// 3.2 Mixing synchronous and asynchronous calls: the source reader is opened
        /// synchronously, and the destination side is handled asynchronously.
        /// </summary>
        private static async Task MixSyncAsyncSqlBulkCopy()
        {
            using (SqlConnection conn1 = new SqlConnection(connectionString))
            {
                conn1.Open();
                using (SqlCommand cmd = new SqlCommand(selectStatement, conn1))
                {
                    using (SqlDataReader reader = cmd.ExecuteReader())
                    {
                        using (SqlConnection conn2 = new SqlConnection(connectionString))
                        {
                            await conn2.OpenAsync();
                            string temptable = "[#" + Guid.NewGuid().ToString("N") + "]";
                            SqlCommand createCmd = new SqlCommand(string.Format(createDestTableStatement, temptable), conn2);
                            await createCmd.ExecuteNonQueryAsync();

                            using (SqlBulkCopy bcp = new SqlBulkCopy(conn2))
                            {
                                bcp.DestinationTableName = temptable;
                                await bcp.WriteToServerAsync(reader);
                            }
                        }
                    }
                }
            }
        }

        /// <summary>
        /// 3.3 Using the NotifyAfter property: raises <c>SqlRowsCopied</c> every
        /// five rows while the asynchronous copy runs.
        /// </summary>
        private static async Task AsyncSqlBulkCopyNotifyAfter()
        {
            using (SqlConnection conn = new SqlConnection(connectionString))
            {
                await conn.OpenAsync();
                DataTable dt = new DataTable();
                using (SqlCommand cmd = new SqlCommand(selectStatement, conn))
                {
                    SqlDataAdapter adapter = new SqlDataAdapter(cmd);
                    adapter.Fill(dt);

                    string temptable = "[#" + Guid.NewGuid().ToString("N") + "]";
                    cmd.CommandText = string.Format(createDestTableStatement, temptable);
                    await cmd.ExecuteNonQueryAsync();

                    using (SqlBulkCopy bcp = new SqlBulkCopy(conn))
                    {
                        bcp.DestinationTableName = temptable;
                        bcp.NotifyAfter = 5;
                        bcp.SqlRowsCopied += new SqlRowsCopiedEventHandler(OnSqlRowsCopied);
                        await bcp.WriteToServerAsync(dt);
                    }
                }
            }
        }

        /// <summary>Progress handler: prints the number of rows copied so far.</summary>
        private static void OnSqlRowsCopied(object sender, SqlRowsCopiedEventArgs e)
        {
            Console.WriteLine("Copied {0} so far...", e.RowsCopied);
        }

        /// <summary>
        /// 3.4 Asynchronous bulk copy from a <c>DataRow[]</c> instead of a DataTable.
        /// </summary>
        private static async Task AsyncSqlBulkCopyDataRows()
        {
            using (SqlConnection conn = new SqlConnection(connectionString))
            {
                await conn.OpenAsync();
                DataTable dt = new DataTable();
                using (SqlCommand cmd = new SqlCommand(selectStatement, conn))
                {
                    SqlDataAdapter adapter = new SqlDataAdapter(cmd);
                    adapter.Fill(dt);
                    DataRow[] rows = dt.Select();

                    string temptable = "[#" + Guid.NewGuid().ToString("N") + "]";
                    cmd.CommandText = string.Format(createDestTableStatement, temptable);
                    await cmd.ExecuteNonQueryAsync();

                    using (SqlBulkCopy bcp = new SqlBulkCopy(conn))
                    {
                        bcp.DestinationTableName = temptable;
                        await bcp.WriteToServerAsync(rows);
                    }
                }
            }
        }

        /// <summary>
        /// 3.5 Copies data from SQL Server to SQL Azure by streaming a reader on the
        /// source connection into a bulk copy on the destination connection.
        /// </summary>
        private static async Task AsyncSqlBulkCopySqlServerToSqlAzure()
        {
            using (SqlConnection srcConn = new SqlConnection(connectionString))
            using (SqlConnection destConn = new SqlConnection(azureConnectionString))
            {
                await srcConn.OpenAsync();
                await destConn.OpenAsync();
                using (SqlCommand srcCmd = new SqlCommand(selectStatement, srcConn))
                {
                    using (SqlDataReader reader = await srcCmd.ExecuteReaderAsync())
                    {
                        string temptable = "[#" + Guid.NewGuid().ToString("N") + "]";
                        using (SqlCommand destCmd = new SqlCommand(string.Format(createDestTableStatement, temptable), destConn))
                        {
                            await destCmd.ExecuteNonQueryAsync();
                            using (SqlBulkCopy bcp = new SqlBulkCopy(destConn))
                            {
                                bcp.DestinationTableName = temptable;
                                await bcp.WriteToServerAsync(reader);
                            }
                        }
                    }
                }
            }
        }

        /// <summary>
        /// 3.6 Cancelling an asynchronous bulk copy to SQL Azure. The token is armed
        /// to cancel 200 ms after the copy starts; a canceled copy is reported rather
        /// than propagated, so the remaining samples still run.
        /// </summary>
        private static async Task AsyncSqlBulkCopyCancel()
        {
            // The source owns a timer once CancelAfter is called, so dispose it.
            using (CancellationTokenSource cts = new CancellationTokenSource())
            using (SqlConnection srcConn = new SqlConnection(connectionString))
            using (SqlConnection destConn = new SqlConnection(azureConnectionString))
            {
                await srcConn.OpenAsync(cts.Token);
                await destConn.OpenAsync(cts.Token);
                using (SqlCommand srcCmd = new SqlCommand(selectStatement, srcConn))
                {
                    using (SqlDataReader reader = await srcCmd.ExecuteReaderAsync(cts.Token))
                    {
                        string temptable = "[#" + Guid.NewGuid().ToString("N") + "]";
                        using (SqlCommand destCmd = new SqlCommand(string.Format(createDestTableStatement, temptable), destConn))
                        {
                            await destCmd.ExecuteNonQueryAsync(cts.Token);
                            using (SqlBulkCopy bcp = new SqlBulkCopy(destConn))
                            {
                                bcp.DestinationTableName = temptable;

                                // Cancel the async SqlBulkCopy operation after 200 ms.
                                // This must be scheduled BEFORE awaiting the copy;
                                // scheduling it afterwards can never cancel anything
                                // because the copy has already completed.
                                cts.CancelAfter(200);
                                try
                                {
                                    await bcp.WriteToServerAsync(reader, cts.Token);
                                }
                                catch (OperationCanceledException)
                                {
                                    // NOTE(review): assumes a canceled copy surfaces as
                                    // OperationCanceledException - confirm for the
                                    // Microsoft.Data.SqlClient version in use.
                                    Console.WriteLine("Bulk copy canceled.");
                                }
                            }
                        }
                    }
                }
            }
        }

        /// <summary>
        /// 3.7 Using async with MARS: keeps two readers active on the same
        /// MARS-enabled connection while bulk copying one of them to a second connection.
        /// </summary>
        private static async Task AsyncSqlBulkCopyMARS()
        {
            using (SqlConnection marsConn = new SqlConnection(marsConnectionString))
            {
                await marsConn.OpenAsync();
                SqlCommand titlesCmd = new SqlCommand("SELECT * FROM [pubs].[dbo].[titles]", marsConn);
                SqlCommand authorsCmd = new SqlCommand("SELECT * FROM [pubs].[dbo].[authors]", marsConn);

                //With MARS we can have multiple active results sets on the same connection
                using (SqlDataReader titlesReader = await titlesCmd.ExecuteReaderAsync())
                using (SqlDataReader authorsReader = await authorsCmd.ExecuteReaderAsync())
                {
                    await authorsReader.ReadAsync();
                    string temptable = "[#" + Guid.NewGuid().ToString("N") + "]";
                    using (SqlConnection destConn = new SqlConnection(connectionString))
                    {
                        await destConn.OpenAsync();
                        using (SqlCommand destCmd = new SqlCommand(string.Format(createDestTableStatement, temptable), destConn))
                        {
                            await destCmd.ExecuteNonQueryAsync();
                            using (SqlBulkCopy bcp = new SqlBulkCopy(destConn))
                            {
                                bcp.DestinationTableName = temptable;
                                await bcp.WriteToServerAsync(titlesReader);
                            }
                        }
                    }
                }
            }
        }
    }
}
以非同步方式搭配 MARS 使用多個命令
此範例會開啟 AdventureWorks 資料庫的單一連線。 使用 SqlCommand 物件,即會建立 SqlDataReader。 使用讀取器時,會開啟第二個 SqlDataReader,並使用第一個 SqlDataReader 的資料作為第二個讀取器的 WHERE 子句輸入。
注意
下列範例使用 AdventureWorks 範例資料庫。 範例程式碼中提供的連接字串會假設資料庫安裝在本機電腦且可供使用。 請依據環境需求修改連接字串。
using System.Data.Common;
using Microsoft.Data.SqlClient;
using System.Threading.Tasks;
/// <summary>
/// Sample that runs multiple commands asynchronously on a single MARS-enabled
/// connection: an outer vendor reader drives an inner product reader.
/// </summary>
class Class1
{
    /// <summary>Starts the sample and blocks until it completes.</summary>
    static void Main()
    {
        Task task = MultipleCommands();
        task.Wait();
    }

    /// <summary>
    /// Lists every vendor and, for each vendor, the names of the products it
    /// supplies. The product query runs while the vendor reader is still open.
    /// </summary>
    static async Task MultipleCommands()
    {
        // By default, MARS is disabled when connecting to a MARS-enabled host.
        // It must be enabled in the connection string.
        string connectionString = GetConnectionString();

        string vendorSQL =
            "SELECT BusinessEntityID, Name FROM Purchasing.Vendor";
        string productSQL =
            "SELECT Production.Product.Name FROM Production.Product " +
            "INNER JOIN Purchasing.ProductVendor " +
            "ON Production.Product.ProductID = " +
            "Purchasing.ProductVendor.ProductID " +
            "WHERE Purchasing.ProductVendor.BusinessEntityID = @VendorId";

        using (SqlConnection awConnection = new SqlConnection(connectionString))
        using (SqlCommand vendorCmd = new SqlCommand(vendorSQL, awConnection))
        using (SqlCommand productCmd = new SqlCommand(productSQL, awConnection))
        {
            // SqlDbType lives in System.Data, which this sample does not import
            // (only System.Data.Common), so the name is fully qualified.
            productCmd.Parameters.Add("@VendorId", System.Data.SqlDbType.Int);

            await awConnection.OpenAsync();
            using (SqlDataReader vendorReader = await vendorCmd.ExecuteReaderAsync())
            {
                while (await vendorReader.ReadAsync())
                {
                    Console.WriteLine(vendorReader["Name"]);

                    int vendorID = (int)vendorReader["BusinessEntityID"];
                    productCmd.Parameters["@VendorId"].Value = vendorID;

                    // The following line of code requires a MARS-enabled connection.
                    using (SqlDataReader productReader = await productCmd.ExecuteReaderAsync())
                    {
                        while (await productReader.ReadAsync())
                        {
                            Console.WriteLine(" " +
                                productReader["Name"].ToString());
                        }
                    }
                }
            }
        }
    }

    /// <summary>Returns the connection string for the local AdventureWorks database with MARS enabled.</summary>
    private static string GetConnectionString()
    {
        // To avoid storing the connection string in your code, you can retrieve it from a configuration file.
        return "Data Source=(local);Integrated Security=SSPI;Initial Catalog=AdventureWorks;MultipleActiveResultSets=True";
    }
}
搭配 MARS 以非同步方式讀取及更新資料
MARS 允許將連線用於含有一個以上擱置中作業的讀取作業與資料操作語言 (DML) 作業。 此功能讓應用程式不需要處理連線忙碌的錯誤。 而且,MARS 也可以取代伺服器端資料指標的使用,這通常會取用更多資源。 最後,因為多個作業可在單一連線上進行操作,所以可共用相同的交易內容,而無需使用 sp_getbindtoken 及 sp_bindsession 系統預存程序。
下列主控台應用程式示範如何使用兩個具有三個 SqlCommand 物件的 SqlDataReader 物件,以及已啟用 MARS 的單一 SqlConnection 物件。 第一個命令物件會擷取其信用評等為 5 的廠商清單。 第二個命令物件會使用 SqlDataReader 提供的廠商識別碼,來載入第二個 SqlDataReader 以及該特定廠商的所有產品。 第二個 SqlDataReader 會造訪每個產品記錄。 將會執行計算,以判斷新的 OnOrderQty 應該是什麼。 然後使用第三個命令物件,以新值來更新 ProductVendor 資料表。 這整個程序都會在單一交易內進行,並在結束時復原。
注意
下列範例使用 AdventureWorks 範例資料庫。 範例程式碼中提供的連接字串會假設資料庫安裝在本機電腦且可供使用。 請依據環境需求修改連接字串。
using System.Data.Common;
using Microsoft.Data.SqlClient;
using System.Threading.Tasks;
/// <summary>
/// Sample that reads and updates data asynchronously on a single MARS-enabled
/// connection: two readers and an update command share one transaction, which
/// is rolled back at the end.
/// </summary>
class Program
{
    /// <summary>Starts the sample and blocks until it completes.</summary>
    static void Main()
    {
        Task task = ReadingAndUpdatingData();
        task.Wait();
    }

    /// <summary>
    /// For every vendor with credit rating 5, walks the vendor's product rows,
    /// computes a new OnOrderQty, and updates the row. All work happens inside one
    /// transaction that is rolled back after the total is printed.
    /// </summary>
    static async Task ReadingAndUpdatingData()
    {
        // By default, MARS is disabled when connecting to a MARS-enabled host.
        // It must be enabled in the connection string.
        string connectionString = GetConnectionString();

        int totalRecordsUpdated = 0;

        string vendorSQL =
            "SELECT BusinessEntityID, Name FROM Purchasing.Vendor " +
            "WHERE CreditRating = 5";
        string prodVendSQL =
            "SELECT ProductID, MaxOrderQty, MinOrderQty, OnOrderQty " +
            "FROM Purchasing.ProductVendor " +
            "WHERE BusinessEntityID = @VendorID";
        string updateSQL =
            "UPDATE Purchasing.ProductVendor " +
            "SET OnOrderQty = @OrderQty " +
            "WHERE ProductID = @ProductID AND BusinessEntityID = @VendorID";

        using (SqlConnection awConnection =
            new SqlConnection(connectionString))
        {
            await awConnection.OpenAsync();
            SqlTransaction updateTx = await Task.Run(() => awConnection.BeginTransaction());

            // Every command on the connection must take part in the pending transaction.
            SqlCommand vendorCmd = new SqlCommand(vendorSQL, awConnection);
            vendorCmd.Transaction = updateTx;

            // SqlDbType lives in System.Data, which this sample does not import
            // (only System.Data.Common), so the name is fully qualified.
            // The parameter name uses the same casing as the SQL text and the later lookup.
            SqlCommand prodVendCmd = new SqlCommand(prodVendSQL, awConnection);
            prodVendCmd.Transaction = updateTx;
            prodVendCmd.Parameters.Add("@VendorID", System.Data.SqlDbType.Int);

            SqlCommand updateCmd = new SqlCommand(updateSQL, awConnection);
            updateCmd.Transaction = updateTx;
            updateCmd.Parameters.Add("@OrderQty", System.Data.SqlDbType.Int);
            updateCmd.Parameters.Add("@ProductID", System.Data.SqlDbType.Int);
            updateCmd.Parameters.Add("@VendorID", System.Data.SqlDbType.Int);

            using (SqlDataReader vendorReader = await vendorCmd.ExecuteReaderAsync())
            {
                while (await vendorReader.ReadAsync())
                {
                    Console.WriteLine(vendorReader["Name"]);

                    int vendorID = (int)vendorReader["BusinessEntityID"];
                    prodVendCmd.Parameters["@VendorID"].Value = vendorID;

                    // Opening this reader while vendorReader is active requires MARS.
                    using (SqlDataReader prodVendReader = await prodVendCmd.ExecuteReaderAsync())
                    {
                        while (await prodVendReader.ReadAsync())
                        {
                            int productID = (int)prodVendReader["ProductID"];
                            int onOrderQty;

                            if (prodVendReader["OnOrderQty"] == DBNull.Value)
                            {
                                // Nothing on order yet: start from the minimum order quantity.
                                onOrderQty = (int)prodVendReader["MinOrderQty"];
                            }
                            else
                            {
                                // Otherwise use half of the maximum order quantity.
                                int maxOrderQty = (int)prodVendReader["MaxOrderQty"];
                                onOrderQty = maxOrderQty / 2;
                            }

                            updateCmd.Parameters["@OrderQty"].Value = onOrderQty;
                            updateCmd.Parameters["@ProductID"].Value = productID;
                            updateCmd.Parameters["@VendorID"].Value = vendorID;

                            int recordsUpdated = await updateCmd.ExecuteNonQueryAsync();
                            totalRecordsUpdated += recordsUpdated;
                        }
                    }
                }
            }

            // The format string needs a {0} placeholder; without it the count is
            // silently dropped from the output.
            Console.WriteLine("Total Records Updated: {0}", totalRecordsUpdated);
            await Task.Run(() => updateTx.Rollback());
            Console.WriteLine("Transaction Rolled Back");
        }
    }

    /// <summary>Returns the connection string for the local AdventureWorks database with MARS enabled.</summary>
    private static string GetConnectionString()
    {
        // To avoid storing the connection string in your code, you can retrieve it from a configuration file.
        return "Data Source=(local);Integrated Security=SSPI;Initial Catalog=AdventureWorks;MultipleActiveResultSets=True";
    }
}