Streaming support in SqlClient

Applies to: .NET Framework, .NET Standard

Download ADO.NET

Streaming support between SQL Server and an application handles unstructured data on the server (documents, images, and media files). A SQL Server database can store binary large objects (BLOBs), but retrieving BLOBs can use a large amount of memory.

Streaming support to and from SQL Server simplifies writing applications that stream data without having to load the data fully into memory, which results in fewer out-of-memory exceptions.

Streaming support also lets middle-tier applications scale better, especially in scenarios where business objects connect to Azure SQL to send, retrieve, and manipulate large BLOBs.

Warning

The members that support streaming are used to retrieve data from queries and to pass parameters to queries and stored procedures. The streaming feature addresses basic Online Transaction Processing (OLTP) and data migration scenarios, and applies to on-premises and off-premises data migration environments.

Streaming support from SQL Server

Streaming support from SQL Server introduces new functionality in the DbDataReader and SqlDataReader classes to get Stream, XmlReader, and TextReader objects and react to them. These classes are used to retrieve data from queries. As a result, streaming support from SQL Server addresses OLTP scenarios and applies to on-premises and off-premises environments.

Members were added to both SqlDataReader and DbDataReader to enable streaming support from SQL Server. The samples later in this article use the SqlDataReader members IsDBNullAsync, GetStream, GetTextReader, and GetXmlReader.

Streaming support to SQL Server

Streaming support to SQL Server is in the SqlParameter class, so that it can accept and react to XmlReader, Stream, and TextReader objects. SqlParameter is used to pass parameters to queries and stored procedures.

Note

Disposing a SqlCommand object or calling Cancel must cancel any streaming operation. If an application sends a CancellationToken, cancellation is not guaranteed.

The following SqlDbType types accept a Value of Stream:

  • Binary

  • VarBinary

The following SqlDbType types accept a Value of TextReader:

  • Char

  • NChar

  • NVarChar

  • Xml

  • Json

The Xml SqlDbType type accepts a Value of XmlReader.

SqlValue can accept values of type XmlReader, TextReader, and Stream.

The XmlReader, TextReader, or Stream object is transferred up to the value defined by Size.
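The type pairings above can be illustrated with a minimal sketch that passes a Stream, a TextReader, and an XmlReader as parameter values in a single INSERT. It is not taken from the original samples. It assumes the Demo database and Streams table created in the first example below, a local server reachable with integrated security, and small in-memory sources standing in for large files. The class and method names are hypothetical.

```csharp
using System;
using System.Data;
using System.IO;
using System.Threading.Tasks;
using System.Xml;
using Microsoft.Data.SqlClient;

namespace StreamingParameterSketch
{
    class Program
    {
        // Assumption: same server and Demo database as the first sample in this article
        private const string connectionString = @"Server=localhost;Database=Demo;Integrated Security=true";

        static void Main(string[] args)
        {
            InsertStreamedRow().Wait();
            Console.WriteLine("Done");
        }

        // Hypothetical helper: sends one row whose values come from streaming sources
        private static async Task InsertStreamedRow()
        {
            using (SqlConnection conn = new SqlConnection(connectionString))
            using (SqlCommand cmd = new SqlCommand(
                "INSERT INTO [Streams] (textdata, bindata, xmldata) VALUES (@textdata, @bindata, @xmldata)", conn))
            // In-memory stand-ins; a real application would typically open files here
            using (Stream binarySource = new MemoryStream(new byte[] { 0x48, 0x65, 0x6C, 0x6C, 0x6F }))
            using (TextReader textSource = new StringReader("Streamed text"))
            using (XmlReader xmlSource = XmlReader.Create(new StringReader("<test>streamed</test>")))
            {
                await conn.OpenAsync();

                // A TextReader is accepted for NVarChar; Size -1 indicates "MAX"
                cmd.Parameters.Add("@textdata", SqlDbType.NVarChar, -1).Value = textSource;

                // A Stream is accepted for VarBinary; Size -1 indicates "MAX"
                cmd.Parameters.Add("@bindata", SqlDbType.VarBinary, -1).Value = binarySource;

                // An XmlReader is accepted for the Xml type
                cmd.Parameters.Add("@xmldata", SqlDbType.Xml).Value = xmlSource;

                // The sources are read while the command is being sent
                await cmd.ExecuteNonQueryAsync();
            }
        }
    }
}
```

The sources are kept open until ExecuteNonQueryAsync completes, because the parameter values are read during execution rather than when they are assigned.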

Example: streaming from SQL Server

Use the following Transact-SQL to create the sample database:

CREATE DATABASE [Demo]
GO
USE [Demo]
GO
CREATE TABLE [Streams] (
[id] INT PRIMARY KEY IDENTITY(1, 1),
[textdata] NVARCHAR(MAX),
[bindata] VARBINARY(MAX),
[xmldata] XML)
GO
INSERT INTO [Streams] (textdata, bindata, xmldata) VALUES (N'This is a test', 0x48656C6C6F, N'<test>value</test>')
INSERT INTO [Streams] (textdata, bindata, xmldata) VALUES (N'Hello, World!', 0x54657374696E67, N'<test>value2</test>')
INSERT INTO [Streams] (textdata, bindata, xmldata) VALUES (N'Another row', 0x666F6F626172, N'<fff>bbb</fff><fff>bbc</fff>')
GO

The sample shows how to do the following:

  • Avoid blocking a user-interface thread by providing an asynchronous way to retrieve large files.

  • Transfer a large text file from SQL Server in .NET.

  • Transfer a large XML file from SQL Server in .NET.

  • Retrieve data from SQL Server.

  • Transfer large files (BLOBs) from one SQL Server database to another without running out of memory.

using System;
using System.Data;
using Microsoft.Data.SqlClient;
using System.IO;
using System.Threading.Tasks;
using System.Xml;

namespace StreamingFromServer
{
    class Program
    {
        private const string connectionString = @"Server=localhost;Database=Demo;Integrated Security=true";

        static void Main(string[] args)
        {
            CopyBinaryValueToFile().Wait();
            PrintTextValues().Wait();
            PrintXmlValues().Wait();
            PrintXmlValuesViaNVarChar().Wait();

            Console.WriteLine("Done");
        }

        // Application retrieving a large BLOB from SQL Server in .NET 4.5 using the new asynchronous capability
        private static async Task CopyBinaryValueToFile()
        {
            using (SqlConnection connection = new SqlConnection(connectionString))
            {
                await connection.OpenAsync();
                using (SqlCommand command = new SqlCommand("SELECT [bindata] FROM [Streams] WHERE [id]=@id", connection))
                {
                    command.Parameters.AddWithValue("id", 1);

                    // The reader needs to be executed with the SequentialAccess behavior to enable network streaming
                    // Otherwise ReadAsync will buffer the entire BLOB into memory which can cause scalability issues or even OutOfMemoryExceptions
                    using (SqlDataReader reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess))
                    {
                        if (await reader.ReadAsync())
                        {
                            if (!(await reader.IsDBNullAsync(0)))
                            {
                                using (FileStream file = new FileStream("binarydata.bin", FileMode.Create, FileAccess.Write))
                                {
                                    using (Stream data = reader.GetStream(0))
                                    {

                                        // Asynchronously copy the stream from the server to the file we just created
                                        await data.CopyToAsync(file);
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }

        // Application transferring a large Text File from SQL Server
        private static async Task PrintTextValues()
        {
            using (SqlConnection connection = new SqlConnection(connectionString))
            {
                await connection.OpenAsync();
                using (SqlCommand command = new SqlCommand("SELECT [id], [textdata] FROM [Streams]", connection))
                {

                    // The reader needs to be executed with the SequentialAccess behavior to enable network streaming
                    // Otherwise ReadAsync will buffer the entire text document into memory which can cause scalability issues or even OutOfMemoryExceptions
                    using (SqlDataReader reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess))
                    {
                        while (await reader.ReadAsync())
                        {
                            Console.Write("{0}: ", reader.GetInt32(0));

                            if (await reader.IsDBNullAsync(1))
                            {
                                Console.Write("(NULL)");
                            }
                            else
                            {
                                char[] buffer = new char[4096];
                                int charsRead = 0;
                                using (TextReader data = reader.GetTextReader(1))
                                {
                                    do
                                    {
                                        // Grab each chunk of text and write it to the console
                                        // If you are writing to a TextWriter you should use WriteAsync or WriteLineAsync
                                        charsRead = await data.ReadAsync(buffer, 0, buffer.Length);
                                        Console.Write(buffer, 0, charsRead);
                                    } while (charsRead > 0);
                                }
                            }

                            Console.WriteLine();
                        }
                    }
                }
            }
        }

        // Application transferring a large Xml Document from SQL Server
        private static async Task PrintXmlValues()
        {
            using (SqlConnection connection = new SqlConnection(connectionString))
            {
                await connection.OpenAsync();
                using (SqlCommand command = new SqlCommand("SELECT [id], [xmldata] FROM [Streams]", connection))
                {

                    // The reader needs to be executed with the SequentialAccess behavior to enable network streaming
                    // Otherwise ReadAsync will buffer the entire Xml Document into memory which can cause scalability issues or even OutOfMemoryExceptions
                    using (SqlDataReader reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess))
                    {
                        while (await reader.ReadAsync())
                        {
                            Console.WriteLine("{0}: ", reader.GetInt32(0));

                            if (await reader.IsDBNullAsync(1))
                            {
                                Console.WriteLine("\t(NULL)");
                            }
                            else
                            {
                                using (XmlReader xmlReader = reader.GetXmlReader(1))
                                {
                                    int depth = 1;
                                    // NOTE: The XmlReader returned by GetXmlReader does NOT support async operations
                                    // See the example below (PrintXmlValuesViaNVarChar) for how to get an XmlReader with asynchronous capabilities
                                    while (xmlReader.Read())
                                    {
                                        switch (xmlReader.NodeType)
                                        {
                                            case XmlNodeType.Element:
                                                Console.WriteLine("{0}<{1}>", new string('\t', depth), xmlReader.Name);
                                                depth++;
                                                break;
                                            case XmlNodeType.Text:
                                                Console.WriteLine("{0}{1}", new string('\t', depth), xmlReader.Value);
                                                break;
                                            case XmlNodeType.EndElement:
                                                depth--;
                                                Console.WriteLine("{0}</{1}>", new string('\t', depth), xmlReader.Name);
                                                break;
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }

        // Application transferring a large Xml Document from SQL Server
        // This goes via NVarChar and TextReader to enable asynchronous reading
        private static async Task PrintXmlValuesViaNVarChar()
        {
            XmlReaderSettings xmlSettings = new XmlReaderSettings()
            {
                // Async must be explicitly enabled in the XmlReaderSettings otherwise the XmlReader will throw exceptions when async methods are called
                Async = true,
                // Since we will immediately wrap the TextReader we are creating in an XmlReader, we will permit the XmlReader to take care of closing\disposing it
                CloseInput = true,
                // If the Xml you are reading is not a valid document (as per <https://docs.microsoft.com/previous-versions/dotnet/netframework-4.0/6bts1x50(v=vs.100)>) you will need to set the conformance level to Fragment
                ConformanceLevel = ConformanceLevel.Fragment
            };

            using (SqlConnection connection = new SqlConnection(connectionString))
            {
                await connection.OpenAsync();

                // Cast the XML into NVarChar to enable GetTextReader - trying to use GetTextReader on an XML type will throw an exception
                using (SqlCommand command = new SqlCommand("SELECT [id], CAST([xmldata] AS NVARCHAR(MAX)) FROM [Streams]", connection))
                {

                    // The reader needs to be executed with the SequentialAccess behavior to enable network streaming
                    // Otherwise ReadAsync will buffer the entire Xml Document into memory which can cause scalability issues or even OutOfMemoryExceptions
                    using (SqlDataReader reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess))
                    {
                        while (await reader.ReadAsync())
                        {
                            Console.WriteLine("{0}:", reader.GetInt32(0));

                            if (await reader.IsDBNullAsync(1))
                            {
                                Console.WriteLine("\t(NULL)");
                            }
                            else
                            {
                                // Grab the row as a TextReader, then create an XmlReader on top of it
                                // We are not keeping a reference to the TextReader since the XmlReader is created with the "CloseInput" setting (so it will close the TextReader when needed)
                                using (XmlReader xmlReader = XmlReader.Create(reader.GetTextReader(1), xmlSettings))
                                {
                                    int depth = 1;
                                    // The XmlReader above now supports asynchronous operations, so we can use ReadAsync here
                                    while (await xmlReader.ReadAsync())
                                    {
                                        switch (xmlReader.NodeType)
                                        {
                                            case XmlNodeType.Element:
                                                Console.WriteLine("{0}<{1}>", new string('\t', depth), xmlReader.Name);
                                                depth++;
                                                break;
                                            case XmlNodeType.Text:
                                                // Depending on what your data looks like, you should either use Value or GetValueAsync
                                                // Value has less overhead (since it doesn't create a Task), but it may also block if additional data is required
                                                Console.WriteLine("{0}{1}", new string('\t', depth), await xmlReader.GetValueAsync());
                                                break;
                                            case XmlNodeType.EndElement:
                                                depth--;
                                                Console.WriteLine("{0}</{1}>", new string('\t', depth), xmlReader.Name);
                                                break;
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }
}

Example: streaming to SQL Server

Use the following Transact-SQL to create the sample database:

CREATE DATABASE [Demo2]
GO
USE [Demo2]
GO
CREATE TABLE [BinaryStreams] (
[id] INT PRIMARY KEY IDENTITY(1, 1),
[bindata] VARBINARY(MAX))
GO
CREATE TABLE [TextStreams] (
[id] INT PRIMARY KEY IDENTITY(1, 1),
[textdata] NVARCHAR(MAX))
GO
CREATE TABLE [BinaryStreamsCopy] (
[id] INT PRIMARY KEY IDENTITY(1, 1),
[bindata] VARBINARY(MAX))
GO

The sample shows how to do the following:

  • Transfer a large BLOB to SQL Server in .NET.

  • Transfer a large text file to SQL Server in .NET.

  • Use the new asynchronous feature to transfer a large BLOB.

  • Use the new asynchronous feature and the await keyword to transfer a large BLOB.

  • Cancel the transfer of a large BLOB.

  • Stream from one SQL Server to another using the asynchronous feature.

using System;
using System.Data;
using Microsoft.Data.SqlClient;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace StreamingToServer
{
    class Program
    {
        private const string connectionString = @"Server=localhost;Database=Demo2;Integrated Security=true";

        static void Main(string[] args)
        {
            CreateDemoFiles();

            StreamBLOBToServer().Wait();
            StreamTextToServer().Wait();

            // Create a CancellationTokenSource that will be cancelled after 100ms
            // Typically this token source will be cancelled by a user request (e.g. a Cancel button)
            CancellationTokenSource tokenSource = new CancellationTokenSource();
            tokenSource.CancelAfter(100);
            try
            {
                CancelBLOBStream(tokenSource.Token).Wait();
            }
            catch (AggregateException ex)
            {
                // Cancelling an async operation will throw an exception
                // Since we are using the Task's Wait method, this exception will be wrapped in an AggregateException
                // If you were using the 'await' keyword, the compiler would take care of unwrapping the AggregateException
                // Depending on when the cancellation occurs, you can either get an error from SQL Server or from .Net
                if ((ex.InnerException is SqlException) || (ex.InnerException is TaskCanceledException))
                {
                    // This is an expected exception
                    Console.WriteLine("Got expected exception: {0}", ex.InnerException.Message);
                }
                else
                {
                    // Did not expect this exception - re-throw it
                    throw;
                }
            }

            Console.WriteLine("Done");
        }

        // This is used to generate the files which are used by the other sample methods
        private static void CreateDemoFiles()
        {
            Random rand = new Random();
            byte[] data = new byte[1024];
            rand.NextBytes(data);

            using (FileStream file = File.Open("binarydata.bin", FileMode.Create))
            {
                file.Write(data, 0, data.Length);
            }

            using (StreamWriter writer = new StreamWriter(File.Open("textdata.txt", FileMode.Create)))
            {
                writer.Write(Convert.ToBase64String(data));
            }
        }

        // Application transferring a large BLOB to SQL Server
        private static async Task StreamBLOBToServer()
        {
            using (SqlConnection conn = new SqlConnection(connectionString))
            {
                await conn.OpenAsync();
                using (SqlCommand cmd = new SqlCommand("INSERT INTO [BinaryStreams] (bindata) VALUES (@bindata)", conn))
                {
                    using (FileStream file = File.Open("binarydata.bin", FileMode.Open))
                    {

                        // Add a parameter which uses the FileStream we just opened
                        // Size is set to -1 to indicate "MAX"
                        cmd.Parameters.Add("@bindata", SqlDbType.Binary, -1).Value = file;

                        // Send the data to the server asynchronously
                        await cmd.ExecuteNonQueryAsync();
                    }
                }
            }
        }

        // Application transferring a large Text File to SQL Server
        private static async Task StreamTextToServer()
        {
            using (SqlConnection conn = new SqlConnection(connectionString))
            {
                await conn.OpenAsync();
                using (SqlCommand cmd = new SqlCommand("INSERT INTO [TextStreams] (textdata) VALUES (@textdata)", conn))
                {
                    using (StreamReader file = File.OpenText("textdata.txt"))
                    {

                        // Add a parameter which uses the StreamReader we just opened
                        // Size is set to -1 to indicate "MAX"
                        cmd.Parameters.Add("@textdata", SqlDbType.NVarChar, -1).Value = file;

                        // Send the data to the server asynchronously
                        await cmd.ExecuteNonQueryAsync();
                    }
                }
            }
        }

        // Cancelling the transfer of a large BLOB
        private static async Task CancelBLOBStream(CancellationToken cancellationToken)
        {
            using (SqlConnection conn = new SqlConnection(connectionString))
            {
                // We can cancel not only sending the data to the server, but also opening the connection
                await conn.OpenAsync(cancellationToken);

                // Artificially delay the command by 100ms
                using (SqlCommand cmd = new SqlCommand("WAITFOR DELAY '00:00:00:100';INSERT INTO [BinaryStreams] (bindata) VALUES (@bindata)", conn))
                {
                    using (FileStream file = File.Open("binarydata.bin", FileMode.Open))
                    {

                        // Add a parameter which uses the FileStream we just opened
                        // Size is set to -1 to indicate "MAX"
                        cmd.Parameters.Add("@bindata", SqlDbType.Binary, -1).Value = file;

                        // Send the data to the server asynchronously
                        // Pass the cancellation token such that the command will be cancelled if needed
                        await cmd.ExecuteNonQueryAsync(cancellationToken);
                    }
                }
            }
        }
    }
}

Example: streaming from one SQL Server to another SQL Server

This sample shows how to asynchronously stream a large BLOB from one SQL Server to another, with support for cancellation.

Note

Before you run the following sample, make sure the Demo and Demo2 databases have been created.

using System;
using System.Data;
using Microsoft.Data.SqlClient;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace StreamingFromServerToAnother
{
    class Program
    {
        private const string connectionString = @"Server=localhost;Database=Demo2;Integrated Security=true";

        static void Main(string[] args)
        {
            // For this example, we don't want to cancel
            // So we can pass in a "blank" cancellation token
            E2EStream(CancellationToken.None).Wait();

            Console.WriteLine("Done");
        }

        // Streaming from one SQL Server to Another One
        private static async Task E2EStream(CancellationToken cancellationToken)
        {
            using (SqlConnection readConn = new SqlConnection(connectionString))
            {
                using (SqlConnection writeConn = new SqlConnection(connectionString))
                {

                    // Note that we are using the same cancellation token for calls to both connections\commands
                    // Also we can start both the connection opening asynchronously, and then wait for both to complete
                    Task openReadConn = readConn.OpenAsync(cancellationToken);
                    Task openWriteConn = writeConn.OpenAsync(cancellationToken);
                    await Task.WhenAll(openReadConn, openWriteConn);

                    using (SqlCommand readCmd = new SqlCommand("SELECT [bindata] FROM [BinaryStreams]", readConn))
                    {
                        using (SqlCommand writeCmd = new SqlCommand("INSERT INTO [BinaryStreamsCopy] (bindata) VALUES (@bindata)", writeConn))
                        {

                            // Add an empty parameter to the write command which will be used for the streams we are copying
                            // Size is set to -1 to indicate "MAX"
                            SqlParameter streamParameter = writeCmd.Parameters.Add("@bindata", SqlDbType.Binary, -1);

                            // The reader needs to be executed with the SequentialAccess behavior to enable network streaming
                            // Otherwise ReadAsync will buffer the entire BLOB into memory which can cause scalability issues or even OutOfMemoryExceptions
                            using (SqlDataReader reader = await readCmd.ExecuteReaderAsync(CommandBehavior.SequentialAccess, cancellationToken))
                            {
                                while (await reader.ReadAsync(cancellationToken))
                                {
                                    // Grab a stream to the binary data in the source database
                                    using (Stream dataStream = reader.GetStream(0))
                                    {

                                        // Set the parameter value to the stream source that was opened
                                        streamParameter.Value = dataStream;

                                        // Asynchronously send data from one database to another
                                        await writeCmd.ExecuteNonQueryAsync(cancellationToken);
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }
}
