bibliothèque cliente Azure Service Bus pour Java - version 7.14.4

Microsoft Azure Service Bus est un courtier de messages d’intégration d’entreprise entièrement géré. Service Bus peut découpler les applications et les services. Service Bus offre également une plateforme fiable et sécurisée pour le transfert asynchrone des données et de l’état. Les données sont transférées entre différents services et applications à l’aide de messages. Si vous souhaitez en savoir plus sur Azure Service Bus, vous pouvez consulter : Qu’est-ce que Service Bus

La bibliothèque cliente Azure Service Bus permet d’envoyer et de recevoir des messages Azure Service Bus et peut être utilisée pour :

  • transfert de données d’entreprise, telles que les ventes ou les bons de commande, les journaux ou les mouvements de stock.
  • Dissocier les applications pour améliorer la fiabilité et la scalabilité des applications et des services. Les clients et les services n’ont pas besoin d’être en ligne en même temps.
  • autorisation des relations 1:n entre les éditeurs et les abonnés.
  • implémentation des flux de travail qui nécessitent le classement des messages ou le report de message.

| Code sourceDocumentation de référence sur les | API | Documentation produitÉchantillons | Package (Maven)

Prise en main

Prérequis

Pour créer rapidement les ressources Service Bus nécessaires dans Azure et recevoir une chaîne de connexion, vous pouvez déployer notre exemple de modèle en cliquant sur :

Inclure le package

Inclure le fichier de nomenclature

Incluez azure-sdk-bom dans votre projet pour dépendre de la version de disponibilité générale de la bibliothèque. Dans l’extrait de code suivant, remplacez l’espace réservé {bom_version_to_target} par le numéro de version. Pour en savoir plus sur la nomenclature, consultez le README BOM du KIT DE DÉVELOPPEMENT LOGICIEL AZURE.

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-sdk-bom</artifactId>
            <version>{bom_version_to_target}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

Incluez ensuite la dépendance directe dans la section des dépendances sans la balise de version.

<dependencies>
  <dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-servicebus</artifactId>
  </dependency>
</dependencies>

Inclure une dépendance directe

Si vous souhaitez dépendre d’une version particulière de la bibliothèque qui n’est pas présente dans la nomenclature, ajoutez la dépendance directe à votre projet comme suit.

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-servicebus</artifactId>
    <version>7.14.4</version>
</dependency>

Authentifier le client

Pour que la bibliothèque cliente Service Bus interagisse avec Service Bus, elle doit comprendre comment se connecter et autoriser avec elle.

Créer des clients Service Bus à l’aide d’une chaîne de connexion

Le moyen le plus simple pour l’authentification consiste à utiliser une chaîne de connexion, qui a été créée automatiquement lors de la création d’un espace de noms Service Bus. Si vous n’êtes pas familiarisé avec les stratégies d’accès partagé dans Azure, vous pouvez suivre le guide pas à pas pour obtenir une chaîne de connexion Service Bus.

Les clients d’expéditeur et de récepteur Service Bus asynchrones et synchrones sont instanciés à l’aide ServiceBusClientBuilderde . Les extraits de code ci-dessous créent respectivement un expéditeur Service Bus synchrone et un récepteur asynchrone.

ServiceBusSenderClient sender = new ServiceBusClientBuilder()
    .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
    .sender()
    .queueName("<< QUEUE NAME >>")
    .buildClient();
ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
    .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
    .receiver()
    .topicName("<< TOPIC NAME >>")
    .subscriptionName("<< SUBSCRIPTION NAME >>")
    .buildAsyncClient();

Créer un client Service Bus à l’aide de la plateforme d’identités Microsoft (anciennement Azure Active Directory)

Le Kit de développement logiciel (SDK) Azure pour Java prend en charge le package Azure Identity, ce qui simplifie l’obtention des informations d’identification à partir du Plateforme d'identités Microsoft. Tout d’abord, ajoutez le package :

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-identity</artifactId>
    <version>1.10.1</version>
</dependency>
  • Problème connu : le fichier pom.xml doit être listé azure-messaging-servicebus avant azure-identity les bibliothèques clientes. Ce problème est résolu avec azure-identity:1.2.1. Pour plus d’informations, cliquez ici.

Les méthodes implémentées pour demander des informations d’identification se trouvent sous le com.azure.identity.credential package. L’exemple ci-dessous montre comment utiliser une clé secrète client d’application Azure Active Directory (AAD) pour autoriser avec Azure Service Bus.

Autorisation avec DefaultAzureCredential

L’autorisation est plus simple à l’aide de DefaultAzureCredential. Il trouve les meilleures informations d’identification à utiliser dans son environnement d’exécution. Pour plus d’informations sur l’utilisation de l’autorisation Azure Active Directory avec Service Bus, reportez-vous à la documentation associée.

Utilisez les informations d’identification de jeton retournées pour authentifier le client :

TokenCredential credential = new DefaultAzureCredentialBuilder()
    .build();
ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
    .credential("<<fully-qualified-namespace>>", credential)
    .receiver()
    .queueName("<<queue-name>>")
    .buildAsyncClient();

Concepts clés

Vous pouvez interagir avec les types de ressources principaux au sein d’un espace de noms Service Bus, dont plusieurs peuvent exister et sur lesquels la transmission de messages réelle a lieu. L’espace de noms sert souvent de conteneur d’application :

  • Une file d’attente permet l’envoi et la réception de messages, classés premier entrant, premier sorti. Il est souvent utilisé pour la communication point à point.
  • Une rubrique est mieux adaptée aux scénarios d’éditeur et d’abonné. Une rubrique publie des messages dans des abonnements, dont plusieurs peuvent exister simultanément.
  • Un abonnement reçoit des messages d’une rubrique. Chaque abonnement est indépendant et reçoit une copie du message envoyé à la rubrique.

Service Bus Clients

Le générateur ServiceBusClientBuilder est utilisé pour créer tous les clients Service Bus.

Exemples

Envoyer des messages

Vous devez créer un asynchrone ServiceBusSenderAsyncClient ou un synchrone ServiceBusSenderClient pour envoyer des messages. Chaque expéditeur peut envoyer des messages à une file d’attente ou à une rubrique.

L’extrait de code ci-dessous crée un synchrone ServiceBusSenderClient pour publier un message dans une file d’attente.

ServiceBusSenderClient sender = new ServiceBusClientBuilder()
    .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
    .sender()
    .queueName("<< QUEUE NAME >>")
    .buildClient();
List<ServiceBusMessage> messages = Arrays.asList(
    new ServiceBusMessage("Hello world").setMessageId("1"),
    new ServiceBusMessage("Bonjour").setMessageId("2"));

sender.sendMessages(messages);

// When you are done using the sender, dispose of it.
sender.close();

Recevoir des messages

Pour recevoir des messages, vous devez créer un ServiceBusProcessorClient avec des rappels pour les messages entrants et toute erreur qui se produit dans le processus. Vous pouvez ensuite démarrer et arrêter le client en fonction des besoins.

Lors de la réception d’un message en mode PeekLock , il indique au répartiteur que la logique d’application souhaite régler explicitement les messages reçus (par exemple, terminer, abandonner).

// Sample code that processes a single message which is received in PeekLock mode.
Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
    final ServiceBusReceivedMessage message = context.getMessage();
    // Randomly complete or abandon each message. Ideally, in real-world scenarios, if the business logic
    // handling message reaches desired state such that it doesn't require Service Bus to redeliver
    // the same message, then context.complete() should be called otherwise context.abandon().
    final boolean success = Math.random() < 0.5;
    if (success) {
        try {
            context.complete();
        } catch (Exception completionError) {
            System.out.printf("Completion of the message %s failed\n", message.getMessageId());
            completionError.printStackTrace();
        }
    } else {
        try {
            context.abandon();
        } catch (Exception abandonError) {
            System.out.printf("Abandoning of the message %s failed\n", message.getMessageId());
            abandonError.printStackTrace();
        }
    }
};

// Sample code that gets called if there's an error
Consumer<ServiceBusErrorContext> processError = errorContext -> {
    System.err.println("Error occurred while receiving message: " + errorContext.getException());
};

// create the processor client via the builder and its sub-builder
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
                                .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
                                .processor()
                                .queueName("<< QUEUE NAME >>")
                                .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
                                .disableAutoComplete() // Make sure to explicitly opt in to manual settlement (e.g. complete, abandon).
                                .processMessage(processMessage)
                                .processError(processError)
                                .disableAutoComplete()
                                .buildProcessorClient();

// Starts the processor in the background and returns immediately
processorClient.start();

Lors de la réception d’un message en mode ReceiveAndDelete , indique au répartiteur de considérer tous les messages qu’il envoie au client de réception comme réglés lors de l’envoi.

// Sample code that processes a single message which is received in ReceiveAndDelete mode.
Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
    final ServiceBusReceivedMessage message = context.getMessage();
    System.out.printf("handler processing message. Session: %s, Sequence #: %s. Contents: %s%n", message.getMessageId(),
        message.getSequenceNumber(), message.getBody());
};

// Sample code that gets called if there's an error
Consumer<ServiceBusErrorContext> processError = errorContext -> {
    System.err.println("Error occurred while receiving message: " + errorContext.getException());
};

// create the processor client via the builder and its sub-builder
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
    .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
    .processor()
    .queueName("<< QUEUE NAME >>")
    .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
    .processMessage(processMessage)
    .processError(processError)
    .disableAutoComplete()
    .buildProcessorClient();

// Starts the processor in the background and returns immediately
processorClient.start();

Il existe quatre façons de régler les messages à l’aide des méthodes sur le contexte de message passé à votre rappel.

  • Terminé : entraîne la suppression du message de la file d’attente ou de la rubrique.
  • Abandon : libère le verrou du récepteur sur le message, ce qui permet à d’autres récepteurs de recevoir le message.
  • Différer : diffère la réception du message par des moyens normaux. Pour recevoir des messages différés, le numéro de séquence du message doit être conservé.
  • Lettre morte : déplace le message vers la file d’attente de lettres mortes. Cela empêchera la réception du message à nouveau. Pour recevoir des messages de la file d’attente de lettres mortes, un récepteur limité à la file d’attente de lettres mortes est nécessaire.

Envoyer et recevoir à partir de files d’attente ou de rubriques activées pour la session

L’utilisation de sessions vous oblige à créer une file d’attente ou un abonnement activé pour la session. Pour plus d’informations sur la configuration de ce paramètre, consultez « Sessions de messages ».

Les sessions Azure Service Bus permettent un traitement conjoint et chronologique de séquences illimitées de messages associés. Vous pouvez utiliser des sessions dans des modèles premier entré, premier sorti (FIFO) et requête-réponse. Tout expéditeur peut créer une session lors de l’envoi de messages dans une rubrique ou une file d’attente en définissant la ServiceBusMessage.setSessionId(String) propriété sur un identificateur défini par l’application qui est propre à la session.

Contrairement aux files d’attente ou aux abonnements non activés pour la session, un seul récepteur peut lire à tout moment à partir d’une session. Lorsqu’un récepteur récupère une session, Service Bus verrouille la session pour ce récepteur et dispose d’un accès exclusif aux messages de cette session.

Envoyer un message à une session

Créez un ServiceBusSenderClient pour une file d’attente ou un abonnement à une rubrique activé pour une session. La définition ServiceBusMessage.setSessionId(String) sur un ServiceBusMessage publiera le message dans cette session. Si la session n’existe pas, elle est créée.

// Setting sessionId publishes that message to a specific session, in this case, "greeting".
ServiceBusMessage message = new ServiceBusMessage("Hello world")
    .setSessionId("greetings");

sender.sendMessage(message);

Recevoir des messages d’une session

La réception de messages à partir de sessions est similaire à la réception de messages provenant d’une file d’attente ou d’un abonnement non activé pour la session. La différence réside dans le générateur et la classe que vous utilisez.

En cas de non-session, vous utiliserez le sous-générateur processor(). Dans le cas de sessions, vous utilisez le sous-générateur sessionProcessor(). Les deux sous-générateurs créent un instance de ServiceBusProcessorClient configuré pour fonctionner sur une session ou une entité Service Bus non session. Dans le cas du processeur de session, vous pouvez également passer le nombre maximal de sessions que vous souhaitez que le processeur traite simultanément.

Créer un récepteur de file d’attente de lettres mortes

Azure Service Bus files d’attente et les abonnements aux rubriques fournissent une sous-file d’attente secondaire, appelée file d’attente de lettres mortes (DLQ). La file d’attente de lettres mortes n’a pas besoin d’être explicitement créée et ne peut pas être supprimée ni gérée indépendamment de l’entité principale. Pour les abonnements à la rubrique ou à la file d’attente non session activés ou non, le récepteur de lettres mortes peut être créé de la même façon qu’indiqué ci-dessous. Pour en savoir plus sur la file d’attente de lettres mortes , cliquez ici.

ServiceBusReceiverClient receiver = new ServiceBusClientBuilder()
    .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
    .receiver() // Use this for session or non-session enabled queue or topic/subscriptions
    .topicName("<< TOPIC NAME >>")
    .subscriptionName("<< SUBSCRIPTION NAME >>")
    .subQueue(SubQueue.DEAD_LETTER_QUEUE)
    .buildClient();

Partage de la connexion entre les clients

La création d’une connexion physique à Service Bus nécessite des ressources. Une application doit partager la connexion
entre les clients, ce qui peut être réalisé en partageant le générateur de niveau supérieur, comme indiqué ci-dessous.

// Create shared builder.
ServiceBusClientBuilder sharedConnectionBuilder = new ServiceBusClientBuilder()
    .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>");
// Create receiver and sender which will share the connection.
ServiceBusReceiverClient receiver = sharedConnectionBuilder
    .receiver()
    .queueName("<< QUEUE NAME >>")
    .buildClient();
