分享方式:


使用 Apache Flink 搭配適用於 Apache Kafka 的 Azure 事件中樞

本教學課程說明如何將 Apache Flink 連線至事件中樞,而不需要變更通訊協定用戶端或執行您自己的叢集。 如需有關事件中樞對於 Apache Kafka 取用者通訊協定支援的詳細資訊,請參閱適用於 Apache Kafka 的事件中樞

在本教學課程中,您會了解如何:

  • 建立事件中樞命名空間
  • 複製範例專案
  • 執行 Flink 生產者
  • 執行 Flink 取用者

注意

您可在 GitHub 上取得此範例

必要條件

若要完成本教學課程,請確定您具有下列必要條件:

  • 請參閱適用於 Apache Kafka 的事件中樞一文。
  • Azure 訂用帳戶。 如果您沒有 Azure 訂用帳戶,請在開始前建立免費帳戶
  • Java Development Kit (JDK) 1.7+
    • 在 Ubuntu 上,執行 apt-get install default-jdk 來安裝 JDK。
    • 務必設定 JAVA_HOME 環境變數,以指向 JDK 安裝所在的資料夾。
  • 下載安裝 Maven 二進位封存檔
    • 在 Ubuntu 上,您可以執行 apt-get install maven 來安裝 Maven。
  • Git
    • 在 Ubuntu 上,您可以執行 sudo apt-get install git 來安裝 Git。

建立事件中樞命名空間

您需要事件中樞命名空間,才能從任何事件中樞服務傳送或接收。 請參閱建立事件中樞,以取得建立命名空間和事件中樞的指示。 請務必複製事件中樞連接字串以供稍後使用。

複製範例專案

既然您已經有事件中樞連接字串,請複製適用於 Kafka 的 Azure 事件中樞存放庫,並瀏覽至 flink 子資料夾:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/flink

使用提供的 Flink 生產者範例,傳送訊息到事件中樞服務。

提供事件中樞 Kafka 端點

producer.config

更新 producer/src/main/resources/producer.configbootstrap.serverssasl.jaas.config 的值,以使用正確的驗證將生產者導向至事件中樞 Kafka 端點。

bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
client.id=FlinkExampleProducer
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
   username="$ConnectionString" \
   password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

重要

{YOUR.EVENTHUBS.CONNECTION.STRING} 取代為事件中樞命名空間的連接字串。 如需有關取得連接字串的指示,請參閱取得事件中樞連接字串。 以下是範例組態:sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

從命令列執行生產者

若要從命令列執行生產者,請從 Maven 中產生 JAR 然後從中執行 (或是使用 Maven 產生 JAR,然後將必要的 Kafka JAR 加入至 classpath,在 Java 中執行):

mvn clean package
mvn exec:java -Dexec.mainClass="FlinkTestProducer"

生產者現在會開始將事件傳送到事件中樞 (位於主題 test),並將事件印出至 stdout。

使用提供的取用者範例,接收來自事件中樞的訊息。

提供事件中樞 Kafka 端點

consumer.config

更新 consumer/src/main/resources/consumer.configbootstrap.serverssasl.jaas.config 的值,以使用正確的驗證將取用者導向至事件中樞 Kafka 端點。

bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=FlinkExampleConsumer
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
   username="$ConnectionString" \
   password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

重要

{YOUR.EVENTHUBS.CONNECTION.STRING} 取代為事件中樞命名空間的連接字串。 如需有關取得連接字串的指示,請參閱取得事件中樞連接字串。 以下是範例組態:sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

從命令列執行取用者

若要從命令列執行取用者,請從 Maven 中產生 JAR 然後從中執行 (或是使用 Maven 產生 JAR,然後將必要的 Kafka JAR 加入至 classpath,以在 Java 中執行):

mvn clean package
mvn exec:java -Dexec.mainClass="FlinkTestConsumer"

如果事件中樞有事件 (例如您的生產者也正在執行),則取用者現在會開始接收來自主題 test 的事件。

如需有關將 Flink 連線至 Kafka 的詳細資訊,請參閱 Flink 的 Kafka 連接器指南 \(英文\)。

下一步

若要深入了解適用於 Kafka 的事件中樞,請參閱下列文章: