共用方式為


在 Apache Airflow 作業中安裝私人套件作為需求

注意

Apache Airflow 作業是由 Apache Airflow 提供電源。

Python 套件可讓您將相關的 Python 模組組織成單一目錄階層。 套件通常會以目錄表示,其中包含稱為 init.py 的特殊檔案。 在套件目錄內,您可以有多個可定義函數、類別和變數的 Python 模組檔案 (.py 檔案)。 使用 Apache Airflow 作業,您可以開發自己的私人套件,以新增自定義的 Apache Airflow 運算元、鉤子、感測器、外掛程式等等。

在本教學課程中,您將建置簡單的自定義運算符作為 Python 套件、將它新增為 Apache Airflow 作業中的需求,並在 DAG 檔案中將私人套件匯入為模組。

使用 Apache Airflow DAG 開發自訂運算子並進行測試

  1. 建立名為 sample_operator.py 的檔案,並將其轉換成私人套件。 如果您需要協助,請參閱本指南: 在 Python 中建立套件

    from airflow.models.baseoperator import BaseOperator
    
    
    class SampleOperator(BaseOperator):
        def __init__(self, name: str, **kwargs) -> None:
            super().__init__(**kwargs)
            self.name = name
    
        def execute(self, context):
            message = f"Hello {self.name}"
            return message
    
    
  2. 接下來,建立名為 sample_dag.py 的 Apache Airflow DAG 檔案,以測試您在第一個步驟中所做的作員。

    from datetime import datetime
    from airflow import DAG
    
     # Import from private package
    from airflow_operator.sample_operator import SampleOperator
    
    
    with DAG(
    "test-custom-package",
    tags=["example"]
    description="A simple tutorial DAG",
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    ) as dag:
        task = SampleOperator(task_id="sample-task", name="foo_bar")
    
        task
    
  3. 設置 GitHub 存放庫,其中包含位於Dags資料夾內的sample_dag.py檔案以及您的私有套件檔案。 您可以使用 zip.whltar.gz 等格式。 將檔案放在 『Dags』 或 'Plugins' 資料夾中,無論哪一個最適合。 將您的 Git 存放庫連線到 Apache Airflow 作業,或嘗試 Install-Private-Package 的現成範例。

將套件新增為需求

Airflow requirements 底下,使用 /opt/airflow/git/<repoName>/<pathToPrivatePackage> 格式新增套件

例如,如果您的私人套件位於 /dags/test/private.whl GitHub 存放庫中,只要將 新增 /opt/airflow/git/<repoName>/dags/test/private.whl 至您的 Airflow 環境即可。

螢幕擷取畫面,其中顯示已新增為需求之私人套件。

快速入門:建立 Apache Airflow 作業