你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

Azure Cosmos DB Spark 连接器 - 吞吐量控制

适用范围: NoSQL

Spark 连接器支持使用 Apache Spark 与 Azure Cosmos DB 通信。 本文介绍吞吐量控制功能的工作原理。 请查看 GitHub 中的 Spark 示例,开始使用吞吐量控制。

提示

本文介绍了如何在 Azure Cosmos DB Spark 连接器中使用全局吞吐量控制组,但 Java SDK 中也提供了该功能。 在 SDK 中,还可以使用全局和本地吞吐量控制组来限制单个客户端连接实例上下文中的 RU 消耗量。 例如,可以将其应用于单个微服务中的不同操作,或者应用于单个数据加载程序。 查看有关如何在 Java SDK 中使用吞吐量控制的文档。

警告

请注意,网关模式尚不支持吞吐量控制。 目前,对于无服务器 Azure Cosmos DB 帐户,尝试使用 targetThroughputThreshold 定义百分比将会导致失败。 只能使用 spark.cosmos.throughputControl.targetThroughput 提供目标吞吐量/RU 的绝对值。

为什么吞吐量控制很重要?

通过限制给定 Spark 客户端可以使用的请求单位量,吞吐量控制有助于隔离针对容器运行的应用程序的性能需求。

有几个高级方案可受益于客户端吞吐量控制:

  • 不同的操作和任务具有不同的优先级 - 由于数据引入或复制活动,可能需要防止正常事务受到限制。 某些操作和/或任务对延迟不敏感,并且比其他操作和/或任务更能容忍被限制。

  • 为不同的最终用户/租户提供合理性/隔离 - 一个应用程序通常有许多最终用户。 某些用户可能会发送过多请求,这会占用所有可用的吞吐量,从而导致其他用户受到限制。

  • 不同 Azure Cosmos DB 客户端之间的吞吐量负载均衡 - 在某些情况下,必须确保所有客户端获得合理的吞吐量份额

吞吐量控制可根据需要实现更精细的 RU 速率限制。

吞吐量控制的工作原理是什么?

配置 Spark 连接器的吞吐量控制时,首先创建一个容器,该容器将定义吞吐量控制元数据,并启用分区键 groupIdttl。 在这里,我们使用 Spark SQL 创建此容器,并将其命名为 ThroughputControl

    %sql
    CREATE TABLE IF NOT EXISTS cosmosCatalog.`database-v4`.ThroughputControl 
    USING cosmos.oltp
    OPTIONS(spark.cosmos.database = 'database-v4')
    TBLPROPERTIES(partitionKeyPath = '/groupId', autoScaleMaxThroughput = '4000', indexingPolicy = 'AllProperties', defaultTtlInSeconds = '-1');

注意

上面的示例创建具有自动缩放功能的容器。 如果更喜欢标准预配,可以将 autoScaleMaxThroughput 替换为 manualThroughput

重要

分区键必须定义为 /groupId,并且必须启用 ttl,这样吞吐量控制功能才能正常工作。

在给定应用程序的 Spark 配置中,我们可以为工作负载指定参数。 以下示例将吞吐量控制设置为 enabled,并定义吞吐量控制组 nametargetThroughputThreshold。 我们还定义通过控制组维护的 databasecontainer

    "spark.cosmos.throughputControl.enabled" -> "true",
    "spark.cosmos.throughputControl.name" -> "SourceContainerThroughputControl",
    "spark.cosmos.throughputControl.targetThroughputThreshold" -> "0.95", 
    "spark.cosmos.throughputControl.globalControl.database" -> "database-v4", 
    "spark.cosmos.throughputControl.globalControl.container" -> "ThroughputControl"

在上面的示例中,targetThroughputThreshold 被定义为 0.95,因此,当客户端消耗超过分配给容器的吞吐量的 95% (+/- 5% - 10%) 时,就会发生速率限制(并且会重试请求)。 此配置以文档的形式存储在吞吐量容器中,如下所示:

    {
        "id": "ZGF0YWJhc2UtdjQvY3VzdG9tZXIvU291cmNlQ29udGFpbmVyVGhyb3VnaHB1dENvbnRyb2w.info",
        "groupId": "database-v4/customer/SourceContainerThroughputControl.config",
        "targetThroughput": "",
        "targetThroughputThreshold": "0.95",
        "isDefault": true,
        "_rid": "EHcYAPolTiABAAAAAAAAAA==",
        "_self": "dbs/EHcYAA==/colls/EHcYAPolTiA=/docs/EHcYAPolTiABAAAAAAAAAA==/",
        "_etag": "\"2101ea83-0000-1100-0000-627503dd0000\"",
        "_attachments": "attachments/",
        "_ts": 1651835869
    }

注意

吞吐量控制不会对每个操作执行 RU 预计算。 它反而会根据响应头跟踪操作后的 RU 使用情况。 因此,吞吐量控制是基于近似值,并且不保证组在任何给定时间都有可用的吞吐量。 这意味着如果配置的 RU 太低,以至于单个操作就可以用完,则吞吐量控制无法避免 RU 超出配置的限制。 因此,当配置的限制高于给定控制组中的客户端可执行的任何单个操作时,吞吐量控制效果最佳。 考虑到这一点,当通过查询或更改源进行读取时,应将 spark.cosmos.read.maxItemCount 中的页面大小(默认 1000)配置为适度的数量,以便可以更高频率地重新计算客户端吞吐量控制,从而在任何给定时间更准确地反映出来。 但是,当对批量写入作业使用吞吐量控制时,单个请求中执行的文档数量将根据限制速率自动调整,以使吞吐量控制尽早启动。

警告

targetThroughputThreshold 是不可变的。 如果更改目标吞吐量阈值,则会创建一个新的吞吐量控制组(但只要使用版本 4.10.0 或更高版本,它就可以具有相同的名称)。 如果要确保所有正在使用组的 Spark 作业都立即使用新阈值,则需要重新启动它们(否则它们将在下一次重新启动后使用新阈值)。

对于使用吞吐量控制组的每个 Spark 客户端,将在 ThroughputControl 容器中创建一条记录(总共几秒钟),因此,如果 Spark 客户端不再主动运行,文档将很快消失,如下所示:

    {
        "id": "Zhjdieidjojdook3osk3okso3ksp3ospojsp92939j3299p3oj93pjp93jsps939pkp9ks39kp9339skp",
        "groupId": "database-v4/customer/SourceContainerThroughputControl.config",
        "_etag": "\"1782728-w98999w-ww9998w9-99990000\"",
        "ttl": 10,
        "initializeTime": "2022-06-26T02:24:40.054Z",
        "loadFactor": 0.97636377638898,
        "allocatedThroughput": 484.89444487847,
        "_rid": "EHcYAPolTiABAAAAAAAAAA==",
        "_self": "dbs/EHcYAA==/colls/EHcYAPolTiA=/docs/EHcYAPolTiABAAAAAAAAAA==/",
        "_etag": "\"2101ea83-0000-1100-0000-627503dd0000\"",
        "_attachments": "attachments/",
        "_ts": 1651835869
    }

在每个客户端记录中,loadFactor 属性表示给定客户端上相对于吞吐量控制组中其他客户端的负载。 allocatedThroughput 属性显示当前分配给此客户端的 RU 数量。 Spark 连接器将根据每个客户端的负载调整分配的吞吐量。 这将确保每个客户端获得与其负载成比例的可用吞吐量份额,并且所有客户机一起使用的吞吐量不会超过分配给它们所属的吞吐量控制组的总数。

后续步骤