使用 Apache Airflow 協調 Azure Databricks 作業

本文說明使用 Azure Databricks 協調數據管線的 Apache Airflow 支援,提供在本機安裝和設定 Airflow 的指示,並提供使用 Airflow 部署和執行 Azure Databricks 工作流程的範例。

數據管線中的作業協調流程

開發和部署數據處理管線通常需要管理工作之間的複雜相依性。 例如,管線可能會從來源讀取數據、清除數據、轉換已清除的數據,以及將轉換的數據寫入目標。 當您操作管線時,您也需要測試、排程和疑難解答錯誤的支援。

工作流程系統可讓您定義工作之間的相依性、排程管線執行的時間,以及監視工作流程,來解決這些挑戰。 Apache Airflow 是管理及排程數據管線 開放原始碼 解決方案。 數據流代表數據管線,做為作業的有向無迴圈圖(DAG)。 您可以在 Python 檔案中定義工作流程,而 Airflow 會管理排程和執行。 Airflow Azure Databricks 連線可讓您利用 Azure Databricks 所提供的優化 Spark 引擎,以及 Airflow 的排程功能。

需求

  • Airflow 與 Azure Databricks 之間的整合需要 Airflow 2.5.0 版和更新版本。 本文中的範例會使用 Airflow 2.6.1 版進行測試。
  • Airflow 需要 Python 3.8、3.9、3.10 或 3.11。 本文中的範例會使用 Python 3.8 進行測試。
  • 本文中安裝和執行 Airflow 的指示需要 pipenv 來建立 Python 虛擬環境

Databricks 的空氣流量運算符

Airflow DAG 是由工作所組成,其中每個工作都會執行 Airflow 運算符。 支援與 Databricks 整合的 Airflow 運算符會在 Databricks 提供者中實作。

Databricks 提供者包含針對 Azure Databricks 工作區執行多項工作的運算符,包括 將數據匯入數據表執行 SQL 查詢,以及使用 Databricks Git 資料夾

Databricks 提供者會實作兩個運算符來觸發作業:

若要建立新的 Azure Databricks 作業或重設現有的作業,Databricks 提供者會實作 DatabricksCreateJobsOperator。 會 DatabricksCreateJobsOperator 使用 POST /api/2.1/jobs/createPOST /api/2.1/jobs/reset API 要求。 您可以使用 DatabricksCreateJobsOperator 搭配 DatabricksRunNowOperator 來建立及執行作業。

注意

使用 Databricks 運算子來觸發作業需要提供 Databricks 連線設定中的認證。 請參閱 建立適用於 Airflow 的 Azure Databricks 個人存取令牌。

Databricks Airflow 運算符會將作業執行頁面 URL 寫入至 Airflow 記錄每一個 polling_period_seconds (預設值為 30 秒)。 如需詳細資訊,請參閱 Airflow 網站上的 apache-airflow-providers-databricks 套件頁面。

在本機安裝 Airflow Azure Databricks 整合

若要在本機安裝 Airflow 和 Databricks 提供者以進行測試和開發,請使用下列步驟。 如需其他 Airflow 安裝選項,包括建立生產安裝,請參閱 Airflow 檔中的安裝

開啟終端機,然後執行下列命令:

mkdir airflow
cd airflow
pipenv --python 3.8
pipenv shell
export AIRFLOW_HOME=$(pwd)
pipenv install apache-airflow
pipenv install apache-airflow-providers-databricks
mkdir dags
airflow db init
airflow users create --username admin --firstname <firstname> --lastname <lastname> --role Admin --email <email>

以您的使用者名稱與電子郵件取代 <firstname><lastname><email> 。 系統會提示您輸入系統管理員用戶的密碼。 請務必儲存此密碼,因為必須登入 Airflow UI。

此文稿會執行下列步驟:

  1. 建立名為 airflow 的目錄,並將變更至該目錄。
  2. 用來 pipenv 建立和繁衍 Python 虛擬環境。 Databricks 建議使用 Python 虛擬環境來隔離套件版本和程式代碼相依性。 此隔離有助於減少非預期的套件版本不符和程式代碼相依性衝突。
  3. 初始化名為 AIRFLOW_HOME 設定為目錄路徑的 airflow 環境變數。
  4. 安裝 Airflow 和 Airflow Databricks 提供者套件。
  5. 建立 airflow/dags 目錄。 Airflow 會 dags 使用 目錄來儲存 DAG 定義。
  6. 初始化 Airflow 用來追蹤元數據的 SQLite 資料庫。 在生產 Airflow 部署中,您會使用標準資料庫設定 Airflow。 您的 Airflow 部署的 SQLite 資料庫和預設組態會在 目錄中初始化 airflow
  7. 建立 Airflow 的系統管理員使用者。

