分享方式:


使用 Apache Airflow 協調 Azure Databricks 作業

本文說明 Apache Airflow 對使用 Azure Databricks 協調資料管線的支援,提供在本機安裝和設定 Airflow 的指示,並提供使用 Airflow 部署和執行 Azure Databricks 工作流程的範例。

資料管線中的作業協調流程

開發和部署資料處理管線通常需要管理工作之間的複雜相依性。 例如,管線可能會從來源讀取資料、清理資料、轉換已清理的資料,以及將轉換的資料寫入目標。 當您運作管線時,您也需要測試、排程和錯誤疑難排解支援。

工作流程系統可讓您定義工作之間的相依性、排程管線執行時間,以及監視工作流程,來解決這些挑戰。 Apache Airflow 是管理及排程資料管線的開放原始碼解決方案。 Airflow 以有向非循環圖 (DAG) 的形式表示作業的資料管線。 您可以在 Python 檔案中定義工作流程,而 Airflow 會管理排程和執行。 Airflow Azure Databricks 連線可讓您利用 Azure Databricks 所提供的最佳化 Spark 引擎,以及 Airflow 的排程功能。

需求

  • Airflow 與 Azure Databricks 之間的整合需要 Airflow 2.5.0 版和更新版本。 本文中的範例會使用 Airflow 2.6.1 版進行測試。
  • Airflow 需要使用 Python 3.8、3.9、3.10 或 3.11。 本文中的範例會使用 Python 3.8 進行測試。
  • 本文中安裝和執行 Airflow 的指示需要使用 pipenv 來建立 Python 虛擬環境

Databricks 的 Airflow 運算子

Airflow DAG 由工作組成,其中每個工作都會執行 Airflow 運算子。 支援與 Databricks 整合的 Airflow 運算子會在 Databricks 提供者中實作。

Databricks 提供者包含針對 Azure Databricks 工作區執行多項工作的運算子,這包括將資料匯入資料表執行 SQL 查詢,以及使用 Databricks Git 資料夾

Databricks 提供者會實作兩個運算子來觸發作業:

若要建立新的 Azure Databricks 作業或重設現有的作業,Databricks 提供者會實作 DatabricksCreateJobsOperatorDatabricksCreateJobsOperator 會使用 POST /api/2.1/jobs/createPOST /api/2.1/jobs/reset API 要求。 您可以搭配 DatabricksCreateJobsOperator 使用 DatabricksRunNowOperator,來建立和執行作業。

注意

使用 Databricks 運算子來觸發作業,需要提供 Databricks 連線設定中的認證。 請參閱建立適用於 Airflow 的 Azure Databricks 個人存取權杖

Databricks Airflow 運算子會每 polling_period_seconds (預設值為 30 秒) 一次,將作業執行頁面 URL 寫入至 Airflow 記錄。 如需詳細資訊,請參閱 Airflow 網站上的 apache-airflow-providers-databricks 套件頁面。

在本機安裝 Airflow Azure Databricks 整合

若要在本機安裝用於測試和開發的 Airflow 和 Databricks 提供者,請使用下列步驟。 如需其他 Airflow 安裝選項,包括建立生產安裝,請參閱 Airflow 文件中的安裝

開啟終端機,然後執行下列命令:

mkdir airflow
cd airflow
pipenv --python 3.8
pipenv shell
export AIRFLOW_HOME=$(pwd)
pipenv install apache-airflow
pipenv install apache-airflow-providers-databricks
mkdir dags
airflow db init
airflow users create --username admin --firstname <firstname> --lastname <lastname> --role Admin --email <email>

以您的使用者名稱和電子郵件取代 <firstname><lastname><email>。 系統會提示您輸入管理使用者密碼。 請務必儲存此密碼,因為登入 Airflow UI 需要它。

此指令碼會執行下列步驟:

  1. 建立名為 airflow 的目錄,並變更至該目錄。
  2. 使用 pipenv 來建立和繁衍 Python 虛擬環境。 Databricks 建議使用 Python 虛擬環境,將套件版本和程式碼相依性與該環境隔離。 此隔離有助於減少非預期的套件版本不符和程式碼相依性衝突。
  3. 初始化名為 AIRFLOW_HOME 的環境變數,將其設定為 airflow 目錄的路徑。
  4. 安裝 Airflow 和 Airflow Databricks 提供者套件。
  5. 建立 airflow/dags 目錄。 Airflow 會使用 dags 目錄來儲存 DAG 定義。
  6. 初始化 Airflow 用來追蹤中繼資料的 SQLite 資料庫。 在生產 Airflow 部署中,您會使用標準資料庫設定 Airflow。 Airflow 部署的 SQLite 資料庫和預設組態會在 airflow 目錄中初始化。
  7. 建立 Airflow 的管理使用者。

