Spring Cloud Stream с Служебная шина Azure

В этой статье показано, как использовать Spring Cloud Stream Binder для отправки сообщений в служебную шину и получения сообщений из нее queues и topics.

Важно!

Для выполнения действий, описанных в этой статье, требуется Spring Boot 2.5 (или более поздней версии).

Azure предоставляет платформу асинхронного обмена сообщениями, именуемую служебной шиной Azure ("служебная шина"), основанную на стандарте Advanced Message Queueing Protocol 1.0 (Расширенный протокол управления очередью сообщений "AMQP 1.0"). Служебную шину можно использовать в пределах поддерживаемых платформ Azure.

предварительные требования

Ниже перечислены предварительные требования.

  1. Подписка Azure. Если у вас ее еще нет, вы можете активировать преимущества для подписчиков MSDN или зарегистрироваться для получения бесплатной учетной записи.

  2. Поддерживаемый пакет средств разработки Java (JDK) версии 8 или более поздней. Дополнительные сведения о версиях JDK, доступных для разработки в Azure, см. в статье Поддержка Java в Azure и Azure Stack.

  3. Apache Maven версии 3.2 или более поздней.

  4. Если у вас уже есть настроенная очередь или раздел служебной шины, убедитесь, что пространство имен служебной шины соответствует следующим требованиям:

    1. Разрешен доступ из всех сетей
    2. Используется ценовая категория "Стандартный" (или выше)
    3. Имеется политика доступа с доступом на чтение и запись для вашей очереди и раздела
  5. Если у вас нет настроенной очереди или раздела служебной шины, используйте портал Azure, чтобы создать очередь служебной шины или создать раздел служебной шины. Убедитесь, что пространство имен соответствует требованиям, указанным на предыдущем шаге. Кроме того, запишите строку подключения в пространстве имен так, как это требуется для тестового приложения этого руководства.

  6. Если у вас нет приложения Spring Boot, создайте проект Maven с использованием Spring Initializr. Не забудьте выбрать Проект Maven и в разделе Зависимости добавьте веб-зависимость выберите версию Java 8 или 11 .

Важно!

Для выполнения действий, описанных в этой статье, требуется Spring Boot версии 2.5 или более поздней.

Используйте пускатель Spring Cloud Stream Binder

  1. Найдите файл pom.xml в родительском каталоге вашего приложения; например:

    C:\SpringBoot\servicebus\pom.xml

    - или -

    /users/example/home/servicebus/pom.xml

  2. Затем откройте файл pom.xml в текстовом редакторе.

  3. Добавьте следующий блок кода в элемент :<dependencies>

    <dependency>
        <groupId>com.azure.spring</groupId>
        <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
    </dependency>
    

    Примечание

    Дополнительные сведения об управлении версиями библиотеки Spring Cloud Azure с помощью спецификации (BOM) см. в разделе Начало работы .

  4. Сохраните и закройте файл pom.xml.

Настройка приложения для служебной шины

Настроить приложение можно на основе строки подключения или субъекта-службы. В этом руководстве используется строка подключения. Дополнительные сведения об использовании субъекта-службы см. в статье Spring Cloud Azure Stream Binder for Service Bus queue Code Sample ( Пример кода для очереди служебной шины).

  1. Добавьте файл application.yaml в каталог resources вашего приложения, например:

    C:\SpringBoot\servicebus\src\main\resources\application.yaml

    -или-

    /users/example/home/servicebus/src/main/resources/application.yaml

  2. Откройте файл application.yaml в текстовом редакторе и добавьте соответствующий код в конец файла application.yaml (в зависимости от того, используете ли вы очередь или раздел Служебной шины). Используйте Таблицу описания полей, чтобы заменить образцы значений соответствующими свойствами служебной шины.

    Очередь служебной шины

    spring:
      cloud:
        azure:
          servicebus:
            namespace: ${AZURE_SERVICEBUS_NAMESPACE}
        stream:
          bindings:
            consume-in-0:
              destination: ${AZURE_SERVICEBUS_QUEUE_NAME}
            supply-out-0:
              destination: ${AZURE_SERVICEBUS_QUEUE_NAME}
          servicebus:
            bindings:
              consume-in-0:
                consumer:
                  auto-complete: false
              supply-out-0:
                producer:
                  entity-type: queue
          function:
            definition: consume;supply;
          poller:
            fixed-delay: 1000
            initial-delay: 0
    

    Раздел служебной шины

    spring:
      cloud:
        azure:
          servicebus:
            namespace: ${AZURE_SERVICEBUS_NAMESPACE}
        stream:
          bindings:
            consume-in-0:
              destination: ${AZURE_SERVICEBUS_TOPIC_NAME}
              group: ${AZURE_SERVICEBUS_TOPIC_SUBSCRIPTION_NAME}
            supply-out-0:
              destination: ${AZURE_SERVICEBUS_TOPIC_NAME}
          servicebus:
            bindings:
              consume-in-0:
                consumer:
                  auto-complete: false
              supply-out-0:
                producer:
                  entity-type: topic
          function:
            definition: consume;supply;
          poller:
            fixed-delay: 1000
            initial-delay: 0
    

    Описания полей

    Поле Описание
    spring.cloud.azure.servicebus.namespace Укажите пространство имен, полученное в служебной шине из портал Azure.
    spring.cloud.stream.function.definition Укажите, какой функциональный bean-компонент нужно привязать к внешним назначениям, предоставляемым привязками.
    spring.cloud.stream.poller.fixed-delay Укажите фиксированную задержку для модуля опроса по умолчанию в миллисекундах (по умолчанию: 1000).
    spring.cloud.stream.poller.initial-delay Укажите начальную задержку для периодических триггеров (по умолчанию: 0).
    spring.cloud.stream.bindings.consume-in-0.destination Укажите очередь служебной шины или раздел служебной шины, использованные в этом руководстве.
    spring.cloud.stream.bindings.consume-in-0.group Если вы использовали раздел служебной шины, укажите подписку на раздел.
    spring.cloud.stream.bindings.supply-out-0.destination Укажите то же значение, которое использовалось для назначения ввода.
    spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete Укажите, следует ли автоматически урегулировать сообщения. Если задано значение false, будет добавлен заголовок Checkpointer сообщения , чтобы позволить разработчикам урегулировать сообщения вручную.
    spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type Укажите тип сущности для выходной привязки queue : или topic.
  3. Сохраните и закройте файл application.yaml.

Реализация основных функций служебной шины

В этом разделе вы создадите классы Java, необходимые для отправки сообщений в служебную шину.

Изменение класса основного приложения

  1. Найдите файл основного приложения Java в каталоге пакета приложения, например:

    C:\SpringBoot\servicebus\src\main\java\com\example\servicebus\ServiceBusApplication.java

    -или-

    /users/example/home/servicebus/src/main/java/com/example/servicebus/ServiceBusApplication.java

  2. Откройте главный файл приложения Java в текстовом редакторе.

  3. Добавьте в него указанный ниже код.

    package com.example.servicebus;
    
    import com.azure.spring.messaging.checkpoint.Checkpointer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;
    
    import java.util.function.Consumer;
    
    import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER;
    
    @SpringBootApplication
    public class ServiceBusApplication {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusApplication.class);
    
        public static void main(String[] args) {
            SpringApplication.run(ServiceBusApplication.class, args);
        }
    
        @Bean
        public Consumer<Message<String>> consume() {
            return message -> {
                Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
                LOGGER.info("New message received: '{}'", message.getPayload());
                checkpointer.success()
                            .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                            .doOnError(e -> LOGGER.error("Error found", e))
                            .block();
            };
        }
    }
    
  4. Сохраните файл и закройте его.

Создание класса конфигурации производителя

  1. С помощью текстового редактора создайте файл Java с именем ServiceProducerConfiguration.java в каталоге пакета приложения.

  2. Добавьте в новый файл указанный ниже код:

    package com.example.servicebus;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.messaging.Message;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;
    
    import java.util.function.Supplier;
    
    @Configuration
    public class ServiceProducerConfiguration {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(ServiceProducerConfiguration.class);
    
        @Bean
        public Sinks.Many<Message<String>> many() {
            return Sinks.many().unicast().onBackpressureBuffer();
        }
    
        @Bean
        public Supplier<Flux<Message<String>>> supply(Sinks.Many<Message<String>> many) {
            return () -> many.asFlux()
                             .doOnNext(m -> LOGGER.info("Manually sending message {}", m))
                             .doOnError(t -> LOGGER.error("Error encountered", t));
        }
    }
    
  3. Сохраните и закройте файл ServiceProducerConfiguration.java.

Создание класса контроллера

  1. С помощью текстового редактора создайте файл Java с именем ServiceProducerController.java в каталоге пакета приложения.

  2. Добавьте в новый файл следующие строки кода:

    package com.example.servicebus;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.http.ResponseEntity;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;
    import reactor.core.publisher.Sinks;
    
    @RestController
    public class ServiceProducerController {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(ServiceProducerController.class);
    
        @Autowired
        private Sinks.Many<Message<String>> many;
    
        @PostMapping("/messages")
        public ResponseEntity<String> sendMessage(@RequestParam String message) {
            LOGGER.info("Going to add message {} to Sinks.Many.", message);
            many.emitNext(MessageBuilder.withPayload(message).build(), Sinks.EmitFailureHandler.FAIL_FAST);
            return ResponseEntity.ok("Sent!");
        }
    }
    
  3. Сохраните и закройте файл ServiceProducerController.java.

Сборка и тестирование приложения

  1. Откройте командную строку.

  2. Измените каталог на расположение файла pom.xml, например:

    cd C:\SpringBoot\servicebus

    -или-

    cd /users/example/home/servicebus

  3. Создайте приложение Spring Boot с помощью Maven и запустите его:

    mvn clean spring-boot:run
    
  4. После запуска приложения вы можете использовать средство curl, чтобы протестировать приложение:

    curl -X POST localhost:8080/messages?message=hello
    

    В журнале приложения появится запись "Привет":

    New message received: 'hello'
    Message 'hello' successfully checkpointed
    

Очистка ресурсов

Если они больше не нужны, используйте портал Azure, чтобы удалить ресурсы, созданные в этой статье во избежание непредвиденных расходов.

Дальнейшие действия