Partager via


Utilisation d’Apache Kafka® sur HDInsight avec Apache Flink® sur HDInsight sur AKS

Important

Cette fonctionnalité est disponible actuellement en mode Aperçu. Les Conditions d’utilisation supplémentaires pour les préversions de Microsoft Azure contiennent davantage de conditions légales qui s’appliquent aux fonctionnalités Azure en version bêta, en préversion ou ne se trouvant pas encore en disponibilité générale. Pour plus d’informations sur cette préversion spécifique, consultez les Informations sur la préversion d’Azure HDInsight sur AKS. Pour toute question ou pour des suggestions à propos des fonctionnalités, veuillez envoyer vos requêtes et leurs détails sur AskHDInsight, et suivez-nous sur la Communauté Azure HDInsight pour plus de mises à jour.

Un cas d’usage bien connu d’Apache Flink est l’analyse de flux. Le choix populaire de nombreux utilisateurs d’utiliser les flux de données ingérés à l’aide d’Apache Kafka. Les installations typiques de Flink et Kafka commencent par le transfert de flux d’événements vers Kafka, qui peuvent être consommés par les tâches Flink.

Cet exemple utilise HDInsight sur des clusters AKS exécutant Flink 1.17.0 pour traiter les données de diffusion en continu consommant et produisant un sujet Kafka.

Remarque

FlinkKafkaConsumer est obsolète et sera supprimé avec Flink 1.17, veuillez utiliser KafkaSource à la place. FlinkKafkaProducer est obsolète et sera supprimé avec Flink 1.15, veuillez utiliser KafkaSink à la place.

Prérequis

Connecteur Apache Kafka

Flink fournit un connecteur Apache Kafka pour lire et écrire des données dans des sujets Kafka avec des garanties uniques.

Dépendance Maven

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>1.17.0</version>
        </dependency>

Construire un évier Kafka

Le récepteur Kafka fournit une classe de générateur pour construire une instance de KafkaSink. Nous utilisons la même chose pour construire notre Sink et l’utilisons avec le cluster Flink exécutant HDInsight sur AKS

SinKafkaToKafka.java

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SinKafkaToKafka {
    public static void main(String[] args) throws Exception {
        // 1. get stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. read kafka message as stream input, update your broker IPs below
        String brokers = "X.X.X.X:9092,X.X.X.X:9092,X.X.X.X:9092";
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics("clicks")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        
        // 3. transformation: 
        // https://www.taobao.com,1000 ---> 
        // Event{user: "Tim",url: "https://www.taobao.com",timestamp: 1970-01-01 00:00:01.0}
        SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                String[] fields = value.split(",");
                return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
            }
        });

        // 4. sink click into another kafka events topic
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers(brokers)
                .setProperty("transaction.timeout.ms","900000")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("events")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .build();

        result.sinkTo(sink);

       // 5. execute the stream
        env.execute("kafka Sink to other topic");
    }
}

Écrire un programme Java Event.java

import java.sql.Timestamp;

public class Event {

    public String user;
    public String url;
    public Long timestamp;

    public Event() {
    }

    public Event(String user,String url,Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString(){
        return "Event{" +
                "user: \"" + user + "\""  +
                ",url: \"" + url + "\""  +
                ",timestamp: " + new Timestamp(timestamp) +
                "}";
    }
}

Sur Webssh, chargez le fichier jar et soumettez-le

Capture d’écran montrant le travail en cours d’exécution sur Flink.

Sur l’interface utilisateur du tableau de bord Flink

Capture d’écran montrant comment soumettre le fichier jar empaqueté du sujet Kafka en tant que travail à Flink.

Produire le sujet – clics sur Kafka

Capture d’écran montrant comment produire un sujet Kafka.

Consommer le sujet – événements sur Kafka

Capture d’écran montrant comment consommer un sujet Kafka.

Référence