Conector do Azure Data Explorer (Kusto) para Apache Spark

O conector do Azure Data Explorer (Kusto) para o Apache Spark foi concebido para transferir dados de forma eficiente entre clusters do Kusto e do Apache Spark. Este conector está disponível em Python, Java e .NET.

Autenticação

Ao usar os Blocos de Anotações do Azure Synapse ou as definições de trabalho do Apache Spark, a autenticação entre sistemas é integrada com o serviço vinculado. O Serviço de Token se conecta com o Microsoft Entra ID para obter tokens de segurança para uso ao acessar o cluster Kusto.

Para o Azure Synapse Pipelines, a autenticação usa o nome da entidade de serviço. Atualmente, as identidades gerenciadas não são suportadas com o conector do Azure Data Explorer.

Pré-requisitos

Limitações

  • O serviço vinculado do Azure Data Explorer só pode ser configurado com o Nome da Entidade de Serviço.
  • Dentro dos Blocos de Anotações do Azure Synapse ou das Definições de Trabalho do Apache Spark, o conector do Azure Data Explorer usa a passagem do Microsoft Entra para se conectar ao Cluster Kusto.

Usar o conector do Azure Data Explorer (Kusto)

A seção a seguir fornece um exemplo simples de como gravar dados em uma tabela Kusto e ler dados de uma tabela Kusto. Consulte o projeto de conector do Azure Data Explorer (Kusto) para obter documentação detalhada.

Ler dados

kustoDf  = spark.read \
            .format("com.microsoft.kusto.spark.synapse.datasource") \
            .option("spark.synapse.linkedService", "<link service name>") \
            .option("kustoDatabase", "<Database name>") \
            .option("kustoQuery", "<KQL Query>") \
            .load()

display(kustoDf)

Você também pode ler em lote com o modo de distribuição forçada e outras opções avançadas. Para obter informações adicionais, você pode consultar a referência de opções de origem do Kusto.

crp = sc._jvm.com.microsoft.azure.kusto.data.ClientRequestProperties()
crp.setOption("norequesttimeout",True)
crp.toString()

kustoDf  = spark.read \
            .format("com.microsoft.kusto.spark.synapse.datasource") \
            .option("spark.synapse.linkedService", "<link service name>") \
            .option("kustoDatabase", "<Database name>") \
            .option("kustoQuery", "<KQL Query>") \
            .option("clientRequestPropertiesJson", crp.toString()) \
            .option("readMode", 'ForceDistributedMode') \
            .load()

display(kustoDf) 

Escrever dados

df.write \
    .format("com.microsoft.kusto.spark.synapse.datasource") \
    .option("spark.synapse.linkedService", "<link service name>") \
    .option("kustoDatabase", "<Database name>") \
    .option("kustoTable", "<Table name>") \
    .mode("Append") \
    .save()

Além disso, você também pode gravar dados em lote fornecendo propriedades de ingestão adicionais. Para obter mais informações sobre as propriedades de ingestão suportadas, você pode visitar o material de referência das propriedades de ingestão de Kusto.

extentsCreationTime = sc._jvm.org.joda.time.DateTime.now().plusDays(1)
csvMap = "[{\"Name\":\"ColA\",\"Ordinal\":0},{\"Name\":\"ColB\",\"Ordinal\":1}]"
# Alternatively use an existing csv mapping configured on the table and pass it as the last parameter of SparkIngestionProperties or use none

sp = sc._jvm.com.microsoft.kusto.spark.datasink.SparkIngestionProperties(
        False, ["dropByTags"], ["ingestByTags"], ["tags"], ["ingestIfNotExistsTags"], extentsCreationTime, csvMap, None)

df.write \
    .format("com.microsoft.kusto.spark.synapse.datasource") \
    .option("spark.synapse.linkedService", "<link service name>") \
    .option("kustoDatabase", "<Database name>") \
    .option("kustoTable", "<Table name>") \
    .option("sparkIngestionPropertiesJson", sp.toString()) \
    .option("tableCreateOptions","CreateIfNotExist") \
    .mode("Append") \
    .save()

Próximos passos