Änderungsfeedprozessor in Azure Cosmos DB

GILT FÜR: NoSQL

Der Änderungsfeedprozessor ist Teil der SDKs .NET V3 und Java V4 für Azure Cosmos DB. Er vereinfacht das Lesen des Änderungsfeeds sowie das effektive Verteilen der Ereignisverarbeitung auf mehrere Consumer.

Der Hauptvorteil des Änderungsfeedprozessors ist sein fehlertolerantes Verhalten, das eine mindestens einmalige (At-Least-Once) Übermittlung aller Ereignisse im Änderungsfeed gewährleistet.

Komponenten des Änderungsfeedprozessors

Der Änderungsfeedprozessor verfügt über vier Hauptkomponenten:

  • Überwachter Container: Der überwachte Container enthält die Daten, aus denen der Änderungsfeed generiert wird. Alle Einfügungen und Aktualisierungen für den überwachten Container werden im Änderungsfeed des Containers berücksichtigt.

  • Der Lease-Container: Der Lease-Container fungiert als Statusspeicher und koordiniert die Verarbeitung des Änderungsfeeds zwischen mehreren Workern. Er kann im gleichen Konto wie der überwachte Container oder in einem separaten Konto gespeichert werden.

  • Die Compute-Instanz: Eine Compute-Instanz hostet den Änderungsfeedprozessor, um Änderungen zu überwachen. Je nach Plattform kann sie ein virtueller Computer (VM), ein Kubernetes-Pod, eine Azure App Service-Instanz oder ein tatsächlicher physischer Computer sein. Die Compute-Instanz verfügt über einen eindeutigen Bezeichner, der in diesem Artikel als Instanz-Name bezeichnet wird.

  • Delegat: Der Delegat ist der Code, der definiert, wie Sie als Entwickler mit dem jeweiligen Batch von Änderungen verfahren möchten, der vom Änderungsfeedprozessor gelesen wurde.

Um besser zu verstehen, wie diese vier Elemente des Änderungsfeedprozessors zusammenarbeiten, sehen wir uns ein Beispiel in der folgenden Abbildung an. Der überwachte Container speichert Elemente und verwendet „City“ als Partitionsschlüssel. Die Partitionsschlüsselwerte werden in Bereichen verteilt (jeder Bereich stellt eine physische Partition dar), die Elemente enthalten.

Das Diagramm zeigt zwei Computeinstanzen, und der Änderungsfeedprozessor weist jeder Instanz unterschiedliche Bereiche zu, um die Computeverteilung zu maximieren. Jede Instanz hat einen anderen, eindeutigen Namen.

Jeder Bereich wird parallel gelesen. Der Fortschritt eines Bereichs wird getrennt von anderen Bereichen Leasecontainer über ein Lease-Dokument verwaltet. Die Kombination der Leasedauer stellt den aktuellen Zustand des Änderungsfeedprozessors dar.

Beispiel für den Änderungsfeedprozessor

Implementieren des Änderungsfeedprozessors

Der Änderungsfeedprozessor in .NET ist derzeit nur für den Modus „Neueste Version“ verfügbar. Der Einstiegspunkt ist immer der überwachte Container. In einer Container-Instanz rufen Sie GetChangeFeedProcessorBuilder auf:

/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
    CosmosClient cosmosClient,
    IConfiguration configuration)
{
    string databaseName = configuration["SourceDatabaseName"];
    string sourceContainerName = configuration["SourceContainerName"];
    string leaseContainerName = configuration["LeasesContainerName"];

    Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
    ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
        .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
            .WithInstanceName("consoleHost")
            .WithLeaseContainer(leaseContainer)
            .Build();

    Console.WriteLine("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    Console.WriteLine("Change Feed Processor started.");
    return changeFeedProcessor;
}

Der erste Parameter ist ein eindeutiger Name, der das Ziel dieses Prozessors beschreibt. Der zweite Name ist die Delegatimplementierung, die Änderungen verarbeitet.

Hier ein Beispiel für einen Delegaten:

/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
    ChangeFeedProcessorContext context,
    IReadOnlyCollection<ToDoItem> changes,
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate some asynchronous operation
        await Task.Delay(10);
    }

    Console.WriteLine("Finished handling changes.");
}

Anschließend definieren Sie den Namen der Compute-Instanz oder den eindeutigen Bezeichner mithilfe von WithInstanceName. Der Name der Compute-Instanz sollte für jede Compute-Instanz, die Sie bereitstellen, eindeutig und unterschiedlich sein. Sie legen den Container so fest, dass der Leasestatus beibehalten wird, indem Sie WithLeaseContainer verwenden.

Durch Aufrufen von Build wird die Prozessorinstanz erstellt, die dann durch Aufrufen von StartAsync gestartet werden kann.

Verarbeitungslebenszyklus

Der normale Lebenszyklus einer Hostinstanz sieht wie folgt aus:

  1. Der Änderungsfeed wird gelesen.
  2. Sollten keine Änderungen vorhanden sein, wird nach einer vordefinierten Pause (die im Generator mithilfe von WithPollInterval angepasst werden kann) noch mal Schritt 1 ausgeführt.
  3. Sind Änderungen vorhanden, werden sie an den Delegaten gesendet.
  4. Nach erfolgreicher Verarbeitung der Änderungen durch den Delegaten wird der Leasespeicher mit dem neuesten verarbeiteten Zeitpunkt aktualisiert, und es wird wieder Schritt 1 ausgeführt.

Fehlerbehandlung

Der Änderungsfeedprozessor bietet Resilienz bei Benutzercodefehlern. Wenn Ihre Delegatimplementierung eine nicht behandelte Ausnahme (Schritt 4) aufweist, wird der Thread, der diesen speziellen Batch von Änderungen verarbeitet, beendet, und schließlich wird ein neuer Thread erstellt. Der neue Thread prüft den letzten Zeitpunkt, den der Leasespeicher für diesen Bereich von Partitionsschlüsselwerten gespeichert hat. Der neue Thread startet von dort aus neu und sendet effektiv denselben Batch von Änderungen an den Delegaten. Dieses Verhalten setzt sich fort, bis Ihr Delegat die Änderungen korrekt verarbeitet hat, und das ist der Grund, warum der Änderungsfeedprozessor eine "mindestens einmal"-Garantie enthält. Die Nutzung des Änderungsfeeds mit letztlicher Konsistenz kann auch zu duplizierten Ereignissen zwischen aufeinanderfolgenden Änderungsfeed-Lesevorgängen führen. So kann beispielsweise das letzte Ereignis eines Lesevorgangs als das erste Ereignis des nächsten Vorgangs erscheinen.

Hinweis

In nur einem Szenario wird ein Batch von Änderungen nicht erneut versucht. Wenn der Fehler bei der ersten Delegatausführung eintritt, gibt es keinen zuvor gespeicherten Zustand des Leasespeichers, der für den Wiederholungsversuch verwendet werden kann. In diesen Fällen wird bei der Wiederholung die anfängliche Startkonfiguration verwendet, die möglicherweise den letzten Batch enthält.

Um zu verhindern, dass der Änderungsfeedprozessor fortlaufend denselben Batch von Änderungen wiederholt, sollten Sie im Delegatcode Logik hinzufügen, um bei einer Ausnahme Dokumente in eine Warteschlange für Fehlermeldungen zu schreiben. Dieser Entwurf stellt sicher, dass Sie nicht verarbeitete Änderungen nachverfolgen und gleichzeitig zukünftige Änderungen verarbeiten können. Die Warteschlange für Fehlermeldungen kann ein anderer Azure Cosmos DB-Container sein. Der genaue Datenspeicher spielt keine Rolle. Sie möchten einfach, dass die nicht verarbeiteten Änderungen beibehalten werden.

Darüber hinaus können Sie den Änderungsfeed-Schätzer verwenden, um den Fortschritt Ihrer Änderungsfeed-Prozessorinstanzen beim Lesen des Änderungsfeeds zu überwachen, oder die Lebenszyklusbenachrichtigungen verwenden, um zugrunde liegende Ausfälle zu erkennen.

Lebenszyklusbenachrichtigungen

Sie können den Änderungsfeedprozessor mit jedem relevanten Ereignis in seinem Lebenszyklus verbinden. Sie können wählen, ob Sie über ein oder alle Ereignisse benachrichtigt werden möchten. Es wird empfohlen, zumindest die Fehlerbenachrichtigung zu registrieren:

  • Registrieren Sie einen Handler für WithLeaseAcquireNotification, um benachrichtigt zu werden, wenn der aktuelle Host einen Lease erwirbt, um mit der Verarbeitung zu beginnen.
  • Registrieren Sie einen Handler für WithLeaseReleaseNotification, um benachrichtigt zu werden, wenn der aktuelle Host einen Lease freigibt und dessen Bearbeitung beendet.
  • Registrieren Sie einen Handler für WithErrorNotification, um benachrichtigt zu werden, wenn beim aktuellen Host während der Verarbeitung eine Ausnahme auftritt. Sie müssen in der Lage sein zu unterscheiden, ob die Quelle der Benutzerdelegat (eine nicht behandelte Ausnahme) oder ein Fehler ist, der beim Versuch, auf den überwachten Container zuzugreifen (z. B. Netzwerkprobleme), auftritt.
