在 HDInsight 上使用 Apache Kafka® 搭配 AKS 上的 HDInsight 上的 Apache Flink®
重要
此功能目前為預覽功能。 適用於 Microsoft Azure 預覽版的補充使用規定包含適用於 Beta 版、預覽版或尚未發行至正式運作之 Azure 功能的更合法條款。 如需此特定預覽的相關信息,請參閱 AKS 預覽資訊的 Azure HDInsight。 如需問題或功能建議,請在 AskHDInsight 上提交要求,並提供詳細數據,並遵循我們在 Azure HDInsight 社群上取得更多更新。
Apache Flink 的已知使用案例是串流分析。 許多使用者使用使用 Apache Kafka 擷取的數據流的熱門選擇。 Flink 和 Kafka 的一般安裝會從推送至 Kafka 的事件串流開始,Flink 作業可以使用。
此範例在執行 Flink 1.17.0 的 AKS 叢集上使用 HDInsight 來處理取用和產生 Kafka 主題的串流數據。
注意
FlinkKafkaConsumer 已被取代,且將會隨著 Flink 1.17 一起移除,請改用 KafkaSource。 FlinkKafkaProducer 已被取代,並將使用 Flink 1.15 移除,請改用 KafkaSink。
必要條件
Kafka 和 Flink 都必須位於相同的 VNet 中,或兩個叢集之間應該有 vnet 對等互連。
在相同的 VNet 中建立 Kafka 叢集。 您可以根據目前的使用量,選擇 HDInsight 上的 Kafka 3.2 或 2.4。
在虛擬網路區段中新增 VNet 詳細數據。
在具有相同 VNet 的 AKS 叢集集區上建立 HDInsight。
建立 Flink 叢集至建立的叢集集區。
Apache Kafka 連線 or
Flink 提供 Apache Kafka 連線 or,可透過一次保證,從 Kafka 主題讀取和寫入數據。
Maven 相依性
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.17.0</version>
</dependency>
建置 Kafka 接收
Kafka 接收提供建置器類別來建構 KafkaSink 的實例。 我們會使用相同的 來建構接收,並與在 AKS 上的 HDInsight 上執行的 Flink 叢集搭配使用
SinKafkaToKafka.java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SinKafkaToKafka {
public static void main(String[] args) throws Exception {
// 1. get stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. read kafka message as stream input, update your broker IPs below
String brokers = "X.X.X.X:9092,X.X.X.X:9092,X.X.X.X:9092";
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics("clicks")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
// 3. transformation:
// https://www.taobao.com,1000 --->
// Event{user: "Tim",url: "https://www.taobao.com",timestamp: 1970-01-01 00:00:01.0}
SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
String[] fields = value.split(",");
return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
}
});
// 4. sink click into another kafka events topic
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(brokers)
.setProperty("transaction.timeout.ms","900000")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("events")
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.build();
result.sinkTo(sink);
// 5. execute the stream
env.execute("kafka Sink to other topic");
}
}
撰寫 Java 程式Event.java
import java.sql.Timestamp;
public class Event {
public String user;
public String url;
public Long timestamp;
public Event() {
}
public Event(String user,String url,Long timestamp) {
this.user = user;
this.url = url;
this.timestamp = timestamp;
}
@Override
public String toString(){
return "Event{" +
"user: \"" + user + "\"" +
",url: \"" + url + "\"" +
",timestamp: " + new Timestamp(timestamp) +
"}";
}
}
封裝 jar 並將作業提交至 Flink
在 Webssh 上,上傳 jar 並提交 jar
在 Flink 儀錶板 UI 上
產生主題 - 按兩下 Kafka
取用主題 - Kafka 上的事件
參考
- Apache Kafka 連線 or
- Apache、Apache Kafka、Kafka、Apache Flink、Flink 和相關聯的 開放原始碼 項目名稱是 Apache Software Foundation (ASF) 的商標。
意見反映
https://aka.ms/ContentUserFeedback。
即將推出:我們會在 2024 年淘汰 GitHub 問題,並以全新的意見反應系統取代並作為內容意見反應的渠道。 如需更多資訊,請參閱:提交及檢視以下的意見反映: