Verwenden von Apache Kafka® in HDInsight mit Apache Flink® in HDInsight on AKS
Hinweis
Azure HDInsight on AKS wird am 31. Januar 2025 eingestellt. Vor dem 31. Januar 2025 müssen Sie Ihre Workloads zu Microsoft Fabric oder einem gleichwertigen Azure-Produkt migrieren, um eine abruptes Beendigung Ihrer Workloads zu vermeiden. Die verbleibenden Cluster in Ihrem Abonnement werden beendet und vom Host entfernt.
Bis zum Einstellungsdatum ist nur grundlegende Unterstützung verfügbar.
Wichtig
Diese Funktion steht derzeit als Vorschau zur Verfügung. Die zusätzlichen Nutzungsbedingungen für Microsoft Azure-Vorschauen enthalten weitere rechtliche Bestimmungen, die für Azure-Features in Betaversionen, in Vorschauversionen oder anderen Versionen gelten, die noch nicht allgemein verfügbar gemacht wurden. Informationen zu dieser spezifischen Vorschau finden Sie unter Informationen zur Vorschau von Azure HDInsight on AKS. Bei Fragen oder Funktionsvorschlägen senden Sie eine Anfrage an AskHDInsight mit den entsprechenden Details, und folgen Sie uns für weitere Updates in der Azure HDInsight-Community.
Ein bekannter Anwendungsfall für Apache Flink ist Stream Analytics. Dies ist eine beliebte Wahl vieler Benutzer*innen, um die Datenströme zu verwenden, die mit Apache Kafka erfasst werden. Typische Installationen von Flink und Kafka beginnen mit Ereignisstreams, die an Kafka übertragen werden und von Flink-Aufträgen genutzt werden können.
In diesem Beispiel wird HDInsight auf AKS-Clustern mit Flink 1.17.0 verwendet, um Streamingdaten zu verarbeiten, die Kafka-Themen verarbeiten und produzieren.
Hinweis
FlinkKafkaConsumer ist veraltet und wird mit Flink 1.17 entfernt, verwenden Sie stattdessen KafkaSource. FlinkKafkaProducer ist veraltet und wird mit Flink 1.15 entfernt, verwenden Sie stattdessen KafkaSink.
Voraussetzungen
Sowohl Kafka als auch Flink müssen sich im gleichen VNet befinden, oder es sollte vnet-peering zwischen den beiden Clustern vorhanden sein.
Erstellen Sie einen Kafka-Cluster im selben VNet. Sie können Kafka 3.2 oder 2.4 auf HDInsight basierend auf Ihrer aktuellen Nutzung auswählen.
Fügen Sie die VNet-Details im Abschnitt „Virtuelles Netzwerk“ hinzu.
Erstellen Sie einen HDInsight auf AKS-Clusterpool mit demselben VNet.
Erstellen Sie einen Flink-Cluster im erstellten Clusterpool.
Apache Kafka-Connector
Flink bietet einen Apache Kafka-Connector zum Lesen von Daten aus und Schreiben von Daten in Kafka-Themen mit Genau-Einmal-Garantien.
Maven-Abhängigkeit
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.17.0</version>
</dependency>
Erstellen eines Kafka-Sinks
Ein Kafka-Sink stellt eine Generatorklasse bereit, um eine Instanz eines KafkaSink zu erstellen. Wir verwenden dasselbe, um unser Sink zu konstruieren und zusammen mit dem Flink-Cluster zu verwenden, der auf HDInsight on AKS ausgeführt wird.
SinKafkaToKafka.java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SinKafkaToKafka {
public static void main(String[] args) throws Exception {
// 1. get stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. read kafka message as stream input, update your broker IPs below
String brokers = "X.X.X.X:9092,X.X.X.X:9092,X.X.X.X:9092";
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics("clicks")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
// 3. transformation:
// https://www.taobao.com,1000 --->
// Event{user: "Tim",url: "https://www.taobao.com",timestamp: 1970-01-01 00:00:01.0}
SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
String[] fields = value.split(",");
return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
}
});
// 4. sink click into another kafka events topic
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(brokers)
.setProperty("transaction.timeout.ms","900000")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("events")
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.build();
result.sinkTo(sink);
// 5. execute the stream
env.execute("kafka Sink to other topic");
}
}
Schreiben eines Java-Programms Event.java
import java.sql.Timestamp;
public class Event {
public String user;
public String url;
public Long timestamp;
public Event() {
}
public Event(String user,String url,Long timestamp) {
this.user = user;
this.url = url;
this.timestamp = timestamp;
}
@Override
public String toString(){
return "Event{" +
"user: \"" + user + "\"" +
",url: \"" + url + "\"" +
",timestamp: " + new Timestamp(timestamp) +
"}";
}
}
Packen Sie die JAR-Datei und übermitteln Sie den Auftrag an Flink
Laden Sie die JAR-Datei in WebSSH hoch, und übermitteln Sie sie.
Auf der Benutzeroberfläche des Flink-Dashboards
Produzieren des Themas - Klicks auf Kafka
Nutzen des Themas - Ereignisse auf Kafka
Verweis
- Apache Kafka-Connector
- Apache, Apache Kafka, Kafka, Apache Flink, Flink und zugehörige Open Source-Projektnamen sind Marken der Apache Software Foundation (ASF).