Aracılığıyla paylaş


Azure Cosmos DB'deki değişiklik akışı işlemcisi

UYGULANANLAR: NoSQL

Değişiklik akışı işlemcisi, Azure Cosmos DB .NET V3 ve Java V4 SDK'larının bir parçasıdır. Değişiklik akışını okuma sürecini basitleştirir ve olay işlemeyi birden çok tüketiciye etkili bir şekilde dağıtır.

Değişiklik akışı işlemcisini kullanmanın temel avantajı, değişiklik akışındaki tüm olayların "en az bir kez" teslimini sağlayan hataya dayanıklı tasarımıdır.

Desteklenen SDK’lar

.Net V3 Java Node.JS Python

Değişiklik akışı işlemcisinin bileşenleri

Değişiklik akışı işlemcisinin dört ana bileşeni vardır:

  • İzlenen kapsayıcı: İzlenen kapsayıcıda değişiklik akışının oluşturulduğu veriler bulunur. İzlenen kapsayıcıda yapılan eklemeler veya güncelleştirmeler, kapsayıcının değişiklik akışına yansıtılır.

  • Kira kapsayıcısı: Kira kapsayıcısı durum depolaması işlevi görür ve değişiklik akışının birden çok çalışan arasında işlenmesini koordine eder. Kira kapsayıcısı, izlenen kapsayıcı ile aynı hesapta veya ayrı bir hesapta depolanabilir.

  • İşlem örneği: İşlem örneği, değişiklikleri dinlemek için değişiklik akışı işlemcisini barındırıyor. Platforma bağlı olarak, sanal makine (VM), Kubernetes podu, Azure Uygulaması Hizmeti örneği veya gerçek bir fiziksel makine ile temsil edilebilir. İşlem örneği, bu makale boyunca örnek adı olarak adlandırılan benzersiz bir tanımlayıcıya sahiptir.

  • Temsilci: Temsilci, geliştirici olarak değişiklik akışı işlemcisinin okuduğu her değişiklik toplu işlemiyle ne yapmak istediğinizi tanımlayan koddur.

Değişiklik akışı işlemcisinin bu dört öğesinin birlikte nasıl çalıştığını daha fazla anlamak için aşağıdaki diyagramda bir örneğe göz atalım. İzlenen kapsayıcı öğeleri depolar ve bölüm anahtarı olarak 'City' kullanır. Bölüm anahtarı değerleri, öğeleri içeren aralıklara (her aralık fiziksel bir bölümü temsil eder) dağıtılır.

Diyagramda iki işlem örneği gösterilir ve değişiklik akışı işlemcisi işlem dağıtımını en üst düzeye çıkarmak için her örneğe farklı aralıklar atar. Her örneğin farklı, benzersiz bir adı vardır.

Her aralık paralel olarak okunur. Bir aralığın ilerleme durumu, kira belgesi aracılığıyla kira kapsayıcısında yer alan diğer aralıklardan ayrı tutulur. Kiralamaların birleşimi, değişiklik akışı işlemcisinin geçerli durumunu temsil eder.

Akış işlemcisini değiştirme örneği

Değişiklik akışı işlemcisini uygulama

.NET'teki değişiklik akışı işlemcisi en son sürüm modu ve tüm sürümler ve silmeler modu için kullanılabilir. Tüm sürümler ve silmeler modu önizleme aşamasındadır ve sürümünden 3.40.0-preview.0itibaren değişiklik akışı işlemcisi için desteklenir. Her iki mod için de giriş noktası her zaman izlenen kapsayıcıdır.

En son sürüm modunu kullanarak okumak için bir Container örnekte öğesini çağırırsınız GetChangeFeedProcessorBuilder:

/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
    CosmosClient cosmosClient,
    IConfiguration configuration)
{
    string databaseName = configuration["SourceDatabaseName"];
    string sourceContainerName = configuration["SourceContainerName"];
    string leaseContainerName = configuration["LeasesContainerName"];

    Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
    ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
        .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
            .WithInstanceName("consoleHost")
            .WithLeaseContainer(leaseContainer)
            .Build();

    Console.WriteLine("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    Console.WriteLine("Change Feed Processor started.");
    return changeFeedProcessor;
}

Tüm sürümleri ve silme modunu kullanarak okumak için örnekten şunu çağırın GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes Container :

Container leaseContainer = client.GetContainer(Program.databaseName, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(Program.databaseName, containerName);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<ToDoItem>(processorName: "changeFeedBasic", onChangesDelegate: Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

Her iki mod için de ilk parametre, bu işlemcinin hedefini açıklayan ayrı bir addır. İkinci ad, değişiklikleri işleyen temsilci uygulamasıdır.

En son sürüm modu için bir temsilci örneği aşağıda verilmişti:

/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
    ChangeFeedProcessorContext context,
    IReadOnlyCollection<ToDoItem> changes,
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate some asynchronous operation
        await Task.Delay(10);
    }

    Console.WriteLine("Finished handling changes.");
}

Tüm sürümler ve silmeler modu için bir temsilci örneği aşağıda verilmiştir:

static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItem<ToDoItem>> changes, CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ChangeFeedItem<ToDoItem> item in changes)
    {
        if (item.Metadata.OperationType == ChangeFeedOperationType.Delete)
        {
            Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item with id {item.Previous.id}.");
        }
        else
        {
            Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item with id {item.Current.id}.");
        }
        // Simulate work
        await Task.Delay(1);
    }
}

Daha sonra kullanarak işlem örneği adını veya benzersiz tanımlayıcıyı WithInstanceNametanımlarsınız. İşlem örneği adı, dağıttığınız her işlem örneği için benzersiz ve farklı olmalıdır. kullanarak WithLeaseContainerkira durumunu korumak için kapsayıcıyı ayarlarsınız.

Çağrısı Build , çağırarak StartAsyncbaşlatabileceğiniz işlemci örneğini verir.

Not

Yukarıdaki kod parçacıkları GitHub'daki örneklerden alınmıştır. En son sürüm modu veya tüm sürümler ve silmeler modu için örneği alabilirsiniz.

İşleme yaşam döngüsü

