استخدام Apache Kafka® على HDInsight مع Apache Flink® على HDInsight على AKS
إشعار
سنتقاعد Azure HDInsight على AKS في 31 يناير 2025. قبل 31 يناير 2025، ستحتاج إلى ترحيل أحمال العمل الخاصة بك إلى Microsoft Fabric أو منتج Azure مكافئ لتجنب الإنهاء المفاجئ لأحمال العمل الخاصة بك. سيتم إيقاف المجموعات المتبقية على اشتراكك وإزالتها من المضيف.
سيتوفر الدعم الأساسي فقط حتى تاريخ الإيقاف.
هام
هذه الميزة في وضع المعاينة حاليًا. تتضمن شروط الاستخدام التكميلية لمعاينات Microsoft Azure المزيد من الشروط القانونية التي تنطبق على ميزات Azure الموجودة في الإصدار التجريبي أو قيد المعاينة أو التي لم يتم إصدارها بعد في التوفر العام. للحصول على معلومات حول هذه المعاينة المحددة، راجع معلومات معاينة Azure HDInsight على AKS. للأسئلة أو اقتراحات الميزات، يرجى إرسال طلب على AskHDInsight مع التفاصيل ومتابعتنا لمزيد من التحديثات على مجتمع Azure HDInsight.
حالة استخدام معروفة ل Apache Flink هي تحليلات الدفق. الاختيار الشائع من قبل العديد من المستخدمين لاستخدام تدفقات البيانات، والتي يتم استيعابها باستخدام Apache Kafka. تبدأ عمليات التثبيت النموذجية ل Flink وKafka بتدفقات الأحداث التي يتم دفعها إلى Kafka، والتي يمكن استهلاكها بواسطة وظائف Flink.
يستخدم هذا المثال HDInsight على مجموعات AKS التي تقوم بتشغيل Flink 1.17.0 لمعالجة دفق البيانات المستهلكة لموضوع Kafka وإنتاجه.
إشعار
تم إهمال FlinkKafkaConsumer وسيتم إزالته باستخدام Flink 1.17، يرجى استخدام KafkaSource بدلا من ذلك. تم إهمال FlinkKafkaProducer وسيتم إزالته باستخدام Flink 1.15، يرجى استخدام KafkaSink بدلا من ذلك.
المتطلبات الأساسية
يجب أن يكون كل من Kafka وFlink في نفس VNet أو يجب أن يكون هناك نظير vnet بين نظامي المجموعات.
إنشاء نظام مجموعة Kafka في نفس VNet. يمكنك اختيار Kafka 3.2 أو 2.4 على HDInsight استنادا إلى استخدامك الحالي.
أضف تفاصيل الشبكة الظاهرية في قسم الشبكة الظاهرية.
إنشاء HDInsight على مجموعة نظام مجموعة AKS باستخدام نفس VNet.
إنشاء مجموعة Flink إلى مجموعة نظام المجموعة التي تم إنشاؤها.
موصل Apache Kafka
يوفر Flink موصل Apache Kafka لقراءة البيانات من مواضيع Kafka وكتابتها مع ضمانات مرة واحدة بالضبط.
تبعية Maven
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.17.0</version>
</dependency>
بناء Kafka Sink
يوفر متلقي Kafka فئة منشئ لإنشاء مثيل KafkaSink. نستخدم نفس الشيء لإنشاء المتلقي الخاص بنا واستخدامه جنبا إلى جنب مع مجموعة Flink التي تعمل على HDInsight على AKS
SinKafkaToKafka.java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SinKafkaToKafka {
public static void main(String[] args) throws Exception {
// 1. get stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. read kafka message as stream input, update your broker IPs below
String brokers = "X.X.X.X:9092,X.X.X.X:9092,X.X.X.X:9092";
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics("clicks")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
// 3. transformation:
// https://www.taobao.com,1000 --->
// Event{user: "Tim",url: "https://www.taobao.com",timestamp: 1970-01-01 00:00:01.0}
SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
String[] fields = value.split(",");
return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
}
});
// 4. sink click into another kafka events topic
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(brokers)
.setProperty("transaction.timeout.ms","900000")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("events")
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.build();
result.sinkTo(sink);
// 5. execute the stream
env.execute("kafka Sink to other topic");
}
}
كتابة برنامج Java Event.java
import java.sql.Timestamp;
public class Event {
public String user;
public String url;
public Long timestamp;
public Event() {
}
public Event(String user,String url,Long timestamp) {
this.user = user;
this.url = url;
this.timestamp = timestamp;
}
@Override
public String toString(){
return "Event{" +
"user: \"" + user + "\"" +
",url: \"" + url + "\"" +
",timestamp: " + new Timestamp(timestamp) +
"}";
}
}
قم بحزم الجرة وإرسال المهمة إلى Flink
على Webssh، قم بتحميل الجرة وإرسال الجرة
على واجهة مستخدم لوحة معلومات Flink
إنتاج الموضوع - النقرات على Kafka
استهلاك الموضوع - الأحداث على Kafka
المرجع
- موصل Apache Kafka
- Apache وApache Kafka وKafka وApache Flink وFlink وأسماء مشاريع مصدر مفتوح المرتبطة هي علامات تجارية ل Apache Software Foundation (ASF).