توصيل Apache Flink® على HDInsight على AKS مع مراكز أحداث Azure ل Apache Kafka®

إشعار

سنتقاعد Azure HDInsight على AKS في 31 يناير 2025. قبل 31 يناير 2025، ستحتاج إلى ترحيل أحمال العمل الخاصة بك إلى Microsoft Fabric أو منتج Azure مكافئ لتجنب الإنهاء المفاجئ لأحمال العمل الخاصة بك. سيتم إيقاف المجموعات المتبقية على اشتراكك وإزالتها من المضيف.

سيتوفر الدعم الأساسي فقط حتى تاريخ الإيقاف.

هام

هذه الميزة في وضع المعاينة حاليًا. تتضمن شروط الاستخدام التكميلية لمعاينات Microsoft Azure المزيد من الشروط القانونية التي تنطبق على ميزات Azure الموجودة في الإصدار التجريبي أو قيد المعاينة أو التي لم يتم إصدارها بعد في التوفر العام. للحصول على معلومات حول هذه المعاينة المحددة، راجع معلومات معاينة Azure HDInsight على AKS. للأسئلة أو اقتراحات الميزات، يرجى إرسال طلب على AskHDInsight مع التفاصيل ومتابعتنا لمزيد من التحديثات على مجتمع Azure HDInsight.

حالة استخدام معروفة ل Apache Flink هي تحليلات الدفق. الاختيار الشائع من قبل العديد من المستخدمين لاستخدام تدفقات البيانات، والتي يتم استيعابها باستخدام Apache Kafka. تبدأ عمليات التثبيت النموذجية ل Flink وKafka بتدفقات الأحداث التي يتم دفعها إلى Kafka، والتي يمكن استهلاكها بواسطة وظائف Flink. توفر Azure Event Hubs نقطة نهاية Apache Kafka على مركز أحداث، والتي تمكن المستخدمين من الاتصال بمركز الأحداث باستخدام بروتوكول Kafka.

في هذه المقالة، نستكشف كيفية توصيل مراكز أحداث Azure ب Apache Flink على HDInsight على AKS ونغطي ما يلي

  • إنشاء مساحة اسم مراكز الأحداث
  • إنشاء HDInsight على نظام مجموعة AKS باستخدام Apache Flink
  • تشغيل منتج Flink
  • حزمة Jar ل Apache Flink
  • إرسال الوظيفة والتحقق من صحتها

إنشاء مساحة اسم مراكز الأحداث ومراكز الأحداث

  1. لإنشاء مساحة اسم مراكز الأحداث ومراكز الأحداث، راجع هنا

    لقطة شاشة تعرض إعداد مراكز الأحداث.

  1. باستخدام HDInsight الموجود على مجموعة نظام مجموعة AKS، يمكنك إنشاء مجموعة Flink

  2. تشغيل منتج Flink بإضافة bootstrap.servers والمعلومات producer.config

    bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
    client.id=FlinkExampleProducer
    sasl.mechanism=PLAIN
    security.protocol=SASL_SSL
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
    
  3. استبدل {YOUR.EVENTHUBS.CONNECTION.STRING} بسلسلة الاتصال لمساحة اسم مراكز أحداث. للحصول على إرشادات حول الحصول على سلسلة الاتصال، راجع تفاصيل حول كيفية الحصول على مراكز الأحداث سلسلة الاتصال.

    على سبيل المثال،

    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString"
    password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
    
  1. com.example.app الحزمة؛

       package contoso.example;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.io.FileReader;
    import java.util.Properties;
    
    public class AzureEventHubDemo {
    
       public static void main(String[] args) throws Exception {
           // 1. get stream execution environment
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
           ParameterTool parameters = ParameterTool.fromArgs(args);
           String input = parameters.get("input");
           Properties properties = new Properties();
           properties.load(new FileReader(input));
    
           // 2. generate stream input
           DataStream<String> stream = createStream(env);
    
           // 3. sink to eventhub
           KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties)
                  .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                          .setTopic("topic1")
                          .setValueSerializationSchema(new SimpleStringSchema())
                           .build())
                   .build();
    
           stream.sinkTo(sink);
    
           // 4. execute the stream
           env.execute("Produce message to Azure event hub");
       }
    
       public static DataStream<String> createStream(StreamExecutionEnvironment env){
           return env.generateSequence(0, 200)
                   .map(new MapFunction<Long, String>() {
                       @Override
                       public String map(Long in) {
                           return "FLINK PRODUCE " + in;
                       }
                   });
       }
    } 
    
  2. أضف القصاصة البرمجية لتشغيل منتج Flink.

    لقطة شاشة توضح كيفية اختبار Flink في مراكز الأحداث.

  3. بمجرد تنفيذ التعليمات البرمجية، يتم تخزين الأحداث في الموضوع "topic1"

    لقطة شاشة تعرض مراكز الأحداث المخزنة في الموضوع.

المرجع