Incorporate Apache Flink® DataStream into Azure Databricks Delta Lake Tables

This example shows how to sink streaming data from an Apache Flink cluster on HDInsight on AKS into Azure Data Lake Storage (ADLS) Gen2, and then load that data into Delta Lake tables using Azure Databricks Auto Loader.

Prerequisites

Azure Databricks Auto Loader

Databricks Auto Loader makes it easy to stream data that Flink applications land in object storage into Delta Lake tables. Auto Loader provides a Structured Streaming source called cloudFiles.

The following steps show how to use data from Flink in Azure Databricks Delta Live Tables.

In this step, you create a Kafka table and an ADLS Gen2 table in Flink SQL. This document uses a table named airplanes_state_real_time. You can use any topic of your choice.

Update the broker IP addresses in the code snippet to match your Kafka cluster.

CREATE TABLE kafka_airplanes_state_real_time (
   `date` STRING,
   `geo_altitude` FLOAT,
   `icao24` STRING,
   `latitude` FLOAT,
   `true_track` FLOAT,
   `velocity` FLOAT,
   `spi` BOOLEAN,
   `origin_country` STRING,
   `minute` STRING,
   `squawk` STRING,
   `sensors` STRING,
   `hour` STRING,
   `baro_altitude` FLOAT,
   `time_position` BIGINT,
   `last_contact` BIGINT,
   `callsign` STRING,
   `event_time` STRING,
   `on_ground` BOOLEAN,
   `category` STRING,
   `vertical_rate` FLOAT,
   `position_source` INT,
   `current_time` STRING,
   `longitude` FLOAT
) WITH (
    'connector' = 'kafka',
    'topic' = 'airplanes_state_real_time',
    'scan.startup.mode' = 'latest-offset',
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
    'format' = 'json'
);

Next, create the ADLS Gen2 table in Flink SQL.

Update the container-name and storage-account-name in the code snippet with your ADLS Gen2 details.

CREATE TABLE adlsgen2_airplanes_state_real_time (
  `date` STRING,
  `geo_altitude` FLOAT,
  `icao24` STRING,
  `latitude` FLOAT,
  `true_track` FLOAT,
  `velocity` FLOAT,
  `spi` BOOLEAN,
  `origin_country` STRING,
  `minute` STRING,
  `squawk` STRING,
  `sensors` STRING,
  `hour` STRING,
  `baro_altitude` FLOAT,
  `time_position` BIGINT,
  `last_contact` BIGINT,
  `callsign` STRING,
  `event_time` STRING,
  `on_ground` BOOLEAN,
  `category` STRING,
  `vertical_rate` FLOAT,
  `position_source` INT,
  `current_time` STRING,
  `longitude` FLOAT
) WITH (
    'connector' = 'filesystem',
    'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
    'format' = 'json'
);

Then, insert the Kafka table into the ADLS Gen2 table in Flink SQL.
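
The statement itself appears only in the screenshot, so the following is a minimal sketch of what such an insert could look like. It assumes the two tables defined earlier keep identical column lists in the same order. Submitting it in the Flink SQL client starts a continuous streaming job.

```sql
-- Sketch only: copies every row arriving on the Kafka-backed table
-- into the filesystem (ADLS Gen2) table. Both table names come from
-- the CREATE TABLE statements earlier in this article.
INSERT INTO adlsgen2_airplanes_state_real_time
SELECT * FROM kafka_airplanes_state_real_time;
```

Selecting with * relies on both tables declaring the same columns in the same order; if your schemas differ, list the columns explicitly. In streaming mode, Flink's filesystem sink generally finalizes files when a checkpoint completes, so output files may not appear in storage until checkpointing is enabled and at least one checkpoint has finished.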

Screenshot shows inserting the Kafka table into the ADLS Gen2 table.

Screenshot shows validating the streaming job on Flink.

Check the data sunk from Kafka in Azure Storage on the Azure portal

Screenshot shows checking the data sunk from Kafka in Azure Storage.

Authentication of Azure Storage and Azure Databricks notebook

ADLS Gen2 supports OAuth 2.0 authentication with a Microsoft Entra application service principal. You use the service principal to authenticate from an Azure Databricks notebook and then mount the storage into Azure Databricks DBFS.

First, get the service principal's app ID, tenant ID, and secret key.

Screenshot shows getting the service principal app ID, tenant ID, and secret key.

Grant the service principal the Storage Blob Data Owner role on the Azure portal

Screenshot shows granting the service principal the Storage Blob Data Owner role on the Azure portal.

Mount ADLS Gen2 into DBFS in an Azure Databricks notebook

Screenshot shows mounting ADLS Gen2 into DBFS in an Azure Databricks notebook.

Prepare notebook

Let's write the following code:

%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")
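
For text-based formats such as JSON, Auto Loader infers every column as a string unless told otherwise. If you want inferred numeric and boolean types for fields such as latitude or on_ground, a variant along the following lines may help. This is a sketch, not the article's pipeline definition: the table name airplanes_state_real_time_typed is hypothetical, and the path is assumed to be the same mount path used in the statement above.

```sql
-- Sketch only: same source as the statement above, with Auto Loader
-- asked to infer column types instead of reading every JSON field
-- as a string. The table name is a placeholder for illustration.
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time_typed
AS SELECT *
FROM cloud_files(
  "dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/",
  "json",
  map("cloudFiles.inferColumnTypes", "true")
);
```

Inferred types depend on the sampled data, so they may not match the FLOAT and BIGINT declarations of the Flink tables exactly; verify the resulting schema before relying on it.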

Define the Delta Live Tables pipeline and run it on Azure Databricks

Screenshot shows defining the Delta Live Tables pipeline on Azure Databricks.

Screenshot shows running the Delta Live Tables pipeline on Azure Databricks.

Check the Delta Live Table in an Azure Databricks notebook

Screenshot shows checking the Delta Live Table in an Azure Databricks notebook.
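
One way to perform this check is to query the streaming table from a notebook cell. The following sketch assumes the pipeline has already run at least once and that the table is reachable under the name used in the pipeline definition. Depending on how the pipeline's target schema is configured, you may need to qualify the table name.

```sql
-- Sketch only: peek at a few ingested rows.
-- Qualify the table with your pipeline's target schema if needed.
SELECT icao24, callsign, origin_country, latitude, longitude, velocity
FROM airplanes_state_real_time2
LIMIT 10;
```

The column names are taken from the Flink table definitions earlier in this article; adjust the list if your topic carries different fields.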
