استخدام Apache Kafka® على HDInsight مع Apache Flink® على HDInsight على AKS

إشعار

سنتقاعد Azure HDInsight على AKS في 31 يناير 2025. قبل 31 يناير 2025، ستحتاج إلى ترحيل أحمال العمل الخاصة بك إلى Microsoft Fabric أو منتج Azure مكافئ لتجنب الإنهاء المفاجئ لأحمال العمل الخاصة بك. سيتم إيقاف المجموعات المتبقية على اشتراكك وإزالتها من المضيف.

سيتوفر الدعم الأساسي فقط حتى تاريخ الإيقاف.

هام

هذه الميزة في وضع المعاينة حاليًا. تتضمن شروط الاستخدام التكميلية لمعاينات Microsoft Azure المزيد من الشروط القانونية التي تنطبق على ميزات Azure الموجودة في الإصدار التجريبي أو قيد المعاينة أو التي لم يتم إصدارها بعد في التوفر العام. للحصول على معلومات حول هذه المعاينة المحددة، راجع معلومات معاينة Azure HDInsight على AKS. للأسئلة أو اقتراحات الميزات، يرجى إرسال طلب على AskHDInsight مع التفاصيل ومتابعتنا لمزيد من التحديثات على مجتمع Azure HDInsight.

حالة استخدام معروفة ل Apache Flink هي تحليلات الدفق. الاختيار الشائع من قبل العديد من المستخدمين لاستخدام تدفقات البيانات، والتي يتم استيعابها باستخدام Apache Kafka. تبدأ عمليات التثبيت النموذجية ل Flink وKafka بتدفقات الأحداث التي يتم دفعها إلى Kafka، والتي يمكن استهلاكها بواسطة وظائف Flink.

يستخدم هذا المثال HDInsight على مجموعات AKS التي تقوم بتشغيل Flink 1.17.0 لمعالجة دفق البيانات المستهلكة لموضوع Kafka وإنتاجه.

إشعار

تم إهمال FlinkKafkaConsumer وسيتم إزالته باستخدام Flink 1.17، يرجى استخدام KafkaSource بدلا من ذلك. تم إهمال FlinkKafkaProducer وسيتم إزالته باستخدام Flink 1.15، يرجى استخدام KafkaSink بدلا من ذلك.

المتطلبات الأساسية

  • يجب أن يكون كل من Kafka وFlink في نفس VNet أو يجب أن يكون هناك نظير vnet بين نظامي المجموعات.

  • إنشاء VNet.

  • إنشاء نظام مجموعة Kafka في نفس VNet. يمكنك اختيار Kafka 3.2 أو 2.4 على HDInsight استنادا إلى استخدامك الحالي.

    لقطة شاشة توضح كيفية إنشاء نظام مجموعة Kafka في نفس VNet.

  • أضف تفاصيل الشبكة الظاهرية في قسم الشبكة الظاهرية.

  • إنشاء HDInsight على مجموعة نظام مجموعة AKS باستخدام نفس VNet.

  • إنشاء مجموعة Flink إلى مجموعة نظام المجموعة التي تم إنشاؤها.

موصل Apache Kafka

يوفر Flink موصل Apache Kafka لقراءة البيانات من مواضيع Kafka وكتابتها مع ضمانات مرة واحدة بالضبط.

تبعية Maven

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>1.17.0</version>
        </dependency>

بناء Kafka Sink

يوفر متلقي Kafka فئة منشئ لإنشاء مثيل KafkaSink. نستخدم نفس الشيء لإنشاء المتلقي الخاص بنا واستخدامه جنبا إلى جنب مع مجموعة Flink التي تعمل على HDInsight على AKS

SinKafkaToKafka.java

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SinKafkaToKafka {
    public static void main(String[] args) throws Exception {
        // 1. get stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. read kafka message as stream input, update your broker IPs below
        String brokers = "X.X.X.X:9092,X.X.X.X:9092,X.X.X.X:9092";
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics("clicks")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        
        // 3. transformation: 
        // https://www.taobao.com,1000 ---> 
        // Event{user: "Tim",url: "https://www.taobao.com",timestamp: 1970-01-01 00:00:01.0}
        SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                String[] fields = value.split(",");
                return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
            }
        });

        // 4. sink click into another kafka events topic
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers(brokers)
                .setProperty("transaction.timeout.ms","900000")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("events")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .build();

        result.sinkTo(sink);

       // 5. execute the stream
        env.execute("kafka Sink to other topic");
    }
}

كتابة برنامج Java Event.java

import java.sql.Timestamp;

public class Event {

    public String user;
    public String url;
    public Long timestamp;

    public Event() {
    }

    public Event(String user,String url,Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString(){
        return "Event{" +
                "user: \"" + user + "\""  +
                ",url: \"" + url + "\""  +
                ",timestamp: " + new Timestamp(timestamp) +
                "}";
    }
}

على Webssh، قم بتحميل الجرة وإرسال الجرة

لقطة شاشة تعرض الوظيفة قيد التشغيل على Flink.

على واجهة مستخدم لوحة معلومات Flink

لقطة شاشة توضح كيفية إرسال ملف Jar المجمع لموضوع Kafka كوظيفة إلى Flink.

إنتاج الموضوع - النقرات على Kafka

لقطة شاشة توضح كيفية إنتاج موضوع Kafka.

استهلاك الموضوع - الأحداث على Kafka

لقطة شاشة توضح كيفية استهلاك موضوع Kafka.

المرجع