Conector do Azure Data Explorer (Kusto) para Apache Spark
O conector do Azure Data Explorer (Kusto) para o Apache Spark foi concebido para transferir dados de forma eficiente entre clusters do Kusto e do Apache Spark. Este conector está disponível em Python, Java e .NET.
Autenticação
Ao usar os Blocos de Anotações do Azure Synapse ou as definições de trabalho do Apache Spark, a autenticação entre sistemas é integrada com o serviço vinculado. O Serviço de Token se conecta com o Microsoft Entra ID para obter tokens de segurança para uso ao acessar o cluster Kusto.
Para o Azure Synapse Pipelines, a autenticação usa o nome da entidade de serviço. Atualmente, as identidades gerenciadas não são suportadas com o conector do Azure Data Explorer.
Pré-requisitos
- Conectar-se ao Azure Data Explorer: você precisa configurar um Serviço Vinculado para se conectar a um cluster Kusto existente.
Limitações
- O serviço vinculado do Azure Data Explorer só pode ser configurado com o Nome da Entidade de Serviço.
- Dentro dos Blocos de Anotações do Azure Synapse ou das Definições de Trabalho do Apache Spark, o conector do Azure Data Explorer usa a passagem do Microsoft Entra para se conectar ao Cluster Kusto.
Usar o conector do Azure Data Explorer (Kusto)
A seção a seguir fornece um exemplo simples de como gravar dados em uma tabela Kusto e ler dados de uma tabela Kusto. Consulte o projeto de conector do Azure Data Explorer (Kusto) para obter documentação detalhada.
Ler dados
kustoDf = spark.read \
.format("com.microsoft.kusto.spark.synapse.datasource") \
.option("spark.synapse.linkedService", "<link service name>") \
.option("kustoDatabase", "<Database name>") \
.option("kustoQuery", "<KQL Query>") \
.load()
display(kustoDf)
Você também pode ler em lote com o modo de distribuição forçada e outras opções avançadas. Para obter informações adicionais, você pode consultar a referência de opções de origem do Kusto.
crp = sc._jvm.com.microsoft.azure.kusto.data.ClientRequestProperties()
crp.setOption("norequesttimeout",True)
crp.toString()
kustoDf = spark.read \
.format("com.microsoft.kusto.spark.synapse.datasource") \
.option("spark.synapse.linkedService", "<link service name>") \
.option("kustoDatabase", "<Database name>") \
.option("kustoQuery", "<KQL Query>") \
.option("clientRequestPropertiesJson", crp.toString()) \
.option("readMode", 'ForceDistributedMode') \
.load()
display(kustoDf)
Escrever dados
df.write \
.format("com.microsoft.kusto.spark.synapse.datasource") \
.option("spark.synapse.linkedService", "<link service name>") \
.option("kustoDatabase", "<Database name>") \
.option("kustoTable", "<Table name>") \
.mode("Append") \
.save()
Além disso, você também pode gravar dados em lote fornecendo propriedades de ingestão adicionais. Para obter mais informações sobre as propriedades de ingestão suportadas, você pode visitar o material de referência das propriedades de ingestão de Kusto.
extentsCreationTime = sc._jvm.org.joda.time.DateTime.now().plusDays(1)
csvMap = "[{\"Name\":\"ColA\",\"Ordinal\":0},{\"Name\":\"ColB\",\"Ordinal\":1}]"
# Alternatively use an existing csv mapping configured on the table and pass it as the last parameter of SparkIngestionProperties or use none
sp = sc._jvm.com.microsoft.kusto.spark.datasink.SparkIngestionProperties(
False, ["dropByTags"], ["ingestByTags"], ["tags"], ["ingestIfNotExistsTags"], extentsCreationTime, csvMap, None)
df.write \
.format("com.microsoft.kusto.spark.synapse.datasource") \
.option("spark.synapse.linkedService", "<link service name>") \
.option("kustoDatabase", "<Database name>") \
.option("kustoTable", "<Table name>") \
.option("sparkIngestionPropertiesJson", sp.toString()) \
.option("tableCreateOptions","CreateIfNotExist") \
.mode("Append") \
.save()