共用方式為


在 Azure 服務匯流排佇列 (Java) 中傳送和接收訊息

本快速入門提供將訊息傳送至服務匯流排佇列,並加以接收的簡單案例逐步指示。 您可以建立 Java 應用程式,以將訊息傳送至 Azure 服務總線佇列,並從中接收訊息。 您可以在 GitHub 上的 Azure SDK for Java 存放庫中,找到適用於 Azure 服務總線的預先建置 Java 範例。

秘訣

如果您在 Spring 應用程式中使用 Azure 服務總線資源,建議您考慮 Spring Cloud Azure。 Spring Cloud Azure 是開放原始碼專案,可將 Spring 和 Azure 服務無縫整合。 若要深入了解 Spring Cloud Azure,以及查看使用服務匯流排的範例,請參閱使用 Azure 服務匯流排搭配 Spring Cloud Stream (機器翻譯)。

先決條件

在 Azure 入口網站中建立命名空間

若要開始使用 Azure 中的服務總線傳訊實體,請建立命名空間,其名稱在 Azure 中是唯一的。 命名空間會為應用程式中的服務總線資源提供範圍容器,例如佇列和主題。

若要建立命名空間:

  1. 登入 Azure 入口網站

  2. 從左上方選取展開選單,然後導航至所有服務頁面

  3. 在左側導覽列上,選取 [整合]。

  4. 向下卷動至 [傳訊服務>服務總線 ],然後選取 [ 建立]。

    顯示選取 [建立資源]、[整合] 和功能表中 [服務總線] 的螢幕快照。

  5. 在 [建立命名空間] 頁面的 [基本] 索引標籤中,遵循下列步驟:

    1. 針對 [訂用帳戶],選擇要在其中建立命名空間的 Azure 訂用帳戶。

    2. 針對資源群組,請選擇現有資源群組或建立新的資源群組。

    3. 輸入符合下列命名慣例的 命名空間名稱

      • 此名稱在整個 Azure 中必須是唯一的。 系統會立即檢查此名稱是否可用。
      • 名稱長度至少 6 個字元,最多 50 個字元。
      • 名稱僅可包含字母、數字和連字號 -
      • 名稱開頭必須為字母,且結尾必須為字母或數字。
      • 名稱結尾不得為 -sb-mgmt
    4. 針對 位置,選擇用於部署命名空間的區域。

    5. 針對定價層,選取命名空間的定價層 (基本、標準或進階)。 針對本快速入門,選取 [標準]

      如果您選取 Premium 層,您可以啟用命名空間的 異地複寫。 異地復寫功能可確保命名空間的元數據和數據會持續從主要區域複寫到一或多個次要區域。

      重要事項

      如果您想要使用主題和訂用帳戶,請選擇 [標準] 或 [進階]。 基本定價層不支援主題和訂用帳戶。

      若已選取 [進階] 定價層,請指定傳訊單位數目。 進階層可讓您的資源在 CPU 和記憶體層級上獲得隔離,讓每個工作負載能夠獨立執行。 此資源容器稱為「傳訊單位」 。 進階命名空間都至少有一個傳訊單位。 您可以為每個服務匯流排的進階命名空間選取 1、2、4、8 或 16 個傳訊單位。 如需詳細資訊,請參閱 服務總線進階傳訊層

    6. 選取頁面底部的 [檢閱 + 建立]

      顯示 [建立命名空間] 頁面的螢幕快照

    7. 在 [檢閱 + 建立] 頁面上檢閱設定,然後選取 [建立]

  6. 部署資源成功之後,請選取部署頁面上的 [移至資源 ]。

    顯示部署成功頁面的螢幕快照,其中包含 [移至資源] 連結。

  7. 您會看到服務匯流排命名空間的首頁。

    顯示所建立服務總線命名空間首頁的螢幕快照。

在 Azure 入口網站中建立佇列

  1. 在 [ Azure 服務匯流排命名空間] 頁面上,展開左側導覽功能表上的 [實體],然後選取 [佇列]

  2. 佇列 頁面上的工具列中,選取 + 佇列

  3. 輸入佇列的名稱。 將其他值保留為預設值。

  4. 選取 ,創建

    顯示 [建立佇列] 頁面的螢幕擷取畫面。

向 Azure 驗證應用程式

本文說明連線到 Azure 服務總線的兩種方式: 無密碼連接字串

第一個選項說明如何使用 Microsoft Entra ID 中的安全性主體和角色型存取控制 (RBAC),來連線到服務匯流排命名空間。 您不需要擔心在程式代碼、組態檔中或 Azure Key Vault 等安全記憶體中有硬式編碼連接字串。

第二個選項說明如何使用連接字串來連線到服務匯流排命名空間。 如果您不熟悉 Azure,您可能會覺得連接字串選項較容易遵循。 不過建議您在實際應用程式和實際執行環境中使用無密碼選項。 如需詳細資訊,請參閱 服務總線驗證和授權。 若要深入瞭解無密碼驗證,請參閱 驗證 .NET 應用程式

將角色指派給 Microsoft Entra 使用者

當您在本機開發時,請確定連線到 Azure 服務總線的用戶帳戶具有正確的許可權。 您需要 Azure 服務總線數據擁有者 角色,才能傳送和接收訊息。 若要自我指派此角色,您需要使用者存取管理員角色,或包括 Microsoft.Authorization/roleAssignments/write 動作的另一個角色。

您可以使用 Azure 入口網站、Azure CLI 或 Azure PowerShell,將 Azure RBAC 角色指派給使用者。 若要深入瞭解角色指派的可用範圍,請參閱 瞭解 Azure RBAC 的範圍

下列範例會將 Azure Service Bus Data Owner 角色指派給您的使用者帳戶,該角色提供對 Azure 服務匯流排資源的完整存取權。 在實際案例中,遵循 最低許可權原則 ,只為使用者提供更安全的生產環境所需的最低許可權。

Azure 服務匯流排的 Azure 內建角色

對於 Azure 服務匯流排來說,透過 Azure 入口網站和 Azure 資源管理 API 來的管理命名空間和所有相關資源的作業,皆已使用 Azure RBAC 模型來加以保護。 Azure 提供下列 Azure 內建角色,以授權存取服務匯流排命名空間:

如果您想要建立自訂角色,請參閱服務匯流排作業所需的權限 (機器翻譯)。

將 Microsoft Entra 使用者新增至 Azure 服務匯流排擁有者角色

將您的 Microsoft Entra 使用者名稱新增至服務匯流排命名空間層級的 Azure 服務匯流排資料擁有者角色, 此設定可讓在使用者帳戶內容中執行的應用程式,將訊息傳送至佇列或主題。 它可以從佇列或主題的訂閱接收訊息。

重要事項

在大部分情況下,角色指派在 Azure 中傳播需要一兩分鐘的時間。 在罕見的情況下,可能需要高達八分鐘的時間。 如果您第一次執行程式碼時收到驗證錯誤,請稍候片刻再試一次。

  1. 如果您沒有在 Azure 入口網站中開啟 [服務匯流排命名空間] 頁面,請使用主要搜尋列或左側導覽找出您的服務匯流排命名空間。

  2. 在 [ 概觀] 頁面上,從左側功能表中選取 [訪問控制][IAM ]。

  3. 在 [存取控制 (IAM)] 頁面上,選取 [角色指派] 索引標籤。

  4. 從頂端功能表中選取 [+ 新增 ],然後 選取 [新增角色指派]。

    顯示如何指派角色的螢幕擷取畫面。

  5. 使用搜尋方塊,從結果篩選出所需的角色。 在此範例中,搜尋 Azure Service Bus Data Owner 並選取相符的結果。 接著,選擇 [下一步]

  6. 在 [存取權指派對象為] 下,選取 [使用者、群組或服務主體],然後選擇 [+ 選取成員]

  7. 在對話方塊中,搜尋 Microsoft Entra 使用者名稱 (通常是您的 user@domain 電子郵件地址),然後在對話方塊底部選擇 [選取]

  8. 選取 [檢閱 + 指派] 以移至最終頁面,然後再次選取 [檢閱 + 指派] 以完成此程序。

傳送訊息至佇列

在本節中,您會建立 Java 控制台專案,並新增程式代碼以將訊息傳送至您稍早建立的佇列。

建立 Java 主控台專案

使用 Eclipse 或您選擇的工具建立 Java 專案。

設定應用程式以使用服務匯流排

新增 Azure 核心及 Azure 服務匯流排程式庫的參考。

如果您使用 Eclipse 並建立 Java 控制台應用程式,請將 Java 專案轉換為 Maven:在 套件總管 視窗中以滑鼠右鍵點擊專案。 選擇 [設定>轉換為 Maven 專案]。 接著如下列範例所示,將相依性新增至這兩個程式庫。

更新 pom.xml 檔案,將相依性新增至 Azure 服務匯流排和 Azure 身分識別套件。

    <dependencies>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-messaging-servicebus</artifactId>
            <version>7.13.3</version>
        </dependency>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-identity</artifactId>
            <version>1.8.0</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>

新增程式碼以將訊息傳送到佇列

  1. 在 Java 檔案頂端新增下列 import 陳述式。

    import com.azure.messaging.servicebus.*;
    import com.azure.identity.*;
    
    import java.util.concurrent.TimeUnit;
    import java.util.Arrays;
    import java.util.List;
    
  2. 在類別中,定義變數來保存連接字串和佇列名稱。

    static String queueName = "<QUEUE NAME>";
    

    重要事項

    <QUEUE NAME> 取代為佇列名稱。

  3. 在類別中新增名為 sendMessage 的方法,以將一則訊息傳送至佇列。

    重要事項

    以您的服務匯流排命名空間名稱取代 NAMESPACENAME

    static void sendMessage()
    {
        // create a token using the default Azure credential
        DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
                .build();
    
        ServiceBusSenderClient senderClient = new ServiceBusClientBuilder()
                .fullyQualifiedNamespace("NAMESPACENAME.servicebus.windows.net")
                .credential(credential)
                .sender()
                .queueName(queueName)
                .buildClient();
    
        // send one message to the queue
        senderClient.sendMessage(new ServiceBusMessage("Hello, World!"));
        System.out.println("Sent a single message to the queue: " + queueName);
    }
    
    
  4. 在類別中新增名為 createMessages 的方法,以建立訊息清單。 一般來說,您會從應用程式的不同部分取得這些訊息。 在此範例中,您會使用範例訊息的清單。

    static List<ServiceBusMessage> createMessages()
    {
        // create a list of messages and return it to the caller
        ServiceBusMessage[] messages = {
                new ServiceBusMessage("First message"),
                new ServiceBusMessage("Second message"),
                new ServiceBusMessage("Third message")
        };
        return Arrays.asList(messages);
    }
    
  5. 新增名為 sendMessageBatch 方法的方法,將訊息傳送至您所建立的佇列。 這個方法會建立佇列的 ServiceBusSenderClient,叫用 createMessages 方法來取得訊息清單、準備一或多個批次,並將批次傳送至佇列。

    重要事項

    以您的服務匯流排命名空間名稱取代 NAMESPACENAME

    static void sendMessageBatch()
    {
        // create a token using the default Azure credential
        DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
                .build();
    
        ServiceBusSenderClient senderClient = new ServiceBusClientBuilder()
                .fullyQualifiedNamespace("NAMESPACENAME.servicebus.windows.net")
                .credential(credential)
                .sender()
                .queueName(queueName)
                .buildClient();
    
        // Creates an ServiceBusMessageBatch where the ServiceBus.
        ServiceBusMessageBatch messageBatch = senderClient.createMessageBatch();
    
        // create a list of messages
        List<ServiceBusMessage> listOfMessages = createMessages();
    
        // We try to add as many messages as a batch can fit based on the maximum size and send to Service Bus when
        // the batch can hold no more messages. Create a new batch for next set of messages and repeat until all
        // messages are sent.
        for (ServiceBusMessage message : listOfMessages) {
            if (messageBatch.tryAddMessage(message)) {
                continue;
            }
    
            // The batch is full, so we create a new batch and send the batch.
            senderClient.sendMessages(messageBatch);
            System.out.println("Sent a batch of messages to the queue: " + queueName);
    
            // create a new batch
            messageBatch = senderClient.createMessageBatch();
    
            // Add that message that we couldn't before.
            if (!messageBatch.tryAddMessage(message)) {
                System.err.printf("Message is too large for an empty batch. Skipping. Max size: %s.", messageBatch.getMaxSizeInBytes());
            }
        }
    
        if (messageBatch.getCount() > 0) {
            senderClient.sendMessages(messageBatch);
            System.out.println("Sent a batch of messages to the queue: " + queueName);
        }
    
        //close the client
        senderClient.close();
    }
    

從佇列接收訊息

在本節中,您將新增程式碼以從佇列中取出訊息。

  1. 新增名為 receiveMessages 的方法以從佇列接收訊息。 這個方法會藉由指定處理訊息的處理常式,以及另一項處理錯誤的方式,來建立佇列的 ServiceBusProcessorClient。 然後方法會啟動處理器、等待數秒並列印收到的訊息,然後停止和關閉處理器。

    重要事項

    • 以您的服務匯流排命名空間名稱取代 NAMESPACENAME
    // handles received messages
    static void receiveMessages() throws InterruptedException
    {
        DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
                .build();
    
        ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
                .fullyQualifiedNamespace("NAMESPACENAME.servicebus.windows.net")
                .credential(credential)
                .processor()
                .queueName(queueName)
                .processMessage(context -> processMessage(context))
                .processError(context -> processError(context))
                .buildProcessorClient();
    
        System.out.println("Starting the processor");
        processorClient.start();
    
        TimeUnit.SECONDS.sleep(10);
        System.out.println("Stopping and closing the processor");
        processorClient.close();
    }
    
  2. 新增 processMessage 方法,以處理從服務匯流排訂用帳戶收到的訊息。

    private static void processMessage(ServiceBusReceivedMessageContext context) {
        ServiceBusReceivedMessage message = context.getMessage();
        System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n", message.getMessageId(),
            message.getSequenceNumber(), message.getBody());
    }
    
  3. 新增 processError 方法以處理錯誤訊息。

    private static void processError(ServiceBusErrorContext context) {
        System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n",
            context.getFullyQualifiedNamespace(), context.getEntityPath());
    
        if (!(context.getException() instanceof ServiceBusException)) {
            System.out.printf("Non-ServiceBusException occurred: %s%n", context.getException());
            return;
        }
    
        ServiceBusException exception = (ServiceBusException) context.getException();
        ServiceBusFailureReason reason = exception.getReason();
    
        if (reason == ServiceBusFailureReason.MESSAGING_ENTITY_DISABLED
            || reason == ServiceBusFailureReason.MESSAGING_ENTITY_NOT_FOUND
            || reason == ServiceBusFailureReason.UNAUTHORIZED) {
            System.out.printf("An unrecoverable error occurred. Stopping processing with reason %s: %s%n",
                reason, exception.getMessage());
        } else if (reason == ServiceBusFailureReason.MESSAGE_LOCK_LOST) {
            System.out.printf("Message lock lost for message: %s%n", context.getException());
        } else if (reason == ServiceBusFailureReason.SERVICE_BUSY) {
            try {
                // Choosing an arbitrary amount of time to wait until trying again.
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                System.err.println("Unable to sleep for period of time");
            }
        } else {
            System.out.printf("Error source %s, reason %s, message: %s%n", context.getErrorSource(),
                reason, context.getException());
        }
    }
    
  4. 更新 main 方法以叫用 sendMessagesendMessageBatch,並更新 receiveMessages 方法以擲回 InterruptedException

    public static void main(String[] args) throws InterruptedException {
        sendMessage();
        sendMessageBatch();
        receiveMessages();
    }
    

執行應用程式

  1. 如果您使用 Eclipse,請以滑鼠右鍵按一下專案,選取 [匯出],展開 [JAVA],選取 [可執行的 JAR 檔案],然後依照步驟建立可執行的 JAR 檔案。

  2. 如果您登入的使用者帳戶與您新增至 Azure 服務匯流排資料擁有者 角色的帳戶不同,請遵循下列步驟。 否則,請繼續在下一個步驟中執行 Jar 檔案。

    1. 在機器上安裝 Azure CLI (機器翻譯)。

    2. 執行下列 CLI 命令以登入 Azure。 使用您新增至 Azure 服務匯流排資料擁有者角色的相同使用者帳戶。

      az login
      
  3. 使用下列命令執行 Jar 檔案。

    java -jar <JAR FILE NAME>
    
  4. 您會在主控台視窗中看到下列輸出。

    Sent a single message to the queue: myqueue
    Sent a batch of messages to the queue: myqueue
    Starting the processor
    Processing message. Session: 88d961dd801f449e9c3e0f8a5393a527, Sequence #: 1. Contents: Hello, World!
    Processing message. Session: e90c8d9039ce403bbe1d0ec7038033a0, Sequence #: 2. Contents: First message
    Processing message. Session: 311a216a560c47d184f9831984e6ac1d, Sequence #: 3. Contents: Second message
    Processing message. Session: f9a871be07414baf9505f2c3d466c4ab, Sequence #: 4. Contents: Third message
    Stopping and closing the processor
    

在 Azure 入口網站中服務匯流排命名空間的概觀頁面上,您可以看到傳入傳出訊息計數。 請等候一分鐘左右,然後重新整理頁面以查看最新的值。

顯示傳入和傳出訊息計數的螢幕快照。

選取此概觀頁面上的佇列,瀏覽至服務匯流排佇列頁面。 您也會在此頁面上看到傳入傳出訊息計數。 您也會看到其他資訊,例如佇列的目前大小,以及作用中訊息計數的大小上限

螢幕快照顯示此佇列訊息的詳細資訊。

請參閱下列文件和範例: