Używanie języka Java do wysyłania zdarzeń do usługi Azure Event Hubs lub odbierania z niej zdarzeń

W tym przewodniku Szybki start pokazano, jak wysyłać zdarzenia do centrum zdarzeń i odbierać je z centrum zdarzeń przy użyciu pakietu java azure-messaging-eventhubs .

Napiwek

Jeśli pracujesz z zasobami usługi Azure Event Hubs w aplikacji Spring, zalecamy rozważenie platformy Azure spring cloud jako alternatywy. Spring Cloud Azure to projekt typu open source, który zapewnia bezproblemową integrację platformy Spring z usługami platformy Azure. Aby dowiedzieć się więcej na temat platformy Azure Spring Cloud i zapoznać się z przykładem korzystania z usługi Event Hubs, zobacz Spring Cloud Stream with Azure Event Hubs (Usługa Spring Cloud Stream z usługą Azure Event Hubs).

Wymagania wstępne

Jeśli dopiero zaczynasz korzystać z usługi Azure Event Hubs, zobacz Omówienie usługi Event Hubs przed wykonaniem tego przewodnika Szybki start.

Do wykonania kroków tego przewodnika Szybki start niezbędne jest spełnienie następujących wymagań wstępnych:

  • Subskrypcja platformy Microsoft Azure. Do korzystania z usług platformy Azure, w tym usługi Azure Event Hubs, potrzebna jest subskrypcja. Jeśli nie masz istniejącego konta platformy Azure, możesz utworzyć konto bezpłatnej wersji próbnej lub skorzystać z korzyści dla subskrybentów MSDN podczas tworzenia konta.
  • Środowisko programistyczne Java. W tym przewodniku Szybki start jest używane środowisko Eclipse. Wymagany jest zestaw Java Development Kit (JDK) w wersji 8 lub nowszej.
  • Utwórz przestrzeń nazw usługi Event Hubs i centrum zdarzeń. Pierwszym krokiem jest skorzystanie z witryny Azure Portal w celu utworzenia przestrzeni nazw typu Event Hubs i uzyskania poświadczeń zarządzania wymaganych przez aplikację do komunikacji z centrum zdarzeń. Aby utworzyć przestrzeń nazw i centrum zdarzeń, wykonaj procedurę opisaną w tym artykule. Następnie pobierz parametry połączenia dla przestrzeni nazw usługi Event Hubs, postępując zgodnie z instrukcjami z artykułu: Pobierz parametry połączenia. W dalszej części tego przewodnika Szybki start użyjesz parametry połączenia.

Wysyłanie zdarzeń

W tej sekcji pokazano, jak utworzyć aplikację Java w celu wysyłania zdarzeń centrum zdarzeń.

Dodawanie odwołania do biblioteki usługi Azure Event Hubs

Najpierw utwórz nowy projekt Maven dla aplikacji konsolowej/powłoki w ulubionym środowisku programistycznym Java. pom.xml Zaktualizuj plik w następujący sposób. Biblioteka klienta Języka Java dla usługi Event Hubs jest dostępna w centralnym repozytorium Maven.

		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs</artifactId>
		    <version>5.18.0</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-identity</artifactId>
		    <version>1.11.2</version>
		    <scope>compile</scope>
		</dependency>

Uwaga

Zaktualizuj wersję do najnowszej wersji opublikowanej w repozytorium Maven.

Uwierzytelnianie aplikacji na platformie Azure

W tym przewodniku Szybki start przedstawiono dwa sposoby nawiązywania połączenia z usługą Azure Event Hubs: bez hasła i parametry połączenia. Pierwsza opcja pokazuje, jak używać podmiotu zabezpieczeń w usłudze Microsoft Entra ID i kontroli dostępu opartej na rolach (RBAC) w celu nawiązania połączenia z przestrzenią nazw usługi Event Hubs. Nie musisz martwić się o zakodowane parametry połączenia w kodzie lub w pliku konfiguracji lub w bezpiecznym magazynie, na przykład w usłudze Azure Key Vault. Druga opcja pokazuje, jak używać parametry połączenia do nawiązywania połączenia z przestrzenią nazw usługi Event Hubs. Jeśli dopiero zaczynasz korzystać z platformy Azure, możesz znaleźć opcję parametry połączenia łatwiejszą do naśladowania. Zalecamy użycie opcji bez hasła w rzeczywistych aplikacjach i środowiskach produkcyjnych. Aby uzyskać więcej informacji, zobacz Uwierzytelnianie i autoryzacja. Więcej informacji na temat uwierzytelniania bez hasła można również uzyskać na stronie przeglądu.

Przypisywanie ról do użytkownika firmy Microsoft Entra

Podczas tworzenia aplikacji lokalnie upewnij się, że konto użytkownika, które nawiązuje połączenie z usługą Azure Event Hubs, ma odpowiednie uprawnienia. Aby wysyłać i odbierać komunikaty, musisz mieć rolę Właściciela danych usługi Azure Event Hubs. Aby przypisać sobie tę rolę, musisz mieć rolę Administracja istratora dostępu użytkowników lub inną rolę obejmującą Microsoft.Authorization/roleAssignments/write akcję. Role RBAC platformy Azure można przypisać użytkownikowi przy użyciu witryny Azure Portal, interfejsu wiersza polecenia platformy Azure lub programu Azure PowerShell. Dowiedz się więcej o dostępnych zakresach przypisań ról na stronie przeglądu zakresu.

Poniższy przykład przypisuje Azure Event Hubs Data Owner rolę do konta użytkownika, co zapewnia pełny dostęp do zasobów usługi Azure Event Hubs. W rzeczywistym scenariuszu postępuj zgodnie z zasadą najniższych uprawnień , aby dać użytkownikom tylko minimalne uprawnienia wymagane do bezpieczniejszego środowiska produkcyjnego.

Wbudowane role platformy Azure dla usługi Azure Event Hubs

W przypadku usługi Azure Event Hubs zarządzanie przestrzeniami nazw i wszystkimi powiązanymi zasobami za pośrednictwem witryny Azure Portal i interfejsu API zarządzania zasobami platformy Azure jest już chronione przy użyciu modelu RBAC platformy Azure. Platforma Azure udostępnia poniższe wbudowane role platformy Azure umożliwiające autoryzowanie dostępu do przestrzeni nazw usługi Event Hubs:

  • Właściciel danych usługi Azure Event Hubs: umożliwia dostęp danych do przestrzeni nazw usługi Event Hubs i jej jednostek (kolejek, tematów, subskrypcji i filtrów)
  • Nadawca danych usługi Azure Event Hubs: ta rola umożliwia nadawcy dostęp do przestrzeni nazw usługi Event Hubs i jej jednostek.
  • Odbiornik danych usługi Azure Event Hubs: ta rola umożliwia odbiorcy dostęp do przestrzeni nazw usługi Event Hubs i jej jednostek.

Jeśli chcesz utworzyć rolę niestandardową, zobacz Prawa wymagane dla operacji usługi Event Hubs.

Ważne

W większości przypadków propagacja przypisania roli na platformie Azure potrwa minutę lub dwie. W rzadkich przypadkach może upłynąć do ośmiu minut. Jeśli podczas pierwszego uruchomienia kodu wystąpią błędy uwierzytelniania, zaczekaj chwilę i spróbuj ponownie.

  1. W witrynie Azure Portal znajdź przestrzeń nazw usługi Event Hubs przy użyciu głównego paska wyszukiwania lub nawigacji po lewej stronie.

  2. Na stronie przeglądu wybierz pozycję Kontrola dostępu (Zarządzanie dostępem i tożsamościami) z menu po lewej stronie.

  3. Na stronie Kontrola dostępu (Zarządzanie dostępem i tożsamościami) wybierz kartę Przypisania ról.

  4. Wybierz pozycję + Dodaj z górnego menu, a następnie pozycję Dodaj przypisanie roli z wyświetlonego menu rozwijanego.

    A screenshot showing how to assign a role.

  5. Użyj pola wyszukiwania, aby filtrować wyniki do żądanej roli. W tym przykładzie wyszukaj Azure Event Hubs Data Owner i wybierz pasujący wynik. Następnie wybierz pozycję Dalej.

  6. W obszarze Przypisz dostęp do wybierz pozycję Użytkownik, grupa lub jednostka usługi, a następnie wybierz pozycję + Wybierz członków.

  7. W oknie dialogowym wyszukaj nazwę użytkownika firmy Microsoft Entra (zazwyczaj adres e-mail user@domain ), a następnie wybierz pozycję Wybierz w dolnej części okna dialogowego.

  8. Wybierz pozycję Przejrzyj i przypisz , aby przejść do ostatniej strony, a następnie ponownie przejrzyj i przypisz, aby ukończyć proces.

