你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

将 HDInsight 上的 Apache Kafka® 与 HDInsight on AKS 上的 Apache Flink® 配合使用

注意

我们将于 2025 年 1 月 31 日停用 Azure HDInsight on AKS。 在 2025 年 1 月 31 日之前,你需要将工作负荷迁移到 Microsoft Fabric 或同等的 Azure 产品,以避免工作负荷突然终止。 订阅上的剩余群集会被停止并从主机中移除。

在停用日期之前,仅提供基本支持。

重要

此功能目前以预览版提供。 Microsoft Azure 预览版的补充使用条款包含适用于 beta 版、预览版或其他尚未正式发布的 Azure 功能的更多法律条款。 有关此特定预览版的信息,请参阅 Azure HDInsight on AKS 预览版信息。 如有疑问或功能建议,请在 AskHDInsight 上提交请求并附上详细信息,并关注我们以获取 Azure HDInsight Community 的更多更新。

Apache Flink 的已知用例是流分析。 许多用户普遍选择使用数据流,这些数据流是使用 Apache Kafka 引入的。 Flink 和 Kafka 的典型安装从推送到 Kafka 的事件流开始,Flink 作业可以使用该流。

此示例使用运行 Flink 1.17.0 的 HDInsight on AKS 群集来处理使用和生成 Kafka 主题的流数据。

注意

FlinkKafkaConsumer 已弃用,并将随 Flink 1.17 一起删除,请改用 KafkaSource。 FlinkKafkaProducer 已弃用,并将随 Flink 1.15 一起删除,请改用 KafkaSink。

先决条件

Apache Kafka 连接器

Flink 提供了一个 Apache Kafka 连接器,用于从 Kafka 主题读取数据以及将数据写入到 Kafka 主题,并提供一次保证。

Maven 依赖项

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>1.17.0</version>
        </dependency>

生成 Kafka 接收器

Kafka 接收器提供用于构造 KafkaSink 实例的生成器类。 我们将使用这种群集来构造接收器,并将其与 HDInsight on AKS 上运行的 Flink 群集一起使用

SinKafkaToKafka.java

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SinKafkaToKafka {
    public static void main(String[] args) throws Exception {
        // 1. get stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. read kafka message as stream input, update your broker IPs below
        String brokers = "X.X.X.X:9092,X.X.X.X:9092,X.X.X.X:9092";
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics("clicks")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        
        // 3. transformation: 
        // https://www.taobao.com,1000 ---> 
        // Event{user: "Tim",url: "https://www.taobao.com",timestamp: 1970-01-01 00:00:01.0}
        SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                String[] fields = value.split(",");
                return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
            }
        });

        // 4. sink click into another kafka events topic
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers(brokers)
                .setProperty("transaction.timeout.ms","900000")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("events")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .build();

        result.sinkTo(sink);

       // 5. execute the stream
        env.execute("kafka Sink to other topic");
    }
}

编写 Java 程序 Event.java

import java.sql.Timestamp;

public class Event {

    public String user;
    public String url;
    public Long timestamp;

    public Event() {
    }

    public Event(String user,String url,Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString(){
        return "Event{" +
                "user: \"" + user + "\""  +
                ",url: \"" + url + "\""  +
                ",timestamp: " + new Timestamp(timestamp) +
                "}";
    }
}

在 Webssh 上,上传 jar 并提交 jar

显示在 Flink 上运行的作业的屏幕截图。

在 Flink 仪表板 UI 上

显示如何将 Kafka 主题打包的 jar 作为作业提交到 Flink 的屏幕截图。

生成主题 - 单击 Kafka

显示如何生成 Kafka 主题的屏幕截图。

使用主题 - Kafka 上的事件

显示如何使用 Kafka 主题的屏幕截图。

参考