具有 Azure 服務匯流排 的 Spring Cloud Stream

本文示範如何使用 Spring Cloud Stream Binder 將訊息傳送至和接收來自 服務匯流排 queuestopics的訊息。

Azure 提供稱為「Azure 服務匯流排」(「服務匯流排」)的異步傳訊平臺,其以進階消息佇列通訊協定 1.0(“AMQP 1.0”) 標準為基礎。 服務匯流排 可用於支援的 Azure 平台範圍。

必要條件

注意

若要授與帳戶對 Azure 服務匯流排 資源的存取權,請將 和 Azure Service Bus Data Receiver 角色指派Azure Service Bus Data Sender給您目前使用的 Microsoft Entra 帳戶。 如需授與存取角色的詳細資訊,請參閱使用 Azure 入口網站 指派 Azure 角色,並使用 Microsoft Entra 標識符來授權應用程式存取 Azure 服務匯流排 實體

重要

需要 Spring Boot 2.5 版或更高版本,才能完成本文中的步驟。

從 Azure 服務匯流排 傳送和接收訊息

使用 Azure 服務匯流排 的佇列或主題,您可以使用 Spring Cloud Azure Stream Binder 服務匯流排 來傳送和接收訊息。

若要安裝 Spring Cloud Azure Stream Binder 服務匯流排 模組,請將下列相依性新增至您的pom.xml檔案:

  • Spring Cloud Azure 材料帳單(BOM):

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.azure.spring</groupId>
          <artifactId>spring-cloud-azure-dependencies</artifactId>
          <version>5.11.0</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
    

    注意

    如果您使用 Spring Boot 2.x,請務必將 spring-cloud-azure-dependencies 版本設定為 4.17.0。 此材料帳單 (BOM) 應該在<dependencyManagement>pom.xml檔案的 區段中設定。 這可確保所有 Spring Cloud Azure 相依性都使用相同的版本。 如需此 BOM 所用版本的詳細資訊,請參閱 應該使用哪個版本的 Spring Cloud Azure。

  • Spring Cloud Azure Stream Binder 服務匯流排 成品:

    <dependency>
         <groupId>com.azure.spring</groupId>
         <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
    </dependency>
    

編碼應用程式

使用下列步驟來設定應用程式,以使用 服務匯流排 佇列或主題來傳送和接收訊息。

  1. 在組態檔 application.properties中設定 服務匯流排 認證。

     spring.cloud.azure.servicebus.namespace=${AZURE_SERVICEBUS_NAMESPACE}
     spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME}
     spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME}
     spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete=false
     spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type=queue
     spring.cloud.function.definition=consume;supply;
     spring.cloud.stream.poller.fixed-delay=60000 
     spring.cloud.stream.poller.initial-delay=0
    

    下表描述群組態中的欄位:

    欄位 描述
    spring.cloud.azure.servicebus.namespace 指定您在 服務匯流排 中從 Azure 入口網站 取得的命名空間。
    spring.cloud.stream.bindings.consume-in-0.destination 指定您在本教學課程中使用的 服務匯流排 佇列或 服務匯流排 主題。
    spring.cloud.stream.bindings.supply-out-0.destination 指定用於輸入目的地的相同值。
    spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete 指定是否自動結算訊息。 如果設定為 false,將會新增 的 Checkpointer 訊息標頭,讓開發人員手動解決訊息。
    spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type 指定輸出系結的實體類型可以是 queuetopic
    spring.cloud.function.definition 指定要系結至系結所公開之外部目的地的功能豆。
    spring.cloud.stream.poller.fixed-delay 指定預設輪詢器以毫秒為單位的固定延遲。 預設值為 1000 L。建議值為 60000
    spring.cloud.stream.poller.initial-delay 指定定期觸發程式的初始延遲。 預設值為 0
  2. 編輯啟動類別檔案以顯示下列內容。

    import com.azure.spring.messaging.checkpoint.Checkpointer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;
    import java.util.function.Consumer;
    import java.util.function.Supplier;
    import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER;
    
    @SpringBootApplication
    public class ServiceBusQueueBinderApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueBinderApplication.class);
        private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer();
    
        public static void main(String[] args) {
            SpringApplication.run(ServiceBusQueueBinderApplication.class, args);
        }
    
        @Bean
        public Supplier<Flux<Message<String>>> supply() {
            return ()->many.asFlux()
                           .doOnNext(m->LOGGER.info("Manually sending message {}", m))
                           .doOnError(t->LOGGER.error("Error encountered", t));
        }
    
        @Bean
        public Consumer<Message<String>> consume() {
            return message->{
                Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
                LOGGER.info("New message received: '{}'", message.getPayload());
                checkpointer.success()
                            .doOnSuccess(s->LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                            .doOnError(e->LOGGER.error("Error found", e))
                            .block();
            };
        }
    
        @Override
        public void run(String... args) {
            LOGGER.info("Going to add message {} to Sinks.Many.", "Hello World");
            many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST);
        }
    
    }
    

    提示

    在本教學課程中,組態或程式代碼中沒有任何驗證作業。 不過,連線到 Azure 服務需要驗證。 若要完成驗證,您需要使用 Azure Identity。 Spring Cloud Azure 使用 DefaultAzureCredential,Azure 身分識別連結庫會提供它來協助您取得認證,而不需要變更任何程序代碼。

    DefaultAzureCredential 支援多種驗證方法,並在執行階段判斷應使用的方法。 這種方法可讓您的應用程式在不同的環境中使用不同的驗證方法(例如本機和生產環境),而不需要實作環境特定的程序代碼。 如需詳細資訊,請參閱 DefaultAzureCredential

    若要在本機開發環境中完成驗證,您可以使用 Azure CLI、Visual Studio Code、PowerShell 或其他方法。 如需詳細資訊,請參閱 Java 開發環境中的 Azure 驗證。 若要在 Azure 裝載環境中完成驗證,建議您使用使用者指派的受控識別。 如需詳細資訊,請參閱什麼是 Azure 資源的受控識別?

  3. 啟動應用程式。 如下列範例的訊息將會張貼在應用程式記錄檔中:

    New message received: 'Hello World'
    Message 'Hello World' successfully checkpointed
    

下一步