Azure Cosmos DB'de akış çekme modelini değiştirme
UYGULANANLAR: NOSQL
Azure Cosmos DB değişiklik akışını kendi hızınızda kullanmak için değişiklik akışı çekme modelini kullanabilirsiniz. Değişiklik akışı işlemcisine benzer şekilde, değişikliklerin birden çok değişiklik akışı tüketicisinde işlenmesini paralelleştirmek için değişiklik akışı çekme modelini kullanabilirsiniz.
Değişiklik akışı işlemcisi ile karşılaştırma
Birçok senaryo, değişiklik akışı işlemcisini veya değişiklik akışı çekme modelini kullanarak değişiklik akışını işleyebilir. Çekme modelinin devamlılık belirteçleri ve değişiklik akışı işlemcisinin kira kapsayıcısı, hem son işlenen öğe için yer işaretleri hem de değişiklik akışındaki öğeler grubu olarak çalışır.
Ancak, devamlılık belirteçlerini kiraya dönüştüremezsiniz veya bunun tersi de geçerlidir.
Not
Çoğu durumda, değişiklik akışından okumanız gerektiğinde, en basit seçenek değişiklik akışı işlemcisini kullanmaktır.
Çekme modelini şu senaryolarda kullanmayı düşünmelisiniz:
- Belirli bir bölüm anahtarındaki değişiklikleri okumak için.
- İstemcinizin işlenmek üzere değişiklik alma hızını denetlemek için.
- Değişiklik akışındaki mevcut verilerin tek seferlik okunmasını gerçekleştirmek için (örneğin, veri geçişi yapmak için).
Değişiklik akışı işlemcisi ile değişiklik akışı çekme modeli arasındaki bazı önemli farklar şunlardır:
Özellik | Değişiklik akışı işlemcisi | Değişiklik akışı çekme modeli |
---|---|---|
Değişiklik akışını işlemedeki geçerli noktayı izleme | Kira (Azure Cosmos DB kapsayıcısında depolanır) | Devamlılık belirteci (bellekte depolanır veya el ile kalıcı hale gelir) |
Geçmiş değişiklikleri yeniden yürütme olanağı | Evet, gönderme modeliyle | Evet, çekme modeliyle |
Gelecekteki değişiklikler için yoklama | Kullanıcı tarafından belirtilen WithPollInterval değere göre değişiklikleri otomatik olarak denetler |
El ile |
Yeni değişiklik yapılmadığı durumlarda davranış | değerini WithPollInterval otomatik olarak bekleyin ve ardından yeniden denetleyin |
Durumu denetlemeli ve el ile yeniden denetlemelidir |
Kapsayıcının tamamından değişiklikleri işleme | Evet ve aynı kapsayıcıdan kullanan birden çok iş parçacığı ve makine arasında otomatik olarak paralelleştirilmiştir | Evet ve kullanarak el ile paralelleştirilmiş FeedRange |
Değişiklikleri yalnızca tek bir bölüm anahtarından işleme | Desteklenmez | Yes |
Not
Çekme modelini kullanırken, değişiklik akışı işlemcisini kullanarak okumanın aksine, yeni değişiklik olmayan durumları açıkça işlemeniz gerekir.
Çekme modeliyle çalışma
Çekme modelini kullanarak değişiklik akışını işlemek için bir örneği FeedIterator
oluşturun. İlk oluşturduğunuzdaFeedIterator
, hem değişiklikleri okumak için başlangıç konumundan hem de için FeedRange
kullanmak istediğiniz değerden oluşan gerekli ChangeFeedStartFrom
bir değer belirtmeniz gerekir. FeedRange
bölüm anahtarı değerleri aralığıdır ve değişiklik akışından okunabilecek öğeleri belirli FeedIterator
bir kullanarak belirtir. Değişiklikleri işlemek istediğiniz mod için de gerekli ChangeFeedMode
bir değer belirtmeniz gerekir: en son sürüm veya tüm sürümler ve silmeler. ChangeFeedMode.LatestVersion
Değişiklik akışını okumak için hangi modu kullanmak istediğinizi belirtmek için veya ChangeFeedMode.AllVersionsAndDeletes
kullanın. Tüm sürümleri ve silme modunu kullandığınızda, belirli bir devamlılık belirtecinden veya değerinden Now()
bir değişiklik akışı başlangıcı seçmeniz gerekir.
İsteğe bağlı olarak bir PageSizeHint
ayarlamak için belirtebilirsinizChangeFeedRequestOptions
. Ayarlandığında, bu özellik sayfa başına alınan en fazla öğe sayısını ayarlar. İzlenen koleksiyondaki işlemler saklı yordamlar aracılığıyla gerçekleştiriliyorsa, değişiklik akışındaki öğeler okunurken işlem kapsamı korunur. Sonuç olarak, alınan öğelerin sayısı belirtilen değerden yüksek olabilir, böylece aynı işlem tarafından değiştirilen öğeler tek bir atomik toplu işin parçası olarak döndürülür.
Burada, varlık nesnelerini döndüren en son sürüm modunda nasıl edinildiğini FeedIterator
gösteren bir örnek verilmiştir. Bu örnekte bir User
nesne verilmiştir:
FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
İpucu
sürümünden 3.34.0
önce, ayarıyla ChangeFeedMode.Incremental
en son sürüm modu kullanılabilir. LatestVersion
Hem hem de Incremental
değişiklik akışının en son sürüm moduna bakın ve her iki modu kullanan uygulamalar da aynı davranışı görür.
Tüm sürümler ve silmeler modu önizleme aşamasındadır ve önizleme .NET SDK sürümleri >= 3.32.0-preview
ile kullanılabilir. Aşağıda, dinamik nesneler döndüren tüm sürümler ve silmeler modunda elde etme FeedIterator
örneği verilmiştir:
FeedIterator<dynamic> InteratorWithDynamic = container.GetChangeFeedIterator<dynamic>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);
Not
En son sürüm modunda, değiştirilen öğeyi temsil eden nesneleri ve bazı ek meta verileri alırsınız. Tüm sürümler ve silmeler modu farklı bir veri modeli döndürür. Daha fazla bilgi için bkz. Yanıt nesnesini ayrıştırma.
Akışlar aracılığıyla değişiklik akışını kullanma
FeedIterator
her iki değişiklik akışı modu için iki seçenek vardır. Varlık nesnelerini döndüren örneklere ek olarak, yanıtı destekle Stream
de alabilirsiniz. Akışlar, verileri önce seri durumdan çıkarmadan okumanıza olanak sağlar, bu nedenle istemci kaynaklarına kaydedersiniz.
Aşağıda, döndüren en son sürüm modunda nasıl edinildiğini FeedIterator
gösteren bir örnek verilmişti Stream
:
FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
Kapsayıcının tamamı için değişiklikleri kullanma
parametresini FeedIterator
sağlamazsanızFeedRange
, kapsayıcının değişiklik akışının tamamını kendi hızınızda işleyebilirsiniz. Aşağıda, en son sürüm modunu kullanarak geçerli zamanda başlayarak tüm değişiklikleri okumaya başlayan bir örnek verilmişti:
FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);
while (iteratorForTheEntireContainer.HasMoreResults)
{
FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Değişiklik akışı, gelecekteki tüm yazma ve güncelleştirmeleri kapsayan sonsuz bir öğe listesi olduğundan, değeri HasMoreResults
her zaman true
olur. Değişiklik akışını okumaya çalıştığınızda ve kullanılabilir yeni bir değişiklik olmadığında, durum bilgisi olan NotModified
bir yanıt alırsınız. Yukarıdaki örnekte, değişiklikler yeniden denetlenmeden önce beş saniye beklenerek işlenir.
Bölüm anahtarı için değişiklikleri kullanma
Bazı durumlarda, yalnızca belirli bir bölüm anahtarına yönelik değişiklikleri işlemek isteyebilirsiniz. Belirli bir bölüm anahtarı için alabilir FeedIterator
ve değişiklikleri kapsayıcının tamamında yaptığınız gibi işleyebilirsiniz.
FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));
while (iteratorForThePartitionKey.HasMoreResults)
{
FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Paralelleştirme için FeedRange kullanma
Değişiklik akışı işlemcisinde iş otomatik olarak birden çok tüketiciye yayılır. Değişiklik akışı çekme modelinde, değişiklik akışının işlenmesini paralelleştirmek için öğesini kullanabilirsiniz FeedRange
. A FeedRange
, bölüm anahtarı değerleri aralığını temsil eder.
Kapsayıcınız için aralıkların listesinin nasıl alındığını gösteren bir örnek aşağıda verilmiştir:
IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();
Kapsayıcınız için değerlerin FeedRange
listesini aldığınızda, fiziksel bölüm başına bir tane FeedRange
alırsınız.
kullanarak FeedRange
, değişiklik akışının birden çok makine veya iş parçacığı arasında işlenmesini paralelleştirmek için bir FeedIterator
oluşturabilirsiniz. Kapsayıcının tamamı için veya tek bir bölüm anahtarının nasıl alındığını FeedIterator
gösteren önceki örnekten farklı olarak, değişiklik akışını paralel olarak işleyebilen birden çok FeedIterator elde etmek için FeedRanges kullanabilirsiniz.
FeedRanges kullanmak istediğiniz durumlarda, FeedRanges'i alan ve bunları bu makinelere dağıtan bir düzenleyici işlemine sahip olmanız gerekir. Bu dağıtım şöyle olabilir:
- Bu dize değerini kullanma
FeedRange.ToJsonString
ve dağıtma. Tüketiciler bu değeri ileFeedRange.FromJsonString
kullanabilir. - Dağıtım devam ediyorsa, nesne başvuruyu
FeedRange
geçirin.
Aşağıda, paralel olarak okunan iki varsayımsal ayrı makine kullanılarak kapsayıcının değişiklik akışının başından itibaren nasıl okunduğunu gösteren bir örnek verilmiştir:
Makine 1:
FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Makine 2:
FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Devam belirteçlerini kaydetme
Devamlılık belirtecini alarak konumunuzu FeedIterator
kaydedebilirsiniz. Devamlılık belirteci, FeedIterator'ınızın son işlenen değişikliklerini izleyen ve daha sonra bu noktada sürdürülmesini sağlayan bir dize değeridir FeedIterator
. Belirtilirse devamlılık belirteci başlangıç zamanından önceliklidir ve başlangıç değerlerinden başlar. Aşağıdaki kod, kapsayıcı oluşturma işleminden bu yana değişiklik akışını okur. Daha fazla değişiklik sağlandıktan sonra, değişiklik akışı tüketiminin daha sonra sürdürülebilmesi için bir devamlılık belirteci kalıcı hale gelir.
FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
string continuation = null;
while (iterator.HasMoreResults)
{
FeedResponse<User> response = await iterator.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
continuation = response.ContinuationToken;
// Stop the consumption since there are no new changes
break;
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);
En son sürüm modunu kullanırken, Azure Cosmos DB kapsayıcısı FeedIterator
hala mevcut olduğu sürece devamlılık belirtecinin süresi hiçbir zaman dolmaz. Tüm sürümleri ve silme modunu kullanırken, FeedIterator
sürekli yedeklemeler için bekletme penceresinde değişiklikler olduğu sürece devamlılık belirteci geçerli olur.