提示

若要確認 Databricks 提供者的安裝,請在 Airflow 安裝目錄中執行下列命令:

airflow providers list

啟動 Airflow 網頁伺服器和排程器

需要 Airflow 網頁伺服器才能檢視 Airflow UI。 若要啟動網頁伺服器,請在 Airflow 安裝目錄中開啟終端機,然後執行下列命令:

注意

如果 Airflow 網頁伺服器因為埠衝突而無法啟動,您可以在 Airflow 設定中變更預設埠。

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow webserver

排程器是排程 DAG 的 Airflow 元件。 若要啟動排程器,請在 Airflow 安裝目錄中開啟新的終端機,然後執行下列命令:

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow scheduler

測試 Airflow 安裝

若要確認 Airflow 安裝,您可以執行 Airflow 隨附的其中一個範例 DAG:

  1. 在瀏覽器視窗中,開啟 http://localhost:8080/home。 使用您在安裝 Airflow 時建立的使用者名稱和密碼登入 Airflow UI。 [Airflow DAG ] 頁面隨即出現。
  2. 按兩下 [暫停/取消暫停DAG] 切換以取消暫停其中一個範例DAG,例如 。example_python_operator
  3. 按兩下 [觸發 DAG] 按鈕以 觸發範例 DAG
  4. 按兩下 DAG 名稱以檢視詳細資料,包括 DAG 的執行狀態。

建立 Airflow 的 Azure Databricks 個人存取令牌

Airflow 會使用 Azure Databricks 個人存取令牌 (PAT) 連線到 Databricks。 若要建立 PAT:

  1. 在 Azure Databricks 工作區中,按兩下頂端列中的 Azure Databricks 使用者名稱,然後從下拉式清單中選取 [設定]。
  2. 按兩下 [ 開發人員]。
  3. 按兩下 [存取令牌] 旁的 [管理]。
  4. 按兩下 [ 產生新的令牌]。
  5. (選擇性)輸入批注,協助您在未來識別此令牌,並變更令牌的預設存留期 90 天。 若要建立沒有存留期的令牌(不建議),請將 [ 存留期(天)] 方塊保留空白(空白)。
  6. 按一下 [產生]
  7. 將顯示的令牌複製到安全的位置,然後按兩下 [ 完成]。

注意

請務必將複製的令牌儲存在安全的位置。 請勿與其他人共享複製的令牌。 如果您遺失複製的令牌,就無法重新產生完全相同的令牌。 相反地,您必須重複此程式來建立新的令牌。 如果您遺失複製的令牌,或您認為令牌已遭入侵,Databricks 強烈建議您按兩下存取令牌頁面上令牌旁邊的垃圾桶 (Revoke) 圖示,立即從工作區中刪除該令牌

如果您無法在工作區中建立或使用令牌,這可能是因為您的工作區系統管理員已停用令牌,或未授與您建立或使用令牌的許可權。 請參閱您的工作區管理員或下列專案:

注意

作為安全性最佳做法,當您使用自動化工具、系統、腳本和應用程式進行驗證時,Databricks 建議您使用屬於 服務主體 的個人存取令牌,而不是工作區使用者。 若要建立服務主體的令牌,請參閱 管理服務主體的令牌。

您也可以使用 Microsoft Entra ID(先前稱為 Azure Active Directory) 令牌向 Azure Databricks 進行驗證。 請參閱 Airflow 檔中的 Databricks 連線 ion

設定 Azure Databricks 連線

您的 Airflow 安裝包含 Azure Databricks 的預設連線。 若要使用您在上面建立的個人存取令牌更新連線至工作區:

  1. 在瀏覽器視窗中,開啟 http://localhost:8080/connection/list/。 如果系統提示您登入,請輸入您的系統管理員使用者名稱和密碼。
  2. 在 [Conn 標識符] 下,找出databricks_default,然後按兩下 [編輯記錄] 按鈕。
  3. 以 Azure Databricks 部署的工作區實例名稱取代 [主機] 欄位中的值,例如 https://adb-123456789.cloud.databricks.com
  4. 在 [ 密碼] 字段中,輸入您的 Azure Databricks 個人存取令牌。
  5. 按一下 [檔案] 。

如果您使用 Microsoft Entra ID 令牌,請參閱 Airflow 檔中的 Databricks 連線 ion,以取得設定驗證的相關信息。

範例:建立 Airflow DAG 以執行 Azure Databricks 作業

下列範例示範如何建立可在本機計算機上執行的簡單Airflow部署範例 DAG 以觸發 Azure Databricks 中的執行。 在這裡範例中,您將:

  1. 建立新的筆記本,並新增程序代碼,以根據已設定的參數列印問候語。
  2. 使用執行筆記本的單一工作建立 Azure Databricks 作業。
  3. 設定 Azure Databricks 工作區的 Airflow 連線。
  4. 建立 Airflow DAG 以觸發筆記本作業。 您可以使用 在 Python 腳本 DatabricksRunNowOperator中定義 DAG。
  5. 使用 Airflow UI 來觸發 DAG 並檢視執行狀態。

建立筆記本

此範例使用包含兩個儲存格的筆記本:

若要建立筆記本:

  1. 移至您的 Azure Databricks 工作區,按兩下新增圖示提要字段中的 [新增],然後選取 [Notebook]。

  2. 為您的筆記本命名,例如 Hello Airflow,並確定預設語言已設定為 Python

  3. 複製下列 Python 程式代碼,並將它貼到筆記本的第一個數據格中。

    dbutils.widgets.text("greeting", "world", "Greeting")
    greeting = dbutils.widgets.get("greeting")
    
  4. 在第一個儲存格下方新增儲存格,並將下列 Python 程式代碼複製並貼到新的資料格中:

    print("hello {}".format(greeting))
    

建立作業

  1. 按兩下 作業圖示提要欄位中的 [工作流程 ]。

  2. 按一下 建立作業按鈕

    [工作] 索引標籤隨即出現,其中包含 [建立工作] 對話框。

    建立第一個工作對話框

  3. 將 [新增作業的名稱...] 取代為您的作業名稱。

  4. 在 [ 任務名稱] 字段中,輸入任務的名稱 ,例如 greeting-task

  5. 在 [ 類型 ] 下拉功能表中,選取 [筆記本]。

  6. 在 [ 來源 ] 下拉功能表中,選取 [ 工作區]。

  7. 按兩下 [ 路徑] 文字框,並使用檔案瀏覽器來尋找您建立的筆記本、按下筆記本名稱,然後按兩下 [ 確認]。

  8. 按兩下 [參數] 底下的 [新增]。 在 [ 金鑰 ] 欄位中, 輸入 greeting。 在 [ 值] 欄位中,輸入 Airflow user

  9. 按兩下 [ 建立工作]。

在 [ 作業詳細數據] 面板中,複製 [作業標識符 ] 值。 需要這個值,才能從 Airflow 觸發作業。

執行作業

若要在 Azure Databricks 工作流程 UI 中測試您的新作業,請按兩下 [立即執行] 按鈕 右上角。 執行完成時,您可以檢視 作業執行詳細數據來驗證輸出。

建立新的 Airflow DAG

您可以在 Python 檔案中定義 Airflow DAG。 若要建立 DAG 以觸發範例筆記本作業:

  1. 在文字編輯器或 IDE 中,使用下列內容建立名為 databricks_dag.py 的新檔案:

    from airflow import DAG
    from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
    from airflow.utils.dates import days_ago
    
    default_args = {
      'owner': 'airflow'
    }
    
    with DAG('databricks_dag',
      start_date = days_ago(2),
      schedule_interval = None,
      default_args = default_args
      ) as dag:
    
      opr_run_now = DatabricksRunNowOperator(
        task_id = 'run_now',
        databricks_conn_id = 'databricks_default',
        job_id = JOB_ID
      )
    

    將取代 JOB_ID 為稍早儲存之作業標識碼的值。

  2. 將檔案儲存在 airflow/dags 目錄中。 Airflow 會自動讀取並安裝儲存在 中的 airflow/dags/DAG 檔案。

在 Airflow 中安裝和驗證 DAG

若要觸發並驗證 Airflow UI 中的 DAG:

  1. 在瀏覽器視窗中,開啟 http://localhost:8080/home。 [數據流 DAG ] 畫面隨即出現。
  2. 找出 databricks_dag 並按兩下 [暫停/取消暫停 DAG ] 切換以取消暫停 DAG。
  3. 按兩下 [觸發 DAG] 按鈕以 觸發 DAG
  4. 按兩下 [ 執行] 資料列中的執行,以檢視執行的狀態和詳細數據。