كتابة رسائل الحدث في Azure Data Lake Storage Gen2 باستخدام Apache Flink® DataStream API
هام
هذه الميزة في وضع المعاينة حاليًا. تتضمن شروط الاستخدام التكميلية لمعاينات Microsoft Azure المزيد من الشروط القانونية التي تنطبق على ميزات Azure الموجودة في الإصدار التجريبي أو قيد المعاينة أو التي لم يتم إصدارها بعد في التوفر العام. للحصول على معلومات حول هذه المعاينة المحددة، راجع معلومات معاينة Azure HDInsight على AKS. للأسئلة أو اقتراحات الميزات، يرجى إرسال طلب على AskHDInsight مع التفاصيل ومتابعتنا لمزيد من التحديثات على مجتمع Azure HDInsight.
يستخدم Apache Flink أنظمة الملفات لاستهلاك البيانات وتخزينها باستمرار، سواء لنتائج التطبيقات أو للتسامح مع الخطأ والاسترداد. في هذه المقالة، تعرف على كيفية كتابة رسائل الحدث في Azure Data Lake Storage Gen2 باستخدام واجهة برمجة تطبيقات DataStream.
المتطلبات الأساسية
- مجموعة Apache Flink على HDInsight على AKS
- مجموعة Apache Kafka على HDInsight
- مطلوب منك التأكد من مراعاة إعدادات الشبكة كما هو موضح في استخدام Apache Kafka على HDInsight. تأكد من أن HDInsight على مجموعات AKS وHDInsight موجودة في نفس الشبكة الظاهرية.
- استخدام MSI للوصول إلى ADLS Gen2
- IntelliJ للتطوير على جهاز Azure الظاهري في HDInsight على شبكة AKS الظاهرية
موصل Apache Flink FileSystem
يوفر موصل نظام الملفات هذا نفس الضمانات لكل من BATCH وD STREAMING وتم تصميمه لتوفير دلالات مرة واحدة بالضبط لتنفيذ البث. لمزيد من المعلومات، راجع Flink DataStream Filesystem.
Apache Kafka الاتصال or
يوفر Flink موصل Apache Kafka لقراءة البيانات من مواضيع Kafka وكتابتها مع ضمانات مرة واحدة بالضبط. لمزيد من المعلومات، راجع Apache Kafka الاتصال or.
إنشاء المشروع ل Apache Flink
pom.xml على IntelliJ IDEA
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.17.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<kafka.version>3.2.0</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
برنامج ADLS Gen2 Sink
abfsGen2.java
إشعار
استبدل Apache Kafka على نظام مجموعة HDInsight bootStrapServers بوسطاء Kafka 3.2
package contoso.example;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.time.Duration;
public class KafkaSinkToGen2 {
public static void main(String[] args) throws Exception {
// 1. get stream execution env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Configuration flinkConfig = new Configuration();
flinkConfig.setString("classloader.resolve-order", "parent-first");
env.getConfig().setGlobalJobParameters(flinkConfig);
// 2. read kafka message as stream input, update your broker ip's
String brokers = "<update-broker-ip>:9092,<update-broker-ip>:9092,<update-broker-ip>:9092";
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics("click_events")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
stream.print();
// 3. sink to gen2, update container name and storage path
String outputPath = "abfs://<container-name>@<storage-path>.dfs.core.windows.net/flink/data/click_events";
final FileSink<String> sink = FileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(2))
.withInactivityInterval(Duration.ofMinutes(3))
.withMaxPartSize(MemorySize.ofMebiBytes(5))
.build())
.build();
stream.sinkTo(sink);
// 4. run stream
env.execute("Kafka Sink To Gen2");
}
}
حزم jar، وإرسالها إلى Apache Flink.
تحميل الجرة إلى ABFS.
تمرير معلومات جرة الوظيفة في
AppMode
إنشاء نظام المجموعة.إشعار
تأكد من إضافة classloader.resolve-order ك "parent-first" و hadoop.classpath.enable ك
true
حدد تجميع سجل الوظيفة لدفع سجلات الوظائف إلى حساب التخزين.
يمكنك رؤية المهمة قيد التشغيل.
التحقق من صحة تدفق البيانات على ADLS Gen2
نحن نرى البث click_events
إلى ADLS Gen2.
يمكنك تحديد نهج متجدد يقوم بطرح ملف الجزء قيد التقدم على أي من الشروط الثلاثة التالية:
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(5))
.withInactivityInterval(Duration.ofMinutes(3))
.withMaxPartSize(MemorySize.ofMebiBytes(5))
.build())
المرجع
- Apache Kafka الاتصال or
- نظام ملفات Flink DataStream
- موقع ويب Apache Flink
- Apache وApache Kafka وKafka وApache Flink وFlink وأسماء مشاريع مصدر مفتوح المرتبطة هي علامات تجارية ل Apache Software Foundation (ASF).