將訊息傳送至 Azure 服務匯流排 主題,並從訂用帳戶接收主題的訊息(Java)

在本快速入門中,您會使用 azure-messaging-servicebus 套件撰寫 Java 程式代碼,將訊息傳送至 Azure 服務匯流排 主題,然後將訊息從訂用帳戶接收至該主題。

注意

本快速入門提供將一批訊息傳送至 服務匯流排 主題並從主題訂用帳戶接收這些訊息的簡單案例逐步指示。 您可以在 GitHub 上的 Azure SDK for Java 存放庫中找到適用於 Azure 服務匯流排 的預先建置 Java 範例。

提示

如果您在 Spring 應用程式中使用 Azure 服務匯流排 資源,建議您將 Spring Cloud Azure 視為替代方案。 Spring Cloud Azure 是開放原始碼專案,可提供與 Azure 服務的無縫 Spring 整合。 若要深入瞭解 Spring Cloud Azure,以及查看使用 服務匯流排 的範例,請參閱使用 Azure 服務匯流排 的 Spring Cloud Stream。

必要條件

在 Azure 入口網站 中建立命名空間

若要開始在 Azure 中使用 服務匯流排 傳訊實體,您必須先建立命名空間,其名稱在 Azure 中是唯一的。 命名空間提供應用程式內 服務匯流排 資源(佇列、主題等)的範圍容器。

若要建立命名空間:

  1. 登入 Azure 入口網站

  2. 流覽至 [ 所有服務] 頁面

  3. 在左側導覽列上,從類別清單中選取 [整合],將滑鼠停留在 服務匯流排 上方,然後在 服務匯流排 磚上選取+按鈕。

    Image showing selection of Create a resource, Integration, and then Service Bus in the menu.

  4. 在 [建立命名空間] 頁面的 [基本] 標籤中,遵循下列步驟:

    1. 針對 [ 訂用帳戶],選擇要在其中建立命名空間的 Azure 訂用帳戶。

    2. 針對 [ 資源群組],選擇命名空間將存留在其中的現有資源群組,或建立新的資源群組。

    3. 輸入命名空間的名稱。 命名空間名稱應遵守下列命名慣例:

      • 名稱在整個 Azure 中必須是唯一的。 系統會立即檢查此名稱是否可用。
      • 名稱長度至少為 6,最多 50 個字元。
      • 名稱只能包含字母、數位、連字元 “-”。
      • 名稱必須以字母開頭,並以字母或數字結尾。
      • 名稱結尾不是 “-sb” 或 “-mgmt”。
    4. 針對 [ 位置],選擇您的命名空間應該裝載所在的區域。

    5. 針對 [定價層],選取命名空間的定價層 [基本]、[標準] 或 [進階版]。 在本快速入門中,選取 [ 標準]。

      重要

      如果您想要使用主題和訂帳戶,請選擇 [標準] 或 [進階版]。 基本定價層不支援主題/訂用帳戶。

      如果您選取 進階版 定價層,請指定傳訊單位數目。 進階層會在 CPU 和記憶體層級提供資源隔離,讓每個工作負載以隔離方式執行。 此資源容器稱為傳訊單位。 進階命名空間至少有一個傳訊單位。 您可以為每個 服務匯流排 進階版 命名空間選取 1、2、4、8 或 16 個傳訊單位。 如需詳細資訊,請參閱 服務匯流排 進階版 傳訊

    6. 選取頁面底部的 [檢閱 + 建立] 。

      Image showing the Create a namespace page

    7. 在 [檢閱 + 建立] 頁面上檢閱設定,然後選取 [建立]

  5. 部署資源成功后,請選取 部署頁面上的 [移至資源 ]。

    Image showing the deployment succeeded page with the Go to resource link.

  6. 您會看到服務總線命名空間的首頁。

    Image showing the home page of the Service Bus namespace created.

使用 Azure 入口網站 建立主題

  1. 在 [服務匯流排 命名空間] 頁面上,選取左側功能表上的 [主題]。

  2. 選取 工具列上的 [+ 主題 ]。

  3. 輸入主題的名稱。 保留其他選項的預設值。

  4. 選取 建立

    Image showing the Create topic page.

建立主題的訂用帳戶

  1. 選取您在上一節中建立的主題

    Image showing the selection of topic from the list of topics.

  2. [服務匯流排 主題] 頁面上,選取工具列上的 [+ 訂用帳戶]。

    Image showing the Add subscription button.

  3. 在 [ 建立訂用帳戶] 頁面上,遵循下列步驟:

    1. 輸入 S1 以取得 訂用帳戶的名稱

    2. 針對 [最大傳遞計數] 輸入 3

    3. 然後,選取 [建立 ] 以建立訂用帳戶。

      Image showing the Create subscription page.

向 Azure 驗證應用程式

本快速入門示範兩種連線到 Azure 服務匯流排 的方式:密碼和 連接字串

第一個選項示範如何使用 Microsoft Entra ID 和角色型存取控制 (RBAC) 中的安全性主體來連線到 服務匯流排 命名空間。 您不需要擔心在程式代碼或組態檔中或 Azure 金鑰保存庫 之類的安全記憶體中,有硬式編碼 連接字串。

第二個選項示範如何使用 連接字串 來連線到 服務匯流排 命名空間。 如果您不熟悉 Azure,您可能會發現 連接字串 選項更容易遵循。 我們建議在真實世界應用程式和生產環境中使用無密碼選項。 如需詳細資訊,請參閱驗證與授權。 您也可以在概 觀頁面上深入瞭解無密碼驗證。

將角色指派給 Microsoft Entra 使用者

在本機開發時,請確定連線到 Azure 服務匯流排 的用戶帳戶具有正確的許可權。 您需要 Azure 服務匯流排 數據擁有者角色,才能傳送和接收訊息。 若要指派此角色,您需要使用者存取 管理員 istrator 角色,或包含Microsoft.Authorization/roleAssignments/write動作的另一個角色。 您可以使用 Azure 入口網站、Azure CLI 或 Azure PowerShell,將 Azure RBAC 角色指派給使用者。 在範圍概觀頁面上深入瞭解角色指派的可用範圍。

下列範例會將Azure Service Bus Data Owner角色指派給用戶帳戶,以提供 Azure 服務匯流排 資源的完整存取權。 在實際案例中,遵循 最低許可權 原則,只為使用者提供更安全的生產環境所需的最低許可權。

適用於 Azure 服務匯流排的 Azure 內建角色

針對 Azure 服務匯流排,透過 Azure 入口網站 和 Azure 資源管理 API 來管理命名空間和所有相關資源,已使用 Azure RBAC 模型加以保護。 Azure 提供下列 Azure 內建角色,以授權存取 服務匯流排 命名空間:

如果您想要建立自定義角色,請參閱 服務匯流排 作業所需的許可權。

將 Microsoft Entra 使用者新增至 Azure 服務匯流排 擁有者角色

在 服務匯流排 命名空間層級,將 Microsoft Entra 用戶名稱新增至 Azure 服務匯流排 數據擁有者角色。 它可讓在使用者帳戶內容中執行的應用程式將訊息傳送至佇列或主題,以及接收來自佇列或主題訂用帳戶的訊息。

重要

在大部分情況下,在 Azure 中傳播角色指派需要一兩分鐘的時間。 在罕見的情況下,最多可能需要 八分鐘的時間。 如果您第一次執行程式碼時收到驗證錯誤,請稍候片刻再試一次。

  1. 如果您沒有在 Azure 入口網站 中開啟 [服務匯流排 命名空間] 頁面,請使用主要搜尋列或左側導覽來尋找您的 服務匯流排 命名空間。

  2. 在 [概觀] 頁面上,從左側功能表中選取 [訪問控制][IAM ]。

  3. 在 [存取控制 (IAM)] 頁面上,選取 [角色指派] 索引標籤。

  4. 從頂端功能表選取 [+ 新增],然後從產生的下拉功能表中選取 [新增角色指派]

    A screenshot showing how to assign a role.

  5. 使用搜尋方塊,從結果篩選出所需的角色。 在此範例中,搜尋 Azure Service Bus Data Owner 並選取相符的結果。 接著,選擇 [下一步]

  6. 在 [存取權指派對象為] 下,選取 [使用者、群組或服務主體],然後選擇 [+ 選取成員]

  7. 在對話方塊中,搜尋 Microsoft Entra 使用者名稱 (通常是您的 user@domain 電子郵件地址),然後在對話方塊底部選擇 [選取]

  8. 選取 [檢閱 + 指派] 以移至最終頁面,然後再次選取 [檢閱 + 指派] 以完成此程序。

將訊息傳送至主題

在本節中,您會建立 Java 控制台專案,並新增程式代碼以將訊息傳送至您所建立的主題。

建立Java控制台專案

使用 Eclipse 或您選擇的工具建立 Java 專案。

設定您的應用程式以使用 服務匯流排

新增 Azure Core 和 Azure 服務匯流排 連結庫的參考。

如果您使用 Eclipse 並建立 Java 控制台應用程式,請將 Java 項目轉換成 Maven:以滑鼠右鍵按兩下 [套件總管] 視窗中的項目,選取 [設定 -> 轉換為 Maven 專案]。 然後,將相依性新增至這兩個連結庫,如下列範例所示。

更新 檔案pom.xml,將相依性新增至 Azure 服務匯流排 和 Azure 身分識別套件。

    <dependencies>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-messaging-servicebus</artifactId>
            <version>7.13.3</version>
        </dependency>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-identity</artifactId>
            <version>1.8.0</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>

新增程式代碼以將訊息傳送至主題

  1. 在 Java 檔案的主題中新增下列 import 語句。

    import com.azure.messaging.servicebus.*;
    import com.azure.identity.*;
    
    import java.util.concurrent.TimeUnit;
    import java.util.Arrays;
    import java.util.List;
    
  2. 在類別中,定義變數來保存 連接字串(不需要用於無密碼案例)、主題名稱和訂用帳戶名稱。

    static String topicName = "<TOPIC NAME>";
    static String subName = "<SUBSCRIPTION NAME>";
    

    重要

    取代為主題名稱,並以<SUBSCRIPTION NAME>主題的訂用帳戶名稱取代 <TOPIC NAME>

  3. 在類別中新增名為 sendMessage 的方法,以將一則訊息傳送至主題。

    重要

    以您的服務匯流排命名空間名稱取代 NAMESPACENAME

    static void sendMessage()
    {
        // create a token using the default Azure credential
        DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
                .build();
    
        ServiceBusSenderClient senderClient = new ServiceBusClientBuilder()
                .fullyQualifiedNamespace("NAMESPACENAME.servicebus.windows.net")
                .credential(credential)
                .sender()
                .topicName(topicName)
                .buildClient();
    
        // send one message to the topic
        senderClient.sendMessage(new ServiceBusMessage("Hello, World!"));
        System.out.println("Sent a single message to the topic: " + topicName);
    }
    
    
  4. 在類別中新增名為 createMessages 的方法,以建立訊息清單。 一般而言,您會從應用程式的不同部分取得這些訊息。 在這裡,我們會建立範例訊息的清單。

    static List<ServiceBusMessage> createMessages()
    {
        // create a list of messages and return it to the caller
        ServiceBusMessage[] messages = {
                new ServiceBusMessage("First message"),
                new ServiceBusMessage("Second message"),
                new ServiceBusMessage("Third message")
        };
        return Arrays.asList(messages);
    }
    
  5. 新增名為 sendMessageBatch 方法的方法,以將訊息傳送至您所建立的主題。 這個方法會 ServiceBusSenderClient 建立 主題的 ,叫 createMessages 用 方法以取得訊息清單、準備一或多個批次,以及將批次傳送至主題。

    重要

    以您的服務匯流排命名空間名稱取代 NAMESPACENAME

    static void sendMessageBatch()
    {
        // create a token using the default Azure credential
        DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
                .build();
    
        ServiceBusSenderClient senderClient = new ServiceBusClientBuilder()
                .fullyQualifiedNamespace("NAMESPACENAME.servicebus.windows.net")
                .credential(credential)
                .sender()
                .topicName(topicName)
                .buildClient();
    
        // Creates an ServiceBusMessageBatch where the ServiceBus.
        ServiceBusMessageBatch messageBatch = senderClient.createMessageBatch();
    
        // create a list of messages
        List<ServiceBusMessage> listOfMessages = createMessages();
    
        // We try to add as many messages as a batch can fit based on the maximum size and send to Service Bus when
        // the batch can hold no more messages. Create a new batch for next set of messages and repeat until all
        // messages are sent.
        for (ServiceBusMessage message : listOfMessages) {
            if (messageBatch.tryAddMessage(message)) {
                continue;
            }
    
            // The batch is full, so we create a new batch and send the batch.
            senderClient.sendMessages(messageBatch);
            System.out.println("Sent a batch of messages to the topic: " + topicName);
    
            // create a new batch
            messageBatch = senderClient.createMessageBatch();
    
            // Add that message that we couldn't before.
            if (!messageBatch.tryAddMessage(message)) {
                System.err.printf("Message is too large for an empty batch. Skipping. Max size: %s.", messageBatch.getMaxSizeInBytes());
            }
        }
    
        if (messageBatch.getCount() > 0) {
            senderClient.sendMessages(messageBatch);
            System.out.println("Sent a batch of messages to the topic: " + topicName);
        }
    
        //close the client
        senderClient.close();
    }
    

