Senden von Ereignissen an oder Empfangen von Ereignissen von Event Hubs mithilfe von Java

In dieser Schnellstartanleitung erfahren Sie, wie Sie mithilfe des Java-Pakets azure-messaging-eventhubs Ereignisse an einen Event Hub senden bzw. von dort empfangen.

Tipp

Wenn Sie mit Azure Event Hubs-Ressourcen in einer Spring-Anwendung arbeiten, sollten Sie Spring Cloud Azure als Alternative in Betracht ziehen. Spring Cloud Azure ist ein Open-Source-Projekt, das eine nahtlose Spring-Integration mit Azure-Diensten ermöglicht. Weitere Informationen zu Spring Cloud Azure und ein Beispiel für die Verwendung von Event Hubs finden Sie unter Spring Cloud Stream mit Azure Event Hubs.

Voraussetzungen

Wenn Sie mit Azure Event Hubs noch nicht vertraut sind, lesen Sie vor dem Durcharbeiten dieser Schnellstartanleitung die Informationen unter Übersicht über Event Hubs.

Zum Durchführen dieser Schnellstartanleitung benötigen Sie Folgendes:

  • Microsoft Azure-Abonnement. Für die Verwendung von Azure-Diensten benötigen Sie ein Abonnement. Das gilt auch für Azure Event Hubs. Falls Sie noch nicht über ein Azure-Konto verfügen, können Sie sich für eine kostenlose Testversion registrieren oder beim Erstellen eines Kontos Ihre MSDN-Abonnentenvorteile nutzen.
  • Eine Java-Entwicklungsumgebung. In dieser Schnellstartanleitung wird Eclipse verwendet. Es wird mindestens Java Development Kit (JDK) mit der Version 8 benötigt.
  • Erstellen Sie einen Event Hubs-Namespace und einen Event Hub. Verwenden Sie zunächst das Azure-Portal, um einen Namespace vom Typ „Event Hubs“ zu erstellen, und beschaffen Sie die Verwaltungsanmeldeinformationen, die Ihre Anwendung für die Kommunikation mit dem Event Hub benötigt. Erstellen Sie anhand der Anleitung in diesem Artikel einen Namespace und einen Event Hub. Gehen Sie dann wie im folgenden Artikel beschrieben vor, um die Verbindungszeichenfolge für den Event Hubs-Namespace abzurufen: Abrufen der Verbindungszeichenfolge. Sie verwenden die Verbindungszeichenfolge im weiteren Verlauf dieser Schnellstartanleitung.

Senden von Ereignisse

In diesem Abschnitt erfahren Sie, wie Sie eine Java-Anwendung zum Senden von Ereignissen an einen Event Hub erstellen.

Hinzufügen von Verweisen auf die Azure Event Hubs-Bibliothek

Erstellen Sie zuerst ein neues Maven-Projekt für eine Konsolen-/Shellanwendung in Ihrer bevorzugten Java-Entwicklungsumgebung. Aktualisieren Sie die Datei pom.xml wie folgt. Die Java-Clientbibliothek für Event Hubs steht über das zentrale Maven-Repository zur Verfügung.

		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs</artifactId>
		    <version>5.18.0</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-identity</artifactId>
		    <version>1.11.2</version>
		    <scope>compile</scope>
		</dependency>

Hinweis

Aktualisieren Sie Ihre Version auf die neueste Version, die im Maven-Repository veröffentlicht wurde.

Authentifizieren der App bei Azure

In dieser Schnellstartanleitung werden zwei Möglichkeiten zum Herstellen einer Verbindung mit Azure Event Hubs gezeigt: kennwortlos und Verbindungszeichenfolge. Die erste Option zeigt Ihnen, wie Sie Ihren Sicherheitsprinzipal in Microsoft Entra ID und die rollenbasierte Zugriffssteuerung (RBAC) verwenden, um eine Verbindung zu einem Event Hubs Namespace herzustellen. Sie müssen sich keine Gedanken darüber machen, dass hartcodierte Verbindungszeichenfolgen in Ihrem Code oder in einer Konfigurationsdatei oder in einem sicheren Speicher wie Azure Key Vault vorhanden sind. Die zweite Option zeigt, wie Sie mithilfe einer Verbindungszeichenfolge eine Verbindung mit einem Event Hubs-Namespace herstellen. Wenn Sie noch nicht mit Azure vertraut sind, ist die Option mit der Verbindungszeichenfolge möglicherweise einfacher. In realen Anwendungen und Produktionsumgebungen wird die kennwortlose Option empfohlen. Weitere Informationen finden Sie unter Authentifizierung und Autorisierung. Weitere Informationen zur kennwortlosen Authentifizierung finden Sie auch auf der Übersichtsseite.

Zuweisen von Rollen zu Ihrem Microsoft Entra-Benutzer

Achten Sie bei der lokalen Entwicklung darauf, dass das Benutzerkonto, das die Verbindung mit Azure Event Hubs herstellt, über die korrekten Berechtigungen verfügt. Zum Senden und Empfangen von Nachrichten ist die Rolle Azure Event Hubs-Datenbesitzer erforderlich. Um sich selbst diese Rolle zuweisen zu können, benötigen Sie die Rolle „Benutzerzugriffsadministrator“ oder eine andere Rolle, die die Aktion Microsoft.Authorization/roleAssignments/write umfasst. Sie können einem Benutzer Azure RBAC-Rollen über das Azure-Portal, die Azure CLI oder mit Azure PowerShell zuweisen. Weitere Informationen zu den verfügbaren Bereichen für Rollenzuweisungen finden Sie auf der Seite Grundlegendes zum Bereich von Azure RBAC.

Im folgenden Beispiel wird Ihrem Benutzerkonto die Rolle Azure Event Hubs Data Owner zugewiesen. Diese Rolle bietet Vollzugriff auf Azure Event Hubs-Ressourcen. Halten Sie sich in einem echten Szenario an das Prinzip der geringsten Rechte, um Benutzern nur die benötigten Mindestberechtigungen zu erteilen und so die Produktionsumgebung besser zu schützen.

In Azure integrierte Rollen für Azure Event Hubs

Bei Azure Event Hubs ist die Verwaltung der Namespaces und aller zugehörigen Ressourcen über das Azure-Portal und die Azure-Ressourcenverwaltungs-API bereits durch das Azure RBAC-Modell geschützt. Azure stellt die folgenden integrierten Azure-Rollen zum Autorisieren des Zugriffs auf einen Event Hubs-Namespace bereit:

Informationen zum Erstellen einer benutzerdefinierten Rolle finden Sie unter Erforderliche Rechte für Event Hubs-Vorgänge.

Wichtig

In der Regel dauert die Verteilung der Rollenzuweisung in Azure ein bis zwei Minuten. In seltenen Fällen kann sie aber bis zu acht Minuten dauern. Wenn bei der ersten Ausführung Ihres Codes Authentifizierungsfehler auftreten, warten Sie einige Momente, und versuchen Sie es dann erneut.

  1. Navigieren Sie im Azure-Portal zu Ihrem Event Hubs-Namespace. Verwenden Sie dazu entweder die Hauptsuchleiste oder die linke Navigationsleiste.

  2. Wählen Sie auf der Übersichtsseite im linken Menü die Option Zugriffssteuerung (IAM) aus.

  3. Wählen Sie auf der Seite Zugriffssteuerung (IAM) die Registerkarte Rollenzuweisungen aus.

  4. Wählen Sie im oberen Menü + Hinzufügen und aus dem dann angezeigten Dropdownmenü die Option Rollenzuweisung hinzufügen aus.

    A screenshot showing how to assign a role.

  5. Über das Suchfeld können Sie die Ergebnisse für die gewünschte Rolle filtern. Suchen Sie in diesem Beispiel nach Azure Event Hubs Data Owner, und wählen Sie das entsprechende Ergebnis aus. Klicken Sie dann auf Weiter.

  6. Wählen Sie unter Zugriff zuweisen zu die Option Benutzer, Gruppe oder Dienstprinzipal und dann die Option + Mitglieder auswählen aus.

  7. Suchen Sie im Dialogfeld nach Ihrem Microsoft Entra-Benutzernamen (normalerweise Ihre E-Mail-Adresse benutzer@domäne), und wählen Sie unten im Dialogfeld Auswählen aus.

  8. Wählen Sie Überprüfen und zuweisen aus, um zur letzten Seite zu gelangen, und wählen Sie erneut Überprüfen und zuweisen aus, um den Vorgang abzuschließen.

