如何使用 Flink/Delta 連接器
重要
此功能目前為預覽功能。 適用於 Microsoft Azure 預覽版的補充使用規定包含適用於 Beta 版、預覽版或尚未發行至正式運作之 Azure 功能的更合法條款。 如需此特定預覽的相關信息,請參閱 AKS 預覽資訊的 Azure HDInsight。 如需問題或功能建議,請在 AskHDInsight 上提交要求,並提供詳細數據,並遵循我們在 Azure HDInsight 社群上取得更多更新。
藉由同時使用 Apache Flink 和 Delta Lake,您可以建立可靠且可調整的資料湖存放庫架構。 Flink/Delta 連接器可讓您使用 ACID 交易並確切一次性處理將資料寫入 Delta 資料表。 這表示即使從檢查點重新啟動 Flink 管線,資料流也會保持一致且無錯誤。 Flink/Delta 連接器確保資料不會遺失或重複,並且與 Flink 語意相符。
在本文中,您將瞭解如何使用 Flink-Delta 連接器。
- 從 Delta 資料表中讀取資料。
- 將資料寫入 Delta 資料表。
- 在 Power BI 中查詢資料。
什麼是 Flink/Delta 連接器
Flink/Delta 連接器是一個 JVM 程式庫,可以使用 Delta 獨立 JVM 程式庫從 Apache Flink 應用程式讀取資料並將其寫入 Delta 資料表。 連接器只提供一次傳遞保證。
Flink/Delta 連線 or 包括:
用於將資料從 Apache Flink 寫入至 Delta 資料表的 DeltaSink。 用於使用 Apache Flink 讀取 Delta 資料表的 DeltaSource。
Apache Flink-Delta 連線 or 包括:
視連接器版本而定,您可以將其與下列 Apache Flink 版本搭配使用:
Connector's version Flink's version
0.4.x (Sink Only) 1.12.0 <= X <= 1.14.5
0.5.0 1.13.0 <= X <= 1.13.6
0.6.0 X >= 1.15.3
0.7.0 X >= 1.16.1 --- We use this in Flink 1.17.0
如需詳細資訊,請參閱 Flink/Delta 連線 or。
必要條件
- AKS 上的 HDInsight Flink 1.17.0 叢集
- Flink-Delta 連線 or 0.7.0
- 使用 MSI 存取 ADLS Gen2
- 用於開發的 IntelliJ
從 Delta 資料表讀取資料
差異來源可以在兩種模式的其中一種運作,如下所示。
限定模式 適用於批次作業,我們想要只讀取特定數據表版本的 Delta 數據表內容。 使用 DeltaSource.forBoundedRowData API 建立此模式的來源。
適用於串流作業的連續模式,我們想要在其中持續檢查 Delta 數據表是否有新的變更和版本。 使用 DeltaSource.forContinuousRowData API 建立此模式的來源。
範例:建立 Delta 數據表的來源,以讀取系結模式中的所有數據行。 適用於批次作業。 此範例會載入最新的數據表版本。
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Define the source Delta table path
String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";
// Create a bounded Delta source for all columns
DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);
public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
StreamExecutionEnvironment env,
String deltaTablePath) {
DeltaSource<RowData> deltaSource = DeltaSource
.forBoundedRowData(
new Path(deltaTablePath),
new Configuration())
.build();
return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
}
如需其他連續模型範例,請參閱 數據源模式。
寫入至 Delta 接收器
Delta Sink 目前會公開下列 Flink 計量:
非分割數據表的接收建立
在此範例中,我們會示範如何建立 DeltaSink,並將其插入現有的 org.apache.flink.streaming.api.datastream.DataStream
。
import io.delta.flink.sink.DeltaSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;
// Define the sink Delta table path
String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";
// Define the source Delta table path
RowType rowType = RowType.of(
DataTypes.STRING().getLogicalType(), // Date
DataTypes.STRING().getLogicalType(), // Time
DataTypes.STRING().getLogicalType(), // TargetTemp
DataTypes.STRING().getLogicalType(), // ActualTemp
DataTypes.STRING().getLogicalType(), // System
DataTypes.STRING().getLogicalType(), // SystemAge
DataTypes.STRING().getLogicalType() // BuildingID
);
createDeltaSink(deltaStream, deltaTablePath_sink, rowType);
public static DataStream<RowData> createDeltaSink(
DataStream<RowData> stream,
String deltaTablePath,
RowType rowType) {
DeltaSink<RowData> deltaSink = DeltaSink
.forRowData(
new Path(deltaTablePath),
new Configuration(),
rowType)
.build();
stream.sinkTo(deltaSink);
return stream;
}
如需其他接收建立範例,請參閱 數據接收計量。
完整程式碼
從差異數據表讀取數據,並接收至另一個差異數據表。
package contoso.example;
import io.delta.flink.sink.DeltaSink;
import io.delta.flink.source.DeltaSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;
public class DeltaSourceExample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Define the sink Delta table path
String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";
// Define the source Delta table path
String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";
// Define the source Delta table path
RowType rowType = RowType.of(
DataTypes.STRING().getLogicalType(), // Date
DataTypes.STRING().getLogicalType(), // Time
DataTypes.STRING().getLogicalType(), // TargetTemp
DataTypes.STRING().getLogicalType(), // ActualTemp
DataTypes.STRING().getLogicalType(), // System
DataTypes.STRING().getLogicalType(), // SystemAge
DataTypes.STRING().getLogicalType() // BuildingID
);
// Create a bounded Delta source for all columns
DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);
createDeltaSink(deltaStream, deltaTablePath_sink, rowType);
// Execute the Flink job
env.execute("Delta datasource and sink Example");
}
public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
StreamExecutionEnvironment env,
String deltaTablePath) {
DeltaSource<RowData> deltaSource = DeltaSource
.forBoundedRowData(
new Path(deltaTablePath),
new Configuration())
.build();
return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
}
public static DataStream<RowData> createDeltaSink(
DataStream<RowData> stream,
String deltaTablePath,
RowType rowType) {
DeltaSink<RowData> deltaSink = DeltaSink
.forRowData(
new Path(deltaTablePath),
new Configuration(),
rowType)
.build();
stream.sinkTo(deltaSink);
return stream;
}
}
Maven Pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>contoso.example</groupId>
<artifactId>FlinkDeltaDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.17.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<hadoop-version>3.3.4</hadoop-version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-standalone_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-flink</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop-version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
封裝 jar 並將它提交至 Flink 叢集以執行
在 AppMode 叢集中傳遞作業 jar 資訊。
注意
一律在讀取/寫入ADLS時啟用
hadoop.classpath.enable
。提交叢集,您應該能夠在 Flink UI 中看到作業。
在 ADLS 中尋找結果。
Power BI 整合
當資料位於 Delta 接收器中,您就可以在 Power BI Desktop 中執行查詢並建立報表。
開啟 Power BI Desktop 以使用 ADLS Gen2 連接器取得數據。
儲存體帳戶的 URL。
建立來源的 M-query 並叫用函式,以從儲存體帳戶查詢資料。 請參閱 Delta Power BI 連接器。
一旦資料準備就緒,您就可以建立報表。
參考資料
- Delta 連接器。
- Delta Power BI 連接器。
- Apache、Apache Flink、Flink 和相關聯的開放原始碼專案名稱為 Apache Software Foundation (ASF) 的商標。
意見反應
https://aka.ms/ContentUserFeedback。
即將登場:在 2024 年,我們將逐步淘汰 GitHub 問題作為內容的意見反應機制,並將它取代為新的意見反應系統。 如需詳細資訊,請參閱:提交並檢視相關的意見反應