Udostępnij za pośrednictwem


Jak używać łącznika Flink/Delta

Uwaga

Wycofamy usługę Azure HDInsight w usłudze AKS 31 stycznia 2025 r. Przed 31 stycznia 2025 r. należy przeprowadzić migrację obciążeń do usługi Microsoft Fabric lub równoważnego produktu platformy Azure, aby uniknąć nagłego zakończenia obciążeń. Pozostałe klastry w ramach subskrypcji zostaną zatrzymane i usunięte z hosta.

Tylko podstawowa pomoc techniczna będzie dostępna do daty wycofania.

Ważne

Ta funkcja jest aktualnie dostępna jako funkcja podglądu. Dodatkowe warunki użytkowania dla wersji zapoznawczych platformy Microsoft Azure obejmują więcej warunków prawnych, które dotyczą funkcji platformy Azure, które znajdują się w wersji beta, w wersji zapoznawczej lub w inny sposób nie zostały jeszcze wydane w wersji ogólnodostępnej. Aby uzyskać informacje o tej konkretnej wersji zapoznawczej, zobacz Informacje o wersji zapoznawczej usługi Azure HDInsight w usłudze AKS. W przypadku pytań lub sugestii dotyczących funkcji prześlij żądanie w usłudze AskHDInsight , aby uzyskać szczegółowe informacje i postępuj zgodnie z nami, aby uzyskać więcej aktualizacji w społeczności usługi Azure HDInsight.

Korzystając ze sobą usług Apache Flink i Delta Lake, można utworzyć niezawodną i skalowalną architekturę usługi Data Lakehouse. Łącznik Flink/Delta umożliwia zapisywanie danych w tabelach delty przy użyciu transakcji ACID i dokładnie raz przetwarzania. Oznacza to, że strumienie danych są spójne i wolne od błędów, nawet jeśli ponownie uruchomisz potok Flink z punktu kontrolnego. Łącznik Flink/Delta zapewnia, że dane nie zostaną utracone ani zduplikowane oraz że są zgodne z semantykami Flink.

Z tego artykułu dowiesz się, jak używać łącznika Flink-Delta.

  • Odczytywanie danych z tabeli delty.
  • Zapisywanie danych w tabeli delty.
  • Wykonaj zapytanie w usłudze Power BI.

Co to jest łącznik Flink/Delta

Flink/Delta Connector to biblioteka JVM do odczytywania i zapisywania danych z aplikacji Apache Flink do tabel delty korzystających z biblioteki Delta Standalone JVM. Łącznik zapewnia dokładnie jednokrotne gwarancje dostarczania.

Łącznik Flink/Delta obejmuje następujące elementy:

DeltaSink do zapisywania danych z platformy Apache Flink do tabeli delty. Usługa DeltaSource do odczytywania tabel delty przy użyciu narzędzia Apache Flink.

Łącznik apache Flink-Delta obejmuje:

W zależności od wersji łącznika można go używać z następującymi wersjami narzędzia Apache Flink:

Connector's version	    Flink's version
0.4.x (Sink Only)	    1.12.0 <= X <= 1.14.5
0.5.0	                1.13.0 <= X <= 1.13.6
0.6.0	                X >= 1.15.3 
0.7.0	                X >= 1.16.1         --- We use this in Flink 1.17.0

Wymagania wstępne

  • Klaster usługi HDInsight Flink 1.17.0 w usłudze AKS
  • Łącznik Flink-Delta Connector 0.7.0
  • Uzyskiwanie dostępu do usługi ADLS Gen2 przy użyciu tożsamości usługi ZARZĄDZANEj
  • IntelliJ na potrzeby programowania

Odczytywanie danych z tabeli delty

Źródło różnicowe może działać w jednym z dwóch trybów opisanych w następujący sposób.

  • Tryb ograniczony odpowiedni dla zadań wsadowych, w którym chcemy odczytywać zawartość tabeli delty tylko dla określonej wersji tabeli. Utwórz źródło tego trybu przy użyciu interfejsu API DeltaSource.forBoundedRowData.

  • Tryb ciągły odpowiedni dla zadań przesyłania strumieniowego, w którym chcemy stale sprawdzać tabelę delty pod kątem nowych zmian i wersji. Utwórz źródło tego trybu przy użyciu interfejsu API DeltaSource.forContinuousRowData.

Przykład: Tworzenie źródła dla tabeli delty w celu odczytania wszystkich kolumn w trybie ograniczonym. Nadaje się do zadań wsadowych. W tym przykładzie ładuje najnowszą wersję tabeli.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define the source Delta table path
        String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";

        // Create a bounded Delta source for all columns
        DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);

    public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
            StreamExecutionEnvironment env,
            String deltaTablePath) {

        DeltaSource<RowData> deltaSource = DeltaSource
                .forBoundedRowData(
                        new Path(deltaTablePath),
                        new Configuration())
                .build();

        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
    }

Zapisywanie w ujściu delty

Ujście delty uwidacznia obecnie następujące metryki linku Flink:

Zrzut ekranu przedstawiający tabelę metryk Flink.

Tworzenie ujścia dla tabel niepartycyjnych

W tym przykładzie pokazano, jak utworzyć obiekt DeltaSink i podłączyć go do istniejącego org.apache.flink.streaming.api.datastream.DataStreamelementu .

import io.delta.flink.sink.DeltaSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

        // Define the sink Delta table path
        String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";

        // Define the source Delta table path
        RowType rowType = RowType.of(
                DataTypes.STRING().getLogicalType(),  // Date
                DataTypes.STRING().getLogicalType(),  // Time
                DataTypes.STRING().getLogicalType(),  // TargetTemp
                DataTypes.STRING().getLogicalType(),  // ActualTemp
                DataTypes.STRING().getLogicalType(),  // System
                DataTypes.STRING().getLogicalType(),  // SystemAge
                DataTypes.STRING().getLogicalType()   // BuildingID
        );

       createDeltaSink(deltaStream, deltaTablePath_sink, rowType);

public static DataStream<RowData> createDeltaSink(
            DataStream<RowData> stream,
            String deltaTablePath,
            RowType rowType) {
        DeltaSink<RowData> deltaSink = DeltaSink
                .forRowData(
                        new Path(deltaTablePath),
                        new Configuration(),
                        rowType)
                .build();
        stream.sinkTo(deltaSink);
        return stream;
    }

Pełny kod

Odczytywanie danych z tabeli delty i ujście do innej tabeli różnicowej.

package contoso.example;

import io.delta.flink.sink.DeltaSink;
import io.delta.flink.source.DeltaSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

public class DeltaSourceExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define the sink Delta table path
        String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";

        // Define the source Delta table path
        String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";

        // Define the source Delta table path
        RowType rowType = RowType.of(
                DataTypes.STRING().getLogicalType(),  // Date
                DataTypes.STRING().getLogicalType(),  // Time
                DataTypes.STRING().getLogicalType(),  // TargetTemp
                DataTypes.STRING().getLogicalType(),  // ActualTemp
                DataTypes.STRING().getLogicalType(),  // System
                DataTypes.STRING().getLogicalType(),  // SystemAge
                DataTypes.STRING().getLogicalType()   // BuildingID
        );

        // Create a bounded Delta source for all columns
        DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);

        createDeltaSink(deltaStream, deltaTablePath_sink, rowType);

        // Execute the Flink job
        env.execute("Delta datasource and sink Example");
    }

    public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
            StreamExecutionEnvironment env,
            String deltaTablePath) {

        DeltaSource<RowData> deltaSource = DeltaSource
                .forBoundedRowData(
                        new Path(deltaTablePath),
                        new Configuration())
                .build();

        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
    }

    public static DataStream<RowData> createDeltaSink(
            DataStream<RowData> stream,
            String deltaTablePath,
            RowType rowType) {
        DeltaSink<RowData> deltaSink = DeltaSink
                .forRowData(
                        new Path(deltaTablePath),
                        new Configuration(),
                        rowType)
                .build();
        stream.sinkTo(deltaSink);
        return stream;
    }
}

Maven Pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>contoso.example</groupId>
    <artifactId>FlinkDeltaDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <hadoop-version>3.3.4</hadoop-version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-standalone_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-flink</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-parquet</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
  1. Przekaż plik jar do systemu ABFS. Zrzut ekranu przedstawiający pliki jar trybu aplikacji.

  2. Przekaż informacje o pliku jar zadania w klastrze AppMode.

    Zrzut ekranu przedstawiający konfigurację klastra.

    Uwaga

    Zawsze włączaj hadoop.classpath.enable podczas odczytu/zapisu w usłudze ADLS.

  3. Prześlij klaster. Powinno być możliwe wyświetlenie zadania w interfejsie użytkownika Flink.

    Zrzut ekranu przedstawiający pulpit nawigacyjny Flink.

  4. Znajdź wyniki w usłudze ADLS.

    Zrzut ekranu przedstawiający dane wyjściowe.

Integracja usługi Power BI

Gdy dane są w ujściu różnicowym, możesz uruchomić zapytanie w programie Power BI Desktop i utworzyć raport.

  1. Otwórz program Power BI Desktop, aby pobrać dane przy użyciu łącznika usługi ADLS Gen2.

    Zrzut ekranu przedstawiający program Power BI Desktop.

    Zrzut ekranu przedstawiający łącznik usługi ADLSGen 2.

  2. Adres URL konta magazynu.

    Zrzut ekranu przedstawiający adres URL konta magazynu.

    Zrzut ekranu przedstawiający szczegóły usługi ADLS Gen2.

  3. Utwórz zapytanie M dla źródła i wywołaj funkcję, która wysyła zapytania do danych z konta magazynu.

  4. Gdy dane będą łatwo dostępne, możesz tworzyć raporty.

    Zrzut ekranu przedstawiający sposób tworzenia raportów.

Informacje