AKS üzerinde HDInsight üzerinde Apache Flink® ile HDInsight üzerinde Apache Kafka® kullanma
Önemli
Bu özellik şu anda önizlemededir. Microsoft Azure Önizlemeleri için Ek Kullanım Koşulları, beta, önizleme aşamasında olan veya henüz genel kullanıma sunulmamış Azure özellikleri için geçerli olan daha fazla yasal hüküm içerir. Bu belirli önizleme hakkında bilgi için bkz . AKS üzerinde Azure HDInsight önizleme bilgileri. Sorular veya özellik önerileri için lütfen AskHDInsight'ta ayrıntıları içeren bir istek gönderin ve Azure HDInsight Topluluğu hakkında daha fazla güncelleştirme için bizi takip edin.
Apache Flink için iyi bilinen bir kullanım örneği akış analizidir. Apache Kafka kullanılarak alınan veri akışlarını kullanmak için birçok kullanıcı tarafından popüler seçim. Flink ve Kafka'nın tipik yüklemeleri, Flink işleri tarafından tüketilebilen Kafka'ya gönderilen olay akışlarıyla başlar.
Bu örnekte, Kafka konusunu kullanan ve üreten akış verilerini işlemek için Flink 1.17.0 çalıştıran AKS kümelerinde HDInsight kullanılır.
Not
FlinkKafkaConsumer kullanım dışıdır ve Flink 1.17 ile kaldırılacaktır, lütfen bunun yerine KafkaSource kullanın. FlinkKafkaProducer kullanım dışıdır ve Flink 1.15 ile kaldırılacaktır, lütfen bunun yerine KafkaSink kullanın.
Önkoşullar
Kafka ve Flink'in aynı sanal ağda olması veya iki küme arasında sanal ağ eşlemesi olması gerekir.
Sanal ağ oluşturma.
Aynı sanal ağda bir Kafka kümesi oluşturun. Geçerli kullanımınıza bağlı olarak HDInsight üzerinde Kafka 3.2 veya 2.4'i seçebilirsiniz.
Sanal ağ ayrıntılarını sanal ağ bölümüne ekleyin.
Aynı sanal ağa sahip AKS Kümesi havuzunda HDInsight oluşturun.
Oluşturulan küme havuzuna bir Flink kümesi oluşturun.
Apache Kafka Bağlan or
Flink, kafka konularından veri okumak ve kafka konularına veri yazmak için bir Apache Kafka Bağlan veya tam olarak bir kez garanti eder.
Maven bağımlılığı
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.17.0</version>
</dependency>
Kafka Havuzu Oluşturma
Kafka havuzu, KafkaSink örneğini oluşturmak için bir oluşturucu sınıfı sağlar. Havuzumuzu oluşturmak ve AKS üzerinde HDInsight üzerinde çalışan Flink kümesiyle birlikte kullanmak için de aynısını kullanırız
SinKafkaToKafka.java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SinKafkaToKafka {
public static void main(String[] args) throws Exception {
// 1. get stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. read kafka message as stream input, update your broker IPs below
String brokers = "X.X.X.X:9092,X.X.X.X:9092,X.X.X.X:9092";
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics("clicks")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
// 3. transformation:
// https://www.taobao.com,1000 --->
// Event{user: "Tim",url: "https://www.taobao.com",timestamp: 1970-01-01 00:00:01.0}
SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
String[] fields = value.split(",");
return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
}
});
// 4. sink click into another kafka events topic
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(brokers)
.setProperty("transaction.timeout.ms","900000")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("events")
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.build();
result.sinkTo(sink);
// 5. execute the stream
env.execute("kafka Sink to other topic");
}
}
Java programı Event.java yazma
import java.sql.Timestamp;
public class Event {
public String user;
public String url;
public Long timestamp;
public Event() {
}
public Event(String user,String url,Long timestamp) {
this.user = user;
this.url = url;
this.timestamp = timestamp;
}
@Override
public String toString(){
return "Event{" +
"user: \"" + user + "\"" +
",url: \"" + url + "\"" +
",timestamp: " + new Timestamp(timestamp) +
"}";
}
}
Jar dosyasını paketleyin ve işi Flink'e gönderin
Webssh'te jar dosyasını karşıya yükleyin ve jar dosyasını gönderin
Flink Panosu kullanıcı arabiriminde
Konuyu oluşturma - Kafka'ya tıklar
Konuyu kullanma - Kafka'da olaylar
Başvuru
- Apache Kafka Bağlan or
- Apache, Apache Kafka, Kafka, Apache Flink, Flink ve ilişkili açık kaynak proje adları Apache Software Foundation'ın (ASF) ticari markalarıdır.
Geri Bildirim
https://aka.ms/ContentUserFeedback.
Çok yakında: 2024 boyunca, içerik için geri bildirim mekanizması olarak GitHub Sorunları’nı kullanımdan kaldıracak ve yeni bir geri bildirim sistemiyle değiştireceğiz. Daha fazla bilgi için bkz.Gönderin ve geri bildirimi görüntüleyin