Jak používat Připojení or Flink/Delta
Důležité
Tato funkce je aktuálně dostupná jako ukázková verze. Doplňkové podmínky použití pro Microsoft Azure Preview obsahují další právní podmínky, které se vztahují na funkce Azure, které jsou v beta verzi, ve verzi Preview nebo ještě nejsou vydány v obecné dostupnosti. Informace o této konkrétní verzi Preview najdete v tématu Azure HDInsight o službě AKS ve verzi Preview. Pokud máte dotazy nebo návrhy funkcí, odešlete prosím žádost na AskHDInsight s podrobnostmi a sledujte nás o dalších aktualizacích v komunitě Azure HDInsight.
Pomocí Apache Flinku a Delta Lake společně můžete vytvořit spolehlivou a škálovatelnou architekturu datového jezerahouse. Flink/Delta Připojení or umožňuje zapisovat data do tabulek Delta s transakcemi ACID a přesně jednou zpracovat. To znamená, že datové streamy jsou konzistentní a bez chyb, i když kanál Flink restartujete z kontrolního bodu. Flink/Delta Připojení or zajišťuje, že vaše data nebudou ztracena nebo duplikována a že odpovídají sémantice Flink.
V tomto článku se dozvíte, jak používat konektor Flink-Delta.
- Přečtěte si data z tabulky Delta.
- Zapište data do tabulky Delta.
- Dotazujte se na ni v Power BI.
Co je konektor Flink/Delta
Flink/Delta Připojení or je knihovna JVM pro čtení a zápis dat z aplikací Apache Flink do tabulek Delta využívajících knihovnu Delta Standalone JVM. Konektor poskytuje přesně jednou záruku doručení.
Flink/Delta Připojení or zahrnuje:
DeltaSink pro zápis dat z Apache Flink do tabulky Delta DeltaSource pro čtení tabulek Delta pomocí Apache Flinku
Apache Flink-Delta Připojení or zahrnuje:
V závislosti na verzi konektoru ho můžete použít s následujícími verzemi Apache Flink:
Connector's version Flink's version
0.4.x (Sink Only) 1.12.0 <= X <= 1.14.5
0.5.0 1.13.0 <= X <= 1.13.6
0.6.0 X >= 1.15.3
0.7.0 X >= 1.16.1 --- We use this in Flink 1.17.0
Další informace najdete v tématu Flink/Delta Připojení or.
Požadavky
- Cluster HDInsight Flink 1.17.0 v AKS
- Flink-Delta Připojení or 0.7.0
- Použití MSI pro přístup k ADLS Gen2
- IntelliJ pro vývoj
Čtení dat z tabulky Delta
Rozdílový zdroj může fungovat v jednom ze dvou režimů, jak je popsáno níže.
Ohraničený režim vhodný pro dávkové úlohy, kde chceme číst obsah tabulky Delta pouze pro konkrétní verzi tabulky. Vytvořte zdroj tohoto režimu pomocí rozhraní DeltaSource.forBoundedRowData API.
Průběžný režim vhodný pro úlohy streamování, kde chceme průběžně kontrolovat nové změny a verze tabulky Delta. Vytvořte zdroj tohoto režimu pomocí rozhraní DeltaSource.forContinuousRowData API.
Příklad: Vytvoření zdroje pro tabulku Delta pro čtení všech sloupců v vázaném režimu Vhodné pro dávkové úlohy. Tento příklad načte nejnovější verzi tabulky.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Define the source Delta table path
String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";
// Create a bounded Delta source for all columns
DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);
public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
StreamExecutionEnvironment env,
String deltaTablePath) {
DeltaSource<RowData> deltaSource = DeltaSource
.forBoundedRowData(
new Path(deltaTablePath),
new Configuration())
.build();
return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
}
Další příklad průběžného modelu najdete v tématu Režimy zdroje dat.
Zápis do jímky Delta
Delta Sink v současné době zveřejňuje následující metriky Flink:
Vytvoření jímky pro tabulky bez oddílů
V tomto příkladu si ukážeme, jak vytvořit DeltaSink a připojit ho k existujícímu org.apache.flink.streaming.api.datastream.DataStream
.
import io.delta.flink.sink.DeltaSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;
// Define the sink Delta table path
String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";
// Define the source Delta table path
RowType rowType = RowType.of(
DataTypes.STRING().getLogicalType(), // Date
DataTypes.STRING().getLogicalType(), // Time
DataTypes.STRING().getLogicalType(), // TargetTemp
DataTypes.STRING().getLogicalType(), // ActualTemp
DataTypes.STRING().getLogicalType(), // System
DataTypes.STRING().getLogicalType(), // SystemAge
DataTypes.STRING().getLogicalType() // BuildingID
);
createDeltaSink(deltaStream, deltaTablePath_sink, rowType);
public static DataStream<RowData> createDeltaSink(
DataStream<RowData> stream,
String deltaTablePath,
RowType rowType) {
DeltaSink<RowData> deltaSink = DeltaSink
.forRowData(
new Path(deltaTablePath),
new Configuration(),
rowType)
.build();
stream.sinkTo(deltaSink);
return stream;
}
Další příklad vytvoření jímky najdete v tématu Metriky jímky dat.
Úplný kód
Čtení dat z tabulky Delta a jímky do jiné tabulky delta
package contoso.example;
import io.delta.flink.sink.DeltaSink;
import io.delta.flink.source.DeltaSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;
public class DeltaSourceExample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Define the sink Delta table path
String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";
// Define the source Delta table path
String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";
// Define the source Delta table path
RowType rowType = RowType.of(
DataTypes.STRING().getLogicalType(), // Date
DataTypes.STRING().getLogicalType(), // Time
DataTypes.STRING().getLogicalType(), // TargetTemp
DataTypes.STRING().getLogicalType(), // ActualTemp
DataTypes.STRING().getLogicalType(), // System
DataTypes.STRING().getLogicalType(), // SystemAge
DataTypes.STRING().getLogicalType() // BuildingID
);
// Create a bounded Delta source for all columns
DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);
createDeltaSink(deltaStream, deltaTablePath_sink, rowType);
// Execute the Flink job
env.execute("Delta datasource and sink Example");
}
public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
StreamExecutionEnvironment env,
String deltaTablePath) {
DeltaSource<RowData> deltaSource = DeltaSource
.forBoundedRowData(
new Path(deltaTablePath),
new Configuration())
.build();
return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
}
public static DataStream<RowData> createDeltaSink(
DataStream<RowData> stream,
String deltaTablePath,
RowType rowType) {
DeltaSink<RowData> deltaSink = DeltaSink
.forRowData(
new Path(deltaTablePath),
new Configuration(),
rowType)
.build();
stream.sinkTo(deltaSink);
return stream;
}
}
Pom.xml Mavenu
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>contoso.example</groupId>
<artifactId>FlinkDeltaDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.17.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<hadoop-version>3.3.4</hadoop-version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-standalone_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-flink</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop-version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Zabalte soubor JAR a odešlete ho do clusteru Flink a spusťte ho.
Předejte informace o souboru JAR úlohy v clusteru AppMode.
Poznámka:
Při čtení a zápisu do ADLS vždy povolte
hadoop.classpath.enable
.Odešlete cluster, měli byste být schopni zobrazit úlohu v uživatelském rozhraní Flink.
Výsledky najdete v ADLS.
Integrace Power BI
Jakmile jsou data v delta jímce, můžete dotaz spustit v Power BI Desktopu a vytvořit sestavu.
Otevřete Power BI Desktop a získejte data pomocí konektoru ADLS Gen2.
Adresa URL účtu úložiště
Vytvořte dotaz M pro zdroj a vyvoláte funkci, která dotazuje data z účtu úložiště. Další informace najdete v konektorech Delta Power BI.
Jakmile jsou data snadno dostupná, můžete vytvářet sestavy.
Reference
- Rozdílové konektory.
- Rozdílové konektory Power BI
- Názvy apache, Apache Flink, Flink a přidružených opensourcových projektů jsou ochranné známky Apache Software Foundation (ASF).