Jak používat Připojení or Flink/Delta

Důležité

Tato funkce je aktuálně dostupná jako ukázková verze. Doplňkové podmínky použití pro Microsoft Azure Preview obsahují další právní podmínky, které se vztahují na funkce Azure, které jsou v beta verzi, ve verzi Preview nebo ještě nejsou vydány v obecné dostupnosti. Informace o této konkrétní verzi Preview najdete v tématu Azure HDInsight o službě AKS ve verzi Preview. Pokud máte dotazy nebo návrhy funkcí, odešlete prosím žádost na AskHDInsight s podrobnostmi a sledujte nás o dalších aktualizacích v komunitě Azure HDInsight.

Pomocí Apache Flinku a Delta Lake společně můžete vytvořit spolehlivou a škálovatelnou architekturu datového jezerahouse. Flink/Delta Připojení or umožňuje zapisovat data do tabulek Delta s transakcemi ACID a přesně jednou zpracovat. To znamená, že datové streamy jsou konzistentní a bez chyb, i když kanál Flink restartujete z kontrolního bodu. Flink/Delta Připojení or zajišťuje, že vaše data nebudou ztracena nebo duplikována a že odpovídají sémantice Flink.

V tomto článku se dozvíte, jak používat konektor Flink-Delta.

  • Přečtěte si data z tabulky Delta.
  • Zapište data do tabulky Delta.
  • Dotazujte se na ni v Power BI.

Co je konektor Flink/Delta

Flink/Delta Připojení or je knihovna JVM pro čtení a zápis dat z aplikací Apache Flink do tabulek Delta využívajících knihovnu Delta Standalone JVM. Konektor poskytuje přesně jednou záruku doručení.

Flink/Delta Připojení or zahrnuje:

DeltaSink pro zápis dat z Apache Flink do tabulky Delta DeltaSource pro čtení tabulek Delta pomocí Apache Flinku

Apache Flink-Delta Připojení or zahrnuje:

V závislosti na verzi konektoru ho můžete použít s následujícími verzemi Apache Flink:

Connector's version	    Flink's version
0.4.x (Sink Only)	    1.12.0 <= X <= 1.14.5
0.5.0	                1.13.0 <= X <= 1.13.6
0.6.0	                X >= 1.15.3 
0.7.0	                X >= 1.16.1         --- We use this in Flink 1.17.0

Další informace najdete v tématu Flink/Delta Připojení or.

Požadavky

  • Cluster HDInsight Flink 1.17.0 v AKS
  • Flink-Delta Připojení or 0.7.0
  • Použití MSI pro přístup k ADLS Gen2
  • IntelliJ pro vývoj

Čtení dat z tabulky Delta

Rozdílový zdroj může fungovat v jednom ze dvou režimů, jak je popsáno níže.

  • Ohraničený režim vhodný pro dávkové úlohy, kde chceme číst obsah tabulky Delta pouze pro konkrétní verzi tabulky. Vytvořte zdroj tohoto režimu pomocí rozhraní DeltaSource.forBoundedRowData API.

  • Průběžný režim vhodný pro úlohy streamování, kde chceme průběžně kontrolovat nové změny a verze tabulky Delta. Vytvořte zdroj tohoto režimu pomocí rozhraní DeltaSource.forContinuousRowData API.

Příklad: Vytvoření zdroje pro tabulku Delta pro čtení všech sloupců v vázaném režimu Vhodné pro dávkové úlohy. Tento příklad načte nejnovější verzi tabulky.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define the source Delta table path
        String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";

        // Create a bounded Delta source for all columns
        DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);

    public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
            StreamExecutionEnvironment env,
            String deltaTablePath) {

        DeltaSource<RowData> deltaSource = DeltaSource
                .forBoundedRowData(
                        new Path(deltaTablePath),
                        new Configuration())
                .build();

        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
    }

Další příklad průběžného modelu najdete v tématu Režimy zdroje dat.

Zápis do jímky Delta

Delta Sink v současné době zveřejňuje následující metriky Flink:

Snímek obrazovky zobrazující tabulku pro metriky Flink

Vytvoření jímky pro tabulky bez oddílů

V tomto příkladu si ukážeme, jak vytvořit DeltaSink a připojit ho k existujícímu org.apache.flink.streaming.api.datastream.DataStream.

