搭配使用 Java 訊息服務 1.1 與 Azure 服務匯流排標準和 AMQP 1.0

警告

本文說明對 Java 訊息服務 (JMS) 1.1 API 的有限支援,且僅適用於 Azure 服務匯流排標準層。

Java 訊息服務 2.0 API 的完整支援僅適用於 Azure 服務匯流排進階層。 建議您使用此服務層級。

本文說明如何使用常用的 JMS API 標準,從 Java 應用程式使用服務匯流排傳訊功能。 這些傳訊功能包括佇列和發佈或訂閱主題。 另有附屬文章說明如何使用 Azure 服務匯流排 .NET API 達到相同的效用。 您可以利用這兩篇文章,了解使用進階訊息佇列通訊協定 (AMQP) 1.0 的跨平台傳訊。

AMQP 1.0 是一個有效率且可靠的有線等級訊息通訊協定,可以用來建置強大的跨平台訊息應用程式。

在服務匯流排中支援 AMQP 1.0 代表您現在能夠從一組平台中使用有效率的二進位通訊協定,來運用佇列與發佈或訂閱代理傳訊功能。 您也可以建置由混用語言、架構及作業系統所建置的元件組成的應用程式。

開始使用服務匯流排

本文假設您已經有服務匯流排命名空間,其中包含名為 basicqueue 的佇列。 如果沒有,您可以使用 Azure 入口網站建立命名空間和佇列。 如需有關如何建立服務匯流排命名空間和佇列的相關詳細資訊,請參閱開始使用服務匯流排佇列

注意

分割的佇列和主題也支援 AMQP。 如需詳細資訊,請參閱分割傳訊實體服務匯流排分割佇列和主題的 AMQP 1.0 支援

下載 AMQP 1.0 JMS 用戶端程式庫

如需可從哪裡下載最新版 Apache Qpid JMS AMQP 1.0 用戶端程式庫的相關資訊,請參閱 Apache Qpid 下載網站

您使用服務匯流排建置和執行 JMS 應用程式時,必須從 Apache Qpid JMS AMQP 1.0 散發封裝將下列 JAR 檔新增至 Java CLASSPATH 環境變數:

  • geronimo-jms_1.1_spec-1.0.jar
  • qpid-jms-client-[版本].jar

注意

JMS JAR 名稱和版本可能已變更。 如需詳細資訊,請參閱 Qpid JMS AMQP 1.0

編碼 Java 應用程式

Java 命名和目錄介面

JMS 使用 Java 命名及目錄介面 (JNDI) 建立邏輯名稱與實際名稱之間的區別。 使用 JNDI 可以解析兩種 JMS 物件:ConnectionFactoryDestination。 JNDI 使用提供者模型,您可以在其中插入不同的目錄服務處理名稱解析作業。 Apache Qpid JMS AMQP 1.0 程式庫提供使用下列格式的內容檔案設定的簡單內容檔案型 JNDI 提供者:

# servicebus.properties - sample JNDI configuration

# Register a ConnectionFactory in JNDI using the form:
# connectionfactory.[jndi_name] = [ConnectionURL]
connectionfactory.SBCF = amqps://[SASPolicyName]:[SASPolicyKey]@[namespace].servicebus.windows.net

# Register some queues in JNDI using the form
# queue.[jndi_name] = [physical_name]
# topic.[jndi_name] = [physical_name]
queue.QUEUE = queue1

設定 JNDI 內容及設定 ConnectionFactory 物件

參考的連接字串,是在 Azure 入口網站中可從 [主要連接字串] 底下的 [共用存取原則] 中取得的連接字串。

// The connection string builder is the only part of the azure-servicebus SDK library
// we use in this JMS sample and for the purpose of robustly parsing the Service Bus 
// connection string. 
ConnectionStringBuilder csb = new ConnectionStringBuilder(connectionString);
        
// Set up JNDI context
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + "?amqp.idleTimeout=120000&amqp.traceFrames=true");
hashtable.put("queue.QUEUE", "BasicQueue");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");

// Look up queue
Destination queue = (Destination) context.lookup("QUEUE");

設定產生者和取用者目的地佇列

在 Qpid 屬性檔案 JNDI 提供者中用來定義目的地的項目使用下列格式。

若要建立產生者的目的地佇列:

String queueName = "queueName";
Destination queue = (Destination) queueName;

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Connection connection - cf.createConnection(csb.getSasKeyName(), csb.getSasKey());

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

// Create producer
MessageProducer producer = session.createProducer(queue);

建立取用者的目的地佇列:

String queueName = "queueName";
Destination queue = (Destination) queueName;

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Connection connection - cf.createConnection(csb.getSasKeyName(), csb.getSasKey());

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

// Create consumer
MessageConsumer consumer = session.createConsumer(queue);

撰寫 JMS 應用程式

搭配使用 JMS 與服務匯流排時,不需要特殊 API 或選項。 後續將說明一些限制。 和任何 JMS 應用程式一樣,首先需要設定 JNDI 環境,才能夠解析 ConnectionFactory 物件和目的地。

設定 JNDI InitialContext 物件

