Spring Cloud Stream avec Azure Service Bus

Cet article explique comment utiliser Spring Cloud Stream Binder pour échanger des messages avec des queues et topics Service Bus.

Azure fournit une plateforme de messagerie asynchrone appelée Azure Service Bus (« Service Bus ») basée sur la norme Advanced Message Queuing Protocol 1.0 (« AMQP 1.0 »). Le plateforme Service Bus peut être utilisée sur toute la gamme de plateformes Azure prises en charge.

Prérequis

Remarque

Pour accorder à votre compte l’accès à vos ressources Azure Service Bus, attribuez le rôle et Azure Service Bus Data Receiver le Azure Service Bus Data Sender rôle au compte Microsoft Entra que vous utilisez actuellement. Pour plus d’informations sur l’octroi de rôles d’accès, consultez Affecter des rôles Azure à l’aide des Portail Azure et authentifier et autoriser une application avec Microsoft Entra ID pour accéder aux entités Azure Service Bus.

Important

Spring Boot version 2.5 ou ultérieure est nécessaire pour effectuer les étapes décrites dans cet article.

Envoyer et recevoir des messages d’Azure Service Bus

Avec une file d’attente ou une rubrique pour Azure Service Bus, vous pouvez envoyer et recevoir des messages à l’aide de Spring Cloud Azure Stream Binder Service Bus.

Pour installer le module Spring Cloud Azure Stream Binder Service Bus, ajoutez les dépendances suivantes à votre fichier pom.xml :

  • The Spring Cloud Azure Bill of Materials (BOM) :

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.azure.spring</groupId>
          <artifactId>spring-cloud-azure-dependencies</artifactId>
          <version>5.10.0</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
    

    Remarque

    Si vous utilisez Spring Boot 2.x, veillez à définir la spring-cloud-azure-dependencies version 4.16.0sur . Cette facture de matériel (BOM) doit être configurée dans la <dependencyManagement> section de votre fichier pom.xml . Cela garantit que toutes les dépendances Azure Spring Cloud utilisent la même version. Pour plus d’informations sur la version utilisée pour ce boM, consultez La version de Spring Cloud Azure à utiliser.

  • Artefact Spring Cloud Azure Stream Binder Service Bus :

    <dependency>
         <groupId>com.azure.spring</groupId>
         <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
    </dependency>
    

Coder l’application

Procédez comme suit pour configurer votre application afin d’utiliser une file d’attente ou une rubrique Service Bus pour envoyer et recevoir des messages.

  1. Configurez les informations d’identification Service Bus dans le fichier application.propertiesde configuration.

     spring.cloud.azure.servicebus.namespace=${AZURE_SERVICEBUS_NAMESPACE}
     spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME}
     spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME}
     spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete=false
     spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type=queue
     spring.cloud.function.definition=consume;supply;
     spring.cloud.stream.poller.fixed-delay=60000 
     spring.cloud.stream.poller.initial-delay=0
    

    Le tableau suivant décrit les champs de la configuration :

    Champ Description
    spring.cloud.azure.servicebus.namespace Spécifiez l’espace de noms que vous avez obtenu dans votre Service Bus à partir de la Portail Azure.
    spring.cloud.stream.bindings.consume-in-0.destination Spécifiez la file d’attente ou la rubrique Service Bus que vous avez utilisées dans ce didacticiel.
    spring.cloud.stream.bindings.supply-out-0.destination Spécifiez la même valeur que celle utilisée pour la destination d’entrée.
    spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete Spécifiez s’il faut régler automatiquement les messages. Si la valeur est false, un en-tête de message est ajouté pour permettre aux développeurs de Checkpointer régler manuellement les messages.
    spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type Spécifiez le type d’entité pour la liaison de sortie, peut être queue ou topic.
    spring.cloud.function.definition Spécifiez le composant Bean fonctionnel à lier aux destinations externes qui sont exposées par les liaisons.
    spring.cloud.stream.poller.fixed-delay Spécifiez un délai fixe pour l’polleur par défaut en millisecondes. La valeur par défaut est 1 000 L. La valeur recommandée est 6 0000.
    spring.cloud.stream.poller.initial-delay Spécifiez le délai initial pour les déclencheurs périodiques. La valeur par défaut est 0.
  2. Modifiez le fichier de classe de démarrage pour afficher le contenu suivant.

    import com.azure.spring.messaging.checkpoint.Checkpointer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;
    import java.util.function.Consumer;
    import java.util.function.Supplier;
    import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER;
    
    @SpringBootApplication
    public class ServiceBusQueueBinderApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueBinderApplication.class);
        private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer();
    
        public static void main(String[] args) {
            SpringApplication.run(ServiceBusQueueBinderApplication.class, args);
        }
    
        @Bean
        public Supplier<Flux<Message<String>>> supply() {
            return ()->many.asFlux()
                           .doOnNext(m->LOGGER.info("Manually sending message {}", m))
                           .doOnError(t->LOGGER.error("Error encountered", t));
        }
    
        @Bean
        public Consumer<Message<String>> consume() {
            return message->{
                Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
                LOGGER.info("New message received: '{}'", message.getPayload());
                checkpointer.success()
                            .doOnSuccess(s->LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                            .doOnError(e->LOGGER.error("Error found", e))
                            .block();
            };
        }
    
        @Override
        public void run(String... args) {
            LOGGER.info("Going to add message {} to Sinks.Many.", "Hello World");
            many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST);
        }
    
    }
    

    Conseil

    Dans ce tutoriel, il n’existe aucune opération d’authentification dans les configurations ou le code. Toutefois, la connexion aux services Azure nécessite une authentification. Pour effectuer l’authentification, vous devez utiliser Identité Azure. Spring Cloud Azure utilise DefaultAzureCredential, que la bibliothèque d’identités Azure fournit pour vous aider à obtenir des informations d’identification sans aucune modification du code.

    DefaultAzureCredential prend en charge plusieurs méthodes d’authentification et détermine quelle méthode doit être utilisée au moment de l’exécution. Cette approche permet à votre application d’utiliser différentes méthodes d’authentification dans différents environnements (tels que les environnements locaux et de production) sans implémenter de code spécifique à l’environnement. Pour plus d’informations, consultez DefaultAzureCredential.

    Pour terminer l’authentification dans les environnements de développement locaux, vous pouvez utiliser Azure CLI, Visual Studio Code, PowerShell ou d’autres méthodes. Pour plus d’informations, consultez l’authentification Azure dans les environnements de développement Java. Pour terminer l’authentification dans les environnements d’hébergement Azure, nous vous recommandons d’utiliser l’identité managée affectée par l’utilisateur. Pour plus d’informations, consultez Que sont les identités managées pour les ressources Azure ?

  3. Lancez l’application. Les messages comme l’exemple suivant seront publiés dans le journal de votre application :

    New message received: 'Hello World'
    Message 'Hello World' successfully checkpointed
    

Étapes suivantes