次の方法で共有


Flink/Delta コネクタの使用方法

重要

現在、この機能はプレビュー段階にあります。 ベータ版、プレビュー版、または一般提供としてまだリリースされていない Azure の機能に適用されるその他の法律条項については、「Microsoft Azure プレビューの追加の使用条件」に記載されています。 この特定のプレビューについては、「Microsoft HDInsight on AKS のプレビュー情報」を参照してください。 質問や機能の提案については、詳細を記載した要求を AskHDInsight で送信してください。また、その他の更新については、Azure HDInsight コミュニティのフォローをお願いいたします。

Apache Flink と Delta Lake を組み合わせて使用することで、信頼性と拡張性に優れたデータ レイクハウス アーキテクチャを作成できます。 Flink/Delta コネクタを使用すると、ACID トランザクションと 1 回の処理で Delta テーブルにデータを書き込めます。 つまり、チェックポイントから Flink パイプラインを再起動した場合でも、データ ストリームの一貫性が確保され、エラーが発生しません。 Flink/Delta コネクタを使用すると、データが失われたり重複したりせず、Flink セマンティクスと一致することが保証されます。

この記事では、Flink-Delta コネクタの使用方法について説明します。

  • デルタ テーブルからデータを読み取る。
  • デルタ テーブルにデータを書き込む。
  • それを Power BI で表示する。

Flink/Delta コネクタとは

Flink/Delta Connector は、Delta スタンドアロン JVM ライブラリを使用して Apache Flink アプリケーションから Delta テーブルにデータを読み書きするための JVM ライブラリです。 コネクタは、厳密に 1 回の配信を保証します。

Flink/Delta コネクタには次のものが含まれます。

Apache Flink から Delta テーブルにデータを書き込むための DeltaSink。 Apache Flink を使用して Delta テーブルを読み取るための DeltaSource。

Apache Flink-Delta コネクタには、次のものが含まれます。

コネクタのバージョンに応じて、次の Apache Flink バージョンで使用できます。

Connector's version	    Flink's version
0.4.x (Sink Only)	    1.12.0 <= X <= 1.14.5
0.5.0	                1.13.0 <= X <= 1.13.6
0.6.0	                X >= 1.15.3 
0.7.0	                X >= 1.16.1         --- We use this in Flink 1.17.0

前提条件

  • AKS 上の HDInsight Flink 1.17.0 クラスター
  • Flink-Delta コネクタ 0.7.0
  • MSI を使用して ADLS Gen2 にアクセスする
  • 開発用 IntelliJ

デルタ テーブルからデータを読み取る

デルタ ソースは、次の 2 つのモードのいずれかで動作します。

  • 境界モード: 特定のテーブル バージョンについてのみ Delta テーブルの内容を読み取るバッチ ジョブに適しています。 DeltaSource.forBoundedRowData API を使って、このモードのソースを作成します。

  • 継続モード: Delta テーブルで新しい変更とバージョンを継続的に確認する、ストリーミング ジョブに適しています。 DeltaSource.forContinuousRowData API を使って、このモードのソースを作成します。

例: 境界モードですべての列を読み取るための Delta テーブルのソース作成。 バッチ ジョブに適しています。 この例では、最新のテーブル バージョンを読み込みます。

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define the source Delta table path
        String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";

        // Create a bounded Delta source for all columns
        DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);

    public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
            StreamExecutionEnvironment env,
            String deltaTablePath) {

        DeltaSource<RowData> deltaSource = DeltaSource
                .forBoundedRowData(
                        new Path(deltaTablePath),
                        new Configuration())
                .build();

        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
    }

Delta シンクへの書き込み

Delta シンクでは現在、次の Flink メトリックが公開されています。

Flink メトリックのテーブルを示すスクリーンショット。

パーティション分割されていないテーブルのシンクの作成

この例では、DeltaSink を作成し、既存の org.apache.flink.streaming.api.datastream.DataStream に接続する方法を示します。

import io.delta.flink.sink.DeltaSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

        // Define the sink Delta table path
        String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";

        // Define the source Delta table path
        RowType rowType = RowType.of(
                DataTypes.STRING().getLogicalType(),  // Date
                DataTypes.STRING().getLogicalType(),  // Time
                DataTypes.STRING().getLogicalType(),  // TargetTemp
                DataTypes.STRING().getLogicalType(),  // ActualTemp
                DataTypes.STRING().getLogicalType(),  // System
                DataTypes.STRING().getLogicalType(),  // SystemAge
                DataTypes.STRING().getLogicalType()   // BuildingID
        );

       createDeltaSink(deltaStream, deltaTablePath_sink, rowType);

public static DataStream<RowData> createDeltaSink(
            DataStream<RowData> stream,
            String deltaTablePath,
            RowType rowType) {
        DeltaSink<RowData> deltaSink = DeltaSink
                .forRowData(
                        new Path(deltaTablePath),
                        new Configuration(),
                        rowType)
                .build();
        stream.sinkTo(deltaSink);
        return stream;
    }

コード全体

デルタ テーブルとシンクから別のデルタ テーブルにデータを読み取ります。

package contoso.example;

import io.delta.flink.sink.DeltaSink;
import io.delta.flink.source.DeltaSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

public class DeltaSourceExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define the sink Delta table path
        String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";

        // Define the source Delta table path
        String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";

        // Define the source Delta table path
        RowType rowType = RowType.of(
                DataTypes.STRING().getLogicalType(),  // Date
                DataTypes.STRING().getLogicalType(),  // Time
                DataTypes.STRING().getLogicalType(),  // TargetTemp
                DataTypes.STRING().getLogicalType(),  // ActualTemp
                DataTypes.STRING().getLogicalType(),  // System
                DataTypes.STRING().getLogicalType(),  // SystemAge
                DataTypes.STRING().getLogicalType()   // BuildingID
        );

        // Create a bounded Delta source for all columns
        DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);

        createDeltaSink(deltaStream, deltaTablePath_sink, rowType);

        // Execute the Flink job
        env.execute("Delta datasource and sink Example");
    }

    public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
            StreamExecutionEnvironment env,
            String deltaTablePath) {

        DeltaSource<RowData> deltaSource = DeltaSource
                .forBoundedRowData(
                        new Path(deltaTablePath),
                        new Configuration())
                .build();

        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
    }

    public static DataStream<RowData> createDeltaSink(
            DataStream<RowData> stream,
            String deltaTablePath,
            RowType rowType) {
        DeltaSink<RowData> deltaSink = DeltaSink
                .forRowData(
                        new Path(deltaTablePath),
                        new Configuration(),
                        rowType)
                .build();
        stream.sinkTo(deltaSink);
        return stream;
    }
}

Maven Pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>contoso.example</groupId>
    <artifactId>FlinkDeltaDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <hadoop-version>3.3.4</hadoop-version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-standalone_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-flink</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-parquet</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
  1. jar を ABFS にアップロードします。 アプリ モードの jar ファイルを示すスクリーンショット。

  2. AppMode クラスター内のジョブ jar 情報を渡します。

    クラスターの構成を示すスクリーンショット。

    Note

    ADLS への読み取り/書き込み中は常に hadoop.classpath.enable を有効にします。

  3. クラスターにジョブを送信すると、Flink UI でそのジョブが表示されます。

    Flink ダッシュボードを示すスクリーンショット。

  4. ADLS で結果を検索します。

    出力を示すスクリーンショット。

Power BI 統合

データが Delta シンクに格納されたら、Power BI Desktop でクエリを実行し、レポートを作成できます。

  1. Power BI Desktop を開き、ADLS Gen2 コネクタを使ってデータを取得します。

    Power BI Desktop を示すスクリーンショット。

    ADLSGen 2 コネクタを示すスクリーンショット。

  2. ストレージ アカウントの URL。

    ストレージ アカウントの URL を示すスクリーンショット。

    ADLS Gen2 の詳細を示すスクリーンショット。

  3. ソースの M クエリを作成し、ストレージ アカウントからデータを照会する関数を呼び出します。

  4. データを使用できるようになったら、レポートを作成できます。

    レポートの作成方法を示すスクリーンショット。

関連情報