使用適用於 Apache Kafka® 的 Azure 事件中樞,在 AKS 上 HDInsight 上連接 Apache Flink®
注意
AKS 上的 Azure HDInsight 將於 2025 年 1 月 31 日退場。 請於 2025 年 1 月 31 日之前,將工作負載移轉至 Microsoft Fabric 或對等的 Azure 產品,以免工作負載突然終止。 訂用帳戶中剩餘的叢集將會停止,並會從主機移除。
在淘汰日期之前,只有基本支援可用。
重要
此功能目前為預覽功能。 Microsoft Azure 預覽版增補使用規定包含適用於 Azure 功能 (搶鮮版 (Beta)、預覽版,或尚未正式發行的版本) 的更多法律條款。 若需此特定預覽版的相關資訊,請參閱 Azure HDInsight on AKS 預覽版資訊。 如有問題或功能建議,請在 AskHDInsight 上提交要求並附上詳細資料,並且在 Azure HDInsight 社群上追蹤我們以獲得更多更新資訊。
Apache Flink 的已知使用案例是串流分析。 許多使用者使用資料流的熱門選擇,會使用 Apache Kafka 來擷取這些資料流。 Flink 和 Kafka 的一般安裝會從正在推送至 Kafka 的事件串流開始,然後由 Flink 作業使用。 Azure 事件中樞在事件中樞上提供一個 Apache Kafka 端點,可讓使用者使用 Kafka 通訊協定連線到事件中樞。
在本文中,我們會探索如何將 Azure 事件中樞與在 AKS 上 HDInsight 上的 Apache Flink 連接,並說明下列內容
- 建立事件中樞命名空間
- 使用 Apache Flink 建立在 AKS 上的 HDInsight 叢集
- 執行 Flink 生產者
- 適用於 Apache Flink 的套件 Jar
- 作業提交和驗證
建立事件中樞命名空間和事件中樞
若要建立事件中樞命名空間和事件中樞,請參閱此處
在 AKS 上 HDInsight 上設定 Flink 叢集
您可以使用現有在 AKS 上 HDInsight 叢集集區,建立 Flink 叢集
執行會新增 bootstrap.servers 和
producer.config
資訊的 Flink 產生者bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 client.id=FlinkExampleProducer sasl.mechanism=PLAIN security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="$ConnectionString" \ password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
將
{YOUR.EVENTHUBS.CONNECTION.STRING}
取代為事件中樞命名空間的連接字串。 如需有關取得連接字串的指示,請參閱如何取得事件中樞連接字串的詳細資料。例如,
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
封裝適用於 Flink 的 JAR
封裝 com.example.app;
package contoso.example; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.io.FileReader; import java.util.Properties; public class AzureEventHubDemo { public static void main(String[] args) throws Exception { // 1. get stream execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); ParameterTool parameters = ParameterTool.fromArgs(args); String input = parameters.get("input"); Properties properties = new Properties(); properties.load(new FileReader(input)); // 2. generate stream input DataStream<String> stream = createStream(env); // 3. sink to eventhub KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties) .setRecordSerializer(KafkaRecordSerializationSchema.builder() .setTopic("topic1") .setValueSerializationSchema(new SimpleStringSchema()) .build()) .build(); stream.sinkTo(sink); // 4. execute the stream env.execute("Produce message to Azure event hub"); } public static DataStream<String> createStream(StreamExecutionEnvironment env){ return env.generateSequence(0, 200) .map(new MapFunction<Long, String>() { @Override public String map(Long in) { return "FLINK PRODUCE " + in; } }); } }
新增程式碼片段以執行 Flink 產生者。
執行程式碼之後,事件會儲存在主題 "topic1" 中
參考
- Apache Flink 網站
- Apache、Apache Kafka、Kafka、Apache Flink、Flink 和相關聯的開放原始碼專案名稱是 Apache Software Foundation (ASF) 的商標。