在 Spring 應用程式中使用事件中樞

本文將教你如何在使用 Spring Framework 所建構的Java應用程式中使用 Azure 事件中樞。

Azure 事件中樞 是一個大數據串流平台與事件擷取服務。 它每秒可接收和處理數百萬個事件。 傳送至事件中樞的資料可以使用任何即時分析提供者或批次/儲存體配接器來轉換並儲存。

Spring Cloud Azure 提供多種模組,用於使用 Spring 框架向事件中心發送訊息及接收訊息。

您可以獨立使用下列模組,或針對不同的使用案例加以合併:

必要條件

  • 一個 Azure 事件中樞 實例。 欲了解更多資訊,請參閱 快速入門:使用Azure入口網站建立活動中心

  • 用於事件中心檢查點的 Azure 儲存帳戶。 如需詳細資訊,請參閱建立儲存體帳戶

  • Spring Boot 應用程式。 如果您沒有 Maven 專案,請使用 Spring Initializr 建立 Maven 專案。 記得選擇 Maven Project,然後在 Dependencies 下方新增 Spring Web 相依,然後選擇版本 8 或更高版本Java。

注意

要將你目前使用的 Microsoft Entra 帳號賦予存取 Azure 事件中樞 資源的權限,請在 Azure 事件中樞 中將 Azure 事件中樞 Data ReceiverAzure 事件中樞 Data Sender 這兩個角色指派給該帳號。 接著在Azure 儲存體帳號中,將 Storage Blob Data Contributor 角色指派到你目前使用的Microsoft Entra帳號。 欲了解更多關於指派存取角色的資訊,請參閱 使用 Azure 入口網站指派 Azure 角色 以及 使用 Microsoft Entra ID 授權對事件中樞資源的存取

重要

需要 Spring Boot 2.5 版或更高版本,才能完成本教學課程中的步驟。

準備在地環境

在本教學課程中,設定和程式代碼沒有任何驗證作業。 然而,連接 Azure 服務需要驗證。 要完成驗證,你需要使用 Azure Identity 用戶端函式庫。 Spring Cloud Azure 使用 DefaultAzureCredential,這是 Azure Identity 函式庫提供的,幫助你在不更改程式碼的情況下取得憑證。

DefaultAzureCredential 支援多種驗證方法,並在執行階段判斷應使用的方法。 這種方法可讓您的應用程式在不同的環境中使用不同的驗證方法,例如本機或生產環境,而不需要實作環境特定的程序代碼。 欲了解更多資訊,請參閱 Authenticate Azure-hosted Java applicationsDefaultAzureCredential 章節。

若要使用 Azure CLI、IntelliJ 或其他方法在本地開發環境中完成認證,請參見 Azure Java開發環境中的認證。 在 Azure 主機環境中完成認證時,我們建議使用管理身份。 欲了解更多資訊,請參閱 Azure資源的受管理身份是什麼?

使用 Spring Cloud Azure 事件中樞 Starter

Spring Cloud Azure 事件中樞 Starter模組透過 Spring Boot 框架匯入 Event Hubs Java客戶端函式庫。 你可以同時使用 Spring Cloud Azure 和 Azure SDK,且以不互斥的模式運行。 因此,你可以繼續在 Spring 應用程式中使用 Event Hubs 的 Java 用戶端 API。

新增相依性

要安裝 Spring Cloud Azure 事件中樞 Starter 模組,請在你的 pom.xml 檔案中新增以下相依關係:

  • Spring Cloud Azure 物料清單(BOM):

    <dependencyManagement>
       <dependencies>
         <dependency>
           <groupId>com.azure.spring</groupId>
           <artifactId>spring-cloud-azure-dependencies</artifactId>
           <version>7.3.0</version>
           <type>pom</type>
           <scope>import</scope>
           </dependency>
       </dependencies>
    </dependencyManagement>
    

    注意

    如果你用的是 Spring Boot 4.0.x,記得把版本設 spring-cloud-azure-dependencies7.3.0.

    如果你用的是 Spring Boot 3.5.x,記得把版本設 spring-cloud-azure-dependencies6.4.0

    如果你使用的是 Spring Boot 3.1.x-3.5.x,記得把版本設 spring-cloud-azure-dependencies5.25.0.

    如果您使用 Spring Boot 2.x,請務必將 spring-cloud-azure-dependencies 版本設定為 4.20.0

    此材料帳單 (BOM) 應該在 <dependencyManagement> 檔案的 區段中設定。 這確保所有 Spring Cloud Azure 相依性都使用相同的版本。

    欲了解更多關於本物料清單所用版本的資訊,請參見 Spring Cloud Azure 應該使用哪個版本

  • Spring Cloud Azure 事件中樞 工件:

    <dependency>
      <groupId>com.azure.spring</groupId>
      <artifactId>spring-cloud-azure-starter-eventhubs</artifactId>
    </dependency>
    

撰寫應用程式以傳送和接收訊息的程序代碼

本指南將教你如何在 Spring 應用程式的情境下使用 Event Hubs Java 用戶端。 在這裡,我們將介紹下列兩個選項:

  • 使用 Spring Boot 自動設定,並在 Spring 應用上下文中使用即用型用戶端(建議使用)。
  • 以程式設計方式建置用戶端。

從 Spring IoC 容器自動裝配客戶端 Bean 的方式具有下列優點,讓您在使用事件中樞客戶端開發時,可享有更具彈性和效率的體驗:

  • 它會套用 外部化組態 ,讓您可以在不同的環境中使用相同的應用程式程序代碼。
  • 您可以將學習產生器模式的程式委派給 Spring Boot 架構,並將此用戶端註冊至應用程式內容。 此委派可讓您專注於如何使用客戶端來符合您自己的商務需求。
  • 您可以使用健康情況指標,輕鬆地檢查應用程式和內部元件的狀態和健康情況。

下列各節提供程式碼範例,示範如何使用 EventProcessorClientEventHubProducerClient 搭配這兩個替代方案。

注意

Azure Java Event Hubs SDK 提供多個客戶端與 Event Hubs 互動。 入門版也會為所有事件中樞用戶端以及客戶端產生器提供自動設定。 本文只使用 EventProcessorClientEventHubProducerClient 作為範例。

使用 Spring Boot 自動配置

若要將訊息傳送至事件中樞並從中接收訊息,請使用下列步驟來設定應用程式:

  1. 使用下列屬性設定來設定事件中樞命名空間和事件中樞名稱:

    spring.cloud.azure.eventhubs.namespace=<your event-hubs-namespace>
    spring.cloud.azure.eventhubs.event-hub-name=<your-event-hub-name>
    spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name=<your-storage-account-name>
    spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name=<your-storage-account-container-name>
    spring.cloud.azure.eventhubs.processor.consumer-group=$Default
    
  2. 建立一個新的 EventHubProcessorClientConfiguration Java 類別,如下範例所示。 這個類別用來為EventProcessorClient註冊訊息處理程式和錯誤處理程式。

    import com.azure.spring.cloud.service.eventhubs.consumer.EventHubsErrorHandler;
    import com.azure.spring.cloud.service.eventhubs.consumer.EventHubsRecordMessageListener;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class EventHubProcessorClientConfiguration {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(EventHubProcessorClientConfiguration.class);
    
        @Bean
        EventHubsRecordMessageListener processEvent() {
            return eventContext->LOGGER.info("Processing event from partition {} with sequence number {} with body: {}",
                eventContext.getPartitionContext().getPartitionId(), eventContext.getEventData().getSequenceNumber(),
                eventContext.getEventData().getBodyAsString());
        }
    
        @Bean
        EventHubsErrorHandler processError() {
            return errorContext->LOGGER.info("Error occurred in partition processor for partition {}, {}",
                errorContext.getPartitionContext().getPartitionId(),
                errorContext.getThrowable());
        }
    
    }
    
  3. EventProcessorClient在 Spring 應用程式中插入 和 EventHubProducerClient ,並呼叫相關的 API 來傳送和接收訊息,如下列範例所示:

    import com.azure.messaging.eventhubs.EventData;
    import com.azure.messaging.eventhubs.EventHubProducerClient;
    import com.azure.messaging.eventhubs.EventProcessorClient;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    import java.util.Collections;
    import java.util.concurrent.TimeUnit;
    
    @SpringBootApplication
    public class EventHubClientApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(EventHubClientApplication.class);
        private final EventHubProducerClient eventHubProducerClient;
        private final EventProcessorClient eventProcessorClient;
    
        public EventHubClientApplication(EventHubProducerClient eventHubProducerClient,
                                         EventProcessorClient eventProcessorClient) {
            this.eventHubProducerClient = eventHubProducerClient;
            this.eventProcessorClient = eventProcessorClient;
        }
    
        public static void main(String[] args) {
            SpringApplication.run(EventHubClientApplication.class, args);
        }
    
        @Override
        public void run(String... args) throws Exception {
            eventProcessorClient.start();
            // Wait for the processor client to be ready
            TimeUnit.SECONDS.sleep(10);
    
            eventHubProducerClient.send(Collections.singletonList(new EventData("Hello World")));
            LOGGER.info("Successfully sent a message to Event Hubs.");
            eventHubProducerClient.close();
            LOGGER.info("Skip stopping and closing the processor since the processor may not complete the receiving process yet.");
        }
    
    }
    
  4. 啟動應用程式。 您會看到類似下列範例的記錄:

    Successfully sent a message to Event Hubs.
    ...
    Processing event from partition 0 with sequence number 0 with body: Hello World
    ...
    Stopping and closing the processor.
    

以程式設計方式建置用戶端

您可以自行建置用戶端組件,但是過程很複雜。 在 Spring Boot 應用程式中,您必須管理屬性、了解產生器模式,以及向 Spring 應用程式內容註冊用戶端。 下列步驟示範如何執行此動作:

  1. 建立一個新的 EventHubClientConfiguration Java 類別,如下範例所示。 這個類別是用來宣告 EventProcessorClientEventHubProducerClient bean。 請務必將<your event-hubs-namespace><your-event-hub-name><your-storage-account-name><your-storage-account-container-name> 這些佔位元替換為您實際的值。

    import com.azure.identity.DefaultAzureCredentialBuilder;
    import com.azure.messaging.eventhubs.EventHubClientBuilder;
    import com.azure.messaging.eventhubs.EventHubProducerClient;
    import com.azure.messaging.eventhubs.EventProcessorClient;
    import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
    import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
    import com.azure.messaging.eventhubs.models.ErrorContext;
    import com.azure.messaging.eventhubs.models.EventContext;
    import com.azure.storage.blob.BlobContainerAsyncClient;
    import com.azure.storage.blob.BlobContainerClientBuilder;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class EventHubClientConfiguration {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(EventHubClientConfiguration.class);
        private static final String EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "<your event-hubs-namespace>.servicebus.windows.net";
        private static final String EVENT_HUB_NAME = "<your-event-hub-name>";
        private static final String CONSUMER_GROUP = "$Default";
        private static final String STORAGE_ACCOUNT_ENDPOINT = "https://<your-storage-account-name>.blob.core.windows.net";
        private static final String STORAGE_CONTAINER_NAME = "<your-storage-account-container-name>";
    
        @Bean
        EventHubClientBuilder eventHubClientBuilder() {
            return new EventHubClientBuilder().credential(EVENT_HUB_FULLY_QUALIFIED_NAMESPACE, EVENT_HUB_NAME,
                new DefaultAzureCredentialBuilder()
                    .build());
        }
    
        @Bean
        BlobContainerClientBuilder blobContainerClientBuilder() {
            return new BlobContainerClientBuilder().credential(new DefaultAzureCredentialBuilder()
                                                       .build())
                                                   .endpoint(STORAGE_ACCOUNT_ENDPOINT)
                                                   .containerName(STORAGE_CONTAINER_NAME);
        }
    
        @Bean
        BlobContainerAsyncClient blobContainerAsyncClient(BlobContainerClientBuilder blobContainerClientBuilder) {
            return blobContainerClientBuilder.buildAsyncClient();
        }
    
        @Bean
        EventProcessorClientBuilder eventProcessorClientBuilder(BlobContainerAsyncClient blobContainerAsyncClient) {
            return new EventProcessorClientBuilder().credential(EVENT_HUB_FULLY_QUALIFIED_NAMESPACE, EVENT_HUB_NAME,
                                                        new DefaultAzureCredentialBuilder()
                                                            .build())
                                                    .consumerGroup(CONSUMER_GROUP)
                                                    .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))
                                                    .processEvent(EventHubClientConfiguration::processEvent)
                                                    .processError(EventHubClientConfiguration::processError);
        }
    
        @Bean
        EventHubProducerClient eventHubProducerClient(EventHubClientBuilder eventHubClientBuilder) {
            return eventHubClientBuilder.buildProducerClient();
    
        }
    
        @Bean
        EventProcessorClient eventProcessorClient(EventProcessorClientBuilder eventProcessorClientBuilder) {
            return eventProcessorClientBuilder.buildEventProcessorClient();
        }
    
        public static void processEvent(EventContext eventContext) {
            LOGGER.info("Processing event from partition {} with sequence number {} with body: {}",
                eventContext.getPartitionContext().getPartitionId(), eventContext.getEventData().getSequenceNumber(),
                eventContext.getEventData().getBodyAsString());
        }
    
        public static void processError(ErrorContext errorContext) {
            LOGGER.info("Error occurred in partition processor for partition {}, {}",
                errorContext.getPartitionContext().getPartitionId(),
                errorContext.getThrowable());
        }
    
    }
    
  2. EventProcessorClientEventHubProducerClient 注入到您的 Spring 應用程式中,如下列範例所示:

    import com.azure.messaging.eventhubs.EventData;
    import com.azure.messaging.eventhubs.EventHubProducerClient;
    import com.azure.messaging.eventhubs.EventProcessorClient;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    import java.util.Collections;
    import java.util.concurrent.TimeUnit;
    
    @SpringBootApplication
    public class EventHubClientApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(EventHubClientApplication.class);
        private final EventHubProducerClient eventHubProducerClient;
        private final EventProcessorClient eventProcessorClient;
    
        public EventHubClientApplication(EventHubProducerClient eventHubProducerClient,
                                         EventProcessorClient eventProcessorClient) {
            this.eventHubProducerClient = eventHubProducerClient;
            this.eventProcessorClient = eventProcessorClient;
        }
    
        public static void main(String[] args) {
            SpringApplication.run(EventHubClientApplication.class, args);
        }
    
        @Override
        public void run(String... args) throws Exception {
            eventProcessorClient.start();
            // Wait for the processor client to be ready
            TimeUnit.SECONDS.sleep(10);
    
            eventHubProducerClient.send(Collections.singletonList(new EventData("Hello World")));
            LOGGER.info("Successfully sent a message to Event Hubs.");
            eventHubProducerClient.close();
            LOGGER.info("Stopping and closing the processor");
            eventProcessorClient.stop();
        }
    
    }
    
  3. 啟動應用程式。 您會看到類似下列範例的記錄:

    Successfully sent a message to Event Hubs.
    ...
    Processing event from partition 0 with sequence number 0 with body: Hello World
    ...
    Stopping and closing the processor.
    

下列清單顯示此程式碼沒有彈性或不夠優雅的一些原因:

  • 事件中樞命名空間和事件中樞名稱會硬式編碼。
  • 如果您使用 @Value 從 Spring 環境取得組態,則 application.properties 檔案中不能有 IDE 提示。
  • 如果您有微服務案例,則必須在每個專案中複製程序代碼,而且很容易犯錯,而且很難保持一致。

幸好,使用 Spring Cloud Azure 不需要自己建置客戶端組件。 相反地,您可以直接插入它們,並使用您已熟悉的設定屬性來設定記憶體佇列。 欲了解更多資訊,請參閱 Spring Cloud Azure configuration

Spring Cloud Azure 也提供以下針對不同情境的全域配置。 欲了解更多資訊,請參閱Spring Cloud Azure 配置Azure 服務 SDK 的全域配置章節。

  • Proxy 選項。
  • 重試選項。
  • AMQP 傳輸客戶端選項。

你也可以連接不同的 Azure 雲端。 欲了解更多資訊,請參見 連線至不同的 Azure 雲端

使用 Spring Messaging 與 Azure 事件中心

Spring Messaging Azure 事件中樞模組支援 Spring Messaging 框架與事件中心(Event Hubs)。

如果你使用 Spring Messaging Azure 事件中樞,可以使用以下功能:

  • EventHubsTemplate:以異步和同步方式將訊息傳送至事件中樞。
  • @EventHubsListener:將方法標示為目的地上事件中樞訊息接聽程序的目標。

本指南將教你如何使用 Spring Messengeraging 的 Azure 事件中樞 來發送訊息並接收 Event Hubs 的訊息。

新增相依性

要安裝 Spring Messaging Azure 事件中樞 模組,請在你的 pom.xml 檔案中新增以下相依關係:

  • Spring Cloud Azure 物料清單(BOM):

    <dependencyManagement>
       <dependencies>
         <dependency>
           <groupId>com.azure.spring</groupId>
           <artifactId>spring-cloud-azure-dependencies</artifactId>
           <version>7.3.0</version>
           <type>pom</type>
           <scope>import</scope>
           </dependency>
       </dependencies>
    </dependencyManagement>
    

    注意

    如果你用的是 Spring Boot 4.0.x,記得把版本設 spring-cloud-azure-dependencies7.3.0.

    如果你用的是 Spring Boot 3.5.x,記得把版本設 spring-cloud-azure-dependencies6.4.0

    如果你使用的是 Spring Boot 3.1.x-3.5.x,記得把版本設 spring-cloud-azure-dependencies5.25.0.

    如果您使用 Spring Boot 2.x,請務必將 spring-cloud-azure-dependencies 版本設定為 4.20.0

    此材料帳單 (BOM) 應該在 <dependencyManagement> 檔案的 區段中設定。 這確保所有 Spring Cloud Azure 相依性都使用相同的版本。

    欲了解更多關於本物料清單所用版本的資訊,請參見 Spring Cloud Azure 應該使用哪個版本

  • Spring Cloud Azure Starter、Spring Messaging Event Hubs 與 Azure 事件中樞 Checkpoint Store 組件:

    <dependency>
      <groupId>com.azure.spring</groupId>
      <artifactId>spring-cloud-azure-starter</artifactId>
    </dependency>
    <dependency>
      <groupId>com.azure.spring</groupId>
      <artifactId>spring-messaging-azure-eventhubs</artifactId>
    </dependency>
    <dependency>
      <groupId>com.azure</groupId>
      <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
    </dependency>
    

撰寫應用程式以傳送和接收訊息的程序代碼

