SqlClient Streaming Support

Streaming support between SQL Server and an application (new in .NET Framework 4.5) applies to unstructured data on the server (documents, images, and media files). A SQL Server database can store binary large objects (BLOBs), but retrieving BLOBs can use a lot of memory.

Streaming support to and from SQL Server simplifies writing applications that stream data, because the data does not have to be fully loaded into memory. This results in fewer out-of-memory exceptions.

Streaming support also enables middle-tier applications to scale better, especially in scenarios where business objects connect to SQL Azure to send, retrieve, and manipulate large BLOBs.

Warning

Asynchronous calls are not supported if an application also uses the Context Connection connection string keyword.

The members added to support streaming are used to retrieve data from queries and to pass parameters to queries and stored procedures. The streaming feature addresses basic OLTP and data migration scenarios and is applicable to on-premises and off-premises data migration environments.

Streaming Support from SQL Server

Streaming support from SQL Server introduces new functionality in the DbDataReader and SqlDataReader classes to get Stream, XmlReader, and TextReader objects and react to them. These classes are used to retrieve data from queries. As a result, streaming support from SQL Server addresses OLTP scenarios and applies to on-premises and off-premises environments.

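The following members were added to SqlDataReader to enable streaming support from SQL Server:

  • IsDBNullAsync
  • GetFieldValue
  • GetFieldValueAsync
  • GetStream
  • GetTextReader
  • GetXmlReader

The following members were added to DbDataReader to enable streaming support from SQL Server:

  • GetFieldValue
  • GetStream
  • GetTextReader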
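
Because GetFieldValue, GetStream, and GetTextReader are defined on DbDataReader, code written against the base class can use them with any provider. The following is a minimal sketch, not part of the original samples. It assumes an in-memory DataTableReader as a stand-in for a SqlDataReader so that it runs without a server; a DataTableReader holds its data in memory, so it shows only the shape of the calls, not network streaming. The class name and the helper methods BuildSampleTable, ReadText, and ReadBytes are hypothetical.

```csharp
using System;
using System.Data;
using System.Data.Common;
using System.IO;
using System.Text;

static class DbDataReaderStreamingSketch {
   // Hypothetical helper: an in-memory table shaped like part of the [Streams] sample table.
   public static DataTable BuildSampleTable() {
      DataTable table = new DataTable("Streams");
      table.Columns.Add("id", typeof(int));
      table.Columns.Add("textdata", typeof(string));
      table.Columns.Add("bindata", typeof(byte[]));
      table.Rows.Add(1, "This is a test", new byte[] { 0x48, 0x65, 0x6C, 0x6C, 0x6F });
      return table;
   }

   // Reads a text column through GetTextReader in small chunks instead of one large string.
   public static string ReadText(DbDataReader reader, int ordinal) {
      StringBuilder result = new StringBuilder();
      char[] buffer = new char[4];
      using (TextReader text = reader.GetTextReader(ordinal)) {
         int charsRead;
         while ((charsRead = text.Read(buffer, 0, buffer.Length)) > 0) {
            result.Append(buffer, 0, charsRead);
         }
      }
      return result.ToString();
   }

   // Copies a binary column through GetStream into a byte array.
   public static byte[] ReadBytes(DbDataReader reader, int ordinal) {
      using (Stream data = reader.GetStream(ordinal))
      using (MemoryStream copy = new MemoryStream()) {
         data.CopyTo(copy);
         return copy.ToArray();
      }
   }

   static void Main() {
      // Assumption: DataTableReader stands in for SqlDataReader; both derive from DbDataReader.
      using (DbDataReader reader = BuildSampleTable().CreateDataReader()) {
         while (reader.Read()) {
            // Columns are read in ordinal order, as SequentialAccess would require.
            int id = reader.GetFieldValue<int>(0);
            string text = ReadText(reader, 1);
            byte[] bytes = ReadBytes(reader, 2);
            Console.WriteLine("{0}: {1} ({2} bytes)", id, text, bytes.Length);
         }
      }
   }
}
```

With a real SqlDataReader, the same helpers would be called on a reader opened with CommandBehavior.SequentialAccess, as the samples later in this article do.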

Streaming Support to SQL Server

Streaming support to SQL Server introduces new functionality in the SqlParameter class so that it can accept and react to XmlReader, Stream, and TextReader objects. SqlParameter is used to pass parameters to queries and stored procedures.

Disposing a SqlCommand object or calling Cancel must cancel any streaming operation. If an application sends a CancellationToken, cancellation is not guaranteed.

The following SqlDbType types accept a Value of Stream:

  • Binary
  • VarBinary

The following SqlDbType types accept a Value of TextReader:

  • Char
  • NChar
  • NVarChar
  • Xml

The Xml SqlDbType type accepts a Value of XmlReader.

SqlValue can accept values of type XmlReader, TextReader, and Stream.

The XmlReader, TextReader, and Stream objects are transferred up to the value defined by Size.
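
The type pairings above can be sketched as parameter construction alone, with no connection involved. This is an illustrative sketch, assuming a project that references System.Data on .NET Framework 4.5 or later. The class name and the three factory methods are hypothetical, and the parameter names simply mirror the sample tables used later in this article. Nothing is sent to a server here; the values would only be streamed when a command that owns the parameter is executed.

```csharp
using System;
using System.Data;
using System.Data.SqlClient;
using System.IO;
using System.Xml;

static class StreamingParameterSketch {
   // Stream value paired with VarBinary. A Size of -1 indicates "MAX".
   public static SqlParameter BinaryParameter(Stream source) {
      return new SqlParameter("@bindata", SqlDbType.VarBinary, -1) { Value = source };
   }

   // TextReader value paired with NVarChar. Per the text above, the transfer
   // stops at Size, so maxChars caps the amount sent (-1 for "MAX").
   public static SqlParameter TextParameter(TextReader source, int maxChars) {
      return new SqlParameter("@textdata", SqlDbType.NVarChar, maxChars) { Value = source };
   }

   // XmlReader value paired with the Xml type.
   public static SqlParameter XmlParameter(XmlReader source) {
      return new SqlParameter("@xmldata", SqlDbType.Xml) { Value = source };
   }

   static void Main() {
      using (MemoryStream bytes = new MemoryStream(new byte[] { 1, 2, 3 }))
      using (StringReader text = new StringReader("Hello, World!"))
      using (XmlReader xml = XmlReader.Create(new StringReader("<test>value</test>"))) {
         SqlParameter[] parameters = {
            BinaryParameter(bytes),
            TextParameter(text, -1),
            XmlParameter(xml)
         };

         foreach (SqlParameter parameter in parameters) {
            Console.WriteLine("{0}: {1}, Size={2}, value type={3}",
               parameter.ParameterName, parameter.SqlDbType, parameter.Size,
               parameter.Value.GetType().Name);
         }
      }
   }
}
```

The source objects must stay open until the command has finished executing, which is why the samples below keep the file open in a using block around ExecuteNonQueryAsync.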

Example: Streaming from SQL Server

Use the following Transact-SQL to create the sample database:

CREATE DATABASE [Demo]
GO
USE [Demo]
GO
CREATE TABLE [Streams] (
[id] INT PRIMARY KEY IDENTITY(1, 1),
[textdata] NVARCHAR(MAX),
[bindata] VARBINARY(MAX),
[xmldata] XML)
GO
INSERT INTO [Streams] (textdata, bindata, xmldata) VALUES (N'This is a test', 0x48656C6C6F, N'<test>value</test>')
INSERT INTO [Streams] (textdata, bindata, xmldata) VALUES (N'Hello, World!', 0x54657374696E67, N'<test>value2</test>')
INSERT INTO [Streams] (textdata, bindata, xmldata) VALUES (N'Another row', 0x666F6F626172, N'<fff>bbb</fff><fff>bbc</fff>')
GO

The following sample shows how to do the following:

  • Avoid blocking a user-interface thread by providing an asynchronous way to retrieve large files.
  • Transfer a large text file from SQL Server in .NET Framework 4.5.
  • Transfer a large XML file from SQL Server in .NET Framework 4.5.
  • Retrieve data from SQL Server.
  • Transfer large files (BLOBs) from one SQL Server database to another without running out of memory.

using System;
using System.Data;
using System.Data.SqlClient;
using System.IO;
using System.Threading.Tasks;
using System.Xml;

namespace StreamingFromServer {
   class Program {
      private const string connectionString = @"...";

      static void Main(string[] args) {
         CopyBinaryValueToFile().Wait();
         PrintTextValues().Wait();
         PrintXmlValues().Wait();
         PrintXmlValuesViaNVarChar().Wait();

         Console.WriteLine("Done");
      }

      // Retrieve a large BLOB from SQL Server in .NET Framework 4.5 using the asynchronous capability.
      private static async Task CopyBinaryValueToFile() {
         string filePath = Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.MyDocuments), "binarydata.bin");

         using (SqlConnection connection = new SqlConnection(connectionString)) {
            await connection.OpenAsync();
            using (SqlCommand command = new SqlCommand("SELECT [bindata] FROM [Streams] WHERE [id]=@id", connection)) {
               command.Parameters.AddWithValue("id", 1);

               // The reader needs to be executed with the SequentialAccess behavior to enable network streaming.
               // Otherwise ReadAsync will buffer the entire BLOB into memory which can cause scalability issues or even OutOfMemoryExceptions.
               using (SqlDataReader reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess)) {
                  if (await reader.ReadAsync()) {
                     if (!(await reader.IsDBNullAsync(0))) {
                        using (FileStream file = new FileStream(filePath, FileMode.Create, FileAccess.Write)) {
                           using (Stream data = reader.GetStream(0)) {

                              // Asynchronously copy the stream from the server to the file we just created.
                              await data.CopyToAsync(file);
                           }
                        }
                     }
                  }
               }
            }
         }
      }

      // Transfer a large Text File from SQL Server in .NET Framework 4.5.
      private static async Task PrintTextValues() {
         using (SqlConnection connection = new SqlConnection(connectionString)) {
            await connection.OpenAsync();
            using (SqlCommand command = new SqlCommand("SELECT [id], [textdata] FROM [Streams]", connection)) {

               // The reader needs to be executed with the SequentialAccess behavior to enable network streaming.
               // Otherwise ReadAsync will buffer the entire text document into memory which can cause scalability issues or even OutOfMemoryExceptions.
               using (SqlDataReader reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess)) {
                  while (await reader.ReadAsync()) {
                     Console.Write("{0}: ", reader.GetInt32(0));

                     if (await reader.IsDBNullAsync(1)) {
                        Console.Write("(NULL)");
                     }
                     else {
                        char[] buffer = new char[4096];
                        int charsRead = 0;
                        using (TextReader data = reader.GetTextReader(1)) {
                           do {
                              // Grab each chunk of text and write it to the console.
                              // If you are writing to a TextWriter, you should use WriteAsync or WriteLineAsync.
                              charsRead = await data.ReadAsync(buffer, 0, buffer.Length);
                              Console.Write(buffer, 0, charsRead);
                           } while (charsRead > 0);
                        }
                     }

                     Console.WriteLine();
                  }
               }
            }
         }
      }

      // Transfer a large Xml Document from SQL Server in .NET Framework 4.5.
      private static async Task PrintXmlValues() {
         using (SqlConnection connection = new SqlConnection(connectionString)) {
            await connection.OpenAsync();
            using (SqlCommand command = new SqlCommand("SELECT [id], [xmldata] FROM [Streams]", connection)) {

               // The reader needs to be executed with the SequentialAccess behavior to enable network streaming.
               // Otherwise ReadAsync will buffer the entire Xml Document into memory which can cause scalability issues or even OutOfMemoryExceptions.
               using (SqlDataReader reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess)) {
                  while (await reader.ReadAsync()) {
                     Console.WriteLine("{0}: ", reader.GetInt32(0));

                     if (await reader.IsDBNullAsync(1)) {
                        Console.WriteLine("\t(NULL)");
                     }
                     else {
                        using (XmlReader xmlReader = reader.GetXmlReader(1)) {
                           int depth = 1;
                           // NOTE: The XmlReader returned by GetXmlReader does NOT support async operations.
                           // See the example below (PrintXmlValuesViaNVarChar) for how to get an XmlReader with asynchronous capabilities.
                           while (xmlReader.Read()) {
                              switch (xmlReader.NodeType) {
                                 case XmlNodeType.Element:
                                    Console.WriteLine("{0}<{1}>", new string('\t', depth), xmlReader.Name);
                                    depth++;
                                    break;
                                 case XmlNodeType.Text:
                                    Console.WriteLine("{0}{1}", new string('\t', depth), xmlReader.Value);
                                    break;
                                 case XmlNodeType.EndElement:
                                    depth--;
                                    Console.WriteLine("{0}</{1}>", new string('\t', depth), xmlReader.Name);
                                    break;
                              }
                           }
                        }
                     }
                  }
               }
            }
         }
      }

      // Transfer a large Xml Document from SQL Server in .NET Framework 4.5.
      // This goes via NVarChar and TextReader to enable asynchronous reading.
      private static async Task PrintXmlValuesViaNVarChar() {
         XmlReaderSettings xmlSettings = new XmlReaderSettings() {
            // Async must be explicitly enabled in the XmlReaderSettings otherwise the XmlReader will throw exceptions when async methods are called.
            Async = true,
            // Since we will immediately wrap the TextReader we are creating in an XmlReader, we will permit the XmlReader to take care of closing\disposing it.
            CloseInput = true,
            // If the Xml you are reading is not a valid document (as per <https://learn.microsoft.com/previous-versions/dotnet/netframework-4.0/6bts1x50(v=vs.100)>) you will need to set the conformance level to Fragment.
            ConformanceLevel = ConformanceLevel.Fragment
         };

         using (SqlConnection connection = new SqlConnection(connectionString)) {
            await connection.OpenAsync();

            // Cast the XML into NVarChar to enable GetTextReader - trying to use GetTextReader on an XML type will throw an exception.
            using (SqlCommand command = new SqlCommand("SELECT [id], CAST([xmldata] AS NVARCHAR(MAX)) FROM [Streams]", connection)) {

               // The reader needs to be executed with the SequentialAccess behavior to enable network streaming.
               // Otherwise ReadAsync will buffer the entire Xml Document into memory which can cause scalability issues or even OutOfMemoryExceptions.
               using (SqlDataReader reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess)) {
                  while (await reader.ReadAsync()) {
                     Console.WriteLine("{0}:", reader.GetInt32(0));

                     if (await reader.IsDBNullAsync(1)) {
                        Console.WriteLine("\t(NULL)");
                     }
                     else {
                        // Grab the row as a TextReader, then create an XmlReader on top of it.
                        // The code doesn't keep a reference to the TextReader since the XmlReader is created with the "CloseInput" setting (so it will close the TextReader when needed).
                        using (XmlReader xmlReader = XmlReader.Create(reader.GetTextReader(1), xmlSettings)) {
                           int depth = 1;
                           // The XmlReader above now supports asynchronous operations, so we can use ReadAsync here.
                           while (await xmlReader.ReadAsync()) {
                              switch (xmlReader.NodeType) {
                                 case XmlNodeType.Element:
                                    Console.WriteLine("{0}<{1}>", new string('\t', depth), xmlReader.Name);
                                    depth++;
                                    break;
                                 case XmlNodeType.Text:
                                    // Depending on what your data looks like, you should either use Value or GetValueAsync.
                                    // Value has less overhead (since it doesn't create a Task), but it may also block if additional data is required.
                                    Console.WriteLine("{0}{1}", new string('\t', depth), await xmlReader.GetValueAsync());
                                    break;
                                 case XmlNodeType.EndElement:
                                    depth--;
                                    Console.WriteLine("{0}</{1}>", new string('\t', depth), xmlReader.Name);
                                    break;
                              }
                           }
                        }
                     }
                  }
               }
            }
         }
      }
   }
}

Example: Streaming to SQL Server

Use the following Transact-SQL to create the sample database:

CREATE DATABASE [Demo2]
GO
USE [Demo2]
GO
CREATE TABLE [BinaryStreams] (
[id] INT PRIMARY KEY IDENTITY(1, 1),
[bindata] VARBINARY(MAX))
GO
CREATE TABLE [TextStreams] (
[id] INT PRIMARY KEY IDENTITY(1, 1),
[textdata] NVARCHAR(MAX))
GO
CREATE TABLE [BinaryStreamsCopy] (
[id] INT PRIMARY KEY IDENTITY(1, 1),
[bindata] VARBINARY(MAX))
GO

The following sample shows how to do the following:

  • Transfer a large BLOB to SQL Server in .NET Framework 4.5.
  • Transfer a large text file to SQL Server in .NET Framework 4.5.
  • Use the new asynchronous feature to transfer a large BLOB.
  • Use the new asynchronous feature and the await keyword to transfer a large BLOB.
  • Cancel the transfer of a large BLOB.
  • Stream from one SQL Server to another using the new asynchronous feature.

using System;
using System.Data;
using System.Data.SqlClient;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace StreamingToServer {
   class Program {
      private const string connectionString = @"...";

      static void Main(string[] args) {
         CreateDemoFiles();

         StreamBLOBToServer().Wait();
         StreamTextToServer().Wait();

         // Create a CancellationTokenSource that will be cancelled after 100ms
         // Typically this token source will be cancelled by a user request (e.g. a Cancel button).
         CancellationTokenSource tokenSource = new CancellationTokenSource();
         tokenSource.CancelAfter(100);
         try {
            CancelBLOBStream(tokenSource.Token).Wait();
         }
         catch (AggregateException ex) {
            // Cancelling an async operation will throw an exception.
            // Since we are using the Task's Wait method, this exception will be wrapped in an AggregateException.
            // If you were using the 'await' keyword, the compiler would take care of unwrapping the AggregateException.
            // Depending on when the cancellation occurs, you can either get an error from SQL Server or from .Net.
            if ((ex.InnerException is SqlException) || (ex.InnerException is TaskCanceledException)) {
               // This is an expected exception.
               Console.WriteLine("Got expected exception: {0}", ex.InnerException.Message);
            }
            else {
               // Did not expect this exception - rethrow it.
               throw;
            }
         }

         Console.WriteLine("Done");
      }

      // This is used to generate the files which are used by the other sample methods.
      private static void CreateDemoFiles() {
         Random rand = new Random();
         byte[] data = new byte[1024];
         rand.NextBytes(data);

         using (FileStream file = File.Open("binarydata.bin", FileMode.Create)) {
            file.Write(data, 0, data.Length);
         }

         using (StreamWriter writer = new StreamWriter(File.Open("textdata.txt", FileMode.Create))) {
            writer.Write(Convert.ToBase64String(data));
         }
      }

      // Transfer a large BLOB to SQL Server in .NET Framework 4.5.
      private static async Task StreamBLOBToServer() {
         using (SqlConnection conn = new SqlConnection(connectionString)) {
            await conn.OpenAsync();
            using (SqlCommand cmd = new SqlCommand("INSERT INTO [BinaryStreams] (bindata) VALUES (@bindata)", conn)) {
               using (FileStream file = File.Open("binarydata.bin", FileMode.Open)) {

                  // Add a parameter which uses the FileStream we just opened
                  // Size is set to -1 to indicate "MAX".
                  cmd.Parameters.Add("@bindata", SqlDbType.Binary, -1).Value = file;

                  // Send the data to the server asynchronously.
                  await cmd.ExecuteNonQueryAsync();
               }
            }
         }
      }

      // Transfer a large Text File to SQL Server in .NET Framework 4.5.
      private static async Task StreamTextToServer() {
         using (SqlConnection conn = new SqlConnection(connectionString)) {
            await conn.OpenAsync();
            using (SqlCommand cmd = new SqlCommand("INSERT INTO [TextStreams] (textdata) VALUES (@textdata)", conn)) {
               using (StreamReader file = File.OpenText("textdata.txt")) {

                  // Add a parameter which uses the StreamReader we just opened.
                  // Size is set to -1 to indicate "MAX".
                  cmd.Parameters.Add("@textdata", SqlDbType.NVarChar, -1).Value = file;

                  // Send the data to the server asynchronously.
                  await cmd.ExecuteNonQueryAsync();
               }
            }
         }
      }

      // Cancel the transfer of a large BLOB.
      private static async Task CancelBLOBStream(CancellationToken cancellationToken) {
         using (SqlConnection conn = new SqlConnection(connectionString)) {
            // We can cancel not only sending the data to the server, but also opening the connection.
            await conn.OpenAsync(cancellationToken);

            // Artificially delay the command by 100ms.
            using (SqlCommand cmd = new SqlCommand("WAITFOR DELAY '00:00:00:100';INSERT INTO [BinaryStreams] (bindata) VALUES (@bindata)", conn)) {
               using (FileStream file = File.Open("binarydata.bin", FileMode.Open)) {

                  // Add a parameter which uses the FileStream we just opened.
                  // Size is set to -1 to indicate "MAX".
                  cmd.Parameters.Add("@bindata", SqlDbType.Binary, -1).Value = file;

                  // Send the data to the server asynchronously.
                  // Pass the cancellation token such that the command will be cancelled if needed.
                  await cmd.ExecuteNonQueryAsync(cancellationToken);
               }
            }
         }
      }
   }
}

Example: Streaming from One SQL Server to Another SQL Server

This sample demonstrates how to asynchronously stream a large BLOB from one SQL Server to another, with support for cancellation.

using System;
using System.Data;
using System.Data.SqlClient;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace StreamingFromServerToAnother {
   class Program {
      private const string connectionString = @"...";

      static void Main(string[] args) {
         // For this example, we don't want to cancel,
         // so pass in a "blank" cancellation token.
         E2EStream(CancellationToken.None).Wait();

         Console.WriteLine("Done");
      }

      // Stream from one SQL Server to another using the new asynchronous features in .NET Framework 4.5.
      private static async Task E2EStream(CancellationToken cancellationToken) {
         using (SqlConnection readConn = new SqlConnection(connectionString)) {
            using (SqlConnection writeConn = new SqlConnection(connectionString)) {

               // Note that we are using the same cancellation token for calls to both connections\commands.
               // Also we can start both the connection opening asynchronously, and then wait for both to complete.
               Task openReadConn = readConn.OpenAsync(cancellationToken);
               Task openWriteConn = writeConn.OpenAsync(cancellationToken);
               await Task.WhenAll(openReadConn, openWriteConn);

               using (SqlCommand readCmd = new SqlCommand("SELECT [bindata] FROM [BinaryStreams]", readConn)) {
                  using (SqlCommand writeCmd = new SqlCommand("INSERT INTO [BinaryStreamsCopy] (bindata) VALUES (@bindata)", writeConn)) {

                     // Add an empty parameter to the write command which will be used for the streams we are copying.
                     // Size is set to -1 to indicate "MAX".
                     SqlParameter streamParameter = writeCmd.Parameters.Add("@bindata", SqlDbType.Binary, -1);

                     // The reader needs to be executed with the SequentialAccess behavior to enable network streaming.
                     // Otherwise ReadAsync will buffer the entire BLOB into memory which can cause scalability issues or even OutOfMemoryExceptions.
                     using (SqlDataReader reader = await readCmd.ExecuteReaderAsync(CommandBehavior.SequentialAccess, cancellationToken)) {
                        while (await reader.ReadAsync(cancellationToken)) {
                           // Grab a stream to the binary data in the source database.
                           using (Stream dataStream = reader.GetStream(0)) {

                              // Set the parameter value to the stream source that was opened.
                              streamParameter.Value = dataStream;

                              // Asynchronously send data from one database to another.
                              await writeCmd.ExecuteNonQueryAsync(cancellationToken);
                           }
                        }
                     }
                  }
               }
            }
         }
      }
   }
}
