Azure Cosmos DB'de akış çekme modelini değiştirme

Değişiklik akışı çekme modelini kullanarak Azure Cosmos DB değişiklik akışını kendi hızınızda kullanabilirsiniz. Değişiklik akışı işlemcisine benzer şekilde, değişikliklerin birden çok değişiklik akışı tüketicisi arasında işlenmesini paralelleştirmek için değişiklik akışı çekme modelini kullanabilirsiniz.

Değişiklik akışı işlemcisi ile karşılaştırma

Birçok senaryo, değişiklik akışı işlemcisini veya değişiklik akışı çekme modelini kullanarak değişiklik akışını işleyebilir. Çekme modelinin devamlılık belirteçleri ve değişiklik akışı işlemcisinin kira kapsayıcısı hem son işlenen öğe için yer işaretleri hem de değişiklik akışındaki öğe toplu işleri için yer işaretleri olarak çalışır.

Ancak, devamlılık belirteçlerini kiralamaya dönüştüremezsiniz veya bunun tersi de geçerlidir.

Note

Çoğu durumda, değişiklik akışından okumanız gerektiğinde, en basit seçenek değişiklik akışı işlemcisini kullanmaktır.

Çekme modelini şu senaryolarda kullanmayı düşünmelisiniz:

  • Belirli bir bölüm anahtarındaki değişiklikleri okumak için
  • Müşterinizin işlem için değişiklikleri alma hızını kontrol etmek için
  • Değişiklik akışındaki mevcut verilerin tek seferlik okunmasını gerçekleştirmek için (örneğin, veri geçişi yapmak için)

Değişiklik akışı işlemcisi ile değişiklik akışı çekme modeli arasındaki bazı önemli farklar şunlardır:

Feature Değişiklik beslemesi işlemcisi Değişiklik akış çekme sistemi
Değişiklik akışını işlemede geçerli noktanın takibini yapma Kira (Azure Cosmos DB kapta depolanır) Devamlılık belirteci (bellekte depolanır veya elle kalıcı olarak kaydedilir)
Geçmiş değişiklikleri yeniden yürütme olanağı Evet, push modeliyle Evet, çekme modeliyle
Gelecekteki değişiklikler için yoklama Kullanıcı tarafından belirtilen WithPollInterval değere göre değişiklikleri otomatik olarak denetler Manual
Yeni değişiklik olmadığında davranış değeri WithPollInterval için otomatik bekleyin ve ardından yeniden kontrol edin Durumu denetlemeli ve el ile yeniden denetlemelidir
Bir kapsayıcının tümünden değişiklikleri işlemek Evet ve aynı kapsayıcıdan tüketen birden çok iş parçacığı ve makine arasında otomatik olarak paralelleştirilmiştir Evet ve FeedRange kullanılarak elle paralelleştirilmiş
Yalnızca tek bir bölüm anahtarından gelen değişiklikleri işleme al Desteklenmiyor Yes

Note

Çekme modelini kullandığınızda, değişiklik akışı işleyicisi kullanarak okuma yapmanın aksine, yeni bir değişiklik olmadığında bu durumları açıkça ele almanız gerekir. Bu, http 304 NotModifiedile gösterilir. HTTP 200 durum koduna sahip 0 OK belge döndüren bir değişiklik akışı yanıtı, değişiklik akışının sonuna ulaştığınız ve yoklama işlemine devam ettiğiniz anlamına gelmez.

Çekme modeliyle çalışma

Çekme modelini kullanarak değişiklik akışını işlemek için bir örneği FeedIteratoroluşturun. öğesini ilk oluşturduğunuzdaFeedIterator, hem değişiklikleri okumak için başlangıç konumundan hem de için ChangeFeedStartFromkullanmak istediğiniz değerden oluşan gerekli FeedRange bir değer belirtmeniz gerekir. FeedRange bir bölüm anahtarı değerleri aralığıdır ve değişiklik akışından okunabilecek öğeleri belirlemek için belirli bir FeedIterator kullanır. Değişiklikleri işlemek istediğiniz mod için de gerekli ChangeFeedMode bir değer belirtmeniz gerekir: en son sürüm veya tüm sürümler ve silmeler. Ya ChangeFeedMode.LatestVersion ya da ChangeFeedMode.AllVersionsAndDeletes öğesini, değişiklik akışını okumak için hangi modu kullanmak istediğinizi belirtmek amacıyla kullanın. Tüm sürümleri ve silme modunu kullandığınızda, ya Now() değerini ya da belirli bir devamlılık belirteci değerini başlangıç noktası olarak seçmelisiniz.

İsteğe bağlı olarak bir ChangeFeedRequestOptions belirtebilir ve bir PageSizeHint ayarlayabilirsiniz. Bu özellik ayarlandığında, sayfa başına alınan en fazla öğe sayısını ayarlar. İzlenen koleksiyondaki işlemler saklı yordamlar aracılığıyla gerçekleştiriliyorsa, değişiklik akışındaki öğeler okunurken işlem kapsamı korunur. Sonuç olarak, alınan öğe sayısı belirtilen değerden yüksek olabilir, böylece aynı işlem tarafından değiştirilen öğeler tek bir atomik toplu işlemin parçası olarak döndürülür.

İşte varlık nesnelerini döndüren en son sürüm modunda bir FeedIterator elde etmenin bir örneği: Bu durumda, bir User nesnesi elde edilir.

FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Tip

3.34.0 sürümünden önceki sürümler için, ChangeFeedMode.Incremental ayarıyla en son sürüm modu kullanılabilir. Incremental ve LatestVersion, değişiklik akışının en son sürüm modunu işaret eder ve bu modlardan herhangi birini kullanan uygulamalar aynı davranışı görürler.

Tüm sürümler ve silmeler modu önizleme aşamasındadır ve önizleme .NET SDK sürümleri >= 3.32.0-previewile kullanılabilir. Aşağıda, tüm sürümlerde ve silme modlarında FeedIterator elde etme ve User nesneleri döndüren bir örnek verilmiştir:

FeedIterator<ChangeFeedItem<User>> InteratorWithPOCOS = container.GetChangeFeedIterator<ChangeFeedItem<User>>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);

Note

En son sürüm modunda, değiştirilen öğeyi temsil eden nesneleri ve bazı ek meta verileri alırsınız. Tüm sürümler ve silmeler modu farklı bir veri modeli döndürür.

En son sürüm modu veya tüm sürümler ve silmeler modu için tam örneği alabilirsiniz.

Akışlar aracılığıyla değişiklik akışını kullanma

FeedIterator her iki değişiklik akışı modu için iki seçenek vardır. Varlık nesnelerini döndüren örneklere ek olarak, Stream desteğiyle yanıtı da alabilirsiniz. Akışlar, verileri ilk seri durumdan çıkarmadan okumanıza olanak tanıyarak istemci kaynaklarından tasarruf etmenizi sağlar.

Aşağıda, döndüren en son sürüm modunda nasıl edinildiğini FeedIterator gösteren bir örnek verilmişti Stream:

FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Kapsayıcının tamamı için değişiklikleri kullanma

parametresini FeedRangesağlamazsanızFeedIterator, kapsayıcının değişiklik akışının tamamını kendi hızınızda işleyebilirsiniz. Aşağıda, en son sürüm modunu kullanarak geçerli zamanda başlayarak tüm değişiklikleri okumaya başlayan bir örnek verilmişti:

FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);

while (iteratorForTheEntireContainer.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else 
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Değişiklik akışı, gelecekteki tüm yazma ve güncelleştirmeleri kapsayan sonsuz bir öğe listesi olduğundan, değeri HasMoreResults her zaman trueolur. Değişiklik akışını okumaya çalıştığınızda ve kullanılabilir yeni bir değişiklik olmadığında, NotModified durum koduyla bir yanıt alırsınız. Bu, değişiklik ve OK durum içermeyen bir yanıt almaktan farklıdır. Daha fazla değişiklik mevcut iken boş değişiklik akışı yanıtları almanız mümkündür ve NotModified alınana kadar yoklamaya devam etmelisiniz. Yukarıdaki örnekte, NotModified değişiklikler yeniden denetlenmeden önce beş saniye beklenerek işlenir.

Bölüm anahtarı için değişiklikleri kullanma

Bazı durumlarda, yalnızca belirli bir bölüm anahtarına yönelik değişiklikleri işlemek isteyebilirsiniz. Belirli bir bölüm anahtarı için FeedIterator alabilir ve değişiklikleri, kapsayıcının tamamında yaptığınız gibi aynı şekilde işleyebilirsiniz.

FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
    ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));

while (iteratorForThePartitionKey.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Paralelleştirme için FeedRange kullanma

Değişiklik akışı işlemcisinde, çalışma otomatik olarak birden çok tüketiciye yayılır. Değişiklik akışı çekme modelinde, değişiklik akışının işlenmesini paralelleştirmek için öğesini kullanabilirsiniz FeedRange . A FeedRange , bölüm anahtarı değerleri aralığını temsil eder.

Kapsayıcınız için aralıkların listesinin nasıl alındığını gösteren bir örnek aşağıda verilmiştir:

IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();

Kapsayıcınız için değerlerin FeedRange listesini aldığınızda, her FeedRange için bir alırsınız.

FeedRange kullanarak, değişiklik akışının birden çok makine veya iş parçacığı arasında işlenmesini paralelleştirmek için bir FeedIterator oluşturabilirsiniz. Kapsayıcının tamamı için veya tek bir bölüm anahtarının nasıl alındığını FeedIterator gösteren önceki örnekten farklı olarak, değişiklik akışını paralel olarak işleyebilen birden çok FeedIterator elde etmek için FeedRanges kullanabilirsiniz.

FeedRanges kullanmak istediğiniz durumlarda, FeedRanges'i alan ve bunları bu makinelere dağıtan bir düzenleyici işlemine sahip olmanız gerekir. Bu dağıtım şöyle olabilir:

  • FeedRange.ToJsonString kullanarak bu dize değerini dağıtma. Tüketiciler bu değeri FeedRange.FromJsonString ile kullanabilir.
  • Dağıtım devam ediyorsa, FeedRange nesne başvurusunu geçirin.

Aşağıda, paralel olarak okuma yapan iki varsayımsal ayrı makine kullanılarak, kapsayıcının değişiklik akışının başından nasıl okunduğunu gösteren bir örnek verilmiştir.

1. Makine:

FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

2. Makine:

FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Devamlılık belirteçlerini kaydetme

Devam belirtecini alarak FeedIterator konumunu kaydedebilirsiniz. Devamlılık belirteci, FeedIterator'ınızın son işlenen değişikliklerini takip eden ve daha sonra FeedIterator bu noktadan devam edilmesini sağlayan bir dize değeridir. Devamlılık belirteci belirtilirse, başlangıç zamanından önceliklidir ve başlangıç değerlerinden başlar. Aşağıdaki kod, kapsayıcı oluşturma işleminden bu yana değişiklik akışını okur. Daha fazla değişiklik mevcut olmadığında, değişiklik akışı tüketiminin daha sonra sürdürülebilmesi için bir devam belirteci kalıcı olarak saklanır.

FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

string continuation = null;

while (iterator.HasMoreResults)
{
    FeedResponse<User> response = await iterator.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        continuation = response.ContinuationToken;
        // Stop the consumption since there are no new changes
        break;
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);

En son sürüm modunu kullanırken, Azure Cosmos DB kapsayıcısı FeedIterator hala mevcut olduğu sürece devamlılık belirtecinin süresi hiçbir zaman dolmaz. Tüm sürümleri ve silme modu özelliğini kullanırken, FeedIterator sürekli yedeklemeler için saklama süresi içinde değişiklikler yapıldığı sürece devamlılık belirteci geçerli olur.

Sonraki Adımlar