使用 Apache Flink 搭配適用於 Apache Kafka 的 Azure 事件中樞
本教學課程說明如何將 Apache Flink 連線至事件中樞,而不需要變更通訊協定用戶端或執行您自己的叢集。 如需有關事件中樞對於 Apache Kafka 取用者通訊協定支援的詳細資訊,請參閱適用於 Apache Kafka 的事件中樞。
在本教學課程中,您會了解如何:
- 建立事件中樞命名空間
- 複製範例專案
- 執行 Flink 生產者
- 執行 Flink 取用者
注意
您可在 GitHub 上取得此範例
必要條件
若要完成本教學課程,請確定您具有下列必要條件:
- 請參閱適用於 Apache Kafka 的事件中樞一文。
- Azure 訂用帳戶。 如果您沒有 Azure 訂用帳戶,請在開始前建立免費帳戶。
- Java Development Kit (JDK) 1.7+
- 在 Ubuntu 上,執行
apt-get install default-jdk
來安裝 JDK。 - 務必設定 JAVA_HOME 環境變數,以指向 JDK 安裝所在的資料夾。
- 在 Ubuntu 上,執行
- 下載並安裝 Maven 二進位封存檔
- 在 Ubuntu 上,您可以執行
apt-get install maven
來安裝 Maven。
- 在 Ubuntu 上,您可以執行
- Git
- 在 Ubuntu 上,您可以執行
sudo apt-get install git
來安裝 Git。
- 在 Ubuntu 上,您可以執行
建立事件中樞命名空間
您需要事件中樞命名空間,才能從任何事件中樞服務傳送或接收。 請參閱建立事件中樞,以取得建立命名空間和事件中樞的指示。 請務必複製事件中樞連接字串以供稍後使用。
複製範例專案
既然您已經有事件中樞連接字串,請複製適用於 Kafka 的 Azure 事件中樞存放庫,並瀏覽至 flink
子資料夾:
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/flink
執行 Flink 生產者
使用提供的 Flink 生產者範例,傳送訊息到事件中樞服務。
提供事件中樞 Kafka 端點
producer.config
更新 producer/src/main/resources/producer.config
中 bootstrap.servers
和 sasl.jaas.config
的值,以使用正確的驗證將生產者導向至事件中樞 Kafka 端點。
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
client.id=FlinkExampleProducer
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="$ConnectionString" \
password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
重要
將 {YOUR.EVENTHUBS.CONNECTION.STRING}
取代為事件中樞命名空間的連接字串。 如需有關取得連接字串的指示,請參閱取得事件中樞連接字串。 以下是範例組態:sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
從命令列執行生產者
若要從命令列執行生產者,請從 Maven 中產生 JAR 然後從中執行 (或是使用 Maven 產生 JAR,然後將必要的 Kafka JAR 加入至 classpath,在 Java 中執行):
mvn clean package
mvn exec:java -Dexec.mainClass="FlinkTestProducer"
生產者現在會開始將事件傳送到事件中樞 (位於主題 test
),並將事件印出至 stdout。
執行 Flink 取用者
使用提供的取用者範例,接收來自事件中樞的訊息。
提供事件中樞 Kafka 端點
consumer.config
更新 consumer/src/main/resources/consumer.config
中 bootstrap.servers
和 sasl.jaas.config
的值,以使用正確的驗證將取用者導向至事件中樞 Kafka 端點。
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=FlinkExampleConsumer
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="$ConnectionString" \
password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
重要
將 {YOUR.EVENTHUBS.CONNECTION.STRING}
取代為事件中樞命名空間的連接字串。 如需有關取得連接字串的指示,請參閱取得事件中樞連接字串。 以下是範例組態:sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
從命令列執行取用者
若要從命令列執行取用者,請從 Maven 中產生 JAR 然後從中執行 (或是使用 Maven 產生 JAR,然後將必要的 Kafka JAR 加入至 classpath,以在 Java 中執行):
mvn clean package
mvn exec:java -Dexec.mainClass="FlinkTestConsumer"
如果事件中樞有事件 (例如您的生產者也正在執行),則取用者現在會開始接收來自主題 test
的事件。
如需有關將 Flink 連線至 Kafka 的詳細資訊,請參閱 Flink 的 Kafka 連接器指南 \(英文\)。
下一步
若要深入了解適用於 Kafka 的事件中樞,請參閱下列文章: