SqlClient 串流支援
SQL Server 與應用程式之間的串流支援 (.NET Framework 4.5 的新功能) 可支援伺服器上的非結構化資料 (文件、影像與媒體檔案)。 SQL Server 資料庫可以儲存二進位大型物件 (BLOB),但擷取 BLOB 可能會佔用很多記憶體。
與 SQL Server 之間的串流支援,簡化了串流資料之應用程式的撰寫,不需將資料完全載入記憶體,因此可減少記憶體溢位例外狀況。
尤其是在商務物件連接到 SQL Azure 以傳送、擷取及管理大型 BLOB 的情況下,資料流支援也可以讓中介層應用程式擴充得更好。
警告
如果應用程式也使用 Context Connection
連接字串關鍵字,則不支援非同步呼叫。
加入以支援資料流的成員可用於擷取查詢中的資料,以及將參數傳遞至查詢及預存程序。 串流功能可處理基本 OLTP 與資料移轉案例,而且適用於內部部署與外部部署資料移轉環境。
從 SQL Server 的串流支援
從 SQL Server 串流的支援在 DbDataReader 與 SqlDataReader 類別中引進新功能,以取得 Stream、XmlReader 與 TextReader 物件,並回應這些物件。 這些類別用於從查詢中擷取資料。 因此,從 SQL Server 串流的支援可處理 OLTP 案例,而且適用於內部部署與外部部署環境。
已在 SqlDataReader 中新增下列成員,以啟用從 SQL Server 串流的支援:
已在 DbDataReader 中新增下列成員,以啟用從 SQL Server 串流的支援:
到 SQL Server 的串流支援
到 SQL Server 的串流支援在 SqlParameter 類別中導入新功能,使得它能夠接受並回應 XmlReader、Stream 和 TextReader 物件。 SqlParameter 用於將參數傳遞給查詢和預存程序。
您必須取消所有資料流作業,才能處置 SqlCommand 物件或呼叫 Cancel。 如果應用程式傳送 CancellationToken,就不保證能取消。
下列 SqlDbType 類型將接受 Value 的 Stream:
- 二進位
- VarBinary
下列 SqlDbType 類型將接受 Value 的 TextReader:
- Char
- NChar
- NVarChar
- XML
XmlSqlDbType 型別將接受 XmlReader 的 Value。
SqlValue 可接受型別 XmlReader、TextReader 和 Stream 的值。
XmlReader、TextReader 和 Stream 物件會被轉移到 Size 所定義的值。
範例:從 SQL Server 串流
使用下列 Transact-SQL 來建立範例資料庫:
CREATE DATABASE [Demo]
GO
USE [Demo]
GO
CREATE TABLE [Streams] (
[id] INT PRIMARY KEY IDENTITY(1, 1),
[textdata] NVARCHAR(MAX),
[bindata] VARBINARY(MAX),
[xmldata] XML)
GO
INSERT INTO [Streams] (textdata, bindata, xmldata) VALUES (N'This is a test', 0x48656C6C6F, N'<test>value</test>')
INSERT INTO [Streams] (textdata, bindata, xmldata) VALUES (N'Hello, World!', 0x54657374696E67, N'<test>value2</test>')
INSERT INTO [Streams] (textdata, bindata, xmldata) VALUES (N'Another row', 0x666F6F626172, N'<fff>bbb</fff><fff>bbc</fff>')
GO
範例顯示如何執行下列動作:
- 提供擷取大型檔案的非同步方法,避免封鎖使用者介面執行緒。
- 在 .NET Framework 4.5 中從 SQL Server 傳輸大型文字檔。
- 在 .NET Framework 4.5 中從 SQL Server 傳輸大型 XML 檔。
- 從 SQL Server 擷取資料。
- 從某個 SQL Server 資料庫將大型檔案 (BLOB) 傳輸到另一個資料庫,而不會耗盡記憶體。
using System;
using System.Data;
using System.Data.SqlClient;
using System.IO;
using System.Threading.Tasks;
using System.Xml;
namespace StreamingFromServer {
class Program {
private const string connectionString = @"...";
static void Main(string[] args) {
CopyBinaryValueToFile().Wait();
PrintTextValues().Wait();
PrintXmlValues().Wait();
PrintXmlValuesViaNVarChar().Wait();
Console.WriteLine("Done");
}
// Retrieve a large BLOB from SQL Server in .NET Framework 4.5 using the asynchronous capability.
private static async Task CopyBinaryValueToFile() {
string filePath = Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.MyDocuments), "binarydata.bin");
using (SqlConnection connection = new SqlConnection(connectionString)) {
await connection.OpenAsync();
using (SqlCommand command = new SqlCommand("SELECT [bindata] FROM [Streams] WHERE [id]=@id", connection)) {
command.Parameters.AddWithValue("id", 1);
// The reader needs to be executed with the SequentialAccess behavior to enable network streaming.
// Otherwise ReadAsync will buffer the entire BLOB into memory which can cause scalability issues or even OutOfMemoryExceptions.
using (SqlDataReader reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess)) {
if (await reader.ReadAsync()) {
if (!(await reader.IsDBNullAsync(0))) {
using (FileStream file = new FileStream(filePath, FileMode.Create, FileAccess.Write)) {
using (Stream data = reader.GetStream(0)) {
// Asynchronously copy the stream from the server to the file we just created.
await data.CopyToAsync(file);
}
}
}
}
}
}
}
}
// Transfer a large Text File from SQL Server in .NET Framework 4.5.
private static async Task PrintTextValues() {
using (SqlConnection connection = new SqlConnection(connectionString)) {
await connection.OpenAsync();
using (SqlCommand command = new SqlCommand("SELECT [id], [textdata] FROM [Streams]", connection)) {
// The reader needs to be executed with the SequentialAccess behavior to enable network streaming.
// Otherwise ReadAsync will buffer the entire text document into memory which can cause scalability issues or even OutOfMemoryExceptions.
using (SqlDataReader reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess)) {
while (await reader.ReadAsync()) {
Console.Write("{0}: ", reader.GetInt32(0));
if (await reader.IsDBNullAsync(1)) {
Console.Write("(NULL)");
}
else {
char[] buffer = new char[4096];
int charsRead = 0;
using (TextReader data = reader.GetTextReader(1)) {
do {
// Grab each chunk of text and write it to the console.
// If you are writing to a TextWriter, you should use WriteAsync or WriteLineAsync.
charsRead = await data.ReadAsync(buffer, 0, buffer.Length);
Console.Write(buffer, 0, charsRead);
} while (charsRead > 0);
}
}
Console.WriteLine();
}
}
}
}
}
// Transfer a large Xml Document from SQL Server in .NET Framework 4.5.
private static async Task PrintXmlValues() {
using (SqlConnection connection = new SqlConnection(connectionString)) {
await connection.OpenAsync();
using (SqlCommand command = new SqlCommand("SELECT [id], [xmldata] FROM [Streams]", connection)) {
// The reader needs to be executed with the SequentialAccess behavior to enable network streaming.
// Otherwise ReadAsync will buffer the entire Xml Document into memory which can cause scalability issues or even OutOfMemoryExceptions.
using (SqlDataReader reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess)) {
while (await reader.ReadAsync()) {
Console.WriteLine("{0}: ", reader.GetInt32(0));
if (await reader.IsDBNullAsync(1)) {
Console.WriteLine("\t(NULL)");
}
else {
using (XmlReader xmlReader = reader.GetXmlReader(1)) {
int depth = 1;
// NOTE: The XmlReader returned by GetXmlReader does NOT support async operations.
// See the example below (PrintXmlValuesViaNVarChar) for how to get an XmlReader with asynchronous capabilities.
while (xmlReader.Read()) {
switch (xmlReader.NodeType) {
case XmlNodeType.Element:
Console.WriteLine("{0}<{1}>", new string('\t', depth), xmlReader.Name);
depth++;
break;
case XmlNodeType.Text:
Console.WriteLine("{0}{1}", new string('\t', depth), xmlReader.Value);
break;
case XmlNodeType.EndElement:
depth--;
Console.WriteLine("{0}</{1}>", new string('\t', depth), xmlReader.Name);
break;
}
}
}
}
}
}
}
}
}
// Transfer a large Xml Document from SQL Server in .NET Framework 4.5.
// This goes via NVarChar and TextReader to enable asynchronous reading.
private static async Task PrintXmlValuesViaNVarChar() {
XmlReaderSettings xmlSettings = new XmlReaderSettings() {
// Async must be explicitly enabled in the XmlReaderSettings otherwise the XmlReader will throw exceptions when async methods are called.
Async = true,
// Since we will immediately wrap the TextReader we are creating in an XmlReader, we will permit the XmlReader to take care of closing\disposing it.
CloseInput = true,
// If the Xml you are reading is not a valid document (as per <https://learn.microsoft.com/previous-versions/dotnet/netframework-4.0/6bts1x50(v=vs.100)>) you will need to set the conformance level to Fragment.
ConformanceLevel = ConformanceLevel.Fragment
};
using (SqlConnection connection = new SqlConnection(connectionString)) {
await connection.OpenAsync();
// Cast the XML into NVarChar to enable GetTextReader - trying to use GetTextReader on an XML type will throw an exception.
using (SqlCommand command = new SqlCommand("SELECT [id], CAST([xmldata] AS NVARCHAR(MAX)) FROM [Streams]", connection)) {
// The reader needs to be executed with the SequentialAccess behavior to enable network streaming.
// Otherwise ReadAsync will buffer the entire Xml Document into memory which can cause scalability issues or even OutOfMemoryExceptions.
using (SqlDataReader reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess)) {
while (await reader.ReadAsync()) {
Console.WriteLine("{0}:", reader.GetInt32(0));
if (await reader.IsDBNullAsync(1)) {
Console.WriteLine("\t(NULL)");
}
else {
// Grab the row as a TextReader, then create an XmlReader on top of it.
// The code doesn't keep a reference to the TextReader since the XmlReader is created with the "CloseInput" setting (so it will close the TextReader when needed).
using (XmlReader xmlReader = XmlReader.Create(reader.GetTextReader(1), xmlSettings)) {
int depth = 1;
// The XmlReader above now supports asynchronous operations, so we can use ReadAsync here.
while (await xmlReader.ReadAsync()) {
switch (xmlReader.NodeType) {
case XmlNodeType.Element:
Console.WriteLine("{0}<{1}>", new string('\t', depth), xmlReader.Name);
depth++;
break;
case XmlNodeType.Text:
// Depending on what your data looks like, you should either use Value or GetValueAsync.
// Value has less overhead (since it doesn't create a Task), but it may also block if additional data is required.
Console.WriteLine("{0}{1}", new string('\t', depth), await xmlReader.GetValueAsync());
break;
case XmlNodeType.EndElement:
depth--;
Console.WriteLine("{0}</{1}>", new string('\t', depth), xmlReader.Name);
break;
}
}
}
}
}
}
}
}
}
}
}
範例:串流到 SQL Server
使用下列 Transact-SQL 來建立範例資料庫:
CREATE DATABASE [Demo2]
GO
USE [Demo2]
GO
CREATE TABLE [BinaryStreams] (
[id] INT PRIMARY KEY IDENTITY(1, 1),
[bindata] VARBINARY(MAX))
GO
CREATE TABLE [TextStreams] (
[id] INT PRIMARY KEY IDENTITY(1, 1),
[textdata] NVARCHAR(MAX))
GO
CREATE TABLE [BinaryStreamsCopy] (
[id] INT PRIMARY KEY IDENTITY(1, 1),
[bindata] VARBINARY(MAX))
GO
範例顯示如何執行下列動作:
- 在 .NET Framework 4.5 中將大型 BLOB 傳輸到 SQL Server。
- 在 .NET Framework 4.5 中將大型文字檔傳輸到 SQL Server。
- 使用新的非同步功能傳輸大型 BLOB。
- 使用新的非同步功能及 await 關鍵字傳輸大型 BLOB。
- 取消傳輸大型 BLOB。
- 使用新的非同步功能,從單一 SQL Server 串流到另一 SQL Server。
using System;
using System.Data;
using System.Data.SqlClient;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
namespace StreamingToServer {
class Program {
private const string connectionString = @"...";
static void Main(string[] args) {
CreateDemoFiles();
StreamBLOBToServer().Wait();
StreamTextToServer().Wait();
// Create a CancellationTokenSource that will be cancelled after 100ms
// Typically this token source will be cancelled by a user request (e.g. a Cancel button).
CancellationTokenSource tokenSource = new CancellationTokenSource();
tokenSource.CancelAfter(100);
try {
CancelBLOBStream(tokenSource.Token).Wait();
}
catch (AggregateException ex) {
// Cancelling an async operation will throw an exception.
// Since we are using the Task's Wait method, this exception will be wrapped in an AggregateException.
// If you were using the 'await' keyword, the compiler would take care of unwrapping the AggregateException.
// Depending on when the cancellation occurs, you can either get an error from SQL Server or from .Net.
if ((ex.InnerException is SqlException) || (ex.InnerException is TaskCanceledException)) {
// This is an expected exception.
Console.WriteLine("Got expected exception: {0}", ex.InnerException.Message);
}
else {
// Did not expect this exception - rethrow it.
throw;
}
}
Console.WriteLine("Done");
}
// This is used to generate the files which are used by the other sample methods.
private static void CreateDemoFiles() {
Random rand = new Random();
byte[] data = new byte[1024];
rand.NextBytes(data);
using (FileStream file = File.Open("binarydata.bin", FileMode.Create)) {
file.Write(data, 0, data.Length);
}
using (StreamWriter writer = new StreamWriter(File.Open("textdata.txt", FileMode.Create))) {
writer.Write(Convert.ToBase64String(data));
}
}
// Transfer a large BLOB to SQL Server in .NET Framework 4.5.
private static async Task StreamBLOBToServer() {
using (SqlConnection conn = new SqlConnection(connectionString)) {
await conn.OpenAsync();
using (SqlCommand cmd = new SqlCommand("INSERT INTO [BinaryStreams] (bindata) VALUES (@bindata)", conn)) {
using (FileStream file = File.Open("binarydata.bin", FileMode.Open)) {
// Add a parameter which uses the FileStream we just opened
// Size is set to -1 to indicate "MAX".
cmd.Parameters.Add("@bindata", SqlDbType.Binary, -1).Value = file;
// Send the data to the server asynchronously.
await cmd.ExecuteNonQueryAsync();
}
}
}
}
// Transfer a large Text File to SQL Server in .NET Framework 4.5.
private static async Task StreamTextToServer() {
using (SqlConnection conn = new SqlConnection(connectionString)) {
await conn.OpenAsync();
using (SqlCommand cmd = new SqlCommand("INSERT INTO [TextStreams] (textdata) VALUES (@textdata)", conn)) {
using (StreamReader file = File.OpenText("textdata.txt")) {
// Add a parameter which uses the StreamReader we just opened.
// Size is set to -1 to indicate "MAX".
cmd.Parameters.Add("@textdata", SqlDbType.NVarChar, -1).Value = file;
// Send the data to the server asynchronously.
await cmd.ExecuteNonQueryAsync();
}
}
}
}
// Cancel the transfer of a large BLOB.
private static async Task CancelBLOBStream(CancellationToken cancellationToken) {
using (SqlConnection conn = new SqlConnection(connectionString)) {
// We can cancel not only sending the data to the server, but also opening the connection.
await conn.OpenAsync(cancellationToken);
// Artificially delay the command by 100ms.
using (SqlCommand cmd = new SqlCommand("WAITFOR DELAY '00:00:00:100';INSERT INTO [BinaryStreams] (bindata) VALUES (@bindata)", conn)) {
using (FileStream file = File.Open("binarydata.bin", FileMode.Open)) {
// Add a parameter which uses the FileStream we just opened.
// Size is set to -1 to indicate "MAX".
cmd.Parameters.Add("@bindata", SqlDbType.Binary, -1).Value = file;
// Send the data to the server asynchronously.
// Pass the cancellation token such that the command will be cancelled if needed.
await cmd.ExecuteNonQueryAsync(cancellationToken);
}
}
}
}
}
}
範例:從單一 SQL Server 串流到另一 SQL Server
此範例示範如何以非同步方式,將大型 BLOB 從某部 SQL Server 串流到另一部,並支援取消。
using System;
using System.Data;
using System.Data.SqlClient;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
namespace StreamingFromServerToAnother {
class Program {
private const string connectionString = @"...";
static void Main(string[] args) {
// For this example, we don't want to cancel,
// so pass in a "blank" cancellation token.
E2EStream(CancellationToken.None).Wait();
Console.WriteLine("Done");
}
// Streaming from one SQL Server to Another One using the new Async.NET.
private static async Task E2EStream(CancellationToken cancellationToken) {
using (SqlConnection readConn = new SqlConnection(connectionString)) {
using (SqlConnection writeConn = new SqlConnection(connectionString)) {
// Note that we are using the same cancellation token for calls to both connections\commands.
// Also we can start both the connection opening asynchronously, and then wait for both to complete.
Task openReadConn = readConn.OpenAsync(cancellationToken);
Task openWriteConn = writeConn.OpenAsync(cancellationToken);
await Task.WhenAll(openReadConn, openWriteConn);
using (SqlCommand readCmd = new SqlCommand("SELECT [bindata] FROM [BinaryStreams]", readConn)) {
using (SqlCommand writeCmd = new SqlCommand("INSERT INTO [BinaryStreamsCopy] (bindata) VALUES (@bindata)", writeConn)) {
// Add an empty parameter to the write command which will be used for the streams we are copying.
// Size is set to -1 to indicate "MAX".
SqlParameter streamParameter = writeCmd.Parameters.Add("@bindata", SqlDbType.Binary, -1);
// The reader needs to be executed with the SequentialAccess behavior to enable network streaming.
// Otherwise ReadAsync will buffer the entire BLOB into memory which can cause scalability issues or even OutOfMemoryExceptions.
using (SqlDataReader reader = await readCmd.ExecuteReaderAsync(CommandBehavior.SequentialAccess, cancellationToken)) {
while (await reader.ReadAsync(cancellationToken)) {
// Grab a stream to the binary data in the source database.
using (Stream dataStream = reader.GetStream(0)) {
// Set the parameter value to the stream source that was opened.
streamParameter.Value = dataStream;
// Asynchronously send data from one database to another.
await writeCmd.ExecuteNonQueryAsync(cancellationToken);
}
}
}
}
}
}
}
}
}
}