若要將訊息傳送至事件中樞並從中接收訊息,請使用下列步驟來設定應用程式:

  1. 使用下列屬性設定來配置事件中樞命名空間和儲存體 Blob:

    spring.cloud.azure.eventhubs.namespace=<your event-hubs-namespace>
    spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name=<your-storage-account-name>
    spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name=<your-storage-account-container-name>
    
  2. 建立一個新的 ConsumerService Java 類別,如下範例所示。 這個類別是用來定義訊息接收者。 請務必將 <your-event-hub-name> 佔位元替換成您的實際值。

    import com.azure.spring.messaging.eventhubs.implementation.core.annotation.EventHubsListener;
    import org.springframework.stereotype.Service;
    
    @Service
    public class ConsumerService {
    
        private static final String EVENT_HUB_NAME = "<your-event-hub-name>";
        private static final String CONSUMER_GROUP = "$DEFAULT";
    
        @EventHubsListener(destination = EVENT_HUB_NAME, group = CONSUMER_GROUP)
        public void handleMessageFromEventHub(String message) {
            System.out.printf("New message received: %s%n", message);
        }
    
    }
    
  3. 使用 Spring 連接傳送者和接收者來傳送和接收訊息,如下列範例所示。 請務必將 <your-event-hub-name> 佔位元替換成您的實際值。

    import com.azure.spring.messaging.eventhubs.core.EventHubsTemplate;
    import com.azure.spring.messaging.implementation.annotation.EnableAzureMessaging;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.ConfigurableApplicationContext;
    import org.springframework.messaging.support.MessageBuilder;
    
    @SpringBootApplication
    @EnableAzureMessaging
    public class EventHubMessagingApplication {
    
        private static final String EVENT_HUB_NAME = "<your-event-hub-name>";
        private static final Logger LOGGER = LoggerFactory.getLogger(EventHubMessagingApplication.class);
    
        public static void main(String[] args) {
            ConfigurableApplicationContext applicationContext = SpringApplication.run(EventHubMessagingApplication.class);
            EventHubsTemplate eventHubsTemplate = applicationContext.getBean(EventHubsTemplate.class);
            LOGGER.info("Sending a message to the Event Hubs.");
            eventHubsTemplate.sendAsync(EVENT_HUB_NAME, MessageBuilder.withPayload("Hello world").build()).subscribe();
        }
    
    }
    

    提示

    請記得新增 @EnableAzureMessaging 註釋,這會觸發以 @EventHubsListener 標註的方法探索,自動建立訊息偵聽器容器。

  4. 啟動應用程式。 您會看到類似下列範例的記錄:

    Sending a message to the Event Hubs.
    New message received: Hello world
    

使用 Spring Integration 與 Azure 事件中樞

Spring Integration Azure 事件中樞模組支援 Spring Integration 框架與事件樞紐(Event Hubs)。

如果您的 Spring 應用程式使用 Spring Integration 訊息通道,您可以使用通道配接器在訊息通道和事件中樞之間路由訊息。

輸入通道配接器會將訊息從事件中樞轉送至訊息通道。 輸出通道配接器會將訊息從訊息通道發佈至事件中樞。

本指南說明如何使用 Spring Integration 與 Azure 事件中樞來傳送訊息到事件中樞並從中接收訊息。

新增相依性

要安裝 Spring Cloud Azure 事件中樞 Integration Starter 模組,請在您的 pom.xml 檔案中新增以下相依關係:

  • Spring Cloud Azure 物料清單(BOM):

    <dependencyManagement>
       <dependencies>
         <dependency>
           <groupId>com.azure.spring</groupId>
           <artifactId>spring-cloud-azure-dependencies</artifactId>
           <version>7.3.0</version>
           <type>pom</type>
           <scope>import</scope>
           </dependency>
       </dependencies>
    </dependencyManagement>
    

    注意

    如果你用的是 Spring Boot 4.0.x,記得把版本設 spring-cloud-azure-dependencies7.3.0.

    如果你用的是 Spring Boot 3.5.x,記得把版本設 spring-cloud-azure-dependencies6.4.0

    如果你使用的是 Spring Boot 3.1.x-3.5.x,記得把版本設 spring-cloud-azure-dependencies5.25.0.

    如果您使用 Spring Boot 2.x,請務必將 spring-cloud-azure-dependencies 版本設定為 4.20.0

    此材料帳單 (BOM) 應該在 <dependencyManagement> 檔案的 區段中設定。 這確保所有 Spring Cloud Azure 相依性都使用相同的版本。

    欲了解更多關於本物料清單所用版本的資訊,請參見 Spring Cloud Azure 應該使用哪個版本

  • Spring Cloud Azure 事件中心整合組件

    <dependency>
      <groupId>com.azure.spring</groupId>
      <artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
    </dependency>
    

撰寫應用程式以傳送和接收訊息的程序代碼

若要將訊息傳送至事件中樞並從中接收訊息,請使用下列步驟來設定應用程式:

  1. 使用下列屬性設定來配置事件中樞命名空間和儲存體 Blob:

    spring.cloud.azure.eventhubs.namespace=<your event-hubs-namespace>
    spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name=<your-storage-account-name>
    spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name=<your-storage-account-container-name>
    
  2. 建立一個新的 MessageReceiveConfiguration Java 類別,如下範例所示。 這個類別是用來定義訊息接收者。 請務必將 <your-event-hub-name> 佔位元替換成您的實際值。

    import com.azure.spring.integration.eventhubs.inbound.EventHubsInboundChannelAdapter;
    import com.azure.spring.messaging.eventhubs.core.EventHubsProcessorFactory;
    import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointConfig;
    import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointMode;
    import com.azure.spring.messaging.eventhubs.core.listener.EventHubsMessageListenerContainer;
    import com.azure.spring.messaging.eventhubs.core.properties.EventHubsContainerProperties;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.integration.channel.DirectChannel;
    import org.springframework.messaging.MessageChannel;
    
    @Configuration
    public class MessageReceiveConfiguration {
    
        private static final String INPUT_CHANNEL = "input";
        private static final String EVENT_HUB_NAME = "<your-event-hub-name>";
        private static final String CONSUMER_GROUP = "$Default";
        private static final Logger LOGGER = LoggerFactory.getLogger(MessageReceiveConfiguration.class);
    
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload) {
            String message = new String(payload);
            LOGGER.info("New message received: {}", message);
        }
    
        @Bean
        public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
            EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
            containerProperties.setEventHubName(EVENT_HUB_NAME);
            containerProperties.setConsumerGroup(CONSUMER_GROUP);
            containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
            return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
        }
    
        @Bean
        public EventHubsInboundChannelAdapter messageChannelAdapter(@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
                                                                    EventHubsMessageListenerContainer listenerContainer) {
            EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(listenerContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    
    }
    
  3. 建立一個新的 MessageSendConfiguration Java 類別,如下範例所示。 這個類別是用來定義訊息傳送者。 請務必將 <your-event-hub-name> 佔位元替換成您的實際值。

    import com.azure.spring.integration.core.handler.DefaultMessageHandler;
    import com.azure.spring.messaging.eventhubs.core.EventHubsTemplate;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.annotation.MessagingGateway;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.messaging.MessageHandler;
    import org.springframework.util.concurrent.ListenableFutureCallback;
    
    @Configuration
    public class MessageSendConfiguration {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(MessageSendConfiguration.class);
        private static final String OUTPUT_CHANNEL = "output";
        private static final String EVENT_HUB_NAME = "<your-event-hub-name>";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(EVENT_HUB_NAME, eventHubsTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.error("There was an error sending the message.", ex);
                }
            });
    
            return handler;
        }
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface EventHubOutboundGateway {
            void send(String text);
        }
    
    }
    
  4. 使用 Spring 連接傳送者和接收者來傳送和接收訊息,如下列範例所示:

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.ConfigurableApplicationContext;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.config.EnableIntegration;
    
    @SpringBootApplication
    @EnableIntegration
    @Configuration(proxyBeanMethods = false)
    public class EventHubIntegrationApplication {
    
        public static void main(String[] args) {
            ConfigurableApplicationContext applicationContext = SpringApplication.run(EventHubIntegrationApplication.class, args);
            MessageSendConfiguration.EventHubOutboundGateway outboundGateway = applicationContext.getBean(MessageSendConfiguration.EventHubOutboundGateway.class);
            outboundGateway.send("Hello World");
        }
    }
    

    提示

    請記得新增 @EnableIntegration 批注,以啟用 Spring Integration 基礎結構。

  5. 啟動應用程式。 您會看到類似下列範例的記錄:

    Message was sent successfully.
    New message received: Hello World
    

使用 Spring Cloud Azure Stream Event Hubs Binder

若要在 Spring Cloud Stream 應用程式中呼叫 Event Hubs API,請使用 Spring Cloud Azure 事件中樞 Stream Binder 模組。

本指南說明如何使用 Spring Cloud Stream 事件中樞系結器,將訊息傳送至事件中樞並從中樞接收訊息。

新增相依性

要安裝 Spring Cloud Azure 事件中樞 Stream Binder 模組,請在您的 pom.xml 檔案中新增以下相依關係:

  • Spring Cloud Azure 物料清單(BOM):

    <dependencyManagement>
       <dependencies>
         <dependency>
           <groupId>com.azure.spring</groupId>
           <artifactId>spring-cloud-azure-dependencies</artifactId>
           <version>7.3.0</version>
           <type>pom</type>
           <scope>import</scope>
           </dependency>
       </dependencies>
    </dependencyManagement>
    

    注意

    如果你用的是 Spring Boot 4.0.x,記得把版本設 spring-cloud-azure-dependencies7.3.0.

    如果你用的是 Spring Boot 3.5.x,記得把版本設 spring-cloud-azure-dependencies6.4.0

    如果你使用的是 Spring Boot 3.1.x-3.5.x,記得把版本設 spring-cloud-azure-dependencies5.25.0.

    如果您使用 Spring Boot 2.x,請務必將 spring-cloud-azure-dependencies 版本設定為 4.20.0

    此材料帳單 (BOM) 應該在 <dependencyManagement> 檔案的 區段中設定。 這確保所有 Spring Cloud Azure 相依性都使用相同的版本。

    欲了解更多關於本物料清單所用版本的資訊,請參見 Spring Cloud Azure 應該使用哪個版本

  • Spring Cloud Azure 事件中樞流匯流排元件:

    <dependency>
      <groupId>com.azure.spring</groupId>
      <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
    </dependency>
    

撰寫應用程式以傳送和接收訊息的程序代碼

若要將訊息傳送至事件中樞並從中接收訊息,請使用下列步驟來設定應用程式:

  1. 使用下列屬性設定來配置事件中樞命名空間和儲存體 Blob:

    spring.cloud.azure.eventhubs.namespace=<your event-hubs-namespace>
    spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name=<your-storage-account-name>
    spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name=<your-storage-account-container-name>
    
  2. 建立訊息接收者。

    若要使用您的應用程式作為事件接收,請完成下列工作來設定輸入系結器:

    • 宣告一個定義訊息處理邏輯的 Consumer Bean。 例如,下列 Consumer 豆子名為 consume

      @Bean
      public Consumer<Message<String>> consume() {
           return message -> {
               System.out.printf("New message received: %s%n", message.getPayload());
           };
      }
      
    • 新增下列組態以指定用於使用的 Event Hub 名稱。 請務必將 <your-event-hub-name> 佔位元替換成您的實際值。

      # name for the above `Consumer` bean
      spring.cloud.stream.function.definition=consume
      spring.cloud.stream.bindings.consume-in-0.destination=<your-event-hub-name>
      spring.cloud.stream.bindings.consume-in-0.group=$Default
      spring.cloud.stream.eventhubs.bindings.consume-in-0.consumer.checkpoint.mode=MANUAL
      
  3. 建立訊息寄件者。

    若要使用您的應用程式作為事件來源,請完成下列工作來設定輸出系結器:

    • 定義一個Supplier bean,以指定訊息在您應用程式中的來源,如下示例所示:

      @Bean
      public Supplier<Message<String>> supply() {
          return () -> {
              System.out.println("Sending a message.");
              return MessageBuilder.withPayload("Hello world").build();
          };
      }
      
    • 新增下列設定組態,以指定用於傳送的名稱 Event Hub。 請務必將 <your-event-hub-name> 佔位元替換成您的實際值。

      # "consume" is added from the above step
      spring.cloud.stream.function.definition=consume;supply
      spring.cloud.stream.bindings.supply-out-0.destination=<your-event-hub-name>
      
  4. 啟動應用程式。 您會看到類似下列範例的記錄:

    Sending a message.
    New message received: Hello world.
    

使用 Spring Kafka 與 Azure 事件中樞

事件中樞提供您現有 Kafka 應用程式可以使用的 Kafka 端點。 此方法提供執行您自己的 Kafka 叢集的替代方案。 事件中樞適用於許多現有的 Kafka 應用程式。 如需詳細資訊,請參閱 適用於 Apache Kafka 的事件中樞

本指南教你如何使用 Azure 事件中心和 Spring Kafka 來將訊息發送至事件中心,並從事件中心接收訊息。

新增相依性

要安裝 Spring Cloud Azure starter 和 Spring Kafka 模組,請在你的 pom.xml 檔案中新增以下相依關係:

  • Spring Cloud Azure 物料清單(BOM):

    <dependencyManagement>
       <dependencies>
         <dependency>
           <groupId>com.azure.spring</groupId>
           <artifactId>spring-cloud-azure-dependencies</artifactId>
           <version>7.3.0</version>
           <type>pom</type>
           <scope>import</scope>
           </dependency>
       </dependencies>
    </dependencyManagement>
    

    注意

    如果你用的是 Spring Boot 4.0.x,記得把版本設 spring-cloud-azure-dependencies7.3.0.

    如果你用的是 Spring Boot 3.5.x,記得把版本設 spring-cloud-azure-dependencies6.4.0

    如果你使用的是 Spring Boot 3.1.x-3.5.x,記得把版本設 spring-cloud-azure-dependencies5.25.0.

    如果您使用 Spring Boot 2.x,請務必將 spring-cloud-azure-dependencies 版本設定為 4.20.0

    此材料帳單 (BOM) 應該在 <dependencyManagement> 檔案的 區段中設定。 這確保所有 Spring Cloud Azure 相依性都使用相同的版本。

    欲了解更多關於本物料清單所用版本的資訊,請參見 Spring Cloud Azure 應該使用哪個版本

  • Spring Cloud Azure 啟動程式與 Spring Kafka 產物:

    <dependency>
      <groupId>com.azure.spring</groupId>
      <artifactId>spring-cloud-azure-starter</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>
    

撰寫應用程式以傳送和接收訊息的程序代碼

若要將訊息傳送至事件中樞並從中接收訊息,請使用下列步驟來設定應用程式:

  1. 使用下列屬性設定來設定事件中樞命名空間:

    spring.kafka.bootstrap-servers=<your event-hubs-namespace>.servicebus.windows.net:9093
    
  2. 使用 KafkaTemplate 來傳送訊息和 @KafkaListener 接收訊息,如下列範例所示。 請務必將 <your-event-hub-name> 佔位元替換成您的實際值。

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.core.KafkaTemplate;
    
    @SpringBootApplication
    public class EventHubKafkaApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(EventHubKafkaApplication.class);
        private static final String EVENT_HUB_NAME = "<your-event-hub-name>";
        private static final String CONSUMER_GROUP = "$Default";
        private final KafkaTemplate<String, String> kafkaTemplate;
    
        public EventHubKafkaApplication(KafkaTemplate<String, String> kafkaTemplate) {
            this.kafkaTemplate = kafkaTemplate;
        }
    
        public static void main(String[] args) {
            SpringApplication.run(EventHubKafkaApplication.class, args);
        }
    
        @Override
        public void run(String... args) {
            kafkaTemplate.send(EVENT_HUB_NAME, "Hello World");
            LOGGER.info("Message was sent successfully.");
        }
    
        @KafkaListener(topics = EVENT_HUB_NAME, groupId = CONSUMER_GROUP)
        public void receive(String message) {
            LOGGER.info("New message received: {}", message);
        }
    
    }
    
  3. 啟動應用程式。 您會看到類似下列範例的記錄:

    Message was sent successfully.
    New message received: Hello world
    

使用 Spring Cloud Stream Kafka Binder 與 Azure 事件中樞搭配運作

Spring Cloud Stream 是一種架構,可讓應用程式開發人員撰寫訊息驅動微服務。 傳訊系統和 Spring Cloud Stream 之間的橋接是透過繫結器抽象化。 系結器適用於數個傳訊系統,但其中一個最常用的系結器適用於 Apache Kafka。

本指南說明如何使用 Azure 事件中樞 和 Spring Cloud Stream Kafka Binder 來發送和接收來自 Event Hubs 的訊息。

新增相依性

要安裝 Spring Cloud Azure starter 和 Spring Cloud Stream binder Kafka 模組,請在你的 pom.xml 檔案中新增以下相依關係:

  • Spring Cloud Azure 物料清單(BOM):

    <dependencyManagement>
       <dependencies>
         <dependency>
           <groupId>com.azure.spring</groupId>
           <artifactId>spring-cloud-azure-dependencies</artifactId>
           <version>7.3.0</version>
           <type>pom</type>
           <scope>import</scope>
           </dependency>
       </dependencies>
    </dependencyManagement>
    

    注意

    如果你用的是 Spring Boot 4.0.x,記得把版本設 spring-cloud-azure-dependencies7.3.0.

    如果你用的是 Spring Boot 3.5.x,記得把版本設 spring-cloud-azure-dependencies6.4.0

    如果你使用的是 Spring Boot 3.1.x-3.5.x,記得把版本設 spring-cloud-azure-dependencies5.25.0.

    如果您使用 Spring Boot 2.x,請務必將 spring-cloud-azure-dependencies 版本設定為 4.20.0

    此材料帳單 (BOM) 應該在 <dependencyManagement> 檔案的 區段中設定。 這確保所有 Spring Cloud Azure 相依性都使用相同的版本。

    欲了解更多關於本物料清單所用版本的資訊,請參見 Spring Cloud Azure 應該使用哪個版本

  • Spring Cloud Azure 启动元件:

    <dependency>
      <groupId>com.azure.spring</groupId>
      <artifactId>spring-cloud-azure-starter</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>
    

撰寫應用程式以傳送和接收訊息的程序代碼

若要將訊息傳送至事件中樞並從中接收訊息,請使用下列步驟來設定應用程式:

  1. 使用下列屬性設定來設定 Kafka 訊息代理程式:

    spring.cloud.stream.kafka.binder.brokers=<your event-hubs-namespace>.servicebus.windows.net:9093
    
  2. 建立訊息接收者。

    若要使用您的應用程式作為事件接收,請完成下列工作來設定輸入系結器:

    • 宣告一個定義訊息處理邏輯的 Consumer Bean。 例如,下列 Consumer 豆子名為 consume

      @Bean
      public Consumer<Message<String>> consume() {
          return message -> {
              System.out.printf("New message received: %s%n", message.getPayload());
          };
      }
      
    • 新增下列組態以指定用於使用的 Event Hub 名稱。 請務必將 <your-event-hub-name> 佔位元替換成您的實際值。

      # name for the above `Consumer` bean
      spring.cloud.stream.function.definition=consume
      spring.cloud.stream.bindings.consume-in-0.destination=<your-event-hub-name>
      spring.cloud.stream.bindings.consume-in-0.group=$Default
      
  3. 建立訊息寄件者。

    若要使用您的應用程式作為事件來源,請完成下列工作來設定輸出系結器:

    • 定義一個Supplier bean,以指定訊息在您應用程式中的來源,如下示例所示:

      @Bean
      public Supplier<Message<String>> supply() {
          return () -> {
              System.out.println("Sending a message.");
              return MessageBuilder.withPayload("Hello world").build();
          };
      }
      
    • 新增下列設定組態,以指定用於傳送的名稱 Event Hub。 請務必將 <your-event-hub-name> 佔位元替換成您的實際值。

      # "consume" is added from the above step
      spring.cloud.stream.function.definition=consume;supply
      spring.cloud.stream.bindings.supply-out-0.destination=<your-event-hub-name>
      
  4. 啟動應用程式。 您會看到類似下列範例的記錄:

    Sending a message.
    New message received: Hello world.
    

部署到 Azure Spring 應用程式

現在您已在本機執行 Spring Boot 應用程式,現在可以將其移至生產環境。 Azure Spring 應用程式 讓 Spring Boot 應用程式部署到 Azure 變得簡單,無需修改程式碼。 服務會管理 Spring 應用程式的基礎結構,讓開發人員可以專注於處理程式碼。 Azure Spring 應用程式 提供生命週期管理,包含全面的監控與診斷、組態管理、服務發現、CI/CD 整合、藍綠部署等多項功能。 要將您的應用程式部署到 Azure Spring 應用程式,請參見 將您的第一個應用程式部署至 Azure Spring 應用程式

下一步

另請參閱

欲了解更多適用於 Microsoft Azure 的其他 Spring Boot 起始器,請參見 什麼是 Spring Cloud Azure?