將訊息傳送至 Azure 服務匯流排 佇列並從中接收訊息 (Java)

在本快速入門中,您會建立 Java 應用程式,以將訊息傳送至 Azure 服務匯流排 佇列並接收訊息。

注意

本快速入門提供將訊息傳送至 服務匯流排 佇列並接收訊息的簡單案例逐步指示。 您可以在 GitHub 上的 Azure SDK for Java 存放庫中找到適用於 Azure 服務匯流排 的預先建置 Java 範例。

提示

如果您要在 Spring 應用程式中使用 Azure 服務匯流排 資源,我們建議您將 Spring Cloud Azure 視為替代方案。 Spring Cloud Azure 是開放原始碼專案,可提供與 Azure 服務的無縫 Spring 整合。 若要深入瞭解 Spring Cloud Azure,以及查看使用 服務匯流排 的範例,請參閱使用 Azure 服務匯流排 的 Spring Cloud Stream。

必要條件

在 Azure 入口網站 中建立命名空間

若要開始在 Azure 中使用 服務匯流排 傳訊實體,您必須先建立命名空間,其名稱在 Azure 中是唯一的。 命名空間會為應用程式內的 服務匯流排 資源(佇列、主題等)提供範圍容器。

若要建立命名空間:

  1. 登入 Azure 入口網站

  2. 流覽至 [ 所有服務] 頁面

  3. 在左側導覽列上,從類別清單中選取 [整合],將滑鼠停留在 服務匯流排 上方,然後在 服務匯流排 磚上選取+按鈕。

    Image showing selection of Create a resource, Integration, and then Service Bus in the menu.

  4. 在 [建立命名空間] 頁面的 [基本] 標籤中,遵循下列步驟:

    1. 針對 [ 訂用帳戶],選擇要在其中建立命名空間的 Azure 訂用帳戶。

    2. 針對 [ 資源群組],選擇命名空間將存留在其中的現有資源群組,或建立新的資源群組。

    3. 輸入命名空間的名稱。 命名空間名稱應遵守下列命名慣例:

      • 名稱在整個 Azure 中必須是唯一的。 系統會立即檢查此名稱是否可用。
      • 名稱長度至少為 6,最多 50 個字元。
      • 名稱只能包含字母、數位、連字元 “-”。
      • 名稱必須以字母開頭,並以字母或數字結尾。
      • 名稱結尾不是 “-sb” 或 “-mgmt”。
    4. 針對 [ 位置],選擇您的命名空間應該裝載所在的區域。

    5. 針對 [定價層],選取命名空間的定價層 [基本]、[標準] 或 [進階版]。 在本快速入門中,選取 [ 標準]。

      重要

      如果您想要使用主題和訂帳戶,請選擇 [標準] 或 [進階版]。 基本定價層不支援主題/訂用帳戶。

      如果您選取 進階版 定價層,請指定傳訊單位數目。 進階層會在 CPU 和記憶體層級提供資源隔離,讓每個工作負載以隔離方式執行。 此資源容器稱為傳訊單位。 進階命名空間至少有一個傳訊單位。 您可以為每個 服務匯流排 進階版 命名空間選取 1、2、4、8 或 16 個傳訊單位。 如需詳細資訊,請參閱 服務匯流排 進階版 傳訊

    6. 選取頁面底部的 [檢閱 + 建立] 。

      Image showing the Create a namespace page

    7. 在 [檢閱 + 建立] 頁面上檢閱設定,然後選取 [建立]

  5. 部署資源成功后,請選取 部署頁面上的 [移至資源 ]。

    Image showing the deployment succeeded page with the Go to resource link.

  6. 您會看到服務總線命名空間的首頁。

    Image showing the home page of the Service Bus namespace created.

在 Azure 入口網站 中建立佇列

  1. [服務匯流排 命名空間] 頁面上,選取左側導覽功能表中的 [佇列]。

  2. 在 [ 佇列] 頁面上,選取 工具列上的 [+ 佇列 ]。

  3. 輸入佇列的名稱,並將其他值保留預設值。

  4. 現在,選取 [ 建立]。

    Image showing creation of a queue in the portal

向 Azure 驗證應用程式

本快速入門說明連線到 Azure 服務匯流排的兩種方式:密碼和 連接字串

第一個選項示範如何使用 Microsoft Entra ID 和角色型存取控制 (RBAC) 中的安全性主體來連線到 服務匯流排 命名空間。 您不需要擔心在程式代碼或組態檔或安全記憶體中,例如 Azure 金鑰保存庫,將硬式編碼 連接字串。