Pisanie kodu w celu wysyłania komunikatów do centrum zdarzeń

Dodaj klasę o nazwie Senderi dodaj następujący kod do klasy:

Ważne

  • Zaktualizuj <NAMESPACE NAME> ciąg nazwą przestrzeni nazw usługi Event Hubs.
  • Zaktualizuj <EVENT HUB NAME> element przy użyciu nazwy centrum zdarzeń.
package ehubquickstart;

import com.azure.messaging.eventhubs.*;
import java.util.Arrays;
import java.util.List;

import com.azure.identity.*;

public class SenderAAD {

    // replace <NAMESPACE NAME> with the name of your Event Hubs namespace.
    // Example: private static final String namespaceName = "contosons.servicebus.windows.net";
    private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net";

    // Replace <EVENT HUB NAME> with the name of your event hub. 
    // Example: private static final String eventHubName = "ordersehub";
    private static final String eventHubName = "<EVENT HUB NAME>";

    public static void main(String[] args) {
        publishEvents();
    }
    /**
     * Code sample for publishing events.
     * @throws IllegalArgumentException if the EventData is bigger than the max batch size.
     */
    public static void publishEvents() {
        // create a token using the default Azure credential        
        DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
                .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
                .build();

        // create a producer client        
        EventHubProducerClient producer = new EventHubClientBuilder()        
            .fullyQualifiedNamespace(namespaceName)
            .eventHubName(eventHubName)
            .credential(credential)
            .buildProducerClient();

        // sample events in an array
        List<EventData> allEvents = Arrays.asList(new EventData("Foo"), new EventData("Bar"));

        // create a batch
        EventDataBatch eventDataBatch = producer.createBatch();

        for (EventData eventData : allEvents) {
            // try to add the event from the array to the batch
            if (!eventDataBatch.tryAdd(eventData)) {
                // if the batch is full, send it and then create a new batch
                producer.send(eventDataBatch);
                eventDataBatch = producer.createBatch();

                // Try to add that event that couldn't fit before.
                if (!eventDataBatch.tryAdd(eventData)) {
                    throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
                        + eventDataBatch.getMaxSizeInBytes());
                }
            }
        }
        // send the last batch of remaining events
        if (eventDataBatch.getCount() > 0) {
            producer.send(eventDataBatch);
        }
        producer.close();
    }   
}

Skompiluj program i upewnij się, że nie ma żadnych błędów. Uruchomisz ten program po uruchomieniu programu odbiorcy.

Odbieranie zdarzeń

Kod w tym samouczku jest oparty na przykładzie EventProcessorClient w usłudze GitHub, który można sprawdzić, aby wyświetlić pełną działającą aplikację.

Postępuj zgodnie z poniższymi zaleceniami w przypadku korzystania z usługi Azure Blob Storage jako magazynu punktów kontrolnych:

  • Użyj oddzielnego kontenera dla każdej grupy odbiorców. Możesz użyć tego samego konta magazynu, ale użyj jednego kontenera dla każdej grupy.
  • Nie używaj kontenera dla żadnych innych elementów i nie używaj konta magazynu dla innych elementów.
  • Konto magazynu powinno znajdować się w tym samym regionie co wdrożona aplikacja. Jeśli aplikacja jest lokalna, spróbuj wybrać możliwy region najbliżej.

Na stronie Konto magazynu w witrynie Azure Portal w sekcji Blob Service upewnij się, że następujące ustawienia są wyłączone.

  • Hierarchiczna przestrzeń nazw
  • Usuwanie nietrwałe obiektów blob
  • Wersje

Tworzenie usługi Azure Storage i kontenera obiektów blob

W tym przewodniku Szybki start użyjesz usługi Azure Storage (w szczególności usługi Blob Storage) jako magazynu punktów kontrolnych. Tworzenie punktów kontrolnych to proces, za pomocą którego procesor zdarzeń oznacza lub zatwierdza pozycję ostatniego pomyślnie przetworzonego zdarzenia w ramach partycji. Oznaczanie punktu kontrolnego jest zwykle wykonywane w ramach funkcji, która przetwarza zdarzenia. Aby dowiedzieć się więcej na temat tworzenia punktów kontrolnych, zobacz Procesor zdarzeń.

Wykonaj następujące kroki, aby utworzyć konto usługi Azure Storage.

  1. Tworzenie konta usługi Azure Storage
  2. Tworzenie kontenera obiektów blob
  3. Uwierzytelnianie w kontenerze obiektów blob

Podczas tworzenia aplikacji lokalnie upewnij się, że konto użytkownika, które uzyskuje dostęp do danych obiektów blob, ma odpowiednie uprawnienia. Będziesz potrzebować współautora danych obiektu blob usługi Storage, aby odczytywać i zapisywać dane obiektów blob. Aby przypisać sobie tę rolę, musisz przypisać rolę Administracja istratora dostępu użytkowników lub inną rolę obejmującą akcję Microsoft.Authorization/roleAssignments/write. Role RBAC platformy Azure można przypisać użytkownikowi przy użyciu witryny Azure Portal, interfejsu wiersza polecenia platformy Azure lub programu Azure PowerShell. Więcej informacji na temat dostępnych zakresów przypisań ról można znaleźć na stronie przeglądu zakresu.

W tym scenariuszu przypiszesz uprawnienia do konta użytkownika w zakresie konta magazynu, aby postępować zgodnie z zasadą najniższych uprawnień. Ta praktyka zapewnia użytkownikom tylko minimalne wymagane uprawnienia i tworzy bezpieczniejsze środowiska produkcyjne.

W poniższym przykładzie do konta użytkownika zostanie przypisana rola Współautor danych obiektu blob usługi Storage, która zapewnia zarówno dostęp do odczytu, jak i zapisu do danych obiektów blob na koncie magazynu.

Ważne

W większości przypadków propagacja przypisania roli na platformie Azure potrwa minutę lub dwie, ale w rzadkich przypadkach może upłynąć do ośmiu minut. Jeśli podczas pierwszego uruchomienia kodu wystąpią błędy uwierzytelniania, zaczekaj chwilę i spróbuj ponownie.

  1. W witrynie Azure Portal znajdź konto magazynu przy użyciu głównego paska wyszukiwania lub nawigacji po lewej stronie.

  2. Na stronie przeglądu konta magazynu wybierz pozycję Kontrola dostępu (Zarządzanie dostępem i tożsamościami) z menu po lewej stronie.

  3. Na stronie Kontrola dostępu (Zarządzanie dostępem i tożsamościami) wybierz kartę Przypisania ról.

  4. Wybierz pozycję + Dodaj z górnego menu, a następnie pozycję Dodaj przypisanie roli z wyświetlonego menu rozwijanego.

    A screenshot showing how to assign a storage account role.

  5. Użyj pola wyszukiwania, aby filtrować wyniki do żądanej roli. W tym przykładzie wyszukaj pozycję Współautor danych obiektu blob usługi Storage i wybierz pasujący wynik, a następnie wybierz pozycję Dalej.

  6. W obszarze Przypisz dostęp do wybierz pozycję Użytkownik, grupa lub jednostka usługi, a następnie wybierz pozycję + Wybierz członków.

  7. W oknie dialogowym wyszukaj nazwę użytkownika firmy Microsoft Entra (zazwyczaj adres e-mail user@domain ), a następnie wybierz pozycję Wybierz w dolnej części okna dialogowego.

  8. Wybierz pozycję Przejrzyj i przypisz , aby przejść do ostatniej strony, a następnie ponownie przejrzyj i przypisz, aby ukończyć proces.

Dodawanie bibliotek usługi Event Hubs do projektu Java

Dodaj następujące zależności w pliku pom.xml.

	<dependencies>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs</artifactId>
		    <version>5.15.0</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
		    <version>1.16.1</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-identity</artifactId>
		    <version>1.8.0</version>
		    <scope>compile</scope>
		</dependency>	
	</dependencies>
  1. Dodaj następujące import instrukcje w górnej części pliku Java.

    import com.azure.messaging.eventhubs.*;
    import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
    import com.azure.messaging.eventhubs.models.*;
    import com.azure.storage.blob.*;
    import java.util.function.Consumer;
    
    import com.azure.identity.*;
    
  2. Utwórz klasę o nazwie Receiveri dodaj do klasy następujące zmienne ciągu. Zastąp symbole zastępcze poprawnymi wartościami.

    Ważne

    Zastąp symbole zastępcze poprawnymi wartościami.

    • <NAMESPACE NAME> na nazwę Twojej przestrzeni nazw usługi Event Hubs.
    • <EVENT HUB NAME> z nazwą centrum zdarzeń w przestrzeni nazw.
    private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net";
    private static final String eventHubName = "<EVENT HUB NAME>";
    
  3. Dodaj następującą main metodę do klasy .

    Ważne

    Zastąp symbole zastępcze poprawnymi wartościami.

    • <STORAGE ACCOUNT NAME> z nazwą konta usługi Azure Storage.
    • <CONTAINER NAME> z nazwą kontenera obiektów blob na koncie magazynu
    // create a token using the default Azure credential
    DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
            .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
            .build();
    
    // Create a blob container client that you use later to build an event processor client to receive and process events
    BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
            .credential(credential)
            .endpoint("https://<STORAGE ACCOUNT NAME>.blob.core.windows.net")
            .containerName("<CONTAINER NAME>")
            .buildAsyncClient();
    
    // Create an event processor client to receive and process events and errors.
    EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
        .fullyQualifiedNamespace(namespaceName)
        .eventHubName(eventHubName)
        .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
        .processEvent(PARTITION_PROCESSOR)
        .processError(ERROR_HANDLER)
        .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))            
        .credential(credential)
        .buildEventProcessorClient();
    
    System.out.println("Starting event processor");
    eventProcessorClient.start();
    
    System.out.println("Press enter to stop.");
    System.in.read();
    
    System.out.println("Stopping event processor");
    eventProcessorClient.stop();
    System.out.println("Event processor stopped.");
    
    System.out.println("Exiting process");  
    
  1. Dodaj dwie metody pomocnicze (PARTITION_PROCESSOR i ERROR_HANDLER), które przetwarzają zdarzenia i błędy do Receiver klasy.

    public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> {
        PartitionContext partitionContext = eventContext.getPartitionContext();
        EventData eventData = eventContext.getEventData();
    
        System.out.printf("Processing event from partition %s with sequence number %d with body: %s%n",
            partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString());
    
        // Every 10 events received, it will update the checkpoint stored in Azure Blob Storage.
        if (eventData.getSequenceNumber() % 10 == 0) {
            eventContext.updateCheckpoint();
        }
    };
    
    public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> {
        System.out.printf("Error occurred in partition processor for partition %s, %s.%n",
            errorContext.getPartitionContext().getPartitionId(),
            errorContext.getThrowable());
    };
    
  2. Skompiluj program i upewnij się, że nie ma żadnych błędów.

Uruchamianie aplikacji

  1. Najpierw uruchom aplikację odbiornika .

  2. Następnie uruchom aplikację Nadawca.

  3. W oknie Aplikacja odbiorcy upewnij się, że są widoczne zdarzenia opublikowane przez aplikację Nadawca.

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
  4. Naciśnij klawisz ENTER w oknie aplikacji odbiorcy, aby zatrzymać aplikację.

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
    Stopping event processor
    Event processor stopped.
    Exiting process
    

Następne kroki

Zobacz następujące przykłady w witrynie GitHub: