Поделиться через


Spring Cloud поддержка Azure для Spring Cloud Stream

Эта статья относится к: ✔️ версия 4.14.0 ✔️ версии 5.8.0

Spring Cloud Stream — это платформа для создания высокомасштабируемых микрослужб, управляемых событиями, подключенных к общим системам обмена сообщениями.

Платформа предоставляет гибкую модель программирования, созданную на основе уже установленных и знакомых идиом Spring idioms и рекомендаций. Эти рекомендации включают поддержку сохраняемой семантики паба или подсемантики, групп потребителей и секций с отслеживанием состояния.

К текущим реализациям привязки относятся:

Spring Cloud Stream Binder для Центры событий Azure

Основные понятия

Привязка Spring Cloud Stream для Центры событий Azure предоставляет реализацию привязки для платформы Spring Cloud Stream. Эта реализация использует адаптеры каналов Концентраторов событий Spring Integration в своей основе. С точки зрения проектирования центры событий похожи на Kafka. Кроме того, к центрам событий можно получить доступ через API Kafka. Если проект имеет жесткую зависимость от API Kafka, вы можете попробовать Концентратор событий с помощью примера API Kafka

Группа потребителей

Центры событий обеспечивают аналогичную поддержку группы потребителей, как Apache Kafka, но с небольшой другой логикой. Пока Kafka сохраняет все зафиксированные смещения в брокере, необходимо хранить смещения сообщений Центров событий, обрабатываемых вручную. Пакет SDK центров событий предоставляет функцию для хранения таких смещения в служба хранилища Azure.

Поддержка секционирования

Центры событий предоставляют аналогичную концепцию физической секции, как Kafka. Но в отличие от автоматической перебалансировки Kafka между потребителями и секциями, Центры событий предоставляют вид предварительного режима. Учетная запись хранения выступает в качестве аренды, чтобы определить, какой потребитель владеет какой секцией. При запуске нового потребителя он пытается украсть некоторые секции из наиболее сильно загруженных потребителей для достижения баланса рабочей нагрузки.

Чтобы указать стратегию балансировки нагрузки, предоставляются свойства spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.* . Дополнительные сведения см. в разделе "Свойства потребителя".

Поддержка потребителей пакетной службы

Привязка центров событий Spring Cloud Azure Stream поддерживает функцию потребителя пакетной службы Spring Cloud Stream.

Чтобы работать с режимом пакетного потребителя, задайте spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode для свойства значение true. При включении сообщение со полезными данными списка пакетных событий получается и передается в функцию Consumer . Каждый заголовок сообщения также преобразуется в список, из которого содержимое является соответствующим значением заголовка, проанализированным из каждого события. Общие заголовки идентификатора секции, проверка указателя и последние закручанные свойства представлены в виде одного значения, так как весь пакет событий использует одно и то же значение. Дополнительные сведения см. в разделе заголовков сообщений Центров событий Spring Cloud поддержка Azure для Spring Integration.

Примечание.

Заголовок проверка point существует только в том случае, если MANUAL используется режим проверка point.

Контрольная точка для потребителя пакетной службы поддерживает два режима: BATCH и MANUAL. BATCHрежим автоматического проверка назначения, чтобы проверка определить весь пакет событий вместе после получения привязки. MANUALрежим — проверка указать события пользователями. При использовании он Checkpointer передается в заголовок сообщения, и пользователи могут использовать его для проверка назначения.

Размер пакета можно указать, задав max-size префикс spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch.и max-wait-time свойства. Свойство max-size необходимо, и max-wait-time свойство является необязательным. Дополнительные сведения см. в разделе "Свойства потребителя".

Настройка зависимостей

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>

Кроме того, можно использовать начальный центр событий Azure Stream Spring Cloud, как показано в следующем примере для Maven:

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>

Настройка

Привязыватель предоставляет следующие три части параметров конфигурации:

свойства конфигурации Подключение ion

В этом разделе содержатся параметры конфигурации, используемые для подключения к Центры событий Azure.

Примечание.

Если вы решили использовать субъект безопасности для проверки подлинности и авторизации с помощью идентификатора Microsoft Entra для доступа к ресурсу Azure, ознакомьтесь с помощью идентификатора Microsoft Entra ID , чтобы убедиться, что субъект безопасности предоставлен достаточное разрешение на доступ к ресурсу Azure.

Подключение настраиваемые свойства spring-cloud-azure-stream-binder-eventhubs:

Свойство Type Описание
spring.cloud.azure.eventhubs.enabled boolean Включена ли Центры событий Azure.
spring.cloud.azure.eventhubs.connection-string Строка Пространство имен Центров событий строка подключения значение.
spring.cloud.azure.eventhubs.namespace Строка Значение пространства имен Центров событий, которое является префиксом полного доменного имени. Полное доменное имя должно состоять из NamespaceName.DomainName
spring.cloud.azure.eventhubs.domain-name Строка Доменное имя значения пространства имен Центры событий Azure.
spring.cloud.azure.eventhubs.custom-endpoint-address Строка Адрес пользовательской конечной точки.

Совет

Общие параметры конфигурации пакета SDK службы Azure можно настроить для привязки Центров событий Azure Stream Spring Cloud. Поддерживаемые параметры конфигурации представлены в конфигурации Spring Cloud Azure и могут быть настроены с помощью единого spring.cloud.azure.eventhubs.префикса или префиксаspring.cloud.azure..

Привязка также поддерживает Spring Could Azure Resource Manager по умолчанию. Дополнительные сведения о том, как получить строка подключения с субъектами безопасности, которые не предоставляются со Data связанными ролями, см. в разделе "Базовое использование" Spring Could Azure Resource Manager.

Свойства конфигурации контрольной точки

В этом разделе содержатся параметры конфигурации службы служба хранилища BLOB-объектов, которая используется для сохранения владельца секции и проверка точек.

Примечание.

В версии 4.0.0, когда свойство spring.cloud.azure.eventhubs.processor.проверкаpoint-store.create-container-if-not-exists не включен вручную, контейнер служба хранилища не будет создан автоматически с именем spring.cloud.stream.bindings.bindings.binding-name.destination.

Контрольные точки настраиваемых свойств spring-cloud-azure-stream-binder-eventhubs:

Свойство Type Описание
spring.cloud.azure.eventhubs.processor. проверка point-store.create-container-if-not-exists Логический Разрешить ли создание контейнеров, если они отсутствуют.
spring.cloud.azure.eventhubs.processor. проверка point-store.account-name Строка Имя учетной записи хранения.
spring.cloud.azure.eventhubs.processor. проверка point-store.account-key Строка ключ доступа к учетной записи хранения;
spring.cloud.azure.eventhubs.processor. проверка point-store.container-name Строка служба хранилища имя контейнера.

Совет

Общие параметры конфигурации пакета SDK службы Azure можно настроить для хранилища служба хранилища BLOB-объектов проверка point. Поддерживаемые параметры конфигурации представлены в конфигурации Spring Cloud Azure и могут быть настроены с помощью единого spring.cloud.azure.eventhubs.processor.checkpoint-storeпрефикса или префиксаspring.cloud.azure..

свойства конфигурации привязки Центры событий Azure

Следующие параметры разделены на четыре раздела: свойства потребителей, расширенные конфигурации потребителей, свойства производителя и расширенные конфигурации производителя.

Свойства потребителя

Эти свойства предоставляются через EventHubsConsumerProperties.

Настраиваемые свойства потребителей spring-cloud-azure-stream-binder-eventhubs:

Свойство Type Описание
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.проверкаpoint.mode Контрольная точкаMode Режим контрольной точки, используемый, когда потребитель решает, как проверка point message
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.проверкаpoint.count Целое Определяет объем сообщения для каждой секции, чтобы сделать одну проверка точку. Вступают в силу только в том случае, если PARTITION_COUNT используется режим проверка point.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.проверкаpoint.interval Длительность Определяет интервал времени для выполнения одной проверка точки. Вступают в силу только в том случае, если TIME используется режим проверка point.
spring.cloud.stream.eventhubs.bindings.<binding-name.consumer.batch.max-size Целое Максимальное количество событий в пакете. Требуется для режима пакетного потребителя.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time Длительность Максимальная длительность использования пакетной службы. Вступают в силу только в том случае, если включен режим пакетного потребителя и является необязательным.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balanceing.update-interval Длительность Длительность интервала обновления.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balanceing.strategy LoadBalancingStrategy Стратегия балансировки нагрузки.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balanceing.partition-ownership-expiration-interval Длительность Срок действия, после которого истекает срок владения секцией.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties Логический Должен ли обработчик событий запрашивать сведения о последнем заквеченном событии в связанной секции и отслеживать эти сведения по мере получения событий.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count Целое Число, используемое потребителем для управления числом событий, которые потребитель Концентратора событий будет активно получать и очереди локально.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position Сопоставление с ключом в качестве идентификатора секции и значений StartPositionProperties Карта, содержащая позицию события, используемую для каждой секции, если точка проверка для секции не существует в хранилище проверка point. Эта карта будет ключом от идентификатора секции.

Примечание.

Конфигурация initial-partition-event-position принимает начальную map позицию для каждого концентратора событий. Таким образом, его ключом является идентификатор секции, а значение состоит из StartPositionProperties свойств смещения, порядкового номера, заквеченного времени даты и того, является ли включено. Например, его можно задать как

spring:
  cloud:
    stream:
      eventhubs:
        bindings:
          <binding-name>:
            consumer:
              initial-partition-event-position:
                0:
                  offset: earliest
                1:
                  sequence-number: 100
                2:
                  enqueued-date-time: 2022-01-12T13:32:47.650005Z
                4:
                  inclusive: false
Расширенная конфигурация потребителя

Приведенное выше подключение, проверка point и общая настройка конфигурации клиента Azure SDK для каждого потребителя привязки, которую можно настроить с префиксомspring.cloud.stream.eventhubs.bindings.<binding-name>.consumer..

Свойства производителя

Эти свойства предоставляются через EventHubsProducerProperties.

Настраиваемые свойства spring-cloud-azure-stream-binder-eventhubs:

Свойство Type Описание
spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync boolean Флаг переключателя для синхронизации производителя. Если задано значение true, производитель ожидает ответа после операции отправки.
spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout длинный Время ожидания ответа после операции отправки. Вступают в силу только в том случае, если производитель синхронизации включен.
Расширенная конфигурация производителя

Приведенное выше подключение и общая настройка конфигурации клиента Azure SDK для каждого производителя привязки, которую можно настроить с префиксомspring.cloud.stream.eventhubs.bindings.<binding-name>.producer..

Базовое использование

Отправка и получение сообщений из центров событий

  1. Заполните параметры конфигурации учетными данными.

    • Для учетных данных в качестве строка подключения настройте следующие свойства в файле application.yml:

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT_CONTAINER}
                  account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                  account-key: ${CHECKPOINT_ACCESS_KEY}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      
    • Для учетных данных в качестве субъекта-службы настройте следующие свойства в файле application.yml :

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${EVENTHUB_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      

Примечание.

Допустимые значенияtenant-id: common, organizationsconsumersили идентификатор клиента. Дополнительные сведения об этих значениях см. в разделе "Использована неправильная конечная точка (личные и учетные записи организации) в разделе "Ошибка" AADSTS50020 . Учетная запись пользователя от поставщика удостоверений не существует в клиенте. Сведения о преобразовании приложения с одним клиентом см. в разделе "Преобразование однотенантного приложения в мультитенантный" в идентификатор Microsoft Entra.

  • Для учетных данных в качестве управляемых удостоверений настройте следующие свойства в файле application.yml :

    spring:
      cloud:
        azure:
          credential:
            managed-identity-enabled: true
            client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
          eventhubs:
            namespace: ${EVENTHUB_NAMESPACE}
            processor:
              checkpoint-store:
                container-name: ${CONTAINER_NAME}
                account-name: ${ACCOUNT_NAME}
        function:
          definition: consume;supply
        stream:
          bindings:
            consume-in-0:
              destination: ${EVENTHUB_NAME}
              group: ${CONSUMER_GROUP}
            supply-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
    
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
    
  1. Определение поставщика и потребителя.

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload(),
                    message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                    message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                    message.getHeaders().get(EventHubsHeaders.OFFSET),
                    message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
            );
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

Поддержка секционирования

Для PartitionSupplier настройки сведений о секции, предоставленных пользователем, создается настройка сведений о секционированиях для отправки сообщения. В следующей блок-схеме показано, как получить различные приоритеты для идентификатора секции и ключа:

Diagram showing a flowchart of the partitioning support process.

Поддержка потребителей пакетной службы

  1. Укажите параметры пакетной конфигурации, как показано в следующем примере:

    spring:
      cloud:
        function:
          definition: consume
        stream:
          bindings:
            consume-in-0:
              destination: ${AZURE_EVENTHUB_NAME}
              group: ${AZURE_EVENTHUB_CONSUMER_GROUP}
              consumer:
                batch-mode: true
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  batch:
                    max-batch-size: 10 # Required for batch-consumer mode
                    max-wait-time: 1m # Optional, the default value is null
                  checkpoint:
                    mode: BATCH # or MANUAL as needed
    
  2. Определение поставщика и потребителя.

    Для режима проверка назначения BATCHкак можно использовать следующий код для отправки сообщений и использования в пакетах.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                        message.getPayload().get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

    Для режима проверка назначения MANUALкак можно использовать следующий код для отправки сообщений и использования и проверка point в пакетах.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload().get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
    
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            checkpointer.success()
                        .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                        .doOnError(error -> LOGGER.error("Exception found", error))
                        .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

Примечание.

В пакетном режиме используется application/jsonтип содержимого по умолчанию привязки Spring Cloud Stream, поэтому убедитесь, что полезные данные сообщения соответствуют типу контента. Например, при использовании типа application/json контента по умолчанию для получения сообщений с String полезными данными полезные данные должны быть JSON Stringокружены двойными кавычками исходного String текста. В то время как для text/plain типа контента он может быть объектом напрямую String . Дополнительные сведения см. в разделе "Согласование типов контента Spring Cloud Stream".

Реагирование на сообщения об ошибках

  • Обработка сообщений об ошибках исходящей привязки

    По умолчанию Spring Integration создает глобальный канал errorChannelошибок. Настройте следующую конечную точку сообщения для обработки исходящих сообщений об ошибках привязки:

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • Обработка сообщений об ошибках входящего привязки

    Binder Центров событий Spring Cloud Stream поддерживает два решения для обработки ошибок для привязок входящих сообщений: пользовательских каналов ошибок и обработчиков.

    Канал ошибок:

    Spring Cloud Stream предоставляет канал ошибок для каждой входящей привязки. Отправляется ErrorMessage в канал ошибок. Дополнительные сведения см. в документации по Spring Cloud Stream с обработкой ошибок .

    • Канал ошибок по умолчанию

      Глобальный канал ошибок с именем errorChannel можно использовать для использования всех входящих сообщений об ошибках привязки. Чтобы обработать эти сообщения, настройте следующую конечную точку сообщения:

      @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
      public void handleError(ErrorMessage message) {
          LOGGER.error("Handling inbound binding error: " + message);
      }
      
    • Канал ошибок, зависящий от привязки

      Вы можете использовать определенный канал ошибок для использования определенных сообщений об ошибках входящего трафика с более высоким приоритетом, чем канал ошибок по умолчанию. Чтобы обработать эти сообщения, настройте следующую конечную точку сообщения:

      // Replace destination with spring.cloud.stream.bindings.<input-binding-name>.destination
      // Replace group with spring.cloud.stream.bindings.<input-binding-name>.group
      @ServiceActivator(inputChannel = "{destination}.{group}.errors")
      public void handleError(ErrorMessage message) {
          LOGGER.error("Handling inbound binding error: " + message);
      }
      

      Примечание.

      Канал ошибок, зависящий от привязки, является взаимоисключающим с другими предоставленными обработчиками ошибок и каналами.

    Обработчик ошибок:

    Spring Cloud Stream предоставляет механизм для предоставления пользовательского обработчика ошибок путем добавления Consumer экземпляра ErrorMessage , который принимает экземпляры. Дополнительные сведения см . в документации по Spring Cloud Stream об обработке ошибок.

    Примечание.

    При настройке любого обработчика ошибок привязки он может работать с каналом ошибок по умолчанию.

    • Обработчик ошибок по умолчанию привязки

      Настройте одну Consumer бобовую строку для использования всех сообщений об ошибках входящего привязки. Следующая функция по умолчанию подписывается на каждый канал ошибок входящего привязки:

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Кроме того, необходимо задать spring.cloud.stream.default.error-handler-definition для свойства имя функции.

    • Обработчик ошибок, зависящих от привязки

      Consumer Настройте bean для использования определенных сообщений об ошибках входящего привязки. Следующая функция подписывается на определенный канал ошибок входящего трафика и имеет более высокий приоритет, чем обработчик ошибок по умолчанию привязки:

      @Bean
      public Consumer<ErrorMessage> myErrorHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Кроме того, необходимо задать spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition для свойства имя функции.

Заголовки сообщений Центров событий

Основные поддерживаемые заголовки сообщений см. в разделе заголовков сообщений Центров событий Spring Cloud поддержка Azure для Spring Integration.

Поддержка нескольких привязок

Подключение в несколько пространств имен Центров событий также поддерживается с помощью нескольких привязок. В этом примере в качестве примера используется строка подключения. Также поддерживаются учетные данные субъектов-служб и управляемых удостоверений. Связанные свойства можно задать в параметрах среды привязки.

  1. Чтобы использовать несколько привязок с Центрами событий, настройте следующие свойства в файле application.yml :

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${EVENTHUB_NAME_01}
              group: ${CONSUMER_GROUP_01}
            supply1-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE}
            consume2-in-0:
              binder: eventhub-2
              destination: ${EVENTHUB_NAME_02}
              group: ${CONSUMER_GROUP_02}
            supply2-out-0:
              binder: eventhub-2
              destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE}
          binders:
            eventhub-1:
              type: eventhubs
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_01}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
            eventhub-2:
              type: eventhubs
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_02}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
          eventhubs:
            bindings:
              consume1-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
              consume2-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    Примечание.

    В предыдущем файле приложения показано, как настроить один опрашиватель по умолчанию для приложения для всех привязок. Если вы хотите настроить опрос для определенной привязки, можно использовать такую конфигурацию, как spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000.

  2. Нам нужно определить двух поставщиков и двух потребителей:

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    

Подготовка ресурсов

Привязка Центров событий поддерживает подготовку концентратора событий и группы потребителей, пользователи могут использовать следующие свойства для включения подготовки.

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      eventhubs:
        resource:
          resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}

Примечание.

Допустимые значенияtenant-id: common, organizationsconsumersили идентификатор клиента. Дополнительные сведения об этих значениях см. в разделе "Использована неправильная конечная точка (личные и учетные записи организации) в разделе "Ошибка" AADSTS50020 . Учетная запись пользователя от поставщика удостоверений не существует в клиенте. Сведения о преобразовании приложения с одним клиентом см. в разделе "Преобразование однотенантного приложения в мультитенантный" в идентификатор Microsoft Entra.

Примеры

Дополнительные сведения см. в репозитории azure-spring-boot-samples на сайте GitHub.

Spring Cloud Stream Binder для Служебной шины Azure

Основные понятия

Привязка Spring Cloud Stream Binder для Служебная шина Azure предоставляет реализацию привязки для Spring Cloud Stream Framework. Эта реализация использует адаптеры канала Spring Integration служебная шина в своей основе.

Запланированное сообщение

Эта привязка поддерживает отправку сообщений в раздел для отложенной обработки. Пользователи могут отправлять запланированные сообщения с заголовком x-delay в миллисекундах время задержки сообщения. Сообщение будет доставлено в соответствующие разделы после x-delay миллисекунда.

Группа потребителей

служебная шина тема обеспечивает аналогичную поддержку группы потребителей, как Apache Kafka, но с небольшой другой логикой. Этот привязчик зависит от Subscription раздела, который будет выступать в качестве группы потребителей.

Настройка зависимостей

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>

Кроме того, можно использовать Azure Stream Spring Cloud служебная шина Starter, как показано в следующем примере для Maven:

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>

Настройка

Привязка предоставляет следующие две части параметров конфигурации:

свойства конфигурации Подключение ion

В этом разделе содержатся параметры конфигурации, используемые для подключения к Служебная шина Azure.

Примечание.

Если вы решили использовать субъект безопасности для проверки подлинности и авторизации с помощью идентификатора Microsoft Entra для доступа к ресурсу Azure, ознакомьтесь с помощью идентификатора Microsoft Entra ID , чтобы убедиться, что субъект безопасности предоставлен достаточное разрешение на доступ к ресурсу Azure.

Подключение настраиваемые свойства spring-cloud-azure-stream-binder-servicebus:

Свойство Type Описание
spring.cloud.azure.servicebus.enabled boolean Включена ли Служебная шина Azure.
spring.cloud.azure.servicebus.connection-string Строка служебная шина значение пространства имен строка подключения.
Spring.cloud.azure.servicebus.namespace Строка служебная шина значение пространства имен, которое является префиксом полного доменного имени. Полное доменное имя должно состоять из NamespaceName.DomainName
spring.cloud.azure.servicebus.domain-name Строка Доменное имя значения пространства имен Служебная шина Azure.

Примечание.

Общие параметры конфигурации пакета SDK службы Azure можно настроить для привязки Azure Stream Spring Cloud служебная шина. Поддерживаемые параметры конфигурации представлены в конфигурации Spring Cloud Azure и могут быть настроены с помощью единого spring.cloud.azure.servicebus.префикса или префиксаspring.cloud.azure..

Привязка также поддерживает Spring Could Azure Resource Manager по умолчанию. Дополнительные сведения о том, как получить строка подключения с субъектами безопасности, которые не предоставляются со Data связанными ролями, см. в разделе "Базовое использование" Spring Could Azure Resource Manager.

свойства конфигурации привязки Служебная шина Azure

Следующие параметры разделены на четыре раздела: свойства потребителей, расширенные конфигурации потребителей, свойства производителя и расширенные конфигурации производителя.

Свойства потребителя

Эти свойства предоставляются через ServiceBusConsumerProperties.

Настраиваемые свойства spring-cloud-azure-stream-binder-servicebus:

Свойство Тип По умолчанию. Description
spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected boolean false Если неудачные сообщения перенаправляются в DLQ.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls Целое 1 Максимальное число одновременных сообщений, которые должен обрабатывать клиент обработчика служебная шина.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-session Целое null Максимальное количество одновременных сеансов для обработки в любое время.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-enabled Логический null Включен ли сеанс.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count Целое 0 Число предварительных выборок клиента процессора служебная шина.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-queue SubQueue ничего Тип вложенной очереди для подключения.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration Длительность 5 м Время для продолжения автоматического продления блокировки.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode ServiceBusReceiveMode peek_lock Режим получения клиента процессора служебная шина.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.auto-complete Логический true Следует ли автоматически урегулировать сообщения. Если задано значение false, заголовок Checkpointer сообщения будет добавлен, чтобы разработчики могли вручную отрегулировать сообщения.
Расширенная конфигурация потребителя

Приведенное выше подключение и общая настройка конфигурации клиента Azure SDK для каждого потребителя привязки, которую можно настроить с префиксомspring.cloud.stream.servicebus.bindings.<binding-name>.consumer..

Свойства производителя

Эти свойства предоставляются через ServiceBusProducerProperties.

Настраиваемые свойства spring-cloud-azure-stream-binder-servicebus:

Свойство Тип По умолчанию. Description
spring.cloud.stream.servicebus.bindings.binding-name.producer.sync boolean false Переключение флага для синхронизации производителя.
spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout длинный 10000 Значение времени ожидания для отправки производителя.
spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type ServiceBusEntityType null служебная шина тип сущности производителя, необходимый для производителя привязки.

Важно!

При использовании производителя привязки spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type необходимо настроить свойство.

Расширенная конфигурация производителя

Приведенное выше подключение и общая настройка конфигурации клиента Azure SDK для каждого производителя привязки, которую можно настроить с префиксомspring.cloud.stream.servicebus.bindings.<binding-name>.producer..

Базовое использование

Отправка и получение сообщений из служебная шина

  1. Заполните параметры конфигурации учетными данными.

    • Для учетных данных в качестве строка подключения настройте следующие свойства в файле application.yml:

          spring:
            cloud:
              azure:
                servicebus:
                  connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      
    • Для учетных данных в качестве субъекта-службы настройте следующие свойства в файле application.yml :

          spring:
            cloud:
              azure:
                credential:
                  client-id: ${AZURE_CLIENT_ID}
                  client-secret: ${AZURE_CLIENT_SECRET}
                profile:
                  tenant-id: <tenant>
                servicebus:
                  namespace: ${SERVICEBUS_NAMESPACE}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      

Примечание.

Допустимые значенияtenant-id: common, organizationsconsumersили идентификатор клиента. Дополнительные сведения об этих значениях см. в разделе "Использована неправильная конечная точка (личные и учетные записи организации) в разделе "Ошибка" AADSTS50020 . Учетная запись пользователя от поставщика удостоверений не существует в клиенте. Сведения о преобразовании приложения с одним клиентом см. в разделе "Преобразование однотенантного приложения в мультитенантный" в идентификатор Microsoft Entra.

  • Для учетных данных в качестве управляемых удостоверений настройте следующие свойства в файле application.yml :

        spring:
          cloud:
            azure:
              credential:
                managed-identity-enabled: true
                client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
              servicebus:
                namespace: ${SERVICEBUS_NAMESPACE}
            function:
              definition: consume;supply
            stream:
              bindings:
                consume-in-0:
                  destination: ${SERVICEBUS_ENTITY_NAME}
                  # If you use Service Bus Topic, add the following configuration
                  # group: ${SUBSCRIPTION_NAME}
                supply-out-0:
                  destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
              servicebus:
                bindings:
                  consume-in-0:
                    consumer:
                      auto-complete: false
                  supply-out-0:
                    producer:
                      entity-type: queue # set as "topic" if you use Service Bus Topic
    
  1. Определение поставщика и потребителя.

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}'", message.getPayload());
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

Поддержка ключа секции

Привязка поддерживает секционирование служебная шина путем задания ключа секции и идентификатора сеанса в заголовке сообщения. В этом разделе описывается настройка ключа секции для сообщений.

Spring Cloud Stream предоставляет свойство spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expressionвыражения выражения spEL ключа раздела. Например, присвойите этому свойству "'partitionKey-' + headers[<message-header-key>]" значение и добавьте заголовок с именем message-header-key. Spring Cloud Stream использует значение этого заголовка при оценке выражения для назначения ключа секции. В следующем коде представлен пример производителя:

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader("<message-header-key>", value.length() % 4)
            .build();
    };
}

Поддержка сеансов

Привязка поддерживает сеансы сообщений служебная шина. Идентификатор сеанса сообщения можно задать с помощью заголовка сообщения.

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
            .build();
    };
}

Примечание.

Согласно служебная шина секционирование, идентификатор сеанса имеет более высокий приоритет, чем ключ секции. Поэтому при установке обоих ServiceBusMessageHeaders#SESSION_ID заголовков ServiceBusMessageHeaders#PARTITION_KEY значение идентификатора сеанса в конечном итоге используется для перезаписи значения ключа секции.

Реагирование на сообщения об ошибках

  • Обработка сообщений об ошибках исходящей привязки

    По умолчанию Spring Integration создает глобальный канал errorChannelошибок. Настройте следующую конечную точку сообщения для обработки сообщения об ошибке исходящей привязки.

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • Обработка сообщений об ошибках входящего привязки

    Spring Cloud Stream служебная шина Binder поддерживает три решения для обработки ошибок для привязок входящих сообщений: обработчик ошибок привязки, настраиваемых каналов ошибок и обработчиков.

    Обработчик ошибок привязки:

    Обработчик ошибок привязки по умолчанию обрабатывает входящий привязку. Этот обработчик используется для отправки неудачных сообщений в очередь недоставленных писем при spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected включении. В противном случае не удалось отказаться от сообщений. За исключением настройки канала ошибок, зависящем от привязки, обработчик ошибок привязки всегда действует независимо от того, существуют ли другие пользовательские обработчики ошибок или каналы.

    Канал ошибок:

    Spring Cloud Stream предоставляет канал ошибок для каждой входящей привязки. Отправляется ErrorMessage в канал ошибок. Дополнительные сведения см. в документации по Spring Cloud Stream с обработкой ошибок .

    • Канал ошибок по умолчанию

      Глобальный канал ошибок с именем errorChannel можно использовать для использования всех входящих сообщений об ошибках привязки. Чтобы обработать эти сообщения, настройте следующую конечную точку сообщения:

      @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
      public void handleError(ErrorMessage message) {
          LOGGER.error("Handling inbound binding error: " + message);
      }
      
    • Канал ошибок, зависящий от привязки

      Вы можете использовать определенный канал ошибок для использования определенных сообщений об ошибках входящего трафика с более высоким приоритетом, чем канал ошибок по умолчанию. Чтобы обработать эти сообщения, настройте следующую конечную точку сообщения:

      // Replace destination with spring.cloud.stream.bindings.<input-binding-name>.destination
      // Replace group with spring.cloud.stream.bindings.<input-binding-name>.group
      @ServiceActivator(inputChannel = "{destination}.{group}.errors")
      public void handleError(ErrorMessage message) {
          LOGGER.error("Handling inbound binding error: " + message);
      }
      

      Примечание.

      Канал ошибок, зависящий от привязки, является взаимоисключающим с другими предоставленными обработчиками ошибок и каналами.

    Обработчик ошибок:

    Spring Cloud Stream предоставляет механизм для предоставления пользовательского обработчика ошибок путем добавления Consumer экземпляра ErrorMessage , который принимает экземпляры. Дополнительные сведения см . в документации по Spring Cloud Stream об обработке ошибок.

    Примечание.

    При настройке любого обработчика ошибок привязки он может работать с каналом ошибок по умолчанию и обработчиком ошибок привязки.

    • Обработчик ошибок по умолчанию привязки

      Настройте одну Consumer бобовую строку для использования всех сообщений об ошибках входящего привязки. Следующая функция по умолчанию подписывается на каждый канал ошибок входящего привязки:

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Кроме того, необходимо задать spring.cloud.stream.default.error-handler-definition для свойства имя функции.

    • Обработчик ошибок, зависящих от привязки

      Consumer Настройте bean для использования определенных сообщений об ошибках входящего привязки. Следующая функция подписывается на определенный канал ошибок входящего трафика с более высоким приоритетом, чем обработчик ошибок по умолчанию привязки.

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Кроме того, необходимо задать spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition для свойства имя функции.

заголовки сообщений служебная шина

Основные поддерживаемые заголовки сообщений см. в разделе заголовков сообщений служебная шина Spring Cloud поддержка Azure для Spring Integration.

Примечание.

При настройке ключа секции приоритет заголовка сообщения выше свойства Spring Cloud Stream. Поэтому spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression вступает в силу только в том случае, если ни один из ServiceBusMessageHeaders#SESSION_IDServiceBusMessageHeaders#PARTITION_KEY заголовков не настроен.

Поддержка нескольких привязок

Подключение в несколько пространств имен служебная шина также поддерживается с помощью нескольких привязок. Этот пример принимает строка подключения в качестве примера. Учетные данные субъектов-служб и управляемых удостоверений также поддерживаются, пользователи могут задавать связанные свойства в параметрах среды каждого привязки.

  1. Чтобы использовать несколько привязок ServiceBus, настройте следующие свойства в файле application.yml :

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${SERVICEBUS_TOPIC_NAME}
              group: ${SUBSCRIPTION_NAME}
            supply1-out-0:
              destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE}
            consume2-in-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME}
            supply2-out-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE}
          binders:
            servicebus-1:
              type: servicebus
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING}
            servicebus-2:
              type: servicebus
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING}
          servicebus:
            bindings:
              consume1-in-0:
                consumer:
                  auto-complete: false
              supply1-out-0:
                producer:
                  entity-type: topic
              consume2-in-0:
                consumer:
                  auto-complete: false
              supply2-out-0:
                producer:
                  entity-type: queue
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    Примечание.

    В предыдущем файле приложения показано, как настроить один опрашиватель по умолчанию для приложения для всех привязок. Если вы хотите настроить опрос для определенной привязки, можно использовать такую конфигурацию, как spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000.

  2. нам нужно определить двух поставщиков и двух потребителей

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    
    }
    

Подготовка ресурсов

Привязка служебной шины поддерживает подготовку очередей, раздела и подписки, пользователи могут использовать следующие свойства для включения подготовки.

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      servicebus:
        resource:
          resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
    stream:
      servicebus:
        bindings:
          <binding-name>:
            consumer:
              entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}

Примечание.

Допустимые значенияtenant-id: common, organizationsconsumersили идентификатор клиента. Дополнительные сведения об этих значениях см. в разделе "Использована неправильная конечная точка (личные и учетные записи организации) в разделе "Ошибка" AADSTS50020 . Учетная запись пользователя от поставщика удостоверений не существует в клиенте. Сведения о преобразовании приложения с одним клиентом см. в разделе "Преобразование однотенантного приложения в мультитенантный" в идентификатор Microsoft Entra.

Примеры

Дополнительные сведения см. в репозитории azure-spring-boot-samples на сайте GitHub.