ServiceBusSenderClient sender = sharedConnectionBuilder
    .sender()
    .queueName("<< QUEUE NAME >>")
    .buildClient();

Quand utiliser « ServiceBusProcessorClient ».

Quand utiliser « ServiceBusProcessorClient », « ServiceBusReceiverClient » ou ServiceBusReceiverAsyncClient ? Le processeur est créé à l’aide de « ServiceBusReceiverAsyncClient », il fournit un moyen pratique de recevoir des messages avec la saisie automatique par défaut et le renouvellement automatique des verrous de messages en mode « PEEK_LOCK ». Le processeur est approprié lorsque les applications n’ont pas effectué un déplacement complet vers le client récepteur asynchrone et souhaitent traiter le message en mode synchrone. Le processeur reçoit des messages pour toujours, car il récupère des erreurs réseau en interne. L’appel de fonction « ServiceBusProcessorClient:processMessage() » est effectué pour chaque message. Vous pouvez également utiliser « ServiceBusReceiverClient », il s’agit d’un client de niveau inférieur et fournit un plus large éventail d’API. Si le traitement asynchrone est
adapté à votre application, vous pouvez utiliser « ServiceBusReceiverAsyncClient ».

Dépannage

Activer la journalisation du client

Le Kit de développement logiciel (SDK) Azure pour Java offre une journalisation cohérente pour aider à résoudre les erreurs d’application et à accélérer leur résolution. Les journaux produits capturent le flux d’une application avant d’atteindre l’état terminal pour faciliter la localisation du problème racine. Consultez le wiki de journalisation pour obtenir des conseils sur l’activation de la journalisation.

Activer la journalisation du transport AMQP

Si l’activation de la journalisation client n’est pas suffisante pour diagnostiquer vos problèmes. Vous pouvez activer la journalisation dans un fichier dans la bibliothèque AMQP sous-jacente, Qpid Proton-J. Qpid Proton-J utilise java.util.logging. Vous pouvez activer la journalisation en créant un fichier de configuration avec le contenu ci-dessous. Ou définissez proton.trace.level=ALL et les options de configuration souhaitées pour l’implémentation java.util.logging.Handler . Les classes d’implémentation et leurs options sont disponibles dans Java 8 SDK javadoc.

Pour suivre les trames de transport AMQP, définissez la variable d’environnement : PN_TRACE_FRM=1.

Exemple de fichier « logging.properties »

Le fichier de configuration ci-dessous consigne la sortie de trace de proton-j vers le fichier « proton-trace.log ».

handlers=java.util.logging.FileHandler
.level=OFF
proton.trace.level=ALL
java.util.logging.FileHandler.level=ALL
java.util.logging.FileHandler.pattern=proton-trace.log
java.util.logging.FileHandler.formatter=java.util.logging.SimpleFormatter
java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %3$s %4$s: %5$s %n

Exceptions courantes

Exception AMQP

Il s’agit d’une exception générale pour les échecs liés à AMQP, qui inclut les erreurs AMQP en tant que ErrorCondition et le contexte à l’origine de cette exception en tant que AmqpErrorContext. isTransient est une valeur booléenne indiquant si l’exception est une erreur temporaire ou non. Si une exception AMQP temporaire se produit, la bibliothèque cliente retente l’opération autant de fois que amqpRetryOptions le permet. Après les mots, l’opération échoue et une exception est propagée à l’utilisateur.

AmqpErrorCondition contient des conditions d’erreur communes au protocole AMQP et utilisées par les services Azure. Lorsqu’une exception AMQP est levée, l’examen du champ de condition d’erreur peut indiquer aux développeurs pourquoi l’exception AMQP s’est produite et, si possible, comment atténuer cette exception. Vous trouverez la liste de toutes les exceptions AMQP dans les erreurs de transport OASIS AMQP version 1.0.

La méthode recommandée pour résoudre l’exception spécifique que représente l’exception AMQP consiste à suivre les instructions relatives aux exceptions de messagerie Service Bus .

Présentation du comportement des API

Le document fournit ici des informations sur le comportement attendu de l’API synchrone receiveMessages lors de son utilisation pour obtenir plusieurs messages (c’est-à-dire une préversion implicite).

Étapes suivantes

Au-delà de ceux abordés, la bibliothèque cliente Azure Service Bus offre une prise en charge de nombreux scénarios supplémentaires pour vous aider à tirer parti de l’ensemble de fonctionnalités complet du service Azure Service Bus. Pour vous aider à explorer certains de ces scénarios, l’ensemble d’exemples suivant est disponible ici.

Contribution

Si vous souhaitez devenir un contributeur actif à ce projet, consultez nos Lignes directrices sur les contributions pour plus d’informations.

Impressions