Bir konak örneğinin normal yaşam döngüsü şu şekildedir:

  1. Değişiklik akışını okuyun.
  2. Değişiklik yoksa önceden tanımlanmış bir süre boyunca uyku moduna geçin (Oluşturucu'da kullanılarak WithPollInterval özelleştirilebilir) ve #1'e gidin.
  3. Değişiklikler varsa, bunları temsilciye gönderin.
  4. Temsilci değişiklikleri başarıyla işlemeyi tamamladığında, kira depoyu en son işlenen zaman noktasıyla güncelleştirin ve #1'e gidin.

Hata işleme

Değişiklik akışı işlemcisi kullanıcı kodu hatalarına dayanıklıdır. Temsilci uygulamanızda işlenmeyen bir özel durum varsa (4. adım), belirli bir değişiklik toplu işlemini işleyen iş parçacığı durdurulur ve sonunda yeni bir iş parçacığı oluşturulur. Yeni iş parçacığı, kira deposunun bu bölüm anahtarı değerleri aralığı için kaydettiği en son noktayı denetler. Yeni iş parçacığı oradan yeniden başlatılır ve temsilciye aynı değişiklik toplu işlemini etkili bir şekilde gönderir. Temsilciniz değişiklikleri doğru işleyene kadar bu davranış devam eder ve değişiklik akışı işlemcisinin "en az bir kez" garantisi olmasının nedeni budur.

Not

Yalnızca bir senaryoda, bir grup değişiklik yeniden denenmiyor. Hata ilk temsilci yürütmesinde gerçekleşirse, kiralama deposunun yeniden denemede kullanılacak daha önce kaydedilmiş bir durumu yoktur. Bu gibi durumlarda yeniden deneme, son toplu işlemi içerebilen veya içeremeyen ilk başlangıç yapılandırmasını kullanır.

Değişiklik akışı işlemcinizin sürekli olarak aynı değişiklik toplu işlemini yeniden denemesini önlemek için, hata iletisi kuyruğuna özel durumlarda belge yazmak için temsilci kodunuza mantık eklemeniz gerekir. Bu tasarım, işlenmemiş değişiklikleri takip edebilmenizi ve gelecekteki değişiklikleri işlemeye devam edebilmenizi sağlar. Hata iletisi kuyruğu başka bir Azure Cosmos DB kapsayıcısı olabilir. Tam veri deposu önemli değildir. Yalnızca işlenmemiş değişikliklerin kalıcı olmasını istiyorsunuz.

Değişiklik akışı tahmin aracını, değişiklik akışı işlemcisi örneklerinizin değişiklik akışını okurken ilerleme durumunu izlemek için de kullanabilir veya temel alınan hataları algılamak için yaşam döngüsü bildirimlerini kullanabilirsiniz.

Yaşam döngüsü bildirimleri

Değişiklik akışı işlemcisini yaşam döngüsündeki ilgili herhangi bir olaya bağlayabilirsiniz. Bunlardan birine veya tümüne bildirim almayı seçebilirsiniz. Öneri, en azından hata bildirimini kaydetmektir:

  • geçerli konak işlemeye başlamak için bir kira aldığı zaman bildirim almak için WithLeaseAcquireNotification bir işleyici kaydedin.
  • Geçerli konak bir kirayı serbest bıraktığında ve işlemeyi durdurduğunda bildirim almak için WithLeaseReleaseNotification bir işleyici kaydedin.
  • geçerli konak işleme sırasında bir özel durumla karşılaştığında bildirim almak için WithErrorNotification için bir işleyici kaydedin. Kaynağın kullanıcı temsilcisi (işlenmeyen özel durum) veya işlemcinin izlenen kapsayıcıya erişmeye çalıştığında karşılaştığı bir hata (örneğin, ağ sorunları) olduğunu ayırt edebilmeniz gerekir.

Yaşam döngüsü bildirimleri her iki değişiklik akışı modunda da kullanılabilir. En son sürüm modunda yaşam döngüsü bildirimleri örneği aşağıda verilmiştir:

Container.ChangeFeedMonitorLeaseAcquireDelegate onLeaseAcquiredAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is acquired and will start processing");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorLeaseReleaseDelegate onLeaseReleaseAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is released and processing is stopped");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
{
    if (exception is ChangeFeedProcessorUserException userException)
    {
        Console.WriteLine($"Lease {LeaseToken} processing failed with unhandled exception from user delegate {userException.InnerException}");
    }
    else
    {
        Console.WriteLine($"Lease {LeaseToken} failed with {exception}");
    }

    return Task.CompletedTask;
};

ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedNotifications", handleChanges)
        .WithLeaseAcquireNotification(onLeaseAcquiredAsync)
        .WithLeaseReleaseNotification(onLeaseReleaseAsync)
        .WithErrorNotification(onErrorAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

Dağıtım birimi

Tek bir değişiklik akışı işlemcisi dağıtım birimi, aynı değere processorName ve aynı kira kapsayıcısı yapılandırmasına sahip bir veya daha fazla işlem örneğinden oluşur, ancak farklı örnek adlarından oluşur. Değişiklikler için her ünitenin farklı bir iş akışına sahip olduğu ve her dağıtım biriminin bir veya daha fazla örnekten oluştuğu birçok dağıtım biriminiz olabilir.

Örneğin, kapsayıcınızda her değişiklik olduğunda dış API'yi tetikleyen bir dağıtım biriminiz olabilir. Başka bir dağıtım birimi, her değişiklik olduğunda verileri gerçek zamanlı olarak taşıyabilir. İzlenen kapsayıcınızda bir değişiklik olduğunda tüm dağıtım birimlerinize bildirim gönderilir.

Dinamik ölçeklendirme

Daha önce belirtildiği gibi, bir dağıtım birimi içinde bir veya daha fazla işlem örneğiniz olabilir. Dağıtım birimi içindeki işlem dağıtımından yararlanmak için tek önemli gereksinimler şunlardır:

  • Tüm örnekler aynı kira kapsayıcı yapılandırmasına sahip olmalıdır.
  • Tüm örneklerin için processorNameaynı değere sahip olması gerekir.
  • Her örneğin farklı bir örnek adına (WithInstanceName) sahip olması gerekir.

Bu üç koşul geçerliyse, değişiklik akışı işlemcisi kira kapsayıcısında bulunan tüm kiraları bu dağıtım biriminin çalışan tüm örneklerine dağıtır ve eşit dağıtım algoritması kullanarak işlemi paralelleştirir. Kiralama her zaman bir örneğe aittir, bu nedenle örnek sayısı kira sayısından büyük olmamalıdır.

Örnek sayısı artabilir ve küçülebilir. Değişiklik akışı işlemcisi, yükü uygun şekilde yeniden dağıtarak dinamik olarak ayarlar.

Ayrıca, değişiklik akışı işlemcisi, kapsayıcının aktarım hızı veya depolaması artarsa kapsayıcının ölçeğini dinamik olarak ayarlayabilir. Kapsayıcınız büyüdüğünde, değişiklik akışı işlemcisi kiraları dinamik olarak artırarak ve yeni kiraları mevcut örnekler arasında dağıtarak senaryoyu şeffaf bir şekilde işler.

Başlangıç saati

Varsayılan olarak, bir değişiklik akışı işlemcisi ilk kez başlatıldığında, kira kapsayıcısını başlatır ve işleme yaşam döngüsünü başlatır. Değişiklik akışı işlemcisi ilk kez başlatılmadan önce izlenen kapsayıcıda gerçekleşen değişiklikler algılanmamıştır.

Önceki bir tarih ve saatten okuma

Oluşturucu uzantısına bir örneğini geçirerek belirli bir tarih ve saatte başlayan değişiklikleri okumak için değişiklik akışı işlemcisini DateTime WithStartTime başlatmak mümkündür:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedTime", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(particularPointInTime)
        .Build();

Değişiklik akışı işlemcisi söz konusu tarih ve saat için başlatılır ve daha sonra gerçekleşen değişiklikleri okumaya başlar.

Baştan okuma

Veri geçişleri gibi diğer senaryolarda veya bir kapsayıcının geçmişinin tamamını analiz ediyorsanız, değişiklik akışını söz konusu kapsayıcının ömrünün başından itibaren okumanız gerekir. Oluşturucu uzantısında kullanabilirsinizWithStartTime, ancak şu örnekte olduğu gibi en düşük DateTime değerin UTC gösterimini oluşturan değerini geçirinDateTime.MinValue.ToUniversalTime():

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedBeginning", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();

Değişiklik akışı işlemcisi başlatılır ve kapsayıcının ömrünün başından itibaren değişiklikleri okumaya başlar.

Not

Bu özelleştirme seçenekleri yalnızca değişiklik akışı işlemcisinin başlangıç noktasını ayarlamak için çalışır. Kira kapsayıcısı ilk kez başlatıldıktan sonra, bu seçeneklerin değiştirilmesinin hiçbir etkisi olmaz.

Başlangıç noktasını özelleştirmek yalnızca en son sürüm değişiklik akışı modunda kullanılabilir. Tüm sürümleri ve silme modunu kullanırken, işlemci başlatıldığından itibaren okumaya başlamanız veya hesabınızın sürekli yedekleme saklama süresi içinde olan önceki bir kira durumundan devam etmeniz gerekir.

Akışı ve sağlanan aktarım hızını değiştirme

İzlenen kapsayıcıda değişiklik akışı okuma işlemleri istek birimlerini tüketir. İzlenen kapsayıcınızda azaltma olmadığından emin olun. Azaltma, işlemcilerinizde değişiklik akışı olaylarını almada gecikmeler ekler.

Kira kapsayıcısı üzerindeki işlemler (durumu güncelleştirme ve koruma) istek birimlerini kullanır. Aynı kira kapsayıcısını kullanan örnek sayısı ne kadar yüksekse, istek birimlerinin olası tüketimi de o kadar yüksektir. Kira kapsayıcınızda azaltma olmadığından emin olun. Azaltma, değişiklik akışı olaylarının alınmasında gecikmeler ekler. Azaltma işlemi bile tamamen sonlandırabilir.

Kira kapsayıcısını paylaşma

Kiralama kapsayıcılarını birden çok dağıtım birimi arasında paylaşabilirsiniz. Paylaşılan kira kapsayıcısında, her dağıtım birimi farklı bir izlenen kapsayıcıyı dinler veya için processorNamefarklı bir değere sahiptir. Bu yapılandırmada, her dağıtım birimi kira kapsayıcısında bağımsız bir durum tutar. Sağlanan aktarım hızının tüm dağıtım birimleri için yeterli olduğundan emin olmak için kira kapsayıcısı üzerindeki istek birimi tüketimini gözden geçirin.

Gelişmiş kira yapılandırması

Değişiklik akışı işlemcisinin çalışma şeklini üç temel yapılandırma etkileyebilir. Her yapılandırma, kira kapsayıcısı üzerindeki istek birimi tüketimini etkiler. Değişiklik akışı işlemcisini oluştururken bu yapılandırmalardan birini ayarlayabilirsiniz, ancak bunları dikkatle kullanabilirsiniz:

  • Kira Alma: Varsayılan olarak her 17 saniyede bir. Konak, kiralama deposunun durumunu düzenli aralıklarla denetler ve dinamik ölçeklendirme işleminin bir parçası olarak kiraları almayı göz önünde bulundurun. Bu işlem, kira kapsayıcısı üzerinde bir Sorgu yürütülerek gerçekleştirilir. Bu değerin azaltılması yeniden dengelemeyi ve kira alma işlemini hızlandırır, ancak kira kapsayıcısı üzerindeki istek birimi tüketimini artırır.
  • Kira Süre Sonu: Varsayılan olarak 60 saniyedir. Bir kiralamanın başka bir konak tarafından alınmadan önce herhangi bir yenileme etkinliği olmadan var olabileceği maksimum süreyi tanımlar. Bir konak kilitlendiğinde, sahip olduğu kiralar bu sürenin ardından ve yapılandırılmış yenileme aralığına ek olarak diğer konaklar tarafından alınır. Bu değerin azaltılması, konak kilitlenmesinden sonra kurtarmanın daha hızlı olmasını sağlar, ancak süre sonu değeri hiçbir zaman yenileme aralığından daha düşük olmamalıdır.
  • Kira Yenileme: Varsayılan olarak, her 13 saniyede bir. Kiraya sahip olan bir konak, kullanılacak yeni bir değişiklik olmasa bile kirayı düzenli aralıklarla yeniler. Bu işlem, kiralamada Bir Değiştir yürütülerek gerçekleştirilir. Bu değerin azaltılması, konak kilitlenmesi nedeniyle kaybolan kiraları algılamak için gereken süreyi azaltır, ancak kira kapsayıcısı üzerindeki istek birimi tüketimini artırır.

Değişiklik akışı işlemcisinin barındırıldığı yer

Değişiklik akışı işlemcisi, uzun süre çalışan işlemleri veya görevleri destekleyen herhangi bir platformda barındırılabilir. Burada bazı örnekler verilmiştir:

Kiralama kapsayıcısı durumu koruduğu için değişiklik akışı işlemcisi kısa süreli ortamlarda çalışabildiğinden, bu ortamların başlangıç döngüsü bildirimleri almak için gereken süreye gecikmeler ekler (ortam her başlatıldığında işlemciyi başlatma yükü nedeniyle).

Rol tabanlı erişim gereksinimleri

Kimlik doğrulama mekanizması olarak Microsoft Entra Id kullanırken, kimliğin uygun izinlere sahip olduğundan emin olun:

  • İzlenen kapsayıcıda:
    • Microsoft.DocumentDB/databaseAccounts/readMetadata
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/readChangeFeed
  • Kira kapsayıcısı üzerinde:
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/read
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/create
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/replace
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/delete
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/executeQuery

Ek kaynaklar

Sonraki adımlar

Aşağıdaki makalelerde değişiklik akışı işlemcisi hakkında daha fazla bilgi edinin: