Compartir a través de


Uso de Azure Service Bus en aplicaciones Spring

En este artículo se muestra cómo usar Azure Event Hubs en aplicaciones Java creadas con Spring Framework.

Azure Event Hubs es una plataforma de streaming de macrodatos y un servicio de ingesta de eventos. Puede recibir y procesar millones de eventos por segundo. Los datos enviados a un centro de eventos se pueden transformar y almacenar con cualquier proveedor de análisis en tiempo real o adaptadores de procesamiento por lotes y almacenamiento.

Spring Cloud Azure proporciona varios módulos para enviar mensajes a Event Hubs y recibirlos mediante marcos de Spring.

Puede usar los siguientes módulos de forma independiente o combinarlos para distintos casos de uso:

Requisitos previos

Nota:

Para conceder a su cuenta acceso a los recursos, en Azure Event Hubs, asigne el rol Azure Event Hubs Data Receiver y Azure Event Hubs Data Sender a la cuenta de Microsoft Entra que está usando actualmente. A continuación, en la cuenta de Azure Storage, asigne el rol Storage Blob Data Contributor a la cuenta de Microsoft Entra que está usando actualmente. Para más información sobre cómo conceder roles de acceso, consulte Asignación de roles de Azure mediante Azure Portal y Autorización del acceso a recursos de Event Hubs mediante el identificador de Microsoft Entra.

Importante

Se necesita Spring Boot versión 2.5 o superior para completar los pasos descritos en este tutorial.

Preparación de su entorno local

En este tutorial sección, las configuraciones y el código no tienen ninguna operación de autenticación. Sin embargo, la conexión al servicio de Azure requiere autenticación. Para completar la autenticación, debe usar la biblioteca cliente de Azure Identity. Spring Cloud Azure usa DefaultAzureCredential, que la biblioteca de Azure Identity proporciona para ayudarle a obtener credenciales sin cambios en el código.

DefaultAzureCredential admite varios métodos de autenticación y determina qué método se usa en tiempo de ejecución. Este enfoque permite que la aplicación use diferentes métodos de autenticación en distintos entornos, como entornos locales o de producción, sin implementar código específico del entorno. Para más información, consulte la sección DefaultAzureCredential de Autenticación de aplicaciones Java hospedadas en Azure.

Para usar la CLI de Azure, IntelliJ u otros métodos para completar la autenticación en entornos de desarrollo local, consulte Autenticación de Azure en entornos de desarrollo de Java. Para completar la autenticación en entornos de hospedaje de Azure, se recomienda usar la identidad administrada. Para más información, consulte ¿Qué son las identidades administradas para los recursos de Azure?

Uso de Spring Cloud Azure Event Hubs Starter

El módulo Spring Cloud Azure Event Hubs Starter importa la biblioteca cliente de Java de Event Hubs con el marco de Spring Boot. Puede usar Spring Cloud Azure y el SDK de Azure juntos, en un patrón no excluyente mutuamente. Por lo tanto, puede seguir usando la API de cliente de Java de Event Hubs en la aplicación Spring.

Adición de dependencias

Para instalar el módulo Spring Cloud Azure Event Hubs Starter, agregue las siguientes dependencias al archivo depom.xml :

  • La lista de materiales (BOM) de Azure de Spring Cloud:

    <dependencyManagement>
       <dependencies>
         <dependency>
           <groupId>com.azure.spring</groupId>
           <artifactId>spring-cloud-azure-dependencies</artifactId>
           <version>5.22.0</version>
           <type>pom</type>
           <scope>import</scope>
           </dependency>
       </dependencies>
    </dependencyManagement>
    

    Nota:

    Si usa Spring Boot 2.x, asegúrese de establecer la versión de spring-cloud-azure-dependencies en 4.20.0. Esta lista de materiales (BOM) debe configurarse en la <dependencyManagement> sección del archivo pom.xml . Esto garantiza que todas las dependencias de Spring Cloud Azure usen la misma versión. Para obtener más información sobre la versión que se usa para esta lista de materiales, consulte La versión de Spring Cloud que se debe usar en Azure.

  • El artefacto de Spring Cloud Azure Event Hubs:

    <dependency>
      <groupId>com.azure.spring</groupId>
      <artifactId>spring-cloud-azure-starter-eventhubs</artifactId>
    </dependency>
    

Codifica la aplicación para enviar y recibir mensajes

En esta guía se explica cómo usar los clientes de Java de Event Hubs en el contexto de una aplicación Spring. Aquí presentamos las dos opciones siguientes:

  • Usar la configuración automática de Spring Boot y usar clientes predefinidos desde el contexto de Spring (recomendado).
  • Compilar el cliente mediante programación.

La forma de conexión automática de beans de cliente desde el contenedor de Spring IoC tiene las siguientes ventajas, lo que le proporciona una experiencia más flexible y eficaz al desarrollar con clientes de Event Hubs:

  • Aplica la configuración externa para que puedas trabajar con el mismo código de aplicación en entornos diferentes.
  • Puede delegar al marco de Spring Boot el proceso de aprendizaje del patrón de generador y registrar este cliente en el contexto de la aplicación. Esta delegación le permite centrarse en cómo usar los clientes con sus propios requisitos empresariales.
  • Puede usar el indicador de estado de una manera fácil para inspeccionar el estado y el estado de la aplicación y los componentes internos.

En las secciones siguientes se proporcionan ejemplos de código que muestran cómo usar EventProcessorClient y EventHubProducerClient con las dos alternativas.

Nota:

El SDK de Java de Azure para Event Hubs proporciona varios clientes para interactuar con Event Hubs. El iniciador también proporciona la configuración automática para todos los clientes de Event Hubs, así como para los generadores de clientes. En este artículo se utilizan solo EventProcessorClient y EventHubProducerClient como ejemplos.

Uso de la configuración automática de Spring Boot

Para enviar y recibir mensajes desde Event Hubs, configure la aplicación mediante los pasos siguientes:

  1. Use las siguientes opciones de propiedades para configurar el espacio de nombres de Event Hubs y el nombre del centro de eventos:

    spring.cloud.azure.eventhubs.namespace=<your event-hubs-namespace>
    spring.cloud.azure.eventhubs.event-hub-name=<your-event-hub-name>
    spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name=<your-storage-account-name>
    spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name=<your-storage-account-container-name>
    spring.cloud.azure.eventhubs.processor.consumer-group=$Default
    
  2. Cree una nueva clase Java EventHubProcessorClientConfiguration como se muestra en el ejemplo siguiente. Esta clase se usa para registrar el mensaje y el controlador de errores para EventProcessorClient.

    import com.azure.spring.cloud.service.eventhubs.consumer.EventHubsErrorHandler;
    import com.azure.spring.cloud.service.eventhubs.consumer.EventHubsRecordMessageListener;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class EventHubProcessorClientConfiguration {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(EventHubProcessorClientConfiguration.class);
    
        @Bean
        EventHubsRecordMessageListener processEvent() {
            return eventContext->LOGGER.info("Processing event from partition {} with sequence number {} with body: {}",
                eventContext.getPartitionContext().getPartitionId(), eventContext.getEventData().getSequenceNumber(),
                eventContext.getEventData().getBodyAsString());
        }
    
        @Bean
        EventHubsErrorHandler processError() {
            return errorContext->LOGGER.info("Error occurred in partition processor for partition {}, {}",
                errorContext.getPartitionContext().getPartitionId(),
                errorContext.getThrowable());
        }
    
    }
    
  3. Inyecte EventProcessorClient y EventHubProducerClient en la aplicación Spring y llame a las API relacionadas para enviar y recibir mensajes, como se muestra en el ejemplo siguiente:

    import com.azure.messaging.eventhubs.EventData;
    import com.azure.messaging.eventhubs.EventHubProducerClient;
    import com.azure.messaging.eventhubs.EventProcessorClient;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    import java.util.Collections;
    import java.util.concurrent.TimeUnit;
    
    @SpringBootApplication
    public class EventHubClientApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(EventHubClientApplication.class);
        private final EventHubProducerClient eventHubProducerClient;
        private final EventProcessorClient eventProcessorClient;
    
        public EventHubClientApplication(EventHubProducerClient eventHubProducerClient,
                                         EventProcessorClient eventProcessorClient) {
            this.eventHubProducerClient = eventHubProducerClient;
            this.eventProcessorClient = eventProcessorClient;
        }
    
        public static void main(String[] args) {
            SpringApplication.run(EventHubClientApplication.class, args);
        }
    
        @Override
        public void run(String... args) throws Exception {
            eventProcessorClient.start();
            // Wait for the processor client to be ready
            TimeUnit.SECONDS.sleep(10);
    
            eventHubProducerClient.send(Collections.singletonList(new EventData("Hello World")));
            LOGGER.info("Successfully sent a message to Event Hubs.");
            eventHubProducerClient.close();
            LOGGER.info("Skip stopping and closing the processor since the processor may not complete the receiving process yet.");
        }
    
    }
    
  4. Inicie la aplicación. Se muestran registros similares al siguiente ejemplo:

    Successfully sent a message to Event Hubs.
    ...
    Processing event from partition 0 with sequence number 0 with body: Hello World
    ...
    Stopping and closing the processor.
    

Compilación del cliente mediante programación

Puede compilar los beans de cliente por su cuenta, pero el proceso es complicado. En las aplicaciones de Spring Boot, tiene que administrar propiedades, aprender el patrón de generador y registrar el cliente en el contexto de la aplicación Spring. Se muestra cómo hacerlo en los pasos siguientes.

  1. Cree una nueva clase Java EventHubClientConfiguration como se muestra en el ejemplo siguiente. Esta clase se usa para declarar los beans EventProcessorClient y EventHubProducerClient. Asegúrese de reemplazar los marcadores de posición <your event-hubs-namespace>, <your-event-hub-name>, <your-storage-account-name>, and <your-storage-account-container-name> por sus valores reales.

    import com.azure.identity.DefaultAzureCredentialBuilder;
    import com.azure.messaging.eventhubs.EventHubClientBuilder;
    import com.azure.messaging.eventhubs.EventHubProducerClient;
    import com.azure.messaging.eventhubs.EventProcessorClient;
    import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
    import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
    import com.azure.messaging.eventhubs.models.ErrorContext;
    import com.azure.messaging.eventhubs.models.EventContext;
    import com.azure.storage.blob.BlobContainerAsyncClient;
    import com.azure.storage.blob.BlobContainerClientBuilder;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class EventHubClientConfiguration {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(EventHubClientConfiguration.class);
        private static final String EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "<your event-hubs-namespace>.servicebus.windows.net";
        private static final String EVENT_HUB_NAME = "<your-event-hub-name>";
        private static final String CONSUMER_GROUP = "$Default";
        private static final String STORAGE_ACCOUNT_ENDPOINT = "https://<your-storage-account-name>.blob.core.windows.net";
        private static final String STORAGE_CONTAINER_NAME = "<your-storage-account-container-name>";
    
        @Bean
        EventHubClientBuilder eventHubClientBuilder() {
            return new EventHubClientBuilder().credential(EVENT_HUB_FULLY_QUALIFIED_NAMESPACE, EVENT_HUB_NAME,
                new DefaultAzureCredentialBuilder()
                    .build());
        }
    
        @Bean
        BlobContainerClientBuilder blobContainerClientBuilder() {
            return new BlobContainerClientBuilder().credential(new DefaultAzureCredentialBuilder()
                                                       .build())
                                                   .endpoint(STORAGE_ACCOUNT_ENDPOINT)
                                                   .containerName(STORAGE_CONTAINER_NAME);
        }
    
        @Bean
        BlobContainerAsyncClient blobContainerAsyncClient(BlobContainerClientBuilder blobContainerClientBuilder) {
            return blobContainerClientBuilder.buildAsyncClient();
        }
    
        @Bean
        EventProcessorClientBuilder eventProcessorClientBuilder(BlobContainerAsyncClient blobContainerAsyncClient) {
            return new EventProcessorClientBuilder().credential(EVENT_HUB_FULLY_QUALIFIED_NAMESPACE, EVENT_HUB_NAME,
                                                        new DefaultAzureCredentialBuilder()
                                                            .build())
                                                    .consumerGroup(CONSUMER_GROUP)
                                                    .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))
                                                    .processEvent(EventHubClientConfiguration::processEvent)
                                                    .processError(EventHubClientConfiguration::processError);
        }
    
        @Bean
        EventHubProducerClient eventHubProducerClient(EventHubClientBuilder eventHubClientBuilder) {
            return eventHubClientBuilder.buildProducerClient();
    
        }
    
        @Bean
        EventProcessorClient eventProcessorClient(EventProcessorClientBuilder eventProcessorClientBuilder) {
            return eventProcessorClientBuilder.buildEventProcessorClient();
        }
    
        public static void processEvent(EventContext eventContext) {
            LOGGER.info("Processing event from partition {} with sequence number {} with body: {}",
                eventContext.getPartitionContext().getPartitionId(), eventContext.getEventData().getSequenceNumber(),
                eventContext.getEventData().getBodyAsString());
        }
    
        public static void processError(ErrorContext errorContext) {
            LOGGER.info("Error occurred in partition processor for partition {}, {}",
                errorContext.getPartitionContext().getPartitionId(),
                errorContext.getThrowable());
        }
    
    }
    
  2. Inyecte EventProcessorClient y EventHubProducerClient en la aplicación Spring, como se muestra en el ejemplo siguiente:

    import com.azure.messaging.eventhubs.EventData;
    import com.azure.messaging.eventhubs.EventHubProducerClient;
    import com.azure.messaging.eventhubs.EventProcessorClient;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    import java.util.Collections;
    import java.util.concurrent.TimeUnit;
    
    @SpringBootApplication
    public class EventHubClientApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(EventHubClientApplication.class);
        private final EventHubProducerClient eventHubProducerClient;
        private final EventProcessorClient eventProcessorClient;
    
        public EventHubClientApplication(EventHubProducerClient eventHubProducerClient,
                                         EventProcessorClient eventProcessorClient) {
            this.eventHubProducerClient = eventHubProducerClient;
            this.eventProcessorClient = eventProcessorClient;
        }
    
        public static void main(String[] args) {
            SpringApplication.run(EventHubClientApplication.class, args);
        }
    
        @Override
        public void run(String... args) throws Exception {
            eventProcessorClient.start();
            // Wait for the processor client to be ready
            TimeUnit.SECONDS.sleep(10);
    
            eventHubProducerClient.send(Collections.singletonList(new EventData("Hello World")));
            LOGGER.info("Successfully sent a message to Event Hubs.");
            eventHubProducerClient.close();
            LOGGER.info("Stopping and closing the processor");
            eventProcessorClient.stop();
        }
    
    }
    
  3. Inicie la aplicación. Se muestran registros similares al siguiente ejemplo:

    Successfully sent a message to Event Hubs.
    ...
    Processing event from partition 0 with sequence number 0 with body: Hello World
    ...
    Stopping and closing the processor.
    

En la lista siguiente se muestran algunos motivos por los que este código no es flexible o correcto:

  • El espacio de nombres de Event Hubs y el nombre del centro de eventos están codificados de forma fija.
  • Si usa @Value para obtener configuraciones del entorno de Spring, no puede tener sugerencias del IDE en el archivo application.properties.
  • Si tiene un escenario de microservicio, debe duplicar el código de cada proyecto y es fácil cometer errores y es difícil ser coherente.

Afortunadamente, la creación de los beans de cliente por su cuenta no es necesaria con Spring Cloud Azure. En su lugar, puede inyectarlos directamente y usar las propiedades de configuración con las que ya está familiarizado para configurar la cola de Storage. Para más información, consulte Configuración de Azure de Spring Cloud.

Spring Cloud Azure también proporciona las siguientes configuraciones globales para diferentes escenarios. Para más información, consulte la sección Configuración global de los SDK de servicio de Azure de la configuración de Azure de Spring Cloud.

  • Opciones de proxy
  • Opciones de reintento.
  • Opciones de cliente de transporte de AMQP.

También puede conectarse a diferentes nubes de Azure. Para más información, consulte Conexión a diferentes nubes de Azure.

Uso de Spring Messaging Azure Event Hubs

El módulo Spring Messaging de Azure Event Hubs proporciona compatibilidad con spring Messaging Framework con Event Hubs.

Si usa Spring Messaging Azure Event Hubs, puede usar las siguientes características:

  • EventHubsTemplate: envía mensajes a Event Hubs de forma asincrónica y sincrónica.
  • @EventHubsListener: marca un método para que sea el destino de un agente de escucha de mensajes de Event Hubs en el destino.

En esta guía se muestra cómo usar Spring Messaging Azure Event Hubs para enviar y recibir mensajes de Event Hubs.

Adición de dependencias

Para instalar el módulo Spring Messaging de Azure Event Hubs, agregue las siguientes dependencias al archivo pom.xml :

  • La lista de materiales (BOM) de Azure de Spring Cloud:

    <dependencyManagement>
       <dependencies>
         <dependency>
           <groupId>com.azure.spring</groupId>
           <artifactId>spring-cloud-azure-dependencies</artifactId>
           <version>5.22.0</version>
           <type>pom</type>
           <scope>import</scope>
           </dependency>
       </dependencies>
    </dependencyManagement>
    

    Nota:

    Si usa Spring Boot 2.x, asegúrese de establecer la versión de spring-cloud-azure-dependencies en 4.20.0. Esta lista de materiales (BOM) debe configurarse en la <dependencyManagement> sección del archivo pom.xml . Esto garantiza que todas las dependencias de Spring Cloud Azure usen la misma versión. Para obtener más información sobre la versión que se usa para esta lista de materiales, consulte La versión de Spring Cloud que se debe usar en Azure.

  • Los artefactos de Spring Cloud Azure Starter, Spring Messaging Event Hubs y Azure Event Hubs Checkpoint Store:

    <dependency>
      <groupId>com.azure.spring</groupId>
      <artifactId>spring-cloud-azure-starter</artifactId>
    </dependency>
    <dependency>
      <groupId>com.azure.spring</groupId>
      <artifactId>spring-messaging-azure-eventhubs</artifactId>
    </dependency>
    <dependency>
      <groupId>com.azure</groupId>
      <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
    </dependency>
    

Codifica la aplicación para enviar y recibir mensajes

