Bagikan melalui


Cara menggunakan Konektor Flink/Delta

Penting

Fitur ini masih dalam mode pratinjau. Ketentuan Penggunaan Tambahan untuk Pratinjau Microsoft Azure mencakup lebih banyak persyaratan hukum yang berlaku untuk fitur Azure yang dalam versi beta, dalam pratinjau, atau belum dirilis ke ketersediaan umum. Untuk informasi tentang pratinjau khusus ini, lihat Azure HDInsight pada informasi pratinjau AKS. Untuk pertanyaan atau saran fitur, kirimkan permintaan di AskHDInsight dengan detail dan ikuti kami untuk pembaruan lebih lanjut di Komunitas Azure HDInsight.

Dengan menggunakan Apache Flink dan Delta Lake bersama-sama, Anda dapat membuat arsitektur data lakehouse yang andal dan dapat diskalakan. Konektor Flink/Delta memungkinkan Anda menulis data ke tabel Delta dengan transaksi ACID dan pemrosesan tepat sekali. Ini berarti bahwa aliran data Anda konsisten dan bebas kesalahan, bahkan jika Anda menghidupkan ulang alur Flink Anda dari titik pemeriksaan. Konektor Flink/Delta memastikan bahwa data Anda tidak hilang atau diduplikasi, dan cocok dengan semantik Flink.

Dalam artikel ini, Anda mempelajari cara menggunakan konektor Flink-Delta.

  • Baca data dari tabel delta.
  • Tulis data ke tabel delta.
  • Mengkuerinya di Power BI.

Apa itu konektor Flink/Delta

Konektor Flink/Delta adalah pustaka JVM untuk membaca dan menulis data dari aplikasi Apache Flink ke tabel Delta yang menggunakan pustaka JVM Mandiri Delta. Konektor memberikan jaminan pengiriman tepat sekali.

Konektor Flink/Delta meliputi:

DeltaSink untuk menulis data dari Apache Flink ke tabel Delta. DeltaSource untuk membaca tabel Delta menggunakan Apache Flink.

Konektor Apache Flink-Delta meliputi:

Bergantung pada versi konektor, Anda dapat menggunakannya dengan versi Apache Flink berikut:

Connector's version	    Flink's version
0.4.x (Sink Only)	    1.12.0 <= X <= 1.14.5
0.5.0	                1.13.0 <= X <= 1.13.6
0.6.0	                X >= 1.15.3 
0.7.0	                X >= 1.16.1         --- We use this in Flink 1.17.0

Prasyarat

  • Kluster HDInsight Flink 1.17.0 pada AKS
  • Konektor Flink-Delta 0.7.0
  • Menggunakan MSI untuk mengakses ADLS Gen2
  • IntelliJ untuk pengembangan

Membaca data dari tabel delta

Sumber Delta dapat berfungsi dalam salah satu dari dua mode, yang dijelaskan sebagai berikut.

  • Mode Terikat Cocok untuk pekerjaan batch, di mana kita ingin membaca konten tabel Delta hanya untuk versi tabel tertentu. Buat sumber mode ini menggunakan API DeltaSource.forBoundedRowData.

  • Mode Berkelanjutan Cocok untuk pekerjaan streaming, di mana kita ingin terus memeriksa tabel Delta untuk perubahan dan versi baru. Buat sumber mode ini menggunakan API DeltaSource.forContinuousRowData.

Contoh: Pembuatan sumber untuk tabel Delta, untuk membaca semua kolom dalam mode terikat. Cocok untuk pekerjaan batch. Contoh ini memuat versi tabel terbaru.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define the source Delta table path
        String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";

        // Create a bounded Delta source for all columns
        DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);

    public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
            StreamExecutionEnvironment env,
            String deltaTablePath) {

        DeltaSource<RowData> deltaSource = DeltaSource
                .forBoundedRowData(
                        new Path(deltaTablePath),
                        new Configuration())
                .build();

        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
    }

Menulis ke sink Delta

Delta Sink saat ini mengekspos metrik Flink berikut:

Cuplikan layar memperlihatkan tabel untuk metrik Flink.

Pembuatan sink untuk tabel yang tidak dipartisi

Dalam contoh ini, kami menunjukkan cara membuat DeltaSink dan menyambungkannya ke org.apache.flink.streaming.api.datastream.DataStream.

import io.delta.flink.sink.DeltaSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

        // Define the sink Delta table path
        String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";

        // Define the source Delta table path
        RowType rowType = RowType.of(
                DataTypes.STRING().getLogicalType(),  // Date
                DataTypes.STRING().getLogicalType(),  // Time
                DataTypes.STRING().getLogicalType(),  // TargetTemp
                DataTypes.STRING().getLogicalType(),  // ActualTemp
                DataTypes.STRING().getLogicalType(),  // System
                DataTypes.STRING().getLogicalType(),  // SystemAge
                DataTypes.STRING().getLogicalType()   // BuildingID
        );

       createDeltaSink(deltaStream, deltaTablePath_sink, rowType);

public static DataStream<RowData> createDeltaSink(
            DataStream<RowData> stream,
            String deltaTablePath,
            RowType rowType) {
        DeltaSink<RowData> deltaSink = DeltaSink
                .forRowData(
                        new Path(deltaTablePath),
                        new Configuration(),
                        rowType)
                .build();
        stream.sinkTo(deltaSink);
        return stream;
    }

Kode lengkap

Membaca data dari tabel delta dan sink ke tabel delta lain.

package contoso.example;

import io.delta.flink.sink.DeltaSink;
import io.delta.flink.source.DeltaSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

public class DeltaSourceExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define the sink Delta table path
        String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";

        // Define the source Delta table path
        String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";

        // Define the source Delta table path
        RowType rowType = RowType.of(
                DataTypes.STRING().getLogicalType(),  // Date
                DataTypes.STRING().getLogicalType(),  // Time
                DataTypes.STRING().getLogicalType(),  // TargetTemp
                DataTypes.STRING().getLogicalType(),  // ActualTemp
                DataTypes.STRING().getLogicalType(),  // System
                DataTypes.STRING().getLogicalType(),  // SystemAge
                DataTypes.STRING().getLogicalType()   // BuildingID
        );

        // Create a bounded Delta source for all columns
        DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);

        createDeltaSink(deltaStream, deltaTablePath_sink, rowType);

        // Execute the Flink job
        env.execute("Delta datasource and sink Example");
    }

    public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
            StreamExecutionEnvironment env,
            String deltaTablePath) {

        DeltaSource<RowData> deltaSource = DeltaSource
                .forBoundedRowData(
                        new Path(deltaTablePath),
                        new Configuration())
                .build();

        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
    }

    public static DataStream<RowData> createDeltaSink(
            DataStream<RowData> stream,
            String deltaTablePath,
            RowType rowType) {
        DeltaSink<RowData> deltaSink = DeltaSink
                .forRowData(
                        new Path(deltaTablePath),
                        new Configuration(),
                        rowType)
                .build();
        stream.sinkTo(deltaSink);
        return stream;
    }
}

Maven Pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>contoso.example</groupId>
    <artifactId>FlinkDeltaDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <hadoop-version>3.3.4</hadoop-version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-standalone_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-flink</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-parquet</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
  1. Unggah jar ke ABFS. Cuplikan layar memperlihatkan file jar Mode aplikasi.

  2. Berikan informasi jar pekerjaan di kluster AppMode.

    Cuplikan layar memperlihatkan konfigurasi kluster.

    Catatan

    Selalu aktifkan hadoop.classpath.enable saat membaca/menulis ke ADLS.

  3. Kirim kluster, Anda akan dapat melihat pekerjaan di Flink UI.

    Cuplikan layar memperlihatkan dasbor Flink.

  4. Temukan Hasil di ADLS.

    Cuplikan layar memperlihatkan output.

Integrasi Power BI

Setelah data berada di sink delta, Anda bisa menjalankan kueri di desktop Power BI dan membuat laporan.

  1. Buka desktop Power BI untuk mendapatkan data menggunakan konektor ADLS Gen2.

    Cuplikan layar memperlihatkan desktop Power BI.

    Cuplikan layar memperlihatkan konektor ADLSGen 2.

  2. URL akun penyimpanan.

    Cuplikan layar memperlihatkan URL akun penyimpanan.

    Cuplikan layar memperlihatkan detail ADLS Gen2.

  3. Buat kueri M untuk sumber dan panggil fungsi , yang meminta data dari akun penyimpanan.

  4. Setelah data tersedia, Anda dapat membuat laporan.

    Cuplikan layar memperlihatkan cara membuat laporan.

Referensi