共用方式為


在 Azure 服務匯流排佇列 (Java) 中傳送和接收訊息

在本快速入門中,您將建立 Java 應用程式以將訊息傳送至 Azure 服務匯流排佇列,並從中接收訊息。

注意

本快速入門提供將訊息傳送至服務匯流排佇列,並加以接收的簡單案例逐步指示。 在 GitHub 上的 Azure SDK for Java 存放庫中,您可找到預先建置的 Azure 服務匯流排 Java 樣本。

提示

如果您在 Spring 應用程式中使用 Azure 服務匯流排資源,建議您考慮將 Spring Cloud Azure (機器翻譯) 作為替代方案。 Spring Cloud Azure 是開放原始碼專案,可提供與 Azure 服務的 Spring 無縫整合。 若要深入了解 Spring Cloud Azure,以及查看使用服務匯流排的範例,請參閱使用 Azure 服務匯流排搭配 Spring Cloud Stream (機器翻譯)。

必要條件

在 Azure 入口網站中建立命名空間

若要開始在 Azure 中使用服務匯流排傳訊實體,您必須先使用 Azure 中的唯一名稱建立命名空間。 命名空間為應用程式內的服務匯流排資源 (佇列、主題等) 提供範圍容器。

若要建立命名空間:

  1. 登入 Azure 入口網站

  2. 瀏覽至 [所有服務] 頁面

  3. 在左側導覽列上,從類別清單中選取 [整合],將滑鼠停留在 [服務匯流排] 上方,然後選取 [服務匯流排] 圖格上的 + 按鈕。

    此圖顯示在功能表中依序選取 [建立資源]、[整合] 及 [服務匯流排]。

  4. 在 [建立命名空間] 頁面的 [基本] 標籤中,遵循下列步驟:

    1. 針對 [訂用帳戶],選擇要在其中建立命名空間的 Azure 訂用帳戶。

    2. 針對 [資源群組],選擇將存留命名空間的現有資源群組,或是建立新的資源群組。

    3. 輸入命名空間的名稱。 命名空間名稱應遵循下列命名慣例:

      • 名稱在整個 Azure 中必須是唯一的。 系統會立即檢查此名稱是否可用。
      • 名稱長度至少 6 個字元,最多 50 個字元。
      • 名稱僅可包含字母、數字和連字號 "-"。
      • 名稱開頭必須為字母,且結尾必須為字母或數字。
      • 名稱結尾不得為「-sb」或「-mgmt」。
    4. 針對 [位置],選擇應裝載命名空間的區域。

    5. 針對定價層,選取命名空間的定價層 (基本、標準或進階)。 針對本快速入門,選取 [標準]

      重要

      如果您想要使用主題和訂用帳戶,請選擇 [標準] 或 [進階]。 基本定價層不支援主題/訂用帳戶。

      若已選取 [進階] 定價層,請指定傳訊單位數目。 進階層可讓您的資源在 CPU 和記憶體層級上獲得隔離,讓每個工作負載能夠獨立執行。 此資源容器稱為傳訊單位。 進階命名空間都至少有一個傳訊單位。 您可以為每個服務匯流排的進階命名空間選取 1、2、4、8 或 16 個傳訊單位。 如需詳細資訊,請參閱服務匯流排進階傳訊

    6. 選取頁面底部的 [檢閱 + 建立] 。

      此圖顯示 [建立命名空間] 頁面

    7. 在 [檢閱 + 建立] 頁面上檢閱設定,然後選取 [建立]

  5. 一旦部署資源成功,請在部署頁面上選取 [移至資源]

    此圖顯示部署成功頁面及 [前往資源] 連結。

  6. 您會看到服務匯流排命名空間的首頁。

    此圖顯示所建立服務匯流排命名空間的首頁。

在 Azure 入口網站中建立佇列

  1. 在 [服務匯流排命名空間] 頁面的左側導覽功能表中選取 [佇列]

  2. 在 [佇列] 頁面上,選取工具列上的 [+ 佇列]

  3. 輸入佇列的 [名稱],並且讓其他值保留其預設值。

  4. 現在,選取 [建立]

    此圖顯示在入口網站中建立佇列

向 Azure 驗證應用程式

本快速入門顯示兩種連線到 Azure 服務匯流排的方式:無密碼連接字串

第一個選項顯示如何使用 Microsoft Entra ID 中的安全性主體和角色型存取控制 (RBAC),來連線到服務匯流排命名空間。 您不需要擔心在程式碼或設定檔或 Azure Key Vault 等安全儲存體中,有硬式編碼連接字串。

