توصيل Apache Flink® على HDInsight على AKS مع مراكز أحداث Azure ل Apache Kafka®
إشعار
سنتقاعد Azure HDInsight على AKS في 31 يناير 2025. قبل 31 يناير 2025، ستحتاج إلى ترحيل أحمال العمل الخاصة بك إلى Microsoft Fabric أو منتج Azure مكافئ لتجنب الإنهاء المفاجئ لأحمال العمل الخاصة بك. سيتم إيقاف المجموعات المتبقية على اشتراكك وإزالتها من المضيف.
سيتوفر الدعم الأساسي فقط حتى تاريخ الإيقاف.
هام
هذه الميزة في وضع المعاينة حاليًا. تتضمن شروط الاستخدام التكميلية لمعاينات Microsoft Azure المزيد من الشروط القانونية التي تنطبق على ميزات Azure الموجودة في الإصدار التجريبي أو قيد المعاينة أو التي لم يتم إصدارها بعد في التوفر العام. للحصول على معلومات حول هذه المعاينة المحددة، راجع معلومات معاينة Azure HDInsight على AKS. للأسئلة أو اقتراحات الميزات، يرجى إرسال طلب على AskHDInsight مع التفاصيل ومتابعتنا لمزيد من التحديثات على مجتمع Azure HDInsight.
حالة استخدام معروفة ل Apache Flink هي تحليلات الدفق. الاختيار الشائع من قبل العديد من المستخدمين لاستخدام تدفقات البيانات، والتي يتم استيعابها باستخدام Apache Kafka. تبدأ عمليات التثبيت النموذجية ل Flink وKafka بتدفقات الأحداث التي يتم دفعها إلى Kafka، والتي يمكن استهلاكها بواسطة وظائف Flink. توفر Azure Event Hubs نقطة نهاية Apache Kafka على مركز أحداث، والتي تمكن المستخدمين من الاتصال بمركز الأحداث باستخدام بروتوكول Kafka.
في هذه المقالة، نستكشف كيفية توصيل مراكز أحداث Azure ب Apache Flink على HDInsight على AKS ونغطي ما يلي
- إنشاء مساحة اسم مراكز الأحداث
- إنشاء HDInsight على نظام مجموعة AKS باستخدام Apache Flink
- تشغيل منتج Flink
- حزمة Jar ل Apache Flink
- إرسال الوظيفة والتحقق من صحتها
إنشاء مساحة اسم مراكز الأحداث ومراكز الأحداث
لإنشاء مساحة اسم مراكز الأحداث ومراكز الأحداث، راجع هنا
إعداد نظام مجموعة Flink على HDInsight على AKS
باستخدام HDInsight الموجود على مجموعة نظام مجموعة AKS، يمكنك إنشاء مجموعة Flink
تشغيل منتج Flink بإضافة bootstrap.servers والمعلومات
producer.config
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 client.id=FlinkExampleProducer sasl.mechanism=PLAIN security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="$ConnectionString" \ password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
استبدل
{YOUR.EVENTHUBS.CONNECTION.STRING}
بسلسلة الاتصال لمساحة اسم مراكز أحداث. للحصول على إرشادات حول الحصول على سلسلة الاتصال، راجع تفاصيل حول كيفية الحصول على مراكز الأحداث سلسلة الاتصال.على سبيل المثال،
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
تعبئة JAR ل Flink
com.example.app الحزمة؛
package contoso.example; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.io.FileReader; import java.util.Properties; public class AzureEventHubDemo { public static void main(String[] args) throws Exception { // 1. get stream execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); ParameterTool parameters = ParameterTool.fromArgs(args); String input = parameters.get("input"); Properties properties = new Properties(); properties.load(new FileReader(input)); // 2. generate stream input DataStream<String> stream = createStream(env); // 3. sink to eventhub KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties) .setRecordSerializer(KafkaRecordSerializationSchema.builder() .setTopic("topic1") .setValueSerializationSchema(new SimpleStringSchema()) .build()) .build(); stream.sinkTo(sink); // 4. execute the stream env.execute("Produce message to Azure event hub"); } public static DataStream<String> createStream(StreamExecutionEnvironment env){ return env.generateSequence(0, 200) .map(new MapFunction<Long, String>() { @Override public String map(Long in) { return "FLINK PRODUCE " + in; } }); } }
أضف القصاصة البرمجية لتشغيل منتج Flink.
بمجرد تنفيذ التعليمات البرمجية، يتم تخزين الأحداث في الموضوع "topic1"
المرجع
- موقع ويب Apache Flink
- Apache وApache Kafka وKafka وApache Flink وFlink وأسماء مشاريع مصدر مفتوح المرتبطة هي علامات تجارية ل Apache Software Foundation (ASF).