你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

适用于 Apache Spark 的 Azure 数据资源管理器 (Kusto) 连接器

适用于 Apache Spark 的 Azure 数据资源管理器 (Kusto) 连接器旨在有效地在 Kusto 群集和 Spark 之间传输数据。 此连接器在 Python、Java 和 .NET 中可用。

身份验证

使用 Azure Synapse Notebooks 或 Apache Spark 作业定义时,系统之间的身份验证与链接服务无缝连接。 令牌服务与 Microsoft Entra ID 进行连接来获取在访问 Kusto 群集时使用的安全令牌。

对于 Azure Synapse Pipelines,身份验证使用服务主体名称。 目前,Azure 数据资源管理器连接器不支持托管标识。

先决条件

限制

  • 只能使用服务主体名称来配置 Azure 数据资源管理器链接服务。
  • 在 Azure Synapse Notebooks 或 Apache Spark 作业定义中,Azure 数据资源管理器连接器使用 Microsoft Entra 直通连接到 Kusto 群集。

使用 Azure 数据资源管理器 (Kusto) 连接器

以下部分提供了一个简单的示例,说明如何将数据写入 Kusto 表以及如何从 Kusto 表读取数据。 有关详细文档,请参阅 Azure 数据资源管理器 (Kusto) 连接器项目

读取数据

kustoDf  = spark.read \
            .format("com.microsoft.kusto.spark.synapse.datasource") \
            .option("spark.synapse.linkedService", "<link service name>") \
            .option("kustoDatabase", "<Database name>") \
            .option("kustoQuery", "<KQL Query>") \
            .load()

display(kustoDf)

还可以使用强制分发模式和其他高级选项进行批量读取。 有关其他信息,请参阅 Kusto 源选项参考

crp = sc._jvm.com.microsoft.azure.kusto.data.ClientRequestProperties()
crp.setOption("norequesttimeout",True)
crp.toString()

kustoDf  = spark.read \
            .format("com.microsoft.kusto.spark.synapse.datasource") \
            .option("spark.synapse.linkedService", "<link service name>") \
            .option("kustoDatabase", "<Database name>") \
            .option("kustoQuery", "<KQL Query>") \
            .option("clientRequestPropertiesJson", crp.toString()) \
            .option("readMode", 'ForceDistributedMode') \
            .load()

display(kustoDf) 

写入数据

df.write \
    .format("com.microsoft.kusto.spark.synapse.datasource") \
    .option("spark.synapse.linkedService", "<link service name>") \
    .option("kustoDatabase", "<Database name>") \
    .option("kustoTable", "<Table name>") \
    .mode("Append") \
    .save()

此外,还可以通过提供其他引入属性来批量写入数据。 有关支持的引入属性的详细信息,请访问 Kusto 引入属性参考资料

extentsCreationTime = sc._jvm.org.joda.time.DateTime.now().plusDays(1)
csvMap = "[{\"Name\":\"ColA\",\"Ordinal\":0},{\"Name\":\"ColB\",\"Ordinal\":1}]"
# Alternatively use an existing csv mapping configured on the table and pass it as the last parameter of SparkIngestionProperties or use none

sp = sc._jvm.com.microsoft.kusto.spark.datasink.SparkIngestionProperties(
        False, ["dropByTags"], ["ingestByTags"], ["tags"], ["ingestIfNotExistsTags"], extentsCreationTime, csvMap, None)

df.write \
    .format("com.microsoft.kusto.spark.synapse.datasource") \
    .option("spark.synapse.linkedService", "<link service name>") \
    .option("kustoDatabase", "<Database name>") \
    .option("kustoTable", "<Table name>") \
    .option("sparkIngestionPropertiesJson", sp.toString()) \
    .option("tableCreateOptions","CreateIfNotExist") \
    .mode("Append") \
    .save()

后续步骤