Aracılığıyla paylaş


Değişiklik akışı tahmin aracını kullanma

UYGULANANLAR: NoSQL

Bu makalede, değişiklik akışı işlemcisi örneklerinizin değişiklik akışını okurken ilerleme durumunu nasıl izleyebildiğiniz açıklanmaktadır.

İzleme ilerlemesi neden önemlidir?

Değişiklik akışı işlemcisi, değişiklik akışınızda ileriye doğru hareket eden ve değişiklikleri temsilci uygulamasına teslim eden bir işaretçi işlevi görür.

Değişiklik akışı işlemcisi dağıtımınız CPU, bellek, ağ gibi kullanılabilir kaynaklarına göre değişiklikleri belirli bir hızda işleyebilir.

Bu hız, Değişikliklerinizin Azure Cosmos DB kapsayıcınızda gerçekleşme hızından daha yavaşsa işlemciniz gecikmeye başlar.

Bu senaryoyı tanımlamak, değişiklik akışı işlemci dağıtımımızı ölçeklendirmemiz gerekip gerekmediğini anlamanıza yardımcı olur.

Değişiklik akışı tahmin aracını uygulama

Otomatik bildirimler için anında iletme modeli olarak

Değişiklik akışı işlemcisi gibi, değişiklik akışı tahmin aracı da anında iletme modeli olarak çalışabilir. Tahmin aracı, son işlenen öğe (kira kapsayıcısının durumuyla tanımlanır) ile kapsayıcıdaki en son değişiklik arasındaki farkı ölçecek ve bu değeri bir temsilciye gönderecektir. Ölçümün alındığı aralık, varsayılan 5 saniyelik bir değerle de özelleştirilebilir.

Örneğin, değişiklik akışı işlemciniz şu şekilde tanımlanmışsa:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

İşlemcinin şu şekilde kullanacağını GetChangeFeedEstimatorBuilder ölçmek için tahmin aracı başlatmanın doğru yolu:

ChangeFeedProcessor changeFeedEstimator = monitoredContainer
    .GetChangeFeedEstimatorBuilder("changeFeedEstimator", Program.HandleEstimationAsync, TimeSpan.FromMilliseconds(1000))
    .WithLeaseContainer(leaseContainer)
    .Build();

Burada hem işlemci hem de tahmin aracı aynı leaseContainer ve aynı adı paylaşır.

Diğer iki parametre, işlemci tarafından kaç değişikliğin okunmasını beklediğini ve bu ölçümün alınmasını istediğiniz zaman aralığını gösteren bir sayı alan temsilcidir.

Tahmini alan bir temsilci örneği:

static async Task HandleEstimationAsync(long estimation, CancellationToken cancellationToken)
{
    if (estimation > 0)
    {
        Console.WriteLine($"\tEstimator detected {estimation} items pending to be read by the Processor.");
    }

    await Task.Delay(0);
}

Bu tahmini izleme çözümünüze gönderebilir ve ilerleme durumunuzun zaman içinde nasıl çalıştığını anlamak için kullanabilirsiniz.

İsteğe bağlı ayrıntılı tahmin olarak

Gönderme modelinin aksine, tahminleri isteğe bağlı olarak elde etmenizi sağlayan bir alternatif vardır. Bu model ayrıca daha ayrıntılı bilgiler sağlar:

  • Kira başına tahmini gecikme.
  • Her kiraya sahip olan ve işleyen örnek, bir örnekte sorun olup olmadığını belirleyebilirsiniz.

Değişiklik akışı işlemciniz şu şekilde tanımlanmışsa:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

Tahmin aracını aynı kira yapılandırmasıyla oluşturabilirsiniz:

ChangeFeedEstimator changeFeedEstimator = monitoredContainer
    .GetChangeFeedEstimator("changeFeedEstimator", leaseContainer);

İstediğiniz zaman, ihtiyacınız olan sıklıkta ayrıntılı tahmin elde edebilirsiniz:

Console.WriteLine("Checking estimation...");
using FeedIterator<ChangeFeedProcessorState> estimatorIterator = changeFeedEstimator.GetCurrentStateIterator();
while (estimatorIterator.HasMoreResults)
{
    FeedResponse<ChangeFeedProcessorState> states = await estimatorIterator.ReadNextAsync();
    foreach (ChangeFeedProcessorState leaseState in states)
    {
        string host = leaseState.InstanceName == null ? $"not owned by any host currently" : $"owned by host {leaseState.InstanceName}";
        Console.WriteLine($"Lease [{leaseState.LeaseToken}] {host} reports {leaseState.EstimatedLag} as estimated lag.");
    }
}

Her ChangeFeedProcessorState biri kiralama ve gecikme bilgilerini ve aynı zamanda buna sahip olan geçerli örneğin kim olduğunu içerir.

Tahmin aracı dağıtımı

Değişiklik akışı tahmin aracının, değişiklik akışı işlemcinizin bir parçası olarak dağıtılması veya aynı projenin parçası olması gerekmez. Tahmin aracını işlemcilerinizden bağımsız ve tamamen farklı bir örneğe dağıtmanızı öneririz. Tek bir tahmin örneği, değişiklik akışı işlemcisi dağıtımınızdaki tüm kiraların ve örneklerin ilerleme durumunu izleyebilir.

Her tahmin, izlenen ve kira kapsayıcılarınızdaki istek birimlerini kullanır. Aralarında 1 dakikalık bir sıklık iyi bir başlangıç noktasıdır, sıklık ne kadar düşük olursa, kullanılan istek birimleri o kadar yüksek olur.

Ek kaynaklar

Sonraki adımlar

Şimdi aşağıdaki makalelerde değişiklik akışı işlemcisi hakkında daha fazla bilgi edinebilirsiniz: