Azure Event Hubs Prüfpunktspeicher-Clientbibliothek für Java– Version 1.17.0
Verwenden von Speicherblobs
Azure Event Hubs Prüfpunktspeicher kann zum Speichern von Prüfpunkten verwendet werden, während Ereignisse von Azure Event Hubs verarbeitet werden.
Dieses Paket verwendet Storage-Blobs als beständigen Speicher zum Verwalten von Prüfpunkten und Partitionsbesitzinformationen.
Der BlobCheckpointStore
in diesem Paket bereitgestellte kann an EventProcessor
angeschlossen werden.
Quellcode | API-Referenzdokumentation | Produktdokumentation | Beispiele
Erste Schritte
Voraussetzungen
- Java Development Kit (JDK), Version 8 oder höher.
- Maven
- Microsoft Azure-Abonnement
- Sie können ein kostenloses Konto erstellen unter: https://azure.microsoft.com
- Azure Event Hubs instance
- Schritt-für-Schritt-Anleitung zum Erstellen eines Event Hubs über das Azure-Portal
- Azure-Speicherkonto
- Schritt-für-Schritt-Anleitung zum Erstellen eines Speicherkontos über das Azure-Portal
Einschließen des Pakets
BOM-Datei einfügen
Fügen Sie azure-sdk-bom in Ihr Projekt ein, um von der Allgemeinverfügbarkeitsversion der Bibliothek abhängig zu sein. Ersetzen Sie im folgenden Codeausschnitt den Platzhalter {bom_version_to_target} durch die Versionsnummer. Weitere Informationen zur BOM finden Sie in der INFODATEI FÜR AZURE SDK-STÜCKLISTEN.
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-sdk-bom</artifactId>
<version>{bom_version_to_target}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
und fügen Sie dann die direkte Abhängigkeit wie unten dargestellt ohne das Versionstag in den Abschnitt abhängigkeiten ein.
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
</dependency>
</dependencies>
Direkte Abhängigkeiten einfügen
Wenn Sie eine Abhängigkeit von einer bestimmten Version der Bibliothek annehmen möchten, die nicht in der BoM vorhanden ist, fügen Sie die direkte Abhängigkeit wie folgt zu Ihrem Projekt hinzu.
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
<version>1.17.0</version>
</dependency>
Authentifizieren des Speichercontainerclients
Um eine instance von BlobCheckpointStore
zu erstellen, sollte zuerst ein ContainerAsyncClient
mit dem entsprechenden SAS-Token mit Schreibzugriff und Verbindungszeichenfolge erstellt werden. Um dies zu ermöglichen, benötigen Sie die Konto-SAS-Zeichenfolge (Shared Access Signature) des Speicherkontos. Weitere Informationen finden Sie unter SAS-Token.
Wichtige Begriffe
Setzen von Prüfpunkten
Setzen von Prüfpunkten ist ein Vorgang, bei dem Leser ihre Position innerhalb einer Partitionsereignissequenz markieren oder bestätigen. Dies liegt in der Verantwortung des Consumers und erfolgt auf Partitionsbasis innerhalb einer Consumergruppe. Das bedeutet, dass jeder Partitionsleser für jede Consumergruppe seine aktuelle Position im Ereignisstream nachverfolgen muss und den Dienst informieren kann, wenn er den Datenstrom als abgeschlossen betrachtet. Wenn ein Leser die Verbindung zu eine Partition trennt, beginnt nach dem erneuten Herstellen der Verbindung das Lesen bei dem Prüfpunkt, der zuvor durch den letzten Leser dieser Partition in dieser Consumergruppe übermittelt wurde. Wenn der Leser verbunden ist, übergibt er diesen Offset an den Event Hub, um die Position für den nächsten Lesevorgang anzugeben. Auf diese Weise können mithilfe von Prüfpunkten Ereignisse von Downstreamanwendungen als abgeschlossen markiert werden. Darüber hinaus sorgen Prüfpunkte für Resilienz bei einem Failover zwischen Lesern, die auf unterschiedlichen Computern ausgeführt werden. Sie können ältere Daten zurückgeben, indem Sie einen niedrigeren Offset aus diesem Prüfpunktprozess angeben. Durch diesen Mechanismus ermöglicht das Setzen von Prüfpunkten Failoverstabilität und eine erneute Wiedergabe des Ereignisstreams.
Offsets-Sequenznummern &
Beide Offsetsequenznummern & beziehen sich auf die Position eines Ereignisses innerhalb einer Partition. Sie können sich sie als clientseitigen Cursor vorstellen. Der Offset ist eine Nummerierung des Ereignisses in Byte. Mit der Offset-/Sequenznummer kann ein Ereignisconsumer (Reader) einen Punkt im Ereignisdatenstrom angeben, von dem aus mit dem Lesen von Ereignissen begonnen werden soll. Sie können den Zeitstempel so angeben, dass Sie Ereignisse empfangen, die erst nach dem angegebenen Zeitstempel in die Warteschlange eingereiht wurden. Consumer sind dafür verantwortlich, ihre eigenen Offsetwerte außerhalb des Event Hubs-Diensts zu speichern. Innerhalb einer Partition enthält jedes Ereignis einen Offset, eine Sequenznummer und den Zeitstempel, zu dem es in die Warteschlange gestellt wurde.
Beispiele
- Erstellen einer instance des Speichercontainerclients
- Erstellen eines instance mithilfe von Azure Identity
- Nutzen von Ereignissen aus allen Event Hub-Partitionen
- Angeben der Speicherversion zum Erstellen des Prüfpunktspeichers
Erstellen einer instance des Speichercontainers mit SAS-Token
BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
.connectionString("<STORAGE_ACCOUNT_CONNECTION_STRING>")
.containerName("<CONTAINER_NAME>")
.sasToken("<SAS_TOKEN>")
.buildAsyncClient();
Nutzen von Ereignissen mithilfe eines Ereignisprozessorclients
Um Ereignisse für alle Partitionen eines Event Hubs zu nutzen, erstellen Sie eine EventProcessorClient
für eine bestimmte Consumergruppe. Wenn ein Event Hub erstellt wird, wird eine Standardconsumergruppe bereitgestellt, die für die ersten Schritte verwendet werden kann.
Die EventProcessorClient
delegieren die Verarbeitung von Ereignissen an eine von Ihnen bereitgestellte Rückruffunktion, sodass Sie sich auf die Logik konzentrieren können, die zum Bereitstellen von Werten erforderlich ist, während der Prozessor für die Verwaltung der zugrunde liegenden Consumervorgänge verantwortlich ist.
In unserem Beispiel konzentrieren wir uns auf das Erstellen von EventProcessor
, verwenden BlobCheckpointStore
sie und eine einfache Rückruffunktion, um die von den Event Hubs empfangenen Ereignisse zu verarbeiten, in die Konsole zu schreiben und den Prüfpunkt im Blobspeicher nach jedem Ereignis zu aktualisieren.
BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
.connectionString("<STORAGE_ACCOUNT_CONNECTION_STRING>")
.containerName("<CONTAINER_NAME>")
.sasToken("<SAS_TOKEN>")
.buildAsyncClient();
EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
.consumerGroup("<< CONSUMER GROUP NAME >>")
.connectionString("<< EVENT HUB CONNECTION STRING >>")
.checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))
.processEvent(eventContext -> {
System.out.println("Partition id = " + eventContext.getPartitionContext().getPartitionId() + " and "
+ "sequence number of event = " + eventContext.getEventData().getSequenceNumber());
})
.processError(errorContext -> {
System.out.println("Error occurred while processing events " + errorContext.getThrowable().getMessage());
})
.buildEventProcessorClient();
// This will start the processor. It will start processing events from all partitions.
eventProcessorClient.start();
// (for demo purposes only - adding sleep to wait for receiving events)
TimeUnit.SECONDS.sleep(2);
// When the user wishes to stop processing events, they can call `stop()`.
eventProcessorClient.stop();
Problembehandlung
Aktivieren der Clientprotokollierung
Das Azure SDK für Java bietet einen konsistenten Protokollierungsverlauf, um die Behandlung von Anwendungsfehlern zu unterstützen und deren Lösung zu beschleunigen. Die erstellten Protokolle erfassen den Flow einer Anwendung, bevor sie den Endzustand erreichen. Dies trägt zur Ermittlung der Grundursache bei. Informationen zum Aktivieren der Protokollierung finden Sie im Protokollierungswiki.
SSL-Standardbibliothek
Alle Clientbibliotheken verwenden standardmäßig die Tomcat-native Boring-SSL-Bibliothek, um die Leistung auf nativer Ebene für SSL-Vorgänge zu ermöglichen. Die Boring-SSL-Bibliothek ist eine Uber-JAR-Datei mit nativen Bibliotheken für Linux/macOS/Windows und bietet im Vergleich zur SSL-Standardimplementierung im JDK eine bessere Leistung. Weitere Informationen, einschließlich zur Reduzierung der Abhängigkeitsgröße, finden Sie im Abschnitt Leistungsoptimierung des Wikis.
Nächste Schritte
Beginnen Sie, indem Sie sich die Beispiele hier ansehen.
Mitwirken
Wenn Sie ein aktiver Mitwirkender zu diesem Projekt werden möchten, lesen Sie bitte unsere Beitragsrichtlinien für weitere Informationen.