搭配適用於 Kafka API 的 Azure 事件中樞 使用 Spring Kafka

本教學課程說明如何設定 Java 型 Spring Cloud Stream Binder,以針對 Kafka 使用 Azure 事件中樞 來傳送和接收具有 Azure 事件中樞 的訊息。 如需詳細資訊,請參閱從 Apache Kafka 應用程式使用 Azure 事件中樞

在本教學課程中,我們將包含兩種驗證方法: Microsoft Entra 驗證共用存取簽章 (SAS) 驗證。 [無密碼] 索引標籤會顯示 Microsoft Entra 驗證,而 [連線 ion 字串] 索引標籤標會顯示 SAS 驗證。

Microsoft Entra 驗證是使用 Microsoft Entra 識別碼中定義的身分識別連線到 Kafka Azure 事件中樞 的機制。 透過 Microsoft Entra 驗證,您可以在集中的位置管理資料庫使用者的身分識別和其他 Microsoft 服務,從而簡化權限管理。

SAS 驗證會使用 Azure 事件中樞 命名空間的 連接字串,來委派 Kafka 事件中樞的存取權。 如果您選擇使用共用存取簽章作為認證,您必須自行管理 連接字串。

必要條件

重要

需要 Spring Boot 2.5 版或更高版本,才能完成本教學課程中的步驟。

準備認證

Azure 事件中樞支援使用 Microsoft Entra ID 來授權對事件中樞資源的要求。 您可以使用 Microsoft Entra ID,使用 Azure 角色型存取控制 (Azure RBAC) 將許可權 授與安全性主體,這可能是使用者或應用程式服務主體。

如果您想要使用 Microsoft Entra 驗證在本機執行此範例,請確定您的使用者帳戶已透過適用於 IntelliJ 的 Azure 工具組、Visual Studio Code Azure 帳戶外掛程式或 Azure CLI 進行驗證。 此外,請確定帳戶已獲得足夠的許可權。

注意

使用無密碼連線時,您必須將資源的存取權授與您的帳戶。 在 Azure 事件中樞 中,將 和 Azure Event Hubs Data Sender 角色指派Azure Event Hubs Data Receiver給您目前使用的 Microsoft Entra 帳戶。 如需授與存取角色的詳細資訊,請參閱使用 Microsoft Entra ID,使用 Azure 入口網站 指派 Azure 角色和授權事件中樞資源的存取權。

從 Azure 事件中樞 傳送和接收訊息

透過 Azure 事件中樞,您可以使用 Spring Cloud Azure 來傳送和接收訊息。

若要安裝 Spring Cloud Azure Starter 模組,請將下列相依性新增至您的 pom.xml 檔案:

  • Spring Cloud Azure 材料帳單(BOM):

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.azure.spring</groupId>
          <artifactId>spring-cloud-azure-dependencies</artifactId>
          <version>5.11.0</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
    

    注意

    如果您使用 Spring Boot 2.x,請務必將 spring-cloud-azure-dependencies 版本設定為 4.17.0。 此材料帳單 (BOM) 應該在<dependencyManagement>pom.xml檔案的 區段中設定。 這可確保所有 Spring Cloud Azure 相依性都使用相同的版本。 如需此 BOM 所用版本的詳細資訊,請參閱 應該使用哪個版本的 Spring Cloud Azure。

  • Spring Cloud Azure 入門成品:

    <dependency>
       <groupId>com.azure.spring</groupId>
       <artifactId>spring-cloud-azure-starter</artifactId>
    </dependency>
    

編碼應用程式

使用下列步驟來設定應用程式,以使用 Azure 事件中樞 產生和取用訊息。

  1. 將下列屬性新增至 application.properties 檔案,以設定事件中樞認證。

    spring.cloud.stream.kafka.binder.brokers=${AZ_EVENTHUBS_NAMESPACE_NAME}.servicebus.windows.net:9093
    spring.cloud.function.definition=consume;supply
    spring.cloud.stream.bindings.consume-in-0.destination=${AZ_EVENTHUB_NAME}
    spring.cloud.stream.bindings.consume-in-0.group=$Default
    spring.cloud.stream.bindings.supply-out-0.destination=${AZ_EVENTHUB_NAME}
    

    提示

    如果您使用 版本 spring-cloud-azure-dependencies:4.3.0,則應該使用 值 com.azure.spring.cloud.autoconfigure.kafka.AzureKafkaSpringCloudStreamConfiguration來新增 屬性spring.cloud.stream.binders.<kafka-binder-name>.environment.spring.main.sources

    由於 4.4.0會自動新增這個屬性,因此不需要手動新增它。

    下表描述群組態中的欄位:

    欄位 描述
    spring.cloud.stream.kafka.binder.brokers 指定 Azure 事件中樞 端點。
    spring.cloud.stream.bindings.consume-in-0.destination 指定輸入目的地事件中樞,本教學課程是您稍早建立的中樞。
    spring.cloud.stream.bindings.consume-in-0.group 從 Azure 事件中樞 指定取用者群組,您可以將其設定$Default為 ,以使用您在建立 Azure 事件中樞 實例時建立的基本取用者群組。
    spring.cloud.stream.bindings.supply-out-0.destination 指定輸出目的地事件中樞,本教學課程與輸入目的地相同。

    注意

    如果您啟用自動主題建立,請務必新增組態專案 spring.cloud.stream.kafka.binder.replicationFactor,並將值設定為至少 1。 如需詳細資訊,請參閱 Spring Cloud Stream Kafka Binder 參考指南

  2. 編輯啟動類別檔案以顯示下列內容。

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.GenericMessage;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;
    import java.util.function.Consumer;
    import java.util.function.Supplier;
    
    @SpringBootApplication
    public class EventHubKafkaBinderApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(EventHubKafkaBinderApplication.class);
    
        private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer();
    
        public static void main(String[] args) {
            SpringApplication.run(EventHubKafkaBinderApplication.class, args);
        }
    
        @Bean
        public Supplier<Flux<Message<String>>> supply() {
            return ()->many.asFlux()
                           .doOnNext(m->LOGGER.info("Manually sending message {}", m))
                           .doOnError(t->LOGGER.error("Error encountered", t));
        }
    
        @Bean
        public Consumer<Message<String>> consume() {
            return message->LOGGER.info("New message received: '{}'", message.getPayload());
        }
    
        @Override
        public void run(String... args) {
            many.emitNext(new GenericMessage<>("Hello World"), Sinks.EmitFailureHandler.FAIL_FAST);
        }
    
    }
    

    提示

    在本教學課程中,組態或程式代碼中沒有任何驗證作業。 不過,連線到 Azure 服務需要驗證。 若要完成驗證,您需要使用 Azure Identity。 Spring Cloud Azure 使用 DefaultAzureCredential,Azure 身分識別連結庫會提供它來協助您取得認證,而不需要變更任何程序代碼。

    DefaultAzureCredential 支援多種驗證方法,並在執行階段判斷應使用的方法。 這種方法可讓您的應用程式在不同的環境中使用不同的驗證方法(例如本機和生產環境),而不需要實作環境特定的程序代碼。 如需詳細資訊,請參閱 DefaultAzureCredential

    若要在本機開發環境中完成驗證,您可以使用 Azure CLI、Visual Studio Code、PowerShell 或其他方法。 如需詳細資訊,請參閱 Java 開發環境中的 Azure 驗證。 若要在 Azure 裝載環境中完成驗證,建議您使用使用者指派的受控識別。 如需詳細資訊,請參閱什麼是 Azure 資源的受控識別?

  3. 啟動應用程式。 如下列範例的訊息將會張貼在應用程式記錄檔中:

    Kafka version: 3.0.1
    Kafka commitId: 62abe01bee039651
    Kafka startTimeMs: 1622616433956
    New message received: 'Hello World'
    

部署至 Azure Spring Apps

現在您已在本機執行 Spring Boot 應用程式,現在可以將其移至生產環境。 Azure Spring Apps 可讓您輕鬆地將 Spring Boot 應用程式部署至 Azure,而不需要變更任何程式代碼。 服務會管理 Spring 應用程式的基礎結構,讓開發人員可以專注於處理程式碼。 Azure Spring 應用程式提供生命週期管理,使用全方位的監視和診斷、組態管理、服務探索、持續整合與持續傳遞的整合、藍綠部署等等。 若要將應用程式部署至 Azure Spring Apps,請參閱 將第一個應用程式部署至 Azure Spring Apps

下一步