Schreiben von Code zum Senden von Nachrichten an den Event Hub

Fügen Sie eine Klasse namens Sender sowie den folgenden Code zur Klasse hinzu:

Wichtig

  • Ersetzen Sie <NAMESPACE NAME> durch den Namen Ihres Event Hubs-Namespace.
  • Ersetzen Sie <EVENT HUB NAME> durch den Namen Ihres Event Hubs.
package ehubquickstart;

import com.azure.messaging.eventhubs.*;
import java.util.Arrays;
import java.util.List;

import com.azure.identity.*;

public class SenderAAD {

    // replace <NAMESPACE NAME> with the name of your Event Hubs namespace.
    // Example: private static final String namespaceName = "contosons.servicebus.windows.net";
    private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net";

    // Replace <EVENT HUB NAME> with the name of your event hub. 
    // Example: private static final String eventHubName = "ordersehub";
    private static final String eventHubName = "<EVENT HUB NAME>";

    public static void main(String[] args) {
        publishEvents();
    }
    /**
     * Code sample for publishing events.
     * @throws IllegalArgumentException if the EventData is bigger than the max batch size.
     */
    public static void publishEvents() {
        // create a token using the default Azure credential        
        DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
                .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
                .build();

        // create a producer client        
        EventHubProducerClient producer = new EventHubClientBuilder()        
            .fullyQualifiedNamespace(namespaceName)
            .eventHubName(eventHubName)
            .credential(credential)
            .buildProducerClient();

        // sample events in an array
        List<EventData> allEvents = Arrays.asList(new EventData("Foo"), new EventData("Bar"));

        // create a batch
        EventDataBatch eventDataBatch = producer.createBatch();

        for (EventData eventData : allEvents) {
            // try to add the event from the array to the batch
            if (!eventDataBatch.tryAdd(eventData)) {
                // if the batch is full, send it and then create a new batch
                producer.send(eventDataBatch);
                eventDataBatch = producer.createBatch();

                // Try to add that event that couldn't fit before.
                if (!eventDataBatch.tryAdd(eventData)) {
                    throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
                        + eventDataBatch.getMaxSizeInBytes());
                }
            }
        }
        // send the last batch of remaining events
        if (eventDataBatch.getCount() > 0) {
            producer.send(eventDataBatch);
        }
        producer.close();
    }   
}

Erstellen Sie das Programm, und vergewissern Sie sich, dass keine Fehler vorhanden sind. Dieses Programm wird nach dem Empfängerprogramm ausgeführt.

Empfangen von Ereignissen

Der Code in diesem Tutorial basiert auf dem EventProcessorClient-Beispiel auf GitHub. Dort können Sie sich die vollständige funktionierende Anwendung ansehen.

Befolgen Sie diese Empfehlungen, wenn Sie Azure Blob Storage als Prüfpunktspeicher verwenden:

  • Verwenden Sie einen separaten Container für jede Consumergruppe. Sie können dasselbe Speicherkonto verwenden, aber verwenden Sie für jede Gruppe einen eigenen Container.
  • Verwenden Sie weder den Container noch das Speicherkonto für andere Zwecke.
  • Das Speicherkonto sollte sich in derselben Region befinden, in der sich die bereitgestellte Anwendung befindet. Wenn die Anwendung lokal ist, versuchen Sie, die nächstgelegene Region auszuwählen.

Stellen Sie auf der Seite Speicherkonto im Azure-Portal im Abschnitt Blobdienst sicher, dass die folgenden Einstellungen deaktiviert sind.

  • Hierarchischer Namespace
  • Vorläufiges Löschen von Blobs
  • Versionsverwaltung

Erstellen eines Azure Storage-Kontos und eines Blobcontainers