Para enviar y recibir mensajes desde Event Hubs, configure la aplicación mediante los pasos siguientes:

  1. Use las siguientes opciones de propiedades para configurar el espacio de nombres de Event Hubs y el blob de Storage:

    spring.cloud.azure.eventhubs.namespace=<your event-hubs-namespace>
    spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name=<your-storage-account-name>
    spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name=<your-storage-account-container-name>
    
  2. Cree una nueva clase Java ConsumerService como se muestra en el ejemplo siguiente. Esta clase se usa para definir un receptor de mensajes. Asegúrese de reemplazar el marcador de posición <your-event-hub-name> por el valor real.

    import com.azure.spring.messaging.eventhubs.implementation.core.annotation.EventHubsListener;
    import org.springframework.stereotype.Service;
    
    @Service
    public class ConsumerService {
    
        private static final String EVENT_HUB_NAME = "<your-event-hub-name>";
        private static final String CONSUMER_GROUP = "$DEFAULT";
    
        @EventHubsListener(destination = EVENT_HUB_NAME, group = CONSUMER_GROUP)
        public void handleMessageFromEventHub(String message) {
            System.out.printf("New message received: %s%n", message);
        }
    
    }
    
  3. Conecte un emisor y un receptor para enviar y recibir mensajes con Spring, como se muestra en el ejemplo siguiente. Asegúrese de reemplazar el marcador de posición <your-event-hub-name> por el valor real.

    import com.azure.spring.messaging.eventhubs.core.EventHubsTemplate;
    import com.azure.spring.messaging.implementation.annotation.EnableAzureMessaging;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.ConfigurableApplicationContext;
    import org.springframework.messaging.support.MessageBuilder;
    
    @SpringBootApplication
    @EnableAzureMessaging
    public class EventHubMessagingApplication {
    
        private static final String EVENT_HUB_NAME = "<your-event-hub-name>";
        private static final Logger LOGGER = LoggerFactory.getLogger(EventHubMessagingApplication.class);
    
        public static void main(String[] args) {
            ConfigurableApplicationContext applicationContext = SpringApplication.run(EventHubMessagingApplication.class);
            EventHubsTemplate eventHubsTemplate = applicationContext.getBean(EventHubsTemplate.class);
            LOGGER.info("Sending a message to the Event Hubs.");
            eventHubsTemplate.sendAsync(EVENT_HUB_NAME, MessageBuilder.withPayload("Hello world").build()).subscribe();
        }
    
    }
    

    Sugerencia

    Recuerde agregar la anotación @EnableAzureMessaging, que desencadena la detección de métodos anotados con @EventHubsListener, creando el contenedor del agente de escucha de mensajes en segundo plano.

  4. Inicie la aplicación. Se muestran registros similares al siguiente ejemplo:

    Sending a message to the Event Hubs.
    New message received: Hello world
    

Uso de Spring Integration Azure Event Hubs

El módulo Spring Integration Azure Event Hubs proporciona compatibilidad con el marco de integración de Spring con Event Hubs.

Si la aplicación Spring usa canales de mensajes de Spring Integration, puede enrutar mensajes entre los canales de mensajes y Event Hubs mediante adaptadores de canal.

Un adaptador de canal de entrada reenvía los mensajes de un centro de eventos a un canal de mensajes. Un adaptador de canal saliente publica mensajes de un canal de mensajes en un centro de eventos.

En esta guía se muestra cómo usar Spring Integration Azure Event Hubs para enviar y recibir mensajes de Event Hubs.

Adición de dependencias

Para instalar el módulo Spring Cloud Azure Event Hubs Integration Starter, agregue las siguientes dependencias al archivo pom.xml :

  • La lista de materiales (BOM) de Azure de Spring Cloud:

    <dependencyManagement>
       <dependencies>
         <dependency>
           <groupId>com.azure.spring</groupId>
           <artifactId>spring-cloud-azure-dependencies</artifactId>
           <version>5.22.0</version>
           <type>pom</type>
           <scope>import</scope>
           </dependency>
       </dependencies>
    </dependencyManagement>
    

    Nota:

    Si usa Spring Boot 2.x, asegúrese de establecer la versión de spring-cloud-azure-dependencies en 4.20.0. Esta lista de materiales (BOM) debe configurarse en la <dependencyManagement> sección del archivo pom.xml . Esto garantiza que todas las dependencias de Spring Cloud Azure usen la misma versión. Para obtener más información sobre la versión que se usa para esta lista de materiales, consulte La versión de Spring Cloud que se debe usar en Azure.

  • El artefacto de Spring Cloud Azure Event Hubs Integration:

    <dependency>
      <groupId>com.azure.spring</groupId>
      <artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
    </dependency>
    

Codifica la aplicación para enviar y recibir mensajes

Para enviar y recibir mensajes desde Event Hubs, configure la aplicación mediante los pasos siguientes:

  1. Use las siguientes opciones de propiedades para configurar el espacio de nombres de Event Hubs y el blob de Storage:

    spring.cloud.azure.eventhubs.namespace=<your event-hubs-namespace>
    spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name=<your-storage-account-name>
    spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name=<your-storage-account-container-name>
    
  2. Cree una nueva clase Java MessageReceiveConfiguration como se muestra en el ejemplo siguiente. Esta clase se usa para definir un receptor de mensajes. Asegúrese de reemplazar el marcador de posición <your-event-hub-name> por el valor real.

    import com.azure.spring.integration.eventhubs.inbound.EventHubsInboundChannelAdapter;
    import com.azure.spring.messaging.eventhubs.core.EventHubsProcessorFactory;
    import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointConfig;
    import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointMode;
    import com.azure.spring.messaging.eventhubs.core.listener.EventHubsMessageListenerContainer;
    import com.azure.spring.messaging.eventhubs.core.properties.EventHubsContainerProperties;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.integration.channel.DirectChannel;
    import org.springframework.messaging.MessageChannel;
    
    @Configuration
    public class MessageReceiveConfiguration {
    
        private static final String INPUT_CHANNEL = "input";
        private static final String EVENT_HUB_NAME = "<your-event-hub-name>";
        private static final String CONSUMER_GROUP = "$Default";
        private static final Logger LOGGER = LoggerFactory.getLogger(MessageReceiveConfiguration.class);
    
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload) {
            String message = new String(payload);
            LOGGER.info("New message received: {}", message);
        }
    
        @Bean
        public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
            EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
            containerProperties.setEventHubName(EVENT_HUB_NAME);
            containerProperties.setConsumerGroup(CONSUMER_GROUP);
            containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
            return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
        }
    
        @Bean
        public EventHubsInboundChannelAdapter messageChannelAdapter(@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
                                                                    EventHubsMessageListenerContainer listenerContainer) {
            EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(listenerContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    
    }
    
  3. Cree una nueva clase Java MessageSendConfiguration como se muestra en el ejemplo siguiente. Esta clase se usa para definir un emisor de mensajes. Asegúrese de reemplazar el marcador de posición <your-event-hub-name> por el valor real.

    import com.azure.spring.integration.core.handler.DefaultMessageHandler;
    import com.azure.spring.messaging.eventhubs.core.EventHubsTemplate;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.annotation.MessagingGateway;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.messaging.MessageHandler;
    import org.springframework.util.concurrent.ListenableFutureCallback;
    
    @Configuration
    public class MessageSendConfiguration {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(MessageSendConfiguration.class);
        private static final String OUTPUT_CHANNEL = "output";
        private static final String EVENT_HUB_NAME = "<your-event-hub-name>";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(EVENT_HUB_NAME, eventHubsTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.error("There was an error sending the message.", ex);
                }
            });
    
            return handler;
        }
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface EventHubOutboundGateway {
            void send(String text);
        }
    
    }
    
  4. Conecte un emisor y un receptor para enviar y recibir mensajes con Spring, como se muestra en el ejemplo siguiente:

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.ConfigurableApplicationContext;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.config.EnableIntegration;
    
    @SpringBootApplication
    @EnableIntegration
    @Configuration(proxyBeanMethods = false)
    public class EventHubIntegrationApplication {
    
        public static void main(String[] args) {
            ConfigurableApplicationContext applicationContext = SpringApplication.run(EventHubIntegrationApplication.class, args);
            MessageSendConfiguration.EventHubOutboundGateway outboundGateway = applicationContext.getBean(MessageSendConfiguration.EventHubOutboundGateway.class);
            outboundGateway.send("Hello World");
        }
    }
    

    Sugerencia

    Recuerde agregar la anotación @EnableIntegration, que habilita la infraestructura de Spring Integration.

  5. Inicie la aplicación. Se muestran registros similares al siguiente ejemplo:

    Message was sent successfully.
    New message received: Hello World
    

Uso de Spring Cloud Azure Stream Event Hubs Binder

Para llamar a Event Hubs API en una aplicación spring Cloud Stream , use el módulo Spring Cloud Azure Event Hubs Stream Binder.

En esta guía se muestra cómo usar Spring Cloud Stream Event Hubs Binder para enviar y recibir mensajes de Event Hubs.

Adición de dependencias

Para instalar el módulo Spring Cloud Azure Event Hubs Stream Binder, agregue las siguientes dependencias al archivo pom.xml :

  • La lista de materiales (BOM) de Azure de Spring Cloud:

    <dependencyManagement>
       <dependencies>
         <dependency>
           <groupId>com.azure.spring</groupId>
           <artifactId>spring-cloud-azure-dependencies</artifactId>
           <version>5.22.0</version>
           <type>pom</type>
           <scope>import</scope>
           </dependency>
       </dependencies>
    </dependencyManagement>
    

    Nota:

    Si usa Spring Boot 2.x, asegúrese de establecer la versión de spring-cloud-azure-dependencies en 4.20.0. Esta lista de materiales (BOM) debe configurarse en la <dependencyManagement> sección del archivo pom.xml . Esto garantiza que todas las dependencias de Spring Cloud Azure usen la misma versión. Para obtener más información sobre la versión que se usa para esta lista de materiales, consulte La versión de Spring Cloud que se debe usar en Azure.

  • El artefacto de Spring Cloud Azure Event Hubs Stream Binder:

    <dependency>
      <groupId>com.azure.spring</groupId>
      <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
    </dependency>
    

Codifica la aplicación para enviar y recibir mensajes

