Processore del feed di modifiche in Azure Cosmos DB
SI APPLICA A: NoSQL
Il processore del feed di modifiche fa parte degli SDK DB .NET V3 e Java V4 di Azure Cosmos DB. Semplifica il processo di lettura del feed di modifiche e distribuisce in modo efficace l'elaborazione degli eventi tra più consumer.
Il vantaggio principale del processore di feed di modifiche è il comportamento a tolleranza di errore che garantisce un recapito di tipo "at-least-once" di tutti gli eventi nel feed di modifiche.
SDK supportati
.Net V3 | Java | Node.JS | Python |
---|---|---|---|
✓ | ✓ | ✕ | ✕ |
Componenti del processore dei feed di modifiche
Il processore di feed di modifiche ha quattro componenti principali:
Contenitore monitorato: il contenitore monitorato include i dati da cui viene generato il feed di modifiche. Eventuali inserimenti e aggiornamenti nel contenitore monitorato si riflettono nel feed di modifiche del contenitore.
Contenitore di leasing: il contenitore di leasing agisce come una risorsa di archiviazione stato e coordina l'elaborazione dei feed di modifiche tra più ruoli di lavoro. Il contenitore di lease può essere archiviato nello stesso account del contenitore monitorato o in un account diverso.
Istanza di calcolo: un'istanza di calcolo ospita il processore del feed di modifiche per l'ascolto delle modifiche. A seconda della piattaforma, può essere rappresentato da una macchina virtuale, un pod Kubernetes, un'istanza del Servizio app di Azure o da un computer fisico effettivo. L'istanza di calcolo ha un identificatore univoco denominato nome dell'istanza in questo articolo.
Delegato: il delegato è il codice che definisce le operazioni che lo sviluppatore vuole eseguire con ogni batch di modifiche letto dal processore del feed di modifiche.
Per una migliore comprensione dell'interazione tra i quattro elementi del processore di feed di modifiche, è consigliabile esaminare un esempio nel diagramma seguente. Il contenitore monitorato archivia gli elementi e usa 'City' come chiave di partizione. I valori della chiave di partizione vengono distribuiti in intervalli (ogni intervallo rappresenta una partizione fisica) che contiene elementi.
Il diagramma mostra due istanze di calcolo e il processore di feed di modifiche assegna diversi intervalli a ogni istanza per ottimizzare la distribuzione di calcolo. Ogni istanza ha un nome univoco diverso.
Ogni intervallo viene letto in parallelo. Lo stato di avanzamento dell'intervallo viene mantenuto separatamente dagli altri intervalli nel contenitore di lease tramite un documento di lease. La combinazione dei lease rappresenta lo stato corrente del processore di feed di modifiche.
Implementare il processore dei feed di modifiche
Il processore del feed di modifiche in .NET è disponibile per la modalità Versione più recente e quella Tutte le versioni ed eliminazioni. La modalità Tutte le versioni ed eliminazioni è in anteprima ed è supportata per il processore del feed di modifiche a partire dalla versione 3.40.0-preview.0
. Il punto di ingresso per entrambe le modalità è sempre il contenitore monitorato.
Per leggere usando la modalità Versione più recente, in un'istanza Container
chiamare GetChangeFeedProcessorBuilder
:
/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
CosmosClient cosmosClient,
IConfiguration configuration)
{
string databaseName = configuration["SourceDatabaseName"];
string sourceContainerName = configuration["SourceContainerName"];
string leaseContainerName = configuration["LeasesContainerName"];
Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
.GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
Console.WriteLine("Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
return changeFeedProcessor;
}
Per leggere usando tutte le versioni e la modalità di eliminazione, chiamare GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes
dall'istanza Container
:
Container leaseContainer = client.GetContainer(Program.databaseName, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(Program.databaseName, containerName);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<ToDoItem>(processorName: "changeFeedBasic", onChangesDelegate: Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
Per entrambe le modalità, il primo parametro è un nome distinto che descrive l'obiettivo di questo processore. Il secondo nome è l'implementazione del delegato che gestisce le modifiche.
Ecco un esempio di delegato per la modalità di versione più recente:
/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
ChangeFeedProcessorContext context,
IReadOnlyCollection<ToDoItem> changes,
CancellationToken cancellationToken)
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
// SessionToken if needed to enforce Session consistency on another client instance
Console.WriteLine($"SessionToken ${context.Headers.Session}");
// We may want to track any operation's Diagnostics that took longer than some threshold
if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
{
Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
}
foreach (ToDoItem item in changes)
{
Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
// Simulate some asynchronous operation
await Task.Delay(10);
}
Console.WriteLine("Finished handling changes.");
}
Di seguito è riportato un esempio di delegato per tutte le versioni ed eliminazioni in modalità:
static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItem<ToDoItem>> changes, CancellationToken cancellationToken)
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
// SessionToken if needed to enforce Session consistency on another client instance
Console.WriteLine($"SessionToken ${context.Headers.Session}");
// We may want to track any operation's Diagnostics that took longer than some threshold
if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
{
Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
}
foreach (ChangeFeedItem<ToDoItem> item in changes)
{
if (item.Metadata.OperationType == ChangeFeedOperationType.Delete)
{
Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item with id {item.Previous.id}.");
}
else
{
Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item with id {item.Current.id}.");
}
// Simulate work
await Task.Delay(1);
}
}
Successivamente, si definisce il nome dell'istanza di calcolo o l'identificatore univoco usando WithInstanceName
. Il nome dell'istanza di calcolo deve essere univoco e diverso per ogni istanza di calcolo che viene distribuita. Impostare il contenitore per mantenere lo stato del lease usando WithLeaseContainer
.
Chiamando Build
, si otterrà l'istanza del processore che è possibile avviare chiamando StartAsync
.
Nota
I frammenti di codice precedenti sono tratti da esempi in GitHub. È possibile ottenere l'esempio per la modalità versione più recente o la modalità tutte le versioni e le eliminazioni.
Ciclo di vita dell'elaborazione
Il normale ciclo di vita di un'istanza dell'host è:
- Leggere il feed di modifiche.
- Se non sono state apportate modifiche, sospendere per un periodo di tempo predefinito personalizzabile con
WithPollInterval
nel generatore e procedere al passaggio 1. - Se ci sono modifiche, inviarle al delegato.
- Quando il delegato termina correttamente l'elaborazione delle modifiche, aggiornare l'archivio dei lease con l'ultimo punto nel tempo elaborato e passare a #1.
Gestione degli errori
Il processore dei feed di modifiche è resiliente agli errori del codice utente. Se l'implementazione del delegato presenta un'eccezione non gestita (passaggio 4), il thread che elabora tale batch di modifiche verrà arrestato e verrà creato un nuovo thread. Il nuovo thread controlla l'ultimo momento nel tempo in cui l'archivio di lease ha salvato per tale intervallo di valori di chiave di partizione. Il nuovo thread riparte da quel momento, inviando effettivamente lo stesso batch di modifiche al delegato. Questo comportamento continua fino a quando il delegato non elabora correttamente le modifiche ed è il motivo per cui il processore di feed di modifiche ha una garanzia di tipo "at least once".
Nota
In un solo scenario non viene eseguito un nuovo tentativo per un batch di modifiche. Se l'errore si verifica durante la prima esecuzione del delegato, l'archivio di leasing non ha uno stato salvato precedente da usare per il nuovo tentativo. In questi casi, il nuovo tentativo usa laconfigurazione iniziale, che potrebbe o meno includere l'ultimo batch.
Per evitare che il processore di feed di modifiche si blocchi continuamente durante la ripetizione dei tentativi per lo stesso batch di modifiche, è necessario aggiungere nel codice del delegato la logica per scrivere documenti, in caso di eccezione, in una coda di messaggi con errori. Questa progettazione garantisce la possibilità di tenere traccia delle modifiche non elaborate continuando comunque a elaborare le modifiche future. La coda di messaggi con errori può essere un altro contenitore di Azure Cosmos DB. L'archivio dati esatto non è importante. Si vuole semplicemente salvare in modo permanente le modifiche non elaborate.
È anche possibile usare lo strumento di stima del feed di modifiche per monitorare lo stato di avanzamento delle istanze del processore durante la lettura del feed di modifiche o usare le notifiche sul ciclo di vita per rilevare gli errori sottostanti.
Notifiche sul ciclo di vita
È possibile connettere il processore di feed di modifiche a qualsiasi evento pertinente nel ciclo di vita. È possibile scegliere di ricevere una notifica per uno o per tutti. È consigliabile registrare almeno la notifica di errore:
- Registrare un gestore per
WithLeaseAcquireNotification
per ricevere una notifica quando l'host corrente acquisisce un lease per avviarne l'elaborazione. - Registrare un gestore per
WithLeaseReleaseNotification
per ricevere una notifica quando l'host corrente rilascia un lease per arrestarne l'elaborazione. - Registrare un gestore per
WithErrorNotification
per ricevere una notifica quando l'host corrente riscontra un'eccezione durante l'elaborazione. È necessario essere in grado di distinguere se l'origine è il delegato utente (un'eccezione non gestita) o un errore rilevato dal processore quando tenta di accedere al contenitore monitorato (ad esempio, problemi di rete).
Le notifiche del ciclo di vita sono disponibili in entrambe le modalità del feed di modifiche. Ecco un esempio di notifiche del ciclo di vita nella modalità versione più recente:
Container.ChangeFeedMonitorLeaseAcquireDelegate onLeaseAcquiredAsync = (string leaseToken) =>
{
Console.WriteLine($"Lease {leaseToken} is acquired and will start processing");
return Task.CompletedTask;
};
Container.ChangeFeedMonitorLeaseReleaseDelegate onLeaseReleaseAsync = (string leaseToken) =>
{
Console.WriteLine($"Lease {leaseToken} is released and processing is stopped");
return Task.CompletedTask;
};
Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
{
if (exception is ChangeFeedProcessorUserException userException)
{
Console.WriteLine($"Lease {LeaseToken} processing failed with unhandled exception from user delegate {userException.InnerException}");
}
else
{
Console.WriteLine($"Lease {LeaseToken} failed with {exception}");
}
return Task.CompletedTask;
};
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedNotifications", handleChanges)
.WithLeaseAcquireNotification(onLeaseAcquiredAsync)
.WithLeaseReleaseNotification(onLeaseReleaseAsync)
.WithErrorNotification(onErrorAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
Unità di distribuzione
Una singola unità di distribuzione del processore di feed di modifiche è costituita da una o più istanze di calcolo con lo stesso valore per processorName
e la stessa configurazione del contenitore di lease, ma nomi di istanza diversi. Possono essere presenti numerose unità di distribuzione in cui ciascuna di esse prevede un flusso aziendale diverso per le modifiche e ogni unità di distribuzione è costituita da una o più istanze.
Ad esempio, si potrebbe avere un'unità di distribuzione che attiva un'API esterna ogni volta che viene apportata una modifica al contenitore. Un'altra unità di distribuzione potrebbe spostare i dati in tempo reale ogni volta che viene apportata una modifica. Quando si verifica una modifica nel contenitore monitorato, tutte le unità di distribuzione riceveranno una notifica.
Scalabilità dinamica
Come indicato in precedenza, all'interno di un'unità di distribuzione possono essere presenti una o più istanze. Per sfruttare i vantaggi della distribuzione di calcolo all'interno dell'unità di distribuzione, gli unici requisiti fondamentali sono i seguenti:
- Tutte le istanze devono avere la stessa configurazione del contenitore di lease.
- Tutte le istanze devono avere lo stesso valore per
processorName
. - Ogni istanza deve avere un nome di istanza diverso (
WithInstanceName
).
Se queste tre condizioni sono soddisfatte, il processore di feed di modifiche distribuirà tutti i lease nel contenitore di lease in tutte le istanze in esecuzione di tale unità di distribuzione e parallelizzerà il calcolo usando un algoritmo di distribuzione uguale. Un lease è sempre di proprietà di un'istanza, quindi il numero di istanze non deve essere maggiore del numero di lease.
Il numero di istanze può aumentare e diminuire. Il processore di feed di modifiche regola dinamicamente il carico ridistribuendolo di conseguenza.
Inoltre, il processore di feed di modifiche può adattarsi dinamicamente alle dimensioni dei contenitori se la velocità effettiva o lo spazio di archiviazione del contenitore aumentano. Quando il contenitore si espande, il processore di feed di modifiche gestisce in modo trasparente questi scenari aumentando dinamicamente i lease e distribuendo i nuovi lease tra le istanze esistenti.
Data/ora di avvio
Quando un processore di feed di modifiche viene avviato per la prima volta, inizializza il contenitore di lease e avvia il ciclo di vita di elaborazione. Eventuali modifiche apportate al contenitore monitorato precedentemente alla prima inizializzazione del processore dei feed di modifiche non verranno rilevate.
Lettura da una data e un'ora precedenti
È possibile inizializzare il processore di feed di modifiche in modo da leggere le modifiche a partire da una data e ora specifiche, passando un'istanza di DateTime
all'estensione del generatore WithStartTime
:
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedTime", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.WithStartTime(particularPointInTime)
.Build();
Il processore di feed di modifiche verrà inizializzato per la data e l'ora specifiche e inizierà a leggere le modifiche che si sono verificate in seguito.
Lettura dall'inizio
In altri scenari, ad esempio la migrazione dei dati o l'analisi dell'intera cronologia di un contenitore, è necessario leggere il feed di modifiche dall'inizio della durata del contenitore. È possibile usare WithStartTime
sull'estensione del generatore, ma passando DateTime.MinValue.ToUniversalTime()
, che genera la rappresentazione UTC del valore minimo DateTime
, come nell'esempio seguente:
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedBeginning", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.WithStartTime(DateTime.MinValue.ToUniversalTime())
.Build();
Il processore di feed di modifiche viene inizializzato e inizia a leggere le modifiche dall'inizio della durata del contenitore.
Nota
Queste opzioni di personalizzazione funzionano solo per impostare il punto iniziale nel tempo del processore di feed di modifiche. Una volta inizializzato il contenitore dei lease per la prima volta, modificare queste opzioni non ha alcun effetto.
La personalizzazione del punto di partenza è disponibile solo per la modalità di feed delle modifiche della versione più recente. Quando si usa la modalità tutte le versioni ed eliminazioni, è necessario iniziare a leggere dal momento in cui il processore viene avviato o riprendere da uno stato di leasing precedente che rientra nel periodo di conservazione del backup continuo dell'account.
Feed di modifiche e velocità effettiva di cui viene effettuato il provisioning
Le operazioni di lettura del feed di modifiche nel contenitore monitorato utilizzano unità richiesta. Assicurarsi che il contenitore monitorato non stia riscontrando una limitazione. La limitazione aggiunge ritardi nella ricezione di eventi del feed di modifiche nei processori.
Le operazioni sul contenitore di lease (aggiornamento e gestione dello stato) utilizzano unità richiesta. Maggiore è il numero di istanze che usano lo stesso contenitore di lease, maggiore è il potenziale utilizzo di unità richiesta. Assicurarsi che il contenitore di lease non stia riscontrando una limitazione. La limitazione aggiunge ritardi nella ricezione di eventi del feed di modifiche. La limitazione può anche terminare completamente l'elaborazione.
Condividere il contenitore di lease
È possibile condividere un contenitore di lease tra più unità di distribuzione. In un contenitore di lease condiviso ogni unità di distribuzione è in ascolto di un contenitore monitorato diverso o ha un valore diverso per processorName
. In questa configurazione ogni unità di distribuzione mantiene uno stato indipendente nel contenitore di lease. Esaminare l'utilizzo di unità richiesta in un contenitore di lease per assicurarsi che la velocità effettiva di cui è stato effettuato il provisioning sia sufficiente per tutte le unità di distribuzione.
Configurazione avanzata del lease
Esistono tre configurazioni principali che possono influire sul funzionamento del processore di feed di modifiche. Ogni configurazione influisce sull'utilizzo di unità richiesta nel contenitore di lease. È possibile impostare una di queste configurazioni quando si crea il processore di feed di modifiche, ma è necessario usarle con cautela:
- Acquisizione lease: per impostazione predefinita, ogni 17 secondi. Un host controlla periodicamente lo stato dell'archivio di lease e valuta l'acquisizione di lease come parte del processo di scalabilità dinamica. Questo processo viene eseguito mediante una query sul contenitore di lease. La riduzione di questo valore accelera il ribilanciamento e l'acquisizione dei lease, ma aumenta l'utilizzo di unità richiesta nel contenitore di lease.
- Scadenza lease: per impostazione predefinita, 60 secondi. Definisce la quantità massima di tempo in base alla quale un lease può esistere senza alcuna attività di rinnovo prima che venga acquisito da un altro host. Quando un host si arresta in modo anomalo, i lease di sua proprietà vengono prelevati da altri host dopo questo periodo di tempo più l'intervallo di rinnovo configurato. La riduzione di questo valore accelera il ripristino dopo un arresto anomalo dell'host, ma il valore di scadenza non deve mai essere inferiore all'intervallo di rinnovo.
- Rinnovo lease: per impostazione predefinita, ogni 13 secondi. Un host proprietario di un lease rinnova periodicamente il lease, anche se non sono presenti nuove modifiche da utilizzare. Questo processo viene eseguito mediante una sostituzione sul lease. La riduzione di questo valore riduce il tempo necessario per rilevare i lease persi da un host arrestato in modo anomalo, ma aumenta l'utilizzo di unità richiesta nel contenitore di lease.
Dove ospitare il processore di feed di modifiche
Il processore di feed di modifiche può essere ospitato in qualsiasi piattaforma che supporta processi o attività a esecuzione prolungata. Di seguito sono riportati alcuni esempi.
- Un'istanza in esecuzione continua di processi Web nel Servizio app di Azure
- Un processo in un'istanza di Macchine virtuali di Azure
- Un processo in background nel servizio Azure Kubernetes
- Una funzione serverless in Funzioni di Azure
- Un servizio ospitato ASP.NET
Anche se il processore di feed di modifiche può essere eseguito in ambienti di breve durata perché il contenitore di lease mantiene lo stato, il ciclo di avvio di questi ambienti aggiunge ritardi al tempo necessario per ricevere le notifiche (a causa del sovraccarico derivante dall'avvio del processore ogni volta che viene avviato l'ambiente).
Requisiti di accesso in base al ruolo
Quando si usa Microsoft Entra ID come meccanismo di autenticazione, assicurarsi che l'identità disponga delle autorizzazioni appropriate:
- Nel contenitore monitorato:
Microsoft.DocumentDB/databaseAccounts/readMetadata
Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/readChangeFeed
- Nel contenitore di leasing:
Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/read
Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/create
Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/replace
Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/delete
Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/executeQuery
Risorse aggiuntive
- SDK di Azure Cosmos DB
- Applicazione di esempio completa in GitHub
- Esempi di utilizzo aggiuntivi in GitHub
- Lab del workshop su Azure Cosmos DB per il processore di feed di modifiche
Passaggi successivi
Altre informazioni sul processore di feed di modifiche sono disponibili negli articoli seguenti: