Aracılığıyla paylaş


AKS üzerinde HDInsight üzerinde Apache Flink® ile HDInsight üzerinde Apache Kafka® kullanma

Önemli

Bu özellik şu anda önizlemededir. Microsoft Azure Önizlemeleri için Ek Kullanım Koşulları, beta, önizleme aşamasında olan veya henüz genel kullanıma sunulmamış Azure özellikleri için geçerli olan daha fazla yasal hüküm içerir. Bu belirli önizleme hakkında bilgi için bkz . AKS üzerinde Azure HDInsight önizleme bilgileri. Sorular veya özellik önerileri için lütfen AskHDInsight'ta ayrıntıları içeren bir istek gönderin ve Azure HDInsight Topluluğu hakkında daha fazla güncelleştirme için bizi takip edin.

Apache Flink için iyi bilinen bir kullanım örneği akış analizidir. Apache Kafka kullanılarak alınan veri akışlarını kullanmak için birçok kullanıcı tarafından popüler seçim. Flink ve Kafka'nın tipik yüklemeleri, Flink işleri tarafından tüketilebilen Kafka'ya gönderilen olay akışlarıyla başlar.

Bu örnekte, Kafka konusunu kullanan ve üreten akış verilerini işlemek için Flink 1.17.0 çalıştıran AKS kümelerinde HDInsight kullanılır.

Not

FlinkKafkaConsumer kullanım dışıdır ve Flink 1.17 ile kaldırılacaktır, lütfen bunun yerine KafkaSource kullanın. FlinkKafkaProducer kullanım dışıdır ve Flink 1.15 ile kaldırılacaktır, lütfen bunun yerine KafkaSink kullanın.

Önkoşullar

  • Kafka ve Flink'in aynı sanal ağda olması veya iki küme arasında sanal ağ eşlemesi olması gerekir.

  • Sanal ağ oluşturma.

  • Aynı sanal ağda bir Kafka kümesi oluşturun. Geçerli kullanımınıza bağlı olarak HDInsight üzerinde Kafka 3.2 veya 2.4'i seçebilirsiniz.

    Aynı sanal ağda Kafka kümesinin nasıl oluşturulacağını gösteren ekran görüntüsü.

  • Sanal ağ ayrıntılarını sanal ağ bölümüne ekleyin.

  • Aynı sanal ağa sahip AKS Kümesi havuzunda HDInsight oluşturun.

  • Oluşturulan küme havuzuna bir Flink kümesi oluşturun.

Apache Kafka Bağlan or

Flink, kafka konularından veri okumak ve kafka konularına veri yazmak için bir Apache Kafka Bağlan veya tam olarak bir kez garanti eder.

Maven bağımlılığı

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>1.17.0</version>
        </dependency>

Kafka Havuzu Oluşturma

Kafka havuzu, KafkaSink örneğini oluşturmak için bir oluşturucu sınıfı sağlar. Havuzumuzu oluşturmak ve AKS üzerinde HDInsight üzerinde çalışan Flink kümesiyle birlikte kullanmak için de aynısını kullanırız

SinKafkaToKafka.java

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SinKafkaToKafka {
    public static void main(String[] args) throws Exception {
        // 1. get stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. read kafka message as stream input, update your broker IPs below
        String brokers = "X.X.X.X:9092,X.X.X.X:9092,X.X.X.X:9092";
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics("clicks")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        
        // 3. transformation: 
        // https://www.taobao.com,1000 ---> 
        // Event{user: "Tim",url: "https://www.taobao.com",timestamp: 1970-01-01 00:00:01.0}
        SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                String[] fields = value.split(",");
                return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
            }
        });

        // 4. sink click into another kafka events topic
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers(brokers)
                .setProperty("transaction.timeout.ms","900000")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("events")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .build();

        result.sinkTo(sink);

       // 5. execute the stream
        env.execute("kafka Sink to other topic");
    }
}

Java programı Event.java yazma

import java.sql.Timestamp;

public class Event {

    public String user;
    public String url;
    public Long timestamp;

    public Event() {
    }

    public Event(String user,String url,Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString(){
        return "Event{" +
                "user: \"" + user + "\""  +
                ",url: \"" + url + "\""  +
                ",timestamp: " + new Timestamp(timestamp) +
                "}";
    }
}

Webssh'te jar dosyasını karşıya yükleyin ve jar dosyasını gönderin

Flink üzerinde çalışan işi gösteren ekran görüntüsü.

Flink Panosu kullanıcı arabiriminde

Kafka konusu paketlenmiş jar dosyasınıN Flink'e iş olarak nasıl gönderıldığını gösteren ekran görüntüsü.

Konuyu oluşturma - Kafka'ya tıklar

Kafka konusunun nasıl üretildiğini gösteren ekran görüntüsü.

Konuyu kullanma - Kafka'da olaylar

Kafka konusunu kullanmayı gösteren ekran görüntüsü.

Başvuru

  • Apache Kafka Bağlan or
  • Apache, Apache Kafka, Kafka, Apache Flink, Flink ve ilişkili açık kaynak proje adları Apache Software Foundation'ın (ASF) ticari markalarıdır.