分享方式:


在 HDInsight 上使用 Apache Kafka® 搭配 AKS 上的 HDInsight 上的 Apache Flink®

重要

此功能目前為預覽功能。 適用於 Microsoft Azure 預覽版的補充使用規定包含適用於 Beta 版、預覽版或尚未發行至正式運作之 Azure 功能的更合法條款。 如需此特定預覽的相關信息,請參閱 AKS 預覽資訊的 Azure HDInsight。 如需問題或功能建議,請在 AskHDInsight提交要求,並提供詳細數據,並遵循我們在 Azure HDInsight 社群取得更多更新。

Apache Flink 的已知使用案例是串流分析。 許多使用者使用使用 Apache Kafka 擷取的數據流的熱門選擇。 Flink 和 Kafka 的一般安裝會從推送至 Kafka 的事件串流開始,Flink 作業可以使用。

此範例在執行 Flink 1.17.0 的 AKS 叢集上使用 HDInsight 來處理取用和產生 Kafka 主題的串流數據。

注意

FlinkKafkaConsumer 已被取代,且將會隨著 Flink 1.17 一起移除,請改用 KafkaSource。 FlinkKafkaProducer 已被取代,並將使用 Flink 1.15 移除,請改用 KafkaSink。

必要條件

  • Kafka 和 Flink 都必須位於相同的 VNet 中,或兩個叢集之間應該有 vnet 對等互連。

  • 建立 VNet

  • 在相同的 VNet 中建立 Kafka 叢集。 您可以根據目前的使用量,選擇 HDInsight 上的 Kafka 3.2 或 2.4。

    顯示如何在相同 VNet 中建立 Kafka 叢集的螢幕快照。

  • 在虛擬網路區段中新增 VNet 詳細數據。

  • 在具有相同 VNet 的 AKS 叢集集區上建立 HDInsight。

  • 建立 Flink 叢集至建立的叢集集區。

Apache Kafka 連線 or

Flink 提供 Apache Kafka 連線 or,可透過一次保證,從 Kafka 主題讀取和寫入數據。

Maven 相依性

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>1.17.0</version>
        </dependency>

建置 Kafka 接收

Kafka 接收提供建置器類別來建構 KafkaSink 的實例。 我們會使用相同的 來建構接收,並與在 AKS 上的 HDInsight 上執行的 Flink 叢集搭配使用

SinKafkaToKafka.java

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SinKafkaToKafka {
    public static void main(String[] args) throws Exception {
        // 1. get stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. read kafka message as stream input, update your broker IPs below
        String brokers = "X.X.X.X:9092,X.X.X.X:9092,X.X.X.X:9092";
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics("clicks")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        
        // 3. transformation: 
        // https://www.taobao.com,1000 ---> 
        // Event{user: "Tim",url: "https://www.taobao.com",timestamp: 1970-01-01 00:00:01.0}
        SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                String[] fields = value.split(",");
                return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
            }
        });

        // 4. sink click into another kafka events topic
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers(brokers)
                .setProperty("transaction.timeout.ms","900000")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("events")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .build();

        result.sinkTo(sink);

       // 5. execute the stream
        env.execute("kafka Sink to other topic");
    }
}

撰寫 Java 程式Event.java

import java.sql.Timestamp;

public class Event {

    public String user;
    public String url;
    public Long timestamp;

    public Event() {
    }

    public Event(String user,String url,Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString(){
        return "Event{" +
                "user: \"" + user + "\""  +
                ",url: \"" + url + "\""  +
                ",timestamp: " + new Timestamp(timestamp) +
                "}";
    }
}

在 Webssh 上,上傳 jar 並提交 jar

顯示 Flink 上執行之作業的螢幕快照。

在 Flink 儀錶板 UI 上

顯示如何將 Kafka 主題封裝 jar 提交為 Flink 作業的螢幕快照。

產生主題 - 按兩下 Kafka

顯示如何產生 Kafka 主題的螢幕快照。

取用主題 - Kafka 上的事件

顯示如何使用 Kafka 主題的螢幕快照。

參考