本文示範如何使用 Spring Cloud Stream Binder 將訊息傳送至和接收來自 服務匯流排 queues 和 topics的訊息。
Azure 提供稱為「Azure 服務匯流排」(「服務匯流排」)的異步傳訊平臺,以進階消息佇列通訊協定 1.0(“AMQP 1.0”) 標準為基礎。 Service Bus 可用於各種受支援的 Azure 平台。
必要條件
Azure 訂用帳戶 - 建立免費帳戶。
Java Development Kit (JDK) 第 8 版或更高版本。
Apache Maven 3.2 版或更高版本。
cURL 或類似的 HTTP 公用程式來測試功能。
Spring Boot 應用程式。 如果您沒有這個應用程式,請使用 Spring Initializr 來建立 Maven 專案。 請務必選取 Maven 專案,然後在 [相依性] 底下新增 Spring Web 和 Azure 支援相依性,然後選取 [Java 第 8 版] 或更新版本。
注意
若要授予您的帳戶存取 Azure 服務匯流排資源的權限,請將 Azure Service Bus Data Sender 和 Azure Service Bus Data Receiver 角色指派給您目前使用的 Microsoft Entra 帳戶。 如需有關授與存取角色的更多資訊,請參閱使用 Azure 入口網站指派 Azure 角色和使用 Microsoft Entra ID 驗證和授權應用程式以存取 Azure 服務匯流排實體。
重要
需要 Spring Boot 2.5 版或更高版本,才能完成本文中的步驟。
從 Azure Service Bus 傳送和接收訊息
使用 Azure 服務匯流排中的佇列或主題,您可以利用 Spring Cloud Azure Stream Binder 服務匯流排來傳送和接收訊息。
若要安裝 Spring Cloud Azure Stream Binder 服務匯流排 模組,請將下列相依性新增至您的pom.xml檔案:
Spring Cloud Azure 材料帳單(BOM):
<dependencyManagement> <dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-dependencies</artifactId> <version>6.0.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>注意
如果您使用的是 Spring Boot 3.0.x-3.4.x,請務必將版本設定
spring-cloud-azure-dependencies為5.23.0。如果您使用 Spring Boot 2.x,請務必將
spring-cloud-azure-dependencies版本設定為4.20.0。此材料帳單 (BOM) 應該在
<dependencyManagement>pom.xml檔案的 區段中設定。 這可確保所有 Spring Cloud Azure 相依性都使用相同的版本。如需此 BOM 所用版本的詳細資訊,請參閱 應該使用哪個版本的 Spring Cloud Azure。
Spring Cloud Azure Stream Binder Service Bus 工件:
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId> </dependency>
編碼應用程式
使用下列步驟來設定應用程式,以使用 服務匯流排 佇列或主題來傳送和接收訊息。
在組態檔
application.properties中設定 服務匯流排 認證。spring.cloud.azure.servicebus.namespace=${AZURE_SERVICEBUS_NAMESPACE} spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME} spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME} spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete=false spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type=queue spring.cloud.function.definition=consume;supply; spring.cloud.stream.poller.fixed-delay=60000 spring.cloud.stream.poller.initial-delay=0下表描述群組態中的欄位:
欄位 描述 spring.cloud.azure.servicebus.namespace指定您在 Azure 入口網站中從您的 Service Bus 取得的命名空間。 spring.cloud.stream.bindings.consume-in-0.destination請指定您在本教程中使用的服務匯流排佇列或服務匯流排主題。 spring.cloud.stream.bindings.supply-out-0.destination指定用於輸入目的地的相同值。 spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete指定是否自動處理訊息。 如果設定為 false,將會新增的Checkpointer訊息標頭,讓開發人員手動解決訊息。spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type指定輸出系結的實體類型可以是 queue或topic。spring.cloud.function.definition指定將哪個功能元件綁定至綁定所公開的外部目的地。 spring.cloud.stream.poller.fixed-delay指定預設輪詢器的固定延遲時間(以毫秒為單位)。 預設值是 1000 L。 建議的值是60000。spring.cloud.stream.poller.initial-delay指定定期觸發事件的初始延遲。 預設值是 0。編輯啟動類別檔案以顯示下列內容。
import com.azure.spring.messaging.checkpoint.Checkpointer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import reactor.core.publisher.Flux; import reactor.core.publisher.Sinks; import java.util.function.Consumer; import java.util.function.Supplier; import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER; @SpringBootApplication public class ServiceBusQueueBinderApplication implements CommandLineRunner { private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueBinderApplication.class); private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer(); public static void main(String[] args) { SpringApplication.run(ServiceBusQueueBinderApplication.class, args); } @Bean public Supplier<Flux<Message<String>>> supply() { return ()->many.asFlux() .doOnNext(m->LOGGER.info("Manually sending message {}", m)) .doOnError(t->LOGGER.error("Error encountered", t)); } @Bean public Consumer<Message<String>> consume() { return message->{ Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}'", message.getPayload()); checkpointer.success() .doOnSuccess(s->LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e->LOGGER.error("Error found", e)) .block(); }; } @Override public void run(String... args) { LOGGER.info("Going to add message {} to Sinks.Many.", "Hello World"); many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST); } }提示
在本教學課程中,組態或程式代碼中沒有任何驗證作業。 不過,連線到 Azure 服務需要驗證。 若要完成驗證,您需要使用 Azure Identity。 Spring Cloud Azure 使用
DefaultAzureCredential,Azure 身分識別連結庫會提供它來協助您取得認證,而不需要變更任何程序代碼。DefaultAzureCredential支援多種驗證方法,並在執行階段判斷應使用的方法。 這種方法可讓您的應用程式在不同的環境中使用不同的驗證方法(例如本機和生產環境),而不需要實作環境特定的程序代碼。 如需詳細資訊,請參閱 DefaultAzureCredential。若要在本機開發環境中完成驗證,您可以使用 Azure CLI、Visual Studio Code、PowerShell 或其他方法。 如需詳細資訊,請參閱 Java 開發環境中的 Azure 驗證。 若要在 Azure 裝載環境中完成驗證,建議您使用使用者指派的受控識別。 如需詳細資訊,請參閱什麼是 Azure 資源受控識別?
啟動應用程式。 如下列範例的訊息將會張貼在應用程式記錄檔中:
New message received: 'Hello World' Message 'Hello World' successfully checkpointed