SqlClient-Streamingunterstützung
Gilt für: .NET Framework .NET .NET Standard
Die Streamingunterstützung zwischen SQL Server und einer Anwendung unterstützt unstrukturierte Daten auf dem Server (Dokumente, Bilder und Mediendateien). Eine SQL Server-Datenbank kann BLOBs (Binary Large Objects) speichern, beim Abrufen von BLOBs kann jedoch viel Arbeitsspeicher beansprucht werden.
Durch die Unterstützung des Streamings an und von SQL Server wird das Schreiben von Anwendungen vereinfacht, die Daten streamen. Daten müssen nicht vollständig in den Arbeitsspeicher geladen werden, was zu weniger Ausnahmefehlern aufgrund Arbeitsspeicherüberlaufs führt.
Die Unterstützung des Streamings wird auch eine bessere Skalierung von Middle-Tier-Anwendungen ermöglichen, insbesondere in Szenarien, in denen Geschäftsobjekte Verbindungen mit Azure SQL herstellen, um große BLOBs zu senden, abzurufen und zu bearbeiten.
Warnung
Die Member, die Streaming unterstützen, werden zum Abrufen von Daten aus Abfragen und zur Übergabe von Parametern an Abfragen und gespeicherte Prozeduren verwendet. Das Streamingfeature behandelt grundlegende OLTP- und Datenmigrationsszenarien und ist für lokale und externe Datenmigrationsumgebungen geeignet.
Streamingunterstützung von SQL Server
Mit der Unterstützung des Streamings von SQL Server werden neue Funktionen in den Klassen DbDataReader und SqlDataReader eingeführt, um Stream-, XmlReader- und TextReader-Objekte abzurufen und darauf zu reagieren. Diese Klassen werden verwendet, um Daten aus Abfragen abzurufen. Daher betrifft das unterstützte Streaming von SQL Server OLTP-Szenarien und ist für lokale und externe Umgebungen geeignet.
Die folgenden Member wurden SqlDataReader hinzugefügt, um die Streamingunterstützung von SQL Server zu ermöglichen:
Die folgenden Member wurden DbDataReader hinzugefügt, um die Streamingunterstützung von SQL Server zu ermöglichen:
Streamingunterstützung zu SQL Server
Die Streamingunterstützung zu SQL Server befindet sich in der Klasse SqlParameter, damit sie XmlReader-, Stream- und TextReader-Objekte akzeptieren und auf diese reagieren kann. SqlParameter wird verwendet, um Parameter an Abfragen und gespeicherte Prozeduren zu übergeben.
Hinweis
Durch das Verwerfen eines SqlCommand-Objekts oder das Aufrufen von Cancel muss jeglicher Streamingvorgang abgebrochen werden. Wenn eine Anwendung CancellationToken sendet, wird der Abbruch nicht sichergestellt.
Die folgenden SqlDbType-Typen akzeptieren den Value-Wert Stream:
Binär (Binary)
VarBinary
Die folgenden SqlDbType-Typen akzeptieren den Value-Wert TextReader:
Char
NChar
NVarChar
Xml
Der XmlSqlDbType-Typ akzeptiert ein Value von XmlReader.
SqlValue kann Werte des Typs XmlReader, TextReader und Stream akzeptieren.
Das XmlReader-, TextReader- und Stream-Objekt wird bis zu dem von Size definierten Wert übertragen.
Beispiel: Streaming von SQL Server
Verwenden Sie das folgende Transact-SQL, um die Beispieldatenbank zu erstellen:
CREATE DATABASE [Demo]
GO
USE [Demo]
GO
CREATE TABLE [Streams] (
[id] INT PRIMARY KEY IDENTITY(1, 1),
[textdata] NVARCHAR(MAX),
[bindata] VARBINARY(MAX),
[xmldata] XML)
GO
INSERT INTO [Streams] (textdata, bindata, xmldata) VALUES (N'This is a test', 0x48656C6C6F, N'<test>value</test>')
INSERT INTO [Streams] (textdata, bindata, xmldata) VALUES (N'Hello, World!', 0x54657374696E67, N'<test>value2</test>')
INSERT INTO [Streams] (textdata, bindata, xmldata) VALUES (N'Another row', 0x666F6F626172, N'<fff>bbb</fff><fff>bbc</fff>')
GO
Das Beispiel erläutert die folgenden Aufgaben:
Vermeiden, dass ein Benutzeroberflächenthread blockiert wird, indem eine asynchrone Methode zum Abrufen großer Dateien bereitgestellt wird.
Übertragen einer großen Textdatei aus SQL Server in .NET.
Übertragen einer großen XML-Datei aus SQL Server in .NET.
Abrufen von Daten aus SQL Server.
Übertragen großer Dateien (BLOB)s aus einer SQL Server-Datenbank in eine andere, ohne dass der Arbeitsspeicher knapp wird.
using System;
using System.Data;
using Microsoft.Data.SqlClient;
using System.IO;
using System.Threading.Tasks;
using System.Xml;
namespace StreamingFromServer
{
class Program
{
private const string connectionString = @"Server=localhost;Database=Demo;Integrated Security=true";
static void Main(string[] args)
{
CopyBinaryValueToFile().Wait();
PrintTextValues().Wait();
PrintXmlValues().Wait();
PrintXmlValuesViaNVarChar().Wait();
Console.WriteLine("Done");
}
// Application retrieving a large BLOB from SQL Server in .NET 4.5 using the new asynchronous capability
private static async Task CopyBinaryValueToFile()
{
using (SqlConnection connection = new SqlConnection(connectionString))
{
await connection.OpenAsync();
using (SqlCommand command = new SqlCommand("SELECT [bindata] FROM [Streams] WHERE [id]=@id", connection))
{
command.Parameters.AddWithValue("id", 1);
// The reader needs to be executed with the SequentialAccess behavior to enable network streaming
// Otherwise ReadAsync will buffer the entire BLOB into memory which can cause scalability issues or even OutOfMemoryExceptions
using (SqlDataReader reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess))
{
if (await reader.ReadAsync())
{
if (!(await reader.IsDBNullAsync(0)))
{
using (FileStream file = new FileStream("binarydata.bin", FileMode.Create, FileAccess.Write))
{
using (Stream data = reader.GetStream(0))
{
// Asynchronously copy the stream from the server to the file we just created
await data.CopyToAsync(file);
}
}
}
}
}
}
}
}
// Application transferring a large Text File from SQL Server
private static async Task PrintTextValues()
{
using (SqlConnection connection = new SqlConnection(connectionString))
{
await connection.OpenAsync();
using (SqlCommand command = new SqlCommand("SELECT [id], [textdata] FROM [Streams]", connection))
{
// The reader needs to be executed with the SequentialAccess behavior to enable network streaming
// Otherwise ReadAsync will buffer the entire text document into memory which can cause scalability issues or even OutOfMemoryExceptions
using (SqlDataReader reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess))
{
while (await reader.ReadAsync())
{
Console.Write("{0}: ", reader.GetInt32(0));
if (await reader.IsDBNullAsync(1))
{
Console.Write("(NULL)");
}
else
{
char[] buffer = new char[4096];
int charsRead = 0;
using (TextReader data = reader.GetTextReader(1))
{
do
{
// Grab each chunk of text and write it to the console
// If you are writing to a TextWriter you should use WriteAsync or WriteLineAsync
charsRead = await data.ReadAsync(buffer, 0, buffer.Length);
Console.Write(buffer, 0, charsRead);
} while (charsRead > 0);
}
}
Console.WriteLine();
}
}
}
}
}
// Application transferring a large Xml Document from SQL Server
private static async Task PrintXmlValues()
{
using (SqlConnection connection = new SqlConnection(connectionString))
{
await connection.OpenAsync();
using (SqlCommand command = new SqlCommand("SELECT [id], [xmldata] FROM [Streams]", connection))
{
// The reader needs to be executed with the SequentialAccess behavior to enable network streaming
// Otherwise ReadAsync will buffer the entire Xml Document into memory which can cause scalability issues or even OutOfMemoryExceptions
using (SqlDataReader reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess))
{
while (await reader.ReadAsync())
{
Console.WriteLine("{0}: ", reader.GetInt32(0));
if (await reader.IsDBNullAsync(1))
{
Console.WriteLine("\t(NULL)");
}
else
{
using (XmlReader xmlReader = reader.GetXmlReader(1))
{
int depth = 1;
// NOTE: The XmlReader returned by GetXmlReader does NOT support async operations
// See the example below (PrintXmlValuesViaNVarChar) for how to get an XmlReader with asynchronous capabilities
while (xmlReader.Read())
{
switch (xmlReader.NodeType)
{
case XmlNodeType.Element:
Console.WriteLine("{0}<{1}>", new string('\t', depth), xmlReader.Name);
depth++;
break;
case XmlNodeType.Text:
Console.WriteLine("{0}{1}", new string('\t', depth), xmlReader.Value);
break;
case XmlNodeType.EndElement:
depth--;
Console.WriteLine("{0}</{1}>", new string('\t', depth), xmlReader.Name);
break;
}
}
}
}
}
}
}
}
}
// Application transferring a large Xml Document from SQL Server
// This goes via NVarChar and TextReader to enable asynchronous reading
private static async Task PrintXmlValuesViaNVarChar()
{
XmlReaderSettings xmlSettings = new XmlReaderSettings()
{
// Async must be explicitly enabled in the XmlReaderSettings otherwise the XmlReader will throw exceptions when async methods are called
Async = true,
// Since we will immediately wrap the TextReader we are creating in an XmlReader, we will permit the XmlReader to take care of closing\disposing it
CloseInput = true,
// If the Xml you are reading is not a valid document (as per <https://docs.microsoft.com/previous-versions/dotnet/netframework-4.0/6bts1x50(v=vs.100)>) you will need to set the conformance level to Fragment
ConformanceLevel = ConformanceLevel.Fragment
};
using (SqlConnection connection = new SqlConnection(connectionString))
{
await connection.OpenAsync();
// Cast the XML into NVarChar to enable GetTextReader - trying to use GetTextReader on an XML type will throw an exception
using (SqlCommand command = new SqlCommand("SELECT [id], CAST([xmldata] AS NVARCHAR(MAX)) FROM [Streams]", connection))
{
// The reader needs to be executed with the SequentialAccess behavior to enable network streaming
// Otherwise ReadAsync will buffer the entire Xml Document into memory which can cause scalability issues or even OutOfMemoryExceptions
using (SqlDataReader reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess))
{
while (await reader.ReadAsync())
{
Console.WriteLine("{0}:", reader.GetInt32(0));
if (await reader.IsDBNullAsync(1))
{
Console.WriteLine("\t(NULL)");
}
else
{
// Grab the row as a TextReader, then create an XmlReader on top of it
// We are not keeping a reference to the TextReader since the XmlReader is created with the "CloseInput" setting (so it will close the TextReader when needed)
using (XmlReader xmlReader = XmlReader.Create(reader.GetTextReader(1), xmlSettings))
{
int depth = 1;
// The XmlReader above now supports asynchronous operations, so we can use ReadAsync here
while (await xmlReader.ReadAsync())
{
switch (xmlReader.NodeType)
{
case XmlNodeType.Element:
Console.WriteLine("{0}<{1}>", new string('\t', depth), xmlReader.Name);
depth++;
break;
case XmlNodeType.Text:
// Depending on what your data looks like, you should either use Value or GetValueAsync
// Value has less overhead (since it doesn't create a Task), but it may also block if additional data is required
Console.WriteLine("{0}{1}", new string('\t', depth), await xmlReader.GetValueAsync());
break;
case XmlNodeType.EndElement:
depth--;
Console.WriteLine("{0}</{1}>", new string('\t', depth), xmlReader.Name);
break;
}
}
}
}
}
}
}
}
}
}
}
Beispiel: Streaming zu SQL Server
Verwenden Sie das folgende Transact-SQL, um die Beispieldatenbank zu erstellen:
CREATE DATABASE [Demo2]
GO
USE [Demo2]
GO
CREATE TABLE [BinaryStreams] (
[id] INT PRIMARY KEY IDENTITY(1, 1),
[bindata] VARBINARY(MAX))
GO
CREATE TABLE [TextStreams] (
[id] INT PRIMARY KEY IDENTITY(1, 1),
[textdata] NVARCHAR(MAX))
GO
CREATE TABLE [BinaryStreamsCopy] (
[id] INT PRIMARY KEY IDENTITY(1, 1),
[bindata] VARBINARY(MAX))
GO
Das Beispiel erläutert die folgenden Aufgaben:
Übertragen eines großen BLOBs zu SQL Server in .NET.
Übertragen einer großen Textdatei zu SQL Server in .NET.
Verwenden der neuen asynchronen Funktion zur Übertragung eines großen BLOBs.
Verwenden der neuen asynchronen Funktion und des await-Schlüsselworts zur Übertragung eines großen BLOBs.
Abbrechen der Übertragung eines großen BLOBs.
Streamen von einer SQL Server-Instanz zu einer anderen mithilfe des neuen asynchronen Features.
using System;
using System.Data;
using Microsoft.Data.SqlClient;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
namespace StreamingToServer
{
class Program
{
private const string connectionString = @"Server=localhost;Database=Demo2;Integrated Security=true";
static void Main(string[] args)
{
CreateDemoFiles();
StreamBLOBToServer().Wait();
StreamTextToServer().Wait();
// Create a CancellationTokenSource that will be cancelled after 100ms
// Typically this token source will be cancelled by a user request (e.g. a Cancel button)
CancellationTokenSource tokenSource = new CancellationTokenSource();
tokenSource.CancelAfter(100);
try
{
CancelBLOBStream(tokenSource.Token).Wait();
}
catch (AggregateException ex)
{
// Cancelling an async operation will throw an exception
// Since we are using the Task's Wait method, this exception will be wrapped in an AggregateException
// If you were using the 'await' keyword, the compiler would take care of unwrapping the AggregateException
// Depending on when the cancellation occurs, you can either get an error from SQL Server or from .Net
if ((ex.InnerException is SqlException) || (ex.InnerException is TaskCanceledException))
{
// This is an expected exception
Console.WriteLine("Got expected exception: {0}", ex.InnerException.Message);
}
else
{
// Did not expect this exception - re-throw it
throw;
}
}
Console.WriteLine("Done");
}
// This is used to generate the files which are used by the other sample methods
private static void CreateDemoFiles()
{
Random rand = new Random();
byte[] data = new byte[1024];
rand.NextBytes(data);
using (FileStream file = File.Open("binarydata.bin", FileMode.Create))
{
file.Write(data, 0, data.Length);
}
using (StreamWriter writer = new StreamWriter(File.Open("textdata.txt", FileMode.Create)))
{
writer.Write(Convert.ToBase64String(data));
}
}
// Application transferring a large BLOB to SQL Server
private static async Task StreamBLOBToServer()
{
using (SqlConnection conn = new SqlConnection(connectionString))
{
await conn.OpenAsync();
using (SqlCommand cmd = new SqlCommand("INSERT INTO [BinaryStreams] (bindata) VALUES (@bindata)", conn))
{
using (FileStream file = File.Open("binarydata.bin", FileMode.Open))
{
// Add a parameter which uses the FileStream we just opened
// Size is set to -1 to indicate "MAX"
cmd.Parameters.Add("@bindata", SqlDbType.Binary, -1).Value = file;
// Send the data to the server asynchronously
await cmd.ExecuteNonQueryAsync();
}
}
}
}
// Application transferring a large Text File to SQL Server
private static async Task StreamTextToServer()
{
using (SqlConnection conn = new SqlConnection(connectionString))
{
await conn.OpenAsync();
using (SqlCommand cmd = new SqlCommand("INSERT INTO [TextStreams] (textdata) VALUES (@textdata)", conn))
{
using (StreamReader file = File.OpenText("textdata.txt"))
{
// Add a parameter which uses the StreamReader we just opened
// Size is set to -1 to indicate "MAX"
cmd.Parameters.Add("@textdata", SqlDbType.NVarChar, -1).Value = file;
// Send the data to the server asynchronously
await cmd.ExecuteNonQueryAsync();
}
}
}
}
// Cancelling the transfer of a large BLOB
private static async Task CancelBLOBStream(CancellationToken cancellationToken)
{
using (SqlConnection conn = new SqlConnection(connectionString))
{
// We can cancel not only sending the data to the server, but also opening the connection
await conn.OpenAsync(cancellationToken);
// Artificially delay the command by 100ms
using (SqlCommand cmd = new SqlCommand("WAITFOR DELAY '00:00:00:100';INSERT INTO [BinaryStreams] (bindata) VALUES (@bindata)", conn))
{
using (FileStream file = File.Open("binarydata.bin", FileMode.Open))
{
// Add a parameter which uses the FileStream we just opened
// Size is set to -1 to indicate "MAX"
cmd.Parameters.Add("@bindata", SqlDbType.Binary, -1).Value = file;
// Send the data to the server asynchronously
// Pass the cancellation token such that the command will be cancelled if needed
await cmd.ExecuteNonQueryAsync(cancellationToken);
}
}
}
}
}
}
Beispiel: Streaming von einer SQL Server-Instanz zu einer anderen SQL Server-Instanz
Dieses Beispiel zeigt, wie ein großes BLOB asynchron von einer SQL Server-Instanz zu einer anderen gestreamt wird, wobei der Abbruch unterstützt wird.
Hinweis
Stellen Sie vor dem Ausführen des folgenden Beispiels sicher, dass die Datenbanken Demo und Demo2 erstellt wurden.
using System;
using System.Data;
using Microsoft.Data.SqlClient;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
namespace StreamingFromServerToAnother
{
class Program
{
private const string connectionString = @"Server=localhost;Database=Demo2;Integrated Security=true";
static void Main(string[] args)
{
// For this example, we don't want to cancel
// So we can pass in a "blank" cancellation token
E2EStream(CancellationToken.None).Wait();
Console.WriteLine("Done");
}
// Streaming from one SQL Server to Another One
private static async Task E2EStream(CancellationToken cancellationToken)
{
using (SqlConnection readConn = new SqlConnection(connectionString))
{
using (SqlConnection writeConn = new SqlConnection(connectionString))
{
// Note that we are using the same cancellation token for calls to both connections\commands
// Also we can start both the connection opening asynchronously, and then wait for both to complete
Task openReadConn = readConn.OpenAsync(cancellationToken);
Task openWriteConn = writeConn.OpenAsync(cancellationToken);
await Task.WhenAll(openReadConn, openWriteConn);
using (SqlCommand readCmd = new SqlCommand("SELECT [bindata] FROM [BinaryStreams]", readConn))
{
using (SqlCommand writeCmd = new SqlCommand("INSERT INTO [BinaryStreamsCopy] (bindata) VALUES (@bindata)", writeConn))
{
// Add an empty parameter to the write command which will be used for the streams we are copying
// Size is set to -1 to indicate "MAX"
SqlParameter streamParameter = writeCmd.Parameters.Add("@bindata", SqlDbType.Binary, -1);
// The reader needs to be executed with the SequentialAccess behavior to enable network streaming
// Otherwise ReadAsync will buffer the entire BLOB into memory which can cause scalability issues or even OutOfMemoryExceptions
using (SqlDataReader reader = await readCmd.ExecuteReaderAsync(CommandBehavior.SequentialAccess, cancellationToken))
{
while (await reader.ReadAsync(cancellationToken))
{
// Grab a stream to the binary data in the source database
using (Stream dataStream = reader.GetStream(0))
{
// Set the parameter value to the stream source that was opened
streamParameter.Value = dataStream;
// Asynchronously send data from one database to another
await writeCmd.ExecuteNonQueryAsync(cancellationToken);
}
}
}
}
}
}
}
}
}
}