Apache Spark 的 Azure Data Explorer(Kusto)連接器設計用來有效地在 Kusto 叢集與 Spark 之間傳輸資料。 此連接器支援 Python、Java 及 .NET 版本。
驗證
使用 Azure Synapse Notebooks 或 Apache Spark 工作定義時,系統間的認證透過連結服務實現無縫。 Token Service 會與 Microsoft Entra ID 連接,取得安全權杖,用於存取 Kusto 叢集。
對於 Azure Synapse 管線,認證使用服務主體名稱。 目前,Azure Data Explorer 連接器不支援受管理身份。
先決條件
- 連接 Azure 資料總管:你需要設定一個連結服務來連接到現有的 Kusto 叢集。
局限性
- Azure Data Explorer 連結的服務只能以服務主體名稱來設定。
- 在 Azure Synapse Notebooks 或 Apache Spark 工作定義中,Azure Data Explorer 連接器使用 Microsoft Entra 直通方式連接 Kusto 叢集。
使用 Azure Data Explorer(Kusto)連接器
以下章節提供一個簡單的範例,說明如何將資料寫入 Kusto 資料表並從 Kusto 資料表讀取資料。 詳細文件請參閱 Azure Data Explorer(Kusto)連接器專案 。
讀取資料
kustoDf = spark.read \
.format("com.microsoft.kusto.spark.synapse.datasource") \
.option("spark.synapse.linkedService", "<link service name>") \
.option("kustoDatabase", "<Database name>") \
.option("kustoQuery", "<KQL Query>") \
.load()
display(kustoDf)
你也可以用強制分配模式和其他進階選項批量閱讀。 如需更多資訊,可以參考 Kusto 來源選項參考。
crp = sc._jvm.com.microsoft.azure.kusto.data.ClientRequestProperties()
crp.setOption("norequesttimeout",True)
crp.toString()
kustoDf = spark.read \
.format("com.microsoft.kusto.spark.synapse.datasource") \
.option("spark.synapse.linkedService", "<link service name>") \
.option("kustoDatabase", "<Database name>") \
.option("kustoQuery", "<KQL Query>") \
.option("clientRequestPropertiesJson", crp.toString()) \
.option("readMode", 'ForceDistributedMode') \
.load()
display(kustoDf)
寫入資料
df.write \
.format("com.microsoft.kusto.spark.synapse.datasource") \
.option("spark.synapse.linkedService", "<link service name>") \
.option("kustoDatabase", "<Database name>") \
.option("kustoTable", "<Table name>") \
.mode("Append") \
.save()
此外,你也可以透過提供額外的擷取屬性來批次寫入資料。 欲了解更多支持的攝取特性資訊,可參考 Kusto攝取特性參考資料。
extentsCreationTime = sc._jvm.org.joda.time.DateTime.now().plusDays(1)
csvMap = "[{\"Name\":\"ColA\",\"Ordinal\":0},{\"Name\":\"ColB\",\"Ordinal\":1}]"
# Alternatively use an existing csv mapping configured on the table and pass it as the last parameter of SparkIngestionProperties or use none
sp = sc._jvm.com.microsoft.kusto.spark.datasink.SparkIngestionProperties(
False, ["dropByTags"], ["ingestByTags"], ["tags"], ["ingestIfNotExistsTags"], extentsCreationTime, csvMap, None)
df.write \
.format("com.microsoft.kusto.spark.synapse.datasource") \
.option("spark.synapse.linkedService", "<link service name>") \
.option("kustoDatabase", "<Database name>") \
.option("kustoTable", "<Table name>") \
.option("sparkIngestionPropertiesJson", sp.toString()) \
.option("tableCreateOptions","CreateIfNotExist") \
.mode("Append") \
.save()