Spring Cloud поддержка Azure для Spring Cloud Stream
Эта статья относится к: ✔️ версия 4.14.0 ✔️ версии 5.8.0
Spring Cloud Stream — это платформа для создания высокомасштабируемых микрослужб, управляемых событиями, подключенных к общим системам обмена сообщениями.
Платформа предоставляет гибкую модель программирования, созданную на основе уже установленных и знакомых идиом Spring idioms и рекомендаций. Эти рекомендации включают поддержку сохраняемой семантики паба или подсемантики, групп потребителей и секций с отслеживанием состояния.
К текущим реализациям привязки относятся:
spring-cloud-azure-stream-binder-eventhubs
— дополнительные сведения см. в разделе Spring Cloud Stream Binder для Центры событий Azurespring-cloud-azure-stream-binder-servicebus
— дополнительные сведения см. в разделе Spring Cloud Stream Binder для Служебная шина Azure
Spring Cloud Stream Binder для Центры событий Azure
Основные понятия
Привязка Spring Cloud Stream для Центры событий Azure предоставляет реализацию привязки для платформы Spring Cloud Stream. Эта реализация использует адаптеры каналов Концентраторов событий Spring Integration в своей основе. С точки зрения проектирования центры событий похожи на Kafka. Кроме того, к центрам событий можно получить доступ через API Kafka. Если проект имеет жесткую зависимость от API Kafka, вы можете попробовать Концентратор событий с помощью примера API Kafka
Группа потребителей
Центры событий обеспечивают аналогичную поддержку группы потребителей, как Apache Kafka, но с небольшой другой логикой. Пока Kafka сохраняет все зафиксированные смещения в брокере, необходимо хранить смещения сообщений Центров событий, обрабатываемых вручную. Пакет SDK центров событий предоставляет функцию для хранения таких смещения в служба хранилища Azure.
Поддержка секционирования
Центры событий предоставляют аналогичную концепцию физической секции, как Kafka. Но в отличие от автоматической перебалансировки Kafka между потребителями и секциями, Центры событий предоставляют вид предварительного режима. Учетная запись хранения выступает в качестве аренды, чтобы определить, какой потребитель владеет какой секцией. При запуске нового потребителя он пытается украсть некоторые секции из наиболее сильно загруженных потребителей для достижения баланса рабочей нагрузки.
Чтобы указать стратегию балансировки нагрузки, предоставляются свойства spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.*
. Дополнительные сведения см. в разделе "Свойства потребителя".
Поддержка потребителей пакетной службы
Привязка центров событий Spring Cloud Azure Stream поддерживает функцию потребителя пакетной службы Spring Cloud Stream.
Чтобы работать с режимом пакетного потребителя, задайте spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode
для свойства значение true
. При включении сообщение со полезными данными списка пакетных событий получается и передается в функцию Consumer
. Каждый заголовок сообщения также преобразуется в список, из которого содержимое является соответствующим значением заголовка, проанализированным из каждого события. Общие заголовки идентификатора секции, проверка указателя и последние закручанные свойства представлены в виде одного значения, так как весь пакет событий использует одно и то же значение. Дополнительные сведения см. в разделе заголовков сообщений Центров событий Spring Cloud поддержка Azure для Spring Integration.
Примечание.
Заголовок проверка point существует только в том случае, если MANUAL
используется режим проверка point.
Контрольная точка для потребителя пакетной службы поддерживает два режима: BATCH
и MANUAL
. BATCH
режим автоматического проверка назначения, чтобы проверка определить весь пакет событий вместе после получения привязки. MANUAL
режим — проверка указать события пользователями. При использовании он Checkpointer
передается в заголовок сообщения, и пользователи могут использовать его для проверка назначения.
Размер пакета можно указать, задав max-size
префикс spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch.
и max-wait-time
свойства. Свойство max-size
необходимо, и max-wait-time
свойство является необязательным. Дополнительные сведения см. в разделе "Свойства потребителя".
Настройка зависимостей
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>
Кроме того, можно использовать начальный центр событий Azure Stream Spring Cloud, как показано в следующем примере для Maven:
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>
Настройка
Привязыватель предоставляет следующие три части параметров конфигурации:
свойства конфигурации Подключение ion
В этом разделе содержатся параметры конфигурации, используемые для подключения к Центры событий Azure.
Примечание.
Если вы решили использовать субъект безопасности для проверки подлинности и авторизации с помощью идентификатора Microsoft Entra для доступа к ресурсу Azure, ознакомьтесь с помощью идентификатора Microsoft Entra ID , чтобы убедиться, что субъект безопасности предоставлен достаточное разрешение на доступ к ресурсу Azure.
Подключение настраиваемые свойства spring-cloud-azure-stream-binder-eventhubs:
Свойство | Type | Описание |
---|---|---|
spring.cloud.azure.eventhubs.enabled | boolean | Включена ли Центры событий Azure. |
spring.cloud.azure.eventhubs.connection-string | Строка | Пространство имен Центров событий строка подключения значение. |
spring.cloud.azure.eventhubs.namespace | Строка | Значение пространства имен Центров событий, которое является префиксом полного доменного имени. Полное доменное имя должно состоять из NamespaceName.DomainName |
spring.cloud.azure.eventhubs.domain-name | Строка | Доменное имя значения пространства имен Центры событий Azure. |
spring.cloud.azure.eventhubs.custom-endpoint-address | Строка | Адрес пользовательской конечной точки. |
Совет
Общие параметры конфигурации пакета SDK службы Azure можно настроить для привязки Центров событий Azure Stream Spring Cloud. Поддерживаемые параметры конфигурации представлены в конфигурации Spring Cloud Azure и могут быть настроены с помощью единого spring.cloud.azure.eventhubs.
префикса или префиксаspring.cloud.azure.
.
Привязка также поддерживает Spring Could Azure Resource Manager по умолчанию. Дополнительные сведения о том, как получить строка подключения с субъектами безопасности, которые не предоставляются со Data
связанными ролями, см. в разделе "Базовое использование" Spring Could Azure Resource Manager.
Свойства конфигурации контрольной точки
В этом разделе содержатся параметры конфигурации службы служба хранилища BLOB-объектов, которая используется для сохранения владельца секции и проверка точек.
Примечание.
В версии 4.0.0, когда свойство spring.cloud.azure.eventhubs.processor.проверкаpoint-store.create-container-if-not-exists не включен вручную, контейнер служба хранилища не будет создан автоматически с именем spring.cloud.stream.bindings.bindings.binding-name.destination.
Контрольные точки настраиваемых свойств spring-cloud-azure-stream-binder-eventhubs:
Свойство | Type | Описание |
---|---|---|
spring.cloud.azure.eventhubs.processor. проверка point-store.create-container-if-not-exists | Логический | Разрешить ли создание контейнеров, если они отсутствуют. |
spring.cloud.azure.eventhubs.processor. проверка point-store.account-name | Строка | Имя учетной записи хранения. |
spring.cloud.azure.eventhubs.processor. проверка point-store.account-key | Строка | ключ доступа к учетной записи хранения; |
spring.cloud.azure.eventhubs.processor. проверка point-store.container-name | Строка | служба хранилища имя контейнера. |
Совет
Общие параметры конфигурации пакета SDK службы Azure можно настроить для хранилища служба хранилища BLOB-объектов проверка point. Поддерживаемые параметры конфигурации представлены в конфигурации Spring Cloud Azure и могут быть настроены с помощью единого spring.cloud.azure.eventhubs.processor.checkpoint-store
префикса или префиксаspring.cloud.azure.
.
свойства конфигурации привязки Центры событий Azure
Следующие параметры разделены на четыре раздела: свойства потребителей, расширенные конфигурации потребителей, свойства производителя и расширенные конфигурации производителя.
Свойства потребителя
Эти свойства предоставляются через EventHubsConsumerProperties
.
Настраиваемые свойства потребителей spring-cloud-azure-stream-binder-eventhubs:
Свойство | Type | Описание |
---|---|---|
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.проверкаpoint.mode | Контрольная точкаMode | Режим контрольной точки, используемый, когда потребитель решает, как проверка point message |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.проверкаpoint.count | Целое | Определяет объем сообщения для каждой секции, чтобы сделать одну проверка точку. Вступают в силу только в том случае, если PARTITION_COUNT используется режим проверка point. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.проверкаpoint.interval | Длительность | Определяет интервал времени для выполнения одной проверка точки. Вступают в силу только в том случае, если TIME используется режим проверка point. |
spring.cloud.stream.eventhubs.bindings.<binding-name.consumer.batch.max-size | Целое | Максимальное количество событий в пакете. Требуется для режима пакетного потребителя. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time | Длительность | Максимальная длительность использования пакетной службы. Вступают в силу только в том случае, если включен режим пакетного потребителя и является необязательным. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balanceing.update-interval | Длительность | Длительность интервала обновления. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balanceing.strategy | LoadBalancingStrategy | Стратегия балансировки нагрузки. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balanceing.partition-ownership-expiration-interval | Длительность | Срок действия, после которого истекает срок владения секцией. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties | Логический | Должен ли обработчик событий запрашивать сведения о последнем заквеченном событии в связанной секции и отслеживать эти сведения по мере получения событий. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count | Целое | Число, используемое потребителем для управления числом событий, которые потребитель Концентратора событий будет активно получать и очереди локально. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position | Сопоставление с ключом в качестве идентификатора секции и значений StartPositionProperties |
Карта, содержащая позицию события, используемую для каждой секции, если точка проверка для секции не существует в хранилище проверка point. Эта карта будет ключом от идентификатора секции. |
Примечание.
Конфигурация initial-partition-event-position
принимает начальную map
позицию для каждого концентратора событий. Таким образом, его ключом является идентификатор секции, а значение состоит из StartPositionProperties
свойств смещения, порядкового номера, заквеченного времени даты и того, является ли включено. Например, его можно задать как
spring:
cloud:
stream:
eventhubs:
bindings:
<binding-name>:
consumer:
initial-partition-event-position:
0:
offset: earliest
1:
sequence-number: 100
2:
enqueued-date-time: 2022-01-12T13:32:47.650005Z
4:
inclusive: false
Расширенная конфигурация потребителя
Приведенное выше подключение, проверка point и общая настройка конфигурации клиента Azure SDK для каждого потребителя привязки, которую можно настроить с префиксомspring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.
.
Свойства производителя
Эти свойства предоставляются через EventHubsProducerProperties
.
Настраиваемые свойства spring-cloud-azure-stream-binder-eventhubs:
Свойство | Type | Описание |
---|---|---|
spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync | boolean | Флаг переключателя для синхронизации производителя. Если задано значение true, производитель ожидает ответа после операции отправки. |
spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout | длинный | Время ожидания ответа после операции отправки. Вступают в силу только в том случае, если производитель синхронизации включен. |
Расширенная конфигурация производителя
Приведенное выше подключение и общая настройка конфигурации клиента Azure SDK для каждого производителя привязки, которую можно настроить с префиксомspring.cloud.stream.eventhubs.bindings.<binding-name>.producer.
.
Базовое использование
Отправка и получение сообщений из центров событий
Заполните параметры конфигурации учетными данными.
Для учетных данных в качестве строка подключения настройте следующие свойства в файле application.yml:
spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Для учетных данных в качестве субъекта-службы настройте следующие свойства в файле application.yml :
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Примечание.
Допустимые значенияtenant-id
: common
, organizations
consumers
или идентификатор клиента. Дополнительные сведения об этих значениях см. в разделе "Использована неправильная конечная точка (личные и учетные записи организации) в разделе "Ошибка" AADSTS50020 . Учетная запись пользователя от поставщика удостоверений не существует в клиенте. Сведения о преобразовании приложения с одним клиентом см. в разделе "Преобразование однотенантного приложения в мультитенантный" в идентификатор Microsoft Entra.
Для учетных данных в качестве управляемых удостоверений настройте следующие свойства в файле application.yml :
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Определение поставщика и потребителя.
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload(), message.getHeaders().get(EventHubsHeaders.PARTITION_KEY), message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER), message.getHeaders().get(EventHubsHeaders.OFFSET), message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME) ); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
Поддержка секционирования
Для PartitionSupplier
настройки сведений о секции, предоставленных пользователем, создается настройка сведений о секционированиях для отправки сообщения. В следующей блок-схеме показано, как получить различные приоритеты для идентификатора секции и ключа:
Поддержка потребителей пакетной службы
Укажите параметры пакетной конфигурации, как показано в следующем примере:
spring: cloud: function: definition: consume stream: bindings: consume-in-0: destination: ${AZURE_EVENTHUB_NAME} group: ${AZURE_EVENTHUB_CONSUMER_GROUP} consumer: batch-mode: true eventhubs: bindings: consume-in-0: consumer: batch: max-batch-size: 10 # Required for batch-consumer mode max-wait-time: 1m # Optional, the default value is null checkpoint: mode: BATCH # or MANUAL as needed
Определение поставщика и потребителя.
Для режима проверка назначения
BATCH
как можно использовать следующий код для отправки сообщений и использования в пакетах.@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }
Для режима проверка назначения
MANUAL
как можно использовать следующий код для отправки сообщений и использования и проверка point в пакетах.@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }
Примечание.
В пакетном режиме используется application/json
тип содержимого по умолчанию привязки Spring Cloud Stream, поэтому убедитесь, что полезные данные сообщения соответствуют типу контента. Например, при использовании типа application/json
контента по умолчанию для получения сообщений с String
полезными данными полезные данные должны быть JSON String
окружены двойными кавычками исходного String
текста. В то время как для text/plain
типа контента он может быть объектом напрямую String
. Дополнительные сведения см. в разделе "Согласование типов контента Spring Cloud Stream".
Реагирование на сообщения об ошибках
Обработка сообщений об ошибках исходящей привязки
По умолчанию Spring Integration создает глобальный канал
errorChannel
ошибок. Настройте следующую конечную точку сообщения для обработки исходящих сообщений об ошибках привязки:@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }
Обработка сообщений об ошибках входящего привязки
Binder Центров событий Spring Cloud Stream поддерживает два решения для обработки ошибок для привязок входящих сообщений: пользовательских каналов ошибок и обработчиков.
Канал ошибок:
Spring Cloud Stream предоставляет канал ошибок для каждой входящей привязки. Отправляется
ErrorMessage
в канал ошибок. Дополнительные сведения см. в документации по Spring Cloud Stream с обработкой ошибок .Канал ошибок по умолчанию
Глобальный канал ошибок с именем
errorChannel
можно использовать для использования всех входящих сообщений об ошибках привязки. Чтобы обработать эти сообщения, настройте следующую конечную точку сообщения:@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling inbound binding error: " + message); }
Канал ошибок, зависящий от привязки
Вы можете использовать определенный канал ошибок для использования определенных сообщений об ошибках входящего трафика с более высоким приоритетом, чем канал ошибок по умолчанию. Чтобы обработать эти сообщения, настройте следующую конечную точку сообщения:
// Replace destination with spring.cloud.stream.bindings.<input-binding-name>.destination // Replace group with spring.cloud.stream.bindings.<input-binding-name>.group @ServiceActivator(inputChannel = "{destination}.{group}.errors") public void handleError(ErrorMessage message) { LOGGER.error("Handling inbound binding error: " + message); }
Примечание.
Канал ошибок, зависящий от привязки, является взаимоисключающим с другими предоставленными обработчиками ошибок и каналами.
Обработчик ошибок:
Spring Cloud Stream предоставляет механизм для предоставления пользовательского обработчика ошибок путем добавления
Consumer
экземпляраErrorMessage
, который принимает экземпляры. Дополнительные сведения см . в документации по Spring Cloud Stream об обработке ошибок.Примечание.
При настройке любого обработчика ошибок привязки он может работать с каналом ошибок по умолчанию.
Обработчик ошибок по умолчанию привязки
Настройте одну
Consumer
бобовую строку для использования всех сообщений об ошибках входящего привязки. Следующая функция по умолчанию подписывается на каждый канал ошибок входящего привязки:@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
Кроме того, необходимо задать
spring.cloud.stream.default.error-handler-definition
для свойства имя функции.Обработчик ошибок, зависящих от привязки
Consumer
Настройте bean для использования определенных сообщений об ошибках входящего привязки. Следующая функция подписывается на определенный канал ошибок входящего трафика и имеет более высокий приоритет, чем обработчик ошибок по умолчанию привязки:@Bean public Consumer<ErrorMessage> myErrorHandler() { return message -> { // consume the error message }; }
Кроме того, необходимо задать
spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition
для свойства имя функции.
Заголовки сообщений Центров событий
Основные поддерживаемые заголовки сообщений см. в разделе заголовков сообщений Центров событий Spring Cloud поддержка Azure для Spring Integration.
Поддержка нескольких привязок
Подключение в несколько пространств имен Центров событий также поддерживается с помощью нескольких привязок. В этом примере в качестве примера используется строка подключения. Также поддерживаются учетные данные субъектов-служб и управляемых удостоверений. Связанные свойства можно задать в параметрах среды привязки.
Чтобы использовать несколько привязок с Центрами событий, настройте следующие свойства в файле application.yml :
spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${EVENTHUB_NAME_01} group: ${CONSUMER_GROUP_01} supply1-out-0: destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE} consume2-in-0: binder: eventhub-2 destination: ${EVENTHUB_NAME_02} group: ${CONSUMER_GROUP_02} supply2-out-0: binder: eventhub-2 destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE} binders: eventhub-1: type: eventhubs default-candidate: true environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_01} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhub-2: type: eventhubs default-candidate: false environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_02} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhubs: bindings: consume1-in-0: consumer: checkpoint: mode: MANUAL consume2-in-0: consumer: checkpoint: mode: MANUAL poller: initial-delay: 0 fixed-delay: 1000
Примечание.
В предыдущем файле приложения показано, как настроить один опрашиватель по умолчанию для приложения для всех привязок. Если вы хотите настроить опрос для определенной привязки, можно использовать такую конфигурацию, как
spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000
.Нам нужно определить двух поставщиков и двух потребителей:
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; }
Подготовка ресурсов
Привязка Центров событий поддерживает подготовку концентратора событий и группы потребителей, пользователи могут использовать следующие свойства для включения подготовки.
spring:
cloud:
azure:
credential:
tenant-id: <tenant>
profile:
subscription-id: ${AZURE_SUBSCRIPTION_ID}
eventhubs:
resource:
resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}
Примечание.
Допустимые значенияtenant-id
: common
, organizations
consumers
или идентификатор клиента. Дополнительные сведения об этих значениях см. в разделе "Использована неправильная конечная точка (личные и учетные записи организации) в разделе "Ошибка" AADSTS50020 . Учетная запись пользователя от поставщика удостоверений не существует в клиенте. Сведения о преобразовании приложения с одним клиентом см. в разделе "Преобразование однотенантного приложения в мультитенантный" в идентификатор Microsoft Entra.
Примеры
Дополнительные сведения см. в репозитории azure-spring-boot-samples на сайте GitHub.
Spring Cloud Stream Binder для Служебной шины Azure
Основные понятия
Привязка Spring Cloud Stream Binder для Служебная шина Azure предоставляет реализацию привязки для Spring Cloud Stream Framework. Эта реализация использует адаптеры канала Spring Integration служебная шина в своей основе.
Запланированное сообщение
Эта привязка поддерживает отправку сообщений в раздел для отложенной обработки. Пользователи могут отправлять запланированные сообщения с заголовком x-delay
в миллисекундах время задержки сообщения. Сообщение будет доставлено в соответствующие разделы после x-delay
миллисекунда.
Группа потребителей
служебная шина тема обеспечивает аналогичную поддержку группы потребителей, как Apache Kafka, но с небольшой другой логикой.
Этот привязчик зависит от Subscription
раздела, который будет выступать в качестве группы потребителей.
Настройка зависимостей
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>
Кроме того, можно использовать Azure Stream Spring Cloud служебная шина Starter, как показано в следующем примере для Maven:
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>
Настройка
Привязка предоставляет следующие две части параметров конфигурации:
свойства конфигурации Подключение ion
В этом разделе содержатся параметры конфигурации, используемые для подключения к Служебная шина Azure.
Примечание.
Если вы решили использовать субъект безопасности для проверки подлинности и авторизации с помощью идентификатора Microsoft Entra для доступа к ресурсу Azure, ознакомьтесь с помощью идентификатора Microsoft Entra ID , чтобы убедиться, что субъект безопасности предоставлен достаточное разрешение на доступ к ресурсу Azure.
Подключение настраиваемые свойства spring-cloud-azure-stream-binder-servicebus:
Свойство | Type | Описание |
---|---|---|
spring.cloud.azure.servicebus.enabled | boolean | Включена ли Служебная шина Azure. |
spring.cloud.azure.servicebus.connection-string | Строка | служебная шина значение пространства имен строка подключения. |
Spring.cloud.azure.servicebus.namespace | Строка | служебная шина значение пространства имен, которое является префиксом полного доменного имени. Полное доменное имя должно состоять из NamespaceName.DomainName |
spring.cloud.azure.servicebus.domain-name | Строка | Доменное имя значения пространства имен Служебная шина Azure. |
Примечание.
Общие параметры конфигурации пакета SDK службы Azure можно настроить для привязки Azure Stream Spring Cloud служебная шина. Поддерживаемые параметры конфигурации представлены в конфигурации Spring Cloud Azure и могут быть настроены с помощью единого spring.cloud.azure.servicebus.
префикса или префиксаspring.cloud.azure.
.
Привязка также поддерживает Spring Could Azure Resource Manager по умолчанию. Дополнительные сведения о том, как получить строка подключения с субъектами безопасности, которые не предоставляются со Data
связанными ролями, см. в разделе "Базовое использование" Spring Could Azure Resource Manager.
свойства конфигурации привязки Служебная шина Azure
Следующие параметры разделены на четыре раздела: свойства потребителей, расширенные конфигурации потребителей, свойства производителя и расширенные конфигурации производителя.
Свойства потребителя
Эти свойства предоставляются через ServiceBusConsumerProperties
.
Настраиваемые свойства spring-cloud-azure-stream-binder-servicebus:
Свойство | Тип | По умолчанию. | Description |
---|---|---|---|
spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected | boolean | false | Если неудачные сообщения перенаправляются в DLQ. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls | Целое | 1 | Максимальное число одновременных сообщений, которые должен обрабатывать клиент обработчика служебная шина. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-session | Целое | null | Максимальное количество одновременных сеансов для обработки в любое время. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-enabled | Логический | null | Включен ли сеанс. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count | Целое | 0 | Число предварительных выборок клиента процессора служебная шина. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-queue | SubQueue | ничего | Тип вложенной очереди для подключения. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration | Длительность | 5 м | Время для продолжения автоматического продления блокировки. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode | ServiceBusReceiveMode | peek_lock | Режим получения клиента процессора служебная шина. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.auto-complete | Логический | true | Следует ли автоматически урегулировать сообщения. Если задано значение false, заголовок Checkpointer сообщения будет добавлен, чтобы разработчики могли вручную отрегулировать сообщения. |
Расширенная конфигурация потребителя
Приведенное выше подключение и общая настройка конфигурации клиента Azure SDK для каждого потребителя привязки, которую можно настроить с префиксомspring.cloud.stream.servicebus.bindings.<binding-name>.consumer.
.
Свойства производителя
Эти свойства предоставляются через ServiceBusProducerProperties
.
Настраиваемые свойства spring-cloud-azure-stream-binder-servicebus:
Свойство | Тип | По умолчанию. | Description |
---|---|---|---|
spring.cloud.stream.servicebus.bindings.binding-name.producer.sync | boolean | false | Переключение флага для синхронизации производителя. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout | длинный | 10000 | Значение времени ожидания для отправки производителя. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type | ServiceBusEntityType | null | служебная шина тип сущности производителя, необходимый для производителя привязки. |
Важно!
При использовании производителя привязки spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type
необходимо настроить свойство.
Расширенная конфигурация производителя
Приведенное выше подключение и общая настройка конфигурации клиента Azure SDK для каждого производителя привязки, которую можно настроить с префиксомspring.cloud.stream.servicebus.bindings.<binding-name>.producer.
.
Базовое использование
Отправка и получение сообщений из служебная шина
Заполните параметры конфигурации учетными данными.
Для учетных данных в качестве строка подключения настройте следующие свойства в файле application.yml:
spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Для учетных данных в качестве субъекта-службы настройте следующие свойства в файле application.yml :
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Примечание.
Допустимые значенияtenant-id
: common
, organizations
consumers
или идентификатор клиента. Дополнительные сведения об этих значениях см. в разделе "Использована неправильная конечная точка (личные и учетные записи организации) в разделе "Ошибка" AADSTS50020 . Учетная запись пользователя от поставщика удостоверений не существует в клиенте. Сведения о преобразовании приложения с одним клиентом см. в разделе "Преобразование однотенантного приложения в мультитенантный" в идентификатор Microsoft Entra.
Для учетных данных в качестве управляемых удостоверений настройте следующие свойства в файле application.yml :
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Определение поставщика и потребителя.
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}'", message.getPayload()); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
Поддержка ключа секции
Привязка поддерживает секционирование служебная шина путем задания ключа секции и идентификатора сеанса в заголовке сообщения. В этом разделе описывается настройка ключа секции для сообщений.
Spring Cloud Stream предоставляет свойство spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression
выражения выражения spEL ключа раздела. Например, присвойите этому свойству "'partitionKey-' + headers[<message-header-key>]"
значение и добавьте заголовок с именем message-header-key. Spring Cloud Stream использует значение этого заголовка при оценке выражения для назначения ключа секции. В следующем коде представлен пример производителя:
@Bean
public Supplier<Message<String>> generate() {
return () -> {
String value = "random payload";
return MessageBuilder.withPayload(value)
.setHeader("<message-header-key>", value.length() % 4)
.build();
};
}
Поддержка сеансов
Привязка поддерживает сеансы сообщений служебная шина. Идентификатор сеанса сообщения можно задать с помощью заголовка сообщения.
@Bean
public Supplier<Message<String>> generate() {
return () -> {
String value = "random payload";
return MessageBuilder.withPayload(value)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
.build();
};
}
Примечание.
Согласно служебная шина секционирование, идентификатор сеанса имеет более высокий приоритет, чем ключ секции. Поэтому при установке обоих ServiceBusMessageHeaders#SESSION_ID
заголовков ServiceBusMessageHeaders#PARTITION_KEY
значение идентификатора сеанса в конечном итоге используется для перезаписи значения ключа секции.
Реагирование на сообщения об ошибках
Обработка сообщений об ошибках исходящей привязки
По умолчанию Spring Integration создает глобальный канал
errorChannel
ошибок. Настройте следующую конечную точку сообщения для обработки сообщения об ошибке исходящей привязки.@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }
Обработка сообщений об ошибках входящего привязки
Spring Cloud Stream служебная шина Binder поддерживает три решения для обработки ошибок для привязок входящих сообщений: обработчик ошибок привязки, настраиваемых каналов ошибок и обработчиков.
Обработчик ошибок привязки:
Обработчик ошибок привязки по умолчанию обрабатывает входящий привязку. Этот обработчик используется для отправки неудачных сообщений в очередь недоставленных писем при
spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected
включении. В противном случае не удалось отказаться от сообщений. За исключением настройки канала ошибок, зависящем от привязки, обработчик ошибок привязки всегда действует независимо от того, существуют ли другие пользовательские обработчики ошибок или каналы.Канал ошибок:
Spring Cloud Stream предоставляет канал ошибок для каждой входящей привязки. Отправляется
ErrorMessage
в канал ошибок. Дополнительные сведения см. в документации по Spring Cloud Stream с обработкой ошибок .Канал ошибок по умолчанию
Глобальный канал ошибок с именем
errorChannel
можно использовать для использования всех входящих сообщений об ошибках привязки. Чтобы обработать эти сообщения, настройте следующую конечную точку сообщения:@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling inbound binding error: " + message); }
Канал ошибок, зависящий от привязки
Вы можете использовать определенный канал ошибок для использования определенных сообщений об ошибках входящего трафика с более высоким приоритетом, чем канал ошибок по умолчанию. Чтобы обработать эти сообщения, настройте следующую конечную точку сообщения:
// Replace destination with spring.cloud.stream.bindings.<input-binding-name>.destination // Replace group with spring.cloud.stream.bindings.<input-binding-name>.group @ServiceActivator(inputChannel = "{destination}.{group}.errors") public void handleError(ErrorMessage message) { LOGGER.error("Handling inbound binding error: " + message); }
Примечание.
Канал ошибок, зависящий от привязки, является взаимоисключающим с другими предоставленными обработчиками ошибок и каналами.
Обработчик ошибок:
Spring Cloud Stream предоставляет механизм для предоставления пользовательского обработчика ошибок путем добавления
Consumer
экземпляраErrorMessage
, который принимает экземпляры. Дополнительные сведения см . в документации по Spring Cloud Stream об обработке ошибок.Примечание.
При настройке любого обработчика ошибок привязки он может работать с каналом ошибок по умолчанию и обработчиком ошибок привязки.
Обработчик ошибок по умолчанию привязки
Настройте одну
Consumer
бобовую строку для использования всех сообщений об ошибках входящего привязки. Следующая функция по умолчанию подписывается на каждый канал ошибок входящего привязки:@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
Кроме того, необходимо задать
spring.cloud.stream.default.error-handler-definition
для свойства имя функции.Обработчик ошибок, зависящих от привязки
Consumer
Настройте bean для использования определенных сообщений об ошибках входящего привязки. Следующая функция подписывается на определенный канал ошибок входящего трафика с более высоким приоритетом, чем обработчик ошибок по умолчанию привязки.@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
Кроме того, необходимо задать
spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition
для свойства имя функции.
заголовки сообщений служебная шина
Основные поддерживаемые заголовки сообщений см. в разделе заголовков сообщений служебная шина Spring Cloud поддержка Azure для Spring Integration.
Примечание.
При настройке ключа секции приоритет заголовка сообщения выше свойства Spring Cloud Stream. Поэтому spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression
вступает в силу только в том случае, если ни один из ServiceBusMessageHeaders#SESSION_ID
ServiceBusMessageHeaders#PARTITION_KEY
заголовков не настроен.
Поддержка нескольких привязок
Подключение в несколько пространств имен служебная шина также поддерживается с помощью нескольких привязок. Этот пример принимает строка подключения в качестве примера. Учетные данные субъектов-служб и управляемых удостоверений также поддерживаются, пользователи могут задавать связанные свойства в параметрах среды каждого привязки.
Чтобы использовать несколько привязок ServiceBus, настройте следующие свойства в файле application.yml :
spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${SERVICEBUS_TOPIC_NAME} group: ${SUBSCRIPTION_NAME} supply1-out-0: destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE} consume2-in-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME} supply2-out-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE} binders: servicebus-1: type: servicebus default-candidate: true environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING} servicebus-2: type: servicebus default-candidate: false environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING} servicebus: bindings: consume1-in-0: consumer: auto-complete: false supply1-out-0: producer: entity-type: topic consume2-in-0: consumer: auto-complete: false supply2-out-0: producer: entity-type: queue poller: initial-delay: 0 fixed-delay: 1000
Примечание.
В предыдущем файле приложения показано, как настроить один опрашиватель по умолчанию для приложения для всех привязок. Если вы хотите настроить опрос для определенной привязки, можно использовать такую конфигурацию, как
spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000
.нам нужно определить двух поставщиков и двух потребителей
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; }
Подготовка ресурсов
Привязка служебной шины поддерживает подготовку очередей, раздела и подписки, пользователи могут использовать следующие свойства для включения подготовки.
spring:
cloud:
azure:
credential:
tenant-id: <tenant>
profile:
subscription-id: ${AZURE_SUBSCRIPTION_ID}
servicebus:
resource:
resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
stream:
servicebus:
bindings:
<binding-name>:
consumer:
entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}
Примечание.
Допустимые значенияtenant-id
: common
, organizations
consumers
или идентификатор клиента. Дополнительные сведения об этих значениях см. в разделе "Использована неправильная конечная точка (личные и учетные записи организации) в разделе "Ошибка" AADSTS50020 . Учетная запись пользователя от поставщика удостоверений не существует в клиенте. Сведения о преобразовании приложения с одним клиентом см. в разделе "Преобразование однотенантного приложения в мультитенантный" в идентификатор Microsoft Entra.
Примеры
Дополнительные сведения см. в репозитории azure-spring-boot-samples на сайте GitHub.
Обратная связь
https://aka.ms/ContentUserFeedback.
Ожидается в ближайшее время: в течение 2024 года мы постепенно откажемся от GitHub Issues как механизма обратной связи для контента и заменим его новой системой обратной связи. Дополнительные сведения см. в разделеОтправить и просмотреть отзыв по