Compartilhar via


Acelere a análise de Big Data em tempo real usando o conector do Spark

Aplica-se a:Banco de Dados SQL do AzureInstância Gerenciada de SQL do Azure

Observação

A partir de setembro de 2020, esse conector não recebe manutenção ativa. No entanto, o Conector do Apache Spark para SQL Server e SQL do Azure agora estão disponíveis, com suporte para as associações de Python e R, uma interface mais fácil de usar para inserir dados em massa e muitas outras melhorias. É altamente recomendável que você avalie e use o novo conector em vez desse. As informações sobre o conector antigo (esta página) só são mantidas para fins de arquivamento.

O conector do Spark permite que os bancos de dados no Banco de Dados SQL do Azure, na Instância Gerenciada de SQL do Azure e no SQL Server atuem como o coletor de dados de entrada ou de saída para trabalhos do Spark. Ele permite que você utilize dados transacionais em tempo real na análise de Big Data e persiste resultados para relatórios ou consultas ad hoc. Em comparação ao conector JDBC interno, esse conector permite inserir dados em massa em seu banco de dados. Ele pode superar a inserção de linha por linha com desempenho de 10 a 20 vezes mais rápido. O conector do Spark dá suporte à autenticação com o Microsoft Entra ID (antigo Azure Active Directory) para se conectar ao banco de dados SQL do Azure e à Instância Gerenciada de SQL do Azure, permitindo que você conecte seu banco de dados do Azure Databricks usando sua conta do Microsoft Entra. Ele fornece interfaces semelhantes com o conector interno do JDBC. É muito fácil migrar trabalhos do Spark existentes para usar esse novo conector.

Observação

O Microsoft Entra ID era anteriormente conhecido como Azure Active Directory (Azure AD).

Baixar e compilar um conector do Spark

O repositório do GitHub para o conector antigo anteriormente vinculado dessa página não recebe manutenção ativa. Em vez disso, é altamente recomendável avaliar e usar o novo conector.

Versões oficiais compatíveis

Componente Versão
Apache Spark 2.0.2 ou posterior
Scala 2.10 ou posterior
Microsoft JDBC Driver para SQL Server 6.2 ou posterior
Microsoft SQL Server SQL Server 2008 ou posterior
Banco de Dados SQL do Azure Com suporte
Instância Gerenciada do Azure SQL Com suporte

O conector do Spark utiliza o Driver JDBC da Microsoft para SQL Server para mover dados entre bancos de dados e nós de trabalho do Spark:

O fluxo de dados é o seguinte:

  1. O nó mestre Spark se conecta a bancos de dados no Banco de Dados SQL ou no SQL Server e carrega os dados de uma tabela específica ou usando uma consulta SQL específica.
  2. O nó mestre Spark distribui os dados para nós de trabalho para transformação.
  3. O nó de trabalho se conecta a bancos de dados que se conectam ao Banco de Dados SQL e ao SQL Server e gravam os dados no banco de dados. O usuário pode optar por usar a inserção de linha por linha ou em massa.

O diagrama a seguir ilustra o fluxo de dados.

Diagram shows the described flow, with a master node connecting directly to the database and connecting to three worker nodes, which connect to the database.

Compilar o conector do Spark

Atualmente, o projeto do conector usa maven. Para compilar o conector sem dependências, execute:

  • mvn clean package
  • Baixe as versões mais recentes do JAR da pasta release
  • Incluir o JAR do Spark do Banco de Dados SQL

Conectar e ler dados usando o conector do Spark

Você pode se conectar a bancos de dados no Banco de Dados SQL e no SQL Server de um trabalho do Spark para ler ou gravar dados. Você também pode executar uma consulta DML ou DDL em bancos de dados no Banco de Dados SQL e no SQL Server.

Ler dados do SQL do Azure ou do SQL Server

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val config = Config(Map(
  "url"            -> "mysqlserver.database.windows.net",
  "databaseName"   -> "MyDatabase",
  "dbTable"        -> "dbo.Clients",
  "user"           -> "username",
  "password"       -> "*********",
  "connectTimeout" -> "5", //seconds
  "queryTimeout"   -> "5"  //seconds
))

val collection = sqlContext.read.sqlDB(config)
collection.show()

Ler dados do SQL do Azure e do SQL Server com a consulta SQL especificada

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val config = Config(Map(
  "url"          -> "mysqlserver.database.windows.net",
  "databaseName" -> "MyDatabase",
  "queryCustom"  -> "SELECT TOP 100 * FROM dbo.Clients WHERE PostalCode = 98074" //Sql query
  "user"         -> "username",
  "password"     -> "*********",
))

//Read all data in table dbo.Clients
val collection = sqlContext.read.sqlDB(config)
collection.show()

