適用於 Apache Spark 的 Azure 資料總管 (Kusto) 連接器是設計用來在 Kusto 叢集和 Spark 之間有效率地傳輸資料。 此連接器可用於 Python、JAVA 和 .NET。
驗證
使用 Azure Synapse Notebooks 或 Apache Spark 作業定義時,系統之間的驗證會與連結的服務順暢地進行。 權杖服務會與 Microsoft Entra ID 連線,以取得存取 Kusto 叢集時要使用的安全性權杖。
針對 Azure Synapse Pipelines,驗證會使用服務主體名稱。 目前,Azure 資料總管連接器不支援受控識別。
必要條件
- 連線至 Azure 資料總 管:您必須設定連結服務以連線到現有的 Kusto 叢集。
限制
- Azure 資料總管連結服務只能使用服務主體名稱進行設定。
- 在 Azure Synapse Notebooks 或 Apache Spark 作業定義中,Azure 資料總管連接器會使用 Microsoft Entra 傳遞來連線到 Kusto 叢集。
使用 Azure 資料總管 (Kusto) 連接器
下一節提供一個簡單的範例,說明如何將資料寫入 Kusto 資料表,並從 Kusto 資料表讀取資料。 如需詳細檔,請參閱 Azure 資料總管 (Kusto) 連接器專案 。
讀取資料
kustoDf = spark.read \
.format("com.microsoft.kusto.spark.synapse.datasource") \
.option("spark.synapse.linkedService", "<link service name>") \
.option("kustoDatabase", "<Database name>") \
.option("kustoQuery", "<KQL Query>") \
.load()
display(kustoDf)
您也可以使用強制散發模式和其他進階選項批次讀取。 如需詳細資訊,您可以參考 Kusto 來源選項參考 。
crp = sc._jvm.com.microsoft.azure.kusto.data.ClientRequestProperties()
crp.setOption("norequesttimeout",True)
crp.toString()
kustoDf = spark.read \
.format("com.microsoft.kusto.spark.synapse.datasource") \
.option("spark.synapse.linkedService", "<link service name>") \
.option("kustoDatabase", "<Database name>") \
.option("kustoQuery", "<KQL Query>") \
.option("clientRequestPropertiesJson", crp.toString()) \
.option("readMode", 'ForceDistributedMode') \
.load()
display(kustoDf)
寫入資料
df.write \
.format("com.microsoft.kusto.spark.synapse.datasource") \
.option("spark.synapse.linkedService", "<link service name>") \
.option("kustoDatabase", "<Database name>") \
.option("kustoTable", "<Table name>") \
.mode("Append") \
.save()
此外,您也可以藉由提供額外的擷取屬性來批次寫入資料。 如需所支援擷取屬性的詳細資訊,您可以流覽 Kusto 擷取屬性參考資料 。
extentsCreationTime = sc._jvm.org.joda.time.DateTime.now().plusDays(1)
csvMap = "[{\"Name\":\"ColA\",\"Ordinal\":0},{\"Name\":\"ColB\",\"Ordinal\":1}]"
# Alternatively use an existing csv mapping configured on the table and pass it as the last parameter of SparkIngestionProperties or use none
sp = sc._jvm.com.microsoft.kusto.spark.datasink.SparkIngestionProperties(
False, ["dropByTags"], ["ingestByTags"], ["tags"], ["ingestIfNotExistsTags"], extentsCreationTime, csvMap, None)
df.write \
.format("com.microsoft.kusto.spark.synapse.datasource") \
.option("spark.synapse.linkedService", "<link service name>") \
.option("kustoDatabase", "<Database name>") \
.option("kustoTable", "<Table name>") \
.option("sparkIngestionPropertiesJson", sp.toString()) \
.option("tableCreateOptions","CreateIfNotExist") \
.mode("Append") \
.save()