SqlClient 流式处理支持
适用于: .NET Framework .NET .NET Standard
SQL Server 和应用程序之间的流式处理支持支持服务器上的非结构化数据(文档、图像和媒体文件)。 SQL Server 数据库可以存储二进制大型对象 (BLOB),但检索 BLOB 会使用大量内存。
针对 SQL Server 的流式处理支持简化了对数据进行流式处理的应用程序的编写,无需完全将数据加载到内存中,从而减少了内存溢出异常。
流式处理支持还会使中间层应用程序能够更好地进行缩放,尤其是在业务对象连接到 Azure SQL 以发送、检索和操作大型 BLOB 的情况下。
警告
支持流式处理的成员用于从查询中检索数据,并将参数传递给查询和存储过程。 流式处理功能可解决基本的 OLTP 和数据迁移方案,适用于本地和非本地数据迁移环境。
SQL Server 中的流式处理支持
SQL Server 的流式处理支持在 DbDataReader 和 SqlDataReader 类中引入新功能,以便获取 Stream、XmlReader 和 TextReader对象并对其做出反应。 这些类用于检索查询中的数据。 因此,SQL Server 中的流式处理支持可解决 OLTP 方案,且适用于本地和非本地环境。
为了启用 SQL Server 中的流式处理支持,将以下成员添加到了 SqlDataReader:
为了启用 SQL Server 中的流式处理支持,将以下成员添加到了 DbDataReader:
SQL Server 的流式处理支持
对 SQL Server 的流式处理支持位于 SqlParameter 类,因此它可以接受 XmlReader、Stream 和 TextReader 对象并做出反应。 SqlParameter 用于将参数传递给查询和存储过程。
注意
释放 SqlCommand 对象或调用 Cancel 必须取消任何流操作。 如果应用程序发送 CancellationToken,则不保证取消。
以下 SqlDbType 类型将接受 Value 的 Stream:
二进制
VarBinary
以下 SqlDbType 类型将接受 Value 的 TextReader:
Char
NChar
NVarChar
Xml
XmlSqlDbType 类型将接受 XmlReader 的 Value。
SqlValue 可接受类型 XmlReader、TextReader 和 Stream 的值。
XmlReader、TextReader 和 Stream 对象将转移到由 Size 定义的值。
示例 -- 来自 SQL Server 的流式处理
使用以下 Transact-SQL 创建示例数据库:
CREATE DATABASE [Demo]
GO
USE [Demo]
GO
CREATE TABLE [Streams] (
[id] INT PRIMARY KEY IDENTITY(1, 1),
[textdata] NVARCHAR(MAX),
[bindata] VARBINARY(MAX),
[xmldata] XML)
GO
INSERT INTO [Streams] (textdata, bindata, xmldata) VALUES (N'This is a test', 0x48656C6C6F, N'<test>value</test>')
INSERT INTO [Streams] (textdata, bindata, xmldata) VALUES (N'Hello, World!', 0x54657374696E67, N'<test>value2</test>')
INSERT INTO [Streams] (textdata, bindata, xmldata) VALUES (N'Another row', 0x666F6F626172, N'<fff>bbb</fff><fff>bbc</fff>')
GO
该示例说明如何执行以下功能:
通过提供用于检索大型文件的异步方法来避免阻止用户接口线程。
在 .NET 中传输 SQL Server 的大型文本文件。
在 .NET 中传输 SQL Server 的大型 XML 文件。
检索 SQL Server 中的数据。
将一个 SQL Server 数据库中的大型文件 (BLOB) 传输到另一个数据库而不会用尽内存。
using System;
using System.Data;
using Microsoft.Data.SqlClient;
using System.IO;
using System.Threading.Tasks;
using System.Xml;
namespace StreamingFromServer
{
class Program
{
private const string connectionString = @"Server=localhost;Database=Demo;Integrated Security=true";
static void Main(string[] args)
{
CopyBinaryValueToFile().Wait();
PrintTextValues().Wait();
PrintXmlValues().Wait();
PrintXmlValuesViaNVarChar().Wait();
Console.WriteLine("Done");
}
// Application retrieving a large BLOB from SQL Server in .NET 4.5 using the new asynchronous capability
private static async Task CopyBinaryValueToFile()
{
using (SqlConnection connection = new SqlConnection(connectionString))
{
await connection.OpenAsync();
using (SqlCommand command = new SqlCommand("SELECT [bindata] FROM [Streams] WHERE [id]=@id", connection))
{
command.Parameters.AddWithValue("id", 1);
// The reader needs to be executed with the SequentialAccess behavior to enable network streaming
// Otherwise ReadAsync will buffer the entire BLOB into memory which can cause scalability issues or even OutOfMemoryExceptions
using (SqlDataReader reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess))
{
if (await reader.ReadAsync())
{
if (!(await reader.IsDBNullAsync(0)))
{
using (FileStream file = new FileStream("binarydata.bin", FileMode.Create, FileAccess.Write))
{
using (Stream data = reader.GetStream(0))
{
// Asynchronously copy the stream from the server to the file we just created
await data.CopyToAsync(file);
}
}
}
}
}
}
}
}
// Application transferring a large Text File from SQL Server
private static async Task PrintTextValues()
{
using (SqlConnection connection = new SqlConnection(connectionString))
{
await connection.OpenAsync();
using (SqlCommand command = new SqlCommand("SELECT [id], [textdata] FROM [Streams]", connection))
{
// The reader needs to be executed with the SequentialAccess behavior to enable network streaming
// Otherwise ReadAsync will buffer the entire text document into memory which can cause scalability issues or even OutOfMemoryExceptions
using (SqlDataReader reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess))
{
while (await reader.ReadAsync())
{
Console.Write("{0}: ", reader.GetInt32(0));
if (await reader.IsDBNullAsync(1))
{
Console.Write("(NULL)");
}
else
{
char[] buffer = new char[4096];
int charsRead = 0;
using (TextReader data = reader.GetTextReader(1))
{
do
{
// Grab each chunk of text and write it to the console
// If you are writing to a TextWriter you should use WriteAsync or WriteLineAsync
charsRead = await data.ReadAsync(buffer, 0, buffer.Length);
Console.Write(buffer, 0, charsRead);
} while (charsRead > 0);
}
}
Console.WriteLine();
}
}
}
}
}
// Application transferring a large Xml Document from SQL Server
private static async Task PrintXmlValues()
{
using (SqlConnection connection = new SqlConnection(connectionString))
{
await connection.OpenAsync();
using (SqlCommand command = new SqlCommand("SELECT [id], [xmldata] FROM [Streams]", connection))
{
// The reader needs to be executed with the SequentialAccess behavior to enable network streaming
// Otherwise ReadAsync will buffer the entire Xml Document into memory which can cause scalability issues or even OutOfMemoryExceptions
using (SqlDataReader reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess))
{
while (await reader.ReadAsync())
{
Console.WriteLine("{0}: ", reader.GetInt32(0));
if (await reader.IsDBNullAsync(1))
{
Console.WriteLine("\t(NULL)");
}
else
{
using (XmlReader xmlReader = reader.GetXmlReader(1))
{
int depth = 1;
// NOTE: The XmlReader returned by GetXmlReader does NOT support async operations
// See the example below (PrintXmlValuesViaNVarChar) for how to get an XmlReader with asynchronous capabilities
while (xmlReader.Read())
{
switch (xmlReader.NodeType)
{
case XmlNodeType.Element:
Console.WriteLine("{0}<{1}>", new string('\t', depth), xmlReader.Name);
depth++;
break;
case XmlNodeType.Text:
Console.WriteLine("{0}{1}", new string('\t', depth), xmlReader.Value);
break;
case XmlNodeType.EndElement:
depth--;
Console.WriteLine("{0}</{1}>", new string('\t', depth), xmlReader.Name);
break;
}
}
}
}
}
}
}
}
}
// Application transferring a large Xml Document from SQL Server
// This goes via NVarChar and TextReader to enable asynchronous reading
private static async Task PrintXmlValuesViaNVarChar()
{
XmlReaderSettings xmlSettings = new XmlReaderSettings()
{
// Async must be explicitly enabled in the XmlReaderSettings otherwise the XmlReader will throw exceptions when async methods are called
Async = true,
// Since we will immediately wrap the TextReader we are creating in an XmlReader, we will permit the XmlReader to take care of closing\disposing it
CloseInput = true,
// If the Xml you are reading is not a valid document (as per <https://docs.microsoft.com/previous-versions/dotnet/netframework-4.0/6bts1x50(v=vs.100)>) you will need to set the conformance level to Fragment
ConformanceLevel = ConformanceLevel.Fragment
};
using (SqlConnection connection = new SqlConnection(connectionString))
{
await connection.OpenAsync();
// Cast the XML into NVarChar to enable GetTextReader - trying to use GetTextReader on an XML type will throw an exception
using (SqlCommand command = new SqlCommand("SELECT [id], CAST([xmldata] AS NVARCHAR(MAX)) FROM [Streams]", connection))
{
// The reader needs to be executed with the SequentialAccess behavior to enable network streaming
// Otherwise ReadAsync will buffer the entire Xml Document into memory which can cause scalability issues or even OutOfMemoryExceptions
using (SqlDataReader reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess))
{
while (await reader.ReadAsync())
{
Console.WriteLine("{0}:", reader.GetInt32(0));
if (await reader.IsDBNullAsync(1))
{
Console.WriteLine("\t(NULL)");
}
else
{
// Grab the row as a TextReader, then create an XmlReader on top of it
// We are not keeping a reference to the TextReader since the XmlReader is created with the "CloseInput" setting (so it will close the TextReader when needed)
using (XmlReader xmlReader = XmlReader.Create(reader.GetTextReader(1), xmlSettings))
{
int depth = 1;
// The XmlReader above now supports asynchronous operations, so we can use ReadAsync here
while (await xmlReader.ReadAsync())
{
switch (xmlReader.NodeType)
{
case XmlNodeType.Element:
Console.WriteLine("{0}<{1}>", new string('\t', depth), xmlReader.Name);
depth++;
break;
case XmlNodeType.Text:
// Depending on what your data looks like, you should either use Value or GetValueAsync
// Value has less overhead (since it doesn't create a Task), but it may also block if additional data is required
Console.WriteLine("{0}{1}", new string('\t', depth), await xmlReader.GetValueAsync());
break;
case XmlNodeType.EndElement:
depth--;
Console.WriteLine("{0}</{1}>", new string('\t', depth), xmlReader.Name);
break;
}
}
}
}
}
}
}
}
}
}
}
示例 -- 流式传输到 SQL Server
使用以下 Transact-SQL 创建示例数据库:
CREATE DATABASE [Demo2]
GO
USE [Demo2]
GO
CREATE TABLE [BinaryStreams] (
[id] INT PRIMARY KEY IDENTITY(1, 1),
[bindata] VARBINARY(MAX))
GO
CREATE TABLE [TextStreams] (
[id] INT PRIMARY KEY IDENTITY(1, 1),
[textdata] NVARCHAR(MAX))
GO
CREATE TABLE [BinaryStreamsCopy] (
[id] INT PRIMARY KEY IDENTITY(1, 1),
[bindata] VARBINARY(MAX))
GO
该示例说明如何执行以下功能:
在 .NET 中将大型 BLOB 传输到 SQL Server。
在 .NET 中将大型文本文件传输到 SQL Server。
使用新异步功能来传输大型 BLOB。
使用新异步功能和 await 关键字来传输大型 BLOB。
取消大型 BLOB 传输。
使用新的异步功能从一个 SQL Server 流式传输到另一个 SQL Server。
using System;
using System.Data;
using Microsoft.Data.SqlClient;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
namespace StreamingToServer
{
class Program
{
private const string connectionString = @"Server=localhost;Database=Demo2;Integrated Security=true";
static void Main(string[] args)
{
CreateDemoFiles();
StreamBLOBToServer().Wait();
StreamTextToServer().Wait();
// Create a CancellationTokenSource that will be cancelled after 100ms
// Typically this token source will be cancelled by a user request (e.g. a Cancel button)
CancellationTokenSource tokenSource = new CancellationTokenSource();
tokenSource.CancelAfter(100);
try
{
CancelBLOBStream(tokenSource.Token).Wait();
}
catch (AggregateException ex)
{
// Cancelling an async operation will throw an exception
// Since we are using the Task's Wait method, this exception will be wrapped in an AggregateException
// If you were using the 'await' keyword, the compiler would take care of unwrapping the AggregateException
// Depending on when the cancellation occurs, you can either get an error from SQL Server or from .Net
if ((ex.InnerException is SqlException) || (ex.InnerException is TaskCanceledException))
{
// This is an expected exception
Console.WriteLine("Got expected exception: {0}", ex.InnerException.Message);
}
else
{
// Did not expect this exception - re-throw it
throw;
}
}
Console.WriteLine("Done");
}
// This is used to generate the files which are used by the other sample methods
private static void CreateDemoFiles()
{
Random rand = new Random();
byte[] data = new byte[1024];
rand.NextBytes(data);
using (FileStream file = File.Open("binarydata.bin", FileMode.Create))
{
file.Write(data, 0, data.Length);
}
using (StreamWriter writer = new StreamWriter(File.Open("textdata.txt", FileMode.Create)))
{
writer.Write(Convert.ToBase64String(data));
}
}
// Application transferring a large BLOB to SQL Server
private static async Task StreamBLOBToServer()
{
using (SqlConnection conn = new SqlConnection(connectionString))
{
await conn.OpenAsync();
using (SqlCommand cmd = new SqlCommand("INSERT INTO [BinaryStreams] (bindata) VALUES (@bindata)", conn))
{
using (FileStream file = File.Open("binarydata.bin", FileMode.Open))
{
// Add a parameter which uses the FileStream we just opened
// Size is set to -1 to indicate "MAX"
cmd.Parameters.Add("@bindata", SqlDbType.Binary, -1).Value = file;
// Send the data to the server asynchronously
await cmd.ExecuteNonQueryAsync();
}
}
}
}
// Application transferring a large Text File to SQL Server
private static async Task StreamTextToServer()
{
using (SqlConnection conn = new SqlConnection(connectionString))
{
await conn.OpenAsync();
using (SqlCommand cmd = new SqlCommand("INSERT INTO [TextStreams] (textdata) VALUES (@textdata)", conn))
{
using (StreamReader file = File.OpenText("textdata.txt"))
{
// Add a parameter which uses the StreamReader we just opened
// Size is set to -1 to indicate "MAX"
cmd.Parameters.Add("@textdata", SqlDbType.NVarChar, -1).Value = file;
// Send the data to the server asynchronously
await cmd.ExecuteNonQueryAsync();
}
}
}
}
// Cancelling the transfer of a large BLOB
private static async Task CancelBLOBStream(CancellationToken cancellationToken)
{
using (SqlConnection conn = new SqlConnection(connectionString))
{
// We can cancel not only sending the data to the server, but also opening the connection
await conn.OpenAsync(cancellationToken);
// Artificially delay the command by 100ms
using (SqlCommand cmd = new SqlCommand("WAITFOR DELAY '00:00:00:100';INSERT INTO [BinaryStreams] (bindata) VALUES (@bindata)", conn))
{
using (FileStream file = File.Open("binarydata.bin", FileMode.Open))
{
// Add a parameter which uses the FileStream we just opened
// Size is set to -1 to indicate "MAX"
cmd.Parameters.Add("@bindata", SqlDbType.Binary, -1).Value = file;
// Send the data to the server asynchronously
// Pass the cancellation token such that the command will be cancelled if needed
await cmd.ExecuteNonQueryAsync(cancellationToken);
}
}
}
}
}
}
示例 -- 从一个 SQL Server 流式处理到另一个 SQL Server
此示例演示如何以异步方式将大型 BLOB 从一个 SQL Server 流式传输到另一个 SQL Server,支持取消。
注意
在运行以下示例之前,请确保已创建 Demo 和 Demo2 数据库。
using System;
using System.Data;
using Microsoft.Data.SqlClient;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
namespace StreamingFromServerToAnother
{
class Program
{
private const string connectionString = @"Server=localhost;Database=Demo2;Integrated Security=true";
static void Main(string[] args)
{
// For this example, we don't want to cancel
// So we can pass in a "blank" cancellation token
E2EStream(CancellationToken.None).Wait();
Console.WriteLine("Done");
}
// Streaming from one SQL Server to Another One
private static async Task E2EStream(CancellationToken cancellationToken)
{
using (SqlConnection readConn = new SqlConnection(connectionString))
{
using (SqlConnection writeConn = new SqlConnection(connectionString))
{
// Note that we are using the same cancellation token for calls to both connections\commands
// Also we can start both the connection opening asynchronously, and then wait for both to complete
Task openReadConn = readConn.OpenAsync(cancellationToken);
Task openWriteConn = writeConn.OpenAsync(cancellationToken);
await Task.WhenAll(openReadConn, openWriteConn);
using (SqlCommand readCmd = new SqlCommand("SELECT [bindata] FROM [BinaryStreams]", readConn))
{
using (SqlCommand writeCmd = new SqlCommand("INSERT INTO [BinaryStreamsCopy] (bindata) VALUES (@bindata)", writeConn))
{
// Add an empty parameter to the write command which will be used for the streams we are copying
// Size is set to -1 to indicate "MAX"
SqlParameter streamParameter = writeCmd.Parameters.Add("@bindata", SqlDbType.Binary, -1);
// The reader needs to be executed with the SequentialAccess behavior to enable network streaming
// Otherwise ReadAsync will buffer the entire BLOB into memory which can cause scalability issues or even OutOfMemoryExceptions
using (SqlDataReader reader = await readCmd.ExecuteReaderAsync(CommandBehavior.SequentialAccess, cancellationToken))
{
while (await reader.ReadAsync(cancellationToken))
{
// Grab a stream to the binary data in the source database
using (Stream dataStream = reader.GetStream(0))
{
// Set the parameter value to the stream source that was opened
streamParameter.Value = dataStream;
// Asynchronously send data from one database to another
await writeCmd.ExecuteNonQueryAsync(cancellationToken);
}
}
}
}
}
}
}
}
}
}