Freigeben über


Verwenden von Apache Kafka® in HDInsight mit Apache Flink® in HDInsight on AKS

Hinweis

Azure HDInsight on AKS wird am 31. Januar 2025 eingestellt. Vor dem 31. Januar 2025 müssen Sie Ihre Workloads zu Microsoft Fabric oder einem gleichwertigen Azure-Produkt migrieren, um eine abruptes Beendigung Ihrer Workloads zu vermeiden. Die verbleibenden Cluster in Ihrem Abonnement werden beendet und vom Host entfernt.

Bis zum Einstellungsdatum ist nur grundlegende Unterstützung verfügbar.

Wichtig

Diese Funktion steht derzeit als Vorschau zur Verfügung. Die zusätzlichen Nutzungsbedingungen für Microsoft Azure-Vorschauen enthalten weitere rechtliche Bestimmungen, die für Azure-Features in Betaversionen, in Vorschauversionen oder anderen Versionen gelten, die noch nicht allgemein verfügbar gemacht wurden. Informationen zu dieser spezifischen Vorschau finden Sie unter Informationen zur Vorschau von Azure HDInsight on AKS. Bei Fragen oder Funktionsvorschlägen senden Sie eine Anfrage an AskHDInsight mit den entsprechenden Details, und folgen Sie uns für weitere Updates in der Azure HDInsight-Community.

Ein bekannter Anwendungsfall für Apache Flink ist Stream Analytics. Dies ist eine beliebte Wahl vieler Benutzer*innen, um die Datenströme zu verwenden, die mit Apache Kafka erfasst werden. Typische Installationen von Flink und Kafka beginnen mit Ereignisstreams, die an Kafka übertragen werden und von Flink-Aufträgen genutzt werden können.

In diesem Beispiel wird HDInsight auf AKS-Clustern mit Flink 1.17.0 verwendet, um Streamingdaten zu verarbeiten, die Kafka-Themen verarbeiten und produzieren.

Hinweis

FlinkKafkaConsumer ist veraltet und wird mit Flink 1.17 entfernt, verwenden Sie stattdessen KafkaSource. FlinkKafkaProducer ist veraltet und wird mit Flink 1.15 entfernt, verwenden Sie stattdessen KafkaSink.

Voraussetzungen

  • Sowohl Kafka als auch Flink müssen sich im gleichen VNet befinden, oder es sollte vnet-peering zwischen den beiden Clustern vorhanden sein.

  • Erstellung von VNet.

  • Erstellen Sie einen Kafka-Cluster im selben VNet. Sie können Kafka 3.2 oder 2.4 auf HDInsight basierend auf Ihrer aktuellen Nutzung auswählen.

    Screenshot des Erstellens eines Kafka-Clusters im selben VNet.

  • Fügen Sie die VNet-Details im Abschnitt „Virtuelles Netzwerk“ hinzu.

  • Erstellen Sie einen HDInsight auf AKS-Clusterpool mit demselben VNet.

  • Erstellen Sie einen Flink-Cluster im erstellten Clusterpool.

Apache Kafka-Connector

Flink bietet einen Apache Kafka-Connector zum Lesen von Daten aus und Schreiben von Daten in Kafka-Themen mit Genau-Einmal-Garantien.

Maven-Abhängigkeit

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>1.17.0</version>
        </dependency>

Erstellen eines Kafka-Sinks

Ein Kafka-Sink stellt eine Generatorklasse bereit, um eine Instanz eines KafkaSink zu erstellen. Wir verwenden dasselbe, um unser Sink zu konstruieren und zusammen mit dem Flink-Cluster zu verwenden, der auf HDInsight on AKS ausgeführt wird.

SinKafkaToKafka.java

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SinKafkaToKafka {
    public static void main(String[] args) throws Exception {
        // 1. get stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. read kafka message as stream input, update your broker IPs below
        String brokers = "X.X.X.X:9092,X.X.X.X:9092,X.X.X.X:9092";
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics("clicks")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        
        // 3. transformation: 
        // https://www.taobao.com,1000 ---> 
        // Event{user: "Tim",url: "https://www.taobao.com",timestamp: 1970-01-01 00:00:01.0}
        SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                String[] fields = value.split(",");
                return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
            }
        });

        // 4. sink click into another kafka events topic
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers(brokers)
                .setProperty("transaction.timeout.ms","900000")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("events")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .build();

        result.sinkTo(sink);

       // 5. execute the stream
        env.execute("kafka Sink to other topic");
    }
}

Schreiben eines Java-Programms Event.java

import java.sql.Timestamp;

public class Event {

    public String user;
    public String url;
    public Long timestamp;

    public Event() {
    }

    public Event(String user,String url,Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString(){
        return "Event{" +
                "user: \"" + user + "\""  +
                ",url: \"" + url + "\""  +
                ",timestamp: " + new Timestamp(timestamp) +
                "}";
    }
}

Laden Sie die JAR-Datei in WebSSH hoch, und übermitteln Sie sie.

Screenshot des ausgeführten Auftrags in Flink.

Auf der Benutzeroberfläche des Flink-Dashboards

Screenshot des Übermittelns des als JAR-Datei gepackten Kafka-Themas als Auftrag an Flink.

Produzieren des Themas - Klicks auf Kafka

Screenshot des Erstellens eines Kafka-Themas.

Nutzen des Themas - Ereignisse auf Kafka

Screenshot des Verarbeitens eines Kafka-Themas.

Verweis