Share via


Az Flink/Delta Csatlakozás or használata

Fontos

Ez a szolgáltatás jelenleg előzetes kiadásban elérhető. A Microsoft Azure Előzetes verzió kiegészítő használati feltételei további jogi feltételeket tartalmaznak, amelyek a bétaverzióban, előzetes verzióban vagy egyébként még nem általánosan elérhető Azure-funkciókra vonatkoznak. Erről az adott előzetes verzióról az Azure HDInsight az AKS előzetes verziójában tájékozódhat. Ha kérdése vagy funkciójavaslata van, küldjön egy kérést az AskHDInsightban a részletekkel együtt, és kövessen minket további frissítésekért az Azure HDInsight-közösségről.

Az Apache Flink és a Delta Lake együttes használatával megbízható és méretezhető data lakehouse-architektúrát hozhat létre. A Flink/Delta Csatlakozás or lehetővé teszi, hogy acid-tranzakciókkal és pontosan feldolgozás után adatokat írjon Delta-táblákba. Ez azt jelenti, hogy az adatfolyamok konzisztensek és hibamentesek, még akkor is, ha egy ellenőrzőpontról újraindítja az Flink-folyamatot. A Flink/Delta Csatlakozás or biztosítja, hogy az adatok ne vesszenek el vagy duplikálva legyen, és hogy megegyezzenek a Flink szemantikával.

Ebből a cikkből megtudhatja, hogyan használhatja a Flink-Delta összekötőt.

  • Olvassa el az adatokat a delta táblából.
  • Írja be az adatokat egy deltatáblába.
  • Lekérdezés a Power BI-ban.

Mi az a Flink/Delta-összekötő?

A Flink/Delta Csatlakozás or egy JVM-kódtár, amely adatokat olvas be és ír az Apache Flink-alkalmazásokból a Delta-táblákba a Delta önálló JVM-kódtár használatával. Az összekötő pontosan egyszer biztosítja a kézbesítési garanciát.

A Flink/Delta Csatlakozás or a következőket tartalmazza:

DeltaSink az Apache Flinkből egy Delta-táblába való adatíráshoz. DeltaSource a Delta-táblák Apache Flink használatával történő olvasásához.

Az Apache Flink-Delta Csatlakozás or a következőket tartalmazza:

Az összekötő verziójától függően a következő Apache Flink-verziókkal használhatja:

Connector's version	    Flink's version
0.4.x (Sink Only)	    1.12.0 <= X <= 1.14.5
0.5.0	                1.13.0 <= X <= 1.13.6
0.6.0	                X >= 1.15.3 
0.7.0	                X >= 1.16.1         --- We use this in Flink 1.17.0

További információ: Flink/Delta Csatlakozás or.

Előfeltételek

  • HDInsight Flink 1.17.0-fürt az AKS-en
  • Flink-Delta Csatlakozás or 0.7.0
  • Az MSI használata az ADLS Gen2 eléréséhez
  • IntelliJ fejlesztéshez

Adatok olvasása a deltatáblából

A Delta Source két mód egyikében működik, az alábbiak szerint.

  • Kötegfeladatokhoz alkalmas határolt mód, ahol csak adott táblaverzióhoz szeretnénk olvasni a Delta-tábla tartalmát. Ennek a módnak a forrását a DeltaSource.forBoundedRowData API használatával hozhatja létre.

  • Folyamatos mód alkalmas streamelési feladatokhoz, ahol folyamatosan ellenőrizni szeretnénk a Delta táblát az új módosítások és verziók keresése érdekében. Hozzon létre egy forrást ennek a módnak a DeltaSource.forContinuousRowData API-val.

Példa: A Delta-tábla forrásának létrehozása az összes oszlop beolvasásához határolókeretes módban. Kötegelt feladatokhoz alkalmas. Ez a példa betölti a legújabb táblaverziót.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define the source Delta table path
        String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";

        // Create a bounded Delta source for all columns
        DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);

    public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
            StreamExecutionEnvironment env,
            String deltaTablePath) {

        DeltaSource<RowData> deltaSource = DeltaSource
                .forBoundedRowData(
                        new Path(deltaTablePath),
                        new Configuration())
                .build();

        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
    }

További folyamatos modellért lásd az Adatforrás módokat.

Írás a Delta-fogadóba

A Delta Sink jelenleg a következő Flink-metrikákat teszi elérhetővé:

Képernyőkép a Flink-metrikák tábláról.

Fogadó létrehozása nem particionált táblákhoz

Ebben a példában bemutatjuk, hogyan hozhat létre DeltaSinket, és hogyan csatlakoztathatja egy meglévőhöz org.apache.flink.streaming.api.datastream.DataStream.

