分享方式:


將 Apache Flink® DataStream 納入 Azure Databricks Delta Lake 數據表

此範例示範如何使用 Azure Databricks 自動載入器,從 AKS 上 HDInsight 上的 Apache Flink 叢集將 Azure ADLS Gen2 中的數據接收至 Delta Lake 數據表。

必要條件

Azure Databricks 自動載入器

Databricks 自動載入器可讓您輕鬆地將數據從 Flink 應用程式串流到 Delta Lake 資料表中的物件記憶體。 自動載入器 提供名為 cloudFiles 的結構化串流來源。

以下是如何在 Azure Databricks 差異實時數據表中使用來自 Flink 的數據的步驟。

在此步驟中,您可以在 Flink SQL 上建立 Kafka 數據表和 ADLS Gen2。 在本檔案中,我們使用 airplanes_state_real_time table。 您可以使用您選擇的任何文章。

您需要使用代碼段中的 Kafka 叢集來更新訊息代理程式 IP。

CREATE TABLE kafka_airplanes_state_real_time (
   `date` STRING,
   `geo_altitude` FLOAT,
   `icao24` STRING,
   `latitude` FLOAT,
   `true_track` FLOAT,
   `velocity` FLOAT,
   `spi` BOOLEAN,
   `origin_country` STRING,
   `minute` STRING,
   `squawk` STRING,
   `sensors` STRING,
   `hour` STRING,
   `baro_altitude` FLOAT,
   `time_position` BIGINT,
   `last_contact` BIGINT,
   `callsign` STRING,
   `event_time` STRING,
   `on_ground` BOOLEAN,
   `category` STRING,
   `vertical_rate` FLOAT,
   `position_source` INT,
   `current_time` STRING,
   `longitude` FLOAT
 ) WITH (
    'connector' = 'kafka',  
    'topic' = 'airplanes_state_real_time',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

接下來,您可以在 Flink SQL 上建立 ADLSgen2 數據表。

使用您的 ADLS Gen2 詳細數據,更新代碼段中的 container-name 和 storage-account-name。

CREATE TABLE adlsgen2_airplanes_state_real_time (
  `date` STRING,
  `geo_altitude` FLOAT,
  `icao24` STRING,
  `latitude` FLOAT,
  `true_track` FLOAT,
  `velocity` FLOAT,
  `spi` BOOLEAN,
  `origin_country` STRING,
  `minute` STRING,
  `squawk` STRING,
  `sensors` STRING,
  `hour` STRING,
  `baro_altitude` FLOAT,
  `time_position` BIGINT,
  `last_contact` BIGINT,
  `callsign` STRING,
  `event_time` STRING,
  `on_ground` BOOLEAN,
  `category` STRING,
  `vertical_rate` FLOAT,
  `position_source` INT,
  `current_time` STRING,
  `longitude` FLOAT
) WITH (
    'connector' = 'filesystem',
    'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
    'format' = 'json'
);

此外,您可以將 Kafka 數據表插入 Flink SQL 上的 ADLSgen2 資料表。

顯示將 Kafka 資料表插入 ADLSgen2 資料表的螢幕快照。

顯示驗證 Flink 上串流作業的螢幕快照。

在 Azure 入口網站 的 Azure 儲存體 中檢查 Kafka 的數據接收

顯示 Azure 儲存體 上 Kafka 檢查數據接收的螢幕快照。

驗證 Azure 儲存體 和 Azure Databricks 筆記本

ADLS Gen2 為您的 Microsoft Entra 應用程式服務主體提供 OAuth 2.0,以從 Azure Databricks Notebook 進行驗證,然後掛接至 Azure Databricks DBFS。

讓我們取得服務主體 appid、租使用者標識碼和秘密密鑰。

顯示取得服務主體 appid、租使用者標識碼和秘密密鑰的螢幕快照。

在 Azure 入口網站 上授與服務主體 儲存體 Blob 數據擁有者

此螢幕快照顯示 Azure 入口網站 上 儲存體 Blob 數據擁有者的服務主體。

在 Azure Databricks Notebook 上將 ADLS Gen2 掛接至 DBFS

顯示 Azure Databricks Notebook 上將 ADLS Gen2 掛接至 DBFS 的螢幕快照。

準備筆記本

讓我們撰寫下列程式代碼:

%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")

定義 Delta Live Table Pipeline 並在 Azure Databricks 上執行

顯示 Delta Live Table Pipeline 並在 Azure Databricks 上執行的螢幕快照。

顯示 Delta Live Table Pipeline 並在 Azure Databricks 上執行的螢幕快照。

檢查 Azure Databricks Notebook 上的 Delta Live Table

顯示 Azure Databricks Notebook 上差異實時數據表的螢幕快照。

參考