Så här använder du Flink/Delta Anslut or
Viktigt!
Den här funktionen finns i förhandsgranskning. De kompletterande användningsvillkoren för Förhandsversioner av Microsoft Azure innehåller fler juridiska villkor som gäller för Azure-funktioner som är i betaversion, förhandsversion eller på annat sätt ännu inte har släppts i allmän tillgänglighet. Information om den här specifika förhandsversionen finns i Azure HDInsight på AKS-förhandsversionsinformation. Om du vill ha frågor eller funktionsförslag skickar du en begäran på AskHDInsight med informationen och följer oss för fler uppdateringar i Azure HDInsight Community.
Genom att använda Apache Flink och Delta Lake tillsammans kan du skapa en tillförlitlig och skalbar data lakehouse-arkitektur. Med Flink/Delta-Anslut eller kan du skriva data till Delta-tabeller med ACID-transaktioner och exakt en gång bearbetning. Det innebär att dina dataströmmar är konsekventa och felfria, även om du startar om Flink-pipelinen från en kontrollpunkt. Flink/Delta-Anslut eller säkerställer att dina data inte tappas bort eller dupliceras och att de matchar Flink-semantiken.
I den här artikeln får du lära dig hur du använder Flink-Delta-anslutningsappen.
- Läs data från deltatabellen.
- Skriv data till en deltatabell.
- Fråga den i Power BI.
Vad är Flink/Delta-anslutningsprogram
Flink/Delta Anslut or är ett JVM-bibliotek för att läsa och skriva data från Apache Flink-program till Delta-tabeller med hjälp av Delta Standalone JVM-biblioteket. Anslutningsappen ger leveransgarantier exakt en gång.
Flink/Delta Anslut eller innehåller:
DeltaSink för att skriva data från Apache Flink till en Delta-tabell. DeltaSource för att läsa Delta-tabeller med Apache Flink.
Apache Flink-Delta-Anslut eller innehåller:
Beroende på vilken version av anslutningsappen du kan använda med följande Apache Flink-versioner:
Connector's version Flink's version
0.4.x (Sink Only) 1.12.0 <= X <= 1.14.5
0.5.0 1.13.0 <= X <= 1.13.6
0.6.0 X >= 1.15.3
0.7.0 X >= 1.16.1 --- We use this in Flink 1.17.0
Mer information finns i Flink/Delta Anslut or.
Förutsättningar
- HDInsight Flink 1.17.0-kluster på AKS
- Flink-Delta Anslut eller 0.7.0
- Använda MSI för att komma åt ADLS Gen2
- IntelliJ för utveckling
Läsa data från deltatabell
Delta Source kan fungera i något av två lägen, vilket beskrivs på följande sätt.
Begränsat läge Lämpligt för batchjobb, där vi endast vill läsa innehållet i Delta-tabellen för specifik tabellversion. Skapa en källa för det här läget med hjälp av API:et DeltaSource.forBoundedRowData.
Kontinuerligt läge Lämpligt för direktuppspelningsjobb, där vi kontinuerligt vill kontrollera Delta-tabellen för nya ändringar och versioner. Skapa en källa för det här läget med hjälp av API:et DeltaSource.forContinuousRowData.
Exempel: Skapa källa för Delta-tabellen för att läsa alla kolumner i begränsat läge. Lämplig för batchjobb. Det här exemplet läser in den senaste tabellversionen.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Define the source Delta table path
String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";
// Create a bounded Delta source for all columns
DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);
public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
StreamExecutionEnvironment env,
String deltaTablePath) {
DeltaSource<RowData> deltaSource = DeltaSource
.forBoundedRowData(
new Path(deltaTablePath),
new Configuration())
.build();
return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
}
Andra exempel på kontinuerliga modeller finns i Lägen för datakälla.
Skriva till Delta-mottagare
Delta Sink exponerar för närvarande följande Flink-mått:
Skapa mottagare för icke-partitionerade tabeller
I det här exemplet visar vi hur du skapar en DeltaSink och ansluter den till en befintlig org.apache.flink.streaming.api.datastream.DataStream
.
import io.delta.flink.sink.DeltaSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;
// Define the sink Delta table path
String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";
// Define the source Delta table path
RowType rowType = RowType.of(
DataTypes.STRING().getLogicalType(), // Date
DataTypes.STRING().getLogicalType(), // Time
DataTypes.STRING().getLogicalType(), // TargetTemp
DataTypes.STRING().getLogicalType(), // ActualTemp
DataTypes.STRING().getLogicalType(), // System
DataTypes.STRING().getLogicalType(), // SystemAge
DataTypes.STRING().getLogicalType() // BuildingID
);
createDeltaSink(deltaStream, deltaTablePath_sink, rowType);
public static DataStream<RowData> createDeltaSink(
DataStream<RowData> stream,
String deltaTablePath,
RowType rowType) {
DeltaSink<RowData> deltaSink = DeltaSink
.forRowData(
new Path(deltaTablePath),
new Configuration(),
rowType)
.build();
stream.sinkTo(deltaSink);
return stream;
}
Andra exempel på skapande av mottagare finns i Mått för datamottagare.
Fullständig kod
Läsa data från en deltatabell och mottagare till en annan deltatabell.
package contoso.example;
import io.delta.flink.sink.DeltaSink;
import io.delta.flink.source.DeltaSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;
public class DeltaSourceExample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Define the sink Delta table path
String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";
// Define the source Delta table path
String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";
// Define the source Delta table path
RowType rowType = RowType.of(
DataTypes.STRING().getLogicalType(), // Date
DataTypes.STRING().getLogicalType(), // Time
DataTypes.STRING().getLogicalType(), // TargetTemp
DataTypes.STRING().getLogicalType(), // ActualTemp
DataTypes.STRING().getLogicalType(), // System
DataTypes.STRING().getLogicalType(), // SystemAge
DataTypes.STRING().getLogicalType() // BuildingID
);
// Create a bounded Delta source for all columns
DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);
createDeltaSink(deltaStream, deltaTablePath_sink, rowType);
// Execute the Flink job
env.execute("Delta datasource and sink Example");
}
public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
StreamExecutionEnvironment env,
String deltaTablePath) {
DeltaSource<RowData> deltaSource = DeltaSource
.forBoundedRowData(
new Path(deltaTablePath),
new Configuration())
.build();
return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
}
public static DataStream<RowData> createDeltaSink(
DataStream<RowData> stream,
String deltaTablePath,
RowType rowType) {
DeltaSink<RowData> deltaSink = DeltaSink
.forRowData(
new Path(deltaTablePath),
new Configuration(),
rowType)
.build();
stream.sinkTo(deltaSink);
return stream;
}
}
Maven Pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>contoso.example</groupId>
<artifactId>FlinkDeltaDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.17.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<hadoop-version>3.3.4</hadoop-version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-standalone_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-flink</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop-version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Paketera jar-filen och skicka den till Flink-klustret för att köra
Skicka information om jobbburken i AppMode-klustret.
Kommentar
Aktivera
hadoop.classpath.enable
alltid när du läser/skriver till ADLS.Skicka klustret. Du bör kunna se jobbet i Flink-användargränssnittet.
Hitta resultat i ADLS.
Power BI-integrering
När data finns i deltamottagaren kan du köra frågan i Power BI Desktop och skapa en rapport.
Öppna Power BI Desktop för att hämta data med hjälp av ADLS Gen2-anslutningsappen.
URL för lagringskontot.
Skapa M-fråga för källan och anropa funktionen, som frågar efter data från lagringskontot. Se Delta Power BI-anslutningsappar.
När data är lättillgängliga kan du skapa rapporter.
Referenser
- Deltakopplingar.
- Delta Power BI-anslutningsappar.
- Apache, Apache Flink, Flink och associerade öppen källkod projektnamn är varumärkensom tillhör Apache Software Foundation (ASF).