將設定資訊的雜湊表傳入 javax.naming.InitialContext 類別的建構函式,將設定 JNDI 環境。 雜湊表中的兩個所需項目是 Initial Context Factory 和 Provider URL 的類別名稱。 下列程式碼顯示如何使用名稱為 servicebus.properties 的內容檔案,設定 JNDI 環境使用 Qpid 內容檔案型 JNDI 提供者。

// Set up JNDI context
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + \
"?amqp.idleTimeout=120000&amqp.traceFrames=true");
hashtable.put("queue.QUEUE", "BasicQueue");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);

使用服務匯流排佇列的簡單 JMS 應用程式

下列範例程式會將 JMS TextMessages 傳送到 JNDI 邏輯名稱為 QUEUE 的服務匯流排佇列,並收到傳回的訊息。

您可以從 Azure 服務匯流排範例 JMS 佇列快速入門,對所有原始程式碼和設定資訊進行存取。

// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package com.microsoft.azure.servicebus.samples.jmsqueuequickstart;

import com.azure.core.amqp.implementation.ConnectionStringProperties;
import org.apache.commons.cli.*;
import org.apache.log4j.*;

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.util.Hashtable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/**
 * This sample demonstrates how to send messages from a JMS queue producer into
 * an Azure Service Bus queue and receive them with a JMS message consumer.
 * JMS queue. 
 */
public class JmsQueueQuickstart {

    // Number of messages to send
    private static int totalSend = 10;
    //Tracking counter for how many messages have been received; used as termination condition
    private static AtomicInteger totalReceived = new AtomicInteger(0);
    // log4j logger 
    private static Logger logger = Logger.getRootLogger();

    public void run(String connectionString) throws Exception {

        // The connection string properties is the only part of the azure-servicebus SDK library
        // we use in this JMS sample and for the purpose of robustly parsing the Service Bus 
        // connection string. 
        ConnectionStringProperties csb = new ConnectionStringProperties(connectionString);
        
        // Set up JNDI context
        Hashtable<String, String> hashtable = new Hashtable<>();
        hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + "?amqp.idleTimeout=120000&amqp.traceFrames=true");
        hashtable.put("queue.QUEUE", "BasicQueue");
        hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        Context context = new InitialContext(hashtable);
        ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
        
        // Look up queue
        Destination queue = (Destination) context.lookup("QUEUE");

        // We create a scope here so we can use the same set of local variables cleanly 
        // again to show the receive side separately with minimal clutter.
        {
            // Create connection
            Connection connection = cf.createConnection(csb.getSharedAccessKeyName(), csb.getSharedAccessKey());
            // Create session, no transaction, client ack
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

            // Create producer
            MessageProducer producer = session.createProducer(queue);

            // Send messages
            for (int i = 0; i < totalSend; i++) {
                BytesMessage message = session.createBytesMessage();
                message.writeBytes(String.valueOf(i).getBytes());
                producer.send(message);
                System.out.printf("Sent message %d.\n", i + 1);
            }

            producer.close();
            session.close();
            connection.stop();
            connection.close();
        }

        {
            // Create connection
            Connection connection = cf.createConnection(csb.getSharedAccessKeyName(), csb.getSharedAccessKey());
            connection.start();
            // Create session, no transaction, client ack
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            // Create consumer
            MessageConsumer consumer = session.createConsumer(queue);
            // Create a listener callback to receive the messages
            consumer.setMessageListener(message -> {
                try {
                    // Received message is passed to callback
                    System.out.printf("Received message %d with sq#: %s\n",
                            totalReceived.incrementAndGet(), // increments the tracking counter
                            message.getJMSMessageID());
                    message.acknowledge();
                } catch (Exception e) {
                    logger.error(e);
                }
            });

            // Wait on the main thread until all sent messages have been received
            while (totalReceived.get() < totalSend) {
                Thread.sleep(1000);
            }
            consumer.close();
            session.close();
            connection.stop();
            connection.close();
        }

        System.out.printf("Received all messages, exiting the sample.\n");
        System.out.printf("Closing queue client.\n");
    }

    public static void main(String[] args) {

        System.exit(runApp(args, (connectionString) -> {
            JmsQueueQuickstart app = new JmsQueueQuickstart();
            try {
                app.run(connectionString);
                return 0;
            } catch (Exception e) {
                System.out.printf("%s", e.toString());
                return 1;
            }
        }));
    }

    static final String SB_SAMPLES_CONNECTIONSTRING = "SB_SAMPLES_CONNECTIONSTRING";

    public static int runApp(String[] args, Function<String, Integer> run) {
        try {

            String connectionString = null;

            // Parse connection string from command line
            Options options = new Options();
            options.addOption(new Option("c", true, "Connection string"));
            CommandLineParser clp = new DefaultParser();
            CommandLine cl = clp.parse(options, args);
            if (cl.getOptionValue("c") != null) {
                connectionString = cl.getOptionValue("c");
            }

            // Get overrides from the environment
            String env = System.getenv(SB_SAMPLES_CONNECTIONSTRING);
            if (env != null) {
                connectionString = env;
            }

            if (connectionString == null) {
                HelpFormatter formatter = new HelpFormatter();
                formatter.printHelp("run jar with", "", options, "", true);
                return 2;
            }
            return run.apply(connectionString);
        } catch (Exception e) {
            System.out.printf("%s", e.toString());
            return 3;
        }
    }
}

執行應用程式

傳遞來自 [共用存取原則] 的 [連接字串],以執行應用程式。 以下是執行應用程式時產生的輸出:

> mvn clean package
>java -jar ./target/jmsqueuequickstart-1.0.0-jar-with-dependencies.jar -c "<CONNECTION_STRING>"

Sent message 1.
Sent message 2.
Sent message 3.
Sent message 4.
Sent message 5.
Sent message 6.
Sent message 7.
Sent message 8.
Sent message 9.
Sent message 10.
Received message 1 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-1
Received message 2 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-2
Received message 3 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-3
Received message 4 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-4
Received message 5 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-5
Received message 6 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-6
Received message 7 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-7
Received message 8 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-8
Received message 9 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-9
Received message 10 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-10
Received all messages, exiting the sample.
Closing queue client.

AMQP 配置和服務匯流排作業對應

以下是 AMQP 配置轉譯為服務匯流排作業的方式:

ACCEPTED = 1; -> Complete()
REJECTED = 2; -> DeadLetter()
RELEASED = 3; (just unlock the message in service bus, will then get redelivered)
MODIFIED_FAILED = 4; -> Abandon() which increases delivery count
MODIFIED_FAILED_UNDELIVERABLE = 5; -> Defer()

JMS 主題與服務匯流排主題

透過 JMS API 使用服務匯流排主題和訂用帳戶,可提供基本的傳送和接收功能。 當您使用符合 JMS 規範的 API 從其他訊息代理程式移植應用程式時,這會是方便的選擇,即使服務匯流排主題與 JMS 主題不同而需要些許調整,仍是如此。

服務匯流排主題會將訊息路由傳送至透過 Azure 資源管理介面、Azure 命令列工具或 Azure 入口網站來管理的具名、共用的永久性訂用帳戶。 每個訂用帳戶最多可允許 2,000 個選取規則,每個規則可以有一個篩選條件,以及適用於 SQL 篩選條件的中繼資料轉換動作。 每個篩選條件比對都會選取要複製到訂用帳戶中的輸入訊息。

從訂用帳戶接收訊息和從佇列接收訊息是相同的。 每個訂用帳戶都有相關聯的無效信件佇列,且能夠自動將訊息轉送至另一個佇列或主題。

JMS 主題可讓用戶端動態建立非永久性和永久性訂閱者,且訂閱者可選擇性地允許使用訊息選取器來篩選訊息。 服務匯流排不支援這些非共用實體。 服務匯流排的 SQL 篩選規則語法類似於 JMS 所支援的訊息選取器語法。

JMS 主題發行者端與服務匯流排相容 (如此範例所示),但動態訂閱者則不相容。 服務匯流排不支援下列與拓撲相關的 JMS API。

不支援的功能和限制

透過 AMQP 1.0 將 JMS 與服務匯流排搭配使用,會有下列限制:

  • 對於每個工作階段僅允許一個 MessageProducerMessageConsumer 物件。 如果您需要在應用程式中建立多個 MessageProducerMessageConsumer 物件,請分別建立其專用的工作階段。
  • 目前不支援可變更的主題訂用帳戶。
  • 目前不支援 MessageSelector 物件。
  • 不支援分散式交易,但支援交易式工作階段。

服務匯流排會分割控制平面與資料平面,並因此不支援數個 JMS 的動態拓撲函式。

不支援的方法 Replace with
createDurableSubscriber 建立移植訊息選取器的主題訂用帳戶。
createDurableConsumer 建立移植訊息選取器的主題訂用帳戶。
createSharedConsumer 服務匯流排主題一律可共用。 請參閱「JMS 主題與服務匯流排主題」一節。
createSharedDurableConsumer 服務匯流排主題一律可共用。 請參閱「JMS 主題與服務匯流排主題」一節。
createTemporaryTopic 透過管理 API、工具或入口網站建立主題,並將 AutoDeleteOnIdle 設定為到期期間。
createTopic 透過管理 API、工具或入口網站建立主題。
取消訂閱 刪除主題管理 API、工具或入口網站。
createBrowser 不支援。 使用服務匯流排 API 的 Peek() 功能。
createQueue 透過管理 API、工具或入口網站建立佇列。
createTemporaryQueue 透過管理 API、工具或入口網站建立佇列,並將 AutoDeleteOnIdle 設定為到期期間。
receiveNoWait 使用服務匯流排 SDK 所提供的 receive() 方法,並指定非常低或零的逾時。

摘要

本文說明說明如何以常用的 JMS API 和 AMQP 1.0 從 Java 使用服務匯流排代理訊息功能,例如佇列和發佈或訂閱主題。

您也可以使用以其他語言撰寫的服務匯流排 AMQP 1.0,例如 .NET、C、Python 和 PHP。 使用這些不同的語言撰寫的元件可使用服務匯流排中的 AMQP 1.0 支援確實完整交換訊息。

下一步