注
Apache Airflow ジョブは、Apache Airflow を使用しています。
Python パッケージを使用すると、関連する Python モジュールを 1 つのディレクトリ階層に整理できます。 パッケージは通常、init.py という特殊なファイルを含むディレクトリとして表されます。 パッケージ ディレクトリ内には、関数、クラス、変数を定義した複数の Python モジュール ファイル (.py ファイル) を含めることができます。 Apache エアフロー ジョブを使用すると、独自のプライベート パッケージを開発して、カスタム Apache エアフローオペレーター、フック、センサー、プラグインなどを追加できます。
このチュートリアルでは、単純なカスタム 演算子を Python パッケージとして構築し、Apache エアフロー ジョブの要件として追加し、DAG ファイルにモジュールとしてプライベート パッケージをインポートします。
Apache Airflow Dag を使用してカスタム演算子を開発し、テストする
sample_operator.py
という名前のファイルを作成し、プライベート パッケージに変換します。 ヘルプが必要な場合は、次のガイドを参照してください。 Python でのパッケージの作成from airflow.models.baseoperator import BaseOperator class SampleOperator(BaseOperator): def __init__(self, name: str, **kwargs) -> None: super().__init__(**kwargs) self.name = name def execute(self, context): message = f"Hello {self.name}" return message
次に、最初の手順で行ったオペレーターをテストするために、
sample_dag.py
という Apache エアフロー DAG ファイルを作成します。from datetime import datetime from airflow import DAG # Import from private package from airflow_operator.sample_operator import SampleOperator with DAG( "test-custom-package", tags=["example"] description="A simple tutorial DAG", schedule_interval=None, start_date=datetime(2021, 1, 1), ) as dag: task = SampleOperator(task_id="sample-task", name="foo_bar") task
プライベート パッケージ ファイルと共に、
sample_dag.py
ファイルDags
フォルダーに GitHub リポジトリを設定します。zip
、.whl
、tar.gz
などの形式を使用できます。 ファイルを 'Dags' フォルダーまたは 'Plugins' フォルダーのいずれかに配置します。どちらが最適か。 Git リポジトリを Apache エアフロー ジョブに接続するか、 Install-Private-Package で既製の例を試してください。
パッケージを要件として追加する
形式を使用してパッケージを Airflow requirements
の下に追加する /opt/airflow/git/<repoName>/<pathToPrivatePackage>
たとえば、プライベート パッケージが GitHub リポジトリの /dags/test/private.whl
にある場合は、 /opt/airflow/git/<repoName>/dags/test/private.whl
をエアフロー環境に追加するだけです。