次の方法で共有


Apache Airflow ジョブの要件としてプライベート パッケージをインストールする

Apache Airflow ジョブは、Apache Airflow を使用しています。

Python パッケージを使用すると、関連する Python モジュールを 1 つのディレクトリ階層に整理できます。 パッケージは通常、init.py という特殊なファイルを含むディレクトリとして表されます。 パッケージ ディレクトリ内には、関数、クラス、変数を定義した複数の Python モジュール ファイル (.py ファイル) を含めることができます。 Apache エアフロー ジョブを使用すると、独自のプライベート パッケージを開発して、カスタム Apache エアフローオペレーター、フック、センサー、プラグインなどを追加できます。

このチュートリアルでは、単純なカスタム 演算子を Python パッケージとして構築し、Apache エアフロー ジョブの要件として追加し、DAG ファイルにモジュールとしてプライベート パッケージをインポートします。

Apache Airflow Dag を使用してカスタム演算子を開発し、テストする

  1. sample_operator.pyという名前のファイルを作成し、プライベート パッケージに変換します。 ヘルプが必要な場合は、次のガイドを参照してください。 Python でのパッケージの作成

    from airflow.models.baseoperator import BaseOperator
    
    
    class SampleOperator(BaseOperator):
        def __init__(self, name: str, **kwargs) -> None:
            super().__init__(**kwargs)
            self.name = name
    
        def execute(self, context):
            message = f"Hello {self.name}"
            return message
    
    
  2. 次に、最初の手順で行ったオペレーターをテストするために、 sample_dag.py という Apache エアフロー DAG ファイルを作成します。

    from datetime import datetime
    from airflow import DAG
    
     # Import from private package
    from airflow_operator.sample_operator import SampleOperator
    
    
    with DAG(
    "test-custom-package",
    tags=["example"]
    description="A simple tutorial DAG",
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    ) as dag:
        task = SampleOperator(task_id="sample-task", name="foo_bar")
    
        task
    
  3. プライベート パッケージ ファイルと共に、 sample_dag.py ファイル Dags フォルダーに GitHub リポジトリを設定します。 zip.whltar.gzなどの形式を使用できます。 ファイルを 'Dags' フォルダーまたは 'Plugins' フォルダーのいずれかに配置します。どちらが最適か。 Git リポジトリを Apache エアフロー ジョブに接続するか、 Install-Private-Package で既製の例を試してください。

パッケージを要件として追加する

形式を使用してパッケージを Airflow requirements の下に追加する /opt/airflow/git/<repoName>/<pathToPrivatePackage>

たとえば、プライベート パッケージが GitHub リポジトリの /dags/test/private.whl にある場合は、 /opt/airflow/git/<repoName>/dags/test/private.whl をエアフロー環境に追加するだけです。

要件として追加されたプライベート パッケージを示すスクリーンショット。

クイック スタート: Apache Airflow ジョブを作成する