第二個選項示範如何使用 連接字串 來連線到 服務匯流排 命名空間。 如果您不熟悉 Azure,您可能會發現 連接字串 選項更容易遵循。 我們建議在真實世界應用程式和生產環境中使用無密碼選項。 如需詳細資訊,請參閱驗證與授權。 您也可以在概 觀頁面上深入瞭解無密碼驗證。

將角色指派給 Microsoft Entra 使用者

在本機開發時,請確定連線到 Azure 服務匯流排 的用戶帳戶具有正確的許可權。 您需要 Azure 服務匯流排 數據擁有者角色,才能傳送和接收訊息。 若要指派此角色,您需要使用者存取 管理員 istrator 角色,或包含Microsoft.Authorization/roleAssignments/write動作的另一個角色。 您可以使用 Azure 入口網站、Azure CLI 或 Azure PowerShell,將 Azure RBAC 角色指派給使用者。 在範圍概觀頁面上深入瞭解角色指派的可用範圍。

下列範例會將Azure Service Bus Data Owner角色指派給您的用戶帳戶,以提供 Azure 服務匯流排 資源的完整存取權。 在實際案例中,遵循 最低許可權 原則,只為使用者提供更安全的生產環境所需的最低許可權。

適用於 Azure 服務匯流排的 Azure 內建角色

針對 Azure 服務匯流排,透過 Azure 入口網站 和 Azure 資源管理 API 來管理命名空間和所有相關資源,已使用 Azure RBAC 模型加以保護。 Azure 提供下列 Azure 內建角色,以授權存取 服務匯流排 命名空間:

如果您想要建立自定義角色,請參閱 服務匯流排 作業所需的許可權。

將 Microsoft Entra 使用者新增至 Azure 服務匯流排 擁有者角色

將 Microsoft Entra 使用者名稱新增至 服務匯流排 命名空間層級 Azure 服務匯流排 數據擁有者角色。 它可讓在使用者帳戶內容中執行的應用程式將訊息傳送至佇列或主題,以及接收來自佇列或主題訂用帳戶的訊息。

重要

在大部分情況下,在 Azure 中傳播角色指派需要一兩分鐘的時間。 在罕見的情況下,最多可能需要 八分鐘的時間。 如果您第一次執行程式碼時收到驗證錯誤,請稍候片刻再試一次。

  1. 如果您沒有在 Azure 入口網站 中開啟 [服務匯流排 命名空間] 頁面,請使用主要搜尋列或左側導覽來尋找您的 服務匯流排 命名空間。

  2. 在 [概觀] 頁面上,從左側功能表中選取 [訪問控制][IAM ]。

  3. 在 [存取控制 (IAM)] 頁面上,選取 [角色指派] 索引標籤。

  4. 從頂端功能表選取 [+ 新增],然後從產生的下拉功能表中選取 [新增角色指派]

    A screenshot showing how to assign a role.

  5. 使用搜尋方塊,從結果篩選出所需的角色。 在此範例中,搜尋 Azure Service Bus Data Owner 並選取相符的結果。 接著,選擇 [下一步]

  6. 在 [存取權指派對象為] 下,選取 [使用者、群組或服務主體],然後選擇 [+ 選取成員]

  7. 在對話方塊中,搜尋 Microsoft Entra 使用者名稱 (通常是您的 user@domain 電子郵件地址),然後在對話方塊底部選擇 [選取]

  8. 選取 [檢閱 + 指派] 以移至最終頁面,然後再次選取 [檢閱 + 指派] 以完成此程序。

傳送訊息至佇列

在本節中,您將建立 Java 控制台專案,並新增程式代碼以將訊息傳送至您稍早建立的佇列。

建立Java控制台專案

使用 Eclipse 或您選擇的工具建立 Java 專案。

設定您的應用程式以使用 服務匯流排

新增 Azure Core 和 Azure 服務匯流排 連結庫的參考。

如果您使用 Eclipse 並建立 Java 控制台應用程式,請將 Java 項目轉換成 Maven:以滑鼠右鍵按兩下 [套件總管] 視窗中的項目,選取 [設定 -> 轉換為 Maven 專案]。 然後,將相依性新增至這兩個連結庫,如下列範例所示。

更新 檔案pom.xml,以將相依性新增至 Azure 服務匯流排 和 Azure 身分識別套件。

    <dependencies>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-messaging-servicebus</artifactId>
            <version>7.13.3</version>
        </dependency>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-identity</artifactId>
            <version>1.8.0</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>

新增程式代碼以將訊息傳送至佇列

  1. 在 Java 檔案的主題中新增下列 import 語句。

    import com.azure.messaging.servicebus.*;
    import com.azure.identity.*;
    
    import java.util.concurrent.TimeUnit;
    import java.util.Arrays;
    import java.util.List;
    
  2. 在類別中,定義要保存 連接字串 和佇列名稱的變數。

    static String queueName = "<QUEUE NAME>";
    

    重要

    將取代 <QUEUE NAME> 為佇列的名稱。

  3. 在類別中新增名為 sendMessage 的方法,以將一則訊息傳送至佇列。

    重要

    以您的服務匯流排命名空間名稱取代 NAMESPACENAME

    static void sendMessage()
    {
        // create a token using the default Azure credential
        DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
                .build();
    
        ServiceBusSenderClient senderClient = new ServiceBusClientBuilder()
                .fullyQualifiedNamespace("NAMESPACENAME.servicebus.windows.net")
                .credential(credential)
                .sender()
                .queueName(queueName)
                .buildClient();
    
        // send one message to the queue
        senderClient.sendMessage(new ServiceBusMessage("Hello, World!"));
        System.out.println("Sent a single message to the queue: " + queueName);
    }
    
    
  4. 在類別中新增名為 createMessages 的方法,以建立訊息清單。 一般而言,您會從應用程式的不同部分取得這些訊息。 在這裡,我們會建立範例訊息的清單。

    static List<ServiceBusMessage> createMessages()
    {
        // create a list of messages and return it to the caller
        ServiceBusMessage[] messages = {
                new ServiceBusMessage("First message"),
                new ServiceBusMessage("Second message"),
                new ServiceBusMessage("Third message")
        };
        return Arrays.asList(messages);
    }
    
  5. 新增名為 sendMessageBatch 方法的方法,以將訊息傳送至您所建立的佇列。 這個方法會 ServiceBusSenderClient 建立佇列的 、叫 createMessages 用 方法以取得訊息清單、準備一或多個批次,以及將批次傳送至佇列。

    重要

    以您的服務匯流排命名空間名稱取代 NAMESPACENAME

    static void sendMessageBatch()
    {
        // create a token using the default Azure credential
        DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
                .build();
    
        ServiceBusSenderClient senderClient = new ServiceBusClientBuilder()
                .fullyQualifiedNamespace("NAMESPACENAME.servicebus.windows.net")
                .credential(credential)
                .sender()
                .queueName(queueName)
                .buildClient();
    
        // Creates an ServiceBusMessageBatch where the ServiceBus.
        ServiceBusMessageBatch messageBatch = senderClient.createMessageBatch();
    
        // create a list of messages
        List<ServiceBusMessage> listOfMessages = createMessages();
    
        // We try to add as many messages as a batch can fit based on the maximum size and send to Service Bus when
        // the batch can hold no more messages. Create a new batch for next set of messages and repeat until all
        // messages are sent.
        for (ServiceBusMessage message : listOfMessages) {
            if (messageBatch.tryAddMessage(message)) {
                continue;
            }
    
            // The batch is full, so we create a new batch and send the batch.
            senderClient.sendMessages(messageBatch);
            System.out.println("Sent a batch of messages to the queue: " + queueName);
    
            // create a new batch
            messageBatch = senderClient.createMessageBatch();
    
            // Add that message that we couldn't before.
            if (!messageBatch.tryAddMessage(message)) {
                System.err.printf("Message is too large for an empty batch. Skipping. Max size: %s.", messageBatch.getMaxSizeInBytes());
            }
        }
    
        if (messageBatch.getCount() > 0) {
            senderClient.sendMessages(messageBatch);
            System.out.println("Sent a batch of messages to the queue: " + queueName);
        }
    
        //close the client
        senderClient.close();
    }
    

從佇列接收訊息

