Değişiklik akışı çekme modelini kullanarak Azure Cosmos DB değişiklik akışını kendi hızınızda kullanabilirsiniz. Değişiklik akışı işlemcisine benzer şekilde, değişikliklerin birden çok değişiklik akışı tüketicisi arasında işlenmesini paralelleştirmek için değişiklik akışı çekme modelini kullanabilirsiniz.
Ancak, devamlılık belirteçlerini kiralamaya dönüştüremezsiniz veya bunun tersi de geçerlidir.
Değişiklik akışı işlemcisi ile değişiklik akışı çekme modeli arasındaki bazı önemli farklar şunlardır:
Çekme modelini kullanarak değişiklik akışını işlemek için bir örneği FeedIteratoroluşturun. öğesini ilk oluşturduğunuzdaFeedIterator, hem değişiklikleri okumak için başlangıç konumundan hem de için ChangeFeedStartFromkullanmak istediğiniz değerden oluşan gerekli FeedRange bir değer belirtmeniz gerekir.
FeedRange bir bölüm anahtarı değerleri aralığıdır ve değişiklik akışından okunabilecek öğeleri belirlemek için belirli bir FeedIterator kullanır. Değişiklikleri işlemek istediğiniz mod için de gerekli ChangeFeedMode bir değer belirtmeniz gerekir: en son sürüm veya tüm sürümler ve silmeler. Ya ChangeFeedMode.LatestVersion ya da ChangeFeedMode.AllVersionsAndDeletes öğesini, değişiklik akışını okumak için hangi modu kullanmak istediğinizi belirtmek amacıyla kullanın. Tüm sürümleri ve silme modunu kullandığınızda, ya Now() değerini ya da belirli bir devamlılık belirteci değerini başlangıç noktası olarak seçmelisiniz.
İsteğe bağlı olarak bir ChangeFeedRequestOptions belirtebilir ve bir PageSizeHint ayarlayabilirsiniz. Bu özellik ayarlandığında, sayfa başına alınan en fazla öğe sayısını ayarlar. İzlenen koleksiyondaki işlemler saklı yordamlar aracılığıyla gerçekleştiriliyorsa, değişiklik akışındaki öğeler okunurken işlem kapsamı korunur. Sonuç olarak, alınan öğe sayısı belirtilen değerden yüksek olabilir, böylece aynı işlem tarafından değiştirilen öğeler tek bir atomik toplu işlemin parçası olarak döndürülür.
İşte varlık nesnelerini döndüren en son sürüm modunda bir FeedIterator elde etmenin bir örneği: Bu durumda, bir User nesnesi elde edilir.
FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
Tip
3.34.0 sürümünden önceki sürümler için, ChangeFeedMode.Incremental ayarıyla en son sürüm modu kullanılabilir.
Incremental ve LatestVersion, değişiklik akışının en son sürüm modunu işaret eder ve bu modlardan herhangi birini kullanan uygulamalar aynı davranışı görürler.
Tüm sürümler ve silmeler modu önizleme aşamasındadır ve önizleme .NET SDK sürümleri >= 3.32.0-previewile kullanılabilir. Aşağıda, tüm sürümlerde ve silme modlarında FeedIterator elde etme ve User nesneleri döndüren bir örnek verilmiştir:
FeedIterator<ChangeFeedItem<User>> InteratorWithPOCOS = container.GetChangeFeedIterator<ChangeFeedItem<User>>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);
Note
En son sürüm modunda, değiştirilen öğeyi temsil eden nesneleri ve bazı ek meta verileri alırsınız. Tüm sürümler ve silmeler modu farklı bir veri modeli döndürür.
En son sürüm modu veya tüm sürümler ve silmeler modu için tam örneği alabilirsiniz.
Akışlar aracılığıyla değişiklik akışını kullanma
FeedIterator her iki değişiklik akışı modu için iki seçenek vardır. Varlık nesnelerini döndüren örneklere ek olarak, Stream desteğiyle yanıtı da alabilirsiniz. Akışlar, verileri ilk seri durumdan çıkarmadan okumanıza olanak tanıyarak istemci kaynaklarından tasarruf etmenizi sağlar.
Aşağıda, döndüren en son sürüm modunda nasıl edinildiğini FeedIterator gösteren bir örnek verilmişti Stream:
FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
Kapsayıcının tamamı için değişiklikleri kullanma
parametresini FeedRangesağlamazsanızFeedIterator, kapsayıcının değişiklik akışının tamamını kendi hızınızda işleyebilirsiniz. Aşağıda, en son sürüm modunu kullanarak geçerli zamanda başlayarak tüm değişiklikleri okumaya başlayan bir örnek verilmişti:
FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);
while (iteratorForTheEntireContainer.HasMoreResults)
{
FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Değişiklik akışı, gelecekteki tüm yazma ve güncelleştirmeleri kapsayan sonsuz bir öğe listesi olduğundan, değeri HasMoreResults her zaman trueolur. Değişiklik akışını okumaya çalıştığınızda ve kullanılabilir yeni bir değişiklik olmadığında, NotModified durum koduyla bir yanıt alırsınız. Bu, değişiklik ve OK durum içermeyen bir yanıt almaktan farklıdır. Daha fazla değişiklik mevcut iken boş değişiklik akışı yanıtları almanız mümkündür ve NotModified alınana kadar yoklamaya devam etmelisiniz. Yukarıdaki örnekte, NotModified değişiklikler yeniden denetlenmeden önce beş saniye beklenerek işlenir.
Bölüm anahtarı için değişiklikleri kullanma
Bazı durumlarda, yalnızca belirli bir bölüm anahtarına yönelik değişiklikleri işlemek isteyebilirsiniz. Belirli bir bölüm anahtarı için FeedIterator alabilir ve değişiklikleri, kapsayıcının tamamında yaptığınız gibi aynı şekilde işleyebilirsiniz.
FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));
while (iteratorForThePartitionKey.HasMoreResults)
{
FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Paralelleştirme için FeedRange kullanma
Değişiklik akışı işlemcisinde, çalışma otomatik olarak birden çok tüketiciye yayılır. Değişiklik akışı çekme modelinde, değişiklik akışının işlenmesini paralelleştirmek için öğesini kullanabilirsiniz FeedRange . A FeedRange , bölüm anahtarı değerleri aralığını temsil eder.
Kapsayıcınız için aralıkların listesinin nasıl alındığını gösteren bir örnek aşağıda verilmiştir:
IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();
Kapsayıcınız için değerlerin FeedRange listesini aldığınızda, her FeedRange için bir alırsınız.
FeedRange kullanarak, değişiklik akışının birden çok makine veya iş parçacığı arasında işlenmesini paralelleştirmek için bir FeedIterator oluşturabilirsiniz. Kapsayıcının tamamı için veya tek bir bölüm anahtarının nasıl alındığını FeedIterator gösteren önceki örnekten farklı olarak, değişiklik akışını paralel olarak işleyebilen birden çok FeedIterator elde etmek için FeedRanges kullanabilirsiniz.
FeedRanges kullanmak istediğiniz durumlarda, FeedRanges'i alan ve bunları bu makinelere dağıtan bir düzenleyici işlemine sahip olmanız gerekir. Bu dağıtım şöyle olabilir:
-
FeedRange.ToJsonString kullanarak bu dize değerini dağıtma. Tüketiciler bu değeri FeedRange.FromJsonString ile kullanabilir.
- Dağıtım devam ediyorsa,
FeedRange nesne başvurusunu geçirin.
Aşağıda, paralel olarak okuma yapan iki varsayımsal ayrı makine kullanılarak, kapsayıcının değişiklik akışının başından nasıl okunduğunu gösteren bir örnek verilmiştir.
1. Makine:
FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
2. Makine:
FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Devamlılık belirteçlerini kaydetme
Devam belirtecini alarak FeedIterator konumunu kaydedebilirsiniz. Devamlılık belirteci, FeedIterator'ınızın son işlenen değişikliklerini takip eden ve daha sonra FeedIterator bu noktadan devam edilmesini sağlayan bir dize değeridir. Devamlılık belirteci belirtilirse, başlangıç zamanından önceliklidir ve başlangıç değerlerinden başlar. Aşağıdaki kod, kapsayıcı oluşturma işleminden bu yana değişiklik akışını okur. Daha fazla değişiklik mevcut olmadığında, değişiklik akışı tüketiminin daha sonra sürdürülebilmesi için bir devam belirteci kalıcı olarak saklanır.
FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
string continuation = null;
while (iterator.HasMoreResults)
{
FeedResponse<User> response = await iterator.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
continuation = response.ContinuationToken;
// Stop the consumption since there are no new changes
break;
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);
En son sürüm modunu kullanırken, Azure Cosmos DB kapsayıcısı FeedIterator hala mevcut olduğu sürece devamlılık belirtecinin süresi hiçbir zaman dolmaz. Tüm sürümleri ve silme modu özelliğini kullanırken, FeedIterator sürekli yedeklemeler için saklama süresi içinde değişiklikler yapıldığı sürece devamlılık belirteci geçerli olur.
Çekme modelini kullanarak değişiklik akışını işlemek için bir örneği Iterator<FeedResponse<JsonNode>> responseIteratoroluşturun. oluştururken CosmosChangeFeedRequestOptions, değişiklik akışının nereden okunacağını ve kullanmak istediğiniz parametrenin FeedRange geçirileceğini belirtmeniz gerekir.
FeedRange, değişiklik akışından okunabilecek öğeleri belirten bir bölüm anahtarı değerleri aralığıdır.
Değişiklik akışını tüm sürümler ve silme modunda okumak istiyorsanız, oluştururken allVersionsAndDeletes()'i de belirtmeniz gerekir. Tüm sürümler ve silmeler modu, değişiklik akışının baştan veya belirli bir noktadan işlenmesini desteklemez. Değişiklikleri şu andan itibaren veya bir devamlılık belirtecinden işlemeniz gerekir. Tüm sürümler ve silmeler modu önizleme aşamasındadır ve Java SDK sürümü >= 4.42.0ile kullanılabilir.
Kapsayıcının tamamı için değişiklikleri kullanma
belirtirseniz FeedRange.forFullRange(), kapsayıcının tamamı için değişiklik akışını kendi hızınızda işleyebilirsiniz. İsteğe bağlı olarak içinde byPage()bir değer belirtebilirsiniz. Bu özellik ayarlandığında, sayfa başına alınan en fazla öğe sayısını ayarlar.
En son sürüm modunda bir responseIterator değer elde etme örneği aşağıda verilmişti:
CosmosChangeFeedRequestOptions options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(FeedRange.forFullRange());
Iterator<FeedResponse<JsonNode>> responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
Tüm sürümlerde ve silme modunda nasıl responseIterator edineceğinizi gösteren bir örnek aşağıda verilmiştir:
CosmosChangeFeedRequestOptions options = CosmosChangeFeedRequestOptions
.createForProcessingFromNow(FeedRange.forFullRange())
.allVersionsAndDeletes();
Iterator<FeedResponse<JsonNode>> responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
Ardından sonuçları yineleyebiliriz. Değişiklik akışı, gelecekteki tüm yazma ve güncelleştirmeleri kapsayan sonsuz bir öğe listesi olduğundan, değeri responseIterator.hasNext() her zaman trueolur. En son sürüm modunda, baştan başlayarak tüm değişiklikleri okuyan bir örnek aşağıda verilmiştir. Her yineleme, tüm olayları işledikten sonra bir devamlılık belirteci kalıcı hale gelir. Değişiklik akışındaki son işlenen noktadan devam eder ve createForProcessingFromContinuation kullanılarak işlenir:
int i = 0;
List<JsonNode> results;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s)");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
i++;
if (i >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
Bölüm anahtarının değişikliklerini kullanma
Bazı durumlarda, yalnızca belirli bir bölüm anahtarına yönelik değişiklikleri işlemek isteyebilirsiniz. Belirli bir bölüm anahtarındaki değişiklikleri, kapsayıcının tamamında yaptığınız gibi işleyebilirsiniz. İşte en son sürüm modunu kullanan bir örnek:
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(FeedRange.forLogicalPartition(new PartitionKey(partitionKey)));
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
int pkIndex = 0;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
pkIndex++;
if (pkIndex >= 5) {
// artificially breaking out of loop
System.out.println("breaking....");
break;
}
}
Paralelleştirme için FeedRange kullanma
Değişiklik akışı işlemcisinde, çalışma otomatik olarak birden çok tüketiciye yayılır. Değişiklik akışı çekme modelinde, değişiklik akışının işlenmesini paralelleştirmek için öğesini kullanabilirsiniz FeedRange . A FeedRange , bölüm anahtarı değerleri aralığını temsil eder.
Kapsayıcınız için aralık listesinin nasıl alındığını gösteren en son sürüm modunu kullanan bir örnek aşağıda verilmiştir:
Mono<List<FeedRange>> feedranges = resources.container.getFeedRanges();
List<FeedRange> feedRangeList = feedranges.block();
Kapsayıcınız için FeedRanges listesini aldığınızda, her bir FeedRange için bir alırsınız.
kullanarak FeedRange, değişiklik akışını birden çok makine veya iş parçacığı arasında işlemeyi paralel hale getirebilirsiniz. Kapsayıcının tamamı veya tek bir bölüm anahtarı için değişiklikleri işlemeyi gösteren önceki örnekten farklı olarak, değişiklik akışını paralel olarak işlemek için FeedRanges'i kullanabilirsiniz.
FeedRanges kullanmak istediğiniz durumlarda, FeedRanges'i alan ve bunları bu makinelere dağıtan bir düzenleyici işlemine sahip olmanız gerekir. Bu dağıtım şöyle olabilir:
-
FeedRange.toString() kullanarak bu dize değerini dağıtma.
- Dağıtım devam ediyorsa,
FeedRange nesne başvurusunu geçirin.
İşte en son sürüm modunu kullanan bir örnek. Kapsayıcının değişiklik akışının başından itibaren nasıl okunacağını, iki ayrı makineyi paralel olarak kullanarak gösterir.
1. Makine:
FeedRange range1 = feedRangeList.get(0);
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(range1);
int machine1index = 0;
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
machine1index++;
if (machine1index >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
2. Makine:
FeedRange range2 = feedRangeList.get(1);
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(range2);
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
int machine2index = 0;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
machine2index++;
if (machine2index >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
Çekme modelini kullanarak değişiklik akışını işlemek için türüne ItemPaged[Dict[str, Any]]sahip bir responseIterator örneği oluşturun.
Değişiklik akışı API'sini çağırdığınızda, değişiklik akışını okumaya nereden başlayacağınızı belirtmeniz ve kullanmak istediğiniz parametreyi feed_range geçirmeniz gerekir.
feed_range, değişiklik akışından okunabilecek öğeleri belirten bir bölüm anahtarı değerleri aralığıdır.
Değişiklikleri işlemek istediğiniz değişiklik akışı modu için parametre de belirtebilirsiniz mode : LatestVersion veya AllVersionsAndDeletes. Varsayılan değer LatestVersion değeridir.
Ya LatestVersion ya da AllVersionsAndDeletes öğesini, değişiklik akışını okumak için hangi modu kullanmak istediğinizi belirtmek amacıyla kullanın.
AllVersionsAndDeletes modunu kullandığınızda, değişiklikleri işlemeye ya bundan sonra ya da bir continuation belirteçten başlayabilirsiniz.
Değişiklik akışının başlangıçtan veya belirli bir start_time noktadan okunması desteklenmez.
Kapsayıcının tamamı için değişiklikleri kullanma
Parametre sağlamazsanız feed_range , kapsayıcının değişiklik akışının tamamını kendi hızınızda işleyebilirsiniz.
İşte responseIterator üzerinden LatestVersion modunda Beginning elde etmenin bir örneği.
LatestVersion Varsayılan mod olduğundan parametrenin mode geçirilmesi gerekmez:
responseIterator = container.query_items_change_feed(start_time="Beginning")
İşte responseIterator'den AllVersionsAndDeletesNow modunda nasıl elde edileceğine dair bir örnek verilmiştir. Now parametresinin start_time varsayılan değeri olduğu için geçirilmesi gerekmez.
responseIterator = container.query_items_change_feed(mode="AllVersionsAndDeletes")
Ardından sonuçları yineleyebiliriz. Değişiklik akışı, gelecekteki tüm yazma ve güncelleştirmeleri kapsayan sonsuz bir öğe listesi olduğundan responseIterator sonsuz döngüye girebilir.
En son sürüm modunda, baştan başlayarak tüm değişiklikleri okuyan bir örnek aşağıda verilmiştir.
Her yineleme, belgeler için değişiklik akışlarını yazdırır.
responseIterator = container.query_items_change_feed(start_time="Beginning")
for doc in responseIterator:
print(doc)
Bölüm anahtarının değişikliklerini kullanma
Bazı durumlarda, yalnızca belirli bir bölüm anahtarına yönelik değişiklikleri işlemek isteyebilirsiniz.
Değişiklikleri, partition_key parametresiyle bir kapsayıcının tamamında olduğu gibi işleyebilirsiniz.
Modu kullanan LatestVersion bir örnek aşağıda verilmişti:
pk = "partition_key_value"
responseIterator = container.query_items_change_feed(start_time="Beginning", partition_key=pk)
for doc in responseIterator:
print(doc)
Paralelleştirme için FeedRange kullanma
Değişiklik akışı çekme modelinde, değişiklik akışının işlenmesini paralelleştirmek için öğesini kullanabilirsiniz feed_range .
A feed_range , bölüm anahtarı değerleri aralığını temsil eder.
Kapsayıcınız için aralıkların listesinin nasıl alındığını gösteren bir örnek aşağıda verilmiştir.
list komutu yineleyiciyi listeye dönüştürür:
rangesIterator = container.read_feed_ranges(force_refresh=False)
ranges = list(rangesIterator)
Kapsayıcınız için değerlerin feed_range listesini aldığınızda, her feed_range için bir alırsınız.
kullanarak feed_range, değişiklik akışının birden çok makine veya iş parçacığı arasında işlenmesini paralel hale getirmek için yineleyici oluşturabilirsiniz.
Kapsayıcının tamamı için veya tek bir bölüm anahtarının nasıl alındığını responseIterator gösteren önceki örnekten farklı olarak, değişiklik akışını paralel olarak işleyebilen birden çok yineleyici elde etmek için kullanabilirsiniz feed_range .
Aşağıda, paralel olarak okuma yapan iki varsayımsal ayrı makine kullanılarak, kapsayıcının değişiklik akışının başından nasıl okunduğunu gösteren bir örnek verilmiştir.
1. Makine:
responseIterator = container.query_items_change_feed(start_time="Beginning", feed_range=ranges[0])
for doc in responseIterator:
print(doc)
2. Makine:
responseIterator = container.query_items_change_feed(start_time="Beginning", feed_range=ranges[1])
for doc in responseIterator:
print(doc)
Devamlılık belirteçlerini kaydetme
Devam belirtecini alarak yineleyicinizin konumunu kaydedebilirsiniz.
Devamlılık belirteci, son işlenen değişikliklerinizi responseIterator izleyen ve yineleyicinin daha sonra bu noktada sürdürülmesini sağlayan bir dize değeridir.
Devamlılık belirteci belirtilirse, başlangıç zamanından önceliklidir ve başlangıç değerlerinden başlar.
Aşağıdaki kod, kapsayıcı oluşturma işleminden bu yana değişiklik akışını okur.
Daha fazla değişiklik mevcut olmadığında, değişiklik akışı tüketiminin daha sonra sürdürülebilmesi için bir devam belirteci kalıcı olarak saklanır.
responseIterator = container.query_items_change_feed(start_time="Beginning")
for doc in responseIterator:
print(doc)
continuation_token = container.client_connection.last_response_headers['etag']
Note
continuation belirteci daha önce kullanılan mode parametreyi içerdiğinden, eğer continuation kullanıldıysa, mode parametresi yoksayılır ve bunun yerine mode belirtecinden continuation kullanılır.
İşte kapsayıcının değişiklik akışından bir belirteç kullanarak nasıl okunacağını gösteren bir örnek continuation:
responseIterator = container.query_items_change_feed(continuation=continuation_token)
for doc in responseIterator:
print(doc)
Çekme modelini kullanarak değişiklik akışını işlemek için bir örneği ChangeFeedPullModelIteratoroluşturun.
ChangeFeedPullModelIterator ilk oluşturulduğunda, changeFeedStartFrom içinde hem değişiklikleri okumak için başlangıç konumunu hem de değişikliklerin getirileceği kaynak (bölüm anahtarı veya FeedRange) için gerekli ChangeFeedIteratorOptions değerini belirtmeniz gerekir.
Note
Eğer hiçbir changeFeedStartFrom değeri belirtilmezse, değişiklik akışı Now() noktası itibarıyla kapsayıcının tamamı için getirilir.
Şu anda JavaScript SDK'sı tarafından yalnızca en son sürüm desteklenir ve varsayılan olarak seçilidir.
sayfa başına alınan en fazla öğe sayısını ayarlamak için isteğe bağlı olarak içinde maxItemCount kullanabilirsinizChangeFeedIteratorOptions.
Burada, varlık nesnelerini döndüren en son sürüm modunda yineleyicinin nasıl alındığını gösteren bir örnek verilmiştir:
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Now()
};
const iterator = container.items.getChangeFeedIterator(options);
Kapsayıcının tamamı için değişiklikleri kullanma
FeedRange içinde PartitionKey veya ChangeFeedStartFrom parametresi sağlamazsanız, bir kapsayıcının değişiklik akışının tamamını kendi hızınızda işleyebilirsiniz. Geçerli zamanda başlayarak tüm değişiklikleri okumaya başlayan bir örnek aşağıda verilmişti:
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
Değişiklik akışı, gelecekteki tüm yazma ve güncelleştirmeleri kapsayan sonsuz bir öğe listesi olduğundan, değeri hasMoreResults her zaman trueolur. Değişiklik akışını okumaya çalıştığınızda ve kullanılabilir yeni bir değişiklik olmadığında, NotModified durum koduyla bir yanıt alırsınız. Bu, değişiklik ve OK durum içermeyen bir yanıt almaktan farklıdır. Daha fazla değişiklik mevcut iken boş değişiklik akışı yanıtları almanız mümkündür ve NotModified alınana kadar yoklamaya devam etmelisiniz. Yukarıdaki örnekte, NotModified değişiklikler yeniden denetlenmeden önce beş saniye beklenerek işlenir.
Bölüm anahtarı için değişiklikleri kullanma
Bazı durumlarda, yalnızca belirli bir bölüm anahtarına yönelik değişiklikleri işlemek isteyebilirsiniz. Belirli bir bölüm anahtarı için yineleyici alabilir ve değişiklikleri kapsayıcının tamamında olduğu gibi işleyebilirsiniz.
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning("partitionKeyValue")
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
Paralelleştirme için FeedRange kullanma
Değişiklik akışı çekme modelinde, değişiklik akışının işlenmesini paralelleştirmek için öğesini kullanabilirsiniz FeedRange . A FeedRange , bölüm anahtarı değerleri aralığını temsil eder.
Kapsayıcınız için aralıkların listesinin nasıl alındığını gösteren bir örnek aşağıda verilmiştir:
const ranges = await container.getFeedRanges();
Kapsayıcınız için değerlerin FeedRange listesini aldığınızda, her FeedRange için bir alırsınız.
kullanarak FeedRange, değişiklik akışının birden çok makine veya iş parçacığı arasında işlenmesini paralel hale getirmek için yineleyici oluşturabilirsiniz. Kapsayıcının tamamı veya tek bir bölüm anahtarı için değişiklik akışı yineleyicisinin nasıl alındığını gösteren önceki örnekten farklı olarak, değişiklik akışını paralel olarak işleyebilen birden çok yineleyici elde etmek için FeedRanges kullanabilirsiniz.
Aşağıda, paralel olarak okuma yapan iki varsayımsal ayrı makine kullanılarak, kapsayıcının değişiklik akışının başından nasıl okunduğunu gösteren bir örnek verilmiştir.
1. Makine:
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning(ranges[0])
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
2. Makine:
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning(ranges[1])
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
Devamlılık belirteçlerini kaydetme
Devam belirteci alarak yineleyicinizin konumunu kaydedebilirsiniz. Devamlılık belirteci, değişiklik akışı yineleyicinizin son işlenen değişikliklerini izleyen ve yineleyicinin bu noktadan devam etmesine olanak tanıyan bir dize değeridir. Devamlılık belirteci belirtilirse, başlangıç zamanından önceliklidir ve başlangıç değerlerinden başlar. Aşağıdaki kod, kapsayıcı oluşturma işleminden bu yana değişiklik akışını okur. Daha fazla değişiklik mevcut olmadığında, değişiklik akışı tüketiminin daha sonra sürdürülebilmesi için bir devam belirteci kalıcı olarak saklanır.
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
let continuation = "";
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
continuation = response.continuationToken;
break;
}
else {
console.log("Result found", response.result);
}
}
// For checking any new changes using the continuation token
const continuationOptions = {
changeFeedStartFrom: ChangeFeedStartFrom(continuation)
}
const newIterator = container.items.getChangeFeedIterator(continuationOptions);
Azure Cosmos DB kapsayıcısı hala mevcut olduğu sürece devamlılık belirtecinin süresi asla dolmaz.
AsyncIterator kullanma
Değişiklik akışını getirmek için JavaScript'i AsyncIterator kullanabilirsiniz. Aşağıda bir örneği verilmiştır AsyncIterator.
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
};
let timeout = 0;
for await(const result of container.items.getChangeFeedIterator(options).getAsyncIterator()) {
if (result.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", result.result);
timeout = 0;
}
await waitFor(timeout);
}