In dieser Schnellstartanleitung wird Azure Storage (insbesondere Blob Storage) als Prüfpunktspeicher verwendet. Das Festlegen von Prüfpunkten ist ein Vorgang, durch den ein Ereignisprozessor die Position des letzten erfolgreich verarbeiteten Ereignisses innerhalb einer Partition markiert oder committet. Das Markieren eines Prüfpunkts erfolgt in der Regel innerhalb der Funktion, die die Ereignisse verarbeitet. Weitere Informationen zu Prüfpunkten finden Sie unter Ereignisprozessor.

Führen Sie die folgenden Schritte aus, um ein Azure Storage-Konto zu erstellen:

  1. Erstellen Sie ein Azure Storage-Konto.
  2. Erstellen eines Blobcontainers
  3. Authentifizieren beim Blobcontainer

Stellen Sie beim lokalen Entwickeln sicher, dass das Benutzerkonto, das auf Blobdaten zugreift, die richtigen Berechtigungen hat. Sie benötigen die Berechtigung Mitwirkender an Storage-Blobdaten zum Lesen und Schreiben von Blobdaten. Um sich selbst diese Rolle zuweisen zu können, benötigen Sie die Rolle Benutzerzugriffsadministrator oder eine andere Rolle, die die Aktion Microsoft.Authorization/roleAssignments/write enthält. Sie können einem Benutzer Azure RBAC-Rollen über das Azure-Portal, die Azure CLI oder mit Azure PowerShell zuweisen. Weitere Informationen zu den verfügbaren Bereichen für Rollenzuweisungen finden Sie auf der Seite Bereichsübersicht.

In diesem Szenario weisen Sie Ihrem Benutzerkonto Berechtigungen zu, die auf das Speicherkonto zugeschnitten sind, um dem Prinzip der geringsten Rechte zu folgen. Auf diese Weise erhalten Benutzer nur die erforderlichen Mindestberechtigungen, und es entstehen sicherere Produktionsumgebungen.

Im folgenden Beispiel wird Ihrem Benutzerkonto die Rolle Mitwirkender an Storage-Blobdaten zugewiesen, die sowohl Lese- als auch Schreibzugriff auf Blobdaten in Ihrem Speicherkonto ermöglicht.

Wichtig

In den meisten Fällen dauert es eine oder zwei Minute(n), bis die Rollenzuweisung in Azure weitergegeben wird. In seltenen Fällen kann es aber bis zu acht Minuten dauern. Wenn bei der ersten Ausführung Ihres Codes Authentifizierungsfehler auftreten, warten Sie einige Momente, und versuchen Sie es dann erneut.

  1. Suchen Sie im Azure-Portal Ihr Speicherkonto mithilfe der Hauptsuchleiste oder der linken Navigationsleiste.

  2. Wählen Sie auf der Übersichtsseite des Speicherkontos im linken Menü die Option Zugriffssteuerung (IAM) aus.

  3. Wählen Sie auf der Seite Zugriffssteuerung (IAM) die Registerkarte Rollenzuweisungen aus.

  4. Wählen Sie im oberen Menü + Hinzufügen und aus dem dann angezeigten Dropdownmenü die Option Rollenzuweisung hinzufügen aus.

    A screenshot showing how to assign a storage account role.

  5. Über das Suchfeld können Sie die Ergebnisse für die gewünschte Rolle filtern. Suchen Sie in diesem Beispiel nach Mitwirkender an Speicherblobdaten, wählen Sie das entsprechende Ergebnis und dann Weiter aus.

  6. Wählen Sie unter Zugriff zuweisen zu die Option Benutzer, Gruppe oder Dienstprinzipal und dann die Option + Mitglieder auswählen aus.

  7. Suchen Sie im Dialogfeld nach Ihrem Microsoft Entra-Benutzernamen (normalerweise Ihre E-Mail-Adresse benutzer@domäne), und wählen Sie unten im Dialogfeld Auswählen aus.

  8. Wählen Sie Überprüfen und zuweisen aus, um zur letzten Seite zu gelangen, und wählen Sie erneut Überprüfen und zuweisen aus, um den Vorgang abzuschließen.

Hinzufügen Event Hubs-Bibliotheken zu Ihrem Java-Projekt

Fügen Sie in der Datei „pom.xml“ die folgenden Abhängigkeiten ein.

	<dependencies>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs</artifactId>
		    <version>5.15.0</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
		    <version>1.16.1</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-identity</artifactId>
		    <version>1.8.0</version>
		    <scope>compile</scope>
		</dependency>	
	</dependencies>
  1. Fügen Sie die folgenden import-Anweisungen am Anfang der Java-Datei ein.

    import com.azure.messaging.eventhubs.*;
    import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
    import com.azure.messaging.eventhubs.models.*;
    import com.azure.storage.blob.*;
    import java.util.function.Consumer;
    
    import com.azure.identity.*;
    
  2. Erstellen Sie eine Klasse namens Receiver, und fügen Sie die den Zeichenfolgenvariablen der Klasse hinzu. Ersetzen Sie die Platzhalter durch die richtigen Werte.

    Wichtig

    Ersetzen Sie die Platzhalter durch die richtigen Werte.

    • <NAMESPACE NAME> durch den Namen Ihres Event Hubs-Namespace.
    • <EVENT HUB NAME> mit dem Namen Ihres Event Hubs im Namespace.
    private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net";
    private static final String eventHubName = "<EVENT HUB NAME>";
    
  3. Fügen Sie der Klasse die folgende main-Methode hinzu.

    Wichtig

    Ersetzen Sie die Platzhalter durch die richtigen Werte.

    • <STORAGE ACCOUNT NAME> durch den Namen Ihres Azure Storage-Kontos
    • <CONTAINER NAME> durch den Namen des Blobcontainers im Speicherkonto
    // create a token using the default Azure credential
    DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
            .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
            .build();
    
    // Create a blob container client that you use later to build an event processor client to receive and process events
    BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
            .credential(credential)
            .endpoint("https://<STORAGE ACCOUNT NAME>.blob.core.windows.net")
            .containerName("<CONTAINER NAME>")
            .buildAsyncClient();
    
    // Create an event processor client to receive and process events and errors.
    EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
        .fullyQualifiedNamespace(namespaceName)
        .eventHubName(eventHubName)
        .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
        .processEvent(PARTITION_PROCESSOR)
        .processError(ERROR_HANDLER)
        .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))            
        .credential(credential)
        .buildEventProcessorClient();
    
    System.out.println("Starting event processor");
    eventProcessorClient.start();
    
    System.out.println("Press enter to stop.");
    System.in.read();
    
    System.out.println("Stopping event processor");
    eventProcessorClient.stop();
    System.out.println("Event processor stopped.");
    
    System.out.println("Exiting process");  
    
  1. Fügen Sie die beiden Hilfsmethoden (PARTITION_PROCESSOR und ERROR_HANDLER) hinzu, die Ereignisse und Fehler in der Receiver-Klasse verarbeiten.

    public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> {
        PartitionContext partitionContext = eventContext.getPartitionContext();
        EventData eventData = eventContext.getEventData();
    
        System.out.printf("Processing event from partition %s with sequence number %d with body: %s%n",
            partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString());
    
        // Every 10 events received, it will update the checkpoint stored in Azure Blob Storage.
        if (eventData.getSequenceNumber() % 10 == 0) {
            eventContext.updateCheckpoint();
        }
    };
    
    public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> {
        System.out.printf("Error occurred in partition processor for partition %s, %s.%n",
            errorContext.getPartitionContext().getPartitionId(),
            errorContext.getThrowable());
    };
    
  2. Erstellen Sie das Programm, und vergewissern Sie sich, dass keine Fehler vorhanden sind.

Ausführen der Anwendungen

  1. Führen Sie zuerst die Anwendung receiver (Empfänger) aus.

  2. Führen Sie dann die Anwendung sender (Absender) aus.

  3. Vergewissern Sie sich im Fenster der Anwendung receiver (Empfänger), dass die von der Absenderanwendung veröffentlichten Ereignisse angezeigt werden.

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
  4. Drücken Sie im Fenster der Anwendung receiver (Empfänger) die EINGABETASTE, um die Anwendung zu beenden.

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
    Stopping event processor
    Event processor stopped.
    Exiting process
    

Nächste Schritte

Im Folgenden sind auf GitHub verfügbare Beispiele aufgeführt: