Notatka
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
W tym artykule pokazano, jak używać Azure Event Hubs w aplikacjach Java utworzonych za pomocą platformy Spring Framework.
Azure Event Hubs to platforma przesyłania strumieniowego danych big data i usługa pozyskiwania zdarzeń. Może odbierać i przetwarzać miliony zdarzeń na sekundę. Dane wysłane do centrum zdarzeń mogą zostać przekształcone i zmagazynowane przy użyciu dowolnego dostawcy analityki czasu rzeczywistego lub adapterów przetwarzania wsadowego/magazynowania.
Usługa Spring Cloud Azure udostępnia różne moduły do wysyłania komunikatów do usługi Event Hubs i odbierania ich za pomocą platform Spring.
Następujące moduły można użyć niezależnie lub połączyć je w różnych przypadkach użycia:
Spring Cloud Azure Event Hubs Starter umożliwia wysyłanie i odbieranie komunikatów za pomocą biblioteki klienta zestawu SDK usługi Event Hubs Java z funkcjami platformy Spring Boot.
Spring Messaging Azure Event Hubs umożliwia interakcję z usługą Event Hubs za pośrednictwem Spring Messaging API.
Spring Integration Azure Event Hubs umożliwia łączenie kanałów Spring Integration Message z usługą Event Hubs.
Spring Cloud Azure Stream Event Hubs Binder umożliwia używanie usługi Event Hubs jako oprogramowania pośredniczącego obsługi komunikatów w aplikacjach spring Cloud Stream.
Spring Kafka z Azure Event Hubs umożliwia korzystanie z usługi Spring Kafka wysyłania komunikatów do usługi Event Hubs i odbierania ich.
Spring Cloud Stream Kafka Binder with Azure Event Hubs umożliwia wysyłanie i odbieranie komunikatów za pośrednictwem Spring Cloud Stream Kafka Binder z Event Hubs.
Wymagania wstępne
Subskrypcja Azure — utwórz ją bezpłatnie.
Java Development Kit (JDK) w wersji 8 lub nowszej.
Instancja Azure Event Hubs. Aby uzyskać więcej informacji, zobacz Quickstart: Tworzenie centrum zdarzeń przy użyciu portalu Azure.
Konto Azure Storage dla punktów kontrolnych usługi Event Hubs. Aby uzyskać więcej informacji, zobacz temat Tworzenie konta.
Aplikacja Spring Boot. Jeśli go nie masz, utwórz projekt Maven za pomocą narzędzia Spring Initializr. Pamiętaj, aby wybrać Maven Project i w obszarze Dependencies dodaj Spring Web zależność, a następnie wybierz Java w wersji 8 lub nowszej.
Uwaga
Aby udzielić kontu dostępu do zasobów w Azure Event Hubs, przypisz rolę Azure Event Hubs Data Receiver i Azure Event Hubs Data Sender do konta Microsoft Entra, którego aktualnie używasz. Następnie na koncie Azure Storage przypisz rolę Storage Blob Data Contributor do aktualnie używanego konta Microsoft Entra. Aby uzyskać więcej informacji na temat udzielania ról dostępu, zobacz Przypisanie ról Azure przy użyciu portalu Azure i Autoryzowanie dostępu do zasobów Event Hubs przy użyciu Microsoft Entra ID.
Ważne
Do wykonania kroków opisanych w tym samouczku jest wymagana wersja 2.5 lub nowsza Spring Boot.
Przygotowywanie środowiska lokalnego
W tym samouczku ustawienia i kod nie mają żadnych operacji uwierzytelniania. Jednak nawiązywanie połączenia z usługą Azure wymaga uwierzytelniania. Aby ukończyć uwierzytelnianie, należy użyć biblioteki klienta Azure Identity. Usługa Spring Cloud Azure używa DefaultAzureCredential, którą udostępnia biblioteka Azure Identity, aby ułatwić uzyskiwanie poświadczeń bez żadnych zmian w kodzie.
DefaultAzureCredential obsługuje wiele metod uwierzytelniania i określa, która metoda ma być używana w czasie wykonywania. Takie podejście umożliwia aplikacji używanie różnych metod uwierzytelniania w różnych środowiskach — takich jak środowiska lokalne lub produkcyjne — bez implementowania kodu specyficznego dla środowiska. Aby uzyskać więcej informacji, zobacz sekcję DefaultAzureCredential sekcji Authenticate Azure-hosted Java applications.
Aby użyć Azure CLI, IntelliJ lub innych metod do ukończenia uwierzytelniania w lokalnych środowiskach deweloperskich, zobacz Azure uwierzytelnianie w środowiskach deweloperskich Java. Aby ukończyć uwierzytelnianie w środowiskach hostingu Azure, zalecamy użycie tożsamości zarządzanej. Aby uzyskać więcej informacji, zobacz Jak są tożsamości zarządzane dla zasobów Azure?
Użyj startera Spring Cloud Azure Event Hubs
Moduł Spring Cloud Azure Event Hubs Starter importuje moduł Event Hubs Java biblioteki klienta z platformą Spring Boot. Usługi Spring Cloud Azure i Azure SDK można używać razem w sposób nie wykluczające się wzajemnie. W związku z tym możesz nadal używać interfejsu API klienta usługi Event Hubs Java w aplikacji Spring.
Dodawanie zależności
Aby zainstalować moduł Spring Cloud Azure Event Hubs Starter, dodaj następujące zależności do pliku pom.xml:
Projekt Spring Cloud Azure Bill of Materials (BOM):
<dependencyManagement> <dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-dependencies</artifactId> <version>7.2.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>Uwaga
Jeśli używasz platformy Spring Boot 4.0.x, pamiętaj, aby ustawić
spring-cloud-azure-dependencieswersję na7.2.0.Jeśli używasz platformy Spring Boot 3.5.x, pamiętaj, aby ustawić
spring-cloud-azure-dependencieswersję na6.2.0.Jeśli używasz platformy Spring Boot 3.1.x-3.5.x, pamiętaj, aby ustawić
spring-cloud-azure-dependencieswersję na5.25.0.Jeśli używasz środowiska Spring Boot 2.x, ustaw wersję
spring-cloud-azure-dependenciesna4.20.0.Tę listę materiałów (BOM) należy skonfigurować w sekcji
<dependencyManagement>pliku pom.xml. Dzięki temu wszystkie zależności usługi Spring Cloud Azure korzystają z tej samej wersji.Aby uzyskać więcej informacji na temat wersji używanej w tym BOM, zobacz Którą wersję Spring Cloud Azure powinienem użyć?.
Komponent Spring Cloud dla Azure Event Hubs:
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-starter-eventhubs</artifactId> </dependency>
Kod aplikacji do wysyłania i odbierania komunikatów
W tym przewodniku pokazano, jak używać klientów usługi Event Hubs Java w kontekście aplikacji Spring. W tym miejscu przedstawimy następujące dwie opcje:
- Użyj automatycznej konfiguracji platformy Spring Boot i użyj wbudowanych klientów z kontekstu Spring (zalecane).
- Programowe kompilowanie klienta.
Sposób automatycznego wstrzykiwania komponentów klienckich z kontenera Spring IoC ma następujące zalety, które mogą zapewnić bardziej elastyczne i wydajne środowisko podczas pracy z klientami Event Hubs.
- Stosuje on konfigurację zewnętrzną, aby można było pracować z tym samym kodem aplikacji w różnych środowiskach.
- Możesz delegować do struktury Spring Boot proces uczenia wzorca konstruktora i rejestrowania tego klienta w kontekście aplikacji. To delegowanie umożliwia skoncentrowanie się na tym, jak używać klientów zgodnie z własnymi wymaganiami biznesowymi.
- Wskaźnik kondycji można użyć w łatwy sposób, aby sprawdzić stan i kondycję aplikacji i składników wewnętrznych.
W poniższych sekcjach przedstawiono przykłady kodu, które pokazują, jak używać metod EventProcessorClient i EventHubProducerClient z dwiema alternatywami.
Uwaga
zestaw SDK Azure Java dla usługi Event Hubs udostępnia wielu klientów do interakcji z usługą Event Hubs. Starter zapewnia również automatyczną konfigurację dla wszystkich klientów usługi Event Hubs, a także konstruktorów klientów. W tym artykule użyto tylko EventProcessorClient i EventHubProducerClient jako przykładów.
Korzystanie z autokonfiguracji Spring Boot
Aby wysyłać komunikaty do usługi Event Hubs i odbierać je z usługi Event Hubs, skonfiguruj aplikację, wykonując następujące czynności:
Użyj następujących ustawień właściwości, aby skonfigurować przestrzeń nazw usługi Event Hubs i nazwę centrum zdarzeń:
spring.cloud.azure.eventhubs.namespace=<your event-hubs-namespace> spring.cloud.azure.eventhubs.event-hub-name=<your-event-hub-name> spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name=<your-storage-account-name> spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name=<your-storage-account-container-name> spring.cloud.azure.eventhubs.processor.consumer-group=$DefaultUtwórz nową klasę
EventHubProcessorClientConfigurationJava, jak pokazano w poniższym przykładzie. Ta klasa służy do rejestrowania komunikatu i procedury obsługi błędów dla programuEventProcessorClient.import com.azure.spring.cloud.service.eventhubs.consumer.EventHubsErrorHandler; import com.azure.spring.cloud.service.eventhubs.consumer.EventHubsRecordMessageListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class EventHubProcessorClientConfiguration { private static final Logger LOGGER = LoggerFactory.getLogger(EventHubProcessorClientConfiguration.class); @Bean EventHubsRecordMessageListener processEvent() { return eventContext->LOGGER.info("Processing event from partition {} with sequence number {} with body: {}", eventContext.getPartitionContext().getPartitionId(), eventContext.getEventData().getSequenceNumber(), eventContext.getEventData().getBodyAsString()); } @Bean EventHubsErrorHandler processError() { return errorContext->LOGGER.info("Error occurred in partition processor for partition {}, {}", errorContext.getPartitionContext().getPartitionId(), errorContext.getThrowable()); } }Wstrzykuj element
EventProcessorClientiEventHubProducerClientw aplikacji Spring i wywołaj powiązane interfejsy API w celu wysyłania i odbierania komunikatów, jak pokazano w poniższym przykładzie:import com.azure.messaging.eventhubs.EventData; import com.azure.messaging.eventhubs.EventHubProducerClient; import com.azure.messaging.eventhubs.EventProcessorClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import java.util.Collections; import java.util.concurrent.TimeUnit; @SpringBootApplication public class EventHubClientApplication implements CommandLineRunner { private static final Logger LOGGER = LoggerFactory.getLogger(EventHubClientApplication.class); private final EventHubProducerClient eventHubProducerClient; private final EventProcessorClient eventProcessorClient; public EventHubClientApplication(EventHubProducerClient eventHubProducerClient, EventProcessorClient eventProcessorClient) { this.eventHubProducerClient = eventHubProducerClient; this.eventProcessorClient = eventProcessorClient; } public static void main(String[] args) { SpringApplication.run(EventHubClientApplication.class, args); } @Override public void run(String... args) throws Exception { eventProcessorClient.start(); // Wait for the processor client to be ready TimeUnit.SECONDS.sleep(10); eventHubProducerClient.send(Collections.singletonList(new EventData("Hello World"))); LOGGER.info("Successfully sent a message to Event Hubs."); eventHubProducerClient.close(); LOGGER.info("Skip stopping and closing the processor since the processor may not complete the receiving process yet."); } }Uruchom aplikację. Wyświetlane są dzienniki podobne do następującego przykładu:
Successfully sent a message to Event Hubs. ... Processing event from partition 0 with sequence number 0 with body: Hello World ... Stopping and closing the processor.
Programowe kompilowanie klienta
Możesz samodzielnie stworzyć komponenty klienckie, ale proces jest skomplikowany. W aplikacjach Spring Boot musisz zarządzać właściwościami, uczyć się wzorca konstruktora i rejestrować klienta w kontekście aplikacji Spring. W poniższych krokach pokazano, jak to zrobić:
Utwórz nową klasę
EventHubClientConfigurationJava, jak pokazano w poniższym przykładzie. Ta klasa służy do deklarowania fasoliEventProcessorClientiEventHubProducerClient. Pamiętaj, aby zastąpić<your event-hubs-namespace>,<your-event-hub-name>,<your-storage-account-name>i<your-storage-account-container-name>symbole zastępcze rzeczywistymi wartościami.import com.azure.identity.DefaultAzureCredentialBuilder; import com.azure.messaging.eventhubs.EventHubClientBuilder; import com.azure.messaging.eventhubs.EventHubProducerClient; import com.azure.messaging.eventhubs.EventProcessorClient; import com.azure.messaging.eventhubs.EventProcessorClientBuilder; import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore; import com.azure.messaging.eventhubs.models.ErrorContext; import com.azure.messaging.eventhubs.models.EventContext; import com.azure.storage.blob.BlobContainerAsyncClient; import com.azure.storage.blob.BlobContainerClientBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class EventHubClientConfiguration { private static final Logger LOGGER = LoggerFactory.getLogger(EventHubClientConfiguration.class); private static final String EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "<your event-hubs-namespace>.servicebus.windows.net"; private static final String EVENT_HUB_NAME = "<your-event-hub-name>"; private static final String CONSUMER_GROUP = "$Default"; private static final String STORAGE_ACCOUNT_ENDPOINT = "https://<your-storage-account-name>.blob.core.windows.net"; private static final String STORAGE_CONTAINER_NAME = "<your-storage-account-container-name>"; @Bean EventHubClientBuilder eventHubClientBuilder() { return new EventHubClientBuilder().credential(EVENT_HUB_FULLY_QUALIFIED_NAMESPACE, EVENT_HUB_NAME, new DefaultAzureCredentialBuilder() .build()); } @Bean BlobContainerClientBuilder blobContainerClientBuilder() { return new BlobContainerClientBuilder().credential(new DefaultAzureCredentialBuilder() .build()) .endpoint(STORAGE_ACCOUNT_ENDPOINT) .containerName(STORAGE_CONTAINER_NAME); } @Bean BlobContainerAsyncClient blobContainerAsyncClient(BlobContainerClientBuilder blobContainerClientBuilder) { return blobContainerClientBuilder.buildAsyncClient(); } @Bean EventProcessorClientBuilder eventProcessorClientBuilder(BlobContainerAsyncClient blobContainerAsyncClient) { return new EventProcessorClientBuilder().credential(EVENT_HUB_FULLY_QUALIFIED_NAMESPACE, EVENT_HUB_NAME, new DefaultAzureCredentialBuilder() .build()) .consumerGroup(CONSUMER_GROUP) .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient)) .processEvent(EventHubClientConfiguration::processEvent) .processError(EventHubClientConfiguration::processError); } @Bean EventHubProducerClient eventHubProducerClient(EventHubClientBuilder eventHubClientBuilder) { return eventHubClientBuilder.buildProducerClient(); } @Bean EventProcessorClient eventProcessorClient(EventProcessorClientBuilder eventProcessorClientBuilder) { return eventProcessorClientBuilder.buildEventProcessorClient(); } public static void processEvent(EventContext eventContext) { LOGGER.info("Processing event from partition {} with sequence number {} with body: {}", eventContext.getPartitionContext().getPartitionId(), eventContext.getEventData().getSequenceNumber(), eventContext.getEventData().getBodyAsString()); } public static void processError(ErrorContext errorContext) { LOGGER.info("Error occurred in partition processor for partition {}, {}", errorContext.getPartitionContext().getPartitionId(), errorContext.getThrowable()); } }Wstrzyknij elementy
EventProcessorClientiEventHubProducerClientdo aplikacji Spring, jak pokazano poniżej:import com.azure.messaging.eventhubs.EventData; import com.azure.messaging.eventhubs.EventHubProducerClient; import com.azure.messaging.eventhubs.EventProcessorClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import java.util.Collections; import java.util.concurrent.TimeUnit; @SpringBootApplication public class EventHubClientApplication implements CommandLineRunner { private static final Logger LOGGER = LoggerFactory.getLogger(EventHubClientApplication.class); private final EventHubProducerClient eventHubProducerClient; private final EventProcessorClient eventProcessorClient; public EventHubClientApplication(EventHubProducerClient eventHubProducerClient, EventProcessorClient eventProcessorClient) { this.eventHubProducerClient = eventHubProducerClient; this.eventProcessorClient = eventProcessorClient; } public static void main(String[] args) { SpringApplication.run(EventHubClientApplication.class, args); } @Override public void run(String... args) throws Exception { eventProcessorClient.start(); // Wait for the processor client to be ready TimeUnit.SECONDS.sleep(10); eventHubProducerClient.send(Collections.singletonList(new EventData("Hello World"))); LOGGER.info("Successfully sent a message to Event Hubs."); eventHubProducerClient.close(); LOGGER.info("Stopping and closing the processor"); eventProcessorClient.stop(); } }Uruchom aplikację. Wyświetlane są dzienniki podobne do następującego przykładu:
Successfully sent a message to Event Hubs. ... Processing event from partition 0 with sequence number 0 with body: Hello World ... Stopping and closing the processor.
Poniższa lista przedstawia niektóre powody, dla których ten kod nie jest elastyczny ani elegancki.
- Przestrzeń nazw usługi Event Hubs i nazwa centrum zdarzeń są zakodowane w kodzie.
- Jeśli używasz
@Valuedo pobierania konfiguracji ze środowiska Spring, nie możesz mieć wskazówek IDE w pliku application.properties. - Jeśli masz scenariusz mikrousług, musisz zduplikować kod w każdym projekcie, i łatwo jest popełnić błędy, a trudno jest zachować spójność.
Na szczęście tworzenie własnoręczne komponentów klienta nie jest konieczne z użyciem Spring Cloud Azure. Zamiast tego można je bezpośrednio wstrzyknąć i użyć właściwości konfiguracji, które już znasz, aby skonfigurować kolejkę usługi Storage. Aby uzyskać więcej informacji, zobacz Spring Cloud Azure configuration.
Usługa Spring Cloud Azure udostępnia również następujące konfiguracje globalne dla różnych scenariuszy. Aby uzyskać więcej informacji, zobacz sekcję Global configuration for Azure Service SDK sekcji Spring Cloud Azure configuration.
- Opcje serwera proxy.
- Opcje ponawiania prób.
- Opcje klienta transportu AMQP.
Możesz również nawiązać połączenie z różnymi chmurami Azure. Aby uzyskać więcej informacji, zobacz Połączenie z różnymi chmurami Azure.
Korzystanie z usługi Spring Messaging Azure Event Hubs
Moduł Spring Messaging Azure Event Hubs zapewnia obsługę Spring Messaging w usłudze Event Hubs.
Jeśli używasz Azure Event Hubs Spring Messaging, możesz użyć następujących funkcji:
-
EventHubsTemplate: wysyłaj komunikaty do usługi Event Hubs asynchronicznie i synchronicznie. -
@EventHubsListener: Oznacz metodę jako docelową odbiornika komunikatów usługi Event Hubs w miejscu docelowym.
W tym przewodniku pokazano, jak używać usługi Spring Messaging Azure Event Hubs do wysyłania komunikatów do usługi Event Hubs i odbierania ich z usługi Event Hubs.
Dodawanie zależności
Aby zainstalować moduł Spring Messaging Azure Event Hubs, dodaj następujące zależności do pliku pom.xml:
Projekt Spring Cloud Azure Bill of Materials (BOM):
<dependencyManagement> <dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-dependencies</artifactId> <version>7.2.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>Uwaga
Jeśli używasz platformy Spring Boot 4.0.x, pamiętaj, aby ustawić
spring-cloud-azure-dependencieswersję na7.2.0.Jeśli używasz platformy Spring Boot 3.5.x, pamiętaj, aby ustawić
spring-cloud-azure-dependencieswersję na6.2.0.Jeśli używasz platformy Spring Boot 3.1.x-3.5.x, pamiętaj, aby ustawić
spring-cloud-azure-dependencieswersję na5.25.0.Jeśli używasz środowiska Spring Boot 2.x, ustaw wersję
spring-cloud-azure-dependenciesna4.20.0.Tę listę materiałów (BOM) należy skonfigurować w sekcji
<dependencyManagement>pliku pom.xml. Dzięki temu wszystkie zależności usługi Spring Cloud Azure korzystają z tej samej wersji.Aby uzyskać więcej informacji na temat wersji używanej w tym BOM, zobacz Którą wersję Spring Cloud Azure powinienem użyć?.
Szablon startowy Spring Cloud Azure, spring Messaging Event Hubs i artefakty magazynu punktów kontrolnych Azure Event Hubs:
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-starter</artifactId> </dependency> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-messaging-azure-eventhubs</artifactId> </dependency> <dependency> <groupId>com.azure</groupId> <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId> </dependency>
Kod aplikacji do wysyłania i odbierania komunikatów
Aby wysyłać komunikaty do usługi Event Hubs i odbierać je z usługi Event Hubs, skonfiguruj aplikację, wykonując następujące czynności:
Użyj następujących ustawień właściwości, aby skonfigurować przestrzeń nazw usługi Event Hubs i obiekt blob usługi Storage:
spring.cloud.azure.eventhubs.namespace=<your event-hubs-namespace> spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name=<your-storage-account-name> spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name=<your-storage-account-container-name>Utwórz nową klasę
ConsumerServiceJava, jak pokazano w poniższym przykładzie. Ta klasa służy do definiowania odbiornika komunikatów. Pamiętaj, aby zastąpić<your-event-hub-name>symbol zastępczy wartością rzeczywistą.import com.azure.spring.messaging.eventhubs.implementation.core.annotation.EventHubsListener; import org.springframework.stereotype.Service; @Service public class ConsumerService { private static final String EVENT_HUB_NAME = "<your-event-hub-name>"; private static final String CONSUMER_GROUP = "$DEFAULT"; @EventHubsListener(destination = EVENT_HUB_NAME, group = CONSUMER_GROUP) public void handleMessageFromEventHub(String message) { System.out.printf("New message received: %s%n", message); } }Podłącz nadawcę i odbiorcę do wysyłania i odbierania komunikatów za pomocą platformy Spring, jak pokazano w poniższym przykładzie. Pamiętaj, aby zastąpić
<your-event-hub-name>symbol zastępczy wartością rzeczywistą.import com.azure.spring.messaging.eventhubs.core.EventHubsTemplate; import com.azure.spring.messaging.implementation.annotation.EnableAzureMessaging; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.messaging.support.MessageBuilder; @SpringBootApplication @EnableAzureMessaging public class EventHubMessagingApplication { private static final String EVENT_HUB_NAME = "<your-event-hub-name>"; private static final Logger LOGGER = LoggerFactory.getLogger(EventHubMessagingApplication.class); public static void main(String[] args) { ConfigurableApplicationContext applicationContext = SpringApplication.run(EventHubMessagingApplication.class); EventHubsTemplate eventHubsTemplate = applicationContext.getBean(EventHubsTemplate.class); LOGGER.info("Sending a message to the Event Hubs."); eventHubsTemplate.sendAsync(EVENT_HUB_NAME, MessageBuilder.withPayload("Hello world").build()).subscribe(); } }Wskazówka
Pamiętaj, aby dodać adnotację
@EnableAzureMessaging, która wyzwala odnajdywanie metod oznaczonych adnotacją@EventHubsListener, tworząc kontener nasłuchiwania komunikatów w tle.Uruchom aplikację. Wyświetlane są dzienniki podobne do następującego przykładu:
Sending a message to the Event Hubs. New message received: Hello world
Korzystanie z usługi Spring Integration Azure Event Hubs
Moduł Spring Integration Azure Event Hubs zapewnia obsługę platformy Spring Integration z usługą Event Hubs.
Jeśli aplikacja Spring używa kanałów komunikatów Spring Integration, możesz kierować komunikaty między kanałami komunikatów a usługą Event Hubs przy użyciu adapterów kanałów.
Adapter kanału przychodzącego przekazuje komunikaty z centrum zdarzeń do kanału komunikatów. Adapter kanału wychodzącego publikuje komunikaty z kanału komunikatów do centrum zdarzeń.
W tym przewodniku pokazano, jak używać platformy Spring Integration Azure Event Hubs do wysyłania komunikatów do usługi Event Hubs i odbierania ich z usługi Event Hubs.
Dodawanie zależności
Aby zainstalować moduł Spring Cloud Azure Event Hubs Integration Starter, dodaj następujące zależności do pliku pom.xml:
Projekt Spring Cloud Azure Bill of Materials (BOM):
<dependencyManagement> <dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-dependencies</artifactId> <version>7.2.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>Uwaga
Jeśli używasz platformy Spring Boot 4.0.x, pamiętaj, aby ustawić
spring-cloud-azure-dependencieswersję na7.2.0.Jeśli używasz platformy Spring Boot 3.5.x, pamiętaj, aby ustawić
spring-cloud-azure-dependencieswersję na6.2.0.Jeśli używasz platformy Spring Boot 3.1.x-3.5.x, pamiętaj, aby ustawić
spring-cloud-azure-dependencieswersję na5.25.0.Jeśli używasz środowiska Spring Boot 2.x, ustaw wersję
spring-cloud-azure-dependenciesna4.20.0.Tę listę materiałów (BOM) należy skonfigurować w sekcji
<dependencyManagement>pliku pom.xml. Dzięki temu wszystkie zależności usługi Spring Cloud Azure korzystają z tej samej wersji.Aby uzyskać więcej informacji na temat wersji używanej w tym BOM, zobacz Którą wersję Spring Cloud Azure powinienem użyć?.
Artefakt integracji platformy Spring Cloud Azure Event Hubs:
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId> </dependency>
Kod aplikacji do wysyłania i odbierania komunikatów
Aby wysyłać komunikaty do usługi Event Hubs i odbierać je z usługi Event Hubs, skonfiguruj aplikację, wykonując następujące czynności:
Użyj następujących ustawień właściwości, aby skonfigurować przestrzeń nazw usługi Event Hubs i obiekt blob usługi Storage:
spring.cloud.azure.eventhubs.namespace=<your event-hubs-namespace> spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name=<your-storage-account-name> spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name=<your-storage-account-container-name>Utwórz nową klasę
MessageReceiveConfigurationJava, jak pokazano w poniższym przykładzie. Ta klasa służy do definiowania odbiornika komunikatów. Pamiętaj, aby zastąpić<your-event-hub-name>symbol zastępczy wartością rzeczywistą.import com.azure.spring.integration.eventhubs.inbound.EventHubsInboundChannelAdapter; import com.azure.spring.messaging.eventhubs.core.EventHubsProcessorFactory; import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointConfig; import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointMode; import com.azure.spring.messaging.eventhubs.core.listener.EventHubsMessageListenerContainer; import com.azure.spring.messaging.eventhubs.core.properties.EventHubsContainerProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.messaging.MessageChannel; @Configuration public class MessageReceiveConfiguration { private static final String INPUT_CHANNEL = "input"; private static final String EVENT_HUB_NAME = "<your-event-hub-name>"; private static final String CONSUMER_GROUP = "$Default"; private static final Logger LOGGER = LoggerFactory.getLogger(MessageReceiveConfiguration.class); @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload) { String message = new String(payload); LOGGER.info("New message received: {}", message); } @Bean public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) { EventHubsContainerProperties containerProperties = new EventHubsContainerProperties(); containerProperties.setEventHubName(EVENT_HUB_NAME); containerProperties.setConsumerGroup(CONSUMER_GROUP); containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL)); return new EventHubsMessageListenerContainer(processorFactory, containerProperties); } @Bean public EventHubsInboundChannelAdapter messageChannelAdapter(@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, EventHubsMessageListenerContainer listenerContainer) { EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(listenerContainer); adapter.setOutputChannel(inputChannel); return adapter; } @Bean public MessageChannel input() { return new DirectChannel(); } }Utwórz nową klasę
MessageSendConfigurationJava, jak pokazano w poniższym przykładzie. Ta klasa służy do definiowania nadawcy komunikatów. Pamiętaj, aby zastąpić<your-event-hub-name>symbol zastępczy wartością rzeczywistą.import com.azure.spring.integration.core.handler.DefaultMessageHandler; import com.azure.spring.messaging.eventhubs.core.EventHubsTemplate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.messaging.MessageHandler; import org.springframework.util.concurrent.ListenableFutureCallback; @Configuration public class MessageSendConfiguration { private static final Logger LOGGER = LoggerFactory.getLogger(MessageSendConfiguration.class); private static final String OUTPUT_CHANNEL = "output"; private static final String EVENT_HUB_NAME = "<your-event-hub-name>"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(EVENT_HUB_NAME, eventHubsTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.error("There was an error sending the message.", ex); } }); return handler; } @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface EventHubOutboundGateway { void send(String text); } }Podłącz nadawcę i odbiorcę do wysyłania i odbierania komunikatów za pomocą platformy Spring, jak pokazano w poniższym przykładzie:
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Configuration; import org.springframework.integration.config.EnableIntegration; @SpringBootApplication @EnableIntegration @Configuration(proxyBeanMethods = false) public class EventHubIntegrationApplication { public static void main(String[] args) { ConfigurableApplicationContext applicationContext = SpringApplication.run(EventHubIntegrationApplication.class, args); MessageSendConfiguration.EventHubOutboundGateway outboundGateway = applicationContext.getBean(MessageSendConfiguration.EventHubOutboundGateway.class); outboundGateway.send("Hello World"); } }Wskazówka
Pamiętaj, aby dodać adnotację
@EnableIntegration, która umożliwia korzystanie z infrastruktury integracji spring.Uruchom aplikację. Wyświetlane są dzienniki podobne do następującego przykładu:
Message was sent successfully. New message received: Hello World
Użyj Spring Cloud Azure Stream z binderem Event Hubs
Aby wywołać interfejs API usługi Event Hubs w aplikacji Spring Cloud Stream, użyj modułu Spring Cloud Azure Event Hubs Stream Binder.
W tym przewodniku pokazano, jak używać Spring Cloud Stream Event Hubs Binder do wysyłania komunikatów do usługi Event Hubs i odbierania komunikatów z tej usługi.
Dodawanie zależności
Aby zainstalować moduł Spring Cloud Azure Event Hubs Stream Binder, dodaj następujące zależności do pliku pom.xml:
Projekt Spring Cloud Azure Bill of Materials (BOM):
<dependencyManagement> <dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-dependencies</artifactId> <version>7.2.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>Uwaga
Jeśli używasz platformy Spring Boot 4.0.x, pamiętaj, aby ustawić
spring-cloud-azure-dependencieswersję na7.2.0.Jeśli używasz platformy Spring Boot 3.5.x, pamiętaj, aby ustawić
spring-cloud-azure-dependencieswersję na6.2.0.Jeśli używasz platformy Spring Boot 3.1.x-3.5.x, pamiętaj, aby ustawić
spring-cloud-azure-dependencieswersję na5.25.0.Jeśli używasz środowiska Spring Boot 2.x, ustaw wersję
spring-cloud-azure-dependenciesna4.20.0.Tę listę materiałów (BOM) należy skonfigurować w sekcji
<dependencyManagement>pliku pom.xml. Dzięki temu wszystkie zależności usługi Spring Cloud Azure korzystają z tej samej wersji.Aby uzyskać więcej informacji na temat wersji używanej w tym BOM, zobacz Którą wersję Spring Cloud Azure powinienem użyć?.
Artefakt łącznika strumienia Spring Cloud Azure Event Hubs:
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId> </dependency>
Kod aplikacji do wysyłania i odbierania komunikatów
Aby wysyłać komunikaty do usługi Event Hubs i odbierać je z usługi Event Hubs, skonfiguruj aplikację, wykonując następujące czynności:
Użyj następujących ustawień właściwości, aby skonfigurować przestrzeń nazw usługi Event Hubs i obiekt blob usługi Storage:
spring.cloud.azure.eventhubs.namespace=<your event-hubs-namespace> spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name=<your-storage-account-name> spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name=<your-storage-account-container-name>Utwórz odbiornik komunikatów.
Aby użyć aplikacji jako komponentu odbierającego zdarzenia, skonfiguruj powiązanie wejściowe, wykonując następujące zadania:
Zadeklaruj komponent
Consumer, który definiuje logikę obsługi komunikatów. Na przykład, poniższyConsumerkomponent ma nazwęconsume:@Bean public Consumer<Message<String>> consume() { return message -> { System.out.printf("New message received: %s%n", message.getPayload()); }; }Dodaj następującą konfigurację, aby określić nazwę
Event Hubdo konsumpcji. Pamiętaj, aby zastąpić<your-event-hub-name>symbol zastępczy wartością rzeczywistą.# name for the above `Consumer` bean spring.cloud.stream.function.definition=consume spring.cloud.stream.bindings.consume-in-0.destination=<your-event-hub-name> spring.cloud.stream.bindings.consume-in-0.group=$Default spring.cloud.stream.eventhubs.bindings.consume-in-0.consumer.checkpoint.mode=MANUAL
Utwórz nadawcę wiadomości.
Aby użyć aplikacji jako źródła zdarzeń, skonfiguruj output binder, wykonując następujące zadania:
Zdefiniuj fasolę
Supplier, która definiuje, gdzie komunikaty pochodzą z aplikacji, jak pokazano w poniższym przykładzie:@Bean public Supplier<Message<String>> supply() { return () -> { System.out.println("Sending a message."); return MessageBuilder.withPayload("Hello world").build(); }; }Dodaj następującą konfigurację, aby zdefiniować nazwę
Event Hubdo wysyłania. Pamiętaj, aby zastąpić<your-event-hub-name>symbol zastępczy wartością rzeczywistą.# "consume" is added from the above step spring.cloud.stream.function.definition=consume;supply spring.cloud.stream.bindings.supply-out-0.destination=<your-event-hub-name>
Uruchom aplikację. Wyświetlane są dzienniki podobne do następującego przykładu:
Sending a message. New message received: Hello world.
Używanie platformy Spring Kafka z Azure Event Hubs
Usługa Event Hubs udostępnia punkt końcowy platformy Kafka, którego mogą używać istniejące aplikacje oparte na platformie Kafka. Takie podejście stanowi alternatywę dla uruchamiania własnego klastra platformy Kafka. Usługa Event Hubs współpracuje z wieloma istniejącymi aplikacjami platformy Kafka. Aby uzyskać więcej informacji, zobacz Event Hubs for Apache Kafka (Usługa Event Hubs dla platformy Apache Kafka).
W tym przewodniku pokazano, jak używać Azure Event Hubs i Spring Kafka wysyłać komunikaty do usługi Event Hubs i odbierać je.
Dodawanie zależności
Aby zainstalować moduły startowe Spring Cloud Azure i Spring Kafka, dodaj następujące zależności do pliku pom.xml:
Projekt Spring Cloud Azure Bill of Materials (BOM):
<dependencyManagement> <dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-dependencies</artifactId> <version>7.2.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>Uwaga
Jeśli używasz platformy Spring Boot 4.0.x, pamiętaj, aby ustawić
spring-cloud-azure-dependencieswersję na7.2.0.Jeśli używasz platformy Spring Boot 3.5.x, pamiętaj, aby ustawić
spring-cloud-azure-dependencieswersję na6.2.0.Jeśli używasz platformy Spring Boot 3.1.x-3.5.x, pamiętaj, aby ustawić
spring-cloud-azure-dependencieswersję na5.25.0.Jeśli używasz środowiska Spring Boot 2.x, ustaw wersję
spring-cloud-azure-dependenciesna4.20.0.Tę listę materiałów (BOM) należy skonfigurować w sekcji
<dependencyManagement>pliku pom.xml. Dzięki temu wszystkie zależności usługi Spring Cloud Azure korzystają z tej samej wersji.Aby uzyskać więcej informacji na temat wersji używanej w tym BOM, zobacz Którą wersję Spring Cloud Azure powinienem użyć?.
Szablon startowy Spring Cloud Azure i artefakt Spring Kafka:
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
Kod aplikacji do wysyłania i odbierania komunikatów
Aby wysyłać komunikaty do usługi Event Hubs i odbierać je z usługi Event Hubs, skonfiguruj aplikację, wykonując następujące czynności:
Użyj następującego ustawienia właściwości, aby skonfigurować przestrzeń nazw usługi Event Hubs:
spring.kafka.bootstrap-servers=<your event-hubs-namespace>.servicebus.windows.net:9093Służy
KafkaTemplatedo wysyłania komunikatów i@KafkaListenerodbierania komunikatów, jak pokazano w poniższym przykładzie. Pamiętaj, aby zastąpić<your-event-hub-name>symbol zastępczy wartością rzeczywistą.import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; @SpringBootApplication public class EventHubKafkaApplication implements CommandLineRunner { private static final Logger LOGGER = LoggerFactory.getLogger(EventHubKafkaApplication.class); private static final String EVENT_HUB_NAME = "<your-event-hub-name>"; private static final String CONSUMER_GROUP = "$Default"; private final KafkaTemplate<String, String> kafkaTemplate; public EventHubKafkaApplication(KafkaTemplate<String, String> kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } public static void main(String[] args) { SpringApplication.run(EventHubKafkaApplication.class, args); } @Override public void run(String... args) { kafkaTemplate.send(EVENT_HUB_NAME, "Hello World"); LOGGER.info("Message was sent successfully."); } @KafkaListener(topics = EVENT_HUB_NAME, groupId = CONSUMER_GROUP) public void receive(String message) { LOGGER.info("New message received: {}", message); } }Uruchom aplikację. Wyświetlane są dzienniki podobne do następującego przykładu:
Message was sent successfully. New message received: Hello world
Używanie Spring Cloud Stream Kafka Binder z Azure Event Hubs
Spring Cloud Stream to struktura, która umożliwia deweloperom aplikacji pisanie mikrousług opartych na komunikatach. Łącze między systemem obsługi komunikatów a Spring Cloud Stream odbywa się za pośrednictwem abstrakcji powiązań. Złącza istnieją dla kilku systemów obsługi komunikatów, ale jednym z najczęściej używanych złączy jest to dla Apache Kafka.
W tym przewodniku pokazano, jak używać Azure Event Hubs i Spring Cloud Stream Binder do wysyłania komunikatów i odbierania komunikatów z usługi Event Hubs.
Dodawanie zależności
Aby zainstalować moduły spring cloud Azure starter i Spring Cloud Stream Binder Kafka, dodaj następujące zależności do pliku pom.xml:
Projekt Spring Cloud Azure Bill of Materials (BOM):
<dependencyManagement> <dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-dependencies</artifactId> <version>7.2.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>Uwaga
Jeśli używasz platformy Spring Boot 4.0.x, pamiętaj, aby ustawić
spring-cloud-azure-dependencieswersję na7.2.0.Jeśli używasz platformy Spring Boot 3.5.x, pamiętaj, aby ustawić
spring-cloud-azure-dependencieswersję na6.2.0.Jeśli używasz platformy Spring Boot 3.1.x-3.5.x, pamiętaj, aby ustawić
spring-cloud-azure-dependencieswersję na5.25.0.Jeśli używasz środowiska Spring Boot 2.x, ustaw wersję
spring-cloud-azure-dependenciesna4.20.0.Tę listę materiałów (BOM) należy skonfigurować w sekcji
<dependencyManagement>pliku pom.xml. Dzięki temu wszystkie zależności usługi Spring Cloud Azure korzystają z tej samej wersji.Aby uzyskać więcej informacji na temat wersji używanej w tym BOM, zobacz Którą wersję Spring Cloud Azure powinienem użyć?.
Artefakt startowy Spring Cloud Azure:
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency>
Kod aplikacji do wysyłania i odbierania komunikatów
Aby wysyłać komunikaty do usługi Event Hubs i odbierać je z usługi Event Hubs, skonfiguruj aplikację, wykonując następujące czynności:
Użyj następującego ustawienia właściwości, aby skonfigurować brokera platformy Kafka:
spring.cloud.stream.kafka.binder.brokers=<your event-hubs-namespace>.servicebus.windows.net:9093Utwórz odbiornik komunikatów.
Aby użyć aplikacji jako komponentu odbierającego zdarzenia, skonfiguruj powiązanie wejściowe, wykonując następujące zadania:
Zadeklaruj komponent
Consumer, który definiuje logikę obsługi komunikatów. Na przykład, poniższyConsumerkomponent ma nazwęconsume:@Bean public Consumer<Message<String>> consume() { return message -> { System.out.printf("New message received: %s%n", message.getPayload()); }; }Dodaj następującą konfigurację, aby określić nazwę
Event Hubdo konsumpcji. Pamiętaj, aby zastąpić<your-event-hub-name>symbol zastępczy wartością rzeczywistą.# name for the above `Consumer` bean spring.cloud.stream.function.definition=consume spring.cloud.stream.bindings.consume-in-0.destination=<your-event-hub-name> spring.cloud.stream.bindings.consume-in-0.group=$Default
Utwórz nadawcę wiadomości.
Aby użyć aplikacji jako źródła zdarzeń, skonfiguruj output binder, wykonując następujące zadania:
Zdefiniuj fasolę
Supplier, która definiuje, gdzie komunikaty pochodzą z aplikacji, jak pokazano w poniższym przykładzie:@Bean public Supplier<Message<String>> supply() { return () -> { System.out.println("Sending a message."); return MessageBuilder.withPayload("Hello world").build(); }; }Dodaj następującą konfigurację, aby zdefiniować nazwę
Event Hubdo wysyłania. Pamiętaj, aby zastąpić<your-event-hub-name>symbol zastępczy wartością rzeczywistą.# "consume" is added from the above step spring.cloud.stream.function.definition=consume;supply spring.cloud.stream.bindings.supply-out-0.destination=<your-event-hub-name>
Uruchom aplikację. Wyświetlane są dzienniki podobne do następującego przykładu:
Sending a message. New message received: Hello world.
Wdrażanie do Azure Spring Apps
Teraz, gdy aplikacja Spring Boot działa lokalnie, nadszedł czas, aby przenieść ją do środowiska produkcyjnego. Azure Spring Apps ułatwia wdrażanie aplikacji Spring Boot w Azure bez żadnych zmian w kodzie. Usługa zarządza infrastrukturą aplikacji Spring, aby deweloperzy mogli skupić się na swoim kodzie. Azure Spring Apps zapewnia zarządzanie cyklem życia przy użyciu kompleksowego monitorowania i diagnostyki, zarządzania konfiguracją, odnajdywania usług, integracji CI/CD, wdrożeń blue-green i nie tylko. Aby wdrożyć aplikację do Azure Spring Apps, zapoznaj się z Wdrożenie pierwszej aplikacji do Azure Spring Apps.
Następne kroki
Zobacz też
Aby uzyskać więcej informacji na temat innych starterów Spring Boot dostępnych dla Microsoft Azure, zobacz Co to jest Spring Cloud Azure?