注意
Apache Airflow 作業是由 Apache Airflow 提供電源。
Python 套件可讓您將相關的 Python 模組組織成單一目錄階層。 套件通常會以目錄表示,其中包含稱為 init.py 的特殊檔案。 在套件目錄內,您可以有多個可定義函數、類別和變數的 Python 模組檔案 (.py 檔案)。 使用 Apache Airflow 作業,您可以開發自己的私人套件,以新增自定義的 Apache Airflow 運算元、鉤子、感測器、外掛程式等等。
在本教學課程中,您將建置簡單的自定義運算符作為 Python 套件、將它新增為 Apache Airflow 作業中的需求,並在 DAG 檔案中將私人套件匯入為模組。
使用 Apache Airflow DAG 開發自訂運算子並進行測試
建立名為
sample_operator.py
的檔案,並將其轉換成私人套件。 如果您需要協助,請參閱本指南: 在 Python 中建立套件from airflow.models.baseoperator import BaseOperator class SampleOperator(BaseOperator): def __init__(self, name: str, **kwargs) -> None: super().__init__(**kwargs) self.name = name def execute(self, context): message = f"Hello {self.name}" return message
接下來,建立名為
sample_dag.py
的 Apache Airflow DAG 檔案,以測試您在第一個步驟中所做的作員。from datetime import datetime from airflow import DAG # Import from private package from airflow_operator.sample_operator import SampleOperator with DAG( "test-custom-package", tags=["example"] description="A simple tutorial DAG", schedule_interval=None, start_date=datetime(2021, 1, 1), ) as dag: task = SampleOperator(task_id="sample-task", name="foo_bar") task
設置 GitHub 存放庫,其中包含位於
Dags
資料夾內的sample_dag.py
檔案以及您的私有套件檔案。 您可以使用zip
、.whl
或tar.gz
等格式。 將檔案放在 『Dags』 或 'Plugins' 資料夾中,無論哪一個最適合。 將您的 Git 存放庫連線到 Apache Airflow 作業,或嘗試 Install-Private-Package 的現成範例。
將套件新增為需求
在 Airflow requirements
底下,使用 /opt/airflow/git/<repoName>/<pathToPrivatePackage>
格式新增套件
例如,如果您的私人套件位於 /dags/test/private.whl
GitHub 存放庫中,只要將 新增 /opt/airflow/git/<repoName>/dags/test/private.whl
至您的 Airflow 環境即可。