你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
适用于 Apache Spark 的 Azure 数据资源管理器 (Kusto) 连接器
适用于 Apache Spark 的 Azure 数据资源管理器 (Kusto) 连接器旨在有效地在 Kusto 群集和 Spark 之间传输数据。 此连接器在 Python、Java 和 .NET 中可用。
身份验证
使用 Azure Synapse Notebooks 或 Apache Spark 作业定义时,系统之间的身份验证与链接服务无缝连接。 令牌服务与 Microsoft Entra ID 进行连接来获取在访问 Kusto 群集时使用的安全令牌。
对于 Azure Synapse Pipelines,身份验证使用服务主体名称。 目前,Azure 数据资源管理器连接器不支持托管标识。
先决条件
- 连接到 Azure 数据资源管理器:需要设置链接服务以连接到现有 Kusto 群集。
限制
- 只能使用服务主体名称来配置 Azure 数据资源管理器链接服务。
- 在 Azure Synapse Notebooks 或 Apache Spark 作业定义中,Azure 数据资源管理器连接器使用 Microsoft Entra 直通连接到 Kusto 群集。
使用 Azure 数据资源管理器 (Kusto) 连接器
以下部分提供了一个简单的示例,说明如何将数据写入 Kusto 表以及如何从 Kusto 表读取数据。 有关详细文档,请参阅 Azure 数据资源管理器 (Kusto) 连接器项目。
读取数据
kustoDf = spark.read \
.format("com.microsoft.kusto.spark.synapse.datasource") \
.option("spark.synapse.linkedService", "<link service name>") \
.option("kustoDatabase", "<Database name>") \
.option("kustoQuery", "<KQL Query>") \
.load()
display(kustoDf)
还可以使用强制分发模式和其他高级选项进行批量读取。 有关其他信息,请参阅 Kusto 源选项参考。
crp = sc._jvm.com.microsoft.azure.kusto.data.ClientRequestProperties()
crp.setOption("norequesttimeout",True)
crp.toString()
kustoDf = spark.read \
.format("com.microsoft.kusto.spark.synapse.datasource") \
.option("spark.synapse.linkedService", "<link service name>") \
.option("kustoDatabase", "<Database name>") \
.option("kustoQuery", "<KQL Query>") \
.option("clientRequestPropertiesJson", crp.toString()) \
.option("readMode", 'ForceDistributedMode') \
.load()
display(kustoDf)
写入数据
df.write \
.format("com.microsoft.kusto.spark.synapse.datasource") \
.option("spark.synapse.linkedService", "<link service name>") \
.option("kustoDatabase", "<Database name>") \
.option("kustoTable", "<Table name>") \
.mode("Append") \
.save()
此外,还可以通过提供其他引入属性来批量写入数据。 有关支持的引入属性的详细信息,请访问 Kusto 引入属性参考资料。
extentsCreationTime = sc._jvm.org.joda.time.DateTime.now().plusDays(1)
csvMap = "[{\"Name\":\"ColA\",\"Ordinal\":0},{\"Name\":\"ColB\",\"Ordinal\":1}]"
# Alternatively use an existing csv mapping configured on the table and pass it as the last parameter of SparkIngestionProperties or use none
sp = sc._jvm.com.microsoft.kusto.spark.datasink.SparkIngestionProperties(
False, ["dropByTags"], ["ingestByTags"], ["tags"], ["ingestIfNotExistsTags"], extentsCreationTime, csvMap, None)
df.write \
.format("com.microsoft.kusto.spark.synapse.datasource") \
.option("spark.synapse.linkedService", "<link service name>") \
.option("kustoDatabase", "<Database name>") \
.option("kustoTable", "<Table name>") \
.option("sparkIngestionPropertiesJson", sp.toString()) \
.option("tableCreateOptions","CreateIfNotExist") \
.mode("Append") \
.save()