在本節中,您會新增程序代碼以從佇列擷取訊息。

  1. 新增名為 receiveMessages 的方法,以接收來自佇列的訊息。 這個方法會 ServiceBusProcessorClient 指定處理訊息的處理程式,並指定另一個處理程式來處理錯誤,以建立佇列的 。 然後,它會啟動處理器、等候幾秒鐘、列印收到的訊息,然後停止並關閉處理器。

    重要

    • 以您的服務匯流排命名空間名稱取代 NAMESPACENAME
    • 將中的 QueueTest::processMessage 取代QueueTest為您的類別名稱。
    // handles received messages
    static void receiveMessages() throws InterruptedException
    {
        DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
                .build();
    
        ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
                .fullyQualifiedNamespace("NAMESPACENAME.servicebus.windows.net")
                .credential(credential)
                .processor()
                .queueName(queueName)
                .processMessage(context -> processMessage(context))
                .processError(context -> processError(context))
                .buildProcessorClient();
    
        System.out.println("Starting the processor");
        processorClient.start();
    
        TimeUnit.SECONDS.sleep(10);
        System.out.println("Stopping and closing the processor");
        processorClient.close();
    }
    
  2. processMessage新增方法來處理從 服務匯流排 訂閱收到的訊息。

    private static void processMessage(ServiceBusReceivedMessageContext context) {
        ServiceBusReceivedMessage message = context.getMessage();
        System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n", message.getMessageId(),
            message.getSequenceNumber(), message.getBody());
    }
    
  3. processError新增方法來處理錯誤訊息。

    private static void processError(ServiceBusErrorContext context) {
        System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n",
            context.getFullyQualifiedNamespace(), context.getEntityPath());
    
        if (!(context.getException() instanceof ServiceBusException)) {
            System.out.printf("Non-ServiceBusException occurred: %s%n", context.getException());
            return;
        }
    
        ServiceBusException exception = (ServiceBusException) context.getException();
        ServiceBusFailureReason reason = exception.getReason();
    
        if (reason == ServiceBusFailureReason.MESSAGING_ENTITY_DISABLED
            || reason == ServiceBusFailureReason.MESSAGING_ENTITY_NOT_FOUND
            || reason == ServiceBusFailureReason.UNAUTHORIZED) {
            System.out.printf("An unrecoverable error occurred. Stopping processing with reason %s: %s%n",
                reason, exception.getMessage());
        } else if (reason == ServiceBusFailureReason.MESSAGE_LOCK_LOST) {
            System.out.printf("Message lock lost for message: %s%n", context.getException());
        } else if (reason == ServiceBusFailureReason.SERVICE_BUSY) {
            try {
                // Choosing an arbitrary amount of time to wait until trying again.
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                System.err.println("Unable to sleep for period of time");
            }
        } else {
            System.out.printf("Error source %s, reason %s, message: %s%n", context.getErrorSource(),
                reason, context.getException());
        }
    }
    
  4. main更新 方法以叫sendMessage用、 sendMessageBatchreceiveMessages 方法,並擲回 InterruptedException

    public static void main(String[] args) throws InterruptedException {
        sendMessage();
        sendMessageBatch();
        receiveMessages();
    }
    

執行應用程式

  1. 如果您使用 Eclipse,請以滑鼠右鍵按兩下項目,選取 [匯出],展開 [Java],選取 [可執行的 JAR 檔案],然後依照步驟建立可執行的 JAR 檔案。

  2. 如果您使用與新增至 Azure 服務匯流排 數據擁有者角色的用戶帳戶不同的用戶帳戶登入計算機,請遵循下列步驟。 否則,請略過此步驟,然後在下一個步驟中繼續執行 Jar 檔案。

    1. 在您的電腦上安裝 Azure CLI

    2. 執行下列 CLI 命令以登入 Azure。 使用您新增至 Azure 服務匯流排 數據擁有者角色的相同用戶帳戶。

      az login
      
  3. 使用下列命令執行 Jar 檔案。

    java -jar <JAR FILE NAME>
    
  4. 您會在主控台視窗中看到以下輸出。

    Sent a single message to the queue: myqueue
    Sent a batch of messages to the queue: myqueue
    Starting the processor
    Processing message. Session: 88d961dd801f449e9c3e0f8a5393a527, Sequence #: 1. Contents: Hello, World!
    Processing message. Session: e90c8d9039ce403bbe1d0ec7038033a0, Sequence #: 2. Contents: First message
    Processing message. Session: 311a216a560c47d184f9831984e6ac1d, Sequence #: 3. Contents: Second message
    Processing message. Session: f9a871be07414baf9505f2c3d466c4ab, Sequence #: 4. Contents: Third message
    Stopping and closing the processor
    

在 Azure 入口網站 中 服務匯流排 命名空間的 [概觀] 頁面上,您可以看到傳入傳出訊息計數。 等候一分鐘左右,然後重新整理頁面以查看最新的值。

Incoming and outgoing message count

選取此 [概觀] 頁面上的佇列,以流覽至 [服務匯流排 佇列] 頁面。 您也會在此頁面上看到 傳入傳出 訊息計數。 您也會看到其他資訊,例如 佇列的目前大小大小上限、 作用中訊息計數等等。

Queue details

後續步驟

請參閱下列檔和範例: