Flink/Delta Bağlayıcısı'nı kullanma
Önemli
Bu özellik şu anda önizlemededir. Microsoft Azure Önizlemeleri için Ek Kullanım Koşulları, beta, önizleme aşamasında olan veya henüz genel kullanıma sunulmamış Azure özellikleri için geçerli olan daha fazla yasal hüküm içerir. Bu belirli önizleme hakkında bilgi için bkz . AKS üzerinde Azure HDInsight önizleme bilgileri. Sorular veya özellik önerileri için lütfen AskHDInsight'ta ayrıntıları içeren bir istek gönderin ve Azure HDInsight Topluluğu hakkında daha fazla güncelleştirme için bizi takip edin.
Apache Flink ve Delta Lake'i birlikte kullanarak güvenilir ve ölçeklenebilir bir data lakehouse mimarisi oluşturabilirsiniz. Flink/Delta Bağlayıcısı, ACID işlemleriyle ve tam olarak bir kez işleme ile Delta tablolarına veri yazmanızı sağlar. Bu, Flink işlem hattınızı bir denetim noktasından yeniden başlatsanız bile veri akışlarınızın tutarlı ve hatasız olduğu anlamına gelir. Flink/Delta Bağlayıcısı, verilerinizin kaybolmamasını veya çoğaltılmamasını ve Flink semantiğiyle eşleşmesini sağlar.
Bu makalede Flink-Delta bağlayıcısını kullanmayı öğreneceksiniz.
- Delta tablosundaki verileri okuyun.
- Verileri bir delta tablosuna yazın.
- Power BI'da sorgulayın.
Flink/Delta bağlayıcısı nedir?
Flink/Delta Connector, Tek Başına Delta JVM kitaplığını kullanarak Apache Flink uygulamalarından Delta tablolarına veri okumak ve yazmak için bir JVM kitaplığıdır. Bağlayıcı, tam olarak bir kez teslim garantisi sağlar.
Flink/Delta Connector şunları içerir:
Apache Flink'ten Delta tablosuna veri yazmak için DeltaSink. Apache Flink kullanarak Delta tablolarını okumak için DeltaSource.
Apache Flink-Delta Bağlayıcısı şunları içerir:
Bağlayıcının sürümüne bağlı olarak, bunu aşağıdaki Apache Flink sürümleriyle kullanabilirsiniz:
Connector's version Flink's version
0.4.x (Sink Only) 1.12.0 <= X <= 1.14.5
0.5.0 1.13.0 <= X <= 1.13.6
0.6.0 X >= 1.15.3
0.7.0 X >= 1.16.1 --- We use this in Flink 1.17.0
Önkoşullar
- AKS üzerinde HDInsight Flink 1.17.0 kümesi
- Flink-Delta Bağlayıcısı 0.7.0
- ADLS 2. Nesil'e erişmek için MSI kullanma
- Geliştirme için IntelliJ
Delta tablosundan veri okuma
Delta Kaynağı, aşağıdaki gibi açıklanan iki moddan birinde çalışabilir.
Sınırlanmış Mod Yalnızca belirli bir tablo sürümü için Delta tablosunun içeriğini okumak istediğimiz toplu işler için uygundur. DeltaSource.forBoundedRowData API'sini kullanarak bu modun kaynağını oluşturun.
Sürekli Mod Yeni değişiklikler ve sürümler için Delta tablosunu sürekli denetlemek istediğimiz akış işleri için uygundur. DeltaSource.forContinuousRowData API'sini kullanarak bu modun kaynağını oluşturun.
Örnek: Sınırlanmış modda tüm sütunları okumak için Delta tablosu için kaynak oluşturma. Toplu işler için uygundur. Bu örnek en son tablo sürümünü yükler.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Define the source Delta table path
String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";
// Create a bounded Delta source for all columns
DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);
public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
StreamExecutionEnvironment env,
String deltaTablePath) {
DeltaSource<RowData> deltaSource = DeltaSource
.forBoundedRowData(
new Path(deltaTablePath),
new Configuration())
.build();
return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
}
Delta havuzuna yazma
Delta Havuzu şu anda aşağıdaki Flink ölçümlerini kullanıma sunar:
Bölümlenmemiş tablolar için havuz oluşturma
Bu örnekte DeltaSink oluşturma ve var olan org.apache.flink.streaming.api.datastream.DataStream
bir öğesine bağlama gösterilmektedir.
import io.delta.flink.sink.DeltaSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;
// Define the sink Delta table path
String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";
// Define the source Delta table path
RowType rowType = RowType.of(
DataTypes.STRING().getLogicalType(), // Date
DataTypes.STRING().getLogicalType(), // Time
DataTypes.STRING().getLogicalType(), // TargetTemp
DataTypes.STRING().getLogicalType(), // ActualTemp
DataTypes.STRING().getLogicalType(), // System
DataTypes.STRING().getLogicalType(), // SystemAge
DataTypes.STRING().getLogicalType() // BuildingID
);
createDeltaSink(deltaStream, deltaTablePath_sink, rowType);
public static DataStream<RowData> createDeltaSink(
DataStream<RowData> stream,
String deltaTablePath,
RowType rowType) {
DeltaSink<RowData> deltaSink = DeltaSink
.forRowData(
new Path(deltaTablePath),
new Configuration(),
rowType)
.build();
stream.sinkTo(deltaSink);
return stream;
}
Tam kod
Delta tablosundaki ve havuzdaki verileri başka bir delta tablosuna okuma.
package contoso.example;
import io.delta.flink.sink.DeltaSink;
import io.delta.flink.source.DeltaSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;
public class DeltaSourceExample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Define the sink Delta table path
String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";
// Define the source Delta table path
String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";
// Define the source Delta table path
RowType rowType = RowType.of(
DataTypes.STRING().getLogicalType(), // Date
DataTypes.STRING().getLogicalType(), // Time
DataTypes.STRING().getLogicalType(), // TargetTemp
DataTypes.STRING().getLogicalType(), // ActualTemp
DataTypes.STRING().getLogicalType(), // System
DataTypes.STRING().getLogicalType(), // SystemAge
DataTypes.STRING().getLogicalType() // BuildingID
);
// Create a bounded Delta source for all columns
DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);
createDeltaSink(deltaStream, deltaTablePath_sink, rowType);
// Execute the Flink job
env.execute("Delta datasource and sink Example");
}
public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
StreamExecutionEnvironment env,
String deltaTablePath) {
DeltaSource<RowData> deltaSource = DeltaSource
.forBoundedRowData(
new Path(deltaTablePath),
new Configuration())
.build();
return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
}
public static DataStream<RowData> createDeltaSink(
DataStream<RowData> stream,
String deltaTablePath,
RowType rowType) {
DeltaSink<RowData> deltaSink = DeltaSink
.forRowData(
new Path(deltaTablePath),
new Configuration(),
rowType)
.build();
stream.sinkTo(deltaSink);
return stream;
}
}
Maven Pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>contoso.example</groupId>
<artifactId>FlinkDeltaDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.17.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<hadoop-version>3.3.4</hadoop-version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-standalone_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-flink</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop-version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Jar dosyasını paketleyin ve çalıştırmak için Flink kümesine gönderin
AppMode kümesinde iş jar bilgilerini geçirin.
Not
ADLS'de okurken/yazarken her zaman etkinleştirin
hadoop.classpath.enable
.Kümeyi gönderin, işi Flink kullanıcı arabiriminde görebilmeniz gerekir.
ADLS'de Sonuçları Bul.
Power BI tümleştirmesi
Veriler delta havuzuna girdikten sonra sorguyu Power BI desktop'ta çalıştırabilir ve bir rapor oluşturabilirsiniz.
ADLS 2. Nesil bağlayıcısını kullanarak verileri almak için Power BI desktop'ı açın.
Depolama hesabının URL'si.
Kaynak için M sorgusu oluşturun ve depolama hesabından verileri sorgulayan işlevini çağırın.
Veriler kullanıma sunulduktan sonra rapor oluşturabilirsiniz.
Başvurular
Geri Bildirim
https://aka.ms/ContentUserFeedback.
Çok yakında: 2024 boyunca, içerik için geri bildirim mekanizması olarak GitHub Sorunları’nı kullanımdan kaldıracak ve yeni bir geri bildirim sistemiyle değiştireceğiz. Daha fazla bilgi için bkz.Gönderin ve geri bildirimi görüntüleyin