import io.delta.flink.sink.DeltaSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

        // Define the sink Delta table path
        String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";

        // Define the source Delta table path
        RowType rowType = RowType.of(
                DataTypes.STRING().getLogicalType(),  // Date
                DataTypes.STRING().getLogicalType(),  // Time
                DataTypes.STRING().getLogicalType(),  // TargetTemp
                DataTypes.STRING().getLogicalType(),  // ActualTemp
                DataTypes.STRING().getLogicalType(),  // System
                DataTypes.STRING().getLogicalType(),  // SystemAge
                DataTypes.STRING().getLogicalType()   // BuildingID
        );

       createDeltaSink(deltaStream, deltaTablePath_sink, rowType);

public static DataStream<RowData> createDeltaSink(
            DataStream<RowData> stream,
            String deltaTablePath,
            RowType rowType) {
        DeltaSink<RowData> deltaSink = DeltaSink
                .forRowData(
                        new Path(deltaTablePath),
                        new Configuration(),
                        rowType)
                .build();
        stream.sinkTo(deltaSink);
        return stream;
    }

Další příklad vytvoření jímky najdete v tématu Metriky jímky dat.

Úplný kód

Čtení dat z tabulky Delta a jímky do jiné tabulky delta

package contoso.example;

import io.delta.flink.sink.DeltaSink;
import io.delta.flink.source.DeltaSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

public class DeltaSourceExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define the sink Delta table path
        String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";

        // Define the source Delta table path
        String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";

        // Define the source Delta table path
        RowType rowType = RowType.of(
                DataTypes.STRING().getLogicalType(),  // Date
                DataTypes.STRING().getLogicalType(),  // Time
                DataTypes.STRING().getLogicalType(),  // TargetTemp
                DataTypes.STRING().getLogicalType(),  // ActualTemp
                DataTypes.STRING().getLogicalType(),  // System
                DataTypes.STRING().getLogicalType(),  // SystemAge
                DataTypes.STRING().getLogicalType()   // BuildingID
        );

        // Create a bounded Delta source for all columns
        DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);

        createDeltaSink(deltaStream, deltaTablePath_sink, rowType);

        // Execute the Flink job
        env.execute("Delta datasource and sink Example");
    }

    public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
            StreamExecutionEnvironment env,
            String deltaTablePath) {

        DeltaSource<RowData> deltaSource = DeltaSource
                .forBoundedRowData(
                        new Path(deltaTablePath),
                        new Configuration())
                .build();

        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
    }

    public static DataStream<RowData> createDeltaSink(
            DataStream<RowData> stream,
            String deltaTablePath,
            RowType rowType) {
        DeltaSink<RowData> deltaSink = DeltaSink
                .forRowData(
                        new Path(deltaTablePath),
                        new Configuration(),
                        rowType)
                .build();
        stream.sinkTo(deltaSink);
        return stream;
    }
}

Pom.xml Mavenu

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>contoso.example</groupId>
    <artifactId>FlinkDeltaDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <hadoop-version>3.3.4</hadoop-version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-standalone_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-flink</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-parquet</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
  1. Nahrajte soubor JAR do ABFS. Snímek obrazovky zobrazující soubory JAR v režimu aplikace

  2. Předejte informace o souboru JAR úlohy v clusteru AppMode.

    Snímek obrazovky znázorňující konfiguraci clusteru

    Poznámka:

    Při čtení a zápisu do ADLS vždy povolte hadoop.classpath.enable .

  3. Odešlete cluster, měli byste být schopni zobrazit úlohu v uživatelském rozhraní Flink.

    Snímek obrazovky s řídicím panelem Flink

  4. Výsledky najdete v ADLS.

    Snímek obrazovky znázorňující výstup

Integrace Power BI

Jakmile jsou data v delta jímce, můžete dotaz spustit v Power BI Desktopu a vytvořit sestavu.

  1. Otevřete Power BI Desktop a získejte data pomocí konektoru ADLS Gen2.

    Snímek obrazovky znázorňující Power BI Desktop

    Snímek obrazovky znázorňující konektor ADLSGen 2

  2. Adresa URL účtu úložiště

    Snímek obrazovky s adresou URL účtu úložiště

    Snímek obrazovky znázorňující podrobnosti ADLS Gen2

  3. Vytvořte dotaz M pro zdroj a vyvoláte funkci, která dotazuje data z účtu úložiště. Další informace najdete v konektorech Delta Power BI.

  4. Jakmile jsou data snadno dostupná, můžete vytvářet sestavy.

    Snímek obrazovky ukazuje, jak vytvářet sestavy.

Reference