import io.delta.flink.sink.DeltaSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

        // Define the sink Delta table path
        String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";

        // Define the source Delta table path
        RowType rowType = RowType.of(
                DataTypes.STRING().getLogicalType(),  // Date
                DataTypes.STRING().getLogicalType(),  // Time
                DataTypes.STRING().getLogicalType(),  // TargetTemp
                DataTypes.STRING().getLogicalType(),  // ActualTemp
                DataTypes.STRING().getLogicalType(),  // System
                DataTypes.STRING().getLogicalType(),  // SystemAge
                DataTypes.STRING().getLogicalType()   // BuildingID
        );

       createDeltaSink(deltaStream, deltaTablePath_sink, rowType);

public static DataStream<RowData> createDeltaSink(
            DataStream<RowData> stream,
            String deltaTablePath,
            RowType rowType) {
        DeltaSink<RowData> deltaSink = DeltaSink
                .forRowData(
                        new Path(deltaTablePath),
                        new Configuration(),
                        rowType)
                .build();
        stream.sinkTo(deltaSink);
        return stream;
    }

További fogadólétrehozás-példa: Data Sink Metrics.

Teljes kód

Adatok beolvasása deltatáblából és fogadóból egy másik deltatáblába.

package contoso.example;

import io.delta.flink.sink.DeltaSink;
import io.delta.flink.source.DeltaSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

public class DeltaSourceExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define the sink Delta table path
        String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";

        // Define the source Delta table path
        String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";

        // Define the source Delta table path
        RowType rowType = RowType.of(
                DataTypes.STRING().getLogicalType(),  // Date
                DataTypes.STRING().getLogicalType(),  // Time
                DataTypes.STRING().getLogicalType(),  // TargetTemp
                DataTypes.STRING().getLogicalType(),  // ActualTemp
                DataTypes.STRING().getLogicalType(),  // System
                DataTypes.STRING().getLogicalType(),  // SystemAge
                DataTypes.STRING().getLogicalType()   // BuildingID
        );

        // Create a bounded Delta source for all columns
        DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);

        createDeltaSink(deltaStream, deltaTablePath_sink, rowType);

        // Execute the Flink job
        env.execute("Delta datasource and sink Example");
    }

    public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
            StreamExecutionEnvironment env,
            String deltaTablePath) {

        DeltaSource<RowData> deltaSource = DeltaSource
                .forBoundedRowData(
                        new Path(deltaTablePath),
                        new Configuration())
                .build();

        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
    }

    public static DataStream<RowData> createDeltaSink(
            DataStream<RowData> stream,
            String deltaTablePath,
            RowType rowType) {
        DeltaSink<RowData> deltaSink = DeltaSink
                .forRowData(
                        new Path(deltaTablePath),
                        new Configuration(),
                        rowType)
                .build();
        stream.sinkTo(deltaSink);
        return stream;
    }
}

Maven Pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>contoso.example</groupId>
    <artifactId>FlinkDeltaDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <hadoop-version>3.3.4</hadoop-version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-standalone_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-flink</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-parquet</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
  1. Töltse fel az üveget az ABFS-be. Képernyőkép az alkalmazásmód jar-fájljairól.

  2. Adja át a feladat jar-adatait az AppMode-fürtben.

    Képernyőkép a fürtkonfigurációról.

    Feljegyzés

    Mindig engedélyezze az hadoop.classpath.enable ADLS-be való olvasás/írás közben.

  3. Küldje el a fürtöt, látnia kell a feladatot a Flink felhasználói felületén.

    Képernyőkép a Flink irányítópultról.

  4. Találatok keresése az ADLS-ben.

    Képernyőkép a kimenetről.

Power BI-integráció

Ha az adatok a delta fogadóban lesznek, futtathatja a lekérdezést a Power BI Desktopban, és létrehozhat egy jelentést.

  1. Nyissa meg a Power BI desktopot az adatok ADLS Gen2-összekötővel való lekéréséhez.

    Képernyőkép a Power BI desktopról.

    Képernyőkép az ADLSGen 2-összekötőről.

  2. A tárfiók URL-címe.

    Képernyőkép a tárfiók URL-címéről.

    Képernyőkép az ADLS Gen2-részletekről.

  3. Hozzon létre M-lekérdezést a forráshoz, és hívja meg a függvényt, amely lekérdezi az adatokat a tárfiókból. Tekintse meg a Delta Power BI-összekötőket.

  4. Miután az adatok könnyen elérhetők, jelentéseket hozhat létre.

    Képernyőkép a jelentések létrehozásáról.

Hivatkozások