分享方式:


使用 Akka Streams 搭配適用於 Apache Kafka 的事件中樞

本教學課程說明如何透過 Apache Kafka 的事件中樞支援連線至 Akka Streams,而不需要變更通訊協定用戶端或執行您自己的叢集。

在本教學課程中,您會了解如何:

  • 建立事件中樞命名空間
  • 複製範例專案
  • 執行 Akka Streams 生產者
  • 執行 Akka Streams 取用者

注意

您可在 GitHub 上取得此範例

必要條件

若要完成本教學課程,請確定您具有下列必要條件:

  • 請參閱適用於 Apache Kafka 的事件中樞一文。
  • Azure 訂用帳戶。 如果您沒有 Azure 訂用帳戶,請在開始前建立免費帳戶
  • Java Development Kit (JDK) 1.8+ \(英文\)
    • 在 Ubuntu 上,執行 apt-get install default-jdk 來安裝 JDK。
    • 務必設定 JAVA_HOME 環境變數,以指向 JDK 安裝所在的資料夾。
  • 下載安裝 Maven 二進位封存檔
    • 在 Ubuntu 上,您可以執行 apt-get install maven 來安裝 Maven。
  • Git
    • 在 Ubuntu 上,您可以執行 sudo apt-get install git 來安裝 Git。

建立事件中樞命名空間

您需要事件中樞命名空間,才能從任何事件中樞服務傳送或接收。 如需詳細資訊,請參閱建立事件中樞。 請務必複製事件中樞連接字串以供稍後使用。

複製範例專案

既然您已經有事件中樞連接字串,請複製適用於 Kafka 的 Azure 事件中樞存放庫,並瀏覽至 akka 子資料夾:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/akka/java

執行 Akka Streams 生產者

使用提供的 Akka Streams 生產者範例,傳送訊息到事件中樞服務。

提供事件中樞 Kafka 端點

生產者 application.conf

更新 producer/src/main/resources/application.confbootstrap.serverssasl.jaas.config 的值,以使用正確的驗證將生產者導向至事件中樞 Kafka 端點。

akka.kafka.producer {
    #Akka Kafka producer properties can be defined here


    # Properties defined by org.apache.kafka.clients.producer.ProducerConfig
    # can be defined in this configuration section.
    kafka-clients {
        bootstrap.servers="{YOUR.EVENTHUBS.FQDN}:9093"
        sasl.mechanism=PLAIN
        security.protocol=SASL_SSL
        sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{YOUR.EVENTHUBS.CONNECTION.STRING}\";"
    }
}

重要

{YOUR.EVENTHUBS.CONNECTION.STRING} 取代為事件中樞命名空間的連接字串。 如需有關取得連接字串的指示,請參閱取得事件中樞連接字串。 以下是範例組態:sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

從命令列執行生產者

若要從命令列執行生產者,請從 Maven 中產生 JAR 然後從中執行 (或是使用 Maven 產生 JAR,然後將必要的 Kafka JAR 加入至 classpath,在 Java 中執行):

mvn clean package
mvn exec:java -Dexec.mainClass="AkkaTestProducer"

生產者會開始將事件傳送到事件中樞 (位於主題 test),並將事件印出至 stdout。

執行 Akka Streams 取用者

使用提供的取用者範例,接收來自事件中樞的訊息。

提供事件中樞 Kafka 端點

取用者 application.conf

更新 consumer/src/main/resources/application.confbootstrap.serverssasl.jaas.config 的值,以使用正確的驗證將取用者導向至事件中樞 Kafka 端點。

akka.kafka.consumer {
    #Akka Kafka consumer properties defined here
    wakeup-timeout=60s

    # Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
    # defined in this configuration section.
    kafka-clients {
       request.timeout.ms=60000
       group.id=akka-example-consumer

       bootstrap.servers="{YOUR.EVENTHUBS.FQDN}:9093"
       sasl.mechanism=PLAIN
       security.protocol=SASL_SSL
       sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{YOUR.EVENTHUBS.CONNECTION.STRING}\";"
    }
}

重要

{YOUR.EVENTHUBS.CONNECTION.STRING} 取代為事件中樞命名空間的連接字串。 如需有關取得連接字串的指示,請參閱取得事件中樞連接字串。 以下是範例組態:sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

從命令列執行取用者

若要從命令列執行取用者,請從 Maven 中產生 JAR 然後從中執行 (或是使用 Maven 產生 JAR,然後將必要的 Kafka JAR 加入至 classpath,以在 Java 中執行):

mvn clean package
mvn exec:java -Dexec.mainClass="AkkaTestConsumer"

如果事件中樞有事件 (例如您的生產者也正在執行),則取用者會開始接收來自主題 test 的事件。

如需有關 Akka Streams 的詳細資訊,請參閱 Akka Streams Kafka 指南 \(英文\)。

下一步

若要深入了解適用於 Kafka 的事件中樞,請參閱下列文章: