Spring Cloud Azure support for Spring Cloud Stream
Članak
This article applies to: ✅ Version 4.19.0 ✅ Version 5.20.1
Spring Cloud Stream is a framework for building highly scalable event-driven microservices connected with shared messaging systems.
The framework provides a flexible programming model built on already established and familiar Spring idioms and best practices. These best practices include support for persistent pub/sub semantics, consumer groups, and stateful partitions.
The Spring Cloud Stream Binder for Azure Event Hubs provides the binding implementation for the Spring Cloud Stream framework.
This implementation uses Spring Integration Event Hubs Channel Adapters at its foundation. From design's perspective,
Event Hubs is similar as Kafka. Also, Event Hubs could be accessed via Kafka API. If your project has tight dependency
on Kafka API, you can try Events Hub with Kafka API Sample
Consumer group
Event Hubs provides similar support of consumer group as Apache Kafka, but with slight different logic. While Kafka stores all committed offsets in the broker, you have to store offsets of Event Hubs messages being processed manually. Event Hubs SDK provides the function to store such offsets inside Azure Storage.
Partitioning support
Event Hubs provides a similar concept of physical partition as Kafka. But unlike Kafka's auto rebalancing between consumers and partitions, Event Hubs provides a kind of preemptive mode. The storage account acts as a lease to determine which consumer owns which partition. When a new consumer starts, it tries to steal some partitions from the most heavily loaded consumers to achieve the workload balance.
To specify the load balancing strategy, properties of spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.* are provided. For more information, see the Consumer properties section.
To work with the batch-consumer mode, set the spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode property to true. When enabled, a message with a payload of a list of batched events is received and passed to the Consumer function. Each message header is also converted to a list, of which the content is the associated header value parsed from each event. The communal headers of partition ID, checkpointer, and last enqueued properties are presented as a single value because the entire batch of events shares the same value. For more information, see the Event Hubs message headers section of Spring Cloud Azure support for Spring Integration.
Napomena
The checkpoint header only exists when the MANUAL checkpoint mode is used.
Checkpointing of batch consumer supports two modes: BATCH and MANUAL. BATCH mode is an auto checkpointing mode to checkpoint the entire batch of events together once the binder receives them. MANUAL mode is to checkpoint the events by users. When used, the Checkpointer is passed into the message header, and users could use it to do checkpointing.
You can specify the batch size by setting the max-size and max-wait-time properties that have a prefix of spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch.. The max-size property is necessary and the max-wait-time property is optional. For more information, see the Consumer properties section.
The binder provides the following three parts of configuration options:
Connection configuration properties
This section contains the configuration options used for connecting to Azure Event Hubs.
Napomena
If you choose to use a security principal to authenticate and authorize with Microsoft Entra ID for accessing an Azure resource, see Authorize access with Microsoft Entra ID to make sure the security principal has been granted the sufficient permission to access the Azure resource.
Connection configurable properties of spring-cloud-azure-stream-binder-eventhubs:
Property
Type
Description
spring.cloud.azure.eventhubs.enabled
boolean
Whether an Azure Event Hubs is enabled.
spring.cloud.azure.eventhubs.connection-string
String
Event Hubs Namespace connection string value.
spring.cloud.azure.eventhubs.namespace
String
Event Hubs Namespace value, which is the prefix of the FQDN. A FQDN should be composed of NamespaceName.DomainName
spring.cloud.azure.eventhubs.domain-name
String
Domain name of an Azure Event Hubs Namespace value.
Common Azure Service SDK configuration options are configurable for the Spring Cloud Azure Stream Event Hubs binder as well. The supported configuration options are introduced in Spring Cloud Azure configuration, and could be configured with either the unified prefix spring.cloud.azure. or the prefix of spring.cloud.azure.eventhubs..
This section contains the configuration options for the Storage Blobs service, which is used for persisting partition ownership and checkpoint information.
Napomena
From version 4.0.0, when the property of spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists isn't enabled manually, no Storage container will be created automatically with the name from spring.cloud.stream.bindings.binding-name.destination.
Checkpointing configurable properties of spring-cloud-azure-stream-binder-eventhubs:
Common Azure Service SDK configuration options are configurable for Storage Blob checkpoint store as well. The supported configuration options are introduced in Spring Cloud Azure configuration, and could be configured with either the unified prefix spring.cloud.azure. or the prefix of spring.cloud.azure.eventhubs.processor.checkpoint-store.
Azure Event Hubs Binding configuration properties
The following options are divided into four sections: Consumer Properties, Advanced Consumer Configurations, Producer Properties and Advanced Producer Configurations.
Consumer properties
These properties are exposed via EventHubsConsumerProperties.
Napomena
To avoid repetition, since version 4.19.0 and 5.20.1, Spring Cloud Azure Stream Binder Event Hubs supports setting values for all channels, in the format of spring.cloud.stream.eventhubs.default.consumer.<property>=<value>.
Consumer configurable properties of spring-cloud-azure-stream-binder-eventhubs:
Whether the event processor should request information on the last enqueued event on its associated partition, and track that information as events are received.
Map with the key as the partition ID, and values of StartPositionProperties
The map containing the event position to use for each partition if a checkpoint for the partition does not exist in checkpoint store. This map is keyed off of the partition ID.
Napomena
The initial-partition-event-position configuration accepts a map to specify the initial position for each event hub. Thus, its key is the partition ID, and the value is of StartPositionProperties, which includes properties of offset, sequence number, enqueued date time and whether inclusive. For example, you can set it as
The above connection, checkpoint, and common Azure SDK client configuration support customization for each binder consumer, which you can configure with the prefix spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer..
Producer properties
These properties are exposed via EventHubsProducerProperties.
Napomena
To avoid repetition, since version 4.19.0 and 5.20.1, Spring Cloud Azure Stream Binder Event Hubs supports setting values for all channels, in the format of spring.cloud.stream.eventhubs.default.producer.<property>=<value>.
Producer configurable properties of spring-cloud-azure-stream-binder-eventhubs:
The amount of time to wait for a response after a send operation. Will take effect only when a sync producer is enabled.
Advanced producer configuration
The above connection and common Azure SDK client configuration support customization for each binder producer, which you can configure with the prefix spring.cloud.stream.eventhubs.bindings.<binding-name>.producer..
Basic usage
Sending and receiving messages from/to Event Hubs
Fill the configuration options with credential information.
For credentials as connection string, configure the following properties in your application.yml file:
Microsoft recommends using the most secure authentication flow available. The authentication flow described in this procedure, such as for databases, caches, messaging, or AI services, requires a very high degree of trust in the application and carries risks not present in other flows. Use this flow only when more secure options, like managed identities for passwordless or keyless connections, are not viable. For local machine operations, prefer user identities for passwordless or keyless connections.
For credentials as service principal, configure the following properties in your application.yml file:
A PartitionSupplier with user-provided partition information is created to configure the partition information about the message to be sent. The following flowchart shows the process of obtaining different priorities for the partition ID and key:
Batch consumer support
Provide the batch configuration options, as shown in the following example:
YAML
spring: cloud: function: definition:consume stream: bindings: consume-in-0: destination:${AZURE_EVENTHUB_NAME} group:${AZURE_EVENTHUB_CONSUMER_GROUP} consumer: batch-mode:true eventhubs: bindings: consume-in-0: consumer: batch: max-batch-size:10# Required for batch-consumer mode max-wait-time:1m# Optional, the default value is null checkpoint: mode:BATCH# or MANUAL as needed
Define supplier and consumer.
For checkpointing mode as BATCH, you can use the following code to send messages and consume in batches.
In the batch-consuming mode, the default content type of Spring Cloud Stream binder is application/json, so make sure the message payload is aligned with the content type. For example, when using the default content type of application/json to receive messages with String payload, the payload should be JSON String, surrounded with double quotes for the original String text. While for text/plain content type, it can be a String object directly. For more information, see Spring Cloud Stream Content Type Negotiation.
By default, Spring Integration creates a global error channel called errorChannel. Configure the following message endpoint to handle outbound binding error messages.
Spring Cloud Stream Event Hubs Binder supports one solution to handle errors for the inbound message bindings: error handlers.
Error Handler:
Spring Cloud Stream exposes a mechanism for you to provide a custom error handler by adding a Consumer that accepts ErrorMessage instances. For more information, see Handle Error Messages in the Spring Cloud Stream documentation.
Binding-default error handler
Configure a single Consumer bean to consume all inbound binding error messages. The following default function subscribes to each inbound binding error channel:
You also need to set the spring.cloud.stream.default.error-handler-definition property to the function name.
Binding-specific error handler
Configure a Consumer bean to consume the specific inbound binding error messages. The following function subscribes to the specific inbound binding error channel and has a higher priority than the binding-default error handler:
You also need to set the spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition property to the function name.
Handle outbound binding error messages
By default, Spring Integration creates a global error channel called errorChannel. Configure the following message endpoint to handle outbound binding error messages:
Spring Cloud Stream Event Hubs Binder supports two solutions to handle errors for the inbound message bindings: custom error channels and handlers.
Error channel:
Spring Cloud Stream provides an error channel for each inbound binding. An ErrorMessage is sent to the error channel. For more information, see Handling Errors in the Spring Cloud Stream documentation.
Default error channel
You can use a global error channel named errorChannel to consume all inbound binding error messages. To handle these messages, configure the following message endpoint:
You can use a specific error channel to consume the specific inbound binding error messages with a higher priority than the default error channel. To handle these messages, configure the following message endpoint:
Java
// Replace destination with spring.cloud.stream.bindings.<input-binding-name>.destination// Replace group with spring.cloud.stream.bindings.<input-binding-name>.group@ServiceActivator(inputChannel = "{destination}.{group}.errors")
publicvoidhandleError(ErrorMessage message){
LOGGER.error("Handling inbound binding error: " + message);
}
Napomena
The binding-specific error channel is mutually exclusive with other provided error handlers and channels.
Error Handler:
Spring Cloud Stream exposes a mechanism for you to provide a custom error handler by adding a Consumer that accepts ErrorMessage instances. For more information, see Error Handling in the Spring Cloud Stream documentation.
Napomena
When any binding error handler is configured, it can work with the default error channel.
Binding-default error handler
Configure a single Consumer bean to consume all inbound binding error messages. The following default function subscribes to each inbound binding error channel:
You also need to set the spring.cloud.stream.default.error-handler-definition property to the function name.
Binding-specific error handler
Configure a Consumer bean to consume the specific inbound binding error messages. The following function subscribes to the specific inbound binding error channel and has a higher priority than the binding-default error handler:
Connection to multiple Event Hubs namespaces is also supported by using multiple binders. This sample takes a connection string as example. Credentials of service principals and managed identities are also supported. You can set related properties in each binder's environment settings.
To use multiple binders with Event Hubs, configure the following properties in your application.yml file:
The previous application file shows how to configure a single default poller for application to all bindings. If you want to configure the poller for a specific binding, you can use a configuration such as spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000.
Napomena
Microsoft recommends using the most secure authentication flow available. The authentication flow described in this procedure, such as for databases, caches, messaging, or AI services, requires a very high degree of trust in the application and carries risks not present in other flows. Use this flow only when more secure options, like managed identities for passwordless or keyless connections, are not viable. For local machine operations, prefer user identities for passwordless or keyless connections.
The Spring Cloud Stream Binder for Azure Service Bus provides the binding implementation for the Spring Cloud Stream Framework.
This implementation uses Spring Integration Service Bus Channel Adapters at its foundation.
Scheduled message
This binder supports submitting messages to a topic for delayed processing. Users can send scheduled messages with header x-delay
expressing in milliseconds a delay time for the message. The message will be delivered to the respective topics after x-delay milliseconds.
Consumer group
Service Bus Topic provides similar support of consumer group as Apache Kafka, but with slight different logic.
This binder relies on Subscription of a topic to act as a consumer group.
The binder provides the following two parts of configuration options:
Connection configuration properties
This section contains the configuration options used for connecting to Azure Service Bus.
Napomena
If you choose to use a security principal to authenticate and authorize with Microsoft Entra ID for accessing an Azure resource, see Authorize access with Microsoft Entra ID to make sure the security principal has been granted the sufficient permission to access the Azure resource.
Connection configurable properties of spring-cloud-azure-stream-binder-servicebus:
The custom endpoint address to use when connecting to Service Bus.
spring.cloud.azure.servicebus.namespace
String
Service Bus Namespace value, which is the prefix of the FQDN. A FQDN should be composed of NamespaceName.DomainName
spring.cloud.azure.servicebus.domain-name
String
Domain name of an Azure Service Bus Namespace value.
Napomena
Common Azure Service SDK configuration options are configurable for the Spring Cloud Azure Stream Service Bus binder as well. The supported configuration options are introduced in Spring Cloud Azure configuration, and could be configured with either the unified prefix spring.cloud.azure. or the prefix of spring.cloud.azure.servicebus..
Azure Service Bus binding configuration properties
The following options are divided into four sections: Consumer Properties, Advanced Consumer
Configurations, Producer Properties and Advanced Producer Configurations.
Consumer properties
These properties are exposed via ServiceBusConsumerProperties.
Napomena
To avoid repetition, since version 4.19.0 and 5.20.1, Spring Cloud Azure Stream Binder Service Bus supports setting values for all channels, in the format of spring.cloud.stream.servicebus.default.consumer.<property>=<value>.
Consumer configurable properties of spring-cloud-azure-stream-binder-servicebus:
Whether to settle messages automatically. If set as false, a message header of Checkpointer will be added to enable developers to settle messages manually.
The duration after which the message expires, starting from when the message is sent to Service Bus.
Važno
When you use the Azure Resource Manager (ARM), you must configure the spring.cloud.stream.servicebus.bindings.<binding-name>.consume.entity-type property. For more information, see the servicebus-queue-binder-arm sample on GitHub.
Advanced consumer configuration
The above connection and common Azure SDK client configuration support customization for each binder consumer, which you can configure with the prefix spring.cloud.stream.servicebus.bindings.<binding-name>.consumer..
Producer properties
These properties are exposed via ServiceBusProducerProperties.
Napomena
To avoid repetition, since version 4.19.0 and 5.20.1, Spring Cloud Azure Stream Binder Service Bus supports setting values for all channels, in the format of spring.cloud.stream.servicebus.default.producer.<property>=<value>.
Producer configurable properties of spring-cloud-azure-stream-binder-servicebus:
The duration after which the message expires, starting from when the message is sent to Service Bus.
Važno
When using the binding producer, property of spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type is required to be configured.
Advanced producer configuration
The above connection and common Azure SDK client configuration support customization for each binder producer, which you can configure with the prefix spring.cloud.stream.servicebus.bindings.<binding-name>.producer..
Basic usage
Sending and receiving messages from/to Service Bus
Fill the configuration options with credential information.
For credentials as connection string, configure the following properties in your application.yml file:
YAML
spring: cloud: azure: servicebus: connection-string:${SERVICEBUS_NAMESPACE_CONNECTION_STRING} function: definition:consume;supply stream: bindings: consume-in-0: destination:${SERVICEBUS_ENTITY_NAME}# If you use Service Bus Topic, add the following configuration# group: ${SUBSCRIPTION_NAME} supply-out-0: destination:${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete:false supply-out-0: producer: entity-type:queue# set as "topic" if you use Service Bus Topic
Napomena
Microsoft recommends using the most secure authentication flow available. The authentication flow described in this procedure, such as for databases, caches, messaging, or AI services, requires a very high degree of trust in the application and carries risks not present in other flows. Use this flow only when more secure options, like managed identities for passwordless or keyless connections, are not viable. For local machine operations, prefer user identities for passwordless or keyless connections.
For credentials as service principal, configure the following properties in your application.yml file:
YAML
spring: cloud: azure: credential: client-id:${AZURE_CLIENT_ID} client-secret:${AZURE_CLIENT_SECRET} profile: tenant-id:<tenant> servicebus: namespace:${SERVICEBUS_NAMESPACE} function: definition:consume;supply stream: bindings: consume-in-0: destination:${SERVICEBUS_ENTITY_NAME}# If you use Service Bus Topic, add the following configuration# group: ${SUBSCRIPTION_NAME} supply-out-0: destination:${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete:false supply-out-0: producer: entity-type:queue# set as "topic" if you use Service Bus Topic
For credentials as managed identities, configure the following properties in your application.yml file:
YAML
spring: cloud: azure: credential: managed-identity-enabled:true client-id:${MANAGED_IDENTITY_CLIENT_ID}# Only needed when using a user-assigned managed identity servicebus: namespace:${SERVICEBUS_NAMESPACE} function: definition:consume;supply stream: bindings: consume-in-0: destination:${SERVICEBUS_ENTITY_NAME}# If you use Service Bus Topic, add the following configuration# group: ${SUBSCRIPTION_NAME} supply-out-0: destination:${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete:false supply-out-0: producer: entity-type:queue# set as "topic" if you use Service Bus Topic
The binder supports Service Bus partitioning by allowing setting partition key and session ID in the message header. This section introduces how to set partition key for messages.
Spring Cloud Stream provides a partition key SpEL expression property spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression. For example, setting this property as "'partitionKey-' + headers[<message-header-key>]" and add a header called message-header-key. Spring Cloud Stream uses the value for this header when evaluating the expression to assign a partition key. The following code provides an example producer:
According to Service Bus partitioning, session ID has higher priority than partition key. So when both of ServiceBusMessageHeaders#SESSION_ID and ServiceBusMessageHeaders#PARTITION_KEY headers are set, the value of the session ID is eventually used to overwrite the value of the partition key.
By default, Spring Integration creates a global error channel called errorChannel. Configure the following message endpoint to handle outbound binding error message.
Spring Cloud Stream Service Bus Binder supports two solutions to handle errors for the inbound message bindings: the binder error handler and handlers.
Binder error handler:
The default binder error handler handles the inbound binding. You use this handler to send failed messages to the dead-letter queue when spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected is enabled. Otherwise, the failed messages are abandoned. The binder error handler is mutually exclusive with other provided error handlers.
Error handler:
Spring Cloud Stream exposes a mechanism for you to provide a custom error handler by adding a Consumer that accepts ErrorMessage instances. For more information, see Handle Error Messages in the Spring Cloud Stream documentation.
Binding-default error handler
Configure a single Consumer bean to consume all inbound binding error messages. The following default function subscribes to each inbound binding error channel:
You also need to set the spring.cloud.stream.default.error-handler-definition property to the function name.
Binding-specific error handler
Configure a Consumer bean to consume the specific inbound binding error messages. The following function subscribes to the specific inbound binding error channel with a higher priority than the binding-default error handler.
You also need to set the spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition property to the function name.
Handle outbound binding error messages
By default, Spring Integration creates a global error channel called errorChannel. Configure the following message endpoint to handle outbound binding error message.
Spring Cloud Stream Service Bus Binder supports three solutions to handle errors for the inbound message bindings: the binder error handler, custom error channels, and handlers.
Binder error handler:
The default binder error handler handles the inbound binding. You use this handler to send failed messages to the dead-letter queue when spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected is enabled. Otherwise, the failed messages are abandoned. Except for configuring the binding-specific error channel, the binder error handler always takes effect regardless of whether there are other custom error handlers or channels.
Error channel:
Spring Cloud Stream provides an error channel for each inbound binding. An ErrorMessage is sent to the error channel. For more information, see Handling Errors in the Spring Cloud Stream documentation.
Default error channel
You can use a global error channel named errorChannel to consume all inbound binding error messages. To handle these messages, configure the following message endpoint:
You can use a specific error channel to consume the specific inbound binding error messages with a higher priority than the default error channel. To handle these messages, configure the following message endpoint:
Java
// Replace destination with spring.cloud.stream.bindings.<input-binding-name>.destination// Replace group with spring.cloud.stream.bindings.<input-binding-name>.group@ServiceActivator(inputChannel = "{destination}.{group}.errors")
publicvoidhandleError(ErrorMessage message){
LOGGER.error("Handling inbound binding error: " + message);
}
Napomena
The binding-specific error channel is mutually exclusive with other provided error handlers and channels.
Error handler:
Spring Cloud Stream exposes a mechanism for you to provide a custom error handler by adding a Consumer that accepts ErrorMessage instances. For more information, see Error Handling in the Spring Cloud Stream documentation.
Napomena
When any binding error handler is configured, it can work with the default error channel and the binder error handler.
Binding-default error handler
Configure a single Consumer bean to consume all inbound binding error messages. The following default function subscribes to each inbound binding error channel:
You also need to set the spring.cloud.stream.default.error-handler-definition property to the function name.
Binding-specific error handler
Configure a Consumer bean to consume the specific inbound binding error messages. The following function subscribes to the specific inbound binding error channel with a higher priority than the binding-default error handler.
When setting the partition key, the priority of message header is higher than Spring Cloud Stream property. So spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression takes effect only when none of the ServiceBusMessageHeaders#SESSION_ID and ServiceBusMessageHeaders#PARTITION_KEY headers are configured.
Multiple binder support
Connection to multiple Service Bus namespaces is also supported by using multiple binders. This sample takes connection string as example. Credentials of service principals and managed identities are also supported, users can set related properties in each binder's environment settings.
To use multiple binders of ServiceBus, configure the following properties in your application.yml file:
The previous application file shows how to configure a single default poller for application to all bindings. If you want to configure the poller for a specific binding, you can use a configuration such as spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000.
Napomena
Microsoft recommends using the most secure authentication flow available. The authentication flow described in this procedure, such as for databases, caches, messaging, or AI services, requires a very high degree of trust in the application and carries risks not present in other flows. Use this flow only when more secure options, like managed identities for passwordless or keyless connections, are not viable. For local machine operations, prefer user identities for passwordless or keyless connections.
Developers can use AzureServiceClientBuilderCustomizer to customize Service Bus Client properties. The following example customizes the sessionIdleTimeout property in ServiceBusClientBuilder:
Pridružite se seriji susreta kako biste s kolegama programerima i stručnjacima izgradili skalabilna rješenja umjetne inteligencije temeljena na stvarnim slučajevima upotrebe.