Condividi tramite


Connettore di Esplora dati di Azure (Kusto) per Apache Spark

Il connettore Esplora dati di Azure (Kusto) per Apache Spark è progettato per trasferire in modo efficiente i dati tra cluster Kusto e Spark. Questo connettore è disponibile in Python, Java e .NET.

Autenticazione

Quando si usano i notebook di Azure Synapse o le definizioni dei processi Apache Spark, l'autenticazione tra i sistemi viene resa semplice con il servizio collegato. Il Servizio token si connette con Microsoft Entra ID per ottenere i token di sicurezza da usare quando si accede al cluster Kusto.

Per le pipeline di Azure Synapse, l'autenticazione usa il nome dell'entità servizio. Attualmente, le identità gestite non sono supportate con il connettore Esplora dati di Azure.

Prerequisiti

Limiti

  • Il servizio collegato Esplora dati di Azure supporta la configurazione solo con il nome dell'entità servizio.
  • All'interno dei notebook di Azure Synapse o delle definizioni dei processi Apache Spark, il connettore Esplora dati di Azure usa il pass-through di Microsoft Entra per connettersi al cluster Kusto.

Usare il connettore Esplora dati di Azure (Kusto)

La sezione seguente fornisce un esempio semplice di come scrivere e leggere i dati da una tabella Kusto. Per informazioni dettagliate, vedere il progetto del connettore Esplora dati di Azure (Kusto).

Leggere i dati

kustoDf  = spark.read \
            .format("com.microsoft.kusto.spark.synapse.datasource") \
            .option("spark.synapse.linkedService", "<link service name>") \
            .option("kustoDatabase", "<Database name>") \
            .option("kustoQuery", "<KQL Query>") \
            .load()

display(kustoDf)

È anche possibile eseguire la lettura batch con la modalità di distribuzione forzata e altre opzioni avanzate. Per altre informazioni, è possibile prendere visione delle Informazioni di riferimento sulle opzioni di origine Kusto.

crp = sc._jvm.com.microsoft.azure.kusto.data.ClientRequestProperties()
crp.setOption("norequesttimeout",True)
crp.toString()

kustoDf  = spark.read \
            .format("com.microsoft.kusto.spark.synapse.datasource") \
            .option("spark.synapse.linkedService", "<link service name>") \
            .option("kustoDatabase", "<Database name>") \
            .option("kustoQuery", "<KQL Query>") \
            .option("clientRequestPropertiesJson", crp.toString()) \
            .option("readMode", 'ForceDistributedMode') \
            .load()

display(kustoDf) 

Scrivere dati

df.write \
    .format("com.microsoft.kusto.spark.synapse.datasource") \
    .option("spark.synapse.linkedService", "<link service name>") \
    .option("kustoDatabase", "<Database name>") \
    .option("kustoTable", "<Table name>") \
    .mode("Append") \
    .save()

Inoltre, è anche possibile eseguire la scrittura di dati in batch fornendo proprietà di inserimento aggiuntive. Per altre informazioni sulle proprietà di inserimento supportate, è possibile visitare il materiale di riferimento sulle proprietà di inserimento Kusto.

extentsCreationTime = sc._jvm.org.joda.time.DateTime.now().plusDays(1)
csvMap = "[{\"Name\":\"ColA\",\"Ordinal\":0},{\"Name\":\"ColB\",\"Ordinal\":1}]"
# Alternatively use an existing csv mapping configured on the table and pass it as the last parameter of SparkIngestionProperties or use none

sp = sc._jvm.com.microsoft.kusto.spark.datasink.SparkIngestionProperties(
        False, ["dropByTags"], ["ingestByTags"], ["tags"], ["ingestIfNotExistsTags"], extentsCreationTime, csvMap, None)

df.write \
    .format("com.microsoft.kusto.spark.synapse.datasource") \
    .option("spark.synapse.linkedService", "<link service name>") \
    .option("kustoDatabase", "<Database name>") \
    .option("kustoTable", "<Table name>") \
    .option("sparkIngestionPropertiesJson", sp.toString()) \
    .option("tableCreateOptions","CreateIfNotExist") \
    .mode("Append") \
    .save()

Passaggi successivi