Gravar dados do SQL do Azure ou no SQL Server

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

// Aquire a DataFrame collection (val collection)

val config = Config(Map(
  "url"          -> "mysqlserver.database.windows.net",
  "databaseName" -> "MyDatabase",
  "dbTable"      -> "dbo.Clients",
  "user"         -> "username",
  "password"     -> "*********"
))

import org.apache.spark.sql.SaveMode
collection.write.mode(SaveMode.Append).sqlDB(config)

Executar a consulta DML ou DDL no SQL do Azure e no SQL Server

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.query._
val query = """
              |UPDATE Customers
              |SET ContactName = 'Alfred Schmidt', City = 'Frankfurt'
              |WHERE CustomerID = 1;
            """.stripMargin

val config = Config(Map(
  "url"          -> "mysqlserver.database.windows.net",
  "databaseName" -> "MyDatabase",
  "user"         -> "username",
  "password"     -> "*********",
  "queryCustom"  -> query
))

sqlContext.sqlDBQuery(config)

Conectar-se no Spark usando a autenticação do Microsoft Entra

Você pode se conectar ao banco de dados SQL e à Instância Gerenciada de SQL usando a autenticação do Microsoft Entra. Use a autenticação do Microsoft Entra para gerenciar centralmente as identidades de usuários do banco de dados e como uma alternativa à autenticação do SQL.

Conectando-se usando o modo de autenticação ActiveDirectoryPassword

Requisitos da instalação

Se você estiver usando o modo de autenticação ActiveDirectoryPassword, será necessário fazer o download de microsoft-authentication-library-for-java e suas dependências, além de incluí-las no caminho de build de Java.

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val config = Config(Map(
  "url"            -> "mysqlserver.database.windows.net",
  "databaseName"   -> "MyDatabase",
  "user"           -> "username",
  "password"       -> "*********",
  "authentication" -> "ActiveDirectoryPassword",
  "encrypt"        -> "true"
))

val collection = sqlContext.read.sqlDB(config)
collection.show()

Como conectar-se usando um token de acesso

Requisitos da instalação

Se você estiver usando o modo de autenticação baseado em token de acesso, será necessário fazer o download de microsoft-authentication-library-for-java e suas dependências, além de incluí-las no caminho de build de Java.

Confira Usar a autenticação do Microsoft Entra para saber como obter um token de acesso para seu banco de dados no banco de dados SQL do Azure ou na Instância Gerenciada de SQL do Azure.

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val config = Config(Map(
  "url"                   -> "mysqlserver.database.windows.net",
  "databaseName"          -> "MyDatabase",
  "accessToken"           -> "access_token",
  "hostNameInCertificate" -> "*.database.windows.net",
  "encrypt"               -> "true"
))

val collection = sqlContext.read.sqlDB(config)
collection.show()

Gravar dados usando a inserção em massa

O conector do jdbc tradicional grava dados no banco de dados usando a inserção linha por linha. Você pode usar o conector do Spark para gravar dados no SQL do Azure e no SQL Server usando a inserção em massa. Ele melhora significativamente o desempenho de gravação durante o carregamento de grandes conjuntos de dados ou ao carregar dados em tabelas em que um índice columnstore é usado.

import com.microsoft.azure.sqldb.spark.bulkcopy.BulkCopyMetadata
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

/**
  Add column Metadata.
  If not specified, metadata is automatically added
  from the destination table, which may suffer performance.
*/
var bulkCopyMetadata = new BulkCopyMetadata
bulkCopyMetadata.addColumnMetadata(1, "Title", java.sql.Types.NVARCHAR, 128, 0)
bulkCopyMetadata.addColumnMetadata(2, "FirstName", java.sql.Types.NVARCHAR, 50, 0)
bulkCopyMetadata.addColumnMetadata(3, "LastName", java.sql.Types.NVARCHAR, 50, 0)

val bulkCopyConfig = Config(Map(
  "url"               -> "mysqlserver.database.windows.net",
  "databaseName"      -> "MyDatabase",
  "user"              -> "username",
  "password"          -> "*********",
  "dbTable"           -> "dbo.Clients",
  "bulkCopyBatchSize" -> "2500",
  "bulkCopyTableLock" -> "true",
  "bulkCopyTimeout"   -> "600"
))

df.bulkCopyToSqlDB(bulkCopyConfig, bulkCopyMetadata)
//df.bulkCopyToSqlDB(bulkCopyConfig) if no metadata is specified.

Próximas etapas

Se ainda não tiver feito isso, baixe o conector do Spark do repositório do GitHub azure-sqldb-spark e explore os recursos adicionais no repositório:

Pode ser útil também ler o Guia do Apache Spark SQL, DataFrames e Conjuntos de dados e a documentação do Azure Databricks.