共用方式為


如何使用 Flink/Delta 連接器

重要

此功能目前為預覽功能。 適用於 Microsoft Azure 預覽版的補充使用規定包含適用於 Beta 版、預覽版或尚未發行至正式運作之 Azure 功能的更合法條款。 如需此特定預覽的相關信息,請參閱 AKS 預覽資訊的 Azure HDInsight。 如需問題或功能建議,請在 AskHDInsight提交要求,並提供詳細數據,並遵循我們在 Azure HDInsight 社群取得更多更新。

藉由同時使用 Apache Flink 和 Delta Lake,您可以建立可靠且可調整的資料湖存放庫架構。 Flink/Delta 連接器可讓您使用 ACID 交易並確切一次性處理將資料寫入 Delta 資料表。 這表示即使從檢查點重新啟動 Flink 管線,資料流也會保持一致且無錯誤。 Flink/Delta 連接器確保資料不會遺失或重複,並且與 Flink 語意相符。

在本文中,您將瞭解如何使用 Flink-Delta 連接器。

  • 從 Delta 資料表中讀取資料。
  • 將資料寫入 Delta 資料表。
  • 在 Power BI 中查詢資料。

什麼是 Flink/Delta 連接器

Flink/Delta 連接器是一個 JVM 程式庫,可以使用 Delta 獨立 JVM 程式庫從 Apache Flink 應用程式讀取資料並將其寫入 Delta 資料表。 連接器只提供一次傳遞保證。

Flink/Delta 連線 or 包括:

用於將資料從 Apache Flink 寫入至 Delta 資料表的 DeltaSink。 用於使用 Apache Flink 讀取 Delta 資料表的 DeltaSource。

Apache Flink-Delta 連線 or 包括:

視連接器版本而定,您可以將其與下列 Apache Flink 版本搭配使用:

Connector's version	    Flink's version
0.4.x (Sink Only)	    1.12.0 <= X <= 1.14.5
0.5.0	                1.13.0 <= X <= 1.13.6
0.6.0	                X >= 1.15.3 
0.7.0	                X >= 1.16.1         --- We use this in Flink 1.17.0

如需詳細資訊,請參閱 Flink/Delta 連線 or

必要條件

  • AKS 上的 HDInsight Flink 1.17.0 叢集
  • Flink-Delta 連線 or 0.7.0
  • 使用 MSI 存取 ADLS Gen2
  • 用於開發的 IntelliJ

從 Delta 資料表讀取資料

差異來源可以在兩種模式的其中一種運作,如下所示。

  • 限定模式 適用於批次作業,我們想要只讀取特定數據表版本的 Delta 數據表內容。 使用 DeltaSource.forBoundedRowData API 建立此模式的來源。

  • 適用於串流作業的連續模式,我們想要在其中持續檢查 Delta 數據表是否有新的變更和版本。 使用 DeltaSource.forContinuousRowData API 建立此模式的來源。

範例:建立 Delta 數據表的來源,以讀取系結模式中的所有數據行。 適用於批次作業。 此範例會載入最新的數據表版本。

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define the source Delta table path
        String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";

        // Create a bounded Delta source for all columns
        DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);

    public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
            StreamExecutionEnvironment env,
            String deltaTablePath) {

        DeltaSource<RowData> deltaSource = DeltaSource
                .forBoundedRowData(
                        new Path(deltaTablePath),
                        new Configuration())
                .build();

        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
    }

如需其他連續模型範例,請參閱 數據源模式

寫入至 Delta 接收器

Delta Sink 目前會公開下列 Flink 計量:

顯示 Flink 計量數據表的螢幕快照。

非分割數據表的接收建立

在此範例中,我們會示範如何建立 DeltaSink,並將其插入現有的 org.apache.flink.streaming.api.datastream.DataStream

import io.delta.flink.sink.DeltaSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

        // Define the sink Delta table path
        String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";

        // Define the source Delta table path
        RowType rowType = RowType.of(
                DataTypes.STRING().getLogicalType(),  // Date
                DataTypes.STRING().getLogicalType(),  // Time
                DataTypes.STRING().getLogicalType(),  // TargetTemp
                DataTypes.STRING().getLogicalType(),  // ActualTemp
                DataTypes.STRING().getLogicalType(),  // System
                DataTypes.STRING().getLogicalType(),  // SystemAge
                DataTypes.STRING().getLogicalType()   // BuildingID
        );

       createDeltaSink(deltaStream, deltaTablePath_sink, rowType);

public static DataStream<RowData> createDeltaSink(
            DataStream<RowData> stream,
            String deltaTablePath,
            RowType rowType) {
        DeltaSink<RowData> deltaSink = DeltaSink
                .forRowData(
                        new Path(deltaTablePath),
                        new Configuration(),
                        rowType)
                .build();
        stream.sinkTo(deltaSink);
        return stream;
    }

如需其他接收建立範例,請參閱 數據接收計量

完整程式碼

從差異數據表讀取數據,並接收至另一個差異數據表。

package contoso.example;

import io.delta.flink.sink.DeltaSink;
import io.delta.flink.source.DeltaSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

public class DeltaSourceExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define the sink Delta table path
        String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";

        // Define the source Delta table path
        String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";

        // Define the source Delta table path
        RowType rowType = RowType.of(
                DataTypes.STRING().getLogicalType(),  // Date
                DataTypes.STRING().getLogicalType(),  // Time
                DataTypes.STRING().getLogicalType(),  // TargetTemp
                DataTypes.STRING().getLogicalType(),  // ActualTemp
                DataTypes.STRING().getLogicalType(),  // System
                DataTypes.STRING().getLogicalType(),  // SystemAge
                DataTypes.STRING().getLogicalType()   // BuildingID
        );

        // Create a bounded Delta source for all columns
        DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);

        createDeltaSink(deltaStream, deltaTablePath_sink, rowType);

        // Execute the Flink job
        env.execute("Delta datasource and sink Example");
    }

    public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
            StreamExecutionEnvironment env,
            String deltaTablePath) {

        DeltaSource<RowData> deltaSource = DeltaSource
                .forBoundedRowData(
                        new Path(deltaTablePath),
                        new Configuration())
                .build();

        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
    }

    public static DataStream<RowData> createDeltaSink(
            DataStream<RowData> stream,
            String deltaTablePath,
            RowType rowType) {
        DeltaSink<RowData> deltaSink = DeltaSink
                .forRowData(
                        new Path(deltaTablePath),
                        new Configuration(),
                        rowType)
                .build();
        stream.sinkTo(deltaSink);
        return stream;
    }
}

Maven Pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>contoso.example</groupId>
    <artifactId>FlinkDeltaDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <hadoop-version>3.3.4</hadoop-version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-standalone_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-flink</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-parquet</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
  1. 將 jar 上傳至 ABFS。 顯示應用程式模式 jar 檔案的螢幕快照。

  2. 在 AppMode 叢集中傳遞作業 jar 資訊。

    顯示叢集設定的螢幕快照。

    注意

    一律在讀取/寫入ADLS時啟用 hadoop.classpath.enable

  3. 提交叢集,您應該能夠在 Flink UI 中看到作業。

    顯示 Flink 儀錶板的螢幕快照。

  4. 在 ADLS 中尋找結果。

    顯示輸出的螢幕快照。

Power BI 整合

當資料位於 Delta 接收器中,您就可以在 Power BI Desktop 中執行查詢並建立報表。

  1. 開啟 Power BI Desktop 以使用 ADLS Gen2 連接器取得數據。

    顯示 Power BI Desktop 的螢幕快照。

    顯示 ADLSGen 2 連接器的螢幕快照。

  2. 儲存體帳戶的 URL。

    顯示記憶體帳戶 URL 的螢幕快照。

    顯示ADLS Gen2詳細資料的螢幕快照。

  3. 建立來源的 M-query 並叫用函式,以從儲存體帳戶查詢資料。 請參閱 Delta Power BI 連接器

  4. 一旦資料準備就緒,您就可以建立報表。

    顯示如何建立報表的螢幕快照。

參考資料