evento
31/03, 23 - 2/04, 23
O maior evento de aprendizagem de Malha, Power BI e SQL. 31 de março a 2 de abril. Use o código FABINSIDER para economizar $400.
Registe-se hoje mesmoEste browser já não é suportado.
Atualize para o Microsoft Edge para tirar partido das mais recentes funcionalidades, atualizações de segurança e de suporte técnico.
Importante
Este conector pode ser utilizado na Análise em Tempo Real no Microsoft Fabric. Utilize as instruções neste artigo com as seguintes exceções:
O Apache Spark é um motor de análise unificado para processamento de dados em larga escala. O Azure Data Explorer é um serviço de análise de dados rápido e totalmente gerido que permite realizar análises em tempo real em grandes volumes de dados.
O conector do Azure Data Explorer para o Spark é um projeto open source que pode ser executado em qualquer cluster do Spark. Implementa a origem de dados e o sink de dados para mover dados em clusters do Azure Data Explorer e do Spark. Com o Azure Data Explorer e o Apache Spark, pode criar aplicações rápidas e dimensionáveis direcionadas para cenários orientados por dados. Por exemplo, machine learning (ML), Extract-Transform-Load (ETL) e Log Analytics. Com o conector, o Azure Data Explorer torna-se um arquivo de dados válido para operações padrão de origem e sink do Spark, como escrita, leitura e writeStream.
Pode escrever no Azure Data Explorer através da ingestão em fila ou da ingestão de transmissão em fluxo. A leitura do Azure Data Explorer suporta a eliminação de colunas e o pushdown predicado, que filtra os dados no Azure Data Explorer, reduzindo o volume de dados transferidos.
Nota
Para obter informações sobre como trabalhar com o conector do Synapse Spark para o Azure Data Explorer, consulte Ligar ao Azure Data Explorer com o Apache Spark para Azure Synapse Analytics.
Este tópico descreve como instalar e configurar o conector do Azure Data Explorer Spark e mover dados entre clusters do Azure Data Explorer e do Apache Spark.
Nota
Embora alguns dos exemplos abaixo se refira a um cluster do Azure Databricks Spark, o conector do Azure Data Explorer Spark não assume dependências diretas no Databricks ou em qualquer outra distribuição do Spark.
Gorjeta
As versões do Spark 2.3.x também são suportadas, mas podem exigir algumas alterações nas dependências pom.xml.
A partir da versão 2.3.0, introduzimos novos IDs de artefactos que substituem spark-kusto-connector: kusto-spark_3.0_2.12 destinados ao Spark 3.x e Scala 2.12 e kusto-spark_2.4_2.11 destinados ao Spark 2.4.x e scala 2.11.
Nota
As versões anteriores à versão 2.5.1 já não funcionam para ingerir numa tabela existente. Atualize para uma versão posterior. Este passo é opcional. Se estiver a utilizar bibliotecas pré-criadas, por exemplo, o Maven, veja Configuração do cluster do Spark.
Se não estiver a utilizar bibliotecas pré-criadas, terá de instalar as bibliotecas listadas em dependências , incluindo as seguintes bibliotecas do SDK Java kusto . Para encontrar a versão certa para instalar, procure o pom da versão relevante:
Veja esta origem para criar o Conector spark.
Para aplicações Scala/Java com definições de projeto do Maven, ligue a sua aplicação ao seguinte artefacto (a versão mais recente pode ser diferente):
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>kusto-spark_3.0_2.12</artifactId>
<version>2.5.1</version>
</dependency>
Para criar o jar e executar todos os testes:
mvn clean package
Para criar o jar, execute todos os testes e instale o jar no repositório maven local:
mvn clean install
Para obter mais informações, veja Utilização do conector.
Nota
Recomenda-se que utilize a versão mais recente do conector do Azure Data Explorer Spark ao executar os seguintes passos.
Configure as seguintes definições de cluster do Spark, com base no cluster do Azure Databricks com o Spark 2.4.4 e o Scala 2.11 ou Spark 3.0.1 e Scala 2.12:
Instale a biblioteca spark-kusto-connector mais recente a partir do Maven:
Verifique se todas as bibliotecas necessárias estão instaladas:
Para instalação com um ficheiro JAR, verifique se foram instaladas dependências adicionais:
O conector do Azure Data Explorer Spark permite-lhe autenticar com Microsoft Entra ID com um dos seguintes métodos:
Microsoft Entra autenticação de aplicações é o método de autenticação mais simples e comum e é recomendado para o conector do Azure Data Explorer Spark.
Propriedades | Cadeia de Opções | Description |
---|---|---|
KUSTO_AAD_APP_ID | kustoAadAppId | Microsoft Entra identificador da aplicação (cliente). |
KUSTO_AAD_AUTHORITY_ID | kustoAadAuthorityID | Microsoft Entra autoridade de autenticação. Microsoft Entra ID do Diretório (inquilino). Opcional – predefinição para microsoft.com. Para obter mais informações, veja Microsoft Entra autoridade. |
KUSTO_AAD_APP_SECRET | kustoAadAppSecret | Microsoft Entra chave da aplicação para o cliente. |
Nota
As versões da API mais antigas (menos de 2.0.0) têm a seguinte nomenclatura: "kustoAADClientID", "kustoClientAADClientPassword", "kustoAADAuthorityID"
Conceda os seguintes privilégios num cluster do Azure Data Explorer:
Para obter mais informações sobre as funções principais do Azure Data Explorer, veja Controlo de acesso baseado em funções. Para gerir funções de segurança, veja Gestão de funções de segurança.
Configurar parâmetros de sink:
val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId")
val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey")
val appId = KustoSparkTestAppId
val appKey = KustoSparkTestAppKey
val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com
val cluster = "Sparktest.eastus2"
val database = "TestDb"
val table = "StringAndIntTable"
Escreva DataFrame do Spark no cluster do Azure Data Explorer como lote:
import com.microsoft.kusto.spark.datasink.KustoSinkOptions
import org.apache.spark.sql.{SaveMode, SparkSession}
df.write
.format("com.microsoft.kusto.spark.datasource")
.option(KustoSinkOptions.KUSTO_CLUSTER, cluster)
.option(KustoSinkOptions.KUSTO_DATABASE, database)
.option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark")
.option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId)
.option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey)
.option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId)
.option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist")
.mode(SaveMode.Append)
.save()
Em alternativa, utilize a sintaxe simplificada:
import com.microsoft.kusto.spark.datasink.SparkIngestionProperties
import com.microsoft.kusto.spark.sql.extension.SparkExtension._
val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed
df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
Escrever dados de transmissão em fluxo:
import org.apache.spark.sql.streaming.Trigger
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeUnit
import org.apache.spark.sql.streaming.Trigger
// Set up a checkpoint and disable codeGen.
spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint")
// Write to a Kusto table from a streaming source
val kustoQ = df
.writeStream
.format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
.options(conf)
.trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database
.start()
Ao ler pequenas quantidades de dados, defina a consulta de dados:
import com.microsoft.kusto.spark.datasource.KustoSourceOptions
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import com.microsoft.azure.kusto.data.ClientRequestProperties
val query = s"$table | where (ColB % 1000 == 0) | distinct ColA"
val conf: Map[String, String] = Map(
KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
)
val df = spark.read.format("com.microsoft.kusto.spark.datasource").
options(conf).
option(KustoSourceOptions.KUSTO_QUERY, query).
option(KustoSourceOptions.KUSTO_DATABASE, database).
option(KustoSourceOptions.KUSTO_CLUSTER, cluster).
load()
// Simplified syntax flavor
import com.microsoft.kusto.spark.sql.extension.SparkExtension._
val cpr: Option[ClientRequestProperties] = None // Optional
val df2 = spark.read.kusto(cluster, database, query, conf, cpr)
display(df2)
Opcional: se fornecer o armazenamento transitório de blobs (e não o Azure Data Explorer), os blobs são criados sob a responsabilidade do autor da chamada. Isto inclui o aprovisionamento do armazenamento, a rotação de chaves de acesso e a eliminação de artefactos transitórios. O módulo KustoBlobStorageUtils contém funções auxiliares para eliminar blobs com base em coordenadas de conta e contentor e credenciais de conta ou num URL de SAS completo com permissões de escrita, leitura e lista. Quando o RDD correspondente já não for necessário, cada transação armazena artefactos de blobs transitórios num diretório separado. Este diretório é capturado como parte de registos de informações de transação de leitura comunicados no nó Do Controlador do Spark.
// Use either container/account-key/account name, or container SaS
val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer")
val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey")
val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName")
// val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
No exemplo acima, o Key Vault não é acedido através da interface do conector; é utilizado um método mais simples de utilização dos segredos do Databricks.
Leia a partir do Azure Data Explorer.
Se fornecer o armazenamento de blobs transitório, leia a partir do Azure Data Explorer da seguinte forma:
val conf3 = Map(
KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas)
val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
val dfFiltered = df2
.where(df2.col("ColA").startsWith("row-2"))
.filter("ColB > 12")
.filter("ColB <= 21")
.select("ColA")
display(dfFiltered)
Se o Azure Data Explorer fornecer o armazenamento transitório de blobs, leia a partir do Azure Data Explorer da seguinte forma:
val conf3 = Map(
KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey)
val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
val dfFiltered = df2
.where(df2.col("ColA").startsWith("row-2"))
.filter("ColB > 12")
.filter("ColB <= 21")
.select("ColA")
display(dfFiltered)
evento
31/03, 23 - 2/04, 23
O maior evento de aprendizagem de Malha, Power BI e SQL. 31 de março a 2 de abril. Use o código FABINSIDER para economizar $400.
Registe-se hoje mesmo