Spring Cloud Stream с Служебная шина Azure

В этой статье показано, как использовать Spring Cloud Stream Binder для отправки сообщений в служебную шину и получения сообщений из нее queues и topics.

Azure предоставляет платформу асинхронного обмена сообщениями, именуемую служебной шиной Azure ("служебная шина"), основанную на стандарте Advanced Message Queueing Protocol 1.0 (Расширенный протокол управления очередью сообщений "AMQP 1.0"). Служебную шину можно использовать в пределах поддерживаемых платформ Azure.

Необходимые компоненты

Примечание.

Чтобы предоставить учетной записи доступ к ресурсам Служебная шина Azure, назначьте Azure Service Bus Data SenderAzure Service Bus Data Receiver и роль учетной записи Microsoft Entra, которую вы используете. Дополнительные сведения о предоставлении ролей доступа см. в статье "Назначение ролей Azure" с помощью портал Azure и проверки подлинности и авторизация приложения с помощью идентификатора Microsoft Entra для доступа к сущностям Служебная шина Azure.

Важно!

Для выполнения действий, описанных в этой статье, требуется spring Boot версии 2.5 или более поздней.

Отправка и получение сообщений из Служебная шина Azure

С помощью очереди или раздела для Служебная шина Azure вы можете отправлять и получать сообщения с помощью служебная шина Spring Cloud Azure Stream Binder.

Чтобы установить модуль Служебная шина Azure Stream Binder Spring Cloud, добавьте следующие зависимости в файл pom.xml:

  • Выставление счетов за материалы Spring Cloud (BOM):

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.azure.spring</groupId>
          <artifactId>spring-cloud-azure-dependencies</artifactId>
          <version>5.9.1</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
    

    Примечание.

    Если вы используете Spring Boot 2.x, обязательно установите для нее spring-cloud-azure-dependencies значение 4.15.0. Этот счет материалов (BOM) должен быть настроен в <dependencyManagement> разделе pom.xml файла. Это гарантирует, что все зависимости Azure Spring Cloud используют одну и ту же версию. Дополнительные сведения о версии, используемой для этого BOM, см. в статье "Какая версия Spring Cloud Azure должна использоваться".

  • Артефакт Azure Stream Binder Spring Cloud служебная шина:

    <dependency>
         <groupId>com.azure.spring</groupId>
         <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
    </dependency>
    

Добавление кода приложения

Чтобы настроить приложение для отправки и получения сообщений, используйте очередь или раздел служебная шина.

  1. Настройте учетные данные служебная шина в файле application.propertiesконфигурации.

     spring.cloud.azure.servicebus.namespace=${AZURE_SERVICEBUS_NAMESPACE}
     spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME}
     spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME}
     spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete=false
     spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type=queue
     spring.cloud.function.definition=consume;supply;
     spring.cloud.stream.poller.fixed-delay=60000 
     spring.cloud.stream.poller.initial-delay=0
    

    В следующей таблице описаны поля в конфигурации:

    Поле Description
    spring.cloud.azure.servicebus.namespace Укажите пространство имен, полученное в служебная шина из портал Azure.
    spring.cloud.stream.bindings.consume-in-0.destination Укажите очередь служебной шины или раздел служебной шины, использованные в этом руководстве.
    spring.cloud.stream.bindings.supply-out-0.destination Укажите то же значение, которое использовалось для назначения ввода.
    spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete Укажите, следует ли автоматически урегулировать сообщения. Если задано значение false, заголовок Checkpointer сообщения будет добавлен, чтобы разработчики могли вручную отрегулировать сообщения.
    spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type Укажите тип сущности для выходной привязки, может быть queue или topic.
    spring.cloud.function.definition Укажите, какой функциональный bean-компонент нужно привязать к внешним назначениям, предоставляемым привязками.
    spring.cloud.stream.poller.fixed-delay Укажите фиксированную задержку для опроса по умолчанию в миллисекундах. Значение по умолчанию — 1000 L. Рекомендуемое значение — 60000.
    spring.cloud.stream.poller.initial-delay Укажите начальную задержку для периодических триггеров. Значение по умолчанию — 0.
  2. Измените файл класса запуска, чтобы отобразить следующее содержимое.

    import com.azure.spring.messaging.checkpoint.Checkpointer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;
    import java.util.function.Consumer;
    import java.util.function.Supplier;
    import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER;
    
    @SpringBootApplication
    public class ServiceBusQueueBinderApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueBinderApplication.class);
        private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer();
    
        public static void main(String[] args) {
            SpringApplication.run(ServiceBusQueueBinderApplication.class, args);
        }
    
        @Bean
        public Supplier<Flux<Message<String>>> supply() {
            return ()->many.asFlux()
                           .doOnNext(m->LOGGER.info("Manually sending message {}", m))
                           .doOnError(t->LOGGER.error("Error encountered", t));
        }
    
        @Bean
        public Consumer<Message<String>> consume() {
            return message->{
                Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
                LOGGER.info("New message received: '{}'", message.getPayload());
                checkpointer.success()
                            .doOnSuccess(s->LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                            .doOnError(e->LOGGER.error("Error found", e))
                            .block();
            };
        }
    
        @Override
        public void run(String... args) {
            LOGGER.info("Going to add message {} to Sinks.Many.", "Hello World");
            many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST);
        }
    
    }
    

    Совет

    В этом руководстве нет операций проверки подлинности в конфигурациях или коде. Однако для подключения к службам Azure требуется проверка подлинности. Чтобы завершить проверку подлинности, необходимо использовать удостоверение Azure. Spring Cloud Azure использует DefaultAzureCredentialбиблиотеку удостоверений Azure, чтобы получить учетные данные без каких-либо изменений кода.

    DefaultAzureCredential поддерживает несколько методов проверки подлинности и определяет, какой метод следует использовать во время выполнения. Этот подход позволяет приложению использовать различные методы проверки подлинности в разных средах (например, локальных и рабочих средах), не реализуя код, зависящий от среды. Дополнительные сведения см. в разделе DefaultAzureCredential.

    Для выполнения проверки подлинности в локальных средах разработки можно использовать Azure CLI, Visual Studio Code, PowerShell или другие методы. Дополнительные сведения см. в статье о проверке подлинности Azure в средах разработки Java. Чтобы завершить проверку подлинности в средах размещения Azure, рекомендуется использовать управляемое удостоверение, назначаемое пользователем. См. сведения об управляемых удостоверениях для ресурсов Azure.

  3. Запустите приложение. Сообщения, как показано в следующем примере, будут размещены в журнале приложений:

    New message received: 'Hello World'
    Message 'Hello World' successfully checkpointed
    

Следующие шаги