Azure Cosmos DB'de akış çekme modelini değiştirme

UYGULANANLAR: NOSQL

Azure Cosmos DB değişiklik akışını kendi hızınızda kullanmak için değişiklik akışı çekme modelini kullanabilirsiniz. Değişiklik akışı işlemcisine benzer şekilde, değişikliklerin birden çok değişiklik akışı tüketicisinde işlenmesini paralelleştirmek için değişiklik akışı çekme modelini kullanabilirsiniz.

Değişiklik akışı işlemcisi ile karşılaştırma

Birçok senaryo, değişiklik akışı işlemcisini veya değişiklik akışı çekme modelini kullanarak değişiklik akışını işleyebilir. Çekme modelinin devamlılık belirteçleri ve değişiklik akışı işlemcisinin kira kapsayıcısı, hem son işlenen öğe için yer işaretleri hem de değişiklik akışındaki öğeler grubu olarak çalışır.

Ancak, devamlılık belirteçlerini kiraya dönüştüremezsiniz veya bunun tersi de geçerlidir.

Not

Çoğu durumda, değişiklik akışından okumanız gerektiğinde, en basit seçenek değişiklik akışı işlemcisini kullanmaktır.

Çekme modelini şu senaryolarda kullanmayı düşünmelisiniz:

  • Belirli bir bölüm anahtarındaki değişiklikleri okumak için.
  • İstemcinizin işlenmek üzere değişiklik alma hızını denetlemek için.
  • Değişiklik akışındaki mevcut verilerin tek seferlik okunmasını gerçekleştirmek için (örneğin, veri geçişi yapmak için).

Değişiklik akışı işlemcisi ile değişiklik akışı çekme modeli arasındaki bazı önemli farklar şunlardır:

Özellik Değişiklik akışı işlemcisi Değişiklik akışı çekme modeli
Değişiklik akışını işlemedeki geçerli noktayı izleme Kira (Azure Cosmos DB kapsayıcısında depolanır) Devamlılık belirteci (bellekte depolanır veya el ile kalıcı hale gelir)
Geçmiş değişiklikleri yeniden yürütme olanağı Evet, gönderme modeliyle Evet, çekme modeliyle
Gelecekteki değişiklikler için yoklama Kullanıcı tarafından belirtilen WithPollInterval değere göre değişiklikleri otomatik olarak denetler El ile
Yeni değişiklik yapılmadığı durumlarda davranış değerini WithPollInterval otomatik olarak bekleyin ve ardından yeniden denetleyin Durumu denetlemeli ve el ile yeniden denetlemelidir
Kapsayıcının tamamından değişiklikleri işleme Evet ve aynı kapsayıcıdan kullanan birden çok iş parçacığı ve makine arasında otomatik olarak paralelleştirilmiştir Evet ve kullanarak el ile paralelleştirilmiş FeedRange
Değişiklikleri yalnızca tek bir bölüm anahtarından işleme Desteklenmez Yes

Not

Çekme modelini kullanırken, değişiklik akışı işlemcisini kullanarak okumanın aksine, yeni değişiklik olmayan durumları açıkça işlemeniz gerekir.

Çekme modeliyle çalışma

Çekme modelini kullanarak değişiklik akışını işlemek için bir örneği FeedIteratoroluşturun. İlk oluşturduğunuzdaFeedIterator, hem değişiklikleri okumak için başlangıç konumundan hem de için FeedRangekullanmak istediğiniz değerden oluşan gerekli ChangeFeedStartFrom bir değer belirtmeniz gerekir. FeedRange bölüm anahtarı değerleri aralığıdır ve değişiklik akışından okunabilecek öğeleri belirli FeedIteratorbir kullanarak belirtir. Değişiklikleri işlemek istediğiniz mod için de gerekli ChangeFeedMode bir değer belirtmeniz gerekir: en son sürüm veya tüm sürümler ve silmeler. ChangeFeedMode.LatestVersion Değişiklik akışını okumak için hangi modu kullanmak istediğinizi belirtmek için veya ChangeFeedMode.AllVersionsAndDeletes kullanın. Tüm sürümleri ve silme modunu kullandığınızda, belirli bir devamlılık belirtecinden veya değerinden Now() bir değişiklik akışı başlangıcı seçmeniz gerekir.

İsteğe bağlı olarak bir PageSizeHintayarlamak için belirtebilirsinizChangeFeedRequestOptions. Ayarlandığında, bu özellik sayfa başına alınan en fazla öğe sayısını ayarlar. İzlenen koleksiyondaki işlemler saklı yordamlar aracılığıyla gerçekleştiriliyorsa, değişiklik akışındaki öğeler okunurken işlem kapsamı korunur. Sonuç olarak, alınan öğelerin sayısı belirtilen değerden yüksek olabilir, böylece aynı işlem tarafından değiştirilen öğeler tek bir atomik toplu işin parçası olarak döndürülür.

Burada, varlık nesnelerini döndüren en son sürüm modunda nasıl edinildiğini FeedIterator gösteren bir örnek verilmiştir. Bu örnekte bir User nesne verilmiştir:

FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

İpucu

sürümünden 3.34.0önce, ayarıyla ChangeFeedMode.Incrementalen son sürüm modu kullanılabilir. LatestVersion Hem hem de Incremental değişiklik akışının en son sürüm moduna bakın ve her iki modu kullanan uygulamalar da aynı davranışı görür.

Tüm sürümler ve silmeler modu önizleme aşamasındadır ve önizleme .NET SDK sürümleri >= 3.32.0-previewile kullanılabilir. Aşağıda, dinamik nesneler döndüren tüm sürümler ve silmeler modunda elde etme FeedIterator örneği verilmiştir:

FeedIterator<dynamic> InteratorWithDynamic = container.GetChangeFeedIterator<dynamic>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);

Not

En son sürüm modunda, değiştirilen öğeyi temsil eden nesneleri ve bazı ek meta verileri alırsınız. Tüm sürümler ve silmeler modu farklı bir veri modeli döndürür. Daha fazla bilgi için bkz. Yanıt nesnesini ayrıştırma.

Akışlar aracılığıyla değişiklik akışını kullanma

FeedIterator her iki değişiklik akışı modu için iki seçenek vardır. Varlık nesnelerini döndüren örneklere ek olarak, yanıtı destekle Stream de alabilirsiniz. Akışlar, verileri önce seri durumdan çıkarmadan okumanıza olanak sağlar, bu nedenle istemci kaynaklarına kaydedersiniz.

Aşağıda, döndüren en son sürüm modunda nasıl edinildiğini FeedIterator gösteren bir örnek verilmişti Stream:

FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Kapsayıcının tamamı için değişiklikleri kullanma

parametresini FeedIteratorsağlamazsanızFeedRange, kapsayıcının değişiklik akışının tamamını kendi hızınızda işleyebilirsiniz. Aşağıda, en son sürüm modunu kullanarak geçerli zamanda başlayarak tüm değişiklikleri okumaya başlayan bir örnek verilmişti:

FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);

while (iteratorForTheEntireContainer.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else 
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Değişiklik akışı, gelecekteki tüm yazma ve güncelleştirmeleri kapsayan sonsuz bir öğe listesi olduğundan, değeri HasMoreResults her zaman trueolur. Değişiklik akışını okumaya çalıştığınızda ve kullanılabilir yeni bir değişiklik olmadığında, durum bilgisi olan NotModified bir yanıt alırsınız. Yukarıdaki örnekte, değişiklikler yeniden denetlenmeden önce beş saniye beklenerek işlenir.

Bölüm anahtarı için değişiklikleri kullanma

Bazı durumlarda, yalnızca belirli bir bölüm anahtarına yönelik değişiklikleri işlemek isteyebilirsiniz. Belirli bir bölüm anahtarı için alabilir FeedIterator ve değişiklikleri kapsayıcının tamamında yaptığınız gibi işleyebilirsiniz.

FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
    ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));

while (iteratorForThePartitionKey.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Paralelleştirme için FeedRange kullanma

Değişiklik akışı işlemcisinde iş otomatik olarak birden çok tüketiciye yayılır. Değişiklik akışı çekme modelinde, değişiklik akışının işlenmesini paralelleştirmek için öğesini kullanabilirsiniz FeedRange . A FeedRange , bölüm anahtarı değerleri aralığını temsil eder.

Kapsayıcınız için aralıkların listesinin nasıl alındığını gösteren bir örnek aşağıda verilmiştir:

IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();

Kapsayıcınız için değerlerin FeedRange listesini aldığınızda, fiziksel bölüm başına bir tane FeedRange alırsınız.

kullanarak FeedRange, değişiklik akışının birden çok makine veya iş parçacığı arasında işlenmesini paralelleştirmek için bir FeedIterator oluşturabilirsiniz. Kapsayıcının tamamı için veya tek bir bölüm anahtarının nasıl alındığını FeedIterator gösteren önceki örnekten farklı olarak, değişiklik akışını paralel olarak işleyebilen birden çok FeedIterator elde etmek için FeedRanges kullanabilirsiniz.

FeedRanges kullanmak istediğiniz durumlarda, FeedRanges'i alan ve bunları bu makinelere dağıtan bir düzenleyici işlemine sahip olmanız gerekir. Bu dağıtım şöyle olabilir:

  • Bu dize değerini kullanma FeedRange.ToJsonString ve dağıtma. Tüketiciler bu değeri ile FeedRange.FromJsonStringkullanabilir.
  • Dağıtım devam ediyorsa, nesne başvuruyu FeedRange geçirin.

Aşağıda, paralel olarak okunan iki varsayımsal ayrı makine kullanılarak kapsayıcının değişiklik akışının başından itibaren nasıl okunduğunu gösteren bir örnek verilmiştir:

Makine 1:

FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Makine 2:

FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Devam belirteçlerini kaydetme

Devamlılık belirtecini alarak konumunuzu FeedIterator kaydedebilirsiniz. Devamlılık belirteci, FeedIterator'ınızın son işlenen değişikliklerini izleyen ve daha sonra bu noktada sürdürülmesini sağlayan bir dize değeridir FeedIterator . Belirtilirse devamlılık belirteci başlangıç zamanından önceliklidir ve başlangıç değerlerinden başlar. Aşağıdaki kod, kapsayıcı oluşturma işleminden bu yana değişiklik akışını okur. Daha fazla değişiklik sağlandıktan sonra, değişiklik akışı tüketiminin daha sonra sürdürülebilmesi için bir devamlılık belirteci kalıcı hale gelir.

FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

string continuation = null;

while (iterator.HasMoreResults)
{
    FeedResponse<User> response = await iterator.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        continuation = response.ContinuationToken;
        // Stop the consumption since there are no new changes
        break;
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);

En son sürüm modunu kullanırken, Azure Cosmos DB kapsayıcısı FeedIterator hala mevcut olduğu sürece devamlılık belirtecinin süresi hiçbir zaman dolmaz. Tüm sürümleri ve silme modunu kullanırken, FeedIterator sürekli yedeklemeler için bekletme penceresinde değişiklikler olduğu sürece devamlılık belirteci geçerli olur.

Sonraki adımlar