Use Event Hubs in Spring applications
This article shows you how to use Azure Event Hubs in Java applications built with the Spring Framework.
Azure Event Hubs is a big data streaming platform and event ingestion service. It can receive and process millions of events per second. Data sent to an event hub can be transformed and stored by using any real-time analytics provider or batching/storage adapters.
Spring Cloud Azure provides various modules for sending messages to and receiving messages from Event Hubs using Spring frameworks.
You can use the following modules independently or combine them for different use cases:
- Spring Cloud Azure Event Hubs Starter enables you to send and receive messages with the Event Hubs Java SDK client library with Spring Boot features.
- Spring Messaging Azure Event Hubs enables you to interact with Event Hubs via the Spring Messaging API.
- Spring Integration Azure Event Hubs enables you to connect Spring Integration Message Channels with Event Hubs.
- Spring Cloud Azure Stream Event Hubs Binder enables you to use Event Hubs as messaging middleware in Spring Cloud Stream applications.
- Spring Kafka with Azure Event Hubs enables you to use Spring Kafka to send messages to and receive messages from Event Hubs.
- Spring Cloud Stream Kafka Binder with Azure Event Hubs enables you to send and receive messages via Spring Cloud Stream Kafka Binder with Event Hubs.
Prerequisites
- An Azure subscription - create one for free.
- Java Development Kit (JDK), version 8 or higher.
- An Azure Event Hubs instance. For more information, see Quickstart: Create an event hub using Azure portal.
- An Azure Storage Account for Event Hubs checkpoints. For more information, see Create a storage account.
- A Spring Boot application. If you don't have one, create a Maven project with the Spring Initializr. Remember to select Maven Project and, under Dependencies, add the Spring Web dependency, then select Java version 8 or higher.
Note
To grant your account access to resources, in Azure Event Hubs, assign the Azure Event Hubs Data Receiver and Azure Event Hubs Data Sender roles to the Microsoft Entra account you're currently using. Then, in the Azure Storage account, assign the Storage Blob Data Contributor role to the same Microsoft Entra account. For more information about granting access roles, see Assign Azure roles using the Azure portal and Authorize access to Event Hubs resources using Microsoft Entra ID.
Important
Spring Boot version 2.5 or higher is required to complete the steps in this tutorial.
Prepare your local environment
In this tutorial, the configurations and code don't have any authentication operations. However, connecting to an Azure service requires authentication. To complete the authentication, you need to use the Azure Identity client library. Spring Cloud Azure uses DefaultAzureCredential, which the Azure Identity library provides to help you get credentials without any code changes.
DefaultAzureCredential supports multiple authentication methods and determines which method to use at runtime. This approach enables your app to use different authentication methods in different environments - such as local or production environments - without implementing environment-specific code. For more information, see the DefaultAzureCredential section of Authenticate Azure-hosted Java applications.
To use Azure CLI, IntelliJ, or other methods to complete the authentication in local development environments, see Azure authentication in Java development environments. To complete the authentication in Azure hosting environments, we recommend using managed identity. For more information, see What are managed identities for Azure resources?
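To make the idea of choosing an authentication method at runtime concrete, the following is a simplified, JDK-only model of a credential chain: each source is tried in order, and the first one that yields a token is used. It is not the Azure Identity implementation. The class name, the source names, and the token string are hypothetical stand-ins chosen for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

public class CredentialChainSketch {

    // Returns the name of the first source that yields a token, or empty when none does.
    static Optional<String> firstAvailable(Map<String, Supplier<Optional<String>>> sources) {
        for (Map.Entry<String, Supplier<Optional<String>>> source : sources.entrySet()) {
            if (source.getValue().get().isPresent()) {
                return Optional.of(source.getKey());
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so sources are tried in the order listed.
        // The source names below are hypothetical labels, not Azure Identity class names.
        Map<String, Supplier<Optional<String>>> sources = new LinkedHashMap<>();
        sources.put("environment", Optional::empty);
        sources.put("managed-identity", Optional::empty);
        sources.put("azure-cli", () -> Optional.of("token-from-cli"));

        System.out.println("Selected source: " + firstAvailable(sources).orElse("none"));
    }
}
```

In this model, the same application code selects a developer-tool source on a workstation and a managed identity source in a hosting environment, depending only on which source can produce a token.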
Use Spring Cloud Azure Event Hubs Starter
The Spring Cloud Azure Event Hubs Starter module imports the Event Hubs Java client library with the Spring Boot framework. Spring Cloud Azure and the Azure SDK aren't mutually exclusive, so you can use them together and continue using the Event Hubs Java client API in your Spring application.
Add dependencies
To install the Spring Cloud Azure Event Hubs Starter module, add the following dependencies to your pom.xml file:
The Spring Cloud Azure Bill of Materials (BOM):
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.azure.spring</groupId>
      <artifactId>spring-cloud-azure-dependencies</artifactId>
      <version>5.11.0</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
Note
If you're using Spring Boot 2.x, be sure to set the spring-cloud-azure-dependencies version to 4.17.0. This Bill of Materials (BOM) should be configured in the <dependencyManagement> section of your pom.xml file. This ensures that all Spring Cloud Azure dependencies are using the same version. For more information about the version used for this BOM, see Which Version of Spring Cloud Azure Should I Use.
The Spring Cloud Azure Event Hubs artifact:
<dependency>
  <groupId>com.azure.spring</groupId>
  <artifactId>spring-cloud-azure-starter-eventhubs</artifactId>
</dependency>
Code the application to send and receive messages
This guide teaches you how to use the Event Hubs Java clients in the context of a Spring application. Here, we introduce the following two options:
- Use Spring Boot autoconfiguration and use out-of-the-box clients from the Spring context (recommended).
- Build the client programmatically.
Autowiring client beans from the Spring IoC container has the following advantages, which make developing with Event Hubs clients more flexible and efficient:
- It applies the externalized configuration so that you can work with the same application code in different environments.
- You can leave building the client with the builder pattern and registering it in the application context to the Spring Boot framework. This delegation enables you to focus on how to use the clients for your own business requirements.
- You can use the health indicator to easily inspect the status and health of your application and its internal components.
The following sections provide code examples that show you how to use EventProcessorClient and EventHubProducerClient with the two alternatives.
Note
Azure Java SDK for Event Hubs provides multiple clients to interact with Event Hubs. The starter also provides autoconfiguration for all the Event Hubs clients as well as client builders. This article uses only EventProcessorClient and EventHubProducerClient as examples.
Use Spring Boot Autoconfiguration
To send messages to and receive messages from Event Hubs, configure the application by using the following steps:
Use the following property settings to configure your Event Hubs namespace and event hub name:
spring.cloud.azure.eventhubs.namespace=<your event-hubs-namespace>
spring.cloud.azure.eventhubs.event-hub-name=<your-event-hub-name>
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name=<your-storage-account-name>
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name=<your-storage-account-container-name>
spring.cloud.azure.eventhubs.processor.consumer-group=$Default
Create a new EventHubProcessorClientConfiguration Java class as shown in the following example. This class is used to register the message and error handler for EventProcessorClient.
import com.azure.spring.cloud.service.eventhubs.consumer.EventHubsErrorHandler;
import com.azure.spring.cloud.service.eventhubs.consumer.EventHubsRecordMessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class EventHubProcessorClientConfiguration {

    private static final Logger LOGGER = LoggerFactory.getLogger(EventHubProcessorClientConfiguration.class);

    @Bean
    EventHubsRecordMessageListener processEvent() {
        return eventContext -> LOGGER.info("Processing event from partition {} with sequence number {} with body: {}",
            eventContext.getPartitionContext().getPartitionId(),
            eventContext.getEventData().getSequenceNumber(),
            eventContext.getEventData().getBodyAsString());
    }

    @Bean
    EventHubsErrorHandler processError() {
        return errorContext -> LOGGER.info("Error occurred in partition processor for partition {}, {}",
            errorContext.getPartitionContext().getPartitionId(),
            errorContext.getThrowable());
    }
}
Inject the EventProcessorClient and EventHubProducerClient in your Spring application, and call the related APIs to send and receive messages, as shown in the following example:
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventHubProducerClient;
import com.azure.messaging.eventhubs.EventProcessorClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import java.util.Collections;
import java.util.concurrent.TimeUnit;

@SpringBootApplication
public class EventHubClientApplication implements CommandLineRunner {

    private static final Logger LOGGER = LoggerFactory.getLogger(EventHubClientApplication.class);
    private final EventHubProducerClient eventHubProducerClient;
    private final EventProcessorClient eventProcessorClient;

    public EventHubClientApplication(EventHubProducerClient eventHubProducerClient,
                                     EventProcessorClient eventProcessorClient) {
        this.eventHubProducerClient = eventHubProducerClient;
        this.eventProcessorClient = eventProcessorClient;
    }

    public static void main(String[] args) {
        SpringApplication.run(EventHubClientApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        eventProcessorClient.start();
        // Wait for the processor client to be ready
        TimeUnit.SECONDS.sleep(10);

        eventHubProducerClient.send(Collections.singletonList(new EventData("Hello World")));
        LOGGER.info("Successfully sent a message to Event Hubs.");
        eventHubProducerClient.close();
        LOGGER.info("Stopping and closing the processor");
        eventProcessorClient.stop();
    }
}
Start the application. You're shown logs similar to the following example:
Successfully sent a message to Event Hubs.
...
Processing event from partition 0 with sequence number 0 with body: Hello World
...
Stopping and closing the processor.
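The sample waits a fixed 10 seconds before sending and stops the processor right after the send. One possible alternative, sketched here with only the JDK, is to block until the event handler signals that an event arrived, with a timeout as a safety net. The AwaitEventSketch class and its awaitFirstEvent helper are hypothetical; in the sample, the countDown() call would belong inside the processEvent listener.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AwaitEventSketch {

    // Blocks until the latch is released or the timeout elapses.
    // Returns true only when the latch was released in time.
    static boolean awaitFirstEvent(CountDownLatch received, long timeout, TimeUnit unit) {
        try {
            return received.await(timeout, unit);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        CountDownLatch received = new CountDownLatch(1);

        // Stand-in for the event listener: a real listener would call countDown()
        // after it has processed an event.
        Thread listener = new Thread(received::countDown);
        listener.start();

        boolean seen = awaitFirstEvent(received, 5, TimeUnit.SECONDS);
        System.out.println("Event observed before timeout: " + seen);
    }
}
```

The design choice here is to tie shutdown to an observed event rather than to elapsed time, which avoids stopping the processor before the message has been handled.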
Build the client programmatically
You can build the client beans by yourself, but the process is complicated. In Spring Boot applications, you have to manage properties, learn the builder pattern, and register the client to your Spring application context. The following steps show you how to do that:
Create a new EventHubClientConfiguration Java class as shown in the following example. This class is used to declare the EventProcessorClient and EventHubProducerClient beans. Be sure to replace the <your event-hubs-namespace>, <your-event-hub-name>, <your-storage-account-name>, and <your-storage-account-container-name> placeholders with your actual values.
import com.azure.identity.DefaultAzureCredentialBuilder;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubProducerClient;
import com.azure.messaging.eventhubs.EventProcessorClient;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.EventContext;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobContainerClientBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class EventHubClientConfiguration {

    private static final Logger LOGGER = LoggerFactory.getLogger(EventHubClientConfiguration.class);
    private static final String EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "<your event-hubs-namespace>.servicebus.windows.net";
    private static final String EVENT_HUB_NAME = "<your-event-hub-name>";
    private static final String CONSUMER_GROUP = "$Default";
    private static final String STORAGE_ACCOUNT_ENDPOINT = "https://<your-storage-account-name>.blob.core.windows.net";
    private static final String STORAGE_CONTAINER_NAME = "<your-storage-account-container-name>";

    @Bean
    EventHubClientBuilder eventHubClientBuilder() {
        return new EventHubClientBuilder().credential(EVENT_HUB_FULLY_QUALIFIED_NAMESPACE, EVENT_HUB_NAME,
            new DefaultAzureCredentialBuilder()
                .build());
    }

    @Bean
    BlobContainerClientBuilder blobContainerClientBuilder() {
        return new BlobContainerClientBuilder().credential(new DefaultAzureCredentialBuilder()
                                                   .build())
                                               .endpoint(STORAGE_ACCOUNT_ENDPOINT)
                                               .containerName(STORAGE_CONTAINER_NAME);
    }

    @Bean
    BlobContainerAsyncClient blobContainerAsyncClient(BlobContainerClientBuilder blobContainerClientBuilder) {
        return blobContainerClientBuilder.buildAsyncClient();
    }

    @Bean
    EventProcessorClientBuilder eventProcessorClientBuilder(BlobContainerAsyncClient blobContainerAsyncClient) {
        return new EventProcessorClientBuilder().credential(EVENT_HUB_FULLY_QUALIFIED_NAMESPACE, EVENT_HUB_NAME,
                                                    new DefaultAzureCredentialBuilder()
                                                        .build())
                                                .consumerGroup(CONSUMER_GROUP)
                                                .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))
                                                .processEvent(EventHubClientConfiguration::processEvent)
                                                .processError(EventHubClientConfiguration::processError);
    }

    @Bean
    EventHubProducerClient eventHubProducerClient(EventHubClientBuilder eventHubClientBuilder) {
        return eventHubClientBuilder.buildProducerClient();
    }

    @Bean
    EventProcessorClient eventProcessorClient(EventProcessorClientBuilder eventProcessorClientBuilder) {
        return eventProcessorClientBuilder.buildEventProcessorClient();
    }

    public static void processEvent(EventContext eventContext) {
        LOGGER.info("Processing event from partition {} with sequence number {} with body: {}",
            eventContext.getPartitionContext().getPartitionId(),
            eventContext.getEventData().getSequenceNumber(),
            eventContext.getEventData().getBodyAsString());
    }

    public static void processError(ErrorContext errorContext) {
        LOGGER.info("Error occurred in partition processor for partition {}, {}",
            errorContext.getPartitionContext().getPartitionId(),
            errorContext.getThrowable());
    }
}
Inject the EventProcessorClient and EventHubProducerClient in your Spring application, as shown in the following example:
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventHubProducerClient;
import com.azure.messaging.eventhubs.EventProcessorClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import java.util.Collections;
import java.util.concurrent.TimeUnit;

@SpringBootApplication
public class EventHubClientApplication implements CommandLineRunner {

    private static final Logger LOGGER = LoggerFactory.getLogger(EventHubClientApplication.class);
    private final EventHubProducerClient eventHubProducerClient;
    private final EventProcessorClient eventProcessorClient;

    public EventHubClientApplication(EventHubProducerClient eventHubProducerClient,
                                     EventProcessorClient eventProcessorClient) {
        this.eventHubProducerClient = eventHubProducerClient;
        this.eventProcessorClient = eventProcessorClient;
    }

    public static void main(String[] args) {
        SpringApplication.run(EventHubClientApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        eventProcessorClient.start();
        // Wait for the processor client to be ready
        TimeUnit.SECONDS.sleep(10);

        eventHubProducerClient.send(Collections.singletonList(new EventData("Hello World")));
        LOGGER.info("Successfully sent a message to Event Hubs.");
        eventHubProducerClient.close();
        LOGGER.info("Stopping and closing the processor");
        eventProcessorClient.stop();
    }
}
Start the application. You're shown logs similar to the following example:
Successfully sent a message to Event Hubs.
...
Processing event from partition 0 with sequence number 0 with body: Hello World
...
Stopping and closing the processor.
The following list shows some reasons why this code isn't flexible or graceful:
- The Event Hubs namespace and event hub name are hard coded.
- If you use @Value to get configurations from the Spring environment, you can't have IDE hints in your application.properties file.
- If you have a microservice scenario, you must duplicate the code in each project, and it's easy to make mistakes and hard to be consistent.
Fortunately, building the client beans by yourself isn't necessary with Spring Cloud Azure. Instead, you can directly inject them and use the configuration properties that you're already familiar with to configure Event Hubs. For more information, see Spring Cloud Azure configuration.
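To illustrate the hard-coding problem in the programmatic sample, the following JDK-only sketch derives the same two endpoints from property values instead of constants. It is not how Spring Cloud Azure binds its properties. The class and method names are hypothetical, the property keys are the ones used earlier in this article, and the host suffixes are copied from the constants in the sample, which target the Azure public cloud.

```java
import java.util.Properties;

public class EventHubSettingsSketch {

    // Looks up a required key and fails fast with a clear message when it's missing.
    static String required(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing required property: " + key);
        }
        return value.trim();
    }

    // Host name format taken from EVENT_HUB_FULLY_QUALIFIED_NAMESPACE in the sample.
    static String fullyQualifiedNamespace(Properties props) {
        return required(props, "spring.cloud.azure.eventhubs.namespace") + ".servicebus.windows.net";
    }

    // Endpoint format taken from STORAGE_ACCOUNT_ENDPOINT in the sample.
    static String checkpointStoreEndpoint(Properties props) {
        String account = required(props,
            "spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name");
        return "https://" + account + ".blob.core.windows.net";
    }

    public static void main(String[] args) {
        // Hypothetical values; a real application would load these from its environment.
        Properties props = new Properties();
        props.setProperty("spring.cloud.azure.eventhubs.namespace", "contoso");
        props.setProperty("spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name", "contosostore");

        System.out.println(fullyQualifiedNamespace(props));
        System.out.println(checkpointStoreEndpoint(props));
    }
}
```

Even this small amount of plumbing has to be repeated and kept consistent across projects, which is the duplication the autoconfiguration approach avoids.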
Spring Cloud Azure also provides the following global configurations for different scenarios. For more information, see the Global configuration for Azure Service SDKs section of the Spring Cloud Azure configuration.
- Proxy options.
- Retry options.
- AMQP transport client options.
You can also connect to different Azure clouds. For more information, see Connect to different Azure clouds.
Use Spring Messaging Azure Event Hubs
The Spring Messaging Azure Event Hubs module provides support for the Spring Messaging framework with Event Hubs.
If you're using Spring Messaging Azure Event Hubs, then you can use the following features:
- EventHubsTemplate: Send messages to Event Hubs asynchronously and synchronously.
- @EventHubsListener: Mark a method to be the target of an Event Hubs message listener on the destination.
This guide shows you how to use Spring Messaging Azure Event Hubs to send messages to and receive messages from Event Hubs.
Add dependencies
To install the Spring Messaging Azure Event Hubs module, add the following dependencies to your pom.xml file:
The Spring Cloud Azure Bill of Materials (BOM):
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.azure.spring</groupId>
      <artifactId>spring-cloud-azure-dependencies</artifactId>
      <version>5.11.0</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
Note
If you're using Spring Boot 2.x, be sure to set the spring-cloud-azure-dependencies version to 4.17.0. This Bill of Materials (BOM) should be configured in the <dependencyManagement> section of your pom.xml file. This ensures that all Spring Cloud Azure dependencies are using the same version. For more information about the version used for this BOM, see Which Version of Spring Cloud Azure Should I Use.
The Spring Cloud Azure starter, Spring Messaging Event Hubs, and Azure Event Hubs Checkpoint Store artifacts:
<dependency>
  <groupId>com.azure.spring</groupId>
  <artifactId>spring-cloud-azure-starter</artifactId>
</dependency>
<dependency>
  <groupId>com.azure.spring</groupId>
  <artifactId>spring-messaging-azure-eventhubs</artifactId>
</dependency>
<dependency>
  <groupId>com.azure</groupId>
  <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
</dependency>
Code the application to send and receive messages
To send messages to and receive messages from Event Hubs, configure the application by using the following steps:
Use the following property settings to configure the Event Hubs namespace and Storage Blob:
spring.cloud.azure.eventhubs.namespace=<your event-hubs-namespace>
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name=<your-storage-account-name>
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name=<your-storage-account-container-name>
Add spring.factories for the autoconfiguration for @EventHubsListener. You need to create a directory called META-INF under the application's resource directory. Then, create a file named spring.factories under META-INF and add the following content:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.azure.spring.cloud.autoconfigure.messaging.AzureMessagingListenerAutoConfiguration
Create a new ConsumerService Java class as shown in the following example. This class is used to define a message receiver. Be sure to replace the <your-event-hub-name> placeholder with your actual value.
import com.azure.spring.messaging.eventhubs.implementation.core.annotation.EventHubsListener;
import org.springframework.stereotype.Service;

@Service
public class ConsumerService {

    private static final String EVENT_HUB_NAME = "<your-event-hub-name>";
    private static final String CONSUMER_GROUP = "$DEFAULT";

    @EventHubsListener(destination = EVENT_HUB_NAME, group = CONSUMER_GROUP)
    public void handleMessageFromEventHub(String message) {
        System.out.printf("New message received: %s%n", message);
    }
}
Wire up a sender and a receiver to send and receive messages with Spring, as shown in the following example. Be sure to replace the <your-event-hub-name> placeholder with your actual value.
import com.azure.spring.messaging.eventhubs.core.EventHubsTemplate;
import com.azure.spring.messaging.implementation.annotation.EnableAzureMessaging;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.support.MessageBuilder;

@SpringBootApplication
@EnableAzureMessaging
public class EventHubMessagingApplication {

    private static final String EVENT_HUB_NAME = "<your-event-hub-name>";
    private static final Logger LOGGER = LoggerFactory.getLogger(EventHubMessagingApplication.class);

    public static void main(String[] args) {
        ConfigurableApplicationContext applicationContext = SpringApplication.run(EventHubMessagingApplication.class);
        EventHubsTemplate eventHubsTemplate = applicationContext.getBean(EventHubsTemplate.class);
        LOGGER.info("Sending a message to the Event Hubs.");
        eventHubsTemplate.sendAsync(EVENT_HUB_NAME, MessageBuilder.withPayload("Hello world").build()).subscribe();
    }
}
Tip
Remember to add the @EnableAzureMessaging annotation, which triggers the discovery of methods annotated with @EventHubsListener, creating the message listener container under the covers.
Start the application. You're shown logs similar to the following example:
Sending a message to the Event Hubs.
New message received: Hello world
Use Spring Integration Azure Event Hubs
The Spring Integration Azure Event Hubs module provides support for the Spring Integration framework with Event Hubs.
If your Spring application uses Spring Integration message channels, you can route messages between your message channels and Event Hubs using channel adapters.
An inbound channel adapter forwards messages from an event hub to a message channel. An outbound channel adapter publishes messages from a message channel to an event hub.
This guide shows you how to use Spring Integration Azure Event Hubs to send messages to and receive messages from Event Hubs.
Add dependencies
To install the Spring Cloud Azure Event Hubs Integration Starter module, add the following dependencies to your pom.xml file:
The Spring Cloud Azure Bill of Materials (BOM):
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.azure.spring</groupId>
      <artifactId>spring-cloud-azure-dependencies</artifactId>
      <version>5.11.0</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
Note
If you're using Spring Boot 2.x, be sure to set the spring-cloud-azure-dependencies version to 4.17.0. This Bill of Materials (BOM) should be configured in the <dependencyManagement> section of your pom.xml file. This ensures that all Spring Cloud Azure dependencies are using the same version. For more information about the version used for this BOM, see Which Version of Spring Cloud Azure Should I Use.
The Spring Cloud Azure Event Hubs Integration artifact:
<dependency>
  <groupId>com.azure.spring</groupId>
  <artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>
Code the application to send and receive messages
To send messages to and receive messages from Event Hubs, configure the application by using the following steps:
Use the following property settings to configure the Event Hubs namespace and Storage Blob:
spring.cloud.azure.eventhubs.namespace=<your event-hubs-namespace>
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name=<your-storage-account-name>
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name=<your-storage-account-container-name>
Create a new MessageReceiveConfiguration Java class as shown in the following example. This class is used to define a message receiver. Be sure to replace the <your-event-hub-name> placeholder with your actual value.
import com.azure.spring.integration.eventhubs.inbound.EventHubsInboundChannelAdapter;
import com.azure.spring.messaging.eventhubs.core.EventHubsProcessorFactory;
import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointConfig;
import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointMode;
import com.azure.spring.messaging.eventhubs.core.listener.EventHubsMessageListenerContainer;
import com.azure.spring.messaging.eventhubs.core.properties.EventHubsContainerProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;

@Configuration
public class MessageReceiveConfiguration {

    private static final String INPUT_CHANNEL = "input";
    private static final String EVENT_HUB_NAME = "<your-event-hub-name>";
    private static final String CONSUMER_GROUP = "$Default";
    private static final Logger LOGGER = LoggerFactory.getLogger(MessageReceiveConfiguration.class);

    @ServiceActivator(inputChannel = INPUT_CHANNEL)
    public void messageReceiver(byte[] payload) {
        String message = new String(payload);
        LOGGER.info("New message received: {}", message);
    }

    @Bean
    public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
        EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
        containerProperties.setEventHubName(EVENT_HUB_NAME);
        containerProperties.setConsumerGroup(CONSUMER_GROUP);
        containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
        return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
    }

    @Bean
    public EventHubsInboundChannelAdapter messageChannelAdapter(@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
                                                                EventHubsMessageListenerContainer listenerContainer) {
        EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(listenerContainer);
        adapter.setOutputChannel(inputChannel);
        return adapter;
    }

    @Bean
    public MessageChannel input() {
        return new DirectChannel();
    }
}
Create a new MessageSendConfiguration Java class as shown in the following example. This class is used to define a message sender. Be sure to replace the <your-event-hub-name> placeholder with your actual value.
import com.azure.spring.integration.core.handler.DefaultMessageHandler;
import com.azure.spring.messaging.eventhubs.core.EventHubsTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Configuration
public class MessageSendConfiguration {

    private static final Logger LOGGER = LoggerFactory.getLogger(MessageSendConfiguration.class);
    private static final String OUTPUT_CHANNEL = "output";
    private static final String EVENT_HUB_NAME = "<your-event-hub-name>";

    @Bean
    @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
    public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) {
        DefaultMessageHandler handler = new DefaultMessageHandler(EVENT_HUB_NAME, eventHubsTemplate);
        handler.setSendCallback(new ListenableFutureCallback<Void>() {
            @Override
            public void onSuccess(Void result) {
                LOGGER.info("Message was sent successfully.");
            }

            @Override
            public void onFailure(Throwable ex) {
                LOGGER.error("There was an error sending the message.", ex);
            }
        });
        return handler;
    }

    @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
    public interface EventHubOutboundGateway {
        void send(String text);
    }
}
Wire up a sender and a receiver to send and receive messages with Spring, as shown in the following example:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;

@SpringBootApplication
@EnableIntegration
@Configuration(proxyBeanMethods = false)
public class EventHubIntegrationApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext applicationContext = SpringApplication.run(EventHubIntegrationApplication.class, args);
        MessageSendConfiguration.EventHubOutboundGateway outboundGateway =
            applicationContext.getBean(MessageSendConfiguration.EventHubOutboundGateway.class);
        outboundGateway.send("Hello World");
    }
}
Tip
Remember to add the @EnableIntegration annotation, which enables the Spring Integration infrastructure.
Start the application. You're shown logs similar to the following example:
Message was sent successfully.
New message received: Hello World
Use Spring Cloud Azure Stream Event Hubs Binder
To call the Event Hubs API in a Spring Cloud Stream application, use the Spring Cloud Azure Event Hubs Stream Binder module.
This guide shows you how to use Spring Cloud Stream Event Hubs Binder to send messages to and receive messages from Event Hubs.
Add dependencies
To install the Spring Cloud Azure Event Hubs Stream Binder module, add the following dependencies to your pom.xml file:
The Spring Cloud Azure Bill of Materials (BOM):
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.azure.spring</groupId>
      <artifactId>spring-cloud-azure-dependencies</artifactId>
      <version>5.11.0</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
Note
If you're using Spring Boot 2.x, be sure to set the spring-cloud-azure-dependencies version to 4.17.0. This Bill of Materials (BOM) should be configured in the <dependencyManagement> section of your pom.xml file. This ensures that all Spring Cloud Azure dependencies are using the same version. For more information about the version used for this BOM, see Which Version of Spring Cloud Azure Should I Use.
The Spring Cloud Azure Event Hubs Stream Binder artifact:
<dependency>
  <groupId>com.azure.spring</groupId>
  <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>
Code the application to send and receive messages
To send messages to and receive messages from Event Hubs, configure the application by using the following steps:
Use the following property settings to configure the Event Hubs namespace and Storage Blob:
spring.cloud.azure.eventhubs.namespace=<your event-hubs-namespace>
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name=<your-storage-account-name>
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name=<your-storage-account-container-name>
Create the message receiver.
To use your application as an event sink, configure the input binder by completing the following tasks:
Declare a Consumer bean that defines message handling logic. For example, the following Consumer bean is named consume:
@Bean
public Consumer<Message<String>> consume() {
    return message -> {
        System.out.printf("New message received: %s%n", message.getPayload());
    };
}
Add the following configuration to specify the Event Hub name for consuming. Be sure to replace the <your-event-hub-name> placeholder with your actual value.
# name for the above `Consumer` bean
spring.cloud.stream.function.definition=consume
spring.cloud.stream.bindings.consume-in-0.destination=<your-event-hub-name>
spring.cloud.stream.bindings.consume-in-0.group=$Default
spring.cloud.stream.eventhubs.bindings.consume-in-0.consumer.checkpoint.mode=MANUAL
Create the message sender.
To use your application as an event source, configure the output binder by completing the following tasks:
Define a Supplier bean that defines where messages come from within your application, as shown in the following example:
@Bean
public Supplier<Message<String>> supply() {
    return () -> {
        System.out.println("Sending a message.");
        return MessageBuilder.withPayload("Hello world").build();
    };
}
Add the following configuration to specify the Event Hub name for sending. Be sure to replace the <your-event-hub-name> placeholder with your actual value.
# "consume" is added from the above step
spring.cloud.stream.function.definition=consume;supply
spring.cloud.stream.bindings.supply-out-0.destination=<your-event-hub-name>
Start the application. You're shown logs similar to the following example:
Sending a message.
New message received: Hello world
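The binding names used in the properties in this section, consume-in-0 and supply-out-0, follow the Spring Cloud Stream functional naming convention: the function name, then -in- or -out-, then a zero-based index. The following plain-Java sketch only illustrates that convention. BindingNames and its methods are hypothetical helpers written for this example, not part of Spring Cloud Stream or Spring Cloud Azure.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only: not a Spring Cloud Stream API.
final class BindingNames {

    private BindingNames() {
    }

    // Input binding name for a Consumer (or Function) bean, for example "consume-in-0".
    static String input(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    // Output binding name for a Supplier (or Function) bean, for example "supply-out-0".
    static String output(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    // Splits a spring.cloud.stream.function.definition value such as "consume;supply".
    static List<String> functionNames(String definition) {
        List<String> names = new ArrayList<>();
        for (String part : definition.split(";")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                names.add(name);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(functionNames("consume;supply"));
        System.out.println(input("consume", 0));
        System.out.println(output("supply", 0));
    }
}
```

If you rename a bean, the property keys must change with it: a Consumer bean named handle would need bindings under handle-in-0 rather than consume-in-0.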
Use Spring Kafka with Azure Event Hubs
Event Hubs provides a Kafka endpoint that your existing Kafka-based applications can use, as an alternative to running your own Kafka cluster. Event Hubs works with many of your existing Kafka applications. For more information, see Event Hubs for Apache Kafka.
This guide shows you how to use Azure Event Hubs and Spring Kafka to send messages to and receive messages from Event Hubs.
Add dependencies
To install the Spring Cloud Azure starter and Spring Kafka modules, add the following dependencies to your pom.xml file:
The Spring Cloud Azure Bill of Materials (BOM):
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.azure.spring</groupId>
      <artifactId>spring-cloud-azure-dependencies</artifactId>
      <version>5.11.0</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
Note
If you're using Spring Boot 2.x, be sure to set the spring-cloud-azure-dependencies version to 4.17.0. Configure this Bill of Materials (BOM) in the <dependencyManagement> section of your pom.xml file so that all Spring Cloud Azure dependencies use the same version. For more information about the version used for this BOM, see Which Version of Spring Cloud Azure Should I Use.
The Spring Cloud Azure starter and Spring Kafka artifacts:
<dependency>
  <groupId>com.azure.spring</groupId>
  <artifactId>spring-cloud-azure-starter</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
Code the application to send and receive messages
To send messages to and receive messages from Event Hubs, configure the application by using the following steps:
Use the following property setting to configure the Event Hubs namespace:
spring.kafka.bootstrap-servers=<your event-hubs-namespace>.servicebus.windows.net:9093
Use KafkaTemplate to send messages and @KafkaListener to receive messages, as shown in the following example. Be sure to replace the <your-event-hub-name> placeholder with your actual value.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
public class EventHubKafkaApplication implements CommandLineRunner {

    private static final Logger LOGGER = LoggerFactory.getLogger(EventHubKafkaApplication.class);
    private static final String EVENT_HUB_NAME = "<your-event-hub-name>";
    private static final String CONSUMER_GROUP = "$Default";
    private final KafkaTemplate<String, String> kafkaTemplate;

    public EventHubKafkaApplication(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public static void main(String[] args) {
        SpringApplication.run(EventHubKafkaApplication.class, args);
    }

    @Override
    public void run(String... args) {
        kafkaTemplate.send(EVENT_HUB_NAME, "Hello World");
        LOGGER.info("Message was sent successfully.");
    }

    @KafkaListener(topics = EVENT_HUB_NAME, groupId = CONSUMER_GROUP)
    public void receive(String message) {
        LOGGER.info("New message received: {}", message);
    }
}
Start the application. You're shown logs similar to the following example:
Message was sent successfully.
New message received: Hello World
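The bootstrap server value configured in this section is the Event Hubs namespace host name followed by port 9093. As a minimal sketch, the value can be assembled as follows. KafkaEndpoint and bootstrapServers are hypothetical names for this example, and the host suffix assumes the servicebus.windows.net domain shown in the property above.

```java
// Hypothetical helper, for illustration only: not part of Spring Kafka or Spring Cloud Azure.
final class KafkaEndpoint {

    // Host suffix and port taken from the spring.kafka.bootstrap-servers value in this section.
    private static final String HOST_SUFFIX = ".servicebus.windows.net";
    private static final int KAFKA_PORT = 9093;

    private KafkaEndpoint() {
    }

    // Builds "<namespace>.servicebus.windows.net:9093" from an Event Hubs namespace name.
    static String bootstrapServers(String namespace) {
        if (namespace == null || namespace.trim().isEmpty()) {
            throw new IllegalArgumentException("Event Hubs namespace must not be blank");
        }
        return namespace.trim() + HOST_SUFFIX + ":" + KAFKA_PORT;
    }

    public static void main(String[] args) {
        // "my-namespace" is a placeholder, not a real namespace.
        System.out.println(bootstrapServers("my-namespace"));
    }
}
```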
Use Spring Cloud Stream Kafka Binder with Azure Event Hubs
Spring Cloud Stream is a framework that enables application developers to write message-driven microservices. The bridge between a messaging system and Spring Cloud Stream is through the binder abstraction. Binders exist for several messaging systems, but one of the most commonly used binders is for Apache Kafka.
This guide shows you how to use Azure Event Hubs and Spring Cloud Stream Kafka Binder to send messages to and receive messages from Event Hubs.
Add dependencies
To install the Spring Cloud Azure starter and Spring Cloud Stream binder Kafka modules, add the following dependencies to your pom.xml file:
The Spring Cloud Azure Bill of Materials (BOM):
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.azure.spring</groupId>
      <artifactId>spring-cloud-azure-dependencies</artifactId>
      <version>5.11.0</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
Note
If you're using Spring Boot 2.x, be sure to set the spring-cloud-azure-dependencies version to 4.17.0. Configure this Bill of Materials (BOM) in the <dependencyManagement> section of your pom.xml file so that all Spring Cloud Azure dependencies use the same version. For more information about the version used for this BOM, see Which Version of Spring Cloud Azure Should I Use.
The Spring Cloud Azure starter and Spring Cloud Stream Kafka binder artifacts:
<dependency>
  <groupId>com.azure.spring</groupId>
  <artifactId>spring-cloud-azure-starter</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
Code the application to send and receive messages
To send messages to and receive messages from Event Hubs, configure the application by using the following steps:
Use the following property setting to configure the Kafka broker:
spring.cloud.stream.kafka.binder.brokers=<your event-hubs-namespace>.servicebus.windows.net:9093
Create the message receiver.
To use your application as an event sink, configure the input binder by completing the following tasks:
Declare a Consumer bean that defines message handling logic. For example, the following Consumer bean is named consume:
@Bean
public Consumer<Message<String>> consume() {
    return message -> {
        System.out.printf("New message received: %s%n", message.getPayload());
    };
}
Add the following configuration to specify the Event Hub name for consuming. Be sure to replace the <your-event-hub-name> placeholder with your actual value.
# name for the above `Consumer` bean
spring.cloud.stream.function.definition=consume
spring.cloud.stream.bindings.consume-in-0.destination=<your-event-hub-name>
spring.cloud.stream.bindings.consume-in-0.group=$Default
Create the message sender.
To use your application as an event source, configure the output binder by completing the following tasks:
Define a Supplier bean that defines where messages come from within your application, as shown in the following example:
@Bean
public Supplier<Message<String>> supply() {
    return () -> {
        System.out.println("Sending a message.");
        return MessageBuilder.withPayload("Hello world").build();
    };
}
Add the following configuration to specify the Event Hub name for sending. Be sure to replace the <your-event-hub-name> placeholder with your actual value.
# "consume" is added from the above step
spring.cloud.stream.function.definition=consume;supply
spring.cloud.stream.bindings.supply-out-0.destination=<your-event-hub-name>
Start the application. You're shown logs similar to the following example:
Sending a message.
New message received: Hello world
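The Supplier and Consumer beans in this section are ordinary java.util.function types. The binder is what carries each supplied message to Event Hubs and hands received messages to the consumer. The following plain-Java sketch connects the two directly, with no broker involved, only to show the path of a single message. It uses String payloads instead of Message<String>, and FunctionFlowSketch, deliverOnce, and the received list are hypothetical names introduced for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Illustration only: a stand-in for the binder, not how Spring Cloud Stream is implemented.
final class FunctionFlowSketch {

    private FunctionFlowSketch() {
    }

    // Mirrors the supply bean: produces one payload each time it is called.
    static Supplier<String> supply() {
        return () -> {
            System.out.println("Sending a message.");
            return "Hello world";
        };
    }

    // Mirrors the consume bean: prints the payload and records it for inspection.
    static Consumer<String> consume(List<String> received) {
        return payload -> {
            System.out.printf("New message received: %s%n", payload);
            received.add(payload);
        };
    }

    // Stand-in for the binder: passes one supplied payload straight to the consumer.
    static void deliverOnce(Supplier<String> source, Consumer<String> sink) {
        sink.accept(source.get());
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        deliverOnce(supply(), consume(received));
    }
}
```

In the real application the two beans never call each other. Each is bound to a destination through the supply-out-0 and consume-in-0 properties, and the message travels through the event hub in between.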
Deploy to Azure Spring Apps
Now that you have the Spring Boot application running locally, it's time to move it to production. Azure Spring Apps makes it easy to deploy Spring Boot applications to Azure without any code changes. The service manages the infrastructure of Spring applications so developers can focus on their code. Azure Spring Apps provides lifecycle management using comprehensive monitoring and diagnostics, configuration management, service discovery, CI/CD integration, blue-green deployments, and more. To deploy your application to Azure Spring Apps, see Deploy your first application to Azure Spring Apps.
Next steps
See also
For more information about the other Spring Boot Starters that are available for Microsoft Azure, see What is Spring Cloud Azure?