第二個選項顯示如何使用連接字串來連線到服務匯流排命名空間。 如果您不熟悉 Azure,則可能會發現連接字串選項更容易遵循。 建議在真實世界應用程式和實際執行環境中使用無密碼選項。 如需詳細資訊,請參閱驗證與授權。 您也可以在概觀頁面上,深入了解無密碼驗證。

將角色指派給 Microsoft Entra 使用者

在本機開發時,請確定連線到 Azure 服務匯流排的使用者帳戶具有正確的權限。 您需要 Azure 服務匯流排資料擁有者角色,才能傳送和接收訊息。 若要將此角色指派給您自己,您需要使用者存取管理員角色,或另一個包含 Microsoft.Authorization/roleAssignments/write 動作的角色。 您可以使用 Azure 入口網站、Azure CLI 或 Azure PowerShell,將 Azure RBAC 角色指派給使用者。 您可以在範圍概觀頁面上,深入了解角色指派的可用範圍。

下列範例會將 Azure Service Bus Data Owner 角色指派給您的使用者帳戶,該角色提供對 Azure 服務匯流排資源的完整存取權。 在實際案例中,遵循最低權限原則,只為使用者提供更安全實際執行環境所需的最低權限。

Azure 服務匯流排的 Azure 內建角色

對於 Azure 服務匯流排來說,透過 Azure 入口網站和 Azure 資源管理 API 來的管理命名空間和所有相關資源的作業,皆已使用 Azure RBAC 模型來加以保護。 Azure 提供下列 Azure 內建角色,以授權存取服務匯流排命名空間:

如果您想要建立自訂角色,請參閱服務匯流排作業所需的權限

將 Microsoft Entra 使用者新增至 Azure 服務匯流排擁有者角色

將您的 Microsoft Entra 使用者名稱新增至服務匯流排命名空間層級的 Azure 服務匯流排資料擁有者角色。 其可讓在使用者帳戶內容中執行的應用程式將訊息傳送至佇列或主題,以及從佇列或主題的訂用帳戶接收訊息。

重要

在大部分情況下,角色指派在 Azure 中傳播只需要一兩分鐘。 在少數情況下,可能需要長達八分鐘的時間。 如果您第一次執行程式碼時收到驗證錯誤,請稍候片刻再試一次。

  1. 如果沒有在 Azure 入口網站中開啟 [服務匯流排命名空間] 頁面,請使用主要搜尋列或左側導覽找出您的服務匯流排命名空間。

  2. 在概觀頁面上,從左側功能表中選取 [存取控制 (IAM)]

  3. 在 [存取控制 (IAM)] 頁面上,選取 [角色指派] 索引標籤。

  4. 從頂端功能表選取 [+ 新增],然後從產生的下拉功能表中選取 [新增角色指派]

    顯示如何指派角色的螢幕擷取畫面。

  5. 使用搜尋方塊,從結果篩選出所需的角色。 在此範例中,搜尋 Azure Service Bus Data Owner 並選取相符的結果。 接著,選擇 [下一步]

  6. 在 [存取權指派對象為] 下,選取 [使用者、群組或服務主體],然後選擇 [+ 選取成員]

  7. 在對話方塊中,搜尋 Microsoft Entra 使用者名稱 (通常是您的 user@domain 電子郵件地址),然後在對話方塊底部選擇 [選取]

  8. 選取 [檢閱 + 指派] 以移至最終頁面,然後再次選取 [檢閱 + 指派] 以完成此程序。

傳送訊息至佇列

在本節中,您將建立 Java 主控台專案,並新增程式碼以將訊息傳送至您稍早建立的佇列。

建立 Java 主控台專案

使用 Eclipse 或您選擇的工具建立 Java 專案。

設定應用程式以使用服務匯流排

新增 Azure 核心及 Azure 服務匯流排程式庫的參考。

若您使用 Eclipse 且已建立 JAVA 主控台應用程式,請將 JAVA 專案轉換成 Maven:以滑鼠右鍵按一下 [封裝總管] 視窗中的專案,選取 [設定] -> [轉換成 Maven 專案]。 接著如下列範例所示,將相依性新增至這兩個程式庫。

更新 pom.xml 檔案,將相依性新增至 Azure 服務匯流排和 Azure 身分識別套件。

    <dependencies>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-messaging-servicebus</artifactId>
            <version>7.13.3</version>
        </dependency>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-identity</artifactId>
            <version>1.8.0</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>

