Connect to OneLake in Microsoft Fabric with HDInsight on AKS cluster for Apache Flink®
Important
This feature is currently in preview. The Supplemental Terms of Use for Microsoft Azure Previews include more legal terms that apply to Azure features that are in beta, in preview, or otherwise not yet released into general availability. For information about this specific preview, see Azure HDInsight on AKS preview information. For questions or feature suggestions, please submit a request on AskHDInsight with the details and follow us for more updates on Azure HDInsight Community.
This example demonstrates on how to use HDInsight on AKS cluster for Apache Flink® with Microsoft Fabric.
Microsoft Fabric is an all-in-one analytics solution for enterprises that covers everything from data movement to data science, Real-Time Analytics, and business intelligence. It offers a comprehensive suite of services, including data lake, data engineering, and data integration, all in one place.
With Fabric, you don't need to piece together different services from multiple vendors. Instead, you can enjoy a highly integrated, end-to-end, and easy-to-use product that is designed to simplify your analytics needs.
In this example, you learn how to connect to OneLake in Microsoft Fabric with HDInsight on AKS cluster for Apache Flink.
Prerequisites
- Flink cluster on HDInsight on AKS
- Create a License Mode of at least Premium Capacity Workspace on Power BI
- Create a Lake House on this workspace
Connect to One Lake Storage
Microsoft Fabric and Lakehouse
Lakehouse in Microsoft Fabric
Microsoft Fabric Lakehouse is a data architecture platform for storing, managing, and analyzing structured and unstructured data in a single location.
Note
Microsoft Fabric is in preview
Managed identity access to the Fabric workspace
In this step, we provide access to the user managed identity to Fabric. You're required to type the user assigned managed identity and add to your Fabric workspace.
Prepare a Delta table under LakeHouse Files folder
In this step, you see how we prepare a Delta table on the lakehouse on Microsoft Fabric; Flink developers can build into broader Lakehouse architecture with this setup.
Apache Flink DataStream Source code
In this step, we prepare the jar to submit to the HDInsight on AKS, Apache Flink cluster.
This step illustrates, that we package dependencies needed for onelakeDemo
maven pom.xml
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.17.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table</artifactId>
<version>${flink.version}</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.12.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.2.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
main: onelakeDemo
In this step, we read parquet file on Fabric lakehouse and then sink to another file in the same folder:
package contoso.example;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.AvroParquetReaders;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
public class onelakeDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Path sourcePath = new Path("abfss://contosoworkspace1@msit-onelake.dfs.fabric.microsoft.com/contosolakehouse.Lakehouse/Files/delta/tab1/");
Path sinkPath = new Path("abfss://contosoworkspace1@msit-onelake.dfs.fabric.microsoft.com/contosolakehouse.Lakehouse/Files/delta/tab1_out/");
Schema avroSchema = new Schema.Parser()
.parse("{\"type\":\"record\",\"name\":\"example\",\"fields\":[{\"name\":\"Date\",\"type\":\"string\"},{\"name\":\"Time\",\"type\":\"string\"},{\"name\":\"TargetTemp\",\"type\":\"string\"},{\"name\":\"ActualTemp\",\"type\":\"string\"},{\"name\":\"System\",\"type\":\"string\"},{\"name\":\"SystemAge\",\"type\":\"string\"},{\"name\":\"BuildingID\",\"type\":\"string\"}]}");
FileSource<GenericRecord> source =
FileSource.forRecordStreamFormat(
AvroParquetReaders.forGenericRecord(avroSchema), sourcePath)
.build();
StreamingFileSink<GenericRecord> sink =
StreamingFileSink.forBulkFormat(
sinkPath,
ParquetAvroWriters.forGenericRecord(avroSchema))
.build();
env.enableCheckpointing(10L);
DataStream<GenericRecord> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
stream.addSink((SinkFunction<GenericRecord>) sink);
env.execute();
}
}
Package the jar and upload it into Webssh and submit the job:
bin/flink run -c contoso.example.onelakeDemo -j OneLakeDemo-1.0-SNAPSHOT.jar
Check Job running on Flink UI:
Results on Microsoft Fabric
Let's check the output on Microsoft Fabric
References
- Microsoft Fabric
- Microsoft Fabric Lakehouse
- Apache, Apache Flink, Flink, and associated open source project names are trademarks of the Apache Software Foundation (ASF).
Feedback
https://aka.ms/ContentUserFeedback.
Coming soon: Throughout 2024 we will be phasing out GitHub Issues as the feedback mechanism for content and replacing it with a new feedback system. For more information see:Submit and view feedback for