Para enviar y recibir mensajes desde Event Hubs, configure la aplicación mediante los pasos siguientes:

  1. Use las siguientes opciones de propiedades para configurar el espacio de nombres de Event Hubs y el blob de Storage:

    spring.cloud.azure.eventhubs.namespace=<your event-hubs-namespace>
    spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name=<your-storage-account-name>
    spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name=<your-storage-account-container-name>
    
  2. Cree el receptor del mensaje.

    Para usar la aplicación como receptor de eventos, configure el enlazador de entrada completando las tareas siguientes:

    • Declare un bean Consumer que defina la lógica de control de mensajes. Por ejemplo, el bean siguiente Consumer se denomina consume:

      @Bean
      public Consumer<Message<String>> consume() {
           return message -> {
               System.out.printf("New message received: %s%n", message.getPayload());
           };
      }
      
    • Agregue la siguiente configuración para especificar el nombre Event Hub que se va a consumir. Asegúrese de reemplazar el marcador de posición <your-event-hub-name> por el valor real.

      # name for the above `Consumer` bean
      spring.cloud.stream.function.definition=consume
      spring.cloud.stream.bindings.consume-in-0.destination=<your-event-hub-name>
      spring.cloud.stream.bindings.consume-in-0.group=$Default
      spring.cloud.stream.eventhubs.bindings.consume-in-0.consumer.checkpoint.mode=MANUAL
      
  3. Cree el emisor del mensaje.

    Para usar la aplicación como origen de eventos, configure el enlazador de salida completando las tareas siguientes:

    • Defina un bean Supplier que defina de dónde proceden los mensajes de la aplicación, como se muestra en el ejemplo siguiente:

      @Bean
      public Supplier<Message<String>> supply() {
          return () -> {
              System.out.println("Sending a message.");
              return MessageBuilder.withPayload("Hello world").build();
          };
      }
      
    • Agregue la siguiente configuración para especificar el nombre Event Hub que se va a enviar. Asegúrese de reemplazar el marcador de posición <your-event-hub-name> por el valor real.

      # "consume" is added from the above step
      spring.cloud.stream.function.definition=consume;supply
      spring.cloud.stream.bindings.supply-out-0.destination=<your-event-hub-name>
      
  4. Inicie la aplicación. Se muestran registros similares al siguiente ejemplo:

    Sending a message.
    New message received: Hello world.
    

Uso de Spring Kafka con Azure Event Hubs

Event Hubs proporciona un punto de conexión de Kafka que pueden usar las aplicaciones basadas en Apache Kafka existentes. Este enfoque proporciona una alternativa a ejecutar su propio clúster de Kafka. Event Hubs funciona con muchas de sus aplicaciones de Kafka actuales. Para más información, consulte Event Hubs para Apache Kafka.

En esta guía se muestra cómo usar Azure Event Hubs y Spring Kafka para enviar y recibir mensajes de Event Hubs.

Adición de dependencias

Para instalar los módulos Spring Cloud Azure Starter y Spring Kafka, agregue las siguientes dependencias al archivo pom.xml :

  • La lista de materiales (BOM) de Azure de Spring Cloud:

    <dependencyManagement>
       <dependencies>
         <dependency>
           <groupId>com.azure.spring</groupId>
           <artifactId>spring-cloud-azure-dependencies</artifactId>
           <version>5.22.0</version>
           <type>pom</type>
           <scope>import</scope>
           </dependency>
       </dependencies>
    </dependencyManagement>
    

    Nota:

    Si usa Spring Boot 2.x, asegúrese de establecer la versión de spring-cloud-azure-dependencies en 4.20.0. Esta lista de materiales (BOM) debe configurarse en la <dependencyManagement> sección del archivo pom.xml . Esto garantiza que todas las dependencias de Spring Cloud Azure usen la misma versión. Para obtener más información sobre la versión que se usa para esta lista de materiales, consulte La versión de Spring Cloud que se debe usar en Azure.

  • El artefacto Spring Cloud Azure Starter y Spring Kafka:

    <dependency>
      <groupId>com.azure.spring</groupId>
      <artifactId>spring-cloud-azure-starter</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>
    

Codifica la aplicación para enviar y recibir mensajes

Para enviar y recibir mensajes desde Event Hubs, configure la aplicación mediante los pasos siguientes:

  1. Use la siguiente configuración de propiedad para configurar el espacio de nombres de Event Hubs:

    spring.kafka.bootstrap-servers=<your event-hubs-namespace>.servicebus.windows.net:9093
    
  2. Use KafkaTemplate para enviar mensajes y @KafkaListener para recibir mensajes, como se muestra en el ejemplo siguiente. Asegúrese de reemplazar el marcador de posición <your-event-hub-name> por el valor real.

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.core.KafkaTemplate;
    
    @SpringBootApplication
    public class EventHubKafkaApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(EventHubKafkaApplication.class);
        private static final String EVENT_HUB_NAME = "<your-event-hub-name>";
        private static final String CONSUMER_GROUP = "$Default";
        private final KafkaTemplate<String, String> kafkaTemplate;
    
        public EventHubKafkaApplication(KafkaTemplate<String, String> kafkaTemplate) {
            this.kafkaTemplate = kafkaTemplate;
        }
    
        public static void main(String[] args) {
            SpringApplication.run(EventHubKafkaApplication.class, args);
        }
    
        @Override
        public void run(String... args) {
            kafkaTemplate.send(EVENT_HUB_NAME, "Hello World");
            LOGGER.info("Message was sent successfully.");
        }
    
        @KafkaListener(topics = EVENT_HUB_NAME, groupId = CONSUMER_GROUP)
        public void receive(String message) {
            LOGGER.info("New message received: {}", message);
        }
    
    }
    
  3. Inicie la aplicación. Se muestran registros similares al siguiente ejemplo:

    Message was sent successfully.
    New message received: Hello world
    