從訂用帳戶接收訊息

在本節中,您會新增程序代碼,以從訂用帳戶擷取主題中的訊息。

  1. 新增名為 receiveMessages 的方法,以接收來自訂用帳戶的訊息。 這個方法會 ServiceBusProcessorClient 指定處理訊息的處理程式,並指定另一個處理程式來處理錯誤,以建立訂閱的 。 然後,它會啟動處理器、等候幾秒鐘、列印收到的訊息,然後停止並關閉處理器。

    重要

    • 以您的服務匯流排命名空間名稱取代 NAMESPACENAME
    • 將中的 ServiceBusTopicTest::processMessage 取代ServiceBusTopicTest為您的類別名稱。
    // handles received messages
    static void receiveMessages() throws InterruptedException
    {
        DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
                .build();
    
        // Create an instance of the processor through the ServiceBusClientBuilder
        ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
            .fullyQualifiedNamespace("NAMESPACENAME.servicebus.windows.net")
            .credential(credential)
            .processor()
            .topicName(topicName)
            .subscriptionName(subName)
            .processMessage(context -> processMessage(context))
            .processError(context -> processError(context))
            .buildProcessorClient();
    
        System.out.println("Starting the processor");
        processorClient.start();
    
        TimeUnit.SECONDS.sleep(10);
        System.out.println("Stopping and closing the processor");
        processorClient.close();
    }
    
  2. processMessage新增方法來處理從 服務匯流排 訂閱接收的訊息。

    private static void processMessage(ServiceBusReceivedMessageContext context) {
        ServiceBusReceivedMessage message = context.getMessage();
        System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n", message.getMessageId(),
            message.getSequenceNumber(), message.getBody());
    }
    
  3. processError新增方法來處理錯誤訊息。

    private static void processError(ServiceBusErrorContext context) {
        System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n",
            context.getFullyQualifiedNamespace(), context.getEntityPath());
    
        if (!(context.getException() instanceof ServiceBusException)) {
            System.out.printf("Non-ServiceBusException occurred: %s%n", context.getException());
            return;
        }
    
        ServiceBusException exception = (ServiceBusException) context.getException();
        ServiceBusFailureReason reason = exception.getReason();
    
        if (reason == ServiceBusFailureReason.MESSAGING_ENTITY_DISABLED
            || reason == ServiceBusFailureReason.MESSAGING_ENTITY_NOT_FOUND
            || reason == ServiceBusFailureReason.UNAUTHORIZED) {
            System.out.printf("An unrecoverable error occurred. Stopping processing with reason %s: %s%n",
                reason, exception.getMessage());
        } else if (reason == ServiceBusFailureReason.MESSAGE_LOCK_LOST) {
            System.out.printf("Message lock lost for message: %s%n", context.getException());
        } else if (reason == ServiceBusFailureReason.SERVICE_BUSY) {
            try {
                // Choosing an arbitrary amount of time to wait until trying again.
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                System.err.println("Unable to sleep for period of time");
            }
        } else {
            System.out.printf("Error source %s, reason %s, message: %s%n", context.getErrorSource(),
                reason, context.getException());
        }
    }
    
  4. main更新 方法以叫sendMessage用、 sendMessageBatchreceiveMessages 方法,並擲回 InterruptedException

    public static void main(String[] args) throws InterruptedException {
        sendMessage();
        sendMessageBatch();
        receiveMessages();
    }
    

執行應用程式

執行程式以檢視類似下列輸出的輸出:

  1. 如果您使用 Eclipse,請以滑鼠右鍵按兩下項目,選取 [匯出],展開 [Java],選取 [可執行的 JAR 檔案],然後依照步驟建立可執行的 JAR 檔案。

  2. 如果您使用與新增至 Azure 服務匯流排 數據擁有者角色的用戶帳戶不同的用戶帳戶登入計算機,請遵循下列步驟。 否則,請略過此步驟,然後在下一個步驟中繼續執行 Jar 檔案。

    1. 在您的電腦上安裝 Azure CLI

    2. 執行下列 CLI 命令以登入 Azure。 使用您新增至 Azure 服務匯流排 數據擁有者角色的相同用戶帳戶。

      az login
      
  3. 使用下列命令執行 Jar 檔案。

    java -jar <JAR FILE NAME>
    
  4. 您會在主控台視窗中看到以下輸出。

    Sent a single message to the topic: mytopic
    Sent a batch of messages to the topic: mytopic
    Starting the processor
    Processing message. Session: e0102f5fbaf646988a2f4b65f7d32385, Sequence #: 1. Contents: Hello, World!
    Processing message. Session: 3e991e232ca248f2bc332caa8034bed9, Sequence #: 2. Contents: First message
    Processing message. Session: 56d3a9ea7df446f8a2944ee72cca4ea0, Sequence #: 3. Contents: Second message
    Processing message. Session: 7bd3bd3e966a40ebbc9b29b082da14bb, Sequence #: 4. Contents: Third message
    

在 Azure 入口網站 中 服務匯流排 命名空間的 [概觀] 頁面上,您可以看到傳入傳出訊息計數。 等候一分鐘左右,然後重新整理頁面以查看最新的值。

Incoming and outgoing message count

切換至中間窗格的 [主題] 索引卷標,然後選取主題以查看主題的 [服務匯流排 主題] 頁面。 在此頁面上,您應該會在 [訊息] 圖表中看到四個傳入和四個傳出訊息。

Incoming and outgoing messages

如果您在 方法中main批注化receiveMessages呼叫,然後再次執行應用程式,請在 [服務匯流排 主題] 頁面上看到 8 個傳入訊息(4 個新),但四個傳出訊息。

Updated topic page

在此頁面上,如果您選取訂用帳戶,您會前往 [服務匯流排 訂用帳戶] 頁面。 您可以在此頁面看到作用中的訊息計數、寄不出的信件訊息計數等等。 在此範例中,接收者尚未收到四個作用中訊息。

Active message count

下一步

請參閱下列檔和範例: