Verwenden von Java Message Service 1.1 mit Azure Service Bus Standard und AMQP 1.0

Warnung

Dieser Artikel bezieht sich auf die eingeschränkte Unterstützung für die JMS 1.1-API (Java Message Service). Diese ist nur im Standard-Tarif von Azure Service Bus verfügbar.

Vollständige Unterstützung für die Java Message Service 2.0-API ist nur für die Premium-Dienstebene von Azure Service Bus verfügbar. Es wird empfohlen, diesen Tarif zu verwenden.

In diesem Artikel wird beschrieben, wie die Messagingfeatures von Service Bus in Java-Anwendungen mit dem verbreiteten API-Standard JMS verwendet werden. Zu diesen Messagingfeatures zählen Warteschlangen und das Veröffentlichen oder Abonnieren von Themen. In einem verwandten Artikel wird erläutert, wie Sie dieselbe Aufgabe mithilfe der .NET-API für Azure Service Bus erledigen. In diesen beiden Artikeln zusammen finden Sie Informationen zum plattformübergreifenden Messaging unter Verwendung des Advance Message Queuing Protocol (AMQP) 1.0.

AMQP 1.0 ist ein effizientes, zuverlässiges Messagingprotokoll auf Wire-Ebene, mit dem Sie robuste und plattformübergreifende Messaginganwendungen erstellen können.

Unterstützung für AMQP 1.0 in Service Bus bedeutet, dass Sie die Brokermessagingfeatures für Warteschlangen und Veröffentlichen bzw. Abonnieren mithilfe eines effizienten Binärprotokolls auf unterschiedlichen Plattformen nutzen können. Sie können auch Anwendungen erstellen, deren Komponenten mit verschiedenen Sprachen, Frameworks und Betriebssystemen erstellt wurden.

Erste Schritte mit Service Bus

In diesem Artikel wird davon ausgegangen, dass Sie bereits über einen Service Bus-Namespace verfügen, der eine Warteschlange mit dem Namen basicqueue enthält. Andernfalls können Sie den Namespace und die Warteschlange im Azure-Portal erstellen. Weitere Informationen zum Erstellen von Namespaces und Warteschlangen für Service Bus finden Sie unter Erste Schritte mit Service Bus-Warteschlangen.

Hinweis

Partitionierte Warteschlangen und Themen unterstützen zudem AMQP. Weitere Informationen finden Sie unter Partitionierte Messagingentitäten und AMQP 1.0-Unterstützung für partitionierte Warteschlangen und Themen von Service Bus.

Herunterladen der AMQP 1.0-JMS-Clientbibliothek

Informationen zum Herunterladen der neuesten Version der AMQP 1.0-JMS-Clientbibliothek Apache Qpid finden Sie auf der Downloadwebsite für Apache Qpid.

Folgende JAR-Dateien aus dem Apache Qpid-JMS-AMQP 1.0-Verteilungsarchiv müssen der Java-Umgebungsvariable „CLASSPATH“ hinzugefügt werden, wenn Sie JMS-Anwendungen mit Service Bus erstellen und ausführen:

  • geronimo-jms_1.1_spec-1.0.jar
  • qpid-jms-client-[version].jar

Hinweis

Die JAR-Namen und -Versionen für JMS haben sich möglicherweise geändert. Weitere Informationen finden Sie unter Qpid-JMS-AMQP 1.0.

Programmieren von Java-Anwendungen

Java Naming and Directory Interface (Java Benennungs- und Verzeichnisschnittstelle)

JMS verwendet die Java Naming and Directory Interface (JNDI), um eine Trennung zwischen logischen und physischen Namen umzusetzen. Mit JNDI werden zwei Arten von JMS-Objekten aufgelöst: ConnectionFactory und Destination. JNDI verwendet ein Anbietermodell, das Sie mit verschiedenen Verzeichnisdiensten verbinden können, um Namensauflösungsfunktionen zu implementieren. Die Apache Qpid-JMS-AMQP 1.0-Bibliothek enthält einen einfachen JNDI-Anbieter, der mit einer Eigenschaftendatei im folgenden Format konfiguriert wird:

# servicebus.properties - sample JNDI configuration

# Register a ConnectionFactory in JNDI using the form:
# connectionfactory.[jndi_name] = [ConnectionURL]
connectionfactory.SBCF = amqps://[SASPolicyName]:[SASPolicyKey]@[namespace].servicebus.windows.net

# Register some queues in JNDI using the form
# queue.[jndi_name] = [physical_name]
# topic.[jndi_name] = [physical_name]
queue.QUEUE = queue1

Einrichten des JNDI-Kontexts und Konfigurieren des ConnectionFactory-Objekts

Die Verbindungszeichenfolge, auf die verwiesen wird, ist in den SAS-Richtlinien im Azure-Portal unter Primäre Verbindungszeichenfolge angegeben.

// The connection string builder is the only part of the azure-servicebus SDK library
// we use in this JMS sample and for the purpose of robustly parsing the Service Bus 
// connection string. 
ConnectionStringBuilder csb = new ConnectionStringBuilder(connectionString);
        
// Set up JNDI context
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + "?amqp.idleTimeout=120000&amqp.traceFrames=true");
hashtable.put("queue.QUEUE", "BasicQueue");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");

// Look up queue
Destination queue = (Destination) context.lookup("QUEUE");

Konfigurieren von Zielwarteschlangen für Producer und Consumer

Der Eintrag zum Definieren eines Ziels in der Qpid-Eigenschaftendatei für den JNDI-Anbieter hat das folgende Format:

So erstellen Sie eine Zielwarteschlange für den Producer

String queueName = "queueName";
Destination queue = (Destination) queueName;

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Connection connection - cf.createConnection(csb.getSasKeyName(), csb.getSasKey());

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

// Create producer
MessageProducer producer = session.createProducer(queue);

So erstellen Sie eine Zielwarteschlange für den Consumer

String queueName = "queueName";
Destination queue = (Destination) queueName;

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Connection connection - cf.createConnection(csb.getSasKeyName(), csb.getSasKey());

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

// Create consumer
MessageConsumer consumer = session.createConsumer(queue);

Schreiben der JMS-Anwendung

Für die Verwendung von JMS mit Service Bus werden keine speziellen APIs oder Optionen benötigt. Es gibt jedoch einige Einschränkungen, die weiter unten erläutert werden. Wie bei allen JMS-Anwendungen müssen Sie zuerst die JNDI-Umgebung konfigurieren, um ein ConnectionFactory-Objekt und Ziele auflösen zu können.

Konfigurieren des JNDI-Objekts InitialContext

Zur Konfiguration der JNDI-Umgebung wird eine Hashtabelle mit Konfigurationsinformationen an den Konstruktor der Klasse „javax.naming.InitialContext“ übergeben. Obligatorisch sind in der Hashtabelle zwei Elemente: der Klassenname der InitialContext-Factory und die Anbieter-URL. Der folgende Code zeigt, wie Sie die JNDI-Umgebung mithilfe des JNDI-Anbieters für Qpid mit der Eigenschaftendatei servicebus.properties konfigurieren.

// Set up JNDI context
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + \
"?amqp.idleTimeout=120000&amqp.traceFrames=true");
hashtable.put("queue.QUEUE", "BasicQueue");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);

Einfache JMS-Anwendung mit einer Service Bus-Warteschlange

Das folgende Beispielprogramm sendet JMS-Textnachrichten an eine Service Bus-Warteschlange mit dem logischen JNDI-Namen „QUEUE“ und empfängt die zurückkommenden Nachrichten.

Den gesamten Quellcode und die Konfigurationsinformationen finden Sie in den Azure Service Bus-Beispielen des Schnellstarts zu JMS-Warteschlangen.

// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package com.microsoft.azure.servicebus.samples.jmsqueuequickstart;

import com.azure.core.amqp.implementation.ConnectionStringProperties;
import org.apache.commons.cli.*;
import org.apache.log4j.*;

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.util.Hashtable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/**
 * This sample demonstrates how to send messages from a JMS queue producer into
 * an Azure Service Bus queue and receive them with a JMS message consumer.
 * JMS queue. 
 */
public class JmsQueueQuickstart {

    // Number of messages to send
    private static int totalSend = 10;
    //Tracking counter for how many messages have been received; used as termination condition
    private static AtomicInteger totalReceived = new AtomicInteger(0);
    // log4j logger 
    private static Logger logger = Logger.getRootLogger();

    public void run(String connectionString) throws Exception {

        // The connection string properties is the only part of the azure-servicebus SDK library
        // we use in this JMS sample and for the purpose of robustly parsing the Service Bus 
        // connection string. 
        ConnectionStringProperties csb = new ConnectionStringProperties(connectionString);
        
        // Set up JNDI context
        Hashtable<String, String> hashtable = new Hashtable<>();
        hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + "?amqp.idleTimeout=120000&amqp.traceFrames=true");
        hashtable.put("queue.QUEUE", "BasicQueue");
        hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        Context context = new InitialContext(hashtable);
        ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
        
        // Look up queue
        Destination queue = (Destination) context.lookup("QUEUE");

        // We create a scope here so we can use the same set of local variables cleanly 
        // again to show the receive side separately with minimal clutter.
        {
            // Create connection
            Connection connection = cf.createConnection(csb.getSharedAccessKeyName(), csb.getSharedAccessKey());
            // Create session, no transaction, client ack
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

            // Create producer
            MessageProducer producer = session.createProducer(queue);

            // Send messages
            for (int i = 0; i < totalSend; i++) {
                BytesMessage message = session.createBytesMessage();
                message.writeBytes(String.valueOf(i).getBytes());
                producer.send(message);
                System.out.printf("Sent message %d.\n", i + 1);
            }

            producer.close();
            session.close();
            connection.stop();
            connection.close();
        }

        {
            // Create connection
            Connection connection = cf.createConnection(csb.getSharedAccessKeyName(), csb.getSharedAccessKey());
            connection.start();
            // Create session, no transaction, client ack
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            // Create consumer
            MessageConsumer consumer = session.createConsumer(queue);
            // Create a listener callback to receive the messages
            consumer.setMessageListener(message -> {
                try {
                    // Received message is passed to callback
                    System.out.printf("Received message %d with sq#: %s\n",
                            totalReceived.incrementAndGet(), // increments the tracking counter
                            message.getJMSMessageID());
                    message.acknowledge();
                } catch (Exception e) {
                    logger.error(e);
                }
            });

            // Wait on the main thread until all sent messages have been received
            while (totalReceived.get() < totalSend) {
                Thread.sleep(1000);
            }
            consumer.close();
            session.close();
            connection.stop();
            connection.close();
        }

        System.out.printf("Received all messages, exiting the sample.\n");
        System.out.printf("Closing queue client.\n");
    }

    public static void main(String[] args) {

        System.exit(runApp(args, (connectionString) -> {
            JmsQueueQuickstart app = new JmsQueueQuickstart();
            try {
                app.run(connectionString);
                return 0;
            } catch (Exception e) {
                System.out.printf("%s", e.toString());
                return 1;
            }
        }));
    }

    static final String SB_SAMPLES_CONNECTIONSTRING = "SB_SAMPLES_CONNECTIONSTRING";

    public static int runApp(String[] args, Function<String, Integer> run) {
        try {

            String connectionString = null;

            // Parse connection string from command line
            Options options = new Options();
            options.addOption(new Option("c", true, "Connection string"));
            CommandLineParser clp = new DefaultParser();
            CommandLine cl = clp.parse(options, args);
            if (cl.getOptionValue("c") != null) {
                connectionString = cl.getOptionValue("c");
            }

            // Get overrides from the environment
            String env = System.getenv(SB_SAMPLES_CONNECTIONSTRING);
            if (env != null) {
                connectionString = env;
            }

            if (connectionString == null) {
                HelpFormatter formatter = new HelpFormatter();
                formatter.printHelp("run jar with", "", options, "", true);
                return 2;
            }
            return run.apply(connectionString);
        } catch (Exception e) {
            System.out.printf("%s", e.toString());
            return 3;
        }
    }
}

Ausführen der Anwendung

Übergeben Sie die Verbindungszeichenfolge aus den freigegebenen Zugriffsrichtlinien, um die Anwendung auszuführen. Die folgende Ausgabe entsteht durch Ausführen der Anwendung:

> mvn clean package
>java -jar ./target/jmsqueuequickstart-1.0.0-jar-with-dependencies.jar -c "<CONNECTION_STRING>"

Sent message 1.
Sent message 2.
Sent message 3.
Sent message 4.
Sent message 5.
Sent message 6.
Sent message 7.
Sent message 8.
Sent message 9.
Sent message 10.
Received message 1 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-1
Received message 2 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-2
Received message 3 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-3
Received message 4 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-4
Received message 5 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-5
Received message 6 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-6
Received message 7 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-7
Received message 8 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-8
Received message 9 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-9
Received message 10 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-10
Received all messages, exiting the sample.
Closing queue client.

AMQP-Disposition und Service Bus-Vorgangszuordnung

Hier sehen Sie die Umwandlung einer AMQP-Disposition in einen Service Bus-Vorgang:

ACCEPTED = 1; -> Complete()
REJECTED = 2; -> DeadLetter()
RELEASED = 3; (just unlock the message in service bus, will then get redelivered)
MODIFIED_FAILED = 4; -> Abandon() which increases delivery count
MODIFIED_FAILED_UNDELIVERABLE = 5; -> Defer()

JMS-Themen im Vergleich zu Service Bus-Themen

Durch die Verwendung von Service Bus-Themen und -Abonnements über die JMS-API werden grundlegende Sende- und Empfangsfunktionen bereitgestellt. Dies ist praktisch, wenn Sie Anwendungen von anderen Nachrichtenbrokern mit JMS-konformen APIs portieren, obwohl die Service Bus-Themen sich von den JMS-Themen unterscheiden und einige Anpassungen erfordern.

Service Bus-Themen leiten Nachrichten in benannte, freigegebene und dauerhafte Abonnements weiter, die über die Azure Resource Manager-Benutzeroberfläche, die Azure-Befehlszeilentools oder das Azure-Portal verwaltet werden. Jedes Abonnement kann bis zu 2.000 Auswahlregeln aufweisen, wobei jede Regel eine Filterbedingung und bei SQL-Filtern auch eine Metadaten-Transformationsaktion aufweisen kann. Für jede Übereinstimmung einer Filterbedingung wird die Eingabenachricht ausgewählt, die in das Abonnement kopiert werden soll.

Das Empfangen von Nachrichten aus Abonnements ist identisch mit dem Empfangen von Nachrichten aus Warteschlangen. Jedes Abonnement hat eine Warteschlange für unzustellbare Nachrichten sowie die Möglichkeit, Nachrichten automatisch an eine andere Warteschlange oder andere Themen weiterzuleiten.

Über JMS-Themen können Clients dynamisch nicht dauerhafte und dauerhafte Abonnenten erstellen, die optional ein Filtern von Nachrichten mit Nachrichtenselektoren ermöglichen. Diese nicht freigegebenen Elemente werden von Service Bus nicht unterstützt. Die Syntax der SQL-Filterregeln für Service Bus ist der von JMS unterstützten Nachrichtenselektorsyntax ähnlich.

Die Herausgeberseite eines JMS-Themas ist mit Service Bus kompatibel, wie in diesem Beispiel gezeigt, dynamische Abonnenten sind dies aber nicht. Die folgenden topologiebezogenen JMS-APIs werden mit Service Bus nicht unterstützt.

Nicht unterstützte Funktionen und Einschränkungen

Bei der Verwendung von JMS über AMQP 1.0 mit Service Bus gelten die folgenden Einschränkungen:

  • Pro Sitzung ist nur ein MessageProducer- oder MessageConsumer-Objekt erlaubt. Falls Sie mehrere MessageProducer- oder MessageConsumer-Objekte in einer Anwendung benötigen, müssen Sie für diese jeweils eine eigene Sitzung erstellen.
  • Flüchtige Themenabonnements werden momentan nicht unterstützt.
  • MessageSelector-Objekte werden derzeit nicht unterstützt.
  • Verteilte Transaktionen werden nicht unterstützt, aber Sitzungen mit Transaktionen werden unterstützt.

Service Bus trennt die Steuerungsebene von der Datenebene und unterstützt daher viele der dynamischen Topologiefunktionen von JMS nicht.

Nicht unterstützte Methode Ersetzen durch
createDurableSubscriber Erstellen eines Themenabonnement, das den Nachrichtenselektor portiert.
createDurableConsumer Erstellen eines Themenabonnement, das den Nachrichtenselektor portiert.
createSharedConsumer Service Bus-Themen können immer freigegeben werden. Weitere Informationen finden Sie im Abschnitt „JMS-Themen im Vergleich zu Service Bus-Themen“.
createSharedDurableConsumer Service Bus-Themen können immer freigegeben werden. Weitere Informationen finden Sie im Abschnitt „JMS-Themen im Vergleich zu Service Bus-Themen“.
createTemporaryTopic Erstellen eines Themas über Verwaltungs-API, Tools oder das Portal, wobei AutoDeleteOnIdle auf einen Ablaufzeitraum festgelegt ist.
createTopic Erstellen eines Themas über Verwaltungs-API, Tools oder das Portal.
unsubscribe Löschen des Themas über Verwaltungs-API, Tools oder das Portal.
createBrowser Nicht unterstützt. Verwenden Sie die Peek()-Funktion der Service Bus-API.
createQueue Erstellen einer Warteschlange über Verwaltungs-API, Tools oder das Portal.
createTemporaryQueue Erstellen einer Warteschlange über Verwaltungs-API, Tools oder das Portal, wobei AutoDeleteOnIdle auf einen Ablaufzeitraum festgelegt ist.
receiveNoWait Verwenden Sie die vom Service Bus-SDK bereitgestellte receive()-Methode, und legen Sie das Timeout auf einen sehr niedrigen Wert oder auf null fest.

Zusammenfassung

In diesem Artikel wurde gezeigt, wie die Brokermessagingfeatures von Service Bus wie Warteschlangen und das Veröffentlichen oder Abonnieren von Themen in Java mit der verbreiteten JMS-API und AMQP 1.0 verwendet werden.

Sie können Service Bus-AMQP 1.0 auch in anderen Sprachen verwenden, z. B. .NET, C, Python und PHP. Komponenten, die mit diesen unterschiedlichen Sprachen geschrieben wurden, können mit der AMQP 1.0-Unterstützung in Service Bus Nachrichten zuverlässig und bei voller Vertraulichkeit austauschen.

Nächste Schritte