提示

若要確認 Databricks 提供者的安裝,請在 Airflow 安裝目錄中執行下列命令:

airflow providers list

啟動 Airflow 網頁伺服器和排程器

需要 Airflow 網頁伺服器才能檢視 Airflow UI。 若要啟動網頁伺服器,請在 Airflow 安裝目錄中開啟終端機,然後執行下列命令:

注意

如果 Airflow 網頁伺服器因為連接埠衝突而無法啟動,您可以在 Airflow 設定中變更預設連接埠。

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow webserver

排程器是排程 DAG 的 Airflow 元件。 若要啟動排程器,請在 Airflow 安裝目錄中開啟新的終端機,然後執行下列命令:

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow scheduler

測試 Airflow 安裝

若要確認 Airflow 安裝,您可以執行 Airflow 隨附的其中一個範例 DAG:

  1. 在瀏覽器視窗中,開啟 http://localhost:8080/home。 使用使用者名稱和您在安裝 Airflow 時所建立的密碼來登入 Airflow UI。 Airflow [DAG] 頁面隨即出現。
  2. 按一下 [暫停/取消暫停 DAG] 切換,以取消暫停其中一個範例 DAG,例如 example_python_operator
  3. 按一下 [觸發 DAG] 按鈕以觸發範例 DAG
  4. 按一下 DAG 名稱以檢視詳細資料,包括 DAG 的執行狀態。

建立適用於 Airflow 的 Azure Databricks 個人存取權杖

Airflow 會使用 Azure Databricks 個人存取權杖 (PAT) 連線到 Databricks。 若要建立 PAT:

  1. 在 Azure Databricks 工作區中,按一下頂端列中的 Azure Databricks 使用者名稱,然後從下拉式清單中選取 [設定]
  2. 按一下 [開發人員]
  3. 在 [存取權杖] 旁,按一下 [管理]
  4. 按一下 產生新權杖
  5. (選擇性) 輸入可協助您之後識別此權杖的註解,並變更權杖的預設存留期 90 天。 若要建立沒有存留期的權杖 (不建議),請將 [存留期 (天)] 方塊留空 (空白)。
  6. 按一下 產生
  7. 將顯示的權杖複製到安全位置,然後選取 [完成]

注意

請務必將複製的權杖儲存在安全位置。 請勿與其他人共用複製的權杖。 如果您遺失複製的權杖,就無法重新產生完全相同的權杖。 相反地,您必須重複此程序來建立新的權杖。 如果您遺失複製的權杖,或您認為權杖已遭入侵,Databricks 強烈建議您按一下 [存取權杖] 頁面上權杖旁邊的垃圾桶 (撤銷) 圖示,立即從工作區中刪除該權杖。

注意:如果您無法在工作區中建立或使用 PAT,這可能是因為您的工作區系統管理員已停用權杖,或未授與您建立或使用權杖的權限。 請諮詢您的工作區系統管理員或參閱下列主題:

注意

作為安全性最佳做法,當您使用自動化工具、系統、指令碼和應用程式進行驗證時,Databricks 建議您使用屬於服務主體的個人存取權杖,而不是工作區使用者。 若要建立服務主體的權杖,請參閱管理服務主體的權杖

您也可以使用 Microsoft Entra ID 權杖向 Azure Databricks 進行驗證。 請參閱 Airflow 文件中的 Databricks 連線

設定 Azure Databricks 連線

您的 Airflow 安裝包含 Azure Databricks 的預設連線。 要使用以上建立的個人存取權杖來更新連線,以連線到工作區:

  1. 在瀏覽器視窗中,開啟 http://localhost:8080/connection/list/。 出現登入提示時,輸入系統管理員使用者名稱和密碼。
  2. 在 [連線 ID] 下,找出 databricks_default,然後按一下 [編輯記錄] 按鈕。
  3. 以 Azure Databricks 部署的工作區執行個體名稱,例如 https://adb-123456789.cloud.databricks.com,取代 [主機] 欄位中的值。
  4. 在 [密碼] 欄位中,輸入 Azure Databricks 個人存取權杖。
  5. 按一下 [檔案] 。

如果您使用 Microsoft Entra ID 權杖,請參閱 Airflow 文件中的 Databricks 連線,以取得設定驗證的相關資訊。

範例:建立 Airflow DAG 以執行 Azure Databricks 作業

下列範例示範如何建立一個簡單的 Airflow 部署,其在本機電腦上執行並會部署一個範例 DAG 以觸發 Azure Databricks 中的執行。 在此範例中,您將會:

  1. 建立新的筆記本,並新增程式碼,以根據已設定的參數列印問候語。
  2. 使用執行筆記本的單一工作建立一個 Azure Databricks 作業。
  3. 設定與 Azure Databricks 工作區的 Airflow 連線。
  4. 建立一個 Airflow DAG 以觸發筆記本作業。 您可以使用 DatabricksRunNowOperator 在 Python 指令碼中定義 DAG。
  5. 使用 Airflow UI 來觸發 DAG 並檢視執行狀態。

建立筆記本

此範例使用包含兩個儲存格的筆記本:

  • 第一個儲存格包含 Databricks 公用程式文字小工具,其定義名為 greeting 的變數並設定為預設值 world
  • 第二個儲存格會列印 greeting 變數的值,前面加上 hello 前置詞。

若要建立筆記本:

  1. 移至您的 Azure Databricks 工作區,按一下新增圖示側邊欄中的 [新增],然後選取 [筆記本]。

  2. 為您的筆記本命名,例如 Hello Airflow,並確定預設語言已設定為 Python

  3. 將下列 Python 程式碼複製並貼到筆記本的第一個儲存格中。

    dbutils.widgets.text("greeting", "world", "Greeting")
    greeting = dbutils.widgets.get("greeting")
    
  4. 在第一個儲存格下方新增一個儲存格,並將下列 Python 程式碼複製並貼到新的儲存格中:

    print("hello {}".format(greeting))
    

建立作業

  1. 按一下 工作流程圖示 側邊欄中的 [工作流程]。

  2. 按一下 [建立作業] 按鈕

    [工作] 索引標籤隨即出現,其中包含 [建立工作] 對話方塊。

    建立第一個工作對話方塊

  3. 將 [新增作業名稱...] 取代為您的作業名稱。

  4. 在 [任務名稱] 欄位中,輸入任務的名稱,例如 greeting-task

  5. 在 [類型] 下拉式功能表中,選取 [筆記本]。

  6. 在 [來源] 下拉式功能表中,選取 [工作區]。

  7. 按一下 [路徑] 文字輸入框,並使用檔案瀏覽器來尋找您建立的筆記本,按一下筆記本名稱,然後按一下 [確認]。

  8. 按一下 [參數] 底下的 [新增]。 在 [金鑰] 欄位中,輸入 greeting。 在 [值] 欄位中輸入 Airflow user

  9. 按一下 [建立工作]

在 [作業詳細資料] 面板中,複製 [作業 ID] 值。 需要這個值,才能從 Airflow 觸發作業。

執行作業

若要在 Azure Databricks 作業 UI 中測試您的新作業,請按一下右上角的 [立即執行] 按鈕。 完成執行後,您可以透過檢視 [作業執行詳細資料] 來驗證輸出。

建立新的 Airflow DAG

您可以在 Python 檔案中定義 Airflow DAG。 若要建立 DAG 以觸發範例筆記本作業:

  1. 在文字編輯器或 IDE 中,建立包含下列內容、名為 databricks_dag.py 的新檔案:

    from airflow import DAG
    from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
    from airflow.utils.dates import days_ago
    
    default_args = {
      'owner': 'airflow'
    }
    
    with DAG('databricks_dag',
      start_date = days_ago(2),
      schedule_interval = None,
      default_args = default_args
      ) as dag:
    
      opr_run_now = DatabricksRunNowOperator(
        task_id = 'run_now',
        databricks_conn_id = 'databricks_default',
        job_id = JOB_ID
      )
    

    JOB_ID 取代為稍早儲存的作業 ID 值。

  2. 將檔案儲存在 airflow/dags 目錄中。 Airflow 會自動讀取並安裝儲存在 airflow/dags/ 中的 DAG 檔案。

在 Airflow 中安裝和驗證 DAG

若要觸發並驗證 Airflow UI 中的 DAG:

  1. 在瀏覽器視窗中,開啟 http://localhost:8080/home。 Airflow [DAG] 畫面隨即出現。
  2. 找出 databricks_dag 並按一下 [暫停/取消暫停 DAG] 切換以取消暫停 DAG。
  3. 按一下 [觸發 DAG] 按鈕以觸發 DAG
  4. 按一下 [執行] 資料列中的執行,以檢視該執行的狀態和詳細資料。