Container.ChangeFeedMonitorLeaseAcquireDelegate onLeaseAcquiredAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is acquired and will start processing");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorLeaseReleaseDelegate onLeaseReleaseAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is released and processing is stopped");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
{
    if (exception is ChangeFeedProcessorUserException userException)
    {
        Console.WriteLine($"Lease {LeaseToken} processing failed with unhandled exception from user delegate {userException.InnerException}");
    }
    else
    {
        Console.WriteLine($"Lease {LeaseToken} failed with {exception}");
    }

    return Task.CompletedTask;
};

ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedNotifications", handleChanges)
        .WithLeaseAcquireNotification(onLeaseAcquiredAsync)
        .WithLeaseReleaseNotification(onLeaseReleaseAsync)
        .WithErrorNotification(onErrorAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

Bereitstellungseinheit

Eine Bereitstellungseinheit für den Änderungsfeedprozessor besteht aus einer oder mehreren Compute-Instanzen mit demselben Wert für processorName und derselben Leasecontainerkonfiguration, jedoch mit jeweils unterschiedlichem Instanznamen. Sie können mehrere Bereitstellungseinheiten besitzen, deren Geschäftsabläufe für Änderungen sich unterschieden und die jeweils aus einer oder mehreren Instanzen bestehen.

Beispielsweise könnte eine Ihrer Bereitstellungseinheiten immer dann eine externe API auslösen, wenn eine Änderung am Container erfolgt. Eine andere Bereitstellungseinheit könnte immer dann Daten in Echtzeit verschieben, wenn eine Änderung erfolgt. Wenn in Ihrem überwachten Container eine Änderung auftritt, werden alle Bereitstellungseinheiten benachrichtigt.

Dynamische Skalierung

Wie bereits erwähnt, kann eine Bereitstellungseinheit eine oder mehrere Compute-Instanzen enthalten. Sie müssen nur die folgenden wichtigen Voraussetzungen erfüllen, um die Computeverteilung innerhalb der Bereitstellungseinheit zu nutzen:

  • Alle Instanzen müssen über die gleiche Leasecontainerkonfiguration verfügen.
  • Alle Instanzen müssen den gleichen processorName-Wert besitzen.
  • Die einzelnen Instanzen müssen jeweils einen anderen Instanznamen (WithInstanceName) besitzen.

Wenn diese drei Bedingungen erfüllt sind, verteilt der Änderungsfeedprozessor unter Verwendung eines Algorithmus für gleichmäßige Verteilung alle Leases im Leasecontainer auf alle aktiven Instanzen dieser Bereitstellungseinheit und parallelisiert Computevorgänge. Eine Lease ist immer nur im Besitz einer Instanz. Daher darf die Anzahl der Instanzen nicht größer sein als die Anzahl der Leases.

Die Anzahl der Instanzen kann sich vergrößern und verkleinern. Der Änderungsfeedprozessor passt die Last dynamisch an, indem er sie entsprechend umverteilt.

Darüber hinaus kann der Änderungsfeedprozessor die Skalierung eines Containers dynamisch anpassen, wenn der Durchsatz oder Speicher des Containers zunimmt. Bei zunehmender Größe des Containers verarbeitet der Änderungsfeedprozessor diese Szenarien transparent, indem die Leases dynamisch erhöht und die neuen Leases auf vorhandene Instanzen verteilt werden.

Startzeit

Wenn ein Änderungsfeedprozessor zum ersten Mal gestartet wird, initialisiert er standardmäßig den Leasecontainer und startet seinen Verarbeitungslebenszyklus. Änderungen, die im überwachten Container aufgetreten sind, bevor der Änderungsfeedprozessor zum ersten Mal initialisiert wurde, werden nicht erkannt.

Lesen ab einem früheren Datum und einer früheren Uhrzeit

Es ist möglich, den Änderungsfeedprozessor so zu initialisieren, dass er an einem bestimmten Datum und zu einer bestimmten Uhrzeit mit dem Lesen von Daten beginnt. Hierfür muss eine Instanz von DateTime an die WithStartTime-Generator-Erweiterung übergeben werden:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedTime", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(particularPointInTime)
        .Build();

Der Änderungsfeedprozessor wird für dieses bestimmte Datum und die jeweilige Uhrzeit initialisiert und beginnt mit dem Lesen der anschließenden Änderungen.

Lesen ab Anfang

In anderen Szenarien wie Datenmigrationen oder Analysen des gesamten Verlaufs eines Containers muss der Änderungsfeed vom Anfang der Lebensdauer dieses Containers gelesen werden. Sie können für die Generatorerweiterung WithStartTime verwenden, aber übergeben DateTime.MinValue.ToUniversalTime(), wodurch die UTC-Darstellung des Minimalwerts DateTime wie in diesem Beispiel generiert wird:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedBeginning", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();

Der Änderungsfeedprozessor wird initialisiert und beginnt mit dem Lesen von Änderungen ab dem Anfang der Lebensdauer des Containers.

Hinweis

Diese Anpassungsoptionen dienen nur dazu, den Startzeitpunkt für den Änderungsfeedprozessor festzulegen. Sobald der Leasecontainer zum ersten Mal initialisiert wurde, hat eine Änderung dieser Optionen keine Auswirkungen mehr.

Änderungsfeed und bereitgestellter Durchsatz

Lesevorgänge des Änderungsfeeds für den überwachten Container verbrauchen Anforderungseinheiten. Stellen Sie sicher, dass für Ihren überwachten Container keine Drosselung auftritt. Die Drosselung führt zu Verzögerungen beim Empfang von Änderungsfeedereignissen auf Ihren Prozessoren.

Vorgänge im Leasedauercontainer (Aktualisierung und Erhaltung des Status) nutzen Anforderungseinheiten. Je höher die Anzahl der Instanzen, die denselben Leasedauercontainer verwenden, desto höher ist der potenzielle Verbrauch an Anforderungseinheiten. Stellen Sie sicher, dass für Ihren Leasecontainer keine Drosselung auftritt. Die Einschränkung führt zu Verzögerungen beim Empfangen von Änderungsfeedereignissen. Die Drosselung kann die Verarbeitung sogar vollständig beenden.

Den Leasecontainer freigeben

Sie können einen Leasecontainer über mehrere Bereitstellungseinheiten hinweg freigeben. In einem freigegebenen Leasecontainer lauscht jede Bereitstellungseinheit auf einen anderen überwachten Container oder hat einen anderen Wert für processorName. Mit dieser Konfiguration würde jede Bereitstellungseinheit einen unabhängigen Status im Leasedauercontainer beibehalten. Überprüfen Sie den Anforderungseinheitenverbrauch im Leasedauercontainer, um sicherzustellen, dass der bereitgestellte Durchsatz für alle Bereitstellungseinheiten ausreichend ist.

Erweiterte Leasekonfiguration

Drei Schlüsselkonfigurationen können die Funktionsweise des Änderungsfeedprozessors beeinflussen. Jede Konfiguration wirkt sich auf den Anforderungseinheitenverbrauch im Leasecontainer aus. Sie können eine der folgenden Konfigurationen festlegen, wenn Sie den Änderungsfeedprozessor erstellen, verwenden Sie sie aber mit Bedacht:

  • Leaseabruf: Standardmäßig alle 17 Sekunden. Ein Host überprüft in regelmäßigen Abständen den Status des Leasespeichers und erwägt den Erwerb von Leases im Rahmen des dynamischen Skalierungsprozesses. Dieser Prozess erfolgt durch Ausführen einer Abfrage für den Leasedauercontainer. Durch die Reduzierung dieses Werts werden die Neugewichtung und der Erwerb von Leases beschleunigt, aber der Verbrauch von Anforderungseinheiten für den Leasedauercontainer wird erhöht.
  • Leaseablauf: Standardmäßig 60 Sekunden. Definiert die maximale Zeitspanne, die eine Lease ohne Erneuerungsaktivität bestehen kann, bevor sie von einem anderen Host abgerufen wird. Wenn ein Host abstürzt, werden die Leases, die er besitzt, nach diesem Zeitraum plus dem konfigurierten Verlängerungsintervall von anderen Hosts abgerufen. Wenn Sie diesen Wert reduzieren, erfolgt die Wiederherstellung nach einem Hostabsturz schneller, aber der Ablaufwert sollte niemals niedriger als das Verlängerungsintervall sein.
  • Leaseverlängerung: Standardmäßig alle 13 Sekunden. Ein Host, der eine Lease besitzt, erneuert sie regelmäßig, auch wenn keine neuen Änderungen verwendet werden müssen. Dieser Prozess erfolgt durch Ausführen eines Ersetzenvorgangs für die Lease. Wenn Sie diesen Wert reduzieren, wird die Zeit verringert, die zum Erkennen von Leaseverlusten durch einen Hostabsturz erforderlich ist, aber der Verbrauch von Anforderungseinheiten für den Leasedauercontainer wird erhöht.

Hosten des Änderungsfeedprozessors

Der Änderungsfeedprozessor kann auf allen Plattformen gehostet werden, die zeitintensive Prozesse oder Tasks unterstützen. Im Folgenden finden Sie einige Beispiele:

Der Änderungsfeedprozessor kann zwar in kurzlebigen Umgebungen ausgeführt werden, da der Leasecontainer den Zustand beibehält, durch den Startzyklus dieser Umgebungen kommt es jedoch zu einer Verzögerung beim Empfangen der Benachrichtigungen (aufgrund des Mehraufwands beim Prozessorstart bei jedem Start der Umgebung).

Zusätzliche Ressourcen

Nächste Schritte

In den folgenden Artikeln erfahren Sie mehr über den Änderungsfeedprozessor: