Condividi tramite


Uso di Apache Kafka® in HDInsight con Apache Flink® in HDInsight nel servizio Azure Kubernetes

Importante

Questa funzionalità è attualmente disponibile solo in anteprima. Le condizioni per l'utilizzo supplementari per le anteprime di Microsoft Azure includono termini legali più validi applicabili alle funzionalità di Azure disponibili in versione beta, in anteprima o non ancora rilasciate nella disponibilità generale. Per informazioni su questa anteprima specifica, vedere Informazioni sull'anteprima di Azure HDInsight nel servizio Azure Kubernetes. Per domande o suggerimenti sulle funzionalità, inviare una richiesta in AskHDInsight con i dettagli e seguire microsoft per altri aggiornamenti nella community di Azure HDInsight.

Un caso d'uso noto per Apache Flink è l'analisi di flusso. Scelta comune da parte di molti utenti per l'uso dei flussi di dati, inseriti con Apache Kafka. Le installazioni tipiche di Flink e Kafka iniziano con il push dei flussi di eventi in Kafka, che possono essere utilizzati dai processi Flink.

Questo esempio usa HDInsight nei cluster del servizio Azure Kubernetes che eseguono Flink 1.17.0 per elaborare l'utilizzo dei dati di streaming e produrre un argomento Kafka.

Nota

FlinkKafkaConsumer è deprecato e verrà rimosso con Flink 1.17. Usare invece KafkaSource. FlinkKafkaProducer è deprecato e verrà rimosso con Flink 1.15. Usare invece KafkaSink.

Prerequisiti

  • Sia Kafka che Flink devono trovarsi nella stessa rete virtuale o devono essere presenti peering tra i due cluster.

  • Creazione della rete virtuale.

  • Creare un cluster Kafka nella stessa rete virtuale. È possibile scegliere Kafka 3.2 o 2.4 in HDInsight in base all'utilizzo corrente.

    Screenshot che mostra come creare un cluster Kafka nella stessa rete virtuale.

  • Aggiungere i dettagli della rete virtuale nella sezione rete virtuale.

  • Creare un pool di cluster HDInsight nel servizio Azure Kubernetes con la stessa rete virtuale.

  • Creare un cluster Flink nel pool di cluster creato.

Apache Kafka Connessione or

Flink fornisce un Connessione or Apache Kafka per la lettura e la scrittura di dati in argomenti Kafka con garanzie una sola volta.

Dipendenza maven

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>1.17.0</version>
        </dependency>

Compilazione del sink Kafka

Il sink Kafka fornisce una classe builder per costruire un'istanza di KafkaSink. Viene usato lo stesso per costruire il sink e usarlo insieme al cluster Flink in esecuzione in HDInsight nel servizio Azure Kubernetes

SinKafkaToKafka.java

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SinKafkaToKafka {
    public static void main(String[] args) throws Exception {
        // 1. get stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. read kafka message as stream input, update your broker IPs below
        String brokers = "X.X.X.X:9092,X.X.X.X:9092,X.X.X.X:9092";
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics("clicks")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        
        // 3. transformation: 
        // https://www.taobao.com,1000 ---> 
        // Event{user: "Tim",url: "https://www.taobao.com",timestamp: 1970-01-01 00:00:01.0}
        SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                String[] fields = value.split(",");
                return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
            }
        });

        // 4. sink click into another kafka events topic
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers(brokers)
                .setProperty("transaction.timeout.ms","900000")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("events")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .build();

        result.sinkTo(sink);

       // 5. execute the stream
        env.execute("kafka Sink to other topic");
    }
}

Scrittura di un programma Java Event.java

import java.sql.Timestamp;

public class Event {

    public String user;
    public String url;
    public Long timestamp;

    public Event() {
    }

    public Event(String user,String url,Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString(){
        return "Event{" +
                "user: \"" + user + "\""  +
                ",url: \"" + url + "\""  +
                ",timestamp: " + new Timestamp(timestamp) +
                "}";
    }
}

In Webssh caricare il file JAR e inviare il file JAR

Screenshot che mostra il processo in esecuzione in Flink.

Nell'interfaccia utente del dashboard Flink

Screenshot che mostra come inviare il file JAR in pacchetto dell'argomento Kafka come processo a Flink.

Produrre l'argomento : fa clic su Kafka

Screenshot che mostra come produrre un argomento Kafka.

Utilizzare l'argomento - Eventi in Kafka

Screenshot che mostra come usare l'argomento Kafka.

Riferimento