你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

使用更改源估算器

适用范围: NoSQL

本文介绍如何监视更改源处理器实例在读取更改源时的进度。

为什么监视进度很重要?

更改源处理器充当一个指针,它会在更改源中向前移动,并将更改传递给委托实现。

更改源处理器部署可以按特定速率处理更改,具体取决于可用的资源,例如 CPU、内存、网络,等等。

如果此速率低于 Azure Cosmos DB 容器中发生更改的速率,处理器将开始滞后。

确定此情况后,我们就可以决定是否需要缩放更改源处理器部署。

实现更改源估算器

作为自动通知的推送模型

更改源处理器一样,更改源估算器可用作推送模型。 此估算器会度量上一个处理项(按租用容器的状态定义)和容器中的最新更改之间的差异,并将该值推送到某个代理。 进行度量的时间间隔也可以自定义,默认值为 5 秒。

例如,如果更改源处理器使用最新版模型,并定义如下:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

那么,若要初始化某个估算器来度量该处理器,正确的方式是使用 GetChangeFeedEstimatorBuilder,如下所示:

ChangeFeedProcessor changeFeedEstimator = monitoredContainer
    .GetChangeFeedEstimatorBuilder("changeFeedEstimator", Program.HandleEstimationAsync, TimeSpan.FromMilliseconds(1000))
    .WithLeaseContainer(leaseContainer)
    .Build();

其中的处理器和估算器共享同一 leaseContainer 和同一名称。

另外两个参数,其中一个是代理,其接收的数字表示有多少更改待处理器读取,另一个是需要进行度量的时间间隔。

例如,下面是用于接收估算的委托:

static async Task HandleEstimationAsync(long estimation, CancellationToken cancellationToken)
{
    if (estimation > 0)
    {
        Console.WriteLine($"\tEstimator detected {estimation} items pending to be read by the Processor.");
    }

    await Task.Delay(0);
}

可以将此估算发送给监视解决方案,并通过它了解进度随时间的变化情况。

作为按需详细估算

有一种替代方法可让你按需获得估算值(与推送模型相比)。 此模型还提供了更多详细信息:

  • 每次租用的估计滞后时间。
  • 实例拥有并处理每次租用,因此你可以确定该实例是否存在问题。

如果更改源处理器定义如下:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

你可以使用相同的租用配置创建估算器:

ChangeFeedEstimator changeFeedEstimator = monitoredContainer
    .GetChangeFeedEstimator("changeFeedEstimator", leaseContainer);

无论何时,只要你需要,就可以以所需的频率获得详细的估算值:

Console.WriteLine("Checking estimation...");
using FeedIterator<ChangeFeedProcessorState> estimatorIterator = changeFeedEstimator.GetCurrentStateIterator();
while (estimatorIterator.HasMoreResults)
{
    FeedResponse<ChangeFeedProcessorState> states = await estimatorIterator.ReadNextAsync();
    foreach (ChangeFeedProcessorState leaseState in states)
    {
        string host = leaseState.InstanceName == null ? $"not owned by any host currently" : $"owned by host {leaseState.InstanceName}";
        Console.WriteLine($"Lease [{leaseState.LeaseToken}] {host} reports {leaseState.EstimatedLag} as estimated lag.");
    }
}

每个 ChangeFeedProcessorState 都包含租用和滞后时间信息,以及当前哪个实例拥有它。

估算器部署

更改源估算器不需部署到更改源处理器中,也不需部署到同一项目中。 建议在与处理器完全不同的独立实例上部署估算器。 单个估算器实例可以跟踪更改源处理器部署中所有租约和实例的进度。

每次估算都将消耗受监视容器和租用容器请求单位。 间隔 1 分钟的频率比较合适,频率越低,消耗的请求单位数就越高。

支持的更改源模式

更改源估算器适用于最新版本模式所有版本与删除模式。 在这两种模式下,提供的估算值都不保证是要处理的未完成更改的确切计数。

其他资源

后续步骤

现在,可以通过以下文章继续详细了解更改源处理器: