Použití služby Java Message Service 1.1 se standardem Azure Service Bus a AMQP 1.0

Upozorňující

Tento článek se zabývá omezenou podporou rozhraní API JMS (Java Message Service) 1.1 a existuje pouze pro úroveň Standard služby Azure Service Bus.

Úplná podpora rozhraní API služby Java Message Service 2.0 je dostupná pouze na úrovni Premium služby Azure Service Bus. Doporučujeme použít tuto úroveň.

Tento článek vysvětluje, jak používat funkce zasílání zpráv service Bus z aplikací v Javě pomocí oblíbeného standardu rozhraní JMS API. Mezi tyto funkce zasílání zpráv patří fronty a publikování nebo přihlášení k odběru témat. Doprovodný článek vysvětluje, jak to udělat stejně pomocí rozhraní .NET API služby Azure Service Bus. Tyto dva články můžete společně použít k získání informací o zasílání zpráv mezi platformami pomocí protokolu AMQP (Advanced Message Queuing Protocol) 1.0.

AMQP 1.0 je efektivní, spolehlivý protokol zasílání zpráv na úrovni drátu, který můžete použít k vytváření robustních aplikací pro zasílání zpráv napříč platformami.

Podpora AMQP 1.0 ve službě Service Bus znamená, že pomocí efektivního binárního protokolu můžete používat funkce řazení do front a publikování nebo odběru zprostředkovaných zpráv z celé řady platforem. Můžete také vytvářet aplikace složené z komponent vytvořených pomocí kombinace jazyků, architektur a operačních systémů.

Začínáme se službou Service Bus

Tento článek předpokládá, že již máte obor názvů služby Service Bus, který obsahuje frontu s názvem basicqueue. Pokud ne, můžete vytvořit obor názvů a frontu pomocí webu Azure Portal. Další informace o tom, jak vytvářet obory názvů a fronty služby Service Bus, najdete v tématu Začínáme s frontami služby Service Bus.

Poznámka:

Dělené fronty a témata také podporují AMQP. Další informace najdete v tématu Dělení na entity zasílání zpráv a podpora AMQP 1.0 pro dělené fronty a témata služby Service Bus.

Stažení klientské knihovny AMQP 1.0 JMS

Informace o tom, kde stáhnout nejnovější verzi klientské knihovny Apache Qpid JMS AMQP 1.0, najdete na webu pro stažení Apache Qpid.

Při sestavování a spouštění aplikací JMS pomocí service Bus je nutné přidat následující soubory JAR z distribučního archivu Apache Qpid JMS AMQP 1.0 do proměnné prostředí Java CLASSPATH:

  • geronimo-jms_1.1_spec-1.0.jar
  • qpid-jms-client-[version].jar

Poznámka:

Můžou se změnit názvy a verze JMS JAR. Další informace najdete v tématu Qpid JMS AMQP 1.0.

Kódování aplikací v Javě

Pojmenování Java a rozhraní adresáře

JMS používá rozhraní JNDI (Java Naming and Directory Interface) k vytvoření oddělení mezi logickými názvy a fyzickými názvy. Dva typy objektů JMS se přeloží pomocí JNDI: Připojení ionFactory a Destination. JNDI používá model poskytovatele, do kterého můžete připojit různé adresářové služby ke zpracování povinností překladu ip adres. Knihovna Apache Qpid JMS AMQP 1.0 se dodává s jednoduchým zprostředkovatelem JNDI založeným na souborech vlastností, který je nakonfigurovaný pomocí souboru vlastností následujícího formátu:

# servicebus.properties - sample JNDI configuration

# Register a ConnectionFactory in JNDI using the form:
# connectionfactory.[jndi_name] = [ConnectionURL]
connectionfactory.SBCF = amqps://[SASPolicyName]:[SASPolicyKey]@[namespace].servicebus.windows.net

# Register some queues in JNDI using the form
# queue.[jndi_name] = [physical_name]
# topic.[jndi_name] = [physical_name]
queue.QUEUE = queue1

Nastavení kontextu JNDI a konfigurace objektu Připojení ionFactory

Odkazovaný připojovací řetězec je ten, který je k dispozici v zásadách sdíleného přístupu na webu Azure Portal v části Primární řetězec Připojení ionu.

// The connection string builder is the only part of the azure-servicebus SDK library
// we use in this JMS sample and for the purpose of robustly parsing the Service Bus 
// connection string. 
ConnectionStringBuilder csb = new ConnectionStringBuilder(connectionString);
        
// Set up JNDI context
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + "?amqp.idleTimeout=120000&amqp.traceFrames=true");
hashtable.put("queue.QUEUE", "BasicQueue");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");

// Look up queue
Destination queue = (Destination) context.lookup("QUEUE");

Konfigurace cílových front producenta a příjemce

Položka použitá k definování cíle v souboru vlastností Qpid je zprostředkovatel JNDI v následujícím formátu.

Vytvoření cílové fronty pro producenta:

String queueName = "queueName";
Destination queue = (Destination) queueName;

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Connection connection - cf.createConnection(csb.getSasKeyName(), csb.getSasKey());

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

// Create producer
MessageProducer producer = session.createProducer(queue);

Vytvoření cílové fronty pro příjemce:

String queueName = "queueName";
Destination queue = (Destination) queueName;

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Connection connection - cf.createConnection(csb.getSasKeyName(), csb.getSasKey());

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

// Create consumer
MessageConsumer consumer = session.createConsumer(queue);

Napsání aplikace JMS

Pokud používáte JMS se službou Service Bus, nevyžadují se žádná speciální rozhraní API ani možnosti. Existuje několik omezení, která budou probíraná později. Stejně jako u jakékoli aplikace JMS je potřeba nejprve nakonfigurovat prostředí JNDI, aby bylo možné přeložit Připojení ionFactory objekt a cíle.

Konfigurace objektu JNDI InitialContext

Prostředí JNDI je nakonfigurováno předáním hash tabulky konfiguračních informací do konstruktoru javax.naming.InitialContext třídy. Dva požadované elementy v tabulce hash jsou název třídy Objekt pro vytváření počátečního kontextu a adresy URL zprostředkovatele. Následující kód ukazuje, jak nakonfigurovat prostředí JNDI tak, aby používalo zprostředkovatele JNDI vlastností Qpid se souborem vlastností s názvem servicebus.properties.

// Set up JNDI context
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + \
"?amqp.idleTimeout=120000&amqp.traceFrames=true");
hashtable.put("queue.QUEUE", "BasicQueue");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);

Jednoduchá aplikace JMS, která používá frontu služby Service Bus

Následující ukázkový program odešle textové zprávy JMS do fronty Service Bus s logickým názvem FRONTY JNDI a přijme zprávy zpět.

Přístup ke všem zdrojovým kódům a konfiguračním informacím najdete v rychlém startu fronty JMS s ukázkami služby Azure Service Bus.

// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package com.microsoft.azure.servicebus.samples.jmsqueuequickstart;

import com.azure.core.amqp.implementation.ConnectionStringProperties;
import org.apache.commons.cli.*;
import org.apache.log4j.*;

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.util.Hashtable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/**
 * This sample demonstrates how to send messages from a JMS queue producer into
 * an Azure Service Bus queue and receive them with a JMS message consumer.
 * JMS queue. 
 */
public class JmsQueueQuickstart {

    // Number of messages to send
    private static int totalSend = 10;
    //Tracking counter for how many messages have been received; used as termination condition
    private static AtomicInteger totalReceived = new AtomicInteger(0);
    // log4j logger 
    private static Logger logger = Logger.getRootLogger();

    public void run(String connectionString) throws Exception {

        // The connection string properties is the only part of the azure-servicebus SDK library
        // we use in this JMS sample and for the purpose of robustly parsing the Service Bus 
        // connection string. 
        ConnectionStringProperties csb = new ConnectionStringProperties(connectionString);
        
        // Set up JNDI context
        Hashtable<String, String> hashtable = new Hashtable<>();
        hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + "?amqp.idleTimeout=120000&amqp.traceFrames=true");
        hashtable.put("queue.QUEUE", "BasicQueue");
        hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        Context context = new InitialContext(hashtable);
        ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
        
        // Look up queue
        Destination queue = (Destination) context.lookup("QUEUE");

        // We create a scope here so we can use the same set of local variables cleanly 
        // again to show the receive side separately with minimal clutter.
        {
            // Create connection
            Connection connection = cf.createConnection(csb.getSharedAccessKeyName(), csb.getSharedAccessKey());
            // Create session, no transaction, client ack
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

            // Create producer
            MessageProducer producer = session.createProducer(queue);

            // Send messages
            for (int i = 0; i < totalSend; i++) {
                BytesMessage message = session.createBytesMessage();
                message.writeBytes(String.valueOf(i).getBytes());
                producer.send(message);
                System.out.printf("Sent message %d.\n", i + 1);
            }

            producer.close();
            session.close();
            connection.stop();
            connection.close();
        }

        {
            // Create connection
            Connection connection = cf.createConnection(csb.getSharedAccessKeyName(), csb.getSharedAccessKey());
            connection.start();
            // Create session, no transaction, client ack
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            // Create consumer
            MessageConsumer consumer = session.createConsumer(queue);
            // Create a listener callback to receive the messages
            consumer.setMessageListener(message -> {
                try {
                    // Received message is passed to callback
                    System.out.printf("Received message %d with sq#: %s\n",
                            totalReceived.incrementAndGet(), // increments the tracking counter
                            message.getJMSMessageID());
                    message.acknowledge();
                } catch (Exception e) {
                    logger.error(e);
                }
            });

            // Wait on the main thread until all sent messages have been received
            while (totalReceived.get() < totalSend) {
                Thread.sleep(1000);
            }
            consumer.close();
            session.close();
            connection.stop();
            connection.close();
        }

        System.out.printf("Received all messages, exiting the sample.\n");
        System.out.printf("Closing queue client.\n");
    }

    public static void main(String[] args) {

        System.exit(runApp(args, (connectionString) -> {
            JmsQueueQuickstart app = new JmsQueueQuickstart();
            try {
                app.run(connectionString);
                return 0;
            } catch (Exception e) {
                System.out.printf("%s", e.toString());
                return 1;
            }
        }));
    }

    static final String SB_SAMPLES_CONNECTIONSTRING = "SB_SAMPLES_CONNECTIONSTRING";

    public static int runApp(String[] args, Function<String, Integer> run) {
        try {

            String connectionString = null;

            // Parse connection string from command line
            Options options = new Options();
            options.addOption(new Option("c", true, "Connection string"));
            CommandLineParser clp = new DefaultParser();
            CommandLine cl = clp.parse(options, args);
            if (cl.getOptionValue("c") != null) {
                connectionString = cl.getOptionValue("c");
            }

            // Get overrides from the environment
            String env = System.getenv(SB_SAMPLES_CONNECTIONSTRING);
            if (env != null) {
                connectionString = env;
            }

            if (connectionString == null) {
                HelpFormatter formatter = new HelpFormatter();
                formatter.printHelp("run jar with", "", options, "", true);
                return 2;
            }
            return run.apply(connectionString);
        } catch (Exception e) {
            System.out.printf("%s", e.toString());
            return 3;
        }
    }
}

Spuštění aplikace

Předejte řetězec Připojení ion ze zásad sdíleného přístupu ke spuštění aplikace. Následující výstup je ve formuláři, na kterém je aplikace spuštěná:

> mvn clean package
>java -jar ./target/jmsqueuequickstart-1.0.0-jar-with-dependencies.jar -c "<CONNECTION_STRING>"

Sent message 1.
Sent message 2.
Sent message 3.
Sent message 4.
Sent message 5.
Sent message 6.
Sent message 7.
Sent message 8.
Sent message 9.
Sent message 10.
Received message 1 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-1
Received message 2 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-2
Received message 3 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-3
Received message 4 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-4
Received message 5 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-5
Received message 6 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-6
Received message 7 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-7
Received message 8 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-8
Received message 9 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-9
Received message 10 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-10
Received all messages, exiting the sample.
Closing queue client.

AMQP disposition and Service Bus operation mapping

Tady je postup, jak se dispozice AMQP překládá na operaci Service Bus:

ACCEPTED = 1; -> Complete()
REJECTED = 2; -> DeadLetter()
RELEASED = 3; (just unlock the message in service bus, will then get redelivered)
MODIFIED_FAILED = 4; -> Abandon() which increases delivery count
MODIFIED_FAILED_UNDELIVERABLE = 5; -> Defer()

Témata JMS vs. témata služby Service Bus

Použití témat a odběrů služby Service Bus prostřednictvím rozhraní JMS API poskytuje základní možnosti odesílání a příjmu. Je to vhodná volba, když portujete aplikace z jiných zprostředkovatelů zpráv pomocí rozhraní API kompatibilních s JMS, i když se témata služby Service Bus liší od témat JMS a vyžadují několik úprav.

Témata služby Service Bus směrují zprávy do pojmenovaných, sdílených a trvalých předplatných spravovaných prostřednictvím rozhraní správy prostředků Azure, nástrojů příkazového řádku Azure nebo webu Azure Portal. Každé předplatné umožňuje až 2 000 pravidel výběru, z nichž každé může mít podmínku filtru a pro filtry SQL také akci transformace metadat. Každá podmínka filtru odpovídá vstupní zprávě, která se má zkopírovat do odběru.

Příjem zpráv z odběrů je stejný jako příjem zpráv z front. Každé předplatné má přidruženou frontu nedoručených zpráv a možnost automatického přeposílání zpráv do jiné fronty nebo témat.

Témata JMS umožňují klientům dynamicky vytvářet nedurovatelné a odolné odběratele, kteří volitelně umožňují filtrování zpráv pomocí selektorů zpráv. Service Bus tyto nesdílené entity nepodporuje. Syntaxe pravidla filtru SQL pro Service Bus je podobná syntaxi selektoru zpráv podporovanou JMS.

Stránka vydavatele tématu JMS je kompatibilní se službou Service Bus, jak je znázorněno v této ukázce, ale dynamické předplatitele nejsou. Service Bus nepodporuje následující rozhraní JMS API související s topologií.

Nepodporované funkce a omezení

Při použití JMS přes AMQP 1.0 se službou Service Bus existují následující omezení:

  • Pro každou relaci je povolen pouze jeden objekt MessageProducer nebo MessageConsumer . Pokud potřebujete v aplikaci vytvořit více objektů MessageProducer nebo MessageConsumer , vytvořte pro každý z nich vyhrazenou relaci.
  • Nestálá předplatná témat se v současné době nepodporují.
  • Objekty MessageSelector se v současné době nepodporují.
  • Distribuované transakce se nepodporují, ale podporují se transakční relace.

Service Bus rozdělí řídicí rovinu z roviny dat, takže nepodporuje několik funkcí dynamické topologie JMS.

Nepodporovaná metoda Replace with
createDurableSubscriber Vytvořte odběr tématu, který portuje selektor zpráv.
createDurableConsumer Vytvořte odběr tématu, který portuje selektor zpráv.
createSharedConsumer Témata služby Service Bus jsou vždy sdíletelná. Viz část Témata JMS vs. témata služby Service Bus.
createSharedDurableConsumer Témata služby Service Bus jsou vždy sdíletelná. Viz část Témata JMS vs. témata služby Service Bus.
createTemporaryTopic Vytvořte téma prostřednictvím rozhraní API pro správu, nástrojů nebo portálu s autoDeleteOnIdle nastaveným na dobu vypršení platnosti.
createTopic Vytvořte téma prostřednictvím rozhraní API pro správu, nástrojů nebo portálu.
Odhlásit Odstraňte rozhraní API, nástroje nebo portál pro správu témat.
createBrowser Nepodporuje se. Použijte funkci Náhled() rozhraní API služby Service Bus.
createQueue Vytvořte frontu prostřednictvím rozhraní API pro správu, nástrojů nebo portálu.
createTemporaryQueue Vytvořte frontu prostřednictvím rozhraní API pro správu, nástrojů nebo portálu s automatickým nastavením na dobu vypršení platnosti.
receiveNoWait Použijte metodu receive(), kterou poskytuje sada SDK služby Service Bus, a zadejte velmi nízký nebo nulový časový limit.

Shrnutí

Tento článek vám ukázal, jak používat funkce zprostředkovaného zasílání zpráv služby Service Bus, jako jsou fronty a publikování nebo přihlášení k odběru témat, z Javy pomocí oblíbeného rozhraní JMS API a AMQP 1.0.

Service Bus AMQP 1.0 můžete použít také z jiných jazyků, jako jsou .NET, C, Python a PHP. Komponenty vytvořené pomocí těchto různých jazyků můžou spolehlivě a plně věrně vyměňovat zprávy pomocí podpory AMQP 1.0 ve službě Service Bus.

Další kroky