共用方式為


適用於 Apache Spark 的 Azure 資料總管 (Kusto) 連接器

Apache Spark 的 Azure Data Explorer(Kusto)連接器設計用來有效地在 Kusto 叢集與 Spark 之間傳輸資料。 此連接器支援 Python、Java 及 .NET 版本。

驗證

使用 Azure Synapse Notebooks 或 Apache Spark 工作定義時,系統間的認證透過連結服務實現無縫。 Token Service 會與 Microsoft Entra ID 連接,取得安全權杖,用於存取 Kusto 叢集。

對於 Azure Synapse 管線,認證使用服務主體名稱。 目前,Azure Data Explorer 連接器不支援受管理身份。

先決條件

局限性

  • Azure Data Explorer 連結的服務只能以服務主體名稱來設定。
  • 在 Azure Synapse Notebooks 或 Apache Spark 工作定義中,Azure Data Explorer 連接器使用 Microsoft Entra 直通方式連接 Kusto 叢集。

使用 Azure Data Explorer(Kusto)連接器

以下章節提供一個簡單的範例,說明如何將資料寫入 Kusto 資料表並從 Kusto 資料表讀取資料。 詳細文件請參閱 Azure Data Explorer(Kusto)連接器專案

讀取資料

kustoDf  = spark.read \
            .format("com.microsoft.kusto.spark.synapse.datasource") \
            .option("spark.synapse.linkedService", "<link service name>") \
            .option("kustoDatabase", "<Database name>") \
            .option("kustoQuery", "<KQL Query>") \
            .load()

display(kustoDf)

你也可以用強制分配模式和其他進階選項批量閱讀。 如需更多資訊,可以參考 Kusto 來源選項參考

crp = sc._jvm.com.microsoft.azure.kusto.data.ClientRequestProperties()
crp.setOption("norequesttimeout",True)
crp.toString()

kustoDf  = spark.read \
            .format("com.microsoft.kusto.spark.synapse.datasource") \
            .option("spark.synapse.linkedService", "<link service name>") \
            .option("kustoDatabase", "<Database name>") \
            .option("kustoQuery", "<KQL Query>") \
            .option("clientRequestPropertiesJson", crp.toString()) \
            .option("readMode", 'ForceDistributedMode') \
            .load()

display(kustoDf) 

寫入資料

df.write \
    .format("com.microsoft.kusto.spark.synapse.datasource") \
    .option("spark.synapse.linkedService", "<link service name>") \
    .option("kustoDatabase", "<Database name>") \
    .option("kustoTable", "<Table name>") \
    .mode("Append") \
    .save()

此外,你也可以透過提供額外的擷取屬性來批次寫入資料。 欲了解更多支持的攝取特性資訊,可參考 Kusto攝取特性參考資料

extentsCreationTime = sc._jvm.org.joda.time.DateTime.now().plusDays(1)
csvMap = "[{\"Name\":\"ColA\",\"Ordinal\":0},{\"Name\":\"ColB\",\"Ordinal\":1}]"
# Alternatively use an existing csv mapping configured on the table and pass it as the last parameter of SparkIngestionProperties or use none

sp = sc._jvm.com.microsoft.kusto.spark.datasink.SparkIngestionProperties(
        False, ["dropByTags"], ["ingestByTags"], ["tags"], ["ingestIfNotExistsTags"], extentsCreationTime, csvMap, None)

df.write \
    .format("com.microsoft.kusto.spark.synapse.datasource") \
    .option("spark.synapse.linkedService", "<link service name>") \
    .option("kustoDatabase", "<Database name>") \
    .option("kustoTable", "<Table name>") \
    .option("sparkIngestionPropertiesJson", sp.toString()) \
    .option("tableCreateOptions","CreateIfNotExist") \
    .mode("Append") \
    .save()

下一步