Spring Cloud Stream con Azure Service Bus
En este artículo se muestra cómo usar Spring Cloud Stream Binder para enviar y recibir mensajes de queues
y topics
de Azure Service Bus.
Azure proporciona una plataforma de mensajería asincrónica denominada Azure Service Bus ("Service Bus") que se basa en el estándar Advanced Message Queueing Protocol 1.0 (AMQP 1.0). Service Bus puede usarse en el intervalo de plataformas de Azure admitidas.
Requisitos previos
Una suscripción a Azure: cree una cuenta gratuita.
Java Development Kit (JDK) versión 8 o posterior.
Apache Maven, versión 3.2 o posterior.
cURL o una utilidad HTTP similar para probar la funcionalidad.
Cola o tema para Azure Service Bus. Si no tiene una, cree una cola de Service Bus o cree un tema de Service Bus.
Una aplicación de Spring Boot. Si no tiene ninguna, cree un proyecto de Maven con Spring Initializr. Asegúrese de seleccionar Proyecto de Maven y, en Dependencias, agregue las dependencias de Spring Web y Soporte técnico de Azure y, después, seleccione Java versión 8 o posterior.
Nota:
Para conceder a la cuenta acceso a los recursos de Azure Service Bus, asigne el Azure Service Bus Data Sender
rol y Azure Service Bus Data Receiver
a la cuenta de Azure AD que usa actualmente. Para más información sobre cómo conceder roles de acceso, consulte Asignación de roles de Azure mediante el Azure Portal y Autenticación y autorización de una aplicación con Azure Active Directory para acceder a Azure Service Bus entidades.
Importante
Se requiere Spring Boot versión 2.5 o posterior para completar los pasos de este artículo.
Envío y recepción de mensajes de Azure Service Bus
Con una cola o tema para Azure Service Bus, puede enviar y recibir mensajes mediante Spring Cloud Azure Stream Binder Service Bus.
Para instalar el módulo Spring Cloud Azure Stream Binder Service Bus, agregue las siguientes dependencias al archivo pom.xml :
La lista de materiales (BOM) de Azure de Spring Cloud:
<dependencyManagement> <dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-dependencies</artifactId> <version>4.8.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
Nota
Si usa Spring Boot 3.x, asegúrese de establecer la
spring-cloud-azure-dependencies
versión5.2.0
en . Para más información sobre laspring-cloud-azure-dependencies
versión, consulte Qué versión de Spring Cloud Azure debería usar.El artefacto de Spring Cloud Azure Stream Binder Service Bus:
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId> </dependency>
Incorporación del código de la aplicación
Siga estos pasos para configurar la aplicación para que use una cola o un tema de Service Bus para enviar y recibir mensajes.
Configure las credenciales de Service Bus en el archivo
application.properties
de configuración .spring.cloud.azure.servicebus.namespace=${AZURE_SERVICEBUS_NAMESPACE} spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME} spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME} spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete=false spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type=queue spring.cloud.function.definition=consume;supply; spring.cloud.stream.poller.fixed-delay=1000 spring.cloud.stream.poller.initial-delay=0
En la tabla siguiente se describen los campos de la configuración:
Campo Descripción spring.cloud.azure.servicebus.namespace
Especifique el espacio de nombres que obtuvo en service Bus desde el Azure Portal. spring.cloud.stream.bindings.consume-in-0.destination
Especifique la cola o el tema de Service Bus que usó en este tutorial. spring.cloud.stream.bindings.supply-out-0.destination
Especifique el mismo valor que se usó para el destino de entrada. spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete
Especifique si se van a liquidar los mensajes automáticamente. Si se establece como false, se agregará un encabezado de mensaje de Checkpointer
para permitir a los desarrolladores liquidar los mensajes manualmente.spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type
Especifique el tipo de entidad para el enlace de salida, puede ser queue
otopic
.spring.cloud.function.definition
Especifique el bean funcional que se va a enlazar a los destinos externos expuestos por los enlaces. spring.cloud.stream.poller.fixed-delay
Especifique un retraso fijo para el sondeo predeterminado en milisegundos. El valor predeterminado es 1000 L. spring.cloud.stream.poller.initial-delay
Especifique el retraso inicial para los desencadenadores periódicos. El valor predeterminado es 0. Edite el archivo de clase de inicio para mostrar el siguiente contenido.
import com.azure.spring.messaging.checkpoint.Checkpointer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import reactor.core.publisher.Flux; import reactor.core.publisher.Sinks; import java.util.function.Consumer; import java.util.function.Supplier; import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER; @SpringBootApplication public class ServiceBusQueueBinderApplication implements CommandLineRunner { private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueBinderApplication.class); private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer(); public static void main(String[] args) { SpringApplication.run(ServiceBusQueueBinderApplication.class, args); } @Bean public Supplier<Flux<Message<String>>> supply() { return ()->many.asFlux() .doOnNext(m->LOGGER.info("Manually sending message {}", m)) .doOnError(t->LOGGER.error("Error encountered", t)); } @Bean public Consumer<Message<String>> consume() { return message->{ Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}'", message.getPayload()); checkpointer.success() .doOnSuccess(s->LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e->LOGGER.error("Error found", e)) .block(); }; } @Override public void run(String... args) { LOGGER.info("Going to add message {} to Sinks.Many.", "Hello Word"); many.emitNext(MessageBuilder.withPayload("Hello Word").build(), Sinks.EmitFailureHandler.FAIL_FAST); } }
Sugerencia
En este tutorial, no hay ninguna operación de autenticación en las configuraciones ni en el código. Sin embargo, la conexión a los servicios de Azure requiere autenticación. Para completar la autenticación, debe usar Azure Identity. Spring Cloud Azure usa
DefaultAzureCredential
, que la biblioteca de identidades de Azure proporciona para ayudarle a obtener credenciales sin cambios en el código.DefaultAzureCredential
admite varios métodos de autenticación y determina qué método se usa en tiempo de ejecución. Este enfoque permite que la aplicación use distintos métodos de autenticación en entornos diferentes (como entornos locales y de producción) sin implementar código específico del entorno. Para más información, consulte la sección Credencial de Azure predeterminada de Autenticación de aplicaciones de Java hospedadas en Azure.Para completar la autenticación en entornos de desarrollo local, puede usar la CLI de Azure, Visual Studio Code, PowerShell u otros métodos. Para más información, consulte Autenticación de Azure en entornos de desarrollo de Java. Para completar la autenticación en entornos de hospedaje de Azure, se recomienda usar la identidad administrada. Para obtener más información, consulte ¿Qué son las identidades administradas para recursos de Azure?
Inicie la aplicación. Los mensajes como el ejemplo siguiente se publicarán en el registro de la aplicación:
New message received: 'Hello Word' Message 'Hello Word' successfully checkpointed