Uso de Spring Cloud Stream Kafka Binder con Azure Event Hubs

Spring Cloud Stream es un marco que permite a los desarrolladores de aplicaciones escribir microservicios controlados por mensajes. El puente entre un sistema de mensajería y Spring Cloud Stream es a través de la abstracción del enlazador. Existen enlazadores para varios sistemas de mensajería, pero uno de los enlazadores más usados es para Apache Kafka.

En esta guía se muestra cómo usar Azure Event Hubs y Spring Cloud Stream Kafka Binder para enviar y recibir mensajes de Event Hubs.

Adición de dependencias

Para instalar el Spring Cloud Azure Starter y los módulos de Spring Cloud Stream Binder Kafka, agregue las siguientes dependencias a su archivo pom.xml:

  • La lista de materiales (BOM) de Azure de Spring Cloud:

    <dependencyManagement>
       <dependencies>
         <dependency>
           <groupId>com.azure.spring</groupId>
           <artifactId>spring-cloud-azure-dependencies</artifactId>
           <version>5.22.0</version>
           <type>pom</type>
           <scope>import</scope>
           </dependency>
       </dependencies>
    </dependencyManagement>
    

    Nota:

    Si usa Spring Boot 2.x, asegúrese de establecer la versión de spring-cloud-azure-dependencies en 4.20.0. Esta lista de materiales (BOM) debe configurarse en la <dependencyManagement> sección del archivo pom.xml . Esto garantiza que todas las dependencias de Spring Cloud Azure usen la misma versión. Para obtener más información sobre la versión que se usa para esta lista de materiales, consulte La versión de Spring Cloud que se debe usar en Azure.

  • El artefacto de Spring Cloud Azure Starter:

    <dependency>
      <groupId>com.azure.spring</groupId>
      <artifactId>spring-cloud-azure-starter</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>
    

Codifica la aplicación para enviar y recibir mensajes

Para enviar y recibir mensajes desde Event Hubs, configure la aplicación mediante los pasos siguientes:

  1. Use la siguiente configuración de propiedad para configurar el agente de Kafka:

    spring.cloud.stream.kafka.binder.brokers=<your event-hubs-namespace>.servicebus.windows.net:9093
    
  2. Cree el receptor del mensaje.

    Para usar la aplicación como receptor de eventos, configure el enlazador de entrada completando las tareas siguientes:

    • Declare un bean Consumer que defina la lógica de control de mensajes. Por ejemplo, el bean siguiente Consumer se denomina consume:

      @Bean
      public Consumer<Message<String>> consume() {
          return message -> {
              System.out.printf("New message received: %s%n", message.getPayload());
          };
      }
      
    • Agregue la siguiente configuración para especificar el nombre Event Hub que se va a consumir. Asegúrese de reemplazar el marcador de posición <your-event-hub-name> por el valor real.

      # name for the above `Consumer` bean
      spring.cloud.stream.function.definition=consume
      spring.cloud.stream.bindings.consume-in-0.destination=<your-event-hub-name>
      spring.cloud.stream.bindings.consume-in-0.group=$Default
      
  3. Cree el emisor del mensaje.

    Para usar la aplicación como origen de eventos, configure el enlazador de salida completando las tareas siguientes:

    • Defina un bean Supplier que defina de dónde proceden los mensajes de la aplicación, como se muestra en el ejemplo siguiente:

      @Bean
      public Supplier<Message<String>> supply() {
          return () -> {
              System.out.println("Sending a message.");
              return MessageBuilder.withPayload("Hello world").build();
          };
      }
      
    • Agregue la siguiente configuración para especificar el nombre Event Hub que se va a enviar. Asegúrese de reemplazar el marcador de posición <your-event-hub-name> por el valor real.

      # "consume" is added from the above step
      spring.cloud.stream.function.definition=consume;supply
      spring.cloud.stream.bindings.supply-out-0.destination=<your-event-hub-name>
      
  4. Inicie la aplicación. Se muestran registros similares al siguiente ejemplo:

    Sending a message.
    New message received: Hello world.
    

Implementación en Azure Spring Apps

Ahora que tiene la aplicación Spring Boot que se ejecuta localmente, es el momento de moverla a producción. Azure Spring Apps facilita la implementación de aplicaciones de Spring Boot en Azure sin cambios en el código. El servicio administra la infraestructura de las aplicaciones de Spring, con el fin de que los desarrolladores puedan centrarse en el código. Azure Spring Apps proporciona administración del ciclo de vida mediante el uso de una supervisión y un diagnóstico completos, administración de la configuración, detección de servicios, integración de CI/CD e implementaciones azul-verde, entre otros. Para implementar la aplicación en Azure Spring Apps, consulte Implementación de la primera aplicación en Azure Spring Apps.

Pasos siguientes

Consulte también

Para obtener más información sobre los otros iniciadores de Spring Boot que están disponibles para Microsoft Azure, consulte ¿Qué es Spring Cloud Azure?