Condividi tramite


Modello di pull del feed di modifiche in Azure Cosmos DB

Il modello di pull del feed di modifiche consente di utilizzare il feed di modifiche di Azure Cosmos DB in modo personalizzato. Analogamente al processore di feed di modifiche, è possibile usare il modello di pull del feed delle modifiche per parallelizzare l'elaborazione delle modifiche tra più consumatori del feed delle modifiche.

Confronto con il processore dei feed di modifiche

In molti scenari è possibile elaborare il feed di modifiche usando sia il processore del feed di modifiche o il modello di pull del feed di modifiche. I token di continuazione del modello di pull e il contenitore lease del processore dei feed di modifiche funzionano entrambi come "segnalibri" per l'ultimo elemento o batch di elementi elaborati nel feed di modifiche.

Tuttavia, non è possibile convertire i token di continuazione in un contenitore lease o viceversa.

Annotazioni

Nella maggior parte dei casi, quando è necessario leggere dal feed di modifiche, l'opzione più semplice consiste nell'usare il processore del feed di modifiche.

È opportuno usare il modello di pull negli scenari seguenti:

  • Per leggere le modifiche da una chiave di partizione specifica
  • Per controllare il ritmo in cui il client riceve le modifiche per l'elaborazione
  • Per eseguire una sola lettura dei dati esistenti nel feed di modifiche( ad esempio, per eseguire una migrazione dei dati)

Di seguito sono riportate alcune differenze fondamentali tra il modello di pull del feed di modifiche e il processore dei feed di modifiche.

Caratteristica / Funzionalità Processore feed di modifiche Modello di pull del feed di modifiche
Tenere traccia del punto corrente nell'elaborazione del feed di modifiche Lease (archiviato in un contenitore Azure Cosmos DB) Token di continuazione (archiviato in memoria o reso persistente manualmente)
Possibilità di riprodurre modifiche precedenti Sì, con modello di push Sì, con modello di pull
Sondaggio per le modifiche future Verificare automaticamente la presenza di modifiche in base al parametro WithPollInterval specificato dall'utente Manuale
Comportamento quando non sono presenti nuove modifiche Attendere automaticamente il valore per WithPollInterval quindi ricontrollare È necessario controllare lo stato e ricontrollare manualmente
Elaborare le modifiche da un intero contenitore Sì, è parallelizzato automaticamente tra più thread e computer che utilizzano lo stesso contenitore Sì, è parallelizzato manualmente tramite FeedRange
Elaborare le modifiche da una singola chiave di partizione Non supportato Yes

Annotazioni

Quando si usa il modello di pull, a differenza della lettura tramite il processore del feed di modifiche, è necessario gestire in modo esplicito i casi in cui non sono presenti nuove modifiche. Ciò è indicato da un HTTP 304 NotModified. Una risposta al feed di modifiche che restituisce 0 documenti con un codice di stato HTTP 200 OK non significa necessariamente che sia stata raggiunta la fine del feed di modifiche e si dovrebbe continuare a eseguire il polling.

Usare il modello di pull

Per elaborare il feed di modifiche usando il modello di pull, creare un'istanza di FeedIterator. Quando si crea inizialmente FeedIterator, è necessario specificare un valore obbligatorio ChangeFeedStartFrom, costituito sia dalla posizione iniziale per la lettura delle modifiche che dal valore da utilizzare per FeedRange. FeedRange è un intervallo di valori di chiave di partizione e specifica gli elementi che possono essere letti dal feed di modifiche usando tale FeedIterator specifico. È inoltre necessario specificare un valore obbligatorio ChangeFeedMode per la modalità in cui si desidera elaborare le modifiche: versione più recente o tutte le versioni ed eliminazioni. Usare ChangeFeedMode.LatestVersion o ChangeFeedMode.AllVersionsAndDeletes per indicare la modalità che si desidera usare per leggere il feed di modifiche. Quando si usa la modalità tutte le versioni ed eliminazioni, è necessario selezionare un avvio del feed di modifiche dal valore di Now() o da un token di continuazione specifico.

È possibile specificare facoltativamente ChangeFeedRequestOptions per impostare un PageSizeHint. Se impostata, questa proprietà imposta il numero massimo di elementi ricevuti per pagina. Se le operazioni nella raccolta monitorata vengono eseguite tramite procedure memorizzate, l'ambito della transazione viene mantenuto durante la lettura degli elementi dal feed delle modifiche. Di conseguenza, il numero di elementi ricevuti potrebbe essere superiore al valore specificato in modo che gli elementi modificati dalla stessa transazione vengano restituiti come parte di un batch atomico.

Ecco un esempio che mostra come ottenere un oggetto FeedIterator in modalità versione più recente che restituisce oggetti di entità, in questo caso un oggetto User:

FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Suggerimento

Per le versioni precedenti a 3.34.0, è possibile usare la modalità versione più recente impostando ChangeFeedMode.Incremental. Sia Incremental che LatestVersion fanno riferimento alla modalità di versione più recente del feed di modifiche e alle applicazioni che usano entrambe le modalità vedono lo stesso comportamento.

La modalità tutte le versioni ed eliminazioni è in anteprima e può essere utilizzata con le versioni di .NET SDK di anteprima >= 3.32.0-preview. Di seguito è riportato un esempio per ottenere FeedIterator nella modalità tutte le versioni ed eliminazioni che restituisce oggetti User:

FeedIterator<ChangeFeedItem<User>> InteratorWithPOCOS = container.GetChangeFeedIterator<ChangeFeedItem<User>>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);

Annotazioni

Nella modalità versione più recente si ricevono oggetti che rappresentano l'elemento modificato, con alcuni metadati aggiuntivi. Tutte le versioni e la modalità elimina restituiscono un modello di dati diverso.

È possibile ottenere l'esempio completo per la modalità versione più recente o la modalità tutte le versioni ed eliminazioni.

Usare il feed di modifiche tramite flussi

FeedIterator per entrambe le modalità del feed di modifiche sono disponibili due opzioni. Oltre agli esempi che restituiscono oggetti entità, è anche possibile ottenere la risposta con il supporto Stream. Stream consente di leggere i dati senza prima deserializzarli e quindi di risparmiare risorse del client.

Ecco un esempio che mostra come ottenere un oggetto FeedIterator in modalità versione più recente che restituisce Stream:

FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Usare le modifiche per un intero contenitore

Se non si fornisce un parametro FeedRange a FeedIterator, è possibile elaborare il feed di modifiche di un intero contenitore in modo personalizzato. L'esempio riportato di seguito inizia a leggere tutte le modifiche a partire dall'ora corrente, usando la modalità versione più recente:

FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);

while (iteratorForTheEntireContainer.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else 
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Poiché il feed di modifiche è effettivamente un elenco infinito di elementi che includono tutte le scritture e gli aggiornamenti futuri, il valore di HasMoreResults è sempre true. Quando si tenta di leggere il feed di modifiche e non sono disponibili nuove modifiche, si riceve una risposta con stato NotModified. Questa operazione è diversa rispetto alla ricezione di una risposta che indica nessuna modifica e uno stato OK. Potresti ricevere risposte vuote dal flusso di modifiche, anche se sono disponibili altre modifiche, e si dovrebbe continuare a eseguire il polling fino a ricevere NotModified. Nell'esempio precedente, NotModified viene gestito attendendo cinque secondi prima di controllare di nuovo le modifiche.

Usare le modifiche per una chiave di partizione

In alcuni casi, è possibile elaborare solo le modifiche per una chiave di partizione specifica. È possibile ottenere un oggetto FeedIterator per una chiave di partizione specifica ed elaborare le modifiche come si farebbe per un intero contenitore.

FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
    ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));

while (iteratorForThePartitionKey.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Usare FeedRange per la parallelizzazione

Nel processore del feed di modifiche il lavoro viene distribuito automaticamente tra più consumatori. Nel modello di pull del feed di modifiche è possibile usare FeedRange per impostare l'elaborazione parallela del feed di modifiche. Un elemento FeedRange rappresenta un intervallo di valori di chiave di partizione.

Ecco un esempio che illustra come ottenere un elenco di intervalli per il contenitore:

IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();

Quando si ottiene un elenco di valori FeedRange per il contenitore, si otterrà una FeedRange per partizione fisica.

Con FeedRange è quindi possibile creare un oggetto FeedIterator per parallelizzare il feed di modifiche tra più computer o thread. A differenza dell'esempio precedente in cui è stato illustrato come ottenere un oggetto FeedIterator per l'intero contenitore o una singola chiave di partizione, è possibile usare FeedRanges per ottenere più oggetti FeedIterator, che possono elaborare il feed di modifiche in parallelo.

Nel caso in cui si intenda usare FeedRange, è necessario disporre di un processo dell'agente di orchestrazione che ottenga FeedRange e li distribuisca in tali computer. Questa distribuzione potrebbe essere:

  • Uso di FeedRange.ToJsonString e distribuzione di questo valore stringa. I consumer possono usare questo valore con FeedRange.FromJsonString.
  • Se la distribuzione è in corso, passaggio del riferimento all'oggetto FeedRange.

Ecco un esempio che illustra come leggere dall'inizio del feed di modifiche del contenitore usando due ipotetici computer distinti che eseguono la lettura in parallelo:

Macchina 1:

FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Computer 2:

FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Salvare i token di continuazione

È possibile salvare la posizione del tuo FeedIterator ottenendo il token di continuazione. Un token di continuazione è un valore stringa che tiene traccia delle ultime modifiche elaborate di FeedIterator e che consente a FeedIterator di riprendere dallo stesso punto in un secondo momento. Il token di continuazione, se specificato, avrà la priorità sull'ora di inizio e su inizia dai valori iniziali. Il codice seguente legge tramite il feed di modifiche dopo la creazione del contenitore. Quando non sono più disponibili modifiche, viene salvato in modo permanente un token di continuazione che consentirà di riprendere il consumo del feed di modifiche in un secondo momento.

FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

string continuation = null;

while (iterator.HasMoreResults)
{
    FeedResponse<User> response = await iterator.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        continuation = response.ContinuationToken;
        // Stop the consumption since there are no new changes
        break;
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);

Quando si usa la modalità versione più recente, il token di continuazione FeedIterator non scade finché è presente il contenitore Azure Cosmos DB. Quando si usa tutte le versioni ed elimina la modalità, il token di continuazione FeedIterator è valido purché le modifiche siano state apportate all'interno della finestra di conservazione per i backup continui.

Passaggi successivi