Senden von Ereignissen an oder Empfangen von Ereignissen von Event Hubs mithilfe von Java
In dieser Schnellstartanleitung erfahren Sie, wie Sie mithilfe des Java-Pakets azure-messaging-eventhubs Ereignisse an einen Event Hub senden bzw. von dort empfangen.
Tipp
Wenn Sie mit Azure Event Hubs-Ressourcen in einer Spring-Anwendung arbeiten, sollten Sie Spring Cloud Azure als Alternative in Betracht ziehen. Spring Cloud Azure ist ein Open-Source-Projekt, das eine nahtlose Spring-Integration mit Azure-Diensten ermöglicht. Weitere Informationen zu Spring Cloud Azure und ein Beispiel für die Verwendung von Event Hubs finden Sie unter Spring Cloud Stream mit Azure Event Hubs.
Voraussetzungen
Wenn Sie mit Azure Event Hubs noch nicht vertraut sind, lesen Sie vor dem Durcharbeiten dieser Schnellstartanleitung die Informationen unter Übersicht über Event Hubs.
Zum Durchführen dieser Schnellstartanleitung benötigen Sie Folgendes:
- Microsoft Azure-Abonnement. Für die Verwendung von Azure-Diensten benötigen Sie ein Abonnement. Das gilt auch für Azure Event Hubs. Falls Sie noch nicht über ein Azure-Konto verfügen, können Sie sich für eine kostenlose Testversion registrieren oder beim Erstellen eines Kontos Ihre MSDN-Abonnentenvorteile nutzen.
- Eine Java-Entwicklungsumgebung. In dieser Schnellstartanleitung wird Eclipse verwendet. Es wird mindestens Java Development Kit (JDK) mit der Version 8 benötigt.
- Erstellen Sie einen Event Hubs-Namespace und einen Event Hub. Verwenden Sie zunächst das Azure-Portal, um einen Namespace vom Typ „Event Hubs“ zu erstellen, und beschaffen Sie die Verwaltungsanmeldeinformationen, die Ihre Anwendung für die Kommunikation mit dem Event Hub benötigt. Erstellen Sie anhand der Anleitung in diesem Artikel einen Namespace und einen Event Hub. Gehen Sie dann wie im folgenden Artikel beschrieben vor, um die Verbindungszeichenfolge für den Event Hubs-Namespace abzurufen: Abrufen der Verbindungszeichenfolge. Sie verwenden die Verbindungszeichenfolge im weiteren Verlauf dieser Schnellstartanleitung.
Senden von Ereignisse
In diesem Abschnitt erfahren Sie, wie Sie eine Java-Anwendung zum Senden von Ereignissen an einen Event Hub erstellen.
Hinzufügen von Verweisen auf die Azure Event Hubs-Bibliothek
Erstellen Sie zuerst ein neues Maven-Projekt für eine Konsolen-/Shellanwendung in Ihrer bevorzugten Java-Entwicklungsumgebung. Aktualisieren Sie die Datei pom.xml
wie folgt. Die Java-Clientbibliothek für Event Hubs steht über das zentrale Maven-Repository zur Verfügung.
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.18.0</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
<version>1.11.2</version>
<scope>compile</scope>
</dependency>
Hinweis
Aktualisieren Sie Ihre Version auf die neueste Version, die im Maven-Repository veröffentlicht wurde.
Authentifizieren der App bei Azure
In dieser Schnellstartanleitung werden zwei Möglichkeiten zum Herstellen einer Verbindung mit Azure Event Hubs gezeigt: kennwortlos und Verbindungszeichenfolge. Die erste Option zeigt Ihnen, wie Sie Ihren Sicherheitsprinzipal in Microsoft Entra ID und die rollenbasierte Zugriffssteuerung (RBAC) verwenden, um eine Verbindung zu einem Event Hubs Namespace herzustellen. Sie müssen sich keine Gedanken darüber machen, dass hartcodierte Verbindungszeichenfolgen in Ihrem Code oder in einer Konfigurationsdatei oder in einem sicheren Speicher wie Azure Key Vault vorhanden sind. Die zweite Option zeigt, wie Sie mithilfe einer Verbindungszeichenfolge eine Verbindung mit einem Event Hubs-Namespace herstellen. Wenn Sie noch nicht mit Azure vertraut sind, ist die Option mit der Verbindungszeichenfolge möglicherweise einfacher. In realen Anwendungen und Produktionsumgebungen wird die kennwortlose Option empfohlen. Weitere Informationen finden Sie unter Authentifizierung und Autorisierung. Weitere Informationen zur kennwortlosen Authentifizierung finden Sie auch auf der Übersichtsseite.
Zuweisen von Rollen zu Ihrem Microsoft Entra-Benutzer
Achten Sie bei der lokalen Entwicklung darauf, dass das Benutzerkonto, das die Verbindung mit Azure Event Hubs herstellt, über die korrekten Berechtigungen verfügt. Zum Senden und Empfangen von Nachrichten ist die Rolle Azure Event Hubs-Datenbesitzer erforderlich. Um sich selbst diese Rolle zuweisen zu können, benötigen Sie die Rolle „Benutzerzugriffsadministrator“ oder eine andere Rolle, die die Aktion Microsoft.Authorization/roleAssignments/write
umfasst. Sie können einem Benutzer Azure RBAC-Rollen über das Azure-Portal, die Azure CLI oder mit Azure PowerShell zuweisen. Weitere Informationen zu den verfügbaren Bereichen für Rollenzuweisungen finden Sie auf der Seite Grundlegendes zum Bereich von Azure RBAC.
Im folgenden Beispiel wird Ihrem Benutzerkonto die Rolle Azure Event Hubs Data Owner
zugewiesen. Diese Rolle bietet Vollzugriff auf Azure Event Hubs-Ressourcen. Halten Sie sich in einem echten Szenario an das Prinzip der geringsten Rechte, um Benutzern nur die benötigten Mindestberechtigungen zu erteilen und so die Produktionsumgebung besser zu schützen.
In Azure integrierte Rollen für Azure Event Hubs
Bei Azure Event Hubs ist die Verwaltung der Namespaces und aller zugehörigen Ressourcen über das Azure-Portal und die Azure-Ressourcenverwaltungs-API bereits durch das Azure RBAC-Modell geschützt. Azure stellt die folgenden integrierten Azure-Rollen zum Autorisieren des Zugriffs auf einen Event Hubs-Namespace bereit:
- Azure Event Hubs-Datenbesitzer: Ermöglicht den Datenzugriff auf einen Event Hubs-Namespace und seine Entitäten (Warteschlangen, Themen, Abonnements und Filter).
- Azure Event Hubs-Datensender: Verwenden Sie diese Rolle, um dem Event Hubs-Namespace und seinen Entitäten Sendezugriff zu erteilen.
- Azure Event Hubs-Datenempfänger: Verwenden Sie diese Rolle, um dem Event Hubs-Namespace und seinen Entitäten Empfangszugriff zu erteilen.
Informationen zum Erstellen einer benutzerdefinierten Rolle finden Sie unter Erforderliche Rechte für Event Hubs-Vorgänge.
Wichtig
In der Regel dauert die Verteilung der Rollenzuweisung in Azure ein bis zwei Minuten. In seltenen Fällen kann sie aber bis zu acht Minuten dauern. Wenn bei der ersten Ausführung Ihres Codes Authentifizierungsfehler auftreten, warten Sie einige Momente, und versuchen Sie es dann erneut.
Navigieren Sie im Azure-Portal zu Ihrem Event Hubs-Namespace. Verwenden Sie dazu entweder die Hauptsuchleiste oder die linke Navigationsleiste.
Wählen Sie auf der Übersichtsseite im linken Menü die Option Zugriffssteuerung (IAM) aus.
Wählen Sie auf der Seite Zugriffssteuerung (IAM) die Registerkarte Rollenzuweisungen aus.
Wählen Sie im oberen Menü + Hinzufügen und aus dem dann angezeigten Dropdownmenü die Option Rollenzuweisung hinzufügen aus.
Über das Suchfeld können Sie die Ergebnisse für die gewünschte Rolle filtern. Suchen Sie in diesem Beispiel nach
Azure Event Hubs Data Owner
, und wählen Sie das entsprechende Ergebnis aus. Klicken Sie dann auf Weiter.Wählen Sie unter Zugriff zuweisen zu die Option Benutzer, Gruppe oder Dienstprinzipal und dann die Option + Mitglieder auswählen aus.
Suchen Sie im Dialogfeld nach Ihrem Microsoft Entra-Benutzernamen (normalerweise Ihre E-Mail-Adresse benutzer@domäne), und wählen Sie unten im Dialogfeld Auswählen aus.
Wählen Sie Überprüfen und zuweisen aus, um zur letzten Seite zu gelangen, und wählen Sie erneut Überprüfen und zuweisen aus, um den Vorgang abzuschließen.
Schreiben von Code zum Senden von Nachrichten an den Event Hub
Fügen Sie eine Klasse namens Sender
sowie den folgenden Code zur Klasse hinzu:
Wichtig
- Ersetzen Sie
<NAMESPACE NAME>
durch den Namen Ihres Event Hubs-Namespace. - Ersetzen Sie
<EVENT HUB NAME>
durch den Namen Ihres Event Hubs.
package ehubquickstart;
import com.azure.messaging.eventhubs.*;
import java.util.Arrays;
import java.util.List;
import com.azure.identity.*;
public class SenderAAD {
// replace <NAMESPACE NAME> with the name of your Event Hubs namespace.
// Example: private static final String namespaceName = "contosons.servicebus.windows.net";
private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net";
// Replace <EVENT HUB NAME> with the name of your event hub.
// Example: private static final String eventHubName = "ordersehub";
private static final String eventHubName = "<EVENT HUB NAME>";
public static void main(String[] args) {
publishEvents();
}
/**
* Code sample for publishing events.
* @throws IllegalArgumentException if the EventData is bigger than the max batch size.
*/
public static void publishEvents() {
// create a token using the default Azure credential
DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
.authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
.build();
// create a producer client
EventHubProducerClient producer = new EventHubClientBuilder()
.fullyQualifiedNamespace(namespaceName)
.eventHubName(eventHubName)
.credential(credential)
.buildProducerClient();
// sample events in an array
List<EventData> allEvents = Arrays.asList(new EventData("Foo"), new EventData("Bar"));
// create a batch
EventDataBatch eventDataBatch = producer.createBatch();
for (EventData eventData : allEvents) {
// try to add the event from the array to the batch
if (!eventDataBatch.tryAdd(eventData)) {
// if the batch is full, send it and then create a new batch
producer.send(eventDataBatch);
eventDataBatch = producer.createBatch();
// Try to add that event that couldn't fit before.
if (!eventDataBatch.tryAdd(eventData)) {
throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
+ eventDataBatch.getMaxSizeInBytes());
}
}
}
// send the last batch of remaining events
if (eventDataBatch.getCount() > 0) {
producer.send(eventDataBatch);
}
producer.close();
}
}
Erstellen Sie das Programm, und vergewissern Sie sich, dass keine Fehler vorhanden sind. Dieses Programm wird nach dem Empfängerprogramm ausgeführt.
Empfangen von Ereignissen
Der Code in diesem Tutorial basiert auf dem EventProcessorClient-Beispiel auf GitHub. Dort können Sie sich die vollständige funktionierende Anwendung ansehen.
Befolgen Sie diese Empfehlungen, wenn Sie Azure Blob Storage als Prüfpunktspeicher verwenden:
- Verwenden Sie einen separaten Container für jede Consumergruppe. Sie können dasselbe Speicherkonto verwenden, aber verwenden Sie für jede Gruppe einen eigenen Container.
- Verwenden Sie weder den Container noch das Speicherkonto für andere Zwecke.
- Das Speicherkonto sollte sich in derselben Region befinden, in der sich die bereitgestellte Anwendung befindet. Wenn die Anwendung lokal ist, versuchen Sie, die nächstgelegene Region auszuwählen.
Stellen Sie auf der Seite Speicherkonto im Azure-Portal im Abschnitt Blobdienst sicher, dass die folgenden Einstellungen deaktiviert sind.
- Hierarchischer Namespace
- Vorläufiges Löschen von Blobs
- Versionsverwaltung
Erstellen eines Azure Storage-Kontos und eines Blobcontainers
In dieser Schnellstartanleitung wird Azure Storage (insbesondere Blob Storage) als Prüfpunktspeicher verwendet. Das Festlegen von Prüfpunkten ist ein Vorgang, durch den ein Ereignisprozessor die Position des letzten erfolgreich verarbeiteten Ereignisses innerhalb einer Partition markiert oder committet. Das Markieren eines Prüfpunkts erfolgt in der Regel innerhalb der Funktion, die die Ereignisse verarbeitet. Weitere Informationen zu Prüfpunkten finden Sie unter Ereignisprozessor.
Führen Sie die folgenden Schritte aus, um ein Azure Storage-Konto zu erstellen:
- Erstellen Sie ein Azure Storage-Konto.
- Erstellen eines Blobcontainers
- Authentifizieren beim Blobcontainer
Stellen Sie beim lokalen Entwickeln sicher, dass das Benutzerkonto, das auf Blobdaten zugreift, die richtigen Berechtigungen hat. Sie benötigen die Berechtigung Mitwirkender an Storage-Blobdaten zum Lesen und Schreiben von Blobdaten. Um sich selbst diese Rolle zuweisen zu können, benötigen Sie die Rolle Benutzerzugriffsadministrator oder eine andere Rolle, die die Aktion Microsoft.Authorization/roleAssignments/write enthält. Sie können einem Benutzer Azure RBAC-Rollen über das Azure-Portal, die Azure CLI oder mit Azure PowerShell zuweisen. Weitere Informationen zu den verfügbaren Bereichen für Rollenzuweisungen finden Sie auf der Seite Bereichsübersicht.
In diesem Szenario weisen Sie Ihrem Benutzerkonto Berechtigungen zu, die auf das Speicherkonto zugeschnitten sind, um dem Prinzip der geringsten Rechte zu folgen. Auf diese Weise erhalten Benutzer nur die erforderlichen Mindestberechtigungen, und es entstehen sicherere Produktionsumgebungen.
Im folgenden Beispiel wird Ihrem Benutzerkonto die Rolle Mitwirkender an Storage-Blobdaten zugewiesen, die sowohl Lese- als auch Schreibzugriff auf Blobdaten in Ihrem Speicherkonto ermöglicht.
Wichtig
In den meisten Fällen dauert es eine oder zwei Minute(n), bis die Rollenzuweisung in Azure weitergegeben wird. In seltenen Fällen kann es aber bis zu acht Minuten dauern. Wenn bei der ersten Ausführung Ihres Codes Authentifizierungsfehler auftreten, warten Sie einige Momente, und versuchen Sie es dann erneut.
Suchen Sie im Azure-Portal Ihr Speicherkonto mithilfe der Hauptsuchleiste oder der linken Navigationsleiste.
Wählen Sie auf der Übersichtsseite des Speicherkontos im linken Menü die Option Zugriffssteuerung (IAM) aus.
Wählen Sie auf der Seite Zugriffssteuerung (IAM) die Registerkarte Rollenzuweisungen aus.
Wählen Sie im oberen Menü + Hinzufügen und aus dem dann angezeigten Dropdownmenü die Option Rollenzuweisung hinzufügen aus.
Über das Suchfeld können Sie die Ergebnisse für die gewünschte Rolle filtern. Suchen Sie in diesem Beispiel nach Mitwirkender an Speicherblobdaten, wählen Sie das entsprechende Ergebnis und dann Weiter aus.
Wählen Sie unter Zugriff zuweisen zu die Option Benutzer, Gruppe oder Dienstprinzipal und dann die Option + Mitglieder auswählen aus.
Suchen Sie im Dialogfeld nach Ihrem Microsoft Entra-Benutzernamen (normalerweise Ihre E-Mail-Adresse benutzer@domäne), und wählen Sie unten im Dialogfeld Auswählen aus.
Wählen Sie Überprüfen und zuweisen aus, um zur letzten Seite zu gelangen, und wählen Sie erneut Überprüfen und zuweisen aus, um den Vorgang abzuschließen.
Hinzufügen Event Hubs-Bibliotheken zu Ihrem Java-Projekt
Fügen Sie in der Datei „pom.xml“ die folgenden Abhängigkeiten ein.
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.15.0</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
<version>1.16.1</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
<version>1.8.0</version>
<scope>compile</scope>
</dependency>
</dependencies>
Fügen Sie die folgenden
import
-Anweisungen am Anfang der Java-Datei ein.import com.azure.messaging.eventhubs.*; import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore; import com.azure.messaging.eventhubs.models.*; import com.azure.storage.blob.*; import java.util.function.Consumer; import com.azure.identity.*;
Erstellen Sie eine Klasse namens
Receiver
, und fügen Sie die den Zeichenfolgenvariablen der Klasse hinzu. Ersetzen Sie die Platzhalter durch die richtigen Werte.Wichtig
Ersetzen Sie die Platzhalter durch die richtigen Werte.
<NAMESPACE NAME>
durch den Namen Ihres Event Hubs-Namespace.<EVENT HUB NAME>
mit dem Namen Ihres Event Hubs im Namespace.
private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net"; private static final String eventHubName = "<EVENT HUB NAME>";
Fügen Sie der Klasse die folgende
main
-Methode hinzu.Wichtig
Ersetzen Sie die Platzhalter durch die richtigen Werte.
<STORAGE ACCOUNT NAME>
durch den Namen Ihres Azure Storage-Kontos<CONTAINER NAME>
durch den Namen des Blobcontainers im Speicherkonto
// create a token using the default Azure credential DefaultAzureCredential credential = new DefaultAzureCredentialBuilder() .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD) .build(); // Create a blob container client that you use later to build an event processor client to receive and process events BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder() .credential(credential) .endpoint("https://<STORAGE ACCOUNT NAME>.blob.core.windows.net") .containerName("<CONTAINER NAME>") .buildAsyncClient(); // Create an event processor client to receive and process events and errors. EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder() .fullyQualifiedNamespace(namespaceName) .eventHubName(eventHubName) .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME) .processEvent(PARTITION_PROCESSOR) .processError(ERROR_HANDLER) .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient)) .credential(credential) .buildEventProcessorClient(); System.out.println("Starting event processor"); eventProcessorClient.start(); System.out.println("Press enter to stop."); System.in.read(); System.out.println("Stopping event processor"); eventProcessorClient.stop(); System.out.println("Event processor stopped."); System.out.println("Exiting process");
Fügen Sie die beiden Hilfsmethoden (
PARTITION_PROCESSOR
undERROR_HANDLER
) hinzu, die Ereignisse und Fehler in derReceiver
-Klasse verarbeiten.public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> { PartitionContext partitionContext = eventContext.getPartitionContext(); EventData eventData = eventContext.getEventData(); System.out.printf("Processing event from partition %s with sequence number %d with body: %s%n", partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString()); // Every 10 events received, it will update the checkpoint stored in Azure Blob Storage. if (eventData.getSequenceNumber() % 10 == 0) { eventContext.updateCheckpoint(); } }; public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> { System.out.printf("Error occurred in partition processor for partition %s, %s.%n", errorContext.getPartitionContext().getPartitionId(), errorContext.getThrowable()); };
Erstellen Sie das Programm, und vergewissern Sie sich, dass keine Fehler vorhanden sind.
Ausführen der Anwendungen
Führen Sie zuerst die Anwendung receiver (Empfänger) aus.
Führen Sie dann die Anwendung sender (Absender) aus.
Vergewissern Sie sich im Fenster der Anwendung receiver (Empfänger), dass die von der Absenderanwendung veröffentlichten Ereignisse angezeigt werden.
Starting event processor Press enter to stop. Processing event from partition 0 with sequence number 331 with body: Foo Processing event from partition 0 with sequence number 332 with body: Bar
Drücken Sie im Fenster der Anwendung receiver (Empfänger) die EINGABETASTE, um die Anwendung zu beenden.
Starting event processor Press enter to stop. Processing event from partition 0 with sequence number 331 with body: Foo Processing event from partition 0 with sequence number 332 with body: Bar Stopping event processor Event processor stopped. Exiting process
Nächste Schritte
Im Folgenden sind auf GitHub verfügbare Beispiele aufgeführt: