Så här använder du Flink/Delta Anslut or

Viktigt!

Den här funktionen finns i förhandsgranskning. De kompletterande användningsvillkoren för Förhandsversioner av Microsoft Azure innehåller fler juridiska villkor som gäller för Azure-funktioner som är i betaversion, förhandsversion eller på annat sätt ännu inte har släppts i allmän tillgänglighet. Information om den här specifika förhandsversionen finns i Azure HDInsight på AKS-förhandsversionsinformation. Om du vill ha frågor eller funktionsförslag skickar du en begäran på AskHDInsight med informationen och följer oss för fler uppdateringar i Azure HDInsight Community.

Genom att använda Apache Flink och Delta Lake tillsammans kan du skapa en tillförlitlig och skalbar data lakehouse-arkitektur. Med Flink/Delta-Anslut eller kan du skriva data till Delta-tabeller med ACID-transaktioner och exakt en gång bearbetning. Det innebär att dina dataströmmar är konsekventa och felfria, även om du startar om Flink-pipelinen från en kontrollpunkt. Flink/Delta-Anslut eller säkerställer att dina data inte tappas bort eller dupliceras och att de matchar Flink-semantiken.

I den här artikeln får du lära dig hur du använder Flink-Delta-anslutningsappen.

  • Läs data från deltatabellen.
  • Skriv data till en deltatabell.
  • Fråga den i Power BI.

Vad är Flink/Delta-anslutningsprogram

Flink/Delta Anslut or är ett JVM-bibliotek för att läsa och skriva data från Apache Flink-program till Delta-tabeller med hjälp av Delta Standalone JVM-biblioteket. Anslutningsappen ger leveransgarantier exakt en gång.

Flink/Delta Anslut eller innehåller:

DeltaSink för att skriva data från Apache Flink till en Delta-tabell. DeltaSource för att läsa Delta-tabeller med Apache Flink.

Apache Flink-Delta-Anslut eller innehåller:

Beroende på vilken version av anslutningsappen du kan använda med följande Apache Flink-versioner:

Connector's version	    Flink's version
0.4.x (Sink Only)	    1.12.0 <= X <= 1.14.5
0.5.0	                1.13.0 <= X <= 1.13.6
0.6.0	                X >= 1.15.3 
0.7.0	                X >= 1.16.1         --- We use this in Flink 1.17.0

Mer information finns i Flink/Delta Anslut or.

Förutsättningar

  • HDInsight Flink 1.17.0-kluster på AKS
  • Flink-Delta Anslut eller 0.7.0
  • Använda MSI för att komma åt ADLS Gen2
  • IntelliJ för utveckling

Läsa data från deltatabell

Delta Source kan fungera i något av två lägen, vilket beskrivs på följande sätt.

  • Begränsat läge Lämpligt för batchjobb, där vi endast vill läsa innehållet i Delta-tabellen för specifik tabellversion. Skapa en källa för det här läget med hjälp av API:et DeltaSource.forBoundedRowData.

  • Kontinuerligt läge Lämpligt för direktuppspelningsjobb, där vi kontinuerligt vill kontrollera Delta-tabellen för nya ändringar och versioner. Skapa en källa för det här läget med hjälp av API:et DeltaSource.forContinuousRowData.

Exempel: Skapa källa för Delta-tabellen för att läsa alla kolumner i begränsat läge. Lämplig för batchjobb. Det här exemplet läser in den senaste tabellversionen.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define the source Delta table path
        String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";

        // Create a bounded Delta source for all columns
        DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);

    public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
            StreamExecutionEnvironment env,
            String deltaTablePath) {

        DeltaSource<RowData> deltaSource = DeltaSource
                .forBoundedRowData(
                        new Path(deltaTablePath),
                        new Configuration())
                .build();

        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
    }

Andra exempel på kontinuerliga modeller finns i Lägen för datakälla.

Skriva till Delta-mottagare

Delta Sink exponerar för närvarande följande Flink-mått:

Skärmbild som visar tabellen för Flink-mått.

Skapa mottagare för icke-partitionerade tabeller

I det här exemplet visar vi hur du skapar en DeltaSink och ansluter den till en befintlig org.apache.flink.streaming.api.datastream.DataStream.

import io.delta.flink.sink.DeltaSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

        // Define the sink Delta table path
        String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";

        // Define the source Delta table path
        RowType rowType = RowType.of(
                DataTypes.STRING().getLogicalType(),  // Date
                DataTypes.STRING().getLogicalType(),  // Time
                DataTypes.STRING().getLogicalType(),  // TargetTemp
                DataTypes.STRING().getLogicalType(),  // ActualTemp
                DataTypes.STRING().getLogicalType(),  // System
                DataTypes.STRING().getLogicalType(),  // SystemAge
                DataTypes.STRING().getLogicalType()   // BuildingID
        );

       createDeltaSink(deltaStream, deltaTablePath_sink, rowType);

public static DataStream<RowData> createDeltaSink(
            DataStream<RowData> stream,
            String deltaTablePath,
            RowType rowType) {
        DeltaSink<RowData> deltaSink = DeltaSink
                .forRowData(
                        new Path(deltaTablePath),
                        new Configuration(),
                        rowType)
                .build();
        stream.sinkTo(deltaSink);
        return stream;
    }

Andra exempel på skapande av mottagare finns i Mått för datamottagare.

Fullständig kod

Läsa data från en deltatabell och mottagare till en annan deltatabell.

package contoso.example;

import io.delta.flink.sink.DeltaSink;
import io.delta.flink.source.DeltaSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

public class DeltaSourceExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define the sink Delta table path
        String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";

        // Define the source Delta table path
        String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";

        // Define the source Delta table path
        RowType rowType = RowType.of(
                DataTypes.STRING().getLogicalType(),  // Date
                DataTypes.STRING().getLogicalType(),  // Time
                DataTypes.STRING().getLogicalType(),  // TargetTemp
                DataTypes.STRING().getLogicalType(),  // ActualTemp
                DataTypes.STRING().getLogicalType(),  // System
                DataTypes.STRING().getLogicalType(),  // SystemAge
                DataTypes.STRING().getLogicalType()   // BuildingID
        );

        // Create a bounded Delta source for all columns
        DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);

        createDeltaSink(deltaStream, deltaTablePath_sink, rowType);

        // Execute the Flink job
        env.execute("Delta datasource and sink Example");
    }

    public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
            StreamExecutionEnvironment env,
            String deltaTablePath) {

        DeltaSource<RowData> deltaSource = DeltaSource
                .forBoundedRowData(
                        new Path(deltaTablePath),
                        new Configuration())
                .build();

        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
    }

    public static DataStream<RowData> createDeltaSink(
            DataStream<RowData> stream,
            String deltaTablePath,
            RowType rowType) {
        DeltaSink<RowData> deltaSink = DeltaSink
                .forRowData(
                        new Path(deltaTablePath),
                        new Configuration(),
                        rowType)
                .build();
        stream.sinkTo(deltaSink);
        return stream;
    }
}

Maven Pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>contoso.example</groupId>
    <artifactId>FlinkDeltaDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <hadoop-version>3.3.4</hadoop-version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-standalone_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-flink</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-parquet</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
  1. Ladda upp jar-filen till ABFS. Skärmbild som visar jar-filer i appläge.

  2. Skicka information om jobbburken i AppMode-klustret.

    Skärmbild som visar klusterkonfiguration.

    Kommentar

    Aktivera hadoop.classpath.enable alltid när du läser/skriver till ADLS.

  3. Skicka klustret. Du bör kunna se jobbet i Flink-användargränssnittet.

    Skärmbild som visar Flink-instrumentpanelen.

  4. Hitta resultat i ADLS.

    Skärmbild som visar utdata.

Power BI-integrering

När data finns i deltamottagaren kan du köra frågan i Power BI Desktop och skapa en rapport.

  1. Öppna Power BI Desktop för att hämta data med hjälp av ADLS Gen2-anslutningsappen.

    Skärmbild som visar Power BI Desktop.

    Skärmbild som visar anslutningsappen ADLSGen 2.

  2. URL för lagringskontot.

    Skärmbild som visar URL:en för lagringskontot.

    Skärmbild som visar ADLS Gen2-information.

  3. Skapa M-fråga för källan och anropa funktionen, som frågar efter data från lagringskontot. Se Delta Power BI-anslutningsappar.

  4. När data är lättillgängliga kan du skapa rapporter.

    Skärmbild som visar hur du skapar rapporter.

Referenser