Примечание
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
В этой статье показано, как использовать Центры событий Azure в приложениях Java, созданных с помощью Spring Framework.
Центры событий Azure — это платформа потоковой передачи больших данных и служба приема событий. Она может получать и обрабатывать миллионы событий в секунду. Данные, отправляемые в концентратор событий, можно преобразовывать и сохранять с помощью любого поставщика аналитики в реальном времени, а также с помощью адаптеров пакетной обработки или хранения.
Spring Cloud Azure предоставляет различные модули для отправки сообщений и получения сообщений из Центров событий с помощью платформ Spring.
Для разных вариантов использования можно использовать следующие модули независимо или объединить их.
Spring Cloud Azure Event Hubs Starter позволяет отправлять и получать сообщения, используя клиентскую библиотеку Java SDK для Event Hubs с функциональностью Spring Boot.
Spring Messaging Центры событий Azure позволяет взаимодействовать с Центрами событий через API Spring Messaging.
Azure Event Hubs для Spring Integration позволяет подключать каналы сообщений Spring Integration к центрам событий.
Биндер для Spring Cloud Azure Stream Event Hubs позволяет использовать Центры событий в качестве промежуточного слоя обмена сообщениями в приложениях Spring Cloud Stream.
Spring Kafka с Центрами событий Azure позволяет использовать Spring Kafka для отправки и получения сообщений через Центры событий.
Spring Cloud Stream Kafka Binder с центрами событий Azure позволяет отправлять и получать сообщения через Spring Cloud Stream Kafka Binder с Центрами событий Azure.
Предварительные условия
Подписка Azure — создайте бесплатную учетную запись.
Пакет средств разработки Java (JDK) версии 8 или более поздней.
Экземпляр шины событий Azure. Для получения дополнительной информации см. раздел «Быстрое начало: создание концентратора событий с использованием портала Azure».
Учетная запись хранилища Azure для контрольных точек Event Hubs. Дополнительные сведения см. в разделе Создание учетной записи хранения.
Приложение Spring Boot. Если у вас его нет, создайте проект Maven с помощью Spring Initializr. Не забудьте выбрать Проект Maven и в разделе "Зависимости" добавьте зависимость Spring Web , а затем выберите Java версии 8 или более поздней.
Примечание.
Чтобы предоставить вашей учетной записи доступ к ресурсам в Центрах событий Azure, назначьте роли Azure Event Hubs Data Receiver
и Azure Event Hubs Data Sender
учетной записи Microsoft Entra, которую вы используете. Затем в учетной записи Azure Storage назначьте учетной записи Microsoft Entra используемую вами роль Storage Blob Data Contributor
. Дополнительные сведения о предоставлении ролей доступа см. в статьях Назначение ролей Azure с помощью портала Azure и Авторизация доступа к ресурсам Центров событий с помощью Microsoft Entra ID.
Внимание
Для выполнения действий, описанных в этом руководстве, требуется Spring Boot версии 2.5 или более поздней.
Подготовьте локальную среду
В этом руководстве конфигурации и код не имеют никаких операций проверки подлинности. Однако для подключения к службе Azure требуется проверка подлинности. Чтобы завершить проверку подлинности, необходимо использовать клиентская библиотека удостоверений Azure. Spring Cloud Azure использует DefaultAzureCredential
библиотеку Azure Identity, которая помогает получить учетные данные без изменения кода.
DefaultAzureCredential
поддерживает несколько методов проверки подлинности и определяет, какой метод следует использовать во время выполнения. Этот подход позволяет приложению использовать различные методы проверки подлинности в разных средах, таких как локальные или рабочие среды, без реализации кода для конкретной среды. Дополнительные сведения см. в разделе DefaultAzureCredential документации Аутентификация Java-приложений, размещенных в Azure.
Чтобы использовать Azure CLI, IntelliJ или другие методы для выполнения проверки подлинности в локальных средах разработки, ознакомьтесь с проверкой подлинности Azure в средах разработки Java. Чтобы завершить проверку подлинности в средах размещения Azure, рекомендуется использовать управляемое удостоверение. Для получения более подробной информации см. Что такое управляемые удостоверения для ресурсов Azure?
Использование Spring Cloud Azure Event Hubs Starter
Модуль Spring Cloud Azure Event Hubs Starter импортирует библиотеку клиента Java для Центров событий в рамках фреймворка Spring Boot. Вы можете использовать вместе Spring Cloud Azure и Azure SDK в неограниченном режиме. Таким образом, вы можете продолжать использовать клиентский API Event Hubs Java в вашем приложении Spring.
Добавление зависимостей
Чтобы установить модуль Starter для Azure Spring Cloud Event Hubs, добавьте зависимости, указанные ниже, в файл pom.xml.
Спецификация компонентов Spring Cloud Azure (BOM)
<dependencyManagement> <dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-dependencies</artifactId> <version>5.22.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
Примечание.
Если вы используете Spring Boot 2.x, обязательно установите версию
spring-cloud-azure-dependencies
на4.20.0
. Этот счет материалов (BOM) должен быть настроен в<dependencyManagement>
разделе pom.xml файла. Это гарантирует, что все зависимости Spring Cloud Azure используют одну и ту же версию. Дополнительные сведения о версии, используемой для этого BOM, см. в статье "Какая версия Spring Cloud Azure должна использоваться".Артефакт Spring Cloud Azure Event Hubs:
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-starter-eventhubs</artifactId> </dependency>
Код приложения для отправки и получения сообщений
В этом руководстве описано, как использовать клиенты Java Центров событий в контексте приложения Spring. Ниже приведены два варианта.
- Используйте автоматическую настройку Spring Boot и используйте встроенные клиенты из контекста Spring (рекомендуется).
- Создайте клиент программным способом.
Способ автоматического связывания клиентских компонентов из контейнера Spring IoC имеет следующие преимущества, которые могут обеспечить более гибкий и эффективный опыт при разработке с клиентами Event Hubs:
- Она применяет внешнюю конфигурацию , чтобы работать с одинаковым кодом приложения в разных средах.
- Вы можете делегировать фреймворку Spring Boot процесс использования шаблона построителя и регистрации этого клиента в контексте приложения. Делегирование позволяет вам сосредоточиться на том, как использовать клиентов в соответствии с вашими собственными бизнес-требованиями.
- С помощью индикатора работоспособности можно легко проверить состояние и работоспособность приложения и внутренних компонентов.
В следующих разделах приведены примеры кода, показывающие использование EventProcessorClient
и EventHubProducerClient
с двумя альтернативами.
Примечание.
Пакет SDK Java для Центров событий Azure предоставляет несколько клиентов для взаимодействия с Центрами событий. Стартер также предоставляет автоконфигурацию для всех клиентов Event Hubs, а также билдеров клиентов. В этой статье используются только EventProcessorClient
и EventHubProducerClient
в качестве примеров.
Использование автоматической настройки Spring Boot
Чтобы отправлять сообщения и получать сообщения из Центров событий, настройте приложение, выполнив следующие действия.
Используйте следующие настройки свойств, чтобы настроить пространство имен Event Hubs и имя узла событий.
spring.cloud.azure.eventhubs.namespace=<your event-hubs-namespace> spring.cloud.azure.eventhubs.event-hub-name=<your-event-hub-name> spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name=<your-storage-account-name> spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name=<your-storage-account-container-name> spring.cloud.azure.eventhubs.processor.consumer-group=$Default
Создайте класс
EventHubProcessorClientConfiguration
Java, как показано в следующем примере. Этот класс используется для регистрации обработчика сообщений и ошибокEventProcessorClient
.import com.azure.spring.cloud.service.eventhubs.consumer.EventHubsErrorHandler; import com.azure.spring.cloud.service.eventhubs.consumer.EventHubsRecordMessageListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class EventHubProcessorClientConfiguration { private static final Logger LOGGER = LoggerFactory.getLogger(EventHubProcessorClientConfiguration.class); @Bean EventHubsRecordMessageListener processEvent() { return eventContext->LOGGER.info("Processing event from partition {} with sequence number {} with body: {}", eventContext.getPartitionContext().getPartitionId(), eventContext.getEventData().getSequenceNumber(), eventContext.getEventData().getBodyAsString()); } @Bean EventHubsErrorHandler processError() { return errorContext->LOGGER.info("Error occurred in partition processor for partition {}, {}", errorContext.getPartitionContext().getPartitionId(), errorContext.getThrowable()); } }
Вставьте
EventProcessorClient
иEventHubProducerClient
в ваше приложение Spring и вызовите связанные API для отправки и получения сообщений, как показано в следующем примере:import com.azure.messaging.eventhubs.EventData; import com.azure.messaging.eventhubs.EventHubProducerClient; import com.azure.messaging.eventhubs.EventProcessorClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import java.util.Collections; import java.util.concurrent.TimeUnit; @SpringBootApplication public class EventHubClientApplication implements CommandLineRunner { private static final Logger LOGGER = LoggerFactory.getLogger(EventHubClientApplication.class); private final EventHubProducerClient eventHubProducerClient; private final EventProcessorClient eventProcessorClient; public EventHubClientApplication(EventHubProducerClient eventHubProducerClient, EventProcessorClient eventProcessorClient) { this.eventHubProducerClient = eventHubProducerClient; this.eventProcessorClient = eventProcessorClient; } public static void main(String[] args) { SpringApplication.run(EventHubClientApplication.class, args); } @Override public void run(String... args) throws Exception { eventProcessorClient.start(); // Wait for the processor client to be ready TimeUnit.SECONDS.sleep(10); eventHubProducerClient.send(Collections.singletonList(new EventData("Hello World"))); LOGGER.info("Successfully sent a message to Event Hubs."); eventHubProducerClient.close(); LOGGER.info("Skip stopping and closing the processor since the processor may not complete the receiving process yet."); } }
Запустите приложение. Вы видите журналы, аналогичные следующему примеру:
Successfully sent a message to Event Hubs. ... Processing event from partition 0 with sequence number 0 with body: Hello World ... Stopping and closing the processor.
Создание клиента программным способом
Вы можете создать клиентские бины самостоятельно, но процесс сложный. В приложениях Spring Boot необходимо управлять свойствами, изучать шаблон построителя и регистрировать клиент в контексте приложения Spring. Ниже показано, как это сделать:
Создайте класс
EventHubClientConfiguration
Java, как показано в следующем примере. Этот класс используется для объявленияEventProcessorClient
иEventHubProducerClient
бобов. Обязательно замените заполнители<your event-hubs-namespace>
,<your-event-hub-name>
,<your-storage-account-name>
и<your-storage-account-container-name>
вашими фактическими значениями.import com.azure.identity.DefaultAzureCredentialBuilder; import com.azure.messaging.eventhubs.EventHubClientBuilder; import com.azure.messaging.eventhubs.EventHubProducerClient; import com.azure.messaging.eventhubs.EventProcessorClient; import com.azure.messaging.eventhubs.EventProcessorClientBuilder; import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore; import com.azure.messaging.eventhubs.models.ErrorContext; import com.azure.messaging.eventhubs.models.EventContext; import com.azure.storage.blob.BlobContainerAsyncClient; import com.azure.storage.blob.BlobContainerClientBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class EventHubClientConfiguration { private static final Logger LOGGER = LoggerFactory.getLogger(EventHubClientConfiguration.class); private static final String EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "<your event-hubs-namespace>.servicebus.windows.net"; private static final String EVENT_HUB_NAME = "<your-event-hub-name>"; private static final String CONSUMER_GROUP = "$Default"; private static final String STORAGE_ACCOUNT_ENDPOINT = "https://<your-storage-account-name>.blob.core.windows.net"; private static final String STORAGE_CONTAINER_NAME = "<your-storage-account-container-name>"; @Bean EventHubClientBuilder eventHubClientBuilder() { return new EventHubClientBuilder().credential(EVENT_HUB_FULLY_QUALIFIED_NAMESPACE, EVENT_HUB_NAME, new DefaultAzureCredentialBuilder() .build()); } @Bean BlobContainerClientBuilder blobContainerClientBuilder() { return new BlobContainerClientBuilder().credential(new DefaultAzureCredentialBuilder() .build()) .endpoint(STORAGE_ACCOUNT_ENDPOINT) .containerName(STORAGE_CONTAINER_NAME); } @Bean BlobContainerAsyncClient blobContainerAsyncClient(BlobContainerClientBuilder blobContainerClientBuilder) { return blobContainerClientBuilder.buildAsyncClient(); } @Bean EventProcessorClientBuilder eventProcessorClientBuilder(BlobContainerAsyncClient blobContainerAsyncClient) { return new EventProcessorClientBuilder().credential(EVENT_HUB_FULLY_QUALIFIED_NAMESPACE, EVENT_HUB_NAME, new DefaultAzureCredentialBuilder() .build()) .consumerGroup(CONSUMER_GROUP) .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient)) .processEvent(EventHubClientConfiguration::processEvent) .processError(EventHubClientConfiguration::processError); } @Bean EventHubProducerClient eventHubProducerClient(EventHubClientBuilder eventHubClientBuilder) { return eventHubClientBuilder.buildProducerClient(); } @Bean EventProcessorClient eventProcessorClient(EventProcessorClientBuilder eventProcessorClientBuilder) { return eventProcessorClientBuilder.buildEventProcessorClient(); } public static void processEvent(EventContext eventContext) { LOGGER.info("Processing event from partition {} with sequence number {} with body: {}", eventContext.getPartitionContext().getPartitionId(), eventContext.getEventData().getSequenceNumber(), eventContext.getEventData().getBodyAsString()); } public static void processError(ErrorContext errorContext) { LOGGER.info("Error occurred in partition processor for partition {}, {}", errorContext.getPartitionContext().getPartitionId(), errorContext.getThrowable()); } }
Внедрите
EventProcessorClient
иEventHubProducerClient
в приложение Spring, как показано в следующем примере:import com.azure.messaging.eventhubs.EventData; import com.azure.messaging.eventhubs.EventHubProducerClient; import com.azure.messaging.eventhubs.EventProcessorClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import java.util.Collections; import java.util.concurrent.TimeUnit; @SpringBootApplication public class EventHubClientApplication implements CommandLineRunner { private static final Logger LOGGER = LoggerFactory.getLogger(EventHubClientApplication.class); private final EventHubProducerClient eventHubProducerClient; private final EventProcessorClient eventProcessorClient; public EventHubClientApplication(EventHubProducerClient eventHubProducerClient, EventProcessorClient eventProcessorClient) { this.eventHubProducerClient = eventHubProducerClient; this.eventProcessorClient = eventProcessorClient; } public static void main(String[] args) { SpringApplication.run(EventHubClientApplication.class, args); } @Override public void run(String... args) throws Exception { eventProcessorClient.start(); // Wait for the processor client to be ready TimeUnit.SECONDS.sleep(10); eventHubProducerClient.send(Collections.singletonList(new EventData("Hello World"))); LOGGER.info("Successfully sent a message to Event Hubs."); eventHubProducerClient.close(); LOGGER.info("Stopping and closing the processor"); eventProcessorClient.stop(); } }
Запустите приложение. Вы видите журналы, аналогичные следующему примеру:
Successfully sent a message to Event Hubs. ... Processing event from partition 0 with sequence number 0 with body: Hello World ... Stopping and closing the processor.
В следующем списке показаны некоторые причины, по которым этот код не является гибким или изящным:
- Пространство имен Центров событий и имя концентратора событий жестко закодированы.
- Если вы используете
@Value
для получения конфигураций из среды Spring, в файле application.properties невозможно указать IDE. - Если у вас есть сценарий микросервисов, вам придется дублировать код в каждом проекте, и это легко может привести к ошибкам и затруднить поддержание согласованности.
К счастью, вам не нужно самостоятельно создавать клиентские beans с Spring Cloud Azure. Вместо этого можно напрямую внедрить их и использовать свойства конфигурации, с которыми вы уже знакомы для настройки очереди хранилища. Дополнительные сведения см. в разделе "Конфигурация Azure Spring Cloud".
Azure Spring Cloud также предоставляет следующие глобальные конфигурации для различных сценариев. Для получения дополнительной информации см. раздел "Глобальная конфигурация для SDK служб Azure" в конфигурации Spring Cloud Azure.
- Параметры прокси-сервера.
- Опции повтора.
- Опции транспортного клиента AMQP.
Вы также можете подключиться к разным облакам Azure. Дополнительные сведения см. в статье "Подключение к разным облакам Azure".
Использовать Spring Messaging с Event Hubs в Azure
Модуль Spring Messaging Центры событий Azure предоставляет поддержку платформы Spring Messaging с центрами событий.
Если вы используете Spring Messaging Центры событий Azure, можно использовать следующие функции:
-
EventHubsTemplate
: асинхронно и синхронно отправлять сообщения в Центры событий. -
@EventHubsListener
: Маркируйте метод, чтобы он был целевым слушателем сообщений в Центрах событий на объекте назначения.
В этом руководстве показано, как использовать Spring Messaging Центры событий Azure для отправки сообщений и получения сообщений из Центров событий.
Добавление зависимостей
Чтобы установить модуль Spring Messaging Центры событий Azure, добавьте следующие зависимости в файл pom.xml:
Спецификация компонентов Spring Cloud Azure (BOM)
<dependencyManagement> <dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-dependencies</artifactId> <version>5.22.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
Примечание.
Если вы используете Spring Boot 2.x, обязательно установите версию
spring-cloud-azure-dependencies
на4.20.0
. Этот счет материалов (BOM) должен быть настроен в<dependencyManagement>
разделе pom.xml файла. Это гарантирует, что все зависимости Spring Cloud Azure используют одну и ту же версию. Дополнительные сведения о версии, используемой для этого BOM, см. в статье "Какая версия Spring Cloud Azure должна использоваться".Начальный элемент Spring Cloud Azure, Spring Messaging для Центров событий и компоненты хранилища контрольных точек для Azure Event Hubs:
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-starter</artifactId> </dependency> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-messaging-azure-eventhubs</artifactId> </dependency> <dependency> <groupId>com.azure</groupId> <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId> </dependency>
Код приложения для отправки и получения сообщений
Чтобы отправлять сообщения и получать сообщения из Центров событий, настройте приложение, выполнив следующие действия.
Используйте следующие параметры свойства, чтобы настроить пространство имен Event Hubs и хранилище Blob.
spring.cloud.azure.eventhubs.namespace=<your event-hubs-namespace> spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name=<your-storage-account-name> spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name=<your-storage-account-container-name>
Создайте класс
ConsumerService
Java, как показано в следующем примере. Этот класс используется для определения приемника сообщений. Обязательно замените плейсхолдер<your-event-hub-name>
на фактическое значение.import com.azure.spring.messaging.eventhubs.implementation.core.annotation.EventHubsListener; import org.springframework.stereotype.Service; @Service public class ConsumerService { private static final String EVENT_HUB_NAME = "<your-event-hub-name>"; private static final String CONSUMER_GROUP = "$DEFAULT"; @EventHubsListener(destination = EVENT_HUB_NAME, group = CONSUMER_GROUP) public void handleMessageFromEventHub(String message) { System.out.printf("New message received: %s%n", message); } }
Подключите отправителя и получателя для отправки и получения сообщений с помощью Spring, как показано в следующем примере. Обязательно замените плейсхолдер
<your-event-hub-name>
на фактическое значение.import com.azure.spring.messaging.eventhubs.core.EventHubsTemplate; import com.azure.spring.messaging.implementation.annotation.EnableAzureMessaging; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.messaging.support.MessageBuilder; @SpringBootApplication @EnableAzureMessaging public class EventHubMessagingApplication { private static final String EVENT_HUB_NAME = "<your-event-hub-name>"; private static final Logger LOGGER = LoggerFactory.getLogger(EventHubMessagingApplication.class); public static void main(String[] args) { ConfigurableApplicationContext applicationContext = SpringApplication.run(EventHubMessagingApplication.class); EventHubsTemplate eventHubsTemplate = applicationContext.getBean(EventHubsTemplate.class); LOGGER.info("Sending a message to the Event Hubs."); eventHubsTemplate.sendAsync(EVENT_HUB_NAME, MessageBuilder.withPayload("Hello world").build()).subscribe(); } }
Совет
Не забудьте добавить заметку
@EnableAzureMessaging
, которая активирует обнаружение методов, аннотированных с помощью@EventHubsListener
, создавая контейнер прослушивателя сообщений в фоновом режиме.Запустите приложение. Вы видите журналы, аналогичные следующему примеру:
Sending a message to the Event Hubs. New message received: Hello world
Использование Spring Integration с Event Hubs Azure
Модуль Spring Integration Центры событий Azure предоставляет поддержку платформы Spring Integration с Центрами событий.
Если приложение Spring использует каналы сообщений Spring Integration, вы можете направлять сообщения между каналами сообщений и Центрами событий с помощью адаптеров каналов.
Адаптер входящего канала перенаправит сообщения из концентратора событий в канал сообщений. Адаптер исходящего канала публикует сообщения из канала сообщений в концентратор событий.
В этом руководстве показано, как использовать Spring Integration Центры событий Azure для отправки сообщений и получения сообщений из Центров событий.
Добавление зависимостей
Чтобы установить модуль интеграции Spring Cloud Azure Event Hubs Starter, добавьте в файл pom.xml, следующие зависимости:
Спецификация компонентов Spring Cloud Azure (BOM)
<dependencyManagement> <dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-dependencies</artifactId> <version>5.22.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
Примечание.
Если вы используете Spring Boot 2.x, обязательно установите версию
spring-cloud-azure-dependencies
на4.20.0
. Этот счет материалов (BOM) должен быть настроен в<dependencyManagement>
разделе pom.xml файла. Это гарантирует, что все зависимости Spring Cloud Azure используют одну и ту же версию. Дополнительные сведения о версии, используемой для этого BOM, см. в статье "Какая версия Spring Cloud Azure должна использоваться".Компонент интеграции Spring Cloud с Event Hubs Azure:
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId> </dependency>
Код приложения для отправки и получения сообщений
Чтобы отправлять сообщения и получать сообщения из Центров событий, настройте приложение, выполнив следующие действия.
Используйте следующие параметры свойства, чтобы настроить пространство имен Event Hubs и хранилище Blob.
spring.cloud.azure.eventhubs.namespace=<your event-hubs-namespace> spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name=<your-storage-account-name> spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name=<your-storage-account-container-name>
Создайте класс
MessageReceiveConfiguration
Java, как показано в следующем примере. Этот класс используется для определения приемника сообщений. Обязательно замените плейсхолдер<your-event-hub-name>
на фактическое значение.import com.azure.spring.integration.eventhubs.inbound.EventHubsInboundChannelAdapter; import com.azure.spring.messaging.eventhubs.core.EventHubsProcessorFactory; import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointConfig; import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointMode; import com.azure.spring.messaging.eventhubs.core.listener.EventHubsMessageListenerContainer; import com.azure.spring.messaging.eventhubs.core.properties.EventHubsContainerProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.messaging.MessageChannel; @Configuration public class MessageReceiveConfiguration { private static final String INPUT_CHANNEL = "input"; private static final String EVENT_HUB_NAME = "<your-event-hub-name>"; private static final String CONSUMER_GROUP = "$Default"; private static final Logger LOGGER = LoggerFactory.getLogger(MessageReceiveConfiguration.class); @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload) { String message = new String(payload); LOGGER.info("New message received: {}", message); } @Bean public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) { EventHubsContainerProperties containerProperties = new EventHubsContainerProperties(); containerProperties.setEventHubName(EVENT_HUB_NAME); containerProperties.setConsumerGroup(CONSUMER_GROUP); containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL)); return new EventHubsMessageListenerContainer(processorFactory, containerProperties); } @Bean public EventHubsInboundChannelAdapter messageChannelAdapter(@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, EventHubsMessageListenerContainer listenerContainer) { EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(listenerContainer); adapter.setOutputChannel(inputChannel); return adapter; } @Bean public MessageChannel input() { return new DirectChannel(); } }
Создайте класс
MessageSendConfiguration
Java, как показано в следующем примере. Этот класс используется для определения отправителя сообщения. Обязательно замените плейсхолдер<your-event-hub-name>
на фактическое значение.import com.azure.spring.integration.core.handler.DefaultMessageHandler; import com.azure.spring.messaging.eventhubs.core.EventHubsTemplate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.messaging.MessageHandler; import org.springframework.util.concurrent.ListenableFutureCallback; @Configuration public class MessageSendConfiguration { private static final Logger LOGGER = LoggerFactory.getLogger(MessageSendConfiguration.class); private static final String OUTPUT_CHANNEL = "output"; private static final String EVENT_HUB_NAME = "<your-event-hub-name>"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(EVENT_HUB_NAME, eventHubsTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.error("There was an error sending the message.", ex); } }); return handler; } @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface EventHubOutboundGateway { void send(String text); } }
Подключите отправителя и получателя для отправки и получения сообщений с помощью Spring, как показано в следующем примере:
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Configuration; import org.springframework.integration.config.EnableIntegration; @SpringBootApplication @EnableIntegration @Configuration(proxyBeanMethods = false) public class EventHubIntegrationApplication { public static void main(String[] args) { ConfigurableApplicationContext applicationContext = SpringApplication.run(EventHubIntegrationApplication.class, args); MessageSendConfiguration.EventHubOutboundGateway outboundGateway = applicationContext.getBean(MessageSendConfiguration.EventHubOutboundGateway.class); outboundGateway.send("Hello World"); } }
Совет
Не забудьте добавить аннотацию
@EnableIntegration
, которая включает инфраструктуру Spring Integration.Запустите приложение. Вы видите журналы, аналогичные следующему примеру:
Message was sent successfully. New message received: Hello World
Используйте привязку Spring Cloud Azure Stream для Event Hubs
Чтобы вызвать API Event Hubs в приложении Spring Cloud Stream, используйте модуль Spring Cloud Azure Event Hubs Stream Binder.
В этом руководстве показано, как использовать Привязку Центров событий Spring Cloud Stream для отправки сообщений и получения сообщений из Центров событий.
Добавление зависимостей
Для установки модуля Azure Event Hubs Stream Binder для Spring Cloud добавьте следующие зависимости в файл pom.xml:
Спецификация компонентов Spring Cloud Azure (BOM)
<dependencyManagement> <dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-dependencies</artifactId> <version>5.22.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
Примечание.
Если вы используете Spring Boot 2.x, обязательно установите версию
spring-cloud-azure-dependencies
на4.20.0
. Этот счет материалов (BOM) должен быть настроен в<dependencyManagement>
разделе pom.xml файла. Это гарантирует, что все зависимости Spring Cloud Azure используют одну и ту же версию. Дополнительные сведения о версии, используемой для этого BOM, см. в статье "Какая версия Spring Cloud Azure должна использоваться".Azure Spring Cloud артефакт преобразователя потока для Центров событий Azure:
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId> </dependency>
Код приложения для отправки и получения сообщений
Чтобы отправлять сообщения и получать сообщения из Центров событий, настройте приложение, выполнив следующие действия.
Используйте следующие параметры свойства, чтобы настроить пространство имен Event Hubs и хранилище Blob.
spring.cloud.azure.eventhubs.namespace=<your event-hubs-namespace> spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name=<your-storage-account-name> spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name=<your-storage-account-container-name>
Создайте приемник сообщений.
Чтобы использовать приложение в качестве приемника событий, настройте входной соединитель, выполнив следующие задачи:
Объявите бин, определяющий логику
Consumer
обработки сообщений. Например, следующаяConsumer
фасоль называетсяconsume
:@Bean public Consumer<Message<String>> consume() { return message -> { System.out.printf("New message received: %s%n", message.getPayload()); }; }
Добавьте следующую конфигурацию, чтобы указать
Event Hub
имя для потребления. Обязательно замените плейсхолдер<your-event-hub-name>
на фактическое значение.# name for the above `Consumer` bean spring.cloud.stream.function.definition=consume spring.cloud.stream.bindings.consume-in-0.destination=<your-event-hub-name> spring.cloud.stream.bindings.consume-in-0.group=$Default spring.cloud.stream.eventhubs.bindings.consume-in-0.consumer.checkpoint.mode=MANUAL
Создайте отправителя сообщения.
Чтобы использовать ваше приложение в качестве источника событий, настройте выходной связыватель, выполнив следующую задачу:
Определите компонент
Supplier
, который определяет, откуда приходят сообщения в вашем приложении, как показано в следующем примере:@Bean public Supplier<Message<String>> supply() { return () -> { System.out.println("Sending a message."); return MessageBuilder.withPayload("Hello world").build(); }; }
Добавьте следующую конфигурацию, чтобы указать
Event Hub
имя для отправки. Обязательно замените плейсхолдер<your-event-hub-name>
на фактическое значение.# "consume" is added from the above step spring.cloud.stream.function.definition=consume;supply spring.cloud.stream.bindings.supply-out-0.destination=<your-event-hub-name>
Запустите приложение. Вы видите журналы, аналогичные следующему примеру:
Sending a message. New message received: Hello world.
Использование Spring Kafka с Центрами событий Azure
Центры событий предоставляют конечную точку Kafka, которую могут использовать существующие приложения на основе Kafka. Этот подход предоставляет альтернативу запуску собственного кластера Kafka. Центры событий работают со многими из ваших существующих приложений Kafka. Дополнительные сведения см. в разделе Центры событий для Apache Kafka.
В этом руководстве показано, как использовать Центры событий Azure и Spring Kafka для отправки сообщений и получения сообщений из Центров событий.
Добавление зависимостей
Чтобы установить начальные модули Azure Spring Cloud и Spring Kafka, добавьте следующие зависимости в файл pom.xml :
Спецификация компонентов Spring Cloud Azure (BOM)
<dependencyManagement> <dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-dependencies</artifactId> <version>5.22.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
Примечание.
Если вы используете Spring Boot 2.x, обязательно установите версию
spring-cloud-azure-dependencies
на4.20.0
. Этот счет материалов (BOM) должен быть настроен в<dependencyManagement>
разделе pom.xml файла. Это гарантирует, что все зависимости Spring Cloud Azure используют одну и ту же версию. Дополнительные сведения о версии, используемой для этого BOM, см. в статье "Какая версия Spring Cloud Azure должна использоваться".Стартовый компонент Spring Cloud Azure и артефакт Spring Kafka:
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
Код приложения для отправки и получения сообщений
Чтобы отправлять сообщения и получать сообщения из Центров событий, настройте приложение, выполнив следующие действия.
Используйте следующий параметр, чтобы настроить пространство имен для Event Hubs:
spring.kafka.bootstrap-servers=<your event-hubs-namespace>.servicebus.windows.net:9093
Используется
KafkaTemplate
для отправки сообщений и@KafkaListener
получения сообщений, как показано в следующем примере. Обязательно замените плейсхолдер<your-event-hub-name>
на фактическое значение.import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; @SpringBootApplication public class EventHubKafkaApplication implements CommandLineRunner { private static final Logger LOGGER = LoggerFactory.getLogger(EventHubKafkaApplication.class); private static final String EVENT_HUB_NAME = "<your-event-hub-name>"; private static final String CONSUMER_GROUP = "$Default"; private final KafkaTemplate<String, String> kafkaTemplate; public EventHubKafkaApplication(KafkaTemplate<String, String> kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } public static void main(String[] args) { SpringApplication.run(EventHubKafkaApplication.class, args); } @Override public void run(String... args) { kafkaTemplate.send(EVENT_HUB_NAME, "Hello World"); LOGGER.info("Message was sent successfully."); } @KafkaListener(topics = EVENT_HUB_NAME, groupId = CONSUMER_GROUP) public void receive(String message) { LOGGER.info("New message received: {}", message); } }
Запустите приложение. Вы видите журналы, аналогичные следующему примеру:
Message was sent successfully. New message received: Hello world
Использование привязки Spring Cloud Stream Kafka Binder с Центром событий Azure
Spring Cloud Stream — это платформа, которая позволяет разработчикам приложений записывать микрослужбы на основе сообщений. Мост между системой обмена сообщениями и Spring Cloud Stream осуществляется через абстракцию биндера. Привязки существуют для нескольких систем обмена сообщениями, но один из наиболее часто используемых привязок предназначен для Apache Kafka.
В этом руководстве показано, как использовать Центры событий Azure и Spring Cloud Stream Kafka Binder для отправки сообщений и получения сообщений из Центров событий.
Добавление зависимостей
Чтобы установить начальный модуль Spring Cloud Azure и модули Kafka для привязки Spring Cloud Stream, добавьте следующие зависимости в файл pom.xml :
Спецификация компонентов Spring Cloud Azure (BOM)
<dependencyManagement> <dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-dependencies</artifactId> <version>5.22.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
Примечание.
Если вы используете Spring Boot 2.x, обязательно установите версию
spring-cloud-azure-dependencies
на4.20.0
. Этот счет материалов (BOM) должен быть настроен в<dependencyManagement>
разделе pom.xml файла. Это гарантирует, что все зависимости Spring Cloud Azure используют одну и ту же версию. Дополнительные сведения о версии, используемой для этого BOM, см. в статье "Какая версия Spring Cloud Azure должна использоваться".Начальный модуль Spring Cloud Azure
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency>
Код приложения для отправки и получения сообщений
Чтобы отправлять сообщения и получать сообщения из Центров событий, настройте приложение, выполнив следующие действия.
Используйте следующую настройку параметра, чтобы настроить брокера Kafka:
spring.cloud.stream.kafka.binder.brokers=<your event-hubs-namespace>.servicebus.windows.net:9093
Создайте приемник сообщений.
Чтобы использовать приложение в качестве приемника событий, настройте входной соединитель, выполнив следующие задачи:
Объявите бин, определяющий логику
Consumer
обработки сообщений. Например, следующаяConsumer
фасоль называетсяconsume
:@Bean public Consumer<Message<String>> consume() { return message -> { System.out.printf("New message received: %s%n", message.getPayload()); }; }
Добавьте следующую конфигурацию, чтобы указать
Event Hub
имя для потребления. Обязательно замените плейсхолдер<your-event-hub-name>
на фактическое значение.# name for the above `Consumer` bean spring.cloud.stream.function.definition=consume spring.cloud.stream.bindings.consume-in-0.destination=<your-event-hub-name> spring.cloud.stream.bindings.consume-in-0.group=$Default
Создайте отправителя сообщения.
Чтобы использовать ваше приложение в качестве источника событий, настройте выходной связыватель, выполнив следующую задачу:
Определите компонент
Supplier
, который определяет, откуда приходят сообщения в вашем приложении, как показано в следующем примере:@Bean public Supplier<Message<String>> supply() { return () -> { System.out.println("Sending a message."); return MessageBuilder.withPayload("Hello world").build(); }; }
Добавьте следующую конфигурацию, чтобы указать
Event Hub
имя для отправки. Обязательно замените плейсхолдер<your-event-hub-name>
на фактическое значение.# "consume" is added from the above step spring.cloud.stream.function.definition=consume;supply spring.cloud.stream.bindings.supply-out-0.destination=<your-event-hub-name>
Запустите приложение. Вы видите журналы, аналогичные следующему примеру:
Sending a message. New message received: Hello world.
Развертывание в Azure Spring Apps
Теперь, когда у вас есть приложение Spring Boot, работающее локально, пришло время переместить его в рабочую среду. Azure Spring Apps упрощает развертывание приложений Spring Boot в Azure без каких-либо изменений кода. Эта служба управляет инфраструктурой приложений Spring, благодаря чему разработчики могут сосредоточиться на коде. Azure Spring Apps обеспечивает управление жизненным циклом за счет комплексного мониторинга и диагностики, управления конфигурацией, обнаружения служб, интеграции CI/CD, выполнения сине-зеленых развертываний и прочего. Сведения о развертывании приложения в Azure Spring Apps см. в статье "Развертывание первого приложения в Azure Spring Apps".
Следующие шаги
См. также
Дополнительные сведения о других начальных программах Spring Boot, доступных для Microsoft Azure, см. в статье "Что такое Spring Cloud Azure?