共用方式為


適用於 Apache Spark 的 Azure 資料總管 (Kusto) 連接器

適用於 Apache Spark 的 Azure 資料總管 (Kusto) 連接器是設計用來在 Kusto 叢集和 Spark 之間有效率地傳輸資料。 此連接器可用於 Python、JAVA 和 .NET。

驗證

使用 Azure Synapse Notebooks 或 Apache Spark 作業定義時,系統之間的驗證會與連結的服務順暢地進行。 權杖服務會與 Microsoft Entra ID 連線,以取得存取 Kusto 叢集時要使用的安全性權杖。

針對 Azure Synapse Pipelines,驗證會使用服務主體名稱。 目前,Azure 資料總管連接器不支援受控識別。

必要條件

限制

  • Azure 資料總管連結服務只能使用服務主體名稱進行設定。
  • 在 Azure Synapse Notebooks 或 Apache Spark 作業定義中,Azure 資料總管連接器會使用 Microsoft Entra 傳遞來連線到 Kusto 叢集。

使用 Azure 資料總管 (Kusto) 連接器

下一節提供一個簡單的範例,說明如何將資料寫入 Kusto 資料表,並從 Kusto 資料表讀取資料。 如需詳細檔,請參閱 Azure 資料總管 (Kusto) 連接器專案

讀取資料

kustoDf  = spark.read \
            .format("com.microsoft.kusto.spark.synapse.datasource") \
            .option("spark.synapse.linkedService", "<link service name>") \
            .option("kustoDatabase", "<Database name>") \
            .option("kustoQuery", "<KQL Query>") \
            .load()

display(kustoDf)

您也可以使用強制散發模式和其他進階選項批次讀取。 如需詳細資訊,您可以參考 Kusto 來源選項參考

crp = sc._jvm.com.microsoft.azure.kusto.data.ClientRequestProperties()
crp.setOption("norequesttimeout",True)
crp.toString()

kustoDf  = spark.read \
            .format("com.microsoft.kusto.spark.synapse.datasource") \
            .option("spark.synapse.linkedService", "<link service name>") \
            .option("kustoDatabase", "<Database name>") \
            .option("kustoQuery", "<KQL Query>") \
            .option("clientRequestPropertiesJson", crp.toString()) \
            .option("readMode", 'ForceDistributedMode') \
            .load()

display(kustoDf) 

寫入資料

df.write \
    .format("com.microsoft.kusto.spark.synapse.datasource") \
    .option("spark.synapse.linkedService", "<link service name>") \
    .option("kustoDatabase", "<Database name>") \
    .option("kustoTable", "<Table name>") \
    .mode("Append") \
    .save()

此外,您也可以藉由提供額外的擷取屬性來批次寫入資料。 如需所支援擷取屬性的詳細資訊,您可以流覽 Kusto 擷取屬性參考資料

extentsCreationTime = sc._jvm.org.joda.time.DateTime.now().plusDays(1)
csvMap = "[{\"Name\":\"ColA\",\"Ordinal\":0},{\"Name\":\"ColB\",\"Ordinal\":1}]"
# Alternatively use an existing csv mapping configured on the table and pass it as the last parameter of SparkIngestionProperties or use none

sp = sc._jvm.com.microsoft.kusto.spark.datasink.SparkIngestionProperties(
        False, ["dropByTags"], ["ingestByTags"], ["tags"], ["ingestIfNotExistsTags"], extentsCreationTime, csvMap, None)

df.write \
    .format("com.microsoft.kusto.spark.synapse.datasource") \
    .option("spark.synapse.linkedService", "<link service name>") \
    .option("kustoDatabase", "<Database name>") \
    .option("kustoTable", "<Table name>") \
    .option("sparkIngestionPropertiesJson", sp.toString()) \
    .option("tableCreateOptions","CreateIfNotExist") \
    .mode("Append") \
    .save()

下一步