كيفية استخدام Flink/Delta Connector
إشعار
سنتقاعد Azure HDInsight على AKS في 31 يناير 2025. قبل 31 يناير 2025، ستحتاج إلى ترحيل أحمال العمل الخاصة بك إلى Microsoft Fabric أو منتج Azure مكافئ لتجنب الإنهاء المفاجئ لأحمال العمل الخاصة بك. سيتم إيقاف المجموعات المتبقية على اشتراكك وإزالتها من المضيف.
سيتوفر الدعم الأساسي فقط حتى تاريخ الإيقاف.
هام
هذه الميزة في وضع المعاينة حاليًا. تتضمن شروط الاستخدام التكميلية لمعاينات Microsoft Azure المزيد من الشروط القانونية التي تنطبق على ميزات Azure الموجودة في الإصدار التجريبي أو قيد المعاينة أو التي لم يتم إصدارها بعد في التوفر العام. للحصول على معلومات حول هذه المعاينة المحددة، راجع معلومات معاينة Azure HDInsight على AKS. للأسئلة أو اقتراحات الميزات، يرجى إرسال طلب على AskHDInsight مع التفاصيل ومتابعتنا لمزيد من التحديثات على مجتمع Azure HDInsight.
باستخدام Apache Flink وData Lake معا، يمكنك إنشاء بنية مستودع بيانات موثوقة وقابلة للتطوير. يسمح لك Flink/Delta Connector بكتابة البيانات إلى جداول Delta باستخدام معاملات ACID والمعالجة مرة واحدة بالضبط. وهذا يعني أن تدفقات البيانات الخاصة بك متسقة وخالية من الأخطاء، حتى إذا قمت بإعادة تشغيل البنية الأساسية لبرنامج ربط العمليات التجارية Flink من نقطة تحقق. يضمن Flink/Delta Connector عدم فقدان بياناتك أو تكرارها، وتطابقها مع دلالات Flink.
في هذه المقالة، ستتعلم كيفية استخدام موصل Flink-Delta.
- اقرأ البيانات من جدول دلتا.
- اكتب البيانات إلى جدول دلتا.
- استعلم عنه في Power BI.
ما هو موصل Flink/Delta
Flink/Delta Connector هي مكتبة JVM لقراءة البيانات وكتابتها من تطبيقات Apache Flink إلى جداول Delta باستخدام مكتبة Delta Standalone JVM. يوفر الموصل ضمانات تسليم مرة واحدة بالضبط.
يتضمن Flink/Delta Connector ما يلي:
DeltaSink لكتابة البيانات من Apache Flink إلى جدول Delta. DeltaSource لقراءة جداول Delta باستخدام Apache Flink.
يتضمن Apache Flink-Delta Connector ما يلي:
اعتمادا على إصدار الموصل يمكنك استخدامه مع إصدارات Apache Flink التالية:
Connector's version Flink's version
0.4.x (Sink Only) 1.12.0 <= X <= 1.14.5
0.5.0 1.13.0 <= X <= 1.13.6
0.6.0 X >= 1.15.3
0.7.0 X >= 1.16.1 --- We use this in Flink 1.17.0
المتطلبات الأساسية
- نظام مجموعة HDInsight Flink 1.17.0 على AKS
- Flink-Delta Connector 0.7.0
- استخدام MSI للوصول إلى ADLS Gen2
- IntelliJ للتطوير
قراءة البيانات من جدول دلتا
يمكن أن يعمل مصدر دلتا في أحد الوضعين، الموضحين على النحو التالي.
الوضع المحدد مناسب للوظائف الدفعية، حيث نريد قراءة محتوى جدول Delta لإصدار جدول معين فقط. إنشاء مصدر لهذا الوضع باستخدام DeltaSource.forBoundedRowData API.
الوضع المستمر مناسب لمهام الدفق، حيث نريد التحقق باستمرار من جدول Delta بحثا عن التغييرات والإصدارات الجديدة. إنشاء مصدر لهذا الوضع باستخدام DeltaSource.forContinuousRowData API.
مثال: إنشاء مصدر لجدول Delta، لقراءة جميع الأعمدة في الوضع المحدد. مناسب لوظائف الدفعات. يقوم هذا المثال بتحميل أحدث إصدار من الجدول.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Define the source Delta table path
String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";
// Create a bounded Delta source for all columns
DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);
public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
StreamExecutionEnvironment env,
String deltaTablePath) {
DeltaSource<RowData> deltaSource = DeltaSource
.forBoundedRowData(
new Path(deltaTablePath),
new Configuration())
.build();
return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
}
الكتابة إلى متلقي Delta
يعرض Delta Sink حاليا مقاييس Flink التالية:
إنشاء المتلقي للجداول غير المقطوعة
في هذا المثال، نعرض كيفية إنشاء DeltaSink وتوصيلته ب .org.apache.flink.streaming.api.datastream.DataStream
import io.delta.flink.sink.DeltaSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;
// Define the sink Delta table path
String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";
// Define the source Delta table path
RowType rowType = RowType.of(
DataTypes.STRING().getLogicalType(), // Date
DataTypes.STRING().getLogicalType(), // Time
DataTypes.STRING().getLogicalType(), // TargetTemp
DataTypes.STRING().getLogicalType(), // ActualTemp
DataTypes.STRING().getLogicalType(), // System
DataTypes.STRING().getLogicalType(), // SystemAge
DataTypes.STRING().getLogicalType() // BuildingID
);
createDeltaSink(deltaStream, deltaTablePath_sink, rowType);
public static DataStream<RowData> createDeltaSink(
DataStream<RowData> stream,
String deltaTablePath,
RowType rowType) {
DeltaSink<RowData> deltaSink = DeltaSink
.forRowData(
new Path(deltaTablePath),
new Configuration(),
rowType)
.build();
stream.sinkTo(deltaSink);
return stream;
}
تعليمات برمجية كاملة
قراءة البيانات من جدول دلتا والمتلقي إلى جدول دلتا آخر.
package contoso.example;
import io.delta.flink.sink.DeltaSink;
import io.delta.flink.source.DeltaSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;
public class DeltaSourceExample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Define the sink Delta table path
String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";
// Define the source Delta table path
String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";
// Define the source Delta table path
RowType rowType = RowType.of(
DataTypes.STRING().getLogicalType(), // Date
DataTypes.STRING().getLogicalType(), // Time
DataTypes.STRING().getLogicalType(), // TargetTemp
DataTypes.STRING().getLogicalType(), // ActualTemp
DataTypes.STRING().getLogicalType(), // System
DataTypes.STRING().getLogicalType(), // SystemAge
DataTypes.STRING().getLogicalType() // BuildingID
);
// Create a bounded Delta source for all columns
DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);
createDeltaSink(deltaStream, deltaTablePath_sink, rowType);
// Execute the Flink job
env.execute("Delta datasource and sink Example");
}
public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
StreamExecutionEnvironment env,
String deltaTablePath) {
DeltaSource<RowData> deltaSource = DeltaSource
.forBoundedRowData(
new Path(deltaTablePath),
new Configuration())
.build();
return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
}
public static DataStream<RowData> createDeltaSink(
DataStream<RowData> stream,
String deltaTablePath,
RowType rowType) {
DeltaSink<RowData> deltaSink = DeltaSink
.forRowData(
new Path(deltaTablePath),
new Configuration(),
rowType)
.build();
stream.sinkTo(deltaSink);
return stream;
}
}
Maven Pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>contoso.example</groupId>
<artifactId>FlinkDeltaDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.17.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<hadoop-version>3.3.4</hadoop-version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-standalone_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-flink</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop-version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
قم بحزم الجرة وإرسالها إلى مجموعة Flink للتشغيل
تمرير معلومات jar للوظيفة في مجموعة AppMode.
إشعار
تمكين
hadoop.classpath.enable
دائما أثناء القراءة/الكتابة إلى ADLS.إرسال نظام المجموعة، يجب أن تكون قادرا على رؤية المهمة في واجهة مستخدم Flink.
البحث عن النتائج في ADLS.
تكامل Power BI
بمجرد أن تكون البيانات في متلقي دلتا، يمكنك تشغيل الاستعلام في Power BI لسطح المكتب وإنشاء تقرير.
افتح Power BI desktop للحصول على البيانات باستخدام موصل ADLS Gen2.
عنوان URL لحساب التخزين.
إنشاء استعلام M للمصدر واستدعاء الدالة التي تستعلم عن البيانات من حساب التخزين.
بمجرد توفر البيانات بسهولة، يمكنك إنشاء تقارير.
المراجع
- Apache وApache Flink وFlink وأسماء مشاريع مصدر مفتوح المقترنة هي علامات تجارية ل Apache Software Foundation (ASF).