Spring Cloud Stream с Служебная шина Azure
В этой статье показано, как использовать Spring Cloud Stream Binder для отправки сообщений в служебную шину и получения сообщений из нее queues
и topics
.
Важно!
Для выполнения действий, описанных в этой статье, требуется Spring Boot 2.5 (или более поздней версии).
Azure предоставляет платформу асинхронного обмена сообщениями, именуемую служебной шиной Azure ("служебная шина"), основанную на стандарте Advanced Message Queueing Protocol 1.0 (Расширенный протокол управления очередью сообщений "AMQP 1.0"). Служебную шину можно использовать в пределах поддерживаемых платформ Azure.
предварительные требования
Ниже перечислены предварительные требования.
Подписка Azure. Если у вас ее еще нет, вы можете активировать преимущества для подписчиков MSDN или зарегистрироваться для получения бесплатной учетной записи.
Поддерживаемый пакет средств разработки Java (JDK) версии 8 или более поздней. Дополнительные сведения о версиях JDK, доступных для разработки в Azure, см. в статье Поддержка Java в Azure и Azure Stack.
Apache Maven версии 3.2 или более поздней.
Если у вас уже есть настроенная очередь или раздел служебной шины, убедитесь, что пространство имен служебной шины соответствует следующим требованиям:
- Разрешен доступ из всех сетей
- Используется ценовая категория "Стандартный" (или выше)
- Имеется политика доступа с доступом на чтение и запись для вашей очереди и раздела
Если у вас нет настроенной очереди или раздела служебной шины, используйте портал Azure, чтобы создать очередь служебной шины или создать раздел служебной шины. Убедитесь, что пространство имен соответствует требованиям, указанным на предыдущем шаге. Кроме того, запишите строку подключения в пространстве имен так, как это требуется для тестового приложения этого руководства.
Если у вас нет приложения Spring Boot, создайте проект Maven с использованием Spring Initializr. Не забудьте выбрать Проект Maven и в разделе Зависимости добавьте веб-зависимость выберите версию Java 8 или 11 .
Важно!
Для выполнения действий, описанных в этой статье, требуется Spring Boot версии 2.5 или более поздней.
Используйте пускатель Spring Cloud Stream Binder
Найдите файл pom.xml в родительском каталоге вашего приложения; например:
C:\SpringBoot\servicebus\pom.xml
- или -
/users/example/home/servicebus/pom.xml
Затем откройте файл pom.xml в текстовом редакторе.
Добавьте следующий блок кода в элемент :
<dependencies>
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId> </dependency>
Примечание
Дополнительные сведения об управлении версиями библиотеки Spring Cloud Azure с помощью спецификации (BOM) см. в разделе Начало работы .
Сохраните и закройте файл pom.xml.
Настройка приложения для служебной шины
Настроить приложение можно на основе строки подключения или субъекта-службы. В этом руководстве используется строка подключения. Дополнительные сведения об использовании субъекта-службы см. в статье Spring Cloud Azure Stream Binder for Service Bus queue Code Sample ( Пример кода для очереди служебной шины).
Добавьте файл application.yaml в каталог resources вашего приложения, например:
C:\SpringBoot\servicebus\src\main\resources\application.yaml
-или-
/users/example/home/servicebus/src/main/resources/application.yaml
Откройте файл application.yaml в текстовом редакторе и добавьте соответствующий код в конец файла application.yaml (в зависимости от того, используете ли вы очередь или раздел Служебной шины). Используйте Таблицу описания полей, чтобы заменить образцы значений соответствующими свойствами служебной шины.
Очередь служебной шины
spring: cloud: azure: servicebus: namespace: ${AZURE_SERVICEBUS_NAMESPACE} stream: bindings: consume-in-0: destination: ${AZURE_SERVICEBUS_QUEUE_NAME} supply-out-0: destination: ${AZURE_SERVICEBUS_QUEUE_NAME} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue function: definition: consume;supply; poller: fixed-delay: 1000 initial-delay: 0
Раздел служебной шины
spring: cloud: azure: servicebus: namespace: ${AZURE_SERVICEBUS_NAMESPACE} stream: bindings: consume-in-0: destination: ${AZURE_SERVICEBUS_TOPIC_NAME} group: ${AZURE_SERVICEBUS_TOPIC_SUBSCRIPTION_NAME} supply-out-0: destination: ${AZURE_SERVICEBUS_TOPIC_NAME} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: topic function: definition: consume;supply; poller: fixed-delay: 1000 initial-delay: 0
Поле Описание spring.cloud.azure.servicebus.namespace
Укажите пространство имен, полученное в служебной шине из портал Azure. spring.cloud.stream.function.definition
Укажите, какой функциональный bean-компонент нужно привязать к внешним назначениям, предоставляемым привязками. spring.cloud.stream.poller.fixed-delay
Укажите фиксированную задержку для модуля опроса по умолчанию в миллисекундах (по умолчанию: 1000). spring.cloud.stream.poller.initial-delay
Укажите начальную задержку для периодических триггеров (по умолчанию: 0). spring.cloud.stream.bindings.consume-in-0.destination
Укажите очередь служебной шины или раздел служебной шины, использованные в этом руководстве. spring.cloud.stream.bindings.consume-in-0.group
Если вы использовали раздел служебной шины, укажите подписку на раздел. spring.cloud.stream.bindings.supply-out-0.destination
Укажите то же значение, которое использовалось для назначения ввода. spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete
Укажите, следует ли автоматически урегулировать сообщения. Если задано значение false, будет добавлен заголовок Checkpointer
сообщения , чтобы позволить разработчикам урегулировать сообщения вручную.spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type
Укажите тип сущности для выходной привязки queue
: илиtopic
.Сохраните и закройте файл application.yaml.
Реализация основных функций служебной шины
В этом разделе вы создадите классы Java, необходимые для отправки сообщений в служебную шину.
Изменение класса основного приложения
Найдите файл основного приложения Java в каталоге пакета приложения, например:
C:\SpringBoot\servicebus\src\main\java\com\example\servicebus\ServiceBusApplication.java
-или-
/users/example/home/servicebus/src/main/java/com/example/servicebus/ServiceBusApplication.java
Откройте главный файл приложения Java в текстовом редакторе.
Добавьте в него указанный ниже код.
package com.example.servicebus; import com.azure.spring.messaging.checkpoint.Checkpointer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.messaging.Message; import java.util.function.Consumer; import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER; @SpringBootApplication public class ServiceBusApplication { private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusApplication.class); public static void main(String[] args) { SpringApplication.run(ServiceBusApplication.class, args); } @Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}'", message.getPayload()); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; } }
Сохраните файл и закройте его.
Создание класса конфигурации производителя
С помощью текстового редактора создайте файл Java с именем ServiceProducerConfiguration.java в каталоге пакета приложения.
Добавьте в новый файл указанный ниже код:
package com.example.servicebus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.Message; import reactor.core.publisher.Flux; import reactor.core.publisher.Sinks; import java.util.function.Supplier; @Configuration public class ServiceProducerConfiguration { private static final Logger LOGGER = LoggerFactory.getLogger(ServiceProducerConfiguration.class); @Bean public Sinks.Many<Message<String>> many() { return Sinks.many().unicast().onBackpressureBuffer(); } @Bean public Supplier<Flux<Message<String>>> supply(Sinks.Many<Message<String>> many) { return () -> many.asFlux() .doOnNext(m -> LOGGER.info("Manually sending message {}", m)) .doOnError(t -> LOGGER.error("Error encountered", t)); } }
Сохраните и закройте файл ServiceProducerConfiguration.java.
Создание класса контроллера
С помощью текстового редактора создайте файл Java с именем ServiceProducerController.java в каталоге пакета приложения.
Добавьте в новый файл следующие строки кода:
package com.example.servicebus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.ResponseEntity; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Sinks; @RestController public class ServiceProducerController { private static final Logger LOGGER = LoggerFactory.getLogger(ServiceProducerController.class); @Autowired private Sinks.Many<Message<String>> many; @PostMapping("/messages") public ResponseEntity<String> sendMessage(@RequestParam String message) { LOGGER.info("Going to add message {} to Sinks.Many.", message); many.emitNext(MessageBuilder.withPayload(message).build(), Sinks.EmitFailureHandler.FAIL_FAST); return ResponseEntity.ok("Sent!"); } }
Сохраните и закройте файл ServiceProducerController.java.
Сборка и тестирование приложения
Откройте командную строку.
Измените каталог на расположение файла pom.xml, например:
cd C:\SpringBoot\servicebus
-или-
cd /users/example/home/servicebus
Создайте приложение Spring Boot с помощью Maven и запустите его:
mvn clean spring-boot:run
После запуска приложения вы можете использовать средство curl, чтобы протестировать приложение:
curl -X POST localhost:8080/messages?message=hello
В журнале приложения появится запись "Привет":
New message received: 'hello' Message 'hello' successfully checkpointed
Очистка ресурсов
Если они больше не нужны, используйте портал Azure, чтобы удалить ресурсы, созданные в этой статье во избежание непредвиденных расходов.