你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
将 HDInsight 上的 Apache Kafka® 与 HDInsight on AKS 上的 Apache Flink® 配合使用
注意
我们将于 2025 年 1 月 31 日停用 Azure HDInsight on AKS。 在 2025 年 1 月 31 日之前,你需要将工作负荷迁移到 Microsoft Fabric 或同等的 Azure 产品,以避免工作负荷突然终止。 订阅上的剩余群集会被停止并从主机中移除。
在停用日期之前,仅提供基本支持。
重要
此功能目前以预览版提供。 Microsoft Azure 预览版的补充使用条款包含适用于 beta 版、预览版或其他尚未正式发布的 Azure 功能的更多法律条款。 有关此特定预览版的信息,请参阅 Azure HDInsight on AKS 预览版信息。 如有疑问或功能建议,请在 AskHDInsight 上提交请求并附上详细信息,并关注我们以获取 Azure HDInsight Community 的更多更新。
Apache Flink 的已知用例是流分析。 许多用户普遍选择使用数据流,这些数据流是使用 Apache Kafka 引入的。 Flink 和 Kafka 的典型安装从推送到 Kafka 的事件流开始,Flink 作业可以使用该流。
此示例使用运行 Flink 1.17.0 的 HDInsight on AKS 群集来处理使用和生成 Kafka 主题的流数据。
注意
FlinkKafkaConsumer 已弃用,并将随 Flink 1.17 一起删除,请改用 KafkaSource。 FlinkKafkaProducer 已弃用,并将随 Flink 1.15 一起删除,请改用 KafkaSink。
先决条件
Kafka 和 Flink 都需要位于同一 VNet 中,或者两个群集之间应存在 vnet 对等互连。
在同一 VNet 中创建 Kafka 群集。 可以根据当前使用情况在 HDInsight 上选择 Kafka 3.2 或 2.4。
在虚拟网络部分添加 VNet 详细信息。
使用同一 VNet 在 AKS 群集池上创建 HDInsight。
为创建的群集池创建 Flink 群集。
Apache Kafka 连接器
Flink 提供了一个 Apache Kafka 连接器,用于从 Kafka 主题读取数据以及将数据写入到 Kafka 主题,并提供一次保证。
Maven 依赖项
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.17.0</version>
</dependency>
生成 Kafka 接收器
Kafka 接收器提供用于构造 KafkaSink 实例的生成器类。 我们将使用这种群集来构造接收器,并将其与 HDInsight on AKS 上运行的 Flink 群集一起使用
SinKafkaToKafka.java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SinKafkaToKafka {
public static void main(String[] args) throws Exception {
// 1. get stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. read kafka message as stream input, update your broker IPs below
String brokers = "X.X.X.X:9092,X.X.X.X:9092,X.X.X.X:9092";
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics("clicks")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
// 3. transformation:
// https://www.taobao.com,1000 --->
// Event{user: "Tim",url: "https://www.taobao.com",timestamp: 1970-01-01 00:00:01.0}
SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
String[] fields = value.split(",");
return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
}
});
// 4. sink click into another kafka events topic
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(brokers)
.setProperty("transaction.timeout.ms","900000")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("events")
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.build();
result.sinkTo(sink);
// 5. execute the stream
env.execute("kafka Sink to other topic");
}
}
编写 Java 程序 Event.java
import java.sql.Timestamp;
public class Event {
public String user;
public String url;
public Long timestamp;
public Event() {
}
public Event(String user,String url,Long timestamp) {
this.user = user;
this.url = url;
this.timestamp = timestamp;
}
@Override
public String toString(){
return "Event{" +
"user: \"" + user + "\"" +
",url: \"" + url + "\"" +
",timestamp: " + new Timestamp(timestamp) +
"}";
}
}
打包 jar 并将作业提交到 Flink
在 Webssh 上,上传 jar 并提交 jar
在 Flink 仪表板 UI 上
生成主题 - 单击 Kafka
使用主题 - Kafka 上的事件
参考
- Apache Kafka 连接器
- Apache、Apache Kafka、Kafka、Apache Flink、Flink 和关联的开源项目名称是 Apache Software Foundation (ASF) 的商标。