大容量复制操作可以作为独立操作或作为多步事务的一部分执行。 后面这个选项使你能够在同一事务中执行多个大容量复制操作,以及执行其他数据库操作(例如插入、更新和删除),同时仍能够提交或回滚整个事务。
默认情况下,大容量复制操作作为独立的操作完成。 大容量复制操作以非事务方式执行,不可以对其进行回滚。 如果需要在出错时回滚全部或部分大容量复制,可以执行以下操作:
使用 SqlBulkCopy 托管的事务
在现有事务中执行大容量复制操作
登记到 System.TransactionsTransaction。
执行非事务大容量复制操作
下面的控制台应用程序演示了非事务大容量复制操作在操作中途遇到错误时将发生什么情况。
在该示例中,源表和目标表分别包括一个名为 ProductID 的 Identity 列。 该代码首先通过删除所有行,然后插入知道其 ProductID 存在于源表中的一行,准备目标表 。 默认情况下,系统会在目标表中为添加的每个行生成一个 Identity 列的新值。 在此示例中,在打开连接时设置某个选项,该选项用于强制执行大容量加载过程以改为使用源表中的 Identity 值。
执行大容量复制操作,并将 BatchSize 属性设置为 10。 当操作遇到无效行时,将引发异常。 在此第一个示例中,大容量复制操作是非事务的。 提交发生错误之前复制的所有批。 回滚包含重复键的批,并暂停大容量复制操作,然后它才处理任何剩余的批。
注意
除非已按大容量复制示例设置中所述创建了工作表,否则此示例将不会运行。 提供此代码是为了演示仅使用 SqlBulkCopy 时的语法。 如果源表和目标表位于同一 SQL Server 实例中,可以更便捷地使用 Transact-SQL INSERT ... SELECT 语句复制数据。
using Microsoft.Data.SqlClient;
class Program
{
static void Main()
{
string connectionString = GetConnectionString();
// Open a sourceConnection to the AdventureWorks database.
using (SqlConnection sourceConnection =
new SqlConnection(connectionString))
{
sourceConnection.Open();
// Delete all from the destination table.
SqlCommand commandDelete = new SqlCommand();
commandDelete.Connection = sourceConnection;
commandDelete.CommandText =
"DELETE FROM dbo.BulkCopyDemoMatchingColumns";
commandDelete.ExecuteNonQuery();
// Add a single row that will result in duplicate key
// when all rows from source are bulk copied.
// Note that this technique will only be successful in
// illustrating the point if a row with ProductID = 446
// exists in the AdventureWorks Production.Products table.
// If you have made changes to the data in this table, change
// the SQL statement in the code to add a ProductID that
// does exist in your version of the Production.Products
// table. Choose any ProductID in the middle of the table
// (not first or last row) to best illustrate the result.
SqlCommand commandInsert = new SqlCommand();
commandInsert.Connection = sourceConnection;
commandInsert.CommandText =
"SET IDENTITY_INSERT dbo.BulkCopyDemoMatchingColumns ON;" +
"INSERT INTO " + "dbo.BulkCopyDemoMatchingColumns " +
"([ProductID], [Name] ,[ProductNumber]) " +
"VALUES(446, 'Lock Nut 23','LN-3416');" +
"SET IDENTITY_INSERT dbo.BulkCopyDemoMatchingColumns OFF";
commandInsert.ExecuteNonQuery();
// Perform an initial count on the destination table.
SqlCommand commandRowCount = new SqlCommand(
"SELECT COUNT(*) FROM dbo.BulkCopyDemoMatchingColumns;",
sourceConnection);
long countStart = System.Convert.ToInt32(
commandRowCount.ExecuteScalar());
Console.WriteLine("Starting row count = {0}", countStart);
// Get data from the source table as a SqlDataReader.
SqlCommand commandSourceData = new SqlCommand(
"SELECT ProductID, Name, ProductNumber " +
"FROM Production.Product;", sourceConnection);
SqlDataReader reader = commandSourceData.ExecuteReader();
// Set up the bulk copy object using the KeepIdentity option.
using (SqlBulkCopy bulkCopy = new SqlBulkCopy(
connectionString, SqlBulkCopyOptions.KeepIdentity))
{
bulkCopy.BatchSize = 10;
bulkCopy.DestinationTableName =
"dbo.BulkCopyDemoMatchingColumns";
// Write from the source to the destination.
// This should fail with a duplicate key error
// after some of the batches have been copied.
try
{
bulkCopy.WriteToServer(reader);
}
catch (Exception ex)
{
// Print the number of rows processed using the
// RowsCopied property.
Console.WriteLine("{0} rows were processed.",
bulkCopy.RowsCopied);
Console.WriteLine(ex.Message);
}
finally
{
reader.Close();
}
}
// Perform a final count on the destination
// table to see how many rows were added.
// Note that for this scenario, the value will
// not be equal to the RowsCopied property.
long countEnd = System.Convert.ToInt32(
commandRowCount.ExecuteScalar());
Console.WriteLine("Ending row count = {0}", countEnd);
Console.WriteLine("{0} rows were added.", countEnd - countStart);
Console.WriteLine("Press Enter to finish.");
Console.ReadLine();
}
}
private static string GetConnectionString()
// To avoid storing the sourceConnection string in your code,
// you can retrieve it from a configuration file.
{
return "Data Source=(local); " +
" Integrated Security=true;" +
"Initial Catalog=AdventureWorks;";
}
}
在事务中执行专用的大容量复制操作
默认情况下,大容量复制操作是其自己的事务。 要执行专用的大容量复制操作时,使用连接字符串创建 SqlBulkCopy 的实例,或者在没有活动事务的情况下使用现有 SqlConnection 对象。 在每个方案中,大容量复制操作创建一个事务,然后提交或回滚该事务。
可在 UseInternalTransaction 类构造函数中显式指定 SqlBulkCopy 选项以在其自己的事务中执行大容量复制操作。 每一批的操作将在单独的事务中执行。
注意
因为不同批在不同的事务中执行,因此如果在大容量复制操作期间出现错误,将回滚当前批中的所有行,但之前批中的行将保留在数据库中。
下面的控制台应用程序与之前的示例类似,但有一个例外:在此示例中,大容量复制操作管理其自己的事务。 提交发生错误之前复制的所有批。 回滚包含重复键的批,并暂停大容量复制操作,然后它才处理任何剩余的批。
重要
除非已按大容量复制示例设置中所述创建了工作表,否则此示例将不会运行。 提供此代码是为了演示仅使用 SqlBulkCopy 时的语法。 如果源表和目标表位于同一 SQL Server 实例中,可以更便捷地使用 Transact-SQL INSERT ... SELECT 语句复制数据。
using Microsoft.Data.SqlClient;
class Program
{
static void Main()
{
string connectionString = GetConnectionString();
// Open a sourceConnection to the AdventureWorks database.
using (SqlConnection sourceConnection =
new SqlConnection(connectionString))
{
sourceConnection.Open();
// Delete all from the destination table.
SqlCommand commandDelete = new SqlCommand();
commandDelete.Connection = sourceConnection;
commandDelete.CommandText =
"DELETE FROM dbo.BulkCopyDemoMatchingColumns";
commandDelete.ExecuteNonQuery();
// Add a single row that will result in duplicate key
// when all rows from source are bulk copied.
// Note that this technique will only be successful in
// illustrating the point if a row with ProductID = 446
// exists in the AdventureWorks Production.Products table.
// If you have made changes to the data in this table, change
// the SQL statement in the code to add a ProductID that
// does exist in your version of the Production.Products
// table. Choose any ProductID in the middle of the table
// (not first or last row) to best illustrate the result.
SqlCommand commandInsert = new SqlCommand();
commandInsert.Connection = sourceConnection;
commandInsert.CommandText =
"SET IDENTITY_INSERT dbo.BulkCopyDemoMatchingColumns ON;" +
"INSERT INTO " + "dbo.BulkCopyDemoMatchingColumns " +
"([ProductID], [Name] ,[ProductNumber]) " +
"VALUES(446, 'Lock Nut 23','LN-3416');" +
"SET IDENTITY_INSERT dbo.BulkCopyDemoMatchingColumns OFF";
commandInsert.ExecuteNonQuery();
// Perform an initial count on the destination table.
SqlCommand commandRowCount = new SqlCommand(
"SELECT COUNT(*) FROM dbo.BulkCopyDemoMatchingColumns;",
sourceConnection);
long countStart = System.Convert.ToInt32(
commandRowCount.ExecuteScalar());
Console.WriteLine("Starting row count = {0}", countStart);
// Get data from the source table as a SqlDataReader.
SqlCommand commandSourceData = new SqlCommand(
"SELECT ProductID, Name, ProductNumber " +
"FROM Production.Product;", sourceConnection);
SqlDataReader reader = commandSourceData.ExecuteReader();
// Set up the bulk copy object.
// Note that when specifying the UseInternalTransaction
// option, you cannot also specify an external transaction.
// Therefore, you must use the SqlBulkCopy construct that
// requires a string for the connection, rather than an
// existing SqlConnection object.
using (SqlBulkCopy bulkCopy = new SqlBulkCopy(
connectionString, SqlBulkCopyOptions.KeepIdentity |
SqlBulkCopyOptions.UseInternalTransaction))
{
bulkCopy.BatchSize = 10;
bulkCopy.DestinationTableName =
"dbo.BulkCopyDemoMatchingColumns";
// Write from the source to the destination.
// This should fail with a duplicate key error
// after some of the batches have been copied.
try
{
bulkCopy.WriteToServer(reader);
}
catch (Exception ex)
{
// Print the number of rows processed using the
// RowsCopied property.
Console.WriteLine("{0} rows were processed.",
bulkCopy.RowsCopied);
Console.WriteLine(ex.Message);
}
finally
{
reader.Close();
}
}
// Perform a final count on the destination
// table to see how many rows were added.
// Note that for this scenario, the value will
// not be equal to the RowsCopied property.
long countEnd = System.Convert.ToInt32(
commandRowCount.ExecuteScalar());
Console.WriteLine("Ending row count = {0}", countEnd);
Console.WriteLine("{0} rows were added.", countEnd - countStart);
Console.WriteLine("Press Enter to finish.");
Console.ReadLine();
}
}
private static string GetConnectionString()
// To avoid storing the sourceConnection string in your code,
// you can retrieve it from a configuration file.
{
return "Data Source=(local); " +
" Integrated Security=true;" +
"Initial Catalog=AdventureWorks;";
}
}
使用现有事务
可以指定现有 SqlTransaction 对象作为 SqlBulkCopy 构造函数中的参数。 在这种情况下,大容量复制操作在现有事务中进行,并且不对事务状态进行任何更改,即既不提交也不中止该事务。 此方法允许应用程序将大容量复制操作包含在带有其他数据库操作的事务中。 不过,如果未指定 SqlTransaction 对象并传递空引用,且连接有活动事务,异常就会抛出。
如果由于出现错误而需要回滚整个大容量复制操作,或如果大容量复制应在可以回滚的较大进程中执行,你可以向 SqlTransaction 构造函数提供 SqlBulkCopy 对象。
下面的控制台应用程序与第一个(非事务)示例类似,但有一个例外:在此示例中,大容量复制操作包含在较大的外部事务中。 发生主键冲突错误时,将回滚整个事务,并且不会向目标表添加任何行。
重要
除非已按大容量复制示例设置中所述创建了工作表,否则此示例将不会运行。 提供此代码是为了演示仅使用 SqlBulkCopy 时的语法。 如果源表和目标表位于同一 SQL Server 实例中,可以更便捷地使用 Transact-SQL INSERT ... SELECT 语句复制数据。
using Microsoft.Data.SqlClient;
class Program
{
static void Main()
{
string connectionString = GetConnectionString();
// Open a sourceConnection to the AdventureWorks database.
using (SqlConnection sourceConnection =
new SqlConnection(connectionString))
{
sourceConnection.Open();
// Delete all from the destination table.
SqlCommand commandDelete = new SqlCommand();
commandDelete.Connection = sourceConnection;
commandDelete.CommandText =
"DELETE FROM dbo.BulkCopyDemoMatchingColumns";
commandDelete.ExecuteNonQuery();
// Add a single row that will result in duplicate key
// when all rows from source are bulk copied.
// Note that this technique will only be successful in
// illustrating the point if a row with ProductID = 446
// exists in the AdventureWorks Production.Products table.
// If you have made changes to the data in this table, change
// the SQL statement in the code to add a ProductID that
// does exist in your version of the Production.Products
// table. Choose any ProductID in the middle of the table
// (not first or last row) to best illustrate the result.
SqlCommand commandInsert = new SqlCommand();
commandInsert.Connection = sourceConnection;
commandInsert.CommandText =
"SET IDENTITY_INSERT dbo.BulkCopyDemoMatchingColumns ON;" +
"INSERT INTO " + "dbo.BulkCopyDemoMatchingColumns " +
"([ProductID], [Name] ,[ProductNumber]) " +
"VALUES(446, 'Lock Nut 23','LN-3416');" +
"SET IDENTITY_INSERT dbo.BulkCopyDemoMatchingColumns OFF";
commandInsert.ExecuteNonQuery();
// Perform an initial count on the destination table.
SqlCommand commandRowCount = new SqlCommand(
"SELECT COUNT(*) FROM dbo.BulkCopyDemoMatchingColumns;",
sourceConnection);
long countStart = System.Convert.ToInt32(
commandRowCount.ExecuteScalar());
Console.WriteLine("Starting row count = {0}", countStart);
// Get data from the source table as a SqlDataReader.
SqlCommand commandSourceData = new SqlCommand(
"SELECT ProductID, Name, ProductNumber " +
"FROM Production.Product;", sourceConnection);
SqlDataReader reader = commandSourceData.ExecuteReader();
//Set up the bulk copy object inside the transaction.
using (SqlConnection destinationConnection =
new SqlConnection(connectionString))
{
destinationConnection.Open();
using (SqlTransaction transaction =
destinationConnection.BeginTransaction())
{
using (SqlBulkCopy bulkCopy = new SqlBulkCopy(
destinationConnection, SqlBulkCopyOptions.KeepIdentity,
transaction))
{
bulkCopy.BatchSize = 10;
bulkCopy.DestinationTableName =
"dbo.BulkCopyDemoMatchingColumns";
// Write from the source to the destination.
// This should fail with a duplicate key error.
try
{
bulkCopy.WriteToServer(reader);
transaction.Commit();
}
catch (Exception ex)
{
// Print the number of rows processed using the
// RowsCopied property.
Console.WriteLine("{0} rows were processed.",
bulkCopy.RowsCopied);
Console.WriteLine(ex.Message);
transaction.Rollback();
}
finally
{
reader.Close();
}
}
}
}
// Perform a final count on the destination
// table to see how many rows were added.
long countEnd = System.Convert.ToInt32(
commandRowCount.ExecuteScalar());
Console.WriteLine("Ending row count = {0}", countEnd);
Console.WriteLine("{0} rows were added.", countEnd - countStart);
Console.WriteLine("Press Enter to finish.");
Console.ReadLine();
}
}
private static string GetConnectionString()
// To avoid storing the sourceConnection string in your code,
// you can retrieve it from a configuration file.
{
return "Data Source=(local); " +
" Integrated Security=true;" +
"Initial Catalog=AdventureWorks;";
}
}