新增程式碼以將訊息傳送到佇列

  1. 在 Java 檔案頂端新增下列 import 陳述式。

    import com.azure.messaging.servicebus.*;
    import com.azure.identity.*;
    
    import java.util.concurrent.TimeUnit;
    import java.util.Arrays;
    import java.util.List;
    
  2. 在類別中,定義變數來保存連接字串和佇列名稱。

    static String queueName = "<QUEUE NAME>";
    

    重要

    <QUEUE NAME> 取代為佇列名稱。

  3. 在類別中新增名為 sendMessage 的方法,以將一則訊息傳送至佇列。

    重要

    以您的服務匯流排命名空間名稱取代 NAMESPACENAME

    static void sendMessage()
    {
        // create a token using the default Azure credential
        DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
                .build();
    
        ServiceBusSenderClient senderClient = new ServiceBusClientBuilder()
                .fullyQualifiedNamespace("NAMESPACENAME.servicebus.windows.net")
                .credential(credential)
                .sender()
                .queueName(queueName)
                .buildClient();
    
        // send one message to the queue
        senderClient.sendMessage(new ServiceBusMessage("Hello, World!"));
        System.out.println("Sent a single message to the queue: " + queueName);
    }
    
    
  4. 在類別中新增名為 createMessages 的方法,以建立訊息清單。 一般來說,您會從應用程式的不同部分取得這些訊息。 在這裡,我們會建立範例訊息清單。

    static List<ServiceBusMessage> createMessages()
    {
        // create a list of messages and return it to the caller
        ServiceBusMessage[] messages = {
                new ServiceBusMessage("First message"),
                new ServiceBusMessage("Second message"),
                new ServiceBusMessage("Third message")
        };
        return Arrays.asList(messages);
    }
    
  5. 新增名為 sendMessageBatch 方法的方法,將訊息傳送至您所建立的佇列。 這個方法會建立佇列的 ServiceBusSenderClient,叫用 createMessages 方法來取得訊息清單、準備一或多個批次,並將批次傳送至佇列。

    重要

    以您的服務匯流排命名空間名稱取代 NAMESPACENAME

    static void sendMessageBatch()
    {
        // create a token using the default Azure credential
        DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
                .build();
    
        ServiceBusSenderClient senderClient = new ServiceBusClientBuilder()
                .fullyQualifiedNamespace("NAMESPACENAME.servicebus.windows.net")
                .credential(credential)
                .sender()
                .queueName(queueName)
                .buildClient();
    
        // Creates an ServiceBusMessageBatch where the ServiceBus.
        ServiceBusMessageBatch messageBatch = senderClient.createMessageBatch();
    
        // create a list of messages
        List<ServiceBusMessage> listOfMessages = createMessages();
    
        // We try to add as many messages as a batch can fit based on the maximum size and send to Service Bus when
        // the batch can hold no more messages. Create a new batch for next set of messages and repeat until all
        // messages are sent.
        for (ServiceBusMessage message : listOfMessages) {
            if (messageBatch.tryAddMessage(message)) {
                continue;
            }
    
            // The batch is full, so we create a new batch and send the batch.
            senderClient.sendMessages(messageBatch);
            System.out.println("Sent a batch of messages to the queue: " + queueName);
    
            // create a new batch
            messageBatch = senderClient.createMessageBatch();
    
            // Add that message that we couldn't before.
            if (!messageBatch.tryAddMessage(message)) {
                System.err.printf("Message is too large for an empty batch. Skipping. Max size: %s.", messageBatch.getMaxSizeInBytes());
            }
        }
    
        if (messageBatch.getCount() > 0) {
            senderClient.sendMessages(messageBatch);
            System.out.println("Sent a batch of messages to the queue: " + queueName);
        }
    
        //close the client
        senderClient.close();
    }
    

從佇列接收訊息

在本節中,您將新增程式碼以從佇列中取出訊息。

  1. 新增名為 receiveMessages 的方法以從佇列接收訊息。 這個方法會藉由指定處理訊息的處理常式,以及另一項處理錯誤的方式,來建立佇列的 ServiceBusProcessorClient。 然後方法會啟動處理器、等待數秒並列印收到的訊息,然後停止和關閉處理器。

    重要

    • 以您的服務匯流排命名空間名稱取代 NAMESPACENAME
    • 在程式碼中,以類別名稱取代 QueueTest::processMessage 中的 QueueTest
    // handles received messages
    static void receiveMessages() throws InterruptedException
    {
        DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
                .build();
    
        ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
                .fullyQualifiedNamespace("NAMESPACENAME.servicebus.windows.net")
                .credential(credential)
                .processor()
                .queueName(queueName)
                .processMessage(context -> processMessage(context))
                .processError(context -> processError(context))
                .buildProcessorClient();
    
        System.out.println("Starting the processor");
        processorClient.start();
    
        TimeUnit.SECONDS.sleep(10);
        System.out.println("Stopping and closing the processor");
        processorClient.close();
    }
    
  2. 新增 processMessage 方法,以處理從服務匯流排訂用帳戶收到的訊息。

    private static void processMessage(ServiceBusReceivedMessageContext context) {
        ServiceBusReceivedMessage message = context.getMessage();
        System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n", message.getMessageId(),
            message.getSequenceNumber(), message.getBody());
    }
    
  3. 新增 processError 方法以處理錯誤訊息。

    private static void processError(ServiceBusErrorContext context) {
        System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n",
            context.getFullyQualifiedNamespace(), context.getEntityPath());
    
        if (!(context.getException() instanceof ServiceBusException)) {
            System.out.printf("Non-ServiceBusException occurred: %s%n", context.getException());
            return;
        }
    
        ServiceBusException exception = (ServiceBusException) context.getException();
        ServiceBusFailureReason reason = exception.getReason();
    
        if (reason == ServiceBusFailureReason.MESSAGING_ENTITY_DISABLED
            || reason == ServiceBusFailureReason.MESSAGING_ENTITY_NOT_FOUND
            || reason == ServiceBusFailureReason.UNAUTHORIZED) {
            System.out.printf("An unrecoverable error occurred. Stopping processing with reason %s: %s%n",
                reason, exception.getMessage());
        } else if (reason == ServiceBusFailureReason.MESSAGE_LOCK_LOST) {
            System.out.printf("Message lock lost for message: %s%n", context.getException());
        } else if (reason == ServiceBusFailureReason.SERVICE_BUSY) {
            try {
                // Choosing an arbitrary amount of time to wait until trying again.
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                System.err.println("Unable to sleep for period of time");
            }
        } else {
            System.out.printf("Error source %s, reason %s, message: %s%n", context.getErrorSource(),
                reason, context.getException());
        }
    }
    
  4. 更新 main 方法以叫用 sendMessagesendMessageBatch,並更新 receiveMessages 方法以擲回 InterruptedException

    public static void main(String[] args) throws InterruptedException {
        sendMessage();
        sendMessageBatch();
        receiveMessages();
    }
    

執行應用程式

  1. 如果您使用 Eclipse,請以滑鼠右鍵按一下專案,選取 [匯出],展開 [JAVA],選取 [可執行的 JAR 檔案],然後依照步驟建立可執行的 JAR 檔案。

  2. 如果您使用與新增至 Azure 服務匯流排資料擁有者角色不同的使用者帳戶登入機器,請遵循下列步驟。 否則請略過此步驟,然後在下一個步驟中繼續執行 Jar 檔案。

    1. 在機器上安裝 Azure CLI (機器翻譯)。

    2. 執行下列 CLI 命令以登入 Azure。 使用您新增至 Azure 服務匯流排資料擁有者角色的相同使用者帳戶。

      az login
      
  3. 使用下列命令執行 Jar 檔案。

    java -jar <JAR FILE NAME>
    
  4. 您會在主控台視窗中看到以下輸出。

    Sent a single message to the queue: myqueue
    Sent a batch of messages to the queue: myqueue
    Starting the processor
    Processing message. Session: 88d961dd801f449e9c3e0f8a5393a527, Sequence #: 1. Contents: Hello, World!
    Processing message. Session: e90c8d9039ce403bbe1d0ec7038033a0, Sequence #: 2. Contents: First message
    Processing message. Session: 311a216a560c47d184f9831984e6ac1d, Sequence #: 3. Contents: Second message
    Processing message. Session: f9a871be07414baf9505f2c3d466c4ab, Sequence #: 4. Contents: Third message
    Stopping and closing the processor
    

在 Azure 入口網站中服務匯流排命名空間的概觀頁面上,您可以看到傳入傳出訊息計數。 請等候一分鐘左右,然後重新整理頁面以查看最新的值。

內送和外寄郵件計數

選取此概觀頁面上的佇列,瀏覽至服務匯流排佇列頁面。 您也會在此頁面上看到傳入傳出訊息計數。 您也會看到其他資訊,例如佇列的目前大小大小上限作用中訊息計數等等。

佇列詳細資料

後續步驟

請參閱下列文件和範例: