Il modello di pull del feed di modifiche consente di utilizzare il feed di modifiche di Azure Cosmos DB in modo personalizzato. Analogamente al processore di feed di modifiche, è possibile usare il modello di pull del feed delle modifiche per parallelizzare l'elaborazione delle modifiche tra più consumatori del feed delle modifiche.
Tuttavia, non è possibile convertire i token di continuazione in un contenitore lease o viceversa.
Di seguito sono riportate alcune differenze fondamentali tra il modello di pull del feed di modifiche e il processore dei feed di modifiche.
Per elaborare il feed di modifiche usando il modello di pull, creare un'istanza di FeedIterator. Quando si crea inizialmente FeedIterator, è necessario specificare un valore obbligatorio ChangeFeedStartFrom, costituito sia dalla posizione iniziale per la lettura delle modifiche che dal valore da utilizzare per FeedRange.
FeedRange è un intervallo di valori di chiave di partizione e specifica gli elementi che possono essere letti dal feed di modifiche usando tale FeedIterator specifico. È inoltre necessario specificare un valore obbligatorio ChangeFeedMode per la modalità in cui si desidera elaborare le modifiche: versione più recente o tutte le versioni ed eliminazioni. Usare ChangeFeedMode.LatestVersion o ChangeFeedMode.AllVersionsAndDeletes per indicare la modalità che si desidera usare per leggere il feed di modifiche. Quando si usa la modalità tutte le versioni ed eliminazioni, è necessario selezionare un avvio del feed di modifiche dal valore di Now() o da un token di continuazione specifico.
È possibile specificare facoltativamente ChangeFeedRequestOptions per impostare un PageSizeHint. Se impostata, questa proprietà imposta il numero massimo di elementi ricevuti per pagina. Se le operazioni nella raccolta monitorata vengono eseguite tramite procedure memorizzate, l'ambito della transazione viene mantenuto durante la lettura degli elementi dal feed delle modifiche. Di conseguenza, il numero di elementi ricevuti potrebbe essere superiore al valore specificato in modo che gli elementi modificati dalla stessa transazione vengano restituiti come parte di un batch atomico.
Ecco un esempio che mostra come ottenere un oggetto FeedIterator in modalità versione più recente che restituisce oggetti di entità, in questo caso un oggetto User:
FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
Suggerimento
Per le versioni precedenti a 3.34.0, è possibile usare la modalità versione più recente impostando ChangeFeedMode.Incremental. Sia Incremental che LatestVersion fanno riferimento alla modalità di versione più recente del feed di modifiche e alle applicazioni che usano entrambe le modalità vedono lo stesso comportamento.
La modalità tutte le versioni ed eliminazioni è in anteprima e può essere utilizzata con le versioni di .NET SDK di anteprima >= 3.32.0-preview. Di seguito è riportato un esempio per ottenere FeedIterator nella modalità tutte le versioni ed eliminazioni che restituisce oggetti User:
FeedIterator<ChangeFeedItem<User>> InteratorWithPOCOS = container.GetChangeFeedIterator<ChangeFeedItem<User>>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);
Usare il feed di modifiche tramite flussi
FeedIterator per entrambe le modalità del feed di modifiche sono disponibili due opzioni. Oltre agli esempi che restituiscono oggetti entità, è anche possibile ottenere la risposta con il supporto Stream. Stream consente di leggere i dati senza prima deserializzarli e quindi di risparmiare risorse del client.
Ecco un esempio che mostra come ottenere un oggetto FeedIterator in modalità versione più recente che restituisce Stream:
FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
Usare le modifiche per un intero contenitore
Se non si fornisce un parametro FeedRange a FeedIterator, è possibile elaborare il feed di modifiche di un intero contenitore in modo personalizzato. L'esempio riportato di seguito inizia a leggere tutte le modifiche a partire dall'ora corrente, usando la modalità versione più recente:
FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);
while (iteratorForTheEntireContainer.HasMoreResults)
{
FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Poiché il feed di modifiche è effettivamente un elenco infinito di elementi che includono tutte le scritture e gli aggiornamenti futuri, il valore di HasMoreResults è sempre true. Quando si tenta di leggere il feed di modifiche e non sono disponibili nuove modifiche, si riceve una risposta con stato NotModified. Questa operazione è diversa rispetto alla ricezione di una risposta che indica nessuna modifica e uno stato OK. Potresti ricevere risposte vuote dal flusso di modifiche, anche se sono disponibili altre modifiche, e si dovrebbe continuare a eseguire il polling fino a ricevere NotModified. Nell'esempio precedente, NotModified viene gestito attendendo cinque secondi prima di controllare di nuovo le modifiche.
Usare le modifiche per una chiave di partizione
In alcuni casi, è possibile elaborare solo le modifiche per una chiave di partizione specifica. È possibile ottenere un oggetto FeedIterator per una chiave di partizione specifica ed elaborare le modifiche come si farebbe per un intero contenitore.
FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));
while (iteratorForThePartitionKey.HasMoreResults)
{
FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Usare FeedRange per la parallelizzazione
Nel processore del feed di modifiche il lavoro viene distribuito automaticamente tra più consumatori. Nel modello di pull del feed di modifiche è possibile usare FeedRange per impostare l'elaborazione parallela del feed di modifiche. Un elemento FeedRange rappresenta un intervallo di valori di chiave di partizione.
Ecco un esempio che illustra come ottenere un elenco di intervalli per il contenitore:
IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();
Quando si ottiene un elenco di valori FeedRange per il contenitore, si otterrà una FeedRange per partizione fisica.
Con FeedRange è quindi possibile creare un oggetto FeedIterator per parallelizzare il feed di modifiche tra più computer o thread. A differenza dell'esempio precedente in cui è stato illustrato come ottenere un oggetto FeedIterator per l'intero contenitore o una singola chiave di partizione, è possibile usare FeedRanges per ottenere più oggetti FeedIterator, che possono elaborare il feed di modifiche in parallelo.
Nel caso in cui si intenda usare FeedRange, è necessario disporre di un processo dell'agente di orchestrazione che ottenga FeedRange e li distribuisca in tali computer. Questa distribuzione potrebbe essere:
- Uso di
FeedRange.ToJsonString e distribuzione di questo valore stringa. I consumer possono usare questo valore con FeedRange.FromJsonString.
- Se la distribuzione è in corso, passaggio del riferimento all'oggetto
FeedRange.
Ecco un esempio che illustra come leggere dall'inizio del feed di modifiche del contenitore usando due ipotetici computer distinti che eseguono la lettura in parallelo:
Macchina 1:
FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Computer 2:
FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Salvare i token di continuazione
È possibile salvare la posizione del tuo FeedIterator ottenendo il token di continuazione. Un token di continuazione è un valore stringa che tiene traccia delle ultime modifiche elaborate di FeedIterator e che consente a FeedIterator di riprendere dallo stesso punto in un secondo momento. Il token di continuazione, se specificato, avrà la priorità sull'ora di inizio e su inizia dai valori iniziali. Il codice seguente legge tramite il feed di modifiche dopo la creazione del contenitore. Quando non sono più disponibili modifiche, viene salvato in modo permanente un token di continuazione che consentirà di riprendere il consumo del feed di modifiche in un secondo momento.
FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
string continuation = null;
while (iterator.HasMoreResults)
{
FeedResponse<User> response = await iterator.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
continuation = response.ContinuationToken;
// Stop the consumption since there are no new changes
break;
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);
Quando si usa la modalità versione più recente, il token di continuazione FeedIterator non scade finché è presente il contenitore Azure Cosmos DB. Quando si usa tutte le versioni ed elimina la modalità, il token di continuazione FeedIterator è valido purché le modifiche siano state apportate all'interno della finestra di conservazione per i backup continui.
Per elaborare il feed di modifiche usando il modello di pull, creare un'istanza di Iterator<FeedResponse<JsonNode>> responseIterator. Quando si crea CosmosChangeFeedRequestOptions, è necessario specificare da dove iniziare a leggere il feed di modifiche e passare il parametro FeedRange che si desidera utilizzare. L'oggetto FeedRange è un intervallo di valori di chiave di partizione che specifica gli elementi che possono essere letti dal feed di modifiche.
Se si desidera leggere il feed di modifiche in modalità tutte le versioni ed eliminazioni, è necessario specificare anche allVersionsAndDeletes() quando si crea CosmosChangeFeedRequestOptions. Tutte le versioni ed elimina la modalità non supporta l'elaborazione del feed di modifiche dall'inizio o da un momento specifico. È necessario elaborare le modifiche da questo momento o da un token di continuazione. Tutte le versioni ed elimina la modalità è disponibile in anteprima ed nella versione di Java SDK >= 4.42.0.
Usare le modifiche per un intero contenitore
Se si specifica FeedRange.forFullRange(), è possibile elaborare il feed di modifiche per un intero contenitore in modo personalizzato. Facoltativamente, è possibile specificare un valore in byPage(). Se impostata, questa proprietà imposta il numero massimo di elementi ricevuti per pagina.
Ecco un esempio su come ottenere un valore responseIterator in modalità versione più recente:
CosmosChangeFeedRequestOptions options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(FeedRange.forFullRange());
Iterator<FeedResponse<JsonNode>> responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
Di seguito è riportato un esempio di come ottenere un valore responseIterator in modalità tutte le versioni ed eliminazioni:
CosmosChangeFeedRequestOptions options = CosmosChangeFeedRequestOptions
.createForProcessingFromNow(FeedRange.forFullRange())
.allVersionsAndDeletes();
Iterator<FeedResponse<JsonNode>> responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
È quindi possibile eseguire l'iterazione dei risultati. Poiché il feed di modifiche è effettivamente un elenco infinito di elementi che includono tutte le scritture e gli aggiornamenti futuri, il valore di responseIterator.hasNext() è sempre true. Ecco un esempio di modalità versione più recente, che legge tutte le modifiche, a partire dall'inizio. Ogni iterazione mantiene un token di continuazione dopo aver elaborato tutti gli eventi. Viene prelevato dall'ultimo punto elaborato nel feed di modifiche e viene gestito usando createForProcessingFromContinuation:
int i = 0;
List<JsonNode> results;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s)");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
i++;
if (i >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
Usare le modifiche di una chiave di partizione
In alcuni casi, è possibile elaborare solo le modifiche per una chiave di partizione specifica. È possibile elaborare le modifiche per una chiave di partizione specifica come si farebbe per un intero contenitore. Ecco un esempio che usa la modalità versione più recente:
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(FeedRange.forLogicalPartition(new PartitionKey(partitionKey)));
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
int pkIndex = 0;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
pkIndex++;
if (pkIndex >= 5) {
// artificially breaking out of loop
System.out.println("breaking....");
break;
}
}
Usare FeedRange per la parallelizzazione
Nel processore del feed di modifiche il lavoro viene distribuito automaticamente tra più consumatori. Nel modello di pull del feed di modifiche è possibile usare FeedRange per impostare l'elaborazione parallela del feed di modifiche. Un elemento FeedRange rappresenta un intervallo di valori di chiave di partizione.
Ecco un esempio che utilizza la modalità della versione più recente per illustrare come ottenere un elenco di intervalli del tuo contenitore.
Mono<List<FeedRange>> feedranges = resources.container.getFeedRanges();
List<FeedRange> feedRangeList = feedranges.block();
Quando si ottiene un elenco di elementi FeedRange per il contenitore, si otterrà un elemento FeedRange per ogni partizione fisica.
Con FeedRange è quindi possibile parallelizzare l'elaborazione il feed di modifiche tra più computer o thread. A differenza dell'esempio precedente in cui è stato illustrato come elaborare le modifiche per l'intero contenitore o per una singola chiave di partizione, è possibile usare FeedRanges elaborare il feed di modifiche in parallelo.
Nel caso in cui si intenda usare FeedRange, è necessario disporre di un processo dell'agente di orchestrazione che ottenga FeedRange e li distribuisca in tali computer. Questa distribuzione potrebbe essere:
- Uso di
FeedRange.toString() e distribuzione di questo valore stringa.
- Se la distribuzione è in corso, passaggio del riferimento all'oggetto
FeedRange.
Ecco un esempio che usa la modalità versione più recente. Esso illustra come leggere dall'inizio del feed di modifiche del contenitore usando due ipotetici computer diversi che eseguono la lettura in parallelo:
Macchina 1:
FeedRange range1 = feedRangeList.get(0);
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(range1);
int machine1index = 0;
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
machine1index++;
if (machine1index >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
Computer 2:
FeedRange range2 = feedRangeList.get(1);
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(range2);
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
int machine2index = 0;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
machine2index++;
if (machine2index >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
Per elaborare il feed di modifiche usando il modello pull, creare un'istanza di responseIterator con il tipo ItemPaged[Dict[str, Any]].
Quando si chiama l'API del feed di modifiche, è necessario specificare da dove iniziare a leggere il feed di modifiche e passare il feed_range parametro da usare.
L'oggetto feed_range è un intervallo di valori di chiave di partizione che specifica gli elementi che possono essere letti dal feed di modifiche.
È anche possibile specificare mode il parametro per la modalità feed di modifiche in cui si desidera elaborare le modifiche: LatestVersion o AllVersionsAndDeletes. Il valore predefinito è LatestVersion.
Usare LatestVersion o AllVersionsAndDeletes per indicare la modalità che si desidera usare per leggere il feed di modifiche.
Quando si usa la modalità AllVersionsAndDeletes, è possibile avviare l'elaborazione delle modifiche da ora o da un token continuation.
La lettura del feed di modifiche dall'inizio o da un momento specifico utilizzando start_time non è supportata.
Usare le modifiche per un intero contenitore
Se non si specifica un feed_range parametro, è possibile elaborare il feed di modifiche di un intero contenitore in base al proprio ritmo.
Annotazioni
Tutti i frammenti di codice seguenti sono tratti da esempi in GitHub.
Ecco un esempio di come ottenere responseIterator in modalità LatestVersion da Beginning. Poiché LatestVersion è una modalità predefinita, mode non è necessario passare un parametro:
responseIterator = container.query_items_change_feed(start_time="Beginning")
Ecco un esempio di come ottenere responseIterator in modalità AllVersionsAndDeletes da Now. Poiché Now è un valore predefinito del parametro start_time, non è necessario passarlo.
responseIterator = container.query_items_change_feed(mode="AllVersionsAndDeletes")
È quindi possibile eseguire l'iterazione dei risultati. Poiché il feed di modifiche è in effetti un elenco infinito di elementi che include tutte le scritture e gli aggiornamenti futuri, responseIterator può eseguire un ciclo infinito.
Ecco un esempio di modalità versione più recente, che legge tutte le modifiche, a partire dall'inizio.
Ogni iterazione stampa feed di modifiche per i documenti.
responseIterator = container.query_items_change_feed(start_time="Beginning")
for doc in responseIterator:
print(doc)
Usare le modifiche di una chiave di partizione
In alcuni casi, è possibile elaborare solo le modifiche per una chiave di partizione specifica.
È possibile elaborare le modifiche allo stesso modo possibile per un intero contenitore con il partition_key parametro .
Ecco un esempio che usa la LatestVersion modalità:
pk = "partition_key_value"
responseIterator = container.query_items_change_feed(start_time="Beginning", partition_key=pk)
for doc in responseIterator:
print(doc)
Usare FeedRange per la parallelizzazione
Nel modello di pull del feed di modifiche è possibile usare feed_range per impostare l'elaborazione parallela del feed di modifiche.
Un elemento feed_range rappresenta un intervallo di valori di chiave di partizione.
Ecco un esempio che illustra come ottenere un elenco di intervalli per il contenitore.
list il comando converte l'iteratore in un elenco:
rangesIterator = container.read_feed_ranges(force_refresh=False)
ranges = list(rangesIterator)
Quando si ottiene un elenco di valori feed_range per il contenitore, si otterrà una feed_range per partizione fisica.
Con feed_range è quindi possibile creare un iteratore per parallelizzare l'elaborazione del feed di modifiche tra più computer o thread.
A differenza dell'esempio precedente che ha illustrato come ottenere un responseIterator per l'intero contenitore o una singola chiave di partizione, è possibile usare feed_range per ottenere più iteratori, che possono elaborare il feed di modifiche in parallelo.
Ecco un esempio che illustra come leggere dall'inizio del feed di modifiche del contenitore usando due ipotetici computer distinti che eseguono la lettura in parallelo:
Macchina 1:
responseIterator = container.query_items_change_feed(start_time="Beginning", feed_range=ranges[0])
for doc in responseIterator:
print(doc)
Computer 2:
responseIterator = container.query_items_change_feed(start_time="Beginning", feed_range=ranges[1])
for doc in responseIterator:
print(doc)
Salvare i token di continuazione
È possibile salvare la posizione dell'iteratore ottenendo un token di continuazione.
Un token di continuazione è un valore stringa che tiene traccia delle responseIterator ultime modifiche elaborate e consente all'iteratore di riprendere a questo punto successivamente.
Il token di continuazione, se specificato, avrà la priorità sull'ora di inizio e su inizia dai valori iniziali.
Il codice seguente legge tramite il feed di modifiche dopo la creazione del contenitore.
Quando non sono più disponibili modifiche, viene salvato in modo permanente un token di continuazione che consentirà di riprendere il consumo del feed di modifiche in un secondo momento.
responseIterator = container.query_items_change_feed(start_time="Beginning")
for doc in responseIterator:
print(doc)
continuation_token = container.client_connection.last_response_headers['etag']
Annotazioni
Poiché il token continuation contiene il parametro mode usato in precedenza, se è stato usato continuation, il parametro mode viene ignorato e viene utilizzato invece il mode dal token continuation.
Ecco un esempio che mostra come leggere dal feed di modifiche del contenitore usando un continuation token:
responseIterator = container.query_items_change_feed(continuation=continuation_token)
for doc in responseIterator:
print(doc)
Per elaborare il feed di modifiche usando il modello di pull, creare un'istanza di ChangeFeedPullModelIterator. Quando si crea ChangeFeedPullModelIteratorinizialmente , è necessario specificare un valore obbligatorio changeFeedStartFrom all'interno di ChangeFeedIteratorOptions, costituito sia dalla posizione iniziale per la lettura delle modifiche che dalla risorsa (chiave di partizione o FeedRange) per cui devono essere recuperate le modifiche.
Annotazioni
Se non viene specificato alcun changeFeedStartFrom valore, il feed di modifiche viene recuperato per un intero contenitore da Now().
Attualmente, solo la versione più recente è supportata da JavaScript SDK ed è selezionata per impostazione predefinita.
Facoltativamente, è possibile usare maxItemCount in ChangeFeedIteratorOptions per impostare il numero massimo di elementi ricevuti per pagina.
Ecco un esempio di come ottenere un iteratore in modalità versione più recente che restituisce oggetti di entità:
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Now()
};
const iterator = container.items.getChangeFeedIterator(options);
Usare le modifiche per un intero contenitore
Se non fornisci un parametro FeedRange o PartitionKey all'interno di ChangeFeedStartFrom, puoi elaborare il feed di modifiche di un intero contenitore al tuo ritmo. L'esempio riportato di seguito inizia a leggere tutte le modifiche a partire dall'ora corrente:
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
Poiché il feed di modifiche è effettivamente un elenco infinito di elementi che includono tutte le scritture e gli aggiornamenti futuri, il valore di hasMoreResults è sempre true. Quando si tenta di leggere il feed di modifiche e non sono disponibili nuove modifiche, si riceve una risposta con stato NotModified. Questa operazione è diversa rispetto alla ricezione di una risposta che indica nessuna modifica e uno stato OK. Potresti ricevere risposte vuote dal flusso di modifiche, anche se sono disponibili altre modifiche, e si dovrebbe continuare a eseguire il polling fino a ricevere NotModified. Nell'esempio precedente, NotModified viene gestito attendendo cinque secondi prima di controllare di nuovo le modifiche.
Usare le modifiche per una chiave di partizione
In alcuni casi, è possibile elaborare solo le modifiche per una chiave di partizione specifica. È possibile ottenere un iteratore per una chiave di partizione specifica ed elaborare le modifiche come si farebbe per un intero contenitore.
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning("partitionKeyValue")
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
Usare FeedRange per la parallelizzazione
Nel modello di pull del feed di modifiche è possibile usare FeedRange per impostare l'elaborazione parallela del feed di modifiche. Un elemento FeedRange rappresenta un intervallo di valori di chiave di partizione.
Ecco un esempio che illustra come ottenere un elenco di intervalli per il contenitore:
const ranges = await container.getFeedRanges();
Quando si ottiene un elenco di valori FeedRange per il contenitore, si otterrà una FeedRange per partizione fisica.
Con FeedRange è quindi possibile creare un iteratore per parallelizzare l'elaborazione del feed di modifiche tra più computer o thread. A differenza dell'esempio precedente che ha illustrato come ottenere un iteratore del feed di modifiche per l'intero contenitore o una singola chiave di partizione, è possibile usare FeedRanges per ottenere più iteratori, che possono elaborare il feed di modifiche in parallelo.
Ecco un esempio che illustra come leggere dall'inizio del feed di modifiche del contenitore usando due ipotetici computer distinti che eseguono la lettura in parallelo:
Macchina 1:
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning(ranges[0])
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
Computer 2:
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning(ranges[1])
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
Salvare i token di continuazione
È possibile salvare la posizione dell'iteratore ottenendo un token di continuazione. Un token di continuazione è un valore stringa che tiene traccia dell'ultima modifica elaborata dell'iteratore del feed di modifiche e consente all'iteratore di riprendere in questo momento in un secondo momento. Il token di continuazione, se specificato, avrà la priorità sull'ora di inizio e su inizia dai valori iniziali. Il codice seguente legge tramite il feed di modifiche dopo la creazione del contenitore. Quando non sono più disponibili modifiche, viene salvato in modo permanente un token di continuazione che consentirà di riprendere il consumo del feed di modifiche in un secondo momento.
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
let continuation = "";
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
continuation = response.continuationToken;
break;
}
else {
console.log("Result found", response.result);
}
}
// For checking any new changes using the continuation token
const continuationOptions = {
changeFeedStartFrom: ChangeFeedStartFrom(continuation)
}
const newIterator = container.items.getChangeFeedIterator(continuationOptions);
Il token di continuazione non scade mai finché il contenitore Azure Cosmos DB esiste ancora.
Usare AsyncIterator
È possibile usare JavaScript AsyncIterator per recuperare il feed di modifiche. Ecco un esempio di AsyncIterator.
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
};
let timeout = 0;
for await(const result of container.items.getChangeFeedIterator(options).getAsyncIterator()) {
if (result.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", result.result);
timeout = 0;
}
await waitFor(timeout);
}