Wijzigingenfeed verkennen in Azure Cosmos DB

Voltooid

Wijzigingenfeed in Azure Cosmos DB is een permanente record van wijzigingen in een container in de volgorde waarin ze plaatsvinden. Wijzigingenfeed-ondersteuning in Azure Cosmos DB-taken door te luisteren naar een Azure Cosmos DB-container voor alle wijzigingen. Als output verschijnt er vervolgens een gesorteerde lijst met gewijzigde documenten op volgorde van wijziging. De persistente wijzigingen kunnen asynchroon en incrementeel worden verwerkt en de uitvoer kan worden gedistribueerd over een of meer consumenten voor parallelle verwerking.

Wijzigingenfeed en verschillende bewerkingen

Vandaag ziet u alle invoegingen en updates in de wijzigingenfeed. U kunt de wijzigingenfeed niet filteren op een specifiek type bewerking. Op dit moment worden verwijderbewerkingen niet vastgelegd in de wijzigingenfeed. Als tijdelijke oplossing kunt u een zachte markering toevoegen aan de items die worden verwijderd. U kunt bijvoorbeeld een kenmerk toevoegen in het item met de naam 'verwijderd', de waarde ervan instellen op 'true' en vervolgens een TTL-waarde (Time-to-Live) voor het item instellen. Als u de TTL instelt, zorgt u ervoor dat het item automatisch wordt verwijderd.

Wijzigingenfeed in Azure Cosmos DB lezen

U kunt met de Azure Cosmos DB-wijzigingenfeed werken met behulp van een pushmodel of een pull-model. Met een pushmodel pusht de wijzigingenfeedprocessor werk naar een client met bedrijfslogica voor het verwerken van dit werk. De complexiteit bij het controleren van werk en het opslaan van de status voor het laatst verwerkte werk wordt echter verwerkt in de verwerker van de wijzigingenfeed.

Met een pull-model moet de client het werk van de server ophalen. In dit geval heeft de client niet alleen bedrijfslogica voor het verwerken van werk, maar ook de opslagstatus voor het laatst verwerkte werk, het afhandelen van taakverdeling voor meerdere clients die werk parallel verwerken en fouten afhandelen.

Notitie

Het wordt aanbevolen om het pushmodel te gebruiken, omdat u zich geen zorgen hoeft te maken over het peilen van de wijzigingenfeed voor toekomstige wijzigingen, het opslaan van de status voor de laatst verwerkte wijziging en andere voordelen.

De meeste scenario's die gebruikmaken van de Azure Cosmos DB-wijzigingenfeed, maken gebruik van een van de pushmodelopties. Er zijn echter enkele scenario's waarin u mogelijk het extra beheer op laag niveau van het pull-model wilt. Deze omvatten:

  • Wijzigingen van een bepaalde partitiesleutel lezen
  • Het tempo bepalen waarop uw klant wijzigingen ontvangt voor verwerking
  • Eenmalig lezen van de bestaande gegevens in de wijzigingenfeed (bijvoorbeeld om een gegevensmigratie uit te voeren)

Wijzigingenfeed lezen met een pushmodel

Er zijn twee manieren waarop u de wijzigingenfeed kunt lezen met een pushmodel: Azure Functions Azure Cosmos DB-triggers en de processorbibliotheek voor wijzigingenfeeds. Azure Functions maakt achter de schermen gebruik van de processor voor wijzigingenfeeds, dus dit zijn beide vergelijkbare manieren om de wijzigingenfeed te lezen. U kunt Azure Functions beschouwen als een hostingplatform voor de wijzigingenfeedprocessor, niet als een geheel andere manier om de wijzigingenfeed te lezen. Azure Functions maakt achter de schermen gebruik van de processor voor wijzigingenfeeds. De verwerking van wijzigingen wordt automatisch geparallelliseert in de partities van uw container.

Azure Functions

U kunt kleine reactieve Azure Functions maken die automatisch wordt geactiveerd voor elke nieuwe gebeurtenis in de wijzigingenfeed van uw Azure Cosmos DB-container. Met de Azure Functions-trigger voor Azure Cosmos DB kunt u de schaalfunctie van de wijzigingenfeedprocessor en betrouwbare functionaliteit voor gebeurtenisdetectie gebruiken zonder dat u een werkrolinfrastructuur hoeft te onderhouden.

Diagram showing the change feed triggering Azure Functions for processing.

Verwerker van wijzigingenfeed

De wijzigingenfeedprocessor maakt deel uit van de SDK's van Azure Cosmos DB .NET V3 en Java V4 . Het vereenvoudigt het proces van het lezen van de wijzigingenfeed en distribueert de gebeurtenisverwerking effectief over meerdere consumenten.

Er zijn vier belangrijke onderdelen bij de implementatie van de wijzigingenfeedverwerker:

  1. De bewaakte container: de bewaakte container bevat de gegevens waaruit de wijzigingenfeed wordt gegenereerd. Eventuele toevoegingen en updates van de bewaakte container worden weergegeven in de wijzigingenfeed van de container.

  2. De leasecontainer: De leasecontainer fungeert als een statusopslag en coördineert de verwerking van de wijzigingenfeed voor meerdere werkrollen. De leasecontainer kan worden opgeslagen in hetzelfde account als de bewaakte container of in een afzonderlijk account.

  3. Het rekenproces: een rekenproces fungeert als host voor de processor van de wijzigingenfeed om te luisteren naar wijzigingen. Afhankelijk van het platform kan het worden vertegenwoordigd door een virtuele machine, een kubernetes-pod, een Azure-app Service-exemplaar, een werkelijke fysieke machine. In dit artikel wordt naar een unieke id verwezen als de naam van het exemplaar.

  4. De gemachtigde: De gemachtigde is de code die definieert wat u, de ontwikkelaar, wilt doen met elke batch wijzigingen die de wijzigingenfeedprocessor leest.

Wanneer u de verwerker van de wijzigingenfeed implementeert, is het ingangspunt altijd de bewaakte container, vanaf een Container exemplaar dat u aanroept GetChangeFeedProcessorBuilder:

/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
    CosmosClient cosmosClient,
    IConfiguration configuration)
{
    string databaseName = configuration["SourceDatabaseName"];
    string sourceContainerName = configuration["SourceContainerName"];
    string leaseContainerName = configuration["LeasesContainerName"];

    Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
    ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
        .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
            .WithInstanceName("consoleHost")
            .WithLeaseContainer(leaseContainer)
            .Build();

    Console.WriteLine("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    Console.WriteLine("Change Feed Processor started.");
    return changeFeedProcessor;
}

Wanneer de eerste parameter een afzonderlijke naam is die het doel van deze processor beschrijft en de tweede naam de implementatie van de gemachtigde is die wijzigingen verwerkt. Hier volgt een voorbeeld van een gemachtigde:

/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
    ChangeFeedProcessorContext context,
    IReadOnlyCollection<ToDoItem> changes,
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate some asynchronous operation
        await Task.Delay(10);
    }

    Console.WriteLine("Finished handling changes.");
}

Daarna definieert u de naam van het rekenproces of de unieke id met WithInstanceName, dit moet uniek en verschillend zijn in elk rekenproces dat u implementeert, en ten slotte is dit de container waarmee de leasestatus WithLeaseContainermoet worden onderhouden.

Aanroepen Build geeft u het processorexemplaren dat u kunt starten door aan te roepen StartAsync.

De normale levenscyclus van een host-exemplaar is:

  1. Lees de wijzigingenfeed.
  2. Als er geen wijzigingen zijn, slaapstand voor een vooraf gedefinieerde hoeveelheid tijd (aanpasbaar met WithPollInterval in de Builder) en gaat u naar #1.
  3. Als er wijzigingen zijn, stuurt u deze naar de gemachtigde.
  4. Wanneer de gemachtigde de verwerking van de wijzigingen heeft voltooid, werkt u het lease-archief bij met het meest recente verwerkingspunt en gaat u naar #1.