Транзакции и операции массового копирования
Операции массового копирования могут выполняться как изолированные операции или как часть многошаговой транзакции. В последнем случае вы можете выполнить более одной операции массового копирования в одной транзакции, а также выполнять другие операции с базами данных (такие как вставка, обновление и удаление) с возможностью фиксации или отката всей транзакции.
По умолчанию операция массового копирования выполняется как изолированная. Операция массового копирования выполняется не как транзакция и без возможности отката. Если необходимо выполнить откат всей или части операции массового копирования при возникновении ошибки, можно использовать управляемую SqlBulkCopy транзакцию, выполнить операцию массового копирования в существующей транзакции или прикрепиться к System.TransactionsTransaction.
Выполнение операции массового копирования без использования транзакции
Следующее консольное приложение показывает, что происходит, когда во время операции массового копирования без транзакции возникает ошибка.
В этом примере и таблица источника, и целевая таблица включают столбец Identity
, именуемый ProductID. Сначала код подготавливает целевую таблицу, удаляя все строки и вставляя одну строку, у которой ProductID точно существует в исходной таблице. По умолчанию в целевой таблице для каждой добавляемой строки создается новое значение столбца Identity
. В этом примере при открытии соединения задается параметр, заставляющий процесс массовой загрузки использовать значения Identity
из исходной таблицы.
Операция массового копирования выполняется со свойством BatchSize, равным 10. Когда операция встречает недопустимую строку, вызывается исключение. В первом примере операция массового копирования выполняется без транзакции. Фиксируются все пакеты, скопированные до появления ошибки. Пакет, содержащий повторяющийся ключ, откатывается, а операция массового копирования прерывается до обработки любых других пакетов.
Примечание.
Этот пример не будет выполняться, если вы не создали рабочие таблицы, как описано в разделе "Пример массового копирования". Этот код предназначен только для демонстрации синтаксиса использования SqlBulkCopy. Если исходная и целевая таблицы расположены в одном экземпляре SQL Server, будет проще и быстрее использовать инструкцию Transact-SQL INSERT … SELECT
для копирования данных.
var connectionString = GetConnectionString();
// Open a sourceConnection to the AdventureWorks database.
using (SqlConnection sourceConnection =
new(connectionString))
{
sourceConnection.Open();
// Delete all from the destination table.
SqlCommand commandDelete = new()
{
Connection = sourceConnection,
CommandText =
"DELETE FROM dbo.BulkCopyDemoMatchingColumns"
};
commandDelete.ExecuteNonQuery();
// Add a single row that will result in duplicate key
// when all rows from source are bulk copied.
// Note that this technique will only be successful in
// illustrating the point if a row with ProductID = 446
// exists in the AdventureWorks Production.Products table.
// If you have made changes to the data in this table, change
// the SQL statement in the code to add a ProductID that
// does exist in your version of the Production.Products
// table. Choose any ProductID in the middle of the table
// (not first or last row) to best illustrate the result.
SqlCommand commandInsert = new()
{
Connection = sourceConnection,
CommandText =
"SET IDENTITY_INSERT dbo.BulkCopyDemoMatchingColumns ON;" +
"INSERT INTO " + "dbo.BulkCopyDemoMatchingColumns " +
"([ProductID], [Name] ,[ProductNumber]) " +
"VALUES(446, 'Lock Nut 23','LN-3416');" +
"SET IDENTITY_INSERT dbo.BulkCopyDemoMatchingColumns OFF"
};
commandInsert.ExecuteNonQuery();
// Perform an initial count on the destination table.
SqlCommand commandRowCount = new(
"SELECT COUNT(*) FROM dbo.BulkCopyDemoMatchingColumns;",
sourceConnection);
long countStart = Convert.ToInt32(
commandRowCount.ExecuteScalar());
Console.WriteLine("Starting row count = {0}", countStart);
// Get data from the source table as a SqlDataReader.
SqlCommand commandSourceData = new(
"SELECT ProductID, Name, ProductNumber " +
"FROM Production.Product;", sourceConnection);
SqlDataReader reader = commandSourceData.ExecuteReader();
// Set up the bulk copy object using the KeepIdentity option.
using (SqlBulkCopy bulkCopy = new(
connectionString, SqlBulkCopyOptions.KeepIdentity))
{
bulkCopy.BatchSize = 10;
bulkCopy.DestinationTableName =
"dbo.BulkCopyDemoMatchingColumns";
// Write from the source to the destination.
// This should fail with a duplicate key error
// after some of the batches have been copied.
try
{
bulkCopy.WriteToServer(reader);
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
finally
{
reader.Close();
}
}
// Perform a final count on the destination
// table to see how many rows were added.
long countEnd = Convert.ToInt32(
commandRowCount.ExecuteScalar());
Console.WriteLine("Ending row count = {0}", countEnd);
Console.WriteLine("{0} rows were added.", countEnd - countStart);
Console.WriteLine("Press Enter to finish.");
Console.ReadLine();
}
Imports System.Data.SqlClient
Module Module1
Sub Main()
Dim connectionString As String = GetConnectionString()
' Open a sourceConnection to the AdventureWorks database.
Using sourceConnection As SqlConnection = _
New SqlConnection(connectionString)
sourceConnection.Open()
' Delete all from the destination table.
Dim commandDelete As New SqlCommand
commandDelete.Connection = sourceConnection
commandDelete.CommandText = _
"DELETE FROM dbo.BulkCopyDemoMatchingColumns"
commandDelete.ExecuteNonQuery()
' Add a single row that will result in duplicate key
' when all rows from source are bulk copied.
' Note that this technique will only be successful in
' illustrating the point if a row with ProductID = 446
' exists in the AdventureWorks Production.Products table.
' If you have made changes to the data in this table, change
' the SQL statement in the code to add a ProductID that
' does exist in your version of the Production.Products
' table. Choose any ProductID in the middle of the table
' (not first or last row) to best illustrate the result.
Dim commandInsert As New SqlCommand
commandInsert.Connection = sourceConnection
commandInsert.CommandText = _
"SET IDENTITY_INSERT dbo.BulkCopyDemoMatchingColumns ON;" & _
"INSERT INTO dbo.BulkCopyDemoMatchingColumns " & _
"([ProductID], [Name] ,[ProductNumber]) " & _
"VALUES(446, 'Lock Nut 23','LN-3416');" & _
"SET IDENTITY_INSERT dbo.BulkCopyDemoMatchingColumns OFF"
commandInsert.ExecuteNonQuery()
' Perform an initial count on the destination table.
Dim commandRowCount As New SqlCommand( _
"SELECT COUNT(*) FROM dbo.BulkCopyDemoMatchingColumns;", _
sourceConnection)
Dim countStart As Long = _
System.Convert.ToInt32(commandRowCount.ExecuteScalar())
Console.WriteLine("Starting row count = {0}", countStart)
' Get data from the source table as a SqlDataReader.
Dim commandSourceData As SqlCommand = New SqlCommand( _
"SELECT ProductID, Name, ProductNumber " & _
"FROM Production.Product;", sourceConnection)
Dim reader As SqlDataReader = _
commandSourceData.ExecuteReader()
' Set up the bulk copy object using the KeepIdentity option.
Using bulkCopy As SqlBulkCopy = New SqlBulkCopy(connectionString, _
SqlBulkCopyOptions.KeepIdentity)
bulkCopy.BatchSize = 10
bulkCopy.DestinationTableName = "dbo.BulkCopyDemoMatchingColumns"
' Write from the source to the destination.
' This should fail with a duplicate key error
' after some of the batches have already been copied.
Try
bulkCopy.WriteToServer(reader)
Catch ex As Exception
Console.WriteLine(ex.Message)
Finally
reader.Close()
End Try
End Using
' Perform a final count on the destination table
' to see how many rows were added.
Dim countEnd As Long = _
System.Convert.ToInt32(commandRowCount.ExecuteScalar())
Console.WriteLine("Ending row count = {0}", countEnd)
Console.WriteLine("{0} rows were added.", countEnd - countStart)
Console.WriteLine("Press Enter to finish.")
Console.ReadLine()
End Using
End Sub
Private Function GetConnectionString() As String
Throw New NotImplementedException()
End Function
End Module
Выполнение специальной операции массового копирования в транзакции
По умолчанию операция массового копирования является своей собственной транзакцией. Если вы хотите выполнить выделенную операцию массового копирования, создайте новый экземпляр SqlBulkCopy с помощью строки подключения или примените существующий объект SqlConnection, в котором нет активных транзакций. В каждом случае операция массового копирования создает и затем фиксирует или откатывает транзакцию.
Вы можете явно указать параметр UseInternalTransaction в SqlBulkCopy, чтобы в явном виде заставить операцию массового копирования выполняться в своей собственной транзакции, вследствие чего каждый пакет операции будет выполняться в отдельной транзакции.
Примечание.
Поскольку разные пакеты выполняются в разных транзакциях, если во время операции массового копирования возникает ошибка, для всех строк в текущем пакете будет выполнен откат, но строки из предыдущих пакетов останутся в базе данных.
Следующее консольное приложение похоже на предыдущий пример с одним исключением: в этом примере операция массового копирования обрабатывает собственные транзакции. Фиксируются все пакеты, скопированные до появления ошибки. Пакет, содержащий повторяющийся ключ, откатывается, а операция массового копирования прерывается до обработки любых других пакетов.
Внимание
Этот пример не будет выполняться, если вы не создали рабочие таблицы, как описано в разделе "Пример массового копирования". Этот код предназначен только для демонстрации синтаксиса использования SqlBulkCopy. Если исходная и целевая таблицы расположены в одном экземпляре SQL Server, будет проще и быстрее использовать инструкцию Transact-SQL INSERT … SELECT
для копирования данных.
var connectionString = GetConnectionString();
// Open a sourceConnection to the AdventureWorks database.
using (SqlConnection sourceConnection =
new(connectionString))
{
sourceConnection.Open();
// Delete all from the destination table.
SqlCommand commandDelete = new()
{
Connection = sourceConnection,
CommandText =
"DELETE FROM dbo.BulkCopyDemoMatchingColumns"
};
commandDelete.ExecuteNonQuery();
// Add a single row that will result in duplicate key
// when all rows from source are bulk copied.
// Note that this technique will only be successful in
// illustrating the point if a row with ProductID = 446
// exists in the AdventureWorks Production.Products table.
// If you have made changes to the data in this table, change
// the SQL statement in the code to add a ProductID that
// does exist in your version of the Production.Products
// table. Choose any ProductID in the middle of the table
// (not first or last row) to best illustrate the result.
SqlCommand commandInsert = new()
{
Connection = sourceConnection,
CommandText =
"SET IDENTITY_INSERT dbo.BulkCopyDemoMatchingColumns ON;" +
"INSERT INTO " + "dbo.BulkCopyDemoMatchingColumns " +
"([ProductID], [Name] ,[ProductNumber]) " +
"VALUES(446, 'Lock Nut 23','LN-3416');" +
"SET IDENTITY_INSERT dbo.BulkCopyDemoMatchingColumns OFF"
};
commandInsert.ExecuteNonQuery();
// Perform an initial count on the destination table.
SqlCommand commandRowCount = new(
"SELECT COUNT(*) FROM dbo.BulkCopyDemoMatchingColumns;",
sourceConnection);
long countStart = Convert.ToInt32(
commandRowCount.ExecuteScalar());
Console.WriteLine("Starting row count = {0}", countStart);
// Get data from the source table as a SqlDataReader.
SqlCommand commandSourceData = new(
"SELECT ProductID, Name, ProductNumber " +
"FROM Production.Product;", sourceConnection);
SqlDataReader reader = commandSourceData.ExecuteReader();
// Set up the bulk copy object.
// Note that when specifying the UseInternalTransaction
// option, you cannot also specify an external transaction.
// Therefore, you must use the SqlBulkCopy construct that
// requires a string for the connection, rather than an
// existing SqlConnection object.
using (SqlBulkCopy bulkCopy = new(
connectionString, SqlBulkCopyOptions.KeepIdentity |
SqlBulkCopyOptions.UseInternalTransaction))
{
bulkCopy.BatchSize = 10;
bulkCopy.DestinationTableName =
"dbo.BulkCopyDemoMatchingColumns";
// Write from the source to the destination.
// This should fail with a duplicate key error
// after some of the batches have been copied.
try
{
bulkCopy.WriteToServer(reader);
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
finally
{
reader.Close();
}
}
// Perform a final count on the destination
// table to see how many rows were added.
long countEnd = Convert.ToInt32(
commandRowCount.ExecuteScalar());
Console.WriteLine("Ending row count = {0}", countEnd);
Console.WriteLine("{0} rows were added.", countEnd - countStart);
Console.WriteLine("Press Enter to finish.");
Console.ReadLine();
}
Imports System.Data.SqlClient
Module Module1
Sub Main()
Dim connectionString As String = GetConnectionString()
' Open a sourceConnection to the AdventureWorks database.
Using sourceConnection As SqlConnection = _
New SqlConnection(connectionString)
sourceConnection.Open()
' Delete all from the destination table.
Dim commandDelete As New SqlCommand
commandDelete.Connection = sourceConnection
commandDelete.CommandText = _
"DELETE FROM dbo.BulkCopyDemoMatchingColumns"
commandDelete.ExecuteNonQuery()
' Add a single row that will result in duplicate key
' when all rows from source are bulk copied.
' Note that this technique will only be successful in
' illustrating the point if a row with ProductID = 446
' exists in the AdventureWorks Production.Products table.
' If you have made changes to the data in this table, change
' the SQL statement in the code to add a ProductID that
' does exist in your version of the Production.Products
' table. Choose any ProductID in the middle of the table
' (not first or last row) to best illustrate the result.
Dim commandInsert As New SqlCommand
commandInsert.Connection = sourceConnection
commandInsert.CommandText = _
"SET IDENTITY_INSERT dbo.BulkCopyDemoMatchingColumns ON;" & _
"INSERT INTO dbo.BulkCopyDemoMatchingColumns " & _
"([ProductID], [Name] ,[ProductNumber]) " & _
"VALUES(446, 'Lock Nut 23','LN-3416');" & _
"SET IDENTITY_INSERT dbo.BulkCopyDemoMatchingColumns OFF"
commandInsert.ExecuteNonQuery()
' Perform an initial count on the destination table.
Dim commandRowCount As New SqlCommand( _
"SELECT COUNT(*) FROM dbo.BulkCopyDemoMatchingColumns;", _
sourceConnection)
Dim countStart As Long = _
System.Convert.ToInt32(commandRowCount.ExecuteScalar())
Console.WriteLine("Starting row count = {0}", countStart)
' Get data from the source table as a SqlDataReader.
Dim commandSourceData As SqlCommand = New SqlCommand( _
"SELECT ProductID, Name, ProductNumber " & _
"FROM Production.Product;", sourceConnection)
Dim reader As SqlDataReader = _
commandSourceData.ExecuteReader()
' Set up the bulk copy object.
' Note that when specifying the UseInternalTransaction option,
' you cannot also specify an external transaction. Therefore,
' you must use the SqlBulkCopy construct that requires a string
' for the connection, rather than an existing SqlConnection object.
Using bulkCopy As SqlBulkCopy = New SqlBulkCopy(connectionString, _
SqlBulkCopyOptions.UseInternalTransaction Or _
SqlBulkCopyOptions.KeepIdentity)
bulkCopy.BatchSize = 10
bulkCopy.DestinationTableName = "dbo.BulkCopyDemoMatchingColumns"
' Write from the source to the destination.
' This should fail with a duplicate key error
' after some of the batches have already been copied.
Try
bulkCopy.WriteToServer(reader)
Catch ex As Exception
Console.WriteLine(ex.Message)
Finally
reader.Close()
End Try
End Using
' Perform a final count on the destination table
' to see how many rows were added.
Dim countEnd As Long = _
System.Convert.ToInt32(commandRowCount.ExecuteScalar())
Console.WriteLine("Ending row count = {0}", countEnd)
Console.WriteLine("{0} rows were added.", countEnd - countStart)
Console.WriteLine("Press Enter to finish.")
Console.ReadLine()
End Using
End Sub
Private Function GetConnectionString() As String
Throw New NotImplementedException()
End Function
End Module
Использование существующих транзакций
Вы можете указать существующий объект SqlTransaction в качестве параметра в конструкторе SqlBulkCopy. В этом случае операция массового копирования выполняется в существующей транзакции, а состояние транзакции не изменяется (то есть она не фиксируется и не прерывается). Это позволяет приложению включить операцию массового копирования в транзакцию с другими операциями базы данных. Но если вы не укажете объект SqlTransaction и передадите пустую ссылку, а соединение содержит активную транзакцию, создается исключение.
Если вам нужно откатить всю операцию массового копирования из-за возникшей ошибки, или это массовое копирование является частью более масштабного отменяемого процесса, вы можете передать объект SqlTransaction в конструктор SqlBulkCopy.
Следующее консольное приложение похоже на первый пример (без транзакции) с одним исключением: в этом примере операция массового копирования включена в большую, внешнюю транзакцию. Если возникает ошибка нарушения первичного ключа, производится откат всей транзакции и никакие строки не добавляются в целевую таблицу.
Внимание
Этот пример не будет выполняться, если вы не создали рабочие таблицы, как описано в разделе "Пример массового копирования". Этот код предназначен только для демонстрации синтаксиса использования SqlBulkCopy. Если исходная и целевая таблицы расположены в одном экземпляре SQL Server, будет проще и быстрее использовать инструкцию Transact-SQL INSERT … SELECT
для копирования данных.
var connectionString = GetConnectionString();
// Open a sourceConnection to the AdventureWorks database.
using (SqlConnection sourceConnection =
new(connectionString))
{
sourceConnection.Open();
// Delete all from the destination table.
SqlCommand commandDelete = new()
{
Connection = sourceConnection,
CommandText =
"DELETE FROM dbo.BulkCopyDemoMatchingColumns"
};
commandDelete.ExecuteNonQuery();
// Add a single row that will result in duplicate key
// when all rows from source are bulk copied.
// Note that this technique will only be successful in
// illustrating the point if a row with ProductID = 446
// exists in the AdventureWorks Production.Products table.
// If you have made changes to the data in this table, change
// the SQL statement in the code to add a ProductID that
// does exist in your version of the Production.Products
// table. Choose any ProductID in the middle of the table
// (not first or last row) to best illustrate the result.
SqlCommand commandInsert = new()
{
Connection = sourceConnection,
CommandText =
"SET IDENTITY_INSERT dbo.BulkCopyDemoMatchingColumns ON;" +
"INSERT INTO " + "dbo.BulkCopyDemoMatchingColumns " +
"([ProductID], [Name] ,[ProductNumber]) " +
"VALUES(446, 'Lock Nut 23','LN-3416');" +
"SET IDENTITY_INSERT dbo.BulkCopyDemoMatchingColumns OFF"
};
commandInsert.ExecuteNonQuery();
// Perform an initial count on the destination table.
SqlCommand commandRowCount = new(
"SELECT COUNT(*) FROM dbo.BulkCopyDemoMatchingColumns;",
sourceConnection);
long countStart = Convert.ToInt32(
commandRowCount.ExecuteScalar());
Console.WriteLine("Starting row count = {0}", countStart);
// Get data from the source table as a SqlDataReader.
SqlCommand commandSourceData = new(
"SELECT ProductID, Name, ProductNumber " +
"FROM Production.Product;", sourceConnection);
SqlDataReader reader = commandSourceData.ExecuteReader();
//Set up the bulk copy object inside the transaction.
using (SqlConnection destinationConnection =
new(connectionString))
{
destinationConnection.Open();
using (SqlTransaction transaction =
destinationConnection.BeginTransaction())
{
using (SqlBulkCopy bulkCopy = new(
destinationConnection, SqlBulkCopyOptions.KeepIdentity,
transaction))
{
bulkCopy.BatchSize = 10;
bulkCopy.DestinationTableName =
"dbo.BulkCopyDemoMatchingColumns";
// Write from the source to the destination.
// This should fail with a duplicate key error.
try
{
bulkCopy.WriteToServer(reader);
transaction.Commit();
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
transaction.Rollback();
}
finally
{
reader.Close();
}
}
}
}
// Perform a final count on the destination
// table to see how many rows were added.
long countEnd = Convert.ToInt32(
commandRowCount.ExecuteScalar());
Console.WriteLine("Ending row count = {0}", countEnd);
Console.WriteLine("{0} rows were added.", countEnd - countStart);
Console.WriteLine("Press Enter to finish.");
Console.ReadLine();
}
Imports System.Data.SqlClient
Module Module1
Sub Main()
Dim connectionString As String = GetConnectionString()
' Open a sourceConnection to the AdventureWorks database.
Using sourceConnection As SqlConnection = _
New SqlConnection(connectionString)
sourceConnection.Open()
' Delete all from the destination table.
Dim commandDelete As New SqlCommand
commandDelete.Connection = sourceConnection
commandDelete.CommandText = _
"DELETE FROM dbo.BulkCopyDemoMatchingColumns"
commandDelete.ExecuteNonQuery()
' Add a single row that will result in duplicate key
' when all rows from source are bulk copied.
' Note that this technique will only be successful in
' illustrating the point if a row with ProductID = 446
' exists in the AdventureWorks Production.Products table.
' If you have made changes to the data in this table, change
' the SQL statement in the code to add a ProductID that
' does exist in your version of the Production.Products
' table. Choose any ProductID in the middle of the table
' (not first or last row) to best illustrate the result.
Dim commandInsert As New SqlCommand
commandInsert.Connection = sourceConnection
commandInsert.CommandText = _
"SET IDENTITY_INSERT dbo.BulkCopyDemoMatchingColumns ON;" & _
"INSERT INTO dbo.BulkCopyDemoMatchingColumns " & _
"([ProductID], [Name] ,[ProductNumber]) " & _
"VALUES(446, 'Lock Nut 23','LN-3416');" & _
"SET IDENTITY_INSERT dbo.BulkCopyDemoMatchingColumns OFF"
commandInsert.ExecuteNonQuery()
' Perform an initial count on the destination table.
Dim commandRowCount As New SqlCommand( _
"SELECT COUNT(*) FROM dbo.BulkCopyDemoMatchingColumns;", _
sourceConnection)
Dim countStart As Long = _
System.Convert.ToInt32(commandRowCount.ExecuteScalar())
Console.WriteLine("Starting row count = {0}", countStart)
' Get data from the source table as a SqlDataReader.
Dim commandSourceData As SqlCommand = New SqlCommand( _
"SELECT ProductID, Name, ProductNumber " & _
"FROM Production.Product;", sourceConnection)
Dim reader As SqlDataReader = _
commandSourceData.ExecuteReader()
' Set up the bulk copy object inside the transaction.
Using destinationConnection As SqlConnection = _
New SqlConnection(connectionString)
destinationConnection.Open()
Using transaction As SqlTransaction = _
destinationConnection.BeginTransaction()
Using bulkCopy As SqlBulkCopy = New _
SqlBulkCopy(destinationConnection, _
SqlBulkCopyOptions.KeepIdentity, transaction)
bulkCopy.BatchSize = 10
bulkCopy.DestinationTableName = _
"dbo.BulkCopyDemoMatchingColumns"
' Write from the source to the destination.
' This should fail with a duplicate key error.
Try
bulkCopy.WriteToServer(reader)
transaction.Commit()
Catch ex As Exception
Console.WriteLine(ex.Message)
transaction.Rollback()
Finally
reader.Close()
End Try
End Using
End Using
End Using
' Perform a final count on the destination table
' to see how many rows were added.
Dim countEnd As Long = _
System.Convert.ToInt32(commandRowCount.ExecuteScalar())
Console.WriteLine("Ending row count = {0}", countEnd)
Console.WriteLine("{0} rows were added.", countEnd - countStart)
Console.WriteLine("Press Enter to finish.")
Console.ReadLine()
End Using
End Sub
Private Function GetConnectionString() As String
Throw New NotImplementedException()
End Function
End Module