共用方式為


具有 Azure 服務匯流排 的 Spring Cloud Stream

本文示範如何使用 Spring Cloud Stream Binder 將訊息傳送至和接收來自 服務匯流排 queuestopics的訊息。

Azure 提供稱為Azure 服務匯流排」(「服務匯流排」)的異步傳訊平臺,以進階消息佇列通訊協定 1.0(“AMQP 1.0”) 標準為基礎。 Service Bus 可用於各種受支援的 Azure 平台。

必要條件

注意

若要授予您的帳戶存取 Azure 服務匯流排資源的權限,請將 Azure Service Bus Data SenderAzure Service Bus Data Receiver 角色指派給您目前使用的 Microsoft Entra 帳戶。 如需有關授與存取角色的更多資訊,請參閱使用 Azure 入口網站指派 Azure 角色使用 Microsoft Entra ID 驗證和授權應用程式以存取 Azure 服務匯流排實體

重要

需要 Spring Boot 2.5 版或更高版本,才能完成本文中的步驟。

從 Azure Service Bus 傳送和接收訊息

使用 Azure 服務匯流排中的佇列或主題,您可以利用 Spring Cloud Azure Stream Binder 服務匯流排來傳送和接收訊息。

若要安裝 Spring Cloud Azure Stream Binder 服務匯流排 模組,請將下列相依性新增至您的pom.xml檔案:

  • Spring Cloud Azure 材料帳單(BOM):

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.azure.spring</groupId>
          <artifactId>spring-cloud-azure-dependencies</artifactId>
          <version>6.0.0</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
    

    注意

    如果您使用的是 Spring Boot 3.0.x-3.4.x,請務必將版本設定 spring-cloud-azure-dependencies5.23.0

    如果您使用 Spring Boot 2.x,請務必將 spring-cloud-azure-dependencies 版本設定為 4.20.0

    此材料帳單 (BOM) 應該在<dependencyManagement>pom.xml檔案的 區段中設定。 這可確保所有 Spring Cloud Azure 相依性都使用相同的版本。

    如需此 BOM 所用版本的詳細資訊,請參閱 應該使用哪個版本的 Spring Cloud Azure。

  • Spring Cloud Azure Stream Binder Service Bus 工件:

    <dependency>
         <groupId>com.azure.spring</groupId>
         <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
    </dependency>
    

編碼應用程式

使用下列步驟來設定應用程式,以使用 服務匯流排 佇列或主題來傳送和接收訊息。

  1. 在組態檔 application.properties中設定 服務匯流排 認證。

     spring.cloud.azure.servicebus.namespace=${AZURE_SERVICEBUS_NAMESPACE}
     spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME}
     spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME}
     spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete=false
     spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type=queue
     spring.cloud.function.definition=consume;supply;
     spring.cloud.stream.poller.fixed-delay=60000 
     spring.cloud.stream.poller.initial-delay=0
    

    下表描述群組態中的欄位:

    欄位 描述
    spring.cloud.azure.servicebus.namespace 指定您在 Azure 入口網站中從您的 Service Bus 取得的命名空間。
    spring.cloud.stream.bindings.consume-in-0.destination 請指定您在本教程中使用的服務匯流排佇列或服務匯流排主題。
    spring.cloud.stream.bindings.supply-out-0.destination 指定用於輸入目的地的相同值。
    spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete 指定是否自動處理訊息。 如果設定為 false,將會新增的 Checkpointer 訊息標頭,讓開發人員手動解決訊息。
    spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type 指定輸出系結的實體類型可以是 queuetopic
    spring.cloud.function.definition 指定將哪個功能元件綁定至綁定所公開的外部目的地。
    spring.cloud.stream.poller.fixed-delay 指定預設輪詢器的固定延遲時間(以毫秒為單位)。 預設值是 1000 L。 建議的值是 60000
    spring.cloud.stream.poller.initial-delay 指定定期觸發事件的初始延遲。 預設值是 0
  2. 編輯啟動類別檔案以顯示下列內容。

    import com.azure.spring.messaging.checkpoint.Checkpointer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;
    import java.util.function.Consumer;
    import java.util.function.Supplier;
    import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER;
    
    @SpringBootApplication
    public class ServiceBusQueueBinderApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueBinderApplication.class);
        private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer();
    
        public static void main(String[] args) {
            SpringApplication.run(ServiceBusQueueBinderApplication.class, args);
        }
    
        @Bean
        public Supplier<Flux<Message<String>>> supply() {
            return ()->many.asFlux()
                           .doOnNext(m->LOGGER.info("Manually sending message {}", m))
                           .doOnError(t->LOGGER.error("Error encountered", t));
        }
    
        @Bean
        public Consumer<Message<String>> consume() {
            return message->{
                Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
                LOGGER.info("New message received: '{}'", message.getPayload());
                checkpointer.success()
                            .doOnSuccess(s->LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                            .doOnError(e->LOGGER.error("Error found", e))
                            .block();
            };
        }
    
        @Override
        public void run(String... args) {
            LOGGER.info("Going to add message {} to Sinks.Many.", "Hello World");
            many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST);
        }
    
    }
    

    提示

    在本教學課程中,組態或程式代碼中沒有任何驗證作業。 不過,連線到 Azure 服務需要驗證。 若要完成驗證,您需要使用 Azure Identity。 Spring Cloud Azure 使用 DefaultAzureCredential,Azure 身分識別連結庫會提供它來協助您取得認證,而不需要變更任何程序代碼。

    DefaultAzureCredential 支援多種驗證方法,並在執行階段判斷應使用的方法。 這種方法可讓您的應用程式在不同的環境中使用不同的驗證方法(例如本機和生產環境),而不需要實作環境特定的程序代碼。 如需詳細資訊,請參閱 DefaultAzureCredential

    若要在本機開發環境中完成驗證,您可以使用 Azure CLI、Visual Studio Code、PowerShell 或其他方法。 如需詳細資訊,請參閱 Java 開發環境中的 Azure 驗證。 若要在 Azure 裝載環境中完成驗證,建議您使用使用者指派的受控識別。 如需詳細資訊,請參閱什麼是 Azure 資源受控識別?

  3. 啟動應用程式。 如下列範例的訊息將會張貼在應用程式記錄檔中:

    New message received: 'Hello World'
    Message 'Hello World' successfully checkpointed
    

下一步