將 Apache Flink® DataStream 納入 Azure Databricks Delta Lake 數據表
此範例示範如何使用 Azure Databricks 自動載入器,從 AKS 上 HDInsight 上的 Apache Flink 叢集將 Azure ADLS Gen2 中的數據接收至 Delta Lake 數據表。
必要條件
- AKS 上的 HDInsight 上的 Apache Flink 1.17.0
- HDInsight 上的 Apache Kafka 3.2
- AKS 上與 HDInsight 相同虛擬網路中的 Azure Databricks
- ADLS Gen2 和服務主體
Azure Databricks 自動載入器
Databricks 自動載入器可讓您輕鬆地將數據從 Flink 應用程式串流到 Delta Lake 資料表中的物件記憶體。 自動載入器 提供名為 cloudFiles 的結構化串流來源。
以下是如何在 Azure Databricks 差異實時數據表中使用來自 Flink 的數據的步驟。
在 Apache Flink® SQL 上建立 Apache Kafka® 數據表
在此步驟中,您可以在 Flink SQL 上建立 Kafka 數據表和 ADLS Gen2。 在本檔案中,我們使用 airplanes_state_real_time table
。 您可以使用您選擇的任何文章。
您需要使用代碼段中的 Kafka 叢集來更新訊息代理程式 IP。
CREATE TABLE kafka_airplanes_state_real_time (
`date` STRING,
`geo_altitude` FLOAT,
`icao24` STRING,
`latitude` FLOAT,
`true_track` FLOAT,
`velocity` FLOAT,
`spi` BOOLEAN,
`origin_country` STRING,
`minute` STRING,
`squawk` STRING,
`sensors` STRING,
`hour` STRING,
`baro_altitude` FLOAT,
`time_position` BIGINT,
`last_contact` BIGINT,
`callsign` STRING,
`event_time` STRING,
`on_ground` BOOLEAN,
`category` STRING,
`vertical_rate` FLOAT,
`position_source` INT,
`current_time` STRING,
`longitude` FLOAT
) WITH (
'connector' = 'kafka',
'topic' = 'airplanes_state_real_time',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
接下來,您可以在 Flink SQL 上建立 ADLSgen2 數據表。
使用您的 ADLS Gen2 詳細數據,更新代碼段中的 container-name 和 storage-account-name。
CREATE TABLE adlsgen2_airplanes_state_real_time (
`date` STRING,
`geo_altitude` FLOAT,
`icao24` STRING,
`latitude` FLOAT,
`true_track` FLOAT,
`velocity` FLOAT,
`spi` BOOLEAN,
`origin_country` STRING,
`minute` STRING,
`squawk` STRING,
`sensors` STRING,
`hour` STRING,
`baro_altitude` FLOAT,
`time_position` BIGINT,
`last_contact` BIGINT,
`callsign` STRING,
`event_time` STRING,
`on_ground` BOOLEAN,
`category` STRING,
`vertical_rate` FLOAT,
`position_source` INT,
`current_time` STRING,
`longitude` FLOAT
) WITH (
'connector' = 'filesystem',
'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
'format' = 'json'
);
此外,您可以將 Kafka 數據表插入 Flink SQL 上的 ADLSgen2 資料表。
驗證 Flink 上的串流作業
在 Azure 入口網站 的 Azure 儲存體 中檢查 Kafka 的數據接收
驗證 Azure 儲存體 和 Azure Databricks 筆記本
ADLS Gen2 為您的 Microsoft Entra 應用程式服務主體提供 OAuth 2.0,以從 Azure Databricks Notebook 進行驗證,然後掛接至 Azure Databricks DBFS。
讓我們取得服務主體 appid、租使用者標識碼和秘密密鑰。
在 Azure 入口網站 上授與服務主體 儲存體 Blob 數據擁有者
在 Azure Databricks Notebook 上將 ADLS Gen2 掛接至 DBFS
準備筆記本
讓我們撰寫下列程式代碼:
%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")
定義 Delta Live Table Pipeline 並在 Azure Databricks 上執行
檢查 Azure Databricks Notebook 上的 Delta Live Table
參考
- Apache、Apache Kafka、Kafka、Apache Flink、Flink 和相關聯的 開放原始碼 項目名稱是 Apache Software Foundation (ASF) 的商標。