Spring Cloud Stream с Центры событий Azure

В этом руководстве показано, как отправлять и получать сообщения с помощью Центры событий Azure и Spring Cloud Stream Binder Eventhubs в приложении Spring Boot.

Предварительные условия

Примечание.

Чтобы предоставить учетной записи доступ к ресурсам, в Центры событий Azure назначьте роль Центры событий Azure Data Receiver и Центры событий Azure Data Sender учетной записи Microsoft Entra, которую вы используете. Затем в учетной записи служба хранилища Azure назначьте роль Storage Blob Data Contributor для учетной записи Microsoft Entra, которую вы используете. Дополнительные сведения о предоставлении ролей доступа см. в статье Назначение ролей Azure с помощью портала Azure и Авторизация доступа к ресурсам Центров событий с помощью Microsoft Entra ID.

Внимание

Для выполнения действий, описанных в этом руководстве, требуется Spring Boot версии 2.5 или более поздней.

Отправка и получение сообщений из Центры событий Azure

С помощью учетной записи служба хранилища Azure и концентратора событий Azure можно отправлять и получать сообщения с помощью Центров событий Spring Cloud Azure Stream Binder.

Чтобы установить модуль Центров событий Stream Binder Azure Spring Cloud, добавьте следующие зависимости в файл pom.xml:

  • Акт технических спецификаций Spring Cloud Azure (BOM):

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.azure.spring</groupId>
          <artifactId>spring-cloud-azure-dependencies</artifactId>
          <version>7.2.0</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
    

    Примечание.

    Если вы используете Spring Boot 4.0.x, обязательно установите версию, указанную в spring-cloud-azure-dependencies, на значение 7.2.0.

    Если вы используете Spring Boot 3.5.x, обязательно задайте версию spring-cloud-azure-dependencies на 6.2.0.

    Если вы используете Spring Boot 3.1.x-3.5.x, обязательно установите для нее версию spring-cloud-azure-dependencies на 5.25.0.

    Если вы используете Spring Boot 2.x, обязательно установите версию spring-cloud-azure-dependencies на 4.20.0.

    Эта ведомость материалов (BOM) должна быть сконфигурирована в <dependencyManagement> разделе файла pom.xml. Это гарантирует, что все зависимости Spring Cloud Azure используют одну и ту же версию.

    Дополнительные сведения о версии, используемой для этого BOM, см. в разделе Какую версию Spring Cloud Azure я должен использовать.

  • Артефакт Центров событий Stream Binder в Spring Cloud Azure:

    <dependency>
       <groupId>com.azure.spring</groupId>
       <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
    </dependency>
    

Программирование приложения

Выполните следующие действия, чтобы настроить приложение для создания и использования сообщений с помощью Центры событий Azure.

  1. Настройте учетные данные концентратора событий, добавив следующие свойства в файл application.properties .

     spring.cloud.azure.eventhubs.namespace=${AZURE_EVENTHUBS_NAMESPACE}
     spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name=${AZURE_STORAGE_ACCOUNT_NAME}
     spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name=${AZURE_STORAGE_CONTAINER_NAME}
     spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_EVENTHUB_NAME}
     spring.cloud.stream.bindings.consume-in-0.group=${AZURE_EVENTHUB_CONSUMER_GROUP}
     spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_EVENTHUB_NAME}
     spring.cloud.stream.eventhubs.bindings.consume-in-0.consumer.checkpoint.mode=MANUAL
     spring.cloud.function.definition=consume;supply;
     spring.cloud.stream.poller.initial-delay=0
     spring.cloud.stream.poller.fixed-delay=1000
    

    В следующей таблице описаны поля в конфигурации:

    Поле Описание
    spring.cloud.azure.eventhubs.namespace Укажите пространство имен вашего концентратора событий, полученное на портале Azure.
    spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name Укажите учетную запись хранения, созданную в этом руководстве.
    spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name Укажите контейнер учетной записи хранения.
    spring.cloud.stream.bindings.consume-in-0.destination Укажите концентратор событий, используемый в этом руководстве.
    spring.cloud.stream.bindings.consume-in-0.group Укажите группы потребителей в экземпляре Event Hubs.
    spring.cloud.stream.bindings.supply-out-0.destination Укажите тот же концентратор событий, который использовался в этом руководстве.
    spring.cloud.stream.eventhubs.bindings.consume-in-0.consumer.checkpoint.mode Укажите MANUAL.
    spring.cloud.function.definition Укажите, какой функциональный bean-компонент нужно привязать к внешним точкам назначения, раскрываемым привязками.
    spring.cloud.stream.poller.initial-delay Укажите начальную задержку для периодических триггеров. Значение по умолчанию — 0.
    spring.cloud.stream.poller.fixed-delay Укажите фиксированную задержку для опросчика по умолчанию в миллисекундах. Значение по умолчанию — 1000 L.
  2. Измените файл класса запуска, чтобы отобразить следующее содержимое.

    import com.azure.spring.messaging.checkpoint.Checkpointer;
    import com.azure.spring.messaging.eventhubs.support.EventHubsHeaders;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;
    import java.util.function.Consumer;
    import java.util.function.Supplier;
    import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER;
    
    @SpringBootApplication
    public class EventHubBinderApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(EventHubBinderApplication.class);
        private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer();
    
        public static void main(String[] args) {
            SpringApplication.run(EventHubBinderApplication.class, args);
        }
    
        @Bean
        public Supplier<Flux<Message<String>>> supply() {
            return ()->many.asFlux()
                           .doOnNext(m->LOGGER.info("Manually sending message {}", m))
                           .doOnError(t->LOGGER.error("Error encountered", t));
        }
    
        @Bean
        public Consumer<Message<String>> consume() {
            return message->{
                Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued "
                        +"time: {}",
                    message.getPayload(),
                    message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                    message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                    message.getHeaders().get(EventHubsHeaders.OFFSET),
                    message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
                );
                checkpointer.success()
                            .doOnSuccess(success->LOGGER.info("Message '{}' successfully checkpointed",
                                message.getPayload()))
                            .doOnError(error->LOGGER.error("Exception found", error))
                            .block();
            };
        }
    
        @Override
        public void run(String... args) {
            LOGGER.info("Going to add message {} to sendMessage.", "Hello World");
            many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST);
        }
    
    }
    

    Совет

    В этом руководстве нет операций проверки подлинности в конфигурациях или коде. Однако для подключения к службам Azure требуется проверка подлинности. Чтобы завершить проверку подлинности, необходимо использовать удостоверение Azure. Spring Cloud Azure использует DefaultAzureCredential, которую предоставляет библиотека удостоверений Azure для получения учетных данных без необходимости изменения кода.

    DefaultAzureCredential поддерживает несколько методов проверки подлинности и определяет, какой метод следует использовать во время выполнения. Этот подход позволяет приложению использовать различные методы проверки подлинности в разных средах (например, локальных и рабочих средах), не реализуя код, зависящий от среды. Дополнительные сведения см. в разделе DefaultAzureCredential.

    Для выполнения проверки подлинности в локальных средах разработки можно использовать Azure CLI, Visual Studio Code, PowerShell или другие методы. Дополнительные сведения см. в разделе аутентификация Azure в средах разработки на Java. Чтобы завершить проверку подлинности в средах размещения Azure, рекомендуется использовать управляемое удостоверение, назначаемое пользователем. Дополнительные сведения см. в разделе Управляемые удостоверения для ресурсов Azure?

  3. Запустите приложение. Такие сообщения будут размещены в журнале приложений, как показано в следующем примере выходных данных:

    New message received: 'Hello World', partition key: 107207233, sequence number: 458, offset: 94256, enqueued time: 2023-02-17T08:27:59.641Z
    Message 'Hello World!' successfully checkpointed
    

Развертывание в Приложения Azure Spring

Теперь, когда у вас есть приложение Spring Boot, работающее локально, пришло время переместить его в рабочую среду. Приложения Azure Spring упрощает развертывание приложений Spring Boot для Azure без каких-либо изменений кода. Эта служба управляет инфраструктурой приложений Spring, благодаря чему разработчики могут сосредоточиться на коде. Приложения Azure Spring обеспечивает управление жизненным циклом с помощью комплексного мониторинга и диагностики, управления конфигурацией, обнаружения служб, интеграции CI/CD, развертывания по схеме blue-green и прочее. Чтобы развернуть ваше приложение в Приложения Azure Spring, см. раздел Как развернуть первое приложение в Приложения Azure Spring.

Следующие шаги